Обновить yandex_way_mp.py
This commit is contained in:
parent
97f6594568
commit
925aee9132
@ -46,6 +46,16 @@ def get_refresh_time(cursor):
|
|||||||
result = cursor.fetchone()
|
result = cursor.fetchone()
|
||||||
return int(result[0]) if result else 30
|
return int(result[0]) if result else 30
|
||||||
|
|
||||||
|
def get_all_chunk_size(cursor):
|
||||||
|
cursor.execute("SELECT value FROM ln_params WHERE section = 'YandexAPI' AND name = 'all_chunk_size'")
|
||||||
|
result = cursor.fetchone()
|
||||||
|
return int(result[0]) if result else 100
|
||||||
|
|
||||||
|
def get_process_count(cursor):
|
||||||
|
cursor.execute("SELECT value FROM ln_params WHERE section = 'YandexAPI' AND name = 'process_count'")
|
||||||
|
result = cursor.fetchone()
|
||||||
|
return int(result[0]) if result else 5
|
||||||
|
|
||||||
# Функция для обработки записей в процессе
|
# Функция для обработки записей в процессе
|
||||||
def process_records(records, apikey, queue):
|
def process_records(records, apikey, queue):
|
||||||
try:
|
try:
|
||||||
@ -70,9 +80,9 @@ def process_records(records, apikey, queue):
|
|||||||
queue.put("error")
|
queue.put("error")
|
||||||
|
|
||||||
# Функция для завершения процессов
|
# Функция для завершения процессов
|
||||||
def terminate_processes(processes):
|
def terminate_processes(processes, p_timeout = 5):
|
||||||
for process in processes:
|
for process in processes:
|
||||||
process.join(timeout=15)
|
process.join(timeout=p_timeout)
|
||||||
if process.is_alive():
|
if process.is_alive():
|
||||||
process.terminate()
|
process.terminate()
|
||||||
process.join()
|
process.join()
|
||||||
@ -88,15 +98,18 @@ def process_data():
|
|||||||
|
|
||||||
while True:
|
while True:
|
||||||
refresh_time = get_refresh_time(cursor)
|
refresh_time = get_refresh_time(cursor)
|
||||||
|
all_chunk_size = get_all_chunk_size(cursor)
|
||||||
|
all_process_count = get_process_count(cursor)
|
||||||
cursor.execute("SELECT way_id, office_id, latitude_from, longitude_from, site_id, latitude_to, longitude_to FROM vw_ways_to_load ORDER BY status, last_date")
|
cursor.execute("SELECT way_id, office_id, latitude_from, longitude_from, site_id, latitude_to, longitude_to FROM vw_ways_to_load ORDER BY status, last_date")
|
||||||
rows = cursor.fetchall()
|
rows = cursor.fetchall()
|
||||||
|
rows = rows[:all_chunk_size]
|
||||||
|
|
||||||
if not rows:
|
if not rows:
|
||||||
logging.info(f'No Data Found, wait {refresh_time} seconds')
|
logging.info(f'No Data Found, wait {refresh_time} seconds')
|
||||||
time.sleep(refresh_time)
|
time.sleep(refresh_time)
|
||||||
break
|
break
|
||||||
|
|
||||||
chunk_size = 20
|
chunk_size = int(all_chunk_size/all_process_count)
|
||||||
processes = []
|
processes = []
|
||||||
queue = Queue()
|
queue = Queue()
|
||||||
|
|
||||||
@ -106,10 +119,14 @@ def process_data():
|
|||||||
processes.append(p)
|
processes.append(p)
|
||||||
p.start()
|
p.start()
|
||||||
|
|
||||||
if len(processes) == 5:
|
if len(processes) == all_process_count:
|
||||||
terminate_processes(processes)
|
terminate_processes(processes)
|
||||||
processes.clear()
|
processes.clear()
|
||||||
|
|
||||||
|
start_next_pool = 10
|
||||||
|
logging.info(f"End pool, wait {start_next_pool} seconds")
|
||||||
|
time.sleep(start_next_pool)
|
||||||
|
|
||||||
terminate_processes(processes)
|
terminate_processes(processes)
|
||||||
processes.clear()
|
processes.clear()
|
||||||
logging.info(f"Sleeping for {refresh_time} seconds")
|
logging.info(f"Sleeping for {refresh_time} seconds")
|
||||||
|
|||||||
Loading…
Reference in New Issue
Block a user