ExtensionCrawler/extgrep

234 lines
8.2 KiB
Python
Executable File

#!/usr/bin/env python3.7
#
# Copyright (C) 2019 The University of Sheffield, UK
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
#
# SPDX-License-Identifier: GPL-3.0-or-later
import argparse
import io
import logging
import re
import json
import sys
import importlib.util
import csv
import math
import ast
from zipfile import ZipFile
from ExtensionCrawler.config import (const_log_format, const_basedir)
from ExtensionCrawler.archive import iter_tar_entries_by_date
from ExtensionCrawler.js_mincer import mince_js
def get_shannon_entropy(string):
"""
This code has been borrowed from
"http://blog.dkbza.org/2007/05/scanning-data-for-entropy-anomalies.html" and
"git@github.com:dxa4481/truffleHog.git"
"""
chars = "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789"
if not string:
return 0
entropy = 0
for x in chars:
p_x = float(string.count(x))/len(string)
if p_x > 0:
entropy += - p_x*math.log(p_x, 2)
return entropy
def is_likely_hash(string):
return get_shannon_entropy(string) > 2.0 and len([c for c in string if c.isdigit()]) > 4
def import_regexs(path):
spec = importlib.util.spec_from_file_location("MinerStrings", path)
module = importlib.util.module_from_spec(spec)
spec.loader.exec_module(module)
return module
def get_etag(headers_content):
d = ast.literal_eval(headers_content)
if "ETag" in d:
return d["ETag"]
def get_name_and_version(overview_contents):
# Extract extension name
match = re.search("""<meta itemprop="name" content="(.*?)"\s*/>""",
overview_contents)
name = match.group(1) if match else None
# Extract extension version
match = re.search(
"""<meta itemprop="version" content="(.*?)"\s*/>""", overview_contents)
version = match.group(1) if match else None
return name, version
def first_match_in_locations(search_tag, pattern, locations):
for location_tag, lines in locations:
for line in lines:
m = re.search(pattern, line)
if m:
matched_string = m.group()
if search_tag is not "MINING_KEYS_REGEX" or is_likely_hash(matched_string):
return [[location_tag, search_tag, matched_string]]
return []
def handle_extid(conf, extid, csvwriter):
miner_strings = import_regexs(conf.REGEXP_FILE).MinerStrings()
results = []
still_in_store = None
crx_etags = [None]
for date, tups in iter_tar_entries_by_date(conf.archive_dir, extid):
if conf.from_date and not (conf.from_date <= date):
continue
if conf.latest_date and not (date <= conf.latest_date):
continue
crx_etag = None
name = None
version = None
date_matches = []
for tarentry, tarfile in tups:
tarentry_filename = tarentry.name.split("/")[-1]
if tarentry_filename.endswith(".crx.headers"):
crx_etag = get_etag(tarfile.read().decode())
if crx_etag:
crx_etags += [crx_etag]
if tarentry_filename == "overview.html":
name, version = get_name_and_version(tarfile.read().decode())
if tarentry_filename == "overview.html.status":
still_in_store = tarfile.read().decode().startswith("2")
if tarentry_filename.endswith(".crx") and tarentry.size > 0:
with ZipFile(tarfile) as zf:
for zipentry in zf.infolist():
file_matches = []
if zipentry.filename.endswith(".js") or zipentry.filename.endswith(".html"):
with zf.open(zipentry) as f:
verbatim_lines = []
joined_string_lines = []
for block in mince_js(io.TextIOWrapper(f, encoding="utf-8", errors="surrogateescape")):
verbatim_lines += block.content.splitlines()
joined_string_lines += "".join(map(lambda x: x[1], block.string_literals)).splitlines()
for search_tag in miner_strings.strings.keys():
for search_string in miner_strings.strings[search_tag]:
for match in first_match_in_locations(search_tag, re.escape(search_string),
[("verbatim", verbatim_lines),
("joined_string", joined_string_lines)]):
file_matches.append(match)
for search_tag in miner_strings.patterns.keys():
for search_pattern in miner_strings.patterns[search_tag]:
for match in first_match_in_locations(search_tag, search_pattern,
[("verbatim", verbatim_lines),
("joined_string", joined_string_lines)]):
file_matches.append(match)
for match in file_matches:
date_matches.append([zipentry.filename] + match)
for match in date_matches:
results += [[date, crx_etag, name, version] + match]
for result in results:
csvwriter.writerow([str(x) for x in ([extid, still_in_store, crx_etags[-1]] + result)])
def main(conf):
logger = logging.getLogger()
ch = logging.StreamHandler(sys.stderr)
ch.setFormatter(logging.Formatter(const_log_format()))
logger.addHandler(ch)
if conf.verbose:
logger.setLevel(logging.DEBUG)
else:
logger.setLevel(logging.WARNING)
with open(conf.EXTID_FILE) as f:
csvwriter = csv.writer(sys.stdout, csv.unix_dialect)
csvwriter.writerow(["extid", "still_in_store", "most_recent_crx_etag", "date", "crx_etag", "name", "version", "path", "position", "tag", "match"])
for extid in [l.strip() for l in f.readlines()]:
handle_extid(conf, extid, csvwriter)
def build_parser():
main_parser = argparse.ArgumentParser(
formatter_class=argparse.RawTextHelpFormatter,
description='Grep for extensions.')
main_parser.add_argument(
'REGEXP_FILE',
help='python file with regular expressions')
main_parser.add_argument(
'EXTID_FILE',
help='file with extension ids')
main_parser.add_argument(
'-v',
'--verbose',
action='store_true',
default=False,
help='increase verbosity')
main_parser.add_argument(
'-D',
'--latest-date',
metavar='DATE',
type=str,
help='select latest crx from tar, released before DATE.\n' +
'Together with --from-date, specifies all crx released in specified\n' +
'date range.')
main_parser.add_argument(
'-d',
'--from-date',
metavar='DATE',
type=str,
help='select oldest crx from tar released after DATE.\n' +
'Together with --latest-date, specifies all crx released in specified\n' +
'date range.')
main_parser.add_argument(
'-a',
'--archive-dir',
metavar='archive',
type=str,
default=const_basedir(),
help='archive directory')
return main_parser
if __name__ == "__main__":
main_parser = build_parser()
main_conf = main_parser.parse_args()
sys.exit(main(main_conf))