diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..0cdbabe --- /dev/null +++ b/.gitignore @@ -0,0 +1,271 @@ +######################################################## +######################################################## +######################################################## +######################################################## +######################################################## +######################################################## +######################################################## +################ C STUFF ########################## +######################################################## +######################################################## +######################################################## +######################################################## +######################################################## +######################################################## +######################################################## + +# Prerequisites +*.d + +# Object files +*.o +*.ko +*.obj +*.elf + +# Linker output +*.ilk +*.map +*.exp + +# Precompiled Headers +*.gch +*.pch + +# Libraries +*.lib +*.a +*.la +*.lo + +# Shared objects (inc. 
Windows DLLs) +*.dll +*.so +*.so.* +*.dylib + +# Executables +*.exe +*.out +*.app +*.i*86 +*.x86_64 +*.hex + +# Debug files +*.dSYM/ +*.su +*.idb +*.pdb + +# Kernel Module Compile Results +*.mod* +*.cmd +.tmp_versions/ +modules.order +Module.symvers +Mkfile.old +dkms.conf + + +######################################################## +######################################################## +######################################################## +######################################################## +######################################################## +######################################################## +######################################################## +################ PYTHON STUFF ########################## +######################################################## +######################################################## +######################################################## +######################################################## +######################################################## +######################################################## +######################################################## +# Byte-compiled / optimized / DLL files +__pycache__/ +*.py[cod] +*$py.class + +# C extensions +*.so + +# Distribution / packaging +.Python +build/ +develop-eggs/ +dist/ +downloads/ +eggs/ +.eggs/ +lib/ +lib64/ +parts/ +sdist/ +var/ +wheels/ +share/python-wheels/ +*.egg-info/ +.installed.cfg +*.egg +MANIFEST + +# PyInstaller +# Usually these files are written by a python script from a template +# before PyInstaller builds the exe, so as to inject date/other infos into it. 
+*.manifest +*.spec + +# Installer logs +pip-log.txt +pip-delete-this-directory.txt + +# Unit test / coverage reports +htmlcov/ +.tox/ +.nox/ +.coverage +.coverage.* +.cache +nosetests.xml +coverage.xml +*.cover +*.py,cover +.hypothesis/ +.pytest_cache/ +cover/ + +# Translations +*.mo +*.pot + +# Django stuff: +*.log +local_settings.py +db.sqlite3 +db.sqlite3-journal + +# Flask stuff: +instance/ +.webassets-cache + +# Scrapy stuff: +.scrapy + +# Sphinx documentation +docs/_build/ + +# PyBuilder +.pybuilder/ +target/ + +# Jupyter Notebook +.ipynb_checkpoints + +# IPython +profile_default/ +ipython_config.py + +# pyenv +# For a library or package, you might want to ignore these files since the code is +# intended to run in multiple environments; otherwise, check them in: +# .python-version + +# pipenv +# According to pypa/pipenv#598, it is recommended to include Pipfile.lock in version control. +# However, in case of collaboration, if having platform-specific dependencies or dependencies +# having no cross-platform support, pipenv may install dependencies that don't work, or not +# install all needed dependencies. +#Pipfile.lock + +# PEP 582; used by e.g. 
github.com/David-OConnor/pyflow +__pypackages__/ + +# Celery stuff +celerybeat-schedule +celerybeat.pid + +# SageMath parsed files +*.sage.py + +# Environments +.env +.venv +env/ +venv/ +ENV/ +env.bak/ +venv.bak/ + +# Spyder project settings +.spyderproject +.spyproject + +# Rope project settings +.ropeproject + +# mkdocs documentation +/site + +# mypy +.mypy_cache/ +.dmypy.json +dmypy.json + +# Pyre type checker +.pyre/ + +# pytype static type analyzer +.pytype/ + +# Cython debug symbols +cython_debug/ + +######################################################## +######################################################## +######################################################## +######################################################## +######################################################## +######################################################## +######################################################## +################ C++ STUFF ########################## +######################################################## +######################################################## +######################################################## +######################################################## +######################################################## +######################################################## +######################################################## +# Prerequisites +*.d + +# Compiled Object files +*.slo +*.lo +*.o +*.obj + +# Precompiled Headers +*.gch +*.pch + +# Compiled Dynamic libraries +*.so +*.dylib +*.dll + +# Fortran module files +*.mod +*.smod + +# Compiled Static libraries +*.lai +*.la +*.a +*.lib + +# Executables +*.exe +*.out +*.app \ No newline at end of file diff --git a/.pyenv b/.pyenv new file mode 100644 index 0000000..79a15ec --- /dev/null +++ b/.pyenv @@ -0,0 +1 @@ +PYTHONPATH="./scripts/libs" \ No newline at end of file diff --git a/cat/cat.cc b/cat/cat.cc index eaba959..8ec9269 100644 --- a/cat/cat.cc +++ b/cat/cat.cc @@ -54,6 +54,8 @@ 
struct options_t { struct rte_ether_addr server_mac; uint64_t cpu_mask{0x2}; // 2nd core std::vector slaves; + unsigned long rage_quit_time = (unsigned long)-1; + unsigned long last_sent_ts = 0; // states struct rte_mempool * mbuf_pool; @@ -291,6 +293,7 @@ locore_main(void * tif __rte_unused) read_tx = false; recv_resp = false; recv_stat = false; + options.last_sent_ts = get_time_us(); ntr(NTR_DEP_USER1, NTR_LEVEL_DEBUG, "locore_main: sending packet %p with epoch %d\n", (void*)tx_buf, epoch); const uint16_t nb_tx = rte_eth_tx_burst(options.s_portid, options.s_txqid, &tx_buf, 1); @@ -301,6 +304,13 @@ locore_main(void * tif __rte_unused) } } + if (!recv_stat) { + // if we haven't received the stats get ready to rage quit + if(get_time_us() - options.last_sent_ts > options.rage_quit_time * 1000) { + rte_exit(EXIT_FAILURE, "waiting too long for resp. I QUIT!!\n"); + } + } + if (!read_tx) { struct timespec ts; if ((ret = rte_eth_timesync_read_tx_timestamp(options.s_portid, &ts)) == 0) { @@ -404,10 +414,12 @@ static void dump_options() " run time = %d\n" \ " warmup time = %d\n" \ " output file = %s\n" \ - " server MAC = %x:%x:%x:%x:%x:%x\n", + " rage quit time = %ld\n"\ + " host MAC = %x:%x:%x:%x:%x:%x\n", options.run_time, options.warmup_time, options.output, + options.rage_quit_time, options.server_mac.addr_bytes[0], options.server_mac.addr_bytes[1], options.server_mac.addr_bytes[2], @@ -428,7 +440,9 @@ static void usage() " -h: display the information\n" \ " -o: output filename\n" \ " -A: affinity mask\n" \ - " -i: inter-arrival time distribution\n\n"); + " -i: inter-arrival time distribution\n" \ + " -r: rage quit time (in ms)\n"); + fflush(stdout); } int main(int argc, char* argv[]) @@ -458,7 +472,7 @@ int main(int argc, char* argv[]) { int c; // parse arguments - while((c = getopt(argc, argv, "vs:S:t:T:ho:A:i:")) != -1) { + while((c = getopt(argc, argv, "vs:S:t:T:ho:A:i:r:")) != -1) { switch (c) { struct rte_ether_addr * addr; case 'v': @@ -484,12 +498,12 @@ int 
main(int argc, char* argv[]) break; case 'h': usage(); - rte_exit(EXIT_SUCCESS, "success\n"); + rte_exit(EXIT_SUCCESS, "\n"); case 'o': strncpy(options.output, optarg, sizeof(options.output) - 1); break; case 'A': - options.cpu_mask = atoll(optarg); + options.cpu_mask = strtoull(optarg, nullptr, 16); break; case 'i': strncpy(options.ia_gen_str, optarg, sizeof(options.ia_gen_str) - 1); @@ -501,6 +515,9 @@ int main(int argc, char* argv[]) rte_exit(EXIT_FAILURE, "invalid generator string %s\n", options.ia_gen_str); } break; + case 'r': + options.rage_quit_time = atoi(optarg); + break; default: usage(); rte_exit(EXIT_FAILURE, "unknown argument: %c\n", c); @@ -554,7 +571,11 @@ int main(int argc, char* argv[]) sleep(1); uint64_t cmask = options.cpu_mask; - const uint16_t core_id = cmask_get_next_cpu(&cmask); + const int16_t core_id = cmask_get_next_cpu(&cmask); + if (core_id == NEXT_CPU_NULL) { + rte_exit(EXIT_FAILURE, "invalid cpu mask 0x%lx\n", cmask); + } + ntr(NTR_DEP_USER1, NTR_LEVEL_INFO, "main: launching thread on core %d\n", core_id); if (rte_eal_remote_launch(locore_main, nullptr, core_id) != 0) { rte_exit(EXIT_FAILURE, "failed to launch function on locore\n"); } diff --git a/khat/khat.cc b/khat/khat.cc index 6220638..5994aba 100644 --- a/khat/khat.cc +++ b/khat/khat.cc @@ -421,8 +421,9 @@ static void usage() fprintf(stdout, "Usage:\n" \ " -v(vv): verbose mode\n" \ - " -h: display the information\n" \ - " -m: cpu mask for worker threads"); + " -h: seek help\n" \ + " -A: cpu mask for worker threads\n"); + fflush(stdout); } static void dump_options() @@ -462,16 +463,16 @@ int main(int argc, char* argv[]) { int c; // parse arguments - while((c = getopt(argc, argv, "hvm:")) != -1) { + while((c = getopt(argc, argv, "hvA:")) != -1) { switch (c) { case 'v': ntr_set_level(NTR_DEP_USER1, ntr_get_level(NTR_DEP_USER1) + 1); break; case 'h': usage(); - rte_exit(EXIT_SUCCESS, NULL); + rte_exit(EXIT_SUCCESS, "\n"); break; - case 'm': + case 'A': options.cpuset = 
strtoull(optarg, nullptr, 16); options.num_threads = cmask_get_num_cpus(options.cpuset); if (options.num_threads == 0) { diff --git a/rat/rat.cc b/rat/rat.cc index 97dcac0..f577dd2 100644 --- a/rat/rat.cc +++ b/rat/rat.cc @@ -80,83 +80,10 @@ struct options_t { static struct options_t options; -static struct thread_info * get_thread_info(int qid) -{ - return options.s_thr_info.at(qid); -} - -static uint16_t -rx_add_timestamp(uint16_t port __rte_unused, uint16_t qidx, - struct rte_mbuf **pkts, uint16_t nb_pkts, uint16_t max_pkts __rte_unused, void *_ __rte_unused) -{ - uint64_t now = rte_rdtsc(); - struct pkt_hdr * pkt_data; - struct timespec ts; - int ret; - - for (int i = 0; i < nb_pkts; i++) { - pkt_data = check_valid_packet(pkts[i]); - - if (pkt_data == NULL) { - ntr(NTR_DEP_USER1, NTR_LEVEL_DEBUG, "rx_add_timestamp: ignoring invalid packet 0x%p.\n", (void*)pkts[i]); - continue; - } - - if (rte_be_to_cpu_16(pkt_data->type) == PKT_TYPE_PROBE_RESP) { - struct thread_info * tinfo = get_thread_info(qidx); - uint32_t epoch = rte_be_to_cpu_32(((struct pkt_payload_epoch *)pkt_data->payload)->epoch); - if (tinfo->last_datapt != nullptr && tinfo->last_datapt->epoch == epoch) { - if ((ret = rte_eth_timesync_read_rx_timestamp(port, &ts, pkts[i]->timesync & 0x3)) == 0) { - // has hw rx timestamp - tinfo->last_datapt->clt_hw_rx = ts.tv_sec * S2NS + ts.tv_nsec; - tinfo->last_datapt->clt_sw_rx = now; - ntr(NTR_DEP_USER1, NTR_LEVEL_DEBUG, "rx_add_timestamp: tagged packet %p with sw: %llu hw: %llu.\n", (void*)pkts[i], now, tinfo->last_datapt->clt_hw_rx); - } else { - rte_exit(EXIT_FAILURE, "rx_add_timestamp: packet %p not tagged - hw ts not available - %d.\n", (void*)pkts[i], ret); - } - } else { - ntr(NTR_DEP_USER1, NTR_LEVEL_WARNING, "rx_add_timestamp: packet %p epoch %d != last epoch %d.\n", (void*)pkts[i], epoch, tinfo->last_datapt->epoch); - } - } else { - ntr(NTR_DEP_USER1, NTR_LEVEL_DEBUG, "rx_add_timestamp: packet %p not tagged - type %d.\n", (void*)pkts[i], 
rte_be_to_cpu_16(pkt_data->type)); - } - } - - return nb_pkts; -} - -static uint16_t -tx_add_timestamp(uint16_t port __rte_unused, uint16_t qidx __rte_unused, - struct rte_mbuf **pkts, uint16_t nb_pkts, void *_ __rte_unused) -{ - uint64_t now = rte_rdtsc(); - struct pkt_hdr * pkt_data; - - for (int i = 0; i < nb_pkts; i++) { - pkt_data = check_valid_packet(pkts[i]); - - if (pkt_data == NULL) { - ntr(NTR_DEP_USER1, NTR_LEVEL_DEBUG, "tx_add_timestamp: ignoring invalid packet 0x%p.\n", (void*)pkts[i]); - continue; - } - - if (rte_be_to_cpu_16(pkt_data->type) == PKT_TYPE_PROBE) { - struct thread_info * tinfo = get_thread_info(qidx); - uint32_t epoch = rte_be_to_cpu_32(((struct pkt_payload_epoch *)pkt_data->payload)->epoch); - - if (tinfo->last_datapt == nullptr || epoch != tinfo->last_datapt->epoch) { - rte_exit(EXIT_FAILURE, "tx_add_timestamp: packet epoch %d != last epoch %d\n", epoch, tinfo->last_datapt->epoch); - } - - tinfo->last_datapt->clt_sw_tx = now; - ntr(NTR_DEP_USER1, NTR_LEVEL_DEBUG, "tx_add_timestamp: tagged packet %p with sw: %llu.\n", (void*)pkts[i], now); - } else { - ntr(NTR_DEP_USER1, NTR_LEVEL_DEBUG, "tx_add_timestamp: packet %p not tagged - type %d.\n", (void*)pkts[i], pkt_data->type); - } - } - - return nb_pkts; -} +// static struct thread_info * get_thread_info(int qid) +// { +// return options.s_thr_info.at(qid); +// } static int locore_main(void * tif) @@ -400,18 +327,11 @@ port_init(uint16_t portid, struct rte_mempool *mbuf_pool) if (ret != 0) return ret; - ret = rte_eth_timesync_enable(portid); - if (ret != 0) - return ret; - /* Enable RX in promiscuous mode for the Ethernet device. 
*/ ret = rte_eth_promiscuous_enable(portid); if (ret != 0) return ret; - rte_eth_add_tx_callback(portid, 0, tx_add_timestamp, NULL); - rte_eth_add_rx_callback(portid, 0, rx_add_timestamp, NULL); - return 0; } diff --git a/scripts/compile.sh b/scripts/compile.sh new file mode 100755 index 0000000..c134eea --- /dev/null +++ b/scripts/compile.sh @@ -0,0 +1,38 @@ +#!/bin/sh +test_dir="/numam.d" +root=".." +servers="skylake2.rcs.uwaterloo.ca skylake3.rcs.uwaterloo.ca" +rsync_flags="-vchr" +ssh_args="-o StrictHostKeyChecking=no -p77" + +user=$1 + +if [ -z $user ] +then + user=$(whoami) +fi + +echo "USER: $user" + +compile() { + # separate these functions because we might change kernel (reboot) without needing to recompile + echo "====================$1====================" + echo "Syncing directories..." + ssh $(echo $ssh_args $user@$1) "sudo mkdir -p $test_dir" + ssh $(echo $ssh_args $user@$1) "sudo chmod 777 $test_dir" + rsync $(echo $rsync_flags) -e 'ssh -p 77' $root/ $user@$1:$test_dir/ + echo "Compiling..." + ssh $(echo $ssh_args $user@$1) "mkdir -p $test_dir/build; cd $test_dir/build; cmake ../; make clean all -j8" & + wait + echo "$1 Done." 
+    echo ""
+}
+
+i=0
+for server in $servers
+do
+    i=$(expr $i + 1)
+    compile "$server" &
+done
+
+wait
\ No newline at end of file
diff --git a/scripts/histo.py b/scripts/histo.py
new file mode 100644
index 0000000..b1fdd59
--- /dev/null
+++ b/scripts/histo.py
@@ -0,0 +1,85 @@
+"""Plot a latency histogram for every sample file found under a directory."""
+import pandas as pd
+import matplotlib.pyplot as plt
+import matplotlib.mlab as mlab
+import numpy as np
+import sys
+import re
+import os
+import json
+import getopt
+import math
+import concurrent.futures as CF
+
+import libpar as par
+
+num_bins = 1000
+extra_pct = []
+
+def saveplot(files, data):
+    """Write a `num_bins`-bin histogram of `data` to `<files>.png`."""
+    plt.hist(data, num_bins)
+    plt.xlabel("Latency")
+    plt.ylabel("Frequency")
+    plt.title(os.path.basename(files))
+    f = plt.gcf()
+    f.set_size_inches(11.69, 8.27)
+    f.savefig(files + ".png", dpi=160)
+    plt.clf()
+    print("Generated - " + files + ".png")
+
+def output_extra_percentile(data):
+    """Print every percentile listed in `extra_pct` for `data`."""
+    a = np.array(data)
+    for pct in extra_pct:
+        p = np.percentile(a, pct)
+        print(str(pct) + "th: " + str(p))
+
+def sanitize(data):
+    """Return the values of `data` that are at or below its 99th percentile."""
+    p99 = np.percentile(np.array(data), 99)
+    return [i for i in data if i <= p99]
+
+executor = CF.ProcessPoolExecutor(max_workers=int(os.cpu_count()))
+
+def process_file(each_dir):
+    """Parse one sample file and plot its latency column; errors are reported."""
+    try:
+        print("Processing " + each_dir + " ...")
+        data = par.mutilate_data.parse_mut_sample(each_dir)[1]
+        output_extra_percentile(data)
+        sdata = sanitize(data)
+        saveplot(each_dir, sdata)
+    except Exception as e:
+        # Best effort: say what went wrong and carry on with the other files.
+        print("Failed to process " + each_dir + ": " + str(e))
+
+def process_dir(rootdir):
+    """Recursively process every `*sample.txt` / `*.sample` file in `rootdir`."""
+    for subdir in os.listdir(rootdir):
+        each_dir = os.path.join(rootdir, subdir)
+        if os.path.isfile(each_dir):
+            if each_dir.endswith(("sample.txt", ".sample")):
+                #executor.submit(process_file, each_dir)
+                process_file(each_dir)
+        else:
+            process_dir(each_dir)
+
+def main():
+    """Process the directory given with -d, or the built-in default path."""
+    datdir = None
+    options = getopt.getopt(sys.argv[1:], 'd:')[0]
+
+    for opt, arg in options:
+        if opt == '-d':
+            datdir = arg
+
+    if datdir is None:
+        datdir = "/home/oscar/projs/kqsched/scripts/pingpong/results.d/sample"
+        #raise Exception("Must specify -d parameter")
+
+    process_dir(datdir)
+    executor.shutdown()
+
+if __name__ == "__main__":
+    main()
\ No newline at end of file
diff --git a/scripts/libs/libpar.py b/scripts/libs/libpar.py
new file mode 100644
index 0000000..169f5d2
--- /dev/null
+++ b/scripts/libs/libpar.py
@@ -0,0 +1,114 @@
+"""Parsers for khat timestamp rows and mutilate-style latency reports."""
+import json
+import numpy as np
+
+class khat_parser:
+    """Accumulates one `pt` record per comma-separated row in `datapt`."""
+
+    class pt:
+        """One parsed row: eight integer fields, all initialised to 0."""
+        def __init__(self):
+            self.s_htx = 0
+            self.s_hrx = 0
+            self.s_stx = 0
+            self.s_srx = 0
+            self.c_htx = 0
+            self.c_hrx = 0
+            self.c_stx = 0
+            self.c_srx = 0
+
+    def __init__(self):
+        self.datapt = []
+
+    def parse(self, output : str):
+        """Append one `pt` per line of `output`; raise on a line without 8 cells."""
+        for line in output.splitlines():
+            cells = line.split(',')
+            if len(cells) != 8:
+                raise Exception("Invalid line:" + line)
+            pt = self.pt()
+            pt.c_srx = int(cells[0])
+            pt.c_stx = int(cells[1])
+            pt.c_hrx = int(cells[2])
+            pt.c_htx = int(cells[3])
+            pt.s_srx = int(cells[4])
+            pt.s_stx = int(cells[5])
+            pt.s_hrx = int(cells[6])
+            pt.s_htx = int(cells[7])
+            self.datapt.append(pt)
+
+
+class mutilate_data:
+    """Latency statistics (`dat`) and throughput (`qps`) of one report."""
+
+    def __init__(self):
+        self.dat = {}
+        self.qps = 0
+
+    def to_string(self):
+        """Return the throughput line followed by the JSON-encoded statistics."""
+        return "Throughput: " + str(self.qps) + "\n" + json.dumps(self.dat)
+
+    @staticmethod
+    def parse_mut_output(output):
+        """Parse a text report; raise unless both QPS and read stats are found."""
+        ret = mutilate_data()
+        succ_qps = False
+        succ_read = False
+        table = [None, "avg", "std", "min", "5th", "10th", "50th", "90th", "95th", "99th"]
+        table_legacy = [None, "avg", "std", "min", "5th", "10th", "90th", "95th", "99th"]
+        for line in output.splitlines():
+            if "Total QPS" in line:
+                spl = line.split()
+                if len(spl) == 7:
+                    ret.qps = float(spl[3])
+                    succ_qps = True
+                else:
+                    break
+            elif "read" in line:
+                spl = line.split()
+                if len(spl) == 10:
+                    for i in range(1, len(spl)):
+                        ret.dat[table[i]] = float(spl[i])
+                    succ_read = True
+                elif len(spl) == 9:
+                    for i in range(1, len(spl)):
+                        ret.dat[table_legacy[i]] = float(spl[i])
+                    succ_read = True
+                else:
+                    break
+
+        if not (succ_qps and succ_read):
+            raise Exception("Failed to parse data")
+
+        return ret
+
+    @staticmethod
+    def parse_mut_sample(fn):
+        """Read a two-column file; return (first column, second column) as floats."""
+        qps = []
+        lat = []
+        with open(fn, "r") as f:
+            for line in f:
+                entry = line.split()
+                if len(entry) != 2:
+                    raise Exception("Unrecognized line: " + line)
+                qps.append(float(entry[0]))
+                lat.append(float(entry[1]))
+        return qps, lat
+
+    # generate mutilate output format
+    @staticmethod
+    def build_mut_output(lat_arr, qps_arr):
+        """Render latency and QPS arrays as a report parse_mut_output accepts."""
+        labels = ['avg', 'std', 'min', '5th', '10th', '50th', '90th', '95th', '99th']
+        stats = [np.mean(lat_arr), np.std(lat_arr), np.min(lat_arr)]
+        stats += [np.percentile(lat_arr, p) for p in (5, 10, 50, 90, 95, 99)]
+
+        output = '{0: <10}'.format('#type')
+        output += ''.join('{0: >10}'.format(label) for label in labels) + "\n"
+        output += '{0: <10}'.format('read')
+        # Every value is right-aligned to 10 columns and followed by one space.
+        output += ''.join('{0: >10}'.format("{:.1f}".format(v)) + ' ' for v in stats) + "\n"
+        output += "\n" + "Total QPS = " + "{:.1f}".format(np.mean(qps_arr)) + " (0 / 0s)"
+        return output
\ No newline at end of file
diff --git a/scripts/libs/libtc.py b/scripts/libs/libtc.py
new file mode 100644
index 0000000..fa630d9
--- /dev/null
+++ b/scripts/libs/libtc.py
@@ -0,0 +1,176 @@
+import subprocess as sp
+import time
+import select
+import os
+import pwd
+import sys
+import datetime
+import random
+import re
+from threading import Thread
+
+tc_logfile = None
+
+def log_print(info):
+    """Print `info` and, if a log file is open, append it there and flush."""
+    print(info)
+    if tc_logfile is not None:
+        tc_logfile.write(info + "\n")
+        tc_logfile.flush()
+
+tc_output_dir=""
+tc_cur_test = ""
+tc_test_id = 0
+
+def init(odir = "./results.d/"):
+    """Create a timestamped output directory and open log.txt inside it."""
+    global tc_output_dir, tc_logfile
+    tc_output_dir = odir + "_" + datetime.datetime.now().strftime('%Y-%m-%d_%H-%M-%S')
+    tc_output_dir = os.path.expanduser(tc_output_dir)
+    os.makedirs(tc_output_dir, exist_ok=True)
+    tc_logfile = open(tc_output_dir + "/log.txt", "w+")
+
+def begin(name):
+    """Start test `name`: bump the test id and create its output directory."""
+    global tc_test_id, tc_cur_test
+    tc_cur_test = name
+    tc_test_id += 1
+    os.makedirs(get_odir(), exist_ok=True)
+    log_print("\n===== Test #" + str(tc_test_id) + " - " + tc_cur_test + " started =====")
+
+def end():
+    """Log completion of the current test and clear its name."""
+    global tc_cur_test
+    log_print("\n===== Test #" + str(tc_test_id) + " - " + tc_cur_test + " completed =====")
+    tc_cur_test = None
+
+def get_odir():
+    """Return the output directory of the current test."""
+    return tc_output_dir + "/" + tc_cur_test
+
+SCHED_QUEUE = 1
+SCHED_CPU = 2
+SCHED_BEST = 4
+SCHED_FEAT_WS = 1
+def make_sched_flag(sched, args, feat = 0, fargs = 0):
+    """Pack four 8-bit fields into one word, `sched` in the lowest byte."""
+    return (sched & 0xFF) | (args & 0xFF) << 8 | (feat & 0xFF) << 16 | (fargs & 0xFF) << 24
+
+TUNE_RTSHARE = 2
+TUNE_TFREQ = 1
+def make_tune_flag(obj, val):
+    """Pack `obj` (low 16 bits) and `val` (high 16 bits) into one word."""
+    return (obj & 0xFFFF) | (val & 0xFFFF) << 16
+
+def get_username():
+    """Return the password-database name of the current user id."""
+    return pwd.getpwuid( os.getuid() )[0]
+
+ssh_param = ""
+def set_ssh_param(para):
+    """Set the option string inserted into every ssh command line."""
+    global ssh_param
+    ssh_param = para
+
+ssh_user = None
+def set_ssh_user(user):
+    """Set the remote user name; with None no `user@` prefix is added."""
+    global ssh_user
+    ssh_user = user
+
+def remote_exec(srv, cmd, blocking=True, check=True):
+    """Run `cmd` over ssh on every host in `srv` and return the Popen objects.
+
+    With `blocking`, wait for each command; if `check` is also set, raise
+    when a command exits with a non-zero status.
+    """
+    sub = []
+    for s in srv:
+        p = sp.Popen("ssh " + ssh_param + " " + ((ssh_user + "@") if ssh_user is not None else "") + s + " \"" + cmd +"\"", shell=True, stdout=sp.PIPE, stderr=sp.PIPE)
+        sub.append(p)
+
+    if blocking:
+        for p in sub:
+            p.wait()
+            if check and p.returncode != 0:
+                raise Exception("Command failed " + cmd)
+
+    return sub
+
+def scan_stderr(p, exclude = None):
+    """Read p.stderr; return False at the first line no `exclude` regex matches."""
+    for err in p.stderr:
+        err = err.decode(errors="replace").strip()
+        if len(err) == 0:
+            continue
+
+        fail = True
+        if exclude is not None:
+            for exc in exclude:
+                if (exc is not None) and (re.match(exc, err) is not None):
+                    fail = False
+                    break
+
+        if fail:
+            log_print("Error detected: " + err)
+            return False
+
+    return True
+
+# stderr threads
+errthr_objs = []
+errthr_sigstop = False
+errthr_failed = False
+
+def errthr_get_failed():
+    """Return True once a monitor thread has seen an unexpected stderr line."""
+    return errthr_failed
+
+def thr_check_stderr(p : sp.Popen, exclude):
+    """Thread body: scan p.stderr, pausing 0.4-0.6 s per pass, until stopped."""
+    global errthr_failed
+    while(not errthr_sigstop):
+        if not scan_stderr(p, exclude=exclude):
+            errthr_failed = True
+        time.sleep(0.5 + random.uniform(-0.1, 0.1))
+
+def errthr_start():
+    """Reset the stop/failed flags and start every created monitor thread."""
+    global errthr_sigstop, errthr_failed
+    errthr_sigstop = False
+    errthr_failed = False
+    for thr in errthr_objs:
+        thr.start()
+
+def errthr_create(cp, exclude = None):
+    """Queue one stderr monitor thread per process in `cp` (None adds none)."""
+    for p in (cp or []):
+        errthr_objs.append(Thread(target = thr_check_stderr, args=(p, exclude)))
+
+def errthr_stop():
+    """Tell every monitor thread to stop, join them and forget them."""
+    global errthr_sigstop
+    errthr_sigstop = True
+    for thr in errthr_objs:
+        thr.join()
+    errthr_objs.clear()
+
+def parse_hostfile(fp):
+    """Parse `name address` lines of file `fp` into a {name: address} dict."""
+    ret = {}
+    with open(fp, "r") as fh:
+        content = [x.strip() for x in fh]
+    for line in content:
+        spl = line.split(" ")
+        if len(spl) >= 2:
+            ret[spl[0]] = spl[1]
+            log_print("Parsed: hostname \"" + spl[0] + "\" -> \"" + spl[1] + "\"")
+    return ret
+
+def process_hostnames(names, hosts):
+    """Map each name through `hosts`, keeping names that have no entry."""
+    return [hosts.get(name, name) for name in names]
+
+def get_cpuset_core(threads):
+    """Return the `cpuset -l 0-N ` command prefix with N = 2 * threads - 1."""
+    return "cpuset -l 0-" + str(threads * 2 - 1) + " "
\ No newline at end of file
diff --git a/scripts/run.py b/scripts/run.py
new file mode 100755
index 0000000..bf32915
--- /dev/null
+++ b/scripts/run.py
@@ -0,0 +1,228 @@
+import subprocess as sp
+import time
+import select
+import os
+import datetime
+import pwd
+import sys
+import getopt
+import numpy as np
+import re
+
+import libpar as par
+import libtc as tc
+
+step_inc_pct = 100
+init_step = 20000
+start_step = 10000
+term_qps = 85000000000
+
+term_pct = 1
+inc_pct = 50
+server_port = 23444
+
+# paths
+test_dir = "/numam.d/build"
+file_dir = os.path.dirname(os.path.realpath(__file__))
+root_dir = os.path.join(file_dir,"..")
+sample_filename = "sample.txt"
+
+affinity = [
+    "0x4", # core 2
+    "0x1000" # core 12
+]
+
+master = ["skylake3.rcs.uwaterloo.ca"]
+master_mac = ["3c:15:fb:c9:f3:4b"]
+
+server = ["skylake2.rcs.uwaterloo.ca"]
+server_mac = ["3c:15:fb:c9:f3:36"]
+
+clients = []
+client_mac = []
+
+rage_quit = 1000 #1s
+warmup = 2
+duration = 3
+cooldown = 0
+cacheline = 0
+SSH_PARAM = "-o StrictHostKeyChecking=no -p77"
+SSH_USER = "oscar"
+
+hostfile = None
+lockstat = False
+client_only = False
+
+def stop_all():
+    """Kill rat on clients, cat on the master and, unless client_only, khat."""
+    # stop clients
+    tc.log_print("Stopping clients...")
+    tc.remote_exec(clients, "sudo killall -9 rat", check=False)
+
+    if not client_only:
+        # stop server
+        tc.log_print("Stopping server...")
+        tc.remote_exec(server, "sudo killall -9 khat", check=False)
+
+    # stop master
+    tc.log_print("Stopping master...")
+    tc.remote_exec(master, "sudo killall -9 cat", check=False)
+
+def get_client_str(clt):
+    """Return the ` -a <client> ` argument string for every client in `clt`."""
+    ret = " "
+    for client in clt:
+        ret += " -a " + client + " "
+    return ret
+
+def run_exp(sc, ld):
+    """Start khat (unless client_only) and cat; retry until cat exits in time.
+
+    `sc` is the CPU mask passed to khat via -A; `ld` is not used here.
+    """
+    while True:
+        if client_only:
+            ssrv = None
+        else:
+            # start server
+            tc.log_print("Starting server...")
+            server_cmd = "sudo " + test_dir + "/khat -- -A " + sc
+            tc.log_print(server_cmd)
+
+            ssrv = tc.remote_exec(server, server_cmd, blocking=False)
+
+        # start clients
+        # tc.log_print("Starting clients...")
+        # client_cmd = tc.get_cpuset_core(client_threads) + " " + test_dir + "/pingpong/build/dismember -A"
+        # tc.log_print(client_cmd)
+        # sclt = tc.remote_exec(ssh_clients, client_cmd, blocking=False)
+
+        time.sleep(1)
+        # start master
+        tc.log_print("Starting master...")
+        master_cmd = "sudo " + test_dir + "/cat -- " + \
+            " -s " + server_mac[0] + \
+            " -o " + test_dir + "/" + sample_filename + \
+            " -t " + str(duration) + \
+            " -T " + str(warmup) + \
+            " -i fixed:0.01" + \
+            " -r " + str(rage_quit) + \
+            " -A 0x4"
+
+        tc.log_print(master_cmd)
+        smaster = tc.remote_exec(master, master_cmd, blocking=False)
+        p = smaster[0]
+
+        # launch stderr monitoring threads (no server process in client-only mode)
+        tc.errthr_create(smaster, exclude=[".*EAL.*"])
+        if ssrv is not None:
+            tc.errthr_create(ssrv, exclude=[".*EAL.*"])
+        tc.errthr_start()
+        success = False
+        cur = 0
+        while True:
+            # either failed or timeout
+            # we use failure detection to save time for long durations
+            if tc.errthr_get_failed() or cur >= int(warmup + duration) + 5 :
+                break
+
+            # NOTE(review): any exit of the master counts as success; status unchecked
+            if p.poll() is not None:
+                success = True
+                break
+
+            time.sleep(1)
+            cur = cur + 1
+
+        stop_all()
+        tc.errthr_stop()
+        print("Cooling down...")
+        time.sleep(cooldown)
+
+        if success:
+            return
+
+def keep_results():
+    """Copy the sample file from the master into the current test directory."""
+    scpcmd = "scp -P77 " + SSH_USER + "@" + master[0] + ":" + test_dir + "/" + sample_filename + " " + tc.get_odir() + "/sample.txt"
+    tc.log_print(scpcmd)
+    sp.check_call(scpcmd, shell=True)
+
+    with open(tc.get_odir() + "/sample.txt", 'r') as f:
+        tc.log_print("Total requests: " + str(len(f.readlines())))
+
+def main():
+    """Parse the command line, then run one experiment per affinity mask."""
+    global hostfile, server, master, clients, client_only
+
+    tc.set_ssh_param(SSH_PARAM)
+    tc.set_ssh_user(SSH_USER)
+
+    options = getopt.getopt(sys.argv[1:], 'h:sldcp')[0]
+    for opt, arg in options:
+        if opt == '-h':
+            hostfile = arg
+        elif opt == '-s':
+            stop_all()
+            return
+        elif opt == '-c':
+            client_only = True
+
+    tc.init("~/results.d/numam/")
+
+    tc.log_print("Configuration:\n" + \
+        "Hostfile: " + ("None" if hostfile is None else hostfile) + "\n" \
+        "Client only: " + str(client_only) + "\n")
+
+    if hostfile is not None:
+        hosts = tc.parse_hostfile(hostfile)
+        server = tc.process_hostnames(server, hosts)
+        clients = tc.process_hostnames(clients, hosts)
+        master = tc.process_hostnames(master, hosts)
+
+    stop_all()
+
+    for eaff in affinity:
+        # step_mul = 100
+        # last_load = 0
+        # cur_load = start_step
+
+        tc.begin(eaff)
+
+        tc.log_print("============ Affinity: " + str(eaff) + " Load: MAX" + " ============")
+        run_exp(eaff, 0)
+        keep_results()
+        stop_all()
+
+        # while True:
+        #     tc.log_print("============ Sched: " + str(ename) + " Flag: " + format(esched, '#04x') + " Load: " + str(cur_load) + " ============")
+
+        #     output, sout, serr = run_exp(esched, cur_load, lockstat)
+
+        #     qps = keep_results(output, sout, serr)
+
+        #     pct = int((qps - last_load) / init_step * 100)
+        #     tc.log_print("last_load: " + str(last_load) + " this_load: " + str(qps) + " inc_pct: " + str(pct) + "%")
+
+        #     if cur_load > term_qps:
+        #         tc.log_print("qps more than " + str(term_qps) + "%. Done.")
+        #         break
+
+        #     if pct <= term_pct:
+        #         tc.log_print("inc_pct less than TERM_PCT " + str(term_pct) + "%. Done.")
+        #         break
+
+        #     if pct <= inc_pct:
+        #         step_mul += step_inc_pct
+        #         tc.log_print("inc_pct less than INC_PCT " + str(inc_pct) + "%. Increasing step multiplier to " + str(step_mul) + "%")
+
+        #     last_load = qps
+        #     cur_load += int(init_step * step_mul / 100)
+        #     tc.log_print("")
+
+        tc.end()
+
+    stop_all()
+
+if __name__ == "__main__":
+    main()
\ No newline at end of file