vhost: remove concurrent enqueue
All other DPDK PMDs don't support concurrent receiving or sending
packets on the same queue; the upper application is expected to handle
this, normally through queue and core bindings.

For historical reasons, vhost internally supported concurrent lockless
enqueuing of packets to the same virtio queue, implemented with a costly
cmpset operation. This patch removes that internal lockless path, which
should improve performance a bit. Luckily, DPDK OVS doesn't rely on this
behavior.

Signed-off-by: Huawei Xie <huawei.xie@intel.com>
Acked-by: Yuanhan Liu <yuanhan.liu@linux.intel.com>
commit 39449e7429
parent 7e40200c56
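For context, the reservation scheme being removed worked roughly as follows; this is a condensed sketch of the old reserve_avail_buf() (the helper name and compaction are mine, not the patch code verbatim):

/* Old scheme: every enqueuing core claimed a private
 * [start, start + n) window of the used ring by compare-and-set on
 * last_used_idx_res, retrying on contention. */
static inline uint32_t
reserve_window(struct vhost_virtqueue *vq, uint32_t count, uint16_t *start)
{
	uint16_t res_start, avail_idx;
	uint32_t n;

	do {
		res_start = vq->last_used_idx_res;
		avail_idx = *((volatile uint16_t *)&vq->avail->idx);
		/* Ring indexes wrap at 16 bits, hence the uint16_t cast. */
		n = RTE_MIN(count, (uint32_t)(uint16_t)(avail_idx - res_start));
		if (n == 0)
			return 0;
	} while (rte_atomic16_cmpset(&vq->last_used_idx_res,
				     res_start, res_start + n) == 0);

	*start = res_start;
	return n;
}

With a single enqueuer per queue, the window is simply [vq->last_used_idx, vq->last_used_idx + count), and the final vq->last_used_idx += count needs no atomic. That is what the hunks below implement.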
@@ -187,6 +187,9 @@ API Changes
   as the parameter have been changed due to the ABI refactoring mentioned
   below: it's replaced by ``int vid``.
 
+* The function ``rte_vhost_enqueue_burst`` no longer supports concurrent enqueuing
+  packets to the same queue.
+
 
 ABI Changes
 -----------
@@ -72,8 +72,6 @@ struct vhost_virtqueue {
 
 	/* Last index used on the available ring */
 	volatile uint16_t	last_used_idx;
-	/* Used for multiple devices reserving buffers */
-	volatile uint16_t	last_used_idx_res;
 #define VIRTIO_INVALID_EVENTFD		(-1)
 #define VIRTIO_UNINITIALIZED_EVENTFD	(-2)
 
@@ -204,49 +204,6 @@ copy_mbuf_to_desc(struct virtio_net *dev, struct vhost_virtqueue *vq,
 	return 0;
 }
 
-/*
- * As many data cores may want to access available buffers
- * they need to be reserved.
- */
-static inline uint32_t
-reserve_avail_buf(struct vhost_virtqueue *vq, uint32_t count,
-		  uint16_t *start, uint16_t *end)
-{
-	uint16_t res_start_idx;
-	uint16_t res_end_idx;
-	uint16_t avail_idx;
-	uint16_t free_entries;
-	int success;
-
-	count = RTE_MIN(count, (uint32_t)MAX_PKT_BURST);
-
-again:
-	res_start_idx = vq->last_used_idx_res;
-	avail_idx = *((volatile uint16_t *)&vq->avail->idx);
-
-	free_entries = avail_idx - res_start_idx;
-	count = RTE_MIN(count, free_entries);
-	if (count == 0)
-		return 0;
-
-	res_end_idx = res_start_idx + count;
-
-	/*
-	 * update vq->last_used_idx_res atomically; try again if failed.
-	 *
-	 * TODO: Allow to disable cmpset if no concurrency in application.
-	 */
-	success = rte_atomic16_cmpset(&vq->last_used_idx_res,
-				      res_start_idx, res_end_idx);
-	if (unlikely(!success))
-		goto again;
-
-	*start = res_start_idx;
-	*end = res_end_idx;
-
-	return count;
-}
-
 /**
  * This function adds buffers to the virtio devices RX virtqueue. Buffers can
  * be received from the physical port or from another virtio device. A packet
@@ -259,7 +216,7 @@ virtio_dev_rx(struct virtio_net *dev, uint16_t queue_id,
 	struct rte_mbuf **pkts, uint32_t count)
 {
 	struct vhost_virtqueue *vq;
-	uint16_t res_start_idx, res_end_idx;
+	uint16_t avail_idx, free_entries, start_idx;
 	uint16_t desc_indexes[MAX_PKT_BURST];
 	uint16_t used_idx;
 	uint32_t i;
@@ -275,17 +232,21 @@ virtio_dev_rx(struct virtio_net *dev, uint16_t queue_id,
 	if (unlikely(vq->enabled == 0))
 		return 0;
 
-	count = reserve_avail_buf(vq, count, &res_start_idx, &res_end_idx);
+	avail_idx = *((volatile uint16_t *)&vq->avail->idx);
+	start_idx = vq->last_used_idx;
+	free_entries = avail_idx - start_idx;
+	count = RTE_MIN(count, free_entries);
+	count = RTE_MIN(count, (uint32_t)MAX_PKT_BURST);
 	if (count == 0)
 		return 0;
 
-	LOG_DEBUG(VHOST_DATA, "(%d) res_start_idx %d | res_end_idx Index %d\n",
-		dev->vid, res_start_idx, res_end_idx);
+	LOG_DEBUG(VHOST_DATA, "(%d) start_idx %d | end_idx %d\n",
+		dev->vid, start_idx, start_idx + count);
 
 	/* Retrieve all of the desc indexes first to avoid caching issues. */
-	rte_prefetch0(&vq->avail->ring[res_start_idx & (vq->size - 1)]);
+	rte_prefetch0(&vq->avail->ring[start_idx & (vq->size - 1)]);
 	for (i = 0; i < count; i++) {
-		used_idx = (res_start_idx + i) & (vq->size - 1);
+		used_idx = (start_idx + i) & (vq->size - 1);
 		desc_indexes[i] = vq->avail->ring[used_idx];
 		vq->used->ring[used_idx].id = desc_indexes[i];
 		vq->used->ring[used_idx].len = pkts[i]->pkt_len +
@@ -302,7 +263,7 @@ virtio_dev_rx(struct virtio_net *dev, uint16_t queue_id,
 
 		err = copy_mbuf_to_desc(dev, vq, pkts[i], desc_idx);
 		if (unlikely(err)) {
-			used_idx = (res_start_idx + i) & (vq->size - 1);
+			used_idx = (start_idx + i) & (vq->size - 1);
 			vq->used->ring[used_idx].len = dev->vhost_hlen;
 			vhost_log_used_vring(dev, vq,
 				offsetof(struct vring_used, ring[used_idx]),
@@ -315,12 +276,8 @@ virtio_dev_rx(struct virtio_net *dev, uint16_t queue_id,
 
 	rte_smp_wmb();
 
-	/* Wait until it's our turn to add our buffer to the used ring. */
-	while (unlikely(vq->last_used_idx != res_start_idx))
-		rte_pause();
-
 	*(volatile uint16_t *)&vq->used->idx += count;
-	vq->last_used_idx = res_end_idx;
+	vq->last_used_idx += count;
 	vhost_log_used_vring(dev, vq,
 		offsetof(struct vring_used, idx),
 		sizeof(vq->used->idx));
@@ -367,40 +324,30 @@ fill_vec_buf(struct vhost_virtqueue *vq, uint32_t avail_idx,
 }
 
 /*
- * As many data cores may want to access available buffers concurrently,
- * they need to be reserved.
- *
  * Returns -1 on fail, 0 on success
  */
 static inline int
 reserve_avail_buf_mergeable(struct vhost_virtqueue *vq, uint32_t size,
-			    uint16_t *start, uint16_t *end,
-			    struct buf_vector *buf_vec)
+			    uint16_t *end, struct buf_vector *buf_vec)
 {
-	uint16_t res_start_idx;
-	uint16_t res_cur_idx;
+	uint16_t cur_idx;
 	uint16_t avail_idx;
-	uint32_t allocated;
-	uint32_t vec_idx;
-	uint16_t tries;
+	uint32_t allocated = 0;
+	uint32_t vec_idx = 0;
+	uint16_t tries = 0;
 
-again:
-	res_start_idx = vq->last_used_idx_res;
-	res_cur_idx = res_start_idx;
+	cur_idx = vq->last_used_idx;
 
-	allocated = 0;
-	vec_idx = 0;
-	tries = 0;
 	while (1) {
 		avail_idx = *((volatile uint16_t *)&vq->avail->idx);
-		if (unlikely(res_cur_idx == avail_idx))
+		if (unlikely(cur_idx == avail_idx))
 			return -1;
 
-		if (unlikely(fill_vec_buf(vq, res_cur_idx, &allocated,
+		if (unlikely(fill_vec_buf(vq, cur_idx, &allocated,
 					  &vec_idx, buf_vec) < 0))
 			return -1;
 
-		res_cur_idx++;
+		cur_idx++;
 		tries++;
 
 		if (allocated >= size)
@@ -415,27 +362,19 @@ again:
 		return -1;
 	}
 
-	/*
-	 * update vq->last_used_idx_res atomically.
-	 * retry again if failed.
-	 */
-	if (rte_atomic16_cmpset(&vq->last_used_idx_res,
-				res_start_idx, res_cur_idx) == 0)
-		goto again;
-
-	*start = res_start_idx;
-	*end = res_cur_idx;
+	*end = cur_idx;
 	return 0;
 }
 
 static inline uint32_t __attribute__((always_inline))
 copy_mbuf_to_desc_mergeable(struct virtio_net *dev, struct vhost_virtqueue *vq,
-			    uint16_t res_start_idx, uint16_t res_end_idx,
-			    struct rte_mbuf *m, struct buf_vector *buf_vec)
+			    uint16_t end_idx, struct rte_mbuf *m,
+			    struct buf_vector *buf_vec)
 {
 	struct virtio_net_hdr_mrg_rxbuf virtio_hdr = {{0, 0, 0, 0, 0, 0}, 0};
 	uint32_t vec_idx = 0;
-	uint16_t cur_idx = res_start_idx;
+	uint16_t start_idx = vq->last_used_idx;
+	uint16_t cur_idx = start_idx;
 	uint64_t desc_addr;
 	uint32_t mbuf_offset, mbuf_avail;
 	uint32_t desc_offset, desc_avail;
@@ -446,7 +385,7 @@ copy_mbuf_to_desc_mergeable(struct virtio_net *dev, struct vhost_virtqueue *vq,
 		return 0;
 
 	LOG_DEBUG(VHOST_DATA, "(%d) current index %d | end index %d\n",
-		dev->vid, cur_idx, res_end_idx);
+		dev->vid, cur_idx, end_idx);
 
 	if (buf_vec[vec_idx].buf_len < dev->vhost_hlen)
 		return -1;
@@ -454,7 +393,7 @@ copy_mbuf_to_desc_mergeable(struct virtio_net *dev, struct vhost_virtqueue *vq,
 	desc_addr = gpa_to_vva(dev, buf_vec[vec_idx].buf_addr);
 	rte_prefetch0((void *)(uintptr_t)desc_addr);
 
-	virtio_hdr.num_buffers = res_end_idx - res_start_idx;
+	virtio_hdr.num_buffers = end_idx - start_idx;
 	LOG_DEBUG(VHOST_DATA, "(%d) RX: num merge buffers %d\n",
 		dev->vid, virtio_hdr.num_buffers);
 
@@ -523,7 +462,7 @@ copy_mbuf_to_desc_mergeable(struct virtio_net *dev, struct vhost_virtqueue *vq,
 			offsetof(struct vring_used, ring[used_idx]),
 			sizeof(vq->used->ring[used_idx]));
 
-	return res_end_idx - res_start_idx;
+	return end_idx - start_idx;
 }
 
 static inline uint32_t __attribute__((always_inline))
@@ -532,7 +471,7 @@ virtio_dev_merge_rx(struct virtio_net *dev, uint16_t queue_id,
 {
 	struct vhost_virtqueue *vq;
 	uint32_t pkt_idx = 0, nr_used = 0;
-	uint16_t start, end;
+	uint16_t end;
 	struct buf_vector buf_vec[BUF_VECTOR_MAX];
 
 	LOG_DEBUG(VHOST_DATA, "(%d) %s\n", dev->vid, __func__);
@@ -553,7 +492,7 @@ virtio_dev_merge_rx(struct virtio_net *dev, uint16_t queue_id,
 	for (pkt_idx = 0; pkt_idx < count; pkt_idx++) {
 		uint32_t pkt_len = pkts[pkt_idx]->pkt_len + dev->vhost_hlen;
 
-		if (unlikely(reserve_avail_buf_mergeable(vq, pkt_len, &start,
+		if (unlikely(reserve_avail_buf_mergeable(vq, pkt_len,
 						&end, buf_vec) < 0)) {
 			LOG_DEBUG(VHOST_DATA,
 				"(%d) failed to get enough desc from vring\n",
@@ -561,17 +500,10 @@ virtio_dev_merge_rx(struct virtio_net *dev, uint16_t queue_id,
 			break;
 		}
 
-		nr_used = copy_mbuf_to_desc_mergeable(dev, vq, start, end,
+		nr_used = copy_mbuf_to_desc_mergeable(dev, vq, end,
 							pkts[pkt_idx], buf_vec);
 		rte_smp_wmb();
 
-		/*
-		 * Wait until it's our turn to add our buffer
-		 * to the used ring.
-		 */
-		while (unlikely(vq->last_used_idx != start))
-			rte_pause();
-
 		*(volatile uint16_t *)&vq->used->idx += nr_used;
 		vhost_log_used_vring(dev, vq, offsetof(struct vring_used, idx),
 			sizeof(vq->used->idx));
@@ -567,7 +567,6 @@ vhost_set_vring_addr(int vid, struct vhost_vring_addr *addr)
 			"some packets maybe resent for Tx and dropped for Rx\n",
 			vq->last_used_idx, vq->used->idx);
 		vq->last_used_idx = vq->used->idx;
-		vq->last_used_idx_res = vq->used->idx;
 	}
 
 	vq->log_guest_addr = addr->log_guest_addr;
@@ -599,7 +598,6 @@ vhost_set_vring_base(int vid, struct vhost_vring_state *state)
 
 	/* State->index refers to the queue index. The txq is 1, rxq is 0. */
 	dev->virtqueue[state->index]->last_used_idx = state->num;
-	dev->virtqueue[state->index]->last_used_idx_res = state->num;
 
 	return 0;
 }
@@ -796,7 +794,7 @@ rte_vhost_avail_entries(int vid, uint16_t queue_id)
 	if (!vq->enabled)
 		return 0;
 
-	return *(volatile uint16_t *)&vq->avail->idx - vq->last_used_idx_res;
+	return *(volatile uint16_t *)&vq->avail->idx - vq->last_used_idx;
 }
 
 int
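After this patch, applications that call rte_vhost_enqueue_burst() from several threads must serialize access per queue themselves, typically by binding each queue to one core. A minimal sketch of that pattern, assuming one RX virtqueue per lcore (the queue mapping and the poll_nic() helper are illustrative, not part of the vhost API):

#include <rte_lcore.h>
#include <rte_mbuf.h>
#include <rte_virtio_net.h>

#define BURST 32	/* illustrative burst size */

/* Hypothetical helper that fills pkts with up to max mbufs from a NIC. */
extern uint16_t poll_nic(struct rte_mbuf **pkts, uint16_t max);

static int
vhost_tx_loop(void *arg)
{
	int vid = *(int *)arg;
	/* Assumed mapping: one RX virtqueue per lcore, so no two threads
	 * ever enqueue to the same virtio queue concurrently. */
	uint16_t queue_id = rte_lcore_id() * VIRTIO_QNUM + VIRTIO_RXQ;
	struct rte_mbuf *pkts[BURST];
	uint16_t n;

	for (;;) {
		n = poll_nic(pkts, BURST);
		if (n)
			rte_vhost_enqueue_burst(vid, queue_id, pkts, n);
	}
	return 0;
}

An application that cannot bind queues to cores can instead guard each queue with its own rte_spinlock_t around the enqueue call.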