Added -p option for create-db (parallelism).

This commit is contained in:
Michael Herzberg 2017-06-20 15:10:32 +01:00
parent 437a00d256
commit f40febdf09
1 changed files with 41 additions and 28 deletions

View File

@ -23,6 +23,7 @@ import glob
import tarfile
import tempfile
import traceback
from multiprocessing import Pool
from ExtensionCrawler.sqlite import *
from ExtensionCrawler.config import *
@ -33,13 +34,45 @@ def help():
print(" -h print this help text")
print(" -a=<DIR> archive directory")
print(" -p=<PREFIX> three-letter-prefix")
print(" -t=<THREADS> number of parallel threads")
def process_id(archivedir, verbose, ext_id):
txt = ""
txt = logmsg(verbose, txt, "Processing {} ...\n".format(ext_id))
tarpath = archive_file(archivedir, ext_id)
dbpath = db_file(archivedir, ext_id)
if os.path.exists(dbpath):
os.remove(dbpath)
with tempfile.TemporaryDirectory() as tmpdir:
with tarfile.open(tarpath) as t:
t.extractall(tmpdir)
iddir = os.path.join(tmpdir, ext_id)
for date in sorted(os.listdir(iddir)):
try:
update_txt = update_sqlite_incremental(
archivedir, iddir, ext_id, date, True, "")
txt = logmsg(verbose, txt, update_txt)
except Exception as e:
txt = logmsg(verbose, txt,
"Exception when handling {} on {}:\n".format(
ext_id, date))
txt = logmsg(verbose, txt, traceback.format_exc())
txt = logmsg(verbose, txt, "\n")
return txt
def main(argv):
basedir = "archive"
prefix = ""
parallel = 8
try:
opts, args = getopt.getopt(argv, "ha:p:", ["archive=", "prefix="])
opts, args = getopt.getopt(argv, "ha:p:t:",
["archive=", "prefix=", "threads="])
except getopt.GetoptError:
help()
sys.exit(2)
@ -51,37 +84,17 @@ def main(argv):
basedir = arg
elif opt in ("-p", "--prefix"):
prefix = arg
elif opt in ("-t", "--threads"):
parallel = int(arg)
archivedir = os.path.join(basedir, "data")
threeletterdirs = glob.glob(os.path.join(archivedir, prefix + "*"))
for threeletterdir in threeletterdirs:
for ext_id in set([d[:32] for d in os.listdir(threeletterdir)]):
sys.stdout.write("Processing {} ...\n".format(ext_id))
sys.stdout.flush()
tarpath = archive_file(archivedir, ext_id)
dbpath = db_file(archivedir, ext_id)
if os.path.exists(dbpath):
os.remove(dbpath)
with tempfile.TemporaryDirectory() as tmpdir:
with tarfile.open(tarpath) as t:
t.extractall(tmpdir)
iddir = os.path.join(tmpdir, ext_id)
for date in sorted(os.listdir(iddir)):
try:
sys.stdout.write(
update_sqlite_incremental(
archivedir, iddir, ext_id, date, True, ""))
sys.stdout.flush()
except Exception as e:
sys.stdout.write(
"Exception when handling {} on {}:\n".format(
ext_id, date))
sys.stdout.write(traceback.format_exc())
sys.stdout.flush()
sys.stdout.write("\n")
sys.stdout.flush()
ext_ids = list(set([d[:32] for d in os.listdir(threeletterdir)]))
with Pool(parallel) as p:
for txt in p.imap(partial(process_id, archivedir, True), ext_ids):
sys.stdout.write(txt)
sys.stdout.flush()
if __name__ == "__main__":