snapshot memloadgen transaction change

This commit is contained in:
quackerd 2022-11-01 11:01:23 +01:00
parent 565dbca278
commit 68b621fd3c
19 changed files with 1125 additions and 468 deletions

View File

@ -73,7 +73,7 @@ target_compile_options(birb_posix PRIVATE ${CC_FLAGS})
target_link_libraries(birb_posix PRIVATE pthread ntr gen)
add_executable(memloadgen util/memloadgen.cc)
target_link_libraries(memloadgen PRIVATE pthread gen ntr ${TOPO_LINK_LIBRARIES})
target_link_libraries(memloadgen PRIVATE pthread gen ntr nms ${TOPO_LINK_LIBRARIES})
target_compile_options(memloadgen PRIVATE ${CC_FLAGS} ${TOPO_CFLAGS})
add_executable(nms_test tests/nms_test.c)

View File

@ -19,6 +19,7 @@
#include <utility>
#include <vector>
#include <sys/_pthreadtypes.h>
#include <sys/param.h>
#include "defs.hh"
@ -297,9 +298,7 @@ Generator *createFacebookIA();
class memload_generator {
public:
struct memload_generator_options {
constexpr static size_t ITERATION_MAX = -1;
size_t chunk_size {512 * 1024 * 1024};
size_t iteration {ITERATION_MAX};
int verbose {0};
bool shared_buffer {true};
};
@ -318,18 +317,25 @@ class memload_generator {
// stat keeping
std::atomic<uint32_t> num_trans;
uint64_t begin_ts;
uint64_t end_ts;
std::atomic<int> * state;
std::atomic<int> * init_num;
};
std::vector<struct thread_info *> thr_infos;
std::atomic<int> end;
struct memload_generator_options opts;
std::atomic<int> state;
std::atomic<int> init_num;
static constexpr int STATE_RUN = 0;
static constexpr int STATE_RDY = 1;
static constexpr int STATE_END = 2;
static constexpr int STATE_INIT = 3;
static void *worker_thrd(void *_tinfo);
struct memload_generator_options opts;
public:
memload_generator(cpuset_t * threads, cpuset_t * target_domain, struct memload_generator_options * opt, bool *success);
uint64_t get_bps();
bool check_done();
uint64_t get_transactions();
bool start();
bool stop();
~memload_generator();
};

View File

@ -1,5 +1,6 @@
#pragma once
#include <sys/endian.h>
#include <rte_byteorder.h>
#include <rte_ether.h>
#include <rte_flow.h>
@ -152,10 +153,14 @@ str_to_netspec(char *str, struct net_spec *out)
}
constexpr static uint16_t PKT_TYPE_LOAD = 0;
constexpr static uint32_t LOAD_TYPE_CPU = 0; // arg0 = cpu time in us. arg1 = unused
constexpr static uint32_t LOAD_TYPE_MEM = 1; // arg0 = which thread to access. arg1 = how many cachelines to access
constexpr static uint32_t LOAD_TYPE_MAX = LOAD_TYPE_MEM + 1;
struct pkt_payload_load {
uint32_t epoch;
uint32_t which; // which word (thread)
uint32_t load; // how many cache lines
uint32_t type; // type of load
uint32_t arg0;
uint32_t arg1;
};
constexpr static uint16_t PKT_TYPE_PROBE = 1;

View File

@ -20,17 +20,29 @@ memload_generator::worker_thrd(void *_tinfo)
long tid;
thr_self(&tid);
// wait for other threads to init
tinfo->init_num->fetch_add(1);
if (tinfo->opts->verbose) {
fprintf(
stdout, "memload_generator <thread %ld>: running...\n", tid);
}
tinfo->begin_ts = topo_uptime_ns();
while (tinfo->num_trans.load() < tinfo->opts->iteration) {
memcpy((char *)tinfo->from_buffer, (char *)tinfo->to_buffer, tinfo->opts->chunk_size);
tinfo->end_ts = topo_uptime_ns();
tinfo->num_trans.fetch_add(1);
}
while(true) {
switch (tinfo->state->load()) {
case STATE_RUN:
memcpy((char *)tinfo->from_buffer, (char *)tinfo->to_buffer, tinfo->opts->chunk_size);
tinfo->num_trans.fetch_add(1);
break;
case STATE_END:
goto end;
case STATE_RDY:
case STATE_INIT:
default:
break;
}
}
end:
if (tinfo->opts->verbose) {
fprintf(
stdout, "memload_generator <thread %ld>: exiting...\n", tid);
@ -41,7 +53,8 @@ memload_generator::worker_thrd(void *_tinfo)
memload_generator::memload_generator(cpuset_t * threads, cpuset_t * target_domain, struct memload_generator_options * opt, bool *success)
{
*success = false;
end.store(0);
state.store(STATE_INIT);
init_num.store(0);
std::memcpy(&this->opts, opt, sizeof(memload_generator_options));
int nextcore = CPU_FFS(threads) - 1;
@ -64,10 +77,9 @@ memload_generator::memload_generator(cpuset_t * threads, cpuset_t * target_domai
cpuset_t cpuset;
pthread_attr_t attr;
info->stop = &this->end;
info->state = &this->state;
info->init_num = &this->init_num;
info->num_trans.store(0);
info->begin_ts = 0;
info->end_ts = 0;
info->opts = &this->opts;
if (opt->shared_buffer) {
info->from_buffer = local_buffer;
@ -96,6 +108,10 @@ memload_generator::memload_generator(cpuset_t * threads, cpuset_t * target_domai
tid++;
}
while((uint)init_num.load() != thr_infos.size());
state.store(STATE_RDY);
*success = true;
if (opts.verbose) {
fprintf(stdout,
@ -105,31 +121,40 @@ memload_generator::memload_generator(cpuset_t * threads, cpuset_t * target_domai
}
bool
memload_generator::check_done()
memload_generator::start()
{
bool done = true;
for (auto i : thr_infos) {
done = done && (i->num_trans.load() >= this->opts.iteration);
if (this->state.load() == STATE_RDY) {
this->state.store(memload_generator::STATE_RUN);
return true;
}
return done;
return false;
}
bool
memload_generator::stop()
{
if (this->state.load() == STATE_RUN) {
this->state.store(memload_generator::STATE_RDY);
return true;
}
return false;
}
uint64_t
memload_generator::get_bps()
memload_generator::get_transactions()
{
uint64_t total_transactions = 0;
uint64_t total_time = 0;
for (auto i : thr_infos) {
total_transactions += i->num_trans.load();
total_time = (i->end_ts - i->begin_ts) + total_time;
}
return (double)(total_transactions * this->opts.chunk_size) / ((double)total_time / thr_infos.size() / S2NS);
return total_transactions;
}
memload_generator::~memload_generator()
{
this->state.store(STATE_END);
for (auto i : thr_infos) {
// XXX: free
// XXX: nms_free regions
pthread_join(i->pthr, NULL);
delete i;
}

View File

@ -79,6 +79,7 @@ struct options_t {
std::atomic<uint32_t> s_slave_recved { 0 };
std::atomic<uint32_t> s_slave_loss { 0 };
uint32_t s_state { STATE_WAIT };
bool s_hwtimestamp { true };
Generator *s_iagen { nullptr };
std::vector<struct datapt *> s_data;
@ -121,22 +122,30 @@ rx_add_timestamp(uint16_t port, uint16_t qidx __rte_unused,
->epoch);
if (options.s_last_datapt != nullptr &&
options.s_last_datapt->epoch == epoch) {
if ((ret = rte_eth_timesync_read_rx_timestamp(
port, &ts, pkts[i]->timesync & 0x3)) ==
0) {
// has hw rx timestamp
options.s_last_datapt->clt_hw_rx =
ts.tv_sec * S2NS + ts.tv_nsec;
options.s_last_datapt->clt_sw_rx = now;
ntr(NTR_DEP_USER1, NTR_LEVEL_DEBUG,
"rx_add_timestamp: tagged packet %p with sw: %lu hw: %lu.\n",
(void *)pkts[i], now,
options.s_last_datapt->clt_hw_rx);
if (options.s_hwtimestamp) {
if ((ret = rte_eth_timesync_read_rx_timestamp(
port, &ts, pkts[i]->timesync & 0x3)) ==
0) {
// has hw rx timestamp
options.s_last_datapt->clt_hw_rx =
ts.tv_sec * S2NS + ts.tv_nsec;
options.s_last_datapt->clt_sw_rx = now;
ntr(NTR_DEP_USER1, NTR_LEVEL_DEBUG,
"rx_add_timestamp: tagged packet %p with sw: %lu hw: %lu.\n",
(void *)pkts[i], now,
options.s_last_datapt->clt_hw_rx);
} else {
rte_exit(EXIT_FAILURE,
"rx_add_timestamp: packet %p not tagged - hw ts not "
"available - %d.\n",
(void *)pkts[i], ret);
}
} else {
rte_exit(EXIT_FAILURE,
"rx_add_timestamp: packet %p not tagged - hw ts not "
"available - %d.\n",
(void *)pkts[i], ret);
options.s_last_datapt->clt_sw_rx = now;
options.s_last_datapt->clt_hw_rx = 0;
ntr(NTR_DEP_USER1, NTR_LEVEL_DEBUG,
"rx_add_timestamp: tagged packet %p with sw: %lu hw: (disabled).\n",
(void *)pkts[i], now);
}
} else {
ntr(NTR_DEP_USER1, NTR_LEVEL_WARNING,
@ -472,17 +481,22 @@ pkt_loop()
}
if (options.s_state == STATE_SENT) {
// check if hw ts is read
// check if hw tx ts is read
if (!read_tx) {
int ret;
struct timespec ts;
if ((ret = rte_eth_timesync_read_tx_timestamp(
options.portid, &ts)) == 0) {
ntr(NTR_DEP_USER1, NTR_LEVEL_DEBUG,
"locore_main: read hw tx timestamp %lu.\n",
(ts.tv_nsec + ts.tv_sec * S2NS));
options.s_last_datapt->clt_hw_tx =
ts.tv_nsec + ts.tv_sec * S2NS;
if (options.s_hwtimestamp) {
if ((ret = rte_eth_timesync_read_tx_timestamp(
options.portid, &ts)) == 0) {
ntr(NTR_DEP_USER1, NTR_LEVEL_DEBUG,
"locore_main: read hw tx timestamp %lu.\n",
(ts.tv_nsec + ts.tv_sec * S2NS));
options.s_last_datapt->clt_hw_tx =
ts.tv_nsec + ts.tv_sec * S2NS;
read_tx = true;
}
} else {
options.s_last_datapt->clt_hw_tx = 0;
read_tx = true;
}
}
@ -847,6 +861,12 @@ main(int argc, char *argv[])
struct mem_conf mconf;
portconf_get(options.portid, &pconf);
if (!pconf.timesync) {
options.s_hwtimestamp = false;
ntr(NTR_DEP_USER1, NTR_LEVEL_WARNING,
"main: timesync disabled. hw timestamp unavailable.\n ");
}
dconf.mtu = MAX_STANDARD_MTU;
dconf.num_threads = 1;
dconf.portid = options.portid;

View File

@ -8,6 +8,8 @@
#include <sys/cpuset.h>
#include <sys/endian.h>
#include <sys/sched.h>
#include <sys/types.h>
#include <topo.h>
@ -23,10 +25,11 @@
#include "ntr.h"
#include "gen.hh"
//#include "gen.hh"
#include "net/netsup.hh"
#include "net/pkt.hh"
#include "nms.h"
#include "rte_byteorder.h"
constexpr static unsigned int BURST_SIZE = 32;
constexpr static unsigned int CACHELINE_SIZE = 64;
@ -71,13 +74,7 @@ struct options_t {
false
}; // setting this to true changes mbuf size and mtu
int port_mtu { MAX_STANDARD_MTU };
int thread_cacheline_cnt = { 128 };
bool mlg_enabled { false };
uint64_t mlg_arr_sz { 0 };
cpuset_t mlg_cset = CPUSET_T_INITIALIZER(0x2);
cpuset_t mlg_dset = CPUSET_T_INITIALIZER(0x1);
int mlg_shared_buffer { 0 };
memload_generator *mlg { nullptr };
int thread_cacheline_cnt = { 1600 }; // 100MB data per thread
uint16_t portid { 0 };
// states
@ -218,6 +215,31 @@ tx_add_timestamp(uint16_t port __rte_unused, uint16_t qidx __rte_unused,
return nb_pkts;
}
static void
worker_cpu_load(unsigned long us)
{
	// Busy-wait for `us` microseconds on the monotonic uptime clock.
	const uint64_t start = topo_uptime_ns();
	const uint64_t budget_ns = us * 1000;
	while (topo_uptime_ns() - start < budget_ns) {
		// spin
	}
}
static void
worker_memory_load(int tid, uint32_t which, uint32_t load)
{
	// Map the flat index `which` onto (target thread, starting cacheline)
	// across all worker threads' cacheline regions.
	uint32_t line_idx = which %
	    (options.thread_cacheline_cnt * options.s_thr_info.size());
	uint32_t tgt_thrd = line_idx / options.thread_cacheline_cnt;
	uint32_t tgt_line = line_idx % options.thread_cacheline_cnt;
	struct thread_info *self_ti = options.s_thr_info.at(tid);
	struct thread_info *tgt_ti = options.s_thr_info.at(tgt_thrd);
	// Read `load` consecutive cachelines (wrapping within the target's
	// region) and store each word into this thread's local buffer.
	for (uint32_t off = 0; off < load; off++) {
		uint32_t line = (tgt_line + off) % options.thread_cacheline_cnt;
		*(uint32_t *)self_ti->load_buffer =
		    *(uint32_t *)((char *)tgt_ti->cache_lines +
			line * CACHELINE_SIZE);
	}
}
static int
locore_main(void *ti)
{
@ -331,23 +353,24 @@ locore_main(void *ti)
// perform the load
auto pld = (struct pkt_payload_load *)
pkt_data->payload;
uint32_t which = rte_be_to_cpu_32(pld->which);
uint32_t load = rte_be_to_cpu_32(pld->load);
uint32_t start_cacheline = which %
(options.thread_cacheline_cnt *
options.s_thr_info.size());
uint32_t thrd = start_cacheline /
options.thread_cacheline_cnt;
uint32_t start = start_cacheline %
options.thread_cacheline_cnt;
for (uint j = 0; j < load; j++) {
*(uint32_t *)tinfo->load_buffer =
(start + j) %
options.thread_cacheline_cnt;
}
uint32_t load_type = rte_be_to_cpu_32(pld->type);
uint32_t load_arg0 = rte_be_to_cpu_32(pld->arg0);
uint32_t load_arg1 = rte_be_to_cpu_32(pld->arg1);
ntr(NTR_DEP_USER1, NTR_LEVEL_DEBUG,
"locore_main <thread %d>: LOAD @ thread %d, start %d, load %d\n",
tinfo->tid, thrd, start, load);
"locore_main <thread %d>: LOAD type %d, arg0 %d, arg1 %d\n",
tinfo->tid, load_type, load_arg0, load_arg1);
if (load_type == LOAD_TYPE_CPU) {
worker_cpu_load(load_arg0);
} else if (load_type == LOAD_TYPE_MEM) {
worker_memory_load(tinfo->tid, load_arg0, load_arg1);
} else {
ntr(NTR_DEP_USER1, NTR_LEVEL_WARNING,
"locore_main <thread %d>: unknown LOAD type %d, ignoring...", tinfo->tid, load_type);
break;
}
// reply
pkt_hdr_to_netspec(pkt_data, &src,
@ -473,13 +496,10 @@ dump_options()
" verbosity: +%d\n"
" thread count: %d\n"
" ip: 0x%x\n"
" MLG: %s [arr_sz: %ld, thread cnt: %d, domain: %ld]\n"
" jumbo frame: %d\n"
" port id: %d\n",
ntr_get_level(NTR_DEP_USER1) - NTR_LEVEL_WARNING,
options.num_threads, options.s_host_spec.ip,
options.mlg_enabled ? "on" : "off", options.mlg_arr_sz,
CPU_COUNT(&options.mlg_cset), CPU_FFS(&options.mlg_dset) - 1,
options.jumbo_frame_enabled, options.portid);
}
@ -506,7 +526,7 @@ main(int argc, char *argv[])
{
int c;
// parse arguments
while ((c = getopt(argc, argv, "hvA:H:mb:X:x:JSp:")) != -1) {
while ((c = getopt(argc, argv, "hvA:H:Jp:")) != -1) {
switch (c) {
case 'v':
ntr_set_level(NTR_DEP_USER1,
@ -532,26 +552,10 @@ main(int argc, char *argv[])
}
has_host_spec = true;
break;
case 'm':
options.mlg_enabled = true;
break;
case 'b':
options.mlg_arr_sz = strtoull(optarg, nullptr,
10);
break;
case 'X':
cpulist_to_cpuset(optarg, &options.mlg_dset);
break;
case 'x':
cpulist_to_cpuset(optarg, &options.mlg_cset);
break;
case 'J':
options.jumbo_frame_enabled = true;
options.port_mtu = MAX_JUMBO_MTU;
break;
case 'S':
options.mlg_shared_buffer = 1;
break;
case 'p':
options.portid = atoi(optarg);
break;
@ -600,6 +604,7 @@ main(int argc, char *argv[])
if (!pconf.timesync) {
ntr(NTR_DEP_USER1, NTR_LEVEL_WARNING,
"main: timesync disabled. hw timestamp unavailable.\n ");
options.s_hwtimestamp = false;
}
dconf.mtu = options.port_mtu;
dconf.num_threads = options.num_threads;
@ -675,31 +680,8 @@ main(int argc, char *argv[])
}
}
// init mlg
if (options.mlg_enabled) {
bool success = false;
memload_generator::memload_generator_options opts;
opts.chunk_size = options.mlg_arr_sz;
opts.iteration =
memload_generator::memload_generator_options::ITERATION_MAX;
opts.shared_buffer = options.mlg_shared_buffer;
opts.verbose = (ntr_get_level(NTR_DEP_USER1) -
NTR_LEVEL_WARNING) != 0;
options.mlg = new memload_generator(&options.mlg_cset,
&options.mlg_dset, &opts, &success);
if (!success) {
rte_exit(EXIT_FAILURE, "failed to init mlg\n");
}
}
while (true) {
usleep(S2US);
if (options.mlg_enabled) {
uint64_t bps = options.mlg->get_bps();
ntr(NTR_DEP_USER1, NTR_LEVEL_DEBUG,
"main: MLG bps = %ld ~= %ldM\n", bps,
bps / 1024 / 1024);
}
}
// shouldn't get here
@ -714,8 +696,6 @@ main(int argc, char *argv[])
}
}
delete options.mlg;
dpdk_cleanup(&dconf);
return 0;

View File

@ -15,7 +15,7 @@ static struct port_conf port_confs[] = {
.rxoffload = RTE_ETH_RX_OFFLOAD_RSS_HASH | RTE_ETH_RX_OFFLOAD_UDP_CKSUM | RTE_ETH_RX_OFFLOAD_IPV4_CKSUM,
.txoffload = RTE_ETH_TX_OFFLOAD_MBUF_FAST_FREE | RTE_ETH_TX_OFFLOAD_UDP_CKSUM | RTE_ETH_TX_OFFLOAD_IPV4_CKSUM,
.rss_hf = RTE_ETH_RSS_FRAG_IPV4 | RTE_ETH_RSS_NONFRAG_IPV4_UDP | RTE_ETH_RSS_NONFRAG_IPV4_OTHER | RTE_ETH_RSS_L2_PAYLOAD,
.timesync = true
.timesync = false
},
{
.driver_name = "net_ice",
@ -23,6 +23,13 @@ static struct port_conf port_confs[] = {
.txoffload = RTE_ETH_TX_OFFLOAD_MBUF_FAST_FREE | RTE_ETH_TX_OFFLOAD_UDP_CKSUM | RTE_ETH_TX_OFFLOAD_IPV4_CKSUM,
.rss_hf = RTE_ETH_RSS_FRAG_IPV4 | RTE_ETH_RSS_NONFRAG_IPV4_UDP | RTE_ETH_RSS_NONFRAG_IPV4_OTHER | RTE_ETH_RSS_L2_PAYLOAD,
.timesync = true
},
{
.driver_name = "net_ixgbe",
.rxoffload = RTE_ETH_RX_OFFLOAD_RSS_HASH | RTE_ETH_RX_OFFLOAD_UDP_CKSUM | RTE_ETH_RX_OFFLOAD_IPV4_CKSUM,
.txoffload = RTE_ETH_TX_OFFLOAD_UDP_CKSUM | RTE_ETH_TX_OFFLOAD_IPV4_CKSUM,
.rss_hf = RTE_ETH_RSS_IPV4 | RTE_ETH_RSS_NONFRAG_IPV4_UDP,
.timesync = true
}
};

View File

@ -1,5 +1,6 @@
#include <atomic>
#include <cstddef>
#include <cstdlib>
#include <list>
#include <map>
#include <mutex>
@ -63,22 +64,14 @@ struct thread_info {
std::atomic<int> lost_pkts { 0 };
Generator *ia_gen { nullptr };
Generator *load_gen { nullptr };
std::random_device which_rd;
std::mt19937 which_rng;
std::uniform_int_distribution<uint32_t> which_dice;
Generator *load_gen0 { nullptr };
Generator *load_gen1 { nullptr };
std::mutex
mtx; // this lock protects data shared between worker threads, i.e.:
std::list<struct epoch_info *> recved_epochs;
thread_info()
: which_rd()
, which_rng(which_rd())
, which_dice(std::uniform_int_distribution<uint32_t>(0, UINT32_MAX))
{
which_rng.seed(topo_uptime_ns());
}
thread_info() = default;
};
constexpr static int STATE_SYNC = 0; // waiting for SYNC
@ -86,13 +79,16 @@ constexpr static int STATE_SYNC_ACK = 1; // Waiting for sending SYNC_ACK
constexpr static int STATE_RUNNING = 2; // Running
constexpr static int STATE_FIN = 3; // FIN received
constexpr static int WORKLOAD_MAX_ARGS = 2;
struct options_t {
unsigned int run_time { 5 };
// parameters
int slave_mode { 0 };
uint32_t rage_quit_time { UINT32_MAX };
char ia_gen[256] { "fixed" };
char ld_gen[256] { "fixed:0" };
char ia_gen[256] { "fixed:0" };
char load_gen[WORKLOAD_MAX_ARGS][256] = {{"fixed:0"}, {"fixed:0"}};
uint32_t workload_type {LOAD_TYPE_CPU};
uint32_t target_qps { 0 };
uint32_t depth { 1 };
struct net_spec server_spec { };
@ -499,10 +495,9 @@ pkt_loop(struct thread_info *tinfo)
}
pld_load = (struct pkt_payload_load *)pkt_data->payload;
pld_load->load = rte_cpu_to_be_32(
tinfo->load_gen->generate());
pld_load->which = rte_cpu_to_be_32(
tinfo->which_dice(tinfo->which_rng));
pld_load->type = rte_cpu_to_be_32(options.workload_type);
pld_load->arg0 = rte_cpu_to_be_32((uint32_t)tinfo->load_gen0->generate());
pld_load->arg1 = rte_cpu_to_be_32((uint32_t)tinfo->load_gen1->generate());
unsigned int epoch = epoch_mk(tinfo->id, cur_epoch);
pld_load->epoch = rte_cpu_to_be_32(epoch);
cur_epoch++;
@ -590,7 +585,9 @@ dump_options()
" rage quit time = %ul\n"
" slave mode = %d\n"
" interarrival dist = %s\n"
" workload dist = %s\n"
" workload type = %d\n"
" workload arg0 = %s\n"
" workload arg1 = %s\n"
" qps = %d\n"
" host IP = 0x%x\n"
" depth = %u\n"
@ -600,7 +597,7 @@ dump_options()
" portid = %d\n",
ntr_get_level(NTR_DEP_USER1) - NTR_LEVEL_WARNING, options.run_time,
options.s_num_threads, options.rage_quit_time, options.slave_mode,
options.ia_gen, options.ld_gen, options.target_qps,
options.ia_gen, options.workload_type, options.load_gen[0], options.load_gen[1], options.target_qps,
options.s_host_spec.ip, options.depth, options.pkt_loss_delay_ms,
options.jumbo_frame_enabled, options.pkt_pad_sz, options.portid);
}
@ -617,7 +614,9 @@ usage()
" -S: slave(rat) mode\n"
" -A: affinity mask\n"
" -i: inter-arrival time distribution\n"
" -w: workload distribution\n"
" -w: workload type\n"
" -w (repeated): workload arg0 distribution\n"
" -w (repeated): workload arg1 distribution\n"
" -r: rage quit time (in ms)\n"
" -q: target QPS\n"
" -H: host net spec\n"
@ -649,6 +648,7 @@ main(int argc, char *argv[])
ntr_set_level(NTR_DEP_USER1, NTR_LEVEL_WARNING);
{
int c;
int num_of_ws = 0;
// parse arguments
while ((c = getopt(argc, argv,
"vht:s:SA:i:w:r:q:H:D:l:JP:p:")) != -1) {
@ -689,8 +689,17 @@ main(int argc, char *argv[])
sizeof(options.ia_gen) - 1);
break;
case 'w':
strncpy(options.ld_gen, optarg,
sizeof(options.ld_gen) - 1);
if (num_of_ws == 0) {
options.workload_type = strtol(optarg, NULL, 10);
if (options.workload_type >= LOAD_TYPE_MAX) {
rte_exit(EXIT_FAILURE,
"invalid workload type %s\n", optarg);
}
} else if (num_of_ws <= WORKLOAD_MAX_ARGS) {
strncpy(options.load_gen[num_of_ws - 1], optarg, 255);
}
num_of_ws++;
break;
case 'r':
options.rage_quit_time = strtol(optarg, nullptr,
@ -818,8 +827,9 @@ main(int argc, char *argv[])
unsigned int lcore_id = cpuset_idx - 1;
tinfo = new thread_info;
tinfo->ia_gen = createGenerator(options.ia_gen);
tinfo->load_gen = createGenerator(options.ld_gen);
if (tinfo->ia_gen == nullptr || tinfo->load_gen == nullptr) {
tinfo->load_gen0 = createGenerator(options.load_gen[0]);
tinfo->load_gen1 = createGenerator(options.load_gen[1]);
if (tinfo->ia_gen == nullptr || tinfo->load_gen0 == nullptr || tinfo->load_gen1 == nullptr) {
rte_exit(EXIT_FAILURE,
"invalid ia_gen or ld_gen string\n");
}
@ -886,7 +896,8 @@ main(int argc, char *argv[])
qps, total_recv, total_loss);
for (auto each : options.s_thr_info) {
delete each->load_gen;
delete each->load_gen0;
delete each->load_gen1;
delete each->ia_gen;
delete each;
}

310
scripts/iperf.py Normal file
View File

@ -0,0 +1,310 @@
from http import server
import subprocess as sp
import time
import os
import datetime
import sys
import getopt
import libpar as par
import libtc as tc
import libmechspec as mechspec
LOG_FILEPATH = "/iperflogs"
BIN_PATH = "/iperftls"
MLG_PATH = "/numam/build/bin/memloadgen"
EXE_PATH = BIN_PATH + "/src/iperf3"
SSL_CERT = "/certs/server.crt"
SSL_PKEY = "/certs/server.key"
#SERVER = ["skylake2.rcs.uwaterloo.ca"]
#CLIENTS = ["skylake1.rcs.uwaterloo.ca", "skylake3.rcs.uwaterloo.ca", "skylake6.rcs.uwaterloo.ca", "skylake7.rcs.uwaterloo.ca"]
SERVER_PORT_START = 8050
SERVER = ["icelake2-int.rcs.uwaterloo.ca"]
CLIENTS = ["icelake1-int.rcs.uwaterloo.ca", "milan1-int.rcs.uwaterloo.ca", "milan2-int.rcs.uwaterloo.ca"]
SERVER_DAT = ["192.168.100.102"]
CLIENTS_DAT = ["192.168.100.101", "192.168.100.103", "192.168.100.104"]
MEMLOAD_BLKSZ = 1024*1024*512
# paths
file_dir : str = os.path.dirname(os.path.realpath(__file__))
root_dir : str = os.path.join(file_dir,"..")
class Conf:
    """One iperf3 experiment configuration (affinity/TLS/memloadgen/file)."""

    def __init__(self, affinity, sendfile, tls, ktls, memloadgen, filepath):
        self.affinity = affinity
        self.sendfile = sendfile
        self.tls = tls
        self.ktls = ktls
        self.memloadgen = memloadgen
        self.filepath = filepath

    def to_string(self):
        """Render a filesystem-friendly tag that uniquely names this config."""
        fields = [
            f"affinity.{self.affinity}",
            f"sendfile.{self.sendfile}",
            f"tls.{self.tls}",
            f"ktls.{self.ktls}",
            f"memloadgen.{self.memloadgen is not None}",
            f"filepath.{self.filepath.replace('/', '-')}",
        ]
        return "_".join(fields)
class ArgTypes:
    """Accumulates the cartesian product of experiment argument groups.

    Each add_arg() call supplies a list of alternatives (each alternative is
    itself a list of field values); every stored combination is extended with
    every alternative, so get_fields() yields the full cross product of all
    groups added so far.

    Note: the original annotated these as list[list[any]], which misused the
    builtin function `any` as a type; the annotations were incorrect and have
    been removed.
    """

    def __init__(self):
        # Seed with one empty combination so the first add_arg() populates it.
        self.all = [[]]

    def add_arg(self, arg):
        """Cross every existing combination with each alternative in `arg`."""
        new_all = []
        for val in arg:
            for exst in self.all:
                tmp = exst.copy()
                tmp.extend(val)
                new_all.append(tmp)
        self.all = new_all

    def get_fields(self):
        """Return all accumulated combinations (list of flat field lists)."""
        return self.all
#SERVER_AFFINITY = ["25,27,29,31,33,35,37,39,41,43,45,47"]
#SERVER_AFFINITY = ["1,3,5,7,9,11,13,15,17,19,21,23"]
#SERVER_AFFINITY = ["1,3,5,7,9,11,13,15,17,19,21,23,25,27,29,31,33,35,37,39,41,43,45,47"]
CLIENTS_AFFINITY = ["1,3,5,7", "1,3,5,7", "1,3,5,7"]
arg_types : ArgTypes = ArgTypes()
# affinity
arg_types.add_arg([["1,3,5,7,9,11,13,15,17,19,21,23"]])
#arg_types.add_arg([["49,51,53,55,57,59,61,63,65,67,69,71"]])
# sendfile/tls/ktls
arg_types.add_arg([
[False, False, False],
[True, False, False],
[False, True, False],
[False, True, True],
[True, True, True],
])
# memloadgen
arg_types.add_arg([[["73,75,77,79,81,83,85,87,89,91,93,95_0", "25,27,29,31,33,35,37,39,41,43,45,47_1"]], [None]] )
#arg_types.add_arg([[None]] )
# filepath
arg_types.add_arg([["/mnt/zroot/large_file_#p"], ["/nvdimm/large_file_#p"]])
#arg_types.add_arg([["/tmpfs/large_file_#p"]])
def parse_comma_list(input : str):
    # Comma-delimited string -> list of tokens (empty tokens preserved).
    tokens = input.split(",")
    return tokens
def setup_all():
    # Install build dependencies and build iperf3-tls from source on the
    # server and every client, in parallel, then report per-host status.
    # NOTE(review): indentation inside the shell f-string was reconstructed;
    # whitespace is insignificant to the `;`-separated shell commands.
    setup_cmd : str = f''' sudo pkg install -y openssl-devel vim curl wget gmake cmake openssl-devel llvm gcc;
    sudo pkg remove -y iperf iperf3;
    sudo rm -rf { BIN_PATH };
    sudo mkdir -p { BIN_PATH };
    sudo git clone https://git.quacker.org/d/iperf3-tls { BIN_PATH };
    cd { BIN_PATH };
    sudo ./configure;
    sudo make -j8 '''
    # (host, process) pairs for every remote build kicked off below.
    ssrv : list[tuple[str, sp.Popen]] = []
    tc.log_print(f"Setting up {SERVER[0]}...")
    ssrv.append((SERVER[0], tc.remote_exec(SERVER, setup_cmd, blocking=False, check=False)[0]))
    for s in CLIENTS:
        tc.log_print(f"Setting up {s}...")
        ssrv.append((s, tc.remote_exec([s], setup_cmd, blocking=False, check=False)[0]))
    # Wait for each build to finish and dump stderr for the failures.
    for p in ssrv:
        _ , stderr = p[1].communicate()
        if p[1].returncode != 0:
            print(f"\n{ p[0] } failed. stderr:\n{stderr.decode()}\n")
        else:
            print(f"\n{ p[0] } succeeded\n")
def stop_all():
    """Kill any leftover iperf3/memloadgen processes on clients and server."""
    kill_cmd = "sudo killall -9 iperf3; sudo killall -9 memloadgen"
    tc.log_print("Stopping clients...")
    tc.remote_exec(CLIENTS, kill_cmd, check=False)
    tc.log_print("Stopping server...")
    tc.remote_exec(SERVER, kill_cmd, check=False)
def prepare_logdir():
    """Recreate the remote log directory on the server from scratch."""
    tc.log_print("Preparing server log directory...")
    rm_cmd = f"sudo rm -rf {LOG_FILEPATH}"
    tc.log_print(rm_cmd)
    # Best-effort removal: the directory may not exist yet.
    tc.remote_exec(SERVER, rm_cmd, check=False)
    time.sleep(0.1)
    mkdir_cmd = f"sudo mkdir -p {LOG_FILEPATH}"
    tc.log_print(mkdir_cmd)
    tc.remote_exec(SERVER, mkdir_cmd, check=True)
def run_exp(conf : Conf):
    # Run one full experiment for `conf`: optionally start memloadgen
    # antagonists, start one iperf3 server per affinity slot, start all
    # clients, monitor until every client exits (retrying the whole run on
    # failure), then collect results.
    stop_all()
    while True:
        prepare_logdir()
        ssrvs=[]
        ssrv_names=[]
        cur_srv_proc = 0
        smlg = []
        smlg_names = []
        # launch memloadgen instances, one per "cpulist_domain" spec
        if conf.memloadgen != None:
            for emem in conf.memloadgen:
                mlg_cmd = "sudo "
                mlg_cpu = emem.split("_")[0]
                mlg_dom = emem.split("_")[1]
                mlg_cmd += f"{MLG_PATH} -b {MEMLOAD_BLKSZ} -i -1 -s {mlg_cpu} -d {mlg_dom}"
                tc.log_print("Starting memloadgen...")
                tc.log_print(mlg_cmd)
                smlg.append(tc.remote_exec(SERVER, mlg_cmd, blocking=False)[0])
                smlg_names.append("memloadgen")
                # NOTE(review): placement of this sleep (inside the loop)
                # reconstructed from context -- confirm against upstream
                time.sleep(0.1)
        # one server process per core in the affinity list, each on its own
        # port and with its own log file
        for eaff in parse_comma_list(conf.affinity):
            server_cmd = "sudo "
            server_cmd += f"{EXE_PATH} -s -p " + str(SERVER_PORT_START + cur_srv_proc) + \
                " -F " + conf.filepath.replace("#p", str(cur_srv_proc)) + \
                " -A " + eaff + \
                " -J --logfile " + LOG_FILEPATH + "/" + eaff + ".txt"
            if conf.tls:
                server_cmd += f" --enable-ssl-over-tcp --ssl-certificate {SSL_CERT} --ssl-private-key {SSL_PKEY}"
            if conf.ktls:
                server_cmd += " --enable-ssl-ktls"
            # start server
            tc.log_print("Starting server proc " + str(cur_srv_proc) + "...")
            tc.log_print(server_cmd)
            ssrv = tc.remote_exec(SERVER, server_cmd, blocking=False)[0]
            ssrvs.append(ssrv)
            ssrv_names.append("Server " + str(cur_srv_proc))
            cur_srv_proc = cur_srv_proc + 1
            time.sleep(0.1)
        # give the servers time to come up before connecting
        time.sleep(5)
        # start clients
        tc.log_print("Starting clients...")
        sclts = []
        sclt_names = []
        clt_number = 0
        for i in range(len(CLIENTS)):
            client_aff = CLIENTS_AFFINITY[i]
            # one client per affinity slot; port pairs client N with server N
            for eaff in parse_comma_list(client_aff):
                client_cmd = f"sudo {EXE_PATH} -c " + SERVER_DAT[0] + \
                    " -p " + str(SERVER_PORT_START + clt_number) + \
                    " --connect-timeout 1000" + \
                    " -A " + eaff + \
                    " -t 30" + \
                    " -P 4" + \
                    " -R" + \
                    " -N" + \
                    " -4" + \
                    " -O 10" + \
                    " -J --logfile /dev/null"
                if conf.tls:
                    client_cmd += f" --enable-ssl-over-tcp --ssl-certificate {SSL_CERT} --ssl-private-key {SSL_PKEY}"
                if conf.ktls:
                    client_cmd += " --enable-ssl-ktls"
                if conf.sendfile:
                    client_cmd += " -Z"
                tc.log_print(CLIENTS[i] + ":\n" + client_cmd)
                sclts.append(tc.remote_exec([CLIENTS[i]], client_cmd, blocking=False)[0])
                sclt_names.append(CLIENTS[i] + "@" + eaff)
                clt_number = clt_number + 1
                time.sleep(0.1)
        # launch stderr monitoring thread
        exclude = ["Pseudo-terminal"]
        tc.errthr_create(sclts, sclt_names, exclude)
        tc.errthr_create(ssrvs, ssrv_names, exclude)
        if (conf.memloadgen != None):
            tc.errthr_create(smlg, smlg_names, exclude)
        tc.errthr_start()
        cur = 0
        # selec = select.poll()
        # selec.register(p.stdout, select.POLLIN)
        success = False
        # poll until every client process exits, or bail early on a
        # monitored stderr failure
        while not success:
            success = False
            # either failed or timeout
            # we use failure detection to save time for long durations
            if tc.errthr_get_failed():
                break
            # while selec.poll(1):
            #     print(p.stdout.readline())
            success = True
            for p in sclts:
                if p.poll() == None:
                    success = False
                    break
            time.sleep(1)
            cur = cur + 1
        stop_all()
        tc.errthr_stop()
        tc.log_print("Cooling down...")
        time.sleep(5)
        # keep the results and leave the retry loop only on a clean run
        if success:
            flush_netresult()
            break
def flush_netresult():
    # Copy the server's iperf3 JSON logs to the local output directory,
    # parse them, and log the aggregate egress throughput.
    tc.log_print("Keeping results...")
    # copy log directory back to machine
    log_output = tc.get_odir()
    os.makedirs(log_output, exist_ok=True)
    scp_cmd = "scp -P77 -r " + tc.get_ssh_user() + "@" + SERVER[0] + ":" + LOG_FILEPATH + " " + log_output + "/"
    tc.log_print(scp_cmd)
    sp.check_call(scp_cmd, shell=True)
    # parse results
    log_output = log_output + "/" + os.path.basename(LOG_FILEPATH)
    logs = os.listdir(log_output)
    logs_bytes = []
    tc.log_print("Processing " + str(len(logs)) + " logs ...")
    for log in logs:
        if os.path.isfile(log_output + "/" + log):
            with open(log_output + "/" + log, "r") as f:
                logs_bytes.append(f.read())
    # sum sender-side bits/sec across all per-core logs
    parser = par.iperf_json_parser(logs_bytes)
    tc.log_print("Aggregated throughput: " + "{:.2f}".format(parser.aggregate_egress_bps / 8.0) + " B/s " + \
        "{:.2f}".format(parser.aggregate_egress_bps / 8.0 / 1024.0 / 1024.0) + " MB/s " + \
        "{:.2f}".format(parser.aggregate_egress_bps / 8.0 / 1024.0 / 1024.0 / 1024.0) + " GB/s")
def main():
    # Entry point: -s stops all remote processes, -S provisions the
    # machines; otherwise run every configuration in arg_types.
    tc.set_ssh_param("-o StrictHostKeyChecking=no -p77")
    tc.set_ssh_user("oscar")
    output_dirname = "run"
    options = getopt.getopt(sys.argv[1:], 'sS')[0]
    for opt, arg in options:
        if opt in ('-s'):
            stop_all()
            return
        elif opt in ('-S'):
            setup_all()
            return
    # timestamped output directory for this batch of runs
    tc.init("~/results.d/iperf3/" + output_dirname + "_" + datetime.datetime.now().strftime('%Y%m%d%H%M%S'))
    # keep a copy of this script alongside the results for provenance
    cpcmd = "cp " + __file__ + " " + tc.get_odir() + "/"
    tc.log_print(cpcmd)
    sp.check_call(cpcmd, shell=True)
    # expand the cross product of argument groups into Conf objects
    args = arg_types.get_fields()
    confs : list[Conf] = []
    for arg in args:
        confs.append(Conf(*arg))
    print(f"{len(confs)} configs to run:")
    for conf in confs:
        print(conf.to_string())
    for conf in confs:
        tc.begin(conf.to_string())
        run_exp(conf)
        tc.end()
    stop_all()
main()

View File

@ -0,0 +1,25 @@
class NetSpec:
    """A host's network identity: FQDN, IP, MAC, and the 'ip@mac' spec."""

    def __init__(self, fqdn, ip, mac) -> None:
        self.fqdn = fqdn
        self.ip = ip
        self.mac = mac
        # Combined "<ip>@<mac>" form consumed by the net tools.
        self.netspec = "@".join((ip, mac))
class LabNetSpecs:
    """Catalog of lab machines, one NetSpec attribute per host/link."""

    # (attribute, fqdn, ip, mac); empty mac where it is not recorded.
    _SPECS = (
        ("SKYLAKE1_10G", "skylake1.rcs.uwaterloo.ca", "192.168.123.11", ""),
        ("SKYLAKE2_10G", "skylake2.rcs.uwaterloo.ca", "192.168.123.12", "3c:15:fb:c9:f3:36"),
        ("SKYLAKE3_10G", "skylake3.rcs.uwaterloo.ca", "192.168.123.13", "3c:15:fb:c9:f3:4b"),
        ("SKYLAKE4_10G", "skylake4.rcs.uwaterloo.ca", "192.168.123.14", ""),
        ("SKYLAKE5_10G", "skylake5.rcs.uwaterloo.ca", "192.168.123.15", "3c:15:fb:c9:f3:28"),
        ("SKYLAKE6_10G", "skylake6.rcs.uwaterloo.ca", "192.168.123.16", "3c:15:fb:62:9b:2f"),
        ("SKYLAKE7_10G", "skylake7.rcs.uwaterloo.ca", "192.168.123.17", "3c:15:fb:c9:f3:44"),
        ("SKYLAKE8_10G", "skylake8.rcs.uwaterloo.ca", "192.168.123.18", "3c:15:fb:62:9c:be"),
        ("MILAN1_100G", "milan1-int.rcs.uwaterloo.ca", "192.168.123.19", ""),
        ("MILAN1_10G", "milan1-int.rcs.uwaterloo.ca", "192.168.123.19", "a0:42:3f:4d:cb:bc"),
        ("ICELAKE2_100G", "icelake2-int.rcs.uwaterloo.ca", "192.168.123.20", ""),
        ("ICELAKE2_10G", "icelake2-int.rcs.uwaterloo.ca", "192.168.123.20", ""),
    )

    def __init__(self) -> None:
        for name, fqdn, ip, mac in self._SPECS:
            setattr(self, name, NetSpec(fqdn=fqdn, ip=ip, mac=mac))
LAB = LabNetSpecs()

View File

@ -1,6 +1,42 @@
import json
import numpy as np
class iperf_json_parser:
    """Aggregates egress throughput across a set of iperf3 JSON logs."""

    def __init__(self, inputs):
        self.jsonobjs = []
        self.aggregate_egress_bps = 0
        # Sum the sender-side bits/sec reported at the end of each log.
        for raw in inputs:
            parsed = json.loads(raw)
            self.jsonobjs.append(parsed)
            self.aggregate_egress_bps += parsed['end']['sum_sent']['bits_per_second']
class pmc_parser:
    """Parses pmcstat counting output: a '#' spec line plus sample rows.

    Exposes `cores` (columns in the spec line), `counter` (counter name from
    the first column spec), and `count` (sum of the final row's values).
    """

    def __init__(self, input):
        self.raw = input
        lines = input.split('\n')
        if len(lines) < 2:
            raise Exception("Invalid pmc file format")
        spec = lines[0].strip()
        # The spec line must begin with '#'.
        if spec[0] != '#':
            raise Exception("Invalid pmc file spec line: \"" + lines[0] + "\"")
        spec = spec.split(' ')
        # One column per core after the leading '#'.
        self.cores = len(spec) - 1
        # Column specs look like "<x>/<y>/<counter>".
        elements = spec[1].split('/')
        if len(elements) != 3:
            raise Exception("Invalid pmc file spec line: \"" + lines[0] + "\"")
        self.counter = elements[2].strip()
        # Sum the per-core values on the final row, skipping empty tokens.
        last_line = lines[-1]
        self.count = sum(int(tok) for tok in last_line.split(' ') if len(tok) > 0)
class khat_parser:
class pt:
def __init__(self):

View File

@ -66,11 +66,19 @@ def set_ssh_param(para):
global ssh_param
ssh_param = para
def get_ssh_param():
    """Return the extra ssh(1) parameters set via set_ssh_param()."""
    # Reading a module global needs no `global` declaration.
    return ssh_param
# Remote username used when building ssh/scp commands; None until configured.
ssh_user = None

def set_ssh_user(user):
    """Record the remote username for subsequent ssh/scp helpers."""
    global ssh_user
    ssh_user = user

def get_ssh_user():
    """Return the remote username set via set_ssh_user()."""
    return ssh_user
def remote_exec(srv, cmd, blocking=True, check=True):
sub = []
for s in srv:

269
scripts/netexp.py Normal file
View File

@ -0,0 +1,269 @@
import time
import subprocess as sp
import libpar as par
import libtc as tc
import libmechspec as mechspec
class NetExpResult:
    """Outputs of one network experiment run (filled in after collection)."""

    def __init__(self):
        self.parser = None      # latency parser built over the sample file
        self.pmc_parser = None  # pmc data, only when PMC collection is on
        self.sample = None      # raw sample text fetched from the master
class NetExpConf:
    # All knobs for one network experiment run: memloadgen antagonist,
    # server, clients, master, and PMC collection settings.
    def __init__(self):
        self.root_dir = ""
        self.enable_client_only = False
        # memloadgen settings (memory load antagonist on the server)
        self.enable_memgen = False
        self.memgen_affinity = ""
        self.memgen_iteration = -1
        self.memgen_size = 512 * 1024 * 1024
        self.memgen_tgtdom = 1
        # server settings
        self.srv_affinity = ""
        self.srv_mechspec = None
        self.srv_port = 0
        # client settings
        self.clt_qps = 0
        self.clt_mechspecs = []
        self.clt_affinity = "1"
        self.clt_wrkld = 0
        self.clt_wrkarg0 = "fixed:0"
        self.clt_wrkarg1 = "fixed:0"
        self.clt_pkt_loss_lat = 1000
        self.clt_rage_quit_lat = 1000
        self.clt_port = 0
        self.clt_pkt_pad = 0
        self.clt_pkt_depth = 1
        self.clt_ia = "exponential"
        # master settings
        self.mst_mechspec = None
        self.mst_affinity = "2"
        self.mst_qps = 100
        self.mst_port = 0
        self.mst_pkt_loss_lat = 1000
        self.mst_pkt_loss_max = 1000
        self.mst_duration = 10
        self.mst_warmup = 5
        self.mst_ia = "exponential"
        # hardware performance counter collection
        self.enable_pmc = False
        self.pmc_counters = []
        self.pmc_mode = 0 # 0 = sampling
        self.pmc_sampling_rate = 8192
        self.pmc_counting_interval = 0.1
    def __build_fqdn_arr(self, ns):
        # Extract the .fqdn field from a list of NetSpec-like objects.
        ret = []
        for n in ns:
            ret.append(n.fqdn)
        return ret
    def get_pmc_str(self):
        # Join counter names with ','; the trailing comma is stripped.
        ret = ""
        for counter in self.pmc_counters:
            ret = ret + counter + ","
        return ret[:-1]
    def calc_client_qps(self):
        # Per-client QPS after subtracting the master's share; 0 when
        # clt_qps is 0 (presumably "unlimited" -- TODO confirm with caller).
        return 0 if self.clt_qps == 0 else (int)((self.clt_qps - self.mst_qps) / len(self.clt_mechspecs))
    def finalize_mechspecs(self):
        # Cache fqdn lists for the remote_exec-style helpers.
        self.clt_fqdns = self.__build_fqdn_arr(self.clt_mechspecs)
        self.srv_fqdns = self.__build_fqdn_arr([self.srv_mechspec])
        self.mst_fqdns = self.__build_fqdn_arr([self.mst_mechspec])
__SAMPLE_FN = "sample.txt.tmp"
__PMC_FN = "pmc.txt.tmp"
# __keep_result: collect the artifacts of a finished run from the remote
# hosts into a NetExpResult: the master's sample file, and (when PMC is
# enabled) the server's pmcstat output.
#
# NOTE(review): indentation was lost in this dump; the nesting below
# (notably which rm/cleanup lines sit inside the pmc_mode branches) should
# be confirmed against the original file before relying on it.
def __keep_result(conf : NetExpConf):
result = NetExpResult()
# Pull the master's sample file into the local output directory over scp.
target_scp_fn = tc.get_odir() + "/" + __SAMPLE_FN
scpcmd = "scp -P77 " + tc.get_ssh_user() + "@" + conf.mst_mechspec.fqdn + ":" + conf.root_dir + "/" + __SAMPLE_FN + " " + target_scp_fn
tc.log_print(scpcmd)
sp.check_call(scpcmd, shell=True)
# Parse the sample text with the khat parser; keep the raw text too.
result.parser = par.khat_parser()
with open(target_scp_fn, "r") as f:
result.sample = f.read()
result.parser.parse(result.sample)
# Delete the local temporary copy once parsed.
rmcmd = "rm " + target_scp_fn
tc.log_print(rmcmd)
sp.check_call(rmcmd, shell=True)
if conf.enable_pmc:
# Fetch the PMC output file from the server host.
target_pmc_fn = tc.get_odir() + "/" + __PMC_FN
pmcscpcmd = "scp -P77 " + tc.get_ssh_user() + "@" + conf.srv_mechspec.fqdn + ":" + conf.root_dir + "/" + __PMC_FN + " " + target_pmc_fn
tc.log_print(pmcscpcmd)
sp.check_call(pmcscpcmd, shell=True)
if conf.pmc_mode == 0:
# Sampling mode: post-process the raw pmcstat log on the server
# (pmcstat -R <log> -m <text>), then fetch the processed text too.
pmcproccmd = "sudo pmcstat -R " + conf.root_dir + "/" + __PMC_FN + " -m " + conf.root_dir + "/" + __PMC_FN + ".proc"
tc.log_print(pmcproccmd)
tc.remote_exec(conf.srv_fqdns, pmcproccmd)
pmcscpcmd = "scp -P77 " + tc.get_ssh_user() + "@" + conf.srv_mechspec.fqdn + ":" + conf.root_dir + "/" + __PMC_FN + ".proc" + " " + target_pmc_fn + ".proc"
tc.log_print(pmcscpcmd)
sp.check_call(pmcscpcmd, shell=True)
if conf.pmc_mode != 0:
# Counting mode: the file is plain text understood by pmc_parser.
with open(target_pmc_fn, "r") as f:
result.pmc_parser = par.pmc_parser(f.read())
else:
# Sampling mode: keep [raw binary log, processed text] as a pair.
with open(target_pmc_fn, "rb") as f:
with open(target_pmc_fn + ".proc", "r") as g:
result.pmc_parser = [f.read(), g.read()]
rmcmd = "rm " + target_pmc_fn + ".proc"
tc.log_print(rmcmd)
sp.check_call(rmcmd, shell=True)
# Remove the fetched PMC temp file after its contents are kept in memory.
rmcmd = "rm " + target_pmc_fn
tc.log_print(rmcmd)
sp.check_call(rmcmd, shell=True)
return result
# stop_all: best-effort kill of every experiment binary on all hosts.
# check=False throughout because killall returns non-zero when no matching
# process exists, which is the common (and harmless) case.
def stop_all(conf : NetExpConf):
# stop clients
tc.log_print("Stopping clients...")
tc.remote_exec(conf.clt_fqdns, "sudo killall -9 rat; sudo killall -9 cat; sudo killall -9 khat; sudo killall -9 memloadgen", check=False)
# stop master
tc.log_print("Stopping master...")
tc.remote_exec(conf.mst_fqdns, "sudo killall -9 rat; sudo killall -9 cat; sudo killall -9 khat; sudo killall -9 memloadgen", check=False)
if not conf.enable_client_only:
# stop server
tc.log_print("Stopping server...")
tc.remote_exec(conf.srv_fqdns, "sudo killall -9 rat; sudo killall -9 cat; sudo killall -9 khat; sudo killall -9 memloadgen", check=False)
if conf.enable_pmc:
# NOTE(review): indentation lost in this dump — presumably the PMC kill
# is nested under the not-client-only branch; confirm against the original.
tc.log_print("Stopping server PMC...")
tc.remote_exec(conf.srv_fqdns, "sudo killall -9 pmcstat", check=False)
# run: execute one full experiment iteration, retrying until it succeeds.
# Sequence: kill leftovers -> (optional PMC) -> server (khat) -> (optional
# memloadgen) -> clients (rat) -> master (cat), then poll the master until
# it exits or a watchdog timeout fires. Returns __keep_result(conf) on
# success; on failure the outer `while True` re-runs the whole experiment.
#
# NOTE(review): indentation was lost in this dump; the nesting below (e.g.
# whether the memloadgen launch sits inside the server-start branch) should
# be confirmed against the original file.
def run(conf : NetExpConf):
stop_all(conf)
while True:
server_cmd = "sudo "
if conf.enable_pmc:
# pmc_mode != 0 -> counting mode (-C with window -w); 0 -> sampling (-n/-S).
if conf.pmc_mode != 0:
pmc_cmd = "sudo pmcstat -C -w " + str(conf.pmc_counting_interval) + " -s " + conf.get_pmc_str() + " -o " + conf.root_dir + "/" + __PMC_FN
else:
pmc_cmd = "sudo pmcstat -n " + str(conf.pmc_sampling_rate) + " -S " + conf.get_pmc_str() + " -O " + conf.root_dir + "/" + __PMC_FN
tc.log_print("Starting server PMC...")
tc.log_print(pmc_cmd)
spmc = tc.remote_exec(conf.srv_fqdns, pmc_cmd, blocking=False)
server_cmd += conf.root_dir + "/khat --log-level lib.eal:err -- -A " + conf.srv_affinity + \
" -H " + conf.srv_mechspec.netspec + " -p " + str(conf.srv_port)
# Payloads larger than a standard Ethernet frame need jumbo-frame mode (-J).
if int(conf.clt_pkt_pad) > 1518:
server_cmd += " -J "
if conf.enable_client_only:
# client-only mode: log the would-be command but leave the server alone.
ssrv = None
tc.log_print(server_cmd)
else:
# start server
tc.log_print("Starting server...")
tc.log_print(server_cmd)
ssrv = tc.remote_exec(conf.srv_fqdns, server_cmd, blocking=False)
if conf.enable_memgen:
memgen_cmd = "sudo " + conf.root_dir + "/memloadgen -b " + str(conf.memgen_size) + " -s " + conf.memgen_affinity + \
" -i " + str(conf.memgen_iteration) + " -d " + str(conf.memgen_tgtdom)
tc.log_print("Starting memloadgen...")
tc.log_print(memgen_cmd)
smem = tc.remote_exec(conf.srv_fqdns, memgen_cmd, blocking=False)
# start clients
tc.log_print("Starting clients...")
sclt = []
sclt_name = []
for i in range(len(conf.clt_fqdns)):
# NOTE(review): `-w` is passed three times (wrkld, wrkarg0, wrkarg1);
# presumably rat consumes repeated -w flags positionally — confirm.
client_cmd = "sudo " + conf.root_dir + "/rat --log-level lib.eal:err -- -S -A " + conf.clt_affinity + \
" -i " + conf.clt_ia + \
" -q " + str(conf.calc_client_qps()) + \
" -H " + conf.clt_mechspecs[i].netspec + \
" -s " + conf.srv_mechspec.netspec + \
" -r " + str(conf.clt_rage_quit_lat) + \
" -l " + str(conf.clt_pkt_loss_lat) + \
" -w " + str(conf.clt_wrkld) + \
" -w " + str(conf.clt_wrkarg0) + \
" -w " + str(conf.clt_wrkarg1) + \
" -P " + str(conf.clt_pkt_pad) + \
" -D " + str(conf.clt_pkt_depth) + \
" -p " + str(conf.clt_port)
if int(conf.clt_pkt_pad) > 1518:
client_cmd += " -J "
tc.log_print(client_cmd)
sclt.append(tc.remote_exec([conf.clt_fqdns[i]], client_cmd, blocking=False)[0])
sclt_name.append(conf.clt_fqdns[i])
# give the clients time to come up before the master starts measuring
time.sleep(5)
# start master
# NOTE(review): the bare `tc.remote_exec` on the next line is a no-op
# attribute access (no call) — likely a leftover; confirm and remove.
tc.remote_exec
tc.log_print("Starting master...")
master_cmd = "sudo " + conf.root_dir + "/cat --log-level lib.eal:err -- " + \
" -s " + conf.srv_mechspec.netspec + \
" -o " + conf.root_dir + "/" + __SAMPLE_FN + \
" -t " + str(conf.mst_duration) + \
" -T " + str(conf.mst_warmup) + \
" -i " + conf.mst_ia + \
" -q " + str(conf.mst_qps) + \
" -l " + str(conf.mst_pkt_loss_lat) + \
" -L " + str(conf.mst_pkt_loss_max) + \
" -A " + conf.mst_affinity + \
" -H " + conf.mst_mechspec.netspec + \
" -p " + str(conf.mst_port)
for clt in conf.clt_mechspecs:
master_cmd += " -S " + clt.netspec
tc.log_print(master_cmd)
# NOTE(review): this assignment shadows the module-level
# `import subprocess as sp` alias inside run(); harmless here since
# subprocess is not used later in this function, but worth renaming.
sp = tc.remote_exec(conf.mst_fqdns, master_cmd, blocking=False)
p = sp[0]
# launch stderr monitoring thread
exclude = ["Pseudo-terminal", "ice_", "i40e_"]
tc.errthr_create([p], conf.mst_fqdns, exclude)
if not conf.enable_client_only:
tc.errthr_create(ssrv, conf.srv_fqdns, exclude)
tc.errthr_create(sclt, sclt_name, exclude)
if conf.enable_memgen:
tc.errthr_create(smem, ["memloadgen"], exclude)
if conf.enable_pmc:
tc.errthr_create(spmc, ["pmcstat"], exclude)
tc.errthr_start()
success = False
cur = 0
# selec = select.poll()
# selec.register(p.stdout, select.POLLIN)
# Watchdog: wait for the master to exit; bail out early if any stderr
# monitor reports failure, or after 3x the expected run length in seconds.
while True:
# either failed or timeout
# we use failure detection to save time for long durations
if tc.errthr_get_failed() or cur >= (conf.mst_warmup + conf.mst_duration) * 3:
break
# while selec.poll(1):
# print(p.stdout.readline())
if p.poll() != None:
success = True
break
time.sleep(1)
cur = cur + 1
stop_all(conf)
tc.errthr_stop()
tc.log_print("Cooling down...")
time.sleep(5)
if success:
return __keep_result(conf)

View File

@ -1,3 +1,5 @@
from cgi import test
from site import abs_paths
import subprocess as sp
import time
import select
@ -11,329 +13,213 @@ import re
import libpar as par
import libtc as tc
import libmechspec as mechspec
import netexp
# load_gen
loadgen_load = "fixed:0"
only_max_qps = True
# [[counter names], counting mode (0 = sampling, 1 = counting)]
pmc_counters = [
#"",
# [["mem_load_l3_miss_retired.local_dram"], 1],
# [["mem_load_l3_miss_retired.remote_dram"], 1],
# [["mem_load_l3_miss_retired.remote_hitm"], 1],
# [["mem_load_l3_miss_retired.remote_fwd"], 1]
# [["mem_trans_retired.load_latency_gt_8"], 0],
# [["mem_trans_retired.load_latency_gt_16"], 0],
# [["mem_trans_retired.load_latency_gt_32"], 0],
# [["mem_trans_retired.load_latency_gt_64"], 0],
# [["mem_trans_retired.load_latency_gt_128"], 0],
# [["mem_trans_retired.load_latency_gt_256"], 0],
# [["mem_trans_retired.load_latency_gt_512"], 0],
[["mem_trans_retired.load_latency_gt_8", ""], 0],
]
# pkt_pad
jumbo_frame_threshold = 1518
pkt_pads = [
#"9018",
#1518",
"0",
"256",
"512",
"1024"
#"2048",
#"4096",
#"8192"
clt_pkt_pads = [
0,
# 256,
# 512,
# 1024,
# 2048,
# 4096,
# 8192
]
pkt_pads_depth = {}
pkt_pads_depth["0"] = "32"
pkt_pads_depth["256"] = "16"
pkt_pads_depth["512"] = "8"
pkt_pads_depth["1024"] = "4"
pkt_pads_depth["1518"] = "4"
pkt_pads_depth["2048"] = "2"
pkt_pads_depth["4096"] = "1"
pkt_pads_depth["8192"] = "1"
pkt_pads_depth["9018"] = "1"
clt_pkt_pads_depth = {}
clt_pkt_pads_depth[0] = 8
clt_pkt_pads_depth[256] = 6
clt_pkt_pads_depth[512] = 6
clt_pkt_pads_depth[1024] = 4
clt_pkt_pads_depth[1518] = 4
clt_pkt_pads_depth[2048] = 2
clt_pkt_pads_depth[4096] = 2
clt_pkt_pads_depth[8192] = 1
clt_pkt_pads_depth[9018] = 1
# memgen
enable_memgen = False
memgen_mask = [
"0xFFFFFF000000",
"0x000000FFFFFF",
"0xFFFFFF000000",
"0xFFFFFF000000",
"0xFFFFFF000000",
"0xFFFFFF000000",
"0xFFFFFF000000",
"0x000000FFFFFF",
"0x000000FFFFFF",
"0x000000FFFFFF",
"0x000000FFFFFF",
"0x000000FFFFFF",
# clt_load
clt_wrkld = [
[0, "fixed:0", "fixed:0"],
# [0, "uniform:1000", "fixed:0"],
# [0, "uniform:100", "fixed:0"],
# [0, "uniform:10", "fixed:0"],
# [1, "uniform:480", "uniform:1024"],
# [1, "uniform:480", "uniform:256"],
# [1, "uniform:480", "uniform:64"]
]
memgen_target = [
"0xA",
"0xA000000",
"0xA",
"0xA",
"0xA",
"0xA",
"0xA",
"0xA000000",
"0xA000000",
"0xA000000",
"0xA000000",
"0xA000000",
]
# paths
test_dir = "/numam.d/build/bin"
file_dir = os.path.dirname(os.path.realpath(__file__))
root_dir = os.path.join(file_dir,"..")
sample_filename = "sample.txt"
affinity = [
"1,3,5,7,9,11,13,15,17,19,21,23",
"65,67,69,71,73,75,77,79,81,83,85,87,89,91,93,95,97,99,101,103,105,107,109,111,113,115,117,119,121,123,125,127",
"1,3,5,7,9,11,13,15,17,19,21,23,25,27,29,31,33,35,37,39,41,43,45,47,49,51,53,55,57,59,61,63",
"1,3,5,7,9,11,13,15",
"17,19,21,23,25,27,29,31",
"33,35,37,39,41,43,45,47",
"49,51,53,55,57,59,61,63"
# [srv_affinity, OPTIONAL( memgen_affinity, iteration, buffer_size, target_dom )]
server_affinity = [
["1,3,5,7,9,11,13,15,17,19,21,23"],
["25,27,29,31,33,35,37,39,41,43,45,47"],
#["1,3,5,7,9,11,13,15,17,19,21,23", "26,28,30,32,34,36,38,40,42,44,46", -1, 512*1024*1024, 0],
#["25,27,29,31,33,35,37,39,41,43,45,47", "2,4,6,8,10,12,14,16,18,20,22", -1, 512*1024*1024, 1],
# "65,67,69,71,73,75,77,79,81,83,85,87,89,91,93,95,97,99,101,103,105,107,109,111,113,115,117,119,121,123,125,127",
# "1,3,5,7,9,11,13,15,17,19,21,23,25,27,29,31,33,35,37,39,41,43,45,47,49,51,53,55,57,59,61,63",
# "1,3,5,7,9,11,13,15",
# "17,19,21,23,25,27,29,31",
# "33,35,37,39,41,43,45,47",
# "49,51,53,55,57,59,61,63"
]
server = ["skylake5.rcs.uwaterloo.ca"]
server_spec = ["192.168.123.13@3c:15:fb:c9:f3:28"]
master_cpumask = "2" # 1 thread
def flush_netresult(conf : netexp.NetExpConf, result : netexp.NetExpResult):
sample_out = tc.get_odir() + "/" + str(result.parser.qps) + ".txt"
master = ["icelake2-int.rcs.uwaterloo.ca"]
master_spec = ["192.168.123.9@40:a6:b7:7c:86:10"]
#server = ["milan1-int.rcs.uwaterloo.ca"]
#server_spec = ["192.168.123.9@00:07:43:54:37:08"]
with open(sample_out, "w") as f:
f.write(result.sample)
clients = ["skylake2.rcs.uwaterloo.ca"]#, "skylake5.rcs.uwaterloo.ca", "skylake6.rcs.uwaterloo.ca"]# "skylake7.rcs.uwaterloo.ca", "skylake8.rcs.uwaterloo.ca"]
client_spec = ["192.168.123.12@3c:15:fb:c9:f3:36"]#, "192.168.123.13@3c:15:fb:c9:f3:28", "192.168.123.14@3c:15:fb:62:9b:2f"] #, "192.168.123.13@3c:15:fb:62:9c:be"]
client_cpumask = "1,3,5,7,9,11,13,15,17,19,21,23"
client_rage_quit = 1000 #1s
warmup = 5
duration = 20
cooldown = 5
cacheline = 0
SSH_PARAM = "-o StrictHostKeyChecking=no -p77"
SSH_USER = "oscar"
master_qps = 100
master_pkt_loss = 100
master_pkt_loss_failure = -1
hostfile = None
lockstat = False
client_only = False
def stop_all():
# stop clients
tc.log_print("Stopping clients...")
tc.remote_exec(clients, "sudo killall -9 rat; sudo killall -9 cat; sudo killall -9 khat", check=False)
if not client_only:
# stop server
tc.log_print("Stopping server...")
tc.remote_exec(server, "sudo killall -9 rat; sudo killall -9 cat; sudo killall -9 khat", check=False)
# stop master
tc.log_print("Stopping master...")
tc.remote_exec(master, "sudo killall -9 rat; sudo killall -9 cat; sudo killall -9 khat", check=False)
def get_client_str():
ret = ""
for client in client_spec:
ret += " -S " + client + " "
return ret
def calc_client_ld(ld : int):
return 0 if ld == 0 else (int)((ld - master_qps) / len(clients))
def run_exp(affinity : str, ld : int, pkt_pad : str, aff_idx : int):
while True:
server_cmd = "sudo " + test_dir + "/khat --log-level lib.eal:err -- -A " + affinity + \
" -H " + server_spec[0]
if int(pkt_pad) > jumbo_frame_threshold:
server_cmd += " -J "
if enable_memgen:
server_cmd += " -m -b 0 -X " + memgen_target[aff_idx] + " -x " + memgen_mask[aff_idx]
if client_only:
ssrv = None
tc.log_print(server_cmd)
if conf.enable_pmc:
pmc_out = tc.get_odir() + "/" + str(result.parser.qps) + ".pmc"
if conf.pmc_mode != 0:
with open(pmc_out, "w") as f:
f.write(result.pmc_parser.raw)
else:
# start server
tc.log_print("Starting server...")
tc.log_print(server_cmd)
ssrv = tc.remote_exec(server, server_cmd, blocking=False)
with open(pmc_out, "wb") as f:
f.write(result.pmc_parser[0])
with open(pmc_out + "_parsed", "w") as g:
g.write(result.pmc_parser[1])
# start clients
tc.log_print("Starting clients...")
sclt = []
sclt_name = []
for i in range(len(clients)):
client_cmd = "sudo " + test_dir + "/rat --log-level lib.eal:err -- -S -A " + client_cpumask + \
" -i exponential " + \
" -q " + str(calc_client_ld(ld)) + \
" -H " + client_spec[i] + \
" -s " + server_spec[0] + \
" -r " + str(client_rage_quit) + \
" -l 100 " + \
" -w " + loadgen_load + \
" -P " + pkt_pad + \
" -D " + pkt_pads_depth[pkt_pad]
if int(pkt_pad) > jumbo_frame_threshold:
client_cmd += " -J "
tc.log_print(client_cmd)
sclt.append(tc.remote_exec([clients[i]], client_cmd, blocking=False)[0])
sclt_name.append(clients[i])
time.sleep(5)
# start master
tc.log_print("Starting master...")
master_cmd = "sudo " + test_dir + "/cat --log-level lib.eal:err -- " + \
" -s " + server_spec[0] + \
" -o " + test_dir + "/" + sample_filename + \
" -t " + str(duration) + \
" -T " + str(warmup) + \
" -i exponential" + \
" -q " + str(master_qps) + \
" -l " + str(master_pkt_loss) + \
" -L " + str(master_pkt_loss_failure) + \
" -A " + master_cpumask + \
" -H " + master_spec[0] + \
get_client_str()
tc.log_print(master_cmd)
sp = tc.remote_exec(master, master_cmd, blocking=False)
p = sp[0]
# launch stderr monitoring thread
exclude = ["Pseudo-terminal", "ice_", "i40e_"]
tc.errthr_create([p], master, exclude)
if not client_only:
tc.errthr_create(ssrv, server, exclude)
tc.errthr_create(sclt, sclt_name, exclude)
tc.errthr_start()
success = False
cur = 0
# selec = select.poll()
# selec.register(p.stdout, select.POLLIN)
while True:
# either failed or timeout
# we use failure detection to save time for long durations
if tc.errthr_get_failed() or cur >= (warmup + duration) * 3:
break
# while selec.poll(1):
# print(p.stdout.readline())
if p.poll() != None:
success = True
break
time.sleep(1)
cur = cur + 1
stop_all()
tc.errthr_stop()
print("Cooling down...")
time.sleep(cooldown)
if success:
return
def keep_results(qps_only : bool = False):
success = True
target_scp_fn = tc.get_odir() + "/sample.txt"
scpcmd = "scp -P77 oscar@" + master[0] + ":" + test_dir + "/" + sample_filename + " " + target_scp_fn
tc.log_print(scpcmd)
sp.check_call(scpcmd, shell=True)
parser = par.khat_parser()
with open(target_scp_fn, "r") as f:
parser.parse(f.read())
target_fn = tc.get_odir() + "/" + str(parser.qps) + ".txt"
mvcmd = "mv " + target_scp_fn + " " + target_fn
tc.log_print(mvcmd)
sp.check_call(mvcmd, shell=True)
tc.log_print("=== Summary - qps: " + str(parser.qps) + " master loss: " + str(float(parser.master_loss) / float(parser.master_recv + parser.master_loss) * 100.00) + " slave loss: " + str(float(parser.slave_loss) / float(parser.slave_recv + parser.slave_loss) * 100.0) + "%" )
if not qps_only:
try:
tc.log_print("=== Server HW:")
tc.log_print(par.mutilate_data.build_mut_output(parser.srv_hwlat, [parser.qps]) + "\n")
tc.log_print("=== Server SW:")
tc.log_print(par.mutilate_data.build_mut_output(parser.srv_swlat, [parser.qps]) + "\n")
tc.log_print("=== Client HW:")
tc.log_print(par.mutilate_data.build_mut_output(parser.clt_hwlat, [parser.qps]) + "\n")
tc.log_print("=== Client SW:")
tc.log_print(par.mutilate_data.build_mut_output(parser.clt_swlat, [parser.qps]) + "\n")
except:
success = False
if not success:
rmcmd = "rm " + target_fn
tc.log_print(rmcmd)
sp.check_call(rmcmd, shell=True)
return (parser.qps if success else -1)
tc.log_print("=== Summary - qps: " + str(result.parser.qps) + " master loss: " + str(float(result.parser.master_loss) / float(result.parser.master_recv + result.parser.master_loss) * 100.00) + "% slave loss: " + str(float(result.parser.slave_loss) / float(result.parser.slave_recv + result.parser.slave_loss) * 100.0) + "%" )
tc.log_print("=== Server HW:")
tc.log_print(par.mutilate_data.build_mut_output(result.parser.srv_hwlat, [result.parser.qps]) + "\n")
tc.log_print("=== Server SW:")
tc.log_print(par.mutilate_data.build_mut_output(result.parser.srv_swlat, [result.parser.qps]) + "\n")
tc.log_print("=== Client HW:")
tc.log_print(par.mutilate_data.build_mut_output(result.parser.clt_hwlat, [result.parser.qps]) + "\n")
tc.log_print("=== Client SW:")
tc.log_print(par.mutilate_data.build_mut_output(result.parser.clt_swlat, [result.parser.qps]) + "\n")
if conf.enable_pmc:
if conf.pmc_mode != 0:
tc.log_print("=== PMC:")
tc.log_print("counter: " + result.pmc_parser.counter + " count: " + str(result.pmc_parser.count) + " cores: " + str(result.pmc_parser.cores))
def main():
global hostfile
global server
global master
global clients
global client_only
tc.set_ssh_param(SSH_PARAM)
tc.set_ssh_user(SSH_USER)
tc.set_ssh_param("-o StrictHostKeyChecking=no -p77")
tc.set_ssh_user("oscar")
output_dirname = "run"
options = getopt.getopt(sys.argv[1:], 'h:so:c')[0]
conf = netexp.NetExpConf()
conf.srv_mechspec = mechspec.LAB.SKYLAKE3_10G
conf.clt_mechspecs = [mechspec.LAB.SKYLAKE6_10G, mechspec.LAB.SKYLAKE5_10G, mechspec.LAB.SKYLAKE7_10G, mechspec.LAB.SKYLAKE8_10G]
conf.mst_mechspec = mechspec.LAB.SKYLAKE2_10G
conf.finalize_mechspecs()
conf.root_dir = "/numam.d/build/bin"
# server fixed configs
conf.srv_port = 0
# client fixed configs
conf.clt_ia = "exponential"
conf.clt_affinity = "1,3,5,7,9,11,13,15,17,19,21,23"
conf.clt_port = 0
conf.clt_pkt_loss_lat = 5000
conf.clt_rage_quit_lat = 5000
# master fixed configs
conf.mst_port = 0
conf.mst_warmup = 5
conf.mst_duration = 20
conf.mst_qps = 100
conf.mst_ia = "exponential"
conf.mst_pkt_loss_lat = 5000
conf.mst_pkt_loss_max = 100
conf.mst_affinity = "2"
# pmc stuff
conf.pmc_sampling_rate = 4096
conf.pmc_counting_interval = 0.1
options = getopt.getopt(sys.argv[1:], 'sc')[0]
for opt, arg in options:
if opt in ('-h'):
hostfile = arg
elif opt in ('-s'):
stop_all()
if opt in ('-s'):
netexp.stop_all(conf)
return
elif opt in ('-o'):
output_dirname = arg
elif opt in ('-c'):
client_only=True
conf.enable_client_only=True
tc.init("~/results.d/numam_neo/" + output_dirname + "_" + datetime.datetime.now().strftime('%Y%m%d%H%M%S'))
cpcmd = "cp " + __file__ + " " + tc.get_odir() + "/"
tc.log_print(cpcmd)
sp.check_call(cpcmd, shell=True)
tc.log_print("Configuration:\n" + \
"hostfile: " + ("None" if hostfile == None else hostfile) + "\n" \
"client only: " + str(client_only) + "\n" + \
"output: " + output_dirname)
for eaff in server_affinity:
conf.srv_affinity = eaff[0]
conf.enable_memgen = False
if len(eaff) > 1:
conf.enable_memgen = True
conf.memgen_affinity = eaff[1]
conf.memgen_iteration = eaff[2]
conf.memgen_size = eaff[3]
conf.memgen_tgtdom = eaff[4]
for epad in clt_pkt_pads:
conf.clt_pkt_pad = 0
conf.clt_pkt_depth = clt_pkt_pads_depth[conf.clt_pkt_pad]
for eload in clt_wrkld:
conf.clt_wrkld = eload[0]
conf.clt_wrkarg0 = eload[1]
conf.clt_wrkarg1 = eload[2]
for epmc in pmc_counters:
conf.enable_pmc = False
if len(epmc) > 0:
conf.enable_pmc = True
conf.pmc_counters = epmc[0]
conf.pmc_mode = epmc[1]
if hostfile != None:
hosts = tc.parse_hostfile(hostfile)
server = tc.process_hostnames(server, hosts)
clients = tc.process_hostnames(clients, hosts)
master = tc.process_hostnames(master, hosts)
test_name = "affinity" + eaff[0] + "_pad" + str(epad) + "_load" + str(eload[0]) + "," + str(eload[1]) + "," + str(eload[2])
if (conf.enable_memgen):
test_name += "_memload" + str(eaff[1]) + "," + str(eaff[2]) + "," + str(eaff[3]) + "," + str(eaff[4])
if (conf.enable_pmc):
test_name += "_pmc" + str(epmc[1]) + "_" + conf.get_pmc_str()
tc.begin(test_name)
conf.clt_qps = 0
tc.log_print("============ " + test_name + " QPS: MAX ============")
result : netexp.NetExpResult = netexp.run(conf)
flush_netresult(conf, result)
max_qps = result.parser.qps
stop_all()
if conf.enable_client_only:
return
for i in range(0, len(affinity)):
for epad in pkt_pads:
eaff = affinity[i]
tc.begin(eaff + "_" + epad)
tc.log_print("============ Affinity: " + str(eaff) + " PktPad: " + epad + " Load: MAX" + " ============")
run_exp(eaff, 0, epad, i)
max_qps = keep_results(qps_only=True)
stop_all()
if client_only:
break
finish = (int)(max_qps - max(master_qps, 0.01 * max_qps))
step = (int)(finish / 10)
cur_load = step
while cur_load <= finish:
tc.log_print("============ Affinity: " + str(eaff) + " PktPad: " + epad + " Load: " + str(cur_load) + " ============")
run_exp(eaff, cur_load, epad, i)
if keep_results() != -1:
# increment only on success
cur_load += step
tc.log_print("")
if only_max_qps:
continue
finish = (int)(max_qps - max(conf.mst_qps, 0.01 * max_qps))
step = (int)(finish / 10)
cur_qps = step
while cur_qps <= finish:
tc.log_print("============ " + test_name + " QPS: " + str(cur_qps) + " ============")
conf.clt_qps = cur_qps
result : netexp.NetExpResult = netexp.run(conf)
flush_netresult(result)
cur_qps += step
tc.log_print("")
tc.end()
stop_all()
netexp.stop_all(conf)
main()

View File

@ -2,7 +2,7 @@
dpdk_dir="/dpdk"
libtopo_dir="/libtopo"
root="$(dirname "$0")/.."
servers="skylake2.rcs.uwaterloo.ca skylake3.rcs.uwaterloo.ca skylake5.rcs.uwaterloo.ca skylake6.rcs.uwaterloo.ca icelake2-int.rcs.uwaterloo.ca milan1-int.rcs.uwaterloo.ca"
servers="skylake8.rcs.uwaterloo.ca"
rsync_flags="-az"
ssh_args="-o StrictHostKeyChecking=no -p77"
@ -20,7 +20,7 @@ compile() {
# separate these functions because we might change kernel (reboot) without needing to recompile
echo "====================$1===================="
ssh $(echo $ssh_args $user@$1) "sudo sh -c \"sudo rm -rf $libtopo_dir; sudo rm -rf /usr/local/include/libtopo; sudo rm -rf /usr/local/lib/libtopo;sudo mkdir -p $libtopo_dir; sudo chmod 777 $libtopo_dir; cd $libtopo_dir; git clone https://git.quacker.org/d/libtopo; cd libtopo; mkdir build; cd build; cmake ../; sudo make install\""
ssh $(echo $ssh_args $user@$1) "sudo sh -c \"sudo pkg install -y meson pkgconf py38-pyelftools; sudo rm -rf $dpdk_dir; sudo mkdir -p $dpdk_dir; sudo chmod 777 $dpdk_dir; cd $dpdk_dir; git clone https://git.quacker.org/d/numam-dpdk; cd numam-dpdk; git checkout migration; CC=gcc CXX=g++ meson -Denable_kmods=true build; cd build; ninja install\""
ssh $(echo $ssh_args $user@$1) "sudo sh -c \"sudo pkg install -y meson pkgconf py39-pyelftools; sudo rm -rf $dpdk_dir; sudo mkdir -p $dpdk_dir; sudo chmod 777 $dpdk_dir; cd $dpdk_dir; git clone https://git.quacker.org/d/numam-dpdk; cd numam-dpdk; git checkout migration; CC=gcc CXX=g++ meson -Denable_kmods=true build; cd build; ninja install\""
wait
echo "$1 Done."
echo ""

View File

@ -1,7 +1,9 @@
#!/bin/sh
dpdk_dir="/numam.d"
root="$(dirname "$0")/.."
servers="skylake2.rcs.uwaterloo.ca skylake3.rcs.uwaterloo.ca skylake5.rcs.uwaterloo.ca skylake6.rcs.uwaterloo.ca icelake2-int.rcs.uwaterloo.ca milan1-int.rcs.uwaterloo.ca"
servers="icelake1-int.rcs.uwaterloo.ca"
#icelake2-int.rcs.uwaterloo.ca"
#servers="skylake2.rcs.uwaterloo.ca skylake3.rcs.uwaterloo.ca skylake5.rcs.uwaterloo.ca skylake6.rcs.uwaterloo.ca skylake7.rcs.uwaterloo.ca skylake8.rcs.uwaterloo.ca icelake2-int.rcs.uwaterloo.ca milan1-int.rcs.uwaterloo.ca"
rsync_flags="-rv -e \"ssh -p77\""
ssh_args="-o StrictHostKeyChecking=no -p77"
@ -20,7 +22,7 @@ compile() {
echo "====================$1===================="
ssh $(echo $ssh_args $user@$1) "sudo sh -c \"rm -rf $dpdk_dir; mkdir $dpdk_dir; chmod 777 $dpdk_dir\""
rsync -rv -e "ssh -p77" $root/ $user@$1:$dpdk_dir/
ssh $(echo $ssh_args $user@$1) "sudo sh -c \"cd $dpdk_dir; rm -rf build; mkdir build; cd build; cmake ../; make -j8 khat cat rat\""
ssh $(echo $ssh_args $user@$1) "sudo sh -c \"cd $dpdk_dir; rm -rf build; mkdir build; cd build; cmake ../; make -j8 khat cat rat memloadgen\""
wait
echo "$1 Done."
echo ""

10
scripts/test.py Normal file
View File

@ -0,0 +1,10 @@
import libpar as par

# Ad-hoc check: parse a saved pmcstat counting-mode capture and dump the
# fields that pmc_parser extracts from it.
path = "/home/quackerd/results.d/numam_neo/run_20220807093909/affinity1,3,5,7,9,11,13,15,17,19,21,23_pad0_load0,fixed:0,fixed:0_pmc.mem_load_l3_miss_retired.local_dram/pmc.txt.tmp"

with open(path, "r") as pmc_file:
    parsed = par.pmc_parser(pmc_file.read())

for field in (parsed.cores, parsed.count, parsed.counter):
    print(str(field))

21
scripts/tmp.py Normal file
View File

@ -0,0 +1,21 @@
import time
import subprocess as sp
import libpar as par
import libtc as tc
import libmechspec as mechspec
import numpy as np

# Ad-hoc analysis: for each sampled request compute the "effective RTT"
# (client software latency minus server software latency) and report the
# median over the whole capture.
FILE : str = "/home/quackerd/1028361.txt"

parser = par.khat_parser()
with open(FILE, "r") as sample_file:
    parser.parse(sample_file.read())

# Index-based pairing keeps the original behavior (raises if the server
# list is shorter than the client list).
re = [parser.clt_swlat[i] - parser.srv_swlat[i]
      for i in range(0, len(parser.clt_swlat))]

print("Median: " + str(np.percentile(re, 50)))

View File

@ -1,6 +1,10 @@
#include "gen.hh"
#include <cstdlib>
#include <cstring>
#include <iostream>
#include <fstream>
#include "ntr.h"
#include "nms.h"
#include <getopt.h>
#include <unistd.h>
#include <topo.h>
@ -11,32 +15,39 @@ usage()
fprintf(stdout,
"Usage:\n"
" -v: verbose mode\n"
" -b: MLG bytes per second\n"
" -x: MLG thread affinity mask\n"
" -X: MLG target domain affinity mask\n"
" -S: shared buffer\n"
" -i: iterations\n");
" -b: memory block size\n"
" -d: destination domain index\n"
" -s: worker threads cpu list\n"
" -S: enable shared memory block\n"
" -t: time to run\n"
" -o: output file path\n");
fflush(stdout);
}
static char output_file[256] = "memloadgen_samples.txt";
int main(int argc, char * argv[])
{
ntr_init();
ntr_set_level(NTR_DEP_USER1, NTR_LEVEL_WARNING);
size_t arr_sz = 0;
uint32_t iter = -1;
size_t arr_sz = 1024 * 1024;
uint32_t time = -1;
cpuset_t threads;
CPU_ZERO(&threads);
CPU_SET(0, &threads);
int shared_buffer = 0;
cpuset_t domain_mask;
CPU_ZERO(&domain_mask);
CPU_SET(0, &domain_mask);
{
int c;
// parse arguments
while ((c = getopt(argc, argv, "hb:X:x:vi:S")) != -1) {
while ((c = getopt(argc, argv, "vhb:d:s:So:t:")) != -1) {
switch (c) {
case 'v':
ntr_set_level(NTR_DEP_USER1,
ntr_get_level(NTR_DEP_USER1) + 1);
ntr_set_level(NTR_DEP_USER1, ntr_get_level(NTR_DEP_USER1) + 1);
break;
case 'h':
usage();
@ -44,18 +55,20 @@ int main(int argc, char * argv[])
case 'b':
arr_sz = strtoull(optarg, nullptr, 10);
break;
case 'i':
iter = strtoul(optarg, nullptr, 10);
break;
case 'X':
case 'd':
cpulist_to_cpuset(optarg, &domain_mask);
break;
case 'x':
case 's':
cpulist_to_cpuset(optarg, &threads);
break;
case 'S':
shared_buffer = 1;
break;
case 'o':
strncpy(output_file, optarg, 256);
break;
case 't':
time = (uint32_t)strtol(optarg, nullptr, 10);
default:
usage();
exit(0);
@ -63,31 +76,54 @@ int main(int argc, char * argv[])
}
}
ntr(NTR_DEP_USER1, NTR_LEVEL_INFO, "MLG: [size: %ld iter: %u, # threads: 0x%d, domain: 0x%ld]\n", arr_sz, iter, CPU_COUNT(&threads), CPU_FFS(&domain_mask) - 1);
ntr(NTR_DEP_USER1, NTR_LEVEL_INFO, "MLG: [size: %ld, # threads: 0x%d, domain: 0x%ld]\n", arr_sz, CPU_COUNT(&threads), CPU_FFS(&domain_mask) - 1);
// init topo
if (topo_init(ntr_get_level(NTR_DEP_USER1) - NTR_LEVEL_WARNING) != 0) {
if (topo_init(ntr_get_level(NTR_DEP_USER1) != NTR_LEVEL_DEFAULT)) {
fprintf(stderr, "libtopo init failed!\n");
exit(1);
}
// init
if (nms_init(ntr_get_level(NTR_DEP_USER1) != NTR_LEVEL_DEFAULT)) {
fprintf(stderr, "libnms init failed!\n");
exit(1);
}
bool success = false;
memload_generator::memload_generator_options opts;
opts.chunk_size = arr_sz;
opts.iteration = iter;
opts.shared_buffer = shared_buffer;
opts.verbose = 1;
opts.verbose = ntr_get_level(NTR_DEP_USER1) != NTR_LEVEL_DEFAULT;
std::ofstream ofile;
ofile.open(output_file, std::ios::out | std::ios::trunc);
auto mgen = new memload_generator(&threads, &domain_mask, &opts, &success);
while(!mgen->check_done()) {
usleep(10000);
if (!mgen->start()) {
fprintf(stderr, "failed to start memloadgen!\n");
exit(1);
}
unsigned long bps = mgen->get_bps();
ntr(NTR_DEP_USER1, NTR_LEVEL_INFO,
"main: MLG bps = %ld ~= %ldM\n", bps, bps / 1024 / 1024);
uint64_t prev_ts = topo_uptime_ns();
uint64_t prev_trans = mgen->get_transactions();
uint32_t cur_time = 0;
while(cur_time < time) {
usleep(S2US);
uint64_t cur_ts = topo_uptime_ns();
uint64_t trans = mgen->get_transactions();
uint64_t bps = (uint64_t)((double)((trans - prev_trans) * arr_sz) / ((double)(cur_ts - prev_ts) / (double)S2NS));
ntr(NTR_DEP_USER1, NTR_LEVEL_INFO, "main: MLG bps = %ld ~= %ldM\n", bps, bps / 1024 / 1024);
ofile << bps << std::endl;
prev_ts = cur_ts;
prev_trans = trans;
cur_time++;
}
mgen->stop();
delete mgen;
ofile.close();
return 0;
}