From 05965dbb94fe4af569402ae953bf491278ece26f Mon Sep 17 00:00:00 2001 From: oscar Date: Wed, 15 Mar 2023 18:43:37 -0400 Subject: [PATCH] memloadgen allocate memory in thread --- CMakeLists.txt | 4 +- inc/gen.hh | 9 +- inc/nms.h | 3 + libgen/loadgen.cc | 204 ++++++++++++++++++++++++++++++---------------- libnms/alloc.c | 7 ++ net/cat.cc | 2 +- 6 files changed, 152 insertions(+), 77 deletions(-) diff --git a/CMakeLists.txt b/CMakeLists.txt index 46cbf43..69a1889 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -32,8 +32,8 @@ set(C_FLAGS -O2 -g -Wall -Wextra -Werror -std=c17 include_directories(${CMAKE_SOURCE_DIR}/inc) include_directories() -set(LIBNTR_C_FLAGS -O2 -g -Wall -Wextra -Werror -std=c11) -set(LIBGEN_CC_FLAGS -O2 -g -Wall -Wextra -Werror -std=c++11) +set(LIBNTR_C_FLAGS -O3 -g -Wall -Wextra -Werror -std=c2x) +set(LIBGEN_CC_FLAGS -O3 -g -Wall -Wextra -Werror -std=c++17) add_library(ntr SHARED libntr/ntr.c) target_compile_options(ntr PRIVATE ${LIBNTR_C_FLAGS}) diff --git a/inc/gen.hh b/inc/gen.hh index e5f5b6a..0ea7c32 100644 --- a/inc/gen.hh +++ b/inc/gen.hh @@ -313,21 +313,20 @@ class memload_generator { void *from_buffer; void *to_buffer; std::atomic reset_ts; - Generator * ia_gen; - int tid; - + int coreid; + int target_dom; struct memload_generator_options * opts; + Generator * ia_gen; // stat keeping std::atomic num_trans; std::atomic * state; - std::atomic * init_num; + std::atomic init_status; }; std::vector thr_infos; std::atomic state; - std::atomic init_num; static constexpr int STATE_RUN = 0; static constexpr int STATE_RDY = 1; static constexpr int STATE_END = 2; diff --git a/inc/nms.h b/inc/nms.h index f49313b..306d6b4 100644 --- a/inc/nms.h +++ b/inc/nms.h @@ -15,6 +15,9 @@ nms_malloc(int nodeid, size_t sz); void * nms_alloc_static(int nodeid, size_t sz); +void +nms_free_static(void * buf, size_t sz); + void nms_free(int nodeid, void * addr); diff --git a/libgen/loadgen.cc b/libgen/loadgen.cc index 37c58d9..715f1b7 100644 --- a/libgen/loadgen.cc +++ b/libgen/loadgen.cc @@ -1,80 +1,122 @@ -#include -#include -#include -#include - +#include #include #include #include #include + +#include +#include #include +#include - -#include "gen.hh" #include "nms.h" +#include "gen.hh" + +#include void * memload_generator::worker_thrd(void *_tinfo) { auto *tinfo = (struct thread_info *)_tinfo; + void *from_buffer, *to_buffer; long tid; thr_self(&tid); - // wait for other threads to init - tinfo->init_num->fetch_add(1); - - if (tinfo->opts->verbose) { - fprintf( - stdout, "memload_generator : running...\n", tid); + if (tinfo->opts->shared_buffer) { + from_buffer = tinfo->from_buffer; + to_buffer = tinfo->to_buffer; + } else { + if (tinfo->opts->verbose) { + fprintf(stdout, + "memload_generator : allocating %lu bytes on domain %d...\n", + tid, tinfo->opts->buffer_size, + topo_core_to_numa(tinfo->coreid)); + } + from_buffer = nms_alloc_static(topo_core_to_numa( + tinfo->coreid), + tinfo->opts->buffer_size); + if (tinfo->opts->verbose) { + fprintf(stdout, + "memload_generator : allocating %lu bytes on domain %d...\n", + tid, tinfo->opts->buffer_size, tinfo->target_dom); + } + to_buffer = nms_alloc_static(tinfo->target_dom, + tinfo->opts->buffer_size); } + if (from_buffer == nullptr || to_buffer == nullptr) { + if (tinfo->opts->verbose) { + fprintf(stderr, + "memload_generator : failed to allocate memory\n", + tid); + } + tinfo->init_status.store(-1); + return nullptr; + } + + // wait for other threads to init + tinfo->init_status.store(1); + if (tinfo->opts->verbose) { + fprintf(stdout, "memload_generator : running...\n", tid); + } uint64_t next_ts = topo_uptime_ns(); size_t cur_offset = 0; uint64_t cur_ts = 0; - while(true) { + while (true) { switch (tinfo->state->load()) { - case STATE_RUN: - cur_ts = topo_uptime_ns(); - if (cur_ts >= next_ts) { - if (cur_offset + tinfo->opts->transaction_size > tinfo->opts->buffer_size) { - cur_offset = 0; - } - - memcpy((char *)tinfo->from_buffer + cur_offset, (char *)tinfo->to_buffer + cur_offset, tinfo->opts->transaction_size); - tinfo->num_trans.fetch_add(1); - - if (tinfo->reset_ts.load(std::memory_order_relaxed)) { - tinfo->reset_ts.store(false, std::memory_order_relaxed); - next_ts = cur_ts; - } - next_ts += tinfo->ia_gen->generate() * (double)S2NS; - cur_offset += tinfo->opts->transaction_size; + case STATE_RUN: + cur_ts = topo_uptime_ns(); + if (cur_ts >= next_ts) { + if (cur_offset + tinfo->opts->transaction_size > + tinfo->opts->buffer_size) { + cur_offset = 0; } - break; - case STATE_END: - goto end; - case STATE_RDY: - next_ts = topo_uptime_ns(); - break; - case STATE_INIT: - default: - break; + + memcpy((char *)tinfo->from_buffer + cur_offset, + (char *)tinfo->to_buffer + cur_offset, + tinfo->opts->transaction_size); + tinfo->num_trans.fetch_add(1); + + if (tinfo->reset_ts.load( + std::memory_order_relaxed)) { + tinfo->reset_ts.store(false, + std::memory_order_relaxed); + next_ts = cur_ts; + } + next_ts += tinfo->ia_gen->generate() * + (double)S2NS; + cur_offset += tinfo->opts->transaction_size; + } + break; + case STATE_END: + goto end; + case STATE_RDY: + next_ts = topo_uptime_ns(); + break; + case STATE_INIT: + default: + break; } } end: if (tinfo->opts->verbose) { - fprintf( - stdout, "memload_generator : exiting...\n", tid); + fprintf(stdout, "memload_generator : exiting...\n", + tid); + } + + if (!tinfo->opts->shared_buffer) { + nms_free_static(from_buffer, tinfo->opts->buffer_size); + nms_free_static(to_buffer, tinfo->opts->buffer_size); } return nullptr; } -memload_generator::memload_generator(cpuset_t * threads, cpuset_t * target_domain, struct memload_generator_options * opt, bool *success) +memload_generator::memload_generator(cpuset_t *threads, cpuset_t *target_domain, + struct memload_generator_options *opt, bool *success) { *success = false; state.store(STATE_INIT); - init_num.store(0); std::memcpy(&this->opts, opt, sizeof(memload_generator_options)); int nextcore = CPU_FFS(threads) - 1; @@ -84,38 +126,41 @@ memload_generator::memload_generator(cpuset_t * threads, cpuset_t * target_domai return; } - void * local_buffer; - void * target_buffer; + double thread_tps = (double)opt->trans_per_second / (double)num_cores; + void *local_buffer = nullptr; + void *target_buffer = nullptr; + int tid = 0; + if (opts.shared_buffer) { - local_buffer = nms_alloc_static(topo_core_to_numa(nextcore), opt->buffer_size); - target_buffer = nms_alloc_static(target_domain_id, opt->buffer_size); + local_buffer = nms_alloc_static(topo_core_to_numa(nextcore), + opt->buffer_size); + target_buffer = nms_alloc_static(target_domain_id, + opt->buffer_size); + } + + if (local_buffer == nullptr || target_buffer == nullptr) { + goto end; } - double thread_tps = (double)opt->trans_per_second / (double)num_cores; - int tid = 0; while (nextcore != -1) { - auto *info = new struct thread_info; + auto info = new struct thread_info; cpuset_t cpuset; pthread_attr_t attr; - info->ia_gen = createGenerator(opt->ia_dist); + info->ia_gen = createGenerator(opts.ia_dist); if (info->ia_gen == nullptr) { - return; + goto end; } info->ia_gen->set_lambda(thread_tps); - info->reset_ts.store(false, std::memory_order_relaxed); - + info->init_status.store(0); info->state = &this->state; - info->init_num = &this->init_num; + info->reset_ts.store(false, std::memory_order_relaxed); info->num_trans.store(0); info->opts = &this->opts; - if (opt->shared_buffer) { - info->from_buffer = local_buffer; - info->to_buffer = target_buffer; - } else { - info->from_buffer = nms_alloc_static(topo_core_to_numa(nextcore), opt->buffer_size); - info->to_buffer = nms_alloc_static(target_domain_id, opt->buffer_size); - } + info->coreid = nextcore; + info->target_dom = target_domain_id; + info->from_buffer = local_buffer; + info->to_buffer = target_buffer; CPU_ZERO(&cpuset); CPU_SET(nextcore, &cpuset); @@ -125,8 +170,8 @@ memload_generator::memload_generator(cpuset_t * threads, cpuset_t * target_domai if (opts.verbose) { fprintf(stdout, - "memload_generator: created thread %d on core %d target domain %d\n", tid, - nextcore, target_domain_id); + "memload_generator: created thread %d on core %d target domain %d\n", + tid, nextcore, target_domain_id); } thr_infos.push_back(info); @@ -136,11 +181,21 @@ memload_generator::memload_generator(cpuset_t * threads, cpuset_t * target_domai tid++; } - while((uint)init_num.load() != thr_infos.size()); + for (auto tinfo : thr_infos) { + int status; + while ((status = tinfo->init_status.load()) != 1) { + if (tinfo->init_status.load() == -1) { + state.store(STATE_END); + *success = false; + goto end; + } + } + } state.store(STATE_RDY); *success = true; +end: if (opts.verbose) { fprintf(stdout, "memload_generator: exiting constructor. Success: %d...\n", @@ -171,10 +226,13 @@ memload_generator::stop() bool memload_generator::set_transactions(uint64_t tps) { - if (this->state.load() != STATE_END && this->state.load() != STATE_INIT) { - for(unsigned int i = 0; i < thr_infos.size(); i++) { - thr_infos.at(i)->ia_gen->set_lambda((double)tps/ (double)thr_infos.size()); - thr_infos.at(i)->reset_ts.store(true, std::memory_order_relaxed); + if (this->state.load() != STATE_END && + this->state.load() != STATE_INIT) { + for (unsigned int i = 0; i < thr_infos.size(); i++) { + thr_infos.at(i)->ia_gen->set_lambda( + (double)tps / (double)thr_infos.size()); + thr_infos.at(i)->reset_ts.store(true, + std::memory_order_relaxed); } return true; } @@ -193,10 +251,18 @@ memload_generator::get_transactions() memload_generator::~memload_generator() { + void *buf1, *buf2; this->state.store(STATE_END); for (auto i : thr_infos) { // XXX: nms_free regions pthread_join(i->pthr, NULL); + buf1 = i->from_buffer; + buf2 = i->to_buffer; delete i; } + + if (opts.shared_buffer) { + nms_free_static(buf1, opts.buffer_size); + nms_free_static(buf2, opts.buffer_size); + } } diff --git a/libnms/alloc.c b/libnms/alloc.c index 942d3b3..e5db9a9 100644 --- a/libnms/alloc.c +++ b/libnms/alloc.c @@ -35,6 +35,13 @@ struct nms_desc { static _Atomic(int) initialized = 0; static struct nms_desc g_desc; +void +nms_free_static(void * buf, size_t sz) +{ + munmap(buf, sz); + return; +} + void * nms_alloc_static(int node_id, size_t sz) { diff --git a/net/cat.cc b/net/cat.cc index ac1b80a..677c844 100644 --- a/net/cat.cc +++ b/net/cat.cc @@ -92,7 +92,7 @@ static struct options_t options; static uint16_t rx_add_timestamp(uint16_t port, uint16_t qidx __rte_unused, - struct rte_mbuf **pkts, uint16_t nb_pkts, uint16_t max_pkts __rte_unused, +struct rte_mbuf **pkts, uint16_t nb_pkts, uint16_t max_pkts __rte_unused, void *_ __rte_unused) { uint64_t now = topo_uptime_ns();