NUMA detection & server multicore

This commit is contained in:
quackerd 2021-01-28 05:24:59 -05:00
parent 82e1098f3b
commit 226449100d
Signed by: d
GPG Key ID: F73412644EDE357A
9 changed files with 563 additions and 140 deletions

View File

@ -10,6 +10,7 @@ project(khat)
list(APPEND CMAKE_MODULE_PATH "${CMAKE_SOURCE_DIR}") list(APPEND CMAKE_MODULE_PATH "${CMAKE_SOURCE_DIR}")
find_package(dpdk REQUIRED) find_package(dpdk REQUIRED)
find_package(Hwloc REQUIRED)
set(CC_FLAGS -O2 -g -Wall -Wextra -Werror -std=c++11 set(CC_FLAGS -O2 -g -Wall -Wextra -Werror -std=c++11
-Wno-deprecated-declarations -Wno-deprecated-declarations
@ -20,17 +21,25 @@ set(CC_FLAGS -O2 -g -Wall -Wextra -Werror -std=c++11
-msse4 -msse4
-mavx) -mavx)
set(LIBNM_CC_FLAGS -O2 -g -Wall -Wextra -Werror -std=c++11)
set(LIBNTR_C_FLAGS -O2 -g -Wall -Wextra -Werror -std=c11)
include_directories(${CMAKE_SOURCE_DIR}/inc) include_directories(${CMAKE_SOURCE_DIR}/inc)
include_directories(${dpdk_INCLUDE_DIRS}) include_directories(${dpdk_INCLUDE_DIRS})
include_directories(${Hwloc_INCLUDE_DIRS})
add_library(nm libnm/nm.cc)
target_link_libraries(nm ${Hwloc_LIBRARIES})
target_compile_options(nm PRIVATE ${LIBNM_CC_FLAGS})
add_library(ntr libntr/ntr.c)
target_compile_options(ntr PRIVATE ${LIBNTR_C_FLAGS})
add_executable(khat khat/khat.cc ) add_executable(khat khat/khat.cc )
add_executable(cat cat/cat.cc cat/generator.cc) target_link_libraries(khat ${dpdk_LIBRARIES} pthread nm ntr)
set(LINK_LIBS ${dpdk_LIBRARIES} pthread)
target_link_libraries(khat ${LINK_LIBS})
target_compile_options(khat PRIVATE ${CC_FLAGS}) target_compile_options(khat PRIVATE ${CC_FLAGS})
target_link_libraries(cat ${LINK_LIBS}) add_executable(cat cat/cat.cc cat/generator.cc)
target_link_libraries(cat ${dpdk_LIBRARIES} pthread nm ntr)
target_compile_options(cat PRIVATE ${CC_FLAGS}) target_compile_options(cat PRIVATE ${CC_FLAGS})

213
FindHwloc.cmake Normal file
View File

@ -0,0 +1,213 @@
#.rst:
# FindHwloc
# ----------
#
# Try to find Portable Hardware Locality (hwloc) libraries.
# http://www.open-mpi.org/software/hwloc
#
# You may declare HWLOC_ROOT environment variable to tell where
# your hwloc library is installed.
#
# Once done this will define::
#
# Hwloc_FOUND - True if hwloc was found
# Hwloc_INCLUDE_DIRS - include directories for hwloc
# Hwloc_LIBRARIES - link against these libraries to use hwloc
# Hwloc_VERSION - version
# Hwloc_CFLAGS - include directories as compiler flags
# Hwloc_LDLFAGS - link paths and libs as compiler flags
#
#=============================================================================
# Copyright 2014 Mikael Lepistö
#
# Distributed under the OSI-approved BSD License (the "License");
#
# This software is distributed WITHOUT ANY WARRANTY; without even the
# implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
# See the License for more information.
#=============================================================================
if(WIN32)
find_path(Hwloc_INCLUDE_DIR
NAMES
hwloc.h
PATHS
ENV "PROGRAMFILES(X86)"
ENV HWLOC_ROOT
PATH_SUFFIXES
include
)
find_library(Hwloc_LIBRARY
NAMES
libhwloc.lib
PATHS
ENV "PROGRAMFILES(X86)"
ENV HWLOC_ROOT
PATH_SUFFIXES
lib
)
#
# Check if the found library can be used to linking
#
SET (_TEST_SOURCE "${CMAKE_BINARY_DIR}${CMAKE_FILES_DIRECTORY}/linktest.c")
FILE (WRITE "${_TEST_SOURCE}"
"
#include <hwloc.h>
int main()
{
hwloc_topology_t topology;
int nbcores;
hwloc_topology_init(&topology);
hwloc_topology_load(topology);
nbcores = hwloc_get_nbobjs_by_type(topology, HWLOC_OBJ_CORE);
hwloc_topology_destroy(topology);
return 0;
}
"
)
TRY_COMPILE(_LINK_SUCCESS ${CMAKE_BINARY_DIR} "${_TEST_SOURCE}"
CMAKE_FLAGS
"-DINCLUDE_DIRECTORIES:STRING=${Hwloc_INCLUDE_DIR}"
CMAKE_FLAGS
"-DLINK_LIBRARIES:STRING=${Hwloc_LIBRARY}"
)
IF(NOT _LINK_SUCCESS)
if(CMAKE_SIZEOF_VOID_P EQUAL 8)
message(STATUS "You are building 64bit target.")
ELSE()
message(STATUS "You are building 32bit code. If you like to build x64 use e.g. -G 'Visual Studio 12 Win64' generator." )
ENDIF()
message(FATAL_ERROR "Library found, but linking test program failed.")
ENDIF()
#
# Resolve version if some compiled binary found...
#
find_program(HWLOC_INFO_EXECUTABLE
NAMES
hwloc-info
PATHS
ENV HWLOC_ROOT
PATH_SUFFIXES
bin
)
if(HWLOC_INFO_EXECUTABLE)
execute_process(
COMMAND ${HWLOC_INFO_EXECUTABLE} "--version"
OUTPUT_VARIABLE HWLOC_VERSION_LINE
OUTPUT_STRIP_TRAILING_WHITESPACE
)
string(REGEX MATCH "([0-9]+.[0-9]+)$"
Hwloc_VERSION "${HWLOC_VERSION_LINE}")
unset(HWLOC_VERSION_LINE)
endif()
#
# All good
#
set(Hwloc_LIBRARIES ${Hwloc_LIBRARY})
set(Hwloc_INCLUDE_DIRS ${Hwloc_INCLUDE_DIR})
include(FindPackageHandleStandardArgs)
find_package_handle_standard_args(
Hwloc
FOUND_VAR Hwloc_FOUND
REQUIRED_VARS Hwloc_LIBRARY Hwloc_INCLUDE_DIR Hwloc_VERSION_PARSED Hwloc_VERSION_MAJOR Hwloc_VERSION_MINOR
VERSION_VAR Hwloc_VERSION)
mark_as_advanced(
Hwloc_INCLUDE_DIR
Hwloc_LIBRARY)
foreach(arg ${Hwloc_INCLUDE_DIRS})
set(Hwloc_CFLAGS "${Hwloc_CFLAGS} /I${arg}")
endforeach()
set(Hwloc_LDFLAGS "${Hwloc_LIBRARY}")
else()
if(CMAKE_CROSSCOMPILING)
find_path(Hwloc_INCLUDE_DIRS
NAMES
hwloc.h
PATHS
ENV HWLOC_ROOT
)
find_library(Hwloc_LIBRARIES
NAMES
hwloc
PATHS
ENV HWLOC_ROOT
)
if(Hwloc_INCLUDE_DIRS AND Hwloc_LIBRARIES)
message(WARNING "HWLOC library found using find_library() - cannot determine version. Assuming 1.7.0")
set(Hwloc_FOUND 1)
set(Hwloc_VERSION "1.7.0")
endif()
else() # Find with pkgconfig for non-crosscompile builds
find_package(PkgConfig)
if(HWLOC_ROOT)
set(ENV{PKG_CONFIG_PATH} "${HWLOC_ROOT}/lib/pkgconfig")
else()
foreach(PREFIX ${CMAKE_PREFIX_PATH})
set(PKG_CONFIG_PATH "${PKG_CONFIG_PATH}:${PREFIX}/lib/pkgconfig")
endforeach()
set(ENV{PKG_CONFIG_PATH} "${PKG_CONFIG_PATH}:$ENV{PKG_CONFIG_PATH}")
endif()
if(hwloc_FIND_REQUIRED)
set(_hwloc_OPTS "REQUIRED")
elseif(hwloc_FIND_QUIETLY)
set(_hwloc_OPTS "QUIET")
else()
set(_hwloc_output 1)
endif()
if(hwloc_FIND_VERSION)
if(hwloc_FIND_VERSION_EXACT)
pkg_check_modules(Hwloc ${_hwloc_OPTS} hwloc=${hwloc_FIND_VERSION})
else()
pkg_check_modules(Hwloc ${_hwloc_OPTS} hwloc>=${hwloc_FIND_VERSION})
endif()
else()
pkg_check_modules(Hwloc ${_hwloc_OPTS} hwloc)
endif()
if(Hwloc_FOUND)
string(REPLACE "." ";" Hwloc_VERSION_PARSED "${Hwloc_VERSION}")
set(Hwloc_VERSION "${Hwloc_VERSION}" CACHE STRING "version of Hwloc as a list")
list(GET Hwloc_VERSION_PARSED 0 Hwloc_VERSION_MAJOR)
set(Hwloc_VERSION_MAJOR "${Hwloc_VERSION_MAJOR}" CACHE STRING "Major version of Hwloc")
list(GET Hwloc_VERSION_PARSED 1 Hwloc_VERSION_MINOR)
set(Hwloc_VERSION_MINOR "${Hwloc_VERSION_MINOR}" CACHE STRING "Minor version of Hwloc")
include(FindPackageHandleStandardArgs)
find_package_handle_standard_args(Hwloc DEFAULT_MSG Hwloc_LIBRARIES)
if(NOT ${Hwloc_VERSION} VERSION_LESS 1.7.0)
set(Hwloc_GL_FOUND 1)
endif()
if(_hwloc_output)
message(STATUS
"Found hwloc ${Hwloc_VERSION} in ${Hwloc_INCLUDE_DIRS}:${Hwloc_LIBRARIES}")
endif()
endif()
endif() # cross-compile else
endif()

View File

@ -18,14 +18,12 @@
#include <fstream> #include <fstream>
#include <unistd.h> #include <unistd.h>
#include "nm.h"
#include "generator.h" #include "generator.h"
#include "ntrlog.h" #include "ntr.h"
#include "pkt.h" #include "pkt.h"
#include "util.h" #include "util.h"
// init NTRLOG
NTR_DECL_IMPL;
constexpr static unsigned int MBUF_MAX_COUNT = 16384; constexpr static unsigned int MBUF_MAX_COUNT = 16384;
constexpr static unsigned int MBUF_CACHE_SIZE = 512; constexpr static unsigned int MBUF_CACHE_SIZE = 512;
constexpr static unsigned int RX_RING_SIZE = 4096; constexpr static unsigned int RX_RING_SIZE = 4096;
@ -462,8 +460,10 @@ int main(int argc, char* argv[])
std::ofstream log_file; std::ofstream log_file;
struct thread_info *tinfo; struct thread_info *tinfo;
ntr_init();
// signal(SIGINT, int_handler); if (nm_init() != 0)
rte_exit(EXIT_FAILURE, "failed to init libnm\n");
// signal(SIGINT, int_handler);
// init dpdk // init dpdk
int ret = rte_eal_init(argc, argv); int ret = rte_eal_init(argc, argv);
@ -497,8 +497,7 @@ int main(int argc, char* argv[])
break; break;
case 'h': case 'h':
usage(); usage();
rte_exit(EXIT_SUCCESS, NULL); rte_exit(EXIT_SUCCESS, "success\n");
break;
case 'o': case 'o':
strncpy(options.output, optarg, sizeof(options.output) - 1); strncpy(options.output, optarg, sizeof(options.output) - 1);
break; break;

16
inc/nm.h Normal file
View File

@ -0,0 +1,16 @@
#pragma once
#include <vector>
constexpr static int NM_LEVEL_NUMA = 0;
constexpr static int NM_LEVEL_CPU = 1;
constexpr static int NM_LEVEL_CORE = 2;
std::vector<struct nm_obj *> * nm_get_nodes();
std::vector<struct nm_obj *> * nm_get_cpus();
std::vector<struct nm_obj *> * nm_get_cores();
// 0 on success
// -1 on error
int nm_init();

37
inc/ntr.h Normal file
View File

@ -0,0 +1,37 @@
#pragma once
#include <stdio.h>
#include <stdarg.h>
#define NTR_LEVEL_NONE (0)
#define NTR_LEVEL_ERROR (1)
#define NTR_LEVEL_WARNING (2)
#define NTR_LEVEL_INFO (3)
#define NTR_LEVEL_DEBUG (4)
#define NTR_LEVEL_DEFAULT (NTR_LEVEL_WARNING)
#define NTR_DEP_NTR (0)
#define NTR_DEP_USER1 (1)
#define NTR_DEP_USER2 (2)
#define NTR_DEP_USER3 (3)
#define NTR_DEP_USER4 (4)
#define NTR_DEP_USER5 (5)
#define NTR_DEP_MAX (NTR_DEP_USER5 + 1)
#ifdef __cplusplus
extern "C" {
#endif
void ntr_init();
void ntr(int dep, int level, const char * fmt, ...);
void ntr_set_level(int dep, int level);
void ntr_set_output(FILE * f);
int ntr_get_level(int dep);
#ifdef __cplusplus
}
#endif

View File

@ -1,61 +0,0 @@
#pragma once
#include <stdio.h>
#define NTR_LEVEL_NONE (0)
#define NTR_LEVEL_ERROR (1)
#define NTR_LEVEL_WARNING (2)
#define NTR_LEVEL_INFO (3)
#define NTR_LEVEL_DEBUG (4)
#define NTR_LEVEL_DEFAULT (NTR_LEVEL_WARNING)
#define NTR_DEP_NTR (0)
#define NTR_DEP_USER1 (1)
#define NTR_DEP_USER2 (2)
#define NTR_DEP_USER3 (3)
#define NTR_DEP_USER4 (4)
#define NTR_DEP_USER5 (5)
#define NTR_DEP_MAX (NTR_DEP_USER5 + 1)
#define NTR_DECL_IMPL \
int ntr_log_levels[NTR_DEP_MAX] = {NTR_LEVEL_DEFAULT}; \
FILE * ntr_out = stdout
extern int ntr_log_levels[];
extern FILE * ntr_out;
static inline
void ntr(int dep, int level, const char * fmt, ...)
{
va_list vl;
va_start(vl, fmt);
if (dep < NTR_DEP_MAX && level <= ntr_log_levels[dep]) {
vfprintf(ntr_out, fmt, vl);
}
va_end(vl);
}
static inline
void ntr_set_level(int dep, int level)
{
if (dep < NTR_DEP_MAX) {
ntr_log_levels[dep] = level;
}
}
static inline
void ntr_set_output(FILE * f)
{
if (f != NULL) {
ntr_out = f;
}
}
static inline
int ntr_get_level(int dep)
{
if (dep < NTR_DEP_MAX) {
return ntr_log_levels[dep];
}
return 0;
}

View File

@ -18,8 +18,9 @@
#include <unistd.h> #include <unistd.h>
#include <signal.h> #include <signal.h>
#include "nm.h"
#include "pkt.h" #include "pkt.h"
#include "ntrlog.h" #include "ntr.h"
#include "util.h" #include "util.h"
@ -33,10 +34,8 @@
* server -> STAT -> client (server sends its tx/rx timestamps) * server -> STAT -> client (server sends its tx/rx timestamps)
*/ */
NTR_DECL_IMPL;
static void * const PROBE_MAGIC = (void*)0x12344444; static void * const PROBE_MAGIC = (void*)0x12344444;
constexpr static unsigned int MBUF_MAX_COUNT = 16384; constexpr static unsigned int MBUF_MAX_COUNT = 65536;
constexpr static unsigned int MBUF_CACHE_SIZE = 512; constexpr static unsigned int MBUF_CACHE_SIZE = 512;
constexpr static unsigned int RX_RING_SIZE = 4096; constexpr static unsigned int RX_RING_SIZE = 4096;
constexpr static unsigned int TX_RING_SIZE = 4096; constexpr static unsigned int TX_RING_SIZE = 4096;
@ -62,18 +61,29 @@ struct probe_state_t {
uint64_t last_hw_rx; uint64_t last_hw_rx;
}; };
struct thread_info {
int tid;
int rxqid;
int txqid;
int lcore_id;
};
// state machine: // state machine:
constexpr static int SERVER_STATE_WAIT = 0; constexpr static int SERVER_STATE_WAIT = 0;
constexpr static int SERVER_STATE_PROBE = 1; constexpr static int SERVER_STATE_PROBE = 1;
struct options_t { struct options_t {
//config
int num_threads{1};
uint64_t cpuset{0b010}; //2nd core
//states //states
uint16_t s_portid; uint16_t s_portid;
struct rte_ether_addr s_host_mac; struct rte_ether_addr s_host_mac;
struct rte_mempool * s_pkt_mempool; struct rte_mempool * s_pkt_mempool;
std::atomic<int> s_state {SERVER_STATE_WAIT}; std::atomic<int> s_state {SERVER_STATE_WAIT};
struct probe_state_t s_probe_info; struct probe_state_t s_probe_info;
std::vector<struct thread_info *> s_thr_info;
}; };
static struct options_t options; static struct options_t options;
@ -154,28 +164,28 @@ tx_add_timestamp(uint16_t port __rte_unused, uint16_t qidx __rte_unused,
} }
static int static int
locore_main(void * _unused __rte_unused) locore_main(void * ti)
{ {
struct thread_info * tinfo = (struct thread_info *)ti;
struct rte_mbuf *bufs[BURST_SIZE]; struct rte_mbuf *bufs[BURST_SIZE];
// + 1 because it might involve an extra PKT_TYPE_STAT packet // + 1 because it might involve an extra PKT_TYPE_STAT packet
// when all tx timestamps are ready // when all tx timestamps are ready
struct rte_mbuf *tx_bufs[BURST_SIZE]; struct rte_mbuf *tx_bufs[BURST_SIZE];
struct pkt_hdr *pkt_data; struct pkt_hdr *pkt_data;
uint32_t core_id = rte_lcore_id();
bool pending_probe = false; bool pending_probe = false;
if (rte_eth_dev_socket_id(options.s_portid) > 0 && rte_eth_dev_socket_id(options.s_portid) != (int)rte_socket_id()) { if (rte_eth_dev_socket_id(options.s_portid) > 0 && rte_eth_dev_socket_id(options.s_portid) != (int)rte_socket_id()) {
ntr(NTR_DEP_USER1, NTR_LEVEL_WARNING, "locore_main: WARNING, port %d is on remote NUMA node to " ntr(NTR_DEP_USER1, NTR_LEVEL_WARNING, "locore_main <thread %d>: WARNING, port %d is on remote NUMA node to "
"polling thread.\n\tPerformance will " "polling thread.\n\tPerformance will "
"not be optimal.\n", options.s_portid); "not be optimal.\n", tinfo->tid, options.s_portid);
} }
ntr(NTR_DEP_USER1, NTR_LEVEL_INFO, "locore_main: core %d running.\n", core_id); ntr(NTR_DEP_USER1, NTR_LEVEL_INFO, "locore_main <thread %d>: running on locore %d with txidx %d and rxidx %d.\n", tinfo->tid, rte_lcore_id(), tinfo->txqid, tinfo->rxqid);
while(true) { while(true) {
uint16_t nb_tx = 0; uint16_t nb_tx = 0;
const uint16_t nb_rx = rte_eth_rx_burst(options.s_portid, 0, bufs, BURST_SIZE); const uint16_t nb_rx = rte_eth_rx_burst(options.s_portid, tinfo->rxqid, bufs, BURST_SIZE);
struct rte_mbuf * pkt_buf; struct rte_mbuf * pkt_buf;
struct pkt_hdr * tx_data; struct pkt_hdr * tx_data;
@ -185,14 +195,14 @@ locore_main(void * _unused __rte_unused)
pkt_data = check_valid_packet(bufs[i]); pkt_data = check_valid_packet(bufs[i]);
if (pkt_data == NULL) { if (pkt_data == NULL) {
ntr(NTR_DEP_USER1, NTR_LEVEL_DEBUG, "locore_main: core %d skipping invalid packet %p.\n", core_id, (void*)bufs[i]); ntr(NTR_DEP_USER1, NTR_LEVEL_DEBUG, "locore_main <thread %d>: skipping invalid packet %p.\n", tinfo->tid, (void*)bufs[i]);
dump_pkt(bufs[i]); //dump_pkt(bufs[i]);
rte_pktmbuf_free(bufs[i]); rte_pktmbuf_free(bufs[i]);
continue; continue;
} }
ntr(NTR_DEP_USER1, NTR_LEVEL_INFO, "locore_main: core %d packet %p from %x:%x:%x:%x:%x:%x to %x:%x:%x:%x:%x:%x, type %d\n", ntr(NTR_DEP_USER1, NTR_LEVEL_INFO, "locore_main <thread %d>: packet %p from %x:%x:%x:%x:%x:%x to %x:%x:%x:%x:%x:%x, type %d\n",
core_id, tinfo->tid,
(void*)bufs[i], (void*)bufs[i],
pkt_data->eth_hdr.s_addr.addr_bytes[0], pkt_data->eth_hdr.s_addr.addr_bytes[0],
pkt_data->eth_hdr.s_addr.addr_bytes[1], pkt_data->eth_hdr.s_addr.addr_bytes[1],
@ -223,7 +233,7 @@ locore_main(void * _unused __rte_unused)
pkt_buf = rte_pktmbuf_alloc(options.s_pkt_mempool); pkt_buf = rte_pktmbuf_alloc(options.s_pkt_mempool);
if (pkt_buf == NULL) { if (pkt_buf == NULL) {
rte_exit(EXIT_FAILURE, "locore_main: failed to allocate memory for pkt_buf\n"); rte_exit(EXIT_FAILURE, "failed to allocate memory for pkt_buf\n");
} }
tx_data = construct_pkt_hdr(pkt_buf, PKT_TYPE_PROBE_RESP, tx_data = construct_pkt_hdr(pkt_buf, PKT_TYPE_PROBE_RESP,
@ -248,7 +258,7 @@ locore_main(void * _unused __rte_unused)
pkt_buf = rte_pktmbuf_alloc(options.s_pkt_mempool); pkt_buf = rte_pktmbuf_alloc(options.s_pkt_mempool);
if (pkt_buf == NULL) { if (pkt_buf == NULL) {
rte_exit(EXIT_FAILURE, "locore_main: failed to allocate memory for pkt_buf\n"); rte_exit(EXIT_FAILURE, "failed to allocate memory for pkt_buf\n");
} }
tx_data = construct_pkt_hdr(pkt_buf, PKT_TYPE_LOAD_RESP, tx_data = construct_pkt_hdr(pkt_buf, PKT_TYPE_LOAD_RESP,
@ -273,9 +283,9 @@ locore_main(void * _unused __rte_unused)
// send the packets // send the packets
if (nb_tx > 0) { if (nb_tx > 0) {
const uint16_t nb_tx_succ = rte_eth_tx_burst(options.s_portid, 0, tx_bufs, nb_tx); const uint16_t nb_tx_succ = rte_eth_tx_burst(options.s_portid, tinfo->txqid, tx_bufs, nb_tx);
if (nb_tx_succ < nb_tx) { if (nb_tx_succ < nb_tx) {
rte_exit(EXIT_FAILURE, "locore_main: failed to send some packets.\n"); rte_exit(EXIT_FAILURE, "failed to send some packets.\n");
} }
} }
@ -284,11 +294,11 @@ locore_main(void * _unused __rte_unused)
struct timespec ts; struct timespec ts;
struct pkt_payload_stat * stat; struct pkt_payload_stat * stat;
if (rte_eth_timesync_read_tx_timestamp(options.s_portid, &ts) == 0) { if (rte_eth_timesync_read_tx_timestamp(options.s_portid, &ts) == 0) {
ntr(NTR_DEP_USER1, NTR_LEVEL_DEBUG, "locore_main: obtained hw tx timestamp %lld.\n", ts.tv_sec * S2NS + ts.tv_nsec); ntr(NTR_DEP_USER1, NTR_LEVEL_DEBUG, "locore_main <thread %d>: obtained hw tx timestamp %lld.\n", tinfo->tid, ts.tv_sec * S2NS + ts.tv_nsec);
// now we have everything we need // now we have everything we need
pkt_buf = rte_pktmbuf_alloc(options.s_pkt_mempool); pkt_buf = rte_pktmbuf_alloc(options.s_pkt_mempool);
if (pkt_buf == NULL) { if (pkt_buf == NULL) {
rte_exit(EXIT_FAILURE, "locore_main: failed to allocate memory for pkt_buf\n"); rte_exit(EXIT_FAILURE, "failed to allocate memory for pkt_buf\n");
} }
tx_data = construct_pkt_hdr(pkt_buf, PKT_TYPE_STAT, tx_data = construct_pkt_hdr(pkt_buf, PKT_TYPE_STAT,
@ -305,7 +315,7 @@ locore_main(void * _unused __rte_unused)
// send the packet // send the packet
if (rte_eth_tx_burst(options.s_portid, 0, &pkt_buf, 1) < 1) { if (rte_eth_tx_burst(options.s_portid, 0, &pkt_buf, 1) < 1) {
rte_exit(EXIT_FAILURE, "locore_main: failed to send some packets.\n"); rte_exit(EXIT_FAILURE, "failed to send some packets.\n");
} }
// release flux // release flux
@ -350,7 +360,7 @@ port_init(uint16_t portid, struct rte_mempool *mbuf_pool)
port_conf.txmode.offloads |= DEV_TX_OFFLOAD_MBUF_FAST_FREE; port_conf.txmode.offloads |= DEV_TX_OFFLOAD_MBUF_FAST_FREE;
/* Configure the Ethernet device. */ /* Configure the Ethernet device. */
ret = rte_eth_dev_configure(portid, 1, 1, &port_conf); ret = rte_eth_dev_configure(portid, options.num_threads, options.num_threads, &port_conf);
if (ret != 0) if (ret != 0)
return ret; return ret;
@ -358,21 +368,23 @@ port_init(uint16_t portid, struct rte_mempool *mbuf_pool)
if (ret != 0) if (ret != 0)
return ret; return ret;
/* Allocate and set up 1 RX queue per Ethernet port. */ /* Allocate and set up 1 RX queue per thread per Ethernet port. */
rxconf = dev_info.default_rxconf; rxconf = dev_info.default_rxconf;
for (uint32_t i = 0; i < 1; i++) { for (int i = 0; i < options.num_threads; i++) {
ret = rte_eth_rx_queue_setup(portid, i, nb_rxd, rte_eth_dev_socket_id(portid), &rxconf, mbuf_pool); ret = rte_eth_rx_queue_setup(portid, i, nb_rxd, rte_eth_dev_socket_id(portid), &rxconf, mbuf_pool);
if (ret < 0) if (ret < 0)
return ret; return ret;
options.s_thr_info.at(i)->rxqid = i;
} }
txconf = dev_info.default_txconf; txconf = dev_info.default_txconf;
txconf.offloads = port_conf.txmode.offloads; txconf.offloads = port_conf.txmode.offloads;
/* Allocate and set up 1 TX queue per Ethernet port. */ /* Allocate and set up 1 TX queue per thread per Ethernet port. */
for (uint32_t i = 0; i < 1; i++) { for (int i = 0; i < options.num_threads; i++) {
ret = rte_eth_tx_queue_setup(portid, i, nb_txd, rte_eth_dev_socket_id(portid), &txconf); ret = rte_eth_tx_queue_setup(portid, i, nb_txd, rte_eth_dev_socket_id(portid), &txconf);
if (ret < 0) if (ret < 0)
return ret; return ret;
options.s_thr_info.at(i)->txqid = i;
} }
ret = rte_eth_dev_start(portid); ret = rte_eth_dev_start(portid);
@ -394,8 +406,11 @@ port_init(uint16_t portid, struct rte_mempool *mbuf_pool)
if (ret != 0) if (ret != 0)
return ret; return ret;
if (rte_eth_add_tx_callback(portid, 0, tx_add_timestamp, NULL) == NULL || rte_eth_add_rx_callback(portid, 0, rx_add_timestamp, NULL) == NULL) { for (int i = 0; i < options.num_threads; i++) {
return -1; if (rte_eth_add_tx_callback(portid, options.s_thr_info.at(i)->txqid, tx_add_timestamp, NULL) == NULL ||
rte_eth_add_rx_callback(portid, options.s_thr_info.at(i)->rxqid, rx_add_timestamp, NULL) == NULL) {
return -1;
}
} }
return 0; return 0;
@ -406,20 +421,32 @@ static void usage()
fprintf(stdout, fprintf(stdout,
"Usage:\n" \ "Usage:\n" \
" -v(vv): verbose mode\n" \ " -v(vv): verbose mode\n" \
" -h: display the information\n"); " -h: display the information\n" \
" -m: cpu mask for worker threads");
} }
// static void int_handler(int) static void dump_options()
// { {
// //rte_exit(EXIT_SUCCESS, "Caught SIGINT, exiting...\n"); ntr(NTR_DEP_USER1, NTR_LEVEL_INFO,
// } "main: khat configuration:\n" \
" verbosity: +%d\n" \
" thread count: %d\n" \
" thread mask: %lld\n\n",
ntr_get_level(NTR_DEP_USER1) - NTR_LEVEL_DEFAULT,
options.num_threads,
options.cpuset);
}
int main(int argc, char* argv[]) int main(int argc, char* argv[])
{ {
unsigned int nb_ports; unsigned int nb_ports;
struct rte_mempool *mbuf_pool, *mbuf_pool_pkt; struct rte_mempool *mbuf_pool;
ntr_init();
//signal(SIGINT, int_handler); if (nm_init() != 0) {
rte_exit(EXIT_FAILURE, "nm init failed!\n");
}
// init dpdk // init dpdk
int ret = rte_eal_init(argc, argv); int ret = rte_eal_init(argc, argv);
@ -435,7 +462,7 @@ int main(int argc, char* argv[])
{ {
int c; int c;
// parse arguments // parse arguments
while((c = getopt(argc, argv, "hv")) != -1) { while((c = getopt(argc, argv, "hvm:")) != -1) {
switch (c) { switch (c) {
case 'v': case 'v':
ntr_set_level(NTR_DEP_USER1, ntr_get_level(NTR_DEP_USER1) + 1); ntr_set_level(NTR_DEP_USER1, ntr_get_level(NTR_DEP_USER1) + 1);
@ -444,6 +471,13 @@ int main(int argc, char* argv[])
usage(); usage();
rte_exit(EXIT_SUCCESS, NULL); rte_exit(EXIT_SUCCESS, NULL);
break; break;
case 'm':
options.cpuset = strtoull(optarg, nullptr, 16);
options.num_threads = _mm_popcnt_u64(options.cpuset);
if (options.num_threads == 0) {
rte_exit(EXIT_FAILURE, "must run at least one thread\n");
}
break;
default: default:
usage(); usage();
rte_exit(EXIT_SUCCESS, "unknown argument: %c", c); rte_exit(EXIT_SUCCESS, "unknown argument: %c", c);
@ -452,33 +486,38 @@ int main(int argc, char* argv[])
} }
} }
// XXX: singal handler to exit dump_options();
nb_ports = rte_eth_dev_count_avail(); nb_ports = rte_eth_dev_count_avail();
if (nb_ports == 0) { if (nb_ports == 0) {
rte_exit(EXIT_FAILURE, "number of ports must be > 0\n"); rte_exit(EXIT_FAILURE, "number of ports must be > 0\n");
} }
// create a mbuf memory pool on the socket
mbuf_pool = rte_pktmbuf_pool_create("MBUF_POOL", MBUF_MAX_COUNT * nb_ports, MBUF_CACHE_SIZE, 0, RTE_MBUF_DEFAULT_BUF_SIZE, rte_socket_id());
if (mbuf_pool == nullptr) {
rte_exit(EXIT_FAILURE, "cannot create mbuf pool\n");
}
// create a pkt mbuf memory pool on the socket
mbuf_pool_pkt = rte_pktmbuf_pool_create("MBUF_POOL_PKT", MBUF_MAX_COUNT * nb_ports, MBUF_CACHE_SIZE, 0, RTE_MBUF_DEFAULT_BUF_SIZE, rte_socket_id());
if (mbuf_pool_pkt == nullptr) {
rte_exit(EXIT_FAILURE, "cannot create mbuf_pkt pool\n");
}
options.s_pkt_mempool = mbuf_pool_pkt;
uint16_t portid = rte_eth_find_next(0); uint16_t portid = rte_eth_find_next(0);
if (portid == RTE_MAX_ETHPORTS) { if (portid == RTE_MAX_ETHPORTS) {
rte_exit(EXIT_FAILURE, "cannot find an available port\n"); rte_exit(EXIT_FAILURE, "cannot find an available port\n");
} }
options.s_portid = portid; options.s_portid = portid;
// create a mbuf memory pool on the socket
mbuf_pool = rte_pktmbuf_pool_create("MBUF_POOL", MBUF_MAX_COUNT * nb_ports, MBUF_CACHE_SIZE, 0, RTE_MBUF_DEFAULT_BUF_SIZE, rte_eth_dev_socket_id(portid));
if (mbuf_pool == nullptr) {
rte_exit(EXIT_FAILURE, "cannot create mbuf pool\n");
}
options.s_pkt_mempool = mbuf_pool;
// init threads
uint64_t cpuset = options.cpuset;
for(int i = 0; i < options.num_threads; i++) {
struct thread_info * tinfo = new thread_info;
tinfo->tid = i;
int ffs = ffsll(cpuset);
tinfo->lcore_id = ffs - 1;
cpuset = cpuset & ~(1 << (ffs - 1));
options.s_thr_info.push_back(tinfo);
}
if (port_init(portid, mbuf_pool) != 0) { if (port_init(portid, mbuf_pool) != 0) {
rte_exit(EXIT_FAILURE, "cannot init port %d\n", portid); rte_exit(EXIT_FAILURE, "cannot init port %d\n", portid);
} }
@ -487,25 +526,22 @@ int main(int argc, char* argv[])
rte_exit(EXIT_FAILURE, "cannot get mac address of port %d\n", portid); rte_exit(EXIT_FAILURE, "cannot get mac address of port %d\n", portid);
} }
ntr(NTR_DEP_USER1, NTR_LEVEL_INFO, "Configured port %d with mac addr %x:%x:%x:%x:%x:%x\n", portid, ntr(NTR_DEP_USER1, NTR_LEVEL_INFO, "Configured port %d on socket %d with mac addr %x:%x:%x:%x:%x:%x\n", portid, rte_eth_dev_socket_id(portid),
options.s_host_mac.addr_bytes[0], options.s_host_mac.addr_bytes[0],
options.s_host_mac.addr_bytes[1], options.s_host_mac.addr_bytes[1],
options.s_host_mac.addr_bytes[2], options.s_host_mac.addr_bytes[2],
options.s_host_mac.addr_bytes[3], options.s_host_mac.addr_bytes[3],
options.s_host_mac.addr_bytes[4], options.s_host_mac.addr_bytes[4],
options.s_host_mac.addr_bytes[5]); options.s_host_mac.addr_bytes[5]);
uint16_t lcore_id = rte_get_next_lcore(0, true, false);
if (lcore_id == RTE_MAX_LCORE) {
rte_exit(EXIT_FAILURE, "cannot detect lcores.\n");
}
sleep(1);
if (rte_eal_remote_launch(locore_main, NULL, lcore_id) != 0) { usleep(S2US);
rte_exit(EXIT_FAILURE, "failed to launch function on locore %d\n", lcore_id);
for(int i = 0; i < options.num_threads; i++) {
struct thread_info * tinfo = options.s_thr_info.at(i);
ntr(NTR_DEP_USER1, NTR_LEVEL_INFO, "main: launching thread %d on locore %d\n", tinfo->tid, tinfo->lcore_id);
if (rte_eal_remote_launch(locore_main, (void *)options.s_thr_info.at(i), tinfo->lcore_id) != 0) {
rte_exit(EXIT_FAILURE, "failed to launch function on locore %d\n", tinfo->lcore_id);
}
} }
// while(true) { // while(true) {
@ -515,8 +551,12 @@ int main(int argc, char* argv[])
// usleep(1000000); // usleep(1000000);
// } // }
if (rte_eal_wait_lcore(lcore_id) != 0) { for(int i = 0; i < options.num_threads; i++) {
rte_exit(EXIT_FAILURE, "failed to wait for locore %d\n", lcore_id); struct thread_info * tinfo = options.s_thr_info.at(i);
ntr(NTR_DEP_USER1, NTR_LEVEL_INFO, "main: waiting for locore %d...\n", tinfo->lcore_id);
if (rte_eal_wait_lcore(tinfo->lcore_id) != 0) {
rte_exit(EXIT_FAILURE, "failed to wait for locore %d\n", tinfo->lcore_id);
}
} }
// shouldn't get here // shouldn't get here

127
libnm/nm.cc Normal file
View File

@ -0,0 +1,127 @@
#include <hwloc.h>
#include <vector>
#include <algorithm>
#include "nm.h"
struct nm_obj {
int level;
int id;
struct nm_obj *parent;
std::vector<struct nm_obj *> children;
};
static bool nm_obj_comparator(struct nm_obj * a, struct nm_obj * b)
{
return a->id < b->id;
}
static std::vector<struct nm_obj *> nodes;
static std::vector<struct nm_obj *> cores;
static std::vector<struct nm_obj *> cpus;
std::vector<struct nm_obj *> * nm_get_nodes()
{
return &nodes;
}
std::vector<struct nm_obj *> * nm_get_cpus()
{
return &cpus;
}
std::vector<struct nm_obj *> * nm_get_cores()
{
return &cores;
}
hwloc_obj_t get_parent_type(hwloc_obj_t obj, hwloc_obj_type_t type)
{
while(obj != nullptr) {
if (obj->type == type) {
break;
}
obj = obj->parent;
}
return obj;
}
// 0 on success
// -1 on error
int nm_init()
{
int ret;
hwloc_topology * topo;
if ((ret = hwloc_topology_init(&topo)) != 0) {
return ret;
}
if ((ret = hwloc_topology_load(topo)) != 0)
return ret;
// populate numa nodes
hwloc_obj_t obj = nullptr;
while(1) {
obj = hwloc_get_next_obj_by_type(topo, HWLOC_OBJ_PACKAGE, obj);
if (obj == nullptr) {
break;
}
struct nm_obj * each = new struct nm_obj;
each->id = obj->logical_index;
each->level = NM_LEVEL_NUMA;
each->parent = nullptr;
nodes.push_back(each);
printf("libnm: identified NUMA node %d\n", each->id);
}
std::sort(nodes.begin(), nodes.end(), nm_obj_comparator);
// populate cpus
obj = nullptr;
while(1) {
obj = hwloc_get_next_obj_by_type(topo, HWLOC_OBJ_CORE, obj);
if (obj == nullptr) {
break;
}
struct nm_obj * each = new struct nm_obj;
each->id = obj->logical_index;
each->level = NM_LEVEL_CPU;
hwloc_obj_t parent = get_parent_type(obj, HWLOC_OBJ_PACKAGE);
if (parent == nullptr) {
return -1;
}
// XXX: this faults if the OS decides to be stupid
each->parent = nodes.at(parent->logical_index);
each->parent->children.push_back(each);
cpus.push_back(each);
printf("libnm: identified CPU %d on NUMA node %d\n", each->id, each->parent->id);
}
std::sort(cpus.begin(), cpus.end(), nm_obj_comparator);
// populate cores
obj = nullptr;
while(1) {
obj = hwloc_get_next_obj_by_type(topo, HWLOC_OBJ_PU, obj);
if (obj == nullptr) {
break;
}
struct nm_obj * each = new struct nm_obj;
each->id = obj->logical_index;
each->level = NM_LEVEL_CORE;
hwloc_obj_t parent = get_parent_type(obj, HWLOC_OBJ_CORE);
if (parent == nullptr) {
return -1;
}
// XXX: this faults if the OS decides to be stupid
each->parent = cpus.at(parent->logical_index);
each->parent->children.push_back(each);
cores.push_back(each);
printf("libnm: identified core %d on CPU %d, NUMA node %d\n", each->id, each->parent->id, each->parent->parent->id);
}
std::sort(cores.begin(), cores.end(), nm_obj_comparator);
return ret;
}

43
libntr/ntr.c Normal file
View File

@ -0,0 +1,43 @@
#include "ntr.h"
static int ntr_log_levels[NTR_DEP_MAX] = {NTR_LEVEL_DEFAULT};
static FILE * ntr_out;
void ntr_init()
{
ntr_out = stdout;
}
void ntr(int dep, int level, const char * fmt, ...)
{
va_list vl;
va_start(vl, fmt);
if (dep < NTR_DEP_MAX && level <= ntr_log_levels[dep]) {
vfprintf(ntr_out, fmt, vl);
}
va_end(vl);
}
void ntr_set_level(int dep, int level)
{
if (dep < NTR_DEP_MAX) {
ntr_log_levels[dep] = level;
}
}
void ntr_set_output(FILE * f)
{
if (f != NULL) {
ntr_out = f;
}
}
int ntr_get_level(int dep)
{
if (dep < NTR_DEP_MAX) {
return ntr_log_levels[dep];
}
return 0;
}