Initial commit of benchmarks

Summary:
+ UDP and PTP over UDP & hw timestamping
+ Khat protocol
+ Rat protocol
+ Nanosecond timestamping
+ Load generation
+ NUMA detection library
+ Test scripts
+ Server & Client multi threading & tx/rx queues
+ RSS on all packets w/ randomized L4 ports

Test Plan: by hand

Reviewers: ali

Reviewed By: ali

Differential Revision: https://review.rcs.uwaterloo.ca/D408
This commit is contained in:
quackerd 2021-02-10 14:06:27 -05:00
parent e9e15caea8
commit f655e5f5cb
26 changed files with 4806 additions and 822 deletions

3
.arcconfig Normal file
View File

@ -0,0 +1,3 @@
{
"phabricator.uri" : "https://review.rcs.uwaterloo.ca/"
}

194
.clang-format Normal file
View File

@ -0,0 +1,194 @@
# $FreeBSD$
# Basic .clang-format
---
BasedOnStyle: WebKit
AlignAfterOpenBracket: DontAlign
AlignConsecutiveAssignments: false
AlignConsecutiveDeclarations: false
AlignEscapedNewlines: Left
AlignOperands: false
AlignTrailingComments: true
AllowAllArgumentsOnNextLine: false
AllowAllParametersOfDeclarationOnNextLine: false
AllowShortBlocksOnASingleLine: Never
AllowShortCaseLabelsOnASingleLine: false
AllowShortFunctionsOnASingleLine: InlineOnly
AllowShortIfStatementsOnASingleLine: Never
AllowShortLoopsOnASingleLine: false
AlwaysBreakAfterReturnType: TopLevelDefinitions
AlwaysBreakBeforeMultilineStrings: false
AlwaysBreakTemplateDeclarations: MultiLine
BinPackArguments: true
BinPackParameters: true
BreakBeforeBinaryOperators: None
BreakBeforeBraces: WebKit
BreakBeforeTernaryOperators: false
# TODO: BreakStringLiterals can cause very strange formatting so turn it off?
BreakStringLiterals: false
# Prefer:
# some_var = function(arg1,
# arg2)
# over:
# some_var =
# function(arg1, arg2)
PenaltyBreakAssignment: 100
# Prefer:
# some_long_function(arg1, arg2
# arg3)
# over:
# some_long_function(
# arg1, arg2, arg3)
PenaltyBreakBeforeFirstCallParameter: 100
CompactNamespaces: true
DerivePointerAlignment: false
DisableFormat: false
ForEachMacros:
- ARB_ARRFOREACH
- ARB_ARRFOREACH_REVWCOND
- ARB_ARRFOREACH_REVERSE
- ARB_FOREACH
- ARB_FOREACH_FROM
- ARB_FOREACH_SAFE
- ARB_FOREACH_REVERSE
- ARB_FOREACH_REVERSE_FROM
- ARB_FOREACH_REVERSE_SAFE
- CPU_FOREACH
- FOREACH_THREAD_IN_PROC
- FOREACH_PROC_IN_SYSTEM
- FOREACH_PRISON_CHILD
- FOREACH_PRISON_DESCENDANT
- FOREACH_PRISON_DESCENDANT_LOCKED
- FOREACH_PRISON_DESCENDANT_LOCKED_LEVEL
- MNT_VNODE_FOREACH_ALL
- MNT_VNODE_FOREACH_ACTIVE
- RB_FOREACH
- RB_FOREACH_FROM
- RB_FOREACH_SAFE
- RB_FOREACH_REVERSE
- RB_FOREACH_REVERSE_FROM
- RB_FOREACH_REVERSE_SAFE
- SLIST_FOREACH
- SLIST_FOREACH_FROM
- SLIST_FOREACH_FROM_SAFE
- SLIST_FOREACH_SAFE
- SLIST_FOREACH_PREVPTR
- SPLAY_FOREACH
- LIST_FOREACH
- LIST_FOREACH_FROM
- LIST_FOREACH_FROM_SAFE
- LIST_FOREACH_SAFE
- STAILQ_FOREACH
- STAILQ_FOREACH_FROM
- STAILQ_FOREACH_FROM_SAFE
- STAILQ_FOREACH_SAFE
- TAILQ_FOREACH
- TAILQ_FOREACH_FROM
- TAILQ_FOREACH_FROM_SAFE
- TAILQ_FOREACH_REVERSE
- TAILQ_FOREACH_REVERSE_FROM
- TAILQ_FOREACH_REVERSE_FROM_SAFE
- TAILQ_FOREACH_REVERSE_SAFE
- TAILQ_FOREACH_SAFE
- VM_MAP_ENTRY_FOREACH
- VM_PAGE_DUMP_FOREACH
IndentCaseLabels: false
IndentPPDirectives: None
Language: Cpp
NamespaceIndentation: None
PointerAlignment: Right
ContinuationIndentWidth: 4
IndentWidth: 8
TabWidth: 8
ColumnLimit: 80
UseTab: Always
SpaceAfterCStyleCast: false
IncludeBlocks: Regroup
IncludeCategories:
- Regex: '^\"opt_.*\.h\"'
Priority: 1
SortPriority: 10
- Regex: '^<sys/cdefs\.h>'
Priority: 2
SortPriority: 20
- Regex: '^<sys/types\.h>'
Priority: 2
SortPriority: 21
- Regex: '^<sys/param\.h>'
Priority: 2
SortPriority: 22
- Regex: '^<sys/systm\.h>'
Priority: 2
SortPriority: 23
- Regex: '^<sys.*/'
Priority: 2
SortPriority: 24
- Regex: '^<vm/vm\.h>'
Priority: 3
SortPriority: 30
- Regex: '^<vm/'
Priority: 3
SortPriority: 31
- Regex: '^<machine/'
Priority: 4
SortPriority: 40
- Regex: '^<(x86|amd64|i386|xen)/'
Priority: 5
SortPriority: 50
- Regex: '^<dev/'
Priority: 6
SortPriority: 60
- Regex: '^<net.*/'
Priority: 7
SortPriority: 70
- Regex: '^<protocols/'
Priority: 7
SortPriority: 71
- Regex: '^<(fs|nfs(|client|server)|ufs)/'
Priority: 8
SortPriority: 80
- Regex: '^<[^/].*\.h'
Priority: 9
SortPriority: 90
- Regex: '^\".*\.h\"'
Priority: 10
SortPriority: 100
# LLVM's header include ordering style is almost the exact opposite of ours.
# Unfortunately, they have hard-coded their preferences into clang-format.
# Clobbering this regular expression to avoid matching prevents non-system
# headers from being forcibly moved to the top of the include list.
# http://llvm.org/docs/CodingStandards.html#include-style
IncludeIsMainRegex: 'BLAH_DONT_MATCH_ANYTHING'
SortIncludes: true
KeepEmptyLinesAtTheStartOfBlocks: true
TypenameMacros:
- ARB_ELMTYPE
- ARB_HEAD
- ARB8_HEAD
- ARB16_HEAD
- ARB32_HEAD
- ARB_ENTRY
- ARB8_ENTRY
- ARB16_ENTRY
- ARB32_ENTRY
- LIST_CLASS_ENTRY
- LIST_CLASS_HEAD
- LIST_ENTRY
- LIST_HEAD
- QUEUE_TYPEOF
- RB_ENTRY
- RB_HEAD
- SLIST_CLASS_HEAD
- SLIST_CLASS_ENTRY
- SLIST_HEAD
- SLIST_ENTRY
- SMR_POINTER
- SPLAY_ENTRY
- SPLAY_HEAD
- STAILQ_CLASS_ENTRY
- STAILQ_CLASS_HEAD
- STAILQ_ENTRY
- STAILQ_HEAD
- TAILQ_CLASS_ENTRY
- TAILQ_CLASS_HEAD
- TAILQ_ENTRY
- TAILQ_HEAD

1
.clang-tidy Normal file
View File

@ -0,0 +1 @@
Checks: "-*,clang-diagnostic-*,clang-analyzer-*,modernize*,performance*,-modernize-use-trailing-return-type,-modernize-avoid-c-arrays"

271
.gitignore vendored Normal file
View File

@ -0,0 +1,271 @@
########################################################
########################################################
########################################################
########################################################
########################################################
########################################################
########################################################
################ C STUFF ##########################
########################################################
########################################################
########################################################
########################################################
########################################################
########################################################
########################################################
# Prerequisites
*.d
# Object files
*.o
*.ko
*.obj
*.elf
# Linker output
*.ilk
*.map
*.exp
# Precompiled Headers
*.gch
*.pch
# Libraries
*.lib
*.a
*.la
*.lo
# Shared objects (inc. Windows DLLs)
*.dll
*.so
*.so.*
*.dylib
# Executables
*.exe
*.out
*.app
*.i*86
*.x86_64
*.hex
# Debug files
*.dSYM/
*.su
*.idb
*.pdb
# Kernel Module Compile Results
*.mod*
*.cmd
.tmp_versions/
modules.order
Module.symvers
Mkfile.old
dkms.conf
########################################################
########################################################
########################################################
########################################################
########################################################
########################################################
########################################################
################ PYTHON STUFF ##########################
########################################################
########################################################
########################################################
########################################################
########################################################
########################################################
########################################################
# Byte-compiled / optimized / DLL files
__pycache__/
*.py[cod]
*$py.class
# C extensions
*.so
# Distribution / packaging
.Python
build/
develop-eggs/
dist/
downloads/
eggs/
.eggs/
lib/
lib64/
parts/
sdist/
var/
wheels/
share/python-wheels/
*.egg-info/
.installed.cfg
*.egg
MANIFEST
# PyInstaller
# Usually these files are written by a python script from a template
# before PyInstaller builds the exe, so as to inject date/other infos into it.
*.manifest
*.spec
# Installer logs
pip-log.txt
pip-delete-this-directory.txt
# Unit test / coverage reports
htmlcov/
.tox/
.nox/
.coverage
.coverage.*
.cache
nosetests.xml
coverage.xml
*.cover
*.py,cover
.hypothesis/
.pytest_cache/
cover/
# Translations
*.mo
*.pot
# Django stuff:
*.log
local_settings.py
db.sqlite3
db.sqlite3-journal
# Flask stuff:
instance/
.webassets-cache
# Scrapy stuff:
.scrapy
# Sphinx documentation
docs/_build/
# PyBuilder
.pybuilder/
target/
# Jupyter Notebook
.ipynb_checkpoints
# IPython
profile_default/
ipython_config.py
# pyenv
# For a library or package, you might want to ignore these files since the code is
# intended to run in multiple environments; otherwise, check them in:
# .python-version
# pipenv
# According to pypa/pipenv#598, it is recommended to include Pipfile.lock in version control.
# However, in case of collaboration, if having platform-specific dependencies or dependencies
# having no cross-platform support, pipenv may install dependencies that don't work, or not
# install all needed dependencies.
#Pipfile.lock
# PEP 582; used by e.g. github.com/David-OConnor/pyflow
__pypackages__/
# Celery stuff
celerybeat-schedule
celerybeat.pid
# SageMath parsed files
*.sage.py
# Environments
.env
.venv
env/
venv/
ENV/
env.bak/
venv.bak/
# Spyder project settings
.spyderproject
.spyproject
# Rope project settings
.ropeproject
# mkdocs documentation
/site
# mypy
.mypy_cache/
.dmypy.json
dmypy.json
# Pyre type checker
.pyre/
# pytype static type analyzer
.pytype/
# Cython debug symbols
cython_debug/
########################################################
########################################################
########################################################
########################################################
########################################################
########################################################
########################################################
################ C++ STUFF ##########################
########################################################
########################################################
########################################################
########################################################
########################################################
########################################################
########################################################
# Prerequisites
*.d
# Compiled Object files
*.slo
*.lo
*.o
*.obj
# Precompiled Headers
*.gch
*.pch
# Compiled Dynamic libraries
*.so
*.dylib
*.dll
# Fortran module files
*.mod
*.smod
# Compiled Static libraries
*.lai
*.la
*.a
*.lib
# Executables
*.exe
*.out
*.app

0
.gitmodules vendored Normal file
View File

1
.pyenv Normal file
View File

@ -0,0 +1 @@
PYTHONPATH="./scripts/libs"

View File

@ -1,7 +1,7 @@
cmake_minimum_required(VERSION 3.0)
find_program(CC_GCC gcc)
find_program(CXX_GCC g++)
find_program(CC_GCC clang)
find_program(CXX_GCC clang++)
set(CMAKE_C_COMPILER ${CC_GCC})
set(CMAKE_CXX_COMPILER ${CXX_GCC})
@ -10,24 +10,51 @@ project(khat)
list(APPEND CMAKE_MODULE_PATH "${CMAKE_SOURCE_DIR}")
find_package(dpdk REQUIRED)
find_package(Hwloc REQUIRED)
set(CC_FLAGS -O2 -g -Wall -Wextra -Werror -std=c++11
-Wno-deprecated-declarations
-Wno-packed-not-aligned
-Wno-address-of-packed-member
-msse4)
-Wno-zero-length-array
-Wno-gnu-zero-variadic-macro-arguments
-msse4
-mavx)
include_directories(${CMAKE_SOURCE_DIR}/inc)
include_directories(${dpdk_INCLUDE_DIRS})
include_directories(${Hwloc_INCLUDE_DIRS})
set(LIBNM_CC_FLAGS -O2 -g -Wall -Wextra -Werror -std=c++11)
set(LIBNTR_C_FLAGS -O2 -g -Wall -Wextra -Werror -std=c11)
set(LIBGEN_CC_FLAGS -O2 -g -Wall -Wextra -Werror -std=c++11)
set(KHAT_LINKLIBS pthread nm ntr)
set(CAT_LINKLIBS pthread nm ntr gen)
set(RAT_LINKLIBS pthread nm ntr gen)
add_library(nm libnm/nm.cc)
target_link_libraries(nm ${Hwloc_LIBRARIES})
target_compile_options(nm PRIVATE ${LIBNM_CC_FLAGS})
add_library(ntr libntr/ntr.c)
target_compile_options(ntr PRIVATE ${LIBNTR_C_FLAGS})
add_library(gen libgen/generator.cc)
target_link_libraries(gen ${Hwloc_LIBRARIES})
target_compile_options(gen PRIVATE ${LIBGEN_CC_FLAGS})
add_executable(khat khat/khat.cc)
add_executable(cat cat/cat.cc)
set(LINK_LIBS ${dpdk_LIBRARIES} pthread)
target_link_libraries(khat ${LINK_LIBS})
target_link_libraries(khat ${dpdk_LIBRARIES} ${KHAT_LINKLIBS})
target_compile_options(khat PRIVATE ${CC_FLAGS})
target_link_options(khat PRIVATE -L /usr/local/lib)
target_link_libraries(cat ${LINK_LIBS})
add_executable(cat cat/cat.cc)
target_link_libraries(cat ${dpdk_LIBRARIES} ${CAT_LINKLIBS})
target_compile_options(cat PRIVATE ${CC_FLAGS})
target_link_options(cat PRIVATE -L /usr/local/lib)
add_executable(rat rat/rat.cc)
target_link_libraries(rat ${dpdk_LIBRARIES} ${RAT_LINKLIBS})
target_compile_options(rat PRIVATE ${CC_FLAGS})
target_link_options(rat PRIVATE -L /usr/local/lib)

213
FindHwloc.cmake Normal file
View File

@ -0,0 +1,213 @@
#.rst:
# FindHwloc
# ----------
#
# Try to find Portable Hardware Locality (hwloc) libraries.
# http://www.open-mpi.org/software/hwloc
#
# You may declare HWLOC_ROOT environment variable to tell where
# your hwloc library is installed.
#
# Once done this will define::
#
# Hwloc_FOUND - True if hwloc was found
# Hwloc_INCLUDE_DIRS - include directories for hwloc
# Hwloc_LIBRARIES - link against these libraries to use hwloc
# Hwloc_VERSION - version
# Hwloc_CFLAGS - include directories as compiler flags
# Hwloc_LDFLAGS - link paths and libs as compiler flags
#
#=============================================================================
# Copyright 2014 Mikael Lepistö
#
# Distributed under the OSI-approved BSD License (the "License");
#
# This software is distributed WITHOUT ANY WARRANTY; without even the
# implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
# See the License for more information.
#=============================================================================
if(WIN32)
find_path(Hwloc_INCLUDE_DIR
NAMES
hwloc.h
PATHS
ENV "PROGRAMFILES(X86)"
ENV HWLOC_ROOT
PATH_SUFFIXES
include
)
find_library(Hwloc_LIBRARY
NAMES
libhwloc.lib
PATHS
ENV "PROGRAMFILES(X86)"
ENV HWLOC_ROOT
PATH_SUFFIXES
lib
)
#
# Check if the found library can be used to linking
#
SET (_TEST_SOURCE "${CMAKE_BINARY_DIR}${CMAKE_FILES_DIRECTORY}/linktest.c")
FILE (WRITE "${_TEST_SOURCE}"
"
#include <hwloc.h>
int main()
{
hwloc_topology_t topology;
int nbcores;
hwloc_topology_init(&topology);
hwloc_topology_load(topology);
nbcores = hwloc_get_nbobjs_by_type(topology, HWLOC_OBJ_CORE);
hwloc_topology_destroy(topology);
return 0;
}
"
)
TRY_COMPILE(_LINK_SUCCESS ${CMAKE_BINARY_DIR} "${_TEST_SOURCE}"
CMAKE_FLAGS
"-DINCLUDE_DIRECTORIES:STRING=${Hwloc_INCLUDE_DIR}"
CMAKE_FLAGS
"-DLINK_LIBRARIES:STRING=${Hwloc_LIBRARY}"
)
IF(NOT _LINK_SUCCESS)
if(CMAKE_SIZEOF_VOID_P EQUAL 8)
message(STATUS "You are building 64bit target.")
ELSE()
message(STATUS "You are building 32bit code. If you like to build x64 use e.g. -G 'Visual Studio 12 Win64' generator." )
ENDIF()
message(FATAL_ERROR "Library found, but linking test program failed.")
ENDIF()
#
# Resolve version if some compiled binary found...
#
find_program(HWLOC_INFO_EXECUTABLE
NAMES
hwloc-info
PATHS
ENV HWLOC_ROOT
PATH_SUFFIXES
bin
)
if(HWLOC_INFO_EXECUTABLE)
execute_process(
COMMAND ${HWLOC_INFO_EXECUTABLE} "--version"
OUTPUT_VARIABLE HWLOC_VERSION_LINE
OUTPUT_STRIP_TRAILING_WHITESPACE
)
string(REGEX MATCH "([0-9]+.[0-9]+)$"
Hwloc_VERSION "${HWLOC_VERSION_LINE}")
unset(HWLOC_VERSION_LINE)
endif()
#
# All good
#
set(Hwloc_LIBRARIES ${Hwloc_LIBRARY})
set(Hwloc_INCLUDE_DIRS ${Hwloc_INCLUDE_DIR})
include(FindPackageHandleStandardArgs)
find_package_handle_standard_args(
Hwloc
FOUND_VAR Hwloc_FOUND
REQUIRED_VARS Hwloc_LIBRARY Hwloc_INCLUDE_DIR Hwloc_VERSION_PARSED Hwloc_VERSION_MAJOR Hwloc_VERSION_MINOR
VERSION_VAR Hwloc_VERSION)
mark_as_advanced(
Hwloc_INCLUDE_DIR
Hwloc_LIBRARY)
foreach(arg ${Hwloc_INCLUDE_DIRS})
set(Hwloc_CFLAGS "${Hwloc_CFLAGS} /I${arg}")
endforeach()
set(Hwloc_LDFLAGS "${Hwloc_LIBRARY}")
else()
if(CMAKE_CROSSCOMPILING)
find_path(Hwloc_INCLUDE_DIRS
NAMES
hwloc.h
PATHS
ENV HWLOC_ROOT
)
find_library(Hwloc_LIBRARIES
NAMES
hwloc
PATHS
ENV HWLOC_ROOT
)
if(Hwloc_INCLUDE_DIRS AND Hwloc_LIBRARIES)
message(WARNING "HWLOC library found using find_library() - cannot determine version. Assuming 1.7.0")
set(Hwloc_FOUND 1)
set(Hwloc_VERSION "1.7.0")
endif()
else() # Find with pkgconfig for non-crosscompile builds
find_package(PkgConfig)
if(HWLOC_ROOT)
set(ENV{PKG_CONFIG_PATH} "${HWLOC_ROOT}/lib/pkgconfig")
else()
foreach(PREFIX ${CMAKE_PREFIX_PATH})
set(PKG_CONFIG_PATH "${PKG_CONFIG_PATH}:${PREFIX}/lib/pkgconfig")
endforeach()
set(ENV{PKG_CONFIG_PATH} "${PKG_CONFIG_PATH}:$ENV{PKG_CONFIG_PATH}")
endif()
if(hwloc_FIND_REQUIRED)
set(_hwloc_OPTS "REQUIRED")
elseif(hwloc_FIND_QUIETLY)
set(_hwloc_OPTS "QUIET")
else()
set(_hwloc_output 1)
endif()
if(hwloc_FIND_VERSION)
if(hwloc_FIND_VERSION_EXACT)
pkg_check_modules(Hwloc ${_hwloc_OPTS} hwloc=${hwloc_FIND_VERSION})
else()
pkg_check_modules(Hwloc ${_hwloc_OPTS} hwloc>=${hwloc_FIND_VERSION})
endif()
else()
pkg_check_modules(Hwloc ${_hwloc_OPTS} hwloc)
endif()
if(Hwloc_FOUND)
string(REPLACE "." ";" Hwloc_VERSION_PARSED "${Hwloc_VERSION}")
set(Hwloc_VERSION "${Hwloc_VERSION}" CACHE STRING "version of Hwloc as a list")
list(GET Hwloc_VERSION_PARSED 0 Hwloc_VERSION_MAJOR)
set(Hwloc_VERSION_MAJOR "${Hwloc_VERSION_MAJOR}" CACHE STRING "Major version of Hwloc")
list(GET Hwloc_VERSION_PARSED 1 Hwloc_VERSION_MINOR)
set(Hwloc_VERSION_MINOR "${Hwloc_VERSION_MINOR}" CACHE STRING "Minor version of Hwloc")
include(FindPackageHandleStandardArgs)
find_package_handle_standard_args(Hwloc DEFAULT_MSG Hwloc_LIBRARIES)
if(NOT ${Hwloc_VERSION} VERSION_LESS 1.7.0)
set(Hwloc_GL_FOUND 1)
endif()
if(_hwloc_output)
message(STATUS
"Found hwloc ${Hwloc_VERSION} in ${Hwloc_INCLUDE_DIRS}:${Hwloc_LIBRARIES}")
endif()
endif()
endif() # cross-compile else
endif()

1180
cat/cat.cc

File diff suppressed because it is too large Load Diff

View File

@ -2,8 +2,12 @@
-O2
-std=c++11
-Wall
-Wextra
-Werror
-Wpedantic
-I/usr/include/dpdk
-Iinc
-Wno-deprecated-declarations
-Wno-deprecated-declarations
-Wno-packed-not-aligned
-Wno-address-of-packed-member
-Wno-zero-length-array
-Wno-gnu-zero-variadic-macro-arguments

296
inc/gen.h Normal file
View File

@ -0,0 +1,296 @@
// modified from mutilate
// -*- c++ -*-
// 1. implement "fixed" generator
// 2. implement discrete generator
// 3. implement combine generator?
#pragma once
#include <sys/param.h>
#include <netinet/in.h>
#include <assert.h>
#include <inttypes.h>
#include <limits.h>
#include <math.h>
#include <stdint.h>
#include <stdlib.h>
#include <string.h>
#include "util.h"
#include <string>
#include <utility>
#include <vector>
#define D(fmt, ...)
#define DIE(fmt, ...) (void)0;
#define FNV_64_PRIME (0x100000001b3ULL)
#define FNV1_64_INIT (0xcbf29ce484222325ULL)
// 64-bit FNV-1a hash over an arbitrary byte buffer (xor byte, then
// multiply by the prime).  Constants are FNV1_64_INIT / FNV_64_PRIME
// spelled out as literals.
static inline uint64_t
fnv_64_buf(const void *buf, size_t len)
{
	const unsigned char *p = (const unsigned char *)buf;
	uint64_t hval = 0xcbf29ce484222325ULL; /* FNV1_64_INIT */

	for (size_t i = 0; i < len; i++) {
		hval ^= (uint64_t)p[i];
		hval *= 0x100000001b3ULL; /* FNV_64_PRIME */
	}
	return hval;
}
// Hash a single 64-bit integer by hashing its in-memory byte
// representation (endianness-dependent, consistent within one host).
static inline uint64_t
fnv_64(uint64_t in)
{
	return fnv_64_buf((const void *)&in, sizeof(in));
}
// Generator syntax:
//
// \d+ == fixed
// n[ormal]:mean,sd
// e[xponential]:lambda
// p[areto]:scale,shape
// g[ev]:loc,scale,shape
// fb_value, fb_key, fb_rate
// Abstract base for value/inter-arrival distributions (adapted from
// mutilate).  generate() maps a uniform sample U in [0,1) to a draw
// from the distribution; U < 0 means "draw a fresh uniform internally".
class Generator {
public:
	Generator() { }
	// Generator(const Generator &g) = delete;
	// virtual Generator& operator=(const Generator &g) = delete;
	virtual ~Generator() { }
	// Return one sample; U optionally supplies the uniform variate.
	virtual double generate(double U = -1.0) = 0;
	// Re-parameterize by rate; optional (DIE expands to a no-op here,
	// so calling it on a subclass without support silently does nothing).
	virtual void set_lambda(double) { DIE("set_lambda() not implemented"); }

protected:
	// NOTE(review): tag string never written or read by the visible
	// subclasses -- candidate for removal.
	std::string type;
};
// Degenerate distribution: every draw returns the same constant.
class Fixed : public Generator {
public:
	Fixed(double _value = 1.0)
	    : value(_value)
	{
		D("Fixed(%f)", value);
	}

	// The uniform sample is irrelevant; the constant is returned as-is.
	virtual double generate(double) { return value; }

	// Interpret lambda as a rate: the constant becomes the mean
	// interval 1/lambda (0 for non-positive rates).
	virtual void set_lambda(double lambda)
	{
		value = (lambda > 0.0) ? 1.0 / lambda : 0.0;
	}

private:
	double value;
};
// Uniform distribution over [0, scale).
class Uniform : public Generator {
public:
	Uniform(double _scale)
	    : scale(_scale)
	{
		D("Uniform(%f)", scale);
	}

	virtual double generate(double U = -1.0)
	{
		double u = (U < 0.0) ? drand48() : U;
		return scale * u;
	}

	// Mean of U[0, scale) is scale/2, so a rate of lambda implies
	// scale = 2/lambda (0 for non-positive rates).
	virtual void set_lambda(double lambda)
	{
		scale = (lambda > 0.0) ? 2.0 / lambda : 0.0;
	}

private:
	double scale;
};
// Normal distribution via a Box-Muller-style transform.
class Normal : public Generator {
public:
	Normal(double _mean = 1.0, double _sd = 1.0)
	    : mean(_mean)
	    , sd(_sd)
	{
		D("Normal(mean=%f, sd=%f)", mean, sd);
	}

	virtual double generate(double U = -1.0)
	{
		if (U < 0.0)
			U = drand48();
		// The second uniform deliberately reuses U (quirk inherited
		// from mutilate) -- kept so output is bit-identical.
		// NOTE(review): textbook Box-Muller would draw V fresh.
		double V = U;
		double N = sqrt(-2 * log(U)) * cos(2 * M_PI * V);
		return mean + sd * N;
	}

	// Rate lambda sets the mean to 1/lambda (0 for non-positive rates);
	// the standard deviation is left untouched.
	virtual void set_lambda(double lambda)
	{
		mean = (lambda > 0.0) ? 1.0 / lambda : 0.0;
	}

private:
	double mean, sd;
};
// Exponential distribution with rate lambda.
class Exponential : public Generator {
public:
	Exponential(double _lambda = 1.0)
	    : lambda(_lambda)
	{
		D("Exponential(lambda=%f)", lambda);
	}

	// Inverse-CDF sampling: -ln(U)/lambda.  A non-positive rate
	// degenerates to a constant 0.
	virtual double generate(double U = -1.0)
	{
		if (lambda <= 0.0)
			return 0.0;
		return -log(U < 0.0 ? drand48() : U) / lambda;
	}

	virtual void set_lambda(double lambda) { this->lambda = lambda; }

private:
	double lambda;
};
// Generalized Pareto distribution (location/scale/shape).
class GPareto : public Generator {
public:
	GPareto(double _loc = 0.0, double _scale = 1.0, double _shape = 1.0)
	    : loc(_loc)
	    , scale(_scale)
	    , shape(_shape)
	{
		// shape == 0 would divide by zero in generate()
		assert(shape != 0.0);
		D("GPareto(loc=%f, scale=%f, shape=%f)", loc, scale, shape);
	}

	// Quantile-function sampling: loc + scale*(U^-shape - 1)/shape.
	virtual double generate(double U = -1.0)
	{
		double u = (U < 0.0) ? drand48() : U;
		return loc + scale * (pow(u, -shape) - 1) / shape;
	}

	// Solve for scale so the distribution mean equals 1/lambda.
	virtual void set_lambda(double lambda)
	{
		scale = (lambda <= 0.0)
		    ? 0.0
		    : (1 - shape) / lambda - (1 - shape) * loc;
	}

private:
	double loc /* mu */;
	double scale /* sigma */, shape /* k */;
};
// Generalized extreme-value distribution, sampled by pushing a
// unit-rate exponential draw through the GEV quantile transform.
class GEV : public Generator {
public:
	GEV(double _loc = 0.0, double _scale = 1.0, double _shape = 1.0)
	    : e(1.0)
	    , loc(_loc)
	    , scale(_scale)
	    , shape(_shape)
	{
		// shape == 0 would divide by zero in generate()
		assert(shape != 0.0);
		D("GEV(loc=%f, scale=%f, shape=%f)", loc, scale, shape);
	}

	virtual double generate(double U = -1.0)
	{
		double x = e.generate(U);
		return loc + scale * (pow(x, -shape) - 1) / shape;
	}

private:
	Exponential e;
	double loc /* mu */, scale /* sigma */, shape /* k */;
};
// Discrete mixture: add(p, v) registers value v with probability p;
// any residual probability mass falls through to a default generator.
class Discrete : public Generator {
public:
	~Discrete() { delete def; }
	Discrete(Generator *_def = NULL)
	    : def(_def)
	{
		if (def == NULL)
			def = new Fixed(0.0);
	}

	virtual double generate(double U = -1.0)
	{
		double Uorig = U;
		if (pv.size() > 0 && U < 0.0)
			U = drand48();
		double cum = 0;
		for (auto &entry : pv) {
			cum += entry.first;
			if (U < cum)
				return entry.second;
		}
		// No bucket matched: defer to the fallback with the caller's
		// original U (possibly -1, i.e. "draw fresh").
		return def->generate(Uorig);
	}

	void add(double p, double v) { pv.push_back({ p, v }); }

private:
	// NOTE(review): raw owning pointer and no suppressed copy-ctor;
	// copying a Discrete would double-delete `def` -- confirm callers
	// never copy.
	Generator *def;
	std::vector<std::pair<double, double>> pv;
};
// Derives deterministic, zero-padded decimal key strings from indices;
// key length is drawn from a generator seeded by a hash of the index.
class KeyGenerator {
public:
	KeyGenerator(Generator *_g, double _max = 10000)
	    : g(_g)
	    , max(_max)
	{
	}

	std::string generate(uint64_t ind)
	{
		// Hash the index into a uniform sample to drive the
		// length generator deterministically per index.
		double U = (double)fnv_64(ind) / (double)ULLONG_MAX;
		double G = g->generate(U);
		// Width: at least enough digits to index `max` keys.
		int keylen = MAX(round(G), floor(log10(max)) + 1);
		char buf[256];
		snprintf(buf, sizeof(buf), "%0*" PRIu64, keylen, ind);
		return std::string(buf);
	}

private:
	Generator *g;
	double max;
};
Generator *createGenerator(std::string str);
Generator *createFacebookKey();
Generator *createFacebookValue();
Generator *createFacebookIA();

20
inc/nm.h Normal file
View File

@ -0,0 +1,20 @@
#pragma once
#include <cstdint>
#include <vector>
constexpr static int NM_LEVEL_NUMA = 0;
constexpr static int NM_LEVEL_CPU = 1;
constexpr static int NM_LEVEL_CORE = 2;
std::vector<struct nm_obj *> *nm_get_nodes();
std::vector<struct nm_obj *> *nm_get_cpus();
std::vector<struct nm_obj *> *nm_get_cores();
// 0 on success
// -1 on error
int nm_init(int verbosity);
uint64_t nm_tsc2ns(uint64_t tsc);
uint64_t nm_get_uptime_ns();

38
inc/ntr.h Normal file
View File

@ -0,0 +1,38 @@
#pragma once
#include <stdarg.h>
#include <stdio.h>
#define NTR_LEVEL_NONE (0)
#define NTR_LEVEL_ERROR (1)
#define NTR_LEVEL_WARNING (2)
#define NTR_LEVEL_INFO (3)
#define NTR_LEVEL_DEBUG (4)
#define NTR_LEVEL_DEFAULT (NTR_LEVEL_WARNING)
#define NTR_DEP_NTR (0)
#define NTR_DEP_USER1 (1)
#define NTR_DEP_USER2 (2)
#define NTR_DEP_USER3 (3)
#define NTR_DEP_USER4 (4)
#define NTR_DEP_USER5 (5)
#define NTR_DEP_MAX (NTR_DEP_USER5 + 1)
#ifdef __cplusplus
extern "C" {
#endif
void ntr_init();
__attribute__((format(printf, 3, 4))) void ntr(
int dep, int level, const char *fmt, ...);
void ntr_set_level(int dep, int level);
void ntr_set_output(FILE *f);
int ntr_get_level(int dep);
#ifdef __cplusplus
}
#endif

View File

@ -1,61 +0,0 @@
#pragma once
#include <stdio.h>
#define NTR_LEVEL_NONE (0)
#define NTR_LEVEL_ERROR (1)
#define NTR_LEVEL_WARNING (2)
#define NTR_LEVEL_INFO (3)
#define NTR_LEVEL_DEBUG (4)
#define NTR_LEVEL_DEFAULT (NTR_LEVEL_WARNING)
#define NTR_DEP_NTR (0)
#define NTR_DEP_USER1 (1)
#define NTR_DEP_USER2 (2)
#define NTR_DEP_USER3 (3)
#define NTR_DEP_USER4 (4)
#define NTR_DEP_USER5 (5)
#define NTR_DEP_MAX (NTR_DEP_USER5 + 1)
#define NTR_DECL_IMPL \
int ntr_log_levels[NTR_DEP_MAX] = {NTR_LEVEL_DEFAULT}; \
FILE * ntr_out = stdout
extern int ntr_log_levels[];
extern FILE * ntr_out;
/* Emit one printf-style log record for dependency `dep` at `level`;
 * filtered out entirely when the dependency's level is below `level`
 * or `dep` is out of range. */
static inline void
ntr(int dep, int level, const char *fmt, ...)
{
	va_list ap;

	if (dep >= NTR_DEP_MAX || level > ntr_log_levels[dep])
		return;

	va_start(ap, fmt);
	vfprintf(ntr_out, fmt, ap);
	va_end(ap);
}
/* Set the log level for one dependency; out-of-range `dep` is ignored. */
static inline void
ntr_set_level(int dep, int level)
{
	if (dep >= NTR_DEP_MAX)
		return;
	ntr_log_levels[dep] = level;
}
/* Redirect log output to `f`; NULL is ignored (keeps current stream). */
static inline void
ntr_set_output(FILE *f)
{
	if (f == NULL)
		return;
	ntr_out = f;
}
/* Return the log level for `dep`, or 0 when `dep` is out of range. */
static inline int
ntr_get_level(int dep)
{
	return (dep < NTR_DEP_MAX) ? ntr_log_levels[dep] : 0;
}

483
inc/pkt.h
View File

@ -1,16 +1,19 @@
#pragma once
#include <rte_mbuf_core.h>
#include <rte_mbuf.h>
#include <rte_udp.h>
#include <rte_byteorder.h>
#include <rte_ip.h>
#include <stdint.h>
#include <rte_flow.h>
#include <rte_ether.h>
#include <unistd.h>
#include <rte_flow.h>
#include <rte_ip.h>
#include <rte_mbuf.h>
#include <rte_mbuf_core.h>
#include <rte_net.h>
#include <rte_vxlan.h>
#include <rte_udp.h>
#include <unistd.h>
#include "nm.h"
#include "util.h"
#include <random>
#define IP_DEFTTL 64 /* from RFC 1340. */
#define IP_VERSION 0x40
@ -19,59 +22,247 @@
#define IP_ADDR_FMT_SIZE 15
constexpr static uint32_t ETHER_FRAME_MAGIC = 0xDCDCE5E5;
const static struct rte_ether_addr POU_MAC {
0x01, 0x00, 0x5e, 0x00, 0x01, 0x81
};
const static uint32_t POU_IP = RTE_IPV4(224, 0, 1, 129);
const static uint16_t POU_PORT = 319;
/* Khat Protocol:
* khat only processes two kinds of packets - LOAD and PROBE
* rat:
* rat -> LOAD -> khat
* khat -> LOAD_RESP -> rat
* cat:
* cat -> PROBE -> khat (cat tx timestamps)
 * khat -> PROBE_RESP -> cat (cat rx timestamps and khat tx/rx timestamps)
 * khat -> STAT -> cat (khat sends its tx/rx timestamps)
*/
struct packet_hdr {
struct rte_ether_hdr eth_hdr;
struct rte_ipv4_hdr ipv4_hdr;
struct rte_udp_hdr udp_hdr;
/* Rat Protocol:
* cat & rat:
* 1. both launch with full parameters
* rat with slave flag
* cat with master flag
* 2. rats create threads and wait for cat's signal
* 3. cat creates threads
* 4. cat -> rats SYNC
* 5. rats -> cat SYNC_ACK and start running
* 6. cat start running after received all SYNC_ACKs
* 7. cat stops running, cat -> rats FIN
* 8. rats stops running, rats -> cat FIN_ACK with QPS
 * 9. cat exits after receiving all FIN_ACKs and flushing stats
*/
struct ptp_hdr {
uint8_t ptp_msg_type;
uint8_t ptp_ver;
uint8_t unused[34];
} __attribute__((packed));
struct packet_data
{
struct packet_hdr pkt_hdr;
uint32_t magic;
uint32_t epoch;
uint64_t clt_ts_tx;
uint64_t clt_ts_rx;
uint64_t srv_ts_tx;
uint64_t srv_ts_rx;
struct pkt_hdr {
struct rte_ether_hdr eth_hdr;
struct rte_ipv4_hdr ipv4_hdr;
struct rte_udp_hdr udp_hdr;
struct ptp_hdr ptp_hdr;
uint16_t type;
uint32_t magic;
char payload[0];
} __attribute__((packed));
struct net_spec {
uint32_t ip;
rte_ether_addr mac_addr;
};
static inline void
print_mac(struct rte_ether_addr * mac)
pkt_hdr_to_netspec(struct pkt_hdr *pkt, struct net_spec *src,
uint16_t *src_port, struct net_spec *dst, uint16_t *dst_port)
{
printf("%x:%x:%x:%x:%x:%x", mac->addr_bytes[0],
mac->addr_bytes[1],
mac->addr_bytes[2],
mac->addr_bytes[3],
mac->addr_bytes[4],
mac->addr_bytes[5]);
if (src != nullptr) {
rte_ether_addr_copy(&pkt->eth_hdr.s_addr, &src->mac_addr);
src->ip = rte_be_to_cpu_32(pkt->ipv4_hdr.src_addr);
}
if (src_port != nullptr) {
*src_port = rte_be_to_cpu_16(pkt->udp_hdr.src_port);
}
if (dst != nullptr) {
rte_ether_addr_copy(&pkt->eth_hdr.d_addr, &dst->mac_addr);
dst->ip = rte_be_to_cpu_32(pkt->ipv4_hdr.dst_addr);
}
if (dst_port != nullptr) {
*dst_port = rte_be_to_cpu_16(pkt->udp_hdr.dst_port);
}
};
struct conn_spec {
struct net_spec *src;
uint16_t src_port;
struct net_spec *dst;
uint16_t dst_port;
};
// Parse "a.b.c.d@xx:xx:xx:xx:xx:xx" into *out (IP + MAC).
// Mutates `str` in place via strtok_r.
// Returns 0 on success, -1 on malformed input.
static inline int
str_to_netspec(char *str, struct net_spec *out)
{
	const char *tok = "@";
	char *token;
	char *ptr;
	// was uint32_t with "%d": passing uint32_t* for %d is undefined
	// behavior and accepted negative octets -- use unsigned + "%u"
	// and bound each octet to a byte.
	unsigned int a, b, c, d;

	token = strtok_r(str, tok, &ptr);
	if (token == nullptr ||
	    sscanf(token, "%u.%u.%u.%u", &a, &b, &c, &d) != 4 ||
	    a > 255 || b > 255 || c > 255 || d > 255) {
		return -1;
	}
	out->ip = RTE_IPV4(a, b, c, d);

	// mac next
	token = strtok_r(nullptr, tok, &ptr);
	if (token == nullptr ||
	    rte_ether_unformat_addr(token, &out->mac_addr) != 0) {
		return -1;
	}
	return 0;
}
constexpr static uint16_t PKT_TYPE_LOAD = 0;
struct pkt_payload_load {
uint32_t epoch;
uint32_t load;
};
constexpr static uint16_t PKT_TYPE_PROBE = 1;
constexpr static uint16_t PKT_TYPE_LOAD_RESP = 2;
constexpr static uint16_t PKT_TYPE_PROBE_RESP = 3;
struct pkt_payload_epoch {
uint32_t epoch;
};
constexpr static uint16_t PKT_TYPE_STAT = 4;
struct pkt_payload_stat {
uint32_t epoch;
uint64_t hw_rx;
uint64_t hw_tx;
uint64_t sw_rx;
uint64_t sw_tx;
};
constexpr static uint16_t PKT_TYPE_SYNC = 5;
constexpr static uint16_t PKT_TYPE_SYNC_ACK = 6;
constexpr static uint16_t PKT_TYPE_FIN = 7;
constexpr static uint16_t PKT_TYPE_FIN_ACK = 8;
struct pkt_payload_qps {
uint32_t qps;
};
constexpr static uint16_t NUM_PKT_TYPES = PKT_TYPE_FIN_ACK + 1;
// for fast packet verification
static const uint32_t expected_payload_size[NUM_PKT_TYPES] {
sizeof(struct pkt_payload_load), // LOAD
sizeof(struct pkt_payload_epoch), // PROBE
sizeof(struct pkt_payload_epoch), // LOAD_RESP
sizeof(struct pkt_payload_epoch), // PROBE_RESP
sizeof(struct pkt_payload_stat), // STAT
0, // SYNC
0, // SYNC_ACK
0, // FIN
sizeof(struct pkt_payload_qps) // FIN_ACK
};
// Produces L4 ports in [min_port, 65535), used to randomize source
// ports so RSS spreads flows across queues.  Starts at a random offset
// (seeded from uptime) and then walks ports sequentially, wrapping.
class rdport_generator {
private:
	DISALLOW_EVIL_CONSTRUCTORS(rdport_generator);
	constexpr static uint32_t MAX_PORT = 65535;
	// lowest port this generator may return
	uint32_t min_port;
	// current unwrapped position; next() maps it into range
	uint32_t cur;
	// NOTE(review): rd is never used (gen is seeded from uptime
	// instead) -- candidate for removal.
	std::random_device rd;
	std::default_random_engine gen;
	std::uniform_int_distribution<uint32_t> dist;

public:
	rdport_generator(uint32_t mport)
	    : min_port(mport)
	    , cur(0)
	    , dist(0, MAX_PORT - min_port)
	{
		// seed with the monotonic clock so concurrent generators
		// start at different offsets
		gen.seed(nm_get_uptime_ns());
		cur = dist(gen);
	}

	uint16_t next()
	{
		// wrap into [min_port, MAX_PORT) then advance by one
		uint16_t ret = ((cur) % (MAX_PORT - min_port)) + min_port;
		cur++;
		return ret;
	}
};
#define NTR_PKT(dep, level, pkt, prefix_fmt, ...) \
ntr(dep, level, \
prefix_fmt \
"src: %d.%d.%d.%d:%d@%02x:%02x:%02x:%02x:%02x:%02x dst: %d.%d.%d.%d:%d@%02x:%02x:%02x:%02x:%02x:%02x type: %d\n", \
##__VA_ARGS__, \
(rte_be_to_cpu_32(pkt->ipv4_hdr.src_addr) >> 24) & 0xff, \
(rte_be_to_cpu_32(pkt->ipv4_hdr.src_addr) >> 16) & 0xff, \
(rte_be_to_cpu_32(pkt->ipv4_hdr.src_addr) >> 8) & 0xff, \
(rte_be_to_cpu_32(pkt->ipv4_hdr.src_addr) >> 0) & 0xff, \
rte_be_to_cpu_16(pkt->udp_hdr.src_port), \
pkt->eth_hdr.s_addr.addr_bytes[0], \
pkt->eth_hdr.s_addr.addr_bytes[1], \
pkt->eth_hdr.s_addr.addr_bytes[2], \
pkt->eth_hdr.s_addr.addr_bytes[3], \
pkt->eth_hdr.s_addr.addr_bytes[4], \
pkt->eth_hdr.s_addr.addr_bytes[5], \
(rte_be_to_cpu_32(pkt->ipv4_hdr.dst_addr) >> 24) & 0xff, \
(rte_be_to_cpu_32(pkt->ipv4_hdr.dst_addr) >> 16) & 0xff, \
(rte_be_to_cpu_32(pkt->ipv4_hdr.dst_addr) >> 8) & 0xff, \
(rte_be_to_cpu_32(pkt->ipv4_hdr.dst_addr) >> 0) & 0xff, \
rte_be_to_cpu_16(pkt->udp_hdr.dst_port), \
pkt->eth_hdr.d_addr.addr_bytes[0], \
pkt->eth_hdr.d_addr.addr_bytes[1], \
pkt->eth_hdr.d_addr.addr_bytes[2], \
pkt->eth_hdr.d_addr.addr_bytes[3], \
pkt->eth_hdr.d_addr.addr_bytes[4], \
pkt->eth_hdr.d_addr.addr_bytes[5], rte_be_to_cpu_16(pkt->type))
// Prints a MAC address to stdout as xx:xx:xx:xx:xx:xx (no trailing newline).
static inline void
print_mac(struct rte_ether_addr *mac)
{
	// zero-pad each octet with %02x for consistency with NTR_PKT
	// (the original used %x and printed e.g. "a:b:..." for low bytes)
	printf("%02x:%02x:%02x:%02x:%02x:%02x", mac->addr_bytes[0],
	    mac->addr_bytes[1], mac->addr_bytes[2], mac->addr_bytes[3],
	    mac->addr_bytes[4], mac->addr_bytes[5]);
}
// Prints an IPv4 address (host byte order) to stdout as a-b-c-d
// (no trailing newline).
static inline void
print_ipv4(uint32_t ip)
{
	// the original body contained this printf twice (diff artifact),
	// printing the address two times; emit it exactly once
	printf("%d-%d-%d-%d", (ip >> 24) & 0xff, (ip >> 16) & 0xff,
	    (ip >> 8) & 0xff, (ip >> 0) & 0xff);
}
static inline void
dump_pkt(struct rte_mbuf *pkt)
{
if(rte_pktmbuf_data_len(pkt) < sizeof(struct rte_ether_hdr)) {
if (rte_pktmbuf_data_len(pkt) < sizeof(struct rte_ether_hdr)) {
return;
}
struct rte_ether_hdr _eth_hdr;
struct rte_ether_hdr * eth_hdr = (struct rte_ether_hdr *)rte_pktmbuf_read(pkt, 0, sizeof(struct rte_ether_hdr), &_eth_hdr);
if (eth_hdr == NULL) {
auto eth_hdr = (struct rte_ether_hdr *)rte_pktmbuf_read(
pkt, 0, sizeof(struct rte_ether_hdr), &_eth_hdr);
if (eth_hdr == nullptr) {
return;
}
// ethernet frame
printf("Packet %p: Length 0x%x\n", (void*)pkt, rte_pktmbuf_data_len(pkt));
printf(
"Packet %p: Length 0x%x\n", (void *)pkt, rte_pktmbuf_data_len(pkt));
printf(" Ethernet header:\n");
printf(" Src:");
print_mac(&eth_hdr->s_addr);
@ -86,12 +277,13 @@ dump_pkt(struct rte_mbuf *pkt)
return;
}
if(rte_pktmbuf_data_len(pkt) < sizeof(struct rte_ether_hdr) + sizeof(struct rte_ipv4_hdr)) {
if (rte_pktmbuf_data_len(pkt) <
sizeof(struct rte_ether_hdr) + sizeof(struct rte_ipv4_hdr)) {
return;
}
// dump ip header
struct rte_ipv4_hdr * ipv4_hdr = (struct rte_ipv4_hdr *)(eth_hdr + 1);
auto ipv4_hdr = (struct rte_ipv4_hdr *)(eth_hdr + 1);
printf(" IPv4 header:\n");
printf(" Src:");
print_ipv4(rte_be_to_cpu_32(ipv4_hdr->src_addr));
@ -100,76 +292,167 @@ dump_pkt(struct rte_mbuf *pkt)
print_ipv4(rte_be_to_cpu_32(ipv4_hdr->dst_addr));
printf("\n");
printf(" Protocol: 0x%x\n", ipv4_hdr->next_proto_id);
}
static inline
struct packet_data * construct_udp_pkt_hdr(struct rte_mbuf * buf,
struct rte_ether_addr * src_mac, struct rte_ether_addr * dst_mac,
uint32_t src_ip, uint32_t dst_ip, uint16_t src_port, uint16_t dst_port)
static inline bool
is_l2ts_pkt(uint16_t type)
{
rte_pktmbuf_reset(buf);
struct packet_data * pkt_data = (struct packet_data *)rte_pktmbuf_append(buf, sizeof(struct packet_data));
struct rte_ether_hdr * eth_hdr;
struct rte_ipv4_hdr * ipv4_hdr;
struct rte_udp_hdr * udp_hdr;
if (pkt_data == NULL)
return NULL;
// single segment
buf->nb_segs = 1;
// construct l2 header
eth_hdr = &pkt_data->pkt_hdr.eth_hdr;
rte_ether_addr_copy(src_mac, &eth_hdr->s_addr);
rte_ether_addr_copy(dst_mac, &eth_hdr->d_addr);
eth_hdr->ether_type = rte_cpu_to_be_16(RTE_ETHER_TYPE_IPV4);
buf->l2_len = sizeof(struct rte_ether_hdr);
// construct l3 header
ipv4_hdr = &pkt_data->pkt_hdr.ipv4_hdr;
memset(ipv4_hdr, 0, sizeof(struct rte_ipv4_hdr));
ipv4_hdr->version_ihl = IP_VHL_DEF;
ipv4_hdr->type_of_service = 0;
ipv4_hdr->fragment_offset = 0;
ipv4_hdr->time_to_live = IP_DEFTTL;
ipv4_hdr->next_proto_id = IPPROTO_UDP;
ipv4_hdr->packet_id = 0;
ipv4_hdr->src_addr = rte_cpu_to_be_32(src_ip);
ipv4_hdr->dst_addr = rte_cpu_to_be_32(dst_ip);
ipv4_hdr->total_length = rte_cpu_to_be_16(sizeof(struct packet_data) - sizeof(struct rte_ether_hdr));
ipv4_hdr->hdr_checksum = 0;
buf->l3_len = sizeof(struct rte_ipv4_hdr);
// construct l4 header
udp_hdr = &pkt_data->pkt_hdr.udp_hdr;
udp_hdr->src_port = rte_cpu_to_be_16(src_port);
udp_hdr->dst_port = rte_cpu_to_be_16(dst_port);
udp_hdr->dgram_cksum = 0; /* No UDP checksum. */
udp_hdr->dgram_len = rte_cpu_to_be_16(sizeof(struct packet_data) -
sizeof(struct rte_ether_hdr) -
sizeof(struct rte_udp_hdr));
buf->l4_len = sizeof(struct rte_udp_hdr);
return pkt_data;
return type == PKT_TYPE_PROBE || type == PKT_TYPE_PROBE_RESP;
}
static inline
struct packet_data * check_valid_packet(struct rte_mbuf * pkt)
// fills the packet with the information except for the payload itself
static inline struct pkt_hdr *
construct_pkt_hdr(
struct rte_mbuf *buf, uint16_t type, const struct conn_spec *conn)
{
struct packet_data * pkt_data = NULL;
rte_pktmbuf_reset(buf);
if (rte_pktmbuf_data_len(pkt) < sizeof(struct packet_data)) {
return NULL;
}
const uint32_t total_sz = sizeof(struct pkt_hdr) +
expected_payload_size[type];
auto pkt_data = (struct pkt_hdr *)rte_pktmbuf_append(buf, total_sz);
struct rte_ether_hdr *eth_hdr;
struct rte_ipv4_hdr *ipv4_hdr;
struct rte_udp_hdr *udp_hdr;
bool is_ts_pkt = is_l2ts_pkt(type);
pkt_data = rte_pktmbuf_mtod(pkt, struct packet_data *);
if (pkt_data == nullptr)
return nullptr;
if (rte_be_to_cpu_32(pkt_data->magic) == ETHER_FRAME_MAGIC) {
return pkt_data;
}
// single segment
buf->nb_segs = 1;
return NULL;
// construct l2 header
eth_hdr = &pkt_data->eth_hdr;
rte_ether_addr_copy(&conn->src->mac_addr, &eth_hdr->s_addr);
if (is_ts_pkt) {
rte_ether_addr_copy(&POU_MAC, &eth_hdr->d_addr);
} else {
rte_ether_addr_copy(&conn->dst->mac_addr, &eth_hdr->d_addr);
}
eth_hdr->ether_type = rte_cpu_to_be_16(RTE_ETHER_TYPE_IPV4);
buf->l2_len = sizeof(struct rte_ether_hdr);
// construct l3 header
ipv4_hdr = &pkt_data->ipv4_hdr;
memset(ipv4_hdr, 0, sizeof(struct rte_ipv4_hdr));
ipv4_hdr->version_ihl = IP_VHL_DEF;
ipv4_hdr->type_of_service = 0;
ipv4_hdr->fragment_offset = 0;
ipv4_hdr->time_to_live = IP_DEFTTL;
ipv4_hdr->next_proto_id = IPPROTO_UDP;
ipv4_hdr->packet_id = 0;
ipv4_hdr->src_addr = rte_cpu_to_be_32(conn->src->ip);
if (is_ts_pkt) {
ipv4_hdr->dst_addr = rte_cpu_to_be_32(POU_IP);
} else {
ipv4_hdr->dst_addr = rte_cpu_to_be_32(conn->dst->ip);
}
ipv4_hdr->total_length = rte_cpu_to_be_16(
sizeof(struct pkt_hdr) - sizeof(struct rte_ether_hdr));
ipv4_hdr->hdr_checksum = 0;
buf->l3_len = sizeof(struct rte_ipv4_hdr);
// construct l4 header
udp_hdr = &pkt_data->udp_hdr;
udp_hdr->src_port = rte_cpu_to_be_16(conn->src_port);
if (is_ts_pkt) {
udp_hdr->dst_port = rte_cpu_to_be_16(POU_PORT);
} else {
udp_hdr->dst_port = rte_cpu_to_be_16(conn->dst_port);
}
udp_hdr->dgram_cksum = 0; /* No UDP checksum. */
udp_hdr->dgram_len = rte_cpu_to_be_16(sizeof(struct pkt_hdr) +
expected_payload_size[type] - sizeof(struct rte_ether_hdr) -
sizeof(struct rte_udp_hdr));
buf->l4_len = sizeof(struct rte_udp_hdr);
if (is_ts_pkt) {
// set misc flags
buf->ol_flags |= PKT_TX_IEEE1588_TMST;
pkt_data->ptp_hdr.ptp_ver = 0x2; // VER 2
pkt_data->ptp_hdr.ptp_msg_type = 0x0; // SYNC
} else {
pkt_data->ptp_hdr.ptp_ver = 0xff; // invalid ver
}
pkt_data->type = rte_cpu_to_be_16(type);
pkt_data->magic = rte_cpu_to_be_32(ETHER_FRAME_MAGIC);
return pkt_data;
}
// Allocates an mbuf from `pool` and constructs the headers for a packet of
// the given `type` towards `conn`. On success returns 0 and stores the mbuf
// in *mbuf_out and its header pointer in *hdr_out; on failure returns -1
// (the mbuf, if allocated, is freed — no leak).
static inline int
alloc_pkt_hdr(struct rte_mempool *pool, uint16_t type,
    const struct conn_spec *conn, struct rte_mbuf **mbuf_out,
    struct pkt_hdr **hdr_out)
{
	struct rte_mbuf *pkt = rte_pktmbuf_alloc(pool);
	if (pkt == nullptr) {
		return -1;
	}

	struct pkt_hdr *hdr = construct_pkt_hdr(pkt, type, conn);
	if (hdr == nullptr) {
		// header construction failed: release the mbuf
		rte_pktmbuf_free(pkt);
		return -1;
	}

	*mbuf_out = pkt;
	*hdr_out = hdr;
	return 0;
}
// Validates a received mbuf as one of our protocol packets: checks the
// minimum length, the frame magic, the packet type and its expected payload
// size. When host_mac is non-null the destination MAC is also checked
// (POU_MAC for hw-timestamped probe packets, host_mac otherwise).
// Returns the packet's pkt_hdr on success, nullptr otherwise.
static inline struct pkt_hdr *
check_valid_packet(struct rte_mbuf *pkt, const struct rte_ether_addr *host_mac)
{
	struct pkt_hdr *pkt_data = nullptr;
	const struct rte_ether_addr *expected_mac = nullptr;
	uint16_t type;
	const uint32_t data_len = rte_pktmbuf_data_len(pkt);

	if (data_len < sizeof(struct pkt_hdr)) {
		return nullptr;
	}

	pkt_data = rte_pktmbuf_mtod(pkt, struct pkt_hdr *);

	// check MAGIC
	if (rte_be_to_cpu_32(pkt_data->magic) != ETHER_FRAME_MAGIC) {
		return nullptr;
	}

	type = rte_be_to_cpu_16(pkt_data->type);
	// check type and payload size
	// (reuse the already-decoded `type`; the original redundantly
	// converted pkt_data->type from network byte order a second time)
	if ((type >= NUM_PKT_TYPES) ||
	    (data_len <
		(sizeof(struct pkt_hdr) + expected_payload_size[type]))) {
		return nullptr;
	}

	// strict dest mac filter
	if (host_mac != nullptr) {
		if (is_l2ts_pkt(type)) {
			// dst mac must be the POU_MAC address
			expected_mac = &POU_MAC;
		} else {
			// dst mac must match the host mac
			expected_mac = host_mac;
		}
		if (!rte_is_same_ether_addr(
			expected_mac, &pkt_data->eth_hdr.d_addr))
			return nullptr;
	}

	return pkt_data;
}

102
inc/util.h Normal file
View File

@ -0,0 +1,102 @@
#pragma once
#include <rte_ethdev.h>
#include <rte_ip.h>
// Deletes the copy constructor and copy assignment of TypeName
// (Google-style macro; place in the private section of the class).
#define DISALLOW_EVIL_CONSTRUCTORS(TypeName) \
	TypeName(const TypeName &) = delete; \
	void operator=(const TypeName &) = delete

// unit-conversion factors
constexpr static unsigned long S2NS = 1000000000UL; // seconds -> nanoseconds
constexpr static unsigned long S2US = 1000000UL; // seconds -> microseconds
constexpr static unsigned long MS2NS = 1000000UL; // milliseconds -> nanoseconds

// lowest port handed out when randomizing L4 ports
constexpr static uint16_t MIN_RANDOM_PORT = 1000;
// default UDP port for the rat protocol
constexpr static uint16_t DEFAULT_RAT_PORT = 1234;
// startup delay; NOTE(review): units not visible here — presumably seconds,
// confirm at the call site
constexpr static unsigned int INIT_DELAY = 2;
// sentinel returned by cmask_get_next_cpu() when the mask is exhausted
constexpr static int NEXT_CPU_NULL = -1;
// Pops the lowest set bit off *mask and returns its 0-based index.
// Returns -1 (NEXT_CPU_NULL) when the mask is empty.
static inline int
cmask_get_next_cpu(uint64_t *mask)
{
	int ffs = ffsll(*mask);
	if (ffs == 0) {
		// empty mask: no cpu left (== NEXT_CPU_NULL). The original
		// fell through and computed `1 << (0 - 1)` — a UB shift.
		return -1;
	}
	// use a 64-bit unsigned constant: the original `1 << (ffs - 1)`
	// shifted a 32-bit int and failed to clear bits for cpu ids >= 31
	*mask &= ~(1ULL << (ffs - 1));
	return ffs - 1;
}
// Returns the number of set bits in `mask`, i.e. how many cpus it selects.
static inline int
cmask_get_num_cpus(const uint64_t mask)
{
	// gcc/clang builtin popcount; equivalent result to _mm_popcnt_u64
	// without requiring the SSE4.2 intrinsics header
	return (int)__builtin_popcountll(mask);
}
// constexpr static int LATENCY_MEASURE_TIMES = 10000;
// static inline void
// sync_port_clock(uint16_t portid)
//{
// int64_t lat = 0;
// int64_t get_time_lat;
// int64_t write_time_lat;
// struct timespec dum;
// struct timespec start;
// struct timespec end;
//
// // measure clock_gettime latency
// for(int i = 0; i < LATENCY_MEASURE_TIMES; i++) {
// // end - start ~= 2x clock_gettime's latency
// clock_gettime(CLOCK_REALTIME, &start);
// clock_gettime(CLOCK_REALTIME, &dum);
// clock_gettime(CLOCK_REALTIME, &end);
//
// if (end.tv_sec != start.tv_sec) {
// rte_exit(EXIT_FAILURE, "clock_gettime too slow\n");
// }
//
// // shouldn't overflow
// lat += (end.tv_nsec - start.tv_nsec) / 2;
// }
// get_time_lat = lat / LATENCY_MEASURE_TIMES;
//
// // measure rte_eth_timesync_write_time latency
// lat = 0;
// for(int i = 0; i < LATENCY_MEASURE_TIMES; i++) {
// // end - start ~= rte_eth_timesync latency + clock_gettime's latency
// clock_gettime(CLOCK_REALTIME, &dum);
// clock_gettime(CLOCK_REALTIME, &start);
// if (rte_eth_timesync_write_time(portid, &dum) != 0) {
// rte_exit(EXIT_FAILURE, "failed to write time\n");
// }
// clock_gettime(CLOCK_REALTIME, &end);
//
// if (end.tv_sec != start.tv_sec) {
// rte_exit(EXIT_FAILURE, "clock_gettime too slow!\n");
// }
//
// // shouldn't overflow
// int64_t elat = (end.tv_nsec - start.tv_nsec) - get_time_lat;
// if (elat < 0) {
// rte_exit(EXIT_FAILURE, "something is wrong with lat \n");
// }
// lat += elat;
// }
// write_time_lat = lat / LATENCY_MEASURE_TIMES;
//
// int64_t delta = (get_time_lat + write_time_lat) / 2;
// int64_t s2ns = (int64_t)S2NS;
// // sync the clock
// while (true) {
// clock_gettime(CLOCK_REALTIME, &dum);
// dum.tv_nsec += delta;
// if (dum.tv_nsec > s2ns) {
// // try again if overflow
// continue;
// }
// if (rte_eth_timesync_write_time(portid, &dum) != 0) {
// rte_exit(EXIT_FAILURE, "failed to write time\n");
// }
// break;
// }
// rte_eth_timesync_enable(portid);
//
// printf("Sync-ed time: get lat %ld write lat %ld\n", get_time_lat,
// write_time_lat);
//}

View File

@ -1,222 +1,435 @@
#include <cstdio>
#include <cstdlib>
#include <rte_common.h>
#include <rte_config.h>
#include <rte_cycles.h>
#include <rte_eal.h>
#include <rte_ethdev.h>
#include <rte_cycles.h>
#include <rte_lcore.h>
#include <rte_mbuf.h>
#include <rte_byteorder.h>
#include <rte_config.h>
#include <rte_ether.h>
#include <rte_launch.h>
#include <atomic>
#include <rte_lcore.h>
#include <rte_mbuf.h>
#include <stdnoreturn.h>
#include <unistd.h>
#include "nm.h"
#include "ntr.h"
#include "pkt.h"
#include "ntrlog.h"
#include "rte_arp.h"
#include "rte_mbuf_core.h"
#include "util.h"
NTR_DECL_IMPL;
#include <atomic>
#include <ctime>
#include <vector>
constexpr unsigned int MBUF_MAX_COUNT = 8191;
constexpr unsigned int MBUF_CACHE_SIZE = 250;
constexpr unsigned int RX_RING_SIZE = 1024;
constexpr unsigned int TX_RING_SIZE = 1024;
constexpr unsigned int RX_RING_NUM = 1;
constexpr unsigned int TX_RING_NUM = 1;
constexpr unsigned int BURST_SIZE = 32;
constexpr static unsigned int MBUF_MAX_COUNT = 65536;
constexpr static unsigned int MBUF_CACHE_SIZE = 512;
constexpr static unsigned int RX_RING_SIZE = 4096;
constexpr static unsigned int TX_RING_SIZE = 4096;
constexpr static unsigned int BURST_SIZE = 8;
static const struct rte_eth_conf port_conf_default{};
// Per-mbuf dynamic field used to mark the single in-flight probe packet:
// 1 = this mbuf is the probe currently being processed, 0 otherwise.
static const struct rte_mbuf_dynfield rte_mbuf_dynfield_probe_flag = {
	.name = "rte_mbuf_dynfield_probe_flag",
	.size = sizeof(uint32_t),
	.align = __alignof__(uint32_t),
	.flags = 0
};
// Byte offset of the dynamic field inside struct rte_mbuf.
// NOTE(review): initialized to 0 here; presumably set via
// rte_mbuf_dynfield_register() during startup — confirm.
static int PROBE_FLAG_OFFSET { 0 };
static const struct rte_eth_conf port_conf_default {
};
// keep track of the probe state
// when a probe packet first arrives this state is set to be influx and the
// rte_mbuf's userdata is set to PROBE_MAGIC which prevents other probe packets
// to be processed when the server sends the probe stats back to user influx is
// released this is to guarantee that the server only processes one probe packet
// at the time
// XXX: also this can be attached to the mbuf itself and processed by the lcore
// thread
// I kept this global because globally there could be only one pending
// probe request and rx_add_timestamp can save their shit here too
// Per-worker-thread bookkeeping: each thread owns one rx/tx queue pair
// (assigned in port_init()) and is pinned to one lcore.
struct thread_info {
	int tid;      // worker thread index
	int rxqid;    // rx queue id owned by this thread
	int txqid;    // tx queue id owned by this thread
	int lcore_id; // lcore this thread runs on
};
// state machine:
constexpr static int SERVER_STATE_WAIT = 0;  // idle, can accept a new probe
constexpr static int SERVER_STATE_PROBE = 1; // a probe is being processed

// Bookkeeping for the single outstanding probe request; only one probe is
// processed at a time (guarded by options.s_state).
struct probe_state_t {
	struct net_spec dst; // network identity of the prober
	struct conn_spec cspec {
		.dst = &dst
	};
	uint32_t epoch;      // epoch echoed from the probe packet
	uint64_t last_sw_rx; // software rx timestamp (ns)
	uint64_t last_sw_tx; // software tx timestamp (ns)
	uint64_t last_hw_rx; // hardware rx timestamp (ns)
};
struct options_t {
//states
uint16_t s_portid;
struct rte_ether_addr s_host_mac;
struct rte_mempool * s_pkt_mempool;
// config
int num_threads { 1 };
uint64_t cpuset { 0x4 }; // 2nd core
uint64_t memmask { 0x0 }; // same socket as the NIC
// states
uint16_t s_portid { 0 };
struct net_spec s_host_spec {
};
struct rte_mempool *s_pkt_mempool { nullptr };
std::atomic<int> s_state { SERVER_STATE_WAIT };
struct probe_state_t s_probe_info;
std::vector<struct thread_info *> s_thr_info;
};
struct options_t options;
static uint16_t
rx_add_timestamp(uint16_t port __rte_unused, uint16_t qidx __rte_unused,
struct rte_mbuf **pkts, uint16_t nb_pkts, uint16_t max_pkts __rte_unused, void *_ __rte_unused)
struct rte_mbuf **pkts, uint16_t nb_pkts, uint16_t max_pkts __rte_unused,
void *_ __rte_unused)
{
uint64_t now = rte_rdtsc();
struct packet_data * pkt_data;
for (int i = 0; i < nb_pkts; i++) {
pkt_data = check_valid_packet(pkts[i]);
uint64_t now = nm_get_uptime_ns();
struct timespec ts {
};
struct pkt_hdr *pkt_data;
for (int i = 0; i < nb_pkts; i++) {
pkt_data = check_valid_packet(
pkts[i], &options.s_host_spec.mac_addr);
if (pkt_data == NULL) {
ntr(NTR_DEP_USER1, NTR_LEVEL_DEBUG, "rx_add_timestamp: ignoring invalid packet %p.\n", (void*)pkts[i]);
continue;
}
if (pkt_data == nullptr) {
ntr(NTR_DEP_USER1, NTR_LEVEL_DEBUG,
"rx_add_timestamp: ignoring invalid packet %p.\n",
(void *)pkts[i]);
continue;
}
ntr(NTR_DEP_USER1, NTR_LEVEL_INFO, "rx_add_timestamp: tagged packet %p with %llu.\n", (void*)pkts[i], now);
pkt_data->srv_ts_rx = rte_cpu_to_be_64(now);
}
if (rte_be_to_cpu_16(pkt_data->type) == PKT_TYPE_PROBE) {
int state_wait = SERVER_STATE_WAIT;
*RTE_MBUF_DYNFIELD(
pkts[i], PROBE_FLAG_OFFSET, uint32_t *) = 0;
if (rte_eth_timesync_read_rx_timestamp(
port, &ts, pkts[i]->timesync & 0x3) == 0) {
if (options.s_state.compare_exchange_strong(
state_wait, SERVER_STATE_PROBE)) {
// mark the mbuf as probe packet being
// processed only the locore that
// receives the pkt w/ userdata !=
// nullptr processes that packet
*RTE_MBUF_DYNFIELD(pkts[i],
PROBE_FLAG_OFFSET, uint32_t *) = 1;
// tag with timestamps
options.s_probe_info.last_hw_rx =
ts.tv_nsec + ts.tv_sec * S2NS;
options.s_probe_info.last_sw_rx = now;
ntr(NTR_DEP_USER1, NTR_LEVEL_DEBUG,
"rx_add_timestamp: tagged packet %p epoch %d with sw: %lu hw:%lu.\n",
(void *)pkts[i],
options.s_probe_info.epoch, now,
options.s_probe_info.last_hw_rx);
} else
ntr(NTR_DEP_USER1, NTR_LEVEL_WARNING,
"rx_add_timestamp: packet %p not tagged - server is processing a probe.\n",
(void *)pkts[i]);
} else
ntr(NTR_DEP_USER1, NTR_LEVEL_WARNING,
"rx_add_timestamp: packet %p not tagged - hw rx timestamp not available.\n",
(void *)pkts[i]);
} else
ntr(NTR_DEP_USER1, NTR_LEVEL_DEBUG,
"rx_add_timestamp: packet %p not tagged - type %d.\n",
(void *)pkts[i], rte_be_to_cpu_16(pkt_data->type));
}
return nb_pkts;
return nb_pkts;
}
static uint16_t
tx_calc_latency(uint16_t port __rte_unused, uint16_t qidx __rte_unused,
struct rte_mbuf **pkts, uint16_t nb_pkts, void *_ __rte_unused)
tx_add_timestamp(uint16_t port __rte_unused, uint16_t qidx __rte_unused,
struct rte_mbuf **pkts, uint16_t nb_pkts, void *_ __rte_unused)
{
uint64_t now = rte_rdtsc();
struct packet_data * pkt_data;
uint64_t now = nm_get_uptime_ns();
struct pkt_hdr *pkt_data;
for (int i = 0; i < nb_pkts; i++) {
for (int i = 0; i < nb_pkts; i++) {
pkt_data = check_valid_packet(pkts[i]);
pkt_data = check_valid_packet(
pkts[i], &options.s_host_spec.mac_addr);
if (pkt_data == NULL) {
ntr(NTR_DEP_USER1, NTR_LEVEL_DEBUG, "tx_calc_latency: ignoring invalid packet %p.\n", (void*)pkts[i]);
continue;
}
if (pkt_data == nullptr) {
ntr(NTR_DEP_USER1, NTR_LEVEL_DEBUG,
"tx_add_timestamp: ignoring invalid packet %p.\n",
(void *)pkts[i]);
continue;
}
ntr(NTR_DEP_USER1, NTR_LEVEL_INFO, "tx_calc_latency: tagged packet %p with %llu.\n", (void*)pkts[i], now);
pkt_data->srv_ts_tx = rte_cpu_to_be_64(now);
}
if (rte_be_to_cpu_16(pkt_data->type) == PKT_TYPE_PROBE_RESP) {
// this packet is the response to PROBE packets
return nb_pkts;
// at this time the packet is not sent to the NIC yet so
// the state must be waiting stats
// XXX: this should be an assert
if (options.s_state.load() != SERVER_STATE_PROBE ||
*RTE_MBUF_DYNFIELD(
pkts[i], PROBE_FLAG_OFFSET, uint32_t *) != 1) {
rte_exit(EXIT_FAILURE,
"packet %p sent to NIC before sw callback\n",
(void *)pkts[i]);
}
options.s_probe_info.last_sw_tx = now;
ntr(NTR_DEP_USER1, NTR_LEVEL_DEBUG,
"tx_add_timestamp: tagged packet %p with sw tx %lu\n",
(void *)pkts[i], options.s_probe_info.last_sw_tx);
} else {
ntr(NTR_DEP_USER1, NTR_LEVEL_DEBUG,
"tx_add_timestamp: packet %p not tagged - type %d\n",
(void *)pkts[i], pkt_data->type);
}
}
return nb_pkts;
}
// Worker loop (one per thread, launched on its own lcore): polls this
// thread's rx queue, replies to PROBE packets (when it holds the probe
// state) and to LOAD packets (always), then — once a probe response has
// been sent — polls for its hardware tx timestamp and ships a
// PKT_TYPE_STAT report back to the prober. Never returns.
noreturn static int
locore_main(void *ti)
{
	auto tinfo = (struct thread_info *)ti;
	struct rte_mbuf *bufs[BURST_SIZE];
	// NOTE(review): the original comment claimed "+ 1" room for an extra
	// PKT_TYPE_STAT packet, but the STAT packet is sent through its own
	// rte_eth_tx_burst() below and is never queued here
	struct rte_mbuf *tx_bufs[BURST_SIZE];
	struct pkt_hdr *pkt_data;
	bool pending_probe = false;

	// warn when the NIC sits on a different NUMA node than this lcore
	if (rte_eth_dev_socket_id(options.s_portid) > 0 &&
	    rte_eth_dev_socket_id(options.s_portid) != (int)rte_socket_id()) {
		ntr(NTR_DEP_USER1, NTR_LEVEL_WARNING,
		    "locore_main <thread %d>: WARNING, port %d is on remote NUMA node to "
		    "polling thread.\n\tPerformance will "
		    "not be optimal.\n",
		    tinfo->tid, options.s_portid);
	}

	ntr(NTR_DEP_USER1, NTR_LEVEL_INFO,
	    "locore_main <thread %d>: running on locore %d with txidx %d and rxidx %d.\n",
	    tinfo->tid, rte_lcore_id(), tinfo->txqid, tinfo->rxqid);

	while (true) {
		uint16_t nb_tx = 0;
		const uint16_t nb_rx = rte_eth_rx_burst(
		    options.s_portid, tinfo->rxqid, bufs, BURST_SIZE);
		struct rte_mbuf *pkt_buf;
		struct pkt_hdr *tx_data;

		for (int i = 0; i < nb_rx; i++) {
			// XXX: optimization: in rx_add_timestamp every packet
			// is already validated once can just mark valid packet
			// with a value so we can avoid this redundant check
			pkt_data = check_valid_packet(
			    bufs[i], &options.s_host_spec.mac_addr);

			if (pkt_data == nullptr) {
				ntr(NTR_DEP_USER1, NTR_LEVEL_DEBUG,
				    "locore_main <thread %d>: skipping invalid packet %p.\n",
				    tinfo->tid, (void *)bufs[i]);
				// dump_pkt(bufs[i]);
				rte_pktmbuf_free(bufs[i]);
				continue;
			}

			NTR_PKT(NTR_DEP_USER1, NTR_LEVEL_DEBUG, pkt_data,
			    "locore_main <thread %d>: ", tinfo->tid);
			switch (rte_be_to_cpu_16(pkt_data->type)) {
			case PKT_TYPE_PROBE: {
				// only serve the probe if rx_add_timestamp
				// claimed it for us (dynfield flag == 1) and
				// the server is in the PROBE state
				if (options.s_state.load() ==
					SERVER_STATE_PROBE &&
				    *RTE_MBUF_DYNFIELD(bufs[i],
					PROBE_FLAG_OFFSET, uint32_t *) == 1) {
					// send back probe_resp pkt to probe for
					// return latency
					pending_probe = true;

					// book keep probe results
					options.s_probe_info.epoch =
					    rte_be_to_cpu_32(
						((struct pkt_payload_epoch *)
							pkt_data->payload)
						    ->epoch);

					pkt_hdr_to_netspec(pkt_data,
					    &options.s_probe_info.dst,
					    &options.s_probe_info.cspec
						 .dst_port,
					    nullptr,
					    &options.s_probe_info.cspec
						 .src_port);
					options.s_probe_info.cspec.src =
					    &options.s_host_spec;

					if (alloc_pkt_hdr(
						options.s_pkt_mempool,
						PKT_TYPE_PROBE_RESP,
						&options.s_probe_info.cspec,
						&pkt_buf, &tx_data) != 0) {
						rte_exit(EXIT_FAILURE,
						    "failed to allocate pkt\n");
					}
					rte_memcpy(tx_data->payload,
					    pkt_data->payload,
					    sizeof(struct pkt_payload_epoch));
					// mark the response so tx_add_timestamp
					// records its sw tx time
					*RTE_MBUF_DYNFIELD(pkt_buf,
					    PROBE_FLAG_OFFSET, uint32_t *) = 1;

					// queue for burst send
					tx_bufs[nb_tx++] = pkt_buf;
				}
				break;
			}
			case PKT_TYPE_LOAD: {
				struct conn_spec cspec;
				struct net_spec src;
				struct net_spec dst;

				// swap src/dst of the incoming packet to form
				// the reply's connection spec
				pkt_hdr_to_netspec(pkt_data, &src,
				    &cspec.dst_port, &dst, &cspec.src_port);
				cspec.dst = &src;
				cspec.src = &dst;

				// we reply to load packet regardless of the
				// server state
				if (alloc_pkt_hdr(options.s_pkt_mempool,
					PKT_TYPE_LOAD_RESP, &cspec, &pkt_buf,
					&tx_data) != 0) {
					rte_exit(EXIT_FAILURE,
					    "failed to allocate pkt\n");
				}
				rte_memcpy(tx_data->payload, pkt_data->payload,
				    sizeof(struct pkt_payload_load));

				// queue for burst send
				tx_bufs[nb_tx++] = pkt_buf;
				break;
			}
			default:
				ntr(NTR_DEP_USER1, NTR_LEVEL_DEBUG,
				    "locore_main <thread %d>: ignoring packet %p with unknown type %d.\n",
				    tinfo->tid, (void *)bufs[i],
				    rte_be_to_cpu_16(pkt_data->type));
				break;
			}

			rte_pktmbuf_free(bufs[i]);
		}

		// send the packets
		if (nb_tx > 0) {
			const uint16_t nb_tx_succ = rte_eth_tx_burst(
			    options.s_portid, tinfo->txqid, tx_bufs, nb_tx);
			if (nb_tx_succ < nb_tx) {
				rte_exit(EXIT_FAILURE,
				    "failed to send some packets.\n");
			}
		}

		// we wanna check every loop not only when there are packets
		if (pending_probe) {
			struct timespec ts {
			};
			struct pkt_payload_stat *stat;
			if (rte_eth_timesync_read_tx_timestamp(
				options.s_portid, &ts) == 0) {
				ntr(NTR_DEP_USER1, NTR_LEVEL_DEBUG,
				    "locore_main <thread %d>: obtained hw tx timestamp %lu.\n",
				    tinfo->tid,
				    (ts.tv_sec * S2NS + ts.tv_nsec));
				// now we have everything we need

				if (alloc_pkt_hdr(options.s_pkt_mempool,
					PKT_TYPE_STAT,
					&options.s_probe_info.cspec, &pkt_buf,
					&tx_data) != 0) {
					rte_exit(EXIT_FAILURE,
					    "failed to alloc pkt_buf\n");
				}

				// populate stats
				stat = (struct pkt_payload_stat *)
					   tx_data->payload;
				stat->epoch = rte_cpu_to_be_32(
				    options.s_probe_info.epoch);
				stat->hw_rx = rte_cpu_to_be_64(
				    options.s_probe_info.last_hw_rx);
				stat->hw_tx = rte_cpu_to_be_64(
				    ts.tv_nsec + ts.tv_sec * S2NS);
				stat->sw_rx = rte_cpu_to_be_64(
				    options.s_probe_info.last_sw_rx);
				stat->sw_tx = rte_cpu_to_be_64(
				    options.s_probe_info.last_sw_tx);

				// send the packet
				if (rte_eth_tx_burst(options.s_portid,
					tinfo->txqid, &pkt_buf, 1) < 1) {
					rte_exit(EXIT_FAILURE,
					    "failed to send some packets.\n");
				}

				// release flux
				pending_probe = false;

				// hand the probe state back to WAIT; nobody
				// else may have changed it in the meantime
				int expected = SERVER_STATE_PROBE;
				if (!options.s_state.compare_exchange_strong(
					expected, SERVER_STATE_WAIT)) {
					rte_exit(EXIT_FAILURE,
					    "s_state changed unexpectedly!");
				}
			}
		}
	}
}
static int
locore_main(void * _unused __rte_unused)
{
struct rte_mbuf *bufs[BURST_SIZE];
struct rte_mbuf *tx_bufs[BURST_SIZE];
struct packet_data *pkt_data;
uint32_t core_id = rte_lcore_id();
if (rte_eth_dev_socket_id(options.s_portid) > 0 && rte_eth_dev_socket_id(options.s_portid) != (int)rte_socket_id()) {
ntr(NTR_DEP_USER1, NTR_LEVEL_WARNING, "locore_main: WARNING, port %d is on remote NUMA node to "
"polling thread.\n\tPerformance will "
"not be optimal.\n", options.s_portid);
}
ntr(NTR_DEP_USER1, NTR_LEVEL_INFO, "locore_main: core %d running.\n", core_id);
while(true) {
uint16_t nb_tx = 0;
const uint16_t nb_rx = rte_eth_rx_burst(options.s_portid, 0, bufs, BURST_SIZE);
if (nb_rx == 0) {
continue;
}
for(int i = 0; i < nb_rx; i++) {
pkt_data = check_valid_packet(bufs[i]);
if (pkt_data == NULL) {
ntr(NTR_DEP_USER1, NTR_LEVEL_DEBUG, "locore_main: core %d skipping invalid packet %p.\n", core_id, (void*)bufs[i]);
dump_pkt(bufs[i]);
rte_pktmbuf_free(bufs[i]);
continue;
}
uint32_t dst_ip = rte_be_to_cpu_32(pkt_data->pkt_hdr.ipv4_hdr.dst_addr);
uint32_t src_ip = rte_be_to_cpu_32(pkt_data->pkt_hdr.ipv4_hdr.src_addr);
uint16_t src_port = rte_be_to_cpu_16(pkt_data->pkt_hdr.udp_hdr.src_port);
uint16_t dst_port = rte_be_to_cpu_16(pkt_data->pkt_hdr.udp_hdr.dst_port);
ntr(NTR_DEP_USER1, NTR_LEVEL_INFO, "locore_main: core %d packet %p from %d.%d.%d.%d(%x:%x:%x:%x:%x:%x) to %d.%d.%d.%d(%x:%x:%x:%x:%x:%x), sport %d, dport %d, epoch %d\n",
core_id,
(void*)bufs[i],
(src_ip >> 24) & 0xff,
(src_ip >> 16) & 0xff,
(src_ip >> 8) & 0xff,
(src_ip >> 0) & 0xff,
pkt_data->pkt_hdr.eth_hdr.s_addr.addr_bytes[0],
pkt_data->pkt_hdr.eth_hdr.s_addr.addr_bytes[1],
pkt_data->pkt_hdr.eth_hdr.s_addr.addr_bytes[2],
pkt_data->pkt_hdr.eth_hdr.s_addr.addr_bytes[3],
pkt_data->pkt_hdr.eth_hdr.s_addr.addr_bytes[4],
pkt_data->pkt_hdr.eth_hdr.s_addr.addr_bytes[5],
(dst_ip >> 24) & 0xff,
(dst_ip >> 16) & 0xff,
(dst_ip >> 8) & 0xff,
(dst_ip >> 0) & 0xff,
pkt_data->pkt_hdr.eth_hdr.d_addr.addr_bytes[0],
pkt_data->pkt_hdr.eth_hdr.d_addr.addr_bytes[1],
pkt_data->pkt_hdr.eth_hdr.d_addr.addr_bytes[2],
pkt_data->pkt_hdr.eth_hdr.d_addr.addr_bytes[3],
pkt_data->pkt_hdr.eth_hdr.d_addr.addr_bytes[4],
pkt_data->pkt_hdr.eth_hdr.d_addr.addr_bytes[5],
src_port,
dst_port,
rte_be_to_cpu_32(pkt_data->epoch));
// swap s_addr and d_addr
struct rte_mbuf * pkt_buf = rte_pktmbuf_alloc(options.s_pkt_mempool);
if (pkt_buf == NULL) {
rte_exit(EXIT_FAILURE, "locore_main: failed to allocate memory for pkt_buf");
}
struct packet_data * tx_data = construct_udp_pkt_hdr(pkt_buf,
&options.s_host_mac,
&pkt_data->pkt_hdr.eth_hdr.s_addr,
dst_ip,
src_ip,
dst_port,
src_port);
if (tx_data == NULL) {
rte_exit(EXIT_FAILURE, "failed to construct tx packet %p", (void*)pkt_buf);
}
// copy, endianess doesn't matter
tx_data->epoch = pkt_data->epoch;
tx_data->magic = pkt_data->magic;
tx_data->clt_ts_rx = pkt_data->clt_ts_rx;
tx_data->clt_ts_tx = pkt_data->clt_ts_tx;
tx_data->srv_ts_rx = pkt_data->srv_ts_rx;
tx_data->srv_ts_tx = pkt_data->srv_ts_tx;
// queue for burst send
tx_bufs[nb_tx++] = pkt_buf;
// free rx packet
rte_pktmbuf_free(bufs[i]);
}
const uint16_t nb_tx_succ = rte_eth_tx_burst(options.s_portid, 0, tx_bufs, nb_tx);
// cleanup unsent packets
// don't need to free others because it's offloaded
if (nb_tx_succ < nb_tx) {
rte_exit(EXIT_FAILURE, "locore_main: failed to send some packets.\n");
}
}
return 0;
}
static int
port_init(uint16_t portid, struct rte_mempool *mbuf_pool)
{
struct rte_eth_dev_info dev_info;
struct rte_eth_conf port_conf = port_conf_default;
struct rte_eth_txconf txconf;
struct rte_eth_rxconf rxconf;
struct rte_eth_dev_info dev_info {
};
struct rte_eth_conf port_conf = port_conf_default;
struct rte_eth_txconf txconf {
};
struct rte_eth_rxconf rxconf {
};
uint16_t nb_rxd = RX_RING_SIZE;
uint16_t nb_txd = TX_RING_SIZE;
uint16_t nb_rxd = RX_RING_SIZE;
uint16_t nb_txd = TX_RING_SIZE;
if(!rte_eth_dev_is_valid_port(portid)) {
return -1;
}
if (!rte_eth_dev_is_valid_port(portid)) {
return -1;
}
int ret = rte_eth_dev_info_get(portid, &dev_info);
if (ret != 0) {
return ret;
}
int ret = rte_eth_dev_info_get(portid, &dev_info);
if (ret != 0) {
return ret;
}
port_conf.rxmode.max_rx_pkt_len = RTE_ETHER_MAX_LEN;
port_conf.rxmode.offloads |= DEV_RX_OFFLOAD_UDP_CKSUM;
port_conf.rxmode.offloads |= DEV_RX_OFFLOAD_IPV4_CKSUM;
port_conf.txmode.offloads |= DEV_TX_OFFLOAD_UDP_CKSUM;
port_conf.txmode.offloads |= DEV_TX_OFFLOAD_IPV4_CKSUM;
port_conf.txmode.offloads |= DEV_TX_OFFLOAD_MBUF_FAST_FREE;
/* Configure the Ethernet device. */
ret = rte_eth_dev_configure(portid, RX_RING_NUM, TX_RING_NUM, &port_conf);
port_conf.rxmode.max_rx_pkt_len = RTE_ETHER_MAX_LEN;
port_conf.rxmode.mq_mode = ETH_MQ_RX_RSS;
port_conf.rx_adv_conf.rss_conf.rss_hf = ETH_RSS_NONFRAG_IPV4_UDP |
ETH_RSS_L2_PAYLOAD | ETH_RSS_NONFRAG_IPV4_TCP;
port_conf.rx_adv_conf.rss_conf.rss_key = nullptr;
port_conf.rxmode.offloads |= DEV_RX_OFFLOAD_RSS_HASH;
port_conf.rxmode.offloads |= DEV_RX_OFFLOAD_UDP_CKSUM;
port_conf.rxmode.offloads |= DEV_RX_OFFLOAD_IPV4_CKSUM;
port_conf.txmode.offloads |= DEV_TX_OFFLOAD_UDP_CKSUM;
port_conf.txmode.offloads |= DEV_TX_OFFLOAD_IPV4_CKSUM;
port_conf.txmode.offloads |= DEV_TX_OFFLOAD_MBUF_FAST_FREE;
/* Configure the Ethernet device. */
ret = rte_eth_dev_configure(
portid, options.num_threads, options.num_threads, &port_conf);
if (ret != 0)
return ret;
@ -224,155 +437,247 @@ port_init(uint16_t portid, struct rte_mempool *mbuf_pool)
if (ret != 0)
return ret;
/* Allocate and set up 1 RX queue per Ethernet port. */
rxconf = dev_info.default_rxconf;
for (uint32_t i = 0; i < RX_RING_NUM; i++) {
ret = rte_eth_rx_queue_setup(portid, i, nb_rxd, rte_eth_dev_socket_id(portid), &rxconf, mbuf_pool);
/* Allocate and set up 1 RX queue per thread per Ethernet port. */
rxconf = dev_info.default_rxconf;
for (int i = 0; i < options.num_threads; i++) {
ret = rte_eth_rx_queue_setup(portid, i, nb_rxd,
rte_eth_dev_socket_id(portid), &rxconf, mbuf_pool);
if (ret < 0)
return ret;
options.s_thr_info.at(i)->rxqid = i;
}
txconf = dev_info.default_txconf;
txconf = dev_info.default_txconf;
txconf.offloads = port_conf.txmode.offloads;
/* Allocate and set up 1 TX queue per Ethernet port. */
for (uint32_t i = 0; i < TX_RING_NUM; i++) {
ret = rte_eth_tx_queue_setup(portid, i, nb_txd, rte_eth_dev_socket_id(portid), &txconf);
/* Allocate and set up 1 TX queue per thread per Ethernet port. */
for (int i = 0; i < options.num_threads; i++) {
ret = rte_eth_tx_queue_setup(
portid, i, nb_txd, rte_eth_dev_socket_id(portid), &txconf);
if (ret < 0)
return ret;
options.s_thr_info.at(i)->txqid = i;
}
ret = rte_eth_dev_start(portid);
if (ret < 0)
return ret;
ret = rte_eth_dev_start(portid);
if (ret < 0)
return ret;
/* Display the port MAC address. */
struct rte_ether_addr addr;
ret = rte_eth_macaddr_get(portid, &addr);
if (ret != 0)
return ret;
/* Enable RX in promiscuous mode for the Ethernet device. */
ret = rte_eth_promiscuous_enable(portid);
struct rte_ether_addr addr {
};
ret = rte_eth_macaddr_get(portid, &addr);
if (ret != 0)
return ret;
if (rte_eth_add_tx_callback(portid, 0, tx_calc_latency, NULL) == NULL || rte_eth_add_rx_callback(portid, 0, rx_add_timestamp, NULL) == NULL) {
return -1;
}
ret = rte_eth_timesync_enable(portid);
if (ret != 0)
return ret;
/* Enable RX in promiscuous mode for the Ethernet device. */
ret = rte_eth_promiscuous_enable(portid);
if (ret != 0)
return ret;
for (int i = 0; i < options.num_threads; i++) {
if (rte_eth_add_tx_callback(portid,
options.s_thr_info.at(i)->txqid, tx_add_timestamp,
nullptr) == nullptr ||
rte_eth_add_rx_callback(portid,
options.s_thr_info.at(i)->rxqid, rx_add_timestamp,
nullptr) == nullptr) {
return -1;
}
}
// sync_port_clock(portid);
return 0;
}
// Prints the command-line help text to stdout.
// (This span contained both the old and the new diff versions of usage()
// interleaved; this is the resolved new version.)
static void
usage()
{
	fprintf(stdout,
	    "Usage:\n"
	    "    -v(vv): verbose mode\n"
	    "    -h: seek help\n"
	    "    -A: cpu mask for worker threads\n"
	    "    -M: mempool socket affinity mask\n"
	    "    -H: host spec\n");
	fflush(stdout);
}
int main(int argc, char* argv[])
static void
dump_options()
{
unsigned int nb_ports;
struct rte_mempool *mbuf_pool, *mbuf_pool_pkt;
// init dpdk
int ret = rte_eal_init(argc, argv);
if (ret < 0) {
rte_exit(EXIT_FAILURE, "rte_eal_init failed!\n");
}
argc -= ret;
argv += ret;
// set warning level
ntr_set_level(NTR_DEP_USER1, NTR_LEVEL_WARNING);
{
int c;
// parse arguments
while((c = getopt(argc, argv, "hv")) != -1) {
switch (c) {
case 'v':
ntr_set_level(NTR_DEP_USER1, ntr_get_level(NTR_DEP_USER1) + 1);
break;
case 'h':
usage();
rte_exit(EXIT_SUCCESS, NULL);
break;
default:
usage();
rte_exit(EXIT_SUCCESS, "unknown argument: %c", c);
break;
}
}
}
// XXX: singal handler to exit
nb_ports = rte_eth_dev_count_avail();
if (nb_ports == 0) {
rte_exit(EXIT_FAILURE, "number of ports must be > 0\n");
}
// create a mbuf memory pool on the socket
mbuf_pool = rte_pktmbuf_pool_create("MBUF_POOL", MBUF_MAX_COUNT * nb_ports, MBUF_CACHE_SIZE, 0, RTE_MBUF_DEFAULT_BUF_SIZE, rte_socket_id());
if (mbuf_pool == nullptr) {
rte_exit(EXIT_FAILURE, "cannot create mbuf pool\n");
}
// create a pkt mbuf memory pool on the socket
mbuf_pool_pkt = rte_pktmbuf_pool_create("MBUF_POOL_PKT", MBUF_MAX_COUNT * nb_ports, MBUF_CACHE_SIZE, 0, RTE_MBUF_DEFAULT_BUF_SIZE, rte_socket_id());
if (mbuf_pool_pkt == nullptr) {
rte_exit(EXIT_FAILURE, "cannot create mbuf_pkt pool\n");
}
options.s_pkt_mempool = mbuf_pool_pkt;
uint16_t portid = rte_eth_find_next(0);
if (portid == RTE_MAX_ETHPORTS) {
rte_exit(EXIT_FAILURE, "cannot find an available port\n");
}
options.s_portid = portid;
if (port_init(portid, mbuf_pool) != 0) {
rte_exit(EXIT_FAILURE, "cannot init port %d\n", portid);
}
if (rte_eth_macaddr_get(portid, &options.s_host_mac) != 0) {
rte_exit(EXIT_FAILURE, "cannot get mac address of port %d\n", portid);
}
ntr(NTR_DEP_USER1, NTR_LEVEL_INFO, "Configured port %d with mac addr %x:%x:%x:%x:%x:%x\n", portid,
options.s_host_mac.addr_bytes[0],
options.s_host_mac.addr_bytes[1],
options.s_host_mac.addr_bytes[2],
options.s_host_mac.addr_bytes[3],
options.s_host_mac.addr_bytes[4],
options.s_host_mac.addr_bytes[5]);
uint16_t lcore_id = rte_get_next_lcore(0, true, false);
if (lcore_id == RTE_MAX_LCORE) {
rte_exit(EXIT_FAILURE, "cannot detect lcores.\n");
}
if (rte_eal_remote_launch(locore_main, NULL, lcore_id) != 0) {
rte_exit(EXIT_FAILURE, "failed to launch function on locore %d\n", lcore_id);
}
// while(true) {
// struct rte_eth_stats stats;
// rte_eth_stats_get(portid, &stats);
// printf("recv: %d missed: %d err: %d\n",(uint32_t)stats.ipackets, (uint32_t)stats.imissed,(uint32_t)stats.ierrors);
// usleep(1000000);
// }
if (rte_eal_wait_lcore(lcore_id) != 0) {
rte_exit(EXIT_FAILURE, "failed to wait for locore %d\n", lcore_id);
}
// shouldn't get here
return 0;
ntr(NTR_DEP_USER1, NTR_LEVEL_INFO,
"main: khat configuration:\n"
" verbosity: +%d\n"
" thread count: %d\n"
" cpu mask: 0x%lx\n"
" mempool mask: 0x%lx\n"
" ip: 0x%x\n",
ntr_get_level(NTR_DEP_USER1) - NTR_LEVEL_WARNING,
options.num_threads, options.cpuset, options.memmask,
options.s_host_spec.ip);
}
// khat server entry point.
// Order matters throughout: EAL init -> option parsing -> libnm init ->
// dynfield registration -> port/queue setup -> per-thread launch -> wait.
// Exits via rte_exit() on any setup failure.
int
main(int argc, char *argv[])
{
unsigned int nb_ports;
struct rte_mempool *mbuf_pool;
bool has_host_spec { false };
ntr_init();
// init dpdk
int ret = rte_eal_init(argc, argv);
if (ret < 0) {
rte_exit(EXIT_FAILURE, "rte_eal_init failed!\n");
}
// EAL consumed `ret` arguments; the remainder are application flags
argc -= ret;
argv += ret;
// set warning level
ntr_set_level(NTR_DEP_USER1, NTR_LEVEL_WARNING);
{
int c;
// parse arguments
while ((c = getopt(argc, argv, "hvA:M:H:")) != -1) {
switch (c) {
case 'v':
// each -v raises verbosity by one level
ntr_set_level(NTR_DEP_USER1,
ntr_get_level(NTR_DEP_USER1) + 1);
break;
case 'h':
usage();
rte_exit(EXIT_SUCCESS, "\n");
case 'A':
// worker cpu mask; thread count is derived from the popcount
options.cpuset = strtoull(optarg, nullptr, 16);
options.num_threads = cmask_get_num_cpus(
options.cpuset);
if (options.num_threads == 0) {
rte_exit(EXIT_FAILURE,
"must run at least one thread\n");
}
break;
case 'M':
options.memmask = strtoull(optarg, nullptr, 16);
break;
case 'H':
if (str_to_netspec(
optarg, &options.s_host_spec) != 0) {
rte_exit(EXIT_FAILURE,
"invalid host spec\n");
}
has_host_spec = true;
break;
default:
usage();
rte_exit(
EXIT_SUCCESS, "unknown argument: %c", c);
}
}
}
if (!has_host_spec) {
rte_exit(EXIT_FAILURE, "Must specify host spec\n");
}
dump_options();
// init nm
if (nm_init(ntr_get_level(NTR_DEP_USER1) - NTR_LEVEL_WARNING) != 0) {
rte_exit(EXIT_FAILURE, "nm init failed!\n");
}
// register dynamic field
PROBE_FLAG_OFFSET = rte_mbuf_dynfield_register(
&rte_mbuf_dynfield_probe_flag);
if (PROBE_FLAG_OFFSET < 0) {
rte_exit(EXIT_FAILURE, "failed to register dynamic field\n");
}
nb_ports = rte_eth_dev_count_avail();
if (nb_ports == 0) {
rte_exit(EXIT_FAILURE, "number of ports must be > 0\n");
}
// use the first available port
uint16_t portid = rte_eth_find_next(0);
if (portid == RTE_MAX_ETHPORTS) {
rte_exit(EXIT_FAILURE, "cannot find an available port\n");
}
options.s_portid = portid;
if (rte_eth_macaddr_get(portid, &options.s_host_spec.mac_addr) != 0) {
rte_exit(EXIT_FAILURE, "cannot get mac address of port %d\n",
portid);
}
// create a mbuf memory pool on the socket
// (allocated on the port's NUMA socket to keep rx buffers local)
mbuf_pool = rte_pktmbuf_pool_create("MBUF_POOL",
MBUF_MAX_COUNT * nb_ports, MBUF_CACHE_SIZE, 0,
RTE_MBUF_DEFAULT_BUF_SIZE, rte_eth_dev_socket_id(portid));
if (mbuf_pool == nullptr) {
rte_exit(EXIT_FAILURE, "cannot create mbuf pool\n");
}
options.s_pkt_mempool = mbuf_pool;
// init threads: one thread_info per bit in the cpu mask,
// created BEFORE port_init so queue setup can read txqid/rxqid
uint64_t cpuset = options.cpuset;
for (int i = 0; i < options.num_threads; i++) {
auto *tinfo = new struct thread_info;
tinfo->tid = i;
tinfo->lcore_id = cmask_get_next_cpu(&cpuset);
options.s_thr_info.push_back(tinfo);
}
if (port_init(portid, mbuf_pool) != 0) {
rte_exit(EXIT_FAILURE, "cannot init port %d\n", portid);
}
ntr(NTR_DEP_USER1, NTR_LEVEL_INFO,
"Configured port %d on socket %d with mac addr %x:%x:%x:%x:%x:%x\n",
portid, rte_eth_dev_socket_id(portid),
options.s_host_spec.mac_addr.addr_bytes[0],
options.s_host_spec.mac_addr.addr_bytes[1],
options.s_host_spec.mac_addr.addr_bytes[2],
options.s_host_spec.mac_addr.addr_bytes[3],
options.s_host_spec.mac_addr.addr_bytes[4],
options.s_host_spec.mac_addr.addr_bytes[5]);
// give the link time to come up before workers start polling
sleep(INIT_DELAY);
for (int i = 0; i < options.num_threads; i++) {
struct thread_info *tinfo = options.s_thr_info.at(i);
ntr(NTR_DEP_USER1, NTR_LEVEL_INFO,
"main: launching thread %d on locore %d\n", tinfo->tid,
tinfo->lcore_id);
if (rte_eal_remote_launch(locore_main,
(void *)options.s_thr_info.at(i),
tinfo->lcore_id) != 0) {
rte_exit(EXIT_FAILURE,
"failed to launch function on locore %d\n",
tinfo->lcore_id);
}
}
// block until every worker lcore returns (they normally never do)
for (int i = 0; i < options.num_threads; i++) {
struct thread_info *tinfo = options.s_thr_info.at(i);
ntr(NTR_DEP_USER1, NTR_LEVEL_INFO,
"main: waiting for locore %d...\n", tinfo->lcore_id);
if (rte_eal_wait_lcore(tinfo->lcore_id) != 0) {
rte_exit(EXIT_FAILURE, "failed to wait for locore %d\n",
tinfo->lcore_id);
}
}
// shouldn't get here
// clean up
rte_eth_dev_stop(portid);
return 0;
}

95
libgen/generator.cc Normal file
View File

@ -0,0 +1,95 @@
// modified from mutilate
#include "gen.h"
Generator *
createFacebookKey()
{
	// Generalized extreme value distribution with the fitted constants
	// carried over from mutilate's Facebook key-size model.
	Generator *key_gen = new GEV(30.7984, 8.20449, 0.078688);
	return key_gen;
}
Generator *
createFacebookValue()
{
	// Tail distribution: generalized Pareto, constants from mutilate's
	// Facebook value-size model.
	Generator *tail = new GPareto(15.0, 214.476, 0.348238);
	Discrete *dist = new Discrete(tail);
	// Point masses for small value sizes: { probability, value }.
	// Same values and same insertion order as the original code.
	static const double point_masses[][2] = {
		{ 0.00536, 0.0 }, { 0.00047, 1.0 }, { 0.17820, 2.0 },
		{ 0.09239, 3.0 }, { 0.00018, 4.0 }, { 0.02740, 5.0 },
		{ 0.00065, 6.0 }, { 0.00606, 7.0 }, { 0.00023, 8.0 },
		{ 0.00837, 9.0 }, { 0.00837, 10.0 }, { 0.08989, 11.0 },
		{ 0.00092, 12.0 }, { 0.00326, 13.0 }, { 0.01980, 14.0 },
	};
	for (const auto &pm : point_masses) {
		dist->add(pm[0], pm[1]);
	}
	return dist;
}
Generator *
createFacebookIA()
{
	// Inter-arrival gaps: generalized Pareto anchored at 0.
	Generator *ia_gen = new GPareto(0, 16.0292, 0.154971);
	return ia_gen;
}
// Build a Generator from a spec string.
// Accepted forms:
//   "fb_key" / "fb_value" / "fb_ia"      - canned Facebook models
//   a bare number                        - Fixed(value)
//   "<name>[:a1[,a2[,a3]]]"              - fixed/normal/exponential/pareto/
//                                          gev/uniform with up to 3 params
// Missing parameters default to 0.0. DIEs (does not return) on parse failure.
Generator *
createGenerator(std::string str)
{
	if (!strcmp(str.c_str(), "fb_key"))
		return createFacebookKey();
	else if (!strcmp(str.c_str(), "fb_value"))
		return createFacebookValue();
	else if (!strcmp(str.c_str(), "fb_ia"))
		return createFacebookIA();

	char *s_copy = new char[str.length() + 1];
	strcpy(s_copy, str.c_str());
	char *saveptr = NULL;

	// A spec that parses as a number means a fixed value.
	if (atoi(s_copy) != 0 || !strcmp(s_copy, "0")) {
		double v = atof(s_copy);
		delete[] s_copy;
		return new Fixed(v);
	}

	char *t_ptr = strtok_r(s_copy, ":", &saveptr);
	char *a_ptr = strtok_r(NULL, ":", &saveptr);
	if (t_ptr == NULL)
		DIE("strtok(.., \":\") failed to parse %s", str.c_str());

	// BUGFIX: only tokenize the argument list when it exists. The old code
	// called strtok_r(NULL, ...) with a freshly NULLed saveptr when the
	// spec had no ":" (e.g. plain "fixed"), which is undefined behavior
	// per POSIX (the first strtok_r call must pass a non-NULL string).
	double a1 = 0.0, a2 = 0.0, a3 = 0.0;
	if (a_ptr != NULL) {
		saveptr = NULL;
		char *s1 = strtok_r(a_ptr, ",", &saveptr);
		char *s2 = strtok_r(NULL, ",", &saveptr);
		char *s3 = strtok_r(NULL, ",", &saveptr);
		a1 = s1 ? atof(s1) : 0.0;
		a2 = s2 ? atof(s2) : 0.0;
		a3 = s3 ? atof(s3) : 0.0;
	}
	delete[] s_copy;

	if (strcasestr(str.c_str(), "fixed"))
		return new Fixed(a1);
	else if (strcasestr(str.c_str(), "normal"))
		return new Normal(a1, a2);
	else if (strcasestr(str.c_str(), "exponential"))
		return new Exponential(a1);
	else if (strcasestr(str.c_str(), "pareto"))
		return new GPareto(a1, a2, a3);
	else if (strcasestr(str.c_str(), "gev"))
		return new GEV(a1, a2, a3);
	else if (strcasestr(str.c_str(), "uniform"))
		return new Uniform(a1);
	DIE("Unable to create Generator '%s'", str.c_str());
	return NULL;
}

187
libnm/nm.cc Normal file
View File

@ -0,0 +1,187 @@
#include <sys/types.h>
#include <sys/sysctl.h>
#include <hwloc.h>
#include <x86intrin.h>
#include "nm.h"
#include <algorithm>
#include <vector>
// FreeBSD sysctl that reports the invariant TSC frequency in Hz.
static const char *SYSCTL_TSC = "machdep.tsc_freq";
// verbosity flag set by nm_init(); cached TSC frequency for nm_tsc2ns().
static int verbose = 0;
static uint64_t sysctl_tsc_freq = 0;
// One node of the machine topology tree (NUMA node -> CPU -> core),
// mirroring the hwloc hierarchy with logical indices as ids.
struct nm_obj {
int level;
int id;
struct nm_obj *parent;
std::vector<struct nm_obj *> children;
};
// Sort helper: order topology objects by ascending logical id.
static bool
nm_obj_comparator(struct nm_obj *a, struct nm_obj *b)
{
return a->id < b->id;
}
// Flat, id-sorted views of each topology level, filled by nm_init().
static std::vector<struct nm_obj *> nodes;
static std::vector<struct nm_obj *> cores;
static std::vector<struct nm_obj *> cpus;
// Accessors for the topology tables built by nm_init(). The returned
// vectors are owned by this module; callers must not free or mutate them.
std::vector<struct nm_obj *> *
nm_get_nodes()
{
return &nodes;
}
std::vector<struct nm_obj *> *
nm_get_cpus()
{
return &cpus;
}
std::vector<struct nm_obj *> *
nm_get_cores()
{
return &cores;
}
// Walk up the hwloc ancestry of `obj` and return the first ancestor
// (possibly obj itself) of the requested type, or nullptr if none exists.
hwloc_obj_t
get_parent_type(hwloc_obj_t obj, hwloc_obj_type_t type)
{
	for (; obj != nullptr; obj = obj->parent) {
		if (obj->type == type)
			return obj;
	}
	return nullptr;
}
// Read the TSC (serialized variant) and convert to nanoseconds.
// The aux output of __rdtscp is discarded.
uint64_t
nm_get_uptime_ns()
{
unsigned int dummy;
return nm_tsc2ns(__rdtscp(&dummy));
}
// Convert a TSC tick count to nanoseconds using the frequency cached by
// nm_init(). Same arithmetic order as before: (tsc / freq) * 1e9 in double.
uint64_t
nm_tsc2ns(uint64_t tsc)
{
	double seconds = (double)tsc / (double)sysctl_tsc_freq;
	return (uint64_t)(seconds * (double)1000000000ul);
}
// 0 on success
// -1 on error
int
nm_init(int verbosity)
{
int ret;
size_t sz = sizeof(sysctl_tsc_freq);
verbose = verbosity;
// init nm_tsc2ns
if ((ret = sysctlbyname(
SYSCTL_TSC, &sysctl_tsc_freq, &sz, nullptr, 0)) < 0) {
if (verbose) {
fprintf(stderr,
"libnm: failed to query tsc frequency via sysctl (%d)\n",
errno);
}
return ret;
}
if (verbose) {
fprintf(stdout, "libnm: tsc frequency: %lu\n", sysctl_tsc_freq);
}
// init numa stuff
hwloc_topology *topo;
if ((ret = hwloc_topology_init(&topo)) != 0) {
return ret;
}
if ((ret = hwloc_topology_load(topo)) != 0)
return ret;
// populate numa nodes
hwloc_obj_t obj = nullptr;
while (true) {
obj = hwloc_get_next_obj_by_type(topo, HWLOC_OBJ_PACKAGE, obj);
if (obj == nullptr) {
break;
}
auto each = new struct nm_obj;
each->id = obj->logical_index;
each->level = NM_LEVEL_NUMA;
each->parent = nullptr;
nodes.push_back(each);
if (verbose) {
fprintf(stdout, "libnm: identified NUMA node %d\n",
each->id);
}
}
std::sort(nodes.begin(), nodes.end(), nm_obj_comparator);
// populate cpus
obj = nullptr;
while (true) {
obj = hwloc_get_next_obj_by_type(topo, HWLOC_OBJ_CORE, obj);
if (obj == nullptr) {
break;
}
auto each = new struct nm_obj;
each->id = obj->logical_index;
each->level = NM_LEVEL_CPU;
hwloc_obj_t parent = get_parent_type(obj, HWLOC_OBJ_PACKAGE);
if (parent == nullptr) {
return -1;
}
// XXX: this faults if the OS decides to be stupid
each->parent = nodes.at(parent->logical_index);
each->parent->children.push_back(each);
cpus.push_back(each);
if (verbose) {
fprintf(stdout,
"libnm: identified CPU %d on NUMA node %d\n",
each->id, each->parent->id);
}
}
std::sort(cpus.begin(), cpus.end(), nm_obj_comparator);
// populate cores
obj = nullptr;
while (true) {
obj = hwloc_get_next_obj_by_type(topo, HWLOC_OBJ_PU, obj);
if (obj == nullptr) {
break;
}
auto each = new struct nm_obj;
each->id = obj->logical_index;
each->level = NM_LEVEL_CORE;
hwloc_obj_t parent = get_parent_type(obj, HWLOC_OBJ_CORE);
if (parent == nullptr) {
return -1;
}
// XXX: this faults if the OS decides to be stupid
each->parent = cpus.at(parent->logical_index);
each->parent->children.push_back(each);
cores.push_back(each);
if (verbose) {
fprintf(stdout,
"libnm: identified core %d on CPU %d, NUMA node %d\n",
each->id, each->parent->id,
each->parent->parent->id);
}
}
std::sort(cores.begin(), cores.end(), nm_obj_comparator);
return ret;
}

46
libntr/ntr.c Normal file
View File

@ -0,0 +1,46 @@
#include "ntr.h"
/*
 * Per-dependency log thresholds. NOTE(review): the initializer sets only
 * element 0 to NTR_LEVEL_DEFAULT; remaining elements are zero-initialized -
 * confirm that is the intended default for the other deps.
 */
static int ntr_log_levels[NTR_DEP_MAX] = { NTR_LEVEL_DEFAULT };
/* Output stream; defaults to stdout via ntr_init(). */
static FILE *ntr_out;
/* Must be called once before any ntr() call so ntr_out is valid. */
void
ntr_init()
{
ntr_out = stdout;
}
/*
 * printf-style log entry point: emit the message only when `dep` is a known
 * dependency and `level` is at or below that dependency's threshold.
 */
void
ntr(int dep, int level, const char *fmt, ...)
{
	va_list vl;

	/* guard clause: silently drop out-of-range deps and filtered levels */
	if (dep >= NTR_DEP_MAX || level > ntr_log_levels[dep])
		return;

	va_start(vl, fmt);
	vfprintf(ntr_out, fmt, vl);
	va_end(vl);
}
/* Set the logging threshold for one dependency; unknown deps are ignored. */
void
ntr_set_level(int dep, int level)
{
	if (dep >= NTR_DEP_MAX)
		return;
	ntr_log_levels[dep] = level;
}
/* Redirect log output to `f`; a NULL argument keeps the current stream. */
void
ntr_set_output(FILE *f)
{
	if (f == NULL)
		return;
	ntr_out = f;
}
/* Return the threshold for `dep`, or 0 when `dep` is out of range. */
int
ntr_get_level(int dep)
{
	return (dep < NTR_DEP_MAX) ? ntr_log_levels[dep] : 0;
}

806
rat/rat.cc Normal file
View File

@ -0,0 +1,806 @@
#include <rte_byteorder.h>
#include <rte_common.h>
#include <rte_config.h>
#include <rte_eal.h>
#include <rte_ethdev.h>
#include <rte_ether.h>
#include <rte_launch.h>
#include <rte_lcore.h>
#include <rte_mbuf.h>
#include <unistd.h>
#include "gen.h"
#include "nm.h"
#include "ntr.h"
#include "pkt.h"
#include "util.h"
#include <atomic>
#include <random>
#include <vector>
// mbuf pool sizing and per-port descriptor ring sizes.
constexpr static unsigned int MBUF_MAX_COUNT = 65536;
constexpr static unsigned int MBUF_CACHE_SIZE = 512;
constexpr static unsigned int RX_RING_SIZE = 4096;
constexpr static unsigned int TX_RING_SIZE = 4096;
// max packets popped per rte_eth_rx_burst() call
constexpr static unsigned int BURST_SIZE = 8;
// zero-initialized base config; port_init() fills in the fields it needs
static const struct rte_eth_conf port_conf_default {
};
// Tag an epoch counter with its owning thread: id in the top 8 bits,
// epoch in the low 24.
static unsigned int
epoch_mk(unsigned int id, unsigned int epoch)
{
	unsigned int id_tag = id << 24;
	return id_tag | epoch;
}
// Recover the thread id packed into the top byte by epoch_mk().
static unsigned int
epoch_get_id(unsigned int epoch)
{
	return (epoch & 0xFF000000u) >> 24;
}
// Strip the id byte, keeping the low 24-bit epoch counter.
static unsigned int
epoch_get_epoch(unsigned int epoch)
{
	return epoch & ((1u << 24) - 1);
}
// Per-worker state; one instance per launched lcore.
struct thread_info {
unsigned int id { 0 };
unsigned int lcore_id { 0 };
// rx/tx queue indices assigned to this thread (set to the thread index)
unsigned int rxqid { 0 };
unsigned int txqid { 0 };
// responses received so far; read cross-thread by calc_qps()
std::atomic<int> total_pkts { 0 };
// inter-arrival and load generators (see createGenerator)
Generator *ia_gen { nullptr };
Generator *load_gen { nullptr };
// epoch of the in-flight request (pre-incremented on send)
std::atomic<uint32_t> cur_epoch { 0 };
// true when no request is outstanding; starts true so the first send fires
std::atomic<bool> epoch_recv { true };
};
// Run-state machine shared by all threads (see proto_loop/pkt_loop).
constexpr static int STATE_SYNC = 0; // waiting for SYNC
constexpr static int STATE_SYNC_ACK = 1; // Waiting for sending SYNC_ACK
constexpr static int STATE_RUNNING = 2; // Running
constexpr static int STATE_FIN = 3; // FIN received
// All configuration (from getopt) and shared runtime state. Fields with an
// s_ prefix are runtime state; the rest are parameters.
struct options_t {
unsigned int run_time { 5 };
// parameters
int slave_mode { 0 };
unsigned long rage_quit_time { (unsigned long)-1 };
char ia_gen[256] { "fixed" };
char ld_gen[256] { "fixed:0" };
uint32_t target_qps { 0 };
struct net_spec server_spec {
};
uint64_t cpu_mask { 0x4 }; // 1 thread @ core 2
// states
unsigned int s_num_threads { 1 }; // 1 thread
struct rte_mempool *mbuf_pool { nullptr };
struct net_spec s_host_spec {
};
struct net_spec s_master_spec {
};
// fixed-port connection back to the master (cat) for the control protocol
struct conn_spec s_master_cspec {
.src = &s_host_spec, .src_port = DEFAULT_RAT_PORT,
.dst = &s_master_spec, .dst_port = DEFAULT_RAT_PORT,
};
uint16_t s_portid { 0 };
std::vector<struct thread_info *> s_thr_info;
std::atomic<int> s_state { STATE_RUNNING }; // default non master mode
// states for qps
std::atomic<uint64_t> s_ts_begin { 0 };
};
static struct options_t options;
// Sum response counts across all threads and divide by the elapsed time
// since s_ts_begin (ns converted to seconds via S2NS) to get overall QPS.
static inline uint32_t
calc_qps(uint64_t now)
{
uint32_t tot = 0;
for (auto i : options.s_thr_info) {
tot += i->total_pkts.load();
}
return (uint32_t)((double)tot /
((double)(now - options.s_ts_begin.load()) / (double)S2NS));
}
// Slave-mode handshake loop: poll the rx queue until a SYNC packet arrives
// from the master ("cat"), reply with SYNC_ACK, and advance the global state
// STATE_SYNC -> STATE_SYNC_ACK -> STATE_RUNNING. All worker threads run this;
// the compare_exchange ensures exactly one thread performs the handshake.
static void
proto_loop(struct thread_info *tinfo)
{
struct rte_mbuf *tx_buf;
struct rte_mbuf *rx_bufs[BURST_SIZE];
struct pkt_hdr *pkt_data;
ntr(NTR_DEP_USER1, NTR_LEVEL_INFO,
"proto_loop <thread %d>: waiting for SYNC from cat\n", tinfo->id);
while (options.s_state.load() == STATE_SYNC) {
const uint16_t nb_rx = rte_eth_rx_burst(
options.s_portid, tinfo->rxqid, rx_bufs, BURST_SIZE);
if (nb_rx > 0) {
for (int i = 0; i < nb_rx; i++) {
// validate dst mac/headers; returns the parsed header or null
struct pkt_hdr *each = check_valid_packet(
rx_bufs[i], &options.s_host_spec.mac_addr);
if (each != nullptr) {
uint16_t type = rte_be_to_cpu_16(
each->type);
if (type == PKT_TYPE_SYNC) {
int expected = STATE_SYNC;
ntr(NTR_DEP_USER1,
NTR_LEVEL_INFO,
"proto_loop <thread %d>: received SYNC from cat\n",
tinfo->id);
// only the first thread to CAS wins the handshake
if (!options.s_state
.compare_exchange_strong(
expected,
STATE_SYNC_ACK)) {
// someone barged in,
// listen to that guy
ntr(NTR_DEP_USER1,
NTR_LEVEL_WARNING,
"proto_loop <thread %d>: failed to cmpxchg sync_recv.\n",
tinfo->id);
} else {
// learn the master's address from the SYNC packet
pkt_hdr_to_netspec(each,
&options
.s_master_spec,
nullptr, nullptr,
nullptr);
if (alloc_pkt_hdr(
options
.mbuf_pool,
PKT_TYPE_SYNC_ACK,
&options
.s_master_cspec,
&tx_buf,
&pkt_data) !=
0) {
rte_exit(
EXIT_FAILURE,
"failed to alloc pkt hdr\n");
}
if (rte_eth_tx_burst(
options
.s_portid,
tinfo->txqid,
&tx_buf,
1) != 1) {
rte_exit(
EXIT_FAILURE,
"failed to send packet\n");
}
expected =
STATE_SYNC_ACK;
// we've done our job,
// set off the threads
if (!options.s_state
.compare_exchange_strong(
expected,
STATE_RUNNING)) {
rte_exit(
EXIT_FAILURE,
"state unexpectedly changed\n");
}
ntr(NTR_DEP_USER1,
NTR_LEVEL_INFO,
"proto_loop <thread %d>: sent SYNC_ACK to cat\n",
tinfo->id);
}
} else {
ntr(NTR_DEP_USER1,
NTR_LEVEL_DEBUG,
"proto_loop <thread %d>: ignoring invalid packet %p type %d.\n",
tinfo->id,
(void *)rx_bufs[i], type);
}
} else {
ntr(NTR_DEP_USER1, NTR_LEVEL_DEBUG,
"proto_loop <thread %d>: ignoring invalid packet %p.\n",
tinfo->id, (void *)rx_bufs[i]);
}
// every received mbuf is freed regardless of outcome
rte_pktmbuf_free(rx_bufs[i]);
}
}
}
ntr(NTR_DEP_USER1, NTR_LEVEL_DEBUG,
"proto_loop <thread %d>: exiting loop...\n", tinfo->id);
}
// Main load-generation loop (one per worker thread). Each iteration:
// (1) drain the rx queue, crediting LOAD_RESP packets to the owning thread
//     (id is recovered from the epoch tag) and handling FIN from the master;
// (2) when the inter-arrival timer fires AND the previous response arrived,
//     send the next LOAD packet with a fresh epoch and randomized L4 ports
//     (for RSS spreading);
// (3) rage-quit via rte_exit if a response is overdue.
static void
pkt_loop(struct thread_info *tinfo)
{
struct rte_mbuf *tx_buf;
struct rte_mbuf *rx_bufs[BURST_SIZE];
uint64_t next_ts;
uint64_t last_ts;
struct conn_spec srv_cspec;
rdport_generator src_port_gen(MIN_RANDOM_PORT);
rdport_generator dst_port_gen(MIN_RANDOM_PORT);
srv_cspec.src = &options.s_host_spec;
srv_cspec.dst = &options.server_spec;
next_ts = nm_get_uptime_ns();
last_ts = next_ts + options.rage_quit_time * MS2NS;
ntr(NTR_DEP_USER1, NTR_LEVEL_INFO, "pkt_loop <thread %d>: entering\n",
tinfo->id);
while (options.s_state.load() == STATE_RUNNING) {
uint64_t now = nm_get_uptime_ns();
// always pop incoming packets
const uint16_t nb_rx = rte_eth_rx_burst(
options.s_portid, tinfo->rxqid, rx_bufs, BURST_SIZE);
if (nb_rx > 0) {
for (int i = 0; i < nb_rx; i++) {
struct pkt_hdr *each = check_valid_packet(
rx_bufs[i], &options.s_host_spec.mac_addr);
if (each == nullptr) {
ntr(NTR_DEP_USER1, NTR_LEVEL_DEBUG,
"pkt_loop <thread %d>: ignoring invalid packet %p.\n",
tinfo->id, (void *)rx_bufs[i]);
rte_pktmbuf_free(rx_bufs[i]);
continue;
}
uint16_t type = rte_be_to_cpu_16(each->type);
NTR_PKT(NTR_DEP_USER1, NTR_LEVEL_DEBUG, each,
"locore_main <thread %d>: ", tinfo->id);
struct pkt_payload_epoch *pld_epoch;
uint32_t epoch;
uint32_t id;
struct thread_info *other_t;
bool bool_expected = false;
int int_expected = STATE_RUNNING;
switch (type) {
case PKT_TYPE_LOAD_RESP:
pld_epoch = (struct pkt_payload_epoch *)
each->payload;
epoch = rte_be_to_cpu_32(
pld_epoch->epoch);
// RSS may deliver the response to a different
// thread than the sender; route by embedded id
id = epoch_get_id(epoch);
epoch = epoch_get_epoch(epoch);
tinfo->total_pkts.fetch_add(1);
ntr(NTR_DEP_USER1, NTR_LEVEL_DEBUG,
"pkt_loop <thread %d>: packet %p epoch %d id %d.\n",
tinfo->id, (void *)rx_bufs[i],
epoch, id);
if (id >= options.s_num_threads) {
ntr(NTR_DEP_USER1,
NTR_LEVEL_WARNING,
"pkt_loop <thread %d>: packet %p invalid id %d.\n",
tinfo->id,
(void *)rx_bufs[i], id);
break;
}
other_t = options.s_thr_info.at(id);
// stale epoch => duplicate/late response; drop
if (epoch !=
other_t->cur_epoch.load()) {
ntr(NTR_DEP_USER1,
NTR_LEVEL_WARNING,
"pkt_loop <thread %d>: packet %p invalid epoch %d != %d.\n",
tinfo->id,
(void *)rx_bufs[i], epoch,
other_t->cur_epoch.load());
break;
}
// mark the owner's request as answered (false -> true)
if (!other_t->epoch_recv
.compare_exchange_strong(
bool_expected, true)) {
ntr(NTR_DEP_USER1,
NTR_LEVEL_WARNING,
"pkt_loop <thread %d>: failed to cmpxchg with thread %d.\n",
tinfo->id, other_t->id);
break;
}
break;
case PKT_TYPE_FIN:
// only honor FIN from the known master mac
if (rte_is_same_ether_addr(
&each->eth_hdr.s_addr,
&options.s_master_spec
.mac_addr)) {
ntr(NTR_DEP_USER1,
NTR_LEVEL_DEBUG,
"pkt_loop <thread %d>: recved FIN from cat.\n",
tinfo->id);
// master told us to stop!
if (!options.s_state
.compare_exchange_strong(
int_expected,
STATE_FIN)) {
ntr(NTR_DEP_USER1,
NTR_LEVEL_WARNING,
"pkt_loop <thread %d>: failed to cmpxchg state.\n",
tinfo->id);
}
// report measured QPS back in the FIN_ACK
uint32_t qps = calc_qps(now);
struct pkt_hdr *pkt_hdr;
if (alloc_pkt_hdr(
options.mbuf_pool,
PKT_TYPE_FIN_ACK,
&options.s_master_cspec,
&tx_buf,
&pkt_hdr) != 0) {
rte_exit(EXIT_FAILURE,
"failed to allocate pkt hdr\n");
}
auto pld_qps =
(struct pkt_payload_qps *)
pkt_hdr->payload;
pld_qps->qps = rte_cpu_to_be_32(
qps);
const uint16_t nb_tx =
rte_eth_tx_burst(
options.s_portid,
tinfo->txqid, &tx_buf,
1);
if (nb_tx != 1) {
rte_exit(EXIT_FAILURE,
"failed to send packet\n");
}
ntr(NTR_DEP_USER1,
NTR_LEVEL_DEBUG,
"pkt_loop <thread %d>: sent FIN_ACK to cat. QPS = %d.\n",
tinfo->id, qps);
} else {
ntr(NTR_DEP_USER1,
NTR_LEVEL_WARNING,
"pkt_loop <thread %d>: invalid FIN packet from a different cat.\n",
tinfo->id);
}
break;
default:
ntr(NTR_DEP_USER1, NTR_LEVEL_DEBUG,
"pkt_loop: ignoring packet %p with unknown type %d.\n",
(void *)rx_bufs[i], type);
}
rte_pktmbuf_free(rx_bufs[i]);
}
}
// closed-loop send: only when the timer fired and the previous
// request has been answered
if (now >= next_ts && tinfo->epoch_recv.load()) {
struct pkt_payload_load *pld_load;
struct pkt_hdr *pkt_data;
next_ts += (int)(tinfo->ia_gen->generate() * S2NS);
// change dst port for every packet for RSS
srv_cspec.dst_port = dst_port_gen.next();
srv_cspec.src_port = src_port_gen.next();
if (alloc_pkt_hdr(options.mbuf_pool, PKT_TYPE_LOAD,
&srv_cspec, &tx_buf, &pkt_data) != 0) {
rte_exit(EXIT_FAILURE,
"failed to allocate pkt hdr\n");
}
// pre-increment the epoch
uint32_t epoch = tinfo->cur_epoch.fetch_add(1) + 1;
pld_load = (struct pkt_payload_load *)pkt_data->payload;
pld_load->load = rte_cpu_to_be_32(
tinfo->load_gen->generate());
pld_load->epoch = rte_cpu_to_be_32(
epoch_mk(tinfo->id, epoch));
tinfo->epoch_recv.store(false);
last_ts = now;
ntr(NTR_DEP_USER1, NTR_LEVEL_DEBUG,
"pkt_loop <thread %d>: sending packet %p with epoch %d\n",
tinfo->id, (void *)tx_buf, epoch);
const uint16_t nb_tx = rte_eth_tx_burst(
options.s_portid, tinfo->txqid, &tx_buf, 1);
if (nb_tx != 1) {
rte_exit(
EXIT_FAILURE, "failed to send packet\n");
}
}
if (!tinfo->epoch_recv.load()) {
// if we haven't received the packet, get read to rage
// quit
if (now - last_ts > options.rage_quit_time * MS2NS) {
rte_exit(EXIT_FAILURE,
"waiting too long for resp. I QUIT!!\n");
}
}
}
ntr(NTR_DEP_USER1, NTR_LEVEL_DEBUG,
"pkt_loop <thread %d>: exiting loop...\n", tinfo->id);
}
// Per-lcore entry point (launched via rte_eal_remote_launch). Warns when the
// NIC sits on a remote NUMA node, runs the slave handshake if requested, then
// spins in pkt_loop until STATE_FIN.
static int
locore_main(void *tif)
{
auto tinfo = (struct thread_info *)tif;
uint32_t core_id = rte_lcore_id();
ntr(NTR_DEP_USER1, NTR_LEVEL_INFO,
"locore_main <thread %d>: running on core %d...\n", tinfo->id,
core_id);
if (rte_eth_dev_socket_id(options.s_portid) > 0 &&
rte_eth_dev_socket_id(options.s_portid) != (int)rte_socket_id()) {
ntr(NTR_DEP_USER1, NTR_LEVEL_WARNING,
"locore_main <thread %d>: WARNING, port %d is on remote NUMA node to "
"polling thread.\n\tPerformance will "
"not be optimal.\n",
tinfo->id, options.s_portid);
}
if (options.slave_mode == 1) {
// perform rat protocol
proto_loop(tinfo);
}
// wait for the primary thread sending SYNC_ACK
// (busy-wait; all threads block here until the handshake finishes)
while (options.s_state.load() != STATE_RUNNING) {
}
// store the current timestamp
options.s_ts_begin.store(nm_get_uptime_ns());
pkt_loop(tinfo);
ntr(NTR_DEP_USER1, NTR_LEVEL_INFO, "locore_main <thread %d>: exited\n",
tinfo->id);
return 0;
}
// Configure one Ethernet device: RSS over UDP/TCP so randomized L4 ports
// spread flows across queues, checksum offloads, one rx and one tx queue per
// worker thread, then start the port. Returns 0 on success, a negative DPDK
// error (or -1) otherwise.
static int
port_init(uint16_t portid, struct rte_mempool *mbuf_pool)
{
struct rte_eth_dev_info dev_info {
};
struct rte_eth_conf port_conf = port_conf_default;
struct rte_eth_txconf txconf {
};
struct rte_eth_rxconf rxconf {
};
uint16_t nb_rxd = RX_RING_SIZE;
uint16_t nb_txd = TX_RING_SIZE;
if (!rte_eth_dev_is_valid_port(portid)) {
return -1;
}
int ret = rte_eth_dev_info_get(portid, &dev_info);
if (ret != 0) {
return ret;
}
// RSS with the NIC's default key over UDP/TCP/L2 traffic
port_conf.rxmode.max_rx_pkt_len = RTE_ETHER_MAX_LEN;
port_conf.rxmode.mq_mode = ETH_MQ_RX_RSS;
port_conf.rx_adv_conf.rss_conf.rss_hf = ETH_RSS_NONFRAG_IPV4_UDP |
ETH_RSS_L2_PAYLOAD | ETH_RSS_NONFRAG_IPV4_TCP;
port_conf.rx_adv_conf.rss_conf.rss_key = nullptr;
port_conf.rxmode.offloads |= DEV_RX_OFFLOAD_UDP_CKSUM;
port_conf.rxmode.offloads |= DEV_RX_OFFLOAD_IPV4_CKSUM;
port_conf.rxmode.offloads |= DEV_RX_OFFLOAD_RSS_HASH;
port_conf.txmode.offloads |= DEV_TX_OFFLOAD_UDP_CKSUM;
port_conf.txmode.offloads |= DEV_TX_OFFLOAD_IPV4_CKSUM;
port_conf.txmode.offloads |= DEV_TX_OFFLOAD_MBUF_FAST_FREE;
/* Configure the Ethernet device. */
ret = rte_eth_dev_configure(
portid, options.s_num_threads, options.s_num_threads, &port_conf);
if (ret != 0)
return ret;
ret = rte_eth_dev_adjust_nb_rx_tx_desc(portid, &nb_rxd, &nb_txd);
if (ret != 0)
return ret;
/* Allocate and set up 1 RX queue per thread . */
rxconf = dev_info.default_rxconf;
rxconf.offloads = port_conf.rxmode.offloads;
for (uint32_t i = 0; i < options.s_num_threads; i++) {
ret = rte_eth_rx_queue_setup(portid,
options.s_thr_info.at(i)->rxqid, nb_rxd,
rte_eth_dev_socket_id(portid), &rxconf, mbuf_pool);
if (ret < 0)
return ret;
}
txconf = dev_info.default_txconf;
txconf.offloads = port_conf.txmode.offloads;
/* Allocate and set up 1 TX queue per Ethernet port. */
for (uint32_t i = 0; i < options.s_num_threads; i++) {
ret = rte_eth_tx_queue_setup(portid,
options.s_thr_info.at(i)->txqid, nb_txd,
rte_eth_dev_socket_id(portid), &txconf);
if (ret < 0)
return ret;
}
ret = rte_eth_dev_start(portid);
if (ret < 0)
return ret;
/* Display the port MAC address. */
struct rte_ether_addr addr {
};
ret = rte_eth_macaddr_get(portid, &addr);
if (ret != 0)
return ret;
// no promiscuous mode required
return 0;
}
// Log the effective configuration at INFO level (after option parsing).
static void
dump_options()
{
ntr(NTR_DEP_USER1, NTR_LEVEL_INFO,
"Configuration:\n"
"         verbosity = +%d\n"
"          run time = %d\n"
"       num threads = %d\n"
"    rage quit time = %ld\n"
"          cpu mask = 0x%lx\n"
"        slave mode = %d\n"
" interarrival dist = %s\n"
"         load dist = %s\n"
"               qps = %d\n"
"           host IP = 0x%x\n",
ntr_get_level(NTR_DEP_USER1) - NTR_LEVEL_WARNING, options.run_time,
options.s_num_threads, options.rage_quit_time, options.cpu_mask,
options.slave_mode, options.ia_gen, options.ld_gen,
options.target_qps, options.s_host_spec.ip);
}
// Print the command-line flag summary (shown for -h and on bad arguments).
static void
usage()
{
	// The text contains no conversion specifiers, so emit it verbatim.
	static const char *help_text =
	    "Usage:\n"
	    " -v(vv): verbose mode\n"
	    " -h: display the information\n"
	    " -t: run time\n"
	    " -s: server net spec\n"
	    " -S: slave(rat) mode\n"
	    " -A: affinity mask\n"
	    " -i: inter-arrival time distribution\n"
	    " -l: load distribution\n"
	    " -r: rage quit time (in ms)\n"
	    " -q: target QPS\n"
	    " -H: host net spec\n";
	fputs(help_text, stdout);
}
// rat (load generator) entry point. Order matters: EAL init -> option
// parsing -> libnm init -> port discovery -> mempool -> per-thread generator
// setup -> port/queue init -> worker launch -> timed run (or slave-mode FIN
// wait) -> join workers -> report QPS.
int
main(int argc, char *argv[])
{
unsigned int nb_ports;
struct rte_mempool *mbuf_pool;
struct thread_info *tinfo;
bool has_host_spec = false;
ntr_init();
// init dpdk
int ret = rte_eal_init(argc, argv);
if (ret < 0) {
rte_exit(EXIT_FAILURE, "rte_eal_init failed!\n");
}
// EAL consumed `ret` args; the rest are application flags
argc -= ret;
argv += ret;
// set warning level
ntr_set_level(NTR_DEP_USER1, NTR_LEVEL_WARNING);
{
int c;
// parse arguments
while ((c = getopt(argc, argv, "vht:s:SA:i:l:r:q:H:")) != -1) {
switch (c) {
case 'v':
ntr_set_level(NTR_DEP_USER1,
ntr_get_level(NTR_DEP_USER1) + 1);
break;
case 'h':
usage();
rte_exit(EXIT_SUCCESS, "\n");
case 't':
options.run_time = strtol(optarg, nullptr, 10);
break;
case 's':
if (str_to_netspec(
optarg, &options.server_spec) != 0) {
rte_exit(EXIT_FAILURE,
"invalid server net spec\n");
}
break;
case 'S':
options.slave_mode = 1;
options.s_state =
STATE_SYNC; // set state to wait for SYNC
break;
case 'A':
// worker cpu mask; thread count = popcount of the mask
options.cpu_mask = strtoull(
optarg, nullptr, 16);
options.s_num_threads = cmask_get_num_cpus(
options.cpu_mask);
if (options.s_num_threads == 0) {
rte_exit(EXIT_FAILURE,
"invalid cpu mask 0x%lx\n",
options.cpu_mask);
}
break;
case 'i':
strncpy(options.ia_gen, optarg,
sizeof(options.ia_gen) - 1);
break;
case 'l':
strncpy(options.ld_gen, optarg,
sizeof(options.ld_gen) - 1);
break;
case 'r':
options.rage_quit_time = strtol(
optarg, nullptr, 10);
break;
case 'q':
options.target_qps = strtol(
optarg, nullptr, 10);
break;
case 'H':
has_host_spec = true;
if (str_to_netspec(
optarg, &options.s_host_spec) != 0) {
rte_exit(EXIT_FAILURE,
"invalid host net spec.\n");
}
break;
default:
usage();
rte_exit(
EXIT_FAILURE, "unknown argument: %c\n", c);
}
}
}
if (!has_host_spec) {
rte_exit(EXIT_FAILURE, "Must specify host IP.\n");
}
dump_options();
// init nm
if (nm_init(ntr_get_level(NTR_DEP_USER1) - NTR_LEVEL_WARNING) != 0) {
rte_exit(EXIT_FAILURE, "nm init failed!\n");
}
nb_ports = rte_eth_dev_count_avail();
if (nb_ports == 0) {
rte_exit(EXIT_FAILURE, "number of ports must be > 0\n");
}
// use the first available port
uint16_t portid = rte_eth_find_next(0);
if (portid == RTE_MAX_ETHPORTS) {
rte_exit(EXIT_FAILURE, "cannot find an available port\n");
}
options.s_portid = portid;
if (rte_eth_macaddr_get(portid, &options.s_host_spec.mac_addr) != 0) {
rte_exit(EXIT_FAILURE, "cannot get mac address of port %d\n",
portid);
}
// create a mbuf memory pool on the socket
// (on the port's NUMA socket so packet buffers stay local)
mbuf_pool = rte_pktmbuf_pool_create("MBUF_POOL", MBUF_MAX_COUNT,
MBUF_CACHE_SIZE, 0, RTE_MBUF_DEFAULT_BUF_SIZE,
rte_eth_dev_socket_id(options.s_portid));
if (mbuf_pool == nullptr) {
rte_exit(EXIT_FAILURE, "cannot create mbuf pool\n");
}
options.mbuf_pool = mbuf_pool;
// per-thread setup: generators, lcore assignment, queue ids
// (before port_init so queue setup can read rxqid/txqid)
uint64_t cmask = options.cpu_mask;
for (unsigned int i = 0; i < options.s_num_threads; i++) {
tinfo = new thread_info;
tinfo->ia_gen = createGenerator(options.ia_gen);
tinfo->load_gen = createGenerator(options.ld_gen);
if (tinfo->ia_gen == nullptr || tinfo->load_gen == nullptr) {
rte_exit(
EXIT_FAILURE, "invalid ia_gen or ld_gen string\n");
}
// split the target QPS evenly across threads
tinfo->ia_gen->set_lambda((double)options.target_qps /
(double)(options.s_num_threads));
tinfo->id = i;
tinfo->lcore_id = cmask_get_next_cpu(&cmask);
tinfo->rxqid = i;
tinfo->txqid = i;
options.s_thr_info.push_back(tinfo);
}
if (port_init(portid, mbuf_pool) != 0) {
rte_exit(EXIT_FAILURE, "cannot init port %d\n", portid);
}
ntr(NTR_DEP_USER1, NTR_LEVEL_INFO,
"Configured port %d with mac addr %x:%x:%x:%x:%x:%x\n", portid,
options.s_host_spec.mac_addr.addr_bytes[0],
options.s_host_spec.mac_addr.addr_bytes[1],
options.s_host_spec.mac_addr.addr_bytes[2],
options.s_host_spec.mac_addr.addr_bytes[3],
options.s_host_spec.mac_addr.addr_bytes[4],
options.s_host_spec.mac_addr.addr_bytes[5]);
// give the link time to come up before generating load
sleep(INIT_DELAY);
for (unsigned int i = 0; i < options.s_num_threads; i++) {
tinfo = options.s_thr_info.at(i);
ntr(NTR_DEP_USER1, NTR_LEVEL_INFO,
"main: launching thread %d on locore %d\n", tinfo->id,
tinfo->lcore_id);
if (rte_eal_remote_launch(locore_main,
(void *)options.s_thr_info.at(i),
tinfo->lcore_id) != 0) {
rte_exit(EXIT_FAILURE,
"failed to launch function on locore %d\n",
tinfo->lcore_id);
}
}
// poor man's timer
uint32_t second = 0;
uint32_t qps = 0;
// this loop exit is signaled by SYNC_FIN in slave mode and by itself in
// non slave mode
while (options.s_state.load() != STATE_FIN) {
if (options.slave_mode != 1) {
if (second >= options.run_time) {
options.s_state.store(STATE_FIN);
qps = calc_qps(nm_get_uptime_ns());
break;
}
usleep(1 * S2US);
second++;
}
}
for (unsigned int i = 0; i < options.s_num_threads; i++) {
tinfo = options.s_thr_info.at(i);
ntr(NTR_DEP_USER1, NTR_LEVEL_INFO,
"main: waiting for locore %d...\n", tinfo->lcore_id);
if (rte_eal_wait_lcore(tinfo->lcore_id) != 0) {
rte_exit(EXIT_FAILURE, "failed to wait for locore %d\n",
tinfo->lcore_id);
}
}
// in slave mode the QPS is reported to the master in FIN_ACK instead
if (options.slave_mode != 1) {
ntr(NTR_DEP_USER1, NTR_LEVEL_INFO, "main: total QPS = %d\n",
qps);
}
// clean up
rte_eth_dev_stop(portid);
return 0;
}

38
scripts/compile.sh Executable file
View File

@ -0,0 +1,38 @@
#!/bin/sh
# Sync this repository to each build server and compile it there, all
# servers in parallel.
# Usage: ./compile.sh [user]   (defaults to the current local user)

test_dir="/numam.d"
root=".."
servers="skylake2.rcs.uwaterloo.ca skylake3.rcs.uwaterloo.ca skylake6.rcs.uwaterloo.ca"
rsync_flags="-vchr"
ssh_args="-o StrictHostKeyChecking=no -p77"

user=$1
# BUGFIX: quote the expansion -- the old unquoted [ -z $user ] breaks when
# $1 contains whitespace or glob characters.
if [ -z "$user" ]
then
	user=$(whoami)
fi
echo "USER: $user"

compile() {
	# separate these functions because we might change kernel (reboot) without needing to recompile
	echo "====================$1===================="
	echo "Syncing directories..."
	# $ssh_args stays unquoted on purpose: it carries multiple options.
	# (The previous $(echo ...) wrappers were redundant word-splitting.)
	ssh $ssh_args "$user@$1" "sudo mkdir -p $test_dir"
	ssh $ssh_args "$user@$1" "sudo chmod 777 $test_dir"
	rsync $rsync_flags -e 'ssh -p 77' "$root/" "$user@$1:$test_dir/"
	echo "Compiling..."
	ssh $ssh_args "$user@$1" "sudo mkdir -p $test_dir/build; cd $test_dir/build; sudo rm -rf *; sudo cmake ../; sudo make clean all -j8" &
	wait
	echo "$1 Done."
	echo ""
}

i=0
for server in $servers
do
	i=$(expr $i + 1)
	compile "$server" &
done
wait

105
scripts/histo.py Normal file
View File

@ -0,0 +1,105 @@
import pandas as pd
import matplotlib.pyplot as plt
import matplotlib.mlab as mlab
import numpy as np
import sys
import re
import os
import json
import getopt
import math
import concurrent.futures as CF
import libpar as par
# Number of buckets per histogram plot.
num_bins = 1000
# Extra percentiles to report; not referenced in this part of the file.
extra_pct = []
def saveplot(fp: str, data: list, title: str):
    """Render a histogram of *data* and save it as '<fp>_<title>_.png'.

    The current matplotlib figure is cleared afterwards so repeated calls
    do not accumulate plots. (The old annotation `data : []` was a list
    literal, not a type; fixed to `list`.)
    """
    plt.hist(data, num_bins)  # num_bins is the module-wide bucket count
    plt.xlabel("Delay")
    plt.title(title)
    plt.ylabel("Frequency")
    fig = plt.gcf()
    fig.set_size_inches(11.69, 8.27)  # A4 landscape, in inches
    fig.savefig(fp + "_" + title + "_" + ".png", dpi=160)
    plt.clf()
    print("Generated - " + fp + "_" + title + "_" + ".png")
# Module-wide process pool sized to the CPU count; shut down at the end of main().
executor = CF.ProcessPoolExecutor(max_workers=int(os.cpu_count()))
def clean_data(dat: list, cutoff_pct: float = 99):
    """Drop outliers above the *cutoff_pct* percentile of *dat*.

    Returns a plain list containing every element <= the percentile cutoff.
    The default (99th percentile) preserves the original behavior; the
    cutoff is now a parameter instead of a hard-coded constant.
    """
    arr = np.array(dat)
    cutoff = np.percentile(arr, cutoff_pct)
    return [v for v in arr if v <= cutoff]
def process_file(each_dir):
    """Parse one khat sample file, plot the four delay histograms, and dump stats.

    Produces <file>_<title>_.png plots plus a <file>_stats.txt summary in
    mutilate's output format. Errors are logged and swallowed so one bad
    file does not stop the batch.
    """
    try:
        print("Processing " + each_dir + " ...")
        with open(each_dir, 'r') as f:
            parser = par.khat_parser()
            parser.parse(f.read())
        sh = []  # server hardware delay (hw tx - hw rx)
        ss = []  # server software delay (sw tx - sw rx)
        ch = []  # client hardware delay (hw rx - hw tx)
        cs = []  # client software delay (sw rx - sw tx)
        for pt in parser.datapt:
            sh.append(pt.s_htx - pt.s_hrx)
            ss.append(pt.s_stx - pt.s_srx)
            ch.append(pt.c_hrx - pt.c_htx)
            cs.append(pt.c_srx - pt.c_stx)
        # drop the top 1% outliers before plotting (see clean_data)
        sh = clean_data(sh)
        ss = clean_data(ss)
        ch = clean_data(ch)
        cs = clean_data(cs)
        saveplot(each_dir, sh, "server_hw_delay")
        saveplot(each_dir, ss, "server_sw_delay")
        saveplot(each_dir, ch, "client_hw_delay")
        saveplot(each_dir, cs, "client_sw_delay")
        # output median, etc.
        with open(each_dir + "_" + "stats.txt", 'w') as f:
            f.write("===================== SERVER HW ====================\n")
            f.write(par.mutilate_data.build_mut_output(sh, [len(sh)]))
            f.write("\n===================== SERVER SW ====================\n")
            f.write(par.mutilate_data.build_mut_output(ss, [len(ss)]))
            f.write("\n===================== CLIENT HW ====================\n")
            f.write(par.mutilate_data.build_mut_output(ch, [len(ch)]))
            f.write("\n===================== CLIENT SW ====================\n")
            f.write(par.mutilate_data.build_mut_output(cs, [len(cs)]))
    except Exception:
        print("Unexpected error:", sys.exc_info())
def process_dir(rootdir):
    """Recursively walk *rootdir*, processing every '*sample.txt' / '*.sample' file."""
    for entry in os.listdir(rootdir):
        path = os.path.join(rootdir, entry)
        if not os.path.isfile(path):
            process_dir(path)
        elif path.endswith("sample.txt") or path.endswith(".sample"):
            process_file(path)
def main():
    """Entry point: parse '-d <dir>' and histogram every sample file under it.

    Raises Exception when -d is not supplied.
    """
    datdir = None
    options = getopt.getopt(sys.argv[1:], 'd:')[0]
    for opt, arg in options:
        # was `opt in ('-d')` — string membership, not a tuple test; use equality
        if opt == '-d':
            datdir = arg
    if datdir is None:
        raise Exception("Must specify -d parameter")
    process_dir(datdir)
    executor.shutdown()


if __name__ == "__main__":
    main()

113
scripts/libs/libpar.py Normal file
View File

@ -0,0 +1,113 @@
import json
import numpy as np
class khat_parser:
    """Parses khat CSV output: one 8-field line of timestamps per request."""

    class pt:
        """One data point of server/client hardware & software tx/rx timestamps."""

        def __init__(self):
            # s_* = server side, c_* = client side
            # h = hardware timestamp, s = software timestamp, tx/rx = direction
            self.s_htx = 0
            self.s_hrx = 0
            self.s_stx = 0
            self.s_srx = 0
            self.c_htx = 0
            self.c_hrx = 0
            self.c_stx = 0
            self.c_srx = 0

    def __init__(self):
        self.datapt = []

    def parse(self, output: str):
        """Append one pt per CSV line of *output*.

        Raises Exception on any line without exactly 8 comma-separated fields.
        """
        for line in output.splitlines():
            fields = line.split(',')
            if len(fields) != 8:
                raise Exception("Invalid line:" + line)
            point = self.pt()
            (point.c_srx, point.c_stx, point.c_hrx, point.c_htx,
             point.s_srx, point.s_stx, point.s_hrx, point.s_htx) = map(int, fields)
            self.datapt.append(point)
class mutilate_data:
    """Holds parsed mutilate benchmark results: latency table plus throughput."""

    def __init__(self):
        self.dat = {}  # percentile/stat name -> latency value
        self.qps = 0   # measured throughput

    def to_string(self):
        """Human-readable dump: throughput line plus the latency dict as JSON."""
        ret = "Throughput: " + str(self.qps) + "\n" + json.dumps(self.dat)
        return ret

    @staticmethod
    def parse_mut_output(output):
        """Parse mutilate stdout into a mutilate_data.

        Handles both the 10-column table (with 50th pct) and the legacy
        9-column one. Raises Exception when either the QPS line or the
        'read' latency line cannot be parsed.
        """
        ret = mutilate_data()
        succ_qps = False
        succ_read = False
        # column labels aligned with the whitespace-split fields (index 0 unused)
        table = [None, "avg", "std", "min", "5th", "10th", "50th", "90th", "95th", "99th"]
        table_legacy = [None, "avg", "std", "min", "5th", "10th", "90th", "95th", "99th"]
        for line in output.splitlines():
            if line.find("Total QPS") != -1:
                spl = line.split()
                if len(spl) == 7:
                    ret.qps = float(spl[3])
                    succ_qps = True
                else:
                    break
            elif line.find("read") != -1:
                spl = line.split()
                if len(spl) == 10:
                    for i in range(1, len(spl)):
                        ret.dat[table[i]] = float(spl[i])
                    succ_read = True
                elif len(spl) == 9:
                    for i in range(1, len(spl)):
                        ret.dat[table_legacy[i]] = float(spl[i])
                    succ_read = True
                else:
                    break
        if not (succ_qps and succ_read):
            raise Exception("Failed to parse data")
        return ret

    @staticmethod
    def parse_mut_sample(fn):
        """Read a two-column 'qps latency' sample file; returns (qps, lat) lists.

        Raises Exception on any line that does not have exactly two fields.
        (Now uses a context manager so the file is closed even on error.)
        """
        qps = []
        lat = []
        with open(fn, "r") as f:
            for line in f:
                entry = line.split()
                if len(entry) != 2:
                    raise Exception("Unrecognized line: " + line)
                qps.append(float(entry[0]))
                lat.append(float(entry[1]))
        return qps, lat

    # generate mutilate output format
    @staticmethod
    def build_mut_output(lat_arr, qps_arr):
        """Format latency stats and mean throughput in mutilate's table format.

        NOTE(review): the original had a stray line continuation ('\\') before
        the 'Total QPS' append, which was a syntax error; fixed here.
        """
        cols = ('avg', 'std', 'min', '5th', '10th', '50th', '90th', '95th', '99th')
        vals = (np.mean(lat_arr), np.std(lat_arr), np.min(lat_arr),
                np.percentile(lat_arr, 5), np.percentile(lat_arr, 10),
                np.percentile(lat_arr, 50), np.percentile(lat_arr, 90),
                np.percentile(lat_arr, 95), np.percentile(lat_arr, 99))
        output = '{0: <10}'.format('#type')
        for c in cols:
            output += '{0: >10}'.format(c)
        output += "\n"
        output += '{0: <10}'.format('read')
        output += ' '.join('{0: >10}'.format("{:.1f}".format(v)) for v in vals)
        output += ' ' + "\n"
        output += "\n" + "Total QPS = " + "{:.1f}".format(np.mean(qps_arr)) + " (0 / 0s)"
        return output

172
scripts/libs/libtc.py Normal file
View File

@ -0,0 +1,172 @@
import subprocess as sp
import time
import select
import os
import pwd
import sys
import datetime
import random
import re
from threading import Thread
# Run logfile opened by init(); None until then, in which case log_print
# only writes to stdout.
tc_logfile = None


def log_print(info):
    """Print *info* to stdout and mirror it into the run logfile if one is open."""
    print(info)
    # `is not None` instead of `!= None` (identity test for the None singleton)
    if tc_logfile is not None:
        tc_logfile.write(info + "\n")
        tc_logfile.flush()
tc_output_dir=""  # timestamped results directory; set by init()
tc_cur_test = ""  # name of the test currently between begin()/end()
tc_test_id = 0    # monotonically increasing test counter
def init(odir = "./results.d/"):
    """Create a timestamped output directory under *odir* and open the run logfile.

    Must be called before begin()/log_print(); sets the module-wide
    tc_output_dir and tc_logfile.
    """
    global tc_output_dir
    global tc_logfile
    # suffix with a timestamp so repeated runs never collide
    tc_output_dir = odir + "_" + datetime.datetime.now().strftime('%Y-%m-%d_%H-%M-%S')
    tc_output_dir = os.path.expanduser(tc_output_dir)
    # os.makedirs instead of shelling out to `mkdir -p`: no shell, no quoting issues
    os.makedirs(tc_output_dir, exist_ok=True)
    tc_logfile = open(tc_output_dir + "/log.txt", "w+")
def begin(name):
    """Start test case *name*: bump the test id, create its output dir, log a banner."""
    global tc_test_id
    global tc_cur_test
    tc_cur_test = name
    tc_test_id += 1
    # os.makedirs instead of shelling out to `mkdir -p`: no shell, no quoting issues
    os.makedirs(get_odir(), exist_ok=True)
    log_print("\n===== Test #" + str(tc_test_id) + " - " + tc_cur_test + " started =====")
def end():
    """Close the current test case with a completion banner and clear its name."""
    global tc_cur_test
    log_print(f"\n===== Test #{tc_test_id} - {tc_cur_test} completed =====")
    tc_cur_test = None
def get_odir():
    """Return the output directory of the test currently in progress."""
    return "{}/{}".format(tc_output_dir, tc_cur_test)
# Scheduler selection values (packed into the low byte of the flag word)
SCHED_QUEUE = 1
SCHED_CPU = 2
SCHED_BEST = 4
# Scheduler feature bits
SCHED_FEAT_WS = 1


def make_sched_flag(sched, args, feat = 0, fargs = 0):
    """Pack scheduler/args/feature/feature-args into one 32-bit flag word.

    Layout (LSB first): sched[7:0] | args[15:8] | feat[23:16] | fargs[31:24].
    """
    packed = sched & 0xFF
    packed |= (args & 0xFF) << 8
    packed |= (feat & 0xFF) << 16
    packed |= (fargs & 0xFF) << 24
    return packed
# Tunable object ids (packed into the low 16 bits of the tune word)
TUNE_RTSHARE = 2
TUNE_TFREQ = 1


def make_tune_flag(obj, val):
    """Pack a tunable id and its value: obj[15:0] | val[31:16]."""
    return ((val & 0xFFFF) << 16) | (obj & 0xFFFF)
def get_username():
    """Return the login name of the user running this process."""
    return pwd.getpwuid(os.getuid()).pw_name
# Extra ssh CLI arguments prepended to every remote_exec command
ssh_param = ""


def set_ssh_param(para):
    """Set the ssh argument string used for all subsequent remote commands."""
    global ssh_param
    ssh_param = para
# Username for ssh connections; None means "use the local default"
ssh_user = None


def set_ssh_user(user):
    """Set the ssh username used for all subsequent remote commands."""
    global ssh_user
    ssh_user = user
def remote_exec(srv, cmd, blocking=True, check=True):
    """Run *cmd* over ssh on every host in *srv*.

    Returns the list of Popen objects. When *blocking* is True, waits for
    all of them; when *check* is also True, raises Exception on any
    nonzero exit status.
    NOTE(review): cmd is interpolated into a shell string (shell=True) —
    this assumes trusted callers; confirm no untrusted input reaches cmd.
    """
    sub = []
    for s in srv:
        # ssh_user/ssh_param are module-level settings (set_ssh_user/set_ssh_param)
        p = sp.Popen(["ssh " + ssh_param + " " + ((ssh_user + "@") if ssh_user != None else "") + s + " \"" + cmd +"\""], shell=True, stdout=sp.PIPE, stderr=sp.PIPE)
        sub.append(p)
    if blocking:
        for p in sub:
            p.wait()
            if check and p.returncode != 0:
                raise Exception("Command failed " + cmd)
    return sub
def scan_stderr(p, exclude = None):
    """Scan a subprocess's stderr stream for unexpected output.

    Empty lines and lines matching any regex in *exclude* are ignored.
    Returns False (after logging) on the first unexpected line; True if
    the stream is exhausted without one.
    """
    for err in p.stderr:
        err = err.decode().strip()
        if len(err) == 0:
            continue
        fail = True
        # `is not None` instead of `!= None` (identity test for None)
        if exclude is not None:
            for exc in exclude:
                if (exc is not None) and (re.match(exc, err) is not None):
                    fail = False
                    break
        if fail:
            log_print("Error detected: " + err)
            return False
    return True
# stderr threads: one monitor thread per watched subprocess
errthr_objs = []        # Thread objects registered by errthr_create()
errthr_sigstop = False  # set True (errthr_stop) to ask all monitor threads to exit
errthr_failed = False   # set True by any monitor thread that saw unexpected stderr
def errthr_get_failed():
    """Return True if any stderr-monitor thread has detected unexpected output."""
    return errthr_failed
def thr_check_stderr(p : sp.Popen, exclude):
    """Monitor-thread body: repeatedly scan *p*'s stderr until errthr_sigstop.

    Sets the module-wide errthr_failed flag when scan_stderr reports an
    unexpected line.
    """
    global errthr_failed
    while(not errthr_sigstop):
        if not scan_stderr(p, exclude=exclude):
            errthr_failed = True
        # jittered sleep so multiple monitor threads don't poll in lockstep
        time.sleep(0.5 + random.uniform(-0.1, 0.1))
def errthr_start():
    """Clear the stop/failure flags, then launch every registered monitor thread."""
    global errthr_sigstop, errthr_failed
    errthr_sigstop = False
    errthr_failed = False
    for monitor in errthr_objs:
        monitor.start()
def errthr_create(cp, exclude = None):
    """Register one stderr-monitor Thread per subprocess in *cp* (not started yet)."""
    global errthr_objs
    errthr_objs.extend(
        Thread(target=thr_check_stderr, args=(proc, exclude)) for proc in cp)
def errthr_stop():
    """Signal all monitor threads to exit, join them, and drop the registry."""
    global errthr_objs
    global errthr_sigstop
    errthr_sigstop = True
    for monitor in errthr_objs:
        monitor.join()
    errthr_objs.clear()
def parse_hostfile(fp):
    """Parse a hostname-mapping file whose lines look like 'alias real-host'.

    Returns a dict alias -> real hostname. Lines with fewer than two
    space-separated fields are ignored. (Now uses a context manager so the
    file handle is closed even on error.)
    """
    ret = {}
    with open(fp, "r") as fh:
        content = [x.strip() for x in fh.readlines()]
    for line in content:
        spl = line.split(" ")
        if len(spl) >= 2:
            ret[spl[0]] = spl[1]
            log_print("Parsed: hostname \"" + spl[0] + "\" -> \"" + spl[1] + "\"")
    return ret
def process_hostnames(names, hosts):
    """Map each name through the *hosts* alias table, leaving unknown names as-is."""
    return [hosts.get(n, n) for n in names]
def get_cpuset_core(threads):
    """Build a 'cpuset -l 0-N ' command prefix restricting to cores 0..(2*threads-1)."""
    return f"cpuset -l 0-{threads * 2 - 1} "

229
scripts/run.py Executable file
View File

@ -0,0 +1,229 @@
import subprocess as sp
import time
import select
import os
import datetime
import pwd
import sys
import getopt
import numpy as np
import re
import libpar as par
import libtc as tc
# Load-sweep tuning knobs (used by the commented-out sweep loop in main;
# the current code runs each affinity at max load instead).
step_inc_pct = 100
init_step = 20000 #
start_step = 10000
term_qps = 85000000000
term_pct = 1
inc_pct = 50
server_port = 23444
# paths
test_dir = "/numam.d/build"
file_dir = os.path.dirname(os.path.realpath(__file__))
root_dir = os.path.join(file_dir,"..")
sample_filename = "sample.txt"
# Server affinity masks to sweep; one experiment per mask.
affinity = [
    "0x4", # core 2
    "0x400", # core 10
    "0x100000", # core 20
    "0x1000000", # core 24
    "0x40000000", # core 30
    "0x10000000000" # core 40
]
# Hosts and their NIC MAC addresses (master drives load, server runs khat).
master = ["skylake3.rcs.uwaterloo.ca"]
master_mac = ["3c:15:fb:c9:f3:4b"]
server = ["skylake2.rcs.uwaterloo.ca"]
server_mac = ["3c:15:fb:c9:f3:36"]
clients = []
client_mac = []
rage_quit = 1000 #1s
warmup = 5      # seconds of warmup before measurement
duration = 25   # measured seconds per experiment
cooldown = 0    # seconds to idle between experiments
cacheline = 0
SSH_PARAM = "-o StrictHostKeyChecking=no -p77"
SSH_USER = "oscar"
# Runtime flags, set from CLI options in main().
hostfile = None
lockstat = False
client_only = False
def stop_all():
    """Kill any leftover benchmark processes on clients, server, and master."""
    targets = [("clients", clients, "rat")]
    if not client_only:
        targets.append(("server", server, "khat"))
    targets.append(("master", master, "cat"))
    for label, hosts, proc in targets:
        tc.log_print("Stopping " + label + "...")
        tc.remote_exec(hosts, "sudo killall -9 " + proc, check=False)
def get_client_str(clt):
    """Build the ' -a <client> ' argument string for every client host."""
    return " " + "".join(" -a " + c + " " for c in clt)
def run_exp(sc, ld):
    """Run one experiment: start khat on the server, then the cat master.

    *sc* is the server affinity mask; *ld* is the offered load (currently
    unused — the master runs with a fixed interarrival). Retries the whole
    experiment until the master process exits cleanly.
    """
    while True:
        if client_only:
            ssrv = None
        else:
            # start server
            tc.log_print("Starting server...")
            server_cmd = "sudo " + test_dir + "/khat -- -A " + sc
            tc.log_print(server_cmd)
            ssrv = tc.remote_exec(server, server_cmd, blocking=False)
        # give the server time to come up before offering load
        time.sleep(3)
        # start master
        tc.log_print("Starting master...")
        master_cmd = "sudo " + test_dir + "/cat -- " + \
            " -s " + server_mac[0] + \
            " -o " + test_dir + "/" + sample_filename + \
            " -t " + str(duration) + \
            " -T " + str(warmup) + \
            " -i fixed:0.001" + \
            " -r " + str(rage_quit) + \
            " -A 0x4"
        tc.log_print(master_cmd)
        # renamed from 'sp': the old local shadowed the `subprocess as sp` import
        procs = tc.remote_exec(master, master_cmd, blocking=False)
        p = procs[0]
        # launch stderr monitoring threads (DPDK EAL chatter is expected noise)
        tc.errthr_create(procs, exclude=[".*EAL.*"])
        if ssrv is not None:
            # bug fix: the old code passed ssrv unconditionally and crashed
            # iterating None in client_only mode
            tc.errthr_create(ssrv, exclude=[".*EAL.*"])
        tc.errthr_start()
        success = False
        cur = 0
        while True:
            # either failed or timeout
            # we use failure detection to save time for long durations
            if tc.errthr_get_failed() or cur >= int(warmup + duration) + 5:
                break
            if p.poll() is not None:
                success = True
                break
            time.sleep(1)
            cur = cur + 1
        stop_all()
        tc.errthr_stop()
        print("Cooling down...")
        time.sleep(cooldown)
        if success:
            return
def keep_results():
    """Copy the sample file from the master into the test output dir and log its size."""
    scpcmd = "scp -P77 oscar@" + master[0] + ":" + test_dir + "/" + sample_filename + " " + tc.get_odir() + "/sample.txt"
    tc.log_print(scpcmd)
    # 'sp' here is the subprocess module (import subprocess as sp)
    sp.check_call(scpcmd, shell=True)
    with open(tc.get_odir() + "/sample.txt", 'r') as f:
        # one line per completed request
        tc.log_print("Total requests: " + str(len(f.readlines())))
    return
def main():
    """Parse CLI flags, resolve hostnames, and run one experiment per affinity mask.

    Options: -h <hostfile> alias mapping, -s stop leftover processes and
    exit, -c client-only mode (don't manage the server).
    """
    global hostfile
    global server
    global master
    global clients
    global client_only
    tc.set_ssh_param(SSH_PARAM)
    tc.set_ssh_user(SSH_USER)
    options = getopt.getopt(sys.argv[1:], 'h:sldcp')[0]
    for opt, arg in options:
        # was `opt in ('-h')` — string membership, not a tuple test; use equality
        if opt == '-h':
            hostfile = arg
        elif opt == '-s':
            # stop leftover processes everywhere, then exit
            stop_all()
            return
        elif opt == '-c':
            client_only = True
    tc.init("~/results.d/numam/")
    tc.log_print("Configuration:\n" +
                 "Hostfile: " + ("None" if hostfile is None else hostfile) + "\n"
                 "Client only: " + str(client_only) + "\n")
    if hostfile is not None:
        hosts = tc.parse_hostfile(hostfile)
        server = tc.process_hostnames(server, hosts)
        clients = tc.process_hostnames(clients, hosts)
        master = tc.process_hostnames(master, hosts)
    stop_all()
    for eaff in affinity:
        tc.begin(eaff)
        tc.log_print("============ Affinity: " + str(eaff) + " Load: MAX" + " ============")
        run_exp(eaff, 0)
        keep_results()
        stop_all()
        tc.end()
    stop_all()


main()