Use a lock to mix forum downloads into the parallel mode.

Michael Herzberg 2018-04-21 13:59:33 +01:00
parent aee916a629
commit dbeba9e9bf
2 changed files with 109 additions and 94 deletions
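The gist of the change: instead of handling forum extensions in a separate, strictly sequential pass, every worker now routes its HTTP traffic through a shared RequestManager (new file ExtensionCrawler/request_manager.py, shown at the end of this diff). Ordinary store requests each take one slot of a BoundedSemaphore and keep running in parallel; a forum ("restricted") request drains all slots under a Lock, so it waits for the in-flight requests to finish and then runs with no concurrent traffic. A minimal, self-contained sketch of that pattern, with illustrative names rather than the project's API:

import time
import random
from contextlib import contextmanager
from multiprocessing import Lock, BoundedSemaphore

MAX_WORKERS = 4
lock = Lock()
sem = BoundedSemaphore(MAX_WORKERS)

@contextmanager
def normal_slot():
    with lock:                      # serialize only the slot acquisition
        sem.acquire()
    try:
        yield                       # the store/CRX request runs here, in parallel
    finally:
        sem.release()

@contextmanager
def exclusive_slot():
    with lock:                      # block new acquisitions ...
        for _ in range(MAX_WORKERS):
            sem.acquire()           # ... and wait for all in-flight requests
        try:
            time.sleep(0.7 + random.random() * 0.15)   # polite delay before forum traffic
            yield                   # the forum request runs with no concurrent traffic
        finally:
            for _ in range(MAX_WORKERS):
                sem.release()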

View File

@@ -24,7 +24,7 @@ import glob
 import re
 import json
 import gc
-from multiprocessing import Pool
+import random
 from concurrent.futures import TimeoutError
 from pebble import ProcessPool, ProcessExpired
 from functools import partial
@@ -42,8 +42,9 @@ from ExtensionCrawler.config import (
     const_review_payload, const_review_search_url, const_download_url,
     get_local_archive_dir, const_overview_url, const_support_url,
     const_support_payload, const_review_search_payload, const_review_url)
-from ExtensionCrawler.util import google_dos_protection, value_of, log_info, log_warning, log_exception, setup_logger
+from ExtensionCrawler.util import value_of, log_info, log_warning, log_exception, setup_logger
 from ExtensionCrawler.db import update_db_incremental
+from ExtensionCrawler.request_manager import RequestManager

 class Error(Exception):
     pass
@@ -271,6 +272,7 @@ def all_crx(archivedir, extid, date=None):
 def update_overview(tar, date, ext_id):
     res = None
     try:
+        with request_manager.normal_request():
             res = requests.get(const_overview_url(ext_id), timeout=10)
         log_info("* overview page: {}".format(str(res.status_code)), 2, ext_id)
         store_request_text(tar, date, 'overview.html', res)
@@ -309,6 +311,7 @@ def update_crx(archivedir, tmptardir, ext_id, date):
     headers = {'If-Modified-Since': last_crx_http_date}
     try:
         log_info("* Checking If-Modified-Since", 2, ext_id)
+        with request_manager.normal_request():
             res = requests.get(
                 const_download_url().format(ext_id),
                 stream=True,
@ -322,6 +325,7 @@ def update_crx(archivedir, tmptardir, ext_id, date):
extfilename = "default.crx" extfilename = "default.crx"
if res.status_code == 304: if res.status_code == 304:
with request_manager.normal_request():
etag = requests.head( etag = requests.head(
const_download_url().format(ext_id), const_download_url().format(ext_id),
timeout=10, timeout=10,
@@ -334,6 +338,7 @@ def update_crx(archivedir, tmptardir, ext_id, date):
             if (etag is not "") and (etag != last_crx_etag):
                 log_info("- downloading due to different etags", 3, ext_id)
+                with request_manager.normal_request():
                     res = requests.get(
                         const_download_url().format(ext_id),
                         stream=True,
@@ -378,7 +383,7 @@ def update_reviews(tar, date, ext_id):
     try:
         pages = []
-        # google_dos_protection()
+        with request_manager.restricted_request():
            res = requests.post(
                const_review_url(),
                data=const_review_payload(ext_id, "0", "100"),
@@ -388,7 +393,7 @@ def update_reviews(tar, date, ext_id):
         store_request_text(tar, date, 'reviews000-099.text', res)
         pages += [res.text]
-        google_dos_protection()
+        with request_manager.restricted_request():
            res = requests.post(
                const_review_url(),
                data=const_review_payload(ext_id, "100", "100"),
@@ -398,11 +403,11 @@ def update_reviews(tar, date, ext_id):
         store_request_text(tar, date, 'reviews100-199.text', res)
         pages += [res.text]
-        google_dos_protection()
         # Always start with reply number 0 and request 10 replies
         ext_id_author_tups = [(ext_id, author, 0, 10, groups)
                               for author, groups in iterate_authors(pages)]
         if ext_id_author_tups:
+            with request_manager.restricted_request():
                res = requests.post(
                    const_review_search_url(),
                    data=const_review_search_payload(ext_id_author_tups),
@@ -422,7 +427,7 @@ def update_support(tar, date, ext_id):
     try:
         pages = []
-        google_dos_protection()
+        with request_manager.restricted_request():
            res = requests.post(
                const_support_url(),
                data=const_support_payload(ext_id, "0", "100"),
@@ -432,7 +437,7 @@ def update_support(tar, date, ext_id):
         store_request_text(tar, date, 'support000-099.text', res)
         pages += [res.text]
-        google_dos_protection()
+        with request_manager.restricted_request():
            res = requests.post(
                const_support_url(),
                data=const_support_payload(ext_id, "100", "100"),
@@ -442,11 +447,11 @@ def update_support(tar, date, ext_id):
         store_request_text(tar, date, 'support100-199.text', res)
         pages += [res.text]
-        google_dos_protection()
         # Always start with reply number 0 and request 10 replies
         ext_id_author_tups = [(ext_id, author, 0, 10, groups)
                               for author, groups in iterate_authors(pages)]
         if ext_id_author_tups:
+            with request_manager.restricted_request():
                res = requests.post(
                    const_review_search_url(),
                    data=const_review_search_payload(ext_id_author_tups),
@@ -461,7 +466,8 @@ def update_support(tar, date, ext_id):
     return RequestResult(res)


-def update_extension(archivedir, forums, ext_id):
+def update_extension(archivedir, tup):
+    ext_id, forums = tup
     log_info("Updating extension {}".format(" (including forums)"
                                              if forums else ""), 1, ext_id)
     is_new = False
@@ -570,7 +576,7 @@ def update_extension(archivedir, forums, ext_id):
         res_reviews, res_support, sql_exception, sql_success)


-def init_process(verbose, start_pystuck=False):
+def init_process(verbose, start_pystuck, rm):
     # When not using fork, we need to setup logging again in the worker threads
     setup_logger(verbose)
@@ -578,12 +584,24 @@ def init_process(verbose, start_pystuck=False):
         import pystuck
         pystuck.run_server(port=((os.getpid() % 10000) + 10001))

+    global request_manager
+    request_manager = rm
+

-def execute_parallel_ProcessPool(archivedir, max_retry, timeout, max_workers, ext_ids, forums, verbose, start_pystuck):
+def update_extensions(archivedir, parallel, forums_ext_ids, ext_ids, timeout, use_process_pool, verbose, start_pystuck):
+    ext_with_forums = list(set(forums_ext_ids))
+    ext_without_forums = list(set(ext_ids) - set(forums_ext_ids))
+    tups = [(extid, True) for extid in ext_with_forums] + [(extid, False) for extid in ext_without_forums]
+    random.shuffle(tups)
+
+    log_info("Updating {} extensions ({} including forums, {} excluding forums)".format(
+        len(tups), len(ext_with_forums), len(ext_without_forums)))
+
     results=[]
-    with ProcessPool(max_workers=max_workers, max_tasks=100, initializer=init_process, initargs=(verbose, start_pystuck)) as pool:
+    with ProcessPool(max_workers=parallel, max_tasks=100, initializer=init_process, initargs=(verbose, start_pystuck, RequestManager(parallel))) as pool:
-        future = pool.map(partial(update_extension, archivedir, forums),
-                          ext_ids,
+        future = pool.map(partial(update_extension, archivedir),
+                          tups,
                           chunksize=1,
                           timeout=timeout)
         iterator = future.result()
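A note on the design in this hunk: the pool workers are separate processes, so the Lock, BoundedSemaphore, and Value objects inside RequestManager have to be created once in the parent and inherited by every worker. Passing the manager through the pool's initializer arguments and stashing it in a module-level global (init_process above) is the standard way to do that, since these primitives cannot be shipped to an already-running worker. A minimal sketch of the same pattern using plain multiprocessing.Pool (pebble's ProcessPool is assumed to forward initializer/initargs in the same way, as the hunk above suggests); names here are illustrative, not the project's code:

from multiprocessing import Pool, Lock

shared_lock = None          # per-process global, filled in by the initializer

def _init(lock):
    global shared_lock
    shared_lock = lock      # every worker ends up referencing the same Lock

def work(item):
    with shared_lock:       # coordinated across all worker processes
        return item * 2

if __name__ == "__main__":
    lock = Lock()
    with Pool(processes=4, initializer=_init, initargs=(lock,)) as pool:
        print(pool.map(work, range(8)))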
@@ -606,46 +624,6 @@ def execute_parallel_ProcessPool(archivedir, max_retry, timeout, max_workers, ext_ids, forums, verbose, start_pystuck):
     return results

-
-def execute_parallel_Pool(archivedir, max_retry, timeout, max_workers, ext_ids, forums, verbose, start_pystuck):
-    log_info("Using multiprocessing.Pool: timeout and max_try are *not* supported")
-    with Pool(processes=max_workers, maxtasksperchild=100, initializer=init_process, initargs=(verbose, start_pystuck)) as pool:
-        # The default chunksize is None, which means that each process will only
-        # ever get one task with chunksize len(ext_ids)/max_workers. This would
-        # render maxtasksperchild useless.
-        results = pool.map(partial(update_extension, archivedir, forums),
-                           ext_ids,
-                           chunksize=1)
-    return list(results)
-
-
-def update_extensions(archivedir, parallel, forums_ext_ids, ext_ids, timeout, use_process_pool, verbose, start_pystuck):
-    ext_with_forums = []
-    ext_without_forums = []
-    forums_ext_ids = (list(set(forums_ext_ids)))
-
-    if use_process_pool:
-        execute_parallel=execute_parallel_ProcessPool
-    else:
-        execute_parallel=execute_parallel_Pool
-
-    log_info("Updating {} extensions ({} including forums)".format(
-        len(ext_ids), len(forums_ext_ids)))
-
-    # First, update all extensions without forums in parallel (increased speed).
-    # parallel_ids = list(set(ext_ids) - set(forums_ext_ids))
-    parallel_ids = ext_ids
-    log_info("Updating {} extensions excluding forums (parallel)".format(
-        len(parallel_ids)), 1)
-    ext_without_forums = execute_parallel(archivedir, 3, timeout, parallel, parallel_ids, False, verbose, start_pystuck)
-
-    # Second, update extensions with forums sequentially (and with delays) to
-    # avoid running into Googles DDOS detection.
-    log_info("Updating {} extensions including forums (sequentially)".format(
-        len(forums_ext_ids)), 1)
-    ext_with_forums = execute_parallel(archivedir, 3, timeout, 1, forums_ext_ids, True, verbose, start_pystuck)
-
-    return ext_with_forums + ext_without_forums


 def get_existing_ids(archivedir):
     byte = '[0-9a-z][0-9a-z][0-9a-z][0-9a-z][0-9a-z][0-9a-z][0-9a-z][0-9a-z]'
     word = byte + byte + byte + byte

View File

@@ -0,0 +1,37 @@
+import time
+import random
+from contextlib import contextmanager
+from multiprocessing import Lock, BoundedSemaphore, Value
+
+from ExtensionCrawler.util import google_dos_protection
+from ExtensionCrawler.util import log_info
+
+
+class RequestManager:
+    def __init__(self, max_workers):
+        self.max_workers = max_workers
+        self.lock = Lock()
+        self.sem = BoundedSemaphore(max_workers)
+        self.last_request = Value('d', 0.0)
+        self.last_restricted_request = Value('d', 0.0)
+
+    @contextmanager
+    def normal_request(self):
+        with self.lock:
+            self.sem.acquire()
+            time.sleep(max(0.0, self.last_restricted_request.value + 0.7 + (random.random() * 0.15) - time.time()))
+        yield None
+        self.last_request.value = time.time()
+        self.sem.release()
+
+    @contextmanager
+    def restricted_request(self):
+        with self.lock:
+            for i in range(self.max_workers):
+                self.sem.acquire()
+            time.sleep(max(0.0, self.last_request.value + 0.7 + (random.random() * 0.15) - time.time()))
+            yield None
+            self.last_request.value = time.time()
+            self.last_restricted_request.value = time.time()
+            for i in range(self.max_workers):
+                self.sem.release()
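For orientation, a hedged usage sketch of the class above (the URLs and payloads are placeholders; in the crawler the global request_manager is installed by init_process and the real request code sits in the hunks of the first file). normal_request() lets up to max_workers requests run concurrently but enforces a gap of roughly 0.7 to 0.85 seconds since the last restricted request; restricted_request() waits for all outstanding slots, enforces the same gap since the last request of either kind, and keeps the other workers paused until it finishes, taking over the role of the removed google_dos_protection() pauses.

import requests
from ExtensionCrawler.request_manager import RequestManager

# Normally constructed once in the parent process and handed to the workers
# via the pool initializer; constructed directly here only for illustration.
request_manager = RequestManager(max_workers=4)

def fetch_overview(ext_id):
    # Parallel-friendly request: occupies a single semaphore slot.
    with request_manager.normal_request():
        return requests.get("https://example.org/detail/" + ext_id, timeout=10)

def fetch_reviews(ext_id):
    # Exclusive request: drains every slot, so no other request overlaps it.
    with request_manager.restricted_request():
        return requests.post("https://example.org/reviews",
                             data={"id": ext_id, "start": 0, "count": 100},
                             timeout=10)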