#!/usr/bin/env python3.7
#
# Copyright (C) 2016,2017 The University of Sheffield, UK
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
#
"""Populate the ExtensionCrawler MySQL database from archived extension
tarballs, optionally restricted to a date window and split into chunks
for parallel batch processing."""

import getopt
import sys
import tarfile
import time
import tempfile
from functools import partial
import fnmatch
import multiprocessing
from pebble import ProcessPool
import os
import datetime

from ExtensionCrawler.archive import update_db_incremental
from ExtensionCrawler.config import archive_file, const_basedir, const_mysql_config_file
from ExtensionCrawler.util import log_info, log_exception, setup_logger, set_logger_tag
from ExtensionCrawler.dbbackend.mysql_backend import MysqlBackend


def print_help():
    print("""create-db [OPTION]""")
    print("""  -h                   print this help text""")
    print("""  -a <DIR>             archive directory""")
    print("""  -p <PREFIX>          three-letter extension-id prefix""")
    print("""  -e <FILE>            file with extension ids""")
    print("""  --from-date <DATE>   only process information gathered after"""
          """ this date (compared lexicographically)""")
    print("""  --until-date <DATE>  only process information gathered before"""
          """ this date (compared lexicographically)""")
    print("""  -t <N>               number of parallel threads""")
    print("""  -n <N>               process chunk n where n in [1,N]""")
    print("""  -N <N>               total number of chunks""")
    print("""  --delayed            uses INSERT DELAYED INTO statements""")


def init_process(verbose):
    # When not using fork, we need to set up logging again in the worker
    # processes.
    setup_logger(verbose)


def process_id(from_date, until_date, delayed, path):
    """Unpack one archived extension tarball and feed every crawl date inside
    the [from_date, until_date] window into the database."""
    start = time.time()
    with tempfile.TemporaryDirectory() as tmpdir:
        with tarfile.open(path) as t:
            t.extractall(tmpdir)

        # The tarball contains a single top-level directory named after the
        # extension id.
        extid = os.listdir(tmpdir)[0]
        set_logger_tag(extid)
        log_info("Start processing extension", 0)
        iddir = os.path.join(tmpdir, extid)

        try:
            with MysqlBackend(
                    extid,
                    delayed=delayed,
                    cache_etags=True,
                    read_default_file=const_mysql_config_file(),
                    charset='utf8mb4') as con:
                for date in sorted(os.listdir(iddir)):
                    if (from_date is not None and date < from_date) or \
                            (until_date is not None and date > until_date):
                        log_info("* Skipping {}".format(date), 2)
                        continue
                    try:
                        update_db_incremental(iddir, extid, date, con)
                    except Exception:
                        log_exception(
                            "Exception when handling data from {}".format(date), 0)
        except Exception:
            log_exception("Exception when handling extension", 0)
        log_info("Finished extension in {}".format(
            str(datetime.timedelta(seconds=int(time.time() - start)))), 0)


def find(archive, pattern):
    # Yield every archived extension tarball under <archive>/data whose file
    # name matches the given extension-id pattern.
    for root, _, files in os.walk(os.path.join(archive, "data")):
        for file in files:
            if (fnmatch.fnmatch(file, pattern + ".tar")
                    or fnmatch.fnmatch(file, pattern + ".[0-9][0-9][0-9].tar.xz")):
                yield os.path.join(root, file)


def find_from_file(archive, extidlistfile):
    # Yield the archive path of every extension id listed in the given file
    # (one id per line).
    with open(extidlistfile, 'r') as f:
        for line in f:
            yield archive_file(os.path.join(archive, "data"), line.strip())


def parse_args(argv):
    archive = const_basedir()
    parallel = 8
    taskid = 1
    maxtaskid = 1
    from_date = None
    until_date = None
    delayed = False
    paths = []

    try:
        opts, args = getopt.getopt(argv, "ha:p:e:t:n:N:", [
            "archive=", "prefix=", "extidlistfile=", "threads=", "taskid=",
            "maxtaskid=", "from-date=", "until-date=", "delayed", "help"
        ])
    except getopt.GetoptError:
        print_help()
        sys.exit(2)
    for opt, arg in opts:
        if opt in ("-h", "--help"):
            print_help()
            sys.exit()
        elif opt in ("-a", "--archive"):
            archive = arg
        elif opt in ("-p", "--prefix"):
            paths += find(archive, arg + "*")
        elif opt in ("-e", "--extidlistfile"):
            paths += find_from_file(archive, arg)
        elif opt in ("-t", "--threads"):
            parallel = int(arg)
        elif opt in ("-n", "--taskid"):
            taskid = int(arg)
        elif opt in ("-N", "--maxtaskid"):
            maxtaskid = int(arg)
        elif opt == "--from-date":
            from_date = arg
        elif opt == "--until-date":
            until_date = arg
        elif opt == "--delayed":
            delayed = True

    if not paths:
        paths = list(find(archive, "*"))

    # Split the work into maxtaskid chunks and keep only the taskid-th one;
    # the last chunk also receives the remainder of the division.
    chunksize = len(paths) // maxtaskid
    if taskid == maxtaskid:
        paths = paths[(taskid - 1) * chunksize:]
    else:
        paths = paths[(taskid - 1) * chunksize:taskid * chunksize]

    return paths, parallel, from_date, until_date, delayed


def main(argv):
    multiprocessing.set_start_method("forkserver")
    verbose = True
    setup_logger(verbose)

    paths, parallel, from_date, until_date, delayed = parse_args(argv)

    with ProcessPool(max_workers=parallel, max_tasks=100,
                     initializer=init_process, initargs=(verbose,)) as p:
        p.map(partial(process_id, from_date, until_date, delayed), paths)


if __name__ == "__main__":
    main(sys.argv[1:])