examples/pipeline: add message passing mechanism
Add network-based connectivity mechanism for the application to allow for the exchange of configuration messages through the network as opposed to local CLI only. Signed-off-by: Cristian Dumitrescu <cristian.dumitrescu@intel.com>
This commit is contained in:
parent
b77f660028
commit
5f657a7fbe
@ -5,6 +5,7 @@
|
||||
APP = pipeline
|
||||
|
||||
# all source are stored in SRCS-y
|
||||
SRCS-y += conn.c
|
||||
SRCS-y += main.c
|
||||
SRCS-y += obj.c
|
||||
SRCS-y += thread.c
|
||||
|
331
examples/pipeline/conn.c
Normal file
331
examples/pipeline/conn.c
Normal file
@ -0,0 +1,331 @@
|
||||
/* SPDX-License-Identifier: BSD-3-Clause
|
||||
* Copyright(c) 2020 Intel Corporation
|
||||
*/
|
||||
|
||||
#include <string.h>
|
||||
#include <stdlib.h>
|
||||
#include <stdio.h>
|
||||
#include <unistd.h>
|
||||
#include <sys/types.h>
|
||||
|
||||
#include <sys/socket.h>
|
||||
|
||||
#include <sys/epoll.h>
|
||||
#include <netinet/in.h>
|
||||
#include <arpa/inet.h>
|
||||
#include <errno.h>
|
||||
|
||||
#include "conn.h"
|
||||
|
||||
#define MSG_CMD_TOO_LONG "Command too long."
|
||||
|
||||
struct conn {
|
||||
char *welcome;
|
||||
char *prompt;
|
||||
char *buf;
|
||||
char *msg_in;
|
||||
char *msg_out;
|
||||
size_t buf_size;
|
||||
size_t msg_in_len_max;
|
||||
size_t msg_out_len_max;
|
||||
size_t msg_in_len;
|
||||
int fd_server;
|
||||
int fd_client_group;
|
||||
conn_msg_handle_t msg_handle;
|
||||
void *msg_handle_arg;
|
||||
};
|
||||
|
||||
struct conn *
|
||||
conn_init(struct conn_params *p)
|
||||
{
|
||||
struct sockaddr_in server_address;
|
||||
struct conn *conn;
|
||||
int fd_server, fd_client_group, status;
|
||||
|
||||
memset(&server_address, 0, sizeof(server_address));
|
||||
|
||||
/* Check input arguments */
|
||||
if ((p == NULL) ||
|
||||
(p->welcome == NULL) ||
|
||||
(p->prompt == NULL) ||
|
||||
(p->addr == NULL) ||
|
||||
(p->buf_size == 0) ||
|
||||
(p->msg_in_len_max == 0) ||
|
||||
(p->msg_out_len_max == 0) ||
|
||||
(p->msg_handle == NULL))
|
||||
return NULL;
|
||||
|
||||
status = inet_aton(p->addr, &server_address.sin_addr);
|
||||
if (status == 0)
|
||||
return NULL;
|
||||
|
||||
/* Memory allocation */
|
||||
conn = calloc(1, sizeof(struct conn));
|
||||
if (conn == NULL)
|
||||
return NULL;
|
||||
|
||||
conn->welcome = calloc(1, CONN_WELCOME_LEN_MAX + 1);
|
||||
conn->prompt = calloc(1, CONN_PROMPT_LEN_MAX + 1);
|
||||
conn->buf = calloc(1, p->buf_size);
|
||||
conn->msg_in = calloc(1, p->msg_in_len_max + 1);
|
||||
conn->msg_out = calloc(1, p->msg_out_len_max + 1);
|
||||
|
||||
if ((conn->welcome == NULL) ||
|
||||
(conn->prompt == NULL) ||
|
||||
(conn->buf == NULL) ||
|
||||
(conn->msg_in == NULL) ||
|
||||
(conn->msg_out == NULL)) {
|
||||
conn_free(conn);
|
||||
return NULL;
|
||||
}
|
||||
|
||||
/* Server socket */
|
||||
server_address.sin_family = AF_INET;
|
||||
server_address.sin_port = htons(p->port);
|
||||
|
||||
fd_server = socket(AF_INET,
|
||||
SOCK_STREAM | SOCK_NONBLOCK,
|
||||
0);
|
||||
if (fd_server == -1) {
|
||||
conn_free(conn);
|
||||
return NULL;
|
||||
}
|
||||
|
||||
status = bind(fd_server,
|
||||
(struct sockaddr *) &server_address,
|
||||
sizeof(server_address));
|
||||
if (status == -1) {
|
||||
conn_free(conn);
|
||||
close(fd_server);
|
||||
return NULL;
|
||||
}
|
||||
|
||||
status = listen(fd_server, 16);
|
||||
if (status == -1) {
|
||||
conn_free(conn);
|
||||
close(fd_server);
|
||||
return NULL;
|
||||
}
|
||||
|
||||
/* Client group */
|
||||
fd_client_group = epoll_create(1);
|
||||
if (fd_client_group == -1) {
|
||||
conn_free(conn);
|
||||
close(fd_server);
|
||||
return NULL;
|
||||
}
|
||||
|
||||
/* Fill in */
|
||||
strncpy(conn->welcome, p->welcome, CONN_WELCOME_LEN_MAX);
|
||||
strncpy(conn->prompt, p->prompt, CONN_PROMPT_LEN_MAX);
|
||||
conn->buf_size = p->buf_size;
|
||||
conn->msg_in_len_max = p->msg_in_len_max;
|
||||
conn->msg_out_len_max = p->msg_out_len_max;
|
||||
conn->msg_in_len = 0;
|
||||
conn->fd_server = fd_server;
|
||||
conn->fd_client_group = fd_client_group;
|
||||
conn->msg_handle = p->msg_handle;
|
||||
conn->msg_handle_arg = p->msg_handle_arg;
|
||||
|
||||
return conn;
|
||||
}
|
||||
|
||||
void
|
||||
conn_free(struct conn *conn)
|
||||
{
|
||||
if (conn == NULL)
|
||||
return;
|
||||
|
||||
if (conn->fd_client_group)
|
||||
close(conn->fd_client_group);
|
||||
|
||||
if (conn->fd_server)
|
||||
close(conn->fd_server);
|
||||
|
||||
free(conn->msg_out);
|
||||
free(conn->msg_in);
|
||||
free(conn->prompt);
|
||||
free(conn->welcome);
|
||||
free(conn);
|
||||
}
|
||||
|
||||
int
|
||||
conn_poll_for_conn(struct conn *conn)
|
||||
{
|
||||
struct sockaddr_in client_address;
|
||||
struct epoll_event event;
|
||||
socklen_t client_address_length;
|
||||
int fd_client, status;
|
||||
|
||||
/* Check input arguments */
|
||||
if (conn == NULL)
|
||||
return -1;
|
||||
|
||||
/* Server socket */
|
||||
client_address_length = sizeof(client_address);
|
||||
fd_client = accept4(conn->fd_server,
|
||||
(struct sockaddr *) &client_address,
|
||||
&client_address_length,
|
||||
SOCK_NONBLOCK);
|
||||
if (fd_client == -1) {
|
||||
if ((errno == EAGAIN) || (errno == EWOULDBLOCK))
|
||||
return 0;
|
||||
|
||||
return -1;
|
||||
}
|
||||
|
||||
/* Client group */
|
||||
event.events = EPOLLIN | EPOLLRDHUP | EPOLLHUP;
|
||||
event.data.fd = fd_client;
|
||||
|
||||
status = epoll_ctl(conn->fd_client_group,
|
||||
EPOLL_CTL_ADD,
|
||||
fd_client,
|
||||
&event);
|
||||
if (status == -1) {
|
||||
close(fd_client);
|
||||
return -1;
|
||||
}
|
||||
|
||||
/* Client */
|
||||
status = write(fd_client,
|
||||
conn->welcome,
|
||||
strlen(conn->welcome));
|
||||
if (status == -1) {
|
||||
close(fd_client);
|
||||
return -1;
|
||||
}
|
||||
|
||||
status = write(fd_client,
|
||||
conn->prompt,
|
||||
strlen(conn->prompt));
|
||||
if (status == -1) {
|
||||
close(fd_client);
|
||||
return -1;
|
||||
}
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
static int
|
||||
data_event_handle(struct conn *conn,
|
||||
int fd_client)
|
||||
{
|
||||
ssize_t len, i, status;
|
||||
|
||||
/* Read input message */
|
||||
|
||||
len = read(fd_client,
|
||||
conn->buf,
|
||||
conn->buf_size);
|
||||
if (len == -1) {
|
||||
if ((errno == EAGAIN) || (errno == EWOULDBLOCK))
|
||||
return 0;
|
||||
|
||||
return -1;
|
||||
}
|
||||
if (len == 0)
|
||||
return 0;
|
||||
|
||||
/* Handle input messages */
|
||||
for (i = 0; i < len; i++) {
|
||||
if (conn->buf[i] == '\n') {
|
||||
size_t n;
|
||||
|
||||
conn->msg_in[conn->msg_in_len] = 0;
|
||||
conn->msg_out[0] = 0;
|
||||
|
||||
conn->msg_handle(conn->msg_in,
|
||||
conn->msg_out,
|
||||
conn->msg_out_len_max,
|
||||
conn->msg_handle_arg);
|
||||
|
||||
n = strlen(conn->msg_out);
|
||||
if (n) {
|
||||
status = write(fd_client,
|
||||
conn->msg_out,
|
||||
n);
|
||||
if (status == -1)
|
||||
return status;
|
||||
}
|
||||
|
||||
conn->msg_in_len = 0;
|
||||
} else if (conn->msg_in_len < conn->msg_in_len_max) {
|
||||
conn->msg_in[conn->msg_in_len] = conn->buf[i];
|
||||
conn->msg_in_len++;
|
||||
} else {
|
||||
status = write(fd_client,
|
||||
MSG_CMD_TOO_LONG,
|
||||
strlen(MSG_CMD_TOO_LONG));
|
||||
if (status == -1)
|
||||
return status;
|
||||
|
||||
conn->msg_in_len = 0;
|
||||
}
|
||||
}
|
||||
|
||||
/* Write prompt */
|
||||
status = write(fd_client,
|
||||
conn->prompt,
|
||||
strlen(conn->prompt));
|
||||
if (status == -1)
|
||||
return status;
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
static int
|
||||
control_event_handle(struct conn *conn,
|
||||
int fd_client)
|
||||
{
|
||||
int status;
|
||||
|
||||
status = epoll_ctl(conn->fd_client_group,
|
||||
EPOLL_CTL_DEL,
|
||||
fd_client,
|
||||
NULL);
|
||||
if (status == -1)
|
||||
return -1;
|
||||
|
||||
status = close(fd_client);
|
||||
if (status == -1)
|
||||
return -1;
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
int
|
||||
conn_poll_for_msg(struct conn *conn)
|
||||
{
|
||||
struct epoll_event event;
|
||||
int fd_client, status, status_data = 0, status_control = 0;
|
||||
|
||||
/* Check input arguments */
|
||||
if (conn == NULL)
|
||||
return -1;
|
||||
|
||||
/* Client group */
|
||||
status = epoll_wait(conn->fd_client_group,
|
||||
&event,
|
||||
1,
|
||||
0);
|
||||
if (status == -1)
|
||||
return -1;
|
||||
if (status == 0)
|
||||
return 0;
|
||||
|
||||
fd_client = event.data.fd;
|
||||
|
||||
/* Data available */
|
||||
if (event.events & EPOLLIN)
|
||||
status_data = data_event_handle(conn, fd_client);
|
||||
|
||||
/* Control events */
|
||||
if (event.events & (EPOLLRDHUP | EPOLLERR | EPOLLHUP))
|
||||
status_control = control_event_handle(conn, fd_client);
|
||||
|
||||
if (status_data || status_control)
|
||||
return -1;
|
||||
|
||||
return 0;
|
||||
}
|
50
examples/pipeline/conn.h
Normal file
50
examples/pipeline/conn.h
Normal file
@ -0,0 +1,50 @@
|
||||
/* SPDX-License-Identifier: BSD-3-Clause
|
||||
* Copyright(c) 2020 Intel Corporation
|
||||
*/
|
||||
|
||||
#ifndef __INCLUDE_CONN_H__
|
||||
#define __INCLUDE_CONN_H__
|
||||
|
||||
#include <stdint.h>
|
||||
|
||||
struct conn;
|
||||
|
||||
#ifndef CONN_WELCOME_LEN_MAX
|
||||
#define CONN_WELCOME_LEN_MAX 1024
|
||||
#endif
|
||||
|
||||
#ifndef CONN_PROMPT_LEN_MAX
|
||||
#define CONN_PROMPT_LEN_MAX 16
|
||||
#endif
|
||||
|
||||
typedef void
|
||||
(*conn_msg_handle_t)(char *msg_in,
|
||||
char *msg_out,
|
||||
size_t msg_out_len_max,
|
||||
void *arg);
|
||||
|
||||
struct conn_params {
|
||||
const char *welcome;
|
||||
const char *prompt;
|
||||
const char *addr;
|
||||
uint16_t port;
|
||||
size_t buf_size;
|
||||
size_t msg_in_len_max;
|
||||
size_t msg_out_len_max;
|
||||
conn_msg_handle_t msg_handle;
|
||||
void *msg_handle_arg;
|
||||
};
|
||||
|
||||
struct conn *
|
||||
conn_init(struct conn_params *p);
|
||||
|
||||
void
|
||||
conn_free(struct conn *conn);
|
||||
|
||||
int
|
||||
conn_poll_for_conn(struct conn *conn);
|
||||
|
||||
int
|
||||
conn_poll_for_msg(struct conn *conn);
|
||||
|
||||
#endif
|
@ -11,15 +11,136 @@
|
||||
#include <rte_launch.h>
|
||||
#include <rte_eal.h>
|
||||
|
||||
#include "conn.h"
|
||||
#include "obj.h"
|
||||
#include "thread.h"
|
||||
|
||||
static const char usage[] =
|
||||
"%s EAL_ARGS -- [-h HOST] [-p PORT] [-s SCRIPT]\n";
|
||||
|
||||
static struct app_params {
|
||||
struct conn_params conn;
|
||||
char *script_name;
|
||||
} app = {
|
||||
.conn = {
|
||||
.welcome = "\nWelcome!\n\n",
|
||||
.prompt = "pipeline> ",
|
||||
.addr = "0.0.0.0",
|
||||
.port = 8086,
|
||||
.buf_size = 1024 * 1024,
|
||||
.msg_in_len_max = 1024,
|
||||
.msg_out_len_max = 1024 * 1024,
|
||||
.msg_handle = NULL,
|
||||
.msg_handle_arg = NULL, /* set later. */
|
||||
},
|
||||
.script_name = NULL,
|
||||
};
|
||||
|
||||
static int
|
||||
parse_args(int argc, char **argv)
|
||||
{
|
||||
char *app_name = argv[0];
|
||||
struct option lgopts[] = {
|
||||
{ NULL, 0, 0, 0 }
|
||||
};
|
||||
int opt, option_index;
|
||||
int h_present, p_present, s_present, n_args, i;
|
||||
|
||||
/* Skip EAL input args */
|
||||
n_args = argc;
|
||||
for (i = 0; i < n_args; i++)
|
||||
if (strcmp(argv[i], "--") == 0) {
|
||||
argc -= i;
|
||||
argv += i;
|
||||
break;
|
||||
}
|
||||
|
||||
if (i == n_args)
|
||||
return 0;
|
||||
|
||||
/* Parse args */
|
||||
h_present = 0;
|
||||
p_present = 0;
|
||||
s_present = 0;
|
||||
|
||||
while ((opt = getopt_long(argc, argv, "h:p:s:", lgopts, &option_index))
|
||||
!= EOF)
|
||||
switch (opt) {
|
||||
case 'h':
|
||||
if (h_present) {
|
||||
printf("Error: Multiple -h arguments\n");
|
||||
return -1;
|
||||
}
|
||||
h_present = 1;
|
||||
|
||||
if (!strlen(optarg)) {
|
||||
printf("Error: Argument for -h not provided\n");
|
||||
return -1;
|
||||
}
|
||||
|
||||
app.conn.addr = strdup(optarg);
|
||||
if (app.conn.addr == NULL) {
|
||||
printf("Error: Not enough memory\n");
|
||||
return -1;
|
||||
}
|
||||
break;
|
||||
|
||||
case 'p':
|
||||
if (p_present) {
|
||||
printf("Error: Multiple -p arguments\n");
|
||||
return -1;
|
||||
}
|
||||
p_present = 1;
|
||||
|
||||
if (!strlen(optarg)) {
|
||||
printf("Error: Argument for -p not provided\n");
|
||||
return -1;
|
||||
}
|
||||
|
||||
app.conn.port = (uint16_t) atoi(optarg);
|
||||
break;
|
||||
|
||||
case 's':
|
||||
if (s_present) {
|
||||
printf("Error: Multiple -s arguments\n");
|
||||
return -1;
|
||||
}
|
||||
s_present = 1;
|
||||
|
||||
if (!strlen(optarg)) {
|
||||
printf("Error: Argument for -s not provided\n");
|
||||
return -1;
|
||||
}
|
||||
|
||||
app.script_name = strdup(optarg);
|
||||
if (app.script_name == NULL) {
|
||||
printf("Error: Not enough memory\n");
|
||||
return -1;
|
||||
}
|
||||
break;
|
||||
|
||||
default:
|
||||
printf(usage, app_name);
|
||||
return -1;
|
||||
}
|
||||
|
||||
optind = 1; /* reset getopt lib */
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
int
|
||||
main(int argc, char **argv)
|
||||
{
|
||||
struct conn *conn;
|
||||
struct obj *obj;
|
||||
int status;
|
||||
|
||||
/* Parse application arguments */
|
||||
status = parse_args(argc, argv);
|
||||
if (status < 0)
|
||||
return status;
|
||||
|
||||
/* EAL */
|
||||
status = rte_eal_init(argc, argv);
|
||||
if (status < 0) {
|
||||
@ -46,5 +167,19 @@ main(int argc, char **argv)
|
||||
NULL,
|
||||
SKIP_MASTER);
|
||||
|
||||
return 0;
|
||||
/* Connectivity */
|
||||
app.conn.msg_handle_arg = obj;
|
||||
conn = conn_init(&app.conn);
|
||||
if (!conn) {
|
||||
printf("Error: Connectivity initialization failed (%d)\n",
|
||||
status);
|
||||
return status;
|
||||
};
|
||||
|
||||
/* Dispatch loop */
|
||||
for ( ; ; ) {
|
||||
conn_poll_for_conn(conn);
|
||||
|
||||
conn_poll_for_msg(conn);
|
||||
}
|
||||
}
|
||||
|
@ -10,6 +10,7 @@ build = cc.has_header('sys/epoll.h')
|
||||
deps += ['pipeline', 'bus_pci']
|
||||
allow_experimental_apis = true
|
||||
sources = files(
|
||||
'conn.c',
|
||||
'main.c',
|
||||
'obj.c',
|
||||
'thread.c',
|
||||
|
Loading…
x
Reference in New Issue
Block a user