#!/usr/bin/env python3
#
# Copyright (C) 2016,2017 The University of Sheffield, UK
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
#

import getopt
import os
import sys
import tarfile
import tempfile
import traceback
import fnmatch
from multiprocessing import Pool, Lock
from functools import partial

from ExtensionCrawler.archive import update_sqlite_incremental


def help():
    print("create-db [OPTION] DBBASEDIR")
    print("    DBBASEDIR   directory for generated db files")
    print("    -h          print this help text")
    print("    -a          archive directory")
    print("    -p          three-letter-prefix")
    print("    -e          file with extension ids")
    print("    -t          number of parallel threads")
    print("    -n          process chunk n where n in [1,N]")
    print("    -N          total number of chunks N")


def guarded_stdout(string):
    """Write to stdout while holding the shared lock, so output from
    parallel workers does not interleave."""
    lock.acquire()
    sys.stdout.write(string)
    lock.release()


def guarded_stderr(string):
    """Write to stderr while holding the shared lock, so output from
    parallel workers does not interleave."""
    lock.acquire()
    sys.stderr.write(string)
    lock.release()


def process_id(dbbasedir, path):
    """Unpack the extension tarball at `path` and rebuild its per-extension
    SQLite database in `dbbasedir`, one crawl date at a time."""
    with tempfile.TemporaryDirectory() as tmpdir:
        with tarfile.open(path) as t:
            t.extractall(tmpdir)
        extid = os.listdir(tmpdir)[0]

        # Start from a fresh database for this extension id.
        dbpath = os.path.join(dbbasedir, extid + ".sqlite")
        if os.path.exists(dbpath):
            os.remove(dbpath)
        iddir = os.path.join(tmpdir, extid)

        for date in sorted(os.listdir(iddir)):
            try:
                update_sqlite_incremental(dbpath, iddir, extid, date, True, "")
            except Exception:
                guarded_stderr("Exception when handling {} on {}:\n{}\n".
                               format(extid, date, traceback.format_exc()))
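
# A minimal sketch of the archive layout that process_id assumes: each .tar
# unpacks to a single top-level directory named after the extension id, which
# in turn contains one subdirectory per crawl date.  The id and dates below
# are made up for illustration only:
#
#   aaaabbbbccccddddeeeeffffgggghhhh/
#       2017-01-01/
#       2017-01-02/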


def find(archive, pattern):
    """Yield the paths of all .tar files under <archive>/data whose file
    name matches `pattern`."""
    for root, _, files in os.walk(os.path.join(archive, "data")):
        for file in files:
            if fnmatch.fnmatch(file, pattern + ".tar"):
                yield os.path.join(root, file)


def find_from_file(archive, extidlistfile):
    """Yield the paths of all .tar files under <archive>/data whose
    extension id is listed in `extidlistfile` (one id per line)."""
    with open(extidlistfile, 'r') as f:
        extids = [l.strip() for l in f.readlines()]

    for root, _, files in os.walk(os.path.join(archive, "data")):
        for file in files:
            for extid in extids:
                if fnmatch.fnmatch(file, extid + ".tar"):
                    yield os.path.join(root, file)


def init(l):
    """Pool initializer: store the shared lock in a global so that worker
    processes can use guarded_stdout/guarded_stderr."""
    global lock
    lock = l


def parse_args(argv):
    """Parse the command line and return (dbbasedir, paths, parallel),
    where `paths` is the chunk of tarball paths assigned to this task."""
    archive = "archive"
    parallel = 8
    taskid = 1
    maxtaskid = 1

    paths = []

    try:
        opts, args = getopt.getopt(argv, "ha:p:e:t:n:N:", [
            "archive=", "prefix=", "extidlistfile=", "threads=", "taskid=",
            "maxtaskid="
        ])
    except getopt.GetoptError:
        help()
        sys.exit(2)
    for opt, arg in opts:
        if opt == '-h':
            help()
            sys.exit()
        elif opt in ("-a", "--archive"):
            archive = arg
        elif opt in ("-p", "--prefix"):
            paths += find(archive, arg + "*")
        elif opt in ("-e", "--extidlistfile"):
            paths += find_from_file(archive, arg)
        elif opt in ("-t", "--threads"):
            parallel = int(arg)
        elif opt in ("-n", "--taskid"):
            taskid = int(arg)
        elif opt in ("-N", "--maxtaskid"):
            maxtaskid = int(arg)

    if len(args) < 1:
        help()
        sys.exit(2)
    dbbasedir = args[0]
    paths += args[1:]

    if paths == []:
        paths = list(find(archive, "*"))

    # Split the work into N chunks and keep only the chunk for this task;
    # the last task also picks up any remainder.
    chunksize = int(len(paths) / maxtaskid)
    if taskid == maxtaskid:
        paths = paths[(taskid - 1) * chunksize:]
    else:
        paths = paths[(taskid - 1) * chunksize:taskid * chunksize]

    return dbbasedir, paths, parallel


def main(argv):
    dbbasedir, paths, parallel = parse_args(argv)

    with Pool(initializer=init, initargs=(Lock(),), processes=parallel) as p:
        p.map(partial(process_id, dbbasedir), paths)


if __name__ == "__main__":
    main(sys.argv[1:])
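
# Example invocation (the directories and prefix below are placeholders, not
# values taken from this repository): build per-extension SQLite files under
# /srv/db for all archived extensions whose id starts with "aaa", processing
# chunk 1 of 4 with 16 worker processes.  Note that -a must precede -p or -e
# for the archive directory to take effect:
#
#   ./create-db -a /srv/archive -p aaa -t 16 -n 1 -N 4 /srv/db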