#!/usr/bin/env python3.7
#
# Copyright (C) 2017 The University of Sheffield, UK
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program.  If not, see https://www.gnu.org/licenses/.
#
# SPDX-License-Identifier: GPL-3.0-or-later
"""A combination of strings and grep for JavaScript and CSS files.

The tool searches comments, string literals and source code for regular
expressions.  Inputs may be plain source files, crx files, tar archives
of crx files, or extension ids that are resolved inside the archive
directory.
"""

import datetime
import argparse
import io
import fnmatch
import os
import logging
import re
import sys
import operator
import tarfile
import zlib
from functools import partial, reduce
from colorama import init, Fore
from multiprocessing import Pool
from zipfile import ZipFile

import dateutil
import dateutil.parser
import jsbeautifier

from ExtensionCrawler.config import (const_log_format, const_basedir)
from ExtensionCrawler.archive import last_crx, first_crx, all_crx
from ExtensionCrawler.config import get_local_archive_dir
from ExtensionCrawler.js_decomposer import init_file_info
from ExtensionCrawler.js_mincer import mince_js

# File name suffixes that are treated as containing C-style comments.
# NOTE(review): ".css.gz" is listed but plain ".css" is not -- confirm
# whether that is intentional.
_C_STYLE_COMMENT_SUFFIXES = (".js", ".js.gz", ".jgz", ".jsg", ".css.gz",
                             ".c", ".cpp", ".java")

# Tasks consisting only of the letters a-p are treated as extension ids.
_EXTID_RE = re.compile(r'^[a-p]+$')


def is_file_with_c_style_comments(filename):
    """Test if filename indicates file with C-style comment."""
    return filename.endswith(_C_STYLE_COMMENT_SUFFIXES)


def jsstrings_data(conf, path, data):
    """Analyze data in memory.

    Args:
        conf: parsed command line options.
        path: name used for reporting and for matching ``conf.file_pattern``.
        data: raw bytes of the file.

    Returns:
        True if at least one block of the file matched, False otherwise
        (including when the file is filtered out, empty, or cannot be
        decompressed/decoded).
    """
    if conf.file_pattern is not None:
        if path is None:
            return False
        if not fnmatch.fnmatch(path, conf.file_pattern):
            logging.debug("Filename '%s' does not match pattern '%s'", path,
                          conf.file_pattern)
            return False

    # Lazy %-formatting: does not fail when path is None.
    logging.debug("Start analyzing %s", path)
    file_info = init_file_info(path, data)
    if file_info['size'] == 0:
        return False

    if file_info['dec_encoding'] is not None:
        # A 'dec_encoding' is present: gunzip the data first.
        try:
            # MAX_WBITS | 16 selects the gzip container; the output is
            # capped at 100 times the size reported by init_file_info.
            dec = zlib.decompressobj(zlib.MAX_WBITS | 16)
            dec_data = dec.decompress(data, 100 * file_info['size'])
            str_data = dec_data.decode(file_info['dec_encoding'])
            del dec_data
        except Exception as exc:
            # Best effort: unreadable files are skipped, but say so when
            # running verbosely instead of swallowing the error silently.
            logging.debug("Skipping %s, cannot decompress/decode: %s", path,
                          exc)
            return False
    elif file_info['encoding'] is None:
        logging.warning("Encoding is None for %s using utf-8.", path)
        str_data = data.decode('UTF-8')
    else:
        str_data = data.decode(file_info['encoding'])

    if conf.beautify:
        str_data = jsbeautifier.beautify(str_data)

    match = False
    with io.StringIO(str_data) as str_obj:
        first = True
        for block in mince_js(
                str_obj,
                single_line_comments_block=conf.group_single_line_comments):
            if analyze_block(conf, path, block, first):
                match = True
                # Only the first *matching* block is printed without a
                # leading separator.
                first = False

    # Decoration level 1: report only the name of the matching file.
    if match and 0 < conf.output_decoration < 2:
        print(path)
    return match


def print_block(conf,
                path,
                block,
                first=False,
                string_match=False,
                code_match=False):
    """Print a matching block (only for output decoration levels > 1).

    Args:
        conf: parsed command line options.
        path: file name printed as prefix of every output line.
        block: the code or comment block to print.
        first: if True, no separator line is printed before the block.
        string_match: True if string literals of a code block matched.
        code_match: True if the source of a code block matched.
    """
    if conf.output_decoration <= 1:
        return

    rule_width = len(path) + 17
    if not first:
        print("=" * rule_width)

    # "S" marks source lines, "C" marks comment lines.
    classifier = "S" if block.is_code() else "C"

    if block.is_code() and string_match:
        if conf.join_string_literals:
            loc = '({0:d} - {1:d})'.format(block.start[0], block.end[0])
            print('{0} {1} [L]: {2}'.format(
                path, loc.rjust(11), block.string_literals[0].rstrip()))
        else:
            for (pos, string) in block.string_literals:
                loc = '({0[0]:d}/{0[1]:d})'.format(pos)
                print('{0} {1} [L]: {2}'.format(path, loc.rjust(11),
                                                string.rstrip()))
        if code_match:
            # Separate the literal matches from the source lines.
            print("-" * rule_width)

    if block.is_comment() or (block.is_code() and code_match):
        first_line_no = block.start[0]
        # Indent the first line so that it starts at its original column.
        prefix = " " * (block.start[1] - 1)
        lines = block.content.splitlines(True)
        for line_no, line in enumerate(lines, start=first_line_no):
            if line_no == first_line_no:
                line = prefix + line
            print('{0} {1:11d} [{2}]: {3}'.format(path, line_no, classifier,
                                                  line.rstrip()))


def _compile_regexps(patterns, case_insensitive):
    """Compile each pattern wrapped in one group (needed for colorizing)."""
    if patterns is None:
        return []
    flags = re.IGNORECASE if case_insensitive else 0
    return [re.compile(r'(' + pattern + ')', flags) for pattern in patterns]


def analyze_block(conf, path, block, first=False):
    """Print code/comment blocks.

    Comment blocks are searched with ``conf.reg_exp`` plus
    ``conf.reg_exp_comments``.  For code blocks, the string literals are
    searched with ``conf.reg_exp`` plus ``conf.reg_exp_string_literals``
    and the source with ``conf.reg_exp`` plus ``conf.reg_exp_source``.
    A matching block is passed to print_block; ``block.content`` and
    ``block.string_literals`` are overwritten with the (optionally
    colorized) matches.

    Returns:
        True if the block matched any applicable regular expression.
    """
    match = False
    regexps = _compile_regexps(conf.reg_exp, conf.case_insensitive)

    if block.is_comment():
        content = block.content
        regexps_comments = regexps + _compile_regexps(conf.reg_exp_comments,
                                                      conf.case_insensitive)
        for regexp in regexps_comments:
            # Always search the original text, never the colorized one.
            if regexp.search(block.content):
                if conf.colorize:
                    content = regexp.sub(Fore.RED + r'\1' + Fore.RESET,
                                         content)
                match = True
        if match:
            block.content = content
            print_block(conf, path, block, first)
    elif block.is_code():
        content = block.content
        # The specific patterns have to extend the specific lists;
        # appending them to the shared list after it was copied silently
        # disabled --reg-exp-string-literals and --reg-exp-source.
        regexps_string = regexps + _compile_regexps(
            conf.reg_exp_string_literals, conf.case_insensitive)
        regexps_code = regexps + _compile_regexps(conf.reg_exp_source,
                                                  conf.case_insensitive)

        string_match = False
        if conf.join_string_literals:
            joined_string = "".join(
                string for (_, string) in block.string_literals)
            joined_cstring = joined_string
            for regexp in regexps_string:
                if regexp.search(joined_string):
                    if conf.colorize:
                        joined_cstring = regexp.sub(
                            Fore.BLUE + r'\1' + Fore.RESET, joined_cstring)
                    string_match = True
            block.string_literals = [joined_cstring]
        else:
            match_idxs = set()
            string_literals = block.string_literals.copy()
            for regexp in regexps_string:
                for idx, (_, string) in enumerate(block.string_literals):
                    if regexp.search(string):
                        if conf.colorize:
                            pos, colored = string_literals[idx]
                            string_literals[idx] = (pos, regexp.sub(
                                Fore.BLUE + r'\1' + Fore.RESET, colored))
                        match_idxs.add(idx)
                        string_match = True
            # Keep only the literals that matched.
            block.string_literals = [
                literal for idx, literal in enumerate(string_literals)
                if idx in match_idxs
            ]

        code_match = False
        for regexp in regexps_code:
            if regexp.search(block.content):
                if conf.colorize:
                    content = regexp.sub(Fore.CYAN + r'\1' + Fore.RESET,
                                         content)
                code_match = True

        match = string_match or code_match
        block.content = content
        if match:
            print_block(conf, path, block, first, string_match, code_match)
    return match


def analyze_crx(conf, crx, path=""):
    """Analyze crx file.

    Args:
        conf: parsed command line options.
        crx: file name or file object that is handed to ZipFile.
        path: prefix for the reported names of the contained files.

    Returns:
        True if any contained file with C-style comments matched.
    """
    match = False
    with ZipFile(crx) as crxobj:
        for info in crxobj.infolist():
            if not is_file_with_c_style_comments(info.filename):
                continue
            with crxobj.open(info) as js_file_obj:
                data = js_file_obj.read()
                file_path = path + "/" + js_file_obj.name
            if jsstrings_data(conf, file_path, data):
                match = True
    return match


def _parse_date_utc(date_str):
    """Parse date_str; a result without UTC offset is interpreted as UTC."""
    dateobj = dateutil.parser.parse(date_str)
    if dateobj.tzinfo is None or dateobj.tzinfo.utcoffset(dateobj) is None:
        dateobj = dateobj.replace(tzinfo=datetime.timezone.utc)
    return dateobj


def analyze_tar(conf, tarfilename):
    """Analyze the crx file(s) of a tar archive selected by the date options.

    * no ``conf.from_date``: the crx returned by last_crx (which also
      receives ``conf.latest_date``, if given),
    * only ``conf.from_date``: the crx returned by first_crx,
    * both dates: every crx returned by all_crx.

    Returns:
        True if any analyzed crx matched.
    """
    extid = os.path.splitext(os.path.basename(tarfilename))[0]
    data_dir = os.path.join(conf.archive_dir, "data")

    from_dateobj = None
    latest_dateobj = None
    if conf.from_date is not None:
        from_dateobj = _parse_date_utc(conf.from_date)
    if conf.latest_date is not None:
        latest_dateobj = _parse_date_utc(conf.latest_date)

    if from_dateobj is None:
        last_crx_file, _ = last_crx(data_dir, extid, latest_dateobj)
        crx_files = [last_crx_file] if last_crx_file else []
    elif latest_dateobj is None:
        # only from date is given
        first_crx_file = first_crx(data_dir, extid, from_dateobj)
        crx_files = [first_crx_file] if first_crx_file else []
    else:
        # both dates are given
        # NOTE(review): neither date is passed to all_crx, so the range
        # does not appear to be applied here -- confirm against
        # ExtensionCrawler.archive.all_crx.
        crx_files = all_crx(data_dir, extid)

    if not crx_files:
        logging.warning("No crx in " + extid)
        return False

    match = False
    with tarfile.open(tarfilename, 'r') as archive:
        for crx_file in crx_files:
            with archive.extractfile(crx_file) as crx:
                # Report matches under the crx actually analyzed; the
                # date branches used to pass an always-empty name.
                match = analyze_crx(conf, crx, crx_file) or match
    return match


def analyze_file(conf, filename):
    """Analyze a single source file given by its file name."""
    with open(filename, 'rb') as fileobj:
        data = fileobj.read()
    return jsstrings_data(conf, filename, data)


def compute_tasks(file_or_extids, taskid=1, maxtaskid=1):
    """Function for computing list of tasks.

    Source files, tar files, crx files and extension ids become one task
    each.  Any other argument is read as a text file from which every
    line that is an extension id becomes a task.

    Returns:
        The slice of all tasks that belongs to task ``taskid`` out of
        ``maxtaskid``; the last task also takes the remainder.
    """
    tasks = []
    for file_or_extid in file_or_extids:
        if (is_file_with_c_style_comments(file_or_extid)
                or file_or_extid.endswith(('.tar', '.crx'))
                or _EXTID_RE.match(file_or_extid)):
            tasks.append(file_or_extid)
        else:
            # default: a file with extension ids
            with open(file_or_extid) as fileobj:
                for line in fileobj:
                    line = line.strip()
                    if _EXTID_RE.match(line):
                        tasks.append(line)

    chunksize = len(tasks) // maxtaskid
    begin = (taskid - 1) * chunksize
    if taskid == maxtaskid:
        return tasks[begin:]
    return tasks[begin:taskid * chunksize]


def analyze_task(conf, task):
    """Analyze one file/tar/crx/extid."""
    logging.debug("Analyzing " + task)
    if task.endswith('.crx'):
        retval = analyze_crx(conf, task)
    elif task.endswith('.tar'):
        retval = analyze_tar(conf, task)
    elif _EXTID_RE.match(task):
        tarfilename = "data/" + get_local_archive_dir(
            task) + "/" + task + '.tar'
        retval = analyze_tar(conf, conf.archive_dir + "/" + tarfilename)
    else:
        retval = analyze_file(conf, task)
    return retval


def main(conf):
    """Main function: JavaScript strings on steroids.

    Returns:
        False if any task matched and True otherwise, so that passing
        the result to sys.exit yields exit status 0 on a match.
    """
    logger = logging.getLogger()
    ch = logging.StreamHandler(sys.stdout)
    ch.setFormatter(logging.Formatter(const_log_format()))
    logger.addHandler(ch)
    if conf.verbose:
        logger.setLevel(logging.DEBUG)
    else:
        logger.setLevel(logging.WARNING)

    if conf.colorize:
        init()

    tasks = compute_tasks(conf.FILE_OR_EXTID, conf.taskid, conf.max_taskid)
    with Pool(conf.parallel) as p:
        retvals = p.map(partial(analyze_task, conf), tasks)
    return not reduce(operator.or_, retvals, False)


if __name__ == "__main__":
    main_parser = argparse.ArgumentParser(
        formatter_class=argparse.RawTextHelpFormatter,
        description=
        'A combination of strings and grep for JavaScript and CSS files.')
    main_parser.add_argument(
        '-r',
        '--reg-exp',
        metavar='REGEXP',
        type=str,
        nargs='+',
        help='search for regular expression')
    main_parser.add_argument(
        '-v',
        '--verbose',
        action='store_true',
        default=False,
        help='increase verbosity')
    main_parser.add_argument(
        '-i',
        '--case-insensitive',
        action='store_true',
        default=False,
        help='match case insensitive')
    main_parser.add_argument(
        '-o',
        '--output-decoration',
        metavar='L',
        choices=[0, 1, 2, 3],
        type=int,
        default=2,
        help='select level L of details shown for matches:\n'
        '  0: exit 0 in case of match found, 1 otherwise\n'
        '  1: show only matching filename\n'
        '  2: show matching lines (NOT YET SUPPORTED)\n'
        '  3: show matching blocks')
    main_parser.add_argument(
        '-p',
        '--parallel',
        metavar='P',
        type=int,
        default=1,
        help='run P processes in parallel')
    main_parser.add_argument(
        '-D',
        '--latest-date',
        metavar='DATE',
        type=str,
        help='select latest crx from tar, released before DATE.\n'
        'Together with --from-date, specifies all crx released in specified\n'
        'date range.')
    main_parser.add_argument(
        '-d',
        '--from-date',
        metavar='DATE',
        type=str,
        help='select oldest crx from tar released after DATE.\n'
        'Together with --latest-date, specifies all crx released in '
        'specified\n'
        'date range.')
    main_parser.add_argument(
        '-f',
        '--file-pattern',
        metavar='pattern',
        type=str,
        help='process only files matching pattern')
    main_parser.add_argument(
        '-a',
        '--archive-dir',
        metavar='archive',
        type=str,
        default=const_basedir(),
        help='archive directory')
    main_parser.add_argument(
        '-C', '--colorize', action='store_true', help='use colors')
    main_parser.add_argument(
        '-n', '--taskid', metavar='n', type=int, default=1, help='task id')
    main_parser.add_argument(
        '-N',
        '--max-taskid',
        metavar='N',
        type=int,
        default=1,
        help='max task id')
    main_parser.add_argument(
        'FILE_OR_EXTID', nargs='+', help="extid/js/css/crx/tar file")

    comment_group = main_parser.add_argument_group('comment blocks')
    # nargs='?' lets the option work as a plain flag (value True) while
    # invocations that still pass an explicit value keep working.
    comment_group.add_argument(
        '-g',
        '--group-single-line-comments',
        nargs='?',
        const=True,
        help='Group consecutive single-line comments into blocks')
    comment_group.add_argument(
        '-c',
        '--reg-exp-comments',
        metavar='REGEXP',
        type=str,
        nargs='+',
        help='search comments for regular expression')

    source_group = main_parser.add_argument_group('source blocks')
    source_group.add_argument(
        '-b',
        '--beautify',
        action='store_true',
        default=False,
        help='beautify source code')
    source_group.add_argument(
        '-s',
        '--reg-exp-source',
        metavar='REGEXP',
        type=str,
        nargs='+',
        help='search source for regular expression')

    strings_group = main_parser.add_argument_group('string literals')
    strings_group.add_argument(
        '-j',
        '--join-string-literals',
        action='store_true',
        help='join string literals (heuristic)')
    strings_group.add_argument(
        '-l',
        '--reg-exp-string-literals',
        metavar='REGEXP',
        type=str,
        nargs='+',
        help='search string literals for regular expression')

    main_conf = main_parser.parse_args()
    sys.exit(main(main_conf))