diff --git a/ExtensionCrawler/archive.py b/ExtensionCrawler/archive.py
index b760565..4c1e1da 100644
--- a/ExtensionCrawler/archive.py
+++ b/ExtensionCrawler/archive.py
@@ -24,7 +24,7 @@ import glob
 import re
 import json
 import gc
-from multiprocessing import Pool
+import random
 from concurrent.futures import TimeoutError
 from pebble import ProcessPool, ProcessExpired
 from functools import partial
@@ -42,8 +42,9 @@ from ExtensionCrawler.config import (
     const_review_payload, const_review_search_url, const_download_url,
     get_local_archive_dir, const_overview_url, const_support_url,
     const_support_payload, const_review_search_payload, const_review_url)
-from ExtensionCrawler.util import google_dos_protection, value_of, log_info, log_warning, log_exception, setup_logger
+from ExtensionCrawler.util import value_of, log_info, log_warning, log_exception, setup_logger
 from ExtensionCrawler.db import update_db_incremental
+from ExtensionCrawler.request_manager import RequestManager
 
 class Error(Exception):
     pass
@@ -271,7 +272,8 @@ def all_crx(archivedir, extid, date=None):
 def update_overview(tar, date, ext_id):
     res = None
     try:
-        res = requests.get(const_overview_url(ext_id), timeout=10)
+        with request_manager.normal_request():
+            res = requests.get(const_overview_url(ext_id), timeout=10)
         log_info("* overview page: {}".format(str(res.status_code)), 2, ext_id)
         store_request_text(tar, date, 'overview.html', res)
     except Exception as e:
@@ -309,11 +311,12 @@ def update_crx(archivedir, tmptardir, ext_id, date):
     headers = {'If-Modified-Since': last_crx_http_date}
     try:
         log_info("* Checking If-Modified-Since", 2, ext_id)
-        res = requests.get(
-            const_download_url().format(ext_id),
-            stream=True,
-            headers=headers,
-            timeout=10)
+        with request_manager.normal_request():
+            res = requests.get(
+                const_download_url().format(ext_id),
+                stream=True,
+                headers=headers,
+                timeout=10)
         log_info("* crx archive (Last: {}): {}".format(
             value_of(last_crx_http_date, "n/a"), str(res.status_code)), 2,
             ext_id)
@@ -322,10 +325,11 @@ def update_crx(archivedir, tmptardir, ext_id, date):
         extfilename = "default.crx"
 
         if res.status_code == 304:
-            etag = requests.head(
-                const_download_url().format(ext_id),
-                timeout=10,
-                allow_redirects=True).headers.get('ETag')
+            with request_manager.normal_request():
+                etag = requests.head(
+                    const_download_url().format(ext_id),
+                    timeout=10,
+                    allow_redirects=True).headers.get('ETag')
             write_text(tmptardir, date, extfilename + ".etag", etag)
             log_info("- checking etag, last: {}".format(last_crx_etag), 3,
                      ext_id)
@@ -334,10 +338,11 @@ def update_crx(archivedir, tmptardir, ext_id, date):
 
             if (etag is not "") and (etag != last_crx_etag):
                 log_info("- downloading due to different etags", 3, ext_id)
-                res = requests.get(
-                    const_download_url().format(ext_id),
-                    stream=True,
-                    timeout=10)
+                with request_manager.normal_request():
+                    res = requests.get(
+                        const_download_url().format(ext_id),
+                        stream=True,
+                        timeout=10)
             else:
                 write_text(tmptardir, date, extfilename + ".link",
                            os.path.join("..",
@@ -378,35 +383,35 @@ def update_reviews(tar, date, ext_id):
     try:
         pages = []
 
-        # google_dos_protection()
-        res = requests.post(
-            const_review_url(),
-            data=const_review_payload(ext_id, "0", "100"),
-            timeout=10)
+        with request_manager.restricted_request():
+            res = requests.post(
+                const_review_url(),
+                data=const_review_payload(ext_id, "0", "100"),
+                timeout=10)
         log_info("* review page 0-100: {}".format(str(res.status_code)), 2,
                  ext_id)
         store_request_text(tar, date, 'reviews000-099.text', res)
         pages += [res.text]
 
-        google_dos_protection()
-        res = requests.post(
-            const_review_url(),
-            data=const_review_payload(ext_id, "100", "100"),
-            timeout=10)
+        with request_manager.restricted_request():
+            res = requests.post(
+                const_review_url(),
+                data=const_review_payload(ext_id, "100", "100"),
+                timeout=10)
         log_info("* review page 100-200: {}".format(str(res.status_code)), 2,
                  ext_id)
         store_request_text(tar, date, 'reviews100-199.text', res)
         pages += [res.text]
 
-        google_dos_protection()
         # Always start with reply number 0 and request 10 replies
         ext_id_author_tups = [(ext_id, author, 0, 10, groups)
                               for author, groups in iterate_authors(pages)]
         if ext_id_author_tups:
-            res = requests.post(
-                const_review_search_url(),
-                data=const_review_search_payload(ext_id_author_tups),
-                timeout=10)
+            with request_manager.restricted_request():
+                res = requests.post(
+                    const_review_search_url(),
+                    data=const_review_search_payload(ext_id_author_tups),
+                    timeout=10)
             log_info("* review page replies: {}".format(str(res.status_code)),
                      2, ext_id)
             store_request_text(tar, date, 'reviewsreplies.text', res)
@@ -422,35 +427,35 @@ def update_support(tar, date, ext_id):
     try:
         pages = []
 
-        google_dos_protection()
-        res = requests.post(
-            const_support_url(),
-            data=const_support_payload(ext_id, "0", "100"),
-            timeout=10)
+        with request_manager.restricted_request():
+            res = requests.post(
+                const_support_url(),
+                data=const_support_payload(ext_id, "0", "100"),
+                timeout=10)
         log_info("* support page 0-100: {}".format(str(res.status_code)), 2,
                  ext_id)
         store_request_text(tar, date, 'support000-099.text', res)
         pages += [res.text]
 
-        google_dos_protection()
-        res = requests.post(
-            const_support_url(),
-            data=const_support_payload(ext_id, "100", "100"),
-            timeout=10)
+        with request_manager.restricted_request():
+            res = requests.post(
+                const_support_url(),
+                data=const_support_payload(ext_id, "100", "100"),
+                timeout=10)
         log_info("* support page 100-200: {}".format(str(res.status_code)), 2,
                  ext_id)
         store_request_text(tar, date, 'support100-199.text', res)
         pages += [res.text]
 
-        google_dos_protection()
         # Always start with reply number 0 and request 10 replies
         ext_id_author_tups = [(ext_id, author, 0, 10, groups)
                               for author, groups in iterate_authors(pages)]
         if ext_id_author_tups:
-            res = requests.post(
-                const_review_search_url(),
-                data=const_review_search_payload(ext_id_author_tups),
-                timeout=10)
+            with request_manager.restricted_request():
+                res = requests.post(
+                    const_review_search_url(),
+                    data=const_review_search_payload(ext_id_author_tups),
+                    timeout=10)
             log_info("* support page replies: {}".format(str(res.status_code)),
                      2, ext_id)
             store_request_text(tar, date, 'supportreplies.text', res)
@@ -461,7 +466,8 @@ def update_support(tar, date, ext_id):
     return RequestResult(res)
 
 
-def update_extension(archivedir, forums, ext_id):
+def update_extension(archivedir, tup):
+    ext_id, forums = tup
     log_info("Updating extension {}".format(" (including forums)"
                                             if forums else ""), 1, ext_id)
     is_new = False
@@ -570,7 +576,7 @@
             res_reviews, res_support, sql_exception, sql_success)
 
 
-def init_process(verbose, start_pystuck=False):
+def init_process(verbose, start_pystuck, rm):
     # When not using fork, we need to setup logging again in the worker threads
     setup_logger(verbose)
 
@@ -578,14 +584,26 @@
         import pystuck
         pystuck.run_server(port=((os.getpid() % 10000) + 10001))
 
+    global request_manager
+    request_manager = rm
+
+
+def update_extensions(archivedir, parallel, forums_ext_ids, ext_ids, timeout, use_process_pool, verbose, start_pystuck):
+    ext_with_forums = list(set(forums_ext_ids))
+    ext_without_forums = list(set(ext_ids) - set(forums_ext_ids))
+
+    tups = [(extid, True) for extid in ext_with_forums] + [(extid, False) for extid in ext_without_forums]
+    random.shuffle(tups)
+
+    log_info("Updating {} extensions ({} including forums, {} excluding forums)".format(
+        len(tups), len(ext_with_forums), len(ext_without_forums)))
 
-def execute_parallel_ProcessPool(archivedir, max_retry, timeout, max_workers, ext_ids, forums, verbose, start_pystuck):
     results=[]
-    with ProcessPool(max_workers=max_workers, max_tasks=100, initializer=init_process, initargs=(verbose, start_pystuck)) as pool:
-        future = pool.map(partial(update_extension, archivedir, forums),
-                          ext_ids,
-                          chunksize=1,
-                          timeout=timeout)
+    with ProcessPool(max_workers=parallel, max_tasks=100, initializer=init_process, initargs=(verbose, start_pystuck, RequestManager(parallel))) as pool:
+        future = pool.map(partial(update_extension, archivedir),
+                          tups,
+                          chunksize=1,
+                          timeout=timeout)
     iterator = future.result()
     for ext_id in ext_ids:
         try:
@@ -606,46 +624,6 @@ def execute_parallel_ProcessPool(archivedir, max_retry, timeout, max_workers, ex
     return results
 
 
-def execute_parallel_Pool(archivedir, max_retry, timeout, max_workers, ext_ids, forums, verbose, start_pystuck):
-    log_info("Using multiprocessing.Pool: timeout and max_try are *not* supported")
-    with Pool(processes=max_workers, maxtasksperchild=100, initializer=init_process, initargs=(verbose, start_pystuck)) as pool:
-        # The default chunksize is None, which means that each process will only
-        # ever get one task with chunksize len(ext_ids)/max_workers. This would
-        # render maxtasksperchild useless.
-        results = pool.map(partial(update_extension, archivedir, forums),
-                           ext_ids,
-                           chunksize=1)
-    return list(results)
-
-
-def update_extensions(archivedir, parallel, forums_ext_ids, ext_ids, timeout, use_process_pool, verbose, start_pystuck):
-    ext_with_forums = []
-    ext_without_forums = []
-    forums_ext_ids = (list(set(forums_ext_ids)))
-    if use_process_pool:
-        execute_parallel=execute_parallel_ProcessPool
-    else:
-        execute_parallel=execute_parallel_Pool
-
-    log_info("Updating {} extensions ({} including forums)".format(
-        len(ext_ids), len(forums_ext_ids)))
-
-    # First, update all extensions without forums in parallel (increased speed).
-    # parallel_ids = list(set(ext_ids) - set(forums_ext_ids))
-    parallel_ids = ext_ids
-    log_info("Updating {} extensions excluding forums (parallel)".format(
-        len(parallel_ids)), 1)
-    ext_without_forums = execute_parallel(archivedir, 3, timeout, parallel, parallel_ids, False, verbose, start_pystuck)
-
-    # Second, update extensions with forums sequentially (and with delays) to
-    # avoid running into Googles DDOS detection.
-    log_info("Updating {} extensions including forums (sequentially)".format(
-        len(forums_ext_ids)), 1)
-    ext_with_forums = execute_parallel(archivedir, 3, timeout, 1, forums_ext_ids, True, verbose, start_pystuck)
-
-    return ext_with_forums + ext_without_forums
-
-
 def get_existing_ids(archivedir):
     byte = '[0-9a-z][0-9a-z][0-9a-z][0-9a-z][0-9a-z][0-9a-z][0-9a-z][0-9a-z]'
     word = byte + byte + byte + byte
diff --git a/ExtensionCrawler/request_manager.py b/ExtensionCrawler/request_manager.py
new file mode 100644
index 0000000..790dac4
--- /dev/null
+++ b/ExtensionCrawler/request_manager.py
@@ -0,0 +1,37 @@
+import time
+import random
+from contextlib import contextmanager
+from multiprocessing import Lock, BoundedSemaphore, Value
+
+from ExtensionCrawler.util import google_dos_protection
+from ExtensionCrawler.util import log_info
+
+
+class RequestManager:
+    def __init__(self, max_workers):
+        self.max_workers = max_workers
+        self.lock = Lock()
+        self.sem = BoundedSemaphore(max_workers)
+        self.last_request = Value('d', 0.0)
+        self.last_restricted_request = Value('d', 0.0)
+
+    @contextmanager
+    def normal_request(self):
+        with self.lock:
+            self.sem.acquire()
+            time.sleep(max(0.0, self.last_restricted_request.value + 0.7 + (random.random() * 0.15) - time.time()))
+        yield None
+        self.last_request.value = time.time()
+        self.sem.release()
+
+    @contextmanager
+    def restricted_request(self):
+        with self.lock:
+            for i in range(self.max_workers):
+                self.sem.acquire()
+            time.sleep(max(0.0, self.last_request.value + 0.7 + (random.random() * 0.15) - time.time()))
+            yield None
+            self.last_request.value = time.time()
+            self.last_restricted_request.value = time.time()
+            for i in range(self.max_workers):
+                self.sem.release()
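
The sketch below is illustrative only and not part of the patch; it exercises the RequestManager throttling in a single process, assuming the constants shown above (roughly 0.7-0.85 s of enforced spacing around a restricted request):

    import time
    from ExtensionCrawler.request_manager import RequestManager

    rm = RequestManager(4)              # allow up to 4 concurrent normal requests

    with rm.normal_request():
        pass                            # an overview/crx requests.get(...) would go here

    start = time.time()
    with rm.restricted_request():       # waits ~0.7 s after the last normal request
        pass                            # review/support requests.post(...) would go here
    print("restricted request delayed by {:.2f} s".format(time.time() - start))

    start = time.time()
    with rm.normal_request():           # waits ~0.7 s after the last restricted request
        pass
    print("normal request delayed by {:.2f} s".format(time.time() - start))

restricted_request additionally drains all semaphore slots, so forum-related traffic never overlaps with the parallel normal requests.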
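
Likewise, a minimal sketch of the worker pattern used in update_extensions (crawl_one is a hypothetical stand-in for update_extension, and /tmp/archive is an assumed path): pebble's ProcessPool.map hands each worker exactly one element of the iterable, so the per-extension forums flag travels inside an (ext_id, forums) tuple, while the shared RequestManager is installed once per worker process via initializer/initargs:

    from functools import partial
    from pebble import ProcessPool
    from ExtensionCrawler.request_manager import RequestManager

    def init_worker(rm):
        global request_manager          # one shared throttle across all workers
        request_manager = rm

    def crawl_one(archivedir, tup):     # hypothetical stand-in for update_extension
        ext_id, forums = tup
        with request_manager.normal_request():
            pass                        # fetch overview/crx for ext_id here
        return ext_id

    if __name__ == '__main__':
        tups = [("a" * 32, True), ("b" * 32, False)]
        with ProcessPool(max_workers=2, initializer=init_worker,
                         initargs=(RequestManager(2),)) as pool:
            future = pool.map(partial(crawl_one, "/tmp/archive"), tups, timeout=60)
            print(list(future.result()))

This mirrors the initargs=(verbose, start_pystuck, RequestManager(parallel)) call in the patch: the multiprocessing primitives inside RequestManager are shared with the worker processes at pool start-up rather than pickled per task.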