pipeline: support I/O specification

Add specification data structure and API for the pipeline I/O ports
and related pipeline configuration such as packet mirroring.

Signed-off-by: Cristian Dumitrescu <cristian.dumitrescu@intel.com>
Signed-off-by: Kamalakannan R <kamalakannan.r@intel.com>
This commit is contained in:
Cristian Dumitrescu 2022-07-28 15:11:36 +00:00 committed by Thomas Monjalon
parent 894c93e14e
commit 54cae37ef4
2 changed files with 910 additions and 0 deletions

View File

@ -9,6 +9,12 @@
#include <errno.h>
#include <rte_common.h>
#include <rte_mempool.h>
#include <rte_swx_port_ethdev.h>
#include <rte_swx_port_ring.h>
#include <rte_swx_port_source_sink.h>
#include <rte_swx_port_fd.h>
#include "rte_swx_pipeline_spec.h"
@ -3566,3 +3572,849 @@ rte_swx_pipeline_build_from_spec(struct rte_swx_pipeline *p,
pipeline_spec_free(s);
return status;
}
static void
port_in_params_free(void *params, const char *port_type)
{
uintptr_t dev_name;
if (!params || !port_type)
return;
if (!strcmp(port_type, "ethdev")) {
struct rte_swx_port_ethdev_reader_params *p = params;
dev_name = (uintptr_t)p->dev_name;
} else if (!strcmp(port_type, "ring")) {
struct rte_swx_port_ring_reader_params *p = params;
dev_name = (uintptr_t)p->name;
} else if (!strcmp(port_type, "source")) {
struct rte_swx_port_source_params *p = params;
dev_name = (uintptr_t)p->file_name;
} else
dev_name = (uintptr_t)NULL;
free((void *)dev_name);
free(params);
}
static void
port_out_params_free(void *params, const char *port_type)
{
uintptr_t dev_name;
if (!params || !port_type)
return;
if (!strcmp(port_type, "ethdev")) {
struct rte_swx_port_ethdev_writer_params *p = params;
dev_name = (uintptr_t)p->dev_name;
} else if (!strcmp(port_type, "ring")) {
struct rte_swx_port_ring_writer_params *p = params;
dev_name = (uintptr_t)p->name;
} else if (!strcmp(port_type, "sink")) {
struct rte_swx_port_sink_params *p = params;
dev_name = (uintptr_t)p->file_name;
} else
dev_name = (uintptr_t)NULL;
free((void *)dev_name);
free(params);
}
void
pipeline_iospec_free(struct pipeline_iospec *s)
{
uint32_t i;
if (!s)
return;
/* Input ports. */
for (i = 0; i < s->n_ports_in; i++) {
uintptr_t name = (uintptr_t)s->port_in_type[i];
port_in_params_free(s->port_in_params[i], s->port_in_type[i]);
free((void *)name);
}
free(s->port_in_type);
free(s->port_in_params);
/* Output ports. */
for (i = 0; i < s->n_ports_out; i++) {
uintptr_t name = (uintptr_t)s->port_out_type[i];
port_out_params_free(s->port_out_params[i], s->port_out_type[i]);
free((void *)name);
}
free(s->port_out_type);
free(s->port_out_params);
free(s);
}
static int
mirroring_parse(struct rte_swx_pipeline_mirroring_params *p,
char **tokens,
uint32_t n_tokens,
const char **err_msg)
{
char *token;
if ((n_tokens != 4) || strcmp(tokens[0], "slots") || strcmp(tokens[2], "sessions")) {
if (err_msg)
*err_msg = "Invalid statement.";
return -EINVAL;
}
/* <n_slots>. */
token = tokens[1];
p->n_slots = strtoul(token, &token, 0);
if (token[0]) {
if (err_msg)
*err_msg = "Invalid <n_slots> parameter.";
return -EINVAL;
}
/* <n_sessions>. */
token = tokens[3];
p->n_sessions = strtoul(token, &token, 0);
if (token[0]) {
if (err_msg)
*err_msg = "Invalid <n_sessions> parameter.";
return -EINVAL;
}
return 0;
}
static void *
port_in_ethdev_parse(char **tokens, uint32_t n_tokens, const char **err_msg)
{
struct rte_swx_port_ethdev_reader_params *p = NULL;
char *token, *dev_name = NULL;
uint32_t queue_id, burst_size;
if ((n_tokens != 5) || strcmp(tokens[1], "rxq") || strcmp(tokens[3], "bsz")) {
if (err_msg)
*err_msg = "Invalid statement.";
return NULL;
}
/* <queue_id>. */
token = tokens[2];
queue_id = strtoul(token, &token, 0);
if (token[0]) {
if (err_msg)
*err_msg = "Invalid <queue_id> parameter.";
return NULL;
}
/* <burst_size>. */
token = tokens[4];
burst_size = strtoul(token, &token, 0);
if (token[0]) {
if (err_msg)
*err_msg = "Invalid <burst_size> parameter.";
return NULL;
}
/* Memory allocation. */
dev_name = strdup(tokens[0]);
p = malloc(sizeof(struct rte_swx_port_ethdev_reader_params));
if (!dev_name || !p) {
free(dev_name);
free(p);
if (err_msg)
*err_msg = "Memory allocation failed.";
return NULL;
}
/* Initialization. */
p->dev_name = dev_name;
p->queue_id = queue_id;
p->burst_size = burst_size;
return p;
}
static void *
port_in_ring_parse(char **tokens, uint32_t n_tokens, const char **err_msg)
{
struct rte_swx_port_ring_reader_params *p = NULL;
char *token, *name = NULL;
uint32_t burst_size;
if ((n_tokens != 3) || strcmp(tokens[1], "bsz")) {
if (err_msg)
*err_msg = "Invalid statement.";
return NULL;
}
/* <burst_size>. */
token = tokens[2];
burst_size = strtoul(token, &token, 0);
if (token[0]) {
if (err_msg)
*err_msg = "Invalid <burst_size> parameter.";
return NULL;
}
/* Memory allocation. */
name = strdup(tokens[0]);
p = malloc(sizeof(struct rte_swx_port_ring_reader_params));
if (!name || !p) {
free(name);
free(p);
if (err_msg)
*err_msg = "Memory allocation failed.";
return NULL;
}
/* Initialization. */
p->name = name;
p->burst_size = burst_size;
return p;
}
static void *
port_in_source_parse(char **tokens, uint32_t n_tokens, const char **err_msg)
{
struct rte_swx_port_source_params *p = NULL;
struct rte_mempool *pool = NULL;
char *token, *file_name = NULL;
uint32_t n_loops, n_pkts_max;
if ((n_tokens != 8) ||
strcmp(tokens[0], "mempool") ||
strcmp(tokens[2], "file") ||
strcmp(tokens[4], "loop") ||
strcmp(tokens[6], "packets")) {
if (err_msg)
*err_msg = "Invalid statement.";
return NULL;
}
/* <mempool_name>. */
pool = rte_mempool_lookup(tokens[1]);
if (!pool) {
if (err_msg)
*err_msg = "Invalid <mempool_name> parameter.";
return NULL;
}
/* <n_loops>. */
token = tokens[5];
n_loops = strtoul(token, &token, 0);
if (token[0]) {
if (err_msg)
*err_msg = "Invalid <n_loops> parameter.";
return NULL;
}
/* <n_pkts_max>. */
token = tokens[7];
n_pkts_max = strtoul(token, &token, 0);
if (token[0]) {
if (err_msg)
*err_msg = "Invalid <n_pkts_max> parameter.";
return NULL;
}
/* Memory allocation. */
file_name = strdup(tokens[3]);
p = malloc(sizeof(struct rte_swx_port_source_params));
if (!file_name || !p) {
free(file_name);
free(p);
if (err_msg)
*err_msg = "Memory allocation failed.";
return NULL;
}
/* Initialization. */
p->pool = pool;
p->file_name = file_name;
p->n_loops = n_loops;
p->n_pkts_max = n_pkts_max;
return p;
}
static void *
port_in_fd_parse(char **tokens,
uint32_t n_tokens,
const char **err_msg)
{
struct rte_swx_port_fd_reader_params *p = NULL;
struct rte_mempool *mempool = NULL;
char *token;
uint32_t mtu, burst_size;
int fd;
if ((n_tokens != 7) ||
strcmp(tokens[1], "mtu") ||
strcmp(tokens[3], "mempool") ||
strcmp(tokens[5], "bsz")) {
if (err_msg)
*err_msg = "Invalid statement.";
return NULL;
}
/* <file_descriptor>. */
token = tokens[0];
fd = strtol(token, &token, 0);
if (token[0]) {
if (err_msg)
*err_msg = "Invalid <file_descriptor> parameter.";
return NULL;
}
/* <mtu>. */
token = tokens[2];
mtu = strtoul(token, &token, 0);
if (token[0]) {
if (err_msg)
*err_msg = "Invalid <mtu> parameter.";
return NULL;
}
/* <mempool_name>. */
mempool = rte_mempool_lookup(tokens[4]);
if (!mempool) {
if (err_msg)
*err_msg = "Invalid <mempool_name> parameter.";
return NULL;
}
/* <burst_size>. */
token = tokens[6];
burst_size = strtoul(token, &token, 0);
if (token[0]) {
if (err_msg)
*err_msg = "Invalid <burst_size> parameter.";
return NULL;
}
/* Memory allocation. */
p = malloc(sizeof(struct rte_swx_port_fd_reader_params));
if (!p) {
if (err_msg)
*err_msg = "Memory allocation failed.";
return NULL;
}
/* Initialization. */
p->fd = fd;
p->mtu = mtu;
p->mempool = mempool;
p->burst_size = burst_size;
return p;
}
static void *
port_out_ethdev_parse(char **tokens, uint32_t n_tokens, const char **err_msg)
{
struct rte_swx_port_ethdev_writer_params *p = NULL;
char *token, *dev_name = NULL;
uint32_t queue_id, burst_size;
if ((n_tokens != 5) || strcmp(tokens[1], "txq") || strcmp(tokens[3], "bsz")) {
if (err_msg)
*err_msg = "Invalid statement.";
return NULL;
}
/* <queue_id>. */
token = tokens[2];
queue_id = strtoul(token, &token, 0);
if (token[0]) {
if (err_msg)
*err_msg = "Invalid <queue_id> parameter.";
return NULL;
}
/* <burst_size>. */
token = tokens[4];
burst_size = strtoul(token, &token, 0);
if (token[0]) {
if (err_msg)
*err_msg = "Invalid <burst_size> parameter.";
return NULL;
}
/* Memory allocation. */
dev_name = strdup(tokens[0]);
p = malloc(sizeof(struct rte_swx_port_ethdev_writer_params));
if (!dev_name || !p) {
free(dev_name);
free(p);
if (err_msg)
*err_msg = "Memory allocation failed.";
return NULL;
}
/* Initialization. */
p->dev_name = dev_name;
p->queue_id = queue_id;
p->burst_size = burst_size;
return p;
}
static void *
port_out_ring_parse(char **tokens, uint32_t n_tokens, const char **err_msg)
{
struct rte_swx_port_ring_writer_params *p = NULL;
char *token, *name = NULL;
uint32_t burst_size;
if ((n_tokens != 3) || strcmp(tokens[1], "bsz")) {
if (err_msg)
*err_msg = "Invalid statement.";
return NULL;
}
/* <burst_size>. */
token = tokens[2];
burst_size = strtoul(token, &token, 0);
if (token[0]) {
if (err_msg)
*err_msg = "Invalid <burst_size> parameter.";
return NULL;
}
/* Memory allocation. */
name = strdup(tokens[0]);
p = malloc(sizeof(struct rte_swx_port_ring_writer_params));
if (!name || !p) {
free(name);
free(p);
if (err_msg)
*err_msg = "Memory allocation failed.";
return NULL;
}
/* Initialization. */
p->name = name;
p->burst_size = burst_size;
return p;
}
static void *
port_out_sink_parse(char **tokens, uint32_t n_tokens, const char **err_msg)
{
struct rte_swx_port_sink_params *p = NULL;
char *file_name = NULL;
int file_name_valid = 0;
if ((n_tokens != 2) || strcmp(tokens[0], "file")) {
if (err_msg)
*err_msg = "Invalid statement.";
return NULL;
}
/* Memory allocation. */
if (strcmp(tokens[1], "none")) {
file_name_valid = 1;
file_name = strdup(tokens[1]);
}
p = malloc(sizeof(struct rte_swx_port_ring_writer_params));
if ((file_name_valid && !file_name) || !p) {
free(file_name);
free(p);
if (err_msg)
*err_msg = "Memory allocation failed.";
return NULL;
}
/* Initialization. */
p->file_name = file_name;
return p;
}
static void *
port_out_fd_parse(char **tokens,
uint32_t n_tokens,
const char **err_msg)
{
struct rte_swx_port_fd_writer_params *p = NULL;
char *token;
uint32_t burst_size;
int fd;
if ((n_tokens != 3) || strcmp(tokens[1], "bsz")) {
if (err_msg)
*err_msg = "Invalid statement.";
return NULL;
}
/* <file_descriptor>. */
token = tokens[0];
fd = strtol(token, &token, 0);
if (token[0]) {
if (err_msg)
*err_msg = "Invalid <file_descriptor> parameter.";
return NULL;
}
/* <burst_size>. */
token = tokens[2];
burst_size = strtoul(token, &token, 0);
if (token[0]) {
if (err_msg)
*err_msg = "Invalid <burst_size> parameter.";
return NULL;
}
/* Memory allocation. */
p = malloc(sizeof(struct rte_swx_port_fd_writer_params));
if (!p) {
if (err_msg)
*err_msg = "Memory allocation failed.";
return NULL;
}
/* Initialization. */
p->fd = fd;
p->burst_size = burst_size;
return p;
}
struct pipeline_iospec *
pipeline_iospec_parse(FILE *spec,
uint32_t *err_line,
const char **err_msg)
{
struct pipeline_iospec *s = NULL;
uint32_t n_lines = 0;
/* Check the input arguments. */
if (!spec) {
if (err_line)
*err_line = n_lines;
if (err_msg)
*err_msg = "Invalid input argument.";
goto error;
}
/* Memory allocation. */
s = calloc(sizeof(struct pipeline_iospec), 1);
if (!s) {
if (err_line)
*err_line = n_lines;
if (err_msg)
*err_msg = "Memory allocation failed.";
goto error;
}
/* Initialize with the default values. */
s->mirroring_params.n_slots = RTE_SWX_PACKET_MIRRORING_SLOTS_DEFAULT;
s->mirroring_params.n_sessions = RTE_SWX_PACKET_MIRRORING_SESSIONS_DEFAULT;
for (n_lines = 1; ; n_lines++) {
char line[MAX_LINE_LENGTH];
char *tokens[MAX_TOKENS], *ptr = line;
uint32_t n_tokens = 0;
/* Read next line. */
if (!fgets(line, sizeof(line), spec))
break;
/* Parse the line into tokens. */
for ( ; ; ) {
char *token;
/* Get token. */
token = strtok_r(ptr, " \f\n\r\t\v", &ptr);
if (!token)
break;
/* Handle comments. */
if ((token[0] == '#') ||
(token[0] == ';') ||
((token[0] == '/') && (token[1] == '/'))) {
break;
}
/* Handle excessively long lines. */
if (n_tokens >= RTE_DIM(tokens)) {
if (err_line)
*err_line = n_lines;
if (err_msg)
*err_msg = "Too many tokens.";
goto error;
}
/* Handle excessively long tokens. */
if (strnlen(token, RTE_SWX_NAME_SIZE) >=
RTE_SWX_NAME_SIZE) {
if (err_line)
*err_line = n_lines;
if (err_msg)
*err_msg = "Token too big.";
goto error;
}
/* Save token. */
tokens[n_tokens] = token;
n_tokens++;
}
/* Handle empty lines. */
if (!n_tokens)
continue;
/* mirroring. */
if ((n_tokens >= 1) && !strcmp(tokens[0], "mirroring")) {
int status = 0;
status = mirroring_parse(&s->mirroring_params,
&tokens[1],
n_tokens - 1,
err_msg);
if (status) {
if (err_line)
*err_line = n_lines;
goto error;
}
continue;
}
/* port in. */
if ((n_tokens >= 4) && !strcmp(tokens[0], "port") && !strcmp(tokens[1], "in")) {
char *token = tokens[2];
uint32_t *new_id = NULL;
const char **new_type = NULL, *port_type = NULL;
void **new_params = NULL, *p = NULL;
uint32_t port_id;
/* <port_id>. */
port_id = strtoul(token, &token, 0);
if (token[0]) {
if (err_line)
*err_line = n_lines;
if (err_msg)
*err_msg = "Invalid port ID.";
goto error;
}
/* <port_type>. */
if (!strcmp(tokens[3], "ethdev"))
p = port_in_ethdev_parse(&tokens[4], n_tokens - 4, err_msg);
else if (!strcmp(tokens[3], "ring"))
p = port_in_ring_parse(&tokens[4], n_tokens - 4, err_msg);
else if (!strcmp(tokens[3], "source"))
p = port_in_source_parse(&tokens[4], n_tokens - 4, err_msg);
else if (!strcmp(tokens[3], "fd"))
p = port_in_fd_parse(&tokens[4], n_tokens - 4, err_msg);
else {
p = NULL;
if (err_msg)
*err_msg = "Invalid port type.";
}
if (!p) {
if (err_line)
*err_line = n_lines;
goto error;
}
/* New port. */
port_type = strdup(tokens[3]);
new_id = realloc(s->port_in_id,
(s->n_ports_in + 1) * sizeof(uint32_t));
new_type = realloc(s->port_in_type,
(s->n_ports_in + 1) * sizeof(char *));
new_params = realloc(s->port_in_params,
(s->n_ports_in + 1) * sizeof(void *));
if (!port_type || !new_id || !new_type || !new_params) {
uintptr_t pt = (uintptr_t)port_type;
port_in_params_free(p, tokens[3]);
free((void *)pt);
free(new_id);
free(new_type);
free(new_params);
if (err_line)
*err_line = n_lines;
if (err_msg)
*err_msg = "Memory allocation failed.";
goto error;
}
s->port_in_id = new_id;
s->port_in_type = new_type;
s->port_in_params = new_params;
s->port_in_id[s->n_ports_in] = port_id;
s->port_in_type[s->n_ports_in] = port_type;
s->port_in_params[s->n_ports_in] = p;
s->n_ports_in++;
continue;
}
/* port out. */
if ((n_tokens >= 4) && !strcmp(tokens[0], "port") && !strcmp(tokens[1], "out")) {
char *token = tokens[2];
uint32_t *new_id = NULL;
const char **new_type = NULL, *port_type = NULL;
void **new_params = NULL, *p = NULL;
uint32_t port_id;
/* <port_id>. */
port_id = strtoul(token, &token, 0);
if (token[0]) {
if (err_line)
*err_line = n_lines;
if (err_msg)
*err_msg = "Invalid port ID.";
goto error;
}
/* <port_type>. */
if (!strcmp(tokens[3], "ethdev"))
p = port_out_ethdev_parse(&tokens[4], n_tokens - 4, err_msg);
else if (!strcmp(tokens[3], "ring"))
p = port_out_ring_parse(&tokens[4], n_tokens - 4, err_msg);
else if (!strcmp(tokens[3], "sink"))
p = port_out_sink_parse(&tokens[4], n_tokens - 4, err_msg);
else if (!strcmp(tokens[3], "fd"))
p = port_out_fd_parse(&tokens[4], n_tokens - 4, err_msg);
else {
p = NULL;
if (err_msg)
*err_msg = "Invalid port type.";
}
if (!p) {
if (err_line)
*err_line = n_lines;
goto error;
}
/* New port. */
port_type = strdup(tokens[3]);
new_id = realloc(s->port_out_id,
(s->n_ports_out + 1) * sizeof(uint32_t));
new_type = realloc(s->port_out_type,
(s->n_ports_out + 1) * sizeof(char *));
new_params = realloc(s->port_out_params,
(s->n_ports_out + 1) * sizeof(void *));
if (!port_type || !new_id || !new_type || !new_params) {
uintptr_t pt = (uintptr_t)port_type;
port_out_params_free(p, tokens[3]);
free((void *)pt);
free(new_id);
free(new_type);
free(new_params);
if (err_line)
*err_line = n_lines;
if (err_msg)
*err_msg = "Memory allocation failed.";
goto error;
}
s->port_out_id = new_id;
s->port_out_type = new_type;
s->port_out_params = new_params;
s->port_out_id[s->n_ports_out] = port_id;
s->port_out_type[s->n_ports_out] = port_type;
s->port_out_params[s->n_ports_out] = p;
s->n_ports_out++;
continue;
}
/* Anything else. */
if (err_line)
*err_line = n_lines;
if (err_msg)
*err_msg = "Unknown I/O statement.";
goto error;
}
return s;
error:
pipeline_iospec_free(s);
return NULL;
}
int
pipeline_iospec_configure(struct rte_swx_pipeline *p,
struct pipeline_iospec *s,
const char **err_msg)
{
uint32_t i;
int status = 0;
/* Check input arguments. */
if (!p || !s) {
if (err_msg)
*err_msg = "Invalid input argument";
return -EINVAL;
}
/* Mirroring. */
status = rte_swx_pipeline_mirroring_config(p, &s->mirroring_params);
if (status) {
if (err_msg)
*err_msg = "Pipeline mirroring configuration error.";
return status;
}
/* Input ports. */
for (i = 0; i < s->n_ports_in; i++) {
status = rte_swx_pipeline_port_in_config(p,
i,
s->port_in_type[i],
s->port_in_params[i]);
if (status) {
if (err_msg)
*err_msg = "Pipeline input port configuration error.";
return status;
}
}
/* Output ports. */
for (i = 0; i < s->n_ports_out; i++) {
status = rte_swx_pipeline_port_out_config(p,
i,
s->port_out_type[i],
s->port_out_params[i]);
if (status) {
if (err_msg)
*err_msg = "Pipeline output port configuration error.";
return status;
}
}
return 0;
}

View File

@ -1,6 +1,13 @@
/* SPDX-License-Identifier: BSD-3-Clause
* Copyright(c) 2022 Intel Corporation
*/
#ifndef __INCLUDE_RTE_SWX_PIPELINE_SPEC_H__
#define __INCLUDE_RTE_SWX_PIPELINE_SPEC_H__
#ifdef __cplusplus
extern "C" {
#endif
#include <stdint.h>
#include <stdio.h>
@ -204,6 +211,38 @@ struct pipeline_spec {
uint32_t n_apply;
};
/*
* Mirroring:
* mirroring slots <n_slots> sessions <n_sessions>
*
* Input ports:
* port in <port_id> ethdev <ethdev_name> rxq <queue_id> bsz <burst_size>
* port in <port_id> ring <ring_name> bsz <burst_size>
* port in <port_id> source mempool <mempool_name> file <file_name> loop <n_loops>
* packets <n_pkts_max>
* port in <port_id> fd <file_descriptor> mtu <mtu> mempool <mempool_name> bsz <burst_size>
*
* Output ports:
* port out <port_id> ethdev <ethdev_name> txq <queue_id> bsz <burst_size>
* port out <port_id> ring <ring_name> bsz <burst_size>
* port out <port_id> sink file <file_name> | none
* port out <port_id> fd <file_descriptor> bsz <burst_size>
*/
struct pipeline_iospec {
struct rte_swx_pipeline_mirroring_params mirroring_params;
uint32_t *port_in_id;
const char **port_in_type;
void **port_in_params;
uint32_t *port_out_id;
const char **port_out_type;
void **port_out_params;
uint32_t n_ports_in;
uint32_t n_ports_out;
};
void
pipeline_spec_free(struct pipeline_spec *s);
@ -220,3 +259,22 @@ int
pipeline_spec_configure(struct rte_swx_pipeline *p,
struct pipeline_spec *s,
const char **err_msg);
void
pipeline_iospec_free(struct pipeline_iospec *s);
struct pipeline_iospec *
pipeline_iospec_parse(FILE *spec,
uint32_t *err_line,
const char **err_msg);
int
pipeline_iospec_configure(struct rte_swx_pipeline *p,
struct pipeline_iospec *s,
const char **err_msg);
#ifdef __cplusplus
}
#endif
#endif