memloadgen: allocate per-thread buffers inside the worker thread

This commit is contained in:
oscar 2023-03-15 18:43:37 -04:00
parent 25c18b4fc5
commit 05965dbb94
6 changed files with 152 additions and 77 deletions

View File

@ -32,8 +32,8 @@ set(C_FLAGS -O2 -g -Wall -Wextra -Werror -std=c17
include_directories(${CMAKE_SOURCE_DIR}/inc) include_directories(${CMAKE_SOURCE_DIR}/inc)
include_directories() include_directories()
set(LIBNTR_C_FLAGS -O2 -g -Wall -Wextra -Werror -std=c11) set(LIBNTR_C_FLAGS -O3 -g -Wall -Wextra -Werror -std=c2x)
set(LIBGEN_CC_FLAGS -O2 -g -Wall -Wextra -Werror -std=c++11) set(LIBGEN_CC_FLAGS -O3 -g -Wall -Wextra -Werror -std=c++17)
add_library(ntr SHARED libntr/ntr.c) add_library(ntr SHARED libntr/ntr.c)
target_compile_options(ntr PRIVATE ${LIBNTR_C_FLAGS}) target_compile_options(ntr PRIVATE ${LIBNTR_C_FLAGS})

View File

@ -313,21 +313,20 @@ class memload_generator {
void *from_buffer; void *from_buffer;
void *to_buffer; void *to_buffer;
std::atomic<bool> reset_ts; std::atomic<bool> reset_ts;
Generator * ia_gen;
int tid; int tid;
int coreid;
int target_dom;
struct memload_generator_options * opts; struct memload_generator_options * opts;
Generator * ia_gen;
// stat keeping // stat keeping
std::atomic<uint32_t> num_trans; std::atomic<uint32_t> num_trans;
std::atomic<int> * state; std::atomic<int> * state;
std::atomic<int> * init_num; std::atomic<int> init_status;
}; };
std::vector<struct thread_info *> thr_infos; std::vector<struct thread_info *> thr_infos;
std::atomic<int> state; std::atomic<int> state;
std::atomic<int> init_num;
static constexpr int STATE_RUN = 0; static constexpr int STATE_RUN = 0;
static constexpr int STATE_RDY = 1; static constexpr int STATE_RDY = 1;
static constexpr int STATE_END = 2; static constexpr int STATE_END = 2;

View File

@ -15,6 +15,9 @@ nms_malloc(int nodeid, size_t sz);
void * void *
nms_alloc_static(int nodeid, size_t sz); nms_alloc_static(int nodeid, size_t sz);
/*
 * Release the sz-byte region that starts at buf.
 * NOTE(review): presumably the counterpart of nms_alloc_static(), taking
 * the same size that was passed to it -- confirm against the implementation.
 */
void
nms_free_static(void * buf, size_t sz);
void void
nms_free(int nodeid, void * addr); nms_free(int nodeid, void * addr);

View File

@ -1,80 +1,122 @@
#include <atomic> #include <sys/types.h>
#include <pthread.h>
#include <pthread_np.h>
#include <unistd.h>
#include <sys/cpuset.h> #include <sys/cpuset.h>
#include <sys/domainset.h> #include <sys/domainset.h>
#include <sys/endian.h> #include <sys/endian.h>
#include <sys/thr.h> #include <sys/thr.h>
#include <pthread.h>
#include <pthread_np.h>
#include <topo.h> #include <topo.h>
#include <unistd.h>
#include "gen.hh"
#include "nms.h" #include "nms.h"
#include "gen.hh"
#include <atomic>
void * void *
memload_generator::worker_thrd(void *_tinfo) memload_generator::worker_thrd(void *_tinfo)
{ {
auto *tinfo = (struct thread_info *)_tinfo; auto *tinfo = (struct thread_info *)_tinfo;
void *from_buffer, *to_buffer;
long tid; long tid;
thr_self(&tid); thr_self(&tid);
// wait for other threads to init if (tinfo->opts->shared_buffer) {
tinfo->init_num->fetch_add(1); from_buffer = tinfo->from_buffer;
to_buffer = tinfo->to_buffer;
if (tinfo->opts->verbose) { } else {
fprintf( if (tinfo->opts->verbose) {
stdout, "memload_generator <thread %ld>: running...\n", tid); fprintf(stdout,
"memload_generator <thread %ld>: allocating %lu bytes on domain %d...\n",
tid, tinfo->opts->buffer_size,
topo_core_to_numa(tinfo->coreid));
}
from_buffer = nms_alloc_static(topo_core_to_numa(
tinfo->coreid),
tinfo->opts->buffer_size);
if (tinfo->opts->verbose) {
fprintf(stdout,
"memload_generator <thread %ld>: allocating %lu bytes on domain %d...\n",
tid, tinfo->opts->buffer_size, tinfo->target_dom);
}
to_buffer = nms_alloc_static(tinfo->target_dom,
tinfo->opts->buffer_size);
} }
if (from_buffer == nullptr || to_buffer == nullptr) {
if (tinfo->opts->verbose) {
fprintf(stderr,
"memload_generator <thread %ld>: failed to allocate memory\n",
tid);
}
tinfo->init_status.store(-1);
return nullptr;
}
// wait for other threads to init
tinfo->init_status.store(1);
if (tinfo->opts->verbose) {
fprintf(stdout, "memload_generator <thread %ld>: running...\n", tid);
}
uint64_t next_ts = topo_uptime_ns(); uint64_t next_ts = topo_uptime_ns();
size_t cur_offset = 0; size_t cur_offset = 0;
uint64_t cur_ts = 0; uint64_t cur_ts = 0;
while(true) { while (true) {
switch (tinfo->state->load()) { switch (tinfo->state->load()) {
case STATE_RUN: case STATE_RUN:
cur_ts = topo_uptime_ns(); cur_ts = topo_uptime_ns();
if (cur_ts >= next_ts) { if (cur_ts >= next_ts) {
if (cur_offset + tinfo->opts->transaction_size > tinfo->opts->buffer_size) { if (cur_offset + tinfo->opts->transaction_size >
cur_offset = 0; tinfo->opts->buffer_size) {
} cur_offset = 0;
memcpy((char *)tinfo->from_buffer + cur_offset, (char *)tinfo->to_buffer + cur_offset, tinfo->opts->transaction_size);
tinfo->num_trans.fetch_add(1);
if (tinfo->reset_ts.load(std::memory_order_relaxed)) {
tinfo->reset_ts.store(false, std::memory_order_relaxed);
next_ts = cur_ts;
}
next_ts += tinfo->ia_gen->generate() * (double)S2NS;
cur_offset += tinfo->opts->transaction_size;
} }
break;
case STATE_END: memcpy((char *)tinfo->from_buffer + cur_offset,
goto end; (char *)tinfo->to_buffer + cur_offset,
case STATE_RDY: tinfo->opts->transaction_size);
next_ts = topo_uptime_ns(); tinfo->num_trans.fetch_add(1);
break;
case STATE_INIT: if (tinfo->reset_ts.load(
default: std::memory_order_relaxed)) {
break; tinfo->reset_ts.store(false,
std::memory_order_relaxed);
next_ts = cur_ts;
}
next_ts += tinfo->ia_gen->generate() *
(double)S2NS;
cur_offset += tinfo->opts->transaction_size;
}
break;
case STATE_END:
goto end;
case STATE_RDY:
next_ts = topo_uptime_ns();
break;
case STATE_INIT:
default:
break;
} }
} }
end: end:
if (tinfo->opts->verbose) { if (tinfo->opts->verbose) {
fprintf( fprintf(stdout, "memload_generator <thread %ld>: exiting...\n",
stdout, "memload_generator <thread %ld>: exiting...\n", tid); tid);
}
if (!tinfo->opts->shared_buffer) {
nms_free_static(from_buffer, tinfo->opts->buffer_size);
nms_free_static(to_buffer, tinfo->opts->buffer_size);
} }
return nullptr; return nullptr;
} }
memload_generator::memload_generator(cpuset_t * threads, cpuset_t * target_domain, struct memload_generator_options * opt, bool *success) memload_generator::memload_generator(cpuset_t *threads, cpuset_t *target_domain,
struct memload_generator_options *opt, bool *success)
{ {
*success = false; *success = false;
state.store(STATE_INIT); state.store(STATE_INIT);
init_num.store(0);
std::memcpy(&this->opts, opt, sizeof(memload_generator_options)); std::memcpy(&this->opts, opt, sizeof(memload_generator_options));
int nextcore = CPU_FFS(threads) - 1; int nextcore = CPU_FFS(threads) - 1;
@ -84,38 +126,41 @@ memload_generator::memload_generator(cpuset_t * threads, cpuset_t * target_domai
return; return;
} }
void * local_buffer; double thread_tps = (double)opt->trans_per_second / (double)num_cores;
void * target_buffer; void *local_buffer = nullptr;
void *target_buffer = nullptr;
int tid = 0;
if (opts.shared_buffer) { if (opts.shared_buffer) {
local_buffer = nms_alloc_static(topo_core_to_numa(nextcore), opt->buffer_size); local_buffer = nms_alloc_static(topo_core_to_numa(nextcore),
target_buffer = nms_alloc_static(target_domain_id, opt->buffer_size); opt->buffer_size);
target_buffer = nms_alloc_static(target_domain_id,
opt->buffer_size);
}
if (local_buffer == nullptr || target_buffer == nullptr) {
goto end;
} }
double thread_tps = (double)opt->trans_per_second / (double)num_cores;
int tid = 0;
while (nextcore != -1) { while (nextcore != -1) {
auto *info = new struct thread_info; auto info = new struct thread_info;
cpuset_t cpuset; cpuset_t cpuset;
pthread_attr_t attr; pthread_attr_t attr;
info->ia_gen = createGenerator(opt->ia_dist); info->ia_gen = createGenerator(opts.ia_dist);
if (info->ia_gen == nullptr) { if (info->ia_gen == nullptr) {
return; goto end;
} }
info->ia_gen->set_lambda(thread_tps); info->ia_gen->set_lambda(thread_tps);
info->reset_ts.store(false, std::memory_order_relaxed); info->init_status.store(0);
info->state = &this->state; info->state = &this->state;
info->init_num = &this->init_num; info->reset_ts.store(false, std::memory_order_relaxed);
info->num_trans.store(0); info->num_trans.store(0);
info->opts = &this->opts; info->opts = &this->opts;
if (opt->shared_buffer) { info->coreid = nextcore;
info->from_buffer = local_buffer; info->target_dom = target_domain_id;
info->to_buffer = target_buffer; info->from_buffer = local_buffer;
} else { info->to_buffer = target_buffer;
info->from_buffer = nms_alloc_static(topo_core_to_numa(nextcore), opt->buffer_size);
info->to_buffer = nms_alloc_static(target_domain_id, opt->buffer_size);
}
CPU_ZERO(&cpuset); CPU_ZERO(&cpuset);
CPU_SET(nextcore, &cpuset); CPU_SET(nextcore, &cpuset);
@ -125,8 +170,8 @@ memload_generator::memload_generator(cpuset_t * threads, cpuset_t * target_domai
if (opts.verbose) { if (opts.verbose) {
fprintf(stdout, fprintf(stdout,
"memload_generator: created thread %d on core %d target domain %d\n", tid, "memload_generator: created thread %d on core %d target domain %d\n",
nextcore, target_domain_id); tid, nextcore, target_domain_id);
} }
thr_infos.push_back(info); thr_infos.push_back(info);
@ -136,11 +181,21 @@ memload_generator::memload_generator(cpuset_t * threads, cpuset_t * target_domai
tid++; tid++;
} }
while((uint)init_num.load() != thr_infos.size()); for (auto tinfo : thr_infos) {
int status;
while ((status = tinfo->init_status.load()) != 1) {
if (tinfo->init_status.load() == -1) {
state.store(STATE_END);
*success = false;
goto end;
}
}
}
state.store(STATE_RDY); state.store(STATE_RDY);
*success = true; *success = true;
end:
if (opts.verbose) { if (opts.verbose) {
fprintf(stdout, fprintf(stdout,
"memload_generator: exiting constructor. Success: %d...\n", "memload_generator: exiting constructor. Success: %d...\n",
@ -171,10 +226,13 @@ memload_generator::stop()
bool bool
memload_generator::set_transactions(uint64_t tps) memload_generator::set_transactions(uint64_t tps)
{ {
if (this->state.load() != STATE_END && this->state.load() != STATE_INIT) { if (this->state.load() != STATE_END &&
for(unsigned int i = 0; i < thr_infos.size(); i++) { this->state.load() != STATE_INIT) {
thr_infos.at(i)->ia_gen->set_lambda((double)tps/ (double)thr_infos.size()); for (unsigned int i = 0; i < thr_infos.size(); i++) {
thr_infos.at(i)->reset_ts.store(true, std::memory_order_relaxed); thr_infos.at(i)->ia_gen->set_lambda(
(double)tps / (double)thr_infos.size());
thr_infos.at(i)->reset_ts.store(true,
std::memory_order_relaxed);
} }
return true; return true;
} }
@ -193,10 +251,18 @@ memload_generator::get_transactions()
memload_generator::~memload_generator() memload_generator::~memload_generator()
{ {
void *buf1, *buf2;
this->state.store(STATE_END); this->state.store(STATE_END);
for (auto i : thr_infos) { for (auto i : thr_infos) {
// XXX: nms_free regions // XXX: nms_free regions
pthread_join(i->pthr, NULL); pthread_join(i->pthr, NULL);
buf1 = i->from_buffer;
buf2 = i->to_buffer;
delete i; delete i;
} }
if (opts.shared_buffer) {
nms_free_static(buf1, opts.buffer_size);
nms_free_static(buf2, opts.buffer_size);
}
} }

View File

@ -35,6 +35,13 @@ struct nms_desc {
static _Atomic(int) initialized = 0; static _Atomic(int) initialized = 0;
static struct nms_desc g_desc; static struct nms_desc g_desc;
void
nms_free_static(void * buf, size_t sz)
{
munmap(buf, sz);
return;
}
void * void *
nms_alloc_static(int node_id, size_t sz) nms_alloc_static(int node_id, size_t sz)
{ {

View File

@ -92,7 +92,7 @@ static struct options_t options;
static uint16_t static uint16_t
rx_add_timestamp(uint16_t port, uint16_t qidx __rte_unused, rx_add_timestamp(uint16_t port, uint16_t qidx __rte_unused,
struct rte_mbuf **pkts, uint16_t nb_pkts, uint16_t max_pkts __rte_unused, struct rte_mbuf **pkts, uint16_t nb_pkts, uint16_t max_pkts __rte_unused,
void *_ __rte_unused) void *_ __rte_unused)
{ {
uint64_t now = topo_uptime_ns(); uint64_t now = topo_uptime_ns();