From 97f65945686fb520582be9dea11a73fddc14ac06 Mon Sep 17 00:00:00 2001 From: lukas91 Date: Wed, 12 Jun 2024 10:20:24 +0500 Subject: [PATCH] =?UTF-8?q?=D0=94=D0=BE=D0=B1=D0=B0=D0=B2=D0=B8=D1=82?= =?UTF-8?q?=D1=8C=20yandex=5Fway=5Fmp.py?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- yandex_way_mp.py | 145 +++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 145 insertions(+) create mode 100644 yandex_way_mp.py diff --git a/yandex_way_mp.py b/yandex_way_mp.py new file mode 100644 index 0000000..223885e --- /dev/null +++ b/yandex_way_mp.py @@ -0,0 +1,145 @@ +import cx_Oracle as oracledb +import configparser +import os +import json +import logging +import time +import platform +import sys +from multiprocessing import Process, Queue, current_process +import requests + +# Настройка логирования +def setup_logging(): + log_dir = "log" if platform.system() == 'Linux' else "Log" + log_path = os.path.join(os.path.dirname(os.path.realpath(__file__)), log_dir) + if not os.path.exists(log_path): + os.makedirs(log_path) + log_filename = os.path.join(log_path, f"{time.strftime('%Y%m%d_yandex_way.log')}") + logging.basicConfig(level=logging.DEBUG, filename=log_filename, filemode='a', format="%(asctime)s %(levelname)s %(message)s") + +# Инициализация клиента Oracle +def init_oracle_client(): + if platform.system() == 'Linux': + oracledb.init_oracle_client(lib_dir='/usr/lib/oracle/11.2/client64/lib') + elif platform.system() == 'Windows': + libpath = os.path.abspath(__file__).replace('yandex_way.py', 'Lib\\instantclient_21_9') + oracledb.init_oracle_client(lib_dir=libpath) + else: + logging.critical(f'System version: {platform.system()}') + sys.exit(0) + +# Загрузка конфигурации +def load_config(path): + config = configparser.ConfigParser() + config.read(path) + return config + +# Функция для получения API ключа +def get_apikey(cursor): + cursor.execute("SELECT value FROM ln_params WHERE section = 'YandexAPI' AND name = 'apikey'") + return cursor.fetchone()[0] + +# Функция для получения интервала времени обновления +def get_refresh_time(cursor): + cursor.execute("SELECT value FROM ln_params WHERE section = 'YandexAPI' AND name = 'refresh_time'") + result = cursor.fetchone() + return int(result[0]) if result else 30 + +# Функция для обработки записей в процессе +def process_records(records, apikey, queue): + try: + conn = oracledb.connect(user=config['ORACLE']['user'], password=config['ORACLE']['pass'], dsn=dsn_tns) + cursor = conn.cursor() + dep_time = time.time() + 60 * 10 + + for record in records: + origins = f"{record[2]},{record[3]}" + destinations = f"{record[5]},{record[6]}" + params = {'origins': origins, 'destinations': destinations, 'apikey': apikey, 'departure_time': dep_time} + response = requests.get('https://api.routing.yandex.net/v2/distancematrix', params=params) + route_json = response.json() + cursor.execute("BEGIN flm_office.add_way_log(:1, :2, :3, :4); END;", + [record[0], response.status_code, response.url, json.dumps(route_json)]) + + cursor.close() + conn.close() + queue.put("success") + except Exception as e: + logging.error(f'Process {current_process().name} failed: {e}') + queue.put("error") + +# Функция для завершения процессов +def terminate_processes(processes): + for process in processes: + process.join(timeout=15) + if process.is_alive(): + process.terminate() + process.join() + logging.warning(f'Process {process.name} terminated due to timeout') + +# Основная функция для обработки записей +def process_data(): + global dsn_tns + dsn_tns = oracledb.makedsn(config['ORACLE']['host'], config['ORACLE']['port'], service_name=config['ORACLE']['service_name']) + conn = oracledb.connect(user=config['ORACLE']['user'], password=config['ORACLE']['pass'], dsn=dsn_tns) + cursor = conn.cursor() + apikey = get_apikey(cursor) + + while True: + refresh_time = get_refresh_time(cursor) + cursor.execute("SELECT way_id, office_id, latitude_from, longitude_from, site_id, latitude_to, longitude_to FROM vw_ways_to_load ORDER BY status, last_date") + rows = cursor.fetchall() + + if not rows: + logging.info(f'No Data Found, wait {refresh_time} seconds') + time.sleep(refresh_time) + break + + chunk_size = 20 + processes = [] + queue = Queue() + + for i in range(0, len(rows), chunk_size): + chunk = rows[i:i + chunk_size] + p = Process(target=process_records, args=(chunk, apikey, queue)) + processes.append(p) + p.start() + + if len(processes) == 5: + terminate_processes(processes) + processes.clear() + + terminate_processes(processes) + processes.clear() + logging.info(f"Sleeping for {refresh_time} seconds") + time.sleep(refresh_time) + + cursor.close() + conn.close() + +# Основная функция +def main(): + setup_logging() + init_oracle_client() + logging.warning("<<<<< start >>>>>") + + path = os.path.abspath(__file__).replace('.py', '.ini') + if not os.path.isfile(path): + logging.error(f'File not found {path}') + return + + global config + config = load_config(path) + + try: + while True: + process_data() + + except (FileNotFoundError, configparser.Error, oracledb.DatabaseError, Exception) as e: + logging.error(f'{e}') + finally: + logging.warning("<<<<< finish >>>>>") + +if __name__ == "__main__": + main()