From 226449100dc4a9646a1bd01636f8178fa0a3cb7e Mon Sep 17 00:00:00 2001 From: quackerd Date: Thu, 28 Jan 2021 05:24:59 -0500 Subject: [PATCH] NUMA detection & server multicore --- CMakeLists.txt | 21 +++-- FindHwloc.cmake | 213 ++++++++++++++++++++++++++++++++++++++++++++++++ cat/cat.cc | 15 ++-- inc/nm.h | 16 ++++ inc/ntr.h | 37 +++++++++ inc/ntrlog.h | 61 -------------- khat/khat.cc | 170 +++++++++++++++++++++++--------------- libnm/nm.cc | 127 +++++++++++++++++++++++++++++ libntr/ntr.c | 43 ++++++++++ 9 files changed, 563 insertions(+), 140 deletions(-) create mode 100644 FindHwloc.cmake create mode 100644 inc/nm.h create mode 100644 inc/ntr.h delete mode 100644 inc/ntrlog.h create mode 100644 libnm/nm.cc create mode 100644 libntr/ntr.c diff --git a/CMakeLists.txt b/CMakeLists.txt index ddd08ac..ba3f64f 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -10,6 +10,7 @@ project(khat) list(APPEND CMAKE_MODULE_PATH "${CMAKE_SOURCE_DIR}") find_package(dpdk REQUIRED) +find_package(Hwloc REQUIRED) set(CC_FLAGS -O2 -g -Wall -Wextra -Werror -std=c++11 -Wno-deprecated-declarations @@ -20,17 +21,25 @@ set(CC_FLAGS -O2 -g -Wall -Wextra -Werror -std=c++11 -msse4 -mavx) +set(LIBNM_CC_FLAGS -O2 -g -Wall -Wextra -Werror -std=c++11) +set(LIBNTR_C_FLAGS -O2 -g -Wall -Wextra -Werror -std=c11) + include_directories(${CMAKE_SOURCE_DIR}/inc) include_directories(${dpdk_INCLUDE_DIRS}) +include_directories(${Hwloc_INCLUDE_DIRS}) + +add_library(nm libnm/nm.cc) +target_link_libraries(nm ${Hwloc_LIBRARIES}) +target_compile_options(nm PRIVATE ${LIBNM_CC_FLAGS}) + +add_library(ntr libntr/ntr.c) +target_compile_options(ntr PRIVATE ${LIBNTR_C_FLAGS}) add_executable(khat khat/khat.cc ) -add_executable(cat cat/cat.cc cat/generator.cc) - -set(LINK_LIBS ${dpdk_LIBRARIES} pthread) - -target_link_libraries(khat ${LINK_LIBS}) +target_link_libraries(khat ${dpdk_LIBRARIES} pthread nm ntr) target_compile_options(khat PRIVATE ${CC_FLAGS}) -target_link_libraries(cat ${LINK_LIBS}) +add_executable(cat cat/cat.cc cat/generator.cc) +target_link_libraries(cat ${dpdk_LIBRARIES} pthread nm ntr) target_compile_options(cat PRIVATE ${CC_FLAGS}) diff --git a/FindHwloc.cmake b/FindHwloc.cmake new file mode 100644 index 0000000..4092a89 --- /dev/null +++ b/FindHwloc.cmake @@ -0,0 +1,213 @@ +#.rst: +# FindHwloc +# ---------- +# +# Try to find Portable Hardware Locality (hwloc) libraries. +# http://www.open-mpi.org/software/hwloc +# +# You may declare HWLOC_ROOT environment variable to tell where +# your hwloc library is installed. +# +# Once done this will define:: +# +# Hwloc_FOUND - True if hwloc was found +# Hwloc_INCLUDE_DIRS - include directories for hwloc +# Hwloc_LIBRARIES - link against these libraries to use hwloc +# Hwloc_VERSION - version +# Hwloc_CFLAGS - include directories as compiler flags +# Hwloc_LDLFAGS - link paths and libs as compiler flags +# + +#============================================================================= +# Copyright 2014 Mikael Lepistö +# +# Distributed under the OSI-approved BSD License (the "License"); +# +# This software is distributed WITHOUT ANY WARRANTY; without even the +# implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. +# See the License for more information. +#============================================================================= + +if(WIN32) + find_path(Hwloc_INCLUDE_DIR + NAMES + hwloc.h + PATHS + ENV "PROGRAMFILES(X86)" + ENV HWLOC_ROOT + PATH_SUFFIXES + include + ) + + find_library(Hwloc_LIBRARY + NAMES + libhwloc.lib + PATHS + ENV "PROGRAMFILES(X86)" + ENV HWLOC_ROOT + PATH_SUFFIXES + lib + ) + + # + # Check if the found library can be used to linking + # + SET (_TEST_SOURCE "${CMAKE_BINARY_DIR}${CMAKE_FILES_DIRECTORY}/linktest.c") + FILE (WRITE "${_TEST_SOURCE}" + " + #include + int main() + { + hwloc_topology_t topology; + int nbcores; + hwloc_topology_init(&topology); + hwloc_topology_load(topology); + nbcores = hwloc_get_nbobjs_by_type(topology, HWLOC_OBJ_CORE); + hwloc_topology_destroy(topology); + return 0; + } + " + ) + + TRY_COMPILE(_LINK_SUCCESS ${CMAKE_BINARY_DIR} "${_TEST_SOURCE}" + CMAKE_FLAGS + "-DINCLUDE_DIRECTORIES:STRING=${Hwloc_INCLUDE_DIR}" + CMAKE_FLAGS + "-DLINK_LIBRARIES:STRING=${Hwloc_LIBRARY}" + ) + + IF(NOT _LINK_SUCCESS) + if(CMAKE_SIZEOF_VOID_P EQUAL 8) + message(STATUS "You are building 64bit target.") + ELSE() + message(STATUS "You are building 32bit code. If you like to build x64 use e.g. -G 'Visual Studio 12 Win64' generator." ) + ENDIF() + message(FATAL_ERROR "Library found, but linking test program failed.") + ENDIF() + + # + # Resolve version if some compiled binary found... + # + find_program(HWLOC_INFO_EXECUTABLE + NAMES + hwloc-info + PATHS + ENV HWLOC_ROOT + PATH_SUFFIXES + bin + ) + + if(HWLOC_INFO_EXECUTABLE) + execute_process( + COMMAND ${HWLOC_INFO_EXECUTABLE} "--version" + OUTPUT_VARIABLE HWLOC_VERSION_LINE + OUTPUT_STRIP_TRAILING_WHITESPACE + ) + string(REGEX MATCH "([0-9]+.[0-9]+)$" + Hwloc_VERSION "${HWLOC_VERSION_LINE}") + unset(HWLOC_VERSION_LINE) + endif() + + # + # All good + # + + set(Hwloc_LIBRARIES ${Hwloc_LIBRARY}) + set(Hwloc_INCLUDE_DIRS ${Hwloc_INCLUDE_DIR}) + + include(FindPackageHandleStandardArgs) + find_package_handle_standard_args( + Hwloc + FOUND_VAR Hwloc_FOUND + REQUIRED_VARS Hwloc_LIBRARY Hwloc_INCLUDE_DIR Hwloc_VERSION_PARSED Hwloc_VERSION_MAJOR Hwloc_VERSION_MINOR + VERSION_VAR Hwloc_VERSION) + + mark_as_advanced( + Hwloc_INCLUDE_DIR + Hwloc_LIBRARY) + + foreach(arg ${Hwloc_INCLUDE_DIRS}) + set(Hwloc_CFLAGS "${Hwloc_CFLAGS} /I${arg}") + endforeach() + + set(Hwloc_LDFLAGS "${Hwloc_LIBRARY}") + +else() + + if(CMAKE_CROSSCOMPILING) + + find_path(Hwloc_INCLUDE_DIRS + NAMES + hwloc.h + PATHS + ENV HWLOC_ROOT + ) + + find_library(Hwloc_LIBRARIES + NAMES + hwloc + PATHS + ENV HWLOC_ROOT + ) + + if(Hwloc_INCLUDE_DIRS AND Hwloc_LIBRARIES) + message(WARNING "HWLOC library found using find_library() - cannot determine version. Assuming 1.7.0") + set(Hwloc_FOUND 1) + set(Hwloc_VERSION "1.7.0") + endif() + + else() # Find with pkgconfig for non-crosscompile builds + + find_package(PkgConfig) + + if(HWLOC_ROOT) + set(ENV{PKG_CONFIG_PATH} "${HWLOC_ROOT}/lib/pkgconfig") + else() + foreach(PREFIX ${CMAKE_PREFIX_PATH}) + set(PKG_CONFIG_PATH "${PKG_CONFIG_PATH}:${PREFIX}/lib/pkgconfig") + endforeach() + set(ENV{PKG_CONFIG_PATH} "${PKG_CONFIG_PATH}:$ENV{PKG_CONFIG_PATH}") + endif() + + if(hwloc_FIND_REQUIRED) + set(_hwloc_OPTS "REQUIRED") + elseif(hwloc_FIND_QUIETLY) + set(_hwloc_OPTS "QUIET") + else() + set(_hwloc_output 1) + endif() + + if(hwloc_FIND_VERSION) + if(hwloc_FIND_VERSION_EXACT) + pkg_check_modules(Hwloc ${_hwloc_OPTS} hwloc=${hwloc_FIND_VERSION}) + else() + pkg_check_modules(Hwloc ${_hwloc_OPTS} hwloc>=${hwloc_FIND_VERSION}) + endif() + else() + pkg_check_modules(Hwloc ${_hwloc_OPTS} hwloc) + endif() + + if(Hwloc_FOUND) + string(REPLACE "." ";" Hwloc_VERSION_PARSED "${Hwloc_VERSION}") + set(Hwloc_VERSION "${Hwloc_VERSION}" CACHE STRING "version of Hwloc as a list") + list(GET Hwloc_VERSION_PARSED 0 Hwloc_VERSION_MAJOR) + set(Hwloc_VERSION_MAJOR "${Hwloc_VERSION_MAJOR}" CACHE STRING "Major version of Hwloc") + list(GET Hwloc_VERSION_PARSED 1 Hwloc_VERSION_MINOR) + set(Hwloc_VERSION_MINOR "${Hwloc_VERSION_MINOR}" CACHE STRING "Minor version of Hwloc") + + include(FindPackageHandleStandardArgs) + find_package_handle_standard_args(Hwloc DEFAULT_MSG Hwloc_LIBRARIES) + + if(NOT ${Hwloc_VERSION} VERSION_LESS 1.7.0) + set(Hwloc_GL_FOUND 1) + endif() + + if(_hwloc_output) + message(STATUS + "Found hwloc ${Hwloc_VERSION} in ${Hwloc_INCLUDE_DIRS}:${Hwloc_LIBRARIES}") + endif() + endif() + + endif() # cross-compile else + +endif() \ No newline at end of file diff --git a/cat/cat.cc b/cat/cat.cc index 9beef94..7431fe5 100644 --- a/cat/cat.cc +++ b/cat/cat.cc @@ -18,14 +18,12 @@ #include #include +#include "nm.h" #include "generator.h" -#include "ntrlog.h" +#include "ntr.h" #include "pkt.h" #include "util.h" -// init NTRLOG -NTR_DECL_IMPL; - constexpr static unsigned int MBUF_MAX_COUNT = 16384; constexpr static unsigned int MBUF_CACHE_SIZE = 512; constexpr static unsigned int RX_RING_SIZE = 4096; @@ -462,8 +460,10 @@ int main(int argc, char* argv[]) std::ofstream log_file; struct thread_info *tinfo; - - // signal(SIGINT, int_handler); + ntr_init(); + if (nm_init() != 0) + rte_exit(EXIT_FAILURE, "failed to init libnm\n"); + // signal(SIGINT, int_handler); // init dpdk int ret = rte_eal_init(argc, argv); @@ -497,8 +497,7 @@ int main(int argc, char* argv[]) break; case 'h': usage(); - rte_exit(EXIT_SUCCESS, NULL); - break; + rte_exit(EXIT_SUCCESS, "success\n"); case 'o': strncpy(options.output, optarg, sizeof(options.output) - 1); break; diff --git a/inc/nm.h b/inc/nm.h new file mode 100644 index 0000000..a63653d --- /dev/null +++ b/inc/nm.h @@ -0,0 +1,16 @@ +#pragma once + +#include + +constexpr static int NM_LEVEL_NUMA = 0; +constexpr static int NM_LEVEL_CPU = 1; +constexpr static int NM_LEVEL_CORE = 2; + + +std::vector * nm_get_nodes(); +std::vector * nm_get_cpus(); +std::vector * nm_get_cores(); + +// 0 on success +// -1 on error +int nm_init(); \ No newline at end of file diff --git a/inc/ntr.h b/inc/ntr.h new file mode 100644 index 0000000..16b3471 --- /dev/null +++ b/inc/ntr.h @@ -0,0 +1,37 @@ +#pragma once + +#include +#include + +#define NTR_LEVEL_NONE (0) +#define NTR_LEVEL_ERROR (1) +#define NTR_LEVEL_WARNING (2) +#define NTR_LEVEL_INFO (3) +#define NTR_LEVEL_DEBUG (4) +#define NTR_LEVEL_DEFAULT (NTR_LEVEL_WARNING) + +#define NTR_DEP_NTR (0) +#define NTR_DEP_USER1 (1) +#define NTR_DEP_USER2 (2) +#define NTR_DEP_USER3 (3) +#define NTR_DEP_USER4 (4) +#define NTR_DEP_USER5 (5) +#define NTR_DEP_MAX (NTR_DEP_USER5 + 1) + +#ifdef __cplusplus +extern "C" { +#endif + +void ntr_init(); + +void ntr(int dep, int level, const char * fmt, ...); + +void ntr_set_level(int dep, int level); + +void ntr_set_output(FILE * f); + +int ntr_get_level(int dep); + +#ifdef __cplusplus +} +#endif diff --git a/inc/ntrlog.h b/inc/ntrlog.h deleted file mode 100644 index b2760aa..0000000 --- a/inc/ntrlog.h +++ /dev/null @@ -1,61 +0,0 @@ -#pragma once - -#include - -#define NTR_LEVEL_NONE (0) -#define NTR_LEVEL_ERROR (1) -#define NTR_LEVEL_WARNING (2) -#define NTR_LEVEL_INFO (3) -#define NTR_LEVEL_DEBUG (4) -#define NTR_LEVEL_DEFAULT (NTR_LEVEL_WARNING) - -#define NTR_DEP_NTR (0) -#define NTR_DEP_USER1 (1) -#define NTR_DEP_USER2 (2) -#define NTR_DEP_USER3 (3) -#define NTR_DEP_USER4 (4) -#define NTR_DEP_USER5 (5) -#define NTR_DEP_MAX (NTR_DEP_USER5 + 1) - -#define NTR_DECL_IMPL \ -int ntr_log_levels[NTR_DEP_MAX] = {NTR_LEVEL_DEFAULT}; \ -FILE * ntr_out = stdout - -extern int ntr_log_levels[]; -extern FILE * ntr_out; - -static inline -void ntr(int dep, int level, const char * fmt, ...) -{ - va_list vl; - va_start(vl, fmt); - if (dep < NTR_DEP_MAX && level <= ntr_log_levels[dep]) { - vfprintf(ntr_out, fmt, vl); - } - va_end(vl); -} - -static inline -void ntr_set_level(int dep, int level) -{ - if (dep < NTR_DEP_MAX) { - ntr_log_levels[dep] = level; - } -} - -static inline -void ntr_set_output(FILE * f) -{ - if (f != NULL) { - ntr_out = f; - } -} - -static inline -int ntr_get_level(int dep) -{ - if (dep < NTR_DEP_MAX) { - return ntr_log_levels[dep]; - } - return 0; -} diff --git a/khat/khat.cc b/khat/khat.cc index 03f6482..98754db 100644 --- a/khat/khat.cc +++ b/khat/khat.cc @@ -18,8 +18,9 @@ #include #include +#include "nm.h" #include "pkt.h" -#include "ntrlog.h" +#include "ntr.h" #include "util.h" @@ -33,10 +34,8 @@ * server -> STAT -> client (server sends its tx/rx timestamps) */ -NTR_DECL_IMPL; - static void * const PROBE_MAGIC = (void*)0x12344444; -constexpr static unsigned int MBUF_MAX_COUNT = 16384; +constexpr static unsigned int MBUF_MAX_COUNT = 65536; constexpr static unsigned int MBUF_CACHE_SIZE = 512; constexpr static unsigned int RX_RING_SIZE = 4096; constexpr static unsigned int TX_RING_SIZE = 4096; @@ -62,18 +61,29 @@ struct probe_state_t { uint64_t last_hw_rx; }; +struct thread_info { + int tid; + int rxqid; + int txqid; + int lcore_id; +}; // state machine: constexpr static int SERVER_STATE_WAIT = 0; constexpr static int SERVER_STATE_PROBE = 1; struct options_t { + //config + int num_threads{1}; + uint64_t cpuset{0b010}; //2nd core + //states uint16_t s_portid; struct rte_ether_addr s_host_mac; struct rte_mempool * s_pkt_mempool; std::atomic s_state {SERVER_STATE_WAIT}; struct probe_state_t s_probe_info; + std::vector s_thr_info; }; static struct options_t options; @@ -154,28 +164,28 @@ tx_add_timestamp(uint16_t port __rte_unused, uint16_t qidx __rte_unused, } static int -locore_main(void * _unused __rte_unused) +locore_main(void * ti) { + struct thread_info * tinfo = (struct thread_info *)ti; struct rte_mbuf *bufs[BURST_SIZE]; // + 1 because it might involve an extra PKT_TYPE_STAT packet // when all tx timestamps are ready struct rte_mbuf *tx_bufs[BURST_SIZE]; struct pkt_hdr *pkt_data; - uint32_t core_id = rte_lcore_id(); bool pending_probe = false; if (rte_eth_dev_socket_id(options.s_portid) > 0 && rte_eth_dev_socket_id(options.s_portid) != (int)rte_socket_id()) { - ntr(NTR_DEP_USER1, NTR_LEVEL_WARNING, "locore_main: WARNING, port %d is on remote NUMA node to " + ntr(NTR_DEP_USER1, NTR_LEVEL_WARNING, "locore_main : WARNING, port %d is on remote NUMA node to " "polling thread.\n\tPerformance will " - "not be optimal.\n", options.s_portid); + "not be optimal.\n", tinfo->tid, options.s_portid); } - ntr(NTR_DEP_USER1, NTR_LEVEL_INFO, "locore_main: core %d running.\n", core_id); + ntr(NTR_DEP_USER1, NTR_LEVEL_INFO, "locore_main : running on locore %d with txidx %d and rxidx %d.\n", tinfo->tid, rte_lcore_id(), tinfo->txqid, tinfo->rxqid); while(true) { uint16_t nb_tx = 0; - const uint16_t nb_rx = rte_eth_rx_burst(options.s_portid, 0, bufs, BURST_SIZE); + const uint16_t nb_rx = rte_eth_rx_burst(options.s_portid, tinfo->rxqid, bufs, BURST_SIZE); struct rte_mbuf * pkt_buf; struct pkt_hdr * tx_data; @@ -185,14 +195,14 @@ locore_main(void * _unused __rte_unused) pkt_data = check_valid_packet(bufs[i]); if (pkt_data == NULL) { - ntr(NTR_DEP_USER1, NTR_LEVEL_DEBUG, "locore_main: core %d skipping invalid packet %p.\n", core_id, (void*)bufs[i]); - dump_pkt(bufs[i]); + ntr(NTR_DEP_USER1, NTR_LEVEL_DEBUG, "locore_main : skipping invalid packet %p.\n", tinfo->tid, (void*)bufs[i]); + //dump_pkt(bufs[i]); rte_pktmbuf_free(bufs[i]); continue; } - ntr(NTR_DEP_USER1, NTR_LEVEL_INFO, "locore_main: core %d packet %p from %x:%x:%x:%x:%x:%x to %x:%x:%x:%x:%x:%x, type %d\n", - core_id, + ntr(NTR_DEP_USER1, NTR_LEVEL_INFO, "locore_main : packet %p from %x:%x:%x:%x:%x:%x to %x:%x:%x:%x:%x:%x, type %d\n", + tinfo->tid, (void*)bufs[i], pkt_data->eth_hdr.s_addr.addr_bytes[0], pkt_data->eth_hdr.s_addr.addr_bytes[1], @@ -223,7 +233,7 @@ locore_main(void * _unused __rte_unused) pkt_buf = rte_pktmbuf_alloc(options.s_pkt_mempool); if (pkt_buf == NULL) { - rte_exit(EXIT_FAILURE, "locore_main: failed to allocate memory for pkt_buf\n"); + rte_exit(EXIT_FAILURE, "failed to allocate memory for pkt_buf\n"); } tx_data = construct_pkt_hdr(pkt_buf, PKT_TYPE_PROBE_RESP, @@ -248,7 +258,7 @@ locore_main(void * _unused __rte_unused) pkt_buf = rte_pktmbuf_alloc(options.s_pkt_mempool); if (pkt_buf == NULL) { - rte_exit(EXIT_FAILURE, "locore_main: failed to allocate memory for pkt_buf\n"); + rte_exit(EXIT_FAILURE, "failed to allocate memory for pkt_buf\n"); } tx_data = construct_pkt_hdr(pkt_buf, PKT_TYPE_LOAD_RESP, @@ -273,9 +283,9 @@ locore_main(void * _unused __rte_unused) // send the packets if (nb_tx > 0) { - const uint16_t nb_tx_succ = rte_eth_tx_burst(options.s_portid, 0, tx_bufs, nb_tx); + const uint16_t nb_tx_succ = rte_eth_tx_burst(options.s_portid, tinfo->txqid, tx_bufs, nb_tx); if (nb_tx_succ < nb_tx) { - rte_exit(EXIT_FAILURE, "locore_main: failed to send some packets.\n"); + rte_exit(EXIT_FAILURE, "failed to send some packets.\n"); } } @@ -284,11 +294,11 @@ locore_main(void * _unused __rte_unused) struct timespec ts; struct pkt_payload_stat * stat; if (rte_eth_timesync_read_tx_timestamp(options.s_portid, &ts) == 0) { - ntr(NTR_DEP_USER1, NTR_LEVEL_DEBUG, "locore_main: obtained hw tx timestamp %lld.\n", ts.tv_sec * S2NS + ts.tv_nsec); + ntr(NTR_DEP_USER1, NTR_LEVEL_DEBUG, "locore_main : obtained hw tx timestamp %lld.\n", tinfo->tid, ts.tv_sec * S2NS + ts.tv_nsec); // now we have everything we need pkt_buf = rte_pktmbuf_alloc(options.s_pkt_mempool); if (pkt_buf == NULL) { - rte_exit(EXIT_FAILURE, "locore_main: failed to allocate memory for pkt_buf\n"); + rte_exit(EXIT_FAILURE, "failed to allocate memory for pkt_buf\n"); } tx_data = construct_pkt_hdr(pkt_buf, PKT_TYPE_STAT, @@ -305,7 +315,7 @@ locore_main(void * _unused __rte_unused) // send the packet if (rte_eth_tx_burst(options.s_portid, 0, &pkt_buf, 1) < 1) { - rte_exit(EXIT_FAILURE, "locore_main: failed to send some packets.\n"); + rte_exit(EXIT_FAILURE, "failed to send some packets.\n"); } // release flux @@ -350,7 +360,7 @@ port_init(uint16_t portid, struct rte_mempool *mbuf_pool) port_conf.txmode.offloads |= DEV_TX_OFFLOAD_MBUF_FAST_FREE; /* Configure the Ethernet device. */ - ret = rte_eth_dev_configure(portid, 1, 1, &port_conf); + ret = rte_eth_dev_configure(portid, options.num_threads, options.num_threads, &port_conf); if (ret != 0) return ret; @@ -358,21 +368,23 @@ port_init(uint16_t portid, struct rte_mempool *mbuf_pool) if (ret != 0) return ret; - /* Allocate and set up 1 RX queue per Ethernet port. */ + /* Allocate and set up 1 RX queue per thread per Ethernet port. */ rxconf = dev_info.default_rxconf; - for (uint32_t i = 0; i < 1; i++) { + for (int i = 0; i < options.num_threads; i++) { ret = rte_eth_rx_queue_setup(portid, i, nb_rxd, rte_eth_dev_socket_id(portid), &rxconf, mbuf_pool); if (ret < 0) return ret; + options.s_thr_info.at(i)->rxqid = i; } txconf = dev_info.default_txconf; txconf.offloads = port_conf.txmode.offloads; - /* Allocate and set up 1 TX queue per Ethernet port. */ - for (uint32_t i = 0; i < 1; i++) { + /* Allocate and set up 1 TX queue per thread per Ethernet port. */ + for (int i = 0; i < options.num_threads; i++) { ret = rte_eth_tx_queue_setup(portid, i, nb_txd, rte_eth_dev_socket_id(portid), &txconf); if (ret < 0) return ret; + options.s_thr_info.at(i)->txqid = i; } ret = rte_eth_dev_start(portid); @@ -394,8 +406,11 @@ port_init(uint16_t portid, struct rte_mempool *mbuf_pool) if (ret != 0) return ret; - if (rte_eth_add_tx_callback(portid, 0, tx_add_timestamp, NULL) == NULL || rte_eth_add_rx_callback(portid, 0, rx_add_timestamp, NULL) == NULL) { - return -1; + for (int i = 0; i < options.num_threads; i++) { + if (rte_eth_add_tx_callback(portid, options.s_thr_info.at(i)->txqid, tx_add_timestamp, NULL) == NULL || + rte_eth_add_rx_callback(portid, options.s_thr_info.at(i)->rxqid, rx_add_timestamp, NULL) == NULL) { + return -1; + } } return 0; @@ -406,20 +421,32 @@ static void usage() fprintf(stdout, "Usage:\n" \ " -v(vv): verbose mode\n" \ - " -h: display the information\n"); + " -h: display the information\n" \ + " -m: cpu mask for worker threads"); } -// static void int_handler(int) -// { -// //rte_exit(EXIT_SUCCESS, "Caught SIGINT, exiting...\n"); -// } +static void dump_options() +{ + ntr(NTR_DEP_USER1, NTR_LEVEL_INFO, + "main: khat configuration:\n" \ + " verbosity: +%d\n" \ + " thread count: %d\n" \ + " thread mask: %lld\n\n", + ntr_get_level(NTR_DEP_USER1) - NTR_LEVEL_DEFAULT, + options.num_threads, + options.cpuset); +} int main(int argc, char* argv[]) { unsigned int nb_ports; - struct rte_mempool *mbuf_pool, *mbuf_pool_pkt; + struct rte_mempool *mbuf_pool; + + ntr_init(); - //signal(SIGINT, int_handler); + if (nm_init() != 0) { + rte_exit(EXIT_FAILURE, "nm init failed!\n"); + } // init dpdk int ret = rte_eal_init(argc, argv); @@ -435,7 +462,7 @@ int main(int argc, char* argv[]) { int c; // parse arguments - while((c = getopt(argc, argv, "hv")) != -1) { + while((c = getopt(argc, argv, "hvm:")) != -1) { switch (c) { case 'v': ntr_set_level(NTR_DEP_USER1, ntr_get_level(NTR_DEP_USER1) + 1); @@ -444,6 +471,13 @@ int main(int argc, char* argv[]) usage(); rte_exit(EXIT_SUCCESS, NULL); break; + case 'm': + options.cpuset = strtoull(optarg, nullptr, 16); + options.num_threads = _mm_popcnt_u64(options.cpuset); + if (options.num_threads == 0) { + rte_exit(EXIT_FAILURE, "must run at least one thread\n"); + } + break; default: usage(); rte_exit(EXIT_SUCCESS, "unknown argument: %c", c); @@ -452,33 +486,38 @@ int main(int argc, char* argv[]) } } - // XXX: singal handler to exit + dump_options(); nb_ports = rte_eth_dev_count_avail(); if (nb_ports == 0) { rte_exit(EXIT_FAILURE, "number of ports must be > 0\n"); } - // create a mbuf memory pool on the socket - mbuf_pool = rte_pktmbuf_pool_create("MBUF_POOL", MBUF_MAX_COUNT * nb_ports, MBUF_CACHE_SIZE, 0, RTE_MBUF_DEFAULT_BUF_SIZE, rte_socket_id()); - if (mbuf_pool == nullptr) { - rte_exit(EXIT_FAILURE, "cannot create mbuf pool\n"); - } - - // create a pkt mbuf memory pool on the socket - mbuf_pool_pkt = rte_pktmbuf_pool_create("MBUF_POOL_PKT", MBUF_MAX_COUNT * nb_ports, MBUF_CACHE_SIZE, 0, RTE_MBUF_DEFAULT_BUF_SIZE, rte_socket_id()); - if (mbuf_pool_pkt == nullptr) { - rte_exit(EXIT_FAILURE, "cannot create mbuf_pkt pool\n"); - } - options.s_pkt_mempool = mbuf_pool_pkt; - - uint16_t portid = rte_eth_find_next(0); if (portid == RTE_MAX_ETHPORTS) { rte_exit(EXIT_FAILURE, "cannot find an available port\n"); } options.s_portid = portid; + // create a mbuf memory pool on the socket + mbuf_pool = rte_pktmbuf_pool_create("MBUF_POOL", MBUF_MAX_COUNT * nb_ports, MBUF_CACHE_SIZE, 0, RTE_MBUF_DEFAULT_BUF_SIZE, rte_eth_dev_socket_id(portid)); + if (mbuf_pool == nullptr) { + rte_exit(EXIT_FAILURE, "cannot create mbuf pool\n"); + } + + options.s_pkt_mempool = mbuf_pool; + + // init threads + uint64_t cpuset = options.cpuset; + for(int i = 0; i < options.num_threads; i++) { + struct thread_info * tinfo = new thread_info; + tinfo->tid = i; + int ffs = ffsll(cpuset); + tinfo->lcore_id = ffs - 1; + cpuset = cpuset & ~(1 << (ffs - 1)); + options.s_thr_info.push_back(tinfo); + } + if (port_init(portid, mbuf_pool) != 0) { rte_exit(EXIT_FAILURE, "cannot init port %d\n", portid); } @@ -487,25 +526,22 @@ int main(int argc, char* argv[]) rte_exit(EXIT_FAILURE, "cannot get mac address of port %d\n", portid); } - ntr(NTR_DEP_USER1, NTR_LEVEL_INFO, "Configured port %d with mac addr %x:%x:%x:%x:%x:%x\n", portid, + ntr(NTR_DEP_USER1, NTR_LEVEL_INFO, "Configured port %d on socket %d with mac addr %x:%x:%x:%x:%x:%x\n", portid, rte_eth_dev_socket_id(portid), options.s_host_mac.addr_bytes[0], options.s_host_mac.addr_bytes[1], options.s_host_mac.addr_bytes[2], options.s_host_mac.addr_bytes[3], options.s_host_mac.addr_bytes[4], options.s_host_mac.addr_bytes[5]); - - - uint16_t lcore_id = rte_get_next_lcore(0, true, false); - - if (lcore_id == RTE_MAX_LCORE) { - rte_exit(EXIT_FAILURE, "cannot detect lcores.\n"); - } - - sleep(1); - if (rte_eal_remote_launch(locore_main, NULL, lcore_id) != 0) { - rte_exit(EXIT_FAILURE, "failed to launch function on locore %d\n", lcore_id); + usleep(S2US); + + for(int i = 0; i < options.num_threads; i++) { + struct thread_info * tinfo = options.s_thr_info.at(i); + ntr(NTR_DEP_USER1, NTR_LEVEL_INFO, "main: launching thread %d on locore %d\n", tinfo->tid, tinfo->lcore_id); + if (rte_eal_remote_launch(locore_main, (void *)options.s_thr_info.at(i), tinfo->lcore_id) != 0) { + rte_exit(EXIT_FAILURE, "failed to launch function on locore %d\n", tinfo->lcore_id); + } } // while(true) { @@ -515,8 +551,12 @@ int main(int argc, char* argv[]) // usleep(1000000); // } - if (rte_eal_wait_lcore(lcore_id) != 0) { - rte_exit(EXIT_FAILURE, "failed to wait for locore %d\n", lcore_id); + for(int i = 0; i < options.num_threads; i++) { + struct thread_info * tinfo = options.s_thr_info.at(i); + ntr(NTR_DEP_USER1, NTR_LEVEL_INFO, "main: waiting for locore %d...\n", tinfo->lcore_id); + if (rte_eal_wait_lcore(tinfo->lcore_id) != 0) { + rte_exit(EXIT_FAILURE, "failed to wait for locore %d\n", tinfo->lcore_id); + } } // shouldn't get here diff --git a/libnm/nm.cc b/libnm/nm.cc new file mode 100644 index 0000000..5b22402 --- /dev/null +++ b/libnm/nm.cc @@ -0,0 +1,127 @@ +#include +#include +#include + +#include "nm.h" + +struct nm_obj { + int level; + int id; + struct nm_obj *parent; + std::vector children; +}; + +static bool nm_obj_comparator(struct nm_obj * a, struct nm_obj * b) +{ + return a->id < b->id; +} + +static std::vector nodes; +static std::vector cores; +static std::vector cpus; + +std::vector * nm_get_nodes() +{ + return &nodes; +} + +std::vector * nm_get_cpus() +{ + return &cpus; +} + +std::vector * nm_get_cores() +{ + return &cores; +} + +hwloc_obj_t get_parent_type(hwloc_obj_t obj, hwloc_obj_type_t type) +{ + while(obj != nullptr) { + if (obj->type == type) { + break; + } + obj = obj->parent; + } + return obj; +} + +// 0 on success +// -1 on error +int nm_init() +{ + int ret; + + hwloc_topology * topo; + if ((ret = hwloc_topology_init(&topo)) != 0) { + return ret; + } + + if ((ret = hwloc_topology_load(topo)) != 0) + return ret; + + // populate numa nodes + hwloc_obj_t obj = nullptr; + while(1) { + obj = hwloc_get_next_obj_by_type(topo, HWLOC_OBJ_PACKAGE, obj); + if (obj == nullptr) { + break; + } + + struct nm_obj * each = new struct nm_obj; + each->id = obj->logical_index; + each->level = NM_LEVEL_NUMA; + each->parent = nullptr; + nodes.push_back(each); + printf("libnm: identified NUMA node %d\n", each->id); + } + std::sort(nodes.begin(), nodes.end(), nm_obj_comparator); + + // populate cpus + obj = nullptr; + while(1) { + obj = hwloc_get_next_obj_by_type(topo, HWLOC_OBJ_CORE, obj); + if (obj == nullptr) { + break; + } + struct nm_obj * each = new struct nm_obj; + each->id = obj->logical_index; + each->level = NM_LEVEL_CPU; + hwloc_obj_t parent = get_parent_type(obj, HWLOC_OBJ_PACKAGE); + if (parent == nullptr) { + return -1; + } + + // XXX: this faults if the OS decides to be stupid + each->parent = nodes.at(parent->logical_index); + each->parent->children.push_back(each); + cpus.push_back(each); + printf("libnm: identified CPU %d on NUMA node %d\n", each->id, each->parent->id); + } + std::sort(cpus.begin(), cpus.end(), nm_obj_comparator); + + // populate cores + obj = nullptr; + while(1) { + obj = hwloc_get_next_obj_by_type(topo, HWLOC_OBJ_PU, obj); + if (obj == nullptr) { + break; + } + struct nm_obj * each = new struct nm_obj; + each->id = obj->logical_index; + each->level = NM_LEVEL_CORE; + hwloc_obj_t parent = get_parent_type(obj, HWLOC_OBJ_CORE); + if (parent == nullptr) { + return -1; + } + + // XXX: this faults if the OS decides to be stupid + each->parent = cpus.at(parent->logical_index); + each->parent->children.push_back(each); + cores.push_back(each); + printf("libnm: identified core %d on CPU %d, NUMA node %d\n", each->id, each->parent->id, each->parent->parent->id); + } + std::sort(cores.begin(), cores.end(), nm_obj_comparator); + + return ret; +} \ No newline at end of file diff --git a/libntr/ntr.c b/libntr/ntr.c new file mode 100644 index 0000000..fbd6e82 --- /dev/null +++ b/libntr/ntr.c @@ -0,0 +1,43 @@ +#include "ntr.h" + +static int ntr_log_levels[NTR_DEP_MAX] = {NTR_LEVEL_DEFAULT}; +static FILE * ntr_out; + +void ntr_init() +{ + ntr_out = stdout; +} + +void ntr(int dep, int level, const char * fmt, ...) +{ + va_list vl; + va_start(vl, fmt); + if (dep < NTR_DEP_MAX && level <= ntr_log_levels[dep]) { + vfprintf(ntr_out, fmt, vl); + } + va_end(vl); +} + +void ntr_set_level(int dep, int level) +{ + if (dep < NTR_DEP_MAX) { + ntr_log_levels[dep] = level; + } +} + + +void ntr_set_output(FILE * f) +{ + if (f != NULL) { + ntr_out = f; + } +} + + +int ntr_get_level(int dep) +{ + if (dep < NTR_DEP_MAX) { + return ntr_log_levels[dep]; + } + return 0; +}