Improved grepper.

This commit is contained in:
Michael Herzberg 2017-08-23 16:52:18 +01:00
parent fddd2374c4
commit a6ee53d35b
4 changed files with 144 additions and 161 deletions

182
grepper
View File

@ -19,84 +19,116 @@
import getopt
import os
import sys
import glob
import tarfile
import traceback
import jsbeautifier
from multiprocessing import Pool
import fnmatch
from multiprocessing import Pool, Lock
from zipfile import ZipFile
from functools import partial
import re
from ExtensionCrawler.config import *
from ExtensionCrawler.util import *
def help():
    """Print command-line usage for the grepper tool.

    Fix: the body contained both the old and the new usage text (diff
    residue); only the current interface is printed now.
    """
    print("grepper [OPTION] GREP [FILE]")
    print(" GREP regex pattern")
    print(" [FILE] path(s) to extension tar")
    print(" -h print this help text")
    print(" -b beautify JavaScript before matching")
    print(" -a <DIR> archive directory")
    print(" -p <PREFIX> three-letter-prefix")
    print(" -e <EXTIDFILELIST> file with extension ids")
    print(" -t <THREADS> number of threads to use")
    print(" -n <TASKID> process chunk n where n in [1,N]")
    print(" -N <MAXTASKID> ")
def process_crx(ext_id, date, crx, greps, beautify, out_f):
with ZipFile(crx) as z:
for zip_file_info in z.infolist():
if not zip_file_info.filename.endswith(".js"):
continue
with z.open(zip_file_info) as f:
content = f.read().decode(errors="surrogateescape")
if beautify:
content = jsbeautifier.beautify(content)
for i, line in enumerate(content.splitlines()):
for gr in greps:
if not re.search(gr, line):
def guarded_stdout(string):
    """Write *string* to stdout while holding the shared inter-process lock.

    The lock is a multiprocessing.Lock installed into this worker by init();
    it keeps output lines from different worker processes from interleaving.
    """
    # Fix: use 'with' so the lock is released even if the write raises,
    # unlike the previous bare acquire()/release() pair (deadlock risk).
    with lock:
        sys.stdout.write(string)
def guarded_stderr(string):
    """Write *string* to stderr while holding the shared inter-process lock.

    Mirrors guarded_stdout; keeps error reports from different worker
    processes from interleaving.
    """
    # Fix: use 'with' so the lock is released even if the write raises,
    # unlike the previous bare acquire()/release() pair (deadlock risk).
    with lock:
        sys.stderr.write(string)
def process_crx(ext_id, date, crx, pattern, beautify):
    """Grep every JavaScript file inside one crx (zip) archive.

    Args:
        ext_id: extension id the crx belongs to (used only for output).
        date: crawl-date string (used only for output).
        crx: file-like object containing the crx/zip data.
        pattern: regex searched for in every line of every *.js member.
        beautify: when True, run jsbeautifier over each file before matching.

    Matching lines are emitted to stdout as
    "ext_id|date|filename (line N)|line" via guarded_stdout.

    Fix: removed removed-diff residue lines that duplicated the old `args`
    entries (referencing the undefined names `gr` and `out_f`) and made the
    list literal a syntax error.
    """
    try:
        with ZipFile(crx) as z:
            for zip_file_info in z.infolist():
                # Only JavaScript members are grepped.
                if not zip_file_info.filename.endswith(".js"):
                    continue
                with z.open(zip_file_info) as f:
                    # surrogateescape keeps undecodable bytes round-trippable.
                    content = f.read().decode(errors="surrogateescape")
                    if beautify:
                        content = jsbeautifier.beautify(content)
                    for i, line in enumerate(content.splitlines()):
                        if not re.search(pattern, line):
                            continue
                        args = [
                            ext_id, date, zip_file_info.filename + " (line " +
                            str(i + 1) + ")", line
                        ]
                        guarded_stdout("|".join(args) + "\n")
    except Exception:
        # Never let one broken crx kill the worker; report and move on.
        guarded_stderr("Exception when handling {}, {}:\n{}".format(
            ext_id, date, traceback.format_exc()))
def process_id(archivedir, outdir, greps, beautify, verbose, ext_id):
txt = ""
txt = logmsg(verbose, txt, "Processing {} ...\n".format(ext_id))
tarpath = archive_file(archivedir, ext_id)
greppath = os.path.join(outdir, ext_id + ".grep")
if os.path.exists(greppath):
os.remove(greppath)
with open(greppath, "w") as out_f:
with tarfile.open(tarpath, 'r') as t:
def process_id(pattern, beautify, path):
    """Grep every crx contained in one extension tar archive.

    Args:
        pattern: regex handed through to process_crx.
        beautify: beautify flag handed through to process_crx.
        path: path to an <extid>.tar archive whose members are named
              "<extid>/<date>/... .crx".

    Fixes: removed removed-diff residue lines from the old implementation
    that were merged into the body, and replaced `tar_info.size is 0`
    (identity comparison against an int literal, which relies on CPython
    interning) with a proper equality test.
    """
    try:
        with tarfile.open(path, 'r') as t:
            for tar_info in t.getmembers():
                # Skip anything that is not a non-empty crx file.
                if not tar_info.name.endswith(".crx") or tar_info.size == 0:
                    continue
                extid, date = tar_info.name.split("/")[:2]
                with t.extractfile(tar_info) as crx:
                    process_crx(extid, date, crx, pattern, beautify)
    except Exception:
        # One unreadable tar must not abort the whole batch.
        guarded_stderr("Exception when handling {}:\n{}".format(
            path, traceback.format_exc()))
def main(argv):
def find(archive, pattern):
    """Yield the path of every <pattern>.tar file under <archive>/data."""
    tar_glob = pattern + ".tar"
    data_root = os.path.join(archive, "data")
    for dirpath, _dirs, filenames in os.walk(data_root):
        for name in fnmatch.filter(filenames, tar_glob):
            yield os.path.join(dirpath, name)
def find_from_file(archive, extidlistfile):
    """Yield tar paths under <archive>/data whose name matches an id listed in *extidlistfile*.

    The id file holds one extension id (glob) per line; a file is yielded
    once per id that matches it.
    """
    with open(extidlistfile, 'r') as handle:
        id_globs = [entry.strip() + ".tar" for entry in handle.readlines()]
    data_root = os.path.join(archive, "data")
    for dirpath, _dirs, filenames in os.walk(data_root):
        for name in filenames:
            for id_glob in id_globs:
                if fnmatch.fnmatch(name, id_glob):
                    yield os.path.join(dirpath, name)
def init(shared_lock):
    """Pool initializer: publish the shared lock as a module global.

    Each worker process runs this once so guarded_stdout/guarded_stderr can
    serialize their writes through the same multiprocessing.Lock.
    """
    global lock
    lock = shared_lock
def parse_args(argv):
archive = "archive"
prefix = ""
beautify = False
parallel = 8
taskid = 1
maxtaskid = 1
paths = []
try:
opts, args = getopt.getopt(
argv, "ha:p:bt:", ["archive=", "prefix=", "beautify", "threads="])
opts, args = getopt.getopt(argv, "ha:p:e:bt:n:N:", [
"archive=", "prefix=", "extidlistfile=", "beautify", "threads=",
"taskid=", "maxtaskid="
])
except getopt.GetoptError:
help()
sys.exit(2)
@ -108,37 +140,43 @@ def main(argv):
archive = arg
elif opt in ("-p", "--prefix"):
prefix = arg
paths += find(archive, prefix + "*")
elif opt in ("-e", "--extidlistfile"):
extidlistfile = arg
paths += find_from_file(archive, extidlistfile)
elif opt in ("-b", "--beautify"):
beautify = True
elif opt in ("-t", "--threads"):
parallel = int(arg)
elif opt in ("-n", "--taskid"):
taskid = int(arg)
elif opt in ("-N", "--maxtaskid"):
maxtaskid = int(arg)
if len(args) < 2:
if len(args) is 0:
help()
sys.exit(2)
outdir = args[0]
greps = args[1:]
pattern = args[0]
paths += args[1:]
if paths == []:
paths = list(find(archive, "*"))
archivedir = os.path.join(archive, "data")
threeletterdirs = glob.glob(os.path.join(archivedir, prefix + "*"))
chunksize = int(len(paths) / maxtaskid)
if taskid == maxtaskid:
paths = paths[(taskid - 1) * chunksize:]
else:
paths = paths[(taskid - 1) * chunksize:taskid * chunksize]
print("Using archive '{}'".format(archive))
print("Using prefix '{}'".format(prefix))
print("Using beautifier? '{}'".format(beautify))
print("Using {} threads".format(parallel))
print("Found {} three-letter-dirs".format(len(threeletterdirs)))
return pattern, paths, beautify, parallel
for threeletterdir in threeletterdirs:
ext_ids = list(set([d[:32] for d in os.listdir(threeletterdir)]))
with Pool(parallel) as p:
for txt in p.imap(
partial(process_id, archivedir, outdir, greps, beautify,
True), ext_ids):
sys.stdout.write(txt)
sys.stdout.flush()
print("Sucessfully finished grepping")
def main(argv):
    """Entry point: parse the CLI and grep every selected tar in parallel."""
    pattern, tar_paths, beautify, worker_count = parse_args(argv)
    shared_lock = Lock()
    grep_one_tar = partial(process_id, pattern, beautify)
    with Pool(initializer=init, initargs=(shared_lock, ),
              processes=worker_count) as pool:
        pool.map(grep_one_tar, tar_paths)
if __name__ == "__main__":

View File

@ -1,89 +0,0 @@
#!/usr/bin/env python3
import sge_common
import sys
import getopt
import os
def get_sge_content(jobname, stdoutpath, execpath, archivepath, outpath,
                    greps):
    """Render the SGE job script that runs the grepper as a 256-task array job.

    Each array task maps its SGE_TASK_ID to a two-letter prefix (a 16x16
    partition of the id space) and greps only that slice of the archive.
    """
    # Each grep pattern is passed to the grepper as its own quoted argument.
    quoted_greps = " ".join('"{}"'.format(pattern) for pattern in greps)
    template = """#!/bin/bash
#
#$ -t 1-256
#$ -j yes
#$ -o "{stdoutpath}"
#$ -N "{jobname}"
#
module -s load apps/python/conda 2> /dev/null
source activate mypython35
function task_id_to_letter_256 {{
ABC=abcdefghijklmnopqrstuvwxyz
let "I1 = (($1-1) / 16) % 16"
let "I2 = ($1-1) % 16"
echo ${{ABC:$I1:1}}${{ABC:$I2:1}}
}}
"{execpath}" -a "{archivepath}" -p "$(task_id_to_letter_256 $SGE_TASK_ID)" "{outpath}" {greps}
"""
    return template.format(
        jobname=jobname,
        stdoutpath=stdoutpath,
        execpath=execpath,
        archivepath=archivepath,
        outpath=outpath,
        greps=quoted_greps)
def helpmsg():
    """Print usage information for this submission script."""
    for usage_line in (
            __file__ + " ARCHIVE OUTPUT GREP1 [GREP2 ...]",
            " -h print this help text",
    ):
        print(usage_line)
def main(argv):
    """Parse arguments, build the grepper SGE job script, and submit it."""
    try:
        opts, positional = getopt.getopt(argv, "h")
    except getopt.GetoptError:
        helpmsg()
        sys.exit(2)
    for option, _value in opts:
        if option == '-h':
            helpmsg()
            sys.exit()
    if len(positional) < 3:
        helpmsg()
        sys.exit(2)
    archive_path = os.path.expanduser(positional[0])
    result_path = os.path.expanduser(positional[1])
    grep_patterns = positional[2:]
    log_path = sge_common.get_stdout_path("grepper")
    job_name = os.path.basename(log_path)
    exec_path = os.path.join(sge_common.get_project_root(), "grepper")
    # Validate the environment before creating any directories.
    sge_common.ensure_sharc()
    sge_common.validate_archivepath(archive_path)
    sge_common.validate_execpath(exec_path)
    sge_common.validate_outdir(result_path)
    os.makedirs(log_path)
    os.makedirs(result_path)
    print("Using data from {}".format(archive_path))
    print("Writing logs to {}".format(log_path))
    print("Writing results to {}".format(result_path))
    job_script = get_sge_content(job_name, log_path, exec_path, archive_path,
                                 result_path, grep_patterns)
    print("Executing the following job:")
    print("=" * 80)
    print(job_script)
    print("=" * 80)
    sge_common.execute_sge(job_script)
if __name__ == "__main__":
    main(sys.argv[1:])

8
sge/grepper.sge Executable file
View File

@ -0,0 +1,8 @@
#!/bin/bash
# SGE array-task wrapper: grep one chunk of the extension archive.
# Expects BASEDIR and PATTERN in the environment (passed via `qsub -v`),
# plus the scheduler-provided SGE_TASK_ID / SGE_TASK_LAST.
module -s load apps/python/conda 2> /dev/null
source activate mypython35
set -o nounset
# Fix: quote all expansions so values can never word-split or glob.
"$BASEDIR/ExtensionCrawler/grepper" -a "/shared/brucker_research1/Shared/BrowserExtensions" -n "$SGE_TASK_ID" -N "$SGE_TASK_LAST" "$PATTERN" > "$BASEDIR/$SGE_TASK_ID.grep"

26
sge/grepper.sh Executable file
View File

@ -0,0 +1,26 @@
#!/usr/bin/bash
# Push the ExtensionCrawler checkout to the cluster and submit the grepper
# array job there.
#
# Usage: grepper.sh PATTERN [HOST]
set -o nounset
PATTERN=$1
HOST=${2:-sharc.shef.ac.uk}
# Repository root: parent of the directory containing this script.
# Fix: quote $(dirname "$0") so paths with spaces do not word-split.
BASEDIR=$( cd "$(dirname "$0")"; cd ..; pwd -P )/
# Single quotes on purpose: $USER must be expanded by the REMOTE shell.
TARGETDIR='/data/$USER/grepper-'$(date +%Y%m%d-%H%M%S)
SGEFILE="$BASEDIR/sge/grepper.sge"
echo "Creating $HOST:$TARGETDIR/ExtensionCrawler ..."
# Fix: quote the remote-path expansions; the remote shell still expands
# the embedded $USER because ssh/rsync hand the string to a remote shell.
ssh "$HOST" mkdir -p "$TARGETDIR/ExtensionCrawler"
echo "Pushing $BASEDIR to $HOST:$TARGETDIR/ExtensionCrawler ..."
rsync -zr "$BASEDIR" "$HOST:$TARGETDIR/ExtensionCrawler"
echo "Pushing $SGEFILE to $HOST:$TARGETDIR/grepper.sge ..."
rsync -zr "$SGEFILE" "$HOST:$TARGETDIR/grepper.sge"
echo "Starting job ..."
ssh "$HOST" qsub \
    -v BASEDIR="$TARGETDIR",PATTERN="$PATTERN" \
    -t 1-256 \
    -j yes \
    -o "$TARGETDIR" \
    "$TARGETDIR/grepper.sge"