bus/vmbus: avoid signalling host on read

Don't signal host that receive ring has been read until all events
have been processed. This reduces the number of guest exits and
therefore improves performance.

Signed-off-by: Stephen Hemminger <sthemmin@microsoft.com>
This commit is contained in:
Stephen Hemminger 2018-07-24 14:08:53 -07:00 committed by Thomas Monjalon
parent 2d2c4991b4
commit 530af95a78
6 changed files with 56 additions and 58 deletions

View File

@ -337,12 +337,23 @@ int rte_vmbus_chan_recv(struct vmbus_channel *chan,
* @param len
* Pointer to size of receive buffer (in/out)
* @return
* On success, returns 0
* On success, returns number of bytes read.
* On failure, returns negative errno.
*/
int rte_vmbus_chan_recv_raw(struct vmbus_channel *chan,
void *data, uint32_t *len);
/**
* Notify host of bytes read (after recv_raw)
* Signals host if required.
*
* @param channel
* Pointer to vmbus_channel structure.
* @param bytes_read
* Number of bytes read since last signal
*/
void rte_vmbus_chan_signal_read(struct vmbus_channel *chan, uint32_t bytes_read);
/**
* Determine sub channel index of the given channel
*

View File

@ -10,6 +10,7 @@ DPDK_18.08 {
rte_vmbus_chan_rx_empty;
rte_vmbus_chan_send;
rte_vmbus_chan_send_sglist;
rte_vmbus_chan_signal_read;
rte_vmbus_chan_signal_tx;
rte_vmbus_irq_mask;
rte_vmbus_irq_read;

View File

@ -221,6 +221,9 @@ vmbus_rxbr_read(struct vmbus_br *rbr, void *data, size_t dlen, size_t skip)
if (vmbus_br_availread(rbr) < dlen + skip + sizeof(uint64_t))
return -EAGAIN;
/* Record where host was when we started read (for debug) */
rbr->windex = rbr->vbr->windex;
/*
* Copy channel packet from RX bufring.
*/

View File

@ -176,49 +176,37 @@ bool rte_vmbus_chan_rx_empty(const struct vmbus_channel *channel)
return br->vbr->rindex == br->vbr->windex;
}
static int vmbus_read_and_signal(struct vmbus_channel *chan,
void *data, size_t dlen, size_t skip)
/* Signal host after reading N bytes */
void rte_vmbus_chan_signal_read(struct vmbus_channel *chan, uint32_t bytes_read)
{
struct vmbus_br *rbr = &chan->rxbr;
uint32_t write_sz, pending_sz, bytes_read;
int error;
/* Record where host was when we started read (for debug) */
rbr->windex = rbr->vbr->windex;
/* Read data and skip packet header */
error = vmbus_rxbr_read(rbr, data, dlen, skip);
if (error)
return error;
uint32_t write_sz, pending_sz;
/* No need for signaling on older versions */
if (!rbr->vbr->feature_bits.feat_pending_send_sz)
return 0;
return;
/* Make sure reading of pending happens after new read index */
rte_mb();
pending_sz = rbr->vbr->pending_send;
if (!pending_sz)
return 0;
return;
rte_smp_rmb();
write_sz = vmbus_br_availwrite(rbr, rbr->vbr->windex);
bytes_read = dlen + skip + sizeof(uint64_t);
/* If there was space before then host was not blocked */
if (write_sz - bytes_read > pending_sz)
return 0;
return;
/* If pending write will not fit */
if (write_sz <= pending_sz)
return 0;
return;
vmbus_set_event(chan->device, chan);
return 0;
}
/* TODO: replace this with inplace ring buffer (no copy) */
int rte_vmbus_chan_recv(struct vmbus_channel *chan, void *data, uint32_t *len,
uint64_t *request_id)
{
@ -256,10 +244,16 @@ int rte_vmbus_chan_recv(struct vmbus_channel *chan, void *data, uint32_t *len,
if (request_id)
*request_id = pkt.xactid;
/* Read data and skip the header */
return vmbus_read_and_signal(chan, data, dlen, hlen);
/* Read data and skip packet header */
error = vmbus_rxbr_read(&chan->rxbr, data, dlen, hlen);
if (error)
return error;
rte_vmbus_chan_signal_read(chan, dlen + hlen + sizeof(uint64_t));
return 0;
}
/* TODO: replace this with inplace ring buffer (no copy) */
int rte_vmbus_chan_recv_raw(struct vmbus_channel *chan,
void *data, uint32_t *len)
{
@ -291,8 +285,13 @@ int rte_vmbus_chan_recv_raw(struct vmbus_channel *chan,
if (unlikely(dlen > bufferlen))
return -ENOBUFS;
/* Put packet header in data buffer */
return vmbus_read_and_signal(chan, data, dlen, 0);
/* Read data and skip packet header */
error = vmbus_rxbr_read(&chan->rxbr, data, dlen, 0);
if (error)
return error;
/* Return the number of bytes read */
return dlen + sizeof(uint64_t);
}
int vmbus_chan_create(const struct rte_vmbus_device *device,

View File

@ -40,7 +40,7 @@
#define HN_TXCOPY_THRESHOLD 512
#define HN_RXCOPY_THRESHOLD 256
#define HN_RXQ_EVENT_DEFAULT 1024
#define HN_RXQ_EVENT_DEFAULT 2048
struct hn_rxinfo {
uint32_t vlan_info;
@ -709,7 +709,8 @@ struct hn_rx_queue *hn_rx_queue_alloc(struct hn_data *hv,
{
struct hn_rx_queue *rxq;
rxq = rte_zmalloc_socket("HN_RXQ", sizeof(*rxq),
rxq = rte_zmalloc_socket("HN_RXQ",
sizeof(*rxq) + HN_RXQ_EVENT_DEFAULT,
RTE_CACHE_LINE_SIZE, socket_id);
if (rxq) {
rxq->hv = hv;
@ -717,16 +718,6 @@ struct hn_rx_queue *hn_rx_queue_alloc(struct hn_data *hv,
rte_spinlock_init(&rxq->ring_lock);
rxq->port_id = hv->port_id;
rxq->queue_id = queue_id;
rxq->event_sz = HN_RXQ_EVENT_DEFAULT;
rxq->event_buf = rte_malloc_socket("RX_EVENTS",
rxq->event_sz,
RTE_CACHE_LINE_SIZE,
socket_id);
if (!rxq->event_buf) {
rte_free(rxq);
rxq = NULL;
}
}
return rxq;
}
@ -835,6 +826,7 @@ void hn_process_events(struct hn_data *hv, uint16_t queue_id)
{
struct rte_eth_dev *dev = &rte_eth_devices[hv->port_id];
struct hn_rx_queue *rxq;
uint32_t bytes_read = 0;
int ret = 0;
rxq = queue_id == 0 ? hv->primary : dev->data->rx_queues[queue_id];
@ -852,34 +844,21 @@ void hn_process_events(struct hn_data *hv, uint16_t queue_id)
for (;;) {
const struct vmbus_chanpkt_hdr *pkt;
uint32_t len = rxq->event_sz;
uint32_t len = HN_RXQ_EVENT_DEFAULT;
const void *data;
ret = rte_vmbus_chan_recv_raw(rxq->chan, rxq->event_buf, &len);
if (ret == -EAGAIN)
break; /* ring is empty */
if (ret == -ENOBUFS) {
/* expanded buffer needed */
len = rte_align32pow2(len);
PMD_DRV_LOG(DEBUG, "expand event buf to %u", len);
rxq->event_buf = rte_realloc(rxq->event_buf,
len, RTE_CACHE_LINE_SIZE);
if (rxq->event_buf) {
rxq->event_sz = len;
continue;
}
rte_exit(EXIT_FAILURE, "can not expand event buf!\n");
break;
}
if (ret != 0) {
PMD_DRV_LOG(ERR, "vmbus ring buffer error: %d", ret);
break;
}
else if (ret == -ENOBUFS)
rte_exit(EXIT_FAILURE, "event buffer not big enough (%u < %u)",
HN_RXQ_EVENT_DEFAULT, len);
else if (ret <= 0)
rte_exit(EXIT_FAILURE,
"vmbus ring buffer error: %d", ret);
bytes_read += ret;
pkt = (const struct vmbus_chanpkt_hdr *)rxq->event_buf;
data = (char *)rxq->event_buf + vmbus_chanpkt_getlen(pkt->hlen);
@ -904,6 +883,10 @@ void hn_process_events(struct hn_data *hv, uint16_t queue_id)
if (rxq->rx_ring && rte_ring_full(rxq->rx_ring))
break;
}
if (bytes_read > 0)
rte_vmbus_chan_signal_read(rxq->chan, bytes_read);
rte_spinlock_unlock(&rxq->ring_lock);
}

View File

@ -69,7 +69,6 @@ struct hn_rx_queue {
struct vmbus_channel *chan;
struct rte_mempool *mb_pool;
struct rte_ring *rx_ring;
void *event_buf;
rte_spinlock_t ring_lock;
uint32_t event_sz;
@ -77,6 +76,8 @@ struct hn_rx_queue {
uint16_t queue_id;
struct hn_stats stats;
uint64_t ring_full;
uint8_t event_buf[];
};