memloadgen pct support

This commit is contained in:
quackerd 2022-11-16 08:44:43 +01:00
parent 1836bd89df
commit 18339fb109
8 changed files with 184 additions and 61 deletions

View File

@ -302,7 +302,7 @@ class memload_generator {
size_t buffer_size {64*1024*1024};
char ia_dist[64]{"fixed"};
int verbose {0};
uint64_t bytes_per_second;
uint64_t trans_per_second;
bool shared_buffer {true};
};
@ -312,7 +312,7 @@ class memload_generator {
pthread_t pthr;
void *from_buffer;
void *to_buffer;
std::atomic<int> *stop;
std::atomic<bool> reset_ts;
Generator * ia_gen;
int tid;
@ -341,5 +341,6 @@ class memload_generator {
uint64_t get_transactions();
bool start();
bool stop();
bool set_transactions(uint64_t tps);
~memload_generator();
};

View File

@ -12,6 +12,9 @@ nms_init(int verbose);
void *
nms_malloc(int nodeid, size_t sz);
void *
nms_alloc_static(int nodeid, size_t sz);
void
nms_free(int nodeid, void * addr);

View File

@ -43,7 +43,11 @@ memload_generator::worker_thrd(void *_tinfo)
memcpy((char *)tinfo->from_buffer + cur_offset, (char *)tinfo->to_buffer, tinfo->opts->transaction_size);
tinfo->num_trans.fetch_add(1);
next_ts += tinfo->ia_gen->generate() * (double)S2NS;
if (tinfo->reset_ts.load(std::memory_order_relaxed)) {
tinfo->reset_ts.store(false, std::memory_order_relaxed);
next_ts = cur_ts;
}
next_ts += tinfo->ia_gen->generate() * (double)S2NS;
cur_offset += tinfo->opts->transaction_size;
}
break;
@ -82,11 +86,11 @@ memload_generator::memload_generator(cpuset_t * threads, cpuset_t * target_domai
void * local_buffer;
void * target_buffer;
if (opts.shared_buffer) {
local_buffer = nms_malloc(topo_core_to_numa(nextcore), opt->buffer_size);
target_buffer = nms_malloc(target_domain_id, opt->buffer_size);
local_buffer = nms_alloc_static(topo_core_to_numa(nextcore), opt->buffer_size);
target_buffer = nms_alloc_static(target_domain_id, opt->buffer_size);
}
double thread_tps = (double)opt->bytes_per_second / (double)num_cores / (double)opt->transaction_size;
double thread_tps = (double)opt->trans_per_second / (double)num_cores;
int tid = 0;
while (nextcore != -1) {
auto *info = new struct thread_info;
@ -98,6 +102,7 @@ memload_generator::memload_generator(cpuset_t * threads, cpuset_t * target_domai
return;
}
info->ia_gen->set_lambda(thread_tps);
info->reset_ts.store(false, std::memory_order_relaxed);
info->state = &this->state;
info->init_num = &this->init_num;
@ -107,8 +112,8 @@ memload_generator::memload_generator(cpuset_t * threads, cpuset_t * target_domai
info->from_buffer = local_buffer;
info->to_buffer = target_buffer;
} else {
info->from_buffer = nms_malloc(topo_core_to_numa(nextcore), opt->buffer_size);
info->to_buffer = nms_malloc(target_domain_id, opt->buffer_size);
info->from_buffer = nms_alloc_static(topo_core_to_numa(nextcore), opt->buffer_size);
info->to_buffer = nms_alloc_static(target_domain_id, opt->buffer_size);
}
CPU_ZERO(&cpuset);
@ -162,6 +167,19 @@ memload_generator::stop()
return false;
}
bool
memload_generator::set_transactions(uint64_t tps)
{
	// Retarget the aggregate transaction rate across all worker threads.
	// Only meaningful while the generator is actively running; refuse the
	// request when we are still initializing or already shut down.
	const auto cur_state = this->state.load();
	if (cur_state == STATE_END || cur_state == STATE_INIT) {
		return false;
	}

	// Split the requested rate evenly among the workers.
	const double per_thread_tps = (double)tps / (double)thr_infos.size();
	for (auto *tinfo : thr_infos) {
		tinfo->ia_gen->set_lambda(per_thread_tps);
		// Flag the worker to resynchronize its next-transaction
		// timestamp so the new rate takes effect immediately.
		tinfo->reset_ts.store(true, std::memory_order_relaxed);
	}
	return true;
}
uint64_t
memload_generator::get_transactions()
{

View File

@ -16,7 +16,7 @@
#define MAX_NUMA_DOMAINS (64)
#define MAX_REGIONS (64)
#define REGION_SIZE (1024 * 1024 * 1024)
#define MALLOC_UNIT (4096)
#define PAGE_SIZE (4096)
struct nms_region {
uintptr_t start_addr;
@ -35,8 +35,8 @@ struct nms_desc {
static _Atomic(int) initialized = 0;
static struct nms_desc g_desc;
static void *
nms_alloc_region(int nodeid, size_t sz)
void *
nms_alloc_static(int node_id, size_t sz)
{
long tid;
domainset_t orig_dom;
@ -55,7 +55,7 @@ nms_alloc_region(int nodeid, size_t sz)
domainset_t tmp_domain;
DOMAINSET_ZERO(&tmp_domain);
DOMAINSET_SET(nodeid, &tmp_domain);
DOMAINSET_SET(node_id, &tmp_domain);
ret = cpuset_setdomain(CPU_LEVEL_WHICH, CPU_WHICH_TID, tid, sizeof(tmp_domain), &tmp_domain, DOMAINSET_POLICY_ROUNDROBIN);
if (ret != 0) {
@ -63,14 +63,16 @@ nms_alloc_region(int nodeid, size_t sz)
return NULL;
}
if ((region = mmap(NULL, REGION_SIZE, PROT_READ | PROT_WRITE, MAP_ANON | MAP_ALIGNED_SUPER | MAP_NOCORE | MAP_PRIVATE | MAP_NOSYNC, -1, 0)) == MAP_FAILED) {
if ((region = mmap(NULL, sz, PROT_READ | PROT_WRITE, MAP_ANON | MAP_ALIGNED_SUPER | MAP_NOCORE | MAP_PRIVATE | MAP_NOSYNC | MAP_PREFAULT_READ, -1, 0)) == MAP_FAILED) {
fprintf(stderr, "libnms: mmap failed with %d\n", errno);
return NULL;
}
// touch the pages to prefault the pages
for (size_t i = 0; i < REGION_SIZE; i += MALLOC_UNIT) {
*(uint8_t*)((uintptr_t)region + i) = 0;
int sum;
for (size_t i = 0; i < sz; i++) {
sum += *(uint8_t *)((char *)region + i);
*(uint8_t *)((char *)region + i) = 0;
}
// restore existing thread's allocation strategy
@ -99,7 +101,7 @@ nms_region_malloc(struct nms_region * region, size_t size)
if (region->size >= region->occupied + size) {
ret = (void *)(region->start_addr + region->occupied);
region->occupied += size;
region->occupied = (region->occupied + MALLOC_UNIT - 1) & ~(MALLOC_UNIT - 1);
region->occupied = (region->occupied + PAGE_SIZE - 1) & ~(PAGE_SIZE - 1);
}
return ret;
}
@ -110,7 +112,7 @@ nms_desc_add_region(struct nms_desc * desc, int nodeid, size_t size)
void * ret;
int idx;
ret = nms_alloc_region(nodeid, REGION_SIZE);
ret = nms_alloc_static(nodeid, REGION_SIZE);
if (ret == NULL) {
fprintf(stderr, "libnms: failed to allocate region on node %d\n", nodeid);
return ENOMEM;
@ -150,7 +152,7 @@ retry:
pthread_mutex_unlock(&desc->alloc_lock);
return NULL;
}
fprintf(stdout, "libnms: request of size %zu -> allocated new region on node %d\n", size, nodeid);
fprintf(stdout, "libnms: malloc request of size %zu -> allocated new region on node %d\n", size, nodeid);
goto retry;
}

View File

@ -1,15 +1,13 @@
from http import client, server
import subprocess as sp
import time
import os
import datetime
import sys
import json
import getopt
import numpy as np
import libpar as par
import libtc as tc
import libmechspec as mechspec
LOG_FILEPATH = "/iperflogs"
BIN_PATH = "/iperftls"
@ -19,18 +17,19 @@ SSL_CERT = "/certs/server.crt"
SSL_PKEY = "/certs/server.key"
SERVER_PORT_START = 8050
MEMLOAD_BLKSZ = 1024*1024*64
RUNS=8
RUNS=3
# paths
file_dir : str = os.path.dirname(os.path.realpath(__file__))
root_dir : str = os.path.join(file_dir,"..")
class Conf:
def __init__(self, server, clients, server_dat, clients_dat, clients_affinity, affinity, sendfile, tls, ktls, memloadgen, filepath):
def __init__(self, server, clients, server_dat, clients_dat, clients_affinity, affinity, sendfile, tls, ktls, memloadgen, filepath, odirect):
self.affinity = affinity
self.ktls = ktls
self.sendfile = sendfile
self.tls = tls
self.odirect = odirect
self.memloadgen = memloadgen
self.filepath = filepath
self.server = server
@ -40,7 +39,7 @@ class Conf:
self.clients_affinity = clients_affinity
def to_string(self):
return f"server.{self.server[0]}_affinity.{self.affinity}_sendfile.{self.sendfile}_tls.{self.tls}_ktls.{self.ktls}_memloadgen.{self.memloadgen != None}_filepath.{self.filepath.replace('/','-')}"
return f"server.{self.server[0]}_affinity.{self.affinity}_sendfile.{self.sendfile}_tls.{self.tls}_ktls.{self.ktls}_memloadgen.{False if self.memloadgen == None else self.memloadgen[0].split('.')[2]}_filepath.{self.filepath.replace('/','-')}_odirect.{self.odirect}"
class ArgTypes:
def __init__(self):
@ -83,11 +82,15 @@ arg_types.add_arg([
[True, True, True],
])
# memloadgen
arg_types.add_arg([[["73,75,77,79,81,83,85,87,89,91,93,95_0", "25,27,29,31,33,35,37,39,41,43,45,47_1"]], [None]] )
arg_types.add_arg([[["73,75,77,79,81,83,85,87,89,91,93,95.0.0", "25,27,29,31,33,35,37,39,41,43,45,47.1.0"]],
[["73,75,77,79,81,83,85,87,89,91,93,95.0.50", "25,27,29,31,33,35,37,39,41,43,45,47.1.50"]],
[None]] )
#arg_types.add_arg([[None]] )
# filepath
arg_types.add_arg([["/mnt/zroot/large_file_#p"]])
#arg_types.add_arg([["/mnt/zroot/large_file_#p"], ["/nvdimm/large_file_#p"]])
#arg_types.add_arg([["/mnt/zroot/large_file_#p"]])
arg_types.add_arg([["/mnt/zroot/large_file_#p"], ["/tmpfs/large_file_#p"]])
# ODIRECT
arg_types.add_arg([[False]])
#arg_types.add_arg([["/tmpfs/large_file_#p"]])
all_args.append(arg_types)
@ -113,9 +116,13 @@ arg_types.add_arg([
[True, True, True],
])
# memloadgen
arg_types.add_arg([[["73,75,77,79,81,83,85,87,89,91,93,95_0", "25,27,29,31,33,35,37,39,41,43,45,47_1"]], [None]])
arg_types.add_arg([[["73,75,77,79,81,83,85,87,89,91,93,95.0.0", "25,27,29,31,33,35,37,39,41,43,45,47.1.0"]],
[["73,75,77,79,81,83,85,87,89,91,93,95.0.50", "25,27,29,31,33,35,37,39,41,43,45,47.1.50"]],
[None]])
# filepath
arg_types.add_arg([["/mnt/zroot/large_file_#p"]])
arg_types.add_arg([["/mnt/zroot/large_file_#p"], ["/tmpfs/large_file_#p"]])
# odirect
arg_types.add_arg([[False]])
all_args.append(arg_types)
@ -141,9 +148,12 @@ arg_types.add_arg([
[True, True, True],
])
# memloadgen
arg_types.add_arg([[["97,99,101,103,105,107,109,111,113,115,117,119_0", "33,35,37,39,41,43,45,47,49,51,53,55_1"]], [None]] )
arg_types.add_arg([[["97,99,101,103,105,107,109,111,113,115,117,119.0.0", "33,35,37,39,41,43,45,47,49,51,53,55.1.0"]],
[["97,99,101,103,105,107,109,111,113,115,117,119.0.50", "33,35,37,39,41,43,45,47,49,51,53,55.1.50"]], [None]] )
# filepath
arg_types.add_arg([["/mnt/zroot/large_file_#p"]])
arg_types.add_arg([["/mnt/zroot/large_file_#p"], ["/tmpfs/large_file_#p"]])
# odirect
arg_types.add_arg([[True]])
all_args.append(arg_types)
@ -169,9 +179,12 @@ arg_types.add_arg([
[True, True, True],
])
# memloadgen
arg_types.add_arg([[["97,99,101,103,105,107,109,111,113,115,117,119_0", "33,35,37,39,41,43,45,47,49,51,53,55_1"]], [None]] )
arg_types.add_arg([[["97,99,101,103,105,107,109,111,113,115,117,119.0.0", "33,35,37,39,41,43,45,47,49,51,53,55.1.0"]],
[["97,99,101,103,105,107,109,111,113,115,117,119.0.50", "33,35,37,39,41,43,45,47,49,51,53,55.1.50"]], [None]] )
# filepath
arg_types.add_arg([["/mnt/zroot/large_file_#p"]])
arg_types.add_arg([["/mnt/zroot/large_file_#p"], ["/tmpfs/large_file_#p"]])
# odirect
arg_types.add_arg([[True]])
all_args.append(arg_types)
def parse_comma_list(input : str):
@ -272,17 +285,17 @@ def run_exp(conf : Conf):
if conf.memloadgen != None:
for emem in conf.memloadgen:
mlg_cmd = "sudo "
mlg_cpu = emem.split("_")[0]
mlg_dom = emem.split("_")[1]
mlg_cmd += f"{MLG_PATH} -b {MEMLOAD_BLKSZ} -s {mlg_cpu} -d {mlg_dom} -o {LOG_FILEPATH}/memloadgen_{len(smlg)}"
mlg_cpu = emem.split(".")[0]
mlg_dom = emem.split(".")[1]
mlg_pct = emem.split(".")[2]
mlg_cmd += f"{MLG_PATH} -b {MEMLOAD_BLKSZ} -T {MEMLOAD_BLKSZ} -s {mlg_cpu} -d {mlg_dom} -o {LOG_FILEPATH}/memloadgen_{len(smlg)} -p {mlg_pct} -H 4 -w 17"
tc.log_print("Starting memloadgen...")
tc.log_print(mlg_cmd)
smlg.append(tc.remote_exec(conf.server, mlg_cmd, blocking=False)[0])
smlg_names.append("memloadgen")
time.sleep(0.1)
time.sleep(10)
time.sleep(5)
for eaff in parse_comma_list(conf.affinity):
server_cmd = "sudo "
server_cmd += f"{EXE_PATH} -s -p " + str(SERVER_PORT_START + cur_srv_proc) + \
@ -293,6 +306,8 @@ def run_exp(conf : Conf):
server_cmd += f" --enable-ssl-over-tcp --ssl-certificate {SSL_CERT} --ssl-private-key {SSL_PKEY}"
if conf.ktls:
server_cmd += " --enable-ssl-ktls"
if conf.odirect:
server_cmd += " --use-odirect"
# start server
tc.log_print("Starting server proc " + str(cur_srv_proc) + "...")
tc.log_print(server_cmd)
@ -302,7 +317,6 @@ def run_exp(conf : Conf):
cur_srv_proc = cur_srv_proc + 1
time.sleep(0.1)
time.sleep(5)
# start clients
tc.log_print("Starting clients...")
sclts = []
@ -315,7 +329,7 @@ def run_exp(conf : Conf):
" -p " + str(SERVER_PORT_START + clt_number) + \
" --connect-timeout 1000" + \
" -A " + eaff + \
" -t 30" + \
" -t 25" + \
" -P 4" + \
" -R" + \
" -N" + \
@ -390,6 +404,7 @@ def flush_netresult(conf : Conf) -> bool:
log_output = log_output + "/" + os.path.basename(LOG_FILEPATH)
logs = os.listdir(log_output)
logs_bytes = []
memload_bytes = []
for log in logs:
tc.log_print("Processing " + log + "...")
if os.path.isfile(log_output + "/" + log) and log.endswith(".txt"):
@ -398,12 +413,28 @@ def flush_netresult(conf : Conf) -> bool:
if len(buf) > 0:
logs_bytes.append(buf)
else:
tc.log_print("Warning: log file empty for " + log)
tc.log_print("Warning: log file empty for " + log + ". Retrying...")
return False
if os.path.isfile(log_output + "/" + log) and ("memloadgen" in log):
with open(log_output + "/" + log, "r") as f:
memloadbuf = f.read()
if len(memloadbuf) == 0:
tc.log_print("Warning: log file empty for " + log + ". Retrying...")
return False
else:
memload_bytes.append(memloadbuf)
try:
parser = par.iperf_json_parser(logs_bytes)
bps = []
for ml in memload_bytes:
memparser = par.memloadgen_parser(ml, 22, 32)
bps.append(memparser.bps)
tc.log_print("Aggregated throughput: " + "{:.2f}".format(parser.aggregate_egress_bps / 8.0) + " B/s " + \
"{:.2f}".format(parser.aggregate_egress_bps / 8.0 / 1024.0 / 1024.0) + " MB/s " + \
"{:.2f}".format(parser.aggregate_egress_bps / 8.0 / 1024.0 / 1024.0 / 1024.0) + " GB/s")
"{:.2f}".format(parser.aggregate_egress_bps / 8.0 / 1024.0 / 1024.0 / 1024.0) + " GB/s" + \
" | Memload generator: " + "{:.2f}".format(np.sum(bps) / 1024.0 / 1024.0 / 1024.0) + " GB/s" )
except Exception as e:
tc.log_print("Warning: exception for parsing logs: " + str(e) + " restarting...")
scp_cmd = "sudo rm -rf " + log_output + "/*"

View File

@ -64,6 +64,7 @@ def process_dir(rootdir: str, dat : dict[any]):
elif sendfile == "False" and tls == "True" and ktls == "True":
mode = "ktls"
memloadgen_bps = []
logs = []
enum_files(rootdir, logs)
logs_bytes = []
@ -75,6 +76,15 @@ def process_dir(rootdir: str, dat : dict[any]):
logs_bytes.append(buf)
else:
print("Warning: log file empty for " + log)
elif "memloadgen" in log:
with open(log, "r") as f:
buf = f.read()
if len(buf) > 0:
parser = par.memloadgen_parser(buf, 20, 30)
memloadgen_bps.append(parser.bps)
else:
print("Warning: memloadgen file empty for " + log)
try:
parser = par.iperf_json_parser(logs_bytes)
@ -85,7 +95,7 @@ def process_dir(rootdir: str, dat : dict[any]):
if (affinity,mode) not in dat[(memloadgen, fs)]:
dat[(memloadgen, fs)][(affinity,mode)] = []
dat[(memloadgen, fs)][(affinity,mode)].append(parser.aggregate_egress_bps)
dat[(memloadgen, fs)][(affinity,mode)].append((parser.aggregate_egress_bps, np.sum(memloadgen_bps)))
return
for subdir in os.listdir(rootdir):
@ -126,16 +136,15 @@ def main():
line += "N/A,"
else:
vals = data[(affinity, mode)]
real_vals = []
real_mlg = []
for i in range(0, len(vals)):
vals[i] = vals[i] / 1024.0 / 1024.0 / 1024.0 / 8.0
avg = np.average(vals)
stddev = np.std(vals)
line += "{:.2f} ({:.2f})".format(avg, stddev) + ","
real_vals.append(vals[i][0] / 1024.0 / 1024.0 / 1024.0 / 8.0)
real_mlg.append(vals[i][1] / 1024.0 / 1024.0 / 1024.0)
line += "{:.2f} ({:.2f}) [{:.2f} ({:.2f})]".format(np.average(real_vals), np.std(real_vals),
np.average(real_mlg), np.std(real_mlg)) + ","
print(line)
print("")
if __name__ == "__main__":

View File

@ -10,7 +10,21 @@ class iperf_json_parser:
self.jsonobjs.append(jsobj)
each_bps = jsobj['end']['sum_sent']['bits_per_second']
self.aggregate_egress_bps += each_bps
class memloadgen_parser:
    # Parses memloadgen output (one integer sample per line) and exposes
    # the mean throughput over the line window [min, max) as `self.bps`.
    def __init__(self, input, min, max):
        rows = input.split('\n')
        # Clamp the upper bound to what is actually present.
        if max > len(rows):
            max = len(rows)
        # Not even `min` lines were produced -- the run is unusable.
        if len(rows) <= min:
            raise Exception("Not enough lines!")
        # Degenerate window after clamping; collapse it (note: this
        # yields an empty sample set and np.mean then returns nan,
        # matching the historical behavior).
        if min > max:
            min = max
        samples = [int(rows[idx]) for idx in range(min, max)]
        self.bps = np.mean(samples)
class pmc_parser:
def __init__(self, input):

View File

@ -2,6 +2,7 @@
#include "gen.hh"
#include <cstdlib>
#include <cstring>
#include <list>
#include <iostream>
#include <fstream>
#include "ntr.h"
@ -18,13 +19,16 @@ usage()
" -v: verbose mode\n"
" -b: buffer size\n"
" -q: bytes per second\n"
" -p: target throughput percentage\n"
" -d: destination domain index\n"
" -s: worker threads cpu list\n"
" -S: enable shared buffer\n"
" -t: time to run\n"
" -T: transaction size\n"
" -i: inter arrival time distribution\n"
" -o: output file path\n");
" -o: output file path\n"
" -H: history size for pct adjustment\n"
" -w: warmup time before pct adjustment\n");
fflush(stdout);
}
@ -35,14 +39,18 @@ int main(int argc, char * argv[])
ntr_init();
ntr_set_level(NTR_DEP_USER1, NTR_LEVEL_WARNING);
size_t arr_sz = 32 * 1024 * 1024;
size_t arr_sz = 64 * 1024 * 1024;
uint32_t time = -1;
uint64_t bps = 0;
uint64_t transaction_size = 4096;
uint64_t transaction_size = arr_sz;
cpuset_t threads;
uint32_t pct = 0;
CPU_ZERO(&threads);
CPU_SET(0, &threads);
char ia_dist[32] = "fixed";
int history_sz = 0;
int warmup = 0;
std::list<uint64_t> history;
int shared_buffer = 0;
cpuset_t domain_mask;
@ -51,7 +59,7 @@ int main(int argc, char * argv[])
{
int c;
// parse arguments
while ((c = getopt(argc, argv, "vhb:d:s:So:T:t:q:i:")) != -1) {
while ((c = getopt(argc, argv, "vhb:d:s:So:T:t:q:i:p:H:w:")) != -1) {
switch (c) {
case 'v':
ntr_set_level(NTR_DEP_USER1, ntr_get_level(NTR_DEP_USER1) + 1);
@ -86,6 +94,15 @@ int main(int argc, char * argv[])
case 'i':
strncpy(ia_dist, optarg, sizeof(ia_dist));
break;
case 'p':
pct = strtoul(optarg, nullptr, 10);
break;
case 'H':
history_sz = strtol(optarg, nullptr, 10);
break;
case 'w':
warmup = strtol(optarg, nullptr, 10);
break;
default:
usage();
exit(0);
@ -98,11 +115,17 @@ int main(int argc, char * argv[])
" num threads: %d\n"
" target domain: %ld\n"
" bytes per second: %lu\n"
" percentage: %d%%\n"
" interarrival distribution: %s\n"
" shared buffer: %d\n"
" transaction time: %lu\n"
" runtime: %d\n\n",
arr_sz, CPU_COUNT(&threads), CPU_FFS(&domain_mask) - 1, bps, ia_dist, shared_buffer, transaction_size,time);
" runtime: %d\n"
" history: %d\n"
" warmup: %d\n",
arr_sz, CPU_COUNT(&threads),
CPU_FFS(&domain_mask) - 1, bps,
pct, ia_dist, shared_buffer,
transaction_size,time, history_sz, warmup);
// init topo
if (topo_init(ntr_get_level(NTR_DEP_USER1) != NTR_LEVEL_DEFAULT)) {
@ -119,7 +142,7 @@ int main(int argc, char * argv[])
bool success = false;
memload_generator::memload_generator_options opts;
opts.buffer_size = arr_sz;
opts.bytes_per_second = bps;
opts.trans_per_second = bps / transaction_size;
opts.shared_buffer = shared_buffer;
opts.transaction_size = transaction_size;
opts.verbose = ntr_get_level(NTR_DEP_USER1) != NTR_LEVEL_DEFAULT;
@ -142,13 +165,35 @@ int main(int argc, char * argv[])
uint64_t cur_ts = topo_uptime_ns();
uint64_t trans = mgen->get_transactions();
uint64_t bps = (uint64_t)((double)((trans - prev_trans) * transaction_size) / ((double)(cur_ts - prev_ts) / (double)S2NS));
ntr(NTR_DEP_USER1, NTR_LEVEL_INFO, "main: MLG bps = %ld ~= %ldM\n", bps, bps / 1024 / 1024);
ntr(NTR_DEP_USER1, NTR_LEVEL_INFO, "bps = %ld ~= %ldM\n", bps, bps / 1024 / 1024);
ofile << bps << std::endl;
prev_ts = cur_ts;
prev_trans = trans;
cur_time++;
if((int)cur_time <= warmup) {
// keep history
history.emplace_back(bps);
if ((int)history.size() > history_sz) {
history.pop_front();
}
// adjust time
if ((int)cur_time == warmup) {
uint64_t sum = 0;
size_t sz = history.size();
while (history.size() > 0) {
sum += history.front();
history.pop_front();
}
uint64_t newbps = (uint64_t)((sum / sz) * (double)pct / 100.0);
mgen->set_transactions(newbps / transaction_size);
ntr(NTR_DEP_USER1, NTR_LEVEL_INFO, "adjusted target bps = %ld ~= %ldM\n", newbps, newbps / 1024 / 1024);
}
}
}
mgen->stop();
delete mgen;