diff --git a/yandex_way_mp.py b/yandex_way_mp.py index 223885e..8df1b6b 100644 --- a/yandex_way_mp.py +++ b/yandex_way_mp.py @@ -46,6 +46,16 @@ def get_refresh_time(cursor): result = cursor.fetchone() return int(result[0]) if result else 30 +def get_all_chunk_size(cursor): + cursor.execute("SELECT value FROM ln_params WHERE section = 'YandexAPI' AND name = 'all_chunk_size'") + result = cursor.fetchone() + return int(result[0]) if result else 100 + +def get_process_count(cursor): + cursor.execute("SELECT value FROM ln_params WHERE section = 'YandexAPI' AND name = 'process_count'") + result = cursor.fetchone() + return int(result[0]) if result else 5 + # Функция для обработки записей в процессе def process_records(records, apikey, queue): try: @@ -70,9 +80,9 @@ def process_records(records, apikey, queue): queue.put("error") # Функция для завершения процессов -def terminate_processes(processes): +def terminate_processes(processes, p_timeout = 5): for process in processes: - process.join(timeout=15) + process.join(timeout=p_timeout) if process.is_alive(): process.terminate() process.join() @@ -88,15 +98,18 @@ def process_data(): while True: refresh_time = get_refresh_time(cursor) + all_chunk_size = get_all_chunk_size(cursor) + all_process_count = get_process_count(cursor) cursor.execute("SELECT way_id, office_id, latitude_from, longitude_from, site_id, latitude_to, longitude_to FROM vw_ways_to_load ORDER BY status, last_date") rows = cursor.fetchall() - + rows = rows[:all_chunk_size] + if not rows: logging.info(f'No Data Found, wait {refresh_time} seconds') time.sleep(refresh_time) break - chunk_size = 20 + chunk_size = int(all_chunk_size/all_process_count) processes = [] queue = Queue() @@ -106,10 +119,14 @@ def process_data(): processes.append(p) p.start() - if len(processes) == 5: + if len(processes) == all_process_count: terminate_processes(processes) processes.clear() + start_next_pool = 10 + logging.info(f"End pool, wait {start_next_pool} seconds") + time.sleep(start_next_pool) + terminate_processes(processes) processes.clear() logging.info(f"Sleeping for {refresh_time} seconds")