numam-dpdk/lib/graph/rte_graph_worker.h
/* SPDX-License-Identifier: BSD-3-Clause
 * Copyright(C) 2020 Marvell International Ltd.
 */

#ifndef _RTE_GRAPH_WORKER_H_
#define _RTE_GRAPH_WORKER_H_

/**
 * @file rte_graph_worker.h
 *
 * @warning
 * @b EXPERIMENTAL:
 * All functions in this file may be changed or removed without prior notice.
 *
 * This API allows a worker thread to walk over a graph and nodes to create,
 * process, enqueue and move streams of objects to the next nodes.
 */

#include <rte_common.h>
#include <rte_cycles.h>
#include <rte_prefetch.h>
#include <rte_memcpy.h>
#include <rte_memory.h>

#include "rte_graph.h"

#ifdef __cplusplus
extern "C" {
#endif

/**
 * @internal
 *
 * Data structure to hold graph data.
 */
struct rte_graph {
	uint32_t tail;		/**< Tail of circular buffer. */
	uint32_t head;		/**< Head of circular buffer. */
	uint32_t cir_mask;	/**< Circular buffer wrap around mask. */
	rte_node_t nb_nodes;	/**< Number of nodes in the graph. */
	rte_graph_off_t *cir_start;	/**< Pointer to circular buffer. */
	rte_graph_off_t nodes_start;	/**< Offset at which node memory starts. */
	rte_graph_t id;		/**< Graph identifier. */
	int socket;		/**< Socket ID where memory is allocated. */
	char name[RTE_GRAPH_NAMESIZE];	/**< Name of the graph. */
	uint64_t fence;		/**< Fence. */
} __rte_cache_aligned;

/**
 * @internal
 *
 * Data structure to hold node data.
 */
struct rte_node {
	/* Slow path area */
	uint64_t fence;		/**< Fence. */
	rte_graph_off_t next;	/**< Index to next node. */
	rte_node_t id;		/**< Node identifier. */
	rte_node_t parent_id;	/**< Parent node identifier. */
	rte_edge_t nb_edges;	/**< Number of edges from this node. */
	uint32_t realloc_count;	/**< Number of times realloced. */

	char parent[RTE_NODE_NAMESIZE];	/**< Parent node name. */
	char name[RTE_NODE_NAMESIZE];	/**< Name of the node. */

	/* Fast path area */
#define RTE_NODE_CTX_SZ 16
	uint8_t ctx[RTE_NODE_CTX_SZ] __rte_cache_aligned; /**< Node context. */
	uint16_t size;		/**< Total number of objects available. */
	uint16_t idx;		/**< Number of objects used. */
	rte_graph_off_t off;	/**< Offset of node in the graph reel. */
	uint64_t total_cycles;	/**< Cycles spent in this node. */
	uint64_t total_calls;	/**< Calls done to this node. */
	uint64_t total_objs;	/**< Objects processed by this node. */
	RTE_STD_C11
	union {
		void **objs;	/**< Array of object pointers. */
		uint64_t objs_u64;
	};
	RTE_STD_C11
	union {
		rte_node_process_t process;	/**< Process function. */
		uint64_t process_u64;
	};
	struct rte_node *nodes[] __rte_cache_min_aligned; /**< Next nodes. */
} __rte_cache_aligned;

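/*
 * Example: per-node fast path state is typically kept in the 16-byte ctx
 * area of struct rte_node. Below is a minimal sketch; the struct, field and
 * function names (my_node_ctx, last_next, my_node_process) are assumptions
 * for illustration, not part of this API:
 *
 *	struct my_node_ctx {
 *		rte_edge_t last_next; // next edge decided by the slow path
 *	};
 *
 *	static uint16_t
 *	my_node_process(struct rte_graph *graph, struct rte_node *node,
 *			void **objs, uint16_t nb_objs)
 *	{
 *		struct my_node_ctx *ctx = (struct my_node_ctx *)node->ctx;
 *
 *		// forward the whole stream to the cached edge
 *		rte_node_enqueue(graph, node, ctx->last_next, objs, nb_objs);
 *		return nb_objs;
 *	}
 *
 * The context struct must fit in RTE_NODE_CTX_SZ bytes; a
 * RTE_BUILD_BUG_ON(sizeof(struct my_node_ctx) > RTE_NODE_CTX_SZ) guards this.
 */
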
/**
 * @internal
 *
 * Allocate a stream of objects.
 *
 * If the stream already exists, it is reallocated to a larger size.
 *
 * @param graph
 *   Pointer to the graph object.
 * @param node
 *   Pointer to the node object.
 */
__rte_experimental
void __rte_node_stream_alloc(struct rte_graph *graph, struct rte_node *node);

/**
 * @internal
 *
 * Allocate a stream with the requested number of objects.
 *
 * If the stream already exists, it is reallocated to a larger size.
 *
 * @param graph
 *   Pointer to the graph object.
 * @param node
 *   Pointer to the node object.
 * @param req_size
 *   Number of objects to be allocated.
 */
__rte_experimental
void __rte_node_stream_alloc_size(struct rte_graph *graph,
				  struct rte_node *node, uint16_t req_size);

/**
 * Walk the circular buffer of the graph, invoke the process function of each
 * pending node and, when the stats feature is enabled, collect the stats.
 *
 * @param graph
 *   Graph pointer returned from rte_graph_lookup function.
 *
 * @see rte_graph_lookup()
 */
__rte_experimental
static inline void
rte_graph_walk(struct rte_graph *graph)
{
	const rte_graph_off_t *cir_start = graph->cir_start;
	const rte_node_t mask = graph->cir_mask;
	uint32_t head = graph->head;
	struct rte_node *node;
	uint64_t start;
	uint16_t rc;
	void **objs;

	/*
	 * Walk on the source node(s) ((cir_start - head) -> cir_start) and then
	 * on the pending streams (cir_start -> (cir_start + mask) -> cir_start)
	 * in a circular buffer fashion.
	 *
	 *	+-----+ <= cir_start - head [number of source nodes]
	 *	|     |
	 *	| ... | <= source nodes
	 *	|     |
	 *	+-----+ <= cir_start [head = 0] [tail = 0]
	 *	|     |
	 *	| ... | <= pending streams
	 *	|     |
	 *	+-----+ <= cir_start + mask
	 */
	while (likely(head != graph->tail)) {
		node = RTE_PTR_ADD(graph, cir_start[(int32_t)head++]);
		RTE_ASSERT(node->fence == RTE_GRAPH_FENCE);
		objs = node->objs;
		rte_prefetch0(objs);

		if (rte_graph_has_stats_feature()) {
			start = rte_rdtsc();
			rc = node->process(graph, node, objs, node->idx);
			node->total_cycles += rte_rdtsc() - start;
			node->total_calls++;
			node->total_objs += rc;
		} else {
			node->process(graph, node, objs, node->idx);
		}
		node->idx = 0;
		head = likely((int32_t)head > 0) ? head & mask : head;
	}
	graph->tail = 0;
}

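/*
 * Example: a minimal worker main loop driving rte_graph_walk(). This is an
 * illustrative sketch, not part of this header; the graph name "worker0" and
 * the force_quit termination flag are assumptions for the example. The graph
 * itself is assumed to have been created with rte_graph_create() on the
 * control path.
 *
 *	static volatile bool force_quit; // hypothetical termination flag
 *
 *	static int
 *	graph_main_loop(void *arg __rte_unused)
 *	{
 *		struct rte_graph *graph;
 *
 *		// Look up the graph previously created with rte_graph_create()
 *		graph = rte_graph_lookup("worker0");
 *		if (graph == NULL)
 *			return -ENOENT;
 *
 *		// Each walk processes source nodes first, then pending streams
 *		while (!force_quit)
 *			rte_graph_walk(graph);
 *		return 0;
 *	}
 */
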
/* Fast path helper functions */

/**
 * @internal
 *
 * Enqueue a given node to the tail of the graph reel.
 *
 * @param graph
 *   Pointer to the graph object.
 * @param node
 *   Pointer to the node object to be enqueued.
 */
static __rte_always_inline void
__rte_node_enqueue_tail_update(struct rte_graph *graph, struct rte_node *node)
{
	uint32_t tail;

	tail = graph->tail;
	graph->cir_start[tail++] = node->off;
	graph->tail = tail & graph->cir_mask;
}

/**
 * @internal
 *
 * Enqueue sequence prologue function.
 *
 * Appends the node to the tail of the graph reel and resizes the number of
 * objects available in the stream as needed.
 *
 * @param graph
 *   Pointer to the graph object.
 * @param node
 *   Pointer to the node object.
 * @param idx
 *   Index at which the object enqueue starts.
 * @param space
 *   Space required for the object enqueue.
 */
static __rte_always_inline void
__rte_node_enqueue_prologue(struct rte_graph *graph, struct rte_node *node,
			    const uint16_t idx, const uint16_t space)
{
	/* Add to the pending stream list if the node is new */
	if (idx == 0)
		__rte_node_enqueue_tail_update(graph, node);

	if (unlikely(node->size < (idx + space)))
		__rte_node_stream_alloc(graph, node);
}

/**
 * @internal
 *
 * Get the node pointer from the current node's edge id.
 *
 * @param node
 *   Current node pointer.
 * @param next
 *   Edge id of the required node.
 *
 * @return
 *   Pointer to the node denoted by the edge id.
 */
static __rte_always_inline struct rte_node *
__rte_node_next_node_get(struct rte_node *node, rte_edge_t next)
{
	RTE_ASSERT(next < node->nb_edges);
	RTE_ASSERT(node->fence == RTE_GRAPH_FENCE);
	node = node->nodes[next];
	RTE_ASSERT(node->fence == RTE_GRAPH_FENCE);

	return node;
}

/**
 * Enqueue the objs to the next node for further processing and set
 * the next node to pending state in the circular buffer.
 *
 * @param graph
 *   Graph pointer returned from rte_graph_lookup().
 * @param node
 *   Current node pointer.
 * @param next
 *   Relative next node index to enqueue objs.
 * @param objs
 *   Objs to enqueue.
 * @param nb_objs
 *   Number of objs to enqueue.
 */
__rte_experimental
static inline void
rte_node_enqueue(struct rte_graph *graph, struct rte_node *node,
		 rte_edge_t next, void **objs, uint16_t nb_objs)
{
	node = __rte_node_next_node_get(node, next);
	const uint16_t idx = node->idx;

	__rte_node_enqueue_prologue(graph, node, idx, nb_objs);

	rte_memcpy(&node->objs[idx], objs, nb_objs * sizeof(void *));
	node->idx = idx + nb_objs;
}

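/*
 * Example: classifying a burst inside a node's process callback and pushing
 * objects to two edges with rte_node_enqueue(). The edge ids MY_NEXT_A and
 * MY_NEXT_B, the MY_BURST bound and the is_type_a() predicate are
 * hypothetical names used only for illustration:
 *
 *	static uint16_t
 *	classify_process(struct rte_graph *graph, struct rte_node *node,
 *			 void **objs, uint16_t nb_objs)
 *	{
 *		void *a[MY_BURST], *b[MY_BURST];
 *		uint16_t na = 0, nb = 0, i;
 *
 *		// split the incoming stream into two staging arrays
 *		for (i = 0; i < nb_objs; i++) {
 *			if (is_type_a(objs[i]))
 *				a[na++] = objs[i];
 *			else
 *				b[nb++] = objs[i];
 *		}
 *		rte_node_enqueue(graph, node, MY_NEXT_A, a, na);
 *		rte_node_enqueue(graph, node, MY_NEXT_B, b, nb);
 *		return nb_objs;
 *	}
 */
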
/**
 * Enqueue only one obj to the next node for further processing and
 * set the next node to pending state in the circular buffer.
 *
 * @param graph
 *   Graph pointer returned from rte_graph_lookup().
 * @param node
 *   Current node pointer.
 * @param next
 *   Relative next node index to enqueue the obj.
 * @param obj
 *   Obj to enqueue.
 */
__rte_experimental
static inline void
rte_node_enqueue_x1(struct rte_graph *graph, struct rte_node *node,
		    rte_edge_t next, void *obj)
{
	node = __rte_node_next_node_get(node, next);
	uint16_t idx = node->idx;

	__rte_node_enqueue_prologue(graph, node, idx, 1);

	node->objs[idx++] = obj;
	node->idx = idx;
}

/**
 * Enqueue only two objs to the next node for further processing and
 * set the next node to pending state in the circular buffer.
 * Same as rte_node_enqueue_x1 but enqueues two objs.
 *
 * @param graph
 *   Graph pointer returned from rte_graph_lookup().
 * @param node
 *   Current node pointer.
 * @param next
 *   Relative next node index to enqueue objs.
 * @param obj0
 *   Obj to enqueue.
 * @param obj1
 *   Obj to enqueue.
 */
__rte_experimental
static inline void
rte_node_enqueue_x2(struct rte_graph *graph, struct rte_node *node,
		    rte_edge_t next, void *obj0, void *obj1)
{
	node = __rte_node_next_node_get(node, next);
	uint16_t idx = node->idx;

	__rte_node_enqueue_prologue(graph, node, idx, 2);

	node->objs[idx++] = obj0;
	node->objs[idx++] = obj1;
	node->idx = idx;
}

/**
 * Enqueue only four objs to the next node for further processing and
 * set the next node to pending state in the circular buffer.
 * Same as rte_node_enqueue_x1 but enqueues four objs.
 *
 * @param graph
 *   Graph pointer returned from rte_graph_lookup().
 * @param node
 *   Current node pointer.
 * @param next
 *   Relative next node index to enqueue objs.
 * @param obj0
 *   1st obj to enqueue.
 * @param obj1
 *   2nd obj to enqueue.
 * @param obj2
 *   3rd obj to enqueue.
 * @param obj3
 *   4th obj to enqueue.
 */
__rte_experimental
static inline void
rte_node_enqueue_x4(struct rte_graph *graph, struct rte_node *node,
		    rte_edge_t next, void *obj0, void *obj1, void *obj2,
		    void *obj3)
{
	node = __rte_node_next_node_get(node, next);
	uint16_t idx = node->idx;

	__rte_node_enqueue_prologue(graph, node, idx, 4);

	node->objs[idx++] = obj0;
	node->objs[idx++] = obj1;
	node->objs[idx++] = obj2;
	node->objs[idx++] = obj3;
	node->idx = idx;
}

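/*
 * Example: the xN variants avoid the rte_memcpy() of rte_node_enqueue() for
 * small, known counts. A common sketch is to unroll by four and mop up the
 * remainder one object at a time (MY_NEXT is a hypothetical edge id):
 *
 *	uint16_t i = 0;
 *
 *	for (; i + 4 <= nb_objs; i += 4)
 *		rte_node_enqueue_x4(graph, node, MY_NEXT,
 *				    objs[i], objs[i + 1],
 *				    objs[i + 2], objs[i + 3]);
 *	for (; i < nb_objs; i++)
 *		rte_node_enqueue_x1(graph, node, MY_NEXT, objs[i]);
 */
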
/**
 * Enqueue objs to multiple next nodes for further processing and
 * set the next nodes to pending state in the circular buffer.
 * objs[i] will be enqueued to nexts[i].
 *
 * @param graph
 *   Graph pointer returned from rte_graph_lookup().
 * @param node
 *   Current node pointer.
 * @param nexts
 *   List of relative next node indices to enqueue objs.
 * @param objs
 *   List of objs to enqueue.
 * @param nb_objs
 *   Number of objs to enqueue.
 */
__rte_experimental
static inline void
rte_node_enqueue_next(struct rte_graph *graph, struct rte_node *node,
		      rte_edge_t *nexts, void **objs, uint16_t nb_objs)
{
	uint16_t i;

	for (i = 0; i < nb_objs; i++)
		rte_node_enqueue_x1(graph, node, nexts[i], objs[i]);
}

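/*
 * Example: rte_node_enqueue_next() lets each object take its own edge, e.g.
 * after a per-object lookup. The classify() helper and MY_BURST bound below
 * are hypothetical, for illustration only:
 *
 *	rte_edge_t nexts[MY_BURST];
 *	uint16_t i;
 *
 *	for (i = 0; i < nb_objs; i++)
 *		nexts[i] = classify(objs[i]); // one edge id per object
 *	rte_node_enqueue_next(graph, node, nexts, objs, nb_objs);
 */
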
/**
 * Get the stream of the next node to enqueue the objs.
 * Once done updating the objs, rte_node_next_stream_put() must be called to
 * move the next node to pending state.
 *
 * @param graph
 *   Graph pointer returned from rte_graph_lookup().
 * @param node
 *   Current node pointer.
 * @param next
 *   Relative next node index to get the stream.
 * @param nb_objs
 *   Requested free size of the next stream.
 *
 * @return
 *   Valid next stream on success.
 *
 * @see rte_node_next_stream_put().
 */
__rte_experimental
static inline void **
rte_node_next_stream_get(struct rte_graph *graph, struct rte_node *node,
			 rte_edge_t next, uint16_t nb_objs)
{
	node = __rte_node_next_node_get(node, next);
	const uint16_t idx = node->idx;
	uint16_t free_space = node->size - idx;

	if (unlikely(free_space < nb_objs))
		__rte_node_stream_alloc_size(graph, node, nb_objs);

	return &node->objs[idx];
}

/**
 * Put the next stream to pending state in the circular buffer
 * for further processing. Should be invoked after rte_node_next_stream_get().
 *
 * @param graph
 *   Graph pointer returned from rte_graph_lookup().
 * @param node
 *   Current node pointer.
 * @param next
 *   Relative next node index.
 * @param idx
 *   Number of objs updated in the stream after getting the stream using
 *   rte_node_next_stream_get().
 *
 * @see rte_node_next_stream_get().
 */
__rte_experimental
static inline void
rte_node_next_stream_put(struct rte_graph *graph, struct rte_node *node,
			 rte_edge_t next, uint16_t idx)
{
	if (unlikely(!idx))
		return;

	node = __rte_node_next_node_get(node, next);
	if (node->idx == 0)
		__rte_node_enqueue_tail_update(graph, node);

	node->idx += idx;
}

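/*
 * Example: the get/put pair writes objects directly into the next node's
 * stream, avoiding a staging array. A sketch for a source-style node follows;
 * MY_NEXT, MY_BURST and my_rx_burst() are assumptions for illustration:
 *
 *	void **to_next;
 *	uint16_t n;
 *
 *	// reserve space for up to MY_BURST objects in the next stream
 *	to_next = rte_node_next_stream_get(graph, node, MY_NEXT, MY_BURST);
 *	n = my_rx_burst(to_next, MY_BURST); // fill the stream in place
 *	// commit only the objects actually written as pending
 *	rte_node_next_stream_put(graph, node, MY_NEXT, n);
 */
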
/**
 * Home run scenario: enqueue all the objs of the current node to the next
 * node in an optimized way by swapping the streams of both nodes. This
 * performs well when the next node is not already in pending state. If the
 * next node is already in pending state, a normal enqueue is used instead.
 *
 * @param graph
 *   Graph pointer returned from rte_graph_lookup().
 * @param src
 *   Current node pointer.
 * @param next
 *   Relative next node index.
 */
__rte_experimental
static inline void
rte_node_next_stream_move(struct rte_graph *graph, struct rte_node *src,
			  rte_edge_t next)
{
	struct rte_node *dst = __rte_node_next_node_get(src, next);

	/* Swap the stream pointers if dst does not hold valid objs */
	if (likely(dst->idx == 0)) {
		void **dobjs = dst->objs;
		uint16_t dsz = dst->size;

		dst->objs = src->objs;
		dst->size = src->size;
		src->objs = dobjs;
		src->size = dsz;
		dst->idx = src->idx;
		__rte_node_enqueue_tail_update(graph, dst);
	} else { /* Move the objects from src node to dst node */
		rte_node_enqueue(graph, src, next, src->objs, src->idx);
	}
}

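/*
 * Example: when every object of the current stream goes to a single edge,
 * rte_node_next_stream_move() forwards the whole stream by a pointer swap
 * instead of a copy. A minimal pass-through process callback sketch
 * (MY_NEXT is a hypothetical edge id):
 *
 *	static uint16_t
 *	passthru_process(struct rte_graph *graph, struct rte_node *node,
 *			 void **objs __rte_unused, uint16_t nb_objs)
 *	{
 *		rte_node_next_stream_move(graph, node, MY_NEXT);
 *		return nb_objs;
 *	}
 */
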
#ifdef __cplusplus
}
#endif
#endif /* _RTE_GRAPH_WORKER_H_ */