examples/pipeline: add new example application

Add new example application to showcase the API of the newly
introduced SWX pipeline type.

Signed-off-by: Cristian Dumitrescu <cristian.dumitrescu@intel.com>
This commit is contained in:
Cristian Dumitrescu 2020-10-01 11:20:04 +01:00 committed by David Marchand
parent d0a0096661
commit b77f660028
9 changed files with 1296 additions and 0 deletions

View File

@ -1331,6 +1331,7 @@ F: app/test/test_table*
F: app/test-pipeline/
F: doc/guides/sample_app_ug/test_pipeline.rst
F: examples/ip_pipeline/
F: examples/pipeline/
F: doc/guides/sample_app_ug/ip_pipeline.rst

View File

@ -33,6 +33,7 @@ all_examples = [
'ntb', 'packet_ordering',
'performance-thread/l3fwd-thread',
'performance-thread/pthread_shim',
'pipeline',
'ptpclient',
'qos_meter', 'qos_sched',
'rxtx_callbacks',

View File

@ -0,0 +1,50 @@
# SPDX-License-Identifier: BSD-3-Clause
# Copyright(c) 2020 Intel Corporation
# binary name
APP = pipeline
# all source are stored in SRCS-y
SRCS-y += main.c
SRCS-y += obj.c
SRCS-y += thread.c
# Build using pkg-config variables if possible
ifneq ($(shell pkg-config --exists libdpdk && echo 0),0)
$(error "no installation of DPDK found")
endif
all: shared
.PHONY: shared static
shared: build/$(APP)-shared
ln -sf $(APP)-shared build/$(APP)
static: build/$(APP)-static
ln -sf $(APP)-static build/$(APP)
PKGCONF ?= pkg-config
PC_FILE := $(shell $(PKGCONF) --path libdpdk 2>/dev/null)
CFLAGS += -O3 $(shell $(PKGCONF) --cflags libdpdk)
LDFLAGS_SHARED = $(shell $(PKGCONF) --libs libdpdk)
LDFLAGS_STATIC = $(shell $(PKGCONF) --static --libs libdpdk)
CFLAGS += -I. -DALLOW_EXPERIMENTAL_API -D_GNU_SOURCE
OBJS := $(patsubst %.c,build/%.o,$(SRCS-y))
build/%.o: %.c Makefile $(PC_FILE) | build
$(CC) $(CFLAGS) -c $< -o $@
build/$(APP)-shared: $(OBJS)
$(CC) $(OBJS) -o $@ $(LDFLAGS) $(LDFLAGS_SHARED)
build/$(APP)-static: $(OBJS)
$(CC) $(OBJS) -o $@ $(LDFLAGS) $(LDFLAGS_STATIC)
build:
@mkdir -p $@
.PHONY: clean
clean:
rm -f build/$(APP)* build/*.o
test -d build && rmdir -p build || true

50
examples/pipeline/main.c Normal file
View File

@ -0,0 +1,50 @@
/* SPDX-License-Identifier: BSD-3-Clause
* Copyright(c) 2020 Intel Corporation
*/
#include <stdio.h>
#include <string.h>
#include <fcntl.h>
#include <unistd.h>
#include <getopt.h>
#include <rte_launch.h>
#include <rte_eal.h>
#include "obj.h"
#include "thread.h"
int
main(int argc, char **argv)
{
struct obj *obj;
int status;
/* EAL */
status = rte_eal_init(argc, argv);
if (status < 0) {
printf("Error: EAL initialization failed (%d)\n", status);
return status;
};
/* Obj */
obj = obj_init();
if (!obj) {
printf("Error: Obj initialization failed (%d)\n", status);
return status;
}
/* Thread */
status = thread_init();
if (status) {
printf("Error: Thread initialization failed (%d)\n", status);
return status;
}
rte_eal_mp_remote_launch(
thread_main,
NULL,
SKIP_MASTER);
return 0;
}

View File

@ -0,0 +1,16 @@
# SPDX-License-Identifier: BSD-3-Clause
# Copyright(c) 2020 Intel Corporation
# meson file, for building this example as part of a main DPDK build.
#
# To build this example as a standalone application with an already-installed
# DPDK instance, use 'make'
build = cc.has_header('sys/epoll.h')
deps += ['pipeline', 'bus_pci']
allow_experimental_apis = true
sources = files(
'main.c',
'obj.c',
'thread.c',
)

470
examples/pipeline/obj.c Normal file
View File

@ -0,0 +1,470 @@
/* SPDX-License-Identifier: BSD-3-Clause
* Copyright(c) 2020 Intel Corporation
*/
#include <stdlib.h>
#include <string.h>
#include <rte_mempool.h>
#include <rte_mbuf.h>
#include <rte_ethdev.h>
#include <rte_swx_port_ethdev.h>
#include <rte_swx_port_source_sink.h>
#include <rte_swx_table_em.h>
#include <rte_swx_pipeline.h>
#include <rte_swx_ctl.h>
#include "obj.h"
/*
* mempool
*/
TAILQ_HEAD(mempool_list, mempool);
/*
* link
*/
TAILQ_HEAD(link_list, link);
/*
* pipeline
*/
TAILQ_HEAD(pipeline_list, pipeline);
/*
* obj
*/
struct obj {
struct mempool_list mempool_list;
struct link_list link_list;
struct pipeline_list pipeline_list;
};
/*
* mempool
*/
#define BUFFER_SIZE_MIN (sizeof(struct rte_mbuf) + RTE_PKTMBUF_HEADROOM)
struct mempool *
mempool_create(struct obj *obj, const char *name, struct mempool_params *params)
{
struct mempool *mempool;
struct rte_mempool *m;
/* Check input params */
if ((name == NULL) ||
mempool_find(obj, name) ||
(params == NULL) ||
(params->buffer_size < BUFFER_SIZE_MIN) ||
(params->pool_size == 0))
return NULL;
/* Resource create */
m = rte_pktmbuf_pool_create(
name,
params->pool_size,
params->cache_size,
0,
params->buffer_size - sizeof(struct rte_mbuf),
params->cpu_id);
if (m == NULL)
return NULL;
/* Node allocation */
mempool = calloc(1, sizeof(struct mempool));
if (mempool == NULL) {
rte_mempool_free(m);
return NULL;
}
/* Node fill in */
strlcpy(mempool->name, name, sizeof(mempool->name));
mempool->m = m;
mempool->buffer_size = params->buffer_size;
/* Node add to list */
TAILQ_INSERT_TAIL(&obj->mempool_list, mempool, node);
return mempool;
}
struct mempool *
mempool_find(struct obj *obj, const char *name)
{
struct mempool *mempool;
if (!obj || !name)
return NULL;
TAILQ_FOREACH(mempool, &obj->mempool_list, node)
if (strcmp(mempool->name, name) == 0)
return mempool;
return NULL;
}
/*
* link
*/
static struct rte_eth_conf port_conf_default = {
.link_speeds = 0,
.rxmode = {
.mq_mode = ETH_MQ_RX_NONE,
.max_rx_pkt_len = 9000, /* Jumbo frame max packet len */
.split_hdr_size = 0, /* Header split buffer size */
},
.rx_adv_conf = {
.rss_conf = {
.rss_key = NULL,
.rss_key_len = 40,
.rss_hf = 0,
},
},
.txmode = {
.mq_mode = ETH_MQ_TX_NONE,
},
.lpbk_mode = 0,
};
#define RETA_CONF_SIZE (ETH_RSS_RETA_SIZE_512 / RTE_RETA_GROUP_SIZE)
static int
rss_setup(uint16_t port_id,
uint16_t reta_size,
struct link_params_rss *rss)
{
struct rte_eth_rss_reta_entry64 reta_conf[RETA_CONF_SIZE];
uint32_t i;
int status;
/* RETA setting */
memset(reta_conf, 0, sizeof(reta_conf));
for (i = 0; i < reta_size; i++)
reta_conf[i / RTE_RETA_GROUP_SIZE].mask = UINT64_MAX;
for (i = 0; i < reta_size; i++) {
uint32_t reta_id = i / RTE_RETA_GROUP_SIZE;
uint32_t reta_pos = i % RTE_RETA_GROUP_SIZE;
uint32_t rss_qs_pos = i % rss->n_queues;
reta_conf[reta_id].reta[reta_pos] =
(uint16_t) rss->queue_id[rss_qs_pos];
}
/* RETA update */
status = rte_eth_dev_rss_reta_update(port_id,
reta_conf,
reta_size);
return status;
}
struct link *
link_create(struct obj *obj, const char *name, struct link_params *params)
{
struct rte_eth_dev_info port_info;
struct rte_eth_conf port_conf;
struct link *link;
struct link_params_rss *rss;
struct mempool *mempool;
uint32_t cpu_id, i;
int status;
uint16_t port_id;
/* Check input params */
if ((name == NULL) ||
link_find(obj, name) ||
(params == NULL) ||
(params->rx.n_queues == 0) ||
(params->rx.queue_size == 0) ||
(params->tx.n_queues == 0) ||
(params->tx.queue_size == 0))
return NULL;
port_id = params->port_id;
if (params->dev_name) {
status = rte_eth_dev_get_port_by_name(params->dev_name,
&port_id);
if (status)
return NULL;
} else
if (!rte_eth_dev_is_valid_port(port_id))
return NULL;
if (rte_eth_dev_info_get(port_id, &port_info) != 0)
return NULL;
mempool = mempool_find(obj, params->rx.mempool_name);
if (mempool == NULL)
return NULL;
rss = params->rx.rss;
if (rss) {
if ((port_info.reta_size == 0) ||
(port_info.reta_size > ETH_RSS_RETA_SIZE_512))
return NULL;
if ((rss->n_queues == 0) ||
(rss->n_queues >= LINK_RXQ_RSS_MAX))
return NULL;
for (i = 0; i < rss->n_queues; i++)
if (rss->queue_id[i] >= port_info.max_rx_queues)
return NULL;
}
/**
* Resource create
*/
/* Port */
memcpy(&port_conf, &port_conf_default, sizeof(port_conf));
if (rss) {
port_conf.rxmode.mq_mode = ETH_MQ_RX_RSS;
port_conf.rx_adv_conf.rss_conf.rss_hf =
(ETH_RSS_IP | ETH_RSS_TCP | ETH_RSS_UDP) &
port_info.flow_type_rss_offloads;
}
cpu_id = (uint32_t) rte_eth_dev_socket_id(port_id);
if (cpu_id == (uint32_t) SOCKET_ID_ANY)
cpu_id = 0;
status = rte_eth_dev_configure(
port_id,
params->rx.n_queues,
params->tx.n_queues,
&port_conf);
if (status < 0)
return NULL;
if (params->promiscuous) {
status = rte_eth_promiscuous_enable(port_id);
if (status != 0)
return NULL;
}
/* Port RX */
for (i = 0; i < params->rx.n_queues; i++) {
status = rte_eth_rx_queue_setup(
port_id,
i,
params->rx.queue_size,
cpu_id,
NULL,
mempool->m);
if (status < 0)
return NULL;
}
/* Port TX */
for (i = 0; i < params->tx.n_queues; i++) {
status = rte_eth_tx_queue_setup(
port_id,
i,
params->tx.queue_size,
cpu_id,
NULL);
if (status < 0)
return NULL;
}
/* Port start */
status = rte_eth_dev_start(port_id);
if (status < 0)
return NULL;
if (rss) {
status = rss_setup(port_id, port_info.reta_size, rss);
if (status) {
rte_eth_dev_stop(port_id);
return NULL;
}
}
/* Port link up */
status = rte_eth_dev_set_link_up(port_id);
if ((status < 0) && (status != -ENOTSUP)) {
rte_eth_dev_stop(port_id);
return NULL;
}
/* Node allocation */
link = calloc(1, sizeof(struct link));
if (link == NULL) {
rte_eth_dev_stop(port_id);
return NULL;
}
/* Node fill in */
strlcpy(link->name, name, sizeof(link->name));
link->port_id = port_id;
rte_eth_dev_get_name_by_port(port_id, link->dev_name);
link->n_rxq = params->rx.n_queues;
link->n_txq = params->tx.n_queues;
/* Node add to list */
TAILQ_INSERT_TAIL(&obj->link_list, link, node);
return link;
}
int
link_is_up(struct obj *obj, const char *name)
{
struct rte_eth_link link_params;
struct link *link;
/* Check input params */
if (!obj || !name)
return 0;
link = link_find(obj, name);
if (link == NULL)
return 0;
/* Resource */
if (rte_eth_link_get(link->port_id, &link_params) < 0)
return 0;
return (link_params.link_status == ETH_LINK_DOWN) ? 0 : 1;
}
struct link *
link_find(struct obj *obj, const char *name)
{
struct link *link;
if (!obj || !name)
return NULL;
TAILQ_FOREACH(link, &obj->link_list, node)
if (strcmp(link->name, name) == 0)
return link;
return NULL;
}
struct link *
link_next(struct obj *obj, struct link *link)
{
return (link == NULL) ?
TAILQ_FIRST(&obj->link_list) : TAILQ_NEXT(link, node);
}
/*
* pipeline
*/
#ifndef PIPELINE_MSGQ_SIZE
#define PIPELINE_MSGQ_SIZE 64
#endif
struct pipeline *
pipeline_create(struct obj *obj, const char *name, int numa_node)
{
struct pipeline *pipeline;
struct rte_swx_pipeline *p = NULL;
int status;
/* Check input params */
if ((name == NULL) ||
pipeline_find(obj, name))
return NULL;
/* Resource create */
status = rte_swx_pipeline_config(&p, numa_node);
if (status)
goto error;
status = rte_swx_pipeline_port_in_type_register(p,
"ethdev",
&rte_swx_port_ethdev_reader_ops);
if (status)
goto error;
status = rte_swx_pipeline_port_out_type_register(p,
"ethdev",
&rte_swx_port_ethdev_writer_ops);
if (status)
goto error;
#ifdef RTE_PORT_PCAP
status = rte_swx_pipeline_port_in_type_register(p,
"source",
&rte_swx_port_source_ops);
if (status)
goto error;
#endif
status = rte_swx_pipeline_port_out_type_register(p,
"sink",
&rte_swx_port_sink_ops);
if (status)
goto error;
status = rte_swx_pipeline_table_type_register(p,
"exact",
RTE_SWX_TABLE_MATCH_EXACT,
&rte_swx_table_exact_match_ops);
if (status)
goto error;
/* Node allocation */
pipeline = calloc(1, sizeof(struct pipeline));
if (pipeline == NULL)
goto error;
/* Node fill in */
strlcpy(pipeline->name, name, sizeof(pipeline->name));
pipeline->p = p;
pipeline->timer_period_ms = 10;
/* Node add to list */
TAILQ_INSERT_TAIL(&obj->pipeline_list, pipeline, node);
return pipeline;
error:
rte_swx_pipeline_free(p);
return NULL;
}
struct pipeline *
pipeline_find(struct obj *obj, const char *name)
{
struct pipeline *pipeline;
if (!obj || !name)
return NULL;
TAILQ_FOREACH(pipeline, &obj->pipeline_list, node)
if (strcmp(name, pipeline->name) == 0)
return pipeline;
return NULL;
}
/*
* obj
*/
struct obj *
obj_init(void)
{
struct obj *obj;
obj = calloc(1, sizeof(struct obj));
if (!obj)
return NULL;
TAILQ_INIT(&obj->mempool_list);
TAILQ_INIT(&obj->link_list);
TAILQ_INIT(&obj->pipeline_list);
return obj;
}

131
examples/pipeline/obj.h Normal file
View File

@ -0,0 +1,131 @@
/* SPDX-License-Identifier: BSD-3-Clause
* Copyright(c) 2020 Intel Corporation
*/
#ifndef _INCLUDE_OBJ_H_
#define _INCLUDE_OBJ_H_
#include <stdint.h>
#include <sys/queue.h>
#include <rte_mempool.h>
#include <rte_swx_pipeline.h>
#include <rte_swx_ctl.h>
#ifndef NAME_SIZE
#define NAME_SIZE 64
#endif
/*
* obj
*/
struct obj;
struct obj *
obj_init(void);
/*
* mempool
*/
struct mempool_params {
uint32_t buffer_size;
uint32_t pool_size;
uint32_t cache_size;
uint32_t cpu_id;
};
struct mempool {
TAILQ_ENTRY(mempool) node;
char name[NAME_SIZE];
struct rte_mempool *m;
uint32_t buffer_size;
};
struct mempool *
mempool_create(struct obj *obj,
const char *name,
struct mempool_params *params);
struct mempool *
mempool_find(struct obj *obj,
const char *name);
/*
* link
*/
#ifndef LINK_RXQ_RSS_MAX
#define LINK_RXQ_RSS_MAX 16
#endif
struct link_params_rss {
uint32_t queue_id[LINK_RXQ_RSS_MAX];
uint32_t n_queues;
};
struct link_params {
const char *dev_name;
uint16_t port_id; /**< Valid only when *dev_name* is NULL. */
struct {
uint32_t n_queues;
uint32_t queue_size;
const char *mempool_name;
struct link_params_rss *rss;
} rx;
struct {
uint32_t n_queues;
uint32_t queue_size;
} tx;
int promiscuous;
};
struct link {
TAILQ_ENTRY(link) node;
char name[NAME_SIZE];
char dev_name[NAME_SIZE];
uint16_t port_id;
uint32_t n_rxq;
uint32_t n_txq;
};
struct link *
link_create(struct obj *obj,
const char *name,
struct link_params *params);
int
link_is_up(struct obj *obj, const char *name);
struct link *
link_find(struct obj *obj, const char *name);
struct link *
link_next(struct obj *obj, struct link *link);
/*
* pipeline
*/
struct pipeline {
TAILQ_ENTRY(pipeline) node;
char name[NAME_SIZE];
struct rte_swx_pipeline *p;
struct rte_swx_ctl_pipeline *ctl;
uint32_t timer_period_ms;
int enabled;
uint32_t thread_id;
uint32_t cpu_id;
};
struct pipeline *
pipeline_create(struct obj *obj,
const char *name,
int numa_node);
struct pipeline *
pipeline_find(struct obj *obj, const char *name);
#endif /* _INCLUDE_OBJ_H_ */

549
examples/pipeline/thread.c Normal file
View File

@ -0,0 +1,549 @@
/* SPDX-License-Identifier: BSD-3-Clause
* Copyright(c) 2020 Intel Corporation
*/
#include <stdlib.h>
#include <rte_common.h>
#include <rte_cycles.h>
#include <rte_lcore.h>
#include <rte_ring.h>
#include <rte_table_acl.h>
#include <rte_table_array.h>
#include <rte_table_hash.h>
#include <rte_table_lpm.h>
#include <rte_table_lpm_ipv6.h>
#include "obj.h"
#include "thread.h"
#ifndef THREAD_PIPELINES_MAX
#define THREAD_PIPELINES_MAX 256
#endif
#ifndef THREAD_MSGQ_SIZE
#define THREAD_MSGQ_SIZE 64
#endif
#ifndef THREAD_TIMER_PERIOD_MS
#define THREAD_TIMER_PERIOD_MS 100
#endif
/**
* Control thread: data plane thread context
*/
struct thread {
struct rte_ring *msgq_req;
struct rte_ring *msgq_rsp;
uint32_t enabled;
};
static struct thread thread[RTE_MAX_LCORE];
/**
* Data plane threads: context
*/
struct pipeline_data {
struct rte_swx_pipeline *p;
uint64_t timer_period; /* Measured in CPU cycles. */
uint64_t time_next;
};
struct thread_data {
struct rte_swx_pipeline *p[THREAD_PIPELINES_MAX];
uint32_t n_pipelines;
struct pipeline_data pipeline_data[THREAD_PIPELINES_MAX];
struct rte_ring *msgq_req;
struct rte_ring *msgq_rsp;
uint64_t timer_period; /* Measured in CPU cycles. */
uint64_t time_next;
uint64_t time_next_min;
} __rte_cache_aligned;
static struct thread_data thread_data[RTE_MAX_LCORE];
/**
* Control thread: data plane thread init
*/
static void
thread_free(void)
{
uint32_t i;
for (i = 0; i < RTE_MAX_LCORE; i++) {
struct thread *t = &thread[i];
if (!rte_lcore_is_enabled(i))
continue;
/* MSGQs */
if (t->msgq_req)
rte_ring_free(t->msgq_req);
if (t->msgq_rsp)
rte_ring_free(t->msgq_rsp);
}
}
int
thread_init(void)
{
uint32_t i;
RTE_LCORE_FOREACH_SLAVE(i) {
char name[NAME_MAX];
struct rte_ring *msgq_req, *msgq_rsp;
struct thread *t = &thread[i];
struct thread_data *t_data = &thread_data[i];
uint32_t cpu_id = rte_lcore_to_socket_id(i);
/* MSGQs */
snprintf(name, sizeof(name), "THREAD-%04x-MSGQ-REQ", i);
msgq_req = rte_ring_create(name,
THREAD_MSGQ_SIZE,
cpu_id,
RING_F_SP_ENQ | RING_F_SC_DEQ);
if (msgq_req == NULL) {
thread_free();
return -1;
}
snprintf(name, sizeof(name), "THREAD-%04x-MSGQ-RSP", i);
msgq_rsp = rte_ring_create(name,
THREAD_MSGQ_SIZE,
cpu_id,
RING_F_SP_ENQ | RING_F_SC_DEQ);
if (msgq_rsp == NULL) {
thread_free();
return -1;
}
/* Control thread records */
t->msgq_req = msgq_req;
t->msgq_rsp = msgq_rsp;
t->enabled = 1;
/* Data plane thread records */
t_data->n_pipelines = 0;
t_data->msgq_req = msgq_req;
t_data->msgq_rsp = msgq_rsp;
t_data->timer_period =
(rte_get_tsc_hz() * THREAD_TIMER_PERIOD_MS) / 1000;
t_data->time_next = rte_get_tsc_cycles() + t_data->timer_period;
t_data->time_next_min = t_data->time_next;
}
return 0;
}
static inline int
thread_is_running(uint32_t thread_id)
{
enum rte_lcore_state_t thread_state;
thread_state = rte_eal_get_lcore_state(thread_id);
return (thread_state == RUNNING) ? 1 : 0;
}
/**
* Control thread & data plane threads: message passing
*/
enum thread_req_type {
THREAD_REQ_PIPELINE_ENABLE = 0,
THREAD_REQ_PIPELINE_DISABLE,
THREAD_REQ_MAX
};
struct thread_msg_req {
enum thread_req_type type;
union {
struct {
struct rte_swx_pipeline *p;
uint32_t timer_period_ms;
} pipeline_enable;
struct {
struct rte_swx_pipeline *p;
} pipeline_disable;
};
};
struct thread_msg_rsp {
int status;
};
/**
* Control thread
*/
static struct thread_msg_req *
thread_msg_alloc(void)
{
size_t size = RTE_MAX(sizeof(struct thread_msg_req),
sizeof(struct thread_msg_rsp));
return calloc(1, size);
}
static void
thread_msg_free(struct thread_msg_rsp *rsp)
{
free(rsp);
}
static struct thread_msg_rsp *
thread_msg_send_recv(uint32_t thread_id,
struct thread_msg_req *req)
{
struct thread *t = &thread[thread_id];
struct rte_ring *msgq_req = t->msgq_req;
struct rte_ring *msgq_rsp = t->msgq_rsp;
struct thread_msg_rsp *rsp;
int status;
/* send */
do {
status = rte_ring_sp_enqueue(msgq_req, req);
} while (status == -ENOBUFS);
/* recv */
do {
status = rte_ring_sc_dequeue(msgq_rsp, (void **) &rsp);
} while (status != 0);
return rsp;
}
int
thread_pipeline_enable(uint32_t thread_id,
struct obj *obj,
const char *pipeline_name)
{
struct pipeline *p = pipeline_find(obj, pipeline_name);
struct thread *t;
struct thread_msg_req *req;
struct thread_msg_rsp *rsp;
int status;
/* Check input params */
if ((thread_id >= RTE_MAX_LCORE) ||
(p == NULL))
return -1;
t = &thread[thread_id];
if (t->enabled == 0)
return -1;
if (!thread_is_running(thread_id)) {
struct thread_data *td = &thread_data[thread_id];
struct pipeline_data *tdp = &td->pipeline_data[td->n_pipelines];
if (td->n_pipelines >= THREAD_PIPELINES_MAX)
return -1;
/* Data plane thread */
td->p[td->n_pipelines] = p->p;
tdp->p = p->p;
tdp->timer_period =
(rte_get_tsc_hz() * p->timer_period_ms) / 1000;
tdp->time_next = rte_get_tsc_cycles() + tdp->timer_period;
td->n_pipelines++;
/* Pipeline */
p->thread_id = thread_id;
p->enabled = 1;
return 0;
}
/* Allocate request */
req = thread_msg_alloc();
if (req == NULL)
return -1;
/* Write request */
req->type = THREAD_REQ_PIPELINE_ENABLE;
req->pipeline_enable.p = p->p;
req->pipeline_enable.timer_period_ms = p->timer_period_ms;
/* Send request and wait for response */
rsp = thread_msg_send_recv(thread_id, req);
/* Read response */
status = rsp->status;
/* Free response */
thread_msg_free(rsp);
/* Request completion */
if (status)
return status;
p->thread_id = thread_id;
p->enabled = 1;
return 0;
}
int
thread_pipeline_disable(uint32_t thread_id,
struct obj *obj,
const char *pipeline_name)
{
struct pipeline *p = pipeline_find(obj, pipeline_name);
struct thread *t;
struct thread_msg_req *req;
struct thread_msg_rsp *rsp;
int status;
/* Check input params */
if ((thread_id >= RTE_MAX_LCORE) ||
(p == NULL))
return -1;
t = &thread[thread_id];
if (t->enabled == 0)
return -1;
if (p->enabled == 0)
return 0;
if (p->thread_id != thread_id)
return -1;
if (!thread_is_running(thread_id)) {
struct thread_data *td = &thread_data[thread_id];
uint32_t i;
for (i = 0; i < td->n_pipelines; i++) {
struct pipeline_data *tdp = &td->pipeline_data[i];
if (tdp->p != p->p)
continue;
/* Data plane thread */
if (i < td->n_pipelines - 1) {
struct rte_swx_pipeline *pipeline_last =
td->p[td->n_pipelines - 1];
struct pipeline_data *tdp_last =
&td->pipeline_data[td->n_pipelines - 1];
td->p[i] = pipeline_last;
memcpy(tdp, tdp_last, sizeof(*tdp));
}
td->n_pipelines--;
/* Pipeline */
p->enabled = 0;
break;
}
return 0;
}
/* Allocate request */
req = thread_msg_alloc();
if (req == NULL)
return -1;
/* Write request */
req->type = THREAD_REQ_PIPELINE_DISABLE;
req->pipeline_disable.p = p->p;
/* Send request and wait for response */
rsp = thread_msg_send_recv(thread_id, req);
/* Read response */
status = rsp->status;
/* Free response */
thread_msg_free(rsp);
/* Request completion */
if (status)
return status;
p->enabled = 0;
return 0;
}
/**
* Data plane threads: message handling
*/
static inline struct thread_msg_req *
thread_msg_recv(struct rte_ring *msgq_req)
{
struct thread_msg_req *req;
int status = rte_ring_sc_dequeue(msgq_req, (void **) &req);
if (status != 0)
return NULL;
return req;
}
static inline void
thread_msg_send(struct rte_ring *msgq_rsp,
struct thread_msg_rsp *rsp)
{
int status;
do {
status = rte_ring_sp_enqueue(msgq_rsp, rsp);
} while (status == -ENOBUFS);
}
static struct thread_msg_rsp *
thread_msg_handle_pipeline_enable(struct thread_data *t,
struct thread_msg_req *req)
{
struct thread_msg_rsp *rsp = (struct thread_msg_rsp *) req;
struct pipeline_data *p = &t->pipeline_data[t->n_pipelines];
/* Request */
if (t->n_pipelines >= THREAD_PIPELINES_MAX) {
rsp->status = -1;
return rsp;
}
t->p[t->n_pipelines] = req->pipeline_enable.p;
p->p = req->pipeline_enable.p;
p->timer_period = (rte_get_tsc_hz() *
req->pipeline_enable.timer_period_ms) / 1000;
p->time_next = rte_get_tsc_cycles() + p->timer_period;
t->n_pipelines++;
/* Response */
rsp->status = 0;
return rsp;
}
static struct thread_msg_rsp *
thread_msg_handle_pipeline_disable(struct thread_data *t,
struct thread_msg_req *req)
{
struct thread_msg_rsp *rsp = (struct thread_msg_rsp *) req;
uint32_t n_pipelines = t->n_pipelines;
struct rte_swx_pipeline *pipeline = req->pipeline_disable.p;
uint32_t i;
/* find pipeline */
for (i = 0; i < n_pipelines; i++) {
struct pipeline_data *p = &t->pipeline_data[i];
if (p->p != pipeline)
continue;
if (i < n_pipelines - 1) {
struct rte_swx_pipeline *pipeline_last =
t->p[n_pipelines - 1];
struct pipeline_data *p_last =
&t->pipeline_data[n_pipelines - 1];
t->p[i] = pipeline_last;
memcpy(p, p_last, sizeof(*p));
}
t->n_pipelines--;
rsp->status = 0;
return rsp;
}
/* should not get here */
rsp->status = 0;
return rsp;
}
static void
thread_msg_handle(struct thread_data *t)
{
for ( ; ; ) {
struct thread_msg_req *req;
struct thread_msg_rsp *rsp;
req = thread_msg_recv(t->msgq_req);
if (req == NULL)
break;
switch (req->type) {
case THREAD_REQ_PIPELINE_ENABLE:
rsp = thread_msg_handle_pipeline_enable(t, req);
break;
case THREAD_REQ_PIPELINE_DISABLE:
rsp = thread_msg_handle_pipeline_disable(t, req);
break;
default:
rsp = (struct thread_msg_rsp *) req;
rsp->status = -1;
}
thread_msg_send(t->msgq_rsp, rsp);
}
}
/**
* Data plane threads: main
*/
int
thread_main(void *arg __rte_unused)
{
struct thread_data *t;
uint32_t thread_id, i;
thread_id = rte_lcore_id();
t = &thread_data[thread_id];
/* Dispatch loop */
for (i = 0; ; i++) {
uint32_t j;
/* Data Plane */
for (j = 0; j < t->n_pipelines; j++)
rte_swx_pipeline_run(t->p[j], 1000000);
/* Control Plane */
if ((i & 0xF) == 0) {
uint64_t time = rte_get_tsc_cycles();
uint64_t time_next_min = UINT64_MAX;
if (time < t->time_next_min)
continue;
/* Thread message queues */
{
uint64_t time_next = t->time_next;
if (time_next <= time) {
thread_msg_handle(t);
time_next = time + t->timer_period;
t->time_next = time_next;
}
if (time_next < time_next_min)
time_next_min = time_next;
}
t->time_next_min = time_next_min;
}
}
return 0;
}

View File

@ -0,0 +1,28 @@
/* SPDX-License-Identifier: BSD-3-Clause
* Copyright(c) 2020 Intel Corporation
*/
#ifndef _INCLUDE_THREAD_H_
#define _INCLUDE_THREAD_H_
#include <stdint.h>
#include "obj.h"
int
thread_pipeline_enable(uint32_t thread_id,
struct obj *obj,
const char *pipeline_name);
int
thread_pipeline_disable(uint32_t thread_id,
struct obj *obj,
const char *pipeline_name);
int
thread_init(void);
int
thread_main(void *arg);
#endif /* _INCLUDE_THREAD_H_ */