ExtensionCrawler/create-db

#!/usr/bin/env python3.7
#
# Copyright (C) 2016,2017 The University of Sheffield, UK
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
import getopt
import sys
import tarfile
import time
import tempfile
from functools import partial
import fnmatch
import multiprocessing
from pebble import ProcessPool
import os
import datetime
from ExtensionCrawler.archive import update_db_incremental
from ExtensionCrawler.config import archive_file, const_basedir, const_mysql_config_file
from ExtensionCrawler.util import log_info, log_exception, setup_logger, set_logger_tag
from ExtensionCrawler.dbbackend.mysql_backend import MysqlBackend
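
# create-db walks a crawler archive, unpacks each extension tarball into a
# temporary directory, and incrementally imports the date-stamped snapshots
# into a MySQL database (see update_db_incremental).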


def print_help():
    print("""create-db [OPTION]""")
    print("""  -h                  print this help text""")
    print("""  -a <DIR>            archive directory""")
    print("""  -p <PREFIX>         three-letter prefix""")
    print("""  -e <EXTIDFILELIST>  file with extension ids""")
    print("""  --from-date <DATE>  only process information gathered after"""
          """ this date (compared lexicographically)""")
    print("""  --until-date <DATE> only process information gathered before"""
          """ this date (compared lexicographically)""")
    print("""  -t <THREADS>        number of parallel threads""")
    print("""  -n <TASKID>         process chunk n where n in [1,N]""")
    print("""  -N <MAXTASKID>      total number of chunks N""")
    print("""  --delayed           use INSERT DELAYED INTO statements""")


def init_process(verbose):
    # When not using fork as the start method, logging must be set up again
    # in each worker process.
    setup_logger(verbose)


def process_id(from_date, until_date, delayed, path):
    start = time.time()
    with tempfile.TemporaryDirectory() as tmpdir:
        # Unpack the extension tarball; it contains a single top-level
        # directory named after the extension id.
        with tarfile.open(path) as t:
            t.extractall(tmpdir)
        extid = os.listdir(tmpdir)[0]
        set_logger_tag(extid)
        log_info("Start processing extension", 0)
        iddir = os.path.join(tmpdir, extid)
        try:
            with MysqlBackend(
                    extid,
                    delayed=delayed,
                    cache_etags=True,
                    read_default_file=const_mysql_config_file(),
                    charset='utf8mb4') as con:
                # Import the date-stamped snapshots in lexicographic order.
                for date in sorted(os.listdir(iddir)):
                    if (from_date is not None and date < from_date) or \
                            (until_date is not None and date > until_date):
                        log_info("* Skipping {}".format(date), 2)
                        continue
                    try:
                        update_db_incremental(iddir, extid, date, con)
                    except Exception:
                        log_exception(
                            "Exception when handling data from {}".format(
                                date), 0)
        except Exception:
            log_exception("Exception when handling extension", 0)
        log_info("Finished extension in {}".format(
            str(datetime.timedelta(seconds=int(time.time() - start)))), 0)


def find(archive, pattern):
    # Yield all archived extension tarballs matching the pattern: both plain
    # tars ("<id>.tar") and numbered compressed tars ("<id>.NNN.tar.xz").
    for root, _, files in os.walk(os.path.join(archive, "data")):
        for file in files:
            if fnmatch.fnmatch(file, pattern + ".tar") or \
                    fnmatch.fnmatch(file, pattern + ".[0-9][0-9][0-9].tar.xz"):
                yield os.path.join(root, file)


def find_from_file(archive, extidlistfile):
    with open(extidlistfile, 'r') as f:
        for line in f.readlines():
            yield archive_file(os.path.join(archive, "data"), line.strip())


def parse_args(argv):
    archive = const_basedir()
    parallel = 8
    taskid = 1
    maxtaskid = 1
    from_date = None
    until_date = None
    delayed = False
    paths = []
    try:
        opts, args = getopt.getopt(argv, "ha:p:e:t:n:N:", [
            "archive=", "prefix=", "extidlistfile=", "threads=", "taskid=",
            "maxtaskid=", "from-date=", "until-date=", "delayed", "help"
        ])
    except getopt.GetoptError:
        print_help()
        sys.exit(2)
    for opt, arg in opts:
        if opt in ("-h", "--help"):
            print_help()
            sys.exit()
        elif opt in ("-a", "--archive"):
            archive = arg
        elif opt in ("-p", "--prefix"):
            paths += find(archive, arg + "*")
        elif opt in ("-e", "--extidlistfile"):
            paths += find_from_file(archive, arg)
        elif opt in ("-t", "--threads"):
            parallel = int(arg)
        elif opt in ("-n", "--taskid"):
            taskid = int(arg)
        elif opt in ("-N", "--maxtaskid"):
            maxtaskid = int(arg)
        elif opt == "--from-date":
            from_date = arg
        elif opt == "--until-date":
            until_date = arg
        elif opt == "--delayed":
            delayed = True
    if not paths:
        paths = list(find(archive, "*"))
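    # Keep only this task's chunk of the work. Example: with 10 paths and
    # -N 3, chunksize is 3; task 1 gets paths[0:3], task 2 gets paths[3:6],
    # and the last task gets the remainder, paths[6:] (4 paths).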
    chunksize = int(len(paths) / maxtaskid)
    if taskid == maxtaskid:
        paths = paths[(taskid - 1) * chunksize:]
    else:
        paths = paths[(taskid - 1) * chunksize:taskid * chunksize]
    return paths, parallel, from_date, until_date, delayed


def main(argv):
    # Use the forkserver start method so workers do not inherit the parent's
    # full state; init_process re-initializes logging in each worker.
    multiprocessing.set_start_method("forkserver")
    verbose = True
    setup_logger(verbose)
    paths, parallel, from_date, until_date, delayed = parse_args(argv)
    with ProcessPool(
            max_workers=parallel,
            max_tasks=100,
            initializer=init_process,
            initargs=(verbose, )) as p:
        p.map(partial(process_id, from_date, until_date, delayed), paths)
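

# The -n/-N options are convenient for batch-scheduler array jobs; a
# hypothetical SLURM setup splitting the import into 100 chunks, of which
# each array task processes one, might look like:
#
#   ./create-db -t 8 -n "$SLURM_ARRAY_TASK_ID" -N 100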
if __name__ == "__main__":
main(sys.argv[1:])