diff --git a/ExtensionCrawler/archive.py b/ExtensionCrawler/archive.py index 4f41c9c..b27ab08 100644 --- a/ExtensionCrawler/archive.py +++ b/ExtensionCrawler/archive.py @@ -15,7 +15,6 @@ # You should have received a copy of the GNU General Public License # along with this program. If not, see . # - """ Module for handling archives of the Browser Extension Crawler. """ @@ -35,19 +34,20 @@ import datetime import dateutil import dateutil.parser import requests +import logging -from ExtensionCrawler.config import (const_review_payload, const_review_search_url, - const_download_url, get_local_archive_dir, - const_overview_url, const_support_url, - const_support_payload, const_review_search_payload, - const_review_url) -from ExtensionCrawler.util import logmsg, google_dos_protection, log, value_of +from ExtensionCrawler.config import ( + const_review_payload, const_review_search_url, const_download_url, + get_local_archive_dir, const_overview_url, const_support_url, + const_support_payload, const_review_search_payload, const_review_url) +from ExtensionCrawler.util import google_dos_protection, value_of from ExtensionCrawler.sqlite import db_file, update_sqlite_incremental class Error(Exception): pass + class CrawlError(Error): def __init__(self, extid, message, pagecontent=""): self.extid = extid @@ -55,6 +55,7 @@ class CrawlError(Error): self.pagecontent = pagecontent super(CrawlError, self).__init__() + class RequestResult: def __init__(self, response=None, exception=None): if response is not None: @@ -97,38 +98,38 @@ class UpdateResult: return self.new def is_ok(self): - return (self.res_overview.is_ok() and - (self.res_crx.is_ok() or self.res_crx.not_modified()) and - ((self.res_reviews is None) or self.res_reviews.is_ok()) and ( - (self.res_support is None) or self.res_support.is_ok())) + return (self.res_overview.is_ok() + and (self.res_crx.is_ok() or self.res_crx.not_modified()) + and ((self.res_reviews is None) or self.res_reviews.is_ok()) + and ((self.res_support is None) or self.res_support.is_ok())) def not_authorized(self): - return (self.res_overview.not_authorized() or - self.res_crx.not_authorized() or - (self.res_reviews is not None and - self.res_reviews.not_authorized()) or ( - self.res_support is not None and - self.res_support.not_authorized())) + return (self.res_overview.not_authorized() + or self.res_crx.not_authorized() + or (self.res_reviews is not None + and self.res_reviews.not_authorized()) + or (self.res_support is not None + and self.res_support.not_authorized())) def not_in_store(self): - return ( - self.res_overview.not_found() or self.res_crx.not_found() or - (self.res_reviews is not None and self.res_reviews.not_found()) or - (self.res_support is not None and self.res_support.not_found())) + return (self.res_overview.not_found() or self.res_crx.not_found() or + (self.res_reviews is not None and self.res_reviews.not_found()) + or (self.res_support is not None + and self.res_support.not_found())) def has_exception(self): - return (self.res_overview.has_exception() or - self.res_crx.has_exception() or - (self.res_reviews is not None and - self.res_reviews.has_exception()) or ( - self.res_support is not None and - self.res_support.has_exception())) + return (self.res_overview.has_exception() + or self.res_crx.has_exception() + or (self.res_reviews is not None + and self.res_reviews.has_exception()) + or (self.res_support is not None + and self.res_support.has_exception())) def raised_google_ddos(self): - return ((self.res_reviews is not None and - 
self.res_reviews.not_available()) or - (self.res_support is not None and - self.res_support.not_available())) + return ((self.res_reviews is not None + and self.res_reviews.not_available()) + or (self.res_support is not None + and self.res_support.not_available())) def not_modified(self): return self.res_crx.not_modified() @@ -167,8 +168,9 @@ def httpdate(dt): "Jan", "Feb", "Mar", "Apr", "May", "Jun", "Jul", "Aug", "Sep", "Oct", "Nov", "Dec" ][dt.month - 1] - return "%s, %02d %s %04d %02d:%02d:%02d GMT" % ( - weekday, dt.day, month, dt.year, dt.hour, dt.minute, dt.second) + return "%s, %02d %s %04d %02d:%02d:%02d GMT" % (weekday, dt.day, month, + dt.year, dt.hour, + dt.minute, dt.second) def last_modified_utc_date(path): @@ -191,8 +193,8 @@ def last_crx(archivedir, extid, date=None): t = tarfile.open(tar, 'r') old_crxs = sorted([ x.name for x in t.getmembers() - if x.name.endswith(".crx") and x.size > 0 and (date is None or ( - dateutil.parser.parse( + if x.name.endswith(".crx") and x.size > 0 and ( + date is None or (dateutil.parser.parse( os.path.split(os.path.split(x.name)[0])[1]) <= date)) ]) t.close() @@ -217,20 +219,19 @@ def last_etag(archivedir, extid, crxfile): return etag -def update_overview(tar, date, verbose, ext_id): - logtxt = logmsg(verbose, "", " * overview page: ") +def update_overview(tar, date, ext_id): res = None try: res = requests.get(const_overview_url(ext_id), timeout=10) - logtxt = logmsg(verbose, logtxt, "{}".format(str(res.status_code))) + logging.info(8 * " " + + "* overview page: {}".format(str(res.status_code))) store_request_text(tar, date, 'overview.html', res) except Exception as e: - logtxt = logmsg(verbose, logtxt, " / Exception: {}\n".format(str(e))) + logging.exception("Exception when retrieving overview page") write_text(tar, date, 'overview.html.exception', traceback.format_exc()) - return RequestResult(res, e), logtxt - logtxt = logmsg(verbose, logtxt, "\n") - return RequestResult(res), logtxt + return RequestResult(res, e) + return RequestResult(res) def validate_crx_response(res, extid, extfilename): @@ -250,24 +251,23 @@ def validate_crx_response(res, extid, extfilename): extfilename)) -def update_crx(archivedir, tmptardir, verbose, ext_id, date): +def update_crx(archivedir, tmptardir, ext_id, date): res = None extfilename = "default_ext_archive.crx" last_crx_file = last_crx(archivedir, ext_id) last_crx_etag = last_etag(archivedir, ext_id, last_crx_file) last_crx_http_date = last_modified_http_date(last_crx_file) - logtxt = logmsg(verbose, "", - " * crx archive (Last: {}): ".format( - value_of(last_crx_http_date, "n/a"))) headers = "" if last_crx_file is not "": headers = {'If-Modified-Since': last_crx_http_date} try: - res = requests.get(const_download_url().format(ext_id), - stream=True, - headers=headers, - timeout=10) - logtxt = logmsg(verbose, logtxt, "{}\n".format(str(res.status_code))) + res = requests.get( + const_download_url().format(ext_id), + stream=True, + headers=headers, + timeout=10) + logging.info(8 * " " + "* crx archive (Last: {}): {}".format( + value_of(last_crx_http_date, "n/a"), str(res.status_code))) extfilename = os.path.basename(res.url) if re.search('&', extfilename): extfilename = "default.crx" @@ -278,19 +278,17 @@ def update_crx(archivedir, tmptardir, verbose, ext_id, date): timeout=10, allow_redirects=True).headers.get('ETag') write_text(tmptardir, date, extfilename + ".etag", etag) - logtxt = logmsg(verbose, logtxt, ( - " - checking etag, last: {}\n" + - " current: {}\n").format( - last_crx_etag, etag)) + 
logging.info(12 * " " + + "- checking etag, last: {}".format(last_crx_etag)) + logging.info(12 * " " + " current: {}".format(etag)) if (etag is not "") and (etag != last_crx_etag): - logtxt = logmsg( - verbose, logtxt, - " - downloading due to different etags\n") + logging.info(12 * " " + "- downloading due to different etags") - res = requests.get(const_download_url().format(ext_id), - stream=True, - timeout=10) + res = requests.get( + const_download_url().format(ext_id), + stream=True, + timeout=10) else: write_text(tmptardir, date, extfilename + ".link", os.path.join("..", @@ -306,28 +304,23 @@ def update_crx(archivedir, tmptardir, verbose, ext_id, date): write_text(tmptardir, date, extfilename + ".etag", res.headers.get("ETag")) except Exception as e: - logtxt = logmsg(verbose, logtxt, - " - Exception: {}\n".format(str(e))) + logging.exception("Exception when updating crx") write_text(tmptardir, date, extfilename + ".exception", traceback.format_exc()) - return RequestResult(res, e), logtxt - logtxt = logmsg(verbose, logtxt, "\n") - return RequestResult(res), logtxt + return RequestResult(res, e) + return RequestResult(res) def iterate_authors(pages): for page in pages: json_page = json.loads(page[page.index("{\""):page.rindex("}}},") + 1]) for annotation in json_page["annotations"]: - if "attributes" in annotation and "replyExists" in annotation[ - "attributes"] and annotation["attributes"]["replyExists"]: + if "attributes" in annotation and "replyExists" in annotation["attributes"] and annotation["attributes"]["replyExists"]: yield (annotation["entity"]["author"], annotation["entity"]["groups"]) -def update_reviews(tar, date, verbose, ext_id): - dir = os.path.join(os.path.splitext(tar)[0], date) - logtxt = logmsg(verbose, "", " * review page: ") +def update_reviews(tar, date, ext_id): res = None try: pages = [] @@ -337,7 +330,8 @@ def update_reviews(tar, date, verbose, ext_id): const_review_url(), data=const_review_payload(ext_id, "0", "100"), timeout=10) - logtxt = logmsg(verbose, logtxt, "{}/".format(str(res.status_code))) + logging.info(8 * " " + + "* review page 0-100: {}".format(str(res.status_code))) store_request_text(tar, date, 'reviews000-099.text', res) pages += [res.text] @@ -346,7 +340,8 @@ def update_reviews(tar, date, verbose, ext_id): const_review_url(), data=const_review_payload(ext_id, "100", "100"), timeout=10) - logtxt = logmsg(verbose, logtxt, "{}/".format(str(res.status_code))) + logging.info(8 * " " + "* review page 100-200: {}".format( + str(res.status_code))) store_request_text(tar, date, 'reviews100-199.text', res) pages += [res.text] @@ -359,21 +354,17 @@ def update_reviews(tar, date, verbose, ext_id): const_review_search_url(), data=const_review_search_payload(ext_id_author_tups), timeout=10) - logtxt = logmsg(verbose, logtxt, "{}".format(str(res.status_code))) + logging.info(8 * " " + "* review page replies: {}".format( + str(res.status_code))) store_request_text(tar, date, 'reviewsreplies.text', res) - else: - logtxt = logmsg(verbose, logtxt, "-") except Exception as e: - logtxt = logmsg(verbose, logtxt, " / Exception: {}\n".format(str(e))) + logging.exception("Exception when updating reviews") write_text(tar, date, 'reviews.html.exception', traceback.format_exc()) - return RequestResult(res, e), logtxt - logtxt = logmsg(verbose, logtxt, "\n") - return RequestResult(res), logtxt + return RequestResult(res, e) + return RequestResult(res) -def update_support(tar, date, verbose, ext_id): - dir = os.path.join(os.path.splitext(tar)[0], date) - logtxt = 
logmsg(verbose, "", " * support page: ") +def update_support(tar, date, ext_id): res = None try: pages = [] @@ -383,7 +374,8 @@ def update_support(tar, date, verbose, ext_id): const_support_url(), data=const_support_payload(ext_id, "0", "100"), timeout=10) - logtxt = logmsg(verbose, logtxt, "{}/".format(str(res.status_code))) + logging.info(8 * " " + + "* support page 0-100: {}".format(str(res.status_code))) store_request_text(tar, date, 'support000-099.text', res) pages += [res.text] @@ -392,7 +384,8 @@ def update_support(tar, date, verbose, ext_id): const_support_url(), data=const_support_payload(ext_id, "100", "100"), timeout=10) - logtxt = logmsg(verbose, logtxt, "{}/".format(str(res.status_code))) + logging.info(8 * " " + + "* support page 100-200: {}".format(str(res.status_code))) store_request_text(tar, date, 'support100-199.text', res) pages += [res.text] @@ -405,20 +398,19 @@ def update_support(tar, date, verbose, ext_id): const_review_search_url(), data=const_review_search_payload(ext_id_author_tups), timeout=10) - logtxt = logmsg(verbose, logtxt, "{}".format(str(res.status_code))) + logging.info(8 * " " + "* support page replies: {}".format( + str(res.status_code))) store_request_text(tar, date, 'supportreplies.text', res) - else: - logtxt = logmsg(verbose, logtxt, "-") except Exception as e: - logtxt = logmsg(verbose, logtxt, " / Exception: {}\n".format(str(e))) + logging.exception("Exception when updating support pages") write_text(tar, date, 'support.html.exception', traceback.format_exc()) - return RequestResult(res, e), logtxt - logtxt = logmsg(verbose, logtxt, "\n") - return RequestResult(res), logtxt + return RequestResult(res, e) + return RequestResult(res) -def update_extension(archivedir, verbose, forums, ext_id): - logtxt = logmsg(verbose, "", " Updating extension {}".format(ext_id)) +def update_extension(archivedir, forums, ext_id): + logging.info(4 * " " + "Updating extension {}{}".format( + ext_id, " (including forums)" if forums else "")) is_new = False tar_exception = None sql_exception = None @@ -426,9 +418,6 @@ def update_extension(archivedir, verbose, forums, ext_id): tmptardir = "" start = time.time() - if forums: - logtxt = logmsg(verbose, logtxt, " (including forums)") - logtxt = logmsg(verbose, logtxt, "\n") date = datetime.datetime.now(datetime.timezone.utc).isoformat() tardir = os.path.join(archivedir, get_local_archive_dir(ext_id), ext_id) @@ -437,43 +426,26 @@ def update_extension(archivedir, verbose, forums, ext_id): try: tmpdir = tempfile.mkdtemp() tmptardir = os.path.join(tmpdir, ext_id) - logtxt = logmsg(verbose, logtxt, - " * tmptardir = {}\n".format(tmptardir)) + logging.info(8 * " " + "* tmptardir = {}".format(tmptardir)) os.makedirs( os.path.join(archivedir, get_local_archive_dir(ext_id)), exist_ok=True) except Exception as e: - logtxt = logmsg(verbose, logtxt, - " * FATAL: cannot create tmpdir") - logtxt = logmsg(verbose, logtxt, " / Exception: {}\n".format(str(e))) + logging.exception(8 * " " + "* FATAL: cannot create tmpdir") tar_exception = e - logtxt = logmsg( - verbose, - logtxt, - " * Duration: {}\n".format( - datetime.timedelta(seconds=int(time.time() - start)))) - log(verbose, logtxt) - return UpdateResult(ext_id, is_new, tar_exception, None, - None, None, None, sql_exception, - False) + return UpdateResult(ext_id, is_new, tar_exception, None, None, None, + None, sql_exception, False) - res_overview, msg_overview = update_overview(tmptardir, date, verbose, - ext_id) + res_overview = update_overview(tmptardir, date, ext_id) res_reviews 
= None - msg_reviews = "" res_support = None - msg_support = "" if forums: - res_reviews, msg_reviews = update_reviews(tmptardir, date, verbose, - ext_id) + res_reviews = update_reviews(tmptardir, date, ext_id) - res_crx, msg_crx = update_crx(archivedir, tmptardir, verbose, ext_id, date) + res_crx = update_crx(archivedir, tmptardir, ext_id, date) if forums: - res_support, msg_support = update_support(tmptardir, date, verbose, - ext_id) - - logtxt = logtxt + msg_overview + msg_crx + msg_reviews + msg_support + res_support = update_support(tmptardir, date, ext_id) backup = False if backup: @@ -490,11 +462,8 @@ def update_extension(archivedir, verbose, forums, ext_id): if os.path.exists(tar): shutil.copyfile(tar, tardir + ".bak.tar") except Exception as e: - logtxt = logmsg( - verbose, logtxt, - " * FATAL: cannot rename old tar archive") - logtxt = logmsg(verbose, logtxt, - " / Exception: {}\n".format(str(e))) + logging.exception(8 * " " + + "* FATAL: cannot rename old tar archive") tar_exception = e try: write_text(tardir, date, ext_id + ".tar.rename.exception", @@ -509,9 +478,7 @@ def update_extension(archivedir, verbose, forums, ext_id): ar.add(tmptardir, arcname=ext_id) ar.close() except Exception as e: - logtxt = logmsg(verbose, logtxt, - " * FATAL: cannot create tar archive") - logtxt = logmsg(verbose, logtxt, " / Exception: {}\n".format(str(e))) + logging.exception(8 * " " + "* FATAL: cannot create tar archive") tar_exception = e try: write_text(tardir, date, ext_id + ".tar.create.exception", @@ -520,17 +487,12 @@ def update_extension(archivedir, verbose, forums, ext_id): pass try: - logtxt = logmsg(verbose, logtxt, " * Updating db...\n") + logging.info(8 * " " + "* Updating db...") db_path = db_file(archivedir, ext_id) - msg_updatesqlite = update_sqlite_incremental( - db_path, tmptardir, ext_id, date, verbose, 15 * " ") - logtxt = logmsg(verbose, logtxt, msg_updatesqlite) + update_sqlite_incremental(db_path, tmptardir, ext_id, date) sql_success = True except Exception as e: - logtxt = logmsg(verbose, logtxt, - " * Exception during update of sqlite db ") - logtxt = logmsg(verbose, logtxt, " / Exception: {}\n".format(str(e))) - + logging.exception(8 * " " + "* Exception during update of sqlite db") sql_exception = e try: @@ -541,9 +503,7 @@ def update_extension(archivedir, verbose, forums, ext_id): try: shutil.rmtree(path=tmpdir) except Exception as e: - logtxt = logmsg(verbose, logtxt, - " * FATAL: cannot remove archive directory") - logtxt = logmsg(verbose, logtxt, " / Exception: {}\n".format(str(e))) + logging.exception(8 * " " + "* FATAL: cannot remove archive directory") tar_exception = e try: write_text(tardir, date, ext_id + ".dir.remove.exception", @@ -551,49 +511,41 @@ def update_extension(archivedir, verbose, forums, ext_id): except Exception: pass - logtxt = logmsg( - verbose, - logtxt, - " * Duration: {}\n".format( + logging.info(8 * " " + "* Duration: {}".format( datetime.timedelta(seconds=int(time.time() - start)))) - log(verbose, logtxt) return UpdateResult(ext_id, is_new, tar_exception, res_overview, res_crx, res_reviews, res_support, sql_exception, sql_success) -def update_extensions(archivedir, verbose, parallel, forums_ext_ids, ext_ids): +def update_extensions(archivedir, parallel, forums_ext_ids, ext_ids): ext_with_forums = [] ext_without_forums = [] ext_ids = list(set(ext_ids) - set(forums_ext_ids)) forums_ext_ids = list(set(forums_ext_ids)) - log(verbose, "Updating {} extensions ({} including forums)\n".format( + logging.info("Updating {} extensions ({} including 
forums)".format( len(ext_ids), len(forums_ext_ids))) # First, update extensions with forums sequentially (and with delays) to # avoid running into Googles DDOS detection. - log(verbose, - " Updating {} extensions including forums (sequentially))\n".format( - len(forums_ext_ids))) + logging.info(2 * " " + + "Updating {} extensions including forums (sequentially))". + format(len(forums_ext_ids))) ext_with_forums = list( - map( - partial(update_extension, archivedir, verbose, True), - forums_ext_ids)) + map(partial(update_extension, archivedir, True), forums_ext_ids)) # Second, update extensions without forums parallel to increase speed. parallel_ids = list(set(ext_ids) - set(forums_ext_ids)) - log(verbose, - " Updating {} extensions excluding forums (parallel))\n".format( - len(parallel_ids))) + logging.info(2 * " " + + "Updating {} extensions excluding forums (parallel))".format( + len(parallel_ids))) with Pool(parallel) as p: ext_without_forums = list( - p.map( - partial(update_extension, archivedir, verbose, False), - parallel_ids)) + p.map(partial(update_extension, archivedir, False), parallel_ids)) return ext_with_forums + ext_without_forums -def get_existing_ids(archivedir, verbose): +def get_existing_ids(archivedir): byte = '[0-9a-z][0-9a-z][0-9a-z][0-9a-z][0-9a-z][0-9a-z][0-9a-z][0-9a-z]' word = byte + byte + byte + byte return list( @@ -601,7 +553,7 @@ def get_existing_ids(archivedir, verbose): glob.glob(os.path.join(archivedir, "*", word + ".tar")))) -def get_forum_ext_ids(confdir, verbose): +def get_forum_ext_ids(confdir): with open(os.path.join(confdir, "forums.conf")) as f: ids = f.readlines() r = re.compile('^[a-p]+$') diff --git a/ExtensionCrawler/config.py b/ExtensionCrawler/config.py index 5fde67f..f75043b 100644 --- a/ExtensionCrawler/config.py +++ b/ExtensionCrawler/config.py @@ -137,6 +137,7 @@ def jsloc_timeout(): def const_basedir(): """Top-level directory for the extension crawler archive.""" return "archive" + def const_parallel_downloads(): """Number of parallel downloads.""" return 36 @@ -145,6 +146,9 @@ def const_verbose(): """Default verbosity.""" return True +def const_log_format(): + return '%(process)s %(asctime)s %(message)s' + def const_discover(): """Default configuration of discovery mode""" return False diff --git a/ExtensionCrawler/discover.py b/ExtensionCrawler/discover.py index 5b8549a..d03fb9d 100644 --- a/ExtensionCrawler/discover.py +++ b/ExtensionCrawler/discover.py @@ -23,7 +23,7 @@ import re from functools import reduce import requests import ExtensionCrawler.config -from ExtensionCrawler.util import log +import logging def crawl_nearly_all_of_ext_ids(): @@ -54,16 +54,15 @@ def crawl_nearly_all_of_ext_ids(): return [re.search("[a-z]{32}", url).group(0) for url in overview_urls] -def get_new_ids(verbose, known_ids): +def get_new_ids(known_ids): """Discover new extension ids.""" - log(verbose, "Discovering new ids ... 
\n") + logging.info("Discovering new ids ...") discovered_ids = [] try: discovered_ids = ExtensionCrawler.discover.crawl_nearly_all_of_ext_ids() - except Exception as ex: - log(verbose, - " EXCEPTION during discovering of new ids: {}\n".format(str(ex))) + except Exception: + logging.exception("Exception when discovering new ids") new_ids = list(set(discovered_ids) - set(known_ids)) - log(verbose, " Discovered {} new extensions (out of {})\n".format( + logging.info(2 * " " + "Discovered {} new extensions (out of {})".format( len(new_ids), len(discovered_ids))) return new_ids diff --git a/ExtensionCrawler/sqlite.py b/ExtensionCrawler/sqlite.py index d64b34b..499d72e 100644 --- a/ExtensionCrawler/sqlite.py +++ b/ExtensionCrawler/sqlite.py @@ -31,17 +31,16 @@ import json import os import glob import datetime +import logging -def get_etag(ext_id, datepath, con, verbose, indent): - txt = "" - +def get_etag(ext_id, datepath, con): # Trying to parse etag file etagpath = next( iter(glob.glob(os.path.join(datepath, "*.crx.etag"))), None) if etagpath: with open(etagpath) as f: - return f.read(), txt + return f.read() # Trying to parse header file for etag headerpath = next( @@ -52,12 +51,10 @@ def get_etag(ext_id, datepath, con, verbose, indent): try: headers = eval(content) if "ETag" in headers: - return headers["ETag"], txt + return headers["ETag"] except Exception: - txt = logmsg( - verbose, txt, - indent + "* WARNING: could not parse crx header file") - pass + logging.warning(16 * " " + + "* WARNING: could not parse crx header file") # Trying to look up previous etag in database linkpath = next( @@ -70,9 +67,9 @@ def get_etag(ext_id, datepath, con, verbose, indent): result = con.get_most_recent_etag(ext_id, con.convert_date(linked_date)) if result is not None: - return result, txt + return result - return None, txt + return None def get_overview_status(datepath): @@ -102,9 +99,8 @@ def get_crx_status(datepath): return int(f.read()) -def parse_and_insert_overview(ext_id, date, datepath, con, verbose, indent): - txt = "" - +def parse_and_insert_overview(ext_id, date, datepath, con): + logging.info(16 * " " + "- parsing overview file") overview_path = os.path.join(datepath, "overview.html") if os.path.exists(overview_path): with open(overview_path) as overview_file: @@ -162,8 +158,7 @@ def parse_and_insert_overview(ext_id, date, datepath, con, verbose, indent): last_updated = str(last_updated_parent.contents[ 0]) if last_updated_parent else None - etag, etag_msg = get_etag(ext_id, datepath, con, verbose, indent) - txt = logmsg(verbose, txt, etag_msg) + etag = get_etag(ext_id, datepath, con) match = re.search( """(.*?)""", @@ -194,18 +189,15 @@ def parse_and_insert_overview(ext_id, date, datepath, con, verbose, indent): date=con.convert_date(date), category=category) - return txt - -def parse_and_insert_crx(ext_id, date, datepath, con, verbose, indent): - txt = "" +def parse_and_insert_crx(ext_id, date, datepath, con): + logging.info(16 * " " + "- parsing crx file") crx_path = next(iter(glob.glob(os.path.join(datepath, "*.crx"))), None) if crx_path: filename = os.path.basename(crx_path) with ZipFile(crx_path) as f: - etag, etag_msg = get_etag(ext_id, datepath, con, verbose, indent) - txt = logmsg(verbose, txt, etag_msg) + etag = get_etag(ext_id, datepath, con) size = os.path.getsize(crx_path) public_key = read_crx(crx_path).public_key @@ -267,7 +259,6 @@ def parse_and_insert_crx(ext_id, date, datepath, con, verbose, indent): md5=js_file_info['md5'], size=js_file_info['size'], 
version=js_file_info['ver']) - return txt def get(d, k): @@ -276,6 +267,7 @@ def get(d, k): def parse_and_insert_review(ext_id, date, reviewpath, con): + logging.info(16 * " " + "- parsing review file") with open(reviewpath) as f: content = f.read() stripped = content[content.find('{"'):] @@ -311,6 +303,7 @@ def parse_and_insert_review(ext_id, date, reviewpath, con): def parse_and_insert_support(ext_id, date, supportpath, con): + logging.info(16 * " " + "- parsing support file") with open(supportpath) as f: content = f.read() stripped = content[content.find('{"'):] @@ -345,15 +338,13 @@ def parse_and_insert_support(ext_id, date, supportpath, con): con.insertmany("support", results) -def parse_and_insert_replies(ext_id, date, repliespath, con, verbose, indent): +def parse_and_insert_replies(ext_id, date, repliespath, con): + logging.info(16 * " " + "- parsing reply file") with open(repliespath) as f: d = json.load(f) if not "searchResults" in d: - txt = logmsg( - verbose, "", - indent + "* WARNING: there are no search results in {}\n". - format(repliespath)) - return txt + logging.warning("* WARNING: there are no search results in {}". + format(repliespath)) results = [] for result in d["searchResults"]: if "annotations" not in result: @@ -388,6 +379,7 @@ def parse_and_insert_replies(ext_id, date, repliespath, con, verbose, indent): def parse_and_insert_status(ext_id, date, datepath, con): + logging.info(16 * " " + "- parsing status file") overview_status = get_overview_status(datepath) crx_status = get_crx_status(datepath) @@ -406,16 +398,10 @@ def parse_and_insert_status(ext_id, date, datepath, con): overview_exception=overview_exception) -def update_sqlite_incremental(db_path, tmptardir, ext_id, date, verbose, - indent): - txt = "" - indent2 = indent + 4 * " " - +def update_sqlite_incremental(db_path, tmptardir, ext_id, date): + logging.info(12 * " " + "- parsing data from {}".format(date)) datepath = os.path.join(tmptardir, date) - txt = logmsg(verbose, txt, - indent + "- updating with data from {}\n".format(date)) - if const_use_mysql(): # Don't forget to create a ~/.my.cnf file with the credentials backend = MysqlBackend(read_default_file=const_mysql_config_file()) @@ -423,29 +409,22 @@ def update_sqlite_incremental(db_path, tmptardir, ext_id, date, verbose, backend = SqliteBackend(db_path) with backend as con: - etag, etag_msg = get_etag(ext_id, datepath, con, verbose, indent2) - txt = logmsg(verbose, txt, etag_msg) + etag = get_etag(ext_id, datepath, con) if etag: try: - crx_msg = parse_and_insert_crx(ext_id, date, datepath, con, - verbose, indent2) - txt = logmsg(verbose, txt, crx_msg) + parse_and_insert_crx(ext_id, date, datepath, con) except zipfile.BadZipfile as e: - txt = logmsg( - verbose, txt, indent2 + - "* WARNING: the found crx file is not a zip file, exception: " - ) - txt = logmsg(verbose, txt, str(e)) - txt = logmsg(verbose, txt, "\n") + logging.warning( + 16 * " " + + "* WARNING: the found crx file is not a zip file, exception: {}". 
+ format(str(e))) else: crx_status = get_crx_status(datepath) if crx_status != 401 and crx_status != 204 and crx_status != 404: - txt = logmsg(verbose, txt, - indent2 + "* WARNING: could not find etag\n") + logging.warning(16 * " " + "* WARNING: could not find etag") - parse_and_insert_overview(ext_id, date, datepath, con, verbose, - indent2) + parse_and_insert_overview(ext_id, date, datepath, con) parse_and_insert_status(ext_id, date, datepath, con) reviewpaths = glob.glob(os.path.join(datepath, "reviews*-*.text")) @@ -453,34 +432,24 @@ def update_sqlite_incremental(db_path, tmptardir, ext_id, date, verbose, try: parse_and_insert_review(ext_id, date, reviewpath, con) except json.decoder.JSONDecodeError as e: - txt = logmsg( - verbose, txt, - indent2 + "* Could not parse review file, exception: ") - txt = logmsg(verbose, txt, str(e)) - txt = logmsg(verbose, txt, "\n") + logging.warning(16 * " " + + "* Could not parse review file, exception: {}". + format(str(e))) supportpaths = glob.glob(os.path.join(datepath, "support*-*.text")) for supportpath in supportpaths: try: parse_and_insert_support(ext_id, date, supportpath, con) except json.decoder.JSONDecodeError as e: - txt = logmsg( - verbose, txt, - indent2 + "* Could not parse support file, exception: ") - txt = logmsg(verbose, txt, str(e)) - txt = logmsg(verbose, txt, "\n") + logging.warning( + 16 * " " + "* Could not parse support file, exception: {}". + format(str(e))) repliespaths = glob.glob(os.path.join(datepath, "*replies.text")) for repliespath in repliespaths: try: - reply_txt = parse_and_insert_replies(ext_id, date, repliespath, - con, verbose, indent) - txt = logmsg(verbose, txt, reply_txt) + parse_and_insert_replies(ext_id, date, repliespath, con) except json.decoder.JSONDecodeError as e: - txt = logmsg( - verbose, txt, - indent2 + "* Could not parse reply file, exception: ") - txt = logmsg(verbose, txt, str(e)) - txt = logmsg(verbose, txt, "\n") - - return txt + logging.warning(16 * " " + + "* Could not parse reply file, exception: {}". 
+ format(str(e))) diff --git a/ExtensionCrawler/util.py b/ExtensionCrawler/util.py index d1db32a..2522e1c 100644 --- a/ExtensionCrawler/util.py +++ b/ExtensionCrawler/util.py @@ -18,7 +18,6 @@ """ Various utility methods.""" -import sys from time import sleep from random import random @@ -27,19 +26,6 @@ def google_dos_protection(maxrange=0.3): to avoid Google's bot detection""" sleep(0.5+(random()*maxrange)) -def log(verbose, msg): - """Print log message.""" - if verbose: - sys.stdout.write(msg) - sys.stdout.flush() - -def logmsg(verbose, msg1, msg2): - """Append msg2 to log stream msg1.""" - if verbose: - return msg1 + msg2 - else: - return msg1 - def value_of(value, default): """Get value or default value if None.""" if value is not None and value is not "": diff --git a/crawler b/crawler index 3d60f42..ce88d8a 100755 --- a/crawler +++ b/crawler @@ -25,11 +25,12 @@ import datetime import time import getopt import sqlite3 +import logging from functools import reduce from ExtensionCrawler.discover import get_new_ids from ExtensionCrawler.archive import get_forum_ext_ids, get_existing_ids, update_extensions -from ExtensionCrawler.util import log -import ExtensionCrawler.config +from ExtensionCrawler.config import * + def write_log(dirname, fname, text): """Write text into the file with name fname in directory dirname.""" @@ -92,52 +93,44 @@ def log_failures_to_file(dirname, today, res): write_log(dirname, today + "-sql-not-updated.log", sql_success) -def log_summary(verbose, res, stderr=False, runtime=0): - """Log brief result summary to log stream of stderr.""" - - def printlog(msg): - """Print log message.""" - if stderr: - sys.stderr.write(msg) - else: - log(verbose, msg) +def log_summary(res, runtime=0): + """Log brief result summary.""" corrupt_tar_archives = list(filter(lambda x: x.corrupt_tar(), res)) - printlog("\n") - printlog("Summary:\n") - printlog(" Updated {} out of {} extensions successfully\n".format( + logging.info("Summary:") + logging.info(" Updated {} out of {} extensions successfully".format( str(len(list(filter(lambda x: x.is_ok(), res)))), str(len(res)))) - printlog(" Updated extensions: {:8d}\n".format( + logging.info(" Updated extensions: {:8d}".format( len(list(filter(lambda x: x.is_ok() and not x.not_modified(), res))))) - printlog(" Updated SQL databases: {:8d}\n".format( + logging.info(" Updated SQL databases: {:8d}".format( len(list(filter(lambda x: x.sql_success(), res))))) - printlog(" New extensions: {:8d}\n".format( + logging.info(" New extensions: {:8d}".format( len(list(filter(lambda x: x.is_new(), res))))) - printlog(" Not authorized: {:8d}\n".format( + logging.info(" Not authorized: {:8d}".format( len(list(filter(lambda x: x.not_authorized(), res))))) - printlog(" Raised Google DDOS: {:8d}\n".format( + logging.info(" Raised Google DDOS: {:8d}".format( len(list(filter(lambda x: x.raised_google_ddos(), res))))) - printlog(" Not modified archives: {:8d}\n".format( + logging.info(" Not modified archives: {:8d}".format( len(list(filter(lambda x: x.not_modified(), res))))) - printlog(" Extensions not in store: {:8d}\n".format( + logging.info(" Extensions not in store: {:8d}".format( len(list(filter(lambda x: x.not_in_store(), res))))) - printlog(" Unknown exception: {:8d}\n".format( + logging.info(" Unknown exception: {:8d}".format( len(list(filter(lambda x: x.has_exception(), res))))) - printlog(" Corrupt tar archives: {:8d}\n".format( - len(corrupt_tar_archives))) - printlog(" SQL exception: {:8d}\n".format( + logging.info( + " Corrupt tar archives: 
{:8d}".format(len(corrupt_tar_archives))) + logging.info(" SQL exception: {:8d}".format( len(list(filter(lambda x: x.sql_exception(), res))))) - printlog(" Total runtime: {}\n".format( + logging.info(" Total runtime: {}".format( str(datetime.timedelta(seconds=int(runtime))))) if corrupt_tar_archives != []: - printlog("\n\n") - printlog("List of extensions with corrupted files/archives:\n") + logging.info("") + logging.info("List of extensions with corrupted files/archives:") list( - map(lambda x: printlog(" " + x.id + ": " + str(x.exception) + "\n"), - corrupt_tar_archives)) - printlog("\n") + map(lambda x: logging.info(" " + x.id + ": " + str(x.exception), corrupt_tar_archives) + )) + logging.info("") def helpmsg(): @@ -149,26 +142,24 @@ def helpmsg(): print(" -a= archive directory") -def print_config(verbose, basedir, archive_dir, conf_dir, discover, parallel): +def print_config(basedir, archive_dir, conf_dir, discover, parallel): """Print current configuration.""" - log(verbose, "Configuration:\n") - log(verbose, " Base dir: {}\n".format(basedir)) - log(verbose, - " Archive directory: {}\n".format(archive_dir)) - log(verbose, " Configuration directory: {}\n".format(conf_dir)) - log(verbose, " Discover new extensions: {}\n".format(discover)) - log(verbose, " Max num. of concurrent downloads: {}\n".format(parallel)) - log(verbose, " SQLite 3 version: {}\n".format( + logging.info("Configuration:") + logging.info(" Base dir: {}".format(basedir)) + logging.info(" Archive directory: {}".format(archive_dir)) + logging.info(" Configuration directory: {}".format(conf_dir)) + logging.info(" Discover new extensions: {}".format(discover)) + logging.info(" Max num. of concurrent downloads: {}".format(parallel)) + logging.info(" SQLite 3 version: {}".format( sqlite3.sqlite_version)) - log(verbose, "\n") def parse_args(argv): """Parse command line arguments. 
""" - basedir = ExtensionCrawler.config.const_basedir() - parallel = ExtensionCrawler.config.const_parallel_downloads() - verbose = ExtensionCrawler.config.const_verbose() - discover = ExtensionCrawler.config.const_discover() + basedir = const_basedir() + parallel = const_parallel_downloads() + verbose = const_verbose() + discover = const_discover() try: opts, _ = getopt.getopt(argv, "hsda:p:", ["archive=", 'parallel=']) except getopt.GetoptError: @@ -191,9 +182,16 @@ def parse_args(argv): def main(argv): """Main function of the extension crawler.""" + today = datetime.datetime.now(datetime.timezone.utc).isoformat() basedir, parallel, verbose, discover = parse_args(argv) + if verbose: + loglevel = logging.INFO + else: + loglevel = logging.WARNING + logging.basicConfig(level=loglevel, format=const_log_format()) + archive_dir = os.path.join(basedir, "data") os.makedirs(archive_dir, exist_ok=True) conf_dir = os.path.join(basedir, "conf") @@ -204,41 +202,38 @@ def main(argv): start_time = time.time() - print_config(verbose, basedir, archive_dir, conf_dir, discover, parallel) + print_config(basedir, archive_dir, conf_dir, discover, parallel) - forum_ext_ids = get_forum_ext_ids(conf_dir, verbose) - known_ids = list( - set(get_existing_ids(archive_dir, verbose)) | set(forum_ext_ids)) + forum_ext_ids = get_forum_ext_ids(conf_dir) + known_ids = list(set(get_existing_ids(archive_dir)) | set(forum_ext_ids)) discovered_ids = [] if discover: - discovered_ids = get_new_ids(verbose, known_ids) + discovered_ids = get_new_ids(known_ids) ext_ids = list(set(discovered_ids) | set(known_ids)) discovered_ids = None known_ids = None - res = update_extensions(archive_dir, verbose, parallel, forum_ext_ids, - ext_ids) + res = update_extensions(archive_dir, parallel, forum_ext_ids, ext_ids) # We re-try (once) the extensions with unknown exceptions, as # they are often temporary has_exception = list(filter(lambda x: x.has_exception(), res)) if has_exception != []: - log(verbose, - " {} extensions with unknown exceptions, start another try ...\n". + logging.info( + " {} extensions with unknown exceptions, start another try ...". 
format(str(len(has_exception)))) has_exception_ids = list(map(lambda x: x.id, has_exception)) forum_ext_ids_except = list( set(forum_ext_ids).intersection(set(has_exception_ids))) ext_ids_except = sorted( list(set(has_exception_ids) - set(forum_ext_ids_except))) - res_update = update_extensions(archive_dir, verbose, parallel, + res_update = update_extensions(archive_dir, parallel, forum_ext_ids_except, ext_ids_except) res = list(set(res) - set(has_exception)) + res_update end_time = time.time() - log_summary(verbose, res, False, end_time - start_time) - log_summary(verbose, res, True, end_time - start_time) + log_summary(res, end_time - start_time) log_failures_to_file(log_dir, today, res) diff --git a/create-db b/create-db index 217cebe..2f1a4d6 100755 --- a/create-db +++ b/create-db @@ -22,12 +22,13 @@ import sys import tarfile import time import tempfile -import traceback import fnmatch -from multiprocessing import Pool, Lock +from multiprocessing import Pool from functools import partial +import logging from ExtensionCrawler.archive import update_sqlite_incremental +from ExtensionCrawler.config import * def help(): @@ -42,20 +43,6 @@ def help(): print(" -N ") -def guarded_stdout(string): - lock.acquire() - sys.stdout.write(string) - sys.stdout.flush() - lock.release() - - -def guarded_stderr(string): - lock.acquire() - sys.stderr.write(string) - sys.stderr.flush() - lock.release() - - def process_id(dbbasedir, path): start = time.time() with tempfile.TemporaryDirectory() as tmpdir: @@ -63,7 +50,7 @@ def process_id(dbbasedir, path): t.extractall(tmpdir) extid = os.listdir(tmpdir)[0] - guarded_stdout("Processing {}\n".format(extid)) + logging.info("Processing {}".format(extid)) dbpath = os.path.join(dbbasedir, extid + ".sqlite") if os.path.exists(dbpath): os.remove(dbpath) @@ -71,12 +58,11 @@ def process_id(dbbasedir, path): for date in sorted(os.listdir(iddir)): try: - update_sqlite_incremental(dbpath, iddir, extid, date, True, - "") + update_sqlite_incremental(dbpath, iddir, extid, date) except Exception: - guarded_stderr("Exception when handling {} on {}:\n{}\n". - format(extid, date, traceback.format_exc())) - guarded_stdout("Finished {} in {}s\n".format(extid, time.time() - start)) + logging.exception("Exception when handling {} on {}". + format(extid, date)) + logging.info("Finished {} in {}s".format(extid, time.time() - start)) def find(archive, pattern): @@ -97,11 +83,6 @@ def find_from_file(archive, extidlistfile): yield os.path.join(root, file) -def init(l): - global lock - lock = l - - def parse_args(argv): archive = "archive" parallel = 8 @@ -155,9 +136,11 @@ def parse_args(argv): def main(argv): + logging.basicConfig(level=logging.INFO, format=const_log_format()) + dbbasedir, paths, parallel = parse_args(argv) - with Pool(initializer=init, initargs=(Lock(), ), processes=parallel) as p: + with Pool(processes=parallel) as p: p.map(partial(process_id, dbbasedir), paths)
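
The central change in this patch is replacing the hand-rolled logmsg/log string accumulation (and the verbose flag threaded through every function) with the standard logging module: each entry point calls logging.basicConfig(level=..., format=const_log_format()) once, worker code logs directly via logging.info/logging.warning/logging.exception, and the %(process)s field in const_log_format attributes each record to the Pool worker that emitted it. The following is a minimal, self-contained sketch of that pattern, not part of the patch itself; the worker function and extension ids are invented for illustration, and it assumes the default fork start method on Linux, where Pool workers inherit the parent's logging configuration.

import logging
from functools import partial
from multiprocessing import Pool


def const_log_format():
    # Same format string the patch adds to ExtensionCrawler/config.py;
    # %(process)s identifies which Pool worker emitted the record.
    return '%(process)s %(asctime)s %(message)s'


def update_one(archivedir, ext_id):
    # Hypothetical stand-in for update_extension(archivedir, forums, ext_id).
    logging.info(4 * " " + "Updating extension {}".format(ext_id))
    try:
        raise IOError("simulated download failure")
    except Exception:
        # logging.exception logs at ERROR level and appends the traceback,
        # which is what the patch uses instead of splicing
        # traceback.format_exc() into the old logtxt string.
        logging.exception("Exception when updating {}".format(ext_id))


if __name__ == "__main__":
    verbose = True  # corresponds to const_verbose() in the crawler
    logging.basicConfig(
        level=logging.INFO if verbose else logging.WARNING,
        format=const_log_format())
    with Pool(2) as p:
        p.map(partial(update_one, "archive/data"), ["a" * 32, "b" * 32])

The removed guarded_stdout/guarded_stderr helpers in create-db serialized output with a shared multiprocessing Lock; after the patch each worker simply emits its own log records, and the %(process)s prefix is what keeps the interleaved output attributable to individual workers.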