run script and robust stderr detection

This commit is contained in:
quackerd 2021-02-01 23:48:14 -05:00
parent f2be62a9be
commit ff4946a699
Signed by: d
GPG Key ID: F73412644EDE357A
10 changed files with 945 additions and 95 deletions

271
.gitignore vendored Normal file
View File

@ -0,0 +1,271 @@
########################################################
########################################################
########################################################
########################################################
########################################################
########################################################
########################################################
################ C STUFF ##########################
########################################################
########################################################
########################################################
########################################################
########################################################
########################################################
########################################################
# Prerequisites
*.d
# Object files
*.o
*.ko
*.obj
*.elf
# Linker output
*.ilk
*.map
*.exp
# Precompiled Headers
*.gch
*.pch
# Libraries
*.lib
*.a
*.la
*.lo
# Shared objects (inc. Windows DLLs)
*.dll
*.so
*.so.*
*.dylib
# Executables
*.exe
*.out
*.app
*.i*86
*.x86_64
*.hex
# Debug files
*.dSYM/
*.su
*.idb
*.pdb
# Kernel Module Compile Results
*.mod*
*.cmd
.tmp_versions/
modules.order
Module.symvers
Mkfile.old
dkms.conf
########################################################
########################################################
########################################################
########################################################
########################################################
########################################################
########################################################
################ PYTHON STUFF ##########################
########################################################
########################################################
########################################################
########################################################
########################################################
########################################################
########################################################
# Byte-compiled / optimized / DLL files
__pycache__/
*.py[cod]
*$py.class
# C extensions
*.so
# Distribution / packaging
.Python
build/
develop-eggs/
dist/
downloads/
eggs/
.eggs/
lib/
lib64/
parts/
sdist/
var/
wheels/
share/python-wheels/
*.egg-info/
.installed.cfg
*.egg
MANIFEST
# PyInstaller
# Usually these files are written by a python script from a template
# before PyInstaller builds the exe, so as to inject date/other infos into it.
*.manifest
*.spec
# Installer logs
pip-log.txt
pip-delete-this-directory.txt
# Unit test / coverage reports
htmlcov/
.tox/
.nox/
.coverage
.coverage.*
.cache
nosetests.xml
coverage.xml
*.cover
*.py,cover
.hypothesis/
.pytest_cache/
cover/
# Translations
*.mo
*.pot
# Django stuff:
*.log
local_settings.py
db.sqlite3
db.sqlite3-journal
# Flask stuff:
instance/
.webassets-cache
# Scrapy stuff:
.scrapy
# Sphinx documentation
docs/_build/
# PyBuilder
.pybuilder/
target/
# Jupyter Notebook
.ipynb_checkpoints
# IPython
profile_default/
ipython_config.py
# pyenv
# For a library or package, you might want to ignore these files since the code is
# intended to run in multiple environments; otherwise, check them in:
# .python-version
# pipenv
# According to pypa/pipenv#598, it is recommended to include Pipfile.lock in version control.
# However, in case of collaboration, if having platform-specific dependencies or dependencies
# having no cross-platform support, pipenv may install dependencies that don't work, or not
# install all needed dependencies.
#Pipfile.lock
# PEP 582; used by e.g. github.com/David-OConnor/pyflow
__pypackages__/
# Celery stuff
celerybeat-schedule
celerybeat.pid
# SageMath parsed files
*.sage.py
# Environments
.env
.venv
env/
venv/
ENV/
env.bak/
venv.bak/
# Spyder project settings
.spyderproject
.spyproject
# Rope project settings
.ropeproject
# mkdocs documentation
/site
# mypy
.mypy_cache/
.dmypy.json
dmypy.json
# Pyre type checker
.pyre/
# pytype static type analyzer
.pytype/
# Cython debug symbols
cython_debug/
########################################################
########################################################
########################################################
########################################################
########################################################
########################################################
########################################################
################ C++ STUFF ##########################
########################################################
########################################################
########################################################
########################################################
########################################################
########################################################
########################################################
# Prerequisites
*.d
# Compiled Object files
*.slo
*.lo
*.o
*.obj
# Precompiled Headers
*.gch
*.pch
# Compiled Dynamic libraries
*.so
*.dylib
*.dll
# Fortran module files
*.mod
*.smod
# Compiled Static libraries
*.lai
*.la
*.a
*.lib
# Executables
*.exe
*.out
*.app

1
.pyenv Normal file
View File

@ -0,0 +1 @@
PYTHONPATH="./scripts/libs"

View File

@ -54,6 +54,8 @@ struct options_t {
struct rte_ether_addr server_mac;
uint64_t cpu_mask{0x2}; // 2nd core
std::vector<struct rte_ether_addr *> slaves;
unsigned long rage_quit_time = (unsigned long)-1;
unsigned long last_sent_ts = 0;
// states
struct rte_mempool * mbuf_pool;
@ -291,6 +293,7 @@ locore_main(void * tif __rte_unused)
read_tx = false;
recv_resp = false;
recv_stat = false;
options.last_sent_ts = get_time_us();
ntr(NTR_DEP_USER1, NTR_LEVEL_DEBUG, "locore_main: sending packet %p with epoch %d\n", (void*)tx_buf, epoch);
const uint16_t nb_tx = rte_eth_tx_burst(options.s_portid, options.s_txqid, &tx_buf, 1);
@ -301,6 +304,13 @@ locore_main(void * tif __rte_unused)
}
}
if (!recv_stat) {
// if we haven't received the stats, get ready to rage quit
if(get_time_us() - options.last_sent_ts > options.rage_quit_time * 1000) {
rte_exit(EXIT_FAILURE, "waiting too long for resp. I QUIT!!\n");
}
}
if (!read_tx) {
struct timespec ts;
if ((ret = rte_eth_timesync_read_tx_timestamp(options.s_portid, &ts)) == 0) {
@ -404,10 +414,12 @@ static void dump_options()
" run time = %d\n" \
" warmup time = %d\n" \
" output file = %s\n" \
" server MAC = %x:%x:%x:%x:%x:%x\n",
" rage quit time = %ld\n"\
" host MAC = %x:%x:%x:%x:%x:%x\n",
options.run_time,
options.warmup_time,
options.output,
options.rage_quit_time,
options.server_mac.addr_bytes[0],
options.server_mac.addr_bytes[1],
options.server_mac.addr_bytes[2],
@ -428,7 +440,9 @@ static void usage()
" -h: display the information\n" \
" -o: output filename\n" \
" -A: affinity mask\n" \
" -i: inter-arrival time distribution\n\n");
" -i: inter-arrival time distribution\n" \
" -r: rage quit time (in ms)\n");
fflush(stdout);
}
int main(int argc, char* argv[])
@ -458,7 +472,7 @@ int main(int argc, char* argv[])
{
int c;
// parse arguments
while((c = getopt(argc, argv, "vs:S:t:T:ho:A:i:")) != -1) {
while((c = getopt(argc, argv, "vs:S:t:T:ho:A:i:r:")) != -1) {
switch (c) {
struct rte_ether_addr * addr;
case 'v':
@ -484,12 +498,12 @@ int main(int argc, char* argv[])
break;
case 'h':
usage();
rte_exit(EXIT_SUCCESS, "success\n");
rte_exit(EXIT_SUCCESS, "\n");
case 'o':
strncpy(options.output, optarg, sizeof(options.output) - 1);
break;
case 'A':
options.cpu_mask = atoll(optarg);
options.cpu_mask = strtoull(optarg, nullptr, 16);
break;
case 'i':
strncpy(options.ia_gen_str, optarg, sizeof(options.ia_gen_str) - 1);
@ -501,6 +515,9 @@ int main(int argc, char* argv[])
rte_exit(EXIT_FAILURE, "invalid generator string %s\n", options.ia_gen_str);
}
break;
case 'r':
options.rage_quit_time = atoi(optarg);
break;
default:
usage();
rte_exit(EXIT_FAILURE, "unknown argument: %c\n", c);
@ -554,7 +571,11 @@ int main(int argc, char* argv[])
sleep(1);
uint64_t cmask = options.cpu_mask;
const uint16_t core_id = cmask_get_next_cpu(&cmask);
const int16_t core_id = cmask_get_next_cpu(&cmask);
if (core_id == NEXT_CPU_NULL) {
rte_exit(EXIT_FAILURE, "invalid cpu mask 0x%lx\n", cmask);
}
ntr(NTR_DEP_USER1, NTR_LEVEL_INFO, "main: launching thread on core %d\n", core_id);
if (rte_eal_remote_launch(locore_main, nullptr, core_id) != 0) {
rte_exit(EXIT_FAILURE, "failed to launch function on locore\n");
}

View File

@ -421,8 +421,9 @@ static void usage()
fprintf(stdout,
"Usage:\n" \
" -v(vv): verbose mode\n" \
" -h: display the information\n" \
" -m: cpu mask for worker threads");
" -h: seek help\n" \
" -A: cpu mask for worker threads\n");
fflush(stdout);
}
static void dump_options()
@ -462,16 +463,16 @@ int main(int argc, char* argv[])
{
int c;
// parse arguments
while((c = getopt(argc, argv, "hvm:")) != -1) {
while((c = getopt(argc, argv, "hvA:")) != -1) {
switch (c) {
case 'v':
ntr_set_level(NTR_DEP_USER1, ntr_get_level(NTR_DEP_USER1) + 1);
break;
case 'h':
usage();
rte_exit(EXIT_SUCCESS, NULL);
rte_exit(EXIT_SUCCESS, "\n");
break;
case 'm':
case 'A':
options.cpuset = strtoull(optarg, nullptr, 16);
options.num_threads = cmask_get_num_cpus(options.cpuset);
if (options.num_threads == 0) {

View File

@ -80,83 +80,10 @@ struct options_t {
static struct options_t options;
// Return the per-queue thread bookkeeping entry for RX/TX queue `qid`.
// Uses .at(), so an unknown qid throws std::out_of_range rather than
// returning garbage.
static struct thread_info * get_thread_info(int qid)
{
    return options.s_thr_info.at(qid);
}
// DPDK RX callback: for each PROBE_RESP packet whose epoch matches the
// queue's outstanding datapoint, record the software RX time (rdtsc) and
// the NIC hardware RX timestamp.  Other packets pass through untouched.
// Always returns nb_pkts — nothing is dropped here.
static uint16_t
rx_add_timestamp(uint16_t port __rte_unused, uint16_t qidx,
    struct rte_mbuf **pkts, uint16_t nb_pkts, uint16_t max_pkts __rte_unused, void *_ __rte_unused)
{
    uint64_t now = rte_rdtsc();
    struct pkt_hdr * pkt_data;
    struct timespec ts;
    int ret;
    for (int i = 0; i < nb_pkts; i++) {
        pkt_data = check_valid_packet(pkts[i]);
        if (pkt_data == NULL) {
            ntr(NTR_DEP_USER1, NTR_LEVEL_DEBUG, "rx_add_timestamp: ignoring invalid packet 0x%p.\n", (void*)pkts[i]);
            continue;
        }
        if (rte_be_to_cpu_16(pkt_data->type) == PKT_TYPE_PROBE_RESP) {
            struct thread_info * tinfo = get_thread_info(qidx);
            uint32_t epoch = rte_be_to_cpu_32(((struct pkt_payload_epoch *)pkt_data->payload)->epoch);
            if (tinfo->last_datapt != nullptr && tinfo->last_datapt->epoch == epoch) {
                // timesync & 0x3 selects the NIC timestamp register slot for this mbuf
                if ((ret = rte_eth_timesync_read_rx_timestamp(port, &ts, pkts[i]->timesync & 0x3)) == 0) {
                    // has hw rx timestamp
                    tinfo->last_datapt->clt_hw_rx = ts.tv_sec * S2NS + ts.tv_nsec;
                    tinfo->last_datapt->clt_sw_rx = now;
                    ntr(NTR_DEP_USER1, NTR_LEVEL_DEBUG, "rx_add_timestamp: tagged packet %p with sw: %llu hw: %llu.\n", (void*)pkts[i], now, tinfo->last_datapt->clt_hw_rx);
                } else {
                    // hw timestamping is required for the measurement — abort the run
                    rte_exit(EXIT_FAILURE, "rx_add_timestamp: packet %p not tagged - hw ts not available - %d.\n", (void*)pkts[i], ret);
                }
            } else {
                // NOTE(review): when last_datapt == nullptr this log line
                // dereferences it — verify this path cannot be reached with a
                // null datapoint.
                ntr(NTR_DEP_USER1, NTR_LEVEL_WARNING, "rx_add_timestamp: packet %p epoch %d != last epoch %d.\n", (void*)pkts[i], epoch, tinfo->last_datapt->epoch);
            }
        } else {
            ntr(NTR_DEP_USER1, NTR_LEVEL_DEBUG, "rx_add_timestamp: packet %p not tagged - type %d.\n", (void*)pkts[i], rte_be_to_cpu_16(pkt_data->type));
        }
    }
    return nb_pkts;
}
// DPDK TX callback: stamp the software TX time (rdtsc) onto the queue's
// outstanding datapoint for each PROBE packet.  Aborts the run if the
// packet's epoch does not match the datapoint we are waiting on.
static uint16_t
tx_add_timestamp(uint16_t port __rte_unused, uint16_t qidx __rte_unused,
    struct rte_mbuf **pkts, uint16_t nb_pkts, void *_ __rte_unused)
{
    uint64_t now = rte_rdtsc();
    struct pkt_hdr * pkt_data;
    for (int i = 0; i < nb_pkts; i++) {
        pkt_data = check_valid_packet(pkts[i]);
        if (pkt_data == NULL) {
            ntr(NTR_DEP_USER1, NTR_LEVEL_DEBUG, "tx_add_timestamp: ignoring invalid packet 0x%p.\n", (void*)pkts[i]);
            continue;
        }
        if (rte_be_to_cpu_16(pkt_data->type) == PKT_TYPE_PROBE) {
            struct thread_info * tinfo = get_thread_info(qidx);
            uint32_t epoch = rte_be_to_cpu_32(((struct pkt_payload_epoch *)pkt_data->payload)->epoch);
            // NOTE(review): when last_datapt == nullptr the error message below
            // dereferences it — confirm this branch is unreachable with null.
            if (tinfo->last_datapt == nullptr || epoch != tinfo->last_datapt->epoch) {
                rte_exit(EXIT_FAILURE, "tx_add_timestamp: packet epoch %d != last epoch %d\n", epoch, tinfo->last_datapt->epoch);
            }
            tinfo->last_datapt->clt_sw_tx = now;
            ntr(NTR_DEP_USER1, NTR_LEVEL_DEBUG, "tx_add_timestamp: tagged packet %p with sw: %llu.\n", (void*)pkts[i], now);
        } else {
            ntr(NTR_DEP_USER1, NTR_LEVEL_DEBUG, "tx_add_timestamp: packet %p not tagged - type %d.\n", (void*)pkts[i], pkt_data->type);
        }
    }
    return nb_pkts;
}
// static struct thread_info * get_thread_info(int qid)
// {
// return options.s_thr_info.at(qid);
// }
static int
locore_main(void * tif)
@ -400,18 +327,11 @@ port_init(uint16_t portid, struct rte_mempool *mbuf_pool)
if (ret != 0)
return ret;
ret = rte_eth_timesync_enable(portid);
if (ret != 0)
return ret;
/* Enable RX in promiscuous mode for the Ethernet device. */
ret = rte_eth_promiscuous_enable(portid);
if (ret != 0)
return ret;
rte_eth_add_tx_callback(portid, 0, tx_add_timestamp, NULL);
rte_eth_add_rx_callback(portid, 0, rx_add_timestamp, NULL);
return 0;
}

38
scripts/compile.sh Executable file
View File

@ -0,0 +1,38 @@
#!/bin/sh
# Sync the repository to each build server and compile it there in parallel.
#
# Fixes: quote "$user" (the unquoted `[ -z $user ]` test and ssh targets broke
# on empty/space-containing values), drop the useless `$(echo ...)` subshells,
# and remove the unused loop counter.
test_dir="/numam.d"
root=".."
servers="skylake2.rcs.uwaterloo.ca skylake3.rcs.uwaterloo.ca"
rsync_flags="-vchr"
ssh_args="-o StrictHostKeyChecking=no -p77"

# Optional first argument overrides the remote username (default: whoami).
user=$1
if [ -z "$user" ]
then
    user=$(whoami)
fi
echo "USER: $user"

# compile <host>: rsync the tree to <host> and run a cmake build there.
# Kept separate because we might change kernels (reboot) without recompiling.
compile() {
    echo "====================$1===================="
    echo "Syncing directories..."
    # $ssh_args / $rsync_flags are intentionally unquoted: they must
    # word-split into multiple options.
    ssh $ssh_args "$user@$1" "sudo mkdir -p $test_dir"
    ssh $ssh_args "$user@$1" "sudo chmod 777 $test_dir"
    rsync $rsync_flags -e 'ssh -p 77' "$root/" "$user@$1:$test_dir/"
    echo "Compiling..."
    ssh $ssh_args "$user@$1" "mkdir -p $test_dir/build; cd $test_dir/build; cmake ../; make clean all -j8" &
    wait
    echo "$1 Done."
    echo ""
}

for server in $servers
do
    compile "$server" &
done
wait

82
scripts/histo.py Normal file
View File

@ -0,0 +1,82 @@
import pandas as pd
import matplotlib.pyplot as plt
import matplotlib.mlab as mlab
import numpy as np
import sys
import re
import os
import json
import getopt
import math
import concurrent.futures as CF
num_bins = 1000
extra_pct = []
def saveplot(files, data):
    """Histogram *data* and save the figure next to *files* as <files>.png."""
    plt.hist(data, num_bins)
    plt.xlabel("Latency")
    plt.ylabel("Frequency")
    plt.title(os.path.basename(files))
    f = plt.gcf()
    # 11.69 x 8.27 inches == A4 landscape
    f.set_size_inches(11.69, 8.27)
    f.savefig(files + ".png", dpi=160)
    # reset pyplot's implicit current figure so the next call starts clean
    plt.clf()
    print("Generated - " + files + ".png")
def output_extra_percentile(data):
    """Print each percentile of *data* listed in the module-level extra_pct."""
    arr = np.array(data)
    for percentile in extra_pct:
        print(str(percentile) + "th: " + str(np.percentile(arr, percentile)))
def sanitize(data):
    """Drop outliers above the 99th percentile of *data*.

    Returns a new list containing, in order, every element of *data* that is
    <= the 99th percentile; *data* itself is not modified.  (Rewritten from a
    manual append loop to a comprehension.)
    """
    p99 = np.percentile(np.array(data), 99)
    return [v for v in data if v <= p99]
executor = CF.ProcessPoolExecutor(max_workers=int(os.cpu_count()))
def process_file(each_dir):
    """Parse one sample file, print extra percentiles and save its histogram.

    Bug fixed: the original bare ``except: None`` silently swallowed every
    error — including the NameError from ``memparse``, which is never
    imported in this module — so the function did nothing and hid it.
    Failures are now reported.
    """
    try:
        print("Processing " + each_dir + " ...")
        # NOTE(review): `memparse` is not imported here — presumably this
        # should be the parser from libpar; confirm against the callers.
        data = memparse.parse_mut_sample(each_dir)[1]
        output_extra_percentile(data)
        sdata = sanitize(data)
        saveplot(each_dir, sdata)
    except Exception as e:
        print("Failed to process " + each_dir + ": " + str(e))
def process_dir(rootdir):
    """Recursively walk *rootdir* and process every sample file found."""
    for name in os.listdir(rootdir):
        path = os.path.join(rootdir, name)
        if not os.path.isfile(path):
            process_dir(path)
        elif path.endswith(("sample.txt", ".sample")):
            # kept sequential; parallel submission via `executor` was disabled
            process_file(path)
def main():
    """Entry point: histogram every sample file under the -d directory."""
    datdir = None
    for flag, value in getopt.getopt(sys.argv[1:], 'd:')[0]:
        if flag == '-d':
            datdir = value
    if datdir is None:
        # fall back to the hard-coded default rather than failing
        datdir = "/home/oscar/projs/kqsched/scripts/pingpong/results.d/sample"
    process_dir(datdir)
    executor.shutdown()
if __name__ == "__main__":
main()

113
scripts/libs/libpar.py Normal file
View File

@ -0,0 +1,113 @@
import json
import numpy as np
class khat_parser:
    """Parser for khat CSV output: eight comma-separated timestamps per line."""

    class pt:
        """One datapoint: server/client hardware+software tx/rx timestamps."""
        def __init__(self):
            self.s_htx = 0
            self.s_hrx = 0
            self.s_stx = 0
            self.s_srx = 0
            self.c_htx = 0
            self.c_hrx = 0
            self.c_stx = 0
            self.c_srx = 0

    def __init__(self):
        self.datapt = []

    def parse(self, output : str):
        """Append one pt per CSV line of *output*; raise on malformed lines."""
        for line in output.splitlines():
            fields = line.split(',')
            if len(fields) != 8:
                raise Exception("Invalid line:" + line)
            point = self.pt()
            (point.c_srx, point.c_stx, point.c_hrx, point.c_htx,
             point.s_srx, point.s_stx, point.s_hrx, point.s_htx) = map(int, fields)
            self.datapt.append(point)
class mutilate_data:
    """Container/parser for mutilate benchmark output (latency table + QPS).

    Bug fixed: ``build_mut_output`` ended its read-row expression with a stray
    backslash line continuation, which glued the following ``output +=``
    statement onto the previous expression and made the whole module a
    SyntaxError.  ``parse_mut_sample`` now also closes its file via ``with``.
    """

    def __init__(self):
        self.dat = {}   # latency stats keyed by column name ("avg", "99th", ...)
        self.qps = 0    # throughput (queries per second)

    def to_string(self):
        ret = "Throughput: " + str(self.qps) + "\n" + json.dumps(self.dat)
        return ret

    @staticmethod
    def parse_mut_output(output):
        """Parse mutilate's stdout into a mutilate_data.

        Raises Exception when either the latency ("read") row or the
        "Total QPS" line is missing or malformed.
        """
        ret = mutilate_data()
        succ_qps = False
        succ_read = False
        # column names for the current and legacy mutilate table layouts
        table = [None, "avg", "std", "min", "5th", "10th", "50th", "90th", "95th", "99th"]
        table_legacy = [None, "avg", "std", "min", "5th", "10th", "90th", "95th", "99th"]
        for line in output.splitlines():
            if line.find("Total QPS") != -1:
                spl = line.split()
                if len(spl) == 7:
                    ret.qps = float(spl[3])
                    succ_qps = True
                else:
                    break
            elif line.find("read") != -1:
                spl = line.split()
                if len(spl) == 10:
                    for i in range(1, len(spl)):
                        ret.dat[table[i]] = float(spl[i])
                    succ_read = True
                elif len(spl) == 9:
                    for i in range(1, len(spl)):
                        ret.dat[table_legacy[i]] = float(spl[i])
                    succ_read = True
                else:
                    break
        if not (succ_qps and succ_read):
            raise Exception("Failed to parse data")
        return ret

    @staticmethod
    def parse_mut_sample(fn):
        """Read a two-column (qps, latency) sample file *fn*.

        Returns (qps_list, lat_list); raises Exception on malformed lines.
        """
        qps = []
        lat = []
        with open(fn, "r") as f:
            for line in f:
                entry = line.split()
                if len(entry) != 2:
                    raise Exception("Unrecognized line: " + line)
                qps.append(float(entry[0]))
                lat.append(float(entry[1]))
        return qps, lat

    # generate mutilate output format
    @staticmethod
    def build_mut_output(lat_arr, qps_arr):
        """Format latency/QPS arrays back into mutilate's report layout."""
        cols = ['avg', 'std', 'min', '5th', '10th', '50th', '90th', '95th', '99th']
        output = '{0: <10}'.format('#type') + ''.join('{0: >10}'.format(c) for c in cols) + "\n"
        stats = [np.mean(lat_arr), np.std(lat_arr), np.min(lat_arr),
                 np.percentile(lat_arr, 5), np.percentile(lat_arr, 10),
                 np.percentile(lat_arr, 50), np.percentile(lat_arr, 90),
                 np.percentile(lat_arr, 95), np.percentile(lat_arr, 99)]
        # each value is right-aligned in 10 columns and followed by a space,
        # matching the original layout exactly
        output += '{0: <10}'.format('read') + \
                  ''.join('{0: >10}'.format("{:.1f}".format(s)) + ' ' for s in stats) + "\n"
        output += "\n" + "Total QPS = " + "{:.1f}".format(np.mean(qps_arr)) + " (0 / 0s)"
        return output

175
scripts/libs/libtc.py Normal file
View File

@ -0,0 +1,175 @@
import subprocess as sp
import time
import select
import os
import pwd
import sys
import datetime
import random
import re
from threading import Thread
# Handle to the per-run log file; stays None until init() opens it.
tc_logfile = None

def log_print(info):
    """Echo *info* to stdout and, when a log file is open, append it there."""
    print(info)
    if tc_logfile is not None:
        tc_logfile.write(info + "\n")
        tc_logfile.flush()
tc_output_dir=""
tc_cur_test = ""
tc_test_id = 0
def init(odir = "./results.d/"):
    # Create a timestamped output directory under *odir* and open log.txt
    # inside it.  Side effects: sets the module globals tc_output_dir and
    # tc_logfile used by log_print()/get_odir().
    global tc_output_dir
    tc_output_dir = odir + "_" + datetime.datetime.now().strftime('%Y-%m-%d_%H-%M-%S')
    tc_output_dir = os.path.expanduser(tc_output_dir)
    # NOTE(review): shells out instead of os.makedirs, and the path is not
    # quoted — breaks on paths containing spaces.
    os.system("mkdir -p " + tc_output_dir)
    global tc_logfile
    tc_logfile = open(tc_output_dir + "/log.txt", "w+")
def begin(name):
    """Start test *name*: bump the test counter, create its output dir, log it."""
    global tc_test_id, tc_cur_test
    tc_test_id += 1
    tc_cur_test = name
    os.system("mkdir -p " + get_odir())
    log_print("\n===== Test #{} - {} started =====".format(tc_test_id, tc_cur_test))
def end():
    """Log completion of the current test and clear the current-test name."""
    global tc_cur_test
    log_print("\n===== Test #{} - {} completed =====".format(tc_test_id, tc_cur_test))
    tc_cur_test = None
def get_odir():
    """Return the output directory for the currently running test."""
    return "/".join((tc_output_dir, tc_cur_test))
SCHED_QUEUE = 1
SCHED_CPU = 2
SCHED_BEST = 4
SCHED_FEAT_WS = 1
def make_sched_flag(sched, args, feat = 0, fargs = 0):
    """Pack scheduler id, scheduler arg, feature bits and feature arg into a
    single 32-bit flag word — one byte per field, low byte first."""
    flag = sched & 0xFF
    flag |= (args & 0xFF) << 8
    flag |= (feat & 0xFF) << 16
    flag |= (fargs & 0xFF) << 24
    return flag
TUNE_RTSHARE = 2
TUNE_TFREQ = 1
def make_tune_flag(obj, val):
    """Pack a tunable id and its 16-bit value into one 32-bit flag word."""
    return ((val & 0xFFFF) << 16) | (obj & 0xFFFF)
def get_username():
    """Return the login name of the user running this process."""
    return pwd.getpwuid(os.getuid()).pw_name
# Extra options passed verbatim to every ssh invocation by remote_exec().
ssh_param = ""

def set_ssh_param(para):
    """Set the extra ssh option string used by remote_exec."""
    global ssh_param
    ssh_param = para
# Remote username for remote_exec(); None means "current user".
ssh_user = None

def set_ssh_user(user):
    """Set the remote username used by remote_exec."""
    global ssh_user
    ssh_user = user
def remote_exec(srv, cmd, blocking=True, check=True):
    """Run *cmd* over ssh on every host in *srv*.

    Returns the list of Popen handles.  When *blocking* is True each process
    is waited on in order; when *check* is also True, a non-zero exit raises.
    """
    procs = []
    for host in srv:
        prefix = (ssh_user + "@") if ssh_user is not None else ""
        cmdline = "ssh " + ssh_param + " " + prefix + host + " \"" + cmd + "\""
        procs.append(sp.Popen([cmdline], shell=True, stdout=sp.PIPE, stderr=sp.PIPE))
    if blocking:
        for proc in procs:
            proc.wait()
            if check and proc.returncode != 0:
                raise Exception("Command failed " + cmd)
    return procs
def scan_stderr(p, exclude = None):
    """Scan a process's stderr stream for real errors.

    Returns False on the first non-empty line that is not matched by one of
    the *exclude* regexes (logging it), True when every line is benign.
    """
    for raw in p.stderr:
        line = raw.decode().strip()
        if len(line) == 0:
            continue
        excused = False
        if exclude is not None:
            for pattern in exclude:
                if (pattern is not None) and (re.match(pattern, line) is not None):
                    excused = True
                    break
        if not excused:
            log_print("Error detected: " + line)
            return False
    return True
# stderr monitoring threads: shared state for the scanner threads below.
errthr_objs = []        # Thread objects currently attached to processes
errthr_sigstop = False  # set True to ask all scanner threads to exit
errthr_failed = False   # latched True by any thread that saw an error

def errthr_get_failed():
    """Return True once any stderr scanner thread has detected an error."""
    return errthr_failed
def thr_check_stderr(p : sp.Popen, exclude):
    """Thread body: poll *p*'s stderr until errthr_sigstop is set, latching
    errthr_failed when an unexcused error line appears."""
    global errthr_failed
    while not errthr_sigstop:
        if not scan_stderr(p, exclude=exclude):
            errthr_failed = True
        # jittered poll interval so many scanners don't wake in lockstep
        time.sleep(0.5 + random.uniform(-0.1, 0.1))
def errthr_start():
    """Reset the shared flags and start every registered scanner thread."""
    global errthr_sigstop, errthr_failed
    errthr_sigstop = False
    errthr_failed = False
    for thread in errthr_objs:
        thread.start()
def errthr_create(cp, exclude = None):
    """Register one stderr scanner thread per process in *cp*.

    Bug fixed: *cp* may be None — run.py passes the server handle, which is
    None in client-only mode — and iterating it raised TypeError.  None is
    now treated as "no processes".
    """
    global errthr_objs
    if cp is None:
        return
    for p in cp:
        errthr_objs.append(Thread(target = thr_check_stderr, args=(p, exclude)))
def errthr_stop():
    """Signal every scanner thread to exit, join them all, and forget them."""
    global errthr_objs, errthr_sigstop
    errthr_sigstop = True
    for thread in errthr_objs:
        thread.join()
    errthr_objs.clear()
def parse_hostfile(fp):
    """Parse a hostfile of "alias real-hostname" lines into a dict.

    Lines with fewer than two space-separated fields are ignored.  Uses a
    context manager so the file is closed even if reading raises (the
    original opened/closed the handle manually).
    """
    ret = {}
    with open(fp, "r") as fh:
        content = [x.strip() for x in fh.readlines()]
    for line in content:
        spl = line.split(" ")
        if len(spl) >= 2:
            ret[spl[0]] = spl[1]
            log_print("Parsed: hostname \"" + spl[0] + "\" -> \"" + spl[1] + "\"")
    return ret
def process_hostnames(names, hosts):
    """Map each name through the *hosts* alias dict; unknown names pass
    through unchanged."""
    return [hosts.get(name, name) for name in names]
def get_cpuset_core(threads):
    """Build a ``cpuset -l`` prefix pinning to cores 0..(2*threads - 1)."""
    upper = threads * 2 - 1
    return "cpuset -l 0-" + str(upper) + " "

228
scripts/run.py Executable file
View File

@ -0,0 +1,228 @@
import subprocess as sp
import time
import select
import os
import datetime
import pwd
import sys
import getopt
import numpy as np
import re
import libpar as par
import libtc as tc
step_inc_pct = 100
init_step = 20000 #
start_step = 10000
term_qps = 85000000000
term_pct = 1
inc_pct = 50
server_port = 23444
# paths
test_dir = "/numam.d/build"
file_dir = os.path.dirname(os.path.realpath(__file__))
root_dir = os.path.join(file_dir,"..")
sample_filename = "sample.txt"
affinity = [
"0x4", # core 2
"0x1000" # core 12
]
master = ["skylake3.rcs.uwaterloo.ca"]
master_mac = ["3c:15:fb:c9:f3:4b"]
server = ["skylake2.rcs.uwaterloo.ca"]
server_mac = ["3c:15:fb:c9:f3:36"]
clients = []
client_mac = []
rage_quit = 1000 #1s
warmup = 2
duration = 3
cooldown = 0
cacheline = 0
SSH_PARAM = "-o StrictHostKeyChecking=no -p77"
SSH_USER = "oscar"
hostfile = None
lockstat = False
client_only = False
def stop_all():
    """Kill every benchmark process on clients, server (unless client-only)
    and master.  Best-effort: exit codes are deliberately ignored."""
    targets = [(clients, "rat", "Stopping clients...")]
    if not client_only:
        targets.append((server, "khat", "Stopping server..."))
    targets.append((master, "cat", "Stopping master..."))
    for hosts, proc, banner in targets:
        tc.log_print(banner)
        tc.remote_exec(hosts, "sudo killall -9 " + proc, check=False)
def get_client_str(clt):
    """Render ``-a host`` command-line arguments for every client in *clt*."""
    return " " + "".join(" -a " + client + " " for client in clt)
def run_exp(sc, ld):
    """Run one experiment: start the server (unless client-only) and the
    master with affinity mask *sc*, monitor both stderr streams, and retry
    the whole sequence until the master exits cleanly.

    *ld* (target load) is currently unused — the master runs with a fixed
    inter-arrival distribution.

    Fixes: removed the dead ``global stderr_failed`` / ``stderr_scan_stop``
    declarations (those names exist nowhere); renamed the local ``sp`` which
    shadowed the ``subprocess as sp`` module alias; skip stderr monitoring of
    the server handle when it is None (client-only mode), which previously
    crashed inside tc.errthr_create.
    """
    while True:
        if client_only:
            ssrv = None
        else:
            # start server
            tc.log_print("Starting server...")
            server_cmd = "sudo " + test_dir + "/khat -- -A " + sc
            tc.log_print(server_cmd)
            ssrv = tc.remote_exec(server, server_cmd, blocking=False)

        time.sleep(1)

        # start master
        tc.log_print("Starting master...")
        master_cmd = "sudo " + test_dir + "/cat -- " + \
                     " -s " + server_mac[0] + \
                     " -o " + test_dir + "/" + sample_filename + \
                     " -t " + str(duration) + \
                     " -T " + str(warmup) + \
                     " -i fixed:0.01" + \
                     " -r " + str(rage_quit) + \
                     " -A 0x4"
        tc.log_print(master_cmd)
        smaster = tc.remote_exec(master, master_cmd, blocking=False)
        p = smaster[0]

        # launch stderr monitoring threads (DPDK EAL startup chatter is benign)
        tc.errthr_create(smaster, exclude=[".*EAL.*"])
        if ssrv is not None:
            # client-only runs have no server handle to monitor
            tc.errthr_create(ssrv, exclude=[".*EAL.*"])
        tc.errthr_start()
        success = False
        cur = 0
        while True:
            # stop on a detected failure, or shortly after the scheduled
            # warmup + measurement window has elapsed
            if tc.errthr_get_failed() or cur >= int(warmup + duration) + 5:
                break
            if p.poll() is not None:
                success = True
                break
            time.sleep(1)
            cur = cur + 1
        stop_all()
        tc.errthr_stop()
        print("Cooling down...")
        time.sleep(cooldown)
        if success:
            return
def keep_results():
    """Copy the sample file from the master into the test output directory
    and log how many requests it contains."""
    local_path = tc.get_odir() + "/sample.txt"
    scpcmd = "scp -P77 oscar@" + master[0] + ":" + test_dir + "/" + sample_filename + " " + local_path
    tc.log_print(scpcmd)
    sp.check_call(scpcmd, shell=True)
    with open(local_path, 'r') as f:
        tc.log_print("Total requests: " + str(len(f.readlines())))
def main():
    # Entry point: parse CLI flags, resolve host aliases via the optional
    # hostfile, then run one max-load experiment per affinity mask and
    # collect results.
    #   -h <file>  hostfile mapping aliases to real hostnames
    #   -s         stop all remote processes and exit
    #   -c         client-only mode (do not manage the server)
    # (-l, -d, -p are accepted by the getopt spec but currently unhandled)
    global hostfile
    global server
    global master
    global clients
    global client_only
    tc.set_ssh_param(SSH_PARAM)
    tc.set_ssh_user(SSH_USER)
    options = getopt.getopt(sys.argv[1:], 'h:sldcp')[0]
    for opt, arg in options:
        if opt in ('-h'):
            hostfile = arg
        elif opt in ('-s'):
            # stop-only mode: kill leftovers everywhere and exit
            stop_all()
            return
        elif opt in ('-c'):
            client_only=True
    tc.init("~/results.d/numam/")
    tc.log_print("Configuration:\n" + \
        "Hostfile: " + ("None" if hostfile == None else hostfile) + "\n" \
        "Client only: " + str(client_only) + "\n")
    if hostfile != None:
        # translate host aliases to real hostnames for all three roles
        hosts = tc.parse_hostfile(hostfile)
        server = tc.process_hostnames(server, hosts)
        clients = tc.process_hostnames(clients, hosts)
        master = tc.process_hostnames(master, hosts)
    stop_all()
    for i in range(0, len(affinity)):
        eaff = affinity[i]
        # step_mul = 100
        # last_load = 0
        # cur_load = start_step
        tc.begin(eaff)
        tc.log_print("============ Affinity: " + str(eaff) + " Load: MAX" + " ============")
        run_exp(eaff, 0)
        keep_results()
        stop_all()
        # (disabled) incremental load sweep from the predecessor harness:
        # while True:
        #     tc.log_print("============ Sched: " + str(ename) + " Flag: " + format(esched, '#04x') + " Load: " + str(cur_load) + " ============")
        #     output, sout, serr = run_exp(esched, cur_load, lockstat)
        #     qps = keep_results(output, sout, serr)
        #     pct = int((qps - last_load) / init_step * 100)
        #     tc.log_print("last_load: " + str(last_load) + " this_load: " + str(qps) + " inc_pct: " + str(pct) + "%")
        #     if cur_load > term_qps:
        #         tc.log_print("qps more than " + str(term_qps) + "%. Done.")
        #         break
        #     if pct <= term_pct:
        #         tc.log_print("inc_pct less than TERM_PCT " + str(term_pct) + "%. Done.")
        #         break
        #     if pct <= inc_pct:
        #         step_mul += step_inc_pct
        #         tc.log_print("inc_pct less than INC_PCT " + str(inc_pct) + "%. Increasing step multiplier to " + str(step_mul) + "%")
        #     last_load = qps
        #     cur_load += int(init_step * step_mul / 100)
        #     tc.log_print("")
        tc.end()
    stop_all()
main()