From 651506bd0c5edf197c3fa860d4852ed2c430fc54 Mon Sep 17 00:00:00 2001 From: Michael Herzberg Date: Wed, 13 Jun 2018 09:33:55 +0100 Subject: [PATCH] Sort db inserts to prevent deadlocks. --- ExtensionCrawler/dbbackend/mysql_backend.py | 57 ++++--- sge/create-db.sge | 16 +- sge/create-db.sh | 130 ++++++++------ singularity/ExtensionCrawler-dev.def | 179 ++++++++++++++++++++ 4 files changed, 300 insertions(+), 82 deletions(-) create mode 100644 singularity/ExtensionCrawler-dev.def diff --git a/ExtensionCrawler/dbbackend/mysql_backend.py b/ExtensionCrawler/dbbackend/mysql_backend.py index 674d0b2..f3e78cd 100644 --- a/ExtensionCrawler/dbbackend/mysql_backend.py +++ b/ExtensionCrawler/dbbackend/mysql_backend.py @@ -17,6 +17,7 @@ import time import datetime +from collections import OrderedDict from random import uniform import MySQLdb @@ -43,20 +44,33 @@ class MysqlBackend: def __exit__(self, *args): start = time.time() - self.retry(self._commit_cache) - self.db.commit() + self._commit_cache() log_info( "* Database batch insert finished after {}".format(datetime.timedelta(seconds=int(time.time() - start))), 2) self._close_conn() def _commit_cache(self): - for query, args in self.cache.items(): - self.cursor.executemany(query, args) + for table, arglist in self.cache.items(): + sorted_arglist = self.sort_by_primary_key(table, arglist) + args = [tuple(arg.values()) for arg in sorted_arglist] + + # Looks like this, for example: + # INSERT INTO category VALUES(extid,date,category) (%s,%s,%s) + # ON DUPLICATE KEY UPDATE extid=VALUES(extid),date=VALUES(date) + # ,category=VALUES(category) + query = "INSERT INTO {}({}) VALUES ({}) ON DUPLICATE KEY UPDATE {}".format( + table, + ",".join(sorted_arglist[0].keys()), + ",".join(len(args[0]) * ["%s"]), + ",".join( + ["{c}=VALUES({c})".format(c=c) for c in sorted_arglist[0].keys()])) + self.retry(lambda: self.cursor.executemany(query, args)) def _create_conn(self): if self.db is None: log_info("* self.db is None, open new connection 
...", 3) self.db = MySQLdb.connect(**self.dbargs) + self.db.autocommit(True) log_info("* success", 4) if self.cursor is None: log_info("* self.cursor is None, assigning new cursor ...", 3) @@ -82,7 +96,7 @@ class MysqlBackend: try: self._close_conn() except Exception as e2: - log_error("Surpressed exception: {}".format(str(e2)), 3) + log_error("Suppressed exception: {}".format(str(e2)), 3) if t + 1 == self.maxtries: log_error("MySQL connection eventually failed, closing connection!", 3) @@ -107,22 +121,25 @@ class MysqlBackend: else: return None - def insertmany(self, table, arglist): - args = [tuple(arg.values()) for arg in arglist] + def sort_by_primary_key(self, table, arglist): + self.retry(lambda: self.cursor.execute(f"SHOW KEYS FROM {table} WHERE Key_name = 'PRIMARY'")) + primary_keys = [row[4] for row in self.cursor.fetchall()] - # Looks like this, for example: - # INSERT INTO category VALUES(extid,date,category) (%s,%s,%s) - # ON DUPLICATE KEY UPDATE extid=VALUES(extid),date=VALUES(date) - # ,category=VALUES(category) - query = "INSERT INTO {}({}) VALUES ({}) ON DUPLICATE KEY UPDATE {}".format( - table, - ",".join(arglist[0].keys()), - ",".join(len(args[0]) * ["%s"]), - ",".join( - ["{c}=VALUES({c})".format(c=c) for c in arglist[0].keys()])) - if query not in self.cache: - self.cache[query] = [] - self.cache[query] += args + sorted_arglist = sorted(arglist, key=lambda x: [x[pk] for pk in primary_keys]) + + def arglist_shuffler(x): + try: + return primary_keys.index(x) + except ValueError: + return len(primary_keys) + shuffled_arglist = [OrderedDict(sorted(arg.items(), key=lambda x: arglist_shuffler(x[0]))) for arg in sorted_arglist] + return shuffled_arglist + + + def insertmany(self, table, arglist): + if table not in self.cache: + self.cache[table] = [] + self.cache[table] += arglist def insert(self, table, **kwargs): self.insertmany(table, [kwargs]) diff --git a/sge/create-db.sge b/sge/create-db.sge index ce2759d..6a7e86e 100755 --- a/sge/create-db.sge 
+++ b/sge/create-db.sge @@ -1,19 +1,7 @@ #!/bin/bash set -o nounset -set -x - -SING_EXEC="singularity exec --pwd /opt/ExtensionCrawler -B $TMPDIR:/tmp $SING_IMG" +set -o errexit printenv -echo "The following parameter were passed: $*" -echo "Printing the content of $ARCHIVE to force mounting:" -ls "$ARCHIVE" -if [ -f "$BASEDIR/ids" ]; then - EXT_SELECT="-e $BASEDIR/ids" -else - EXT_SELECT= -fi - - -/usr/bin/time $SING_EXEC ./create-db -t 1 -a "$ARCHIVE" -n $SGE_TASK_ID -N $MAX_SGE_TASK_ID $EXT_SELECT $* +(set -x; /usr/bin/time singularity exec --pwd /opt/ExtensionCrawler -B $TMPDIR:/tmp create-db.img create-db -t 1 -n $SGE_TASK_ID $*) diff --git a/sge/create-db.sh b/sge/create-db.sh index 00a86b1..f3c4783 100755 --- a/sge/create-db.sh +++ b/sge/create-db.sh @@ -1,61 +1,95 @@ -#!/usr/bin/bash +#!/bin/bash set -o nounset set -o errexit +REMOTE_ARCHIVE=/shared/brucker_research1/Shared/BrowserExtensions/archive +REMOTE_TARGET_DIR_PREFIX=/data/\$USER +NUM_THREADS=48 +SGE_EXTRA_ARGS='-P rse -m a -l rmem=8G -M "msherzberg1@sheffield.ac.uk" -j yes' +PY_EXTRA_ARGS='' + +usage() { + echo "Usage:" + echo " -a (archive path, default: ${REMOTE_ARCHIVE})" + echo " -t (target directory, default: ${REMOTE_TARGET_DIR_PREFIX})" + echo " -m (degree of parallelism, default: ${NUM_THREADS})" + echo " -s \"\" (qsub arguments, default: ${SGE_EXTRA_ARGS})" + echo " -p \"\" (python script arguments, default: ${PY_EXTRA_ARGS})" +} + +while getopts ":a:t:s:p:m:" o; do + case "${o}" in + a) + REMOTE_ARCHIVE=${OPTARG} + ;; + t) + REMOTE_TARGET_DIR_PREFIX=${OPTARG} + ;; + m) + NUM_THREADS=${OPTARG} + ;; + s) + SGE_EXTRA_ARGS+=" ${OPTARG}" + ;; + p) + PY_EXTRA_ARGS+=" ${OPTARG}" + ;; + *) + usage + exit 1 + ;; + esac +done + +shift $((OPTIND-1)) + BASEDIR=$( cd $(dirname "$0"); cd ..; pwd -P ) +TEMP_FOLDER=$(mktemp -d) +TARGETDIR="${REMOTE_TARGET_DIR_PREFIX}/create-db-$(date +%Y%m%d-%H%M%S)" -NRJOBS=${NRJOBS:-256} -echo "Using $NRJOBS jobs" - -JOBRANGE=${JOBRANGE:-1-$NRJOBS} -echo "Executing 
jobs $JOBRANGE" - -ARCHIVE=${ARCHIVE:-$(ssh sharc.shef.ac.uk find /shared/brucker_research1/Shared/BrowserExtensions/archive/.snapshot -maxdepth 1 -name \"D*\" | sort -r | head -n1)} -echo "Using archive: $ARCHIVE" - -TARGETDIR="${TARGETDIR:-/data/\$USER}/create-db-$(date +%Y%m%d-%H%M%S)" echo "Using target dir: $TARGETDIR" - -SING_IMG_SRC="${SING_IMG_SRC:-/shared/brucker_research1/Shared/BrowserExtensions/excrawl.img}" -SING_IMG="$TARGETDIR/excrawl.img" -if ! ssh sharc.shef.ac.uk [ -f "$SING_IMG_SRC" ]; then - echo -n "$SING_IMG_SRC does not exist! Generate new image and push? (yes/abort): " - read confirm - if [ "$confirm" != yes ]; then - exit 0 - fi - echo "Creating new image ..." - (cd "$BASEDIR/singularity"; ./build.sh -f) - echo "Pushing new image ..." - scp "$BASEDIR/singularity/ExtensionCrawler.img" sharc.shef.ac.uk:"$SING_IMG_SRC" -fi -echo "Creating dirs ..." ssh sharc.shef.ac.uk mkdir -p $TARGETDIR/logs -echo "Copying $SING_IMG_SRC to $SING_IMG" -ssh sharc.shef.ac.uk cp "$SING_IMG_SRC" "$SING_IMG" - echo "Pushing sge script ..." scp "$BASEDIR/sge/create-db.sge" sharc.shef.ac.uk:"$TARGETDIR/create-db.sge" -if ! [ -z "${EXTIDLISTFILE:-}" ]; then - echo "Pushing list with extension ids ..." - scp "$EXTIDLISTFILE" sharc.shef.ac.uk:"$TARGETDIR/ids" +echo "Building image..." +if [ -f "$BASEDIR/singularity/create-db.img" ]; then + rm -f "$BASEDIR/singularity/create-db.img" +fi +sudo singularity build "$BASEDIR/singularity/create-db.img" "$BASEDIR/singularity/ExtensionCrawler-dev.def" + +echo "Pushing image..." +scp "$BASEDIR/singularity/create-db.img" sharc.shef.ac.uk:"$TARGETDIR/create-db.img" + + +echo "Gathering extension IDs..." +ssh sharc.shef.ac.uk find "${REMOTE_ARCHIVE}/data" -name "*.tar" | grep -Po "[a-p]{32}" > ${TEMP_FOLDER}/extension.ids + +NO_IDS=$(cat ${TEMP_FOLDER}/extension.ids | wc -l) + +echo "Found $NO_IDS IDs!" +if [ "$NO_IDS" = 0 ]; then + echo "Nothing to do!" + exit 0 fi -echo "Starting job ..." 
-ssh sharc.shef.ac.uk \ - SING_IMG=\"$SING_IMG\" \ - ARCHIVE=\"$ARCHIVE\" \ - BASEDIR=\"$TARGETDIR\" \ - MAX_SGE_TASK_ID=\"$NRJOBS\" \ - qsub \ - -V \ - -m a \ - -l rmem=8G \ - -M "msherzberg1@sheffield.ac.uk" \ - -t $JOBRANGE \ - -j yes \ - -o "$TARGETDIR/logs" \ - "$TARGETDIR/create-db.sge" \ - $* +echo "Pushing extension IDs..." +scp ${TEMP_FOLDER}/extension.ids sharc.shef.ac.uk:$TARGETDIR/ + +NO_BATCH_JOBS=$(((NO_IDS+1)/75000+1)) +JOBS_PER_BATCH=$((NO_IDS/NO_BATCH_JOBS+1)) + +for run_no in $(seq 1 $NO_BATCH_JOBS); do + FIRST_ID=$(((run_no-1) * $JOBS_PER_BATCH + 1)) + LAST_ID=$((run_no * $JOBS_PER_BATCH)) + + echo "Starting job $run_no ..." + (set -x; ssh sharc.shef.ac.uk qsub \ + -tc $((NUM_THREADS/NO_BATCH_JOBS)) \ + -t ${FIRST_ID}-${LAST_ID} \ + -wd "$TARGETDIR" \ + -o "$TARGETDIR/logs" \ + ${SGE_EXTRA_ARGS} \ + "$TARGETDIR/create-db.sge" -a "$REMOTE_ARCHIVE" -e "${TARGETDIR}/extension.ids" -N $NO_IDS ${PY_EXTRA_ARGS}) +done diff --git a/singularity/ExtensionCrawler-dev.def b/singularity/ExtensionCrawler-dev.def new file mode 100644 index 0000000..a4301e6 --- /dev/null +++ b/singularity/ExtensionCrawler-dev.def @@ -0,0 +1,179 @@ +#!/bin/sh +# Copyright 2017 The University of Sheffield, UK +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +Bootstrap: docker +From: debian + +%labels +Maintainer The LogicalHacking Team (https://logicalhacking.com) + +%setup + +%files + .. 
/opt/ExtensionCrawler + +%post + +################################################################### +# Add Debian unstable as a secondary (lower priority) source +# and update the database of available packages. +cat >> /etc/apt/sources.list << EOF +deb http://ftp.us.debian.org/debian unstable main +EOF + +cat > /etc/apt/preferences << EOF +Package: * +Pin: release a=testing +Pin-Priority: 900 + +Package: * +Pin: release a=unstable +Pin-Priority: 800 +EOF + +cat > /etc/apt/apt.conf.d/01norecommend << EOF +APT::Install-Recommends "0"; +APT::Install-Suggests "0"; +EOF + +chmod go+r /etc/apt/preferences +apt-get update +################################################################### + +################################################################### +# Configure locales +apt-get install -y locales +echo "en_US.UTF-8 UTF-8" >> /etc/locale.gen +echo "en_GB.UTF-8 UTF-8" >> /etc/locale.gen +locale-gen +echo "LANG=en_US.UTF-8" > /etc/default/locale +################################################################### + +################################################################### +# Install the core dependencies (Python 3.6 or later) +# from the Debian Testing repository +apt-get install -y python3-magic python3-crypto python3-minimal python3-pip python3-setuptools python3-mysqldb python3-jsbeautifier python3-tabulate +apt-get clean +apt-get install -y git +apt-get clean +rm -rf /var/lib/apt/lists/* +################################################################### + +################################################################### +# Create /opt for local software (mainly cloned git repositories +# from logicalhacking.com) +mkdir -p /opt +chmod 755 /opt +################################################################### + +################################################################### +# Add the Extension Crawler repository, for more details, visit +# https://git.logicalhacking.com/BrowserSecurity/ExtensionCrawler +cd /opt +# git clone 
https://git.logicalhacking.com/BrowserSecurity/ExtensionCrawler.git +# cd ExtensionCrawler +# git checkout production +# cd .. +pip3 install wheel # simhash needs wheel to build properly, still works without it though +pip3 install --system -e ExtensionCrawler +cd / +chmod -R go+u-w /opt/ExtensionCrawler +chmod -R go+u-w /usr/local/lib/ +chmod -R go+u-w /usr/local/bin/ +################################################################### + +################################################################### +# Clone cdnjs repository or create link to external archive dir +ARCHIVE=/shared/brucker_research1/Shared/BrowserExtensions/archive +case ${SINGULARITY_IMAGE} in + *-cdnjs.img) + mkdir -p /opt/archive/filedb + cd /opt/archive/filedb + git clone https://github.com/cdnjs/cdnjs.git cdnjs-git + cd cdnjs-git + git pull + ln -s ${ARCHIVE}/conf . > /dev/null + ln -s ${ARCHIVE}/data > /dev/null + ln -s ${ARCHIVE}/log > /dev/null + ;; + *) + cd /opt/ + ln -s ${ARCHIVE} . + ;; +esac +chmod -R go+u /opt +################################################################### + +################################################################### +# Create mount/bind points for the various network drives +# on SHARC (only useful when using the Singularity image on +# the High-Performance Cluster of The University of Sheffield) +mkdir /scratch +mkdir /fastdata +mkdir /data +mkdir /shared + +# Create nvidia driver directories to get rid of the singularity +# warnings on sharc +mkdir /nvbin +mkdir /nvlib +chmod go+u-w /scratch /fastdata /data /shared +################################################################### + +%environment + +export EXTENSION_ARCHIVE=/opt/archive +export PATH=/opt/ExtensionCrawler/:${PATH} + +# We install all python modules into the container, so we do not want +# to use any packages that the user might have installed in their home +# directory. 
+export PYTHONNOUSERSITE=1 + +%runscript +# ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ +# this text will get copied to /singularity and will run whenever the container +# is called as an executable +usage() { + cat <