From 4236ce9bf5bf8391cd447f6a6af01aa49318b200 Mon Sep 17 00:00:00 2001 From: Liang Ma Date: Tue, 9 Jan 2018 14:18:50 +0000 Subject: [PATCH] event/opdl: add OPDL ring infrastructure library MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit OPDL ring is the core infrastructure of OPDL PMD. OPDL ring library provide the core data structure and core helper function set. The Ring implements a single ring multi-port/stage pipelined packet distribution mechanism. This mechanism has the following characteristics: • No multiple queue cost, therefore, latency is significant reduced. • Fixed dependencies between queue/ports is more suitable for complex. fixed pipelines of stateless packet processing (static pipeline). • Has decentralized distribution (no scheduling core). • Packets remain in order (no reorder core(s)). * Update build system to enable compilation. Signed-off-by: Liang Ma Signed-off-by: Peter Mccarthy Reviewed-by: Seán Harte --- MAINTAINERS | 5 + config/common_base | 5 + drivers/event/Makefile | 1 + drivers/event/opdl/Makefile | 35 + drivers/event/opdl/opdl_log.h | 22 + drivers/event/opdl/opdl_ring.c | 1230 +++++++++++++++++ drivers/event/opdl/opdl_ring.h | 601 ++++++++ .../event/opdl/rte_pmd_evdev_opdl_version.map | 3 + mk/rte.app.mk | 1 + mk/toolchain/gcc/rte.toolchain-compat.mk | 6 + mk/toolchain/icc/rte.toolchain-compat.mk | 6 + 11 files changed, 1915 insertions(+) create mode 100644 drivers/event/opdl/Makefile create mode 100644 drivers/event/opdl/opdl_log.h create mode 100644 drivers/event/opdl/opdl_ring.c create mode 100644 drivers/event/opdl/opdl_ring.h create mode 100644 drivers/event/opdl/rte_pmd_evdev_opdl_version.map diff --git a/MAINTAINERS b/MAINTAINERS index e5b6e030cc..289e8ef0b2 100644 --- a/MAINTAINERS +++ b/MAINTAINERS @@ -723,6 +723,11 @@ F: doc/guides/eventdevs/sw.rst F: examples/eventdev_pipeline_sw_pmd/ F: doc/guides/sample_app_ug/eventdev_pipeline_sw_pmd.rst +Software OPDL Eventdev PMD +M: Liang Ma +M: Peter Mccarthy +F: drivers/event/opdl/ + Packet processing ----------------- diff --git a/config/common_base b/config/common_base index 7afd2c3af5..8d1715a551 100644 --- a/config/common_base +++ b/config/common_base @@ -589,6 +589,11 @@ CONFIG_RTE_LIBRTE_PMD_SW_EVENTDEV_DEBUG=n # CONFIG_RTE_LIBRTE_PMD_OCTEONTX_SSOVF=y +# +# Compile PMD for OPDL event device +# +CONFIG_RTE_LIBRTE_PMD_OPDL_EVENTDEV=y + # # Compile librte_ring # diff --git a/drivers/event/Makefile b/drivers/event/Makefile index 1402d6b1b2..7f68440a6d 100644 --- a/drivers/event/Makefile +++ b/drivers/event/Makefile @@ -8,5 +8,6 @@ DIRS-$(CONFIG_RTE_LIBRTE_PMD_SKELETON_EVENTDEV) += skeleton DIRS-$(CONFIG_RTE_LIBRTE_PMD_SW_EVENTDEV) += sw DIRS-$(CONFIG_RTE_LIBRTE_PMD_OCTEONTX_SSOVF) += octeontx DIRS-$(CONFIG_RTE_LIBRTE_PMD_DPAA2_EVENTDEV) += dpaa2 +DIRS-$(CONFIG_RTE_LIBRTE_PMD_OPDL_EVENTDEV) += opdl include $(RTE_SDK)/mk/rte.subdir.mk diff --git a/drivers/event/opdl/Makefile b/drivers/event/opdl/Makefile new file mode 100644 index 0000000000..0c975b4be5 --- /dev/null +++ b/drivers/event/opdl/Makefile @@ -0,0 +1,35 @@ +# SPDX-License-Identifier: BSD-3-Clause +# Copyright(c) 2010-2014 Intel Corporation + +include $(RTE_SDK)/mk/rte.vars.mk + +# library name +LIB = librte_pmd_opdl_event.a + +# build flags +CFLAGS += -O3 +CFLAGS += $(WERROR_FLAGS) +# for older GCC versions, allow us to initialize an event using +# designated initializers. +ifeq ($(CONFIG_RTE_TOOLCHAIN_GCC),y) +ifeq ($(shell test $(GCC_VERSION) -le 50 && echo 1), 1) +CFLAGS += -Wno-missing-field-initializers +endif +endif + +LDLIBS += -lrte_eal -lrte_eventdev -lrte_kvargs +LDLIBS += -lrte_bus_vdev -lrte_mbuf -lrte_mempool + +# library version +LIBABIVER := 1 + +# versioning export map +EXPORT_MAP := rte_pmd_evdev_opdl_version.map + +# library source files +SRCS-$(CONFIG_RTE_LIBRTE_PMD_OPDL_EVENTDEV) += opdl_ring.c + +# export include files +SYMLINK-y-include += + +include $(RTE_SDK)/mk/rte.lib.mk diff --git a/drivers/event/opdl/opdl_log.h b/drivers/event/opdl/opdl_log.h new file mode 100644 index 0000000000..0e605aa2d1 --- /dev/null +++ b/drivers/event/opdl/opdl_log.h @@ -0,0 +1,22 @@ +/*- + * SPDX-License-Identifier: BSD-3-Clause + * Copyright(c) 2010-2014 Intel Corporation + */ + +#ifndef _OPDL_LOGS_H_ +#define _OPDL_LOGS_H_ + +#include + +extern int opdl_logtype_driver; + +#define PMD_DRV_LOG_RAW(level, fmt, args...) \ + rte_log(RTE_LOG_ ## level, opdl_logtype_driver, "%s(): " fmt, \ + __func__, ## args) + +#define PMD_DRV_LOG(level, fmt, args...) \ + PMD_DRV_LOG_RAW(level, fmt "\n", ## args) + + + +#endif /* _OPDL_LOGS_H_ */ diff --git a/drivers/event/opdl/opdl_ring.c b/drivers/event/opdl/opdl_ring.c new file mode 100644 index 0000000000..7e16d4df1b --- /dev/null +++ b/drivers/event/opdl/opdl_ring.c @@ -0,0 +1,1230 @@ +/*- + * SPDX-License-Identifier: BSD-3-Clause + * Copyright(c) 2010-2014 Intel Corporation + */ + +#include +#include +#include +#include + +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include "opdl_ring.h" +#include "opdl_log.h" + +#define LIB_NAME "opdl_ring" + +#define OPDL_NAME_SIZE 64 + + +#define OPDL_EVENT_MASK (0xFFFF0000000FFFFFULL) + +int opdl_logtype_driver; + +/* Types of dependency between stages */ +enum dep_type { + DEP_NONE = 0, /* no dependency */ + DEP_DIRECT, /* stage has direct dependency */ + DEP_INDIRECT, /* in-direct dependency through other stage(s) */ + DEP_SELF, /* stage dependency on itself, used to detect loops */ +}; + +/* Shared section of stage state. + * Care is needed when accessing and the layout is important, especially to + * limit the adjacent cache-line HW prefetcher from impacting performance. + */ +struct shared_state { + /* Last known minimum sequence number of dependencies, used for multi + * thread operation + */ + uint32_t available_seq; + char _pad1[RTE_CACHE_LINE_SIZE * 3]; + uint32_t head; /* Head sequence number (for multi thread operation) */ + char _pad2[RTE_CACHE_LINE_SIZE * 3]; + struct opdl_stage *stage; /* back pointer */ + uint32_t tail; /* Tail sequence number */ + char _pad3[RTE_CACHE_LINE_SIZE * 2]; +} __rte_cache_aligned; + +/* A structure to keep track of "unfinished" claims. This is only used for + * stages that are threadsafe. Each lcore accesses its own instance of this + * structure to record the entries it has claimed. This allows one lcore to make + * multiple claims without being blocked by another. When disclaiming it moves + * forward the shared tail when the shared tail matches the tail value recorded + * here. + */ +struct claim_manager { + uint32_t num_to_disclaim; + uint32_t num_claimed; + uint32_t mgr_head; + uint32_t mgr_tail; + struct { + uint32_t head; + uint32_t tail; + } claims[OPDL_DISCLAIMS_PER_LCORE]; +} __rte_cache_aligned; + +/* Context for each stage of opdl_ring. + * Calculations on sequence numbers need to be done with other uint32_t values + * so that results are modulus 2^32, and not undefined. + */ +struct opdl_stage { + struct opdl_ring *t; /* back pointer, set at init */ + uint32_t num_slots; /* Number of slots for entries, set at init */ + uint32_t index; /* ID for this stage, set at init */ + bool threadsafe; /* Set to 1 if this stage supports threadsafe use */ + /* Last known min seq number of dependencies for used for single thread + * operation + */ + uint32_t available_seq; + uint32_t head; /* Current head for single-thread operation */ + uint32_t shadow_head; /* Shadow head for single-thread operation */ + uint32_t nb_instance; /* Number of instances */ + uint32_t instance_id; /* ID of this stage instance */ + uint16_t num_claimed; /* Number of slots claimed */ + uint16_t num_event; /* Number of events */ + uint32_t seq; /* sequence number */ + uint32_t num_deps; /* Number of direct dependencies */ + /* Keep track of all dependencies, used during init only */ + enum dep_type *dep_tracking; + /* Direct dependencies of this stage */ + struct shared_state **deps; + /* Other stages read this! */ + struct shared_state shared __rte_cache_aligned; + /* For managing disclaims in multi-threaded processing stages */ + struct claim_manager pending_disclaims[RTE_MAX_LCORE] + __rte_cache_aligned; +} __rte_cache_aligned; + +/* Context for opdl_ring */ +struct opdl_ring { + char name[OPDL_NAME_SIZE]; /* OPDL queue instance name */ + int socket; /* NUMA socket that memory is allocated on */ + uint32_t num_slots; /* Number of slots for entries */ + uint32_t mask; /* Mask for sequence numbers (num_slots - 1) */ + uint32_t slot_size; /* Size of each slot in bytes */ + uint32_t num_stages; /* Number of stages that have been added */ + uint32_t max_num_stages; /* Max number of stages */ + /* Stages indexed by ID */ + struct opdl_stage *stages; + /* Memory for storing slot data */ + uint8_t slots[0] __rte_cache_aligned; +}; + + +/* Return input stage of a opdl_ring */ +static __rte_always_inline struct opdl_stage * +input_stage(const struct opdl_ring *t) +{ + return &t->stages[0]; +} + +/* Check if a stage is the input stage */ +static __rte_always_inline bool +is_input_stage(const struct opdl_stage *s) +{ + return s->index == 0; +} + +/* Get slot pointer from sequence number */ +static __rte_always_inline void * +get_slot(const struct opdl_ring *t, uint32_t n) +{ + return (void *)(uintptr_t)&t->slots[(n & t->mask) * t->slot_size]; +} + +/* Find how many entries are available for processing */ +static __rte_always_inline uint32_t +available(const struct opdl_stage *s) +{ + if (s->threadsafe == true) { + uint32_t n = __atomic_load_n(&s->shared.available_seq, + __ATOMIC_ACQUIRE) - + __atomic_load_n(&s->shared.head, + __ATOMIC_ACQUIRE); + + /* Return 0 if available_seq needs to be updated */ + return (n <= s->num_slots) ? n : 0; + } + + /* Single threaded */ + return s->available_seq - s->head; +} + +/* Read sequence number of dependencies and find minimum */ +static __rte_always_inline void +update_available_seq(struct opdl_stage *s) +{ + uint32_t i; + uint32_t this_tail = s->shared.tail; + uint32_t min_seq = __atomic_load_n(&s->deps[0]->tail, __ATOMIC_ACQUIRE); + /* Input stage sequence numbers are greater than the sequence numbers of + * its dependencies so an offset of t->num_slots is needed when + * calculating available slots and also the condition which is used to + * determine the dependencies minimum sequence number must be reverted. + */ + uint32_t wrap; + + if (is_input_stage(s)) { + wrap = s->num_slots; + for (i = 1; i < s->num_deps; i++) { + uint32_t seq = __atomic_load_n(&s->deps[i]->tail, + __ATOMIC_ACQUIRE); + if ((this_tail - seq) > (this_tail - min_seq)) + min_seq = seq; + } + } else { + wrap = 0; + for (i = 1; i < s->num_deps; i++) { + uint32_t seq = __atomic_load_n(&s->deps[i]->tail, + __ATOMIC_ACQUIRE); + if ((seq - this_tail) < (min_seq - this_tail)) + min_seq = seq; + } + } + + if (s->threadsafe == false) + s->available_seq = min_seq + wrap; + else + __atomic_store_n(&s->shared.available_seq, min_seq + wrap, + __ATOMIC_RELEASE); +} + +/* Wait until the number of available slots reaches number requested */ +static __rte_always_inline void +wait_for_available(struct opdl_stage *s, uint32_t n) +{ + while (available(s) < n) { + rte_pause(); + update_available_seq(s); + } +} + +/* Return number of slots to process based on number requested and mode */ +static __rte_always_inline uint32_t +num_to_process(struct opdl_stage *s, uint32_t n, bool block) +{ + /* Don't read tail sequences of dependencies if not needed */ + if (available(s) >= n) + return n; + + update_available_seq(s); + + if (block == false) { + uint32_t avail = available(s); + + if (avail == 0) { + rte_pause(); + return 0; + } + return (avail <= n) ? avail : n; + } + + if (unlikely(n > s->num_slots)) { + PMD_DRV_LOG(ERR, "%u entries is more than max (%u)", + n, s->num_slots); + return 0; /* Avoid infinite loop */ + } + /* blocking */ + wait_for_available(s, n); + return n; +} + +/* Copy entries in to slots with wrap-around */ +static __rte_always_inline void +copy_entries_in(struct opdl_ring *t, uint32_t start, const void *entries, + uint32_t num_entries) +{ + uint32_t slot_size = t->slot_size; + uint32_t slot_index = start & t->mask; + + if (slot_index + num_entries <= t->num_slots) { + rte_memcpy(get_slot(t, start), entries, + num_entries * slot_size); + } else { + uint32_t split = t->num_slots - slot_index; + + rte_memcpy(get_slot(t, start), entries, split * slot_size); + rte_memcpy(get_slot(t, 0), + RTE_PTR_ADD(entries, split * slot_size), + (num_entries - split) * slot_size); + } +} + +/* Copy entries out from slots with wrap-around */ +static __rte_always_inline void +copy_entries_out(struct opdl_ring *t, uint32_t start, void *entries, + uint32_t num_entries) +{ + uint32_t slot_size = t->slot_size; + uint32_t slot_index = start & t->mask; + + if (slot_index + num_entries <= t->num_slots) { + rte_memcpy(entries, get_slot(t, start), + num_entries * slot_size); + } else { + uint32_t split = t->num_slots - slot_index; + + rte_memcpy(entries, get_slot(t, start), split * slot_size); + rte_memcpy(RTE_PTR_ADD(entries, split * slot_size), + get_slot(t, 0), + (num_entries - split) * slot_size); + } +} + +/* Input function optimised for single thread */ +static __rte_always_inline uint32_t +opdl_ring_input_singlethread(struct opdl_ring *t, const void *entries, + uint32_t num_entries, bool block) +{ + struct opdl_stage *s = input_stage(t); + uint32_t head = s->head; + + num_entries = num_to_process(s, num_entries, block); + if (num_entries == 0) + return 0; + + copy_entries_in(t, head, entries, num_entries); + + s->head += num_entries; + __atomic_store_n(&s->shared.tail, s->head, __ATOMIC_RELEASE); + + return num_entries; +} + +/* Convert head and tail of claim_manager into valid index */ +static __rte_always_inline uint32_t +claim_mgr_index(uint32_t n) +{ + return n & (OPDL_DISCLAIMS_PER_LCORE - 1); +} + +/* Check if there are available slots in claim_manager */ +static __rte_always_inline bool +claim_mgr_available(struct claim_manager *mgr) +{ + return (mgr->mgr_head < (mgr->mgr_tail + OPDL_DISCLAIMS_PER_LCORE)) ? + true : false; +} + +/* Record a new claim. Only use after first checking an entry is available */ +static __rte_always_inline void +claim_mgr_add(struct claim_manager *mgr, uint32_t tail, uint32_t head) +{ + if ((mgr->mgr_head != mgr->mgr_tail) && + (mgr->claims[claim_mgr_index(mgr->mgr_head - 1)].head == + tail)) { + /* Combine with previous claim */ + mgr->claims[claim_mgr_index(mgr->mgr_head - 1)].head = head; + } else { + mgr->claims[claim_mgr_index(mgr->mgr_head)].head = head; + mgr->claims[claim_mgr_index(mgr->mgr_head)].tail = tail; + mgr->mgr_head++; + } + + mgr->num_claimed += (head - tail); +} + +/* Read the oldest recorded claim */ +static __rte_always_inline bool +claim_mgr_read(struct claim_manager *mgr, uint32_t *tail, uint32_t *head) +{ + if (mgr->mgr_head == mgr->mgr_tail) + return false; + + *head = mgr->claims[claim_mgr_index(mgr->mgr_tail)].head; + *tail = mgr->claims[claim_mgr_index(mgr->mgr_tail)].tail; + return true; +} + +/* Remove the oldest recorded claim. Only use after first reading the entry */ +static __rte_always_inline void +claim_mgr_remove(struct claim_manager *mgr) +{ + mgr->num_claimed -= (mgr->claims[claim_mgr_index(mgr->mgr_tail)].head - + mgr->claims[claim_mgr_index(mgr->mgr_tail)].tail); + mgr->mgr_tail++; +} + +/* Update tail in the oldest claim. Only use after first reading the entry */ +static __rte_always_inline void +claim_mgr_move_tail(struct claim_manager *mgr, uint32_t num_entries) +{ + mgr->num_claimed -= num_entries; + mgr->claims[claim_mgr_index(mgr->mgr_tail)].tail += num_entries; +} + +static __rte_always_inline void +opdl_stage_disclaim_multithread_n(struct opdl_stage *s, + uint32_t num_entries, bool block) +{ + struct claim_manager *disclaims = &s->pending_disclaims[rte_lcore_id()]; + uint32_t head; + uint32_t tail; + + while (num_entries) { + bool ret = claim_mgr_read(disclaims, &tail, &head); + + if (ret == false) + break; /* nothing is claimed */ + /* There should be no race condition here. If shared.tail + * matches, no other core can update it until this one does. + */ + if (__atomic_load_n(&s->shared.tail, __ATOMIC_ACQUIRE) == + tail) { + if (num_entries >= (head - tail)) { + claim_mgr_remove(disclaims); + __atomic_store_n(&s->shared.tail, head, + __ATOMIC_RELEASE); + num_entries -= (head - tail); + } else { + claim_mgr_move_tail(disclaims, num_entries); + __atomic_store_n(&s->shared.tail, + num_entries + tail, + __ATOMIC_RELEASE); + num_entries = 0; + } + } else if (block == false) + break; /* blocked by other thread */ + /* Keep going until num_entries are disclaimed. */ + rte_pause(); + } + + disclaims->num_to_disclaim = num_entries; +} + +/* Move head atomically, returning number of entries available to process and + * the original value of head. For non-input stages, the claim is recorded + * so that the tail can be updated later by opdl_stage_disclaim(). + */ +static __rte_always_inline void +move_head_atomically(struct opdl_stage *s, uint32_t *num_entries, + uint32_t *old_head, bool block, bool claim_func) +{ + uint32_t orig_num_entries = *num_entries; + uint32_t ret; + struct claim_manager *disclaims = &s->pending_disclaims[rte_lcore_id()]; + + /* Attempt to disclaim any outstanding claims */ + opdl_stage_disclaim_multithread_n(s, disclaims->num_to_disclaim, + false); + + *old_head = __atomic_load_n(&s->shared.head, __ATOMIC_ACQUIRE); + while (true) { + bool success; + /* If called by opdl_ring_input(), claim does not need to be + * recorded, as there will be no disclaim. + */ + if (claim_func) { + /* Check that the claim can be recorded */ + ret = claim_mgr_available(disclaims); + if (ret == false) { + /* exit out if claim can't be recorded */ + *num_entries = 0; + return; + } + } + + *num_entries = num_to_process(s, orig_num_entries, block); + if (*num_entries == 0) + return; + + success = __atomic_compare_exchange_n(&s->shared.head, old_head, + *old_head + *num_entries, + true, /* may fail spuriously */ + __ATOMIC_RELEASE, /* memory order on success */ + __ATOMIC_ACQUIRE); /* memory order on fail */ + if (likely(success)) + break; + rte_pause(); + } + + if (claim_func) + /* Store the claim record */ + claim_mgr_add(disclaims, *old_head, *old_head + *num_entries); +} + +/* Input function that supports multiple threads */ +static __rte_always_inline uint32_t +opdl_ring_input_multithread(struct opdl_ring *t, const void *entries, + uint32_t num_entries, bool block) +{ + struct opdl_stage *s = input_stage(t); + uint32_t old_head; + + move_head_atomically(s, &num_entries, &old_head, block, false); + if (num_entries == 0) + return 0; + + copy_entries_in(t, old_head, entries, num_entries); + + /* If another thread started inputting before this one, but hasn't + * finished, we need to wait for it to complete to update the tail. + */ + while (unlikely(__atomic_load_n(&s->shared.tail, __ATOMIC_ACQUIRE) != + old_head)) + rte_pause(); + + __atomic_store_n(&s->shared.tail, old_head + num_entries, + __ATOMIC_RELEASE); + + return num_entries; +} + +static __rte_always_inline uint32_t +opdl_first_entry_id(uint32_t start_seq, uint8_t nb_p_lcores, + uint8_t this_lcore) +{ + return ((nb_p_lcores <= 1) ? 0 : + (nb_p_lcores - (start_seq % nb_p_lcores) + this_lcore) % + nb_p_lcores); +} + +/* Claim slots to process, optimised for single-thread operation */ +static __rte_always_inline uint32_t +opdl_stage_claim_singlethread(struct opdl_stage *s, void *entries, + uint32_t num_entries, uint32_t *seq, bool block, bool atomic) +{ + uint32_t i = 0, j = 0, offset; + void *get_slots; + struct rte_event *ev; + RTE_SET_USED(seq); + struct opdl_ring *t = s->t; + uint8_t *entries_offset = (uint8_t *)entries; + + if (!atomic) { + + offset = opdl_first_entry_id(s->seq, s->nb_instance, + s->instance_id); + + num_entries = s->nb_instance * num_entries; + + num_entries = num_to_process(s, num_entries, block); + + for (; offset < num_entries; offset += s->nb_instance) { + get_slots = get_slot(t, s->head + offset); + memcpy(entries_offset, get_slots, t->slot_size); + entries_offset += t->slot_size; + i++; + } + } else { + num_entries = num_to_process(s, num_entries, block); + + for (j = 0; j < num_entries; j++) { + ev = (struct rte_event *)get_slot(t, s->head+j); + if ((ev->flow_id%s->nb_instance) == s->instance_id) { + memcpy(entries_offset, ev, t->slot_size); + entries_offset += t->slot_size; + i++; + } + } + } + s->shadow_head = s->head; + s->head += num_entries; + s->num_claimed = num_entries; + s->num_event = i; + + /* automatically disclaim entries if number of rte_events is zero */ + if (unlikely(i == 0)) + opdl_stage_disclaim(s, 0, false); + + return i; +} + +/* Thread-safe version of function to claim slots for processing */ +static __rte_always_inline uint32_t +opdl_stage_claim_multithread(struct opdl_stage *s, void *entries, + uint32_t num_entries, uint32_t *seq, bool block) +{ + uint32_t old_head; + struct opdl_ring *t = s->t; + uint32_t i = 0, offset; + uint8_t *entries_offset = (uint8_t *)entries; + + offset = opdl_first_entry_id(*seq, s->nb_instance, s->instance_id); + num_entries = offset + (s->nb_instance * num_entries); + + move_head_atomically(s, &num_entries, &old_head, block, true); + + for (; offset < num_entries; offset += s->nb_instance) { + memcpy(entries_offset, get_slot(t, s->head + offset), + t->slot_size); + entries_offset += t->slot_size; + i++; + } + if (seq != NULL) + *seq = old_head; + + return i; +} + +/* Claim and copy slot pointers, optimised for single-thread operation */ +static __rte_always_inline uint32_t +opdl_stage_claim_copy_singlethread(struct opdl_stage *s, void *entries, + uint32_t num_entries, uint32_t *seq, bool block) +{ + num_entries = num_to_process(s, num_entries, block); + if (num_entries == 0) + return 0; + copy_entries_out(s->t, s->head, entries, num_entries); + if (seq != NULL) + *seq = s->head; + s->head += num_entries; + return num_entries; +} + +/* Thread-safe version of function to claim and copy pointers to slots */ +static __rte_always_inline uint32_t +opdl_stage_claim_copy_multithread(struct opdl_stage *s, void *entries, + uint32_t num_entries, uint32_t *seq, bool block) +{ + uint32_t old_head; + + move_head_atomically(s, &num_entries, &old_head, block, true); + if (num_entries == 0) + return 0; + copy_entries_out(s->t, old_head, entries, num_entries); + if (seq != NULL) + *seq = old_head; + return num_entries; +} + +static __rte_always_inline void +opdl_stage_disclaim_singlethread_n(struct opdl_stage *s, + uint32_t num_entries) +{ + uint32_t old_tail = s->shared.tail; + + if (unlikely(num_entries > (s->head - old_tail))) { + PMD_DRV_LOG(WARNING, "Attempt to disclaim (%u) more than claimed (%u)", + num_entries, s->head - old_tail); + num_entries = s->head - old_tail; + } + __atomic_store_n(&s->shared.tail, num_entries + old_tail, + __ATOMIC_RELEASE); +} + +uint32_t +opdl_ring_input(struct opdl_ring *t, const void *entries, uint32_t num_entries, + bool block) +{ + if (input_stage(t)->threadsafe == false) + return opdl_ring_input_singlethread(t, entries, num_entries, + block); + else + return opdl_ring_input_multithread(t, entries, num_entries, + block); +} + +uint32_t +opdl_ring_copy_from_burst(struct opdl_ring *t, struct opdl_stage *s, + const void *entries, uint32_t num_entries, bool block) +{ + uint32_t head = s->head; + + num_entries = num_to_process(s, num_entries, block); + + if (num_entries == 0) + return 0; + + copy_entries_in(t, head, entries, num_entries); + + s->head += num_entries; + __atomic_store_n(&s->shared.tail, s->head, __ATOMIC_RELEASE); + + return num_entries; + +} + +uint32_t +opdl_ring_copy_to_burst(struct opdl_ring *t, struct opdl_stage *s, + void *entries, uint32_t num_entries, bool block) +{ + uint32_t head = s->head; + + num_entries = num_to_process(s, num_entries, block); + if (num_entries == 0) + return 0; + + copy_entries_out(t, head, entries, num_entries); + + s->head += num_entries; + __atomic_store_n(&s->shared.tail, s->head, __ATOMIC_RELEASE); + + return num_entries; +} + +uint32_t +opdl_stage_find_num_available(struct opdl_stage *s, uint32_t num_entries) +{ + /* return (num_to_process(s, num_entries, false)); */ + + if (available(s) >= num_entries) + return num_entries; + + update_available_seq(s); + + uint32_t avail = available(s); + + if (avail == 0) { + rte_pause(); + return 0; + } + return (avail <= num_entries) ? avail : num_entries; +} + +uint32_t +opdl_stage_claim(struct opdl_stage *s, void *entries, + uint32_t num_entries, uint32_t *seq, bool block, bool atomic) +{ + if (s->threadsafe == false) + return opdl_stage_claim_singlethread(s, entries, num_entries, + seq, block, atomic); + else + return opdl_stage_claim_multithread(s, entries, num_entries, + seq, block); +} + +uint32_t +opdl_stage_claim_copy(struct opdl_stage *s, void *entries, + uint32_t num_entries, uint32_t *seq, bool block) +{ + if (s->threadsafe == false) + return opdl_stage_claim_copy_singlethread(s, entries, + num_entries, seq, block); + else + return opdl_stage_claim_copy_multithread(s, entries, + num_entries, seq, block); +} + +void +opdl_stage_disclaim_n(struct opdl_stage *s, uint32_t num_entries, + bool block) +{ + + if (s->threadsafe == false) { + opdl_stage_disclaim_singlethread_n(s, s->num_claimed); + } else { + struct claim_manager *disclaims = + &s->pending_disclaims[rte_lcore_id()]; + + if (unlikely(num_entries > s->num_slots)) { + PMD_DRV_LOG(WARNING, "Attempt to disclaim (%u) more than claimed (%u)", + num_entries, disclaims->num_claimed); + num_entries = disclaims->num_claimed; + } + + num_entries = RTE_MIN(num_entries + disclaims->num_to_disclaim, + disclaims->num_claimed); + opdl_stage_disclaim_multithread_n(s, num_entries, block); + } +} + +int +opdl_stage_disclaim(struct opdl_stage *s, uint32_t num_entries, bool block) +{ + if (num_entries != s->num_event) { + rte_errno = -EINVAL; + return 0; + } + if (s->threadsafe == false) { + __atomic_store_n(&s->shared.tail, s->head, __ATOMIC_RELEASE); + s->seq += s->num_claimed; + s->shadow_head = s->head; + s->num_claimed = 0; + } else { + struct claim_manager *disclaims = + &s->pending_disclaims[rte_lcore_id()]; + opdl_stage_disclaim_multithread_n(s, disclaims->num_claimed, + block); + } + return num_entries; +} + +uint32_t +opdl_ring_available(struct opdl_ring *t) +{ + return opdl_stage_available(&t->stages[0]); +} + +uint32_t +opdl_stage_available(struct opdl_stage *s) +{ + update_available_seq(s); + return available(s); +} + +void +opdl_ring_flush(struct opdl_ring *t) +{ + struct opdl_stage *s = input_stage(t); + + wait_for_available(s, s->num_slots); +} + +/******************** Non performance sensitive functions ********************/ + +/* Initial setup of a new stage's context */ +static int +init_stage(struct opdl_ring *t, struct opdl_stage *s, bool threadsafe, + bool is_input) +{ + uint32_t available = (is_input) ? t->num_slots : 0; + + s->t = t; + s->num_slots = t->num_slots; + s->index = t->num_stages; + s->threadsafe = threadsafe; + s->shared.stage = s; + + /* Alloc memory for deps */ + s->dep_tracking = rte_zmalloc_socket(LIB_NAME, + t->max_num_stages * sizeof(enum dep_type), + 0, t->socket); + if (s->dep_tracking == NULL) + return -ENOMEM; + + s->deps = rte_zmalloc_socket(LIB_NAME, + t->max_num_stages * sizeof(struct shared_state *), + 0, t->socket); + if (s->deps == NULL) { + rte_free(s->dep_tracking); + return -ENOMEM; + } + + s->dep_tracking[s->index] = DEP_SELF; + + if (threadsafe == true) + s->shared.available_seq = available; + else + s->available_seq = available; + + return 0; +} + +/* Add direct or indirect dependencies between stages */ +static int +add_dep(struct opdl_stage *dependent, const struct opdl_stage *dependency, + enum dep_type type) +{ + struct opdl_ring *t = dependent->t; + uint32_t i; + + /* Add new direct dependency */ + if ((type == DEP_DIRECT) && + (dependent->dep_tracking[dependency->index] == + DEP_NONE)) { + PMD_DRV_LOG(DEBUG, "%s:%u direct dependency on %u", + t->name, dependent->index, dependency->index); + dependent->dep_tracking[dependency->index] = DEP_DIRECT; + } + + /* Add new indirect dependency or change direct to indirect */ + if ((type == DEP_INDIRECT) && + ((dependent->dep_tracking[dependency->index] == + DEP_NONE) || + (dependent->dep_tracking[dependency->index] == + DEP_DIRECT))) { + PMD_DRV_LOG(DEBUG, "%s:%u indirect dependency on %u", + t->name, dependent->index, dependency->index); + dependent->dep_tracking[dependency->index] = DEP_INDIRECT; + } + + /* Shouldn't happen... */ + if ((dependent->dep_tracking[dependency->index] == DEP_SELF) && + (dependent != input_stage(t))) { + PMD_DRV_LOG(ERR, "Loop in dependency graph %s:%u", + t->name, dependent->index); + return -EINVAL; + } + + /* Keep going to dependencies of the dependency, until input stage */ + if (dependency != input_stage(t)) + for (i = 0; i < dependency->num_deps; i++) { + int ret = add_dep(dependent, dependency->deps[i]->stage, + DEP_INDIRECT); + + if (ret < 0) + return ret; + } + + /* Make list of sequence numbers for direct dependencies only */ + if (type == DEP_DIRECT) + for (i = 0, dependent->num_deps = 0; i < t->num_stages; i++) + if (dependent->dep_tracking[i] == DEP_DIRECT) { + if ((i == 0) && (dependent->num_deps > 1)) + rte_panic("%s:%u depends on > input", + t->name, + dependent->index); + dependent->deps[dependent->num_deps++] = + &t->stages[i].shared; + } + + return 0; +} + +struct opdl_ring * +opdl_ring_create(const char *name, uint32_t num_slots, uint32_t slot_size, + uint32_t max_num_stages, int socket) +{ + struct opdl_ring *t; + char mz_name[RTE_MEMZONE_NAMESIZE]; + int mz_flags = 0; + struct opdl_stage *st = NULL; + const struct rte_memzone *mz = NULL; + size_t alloc_size = RTE_CACHE_LINE_ROUNDUP(sizeof(*t) + + (num_slots * slot_size)); + + /* Compile time checking */ + RTE_BUILD_BUG_ON((sizeof(struct shared_state) & RTE_CACHE_LINE_MASK) != + 0); + RTE_BUILD_BUG_ON((offsetof(struct opdl_stage, shared) & + RTE_CACHE_LINE_MASK) != 0); + RTE_BUILD_BUG_ON((offsetof(struct opdl_ring, slots) & + RTE_CACHE_LINE_MASK) != 0); + RTE_BUILD_BUG_ON(!rte_is_power_of_2(OPDL_DISCLAIMS_PER_LCORE)); + + /* Parameter checking */ + if (name == NULL) { + PMD_DRV_LOG(ERR, "name param is NULL"); + return NULL; + } + if (!rte_is_power_of_2(num_slots)) { + PMD_DRV_LOG(ERR, "num_slots (%u) for %s is not power of 2", + num_slots, name); + return NULL; + } + + /* Alloc memory for stages */ + st = rte_zmalloc_socket(LIB_NAME, + max_num_stages * sizeof(struct opdl_stage), + RTE_CACHE_LINE_SIZE, socket); + if (st == NULL) + goto exit_fail; + + snprintf(mz_name, sizeof(mz_name), "%s%s", LIB_NAME, name); + + /* Alloc memory for memzone */ + mz = rte_memzone_reserve(mz_name, alloc_size, socket, mz_flags); + if (mz == NULL) + goto exit_fail; + + t = mz->addr; + + /* Initialise opdl_ring queue */ + memset(t, 0, sizeof(*t)); + snprintf(t->name, sizeof(t->name), "%s", name); + t->socket = socket; + t->num_slots = num_slots; + t->mask = num_slots - 1; + t->slot_size = slot_size; + t->max_num_stages = max_num_stages; + t->stages = st; + + PMD_DRV_LOG(DEBUG, "Created %s at %p (num_slots=%u,socket=%i,slot_size=%u)", + t->name, t, num_slots, socket, slot_size); + + return t; + +exit_fail: + PMD_DRV_LOG(ERR, "Cannot reserve memory"); + rte_free(st); + rte_memzone_free(mz); + + return NULL; +} + +void * +opdl_ring_get_slot(const struct opdl_ring *t, uint32_t index) +{ + return get_slot(t, index); +} + +bool +opdl_ring_cas_slot(const struct opdl_stage *s, const struct rte_event *ev, + uint32_t index, bool atomic) +{ + uint32_t i = 0, j = 0, offset; + struct opdl_ring *t = s->t; + struct rte_event *ev_orig = NULL; + bool ev_updated = false; + uint64_t ev_temp = 0; + + if (index > s->num_event) { + PMD_DRV_LOG(ERR, "index is overflow"); + return ev_updated; + } + + ev_temp = ev->event&OPDL_EVENT_MASK; + + if (!atomic) { + offset = opdl_first_entry_id(s->seq, s->nb_instance, + s->instance_id); + offset += index*s->nb_instance; + ev_orig = get_slot(t, s->shadow_head+offset); + if ((ev_orig->event&OPDL_EVENT_MASK) != ev_temp) { + ev_orig->event = ev->event; + ev_updated = true; + } + if (ev_orig->u64 != ev->u64) { + ev_orig->u64 = ev->u64; + ev_updated = true; + } + + } else { + for (i = 0; i < s->num_claimed; i++) { + ev_orig = (struct rte_event *) + get_slot(t, s->shadow_head+i); + + if ((ev_orig->flow_id%s->nb_instance) == + s->instance_id) { + + if (j == index) { + if ((ev_orig->event&OPDL_EVENT_MASK) != + ev_temp) { + ev_orig->event = ev->event; + ev_updated = true; + } + if (ev_orig->u64 != ev->u64) { + ev_orig->u64 = ev->u64; + ev_updated = true; + } + + break; + } + j++; + } + } + + } + + return ev_updated; +} + +int +opdl_ring_get_socket(const struct opdl_ring *t) +{ + return t->socket; +} + +uint32_t +opdl_ring_get_num_slots(const struct opdl_ring *t) +{ + return t->num_slots; +} + +const char * +opdl_ring_get_name(const struct opdl_ring *t) +{ + return t->name; +} + +/* Check dependency list is valid for a given opdl_ring */ +static int +check_deps(struct opdl_ring *t, struct opdl_stage *deps[], + uint32_t num_deps) +{ + unsigned int i; + + for (i = 0; i < num_deps; ++i) { + if (!deps[i]) { + PMD_DRV_LOG(ERR, "deps[%u] is NULL", i); + return -EINVAL; + } + if (t != deps[i]->t) { + PMD_DRV_LOG(ERR, "deps[%u] is in opdl_ring %s, not %s", + i, deps[i]->t->name, t->name); + return -EINVAL; + } + } + if (num_deps > t->num_stages) { + PMD_DRV_LOG(ERR, "num_deps (%u) > number stages (%u)", + num_deps, t->num_stages); + return -EINVAL; + } + return 0; +} + +struct opdl_stage * +opdl_stage_add(struct opdl_ring *t, bool threadsafe, bool is_input) +{ + struct opdl_stage *s; + + /* Parameter checking */ + if (!t) { + PMD_DRV_LOG(ERR, "opdl_ring is NULL"); + return NULL; + } + if (t->num_stages == t->max_num_stages) { + PMD_DRV_LOG(ERR, "%s has max number of stages (%u)", + t->name, t->max_num_stages); + return NULL; + } + + s = &t->stages[t->num_stages]; + + if (((uintptr_t)&s->shared & RTE_CACHE_LINE_MASK) != 0) + PMD_DRV_LOG(WARNING, "Tail seq num (%p) of %s stage not cache aligned", + &s->shared, t->name); + + if (init_stage(t, s, threadsafe, is_input) < 0) { + PMD_DRV_LOG(ERR, "Cannot reserve memory"); + return NULL; + } + t->num_stages++; + + return s; +} + +uint32_t +opdl_stage_deps_add(struct opdl_ring *t, struct opdl_stage *s, + uint32_t nb_instance, uint32_t instance_id, + struct opdl_stage *deps[], + uint32_t num_deps) +{ + uint32_t i; + int ret = 0; + + if ((num_deps > 0) && (!deps)) { + PMD_DRV_LOG(ERR, "%s stage has NULL dependencies", t->name); + return -1; + } + ret = check_deps(t, deps, num_deps); + if (ret < 0) + return ret; + + for (i = 0; i < num_deps; i++) { + ret = add_dep(s, deps[i], DEP_DIRECT); + if (ret < 0) + return ret; + } + + s->nb_instance = nb_instance; + s->instance_id = instance_id; + + return ret; +} + +struct opdl_stage * +opdl_ring_get_input_stage(const struct opdl_ring *t) +{ + return input_stage(t); +} + +int +opdl_stage_set_deps(struct opdl_stage *s, struct opdl_stage *deps[], + uint32_t num_deps) +{ + unsigned int i; + int ret; + + if ((num_deps == 0) || (!deps)) { + PMD_DRV_LOG(ERR, "cannot set NULL dependencies"); + return -EINVAL; + } + + ret = check_deps(s->t, deps, num_deps); + if (ret < 0) + return ret; + + /* Update deps */ + for (i = 0; i < num_deps; i++) + s->deps[i] = &deps[i]->shared; + s->num_deps = num_deps; + + return 0; +} + +struct opdl_ring * +opdl_stage_get_opdl_ring(const struct opdl_stage *s) +{ + return s->t; +} + +void +opdl_ring_dump(const struct opdl_ring *t, FILE *f) +{ + uint32_t i; + + if (t == NULL) { + fprintf(f, "NULL OPDL!\n"); + return; + } + fprintf(f, "OPDL \"%s\": num_slots=%u; mask=%#x; slot_size=%u; num_stages=%u; socket=%i\n", + t->name, t->num_slots, t->mask, t->slot_size, + t->num_stages, t->socket); + for (i = 0; i < t->num_stages; i++) { + uint32_t j; + const struct opdl_stage *s = &t->stages[i]; + + fprintf(f, " %s[%u]: threadsafe=%s; head=%u; available_seq=%u; tail=%u; deps=%u", + t->name, i, (s->threadsafe) ? "true" : "false", + (s->threadsafe) ? s->shared.head : s->head, + (s->threadsafe) ? s->shared.available_seq : + s->available_seq, + s->shared.tail, (s->num_deps > 0) ? + s->deps[0]->stage->index : 0); + for (j = 1; j < s->num_deps; j++) + fprintf(f, ",%u", s->deps[j]->stage->index); + fprintf(f, "\n"); + } + fflush(f); +} + +void +opdl_ring_free(struct opdl_ring *t) +{ + uint32_t i; + const struct rte_memzone *mz; + char mz_name[RTE_MEMZONE_NAMESIZE]; + + if (t == NULL) { + PMD_DRV_LOG(DEBUG, "Freeing NULL OPDL Ring!"); + return; + } + + PMD_DRV_LOG(DEBUG, "Freeing %s opdl_ring at %p", t->name, t); + + for (i = 0; i < t->num_stages; ++i) { + rte_free(t->stages[i].deps); + rte_free(t->stages[i].dep_tracking); + } + + rte_free(t->stages); + + snprintf(mz_name, sizeof(mz_name), "%s%s", LIB_NAME, t->name); + mz = rte_memzone_lookup(mz_name); + if (rte_memzone_free(mz) != 0) + PMD_DRV_LOG(ERR, "Cannot free memzone for %s", t->name); +} + +/* search a opdl_ring from its name */ +struct opdl_ring * +opdl_ring_lookup(const char *name) +{ + const struct rte_memzone *mz; + char mz_name[RTE_MEMZONE_NAMESIZE]; + + snprintf(mz_name, sizeof(mz_name), "%s%s", LIB_NAME, name); + + mz = rte_memzone_lookup(mz_name); + if (mz == NULL) + return NULL; + + return mz->addr; +} + +void +opdl_ring_set_stage_threadsafe(struct opdl_stage *s, bool threadsafe) +{ + s->threadsafe = threadsafe; +} diff --git a/drivers/event/opdl/opdl_ring.h b/drivers/event/opdl/opdl_ring.h new file mode 100644 index 0000000000..71ef9b6bd3 --- /dev/null +++ b/drivers/event/opdl/opdl_ring.h @@ -0,0 +1,601 @@ +/*- + * SPDX-License-Identifier: BSD-3-Clause + * Copyright(c) 2010-2014 Intel Corporation + */ + +#ifndef _OPDL_H_ +#define _OPDL_H_ + +/** + * @file + * The "opdl_ring" is a data structure that contains a fixed number of slots, + * with each slot having the same, but configurable, size. Entries are input + * into the opdl_ring by copying into available slots. Once in the opdl_ring, + * an entry is processed by a number of stages, with the ordering of stage + * processing controlled by making stages dependent on one or more other stages. + * An entry is not available for a stage to process until it has been processed + * by that stages dependencies. Entries are always made available for + * processing in the same order that they were input in to the opdl_ring. + * Inputting is considered as a stage that depends on all other stages, + * and is also a dependency of all stages. + * + * Inputting and processing in a stage can support multi-threading. Note that + * multi-thread processing can also be done by making stages co-operate e.g. two + * stages where one processes the even packets and the other processes odd + * packets. + * + * A opdl_ring can be used as the basis for pipeline based applications. Instead + * of each stage in a pipeline dequeueing from a ring, processing and enqueueing + * to another ring, it can process entries in-place on the ring. If stages do + * not depend on each other, they can run in parallel. + * + * The opdl_ring works with entries of configurable size, these could be + * pointers to mbufs, pointers to mbufs with application specific meta-data, + * tasks etc. + */ + +#include +#include +#include + +#include +#ifdef __cplusplus +extern "C" { +#endif + +#ifndef OPDL_DISCLAIMS_PER_LCORE +/** Multi-threaded processing allows one thread to process multiple batches in a + * stage, while another thread is processing a single large batch. This number + * controls how many non-contiguous batches one stage can process before being + * blocked by the other stage. + */ +#define OPDL_DISCLAIMS_PER_LCORE 8 +#endif + +/** Opaque handle to a opdl_ring instance */ +struct opdl_ring; + +/** Opaque handle to a single stage in a opdl_ring */ +struct opdl_stage; + +/** + * Create a new instance of a opdl_ring. + * + * @param name + * String containing the name to give the new opdl_ring instance. + * @param num_slots + * How many slots the opdl_ring contains. Must be a power a 2! + * @param slot_size + * How many bytes in each slot. + * @param max_num_stages + * Maximum number of stages. + * @param socket + * The NUMA socket (or SOCKET_ID_ANY) to allocate the memory used for this + * opdl_ring instance. + * @param threadsafe + * Whether to support multiple threads inputting to the opdl_ring or not. + * Enabling this may have a negative impact on performance if only one thread + * will be inputting. + * + * @return + * A pointer to a new opdl_ring instance, or NULL on error. + */ +struct opdl_ring * +opdl_ring_create(const char *name, uint32_t num_slots, uint32_t slot_size, + uint32_t max_num_stages, int socket); + +/** + * Get pointer to individual slot in a opdl_ring. + * + * @param t + * The opdl_ring. + * @param index + * Index of slot. If greater than the number of slots it will be masked to be + * within correct range. + * + * @return + * A pointer to that slot. + */ +void * +opdl_ring_get_slot(const struct opdl_ring *t, uint32_t index); + +/** + * Get NUMA socket used by a opdl_ring. + * + * @param t + * The opdl_ring. + * + * @return + * NUMA socket. + */ +int +opdl_ring_get_socket(const struct opdl_ring *t); + +/** + * Get number of slots in a opdl_ring. + * + * @param t + * The opdl_ring. + * + * @return + * Number of slots. + */ +uint32_t +opdl_ring_get_num_slots(const struct opdl_ring *t); + +/** + * Get name of a opdl_ring. + * + * @param t + * The opdl_ring. + * + * @return + * Name string. + */ +const char * +opdl_ring_get_name(const struct opdl_ring *t); + +/** + * Adds a new processing stage to a specified opdl_ring instance. Adding a stage + * while there are entries in the opdl_ring being processed will cause undefined + * behaviour. + * + * @param t + * The opdl_ring to add the stage to. + * @param deps + * An array of pointers to other stages that this stage depends on. The other + * stages must be part of the same opdl_ring! Note that input is an implied + * dependency. This can be NULL if num_deps is 0. + * @param num_deps + * The size of the deps array. + * @param threadsafe + * Whether to support multiple threads processing this stage or not. + * Enabling this may have a negative impact on performance if only one thread + * will be processing this stage. + * @param is_input + * Indication to nitialise the stage with all slots available or none + * + * @return + * A pointer to the new stage, or NULL on error. + */ +struct opdl_stage * +opdl_stage_add(struct opdl_ring *t, bool threadsafe, bool is_input); + +/** + * Returns the input stage of a opdl_ring to be used by other API functions. + * + * @param t + * The opdl_ring. + * + * @return + * A pointer to the input stage. + */ +struct opdl_stage * +opdl_ring_get_input_stage(const struct opdl_ring *t); + +/** + * Sets the dependencies for a stage (clears all the previous deps!). Changing + * dependencies while there are entries in the opdl_ring being processed will + * cause undefined behaviour. + * + * @param s + * The stage to set the dependencies for. + * @param deps + * An array of pointers to other stages that this stage will depends on. The + * other stages must be part of the same opdl_ring! + * @param num_deps + * The size of the deps array. This must be > 0. + * + * @return + * 0 on success, a negative value on error. + */ +int +opdl_stage_set_deps(struct opdl_stage *s, struct opdl_stage *deps[], + uint32_t num_deps); + +/** + * Returns the opdl_ring that a stage belongs to. + * + * @param s + * The stage + * + * @return + * A pointer to the opdl_ring that the stage belongs to. + */ +struct opdl_ring * +opdl_stage_get_opdl_ring(const struct opdl_stage *s); + +/** + * Inputs a new batch of entries into the opdl_ring. This function is only + * threadsafe (with the same opdl_ring parameter) if the threadsafe parameter of + * opdl_ring_create() was true. For performance reasons, this function does not + * check input parameters. + * + * @param t + * The opdl_ring to input entries in to. + * @param entries + * An array of entries that will be copied in to the opdl_ring. + * @param num_entries + * The size of the entries array. + * @param block + * If this is true, the function blocks until enough slots are available to + * input all the requested entries. If false, then the function inputs as + * many entries as currently possible. + * + * @return + * The number of entries successfully input. + */ +uint32_t +opdl_ring_input(struct opdl_ring *t, const void *entries, uint32_t num_entries, + bool block); + +/** + * Inputs a new batch of entries into a opdl stage. This function is only + * threadsafe (with the same opdl parameter) if the threadsafe parameter of + * opdl_create() was true. For performance reasons, this function does not + * check input parameters. + * + * @param t + * The opdl ring to input entries in to. + * @param s + * The stage to copy entries to. + * @param entries + * An array of entries that will be copied in to the opdl ring. + * @param num_entries + * The size of the entries array. + * @param block + * If this is true, the function blocks until enough slots are available to + * input all the requested entries. If false, then the function inputs as + * many entries as currently possible. + * + * @return + * The number of entries successfully input. + */ +uint32_t +opdl_ring_copy_from_burst(struct opdl_ring *t, struct opdl_stage *s, + const void *entries, uint32_t num_entries, bool block); + +/** + * Copy a batch of entries from the opdl ring. This function is only + * threadsafe (with the same opdl parameter) if the threadsafe parameter of + * opdl_create() was true. For performance reasons, this function does not + * check input parameters. + * + * @param t + * The opdl ring to copy entries from. + * @param s + * The stage to copy entries from. + * @param entries + * An array of entries that will be copied from the opdl ring. + * @param num_entries + * The size of the entries array. + * @param block + * If this is true, the function blocks until enough slots are available to + * input all the requested entries. If false, then the function inputs as + * many entries as currently possible. + * + * @return + * The number of entries successfully input. + */ +uint32_t +opdl_ring_copy_to_burst(struct opdl_ring *t, struct opdl_stage *s, + void *entries, uint32_t num_entries, bool block); + +/** + * Before processing a batch of entries, a stage must first claim them to get + * access. This function is threadsafe using same opdl_stage parameter if + * the stage was created with threadsafe set to true, otherwise it is only + * threadsafe with a different opdl_stage per thread. For performance + * reasons, this function does not check input parameters. + * + * @param s + * The opdl_ring stage to read entries in. + * @param entries + * An array of pointers to entries that will be filled in by this function. + * @param num_entries + * The number of entries to attempt to claim for processing (and the size of + * the entries array). + * @param seq + * If not NULL, this is set to the value of the internal stage sequence number + * associated with the first entry returned. + * @param block + * If this is true, the function blocks until num_entries slots are available + * to process. If false, then the function claims as many entries as + * currently possible. + * + * @param atomic + * if this is true, the function will return event according to event flow id + * @return + * The number of pointers to entries filled in to the entries array. + */ +uint32_t +opdl_stage_claim(struct opdl_stage *s, void *entries, + uint32_t num_entries, uint32_t *seq, bool block, bool atomic); + +uint32_t +opdl_stage_deps_add(struct opdl_ring *t, struct opdl_stage *s, + uint32_t nb_instance, uint32_t instance_id, + struct opdl_stage *deps[], uint32_t num_deps); + +/** + * A function to check how many entries are ready to be claimed. + * + * @param entries + * An array of pointers to entries. + * @param num_entries + * Number of entries in an array. + * @param arg + * An opaque pointer to data passed to the claim function. + * @param block + * When set to true, the function should wait until num_entries are ready to + * be processed. Otherwise it should return immediately. + * + * @return + * Number of entries ready to be claimed. + */ +typedef uint32_t (opdl_ring_check_entries_t)(void *entries[], + uint32_t num_entries, void *arg, bool block); + +/** + * Before processing a batch of entries, a stage must first claim them to get + * access. Each entry is checked by the passed check() function and depending + * on block value, it waits until num_entries are ready or returns immediately. + * This function is only threadsafe with a different opdl_stage per thread. + * + * @param s + * The opdl_ring stage to read entries in. + * @param entries + * An array of pointers to entries that will be filled in by this function. + * @param num_entries + * The number of entries to attempt to claim for processing (and the size of + * the entries array). + * @param seq + * If not NULL, this is set to the value of the internal stage sequence number + * associated with the first entry returned. + * @param block + * If this is true, the function blocks until num_entries ready slots are + * available to process. If false, then the function claims as many ready + * entries as currently possible. + * @param check + * Pointer to a function called to check entries. + * @param arg + * Opaque data passed to check() function. + * + * @return + * The number of pointers to ready entries filled in to the entries array. + */ +uint32_t +opdl_stage_claim_check(struct opdl_stage *s, void **entries, + uint32_t num_entries, uint32_t *seq, bool block, + opdl_ring_check_entries_t *check, void *arg); + +/** + * Before processing a batch of entries, a stage must first claim them to get + * access. This function is threadsafe using same opdl_stage parameter if + * the stage was created with threadsafe set to true, otherwise it is only + * threadsafe with a different opdl_stage per thread. + * + * The difference between this function and opdl_stage_claim() is that this + * function copies the entries from the opdl_ring. Note that any changes made to + * the copied entries will not be reflected back in to the entries in the + * opdl_ring, so this function probably only makes sense if the entries are + * pointers to other data. For performance reasons, this function does not check + * input parameters. + * + * @param s + * The opdl_ring stage to read entries in. + * @param entries + * An array of entries that will be filled in by this function. + * @param num_entries + * The number of entries to attempt to claim for processing (and the size of + * the entries array). + * @param seq + * If not NULL, this is set to the value of the internal stage sequence number + * associated with the first entry returned. + * @param block + * If this is true, the function blocks until num_entries slots are available + * to process. If false, then the function claims as many entries as + * currently possible. + * + * @return + * The number of entries copied in to the entries array. + */ +uint32_t +opdl_stage_claim_copy(struct opdl_stage *s, void *entries, + uint32_t num_entries, uint32_t *seq, bool block); + +/** + * This function must be called when a stage has finished its processing of + * entries, to make them available to any dependent stages. All entries that are + * claimed by the calling thread in the stage will be disclaimed. It is possible + * to claim multiple batches before disclaiming. For performance reasons, this + * function does not check input parameters. + * + * @param s + * The opdl_ring stage in which to disclaim all claimed entries. + * + * @param block + * Entries are always made available to a stage in the same order that they + * were input in the stage. If a stage is multithread safe, this may mean that + * full disclaiming of a batch of entries can not be considered complete until + * all earlier threads in the stage have disclaimed. If this parameter is true + * then the function blocks until all entries are fully disclaimed, otherwise + * it disclaims as many as currently possible, with non fully disclaimed + * batches stored until the next call to a claim or disclaim function for this + * stage on this thread. + * + * If a thread is not going to process any more entries in this stage, it + * *must* first call this function with this parameter set to true to ensure + * it does not block the entire opdl_ring. + * + * In a single threaded stage, this parameter has no effect. + */ +int +opdl_stage_disclaim(struct opdl_stage *s, uint32_t num_entries, + bool block); + +/** + * This function can be called when a stage has finished its processing of + * entries, to make them available to any dependent stages. The difference + * between this function and opdl_stage_disclaim() is that here only a + * portion of entries are disclaimed, not all of them. For performance reasons, + * this function does not check input parameters. + * + * @param s + * The opdl_ring stage in which to disclaim entries. + * + * @param num_entries + * The number of entries to disclaim. + * + * @param block + * Entries are always made available to a stage in the same order that they + * were input in the stage. If a stage is multithread safe, this may mean that + * full disclaiming of a batch of entries can not be considered complete until + * all earlier threads in the stage have disclaimed. If this parameter is true + * then the function blocks until the specified number of entries has been + * disclaimed (or there are no more entries to disclaim). Otherwise it + * disclaims as many claims as currently possible and an attempt to disclaim + * them is made the next time a claim or disclaim function for this stage on + * this thread is called. + * + * In a single threaded stage, this parameter has no effect. + */ +void +opdl_stage_disclaim_n(struct opdl_stage *s, uint32_t num_entries, + bool block); + +/** + * Check how many entries can be input. + * + * @param t + * The opdl_ring instance to check. + * + * @return + * The number of new entries currently allowed to be input. + */ +uint32_t +opdl_ring_available(struct opdl_ring *t); + +/** + * Check how many entries can be processed in a stage. + * + * @param s + * The stage to check. + * + * @return + * The number of entries currently available to be processed in this stage. + */ +uint32_t +opdl_stage_available(struct opdl_stage *s); + +/** + * Check how many entries are available to be processed. + * + * NOTE : DOES NOT CHANGE ANY STATE WITHIN THE STAGE + * + * @param s + * The stage to check. + * + * @param num_entries + * The number of entries to check for availability. + * + * @return + * The number of entries currently available to be processed in this stage. + */ +uint32_t +opdl_stage_find_num_available(struct opdl_stage *s, uint32_t num_entries); + +/** + * Create empty stage instance and return the pointer. + * + * @param t + * The pointer of opdl_ring. + * + * @param threadsafe + * enable multiple thread or not. + * @return + * The pointer of one empty stage instance. + */ +struct opdl_stage * +opdl_stage_create(struct opdl_ring *t, bool threadsafe); + +/** + * Prints information on opdl_ring instance and all its stages + * + * @param t + * The stage to print info on. + * @param f + * Where to print the info. + */ +void +opdl_ring_dump(const struct opdl_ring *t, FILE *f); + +/** + * Blocks until all entries in a opdl_ring have been processed by all stages. + * + * @param t + * The opdl_ring instance to flush. + */ +void +opdl_ring_flush(struct opdl_ring *t); + +/** + * Deallocates all resources used by a opdl_ring instance + * + * @param t + * The opdl_ring instance to free. + */ +void +opdl_ring_free(struct opdl_ring *t); + +/** + * Search for a opdl_ring by its name + * + * @param name + * The name of the opdl_ring. + * @return + * The pointer to the opdl_ring matching the name, or NULL if not found. + * + */ +struct opdl_ring * +opdl_ring_lookup(const char *name); + +/** + * Set a opdl_stage to threadsafe variable. + * + * @param s + * The opdl_stage. + * @param threadsafe + * Threadsafe value. + */ +void +opdl_ring_set_stage_threadsafe(struct opdl_stage *s, bool threadsafe); + + +/** + * Compare the event descriptor with original version in the ring. + * if key field event descriptor is changed by application, then + * update the slot in the ring otherwise do nothing with it. + * the key field is flow_id, prioirty, mbuf, impl_opaque + * + * @param s + * The opdl_stage. + * @param ev + * pointer of the event descriptor. + * @param index + * index of the event descriptor. + * @param atomic + * queue type associate with the stage. + * @return + * if the evevnt key field is changed compare with previous record. + */ + +bool +opdl_ring_cas_slot(const struct opdl_stage *s, const struct rte_event *ev, + uint32_t index, bool atomic); + +#ifdef __cplusplus +} +#endif + +#endif /* _OPDL_H_ */ diff --git a/drivers/event/opdl/rte_pmd_evdev_opdl_version.map b/drivers/event/opdl/rte_pmd_evdev_opdl_version.map new file mode 100644 index 0000000000..58b94270d4 --- /dev/null +++ b/drivers/event/opdl/rte_pmd_evdev_opdl_version.map @@ -0,0 +1,3 @@ +DPDK_18.02 { + local: *; +}; diff --git a/mk/rte.app.mk b/mk/rte.app.mk index 2dcac8fc32..5d34e2c43d 100644 --- a/mk/rte.app.mk +++ b/mk/rte.app.mk @@ -214,6 +214,7 @@ _LDLIBS-$(CONFIG_RTE_LIBRTE_PMD_OCTEONTX_SSOVF) += -lrte_pmd_octeontx_ssovf _LDLIBS-$(CONFIG_RTE_LIBRTE_PMD_DPAA2_EVENTDEV) += -lrte_pmd_dpaa2_event _LDLIBS-$(CONFIG_RTE_LIBRTE_OCTEONTX_MEMPOOL) += -lrte_mempool_octeontx _LDLIBS-$(CONFIG_RTE_LIBRTE_OCTEONTX_PMD) += -lrte_pmd_octeontx +_LDLIBS-$(CONFIG_RTE_LIBRTE_PMD_OPDL_EVENTDEV) += -lrte_pmd_opdl_event endif # CONFIG_RTE_LIBRTE_EVENTDEV ifeq ($(CONFIG_RTE_LIBRTE_DPAA2_PMD),y) diff --git a/mk/toolchain/gcc/rte.toolchain-compat.mk b/mk/toolchain/gcc/rte.toolchain-compat.mk index 89dc75615a..9f17131f93 100644 --- a/mk/toolchain/gcc/rte.toolchain-compat.mk +++ b/mk/toolchain/gcc/rte.toolchain-compat.mk @@ -67,4 +67,10 @@ else ifeq ($(shell test $(GCC_VERSION) -lt 47 && echo 1), 1) CONFIG_RTE_LIBRTE_THUNDERX_NICVF_PMD=d endif + + # Disable OPDL PMD for gcc < 4.7 + ifeq ($(shell test $(GCC_VERSION) -lt 47 && echo 1), 1) + CONFIG_RTE_LIBRTE_PMD_OPDL_EVENTDEV=d + endif + endif diff --git a/mk/toolchain/icc/rte.toolchain-compat.mk b/mk/toolchain/icc/rte.toolchain-compat.mk index f8ff3685e5..2d663b34ad 100644 --- a/mk/toolchain/icc/rte.toolchain-compat.mk +++ b/mk/toolchain/icc/rte.toolchain-compat.mk @@ -49,4 +49,10 @@ else ifeq ($(shell test $(ICC_MAJOR_VERSION) -le 16 && echo 1), 1) CONFIG_RTE_LIBRTE_THUNDERX_NICVF_PMD=d endif + + # Disable event/opdl PMD for icc <= 16.0 + ifeq ($(shell test $(ICC_MAJOR_VERSION) -le 16 && echo 1), 1) + CONFIG_RTE_LIBRTE_PMD_OPDL_EVENTDEV=d + endif + endif