diff --git a/inc/net/netsup.hh b/inc/net/netsup.hh index f80de9a..fb803e4 100644 --- a/inc/net/netsup.hh +++ b/inc/net/netsup.hh @@ -10,7 +10,7 @@ struct device_conf { int portid; uint16_t tx_ring_sz; uint16_t rx_ring_sz; - int num_threads; + cpuset_t core_affinity; int mtu; uint64_t rx_offloads; uint64_t tx_offloads; diff --git a/inc/net/pkt.hh b/inc/net/pkt.hh index 16baf35..f56cce2 100644 --- a/inc/net/pkt.hh +++ b/inc/net/pkt.hh @@ -46,7 +46,7 @@ const static struct rte_ether_addr POU_MAC { 0x01, 0x00, 0x5e, 0x00, 0x01, 0x81 }; const static uint32_t POU_IP = RTE_IPV4(224, 0, 1, 129); -const static uint16_t POU_PORT = 319; +const static uint16_t POU_PORT = 320; /* Khat Protocol: * khat only processes two kinds of packets - LOAD and PROBE * rat: diff --git a/net/cat.cc b/net/cat.cc index 4e10804..ac1b80a 100644 --- a/net/cat.cc +++ b/net/cat.cc @@ -1,4 +1,3 @@ -#include #include #include #include @@ -608,6 +607,8 @@ pkt_loop() (void *)tx_buf, epoch); } + rte_pktmbuf_free(tx_buf); + read_tx = false; recv_resp = false; recv_stat = false; @@ -867,8 +868,15 @@ main(int argc, char *argv[]) "main: timesync disabled. hw timestamp unavailable.\n "); } + if (CPU_COUNT(&options.cpu_set) > 1) { + int ffs = CPU_FFS(&options.cpu_set); + CPU_ZERO(&options.cpu_set); + CPU_SET(ffs - 1, &options.cpu_set); + ntr(NTR_DEP_USER1, NTR_LEVEL_WARNING, "cat only supports one thread, using only core %d.\n", ffs - 1); + } + dconf.mtu = MAX_STANDARD_MTU; - dconf.num_threads = 1; + CPU_COPY(&options.cpu_set, &dconf.core_affinity); dconf.portid = options.portid; dconf.rss_hf = pconf.rss_hf; dconf.rx_offloads = pconf.rxoffload; diff --git a/net/khat.cc b/net/khat.cc index 10602b0..bf65965 100644 --- a/net/khat.cc +++ b/net/khat.cc @@ -415,56 +415,55 @@ locore_main(void *ti) assert(options.is_probing.load()); struct timespec ts { }; struct pkt_payload_stat *stat; - + int status = 0; if (options.s_hwtimestamp) { - if (rte_eth_timesync_read_tx_timestamp( - options.portid, &ts) == 0) { + if ((status = rte_eth_timesync_read_tx_timestamp( + options.portid, &ts)) == 0) { ntr(NTR_DEP_USER1, NTR_LEVEL_DEBUG, "locore_main : obtained hw tx timestamp %lu.\n", tinfo->tid, (ts.tv_sec * S2NS + ts.tv_nsec)); } else { ntr(NTR_DEP_USER1, NTR_LEVEL_DEBUG, - "locore_main : failed to obtain hw tx timestamp.\n", - tinfo->tid); - pending_probe = false; - goto end_stat; + "locore_main : failed to obtain hw tx timestamp: %d.\n", + tinfo->tid, status); } } - // now we have everything we need + if (status == 0) { + // now we have everything we need - if (alloc_pkt_hdr(mempool_get(tinfo->node_id), - PKT_TYPE_STAT, &options.s_probe_info.cspec, 0, - &pkt_buf, &tx_data) != 0) { - rte_exit(EXIT_FAILURE, - "failed to alloc pkt_buf\n"); + if (alloc_pkt_hdr(mempool_get(tinfo->node_id), + PKT_TYPE_STAT, &options.s_probe_info.cspec, 0, + &pkt_buf, &tx_data) != 0) { + rte_exit(EXIT_FAILURE, + "failed to alloc pkt_buf\n"); + } + + // populate stats + stat = (struct pkt_payload_stat *)tx_data->payload; + stat->epoch = rte_cpu_to_be_32( + options.s_probe_info.epoch); + if (options.s_hwtimestamp) { + stat->hw_rx = rte_cpu_to_be_64( + options.s_probe_info.last_hw_rx); + stat->hw_tx = rte_cpu_to_be_64( + ts.tv_nsec + ts.tv_sec * S2NS); + } else { + stat->hw_rx = 0; + stat->hw_tx = 0; + } + stat->sw_rx = rte_cpu_to_be_64( + options.s_probe_info.last_sw_rx); + stat->sw_tx = rte_cpu_to_be_64( + options.s_probe_info.last_sw_tx); + + // send the packet + tx_burst_all(options.portid, tinfo->txqid, &pkt_buf, 1); + + // release flux + pending_probe = false; + options.is_probing.store(false); } - - // populate stats - stat = (struct pkt_payload_stat *)tx_data->payload; - stat->epoch = rte_cpu_to_be_32( - options.s_probe_info.epoch); - if (options.s_hwtimestamp) { - stat->hw_rx = rte_cpu_to_be_64( - options.s_probe_info.last_hw_rx); - stat->hw_tx = rte_cpu_to_be_64( - ts.tv_nsec + ts.tv_sec * S2NS); - } else { - stat->hw_rx = 0; - stat->hw_tx = 0; - } - stat->sw_rx = rte_cpu_to_be_64( - options.s_probe_info.last_sw_rx); - stat->sw_tx = rte_cpu_to_be_64( - options.s_probe_info.last_sw_tx); - - // send the packet - tx_burst_all(options.portid, tinfo->txqid, &pkt_buf, 1); - - end_stat: - // release flux - pending_probe = false; - options.is_probing.store(false); } } } @@ -607,7 +606,7 @@ main(int argc, char *argv[]) options.s_hwtimestamp = false; } dconf.mtu = options.port_mtu; - dconf.num_threads = options.num_threads; + CPU_COPY(&options.cpu_set, &dconf.core_affinity); dconf.portid = options.portid; dconf.rss_hf = pconf.rss_hf; dconf.rx_offloads = pconf.rxoffload; diff --git a/net/libnetsup/dpdk.cc b/net/libnetsup/dpdk.cc index f2a5e92..312d0c3 100644 --- a/net/libnetsup/dpdk.cc +++ b/net/libnetsup/dpdk.cc @@ -66,6 +66,7 @@ port_init(struct device_conf *dconf) }; int ret; + int num_threads = CPU_COUNT(&dconf->core_affinity); if (rte_eth_dev_count_avail() == 0) { rte_exit(EXIT_FAILURE, "number of ports must be > 0\n"); } @@ -93,8 +94,7 @@ port_init(struct device_conf *dconf) port_conf.txmode.offloads = dconf->tx_offloads; /* Configure the Ethernet device. */ - ret = rte_eth_dev_configure( - dconf->portid, dconf->num_threads, dconf->num_threads, &port_conf); + ret = rte_eth_dev_configure(dconf->portid, num_threads, num_threads, &port_conf); if (ret != 0) rte_exit(EXIT_FAILURE, "failed to configure port: %d\n", ret); @@ -105,24 +105,26 @@ port_init(struct device_conf *dconf) /* Allocate and set up 1 RX queue per thread per Ethernet port. */ rxconf = dev_info.default_rxconf; rxconf.offloads = port_conf.rxmode.offloads; - for (int i = 0; i < dconf->num_threads; i++) { - ret = rte_eth_rx_queue_setup(dconf->portid, i, dconf->rx_ring_sz, - rte_eth_dev_socket_id(dconf->portid), &rxconf, - mempool_get(rte_eth_dev_socket_id(dconf->portid))); - - if (ret < 0) - rte_exit(EXIT_FAILURE, "failed to setup rx queue %d: %d\n", i, ret); - } - - /* Allocate and set up 1 TX queue per thread per Ethernet port. */ + rxconf.rx_nseg = 0; + rxconf.rx_seg = nullptr; + rxconf.rx_nmempool = 0; txconf = dev_info.default_txconf; txconf.offloads = port_conf.txmode.offloads; - for (int i = 0; i < dconf->num_threads; i++) { - ret = rte_eth_tx_queue_setup( - dconf->portid, i, dconf->tx_ring_sz, rte_eth_dev_socket_id(dconf->portid), - &txconf); + + int core; + int qid = 0; + CPU_FOREACH_ISSET(core, &dconf->core_affinity) { + int socket = rte_lcore_to_socket_id(core); + ntr(NTR_DEP_USER1, NTR_LEVEL_INFO, "port_init: setting up rx & tx queue for core %d (socket %d)...\n", core, socket); + ret = rte_eth_rx_queue_setup(dconf->portid, qid, dconf->rx_ring_sz, socket, &rxconf, mempool_get(socket)); if (ret < 0) - rte_exit(EXIT_FAILURE, "failed to setup tx queue %d: %d", i, ret); + rte_exit(EXIT_FAILURE, "failed to setup rx queue for core %d: %d\n", core, ret); + + ret = rte_eth_tx_queue_setup(dconf->portid, qid, dconf->tx_ring_sz, socket, &txconf); + if (ret < 0) + rte_exit(EXIT_FAILURE, "failed to setup tx queue for core %d: %d", core, ret); + + qid++; } // set mtu @@ -145,19 +147,15 @@ port_init(struct device_conf *dconf) if (ret != 0) rte_exit(EXIT_FAILURE, "failed to enable promiscuous mode: %d\n", ret); - for (int i = 0; i < dconf->num_threads; i++) { + for (int i = 0; i < num_threads; i++) { if (dconf->tx_fn != nullptr) { - if (rte_eth_add_tx_callback(dconf->portid, - i, dconf->tx_fn, - dconf->tx_user) == nullptr) { + if (rte_eth_add_tx_callback(dconf->portid, i, dconf->tx_fn, dconf->tx_user) == nullptr) { rte_exit(EXIT_FAILURE, "failed to attach callback to tx queue %d\n", i); } } if (dconf->rx_fn != nullptr) { - if (rte_eth_add_rx_callback(dconf->portid, - i, dconf->rx_fn, - dconf->rx_user) == nullptr) { + if (rte_eth_add_rx_callback(dconf->portid, i, dconf->rx_fn, dconf->rx_user) == nullptr) { rte_exit(EXIT_FAILURE, "failed to attach callback to rx queue %d\n", i); } } diff --git a/net/libnetsup/portconf.cc b/net/libnetsup/portconf.cc index 92031bd..0cc9026 100644 --- a/net/libnetsup/portconf.cc +++ b/net/libnetsup/portconf.cc @@ -22,7 +22,7 @@ static struct port_conf port_confs[] = { .rxoffload = RTE_ETH_RX_OFFLOAD_RSS_HASH | RTE_ETH_RX_OFFLOAD_UDP_CKSUM | RTE_ETH_RX_OFFLOAD_IPV4_CKSUM | RTE_ETH_RX_OFFLOAD_TIMESTAMP, .txoffload = RTE_ETH_TX_OFFLOAD_MBUF_FAST_FREE | RTE_ETH_TX_OFFLOAD_UDP_CKSUM | RTE_ETH_TX_OFFLOAD_IPV4_CKSUM, .rss_hf = RTE_ETH_RSS_FRAG_IPV4 | RTE_ETH_RSS_NONFRAG_IPV4_UDP | RTE_ETH_RSS_NONFRAG_IPV4_OTHER | RTE_ETH_RSS_L2_PAYLOAD, - .timesync = true + .timesync = false }, { .driver_name = "net_ixgbe", diff --git a/net/rat.cc b/net/rat.cc index 188de05..d82e5e7 100644 --- a/net/rat.cc +++ b/net/rat.cc @@ -782,7 +782,7 @@ main(int argc, char *argv[]) "main: timesync disabled. hw timestamp unavailable.\n "); } dconf.mtu = options.port_mtu; - dconf.num_threads = options.s_num_threads; + CPU_COPY(&options.cpu_set, &dconf.core_affinity); dconf.portid = options.portid; dconf.rss_hf = pconf.rss_hf; dconf.rx_offloads = pconf.rxoffload; diff --git a/scripts/iperf.py b/scripts/iperf.py deleted file mode 100644 index 9072982..0000000 --- a/scripts/iperf.py +++ /dev/null @@ -1,609 +0,0 @@ -import subprocess as sp -import time -import os -import datetime -import sys -import getopt -import numpy as np - -import libpar as par -import libtc as tc -import iperfconf as ic - -# definitions -file_dir : str = os.path.dirname(os.path.realpath(__file__)) -root_dir : str = os.path.join(file_dir,"..") -LOG_FILEPATH = "/iperflogs" -BIN_PATH = "/iperftls" -MLG_PATH = "/numam/build/bin/memloadgen" -EXE_PATH = BIN_PATH + "/src/iperf3" -SSL_CERT = "/certs/server.crt" -SSL_PKEY = "/certs/server.key" -SERVER_PORT_START = 8050 -MEMLOAD_BLKSZ = 1024*1024*64 -RUNS=3 -SANDYBRIDGE_CPULIST : ic.CPUList = ic.CPUList([16, 16]) -SKYLAKE_CPULIST : ic.CPUList = ic.CPUList([24, 24]) -MILAN_CPULIST : ic.CPUList = ic.CPUList([64, 64]) -ICELAKE_CPULIST : ic.CPUList = ic.CPUList([48, 48]) - -# Shared Arguments -NUM_CORES = 12 -NUM_CLIENTS = 3 -# KTLS -KTLS_ARGS = [(False, False, False), - (True, False, False), - (False, True, False), - (False, True, True), - (True, True, True)] - -# MEMLOADGEN -MEMLOADGEN_ARGS_ICELAKE = [] -for i in range(0, 100, 10): - MEMLOADGEN_ARGS_ICELAKE.append([f"{ICELAKE_CPULIST.get_cpulist_by_offset(domain = 1, offset = 3, num = NUM_CORES, stride = 4)}.0.{i}", - f"{ICELAKE_CPULIST.get_cpulist_by_offset(domain = 0, offset = 3, num = NUM_CORES, stride = 4)}.1.{i}"]) -MEMLOADGEN_ARGS_ICELAKE.append(None) - -MEMLOADGEN_ARGS_MILAN = [] -for i in range(0, 100, 10): - MEMLOADGEN_ARGS_MILAN.append([f"{MILAN_CPULIST.get_cpulist_by_offset(domain = 1, offset = 3, num = NUM_CORES, stride = 4)}.0.{i}", - f"{MILAN_CPULIST.get_cpulist_by_offset(domain = 0, offset = 3, num = NUM_CORES, stride = 4)}.1.{i}"]) -MEMLOADGEN_ARGS_MILAN.append(None) - -MEMLOADGEN_ARGS_SKYLAKE = [] -for i in range(0, 100, 25): - MEMLOADGEN_ARGS_SKYLAKE.append([f"{SKYLAKE_CPULIST.get_cpulist_by_offset(domain = 0, offset = 12, num = 6, stride = 2)}.1.{i}", - f"{SKYLAKE_CPULIST.get_cpulist_by_offset(domain = 1, offset = 12, num = 6, stride = 2)}.0.{i}"]) -MEMLOADGEN_ARGS_SKYLAKE.append(None) - -MEMLOADGEN_ARGS_SANDYBRIDGE = [] -for i in range(0, 100, 25): - MEMLOADGEN_ARGS_SANDYBRIDGE.append([f"{SANDYBRIDGE_CPULIST.get_cpulist_by_offset(domain = 0, offset = 1, num = 6, stride = 2)}.1.{i}", - f"{SKYLAKE_CPULIST.get_cpulist_by_offset(domain = 1, offset = 1, num = 6, stride = 2)}.0.{i}"]) -MEMLOADGEN_ARGS_SANDYBRIDGE.append(None) - -# client thread affinity -CLIENT_ASSIGNMENTS = ic.distribute_threads(NUM_CORES, 1, NUM_CLIENTS, 4) -CLIENT_AFFINITY = [] -for i in range(NUM_CLIENTS): - CLIENT_AFFINITY.append(ic.list_to_comma_delimited(CLIENT_ASSIGNMENTS[i])) -# server thread affinity -ICELAKE_SERVER_AFFINITY_SOCKET0 = ICELAKE_CPULIST.get_cpulist_by_offset(domain = 0, offset = 1, num = NUM_CORES, stride = 4) -ICELAKE_SERVER_AFFINITY_SOCKET1 = ICELAKE_CPULIST.get_cpulist_by_offset(domain = 1, offset = 1, num = NUM_CORES, stride = 4) -MILAN_SERVER_AFFINITY_SOCKET0 = MILAN_CPULIST.get_cpulist_by_offset(domain = 0, offset = 1, num = NUM_CORES, stride = 4) -MILAN_SERVER_AFFINITY_SOCKET1= MILAN_CPULIST.get_cpulist_by_offset(domain = 1, offset = 1, num = NUM_CORES, stride = 4) -SKYLAKE_SERVER_AFFINITY_SOCKET0 = SKYLAKE_CPULIST.get_cpulist_by_offset(domain = 0, offset = 1, num = 8, stride = 2) -SKYLAKE_SERVER_AFFINITY_SOCKET1= SKYLAKE_CPULIST.get_cpulist_by_offset(domain = 1, offset = 1, num = 8, stride = 2) -SANDYBRIDGE_SERVER_AFFINITY_SOCKET0= SANDYBRIDGE_CPULIST.get_cpulist_by_offset(domain = 0, offset = 1, num = 8, stride = 2) -SANDYBRIDGE_SERVER_AFFINITY_SOCKET1= SANDYBRIDGE_CPULIST.get_cpulist_by_offset(domain = 1, offset = 1, num = 8, stride = 4) - -# file system -#FILE_PATHS = ["/tank/large_file_#p"] -FILE_PATHS = ["/tmpfs1/small_file_1_#p"] -# mmap -USE_MMAP = [False, True] -all_args : list[ic.ArgTypes] = [] - -# args start -arg_types : ic.ArgTypes = ic.ArgTypes() -#server -arg_types.add_arg(["icelake1-int.rcs.uwaterloo.ca"]) -#clients -arg_types.add_arg(["icelake2-int.rcs.uwaterloo.ca", "milan1-int.rcs.uwaterloo.ca"]) -#server_dat -arg_types.add_arg(["192.168.100.101"]) -#clients_dat -arg_types.add_arg(["192.168.100.102", "192.168.100.103"]) -#clients_affinity -arg_types.add_arg(CLIENT_AFFINITY) -# affinity -arg_types.add_arg(ICELAKE_SERVER_AFFINITY_SOCKET1, ICELAKE_SERVER_AFFINITY_SOCKET0) -# sendfile/tls/ktls -arg_types.add_args(*KTLS_ARGS) -# memloadgen -arg_types.add_arg(*MEMLOADGEN_ARGS_ICELAKE) -# filepath -arg_types.add_arg(*FILE_PATHS) -# ODIRECT -arg_types.add_arg(False) -# mmap -arg_types.add_arg(*USE_MMAP) -#all_args.append(arg_types) - - -# args start -arg_types : ic.ArgTypes = ic.ArgTypes() -#server -arg_types.add_arg(["icelake1-int.rcs.uwaterloo.ca"]) -#clients -arg_types.add_arg(["icelake2-int.rcs.uwaterloo.ca", "milan1-int.rcs.uwaterloo.ca"]) -#server_dat -arg_types.add_arg(["192.168.101.101"]) -#clients_dat -arg_types.add_arg(["192.168.101.102", "192.168.101.103"]) -#clients_affinity -arg_types.add_arg(CLIENT_AFFINITY) -# affinity -arg_types.add_arg(ICELAKE_SERVER_AFFINITY_SOCKET1) -# sendfile/tls/ktls -arg_types.add_args(*KTLS_ARGS) -# memloadgen -arg_types.add_arg(*MEMLOADGEN_ARGS_ICELAKE) -# filepath -arg_types.add_arg(*FILE_PATHS) -# ODIRECT -arg_types.add_arg(False) -# mmap -arg_types.add_arg(*USE_MMAP) -#all_args.append(arg_types) - -arg_types : ic.ArgTypes = ic.ArgTypes() -#server -arg_types.add_arg(["milan1-int.rcs.uwaterloo.ca"]) -#clients -arg_types.add_arg(["icelake2-int.rcs.uwaterloo.ca", "milan2-int.rcs.uwaterloo.ca", "icelake1-int.rcs.uwaterloo.ca"]) -#server_dat -arg_types.add_arg(["192.168.100.103"]) -#clients_dat -arg_types.add_arg(["192.168.100.102", "192.168.100.104", "192.168.100.101"]) -#clients_affinity -arg_types.add_arg(CLIENT_AFFINITY) -# affinity -arg_types.add_arg(MILAN_SERVER_AFFINITY_SOCKET0, MILAN_SERVER_AFFINITY_SOCKET1) -# sendfile/tls/ktls -arg_types.add_args(*KTLS_ARGS) -# memloadgen -arg_types.add_arg(*MEMLOADGEN_ARGS_MILAN) -# filepath -arg_types.add_arg(*FILE_PATHS) -# ODIRECT -arg_types.add_arg(False) -# mmap -arg_types.add_arg(*USE_MMAP) -all_args.append(arg_types) - - -arg_types : ic.ArgTypes = ic.ArgTypes() -#server -arg_types.add_arg(["milan1-int.rcs.uwaterloo.ca"]) -#clients -arg_types.add_arg(["icelake2-int.rcs.uwaterloo.ca", "milan2-int.rcs.uwaterloo.ca", "icelake1-int.rcs.uwaterloo.ca"]) -#server_dat -arg_types.add_arg(["192.168.101.103"]) -#clients_dat -arg_types.add_arg(["192.168.101.102", "192.168.101.104", "192.168.101.101"]) -#clients_affinity -arg_types.add_arg(CLIENT_AFFINITY) -# affinity -arg_types.add_arg(MILAN_SERVER_AFFINITY_SOCKET1) -# sendfile/tls/ktls -arg_types.add_args(*KTLS_ARGS) -# memloadgen -arg_types.add_arg(*MEMLOADGEN_ARGS_MILAN) -# filepath -arg_types.add_arg(*FILE_PATHS) -# ODIRECT -arg_types.add_arg(False) -# mmap -arg_types.add_arg(*USE_MMAP) -all_args.append(arg_types) - -arg_types : ic.ArgTypes = ic.ArgTypes() -#server -arg_types.add_arg(["milan2-int.rcs.uwaterloo.ca"]) -#clients -arg_types.add_arg(["icelake2-int.rcs.uwaterloo.ca", "milan1-int.rcs.uwaterloo.ca", "icelake1-int.rcs.uwaterloo.ca"]) -#server_dat -arg_types.add_arg(["192.168.100.104"]) -#clients_dat -arg_types.add_arg(["192.168.100.102", "192.168.100.103", "192.168.100.101"]) -#clients_affinity -arg_types.add_arg(CLIENT_AFFINITY) -# affinity -arg_types.add_arg(MILAN_SERVER_AFFINITY_SOCKET0, MILAN_SERVER_AFFINITY_SOCKET1) -# sendfile/tls/ktls -arg_types.add_args(*KTLS_ARGS) -# memloadgen -arg_types.add_arg(*MEMLOADGEN_ARGS_MILAN) -# filepath -arg_types.add_arg(*FILE_PATHS) -# ODIRECT -arg_types.add_arg(False) -# mmap -arg_types.add_arg(*USE_MMAP) -#all_args.append(arg_types) - - -arg_types : ic.ArgTypes = ic.ArgTypes() -#server -arg_types.add_arg(["milan2-int.rcs.uwaterloo.ca"]) -#clients -arg_types.add_arg(["icelake2-int.rcs.uwaterloo.ca", "milan1-int.rcs.uwaterloo.ca", "icelake1-int.rcs.uwaterloo.ca"]) -#server_dat -arg_types.add_arg(["192.168.101.104"]) -#clients_dat -arg_types.add_arg(["192.168.101.102", "192.168.101.103", "192.168.101.101"]) -#clients_affinity -arg_types.add_arg(CLIENT_AFFINITY) -# affinity -arg_types.add_arg(MILAN_SERVER_AFFINITY_SOCKET1) -# sendfile/tls/ktls -arg_types.add_args(*KTLS_ARGS) -# memloadgen -#arg_types.add_arg(*MEMLOADGEN_ARGS_MILAN_MLX) -# filepath -arg_types.add_arg(*FILE_PATHS) -# ODIRECT -arg_types.add_arg(False) -# mmap -arg_types.add_arg(*USE_MMAP) -#all_args.append(arg_types) - -arg_types : ic.ArgTypes = ic.ArgTypes() -#server -arg_types.add_arg(["skylake3.rcs.uwaterloo.ca"]) -#clients -arg_types.add_arg(["icelake2-int.rcs.uwaterloo.ca", "milan1-int.rcs.uwaterloo.ca", "icelake1-int.rcs.uwaterloo.ca"]) -#server_dat -arg_types.add_arg(["192.168.101.92"]) -#clients_dat -arg_types.add_arg(["192.168.101.102", "192.168.101.103", "192.168.101.101"]) -#clients_affinity -arg_types.add_arg(["1,3", "1,3", "1,3"]) -# affinitybb -arg_types.add_arg("25,27,29,31,33,35", "1,3,5,7,9,11") -# sendfile/tls/ktls -arg_types.add_args(*KTLS_ARGS) -# memloadgen -arg_types.add_arg(*MEMLOADGEN_ARGS_SKYLAKE) -# filepath -arg_types.add_arg("/tmpfs/small_file_#p") -# ODIRECT -arg_types.add_arg(False) -# mmap -arg_types.add_arg(*USE_MMAP) -#all_args.append(arg_types) - - -arg_types : ic.ArgTypes = ic.ArgTypes() -#server -arg_types.add_arg(["skylake3.rcs.uwaterloo.ca"]) -#clients -arg_types.add_arg(["icelake2-int.rcs.uwaterloo.ca", "milan1-int.rcs.uwaterloo.ca", "icelake1-int.rcs.uwaterloo.ca"]) -#server_dat -arg_types.add_arg(["192.168.100.92"]) -#clients_dat -arg_types.add_arg(["192.168.100.102", "192.168.100.103", "192.168.100.101"]) -#clients_affinity -arg_types.add_arg(["1,3", "1,3", "1,3"]) -# affinitybb -arg_types.add_arg("1,3,5,7,9,11") -# sendfile/tls/ktls -arg_types.add_args(*KTLS_ARGS) -# memloadgen -arg_types.add_arg(*MEMLOADGEN_ARGS_SKYLAKE) -# filepath -arg_types.add_arg("/tmpfs/small_file_#p") -# ODIRECT -arg_types.add_arg(False) -# mmap -arg_types.add_arg(*USE_MMAP) -#all_args.append(arg_types) - - -def parse_comma_list(input : str): - return input.split(",") - -def run_setup_cmd(conf : ic.Conf, cmd : str): - ssrv : list[tuple[str, sp.Popen]] = [] - tc.log_print(f"Running command on {conf.server[0]}...") - ssrv.append((conf.server[0], tc.remote_exec(conf.server, cmd, blocking=False, check=False)[0])) - - for s in conf.clients: - tc.log_print(f"Running command on {s}...") - ssrv.append((s, tc.remote_exec([s], cmd, blocking=False, check=False)[0])) - - for p in ssrv: - _ , stderr = p[1].communicate() - if p[1].returncode != 0: - print(f"\n{ p[0] } failed. stderr:\n{stderr.decode()}\n") - else: - print(f"\n{ p[0] } succeeded\n") - -def setup_all(conf : ic.Conf): - setup_cmd : str = f''' - sudo pkg install -y openssl-devel vim curl wget gmake cmake openssl-devel llvm gcc rsync pkgconf isal-kmod ktls_isa-l_crypto-kmod; - sudo pkg remove -y iperf iperf3; - sudo rm -rf { BIN_PATH }; - sudo mkdir -p { BIN_PATH }; - sudo git clone https://git.quacker.org/d/iperf3-tls { BIN_PATH }; - cd { BIN_PATH }; - sudo git checkout dev; - sudo ./configure; - sudo make -j8; - sudo rm -rf /libtopo; - sudo mkdir -p /libtopo; - sudo git clone https://git.quacker.org/d/libtopo /libtopo; - cd /libtopo; - sudo mkdir build; - cd build; - sudo cmake ../; - sudo make install; - sudo rm -rf /numam; - sudo mkdir -p /numam; - sudo chmod 777 /numam; - ''' - run_setup_cmd(conf, setup_cmd) - - #rsync - all_clts = [] - all_clts.extend(conf.clients) - all_clts.extend(conf.server) - dir = f"{os.path.dirname(__file__)}/../" - for clt in all_clts: - print("Syncing files to " + clt + "...") - rsync_cmd = f"rsync -az --no-perms --rsync-path=\"sudo rsync\" --omit-dir-times -e \"ssh -p77\" {dir} {tc.get_ssh_user()}@{clt}:/numam/" - sp.check_call(rsync_cmd, shell=True) - - run_setup_cmd(conf, f''' - cd /numam; - sudo rm -rf build; - sudo mkdir -p build; - cd build; - sudo cmake ../; - sudo make memloadgen; - ''') - - -def stop_all(conf : ic.Conf, clients_only = False): - # stop clients - tc.log_print("Stopping clients...") - tc.remote_exec(conf.clients, "sudo killall -9 iperf3; sudo killall -9 memloadgen", check=False) - - if not clients_only: - tc.log_print("Stopping server...") - tc.remote_exec(conf.server, "sudo killall -9 iperf3; sudo killall -9 memloadgen", check=False) - -def prepare_logdir(conf : ic.Conf): - tc.log_print("Preparing server log directory...") - prep_cmd = "sudo rm -rf " + LOG_FILEPATH - tc.log_print(prep_cmd) - tc.remote_exec(conf.server, prep_cmd, check=False) - - time.sleep(0.1) - - prep_cmd = "sudo mkdir -p " + LOG_FILEPATH - tc.log_print(prep_cmd) - tc.remote_exec(conf.server, prep_cmd, check=True) - -def run_exp(conf : ic.Conf): - stop_all(conf) - while True: - prepare_logdir(conf) - - ssrvs=[] - ssrv_names=[] - cur_srv_proc = 0 - - smlg = [] - smlg_names = [] - if conf.memloadgen != None: - for emem in conf.memloadgen: - mlg_cmd = "sudo " - mlg_cpu = emem.split(".")[0] - mlg_dom = emem.split(".")[1] - mlg_pct = emem.split(".")[2] - mlg_cmd += f"{MLG_PATH} -b {MEMLOAD_BLKSZ} -T {MEMLOAD_BLKSZ} -s {mlg_cpu} -d {mlg_dom} -o {LOG_FILEPATH}/memloadgen_{len(smlg)} -p {mlg_pct} -H 4 -w 17" - - tc.log_print("Starting memloadgen...") - tc.log_print(mlg_cmd) - smlg.append(tc.remote_exec(conf.server, mlg_cmd, blocking=False)[0]) - smlg_names.append("memloadgen") - time.sleep(0.1) - time.sleep(5) - for eaff in parse_comma_list(conf.affinity): - server_cmd = "sudo " - server_cmd += f"{EXE_PATH} -s -p " + str(SERVER_PORT_START + cur_srv_proc) + \ - " -F " + conf.filepath.replace("#p", str(cur_srv_proc)) + \ - " -A " + eaff - if conf.tls: - server_cmd += f" --enable-ssl-over-tcp --ssl-certificate {SSL_CERT} --ssl-private-key {SSL_PKEY}" - if conf.ktls: - server_cmd += " --enable-ssl-ktls" - if conf.odirect: - server_cmd += " --use-odirect" - if conf.mmap: - server_cmd += " --use-mmap" - server_cmd += " -J --logfile " + LOG_FILEPATH + "/" + eaff + ".txt" - # start server - tc.log_print("Starting server proc " + str(cur_srv_proc) + "...") - tc.log_print(server_cmd) - ssrv = tc.remote_exec(conf.server, server_cmd, blocking=False)[0] - ssrvs.append(ssrv) - ssrv_names.append("Server " + str(cur_srv_proc)) - cur_srv_proc = cur_srv_proc + 1 - time.sleep(0.1) - - # start clients - tc.log_print("Starting clients...") - sclts = [] - sclt_names = [] - clt_number = 0 - for i in range(len(conf.clients)): - client_aff = conf.clients_affinity[i] - for eaff in parse_comma_list(client_aff): - client_cmd = f"sudo {EXE_PATH} -c " + conf.server_dat[0] + \ - " -p " + str(SERVER_PORT_START + clt_number) + \ - " --connect-timeout 1000" + \ - " -A " + eaff + \ - " -t 25" + \ - " -P 8" + \ - " -R" + \ - " -N" + \ - " -4" + \ - " -O 10" - if conf.tls: - client_cmd += f" --enable-ssl-over-tcp --ssl-certificate {SSL_CERT} --ssl-private-key {SSL_PKEY}" - client_cmd += " -J --logfile /dev/null" - - tc.log_print(conf.clients[i] + ":\n" + client_cmd) - sclts.append(tc.remote_exec([conf.clients[i]], client_cmd, blocking=False)[0]) - sclt_names.append(conf.clients[i] + "@" + eaff) - clt_number = clt_number + 1 - time.sleep(0.1) - # launch stderr monitoring thread - exclude = ["Pseudo-terminal"] - tc.errthr_create(sclts, sclt_names, exclude) - tc.errthr_create(ssrvs, ssrv_names, exclude) - if (conf.memloadgen != None): - tc.errthr_create(smlg, smlg_names, exclude) - tc.errthr_start() - cur = 0 - # selec = select.poll() - # selec.register(p.stdout, select.POLLIN) - success = False - while not success: - success = False - # either failed or timeout - # we use failure detection to save time for long durations - if tc.errthr_get_failed(): - break - - if cur >= 90: - tc.log_print("Experiment timed out. Restarting...") - break - # while selec.poll(1): - # print(p.stdout.readline()) - - success = True - for p in sclts: - if p.poll() == None: - success = False - break - - time.sleep(1) - cur = cur + 1 - - tc.errthr_stop() - stop_all(conf, clients_only=True) - tc.log_print("Cooling down...") - time.sleep(3) - stop_all(conf) - - if success: - if flush_netresult(conf): - break - -def flush_netresult(conf : ic.Conf) -> bool: - tc.log_print("Keeping results...") - # copy log directory back to machine - log_output = tc.get_odir() - os.makedirs(log_output, exist_ok=True) - scp_cmd = "scp -P77 -r " + tc.get_ssh_user() + "@" + conf.server[0] + ":" + LOG_FILEPATH + " " + log_output + "/" - tc.log_print(scp_cmd) - sp.check_call(scp_cmd, shell=True) - - # parse results - log_output = log_output + "/" + os.path.basename(LOG_FILEPATH) - logs = os.listdir(log_output) - logs_bytes = [] - memload_bytes = [] - for log in logs: - tc.log_print("Processing " + log + "...") - if os.path.isfile(log_output + "/" + log) and log.endswith(".txt"): - with open(log_output + "/" + log, "r") as f: - buf = f.read() - if len(buf) > 0: - logs_bytes.append(buf) - else: - tc.log_print("Warning: log file empty for " + log + ". Retrying...") - return False - if os.path.isfile(log_output + "/" + log) and ("memloadgen" in log): - with open(log_output + "/" + log, "r") as f: - memloadbuf = f.read() - if len(memloadbuf) == 0: - tc.log_print("Warning: log file empty for " + log + ". Retrying...") - return False - else: - memload_bytes.append(memloadbuf) - - try: - parser = par.iperf_json_parser(logs_bytes) - bps = [] - for ml in memload_bytes: - memparser = par.memloadgen_parser(ml, 22, 32) - bps.append(memparser.bps) - - tc.log_print("Aggregated throughput: " + "{:.2f}".format(parser.aggregate_egress_bps / 8.0) + " B/s " + \ - "{:.2f}".format(parser.aggregate_egress_bps / 8.0 / 1024.0 / 1024.0) + " MB/s " + \ - "{:.2f}".format(parser.aggregate_egress_bps / 8.0 / 1024.0 / 1024.0 / 1024.0) + " GB/s" + \ - " | Memload generator: " + "{:.2f}".format(np.sum(bps) / 1024.0 / 1024.0 / 1024.0) + " GB/s" ) - except Exception as e: - tc.log_print("Warning: exception for parsing logs: " + str(e) + " restarting...") - scp_cmd = "sudo rm -rf " + log_output + "/*" - tc.log_print(scp_cmd) - sp.check_call(scp_cmd, shell=True) - return False - return True - -def main(): - tc.set_ssh_param("-o StrictHostKeyChecking=no -p77") - tc.set_ssh_user("oscar") - output_dirname = "run" - - confs : list[ic.Conf] = [] - for argtype in all_args: - args = argtype.get_fields() - for arg in args: - confs.append(ic.Conf(*arg)) - - name = None - options = getopt.getopt(sys.argv[1:], 'sSn:')[0] - for opt, arg in options: - if opt in ('-s'): - stop_all(confs[0]) - return - elif opt in ('-S'): - setup_all(confs[0]) - return - elif opt in ('-n'): - name = arg - - if name != None: - output_dirname = name - tc.init("~/results.d/iperf3/" + output_dirname + "_" + datetime.datetime.now().strftime('%Y%m%d%H%M%S')) - cpcmd = "cp " + __file__ + " " + tc.get_odir() + "/" - tc.log_print(cpcmd) - sp.check_call(cpcmd, shell=True) - - server_set = [] - for conf in confs: - tc.log_print(conf.to_string(verbose=True)) - if conf.server[0] not in server_set: - server_set.append(conf.server[0]) - tc.log_print(f"{len(confs)} configs ({RUNS} * {len(confs)} = {RUNS * len(confs)} runs) scheduled:") - - for server in server_set: - print(f"Gathering information on {server} (sysctl)...") - p : sp.Popen = tc.remote_exec([server], "sysctl -a", blocking=False, check=True)[0] - with open(f"{tc.get_odir()}/{server}.sysctl", "w") as f: - f.write(p.communicate()[0].decode()) - print(f"Gathering information on {server} (ifconfig)...") - p : sp.Popen = tc.remote_exec([server], "ifconfig", blocking=False, check=True)[0] - with open(f"{tc.get_odir()}/{server}.ifconfig", "w") as f: - f.write(p.communicate()[0].decode()) - print(f"Gathering information on {server} (mount)...") - p : sp.Popen = tc.remote_exec([server], "mount", blocking=False, check=True)[0] - with open(f"{tc.get_odir()}/{server}.mount", "w") as f: - f.write(p.communicate()[0].decode()) - print(f"Gathering information on {server} (dmesg)...") - p : sp.Popen = tc.remote_exec([server], "sudo dmesg", blocking=False, check=True)[0] - with open(f"{tc.get_odir()}/{server}.dmesg", "w") as f: - f.write(p.communicate()[0].decode()) - - for conf in confs: - for i in range(0, RUNS): - tc.begin(conf.to_string()+f"_run.{i}") - run_exp(conf) - tc.end() -main() diff --git a/scripts/iperf_gentable.py b/scripts/iperf_gentable.py deleted file mode 100644 index ec240d1..0000000 --- a/scripts/iperf_gentable.py +++ /dev/null @@ -1,163 +0,0 @@ -#!/usr/bin/env python3.6 - -from logging import root -import pandas as pd -import matplotlib.pyplot as plt -from matplotlib import ticker -import numpy as np -import sys -import re -import os -import json -import libpar as par -import getopt -import math -import concurrent.futures as CF - -def enum_files(rootdir : str, ret : list[str]): - if os.path.isfile(rootdir): - ret.append(rootdir) - return - - for subdir in os.listdir(rootdir): - each_dir = os.path.join(rootdir, subdir) - enum_files(each_dir, ret) - -def process_dir(rootdir: str, dat : dict[any]): - ret = [] - - if (("memloadgen" in rootdir) and ("sendfile" in rootdir)): - #print("Processing " + rootdir + "...") - segments = os.path.basename(rootdir).split("_") - server = segments[0] - affinity = segments[1].split(".")[1] - sendfile = segments[2].split(".")[1] - tls = segments[3].split(".")[1] - ktls = segments[4].split(".")[1] - memloadgen = segments[5].split(".")[1] - fs = segments[6].split(".")[1] - if int(affinity.split(",")[0]) <= 4: - if "2-int" in server: - affinity = "socket01" - else: - affinity = "socket00" - else: - if "2-int" in server: - affinity = "socket11" - else: - affinity = "socket10" - if "tmpfs" in fs: - fs = "tmpfs" - elif "nvdimm" in fs: - fs = "nvdimm" - elif "mnt" in fs: - fs = "ssd" - - if sendfile == "True" and tls == "True" and ktls == "True": - mode = "ktls + sendfile" - elif sendfile == "True" and tls == "False" and ktls == "False": - mode = "sendfile" - elif sendfile == "False" and tls == "False" and ktls == "False": - mode = "write" - elif sendfile == "False" and tls == "True" and ktls == "False": - mode = "tls" - elif sendfile == "False" and tls == "True" and ktls == "True": - mode = "ktls" - - if "icelake" in server: - server = "icelake" - elif "milan" in server: - server = "milan" - - memloadgen_bps = [] - logs = [] - enum_files(rootdir, logs) - logs_bytes = [] - for log in logs: - if log.endswith(".txt"): - with open(log, "r") as f: - buf = f.read() - if len(buf) > 0: - logs_bytes.append(buf) - else: - print("Warning: log file empty for " + log) - elif "memloadgen" in log: - with open(log, "r") as f: - buf = f.read() - if len(buf) > 0: - parser = par.memloadgen_parser(buf, 20, 30) - memloadgen_bps.append(parser.bps) - else: - print("Warning: memloadgen file empty for " + log) - - try: - parser = par.iperf_json_parser(logs_bytes) - - except Exception as e: - print("Warning: failed to parse " + log + ". err: " + str(e)) - return - - if (affinity,mode) not in dat[server][(memloadgen, fs)]: - dat[server][(memloadgen, fs)][(affinity,mode)] = [] - - dat[server][(memloadgen, fs)][(affinity,mode)].append((parser.aggregate_egress_bps, np.sum(memloadgen_bps))) - return - - for subdir in os.listdir(rootdir): - each_dir = os.path.join(rootdir, subdir) - if not os.path.isfile(each_dir): - process_dir(each_dir, dat) - - -def main(): - datdir = None - options = getopt.getopt(sys.argv[1:], 'd:')[0] - - for opt, arg in options: - if opt in ('-d'): - datdir = arg - - if datdir == None: - raise Exception("Must specify -d parameter") - - dat = dict() - - for a in ["icelake", "milan"]: - dat[a] = dict() - for c in ["nvdimm", "tmpfs", "ssd"]: - for b in ["0", "50", "False"]: - dat[a][(b, c)] = dict() - - process_dir(datdir, dat) - - for a in ["icelake", "milan"]: - data2 = dat[a] - print(a) - for c in ["tmpfs", "ssd"]: - for b in ["0", "50", "False"]: - print("memloadgen: " + b + ",storage: " + c) - data = data2[(b, c)] - print("affinity,write,sendfile,tls,ktls,ktls + sendfile") - if data == None: - print("N/A,N/A,N/A,N/A,N/A,N/A") - for affinity in ["socket00", "socket01", "socket10", "socket11"]: - line = f"{affinity}," - for mode in ["write", "sendfile", "tls", "ktls", "ktls + sendfile"]: - if (affinity, mode) not in data: - line += "N/A," - else: - vals = data[(affinity, mode)] - real_vals = [] - real_mlg = [] - for i in range(0, len(vals)): - real_vals.append(vals[i][0] / 1024.0 / 1024.0 / 1024.0 / 8.0) - real_mlg.append(vals[i][1] / 1024.0 / 1024.0 / 1024.0) - line += "{:.2f} ({:.2f}) [{:.2f} ({:.2f})]".format(np.average(real_vals), np.std(real_vals), - np.average(real_mlg), np.std(real_mlg)) + "," - print(line) - print("") - print("") - - -if __name__ == "__main__": - main() \ No newline at end of file diff --git a/scripts/iperf_graph.py b/scripts/iperf_graph.py deleted file mode 100644 index ade67e9..0000000 --- a/scripts/iperf_graph.py +++ /dev/null @@ -1,117 +0,0 @@ -#!/usr/bin/env python3.6 -import pandas as pd -import matplotlib.pyplot as plt -from matplotlib import ticker -import numpy as np -import os -import sys -import libpar as par -import getopt -import iperfconf as ic - -# -# memloadgen vs max throughput -# -marker_map = ["o", "P", "s", "v", "*", "+", "^", "1", "2", "d", "X", "o", "P", "s", "v", "*", "+", "^", "1", "2", "d", "X"] -color_map = ["xkcd:black", "xkcd:red", "xkcd:blue", "xkcd:green", "xkcd:cyan", "xkcd:purple", "xkcd:orange", "xkcd:salmon", "xkcd:lightgreen", "xkcd:indigo", "xkcd:brown", "xkcd:bubblegum", "xkcd:lavender", "xkcd:maroon", "xkcd:fern", "xkcd:sky", "xkcd:orchid", "xkcd:sienna"] -parser_idx_labels = ["srv_hw", "srv_sw", "clt_hw", "clt_sw"] - - -def generate_graph(server_name : str, affinity : str, fn : str, dat : dict[ic.Conf, ic.ConfData]): - marker_idx = 0 - color_idx = 0 - - ax = plt.gca() - ax.set_yscale("linear") - ax.set_title(server_name) - ax.set_xlabel("Memload Percent (%)") - ax.set_ylabel("Throughput (GB/s)") - ax.set_ylim(0, 13) - ax.xaxis.get_major_formatter().set_scientific(False) - ax.yaxis.set_minor_formatter(ticker.ScalarFormatter()) - - graph_data : dict(ic.Conf, ic.ConfData) = dict() - - print(f"Generating graph => {server_name}_{affinity} ...") - - for conf in dat: - if (conf.server == server_name) and (conf.affinity == affinity): - if conf not in graph_data: - graph_data[conf] = dat[conf] - else: - raise Exception("duplicate conf found!") - - labels = [] - xs = [] - ys = [] - errs = [] - - skip_list = [] - # one curve per conf - each_conf : ic.Conf - for each_conf in graph_data: - if each_conf in skip_list: - continue - - label = each_conf.to_string_graph() - bps = [] - pct = [] - err = [] - other_conf : ic.Conf - for other_conf in graph_data: - if other_conf.equal_except_memload(each_conf): - bps.append(np.average(graph_data[other_conf].get_bps()) / 8 / 1024 / 1024 / 1024) - err.append(np.std(graph_data[other_conf].get_bps()) / 8 / 1024 / 1024 / 1024) - pct.append(int(other_conf.memloadgen)) - skip_list.append(other_conf) - - arg = np.argsort(pct) - - labels.append(label) - xs.append(np.array(pct)[arg]) - ys.append(np.array(bps)[arg]) - errs.append(np.array(err)[arg]) - - arg = np.argsort(labels) - lsorted = np.array(labels)[arg] - xsorted = np.array(xs)[arg] - ysorted = np.array(ys)[arg] - esorted = np.array(errs)[arg] - - for i in range(len(lsorted)): - marker_type = marker_map[marker_idx] - color_type = color_map[color_idx] - marker_idx += 1 - color_idx += 1 - ax.errorbar(x = xsorted[i], y = ysorted[i], yerr = esorted[i], xerr = None, label=lsorted[i], marker=marker_type, color=color_type, markersize=8) - - plt.gcf().set_size_inches(23.4, 16.5) - ax.legend() - plt.savefig(fn, dpi=150) - plt.close() - -def main(): - datdir = None - options = getopt.getopt(sys.argv[1:], 'd:')[0] - - for opt, arg in options: - if opt in ('-d'): - datdir = arg - - if datdir == None: - raise Exception("Must specify -d parameter") - - dat = dict() - ic.process_directory(datdir, dat) - - tuples = set() - conf : ic.Conf - for conf in dat: - tuples.add((conf.server, conf.affinity)) - - for tup in tuples: - generate_graph(tup[0], tup[1], f"{datdir}/{tup[0]}_{tup[1]}.png", dat) - - -if __name__ == "__main__": - main() diff --git a/scripts/iperf_p9.py b/scripts/iperf_p9.py deleted file mode 100644 index b6a7655..0000000 --- a/scripts/iperf_p9.py +++ /dev/null @@ -1,397 +0,0 @@ -import subprocess as sp -import time -import os -import datetime -import sys -import getopt -import numpy as np - -import libpar as par -import libtc as tc -import iperfconf as ic - -# definitions -file_dir : str = os.path.dirname(os.path.realpath(__file__)) -root_dir : str = os.path.join(file_dir,"..") -LOG_FILEPATH = "/iperflogs" -BIN_PATH = "/iperftls" -MLG_PATH = "/numam/build/bin/memloadgen" -EXE_PATH = BIN_PATH + "/src/iperf3" -SSL_CERT = "/certs/server.crt" -SSL_PKEY = "/certs/server.key" -SERVER_PORT_START = 8050 -MEMLOAD_BLKSZ = 1024*1024*64 -RUNS=3 -POWER9_CPULIST : ic.CPUList = ic.CPUList([88, 88]) - -# Shared Arguments -NUM_CORES = 12 -NUM_CLIENTS = 3 -# KTLS -KTLS_ARGS = [(False, False, False), - (True, False, False), - (False, True, False), - (False, True, True), - (True, True, True)] - -MEMLOADGEN_ARGS_POWER9 = [] -for i in range(0, 100, 10): - MEMLOADGEN_ARGS_POWER9.append([f"{POWER9_CPULIST.get_cpulist_by_offset(domain = 0, offset = 48, num = 10, stride = 4)}.1.{i}", - f"{POWER9_CPULIST.get_cpulist_by_offset(domain = 1, offset = 48, num = 10, stride = 4)}.0.{i}"]) -MEMLOADGEN_ARGS_POWER9.append(None) -MEMLOADGEN_ARGS_POWER9 = [None] - -POWER9_SERVER_AFFINITY_SOCKET0 = POWER9_CPULIST.get_cpulist_by_offset(domain = 0, offset = 2, num = 12, stride = 4) -POWER9_SERVER_AFFINITY_SOCKET1= POWER9_CPULIST.get_cpulist_by_offset(domain = 1, offset = 2, num = 12, stride = 4) - -# file system -#FILE_PATHS = ["/tank/large_file_#p"] -#"/tmpfs1/small_files_#p", -FILE_PATHS = ["/tmpfs0/small_files_#p", "/tmpfs1/small_files_#p"] -# mmap -USE_MMAP = [False, True] -all_args : list[ic.ArgTypes] = [] - -# args start -arg_types : ic.ArgTypes = ic.ArgTypes() -#server -arg_types.add_arg(["power9-int.rcs.uwaterloo.ca"]) -#clients -arg_types.add_arg(["icelake1-int.rcs.uwaterloo.ca", "milan1-int.rcs.uwaterloo.ca"]) -#server_dat -arg_types.add_arg(["192.168.80.80"]) -#clients_dat -arg_types.add_arg(["192.168.80.101", "192.168.80.103"]) -#clients_affinity -arg_types.add_arg(["49,53,57,61,65,69", "65,69,73,77,81,85"]) -# affinity -arg_types.add_arg(POWER9_SERVER_AFFINITY_SOCKET1) -# sendfile/tls/ktls -arg_types.add_args(*KTLS_ARGS) -# memloadgen -arg_types.add_arg(*MEMLOADGEN_ARGS_POWER9) -# filepath -arg_types.add_arg(*FILE_PATHS) -# ODIRECT -arg_types.add_arg(False) -# mmap -arg_types.add_arg(*USE_MMAP) -all_args.append(arg_types) - - -def parse_comma_list(input : str): - return input.split(",") - -def run_setup_cmd(conf : ic.Conf, cmd : str): - ssrv : list[tuple[str, sp.Popen]] = [] - tc.log_print(f"Running command on {conf.server[0]}...") - ssrv.append((conf.server[0], tc.remote_exec(conf.server, cmd, blocking=False, check=False)[0])) - - for s in conf.clients: - tc.log_print(f"Running command on {s}...") - ssrv.append((s, tc.remote_exec([s], cmd, blocking=False, check=False)[0])) - - for p in ssrv: - _ , stderr = p[1].communicate() - if p[1].returncode != 0: - print(f"\n{ p[0] } failed. stderr:\n{stderr.decode()}\n") - else: - print(f"\n{ p[0] } succeeded\n") - -def setup_all(conf : ic.Conf): - setup_cmd : str = f''' - sudo pkg install -y openssl-devel vim curl wget gmake cmake openssl-devel llvm gcc rsync pkgconf isal-kmod ktls_isa-l_crypto-kmod; - sudo pkg remove -y iperf iperf3; - sudo rm -rf { BIN_PATH }; - sudo mkdir -p { BIN_PATH }; - sudo git clone https://git.quacker.org/d/iperf3-tls { BIN_PATH }; - cd { BIN_PATH }; - sudo git checkout dev; - sudo ./configure; - sudo make -j8; - sudo rm -rf /libtopo; - sudo mkdir -p /libtopo; - sudo git clone https://git.quacker.org/d/libtopo /libtopo; - cd /libtopo; - sudo mkdir build; - cd build; - sudo cmake ../; - sudo make install; - sudo rm -rf /numam; - sudo mkdir -p /numam; - sudo chmod 777 /numam; - ''' - run_setup_cmd(conf, setup_cmd) - - #rsync - all_clts = [] - all_clts.extend(conf.clients) - all_clts.extend(conf.server) - dir = f"{os.path.dirname(__file__)}/../" - for clt in all_clts: - print("Syncing files to " + clt + "...") - rsync_cmd = f"rsync -az --no-perms --rsync-path=\"sudo rsync\" --omit-dir-times -e \"ssh -p77\" {dir} {tc.get_ssh_user()}@{clt}:/numam/" - sp.check_call(rsync_cmd, shell=True) - - run_setup_cmd(conf, f''' - cd /numam; - sudo rm -rf build; - sudo mkdir -p build; - cd build; - sudo cmake ../; - sudo make memloadgen; - ''') - - -def stop_all(conf : ic.Conf, clients_only = False): - # stop clients - tc.log_print("Stopping clients...") - tc.remote_exec(conf.clients, "sudo killall -9 iperf3; sudo killall -9 memloadgen", check=False) - - if not clients_only: - tc.log_print("Stopping server...") - tc.remote_exec(conf.server, "sudo killall -9 iperf3; sudo killall -9 memloadgen", check=False) - -def prepare_logdir(conf : ic.Conf): - tc.log_print("Preparing server log directory...") - prep_cmd = "sudo rm -rf " + LOG_FILEPATH - tc.log_print(prep_cmd) - tc.remote_exec(conf.server, prep_cmd, check=False) - - time.sleep(0.1) - - prep_cmd = "sudo mkdir -p " + LOG_FILEPATH - tc.log_print(prep_cmd) - tc.remote_exec(conf.server, prep_cmd, check=True) - -def run_exp(conf : ic.Conf): - stop_all(conf) - while True: - prepare_logdir(conf) - - ssrvs=[] - ssrv_names=[] - cur_srv_proc = 0 - - smlg = [] - smlg_names = [] - if conf.memloadgen != None: - for emem in conf.memloadgen: - mlg_cmd = "sudo " - mlg_cpu = emem.split(".")[0] - mlg_dom = emem.split(".")[1] - mlg_pct = emem.split(".")[2] - mlg_cmd += f"{MLG_PATH} -b {MEMLOAD_BLKSZ} -T {MEMLOAD_BLKSZ} -s {mlg_cpu} -d {mlg_dom} -o {LOG_FILEPATH}/memloadgen_{len(smlg)} -p {mlg_pct} -H 4 -w 17" - - tc.log_print("Starting memloadgen...") - tc.log_print(mlg_cmd) - smlg.append(tc.remote_exec(conf.server, mlg_cmd, blocking=False)[0]) - smlg_names.append("memloadgen") - time.sleep(0.1) - time.sleep(5) - for eaff in parse_comma_list(conf.affinity): - server_cmd = "sudo " - server_cmd += f"{EXE_PATH} -s -p " + str(SERVER_PORT_START + cur_srv_proc) + \ - " -F " + conf.filepath.replace("#p", str(cur_srv_proc)) + \ - " -A " + eaff - if conf.tls: - server_cmd += f" --enable-ssl-over-tcp --ssl-certificate {SSL_CERT} --ssl-private-key {SSL_PKEY}" - if conf.ktls: - server_cmd += " --enable-ssl-ktls" - if conf.odirect: - server_cmd += " --use-odirect" - if conf.mmap: - server_cmd += " --use-mmap" - server_cmd += " -J --logfile " + LOG_FILEPATH + "/" + eaff + ".txt" - # start server - tc.log_print("Starting server proc " + str(cur_srv_proc) + "...") - tc.log_print(server_cmd) - ssrv = tc.remote_exec(conf.server, server_cmd, blocking=False)[0] - ssrvs.append(ssrv) - ssrv_names.append("Server " + str(cur_srv_proc)) - cur_srv_proc = cur_srv_proc + 1 - time.sleep(0.1) - - # start clients - tc.log_print("Starting clients...") - sclts = [] - sclt_names = [] - clt_number = 0 - for i in range(len(conf.clients)): - client_aff = conf.clients_affinity[i] - for eaff in parse_comma_list(client_aff): - client_cmd = f"sudo {EXE_PATH} -c " + conf.server_dat[0] + \ - " -p " + str(SERVER_PORT_START + clt_number) + \ - " --connect-timeout 1000" + \ - " -A " + eaff + \ - " -t 25" + \ - " -P 16" + \ - " -R" + \ - " -N" + \ - " -4" + \ - " -O 10" - if conf.tls: - client_cmd += f" --enable-ssl-over-tcp --ssl-certificate {SSL_CERT} --ssl-private-key {SSL_PKEY}" - client_cmd += " -J --logfile /dev/null" - - tc.log_print(conf.clients[i] + ":\n" + client_cmd) - sclts.append(tc.remote_exec([conf.clients[i]], client_cmd, blocking=False)[0]) - sclt_names.append(conf.clients[i] + "@" + eaff) - clt_number = clt_number + 1 - time.sleep(0.1) - # launch stderr monitoring thread - exclude = ["Pseudo-terminal"] - tc.errthr_create(sclts, sclt_names, exclude) - tc.errthr_create(ssrvs, ssrv_names, exclude) - if (conf.memloadgen != None): - tc.errthr_create(smlg, smlg_names, exclude) - tc.errthr_start() - cur = 0 - # selec = select.poll() - # selec.register(p.stdout, select.POLLIN) - success = False - while not success: - success = False - # either failed or timeout - # we use failure detection to save time for long durations - if tc.errthr_get_failed(): - break - - if cur >= 90: - tc.log_print("Experiment timed out. Restarting...") - break - # while selec.poll(1): - # print(p.stdout.readline()) - - success = True - for p in sclts: - if p.poll() == None: - success = False - break - - time.sleep(1) - cur = cur + 1 - - tc.errthr_stop() - stop_all(conf, clients_only=True) - tc.log_print("Cooling down...") - time.sleep(3) - stop_all(conf) - - if success: - if flush_netresult(conf): - break - -def flush_netresult(conf : ic.Conf) -> bool: - tc.log_print("Keeping results...") - # copy log directory back to machine - log_output = tc.get_odir() - os.makedirs(log_output, exist_ok=True) - scp_cmd = "scp -P77 -r " + tc.get_ssh_user() + "@" + conf.server[0] + ":" + LOG_FILEPATH + " " + log_output + "/" - tc.log_print(scp_cmd) - sp.check_call(scp_cmd, shell=True) - - # parse results - log_output = log_output + "/" + os.path.basename(LOG_FILEPATH) - logs = os.listdir(log_output) - logs_bytes = [] - memload_bytes = [] - for log in logs: - tc.log_print("Processing " + log + "...") - if os.path.isfile(log_output + "/" + log) and log.endswith(".txt"): - with open(log_output + "/" + log, "r") as f: - buf = f.read() - if len(buf) > 0: - logs_bytes.append(buf) - else: - tc.log_print("Warning: log file empty for " + log + ". Retrying...") - return False - if os.path.isfile(log_output + "/" + log) and ("memloadgen" in log): - with open(log_output + "/" + log, "r") as f: - memloadbuf = f.read() - if len(memloadbuf) == 0: - tc.log_print("Warning: log file empty for " + log + ". Retrying...") - return False - else: - memload_bytes.append(memloadbuf) - - try: - parser = par.iperf_json_parser(logs_bytes) - bps = [] - for ml in memload_bytes: - memparser = par.memloadgen_parser(ml, 22, 32) - bps.append(memparser.bps) - - tc.log_print("Aggregated throughput: " + "{:.2f}".format(parser.aggregate_egress_bps / 8.0) + " B/s " + \ - "{:.2f}".format(parser.aggregate_egress_bps / 8.0 / 1024.0 / 1024.0) + " MB/s " + \ - "{:.2f}".format(parser.aggregate_egress_bps / 8.0 / 1024.0 / 1024.0 / 1024.0) + " GB/s" + \ - " | Memload generator: " + "{:.2f}".format(np.sum(bps) / 1024.0 / 1024.0 / 1024.0) + " GB/s" ) - except Exception as e: - tc.log_print("Warning: exception for parsing logs: " + str(e) + " restarting...") - scp_cmd = "sudo rm -rf " + log_output + "/*" - tc.log_print(scp_cmd) - sp.check_call(scp_cmd, shell=True) - return False - return True - -def main(): - tc.set_ssh_param("-o StrictHostKeyChecking=no -p77") - tc.set_ssh_user("oscar") - output_dirname = "run" - - confs : list[ic.Conf] = [] - for argtype in all_args: - args = argtype.get_fields() - for arg in args: - confs.append(ic.Conf(*arg)) - - name = None - options = getopt.getopt(sys.argv[1:], 'sSn:')[0] - for opt, arg in options: - if opt in ('-s'): - stop_all(confs[0]) - return - elif opt in ('-S'): - setup_all(confs[0]) - return - elif opt in ('-n'): - name = arg - - if name != None: - output_dirname = name - tc.init("~/results.d/iperf3/" + output_dirname + "_" + datetime.datetime.now().strftime('%Y%m%d%H%M%S')) - cpcmd = "cp " + __file__ + " " + tc.get_odir() + "/" - tc.log_print(cpcmd) - sp.check_call(cpcmd, shell=True) - - server_set = [] - for conf in confs: - tc.log_print(conf.to_string(verbose=True)) - if conf.server[0] not in server_set: - server_set.append(conf.server[0]) - tc.log_print(f"{len(confs)} configs ({RUNS} * {len(confs)} = {RUNS * len(confs)} runs) scheduled:") - - for server in server_set: - print(f"Gathering information on {server} (sysctl)...") - p : sp.Popen = tc.remote_exec([server], "sysctl -a", blocking=False, check=True)[0] - with open(f"{tc.get_odir()}/{server}.sysctl", "w") as f: - f.write(p.communicate()[0].decode()) - print(f"Gathering information on {server} (ifconfig)...") - p : sp.Popen = tc.remote_exec([server], "ifconfig", blocking=False, check=True)[0] - with open(f"{tc.get_odir()}/{server}.ifconfig", "w") as f: - f.write(p.communicate()[0].decode()) - print(f"Gathering information on {server} (mount)...") - p : sp.Popen = tc.remote_exec([server], "mount", blocking=False, check=True)[0] - with open(f"{tc.get_odir()}/{server}.mount", "w") as f: - f.write(p.communicate()[0].decode()) - print(f"Gathering information on {server} (dmesg)...") - p : sp.Popen = tc.remote_exec([server], "sudo dmesg", blocking=False, check=True)[0] - with open(f"{tc.get_odir()}/{server}.dmesg", "w") as f: - f.write(p.communicate()[0].decode()) - - for conf in confs: - for i in range(0, RUNS): - tc.begin(conf.to_string()+f"_run.{i}") - run_exp(conf) - tc.end() -main() diff --git a/scripts/iperf_sand.py b/scripts/iperf_sand.py deleted file mode 100644 index df1d789..0000000 --- a/scripts/iperf_sand.py +++ /dev/null @@ -1,454 +0,0 @@ -import subprocess as sp -import time -import os -import datetime -import sys -import getopt -import numpy as np - -import libpar as par -import libtc as tc -import iperfconf as ic - -# definitions -file_dir : str = os.path.dirname(os.path.realpath(__file__)) -root_dir : str = os.path.join(file_dir,"..") -LOG_FILEPATH = "/iperflogs" -BIN_PATH = "/iperftls" -MLG_PATH = "/numam/build/bin/memloadgen" -EXE_PATH = BIN_PATH + "/src/iperf3" -SSL_CERT = "/certs/server.crt" -SSL_PKEY = "/certs/server.key" -SERVER_PORT_START = 8050 -MEMLOAD_BLKSZ = 1024*1024*64 -RUNS=3 -SANDYBRIDGE_CPULIST : ic.CPUList = ic.CPUList([16, 16]) -SKYLAKE_CPULIST : ic.CPUList = ic.CPUList([24, 24]) -MILAN_CPULIST : ic.CPUList = ic.CPUList([64, 64]) -ICELAKE_CPULIST : ic.CPUList = ic.CPUList([48, 48]) - -# Shared Arguments -NUM_CORES = 12 -NUM_CLIENTS = 3 -# KTLS -KTLS_ARGS = [#(False, False, False), - #(True, False, False), - #(False, True, False), - (False, True, True), - (True, True, True)] - -# MEMLOADGEN -MEMLOADGEN_ARGS_ICELAKE = [] -for i in range(0, 100, 10): - MEMLOADGEN_ARGS_ICELAKE.append([f"{ICELAKE_CPULIST.get_cpulist_by_offset(domain = 1, offset = 3, num = NUM_CORES, stride = 4)}.0.{i}", - f"{ICELAKE_CPULIST.get_cpulist_by_offset(domain = 0, offset = 3, num = NUM_CORES, stride = 4)}.1.{i}"]) -MEMLOADGEN_ARGS_ICELAKE.append(None) - -MEMLOADGEN_ARGS_MILAN = [] -for i in range(0, 100, 10): - MEMLOADGEN_ARGS_MILAN.append([f"{MILAN_CPULIST.get_cpulist_by_offset(domain = 1, offset = 3, num = NUM_CORES, stride = 4)}.0.{i}", - f"{MILAN_CPULIST.get_cpulist_by_offset(domain = 0, offset = 3, num = NUM_CORES, stride = 4)}.1.{i}"]) -MEMLOADGEN_ARGS_MILAN.append(None) - -MEMLOADGEN_ARGS_SKYLAKE = [] -for i in range(0, 100, 10): - MEMLOADGEN_ARGS_SKYLAKE.append([f"{SKYLAKE_CPULIST.get_cpulist_by_offset(domain = 0, offset = 12, num = 6, stride = 2)}.1.{i}", - f"{SKYLAKE_CPULIST.get_cpulist_by_offset(domain = 1, offset = 12, num = 6, stride = 2)}.0.{i}"]) -MEMLOADGEN_ARGS_SKYLAKE.append(None) - -MEMLOADGEN_ARGS_SANDYBRIDGE = [] -for i in range(0, 100, 10): - MEMLOADGEN_ARGS_SANDYBRIDGE.append([f"{SANDYBRIDGE_CPULIST.get_cpulist_by_offset(domain = 0, offset = 12, num = 4, stride = 1)}.1.{i}", - f"{SANDYBRIDGE_CPULIST.get_cpulist_by_offset(domain = 1, offset = 12, num = 4, stride = 1)}.0.{i}"]) -MEMLOADGEN_ARGS_SANDYBRIDGE.append(None) - -# client thread affinity -CLIENT_ASSIGNMENTS = ic.distribute_threads(NUM_CORES, 1, NUM_CLIENTS, 4) -CLIENT_AFFINITY = [] -for i in range(NUM_CLIENTS): - CLIENT_AFFINITY.append(ic.list_to_comma_delimited(CLIENT_ASSIGNMENTS[i])) -# server thread affinity -ICELAKE_SERVER_AFFINITY_SOCKET0 = ICELAKE_CPULIST.get_cpulist_by_offset(domain = 0, offset = 1, num = NUM_CORES, stride = 4) -ICELAKE_SERVER_AFFINITY_SOCKET1 = ICELAKE_CPULIST.get_cpulist_by_offset(domain = 1, offset = 1, num = NUM_CORES, stride = 4) -MILAN_SERVER_AFFINITY_SOCKET0 = MILAN_CPULIST.get_cpulist_by_offset(domain = 0, offset = 1, num = NUM_CORES, stride = 4) -MILAN_SERVER_AFFINITY_SOCKET1= MILAN_CPULIST.get_cpulist_by_offset(domain = 1, offset = 1, num = NUM_CORES, stride = 4) -SKYLAKE_SERVER_AFFINITY_SOCKET0 = SKYLAKE_CPULIST.get_cpulist_by_offset(domain = 0, offset = 1, num = 8, stride = 2) -SKYLAKE_SERVER_AFFINITY_SOCKET1= SKYLAKE_CPULIST.get_cpulist_by_offset(domain = 1, offset = 1, num = 8, stride = 2) -SANDYBRIDGE_SERVER_AFFINITY_SOCKET0= SANDYBRIDGE_CPULIST.get_cpulist_by_offset(domain = 0, offset = 0, num = 8, stride = 2) -SANDYBRIDGE_SERVER_AFFINITY_SOCKET1= SANDYBRIDGE_CPULIST.get_cpulist_by_offset(domain = 1, offset = 0, num = 8, stride = 2) - -# file system -#FILE_PATHS = ["/tank/large_file_#p"] -#"/tmpfs0/small_files_#p", -FILE_PATHS = ["/tmpfs0/small_files_#p", "/tmpfs1/small_files_#p"] -# mmap -USE_MMAP = [False, True] -all_args : list[ic.ArgTypes] = [] - -arg_types : ic.ArgTypes = ic.ArgTypes() -#server -arg_types.add_arg(["sandybridge4.rcs.uwaterloo.ca"]) -#clients -arg_types.add_arg(["milan2-int.rcs.uwaterloo.ca", "milan1-int.rcs.uwaterloo.ca"]) -#server_dat -arg_types.add_arg(["192.168.70.74"]) -#clients_dat -arg_types.add_arg(["192.168.70.103", "192.168.70.104"]) -#clients_affinity -arg_types.add_arg(["1,3,5", "1,3,5"]) -# affinity -#arg_types.add_arg("17,19,21,23,25,27") -arg_types.add_arg("1,3,5,7,9,11") -# sendfile/tls/ktls -arg_types.add_args(*KTLS_ARGS) -# memloadgen -arg_types.add_arg(*MEMLOADGEN_ARGS_SANDYBRIDGE) -# filepath -arg_types.add_arg(*FILE_PATHS) -# ODIRECT -arg_types.add_arg(False) -# mmap -arg_types.add_arg(*USE_MMAP) -all_args.append(arg_types) - -arg_types : ic.ArgTypes = ic.ArgTypes() -#server -arg_types.add_arg(["sandybridge4.rcs.uwaterloo.ca"]) -#clients -arg_types.add_arg(["milan2-int.rcs.uwaterloo.ca", "milan1-int.rcs.uwaterloo.ca"]) -#server_dat -arg_types.add_arg(["192.168.70.74"]) -#clients_dat -arg_types.add_arg(["192.168.70.103", "192.168.70.104"]) -#clients_affinity -arg_types.add_arg(["1,3,5", "1,3,5"]) -# affinity -arg_types.add_arg("17,19,21,23,25,27") -# sendfile/tls/ktls -arg_types.add_args(*KTLS_ARGS) -# memloadgen -arg_types.add_arg(*MEMLOADGEN_ARGS_SANDYBRIDGE) -# filepath -arg_types.add_arg(*FILE_PATHS) -# ODIRECT -arg_types.add_arg(False) -# mmap -arg_types.add_arg(*USE_MMAP) -#all_args.append(arg_types) - - -def parse_comma_list(input : str): - return input.split(",") - -def run_setup_cmd(conf : ic.Conf, cmd : str): - ssrv : list[tuple[str, sp.Popen]] = [] - tc.log_print(f"Running command on {conf.server[0]}...") - ssrv.append((conf.server[0], tc.remote_exec(conf.server, cmd, blocking=False, check=False)[0])) - - for s in conf.clients: - tc.log_print(f"Running command on {s}...") - ssrv.append((s, tc.remote_exec([s], cmd, blocking=False, check=False)[0])) - - for p in ssrv: - _ , stderr = p[1].communicate() - if p[1].returncode != 0: - print(f"\n{ p[0] } failed. stderr:\n{stderr.decode()}\n") - else: - print(f"\n{ p[0] } succeeded\n") - -def setup_all(conf : ic.Conf): - setup_cmd : str = f''' - sudo pkg install -y openssl-devel vim curl wget gmake cmake openssl-devel llvm gcc rsync pkgconf isal-kmod ktls_isa-l_crypto-kmod; - sudo pkg remove -y iperf iperf3; - sudo rm -rf { BIN_PATH }; - sudo mkdir -p { BIN_PATH }; - sudo git clone https://git.quacker.org/d/iperf3-tls { BIN_PATH }; - cd { BIN_PATH }; - sudo git checkout dev; - sudo ./configure; - sudo make -j8; - sudo rm -rf /libtopo; - sudo mkdir -p /libtopo; - sudo git clone https://git.quacker.org/d/libtopo /libtopo; - cd /libtopo; - sudo mkdir build; - cd build; - sudo cmake ../; - sudo make install; - sudo rm -rf /numam; - sudo mkdir -p /numam; - sudo chmod 777 /numam; - ''' - run_setup_cmd(conf, setup_cmd) - - #rsync - all_clts = [] - all_clts.extend(conf.clients) - all_clts.extend(conf.server) - dir = f"{os.path.dirname(__file__)}/../" - for clt in all_clts: - print("Syncing files to " + clt + "...") - rsync_cmd = f"rsync -az --no-perms --rsync-path=\"sudo rsync\" --omit-dir-times -e \"ssh -p77\" {dir} {tc.get_ssh_user()}@{clt}:/numam/" - sp.check_call(rsync_cmd, shell=True) - - run_setup_cmd(conf, f''' - cd /numam; - sudo rm -rf build; - sudo mkdir -p build; - cd build; - sudo cmake ../; - sudo make memloadgen; - ''') - - -def stop_all(conf : ic.Conf, clients_only = False): - # stop clients - tc.log_print("Stopping clients...") - tc.remote_exec(conf.clients, "sudo killall -9 iperf3; sudo killall -9 memloadgen", check=False) - - if not clients_only: - tc.log_print("Stopping server...") - tc.remote_exec(conf.server, "sudo killall -9 iperf3; sudo killall -9 memloadgen", check=False) - -def prepare_logdir(conf : ic.Conf): - tc.log_print("Preparing server log directory...") - prep_cmd = "sudo rm -rf " + LOG_FILEPATH - tc.log_print(prep_cmd) - tc.remote_exec(conf.server, prep_cmd, check=False) - - time.sleep(0.1) - - prep_cmd = "sudo mkdir -p " + LOG_FILEPATH - tc.log_print(prep_cmd) - tc.remote_exec(conf.server, prep_cmd, check=True) - -def run_exp(conf : ic.Conf): - stop_all(conf) - while True: - prepare_logdir(conf) - - ssrvs=[] - ssrv_names=[] - cur_srv_proc = 0 - - smlg = [] - smlg_names = [] - if conf.memloadgen != None: - for emem in conf.memloadgen: - mlg_cmd = "sudo " - mlg_cpu = emem.split(".")[0] - mlg_dom = emem.split(".")[1] - mlg_pct = emem.split(".")[2] - mlg_cmd += f"{MLG_PATH} -b {MEMLOAD_BLKSZ} -T {MEMLOAD_BLKSZ} -s {mlg_cpu} -d {mlg_dom} -o {LOG_FILEPATH}/memloadgen_{len(smlg)} -p {mlg_pct} -H 4 -w 17" - - tc.log_print("Starting memloadgen...") - tc.log_print(mlg_cmd) - smlg.append(tc.remote_exec(conf.server, mlg_cmd, blocking=False)[0]) - smlg_names.append("memloadgen") - time.sleep(0.1) - time.sleep(5) - for eaff in parse_comma_list(conf.affinity): - server_cmd = "sudo " - server_cmd += f"{EXE_PATH} -s -p " + str(SERVER_PORT_START + cur_srv_proc) + \ - " -F " + conf.filepath.replace("#p", str(cur_srv_proc)) + \ - " -A " + eaff - if conf.tls: - server_cmd += f" --enable-ssl-over-tcp --ssl-certificate {SSL_CERT} --ssl-private-key {SSL_PKEY}" - if conf.ktls: - server_cmd += " --enable-ssl-ktls" - if conf.odirect: - server_cmd += " --use-odirect" - if conf.mmap: - server_cmd += " --use-mmap" - server_cmd += " -J --logfile " + LOG_FILEPATH + "/" + eaff + ".txt" - # start server - tc.log_print("Starting server proc " + str(cur_srv_proc) + "...") - tc.log_print(server_cmd) - ssrv = tc.remote_exec(conf.server, server_cmd, blocking=False)[0] - ssrvs.append(ssrv) - ssrv_names.append("Server " + str(cur_srv_proc)) - cur_srv_proc = cur_srv_proc + 1 - time.sleep(0.1) - - # start clients - tc.log_print("Starting clients...") - sclts = [] - sclt_names = [] - clt_number = 0 - for i in range(len(conf.clients)): - client_aff = conf.clients_affinity[i] - for eaff in parse_comma_list(client_aff): - client_cmd = f"sudo {EXE_PATH} -c " + conf.server_dat[0] + \ - " -p " + str(SERVER_PORT_START + clt_number) + \ - " --connect-timeout 1000" + \ - " -A " + eaff + \ - " -t 25" + \ - " -P 16" + \ - " -R" + \ - " -N" + \ - " -4" + \ - " -O 10" - if conf.tls: - client_cmd += f" --enable-ssl-over-tcp --ssl-certificate {SSL_CERT} --ssl-private-key {SSL_PKEY}" - client_cmd += " -J --logfile /dev/null" - - tc.log_print(conf.clients[i] + ":\n" + client_cmd) - sclts.append(tc.remote_exec([conf.clients[i]], client_cmd, blocking=False)[0]) - sclt_names.append(conf.clients[i] + "@" + eaff) - clt_number = clt_number + 1 - time.sleep(0.1) - # launch stderr monitoring thread - exclude = ["Pseudo-terminal"] - tc.errthr_create(sclts, sclt_names, exclude) - tc.errthr_create(ssrvs, ssrv_names, exclude) - if (conf.memloadgen != None): - tc.errthr_create(smlg, smlg_names, exclude) - tc.errthr_start() - cur = 0 - # selec = select.poll() - # selec.register(p.stdout, select.POLLIN) - success = False - while not success: - success = False - # either failed or timeout - # we use failure detection to save time for long durations - if tc.errthr_get_failed(): - break - - if cur >= 90: - tc.log_print("Experiment timed out. Restarting...") - break - # while selec.poll(1): - # print(p.stdout.readline()) - - success = True - for p in sclts: - if p.poll() == None: - success = False - break - - time.sleep(1) - cur = cur + 1 - - tc.errthr_stop() - stop_all(conf, clients_only=True) - tc.log_print("Cooling down...") - time.sleep(3) - stop_all(conf) - - if success: - if flush_netresult(conf): - break - -def flush_netresult(conf : ic.Conf) -> bool: - tc.log_print("Keeping results...") - # copy log directory back to machine - log_output = tc.get_odir() - os.makedirs(log_output, exist_ok=True) - scp_cmd = "scp -P77 -r " + tc.get_ssh_user() + "@" + conf.server[0] + ":" + LOG_FILEPATH + " " + log_output + "/" - tc.log_print(scp_cmd) - sp.check_call(scp_cmd, shell=True) - - # parse results - log_output = log_output + "/" + os.path.basename(LOG_FILEPATH) - logs = os.listdir(log_output) - logs_bytes = [] - memload_bytes = [] - for log in logs: - tc.log_print("Processing " + log + "...") - if os.path.isfile(log_output + "/" + log) and log.endswith(".txt"): - with open(log_output + "/" + log, "r") as f: - buf = f.read() - if len(buf) > 0: - logs_bytes.append(buf) - else: - tc.log_print("Warning: log file empty for " + log + ". Retrying...") - return False - if os.path.isfile(log_output + "/" + log) and ("memloadgen" in log): - with open(log_output + "/" + log, "r") as f: - memloadbuf = f.read() - if len(memloadbuf) == 0: - tc.log_print("Warning: log file empty for " + log + ". Retrying...") - return False - else: - memload_bytes.append(memloadbuf) - - try: - parser = par.iperf_json_parser(logs_bytes) - bps = [] - for ml in memload_bytes: - memparser = par.memloadgen_parser(ml, 22, 32) - bps.append(memparser.bps) - - tc.log_print("Aggregated throughput: " + "{:.2f}".format(parser.aggregate_egress_bps / 8.0) + " B/s " + \ - "{:.2f}".format(parser.aggregate_egress_bps / 8.0 / 1024.0 / 1024.0) + " MB/s " + \ - "{:.2f}".format(parser.aggregate_egress_bps / 8.0 / 1024.0 / 1024.0 / 1024.0) + " GB/s" + \ - " | Memload generator: " + "{:.2f}".format(np.sum(bps) / 1024.0 / 1024.0 / 1024.0) + " GB/s" ) - except Exception as e: - tc.log_print("Warning: exception for parsing logs: " + str(e) + " restarting...") - scp_cmd = "sudo rm -rf " + log_output + "/*" - tc.log_print(scp_cmd) - sp.check_call(scp_cmd, shell=True) - return False - return True - -def main(): - tc.set_ssh_param("-o StrictHostKeyChecking=no -p77") - tc.set_ssh_user("oscar") - output_dirname = "run" - - confs : list[ic.Conf] = [] - for argtype in all_args: - args = argtype.get_fields() - for arg in args: - confs.append(ic.Conf(*arg)) - - name = None - options = getopt.getopt(sys.argv[1:], 'sSn:')[0] - for opt, arg in options: - if opt in ('-s'): - stop_all(confs[0]) - return - elif opt in ('-S'): - setup_all(confs[0]) - return - elif opt in ('-n'): - name = arg - - if name != None: - output_dirname = name - tc.init("~/results.d/iperf3/" + output_dirname + "_" + datetime.datetime.now().strftime('%Y%m%d%H%M%S')) - cpcmd = "cp " + __file__ + " " + tc.get_odir() + "/" - tc.log_print(cpcmd) - sp.check_call(cpcmd, shell=True) - - server_set = [] - for conf in confs: - tc.log_print(conf.to_string(verbose=True)) - if conf.server[0] not in server_set: - server_set.append(conf.server[0]) - tc.log_print(f"{len(confs)} configs ({RUNS} * {len(confs)} = {RUNS * len(confs)} runs) scheduled:") - - for server in server_set: - print(f"Gathering information on {server} (sysctl)...") - p : sp.Popen = tc.remote_exec([server], "sysctl -a", blocking=False, check=True)[0] - with open(f"{tc.get_odir()}/{server}.sysctl", "w") as f: - f.write(p.communicate()[0].decode()) - print(f"Gathering information on {server} (ifconfig)...") - p : sp.Popen = tc.remote_exec([server], "ifconfig", blocking=False, check=True)[0] - with open(f"{tc.get_odir()}/{server}.ifconfig", "w") as f: - f.write(p.communicate()[0].decode()) - print(f"Gathering information on {server} (mount)...") - p : sp.Popen = tc.remote_exec([server], "mount", blocking=False, check=True)[0] - with open(f"{tc.get_odir()}/{server}.mount", "w") as f: - f.write(p.communicate()[0].decode()) - print(f"Gathering information on {server} (dmesg)...") - p : sp.Popen = tc.remote_exec([server], "sudo dmesg", blocking=False, check=True)[0] - with open(f"{tc.get_odir()}/{server}.dmesg", "w") as f: - f.write(p.communicate()[0].decode()) - for conf in confs: - for i in range(0, RUNS): - tc.begin(conf.to_string()+f"_run.{i}") - run_exp(conf) - tc.end() -main() diff --git a/scripts/iperf_sky.py b/scripts/iperf_sky.py deleted file mode 100644 index f303944..0000000 --- a/scripts/iperf_sky.py +++ /dev/null @@ -1,399 +0,0 @@ -import subprocess as sp -import time -import os -import datetime -import sys -import getopt -import numpy as np - -import libpar as par -import libtc as tc -import iperfconf as ic - -# definitions -file_dir : str = os.path.dirname(os.path.realpath(__file__)) -root_dir : str = os.path.join(file_dir,"..") -LOG_FILEPATH = "/iperflogs" -BIN_PATH = "/iperftls" -MLG_PATH = "/numam/build/bin/memloadgen" -EXE_PATH = BIN_PATH + "/src/iperf3" -SSL_CERT = "/certs/server.crt" -SSL_PKEY = "/certs/server.key" -SERVER_PORT_START = 8050 -MEMLOAD_BLKSZ = 1024*1024*64 -RUNS=3 -SANDYBRIDGE_CPULIST : ic.CPUList = ic.CPUList([16, 16]) -SKYLAKE_CPULIST : ic.CPUList = ic.CPUList([24, 24]) -MILAN_CPULIST : ic.CPUList = ic.CPUList([64, 64]) -ICELAKE_CPULIST : ic.CPUList = ic.CPUList([48, 48]) - -# Shared Arguments -NUM_CORES = 12 -NUM_CLIENTS = 3 -# KTLS -KTLS_ARGS = [#(False, False, False), - #(True, False, False), - #(False, True, False), - (False, True, True), - (True, True, True)] - -MEMLOADGEN_ARGS_SKYLAKE = [] -for i in range(0, 100, 10): - MEMLOADGEN_ARGS_SKYLAKE.append([f"{SKYLAKE_CPULIST.get_cpulist_by_offset(domain = 0, offset = 12, num = 6, stride = 2)}.1.{i}", - f"{SKYLAKE_CPULIST.get_cpulist_by_offset(domain = 1, offset = 12, num = 6, stride = 2)}.0.{i}"]) -MEMLOADGEN_ARGS_SKYLAKE.append(None) - -SKYLAKE_SERVER_AFFINITY_SOCKET0 = SKYLAKE_CPULIST.get_cpulist_by_offset(domain = 0, offset = 1, num = 6, stride = 2) -SKYLAKE_SERVER_AFFINITY_SOCKET1= SKYLAKE_CPULIST.get_cpulist_by_offset(domain = 1, offset = 1, num = 6, stride = 2) - -# file system -#FILE_PATHS = ["/tank/large_file_#p"] -#"/tmpfs1/small_files_#p", -FILE_PATHS = ["/tmpfs1/small_files_#p", "/tmpfs0/small_files_#p"] -# mmap -USE_MMAP = [False, True] -all_args : list[ic.ArgTypes] = [] - -# args start -arg_types : ic.ArgTypes = ic.ArgTypes() -#server -arg_types.add_arg(["skylake3.rcs.uwaterloo.ca"]) -#clients -arg_types.add_arg(["icelake2-int.rcs.uwaterloo.ca", "icelake1-int.rcs.uwaterloo.ca"]) -#server_dat -arg_types.add_arg(["192.168.90.93"]) -#clients_dat -arg_types.add_arg(["192.168.90.102", "192.168.90.101"]) -#clients_affinity -arg_types.add_arg(["1,3,5", "1,3,5"]) -# affinity -arg_types.add_arg(SKYLAKE_SERVER_AFFINITY_SOCKET1) -# sendfile/tls/ktls -arg_types.add_args(*KTLS_ARGS) -# memloadgen -arg_types.add_arg(*MEMLOADGEN_ARGS_SKYLAKE) -# filepath -arg_types.add_arg(*FILE_PATHS) -# ODIRECT -arg_types.add_arg(False) -# mmap -arg_types.add_arg(*USE_MMAP) -all_args.append(arg_types) - - -def parse_comma_list(input : str): - return input.split(",") - -def run_setup_cmd(conf : ic.Conf, cmd : str): - ssrv : list[tuple[str, sp.Popen]] = [] - tc.log_print(f"Running command on {conf.server[0]}...") - ssrv.append((conf.server[0], tc.remote_exec(conf.server, cmd, blocking=False, check=False)[0])) - - for s in conf.clients: - tc.log_print(f"Running command on {s}...") - ssrv.append((s, tc.remote_exec([s], cmd, blocking=False, check=False)[0])) - - for p in ssrv: - _ , stderr = p[1].communicate() - if p[1].returncode != 0: - print(f"\n{ p[0] } failed. stderr:\n{stderr.decode()}\n") - else: - print(f"\n{ p[0] } succeeded\n") - -def setup_all(conf : ic.Conf): - setup_cmd : str = f''' - sudo pkg install -y openssl-devel vim curl wget gmake cmake openssl-devel llvm gcc rsync pkgconf isal-kmod ktls_isa-l_crypto-kmod; - sudo pkg remove -y iperf iperf3; - sudo rm -rf { BIN_PATH }; - sudo mkdir -p { BIN_PATH }; - sudo git clone https://git.quacker.org/d/iperf3-tls { BIN_PATH }; - cd { BIN_PATH }; - sudo git checkout dev; - sudo ./configure; - sudo make -j8; - sudo rm -rf /libtopo; - sudo mkdir -p /libtopo; - sudo git clone https://git.quacker.org/d/libtopo /libtopo; - cd /libtopo; - sudo mkdir build; - cd build; - sudo cmake ../; - sudo make install; - sudo rm -rf /numam; - sudo mkdir -p /numam; - sudo chmod 777 /numam; - ''' - run_setup_cmd(conf, setup_cmd) - - #rsync - all_clts = [] - all_clts.extend(conf.clients) - all_clts.extend(conf.server) - dir = f"{os.path.dirname(__file__)}/../" - for clt in all_clts: - print("Syncing files to " + clt + "...") - rsync_cmd = f"rsync -az --no-perms --rsync-path=\"sudo rsync\" --omit-dir-times -e \"ssh -p77\" {dir} {tc.get_ssh_user()}@{clt}:/numam/" - sp.check_call(rsync_cmd, shell=True) - - run_setup_cmd(conf, f''' - cd /numam; - sudo rm -rf build; - sudo mkdir -p build; - cd build; - sudo cmake ../; - sudo make memloadgen; - ''') - - -def stop_all(conf : ic.Conf, clients_only = False): - # stop clients - tc.log_print("Stopping clients...") - tc.remote_exec(conf.clients, "sudo killall -9 iperf3; sudo killall -9 memloadgen", check=False) - - if not clients_only: - tc.log_print("Stopping server...") - tc.remote_exec(conf.server, "sudo killall -9 iperf3; sudo killall -9 memloadgen", check=False) - -def prepare_logdir(conf : ic.Conf): - tc.log_print("Preparing server log directory...") - prep_cmd = "sudo rm -rf " + LOG_FILEPATH - tc.log_print(prep_cmd) - tc.remote_exec(conf.server, prep_cmd, check=False) - - time.sleep(0.1) - - prep_cmd = "sudo mkdir -p " + LOG_FILEPATH - tc.log_print(prep_cmd) - tc.remote_exec(conf.server, prep_cmd, check=True) - -def run_exp(conf : ic.Conf): - stop_all(conf) - while True: - prepare_logdir(conf) - - ssrvs=[] - ssrv_names=[] - cur_srv_proc = 0 - - smlg = [] - smlg_names = [] - if conf.memloadgen != None: - for emem in conf.memloadgen: - mlg_cmd = "sudo " - mlg_cpu = emem.split(".")[0] - mlg_dom = emem.split(".")[1] - mlg_pct = emem.split(".")[2] - mlg_cmd += f"{MLG_PATH} -b {MEMLOAD_BLKSZ} -T {MEMLOAD_BLKSZ} -s {mlg_cpu} -d {mlg_dom} -o {LOG_FILEPATH}/memloadgen_{len(smlg)} -p {mlg_pct} -H 4 -w 17" - - tc.log_print("Starting memloadgen...") - tc.log_print(mlg_cmd) - smlg.append(tc.remote_exec(conf.server, mlg_cmd, blocking=False)[0]) - smlg_names.append("memloadgen") - time.sleep(0.1) - time.sleep(5) - for eaff in parse_comma_list(conf.affinity): - server_cmd = "sudo " - server_cmd += f"{EXE_PATH} -s -p " + str(SERVER_PORT_START + cur_srv_proc) + \ - " -F " + conf.filepath.replace("#p", str(cur_srv_proc)) + \ - " -A " + eaff - if conf.tls: - server_cmd += f" --enable-ssl-over-tcp --ssl-certificate {SSL_CERT} --ssl-private-key {SSL_PKEY}" - if conf.ktls: - server_cmd += " --enable-ssl-ktls" - if conf.odirect: - server_cmd += " --use-odirect" - if conf.mmap: - server_cmd += " --use-mmap" - server_cmd += " -J --logfile " + LOG_FILEPATH + "/" + eaff + ".txt" - # start server - tc.log_print("Starting server proc " + str(cur_srv_proc) + "...") - tc.log_print(server_cmd) - ssrv = tc.remote_exec(conf.server, server_cmd, blocking=False)[0] - ssrvs.append(ssrv) - ssrv_names.append("Server " + str(cur_srv_proc)) - cur_srv_proc = cur_srv_proc + 1 - time.sleep(0.1) - - # start clients - tc.log_print("Starting clients...") - sclts = [] - sclt_names = [] - clt_number = 0 - for i in range(len(conf.clients)): - client_aff = conf.clients_affinity[i] - for eaff in parse_comma_list(client_aff): - client_cmd = f"sudo {EXE_PATH} -c " + conf.server_dat[0] + \ - " -p " + str(SERVER_PORT_START + clt_number) + \ - " --connect-timeout 1000" + \ - " -A " + eaff + \ - " -t 25" + \ - " -P 16" + \ - " -R" + \ - " -N" + \ - " -4" + \ - " -O 10" - if conf.tls: - client_cmd += f" --enable-ssl-over-tcp --ssl-certificate {SSL_CERT} --ssl-private-key {SSL_PKEY}" - client_cmd += " -J --logfile /dev/null" - - tc.log_print(conf.clients[i] + ":\n" + client_cmd) - sclts.append(tc.remote_exec([conf.clients[i]], client_cmd, blocking=False)[0]) - sclt_names.append(conf.clients[i] + "@" + eaff) - clt_number = clt_number + 1 - time.sleep(0.1) - # launch stderr monitoring thread - exclude = ["Pseudo-terminal"] - tc.errthr_create(sclts, sclt_names, exclude) - tc.errthr_create(ssrvs, ssrv_names, exclude) - if (conf.memloadgen != None): - tc.errthr_create(smlg, smlg_names, exclude) - tc.errthr_start() - cur = 0 - # selec = select.poll() - # selec.register(p.stdout, select.POLLIN) - success = False - while not success: - success = False - # either failed or timeout - # we use failure detection to save time for long durations - if tc.errthr_get_failed(): - break - - if cur >= 90: - tc.log_print("Experiment timed out. Restarting...") - break - # while selec.poll(1): - # print(p.stdout.readline()) - - success = True - for p in sclts: - if p.poll() == None: - success = False - break - - time.sleep(1) - cur = cur + 1 - - tc.errthr_stop() - stop_all(conf, clients_only=True) - tc.log_print("Cooling down...") - time.sleep(3) - stop_all(conf) - - if success: - if flush_netresult(conf): - break - -def flush_netresult(conf : ic.Conf) -> bool: - tc.log_print("Keeping results...") - # copy log directory back to machine - log_output = tc.get_odir() - os.makedirs(log_output, exist_ok=True) - scp_cmd = "scp -P77 -r " + tc.get_ssh_user() + "@" + conf.server[0] + ":" + LOG_FILEPATH + " " + log_output + "/" - tc.log_print(scp_cmd) - sp.check_call(scp_cmd, shell=True) - - # parse results - log_output = log_output + "/" + os.path.basename(LOG_FILEPATH) - logs = os.listdir(log_output) - logs_bytes = [] - memload_bytes = [] - for log in logs: - tc.log_print("Processing " + log + "...") - if os.path.isfile(log_output + "/" + log) and log.endswith(".txt"): - with open(log_output + "/" + log, "r") as f: - buf = f.read() - if len(buf) > 0: - logs_bytes.append(buf) - else: - tc.log_print("Warning: log file empty for " + log + ". Retrying...") - return False - if os.path.isfile(log_output + "/" + log) and ("memloadgen" in log): - with open(log_output + "/" + log, "r") as f: - memloadbuf = f.read() - if len(memloadbuf) == 0: - tc.log_print("Warning: log file empty for " + log + ". Retrying...") - return False - else: - memload_bytes.append(memloadbuf) - - try: - parser = par.iperf_json_parser(logs_bytes) - bps = [] - for ml in memload_bytes: - memparser = par.memloadgen_parser(ml, 22, 32) - bps.append(memparser.bps) - - tc.log_print("Aggregated throughput: " + "{:.2f}".format(parser.aggregate_egress_bps / 8.0) + " B/s " + \ - "{:.2f}".format(parser.aggregate_egress_bps / 8.0 / 1024.0 / 1024.0) + " MB/s " + \ - "{:.2f}".format(parser.aggregate_egress_bps / 8.0 / 1024.0 / 1024.0 / 1024.0) + " GB/s" + \ - " | Memload generator: " + "{:.2f}".format(np.sum(bps) / 1024.0 / 1024.0 / 1024.0) + " GB/s" ) - except Exception as e: - tc.log_print("Warning: exception for parsing logs: " + str(e) + " restarting...") - scp_cmd = "sudo rm -rf " + log_output + "/*" - tc.log_print(scp_cmd) - sp.check_call(scp_cmd, shell=True) - return False - return True - -def main(): - tc.set_ssh_param("-o StrictHostKeyChecking=no -p77") - tc.set_ssh_user("oscar") - output_dirname = "run" - - confs : list[ic.Conf] = [] - for argtype in all_args: - args = argtype.get_fields() - for arg in args: - confs.append(ic.Conf(*arg)) - - name = None - options = getopt.getopt(sys.argv[1:], 'sSn:')[0] - for opt, arg in options: - if opt in ('-s'): - stop_all(confs[0]) - return - elif opt in ('-S'): - setup_all(confs[0]) - return - elif opt in ('-n'): - name = arg - - if name != None: - output_dirname = name - tc.init("~/results.d/iperf3/" + output_dirname + "_" + datetime.datetime.now().strftime('%Y%m%d%H%M%S')) - cpcmd = "cp " + __file__ + " " + tc.get_odir() + "/" - tc.log_print(cpcmd) - sp.check_call(cpcmd, shell=True) - - server_set = [] - for conf in confs: - tc.log_print(conf.to_string(verbose=True)) - if conf.server[0] not in server_set: - server_set.append(conf.server[0]) - tc.log_print(f"{len(confs)} configs ({RUNS} * {len(confs)} = {RUNS * len(confs)} runs) scheduled:") - - for server in server_set: - print(f"Gathering information on {server} (sysctl)...") - p : sp.Popen = tc.remote_exec([server], "sysctl -a", blocking=False, check=True)[0] - with open(f"{tc.get_odir()}/{server}.sysctl", "w") as f: - f.write(p.communicate()[0].decode()) - print(f"Gathering information on {server} (ifconfig)...") - p : sp.Popen = tc.remote_exec([server], "ifconfig", blocking=False, check=True)[0] - with open(f"{tc.get_odir()}/{server}.ifconfig", "w") as f: - f.write(p.communicate()[0].decode()) - print(f"Gathering information on {server} (mount)...") - p : sp.Popen = tc.remote_exec([server], "mount", blocking=False, check=True)[0] - with open(f"{tc.get_odir()}/{server}.mount", "w") as f: - f.write(p.communicate()[0].decode()) - print(f"Gathering information on {server} (dmesg)...") - p : sp.Popen = tc.remote_exec([server], "sudo dmesg", blocking=False, check=True)[0] - with open(f"{tc.get_odir()}/{server}.dmesg", "w") as f: - f.write(p.communicate()[0].decode()) - - for conf in confs: - for i in range(0, RUNS): - tc.begin(conf.to_string()+f"_run.{i}") - run_exp(conf) - tc.end() -main() diff --git a/scripts/libs/iperfconf.py b/scripts/libs/iperfconf.py deleted file mode 100644 index 87bee5a..0000000 --- a/scripts/libs/iperfconf.py +++ /dev/null @@ -1,309 +0,0 @@ -import numpy as np -import os -import libpar as par - -class IperfConfObj: - KTLS_MODE_SW = "sw" - KTLS_MODE_IFNET = "IFNET" - KTLS_MODE_NONE = "none" - KTLS_MODE_TOE = "toe" - STORAGE_NVDIMM = "nvdimm" - STORAGE_TMPFS = "tmpfs" - STORAGE_NVME = "nvme" - STORAGE_RAID_0 = "0" - STORAGE_RAID_NONE = "none" - SERVER_ICELAKE = "icelake" - SERVER_MILAN = "milan" - SERVER_SKYLAKE = "skylake" - NIC_INTEL_100 = "ice100" - NIC_CHELSIO_100 = "cc100" - NIC_MELLANOX_100 = "mlx100" - - def __init__(self): - self.conf_odirect = False - self.conf_memloadgen_pct = 0 - self.conf_memloadgen_num = 1 - self.conf_memloadgen_threads = [] - self.conf_ktls = False - self.conf_tls = False - self.conf_sendfile = False - self.conf_storage = self.STORAGE_TMPFS - self.conf_storage_domain = 0 - self.conf_nic_domain = 0 - self.conf_nic_name = self.NIC_INTEL_100 - self.conf_ktls_mode = self.KTLS_MODE_NONE - self.conf_storage_raid = self.STORAGE_RAID_NONE - self.conf_storage_num = 1 - self.conf_mmap = False - self.conf_server = self.SERVER_ICELAKE - self.num_clients = 1 - self.server_threads = 1 - self.server_threads_base = 0 - self.server_threads_stride = 1 - self.stat_memloadgen = 0 - self.stat_throughput = 0 - self.log_driver = "" - self.log_memloadgen = {} - self.log_iperf = {} - - -class Conf: - def __init__(self, server, clients, server_dat, clients_dat, clients_affinity, affinity, sendfile, tls, ktls, memloadgen, filepath, odirect, mmap): - self.affinity = affinity - self.ktls = ktls - self.sendfile = sendfile - self.tls = tls - self.odirect = odirect - self.memloadgen = memloadgen - self.filepath = filepath - self.server = server - self.clients = clients - self.server_dat = server_dat - self.clients_dat = clients_dat - self.clients_affinity = clients_affinity - self.mmap = mmap - - def to_string(self, verbose = False): - ret = f"server.{self.server[0]}@{self.server_dat[0]}_affinity.{self.affinity}_sendfile.{self.sendfile}_tls.{self.tls}_ktls.{self.ktls}_" - if verbose: - ret += f"memloadgen.{False if self.memloadgen == None else (self.memloadgen[0] + '+' + self.memloadgen[1])}" - else: - ret += f"memloadgen.{False if self.memloadgen == None else (self.memloadgen[0].split('.')[2])}" - ret += f"_filepath.{self.filepath.replace('/','-').replace('_','-')}_odirect.{self.odirect}_mmap.{self.mmap}" - return ret - - def to_string_graph(self): - ret = f"affinity.{self.affinity}_mode.{self.tls}_memloadgen.{self.memloadgen}_fs.{self.filepath}" - return ret - - def __hash__(self): - return hash((self.server, self.filepath, self.odirect, self.memloadgen, self.tls, self.sendfile, self.ktls, self.affinity)) - - def __eq__(self, other): - return (self.server, self.filepath, self.odirect, self.memloadgen, self.tls, self.sendfile, self.ktls, self.affinity) == \ - (other.server, other.filepath, other.odirect, other.memloadgen, other.tls, other.sendfile, other.ktls, other.affinity) - - def equal_except_memload(self, other): - return (self.server, self.filepath, self.odirect, self.tls, self.sendfile, self.ktls, self.affinity) == \ - (other.server, other.filepath, other.odirect, other.tls, other.sendfile, other.ktls, other.affinity) - - def __ne__(self, other): - # Not strictly necessary, but to avoid having both x==y and x!=y - # True at the same time - return not (self == other) - -class ConfData: - def __init__(self, conf : Conf): - self.conf = conf - self.bps = [] - self.memload = [] - - def add_run(self, qps : float, memload : int): - self.bps.append(qps) - self.memload.append(memload) - - def get_bps(self) -> list[float]: - return self.bps - - def get_memload(self) -> list[int]: - return self.memload - -class ArgTypes: - def __init__(self): - self.all = [[]] - - def add_arg(self, *arg : any): - new_all = [] - for val in arg: - for exst in self.all: - tmp = exst.copy() - tmp.append(val) - new_all.append(tmp) - self.all = new_all - - def add_args(self, *args : tuple[any]): - new_all = [] - for arg in args: - for exst in self.all: - tmp = exst.copy() - for val in arg: - tmp.append(val) - new_all.append(tmp) - self.all = new_all - - def get_fields(self) -> list[list[any]]: - return self.all - - -class CPUList: - def __init__(self, cores : list[int]): - self._cores = cores - self._total_cores = np.sum(cores) - self._offset_lookup : list[int] = [] - for i in range(len(cores)): - self._offset_lookup.append(0) - if (i == 0): - self._offset_lookup[i] = 0 - else: - self._offset_lookup[i] = self._offset_lookup[i-1] + cores[i-1] - - def get_all_cores(self) -> int: - return self._total_cores - - def get_cores_in_domain(self, domain : int) -> int: - return self._cores[domain] - - def cores_to_cpulist(self, cores : list[int]) -> str: - ret = "" - for i in range(len(cores)): - if (i > 0): - ret += "," - ret += str(cores[i]) - return ret - - def get_cpulist_by_offset(self, domain : int, offset : int, num : int, stride : int) -> str: - ret = self.get_cores_by_offset(domain, offset, num, stride) - return self.cores_to_cpulist(ret) - - def get_cores_by_offset(self, domain : int, offset : int, num : int, stride : int) -> list[int]: - ret = [] - offset = offset + self._offset_lookup[domain] - for _ in range(num): - ret.append(offset) - offset = offset + stride - return ret - - -def distribute_threads(num_threads : int, offset : int, num_clients : int, stride : int): - client_strs : list[list[int]] = [] - for i in range(0, num_clients): - client_strs.append([]) - - for i in range(0, num_threads): - cur_client = i % num_clients - lst = client_strs[cur_client] - if (len(lst) == 0): - lst.append(offset) - else: - lst.append(lst[-1] + stride) - return client_strs - -def list_to_comma_delimited(list : list[any]): - ret = "" - for i in range(0, len(list)): - if (i > 0): - ret += "," - ret += str(list[i]) - return ret - - -def _enum_files(rootdir : str, ret : list[str]): - if os.path.isfile(rootdir): - ret.append(rootdir) - return - - for subdir in os.listdir(rootdir): - each_dir = os.path.join(rootdir, subdir) - _enum_files(each_dir, ret) - -def process_directory(dir: str, dat : dict[Conf, ConfData]): - if (("memloadgen" in dir) and ("sendfile" in dir)): - segments = os.path.basename(dir).split("_") - server = segments[0] - affinity = segments[1].split(".")[1] - sendfile = segments[2].split(".")[1] - tls = segments[3].split(".")[1] - ktls = segments[4].split(".")[1] - memloadgen = segments[5].split(".")[1] - fs = segments[6].split(".")[1] - - # cleanup data - if int(affinity.split(",")[0]) <= 8: - if "2-int" in server: - affinity = "socket01" - else: - affinity = "socket00" - else: - if "2-int" in server: - affinity = "socket11" - else: - affinity = "socket10" - - if "tmpfs" in fs: - fs = "tmpfs" - elif "nvdimm" in fs: - fs = "nvdimm" - elif "mnt" in fs: - fs = "ssd" - - if memloadgen == "False": - memloadgen = "0" - elif memloadgen == "0": - memloadgen = "100" - - if sendfile == "True" and tls == "True" and ktls == "True": - mode = "ktls+sendfile" - elif sendfile == "True" and tls == "False" and ktls == "False": - mode = "sendfile" - elif sendfile == "False" and tls == "False" and ktls == "False": - mode = "write" - elif sendfile == "False" and tls == "True" and ktls == "False": - mode = "tls" - elif sendfile == "False" and tls == "True" and ktls == "True": - mode = "ktls" - - if "icelake" in server: - server = "icelake" - elif "milan" in server: - server = "milan" - - conf : Conf = Conf(server, None, None, None, None, affinity, mode, mode, mode, memloadgen, fs, None, None) - # conf.server = server - # conf.affinity = affinity - # conf.sendfile = mode - # conf.tls = mode - # conf.ktls = mode - # conf.memloadgen = memloadgen - # conf.filepath = fs - # conf.odirect = "" - - print("Processing category " + conf.to_string_graph() + " ... ") - - memloadgen_bps = [] - logs = [] - _enum_files(dir, logs) - logs_bytes = [] - for log in logs: - if log.endswith(".txt"): - with open(log, "r") as f: - buf = f.read() - if len(buf) > 0: - logs_bytes.append(buf) - else: - print("Warning: log file empty for " + log) - elif "memloadgen" in log: - with open(log, "r") as f: - buf = f.read() - if len(buf) > 0: - parser = par.memloadgen_parser(buf, 20, 30) - memloadgen_bps.append(parser.bps) - else: - print("Warning: memloadgen file empty for " + log) - - try: - parser = par.iperf_json_parser(logs_bytes) - - except Exception as e: - print("Warning: failed to parse " + log + ". err: " + str(e)) - return - - if not (conf in dat): - dat[conf] = ConfData(conf) - - dat[conf].add_run(parser.aggregate_egress_bps, memloadgen_bps) - return - - for subdir in os.listdir(dir): - each_dir = os.path.join(dir, subdir) - if not os.path.isfile(each_dir): - process_directory(each_dir, dat) \ No newline at end of file diff --git a/scripts/libs/libtc.py b/scripts/libs/libtc.py index accdd30..2f422c3 100644 --- a/scripts/libs/libtc.py +++ b/scripts/libs/libtc.py @@ -1,4 +1,3 @@ -from lib2to3.refactor import get_fixers_from_package import subprocess as sp import time import select diff --git a/scripts/mount.sh b/scripts/mount.sh deleted file mode 100644 index 85f048e..0000000 --- a/scripts/mount.sh +++ /dev/null @@ -1,6 +0,0 @@ -sudo mkdir -p /mnt/zroot -sudo fsck /dev/$1 -mount -t ufs /dev/$1 /mnt/zroot -sudo mkdir -p /tmpfs -mount -t tmpfs tmpfs /tmpfs -cp /mnt/zroot/large* /tmpfs/ diff --git a/scripts/mount_small.sh b/scripts/mount_small.sh deleted file mode 100644 index 4d46aa4..0000000 --- a/scripts/mount_small.sh +++ /dev/null @@ -1,20 +0,0 @@ -sudo mkdir -p /mnt/zroot -sudo fsck /dev/$1 -mount -t ufs /dev/$1 /mnt/zroot -sudo mkdir -p /tmpfs0 -sudo mkdir -p /tmpfs1 - -sysctl vfs.tmpfs.domain_target=0 -sysctl vfs.tmpfs.domain_policy=3 - -mount -t tmpfs tmpfs /tmpfs0 -cp /mnt/zroot/small_file_0_* /tmpfs0/ - -sysctl vfs.tmpfs.domain_target=1 -sysctl vfs.tmpfs.domain_policy=3 - -mount -t tmpfs tmpfs /tmpfs1 -cp /mnt/zroot/small_file_1_* /tmpfs1/ - -sysctl vfs.tmpfs.domain_target=0 -sysctl vfs.tmpfs.domain_policy=1