memload gen

Summary: Add memload generator

Test Plan: by hand

Reviewers: ali

Differential Revision: https://review.rcs.uwaterloo.ca/D415
This commit is contained in:
quackerd 2021-02-16 05:14:43 -05:00
parent f655e5f5cb
commit 06b93ddf1c
7 changed files with 369 additions and 36 deletions

View File

@ -25,7 +25,7 @@ include_directories(${CMAKE_SOURCE_DIR}/inc)
include_directories(${dpdk_INCLUDE_DIRS})
include_directories(${Hwloc_INCLUDE_DIRS})
set(LIBNM_CC_FLAGS -O2 -g -Wall -Wextra -Werror -std=c++11)
set(LIBNM_CC_FLAGS -O2 -g -Wall -Wextra -Werror -std=c++11 -mavx -msse4)
set(LIBNTR_C_FLAGS -O2 -g -Wall -Wextra -Werror -std=c11)
set(LIBGEN_CC_FLAGS -O2 -g -Wall -Wextra -Werror -std=c++11)
@ -34,7 +34,7 @@ set(CAT_LINKLIBS pthread nm ntr gen)
set(RAT_LINKLIBS pthread nm ntr gen)
add_library(nm libnm/nm.cc)
target_link_libraries(nm ${Hwloc_LIBRARIES})
target_link_libraries(nm ${Hwloc_LIBRARIES} gen)
target_compile_options(nm PRIVATE ${LIBNM_CC_FLAGS})
add_library(ntr libntr/ntr.c)

View File

@ -821,13 +821,13 @@ main(int argc, char *argv[])
rte_exit(EXIT_FAILURE, "must specify host IP\n");
}
dump_options();
// init nm
if (nm_init(ntr_get_level(NTR_DEP_USER1) - NTR_LEVEL_WARNING) != 0) {
rte_exit(EXIT_FAILURE, "nm init failed!\n");
}
dump_options();
// create default generator
options.s_iagen = createGenerator(options.ia_gen_str);
if (options.s_iagen == nullptr) {

View File

@ -1,5 +1,9 @@
#pragma once
#include "gen.h"
#include "util.h"
#include <atomic>
#include <cstdint>
#include <vector>
@ -7,6 +11,13 @@ constexpr static int NM_LEVEL_NUMA = 0;
constexpr static int NM_LEVEL_CPU = 1;
constexpr static int NM_LEVEL_CORE = 2;
// One node in the discovered machine topology tree. Nodes are linked
// bottom-up (core -> cpu/package -> NUMA node) via `parent` and
// top-down via `children`; see nm_get_nodes()/nm_get_cpus()/nm_get_cores().
struct nm_obj {
int level; // one of NM_LEVEL_NUMA / NM_LEVEL_CPU / NM_LEVEL_CORE
int id;
struct nm_obj *parent;
std::vector<struct nm_obj *> children;
};
std::vector<struct nm_obj *> *nm_get_nodes();
std::vector<struct nm_obj *> *nm_get_cpus();
std::vector<struct nm_obj *> *nm_get_cores();
@ -18,3 +29,44 @@ int nm_init(int verbosity);
uint64_t nm_tsc2ns(uint64_t tsc);
uint64_t nm_get_uptime_ns();
// Generates a configurable stream of memory writes against a chosen NUMA
// domain using one pinned worker thread per bit in the source cpu mask.
// Lifecycle: construct -> start() -> (poll get_bps()) -> stop() -> destroy.
class memload_generator {
private:
DISALLOW_EVIL_CONSTRUCTORS(memload_generator);
// Per-worker bookkeeping shared between the controller and the thread.
struct thread_info {
pthread_t pthr;
// completed bursts; each burst writes TRANSACTION_CNT * TRANSACTION_SZ bytes
std::atomic<uint64_t> num_trans;
// inter-arrival generator controlling the per-thread write rate
Generator *ia_gen;
// worker init handshake: INIT_START -> INIT_SUCCESS or INIT_FAILED
std::atomic<int> init_status;
// points at the owning generator's `state` below
std::atomic<int> *state;
constexpr static int INIT_START = 0;
constexpr static int INIT_SUCCESS = 1;
constexpr static int INIT_FAILED = 2;
// per-thread scratch mapping the worker streams stores into
void *region;
// NUMA domain the worker prefers for its allocations
uint32_t to_domainid;
};
// size of each worker's mmap'd scratch region (one 4 KiB page)
constexpr static uint32_t REGION_ZS = 0x1000;
constexpr static uint32_t TRANSACTION_CNT =
0x8; // how many transactions per cycle
constexpr static uint32_t TRANSACTION_SZ =
0x8; // how large each transaction is
std::vector<struct thread_info *> thr_infos;
// global run-state machine observed by all workers:
// READY (spin-wait) -> START (generate) -> STOP (exit)
std::atomic<int> state;
constexpr static uint32_t STATE_READY = 0;
constexpr static uint32_t STATE_START = 1;
constexpr static uint32_t STATE_STOP = 2;
// measurement window; begin_ts set by start(), stop_ts by stop()
uint64_t begin_ts;
uint64_t stop_ts;
static void *worker_thrd(void *_tinfo);
public:
// success is an out-parameter: set true iff every worker initialized
memload_generator(uint64_t from_cmask, uint64_t to_cmask, uint64_t bps,
bool *success);
void start();
void stop();
// achieved throughput in bytes/sec over the measurement window
uint64_t get_bps();
~memload_generator();
};

View File

@ -11,14 +11,15 @@ constexpr static unsigned long S2US = 1000000UL;
constexpr static unsigned long MS2NS = 1000000UL;
constexpr static uint16_t MIN_RANDOM_PORT = 1000;
constexpr static uint16_t DEFAULT_RAT_PORT = 1234;
constexpr static unsigned int INIT_DELAY = 2;
constexpr static unsigned int INIT_DELAY = 1;
constexpr static unsigned int MAX_NODES = 64;
constexpr static int NEXT_CPU_NULL = -1;
/*
 * Pops the lowest set bit from *mask and returns its 0-based index,
 * or NEXT_CPU_NULL when the mask is empty.
 *
 * BUGFIX: the old code performed the clearing shift unconditionally, so
 * an empty mask evaluated `1ul << (0 - 1)` — a negative shift, which is
 * undefined behavior — before returning -1. Guard the empty case first.
 */
static inline int
cmask_get_next_cpu(uint64_t *mask)
{
	int ffs = ffsll(*mask);
	if (ffs == 0) {
		return NEXT_CPU_NULL;
	}
	// 1ul (not plain int 1) so bits 32..63 are cleared correctly
	*mask &= ~(1ul << (ffs - 1));
	return ffs - 1;
}

View File

@ -7,7 +7,6 @@
#include <rte_launch.h>
#include <rte_lcore.h>
#include <rte_mbuf.h>
#include <stdnoreturn.h>
#include <unistd.h>
#include "nm.h"
@ -23,7 +22,8 @@ constexpr static unsigned int MBUF_MAX_COUNT = 65536;
constexpr static unsigned int MBUF_CACHE_SIZE = 512;
constexpr static unsigned int RX_RING_SIZE = 4096;
constexpr static unsigned int TX_RING_SIZE = 4096;
constexpr static unsigned int BURST_SIZE = 8;
constexpr static unsigned int BURST_SIZE = 32;
constexpr static size_t MEMPOOL_NAME_BUF_LEN = 64;
static const struct rte_mbuf_dynfield rte_mbuf_dynfield_probe_flag = {
.name = "rte_mbuf_dynfield_probe_flag",
@ -51,6 +51,7 @@ struct thread_info {
int rxqid;
int txqid;
int lcore_id;
int node_id;
};
// state machine:
@ -73,15 +74,21 @@ struct options_t {
int num_threads { 1 };
uint64_t cpuset { 0x4 }; // 2nd core
uint64_t memmask { 0x0 }; // same socket as the NIC
char mempool_name_buf[MEMPOOL_NAME_BUF_LEN];
bool mlg_enabled { false };
uint64_t mlg_bps { 0 };
uint64_t mlg_cmask { 0 };
uint64_t mlg_dmask { 0 };
memload_generator *mlg { nullptr };
// states
uint16_t s_portid { 0 };
struct net_spec s_host_spec {
};
struct rte_mempool *s_pkt_mempool { nullptr };
std::atomic<int> s_state { SERVER_STATE_WAIT };
struct probe_state_t s_probe_info;
std::vector<struct thread_info *> s_thr_info;
struct rte_mempool *s_mempools[MAX_NODES];
};
struct options_t options;
@ -194,7 +201,7 @@ tx_add_timestamp(uint16_t port __rte_unused, uint16_t qidx __rte_unused,
return nb_pkts;
}
noreturn static int
static int
locore_main(void *ti)
{
auto tinfo = (struct thread_info *)ti;
@ -270,7 +277,9 @@ locore_main(void *ti)
options.s_probe_info.cspec.src =
&options.s_host_spec;
if (alloc_pkt_hdr(options.s_pkt_mempool,
if (alloc_pkt_hdr(
options
.s_mempools[tinfo->node_id],
PKT_TYPE_PROBE_RESP,
&options.s_probe_info.cspec,
&pkt_buf, &tx_data) != 0) {
@ -302,7 +311,8 @@ locore_main(void *ti)
// we reply to load packet regardless of the
// server state
if (alloc_pkt_hdr(options.s_pkt_mempool,
if (alloc_pkt_hdr(
options.s_mempools[tinfo->node_id],
PKT_TYPE_LOAD_RESP, &cspec, &pkt_buf,
&tx_data) != 0) {
rte_exit(EXIT_FAILURE,
@ -349,7 +359,8 @@ locore_main(void *ti)
(ts.tv_sec * S2NS + ts.tv_nsec));
// now we have everything we need
if (alloc_pkt_hdr(options.s_pkt_mempool,
if (alloc_pkt_hdr(
options.s_mempools[tinfo->node_id],
PKT_TYPE_STAT,
&options.s_probe_info.cspec, &pkt_buf,
&tx_data) != 0) {
@ -503,6 +514,10 @@ usage()
" -h: seek help\n"
" -A: cpu mask for worker threads\n"
" -M: mempool socket affinity mask\n"
" -m: enable memory load generator(MLG)\n"
" -b: MLG bytes per second\n"
" -x: MLG thread affinity mask\n"
" -X: MLG target domain affinity mask\n"
" -H: host spec\n");
fflush(stdout);
}
@ -516,10 +531,12 @@ dump_options()
" thread count: %d\n"
" cpu mask: 0x%lx\n"
" mempool mask: 0x%lx\n"
" ip: 0x%x\n",
" ip: 0x%x\n"
" MLG: %s [bps: %ld, threads: 0x%lx, domain: 0x%lx]\n",
ntr_get_level(NTR_DEP_USER1) - NTR_LEVEL_WARNING,
options.num_threads, options.cpuset, options.memmask,
options.s_host_spec.ip);
options.s_host_spec.ip, options.mlg_enabled ? "on" : "off",
options.mlg_bps, options.mlg_cmask, options.mlg_dmask);
}
int
@ -545,7 +562,7 @@ main(int argc, char *argv[])
{
int c;
// parse arguments
while ((c = getopt(argc, argv, "hvA:M:H:")) != -1) {
while ((c = getopt(argc, argv, "hvA:M:H:mb:X:x:")) != -1) {
switch (c) {
case 'v':
ntr_set_level(NTR_DEP_USER1,
@ -574,6 +591,20 @@ main(int argc, char *argv[])
}
has_host_spec = true;
break;
case 'm':
options.mlg_enabled = true;
break;
case 'b':
options.mlg_bps = strtoull(optarg, nullptr, 10);
break;
case 'X':
options.mlg_dmask = strtoull(
optarg, nullptr, 16);
break;
case 'x':
options.mlg_cmask = strtoull(
optarg, nullptr, 16);
break;
default:
usage();
rte_exit(
@ -586,13 +617,23 @@ main(int argc, char *argv[])
rte_exit(EXIT_FAILURE, "Must specify host spec\n");
}
dump_options();
// init nm
if (nm_init(ntr_get_level(NTR_DEP_USER1) - NTR_LEVEL_WARNING) != 0) {
rte_exit(EXIT_FAILURE, "nm init failed!\n");
}
dump_options();
// init mlg
if (options.mlg_enabled) {
bool success = false;
options.mlg = new memload_generator(options.mlg_cmask,
options.mlg_dmask, options.mlg_bps, &success);
if (!success) {
rte_exit(EXIT_FAILURE, "failed to init mlg\n");
}
}
// register dynamic field
PROBE_FLAG_OFFSET = rte_mbuf_dynfield_register(
&rte_mbuf_dynfield_probe_flag);
@ -616,23 +657,48 @@ main(int argc, char *argv[])
portid);
}
// create a mbuf memory pool on the socket
mbuf_pool = rte_pktmbuf_pool_create("MBUF_POOL",
MBUF_MAX_COUNT * nb_ports, MBUF_CACHE_SIZE, 0,
RTE_MBUF_DEFAULT_BUF_SIZE, rte_eth_dev_socket_id(portid));
if (mbuf_pool == nullptr) {
rte_exit(EXIT_FAILURE, "cannot create mbuf pool\n");
auto nodes = nm_get_nodes();
if (nodes->size() > MAX_NODES) {
rte_exit(EXIT_FAILURE, "Too many numa nodes\n");
}
options.s_pkt_mempool = mbuf_pool;
for (auto each : *nodes) {
uint32_t nodeid = each->id;
ntr(NTR_DEP_USER1, NTR_LEVEL_INFO,
"main: creating mempool for node %d\n", nodeid);
// create one mbuf pool per socket
snprintf(options.mempool_name_buf, MEMPOOL_NAME_BUF_LEN,
"khat_mempool_%d", nodeid);
mbuf_pool = rte_pktmbuf_pool_create(options.mempool_name_buf,
MBUF_MAX_COUNT * nb_ports, MBUF_CACHE_SIZE, 0,
RTE_MBUF_DEFAULT_BUF_SIZE, nodeid);
if (mbuf_pool == nullptr) {
rte_exit(EXIT_FAILURE, "cannot create mbuf pool: %d\n",
rte_errno);
}
options.s_mempools[nodeid] = mbuf_pool;
ntr(NTR_DEP_USER1, NTR_LEVEL_INFO,
"main: created mempool for node %d\n", nodeid);
break; // XXX: hack
}
// init threads
// auto cores = nm_get_cores();
uint64_t cpuset = options.cpuset;
for (int i = 0; i < options.num_threads; i++) {
auto *tinfo = new struct thread_info;
tinfo->tid = i;
tinfo->lcore_id = cmask_get_next_cpu(&cpuset);
// tinfo->node_id =
// cores->at(tinfo->lcore_id)->parent->parent->id;
tinfo->node_id = 0; // XXX: hack
options.s_thr_info.push_back(tinfo);
ntr(NTR_DEP_USER1, NTR_LEVEL_INFO,
"main: thread %d assigned to cpu %d, node %d", tinfo->tid,
tinfo->lcore_id, tinfo->node_id);
}
if (port_init(portid, mbuf_pool) != 0) {
@ -665,6 +731,15 @@ main(int argc, char *argv[])
}
}
options.mlg->start();
while (true) {
usleep(S2US);
uint64_t bps = options.mlg->get_bps();
ntr(NTR_DEP_USER1, NTR_LEVEL_DEBUG,
"main: MLG bps = %ld ~= %ldM\n", bps, bps / 1024 / 1024);
}
options.mlg->stop();
for (int i = 0; i < options.num_threads; i++) {
struct thread_info *tinfo = options.s_thr_info.at(i);
ntr(NTR_DEP_USER1, NTR_LEVEL_INFO,
@ -678,6 +753,7 @@ main(int argc, char *argv[])
// shouldn't get here
// clean up
rte_eth_dev_stop(portid);
delete options.mlg;
return 0;
}

View File

@ -1,12 +1,18 @@
#include <sys/types.h>
#include <sys/cpuset.h>
#include <sys/domainset.h>
#include <sys/mman.h>
#include <sys/sysctl.h>
#include <sys/thr.h>
#include <hwloc.h>
#include <util.h>
#include <x86intrin.h>
#include "nm.h"
#include <algorithm>
#include <atomic>
#include <vector>
static const char *SYSCTL_TSC = "machdep.tsc_freq";
@ -14,13 +20,6 @@ static const char *SYSCTL_TSC = "machdep.tsc_freq";
static int verbose = 0;
static uint64_t sysctl_tsc_freq = 0;
struct nm_obj {
int level;
int id;
struct nm_obj *parent;
std::vector<struct nm_obj *> children;
};
static bool
nm_obj_comparator(struct nm_obj *a, struct nm_obj *b)
{
@ -184,4 +183,209 @@ nm_init(int verbosity)
std::sort(cores.begin(), cores.end(), nm_obj_comparator);
return ret;
}
}
/*
 * Worker thread body (_tinfo is a struct thread_info*).
 * Sequence: bind the thread's memory allocation policy to the target
 * NUMA domain, mmap a scratch region there, signal init success, spin
 * until start() flips the shared state to STATE_START, then issue timed
 * bursts of non-temporal stores until stop() flips it to STATE_STOP.
 * Uses FreeBSD-specific APIs (thr_self, cpuset_setdomain, domainset).
 */
void *
memload_generator::worker_thrd(void *_tinfo)
{
auto *tinfo = (struct thread_info *)_tinfo;
long id;
domainset_t set;
DOMAINSET_ZERO(&set);
DOMAINSET_SET(tinfo->to_domainid, &set);
// get the id
thr_self(&id);
if (verbose) {
fprintf(stdout,
"memload_generator <thread %ld>: setting domain to %d\n",
id, tinfo->to_domainid);
}
// set allocation strategy
// PREFER (not RR/FIXED) — the kernel may still fall back to another
// domain under memory pressure, so placement is best-effort
if (cpuset_setdomain(CPU_LEVEL_WHICH, CPU_WHICH_TID, id,
sizeof(domainset_t), &set, DOMAINSET_POLICY_PREFER) != 0) {
tinfo->init_status.store(thread_info::INIT_FAILED);
if (verbose) {
printf(
"memload_generator <thread %ld>: cpuset_setdomain failed with %d\n",
id, errno);
}
return nullptr;
}
if (verbose) {
fprintf(stdout,
"memload_generator <thread %ld>: mmap() memory...\n", id);
}
// mmap anonymous page
// MAP_ALIGNED_SUPER requests superpage alignment and MAP_PREFAULT_READ
// pre-wires the pages — presumably to keep faults/TLB misses out of the
// measurement loop; TODO confirm intent
if ((tinfo->region = mmap(nullptr, REGION_ZS, PROT_READ | PROT_WRITE,
MAP_ANON | MAP_ALIGNED_SUPER | MAP_NOCORE | MAP_PRIVATE |
MAP_PREFAULT_READ,
-1, 0)) == MAP_FAILED) {
tinfo->init_status.store(thread_info::INIT_FAILED);
if (verbose) {
printf(
"memload_generator <thread %ld>: mmap failed with %d\n",
id, errno);
}
return nullptr;
}
// handshake with the constructor, which spins on init_status
tinfo->init_status.store(thread_info::INIT_SUCCESS);
if (verbose) {
fprintf(stdout,
"memload_generator <thread %ld>: init finished, waiting for start...\n",
id);
}
// busy-wait (no sleep) until start() releases us
while (tinfo->state->load() == STATE_READY) {
};
if (verbose) {
fprintf(
stdout, "memload_generator <thread %ld>: running...\n", id);
}
uint64_t next_ts = nm_get_uptime_ns();
while (tinfo->state->load() == STATE_START) {
// generate traffic
uint64_t now = nm_get_uptime_ns();
if (now >= next_ts) {
// ia_gen yields seconds; schedule the next burst accordingly
next_ts = next_ts + tinfo->ia_gen->generate() * S2NS;
// one burst = TRANSACTION_CNT non-temporal 8-byte stores;
// _mm_stream_si64 bypasses the cache so the traffic hits memory
for (uint i = 0; i < TRANSACTION_CNT; i++) {
_mm_stream_si64(
(long long *)((char *)tinfo->region +
i * TRANSACTION_SZ),
(long long)now);
}
// one increment per burst; get_bps() multiplies by CNT * SZ
tinfo->num_trans.fetch_add(1);
}
}
munmap(tinfo->region, REGION_ZS);
if (verbose) {
fprintf(
stdout, "memload_generator <thread %ld>: exiting...\n", id);
}
return nullptr;
}
/*
 * Spawns one worker thread per bit set in from_cmask, each pinned to
 * that core. All workers target the NUMA domain containing the first
 * core of to_cmask. The requested bandwidth `bps` is split evenly
 * across workers. Blocks (busy-waits) until every worker finishes its
 * init handshake; *success is set true iff all of them succeeded.
 * Workers are left spinning in STATE_READY until start() is called.
 */
memload_generator::memload_generator(
    uint64_t from_cmask, uint64_t to_cmask, uint64_t bps, bool *success)
{
	*success = false;
	state.store(STATE_READY);
	int nextcore;
	int to_coreid = cmask_get_next_cpu(&to_cmask);
	int num_cores = cmask_get_num_cpus(from_cmask);
	cpuset_t cpuset;
	if (to_coreid == NEXT_CPU_NULL || num_cores == 0) {
		// nothing to do — leave *success false
		return;
	}
	while ((nextcore = cmask_get_next_cpu(&from_cmask)) != NEXT_CPU_NULL) {
		auto *info = new struct thread_info;
		pthread_attr_t attr;
		info->ia_gen = createGenerator("exponential");
		// lambda is bursts/sec: per-thread share of bps divided by
		// the bytes written per burst
		info->ia_gen->set_lambda(((double)(bps) / (double)num_cores) /
		    (double)(TRANSACTION_CNT * TRANSACTION_SZ));
		info->num_trans.store(0);
		// core -> cpu -> NUMA node gives the target domain id
		info->to_domainid = cores.at(to_coreid)->parent->parent->id;
		info->init_status.store(thread_info::INIT_START);
		info->state = &state;
		CPU_ZERO(&cpuset);
		CPU_SET(nextcore, &cpuset);
		pthread_attr_init(&attr);
		pthread_attr_setaffinity_np(&attr, sizeof(cpuset_t), &cpuset);
		pthread_create(&info->pthr, &attr, worker_thrd, info);
		// BUGFIX: release the attr object; the old code leaked it
		pthread_attr_destroy(&attr);
		if (verbose) {
			fprintf(stdout,
			    "memload_generator: created thread on core %d\n",
			    nextcore);
		}
		thr_infos.push_back(info);
	}
	if (verbose) {
		fprintf(
		    stdout, "memload_generator: waiting for thread init...\n");
	}
	// busy-wait until every worker reports success, or any reports failure
	bool failed = false;
	uint num_success = 0;
	while (num_success < thr_infos.size() && !failed) {
		num_success = 0;
		for (auto i : thr_infos) {
			if (i->init_status.load() ==
			    thread_info::INIT_SUCCESS) {
				num_success++;
			}
			if (i->init_status.load() == thread_info::INIT_FAILED) {
				failed = true;
			}
		}
	}
	*success = num_success == thr_infos.size();
	if (verbose) {
		fprintf(stdout,
		    "memload_generator: exiting constructor. Success: %d...\n",
		    // BUGFIX: was `success ? 1 : 0` — tested the pointer
		    // itself (always non-null), so it always printed 1
		    *success ? 1 : 0);
	}
}
/*
 * Releases the spin-waiting workers and records the start of the
 * measurement window. Only effective from the READY state; calling it
 * again (or after stop()) is a no-op.
 */
void
memload_generator::start()
{
	if (state.load() != STATE_READY) {
		return;
	}
	state.store(STATE_START);
	begin_ts = nm_get_uptime_ns();
}
/*
 * Freezes the measurement window (stop_ts) and tells every worker to
 * exit its generation loop. Idempotent: a second call is a no-op.
 */
void
memload_generator::stop()
{
	if (state.load() == STATE_STOP) {
		return;
	}
	stop_ts = nm_get_uptime_ns();
	state.store(STATE_STOP);
}
uint64_t
memload_generator::get_bps()
{
uint64_t now = state.load() == STATE_STOP ? stop_ts :
nm_get_uptime_ns();
uint64_t total_transactions = 0;
for (auto i : thr_infos) {
total_transactions += i->num_trans.load();
}
return (double)(TRANSACTION_CNT * TRANSACTION_SZ * total_transactions) /
(double)((now - begin_ts) / (S2NS));
}
/*
 * Ensures the workers have been told to exit, then joins each one and
 * frees its per-thread resources.
 */
memload_generator::~memload_generator()
{
	// make sure STATE_STOP is published before we block in join
	if (state.load() != STATE_STOP) {
		stop();
	}
	for (auto *info : thr_infos) {
		pthread_join(info->pthr, nullptr);
		delete info->ia_gen;
		delete info;
	}
}

View File

@ -688,13 +688,13 @@ main(int argc, char *argv[])
rte_exit(EXIT_FAILURE, "Must specify host IP.\n");
}
dump_options();
// init nm
if (nm_init(ntr_get_level(NTR_DEP_USER1) - NTR_LEVEL_WARNING) != 0) {
rte_exit(EXIT_FAILURE, "nm init failed!\n");
}
dump_options();
nb_ports = rte_eth_dev_count_avail();
if (nb_ports == 0) {
rte_exit(EXIT_FAILURE, "number of ports must be > 0\n");