Changed autocommit behaviour.

Michael Herzberg 2017-10-04 20:54:52 +01:00
parent f37e19f46a
commit 6ba73c2ed9
3 changed files with 118 additions and 102 deletions
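
The change in brief: update_db_incremental no longer opens its own autocommit=True MySQL connection for every date snapshot. It now accepts an optional con, create-db passes in a single MysqlBackend connection per extension, and MysqlBackend itself caches all inserts and executes and commits them as one batch when its context exits.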

ExtensionCrawler/archive.py

@@ -413,62 +413,70 @@ def parse_and_insert_status(ext_id, date, datepath, con):
         overview_exception=overview_exception)


-def update_db_incremental(tmptardir, ext_id, date):
+def update_db_incremental(tmptardir, ext_id, date, con=None):
+    if con is not None:
+        update_db_incremental_with_connection(tmptardir, ext_id, date, con)
+    else:
+        with MysqlBackend(
+                ext_id,
+                read_default_file=const_mysql_config_file(),
+                charset='utf8mb4',
+                compress=True) as con:
+            update_db_incremental_with_connection(tmptardir, ext_id, date, con)
+
+
+def update_db_incremental_with_connection(tmptardir, ext_id, date, con):
     log_info("* Updating db with data from from {}".format(date), 2, ext_id)
     datepath = os.path.join(tmptardir, date)

-    # Don't forget to create a ~/.my.cnf file with the credentials
-    with MysqlBackend(
-            ext_id, read_default_file=const_mysql_config_file(),
-            charset='utf8mb4',
-            autocommit=True,
-            compress=True
-    ) as con:
-        etag = get_etag(ext_id, datepath, con)
-        if etag:
-            try:
-                parse_and_insert_crx(ext_id, date, datepath, con)
-            except Exception as e:
-                log_exception("Exception when parsing crx", 3, ext_id)
-        else:
-            crx_status = get_crx_status(datepath)
-            if crx_status != 401 and crx_status != 204 and crx_status != 404:
-                log_warning("* WARNING: could not find etag", 3, ext_id)
-
-        try:
-            parse_and_insert_overview(ext_id, date, datepath, con)
-        except Exception as e:
-            log_exception("Exception when parsing overview", 3, ext_id)
-
-        try:
-            parse_and_insert_status(ext_id, date, datepath, con)
-        except Exception as e:
-            log_exception("Exception when parsing status", 3, ext_id)
-
-        reviewpaths = glob.glob(os.path.join(datepath, "reviews*-*.text"))
-        for reviewpath in reviewpaths:
-            try:
-                parse_and_insert_review(ext_id, date, reviewpath, con)
-            except json.decoder.JSONDecodeError as e:
-                log_warning("- WARNING: Review is not a proper json file!", 3,
-                            ext_id)
-            except Exception as e:
-                log_exception("Exception when parsing review", 3, ext_id)
-
-        supportpaths = glob.glob(os.path.join(datepath, "support*-*.text"))
-        for supportpath in supportpaths:
-            try:
-                parse_and_insert_support(ext_id, date, supportpath, con)
-            except json.decoder.JSONDecodeError as e:
-                log_warning("- WARNING: Support is not a proper json file!", 3,
-                            ext_id)
-            except Exception as e:
-                log_exception("Exception when parsing support", 3, ext_id)
-
-        repliespaths = glob.glob(os.path.join(datepath, "*replies.text"))
-        for repliespath in repliespaths:
-            try:
-                parse_and_insert_replies(ext_id, date, repliespath, con)
-            except json.decoder.JSONDecodeError as e:
-                log_warning("- WARNING: Reply is not a proper json file!", 3,
-                            ext_id)
-            except Exception as e:
-                log_exception("Exception when parsing reply", 3, ext_id)
+    etag = get_etag(ext_id, datepath, con)
+    if etag:
+        try:
+            parse_and_insert_crx(ext_id, date, datepath, con)
+        except Exception as e:
+            log_exception("Exception when parsing crx", 3, ext_id)
+    else:
+        crx_status = get_crx_status(datepath)
+        if crx_status != 401 and crx_status != 204 and crx_status != 404:
+            log_warning("* WARNING: could not find etag", 3, ext_id)
+
+    try:
+        parse_and_insert_overview(ext_id, date, datepath, con)
+    except Exception as e:
+        log_exception("Exception when parsing overview", 3, ext_id)
+
+    try:
+        parse_and_insert_status(ext_id, date, datepath, con)
+    except Exception as e:
+        log_exception("Exception when parsing status", 3, ext_id)
+
+    reviewpaths = glob.glob(os.path.join(datepath, "reviews*-*.text"))
+    for reviewpath in reviewpaths:
+        try:
+            parse_and_insert_review(ext_id, date, reviewpath, con)
+        except json.decoder.JSONDecodeError as e:
+            log_warning("- WARNING: Review is not a proper json file!", 3, ext_id)
+        except Exception as e:
+            log_exception("Exception when parsing review", 3, ext_id)
+
+    supportpaths = glob.glob(os.path.join(datepath, "support*-*.text"))
+    for supportpath in supportpaths:
+        try:
+            parse_and_insert_support(ext_id, date, supportpath, con)
+        except json.decoder.JSONDecodeError as e:
+            log_warning("- WARNING: Support is not a proper json file!", 3, ext_id)
+        except Exception as e:
+            log_exception("Exception when parsing support", 3, ext_id)
+
+    repliespaths = glob.glob(os.path.join(datepath, "*replies.text"))
+    for repliespath in repliespaths:
+        try:
+            parse_and_insert_replies(ext_id, date, repliespath, con)
+        except json.decoder.JSONDecodeError as e:
+            log_warning("- WARNING: Reply is not a proper json file!", 3, ext_id)
+        except Exception as e:
+            log_exception("Exception when parsing reply", 3, ext_id)

ExtensionCrawler/dbbackend/mysql_backend.py

@@ -18,41 +18,64 @@
 from ExtensionCrawler.config import *
 import MySQLdb
 import _mysql_exceptions
-import atexit
 import time
 from random import uniform
 from ExtensionCrawler.util import log_info, log_error, log_exception
-
-db = None
-
-
-def close_db():
-    if db is not None:
-        db.close()
-
-
-atexit.register(close_db)
+import datetime
+from itertools import starmap


 class MysqlBackend:
+    db = None
     cursor = None
+    dbargs = None
+    ext_id = None
+    cache = []
+
+    def __init__(self, ext_id, **kwargs):
+        self.ext_id = ext_id
+        self.dbargs = kwargs
+
+    def __enter__(self):
+        self._create_conn()
+        return self
+
+    def __exit__(self, *args):
+        start = time.time()
+        self.retry(lambda: list(starmap(lambda query, args: self.cursor.executemany(query, args), self.cache)))
+        self.db.commit()
+        log_info(
+            "* Database batch insert finished after {}".format(
+                datetime.timedelta(seconds=int(time.time() - start))),
+            3,
+            self.ext_id)
+        self._close_conn()
+
+    def _create_conn(self):
+        if self.db is None:
+            self.db = MySQLdb.connect(**self.dbargs)
+        if self.cursor is None:
+            self.cursor = self.db.cursor()
+
+    def _close_conn(self):
+        if self.cursor is not None:
+            self.cursor.close()
+            self.cursor = None
+        if self.db is not None:
+            self.db.close()
+            self.db = None

     def retry(self, f):
-        global db
         for t in range(const_mysql_maxtries()):
             try:
+                self._create_conn()
                 return f()
             except _mysql_exceptions.OperationalError as e:
                 last_exception = e
                 try:
                     # Reopen database connection
-                    if self.cursor is not None:
-                        self.cursor.close()
-                        self.cursor = None
-                    if db is not None:
-                        db.close()
-                        db = None
-                    db = MySQLdb.connect(**self.dbargs)
-                    self.cursor = db.cursor()
+                    self._close_conn()
                 except Exception as e2:
                     log_error("Surpressed exception: {}".format(str(e2)), 3,
                               self.ext_id)
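
The retry loop above (its tail appears as context in the next hunk) tears down the broken connection, sleeps a randomly jittered interval, and tries again. A self-contained sketch of that loop; MAXTRIES, TRY_WAIT and TransientError stand in for const_mysql_maxtries(), const_mysql_try_wait() and _mysql_exceptions.OperationalError, and the 0.2 jitter factor is a placeholder for the value set in the elided part of retry:

import time
from random import uniform

MAXTRIES = 3    # stands in for const_mysql_maxtries()
TRY_WAIT = 0.1  # stands in for const_mysql_try_wait(), in seconds


class TransientError(Exception):
    """Stand-in for _mysql_exceptions.OperationalError."""


def retry(f, create_conn, close_conn):
    for t in range(MAXTRIES):
        try:
            create_conn()  # idempotent: only (re)opens if needed
            return f()
        except TransientError:
            close_conn()   # drop the broken connection before retrying
            if t + 1 == MAXTRIES:
                raise
            factor = 0.2   # jitter width; placeholder value
            time.sleep(TRY_WAIT * uniform(1 - factor, 1 + factor))


attempts = {"n": 0}


def flaky():
    attempts["n"] += 1
    if attempts["n"] < 3:
        raise TransientError("server has gone away")
    return "ok"


print(retry(flaky, lambda: None, lambda: None))  # "ok" on the third attempt

The randomized sleep desynchronizes competing crawler workers so they do not all hammer the database at the same instant after an outage.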
@@ -76,27 +99,6 @@ class MysqlBackend:
                     time.sleep(const_mysql_try_wait() * uniform(
                         1 - factor, 1 + factor))

-    def __init__(self, ext_id, **kwargs):
-        self.ext_id = ext_id
-        self.dbargs = kwargs
-
-    def __enter__(self):
-        global db
-        if db is None:
-            db = MySQLdb.connect(**self.dbargs)
-        self.cursor = db.cursor()
-        return self
-
-    def __exit__(self, *args):
-        try:
-            if self.cursor is not None:
-                self.cursor.close()
-                self.cursor = None
-        except Exception as e:
-            log_error("Surpressed exception: {}".format(str(e)), 3,
-                      self.ext_id)
-
     def get_single_value(self, query, args):
         self.retry(lambda: self.cursor.execute(query, args))

@@ -119,7 +121,7 @@ class MysqlBackend:
             ",".join(len(args[0]) * ["%s"]),
             ",".join(
                 ["{c}=VALUES({c})".format(c=c) for c in arglist[0].keys()]))
-        self.retry(lambda: self.cursor.executemany(query, args))
+        self.cache += [(query, args)]

     def insert(self, table, **kwargs):
         self.insertmany(table, [kwargs])
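
The key change for the autocommit behaviour is in the last hunk: insertmany no longer executes statements immediately but appends (query, args) pairs to self.cache, which __exit__ replays via executemany and commits once. (Worth noting: cache = [] is a class attribute, so the list is shared between MysqlBackend instances; the sketch below keeps the cache per-instance.) A runnable sketch of the batch-on-exit idea, using sqlite3 instead of MySQLdb so it needs no server:

# Sketch of batch-on-exit: defer inserts, flush and commit once.
import sqlite3


class BatchingBackend:
    def __init__(self, conn):
        self.conn = conn
        self.cache = []  # per-instance, unlike the class-level cache above

    def __enter__(self):
        return self

    def __exit__(self, *args):
        cur = self.conn.cursor()
        for query, argslist in self.cache:
            cur.executemany(query, argslist)
        self.conn.commit()  # one commit for the whole batch
        cur.close()

    def insertmany(self, query, argslist):
        self.cache.append((query, argslist))  # defer instead of executing


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE t (x INTEGER)")
with BatchingBackend(conn) as be:
    be.insertmany("INSERT INTO t VALUES (?)", [(1,), (2,), (3,)])
print(conn.execute("SELECT COUNT(*) FROM t").fetchone()[0])  # prints 3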

create-db

@@ -32,6 +32,7 @@ from ExtensionCrawler.archive import update_db_incremental
 from ExtensionCrawler.config import *
 from ExtensionCrawler.util import log_info, log_warning, log_error, log_exception
+from ExtensionCrawler.dbbackend.mysql_backend import MysqlBackend


 def help():
     print("""create-db [OPTION]""")

@@ -58,17 +59,22 @@ def process_id(from_date, until_date, path):
     log_info("Start processing extension", 0, extid)
     iddir = os.path.join(tmpdir, extid)
-    for date in sorted(os.listdir(iddir)):
-        if (from_date is not None and date < from_date) or \
-                (until_date is not None and date > until_date):
-            log_info("* Skipping {}".format(date), 2, extid)
-            continue
-        try:
-            update_db_incremental(iddir, extid, date)
-        except Exception:
-            log_exception(
-                "Exception when handling data from {}".format(date), 0,
-                extid)
+    with MysqlBackend(
+            extid, read_default_file=const_mysql_config_file(),
+            charset='utf8mb4',
+            compress=True
+    ) as con:
+        for date in sorted(os.listdir(iddir)):
+            if (from_date is not None and date < from_date) or \
+                    (until_date is not None and date > until_date):
+                log_info("* Skipping {}".format(date), 2, extid)
+                continue
+            try:
+                update_db_incremental(iddir, extid, date, con)
+            except Exception:
+                log_exception(
+                    "Exception when handling data from {}".format(date), 0,
+                    extid)

     log_info(
         "Finished extension in {}".format(
             str(datetime.timedelta(seconds=int(time.time() - start)))),
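
Taken together, process_id now opens one MysqlBackend per extension and reuses it for every date snapshot, so an extension's whole history is committed as a single batch instead of one autocommitted statement at a time. A toy end-to-end sketch of that commit behaviour; CountingBackend and the extension ids and dates are made up, and the toy update_db_incremental only mimics the real one's shape:

# One connection and one commit per extension, not per date.
class CountingBackend:
    commits = 0

    def __enter__(self):
        self.cache = []
        return self

    def __exit__(self, *args):
        CountingBackend.commits += 1  # flush + commit once per context
        self.cache.clear()

    def insert(self, row):
        self.cache.append(row)


def update_db_incremental(extid, date, con):
    con.insert((extid, date))


for extid in ["ext-a", "ext-b"]:
    with CountingBackend() as con:
        for date in ["2017-10-01", "2017-10-02", "2017-10-03"]:
            update_db_incremental(extid, date, con)

print(CountingBackend.commits)  # 2 commits for 6 inserts: one per extension

The trade-off of this design is that a crash loses at most one extension's uncommitted batch, while the database sees far fewer round-trips and commits than under per-statement autocommit.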