cleanup and stuff

This commit is contained in:
quackerd 2023-01-04 17:25:32 +01:00
parent f20ae16e31
commit a9cac61069
17 changed files with 75 additions and 2545 deletions

View File

@ -10,7 +10,7 @@ struct device_conf {
int portid;
uint16_t tx_ring_sz;
uint16_t rx_ring_sz;
int num_threads;
cpuset_t core_affinity;
int mtu;
uint64_t rx_offloads;
uint64_t tx_offloads;

View File

@ -46,7 +46,7 @@ const static struct rte_ether_addr POU_MAC {
0x01, 0x00, 0x5e, 0x00, 0x01, 0x81
};
const static uint32_t POU_IP = RTE_IPV4(224, 0, 1, 129);
const static uint16_t POU_PORT = 319;
const static uint16_t POU_PORT = 320;
/* Khat Protocol:
* khat only processes two kinds of packets - LOAD and PROBE
* rat:

View File

@ -1,4 +1,3 @@
#include <sys/_timespec.h>
#include <atomic>
#include <cstdlib>
#include <ctime>
@ -608,6 +607,8 @@ pkt_loop()
(void *)tx_buf, epoch);
}
rte_pktmbuf_free(tx_buf);
read_tx = false;
recv_resp = false;
recv_stat = false;
@ -867,8 +868,15 @@ main(int argc, char *argv[])
"main: timesync disabled. hw timestamp unavailable.\n ");
}
if (CPU_COUNT(&options.cpu_set) > 1) {
int ffs = CPU_FFS(&options.cpu_set);
CPU_ZERO(&options.cpu_set);
CPU_SET(ffs - 1, &options.cpu_set);
ntr(NTR_DEP_USER1, NTR_LEVEL_WARNING, "cat only supports one thread, using only core %d.\n", ffs - 1);
}
dconf.mtu = MAX_STANDARD_MTU;
dconf.num_threads = 1;
CPU_COPY(&options.cpu_set, &dconf.core_affinity);
dconf.portid = options.portid;
dconf.rss_hf = pconf.rss_hf;
dconf.rx_offloads = pconf.rxoffload;

View File

@ -415,56 +415,55 @@ locore_main(void *ti)
assert(options.is_probing.load());
struct timespec ts { };
struct pkt_payload_stat *stat;
int status = 0;
if (options.s_hwtimestamp) {
if (rte_eth_timesync_read_tx_timestamp(
options.portid, &ts) == 0) {
if ((status = rte_eth_timesync_read_tx_timestamp(
options.portid, &ts)) == 0) {
ntr(NTR_DEP_USER1, NTR_LEVEL_DEBUG,
"locore_main <thread %d>: obtained hw tx timestamp %lu.\n",
tinfo->tid,
(ts.tv_sec * S2NS + ts.tv_nsec));
} else {
ntr(NTR_DEP_USER1, NTR_LEVEL_DEBUG,
"locore_main <thread %d>: failed to obtain hw tx timestamp.\n",
tinfo->tid);
pending_probe = false;
goto end_stat;
"locore_main <thread %d>: failed to obtain hw tx timestamp: %d.\n",
tinfo->tid, status);
}
}
// now we have everything we need
if (status == 0) {
// now we have everything we need
if (alloc_pkt_hdr(mempool_get(tinfo->node_id),
PKT_TYPE_STAT, &options.s_probe_info.cspec, 0,
&pkt_buf, &tx_data) != 0) {
rte_exit(EXIT_FAILURE,
"failed to alloc pkt_buf\n");
if (alloc_pkt_hdr(mempool_get(tinfo->node_id),
PKT_TYPE_STAT, &options.s_probe_info.cspec, 0,
&pkt_buf, &tx_data) != 0) {
rte_exit(EXIT_FAILURE,
"failed to alloc pkt_buf\n");
}
// populate stats
stat = (struct pkt_payload_stat *)tx_data->payload;
stat->epoch = rte_cpu_to_be_32(
options.s_probe_info.epoch);
if (options.s_hwtimestamp) {
stat->hw_rx = rte_cpu_to_be_64(
options.s_probe_info.last_hw_rx);
stat->hw_tx = rte_cpu_to_be_64(
ts.tv_nsec + ts.tv_sec * S2NS);
} else {
stat->hw_rx = 0;
stat->hw_tx = 0;
}
stat->sw_rx = rte_cpu_to_be_64(
options.s_probe_info.last_sw_rx);
stat->sw_tx = rte_cpu_to_be_64(
options.s_probe_info.last_sw_tx);
// send the packet
tx_burst_all(options.portid, tinfo->txqid, &pkt_buf, 1);
// release flux
pending_probe = false;
options.is_probing.store(false);
}
// populate stats
stat = (struct pkt_payload_stat *)tx_data->payload;
stat->epoch = rte_cpu_to_be_32(
options.s_probe_info.epoch);
if (options.s_hwtimestamp) {
stat->hw_rx = rte_cpu_to_be_64(
options.s_probe_info.last_hw_rx);
stat->hw_tx = rte_cpu_to_be_64(
ts.tv_nsec + ts.tv_sec * S2NS);
} else {
stat->hw_rx = 0;
stat->hw_tx = 0;
}
stat->sw_rx = rte_cpu_to_be_64(
options.s_probe_info.last_sw_rx);
stat->sw_tx = rte_cpu_to_be_64(
options.s_probe_info.last_sw_tx);
// send the packet
tx_burst_all(options.portid, tinfo->txqid, &pkt_buf, 1);
end_stat:
// release flux
pending_probe = false;
options.is_probing.store(false);
}
}
}
@ -607,7 +606,7 @@ main(int argc, char *argv[])
options.s_hwtimestamp = false;
}
dconf.mtu = options.port_mtu;
dconf.num_threads = options.num_threads;
CPU_COPY(&options.cpu_set, &dconf.core_affinity);
dconf.portid = options.portid;
dconf.rss_hf = pconf.rss_hf;
dconf.rx_offloads = pconf.rxoffload;

View File

@ -66,6 +66,7 @@ port_init(struct device_conf *dconf)
};
int ret;
int num_threads = CPU_COUNT(&dconf->core_affinity);
if (rte_eth_dev_count_avail() == 0) {
rte_exit(EXIT_FAILURE, "number of ports must be > 0\n");
}
@ -93,8 +94,7 @@ port_init(struct device_conf *dconf)
port_conf.txmode.offloads = dconf->tx_offloads;
/* Configure the Ethernet device. */
ret = rte_eth_dev_configure(
dconf->portid, dconf->num_threads, dconf->num_threads, &port_conf);
ret = rte_eth_dev_configure(dconf->portid, num_threads, num_threads, &port_conf);
if (ret != 0)
rte_exit(EXIT_FAILURE, "failed to configure port: %d\n", ret);
@ -105,24 +105,26 @@ port_init(struct device_conf *dconf)
/* Allocate and set up 1 RX queue per thread per Ethernet port. */
rxconf = dev_info.default_rxconf;
rxconf.offloads = port_conf.rxmode.offloads;
for (int i = 0; i < dconf->num_threads; i++) {
ret = rte_eth_rx_queue_setup(dconf->portid, i, dconf->rx_ring_sz,
rte_eth_dev_socket_id(dconf->portid), &rxconf,
mempool_get(rte_eth_dev_socket_id(dconf->portid)));
if (ret < 0)
rte_exit(EXIT_FAILURE, "failed to setup rx queue %d: %d\n", i, ret);
}
/* Allocate and set up 1 TX queue per thread per Ethernet port. */
rxconf.rx_nseg = 0;
rxconf.rx_seg = nullptr;
rxconf.rx_nmempool = 0;
txconf = dev_info.default_txconf;
txconf.offloads = port_conf.txmode.offloads;
for (int i = 0; i < dconf->num_threads; i++) {
ret = rte_eth_tx_queue_setup(
dconf->portid, i, dconf->tx_ring_sz, rte_eth_dev_socket_id(dconf->portid),
&txconf);
int core;
int qid = 0;
CPU_FOREACH_ISSET(core, &dconf->core_affinity) {
int socket = rte_lcore_to_socket_id(core);
ntr(NTR_DEP_USER1, NTR_LEVEL_INFO, "port_init: setting up rx & tx queue for core %d (socket %d)...\n", core, socket);
ret = rte_eth_rx_queue_setup(dconf->portid, qid, dconf->rx_ring_sz, socket, &rxconf, mempool_get(socket));
if (ret < 0)
rte_exit(EXIT_FAILURE, "failed to setup tx queue %d: %d", i, ret);
rte_exit(EXIT_FAILURE, "failed to setup rx queue for core %d: %d\n", core, ret);
ret = rte_eth_tx_queue_setup(dconf->portid, qid, dconf->tx_ring_sz, socket, &txconf);
if (ret < 0)
rte_exit(EXIT_FAILURE, "failed to setup tx queue for core %d: %d", core, ret);
qid++;
}
// set mtu
@ -145,19 +147,15 @@ port_init(struct device_conf *dconf)
if (ret != 0)
rte_exit(EXIT_FAILURE, "failed to enable promiscuous mode: %d\n", ret);
for (int i = 0; i < dconf->num_threads; i++) {
for (int i = 0; i < num_threads; i++) {
if (dconf->tx_fn != nullptr) {
if (rte_eth_add_tx_callback(dconf->portid,
i, dconf->tx_fn,
dconf->tx_user) == nullptr) {
if (rte_eth_add_tx_callback(dconf->portid, i, dconf->tx_fn, dconf->tx_user) == nullptr) {
rte_exit(EXIT_FAILURE, "failed to attach callback to tx queue %d\n", i);
}
}
if (dconf->rx_fn != nullptr) {
if (rte_eth_add_rx_callback(dconf->portid,
i, dconf->rx_fn,
dconf->rx_user) == nullptr) {
if (rte_eth_add_rx_callback(dconf->portid, i, dconf->rx_fn, dconf->rx_user) == nullptr) {
rte_exit(EXIT_FAILURE, "failed to attach callback to rx queue %d\n", i);
}
}

View File

@ -22,7 +22,7 @@ static struct port_conf port_confs[] = {
.rxoffload = RTE_ETH_RX_OFFLOAD_RSS_HASH | RTE_ETH_RX_OFFLOAD_UDP_CKSUM | RTE_ETH_RX_OFFLOAD_IPV4_CKSUM | RTE_ETH_RX_OFFLOAD_TIMESTAMP,
.txoffload = RTE_ETH_TX_OFFLOAD_MBUF_FAST_FREE | RTE_ETH_TX_OFFLOAD_UDP_CKSUM | RTE_ETH_TX_OFFLOAD_IPV4_CKSUM,
.rss_hf = RTE_ETH_RSS_FRAG_IPV4 | RTE_ETH_RSS_NONFRAG_IPV4_UDP | RTE_ETH_RSS_NONFRAG_IPV4_OTHER | RTE_ETH_RSS_L2_PAYLOAD,
.timesync = true
.timesync = false
},
{
.driver_name = "net_ixgbe",

View File

@ -782,7 +782,7 @@ main(int argc, char *argv[])
"main: timesync disabled. hw timestamp unavailable.\n ");
}
dconf.mtu = options.port_mtu;
dconf.num_threads = options.s_num_threads;
CPU_COPY(&options.cpu_set, &dconf.core_affinity);
dconf.portid = options.portid;
dconf.rss_hf = pconf.rss_hf;
dconf.rx_offloads = pconf.rxoffload;

View File

@ -1,609 +0,0 @@
import subprocess as sp
import time
import os
import datetime
import sys
import getopt
import numpy as np
import libpar as par
import libtc as tc
import iperfconf as ic
# definitions
file_dir : str = os.path.dirname(os.path.realpath(__file__))
root_dir : str = os.path.join(file_dir,"..")
LOG_FILEPATH = "/iperflogs"
BIN_PATH = "/iperftls"
MLG_PATH = "/numam/build/bin/memloadgen"
EXE_PATH = BIN_PATH + "/src/iperf3"
SSL_CERT = "/certs/server.crt"
SSL_PKEY = "/certs/server.key"
SERVER_PORT_START = 8050
MEMLOAD_BLKSZ = 1024*1024*64
RUNS=3
SANDYBRIDGE_CPULIST : ic.CPUList = ic.CPUList([16, 16])
SKYLAKE_CPULIST : ic.CPUList = ic.CPUList([24, 24])
MILAN_CPULIST : ic.CPUList = ic.CPUList([64, 64])
ICELAKE_CPULIST : ic.CPUList = ic.CPUList([48, 48])
# Shared Arguments
NUM_CORES = 12
NUM_CLIENTS = 3
# KTLS
KTLS_ARGS = [(False, False, False),
(True, False, False),
(False, True, False),
(False, True, True),
(True, True, True)]
# MEMLOADGEN
MEMLOADGEN_ARGS_ICELAKE = []
for i in range(0, 100, 10):
MEMLOADGEN_ARGS_ICELAKE.append([f"{ICELAKE_CPULIST.get_cpulist_by_offset(domain = 1, offset = 3, num = NUM_CORES, stride = 4)}.0.{i}",
f"{ICELAKE_CPULIST.get_cpulist_by_offset(domain = 0, offset = 3, num = NUM_CORES, stride = 4)}.1.{i}"])
MEMLOADGEN_ARGS_ICELAKE.append(None)
MEMLOADGEN_ARGS_MILAN = []
for i in range(0, 100, 10):
MEMLOADGEN_ARGS_MILAN.append([f"{MILAN_CPULIST.get_cpulist_by_offset(domain = 1, offset = 3, num = NUM_CORES, stride = 4)}.0.{i}",
f"{MILAN_CPULIST.get_cpulist_by_offset(domain = 0, offset = 3, num = NUM_CORES, stride = 4)}.1.{i}"])
MEMLOADGEN_ARGS_MILAN.append(None)
MEMLOADGEN_ARGS_SKYLAKE = []
for i in range(0, 100, 25):
MEMLOADGEN_ARGS_SKYLAKE.append([f"{SKYLAKE_CPULIST.get_cpulist_by_offset(domain = 0, offset = 12, num = 6, stride = 2)}.1.{i}",
f"{SKYLAKE_CPULIST.get_cpulist_by_offset(domain = 1, offset = 12, num = 6, stride = 2)}.0.{i}"])
MEMLOADGEN_ARGS_SKYLAKE.append(None)
MEMLOADGEN_ARGS_SANDYBRIDGE = []
for i in range(0, 100, 25):
MEMLOADGEN_ARGS_SANDYBRIDGE.append([f"{SANDYBRIDGE_CPULIST.get_cpulist_by_offset(domain = 0, offset = 1, num = 6, stride = 2)}.1.{i}",
f"{SKYLAKE_CPULIST.get_cpulist_by_offset(domain = 1, offset = 1, num = 6, stride = 2)}.0.{i}"])
MEMLOADGEN_ARGS_SANDYBRIDGE.append(None)
# client thread affinity
CLIENT_ASSIGNMENTS = ic.distribute_threads(NUM_CORES, 1, NUM_CLIENTS, 4)
CLIENT_AFFINITY = []
for i in range(NUM_CLIENTS):
CLIENT_AFFINITY.append(ic.list_to_comma_delimited(CLIENT_ASSIGNMENTS[i]))
# server thread affinity
ICELAKE_SERVER_AFFINITY_SOCKET0 = ICELAKE_CPULIST.get_cpulist_by_offset(domain = 0, offset = 1, num = NUM_CORES, stride = 4)
ICELAKE_SERVER_AFFINITY_SOCKET1 = ICELAKE_CPULIST.get_cpulist_by_offset(domain = 1, offset = 1, num = NUM_CORES, stride = 4)
MILAN_SERVER_AFFINITY_SOCKET0 = MILAN_CPULIST.get_cpulist_by_offset(domain = 0, offset = 1, num = NUM_CORES, stride = 4)
MILAN_SERVER_AFFINITY_SOCKET1= MILAN_CPULIST.get_cpulist_by_offset(domain = 1, offset = 1, num = NUM_CORES, stride = 4)
SKYLAKE_SERVER_AFFINITY_SOCKET0 = SKYLAKE_CPULIST.get_cpulist_by_offset(domain = 0, offset = 1, num = 8, stride = 2)
SKYLAKE_SERVER_AFFINITY_SOCKET1= SKYLAKE_CPULIST.get_cpulist_by_offset(domain = 1, offset = 1, num = 8, stride = 2)
SANDYBRIDGE_SERVER_AFFINITY_SOCKET0= SANDYBRIDGE_CPULIST.get_cpulist_by_offset(domain = 0, offset = 1, num = 8, stride = 2)
SANDYBRIDGE_SERVER_AFFINITY_SOCKET1= SANDYBRIDGE_CPULIST.get_cpulist_by_offset(domain = 1, offset = 1, num = 8, stride = 4)
# file system
#FILE_PATHS = ["/tank/large_file_#p"]
FILE_PATHS = ["/tmpfs1/small_file_1_#p"]
# mmap
USE_MMAP = [False, True]
all_args : list[ic.ArgTypes] = []
# args start
arg_types : ic.ArgTypes = ic.ArgTypes()
#server
arg_types.add_arg(["icelake1-int.rcs.uwaterloo.ca"])
#clients
arg_types.add_arg(["icelake2-int.rcs.uwaterloo.ca", "milan1-int.rcs.uwaterloo.ca"])
#server_dat
arg_types.add_arg(["192.168.100.101"])
#clients_dat
arg_types.add_arg(["192.168.100.102", "192.168.100.103"])
#clients_affinity
arg_types.add_arg(CLIENT_AFFINITY)
# affinity
arg_types.add_arg(ICELAKE_SERVER_AFFINITY_SOCKET1, ICELAKE_SERVER_AFFINITY_SOCKET0)
# sendfile/tls/ktls
arg_types.add_args(*KTLS_ARGS)
# memloadgen
arg_types.add_arg(*MEMLOADGEN_ARGS_ICELAKE)
# filepath
arg_types.add_arg(*FILE_PATHS)
# ODIRECT
arg_types.add_arg(False)
# mmap
arg_types.add_arg(*USE_MMAP)
#all_args.append(arg_types)
# args start
arg_types : ic.ArgTypes = ic.ArgTypes()
#server
arg_types.add_arg(["icelake1-int.rcs.uwaterloo.ca"])
#clients
arg_types.add_arg(["icelake2-int.rcs.uwaterloo.ca", "milan1-int.rcs.uwaterloo.ca"])
#server_dat
arg_types.add_arg(["192.168.101.101"])
#clients_dat
arg_types.add_arg(["192.168.101.102", "192.168.101.103"])
#clients_affinity
arg_types.add_arg(CLIENT_AFFINITY)
# affinity
arg_types.add_arg(ICELAKE_SERVER_AFFINITY_SOCKET1)
# sendfile/tls/ktls
arg_types.add_args(*KTLS_ARGS)
# memloadgen
arg_types.add_arg(*MEMLOADGEN_ARGS_ICELAKE)
# filepath
arg_types.add_arg(*FILE_PATHS)
# ODIRECT
arg_types.add_arg(False)
# mmap
arg_types.add_arg(*USE_MMAP)
#all_args.append(arg_types)
arg_types : ic.ArgTypes = ic.ArgTypes()
#server
arg_types.add_arg(["milan1-int.rcs.uwaterloo.ca"])
#clients
arg_types.add_arg(["icelake2-int.rcs.uwaterloo.ca", "milan2-int.rcs.uwaterloo.ca", "icelake1-int.rcs.uwaterloo.ca"])
#server_dat
arg_types.add_arg(["192.168.100.103"])
#clients_dat
arg_types.add_arg(["192.168.100.102", "192.168.100.104", "192.168.100.101"])
#clients_affinity
arg_types.add_arg(CLIENT_AFFINITY)
# affinity
arg_types.add_arg(MILAN_SERVER_AFFINITY_SOCKET0, MILAN_SERVER_AFFINITY_SOCKET1)
# sendfile/tls/ktls
arg_types.add_args(*KTLS_ARGS)
# memloadgen
arg_types.add_arg(*MEMLOADGEN_ARGS_MILAN)
# filepath
arg_types.add_arg(*FILE_PATHS)
# ODIRECT
arg_types.add_arg(False)
# mmap
arg_types.add_arg(*USE_MMAP)
all_args.append(arg_types)
arg_types : ic.ArgTypes = ic.ArgTypes()
#server
arg_types.add_arg(["milan1-int.rcs.uwaterloo.ca"])
#clients
arg_types.add_arg(["icelake2-int.rcs.uwaterloo.ca", "milan2-int.rcs.uwaterloo.ca", "icelake1-int.rcs.uwaterloo.ca"])
#server_dat
arg_types.add_arg(["192.168.101.103"])
#clients_dat
arg_types.add_arg(["192.168.101.102", "192.168.101.104", "192.168.101.101"])
#clients_affinity
arg_types.add_arg(CLIENT_AFFINITY)
# affinity
arg_types.add_arg(MILAN_SERVER_AFFINITY_SOCKET1)
# sendfile/tls/ktls
arg_types.add_args(*KTLS_ARGS)
# memloadgen
arg_types.add_arg(*MEMLOADGEN_ARGS_MILAN)
# filepath
arg_types.add_arg(*FILE_PATHS)
# ODIRECT
arg_types.add_arg(False)
# mmap
arg_types.add_arg(*USE_MMAP)
all_args.append(arg_types)
arg_types : ic.ArgTypes = ic.ArgTypes()
#server
arg_types.add_arg(["milan2-int.rcs.uwaterloo.ca"])
#clients
arg_types.add_arg(["icelake2-int.rcs.uwaterloo.ca", "milan1-int.rcs.uwaterloo.ca", "icelake1-int.rcs.uwaterloo.ca"])
#server_dat
arg_types.add_arg(["192.168.100.104"])
#clients_dat
arg_types.add_arg(["192.168.100.102", "192.168.100.103", "192.168.100.101"])
#clients_affinity
arg_types.add_arg(CLIENT_AFFINITY)
# affinity
arg_types.add_arg(MILAN_SERVER_AFFINITY_SOCKET0, MILAN_SERVER_AFFINITY_SOCKET1)
# sendfile/tls/ktls
arg_types.add_args(*KTLS_ARGS)
# memloadgen
arg_types.add_arg(*MEMLOADGEN_ARGS_MILAN)
# filepath
arg_types.add_arg(*FILE_PATHS)
# ODIRECT
arg_types.add_arg(False)
# mmap
arg_types.add_arg(*USE_MMAP)
#all_args.append(arg_types)
arg_types : ic.ArgTypes = ic.ArgTypes()
#server
arg_types.add_arg(["milan2-int.rcs.uwaterloo.ca"])
#clients
arg_types.add_arg(["icelake2-int.rcs.uwaterloo.ca", "milan1-int.rcs.uwaterloo.ca", "icelake1-int.rcs.uwaterloo.ca"])
#server_dat
arg_types.add_arg(["192.168.101.104"])
#clients_dat
arg_types.add_arg(["192.168.101.102", "192.168.101.103", "192.168.101.101"])
#clients_affinity
arg_types.add_arg(CLIENT_AFFINITY)
# affinity
arg_types.add_arg(MILAN_SERVER_AFFINITY_SOCKET1)
# sendfile/tls/ktls
arg_types.add_args(*KTLS_ARGS)
# memloadgen
#arg_types.add_arg(*MEMLOADGEN_ARGS_MILAN_MLX)
# filepath
arg_types.add_arg(*FILE_PATHS)
# ODIRECT
arg_types.add_arg(False)
# mmap
arg_types.add_arg(*USE_MMAP)
#all_args.append(arg_types)
arg_types : ic.ArgTypes = ic.ArgTypes()
#server
arg_types.add_arg(["skylake3.rcs.uwaterloo.ca"])
#clients
arg_types.add_arg(["icelake2-int.rcs.uwaterloo.ca", "milan1-int.rcs.uwaterloo.ca", "icelake1-int.rcs.uwaterloo.ca"])
#server_dat
arg_types.add_arg(["192.168.101.92"])
#clients_dat
arg_types.add_arg(["192.168.101.102", "192.168.101.103", "192.168.101.101"])
#clients_affinity
arg_types.add_arg(["1,3", "1,3", "1,3"])
# affinitybb
arg_types.add_arg("25,27,29,31,33,35", "1,3,5,7,9,11")
# sendfile/tls/ktls
arg_types.add_args(*KTLS_ARGS)
# memloadgen
arg_types.add_arg(*MEMLOADGEN_ARGS_SKYLAKE)
# filepath
arg_types.add_arg("/tmpfs/small_file_#p")
# ODIRECT
arg_types.add_arg(False)
# mmap
arg_types.add_arg(*USE_MMAP)
#all_args.append(arg_types)
arg_types : ic.ArgTypes = ic.ArgTypes()
#server
arg_types.add_arg(["skylake3.rcs.uwaterloo.ca"])
#clients
arg_types.add_arg(["icelake2-int.rcs.uwaterloo.ca", "milan1-int.rcs.uwaterloo.ca", "icelake1-int.rcs.uwaterloo.ca"])
#server_dat
arg_types.add_arg(["192.168.100.92"])
#clients_dat
arg_types.add_arg(["192.168.100.102", "192.168.100.103", "192.168.100.101"])
#clients_affinity
arg_types.add_arg(["1,3", "1,3", "1,3"])
# affinitybb
arg_types.add_arg("1,3,5,7,9,11")
# sendfile/tls/ktls
arg_types.add_args(*KTLS_ARGS)
# memloadgen
arg_types.add_arg(*MEMLOADGEN_ARGS_SKYLAKE)
# filepath
arg_types.add_arg("/tmpfs/small_file_#p")
# ODIRECT
arg_types.add_arg(False)
# mmap
arg_types.add_arg(*USE_MMAP)
#all_args.append(arg_types)
def parse_comma_list(input : str):
return input.split(",")
def run_setup_cmd(conf : ic.Conf, cmd : str):
ssrv : list[tuple[str, sp.Popen]] = []
tc.log_print(f"Running command on {conf.server[0]}...")
ssrv.append((conf.server[0], tc.remote_exec(conf.server, cmd, blocking=False, check=False)[0]))
for s in conf.clients:
tc.log_print(f"Running command on {s}...")
ssrv.append((s, tc.remote_exec([s], cmd, blocking=False, check=False)[0]))
for p in ssrv:
_ , stderr = p[1].communicate()
if p[1].returncode != 0:
print(f"\n{ p[0] } failed. stderr:\n{stderr.decode()}\n")
else:
print(f"\n{ p[0] } succeeded\n")
def setup_all(conf : ic.Conf):
setup_cmd : str = f'''
sudo pkg install -y openssl-devel vim curl wget gmake cmake openssl-devel llvm gcc rsync pkgconf isal-kmod ktls_isa-l_crypto-kmod;
sudo pkg remove -y iperf iperf3;
sudo rm -rf { BIN_PATH };
sudo mkdir -p { BIN_PATH };
sudo git clone https://git.quacker.org/d/iperf3-tls { BIN_PATH };
cd { BIN_PATH };
sudo git checkout dev;
sudo ./configure;
sudo make -j8;
sudo rm -rf /libtopo;
sudo mkdir -p /libtopo;
sudo git clone https://git.quacker.org/d/libtopo /libtopo;
cd /libtopo;
sudo mkdir build;
cd build;
sudo cmake ../;
sudo make install;
sudo rm -rf /numam;
sudo mkdir -p /numam;
sudo chmod 777 /numam;
'''
run_setup_cmd(conf, setup_cmd)
#rsync
all_clts = []
all_clts.extend(conf.clients)
all_clts.extend(conf.server)
dir = f"{os.path.dirname(__file__)}/../"
for clt in all_clts:
print("Syncing files to " + clt + "...")
rsync_cmd = f"rsync -az --no-perms --rsync-path=\"sudo rsync\" --omit-dir-times -e \"ssh -p77\" {dir} {tc.get_ssh_user()}@{clt}:/numam/"
sp.check_call(rsync_cmd, shell=True)
run_setup_cmd(conf, f'''
cd /numam;
sudo rm -rf build;
sudo mkdir -p build;
cd build;
sudo cmake ../;
sudo make memloadgen;
''')
def stop_all(conf : ic.Conf, clients_only = False):
# stop clients
tc.log_print("Stopping clients...")
tc.remote_exec(conf.clients, "sudo killall -9 iperf3; sudo killall -9 memloadgen", check=False)
if not clients_only:
tc.log_print("Stopping server...")
tc.remote_exec(conf.server, "sudo killall -9 iperf3; sudo killall -9 memloadgen", check=False)
def prepare_logdir(conf : ic.Conf):
tc.log_print("Preparing server log directory...")
prep_cmd = "sudo rm -rf " + LOG_FILEPATH
tc.log_print(prep_cmd)
tc.remote_exec(conf.server, prep_cmd, check=False)
time.sleep(0.1)
prep_cmd = "sudo mkdir -p " + LOG_FILEPATH
tc.log_print(prep_cmd)
tc.remote_exec(conf.server, prep_cmd, check=True)
def run_exp(conf : ic.Conf):
stop_all(conf)
while True:
prepare_logdir(conf)
ssrvs=[]
ssrv_names=[]
cur_srv_proc = 0
smlg = []
smlg_names = []
if conf.memloadgen != None:
for emem in conf.memloadgen:
mlg_cmd = "sudo "
mlg_cpu = emem.split(".")[0]
mlg_dom = emem.split(".")[1]
mlg_pct = emem.split(".")[2]
mlg_cmd += f"{MLG_PATH} -b {MEMLOAD_BLKSZ} -T {MEMLOAD_BLKSZ} -s {mlg_cpu} -d {mlg_dom} -o {LOG_FILEPATH}/memloadgen_{len(smlg)} -p {mlg_pct} -H 4 -w 17"
tc.log_print("Starting memloadgen...")
tc.log_print(mlg_cmd)
smlg.append(tc.remote_exec(conf.server, mlg_cmd, blocking=False)[0])
smlg_names.append("memloadgen")
time.sleep(0.1)
time.sleep(5)
for eaff in parse_comma_list(conf.affinity):
server_cmd = "sudo "
server_cmd += f"{EXE_PATH} -s -p " + str(SERVER_PORT_START + cur_srv_proc) + \
" -F " + conf.filepath.replace("#p", str(cur_srv_proc)) + \
" -A " + eaff
if conf.tls:
server_cmd += f" --enable-ssl-over-tcp --ssl-certificate {SSL_CERT} --ssl-private-key {SSL_PKEY}"
if conf.ktls:
server_cmd += " --enable-ssl-ktls"
if conf.odirect:
server_cmd += " --use-odirect"
if conf.mmap:
server_cmd += " --use-mmap"
server_cmd += " -J --logfile " + LOG_FILEPATH + "/" + eaff + ".txt"
# start server
tc.log_print("Starting server proc " + str(cur_srv_proc) + "...")
tc.log_print(server_cmd)
ssrv = tc.remote_exec(conf.server, server_cmd, blocking=False)[0]
ssrvs.append(ssrv)
ssrv_names.append("Server " + str(cur_srv_proc))
cur_srv_proc = cur_srv_proc + 1
time.sleep(0.1)
# start clients
tc.log_print("Starting clients...")
sclts = []
sclt_names = []
clt_number = 0
for i in range(len(conf.clients)):
client_aff = conf.clients_affinity[i]
for eaff in parse_comma_list(client_aff):
client_cmd = f"sudo {EXE_PATH} -c " + conf.server_dat[0] + \
" -p " + str(SERVER_PORT_START + clt_number) + \
" --connect-timeout 1000" + \
" -A " + eaff + \
" -t 25" + \
" -P 8" + \
" -R" + \
" -N" + \
" -4" + \
" -O 10"
if conf.tls:
client_cmd += f" --enable-ssl-over-tcp --ssl-certificate {SSL_CERT} --ssl-private-key {SSL_PKEY}"
client_cmd += " -J --logfile /dev/null"
tc.log_print(conf.clients[i] + ":\n" + client_cmd)
sclts.append(tc.remote_exec([conf.clients[i]], client_cmd, blocking=False)[0])
sclt_names.append(conf.clients[i] + "@" + eaff)
clt_number = clt_number + 1
time.sleep(0.1)
# launch stderr monitoring thread
exclude = ["Pseudo-terminal"]
tc.errthr_create(sclts, sclt_names, exclude)
tc.errthr_create(ssrvs, ssrv_names, exclude)
if (conf.memloadgen != None):
tc.errthr_create(smlg, smlg_names, exclude)
tc.errthr_start()
cur = 0
# selec = select.poll()
# selec.register(p.stdout, select.POLLIN)
success = False
while not success:
success = False
# either failed or timeout
# we use failure detection to save time for long durations
if tc.errthr_get_failed():
break
if cur >= 90:
tc.log_print("Experiment timed out. Restarting...")
break
# while selec.poll(1):
# print(p.stdout.readline())
success = True
for p in sclts:
if p.poll() == None:
success = False
break
time.sleep(1)
cur = cur + 1
tc.errthr_stop()
stop_all(conf, clients_only=True)
tc.log_print("Cooling down...")
time.sleep(3)
stop_all(conf)
if success:
if flush_netresult(conf):
break
def flush_netresult(conf : ic.Conf) -> bool:
tc.log_print("Keeping results...")
# copy log directory back to machine
log_output = tc.get_odir()
os.makedirs(log_output, exist_ok=True)
scp_cmd = "scp -P77 -r " + tc.get_ssh_user() + "@" + conf.server[0] + ":" + LOG_FILEPATH + " " + log_output + "/"
tc.log_print(scp_cmd)
sp.check_call(scp_cmd, shell=True)
# parse results
log_output = log_output + "/" + os.path.basename(LOG_FILEPATH)
logs = os.listdir(log_output)
logs_bytes = []
memload_bytes = []
for log in logs:
tc.log_print("Processing " + log + "...")
if os.path.isfile(log_output + "/" + log) and log.endswith(".txt"):
with open(log_output + "/" + log, "r") as f:
buf = f.read()
if len(buf) > 0:
logs_bytes.append(buf)
else:
tc.log_print("Warning: log file empty for " + log + ". Retrying...")
return False
if os.path.isfile(log_output + "/" + log) and ("memloadgen" in log):
with open(log_output + "/" + log, "r") as f:
memloadbuf = f.read()
if len(memloadbuf) == 0:
tc.log_print("Warning: log file empty for " + log + ". Retrying...")
return False
else:
memload_bytes.append(memloadbuf)
try:
parser = par.iperf_json_parser(logs_bytes)
bps = []
for ml in memload_bytes:
memparser = par.memloadgen_parser(ml, 22, 32)
bps.append(memparser.bps)
tc.log_print("Aggregated throughput: " + "{:.2f}".format(parser.aggregate_egress_bps / 8.0) + " B/s " + \
"{:.2f}".format(parser.aggregate_egress_bps / 8.0 / 1024.0 / 1024.0) + " MB/s " + \
"{:.2f}".format(parser.aggregate_egress_bps / 8.0 / 1024.0 / 1024.0 / 1024.0) + " GB/s" + \
" | Memload generator: " + "{:.2f}".format(np.sum(bps) / 1024.0 / 1024.0 / 1024.0) + " GB/s" )
except Exception as e:
tc.log_print("Warning: exception for parsing logs: " + str(e) + " restarting...")
scp_cmd = "sudo rm -rf " + log_output + "/*"
tc.log_print(scp_cmd)
sp.check_call(scp_cmd, shell=True)
return False
return True
def main():
tc.set_ssh_param("-o StrictHostKeyChecking=no -p77")
tc.set_ssh_user("oscar")
output_dirname = "run"
confs : list[ic.Conf] = []
for argtype in all_args:
args = argtype.get_fields()
for arg in args:
confs.append(ic.Conf(*arg))
name = None
options = getopt.getopt(sys.argv[1:], 'sSn:')[0]
for opt, arg in options:
if opt in ('-s'):
stop_all(confs[0])
return
elif opt in ('-S'):
setup_all(confs[0])
return
elif opt in ('-n'):
name = arg
if name != None:
output_dirname = name
tc.init("~/results.d/iperf3/" + output_dirname + "_" + datetime.datetime.now().strftime('%Y%m%d%H%M%S'))
cpcmd = "cp " + __file__ + " " + tc.get_odir() + "/"
tc.log_print(cpcmd)
sp.check_call(cpcmd, shell=True)
server_set = []
for conf in confs:
tc.log_print(conf.to_string(verbose=True))
if conf.server[0] not in server_set:
server_set.append(conf.server[0])
tc.log_print(f"{len(confs)} configs ({RUNS} * {len(confs)} = {RUNS * len(confs)} runs) scheduled:")
for server in server_set:
print(f"Gathering information on {server} (sysctl)...")
p : sp.Popen = tc.remote_exec([server], "sysctl -a", blocking=False, check=True)[0]
with open(f"{tc.get_odir()}/{server}.sysctl", "w") as f:
f.write(p.communicate()[0].decode())
print(f"Gathering information on {server} (ifconfig)...")
p : sp.Popen = tc.remote_exec([server], "ifconfig", blocking=False, check=True)[0]
with open(f"{tc.get_odir()}/{server}.ifconfig", "w") as f:
f.write(p.communicate()[0].decode())
print(f"Gathering information on {server} (mount)...")
p : sp.Popen = tc.remote_exec([server], "mount", blocking=False, check=True)[0]
with open(f"{tc.get_odir()}/{server}.mount", "w") as f:
f.write(p.communicate()[0].decode())
print(f"Gathering information on {server} (dmesg)...")
p : sp.Popen = tc.remote_exec([server], "sudo dmesg", blocking=False, check=True)[0]
with open(f"{tc.get_odir()}/{server}.dmesg", "w") as f:
f.write(p.communicate()[0].decode())
for conf in confs:
for i in range(0, RUNS):
tc.begin(conf.to_string()+f"_run.{i}")
run_exp(conf)
tc.end()
main()

View File

@ -1,163 +0,0 @@
#!/usr/bin/env python3.6
from logging import root
import pandas as pd
import matplotlib.pyplot as plt
from matplotlib import ticker
import numpy as np
import sys
import re
import os
import json
import libpar as par
import getopt
import math
import concurrent.futures as CF
def enum_files(rootdir : str, ret : list[str]):
if os.path.isfile(rootdir):
ret.append(rootdir)
return
for subdir in os.listdir(rootdir):
each_dir = os.path.join(rootdir, subdir)
enum_files(each_dir, ret)
def process_dir(rootdir: str, dat : dict[any]):
ret = []
if (("memloadgen" in rootdir) and ("sendfile" in rootdir)):
#print("Processing " + rootdir + "...")
segments = os.path.basename(rootdir).split("_")
server = segments[0]
affinity = segments[1].split(".")[1]
sendfile = segments[2].split(".")[1]
tls = segments[3].split(".")[1]
ktls = segments[4].split(".")[1]
memloadgen = segments[5].split(".")[1]
fs = segments[6].split(".")[1]
if int(affinity.split(",")[0]) <= 4:
if "2-int" in server:
affinity = "socket01"
else:
affinity = "socket00"
else:
if "2-int" in server:
affinity = "socket11"
else:
affinity = "socket10"
if "tmpfs" in fs:
fs = "tmpfs"
elif "nvdimm" in fs:
fs = "nvdimm"
elif "mnt" in fs:
fs = "ssd"
if sendfile == "True" and tls == "True" and ktls == "True":
mode = "ktls + sendfile"
elif sendfile == "True" and tls == "False" and ktls == "False":
mode = "sendfile"
elif sendfile == "False" and tls == "False" and ktls == "False":
mode = "write"
elif sendfile == "False" and tls == "True" and ktls == "False":
mode = "tls"
elif sendfile == "False" and tls == "True" and ktls == "True":
mode = "ktls"
if "icelake" in server:
server = "icelake"
elif "milan" in server:
server = "milan"
memloadgen_bps = []
logs = []
enum_files(rootdir, logs)
logs_bytes = []
for log in logs:
if log.endswith(".txt"):
with open(log, "r") as f:
buf = f.read()
if len(buf) > 0:
logs_bytes.append(buf)
else:
print("Warning: log file empty for " + log)
elif "memloadgen" in log:
with open(log, "r") as f:
buf = f.read()
if len(buf) > 0:
parser = par.memloadgen_parser(buf, 20, 30)
memloadgen_bps.append(parser.bps)
else:
print("Warning: memloadgen file empty for " + log)
try:
parser = par.iperf_json_parser(logs_bytes)
except Exception as e:
print("Warning: failed to parse " + log + ". err: " + str(e))
return
if (affinity,mode) not in dat[server][(memloadgen, fs)]:
dat[server][(memloadgen, fs)][(affinity,mode)] = []
dat[server][(memloadgen, fs)][(affinity,mode)].append((parser.aggregate_egress_bps, np.sum(memloadgen_bps)))
return
for subdir in os.listdir(rootdir):
each_dir = os.path.join(rootdir, subdir)
if not os.path.isfile(each_dir):
process_dir(each_dir, dat)
def main():
datdir = None
options = getopt.getopt(sys.argv[1:], 'd:')[0]
for opt, arg in options:
if opt in ('-d'):
datdir = arg
if datdir == None:
raise Exception("Must specify -d parameter")
dat = dict()
for a in ["icelake", "milan"]:
dat[a] = dict()
for c in ["nvdimm", "tmpfs", "ssd"]:
for b in ["0", "50", "False"]:
dat[a][(b, c)] = dict()
process_dir(datdir, dat)
for a in ["icelake", "milan"]:
data2 = dat[a]
print(a)
for c in ["tmpfs", "ssd"]:
for b in ["0", "50", "False"]:
print("memloadgen: " + b + ",storage: " + c)
data = data2[(b, c)]
print("affinity,write,sendfile,tls,ktls,ktls + sendfile")
if data == None:
print("N/A,N/A,N/A,N/A,N/A,N/A")
for affinity in ["socket00", "socket01", "socket10", "socket11"]:
line = f"{affinity},"
for mode in ["write", "sendfile", "tls", "ktls", "ktls + sendfile"]:
if (affinity, mode) not in data:
line += "N/A,"
else:
vals = data[(affinity, mode)]
real_vals = []
real_mlg = []
for i in range(0, len(vals)):
real_vals.append(vals[i][0] / 1024.0 / 1024.0 / 1024.0 / 8.0)
real_mlg.append(vals[i][1] / 1024.0 / 1024.0 / 1024.0)
line += "{:.2f} ({:.2f}) [{:.2f} ({:.2f})]".format(np.average(real_vals), np.std(real_vals),
np.average(real_mlg), np.std(real_mlg)) + ","
print(line)
print("")
print("")
if __name__ == "__main__":
main()

View File

@ -1,117 +0,0 @@
#!/usr/bin/env python3.6
import pandas as pd
import matplotlib.pyplot as plt
from matplotlib import ticker
import numpy as np
import os
import sys
import libpar as par
import getopt
import iperfconf as ic
#
# memloadgen vs max throughput
#
marker_map = ["o", "P", "s", "v", "*", "+", "^", "1", "2", "d", "X", "o", "P", "s", "v", "*", "+", "^", "1", "2", "d", "X"]
color_map = ["xkcd:black", "xkcd:red", "xkcd:blue", "xkcd:green", "xkcd:cyan", "xkcd:purple", "xkcd:orange", "xkcd:salmon", "xkcd:lightgreen", "xkcd:indigo", "xkcd:brown", "xkcd:bubblegum", "xkcd:lavender", "xkcd:maroon", "xkcd:fern", "xkcd:sky", "xkcd:orchid", "xkcd:sienna"]
parser_idx_labels = ["srv_hw", "srv_sw", "clt_hw", "clt_sw"]
def generate_graph(server_name : str, affinity : str, fn : str, dat : dict[ic.Conf, ic.ConfData]):
marker_idx = 0
color_idx = 0
ax = plt.gca()
ax.set_yscale("linear")
ax.set_title(server_name)
ax.set_xlabel("Memload Percent (%)")
ax.set_ylabel("Throughput (GB/s)")
ax.set_ylim(0, 13)
ax.xaxis.get_major_formatter().set_scientific(False)
ax.yaxis.set_minor_formatter(ticker.ScalarFormatter())
graph_data : dict(ic.Conf, ic.ConfData) = dict()
print(f"Generating graph => {server_name}_{affinity} ...")
for conf in dat:
if (conf.server == server_name) and (conf.affinity == affinity):
if conf not in graph_data:
graph_data[conf] = dat[conf]
else:
raise Exception("duplicate conf found!")
labels = []
xs = []
ys = []
errs = []
skip_list = []
# one curve per conf
each_conf : ic.Conf
for each_conf in graph_data:
if each_conf in skip_list:
continue
label = each_conf.to_string_graph()
bps = []
pct = []
err = []
other_conf : ic.Conf
for other_conf in graph_data:
if other_conf.equal_except_memload(each_conf):
bps.append(np.average(graph_data[other_conf].get_bps()) / 8 / 1024 / 1024 / 1024)
err.append(np.std(graph_data[other_conf].get_bps()) / 8 / 1024 / 1024 / 1024)
pct.append(int(other_conf.memloadgen))
skip_list.append(other_conf)
arg = np.argsort(pct)
labels.append(label)
xs.append(np.array(pct)[arg])
ys.append(np.array(bps)[arg])
errs.append(np.array(err)[arg])
arg = np.argsort(labels)
lsorted = np.array(labels)[arg]
xsorted = np.array(xs)[arg]
ysorted = np.array(ys)[arg]
esorted = np.array(errs)[arg]
for i in range(len(lsorted)):
marker_type = marker_map[marker_idx]
color_type = color_map[color_idx]
marker_idx += 1
color_idx += 1
ax.errorbar(x = xsorted[i], y = ysorted[i], yerr = esorted[i], xerr = None, label=lsorted[i], marker=marker_type, color=color_type, markersize=8)
plt.gcf().set_size_inches(23.4, 16.5)
ax.legend()
plt.savefig(fn, dpi=150)
plt.close()
def main():
datdir = None
options = getopt.getopt(sys.argv[1:], 'd:')[0]
for opt, arg in options:
if opt in ('-d'):
datdir = arg
if datdir == None:
raise Exception("Must specify -d parameter")
dat = dict()
ic.process_directory(datdir, dat)
tuples = set()
conf : ic.Conf
for conf in dat:
tuples.add((conf.server, conf.affinity))
for tup in tuples:
generate_graph(tup[0], tup[1], f"{datdir}/{tup[0]}_{tup[1]}.png", dat)
if __name__ == "__main__":
main()

View File

@ -1,397 +0,0 @@
import subprocess as sp
import time
import os
import datetime
import sys
import getopt
import numpy as np
import libpar as par
import libtc as tc
import iperfconf as ic
# definitions
file_dir : str = os.path.dirname(os.path.realpath(__file__))
root_dir : str = os.path.join(file_dir,"..")
LOG_FILEPATH = "/iperflogs"
BIN_PATH = "/iperftls"
MLG_PATH = "/numam/build/bin/memloadgen"
EXE_PATH = BIN_PATH + "/src/iperf3"
SSL_CERT = "/certs/server.crt"
SSL_PKEY = "/certs/server.key"
SERVER_PORT_START = 8050
MEMLOAD_BLKSZ = 1024*1024*64
RUNS=3
POWER9_CPULIST : ic.CPUList = ic.CPUList([88, 88])
# Shared Arguments
NUM_CORES = 12
NUM_CLIENTS = 3
# KTLS
KTLS_ARGS = [(False, False, False),
(True, False, False),
(False, True, False),
(False, True, True),
(True, True, True)]
MEMLOADGEN_ARGS_POWER9 = []
for i in range(0, 100, 10):
MEMLOADGEN_ARGS_POWER9.append([f"{POWER9_CPULIST.get_cpulist_by_offset(domain = 0, offset = 48, num = 10, stride = 4)}.1.{i}",
f"{POWER9_CPULIST.get_cpulist_by_offset(domain = 1, offset = 48, num = 10, stride = 4)}.0.{i}"])
MEMLOADGEN_ARGS_POWER9.append(None)
MEMLOADGEN_ARGS_POWER9 = [None]
POWER9_SERVER_AFFINITY_SOCKET0 = POWER9_CPULIST.get_cpulist_by_offset(domain = 0, offset = 2, num = 12, stride = 4)
POWER9_SERVER_AFFINITY_SOCKET1= POWER9_CPULIST.get_cpulist_by_offset(domain = 1, offset = 2, num = 12, stride = 4)
# file system
#FILE_PATHS = ["/tank/large_file_#p"]
#"/tmpfs1/small_files_#p",
FILE_PATHS = ["/tmpfs0/small_files_#p", "/tmpfs1/small_files_#p"]
# mmap
USE_MMAP = [False, True]
all_args : list[ic.ArgTypes] = []
# args start
arg_types : ic.ArgTypes = ic.ArgTypes()
#server
arg_types.add_arg(["power9-int.rcs.uwaterloo.ca"])
#clients
arg_types.add_arg(["icelake1-int.rcs.uwaterloo.ca", "milan1-int.rcs.uwaterloo.ca"])
#server_dat
arg_types.add_arg(["192.168.80.80"])
#clients_dat
arg_types.add_arg(["192.168.80.101", "192.168.80.103"])
#clients_affinity
arg_types.add_arg(["49,53,57,61,65,69", "65,69,73,77,81,85"])
# affinity
arg_types.add_arg(POWER9_SERVER_AFFINITY_SOCKET1)
# sendfile/tls/ktls
arg_types.add_args(*KTLS_ARGS)
# memloadgen
arg_types.add_arg(*MEMLOADGEN_ARGS_POWER9)
# filepath
arg_types.add_arg(*FILE_PATHS)
# ODIRECT
arg_types.add_arg(False)
# mmap
arg_types.add_arg(*USE_MMAP)
all_args.append(arg_types)
def parse_comma_list(input : str):
return input.split(",")
def run_setup_cmd(conf : ic.Conf, cmd : str):
ssrv : list[tuple[str, sp.Popen]] = []
tc.log_print(f"Running command on {conf.server[0]}...")
ssrv.append((conf.server[0], tc.remote_exec(conf.server, cmd, blocking=False, check=False)[0]))
for s in conf.clients:
tc.log_print(f"Running command on {s}...")
ssrv.append((s, tc.remote_exec([s], cmd, blocking=False, check=False)[0]))
for p in ssrv:
_ , stderr = p[1].communicate()
if p[1].returncode != 0:
print(f"\n{ p[0] } failed. stderr:\n{stderr.decode()}\n")
else:
print(f"\n{ p[0] } succeeded\n")
def setup_all(conf : ic.Conf):
setup_cmd : str = f'''
sudo pkg install -y openssl-devel vim curl wget gmake cmake openssl-devel llvm gcc rsync pkgconf isal-kmod ktls_isa-l_crypto-kmod;
sudo pkg remove -y iperf iperf3;
sudo rm -rf { BIN_PATH };
sudo mkdir -p { BIN_PATH };
sudo git clone https://git.quacker.org/d/iperf3-tls { BIN_PATH };
cd { BIN_PATH };
sudo git checkout dev;
sudo ./configure;
sudo make -j8;
sudo rm -rf /libtopo;
sudo mkdir -p /libtopo;
sudo git clone https://git.quacker.org/d/libtopo /libtopo;
cd /libtopo;
sudo mkdir build;
cd build;
sudo cmake ../;
sudo make install;
sudo rm -rf /numam;
sudo mkdir -p /numam;
sudo chmod 777 /numam;
'''
run_setup_cmd(conf, setup_cmd)
#rsync
all_clts = []
all_clts.extend(conf.clients)
all_clts.extend(conf.server)
dir = f"{os.path.dirname(__file__)}/../"
for clt in all_clts:
print("Syncing files to " + clt + "...")
rsync_cmd = f"rsync -az --no-perms --rsync-path=\"sudo rsync\" --omit-dir-times -e \"ssh -p77\" {dir} {tc.get_ssh_user()}@{clt}:/numam/"
sp.check_call(rsync_cmd, shell=True)
run_setup_cmd(conf, f'''
cd /numam;
sudo rm -rf build;
sudo mkdir -p build;
cd build;
sudo cmake ../;
sudo make memloadgen;
''')
def stop_all(conf : ic.Conf, clients_only = False):
# stop clients
tc.log_print("Stopping clients...")
tc.remote_exec(conf.clients, "sudo killall -9 iperf3; sudo killall -9 memloadgen", check=False)
if not clients_only:
tc.log_print("Stopping server...")
tc.remote_exec(conf.server, "sudo killall -9 iperf3; sudo killall -9 memloadgen", check=False)
def prepare_logdir(conf : ic.Conf):
tc.log_print("Preparing server log directory...")
prep_cmd = "sudo rm -rf " + LOG_FILEPATH
tc.log_print(prep_cmd)
tc.remote_exec(conf.server, prep_cmd, check=False)
time.sleep(0.1)
prep_cmd = "sudo mkdir -p " + LOG_FILEPATH
tc.log_print(prep_cmd)
tc.remote_exec(conf.server, prep_cmd, check=True)
def run_exp(conf : ic.Conf):
stop_all(conf)
while True:
prepare_logdir(conf)
ssrvs=[]
ssrv_names=[]
cur_srv_proc = 0
smlg = []
smlg_names = []
if conf.memloadgen != None:
for emem in conf.memloadgen:
mlg_cmd = "sudo "
mlg_cpu = emem.split(".")[0]
mlg_dom = emem.split(".")[1]
mlg_pct = emem.split(".")[2]
mlg_cmd += f"{MLG_PATH} -b {MEMLOAD_BLKSZ} -T {MEMLOAD_BLKSZ} -s {mlg_cpu} -d {mlg_dom} -o {LOG_FILEPATH}/memloadgen_{len(smlg)} -p {mlg_pct} -H 4 -w 17"
tc.log_print("Starting memloadgen...")
tc.log_print(mlg_cmd)
smlg.append(tc.remote_exec(conf.server, mlg_cmd, blocking=False)[0])
smlg_names.append("memloadgen")
time.sleep(0.1)
time.sleep(5)
for eaff in parse_comma_list(conf.affinity):
server_cmd = "sudo "
server_cmd += f"{EXE_PATH} -s -p " + str(SERVER_PORT_START + cur_srv_proc) + \
" -F " + conf.filepath.replace("#p", str(cur_srv_proc)) + \
" -A " + eaff
if conf.tls:
server_cmd += f" --enable-ssl-over-tcp --ssl-certificate {SSL_CERT} --ssl-private-key {SSL_PKEY}"
if conf.ktls:
server_cmd += " --enable-ssl-ktls"
if conf.odirect:
server_cmd += " --use-odirect"
if conf.mmap:
server_cmd += " --use-mmap"
server_cmd += " -J --logfile " + LOG_FILEPATH + "/" + eaff + ".txt"
# start server
tc.log_print("Starting server proc " + str(cur_srv_proc) + "...")
tc.log_print(server_cmd)
ssrv = tc.remote_exec(conf.server, server_cmd, blocking=False)[0]
ssrvs.append(ssrv)
ssrv_names.append("Server " + str(cur_srv_proc))
cur_srv_proc = cur_srv_proc + 1
time.sleep(0.1)
# start clients
tc.log_print("Starting clients...")
sclts = []
sclt_names = []
clt_number = 0
for i in range(len(conf.clients)):
client_aff = conf.clients_affinity[i]
for eaff in parse_comma_list(client_aff):
client_cmd = f"sudo {EXE_PATH} -c " + conf.server_dat[0] + \
" -p " + str(SERVER_PORT_START + clt_number) + \
" --connect-timeout 1000" + \
" -A " + eaff + \
" -t 25" + \
" -P 16" + \
" -R" + \
" -N" + \
" -4" + \
" -O 10"
if conf.tls:
client_cmd += f" --enable-ssl-over-tcp --ssl-certificate {SSL_CERT} --ssl-private-key {SSL_PKEY}"
client_cmd += " -J --logfile /dev/null"
tc.log_print(conf.clients[i] + ":\n" + client_cmd)
sclts.append(tc.remote_exec([conf.clients[i]], client_cmd, blocking=False)[0])
sclt_names.append(conf.clients[i] + "@" + eaff)
clt_number = clt_number + 1
time.sleep(0.1)
# launch stderr monitoring thread
exclude = ["Pseudo-terminal"]
tc.errthr_create(sclts, sclt_names, exclude)
tc.errthr_create(ssrvs, ssrv_names, exclude)
if (conf.memloadgen != None):
tc.errthr_create(smlg, smlg_names, exclude)
tc.errthr_start()
cur = 0
# selec = select.poll()
# selec.register(p.stdout, select.POLLIN)
success = False
while not success:
success = False
# either failed or timeout
# we use failure detection to save time for long durations
if tc.errthr_get_failed():
break
if cur >= 90:
tc.log_print("Experiment timed out. Restarting...")
break
# while selec.poll(1):
# print(p.stdout.readline())
success = True
for p in sclts:
if p.poll() == None:
success = False
break
time.sleep(1)
cur = cur + 1
tc.errthr_stop()
stop_all(conf, clients_only=True)
tc.log_print("Cooling down...")
time.sleep(3)
stop_all(conf)
if success:
if flush_netresult(conf):
break
def flush_netresult(conf : ic.Conf) -> bool:
tc.log_print("Keeping results...")
# copy log directory back to machine
log_output = tc.get_odir()
os.makedirs(log_output, exist_ok=True)
scp_cmd = "scp -P77 -r " + tc.get_ssh_user() + "@" + conf.server[0] + ":" + LOG_FILEPATH + " " + log_output + "/"
tc.log_print(scp_cmd)
sp.check_call(scp_cmd, shell=True)
# parse results
log_output = log_output + "/" + os.path.basename(LOG_FILEPATH)
logs = os.listdir(log_output)
logs_bytes = []
memload_bytes = []
for log in logs:
tc.log_print("Processing " + log + "...")
if os.path.isfile(log_output + "/" + log) and log.endswith(".txt"):
with open(log_output + "/" + log, "r") as f:
buf = f.read()
if len(buf) > 0:
logs_bytes.append(buf)
else:
tc.log_print("Warning: log file empty for " + log + ". Retrying...")
return False
if os.path.isfile(log_output + "/" + log) and ("memloadgen" in log):
with open(log_output + "/" + log, "r") as f:
memloadbuf = f.read()
if len(memloadbuf) == 0:
tc.log_print("Warning: log file empty for " + log + ". Retrying...")
return False
else:
memload_bytes.append(memloadbuf)
try:
parser = par.iperf_json_parser(logs_bytes)
bps = []
for ml in memload_bytes:
memparser = par.memloadgen_parser(ml, 22, 32)
bps.append(memparser.bps)
tc.log_print("Aggregated throughput: " + "{:.2f}".format(parser.aggregate_egress_bps / 8.0) + " B/s " + \
"{:.2f}".format(parser.aggregate_egress_bps / 8.0 / 1024.0 / 1024.0) + " MB/s " + \
"{:.2f}".format(parser.aggregate_egress_bps / 8.0 / 1024.0 / 1024.0 / 1024.0) + " GB/s" + \
" | Memload generator: " + "{:.2f}".format(np.sum(bps) / 1024.0 / 1024.0 / 1024.0) + " GB/s" )
except Exception as e:
tc.log_print("Warning: exception for parsing logs: " + str(e) + " restarting...")
scp_cmd = "sudo rm -rf " + log_output + "/*"
tc.log_print(scp_cmd)
sp.check_call(scp_cmd, shell=True)
return False
return True
def main():
tc.set_ssh_param("-o StrictHostKeyChecking=no -p77")
tc.set_ssh_user("oscar")
output_dirname = "run"
confs : list[ic.Conf] = []
for argtype in all_args:
args = argtype.get_fields()
for arg in args:
confs.append(ic.Conf(*arg))
name = None
options = getopt.getopt(sys.argv[1:], 'sSn:')[0]
for opt, arg in options:
if opt in ('-s'):
stop_all(confs[0])
return
elif opt in ('-S'):
setup_all(confs[0])
return
elif opt in ('-n'):
name = arg
if name != None:
output_dirname = name
tc.init("~/results.d/iperf3/" + output_dirname + "_" + datetime.datetime.now().strftime('%Y%m%d%H%M%S'))
cpcmd = "cp " + __file__ + " " + tc.get_odir() + "/"
tc.log_print(cpcmd)
sp.check_call(cpcmd, shell=True)
server_set = []
for conf in confs:
tc.log_print(conf.to_string(verbose=True))
if conf.server[0] not in server_set:
server_set.append(conf.server[0])
tc.log_print(f"{len(confs)} configs ({RUNS} * {len(confs)} = {RUNS * len(confs)} runs) scheduled:")
for server in server_set:
print(f"Gathering information on {server} (sysctl)...")
p : sp.Popen = tc.remote_exec([server], "sysctl -a", blocking=False, check=True)[0]
with open(f"{tc.get_odir()}/{server}.sysctl", "w") as f:
f.write(p.communicate()[0].decode())
print(f"Gathering information on {server} (ifconfig)...")
p : sp.Popen = tc.remote_exec([server], "ifconfig", blocking=False, check=True)[0]
with open(f"{tc.get_odir()}/{server}.ifconfig", "w") as f:
f.write(p.communicate()[0].decode())
print(f"Gathering information on {server} (mount)...")
p : sp.Popen = tc.remote_exec([server], "mount", blocking=False, check=True)[0]
with open(f"{tc.get_odir()}/{server}.mount", "w") as f:
f.write(p.communicate()[0].decode())
print(f"Gathering information on {server} (dmesg)...")
p : sp.Popen = tc.remote_exec([server], "sudo dmesg", blocking=False, check=True)[0]
with open(f"{tc.get_odir()}/{server}.dmesg", "w") as f:
f.write(p.communicate()[0].decode())
for conf in confs:
for i in range(0, RUNS):
tc.begin(conf.to_string()+f"_run.{i}")
run_exp(conf)
tc.end()
main()

View File

@ -1,454 +0,0 @@
import subprocess as sp
import time
import os
import datetime
import sys
import getopt
import numpy as np
import libpar as par
import libtc as tc
import iperfconf as ic
# definitions
file_dir : str = os.path.dirname(os.path.realpath(__file__))
root_dir : str = os.path.join(file_dir,"..")
LOG_FILEPATH = "/iperflogs"
BIN_PATH = "/iperftls"
MLG_PATH = "/numam/build/bin/memloadgen"
EXE_PATH = BIN_PATH + "/src/iperf3"
SSL_CERT = "/certs/server.crt"
SSL_PKEY = "/certs/server.key"
SERVER_PORT_START = 8050
MEMLOAD_BLKSZ = 1024*1024*64
RUNS=3
SANDYBRIDGE_CPULIST : ic.CPUList = ic.CPUList([16, 16])
SKYLAKE_CPULIST : ic.CPUList = ic.CPUList([24, 24])
MILAN_CPULIST : ic.CPUList = ic.CPUList([64, 64])
ICELAKE_CPULIST : ic.CPUList = ic.CPUList([48, 48])
# Shared Arguments
NUM_CORES = 12
NUM_CLIENTS = 3
# KTLS
KTLS_ARGS = [#(False, False, False),
#(True, False, False),
#(False, True, False),
(False, True, True),
(True, True, True)]
# MEMLOADGEN
MEMLOADGEN_ARGS_ICELAKE = []
for i in range(0, 100, 10):
MEMLOADGEN_ARGS_ICELAKE.append([f"{ICELAKE_CPULIST.get_cpulist_by_offset(domain = 1, offset = 3, num = NUM_CORES, stride = 4)}.0.{i}",
f"{ICELAKE_CPULIST.get_cpulist_by_offset(domain = 0, offset = 3, num = NUM_CORES, stride = 4)}.1.{i}"])
MEMLOADGEN_ARGS_ICELAKE.append(None)
MEMLOADGEN_ARGS_MILAN = []
for i in range(0, 100, 10):
MEMLOADGEN_ARGS_MILAN.append([f"{MILAN_CPULIST.get_cpulist_by_offset(domain = 1, offset = 3, num = NUM_CORES, stride = 4)}.0.{i}",
f"{MILAN_CPULIST.get_cpulist_by_offset(domain = 0, offset = 3, num = NUM_CORES, stride = 4)}.1.{i}"])
MEMLOADGEN_ARGS_MILAN.append(None)
MEMLOADGEN_ARGS_SKYLAKE = []
for i in range(0, 100, 10):
MEMLOADGEN_ARGS_SKYLAKE.append([f"{SKYLAKE_CPULIST.get_cpulist_by_offset(domain = 0, offset = 12, num = 6, stride = 2)}.1.{i}",
f"{SKYLAKE_CPULIST.get_cpulist_by_offset(domain = 1, offset = 12, num = 6, stride = 2)}.0.{i}"])
MEMLOADGEN_ARGS_SKYLAKE.append(None)
MEMLOADGEN_ARGS_SANDYBRIDGE = []
for i in range(0, 100, 10):
MEMLOADGEN_ARGS_SANDYBRIDGE.append([f"{SANDYBRIDGE_CPULIST.get_cpulist_by_offset(domain = 0, offset = 12, num = 4, stride = 1)}.1.{i}",
f"{SANDYBRIDGE_CPULIST.get_cpulist_by_offset(domain = 1, offset = 12, num = 4, stride = 1)}.0.{i}"])
MEMLOADGEN_ARGS_SANDYBRIDGE.append(None)
# client thread affinity
CLIENT_ASSIGNMENTS = ic.distribute_threads(NUM_CORES, 1, NUM_CLIENTS, 4)
CLIENT_AFFINITY = []
for i in range(NUM_CLIENTS):
CLIENT_AFFINITY.append(ic.list_to_comma_delimited(CLIENT_ASSIGNMENTS[i]))
# server thread affinity
ICELAKE_SERVER_AFFINITY_SOCKET0 = ICELAKE_CPULIST.get_cpulist_by_offset(domain = 0, offset = 1, num = NUM_CORES, stride = 4)
ICELAKE_SERVER_AFFINITY_SOCKET1 = ICELAKE_CPULIST.get_cpulist_by_offset(domain = 1, offset = 1, num = NUM_CORES, stride = 4)
MILAN_SERVER_AFFINITY_SOCKET0 = MILAN_CPULIST.get_cpulist_by_offset(domain = 0, offset = 1, num = NUM_CORES, stride = 4)
MILAN_SERVER_AFFINITY_SOCKET1= MILAN_CPULIST.get_cpulist_by_offset(domain = 1, offset = 1, num = NUM_CORES, stride = 4)
SKYLAKE_SERVER_AFFINITY_SOCKET0 = SKYLAKE_CPULIST.get_cpulist_by_offset(domain = 0, offset = 1, num = 8, stride = 2)
SKYLAKE_SERVER_AFFINITY_SOCKET1= SKYLAKE_CPULIST.get_cpulist_by_offset(domain = 1, offset = 1, num = 8, stride = 2)
SANDYBRIDGE_SERVER_AFFINITY_SOCKET0= SANDYBRIDGE_CPULIST.get_cpulist_by_offset(domain = 0, offset = 0, num = 8, stride = 2)
SANDYBRIDGE_SERVER_AFFINITY_SOCKET1= SANDYBRIDGE_CPULIST.get_cpulist_by_offset(domain = 1, offset = 0, num = 8, stride = 2)
# file system
#FILE_PATHS = ["/tank/large_file_#p"]
#"/tmpfs0/small_files_#p",
FILE_PATHS = ["/tmpfs0/small_files_#p", "/tmpfs1/small_files_#p"]
# mmap
USE_MMAP = [False, True]
all_args : list[ic.ArgTypes] = []
arg_types : ic.ArgTypes = ic.ArgTypes()
#server
arg_types.add_arg(["sandybridge4.rcs.uwaterloo.ca"])
#clients
arg_types.add_arg(["milan2-int.rcs.uwaterloo.ca", "milan1-int.rcs.uwaterloo.ca"])
#server_dat
arg_types.add_arg(["192.168.70.74"])
#clients_dat
arg_types.add_arg(["192.168.70.103", "192.168.70.104"])
#clients_affinity
arg_types.add_arg(["1,3,5", "1,3,5"])
# affinity
#arg_types.add_arg("17,19,21,23,25,27")
arg_types.add_arg("1,3,5,7,9,11")
# sendfile/tls/ktls
arg_types.add_args(*KTLS_ARGS)
# memloadgen
arg_types.add_arg(*MEMLOADGEN_ARGS_SANDYBRIDGE)
# filepath
arg_types.add_arg(*FILE_PATHS)
# ODIRECT
arg_types.add_arg(False)
# mmap
arg_types.add_arg(*USE_MMAP)
all_args.append(arg_types)
arg_types : ic.ArgTypes = ic.ArgTypes()
#server
arg_types.add_arg(["sandybridge4.rcs.uwaterloo.ca"])
#clients
arg_types.add_arg(["milan2-int.rcs.uwaterloo.ca", "milan1-int.rcs.uwaterloo.ca"])
#server_dat
arg_types.add_arg(["192.168.70.74"])
#clients_dat
arg_types.add_arg(["192.168.70.103", "192.168.70.104"])
#clients_affinity
arg_types.add_arg(["1,3,5", "1,3,5"])
# affinity
arg_types.add_arg("17,19,21,23,25,27")
# sendfile/tls/ktls
arg_types.add_args(*KTLS_ARGS)
# memloadgen
arg_types.add_arg(*MEMLOADGEN_ARGS_SANDYBRIDGE)
# filepath
arg_types.add_arg(*FILE_PATHS)
# ODIRECT
arg_types.add_arg(False)
# mmap
arg_types.add_arg(*USE_MMAP)
#all_args.append(arg_types)
def parse_comma_list(input : str):
return input.split(",")
def run_setup_cmd(conf : ic.Conf, cmd : str):
ssrv : list[tuple[str, sp.Popen]] = []
tc.log_print(f"Running command on {conf.server[0]}...")
ssrv.append((conf.server[0], tc.remote_exec(conf.server, cmd, blocking=False, check=False)[0]))
for s in conf.clients:
tc.log_print(f"Running command on {s}...")
ssrv.append((s, tc.remote_exec([s], cmd, blocking=False, check=False)[0]))
for p in ssrv:
_ , stderr = p[1].communicate()
if p[1].returncode != 0:
print(f"\n{ p[0] } failed. stderr:\n{stderr.decode()}\n")
else:
print(f"\n{ p[0] } succeeded\n")
def setup_all(conf : ic.Conf):
setup_cmd : str = f'''
sudo pkg install -y openssl-devel vim curl wget gmake cmake openssl-devel llvm gcc rsync pkgconf isal-kmod ktls_isa-l_crypto-kmod;
sudo pkg remove -y iperf iperf3;
sudo rm -rf { BIN_PATH };
sudo mkdir -p { BIN_PATH };
sudo git clone https://git.quacker.org/d/iperf3-tls { BIN_PATH };
cd { BIN_PATH };
sudo git checkout dev;
sudo ./configure;
sudo make -j8;
sudo rm -rf /libtopo;
sudo mkdir -p /libtopo;
sudo git clone https://git.quacker.org/d/libtopo /libtopo;
cd /libtopo;
sudo mkdir build;
cd build;
sudo cmake ../;
sudo make install;
sudo rm -rf /numam;
sudo mkdir -p /numam;
sudo chmod 777 /numam;
'''
run_setup_cmd(conf, setup_cmd)
#rsync
all_clts = []
all_clts.extend(conf.clients)
all_clts.extend(conf.server)
dir = f"{os.path.dirname(__file__)}/../"
for clt in all_clts:
print("Syncing files to " + clt + "...")
rsync_cmd = f"rsync -az --no-perms --rsync-path=\"sudo rsync\" --omit-dir-times -e \"ssh -p77\" {dir} {tc.get_ssh_user()}@{clt}:/numam/"
sp.check_call(rsync_cmd, shell=True)
run_setup_cmd(conf, f'''
cd /numam;
sudo rm -rf build;
sudo mkdir -p build;
cd build;
sudo cmake ../;
sudo make memloadgen;
''')
def stop_all(conf : ic.Conf, clients_only = False):
# stop clients
tc.log_print("Stopping clients...")
tc.remote_exec(conf.clients, "sudo killall -9 iperf3; sudo killall -9 memloadgen", check=False)
if not clients_only:
tc.log_print("Stopping server...")
tc.remote_exec(conf.server, "sudo killall -9 iperf3; sudo killall -9 memloadgen", check=False)
def prepare_logdir(conf : ic.Conf):
tc.log_print("Preparing server log directory...")
prep_cmd = "sudo rm -rf " + LOG_FILEPATH
tc.log_print(prep_cmd)
tc.remote_exec(conf.server, prep_cmd, check=False)
time.sleep(0.1)
prep_cmd = "sudo mkdir -p " + LOG_FILEPATH
tc.log_print(prep_cmd)
tc.remote_exec(conf.server, prep_cmd, check=True)
def run_exp(conf : ic.Conf):
stop_all(conf)
while True:
prepare_logdir(conf)
ssrvs=[]
ssrv_names=[]
cur_srv_proc = 0
smlg = []
smlg_names = []
if conf.memloadgen != None:
for emem in conf.memloadgen:
mlg_cmd = "sudo "
mlg_cpu = emem.split(".")[0]
mlg_dom = emem.split(".")[1]
mlg_pct = emem.split(".")[2]
mlg_cmd += f"{MLG_PATH} -b {MEMLOAD_BLKSZ} -T {MEMLOAD_BLKSZ} -s {mlg_cpu} -d {mlg_dom} -o {LOG_FILEPATH}/memloadgen_{len(smlg)} -p {mlg_pct} -H 4 -w 17"
tc.log_print("Starting memloadgen...")
tc.log_print(mlg_cmd)
smlg.append(tc.remote_exec(conf.server, mlg_cmd, blocking=False)[0])
smlg_names.append("memloadgen")
time.sleep(0.1)
time.sleep(5)
for eaff in parse_comma_list(conf.affinity):
server_cmd = "sudo "
server_cmd += f"{EXE_PATH} -s -p " + str(SERVER_PORT_START + cur_srv_proc) + \
" -F " + conf.filepath.replace("#p", str(cur_srv_proc)) + \
" -A " + eaff
if conf.tls:
server_cmd += f" --enable-ssl-over-tcp --ssl-certificate {SSL_CERT} --ssl-private-key {SSL_PKEY}"
if conf.ktls:
server_cmd += " --enable-ssl-ktls"
if conf.odirect:
server_cmd += " --use-odirect"
if conf.mmap:
server_cmd += " --use-mmap"
server_cmd += " -J --logfile " + LOG_FILEPATH + "/" + eaff + ".txt"
# start server
tc.log_print("Starting server proc " + str(cur_srv_proc) + "...")
tc.log_print(server_cmd)
ssrv = tc.remote_exec(conf.server, server_cmd, blocking=False)[0]
ssrvs.append(ssrv)
ssrv_names.append("Server " + str(cur_srv_proc))
cur_srv_proc = cur_srv_proc + 1
time.sleep(0.1)
# start clients
tc.log_print("Starting clients...")
sclts = []
sclt_names = []
clt_number = 0
for i in range(len(conf.clients)):
client_aff = conf.clients_affinity[i]
for eaff in parse_comma_list(client_aff):
client_cmd = f"sudo {EXE_PATH} -c " + conf.server_dat[0] + \
" -p " + str(SERVER_PORT_START + clt_number) + \
" --connect-timeout 1000" + \
" -A " + eaff + \
" -t 25" + \
" -P 16" + \
" -R" + \
" -N" + \
" -4" + \
" -O 10"
if conf.tls:
client_cmd += f" --enable-ssl-over-tcp --ssl-certificate {SSL_CERT} --ssl-private-key {SSL_PKEY}"
client_cmd += " -J --logfile /dev/null"
tc.log_print(conf.clients[i] + ":\n" + client_cmd)
sclts.append(tc.remote_exec([conf.clients[i]], client_cmd, blocking=False)[0])
sclt_names.append(conf.clients[i] + "@" + eaff)
clt_number = clt_number + 1
time.sleep(0.1)
# launch stderr monitoring thread
exclude = ["Pseudo-terminal"]
tc.errthr_create(sclts, sclt_names, exclude)
tc.errthr_create(ssrvs, ssrv_names, exclude)
if (conf.memloadgen != None):
tc.errthr_create(smlg, smlg_names, exclude)
tc.errthr_start()
cur = 0
# selec = select.poll()
# selec.register(p.stdout, select.POLLIN)
success = False
while not success:
success = False
# either failed or timeout
# we use failure detection to save time for long durations
if tc.errthr_get_failed():
break
if cur >= 90:
tc.log_print("Experiment timed out. Restarting...")
break
# while selec.poll(1):
# print(p.stdout.readline())
success = True
for p in sclts:
if p.poll() == None:
success = False
break
time.sleep(1)
cur = cur + 1
tc.errthr_stop()
stop_all(conf, clients_only=True)
tc.log_print("Cooling down...")
time.sleep(3)
stop_all(conf)
if success:
if flush_netresult(conf):
break
def flush_netresult(conf : ic.Conf) -> bool:
tc.log_print("Keeping results...")
# copy log directory back to machine
log_output = tc.get_odir()
os.makedirs(log_output, exist_ok=True)
scp_cmd = "scp -P77 -r " + tc.get_ssh_user() + "@" + conf.server[0] + ":" + LOG_FILEPATH + " " + log_output + "/"
tc.log_print(scp_cmd)
sp.check_call(scp_cmd, shell=True)
# parse results
log_output = log_output + "/" + os.path.basename(LOG_FILEPATH)
logs = os.listdir(log_output)
logs_bytes = []
memload_bytes = []
for log in logs:
tc.log_print("Processing " + log + "...")
if os.path.isfile(log_output + "/" + log) and log.endswith(".txt"):
with open(log_output + "/" + log, "r") as f:
buf = f.read()
if len(buf) > 0:
logs_bytes.append(buf)
else:
tc.log_print("Warning: log file empty for " + log + ". Retrying...")
return False
if os.path.isfile(log_output + "/" + log) and ("memloadgen" in log):
with open(log_output + "/" + log, "r") as f:
memloadbuf = f.read()
if len(memloadbuf) == 0:
tc.log_print("Warning: log file empty for " + log + ". Retrying...")
return False
else:
memload_bytes.append(memloadbuf)
try:
parser = par.iperf_json_parser(logs_bytes)
bps = []
for ml in memload_bytes:
memparser = par.memloadgen_parser(ml, 22, 32)
bps.append(memparser.bps)
tc.log_print("Aggregated throughput: " + "{:.2f}".format(parser.aggregate_egress_bps / 8.0) + " B/s " + \
"{:.2f}".format(parser.aggregate_egress_bps / 8.0 / 1024.0 / 1024.0) + " MB/s " + \
"{:.2f}".format(parser.aggregate_egress_bps / 8.0 / 1024.0 / 1024.0 / 1024.0) + " GB/s" + \
" | Memload generator: " + "{:.2f}".format(np.sum(bps) / 1024.0 / 1024.0 / 1024.0) + " GB/s" )
except Exception as e:
tc.log_print("Warning: exception for parsing logs: " + str(e) + " restarting...")
scp_cmd = "sudo rm -rf " + log_output + "/*"
tc.log_print(scp_cmd)
sp.check_call(scp_cmd, shell=True)
return False
return True
def main():
tc.set_ssh_param("-o StrictHostKeyChecking=no -p77")
tc.set_ssh_user("oscar")
output_dirname = "run"
confs : list[ic.Conf] = []
for argtype in all_args:
args = argtype.get_fields()
for arg in args:
confs.append(ic.Conf(*arg))
name = None
options = getopt.getopt(sys.argv[1:], 'sSn:')[0]
for opt, arg in options:
if opt in ('-s'):
stop_all(confs[0])
return
elif opt in ('-S'):
setup_all(confs[0])
return
elif opt in ('-n'):
name = arg
if name != None:
output_dirname = name
tc.init("~/results.d/iperf3/" + output_dirname + "_" + datetime.datetime.now().strftime('%Y%m%d%H%M%S'))
cpcmd = "cp " + __file__ + " " + tc.get_odir() + "/"
tc.log_print(cpcmd)
sp.check_call(cpcmd, shell=True)
server_set = []
for conf in confs:
tc.log_print(conf.to_string(verbose=True))
if conf.server[0] not in server_set:
server_set.append(conf.server[0])
tc.log_print(f"{len(confs)} configs ({RUNS} * {len(confs)} = {RUNS * len(confs)} runs) scheduled:")
for server in server_set:
print(f"Gathering information on {server} (sysctl)...")
p : sp.Popen = tc.remote_exec([server], "sysctl -a", blocking=False, check=True)[0]
with open(f"{tc.get_odir()}/{server}.sysctl", "w") as f:
f.write(p.communicate()[0].decode())
print(f"Gathering information on {server} (ifconfig)...")
p : sp.Popen = tc.remote_exec([server], "ifconfig", blocking=False, check=True)[0]
with open(f"{tc.get_odir()}/{server}.ifconfig", "w") as f:
f.write(p.communicate()[0].decode())
print(f"Gathering information on {server} (mount)...")
p : sp.Popen = tc.remote_exec([server], "mount", blocking=False, check=True)[0]
with open(f"{tc.get_odir()}/{server}.mount", "w") as f:
f.write(p.communicate()[0].decode())
print(f"Gathering information on {server} (dmesg)...")
p : sp.Popen = tc.remote_exec([server], "sudo dmesg", blocking=False, check=True)[0]
with open(f"{tc.get_odir()}/{server}.dmesg", "w") as f:
f.write(p.communicate()[0].decode())
for conf in confs:
for i in range(0, RUNS):
tc.begin(conf.to_string()+f"_run.{i}")
run_exp(conf)
tc.end()
main()

View File

@ -1,399 +0,0 @@
import subprocess as sp
import time
import os
import datetime
import sys
import getopt
import numpy as np
import libpar as par
import libtc as tc
import iperfconf as ic
# definitions
file_dir : str = os.path.dirname(os.path.realpath(__file__))
root_dir : str = os.path.join(file_dir,"..")
LOG_FILEPATH = "/iperflogs"
BIN_PATH = "/iperftls"
MLG_PATH = "/numam/build/bin/memloadgen"
EXE_PATH = BIN_PATH + "/src/iperf3"
SSL_CERT = "/certs/server.crt"
SSL_PKEY = "/certs/server.key"
SERVER_PORT_START = 8050
MEMLOAD_BLKSZ = 1024*1024*64
RUNS=3
SANDYBRIDGE_CPULIST : ic.CPUList = ic.CPUList([16, 16])
SKYLAKE_CPULIST : ic.CPUList = ic.CPUList([24, 24])
MILAN_CPULIST : ic.CPUList = ic.CPUList([64, 64])
ICELAKE_CPULIST : ic.CPUList = ic.CPUList([48, 48])
# Shared Arguments
NUM_CORES = 12
NUM_CLIENTS = 3
# KTLS
KTLS_ARGS = [#(False, False, False),
#(True, False, False),
#(False, True, False),
(False, True, True),
(True, True, True)]
MEMLOADGEN_ARGS_SKYLAKE = []
for i in range(0, 100, 10):
MEMLOADGEN_ARGS_SKYLAKE.append([f"{SKYLAKE_CPULIST.get_cpulist_by_offset(domain = 0, offset = 12, num = 6, stride = 2)}.1.{i}",
f"{SKYLAKE_CPULIST.get_cpulist_by_offset(domain = 1, offset = 12, num = 6, stride = 2)}.0.{i}"])
MEMLOADGEN_ARGS_SKYLAKE.append(None)
SKYLAKE_SERVER_AFFINITY_SOCKET0 = SKYLAKE_CPULIST.get_cpulist_by_offset(domain = 0, offset = 1, num = 6, stride = 2)
SKYLAKE_SERVER_AFFINITY_SOCKET1= SKYLAKE_CPULIST.get_cpulist_by_offset(domain = 1, offset = 1, num = 6, stride = 2)
# file system
#FILE_PATHS = ["/tank/large_file_#p"]
#"/tmpfs1/small_files_#p",
FILE_PATHS = ["/tmpfs1/small_files_#p", "/tmpfs0/small_files_#p"]
# mmap
USE_MMAP = [False, True]
all_args : list[ic.ArgTypes] = []
# args start
arg_types : ic.ArgTypes = ic.ArgTypes()
#server
arg_types.add_arg(["skylake3.rcs.uwaterloo.ca"])
#clients
arg_types.add_arg(["icelake2-int.rcs.uwaterloo.ca", "icelake1-int.rcs.uwaterloo.ca"])
#server_dat
arg_types.add_arg(["192.168.90.93"])
#clients_dat
arg_types.add_arg(["192.168.90.102", "192.168.90.101"])
#clients_affinity
arg_types.add_arg(["1,3,5", "1,3,5"])
# affinity
arg_types.add_arg(SKYLAKE_SERVER_AFFINITY_SOCKET1)
# sendfile/tls/ktls
arg_types.add_args(*KTLS_ARGS)
# memloadgen
arg_types.add_arg(*MEMLOADGEN_ARGS_SKYLAKE)
# filepath
arg_types.add_arg(*FILE_PATHS)
# ODIRECT
arg_types.add_arg(False)
# mmap
arg_types.add_arg(*USE_MMAP)
all_args.append(arg_types)
def parse_comma_list(input : str):
return input.split(",")
def run_setup_cmd(conf : ic.Conf, cmd : str):
ssrv : list[tuple[str, sp.Popen]] = []
tc.log_print(f"Running command on {conf.server[0]}...")
ssrv.append((conf.server[0], tc.remote_exec(conf.server, cmd, blocking=False, check=False)[0]))
for s in conf.clients:
tc.log_print(f"Running command on {s}...")
ssrv.append((s, tc.remote_exec([s], cmd, blocking=False, check=False)[0]))
for p in ssrv:
_ , stderr = p[1].communicate()
if p[1].returncode != 0:
print(f"\n{ p[0] } failed. stderr:\n{stderr.decode()}\n")
else:
print(f"\n{ p[0] } succeeded\n")
def setup_all(conf : ic.Conf):
setup_cmd : str = f'''
sudo pkg install -y openssl-devel vim curl wget gmake cmake openssl-devel llvm gcc rsync pkgconf isal-kmod ktls_isa-l_crypto-kmod;
sudo pkg remove -y iperf iperf3;
sudo rm -rf { BIN_PATH };
sudo mkdir -p { BIN_PATH };
sudo git clone https://git.quacker.org/d/iperf3-tls { BIN_PATH };
cd { BIN_PATH };
sudo git checkout dev;
sudo ./configure;
sudo make -j8;
sudo rm -rf /libtopo;
sudo mkdir -p /libtopo;
sudo git clone https://git.quacker.org/d/libtopo /libtopo;
cd /libtopo;
sudo mkdir build;
cd build;
sudo cmake ../;
sudo make install;
sudo rm -rf /numam;
sudo mkdir -p /numam;
sudo chmod 777 /numam;
'''
run_setup_cmd(conf, setup_cmd)
#rsync
all_clts = []
all_clts.extend(conf.clients)
all_clts.extend(conf.server)
dir = f"{os.path.dirname(__file__)}/../"
for clt in all_clts:
print("Syncing files to " + clt + "...")
rsync_cmd = f"rsync -az --no-perms --rsync-path=\"sudo rsync\" --omit-dir-times -e \"ssh -p77\" {dir} {tc.get_ssh_user()}@{clt}:/numam/"
sp.check_call(rsync_cmd, shell=True)
run_setup_cmd(conf, f'''
cd /numam;
sudo rm -rf build;
sudo mkdir -p build;
cd build;
sudo cmake ../;
sudo make memloadgen;
''')
def stop_all(conf : ic.Conf, clients_only = False):
# stop clients
tc.log_print("Stopping clients...")
tc.remote_exec(conf.clients, "sudo killall -9 iperf3; sudo killall -9 memloadgen", check=False)
if not clients_only:
tc.log_print("Stopping server...")
tc.remote_exec(conf.server, "sudo killall -9 iperf3; sudo killall -9 memloadgen", check=False)
def prepare_logdir(conf : ic.Conf):
tc.log_print("Preparing server log directory...")
prep_cmd = "sudo rm -rf " + LOG_FILEPATH
tc.log_print(prep_cmd)
tc.remote_exec(conf.server, prep_cmd, check=False)
time.sleep(0.1)
prep_cmd = "sudo mkdir -p " + LOG_FILEPATH
tc.log_print(prep_cmd)
tc.remote_exec(conf.server, prep_cmd, check=True)
def run_exp(conf : ic.Conf):
stop_all(conf)
while True:
prepare_logdir(conf)
ssrvs=[]
ssrv_names=[]
cur_srv_proc = 0
smlg = []
smlg_names = []
if conf.memloadgen != None:
for emem in conf.memloadgen:
mlg_cmd = "sudo "
mlg_cpu = emem.split(".")[0]
mlg_dom = emem.split(".")[1]
mlg_pct = emem.split(".")[2]
mlg_cmd += f"{MLG_PATH} -b {MEMLOAD_BLKSZ} -T {MEMLOAD_BLKSZ} -s {mlg_cpu} -d {mlg_dom} -o {LOG_FILEPATH}/memloadgen_{len(smlg)} -p {mlg_pct} -H 4 -w 17"
tc.log_print("Starting memloadgen...")
tc.log_print(mlg_cmd)
smlg.append(tc.remote_exec(conf.server, mlg_cmd, blocking=False)[0])
smlg_names.append("memloadgen")
time.sleep(0.1)
time.sleep(5)
for eaff in parse_comma_list(conf.affinity):
server_cmd = "sudo "
server_cmd += f"{EXE_PATH} -s -p " + str(SERVER_PORT_START + cur_srv_proc) + \
" -F " + conf.filepath.replace("#p", str(cur_srv_proc)) + \
" -A " + eaff
if conf.tls:
server_cmd += f" --enable-ssl-over-tcp --ssl-certificate {SSL_CERT} --ssl-private-key {SSL_PKEY}"
if conf.ktls:
server_cmd += " --enable-ssl-ktls"
if conf.odirect:
server_cmd += " --use-odirect"
if conf.mmap:
server_cmd += " --use-mmap"
server_cmd += " -J --logfile " + LOG_FILEPATH + "/" + eaff + ".txt"
# start server
tc.log_print("Starting server proc " + str(cur_srv_proc) + "...")
tc.log_print(server_cmd)
ssrv = tc.remote_exec(conf.server, server_cmd, blocking=False)[0]
ssrvs.append(ssrv)
ssrv_names.append("Server " + str(cur_srv_proc))
cur_srv_proc = cur_srv_proc + 1
time.sleep(0.1)
# start clients
tc.log_print("Starting clients...")
sclts = []
sclt_names = []
clt_number = 0
for i in range(len(conf.clients)):
client_aff = conf.clients_affinity[i]
for eaff in parse_comma_list(client_aff):
client_cmd = f"sudo {EXE_PATH} -c " + conf.server_dat[0] + \
" -p " + str(SERVER_PORT_START + clt_number) + \
" --connect-timeout 1000" + \
" -A " + eaff + \
" -t 25" + \
" -P 16" + \
" -R" + \
" -N" + \
" -4" + \
" -O 10"
if conf.tls:
client_cmd += f" --enable-ssl-over-tcp --ssl-certificate {SSL_CERT} --ssl-private-key {SSL_PKEY}"
client_cmd += " -J --logfile /dev/null"
tc.log_print(conf.clients[i] + ":\n" + client_cmd)
sclts.append(tc.remote_exec([conf.clients[i]], client_cmd, blocking=False)[0])
sclt_names.append(conf.clients[i] + "@" + eaff)
clt_number = clt_number + 1
time.sleep(0.1)
# launch stderr monitoring thread
exclude = ["Pseudo-terminal"]
tc.errthr_create(sclts, sclt_names, exclude)
tc.errthr_create(ssrvs, ssrv_names, exclude)
if (conf.memloadgen != None):
tc.errthr_create(smlg, smlg_names, exclude)
tc.errthr_start()
cur = 0
# selec = select.poll()
# selec.register(p.stdout, select.POLLIN)
success = False
while not success:
success = False
# either failed or timeout
# we use failure detection to save time for long durations
if tc.errthr_get_failed():
break
if cur >= 90:
tc.log_print("Experiment timed out. Restarting...")
break
# while selec.poll(1):
# print(p.stdout.readline())
success = True
for p in sclts:
if p.poll() == None:
success = False
break
time.sleep(1)
cur = cur + 1
tc.errthr_stop()
stop_all(conf, clients_only=True)
tc.log_print("Cooling down...")
time.sleep(3)
stop_all(conf)
if success:
if flush_netresult(conf):
break
def flush_netresult(conf : ic.Conf) -> bool:
tc.log_print("Keeping results...")
# copy log directory back to machine
log_output = tc.get_odir()
os.makedirs(log_output, exist_ok=True)
scp_cmd = "scp -P77 -r " + tc.get_ssh_user() + "@" + conf.server[0] + ":" + LOG_FILEPATH + " " + log_output + "/"
tc.log_print(scp_cmd)
sp.check_call(scp_cmd, shell=True)
# parse results
log_output = log_output + "/" + os.path.basename(LOG_FILEPATH)
logs = os.listdir(log_output)
logs_bytes = []
memload_bytes = []
for log in logs:
tc.log_print("Processing " + log + "...")
if os.path.isfile(log_output + "/" + log) and log.endswith(".txt"):
with open(log_output + "/" + log, "r") as f:
buf = f.read()
if len(buf) > 0:
logs_bytes.append(buf)
else:
tc.log_print("Warning: log file empty for " + log + ". Retrying...")
return False
if os.path.isfile(log_output + "/" + log) and ("memloadgen" in log):
with open(log_output + "/" + log, "r") as f:
memloadbuf = f.read()
if len(memloadbuf) == 0:
tc.log_print("Warning: log file empty for " + log + ". Retrying...")
return False
else:
memload_bytes.append(memloadbuf)
try:
parser = par.iperf_json_parser(logs_bytes)
bps = []
for ml in memload_bytes:
memparser = par.memloadgen_parser(ml, 22, 32)
bps.append(memparser.bps)
tc.log_print("Aggregated throughput: " + "{:.2f}".format(parser.aggregate_egress_bps / 8.0) + " B/s " + \
"{:.2f}".format(parser.aggregate_egress_bps / 8.0 / 1024.0 / 1024.0) + " MB/s " + \
"{:.2f}".format(parser.aggregate_egress_bps / 8.0 / 1024.0 / 1024.0 / 1024.0) + " GB/s" + \
" | Memload generator: " + "{:.2f}".format(np.sum(bps) / 1024.0 / 1024.0 / 1024.0) + " GB/s" )
except Exception as e:
tc.log_print("Warning: exception for parsing logs: " + str(e) + " restarting...")
scp_cmd = "sudo rm -rf " + log_output + "/*"
tc.log_print(scp_cmd)
sp.check_call(scp_cmd, shell=True)
return False
return True
def main():
tc.set_ssh_param("-o StrictHostKeyChecking=no -p77")
tc.set_ssh_user("oscar")
output_dirname = "run"
confs : list[ic.Conf] = []
for argtype in all_args:
args = argtype.get_fields()
for arg in args:
confs.append(ic.Conf(*arg))
name = None
options = getopt.getopt(sys.argv[1:], 'sSn:')[0]
for opt, arg in options:
if opt in ('-s'):
stop_all(confs[0])
return
elif opt in ('-S'):
setup_all(confs[0])
return
elif opt in ('-n'):
name = arg
if name != None:
output_dirname = name
tc.init("~/results.d/iperf3/" + output_dirname + "_" + datetime.datetime.now().strftime('%Y%m%d%H%M%S'))
cpcmd = "cp " + __file__ + " " + tc.get_odir() + "/"
tc.log_print(cpcmd)
sp.check_call(cpcmd, shell=True)
server_set = []
for conf in confs:
tc.log_print(conf.to_string(verbose=True))
if conf.server[0] not in server_set:
server_set.append(conf.server[0])
tc.log_print(f"{len(confs)} configs ({RUNS} * {len(confs)} = {RUNS * len(confs)} runs) scheduled:")
for server in server_set:
print(f"Gathering information on {server} (sysctl)...")
p : sp.Popen = tc.remote_exec([server], "sysctl -a", blocking=False, check=True)[0]
with open(f"{tc.get_odir()}/{server}.sysctl", "w") as f:
f.write(p.communicate()[0].decode())
print(f"Gathering information on {server} (ifconfig)...")
p : sp.Popen = tc.remote_exec([server], "ifconfig", blocking=False, check=True)[0]
with open(f"{tc.get_odir()}/{server}.ifconfig", "w") as f:
f.write(p.communicate()[0].decode())
print(f"Gathering information on {server} (mount)...")
p : sp.Popen = tc.remote_exec([server], "mount", blocking=False, check=True)[0]
with open(f"{tc.get_odir()}/{server}.mount", "w") as f:
f.write(p.communicate()[0].decode())
print(f"Gathering information on {server} (dmesg)...")
p : sp.Popen = tc.remote_exec([server], "sudo dmesg", blocking=False, check=True)[0]
with open(f"{tc.get_odir()}/{server}.dmesg", "w") as f:
f.write(p.communicate()[0].decode())
for conf in confs:
for i in range(0, RUNS):
tc.begin(conf.to_string()+f"_run.{i}")
run_exp(conf)
tc.end()
main()

View File

@ -1,309 +0,0 @@
import numpy as np
import os
import libpar as par
class IperfConfObj:
KTLS_MODE_SW = "sw"
KTLS_MODE_IFNET = "IFNET"
KTLS_MODE_NONE = "none"
KTLS_MODE_TOE = "toe"
STORAGE_NVDIMM = "nvdimm"
STORAGE_TMPFS = "tmpfs"
STORAGE_NVME = "nvme"
STORAGE_RAID_0 = "0"
STORAGE_RAID_NONE = "none"
SERVER_ICELAKE = "icelake"
SERVER_MILAN = "milan"
SERVER_SKYLAKE = "skylake"
NIC_INTEL_100 = "ice100"
NIC_CHELSIO_100 = "cc100"
NIC_MELLANOX_100 = "mlx100"
def __init__(self):
self.conf_odirect = False
self.conf_memloadgen_pct = 0
self.conf_memloadgen_num = 1
self.conf_memloadgen_threads = []
self.conf_ktls = False
self.conf_tls = False
self.conf_sendfile = False
self.conf_storage = self.STORAGE_TMPFS
self.conf_storage_domain = 0
self.conf_nic_domain = 0
self.conf_nic_name = self.NIC_INTEL_100
self.conf_ktls_mode = self.KTLS_MODE_NONE
self.conf_storage_raid = self.STORAGE_RAID_NONE
self.conf_storage_num = 1
self.conf_mmap = False
self.conf_server = self.SERVER_ICELAKE
self.num_clients = 1
self.server_threads = 1
self.server_threads_base = 0
self.server_threads_stride = 1
self.stat_memloadgen = 0
self.stat_throughput = 0
self.log_driver = ""
self.log_memloadgen = {}
self.log_iperf = {}
class Conf:
def __init__(self, server, clients, server_dat, clients_dat, clients_affinity, affinity, sendfile, tls, ktls, memloadgen, filepath, odirect, mmap):
self.affinity = affinity
self.ktls = ktls
self.sendfile = sendfile
self.tls = tls
self.odirect = odirect
self.memloadgen = memloadgen
self.filepath = filepath
self.server = server
self.clients = clients
self.server_dat = server_dat
self.clients_dat = clients_dat
self.clients_affinity = clients_affinity
self.mmap = mmap
def to_string(self, verbose = False):
ret = f"server.{self.server[0]}@{self.server_dat[0]}_affinity.{self.affinity}_sendfile.{self.sendfile}_tls.{self.tls}_ktls.{self.ktls}_"
if verbose:
ret += f"memloadgen.{False if self.memloadgen == None else (self.memloadgen[0] + '+' + self.memloadgen[1])}"
else:
ret += f"memloadgen.{False if self.memloadgen == None else (self.memloadgen[0].split('.')[2])}"
ret += f"_filepath.{self.filepath.replace('/','-').replace('_','-')}_odirect.{self.odirect}_mmap.{self.mmap}"
return ret
def to_string_graph(self):
ret = f"affinity.{self.affinity}_mode.{self.tls}_memloadgen.{self.memloadgen}_fs.{self.filepath}"
return ret
def __hash__(self):
return hash((self.server, self.filepath, self.odirect, self.memloadgen, self.tls, self.sendfile, self.ktls, self.affinity))
def __eq__(self, other):
return (self.server, self.filepath, self.odirect, self.memloadgen, self.tls, self.sendfile, self.ktls, self.affinity) == \
(other.server, other.filepath, other.odirect, other.memloadgen, other.tls, other.sendfile, other.ktls, other.affinity)
def equal_except_memload(self, other):
return (self.server, self.filepath, self.odirect, self.tls, self.sendfile, self.ktls, self.affinity) == \
(other.server, other.filepath, other.odirect, other.tls, other.sendfile, other.ktls, other.affinity)
def __ne__(self, other):
# Not strictly necessary, but to avoid having both x==y and x!=y
# True at the same time
return not (self == other)
class ConfData:
def __init__(self, conf : Conf):
self.conf = conf
self.bps = []
self.memload = []
def add_run(self, qps : float, memload : int):
self.bps.append(qps)
self.memload.append(memload)
def get_bps(self) -> list[float]:
return self.bps
def get_memload(self) -> list[int]:
return self.memload
class ArgTypes:
def __init__(self):
self.all = [[]]
def add_arg(self, *arg : any):
new_all = []
for val in arg:
for exst in self.all:
tmp = exst.copy()
tmp.append(val)
new_all.append(tmp)
self.all = new_all
def add_args(self, *args : tuple[any]):
new_all = []
for arg in args:
for exst in self.all:
tmp = exst.copy()
for val in arg:
tmp.append(val)
new_all.append(tmp)
self.all = new_all
def get_fields(self) -> list[list[any]]:
return self.all
class CPUList:
def __init__(self, cores : list[int]):
self._cores = cores
self._total_cores = np.sum(cores)
self._offset_lookup : list[int] = []
for i in range(len(cores)):
self._offset_lookup.append(0)
if (i == 0):
self._offset_lookup[i] = 0
else:
self._offset_lookup[i] = self._offset_lookup[i-1] + cores[i-1]
def get_all_cores(self) -> int:
return self._total_cores
def get_cores_in_domain(self, domain : int) -> int:
return self._cores[domain]
def cores_to_cpulist(self, cores : list[int]) -> str:
ret = ""
for i in range(len(cores)):
if (i > 0):
ret += ","
ret += str(cores[i])
return ret
def get_cpulist_by_offset(self, domain : int, offset : int, num : int, stride : int) -> str:
ret = self.get_cores_by_offset(domain, offset, num, stride)
return self.cores_to_cpulist(ret)
def get_cores_by_offset(self, domain : int, offset : int, num : int, stride : int) -> list[int]:
ret = []
offset = offset + self._offset_lookup[domain]
for _ in range(num):
ret.append(offset)
offset = offset + stride
return ret
def distribute_threads(num_threads : int, offset : int, num_clients : int, stride : int):
client_strs : list[list[int]] = []
for i in range(0, num_clients):
client_strs.append([])
for i in range(0, num_threads):
cur_client = i % num_clients
lst = client_strs[cur_client]
if (len(lst) == 0):
lst.append(offset)
else:
lst.append(lst[-1] + stride)
return client_strs
def list_to_comma_delimited(list : list[any]):
ret = ""
for i in range(0, len(list)):
if (i > 0):
ret += ","
ret += str(list[i])
return ret
def _enum_files(rootdir : str, ret : list[str]):
if os.path.isfile(rootdir):
ret.append(rootdir)
return
for subdir in os.listdir(rootdir):
each_dir = os.path.join(rootdir, subdir)
_enum_files(each_dir, ret)
def process_directory(dir: str, dat : dict[Conf, ConfData]):
if (("memloadgen" in dir) and ("sendfile" in dir)):
segments = os.path.basename(dir).split("_")
server = segments[0]
affinity = segments[1].split(".")[1]
sendfile = segments[2].split(".")[1]
tls = segments[3].split(".")[1]
ktls = segments[4].split(".")[1]
memloadgen = segments[5].split(".")[1]
fs = segments[6].split(".")[1]
# cleanup data
if int(affinity.split(",")[0]) <= 8:
if "2-int" in server:
affinity = "socket01"
else:
affinity = "socket00"
else:
if "2-int" in server:
affinity = "socket11"
else:
affinity = "socket10"
if "tmpfs" in fs:
fs = "tmpfs"
elif "nvdimm" in fs:
fs = "nvdimm"
elif "mnt" in fs:
fs = "ssd"
if memloadgen == "False":
memloadgen = "0"
elif memloadgen == "0":
memloadgen = "100"
if sendfile == "True" and tls == "True" and ktls == "True":
mode = "ktls+sendfile"
elif sendfile == "True" and tls == "False" and ktls == "False":
mode = "sendfile"
elif sendfile == "False" and tls == "False" and ktls == "False":
mode = "write"
elif sendfile == "False" and tls == "True" and ktls == "False":
mode = "tls"
elif sendfile == "False" and tls == "True" and ktls == "True":
mode = "ktls"
if "icelake" in server:
server = "icelake"
elif "milan" in server:
server = "milan"
conf : Conf = Conf(server, None, None, None, None, affinity, mode, mode, mode, memloadgen, fs, None, None)
# conf.server = server
# conf.affinity = affinity
# conf.sendfile = mode
# conf.tls = mode
# conf.ktls = mode
# conf.memloadgen = memloadgen
# conf.filepath = fs
# conf.odirect = ""
print("Processing category " + conf.to_string_graph() + " ... ")
memloadgen_bps = []
logs = []
_enum_files(dir, logs)
logs_bytes = []
for log in logs:
if log.endswith(".txt"):
with open(log, "r") as f:
buf = f.read()
if len(buf) > 0:
logs_bytes.append(buf)
else:
print("Warning: log file empty for " + log)
elif "memloadgen" in log:
with open(log, "r") as f:
buf = f.read()
if len(buf) > 0:
parser = par.memloadgen_parser(buf, 20, 30)
memloadgen_bps.append(parser.bps)
else:
print("Warning: memloadgen file empty for " + log)
try:
parser = par.iperf_json_parser(logs_bytes)
except Exception as e:
print("Warning: failed to parse " + log + ". err: " + str(e))
return
if not (conf in dat):
dat[conf] = ConfData(conf)
dat[conf].add_run(parser.aggregate_egress_bps, memloadgen_bps)
return
for subdir in os.listdir(dir):
each_dir = os.path.join(dir, subdir)
if not os.path.isfile(each_dir):
process_directory(each_dir, dat)

View File

@ -1,4 +1,3 @@
from lib2to3.refactor import get_fixers_from_package
import subprocess as sp
import time
import select

View File

@ -1,6 +0,0 @@
sudo mkdir -p /mnt/zroot
sudo fsck /dev/$1
mount -t ufs /dev/$1 /mnt/zroot
sudo mkdir -p /tmpfs
mount -t tmpfs tmpfs /tmpfs
cp /mnt/zroot/large* /tmpfs/

View File

@ -1,20 +0,0 @@
sudo mkdir -p /mnt/zroot
sudo fsck /dev/$1
mount -t ufs /dev/$1 /mnt/zroot
sudo mkdir -p /tmpfs0
sudo mkdir -p /tmpfs1
sysctl vfs.tmpfs.domain_target=0
sysctl vfs.tmpfs.domain_policy=3
mount -t tmpfs tmpfs /tmpfs0
cp /mnt/zroot/small_file_0_* /tmpfs0/
sysctl vfs.tmpfs.domain_target=1
sysctl vfs.tmpfs.domain_policy=3
mount -t tmpfs tmpfs /tmpfs1
cp /mnt/zroot/small_file_1_* /tmpfs1/
sysctl vfs.tmpfs.domain_target=0
sysctl vfs.tmpfs.domain_policy=1