libnm refactor and numa allocator support.

khat threads now have numa-local memory.
quackerd committed 2021-03-03 22:22:06 -05:00
parent b85777e6f0, commit 7fd7c7f776
16 changed files with 863 additions and 544 deletions
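In short: libnm gains a private topology layer (libnm/topo.cc), a per-NUMA-node allocator (libnm/alloc.cc), and the memory load generator moves into libnm/loadgen.cc; khat's per-thread state and scratch buffers are now allocated on the NUMA node that owns the worker's core. A minimal sketch of the intended call sequence, using only names that appear in the diff below (cpuset, options and CACHELINE_SIZE are khat's own option/state variables):

// Hedged sketch; mirrors what net/khat.cc does in this commit.
if (nm_init(/*verbosity=*/0) != 0)
	rte_exit(EXIT_FAILURE, "failed to init libnm\n");

uint32_t lcore_id = cmask_get_next_cpu(&cpuset);    // next core from the -A mask
uint32_t node_id = get_node_from_core(lcore_id);    // NUMA node owning that core
auto *tinfo = (struct thread_info *)nm_malloc(node_id, sizeof(struct thread_info));
tinfo->cache_lines = nm_malloc(node_id, CACHELINE_SIZE * options.thread_cacheline_cnt);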

.gitignore

@ -268,4 +268,7 @@ cython_debug/
# Executables
*.exe
*.out
*.app
*.clangd
compile_commands.json

CMakeLists.txt

@ -29,32 +29,31 @@ set(LIBNM_CC_FLAGS -O2 -g -Wall -Wextra -Werror -std=c++11 -mavx -msse4)
set(LIBNTR_C_FLAGS -O2 -g -Wall -Wextra -Werror -std=c11)
set(LIBGEN_CC_FLAGS -O2 -g -Wall -Wextra -Werror -std=c++11)
set(KHAT_LINKLIBS pthread nm ntr)
set(CAT_LINKLIBS pthread nm ntr gen)
set(RAT_LINKLIBS pthread nm ntr gen)
add_library(nm libnm/nm.cc)
target_link_libraries(nm ${Hwloc_LIBRARIES} gen)
target_compile_options(nm PRIVATE ${LIBNM_CC_FLAGS})
set(KHAT_LINKLIBS pthread nm ntr ${dpdk_LIBRARIES})
set(CAT_LINKLIBS pthread nm ntr gen ${dpdk_LIBRARIES})
set(RAT_LINKLIBS pthread nm ntr gen ${dpdk_LIBRARIES})
add_library(ntr libntr/ntr.c)
target_compile_options(ntr PRIVATE ${LIBNTR_C_FLAGS})
add_library(gen libgen/generator.cc)
target_link_libraries(gen ${Hwloc_LIBRARIES})
target_compile_options(gen PRIVATE ${LIBGEN_CC_FLAGS})
add_executable(khat khat/khat.cc)
target_link_libraries(khat ${dpdk_LIBRARIES} ${KHAT_LINKLIBS})
add_library(nm libnm/nm.cc libnm/alloc.cc libnm/loadgen.cc libnm/topo.cc)
target_link_libraries(nm gen ${Hwloc_LIBRARIES})
target_compile_options(nm PRIVATE ${LIBNM_CC_FLAGS})
add_executable(khat net/khat.cc)
target_link_libraries(khat ${KHAT_LINKLIBS})
target_compile_options(khat PRIVATE ${CC_FLAGS})
target_link_options(khat PRIVATE -L /usr/local/lib)
add_executable(cat cat/cat.cc)
target_link_libraries(cat ${dpdk_LIBRARIES} ${CAT_LINKLIBS})
add_executable(cat net/cat.cc)
target_link_libraries(cat ${CAT_LINKLIBS})
target_compile_options(cat PRIVATE ${CC_FLAGS})
target_link_options(cat PRIVATE -L /usr/local/lib)
add_executable(rat rat/rat.cc)
target_link_libraries(rat ${dpdk_LIBRARIES} ${RAT_LINKLIBS})
add_executable(rat net/rat.cc)
target_link_libraries(rat ${RAT_LINKLIBS})
target_compile_options(rat PRIVATE ${CC_FLAGS})
target_link_options(rat PRIVATE -L /usr/local/lib)

inc/defs.h (new file)

@ -0,0 +1,31 @@
#pragma once
#include <cstdint>
#include <cstring>
#include <immintrin.h>
#define DISALLOW_EVIL_CONSTRUCTORS(TypeName) \
TypeName(const TypeName &) = delete; \
void operator=(const TypeName &) = delete
constexpr static unsigned long S2NS = 1000000000UL;
constexpr static unsigned long S2US = 1000000UL;
constexpr static unsigned long MS2NS = 1000000UL;
constexpr static int NEXT_CPU_NULL = -1;
static inline int
cmask_get_next_cpu(uint64_t *mask)
{
int ffs = ffsll(*mask);
*mask &= ~(1ul << (ffs - 1));
return ffs - 1;
}
static inline int
cmask_get_num_cpus(const uint64_t mask)
{
return _mm_popcnt_u64(mask);
}
#define ATTR_UNUSED __attribute__((unused))
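A quick illustration of the two helpers above, which the tools below use to walk affinity masks (the mask value is made up):

// e.g. mask 0xAA has CPUs 1, 3, 5, 7 set
uint64_t mask = 0xAA;
int ncpus = cmask_get_num_cpus(mask);   // popcount -> 4
int cpu;
while ((cpu = cmask_get_next_cpu(&mask)) != NEXT_CPU_NULL) {
	// yields 1, 3, 5, 7; each call clears the lowest set bit
}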


@ -19,8 +19,6 @@
#include <stdlib.h>
#include <string.h>
#include "util.h"
#include <string>
#include <utility>
#include <vector>

net/pkt.h

@ -21,6 +21,14 @@
#define IP_VHL_DEF (IP_VERSION | IP_HDRLEN)
#define IP_ADDR_FMT_SIZE 15
constexpr static uint32_t MAX_JUMBO_MTU = 9000;
constexpr static uint32_t MAX_STANDARD_MTU = 1500;
static inline int mtu_to_pkt_size(int mtu)
{
return mtu + RTE_ETHER_HDR_LEN + RTE_ETHER_CRC_LEN;
}
constexpr static uint32_t ETHER_FRAME_MAGIC = 0xDCDCE5E5;
const static struct rte_ether_addr POU_MAC {
0x01, 0x00, 0x5e, 0x00, 0x01, 0x81
@ -135,7 +143,8 @@ str_to_netspec(char *str, struct net_spec *out)
constexpr static uint16_t PKT_TYPE_LOAD = 0;
struct pkt_payload_load {
uint32_t epoch;
uint32_t load;
uint32_t which; // which word (thread)
uint32_t load; // how many cache lines
};
constexpr static uint16_t PKT_TYPE_PROBE = 1;
@ -305,21 +314,26 @@ is_l2ts_pkt(uint16_t type)
// fills the packet with the information except for the payload itself
static inline struct pkt_hdr *
construct_pkt_hdr(
struct rte_mbuf *buf, uint16_t type, const struct conn_spec *conn)
struct rte_mbuf *buf, uint16_t type, const struct conn_spec *conn, int pkt_pad_sz)
{
rte_pktmbuf_reset(buf);
const uint32_t total_sz = sizeof(struct pkt_hdr) +
int total_sz = sizeof(struct pkt_hdr) +
expected_payload_size[type];
if (pkt_pad_sz > total_sz) {
total_sz = pkt_pad_sz;
}
auto pkt_data = (struct pkt_hdr *)rte_pktmbuf_append(buf, total_sz);
if (pkt_data == nullptr)
return nullptr;
struct rte_ether_hdr *eth_hdr;
struct rte_ipv4_hdr *ipv4_hdr;
struct rte_udp_hdr *udp_hdr;
bool is_ts_pkt = is_l2ts_pkt(type);
if (pkt_data == nullptr)
return nullptr;
// single segment
buf->nb_segs = 1;
@ -349,8 +363,7 @@ construct_pkt_hdr(
} else {
ipv4_hdr->dst_addr = rte_cpu_to_be_32(conn->dst->ip);
}
ipv4_hdr->total_length = rte_cpu_to_be_16(
sizeof(struct pkt_hdr) - sizeof(struct rte_ether_hdr));
ipv4_hdr->total_length = rte_cpu_to_be_16(total_sz - sizeof(struct rte_ether_hdr) - sizeof(struct rte_ipv4_hdr));
ipv4_hdr->hdr_checksum = 0;
buf->l3_len = sizeof(struct rte_ipv4_hdr);
@ -363,9 +376,7 @@ construct_pkt_hdr(
udp_hdr->dst_port = rte_cpu_to_be_16(conn->dst_port);
}
udp_hdr->dgram_cksum = 0; /* No UDP checksum. */
udp_hdr->dgram_len = rte_cpu_to_be_16(sizeof(struct pkt_hdr) +
expected_payload_size[type] - sizeof(struct rte_ether_hdr) -
sizeof(struct rte_udp_hdr));
udp_hdr->dgram_len = total_sz - sizeof(struct rte_ether_hdr) - sizeof(struct rte_ipv4_hdr) - sizeof(struct rte_udp_hdr);
buf->l4_len = sizeof(struct rte_udp_hdr);
if (is_ts_pkt) {
@ -386,7 +397,7 @@ construct_pkt_hdr(
// returns 0 on success
static inline int
alloc_pkt_hdr(struct rte_mempool *pool, uint16_t type,
const struct conn_spec *conn, struct rte_mbuf **mbuf_out,
const struct conn_spec *conn, int pkt_pad_sz, struct rte_mbuf **mbuf_out,
struct pkt_hdr **hdr_out)
{
struct pkt_hdr *hdr;
@ -402,7 +413,7 @@ alloc_pkt_hdr(struct rte_mempool *pool, uint16_t type,
// print_mac(&conn->dst->mac_addr);
// printf("\n");
hdr = construct_pkt_hdr(pkt, type, conn);
hdr = construct_pkt_hdr(pkt, type, conn, pkt_pad_sz);
if (hdr == nullptr) {
rte_pktmbuf_free(pkt);
return -1;
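Two details of the additions above are worth spelling out: mtu_to_pkt_size() only adds the L2 framing (14-byte Ethernet header plus 4-byte CRC), and pkt_pad_sz takes effect only when it exceeds the packet's natural header-plus-payload size. Worked numbers, assuming the usual DPDK constants:

// mtu_to_pkt_size(MAX_STANDARD_MTU) == 1500 + 14 + 4 == 1518
// mtu_to_pkt_size(MAX_JUMBO_MTU)    == 9000 + 14 + 4 == 9018
// in construct_pkt_hdr():
//   total_sz = sizeof(struct pkt_hdr) + expected_payload_size[type];
//   if (pkt_pad_sz > total_sz) total_sz = pkt_pad_sz;  // padding grows the frame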

net/util.h

@ -2,31 +2,20 @@
#include <rte_ethdev.h>
#include <rte_ip.h>
#define DISALLOW_EVIL_CONSTRUCTORS(TypeName) \
TypeName(const TypeName &) = delete; \
void operator=(const TypeName &) = delete
constexpr static unsigned long S2NS = 1000000000UL;
constexpr static unsigned long S2US = 1000000UL;
constexpr static unsigned long MS2NS = 1000000UL;
constexpr static uint16_t MIN_RANDOM_PORT = 1000;
constexpr static uint16_t DEFAULT_RAT_PORT = 1234;
constexpr static unsigned int INIT_DELAY = 1;
constexpr static unsigned int MAX_NODES = 64;
constexpr static int NEXT_CPU_NULL = -1;
static inline int
cmask_get_next_cpu(uint64_t *mask)
static inline void
tx_burst_all(int portid, int txqid, struct rte_mbuf ** tx_bufs, int sz)
{
int ffs = ffsll(*mask);
*mask &= ~(1ul << (ffs - 1));
return ffs - 1;
}
static inline int
cmask_get_num_cpus(const uint64_t mask)
{
return _mm_popcnt_u64(mask);
int remaining = sz;
while(remaining > 0) {
remaining -= rte_eth_tx_burst(
portid, txqid, &tx_bufs[sz - remaining],
remaining);
}
}
// constexpr static int LATENCY_MEASURE_TIMES = 10000;
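tx_burst_all() replaces the earlier pattern of calling rte_eth_tx_burst() once and aborting on a partial send: it simply retries from the first unsent mbuf until everything is queued. A hedged sketch of the typical send path in this tree, combining it with alloc_pkt_hdr() from net/pkt.h (pool, cspec, epoch, portid and txqid are assumed to be set up as in rat.cc):

struct rte_mbuf *mbuf;
struct pkt_hdr *hdr;
if (alloc_pkt_hdr(pool, PKT_TYPE_LOAD, &cspec, /*pkt_pad_sz=*/0, &mbuf, &hdr) != 0)
	rte_exit(EXIT_FAILURE, "failed to allocate load packet\n");
auto *pld = (struct pkt_payload_load *)hdr->payload;
pld->epoch = rte_cpu_to_be_32(epoch);   // payload fields are stored big-endian
tx_burst_all(portid, txqid, &mbuf, 1);  // retries until the mbuf is queued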

nm.h

@ -1,38 +1,59 @@
#pragma once
#include "gen.h"
#include "util.h"
#include "defs.h"
#include <atomic>
#include <cstdint>
#include <vector>
constexpr static int NM_LEVEL_NUMA = 0;
constexpr static int NM_LEVEL_CPU = 1;
constexpr static int NM_LEVEL_CORE = 2;
constexpr static unsigned int NM_LEVEL_NUMA = 0;
constexpr static unsigned int NM_LEVEL_CPU = 1;
constexpr static unsigned int NM_LEVEL_CORE = 2;
constexpr static unsigned int NM_MAX_LEVEL = NM_LEVEL_CORE + 1;
struct nm_obj {
int level;
int id;
struct nm_obj *parent;
std::vector<struct nm_obj *> children;
};
constexpr static int NM_MAX_OBJS_PER_LVL = 256;
std::vector<struct nm_obj *> *nm_get_nodes();
std::vector<struct nm_obj *> *nm_get_cpus();
std::vector<struct nm_obj *> *nm_get_cores();
// misc functions
// 0 on success
// -1 on error
// 0 on success -1 on error
int nm_init(int verbosity);
uint64_t nm_tsc2ns(uint64_t tsc);
uint64_t nm_get_uptime_ns();
// topology stuff
struct nm_obj;
struct nm_obj *
nm_find_parent_obj(struct nm_obj * start, int parent_level);
int
nm_obj_count(int level);
struct nm_obj *
nm_obj_from_id(int level, int id);
struct nm_obj *
nm_obj_find_parent(struct nm_obj * start, int parent_level);
int
nm_obj_get_id(struct nm_obj * obj);
int
nm_obj_get_level(struct nm_obj * obj);
static inline int
get_node_from_core(int coreid)
{
return nm_obj_get_id(nm_obj_find_parent(nm_obj_from_id(NM_LEVEL_CORE, coreid), NM_LEVEL_NUMA));
}
// memload generator
class memload_generator {
private:
DISALLOW_EVIL_CONSTRUCTORS(memload_generator);
constexpr static uint32_t FROM_REGION_CNT = 0x2; // 2 regions
struct thread_info {
pthread_t pthr;
std::atomic<uint64_t> num_trans;
@ -42,14 +63,13 @@ class memload_generator {
constexpr static int INIT_START = 0;
constexpr static int INIT_SUCCESS = 1;
constexpr static int INIT_FAILED = 2;
void *region;
void *from_region;
void *to_region;
uint32_t to_domainid;
uint32_t from_domainid;
};
constexpr static uint32_t REGION_ZS = 0x1000;
constexpr static uint32_t TRANSACTION_CNT =
0x8; // how many transactions per cycle
constexpr static uint32_t TRANSACTION_SZ =
sizeof(uint64_t); // how large each transaction is
constexpr static uint32_t TRANSACTION_SZ = 0x2000; // transaction sz must >= region_sz
constexpr static uint32_t REGION_SZ = 0x2000 * 0x2000; // 64MB per core
std::vector<struct thread_info *> thr_infos;
std::atomic<int> state;
@ -70,3 +90,10 @@ class memload_generator {
uint64_t get_bps();
~memload_generator();
};
// allocators
void *
nm_malloc(unsigned int node, size_t size);
void
nm_free(unsigned int node, void * addr);
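For orientation, a hedged sketch of driving the memload_generator declared above; khat constructs it from its -x/-X/-b options, and the mask values here are purely illustrative:

bool success;
// workers pinned to the cores in 0xF0, copying into memory local to core 0's
// NUMA node, targeting roughly 1 GiB/s in aggregate
memload_generator mlg(/*from_cmask=*/0xF0, /*to_cmask=*/0x1, /*bps=*/1UL << 30, &success);
if (success) {
	mlg.start();
	// ... run the measurement ...
	mlg.stop();
	printf("memload achieved %lu bytes/s\n", mlg.get_bps());
}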

libnm/alloc.cc (new file)

@ -0,0 +1,96 @@
#include <pthread.h>
#include <sys/types.h>
#include <sys/cpuset.h>
#include <sys/domainset.h>
#include <sys/thr.h>
#include <sys/mman.h>
#include <cerrno>
#include "nmp.h"
static pthread_mutex_t alloc_lock;
static int nm_mem_idx[NM_MAX_OBJS_PER_LVL];
static void* nm_mem_regions[NM_MAX_OBJS_PER_LVL];
static constexpr unsigned int MEM_OBJ_SIZE = 4096; // 4k
static constexpr unsigned int MEM_OBJ_NUM = 1024 * 256; // 4k * 1024 * 256 = 1024MB per node
int
nm_alloc_init()
{
long tid;
thr_self(&tid);
domainset_t orig_dom;
int orig_policy;
pthread_mutex_init(&alloc_lock, nullptr);
DOMAINSET_ZERO(&orig_dom);
// save existing thread's allocation strategy
int ret = cpuset_getdomain(CPU_LEVEL_WHICH, CPU_WHICH_TID, tid, sizeof(orig_dom), &orig_dom, &orig_policy);
if (ret != 0) {
return ret;
}
domainset_t tmp_domain;
for (int i = 0; i < nm_obj_count(NM_LEVEL_NUMA); i++) {
DOMAINSET_ZERO(&tmp_domain);
DOMAINSET_SET(i, &tmp_domain);
ret = cpuset_setdomain(CPU_LEVEL_WHICH, CPU_WHICH_TID, tid, sizeof(tmp_domain), &tmp_domain, DOMAINSET_POLICY_PREFER);
if (ret != 0) {
if (nm_get_verbose() > 0) {
fprintf(stdout, "libnm: cpuset_setdomain failed with %d\n", errno);
}
return ret;
}
if ((nm_mem_regions[i] = mmap(nullptr, MEM_OBJ_NUM * MEM_OBJ_SIZE, PROT_READ | PROT_WRITE,
MAP_ANON | MAP_ALIGNED_SUPER | MAP_NOCORE | MAP_PRIVATE,
-1, 0)) == MAP_FAILED) {
if (nm_get_verbose() > 0) {
fprintf(stdout, "libnm: mmap failed with %d\n", errno);
}
return -1;
}
// touch the pages to prefault the pages
for (unsigned int j = 0; j < MEM_OBJ_NUM; j++) {
*(uint32_t*)((char*)nm_mem_regions[i] + j * MEM_OBJ_SIZE) = 0;
}
nm_mem_idx[i] = 0;
if (nm_get_verbose() > 0) {
fprintf(stdout, "libnm: reserved %u bytes (%u MB) on node %d\n", MEM_OBJ_NUM * MEM_OBJ_SIZE, MEM_OBJ_SIZE * MEM_OBJ_NUM / 1024 / 1024, i);
}
}
// restore existing thread's allocation strategy
ret = cpuset_setdomain(CPU_LEVEL_WHICH, CPU_WHICH_TID, tid, sizeof(orig_dom), &orig_dom, orig_policy);
return ret;
}
void *
nm_malloc(unsigned int node, size_t size)
{
void * ret = nullptr;
int num_objs = (size + MEM_OBJ_SIZE - 1) / MEM_OBJ_SIZE;
pthread_mutex_lock(&alloc_lock);
if ((int)MEM_OBJ_NUM - nm_mem_idx[node] >= num_objs) {
ret = (char*)nm_mem_regions[node] + MEM_OBJ_SIZE * nm_mem_idx[node];
nm_mem_idx[node] += num_objs;
}
pthread_mutex_unlock(&alloc_lock);
return ret;
}
void
nm_free(unsigned int node ATTR_UNUSED, void * addr ATTR_UNUSED)
{
// dummy function
}
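The allocator above is deliberately simple: nm_alloc_init() reserves and pre-faults one superpage-backed region per NUMA node (MEM_OBJ_NUM * MEM_OBJ_SIZE = 262144 * 4 KiB = 1 GiB) under a PREFER domain policy, nm_malloc() bumps a per-node index in whole 4 KiB objects, and nm_free() is a no-op, so memory is only returned when the process exits. A small illustration of the rounding:

void *a = nm_malloc(0, 100);   // rounds up: consumes one 4 KiB object on node 0
void *b = nm_malloc(0, 6000);  // consumes two objects (8 KiB), placed right after a
nm_free(0, a);                 // no-op; the region is never unmapped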

libnm/loadgen.cc (new file)

@ -0,0 +1,194 @@
#include "nmp.h"
#include <sys/cpuset.h>
#include <sys/domainset.h>
#include <sys/thr.h>
#include <pthread.h>
#include <pthread_np.h>
void *
memload_generator::worker_thrd(void *_tinfo)
{
auto *tinfo = (struct thread_info *)_tinfo;
long tid;
thr_self(&tid);
if (nm_get_verbose() > 0) {
fprintf(stdout,
"memload_generator <thread %ld>: from domain %d to %d\n",
tid, tinfo->from_domainid, tinfo->to_domainid);
}
tinfo->from_region = nm_malloc(tinfo->from_domainid, REGION_SZ * FROM_REGION_CNT);
tinfo->to_region = nm_malloc(tinfo->to_domainid, REGION_SZ * FROM_REGION_CNT);
if (tinfo->from_region == nullptr || tinfo->to_region == nullptr) {
tinfo->init_status.store(thread_info::INIT_FAILED);
if (nm_get_verbose() > 0) {
fprintf(stderr,
"memload_generator <thread %ld>: failed to allocate memory\n", tid);
}
return nullptr;
}
// populate the region with 1/2/3s
for(uint i = 0; i < FROM_REGION_CNT; i++) {
memset((char*)tinfo->from_region + i * REGION_SZ, i + 1, REGION_SZ);
}
tinfo->init_status.store(thread_info::INIT_SUCCESS);
if (nm_get_verbose() > 0) {
fprintf(stdout,
"memload_generator <thread %ld>: init finished, waiting for start...\n",
tid);
}
while (tinfo->state->load() == STATE_READY) {
};
if (nm_get_verbose() > 0) {
fprintf(
stdout, "memload_generator <thread %ld>: running...\n", tid);
}
uint64_t offset = 0;
uint64_t next_ts = nm_get_uptime_ns();
while (tinfo->state->load() == STATE_START) {
// generate traffic
uint64_t now = nm_get_uptime_ns();
if (now >= next_ts) {
next_ts = next_ts + tinfo->ia_gen->generate() * S2NS;
uint64_t to_offset = offset % REGION_SZ;
uint64_t from_offset = offset % (REGION_SZ * FROM_REGION_CNT);
memcpy((char *)tinfo->to_region + to_offset, (char *)tinfo->from_region + from_offset, TRANSACTION_SZ);
offset += TRANSACTION_SZ;
tinfo->num_trans.fetch_add(1);
}
}
nm_free(tinfo->from_domainid, tinfo->from_region);
nm_free(tinfo->to_domainid, tinfo->to_region);
if (nm_get_verbose() > 0) {
fprintf(
stdout, "memload_generator <thread %ld>: exiting...\n", tid);
}
return nullptr;
}
memload_generator::memload_generator(
uint64_t from_cmask, uint64_t to_cmask, uint64_t bps, bool *success)
{
*success = false;
state.store(STATE_READY);
int nextcore;
int to_coreid = cmask_get_next_cpu(&to_cmask);
int num_cores = cmask_get_num_cpus(from_cmask);
cpuset_t cpuset;
if (to_coreid == NEXT_CPU_NULL || num_cores == 0) {
return;
}
while ((nextcore = cmask_get_next_cpu(&from_cmask)) != NEXT_CPU_NULL) {
auto *info = new struct thread_info;
pthread_attr_t attr;
info->ia_gen = createGenerator("exponential");
info->ia_gen->set_lambda(((double)(bps) / (double)num_cores) /
(double)(REGION_SZ));
info->num_trans.store(0);
info->to_domainid = nm_obj_get_id(nm_obj_find_parent(nm_obj_from_id(NM_LEVEL_CORE, to_coreid), NM_LEVEL_NUMA));
info->from_domainid = nm_obj_get_id(nm_obj_find_parent(nm_obj_from_id(NM_LEVEL_CORE, nextcore), NM_LEVEL_NUMA));
info->init_status.store(thread_info::INIT_START);
info->state = &state;
CPU_ZERO(&cpuset);
CPU_SET(nextcore, &cpuset);
pthread_attr_init(&attr);
pthread_attr_setaffinity_np(&attr, sizeof(cpuset_t), &cpuset);
pthread_create(&info->pthr, &attr, worker_thrd, info);
if (nm_get_verbose() > 0) {
fprintf(stdout,
"memload_generator: created thread on core %d\n",
nextcore);
}
thr_infos.push_back(info);
}
if (nm_get_verbose() > 0) {
fprintf(
stdout, "memload_generator: waiting for thread init...\n");
}
bool failed = false;
uint num_success = 0;
while (num_success < thr_infos.size() && !failed) {
num_success = 0;
for (auto i : thr_infos) {
if (i->init_status.load() ==
thread_info::INIT_SUCCESS) {
num_success++;
}
if (i->init_status.load() == thread_info::INIT_FAILED) {
failed = true;
}
}
}
*success = num_success == thr_infos.size();
if (nm_get_verbose() > 0) {
fprintf(stdout,
"memload_generator: exiting constructor. Success: %d...\n",
success ? 1 : 0);
}
}
void
memload_generator::start()
{
if (this->state.load() == STATE_READY) {
state.store(STATE_START);
begin_ts = nm_get_uptime_ns();
}
}
void
memload_generator::stop()
{
if (this->state.load() != STATE_STOP) {
stop_ts = nm_get_uptime_ns();
state.store(STATE_STOP);
}
}
uint64_t
memload_generator::get_bps()
{
uint64_t now = state.load() == STATE_STOP ? stop_ts :
nm_get_uptime_ns();
uint64_t total_transactions = 0;
for (auto i : thr_infos) {
total_transactions += i->num_trans.load();
}
return (double)(TRANSACTION_SZ * total_transactions) /
(double)((now - begin_ts) / (S2NS));
}
memload_generator::~memload_generator()
{
if (this->state.load() != STATE_STOP) {
stop();
}
for (auto i : thr_infos) {
pthread_join(i->pthr, nullptr);
delete i->ia_gen;
delete i;
}
}
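Each worker above copies TRANSACTION_SZ bytes per transaction from its from_region (on the source node) into to_region (on the target node) whenever the exponential inter-arrival generator fires, and get_bps() reports the realized rate over whole elapsed seconds. Roughly:

// bytes_per_second = TRANSACTION_SZ * total_transactions / ((now - begin_ts) / S2NS)
// e.g. 1,000,000 transactions of 0x2000 (8 KiB) each over 10 s ~= 819 MB/s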

libnm/nm.cc

@ -1,63 +1,25 @@
#include <sys/types.h>
#include <sys/cpuset.h>
#include <sys/domainset.h>
#include <sys/mman.h>
#include <sys/sysctl.h>
#include <sys/thr.h>
#include <hwloc.h>
#include <util.h>
#include <x86intrin.h>
#include "nm.h"
#include <algorithm>
#include <atomic>
#include <vector>
#include <cerrno>
#include "nm.h"
#include "nmp.h"
#include "defs.h"
static const char *SYSCTL_TSC = "machdep.tsc_freq";
static int verbose = 0;
static uint64_t sysctl_tsc_freq = 0;
static bool
nm_obj_comparator(struct nm_obj *a, struct nm_obj *b)
{
return a->id < b->id;
}
static int verbose = 0;
static std::vector<struct nm_obj *> nodes;
static std::vector<struct nm_obj *> cores;
static std::vector<struct nm_obj *> cpus;
std::vector<struct nm_obj *> *
nm_get_nodes()
int nm_get_verbose()
{
return &nodes;
}
std::vector<struct nm_obj *> *
nm_get_cpus()
{
return &cpus;
}
std::vector<struct nm_obj *> *
nm_get_cores()
{
return &cores;
}
hwloc_obj_t
get_parent_type(hwloc_obj_t obj, hwloc_obj_type_t type)
{
while (obj != nullptr) {
if (obj->type == type) {
break;
}
obj = obj->parent;
}
return obj;
return verbose;
}
uint64_t
@ -88,8 +50,7 @@ nm_init(int verbosity)
SYSCTL_TSC, &sysctl_tsc_freq, &sz, nullptr, 0)) < 0) {
if (verbose) {
fprintf(stderr,
"libnm: failed to query tsc frequency via sysctl (%d)\n",
errno);
"libnm: failed to query tsc frequency via sysctl (%d)\n", errno);
}
return ret;
}
@ -98,296 +59,12 @@ nm_init(int verbosity)
fprintf(stdout, "libnm: tsc frequency: %lu\n", sysctl_tsc_freq);
}
// init numa stuff
hwloc_topology *topo;
if ((ret = hwloc_topology_init(&topo)) != 0) {
ret = nm_topo_init();
if (ret != 0) {
return ret;
}
if ((ret = hwloc_topology_load(topo)) != 0)
return ret;
// populate numa nodes
hwloc_obj_t obj = nullptr;
while (true) {
obj = hwloc_get_next_obj_by_type(topo, HWLOC_OBJ_PACKAGE, obj);
if (obj == nullptr) {
break;
}
auto each = new struct nm_obj;
each->id = obj->logical_index;
each->level = NM_LEVEL_NUMA;
each->parent = nullptr;
nodes.push_back(each);
if (verbose) {
fprintf(stdout, "libnm: identified NUMA node %d\n",
each->id);
}
}
std::sort(nodes.begin(), nodes.end(), nm_obj_comparator);
// populate cpus
obj = nullptr;
while (true) {
obj = hwloc_get_next_obj_by_type(topo, HWLOC_OBJ_CORE, obj);
if (obj == nullptr) {
break;
}
auto each = new struct nm_obj;
each->id = obj->logical_index;
each->level = NM_LEVEL_CPU;
hwloc_obj_t parent = get_parent_type(obj, HWLOC_OBJ_PACKAGE);
if (parent == nullptr) {
return -1;
}
// XXX: this faults if the OS decides to be stupid
each->parent = nodes.at(parent->logical_index);
each->parent->children.push_back(each);
cpus.push_back(each);
if (verbose) {
fprintf(stdout,
"libnm: identified CPU %d on NUMA node %d\n",
each->id, each->parent->id);
}
}
std::sort(cpus.begin(), cpus.end(), nm_obj_comparator);
// populate cores
obj = nullptr;
while (true) {
obj = hwloc_get_next_obj_by_type(topo, HWLOC_OBJ_PU, obj);
if (obj == nullptr) {
break;
}
auto each = new struct nm_obj;
each->id = obj->logical_index;
each->level = NM_LEVEL_CORE;
hwloc_obj_t parent = get_parent_type(obj, HWLOC_OBJ_CORE);
if (parent == nullptr) {
return -1;
}
// XXX: this faults if the OS decides to be stupid
each->parent = cpus.at(parent->logical_index);
each->parent->children.push_back(each);
cores.push_back(each);
if (verbose) {
fprintf(stdout,
"libnm: identified core %d on CPU %d, NUMA node %d\n",
each->id, each->parent->id,
each->parent->parent->id);
}
}
std::sort(cores.begin(), cores.end(), nm_obj_comparator);
ret = nm_alloc_init();
return ret;
}
void *
memload_generator::worker_thrd(void *_tinfo)
{
auto *tinfo = (struct thread_info *)_tinfo;
long id;
domainset_t set;
DOMAINSET_ZERO(&set);
DOMAINSET_SET(tinfo->to_domainid, &set);
// get the id
thr_self(&id);
if (verbose) {
fprintf(stdout,
"memload_generator <thread %ld>: setting domain to %d\n",
id, tinfo->to_domainid);
}
// set allocation strategy
if (cpuset_setdomain(CPU_LEVEL_WHICH, CPU_WHICH_TID, id,
sizeof(domainset_t), &set, DOMAINSET_POLICY_PREFER) != 0) {
tinfo->init_status.store(thread_info::INIT_FAILED);
if (verbose) {
printf(
"memload_generator <thread %ld>: cpuset_setdomain failed with %d\n",
id, errno);
}
return nullptr;
}
if (verbose) {
fprintf(stdout,
"memload_generator <thread %ld>: mmap() memory...\n", id);
}
// mmap anonymous page
if ((tinfo->region = mmap(nullptr, REGION_ZS, PROT_READ | PROT_WRITE,
MAP_ANON | MAP_ALIGNED_SUPER | MAP_NOCORE | MAP_PRIVATE |
MAP_PREFAULT_READ,
-1, 0)) == MAP_FAILED) {
tinfo->init_status.store(thread_info::INIT_FAILED);
if (verbose) {
printf(
"memload_generator <thread %ld>: mmap failed with %d\n",
id, errno);
}
return nullptr;
}
tinfo->init_status.store(thread_info::INIT_SUCCESS);
if (verbose) {
fprintf(stdout,
"memload_generator <thread %ld>: init finished, waiting for start...\n",
id);
}
while (tinfo->state->load() == STATE_READY) {
};
if (verbose) {
fprintf(
stdout, "memload_generator <thread %ld>: running...\n", id);
}
uint64_t next_ts = nm_get_uptime_ns();
while (tinfo->state->load() == STATE_START) {
// generate traffic
uint64_t now = nm_get_uptime_ns();
if (now >= next_ts) {
next_ts = next_ts + tinfo->ia_gen->generate() * S2NS;
for (uint i = 0; i < TRANSACTION_CNT; i++) {
// memcpy((uint64_t *)(char *)tinfo->region + i,
// &now, sizeof(now));
_mm_stream_si64(
(long long *)((char *)tinfo->region +
i * TRANSACTION_SZ),
(long long)now);
}
tinfo->num_trans.fetch_add(1);
}
}
munmap(tinfo->region, REGION_ZS);
if (verbose) {
fprintf(
stdout, "memload_generator <thread %ld>: exiting...\n", id);
}
return nullptr;
}
memload_generator::memload_generator(
uint64_t from_cmask, uint64_t to_cmask, uint64_t bps, bool *success)
{
*success = false;
state.store(STATE_READY);
int nextcore;
int to_coreid = cmask_get_next_cpu(&to_cmask);
int num_cores = cmask_get_num_cpus(from_cmask);
cpuset_t cpuset;
if (to_coreid == NEXT_CPU_NULL || num_cores == 0) {
return;
}
while ((nextcore = cmask_get_next_cpu(&from_cmask)) != NEXT_CPU_NULL) {
auto *info = new struct thread_info;
pthread_attr_t attr;
info->ia_gen = createGenerator("exponential");
info->ia_gen->set_lambda(((double)(bps) / (double)num_cores) /
(double)(TRANSACTION_CNT * TRANSACTION_SZ));
info->num_trans.store(0);
info->to_domainid = cores.at(to_coreid)->parent->parent->id;
info->init_status.store(thread_info::INIT_START);
info->state = &state;
CPU_ZERO(&cpuset);
CPU_SET(nextcore, &cpuset);
pthread_attr_init(&attr);
pthread_attr_setaffinity_np(&attr, sizeof(cpuset_t), &cpuset);
pthread_create(&info->pthr, &attr, worker_thrd, info);
if (verbose) {
fprintf(stdout,
"memload_generator: created thread on core %d\n",
nextcore);
}
thr_infos.push_back(info);
}
if (verbose) {
fprintf(
stdout, "memload_generator: waiting for thread init...\n");
}
bool failed = false;
uint num_success = 0;
while (num_success < thr_infos.size() && !failed) {
num_success = 0;
for (auto i : thr_infos) {
if (i->init_status.load() ==
thread_info::INIT_SUCCESS) {
num_success++;
}
if (i->init_status.load() == thread_info::INIT_FAILED) {
failed = true;
}
}
}
*success = num_success == thr_infos.size();
if (verbose) {
fprintf(stdout,
"memload_generator: exiting constructor. Success: %d...\n",
success ? 1 : 0);
}
}
void
memload_generator::start()
{
if (this->state.load() == STATE_READY) {
state.store(STATE_START);
begin_ts = nm_get_uptime_ns();
}
}
void
memload_generator::stop()
{
if (this->state.load() != STATE_STOP) {
stop_ts = nm_get_uptime_ns();
state.store(STATE_STOP);
}
}
uint64_t
memload_generator::get_bps()
{
uint64_t now = state.load() == STATE_STOP ? stop_ts :
nm_get_uptime_ns();
uint64_t total_transactions = 0;
for (auto i : thr_infos) {
total_transactions += i->num_trans.load();
}
return (double)(TRANSACTION_CNT * TRANSACTION_SZ * total_transactions) /
(double)((now - begin_ts) / (S2NS));
}
memload_generator::~memload_generator()
{
if (this->state.load() != STATE_STOP) {
stop();
}
for (auto i : thr_infos) {
pthread_join(i->pthr, nullptr);
delete i->ia_gen;
delete i;
}
}

libnm/nmp.h (new file)

@ -0,0 +1,9 @@
#pragma once
#include "nm.h"
int nm_topo_init();
int nm_alloc_init();
int nm_get_verbose();

libnm/topo.cc (new file)

@ -0,0 +1,205 @@
#include "nm.h"
#include "nmp.h"
#include <cstdio>
#include <cstdlib>
#include <hwloc.h>
constexpr static int NM_MAX_CHILDREN = 128;
struct nm_obj {
int level;
int id;
struct nm_obj *parent;
int num_children;
struct nm_obj * children[NM_MAX_CHILDREN];
};
static int size_tbl[NM_MAX_LEVEL] = { 0 };
static struct nm_obj * obj_tbl[NM_MAX_LEVEL][NM_MAX_OBJS_PER_LVL];
static hwloc_obj_t
get_parent_type(hwloc_obj_t obj, hwloc_obj_type_t type)
{
obj = obj->parent;
while (obj != nullptr) {
if (obj->type == type) {
return obj;
}
obj = obj->parent;
}
return nullptr;
}
// static int
// obj_comparator(const void * a, const void * b)
// {
// return ((struct nm_obj *)a)->id - ((struct nm_obj *)b)->id;
// }
static inline int
is_level_valid(int level)
{
return level < (int)NM_MAX_LEVEL;
}
int
nm_obj_count(int level)
{
if (is_level_valid(level)) {
return size_tbl[level];
}
return -1;
}
struct nm_obj *
nm_obj_from_id(int level, int id)
{
if (is_level_valid(level) && id <= size_tbl[level]) {
return obj_tbl[level][id];
}
return nullptr;
}
struct nm_obj *
nm_obj_find_parent(struct nm_obj * start, int parent_level)
{
struct nm_obj * ret = nullptr;
while (start != nullptr) {
if (parent_level == start->level) {
ret = start;
break;
}
start = start->parent;
}
return ret;
}
int
nm_obj_get_id(struct nm_obj * obj)
{
return obj->id;
}
int
nm_obj_get_level(struct nm_obj * obj)
{
return obj->level;
}
static int
validate_objs(int level)
{
for (int i = 0; i < size_tbl[level]; i++) {
struct nm_obj * each = obj_tbl[level][i];
if (each->id != i) {
return 0;
}
}
return 1;
}
static int
add_obj(int id, int level, struct nm_obj * parent)
{
if (size_tbl[level] >= NM_MAX_OBJS_PER_LVL) {
return -1;
}
auto each = (struct nm_obj *)malloc(sizeof(struct nm_obj));
each->id = id;
each->level = level;
each->num_children = 0;
each->parent = parent;
if (parent != nullptr) {
// add children
if (parent->num_children >= NM_MAX_CHILDREN) {
return -1;
} else {
parent->children[parent->num_children] = each;
parent->num_children++;
}
}
obj_tbl[level][size_tbl[level]] = each;
size_tbl[level]++;
return 0;
}
static int
add_level(hwloc_topology * topo, hwloc_obj_type_t hwloc_level, hwloc_obj_type_t hwloc_plevel, int level, int plevel)
{
// populate numa nodes
hwloc_obj_t obj = nullptr;
hwloc_obj_t pobj = nullptr;
struct nm_obj *parent = nullptr;
while (true) {
obj = hwloc_get_next_obj_by_type(topo, hwloc_level, obj);
if (obj == nullptr) {
break;
}
pobj = get_parent_type(obj, hwloc_plevel);
if (pobj != nullptr) {
parent = obj_tbl[plevel][pobj->logical_index];
}
if (add_obj(obj->logical_index, level, parent) != 0) {
if (nm_get_verbose() > 0) {
fprintf(stderr, "libnm: failed to add object %d.\n", obj->logical_index);
}
return -1;
}
if (nm_get_verbose() > 0) {
fprintf(stdout, "libnm: identified id %d type %d parent %d type %d\n", obj->logical_index, level, parent == nullptr ? -1 : parent->id, plevel);
}
}
// sort
// std::qsort(obj_tbl[level], size_tbl[level], sizeof(struct nm_obj), obj_comparator);
if (!validate_objs(level)) {
if (nm_get_verbose() > 0) {
fprintf(stdout, "libnm: objects are shuffled at level %d.\n", level);
}
return -1;
}
return 0;
}
int
nm_topo_init()
{
int ret;
// init numa stuff
hwloc_topology *topo;
if ((ret = hwloc_topology_init(&topo)) != 0) {
return ret;
}
if ((ret = hwloc_topology_load(topo)) != 0)
return ret;
if ((ret = add_level(topo, HWLOC_OBJ_PACKAGE, HWLOC_OBJ_PACKAGE, NM_LEVEL_NUMA, NM_LEVEL_NUMA)) != 0) {
return ret;
}
if ((ret = add_level(topo, HWLOC_OBJ_CORE, HWLOC_OBJ_PACKAGE, NM_LEVEL_CPU, NM_LEVEL_NUMA)) != 0) {
return ret;
}
if ((ret = add_level(topo, HWLOC_OBJ_PU, HWLOC_OBJ_CORE, NM_LEVEL_CORE, NM_LEVEL_CPU)) != 0) {
return ret;
}
return ret;
}
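Note that the level names are shifted by one relative to hwloc: packages are registered as NM_LEVEL_NUMA, hwloc cores as NM_LEVEL_CPU, and hwloc PUs (hardware threads) as NM_LEVEL_CORE. A hedged query sketch using the accessors from nm.h (assumes nm_init() has run):

int nodes = nm_obj_count(NM_LEVEL_NUMA);                 // number of packages
struct nm_obj *hwt = nm_obj_from_id(NM_LEVEL_CORE, 5);   // hardware thread 5
int node = nm_obj_get_id(nm_obj_find_parent(hwt, NM_LEVEL_NUMA));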

net/cat.cc

@ -13,8 +13,8 @@
#include "gen.h"
#include "nm.h"
#include "ntr.h"
#include "pkt.h"
#include "util.h"
#include "net/pkt.h"
#include "net/util.h"
#include <atomic>
#include <ctime>
@ -224,7 +224,7 @@ send_all_slaves(uint16_t type)
for (unsigned int i = 0; i < options.slaves.size(); i++) {
struct pkt_hdr *hdr;
cspec.dst = options.slaves.at(i);
if (alloc_pkt_hdr(options.mbuf_pool, type, &cspec, &tx_bufs[i],
if (alloc_pkt_hdr(options.mbuf_pool, type, &cspec, 0, &tx_bufs[i],
&hdr) != 0) {
rte_exit(EXIT_FAILURE, "failed to alloc packet\n");
}
@ -339,7 +339,7 @@ wait_for_slaves(uint16_t etype, struct rte_mbuf **out)
if (now - start > SLAVES_MAX_WAIT_MS * MS2NS) {
rte_exit(
EXIT_FAILURE, "waiting for too long. I QUIT!!");
EXIT_FAILURE, "cat: waiting for too long %d. I QUIT!!", etype);
}
}
}
@ -554,7 +554,7 @@ pkt_loop()
options.s_host_conn.src_port = port_gen.next();
if (alloc_pkt_hdr(options.mbuf_pool,
PKT_TYPE_PROBE, &options.s_host_conn,
PKT_TYPE_PROBE, &options.s_host_conn, 0,
&tx_buf, &pkt_data) != 0) {
rte_exit(EXIT_FAILURE,
"failed to alloc probe packet.\n");
@ -663,7 +663,6 @@ port_init(uint16_t portid, struct rte_mempool *mbuf_pool)
uint16_t nb_rxd = RX_RING_SIZE;
uint16_t nb_txd = TX_RING_SIZE;
port_conf.rxmode.max_rx_pkt_len = RTE_ETHER_MAX_LEN;
if (!rte_eth_dev_is_valid_port(portid)) {
return -1;

net/khat.cc

@ -11,8 +11,8 @@
#include "nm.h"
#include "ntr.h"
#include "pkt.h"
#include "util.h"
#include "net/pkt.h"
#include "net/util.h"
#include <atomic>
#include <ctime>
@ -23,6 +23,8 @@ constexpr static unsigned int MBUF_CACHE_SIZE = 512;
constexpr static unsigned int RX_RING_SIZE = 2048;
constexpr static unsigned int TX_RING_SIZE = 2048;
constexpr static unsigned int BURST_SIZE = 32;
constexpr static unsigned int CACHELINE_SIZE = 64;
constexpr static uint16_t THREAD_LOAD_BUFFER_SZ = 16384;
constexpr static size_t MEMPOOL_NAME_BUF_LEN = 64;
static const struct rte_mbuf_dynfield rte_mbuf_dynfield_probe_flag = {
@ -52,6 +54,7 @@ struct thread_info {
int txqid;
int lcore_id;
int node_id;
void * cache_lines;
};
// state machine:
@ -75,6 +78,9 @@ struct options_t {
uint64_t cpuset { 0x4 }; // 2nd core
uint64_t memmask { 0x0 }; // same socket as the NIC
char mempool_name_buf[MEMPOOL_NAME_BUF_LEN];
bool jumbo_frame_enabled { false }; // setting this to true changes mbuf size and mtu
int port_mtu { MAX_STANDARD_MTU };
int thread_cacheline_cnt = { 128 };
bool mlg_enabled { false };
uint64_t mlg_bps { 0 };
uint64_t mlg_cmask { 0 };
@ -210,6 +216,8 @@ locore_main(void *ti)
// when all tx timestamps are ready
struct rte_mbuf *tx_bufs[BURST_SIZE];
struct pkt_hdr *pkt_data;
char * thrd_load_buffer = (char *)nm_malloc(get_node_from_core(tinfo->lcore_id), THREAD_LOAD_BUFFER_SZ);
// XXX: hack hardcode to be larger than MTU
bool pending_probe = false;
@ -281,7 +289,7 @@ locore_main(void *ti)
options
.s_mempools[tinfo->node_id],
PKT_TYPE_PROBE_RESP,
&options.s_probe_info.cspec,
&options.s_probe_info.cspec, 0,
&pkt_buf, &tx_data) != 0) {
rte_exit(EXIT_FAILURE,
"failed to allocate pkt\n");
@ -304,16 +312,33 @@ locore_main(void *ti)
struct net_spec src;
struct net_spec dst;
// touch the unused data to pretend that we read those dummy fields
memcpy(thrd_load_buffer, pkt_data->payload, MIN(pkt_buf->data_len - sizeof(struct pkt_hdr), THREAD_LOAD_BUFFER_SZ));
// perform the load
// auto pld = (struct pkt_payload_load *)pkt_data->payload;
// uint32_t which = rte_be_to_cpu_32(pld->which);
// uint32_t load = rte_be_to_cpu_32(pld->load);
// uint32_t thrd = (which & 0xFFFF) % options.s_thr_info.size();
// uint32_t start = (which >> 16) % options.thread_cacheline_cnt;
// ntr(NTR_DEP_USER1, NTR_LEVEL_DEBUG, "", );
// for (uint j = 0; j < load; j++) {
// *(uint32_t *)
// start = (start + 1) % options.thread_cacheline_cnt;
// }
// reply
pkt_hdr_to_netspec(pkt_data, &src,
&cspec.dst_port, &dst, &cspec.src_port);
cspec.dst = &src;
cspec.src = &dst;
// printf("LOAD PKT SIZE: %d\n", bufs[i]->data_len);
// we reply to load packet regardless of the
// server state
if (alloc_pkt_hdr(
options.s_mempools[tinfo->node_id],
PKT_TYPE_LOAD_RESP, &cspec, &pkt_buf,
PKT_TYPE_LOAD_RESP, &cspec, 0, &pkt_buf,
&tx_data) != 0) {
rte_exit(EXIT_FAILURE,
"failed to allocate pkt\n");
@ -336,15 +361,8 @@ locore_main(void *ti)
rte_pktmbuf_free(bufs[i]);
}
// send the packets
if (nb_tx > 0) {
const uint16_t nb_tx_succ = rte_eth_tx_burst(
options.s_portid, tinfo->txqid, tx_bufs, nb_tx);
if (nb_tx_succ < nb_tx) {
rte_exit(EXIT_FAILURE,
"failed to send some packets.\n");
}
}
// send all packets
tx_burst_all(options.s_portid, tinfo->txqid, tx_bufs, nb_tx);
// we wanna check every loop not only when there are packets
if (pending_probe) {
@ -362,8 +380,8 @@ locore_main(void *ti)
if (alloc_pkt_hdr(
options.s_mempools[tinfo->node_id],
PKT_TYPE_STAT,
&options.s_probe_info.cspec, &pkt_buf,
&tx_data) != 0) {
&options.s_probe_info.cspec, 0,
&pkt_buf, &tx_data) != 0) {
rte_exit(EXIT_FAILURE,
"failed to alloc pkt_buf\n");
}
@ -383,11 +401,7 @@ locore_main(void *ti)
options.s_probe_info.last_sw_tx);
// send the packet
if (rte_eth_tx_burst(options.s_portid,
tinfo->txqid, &pkt_buf, 1) < 1) {
rte_exit(EXIT_FAILURE,
"failed to send some packets.\n");
}
tx_burst_all(options.s_portid, tinfo->txqid, &pkt_buf, 1);
// release flux
pending_probe = false;
@ -401,6 +415,8 @@ locore_main(void *ti)
}
}
}
delete[] thrd_load_buffer;
}
static int
@ -426,7 +442,7 @@ port_init(uint16_t portid, struct rte_mempool *mbuf_pool)
return ret;
}
port_conf.rxmode.max_rx_pkt_len = RTE_ETHER_MAX_LEN;
port_conf.rxmode.max_rx_pkt_len = mtu_to_pkt_size(options.port_mtu);
port_conf.rxmode.mq_mode = ETH_MQ_RX_RSS;
port_conf.rx_adv_conf.rss_conf.rss_hf = ETH_RSS_NONFRAG_IPV4_UDP |
ETH_RSS_L2_PAYLOAD | ETH_RSS_NONFRAG_IPV4_TCP;
@ -437,6 +453,9 @@ port_init(uint16_t portid, struct rte_mempool *mbuf_pool)
port_conf.txmode.offloads |= DEV_TX_OFFLOAD_UDP_CKSUM;
port_conf.txmode.offloads |= DEV_TX_OFFLOAD_IPV4_CKSUM;
port_conf.txmode.offloads |= DEV_TX_OFFLOAD_MBUF_FAST_FREE;
if (options.jumbo_frame_enabled) {
port_conf.rxmode.offloads |= DEV_RX_OFFLOAD_JUMBO_FRAME;
}
/* Configure the Ethernet device. */
ret = rte_eth_dev_configure(
@ -450,6 +469,10 @@ port_init(uint16_t portid, struct rte_mempool *mbuf_pool)
/* Allocate and set up 1 RX queue per thread per Ethernet port. */
rxconf = dev_info.default_rxconf;
if (options.jumbo_frame_enabled) {
rxconf.offloads |= DEV_RX_OFFLOAD_JUMBO_FRAME;
}
for (int i = 0; i < options.num_threads; i++) {
ret = rte_eth_rx_queue_setup(portid, i, nb_rxd,
rte_eth_dev_socket_id(portid), &rxconf, mbuf_pool);
@ -469,6 +492,11 @@ port_init(uint16_t portid, struct rte_mempool *mbuf_pool)
options.s_thr_info.at(i)->txqid = i;
}
// set mtu
ret = rte_eth_dev_set_mtu(portid, options.port_mtu);
if (ret != 0)
return ret;
ret = rte_eth_dev_start(portid);
if (ret < 0)
return ret;
@ -518,7 +546,8 @@ usage()
" -b: MLG bytes per second\n"
" -x: MLG thread affinity mask\n"
" -X: MLG target domain affinity mask\n"
" -H: host spec\n");
" -H: host spec\n"
" -J: enable jumbo frames\n");
fflush(stdout);
}
@ -532,11 +561,12 @@ dump_options()
" cpu mask: 0x%lx\n"
" mempool mask: 0x%lx\n"
" ip: 0x%x\n"
" MLG: %s [bps: %ld, threads: 0x%lx, domain: 0x%lx]\n",
" MLG: %s [bps: %ld, threads: 0x%lx, domain: 0x%lx]\n"
" jumbo frame: %d\n",
ntr_get_level(NTR_DEP_USER1) - NTR_LEVEL_WARNING,
options.num_threads, options.cpuset, options.memmask,
options.s_host_spec.ip, options.mlg_enabled ? "on" : "off",
options.mlg_bps, options.mlg_cmask, options.mlg_dmask);
options.mlg_bps, options.mlg_cmask, options.mlg_dmask, options.jumbo_frame_enabled);
}
int
@ -562,7 +592,7 @@ main(int argc, char *argv[])
{
int c;
// parse arguments
while ((c = getopt(argc, argv, "hvA:M:H:mb:X:x:")) != -1) {
while ((c = getopt(argc, argv, "hvA:M:H:mb:X:x:J")) != -1) {
switch (c) {
case 'v':
ntr_set_level(NTR_DEP_USER1,
@ -605,6 +635,10 @@ main(int argc, char *argv[])
options.mlg_cmask = strtoull(
optarg, nullptr, 16);
break;
case 'J':
options.jumbo_frame_enabled = true;
options.port_mtu = MAX_JUMBO_MTU;
break;
default:
usage();
rte_exit(
@ -657,13 +691,12 @@ main(int argc, char *argv[])
portid);
}
auto nodes = nm_get_nodes();
if (nodes->size() > MAX_NODES) {
if (nm_obj_count(NM_LEVEL_NUMA) > (int)MAX_NODES) {
rte_exit(EXIT_FAILURE, "Too many numa nodes\n");
}
for (auto each : *nodes) {
uint32_t nodeid = each->id;
for (int i = 0; i < nm_obj_count(NM_LEVEL_NUMA); i++) {
uint32_t nodeid = i;
ntr(NTR_DEP_USER1, NTR_LEVEL_INFO,
"main: creating mempool for node %d\n", nodeid);
@ -672,7 +705,9 @@ main(int argc, char *argv[])
"khat_mempool_%d", nodeid);
mbuf_pool = rte_pktmbuf_pool_create(options.mempool_name_buf,
MBUF_MAX_COUNT * nb_ports, MBUF_CACHE_SIZE, 0,
RTE_MBUF_DEFAULT_BUF_SIZE, nodeid);
options.jumbo_frame_enabled ?
RTE_MBUF_DEFAULT_BUF_SIZE + (MAX_JUMBO_MTU - MAX_STANDARD_MTU) :
RTE_MBUF_DEFAULT_BUF_SIZE, nodeid);
if (mbuf_pool == nullptr) {
rte_exit(EXIT_FAILURE, "cannot create mbuf pool: %d\n",
rte_errno);
@ -689,16 +724,17 @@ main(int argc, char *argv[])
// auto cores = nm_get_cores();
uint64_t cpuset = options.cpuset;
for (int i = 0; i < options.num_threads; i++) {
auto *tinfo = new struct thread_info;
uint32_t lcore_id = cmask_get_next_cpu(&cpuset);
uint32_t node_id = get_node_from_core(lcore_id);
auto *tinfo = (struct thread_info *)nm_malloc(node_id, sizeof(struct thread_info));
tinfo->cache_lines = nm_malloc(node_id, CACHELINE_SIZE * options.thread_cacheline_cnt);
tinfo->tid = i;
tinfo->lcore_id = cmask_get_next_cpu(&cpuset);
// tinfo->node_id =
// cores->at(tinfo->lcore_id)->parent->parent->id;
tinfo->lcore_id = lcore_id;
tinfo->node_id = 0; // XXX: hack
options.s_thr_info.push_back(tinfo);
ntr(NTR_DEP_USER1, NTR_LEVEL_INFO,
"main: thread %d assigned to cpu %d, node %d\n", tinfo->tid,
tinfo->lcore_id, tinfo->node_id);
tinfo->lcore_id, get_node_from_core(lcore_id));
}
if (port_init(portid, mbuf_pool) != 0) {
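A note on the mempool sizing in the hunk above: with -J, each mbuf's buffer is grown by the MTU difference so a full jumbo frame fits in a single segment. Rough arithmetic, assuming DPDK's default data room and headroom:

// RTE_MBUF_DEFAULT_BUF_SIZE = 2048 data room + 128 headroom = 2176 bytes
uint32_t buf_sz = RTE_MBUF_DEFAULT_BUF_SIZE + (MAX_JUMBO_MTU - MAX_STANDARD_MTU); // 9676
// usable room = 9676 - 128 = 9548 >= mtu_to_pkt_size(MAX_JUMBO_MTU) == 9018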

net/rat.cc

@ -12,8 +12,8 @@
#include "gen.h"
#include "nm.h"
#include "ntr.h"
#include "pkt.h"
#include "util.h"
#include "net/pkt.h"
#include "net/util.h"
#include <atomic>
#include <list>
@ -24,9 +24,9 @@
constexpr static unsigned int MBUF_MAX_COUNT = 65536;
constexpr static unsigned int MBUF_CACHE_SIZE = 512;
constexpr static unsigned int RX_RING_SIZE = 1024;
constexpr static unsigned int TX_RING_SIZE = 1024;
constexpr static unsigned int BURST_SIZE = 8;
constexpr static unsigned int RX_RING_SIZE = 2048;
constexpr static unsigned int TX_RING_SIZE = 2048;
constexpr static unsigned int BURST_SIZE = 32;
static const struct rte_eth_conf port_conf_default {
};
@ -89,6 +89,9 @@ struct options_t {
};
uint64_t cpu_mask { 0x4 }; // 1 thread @ core 2
uint32_t pkt_loss_delay_ms = UINT32_MAX;
bool jumbo_frame_enabled { false };
int pkt_pad_sz { 0 };
int port_mtu { MAX_STANDARD_MTU };
// states
unsigned int s_num_threads { 1 }; // 1 thread
@ -188,6 +191,7 @@ proto_loop(struct thread_info *tinfo)
PKT_TYPE_SYNC_ACK,
&options
.s_master_cspec,
0,
&tx_buf,
&pkt_data) !=
0) {
@ -196,16 +200,7 @@ proto_loop(struct thread_info *tinfo)
"failed to alloc pkt hdr\n");
}
if (rte_eth_tx_burst(
options
.s_portid,
tinfo->txqid,
&tx_buf,
1) != 1) {
rte_exit(
EXIT_FAILURE,
"failed to send packet\n");
}
tx_burst_all(options.s_portid, tinfo->txqid, &tx_buf, 1);
expected =
STATE_SYNC_ACK;
@ -305,6 +300,8 @@ pkt_loop(struct thread_info *tinfo)
pld_epoch->epoch);
id = epoch_get_id(epoch);
// printf("Load resp size : %d\n", rx_bufs[i]->data_len);
ntr(NTR_DEP_USER1, NTR_LEVEL_DEBUG,
"pkt_loop <thread %d>: packet %p epoch 0x%x id %d.\n",
tinfo->id, (void *)rx_bufs[i],
@ -361,6 +358,7 @@ pkt_loop(struct thread_info *tinfo)
options.mbuf_pool,
PKT_TYPE_FIN_ACK,
&options.s_master_cspec,
0,
&tx_bufs[0],
&pkt_hdr) != 0) {
rte_exit(EXIT_FAILURE,
@ -376,20 +374,13 @@ pkt_loop(struct thread_info *tinfo)
rte_cpu_to_be_32(
total_recv);
pld_qps->lost_pkts =
rte_cpu_to_be_32(total_loss);
rte_cpu_to_be_32(
total_loss);
const uint16_t nb_tx =
rte_eth_tx_burst(
options.s_portid,
tinfo->txqid, &tx_bufs[0],
1);
tx_burst_all(options.s_portid, tinfo->txqid, &tx_bufs[0], 1);
if (nb_tx != 1) {
rte_exit(EXIT_FAILURE,
"failed to send packet\n");
}
options.s_state.store(STATE_FIN);
options.s_state.store(
STATE_FIN);
ntr(NTR_DEP_USER1,
NTR_LEVEL_DEBUG,
@ -442,7 +433,7 @@ pkt_loop(struct thread_info *tinfo)
ntr(NTR_DEP_USER1, NTR_LEVEL_DEBUG,
"pkt_loop <thread %d>: received epoch 0x%x\n",
tinfo->id, epoch_get_epoch(einfo->epoch));
if (einfo->ts > last_recv_ts) {
last_recv_ts = einfo->ts;
}
@ -478,7 +469,8 @@ pkt_loop(struct thread_info *tinfo)
// check to send the next packet
uint32_t total_send = 0;
while (now >= next_ts && sent_epochs.size() < options.depth && total_send < BURST_SIZE) {
while (now >= next_ts && sent_epochs.size() < options.depth &&
total_send < BURST_SIZE) {
struct pkt_payload_load *pld_load;
struct pkt_hdr *pkt_data;
next_ts += (int)(tinfo->ia_gen->generate() * S2NS);
@ -487,7 +479,8 @@ pkt_loop(struct thread_info *tinfo)
srv_cspec.dst_port = dst_port_gen.next();
srv_cspec.src_port = src_port_gen.next();
if (alloc_pkt_hdr(options.mbuf_pool, PKT_TYPE_LOAD,
&srv_cspec, &tx_bufs[total_send], &pkt_data) != 0) {
&srv_cspec, options.pkt_pad_sz, &tx_bufs[total_send],
&pkt_data) != 0) {
rte_exit(EXIT_FAILURE,
"failed to allocate pkt hdr\n");
}
@ -507,27 +500,31 @@ pkt_loop(struct thread_info *tinfo)
ntr(NTR_DEP_USER1, NTR_LEVEL_DEBUG,
"pkt_loop <thread %d>: sending packet %p with epoch 0x%x\n",
tinfo->id, (void *)tx_bufs[total_send], epoch);
total_send++;
}
if (total_send > 0) {
const uint16_t nb_tx = rte_eth_tx_burst(
options.s_portid, tinfo->txqid, tx_bufs, total_send);
tx_burst_all(options.s_portid, tinfo->txqid, tx_bufs, total_send);
// if (total_send > 0) {
// const uint16_t nb_tx = rte_eth_tx_burst(
// options.s_portid, tinfo->txqid, tx_bufs,
// total_send);
if (nb_tx != total_send) {
rte_exit(
EXIT_FAILURE, "failed to send packet\n");
}
}
// if (nb_tx != total_send) {
// rte_exit(
// EXIT_FAILURE, "failed to send packet\n");
// }
// }
// check rage quit
// check rage quit only when we have sent a packet
if (last_recv_ts == 0) {
last_recv_ts = nm_get_uptime_ns();
}
if (nm_get_uptime_ns() - last_recv_ts > options.rage_quit_time * MS2NS) {
if (nm_get_uptime_ns() - last_recv_ts >
options.rage_quit_time * MS2NS) {
rte_exit(EXIT_FAILURE,
"rat: thread %d waiting too long for resp. I QUIT!!\n", tinfo->id);
"rat: thread %d waiting too long for resp. I QUIT!!\n",
tinfo->id);
}
}
@ -600,7 +597,7 @@ port_init(uint16_t portid, struct rte_mempool *mbuf_pool)
return ret;
}
port_conf.rxmode.max_rx_pkt_len = RTE_ETHER_MAX_LEN;
port_conf.rxmode.max_rx_pkt_len = mtu_to_pkt_size(options.port_mtu);;
port_conf.rxmode.mq_mode = ETH_MQ_RX_RSS;
port_conf.rx_adv_conf.rss_conf.rss_hf = ETH_RSS_NONFRAG_IPV4_UDP |
ETH_RSS_L2_PAYLOAD | ETH_RSS_NONFRAG_IPV4_TCP;
@ -611,6 +608,9 @@ port_init(uint16_t portid, struct rte_mempool *mbuf_pool)
port_conf.txmode.offloads |= DEV_TX_OFFLOAD_UDP_CKSUM;
port_conf.txmode.offloads |= DEV_TX_OFFLOAD_IPV4_CKSUM;
port_conf.txmode.offloads |= DEV_TX_OFFLOAD_MBUF_FAST_FREE;
if (options.jumbo_frame_enabled) {
port_conf.rxmode.offloads |= DEV_RX_OFFLOAD_JUMBO_FRAME;
}
/* Configure the Ethernet device. */
ret = rte_eth_dev_configure(
@ -624,6 +624,10 @@ port_init(uint16_t portid, struct rte_mempool *mbuf_pool)
/* Allocate and set up 1 RX queue per thread . */
rxconf = dev_info.default_rxconf;
if (options.jumbo_frame_enabled) {
rxconf.offloads |= DEV_RX_OFFLOAD_JUMBO_FRAME;
}
rxconf.offloads = port_conf.rxmode.offloads;
for (uint32_t i = 0; i < options.s_num_threads; i++) {
ret = rte_eth_rx_queue_setup(portid,
@ -644,6 +648,11 @@ port_init(uint16_t portid, struct rte_mempool *mbuf_pool)
return ret;
}
// set mtu
ret = rte_eth_dev_set_mtu(portid, options.port_mtu);
if (ret != 0)
return ret;
ret = rte_eth_dev_start(portid);
if (ret < 0)
return ret;
@ -652,12 +661,10 @@ port_init(uint16_t portid, struct rte_mempool *mbuf_pool)
struct rte_ether_addr addr {
};
ret = rte_eth_macaddr_get(portid, &addr);
if (ret != 0)
return ret;
// no promiscuous mode required
return 0;
return ret;
}
static void
@ -676,12 +683,15 @@ dump_options()
" qps = %d\n"
" host IP = 0x%x\n"
" depth = %u\n"
" packet loss time threshold = %u\n",
" packet loss time threshold = %u\n"
" jumbo frame = %d\n"
" packet pad size = %d\n",
ntr_get_level(NTR_DEP_USER1) - NTR_LEVEL_WARNING, options.run_time,
options.s_num_threads, options.rage_quit_time, options.cpu_mask,
options.slave_mode, options.ia_gen, options.ld_gen,
options.target_qps, options.s_host_spec.ip, options.depth,
options.pkt_loss_delay_ms);
options.pkt_loss_delay_ms, options.jumbo_frame_enabled,
options.pkt_pad_sz);
}
static void
@ -701,7 +711,9 @@ usage()
" -q: target QPS\n"
" -H: host net spec\n"
" -D: max number of packets in flight\n"
" -l: packet loss time threshold\n");
" -l: packet loss time threshold\n"
" -J: enable jumbo frame\n"
" -P: pad load packets to this size\n");
}
int
@ -728,8 +740,8 @@ main(int argc, char *argv[])
{
int c;
// parse arguments
while (
(c = getopt(argc, argv, "vht:s:SA:i:w:r:q:H:D:l:")) != -1) {
while ((c = getopt(
argc, argv, "vht:s:SA:i:w:r:q:H:D:l:JP:")) != -1) {
switch (c) {
case 'v':
ntr_set_level(NTR_DEP_USER1,
@ -801,6 +813,14 @@ main(int argc, char *argv[])
options.pkt_loss_delay_ms = UINT32_MAX;
}
break;
case 'J':
options.jumbo_frame_enabled = true;
options.port_mtu = MAX_JUMBO_MTU;
break;
case 'P':
options.pkt_pad_sz = strtol(
optarg, nullptr, 10);
break;
default:
usage();
rte_exit(
@ -809,6 +829,10 @@ main(int argc, char *argv[])
}
}
if (options.pkt_pad_sz != 0 && options.pkt_pad_sz > mtu_to_pkt_size(options.port_mtu)) {
rte_exit(EXIT_FAILURE, "pkt_pad_sz is too large for mtu %d\n", options.port_mtu);
}
if (!has_host_spec) {
rte_exit(EXIT_FAILURE, "Must specify host IP.\n");
}
@ -838,7 +862,10 @@ main(int argc, char *argv[])
// create a mbuf memory pool on the socket
mbuf_pool = rte_pktmbuf_pool_create("MBUF_POOL", MBUF_MAX_COUNT,
MBUF_CACHE_SIZE, 0, RTE_MBUF_DEFAULT_BUF_SIZE,
MBUF_CACHE_SIZE, 0,
options.jumbo_frame_enabled ?
RTE_MBUF_DEFAULT_BUF_SIZE + (MAX_JUMBO_MTU - MAX_STANDARD_MTU) :
RTE_MBUF_DEFAULT_BUF_SIZE,
rte_eth_dev_socket_id(options.s_portid));
if (mbuf_pool == nullptr) {
rte_exit(EXIT_FAILURE, "cannot create mbuf pool\n");
@ -921,7 +948,8 @@ main(int argc, char *argv[])
uint32_t total_recv;
uint32_t total_loss;
calc_stats(nm_get_uptime_ns(), &qps, &total_recv, &total_loss);
ntr(NTR_DEP_USER1, NTR_LEVEL_INFO, "qps = %d, recv = %d, loss = %d\n", qps, total_recv, total_loss);
ntr(NTR_DEP_USER1, NTR_LEVEL_INFO, "qps = %d, recv = %d, loss = %d\n",
qps, total_recv, total_loss);
for (auto each : options.s_thr_info) {
delete each->load_gen;
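On the new -P/-J handling above: the pad size is validated against the largest frame the configured MTU allows, so large pads require jumbo frames. Worked values, using mtu_to_pkt_size() from net/pkt.h:

// -P 8192 without -J: mtu_to_pkt_size(1500) == 1518 <  8192 -> rte_exit
// -P 8192 with    -J: mtu_to_pkt_size(9000) == 9018 >= 8192 -> accepted
// this matches jumbo_frame_threshold = 1518 in the benchmark script below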


@ -13,14 +13,27 @@ import libpar as par
import libtc as tc
step_inc_pct = 100
init_step = 600000 #
start_step = 600000
init_step = 1000000 #
start_step = 1000000
term_qps = 50000000
term_pct = 1
inc_pct = 50
# pkt_pad
enable_pktpad = False
jumbo_frame_threshold = 1518
pkt_pads = [
"8192"
#"0",
#"256",
#"512",
#"1024",
#"2048",
#"4096"
]
# memgen
enable_memgen = True
enable_memgen = False
memgen_mask = [
"0xFFFFFF000000",
@ -56,11 +69,11 @@ root_dir = os.path.join(file_dir,"..")
sample_filename = "sample.txt"
affinity = [
"0xAAAAAA",
"0x20",
"0xA0",
"0xAA0",
"0xAAAA0",
"0xAAAAAA",
"0x2000000",
"0xA000000",
"0xAA000000",
@ -80,14 +93,14 @@ client_spec = ["192.168.123.11@3c:15:fb:62:9b:2f", "192.168.123.12@3c:15:fb:c9:f
client_cpumask = "0xAAAAAA"
client_rage_quit = 1000 #1s
warmup = 5
warmup = 10
duration = 10
cooldown = 0
cacheline = 0
SSH_PARAM = "-o StrictHostKeyChecking=no -p77"
SSH_USER = "oscar"
master_qps = 100
master_pkt_loss = 1000
master_pkt_loss = 100
master_pkt_loss_failure = -1
hostfile = None
@ -117,10 +130,12 @@ def get_client_str():
def calc_client_ld(ld : int):
return 0 if ld == 0 else ((ld - master_qps) / len(clients))
def run_exp(affinity : str, ld : int, aff_idx : int):
def run_exp(affinity : str, ld : int, pkt_pad : str, aff_idx : int):
while True:
server_cmd = "sudo " + test_dir + "/khat --log-level lib.eal:err -- -A " + affinity + \
" -H " + server_spec[0]
if int(pkt_pad) > jumbo_frame_threshold:
server_cmd += " -J "
if enable_memgen:
server_cmd += " -m -b 0 -X " + memgen_target[aff_idx] + " -x " + memgen_mask[aff_idx]
if client_only:
@ -130,7 +145,6 @@ def run_exp(affinity : str, ld : int, aff_idx : int):
# start server
tc.log_print("Starting server...")
tc.log_print(server_cmd)
ssrv = tc.remote_exec(server, server_cmd, blocking=False)
# start clients
@ -143,7 +157,10 @@ def run_exp(affinity : str, ld : int, aff_idx : int):
" -H " + client_spec[i] + \
" -s " + server_spec[0] + \
" -r " + str(client_rage_quit) + \
" -D 0 -l 100 "
" -D 1 -l 100 " + \
" -P " + pkt_pad
if int(pkt_pad) > jumbo_frame_threshold:
client_cmd += " -J "
tc.log_print(client_cmd)
sclt.append(tc.remote_exec([clients[i]], client_cmd, blocking=False)[0])
@ -261,49 +278,49 @@ def main():
stop_all()
for i in range(0, len(affinity)):
eaff = affinity[i]
step_mul = 100
last_load = 0
cur_load = start_step
for epad in pkt_pads:
eaff = affinity[i]
step_mul = 100
last_load = 0
cur_load = start_step
tc.begin(eaff)
tc.begin(eaff + "_" + epad)
tc.log_print("============ Affinity: " + str(eaff) + " Load: MAX" + " ============")
run_exp(eaff, 0, i)
keep_results()
stop_all()
#tc.log_print("============ Affinity: " + str(eaff) + " PktPad: " + epad + " Load: MAX" + " ============")
#run_exp(eaff, 0, epad, i)
#keep_results()
#stop_all()
if client_only:
break
while True:
tc.log_print("============ Affinity: " + str(eaff) + " Load: " + str(cur_load) + " ============")
run_exp(eaff, cur_load, i)
qps = keep_results()
if client_only:
break
pct = int((qps - last_load) / init_step * 100)
tc.log_print("last_load: " + str(last_load) + " this_load: " + str(qps) + " inc_pct: " + str(pct) + "%")
while True:
tc.log_print("============ Affinity: " + str(eaff) + " PktPad: " + epad + " Load: " + str(cur_load) + " ============")
if cur_load > term_qps:
tc.log_print("qps more than " + str(term_qps) + "%. Done.")
break
run_exp(eaff, cur_load, epad, i)
if pct <= term_pct:
tc.log_print("inc_pct less than TERM_PCT " + str(term_pct) + "%. Done.")
break
qps = keep_results()
pct = int((qps - last_load) / init_step * 100)
tc.log_print("last_load: " + str(last_load) + " this_load: " + str(qps) + " inc_pct: " + str(pct) + "%")
if pct <= inc_pct:
step_mul += step_inc_pct
tc.log_print("inc_pct less than INC_PCT " + str(inc_pct) + "%. Increasing step multiplier to " + str(step_mul) + "%")
if cur_load > term_qps:
tc.log_print("qps more than " + str(term_qps) + "%. Done.")
break
last_load = qps
cur_load += int(init_step * step_mul / 100)
tc.log_print("")
tc.end()
if pct <= term_pct:
tc.log_print("inc_pct less than TERM_PCT " + str(term_pct) + "%. Done.")
break
if pct <= inc_pct:
step_mul += step_inc_pct
tc.log_print("inc_pct less than INC_PCT " + str(inc_pct) + "%. Increasing step multiplier to " + str(step_mul) + "%")
last_load = qps
cur_load += int(init_step * step_mul / 100)
tc.log_print("")
tc.end()
stop_all()
main()