Merge branch 'master' of logicalhacking.com:BrowserSecurity/ExtensionCrawler

Achim D. Brucker 2017-08-29 22:54:54 +01:00
commit 4f59c56e73
7 changed files with 228 additions and 340 deletions
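The bulk of this merge is a logging refactor: the hand-rolled log/logmsg helpers (removed from ExtensionCrawler/util.py in one of the hunks below) are replaced by the standard logging module, configured once through logging.basicConfig with the new const_log_format(). A minimal before/after sketch, using a placeholder status code and the repository's own helper names, purely for orientation:

# Old style (removed): accumulate a message string, emit it only if verbose.
logtxt = logmsg(verbose, "", " * overview page: ")
logtxt = logmsg(verbose, logtxt, "200\n")
log(verbose, logtxt)

# New style (added): log directly; verbosity is decided once on the root logger.
import logging
logging.basicConfig(level=logging.INFO, format=const_log_format())
logging.info(8 * " " + "* overview page: 200")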


@ -15,7 +15,6 @@
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
#
"""
Module for handling archives of the Browser Extension Crawler.
"""
@ -35,19 +34,20 @@ import datetime
import dateutil
import dateutil.parser
import requests
import logging
from ExtensionCrawler.config import (const_review_payload, const_review_search_url,
const_download_url, get_local_archive_dir,
const_overview_url, const_support_url,
const_support_payload, const_review_search_payload,
const_review_url)
from ExtensionCrawler.util import logmsg, google_dos_protection, log, value_of
from ExtensionCrawler.config import (
const_review_payload, const_review_search_url, const_download_url,
get_local_archive_dir, const_overview_url, const_support_url,
const_support_payload, const_review_search_payload, const_review_url)
from ExtensionCrawler.util import google_dos_protection, value_of
from ExtensionCrawler.sqlite import db_file, update_sqlite_incremental
class Error(Exception):
pass
class CrawlError(Error):
def __init__(self, extid, message, pagecontent=""):
self.extid = extid
@ -55,6 +55,7 @@ class CrawlError(Error):
self.pagecontent = pagecontent
super(CrawlError, self).__init__()
class RequestResult:
def __init__(self, response=None, exception=None):
if response is not None:
@ -97,38 +98,38 @@ class UpdateResult:
return self.new
def is_ok(self):
return (self.res_overview.is_ok() and
(self.res_crx.is_ok() or self.res_crx.not_modified()) and
((self.res_reviews is None) or self.res_reviews.is_ok()) and (
(self.res_support is None) or self.res_support.is_ok()))
return (self.res_overview.is_ok()
and (self.res_crx.is_ok() or self.res_crx.not_modified())
and ((self.res_reviews is None) or self.res_reviews.is_ok())
and ((self.res_support is None) or self.res_support.is_ok()))
def not_authorized(self):
return (self.res_overview.not_authorized() or
self.res_crx.not_authorized() or
(self.res_reviews is not None and
self.res_reviews.not_authorized()) or (
self.res_support is not None and
self.res_support.not_authorized()))
return (self.res_overview.not_authorized()
or self.res_crx.not_authorized()
or (self.res_reviews is not None
and self.res_reviews.not_authorized())
or (self.res_support is not None
and self.res_support.not_authorized()))
def not_in_store(self):
return (
self.res_overview.not_found() or self.res_crx.not_found() or
(self.res_reviews is not None and self.res_reviews.not_found()) or
(self.res_support is not None and self.res_support.not_found()))
return (self.res_overview.not_found() or self.res_crx.not_found() or
(self.res_reviews is not None and self.res_reviews.not_found())
or (self.res_support is not None
and self.res_support.not_found()))
def has_exception(self):
return (self.res_overview.has_exception() or
self.res_crx.has_exception() or
(self.res_reviews is not None and
self.res_reviews.has_exception()) or (
self.res_support is not None and
self.res_support.has_exception()))
return (self.res_overview.has_exception()
or self.res_crx.has_exception()
or (self.res_reviews is not None
and self.res_reviews.has_exception())
or (self.res_support is not None
and self.res_support.has_exception()))
def raised_google_ddos(self):
return ((self.res_reviews is not None and
self.res_reviews.not_available()) or
(self.res_support is not None and
self.res_support.not_available()))
return ((self.res_reviews is not None
and self.res_reviews.not_available())
or (self.res_support is not None
and self.res_support.not_available()))
def not_modified(self):
return self.res_crx.not_modified()
@ -167,8 +168,9 @@ def httpdate(dt):
"Jan", "Feb", "Mar", "Apr", "May", "Jun", "Jul", "Aug", "Sep", "Oct",
"Nov", "Dec"
][dt.month - 1]
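# Yields an RFC 1123 style HTTP date such as "Tue, 29 Aug 2017 21:54:54 GMT",
# the form expected by If-Modified-Since request headers.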
return "%s, %02d %s %04d %02d:%02d:%02d GMT" % (
weekday, dt.day, month, dt.year, dt.hour, dt.minute, dt.second)
return "%s, %02d %s %04d %02d:%02d:%02d GMT" % (weekday, dt.day, month,
dt.year, dt.hour,
dt.minute, dt.second)
def last_modified_utc_date(path):
@ -191,8 +193,8 @@ def last_crx(archivedir, extid, date=None):
t = tarfile.open(tar, 'r')
old_crxs = sorted([
x.name for x in t.getmembers()
if x.name.endswith(".crx") and x.size > 0 and (date is None or (
dateutil.parser.parse(
if x.name.endswith(".crx") and x.size > 0 and (
date is None or (dateutil.parser.parse(
os.path.split(os.path.split(x.name)[0])[1]) <= date))
])
t.close()
@ -217,20 +219,19 @@ def last_etag(archivedir, extid, crxfile):
return etag
def update_overview(tar, date, verbose, ext_id):
logtxt = logmsg(verbose, "", " * overview page: ")
def update_overview(tar, date, ext_id):
res = None
try:
res = requests.get(const_overview_url(ext_id), timeout=10)
logtxt = logmsg(verbose, logtxt, "{}".format(str(res.status_code)))
logging.info(8 * " " +
"* overview page: {}".format(str(res.status_code)))
store_request_text(tar, date, 'overview.html', res)
except Exception as e:
logtxt = logmsg(verbose, logtxt, " / Exception: {}\n".format(str(e)))
logging.exception("Exception when retrieving overview page")
write_text(tar, date, 'overview.html.exception',
traceback.format_exc())
return RequestResult(res, e), logtxt
logtxt = logmsg(verbose, logtxt, "\n")
return RequestResult(res), logtxt
return RequestResult(res, e)
return RequestResult(res)
def validate_crx_response(res, extid, extfilename):
@ -250,24 +251,23 @@ def validate_crx_response(res, extid, extfilename):
extfilename))
def update_crx(archivedir, tmptardir, verbose, ext_id, date):
def update_crx(archivedir, tmptardir, ext_id, date):
res = None
extfilename = "default_ext_archive.crx"
last_crx_file = last_crx(archivedir, ext_id)
last_crx_etag = last_etag(archivedir, ext_id, last_crx_file)
last_crx_http_date = last_modified_http_date(last_crx_file)
logtxt = logmsg(verbose, "",
" * crx archive (Last: {}): ".format(
value_of(last_crx_http_date, "n/a")))
headers = ""
if last_crx_file is not "":
headers = {'If-Modified-Since': last_crx_http_date}
try:
res = requests.get(const_download_url().format(ext_id),
stream=True,
headers=headers,
timeout=10)
logtxt = logmsg(verbose, logtxt, "{}\n".format(str(res.status_code)))
res = requests.get(
const_download_url().format(ext_id),
stream=True,
headers=headers,
timeout=10)
logging.info(8 * " " + "* crx archive (Last: {}): {}".format(
value_of(last_crx_http_date, "n/a"), str(res.status_code)))
extfilename = os.path.basename(res.url)
if re.search('&', extfilename):
extfilename = "default.crx"
@ -278,19 +278,17 @@ def update_crx(archivedir, tmptardir, verbose, ext_id, date):
timeout=10,
allow_redirects=True).headers.get('ETag')
write_text(tmptardir, date, extfilename + ".etag", etag)
logtxt = logmsg(verbose, logtxt, (
" - checking etag, last: {}\n" +
" current: {}\n").format(
last_crx_etag, etag))
logging.info(12 * " " +
"- checking etag, last: {}".format(last_crx_etag))
logging.info(12 * " " + " current: {}".format(etag))
if (etag is not "") and (etag != last_crx_etag):
logtxt = logmsg(
verbose, logtxt,
" - downloading due to different etags\n")
logging.info(12 * " " + "- downloading due to different etags")
res = requests.get(const_download_url().format(ext_id),
stream=True,
timeout=10)
res = requests.get(
const_download_url().format(ext_id),
stream=True,
timeout=10)
else:
write_text(tmptardir, date, extfilename + ".link",
os.path.join("..",
@ -306,28 +304,23 @@ def update_crx(archivedir, tmptardir, verbose, ext_id, date):
write_text(tmptardir, date, extfilename + ".etag",
res.headers.get("ETag"))
except Exception as e:
logtxt = logmsg(verbose, logtxt,
" - Exception: {}\n".format(str(e)))
logging.exception("Exception when updating crx")
write_text(tmptardir, date, extfilename + ".exception",
traceback.format_exc())
return RequestResult(res, e), logtxt
logtxt = logmsg(verbose, logtxt, "\n")
return RequestResult(res), logtxt
return RequestResult(res, e)
return RequestResult(res)
def iterate_authors(pages):
for page in pages:
json_page = json.loads(page[page.index("{\""):page.rindex("}}},") + 1])
for annotation in json_page["annotations"]:
if "attributes" in annotation and "replyExists" in annotation[
"attributes"] and annotation["attributes"]["replyExists"]:
if "attributes" in annotation and "replyExists" in annotation["attributes"] and annotation["attributes"]["replyExists"]:
yield (annotation["entity"]["author"],
annotation["entity"]["groups"])
def update_reviews(tar, date, verbose, ext_id):
dir = os.path.join(os.path.splitext(tar)[0], date)
logtxt = logmsg(verbose, "", " * review page: ")
def update_reviews(tar, date, ext_id):
res = None
try:
pages = []
@ -337,7 +330,8 @@ def update_reviews(tar, date, verbose, ext_id):
const_review_url(),
data=const_review_payload(ext_id, "0", "100"),
timeout=10)
logtxt = logmsg(verbose, logtxt, "{}/".format(str(res.status_code)))
logging.info(8 * " " +
"* review page 0-100: {}".format(str(res.status_code)))
store_request_text(tar, date, 'reviews000-099.text', res)
pages += [res.text]
@ -346,7 +340,8 @@ def update_reviews(tar, date, verbose, ext_id):
const_review_url(),
data=const_review_payload(ext_id, "100", "100"),
timeout=10)
logtxt = logmsg(verbose, logtxt, "{}/".format(str(res.status_code)))
logging.info(8 * " " + "* review page 100-200: {}".format(
str(res.status_code)))
store_request_text(tar, date, 'reviews100-199.text', res)
pages += [res.text]
@ -359,21 +354,17 @@ def update_reviews(tar, date, verbose, ext_id):
const_review_search_url(),
data=const_review_search_payload(ext_id_author_tups),
timeout=10)
logtxt = logmsg(verbose, logtxt, "{}".format(str(res.status_code)))
logging.info(8 * " " + "* review page replies: {}".format(
str(res.status_code)))
store_request_text(tar, date, 'reviewsreplies.text', res)
else:
logtxt = logmsg(verbose, logtxt, "-")
except Exception as e:
logtxt = logmsg(verbose, logtxt, " / Exception: {}\n".format(str(e)))
logging.exception("Exception when updating reviews")
write_text(tar, date, 'reviews.html.exception', traceback.format_exc())
return RequestResult(res, e), logtxt
logtxt = logmsg(verbose, logtxt, "\n")
return RequestResult(res), logtxt
return RequestResult(res, e)
return RequestResult(res)
def update_support(tar, date, verbose, ext_id):
dir = os.path.join(os.path.splitext(tar)[0], date)
logtxt = logmsg(verbose, "", " * support page: ")
def update_support(tar, date, ext_id):
res = None
try:
pages = []
@ -383,7 +374,8 @@ def update_support(tar, date, verbose, ext_id):
const_support_url(),
data=const_support_payload(ext_id, "0", "100"),
timeout=10)
logtxt = logmsg(verbose, logtxt, "{}/".format(str(res.status_code)))
logging.info(8 * " " +
"* support page 0-100: {}".format(str(res.status_code)))
store_request_text(tar, date, 'support000-099.text', res)
pages += [res.text]
@ -392,7 +384,8 @@ def update_support(tar, date, verbose, ext_id):
const_support_url(),
data=const_support_payload(ext_id, "100", "100"),
timeout=10)
logtxt = logmsg(verbose, logtxt, "{}/".format(str(res.status_code)))
logging.info(8 * " " +
"* support page 100-200: {}".format(str(res.status_code)))
store_request_text(tar, date, 'support100-199.text', res)
pages += [res.text]
@ -405,20 +398,19 @@ def update_support(tar, date, verbose, ext_id):
const_review_search_url(),
data=const_review_search_payload(ext_id_author_tups),
timeout=10)
logtxt = logmsg(verbose, logtxt, "{}".format(str(res.status_code)))
logging.info(8 * " " + "* support page replies: {}".format(
str(res.status_code)))
store_request_text(tar, date, 'supportreplies.text', res)
else:
logtxt = logmsg(verbose, logtxt, "-")
except Exception as e:
logtxt = logmsg(verbose, logtxt, " / Exception: {}\n".format(str(e)))
logging.exception("Exception when updating support pages")
write_text(tar, date, 'support.html.exception', traceback.format_exc())
return RequestResult(res, e), logtxt
logtxt = logmsg(verbose, logtxt, "\n")
return RequestResult(res), logtxt
return RequestResult(res, e)
return RequestResult(res)
def update_extension(archivedir, verbose, forums, ext_id):
logtxt = logmsg(verbose, "", " Updating extension {}".format(ext_id))
def update_extension(archivedir, forums, ext_id):
logging.info(4 * " " + "Updating extension {}{}".format(
ext_id, " (including forums)" if forums else ""))
is_new = False
tar_exception = None
sql_exception = None
@ -426,9 +418,6 @@ def update_extension(archivedir, verbose, forums, ext_id):
tmptardir = ""
start = time.time()
if forums:
logtxt = logmsg(verbose, logtxt, " (including forums)")
logtxt = logmsg(verbose, logtxt, "\n")
date = datetime.datetime.now(datetime.timezone.utc).isoformat()
tardir = os.path.join(archivedir, get_local_archive_dir(ext_id), ext_id)
@ -437,43 +426,26 @@ def update_extension(archivedir, verbose, forums, ext_id):
try:
tmpdir = tempfile.mkdtemp()
tmptardir = os.path.join(tmpdir, ext_id)
logtxt = logmsg(verbose, logtxt,
" * tmptardir = {}\n".format(tmptardir))
logging.info(8 * " " + "* tmptardir = {}".format(tmptardir))
os.makedirs(
os.path.join(archivedir, get_local_archive_dir(ext_id)),
exist_ok=True)
except Exception as e:
logtxt = logmsg(verbose, logtxt,
" * FATAL: cannot create tmpdir")
logtxt = logmsg(verbose, logtxt, " / Exception: {}\n".format(str(e)))
logging.exception(8 * " " + "* FATAL: cannot create tmpdir")
tar_exception = e
logtxt = logmsg(
verbose,
logtxt,
" * Duration: {}\n".format(
datetime.timedelta(seconds=int(time.time() - start))))
log(verbose, logtxt)
return UpdateResult(ext_id, is_new, tar_exception, None,
None, None, None, sql_exception,
False)
return UpdateResult(ext_id, is_new, tar_exception, None, None, None,
None, sql_exception, False)
res_overview, msg_overview = update_overview(tmptardir, date, verbose,
ext_id)
res_overview = update_overview(tmptardir, date, ext_id)
res_reviews = None
msg_reviews = ""
res_support = None
msg_support = ""
if forums:
res_reviews, msg_reviews = update_reviews(tmptardir, date, verbose,
ext_id)
res_reviews = update_reviews(tmptardir, date, ext_id)
res_crx, msg_crx = update_crx(archivedir, tmptardir, verbose, ext_id, date)
res_crx = update_crx(archivedir, tmptardir, ext_id, date)
if forums:
res_support, msg_support = update_support(tmptardir, date, verbose,
ext_id)
logtxt = logtxt + msg_overview + msg_crx + msg_reviews + msg_support
res_support = update_support(tmptardir, date, ext_id)
backup = False
if backup:
@ -490,11 +462,8 @@ def update_extension(archivedir, verbose, forums, ext_id):
if os.path.exists(tar):
shutil.copyfile(tar, tardir + ".bak.tar")
except Exception as e:
logtxt = logmsg(
verbose, logtxt,
" * FATAL: cannot rename old tar archive")
logtxt = logmsg(verbose, logtxt,
" / Exception: {}\n".format(str(e)))
logging.exception(8 * " " +
"* FATAL: cannot rename old tar archive")
tar_exception = e
try:
write_text(tardir, date, ext_id + ".tar.rename.exception",
@ -509,9 +478,7 @@ def update_extension(archivedir, verbose, forums, ext_id):
ar.add(tmptardir, arcname=ext_id)
ar.close()
except Exception as e:
logtxt = logmsg(verbose, logtxt,
" * FATAL: cannot create tar archive")
logtxt = logmsg(verbose, logtxt, " / Exception: {}\n".format(str(e)))
logging.exception(8 * " " + "* FATAL: cannot create tar archive")
tar_exception = e
try:
write_text(tardir, date, ext_id + ".tar.create.exception",
@ -520,17 +487,12 @@ def update_extension(archivedir, verbose, forums, ext_id):
pass
try:
logtxt = logmsg(verbose, logtxt, " * Updating db...\n")
logging.info(8 * " " + "* Updating db...")
db_path = db_file(archivedir, ext_id)
msg_updatesqlite = update_sqlite_incremental(
db_path, tmptardir, ext_id, date, verbose, 15 * " ")
logtxt = logmsg(verbose, logtxt, msg_updatesqlite)
update_sqlite_incremental(db_path, tmptardir, ext_id, date)
sql_success = True
except Exception as e:
logtxt = logmsg(verbose, logtxt,
" * Exception during update of sqlite db ")
logtxt = logmsg(verbose, logtxt, " / Exception: {}\n".format(str(e)))
logging.exception(8 * " " + "* Exception during update of sqlite db")
sql_exception = e
try:
@ -541,9 +503,7 @@ def update_extension(archivedir, verbose, forums, ext_id):
try:
shutil.rmtree(path=tmpdir)
except Exception as e:
logtxt = logmsg(verbose, logtxt,
" * FATAL: cannot remove archive directory")
logtxt = logmsg(verbose, logtxt, " / Exception: {}\n".format(str(e)))
logging.exception(8 * " " + "* FATAL: cannot remove archive directory")
tar_exception = e
try:
write_text(tardir, date, ext_id + ".dir.remove.exception",
@ -551,49 +511,41 @@ def update_extension(archivedir, verbose, forums, ext_id):
except Exception:
pass
logtxt = logmsg(
verbose,
logtxt,
" * Duration: {}\n".format(
logging.info(8 * " " + "* Duration: {}".format(
datetime.timedelta(seconds=int(time.time() - start))))
log(verbose, logtxt)
return UpdateResult(ext_id, is_new, tar_exception, res_overview, res_crx,
res_reviews, res_support, sql_exception, sql_success)
def update_extensions(archivedir, verbose, parallel, forums_ext_ids, ext_ids):
def update_extensions(archivedir, parallel, forums_ext_ids, ext_ids):
ext_with_forums = []
ext_without_forums = []
ext_ids = list(set(ext_ids) - set(forums_ext_ids))
forums_ext_ids = list(set(forums_ext_ids))
log(verbose, "Updating {} extensions ({} including forums)\n".format(
logging.info("Updating {} extensions ({} including forums)".format(
len(ext_ids), len(forums_ext_ids)))
# First, update extensions with forums sequentially (and with delays) to
# avoid running into Google's DDOS detection.
log(verbose,
" Updating {} extensions including forums (sequentially))\n".format(
len(forums_ext_ids)))
logging.info(2 * " " +
"Updating {} extensions including forums (sequentially))".
format(len(forums_ext_ids)))
ext_with_forums = list(
map(
partial(update_extension, archivedir, verbose, True),
forums_ext_ids))
map(partial(update_extension, archivedir, True), forums_ext_ids))
# Second, update extensions without forums parallel to increase speed.
parallel_ids = list(set(ext_ids) - set(forums_ext_ids))
log(verbose,
" Updating {} extensions excluding forums (parallel))\n".format(
len(parallel_ids)))
logging.info(2 * " " +
"Updating {} extensions excluding forums (parallel))".format(
len(parallel_ids)))
with Pool(parallel) as p:
ext_without_forums = list(
p.map(
partial(update_extension, archivedir, verbose, False),
parallel_ids))
p.map(partial(update_extension, archivedir, False), parallel_ids))
return ext_with_forums + ext_without_forums
def get_existing_ids(archivedir, verbose):
def get_existing_ids(archivedir):
byte = '[0-9a-z][0-9a-z][0-9a-z][0-9a-z][0-9a-z][0-9a-z][0-9a-z][0-9a-z]'
word = byte + byte + byte + byte
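# 'word' is a glob for a 32-character [0-9a-z] name, i.e. an extension id;
# the ids of already archived extensions are recovered from their <id>.tar file names.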
return list(
@ -601,7 +553,7 @@ def get_existing_ids(archivedir, verbose):
glob.glob(os.path.join(archivedir, "*", word + ".tar"))))
def get_forum_ext_ids(confdir, verbose):
def get_forum_ext_ids(confdir):
with open(os.path.join(confdir, "forums.conf")) as f:
ids = f.readlines()
r = re.compile('^[a-p]+$')


@ -137,6 +137,7 @@ def jsloc_timeout():
def const_basedir():
"""Top-level directory for the extension crawler archive."""
return "archive"
def const_parallel_downloads():
"""Number of parallel downloads."""
return 36
@ -145,6 +146,9 @@ def const_verbose():
"""Default verbosity."""
return True
def const_log_format():
return '%(process)s %(asctime)s %(message)s'
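# A record formatted this way renders roughly as
# "4711 2017-08-29 22:54:54,123 Updating 42 extensions (7 including forums)"
# (process id, timestamp, message); the pid and message here are placeholders.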
def const_discover():
"""Default configuration of discovery mode"""
return False


@ -23,7 +23,7 @@ import re
from functools import reduce
import requests
import ExtensionCrawler.config
from ExtensionCrawler.util import log
import logging
def crawl_nearly_all_of_ext_ids():
@ -54,16 +54,15 @@ def crawl_nearly_all_of_ext_ids():
return [re.search("[a-z]{32}", url).group(0) for url in overview_urls]
def get_new_ids(verbose, known_ids):
def get_new_ids(known_ids):
"""Discover new extension ids."""
log(verbose, "Discovering new ids ... \n")
logging.info("Discovering new ids ...")
discovered_ids = []
try:
discovered_ids = ExtensionCrawler.discover.crawl_nearly_all_of_ext_ids()
except Exception as ex:
log(verbose,
" EXCEPTION during discovering of new ids: {}\n".format(str(ex)))
except Exception:
logging.exception("Exception when discovering new ids")
new_ids = list(set(discovered_ids) - set(known_ids))
log(verbose, " Discovered {} new extensions (out of {})\n".format(
logging.info(2 * " " + "Discovered {} new extensions (out of {})".format(
len(new_ids), len(discovered_ids)))
return new_ids


@ -31,17 +31,16 @@ import json
import os
import glob
import datetime
import logging
def get_etag(ext_id, datepath, con, verbose, indent):
txt = ""
def get_etag(ext_id, datepath, con):
# Trying to parse etag file
etagpath = next(
iter(glob.glob(os.path.join(datepath, "*.crx.etag"))), None)
if etagpath:
with open(etagpath) as f:
return f.read(), txt
return f.read()
# Trying to parse header file for etag
headerpath = next(
@ -52,12 +51,10 @@ def get_etag(ext_id, datepath, con, verbose, indent):
try:
headers = eval(content)
if "ETag" in headers:
return headers["ETag"], txt
return headers["ETag"]
except Exception:
txt = logmsg(
verbose, txt,
indent + "* WARNING: could not parse crx header file")
pass
logging.warning(16 * " " +
"* WARNING: could not parse crx header file")
# Trying to look up previous etag in database
linkpath = next(
@ -70,9 +67,9 @@ def get_etag(ext_id, datepath, con, verbose, indent):
result = con.get_most_recent_etag(ext_id,
con.convert_date(linked_date))
if result is not None:
return result, txt
return result
return None, txt
return None
def get_overview_status(datepath):
@ -102,9 +99,8 @@ def get_crx_status(datepath):
return int(f.read())
def parse_and_insert_overview(ext_id, date, datepath, con, verbose, indent):
txt = ""
def parse_and_insert_overview(ext_id, date, datepath, con):
logging.info(16 * " " + "- parsing overview file")
overview_path = os.path.join(datepath, "overview.html")
if os.path.exists(overview_path):
with open(overview_path) as overview_file:
@ -162,8 +158,7 @@ def parse_and_insert_overview(ext_id, date, datepath, con, verbose, indent):
last_updated = str(last_updated_parent.contents[
0]) if last_updated_parent else None
etag, etag_msg = get_etag(ext_id, datepath, con, verbose, indent)
txt = logmsg(verbose, txt, etag_msg)
etag = get_etag(ext_id, datepath, con)
match = re.search(
"""<Attribute name="item_category">(.*?)</Attribute>""",
@ -194,18 +189,15 @@ def parse_and_insert_overview(ext_id, date, datepath, con, verbose, indent):
date=con.convert_date(date),
category=category)
return txt
def parse_and_insert_crx(ext_id, date, datepath, con, verbose, indent):
txt = ""
def parse_and_insert_crx(ext_id, date, datepath, con):
logging.info(16 * " " + "- parsing crx file")
crx_path = next(iter(glob.glob(os.path.join(datepath, "*.crx"))), None)
if crx_path:
filename = os.path.basename(crx_path)
with ZipFile(crx_path) as f:
etag, etag_msg = get_etag(ext_id, datepath, con, verbose, indent)
txt = logmsg(verbose, txt, etag_msg)
etag = get_etag(ext_id, datepath, con)
size = os.path.getsize(crx_path)
public_key = read_crx(crx_path).public_key
@ -267,7 +259,6 @@ def parse_and_insert_crx(ext_id, date, datepath, con, verbose, indent):
md5=js_file_info['md5'],
size=js_file_info['size'],
version=js_file_info['ver'])
return txt
def get(d, k):
@ -276,6 +267,7 @@ def get(d, k):
def parse_and_insert_review(ext_id, date, reviewpath, con):
logging.info(16 * " " + "- parsing review file")
with open(reviewpath) as f:
content = f.read()
stripped = content[content.find('{"'):]
@ -311,6 +303,7 @@ def parse_and_insert_review(ext_id, date, reviewpath, con):
def parse_and_insert_support(ext_id, date, supportpath, con):
logging.info(16 * " " + "- parsing support file")
with open(supportpath) as f:
content = f.read()
stripped = content[content.find('{"'):]
@ -345,15 +338,13 @@ def parse_and_insert_support(ext_id, date, supportpath, con):
con.insertmany("support", results)
def parse_and_insert_replies(ext_id, date, repliespath, con, verbose, indent):
def parse_and_insert_replies(ext_id, date, repliespath, con):
logging.info(16 * " " + "- parsing reply file")
with open(repliespath) as f:
d = json.load(f)
if not "searchResults" in d:
txt = logmsg(
verbose, "",
indent + "* WARNING: there are no search results in {}\n".
format(repliespath))
return txt
logging.warning("* WARNING: there are no search results in {}".
format(repliespath))
results = []
for result in d["searchResults"]:
if "annotations" not in result:
@ -388,6 +379,7 @@ def parse_and_insert_replies(ext_id, date, repliespath, con, verbose, indent):
def parse_and_insert_status(ext_id, date, datepath, con):
logging.info(16 * " " + "- parsing status file")
overview_status = get_overview_status(datepath)
crx_status = get_crx_status(datepath)
@ -406,16 +398,10 @@ def parse_and_insert_status(ext_id, date, datepath, con):
overview_exception=overview_exception)
def update_sqlite_incremental(db_path, tmptardir, ext_id, date, verbose,
indent):
txt = ""
indent2 = indent + 4 * " "
def update_sqlite_incremental(db_path, tmptardir, ext_id, date):
logging.info(12 * " " + "- parsing data from {}".format(date))
datepath = os.path.join(tmptardir, date)
txt = logmsg(verbose, txt,
indent + "- updating with data from {}\n".format(date))
if const_use_mysql():
# Don't forget to create a ~/.my.cnf file with the credentials
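# A minimal ~/.my.cnf uses the standard [client] section, e.g. (placeholder values):
#   [client]
#   user     = crawler
#   password = <secret>
#   host     = localhost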
backend = MysqlBackend(read_default_file=const_mysql_config_file())
@ -423,29 +409,22 @@ def update_sqlite_incremental(db_path, tmptardir, ext_id, date, verbose,
backend = SqliteBackend(db_path)
with backend as con:
etag, etag_msg = get_etag(ext_id, datepath, con, verbose, indent2)
txt = logmsg(verbose, txt, etag_msg)
etag = get_etag(ext_id, datepath, con)
if etag:
try:
crx_msg = parse_and_insert_crx(ext_id, date, datepath, con,
verbose, indent2)
txt = logmsg(verbose, txt, crx_msg)
parse_and_insert_crx(ext_id, date, datepath, con)
except zipfile.BadZipfile as e:
txt = logmsg(
verbose, txt, indent2 +
"* WARNING: the found crx file is not a zip file, exception: "
)
txt = logmsg(verbose, txt, str(e))
txt = logmsg(verbose, txt, "\n")
logging.warning(
16 * " " +
"* WARNING: the found crx file is not a zip file, exception: {}".
format(str(e)))
else:
crx_status = get_crx_status(datepath)
if crx_status != 401 and crx_status != 204 and crx_status != 404:
txt = logmsg(verbose, txt,
indent2 + "* WARNING: could not find etag\n")
logging.warning(16 * " " + "* WARNING: could not find etag")
parse_and_insert_overview(ext_id, date, datepath, con, verbose,
indent2)
parse_and_insert_overview(ext_id, date, datepath, con)
parse_and_insert_status(ext_id, date, datepath, con)
reviewpaths = glob.glob(os.path.join(datepath, "reviews*-*.text"))
@ -453,34 +432,24 @@ def update_sqlite_incremental(db_path, tmptardir, ext_id, date, verbose,
try:
parse_and_insert_review(ext_id, date, reviewpath, con)
except json.decoder.JSONDecodeError as e:
txt = logmsg(
verbose, txt,
indent2 + "* Could not parse review file, exception: ")
txt = logmsg(verbose, txt, str(e))
txt = logmsg(verbose, txt, "\n")
logging.warning(16 * " " +
"* Could not parse review file, exception: {}".
format(str(e)))
supportpaths = glob.glob(os.path.join(datepath, "support*-*.text"))
for supportpath in supportpaths:
try:
parse_and_insert_support(ext_id, date, supportpath, con)
except json.decoder.JSONDecodeError as e:
txt = logmsg(
verbose, txt,
indent2 + "* Could not parse support file, exception: ")
txt = logmsg(verbose, txt, str(e))
txt = logmsg(verbose, txt, "\n")
logging.warning(
16 * " " + "* Could not parse support file, exception: {}".
format(str(e)))
repliespaths = glob.glob(os.path.join(datepath, "*replies.text"))
for repliespath in repliespaths:
try:
reply_txt = parse_and_insert_replies(ext_id, date, repliespath,
con, verbose, indent)
txt = logmsg(verbose, txt, reply_txt)
parse_and_insert_replies(ext_id, date, repliespath, con)
except json.decoder.JSONDecodeError as e:
txt = logmsg(
verbose, txt,
indent2 + "* Could not parse reply file, exception: ")
txt = logmsg(verbose, txt, str(e))
txt = logmsg(verbose, txt, "\n")
return txt
logging.warning(16 * " " +
"* Could not parse reply file, exception: {}".
format(str(e)))


@ -18,7 +18,6 @@
""" Various utility methods."""
import sys
from time import sleep
from random import random
@ -27,19 +26,6 @@ def google_dos_protection(maxrange=0.3):
to avoid Google's bot detection"""
sleep(0.5+(random()*maxrange))
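# With the default maxrange this sleeps for a uniformly random 0.5-0.8 seconds.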
def log(verbose, msg):
"""Print log message."""
if verbose:
sys.stdout.write(msg)
sys.stdout.flush()
def logmsg(verbose, msg1, msg2):
"""Append msg2 to log stream msg1."""
if verbose:
return msg1 + msg2
else:
return msg1
def value_of(value, default):
"""Get value or default value if None."""
if value is not None and value is not "":

crawler

@ -25,11 +25,12 @@ import datetime
import time
import getopt
import sqlite3
import logging
from functools import reduce
from ExtensionCrawler.discover import get_new_ids
from ExtensionCrawler.archive import get_forum_ext_ids, get_existing_ids, update_extensions
from ExtensionCrawler.util import log
import ExtensionCrawler.config
from ExtensionCrawler.config import *
def write_log(dirname, fname, text):
"""Write text into the file with name fname in directory dirname."""
@ -92,52 +93,44 @@ def log_failures_to_file(dirname, today, res):
write_log(dirname, today + "-sql-not-updated.log", sql_success)
def log_summary(verbose, res, stderr=False, runtime=0):
"""Log brief result summary to log stream of stderr."""
def printlog(msg):
"""Print log message."""
if stderr:
sys.stderr.write(msg)
else:
log(verbose, msg)
def log_summary(res, runtime=0):
"""Log brief result summary."""
corrupt_tar_archives = list(filter(lambda x: x.corrupt_tar(), res))
printlog("\n")
printlog("Summary:\n")
printlog(" Updated {} out of {} extensions successfully\n".format(
logging.info("Summary:")
logging.info(" Updated {} out of {} extensions successfully".format(
str(len(list(filter(lambda x: x.is_ok(), res)))), str(len(res))))
printlog(" Updated extensions: {:8d}\n".format(
logging.info(" Updated extensions: {:8d}".format(
len(list(filter(lambda x: x.is_ok() and not x.not_modified(), res)))))
printlog(" Updated SQL databases: {:8d}\n".format(
logging.info(" Updated SQL databases: {:8d}".format(
len(list(filter(lambda x: x.sql_success(), res)))))
printlog(" New extensions: {:8d}\n".format(
logging.info(" New extensions: {:8d}".format(
len(list(filter(lambda x: x.is_new(), res)))))
printlog(" Not authorized: {:8d}\n".format(
logging.info(" Not authorized: {:8d}".format(
len(list(filter(lambda x: x.not_authorized(), res)))))
printlog(" Raised Google DDOS: {:8d}\n".format(
logging.info(" Raised Google DDOS: {:8d}".format(
len(list(filter(lambda x: x.raised_google_ddos(), res)))))
printlog(" Not modified archives: {:8d}\n".format(
logging.info(" Not modified archives: {:8d}".format(
len(list(filter(lambda x: x.not_modified(), res)))))
printlog(" Extensions not in store: {:8d}\n".format(
logging.info(" Extensions not in store: {:8d}".format(
len(list(filter(lambda x: x.not_in_store(), res)))))
printlog(" Unknown exception: {:8d}\n".format(
logging.info(" Unknown exception: {:8d}".format(
len(list(filter(lambda x: x.has_exception(), res)))))
printlog(" Corrupt tar archives: {:8d}\n".format(
len(corrupt_tar_archives)))
printlog(" SQL exception: {:8d}\n".format(
logging.info(
" Corrupt tar archives: {:8d}".format(len(corrupt_tar_archives)))
logging.info(" SQL exception: {:8d}".format(
len(list(filter(lambda x: x.sql_exception(), res)))))
printlog(" Total runtime: {}\n".format(
logging.info(" Total runtime: {}".format(
str(datetime.timedelta(seconds=int(runtime)))))
if corrupt_tar_archives != []:
printlog("\n\n")
printlog("List of extensions with corrupted files/archives:\n")
logging.info("")
logging.info("List of extensions with corrupted files/archives:")
list(
map(lambda x: printlog(" " + x.id + ": " + str(x.exception) + "\n"),
corrupt_tar_archives))
printlog("\n")
map(lambda x: logging.info(" " + x.id + ": " + str(x.exception), corrupt_tar_archives)
))
logging.info("")
def helpmsg():
@ -149,26 +142,24 @@ def helpmsg():
print(" -a=<DIR> archive directory")
def print_config(verbose, basedir, archive_dir, conf_dir, discover, parallel):
def print_config(basedir, archive_dir, conf_dir, discover, parallel):
"""Print current configuration."""
log(verbose, "Configuration:\n")
log(verbose, " Base dir: {}\n".format(basedir))
log(verbose,
" Archive directory: {}\n".format(archive_dir))
log(verbose, " Configuration directory: {}\n".format(conf_dir))
log(verbose, " Discover new extensions: {}\n".format(discover))
log(verbose, " Max num. of concurrent downloads: {}\n".format(parallel))
log(verbose, " SQLite 3 version: {}\n".format(
logging.info("Configuration:")
logging.info(" Base dir: {}".format(basedir))
logging.info(" Archive directory: {}".format(archive_dir))
logging.info(" Configuration directory: {}".format(conf_dir))
logging.info(" Discover new extensions: {}".format(discover))
logging.info(" Max num. of concurrent downloads: {}".format(parallel))
logging.info(" SQLite 3 version: {}".format(
sqlite3.sqlite_version))
log(verbose, "\n")
def parse_args(argv):
"""Parse command line arguments. """
basedir = ExtensionCrawler.config.const_basedir()
parallel = ExtensionCrawler.config.const_parallel_downloads()
verbose = ExtensionCrawler.config.const_verbose()
discover = ExtensionCrawler.config.const_discover()
basedir = const_basedir()
parallel = const_parallel_downloads()
verbose = const_verbose()
discover = const_discover()
try:
opts, _ = getopt.getopt(argv, "hsda:p:", ["archive=", 'parallel='])
except getopt.GetoptError:
@ -191,9 +182,16 @@ def parse_args(argv):
def main(argv):
"""Main function of the extension crawler."""
today = datetime.datetime.now(datetime.timezone.utc).isoformat()
basedir, parallel, verbose, discover = parse_args(argv)
if verbose:
loglevel = logging.INFO
else:
loglevel = logging.WARNING
logging.basicConfig(level=loglevel, format=const_log_format())
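# basicConfig configures the root logger, so the logging.info/.exception calls
# throughout the ExtensionCrawler modules inherit this level and format.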
archive_dir = os.path.join(basedir, "data")
os.makedirs(archive_dir, exist_ok=True)
conf_dir = os.path.join(basedir, "conf")
@ -204,41 +202,38 @@ def main(argv):
start_time = time.time()
print_config(verbose, basedir, archive_dir, conf_dir, discover, parallel)
print_config(basedir, archive_dir, conf_dir, discover, parallel)
forum_ext_ids = get_forum_ext_ids(conf_dir, verbose)
known_ids = list(
set(get_existing_ids(archive_dir, verbose)) | set(forum_ext_ids))
forum_ext_ids = get_forum_ext_ids(conf_dir)
known_ids = list(set(get_existing_ids(archive_dir)) | set(forum_ext_ids))
discovered_ids = []
if discover:
discovered_ids = get_new_ids(verbose, known_ids)
discovered_ids = get_new_ids(known_ids)
ext_ids = list(set(discovered_ids) | set(known_ids))
discovered_ids = None
known_ids = None
res = update_extensions(archive_dir, verbose, parallel, forum_ext_ids,
ext_ids)
res = update_extensions(archive_dir, parallel, forum_ext_ids, ext_ids)
# We re-try (once) the extensions with unknown exceptions, as
# they are often temporary
has_exception = list(filter(lambda x: x.has_exception(), res))
if has_exception != []:
log(verbose,
" {} extensions with unknown exceptions, start another try ...\n".
logging.info(
" {} extensions with unknown exceptions, start another try ...".
format(str(len(has_exception))))
has_exception_ids = list(map(lambda x: x.id, has_exception))
forum_ext_ids_except = list(
set(forum_ext_ids).intersection(set(has_exception_ids)))
ext_ids_except = sorted(
list(set(has_exception_ids) - set(forum_ext_ids_except)))
res_update = update_extensions(archive_dir, verbose, parallel,
res_update = update_extensions(archive_dir, parallel,
forum_ext_ids_except, ext_ids_except)
res = list(set(res) - set(has_exception)) + res_update
end_time = time.time()
log_summary(verbose, res, False, end_time - start_time)
log_summary(verbose, res, True, end_time - start_time)
log_summary(res, end_time - start_time)
log_failures_to_file(log_dir, today, res)


@ -22,12 +22,13 @@ import sys
import tarfile
import time
import tempfile
import traceback
import fnmatch
from multiprocessing import Pool, Lock
from multiprocessing import Pool
from functools import partial
import logging
from ExtensionCrawler.archive import update_sqlite_incremental
from ExtensionCrawler.config import *
def help():
@ -42,20 +43,6 @@ def help():
print(" -N <MAXTASKID> ")
def guarded_stdout(string):
lock.acquire()
sys.stdout.write(string)
sys.stdout.flush()
lock.release()
def guarded_stderr(string):
lock.acquire()
sys.stderr.write(string)
sys.stderr.flush()
lock.release()
def process_id(dbbasedir, path):
start = time.time()
with tempfile.TemporaryDirectory() as tmpdir:
@ -63,7 +50,7 @@ def process_id(dbbasedir, path):
t.extractall(tmpdir)
extid = os.listdir(tmpdir)[0]
guarded_stdout("Processing {}\n".format(extid))
logging.info("Processing {}".format(extid))
dbpath = os.path.join(dbbasedir, extid + ".sqlite")
if os.path.exists(dbpath):
os.remove(dbpath)
@ -71,12 +58,11 @@ def process_id(dbbasedir, path):
for date in sorted(os.listdir(iddir)):
try:
update_sqlite_incremental(dbpath, iddir, extid, date, True,
"")
update_sqlite_incremental(dbpath, iddir, extid, date)
except Exception:
guarded_stderr("Exception when handling {} on {}:\n{}\n".
format(extid, date, traceback.format_exc()))
guarded_stdout("Finished {} in {}s\n".format(extid, time.time() - start))
logging.exception("Exception when handling {} on {}".
format(extid, date))
logging.info("Finished {} in {}s".format(extid, time.time() - start))
def find(archive, pattern):
@ -97,11 +83,6 @@ def find_from_file(archive, extidlistfile):
yield os.path.join(root, file)
def init(l):
global lock
lock = l
def parse_args(argv):
archive = "archive"
parallel = 8
@ -155,9 +136,11 @@ def parse_args(argv):
def main(argv):
logging.basicConfig(level=logging.INFO, format=const_log_format())
dbbasedir, paths, parallel = parse_args(argv)
with Pool(initializer=init, initargs=(Lock(), ), processes=parallel) as p:
with Pool(processes=parallel) as p:
p.map(partial(process_id, dbbasedir), paths)