#!/usr/bin/env python3.7
#
# Copyright (C) 2019 The University of Sheffield, UK
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program.  If not, see <https://www.gnu.org/licenses/>.
#
# SPDX-License-Identifier: GPL-3.0-or-later

import argparse
import io
import logging
import re
import sys
import importlib.util
import csv
import math
import ast
from zipfile import ZipFile

from ExtensionCrawler.config import (const_log_format, const_basedir)
from ExtensionCrawler.archive import iter_tar_entries_by_date
from ExtensionCrawler.js_mincer import mince_js


def get_shannon_entropy(string):
    """
    This code has been borrowed from
    "http://blog.dkbza.org/2007/05/scanning-data-for-entropy-anomalies.html"
    and "git@github.com:dxa4481/truffleHog.git".
    """
    chars = "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789"
    if not string:
        return 0
    entropy = 0
    for x in chars:
        p_x = float(string.count(x)) / len(string)
        if p_x > 0:
            entropy += -p_x * math.log(p_x, 2)
    return entropy


def is_likely_hash(string):
    """Heuristic: hashes and keys have high entropy and several digits."""
    return get_shannon_entropy(string) > 2.0 and len(
        [c for c in string if c.isdigit()]) > 4


def import_regexs(path):
    """Dynamically load the Python module containing the search strings."""
    spec = importlib.util.spec_from_file_location("MinerStrings", path)
    module = importlib.util.module_from_spec(spec)
    spec.loader.exec_module(module)
    return module


def get_etag(headers_content):
    """Extract the ETag from saved HTTP headers (stored as a dict repr)."""
    d = ast.literal_eval(headers_content)
    if "ETag" in d:
        return d["ETag"]


def get_name_and_version(overview_contents):
    """Parse extension name and version out of the Web Store overview page."""
    # Extract extension name from the page's <meta itemprop> tag
    match = re.search(r"""<meta itemprop="name" content="(.*?)"\s*/>""",
                      overview_contents)
    name = match.group(1) if match else None

    # Extract extension version
    match = re.search(r"""<meta itemprop="version" content="(.*?)"\s*/>""",
                      overview_contents)
    version = match.group(1) if match else None

    return name, version


def first_match_in_locations(search_tag, pattern, locations):
    """Return the first match of pattern in any location as a one-element
    list [[location_tag, search_tag, matched_string]], or [] if none."""
    for location_tag, lines in locations:
        for line in lines:
            m = re.search(pattern, line)
            if m:
                matched_string = m.group()
                # Candidate mining keys are only reported when they also
                # look like a hash, to reduce false positives.
                if search_tag != "MINING_KEYS_REGEX" or is_likely_hash(
                        matched_string):
                    return [[location_tag, search_tag, matched_string]]
    return []
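

# A minimal sketch of how the helpers above interact (hypothetical values,
# not part of the original script):
#
#     first_match_in_locations(
#         "MINING_KEYS_REGEX", r"[a-f0-9]{16}",
#         [("verbatim", ["key = '4a7f9c02b81d33e6'"])])
#
# returns [["verbatim", "MINING_KEYS_REGEX", "4a7f9c02b81d33e6"]]: the hit
# survives the is_likely_hash filter because its Shannon entropy is roughly
# 3.9 (above the 2.0 threshold) and it contains more than four digits. A
# low-entropy candidate such as "aaaaaaaaaaaaaaaa" (entropy 0.0) would be
# filtered out and [] returned instead.

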
def handle_extid(conf, extid, csvwriter):
    miner_strings = import_regexs(conf.REGEXP_FILE).MinerStrings()

    results = []

    still_in_store = None
    crx_etags = [None]
    for date, tups in iter_tar_entries_by_date(conf.archive_dir, extid):
        if conf.from_date and not (conf.from_date <= date):
            continue
        if conf.latest_date and not (date <= conf.latest_date):
            continue

        crx_etag = None
        name = None
        version = None
        date_matches = []
        for tarentry, tarfile in tups:
            tarentry_filename = tarentry.name.split("/")[-1]

            if tarentry_filename.endswith(".crx.headers"):
                crx_etag = get_etag(tarfile.read().decode())
                if crx_etag:
                    crx_etags += [crx_etag]

            if tarentry_filename == "overview.html":
                name, version = get_name_and_version(tarfile.read().decode())

            if tarentry_filename == "overview.html.status":
                # A 2xx status code means the extension was still reachable
                # in the store at this date.
                still_in_store = tarfile.read().decode().startswith("2")

            if tarentry_filename.endswith(".crx") and tarentry.size > 0:
                with ZipFile(tarfile) as zf:
                    for zipentry in zf.infolist():
                        file_matches = []
                        if zipentry.filename.endswith((".js", ".html")):
                            with zf.open(zipentry) as f:
                                # Collect the verbatim source lines as well as
                                # the lines formed by joining adjacent string
                                # literals, so that strings assembled from
                                # fragments can still be matched.
                                verbatim_lines = []
                                joined_string_lines = []
                                for block in mince_js(
                                        io.TextIOWrapper(
                                            f,
                                            encoding="utf-8",
                                            errors="surrogateescape")):
                                    verbatim_lines += block.content.splitlines()
                                    joined_string_lines += "".join(
                                        map(lambda x: x[1],
                                            block.string_literals)).splitlines()

                                locations = [
                                    ("verbatim", verbatim_lines),
                                    ("joined_string", joined_string_lines)
                                ]

                                for search_tag, search_strings in \
                                        miner_strings.strings.items():
                                    for search_string in search_strings:
                                        file_matches.extend(
                                            first_match_in_locations(
                                                search_tag,
                                                re.escape(search_string),
                                                locations))

                                for search_tag, search_patterns in \
                                        miner_strings.patterns.items():
                                    for search_pattern in search_patterns:
                                        file_matches.extend(
                                            first_match_in_locations(
                                                search_tag, search_pattern,
                                                locations))

                                for match in file_matches:
                                    date_matches.append(
                                        [zipentry.filename] + match)

        for match in date_matches:
            results += [[date, crx_etag, name, version] + match]

    for result in results:
        csvwriter.writerow([
            str(x)
            for x in ([extid, still_in_store, crx_etags[-1]] + result)
        ])


def main(conf):
    logger = logging.getLogger()
    ch = logging.StreamHandler(sys.stderr)
    ch.setFormatter(logging.Formatter(const_log_format()))
    logger.addHandler(ch)
    if conf.verbose:
        logger.setLevel(logging.DEBUG)
    else:
        logger.setLevel(logging.WARNING)

    with open(conf.EXTID_FILE) as f:
        csvwriter = csv.writer(sys.stdout, csv.unix_dialect)
        csvwriter.writerow([
            "extid", "still_in_store", "most_recent_crx_etag", "date",
            "crx_etag", "name", "version", "path", "position", "tag", "match"
        ])
        for extid in [line.strip() for line in f]:
            handle_extid(conf, extid, csvwriter)


def build_parser():
    main_parser = argparse.ArgumentParser(
        formatter_class=argparse.RawTextHelpFormatter,
        description='Grep for extensions.')
    main_parser.add_argument(
        'REGEXP_FILE', help='python file with regular expressions')
    main_parser.add_argument('EXTID_FILE', help='file with extension ids')
    main_parser.add_argument(
        '-v',
        '--verbose',
        action='store_true',
        default=False,
        help='increase verbosity')
    main_parser.add_argument(
        '-D',
        '--latest-date',
        metavar='DATE',
        type=str,
        help='select latest crx from tar, released before DATE.\n' +
        'Together with --from-date, specifies all crx released in the\n' +
        'specified date range.')
    main_parser.add_argument(
        '-d',
        '--from-date',
        metavar='DATE',
        type=str,
        help='select oldest crx from tar, released after DATE.\n' +
        'Together with --latest-date, specifies all crx released in the\n' +
        'specified date range.')
    main_parser.add_argument(
        '-a',
        '--archive-dir',
        metavar='archive',
        type=str,
        default=const_basedir(),
        help='archive directory')

    return main_parser


if __name__ == "__main__":
    main_parser = build_parser()
    main_conf = main_parser.parse_args()
    sys.exit(main(main_conf))
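

# Usage sketch (not part of the original script; all file names below are
# hypothetical placeholders).
#
# REGEXP_FILE must define a MinerStrings class; handle_extid only assumes
# the two dict attributes shown here:
#
#     class MinerStrings:
#         def __init__(self):
#             # literal strings, matched after re.escape
#             self.strings = {"COINHIVE_CODE": ["coinhive.min.js"]}
#             # regular expressions, matched as-is; MINING_KEYS_REGEX hits
#             # are additionally filtered through is_likely_hash
#             self.patterns = {"MINING_KEYS_REGEX": ["[a-zA-Z0-9]{32}"]}
#
# Invocation, writing the CSV report to stdout:
#
#     ./grepper.py miner_strings.py extension_ids.txt \
#         -d 2018-01-01 -D 2018-06-30 > matches.csv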