vhost: optimize packed ring dequeue

Optimize the vhost device packed ring dequeue path by splitting it into batch
and single functions. Non-chained, direct descriptors are handled by the batch
function; all other descriptors are handled by the single function, as before.

Signed-off-by: Marvin Liu <yong.liu@intel.com>
Reviewed-by: Maxime Coquelin <maxime.coquelin@redhat.com>
Author:    Marvin Liu <yong.liu@intel.com>
Date:      2019-10-25 00:08:31 +08:00
Committer: Ferruh Yigit
Parent:    d1eafb5322
Commit:    31d6c6a5b8
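
For readers skimming the diff, the sketch below (not part of the commit) shows the shape of the dispatch loop this change introduces in virtio_dev_tx_packed(): try a full batch of non-chained, direct descriptors first, and fall back to the single-descriptor path otherwise. The names try_dequeue_batch(), dequeue_single(), BATCH_SIZE and the dummy mbuf are hypothetical stand-ins for virtio_dev_tx_batch_packed(), virtio_dev_tx_single_packed(), PACKED_BATCH_SIZE and real rte_mbuf handling; it is a minimal, compilable illustration, not the driver code.

#include <stdint.h>
#include <stdio.h>

#define BATCH_SIZE 4 /* illustrative stand-in for PACKED_BATCH_SIZE */

static char dummy_mbuf[1]; /* placeholder for a real struct rte_mbuf */

/*
 * Hypothetical stand-ins for virtio_dev_tx_batch_packed() and
 * virtio_dev_tx_single_packed().  Both return 0 on success and
 * non-zero when the corresponding path cannot make progress
 * (e.g. the next descriptors are chained or indirect).
 */
static int
try_dequeue_batch(void **pkts, int batch_ok)
{
	int i;

	if (!batch_ok)
		return -1;
	for (i = 0; i < BATCH_SIZE; i++)
		pkts[i] = dummy_mbuf;
	return 0;
}

static int
dequeue_single(void **pkt)
{
	*pkt = dummy_mbuf;
	return 0;
}

/* Dispatch loop with the same shape as the reworked virtio_dev_tx_packed(). */
static uint16_t
dequeue_packed(void **pkts, uint32_t count, int batch_ok)
{
	uint32_t pkt_idx = 0;
	uint32_t remained = count;

	do {
		/* Fast path: a full batch of non-chained, direct descriptors. */
		if (remained >= BATCH_SIZE &&
		    try_dequeue_batch(&pkts[pkt_idx], batch_ok) == 0) {
			pkt_idx += BATCH_SIZE;
			remained -= BATCH_SIZE;
			continue;
		}

		/* Fallback: handle one (possibly chained) descriptor. */
		if (dequeue_single(&pkts[pkt_idx]) != 0)
			break;

		pkt_idx++;
		remained--;
	} while (remained);

	return (uint16_t)pkt_idx;
}

int
main(void)
{
	void *pkts[32];

	printf("batch path:  %u pkts\n", (unsigned)dequeue_packed(pkts, 32, 1));
	printf("single path: %u pkts\n", (unsigned)dequeue_packed(pkts, 32, 0));
	return 0;
}

Keeping the batch and single helpers __rte_always_inline lets the compiler flatten this loop, which is why the diff below also drops the temporary __rte_unused markers from those helpers.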


@@ -201,68 +201,6 @@ vhost_flush_enqueue_batch_packed(struct virtio_net *dev,
vq_inc_last_used_packed(vq, PACKED_BATCH_SIZE);
}
static __rte_always_inline void
flush_shadow_used_ring_packed(struct virtio_net *dev,
struct vhost_virtqueue *vq)
{
int i;
uint16_t used_idx = vq->last_used_idx;
uint16_t head_idx = vq->last_used_idx;
uint16_t head_flags = 0;
/* Split loop in two to save memory barriers */
for (i = 0; i < vq->shadow_used_idx; i++) {
vq->desc_packed[used_idx].id = vq->shadow_used_packed[i].id;
vq->desc_packed[used_idx].len = vq->shadow_used_packed[i].len;
used_idx += vq->shadow_used_packed[i].count;
if (used_idx >= vq->size)
used_idx -= vq->size;
}
for (i = 0; i < vq->shadow_used_idx; i++) {
uint16_t flags;
if (vq->shadow_used_packed[i].len)
flags = VRING_DESC_F_WRITE;
else
flags = 0;
if (vq->used_wrap_counter) {
flags |= VRING_DESC_F_USED;
flags |= VRING_DESC_F_AVAIL;
} else {
flags &= ~VRING_DESC_F_USED;
flags &= ~VRING_DESC_F_AVAIL;
}
if (i > 0) {
vq->desc_packed[vq->last_used_idx].flags = flags;
vhost_log_cache_used_vring(dev, vq,
vq->last_used_idx *
sizeof(struct vring_packed_desc),
sizeof(struct vring_packed_desc));
} else {
head_idx = vq->last_used_idx;
head_flags = flags;
}
vq_inc_last_used_packed(vq, vq->shadow_used_packed[i].count);
}
__atomic_store_n(&vq->desc_packed[head_idx].flags, head_flags,
__ATOMIC_RELEASE);
vhost_log_cache_used_vring(dev, vq,
head_idx *
sizeof(struct vring_packed_desc),
sizeof(struct vring_packed_desc));
vq->shadow_used_idx = 0;
vhost_log_cache_sync(dev, vq);
}
static __rte_always_inline void
vhost_shadow_dequeue_batch_packed(struct virtio_net *dev,
struct vhost_virtqueue *vq,
@@ -335,17 +273,6 @@ vhost_shadow_dequeue_single_packed(struct vhost_virtqueue *vq,
vq_inc_last_used_packed(vq, count);
}
static __rte_always_inline void
update_shadow_used_ring_packed(struct vhost_virtqueue *vq,
uint16_t desc_idx, uint32_t len, uint16_t count)
{
uint16_t i = vq->shadow_used_idx++;
vq->shadow_used_packed[i].id = desc_idx;
vq->shadow_used_packed[i].len = len;
vq->shadow_used_packed[i].count = count;
}
static inline void
do_data_copy_enqueue(struct virtio_net *dev, struct vhost_virtqueue *vq)
{
@@ -403,7 +330,7 @@ vhost_shadow_enqueue_single_packed(struct virtio_net *dev,
}
}
static __rte_unused void
static __rte_always_inline void
vhost_flush_dequeue_packed(struct virtio_net *dev,
struct vhost_virtqueue *vq)
{
@@ -1893,7 +1820,7 @@ free_buf:
return -1;
}
static __rte_unused int
static __rte_always_inline int
virtio_dev_tx_batch_packed(struct virtio_net *dev,
struct vhost_virtqueue *vq,
struct rte_mempool *mbuf_pool,
@@ -1961,7 +1888,7 @@ vhost_dequeue_single_packed(struct virtio_net *dev,
return 0;
}
static __rte_unused int
static __rte_always_inline int
virtio_dev_tx_single_packed(struct virtio_net *dev,
struct vhost_virtqueue *vq,
struct rte_mempool *mbuf_pool,
@@ -1981,7 +1908,7 @@ virtio_dev_tx_single_packed(struct virtio_net *dev,
return 0;
}
static __rte_unused int
static __rte_always_inline int
virtio_dev_tx_batch_packed_zmbuf(struct virtio_net *dev,
struct vhost_virtqueue *vq,
struct rte_mempool *mbuf_pool,
@@ -2030,7 +1957,7 @@ free_pkt:
return -1;
}
static __rte_unused int
static __rte_always_inline int
virtio_dev_tx_single_packed_zmbuf(struct virtio_net *dev,
struct vhost_virtqueue *vq,
struct rte_mempool *mbuf_pool,
@@ -2061,7 +1988,7 @@ virtio_dev_tx_single_packed_zmbuf(struct virtio_net *dev,
return 0;
}
static __rte_unused void
static __rte_always_inline void
free_zmbuf(struct vhost_virtqueue *vq)
{
struct zcopy_mbuf *next = NULL;
@@ -2102,111 +2029,77 @@ free_zmbuf(struct vhost_virtqueue *vq)
}
static __rte_noinline uint16_t
virtio_dev_tx_packed(struct virtio_net *dev, struct vhost_virtqueue *vq,
struct rte_mempool *mbuf_pool, struct rte_mbuf **pkts, uint16_t count)
virtio_dev_tx_packed_zmbuf(struct virtio_net *dev,
struct vhost_virtqueue *vq,
struct rte_mempool *mbuf_pool,
struct rte_mbuf **pkts,
uint32_t count)
{
uint16_t i;
uint32_t pkt_idx = 0;
uint32_t remained = count;
if (unlikely(dev->dequeue_zero_copy)) {
struct zcopy_mbuf *zmbuf, *next;
free_zmbuf(vq);
for (zmbuf = TAILQ_FIRST(&vq->zmbuf_list);
zmbuf != NULL; zmbuf = next) {
next = TAILQ_NEXT(zmbuf, next);
if (mbuf_is_consumed(zmbuf->mbuf)) {
update_shadow_used_ring_packed(vq,
zmbuf->desc_idx,
0,
zmbuf->desc_count);
TAILQ_REMOVE(&vq->zmbuf_list, zmbuf, next);
restore_mbuf(zmbuf->mbuf);
rte_pktmbuf_free(zmbuf->mbuf);
put_zmbuf(zmbuf);
vq->nr_zmbuf -= 1;
do {
if (remained >= PACKED_BATCH_SIZE) {
if (!virtio_dev_tx_batch_packed_zmbuf(dev, vq,
mbuf_pool, &pkts[pkt_idx])) {
pkt_idx += PACKED_BATCH_SIZE;
remained -= PACKED_BATCH_SIZE;
continue;
}
}
if (likely(vq->shadow_used_idx)) {
flush_shadow_used_ring_packed(dev, vq);
vhost_vring_call_packed(dev, vq);
}
}
VHOST_LOG_DEBUG(VHOST_DATA, "(%d) %s\n", dev->vid, __func__);
count = RTE_MIN(count, MAX_PKT_BURST);
VHOST_LOG_DEBUG(VHOST_DATA, "(%d) about to dequeue %u buffers\n",
dev->vid, count);
for (i = 0; i < count; i++) {
struct buf_vector buf_vec[BUF_VECTOR_MAX];
uint16_t buf_id;
uint32_t buf_len;
uint16_t desc_count, nr_vec = 0;
int err;
if (unlikely(fill_vec_buf_packed(dev, vq,
vq->last_avail_idx, &desc_count,
buf_vec, &nr_vec,
&buf_id, &buf_len,
VHOST_ACCESS_RO) < 0))
if (virtio_dev_tx_single_packed_zmbuf(dev, vq, mbuf_pool,
&pkts[pkt_idx]))
break;
pkt_idx++;
remained--;
if (likely(dev->dequeue_zero_copy == 0))
update_shadow_used_ring_packed(vq, buf_id, 0,
desc_count);
} while (remained);
pkts[i] = virtio_dev_pktmbuf_alloc(dev, mbuf_pool, buf_len);
if (unlikely(pkts[i] == NULL))
break;
if (pkt_idx)
vhost_vring_call_packed(dev, vq);
err = copy_desc_to_mbuf(dev, vq, buf_vec, nr_vec, pkts[i],
mbuf_pool);
if (unlikely(err)) {
rte_pktmbuf_free(pkts[i]);
break;
}
return pkt_idx;
}
if (unlikely(dev->dequeue_zero_copy)) {
struct zcopy_mbuf *zmbuf;
static __rte_noinline uint16_t
virtio_dev_tx_packed(struct virtio_net *dev,
struct vhost_virtqueue *vq,
struct rte_mempool *mbuf_pool,
struct rte_mbuf **pkts,
uint32_t count)
{
uint32_t pkt_idx = 0;
uint32_t remained = count;
zmbuf = get_zmbuf(vq);
if (!zmbuf) {
rte_pktmbuf_free(pkts[i]);
break;
do {
rte_prefetch0(&vq->desc_packed[vq->last_avail_idx]);
if (remained >= PACKED_BATCH_SIZE) {
if (!virtio_dev_tx_batch_packed(dev, vq, mbuf_pool,
&pkts[pkt_idx])) {
vhost_flush_dequeue_packed(dev, vq);
pkt_idx += PACKED_BATCH_SIZE;
remained -= PACKED_BATCH_SIZE;
continue;
}
zmbuf->mbuf = pkts[i];
zmbuf->desc_idx = buf_id;
zmbuf->desc_count = desc_count;
/*
* Pin lock the mbuf; we will check later to see
* whether the mbuf is freed (when we are the last
* user) or not. If that's the case, we then could
* update the used ring safely.
*/
rte_mbuf_refcnt_update(pkts[i], 1);
vq->nr_zmbuf += 1;
TAILQ_INSERT_TAIL(&vq->zmbuf_list, zmbuf, next);
}
vq_inc_last_avail_packed(vq, desc_count);
}
if (virtio_dev_tx_single_packed(dev, vq, mbuf_pool,
&pkts[pkt_idx]))
break;
vhost_flush_dequeue_packed(dev, vq);
pkt_idx++;
remained--;
if (likely(dev->dequeue_zero_copy == 0)) {
} while (remained);
if (vq->shadow_used_idx)
do_data_copy_dequeue(vq);
if (unlikely(i < count))
vq->shadow_used_idx = i;
if (likely(vq->shadow_used_idx)) {
flush_shadow_used_ring_packed(dev, vq);
vhost_vring_call_packed(dev, vq);
}
}
return i;
return pkt_idx;
}
uint16_t
@@ -2282,9 +2175,14 @@ rte_vhost_dequeue_burst(int vid, uint16_t queue_id,
count -= 1;
}
if (vq_is_packed(dev))
count = virtio_dev_tx_packed(dev, vq, mbuf_pool, pkts, count);
else
if (vq_is_packed(dev)) {
if (unlikely(dev->dequeue_zero_copy))
count = virtio_dev_tx_packed_zmbuf(dev, vq, mbuf_pool,
pkts, count);
else
count = virtio_dev_tx_packed(dev, vq, mbuf_pool, pkts,
count);
} else
count = virtio_dev_tx_split(dev, vq, mbuf_pool, pkts, count);
out: