Process extensions in chunks.

Achim D. Brucker 2018-04-06 21:34:09 +01:00
parent d5df43c5c3
commit 14a30a570d
1 changed file with 45 additions and 39 deletions


@@ -539,48 +539,54 @@ def update_extension(archivedir, forums, ext_id):
                         res_reviews, res_support, sql_exception, sql_success)
 
 
-def execute_parallel(archivedir, max_retry, timeout, max_workers, ext_ids):
+def chunks(l, n):
+    for i in range(0, len(l), n):
+        yield l[i:i+n]
+
+
+def execute_parallel(archivedir, max_retry, timeout, max_workers, ext_ids, chunksize=30000):
     results=[]
-    for n in range(max_retry):
-        if n > 0:
-            log_info("Attempt ({} out of {}): {} extensions excluding forums (parallel)".format(
-                n,max_retry,len(ext_timeouts)), 1)
-            ext_ids=ext_timeouts
-        with ProcessPool(max_workers=1, max_tasks=100) as pool:
-            future = pool.map(partial(update_extension, archivedir, False)
-                              ,ext_ids
-                              ,timeout=timeout)
-            iterator = future.result()
-            ext_timeouts=[]
-            for ext_id in ext_ids:
-                try:
-                    results.append(next(iterator))
-                except StopIteration:
-                    break
-                except TimeoutError as error:
-                    log_info("WorkerException: Processing of %s took longer than %d seconds" % (ext_id,error.args[1]))
-                    ext_timeouts.append(ext_id)
-                    results.append(UpdateResult(ext_id, False, error,
-                                                None, None, None,
-                                                None, None, False))
-                except ProcessExpired as error:
-                    log_info("WorkerException: %s (%s). Exit code: %d" % (error, ext_id, error.exitcode))
-                    ext_timeouts.append(ext_id)
-                    results.append(UpdateResult(ext_id, False, error,
-                                                None, None, None,
-                                                None, None, False))
-                except Exception as error:
-                    log_info("WorkerException: Processing %s raised %s" % (ext_id, error))
-                    log_info(error.traceback)  # Python's traceback of remote process
-                    ext_timeouts.append(ext_id)
-                    results.append(UpdateResult(ext_id, False, error,
-                                                None, None, None,
-                                                None, None, False))
+    for chunk in chunks(ext_ids, chunksize):
+        for n in range(max_retry):
+            if n > 0:
+                log_info("Attempt ({} out of {}): {} extensions excluding forums (parallel)".format(
+                    n,max_retry,len(ext_timeouts)), 1)
+                chunk=ext_timeouts
+                ext_timeouts=[]
+            with ProcessPool(max_workers=1, max_tasks=100) as pool:
+                future = pool.map(partial(update_extension, archivedir, False)
+                                  ,chunk
+                                  ,timeout=timeout)
+                iterator = future.result()
+                ext_timeouts=[]
+                for ext_id in chunk:
+                    try:
+                        results.append(next(iterator))
+                    except StopIteration:
+                        break
+                    except TimeoutError as error:
+                        log_info("WorkerException: Processing of %s took longer than %d seconds" % (ext_id,error.args[1]))
+                        ext_timeouts.append(ext_id)
+                        results.append(UpdateResult(ext_id, False, error,
+                                                    None, None, None,
+                                                    None, None, False))
+                    except ProcessExpired as error:
+                        log_info("WorkerException: %s (%s). Exit code: %d" % (error, ext_id, error.exitcode))
+                        ext_timeouts.append(ext_id)
+                        results.append(UpdateResult(ext_id, False, error,
+                                                    None, None, None,
+                                                    None, None, False))
+                    except Exception as error:
+                        log_info("WorkerException: Processing %s raised %s" % (ext_id, error))
+                        log_info(error.traceback)  # Python's traceback of remote process
+                        ext_timeouts.append(ext_id)
+                        results.append(UpdateResult(ext_id, False, error,
+                                                    None, None, None,
+                                                    None, None, False))
     return results
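
The pattern this change introduces: split the id list into fixed-size chunks, run each chunk through a pebble ProcessPool with a per-task timeout, and retry only the ids that timed out, up to max_retry passes per chunk. Below is a minimal, runnable sketch of that pattern; slow_task and the small parameter values are hypothetical stand-ins for update_extension and the crawler's real arguments, not part of this commit.

    from concurrent.futures import TimeoutError

    from pebble import ProcessPool, ProcessExpired

    def chunks(l, n):
        # yield successive n-sized slices of l
        for i in range(0, len(l), n):
            yield l[i:i + n]

    def slow_task(x):
        # hypothetical stand-in for update_extension
        return x * x

    def run_chunked(items, chunksize=4, max_retry=2, timeout=5):
        results = []
        for chunk in chunks(items, chunksize):
            timeouts = []
            for attempt in range(max_retry):
                if attempt > 0:
                    chunk = timeouts      # retry only the ids that failed
                    timeouts = []
                with ProcessPool(max_workers=1) as pool:
                    iterator = pool.map(slow_task, chunk, timeout=timeout).result()
                    for item in chunk:
                        try:
                            results.append(next(iterator))
                        except StopIteration:
                            break
                        except (TimeoutError, ProcessExpired):
                            timeouts.append(item)  # remember for the next pass
                if not timeouts:
                    break                 # chunk done, no retries needed
        return results

    if __name__ == '__main__':
        print(run_chunked(list(range(10))))

Keeping the retry loop inside the chunk loop, as the commit does, means a few slow ids only trigger re-runs within their own chunk rather than across the whole id list, and each ProcessPool lives only for one pass over one chunk, which bounds worker lifetime.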