Compare commits

...

10 Commits
master ... dev

Author SHA1 Message Date
a1f69bb3f8
change run.py defaults 2021-02-02 00:46:18 -05:00
7b4fc6f3ab
+histogram 2021-02-02 00:43:45 -05:00
1ec01d6c37
fix redundant global 2021-02-01 23:49:30 -05:00
ff4946a699
run script and robust stderr detection 2021-02-01 23:48:14 -05:00
f2be62a9be
cat refactor + rat reborn + statskeeping 2021-01-31 02:50:58 -05:00
226449100d
NUMA detection & server multicore 2021-01-28 05:24:59 -05:00
82e1098f3b ptp working
Summary:
+arc

stuff

khat timestamp protocol working

Test Plan: by hand

Reviewers: ali

Differential Revision: https://review.rcs.uwaterloo.ca/D408
2021-01-27 03:58:12 -05:00
855b9cf714 stuff 2021-01-25 16:30:22 -05:00
73c70a5c52
+arc 2021-01-18 14:06:10 -05:00
Oscar Zhao
0500dc1c21 ptp working 2021-01-18 12:19:13 -05:00
23 changed files with 2959 additions and 390 deletions

3
.arcconfig Normal file
View File

@ -0,0 +1,3 @@
{
"phabricator.uri" : "https://review.rcs.uwaterloo.ca/"
}

271
.gitignore vendored Normal file
View File

@ -0,0 +1,271 @@
########################################################
########################################################
########################################################
########################################################
########################################################
########################################################
########################################################
################ C STUFF ##########################
########################################################
########################################################
########################################################
########################################################
########################################################
########################################################
########################################################
# Prerequisites
*.d
# Object files
*.o
*.ko
*.obj
*.elf
# Linker output
*.ilk
*.map
*.exp
# Precompiled Headers
*.gch
*.pch
# Libraries
*.lib
*.a
*.la
*.lo
# Shared objects (inc. Windows DLLs)
*.dll
*.so
*.so.*
*.dylib
# Executables
*.exe
*.out
*.app
*.i*86
*.x86_64
*.hex
# Debug files
*.dSYM/
*.su
*.idb
*.pdb
# Kernel Module Compile Results
*.mod*
*.cmd
.tmp_versions/
modules.order
Module.symvers
Mkfile.old
dkms.conf
########################################################
########################################################
########################################################
########################################################
########################################################
########################################################
########################################################
################ PYTHON STUFF ##########################
########################################################
########################################################
########################################################
########################################################
########################################################
########################################################
########################################################
# Byte-compiled / optimized / DLL files
__pycache__/
*.py[cod]
*$py.class
# C extensions
*.so
# Distribution / packaging
.Python
build/
develop-eggs/
dist/
downloads/
eggs/
.eggs/
lib/
lib64/
parts/
sdist/
var/
wheels/
share/python-wheels/
*.egg-info/
.installed.cfg
*.egg
MANIFEST
# PyInstaller
# Usually these files are written by a python script from a template
# before PyInstaller builds the exe, so as to inject date/other infos into it.
*.manifest
*.spec
# Installer logs
pip-log.txt
pip-delete-this-directory.txt
# Unit test / coverage reports
htmlcov/
.tox/
.nox/
.coverage
.coverage.*
.cache
nosetests.xml
coverage.xml
*.cover
*.py,cover
.hypothesis/
.pytest_cache/
cover/
# Translations
*.mo
*.pot
# Django stuff:
*.log
local_settings.py
db.sqlite3
db.sqlite3-journal
# Flask stuff:
instance/
.webassets-cache
# Scrapy stuff:
.scrapy
# Sphinx documentation
docs/_build/
# PyBuilder
.pybuilder/
target/
# Jupyter Notebook
.ipynb_checkpoints
# IPython
profile_default/
ipython_config.py
# pyenv
# For a library or package, you might want to ignore these files since the code is
# intended to run in multiple environments; otherwise, check them in:
# .python-version
# pipenv
# According to pypa/pipenv#598, it is recommended to include Pipfile.lock in version control.
# However, in case of collaboration, if having platform-specific dependencies or dependencies
# having no cross-platform support, pipenv may install dependencies that don't work, or not
# install all needed dependencies.
#Pipfile.lock
# PEP 582; used by e.g. github.com/David-OConnor/pyflow
__pypackages__/
# Celery stuff
celerybeat-schedule
celerybeat.pid
# SageMath parsed files
*.sage.py
# Environments
.env
.venv
env/
venv/
ENV/
env.bak/
venv.bak/
# Spyder project settings
.spyderproject
.spyproject
# Rope project settings
.ropeproject
# mkdocs documentation
/site
# mypy
.mypy_cache/
.dmypy.json
dmypy.json
# Pyre type checker
.pyre/
# pytype static type analyzer
.pytype/
# Cython debug symbols
cython_debug/
########################################################
########################################################
########################################################
########################################################
########################################################
########################################################
########################################################
################ C++ STUFF ##########################
########################################################
########################################################
########################################################
########################################################
########################################################
########################################################
########################################################
# Prerequisites
*.d
# Compiled Object files
*.slo
*.lo
*.o
*.obj
# Precompiled Headers
*.gch
*.pch
# Compiled Dynamic libraries
*.so
*.dylib
*.dll
# Fortran module files
*.mod
*.smod
# Compiled Static libraries
*.lai
*.la
*.a
*.lib
# Executables
*.exe
*.out
*.app

1
.pyenv Normal file
View File

@ -0,0 +1 @@
PYTHONPATH="./scripts/libs"

View File

@ -10,24 +10,49 @@ project(khat)
list(APPEND CMAKE_MODULE_PATH "${CMAKE_SOURCE_DIR}") list(APPEND CMAKE_MODULE_PATH "${CMAKE_SOURCE_DIR}")
find_package(dpdk REQUIRED) find_package(dpdk REQUIRED)
find_package(Hwloc REQUIRED)
set(CC_FLAGS -O2 -g -Wall -Wextra -Werror -std=c++11 set(CC_FLAGS -O2 -g -Wall -Wextra -Werror -std=c++11
-Wno-deprecated-declarations -Wno-deprecated-declarations
-Wno-packed-not-aligned -Wno-packed-not-aligned
-Wno-address-of-packed-member -Wno-address-of-packed-member
-msse4) -Wno-zero-length-array
-Wno-gnu-zero-variadic-macro-arguments
-msse4
-mavx)
include_directories(${CMAKE_SOURCE_DIR}/inc) include_directories(${CMAKE_SOURCE_DIR}/inc)
include_directories(${dpdk_INCLUDE_DIRS}) include_directories(${dpdk_INCLUDE_DIRS})
include_directories(${Hwloc_INCLUDE_DIRS})
set(LIBNM_CC_FLAGS -O2 -g -Wall -Wextra -Werror -std=c++11)
set(LIBNTR_C_FLAGS -O2 -g -Wall -Wextra -Werror -std=c11)
set(LIBGEN_CC_FLAGS -O2 -g -Wall -Wextra -Werror -std=c++11)
set(KHAT_LINKLIBS pthread nm ntr)
set(CAT_LINKLIBS pthread nm ntr gen)
set(RAT_LINKLIBS pthread nm ntr gen)
add_library(nm libnm/nm.cc)
target_link_libraries(nm ${Hwloc_LIBRARIES})
target_compile_options(nm PRIVATE ${LIBNM_CC_FLAGS})
add_library(ntr libntr/ntr.c)
target_compile_options(ntr PRIVATE ${LIBNTR_C_FLAGS})
add_library(gen libgen/generator.cc)
target_link_libraries(gen ${Hwloc_LIBRARIES})
target_compile_options(gen PRIVATE ${LIBGEN_CC_FLAGS})
add_executable(khat khat/khat.cc) add_executable(khat khat/khat.cc)
add_executable(cat cat/cat.cc) target_link_libraries(khat ${dpdk_LIBRARIES} ${KHAT_LINKLIBS})
set(LINK_LIBS ${dpdk_LIBRARIES} pthread)
target_link_libraries(khat ${LINK_LIBS})
target_compile_options(khat PRIVATE ${CC_FLAGS}) target_compile_options(khat PRIVATE ${CC_FLAGS})
target_link_libraries(cat ${LINK_LIBS}) add_executable(cat cat/cat.cc)
target_link_libraries(cat ${dpdk_LIBRARIES} ${CAT_LINKLIBS})
target_compile_options(cat PRIVATE ${CC_FLAGS}) target_compile_options(cat PRIVATE ${CC_FLAGS})
add_executable(rat rat/rat.cc)
target_link_libraries(rat ${dpdk_LIBRARIES} ${RAT_LINKLIBS})
target_compile_options(rat PRIVATE ${CC_FLAGS})

213
FindHwloc.cmake Normal file
View File

@ -0,0 +1,213 @@
#.rst:
# FindHwloc
# ----------
#
# Try to find Portable Hardware Locality (hwloc) libraries.
# http://www.open-mpi.org/software/hwloc
#
# You may declare HWLOC_ROOT environment variable to tell where
# your hwloc library is installed.
#
# Once done this will define::
#
# Hwloc_FOUND - True if hwloc was found
# Hwloc_INCLUDE_DIRS - include directories for hwloc
# Hwloc_LIBRARIES - link against these libraries to use hwloc
# Hwloc_VERSION - version
# Hwloc_CFLAGS - include directories as compiler flags
# Hwloc_LDLFAGS - link paths and libs as compiler flags
#
#=============================================================================
# Copyright 2014 Mikael Lepistö
#
# Distributed under the OSI-approved BSD License (the "License");
#
# This software is distributed WITHOUT ANY WARRANTY; without even the
# implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
# See the License for more information.
#=============================================================================
if(WIN32)
find_path(Hwloc_INCLUDE_DIR
NAMES
hwloc.h
PATHS
ENV "PROGRAMFILES(X86)"
ENV HWLOC_ROOT
PATH_SUFFIXES
include
)
find_library(Hwloc_LIBRARY
NAMES
libhwloc.lib
PATHS
ENV "PROGRAMFILES(X86)"
ENV HWLOC_ROOT
PATH_SUFFIXES
lib
)
#
# Check if the found library can be used to linking
#
SET (_TEST_SOURCE "${CMAKE_BINARY_DIR}${CMAKE_FILES_DIRECTORY}/linktest.c")
FILE (WRITE "${_TEST_SOURCE}"
"
#include <hwloc.h>
int main()
{
hwloc_topology_t topology;
int nbcores;
hwloc_topology_init(&topology);
hwloc_topology_load(topology);
nbcores = hwloc_get_nbobjs_by_type(topology, HWLOC_OBJ_CORE);
hwloc_topology_destroy(topology);
return 0;
}
"
)
TRY_COMPILE(_LINK_SUCCESS ${CMAKE_BINARY_DIR} "${_TEST_SOURCE}"
CMAKE_FLAGS
"-DINCLUDE_DIRECTORIES:STRING=${Hwloc_INCLUDE_DIR}"
CMAKE_FLAGS
"-DLINK_LIBRARIES:STRING=${Hwloc_LIBRARY}"
)
IF(NOT _LINK_SUCCESS)
if(CMAKE_SIZEOF_VOID_P EQUAL 8)
message(STATUS "You are building 64bit target.")
ELSE()
message(STATUS "You are building 32bit code. If you like to build x64 use e.g. -G 'Visual Studio 12 Win64' generator." )
ENDIF()
message(FATAL_ERROR "Library found, but linking test program failed.")
ENDIF()
#
# Resolve version if some compiled binary found...
#
find_program(HWLOC_INFO_EXECUTABLE
NAMES
hwloc-info
PATHS
ENV HWLOC_ROOT
PATH_SUFFIXES
bin
)
if(HWLOC_INFO_EXECUTABLE)
execute_process(
COMMAND ${HWLOC_INFO_EXECUTABLE} "--version"
OUTPUT_VARIABLE HWLOC_VERSION_LINE
OUTPUT_STRIP_TRAILING_WHITESPACE
)
string(REGEX MATCH "([0-9]+.[0-9]+)$"
Hwloc_VERSION "${HWLOC_VERSION_LINE}")
unset(HWLOC_VERSION_LINE)
endif()
#
# All good
#
set(Hwloc_LIBRARIES ${Hwloc_LIBRARY})
set(Hwloc_INCLUDE_DIRS ${Hwloc_INCLUDE_DIR})
include(FindPackageHandleStandardArgs)
find_package_handle_standard_args(
Hwloc
FOUND_VAR Hwloc_FOUND
REQUIRED_VARS Hwloc_LIBRARY Hwloc_INCLUDE_DIR Hwloc_VERSION_PARSED Hwloc_VERSION_MAJOR Hwloc_VERSION_MINOR
VERSION_VAR Hwloc_VERSION)
mark_as_advanced(
Hwloc_INCLUDE_DIR
Hwloc_LIBRARY)
foreach(arg ${Hwloc_INCLUDE_DIRS})
set(Hwloc_CFLAGS "${Hwloc_CFLAGS} /I${arg}")
endforeach()
set(Hwloc_LDFLAGS "${Hwloc_LIBRARY}")
else()
if(CMAKE_CROSSCOMPILING)
find_path(Hwloc_INCLUDE_DIRS
NAMES
hwloc.h
PATHS
ENV HWLOC_ROOT
)
find_library(Hwloc_LIBRARIES
NAMES
hwloc
PATHS
ENV HWLOC_ROOT
)
if(Hwloc_INCLUDE_DIRS AND Hwloc_LIBRARIES)
message(WARNING "HWLOC library found using find_library() - cannot determine version. Assuming 1.7.0")
set(Hwloc_FOUND 1)
set(Hwloc_VERSION "1.7.0")
endif()
else() # Find with pkgconfig for non-crosscompile builds
find_package(PkgConfig)
if(HWLOC_ROOT)
set(ENV{PKG_CONFIG_PATH} "${HWLOC_ROOT}/lib/pkgconfig")
else()
foreach(PREFIX ${CMAKE_PREFIX_PATH})
set(PKG_CONFIG_PATH "${PKG_CONFIG_PATH}:${PREFIX}/lib/pkgconfig")
endforeach()
set(ENV{PKG_CONFIG_PATH} "${PKG_CONFIG_PATH}:$ENV{PKG_CONFIG_PATH}")
endif()
if(hwloc_FIND_REQUIRED)
set(_hwloc_OPTS "REQUIRED")
elseif(hwloc_FIND_QUIETLY)
set(_hwloc_OPTS "QUIET")
else()
set(_hwloc_output 1)
endif()
if(hwloc_FIND_VERSION)
if(hwloc_FIND_VERSION_EXACT)
pkg_check_modules(Hwloc ${_hwloc_OPTS} hwloc=${hwloc_FIND_VERSION})
else()
pkg_check_modules(Hwloc ${_hwloc_OPTS} hwloc>=${hwloc_FIND_VERSION})
endif()
else()
pkg_check_modules(Hwloc ${_hwloc_OPTS} hwloc)
endif()
if(Hwloc_FOUND)
string(REPLACE "." ";" Hwloc_VERSION_PARSED "${Hwloc_VERSION}")
set(Hwloc_VERSION "${Hwloc_VERSION}" CACHE STRING "version of Hwloc as a list")
list(GET Hwloc_VERSION_PARSED 0 Hwloc_VERSION_MAJOR)
set(Hwloc_VERSION_MAJOR "${Hwloc_VERSION_MAJOR}" CACHE STRING "Major version of Hwloc")
list(GET Hwloc_VERSION_PARSED 1 Hwloc_VERSION_MINOR)
set(Hwloc_VERSION_MINOR "${Hwloc_VERSION_MINOR}" CACHE STRING "Minor version of Hwloc")
include(FindPackageHandleStandardArgs)
find_package_handle_standard_args(Hwloc DEFAULT_MSG Hwloc_LIBRARIES)
if(NOT ${Hwloc_VERSION} VERSION_LESS 1.7.0)
set(Hwloc_GL_FOUND 1)
endif()
if(_hwloc_output)
message(STATUS
"Found hwloc ${Hwloc_VERSION} in ${Hwloc_INCLUDE_DIRS}:${Hwloc_LIBRARIES}")
endif()
endif()
endif() # cross-compile else
endif()

View File

@ -11,68 +11,103 @@
#include <rte_ether.h> #include <rte_ether.h>
#include <rte_launch.h> #include <rte_launch.h>
#include <rte_log.h> #include <rte_log.h>
#include <rte_byteorder.h>
#include <rte_ip.h>
#include <atomic> #include <atomic>
#include <vector> #include <vector>
#include <fstream> #include <fstream>
#include <unistd.h> #include <unistd.h>
#include "ntrlog.h" #include "nm.h"
#include "gen.h"
#include "ntr.h"
#include "pkt.h" #include "pkt.h"
#include "rte_byteorder.h" #include "util.h"
#include "rte_ip.h"
// init NTRLOG constexpr static unsigned int MBUF_MAX_COUNT = 16384;
NTR_DECL_IMPL; constexpr static unsigned int MBUF_CACHE_SIZE = 512;
constexpr static unsigned int RX_RING_SIZE = 4096;
constexpr unsigned int MBUF_MAX_COUNT = 8191; constexpr static unsigned int TX_RING_SIZE = 4096;
constexpr unsigned int MBUF_CACHE_SIZE = 250; constexpr static unsigned int BURST_SIZE = 32;
constexpr unsigned int RX_RING_SIZE = 1024;
constexpr unsigned int TX_RING_SIZE = 1024;
constexpr unsigned int RX_RING_NUM = 1;
constexpr unsigned int TX_RING_NUM = 1;
constexpr unsigned int BURST_SIZE = 32;
static const struct rte_eth_conf port_conf_default{}; static const struct rte_eth_conf port_conf_default{};
struct datapt{ struct datapt {
uint64_t server_proc = 0; uint32_t epoch;
uint64_t rtt = 0; uint32_t valid;
uint64_t clt_hw_tx;
uint64_t clt_sw_tx;
uint64_t clt_hw_rx;
uint64_t clt_sw_rx;
uint64_t srv_hw_tx;
uint64_t srv_sw_tx;
uint64_t srv_hw_rx;
uint64_t srv_sw_rx;
}; };
struct options_t { struct options_t {
unsigned int run_time = 5; // parameters
unsigned int warmup_time = 0; unsigned int run_time{5};
unsigned int warmup_time{3};
char output[256] = "output.txt"; char output[256] = "output.txt";
char ia_gen_str[256] = "fixed:0.01";
struct rte_ether_addr server_mac; struct rte_ether_addr server_mac;
uint64_t cpu_mask{0x2}; // 2nd core
std::vector<struct rte_ether_addr *> slaves;
unsigned long rage_quit_time = (unsigned long)-1;
unsigned long last_sent_ts = 0;
// states // states
std::atomic<bool> s_stop {false}; struct rte_mempool * mbuf_pool;
std::atomic<bool> s_record {false};
std::vector<struct datapt *> s_stats;
struct rte_mempool * s_mbuf_pool;
uint16_t s_portid;
struct rte_ether_addr s_host_mac; struct rte_ether_addr s_host_mac;
uint16_t s_portid;
unsigned int s_rxqid;
unsigned int s_txqid;
unsigned int s_total_pkts{0};
Generator * s_iagen{nullptr};
std::vector<struct datapt *> s_data;
struct datapt * s_last_datapt{nullptr};
uint32_t s_epoch;
std::atomic<bool> s_stop {false};
std::atomic<uint32_t> s_record {0};
}; };
struct options_t options; static struct options_t options;
static uint16_t static uint16_t
rx_calc_latency(uint16_t port __rte_unused, uint16_t qidx __rte_unused, rx_add_timestamp(uint16_t port __rte_unused, uint16_t qidx __rte_unused,
struct rte_mbuf **pkts, uint16_t nb_pkts, uint16_t max_pkts __rte_unused, void *_ __rte_unused) struct rte_mbuf **pkts, uint16_t nb_pkts, uint16_t max_pkts __rte_unused, void *_ __rte_unused)
{ {
// XXX: need to get the timestamp in every loop?
uint64_t now = rte_rdtsc(); uint64_t now = rte_rdtsc();
struct packet_data * pkt_data; struct pkt_hdr * pkt_data;
struct timespec ts;
int ret;
for (int i = 0; i < nb_pkts; i++) { for (int i = 0; i < nb_pkts; i++) {
pkt_data = check_valid_packet(pkts[i]); pkt_data = check_valid_packet(pkts[i]);
if (pkt_data == NULL) { if (pkt_data == NULL) {
ntr(NTR_DEP_USER1, NTR_LEVEL_DEBUG, "rx_calc_latency: ignoring invalid packet 0x%p.\n", (void*)pkts[i]); ntr(NTR_DEP_USER1, NTR_LEVEL_DEBUG, "rx_add_timestamp: ignoring invalid packet 0x%p.\n", (void*)pkts[i]);
continue; continue;
} }
ntr(NTR_DEP_USER1, NTR_LEVEL_INFO, "rx_calc_latency: tagged packet %p with %llu.\n", (void*)pkts[i], now); if (rte_be_to_cpu_16(pkt_data->type) == PKT_TYPE_PROBE_RESP) {
pkt_data->clt_ts_rx = rte_cpu_to_be_64(now); uint32_t epoch = rte_be_to_cpu_32(((struct pkt_payload_epoch *)pkt_data->payload)->epoch);
if (options.s_last_datapt != nullptr && options.s_last_datapt->epoch == epoch) {
if ((ret = rte_eth_timesync_read_rx_timestamp(port, &ts, pkts[i]->timesync & 0x3)) == 0) {
// has hw rx timestamp
options.s_last_datapt->clt_hw_rx = ts.tv_sec * S2NS + ts.tv_nsec;
options.s_last_datapt->clt_sw_rx = now;
ntr(NTR_DEP_USER1, NTR_LEVEL_DEBUG, "rx_add_timestamp: tagged packet %p with sw: %llu hw: %llu.\n", (void*)pkts[i], now, options.s_last_datapt->clt_hw_rx);
} else {
rte_exit(EXIT_FAILURE, "rx_add_timestamp: packet %p not tagged - hw ts not available - %d.\n", (void*)pkts[i], ret);
}
} else {
ntr(NTR_DEP_USER1, NTR_LEVEL_WARNING, "rx_add_timestamp: packet %p epoch %d != last epoch %d.\n", (void*)pkts[i], epoch, options.s_last_datapt->epoch);
}
} else {
ntr(NTR_DEP_USER1, NTR_LEVEL_DEBUG, "rx_add_timestamp: packet %p not tagged - type %d.\n", (void*)pkts[i], rte_be_to_cpu_16(pkt_data->type));
}
} }
return nb_pkts; return nb_pkts;
@ -82,9 +117,8 @@ static uint16_t
tx_add_timestamp(uint16_t port __rte_unused, uint16_t qidx __rte_unused, tx_add_timestamp(uint16_t port __rte_unused, uint16_t qidx __rte_unused,
struct rte_mbuf **pkts, uint16_t nb_pkts, void *_ __rte_unused) struct rte_mbuf **pkts, uint16_t nb_pkts, void *_ __rte_unused)
{ {
// XXX: need to get the timestamp in every loop?
uint64_t now = rte_rdtsc(); uint64_t now = rte_rdtsc();
struct packet_data * pkt_data; struct pkt_hdr * pkt_data;
for (int i = 0; i < nb_pkts; i++) { for (int i = 0; i < nb_pkts; i++) {
pkt_data = check_valid_packet(pkts[i]); pkt_data = check_valid_packet(pkts[i]);
@ -94,26 +128,37 @@ tx_add_timestamp(uint16_t port __rte_unused, uint16_t qidx __rte_unused,
continue; continue;
} }
ntr(NTR_DEP_USER1, NTR_LEVEL_INFO, "tx_add_timestamp: tagged packet %p with %llu.\n", (void*)pkts[i], now); if (rte_be_to_cpu_16(pkt_data->type) == PKT_TYPE_PROBE) {
pkt_data->clt_ts_tx = rte_cpu_to_be_64(now); uint32_t epoch = rte_be_to_cpu_32(((struct pkt_payload_epoch *)pkt_data->payload)->epoch);
if (options.s_last_datapt == nullptr || epoch != options.s_last_datapt->epoch) {
rte_exit(EXIT_FAILURE, "tx_add_timestamp: packet epoch %d != last epoch %d\n", epoch, options.s_last_datapt->epoch);
}
options.s_last_datapt->clt_sw_tx = now;
ntr(NTR_DEP_USER1, NTR_LEVEL_DEBUG, "tx_add_timestamp: tagged packet %p with sw: %llu.\n", (void*)pkts[i], now);
} else {
ntr(NTR_DEP_USER1, NTR_LEVEL_DEBUG, "tx_add_timestamp: packet %p not tagged - type %d.\n", (void*)pkts[i], pkt_data->type);
}
} }
return nb_pkts; return nb_pkts;
} }
#define STATE_SEND (0)
#define STATE_RECV (1)
static int static int
locore_main(void * _unused __rte_unused) locore_main(void * tif __rte_unused)
{ {
struct rte_mbuf *tx_buf; struct rte_mbuf *tx_buf;
struct rte_mbuf *rx_bufs[BURST_SIZE]; struct rte_mbuf *rx_bufs[BURST_SIZE];
struct packet_data *pkt_data; struct pkt_hdr *pkt_data;
uint32_t core_id = rte_lcore_id(); uint32_t core_id = rte_lcore_id();
uint32_t epoch = 0; int32_t ret;
int state = STATE_SEND;
bool read_tx = true;
bool recv_stat = true;
bool recv_resp = true;
uint64_t next_ts;
// XXX: check link status instead // XXX: check link status instead
sleep(1); sleep(1);
@ -125,70 +170,154 @@ locore_main(void * _unused __rte_unused)
ntr(NTR_DEP_USER1, NTR_LEVEL_INFO, "locore_main: core %d running...\n", core_id); ntr(NTR_DEP_USER1, NTR_LEVEL_INFO, "locore_main: core %d running...\n", core_id);
tx_buf = rte_pktmbuf_alloc(options.s_mbuf_pool); next_ts = get_time_us();
if (tx_buf == NULL) {
rte_exit(EXIT_FAILURE, "cannot allocate tx_buf\n");
}
pkt_data = construct_udp_pkt_hdr(tx_buf,
&options.s_host_mac, &options.server_mac,
RTE_IPV4(192, 168, 100, 150), RTE_IPV4(192, 168, 100, 151),
1337, 1337);
if (pkt_data == NULL) {
rte_exit(EXIT_FAILURE, "cannot allocate space for packet_data in mbuf\n");
}
pkt_data->magic = rte_cpu_to_be_32(ETHER_FRAME_MAGIC);
while(!options.s_stop.load()) { while(!options.s_stop.load()) {
uint64_t now = get_time_us();
// always pop incoming packets // always pop incoming packets
const uint16_t nb_rx = rte_eth_rx_burst(options.s_portid, 0, rx_bufs, BURST_SIZE); const uint16_t nb_rx = rte_eth_rx_burst(options.s_portid, 0, rx_bufs, BURST_SIZE);
if (nb_rx != 0) { if (nb_rx > 0) {
// only process packets when we are ready to receive
for (int i = 0; i < nb_rx; i++) { for (int i = 0; i < nb_rx; i++) {
struct packet_data * each = check_valid_packet(rx_bufs[i]); struct pkt_hdr * each = check_valid_packet(rx_bufs[i]);
if (each == NULL) { if (each == NULL) {
ntr(NTR_DEP_USER1, NTR_LEVEL_DEBUG, "locore_main: ignoring invalid packet %p.\n", (void*)rx_bufs[i]); ntr(NTR_DEP_USER1, NTR_LEVEL_DEBUG, "locore_main: ignoring invalid packet %p.\n", (void*)rx_bufs[i]);
dump_pkt(rx_bufs[i]);
rte_pktmbuf_free(rx_bufs[i]); rte_pktmbuf_free(rx_bufs[i]);
continue; continue;
} }
if (rte_be_to_cpu_32(each->epoch) == epoch && state == STATE_RECV) { uint16_t type = rte_be_to_cpu_16(each->type);
ntr(NTR_DEP_USER1, NTR_LEVEL_INFO, "locore_main: received packet %p for epoch %d\n", (void*)rx_bufs[i], epoch); ntr(NTR_DEP_USER1, NTR_LEVEL_DEBUG, "locore_main: received packet %p type %d.\n", (void*)rx_bufs[i], type);
switch (type) {
struct pkt_payload_epoch * pld_epoch;
struct pkt_payload_stat * pld_stat;
uint32_t epoch;
if (options.s_record.load()) { case PKT_TYPE_PROBE_RESP:
// keep statistics pld_epoch = (struct pkt_payload_epoch *)each->payload;
struct datapt * dpt = new datapt; epoch = rte_be_to_cpu_32(pld_epoch->epoch);
dpt->rtt = rte_be_to_cpu_64(each->clt_ts_rx) - rte_be_to_cpu_64(each->clt_ts_tx);
dpt->server_proc = rte_be_to_cpu_64(each->srv_ts_tx) - rte_be_to_cpu_64(each->srv_ts_rx);
options.s_stats.push_back(dpt);
}
// bump the epoch and stop processing other packets if (options.s_last_datapt == nullptr || epoch != options.s_last_datapt->epoch) {
state = STATE_SEND; ntr(NTR_DEP_USER1, NTR_LEVEL_WARNING, "locore_main: packet %p epoch %d doesn't match datapt %d.\n", (void*)rx_bufs[i], epoch, options.s_last_datapt->epoch);
epoch++; break;
} else { }
ntr(NTR_DEP_USER1, NTR_LEVEL_INFO, "locore_main: ignoring packet 0x%p with invalid epoch %d.\n", (void*)rx_bufs[i], epoch);
options.s_total_pkts++;
recv_resp = true;
break;
case PKT_TYPE_STAT:
pld_stat = (struct pkt_payload_stat *)each->payload;
epoch = rte_be_to_cpu_32(pld_stat->epoch);
if (options.s_last_datapt == nullptr || epoch != options.s_last_datapt->epoch) {
ntr(NTR_DEP_USER1, NTR_LEVEL_WARNING, "locore_main: packet %p epoch %d doesn't match datapt %d.\n", (void*)rx_bufs[i], epoch, options.s_last_datapt->epoch);
break;
}
options.s_last_datapt->srv_hw_tx = rte_be_to_cpu_64(pld_stat->hw_tx);
options.s_last_datapt->srv_hw_rx = rte_be_to_cpu_64(pld_stat->hw_rx);
options.s_last_datapt->srv_sw_tx = rte_be_to_cpu_64(pld_stat->sw_tx);
options.s_last_datapt->srv_sw_rx = rte_be_to_cpu_64(pld_stat->sw_rx);
recv_stat = true;
break;
default:
ntr(NTR_DEP_USER1, NTR_LEVEL_WARNING, "locore_main: ignoring packet %p with unknown type %d.\n", (void*)rx_bufs[i], type);
rte_pktmbuf_free(rx_bufs[i]);
continue;
} }
rte_pktmbuf_free(rx_bufs[i]); rte_pktmbuf_free(rx_bufs[i]);
} }
} }
if (state == STATE_SEND) { if (read_tx && recv_stat & recv_resp) {
// set new epoch // if we have all the data
pkt_data->epoch = rte_cpu_to_be_32(epoch);
ntr(NTR_DEP_USER1, NTR_LEVEL_INFO, "locore_main: sending packet %p with epoch %d\n", (void*)tx_buf, epoch);
const uint16_t nb_tx = rte_eth_tx_burst(options.s_portid, 0, &tx_buf, 1); if (options.s_last_datapt != nullptr) {
// push the data to the queue if we haven't done so already
options.s_data.push_back(options.s_last_datapt);
if (nb_tx < 1) { ntr(NTR_DEP_USER1, NTR_LEVEL_DEBUG, "locore_main: datapt for epoch %d dump:\n" \
rte_exit(EXIT_FAILURE, "failed to send packet 0x%p, epoch %d\n", (void*)tx_buf, epoch); " Valid: %d\n"
" client TX HW: %llu\n" \
" client TX SW: %llu\n" \
" client RX HW: %llu\n" \
" client RX SW: %llu\n" \
" server TX HW: %llu\n" \
" server TX SW: %llu\n" \
" server RX HW: %llu\n" \
" server RX SW: %llu\n\n",
options.s_last_datapt->epoch,
options.s_last_datapt->valid,
options.s_last_datapt->clt_hw_tx,
options.s_last_datapt->clt_sw_tx,
options.s_last_datapt->clt_hw_rx,
options.s_last_datapt->clt_sw_rx,
options.s_last_datapt->srv_hw_tx,
options.s_last_datapt->srv_sw_tx,
options.s_last_datapt->srv_hw_rx,
options.s_last_datapt->srv_sw_rx);
options.s_last_datapt = nullptr;
}
if (now >= next_ts) {
struct pkt_payload_epoch * pld_epoch;
uint32_t epoch;
next_ts += (int)(options.s_iagen->generate() * 1000000.0);
// generate the packet
tx_buf = rte_pktmbuf_alloc(options.mbuf_pool);
if (tx_buf == NULL) {
rte_exit(EXIT_FAILURE, "cannot allocate tx_buf\n");
}
pkt_data = construct_pkt_hdr(tx_buf, PKT_TYPE_PROBE,
&options.s_host_mac, &options.server_mac);
if (pkt_data == NULL) {
rte_exit(EXIT_FAILURE, "cannot allocate space for packet_data in mbuf\n");
}
epoch = options.s_epoch;
options.s_epoch++;
pld_epoch = (struct pkt_payload_epoch *)pkt_data->payload;
pld_epoch->epoch = rte_cpu_to_be_32(epoch);
options.s_last_datapt = new struct datapt;
options.s_last_datapt->epoch = epoch;
options.s_last_datapt->valid = options.s_record.load();
read_tx = false;
recv_resp = false;
recv_stat = false;
options.last_sent_ts = get_time_us();
ntr(NTR_DEP_USER1, NTR_LEVEL_DEBUG, "locore_main: sending packet %p with epoch %d\n", (void*)tx_buf, epoch);
const uint16_t nb_tx = rte_eth_tx_burst(options.s_portid, options.s_txqid, &tx_buf, 1);
if (nb_tx != 1) {
rte_exit(EXIT_FAILURE, "failed to send packet 0x%p, epoch %d\n", (void*)tx_buf, epoch);
}
}
}
if (!recv_stat) {
// if we haven't recevied the stats get ready to rage quit
if(get_time_us() - options.last_sent_ts > options.rage_quit_time * 1000) {
rte_exit(EXIT_FAILURE, "waiting too long for resp. I QUIT!!\n");
}
}
if (!read_tx) {
struct timespec ts;
if ((ret = rte_eth_timesync_read_tx_timestamp(options.s_portid, &ts)) == 0) {
ntr(NTR_DEP_USER1, NTR_LEVEL_DEBUG, "locore_main: read hw tx timestamp %lld.\n", ts.tv_nsec + ts.tv_sec * S2NS);
options.s_last_datapt->clt_hw_tx = ts.tv_nsec + ts.tv_sec * S2NS;
read_tx = true;
} }
state = STATE_RECV;
} }
} }
@ -228,7 +357,7 @@ port_init(uint16_t portid, struct rte_mempool *mbuf_pool)
port_conf.txmode.offloads |= DEV_TX_OFFLOAD_MBUF_FAST_FREE; port_conf.txmode.offloads |= DEV_TX_OFFLOAD_MBUF_FAST_FREE;
/* Configure the Ethernet device. */ /* Configure the Ethernet device. */
ret = rte_eth_dev_configure(portid, RX_RING_NUM, TX_RING_NUM, &port_conf); ret = rte_eth_dev_configure(portid, 1, 1, &port_conf);
if (ret != 0) if (ret != 0)
return ret; return ret;
@ -236,10 +365,10 @@ port_init(uint16_t portid, struct rte_mempool *mbuf_pool)
if (ret != 0) if (ret != 0)
return ret; return ret;
/* Allocate and set up 1 RX queue per Ethernet port. */ /* Allocate and set up 1 RX queue per thread . */
rxconf = dev_info.default_rxconf; rxconf = dev_info.default_rxconf;
rxconf.offloads = port_conf.rxmode.offloads; rxconf.offloads = port_conf.rxmode.offloads;
for (uint32_t i = 0; i < RX_RING_NUM; i++) { for (uint32_t i = 0; i < 1; i++) {
ret = rte_eth_rx_queue_setup(portid, i, nb_rxd, rte_eth_dev_socket_id(portid), &rxconf, mbuf_pool); ret = rte_eth_rx_queue_setup(portid, i, nb_rxd, rte_eth_dev_socket_id(portid), &rxconf, mbuf_pool);
if (ret < 0) if (ret < 0)
return ret; return ret;
@ -248,7 +377,7 @@ port_init(uint16_t portid, struct rte_mempool *mbuf_pool)
txconf = dev_info.default_txconf; txconf = dev_info.default_txconf;
txconf.offloads = port_conf.txmode.offloads; txconf.offloads = port_conf.txmode.offloads;
/* Allocate and set up 1 TX queue per Ethernet port. */ /* Allocate and set up 1 TX queue per Ethernet port. */
for (uint32_t i = 0; i < TX_RING_NUM; i++) { for (uint32_t i = 0; i < 1; i++) {
ret = rte_eth_tx_queue_setup(portid, i, nb_txd, rte_eth_dev_socket_id(portid), &txconf); ret = rte_eth_tx_queue_setup(portid, i, nb_txd, rte_eth_dev_socket_id(portid), &txconf);
if (ret < 0) if (ret < 0)
return ret; return ret;
@ -264,13 +393,17 @@ port_init(uint16_t portid, struct rte_mempool *mbuf_pool)
if (ret != 0) if (ret != 0)
return ret; return ret;
ret = rte_eth_timesync_enable(portid);
if (ret != 0)
return ret;
/* Enable RX in promiscuous mode for the Ethernet device. */ /* Enable RX in promiscuous mode for the Ethernet device. */
ret = rte_eth_promiscuous_enable(portid); ret = rte_eth_promiscuous_enable(portid);
if (ret != 0) if (ret != 0)
return ret; return ret;
rte_eth_add_tx_callback(portid, 0, tx_add_timestamp, NULL); rte_eth_add_tx_callback(portid, 0, tx_add_timestamp, NULL);
rte_eth_add_rx_callback(portid, 0, rx_calc_latency, NULL); rte_eth_add_rx_callback(portid, 0, rx_add_timestamp, NULL);
return 0; return 0;
} }
@ -281,10 +414,12 @@ static void dump_options()
" run time = %d\n" \ " run time = %d\n" \
" warmup time = %d\n" \ " warmup time = %d\n" \
" output file = %s\n" \ " output file = %s\n" \
" server MAC = %x:%x:%x:%x:%x:%x\n", " rage quit time = %ld\n"\
" host MAC = %x:%x:%x:%x:%x:%x\n",
options.run_time, options.run_time,
options.warmup_time, options.warmup_time,
options.output, options.output,
options.rage_quit_time,
options.server_mac.addr_bytes[0], options.server_mac.addr_bytes[0],
options.server_mac.addr_bytes[1], options.server_mac.addr_bytes[1],
options.server_mac.addr_bytes[2], options.server_mac.addr_bytes[2],
@ -298,19 +433,31 @@ static void usage()
fprintf(stdout, fprintf(stdout,
"Usage:\n " \ "Usage:\n " \
" -v(vv): verbose mode\n" \ " -v(vv): verbose mode\n" \
" -h: display the information\n" \ " -s: server's mac\n" \
" -o: output filename\n" \ " -S: slave(rat)'s mac\n" \
" -t: run time\n" \ " -t: run time\n" \
" -T: warmup time\n" \ " -T: warmup time\n" \
" -s: server's mac\n\n" ); " -h: display the information\n" \
" -o: output filename\n" \
" -A: affinity mask\n" \
" -i: inter-arrival time distribution\n" \
" -r: rage quit time (in ms)\n");
fflush(stdout);
} }
int main(int argc, char* argv[]) int main(int argc, char* argv[])
{ {
unsigned int nb_ports; unsigned int nb_ports;
struct rte_mempool *mbuf_pool, *mbuf_pool_pkt; struct rte_mempool *mbuf_pool;
std::ofstream log_file; std::ofstream log_file;
ntr_init();
if (nm_init() != 0)
rte_exit(EXIT_FAILURE, "failed to init libnm\n");
// create default generator
options.s_iagen = createGenerator(options.ia_gen_str);
// init dpdk // init dpdk
int ret = rte_eal_init(argc, argv); int ret = rte_eal_init(argc, argv);
if (ret < 0) { if (ret < 0) {
@ -325,8 +472,9 @@ int main(int argc, char* argv[])
{ {
int c; int c;
// parse arguments // parse arguments
while((c = getopt(argc, argv, "hvo:t:T:s:")) != -1) { while((c = getopt(argc, argv, "vs:S:t:T:ho:A:i:r:")) != -1) {
switch (c) { switch (c) {
struct rte_ether_addr * addr;
case 'v': case 'v':
ntr_set_level(NTR_DEP_USER1, ntr_get_level(NTR_DEP_USER1) + 1); ntr_set_level(NTR_DEP_USER1, ntr_get_level(NTR_DEP_USER1) + 1);
break; break;
@ -335,6 +483,13 @@ int main(int argc, char* argv[])
rte_exit(EXIT_FAILURE, "cannot parse %s as mac address.\n", optarg); rte_exit(EXIT_FAILURE, "cannot parse %s as mac address.\n", optarg);
} }
break; break;
case 'S':
addr = new struct rte_ether_addr;
if (rte_ether_unformat_addr(optarg, addr) == -1) {
rte_exit(EXIT_FAILURE, "cannot parse %s as mac address.\n", optarg);
}
options.slaves.push_back(addr);
break;
case 't': case 't':
options.run_time = atoi(optarg); options.run_time = atoi(optarg);
break; break;
@ -343,11 +498,26 @@ int main(int argc, char* argv[])
break; break;
case 'h': case 'h':
usage(); usage();
rte_exit(EXIT_SUCCESS, NULL); rte_exit(EXIT_SUCCESS, "\n");
break;
case 'o': case 'o':
strncpy(options.output, optarg, sizeof(options.output) - 1); strncpy(options.output, optarg, sizeof(options.output) - 1);
break; break;
case 'A':
options.cpu_mask = strtoull(optarg, nullptr, 16);
break;
case 'i':
strncpy(options.ia_gen_str, optarg, sizeof(options.ia_gen_str) - 1);
if (options.s_iagen != nullptr) {
delete options.s_iagen;
}
options.s_iagen = createGenerator(options.ia_gen_str);
if (options.s_iagen == nullptr) {
rte_exit(EXIT_FAILURE, "invalid generator string %s\n", options.ia_gen_str);
}
break;
case 'r':
options.rage_quit_time = atoi(optarg);
break;
default: default:
usage(); usage();
rte_exit(EXIT_FAILURE, "unknown argument: %c\n", c); rte_exit(EXIT_FAILURE, "unknown argument: %c\n", c);
@ -367,24 +537,19 @@ int main(int argc, char* argv[])
rte_exit(EXIT_FAILURE, "number of ports must be > 0\n"); rte_exit(EXIT_FAILURE, "number of ports must be > 0\n");
} }
// create a mbuf memory pool on the socket
mbuf_pool = rte_pktmbuf_pool_create("MBUF_POOL", MBUF_MAX_COUNT * nb_ports, MBUF_CACHE_SIZE, 0, RTE_MBUF_DEFAULT_BUF_SIZE, rte_socket_id());
if (mbuf_pool == nullptr) {
rte_exit(EXIT_FAILURE, "cannot create mbuf pool\n");
}
mbuf_pool_pkt = rte_pktmbuf_pool_create("MBUF_POOL_PKT", MBUF_MAX_COUNT * nb_ports, MBUF_CACHE_SIZE, 0, RTE_MBUF_DEFAULT_BUF_SIZE, rte_socket_id());
if (mbuf_pool_pkt == nullptr) {
rte_exit(EXIT_FAILURE, "cannot create mbuf pool\n");
}
options.s_mbuf_pool = mbuf_pool_pkt;
uint16_t portid = rte_eth_find_next(0); uint16_t portid = rte_eth_find_next(0);
if (portid == RTE_MAX_ETHPORTS) { if (portid == RTE_MAX_ETHPORTS) {
rte_exit(EXIT_FAILURE, "cannot find an available port\n"); rte_exit(EXIT_FAILURE, "cannot find an available port\n");
} }
options.s_portid = portid; options.s_portid = portid;
// create a mbuf memory pool on the socket
mbuf_pool = rte_pktmbuf_pool_create("MBUF_POOL", MBUF_MAX_COUNT, MBUF_CACHE_SIZE, 0, RTE_MBUF_DEFAULT_BUF_SIZE, rte_eth_dev_socket_id(options.s_portid));
if (mbuf_pool == nullptr) {
rte_exit(EXIT_FAILURE, "cannot create mbuf pool\n");
}
options.mbuf_pool = mbuf_pool;
if (port_init(portid, mbuf_pool) != 0) { if (port_init(portid, mbuf_pool) != 0) {
rte_exit(EXIT_FAILURE, "cannot init port %d\n", portid); rte_exit(EXIT_FAILURE, "cannot init port %d\n", portid);
} }
@ -403,39 +568,51 @@ int main(int argc, char* argv[])
dump_options(); dump_options();
uint16_t core_id = rte_get_next_lcore(0, true, false); sleep(1);
if (rte_eal_remote_launch(locore_main, NULL, core_id) != 0) {
uint64_t cmask = options.cpu_mask;
const int16_t core_id = cmask_get_next_cpu(&cmask);
if (core_id == NEXT_CPU_NULL) {
rte_exit(EXIT_FAILURE, "invalid cpu mask 0x%lx\n", cmask);
}
ntr(NTR_DEP_USER1, NTR_LEVEL_INFO, "main: launching thread on core %d\n", core_id);
if (rte_eal_remote_launch(locore_main, nullptr, core_id) != 0) {
rte_exit(EXIT_FAILURE, "failed to launch function on locore\n"); rte_exit(EXIT_FAILURE, "failed to launch function on locore\n");
} }
// poor man's timer // XXX: poor man's timer
// XXX: use kqueue instead
struct timespec ts;
ts.tv_sec = 1;
ts.tv_nsec = 0;
uint32_t second = 0; uint32_t second = 0;
while(true) { while(true) {
if (second >= options.warmup_time) { if (second >= options.warmup_time) {
options.s_record.store(true); options.s_record.store(1);
} }
if (second >= options.run_time + options.warmup_time) { if (second >= options.run_time + options.warmup_time) {
options.s_stop.store(true); options.s_stop.store(true);
break; break;
} }
clock_nanosleep(CLOCK_REALTIME, 0, &ts, NULL); usleep(S2US);
second++; second++;
} }
if (rte_eal_wait_lcore(core_id) < 0) if (rte_eal_wait_lcore(core_id) < 0)
rte_exit(EXIT_FAILURE, "failed to wait for job completion\n"); rte_exit(EXIT_FAILURE, "failed to wait for job completion\n");
uint32_t qps = 0;
// dump stats // dump stats
for (auto it = std::begin(options.s_stats); it != std::end(options.s_stats); ++it) { for (auto it : options.s_data) {
log_file << (*it)->rtt << "," << (*it)->server_proc << std::endl; if (it->valid) {
delete *it; qps++;
log_file << it->clt_sw_rx << ',' << it->clt_sw_tx << ','
<< it->clt_hw_rx << ',' << it->clt_hw_tx << ','
<< it->srv_sw_rx << ',' << it->srv_sw_tx << ','
<< it->srv_hw_rx << ',' << it->srv_hw_tx << std::endl;
}
} }
log_file.close(); log_file.close();
ntr(NTR_DEP_USER1, NTR_LEVEL_INFO, "Processed %d packets in %d seconds, QPS: %d\n", qps, options.run_time, qps);
// clean up // clean up
rte_eth_dev_stop(portid); rte_eth_dev_stop(portid);
rte_eth_dev_close(portid); rte_eth_dev_close(portid);

View File

@ -2,8 +2,12 @@
-O2 -O2
-std=c++11 -std=c++11
-Wall -Wall
-Wextra
-Werror -Werror
-Wpedantic
-I/usr/include/dpdk -I/usr/include/dpdk
-Iinc -Iinc
-Wno-deprecated-declarations -Wno-deprecated-declarations
-Wno-packed-not-aligned
-Wno-address-of-packed-member
-Wno-zero-length-array
-Wno-gnu-zero-variadic-macro-arguments

234
inc/gen.h Normal file
View File

@ -0,0 +1,234 @@
// modified from mutilate
// -*- c++ -*-
// 1. implement "fixed" generator
// 2. implement discrete generator
// 3. implement combine generator?
#pragma once
#include <netinet/in.h>
#include <string>
#include <vector>
#include <utility>
#include <assert.h>
#include <inttypes.h>
#include <limits.h>
#include <math.h>
#include <stdint.h>
#include <stdlib.h>
#include <string.h>
#include <sys/param.h>
#include "util.h"
#define D(fmt, ...)
#define DIE(fmt, ...) (void)0;
#define FNV_64_PRIME (0x100000001b3ULL)
#define FNV1_64_INIT (0xcbf29ce484222325ULL)
static inline uint64_t fnv_64_buf(const void* buf, size_t len) {
uint64_t hval = FNV1_64_INIT;
unsigned char *bp = (unsigned char *)buf; /* start of buffer */
unsigned char *be = bp + len; /* beyond end of buffer */
while (bp < be) {
hval ^= (uint64_t)*bp++;
hval *= FNV_64_PRIME;
}
return hval;
}
static inline uint64_t fnv_64(uint64_t in) { return fnv_64_buf(&in, sizeof(in)); }
// Generator syntax:
//
// \d+ == fixed
// n[ormal]:mean,sd
// e[xponential]:lambda
// p[areto]:scale,shape
// g[ev]:loc,scale,shape
// fb_value, fb_key, fb_rate
class Generator {
public:
Generator() {}
// Generator(const Generator &g) = delete;
// virtual Generator& operator=(const Generator &g) = delete;
virtual ~Generator() {}
virtual double generate(double U = -1.0) = 0;
virtual void set_lambda(double) {DIE("set_lambda() not implemented");}
protected:
std::string type;
};
class Fixed : public Generator {
public:
Fixed(double _value = 1.0) : value(_value) { D("Fixed(%f)", value); }
virtual double generate(double) { return value; }
virtual void set_lambda(double lambda) {
if (lambda > 0.0) value = 1.0 / lambda;
else value = 0.0;
}
private:
double value;
};
class Uniform : public Generator {
public:
Uniform(double _scale) : scale(_scale) { D("Uniform(%f)", scale); }
virtual double generate(double U = -1.0) {
if (U < 0.0) U = drand48();
return scale * U;
}
virtual void set_lambda(double lambda) {
if (lambda > 0.0) scale = 2.0 / lambda;
else scale = 0.0;
}
private:
double scale;
};
class Normal : public Generator {
public:
Normal(double _mean = 1.0, double _sd = 1.0) : mean(_mean), sd(_sd) {
D("Normal(mean=%f, sd=%f)", mean, sd);
}
virtual double generate(double U = -1.0) {
if (U < 0.0) U = drand48();
double V = U; // drand48();
double N = sqrt(-2 * log(U)) * cos(2 * M_PI * V);
return mean + sd * N;
}
virtual void set_lambda(double lambda) {
if (lambda > 0.0) mean = 1.0 / lambda;
else mean = 0.0;
}
private:
double mean, sd;
};
class Exponential : public Generator {
public:
Exponential(double _lambda = 1.0) : lambda(_lambda) {
D("Exponential(lambda=%f)", lambda);
}
virtual double generate(double U = -1.0) {
if (lambda <= 0.0) return 0.0;
if (U < 0.0) U = drand48();
return -log(U) / lambda;
}
virtual void set_lambda(double lambda) { this->lambda = lambda; }
private:
double lambda;
};
class GPareto : public Generator {
public:
GPareto(double _loc = 0.0, double _scale = 1.0, double _shape = 1.0) :
loc(_loc), scale(_scale), shape(_shape) {
assert(shape != 0.0);
D("GPareto(loc=%f, scale=%f, shape=%f)", loc, scale, shape);
}
virtual double generate(double U = -1.0) {
if (U < 0.0) U = drand48();
return loc + scale * (pow(U, -shape) - 1) / shape;
}
virtual void set_lambda(double lambda) {
if (lambda <= 0.0) scale = 0.0;
else scale = (1 - shape) / lambda - (1 - shape) * loc;
}
private:
double loc /* mu */;
double scale /* sigma */, shape /* k */;
};
class GEV : public Generator {
public:
GEV(double _loc = 0.0, double _scale = 1.0, double _shape = 1.0) :
e(1.0), loc(_loc), scale(_scale), shape(_shape) {
assert(shape != 0.0);
D("GEV(loc=%f, scale=%f, shape=%f)", loc, scale, shape);
}
virtual double generate(double U = -1.0) {
return loc + scale * (pow(e.generate(U), -shape) - 1) / shape;
}
private:
Exponential e;
double loc /* mu */, scale /* sigma */, shape /* k */;
};
class Discrete : public Generator {
public:
~Discrete() { delete def; }
Discrete(Generator* _def = NULL) : def(_def) {
if (def == NULL) def = new Fixed(0.0);
}
virtual double generate(double U = -1.0) {
double Uc = U;
if (pv.size() > 0 && U < 0.0) U = drand48();
double sum = 0;
for (auto p: pv) {
sum += p.first;
if (U < sum) return p.second;
}
return def->generate(Uc);
}
void add(double p, double v) {
pv.push_back(std::pair<double,double>(p, v));
}
private:
Generator *def;
std::vector< std::pair<double,double> > pv;
};
class KeyGenerator {
public:
KeyGenerator(Generator* _g, double _max = 10000) : g(_g), max(_max) {}
std::string generate(uint64_t ind) {
uint64_t h = fnv_64(ind);
double U = (double) h / (double)ULLONG_MAX;
double G = g->generate(U);
int keylen = MAX(round(G), floor(log10(max)) + 1);
char key[256];
snprintf(key, 256, "%0*" PRIu64, keylen, ind);
// D("%d = %s", ind, key);
return std::string(key);
}
private:
Generator* g;
double max;
};
Generator* createGenerator(std::string str);
Generator* createFacebookKey();
Generator* createFacebookValue();
Generator* createFacebookIA();

16
inc/nm.h Normal file
View File

@ -0,0 +1,16 @@
#pragma once
#include <vector>
constexpr static int NM_LEVEL_NUMA = 0;
constexpr static int NM_LEVEL_CPU = 1;
constexpr static int NM_LEVEL_CORE = 2;
std::vector<struct nm_obj *> * nm_get_nodes();
std::vector<struct nm_obj *> * nm_get_cpus();
std::vector<struct nm_obj *> * nm_get_cores();
// 0 on success
// -1 on error
int nm_init();

37
inc/ntr.h Normal file
View File

@ -0,0 +1,37 @@
#pragma once
#include <stdio.h>
#include <stdarg.h>
#define NTR_LEVEL_NONE (0)
#define NTR_LEVEL_ERROR (1)
#define NTR_LEVEL_WARNING (2)
#define NTR_LEVEL_INFO (3)
#define NTR_LEVEL_DEBUG (4)
#define NTR_LEVEL_DEFAULT (NTR_LEVEL_WARNING)
#define NTR_DEP_NTR (0)
#define NTR_DEP_USER1 (1)
#define NTR_DEP_USER2 (2)
#define NTR_DEP_USER3 (3)
#define NTR_DEP_USER4 (4)
#define NTR_DEP_USER5 (5)
#define NTR_DEP_MAX (NTR_DEP_USER5 + 1)
#ifdef __cplusplus
extern "C" {
#endif
void ntr_init();
void ntr(int dep, int level, const char * fmt, ...);
void ntr_set_level(int dep, int level);
void ntr_set_output(FILE * f);
int ntr_get_level(int dep);
#ifdef __cplusplus
}
#endif

View File

@ -1,61 +0,0 @@
#pragma once
#include <stdio.h>
#define NTR_LEVEL_NONE (0)
#define NTR_LEVEL_ERROR (1)
#define NTR_LEVEL_WARNING (2)
#define NTR_LEVEL_INFO (3)
#define NTR_LEVEL_DEBUG (4)
#define NTR_LEVEL_DEFAULT (NTR_LEVEL_WARNING)
#define NTR_DEP_NTR (0)
#define NTR_DEP_USER1 (1)
#define NTR_DEP_USER2 (2)
#define NTR_DEP_USER3 (3)
#define NTR_DEP_USER4 (4)
#define NTR_DEP_USER5 (5)
#define NTR_DEP_MAX (NTR_DEP_USER5 + 1)
#define NTR_DECL_IMPL \
int ntr_log_levels[NTR_DEP_MAX] = {NTR_LEVEL_DEFAULT}; \
FILE * ntr_out = stdout
extern int ntr_log_levels[];
extern FILE * ntr_out;
static inline
void ntr(int dep, int level, const char * fmt, ...)
{
va_list vl;
va_start(vl, fmt);
if (dep < NTR_DEP_MAX && level <= ntr_log_levels[dep]) {
vfprintf(ntr_out, fmt, vl);
}
va_end(vl);
}
static inline
void ntr_set_level(int dep, int level)
{
if (dep < NTR_DEP_MAX) {
ntr_log_levels[dep] = level;
}
}
static inline
void ntr_set_output(FILE * f)
{
if (f != NULL) {
ntr_out = f;
}
}
static inline
int ntr_get_level(int dep)
{
if (dep < NTR_DEP_MAX) {
return ntr_log_levels[dep];
}
return 0;
}

125
inc/pkt.h
View File

@ -12,29 +12,49 @@
#include <rte_net.h> #include <rte_net.h>
#include <rte_vxlan.h> #include <rte_vxlan.h>
#define IP_DEFTTL 64 /* from RFC 1340. */
#define IP_VERSION 0x40
#define IP_HDRLEN 0x05 /* default IP header length == five 32-bits words. */
#define IP_VHL_DEF (IP_VERSION | IP_HDRLEN)
#define IP_ADDR_FMT_SIZE 15
constexpr static uint32_t ETHER_FRAME_MAGIC = 0xDCDCE5E5; constexpr static uint32_t ETHER_FRAME_MAGIC = 0xDCDCE5E5;
const static struct rte_ether_addr PROBE_MAC_ADDR {0x01,0x1B,0x19,0x00,0x00,0x00};
const static uint16_t ETHER_TYPE_LOCAL_EXP = 0x88b5;
struct packet_hdr { struct ptp_hdr {
struct rte_ether_hdr eth_hdr; uint8_t ptp_msg_type;
struct rte_ipv4_hdr ipv4_hdr; uint8_t ptp_ver;
struct rte_udp_hdr udp_hdr; uint8_t unused[34];
} __attribute__((packed)); } __attribute__((packed));
struct packet_data struct pkt_hdr {
{ struct rte_ether_hdr eth_hdr;
struct packet_hdr pkt_hdr; struct ptp_hdr ptp_hdr;
uint16_t type;
uint32_t magic; uint32_t magic;
char payload[0];
} __attribute__((packed));
constexpr static uint16_t PKT_TYPE_LOAD = 0;
constexpr static uint16_t PKT_TYPE_PROBE = 1;
constexpr static uint16_t PKT_TYPE_LOAD_RESP = 2;
constexpr static uint16_t PKT_TYPE_PROBE_RESP = 3;
struct pkt_payload_epoch {
uint32_t epoch; uint32_t epoch;
uint64_t clt_ts_tx; };
uint64_t clt_ts_rx;
uint64_t srv_ts_tx; constexpr static uint16_t PKT_TYPE_STAT = 4;
uint64_t srv_ts_rx; struct pkt_payload_stat {
uint32_t epoch;
uint64_t hw_rx;
uint64_t hw_tx;
uint64_t sw_rx;
uint64_t sw_tx;
};
constexpr static uint16_t NUM_PKT_TYPES = PKT_TYPE_STAT + 1;
// for fast packet verification
static const uint32_t expected_payload_size[NUM_PKT_TYPES] {
sizeof(struct pkt_payload_epoch), // LOAD
sizeof(struct pkt_payload_epoch), // PROBE
sizeof(struct pkt_payload_epoch), // LOAD_RESP
sizeof(struct pkt_payload_epoch), // PROBE_RESP
sizeof(struct pkt_payload_stat) //STAT
}; };
static inline void static inline void
@ -103,17 +123,17 @@ dump_pkt(struct rte_mbuf *pkt)
} }
// fills the packet with the information except for the payload itself
static inline static inline
struct packet_data * construct_udp_pkt_hdr(struct rte_mbuf * buf, struct pkt_hdr * construct_pkt_hdr(struct rte_mbuf * buf, uint16_t type,
struct rte_ether_addr * src_mac, struct rte_ether_addr * dst_mac, struct rte_ether_addr * src_mac, struct rte_ether_addr * dst_mac)
uint32_t src_ip, uint32_t dst_ip, uint16_t src_port, uint16_t dst_port)
{ {
rte_pktmbuf_reset(buf); rte_pktmbuf_reset(buf);
struct packet_data * pkt_data = (struct packet_data *)rte_pktmbuf_append(buf, sizeof(struct packet_data)); const uint32_t total_sz = sizeof(struct pkt_hdr) + expected_payload_size[type];
struct pkt_hdr * pkt_data = (struct pkt_hdr *)rte_pktmbuf_append(buf, total_sz);
struct rte_ether_hdr * eth_hdr; struct rte_ether_hdr * eth_hdr;
struct rte_ipv4_hdr * ipv4_hdr;
struct rte_udp_hdr * udp_hdr;
if (pkt_data == NULL) if (pkt_data == NULL)
return NULL; return NULL;
@ -122,52 +142,47 @@ struct packet_data * construct_udp_pkt_hdr(struct rte_mbuf * buf,
buf->nb_segs = 1; buf->nb_segs = 1;
// construct l2 header // construct l2 header
eth_hdr = &pkt_data->pkt_hdr.eth_hdr; eth_hdr = &pkt_data->eth_hdr;
rte_ether_addr_copy(src_mac, &eth_hdr->s_addr); rte_ether_addr_copy(src_mac, &eth_hdr->s_addr);
rte_ether_addr_copy(dst_mac, &eth_hdr->d_addr); if (type == PKT_TYPE_PROBE || type == PKT_TYPE_PROBE_RESP) {
eth_hdr->ether_type = rte_cpu_to_be_16(RTE_ETHER_TYPE_IPV4); rte_ether_addr_copy(&PROBE_MAC_ADDR, &eth_hdr->d_addr);
eth_hdr->ether_type = rte_cpu_to_be_16(RTE_ETHER_TYPE_1588);
pkt_data->ptp_hdr.ptp_ver = 0x2; // VER 2
buf->ol_flags |= PKT_TX_IEEE1588_TMST;
} else {
rte_ether_addr_copy(dst_mac, &eth_hdr->d_addr);
eth_hdr->ether_type = rte_cpu_to_be_16(ETHER_TYPE_LOCAL_EXP);
pkt_data->ptp_hdr.ptp_ver = 0xff;
}
buf->l2_len = sizeof(struct rte_ether_hdr); buf->l2_len = sizeof(struct rte_ether_hdr);
// construct l3 header pkt_data->ptp_hdr.ptp_msg_type = 0x0; // SYNC
ipv4_hdr = &pkt_data->pkt_hdr.ipv4_hdr; pkt_data->type = rte_cpu_to_be_16(type);
memset(ipv4_hdr, 0, sizeof(struct rte_ipv4_hdr)); pkt_data->magic = rte_cpu_to_be_32(ETHER_FRAME_MAGIC);
ipv4_hdr->version_ihl = IP_VHL_DEF;
ipv4_hdr->type_of_service = 0;
ipv4_hdr->fragment_offset = 0;
ipv4_hdr->time_to_live = IP_DEFTTL;
ipv4_hdr->next_proto_id = IPPROTO_UDP;
ipv4_hdr->packet_id = 0;
ipv4_hdr->src_addr = rte_cpu_to_be_32(src_ip);
ipv4_hdr->dst_addr = rte_cpu_to_be_32(dst_ip);
ipv4_hdr->total_length = rte_cpu_to_be_16(sizeof(struct packet_data) - sizeof(struct rte_ether_hdr));
ipv4_hdr->hdr_checksum = 0;
buf->l3_len = sizeof(struct rte_ipv4_hdr);
// construct l4 header
udp_hdr = &pkt_data->pkt_hdr.udp_hdr;
udp_hdr->src_port = rte_cpu_to_be_16(src_port);
udp_hdr->dst_port = rte_cpu_to_be_16(dst_port);
udp_hdr->dgram_cksum = 0; /* No UDP checksum. */
udp_hdr->dgram_len = rte_cpu_to_be_16(sizeof(struct packet_data) -
sizeof(struct rte_ether_hdr) -
sizeof(struct rte_udp_hdr));
buf->l4_len = sizeof(struct rte_udp_hdr);
return pkt_data; return pkt_data;
} }
static inline static inline
struct packet_data * check_valid_packet(struct rte_mbuf * pkt) struct pkt_hdr * check_valid_packet(struct rte_mbuf * pkt)
{ {
struct packet_data * pkt_data = NULL; struct pkt_hdr * pkt_data = NULL;
const uint32_t data_len = rte_pktmbuf_data_len(pkt);
if (rte_pktmbuf_data_len(pkt) < sizeof(struct packet_data)) { if (data_len < sizeof(struct pkt_hdr)) {
return NULL; return NULL;
} }
pkt_data = rte_pktmbuf_mtod(pkt, struct packet_data *); pkt_data = rte_pktmbuf_mtod(pkt, struct pkt_hdr *);
if (rte_be_to_cpu_32(pkt_data->magic) == ETHER_FRAME_MAGIC) { // check MAGIC
if (rte_be_to_cpu_32(pkt_data->magic) != ETHER_FRAME_MAGIC) {
return NULL;
}
// check type and payload size
if ((rte_be_to_cpu_16(pkt_data->type) < NUM_PKT_TYPES) &&
(data_len >= (sizeof(struct pkt_hdr) + expected_payload_size[rte_be_to_cpu_16(pkt_data->type)]))) {
return pkt_data; return pkt_data;
} }

33
inc/util.h Normal file
View File

@ -0,0 +1,33 @@
#pragma once
#include <stdint.h>
#include <time.h>
#include <rte_ip.h>
constexpr static unsigned long S2NS = 100000000UL;
constexpr static unsigned long S2US = 1000000L;
constexpr static uint16_t SERVER_LOAD_PORT = 1234;
constexpr static uint16_t SERVER_PROBE_PORT = 319;
constexpr static uint32_t SERVER_IP = RTE_IPV4(192,168,123,0);
static inline uint64_t
get_time_us()
{
struct timespec ts;
clock_gettime(CLOCK_REALTIME, &ts);
return ts.tv_sec * 1000000 + ts.tv_nsec / 1000;
}
constexpr static int NEXT_CPU_NULL = -1;
static inline int
cmask_get_next_cpu(uint64_t * mask)
{
int ffs = ffsll(*mask);
*mask &= ~(1 << (ffs - 1));
return ffs - 1;
}
static inline int
cmask_get_num_cpus(const uint64_t mask)
{
return _mm_popcnt_u64(mask);
}

View File

@ -1,50 +1,100 @@
#include <cstdio> #include <cstdio>
#include <cstdlib> #include <cassert>
#include <ctime>
#include <netinet/in.h>
#include <rte_config.h>
#include <rte_common.h> #include <rte_common.h>
#include <rte_eal.h> #include <rte_eal.h>
#include <rte_ethdev.h> #include <rte_ethdev.h>
#include <rte_cycles.h> #include <rte_cycles.h>
#include <rte_lcore.h> #include <rte_lcore.h>
#include <rte_mbuf.h> #include <rte_mbuf.h>
#include <rte_byteorder.h>
#include <rte_config.h>
#include <rte_ether.h> #include <rte_ether.h>
#include <rte_launch.h> #include <rte_launch.h>
#include <rte_log.h>
#include <atomic> #include <atomic>
#include <vector>
#include <fstream>
#include <unistd.h> #include <unistd.h>
#include <signal.h>
#include "nm.h"
#include "pkt.h" #include "pkt.h"
#include "ntrlog.h" #include "ntr.h"
#include "rte_arp.h" #include "util.h"
#include "rte_mbuf_core.h"
NTR_DECL_IMPL;
constexpr unsigned int MBUF_MAX_COUNT = 8191; /* Protocol:
constexpr unsigned int MBUF_CACHE_SIZE = 250; * regular client:
constexpr unsigned int RX_RING_SIZE = 1024; * client -> LOAD -> server
constexpr unsigned int TX_RING_SIZE = 1024; * server -> LOAD_RESP -> client
constexpr unsigned int RX_RING_NUM = 1; * measuring client:
constexpr unsigned int TX_RING_NUM = 1; * client -> PROBE -> server (client tx timestamps)
constexpr unsigned int BURST_SIZE = 32; * server -> PROBE_RESP -> client (client rx timestamps and server tx/rx timestamps)
* server -> STAT -> client (server sends its tx/rx timestamps)
*/
static void * const PROBE_MAGIC = (void*)0x12344444;
constexpr static unsigned int MBUF_MAX_COUNT = 65536;
constexpr static unsigned int MBUF_CACHE_SIZE = 512;
constexpr static unsigned int RX_RING_SIZE = 4096;
constexpr static unsigned int TX_RING_SIZE = 4096;
constexpr static unsigned int BURST_SIZE = 32;
static const struct rte_eth_conf port_conf_default{}; static const struct rte_eth_conf port_conf_default{};
// keep track of the probe state
// when a probe packet first arrives this state is set to be influx and the rte_mbuf's userdata is set to PROBE_MAGIC
// which prevents other probe packets to be processed
// when the server sends the probe stats back to user influx is released
// this is to guarantee that the server only processes one probe packet at the time
// XXX: also this can be attached to the mbuf itself and processed by the lcore thread
// I kept this global because globally there could be only one pending probe request
// and rx_add_timestamp can save their shit here too
struct probe_state_t {
struct rte_ether_hdr hdr;
uint32_t epoch;
uint32_t timesync;
uint64_t last_sw_rx;
uint64_t last_sw_tx;
uint64_t last_hw_rx;
};
struct thread_info {
int tid;
int rxqid;
int txqid;
int lcore_id;
};
// state machine:
constexpr static int SERVER_STATE_WAIT = 0;
constexpr static int SERVER_STATE_PROBE = 1;
struct options_t { struct options_t {
//config
int num_threads{1};
uint64_t cpuset{0b010}; //2nd core
//states //states
uint16_t s_portid; uint16_t s_portid;
struct rte_ether_addr s_host_mac; struct rte_ether_addr s_host_mac;
struct rte_mempool * s_pkt_mempool; struct rte_mempool * s_pkt_mempool;
std::atomic<int> s_state {SERVER_STATE_WAIT};
struct probe_state_t s_probe_info;
std::vector<struct thread_info *> s_thr_info;
}; };
struct options_t options; static struct options_t options;
static uint16_t static uint16_t
rx_add_timestamp(uint16_t port __rte_unused, uint16_t qidx __rte_unused, rx_add_timestamp(uint16_t port __rte_unused, uint16_t qidx __rte_unused,
struct rte_mbuf **pkts, uint16_t nb_pkts, uint16_t max_pkts __rte_unused, void *_ __rte_unused) struct rte_mbuf **pkts, uint16_t nb_pkts, uint16_t max_pkts __rte_unused, void *_ __rte_unused)
{ {
uint64_t now = rte_rdtsc(); uint64_t now = rte_rdtsc();
struct packet_data * pkt_data; struct timespec ts;
struct pkt_hdr * pkt_data;
for (int i = 0; i < nb_pkts; i++) { for (int i = 0; i < nb_pkts; i++) {
pkt_data = check_valid_packet(pkts[i]); pkt_data = check_valid_packet(pkts[i]);
@ -53,135 +103,229 @@ rx_add_timestamp(uint16_t port __rte_unused, uint16_t qidx __rte_unused,
continue; continue;
} }
ntr(NTR_DEP_USER1, NTR_LEVEL_INFO, "rx_add_timestamp: tagged packet %p with %llu.\n", (void*)pkts[i], now); if (rte_be_to_cpu_16(pkt_data->type) == PKT_TYPE_PROBE) {
pkt_data->srv_ts_rx = rte_cpu_to_be_64(now); int state_wait = SERVER_STATE_WAIT;
pkts[i]->userdata = nullptr;
if (rte_eth_timesync_read_rx_timestamp(port, &ts, pkts[i]->timesync & 0x3) == 0) {
if (options.s_state.compare_exchange_strong(state_wait, SERVER_STATE_PROBE)) {
// mark the mbuf as probe packet being processed
// only the locore that receives the pkt w/ userdata != nullptr processes that packet
pkts[i]->userdata = PROBE_MAGIC;
// tag with timestamps
options.s_probe_info.last_hw_rx = ts.tv_nsec + ts.tv_sec * S2NS;
options.s_probe_info.last_sw_rx = now;
ntr(NTR_DEP_USER1, NTR_LEVEL_DEBUG, "rx_add_timestamp: tagged packet %p epoch %d with sw: %llu hw:%llu.\n", (void*)pkts[i], options.s_probe_info.epoch, now, options.s_probe_info.last_hw_rx);
} else
ntr(NTR_DEP_USER1, NTR_LEVEL_WARNING, "rx_add_timestamp: packet %p not tagged - server is processing a probe.\n", (void*)pkts[i]);
} else
ntr(NTR_DEP_USER1, NTR_LEVEL_WARNING, "rx_add_timestamp: packet %p not tagged - hw rx timestamp not available.\n", (void*)pkts[i]);
} else
ntr(NTR_DEP_USER1, NTR_LEVEL_DEBUG, "rx_add_timestamp: packet %p not tagged - type %d.\n", (void*)pkts[i], rte_be_to_cpu_16(pkt_data->type));
} }
return nb_pkts; return nb_pkts;
} }
static uint16_t static uint16_t
tx_calc_latency(uint16_t port __rte_unused, uint16_t qidx __rte_unused, tx_add_timestamp(uint16_t port __rte_unused, uint16_t qidx __rte_unused,
struct rte_mbuf **pkts, uint16_t nb_pkts, void *_ __rte_unused) struct rte_mbuf **pkts, uint16_t nb_pkts, void *_ __rte_unused)
{ {
uint64_t now = rte_rdtsc(); uint64_t now = rte_rdtsc();
struct packet_data * pkt_data; struct pkt_hdr * pkt_data;
for (int i = 0; i < nb_pkts; i++) { for (int i = 0; i < nb_pkts; i++) {
pkt_data = check_valid_packet(pkts[i]); pkt_data = check_valid_packet(pkts[i]);
if (pkt_data == NULL) { if (pkt_data == NULL) {
ntr(NTR_DEP_USER1, NTR_LEVEL_DEBUG, "tx_calc_latency: ignoring invalid packet %p.\n", (void*)pkts[i]); ntr(NTR_DEP_USER1, NTR_LEVEL_DEBUG, "tx_add_timestamp: ignoring invalid packet %p.\n", (void*)pkts[i]);
continue; continue;
} }
ntr(NTR_DEP_USER1, NTR_LEVEL_INFO, "tx_calc_latency: tagged packet %p with %llu.\n", (void*)pkts[i], now); if (rte_be_to_cpu_16(pkt_data->type) == PKT_TYPE_PROBE_RESP) {
pkt_data->srv_ts_tx = rte_cpu_to_be_64(now); // this packet is the response to PROBE packets
// at this time the packet is not sent to the NIC yet so
// the state must be waiting stats
// XXX: this should be an assert
if(options.s_state.load() != SERVER_STATE_PROBE || pkts[i]->userdata != PROBE_MAGIC) {
rte_exit(EXIT_FAILURE, "packet %p sent to NIC before sw callback\n", (void*)pkts[i]);
}
options.s_probe_info.last_sw_tx = now;
ntr(NTR_DEP_USER1, NTR_LEVEL_DEBUG, "tx_add_timestamp: tagged packet %p with sw tx %llu\n", (void*)pkts[i], options.s_probe_info.last_sw_tx);
} else {
ntr(NTR_DEP_USER1, NTR_LEVEL_DEBUG, "tx_add_timestamp: packet %p not tagged - type %d\n", (void*)pkts[i], pkt_data->type);
}
} }
return nb_pkts; return nb_pkts;
} }
static int static int
locore_main(void * _unused __rte_unused) locore_main(void * ti)
{ {
struct thread_info * tinfo = (struct thread_info *)ti;
struct rte_mbuf *bufs[BURST_SIZE]; struct rte_mbuf *bufs[BURST_SIZE];
// + 1 because it might involve an extra PKT_TYPE_STAT packet
// when all tx timestamps are ready
struct rte_mbuf *tx_bufs[BURST_SIZE]; struct rte_mbuf *tx_bufs[BURST_SIZE];
struct packet_data *pkt_data; struct pkt_hdr *pkt_data;
uint32_t core_id = rte_lcore_id();
bool pending_probe = false;
if (rte_eth_dev_socket_id(options.s_portid) > 0 && rte_eth_dev_socket_id(options.s_portid) != (int)rte_socket_id()) { if (rte_eth_dev_socket_id(options.s_portid) > 0 && rte_eth_dev_socket_id(options.s_portid) != (int)rte_socket_id()) {
ntr(NTR_DEP_USER1, NTR_LEVEL_WARNING, "locore_main: WARNING, port %d is on remote NUMA node to " ntr(NTR_DEP_USER1, NTR_LEVEL_WARNING, "locore_main <thread %d>: WARNING, port %d is on remote NUMA node to "
"polling thread.\n\tPerformance will " "polling thread.\n\tPerformance will "
"not be optimal.\n", options.s_portid); "not be optimal.\n", tinfo->tid, options.s_portid);
} }
ntr(NTR_DEP_USER1, NTR_LEVEL_INFO, "locore_main: core %d running.\n", core_id); ntr(NTR_DEP_USER1, NTR_LEVEL_INFO, "locore_main <thread %d>: running on locore %d with txidx %d and rxidx %d.\n", tinfo->tid, rte_lcore_id(), tinfo->txqid, tinfo->rxqid);
while(true) { while(true) {
uint16_t nb_tx = 0; uint16_t nb_tx = 0;
const uint16_t nb_rx = rte_eth_rx_burst(options.s_portid, 0, bufs, BURST_SIZE); const uint16_t nb_rx = rte_eth_rx_burst(options.s_portid, tinfo->rxqid, bufs, BURST_SIZE);
struct rte_mbuf * pkt_buf;
if (nb_rx == 0) { struct pkt_hdr * tx_data;
continue;
}
for(int i = 0; i < nb_rx; i++) { for(int i = 0; i < nb_rx; i++) {
// XXX: optimization: in rx_add_timestamp every packet is already validated once
// can just mark valid packet with a value so we can avoid this redundant check
pkt_data = check_valid_packet(bufs[i]); pkt_data = check_valid_packet(bufs[i]);
if (pkt_data == NULL) { if (pkt_data == NULL) {
ntr(NTR_DEP_USER1, NTR_LEVEL_DEBUG, "locore_main: core %d skipping invalid packet %p.\n", core_id, (void*)bufs[i]); ntr(NTR_DEP_USER1, NTR_LEVEL_DEBUG, "locore_main <thread %d>: skipping invalid packet %p.\n", tinfo->tid, (void*)bufs[i]);
dump_pkt(bufs[i]); //dump_pkt(bufs[i]);
rte_pktmbuf_free(bufs[i]); rte_pktmbuf_free(bufs[i]);
continue; continue;
} }
uint32_t dst_ip = rte_be_to_cpu_32(pkt_data->pkt_hdr.ipv4_hdr.dst_addr); ntr(NTR_DEP_USER1, NTR_LEVEL_INFO, "locore_main <thread %d>: packet %p from %x:%x:%x:%x:%x:%x to %x:%x:%x:%x:%x:%x, type %d\n",
uint32_t src_ip = rte_be_to_cpu_32(pkt_data->pkt_hdr.ipv4_hdr.src_addr); tinfo->tid,
uint16_t src_port = rte_be_to_cpu_16(pkt_data->pkt_hdr.udp_hdr.src_port);
uint16_t dst_port = rte_be_to_cpu_16(pkt_data->pkt_hdr.udp_hdr.dst_port);
ntr(NTR_DEP_USER1, NTR_LEVEL_INFO, "locore_main: core %d packet %p from %d.%d.%d.%d(%x:%x:%x:%x:%x:%x) to %d.%d.%d.%d(%x:%x:%x:%x:%x:%x), sport %d, dport %d, epoch %d\n",
core_id,
(void*)bufs[i], (void*)bufs[i],
(src_ip >> 24) & 0xff, pkt_data->eth_hdr.s_addr.addr_bytes[0],
(src_ip >> 16) & 0xff, pkt_data->eth_hdr.s_addr.addr_bytes[1],
(src_ip >> 8) & 0xff, pkt_data->eth_hdr.s_addr.addr_bytes[2],
(src_ip >> 0) & 0xff, pkt_data->eth_hdr.s_addr.addr_bytes[3],
pkt_data->pkt_hdr.eth_hdr.s_addr.addr_bytes[0], pkt_data->eth_hdr.s_addr.addr_bytes[4],
pkt_data->pkt_hdr.eth_hdr.s_addr.addr_bytes[1], pkt_data->eth_hdr.s_addr.addr_bytes[5],
pkt_data->pkt_hdr.eth_hdr.s_addr.addr_bytes[2], pkt_data->eth_hdr.d_addr.addr_bytes[0],
pkt_data->pkt_hdr.eth_hdr.s_addr.addr_bytes[3], pkt_data->eth_hdr.d_addr.addr_bytes[1],
pkt_data->pkt_hdr.eth_hdr.s_addr.addr_bytes[4], pkt_data->eth_hdr.d_addr.addr_bytes[2],
pkt_data->pkt_hdr.eth_hdr.s_addr.addr_bytes[5], pkt_data->eth_hdr.d_addr.addr_bytes[3],
(dst_ip >> 24) & 0xff, pkt_data->eth_hdr.d_addr.addr_bytes[4],
(dst_ip >> 16) & 0xff, pkt_data->eth_hdr.d_addr.addr_bytes[5],
(dst_ip >> 8) & 0xff, rte_be_to_cpu_16(pkt_data->type));
(dst_ip >> 0) & 0xff,
pkt_data->pkt_hdr.eth_hdr.d_addr.addr_bytes[0],
pkt_data->pkt_hdr.eth_hdr.d_addr.addr_bytes[1],
pkt_data->pkt_hdr.eth_hdr.d_addr.addr_bytes[2],
pkt_data->pkt_hdr.eth_hdr.d_addr.addr_bytes[3],
pkt_data->pkt_hdr.eth_hdr.d_addr.addr_bytes[4],
pkt_data->pkt_hdr.eth_hdr.d_addr.addr_bytes[5],
src_port,
dst_port,
rte_be_to_cpu_32(pkt_data->epoch));
// swap s_addr and d_addr
struct rte_mbuf * pkt_buf = rte_pktmbuf_alloc(options.s_pkt_mempool);
if (pkt_buf == NULL) {
rte_exit(EXIT_FAILURE, "locore_main: failed to allocate memory for pkt_buf");
}
struct packet_data * tx_data = construct_udp_pkt_hdr(pkt_buf,
&options.s_host_mac, switch (rte_be_to_cpu_16(pkt_data->type)) {
&pkt_data->pkt_hdr.eth_hdr.s_addr, case PKT_TYPE_PROBE: {
dst_ip, if (options.s_state.load() == SERVER_STATE_PROBE && bufs[i]->userdata == PROBE_MAGIC) {
src_ip, // send back probe_resp pkt to probe for return latency
dst_port, pending_probe = true;
src_port);
if (tx_data == NULL) { // book keep probe results
rte_exit(EXIT_FAILURE, "failed to construct tx packet %p", (void*)pkt_buf); options.s_probe_info.epoch = rte_be_to_cpu_32(((struct pkt_payload_epoch *)pkt_data->payload)->epoch);
options.s_probe_info.timesync = bufs[i]->timesync;
rte_memcpy(&options.s_probe_info.hdr, &pkt_data->eth_hdr, sizeof(struct rte_ether_hdr));
pkt_buf = rte_pktmbuf_alloc(options.s_pkt_mempool);
if (pkt_buf == NULL) {
rte_exit(EXIT_FAILURE, "failed to allocate memory for pkt_buf\n");
}
tx_data = construct_pkt_hdr(pkt_buf, PKT_TYPE_PROBE_RESP,
&options.s_host_mac,
&pkt_data->eth_hdr.s_addr);
if (tx_data == NULL) {
rte_exit(EXIT_FAILURE, "failed to construct tx packet %p", (void*)pkt_buf);
}
rte_memcpy(tx_data->payload, pkt_data->payload, sizeof(struct pkt_payload_epoch));
pkt_buf->userdata = PROBE_MAGIC;
// queue for burst send
tx_bufs[nb_tx++] = pkt_buf;
}
break;
}
case PKT_TYPE_LOAD: {
// we reply to load packet regardless of the server state
pkt_buf = rte_pktmbuf_alloc(options.s_pkt_mempool);
if (pkt_buf == NULL) {
rte_exit(EXIT_FAILURE, "failed to allocate memory for pkt_buf\n");
}
tx_data = construct_pkt_hdr(pkt_buf, PKT_TYPE_LOAD_RESP,
&options.s_host_mac,
&pkt_data->eth_hdr.s_addr);
if (tx_data == NULL) {
rte_exit(EXIT_FAILURE, "failed to construct tx packet %p", (void*)pkt_buf);
}
rte_memcpy(tx_data->payload, pkt_data->payload, sizeof(struct pkt_payload_epoch));
// queue for burst send
tx_bufs[nb_tx++] = pkt_buf;
break;
}
default:
break;
} }
// copy, endianess doesn't matter
tx_data->epoch = pkt_data->epoch;
tx_data->magic = pkt_data->magic;
tx_data->clt_ts_rx = pkt_data->clt_ts_rx;
tx_data->clt_ts_tx = pkt_data->clt_ts_tx;
tx_data->srv_ts_rx = pkt_data->srv_ts_rx;
tx_data->srv_ts_tx = pkt_data->srv_ts_tx;
// queue for burst send
tx_bufs[nb_tx++] = pkt_buf;
// free rx packet
rte_pktmbuf_free(bufs[i]); rte_pktmbuf_free(bufs[i]);
} }
const uint16_t nb_tx_succ = rte_eth_tx_burst(options.s_portid, 0, tx_bufs, nb_tx); // send the packets
// cleanup unsent packets if (nb_tx > 0) {
// don't need to free others because it's offloaded const uint16_t nb_tx_succ = rte_eth_tx_burst(options.s_portid, tinfo->txqid, tx_bufs, nb_tx);
if (nb_tx_succ < nb_tx) { if (nb_tx_succ < nb_tx) {
rte_exit(EXIT_FAILURE, "locore_main: failed to send some packets.\n"); rte_exit(EXIT_FAILURE, "failed to send some packets.\n");
}
}
// we wanna check every loop not only when there are packets
if (pending_probe) {
struct timespec ts;
struct pkt_payload_stat * stat;
if (rte_eth_timesync_read_tx_timestamp(options.s_portid, &ts) == 0) {
ntr(NTR_DEP_USER1, NTR_LEVEL_DEBUG, "locore_main <thread %d>: obtained hw tx timestamp %lld.\n", tinfo->tid, ts.tv_sec * S2NS + ts.tv_nsec);
// now we have everything we need
pkt_buf = rte_pktmbuf_alloc(options.s_pkt_mempool);
if (pkt_buf == NULL) {
rte_exit(EXIT_FAILURE, "failed to allocate memory for pkt_buf\n");
}
tx_data = construct_pkt_hdr(pkt_buf, PKT_TYPE_STAT,
&options.s_host_mac,
&options.s_probe_info.hdr.s_addr);
// populate stats
stat = (struct pkt_payload_stat *)tx_data->payload;
stat->epoch = rte_cpu_to_be_32(options.s_probe_info.epoch);
stat->hw_rx = rte_cpu_to_be_64(options.s_probe_info.last_hw_rx);
stat->hw_tx = rte_cpu_to_be_64(ts.tv_nsec + ts.tv_sec * S2NS);
stat->sw_rx = rte_cpu_to_be_64(options.s_probe_info.last_sw_rx);
stat->sw_tx = rte_cpu_to_be_64(options.s_probe_info.last_sw_tx);
// send the packet
if (rte_eth_tx_burst(options.s_portid, 0, &pkt_buf, 1) < 1) {
rte_exit(EXIT_FAILURE, "failed to send some packets.\n");
}
// release flux
pending_probe = false;
int expected = SERVER_STATE_PROBE;
if (!options.s_state.compare_exchange_strong(expected, SERVER_STATE_WAIT)) {
rte_exit(EXIT_FAILURE, "s_state changed unexpectedly!");
}
}
} }
} }
@ -216,7 +360,7 @@ port_init(uint16_t portid, struct rte_mempool *mbuf_pool)
port_conf.txmode.offloads |= DEV_TX_OFFLOAD_MBUF_FAST_FREE; port_conf.txmode.offloads |= DEV_TX_OFFLOAD_MBUF_FAST_FREE;
/* Configure the Ethernet device. */ /* Configure the Ethernet device. */
ret = rte_eth_dev_configure(portid, RX_RING_NUM, TX_RING_NUM, &port_conf); ret = rte_eth_dev_configure(portid, options.num_threads, options.num_threads, &port_conf);
if (ret != 0) if (ret != 0)
return ret; return ret;
@ -224,21 +368,23 @@ port_init(uint16_t portid, struct rte_mempool *mbuf_pool)
if (ret != 0) if (ret != 0)
return ret; return ret;
/* Allocate and set up 1 RX queue per Ethernet port. */ /* Allocate and set up 1 RX queue per thread per Ethernet port. */
rxconf = dev_info.default_rxconf; rxconf = dev_info.default_rxconf;
for (uint32_t i = 0; i < RX_RING_NUM; i++) { for (int i = 0; i < options.num_threads; i++) {
ret = rte_eth_rx_queue_setup(portid, i, nb_rxd, rte_eth_dev_socket_id(portid), &rxconf, mbuf_pool); ret = rte_eth_rx_queue_setup(portid, i, nb_rxd, rte_eth_dev_socket_id(portid), &rxconf, mbuf_pool);
if (ret < 0) if (ret < 0)
return ret; return ret;
options.s_thr_info.at(i)->rxqid = i;
} }
txconf = dev_info.default_txconf; txconf = dev_info.default_txconf;
txconf.offloads = port_conf.txmode.offloads; txconf.offloads = port_conf.txmode.offloads;
/* Allocate and set up 1 TX queue per Ethernet port. */ /* Allocate and set up 1 TX queue per thread per Ethernet port. */
for (uint32_t i = 0; i < TX_RING_NUM; i++) { for (int i = 0; i < options.num_threads; i++) {
ret = rte_eth_tx_queue_setup(portid, i, nb_txd, rte_eth_dev_socket_id(portid), &txconf); ret = rte_eth_tx_queue_setup(portid, i, nb_txd, rte_eth_dev_socket_id(portid), &txconf);
if (ret < 0) if (ret < 0)
return ret; return ret;
options.s_thr_info.at(i)->txqid = i;
} }
ret = rte_eth_dev_start(portid); ret = rte_eth_dev_start(portid);
@ -251,13 +397,20 @@ port_init(uint16_t portid, struct rte_mempool *mbuf_pool)
if (ret != 0) if (ret != 0)
return ret; return ret;
ret = rte_eth_timesync_enable(portid);
if (ret != 0)
return ret;
/* Enable RX in promiscuous mode for the Ethernet device. */ /* Enable RX in promiscuous mode for the Ethernet device. */
ret = rte_eth_promiscuous_enable(portid); ret = rte_eth_promiscuous_enable(portid);
if (ret != 0) if (ret != 0)
return ret; return ret;
if (rte_eth_add_tx_callback(portid, 0, tx_calc_latency, NULL) == NULL || rte_eth_add_rx_callback(portid, 0, rx_add_timestamp, NULL) == NULL) { for (int i = 0; i < options.num_threads; i++) {
return -1; if (rte_eth_add_tx_callback(portid, options.s_thr_info.at(i)->txqid, tx_add_timestamp, NULL) == NULL ||
rte_eth_add_rx_callback(portid, options.s_thr_info.at(i)->rxqid, rx_add_timestamp, NULL) == NULL) {
return -1;
}
} }
return 0; return 0;
@ -268,13 +421,33 @@ static void usage()
fprintf(stdout, fprintf(stdout,
"Usage:\n" \ "Usage:\n" \
" -v(vv): verbose mode\n" \ " -v(vv): verbose mode\n" \
" -h: display the information\n"); " -h: seek help\n" \
" -A: cpu mask for worker threads\n");
fflush(stdout);
}
static void dump_options()
{
ntr(NTR_DEP_USER1, NTR_LEVEL_INFO,
"main: khat configuration:\n" \
" verbosity: +%d\n" \
" thread count: %d\n" \
" thread mask: %lld\n\n",
ntr_get_level(NTR_DEP_USER1) - NTR_LEVEL_DEFAULT,
options.num_threads,
options.cpuset);
} }
int main(int argc, char* argv[]) int main(int argc, char* argv[])
{ {
unsigned int nb_ports; unsigned int nb_ports;
struct rte_mempool *mbuf_pool, *mbuf_pool_pkt; struct rte_mempool *mbuf_pool;
ntr_init();
if (nm_init() != 0) {
rte_exit(EXIT_FAILURE, "nm init failed!\n");
}
// init dpdk // init dpdk
int ret = rte_eal_init(argc, argv); int ret = rte_eal_init(argc, argv);
@ -290,14 +463,21 @@ int main(int argc, char* argv[])
{ {
int c; int c;
// parse arguments // parse arguments
while((c = getopt(argc, argv, "hv")) != -1) { while((c = getopt(argc, argv, "hvA:")) != -1) {
switch (c) { switch (c) {
case 'v': case 'v':
ntr_set_level(NTR_DEP_USER1, ntr_get_level(NTR_DEP_USER1) + 1); ntr_set_level(NTR_DEP_USER1, ntr_get_level(NTR_DEP_USER1) + 1);
break; break;
case 'h': case 'h':
usage(); usage();
rte_exit(EXIT_SUCCESS, NULL); rte_exit(EXIT_SUCCESS, "\n");
break;
case 'A':
options.cpuset = strtoull(optarg, nullptr, 16);
options.num_threads = cmask_get_num_cpus(options.cpuset);
if (options.num_threads == 0) {
rte_exit(EXIT_FAILURE, "must run at least one thread\n");
}
break; break;
default: default:
usage(); usage();
@ -307,33 +487,36 @@ int main(int argc, char* argv[])
} }
} }
// XXX: singal handler to exit dump_options();
nb_ports = rte_eth_dev_count_avail(); nb_ports = rte_eth_dev_count_avail();
if (nb_ports == 0) { if (nb_ports == 0) {
rte_exit(EXIT_FAILURE, "number of ports must be > 0\n"); rte_exit(EXIT_FAILURE, "number of ports must be > 0\n");
} }
// create a mbuf memory pool on the socket
mbuf_pool = rte_pktmbuf_pool_create("MBUF_POOL", MBUF_MAX_COUNT * nb_ports, MBUF_CACHE_SIZE, 0, RTE_MBUF_DEFAULT_BUF_SIZE, rte_socket_id());
if (mbuf_pool == nullptr) {
rte_exit(EXIT_FAILURE, "cannot create mbuf pool\n");
}
// create a pkt mbuf memory pool on the socket
mbuf_pool_pkt = rte_pktmbuf_pool_create("MBUF_POOL_PKT", MBUF_MAX_COUNT * nb_ports, MBUF_CACHE_SIZE, 0, RTE_MBUF_DEFAULT_BUF_SIZE, rte_socket_id());
if (mbuf_pool_pkt == nullptr) {
rte_exit(EXIT_FAILURE, "cannot create mbuf_pkt pool\n");
}
options.s_pkt_mempool = mbuf_pool_pkt;
uint16_t portid = rte_eth_find_next(0); uint16_t portid = rte_eth_find_next(0);
if (portid == RTE_MAX_ETHPORTS) { if (portid == RTE_MAX_ETHPORTS) {
rte_exit(EXIT_FAILURE, "cannot find an available port\n"); rte_exit(EXIT_FAILURE, "cannot find an available port\n");
} }
options.s_portid = portid; options.s_portid = portid;
// create a mbuf memory pool on the socket
mbuf_pool = rte_pktmbuf_pool_create("MBUF_POOL", MBUF_MAX_COUNT * nb_ports, MBUF_CACHE_SIZE, 0, RTE_MBUF_DEFAULT_BUF_SIZE, rte_eth_dev_socket_id(portid));
if (mbuf_pool == nullptr) {
rte_exit(EXIT_FAILURE, "cannot create mbuf pool\n");
}
options.s_pkt_mempool = mbuf_pool;
// init threads
uint64_t cpuset = options.cpuset;
for(int i = 0; i < options.num_threads; i++) {
struct thread_info * tinfo = new thread_info;
tinfo->tid = i;
tinfo->lcore_id = cmask_get_next_cpu(&cpuset);
options.s_thr_info.push_back(tinfo);
}
if (port_init(portid, mbuf_pool) != 0) { if (port_init(portid, mbuf_pool) != 0) {
rte_exit(EXIT_FAILURE, "cannot init port %d\n", portid); rte_exit(EXIT_FAILURE, "cannot init port %d\n", portid);
} }
@ -342,7 +525,7 @@ int main(int argc, char* argv[])
rte_exit(EXIT_FAILURE, "cannot get mac address of port %d\n", portid); rte_exit(EXIT_FAILURE, "cannot get mac address of port %d\n", portid);
} }
ntr(NTR_DEP_USER1, NTR_LEVEL_INFO, "Configured port %d with mac addr %x:%x:%x:%x:%x:%x\n", portid, ntr(NTR_DEP_USER1, NTR_LEVEL_INFO, "Configured port %d on socket %d with mac addr %x:%x:%x:%x:%x:%x\n", portid, rte_eth_dev_socket_id(portid),
options.s_host_mac.addr_bytes[0], options.s_host_mac.addr_bytes[0],
options.s_host_mac.addr_bytes[1], options.s_host_mac.addr_bytes[1],
options.s_host_mac.addr_bytes[2], options.s_host_mac.addr_bytes[2],
@ -350,26 +533,22 @@ int main(int argc, char* argv[])
options.s_host_mac.addr_bytes[4], options.s_host_mac.addr_bytes[4],
options.s_host_mac.addr_bytes[5]); options.s_host_mac.addr_bytes[5]);
usleep(S2US);
uint16_t lcore_id = rte_get_next_lcore(0, true, false); for(int i = 0; i < options.num_threads; i++) {
struct thread_info * tinfo = options.s_thr_info.at(i);
if (lcore_id == RTE_MAX_LCORE) { ntr(NTR_DEP_USER1, NTR_LEVEL_INFO, "main: launching thread %d on locore %d\n", tinfo->tid, tinfo->lcore_id);
rte_exit(EXIT_FAILURE, "cannot detect lcores.\n"); if (rte_eal_remote_launch(locore_main, (void *)options.s_thr_info.at(i), tinfo->lcore_id) != 0) {
rte_exit(EXIT_FAILURE, "failed to launch function on locore %d\n", tinfo->lcore_id);
}
} }
if (rte_eal_remote_launch(locore_main, NULL, lcore_id) != 0) { for(int i = 0; i < options.num_threads; i++) {
rte_exit(EXIT_FAILURE, "failed to launch function on locore %d\n", lcore_id); struct thread_info * tinfo = options.s_thr_info.at(i);
} ntr(NTR_DEP_USER1, NTR_LEVEL_INFO, "main: waiting for locore %d...\n", tinfo->lcore_id);
if (rte_eal_wait_lcore(tinfo->lcore_id) != 0) {
// while(true) { rte_exit(EXIT_FAILURE, "failed to wait for locore %d\n", tinfo->lcore_id);
// struct rte_eth_stats stats; }
// rte_eth_stats_get(portid, &stats);
// printf("recv: %d missed: %d err: %d\n",(uint32_t)stats.ipackets, (uint32_t)stats.imissed,(uint32_t)stats.ierrors);
// usleep(1000000);
// }
if (rte_eal_wait_lcore(lcore_id) != 0) {
rte_exit(EXIT_FAILURE, "failed to wait for locore %d\n", lcore_id);
} }
// shouldn't get here // shouldn't get here

74
libgen/generator.cc Normal file
View File

@ -0,0 +1,74 @@
// modified from mutilate
#include "gen.h"
Generator* createFacebookKey() { return new GEV(30.7984, 8.20449, 0.078688); }
Generator* createFacebookValue() {
Generator* g = new GPareto(15.0, 214.476, 0.348238);
Discrete* d = new Discrete(g);
d->add(0.00536, 0.0);
d->add(0.00047, 1.0);
d->add(0.17820, 2.0);
d->add(0.09239, 3.0);
d->add(0.00018, 4.0);
d->add(0.02740, 5.0);
d->add(0.00065, 6.0);
d->add(0.00606, 7.0);
d->add(0.00023, 8.0);
d->add(0.00837, 9.0);
d->add(0.00837, 10.0);
d->add(0.08989, 11.0);
d->add(0.00092, 12.0);
d->add(0.00326, 13.0);
d->add(0.01980, 14.0);
return d;
}
Generator* createFacebookIA() { return new GPareto(0, 16.0292, 0.154971); }
Generator* createGenerator(std::string str) {
if (!strcmp(str.c_str(), "fb_key")) return createFacebookKey();
else if (!strcmp(str.c_str(), "fb_value")) return createFacebookValue();
else if (!strcmp(str.c_str(), "fb_ia")) return createFacebookIA();
char *s_copy = new char[str.length() + 1];
strcpy(s_copy, str.c_str());
char *saveptr = NULL;
if (atoi(s_copy) != 0 || !strcmp(s_copy, "0")) {
double v = atof(s_copy);
delete[] s_copy;
return new Fixed(v);
}
char *t_ptr = strtok_r(s_copy, ":", &saveptr);
char *a_ptr = strtok_r(NULL, ":", &saveptr);
if (t_ptr == NULL) // || a_ptr == NULL)
DIE("strtok(.., \":\") failed to parse %s", str.c_str());
saveptr = NULL;
char *s1 = strtok_r(a_ptr, ",", &saveptr);
char *s2 = strtok_r(NULL, ",", &saveptr);
char *s3 = strtok_r(NULL, ",", &saveptr);
double a1 = s1 ? atof(s1) : 0.0;
double a2 = s2 ? atof(s2) : 0.0;
double a3 = s3 ? atof(s3) : 0.0;
delete[] s_copy;
if (strcasestr(str.c_str(), "fixed")) return new Fixed(a1);
else if (strcasestr(str.c_str(), "normal")) return new Normal(a1, a2);
else if (strcasestr(str.c_str(), "exponential")) return new Exponential(a1);
else if (strcasestr(str.c_str(), "pareto")) return new GPareto(a1, a2, a3);
else if (strcasestr(str.c_str(), "gev")) return new GEV(a1, a2, a3);
else if (strcasestr(str.c_str(), "uniform")) return new Uniform(a1);
DIE("Unable to create Generator '%s'", str.c_str());
return NULL;
}

127
libnm/nm.cc Normal file
View File

@ -0,0 +1,127 @@
#include <hwloc.h>
#include <vector>
#include <algorithm>
#include "nm.h"
struct nm_obj {
int level;
int id;
struct nm_obj *parent;
std::vector<struct nm_obj *> children;
};
static bool nm_obj_comparator(struct nm_obj * a, struct nm_obj * b)
{
return a->id < b->id;
}
static std::vector<struct nm_obj *> nodes;
static std::vector<struct nm_obj *> cores;
static std::vector<struct nm_obj *> cpus;
std::vector<struct nm_obj *> * nm_get_nodes()
{
return &nodes;
}
std::vector<struct nm_obj *> * nm_get_cpus()
{
return &cpus;
}
std::vector<struct nm_obj *> * nm_get_cores()
{
return &cores;
}
hwloc_obj_t get_parent_type(hwloc_obj_t obj, hwloc_obj_type_t type)
{
while(obj != nullptr) {
if (obj->type == type) {
break;
}
obj = obj->parent;
}
return obj;
}
// 0 on success
// -1 on error
int nm_init()
{
int ret;
hwloc_topology * topo;
if ((ret = hwloc_topology_init(&topo)) != 0) {
return ret;
}
if ((ret = hwloc_topology_load(topo)) != 0)
return ret;
// populate numa nodes
hwloc_obj_t obj = nullptr;
while(1) {
obj = hwloc_get_next_obj_by_type(topo, HWLOC_OBJ_PACKAGE, obj);
if (obj == nullptr) {
break;
}
struct nm_obj * each = new struct nm_obj;
each->id = obj->logical_index;
each->level = NM_LEVEL_NUMA;
each->parent = nullptr;
nodes.push_back(each);
printf("libnm: identified NUMA node %d\n", each->id);
}
std::sort(nodes.begin(), nodes.end(), nm_obj_comparator);
// populate cpus
obj = nullptr;
while(1) {
obj = hwloc_get_next_obj_by_type(topo, HWLOC_OBJ_CORE, obj);
if (obj == nullptr) {
break;
}
struct nm_obj * each = new struct nm_obj;
each->id = obj->logical_index;
each->level = NM_LEVEL_CPU;
hwloc_obj_t parent = get_parent_type(obj, HWLOC_OBJ_PACKAGE);
if (parent == nullptr) {
return -1;
}
// XXX: this faults if the OS decides to be stupid
each->parent = nodes.at(parent->logical_index);
each->parent->children.push_back(each);
cpus.push_back(each);
printf("libnm: identified CPU %d on NUMA node %d\n", each->id, each->parent->id);
}
std::sort(cpus.begin(), cpus.end(), nm_obj_comparator);
// populate cores
obj = nullptr;
while(1) {
obj = hwloc_get_next_obj_by_type(topo, HWLOC_OBJ_PU, obj);
if (obj == nullptr) {
break;
}
struct nm_obj * each = new struct nm_obj;
each->id = obj->logical_index;
each->level = NM_LEVEL_CORE;
hwloc_obj_t parent = get_parent_type(obj, HWLOC_OBJ_CORE);
if (parent == nullptr) {
return -1;
}
// XXX: this faults if the OS decides to be stupid
each->parent = cpus.at(parent->logical_index);
each->parent->children.push_back(each);
cores.push_back(each);
printf("libnm: identified core %d on CPU %d, NUMA node %d\n", each->id, each->parent->id, each->parent->parent->id);
}
std::sort(cores.begin(), cores.end(), nm_obj_comparator);
return ret;
}

43
libntr/ntr.c Normal file
View File

@ -0,0 +1,43 @@
#include "ntr.h"
static int ntr_log_levels[NTR_DEP_MAX] = {NTR_LEVEL_DEFAULT};
static FILE * ntr_out;
void ntr_init()
{
ntr_out = stdout;
}
void ntr(int dep, int level, const char * fmt, ...)
{
va_list vl;
va_start(vl, fmt);
if (dep < NTR_DEP_MAX && level <= ntr_log_levels[dep]) {
vfprintf(ntr_out, fmt, vl);
}
va_end(vl);
}
void ntr_set_level(int dep, int level)
{
if (dep < NTR_DEP_MAX) {
ntr_log_levels[dep] = level;
}
}
void ntr_set_output(FILE * f)
{
if (f != NULL) {
ntr_out = f;
}
}
int ntr_get_level(int dep)
{
if (dep < NTR_DEP_MAX) {
return ntr_log_levels[dep];
}
return 0;
}

544
rat/rat.cc Normal file
View File

@ -0,0 +1,544 @@
#include <cstdio>
#include <ctime>
#include <netinet/in.h>
#include <rte_config.h>
#include <rte_common.h>
#include <rte_eal.h>
#include <rte_ethdev.h>
#include <rte_cycles.h>
#include <rte_lcore.h>
#include <rte_mbuf.h>
#include <rte_ether.h>
#include <rte_launch.h>
#include <rte_log.h>
#include <rte_byteorder.h>
#include <rte_ip.h>
#include <atomic>
#include <vector>
#include <fstream>
#include <unistd.h>
#include "nm.h"
#include "gen.h"
#include "ntr.h"
#include "pkt.h"
#include "util.h"
constexpr static unsigned int MBUF_MAX_COUNT = 16384;
constexpr static unsigned int MBUF_CACHE_SIZE = 512;
constexpr static unsigned int RX_RING_SIZE = 4096;
constexpr static unsigned int TX_RING_SIZE = 4096;
constexpr static unsigned int BURST_SIZE = 32;
constexpr static unsigned int MODE_MASTER = 0;
constexpr static unsigned int MODE_CLIENT = 1;
static const struct rte_eth_conf port_conf_default{};
struct datapt {
uint32_t epoch;
uint32_t valid;
uint64_t clt_hw_tx;
uint64_t clt_sw_tx;
uint64_t clt_hw_rx;
uint64_t clt_sw_rx;
uint64_t srv_hw_tx;
uint64_t srv_sw_tx;
uint64_t srv_hw_rx;
uint64_t srv_sw_rx;
};
struct thread_info {
unsigned int id;
unsigned int rxqid{0};
unsigned int txqid{0};
std::vector<struct datapt *> data;
struct datapt * last_datapt{nullptr};
unsigned int tot_send{0};
unsigned int tot_recv{0};
Generator * ia_gen;
};
struct options_t {
unsigned int run_time{5};
unsigned int warmup_time{0};
unsigned int num_threads{1};
unsigned int mode{MODE_MASTER};
char output[256] = "output.txt";
char ia_gen[256] = "fixed:1";
struct rte_ether_addr server_mac;
uint64_t cpu_mask;
// states
struct rte_mempool * mbuf_pool;
struct rte_ether_addr s_host_mac;
uint16_t s_portid;
std::vector<struct thread_info *> s_thr_info;
std::atomic<uint32_t> s_epoch;
std::atomic<bool> s_stop {false};
std::atomic<uint32_t> s_record {0};
};
static struct options_t options;
// static struct thread_info * get_thread_info(int qid)
// {
// return options.s_thr_info.at(qid);
// }
static int
locore_main(void * tif)
{
struct thread_info * tinfo = (struct thread_info *)tif;
struct rte_mbuf *tx_buf;
struct rte_mbuf *rx_bufs[BURST_SIZE];
struct pkt_hdr *pkt_data;
uint32_t core_id = rte_lcore_id();
int32_t ret;
bool read_tx = true;
bool recv_stat = true;
bool recv_resp = true;
uint64_t next_ts;
// XXX: check link status instead
sleep(1);
if (rte_eth_dev_socket_id(options.s_portid) > 0 && rte_eth_dev_socket_id(options.s_portid) != (int)rte_socket_id()) {
ntr(NTR_DEP_USER1, NTR_LEVEL_WARNING, "locore_main: WARNING, port %d is on remote NUMA node to "
"polling thread.\n\tPerformance will "
"not be optimal.\n", options.s_portid);
}
ntr(NTR_DEP_USER1, NTR_LEVEL_INFO, "locore_main: core %d running thread %d...\n", core_id, tinfo->id);
next_ts = get_time_us();
while(!options.s_stop.load()) {
uint64_t now = get_time_us();
// always pop incoming packets
const uint16_t nb_rx = rte_eth_rx_burst(options.s_portid, 0, rx_bufs, BURST_SIZE);
if (nb_rx > 0) {
for (int i = 0; i < nb_rx; i++) {
struct pkt_hdr * each = check_valid_packet(rx_bufs[i]);
if (each == NULL) {
ntr(NTR_DEP_USER1, NTR_LEVEL_DEBUG, "locore_main: ignoring invalid packet %p.\n", (void*)rx_bufs[i]);
rte_pktmbuf_free(rx_bufs[i]);
continue;
}
uint16_t type = rte_be_to_cpu_16(each->type);
ntr(NTR_DEP_USER1, NTR_LEVEL_DEBUG, "locore_main: received packet %p type %d.\n", (void*)rx_bufs[i], type);
switch (type) {
struct pkt_payload_epoch * pld_epoch;
struct pkt_payload_stat * pld_stat;
uint32_t epoch;
case PKT_TYPE_PROBE_RESP:
pld_epoch = (struct pkt_payload_epoch *)each->payload;
epoch = rte_be_to_cpu_32(pld_epoch->epoch);
if (tinfo->last_datapt == nullptr || epoch != tinfo->last_datapt->epoch) {
ntr(NTR_DEP_USER1, NTR_LEVEL_WARNING, "locore_main: packet %p epoch %d doesn't match datapt %d.\n", (void*)rx_bufs[i], epoch, tinfo->last_datapt->epoch);
break;
}
tinfo->tot_recv++;
recv_resp = true;
break;
case PKT_TYPE_STAT:
pld_stat = (struct pkt_payload_stat *)each->payload;
epoch = rte_be_to_cpu_32(pld_stat->epoch);
if (tinfo->last_datapt == nullptr || epoch != tinfo->last_datapt->epoch) {
ntr(NTR_DEP_USER1, NTR_LEVEL_WARNING, "locore_main: packet %p epoch %d doesn't match datapt %d.\n", (void*)rx_bufs[i], epoch, tinfo->last_datapt->epoch);
break;
}
tinfo->last_datapt->srv_hw_tx = rte_be_to_cpu_64(pld_stat->hw_tx);
tinfo->last_datapt->srv_hw_rx = rte_be_to_cpu_64(pld_stat->hw_rx);
tinfo->last_datapt->srv_sw_tx = rte_be_to_cpu_64(pld_stat->sw_tx);
tinfo->last_datapt->srv_sw_rx = rte_be_to_cpu_64(pld_stat->sw_rx);
tinfo->tot_recv++;
recv_stat = true;
break;
default:
ntr(NTR_DEP_USER1, NTR_LEVEL_WARNING, "locore_main: ignoring packet %p with unknown type %d.\n", (void*)rx_bufs[i], type);
rte_pktmbuf_free(rx_bufs[i]);
continue;
}
rte_pktmbuf_free(rx_bufs[i]);
}
}
if (read_tx && recv_stat & recv_resp) {
// if we have all the data
if (tinfo->last_datapt != nullptr) {
// push the data to the queue if we haven't done so already
tinfo->data.push_back(tinfo->last_datapt);
ntr(NTR_DEP_USER1, NTR_LEVEL_DEBUG, "locore_main: datapt for epoch %d dump:\n" \
" Valid: %d\n"
" client TX HW: %llu\n" \
" client TX SW: %llu\n" \
" client RX HW: %llu\n" \
" client RX SW: %llu\n" \
" server TX HW: %llu\n" \
" server TX SW: %llu\n" \
" server RX HW: %llu\n" \
" server RX SW: %llu\n\n",
tinfo->last_datapt->epoch,
tinfo->last_datapt->valid,
tinfo->last_datapt->clt_hw_tx,
tinfo->last_datapt->clt_sw_tx,
tinfo->last_datapt->clt_hw_rx,
tinfo->last_datapt->clt_sw_rx,
tinfo->last_datapt->srv_hw_tx,
tinfo->last_datapt->srv_sw_tx,
tinfo->last_datapt->srv_hw_rx,
tinfo->last_datapt->srv_sw_rx);
tinfo->last_datapt = nullptr;
}
if (now >= next_ts) {
struct pkt_payload_epoch * pld_epoch;
uint32_t epoch;
next_ts += (int)(tinfo->ia_gen->generate() * 1000000.0);
// generate the packet
tx_buf = rte_pktmbuf_alloc(options.mbuf_pool);
if (tx_buf == NULL) {
rte_exit(EXIT_FAILURE, "cannot allocate tx_buf\n");
}
pkt_data = construct_pkt_hdr(tx_buf, PKT_TYPE_PROBE,
&options.s_host_mac, &options.server_mac);
if (pkt_data == NULL) {
rte_exit(EXIT_FAILURE, "cannot allocate space for packet_data in mbuf\n");
}
epoch = options.s_epoch.fetch_add(1);
pld_epoch = (struct pkt_payload_epoch *)pkt_data->payload;
pld_epoch->epoch = rte_cpu_to_be_32(epoch);
tinfo->last_datapt = new struct datapt;
tinfo->last_datapt->epoch = epoch;
tinfo->last_datapt->valid = options.s_record.load();
read_tx = false;
recv_resp = false;
recv_stat = false;
ntr(NTR_DEP_USER1, NTR_LEVEL_DEBUG, "locore_main: sending packet %p with epoch %d\n", (void*)tx_buf, epoch);
const uint16_t nb_tx = rte_eth_tx_burst(options.s_portid, tinfo->txqid, &tx_buf, 1);
if (nb_tx != 1) {
rte_exit(EXIT_FAILURE, "failed to send packet 0x%p, epoch %d\n", (void*)tx_buf, epoch);
}
}
}
if (!read_tx) {
struct timespec ts;
if ((ret = rte_eth_timesync_read_tx_timestamp(options.s_portid, &ts)) == 0) {
ntr(NTR_DEP_USER1, NTR_LEVEL_DEBUG, "locore_main: read hw tx timestamp %lld.\n", ts.tv_nsec + ts.tv_sec * S2NS);
tinfo->last_datapt->clt_hw_tx = ts.tv_nsec + ts.tv_sec * S2NS;
read_tx = true;
}
}
}
rte_pktmbuf_free(tx_buf);
ntr(NTR_DEP_USER1, NTR_LEVEL_INFO, "locore_main: core %d successfully stopped.\n", core_id);
return 0;
}
static int
port_init(uint16_t portid, struct rte_mempool *mbuf_pool)
{
struct rte_eth_dev_info dev_info;
struct rte_eth_conf port_conf = port_conf_default;
struct rte_eth_txconf txconf;
struct rte_eth_rxconf rxconf;
uint16_t nb_rxd = RX_RING_SIZE;
uint16_t nb_txd = TX_RING_SIZE;
port_conf.rxmode.max_rx_pkt_len = RTE_ETHER_MAX_LEN;
if(!rte_eth_dev_is_valid_port(portid)) {
return -1;
}
int ret = rte_eth_dev_info_get(portid, &dev_info);
if (ret != 0) {
return ret;
}
port_conf.rxmode.max_rx_pkt_len = RTE_ETHER_MAX_LEN;
port_conf.rxmode.offloads |= DEV_RX_OFFLOAD_UDP_CKSUM;
port_conf.rxmode.offloads |= DEV_RX_OFFLOAD_IPV4_CKSUM;
port_conf.txmode.offloads |= DEV_TX_OFFLOAD_UDP_CKSUM;
port_conf.txmode.offloads |= DEV_TX_OFFLOAD_IPV4_CKSUM;
port_conf.txmode.offloads |= DEV_TX_OFFLOAD_MBUF_FAST_FREE;
/* Configure the Ethernet device. */
ret = rte_eth_dev_configure(portid, options.num_threads, options.num_threads, &port_conf);
if (ret != 0)
return ret;
ret = rte_eth_dev_adjust_nb_rx_tx_desc(portid, &nb_rxd, &nb_txd);
if (ret != 0)
return ret;
/* Allocate and set up 1 RX queue per thread . */
rxconf = dev_info.default_rxconf;
rxconf.offloads = port_conf.rxmode.offloads;
for (uint32_t i = 0; i < options.num_threads; i++) {
ret = rte_eth_rx_queue_setup(portid, i, nb_rxd, rte_eth_dev_socket_id(portid), &rxconf, mbuf_pool);
if (ret < 0)
return ret;
}
txconf = dev_info.default_txconf;
txconf.offloads = port_conf.txmode.offloads;
/* Allocate and set up 1 TX queue per Ethernet port. */
for (uint32_t i = 0; i < options.num_threads; i++) {
ret = rte_eth_tx_queue_setup(portid, i, nb_txd, rte_eth_dev_socket_id(portid), &txconf);
if (ret < 0)
return ret;
}
ret = rte_eth_dev_start(portid);
if (ret < 0)
return ret;
/* Display the port MAC address. */
struct rte_ether_addr addr;
ret = rte_eth_macaddr_get(portid, &addr);
if (ret != 0)
return ret;
/* Enable RX in promiscuous mode for the Ethernet device. */
ret = rte_eth_promiscuous_enable(portid);
if (ret != 0)
return ret;
return 0;
}
static void dump_options()
{
fprintf(stdout, "Configuration:\n" \
" run time = %d\n" \
" warmup time = %d\n" \
" output file = %s\n" \
" server MAC = %x:%x:%x:%x:%x:%x\n",
options.run_time,
options.warmup_time,
options.output,
options.server_mac.addr_bytes[0],
options.server_mac.addr_bytes[1],
options.server_mac.addr_bytes[2],
options.server_mac.addr_bytes[3],
options.server_mac.addr_bytes[4],
options.server_mac.addr_bytes[5]);
}
static void usage()
{
fprintf(stdout,
"Usage:\n " \
" -v(vv): verbose mode\n" \
" -h: display the information\n" \
" -o: output filename\n" \
" -t: run time\n" \
" -T: warmup time\n" \
" -s: server's mac\n" \
" -A: affinity mask\n" \
" -a: number of threads\n" \
" -C: client mode\n"
" -i: inter-arrival time distribution\n\n");
}
// static void int_handler(int)
// {
// //rte_exit(EXIT_SUCCESS, "Caught SIGINT, exiting...\n");
// }
int main(int argc, char* argv[])
{
unsigned int nb_ports;
struct rte_mempool *mbuf_pool;
std::ofstream log_file;
struct thread_info *tinfo;
ntr_init();
if (nm_init() != 0)
rte_exit(EXIT_FAILURE, "failed to init libnm\n");
// signal(SIGINT, int_handler);
// init dpdk
int ret = rte_eal_init(argc, argv);
if (ret < 0) {
rte_exit(EXIT_FAILURE, "rte_eal_init failed!\n");
}
argc -= ret;
argv += ret;
// set warning level
ntr_set_level(NTR_DEP_USER1, NTR_LEVEL_WARNING);
{
int c;
// parse arguments
while((c = getopt(argc, argv, "hvo:t:T:s:A:a:Ci:")) != -1) {
switch (c) {
case 'v':
ntr_set_level(NTR_DEP_USER1, ntr_get_level(NTR_DEP_USER1) + 1);
break;
case 's':
if (rte_ether_unformat_addr(optarg, &options.server_mac) == -1) {
rte_exit(EXIT_FAILURE, "cannot parse %s as mac address.\n", optarg);
}
break;
case 't':
options.run_time = atoi(optarg);
break;
case 'T':
options.warmup_time = atoi(optarg);
break;
case 'h':
usage();
rte_exit(EXIT_SUCCESS, "success\n");
case 'o':
strncpy(options.output, optarg, sizeof(options.output) - 1);
break;
case 'A':
options.cpu_mask = atoll(optarg);
break;
case 'a':
options.num_threads = atoi(optarg);
break;
case 'C':
options.mode = MODE_CLIENT;
break;
case 'i':
strncpy(options.ia_gen, optarg, sizeof(options.ia_gen) - 1);
break;
default:
usage();
rte_exit(EXIT_FAILURE, "unknown argument: %c\n", c);
break;
}
}
}
// open log file for writing
if (options.mode == MODE_MASTER) {
log_file.open(options.output, std::ofstream::out);
if (!log_file) {
rte_exit(EXIT_FAILURE, "failed to open log file %s\n", options.output);
}
}
nb_ports = rte_eth_dev_count_avail();
if (nb_ports == 0) {
rte_exit(EXIT_FAILURE, "number of ports must be > 0\n");
}
uint16_t portid = rte_eth_find_next(0);
if (portid == RTE_MAX_ETHPORTS) {
rte_exit(EXIT_FAILURE, "cannot find an available port\n");
}
options.s_portid = portid;
// create a mbuf memory pool on the socket
mbuf_pool = rte_pktmbuf_pool_create("MBUF_POOL", MBUF_MAX_COUNT, MBUF_CACHE_SIZE, 0, RTE_MBUF_DEFAULT_BUF_SIZE, rte_eth_dev_socket_id(options.s_portid));
if (mbuf_pool == nullptr) {
rte_exit(EXIT_FAILURE, "cannot create mbuf pool\n");
}
options.mbuf_pool = mbuf_pool;
for(int i = 0; i < 1; i++) {
tinfo = new thread_info;
tinfo->id = i;
tinfo->ia_gen = createGenerator(options.ia_gen);
options.s_thr_info.push_back(tinfo);
}
if (port_init(portid, mbuf_pool) != 0) {
rte_exit(EXIT_FAILURE, "cannot init port %d\n", portid);
}
if (rte_eth_macaddr_get(portid, &options.s_host_mac) != 0) {
rte_exit(EXIT_FAILURE, "cannot get mac address of port %d\n", portid);
}
ntr(NTR_DEP_USER1, NTR_LEVEL_INFO, "Configured port %d with mac addr %x:%x:%x:%x:%x:%x\n", portid,
options.s_host_mac.addr_bytes[0],
options.s_host_mac.addr_bytes[1],
options.s_host_mac.addr_bytes[2],
options.s_host_mac.addr_bytes[3],
options.s_host_mac.addr_bytes[4],
options.s_host_mac.addr_bytes[5]);
dump_options();
sleep(1);
uint16_t core_id = rte_get_next_lcore(0, true, false);
if (rte_eal_remote_launch(locore_main, options.s_thr_info.at(0), core_id) != 0) {
rte_exit(EXIT_FAILURE, "failed to launch function on locore\n");
}
// poor man's timer
// XXX: use kqueue instead
struct timespec ts;
ts.tv_sec = 1;
ts.tv_nsec = 0;
uint32_t second = 0;
while(true) {
if (second >= options.warmup_time) {
options.s_record.store(1);
}
if (second >= options.run_time + options.warmup_time) {
options.s_stop.store(true);
break;
}
clock_nanosleep(CLOCK_REALTIME, 0, &ts, NULL);
second++;
}
if (rte_eal_wait_lcore(core_id) < 0)
rte_exit(EXIT_FAILURE, "failed to wait for job completion\n");
// dump stats
if (options.mode == MODE_MASTER) {
thread_info * master_thrd = options.s_thr_info.at(0);
for (auto it : master_thrd->data) {
if (it->valid) {
log_file << it->clt_sw_rx << ',' << it->clt_sw_tx << ','
<< it->clt_hw_rx << ',' << it->clt_hw_tx << ','
<< it->srv_sw_rx << ',' << it->srv_sw_tx << ','
<< it->srv_hw_rx << ',' << it->srv_hw_tx << std::endl;
}
}
}
log_file.close();
// clean up
rte_eth_dev_stop(portid);
rte_eth_dev_close(portid);
return 0;
}

38
scripts/compile.sh Executable file
View File

@ -0,0 +1,38 @@
#!/bin/sh
test_dir="/numam.d"
root=".."
servers="skylake2.rcs.uwaterloo.ca skylake3.rcs.uwaterloo.ca"
rsync_flags="-vchr"
ssh_args="-o StrictHostKeyChecking=no -p77"
user=$1
if [ -z $user ]
then
user=$(whoami)
fi
echo "USER: $user"
compile() {
# separate these functions because we might change kernel (reboot) without needing to recompile
echo "====================$1===================="
echo "Syncing directories..."
ssh $(echo $ssh_args $user@$1) "sudo mkdir -p $test_dir"
ssh $(echo $ssh_args $user@$1) "sudo chmod 777 $test_dir"
rsync $(echo $rsync_flags) -e 'ssh -p 77' $root/ $user@$1:$test_dir/
echo "Compiling..."
ssh $(echo $ssh_args $user@$1) "mkdir -p $test_dir/build; cd $test_dir/build; cmake ../; make clean all -j8" &
wait
echo "$1 Done."
echo ""
}
i=0
for server in $servers
do
i=$(expr $i + 1)
compile "$server" &
done
wait

83
scripts/histo.py Normal file
View File

@ -0,0 +1,83 @@
import pandas as pd
import matplotlib.pyplot as plt
import matplotlib.mlab as mlab
import numpy as np
import sys
import re
import os
import json
import getopt
import math
import concurrent.futures as CF
import libpar as par
num_bins = 100
extra_pct = []
def saveplot(fp : str, data : [], title : str):
plt.hist(data, num_bins)
plt.xlabel("Delay")
plt.title(title)
plt.ylabel("Frequency")
plt.title(os.path.basename(fp))
f = plt.gcf()
f.set_size_inches(11.69, 8.27)
f.savefig(fp + "_" + title + "_" + ".png", dpi=160)
plt.clf()
print("Generated - " + fp + "_" + title + "_" + ".png")
executor = CF.ProcessPoolExecutor(max_workers=int(os.cpu_count()))
def process_file(each_dir):
try:
print("Processing " + each_dir + " ...")
with open(each_dir, 'r') as f:
parser = par.khat_parser()
parser.parse(f.read())
sh = []
ss = []
ch = []
cs = []
for pt in parser.datapt:
sh.append(pt.s_htx - pt.s_hrx)
ss.append(pt.s_stx - pt.s_srx)
ch.append(pt.c_hrx - pt.c_htx)
cs.append(pt.c_srx - pt.c_stx)
saveplot(each_dir, sh, "server_hw_delay")
saveplot(each_dir, ss, "server_sw_delay")
saveplot(each_dir, ch, "client_hw_delay")
saveplot(each_dir, cs, "client_sw_delay")
except Exception:
print("Unexpected error:", sys.exc_info())
def process_dir(rootdir):
for subdir in os.listdir(rootdir):
each_dir = os.path.join(rootdir, subdir)
if os.path.isfile(each_dir):
if each_dir.endswith("sample.txt") or each_dir.endswith(".sample"):
#executor.submit(process_file, each_dir)
process_file(each_dir)
else:
process_dir(each_dir)
def main():
datdir = None
options = getopt.getopt(sys.argv[1:], 'd:')[0]
for opt, arg in options:
if opt in ('-d'):
datdir = arg
if datdir == None:
datdir = "/home/oscar/projs/kqsched/scripts/pingpong/results.d/sample"
#raise Exception("Must specify -d parameter")
process_dir(datdir)
executor.shutdown()
if __name__ == "__main__":
main()

113
scripts/libs/libpar.py Normal file
View File

@ -0,0 +1,113 @@
import json
import numpy as np
class khat_parser:
class pt:
def __init__(self):
self.s_htx = 0
self.s_hrx = 0
self.s_stx = 0
self.s_srx = 0
self.c_htx = 0
self.c_hrx = 0
self.c_stx = 0
self.c_srx = 0
def __init__(self):
self.datapt = []
def parse(self, output : str):
for line in output.splitlines():
cells = line.split(',')
if len(cells) != 8:
raise Exception("Invalid line:" + line)
pt = self.pt()
pt.c_srx = int(cells[0])
pt.c_stx = int(cells[1])
pt.c_hrx = int(cells[2])
pt.c_htx = int(cells[3])
pt.s_srx = int(cells[4])
pt.s_stx = int(cells[5])
pt.s_hrx = int(cells[6])
pt.s_htx = int(cells[7])
self.datapt.append(pt)
class mutilate_data:
def __init__(self):
self.dat = {}
self.qps = 0
def to_string(self):
ret = "Throughput: " + str(self.qps) + "\n" + json.dumps(self.dat)
return ret
@staticmethod
def parse_mut_output(output):
ret = mutilate_data()
succ_qps = False
succ_read = False
table = [None, "avg", "std", "min", "5th", "10th", "50th", "90th", "95th", "99th"]
table_legacy = [None, "avg", "std", "min", "5th", "10th", "90th", "95th", "99th"]
for line in output.splitlines():
if line.find("Total QPS") != -1:
spl = line.split()
if len(spl) == 7:
ret.qps = float(spl[3])
succ_qps = True
else:
break
elif line.find("read") != -1:
spl = line.split()
if len(spl) == 10:
for i in range(1, len(spl)):
ret.dat[table[i]] = float(spl[i])
succ_read = True
elif len(spl) == 9:
for i in range(1, len(spl)):
ret.dat[table_legacy[i]] = float(spl[i])
succ_read = True
else:
break
if not (succ_qps and succ_read):
raise Exception("Failed to parse data")
return ret
@staticmethod
def parse_mut_sample(fn):
f = open(fn, "r")
qps = []
lat = []
lines = f.readlines()
for line in lines:
entry = line.split()
if len(entry) != 2:
raise Exception("Unrecognized line: " + line)
qps.append(float(entry[0]))
lat.append(float(entry[1]))
f.close()
return qps, lat
# generate mutilate output format
@staticmethod
def build_mut_output(lat_arr, qps_arr):
output = '{0: <10}'.format('#type') + '{0: >10}'.format('avg') + '{0: >10}'.format('std') + \
'{0: >10}'.format('min') + '{0: >10}'.format('5th') + '{0: >10}'.format('10th') + \
'{0: >10}'.format('50th') + '{0: >10}'.format('90th') + '{0: >10}'.format('95th') + '{0: >10}'.format('99th') + "\n"
output += '{0: <10}'.format('read') + '{0: >10}'.format("{:.1f}".format(np.mean(lat_arr))) + ' ' + \
'{0: >10}'.format("{:.1f}".format(np.std(lat_arr))) + ' ' + \
'{0: >10}'.format("{:.1f}".format(np.min(lat_arr))) + ' ' + \
'{0: >10}'.format("{:.1f}".format(np.percentile(lat_arr, 5))) + ' ' + \
'{0: >10}'.format("{:.1f}".format(np.percentile(lat_arr, 10))) + ' ' + \
'{0: >10}'.format("{:.1f}".format(np.percentile(lat_arr, 50))) + ' ' + \
'{0: >10}'.format("{:.1f}".format(np.percentile(lat_arr, 90))) + ' ' + \
'{0: >10}'.format("{:.1f}".format(np.percentile(lat_arr, 95))) + ' ' + \
'{0: >10}'.format("{:.1f}".format(np.percentile(lat_arr, 99))) + ' ' + "\n" \
output += "\n" + "Total QPS = " + "{:.1f}".format(np.mean(qps_arr)) + " (0 / 0s)"
return output

175
scripts/libs/libtc.py Normal file
View File

@ -0,0 +1,175 @@
import subprocess as sp
import time
import select
import os
import pwd
import sys
import datetime
import random
import re
from threading import Thread
tc_logfile = None
def log_print(info):
print(info)
if tc_logfile != None:
tc_logfile.write(info + "\n")
tc_logfile.flush()
tc_output_dir=""
tc_cur_test = ""
tc_test_id = 0
def init(odir = "./results.d/"):
global tc_output_dir
tc_output_dir = odir + "_" + datetime.datetime.now().strftime('%Y-%m-%d_%H-%M-%S')
tc_output_dir = os.path.expanduser(tc_output_dir)
os.system("mkdir -p " + tc_output_dir)
global tc_logfile
tc_logfile = open(tc_output_dir + "/log.txt", "w+")
def begin(name):
global tc_test_id
global tc_cur_test
tc_cur_test = name
tc_test_id += 1
os.system("mkdir -p " + get_odir())
log_print("\n===== Test #" + str(tc_test_id) + " - " + tc_cur_test + " started =====")
def end():
global tc_cur_test
log_print("\n===== Test #" + str(tc_test_id) + " - " + tc_cur_test + " completed =====")
tc_cur_test = None
def get_odir():
return tc_output_dir + "/" + tc_cur_test
SCHED_QUEUE = 1
SCHED_CPU = 2
SCHED_BEST = 4
SCHED_FEAT_WS = 1
def make_sched_flag(sched, args, feat = 0, fargs = 0):
return (sched & 0xFF) | (args & 0xFF) << 8 | (feat & 0xFF) << 16 | (fargs & 0xFF) << 24
TUNE_RTSHARE = 2
TUNE_TFREQ = 1
def make_tune_flag(obj, val):
return (obj & 0xFFFF) | (val & 0xFFFF) << 16
def get_username():
return pwd.getpwuid( os.getuid() )[0]
ssh_param = ""
def set_ssh_param(para):
global ssh_param
ssh_param = para
ssh_user = None
def set_ssh_user(user):
global ssh_user
ssh_user = user
def remote_exec(srv, cmd, blocking=True, check=True):
sub = []
for s in srv:
p = sp.Popen(["ssh " + ssh_param + " " + ((ssh_user + "@") if ssh_user != None else "") + s + " \"" + cmd +"\""], shell=True, stdout=sp.PIPE, stderr=sp.PIPE)
sub.append(p)
if blocking:
for p in sub:
p.wait()
if check and p.returncode != 0:
raise Exception("Command failed " + cmd)
return sub
def scan_stderr(p, exclude = None):
for err in p.stderr:
fail = True
err = err.decode()
err = err.strip()
# print(err)
if len(err) == 0:
continue
if exclude != None:
for exc in exclude:
if (exc != None) and (re.match(exc, err) != None):
fail = False
break
if fail:
log_print("Error detected: " + err)
return False
return True
# stderr threads
errthr_objs = []
errthr_sigstop = False
errthr_failed = False
def errthr_get_failed():
return errthr_failed
def thr_check_stderr(p : sp.Popen, exclude):
# print("thread start!")
global errthr_failed
while(not errthr_sigstop):
if not scan_stderr(p, exclude=exclude):
errthr_failed = True
# print("running!")
time.sleep(0.5 + random.uniform(-0.1, 0.1))
# print("thread exit!")
def errthr_start():
global errthr_sigstop
global errthr_failed
errthr_sigstop = False
errthr_failed = False
for thr in errthr_objs:
thr.start()
def errthr_create(cp, exclude = None):
global errthr_objs
for p in cp:
errthr_objs.append(Thread(target = thr_check_stderr, args=(p, exclude)))
def errthr_stop():
global errthr_objs
global errthr_sigstop
errthr_sigstop = True
# print("waiting!")
for thr in errthr_objs:
thr.join()
errthr_objs.clear()
def parse_hostfile(fp):
ret = {}
fh = open(fp, "r")
content = fh.readlines()
fh.close()
content = [x.strip() for x in content]
for line in content:
spl = line.split(" ")
if len(spl) >= 2:
ret[spl[0]] = spl[1]
log_print("Parsed: hostname \"" + spl[0] + "\" -> \"" + spl[1] + "\"")
return ret
def process_hostnames(names, hosts):
ret = []
for line in names:
if line in hosts:
ret.append(hosts[line])
else:
ret.append(line)
return ret
def get_cpuset_core(threads):
ret = "cpuset -l 0-" + str(threads * 2 - 1) + " "
return ret

225
scripts/run.py Executable file
View File

@ -0,0 +1,225 @@
import subprocess as sp
import time
import select
import os
import datetime
import pwd
import sys
import getopt
import numpy as np
import re
import libpar as par
import libtc as tc
step_inc_pct = 100
init_step = 20000 #
start_step = 10000
term_qps = 85000000000
term_pct = 1
inc_pct = 50
server_port = 23444
# paths
test_dir = "/numam.d/build"
file_dir = os.path.dirname(os.path.realpath(__file__))
root_dir = os.path.join(file_dir,"..")
sample_filename = "sample.txt"
affinity = [
"0x4", # core 2
"0x1000" # core 12
]
master = ["skylake3.rcs.uwaterloo.ca"]
master_mac = ["3c:15:fb:c9:f3:4b"]
server = ["skylake2.rcs.uwaterloo.ca"]
server_mac = ["3c:15:fb:c9:f3:36"]
clients = []
client_mac = []
rage_quit = 1000 #1s
warmup = 5
duration = 10
cooldown = 0
cacheline = 0
SSH_PARAM = "-o StrictHostKeyChecking=no -p77"
SSH_USER = "oscar"
hostfile = None
lockstat = False
client_only = False
def stop_all():
# stop clients
tc.log_print("Stopping clients...")
tc.remote_exec(clients, "sudo killall -9 rat", check=False)
if not client_only:
# stop server
tc.log_print("Stopping server...")
tc.remote_exec(server, "sudo killall -9 khat", check=False)
# stop master
tc.log_print("Stopping master...")
tc.remote_exec(master, "sudo killall -9 cat", check=False)
def get_client_str(clt):
ret = " "
for client in clt:
ret += " -a " + client + " "
return ret
def run_exp(sc, ld):
while True:
if client_only:
ssrv = None
else:
# start server
tc.log_print("Starting server...")
server_cmd = "sudo " + test_dir + "/khat -- -A " + sc
tc.log_print(server_cmd)
ssrv = tc.remote_exec(server, server_cmd, blocking=False)
# start clients
# tc.log_print("Starting clients...")
# client_cmd = tc.get_cpuset_core(client_threads) + " " + test_dir + "/pingpong/build/dismember -A"
# tc.log_print(client_cmd)
# sclt = tc.remote_exec(ssh_clients, client_cmd, blocking=False)
time.sleep(3)
# start master
tc.log_print("Starting master...")
master_cmd = "sudo " + test_dir + "/cat -- " + \
" -s " + server_mac[0] + \
" -o " + test_dir + "/" + sample_filename + \
" -t " + str(duration) + \
" -T " + str(warmup) + \
" -i fixed:0.01" + \
" -r " + str(rage_quit) + \
" -A 0x4"
tc.log_print(master_cmd)
sp = tc.remote_exec(master, master_cmd, blocking=False)
p = sp[0]
# launch stderr monitoring thread
tc.errthr_create(sp, exclude=[".*EAL.*"])
tc.errthr_create(ssrv, exclude=[".*EAL.*"])
tc.errthr_start()
success = False
cur = 0
while True:
# either failed or timeout
# we use failure detection to save time for long durations
if tc.errthr_get_failed() or cur >= int(warmup + duration) + 5 :
break
if p.poll() != None:
success = True
break
time.sleep(1)
cur = cur + 1
stop_all()
tc.errthr_stop()
print("Cooling down...")
time.sleep(cooldown)
if success:
return
def keep_results():
scpcmd = "scp -P77 oscar@" + master[0] + ":" + test_dir + "/" + sample_filename + " " + tc.get_odir() + "/sample.txt"
tc.log_print(scpcmd)
sp.check_call(scpcmd, shell=True)
with open(tc.get_odir() + "/sample.txt", 'r') as f:
tc.log_print("Total requests: " + str(len(f.readlines())))
return
def main():
global hostfile
global server
global master
global clients
global client_only
tc.set_ssh_param(SSH_PARAM)
tc.set_ssh_user(SSH_USER)
options = getopt.getopt(sys.argv[1:], 'h:sldcp')[0]
for opt, arg in options:
if opt in ('-h'):
hostfile = arg
elif opt in ('-s'):
stop_all()
return
elif opt in ('-c'):
client_only=True
tc.init("~/results.d/numam/")
tc.log_print("Configuration:\n" + \
"Hostfile: " + ("None" if hostfile == None else hostfile) + "\n" \
"Client only: " + str(client_only) + "\n")
if hostfile != None:
hosts = tc.parse_hostfile(hostfile)
server = tc.process_hostnames(server, hosts)
clients = tc.process_hostnames(clients, hosts)
master = tc.process_hostnames(master, hosts)
stop_all()
for i in range(0, len(affinity)):
eaff = affinity[i]
# step_mul = 100
# last_load = 0
# cur_load = start_step
tc.begin(eaff)
tc.log_print("============ Affinity: " + str(eaff) + " Load: MAX" + " ============")
run_exp(eaff, 0)
keep_results()
stop_all()
# while True:
# tc.log_print("============ Sched: " + str(ename) + " Flag: " + format(esched, '#04x') + " Load: " + str(cur_load) + " ============")
# output, sout, serr = run_exp(esched, cur_load, lockstat)
# qps = keep_results(output, sout, serr)
# pct = int((qps - last_load) / init_step * 100)
# tc.log_print("last_load: " + str(last_load) + " this_load: " + str(qps) + " inc_pct: " + str(pct) + "%")
# if cur_load > term_qps:
# tc.log_print("qps more than " + str(term_qps) + "%. Done.")
# break
# if pct <= term_pct:
# tc.log_print("inc_pct less than TERM_PCT " + str(term_pct) + "%. Done.")
# break
# if pct <= inc_pct:
# step_mul += step_inc_pct
# tc.log_print("inc_pct less than INC_PCT " + str(inc_pct) + "%. Increasing step multiplier to " + str(step_mul) + "%")
# last_load = qps
# cur_load += int(init_step * step_mul / 100)
# tc.log_print("")
tc.end()
stop_all()
main()