Compare commits

...

1 Commit

4 changed files with 91 additions and 55 deletions

View File

@@ -23,7 +23,7 @@ import os
import glob
import re
import json
from multiprocessing import Pool
from multiprocessing import Pool, Lock
from functools import partial
import shutil
import tempfile
@@ -34,12 +34,13 @@ import datetime
import dateutil
import dateutil.parser
import requests
import random
from ExtensionCrawler.config import (
const_review_payload, const_review_search_url, const_download_url,
get_local_archive_dir, const_overview_url, const_support_url,
const_support_payload, const_review_search_payload, const_review_url)
from ExtensionCrawler.util import google_dos_protection, value_of, log_info, log_exception
from ExtensionCrawler.util import google_dos_protection, value_of, log_info, log_exception, log_warning
from ExtensionCrawler.db import update_db_incremental
@@ -343,23 +344,37 @@ def update_crx(archivedir, tmptardir, ext_id, date):
return RequestResult(res)
def iterate_authors(pages):
def iterate_authors(ext_id, pages):
for page in pages:
json_page = json.loads(page[page.index("{\""):page.rindex("}}},") + 1])
for annotation in json_page["annotations"]:
if "attributes" in annotation and "replyExists" in annotation["attributes"] and annotation["attributes"]["replyExists"]:
yield (annotation["entity"]["author"],
annotation["entity"]["groups"])
try:
json_page = json.loads(page[page.index("{\""):page.rindex("}}},") + 1])
for annotation in json_page["annotations"]:
if "attributes" in annotation and "replyExists" in annotation["attributes"] and annotation["attributes"]["replyExists"]:
yield (annotation["entity"]["author"],
annotation["entity"]["groups"])
except ValueError as e:
log_warning("Could not read review/store/reply page: " + str(e), 3, ext_id)
def update_reviews(tar, date, ext_id):
def restricted_post(url, proxy, **kwargs):
kwargs.update(proxies={"http": proxy, "https": proxy})
if 'lock' in globals():
with lock:
google_dos_protection()
return requests.post(url, **kwargs)
else:
google_dos_protection()
return requests.post(url, **kwargs)
def update_reviews(tar, date, proxy, ext_id):
res = None
try:
pages = []
google_dos_protection()
res = requests.post(
res = restricted_post(
const_review_url(),
proxy,
data=const_review_payload(ext_id, "0", "100"),
timeout=10)
log_info("* review page 0-100: {}".format(str(res.status_code)), 2,
@@ -367,9 +382,9 @@ def update_reviews(tar, date, ext_id):
store_request_text(tar, date, 'reviews000-099.text', res)
pages += [res.text]
google_dos_protection()
res = requests.post(
res = restricted_post(
const_review_url(),
proxy,
data=const_review_payload(ext_id, "100", "100"),
timeout=10)
log_info("* review page 100-200: {}".format(str(res.status_code)), 2,
@@ -377,13 +392,13 @@ def update_reviews(tar, date, ext_id):
store_request_text(tar, date, 'reviews100-199.text', res)
pages += [res.text]
google_dos_protection()
# Always start with reply number 0 and request 10 replies
ext_id_author_tups = [(ext_id, author, 0, 10, groups)
for author, groups in iterate_authors(pages)]
for author, groups in iterate_authors(ext_id, pages)]
if ext_id_author_tups:
res = requests.post(
res = restricted_post(
const_review_search_url(),
proxy,
data=const_review_search_payload(ext_id_author_tups),
timeout=10)
log_info("* review page replies: {}".format(str(res.status_code)),
@@ -396,14 +411,14 @@ def update_support(tar, date, ext_id):
return RequestResult(res)
def update_support(tar, date, ext_id):
def update_support(tar, date, proxy, ext_id):
res = None
try:
pages = []
google_dos_protection()
res = requests.post(
res = restricted_post(
const_support_url(),
proxy,
data=const_support_payload(ext_id, "0", "100"),
timeout=10)
log_info("* support page 0-100: {}".format(str(res.status_code)), 2,
@@ -411,9 +426,9 @@ def update_support(tar, date, ext_id):
store_request_text(tar, date, 'support000-099.text', res)
pages += [res.text]
google_dos_protection()
res = requests.post(
res = restricted_post(
const_support_url(),
proxy,
data=const_support_payload(ext_id, "100", "100"),
timeout=10)
log_info("* support page 100-200: {}".format(str(res.status_code)), 2,
@@ -421,13 +436,13 @@ def update_support(tar, date, ext_id):
store_request_text(tar, date, 'support100-199.text', res)
pages += [res.text]
google_dos_protection()
# Always start with reply number 0 and request 10 replies
ext_id_author_tups = [(ext_id, author, 0, 10, groups)
for author, groups in iterate_authors(pages)]
for author, groups in iterate_authors(ext_id, pages)]
if ext_id_author_tups:
res = requests.post(
res = restricted_post(
const_review_search_url(),
proxy,
data=const_review_search_payload(ext_id_author_tups),
timeout=10)
log_info("* support page replies: {}".format(str(res.status_code)),
@@ -440,7 +455,9 @@ def update_support(tar, date, ext_id):
return RequestResult(res)
def update_extension(archivedir, forums, ext_id):
def update_extension(archivedir, forums_ext_ids, proxy, ext_id):
forums = ext_id in forums_ext_ids
log_info("Updating extension {}".format(" (including forums)"
if forums else ""), 1, ext_id)
is_new = False
@@ -472,12 +489,12 @@ def update_extension(archivedir, forums, ext_id):
res_reviews = None
res_support = None
if forums:
res_reviews = update_reviews(tmptardir, date, ext_id)
res_reviews = update_reviews(tmptardir, date, proxy, ext_id)
res_crx = update_crx(archivedir, tmptardir, ext_id, date)
if forums:
res_support = update_support(tmptardir, date, ext_id)
res_support = update_support(tmptardir, date, proxy, ext_id)
backup = False
if backup:
@@ -548,8 +565,11 @@ def update_extension(archivedir, forums, ext_id):
return UpdateResult(ext_id, is_new, tar_exception, res_overview, res_crx,
res_reviews, res_support, sql_exception, sql_success)
def init_child(lock_):
global lock
lock = lock_
def update_extensions(archivedir, parallel, forums_ext_ids, ext_ids):
def update_extensions(archivedir, parallel, forums_ext_ids, ext_ids, proxy):
ext_with_forums = []
ext_without_forums = []
ext_ids = (list(set(ext_ids) - set(forums_ext_ids)))
@@ -557,23 +577,33 @@ def update_extensions(archivedir, parallel, forums_ext_ids, ext_ids):
log_info("Updating {} extensions ({} including forums)".format(
len(ext_ids), len(forums_ext_ids)))
# First, update all extensions without forums in parallel (increased speed).
# parallel_ids = list(set(ext_ids) - set(forums_ext_ids))
parallel_ids = ext_ids
log_info("Updating {} extensions excluding forums (parallel)".format(
len(parallel_ids)), 1)
with Pool(parallel) as p:
if proxy is None:
# First, update all extensions without forums in parallel (increased speed).
# parallel_ids = list(set(ext_ids) - set(forums_ext_ids))
parallel_ids = ext_ids
log_info("Updating {} extensions excluding forums (parallel)".format(
len(parallel_ids)), 1)
else:
parallel_ids = ext_ids + forums_ext_ids
random.shuffle(parallel_ids)
log_info("Updating {} extensions including forums (parallel)".format(
len(parallel_ids)), 1)
with Pool(parallel, initializer=init_child, initargs=(Lock(),)) as p:
ext_without_forums = list(
p.map(partial(update_extension, archivedir, False), parallel_ids))
p.map(partial(update_extension, archivedir, set(forums_ext_ids), proxy), parallel_ids))
# Second, update extensions with forums sequentially (and with delays) to
# avoid running into Googles DDOS detection.
log_info("Updating {} extensions including forums (sequentially)".format(
len(forums_ext_ids)), 1)
if proxy is None:
log_info("Updating {} extensions including forums (sequentially)".format(
len(forums_ext_ids)), 1)
ext_with_forums = list(
map(partial(update_extension, archivedir, True), forums_ext_ids))
ext_with_forums = list(
map(partial(update_extension, archivedir, set(forums_ext_ids), proxy), forums_ext_ids))
else:
log_info("Skipping sequential forum download")
return ext_with_forums + ext_without_forums
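
The proxy-related changes above all hinge on one pattern: a single multiprocessing.Lock is created in the parent process, handed to every Pool worker through the initializer (init_child), published as a module-level global, and then checked by restricted_post so that proxied requests are issued one at a time. Below is a minimal, self-contained sketch of that pattern only; fetch and its body are illustrative placeholders, not code from this repository.

from multiprocessing import Pool, Lock

def init_child(lock_):
    # Runs once per worker process and publishes the shared lock
    # as a module-level global, mirroring the diff above.
    global lock
    lock = lock_

def fetch(url):
    # Serialize the critical section across all workers: only one
    # process at a time may hold the lock while contacting the proxy.
    if 'lock' in globals():
        with lock:
            return "fetched " + url
    return "fetched " + url + " (no lock installed)"

if __name__ == '__main__':
    urls = ["https://example.com/page/{}".format(i) for i in range(4)]
    with Pool(2, initializer=init_child, initargs=(Lock(),)) as p:
        print(p.map(fetch, urls))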

View File

@@ -42,14 +42,15 @@ class MysqlBackend:
return self
def __exit__(self, *args):
start = time.time()
self.retry(self._commit_cache)
self.db.commit()
log_info(
"* Database batch insert finished after {}".format(
datetime.timedelta(seconds=int(time.time() - start))),
2,
self.ext_id)
if self.cache is not {}:
start = time.time()
self.retry(self._commit_cache)
self.db.commit()
log_info(
"* Database batch insert finished after {}".format(
datetime.timedelta(seconds=int(time.time() - start))),
2,
self.ext_id)
self._close_conn()
def _commit_cache(self):
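
One detail of the new guard worth noting: "self.cache is not {}" compares object identity against a freshly created dict literal, so it is always true whether or not the cache holds anything, while a plain truthiness test expresses the "only flush when the cache is non-empty" intent. The sketch below shows the difference on a hypothetical stand-in class (not the repository's MysqlBackend):

class CacheHolder:
    # Hypothetical stand-in for a backend that batches writes in a dict.
    def __init__(self):
        self.cache = {}

    def should_flush(self):
        # "self.cache is not {}" would always be True here, because a new
        # dict literal is a different object; truthiness checks contents.
        return bool(self.cache)

holder = CacheHolder()
print(holder.cache is not {})   # True even though the cache is empty
print(holder.should_flush())    # False: nothing to flush yet
holder.cache["table"] = ["row"]
print(holder.should_flush())    # True: there is batched data now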

View File

@@ -26,7 +26,7 @@ import logging
def google_dos_protection(maxrange=0.3):
"""Wait a random number of seconds (between 0.5 to 0.5+maxrange)
to avoid Google's bot detection"""
sleep(0.5+(random()*maxrange))
sleep(1+(random()*maxrange))
def value_of(value, default):
"""Get value or default value if None."""

crawler
View File

@@ -142,9 +142,10 @@ def helpmsg():
print(" -d discover new extensions")
print(" --max-discover <N> discover at most N new extensions")
print(" -a=<DIR> archive directory")
print(" --proxy HOST[:PORT] the proxy to use when downloading posts parallely")
def print_config(basedir, archive_dir, conf_dir, discover, parallel):
def print_config(basedir, archive_dir, conf_dir, discover, parallel, proxy):
"""Print current configuration."""
log_info("Configuration:")
log_info(" Base dir: {}".format(basedir))
@@ -152,6 +153,7 @@ def print_config(basedir, archive_dir, conf_dir, discover, parallel):
log_info(" Configuration directory: {}".format(conf_dir))
log_info(" Discover new extensions: {}".format(discover))
log_info(" Max num. of concurrent downloads: {}".format(parallel))
log_info(" Proxy for forum download: {}".format(proxy))
def parse_args(argv):
@@ -161,9 +163,10 @@ def parse_args(argv):
verbose = const_verbose()
discover = const_discover()
max_discover = None
proxy = None
try:
opts, _ = getopt.getopt(argv, "hsda:p:",
["archive=", 'parallel=', 'max-discover='])
["archive=", 'parallel=', 'max-discover=', 'proxy='])
except getopt.GetoptError:
helpmsg()
sys.exit(2)
@@ -182,14 +185,16 @@ def parse_args(argv):
elif opt == '--max-discover':
discover = True
max_discover = int(arg)
return basedir, parallel, verbose, discover, max_discover
elif opt == '--proxy':
proxy = arg
return basedir, parallel, verbose, discover, max_discover, proxy
def main(argv):
"""Main function of the extension crawler."""
today = datetime.datetime.now(datetime.timezone.utc).isoformat()
basedir, parallel, verbose, discover, max_discover = parse_args(argv)
basedir, parallel, verbose, discover, max_discover, proxy = parse_args(argv)
if verbose:
loglevel = logging.INFO
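
Since the crawler's command line now grows a --proxy long option, here is a short standalone sketch of how getopt picks it up; the option string and long-option names mirror the diff, while the parse function, its defaults, and the example argv are purely illustrative.

import getopt

def parse(argv):
    # Option string and long options as in the diff; everything else is a
    # simplified, hypothetical reimplementation for illustration only.
    parallel = 1
    proxy = None
    opts, _ = getopt.getopt(argv, "hsda:p:",
                            ["archive=", "parallel=", "max-discover=", "proxy="])
    for opt, arg in opts:
        if opt in ("-p", "--parallel"):
            parallel = int(arg)
        elif opt == "--proxy":
            proxy = arg
    return parallel, proxy

# e.g. an invocation such as: crawler -p 8 --proxy proxyhost:3128
print(parse(["-p", "8", "--proxy", "proxyhost:3128"]))   # (8, 'proxyhost:3128')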
@@ -216,7 +221,7 @@
start_time = time.time()
print_config(basedir, archive_dir, conf_dir, discover, parallel)
print_config(basedir, archive_dir, conf_dir, discover, parallel, proxy)
forum_ext_ids = get_forum_ext_ids(conf_dir)
known_ids = list(set(get_existing_ids(archive_dir)) | set(forum_ext_ids))
@@ -235,7 +240,7 @@
discovered_ids = None
known_ids = None
res = update_extensions(archive_dir, parallel, forum_ext_ids, ext_ids)
res = update_extensions(archive_dir, parallel, forum_ext_ids, ext_ids, proxy)
# We re-try (once) the extensions with unknown exceptions, as
# they are often temporary
@@ -250,7 +255,7 @@
ext_ids_except = sorted(
list(set(has_exception_ids) - set(forum_ext_ids_except)))
res_update = update_extensions(archive_dir, parallel,
forum_ext_ids_except, ext_ids_except)
forum_ext_ids_except, ext_ids_except, proxy)
res = list(set(res) - set(has_exception)) + res_update
end_time = time.time()