Добавить yandex_way_mp.py
This commit is contained in:
parent
25154dbd98
commit
97f6594568
145
yandex_way_mp.py
Normal file
145
yandex_way_mp.py
Normal file
@ -0,0 +1,145 @@
|
||||
import cx_Oracle as oracledb
|
||||
import configparser
|
||||
import os
|
||||
import json
|
||||
import logging
|
||||
import time
|
||||
import platform
|
||||
import sys
|
||||
from multiprocessing import Process, Queue, current_process
|
||||
import requests
|
||||
|
||||
# Настройка логирования
|
||||
def setup_logging():
|
||||
log_dir = "log" if platform.system() == 'Linux' else "Log"
|
||||
log_path = os.path.join(os.path.dirname(os.path.realpath(__file__)), log_dir)
|
||||
if not os.path.exists(log_path):
|
||||
os.makedirs(log_path)
|
||||
log_filename = os.path.join(log_path, f"{time.strftime('%Y%m%d_yandex_way.log')}")
|
||||
logging.basicConfig(level=logging.DEBUG, filename=log_filename, filemode='a', format="%(asctime)s %(levelname)s %(message)s")
|
||||
|
||||
# Инициализация клиента Oracle
|
||||
def init_oracle_client():
|
||||
if platform.system() == 'Linux':
|
||||
oracledb.init_oracle_client(lib_dir='/usr/lib/oracle/11.2/client64/lib')
|
||||
elif platform.system() == 'Windows':
|
||||
libpath = os.path.abspath(__file__).replace('yandex_way.py', 'Lib\\instantclient_21_9')
|
||||
oracledb.init_oracle_client(lib_dir=libpath)
|
||||
else:
|
||||
logging.critical(f'System version: {platform.system()}')
|
||||
sys.exit(0)
|
||||
|
||||
# Загрузка конфигурации
|
||||
def load_config(path):
|
||||
config = configparser.ConfigParser()
|
||||
config.read(path)
|
||||
return config
|
||||
|
||||
# Функция для получения API ключа
|
||||
def get_apikey(cursor):
|
||||
cursor.execute("SELECT value FROM ln_params WHERE section = 'YandexAPI' AND name = 'apikey'")
|
||||
return cursor.fetchone()[0]
|
||||
|
||||
# Функция для получения интервала времени обновления
|
||||
def get_refresh_time(cursor):
|
||||
cursor.execute("SELECT value FROM ln_params WHERE section = 'YandexAPI' AND name = 'refresh_time'")
|
||||
result = cursor.fetchone()
|
||||
return int(result[0]) if result else 30
|
||||
|
||||
# Функция для обработки записей в процессе
|
||||
def process_records(records, apikey, queue):
|
||||
try:
|
||||
conn = oracledb.connect(user=config['ORACLE']['user'], password=config['ORACLE']['pass'], dsn=dsn_tns)
|
||||
cursor = conn.cursor()
|
||||
dep_time = time.time() + 60 * 10
|
||||
|
||||
for record in records:
|
||||
origins = f"{record[2]},{record[3]}"
|
||||
destinations = f"{record[5]},{record[6]}"
|
||||
params = {'origins': origins, 'destinations': destinations, 'apikey': apikey, 'departure_time': dep_time}
|
||||
response = requests.get('https://api.routing.yandex.net/v2/distancematrix', params=params)
|
||||
route_json = response.json()
|
||||
cursor.execute("BEGIN flm_office.add_way_log(:1, :2, :3, :4); END;",
|
||||
[record[0], response.status_code, response.url, json.dumps(route_json)])
|
||||
|
||||
cursor.close()
|
||||
conn.close()
|
||||
queue.put("success")
|
||||
except Exception as e:
|
||||
logging.error(f'Process {current_process().name} failed: {e}')
|
||||
queue.put("error")
|
||||
|
||||
# Функция для завершения процессов
|
||||
def terminate_processes(processes):
|
||||
for process in processes:
|
||||
process.join(timeout=15)
|
||||
if process.is_alive():
|
||||
process.terminate()
|
||||
process.join()
|
||||
logging.warning(f'Process {process.name} terminated due to timeout')
|
||||
|
||||
# Основная функция для обработки записей
|
||||
def process_data():
|
||||
global dsn_tns
|
||||
dsn_tns = oracledb.makedsn(config['ORACLE']['host'], config['ORACLE']['port'], service_name=config['ORACLE']['service_name'])
|
||||
conn = oracledb.connect(user=config['ORACLE']['user'], password=config['ORACLE']['pass'], dsn=dsn_tns)
|
||||
cursor = conn.cursor()
|
||||
apikey = get_apikey(cursor)
|
||||
|
||||
while True:
|
||||
refresh_time = get_refresh_time(cursor)
|
||||
cursor.execute("SELECT way_id, office_id, latitude_from, longitude_from, site_id, latitude_to, longitude_to FROM vw_ways_to_load ORDER BY status, last_date")
|
||||
rows = cursor.fetchall()
|
||||
|
||||
if not rows:
|
||||
logging.info(f'No Data Found, wait {refresh_time} seconds')
|
||||
time.sleep(refresh_time)
|
||||
break
|
||||
|
||||
chunk_size = 20
|
||||
processes = []
|
||||
queue = Queue()
|
||||
|
||||
for i in range(0, len(rows), chunk_size):
|
||||
chunk = rows[i:i + chunk_size]
|
||||
p = Process(target=process_records, args=(chunk, apikey, queue))
|
||||
processes.append(p)
|
||||
p.start()
|
||||
|
||||
if len(processes) == 5:
|
||||
terminate_processes(processes)
|
||||
processes.clear()
|
||||
|
||||
terminate_processes(processes)
|
||||
processes.clear()
|
||||
logging.info(f"Sleeping for {refresh_time} seconds")
|
||||
time.sleep(refresh_time)
|
||||
|
||||
cursor.close()
|
||||
conn.close()
|
||||
|
||||
# Основная функция
|
||||
def main():
|
||||
setup_logging()
|
||||
init_oracle_client()
|
||||
logging.warning("<<<<< start >>>>>")
|
||||
|
||||
path = os.path.abspath(__file__).replace('.py', '.ini')
|
||||
if not os.path.isfile(path):
|
||||
logging.error(f'File not found {path}')
|
||||
return
|
||||
|
||||
global config
|
||||
config = load_config(path)
|
||||
|
||||
try:
|
||||
while True:
|
||||
process_data()
|
||||
|
||||
except (FileNotFoundError, configparser.Error, oracledb.DatabaseError, Exception) as e:
|
||||
logging.error(f'{e}')
|
||||
finally:
|
||||
logging.warning("<<<<< finish >>>>>")
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
Loading…
Reference in New Issue
Block a user