numam/libnm/loadgen.cc

195 lines
4.8 KiB
C++

#include "nmp.hh"
#include <sys/cpuset.h>
#include <sys/domainset.h>
#include <sys/thr.h>
#include <pthread.h>
#include <pthread_np.h>
void *
memload_generator::worker_thrd(void *_tinfo)
{
auto *tinfo = (struct thread_info *)_tinfo;
long tid;
thr_self(&tid);
if (nm_get_verbose() > 0) {
fprintf(stdout,
"memload_generator <thread %ld>: from domain %d to %d\n",
tid, tinfo->from_domainid, tinfo->to_domainid);
}
tinfo->from_region = nm_malloc(tinfo->from_domainid, REGION_SZ * FROM_REGION_CNT);
tinfo->to_region = nm_malloc(tinfo->to_domainid, REGION_SZ * FROM_REGION_CNT);
if (tinfo->from_region == nullptr || tinfo->to_region == nullptr) {
tinfo->init_status.store(thread_info::INIT_FAILED);
if (nm_get_verbose() > 0) {
fprintf(stderr,
"memload_generator <thread %ld>: failed to allocate memory\n", tid);
}
return nullptr;
}
// populate the region with 1/2/3s
for(uint i = 0; i < FROM_REGION_CNT; i++) {
memset((char*)tinfo->from_region + i * REGION_SZ, i + 1, REGION_SZ);
}
tinfo->init_status.store(thread_info::INIT_SUCCESS);
if (nm_get_verbose() > 0) {
fprintf(stdout,
"memload_generator <thread %ld>: init finished, waiting for start...\n",
tid);
}
while (tinfo->state->load() == STATE_READY) {
};
if (nm_get_verbose() > 0) {
fprintf(
stdout, "memload_generator <thread %ld>: running...\n", tid);
}
uint64_t offset = 0;
uint64_t next_ts = nm_get_uptime_ns();
while (tinfo->state->load() == STATE_START) {
// generate traffic
uint64_t now = nm_get_uptime_ns();
if (now >= next_ts) {
next_ts = next_ts + tinfo->ia_gen->generate() * S2NS;
uint64_t to_offset = offset % REGION_SZ;
uint64_t from_offset = offset % (REGION_SZ * FROM_REGION_CNT);
memcpy((char *)tinfo->to_region + to_offset, (char *)tinfo->from_region + from_offset, TRANSACTION_SZ);
offset += TRANSACTION_SZ;
tinfo->num_trans.fetch_add(1);
}
}
nm_free(tinfo->from_domainid, tinfo->from_region);
nm_free(tinfo->to_domainid, tinfo->to_region);
if (nm_get_verbose() > 0) {
fprintf(
stdout, "memload_generator <thread %ld>: exiting...\n", tid);
}
return nullptr;
}
/*
 * Creates one worker thread per CPU set in from_cmask, each pinned to its
 * CPU, and waits until every worker has reported its init status.
 *
 * from_cmask: CPU mask; one worker is created for every CPU in it.
 * to_cmask:   CPU mask; only the first CPU returned by cmask_get_next_cpu is
 *             used, to look up the destination NUMA domain id.
 * bps:        target rate, split evenly across the workers when computing
 *             each worker's inter-arrival lambda.
 * success:    out-parameter; set to true only if every worker was created
 *             and reported INIT_SUCCESS, false otherwise.
 */
memload_generator::memload_generator(
    uint64_t from_cmask, uint64_t to_cmask, uint64_t bps, bool *success)
{
	*success = false;
	state.store(STATE_READY);

	const int to_coreid = cmask_get_next_cpu(&to_cmask);
	const int num_cores = cmask_get_num_cpus(from_cmask);
	if (to_coreid == NEXT_CPU_NULL || num_cores == 0) {
		return;
	}

	bool failed = false;
	int nextcore;
	while ((nextcore = cmask_get_next_cpu(&from_cmask)) != NEXT_CPU_NULL) {
		auto *info = new struct thread_info;

		info->ia_gen = createGenerator("exponential");
		// NOTE(review): lambda divides by REGION_SZ, but each worker
		// transaction copies TRANSACTION_SZ bytes and get_bps() counts
		// TRANSACTION_SZ bytes per transaction. Confirm which unit is
		// intended before changing it.
		info->ia_gen->set_lambda(
		    (static_cast<double>(bps) / static_cast<double>(num_cores)) /
		    static_cast<double>(REGION_SZ));
		info->num_trans.store(0);
		info->to_domainid = nm_obj_get_id(nm_obj_find_parent(
		    nm_obj_from_id(NM_LEVEL_CORE, to_coreid), NM_LEVEL_NUMA));
		info->from_domainid = nm_obj_get_id(nm_obj_find_parent(
		    nm_obj_from_id(NM_LEVEL_CORE, nextcore), NM_LEVEL_NUMA));
		info->init_status.store(thread_info::INIT_START);
		info->state = &state;

		// Pin the worker to `nextcore` through the thread attributes.
		cpuset_t cpuset;
		CPU_ZERO(&cpuset);
		CPU_SET(nextcore, &cpuset);

		pthread_attr_t attr;
		pthread_attr_init(&attr);
		pthread_attr_setaffinity_np(&attr, sizeof(cpuset_t), &cpuset);
		const int rc = pthread_create(&info->pthr, &attr, worker_thrd, info);
		// The attribute object is no longer needed once the create call
		// has returned.
		pthread_attr_destroy(&attr);

		if (rc != 0) {
			// No thread exists for this info, so it must not end up in
			// thr_infos: nothing would ever update its init_status and
			// the destructor would join an invalid handle. Workers that
			// were already created stay in thr_infos and are joined by
			// the destructor.
			if (nm_get_verbose() > 0) {
				fprintf(stderr,
				    "memload_generator: failed to create thread on core %d (error %d)\n",
				    nextcore, rc);
			}
			delete info->ia_gen;
			delete info;
			failed = true;
			break;
		}

		if (nm_get_verbose() > 0) {
			fprintf(stdout,
			    "memload_generator: created thread on core %d\n",
			    nextcore);
		}
		thr_infos.push_back(info);
	}

	if (nm_get_verbose() > 0) {
		fprintf(
		    stdout, "memload_generator: waiting for thread init...\n");
	}

	// Poll until every worker reports INIT_SUCCESS or any reports
	// INIT_FAILED. Skipped entirely if thread creation already failed.
	uint num_success = 0;
	while (!failed && num_success < thr_infos.size()) {
		num_success = 0;
		for (auto *info : thr_infos) {
			const auto status = info->init_status.load();
			if (status == thread_info::INIT_SUCCESS) {
				num_success++;
			} else if (status == thread_info::INIT_FAILED) {
				failed = true;
			}
		}
	}

	*success = !failed && num_success == thr_infos.size();

	if (nm_get_verbose() > 0) {
		// Dereference: `success` itself is a (non-null) pointer.
		fprintf(stdout,
		    "memload_generator: exiting constructor. Success: %d...\n",
		    *success ? 1 : 0);
	}
}
/*
 * Moves the generator from STATE_READY to STATE_START and records the
 * start timestamp. Does nothing in any other state.
 */
void
memload_generator::start()
{
	if (state.load() != STATE_READY) {
		return;
	}

	state.store(STATE_START);
	begin_ts = nm_get_uptime_ns();
}
/*
 * Records the stop timestamp and then switches the state to STATE_STOP.
 * Does nothing if the state is already STATE_STOP.
 */
void
memload_generator::stop()
{
	if (state.load() == STATE_STOP) {
		return;
	}

	// The timestamp is taken before the state change is published.
	stop_ts = nm_get_uptime_ns();
	state.store(STATE_STOP);
}
/*
 * Returns the achieved rate: TRANSACTION_SZ times the number of
 * transactions summed over all workers, divided by the elapsed time in
 * seconds since begin_ts. The end of the interval is stop_ts when the
 * state is STATE_STOP, otherwise the current uptime.
 *
 * Returns 0 when no time has elapsed since begin_ts.
 */
uint64_t
memload_generator::get_bps()
{
	const uint64_t now = state.load() == STATE_STOP ? stop_ts :
							  nm_get_uptime_ns();

	uint64_t total_transactions = 0;
	for (const auto *info : thr_infos) {
		total_transactions += info->num_trans.load();
	}

	// Avoid a division by zero (and an unsigned wrap-around) when the
	// interval is empty.
	if (now <= begin_ts) {
		return 0;
	}

	// Convert to seconds in floating point. Dividing before the cast would
	// truncate to whole seconds when S2NS is integral, giving a zero
	// divisor for intervals shorter than one second.
	const double elapsed_sec = static_cast<double>(now - begin_ts) /
	    static_cast<double>(S2NS);

	return static_cast<uint64_t>(
	    static_cast<double>(TRANSACTION_SZ * total_transactions) /
	    elapsed_sec);
}
/*
 * Stops the generator if it is not already stopped, then joins every
 * worker in thr_infos and deletes its generator and its thread_info.
 */
memload_generator::~memload_generator()
{
	const bool still_active = state.load() != STATE_STOP;
	if (still_active) {
		stop();
	}

	for (auto *worker : thr_infos) {
		// Join before deleting: the worker thread uses this thread_info.
		pthread_join(worker->pthr, nullptr);
		delete worker->ia_gen;
		delete worker;
	}
}