add signal control

This commit is contained in:
quackerd 2023-03-05 15:58:06 +01:00
parent 521a49d945
commit 6cd0e7d12f

View File

@ -1,5 +1,8 @@
#include <sys/endian.h> #include <sys/endian.h>
#include <sys/signal.h>
#include "gen.hh" #include "gen.hh"
#include <array>
#include <atomic>
#include <cstdlib> #include <cstdlib>
#include <cstring> #include <cstring>
#include <list> #include <list>
@ -8,6 +11,7 @@
#include "ntr.h" #include "ntr.h"
#include "nms.h" #include "nms.h"
#include <getopt.h> #include <getopt.h>
#include <pthread.h>
#include <unistd.h> #include <unistd.h>
#include <topo.h> #include <topo.h>
@ -19,7 +23,7 @@ usage()
" -v: verbose mode\n" " -v: verbose mode\n"
" -b: buffer size\n" " -b: buffer size\n"
" -q: bytes per second\n" " -q: bytes per second\n"
" -p: target throughput percentage\n" " -p: target throughput percentage (triggered by SIGUSR1)\n"
" -d: destination domain index\n" " -d: destination domain index\n"
" -s: worker threads cpu list\n" " -s: worker threads cpu list\n"
" -S: enable shared buffer\n" " -S: enable shared buffer\n"
@ -28,13 +32,20 @@ usage()
" -i: inter arrival time distribution\n" " -i: inter arrival time distribution\n"
" -o: output file path\n" " -o: output file path\n"
" -H: history size for pct adjustment\n" " -H: history size for pct adjustment\n"
" -w: warmup time before pct adjustment\n"
" -M: print this string when threads are ready to run\n"); " -M: print this string when threads are ready to run\n");
fflush(stdout); fflush(stdout);
} }
static std::atomic<int> rate_control {0};
static char output_file[256] = "memloadgen_samples.txt"; static char output_file[256] = "memloadgen_samples.txt";
void sig_handler(int sig)
{
if (sig == SIGUSR1 && rate_control.load(std::memory_order_relaxed) == 0) {
rate_control.store(1, std::memory_order_relaxed);
ntr(NTR_DEP_USER1, NTR_LEVEL_INFO, "enabling rate control....\n");
}
}
int main(int argc, char * argv[]) int main(int argc, char * argv[])
{ {
ntr_init(); ntr_init();
@ -50,8 +61,7 @@ int main(int argc, char * argv[])
CPU_ZERO(&threads); CPU_ZERO(&threads);
CPU_SET(0, &threads); CPU_SET(0, &threads);
char ia_dist[32] = "fixed"; char ia_dist[32] = "fixed";
int history_sz = 0; int history_sz = 5;
int warmup = 0;
std::list<uint64_t> history; std::list<uint64_t> history;
int shared_buffer = 0; int shared_buffer = 0;
@ -61,7 +71,7 @@ int main(int argc, char * argv[])
{ {
int c; int c;
// parse arguments // parse arguments
while ((c = getopt(argc, argv, "vhb:d:s:So:T:t:q:i:p:H:w:M:")) != -1) { while ((c = getopt(argc, argv, "vhb:d:s:So:T:t:q:i:p:H:M:")) != -1) {
switch (c) { switch (c) {
case 'v': case 'v':
ntr_set_level(NTR_DEP_USER1, ntr_get_level(NTR_DEP_USER1) + 1); ntr_set_level(NTR_DEP_USER1, ntr_get_level(NTR_DEP_USER1) + 1);
@ -102,9 +112,6 @@ int main(int argc, char * argv[])
case 'H': case 'H':
history_sz = strtol(optarg, nullptr, 10); history_sz = strtol(optarg, nullptr, 10);
break; break;
case 'w':
warmup = strtol(optarg, nullptr, 10);
break;
case 'M': case 'M':
strncpy(magic, optarg, sizeof(magic)); strncpy(magic, optarg, sizeof(magic));
break; break;
@ -126,11 +133,11 @@ int main(int argc, char * argv[])
" transaction time: %lu\n" " transaction time: %lu\n"
" runtime: %d\n" " runtime: %d\n"
" history: %d\n" " history: %d\n"
" warmup: %d\n", " magic: %s\n",
arr_sz, CPU_COUNT(&threads), arr_sz, CPU_COUNT(&threads),
CPU_FFS(&domain_mask) - 1, bps, CPU_FFS(&domain_mask) - 1, bps,
pct, ia_dist, shared_buffer, pct, ia_dist, shared_buffer,
transaction_size,time, history_sz, warmup); transaction_size,time, history_sz, magic);
// init topo // init topo
if (topo_init(ntr_get_level(NTR_DEP_USER1) != NTR_LEVEL_DEFAULT)) { if (topo_init(ntr_get_level(NTR_DEP_USER1) != NTR_LEVEL_DEFAULT)) {
@ -143,7 +150,10 @@ int main(int argc, char * argv[])
fprintf(stderr, "libnms init failed!\n"); fprintf(stderr, "libnms init failed!\n");
exit(1); exit(1);
} }
if (signal(SIGUSR1, sig_handler) == SIG_ERR) {
fprintf(stderr, "signal() failed %d!\n", errno);
exit(1);
}
bool success = false; bool success = false;
memload_generator::memload_generator_options opts; memload_generator::memload_generator_options opts;
opts.buffer_size = arr_sz; opts.buffer_size = arr_sz;
@ -181,15 +191,18 @@ int main(int argc, char * argv[])
prev_trans = trans; prev_trans = trans;
cur_time++; cur_time++;
if((int)cur_time <= warmup) { int rc = rate_control.load(std::memory_order_relaxed);
switch(rc) {
case 0:
case 1:
// keep history // keep history
history.emplace_back(bps); history.emplace_back(bps);
if ((int)history.size() > history_sz) { if ((int)history.size() > history_sz) {
history.pop_front(); history.pop_front();
} }
// adjust time if (rc == 1) {
if ((int)cur_time == warmup) { rate_control.store(2, std::memory_order_relaxed);
uint64_t sum = 0; uint64_t sum = 0;
size_t sz = history.size(); size_t sz = history.size();
while (history.size() > 0) { while (history.size() > 0) {
@ -197,10 +210,13 @@ int main(int argc, char * argv[])
history.pop_front(); history.pop_front();
} }
uint64_t newbps = (uint64_t)((sum / sz) * (double)pct / 100.0); uint64_t newbps = ((sum / sz) * (double)pct / 100.0);
mgen->set_transactions(newbps / transaction_size); mgen->set_transactions(newbps / transaction_size);
ntr(NTR_DEP_USER1, NTR_LEVEL_INFO, "adjusted target bps = %ld ~= %ldM\n", newbps, newbps / 1024 / 1024); ntr(NTR_DEP_USER1, NTR_LEVEL_INFO, "adjusted target bps = %ld ~= %ldM\n", newbps, newbps / 1024 / 1024);
} }
break;
default:
break;
} }
} }
mgen->stop(); mgen->stop();