ConfidentialLCA/template/pplca/compute.py

337 lines
11 KiB
Python

#!/usr/bin/env python3
#############################################################################
# Copyright (c) 2019-2021 University of Exeter, UK
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# SPDX-License-Identifier: Apache-2.0
#############################################################################
"""Basic computation module."""
from io import BytesIO
import shutil
import logging
import zipfile
import os
import json
import base64
import pexpect
import requests
from OpenSSL import crypto
from pplca.config import (
const_default_host,
const_specified_host,
const_cert_dir,
const_data_dir,
const_mpc_program_dir,
const_envflows_dir,
const_mpc_program,
)
from pplca.log_utils import log_debug, log_info
def get_certificate_list(portNumber):
"""Get the list of signed certificates"""
# url = const_default_host() + "certificatesdb/certlist/"
url = const_specified_host(portNumber) + "certificatesdb/certlist/"
return requests.get(url).content
def get_computation_id(portNumber, certificate):
"""Get computation ID."""
# url = const_default_host() + "computationId/"
url = const_specified_host(portNumber) + "computationId/"
return int(requests.get(url + str(certificate)).content)
def get_all_certificates(portNumber, cwd, certlist):
"""Obtain list of all certificates."""
# url = const_default_host() + "cert/"
url = const_specified_host(portNumber) + "cert/"
crt = const_cert_dir(cwd)
jsonlist = json.loads(certlist.decode("utf-8"))
log_debug("List of Certificates ")
for j in jsonlist["certs"]["CertNames"]:
response = requests.get(url + j)
try:
signed_client_certificate = crypto.load_certificate(
crypto.FILETYPE_PEM, response.content
)
except:
log_debug(
"The signed certificate is not loaded!!!" + "Response: " + str(response)
)
certpath = crt + j
try:
with open(certpath, "wb") as cert_store:
cert_store.write(
crypto.dump_certificate(
crypto.FILETYPE_PEM, signed_client_certificate
)
)
log_debug("CRT Stored Here :" + certpath)
except IOError:
log_debug("Certificate " + j + " is not saved in " + crt)
log_info("Get the signed certificates used in computation " "and save in " + crt)
def get_computation_file(portNumber, cwd):
"""Get the compiled MPC files."""
# url = const_default_host() + "computationfile"
url = const_specified_host(portNumber) + "computationfile"
response = requests.get(url)
comdir = const_mpc_program_dir(cwd) + "/" + const_mpc_program()
try:
with zipfile.ZipFile(
BytesIO(base64.b64decode(response.content)), "r"
) as zip_ref:
zip_ref.extractall(comdir)
log_debug("Get the MPC computation folder" + comdir)
except:
log_debug("MPC folder is not a zip file " + str(zip_ref))
def get_env_flows_list_file(portNumber, compdir):
"""Get content of Env Flows File."""
url = const_specified_host(portNumber) + "envflowslistfile"
response = requests.get(url)
envflowdir = const_envflows_dir(compdir)
try:
with zipfile.ZipFile(
BytesIO(base64.b64decode(response.content)), "r"
) as zip_ref:
zip_ref.extractall(envflowdir)
log_debug("Get environmental flows list file to" + envflowdir)
except:
log_debug("Environmental flows list file is not a zip file " + str(zip_ref))
def get_data_files(portNumber, cwd, com_id):
"""Get content of Data folder."""
# url = const_default_host() + "datafolder/"
url = const_specified_host(portNumber) + "datafolder/"
response = requests.get(url + str(com_id))
try:
with zipfile.ZipFile(
BytesIO(base64.b64decode(response.content)), "r"
) as zip_ref:
zip_ref.extractall(cwd + "/Data")
log_debug("Get files of Data folder and save in " + cwd + "/Data")
except:
log_debug("Files of Data folder are not a zip file " + str(zip_ref))
def mk_env_flows_and_values_file(compDir):
# Generate initial values of env flows
envFlowDict = {"EnvFlows": {"NameofEnvFlow": [], "ValueofEnvFlow": []}}
with open(compDir + "/envflowslist.json") as envflo:
envFlowsList = json.load(envflo)
for i in envFlowsList["NameofEnvFlow"]:
envFlowDict["EnvFlows"]["NameofEnvFlow"].append(i)
flowValue = input(
"How many/much {} is emitted/produced for one unit product ? : ".format(
i
)
)
envFlowDict["EnvFlows"]["ValueofEnvFlow"].append(str(flowValue))
with open(compDir + "/env-flows.json", "w") as f:
json.dump(envFlowDict, f)
def run_computation(comName, cwd, compdir, com_id, portNumber, envFlowValue):
"""Run the actual computation."""
env_flow_value_list = []
data = {}
if envFlowValue:
mk_env_flows_and_values_file(compdir)
try:
with open(compdir + "/env-flows.json") as json_file:
data = json.load(json_file)
except:
log_debug("Cannot find env-flow.json in " + cwd)
if os.path.exists(compdir + "/Root/MPC.json"):
with open(compdir + "/Root/MPC.json") as MPCconfigFile:
MPCdata = json.load(MPCconfigFile)
for j in range(len(data["EnvFlows"]["ValueofEnvFlow"])):
env_flow_value_list.append(data["EnvFlows"]["ValueofEnvFlow"][j])
print(data["EnvFlows"]["ValueofEnvFlow"][j])
try:
log_debug("Start to Podman Container for Player " + str(comName))
shell_cmd = (
"podman run --cidfile "
+ cwd
+ "/playercontainerId --pod mypod -it --volume "
+ cwd
+ "/Cert-Store:/opt/src/SCALE-MAMBA/Cert-Store --volume "
+ cwd
+ "/Data:/opt/src/SCALE-MAMBA/Data --volume "
+ cwd
+ "/Programs:/opt/src/SCALE-MAMBA/Programs -w /opt/src/SCALE-MAMBA/ localhost/scale-mamba-latest PlayerBinary.x -pnb "
+ str(portNumber)
+ " "
+ str(com_id)
+ " Programs/"
+ const_mpc_program()
)
child = pexpect.spawn("/bin/bash", ["-c", shell_cmd], echo=False, timeout=None)
if com_id == 0:
for j, _ in enumerate(MPCdata["ScaleVector"]):
child.expect("Input channel")
child.sendline(MPCdata["ScaleVector"][j])
for i, _ in enumerate(env_flow_value_list):
child.expect("Input channel")
child.sendline(str(env_flow_value_list[i]))
# log_debug("!!! Waiting for input - ", str(env_flow_value_list[i]))
with open(cwd + "/output.out", "w") as output_file:
output_file.write(child.read().decode("UTF-8"))
print(child.read().decode("UTF-8"))
except:
print("MPC computation is NOT completed - compute.py - " + cwd)
try:
con_rm_cmd = (
"podman rm --cidfile "
+ cwd
+ "/playercontainerId && rm "
+ cwd
+ "/playercontainerId"
)
os.system(con_rm_cmd)
log_debug("Player's container is removed! - " + cwd)
except:
log_debug("Player's container could NOT be removed! - " + cwd)
update_env_flows_list(compdir, cwd)
log_info(
"The result (updated environmental flows) is saved in "
+ compdir
+ "/env-flows.json file."
)
def get_list_of_env_flow(cwd):
word_list = []
with open(
const_mpc_program_dir(cwd) + "/" + const_mpc_program() + ".mpc"
) as mpcfile:
line = mpcfile.readline()
while line:
if "enFlName" in line:
words = line.rsplit("'")
for i in range(len(words) - 1):
if i % 2 == 0:
print(words[i + 1])
word_list.append(words[i + 1])
break
line = mpcfile.readline()
return word_list
##############
def update_env_flows_list(compdir, compSuppDir):
with open(compdir + "/env-flows.json") as envflo:
envFlowsList = json.load(envflo)
count = 0
file_path = compSuppDir + "/output.out"
log_debug("Checking for: " + file_path)
if os.path.exists(file_path):
with open(file_path) as computation_result:
line = computation_result.readline()
log_debug("Found and looking for the values " + "of Environmental Flows")
while line:
if "Output channel 0" in line:
array = line.split(" ")
arrlen = len(array)
value = array[arrlen - 1].replace("\n", "")
envFlowsList["EnvFlows"]["ValueofEnvFlow"][count] = str(value)
count = count + 1
line = computation_result.readline()
else:
log_debug("The file " + file_path + " is not found!")
if count != 0:
with open(compdir + "/env-flows.json", "w") as json_file:
json.dump(envFlowsList, json_file)
log_info(
"The result of computation (new value list of environmental flows) : "
+ str(envFlowsList["EnvFlows"]["ValueofEnvFlow"])
)
log_info(
"The value of Environmental Flows are updated and saved in "
+ compdir
+ "/env-flows.json"
)
log_debug(
"The result of computation (new value list of environmental flows) : "
+ str(envFlowsList["EnvFlows"]["ValueofEnvFlow"])
)
def generate_needed_folders_and_files(playername, cwd, compdir):
os.makedirs(cwd + "/Data")
os.makedirs(cwd + "/Programs/SuppChainAgg")
os.makedirs(cwd + "/Cert-Store")
original = compdir + "/Root/Cert-Store/" + playername + ".key"
target = cwd + "/Cert-Store/" + playername + ".key"
shutil.copy(original, target, follow_symlinks=False)
##############
def start_computation(
comName, suppdirname, cwd, compdir, certificate, portNum, envFlowValue
):
generate_needed_folders_and_files(suppdirname, cwd, compdir)
portNumber = str(int(portNum) - 1)
log_info("Get all necessary files from server...")
certlist = get_certificate_list(portNumber)
get_all_certificates(portNumber, cwd, certlist)
get_computation_file(portNumber, cwd)
if envFlowValue:
get_env_flows_list_file(portNumber, compdir)
com_id = get_computation_id(portNumber, certificate)
log_debug("Computation id of the company is " + str(com_id))
get_data_files(portNumber, cwd, com_id)
log_info("Ready for computation...")
run_computation(comName, cwd, compdir, com_id, portNum, envFlowValue)
log_info("Computation is completed.")