Changed logging to use the logging library.

Michael Herzberg 2017-08-29 22:29:38 +01:00
parent bddd80c138
commit 3e24d1f08c
7 changed files with 228 additions and 340 deletions
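At its core the change is mechanical: the hand-rolled log/logmsg helpers, which threaded a verbose flag and an accumulated logtxt string through every function signature and return value, are replaced by direct calls to Python's standard logging module, configured once per entry point. A minimal sketch of the before/after pattern (illustrative only; logmsg is copied from the old util.py below, the fetch functions are simplified stand-ins):

    import logging

    # Old pattern (removed by this commit): log text is built up by hand
    # and passed back to the caller alongside the real result.
    def logmsg(verbose, msg1, msg2):
        """Append msg2 to log stream msg1."""
        return msg1 + msg2 if verbose else msg1

    def fetch_old(verbose):
        logtxt = logmsg(verbose, "", "* overview page: 200\n")
        return "result", logtxt  # every caller must collect and print logtxt

    # New pattern: records are emitted directly; verbosity is a global setting.
    def fetch_new():
        logging.info("* overview page: 200")
        return "result"  # no logging plumbing in the signature

    if __name__ == "__main__":
        logging.basicConfig(level=logging.INFO,
                            format='%(process)s %(asctime)s %(message)s')
        print(fetch_old(True))
        print(fetch_new())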


@@ -15,7 +15,6 @@
 # You should have received a copy of the GNU General Public License
 # along with this program. If not, see <https://www.gnu.org/licenses/>.
 #
-
 """
 Module for handling archives of the Browser Extension Crawler.
 """
@@ -35,19 +34,20 @@ import datetime
 import dateutil
 import dateutil.parser
 import requests
+import logging

-from ExtensionCrawler.config import (const_review_payload, const_review_search_url,
-                                     const_download_url, get_local_archive_dir,
-                                     const_overview_url, const_support_url,
-                                     const_support_payload, const_review_search_payload,
-                                     const_review_url)
-from ExtensionCrawler.util import logmsg, google_dos_protection, log, value_of
+from ExtensionCrawler.config import (
+    const_review_payload, const_review_search_url, const_download_url,
+    get_local_archive_dir, const_overview_url, const_support_url,
+    const_support_payload, const_review_search_payload, const_review_url)
+from ExtensionCrawler.util import google_dos_protection, value_of
 from ExtensionCrawler.sqlite import db_file, update_sqlite_incremental


 class Error(Exception):
     pass


 class CrawlError(Error):
     def __init__(self, extid, message, pagecontent=""):
         self.extid = extid
@@ -55,6 +55,7 @@ class CrawlError(Error):
         self.pagecontent = pagecontent
         super(CrawlError, self).__init__()

+
 class RequestResult:
     def __init__(self, response=None, exception=None):
         if response is not None:
@@ -97,38 +98,38 @@ class UpdateResult:
         return self.new

     def is_ok(self):
-        return (self.res_overview.is_ok() and
-                (self.res_crx.is_ok() or self.res_crx.not_modified()) and
-                ((self.res_reviews is None) or self.res_reviews.is_ok()) and (
-                    (self.res_support is None) or self.res_support.is_ok()))
+        return (self.res_overview.is_ok()
+                and (self.res_crx.is_ok() or self.res_crx.not_modified())
+                and ((self.res_reviews is None) or self.res_reviews.is_ok())
+                and ((self.res_support is None) or self.res_support.is_ok()))

     def not_authorized(self):
-        return (self.res_overview.not_authorized() or
-                self.res_crx.not_authorized() or
-                (self.res_reviews is not None and
-                 self.res_reviews.not_authorized()) or (
-                     self.res_support is not None and
-                     self.res_support.not_authorized()))
+        return (self.res_overview.not_authorized()
+                or self.res_crx.not_authorized()
+                or (self.res_reviews is not None
+                    and self.res_reviews.not_authorized())
+                or (self.res_support is not None
+                    and self.res_support.not_authorized()))

     def not_in_store(self):
-        return (
-            self.res_overview.not_found() or self.res_crx.not_found() or
-            (self.res_reviews is not None and self.res_reviews.not_found()) or
-            (self.res_support is not None and self.res_support.not_found()))
+        return (self.res_overview.not_found() or self.res_crx.not_found() or
+                (self.res_reviews is not None and self.res_reviews.not_found())
+                or (self.res_support is not None
+                    and self.res_support.not_found()))

     def has_exception(self):
-        return (self.res_overview.has_exception() or
-                self.res_crx.has_exception() or
-                (self.res_reviews is not None and
-                 self.res_reviews.has_exception()) or (
-                     self.res_support is not None and
-                     self.res_support.has_exception()))
+        return (self.res_overview.has_exception()
+                or self.res_crx.has_exception()
+                or (self.res_reviews is not None
+                    and self.res_reviews.has_exception())
+                or (self.res_support is not None
+                    and self.res_support.has_exception()))

     def raised_google_ddos(self):
-        return ((self.res_reviews is not None and
-                 self.res_reviews.not_available()) or
-                (self.res_support is not None and
-                 self.res_support.not_available()))
+        return ((self.res_reviews is not None
+                 and self.res_reviews.not_available())
+                or (self.res_support is not None
+                    and self.res_support.not_available()))

     def not_modified(self):
         return self.res_crx.not_modified()
@@ -167,8 +168,9 @@ def httpdate(dt):
         "Jan", "Feb", "Mar", "Apr", "May", "Jun", "Jul", "Aug", "Sep", "Oct",
         "Nov", "Dec"
     ][dt.month - 1]
-    return "%s, %02d %s %04d %02d:%02d:%02d GMT" % (
-        weekday, dt.day, month, dt.year, dt.hour, dt.minute, dt.second)
+    return "%s, %02d %s %04d %02d:%02d:%02d GMT" % (weekday, dt.day, month,
+                                                    dt.year, dt.hour,
+                                                    dt.minute, dt.second)


 def last_modified_utc_date(path):
@@ -191,8 +193,8 @@ def last_crx(archivedir, extid, date=None):
     t = tarfile.open(tar, 'r')
     old_crxs = sorted([
         x.name for x in t.getmembers()
-        if x.name.endswith(".crx") and x.size > 0 and (date is None or (
-            dateutil.parser.parse(
-                os.path.split(os.path.split(x.name)[0])[1]) <= date))
+        if x.name.endswith(".crx") and x.size > 0 and (
+            date is None or (dateutil.parser.parse(
+                os.path.split(os.path.split(x.name)[0])[1]) <= date))
     ])
     t.close()
@@ -217,20 +219,19 @@ def last_etag(archivedir, extid, crxfile):
     return etag


-def update_overview(tar, date, verbose, ext_id):
-    logtxt = logmsg(verbose, "", " * overview page: ")
+def update_overview(tar, date, ext_id):
     res = None
     try:
         res = requests.get(const_overview_url(ext_id), timeout=10)
-        logtxt = logmsg(verbose, logtxt, "{}".format(str(res.status_code)))
+        logging.info(8 * " " +
+                     "* overview page: {}".format(str(res.status_code)))
         store_request_text(tar, date, 'overview.html', res)
     except Exception as e:
-        logtxt = logmsg(verbose, logtxt, " / Exception: {}\n".format(str(e)))
+        logging.exception("Exception when retrieving overview page")
         write_text(tar, date, 'overview.html.exception',
                    traceback.format_exc())
-        return RequestResult(res, e), logtxt
-    logtxt = logmsg(verbose, logtxt, "\n")
-    return RequestResult(res), logtxt
+        return RequestResult(res, e)
+    return RequestResult(res)


 def validate_crx_response(res, extid, extfilename):
@@ -250,24 +251,23 @@ def validate_crx_response(res, extid, extfilename):
                 extfilename))


-def update_crx(archivedir, tmptardir, verbose, ext_id, date):
+def update_crx(archivedir, tmptardir, ext_id, date):
     res = None
     extfilename = "default_ext_archive.crx"
     last_crx_file = last_crx(archivedir, ext_id)
     last_crx_etag = last_etag(archivedir, ext_id, last_crx_file)
     last_crx_http_date = last_modified_http_date(last_crx_file)
-    logtxt = logmsg(verbose, "",
-                    " * crx archive (Last: {}): ".format(
-                        value_of(last_crx_http_date, "n/a")))
     headers = ""
     if last_crx_file is not "":
         headers = {'If-Modified-Since': last_crx_http_date}
     try:
-        res = requests.get(const_download_url().format(ext_id),
-                           stream=True,
-                           headers=headers,
-                           timeout=10)
-        logtxt = logmsg(verbose, logtxt, "{}\n".format(str(res.status_code)))
+        res = requests.get(
+            const_download_url().format(ext_id),
+            stream=True,
+            headers=headers,
+            timeout=10)
+        logging.info(8 * " " + "* crx archive (Last: {}): {}".format(
+            value_of(last_crx_http_date, "n/a"), str(res.status_code)))
         extfilename = os.path.basename(res.url)
         if re.search('&', extfilename):
             extfilename = "default.crx"
@@ -278,19 +278,17 @@ def update_crx(archivedir, tmptardir, verbose, ext_id, date):
                                timeout=10,
                                allow_redirects=True).headers.get('ETag')
             write_text(tmptardir, date, extfilename + ".etag", etag)
-            logtxt = logmsg(verbose, logtxt, (
-                " - checking etag, last: {}\n" +
-                " current: {}\n").format(
-                    last_crx_etag, etag))
+            logging.info(12 * " " +
+                         "- checking etag, last: {}".format(last_crx_etag))
+            logging.info(12 * " " + " current: {}".format(etag))
             if (etag is not "") and (etag != last_crx_etag):
-                logtxt = logmsg(
-                    verbose, logtxt,
-                    " - downloading due to different etags\n")
-                res = requests.get(const_download_url().format(ext_id),
-                                   stream=True,
-                                   timeout=10)
+                logging.info(12 * " " + "- downloading due to different etags")
+                res = requests.get(
+                    const_download_url().format(ext_id),
+                    stream=True,
+                    timeout=10)
             else:
                 write_text(tmptardir, date, extfilename + ".link",
                            os.path.join("..",
@@ -306,28 +304,23 @@ def update_crx(archivedir, tmptardir, verbose, ext_id, date):
         write_text(tmptardir, date, extfilename + ".etag",
                    res.headers.get("ETag"))
     except Exception as e:
-        logtxt = logmsg(verbose, logtxt,
-                        " - Exception: {}\n".format(str(e)))
+        logging.exception("Exception when updating crx")
         write_text(tmptardir, date, extfilename + ".exception",
                    traceback.format_exc())
-        return RequestResult(res, e), logtxt
-    logtxt = logmsg(verbose, logtxt, "\n")
-    return RequestResult(res), logtxt
+        return RequestResult(res, e)
+    return RequestResult(res)


 def iterate_authors(pages):
     for page in pages:
         json_page = json.loads(page[page.index("{\""):page.rindex("}}},") + 1])
         for annotation in json_page["annotations"]:
-            if "attributes" in annotation and "replyExists" in annotation[
-                    "attributes"] and annotation["attributes"]["replyExists"]:
+            if "attributes" in annotation and "replyExists" in annotation["attributes"] and annotation["attributes"]["replyExists"]:
                 yield (annotation["entity"]["author"],
                        annotation["entity"]["groups"])


-def update_reviews(tar, date, verbose, ext_id):
-    dir = os.path.join(os.path.splitext(tar)[0], date)
-    logtxt = logmsg(verbose, "", " * review page: ")
+def update_reviews(tar, date, ext_id):
     res = None
     try:
         pages = []
@@ -337,7 +330,8 @@ def update_reviews(tar, date, verbose, ext_id):
             const_review_url(),
             data=const_review_payload(ext_id, "0", "100"),
             timeout=10)
-        logtxt = logmsg(verbose, logtxt, "{}/".format(str(res.status_code)))
+        logging.info(8 * " " +
+                     "* review page 0-100: {}".format(str(res.status_code)))
         store_request_text(tar, date, 'reviews000-099.text', res)
         pages += [res.text]
@@ -346,7 +340,8 @@ def update_reviews(tar, date, verbose, ext_id):
             const_review_url(),
             data=const_review_payload(ext_id, "100", "100"),
             timeout=10)
-        logtxt = logmsg(verbose, logtxt, "{}/".format(str(res.status_code)))
+        logging.info(8 * " " + "* review page 100-200: {}".format(
+            str(res.status_code)))
         store_request_text(tar, date, 'reviews100-199.text', res)
         pages += [res.text]
@@ -359,21 +354,17 @@ def update_reviews(tar, date, verbose, ext_id):
                 const_review_search_url(),
                 data=const_review_search_payload(ext_id_author_tups),
                 timeout=10)
-            logtxt = logmsg(verbose, logtxt, "{}".format(str(res.status_code)))
+            logging.info(8 * " " + "* review page replies: {}".format(
+                str(res.status_code)))
             store_request_text(tar, date, 'reviewsreplies.text', res)
-        else:
-            logtxt = logmsg(verbose, logtxt, "-")
     except Exception as e:
-        logtxt = logmsg(verbose, logtxt, " / Exception: {}\n".format(str(e)))
+        logging.exception("Exception when updating reviews")
         write_text(tar, date, 'reviews.html.exception', traceback.format_exc())
-        return RequestResult(res, e), logtxt
-    logtxt = logmsg(verbose, logtxt, "\n")
-    return RequestResult(res), logtxt
+        return RequestResult(res, e)
+    return RequestResult(res)


-def update_support(tar, date, verbose, ext_id):
-    dir = os.path.join(os.path.splitext(tar)[0], date)
-    logtxt = logmsg(verbose, "", " * support page: ")
+def update_support(tar, date, ext_id):
     res = None
     try:
         pages = []
@@ -383,7 +374,8 @@ def update_support(tar, date, verbose, ext_id):
             const_support_url(),
             data=const_support_payload(ext_id, "0", "100"),
             timeout=10)
-        logtxt = logmsg(verbose, logtxt, "{}/".format(str(res.status_code)))
+        logging.info(8 * " " +
+                     "* support page 0-100: {}".format(str(res.status_code)))
         store_request_text(tar, date, 'support000-099.text', res)
         pages += [res.text]
@@ -392,7 +384,8 @@ def update_support(tar, date, verbose, ext_id):
             const_support_url(),
             data=const_support_payload(ext_id, "100", "100"),
             timeout=10)
-        logtxt = logmsg(verbose, logtxt, "{}/".format(str(res.status_code)))
+        logging.info(8 * " " +
+                     "* support page 100-200: {}".format(str(res.status_code)))
         store_request_text(tar, date, 'support100-199.text', res)
         pages += [res.text]
@@ -405,20 +398,19 @@ def update_support(tar, date, verbose, ext_id):
                 const_review_search_url(),
                 data=const_review_search_payload(ext_id_author_tups),
                 timeout=10)
-            logtxt = logmsg(verbose, logtxt, "{}".format(str(res.status_code)))
+            logging.info(8 * " " + "* support page replies: {}".format(
+                str(res.status_code)))
             store_request_text(tar, date, 'supportreplies.text', res)
-        else:
-            logtxt = logmsg(verbose, logtxt, "-")
     except Exception as e:
-        logtxt = logmsg(verbose, logtxt, " / Exception: {}\n".format(str(e)))
+        logging.exception("Exception when updating support pages")
         write_text(tar, date, 'support.html.exception', traceback.format_exc())
-        return RequestResult(res, e), logtxt
-    logtxt = logmsg(verbose, logtxt, "\n")
-    return RequestResult(res), logtxt
+        return RequestResult(res, e)
+    return RequestResult(res)


-def update_extension(archivedir, verbose, forums, ext_id):
-    logtxt = logmsg(verbose, "", " Updating extension {}".format(ext_id))
+def update_extension(archivedir, forums, ext_id):
+    logging.info(4 * " " + "Updating extension {}{}".format(
+        ext_id, " (including forums)" if forums else ""))
     is_new = False
     tar_exception = None
     sql_exception = None
@@ -426,9 +418,6 @@ def update_extension(archivedir, verbose, forums, ext_id):
     tmptardir = ""
     start = time.time()

-    if forums:
-        logtxt = logmsg(verbose, logtxt, " (including forums)")
-    logtxt = logmsg(verbose, logtxt, "\n")
     date = datetime.datetime.now(datetime.timezone.utc).isoformat()
     tardir = os.path.join(archivedir, get_local_archive_dir(ext_id), ext_id)
@@ -437,43 +426,26 @@ def update_extension(archivedir, verbose, forums, ext_id):
     try:
         tmpdir = tempfile.mkdtemp()
         tmptardir = os.path.join(tmpdir, ext_id)
-        logtxt = logmsg(verbose, logtxt,
-                        " * tmptardir = {}\n".format(tmptardir))
+        logging.info(8 * " " + "* tmptardir = {}".format(tmptardir))
         os.makedirs(
             os.path.join(archivedir, get_local_archive_dir(ext_id)),
             exist_ok=True)
     except Exception as e:
-        logtxt = logmsg(verbose, logtxt,
-                        " * FATAL: cannot create tmpdir")
-        logtxt = logmsg(verbose, logtxt, " / Exception: {}\n".format(str(e)))
+        logging.exception(8 * " " + "* FATAL: cannot create tmpdir")
         tar_exception = e
-        logtxt = logmsg(
-            verbose,
-            logtxt,
-            " * Duration: {}\n".format(
-                datetime.timedelta(seconds=int(time.time() - start))))
-        log(verbose, logtxt)
-        return UpdateResult(ext_id, is_new, tar_exception, None,
-                            None, None, None, sql_exception,
-                            False)
+        return UpdateResult(ext_id, is_new, tar_exception, None, None, None,
+                            None, sql_exception, False)

-    res_overview, msg_overview = update_overview(tmptardir, date, verbose,
-                                                 ext_id)
+    res_overview = update_overview(tmptardir, date, ext_id)
     res_reviews = None
-    msg_reviews = ""
     res_support = None
-    msg_support = ""
     if forums:
-        res_reviews, msg_reviews = update_reviews(tmptardir, date, verbose,
-                                                  ext_id)
+        res_reviews = update_reviews(tmptardir, date, ext_id)

-    res_crx, msg_crx = update_crx(archivedir, tmptardir, verbose, ext_id, date)
+    res_crx = update_crx(archivedir, tmptardir, ext_id, date)

     if forums:
-        res_support, msg_support = update_support(tmptardir, date, verbose,
-                                                  ext_id)
-    logtxt = logtxt + msg_overview + msg_crx + msg_reviews + msg_support
+        res_support = update_support(tmptardir, date, ext_id)

     backup = False
     if backup:
@@ -490,11 +462,8 @@ def update_extension(archivedir, verbose, forums, ext_id):
             if os.path.exists(tar):
                 shutil.copyfile(tar, tardir + ".bak.tar")
         except Exception as e:
-            logtxt = logmsg(
-                verbose, logtxt,
-                " * FATAL: cannot rename old tar archive")
-            logtxt = logmsg(verbose, logtxt,
-                            " / Exception: {}\n".format(str(e)))
+            logging.exception(8 * " " +
+                              "* FATAL: cannot rename old tar archive")
             tar_exception = e
             try:
                 write_text(tardir, date, ext_id + ".tar.rename.exception",
@@ -509,9 +478,7 @@ def update_extension(archivedir, verbose, forums, ext_id):
         ar.add(tmptardir, arcname=ext_id)
         ar.close()
     except Exception as e:
-        logtxt = logmsg(verbose, logtxt,
-                        " * FATAL: cannot create tar archive")
-        logtxt = logmsg(verbose, logtxt, " / Exception: {}\n".format(str(e)))
+        logging.exception(8 * " " + "* FATAL: cannot create tar archive")
         tar_exception = e
         try:
             write_text(tardir, date, ext_id + ".tar.create.exception",
@@ -520,17 +487,12 @@ def update_extension(archivedir, verbose, forums, ext_id):
             pass

     try:
-        logtxt = logmsg(verbose, logtxt, " * Updating db...\n")
+        logging.info(8 * " " + "* Updating db...")
         db_path = db_file(archivedir, ext_id)
-        msg_updatesqlite = update_sqlite_incremental(
-            db_path, tmptardir, ext_id, date, verbose, 15 * " ")
-        logtxt = logmsg(verbose, logtxt, msg_updatesqlite)
+        update_sqlite_incremental(db_path, tmptardir, ext_id, date)
         sql_success = True
     except Exception as e:
-        logtxt = logmsg(verbose, logtxt,
-                        " * Exception during update of sqlite db ")
-        logtxt = logmsg(verbose, logtxt, " / Exception: {}\n".format(str(e)))
+        logging.exception(8 * " " + "* Exception during update of sqlite db")
         sql_exception = e

         try:
@@ -541,9 +503,7 @@ def update_extension(archivedir, verbose, forums, ext_id):
     try:
         shutil.rmtree(path=tmpdir)
     except Exception as e:
-        logtxt = logmsg(verbose, logtxt,
-                        " * FATAL: cannot remove archive directory")
-        logtxt = logmsg(verbose, logtxt, " / Exception: {}\n".format(str(e)))
+        logging.exception(8 * " " + "* FATAL: cannot remove archive directory")
        tar_exception = e
         try:
             write_text(tardir, date, ext_id + ".dir.remove.exception",
@@ -551,49 +511,41 @@ def update_extension(archivedir, verbose, forums, ext_id):
         except Exception:
             pass

-    logtxt = logmsg(
-        verbose,
-        logtxt,
-        " * Duration: {}\n".format(
-            datetime.timedelta(seconds=int(time.time() - start))))
-    log(verbose, logtxt)
+    logging.info(8 * " " + "* Duration: {}".format(
+        datetime.timedelta(seconds=int(time.time() - start))))
     return UpdateResult(ext_id, is_new, tar_exception, res_overview, res_crx,
                         res_reviews, res_support, sql_exception, sql_success)


-def update_extensions(archivedir, verbose, parallel, forums_ext_ids, ext_ids):
+def update_extensions(archivedir, parallel, forums_ext_ids, ext_ids):
     ext_with_forums = []
     ext_without_forums = []
     ext_ids = list(set(ext_ids) - set(forums_ext_ids))
     forums_ext_ids = list(set(forums_ext_ids))
-    log(verbose, "Updating {} extensions ({} including forums)\n".format(
-        len(ext_ids), len(forums_ext_ids)))
+    logging.info("Updating {} extensions ({} including forums)".format(
+        len(ext_ids), len(forums_ext_ids)))
     # First, update extensions with forums sequentially (and with delays) to
     # avoid running into Googles DDOS detection.
-    log(verbose,
-        " Updating {} extensions including forums (sequentially))\n".format(
-            len(forums_ext_ids)))
+    logging.info(2 * " " +
                 "Updating {} extensions including forums (sequentially))".
+                 format(len(forums_ext_ids)))
     ext_with_forums = list(
-        map(
-            partial(update_extension, archivedir, verbose, True),
-            forums_ext_ids))
+        map(partial(update_extension, archivedir, True), forums_ext_ids))

     # Second, update extensions without forums parallel to increase speed.
     parallel_ids = list(set(ext_ids) - set(forums_ext_ids))
-    log(verbose,
-        " Updating {} extensions excluding forums (parallel))\n".format(
-            len(parallel_ids)))
+    logging.info(2 * " " +
+                 "Updating {} extensions excluding forums (parallel))".format(
+                     len(parallel_ids)))
     with Pool(parallel) as p:
         ext_without_forums = list(
-            p.map(
-                partial(update_extension, archivedir, verbose, False),
-                parallel_ids))
+            p.map(partial(update_extension, archivedir, False), parallel_ids))

     return ext_with_forums + ext_without_forums


-def get_existing_ids(archivedir, verbose):
+def get_existing_ids(archivedir):
     byte = '[0-9a-z][0-9a-z][0-9a-z][0-9a-z][0-9a-z][0-9a-z][0-9a-z][0-9a-z]'
     word = byte + byte + byte + byte
     return list(
@@ -601,7 +553,7 @@ def get_existing_ids(archivedir, verbose):
             glob.glob(os.path.join(archivedir, "*", word + ".tar"))))


-def get_forum_ext_ids(confdir, verbose):
+def get_forum_ext_ids(confdir):
     with open(os.path.join(confdir, "forums.conf")) as f:
         ids = f.readlines()
     r = re.compile('^[a-p]+$')


@@ -137,6 +137,7 @@ def jsloc_timeout():
 def const_basedir():
     """Top-level directory for the extension crawler archive."""
     return "archive"

+
 def const_parallel_downloads():
     """Number of parallel downloads."""
     return 36
@@ -145,6 +146,9 @@ def const_verbose():
     """Default verbosity."""
     return True

+def const_log_format():
+    return '%(process)s %(asctime)s %(message)s'
+
 def const_discover():
     """Default configuration of discovery mode"""
     return False
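The new const_log_format() centralizes the record layout for all entry points; the %(process)s field tags each record with the emitting process id. A sketch of how it is consumed (the crawler script below wires it up exactly like this, deriving the level from the existing verbose option; const_log_format is inlined here only to keep the snippet self-contained):

    import logging

    def const_log_format():
        return '%(process)s %(asctime)s %(message)s'

    verbose = True  # stand-in for the value parsed from the command line
    loglevel = logging.INFO if verbose else logging.WARNING
    logging.basicConfig(level=loglevel, format=const_log_format())
    logging.info("Configuration:")  # e.g. "12345 2017-08-29 22:30:01,123 Configuration:"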


@@ -23,7 +23,7 @@ import re
 from functools import reduce
 import requests
 import ExtensionCrawler.config
-from ExtensionCrawler.util import log
+import logging


 def crawl_nearly_all_of_ext_ids():
@@ -54,16 +54,15 @@ def crawl_nearly_all_of_ext_ids():
     return [re.search("[a-z]{32}", url).group(0) for url in overview_urls]


-def get_new_ids(verbose, known_ids):
+def get_new_ids(known_ids):
     """Discover new extension ids."""
-    log(verbose, "Discovering new ids ... \n")
+    logging.info("Discovering new ids ...")
     discovered_ids = []
     try:
         discovered_ids = ExtensionCrawler.discover.crawl_nearly_all_of_ext_ids()
-    except Exception as ex:
-        log(verbose,
-            " EXCEPTION during discovering of new ids: {}\n".format(str(ex)))
+    except Exception:
+        logging.exception("Exception when discovering new ids")
     new_ids = list(set(discovered_ids) - set(known_ids))
-    log(verbose, " Discovered {} new extensions (out of {})\n".format(
-        len(new_ids), len(discovered_ids)))
+    logging.info(2 * " " + "Discovered {} new extensions (out of {})".format(
+        len(new_ids), len(discovered_ids)))
     return new_ids


@@ -31,17 +31,16 @@ import json
 import os
 import glob
 import datetime
+import logging


-def get_etag(ext_id, datepath, con, verbose, indent):
-    txt = ""
-
+def get_etag(ext_id, datepath, con):
     # Trying to parse etag file
     etagpath = next(
         iter(glob.glob(os.path.join(datepath, "*.crx.etag"))), None)
     if etagpath:
         with open(etagpath) as f:
-            return f.read(), txt
+            return f.read()

     # Trying to parse header file for etag
     headerpath = next(
@@ -52,12 +51,10 @@ def get_etag(ext_id, datepath, con, verbose, indent):
         try:
             headers = eval(content)
             if "ETag" in headers:
-                return headers["ETag"], txt
+                return headers["ETag"]
         except Exception:
-            txt = logmsg(
-                verbose, txt,
-                indent + "* WARNING: could not parse crx header file")
-            pass
+            logging.warning(16 * " " +
+                            "* WARNING: could not parse crx header file")

     # Trying to look up previous etag in database
     linkpath = next(
@@ -70,9 +67,9 @@ def get_etag(ext_id, datepath, con, verbose, indent):
             result = con.get_most_recent_etag(ext_id,
                                               con.convert_date(linked_date))
             if result is not None:
-                return result, txt
+                return result

-    return None, txt
+    return None


 def get_overview_status(datepath):
@@ -102,9 +99,8 @@ def get_crx_status(datepath):
         return int(f.read())


-def parse_and_insert_overview(ext_id, date, datepath, con, verbose, indent):
-    txt = ""
-
+def parse_and_insert_overview(ext_id, date, datepath, con):
+    logging.info(16 * " " + "- parsing overview file")
     overview_path = os.path.join(datepath, "overview.html")
     if os.path.exists(overview_path):
         with open(overview_path) as overview_file:
@@ -162,8 +158,7 @@ def parse_and_insert_overview(ext_id, date, datepath, con, verbose, indent):
         last_updated = str(last_updated_parent.contents[
             0]) if last_updated_parent else None

-        etag, etag_msg = get_etag(ext_id, datepath, con, verbose, indent)
-        txt = logmsg(verbose, txt, etag_msg)
+        etag = get_etag(ext_id, datepath, con)

         match = re.search(
             """<Attribute name="item_category">(.*?)</Attribute>""",
@@ -194,18 +189,15 @@ def parse_and_insert_overview(ext_id, date, datepath, con, verbose, indent):
                 date=con.convert_date(date),
                 category=category)

-    return txt
-

-def parse_and_insert_crx(ext_id, date, datepath, con, verbose, indent):
-    txt = ""
+def parse_and_insert_crx(ext_id, date, datepath, con):
+    logging.info(16 * " " + "- parsing crx file")
     crx_path = next(iter(glob.glob(os.path.join(datepath, "*.crx"))), None)
     if crx_path:
         filename = os.path.basename(crx_path)
         with ZipFile(crx_path) as f:
-            etag, etag_msg = get_etag(ext_id, datepath, con, verbose, indent)
-            txt = logmsg(verbose, txt, etag_msg)
+            etag = get_etag(ext_id, datepath, con)
             size = os.path.getsize(crx_path)
             public_key = read_crx(crx_path).public_key
@@ -267,7 +259,6 @@ def parse_and_insert_crx(ext_id, date, datepath, con, verbose, indent):
                     md5=js_file_info['md5'],
                     size=js_file_info['size'],
                     version=js_file_info['ver'])
-    return txt


 def get(d, k):
@@ -276,6 +267,7 @@ def get(d, k):


 def parse_and_insert_review(ext_id, date, reviewpath, con):
+    logging.info(16 * " " + "- parsing review file")
     with open(reviewpath) as f:
         content = f.read()
         stripped = content[content.find('{"'):]
@@ -311,6 +303,7 @@ def parse_and_insert_review(ext_id, date, reviewpath, con):


 def parse_and_insert_support(ext_id, date, supportpath, con):
+    logging.info(16 * " " + "- parsing support file")
     with open(supportpath) as f:
         content = f.read()
         stripped = content[content.find('{"'):]
@@ -345,15 +338,13 @@ def parse_and_insert_support(ext_id, date, supportpath, con):
     con.insertmany("support", results)


-def parse_and_insert_replies(ext_id, date, repliespath, con, verbose, indent):
+def parse_and_insert_replies(ext_id, date, repliespath, con):
+    logging.info(16 * " " + "- parsing reply file")
     with open(repliespath) as f:
         d = json.load(f)
         if not "searchResults" in d:
-            txt = logmsg(
-                verbose, "",
-                indent + "* WARNING: there are no search results in {}\n".
-                format(repliespath))
-            return txt
+            logging.warning("* WARNING: there are no search results in {}".
+                            format(repliespath))
         results = []
         for result in d["searchResults"]:
             if "annotations" not in result:
@@ -388,6 +379,7 @@ def parse_and_insert_replies(ext_id, date, repliespath, con, verbose, indent):


 def parse_and_insert_status(ext_id, date, datepath, con):
+    logging.info(16 * " " + "- parsing status file")
     overview_status = get_overview_status(datepath)
     crx_status = get_crx_status(datepath)
@@ -406,16 +398,10 @@ def parse_and_insert_status(ext_id, date, datepath, con):
         overview_exception=overview_exception)


-def update_sqlite_incremental(db_path, tmptardir, ext_id, date, verbose,
-                              indent):
-    txt = ""
-    indent2 = indent + 4 * " "
-
+def update_sqlite_incremental(db_path, tmptardir, ext_id, date):
+    logging.info(12 * " " + "- parsing data from {}".format(date))
     datepath = os.path.join(tmptardir, date)

-    txt = logmsg(verbose, txt,
-                 indent + "- updating with data from {}\n".format(date))
-
     if const_use_mysql():
         # Don't forget to create a ~/.my.cnf file with the credentials
         backend = MysqlBackend(read_default_file=const_mysql_config_file())
@@ -423,29 +409,22 @@ def update_sqlite_incremental(db_path, tmptardir, ext_id, date, verbose,
         backend = SqliteBackend(db_path)

     with backend as con:
-        etag, etag_msg = get_etag(ext_id, datepath, con, verbose, indent2)
-        txt = logmsg(verbose, txt, etag_msg)
+        etag = get_etag(ext_id, datepath, con)

         if etag:
             try:
-                crx_msg = parse_and_insert_crx(ext_id, date, datepath, con,
-                                               verbose, indent2)
-                txt = logmsg(verbose, txt, crx_msg)
+                parse_and_insert_crx(ext_id, date, datepath, con)
             except zipfile.BadZipfile as e:
-                txt = logmsg(
-                    verbose, txt, indent2 +
-                    "* WARNING: the found crx file is not a zip file, exception: "
-                )
-                txt = logmsg(verbose, txt, str(e))
-                txt = logmsg(verbose, txt, "\n")
+                logging.warning(
+                    16 * " " +
+                    "* WARNING: the found crx file is not a zip file, exception: {}".
+                    format(str(e)))
         else:
             crx_status = get_crx_status(datepath)
             if crx_status != 401 and crx_status != 204 and crx_status != 404:
-                txt = logmsg(verbose, txt,
-                             indent2 + "* WARNING: could not find etag\n")
+                logging.warning(16 * " " + "* WARNING: could not find etag")

-        parse_and_insert_overview(ext_id, date, datepath, con, verbose,
-                                  indent2)
+        parse_and_insert_overview(ext_id, date, datepath, con)
         parse_and_insert_status(ext_id, date, datepath, con)

         reviewpaths = glob.glob(os.path.join(datepath, "reviews*-*.text"))
@@ -453,34 +432,24 @@ def update_sqlite_incremental(db_path, tmptardir, ext_id, date, verbose,
             try:
                 parse_and_insert_review(ext_id, date, reviewpath, con)
             except json.decoder.JSONDecodeError as e:
-                txt = logmsg(
-                    verbose, txt,
-                    indent2 + "* Could not parse review file, exception: ")
-                txt = logmsg(verbose, txt, str(e))
-                txt = logmsg(verbose, txt, "\n")
+                logging.warning(16 * " " +
+                                "* Could not parse review file, exception: {}".
+                                format(str(e)))

         supportpaths = glob.glob(os.path.join(datepath, "support*-*.text"))
         for supportpath in supportpaths:
             try:
                 parse_and_insert_support(ext_id, date, supportpath, con)
             except json.decoder.JSONDecodeError as e:
-                txt = logmsg(
-                    verbose, txt,
-                    indent2 + "* Could not parse support file, exception: ")
-                txt = logmsg(verbose, txt, str(e))
-                txt = logmsg(verbose, txt, "\n")
+                logging.warning(
+                    16 * " " + "* Could not parse support file, exception: {}".
+                    format(str(e)))

         repliespaths = glob.glob(os.path.join(datepath, "*replies.text"))
         for repliespath in repliespaths:
             try:
-                reply_txt = parse_and_insert_replies(ext_id, date, repliespath,
-                                                     con, verbose, indent)
-                txt = logmsg(verbose, txt, reply_txt)
+                parse_and_insert_replies(ext_id, date, repliespath, con)
             except json.decoder.JSONDecodeError as e:
-                txt = logmsg(
-                    verbose, txt,
-                    indent2 + "* Could not parse reply file, exception: ")
-                txt = logmsg(verbose, txt, str(e))
-                txt = logmsg(verbose, txt, "\n")
-
-    return txt
+                logging.warning(16 * " " +
+                                "* Could not parse reply file, exception: {}".
+                                format(str(e)))


@@ -18,7 +18,6 @@
 """ Various utility methods."""

-import sys
 from time import sleep
 from random import random
@@ -27,19 +26,6 @@ def google_dos_protection(maxrange=0.3):
        to avoid Google's bot detection"""
     sleep(0.5+(random()*maxrange))

-def log(verbose, msg):
-    """Print log message."""
-    if verbose:
-        sys.stdout.write(msg)
-        sys.stdout.flush()
-
-
-def logmsg(verbose, msg1, msg2):
-    """Append msg2 to log stream msg1."""
-    if verbose:
-        return msg1 + msg2
-    else:
-        return msg1
-
 def value_of(value, default):
     """Get value or default value if None."""
     if value is not None and value is not "":

crawler

@@ -25,11 +25,12 @@ import datetime
 import time
 import getopt
 import sqlite3
+import logging
 from functools import reduce

 from ExtensionCrawler.discover import get_new_ids
 from ExtensionCrawler.archive import get_forum_ext_ids, get_existing_ids, update_extensions
-from ExtensionCrawler.util import log
-import ExtensionCrawler.config
+from ExtensionCrawler.config import *


 def write_log(dirname, fname, text):
     """Write text into the file with name fname in directory dirname."""
@@ -92,52 +93,44 @@ def log_failures_to_file(dirname, today, res):
     write_log(dirname, today + "-sql-not-updated.log", sql_success)


-def log_summary(verbose, res, stderr=False, runtime=0):
-    """Log brief result summary to log stream of stderr."""
-
-    def printlog(msg):
-        """Print log message."""
-        if stderr:
-            sys.stderr.write(msg)
-        else:
-            log(verbose, msg)
+def log_summary(res, runtime=0):
+    """Log brief result summary."""

     corrupt_tar_archives = list(filter(lambda x: x.corrupt_tar(), res))

-    printlog("\n")
-    printlog("Summary:\n")
-    printlog(" Updated {} out of {} extensions successfully\n".format(
+    logging.info("Summary:")
+    logging.info(" Updated {} out of {} extensions successfully".format(
         str(len(list(filter(lambda x: x.is_ok(), res)))), str(len(res))))
-    printlog(" Updated extensions: {:8d}\n".format(
+    logging.info(" Updated extensions: {:8d}".format(
         len(list(filter(lambda x: x.is_ok() and not x.not_modified(), res)))))
-    printlog(" Updated SQL databases: {:8d}\n".format(
+    logging.info(" Updated SQL databases: {:8d}".format(
         len(list(filter(lambda x: x.sql_success(), res)))))
-    printlog(" New extensions: {:8d}\n".format(
+    logging.info(" New extensions: {:8d}".format(
         len(list(filter(lambda x: x.is_new(), res)))))
-    printlog(" Not authorized: {:8d}\n".format(
+    logging.info(" Not authorized: {:8d}".format(
         len(list(filter(lambda x: x.not_authorized(), res)))))
-    printlog(" Raised Google DDOS: {:8d}\n".format(
+    logging.info(" Raised Google DDOS: {:8d}".format(
         len(list(filter(lambda x: x.raised_google_ddos(), res)))))
-    printlog(" Not modified archives: {:8d}\n".format(
+    logging.info(" Not modified archives: {:8d}".format(
         len(list(filter(lambda x: x.not_modified(), res)))))
-    printlog(" Extensions not in store: {:8d}\n".format(
+    logging.info(" Extensions not in store: {:8d}".format(
         len(list(filter(lambda x: x.not_in_store(), res)))))
-    printlog(" Unknown exception: {:8d}\n".format(
+    logging.info(" Unknown exception: {:8d}".format(
         len(list(filter(lambda x: x.has_exception(), res)))))
-    printlog(" Corrupt tar archives: {:8d}\n".format(
-        len(corrupt_tar_archives)))
-    printlog(" SQL exception: {:8d}\n".format(
+    logging.info(
+        " Corrupt tar archives: {:8d}".format(len(corrupt_tar_archives)))
+    logging.info(" SQL exception: {:8d}".format(
         len(list(filter(lambda x: x.sql_exception(), res)))))
-    printlog(" Total runtime: {}\n".format(
+    logging.info(" Total runtime: {}".format(
         str(datetime.timedelta(seconds=int(runtime)))))

     if corrupt_tar_archives != []:
-        printlog("\n\n")
-        printlog("List of extensions with corrupted files/archives:\n")
+        logging.info("")
+        logging.info("List of extensions with corrupted files/archives:")
         list(
-            map(lambda x: printlog(" " + x.id + ": " + str(x.exception) + "\n"),
+            map(lambda x: logging.info(" " + x.id + ": " + str(x.exception)),
                 corrupt_tar_archives))
-        printlog("\n")
+        logging.info("")
def helpmsg(): def helpmsg():
@@ -149,26 +142,24 @@ def helpmsg():
     print(" -a=<DIR> archive directory")


-def print_config(verbose, basedir, archive_dir, conf_dir, discover, parallel):
+def print_config(basedir, archive_dir, conf_dir, discover, parallel):
     """Print current configuration."""
-    log(verbose, "Configuration:\n")
-    log(verbose, " Base dir: {}\n".format(basedir))
-    log(verbose,
-        " Archive directory: {}\n".format(archive_dir))
-    log(verbose, " Configuration directory: {}\n".format(conf_dir))
-    log(verbose, " Discover new extensions: {}\n".format(discover))
-    log(verbose, " Max num. of concurrent downloads: {}\n".format(parallel))
-    log(verbose, " SQLite 3 version: {}\n".format(
-        sqlite3.sqlite_version))
-    log(verbose, "\n")
+    logging.info("Configuration:")
+    logging.info(" Base dir: {}".format(basedir))
+    logging.info(" Archive directory: {}".format(archive_dir))
+    logging.info(" Configuration directory: {}".format(conf_dir))
+    logging.info(" Discover new extensions: {}".format(discover))
+    logging.info(" Max num. of concurrent downloads: {}".format(parallel))
+    logging.info(" SQLite 3 version: {}".format(
        sqlite3.sqlite_version))


 def parse_args(argv):
     """Parse command line arguments. """
-    basedir = ExtensionCrawler.config.const_basedir()
-    parallel = ExtensionCrawler.config.const_parallel_downloads()
-    verbose = ExtensionCrawler.config.const_verbose()
-    discover = ExtensionCrawler.config.const_discover()
+    basedir = const_basedir()
+    parallel = const_parallel_downloads()
+    verbose = const_verbose()
+    discover = const_discover()
     try:
         opts, _ = getopt.getopt(argv, "hsda:p:", ["archive=", 'parallel='])
     except getopt.GetoptError:
@@ -191,9 +182,16 @@ def parse_args(argv):

 def main(argv):
     """Main function of the extension crawler."""
     today = datetime.datetime.now(datetime.timezone.utc).isoformat()
     basedir, parallel, verbose, discover = parse_args(argv)

+    if verbose:
+        loglevel = logging.INFO
+    else:
+        loglevel = logging.WARNING
+
+    logging.basicConfig(level=loglevel, format=const_log_format())
+
     archive_dir = os.path.join(basedir, "data")
     os.makedirs(archive_dir, exist_ok=True)
     conf_dir = os.path.join(basedir, "conf")
@@ -204,41 +202,38 @@ def main(argv):
     start_time = time.time()

-    print_config(verbose, basedir, archive_dir, conf_dir, discover, parallel)
+    print_config(basedir, archive_dir, conf_dir, discover, parallel)

-    forum_ext_ids = get_forum_ext_ids(conf_dir, verbose)
-    known_ids = list(
-        set(get_existing_ids(archive_dir, verbose)) | set(forum_ext_ids))
+    forum_ext_ids = get_forum_ext_ids(conf_dir)
+    known_ids = list(set(get_existing_ids(archive_dir)) | set(forum_ext_ids))
     discovered_ids = []
     if discover:
-        discovered_ids = get_new_ids(verbose, known_ids)
+        discovered_ids = get_new_ids(known_ids)
     ext_ids = list(set(discovered_ids) | set(known_ids))

     discovered_ids = None
     known_ids = None

-    res = update_extensions(archive_dir, verbose, parallel, forum_ext_ids,
-                            ext_ids)
+    res = update_extensions(archive_dir, parallel, forum_ext_ids, ext_ids)

     # We re-try (once) the extensions with unknown exceptions, as
     # they are often temporary
     has_exception = list(filter(lambda x: x.has_exception(), res))
     if has_exception != []:
-        log(verbose,
-            " {} extensions with unknown exceptions, start another try ...\n".
-            format(str(len(has_exception))))
+        logging.info(
+            " {} extensions with unknown exceptions, start another try ...".
+            format(str(len(has_exception))))
         has_exception_ids = list(map(lambda x: x.id, has_exception))
         forum_ext_ids_except = list(
             set(forum_ext_ids).intersection(set(has_exception_ids)))
         ext_ids_except = sorted(
             list(set(has_exception_ids) - set(forum_ext_ids_except)))
-        res_update = update_extensions(archive_dir, verbose, parallel,
-                                       forum_ext_ids_except, ext_ids_except)
+        res_update = update_extensions(archive_dir, parallel,
+                                       forum_ext_ids_except, ext_ids_except)
         res = list(set(res) - set(has_exception)) + res_update

     end_time = time.time()
-    log_summary(verbose, res, False, end_time - start_time)
-    log_summary(verbose, res, True, end_time - start_time)
+    log_summary(res, end_time - start_time)
     log_failures_to_file(log_dir, today, res)


@@ -22,12 +22,13 @@ import sys
 import tarfile
 import time
 import tempfile
-import traceback
 import fnmatch
-from multiprocessing import Pool, Lock
+from multiprocessing import Pool
 from functools import partial
+import logging

 from ExtensionCrawler.archive import update_sqlite_incremental
+from ExtensionCrawler.config import *


 def help():
@@ -42,20 +43,6 @@ def help():
     print(" -N <MAXTASKID> ")


-def guarded_stdout(string):
-    lock.acquire()
-    sys.stdout.write(string)
-    sys.stdout.flush()
-    lock.release()
-
-
-def guarded_stderr(string):
-    lock.acquire()
-    sys.stderr.write(string)
-    sys.stderr.flush()
-    lock.release()
-
-
 def process_id(dbbasedir, path):
     start = time.time()
     with tempfile.TemporaryDirectory() as tmpdir:
@@ -63,7 +50,7 @@ def process_id(dbbasedir, path):
             t.extractall(tmpdir)

         extid = os.listdir(tmpdir)[0]
-        guarded_stdout("Processing {}\n".format(extid))
+        logging.info("Processing {}".format(extid))
         dbpath = os.path.join(dbbasedir, extid + ".sqlite")
         if os.path.exists(dbpath):
             os.remove(dbpath)
@@ -71,12 +58,11 @@ def process_id(dbbasedir, path):
         for date in sorted(os.listdir(iddir)):
             try:
-                update_sqlite_incremental(dbpath, iddir, extid, date, True,
-                                          "")
+                update_sqlite_incremental(dbpath, iddir, extid, date)
             except Exception:
-                guarded_stderr("Exception when handling {} on {}:\n{}\n".
-                               format(extid, date, traceback.format_exc()))
+                logging.exception("Exception when handling {} on {}".
+                                  format(extid, date))

-    guarded_stdout("Finished {} in {}s\n".format(extid, time.time() - start))
+    logging.info("Finished {} in {}s".format(extid, time.time() - start))


 def find(archive, pattern):
@@ -97,11 +83,6 @@ def find_from_file(archive, extidlistfile):
                 yield os.path.join(root, file)


-def init(l):
-    global lock
-    lock = l
-
-
 def parse_args(argv):
     archive = "archive"
     parallel = 8
@@ -155,9 +136,11 @@ def parse_args(argv):

 def main(argv):
+    logging.basicConfig(level=logging.INFO, format=const_log_format())
     dbbasedir, paths, parallel = parse_args(argv)

-    with Pool(initializer=init, initargs=(Lock(), ), processes=parallel) as p:
+    with Pool(processes=parallel) as p:
         p.map(partial(process_id, dbbasedir), paths)
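With the Lock-guarded guarded_stdout/guarded_stderr helpers gone, pool workers now write through logging instead of an explicitly synchronized stdout. A small sketch of the resulting behavior, assuming the default fork start method on Linux so that workers inherit the root logger configured in main: records from different workers may still interleave in the stream, but the %(process)s field from const_log_format keeps every line attributable to its worker, which is what the removed lock used to guarantee per message.

    import logging
    from multiprocessing import Pool

    def work(n):
        # Each forked worker inherits the root logger configuration.
        logging.info("Processing %s", n)
        return n

    if __name__ == "__main__":
        logging.basicConfig(level=logging.INFO,
                            format='%(process)s %(asctime)s %(message)s')
        with Pool(processes=2) as p:
            p.map(work, range(4))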