diff --git a/CMakeLists.txt b/CMakeLists.txt index 1265ab2..f1b980d 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -1,16 +1,18 @@ cmake_minimum_required(VERSION 3.0) -find_program(CC_GCC clang) -find_program(CXX_GCC clang++) - -set(CMAKE_C_COMPILER ${CC_GCC}) -set(CMAKE_CXX_COMPILER ${CXX_GCC}) - project(khat) list(APPEND CMAKE_MODULE_PATH "${CMAKE_SOURCE_DIR}") -find_package(dpdk REQUIRED) -find_package(Hwloc REQUIRED) +find_package(PkgConfig REQUIRED) + +pkg_check_modules(HWLOC hwloc REQUIRED) +pkg_check_modules(DPDK libdpdk) +pkg_check_modules(SPDK spdk_event_bdev spdk_env_dpdk) +pkg_check_modules(SPDK_SYS spdk_syslibs) +pkg_check_modules(UUID uuid) +# get_filename_component(ISAL_LIB_PATH ${SPDK_INCLUDE_DIRS} DIRECTORY) +# get_filename_component(ISAL_LIB_PATH ${ISAL_LIB_PATH} DIRECTORY) +# set(ISAL_LIB_PATH ${ISAL_LIB_PATH}/isa-l/.libs) set(CC_FLAGS -O2 -g -Wall -Wextra -Werror -std=c++11 -Wno-deprecated-declarations @@ -20,46 +22,41 @@ set(CC_FLAGS -O2 -g -Wall -Wextra -Werror -std=c++11 -msse4 -mavx) - include_directories(${CMAKE_SOURCE_DIR}/inc) -include_directories(${dpdk_INCLUDE_DIRS}) -include_directories(${Hwloc_INCLUDE_DIRS}) set(LIBNM_CC_FLAGS -O2 -g -Wall -Wextra -Werror -std=c++11 -mavx -msse4) set(LIBNTR_C_FLAGS -O2 -g -Wall -Wextra -Werror -std=c11) set(LIBGEN_CC_FLAGS -O2 -g -Wall -Wextra -Werror -std=c++11) -set(KHAT_LINKLIBS pthread nm ntr ${dpdk_LIBRARIES}) -set(CAT_LINKLIBS pthread nm ntr gen ${dpdk_LIBRARIES}) -set(RAT_LINKLIBS pthread nm ntr gen ${dpdk_LIBRARIES}) -set(MEMLOAD_LINKLIBS pthread nm ntr) - -add_library(ntr libntr/ntr.c) +add_library(ntr STATIC libntr/ntr.c) target_compile_options(ntr PRIVATE ${LIBNTR_C_FLAGS}) -add_library(gen libgen/generator.cc) +add_library(gen STATIC libgen/generator.cc) target_compile_options(gen PRIVATE ${LIBGEN_CC_FLAGS}) -add_library(nm libnm/nm.cc libnm/alloc.cc libnm/loadgen.cc libnm/topo.cc) -target_link_libraries(nm gen ${Hwloc_LIBRARIES}) -target_compile_options(nm PRIVATE ${LIBNM_CC_FLAGS}) +add_library(nm STATIC libnm/nm.cc libnm/alloc.cc libnm/loadgen.cc libnm/topo.cc) +target_include_directories(nm PRIVATE ${HWLOC_INCLUDE_DIRS}) +target_link_libraries(nm PRIVATE gen ${HWLOC_LINK_LIBRARIES}) +target_compile_options(nm PRIVATE ${LIBNM_CC_FLAGS} ${HWLOC_CFLAGS}) -add_executable(khat net/khat.cc) -target_link_libraries(khat ${KHAT_LINKLIBS}) -target_compile_options(khat PRIVATE ${CC_FLAGS}) -target_link_options(khat PRIVATE -L /usr/local/lib) +add_executable(khat EXCLUDE_FROM_ALL net/khat.cc) +target_link_libraries(khat PRIVATE pthread nm ntr ${DPDK_LINK_LIBRARIES}) +target_compile_options(khat PRIVATE ${CC_FLAGS} ${DPDK_CFLAGS}) -add_executable(cat net/cat.cc) -target_link_libraries(cat ${CAT_LINKLIBS}) -target_compile_options(cat PRIVATE ${CC_FLAGS}) -target_link_options(cat PRIVATE -L /usr/local/lib) +add_executable(cat EXCLUDE_FROM_ALL net/cat.cc) +target_link_libraries(cat PRIVATE pthread nm ntr gen ${DPDK_LINK_LIBRARIES}) +target_compile_options(cat PRIVATE ${CC_FLAGS} ${DPDK_CFLAGS}) -add_executable(rat net/rat.cc) -target_link_libraries(rat ${RAT_LINKLIBS}) -target_compile_options(rat PRIVATE ${CC_FLAGS}) -target_link_options(rat PRIVATE -L /usr/local/lib) +add_executable(rat EXCLUDE_FROM_ALL net/rat.cc) +target_link_libraries(rat PRIVATE pthread nm ntr gen ${DPDK_LINK_LIBRARIES}) +target_compile_options(rat PRIVATE ${CC_FLAGS} ${DPDK_CFLAGS}) + +add_executable(birb EXCLUDE_FROM_ALL storage/birb.cc storage/io_gen.cc) +target_include_directories(birb PRIVATE ${SPDK_INCLUDE_DIRS} ${DPDK_INCLUDE_DIRS} ${UUID_INCLUDE_DIRS}) +target_compile_options(birb PRIVATE ${CC_FLAGS} ${SPDK_CFLAGS} ${UUID_CFLAGS}) +target_link_directories(birb PRIVATE ${SPDK_LIBRARY_DIRS} ${SPDK_SYS_STATIC_LIBRARY_DIRS} ${UUID_LIBRARY_DIRS}) +target_link_libraries(birb PRIVATE pthread nm ntr gen -Wl,--whole-archive ${SPDK_LIBRARIES} -Wl,--no-whole-archive ${SPDK_SYS_STATIC_LIBRARIES}) add_executable(memloadgen util/memloadgen.cc) -target_link_libraries(memloadgen ${MEMLOAD_LINKLIBS}) -target_compile_options(memloadgen PRIVATE ${CC_FLAGS}) -target_link_options(memloadgen PRIVATE -L /usr/local/lib) \ No newline at end of file +target_link_libraries(memloadgen PRIVATE pthread nm ntr) +target_compile_options(memloadgen PRIVATE ${CC_FLAGS}) \ No newline at end of file diff --git a/FindHwloc.cmake b/FindHwloc.cmake deleted file mode 100644 index 4092a89..0000000 --- a/FindHwloc.cmake +++ /dev/null @@ -1,213 +0,0 @@ -#.rst: -# FindHwloc -# ---------- -# -# Try to find Portable Hardware Locality (hwloc) libraries. -# http://www.open-mpi.org/software/hwloc -# -# You may declare HWLOC_ROOT environment variable to tell where -# your hwloc library is installed. -# -# Once done this will define:: -# -# Hwloc_FOUND - True if hwloc was found -# Hwloc_INCLUDE_DIRS - include directories for hwloc -# Hwloc_LIBRARIES - link against these libraries to use hwloc -# Hwloc_VERSION - version -# Hwloc_CFLAGS - include directories as compiler flags -# Hwloc_LDLFAGS - link paths and libs as compiler flags -# - -#============================================================================= -# Copyright 2014 Mikael Lepistö -# -# Distributed under the OSI-approved BSD License (the "License"); -# -# This software is distributed WITHOUT ANY WARRANTY; without even the -# implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. -# See the License for more information. -#============================================================================= - -if(WIN32) - find_path(Hwloc_INCLUDE_DIR - NAMES - hwloc.h - PATHS - ENV "PROGRAMFILES(X86)" - ENV HWLOC_ROOT - PATH_SUFFIXES - include - ) - - find_library(Hwloc_LIBRARY - NAMES - libhwloc.lib - PATHS - ENV "PROGRAMFILES(X86)" - ENV HWLOC_ROOT - PATH_SUFFIXES - lib - ) - - # - # Check if the found library can be used to linking - # - SET (_TEST_SOURCE "${CMAKE_BINARY_DIR}${CMAKE_FILES_DIRECTORY}/linktest.c") - FILE (WRITE "${_TEST_SOURCE}" - " - #include - int main() - { - hwloc_topology_t topology; - int nbcores; - hwloc_topology_init(&topology); - hwloc_topology_load(topology); - nbcores = hwloc_get_nbobjs_by_type(topology, HWLOC_OBJ_CORE); - hwloc_topology_destroy(topology); - return 0; - } - " - ) - - TRY_COMPILE(_LINK_SUCCESS ${CMAKE_BINARY_DIR} "${_TEST_SOURCE}" - CMAKE_FLAGS - "-DINCLUDE_DIRECTORIES:STRING=${Hwloc_INCLUDE_DIR}" - CMAKE_FLAGS - "-DLINK_LIBRARIES:STRING=${Hwloc_LIBRARY}" - ) - - IF(NOT _LINK_SUCCESS) - if(CMAKE_SIZEOF_VOID_P EQUAL 8) - message(STATUS "You are building 64bit target.") - ELSE() - message(STATUS "You are building 32bit code. If you like to build x64 use e.g. -G 'Visual Studio 12 Win64' generator." ) - ENDIF() - message(FATAL_ERROR "Library found, but linking test program failed.") - ENDIF() - - # - # Resolve version if some compiled binary found... - # - find_program(HWLOC_INFO_EXECUTABLE - NAMES - hwloc-info - PATHS - ENV HWLOC_ROOT - PATH_SUFFIXES - bin - ) - - if(HWLOC_INFO_EXECUTABLE) - execute_process( - COMMAND ${HWLOC_INFO_EXECUTABLE} "--version" - OUTPUT_VARIABLE HWLOC_VERSION_LINE - OUTPUT_STRIP_TRAILING_WHITESPACE - ) - string(REGEX MATCH "([0-9]+.[0-9]+)$" - Hwloc_VERSION "${HWLOC_VERSION_LINE}") - unset(HWLOC_VERSION_LINE) - endif() - - # - # All good - # - - set(Hwloc_LIBRARIES ${Hwloc_LIBRARY}) - set(Hwloc_INCLUDE_DIRS ${Hwloc_INCLUDE_DIR}) - - include(FindPackageHandleStandardArgs) - find_package_handle_standard_args( - Hwloc - FOUND_VAR Hwloc_FOUND - REQUIRED_VARS Hwloc_LIBRARY Hwloc_INCLUDE_DIR Hwloc_VERSION_PARSED Hwloc_VERSION_MAJOR Hwloc_VERSION_MINOR - VERSION_VAR Hwloc_VERSION) - - mark_as_advanced( - Hwloc_INCLUDE_DIR - Hwloc_LIBRARY) - - foreach(arg ${Hwloc_INCLUDE_DIRS}) - set(Hwloc_CFLAGS "${Hwloc_CFLAGS} /I${arg}") - endforeach() - - set(Hwloc_LDFLAGS "${Hwloc_LIBRARY}") - -else() - - if(CMAKE_CROSSCOMPILING) - - find_path(Hwloc_INCLUDE_DIRS - NAMES - hwloc.h - PATHS - ENV HWLOC_ROOT - ) - - find_library(Hwloc_LIBRARIES - NAMES - hwloc - PATHS - ENV HWLOC_ROOT - ) - - if(Hwloc_INCLUDE_DIRS AND Hwloc_LIBRARIES) - message(WARNING "HWLOC library found using find_library() - cannot determine version. Assuming 1.7.0") - set(Hwloc_FOUND 1) - set(Hwloc_VERSION "1.7.0") - endif() - - else() # Find with pkgconfig for non-crosscompile builds - - find_package(PkgConfig) - - if(HWLOC_ROOT) - set(ENV{PKG_CONFIG_PATH} "${HWLOC_ROOT}/lib/pkgconfig") - else() - foreach(PREFIX ${CMAKE_PREFIX_PATH}) - set(PKG_CONFIG_PATH "${PKG_CONFIG_PATH}:${PREFIX}/lib/pkgconfig") - endforeach() - set(ENV{PKG_CONFIG_PATH} "${PKG_CONFIG_PATH}:$ENV{PKG_CONFIG_PATH}") - endif() - - if(hwloc_FIND_REQUIRED) - set(_hwloc_OPTS "REQUIRED") - elseif(hwloc_FIND_QUIETLY) - set(_hwloc_OPTS "QUIET") - else() - set(_hwloc_output 1) - endif() - - if(hwloc_FIND_VERSION) - if(hwloc_FIND_VERSION_EXACT) - pkg_check_modules(Hwloc ${_hwloc_OPTS} hwloc=${hwloc_FIND_VERSION}) - else() - pkg_check_modules(Hwloc ${_hwloc_OPTS} hwloc>=${hwloc_FIND_VERSION}) - endif() - else() - pkg_check_modules(Hwloc ${_hwloc_OPTS} hwloc) - endif() - - if(Hwloc_FOUND) - string(REPLACE "." ";" Hwloc_VERSION_PARSED "${Hwloc_VERSION}") - set(Hwloc_VERSION "${Hwloc_VERSION}" CACHE STRING "version of Hwloc as a list") - list(GET Hwloc_VERSION_PARSED 0 Hwloc_VERSION_MAJOR) - set(Hwloc_VERSION_MAJOR "${Hwloc_VERSION_MAJOR}" CACHE STRING "Major version of Hwloc") - list(GET Hwloc_VERSION_PARSED 1 Hwloc_VERSION_MINOR) - set(Hwloc_VERSION_MINOR "${Hwloc_VERSION_MINOR}" CACHE STRING "Minor version of Hwloc") - - include(FindPackageHandleStandardArgs) - find_package_handle_standard_args(Hwloc DEFAULT_MSG Hwloc_LIBRARIES) - - if(NOT ${Hwloc_VERSION} VERSION_LESS 1.7.0) - set(Hwloc_GL_FOUND 1) - endif() - - if(_hwloc_output) - message(STATUS - "Found hwloc ${Hwloc_VERSION} in ${Hwloc_INCLUDE_DIRS}:${Hwloc_LIBRARIES}") - endif() - endif() - - endif() # cross-compile else - -endif() \ No newline at end of file diff --git a/Finddpdk.cmake b/Finddpdk.cmake deleted file mode 100644 index 96fc603..0000000 --- a/Finddpdk.cmake +++ /dev/null @@ -1,142 +0,0 @@ -# Try to find dpdk -# -# Once done, this will define -# -# dpdk::dpdk -# dpdk_FOUND -# dpdk_INCLUDE_DIR -# dpdk_LIBRARIES - -find_package(PkgConfig QUIET) -if(PKG_CONFIG_FOUND) - pkg_check_modules(dpdk QUIET libdpdk) -endif() - -if(dpdk_INCLUDE_DIRS) - # good -elseif(TARGET dpdk::dpdk) - get_target_property(dpdk_INCLUDE_DIRS - dpdk::dpdk INTERFACE_INCLUDE_DIRECTORIES) -else() - find_path(dpdk_config_INCLUDE_DIR rte_config.h - HINTS - ENV DPDK_DIR - PATH_SUFFIXES - dpdk - include) - find_path(dpdk_common_INCLUDE_DIR rte_common.h - HINTS - ENC DPDK_DIR - PATH_SUFFIXES - dpdk - include) - set(dpdk_INCLUDE_DIRS "${dpdk_config_INCLUDE_DIR}") - if(NOT dpdk_config_INCLUDE_DIR EQUAL dpdk_common_INCLUDE_DIR) - list(APPEND dpdk_INCLUDE_DIRS "${dpdk_common_INCLUDE_DIR}") - endif() -endif() - -set(components - bus_pci - bus_vdev - cfgfile - cmdline - eal - ethdev - hash - kvargs - mbuf - mempool - mempool_ring - mempool_stack - net - pci - pmd_af_packet - pmd_bnxt - pmd_bond - pmd_cxgbe - pmd_e1000 - pmd_ena - pmd_enic - pmd_i40e - pmd_ixgbe - pmd_mlx5 - pmd_nfp - pmd_qede - pmd_ring - pmd_sfc_efx - pmd_vmxnet3_uio - ring - timer) - -# for collecting dpdk library targets, it will be used when defining dpdk::dpdk -set(_dpdk_libs) -# for list of dpdk library archive paths -set(dpdk_LIBRARIES) - -foreach(c ${components}) - set(dpdk_lib dpdk::${c}) - if(TARGET ${dpdk_lib}) - get_target_property(DPDK_rte_${c}_LIBRARY - ${dpdk_lib} IMPORTED_LOCATION) - else() - find_library(DPDK_rte_${c}_LIBRARY rte_${c} - HINTS - ENV DPDK_DIR - ${dpdk_LIBRARY_DIRS} - PATH_SUFFIXES lib) - endif() - if(DPDK_rte_${c}_LIBRARY) - if (NOT TARGET ${dpdk_lib}) - add_library(${dpdk_lib} UNKNOWN IMPORTED) - set_target_properties(${dpdk_lib} PROPERTIES - INTERFACE_INCLUDE_DIRECTORIES "${dpdk_INCLUDE_DIRS}" - IMPORTED_LOCATION "${DPDK_rte_${c}_LIBRARY}") - if(c STREQUAL pmd_mlx5) - find_package(verbs QUIET) - if(verbs_FOUND) - target_link_libraries(${dpdk_lib} INTERFACE IBVerbs::verbs) - endif() - endif() - endif() - list(APPEND _dpdk_libs ${dpdk_lib}) - list(APPEND dpdk_LIBRARIES ${DPDK_rte_${c}_LIBRARY}) - endif() -endforeach() - -mark_as_advanced(dpdk_INCLUDE_DIRS ${dpdk_LIBRARIES}) - -include(FindPackageHandleStandardArgs) -find_package_handle_standard_args(dpdk DEFAULT_MSG - dpdk_INCLUDE_DIRS - dpdk_LIBRARIES) - -if(dpdk_FOUND) - if(NOT TARGET dpdk::cflags) - if(CMAKE_SYSTEM_PROCESSOR MATCHES "amd64|x86_64|AMD64") - set(rte_cflags "-march=core2") - elseif(CMAKE_SYSTEM_PROCESSOR MATCHES "arm|ARM") - set(rte_cflags "-march=armv7-a") - elseif(CMAKE_SYSTEM_PROCESSOR MATCHES "aarch64|AARCH64") - set(rte_cflags "-march=armv8-a+crc") - endif() - add_library(dpdk::cflags INTERFACE IMPORTED) - if (rte_cflags) - set_target_properties(dpdk::cflags PROPERTIES - INTERFACE_COMPILE_OPTIONS "${rte_cflags}") - endif() - endif() - - if(NOT TARGET dpdk::dpdk) - add_library(dpdk::dpdk INTERFACE IMPORTED) - find_package(Threads QUIET) - list(APPEND _dpdk_libs - Threads::Threads - dpdk::cflags) - set_target_properties(dpdk::dpdk PROPERTIES - INTERFACE_LINK_LIBRARIES "${_dpdk_libs}" - INTERFACE_INCLUDE_DIRECTORIES "${dpdk_INCLUDE_DIRS}") - endif() -endif() - -unset(_dpdk_libs) \ No newline at end of file diff --git a/inc/defs.h b/inc/defs.h index 126968e..775da8e 100644 --- a/inc/defs.h +++ b/inc/defs.h @@ -8,6 +8,8 @@ TypeName(const TypeName &) = delete; \ void operator=(const TypeName &) = delete +#define UNUSED __attribute__((unused)) + constexpr static unsigned long S2NS = 1000000000UL; constexpr static unsigned long S2US = 1000000UL; constexpr static unsigned long MS2NS = 1000000UL; diff --git a/inc/storage/io_gen.hh b/inc/storage/io_gen.hh new file mode 100644 index 0000000..5e07363 --- /dev/null +++ b/inc/storage/io_gen.hh @@ -0,0 +1,53 @@ +#pragma once +#include +#include +#include "defs.h" +#include "gen.h" +#include + +enum io_generator_opcode { + IOGEN_READ, + IOGEN_WRITE +}; + +enum io_generator_address_mode { + IOGEN_ADDR_MONOTONIC_INCREASING, + IOGEN_ADDR_UNIFORM_RANDOM +}; + +struct io_generator_ctx { + unsigned long size; + uint64_t offset; + io_generator_opcode op; +}; + +// +// cur_offset is aligned to req_size +// +class io_generator { +public: + int issue(struct io_generator_ctx * ctx, char * buf); + io_generator(unsigned long req_size, + unsigned long capacity, + unsigned int read_pct, + io_generator_address_mode addr_mode); + io_generator() = delete; + +private: + unsigned long cur_offset; + + const unsigned long capacity; + const unsigned long req_size; + const unsigned int read_pct; + const io_generator_address_mode addr_mode; + + std::random_device rd; + std::mt19937 rng; + std::uniform_int_distribution dist; + + std::random_device addr_rd; + std::mt19937 addr_rng; + std::uniform_int_distribution addr_dist; + + DISALLOW_EVIL_CONSTRUCTORS(io_generator); +}; diff --git a/libnm/alloc.cc b/libnm/alloc.cc index 8344a3e..7c1cc22 100644 --- a/libnm/alloc.cc +++ b/libnm/alloc.cc @@ -11,7 +11,7 @@ static pthread_mutex_t alloc_lock; static constexpr unsigned int MEM_OBJ_SIZE = 4096; // 4k -static constexpr unsigned int MEM_OBJ_NUM = 1024 * 256; // 4k * 1024 * 255 = 1GB per region +static constexpr unsigned int MEM_OBJ_NUM = 1024 * 256; // 4k * 1024 * 256 = 1GB per region static constexpr unsigned int MEM_REGION_NUM = 4; // 4 x 1GB = 4GB total static int nm_mem_idx[NM_MAX_OBJS_PER_LVL]; static int nm_mem_region_idx[NM_MAX_OBJS_PER_LVL]; diff --git a/libnm/nm.cc b/libnm/nm.cc index 0647939..5ea947c 100644 --- a/libnm/nm.cc +++ b/libnm/nm.cc @@ -48,14 +48,14 @@ nm_init(int verbosity) // init nm_tsc2ns if ((ret = sysctlbyname( SYSCTL_TSC, &sysctl_tsc_freq, &sz, nullptr, 0)) < 0) { - if (verbose) { + if (nm_get_verbose()) { fprintf(stderr, "libnm: failed to query tsc frequency via sysctl (%d)\n", errno); } return ret; } - if (verbose) { + if (nm_get_verbose()) { fprintf(stdout, "libnm: tsc frequency: %lu\n", sysctl_tsc_freq); } diff --git a/scripts/storage/parse.py b/scripts/storage/parse.py new file mode 100644 index 0000000..3ca3e8b --- /dev/null +++ b/scripts/storage/parse.py @@ -0,0 +1,58 @@ +#!/usr/bin/env python3.6 + +import numpy as np +import sys +import re +import os +import json +import getopt +import math +import concurrent.futures as CF + +def main(): + datdir = None + options = getopt.getopt(sys.argv[1:], 'f:')[0] + + for opt, arg in options: + if opt in ('-f'): + datdir = arg + + if datdir == None: + raise Exception("Must specify -f parameter") + + with open(datdir) as file: + lines = file.readlines() + + datapts = [] + for line in lines: + if len(line) > 0: + datapts.append(int(line)) + + + runtime = 10 + req = len(datapts) + blk_size = 4096 + bps = blk_size * req + + + + avg_lat = np.average(datapts) + tail99_lat = np.percentile(datapts, 99) + tail95_lat = np.percentile(datapts, 95) + med_lat = np.percentile(datapts, 50) + std_dev = np.std(datapts) + + print("Runtime: " + str(runtime) + "s\n" + "Requests: " + str(req) + "\n" + "Request size: " + str(blk_size) + " bytes\n" + "Request per second: " + str(int(req/runtime)) + "\n" + "Bytes per second: " + str(bps) + " bytes = " + str(int(bps/1024/1024)) + " MB\n" + "Average Latency: " + str(int(avg_lat)) + "\n" + "99th Latency: " + str(int(tail99_lat)) + "\n" + "95th Latency: " + str(int(tail95_lat)) + "\n" + "50th Latency: " + str(int(med_lat)) + "\n" + "stddev: " + str(std_dev) + "\n") + + +if __name__ == "__main__": + main() \ No newline at end of file diff --git a/storage/birb.cc b/storage/birb.cc new file mode 100644 index 0000000..c70b361 --- /dev/null +++ b/storage/birb.cc @@ -0,0 +1,748 @@ +#include +#include +#include +#include +#include +#include +#include +#include + +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include "rte_lcore.h" +#include "spdk/cpuset.h" +#include "spdk/stdinc.h" +#include "spdk/thread.h" +#include "spdk/bdev.h" +#include "spdk/env.h" +#include "spdk/event.h" +#include "spdk/log.h" +#include "spdk/string.h" +#include "spdk/bdev_zone.h" + + + +#include "gen.h" +#include "ntr.h" +#include "defs.h" +#include "nm.h" +#include "storage/io_gen.hh" + +static inline uint64_t get_cur_ts_nano() +{ + return std::chrono::duration_cast + (std::chrono::high_resolution_clock::now().time_since_epoch()).count(); +} + +/* + * We'll use this struct to gather housekeeping hello_context to pass between + * our events and callbacks. + */ +static constexpr unsigned long MAX_SPEC_LEN = 32; +static constexpr unsigned long MAX_BDEV_NAME_LEN = 32; +static constexpr unsigned long MAX_OUTPUT_FILE_LEN = 256; +struct options_t { + // args + int verbosity = NTR_LEVEL_DEFAULT; + int num_threads = 1; + unsigned long cpumask = 1; + char pattern_spec[MAX_SPEC_LEN]; + char ia_spec[MAX_SPEC_LEN]; + + unsigned int time = 5; + unsigned int warmup = 2; + unsigned int queue_depth = 1; + char bdev_name[MAX_BDEV_NAME_LEN]; + + char output_file[MAX_OUTPUT_FILE_LEN] = "output.txt"; + + unsigned long req_size = 4096; + unsigned long rps = 0; +}; + +struct main_thread_cb_vars { + uint32_t worker_thread_init_cnt; + uint32_t worker_thread_stop_cnt; +}; + +struct worker_thread_cb_vars { + uint32_t worker_start; + uint32_t worker_stop; + struct thread_context * ctx; + std::list * free_ios; +}; + +static __thread void * cb_vars; +static struct options_t options; + +struct io_record { + uint64_t start_ts; + uint64_t end_ts; +}; + +struct io_request { + uint64_t start_ts; + io_generator_opcode op; + char * user_buf; + char * dma_buf; +}; + +// thread local states (RO by worker threads) +struct thread_context { + unsigned int tid; + unsigned int coreid; + unsigned int sockid; + pthread_t sys_thread; + char thread_name[32]; + struct spdk_thread *main_thread; + struct spdk_bdev *bdev; + struct spdk_bdev_desc *bdev_desc; + struct spdk_thread *s_thread; + + const char * ia_gen_desc; + unsigned long start_region_offset; + unsigned long start_region_length; + unsigned int read_pct; + io_generator_address_mode addr_mode; + + std::list *io_records; +}; + +static void dump_options() +{ + ntr(NTR_DEP_USER1, NTR_LEVEL_INFO, "main: Options:\n" + " bdev name: %s\n" + " worker threads: 0x%lx\n" + " number of threads: %d\n" + " IO request size: %lu\n" + " IO requests per second: %lu\n" + " IO pattern: %s\n" + " IO queue depth: %d\n" + " inter-arrival dist: %s\n" + " run time: %d\n" + " warmup time: %d\n" + " output file: %s\n", + options.bdev_name, + options.cpumask, + options.num_threads, + options.req_size, + options.rps, + options.pattern_spec, + options.queue_depth, + options.ia_spec, + options.time, + options.warmup, + options.output_file + ); +} + +static void usage() +{ + fprintf(stdout, + " -V(VV): verbose mode\n" + " -D: bdev name\n" + " -a: worker threads spec (0x3 = spawn 2 threads on core 1 & 2)\n" + " -b: IO request size\n" + " -q: IO requests per second\n" + " -P: IO request pattern\n" + " -Q: IO request queue depth\n" + " -I: inter-arrival time distribution\n" + " -t: total run time\n" + " -w: warm up time\n" + " -o: latency response output file\n"); +} + +static int parse_arg(int c, char *arg) +{ + switch (c) { + case 'V': + ntr_set_level(NTR_DEP_USER1, + ntr_get_level(NTR_DEP_USER1) + 1); + break; + case 'D': + strncpy(options.bdev_name, arg, MAX_BDEV_NAME_LEN); + break; + case 'a': + options.cpumask = strtoull(optarg, nullptr, 16); + options.num_threads = cmask_get_num_cpus( + options.cpumask); + + if (options.num_threads == 0) { + fprintf(stderr, + "must run at least one thread\n"); + return EINVAL; + } + break; + case 'b': + options.req_size = strtoull( + optarg, nullptr, 10); + break; + case 'q': + options.rps = strtoull( + optarg, nullptr, 10); + break; + case 'Q': + options.queue_depth = strtoull( + optarg, nullptr, 10); + break; + case 'P': + strncpy(options.pattern_spec, optarg, MAX_SPEC_LEN); + break; + case 'I': + strncpy(options.ia_spec, optarg, MAX_SPEC_LEN); + break; + case 't': + options.time = strtoull( + optarg, nullptr, 10); + break; + case 'w': + options.warmup = strtoull( + optarg, nullptr, 10); + break; + case 'o': + strncpy(options.output_file, optarg, MAX_OUTPUT_FILE_LEN); + break; + case 'h': + default: + return EINVAL; + } + + return 0; +} + +/* + * Callback function for io completion. + */ +static void +worker_io_complete(struct spdk_bdev_io *bdev_io, bool success, void *cb_arg) +{ + auto vars = (struct worker_thread_cb_vars *)cb_vars; + auto req = (struct io_request *)cb_arg; + + spdk_bdev_free_io(bdev_io); + + uint64_t end_ts = get_cur_ts_nano(); + + if (!success) { + // XXX: print warning for errors for now + ntr(NTR_DEP_USER1, NTR_LEVEL_WARNING, "thread %d : io request failed\n", vars->ctx->tid); + } else { + auto rec = new struct io_record; + rec->start_ts = req->start_ts; + rec->end_ts = end_ts; + vars->ctx->io_records->push_back(rec); + + if (req->op == IOGEN_READ) { + memcpy(req->user_buf, req->dma_buf, options.req_size); + } + + ntr(NTR_DEP_USER1, NTR_LEVEL_DEBUG, "thread %d : completed io request type %d\n", vars->ctx->tid, req->op); + } + + vars->free_ios->push_back(req); +} + +static void +bdev_event_cb(enum spdk_bdev_event_type type, struct spdk_bdev * bdev UNUSED, + void * event_ctx UNUSED) +{ + ntr(NTR_DEP_USER1, NTR_LEVEL_WARNING, "unsupported bdev event: type %d\n", type); +} + +static void +cb_notify_main_init(void * arg) +{ + auto * ctx = (struct thread_context *)arg; + ntr(NTR_DEP_USER1, NTR_LEVEL_DEBUG, "cb_notify_main_init: from thread %d to main.\n", ctx->tid); + + auto * vars = (struct main_thread_cb_vars *) cb_vars; + vars->worker_thread_init_cnt++; +} + +static void +cb_notify_main_stop(void * arg) +{ + auto * ctx = (struct thread_context *)arg; + ntr(NTR_DEP_USER1, NTR_LEVEL_DEBUG, "cb_notify_main_stop: from thread %d to main.\n", ctx->tid); + + auto * vars = (struct main_thread_cb_vars *) cb_vars; + vars->worker_thread_stop_cnt++; +} + +static void +cb_notify_worker_start(void * arg) +{ + auto * ctx = (struct thread_context *)arg; + ntr(NTR_DEP_USER1, NTR_LEVEL_DEBUG, "cb_notify_worker_start: from main to thread %d.\n", ctx->tid); + + auto * vars = (struct worker_thread_cb_vars *) cb_vars; + vars->worker_start = 1; +} + +static void +cb_notify_worker_stop(void * arg) +{ + auto * ctx = (struct thread_context *)arg; + ntr(NTR_DEP_USER1, NTR_LEVEL_DEBUG, "cb_notify_worker_stop: from main to thread %d.\n", ctx->tid); + + auto * vars = (struct worker_thread_cb_vars *) cb_vars; + vars->worker_stop = 1; +} + +static void +main_thread_cb_vars_init(struct main_thread_cb_vars * vars) +{ + vars->worker_thread_init_cnt = 0; + vars->worker_thread_stop_cnt = 0; +} + +static void +worker_thread_cb_vars_init(struct worker_thread_cb_vars * vars, struct thread_context * ctx, + std::list * free_ios) +{ + vars->worker_start = 0; + vars->worker_stop = 0; + vars->ctx = ctx; + vars->free_ios = free_ios; +} + +static void * +worker_thread_main(void * arg) +{ + int rc = 0; + + struct worker_thread_cb_vars vars; + auto *ctx = (struct thread_context *)arg; + struct spdk_io_channel *io_channel = nullptr; + const unsigned long buf_align = spdk_bdev_get_buf_align(ctx->bdev); + std::list free_ios; + + Generator * ia_gen = nullptr; + io_generator * io_gen = nullptr; + + struct io_generator_ctx io_ctx; + uint64_t next_ts; + uint64_t a_offset; + + ntr(NTR_DEP_USER1, NTR_LEVEL_DEBUG, "thread %d: init...\n", ctx->tid); + // associate current thread with spdk thread + spdk_set_thread(ctx->s_thread); + + // create io request objects + + for (unsigned int i = 0; i < options.queue_depth; i++) { + auto dma_buf = (char *)spdk_dma_zmalloc_socket(options.req_size, buf_align, NULL, ctx->sockid); + auto user_buf = (char *)nm_malloc(ctx->sockid, options.req_size); + + if (dma_buf == nullptr || user_buf == nullptr) { + ntr(NTR_DEP_USER1, NTR_LEVEL_ERROR, "thread %d: could not allocate buffers!\n", ctx->tid); + rc = ENOMEM; + goto cleanup; + } + + auto io_req = new struct io_request; + io_req->dma_buf = dma_buf; + io_req->user_buf = user_buf; + + free_ios.push_back(io_req); + } + + // init thread local states + worker_thread_cb_vars_init(&vars, ctx, &free_ios); + cb_vars = &vars; + + // obtain io channel + io_channel = spdk_bdev_get_io_channel(ctx->bdev_desc); + if (io_channel == nullptr) { + ntr(NTR_DEP_USER1, NTR_LEVEL_ERROR, "thread %d: could not create bdev I/O channel!\n", ctx->tid); + rc = EINVAL; + goto cleanup; + } + + ia_gen = createGenerator(ctx->ia_gen_desc); + if (ia_gen == nullptr) { + ntr(NTR_DEP_USER1, NTR_LEVEL_ERROR, "thread %d: could not allocate ia generator!\n", ctx->tid); + rc = EINVAL; + goto cleanup; + } + ia_gen->set_lambda((double)options.rps / (double)(options.num_threads)); + + io_gen = new io_generator(options.req_size, ctx->start_region_length, ctx->read_pct, ctx->addr_mode); + if (io_gen == nullptr) { + ntr(NTR_DEP_USER1, NTR_LEVEL_ERROR, "thread %d: could not allocate ia generator!\n", ctx->tid); + rc = EINVAL; + goto cleanup; + } + + ntr(NTR_DEP_USER1, NTR_LEVEL_DEBUG, "thread %d: init complete.\n", ctx->tid); + + if ((rc = spdk_thread_send_msg(ctx->main_thread, cb_notify_main_init, ctx)) != 0) { + ntr(NTR_DEP_USER1, NTR_LEVEL_ERROR, "thread %d: could not send message %d\n", ctx->tid, rc); + goto cleanup; + } + + ntr(NTR_DEP_USER1, NTR_LEVEL_DEBUG, "thread %d: waiting for start...\n", ctx->tid); + + while (vars.worker_start != 1) { + spdk_thread_poll(spdk_get_thread(), 0, 0); + } + + ntr(NTR_DEP_USER1, NTR_LEVEL_DEBUG, "thread %d: started...\n", ctx->tid); + + /* random delay 0-100 ms */ + usleep(((rand() * nm_get_uptime_ns()) % 100) * 1000); + + next_ts = get_cur_ts_nano(); + + while (true) { + spdk_thread_poll(spdk_get_thread(), 0, 0); + + if (vars.worker_stop != 0) { + if (free_ios.size() >= options.queue_depth) { + break; + } + } else { + if (!free_ios.empty()) { + auto io_req = free_ios.front(); + + uint64_t cur_ts = get_cur_ts_nano(); + + if (cur_ts >= next_ts) { + io_gen->issue(&io_ctx, io_req->dma_buf); + + a_offset = io_ctx.offset + ctx->start_region_offset; + + ntr(NTR_DEP_USER1, NTR_LEVEL_DEBUG, "thread %d: issuing IO type %d at offset 0x%lx size 0x%lx...\n", ctx->tid, io_ctx.op, a_offset, io_ctx.size); + + io_req->start_ts = cur_ts; + io_req->op = io_ctx.op; + + if(io_ctx.op == IOGEN_READ) { + rc = spdk_bdev_read(ctx->bdev_desc, io_channel, io_req->dma_buf, + a_offset, io_ctx.size, worker_io_complete, io_req); + } else { + rc = spdk_bdev_write(ctx->bdev_desc, io_channel, io_req->dma_buf, + a_offset, io_ctx.size, worker_io_complete, io_req); + } + + if (rc != 0) { + ntr(NTR_DEP_USER1, NTR_LEVEL_WARNING, "thread %d: failed to issue io %d, retrying...", ctx->tid, rc); + } else { + free_ios.pop_front(); + next_ts = next_ts + ia_gen->generate() * S2NS; + } + } + } + } + } + +cleanup: + while (!free_ios.empty()) { + auto req = free_ios.front(); + free_ios.pop_front(); + spdk_dma_free(req->dma_buf); + nm_free(ctx->sockid, req->user_buf); + } + + if (io_channel != nullptr) { + spdk_put_io_channel(io_channel); + } + + if (ia_gen != nullptr) { + delete ia_gen; + } + + if (io_gen != nullptr) { + delete io_gen; + } + + ntr(NTR_DEP_USER1, NTR_LEVEL_DEBUG, "thread %d: stopped...\n", ctx->tid); + + if (rc != 0) { + spdk_app_stop(rc); + } + + if ((rc = spdk_thread_send_msg(ctx->main_thread, cb_notify_main_stop, ctx)) != 0) { + ntr(NTR_DEP_USER1, NTR_LEVEL_ERROR, "thread %d: could not send message %d\n", ctx->tid, rc); + } + + return nullptr; +} + + +static void +parse_pattern(char * pattern, int * read_pct, io_generator_address_mode * addr_mode) +{ + char * token = strtok(pattern, ","); + + if (strcmp(token, "M") == 0) { + *addr_mode = IOGEN_ADDR_MONOTONIC_INCREASING; + } else { + *addr_mode = IOGEN_ADDR_UNIFORM_RANDOM; + } + + token = strtok(nullptr, ","); + *read_pct = strtoull(token, nullptr, 10); +} + +static void +print_all_bdev() +{ + struct spdk_bdev * cur = spdk_bdev_first(); + + ntr(NTR_DEP_USER1, NTR_LEVEL_INFO, "main: All registered block devices: "); + + while(cur != NULL) { + ntr(NTR_DEP_USER1, NTR_LEVEL_INFO, "%s, ", spdk_bdev_get_name(cur)); + cur = spdk_bdev_next(cur); + } +} + +static void +birb_main(void * arg1 UNUSED) +{ + int rc = 0; + struct spdk_bdev * bdev; + struct spdk_bdev_desc * bdev_desc; + std::list worker_threads; + std::ofstream output_file; + uint64_t total_reqs = 0; + io_generator_address_mode addr_mode = IOGEN_ADDR_MONOTONIC_INCREASING; + int read_pct = 0; + + struct main_thread_cb_vars vars; + main_thread_cb_vars_init(&vars); + cb_vars = &vars; + + ntr(NTR_DEP_USER1, NTR_LEVEL_INFO, "main: successfully started the application\n"); + + dump_options(); + + /* process spec */ + parse_pattern(options.pattern_spec, &read_pct, &addr_mode); + ntr(NTR_DEP_USER1, NTR_LEVEL_INFO, "main: read percent %d, address mode %d\n", read_pct, addr_mode); + + + /* + * There can be many bdevs configured, but this application will only use + * the one input by the user at runtime. + * + * Open the bdev by calling spdk_bdev_open_ext() with its name. + * The function will return a descriptor + */ + print_all_bdev(); + + ntr(NTR_DEP_USER1, NTR_LEVEL_INFO, "main: opening block device %s\n", options.bdev_name); + + rc = spdk_bdev_open_ext(options.bdev_name, true, bdev_event_cb, NULL, &bdev_desc); + + if (rc != 0) { + ntr(NTR_DEP_USER1, NTR_LEVEL_ERROR, "main: failed to open bdev: %d\n", rc); + spdk_app_stop(rc); + return; + } + + output_file.open(options.output_file, std::ofstream::out); + if (!output_file) { + ntr(NTR_DEP_USER1, NTR_LEVEL_ERROR, "main: failed to open output file %s\n", options.output_file); + spdk_app_stop(EINVAL); + return; + } + + /* A bdev pointer is valid while the bdev is opened. */ + bdev = spdk_bdev_desc_get_bdev(bdev_desc); + const uint32_t blk_size = spdk_bdev_get_block_size(bdev); + const unsigned long bdev_capacity = blk_size * spdk_bdev_get_num_blocks(bdev); + const unsigned long per_thread_cap = bdev_capacity / options.num_threads; + ntr(NTR_DEP_USER1, NTR_LEVEL_ERROR, "main: bdev block size %d bytes, # blocks %lu, per_thread_cap %lu\n", blk_size, spdk_bdev_get_num_blocks(bdev), per_thread_cap); + unsigned long record_cutoff_time = 0; + unsigned long current_s = 0; + + /* create worker threads */ + unsigned int tid = 0; + int cur_core = cmask_get_next_cpu(&options.cpumask); + + struct spdk_cpuset * cpuset = spdk_cpuset_alloc(); + if (cpuset == NULL) { + ntr(NTR_DEP_USER1, NTR_LEVEL_ERROR, "main: failed to alloc cpuset\n"); + rc = ENOMEM; + goto end; + } + + while(cur_core != NEXT_CPU_NULL) { + auto * ctx = new struct thread_context; + memset(ctx, 0, sizeof(struct thread_context)); + + if (ctx == NULL) { + ntr(NTR_DEP_USER1, NTR_LEVEL_ERROR, "main: failed to alloc thread ctx.\n"); + spdk_app_stop(ENOMEM); + return; + } + + ctx->tid = tid++; + ctx->main_thread = spdk_get_thread(); + snprintf(ctx->thread_name, 32, "birb_wrk_%d", ctx->tid); + ctx->sockid = rte_lcore_to_socket_id(cur_core); + ctx->coreid = cur_core; + ctx->bdev = bdev; + ctx->bdev_desc = bdev_desc; + ctx->io_records = new std::list(); + ctx->start_region_length = per_thread_cap; + ctx->start_region_offset = per_thread_cap * ctx->tid; + ctx->ia_gen_desc = options.ia_spec; + ctx->addr_mode = addr_mode; + ctx->read_pct = read_pct; + + // create spdk thread + spdk_cpuset_zero(cpuset); + spdk_cpuset_set_cpu(cpuset, cur_core, true); + ctx->s_thread = spdk_thread_create(ctx->thread_name, cpuset); + if (ctx->s_thread == nullptr) { + rc = ENOMEM; + goto end; + } + + // create sys thread + pthread_attr_t attr; + cpuset_t scpuset; + CPU_ZERO(&scpuset); + CPU_SET(cur_core, &scpuset); + pthread_attr_init(&attr); + pthread_attr_setaffinity_np(&attr, sizeof(cpuset_t), &scpuset); + rc = pthread_create(&ctx->sys_thread, NULL, worker_thread_main, ctx); + if (rc != 0) { + ntr(NTR_DEP_USER1, NTR_LEVEL_ERROR, "main: failed to create sys thread: %d\n", rc); + spdk_app_stop(EINVAL); + return; + } + worker_threads.push_back(ctx); + ntr(NTR_DEP_USER1, NTR_LEVEL_INFO, "main: created worker thread %d on core %d socket %d offset 0x%lx length %ld\n", ctx->tid, cur_core, ctx->sockid, + ctx->start_region_offset, + ctx->start_region_length); + + cur_core = cmask_get_next_cpu(&options.cpumask); + } + + spdk_cpuset_free(cpuset); + + ntr(NTR_DEP_USER1, NTR_LEVEL_DEBUG, "main: waiting for worker thread preinit...\n"); + while(vars.worker_thread_init_cnt < (uint32_t)options.num_threads) { + spdk_thread_poll(spdk_get_thread(), 0, 0); + } + + ntr(NTR_DEP_USER1, NTR_LEVEL_DEBUG, "main: starting worker threads...\n"); + for (struct thread_context * tctx : worker_threads) { + rc = spdk_thread_send_msg(tctx->s_thread, cb_notify_worker_start, tctx); + + if (rc != 0) { + ntr(NTR_DEP_USER1, NTR_LEVEL_ERROR, "main: failed to send message %d\n", rc); + goto end; + } + } + + while(current_s < options.time) { + if (current_s >= options.warmup && record_cutoff_time == 0) { + record_cutoff_time = get_cur_ts_nano(); + } + usleep(1 * S2US); + current_s++; + } + + ntr(NTR_DEP_USER1, NTR_LEVEL_DEBUG, "main: stopping worker threads...\n"); + for (struct thread_context * tctx : worker_threads) { + rc = spdk_thread_send_msg(tctx->s_thread, cb_notify_worker_stop, tctx); + + if (rc != 0) { + ntr(NTR_DEP_USER1, NTR_LEVEL_ERROR, "main: failed to send message %d\n", rc); + goto end; + } + } + + while(vars.worker_thread_stop_cnt < (uint32_t)options.num_threads) { + spdk_thread_poll(spdk_get_thread(), 0, 0); + } + + // keep stats + for (struct thread_context * tctx : worker_threads) { + for (struct io_record * r : *tctx->io_records) { + if (r->start_ts >= record_cutoff_time) { + output_file << r->end_ts - r->start_ts << std::endl; + total_reqs++; + } + } + } + + ntr(NTR_DEP_USER1, NTR_LEVEL_INFO, "main: total requests: %lu, bytes per second: %lu\n", total_reqs, total_reqs * options.req_size / (options.time - options.warmup)); + +end: + output_file.close(); + + for (struct thread_context * tctx : worker_threads) { + for (struct io_record * r : *tctx->io_records) { + delete r; + } + spdk_thread_destroy(tctx->s_thread); + delete tctx->io_records; + delete tctx; + } + + if (bdev_desc != nullptr) { + spdk_bdev_close(bdev_desc); + } + + exit(0); + spdk_app_stop(rc); + return; +} + +int +main(int argc, char **argv) +{ + struct spdk_app_opts opts = {}; + int rc = 0; + + ntr_init(); + ntr_set_level(NTR_DEP_USER1, NTR_LEVEL_INFO); + + /* Set default values in opts structure. */ + spdk_app_opts_init(&opts, sizeof(opts)); + opts.name = "birb"; + + /* + * Parse built-in SPDK command line parameters as well + * as our custom one(s). + */ + if ((rc = spdk_app_parse_args(argc, argv, &opts, "VD:a:b:q:Q:P:I:t:w:o:", NULL, parse_arg, + usage)) != SPDK_APP_PARSE_ARGS_SUCCESS) { + exit(rc); + } + + nm_init(options.verbosity); + + /* + * spdk_app_start() will initialize the SPDK framework, call hello_start(), + * and then block until spdk_app_stop() is called (or if an initialization + * error occurs, spdk_app_start() will return with rc even without calling + * hello_start(). + */ + rc = spdk_app_start(&opts, birb_main, NULL); + if (rc) { + SPDK_ERRLOG("ERROR starting application\n"); + } + + /* At this point either spdk_app_stop() was called, or spdk_app_start() + * failed because of internal error. + */ + + /* Gracefully close out all of the SPDK subsystems. */ + spdk_app_fini(); + return rc; +} diff --git a/storage/io_gen.cc b/storage/io_gen.cc new file mode 100644 index 0000000..15ccea8 --- /dev/null +++ b/storage/io_gen.cc @@ -0,0 +1,57 @@ +#include +#include + +#include "nm.h" +#include "storage/io_gen.hh" + +io_generator::io_generator( + unsigned long req_size, + unsigned long capacity, + unsigned int read_pct, + io_generator_address_mode addr_mode) : cur_offset(0), + capacity(capacity), + req_size(req_size), + read_pct(read_pct), + addr_mode(addr_mode), + rng(rd()), + dist(std::uniform_int_distribution(0, 99)), + addr_rng(addr_rd()), + addr_dist(std::uniform_int_distribution(0, capacity - 1)) +{ + rng.seed(nm_get_uptime_ns()); + addr_rng.seed(nm_get_uptime_ns()); +} + + +/* returns 0 on success */ +int io_generator::issue(struct io_generator_ctx *ctx, char * buf) +{ + ctx->size = req_size; + + // determine next IO offset + if (addr_mode == IOGEN_ADDR_MONOTONIC_INCREASING) { + if (cur_offset + req_size > capacity) { + cur_offset = 0; + } + + ctx->offset = cur_offset; + cur_offset = cur_offset + req_size; + } else { + ctx->offset = (addr_dist(addr_rng) / req_size) * req_size; + if (ctx->offset + req_size > capacity) { + ctx->offset -= req_size; + } + } + + // determine next IO data + int op_rng = dist(rng); + if (op_rng < (int)read_pct) { + ctx->op = IOGEN_READ; + } else { + ctx->op = IOGEN_WRITE; + int data = dist(rng); + memset(buf, data, req_size); + } + + return 0; +} \ No newline at end of file