Compare commits
1 Commits
master
...
parallel_f
Author | SHA1 | Date |
---|---|---|
Michael Herzberg | f5d92bf32f |
|
@ -23,7 +23,7 @@ import os
|
|||
import glob
|
||||
import re
|
||||
import json
|
||||
from multiprocessing import Pool
|
||||
from multiprocessing import Pool, Lock
|
||||
from functools import partial
|
||||
import shutil
|
||||
import tempfile
|
||||
|
@ -34,12 +34,13 @@ import datetime
|
|||
import dateutil
|
||||
import dateutil.parser
|
||||
import requests
|
||||
import random
|
||||
|
||||
from ExtensionCrawler.config import (
|
||||
const_review_payload, const_review_search_url, const_download_url,
|
||||
get_local_archive_dir, const_overview_url, const_support_url,
|
||||
const_support_payload, const_review_search_payload, const_review_url)
|
||||
from ExtensionCrawler.util import google_dos_protection, value_of, log_info, log_exception
|
||||
from ExtensionCrawler.util import google_dos_protection, value_of, log_info, log_exception, log_warning
|
||||
from ExtensionCrawler.db import update_db_incremental
|
||||
|
||||
|
||||
|
@ -343,23 +344,37 @@ def update_crx(archivedir, tmptardir, ext_id, date):
|
|||
return RequestResult(res)
|
||||
|
||||
|
||||
def iterate_authors(pages):
|
||||
def iterate_authors(ext_id, pages):
|
||||
for page in pages:
|
||||
json_page = json.loads(page[page.index("{\""):page.rindex("}}},") + 1])
|
||||
for annotation in json_page["annotations"]:
|
||||
if "attributes" in annotation and "replyExists" in annotation["attributes"] and annotation["attributes"]["replyExists"]:
|
||||
yield (annotation["entity"]["author"],
|
||||
annotation["entity"]["groups"])
|
||||
try:
|
||||
json_page = json.loads(page[page.index("{\""):page.rindex("}}},") + 1])
|
||||
for annotation in json_page["annotations"]:
|
||||
if "attributes" in annotation and "replyExists" in annotation["attributes"] and annotation["attributes"]["replyExists"]:
|
||||
yield (annotation["entity"]["author"],
|
||||
annotation["entity"]["groups"])
|
||||
except ValueError as e:
|
||||
log_warning("Could not read review/store/reply page: " + str(e), 3, ext_id)
|
||||
|
||||
|
||||
def update_reviews(tar, date, ext_id):
|
||||
def restricted_post(url, proxy, **kwargs):
|
||||
kwargs.update(proxies={"http": proxy, "https": proxy})
|
||||
if 'lock' in globals():
|
||||
with lock:
|
||||
google_dos_protection()
|
||||
return requests.post(url, **kwargs)
|
||||
else:
|
||||
google_dos_protection()
|
||||
return requests.post(url, **kwargs)
|
||||
|
||||
|
||||
def update_reviews(tar, date, proxy, ext_id):
|
||||
res = None
|
||||
try:
|
||||
pages = []
|
||||
|
||||
google_dos_protection()
|
||||
res = requests.post(
|
||||
res = restricted_post(
|
||||
const_review_url(),
|
||||
proxy,
|
||||
data=const_review_payload(ext_id, "0", "100"),
|
||||
timeout=10)
|
||||
log_info("* review page 0-100: {}".format(str(res.status_code)), 2,
|
||||
|
@ -367,9 +382,9 @@ def update_reviews(tar, date, ext_id):
|
|||
store_request_text(tar, date, 'reviews000-099.text', res)
|
||||
pages += [res.text]
|
||||
|
||||
google_dos_protection()
|
||||
res = requests.post(
|
||||
res = restricted_post(
|
||||
const_review_url(),
|
||||
proxy,
|
||||
data=const_review_payload(ext_id, "100", "100"),
|
||||
timeout=10)
|
||||
log_info("* review page 100-200: {}".format(str(res.status_code)), 2,
|
||||
|
@ -377,13 +392,13 @@ def update_reviews(tar, date, ext_id):
|
|||
store_request_text(tar, date, 'reviews100-199.text', res)
|
||||
pages += [res.text]
|
||||
|
||||
google_dos_protection()
|
||||
# Always start with reply number 0 and request 10 replies
|
||||
ext_id_author_tups = [(ext_id, author, 0, 10, groups)
|
||||
for author, groups in iterate_authors(pages)]
|
||||
for author, groups in iterate_authors(ext_id, pages)]
|
||||
if ext_id_author_tups:
|
||||
res = requests.post(
|
||||
res = restricted_post(
|
||||
const_review_search_url(),
|
||||
proxy,
|
||||
data=const_review_search_payload(ext_id_author_tups),
|
||||
timeout=10)
|
||||
log_info("* review page replies: {}".format(str(res.status_code)),
|
||||
|
@ -396,14 +411,14 @@ def update_reviews(tar, date, ext_id):
|
|||
return RequestResult(res)
|
||||
|
||||
|
||||
def update_support(tar, date, ext_id):
|
||||
def update_support(tar, date, proxy, ext_id):
|
||||
res = None
|
||||
try:
|
||||
pages = []
|
||||
|
||||
google_dos_protection()
|
||||
res = requests.post(
|
||||
res = restricted_post(
|
||||
const_support_url(),
|
||||
proxy,
|
||||
data=const_support_payload(ext_id, "0", "100"),
|
||||
timeout=10)
|
||||
log_info("* support page 0-100: {}".format(str(res.status_code)), 2,
|
||||
|
@ -411,9 +426,9 @@ def update_support(tar, date, ext_id):
|
|||
store_request_text(tar, date, 'support000-099.text', res)
|
||||
pages += [res.text]
|
||||
|
||||
google_dos_protection()
|
||||
res = requests.post(
|
||||
res = restricted_post(
|
||||
const_support_url(),
|
||||
proxy,
|
||||
data=const_support_payload(ext_id, "100", "100"),
|
||||
timeout=10)
|
||||
log_info("* support page 100-200: {}".format(str(res.status_code)), 2,
|
||||
|
@ -421,13 +436,13 @@ def update_support(tar, date, ext_id):
|
|||
store_request_text(tar, date, 'support100-199.text', res)
|
||||
pages += [res.text]
|
||||
|
||||
google_dos_protection()
|
||||
# Always start with reply number 0 and request 10 replies
|
||||
ext_id_author_tups = [(ext_id, author, 0, 10, groups)
|
||||
for author, groups in iterate_authors(pages)]
|
||||
for author, groups in iterate_authors(ext_id, pages)]
|
||||
if ext_id_author_tups:
|
||||
res = requests.post(
|
||||
res = restricted_post(
|
||||
const_review_search_url(),
|
||||
proxy,
|
||||
data=const_review_search_payload(ext_id_author_tups),
|
||||
timeout=10)
|
||||
log_info("* support page replies: {}".format(str(res.status_code)),
|
||||
|
@ -440,7 +455,9 @@ def update_support(tar, date, ext_id):
|
|||
return RequestResult(res)
|
||||
|
||||
|
||||
def update_extension(archivedir, forums, ext_id):
|
||||
def update_extension(archivedir, forums_ext_ids, proxy, ext_id):
|
||||
forums = ext_id in forums_ext_ids
|
||||
|
||||
log_info("Updating extension {}".format(" (including forums)"
|
||||
if forums else ""), 1, ext_id)
|
||||
is_new = False
|
||||
|
@ -472,12 +489,12 @@ def update_extension(archivedir, forums, ext_id):
|
|||
res_reviews = None
|
||||
res_support = None
|
||||
if forums:
|
||||
res_reviews = update_reviews(tmptardir, date, ext_id)
|
||||
res_reviews = update_reviews(tmptardir, date, proxy, ext_id)
|
||||
|
||||
res_crx = update_crx(archivedir, tmptardir, ext_id, date)
|
||||
|
||||
if forums:
|
||||
res_support = update_support(tmptardir, date, ext_id)
|
||||
res_support = update_support(tmptardir, date, proxy, ext_id)
|
||||
|
||||
backup = False
|
||||
if backup:
|
||||
|
@ -548,8 +565,11 @@ def update_extension(archivedir, forums, ext_id):
|
|||
return UpdateResult(ext_id, is_new, tar_exception, res_overview, res_crx,
|
||||
res_reviews, res_support, sql_exception, sql_success)
|
||||
|
||||
def init_child(lock_):
|
||||
global lock
|
||||
lock = lock_
|
||||
|
||||
def update_extensions(archivedir, parallel, forums_ext_ids, ext_ids):
|
||||
def update_extensions(archivedir, parallel, forums_ext_ids, ext_ids, proxy):
|
||||
ext_with_forums = []
|
||||
ext_without_forums = []
|
||||
ext_ids = (list(set(ext_ids) - set(forums_ext_ids)))
|
||||
|
@ -557,23 +577,33 @@ def update_extensions(archivedir, parallel, forums_ext_ids, ext_ids):
|
|||
log_info("Updating {} extensions ({} including forums)".format(
|
||||
len(ext_ids), len(forums_ext_ids)))
|
||||
|
||||
# First, update all extensions without forums in parallel (increased speed).
|
||||
# parallel_ids = list(set(ext_ids) - set(forums_ext_ids))
|
||||
parallel_ids = ext_ids
|
||||
log_info("Updating {} extensions excluding forums (parallel)".format(
|
||||
len(parallel_ids)), 1)
|
||||
with Pool(parallel) as p:
|
||||
if proxy is None:
|
||||
# First, update all extensions without forums in parallel (increased speed).
|
||||
# parallel_ids = list(set(ext_ids) - set(forums_ext_ids))
|
||||
parallel_ids = ext_ids
|
||||
log_info("Updating {} extensions excluding forums (parallel)".format(
|
||||
len(parallel_ids)), 1)
|
||||
else:
|
||||
parallel_ids = ext_ids + forums_ext_ids
|
||||
random.shuffle(parallel_ids)
|
||||
log_info("Updating {} extensions including forums (parallel)".format(
|
||||
len(parallel_ids)), 1)
|
||||
|
||||
with Pool(parallel, initializer=init_child, initargs=(Lock(),)) as p:
|
||||
ext_without_forums = list(
|
||||
p.map(partial(update_extension, archivedir, False), parallel_ids))
|
||||
p.map(partial(update_extension, archivedir, set(forums_ext_ids), proxy), parallel_ids))
|
||||
|
||||
|
||||
# Second, update extensions with forums sequentially (and with delays) to
|
||||
# avoid running into Googles DDOS detection.
|
||||
log_info("Updating {} extensions including forums (sequentially)".format(
|
||||
len(forums_ext_ids)), 1)
|
||||
if proxy is None:
|
||||
log_info("Updating {} extensions including forums (sequentially)".format(
|
||||
len(forums_ext_ids)), 1)
|
||||
|
||||
ext_with_forums = list(
|
||||
map(partial(update_extension, archivedir, True), forums_ext_ids))
|
||||
ext_with_forums = list(
|
||||
map(partial(update_extension, archivedir, set(forums_ext_ids), proxy), forums_ext_ids))
|
||||
else:
|
||||
log_info("Skipping sequential forum download")
|
||||
|
||||
|
||||
return ext_with_forums + ext_without_forums
|
||||
|
|
|
@ -42,14 +42,15 @@ class MysqlBackend:
|
|||
return self
|
||||
|
||||
def __exit__(self, *args):
|
||||
start = time.time()
|
||||
self.retry(self._commit_cache)
|
||||
self.db.commit()
|
||||
log_info(
|
||||
"* Database batch insert finished after {}".format(
|
||||
datetime.timedelta(seconds=int(time.time() - start))),
|
||||
2,
|
||||
self.ext_id)
|
||||
if self.cache is not {}:
|
||||
start = time.time()
|
||||
self.retry(self._commit_cache)
|
||||
self.db.commit()
|
||||
log_info(
|
||||
"* Database batch insert finished after {}".format(
|
||||
datetime.timedelta(seconds=int(time.time() - start))),
|
||||
2,
|
||||
self.ext_id)
|
||||
self._close_conn()
|
||||
|
||||
def _commit_cache(self):
|
||||
|
|
|
@ -26,7 +26,7 @@ import logging
|
|||
def google_dos_protection(maxrange=0.3):
|
||||
"""Wait a random number of seconds (between 0.5 to 0.5+maxrange)
|
||||
to avoid Google's bot detection"""
|
||||
sleep(0.5+(random()*maxrange))
|
||||
sleep(1+(random()*maxrange))
|
||||
|
||||
def value_of(value, default):
|
||||
"""Get value or default value if None."""
|
||||
|
|
19
crawler
19
crawler
|
@ -142,9 +142,10 @@ def helpmsg():
|
|||
print(" -d discover new extensions")
|
||||
print(" --max-discover <N> discover at most N new extensions")
|
||||
print(" -a=<DIR> archive directory")
|
||||
print(" --proxy HOST[:PORT] the proxy to use when downloading posts parallely")
|
||||
|
||||
|
||||
def print_config(basedir, archive_dir, conf_dir, discover, parallel):
|
||||
def print_config(basedir, archive_dir, conf_dir, discover, parallel, proxy):
|
||||
"""Print current configuration."""
|
||||
log_info("Configuration:")
|
||||
log_info(" Base dir: {}".format(basedir))
|
||||
|
@ -152,6 +153,7 @@ def print_config(basedir, archive_dir, conf_dir, discover, parallel):
|
|||
log_info(" Configuration directory: {}".format(conf_dir))
|
||||
log_info(" Discover new extensions: {}".format(discover))
|
||||
log_info(" Max num. of concurrent downloads: {}".format(parallel))
|
||||
log_info(" Proxy for forum download: {}".format(proxy))
|
||||
|
||||
|
||||
def parse_args(argv):
|
||||
|
@ -161,9 +163,10 @@ def parse_args(argv):
|
|||
verbose = const_verbose()
|
||||
discover = const_discover()
|
||||
max_discover = None
|
||||
proxy = None
|
||||
try:
|
||||
opts, _ = getopt.getopt(argv, "hsda:p:",
|
||||
["archive=", 'parallel=', 'max-discover='])
|
||||
["archive=", 'parallel=', 'max-discover=', 'proxy='])
|
||||
except getopt.GetoptError:
|
||||
helpmsg()
|
||||
sys.exit(2)
|
||||
|
@ -182,14 +185,16 @@ def parse_args(argv):
|
|||
elif opt == '--max-discover':
|
||||
discover = True
|
||||
max_discover = int(arg)
|
||||
return basedir, parallel, verbose, discover, max_discover
|
||||
elif opt == '--proxy':
|
||||
proxy = arg
|
||||
return basedir, parallel, verbose, discover, max_discover, proxy
|
||||
|
||||
|
||||
def main(argv):
|
||||
"""Main function of the extension crawler."""
|
||||
|
||||
today = datetime.datetime.now(datetime.timezone.utc).isoformat()
|
||||
basedir, parallel, verbose, discover, max_discover = parse_args(argv)
|
||||
basedir, parallel, verbose, discover, max_discover, proxy = parse_args(argv)
|
||||
|
||||
if verbose:
|
||||
loglevel = logging.INFO
|
||||
|
@ -216,7 +221,7 @@ def main(argv):
|
|||
|
||||
start_time = time.time()
|
||||
|
||||
print_config(basedir, archive_dir, conf_dir, discover, parallel)
|
||||
print_config(basedir, archive_dir, conf_dir, discover, parallel, proxy)
|
||||
|
||||
forum_ext_ids = get_forum_ext_ids(conf_dir)
|
||||
known_ids = list(set(get_existing_ids(archive_dir)) | set(forum_ext_ids))
|
||||
|
@ -235,7 +240,7 @@ def main(argv):
|
|||
discovered_ids = None
|
||||
known_ids = None
|
||||
|
||||
res = update_extensions(archive_dir, parallel, forum_ext_ids, ext_ids)
|
||||
res = update_extensions(archive_dir, parallel, forum_ext_ids, ext_ids, proxy)
|
||||
|
||||
# We re-try (once) the extensions with unknown exceptions, as
|
||||
# they are often temporary
|
||||
|
@ -250,7 +255,7 @@ def main(argv):
|
|||
ext_ids_except = sorted(
|
||||
list(set(has_exception_ids) - set(forum_ext_ids_except)))
|
||||
res_update = update_extensions(archive_dir, parallel,
|
||||
forum_ext_ids_except, ext_ids_except)
|
||||
forum_ext_ids_except, ext_ids_except, proxy)
|
||||
res = list(set(res) - set(has_exception)) + res_update
|
||||
|
||||
end_time = time.time()
|
||||
|
|
Loading…
Reference in New Issue