memloadgen rate control

quackerd 2022-11-11 22:11:50 +01:00
parent 075902ba1d
commit 1836bd89df
5 changed files with 430 additions and 95 deletions
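The rate control added here converts a bytes-per-second target into an inter-arrival time between fixed-size transactions: a worker only issues its next memcpy once the clock reaches next_ts, then advances next_ts by a sample drawn from the inter-arrival generator. A minimal, self-contained sketch of that pacing idea, using std::chrono in place of topo_uptime_ns() and a constant gap in place of the ia_gen distribution (both substitutions are for this sketch only):

#include <chrono>
#include <cstdint>
#include <cstring>
#include <vector>

int main() {
    using clock = std::chrono::steady_clock;
    const size_t buffer_size = 64 * 1024 * 1024;      // new default buffer size
    const size_t transaction_size = 4096;             // new default transaction size
    const uint64_t bytes_per_second = 1ull << 30;     // assumed target: 1 GiB/s
    // constant inter-arrival gap = transaction_size / bytes_per_second seconds
    const auto gap = std::chrono::duration_cast<clock::duration>(
        std::chrono::duration<double>((double)transaction_size / (double)bytes_per_second));

    std::vector<char> src(buffer_size), dst(buffer_size);
    size_t cur_offset = 0, done = 0;
    auto next_ts = clock::now();
    while (done < 100000) {                           // bounded run instead of STATE_RUN
        if (clock::now() < next_ts)
            continue;                                 // too early for the next transaction
        if (cur_offset + transaction_size > buffer_size)
            cur_offset = 0;                           // wrap around the buffer
        std::memcpy(dst.data() + cur_offset, src.data(), transaction_size);
        cur_offset += transaction_size;
        ++done;
        next_ts += gap;                               // schedule the next transaction
    }
    return 0;
}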

View File

@ -298,8 +298,11 @@ Generator *createFacebookIA();
class memload_generator {
public:
struct memload_generator_options {
size_t chunk_size {512 * 1024 * 1024};
size_t transaction_size {4096};
size_t buffer_size {64*1024*1024};
char ia_dist[64]{"fixed"};
int verbose {0};
uint64_t bytes_per_second;
bool shared_buffer {true};
};
@ -310,6 +313,7 @@ class memload_generator {
void *from_buffer;
void *to_buffer;
std::atomic<int> *stop;
Generator * ia_gen;
int tid;
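The new ia_gen member points at the repo's existing Generator interface: the constructor below obtains one via createGenerator(opt->ia_dist), tunes it with set_lambda(), and the worker multiplies generate() by S2NS, so generate() yields the next gap in seconds. A toy stand-in for the "fixed" case, for intuition only (the real class hierarchy lives in gen.hh):

#include <cstdio>

// Illustrative stand-in for the Generator used through ia_gen; not the repo's class.
struct FixedGenerator {
    double mean_s = 0.0;
    void set_lambda(double tps) { mean_s = tps > 0.0 ? 1.0 / tps : 0.0; } // rate -> mean gap (s)
    double generate() const { return mean_s; }                            // "fixed": always the mean
};

int main() {
    FixedGenerator g;
    g.set_lambda(131072.0);                                  // assumed per-thread transactions/s
    printf("inter-arrival = %.2f us\n", g.generate() * 1e6); // ~7.63 us between transactions
    return 0;
}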

View File

@ -28,15 +28,30 @@ memload_generator::worker_thrd(void *_tinfo)
stdout, "memload_generator <thread %ld>: running...\n", tid);
}
uint64_t next_ts = topo_uptime_ns();
size_t cur_offset = 0;
uint64_t cur_ts = 0;
while(true) {
switch (tinfo->state->load()) {
case STATE_RUN:
memcpy((char *)tinfo->from_buffer, (char *)tinfo->to_buffer, tinfo->opts->chunk_size);
tinfo->num_trans.fetch_add(1);
cur_ts = topo_uptime_ns();
if (cur_ts >= next_ts) {
if (cur_offset + tinfo->opts->transaction_size > tinfo->opts->buffer_size) {
cur_offset = 0;
}
memcpy((char *)tinfo->from_buffer + cur_offset, (char *)tinfo->to_buffer, tinfo->opts->transaction_size);
tinfo->num_trans.fetch_add(1);
next_ts += tinfo->ia_gen->generate() * (double)S2NS;
cur_offset += tinfo->opts->transaction_size;
}
break;
case STATE_END:
goto end;
case STATE_RDY:
next_ts = topo_uptime_ns();
break;
case STATE_INIT:
default:
break;
@ -67,16 +82,23 @@ memload_generator::memload_generator(cpuset_t * threads, cpuset_t * target_domai
void * local_buffer;
void * target_buffer;
if (opts.shared_buffer) {
local_buffer = nms_malloc(topo_core_to_numa(nextcore), opt->chunk_size);
target_buffer = nms_malloc(target_domain_id, opt->chunk_size);
local_buffer = nms_malloc(topo_core_to_numa(nextcore), opt->buffer_size);
target_buffer = nms_malloc(target_domain_id, opt->buffer_size);
}
double thread_tps = (double)opt->bytes_per_second / (double)num_cores / (double)opt->transaction_size;
int tid = 0;
while (nextcore != -1) {
auto *info = new struct thread_info;
cpuset_t cpuset;
pthread_attr_t attr;
info->ia_gen = createGenerator(opt->ia_dist);
if (info->ia_gen == nullptr) {
return;
}
info->ia_gen->set_lambda(thread_tps);
info->state = &this->state;
info->init_num = &this->init_num;
info->num_trans.store(0);
@ -85,8 +107,8 @@ memload_generator::memload_generator(cpuset_t * threads, cpuset_t * target_domai
info->from_buffer = local_buffer;
info->to_buffer = target_buffer;
} else {
info->from_buffer = nms_malloc(topo_core_to_numa(nextcore), opt->chunk_size);
info->to_buffer = nms_malloc(target_domain_id, opt->chunk_size);
info->from_buffer = nms_malloc(topo_core_to_numa(nextcore), opt->buffer_size);
info->to_buffer = nms_malloc(target_domain_id, opt->buffer_size);
}
CPU_ZERO(&cpuset);
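The constructor splits the global bytes_per_second target evenly across the worker cores and divides by the transaction size to get the per-thread transactions-per-second value handed to set_lambda() (the thread_tps line above). A worked example with assumed inputs, 2 GiB/s spread over 4 workers at 4 KiB per transaction:

#include <cstdio>
#include <cstdint>

int main() {
    const uint64_t bytes_per_second = 2ull * 1024 * 1024 * 1024; // assumed global target
    const int num_cores = 4;                                     // assumed worker count
    const uint64_t transaction_size = 4096;                      // default transaction size
    // same formula as the constructor: per-thread transactions per second
    double thread_tps = (double)bytes_per_second / (double)num_cores / (double)transaction_size;
    double mean_ia_s = 1.0 / thread_tps;                         // mean gap handed to the generator
    printf("thread_tps = %.0f transactions/s per worker\n", thread_tps); // 131072
    printf("mean inter-arrival = %.2f us\n", mean_ia_s * 1e6);           // ~7.63 us
    return 0;
}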

View File

@ -1,9 +1,10 @@
from http import server
from http import client, server
import subprocess as sp
import time
import os
import datetime
import sys
import json
import getopt
import libpar as par
@ -16,34 +17,30 @@ MLG_PATH = "/numam/build/bin/memloadgen"
EXE_PATH = BIN_PATH + "/src/iperf3"
SSL_CERT = "/certs/server.crt"
SSL_PKEY = "/certs/server.key"
#SERVER = ["skylake2.rcs.uwaterloo.ca"]
#CLIENTS = ["skylake1.rcs.uwaterloo.ca", "skylake3.rcs.uwaterloo.ca", "skylake6.rcs.uwaterloo.ca", "skylake7.rcs.uwaterloo.ca"]
SERVER_PORT_START = 8050
SERVER = ["icelake2-int.rcs.uwaterloo.ca"]
CLIENTS = ["icelake1-int.rcs.uwaterloo.ca", "milan1-int.rcs.uwaterloo.ca", "milan2-int.rcs.uwaterloo.ca"]
SERVER_DAT = ["192.168.100.102"]
CLIENTS_DAT = ["192.168.100.101", "192.168.100.103", "192.168.100.104"]
MEMLOAD_BLKSZ = 1024*1024*512
MEMLOAD_BLKSZ = 1024*1024*64
RUNS=8
# paths
file_dir : str = os.path.dirname(os.path.realpath(__file__))
root_dir : str = os.path.join(file_dir,"..")
class Conf:
def __init__(self, affinity, sendfile, tls, ktls, memloadgen, filepath):
def __init__(self, server, clients, server_dat, clients_dat, clients_affinity, affinity, sendfile, tls, ktls, memloadgen, filepath):
self.affinity = affinity
self.ktls = ktls
self.sendfile = sendfile
self.tls = tls
self.memloadgen = memloadgen
self.filepath = filepath
self.server = server
self.clients = clients
self.server_dat = server_dat
self.clients_dat = clients_dat
self.clients_affinity = clients_affinity
def to_string(self):
return f"affinity.{self.affinity}_sendfile.{self.sendfile}_tls.{self.tls}_ktls.{self.ktls}_memloadgen.{self.memloadgen != None}_filepath.{self.filepath.replace('/','-')}"
return f"server.{self.server[0]}_affinity.{self.affinity}_sendfile.{self.sendfile}_tls.{self.tls}_ktls.{self.ktls}_memloadgen.{self.memloadgen != None}_filepath.{self.filepath.replace('/','-')}"
class ArgTypes:
def __init__(self):
@ -60,15 +57,22 @@ class ArgTypes:
def get_fields(self) -> list[list[any]]:
return self.all
all_args : list[ArgTypes] = []
#SERVER_AFFINITY = ["25,27,29,31,33,35,37,39,41,43,45,47"]
#SERVER_AFFINITY = ["1,3,5,7,9,11,13,15,17,19,21,23"]
#SERVER_AFFINITY = ["1,3,5,7,9,11,13,15,17,19,21,23,25,27,29,31,33,35,37,39,41,43,45,47"]
CLIENTS_AFFINITY = ["1,3,5,7", "1,3,5,7", "1,3,5,7"]
arg_types : ArgTypes = ArgTypes()
#server
arg_types.add_arg([[["icelake2-int.rcs.uwaterloo.ca"]]])
#clients
arg_types.add_arg([[["icelake1-int.rcs.uwaterloo.ca", "milan2-int.rcs.uwaterloo.ca", "milan1-int.rcs.uwaterloo.ca"]]])
#server_dat
arg_types.add_arg([[["192.168.100.102"]]])
#clients_dat
arg_types.add_arg([[["192.168.100.101", "192.168.100.104", "192.168.100.103"]]])
#clients_affinity
arg_types.add_arg([[["1,3,5,7", "1,3,5,7", "1,3,5,7"]]])
# affinity
arg_types.add_arg([["1,3,5,7,9,11,13,15,17,19,21,23"]])
#arg_types.add_arg([["1,3,5,7,9,11,13,15,17,19,21,23,25,27,29,31"]])
arg_types.add_arg([["1,3,5,7,9,11,13,15,17,19,21,23"], ["49,51,53,55,57,59,61,63,65,67,69,71"]])
#arg_types.add_arg([["49,51,53,55,57,59,61,63,65,67,69,71"]])
# sendfile/tls/ktls
arg_types.add_arg([
@ -82,28 +86,105 @@ arg_types.add_arg([
arg_types.add_arg([[["73,75,77,79,81,83,85,87,89,91,93,95_0", "25,27,29,31,33,35,37,39,41,43,45,47_1"]], [None]] )
#arg_types.add_arg([[None]] )
# filepath
arg_types.add_arg([["/mnt/zroot/large_file_#p"], ["/nvdimm/large_file_#p"]])
arg_types.add_arg([["/mnt/zroot/large_file_#p"]])
#arg_types.add_arg([["/mnt/zroot/large_file_#p"], ["/nvdimm/large_file_#p"]])
#arg_types.add_arg([["/tmpfs/large_file_#p"]])
all_args.append(arg_types)
arg_types = ArgTypes()
#server
arg_types.add_arg([[["icelake1-int.rcs.uwaterloo.ca"]]])
#clients
arg_types.add_arg([[["milan1-int.rcs.uwaterloo.ca", "icelake2-int.rcs.uwaterloo.ca", "milan2-int.rcs.uwaterloo.ca"]]])
#server_dat
arg_types.add_arg([[["192.168.100.101"]]])
#clients_dat
arg_types.add_arg([[["192.168.100.103", "192.168.100.102", "192.168.100.104"]]])
#clients_affinity
arg_types.add_arg([[["1,3,5,7", "1,3,5,7", "1,3,5,7"]]])
# affinity
arg_types.add_arg([["1,3,5,7,9,11,13,15,17,19,21,23"]])
# sendfile/tls/ktls
arg_types.add_arg([
[False, False, False],
[True, False, False],
[False, True, False],
[False, True, True],
[True, True, True],
])
# memloadgen
arg_types.add_arg([[["73,75,77,79,81,83,85,87,89,91,93,95_0", "25,27,29,31,33,35,37,39,41,43,45,47_1"]], [None]])
# filepath
arg_types.add_arg([["/mnt/zroot/large_file_#p"]])
all_args.append(arg_types)
arg_types = ArgTypes()
#server
arg_types.add_arg([[["milan1-int.rcs.uwaterloo.ca"]]])
#clients
arg_types.add_arg([[["icelake1-int.rcs.uwaterloo.ca", "icelake2-int.rcs.uwaterloo.ca", "milan2-int.rcs.uwaterloo.ca"]]])
#server_dat
arg_types.add_arg([[["192.168.100.103"]]])
#clients_dat
arg_types.add_arg([[["192.168.100.101", "192.168.100.102", "192.168.100.104"]]])
#clients_affinity
arg_types.add_arg([[["1,3,5,7", "1,3,5,7", "1,3,5,7"]]])
# affinity
arg_types.add_arg([["1,3,5,7,9,11,13,15,17,19,21,23"]])
# sendfile/tls/ktls
arg_types.add_arg([
[False, False, False],
[True, False, False],
[False, True, False],
[False, True, True],
[True, True, True],
])
# memloadgen
arg_types.add_arg([[["97,99,101,103,105,107,109,111,113,115,117,119_0", "33,35,37,39,41,43,45,47,49,51,53,55_1"]], [None]] )
# filepath
arg_types.add_arg([["/mnt/zroot/large_file_#p"]])
all_args.append(arg_types)
arg_types = ArgTypes()
#server
arg_types.add_arg([[["milan2-int.rcs.uwaterloo.ca"]]])
#clients
arg_types.add_arg([[["icelake1-int.rcs.uwaterloo.ca", "icelake2-int.rcs.uwaterloo.ca", "milan1-int.rcs.uwaterloo.ca"]]])
#server_dat
arg_types.add_arg([[["192.168.100.104"]]])
#clients_dat
arg_types.add_arg([[["192.168.100.101", "192.168.100.102", "192.168.100.103"]]])
#clients_affinity
arg_types.add_arg([[["1,3,5,7", "1,3,5,7", "1,3,5,7"]]])
# affinity
arg_types.add_arg([["1,3,5,7,9,11,13,15,17,19,21,23"], ["65,67,69,71,73,75,77,79,81,83,85,87"]])
# sendfile/tls/ktls
arg_types.add_arg([
[False, False, False],
[True, False, False],
[False, True, False],
[False, True, True],
[True, True, True],
])
# memloadgen
arg_types.add_arg([[["97,99,101,103,105,107,109,111,113,115,117,119_0", "33,35,37,39,41,43,45,47,49,51,53,55_1"]], [None]] )
# filepath
arg_types.add_arg([["/mnt/zroot/large_file_#p"]])
all_args.append(arg_types)
def parse_comma_list(input : str):
return input.split(",")
def setup_all():
setup_cmd : str = f''' sudo pkg install -y openssl-devel vim curl wget gmake cmake openssl-devel llvm gcc;
sudo pkg remove -y iperf iperf3;
sudo rm -rf { BIN_PATH };
sudo mkdir -p { BIN_PATH };
sudo git clone https://git.quacker.org/d/iperf3-tls { BIN_PATH };
cd { BIN_PATH };
sudo ./configure;
sudo make -j8 '''
def run_setup_cmd(conf : Conf, cmd : str):
ssrv : list[tuple[str, sp.Popen]] = []
tc.log_print(f"Setting up {SERVER[0]}...")
ssrv.append((SERVER[0], tc.remote_exec(SERVER, setup_cmd, blocking=False, check=False)[0]))
for s in CLIENTS:
tc.log_print(f"Setting up {s}...")
ssrv.append((s, tc.remote_exec([s], setup_cmd, blocking=False, check=False)[0]))
tc.log_print(f"Running command on {conf.server[0]}...")
ssrv.append((conf.server[0], tc.remote_exec(conf.server, cmd, blocking=False, check=False)[0]))
for s in conf.clients:
tc.log_print(f"Running command on {s}...")
ssrv.append((s, tc.remote_exec([s], cmd, blocking=False, check=False)[0]))
for p in ssrv:
_ , stderr = p[1].communicate()
@ -112,32 +193,75 @@ def setup_all():
else:
print(f"\n{ p[0] } succeeded\n")
def setup_all(conf : Conf):
setup_cmd : str = f'''
sudo pkg install -y openssl-devel vim curl wget gmake cmake openssl-devel llvm gcc rsync pkgconf;
sudo pkg remove -y iperf iperf3;
sudo rm -rf { BIN_PATH };
sudo mkdir -p { BIN_PATH };
sudo git clone https://git.quacker.org/d/iperf3-tls { BIN_PATH };
cd { BIN_PATH };
sudo ./configure;
sudo make -j8;
sudo rm -rf /libtopo;
sudo mkdir -p /libtopo;
sudo git clone https://git.quacker.org/d/libtopo /libtopo;
cd /libtopo;
sudo mkdir build;
cd build;
sudo cmake ../;
sudo make install;
sudo rm -rf /numam;
sudo mkdir -p /numam;
sudo chmod 777 /numam;
'''
run_setup_cmd(conf, setup_cmd)
def stop_all():
#rsync
all_clts = []
all_clts.extend(conf.clients)
all_clts.extend(conf.server)
dir = f"{os.path.dirname(__file__)}/../"
for clt in all_clts:
print("Syncing files to " + clt + "...")
rsync_cmd = f"rsync -az --no-perms --rsync-path=\"sudo rsync\" --omit-dir-times -e \"ssh -p77\" {dir} {tc.get_ssh_user()}@{clt}:/numam/"
sp.check_call(rsync_cmd, shell=True)
run_setup_cmd(conf, f'''
cd /numam;
sudo rm -rf build;
sudo mkdir -p build;
cd build;
sudo cmake ../;
sudo make memloadgen;
''')
def stop_all(conf : Conf, clients_only = False):
# stop clients
tc.log_print("Stopping clients...")
tc.remote_exec(CLIENTS, "sudo killall -9 iperf3; sudo killall -9 memloadgen", check=False)
tc.remote_exec(conf.clients, "sudo killall -9 iperf3; sudo killall -9 memloadgen", check=False)
tc.log_print("Stopping server...")
tc.remote_exec(SERVER, "sudo killall -9 iperf3; sudo killall -9 memloadgen", check=False)
if not clients_only:
tc.log_print("Stopping server...")
tc.remote_exec(conf.server, "sudo killall -9 iperf3; sudo killall -9 memloadgen", check=False)
def prepare_logdir():
def prepare_logdir(conf : Conf):
tc.log_print("Preparing server log directory...")
prep_cmd = "sudo rm -rf " + LOG_FILEPATH
tc.log_print(prep_cmd)
tc.remote_exec(SERVER, prep_cmd, check=False)
tc.remote_exec(conf.server, prep_cmd, check=False)
time.sleep(0.1)
prep_cmd = "sudo mkdir -p " + LOG_FILEPATH
tc.log_print(prep_cmd)
tc.remote_exec(SERVER, prep_cmd, check=True)
tc.remote_exec(conf.server, prep_cmd, check=True)
def run_exp(conf : Conf):
stop_all()
stop_all(conf)
while True:
prepare_logdir()
prepare_logdir(conf)
ssrvs=[]
ssrv_names=[]
@ -150,13 +274,14 @@ def run_exp(conf : Conf):
mlg_cmd = "sudo "
mlg_cpu = emem.split("_")[0]
mlg_dom = emem.split("_")[1]
mlg_cmd += f"{MLG_PATH} -b {MEMLOAD_BLKSZ} -i -1 -s {mlg_cpu} -d {mlg_dom}"
mlg_cmd += f"{MLG_PATH} -b {MEMLOAD_BLKSZ} -s {mlg_cpu} -d {mlg_dom} -o {LOG_FILEPATH}/memloadgen_{len(smlg)}"
tc.log_print("Starting memloadgen...")
tc.log_print(mlg_cmd)
smlg.append(tc.remote_exec(SERVER, mlg_cmd, blocking=False)[0])
smlg.append(tc.remote_exec(conf.server, mlg_cmd, blocking=False)[0])
smlg_names.append("memloadgen")
time.sleep(0.1)
time.sleep(10)
for eaff in parse_comma_list(conf.affinity):
server_cmd = "sudo "
@ -171,7 +296,7 @@ def run_exp(conf : Conf):
# start server
tc.log_print("Starting server proc " + str(cur_srv_proc) + "...")
tc.log_print(server_cmd)
ssrv = tc.remote_exec(SERVER, server_cmd, blocking=False)[0]
ssrv = tc.remote_exec(conf.server, server_cmd, blocking=False)[0]
ssrvs.append(ssrv)
ssrv_names.append("Server " + str(cur_srv_proc))
cur_srv_proc = cur_srv_proc + 1
@ -183,10 +308,10 @@ def run_exp(conf : Conf):
sclts = []
sclt_names = []
clt_number = 0
for i in range(len(CLIENTS)):
client_aff = CLIENTS_AFFINITY[i]
for i in range(len(conf.clients)):
client_aff = conf.clients_affinity[i]
for eaff in parse_comma_list(client_aff):
client_cmd = f"sudo {EXE_PATH} -c " + SERVER_DAT[0] + \
client_cmd = f"sudo {EXE_PATH} -c " + conf.server_dat[0] + \
" -p " + str(SERVER_PORT_START + clt_number) + \
" --connect-timeout 1000" + \
" -A " + eaff + \
@ -204,9 +329,9 @@ def run_exp(conf : Conf):
if conf.sendfile:
client_cmd += " -Z"
tc.log_print(CLIENTS[i] + ":\n" + client_cmd)
sclts.append(tc.remote_exec([CLIENTS[i]], client_cmd, blocking=False)[0])
sclt_names.append(CLIENTS[i] + "@" + eaff)
tc.log_print(conf.clients[i] + ":\n" + client_cmd)
sclts.append(tc.remote_exec([conf.clients[i]], client_cmd, blocking=False)[0])
sclt_names.append(conf.clients[i] + "@" + eaff)
clt_number = clt_number + 1
time.sleep(0.1)
# launch stderr monitoring thread
@ -227,6 +352,9 @@ def run_exp(conf : Conf):
if tc.errthr_get_failed():
break
if cur >= 90:
tc.log_print("Experiment timed out. Restarting...")
break
# while selec.poll(1):
# print(p.stdout.readline())
@ -235,26 +363,26 @@ def run_exp(conf : Conf):
if p.poll() == None:
success = False
break
time.sleep(1)
cur = cur + 1
stop_all()
tc.errthr_stop()
stop_all(conf, clients_only=True)
tc.log_print("Cooling down...")
time.sleep(5)
stop_all(conf)
if success:
flush_netresult()
break
if flush_netresult(conf):
break
def flush_netresult():
def flush_netresult(conf : Conf) -> bool:
tc.log_print("Keeping results...")
# copy log directory back to machine
log_output = tc.get_odir()
os.makedirs(log_output, exist_ok=True)
scp_cmd = "scp -P77 -r " + tc.get_ssh_user() + "@" + SERVER[0] + ":" + LOG_FILEPATH + " " + log_output + "/"
scp_cmd = "scp -P77 -r " + tc.get_ssh_user() + "@" + conf.server[0] + ":" + LOG_FILEPATH + " " + log_output + "/"
tc.log_print(scp_cmd)
sp.check_call(scp_cmd, shell=True)
@ -262,29 +390,46 @@ def flush_netresult():
log_output = log_output + "/" + os.path.basename(LOG_FILEPATH)
logs = os.listdir(log_output)
logs_bytes = []
tc.log_print("Processing " + str(len(logs)) + " logs ...")
for log in logs:
if os.path.isfile(log_output + "/" + log):
tc.log_print("Processing " + log + "...")
if os.path.isfile(log_output + "/" + log) and log.endswith(".txt"):
with open(log_output + "/" + log, "r") as f:
logs_bytes.append(f.read())
parser = par.iperf_json_parser(logs_bytes)
tc.log_print("Aggregated throughput: " + "{:.2f}".format(parser.aggregate_egress_bps / 8.0) + " B/s " + \
"{:.2f}".format(parser.aggregate_egress_bps / 8.0 / 1024.0 / 1024.0) + " MB/s " + \
"{:.2f}".format(parser.aggregate_egress_bps / 8.0 / 1024.0 / 1024.0 / 1024.0) + " GB/s")
buf = f.read()
if len(buf) > 0:
logs_bytes.append(buf)
else:
tc.log_print("Warning: log file empty for " + log)
try:
parser = par.iperf_json_parser(logs_bytes)
tc.log_print("Aggregated throughput: " + "{:.2f}".format(parser.aggregate_egress_bps / 8.0) + " B/s " + \
"{:.2f}".format(parser.aggregate_egress_bps / 8.0 / 1024.0 / 1024.0) + " MB/s " + \
"{:.2f}".format(parser.aggregate_egress_bps / 8.0 / 1024.0 / 1024.0 / 1024.0) + " GB/s")
except Exception as e:
tc.log_print("Warning: exception for parsing logs: " + str(e) + " restarting...")
scp_cmd = "sudo rm -rf " + log_output + "/*"
tc.log_print(scp_cmd)
sp.check_call(scp_cmd, shell=True)
return False
return True
def main():
tc.set_ssh_param("-o StrictHostKeyChecking=no -p77")
tc.set_ssh_user("oscar")
output_dirname = "run"
confs : list[Conf] = []
for argtype in all_args:
args = argtype.get_fields()
for arg in args:
confs.append(Conf(*arg))
options = getopt.getopt(sys.argv[1:], 'sS')[0]
for opt, arg in options:
if opt in ('-s'):
stop_all()
stop_all(confs[0])
return
elif opt in ('-S'):
setup_all()
setup_all(confs[0])
return
tc.init("~/results.d/iperf3/" + output_dirname + "_" + datetime.datetime.now().strftime('%Y%m%d%H%M%S'))
@ -292,19 +437,13 @@ def main():
tc.log_print(cpcmd)
sp.check_call(cpcmd, shell=True)
args = arg_types.get_fields()
confs : list[Conf] = []
for arg in args:
confs.append(Conf(*arg))
print(f"{len(confs)} configs to run:")
for conf in confs:
print(conf.to_string())
for conf in confs:
tc.begin(conf.to_string())
for i in range(0, RUNS):
tc.begin(conf.to_string()+f"_run.{i}")
run_exp(conf)
tc.end()
stop_all()
main()

scripts/iperf_gentable.py Normal file (142 lines)
View File

@ -0,0 +1,142 @@
#!/usr/bin/env python3.6
from logging import root
import pandas as pd
import matplotlib.pyplot as plt
from matplotlib import ticker
import numpy as np
import sys
import re
import os
import json
import libpar as par
import getopt
import math
import concurrent.futures as CF
def enum_files(rootdir : str, ret : list[str]):
if os.path.isfile(rootdir):
ret.append(rootdir)
return
for subdir in os.listdir(rootdir):
each_dir = os.path.join(rootdir, subdir)
enum_files(each_dir, ret)
def process_dir(rootdir: str, dat : dict[any]):
ret = []
if (("memloadgen" in rootdir) and ("sendfile" in rootdir)):
#print("Processing " + rootdir + "...")
segments = os.path.basename(rootdir).split("_")
server = segments[0]
affinity = segments[1].split(".")[1]
sendfile = segments[2].split(".")[1]
tls = segments[3].split(".")[1]
ktls = segments[4].split(".")[1]
memloadgen = segments[5].split(".")[1]
fs = segments[6].split(".")[1]
if int(affinity.split(",")[0]) <= 4:
if "2-int" in server:
affinity = "socket01"
else:
affinity = "socket00"
else:
if "2-int" in server:
affinity = "socket11"
else:
affinity = "socket10"
if "tmpfs" in fs:
fs = "tmpfs"
elif "nvdimm" in fs:
fs = "nvdimm"
elif "mnt" in fs:
fs = "ssd"
if sendfile == "True" and tls == "True" and ktls == "True":
mode = "ktls + sendfile"
elif sendfile == "True" and tls == "False" and ktls == "False":
mode = "sendfile"
elif sendfile == "False" and tls == "False" and ktls == "False":
mode = "write"
elif sendfile == "False" and tls == "True" and ktls == "False":
mode = "tls"
elif sendfile == "False" and tls == "True" and ktls == "True":
mode = "ktls"
logs = []
enum_files(rootdir, logs)
logs_bytes = []
for log in logs:
if log.endswith(".txt"):
with open(log, "r") as f:
buf = f.read()
if len(buf) > 0:
logs_bytes.append(buf)
else:
print("Warning: log file empty for " + log)
try:
parser = par.iperf_json_parser(logs_bytes)
except Exception as e:
print("Warning: failed to parse " + log + ". err: " + str(e))
return
if (affinity,mode) not in dat[(memloadgen, fs)]:
dat[(memloadgen, fs)][(affinity,mode)] = []
dat[(memloadgen, fs)][(affinity,mode)].append(parser.aggregate_egress_bps)
return
for subdir in os.listdir(rootdir):
each_dir = os.path.join(rootdir, subdir)
if not os.path.isfile(each_dir):
process_dir(each_dir, dat)
def main():
datdir = None
options = getopt.getopt(sys.argv[1:], 'd:')[0]
for opt, arg in options:
if opt in ('-d'):
datdir = arg
if datdir == None:
raise Exception("Must specify -d parameter")
dat = dict()
for b in ["True", "False"]:
for c in ["nvdimm", "tmpfs", "ssd"]:
dat[(b, c)] = dict()
process_dir(datdir, dat)
for b in ["True", "False"]:
for c in ["nvdimm", "tmpfs", "ssd"]:
print("memloadgen: " + b + ",storage: " + c)
data = dat[(b, c)]
print("affinity,write,sendfile,tls,ktls,ktls + sendfile")
if data == None:
print("N/A,N/A,N/A,N/A,N/A,N/A")
for affinity in ["socket00", "socket01", "socket10", "socket11"]:
line = f"{affinity},"
for mode in ["write", "sendfile", "tls", "ktls", "ktls + sendfile"]:
if (affinity, mode) not in data:
line += "N/A,"
else:
vals = data[(affinity, mode)]
for i in range(0, len(vals)):
vals[i] = vals[i] / 1024.0 / 1024.0 / 1024.0 / 8.0
avg = np.average(vals)
stddev = np.std(vals)
line += "{:.2f} ({:.2f})".format(avg, stddev) + ","
print(line)
print("")
if __name__ == "__main__":
main()

View File

@ -1,3 +1,4 @@
#include <sys/endian.h>
#include "gen.hh"
#include <cstdlib>
#include <cstring>
@ -15,11 +16,14 @@ usage()
fprintf(stdout,
"Usage:\n"
" -v: verbose mode\n"
" -b: memory block size\n"
" -b: buffer size\n"
" -q: bytes per second\n"
" -d: destination domain index\n"
" -s: worker threads cpu list\n"
" -S: enable shared memory block\n"
" -S: enable shared buffer\n"
" -t: time to run\n"
" -T: transaction size\n"
" -i: inter arrival time distribution\n"
" -o: output file path\n");
fflush(stdout);
}
@ -31,11 +35,14 @@ int main(int argc, char * argv[])
ntr_init();
ntr_set_level(NTR_DEP_USER1, NTR_LEVEL_WARNING);
size_t arr_sz = 1024 * 1024;
size_t arr_sz = 32 * 1024 * 1024;
uint32_t time = -1;
uint64_t bps = 0;
uint64_t transaction_size = 4096;
cpuset_t threads;
CPU_ZERO(&threads);
CPU_SET(0, &threads);
char ia_dist[32] = "fixed";
int shared_buffer = 0;
cpuset_t domain_mask;
@ -44,7 +51,7 @@ int main(int argc, char * argv[])
{
int c;
// parse arguments
while ((c = getopt(argc, argv, "vhb:d:s:So:t:")) != -1) {
while ((c = getopt(argc, argv, "vhb:d:s:So:T:t:q:i:")) != -1) {
switch (c) {
case 'v':
ntr_set_level(NTR_DEP_USER1, ntr_get_level(NTR_DEP_USER1) + 1);
@ -68,7 +75,16 @@ int main(int argc, char * argv[])
strncpy(output_file, optarg, 256);
break;
case 't':
time = (uint32_t)strtol(optarg, nullptr, 10);
time = strtoul(optarg, nullptr, 10);
break;
case 'T':
transaction_size = strtoul(optarg, nullptr, 10);
break;
case 'q':
bps = (uint64_t)strtoull(optarg, nullptr, 10);
break;
case 'i':
strncpy(ia_dist, optarg, sizeof(ia_dist));
break;
default:
usage();
@ -77,7 +93,16 @@ int main(int argc, char * argv[])
}
}
ntr(NTR_DEP_USER1, NTR_LEVEL_INFO, "MLG: [size: %ld, # threads: 0x%d, domain: 0x%ld]\n", arr_sz, CPU_COUNT(&threads), CPU_FFS(&domain_mask) - 1);
ntr(NTR_DEP_USER1, NTR_LEVEL_INFO, "Configuration:\n"
" buffer size: %ld\n"
" num threads: %d\n"
" target domain: %ld\n"
" bytes per second: %lu\n"
" interarrival distribution: %s\n"
" shared buffer: %d\n"
" transaction time: %lu\n"
" runtime: %d\n\n",
arr_sz, CPU_COUNT(&threads), CPU_FFS(&domain_mask) - 1, bps, ia_dist, shared_buffer, transaction_size, time);
// init topo
if (topo_init(ntr_get_level(NTR_DEP_USER1) != NTR_LEVEL_DEFAULT)) {
@ -93,9 +118,12 @@ int main(int argc, char * argv[])
bool success = false;
memload_generator::memload_generator_options opts;
opts.chunk_size = arr_sz;
opts.buffer_size = arr_sz;
opts.bytes_per_second = bps;
opts.shared_buffer = shared_buffer;
opts.transaction_size = transaction_size;
opts.verbose = ntr_get_level(NTR_DEP_USER1) != NTR_LEVEL_DEFAULT;
strncpy(opts.ia_dist, ia_dist, sizeof(opts.ia_dist));
std::ofstream ofile;
ofile.open(output_file, std::ios::out | std::ios::trunc);
@ -113,7 +141,7 @@ int main(int argc, char * argv[])
usleep(S2US);
uint64_t cur_ts = topo_uptime_ns();
uint64_t trans = mgen->get_transactions();
uint64_t bps = (uint64_t)((double)((trans - prev_trans) * arr_sz) / ((double)(cur_ts - prev_ts) / (double)S2NS));
uint64_t bps = (uint64_t)((double)((trans - prev_trans) * transaction_size) / ((double)(cur_ts - prev_ts) / (double)S2NS));
ntr(NTR_DEP_USER1, NTR_LEVEL_INFO, "main: MLG bps = %ld ~= %ldM\n", bps, bps / 1024 / 1024);
ofile << bps << std::endl;
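With rate control in place, the monitoring loop derives the reported bandwidth from the per-interval delta in completed transactions multiplied by the transaction size (instead of the old chunk size). A self-contained rerun of that arithmetic with assumed counter values, 262144 transactions completed over one second at 4 KiB each:

#include <cstdio>

int main() {
    const unsigned long long transaction_size = 4096;
    const unsigned long long prev_trans = 0, trans = 262144;        // completed transactions
    const unsigned long long prev_ts = 0, cur_ts = 1000000000ULL;   // uptime samples in ns
    const double S2NS = 1e9;
    // same formula as the reporting loop above: bytes moved / elapsed seconds
    unsigned long long bps = (unsigned long long)(
        (double)((trans - prev_trans) * transaction_size) /
        ((double)(cur_ts - prev_ts) / S2NS));
    printf("MLG bps = %llu ~= %lluM\n", bps, bps / 1024 / 1024);    // 1073741824 ~= 1024M
    return 0;
}

An illustrative invocation exercising the new flags (the CPU list, domain index, runtime, and output path are placeholders, not values from this commit): memloadgen -b 67108864 -T 4096 -q 1073741824 -i fixed -s 1,3,5,7 -d 1 -t 60 -o /tmp/memloadgen_0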