netmap: lb: switch to libnetmap

Use the newer libnetmap API (included in base) rather than the older
nm_open()/nm_close() helpers defined in netmap_user.h.

MFC after:	3 days
Vincenzo Maffione 2020-11-22 09:28:50 +00:00
parent 4bfe1a4fe2
commit 73b2e3e56b
2 changed files with 123 additions and 55 deletions
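For context, a minimal sketch (not part of this commit) of the two libnetmap open patterns that the diff below adopts; the port name "netmap:ix0" and the extra-buffer count are placeholder values and error handling is abbreviated:

#include <libnetmap.h>		/* nmport_*() API, shipped in base */
#include <stdio.h>

static struct nmport_d *
example_open(void)
{
	/* One-shot open/close, the replacement for nm_open()/nm_close(): */
	struct nmport_d *d = nmport_open("netmap:ix0");
	if (d == NULL) {
		fprintf(stderr, "nmport_open failed\n");
		return NULL;
	}
	nmport_close(d);

	/* Two-step open, used when registration options must be set
	 * before binding (lb uses this to request extra buffers):
	 */
	d = nmport_prepare("netmap:ix0");
	if (d == NULL)
		return NULL;
	d->reg.nr_extra_bufs = 1024;		/* placeholder value */
	if (nmport_open_desc(d) < 0) {		/* cleanup omitted for brevity */
		fprintf(stderr, "nmport_open_desc failed\n");
		return NULL;
	}
	return d;		/* caller eventually calls nmport_close(d) */
}

The two-step form matches what the diff does for the main port (nmport_prepare(), adjust reg, then nmport_open_desc()), while the output pipes are opened with plain nmport_open().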

View File

@@ -23,20 +23,21 @@
* SUCH DAMAGE.
*/
/* $FreeBSD$ */
#include <stdio.h>
#include <string.h>
#include <ctype.h>
#include <stdbool.h>
#include <errno.h>
#include <inttypes.h>
#include <syslog.h>
#define NETMAP_WITH_LIBS
#include <net/netmap_user.h>
#include <sys/poll.h>
#include <libnetmap.h>
#include <netinet/in.h> /* htonl */
#include <pthread.h>
#include <signal.h>
#include <stdbool.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <syslog.h>
#include <sys/ioctl.h>
#include <sys/poll.h>
#include <unistd.h>
#include "pkt_hash.h"
#include "ctrs.h"
@@ -86,12 +87,12 @@ struct compact_ipv6_hdr {
#define DEF_BATCH 2048
#define DEF_WAIT_LINK 2
#define DEF_STATS_INT 600
#define BUF_REVOKE 100
#define BUF_REVOKE 150
#define STAT_MSG_MAXSIZE 1024
static struct {
char ifname[MAX_IFNAMELEN];
char base_name[MAX_IFNAMELEN];
char ifname[MAX_IFNAMELEN + 1];
char base_name[MAX_IFNAMELEN + 1];
int netmap_fd;
uint16_t output_rings;
uint16_t num_groups;
@@ -173,7 +174,7 @@ struct port_des {
unsigned int last_sync;
uint32_t last_tail;
struct overflow_queue *oq;
struct nm_desc *nmd;
struct nmport_d *nmd;
struct netmap_ring *ring;
struct group_des *group;
};
@@ -375,7 +376,7 @@ free_buffers(void)
D("added %d buffers to netmap free list", tot);
for (i = 0; i < glob_arg.output_rings + 1; ++i) {
nm_close(ports[i].nmd);
nmport_close(ports[i].nmd);
}
}
@@ -480,6 +481,28 @@ init_groups(void)
g->last = 1;
}
/* To support packets that span multiple slots (NS_MOREFRAG) we
* need to make sure of the following:
*
* - all fragments of the same packet must go to the same output pipe
* - when dropping, all fragments of the same packet must be dropped
*
* For the former point we remember and reuse the last hash computed
* in each input ring, and only update it when NS_MOREFRAG was not
* set in the last received slot (this marks the start of a new packet).
*
* For the latter point, we only update the output ring head pointer
* when an entire packet has been forwarded. We keep a shadow_head
* pointer to know where to put the next partial fragment and,
* when the need to drop arises, we roll it back to head.
*/
struct morefrag {
uint16_t last_flag; /* for input rings */
uint32_t last_hash; /* for input rings */
uint32_t shadow_head; /* for output rings */
};
/* push the packet described by slot rs to the group g.
* This may cause other buffers to be pushed down the
* chain headed by g.
@@ -493,21 +516,28 @@ forward_packet(struct group_des *g, struct netmap_slot *rs)
struct port_des *port = &g->ports[output_port];
struct netmap_ring *ring = port->ring;
struct overflow_queue *q = port->oq;
struct morefrag *mf = (struct morefrag *)ring->sem;
uint16_t curmf = rs->flags & NS_MOREFRAG;
/* Move the packet to the output pipe, unless there is
* either no space left on the ring, or there is some
* packet still in the overflow queue (since those must
* take precedence over the new one)
*/
if (ring->head != ring->tail && (q == NULL || oq_empty(q))) {
struct netmap_slot *ts = &ring->slot[ring->head];
if (mf->shadow_head != ring->tail && (q == NULL || oq_empty(q))) {
struct netmap_slot *ts = &ring->slot[mf->shadow_head];
struct netmap_slot old_slot = *ts;
ts->buf_idx = rs->buf_idx;
ts->len = rs->len;
ts->flags |= NS_BUF_CHANGED;
ts->flags = rs->flags | NS_BUF_CHANGED;
ts->ptr = rs->ptr;
ring->head = nm_ring_next(ring, ring->head);
mf->shadow_head = nm_ring_next(ring, mf->shadow_head);
if (!curmf) {
ring->head = mf->shadow_head;
}
ND("curmf %2x ts->flags %2x shadow_head %3u head %3u tail %3u",
curmf, ts->flags, mf->shadow_head, ring->head, ring->tail);
port->ctr.bytes += rs->len;
port->ctr.pkts++;
forwarded++;
@@ -516,9 +546,20 @@ forward_packet(struct group_des *g, struct netmap_slot *rs)
/* use the overflow queue, if available */
if (q == NULL || oq_full(q)) {
uint32_t scan;
/* no space left on the ring and no overflow queue
* available: we are forced to drop the packet
*/
/* drop previous fragments, if any */
for (scan = ring->head; scan != mf->shadow_head;
scan = nm_ring_next(ring, scan)) {
struct netmap_slot *ts = &ring->slot[scan];
dropped++;
port->ctr.drop_bytes += ts->len;
}
mf->shadow_head = ring->head;
dropped++;
port->ctr.drop++;
port->ctr.drop_bytes += rs->len;
@@ -550,9 +591,12 @@ forward_packet(struct group_des *g, struct netmap_slot *rs)
/* move the oldest BUF_REVOKE buffers from the
* lp queue to the free queue
*
* We cannot revoke a partially received packet.
* To make things simple we make sure to leave
* at least NETMAP_MAX_FRAGS slots in the queue.
*/
// XXX optimize this cycle
for (j = 0; lp->oq->n && j < BUF_REVOKE; j++) {
for (j = 0; lp->oq->n > NETMAP_MAX_FRAGS && j < BUF_REVOKE; j++) {
struct netmap_slot tmp = oq_deq(lp->oq);
dropped++;
@@ -651,14 +695,6 @@ int main(int argc, char **argv)
return 1;
}
/* extract the base name */
char *nscan = strncmp(glob_arg.ifname, "netmap:", 7) ?
glob_arg.ifname : glob_arg.ifname + 7;
strncpy(glob_arg.base_name, nscan, MAX_IFNAMELEN - 1);
for (nscan = glob_arg.base_name; *nscan && !index("-*^{}/@", *nscan); nscan++)
;
*nscan = '\0';
if (glob_arg.num_groups == 0)
parse_pipes("");
@@ -678,6 +714,15 @@ int main(int argc, char **argv)
return 1;
}
struct port_des *rxport = &ports[npipes];
rxport->nmd = nmport_prepare(glob_arg.ifname);
if (rxport->nmd == NULL) {
D("cannot parse %s", glob_arg.ifname);
return (1);
}
/* extract the base name */
strncpy(glob_arg.base_name, rxport->nmd->hdr.nr_name, MAX_IFNAMELEN);
init_groups();
memset(&counters_buf, 0, sizeof(counters_buf));
@@ -687,24 +732,15 @@ int main(int argc, char **argv)
return 1;
}
/* we need base_req to specify pipes and extra bufs */
struct nmreq base_req;
memset(&base_req, 0, sizeof(base_req));
rxport->nmd->reg.nr_extra_bufs = glob_arg.extra_bufs;
base_req.nr_arg1 = npipes;
base_req.nr_arg3 = glob_arg.extra_bufs;
rxport->nmd = nm_open(glob_arg.ifname, &base_req, 0, NULL);
if (rxport->nmd == NULL) {
if (nmport_open_desc(rxport->nmd) < 0) {
D("cannot open %s", glob_arg.ifname);
return (1);
} else {
D("successfully opened %s (tx rings: %u)", glob_arg.ifname,
rxport->nmd->req.nr_tx_slots);
}
D("successfully opened %s", glob_arg.ifname);
uint32_t extra_bufs = rxport->nmd->req.nr_arg3;
uint32_t extra_bufs = rxport->nmd->reg.nr_extra_bufs;
struct overflow_queue *oq = NULL;
/* reference ring to access the buffers */
rxport->ring = NETMAP_RXRING(rxport->nmd->nifp, 0);
@@ -772,25 +808,31 @@ run:
snprintf(p->interface, MAX_PORTNAMELEN, "%s%s{%d/xT@%d",
(strncmp(g->pipename, "vale", 4) ? "netmap:" : ""),
g->pipename, g->first_id + k,
rxport->nmd->req.nr_arg2);
rxport->nmd->reg.nr_mem_id);
D("opening pipe named %s", p->interface);
p->nmd = nm_open(p->interface, NULL, 0, rxport->nmd);
p->nmd = nmport_open(p->interface);
if (p->nmd == NULL) {
D("cannot open %s", p->interface);
return (1);
} else if (p->nmd->req.nr_arg2 != rxport->nmd->req.nr_arg2) {
} else if (p->nmd->mem != rxport->nmd->mem) {
D("failed to open pipe #%d in zero-copy mode, "
"please close any application that uses either pipe %s}%d, "
"or %s{%d, and retry",
k + 1, g->pipename, g->first_id + k, g->pipename, g->first_id + k);
return (1);
} else {
struct morefrag *mf;
D("successfully opened pipe #%d %s (tx slots: %d)",
k + 1, p->interface, p->nmd->req.nr_tx_slots);
k + 1, p->interface, p->nmd->reg.nr_tx_slots);
p->ring = NETMAP_TXRING(p->nmd->nifp, 0);
p->last_tail = nm_ring_next(p->ring, p->ring->tail);
mf = (struct morefrag *)p->ring->sem;
mf->last_flag = 0; /* unused */
mf->last_hash = 0; /* unused */
mf->shadow_head = p->ring->head;
}
D("zerocopy %s",
(rxport->nmd->mem == p->nmd->mem) ? "enabled" : "disabled");
@@ -843,6 +885,16 @@ run:
if (glob_arg.stdout_interval > 0 && glob_arg.stdout_interval < poll_timeout)
poll_timeout = glob_arg.stdout_interval;
/* initialize the morefrag structures for the input rings */
for (i = rxport->nmd->first_rx_ring; i <= rxport->nmd->last_rx_ring; i++) {
struct netmap_ring *rxring = NETMAP_RXRING(rxport->nmd->nifp, i);
struct morefrag *mf = (struct morefrag *)rxring->sem;
mf->last_flag = 0;
mf->last_hash = 0;
mf->shadow_head = 0; /* unused */
}
while (!do_abort) {
u_int polli = 0;
iter++;
@@ -871,7 +923,7 @@ run:
pollfd[polli].revents = 0;
++polli;
//RD(5, "polling %d file descriptors", polli+1);
ND(5, "polling %d file descriptors", polli);
rv = poll(pollfd, polli, poll_timeout);
if (rv <= 0) {
if (rv < 0 && errno != EAGAIN && errno != EINTR)
@@ -902,7 +954,7 @@ run:
struct netmap_slot *rs = &ring->slot[last];
// XXX less aggressive?
rs->buf_idx = forward_packet(g + 1, rs);
rs->flags |= NS_BUF_CHANGED;
rs->flags = NS_BUF_CHANGED;
rs->ptr = 0;
}
p->last_tail = last;
@@ -918,27 +970,34 @@ run:
for (i = 0; i < npipes; i++) {
struct port_des *p = &ports[i];
struct overflow_queue *q = p->oq;
uint32_t k, lim;
uint32_t k;
int64_t lim;
struct netmap_ring *ring;
struct netmap_slot *slot;
struct morefrag *mf;
if (oq_empty(q))
continue;
ring = p->ring;
lim = nm_ring_space(ring);
mf = (struct morefrag *)ring->sem;
lim = ring->tail - mf->shadow_head;
if (!lim)
continue;
if (lim < 0)
lim += ring->num_slots;
if (q->n < lim)
lim = q->n;
for (k = 0; k < lim; k++) {
struct netmap_slot s = oq_deq(q), tmp;
tmp.ptr = 0;
slot = &ring->slot[ring->head];
slot = &ring->slot[mf->shadow_head];
tmp.buf_idx = slot->buf_idx;
oq_enq(freeq, &tmp);
*slot = s;
slot->flags |= NS_BUF_CHANGED;
ring->head = nm_ring_next(ring, ring->head);
mf->shadow_head = nm_ring_next(ring, mf->shadow_head);
if (!(slot->flags & NS_MOREFRAG))
ring->head = mf->shadow_head;
}
}
}
@@ -947,6 +1006,7 @@ run:
int batch = 0;
for (i = rxport->nmd->first_rx_ring; i <= rxport->nmd->last_rx_ring; i++) {
struct netmap_ring *rxring = NETMAP_RXRING(rxport->nmd->nifp, i);
struct morefrag *mf = (struct morefrag *)rxring->sem;
//D("prepare to scan rings");
int next_head = rxring->head;
@@ -959,7 +1019,15 @@ run:
received_bytes += rs->len;
// CHOOSE THE CORRECT OUTPUT PIPE
rs->ptr = pkt_hdr_hash((const unsigned char *)next_buf, 4, 'B');
// If the previous slot had NS_MOREFRAG set, this is another
// fragment of the last packet and it should go to the same
// output pipe as before.
if (!mf->last_flag) {
// 'B' is just a hashing seed
mf->last_hash = pkt_hdr_hash((const unsigned char *)next_buf, 4, 'B');
}
mf->last_flag = rs->flags & NS_MOREFRAG;
rs->ptr = mf->last_hash;
if (rs->ptr == 0) {
non_ip++; // XXX ??
}
@@ -968,9 +1036,8 @@ run:
next_slot = &rxring->slot[next_head];
next_buf = NETMAP_BUF(rxring, next_slot->buf_idx);
__builtin_prefetch(next_buf);
// 'B' is just a hashing seed
rs->buf_idx = forward_packet(g, rs);
rs->flags |= NS_BUF_CHANGED;
rs->flags = NS_BUF_CHANGED;
rxring->head = rxring->cur = next_head;
batch++;

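As a reading aid (not part of the diff), a condensed sketch of the shadow_head bookkeeping that the hunks above introduce, reusing the struct morefrag defined in lb.c above and the standard net/netmap_user.h helpers; the buffer-index swap and overflow-queue handling that forward_packet() also performs are omitted:

#include <net/netmap_user.h>

/* struct morefrag is defined in lb.c above.
 * Stage one slot at mf->shadow_head; publish the staged slots by
 * advancing ring->head only when the slot completes a packet
 * (NS_MOREFRAG not set). A partial packet can then be dropped by
 * rolling shadow_head back to ring->head.
 */
static int
stage_slot(struct netmap_ring *ring, struct morefrag *mf,
    const struct netmap_slot *rs)
{
	struct netmap_slot *ts;

	if (mf->shadow_head == ring->tail)
		return -1;	/* no room: caller queues or drops */
	ts = &ring->slot[mf->shadow_head];
	ts->buf_idx = rs->buf_idx;
	ts->len = rs->len;
	ts->flags = rs->flags | NS_BUF_CHANGED;
	mf->shadow_head = nm_ring_next(ring, mf->shadow_head);
	if (!(rs->flags & NS_MOREFRAG))
		ring->head = mf->shadow_head;	/* whole packet staged */
	return 0;
}
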
View File

@@ -26,6 +26,7 @@
** POSSIBILITY OF SUCH DAMAGE.
**/
/* $FreeBSD$ */
/* for func prototypes */
#include "pkt_hash.h"