graph: implement fastpath routines

Add the implementation for the rte_graph_walk() API. It performs a walk
on the circular buffer, calls the process function of each node, and
collects the stats if stats collection is enabled.

Signed-off-by: Jerin Jacob <jerinj@marvell.com>
Signed-off-by: Kiran Kumar K <kirankumark@marvell.com>
Signed-off-by: Pavan Nikhilesh <pbhagavatula@marvell.com>
Signed-off-by: Nithin Dabilpuram <ndabilpuram@marvell.com>


@@ -160,6 +160,7 @@ The public API headers are grouped by topics:
[port_in_action] (@ref rte_port_in_action.h)
[table_action] (@ref rte_table_action.h)
* [graph] (@ref rte_graph.h):
[graph_worker] (@ref rte_graph_worker.h)
- **basic**:
[approx fraction] (@ref rte_approx.h),


@@ -473,6 +473,22 @@ __rte_node_stream_alloc(struct rte_graph *graph, struct rte_node *node)
node->realloc_count++;
}
void __rte_noinline
__rte_node_stream_alloc_size(struct rte_graph *graph, struct rte_node *node,
uint16_t req_size)
{
uint16_t size = node->size;
RTE_VERIFY(size != UINT16_MAX);
/* Allocate double the requested size to avoid an immediate realloc */
size = RTE_MIN(UINT16_MAX, RTE_MAX(RTE_GRAPH_BURST_SIZE, req_size * 2));
node->objs = rte_realloc_socket(node->objs, size * sizeof(void *),
RTE_CACHE_LINE_SIZE, graph->socket);
RTE_VERIFY(node->objs);
node->size = size;
node->realloc_count++;
}
static int
graph_to_dot(FILE *f, struct graph *graph)
{


@@ -3,6 +3,7 @@ EXPERIMENTAL {
__rte_node_register;
__rte_node_stream_alloc;
__rte_node_stream_alloc_size;
rte_graph_create;
rte_graph_destroy;
@@ -16,6 +17,7 @@ EXPERIMENTAL {
rte_graph_node_get;
rte_graph_node_get_by_name;
rte_graph_obj_dump;
rte_graph_walk;
rte_graph_cluster_stats_create;
rte_graph_cluster_stats_destroy;
@@ -28,10 +30,18 @@ EXPERIMENTAL {
rte_node_edge_get;
rte_node_edge_shrink;
rte_node_edge_update;
rte_node_enqueue;
rte_node_enqueue_x1;
rte_node_enqueue_x2;
rte_node_enqueue_x4;
rte_node_enqueue_next;
rte_node_from_name;
rte_node_id_to_name;
rte_node_list_dump;
rte_node_max_count;
rte_node_next_stream_get;
rte_node_next_stream_put;
rte_node_next_stream_move;
local: *;
};


@@ -99,6 +99,410 @@ struct rte_node {
__rte_experimental
void __rte_node_stream_alloc(struct rte_graph *graph, struct rte_node *node);
/**
* @internal
*
* Allocate a stream with the requested number of objects.
*
* If the stream already exists, re-allocate it to a larger size.
*
* @param graph
* Pointer to the graph object.
* @param node
* Pointer to the node object.
* @param req_size
* Number of objects to be allocated.
*/
__rte_experimental
void __rte_node_stream_alloc_size(struct rte_graph *graph,
struct rte_node *node, uint16_t req_size);
/**
* Perform a graph walk on the circular buffer, invoking the process function
* of each node and collecting the stats.
*
* @param graph
* Graph pointer returned from the rte_graph_lookup() function.
*
* @see rte_graph_lookup()
*/
__rte_experimental
static inline void
rte_graph_walk(struct rte_graph *graph)
{
const rte_graph_off_t *cir_start = graph->cir_start;
const rte_node_t mask = graph->cir_mask;
uint32_t head = graph->head;
struct rte_node *node;
uint64_t start;
uint16_t rc;
void **objs;
/*
* Walk on the source node(s) ((cir_start - head) -> cir_start) and then
* on the pending streams (cir_start -> (cir_start + mask) -> cir_start)
* in a circular buffer fashion.
*
* +-----+ <= cir_start - head [number of source nodes]
* | |
* | ... | <= source nodes
* | |
* +-----+ <= cir_start [head = 0] [tail = 0]
* | |
* | ... | <= pending streams
* | |
* +-----+ <= cir_start + mask
*/
while (likely(head != graph->tail)) {
node = RTE_PTR_ADD(graph, cir_start[(int32_t)head++]);
RTE_ASSERT(node->fence == RTE_GRAPH_FENCE);
objs = node->objs;
rte_prefetch0(objs);
if (rte_graph_has_stats_feature()) {
start = rte_rdtsc();
rc = node->process(graph, node, objs, node->idx);
node->total_cycles += rte_rdtsc() - start;
node->total_calls++;
node->total_objs += rc;
} else {
node->process(graph, node, objs, node->idx);
}
node->idx = 0;
head = likely((int32_t)head > 0) ? head & mask : head;
}
graph->tail = 0;
}
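For context (not part of this patch): a worker lcore typically spins on
rte_graph_walk() in a loop. A minimal sketch, assuming a graph was created
elsewhere with rte_graph_create() under the illustrative name "worker0":

#include <rte_graph_worker.h>

static int
graph_worker_main(void *arg)
{
	/* Assumes rte_graph_create() ran elsewhere with name "worker0" */
	struct rte_graph *graph = rte_graph_lookup("worker0");

	(void)arg;
	while (1)
		rte_graph_walk(graph);
	return 0;
}

Such a function would usually be launched per lcore, e.g. via
rte_eal_remote_launch().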
/* Fast path helper functions */
/**
* @internal
*
* Enqueue a given node to the tail of the graph reel.
*
* @param graph
* Pointer to the graph object.
* @param node
* Pointer to node object to be enqueued.
*/
static __rte_always_inline void
__rte_node_enqueue_tail_update(struct rte_graph *graph, struct rte_node *node)
{
uint32_t tail;
tail = graph->tail;
graph->cir_start[tail++] = node->off;
graph->tail = tail & graph->cir_mask;
}
/**
* @internal
*
* Enqueue sequence prologue function.
*
* Adds the node to the tail of the graph reel and grows the stream if
* needed to fit the objects being enqueued.
*
* @param graph
* Pointer to the graph object.
* @param node
* Pointer to the node object.
* @param idx
* Index at which the object enqueue starts.
* @param space
* Space required for the object enqueue.
*/
static __rte_always_inline void
__rte_node_enqueue_prologue(struct rte_graph *graph, struct rte_node *node,
const uint16_t idx, const uint16_t space)
{
/* Add to the pending stream list if the node is new */
if (idx == 0)
__rte_node_enqueue_tail_update(graph, node);
if (unlikely(node->size < (idx + space)))
__rte_node_stream_alloc(graph, node);
}
/**
* @internal
*
* Get the next node pointer from the current node's edge id.
*
* @param node
* Current node pointer.
* @param next
* Edge id of the required node.
*
* @return
* Pointer to the node denoted by the edge id.
*/
static __rte_always_inline struct rte_node *
__rte_node_next_node_get(struct rte_node *node, rte_edge_t next)
{
RTE_ASSERT(next < node->nb_edges);
RTE_ASSERT(node->fence == RTE_GRAPH_FENCE);
node = node->nodes[next];
RTE_ASSERT(node->fence == RTE_GRAPH_FENCE);
return node;
}
/**
* Enqueue the objs to the next node for further processing and set
* the next node to pending state in the circular buffer.
*
* @param graph
* Graph pointer returned from rte_graph_lookup().
* @param node
* Current node pointer.
* @param next
* Relative next node index to enqueue objs.
* @param objs
* Objs to enqueue.
* @param nb_objs
* Number of objs to enqueue.
*/
__rte_experimental
static inline void
rte_node_enqueue(struct rte_graph *graph, struct rte_node *node,
rte_edge_t next, void **objs, uint16_t nb_objs)
{
node = __rte_node_next_node_get(node, next);
const uint16_t idx = node->idx;
__rte_node_enqueue_prologue(graph, node, idx, nb_objs);
rte_memcpy(&node->objs[idx], objs, nb_objs * sizeof(void *));
node->idx = idx + nb_objs;
}
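A minimal sketch of a node process callback using this API; the callback
name and the choice of edge 0 are illustrative assumptions:

static uint16_t
forward_node_process(struct rte_graph *graph, struct rte_node *node,
		     void **objs, uint16_t nb_objs)
{
	/* Forward the whole received burst to the node on edge 0 */
	rte_node_enqueue(graph, node, 0, objs, nb_objs);
	return nb_objs;
}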
/**
* Enqueue only one obj to the next node for further processing and
* set the next node to pending state in the circular buffer.
*
* @param graph
* Graph pointer returned from rte_graph_lookup().
* @param node
* Current node pointer.
* @param next
* Relative next node index to enqueue objs.
* @param obj
* Obj to enqueue.
*/
__rte_experimental
static inline void
rte_node_enqueue_x1(struct rte_graph *graph, struct rte_node *node,
rte_edge_t next, void *obj)
{
node = __rte_node_next_node_get(node, next);
uint16_t idx = node->idx;
__rte_node_enqueue_prologue(graph, node, idx, 1);
node->objs[idx++] = obj;
node->idx = idx;
}
/**
* Enqueue only two objs to the next node for further processing and
* set the next node to pending state in the circular buffer.
* Same as rte_node_enqueue_x1() but enqueues two objs.
*
* @param graph
* Graph pointer returned from rte_graph_lookup().
* @param node
* Current node pointer.
* @param next
* Relative next node index to enqueue objs.
* @param obj0
* Obj to enqueue.
* @param obj1
* Obj to enqueue.
*/
__rte_experimental
static inline void
rte_node_enqueue_x2(struct rte_graph *graph, struct rte_node *node,
rte_edge_t next, void *obj0, void *obj1)
{
node = __rte_node_next_node_get(node, next);
uint16_t idx = node->idx;
__rte_node_enqueue_prologue(graph, node, idx, 2);
node->objs[idx++] = obj0;
node->objs[idx++] = obj1;
node->idx = idx;
}
/**
* Enqueue only four objs to the next node for further processing and
* set the next node to pending state in the circular buffer.
* Same as rte_node_enqueue_x1() but enqueues four objs.
*
* @param graph
* Graph pointer returned from rte_graph_lookup().
* @param node
* Current node pointer.
* @param next
* Relative next node index to enqueue objs.
* @param obj0
* 1st obj to enqueue.
* @param obj1
* 2nd obj to enqueue.
* @param obj2
* 3rd obj to enqueue.
* @param obj3
* 4th obj to enqueue.
*/
__rte_experimental
static inline void
rte_node_enqueue_x4(struct rte_graph *graph, struct rte_node *node,
rte_edge_t next, void *obj0, void *obj1, void *obj2,
void *obj3)
{
node = __rte_node_next_node_get(node, next);
uint16_t idx = node->idx;
__rte_node_enqueue_prologue(graph, node, idx, 4);
node->objs[idx++] = obj0;
node->objs[idx++] = obj1;
node->objs[idx++] = obj2;
node->objs[idx++] = obj3;
node->idx = idx;
}
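A minimal sketch of the unrolled enqueue pattern these xN variants enable;
the callback name and edge 0 are illustrative assumptions:

static uint16_t
unrolled_node_process(struct rte_graph *graph, struct rte_node *node,
		      void **objs, uint16_t nb_objs)
{
	uint16_t i = 0;

	/* Enqueue four objs per call, then drain the remainder singly */
	for (; i + 4 <= nb_objs; i += 4)
		rte_node_enqueue_x4(graph, node, 0, objs[i], objs[i + 1],
				    objs[i + 2], objs[i + 3]);
	for (; i < nb_objs; i++)
		rte_node_enqueue_x1(graph, node, 0, objs[i]);
	return nb_objs;
}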
/**
* Enqueue objs to multiple next nodes for further processing and
* set the next nodes to pending state in the circular buffer.
* objs[i] will be enqueued to nexts[i].
*
* @param graph
* Graph pointer returned from rte_graph_lookup().
* @param node
* Current node pointer.
* @param nexts
* List of relative next node indices to enqueue objs.
* @param objs
* List of objs to enqueue.
* @param nb_objs
* Number of objs to enqueue.
*/
__rte_experimental
static inline void
rte_node_enqueue_next(struct rte_graph *graph, struct rte_node *node,
rte_edge_t *nexts, void **objs, uint16_t nb_objs)
{
uint16_t i;
for (i = 0; i < nb_objs; i++)
rte_node_enqueue_x1(graph, node, nexts[i], objs[i]);
}
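A minimal sketch of a per-object scatter using this API; classify() is a
hypothetical helper mapping an obj to an edge id, and the sketch assumes
nb_objs does not exceed RTE_GRAPH_BURST_SIZE:

/* Hypothetical classifier: maps an obj to an edge id (always 0 here) */
static inline rte_edge_t
classify(void *obj)
{
	(void)obj;
	return 0;
}

static uint16_t
scatter_node_process(struct rte_graph *graph, struct rte_node *node,
		     void **objs, uint16_t nb_objs)
{
	rte_edge_t nexts[RTE_GRAPH_BURST_SIZE];
	uint16_t i;

	/* objs[i] will be enqueued on edge nexts[i] */
	for (i = 0; i < nb_objs; i++)
		nexts[i] = classify(objs[i]);
	rte_node_enqueue_next(graph, node, nexts, objs, nb_objs);
	return nb_objs;
}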
/**
* Get the stream of the next node to enqueue the objs.
* Once done updating the objs, rte_node_next_stream_put() must be called
* to move the next node to pending state.
*
* @param graph
* Graph pointer returned from rte_graph_lookup().
* @param node
* Current node pointer.
* @param next
* Relative next node index to get stream.
* @param nb_objs
* Requested free size of the next stream.
*
* @return
* Valid next stream on success.
*
* @see rte_node_next_stream_put().
*/
__rte_experimental
static inline void **
rte_node_next_stream_get(struct rte_graph *graph, struct rte_node *node,
rte_edge_t next, uint16_t nb_objs)
{
node = __rte_node_next_node_get(node, next);
const uint16_t idx = node->idx;
uint16_t free_space = node->size - idx;
if (unlikely(free_space < nb_objs))
__rte_node_stream_alloc_size(graph, node, nb_objs);
return &node->objs[idx];
}
/**
* Put the next stream to pending state in the circular buffer
* for further processing. Should be invoked after rte_node_next_stream_get().
*
* @param graph
* Graph pointer returned from rte_graph_lookup().
* @param node
* Current node pointer.
* @param next
* Relative next node index.
* @param idx
* Number of objs updated in the stream after getting the stream using
* rte_node_next_stream_get().
*
* @see rte_node_next_stream_get().
*/
__rte_experimental
static inline void
rte_node_next_stream_put(struct rte_graph *graph, struct rte_node *node,
rte_edge_t next, uint16_t idx)
{
if (unlikely(!idx))
return;
node = __rte_node_next_node_get(node, next);
if (node->idx == 0)
__rte_node_enqueue_tail_update(graph, node);
node->idx += idx;
}
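A minimal sketch of the get/put pairing, e.g. in a source-style node; the
burst size of 32 and edge 0 are illustrative assumptions:

static uint16_t
produce_node_process(struct rte_graph *graph, struct rte_node *node,
		     void **objs, uint16_t nb_objs)
{
	const uint16_t n = 32;
	void **to_next = rte_node_next_stream_get(graph, node, 0, n);
	uint16_t i;

	(void)objs;
	(void)nb_objs;
	/* Placeholder production loop: real code would write actual objs */
	for (i = 0; i < n; i++)
		to_next[i] = NULL;
	/* Mark the next node pending with the count actually written */
	rte_node_next_stream_put(graph, node, 0, n);
	return n;
}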
/**
* Home run scenario: enqueue all the objs of the current node to the next
* node in an optimized way by swapping the streams of both nodes.
* Performs well when the next node is not already in pending state.
* If the next node is already in pending state then the normal enqueue
* path is used.
*
* @param graph
* Graph pointer returned from rte_graph_lookup().
* @param src
* Current node pointer.
* @param next
* Relative next node index.
*/
__rte_experimental
static inline void
rte_node_next_stream_move(struct rte_graph *graph, struct rte_node *src,
rte_edge_t next)
{
struct rte_node *dst = __rte_node_next_node_get(src, next);
/* Swap the stream pointers if dst does not hold valid objs */
if (likely(dst->idx == 0)) {
void **dobjs = dst->objs;
uint16_t dsz = dst->size;
dst->objs = src->objs;
dst->size = src->size;
src->objs = dobjs;
src->size = dsz;
dst->idx = src->idx;
__rte_node_enqueue_tail_update(graph, dst);
} else { /* Move the objects from src node to dst node */
rte_node_enqueue(graph, src, next, src->objs, src->idx);
}
}
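A minimal sketch of the home run path in a process callback; edge 0 is an
illustrative assumption:

static uint16_t
passthrough_node_process(struct rte_graph *graph, struct rte_node *node,
			 void **objs, uint16_t nb_objs)
{
	(void)objs;
	/* Every obj of this burst goes to edge 0, so hand over the whole
	 * stream in O(1); rte_graph_walk() resets node->idx afterwards.
	 */
	rte_node_next_stream_move(graph, node, 0);
	return nb_objs;
}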
#ifdef __cplusplus
}
#endif