+ packet loss control & + packet depth control

quackerd 2021-02-21 05:16:39 -05:00
parent d1e43dcf2f
commit 1fd9be7f13
Signed by: d
GPG Key ID: F73412644EDE357A
6 changed files with 224 additions and 73 deletions


@ -70,10 +70,12 @@ struct options_t {
unsigned int s_rxqid { 0 };
unsigned int s_txqid { 0 };
// for qps calculation
unsigned int s_total_pkts { 0 };
std::atomic<uint32_t> s_total_pkts { 0 };
std::atomic<uint64_t> s_start_time { 0 };
std::atomic<uint64_t> s_end_time { 0 };
std::atomic<uint32_t> s_slave_qps { 0 };
std::atomic<uint32_t> s_slave_total { 0 };
std::atomic<uint32_t> s_slave_loss { 0 };
Generator *s_iagen { nullptr };
std::vector<struct datapt *> s_data;
@ -393,7 +395,7 @@ pkt_loop()
break;
}
options.s_total_pkts++;
options.s_total_pkts.fetch_add(1);
recv_resp = true;
break;
@ -577,10 +579,14 @@ locore_main(void *tif __rte_unused)
auto pld_qps = (struct pkt_payload_qps *)
pkt_hdr->payload;
uint32_t qps = rte_be_to_cpu_32(pld_qps->qps);
uint32_t total = rte_be_to_cpu_32(pld_qps->total_pkts);
uint32_t loss = rte_be_to_cpu_32(pld_qps->total_loss);
ntr(NTR_DEP_USER1, NTR_LEVEL_DEBUG,
"locore_main: received qps %d from client %d\n",
qps, i);
options.s_slave_qps.fetch_add(qps);
options.s_slave_loss.fetch_add(loss);
options.s_slave_total.fetch_add(total);
rte_pktmbuf_free(mbufs[i]);
}
}
@ -922,14 +928,17 @@ main(int argc, char *argv[])
rte_exit(EXIT_FAILURE, "failed to wait for job completion\n");
// calculate QPS
uint32_t qps = (uint32_t)((double)options.s_total_pkts) /
uint32_t qps = (uint32_t)((double)options.s_total_pkts.load()) /
(((double)(options.s_end_time.load() -
options.s_start_time.load()) /
(double)S2NS));
qps += options.s_slave_qps.load();
uint32_t tot = options.s_slave_total.load() + options.s_total_pkts.load();
uint32_t loss = options.s_slave_loss.load();
// dump stats
log_file << qps << std::endl;
log_file << qps << ',' << tot << ',' << loss << std::endl;
for (auto it : options.s_data) {
if (it->valid) {
log_file << it->clt_sw_rx << ',' << it->clt_sw_tx << ','
@ -941,7 +950,7 @@ main(int argc, char *argv[])
}
log_file.close();
ntr(NTR_DEP_USER1, NTR_LEVEL_INFO, "Total QPS = %d\n", qps);
ntr(NTR_DEP_USER1, NTR_LEVEL_INFO, "qps = %d, total = %d, loss = %d\n", qps, tot, loss);
// clean up
rte_eth_dev_stop(portid);
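With this change the first line of the latency dump is no longer a bare QPS figure but a "qps,total,loss" triple, followed by the per-packet rows as before. A minimal sketch of a consumer of that headline (the log file name is a placeholder, not something this commit defines):

#include <cstdint>
#include <fstream>
#include <iostream>
#include <sstream>
#include <string>

// Read the "qps,total,loss" headline that main() now writes before the
// per-packet samples.
int
main()
{
    std::ifstream log("sample.log"); // hypothetical file name
    std::string headline;
    if (!std::getline(log, headline))
        return 1;

    std::stringstream ss(headline);
    std::string field;
    uint32_t vals[3] = { 0, 0, 0 }; // qps, total, loss
    for (auto &v : vals) {
        if (!std::getline(ss, field, ','))
            return 1; // malformed headline
        v = (uint32_t)std::stoul(field);
    }

    double loss_pct = vals[1] != 0 ? 100.0 * vals[2] / vals[1] : 0.0;
    std::cout << "qps = " << vals[0] << ", loss = " << loss_pct << "%\n";
    return 0;
}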


@ -160,6 +160,8 @@ constexpr static uint16_t PKT_TYPE_FIN = 7;
constexpr static uint16_t PKT_TYPE_FIN_ACK = 8;
struct pkt_payload_qps {
uint32_t qps;
uint32_t total_pkts;
uint32_t total_loss;
};
constexpr static uint16_t NUM_PKT_TYPES = PKT_TYPE_FIN_ACK + 1;
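Like the existing qps field, the two new counters travel in network byte order: the client converts with rte_cpu_to_be_32() before transmit and the master converts back with rte_be_to_cpu_32() on receive. A small standalone sketch of that fill/read pattern (the helper names are illustrative, not part of this commit):

#include <cstdint>
#include <rte_byteorder.h>

struct pkt_payload_qps {
    uint32_t qps;
    uint32_t total_pkts;
    uint32_t total_loss;
};

// Client side: convert host-order stats to network order before tx.
static inline void
fill_qps_payload(struct pkt_payload_qps *pld, uint32_t qps, uint32_t total,
    uint32_t loss)
{
    pld->qps = rte_cpu_to_be_32(qps);
    pld->total_pkts = rte_cpu_to_be_32(total);
    pld->total_loss = rte_cpu_to_be_32(loss);
}

// Master side: convert back to host order after rx.
static inline void
read_qps_payload(const struct pkt_payload_qps *pld, uint32_t *qps,
    uint32_t *total, uint32_t *loss)
{
    *qps = rte_be_to_cpu_32(pld->qps);
    *total = rte_be_to_cpu_32(pld->total_pkts);
    *loss = rte_be_to_cpu_32(pld->total_loss);
}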


@ -16,6 +16,9 @@
#include "util.h"
#include <atomic>
#include <list>
#include <map>
#include <mutex>
#include <random>
#include <vector>
@ -23,7 +26,7 @@ constexpr static unsigned int MBUF_MAX_COUNT = 65536;
constexpr static unsigned int MBUF_CACHE_SIZE = 512;
constexpr static unsigned int RX_RING_SIZE = 1024;
constexpr static unsigned int TX_RING_SIZE = 1024;
constexpr static unsigned int BURST_SIZE = 32;
constexpr static unsigned int BURST_SIZE = 8;
static const struct rte_eth_conf port_conf_default {
};
@ -46,16 +49,26 @@ epoch_get_epoch(unsigned int epoch)
return epoch & 0x00FFFFFF;
}
struct epoch_info {
unsigned int epoch;
uint64_t ts;
};
struct thread_info {
unsigned int id { 0 };
unsigned int lcore_id { 0 };
unsigned int rxqid { 0 };
unsigned int txqid { 0 };
// these fields are read by the stat collecting thread
std::atomic<int> total_pkts { 0 };
std::atomic<int> lost_pkts { 0 };
Generator *ia_gen { nullptr };
Generator *load_gen { nullptr };
std::atomic<uint32_t> cur_epoch { 0 };
std::atomic<bool> epoch_recv { true };
std::mutex
mtx; // this lock protects data shared between worker threads, i.e.:
std::list<struct epoch_info *> recved_epochs;
};
constexpr static int STATE_SYNC = 0; // waiting for SYNC
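A response for a given epoch can be picked up by any worker, so the receiving thread hands the epoch back to the thread that issued it through the new mtx-protected recved_epochs list; the owner drains the list under the lock and processes the entries outside it. A simplified standalone sketch of that handoff (types reduced to the essentials, not the actual thread_info):

#include <cstdint>
#include <list>
#include <mutex>
#include <vector>

struct epoch_info {
    unsigned int epoch;
    uint64_t ts;
};

struct epoch_queue {
    std::mutex mtx; // protects recved_epochs
    std::list<struct epoch_info *> recved_epochs;
};

// Called by whichever worker happened to receive the response.
static void
push_epoch(struct epoch_queue *q, unsigned int epoch, uint64_t ts)
{
    auto *einfo = new epoch_info { epoch, ts };
    std::lock_guard<std::mutex> g(q->mtx);
    q->recved_epochs.push_back(einfo);
}

// Called by the owning thread: dequeue under the lock, process later.
static void
drain_epochs(struct epoch_queue *q, std::vector<struct epoch_info *> *out)
{
    std::lock_guard<std::mutex> g(q->mtx);
    while (!q->recved_epochs.empty()) {
        out->push_back(q->recved_epochs.front());
        q->recved_epochs.pop_front();
    }
}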
@ -67,13 +80,15 @@ struct options_t {
unsigned int run_time { 5 };
// parameters
int slave_mode { 0 };
unsigned long rage_quit_time { (unsigned long)-1 };
uint32_t rage_quit_time { UINT32_MAX };
char ia_gen[256] { "fixed" };
char ld_gen[256] { "fixed:0" };
uint32_t target_qps { 0 };
uint32_t depth = 1;
struct net_spec server_spec {
};
uint64_t cpu_mask { 0x4 }; // 1 thread @ core 2
uint32_t pkt_loss_delay_ms = UINT32_MAX;
// states
unsigned int s_num_threads { 1 }; // 1 thread
@ -96,17 +111,30 @@ struct options_t {
static struct options_t options;
static inline uint32_t
calc_qps(uint64_t now)
static inline void
calc_stats(
uint64_t now, uint32_t *qps, uint32_t *total_pkt, uint32_t *total_loss)
{
uint32_t tot = 0;
uint32_t loss = 0;
for (auto i : options.s_thr_info) {
tot += i->total_pkts.load();
loss += i->lost_pkts.load();
}
return (uint32_t)((double)tot /
((double)(now - options.s_ts_begin.load()) / (double)S2NS));
if (total_pkt != nullptr) {
*total_pkt = tot;
}
if (total_loss != nullptr) {
*total_loss = loss;
}
if (qps != nullptr) {
*qps = (uint32_t)((double)(tot - loss) /
((double)(now - options.s_ts_begin.load()) / (double)S2NS));
}
}
static void
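calc_stats() replaces calc_qps(): the rate is still measured from s_ts_begin, but lost packets are now subtracted from the total before computing QPS, and the raw counters are reported through optional out-pointers. A hedged usage sketch (the report_stats() wrapper is hypothetical, not part of this commit):

// Hypothetical periodic reporter built on the new calc_stats() signature.
static void
report_stats()
{
    uint32_t qps, total, loss;
    calc_stats(nm_get_uptime_ns(), &qps, &total, &loss);
    ntr(NTR_DEP_USER1, NTR_LEVEL_INFO,
        "report_stats: qps = %d, total = %d, loss = %d\n", qps, total, loss);

    // Any out-pointer may be nullptr when the caller only needs some values.
    uint32_t qps_only;
    calc_stats(nm_get_uptime_ns(), &qps_only, nullptr, nullptr);
}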
@ -222,10 +250,13 @@ proto_loop(struct thread_info *tinfo)
static void
pkt_loop(struct thread_info *tinfo)
{
struct rte_mbuf *tx_buf;
struct rte_mbuf *tx_bufs[BURST_SIZE];
struct rte_mbuf *rx_bufs[BURST_SIZE];
std::vector<struct epoch_info *> recved_epochs;
std::map<unsigned int, struct epoch_info *> sent_epochs;
uint64_t cur_epoch = 0;
uint64_t next_ts;
uint64_t last_ts;
uint64_t last_recv_ts = 0;
struct conn_spec srv_cspec;
rdport_generator src_port_gen(MIN_RANDOM_PORT);
rdport_generator dst_port_gen(MIN_RANDOM_PORT);
@ -234,7 +265,6 @@ pkt_loop(struct thread_info *tinfo)
srv_cspec.dst = &options.server_spec;
next_ts = nm_get_uptime_ns();
last_ts = next_ts + options.rage_quit_time * MS2NS;
ntr(NTR_DEP_USER1, NTR_LEVEL_INFO, "pkt_loop <thread %d>: entering\n",
tinfo->id);
@ -262,10 +292,10 @@ pkt_loop(struct thread_info *tinfo)
NTR_PKT(NTR_DEP_USER1, NTR_LEVEL_DEBUG, each,
"locore_main <thread %d>: ", tinfo->id);
struct pkt_payload_epoch *pld_epoch;
struct epoch_info *einfo;
uint32_t epoch;
uint32_t id;
struct thread_info *other_t;
bool bool_expected = false;
int int_expected = STATE_RUNNING;
switch (type) {
case PKT_TYPE_LOAD_RESP:
@ -274,11 +304,9 @@ pkt_loop(struct thread_info *tinfo)
epoch = rte_be_to_cpu_32(
pld_epoch->epoch);
id = epoch_get_id(epoch);
epoch = epoch_get_epoch(epoch);
tinfo->total_pkts.fetch_add(1);
ntr(NTR_DEP_USER1, NTR_LEVEL_DEBUG,
"pkt_loop <thread %d>: packet %p epoch %d id %d.\n",
"pkt_loop <thread %d>: packet %p epoch 0x%x id %d.\n",
tinfo->id, (void *)rx_bufs[i],
epoch, id);
@ -291,26 +319,15 @@ pkt_loop(struct thread_info *tinfo)
break;
}
einfo = new struct epoch_info;
einfo->epoch = epoch;
einfo->ts = now;
other_t = options.s_thr_info.at(id);
if (epoch !=
other_t->cur_epoch.load()) {
ntr(NTR_DEP_USER1,
NTR_LEVEL_WARNING,
"pkt_loop <thread %d>: packet %p invalid epoch %d != %d.\n",
tinfo->id,
(void *)rx_bufs[i], epoch,
other_t->cur_epoch.load());
break;
}
if (!other_t->epoch_recv
.compare_exchange_strong(
bool_expected, true)) {
ntr(NTR_DEP_USER1,
NTR_LEVEL_WARNING,
"pkt_loop <thread %d>: failed to cmpxchg with thread %d.\n",
tinfo->id, other_t->id);
break;
}
other_t->mtx.lock();
other_t->recved_epochs.push_back(einfo);
other_t->mtx.unlock();
break;
case PKT_TYPE_FIN:
if (rte_is_same_ether_addr(
@ -332,14 +349,19 @@ pkt_loop(struct thread_info *tinfo)
tinfo->id);
}
uint32_t qps = calc_qps(now);
uint32_t qps;
uint32_t total_pkt;
uint32_t total_loss;
calc_stats(now, &qps,
&total_pkt, &total_loss);
struct pkt_hdr *pkt_hdr;
if (alloc_pkt_hdr(
options.mbuf_pool,
PKT_TYPE_FIN_ACK,
&options.s_master_cspec,
&tx_buf,
&tx_bufs[0],
&pkt_hdr) != 0) {
rte_exit(EXIT_FAILURE,
"failed to allocate pkt hdr\n");
@ -350,11 +372,16 @@ pkt_loop(struct thread_info *tinfo)
pkt_hdr->payload;
pld_qps->qps = rte_cpu_to_be_32(
qps);
pld_qps->total_loss =
rte_cpu_to_be_32(
total_loss);
pld_qps->total_pkts =
rte_cpu_to_be_32(total_pkt);
const uint16_t nb_tx =
rte_eth_tx_burst(
options.s_portid,
tinfo->txqid, &tx_buf,
tinfo->txqid, &tx_bufs[0],
1);
if (nb_tx != 1) {
@ -383,7 +410,72 @@ pkt_loop(struct thread_info *tinfo)
}
}
if (now >= next_ts && tinfo->epoch_recv.load()) {
// dequeue received epochs
struct epoch_info *einfo;
tinfo->mtx.lock();
while (!tinfo->recved_epochs.empty()) {
// only dequeue, process later
einfo = tinfo->recved_epochs.front();
tinfo->recved_epochs.pop_front();
// XXX: push_back may call into the allocator; avoiding that
// would require a fixed-size array and batching, which doesn't
// seem worth the extra complexity
recved_epochs.push_back(einfo);
}
tinfo->mtx.unlock();
if (!recved_epochs.empty())
ntr(NTR_DEP_USER1, NTR_LEVEL_DEBUG,
"pkt_loop <thread %d>: dequeued %lu received epochs\n",
tinfo->id, recved_epochs.size());
// process epochs
while (!recved_epochs.empty()) {
einfo = recved_epochs.back();
recved_epochs.pop_back();
auto it = sent_epochs.find(einfo->epoch);
if (it != sent_epochs.end()) {
ntr(NTR_DEP_USER1, NTR_LEVEL_DEBUG,
"pkt_loop <thread %d>: received epoch 0x%x\n",
tinfo->id, epoch_get_epoch(einfo->epoch));
if (einfo->ts > last_recv_ts) {
last_recv_ts = einfo->ts;
}
delete it->second;
sent_epochs.erase(it);
} else {
// we received an epoch we never sent
ntr(NTR_DEP_USER1, NTR_LEVEL_DEBUG,
"pkt_loop <thread %d>: received epoch 0x%x but never sent it. Packet loss?\n",
tinfo->id, einfo->epoch);
}
delete einfo;
}
// handle packet loss
for (auto it = sent_epochs.begin(); it != sent_epochs.end();) {
einfo = it->second;
if (now - einfo->ts >
options.pkt_loss_delay_ms * MS2NS) {
// timed out
ntr(NTR_DEP_USER1, NTR_LEVEL_DEBUG,
"pkt_loop <thread %d>: epoch 0x%x is lost after not receiving for too long\n",
tinfo->id, einfo->epoch);
delete it->second;
it = sent_epochs.erase(it);
tinfo->lost_pkts.fetch_add(1);
} else {
++it;
}
}
// check to send the next packet
uint32_t total_send = 0;
while (now >= next_ts && sent_epochs.size() < options.depth && total_send < BURST_SIZE) {
struct pkt_payload_load *pld_load;
struct pkt_hdr *pkt_data;
next_ts += (int)(tinfo->ia_gen->generate() * S2NS);
@ -392,44 +484,57 @@ pkt_loop(struct thread_info *tinfo)
srv_cspec.dst_port = dst_port_gen.next();
srv_cspec.src_port = src_port_gen.next();
if (alloc_pkt_hdr(options.mbuf_pool, PKT_TYPE_LOAD,
&srv_cspec, &tx_buf, &pkt_data) != 0) {
&srv_cspec, &tx_bufs[total_send], &pkt_data) != 0) {
rte_exit(EXIT_FAILURE,
"failed to allocate pkt hdr\n");
}
// pre-increment the epoch
uint32_t epoch = tinfo->cur_epoch.fetch_add(1) + 1;
pld_load = (struct pkt_payload_load *)pkt_data->payload;
pld_load->load = rte_cpu_to_be_32(
tinfo->load_gen->generate());
pld_load->epoch = rte_cpu_to_be_32(
epoch_mk(tinfo->id, epoch));
tinfo->epoch_recv.store(false);
last_ts = now;
unsigned int epoch = epoch_mk(tinfo->id, cur_epoch);
pld_load->epoch = rte_cpu_to_be_32(epoch);
cur_epoch++;
einfo = new struct epoch_info;
einfo->epoch = epoch;
einfo->ts = now;
sent_epochs.insert({ epoch, einfo });
tinfo->total_pkts.fetch_add(1);
ntr(NTR_DEP_USER1, NTR_LEVEL_DEBUG,
"pkt_loop <thread %d>: sending packet %p with epoch %d\n",
tinfo->id, (void *)tx_buf, epoch);
"pkt_loop <thread %d>: sending packet %p with epoch 0x%x\n",
tinfo->id, (void *)tx_bufs[total_send], epoch);
total_send++;
}
if (total_send > 0) {
const uint16_t nb_tx = rte_eth_tx_burst(
options.s_portid, tinfo->txqid, &tx_buf, 1);
options.s_portid, tinfo->txqid, tx_bufs, total_send);
if (nb_tx != 1) {
if (nb_tx != total_send) {
rte_exit(
EXIT_FAILURE, "failed to send packet\n");
EXIT_FAILURE, "failed to send packet\n");
}
}
if (!tinfo->epoch_recv.load()) {
// if we haven't received the packet, get read to rage
// quit
if (now - last_ts > options.rage_quit_time * MS2NS) {
rte_exit(EXIT_FAILURE,
"waiting too long for resp. I QUIT!!\n");
}
// check rage quit
if (last_recv_ts == 0) {
last_recv_ts = nm_get_uptime_ns();
}
if (nm_get_uptime_ns() - last_recv_ts > options.rage_quit_time * MS2NS) {
rte_exit(EXIT_FAILURE,
"thread %d waiting too long for resp. I QUIT!!\n", tinfo->id);
}
}
// clean up
for (auto it = sent_epochs.begin(); it != sent_epochs.end();) {
delete it->second;
++it;
}
ntr(NTR_DEP_USER1, NTR_LEVEL_DEBUG,
"pkt_loop <thread %d>: exiting loop...\n", tinfo->id);
}
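Taken together, the rewritten loop keeps at most `depth` epochs outstanding in sent_epochs, matches responses against that map, and counts an epoch as lost once it has waited longer than pkt_loss_delay_ms. A condensed, single-threaded sketch of that bookkeeping with a fake clock and no real I/O (every name here is an illustrative stand-in, not the commit's code):

#include <cstdint>
#include <cstdio>
#include <map>

struct epoch_info {
    unsigned int epoch;
    uint64_t ts;
};

// Illustrative stand-ins for the real options and per-thread counters.
static const uint32_t depth = 4;                 // max packets in flight (-D)
static const uint64_t loss_delay_ns = 100000000; // loss threshold (-L), 100 ms
static uint32_t total_pkts = 0;
static uint32_t lost_pkts = 0;

static void
step(std::map<unsigned int, epoch_info *> &sent_epochs, uint64_t now,
    unsigned int &cur_epoch)
{
    // 1. expire in-flight epochs that have waited longer than the threshold
    for (auto it = sent_epochs.begin(); it != sent_epochs.end();) {
        if (now - it->second->ts > loss_delay_ns) {
            lost_pkts++;
            delete it->second;
            it = sent_epochs.erase(it);
        } else {
            ++it;
        }
    }

    // 2. top up the window: keep sending until `depth` epochs are outstanding
    while (sent_epochs.size() < depth) {
        sent_epochs.insert({ cur_epoch, new epoch_info { cur_epoch, now } });
        total_pkts++; // a real implementation would tx a packet here
        cur_epoch++;
    }
}

// A received epoch is matched against the in-flight map; unknown epochs are
// late responses to packets that were already written off as lost.
static void
on_receive(std::map<unsigned int, epoch_info *> &sent_epochs, unsigned int epoch)
{
    auto it = sent_epochs.find(epoch);
    if (it != sent_epochs.end()) {
        delete it->second;
        sent_epochs.erase(it);
    }
}

int
main()
{
    std::map<unsigned int, epoch_info *> sent_epochs;
    unsigned int cur_epoch = 0;

    step(sent_epochs, 0, cur_epoch);           // sends epochs 0-3
    on_receive(sent_epochs, 0);                // epoch 0 answered in time
    step(sent_epochs, 200000000, cur_epoch);   // epochs 1-3 time out
    printf("sent %u, lost %u, in flight %zu\n",
        total_pkts, lost_pkts, sent_epochs.size()); // sent 8, lost 3, in flight 4
    return 0;
}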
@ -562,17 +667,20 @@ dump_options()
" verbosity = +%d\n"
" run time = %d\n"
" num threads = %d\n"
" rage quit time = %ld\n"
" rage quit time = %ul\n"
" cpu mask = 0x%lx\n"
" slave mode = %d\n"
" interarrival dist = %s\n"
" load dist = %s\n"
" qps = %d\n"
" host IP = 0x%x\n",
" host IP = 0x%x\n"
" depth = %u\n"
" packet loss threshold = %u\n",
ntr_get_level(NTR_DEP_USER1) - NTR_LEVEL_WARNING, options.run_time,
options.s_num_threads, options.rage_quit_time, options.cpu_mask,
options.slave_mode, options.ia_gen, options.ld_gen,
options.target_qps, options.s_host_spec.ip);
options.target_qps, options.s_host_spec.ip, options.depth,
options.pkt_loss_delay_ms);
}
static void
@ -590,7 +698,9 @@ usage()
" -l: load distribution\n"
" -r: rage quit time (in ms)\n"
" -q: target QPS\n"
" -H: host net spec\n");
" -H: host net spec\n"
" -D: max number of packets in flight\n"
" -L: the threshold in ms after which a packet should be considered lost\n");
}
int
@ -617,7 +727,8 @@ main(int argc, char *argv[])
{
int c;
// parse arguments
while ((c = getopt(argc, argv, "vht:s:SA:i:l:r:q:H:")) != -1) {
while (
(c = getopt(argc, argv, "vht:s:SA:i:l:r:q:H:D:L:")) != -1) {
switch (c) {
case 'v':
ntr_set_level(NTR_DEP_USER1,
@ -676,6 +787,19 @@ main(int argc, char *argv[])
"invalid host net spec.\n");
}
break;
case 'D':
options.depth = strtol(optarg, nullptr, 10);
if (options.depth == 0) {
options.depth = UINT32_MAX;
}
break;
case 'L':
options.pkt_loss_delay_ms = strtol(
optarg, nullptr, 10);
if (options.pkt_loss_delay_ms == 0) {
options.pkt_loss_delay_ms = UINT32_MAX;
}
break;
default:
usage();
rte_exit(
@ -769,14 +893,17 @@ main(int argc, char *argv[])
// poor man's timer
uint32_t second = 0;
uint32_t qps = 0;
uint32_t qps;
uint32_t total_pkts;
uint32_t total_loss;
// this loop exit is signaled by SYNC_FIN in slave mode and by itself in
// non slave mode
while (options.s_state.load() != STATE_FIN) {
if (options.slave_mode != 1) {
if (second >= options.run_time) {
options.s_state.store(STATE_FIN);
qps = calc_qps(nm_get_uptime_ns());
calc_stats(nm_get_uptime_ns(), &qps,
&total_pkts, &total_loss);
break;
}
usleep(1 * S2US);
@ -795,7 +922,14 @@ main(int argc, char *argv[])
}
if (options.slave_mode != 1) {
fprintf(stdout, "main: total QPS = %d\n", qps);
fprintf(stdout, "main: total QPS = %d, packet loss = %2f%%\n",
qps, (double)total_loss / (double)total_pkts);
}
for (auto each : options.s_thr_info) {
delete each->load_gen;
delete each->ia_gen;
delete each;
}
// clean up

scripts/compile.sh Normal file → Executable file


@ -12,6 +12,8 @@ class khat_parser:
self.c_hrx = 0
self.c_stx = 0
self.c_srx = 0
self.total = 0
self.loss = 0
self.qps = 0
def __init__(self):
@ -33,11 +35,15 @@ class khat_parser:
first = True
for line in output.splitlines():
# the first line is qps,total,loss
cells = line.split(',')
if (first):
self.qps = int(line)
if len(cells) != 3:
raise Exception("Invalid headline:" + line)
self.qps = int(cells[0])
self.total = int(cells[1])
self.loss = int(cells[2])
first = False
continue
cells = line.split(',')
if len(cells) != 8:
raise Exception("Invalid line:" + line)
pt = self.pt()


@ -167,7 +167,7 @@ def keep_results():
tc.log_print(mvcmd)
sp.check_call(mvcmd, shell=True)
tc.log_print("=== Latency Summary:")
tc.log_print("=== Summary - qps: " + str(parser.qps) + " packet loss: " + str(float(parser.loss) / float(parser.total)) * 100.0 + "%" )
tc.log_print("=== Server HW:")
tc.log_print(par.mutilate_data.build_mut_output(parser.srv_hwlat, [parser.qps]) + "\n")
tc.log_print("=== Server SW:")