#!/bin/env python3 ############################################################################# # Copyright (c) 2019-2021 University of Exeter, UK # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. # # SPDX-License-Identifier: Apache-2.0 ############################################################################# import argparse import json import multiprocessing import logging import getopt from datetime import datetime, date import concurrent.futures # nice future but it works late than old process version !? import os import random import pexpect import shutil import time import random from multiprocessing import Process import sys sys.path.insert(0, os.path.dirname(os.path.realpath(__file__))) from template.server import settingupComputation, createMPCFile, runServer from template.computation import start_computation, update_env_flows_list from pplca.compute import get_env_flows_list_file, get_computation_id, run_computation from pplca.log_utils import log_debug, log_info, log_warning, log_error, log_exception from pplca.config import ( const_log_format, const_verbose, const_cert_dir, const_data_dir, const_mpc_program_dir, const_mpc_program, const_upload_dir, ) def copy_companies_to_test_directory(compList, envFlowList, main_dir, test_path): # ready to use companies' paths comp_Path = main_dir + "/companies" for i in compList: # First copy the folder and files of the company shutil.copytree((comp_Path + "/" + i), (test_path + "/" + i)) testComPath = str(test_path + "/" + i) generate_random_values_for_envflows(testComPath, envFlowList) log_debug( "Copied needed number(" + str(len(compList)) + ") of companies from " + comp_Path ) def give_name_to_companies(numComp): compArray = [] for i in range(numComp): compArray.append("P" + str(i + 1)) return compArray # Ideal full tree def ideal_subranges(nodes, deg): result = [] (d, r) = divmod(len(nodes), deg) cnt = 0 for i in range(deg): if i < r: result.append(nodes[cnt : cnt + d + 1]) cnt += d + 1 else: result.append(nodes[cnt : cnt + d]) cnt += d return result # Fill from right def first_right_fill_subranges(nodes, deg): result = [] cnt = 0 numNodes = len(nodes) for i in range(deg): if i < (deg - 1): result.append(nodes[cnt : cnt + 1]) cnt += 1 else: result.append(nodes[cnt:numNodes]) cnt += numNodes return result def mk_supply_chain_structure(nodes, deg, treeStructure): # head and tail head, *tail = nodes # str(head) + " " + str(tail) # I think it should be '<=' if len(tail) <= deg: return {head: tail} else: if treeStructure == 0: x = ideal_subranges(tail, deg) elif treeStructure == 1: x = first_right_fill_subranges(tail, deg) else: log_error( "There is no identified structure with this argument parameter: " + str(treeStructure) ) return { head: list( map(lambda I: mk_supply_chain_structure(I, deg, treeStructure), x) ) } def mk_config_file(rootCA, computationArray, rootdir): "Make Configuration file - is used for set up step in Scale mamba" numofComp = len(computationArray) config = {"Start": {"Certs": {"IPAdd": [], "CertName": []}}} config["Start"]["Set-up"] = "4" config["Start"]["RootCAname"] = rootCA config["Start"]["NumberofPlayers"] = str(numofComp) for item in computationArray: player = item + "_" + rootCA + ".crt" config["Start"]["Certs"]["IPAdd"].append("127.0.0.1") config["Start"]["Certs"]["CertName"].append(player) # They do not ask user anymore whether fakeOffline or fakeSacrifice # config['Start']['FakeOffline'] = fp.readline().replace("\n", "") # config['Start']['FakeSacrifice'] = fp.readline().replace("\n", "") # which secret sharing scheme (in our case it is Shamir Secret Sharing) config["Start"]["LSSS"] = "1" config["Start"]["Modulus"] = "340282366920938463463374607431768211507" if (numofComp % 2) == 0: config["Start"]["threshold"] = str((numofComp / 2) - 1) else: config["Start"]["threshold"] = str(int(numofComp / 2)) try: with open(rootdir + "/config.json", "w") as f: json.dump(config, f) log_debug( "Config file is generated for setting up computation (for Data folder)-" + rootCA + " Root company (in directory " + rootdir + ")" ) except: log_exception( "Setup Configuration file " + rootdir + "config.json is NOT created!" ) def mk_MPC_config_file(scaleVector, envFlowList, rootdir): MPC_config = {"ScaleVector": [], "NameofEnvFlows": [], "NumofEnvFlows": []} for value in scaleVector: MPC_config["ScaleVector"].append(str(value)) for flow in envFlowList: MPC_config["NameofEnvFlows"].append(flow) MPC_config["NumofEnvFlows"].append(str(len(envFlowList))) try: with open(rootdir + "/MPC.json", "w") as MPCfile: json.dump(MPC_config, MPCfile) log_debug( "Scale vector and environmental flow list used in MPC saved in directory " + rootdir + "/MPC.json" ) except: log_exception("MPC file (MPC.json) is NOT created! - " + rootdir) def generate_random_values_for_envflows(compDir, envFlowList): # Generate initial values of env flows envFlowDict = {"EnvFlows": {"NameofEnvFlow": [], "ValueofEnvFlow": []}} for i in envFlowList: envFlowDict["EnvFlows"]["NameofEnvFlow"].append(i) envFlowDict["EnvFlows"]["ValueofEnvFlow"].append(str(random.randint(5, 10))) with open(compDir + "/env-flows.json", "w") as f: json.dump(envFlowDict, f) def multi_process_computation(i, CERT, port, maindir, supplier_dir_name): comp_supp_dir = maindir + "/" + i + "/" + supplier_dir_name comp_dir = maindir + "/" + i log_debug( "Company " + i + "(" + comp_supp_dir + ") join the computation with " + CERT + " certificate." ) try: start_computation( i, supplier_dir_name, comp_supp_dir, comp_dir, CERT, str(port), False ) log_info( "Inside of multiprocessing -- cert " + CERT + " comp " + i + " and port number " + str(port) ) log_debug("And create output in " + comp_supp_dir + "/" + i + ".out") except: log_exception("Could NOT run MPC computation!!!!") def existing_scenario_computation( i, CERT, serverPort, port, maindir, supplier_dir_name ): comp_supp_dir = maindir + "/" + i + "/" + supplier_dir_name comp_dir = maindir + "/" + i log_debug( "Company " + i + "(" + comp_supp_dir + ") join the computation with " + CERT + " certificate." ) com_id = get_computation_id(serverPort, CERT) log_debug("Company " + i + " - id is " + str(com_id)) try: run_computation(i, comp_supp_dir, comp_dir, com_id, str(port), False) log_info( "Inside of multiprocessing -- cert " + CERT + " comp " + i + " and port number " + str(port) ) log_debug("And create output in " + comp_supp_dir + "/" + i + ".out") except: log_exception("Could NOT run MPC computation!!!!") def get_list_of_comp_env_flows_value(mainDir, computationArray): envFlowsList = [] for i in computationArray: compDir = mainDir + "/" + i with open(compDir + "/env-flows.json", "r") as readfile: envFlValues = json.load(readfile) envFlowsList.append(envFlValues["EnvFlows"]["ValueofEnvFlow"]) return envFlowsList def update_validaiton_file( maindir, level, computationArray, envFlowValueList, rootCA, serverPort, computationPort, ): if os.path.exists(maindir + "/" + rootCA + "/Root/MPC.json"): with open(maindir + "/" + rootCA + "/Root/MPC.json") as MPCconfigFile: MPCdata = json.load(MPCconfigFile) with open(maindir + "/" + rootCA + "/env-flows.json", "r") as rootfile: rootenvflowfile = json.load(rootfile) with open(maindir + "/computation_validation_info.json", "r") as readfile: feeds = json.load(readfile) dt_string = datetime.today().isoformat() feeds["one_level_computations"].append( { "level_info": level, "server_port": serverPort, "computation_port": computationPort, "companies": str(computationArray), "scale vector": MPCdata["ScaleVector"], "list_of_env_flows_values_of_companies": envFlowValueList, "computation_result": rootenvflowfile["EnvFlows"]["ValueofEnvFlow"], "The time of computation ": dt_string, } ) with open(maindir + "/computation_validation_info.json", "w") as outfile1: json.dump(feeds, outfile1) def run_each_compilation_parallel( computationArray, scaleVector, serverPort, rootComp, maindir, envFlowList, level ): if len(computationArray) > 2: log_info( "One level computation will happen between " + str(computationArray) + " \n And their values in scale vector are " + str(scaleVector) + " \n And server port is: " + str(serverPort) ) rootCA = rootComp rootdir = maindir + "/" + rootCA + "/Root" log_debug("RootCA directory of the computation is " + rootdir) # generate config file to give info about Setup instructions mk_config_file(rootCA, computationArray, rootdir) try: log_info( "Setup is started - " + rootdir + "." + " Please wait... Setup can take several mins (estimated time 10-15 mins). " + "It can take even longer if there are parallel operations!" ) setup_start_time = time.perf_counter() settingupComputation(rootComp, rootdir) log_info("Setup is completed, files are saved in " + rootdir + "/Data") setup_finish_time = time.perf_counter() setupTime = round(setup_finish_time - setup_start_time, 2) except: log_exception("Setup is NOT completed! - " + rootdir) # generate MPC config file for MPC configuration mk_MPC_config_file(scaleVector, envFlowList, rootdir) try: log_info( "Compilation of MPC file is started. - " + rootdir + " Please wait...It takes time to compile...(estimated time 5-10 mins)." ) mpc_start_time = time.perf_counter() createMPCFile(rootdir) log_info( "MPC file is compiled and ready to be used - " + rootdir + "/Programs/SuppChainAgg" ) mpc_finish_time = time.perf_counter() mpcTime = round(mpc_finish_time - mpc_start_time, 2) except: log_exception("MPC file is NOT compiled! - " + rootdir) with open(maindir + "/single_setup_runtime_info.json", "r") as readsetupfile: setupfeeds = json.load(readsetupfile) with open(maindir + "/single_test_scenario_runtime_info.json", "r") as readfile: feeds = json.load(readfile) dt_string = datetime.today().isoformat() tempjson = { "level_info": level, "companies": str(computationArray), "setup_time(seconds)": setupTime, "MPC_file_compilation_time(seconds)": mpcTime, "The time of computation ": dt_string, } feeds["one_level_computations"].append(tempjson) setupfeeds["one_level_computations"].append(tempjson) with open(maindir + "/single_test_scenario_runtime_info.json", "w") as outfile1: json.dump(feeds, outfile1) with open(maindir + "/single_setup_runtime_info.json", "w") as outsetupfile: json.dump(feeds, outsetupfile) def setup_and_MPC_compiling_parallel( level, subtree, rootComp, envFlowList, currentPort, maindir ): scaleVector = [] log_debug("RootComp " + rootComp + " and Subtree " + str(subtree)) # parties involved in a single computation computationArray = [] # root company is also a party involved in that computation, so we give scale vector value for it # we gave 1 as considering unit value! computationArray.append(rootComp) scaleVector.append(1) rootCompNumber = int(rootComp[1:]) level += 1 serverPort = currentPort + rootCompNumber * 100 # If it has suppliers if type(subtree) == list: with concurrent.futures.ProcessPoolExecutor() as executor: # It is dict # serverPort = currentPort rootCompPort = serverPort for supplier in subtree: # means it has sub levels if type(supplier) == dict: for i in supplier: computationArray.append(i) scaleVector.append(random.randrange(5, 10)) subtreeLen = len(supplier[i]) rootCompPort = rootCompPort + (2 + subtreeLen) log_debug( "Level " + str(level) + ", Supplier " + str(supplier) + ", Subtree length " + str(subtreeLen) + ", Root Comp Number " + str(rootCompNumber) + ", Server port: " + str(rootCompPort) ) log_debug( "The time(recursive) before executor is " + str(time.perf_counter()) ) # run the same level computations parallelly p = executor.submit( setup_and_MPC_compiling_parallel, level, supplier[i], i, envFlowList, rootCompPort, maindir, ) log_debug( "The time(recursive) after executor is " + str(time.perf_counter()) ) # means it is in the last level (it is leaf) else: computationArray.append(supplier) scaleVector.append(random.randrange(5, 10)) level -= 1 log_debug( "The time(running setup) before executor is " + str(time.perf_counter()) ) runparallel = executor.submit( run_each_compilation_parallel, computationArray, scaleVector, serverPort, rootComp, maindir, envFlowList, level, ) log_debug( "The time(running setup) after executor is " + str(time.perf_counter()) ) def run_one_level_computations_recursively( existing_status_of_test_scenario, level, subtree, rootComp, envFlowList, currentPort, maindir, ): log_debug("RootComp " + rootComp + " and Subtree " + str(subtree)) # parties involved in a single computation computationArray = [] # root company is also a party involved in that computation, so we give scale vector value for it # we gave 1 as considering unit value! computationArray.append(rootComp) rootCompNumber = int(rootComp[1:]) level += 1 serverPort = currentPort + rootCompNumber * 100 # If it has suppliers if type(subtree) == list: with concurrent.futures.ProcessPoolExecutor() as executor: # It is dict # serverPort = currentPort rootCompPort = serverPort for supplier in subtree: # means it has sub levels if type(supplier) == dict: for i in supplier: computationArray.append(i) subtreeLen = len(supplier[i]) rootCompPort = rootCompPort + (2 + subtreeLen) log_debug( "Level " + str(level) + ", Supplier " + str(supplier) + ", Subtree length " + str(subtreeLen) + ", Root Comp Number " + str(rootCompNumber) + ", Server port: " + str(rootCompPort) ) # run the same level computations parallelly p = executor.submit( run_one_level_computations_recursively, existing_status_of_test_scenario, level, supplier[i], i, envFlowList, rootCompPort, maindir, ) # means it is in the last level (it is leaf) else: computationArray.append(supplier) level -= 1 ###### There should be threading and after all threads(downstreams) finished # we can continue with other steps like if len(computationArray) > 2: log_info( "One level computation will happen between " + str(computationArray) + " \n And server port is: " + str(serverPort) ) rootCA = rootComp rootdir = maindir + "/" + rootCA + "/Root" log_debug("RootCA directory of the computation is " + rootdir) # get the env flows' value list of companies for the current computation envFlowValueList = get_list_of_comp_env_flows_value( maindir, computationArray ) try: # also give the port number for server ------- currentPort shell_cmd3 = ( "python3 " + rootdir + "/server.py -c RunningServer -p " + str(serverPort) + " > " + rootdir + "/server.log 2>&1 && echo '$!' > ser.pid" ) child3 = pexpect.spawn( "/bin/bash", ["-c", shell_cmd3], echo=False, timeout=None ) log_info("Server(" + str(serverPort) + ") is running... - " + rootdir) with open(rootdir + "/ser.pid", "w") as server: server.write("$!") time.sleep(2) except: log_exception("Server is NOT running! - " + rootdir) if existing_status_of_test_scenario == False: try: # This part should be threading too # here also we need to give port numbers for suppliers ------- currentPort # calculating running time for one level computation player_start_time = time.perf_counter() with concurrent.futures.ProcessPoolExecutor() as executor: # each player needs to give the same port number (we give the started port number) tempPort = serverPort + 1 log_info( "One level MPC Computation is started! \n Main company of the computation is " + rootCA ) for i in range(len(computationArray)): supplier = computationArray[i] # print("Supplier " + supplier) #delete supplier_dir = supplier + "_" + rootCA CERT = supplier + "_" + rootCA + ".crt" log_info( supplier + " player join the computation with " + CERT + " certificate" ) log_debug("Root Directory for this computation " + rootdir) p = executor.submit( multi_process_computation, supplier, CERT, tempPort, maindir, supplier_dir, ) log_info("MPC Computation is completed! - " + rootCA) player_finish_time = time.perf_counter() onelevelrunningTime = round( player_finish_time - player_start_time, 2 ) except: log_exception( "Multiprocessing have problems! PlayerBinary.x is NOT completed!" ) else: try: player_start_time = time.perf_counter() with concurrent.futures.ProcessPoolExecutor() as executor: # each player needs to give the same port number (we give the started port number) tempPort = serverPort + 1 log_info( "One level MPC Computation is started! \n Main company of the computation is " + rootCA ) for i in range(len(computationArray)): supplier = computationArray[i] supplier_dir = supplier + "_" + rootCA CERT = supplier + "_" + rootCA + ".crt" log_info( supplier + " player join the computation with " + CERT + " certificate" ) log_debug("Root Directory for this computation " + rootdir) p = executor.submit( existing_scenario_computation, supplier, CERT, serverPort, tempPort, maindir, supplier_dir, ) log_info("MPC Computation is completed! - " + rootCA) player_finish_time = time.perf_counter() onelevelrunningTime = round( player_finish_time - player_start_time, 2 ) except: log_exception( "Multiprocessing have problems! PlayerBinary.x is NOT completed!" ) with open( maindir + "/single_test_scenario_runtime_info.json", "r" ) as readfile: feeds = json.load(readfile) dt_string = datetime.today().isoformat() feeds["one_level_computations"].append( { "level_info": level, "companies": str(computationArray), "running_time_of_computation(seconds)": onelevelrunningTime, "The time of computation ": dt_string, } ) with open( maindir + "/single_test_scenario_runtime_info.json", "w" ) as outfile1: json.dump(feeds, outfile1) update_validaiton_file( maindir, level, computationArray, envFlowValueList, rootCA, serverPort, tempPort, ) elif len(computationArray) == 1: log_debug("It is a just supplier company, no sub-suppliers " + str(subtree)) else: log_error( "You are trying to run a scenario that we do NOT support! " + str(subtree) ) sys.exit(2) log_info("Supply Chain Computation is COMPLETED!") def argument_func(*args): main_parser = argparse.ArgumentParser( formatter_class=argparse.RawTextHelpFormatter, description="Server Service module.", ) main_parser.add_argument( "-ts", "--treeStructure", type=int, default="0", help="give tree structure to the system \n" + "0 - if you would like to have balanced tree \n" + "1 - if you would like to have right filled tree", ) main_conf = main_parser.parse_args(args) return main_conf if __name__ == "__main__": logger = logging.getLogger() ch = logging.StreamHandler(sys.stderr) ch.setFormatter( logging.Formatter(const_log_format("generate_and_run_test_scenario.py")) ) logger.addHandler(ch) if const_verbose(): logger.setLevel(logging.DEBUG) else: logger.setLevel(logging.INFO) treeSt = argument_func(*sys.argv[1:]) if (treeSt.treeStructure != 1) and (treeSt.treeStructure != 0): log_error( "There is no identified structure with this argument parameter: " + str(treeSt.treeStructure) ) sys.exit(2) # starting port currentPort = 4999 # main_dir = os.path.realpath(__file__) main_dir = os.getcwd() if os.path.isfile("test_scenarios_runtime_info.json"): with open("test_scenarios_runtime_info.json", "r") as readfile: allTestScenarios = json.load(readfile) else: allTestScenarios = {"TestScenarios": []} # Environmental Flow List wanted in computations environmentalFlowList = ["cd", "sd"] # Test scenarios - how many parties involved and how many supplier parties in each layer # test_scenarios_dict={"test1":[3,2],"test2":[7,2],"test2":[5,2],"test7":[7,2],"test9":[7,6]} test_scenarios_dict = {"test_3_2_t2": [3, 2]} # Run each test scenario for i in test_scenarios_dict: # Create directory for test scenario test_sce_path = main_dir + "/" + i companies_num = test_scenarios_dict[i][0] supplier_num = test_scenarios_dict[i][1] companies = give_name_to_companies(companies_num) # if os.path.isdir(test_sce_path): # shutil.rmtree(test_sce_path) # #print("Test scenario folder - "+test_sce_path+ " is deleted") # delete # log_debug("Old test scenario folder - "+test_sce_path+ " is deleted") existing_status_of_test_scenario = False if os.path.isdir(test_sce_path): existing_status_of_test_scenario = True for i in companies: testComPath = test_sce_path + "/" + i generate_random_values_for_envflows(testComPath, environmentalFlowList) else: os.mkdir(test_sce_path) log_debug("Create directory for " + i + " test scenario") # copies companies from companies directory to test directory # we also need env flow list because we are giving this info default copy_companies_to_test_directory( companies, environmentalFlowList, main_dir, test_sce_path ) if os.path.isfile( test_sce_path + "/computations_results_of_test_scenario.json" ): with open( test_sce_path + "/computations_results_of_test_scenario.json", "r" ) as readfile: TestScenarioComputationResult = json.load(readfile) else: TestScenarioComputationResult = {"ComputationResultsofTestScenario": []} computationValidation = { "one_level_computations": [], "whole_test_scenario": {}, } with open( test_sce_path + "/computation_validation_info.json", "w" ) as com_output: json.dump(computationValidation, com_output) # testing SetupBinary Step singleSetupRuntimes = {"one_level_computations": []} with open(test_sce_path + "/single_setup_runtime_info.json", "w") as outfile: json.dump(singleSetupRuntimes, outfile) # Test scenario runtime information singleTestScenarioConfig = { "one_level_computations": [], "whole_test_scenario": {}, } with open( test_sce_path + "/single_test_scenario_runtime_info.json", "w" ) as outfile1: json.dump(singleTestScenarioConfig, outfile1) log_info( "Start to run test scenario " + i + " for " + str(companies_num) + " companies - " + str(supplier_num) + " suppliers model" ) # generate the supply chain model h = mk_supply_chain_structure(companies, supplier_num, treeSt.treeStructure) log_info( "Supply chain structure is generated for (" + str(companies_num) + "-" + str(supplier_num) + "): \n" + str(h) ) root = "" for i in h: root = i # main company log_info("Main company of the supply chain is " + root) start = time.perf_counter() level = 1 # run setup and compiling setups parallely set_and_compile = 0 if existing_status_of_test_scenario == False: set_and_compile = 1 setup_and_MPC_compiling_parallel( level, h[root], root, environmentalFlowList, currentPort, test_sce_path ) log_info("Needs to set up and compile MPC file! ") else: log_info( "The test scenario exists! No need for setup and compilation. MPC Computation is started!" ) run_one_level_computations_recursively( existing_status_of_test_scenario, level, h[root], root, environmentalFlowList, currentPort, test_sce_path, ) finish = time.perf_counter() runningTime = round(finish - start, 2) print("Finished in " + str(runningTime) + " second(s)") with open(test_sce_path + "/computation_validation_info.json", "r") as compfile: compload = json.load(compfile) with open( test_sce_path + "/single_test_scenario_runtime_info.json", "r" ) as readfile: feeds = json.load(readfile) singleTest = {} singleTest["name_of_test _scenario"] = test_sce_path singleTest["supply_chain_structure"] = h singleTest["number_of_parties_in_the_supply_chain"] = companies_num singleTest["number_of_suppliers_in_each_computation"] = supplier_num singleTest["number_of_environmental_flows"] = len(environmentalFlowList) singleTest["name_of_environmental_flows"] = environmentalFlowList singleTest["running_time_of_scenario(seconds)"] = runningTime compload["whole_test_scenario"] = singleTest feeds["whole_test_scenario"] = singleTest with open( test_sce_path + "/computation_validation_info.json", "w" ) as compoutput: json.dump(compload, compoutput) with open( test_sce_path + "/single_test_scenario_runtime_info.json", "w" ) as outfile1: json.dump(feeds, outfile1) compload["setup_and_mpc_compilation"] = set_and_compile compload["observing_running_times"] = feeds TestScenarioComputationResult["ComputationResultsofTestScenario"].append( compload ) feeds["setup_and_mpc_compilation"] = set_and_compile allTestScenarios["TestScenarios"].append(feeds) with open( test_sce_path + "/computations_results_of_test_scenario.json", "w" ) as compout: json.dump(TestScenarioComputationResult, compout) with open("test_scenarios_runtime_info.json", "w") as outfile: json.dump(allTestScenarios, outfile)