ExtensionCrawler/crx-jsstrings

296 lines
9.8 KiB
Python
Executable File

#!/usr/bin/env python3.5
#
# Copyright (C) 2017 The University of Sheffield, UK
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
#
"""Tool for extracting crx file from a tar archive."""
import collections
import datetime
import getopt
import io
import os
import logging
import re
import sys
import tarfile
import zlib
from functools import partial
from multiprocessing import Pool
from zipfile import ZipFile
import dateutil
import dateutil.parser
import jsbeautifier
from ExtensionCrawler.config import (const_log_format, const_basedir)
from ExtensionCrawler.archive import get_existing_ids, last_crx
from ExtensionCrawler.config import (archive_file, get_local_archive_dir)
from ExtensionCrawler.js_decomposer import init_file_info
from ExtensionCrawler.js_mincer import mince_js
# Script should run with python 3.4 or 3.5: importing/running it on any other
# interpreter version aborts with an AssertionError. Note that the check is
# skipped when Python runs with -O, because assert statements are stripped.
assert sys.version_info >= (3, 4) and sys.version_info < (3, 6)
# Immutable bundle of the settings that are threaded through the analysis
# functions of this script; updated copies are made with ``_replace``.
JsStringsConfig = collections.namedtuple(
    'JsStringsConfig',
    (
        'comment',   # analyze comment blocks
        'strings',   # analyze the string literals of code blocks
        'group',     # handed to mince_js as single_line_comments_block
        'program',   # analyze program code blocks
        'beautify',  # run jsbeautifier on the source before analyzing it
        'basedir',   # base directory of the extension archive
        'regexp',    # regular expression selecting blocks (None: select all)
        'parallel',  # number of worker processes used for group processing
        "verbose",   # selects the INFO (True) or WARNING (False) log level
    ))
def jsstrings_data(path, data, config):
    """Decode one JavaScript file and run the block analysis on its content.

    Args:
        path: Name of the file; used in messages and passed to
            ``init_file_info``.
        data: Raw content of the file as bytes.
        config: ``JsStringsConfig`` providing ``beautify``, ``group``,
            ``comment``, ``program``, ``strings`` and ``regexp``.

    Returns:
        True if ``analyze_block`` reported a match for at least one block.
        False otherwise, which includes empty files and compressed files
        that cannot be decompressed or decoded.

    Note:
        Decoding errors of *uncompressed* data are not caught here and
        propagate to the caller.
    """
    match = False
    print("## Analyzing " + path)
    file_info = init_file_info(path, data)
    if file_info['size'] == 0:
        return match

    if file_info['dec_encoding'] is not None:
        # A 'dec_encoding' is present: the payload is treated as compressed
        # and 'dec_encoding' is the encoding of the decompressed text.
        try:
            # MAX_WBITS | 16 selects the gzip container format.
            dec = zlib.decompressobj(zlib.MAX_WBITS | 16)
            # Cap the output at 100 times the reported size; presumably a
            # guard against decompression bombs.
            dec_data = dec.decompress(data, 100 * file_info['size'])
            str_data = dec_data.decode(file_info['dec_encoding'])
            del dec_data
        except Exception as exc:
            # Best effort: a broken file must not abort the whole run, but
            # it should not vanish silently either.
            logging.warning("Skipping %s: cannot decompress/decode (%s)",
                            path, exc)
            return match
    else:
        if file_info['encoding'] is None:
            logging.warning("Encoding is None for " + path + " using utf-8.")
            str_data = data.decode('UTF-8')
        else:
            str_data = data.decode(file_info['encoding'])

    if config.beautify:
        str_data = jsbeautifier.beautify(str_data)

    with io.StringIO(str_data) as str_obj:
        for block in mince_js(
                str_obj, single_line_comments_block=config.group):
            # Matching blocks are always printed here (verbose=True),
            # independently of config.verbose.
            if analyze_block(True, config.comment, config.program,
                             config.strings, config.regexp, block):
                match = True
    return match
def helpmsg():
    """Print the usage summary of the command-line options to stdout."""
    usage_lines = (
        "crx-jsstrings [OPTION] [crx-file|tar-file|ext_id] [js-file]",
        " -h print this help text",
        " -i ignore comments",
        " -s strings",
        " -g group single line comments",
        " -c program code",
        " -b beautify JavaScript files before analyzing them",
        # Short options take their value as a separate word; with "-a=<DIR>"
        # getopt would hand "=<DIR>" to the program.
        " -a <DIR> archive directory",
        " -n <TASKID> process chunk n where n in [1,N]",
        " -N <MAXTASKID> total number of chunks N",
        " -r regexp select only comments/code/strings where regexp matches",
        " -d date use latest extension that was released not later than date (only for tar archives)",
    )
    for line in usage_lines:
        print(line)
def analyze_block(verbose, comment, program, strings, regexp, block):
    """Check one minced block against the selection and print what matches.

    Args:
        verbose: Print the matching block / string literal when true.
        comment: Consider comment blocks.
        program: Consider the content of code blocks.
        strings: Consider the string literals of code blocks.
        regexp: Pattern applied with ``re.match`` (anchored at the start of
            the text); ``None`` selects everything.
        block: Object offering ``is_comment()``, ``is_code()``, ``content``
            and ``string_literals``.

    Returns:
        True if the block (or one of its string literals) was selected,
        False otherwise.
    """
    pattern = re.compile(regexp) if regexp is not None else None

    # Comment blocks: only looked at when comments are enabled.
    if comment and block.is_comment():
        if pattern is not None and not pattern.match(block.content):
            return False
        if verbose:
            print(block)
        return True

    if not block.is_code():
        return False

    found = False
    # Whole code block.
    if program and (pattern is None or pattern.match(block.content)):
        if verbose:
            print(block)
        found = True
    # Individual string literals of the code block.
    if strings:
        for literal in block.string_literals:
            if pattern is not None and not pattern.match(literal):
                continue
            if verbose:
                print(literal)
            found = True
    return found
def analyze_crx(config, crx, path):
    """Analyze the JavaScript stored in a crx (zip) archive.

    Args:
        config: ``JsStringsConfig`` forwarded to ``jsstrings_data``.
        crx: File name or file object accepted by ``ZipFile``.
        path: Name of a single archive member to analyze, or ``None`` to
            analyze every member whose name ends with one of the known
            JavaScript (and ``.css.gz``) suffixes.

    Returns:
        The result of ``jsstrings_data`` for the single member, or True if
        ``jsstrings_data`` matched for at least one of the scanned members.
    """
    # A specific member was requested: analyze just that one.
    if path is not None:
        with ZipFile(crx) as archive:
            with archive.open(path) as member:
                content = member.read()
            return jsstrings_data(path, content, config)

    suffixes = (".js", ".js.gz", ".jgz", ".jsg", ".css.gz")
    found = False
    with ZipFile(crx) as archive:
        candidates = [
            info for info in archive.infolist()
            if info.filename.endswith(suffixes)
        ]
        for info in candidates:
            with archive.open(info) as member:
                content = member.read()
                member_name = member.name
            if jsstrings_data(member_name, content, config):
                found = True
    return found
def analyze_tar(config, date, path, filename):
    """Analyze the most recent crx file stored in an extension tar archive.

    Args:
        config: ``JsStringsConfig``; ``basedir`` locates the archive data
            directory and the whole object is forwarded to ``analyze_crx``.
        date: Optional date string; when given, ``last_crx`` is asked for
            the latest crx not newer than this date. A date without time
            zone information is interpreted as UTC.
        path: Optional name of a single file inside the crx to analyze
            (``None``: all JavaScript files).
        filename: Path of the tar archive; its base name without extension
            is used as the extension id.

    Returns:
        True if the analysis of the selected crx produced a match, False
        otherwise (including the case that no crx was found).
    """
    match = False
    extid = os.path.splitext(os.path.basename(filename))[0]
    archive_dir = os.path.join(config.basedir, "data")

    if date is not None:
        dateobj = dateutil.parser.parse(date)
        # Make naive dates timezone-aware (UTC) before handing them on.
        if dateobj.tzinfo is None or dateobj.tzinfo.utcoffset(dateobj) is None:
            dateobj = dateobj.replace(tzinfo=datetime.timezone.utc)
        last_crx_file = last_crx(archive_dir, extid, dateobj)
    else:
        last_crx_file = last_crx(archive_dir, extid)

    if last_crx_file == "" or last_crx_file is None:
        print("No crx in " + extid)
        return match

    print("# Start analyzing " + extid)
    with tarfile.open(filename, 'r') as archive:
        with archive.extractfile(last_crx_file) as crx:
            match = analyze_crx(config, crx, path)
    if match:
        print("RegExp found in " + extid)
    else:
        print("RegExp not found in " + extid)
    # Hand the result back, like analyze_crx and jsstrings_data do, so that
    # callers can tell which extensions matched.
    return match
def process_group(config, taskid, maxtaskid, date, path):
    """Analyze one chunk of all archived extensions in a process pool.

    The ids returned by ``get_existing_ids`` are cut into ``maxtaskid``
    chunks of equal size; chunk number ``taskid`` (counted from 1) is
    processed, and the last chunk additionally takes the remainder.

    Args:
        config: ``JsStringsConfig``; ``basedir`` locates the archive and
            ``parallel`` is the size of the process pool.
        taskid: Number of the chunk to process.
        maxtaskid: Total number of chunks.
        date: Forwarded to ``analyze_tar``.
        path: Forwarded to ``analyze_tar``.
    """
    archive_dir = os.path.join(config.basedir, "data")
    all_ids = get_existing_ids(archive_dir)
    chunksize = int(len(all_ids) / maxtaskid)

    first = (taskid - 1) * chunksize
    # An open upper bound lets the last task pick up the remainder.
    last = None if taskid == maxtaskid else taskid * chunksize

    tar_files = [
        archive_file(archive_dir, ext_id) for ext_id in all_ids[first:last]
    ]
    worker = partial(analyze_tar, config, date, path)
    with Pool(config.parallel) as pool:
        pool.map(worker, tar_files)
def main(argv):
    """Main function: JavaScript strings on steroids.

    Parses the command line, configures logging and dispatches to the
    matching analysis: a chunk of the whole archive (``-n``/``-N``), a crx
    file, a tar archive, an extension id, or a plain JavaScript file.

    Args:
        argv: Command-line arguments without the program name.

    Exits with status 2 on an unknown option; prints the help text and
    exits with status 0 for ``-h`` or when neither a file argument nor a
    valid task id pair is given.
    """
    config = JsStringsConfig(
        comment=True,
        strings=False,
        group=False,
        program=False,
        beautify=False,
        basedir=const_basedir(),
        regexp=None,
        parallel=1,
        verbose=True)

    filename = None
    path = None
    date = None
    taskid = -1
    maxtaskid = -1

    # Extension ids consist of the letters a-p only.
    extid_re = re.compile('^[a-p]+$')

    try:
        # "g" has to be part of the short option spec, otherwise getopt
        # rejects the documented -g flag. Long option names are listed
        # without the leading "--"; a trailing "=" marks options that
        # take an argument.
        opts, args = getopt.getopt(argv, "hibcgd:sn:N:a:vr:", [
            "regexp=", "date=", "archive=", "beautify", "taskid=",
            "maxtaskid="
        ])
    except getopt.GetoptError:
        helpmsg()
        sys.exit(2)

    for opt, arg in opts:
        if opt == '-h':
            helpmsg()
            sys.exit()
        elif opt in ("-a", "--archive"):
            config = config._replace(basedir=arg)
        elif opt == '-i':
            config = config._replace(comment=False)
        elif opt == '-s':
            config = config._replace(strings=True)
        elif opt == '-g':
            config = config._replace(group=True)
        elif opt == '-c':
            config = config._replace(program=True)
        elif opt in ('-b', "--beautify"):
            config = config._replace(beautify=True)
        elif opt in ('-r', "--regexp"):
            config = config._replace(regexp=arg)
        elif opt in ('-d', "--date"):
            date = arg
        elif opt in ("-n", "--taskid"):
            taskid = int(arg)
        elif opt in ("-N", "--maxtaskid"):
            maxtaskid = int(arg)
        elif opt == '-v':
            # Accepted by the option spec; verbose is already the default.
            config = config._replace(verbose=True)

    if len(args) == 1:
        filename = args[0]
    elif len(args) == 2:
        filename = args[0]
        path = args[1]
    elif (not len(args) == 0) or taskid < 1 or maxtaskid < 1:
        # Too many arguments, or no file and no valid task id pair.
        helpmsg()
        sys.exit()

    if config.verbose:
        loglevel = logging.INFO
    else:
        loglevel = logging.WARNING

    # Log to stdout so that log lines and printed results share one stream.
    logger = logging.getLogger()
    ch = logging.StreamHandler(sys.stdout)
    ch.setFormatter(logging.Formatter(const_log_format()))
    logger.addHandler(ch)
    logger.setLevel(loglevel)

    if taskid > 0 and maxtaskid > 0:
        process_group(config, taskid, maxtaskid, date, path)
    else:
        if filename.endswith('.crx'):
            analyze_crx(config, filename, path)
        elif filename.endswith('.tar'):
            analyze_tar(config, date, path, filename)
        elif extid_re.match(filename):
            # An extension id: locate its tar file inside the archive.
            extid = filename
            filename = os.path.join(config.basedir, 'data',
                                    get_local_archive_dir(extid),
                                    extid + ".tar")
            analyze_tar(config, date, path, filename)
        else:
            # Anything else is treated as a plain JavaScript file.
            with open(filename, 'rb') as fileobj:
                data = fileobj.read()
            jsstrings_data(filename, data, config)
# Script entry point: run main() only when executed directly, not on import.
if __name__ == "__main__":
    # Drop the program name; main() expects the bare argument list.
    cli_args = sys.argv[1:]
    main(cli_args)