From 6ba73c2ed91002f690dff24ab26ae0b3c019b8a4 Mon Sep 17 00:00:00 2001
From: Michael Herzberg
Date: Wed, 4 Oct 2017 20:54:52 +0100
Subject: [PATCH] Changed autocommit behaviour.

---
 ExtensionCrawler/db.py                      | 104 +++++++++++----------
 ExtensionCrawler/dbbackend/mysql_backend.py |  88 +++++++++--------
 create-db                                   |  28 +++---
 3 files changed, 118 insertions(+), 102 deletions(-)

diff --git a/ExtensionCrawler/db.py b/ExtensionCrawler/db.py
index d7d55cb..f9291cd 100644
--- a/ExtensionCrawler/db.py
+++ b/ExtensionCrawler/db.py
@@ -413,62 +413,70 @@ def parse_and_insert_status(ext_id, date, datepath, con):
         overview_exception=overview_exception)
 
 
-def update_db_incremental(tmptardir, ext_id, date):
+def update_db_incremental(tmptardir, ext_id, date, con=None):
+    if con is not None:
+        update_db_incremental_with_connection(tmptardir, ext_id, date, con)
+    else:
+        with MysqlBackend(
+                ext_id,
+                read_default_file=const_mysql_config_file(),
+                charset='utf8mb4',
+                compress=True) as con:
+            update_db_incremental_with_connection(tmptardir, ext_id, date, con)
+
+
+def update_db_incremental_with_connection(tmptardir, ext_id, date, con):
     log_info("* Updating db with data from from {}".format(date), 2, ext_id)
     datepath = os.path.join(tmptardir, date)
 
-    # Don't forget to create a ~/.my.cnf file with the credentials
-    with MysqlBackend(
-            ext_id, read_default_file=const_mysql_config_file(),
-            charset='utf8mb4',
-            autocommit=True,
-            compress=True
-    ) as con:
-        etag = get_etag(ext_id, datepath, con)
-
-        if etag:
-            try:
-                parse_and_insert_crx(ext_id, date, datepath, con)
-            except Exception as e:
-                log_exception("Exception when parsing crx", 3, ext_id)
-        else:
-            crx_status = get_crx_status(datepath)
-            if crx_status != 401 and crx_status != 204 and crx_status != 404:
-                log_warning("* WARNING: could not find etag", 3, ext_id)
+    etag = get_etag(ext_id, datepath, con)
 
+    if etag:
         try:
-            parse_and_insert_overview(ext_id, date, datepath, con)
+            parse_and_insert_crx(ext_id, date, datepath, con)
         except Exception as e:
-            log_exception("Exception when parsing overview", 3, ext_id)
+            log_exception("Exception when parsing crx", 3, ext_id)
+    else:
+        crx_status = get_crx_status(datepath)
+        if crx_status != 401 and crx_status != 204 and crx_status != 404:
+            log_warning("* WARNING: could not find etag", 3, ext_id)
 
+    try:
+        parse_and_insert_overview(ext_id, date, datepath, con)
+    except Exception as e:
+        log_exception("Exception when parsing overview", 3, ext_id)
+
+    try:
+        parse_and_insert_status(ext_id, date, datepath, con)
+    except Exception as e:
+        log_exception("Exception when parsing status", 3, ext_id)
+
+    reviewpaths = glob.glob(os.path.join(datepath, "reviews*-*.text"))
+    for reviewpath in reviewpaths:
         try:
-            parse_and_insert_status(ext_id, date, datepath, con)
+            parse_and_insert_review(ext_id, date, reviewpath, con)
+        except json.decoder.JSONDecodeError as e:
+            log_warning("- WARNING: Review is not a proper json file!", 3,
+                        ext_id)
         except Exception as e:
-            log_exception("Exception when parsing status", 3, ext_id)
+            log_exception("Exception when parsing review", 3, ext_id)
 
-        reviewpaths = glob.glob(os.path.join(datepath, "reviews*-*.text"))
-        for reviewpath in reviewpaths:
-            try:
-                parse_and_insert_review(ext_id, date, reviewpath, con)
-            except json.decoder.JSONDecodeError as e:
-                log_warning("- WARNING: Review is not a proper json file!", 3, ext_id)
-            except Exception as e:
-                log_exception("Exception when parsing review", 3, ext_id)
+    supportpaths = glob.glob(os.path.join(datepath, "support*-*.text"))
+    for supportpath in supportpaths:
+        try:
+            parse_and_insert_support(ext_id, date, supportpath, con)
+        except json.decoder.JSONDecodeError as e:
+            log_warning("- WARNING: Support is not a proper json file!", 3,
+                        ext_id)
+        except Exception as e:
+            log_exception("Exception when parsing support", 3, ext_id)
 
-        supportpaths = glob.glob(os.path.join(datepath, "support*-*.text"))
-        for supportpath in supportpaths:
-            try:
-                parse_and_insert_support(ext_id, date, supportpath, con)
-            except json.decoder.JSONDecodeError as e:
-                log_warning("- WARNING: Support is not a proper json file!", 3, ext_id)
-            except Exception as e:
-                log_exception("Exception when parsing support", 3, ext_id)
-
-        repliespaths = glob.glob(os.path.join(datepath, "*replies.text"))
-        for repliespath in repliespaths:
-            try:
-                parse_and_insert_replies(ext_id, date, repliespath, con)
-            except json.decoder.JSONDecodeError as e:
-                log_warning("- WARNING: Reply is not a proper json file!", 3, ext_id)
-            except Exception as e:
-                log_exception("Exception when parsing reply", 3, ext_id)
+    repliespaths = glob.glob(os.path.join(datepath, "*replies.text"))
+    for repliespath in repliespaths:
+        try:
+            parse_and_insert_replies(ext_id, date, repliespath, con)
+        except json.decoder.JSONDecodeError as e:
+            log_warning("- WARNING: Reply is not a proper json file!", 3,
+                        ext_id)
+        except Exception as e:
+            log_exception("Exception when parsing reply", 3, ext_id)
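After this change, update_db_incremental keeps its old one-shot behaviour when called without a connection, and reuses a caller-supplied MysqlBackend otherwise, so all dates of one extension can share a single connection and a single final commit. A minimal sketch of the two calling conventions; the values of tmptardir, ext_id, and dates are hypothetical stand-ins for caller-supplied data:

    from ExtensionCrawler.archive import update_db_incremental
    from ExtensionCrawler.dbbackend.mysql_backend import MysqlBackend
    from ExtensionCrawler.config import const_mysql_config_file

    tmptardir = "/tmp/tardir"             # hypothetical unpacked archive dir
    ext_id = 32 * "a"                     # hypothetical extension id
    dates = ["2017-10-01", "2017-10-02"]  # hypothetical crawl dates

    # One-shot: the function opens (and, on exit, flushes and closes) its
    # own MysqlBackend for this single date.
    update_db_incremental(tmptardir, ext_id, dates[0])

    # Batched: one connection for the whole extension; nothing is committed
    # until the with-block exits.
    with MysqlBackend(
            ext_id,
            read_default_file=const_mysql_config_file(),
            charset='utf8mb4',
            compress=True) as con:
        for date in dates:
            update_db_incremental(tmptardir, ext_id, date, con)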
diff --git a/ExtensionCrawler/dbbackend/mysql_backend.py b/ExtensionCrawler/dbbackend/mysql_backend.py
index f0fdf77..da6ba51 100644
--- a/ExtensionCrawler/dbbackend/mysql_backend.py
+++ b/ExtensionCrawler/dbbackend/mysql_backend.py
@@ -18,41 +18,64 @@ from ExtensionCrawler.config import *
 
 import MySQLdb
 import _mysql_exceptions
-import atexit
 import time
 from random import uniform
 from ExtensionCrawler.util import log_info, log_error, log_exception
-
-db = None
-
-
-def close_db():
-    if db is not None:
-        db.close()
-
-
-atexit.register(close_db)
+import datetime
+from itertools import starmap
 
 
 class MysqlBackend:
+    db = None
+    cursor = None
+    dbargs = None
+    ext_id = None
+
+    cache = []
+
+    def __init__(self, ext_id, **kwargs):
+        self.ext_id = ext_id
+        self.dbargs = kwargs
+
+    def __enter__(self):
+        self._create_conn()
+        return self
+
+    def __exit__(self, *args):
+        start = time.time()
+        self.retry(lambda: list(starmap(
+            lambda query, args: self.cursor.executemany(query, args), self.cache)))
+        self.db.commit()
+        log_info(
+            "* Database batch insert finished after {}".format(
+                datetime.timedelta(seconds=int(time.time() - start))),
+            3,
+            self.ext_id)
+        self._close_conn()
+
+    def _create_conn(self):
+        if self.db is None:
+            self.db = MySQLdb.connect(**self.dbargs)
+        if self.cursor is None:
+            self.cursor = self.db.cursor()
+
+    def _close_conn(self):
+        if self.cursor is not None:
+            self.cursor.close()
+            self.cursor = None
+        if self.db is not None:
+            self.db.close()
+            self.db = None
+
     def retry(self, f):
-        global db
         for t in range(const_mysql_maxtries()):
             try:
+                self._create_conn()
                 return f()
             except _mysql_exceptions.OperationalError as e:
                 last_exception = e
                 try:
-                    # Reopen database connection
-                    if self.cursor is not None:
-                        self.cursor.close()
-                        self.cursor = None
-                    if db is not None:
-                        db.close()
-                        db = None
-                    db = MySQLdb.connect(**self.dbargs)
-                    self.cursor = db.cursor()
+                    self._close_conn()
                 except Exception as e2:
                     log_error("Surpressed exception: {}".format(str(e2)), 3,
                               self.ext_id)
@@ -76,27 +99,6 @@ class MysqlBackend:
                 time.sleep(const_mysql_try_wait() * uniform(
                     1 - factor, 1 + factor))
 
-    def __init__(self, ext_id, **kwargs):
-        self.ext_id = ext_id
-        self.dbargs = kwargs
-
-    def __enter__(self):
-        global db
-        if db is None:
-            db = MySQLdb.connect(**self.dbargs)
-        self.cursor = db.cursor()
-
-        return self
-
-    def __exit__(self, *args):
-        try:
-            if self.cursor is not None:
-                self.cursor.close()
-                self.cursor = None
-        except Exception as e:
-            log_error("Surpressed exception: {}".format(str(e)), 3,
-                      self.ext_id)
-
     def get_single_value(self, query, args):
         self.retry(lambda: self.cursor.execute(query, args))
 
@@ -119,7 +121,7 @@ class MysqlBackend:
             ",".join(len(args[0]) * ["%s"]), ",".join(
                 ["{c}=VALUES({c})".format(c=c) for c in arglist[0].keys()]))
 
-        self.retry(lambda: self.cursor.executemany(query, args))
+        self.cache += [(query, args)]
 
     def insert(self, table, **kwargs):
         self.insertmany(table, [kwargs])
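The core of the change is in insertmany: instead of executing eagerly on an autocommitting connection, MysqlBackend now appends (query, args) pairs to a cache, and __exit__ replays the whole cache through executemany (wrapped in retry, which reconnects and sleeps with jitter on OperationalError) before issuing a single commit. A minimal sketch of this commit-on-exit batching; sqlite3 stands in here only so the example runs without a MySQL server, and BatchingBackend is a hypothetical simplification, not the project class:

    import sqlite3

    class BatchingBackend:
        def __init__(self, conn):
            self.conn = conn
            self.cache = []                   # deferred (query, args) pairs

        def __enter__(self):
            return self

        def insertmany(self, query, args):
            self.cache.append((query, args))  # defer instead of executing

        def __exit__(self, *exc):
            cursor = self.conn.cursor()
            for query, args in self.cache:    # replay the whole batch ...
                cursor.executemany(query, args)
            self.conn.commit()                # ... under one final commit

    conn = sqlite3.connect(":memory:")
    conn.execute("CREATE TABLE extension (extid TEXT, date TEXT)")
    with BatchingBackend(conn) as backend:
        backend.insertmany("INSERT INTO extension VALUES (?, ?)",
                           [("aaaa", "2017-10-01"), ("aaaa", "2017-10-02")])
    print(conn.execute("SELECT count(*) FROM extension").fetchone())  # (2,)

Because __exit__ also runs when the block is left through an exception, rows cached for a partially processed extension are still flushed and committed on the way out.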
diff --git a/create-db b/create-db
index 824f611..31c0107 100755
--- a/create-db
+++ b/create-db
@@ -32,6 +32,7 @@ from ExtensionCrawler.archive import update_db_incremental
 from ExtensionCrawler.config import *
 from ExtensionCrawler.util import log_info, log_warning, log_error, log_exception
+from ExtensionCrawler.dbbackend.mysql_backend import MysqlBackend
 
 
 def help():
     print("""create-db [OPTION]""")
@@ -58,17 +59,22 @@ def process_id(from_date, until_date, path):
         log_info("Start processing extension", 0, extid)
 
         iddir = os.path.join(tmpdir, extid)
-        for date in sorted(os.listdir(iddir)):
-            if (from_date is not None and date < from_date) or \
-                    (until_date is not None and date > until_date):
-                log_info("* Skipping {}".format(date), 2, extid)
-                continue
-            try:
-                update_db_incremental(iddir, extid, date)
-            except Exception:
-                log_exception(
-                    "Exception when handling data from {}".format(date), 0,
-                    extid)
+        with MysqlBackend(
+                extid, read_default_file=const_mysql_config_file(),
+                charset='utf8mb4',
+                compress=True
+        ) as con:
+            for date in sorted(os.listdir(iddir)):
+                if (from_date is not None and date < from_date) or \
+                        (until_date is not None and date > until_date):
+                    log_info("* Skipping {}".format(date), 2, extid)
+                    continue
+                try:
+                    update_db_incremental(iddir, extid, date, con)
+                except Exception:
+                    log_exception(
+                        "Exception when handling data from {}".format(date), 0,
+                        extid)
         log_info(
             "Finished extension in {}".format(
                 str(datetime.timedelta(seconds=int(time.time() - start)))),