Removed plain multiprocessing option.

Michael Herzberg 2018-04-21 17:25:22 +01:00
parent 0613ac1ac1
commit ac3c1c7f20
3 changed files with 9 additions and 30 deletions


@@ -581,7 +581,7 @@ def init_process(verbose, start_pystuck=False):
         pystuck.run_server(port=((os.getpid() % 10000) + 10001))
 
 
-def execute_parallel_ProcessPool(archivedir, max_retry, timeout, max_workers, ext_ids, forums, verbose, start_pystuck):
+def execute_parallel(archivedir, max_retry, timeout, max_workers, ext_ids, forums, verbose, start_pystuck):
     results=[]
     with ProcessPool(max_workers=max_workers, max_tasks=100, initializer=init_process, initargs=(verbose, start_pystuck)) as pool:
         future = pool.map(partial(update_extension, archivedir, forums),
@@ -608,26 +608,10 @@ def execute_parallel_ProcessPool(archivedir, max_retry, timeout, max_workers, ext_ids, forums, verbose, start_pystuck):
     return results
 
 
-def execute_parallel_Pool(archivedir, max_retry, timeout, max_workers, ext_ids, forums, verbose, start_pystuck):
-    log_info("Using multiprocessing.Pool: timeout and max_try are *not* supported")
-    with Pool(processes=max_workers, maxtasksperchild=100, initializer=init_process, initargs=(verbose, start_pystuck)) as pool:
-        # The default chunksize is None, which means that each process will only
-        # ever get one task with chunksize len(ext_ids)/max_workers. This would
-        # render maxtasksperchild useless.
-        results = pool.map(partial(update_extension, archivedir, forums),
-                           ext_ids,
-                           chunksize=1)
-    return list(results)
-
-
-def update_extensions(archivedir, parallel, forums_ext_ids, ext_ids, timeout, use_process_pool, verbose, start_pystuck):
+def update_extensions(archivedir, parallel, forums_ext_ids, ext_ids, timeout, verbose, start_pystuck):
     ext_with_forums = []
     ext_without_forums = []
     forums_ext_ids = (list(set(forums_ext_ids)))
-    if use_process_pool:
-        execute_parallel=execute_parallel_ProcessPool
-    else:
-        execute_parallel=execute_parallel_Pool
     log_info("Updating {} extensions ({} including forums)".format(
         len(ext_ids), len(forums_ext_ids)))
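
Note on the surviving code path: execute_parallel keeps the future-based ProcessPool (apparently pebble's, judging by the max_tasks argument and the future returned by pool.map), which, unlike multiprocessing.Pool, can enforce a per-task timeout; the log message in the removed execute_parallel_Pool states exactly that limitation. A minimal sketch of the pattern, assuming pebble is installed and using a hypothetical process_one in place of update_extension(archivedir, forums, ext_id):

from concurrent.futures import TimeoutError

from pebble import ProcessPool


def process_one(ext_id):
    # hypothetical stand-in for update_extension(archivedir, forums, ext_id)
    return ext_id


def run_all(ext_ids, max_workers=4, timeout=30):
    results = []
    with ProcessPool(max_workers=max_workers, max_tasks=100) as pool:
        # timeout is enforced per task; a worker that exceeds it is killed
        future = pool.map(process_one, ext_ids, chunksize=1, timeout=timeout)
        iterator = future.result()
        while True:
            try:
                results.append(next(iterator))
            except StopIteration:
                break
            except TimeoutError:
                # the timed-out entry is lost; record a placeholder and go on
                results.append(None)
    return results

chunksize=1 keeps the timeout scoped to a single extension and lets max_tasks-based worker recycling actually trigger, mirroring the reasoning in the chunksize comment of the removed function.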

crawler

@@ -157,7 +157,6 @@ def helpmsg():
     print(" -s silent (no log messages)")
     print(" -d discover new extensions")
     print(" -p <N> number of concurrent downloads")
-    print(" -P use ProcessPool (default: Pool) for concurrency")
     print(
         " -F do not download extensions with forums (skip sequential download)"
     )
@@ -173,7 +172,7 @@ def helpmsg():
 
 def print_config(basedir, archive_dir, conf_dir, discover, parallel,
                  download_ext_ids_without_forums, download_ext_ids_with_forums,
-                 ext_timeout, use_process_pool, start_pystuck):
+                 ext_timeout, start_pystuck):
     """Print current configuration."""
     log_info("Configuration:")
     log_info(" Base dir: {}".format(basedir))
@@ -185,7 +184,6 @@ def print_config(basedir, archive_dir, conf_dir, discover, parallel,
     log_info(" Download ext. with forums: {}".format(
         download_ext_ids_with_forums))
     log_info(" Max num. of concurrent downloads: {}".format(parallel))
-    log_info(" Use ProcessPool: {}".format(use_process_pool))
     log_info(" Download timeout: {}".format(ext_timeout))
     log_info(" Start PyStuck: {}".format(start_pystuck))
@@ -196,7 +194,6 @@ def parse_args(argv):
     parallel = const_parallel_downloads()
     verbose = const_verbose()
     discover = const_discover()
-    use_process_pool = const_use_process_pool()
     download_ext_ids_with_forums = const_download_ext_ids_with_forums()
     download_ext_ids_without_forums = const_download_ext_ids_without_forums()
     ext_timeout = const_ext_timeout()
@@ -221,8 +218,6 @@ def parse_args(argv):
             ext_timeout = int(arg)
         elif opt == '-s':
             verbose = False
-        elif opt == '-P':
-            use_process_pool = True
         elif opt == '-d':
             discover = True
         elif opt == '-F':
@@ -234,7 +229,7 @@ def parse_args(argv):
             max_discover = int(arg)
         elif opt == '--pystuck':
             start_pystuck = True
-    return basedir, parallel, verbose, discover, max_discover, download_ext_ids_with_forums, download_ext_ids_without_forums, ext_timeout, use_process_pool, start_pystuck
+    return basedir, parallel, verbose, discover, max_discover, download_ext_ids_with_forums, download_ext_ids_without_forums, ext_timeout, start_pystuck
 
 
 def main(argv):
@@ -247,7 +242,7 @@ def main(argv):
     multiprocessing.set_start_method("forkserver")
 
     today = datetime.datetime.now(datetime.timezone.utc).isoformat()
-    basedir, parallel, verbose, discover, max_discover, download_ext_ids_with_forums, download_ext_ids_without_forums, ext_timeout, use_process_pool, start_pystuck = parse_args(
+    basedir, parallel, verbose, discover, max_discover, download_ext_ids_with_forums, download_ext_ids_without_forums, ext_timeout, start_pystuck = parse_args(
         argv)
 
     setup_logger(verbose)
@@ -272,7 +267,7 @@ def main(argv):
 
     print_config(basedir, archive_dir, conf_dir, discover, parallel,
                  download_ext_ids_with_forums, download_ext_ids_without_forums,
-                 ext_timeout, use_process_pool, start_pystuck)
+                 ext_timeout, start_pystuck)
 
     forum_ext_ids = get_forum_ext_ids(conf_dir)
     known_ids = list(set(get_existing_ids(archive_dir)) | set(forum_ext_ids))
@@ -304,7 +299,7 @@ def main(argv):
         forum_ext_ids = []
 
     res = update_extensions(archive_dir, parallel, forum_ext_ids, ext_ids,
-                            ext_timeout, use_process_pool, verbose, start_pystuck)
+                            ext_timeout, verbose, start_pystuck)
 
     # We re-try (once) the extensions with unknown exceptions, as
     # they are often temporary
@@ -319,7 +314,7 @@ def main(argv):
         ext_ids_except = sorted(
             list(set(has_exception_ids) - set(forum_ext_ids_except)))
         res_update = update_extensions(archive_dir, parallel,
-                                       forum_ext_ids_except, ext_ids_except, ext_timeout, use_process_pool, verbose, start_pystuck)
+                                       forum_ext_ids_except, ext_ids_except, ext_timeout, verbose, start_pystuck)
         res = list(set(res) - set(has_exception)) + res_update
 
     end_time = time.time()


@@ -12,7 +12,7 @@ LOG=$LOGPREFIX-global.log
 
 date --utc +'* Start Updating Extensions Archive (%c)' | tee $LOG
 
 # Update extensions
-singularity exec --bind /srv/:/srv/ $IMAGE crawler -p 42 -d -P --pystuck -a $ARCHIVE > $LOGPREFIX.log
+singularity exec --bind /srv/:/srv/ $IMAGE crawler -p 42 -d --pystuck -a $ARCHIVE > $LOGPREFIX.log
 
 date --utc +'* Update Finished (%c)' | tee -a $LOG