vhost: remove dequeue zero-copy support

Dequeue zero-copy removal was announced in DPDK v20.08.
This feature brings constraints which make the maintenance
of the Vhost library difficult. Its limitations also make it
difficult for applications to use (Tx vring starvation).

Removing it makes it easier to add new features, and also removes
some code from the hot path, which should bring a performance
improvement for the standard path.

Signed-off-by: Maxime Coquelin <maxime.coquelin@redhat.com>
Reviewed-by: Chenbo Xia <chenbo.xia@intel.com>
Maxime Coquelin 2020-09-28 11:17:12 +02:00, committed by Ferruh Yigit
parent 9ea8d28986
commit cacf8267cc
9 changed files with 35 additions and 522 deletions


@@ -51,50 +51,6 @@ The following is an overview of some key Vhost API functions:
This reconnect option is enabled by default. However, it can be turned off
by setting this flag.
- ``RTE_VHOST_USER_DEQUEUE_ZERO_COPY``
Dequeue zero copy will be enabled when this flag is set. It is disabled by
default.
There are some truths (including limitations) you might want to know while
setting this flag:
* zero copy is not good for small packets (typically for packet size below
512).
* zero copy is really good for VM2VM case. For iperf between two VMs, the
boost could be above 70% (when TSO is enabled).
* For zero copy in VM2NIC case, guest Tx used vring may be starved if the
PMD driver consume the mbuf but not release them timely.
For example, i40e driver has an optimization to maximum NIC pipeline which
postpones returning transmitted mbuf until only tx_free_threshold free
descs left. The virtio TX used ring will be starved if the formula
(num_i40e_tx_desc - num_virtio_tx_desc > tx_free_threshold) is true, since
i40e will not return back mbuf.
A performance tip for tuning zero copy in VM2NIC case is to adjust the
frequency of mbuf free (i.e. adjust tx_free_threshold of i40e driver) to
balance consumer and producer.
* Guest memory should be backended with huge pages to achieve better
performance. Using 1G page size is the best.
When dequeue zero copy is enabled, the guest phys address and host phys
address mapping has to be established. Using non-huge pages means far
more page segments. To make it simple, DPDK vhost does a linear search
of those segments, thus the fewer the segments, the quicker we will get
the mapping. NOTE: we may speed it by using tree searching in future.
* zero copy can not work when using vfio-pci with iommu mode currently, this
is because we don't setup iommu dma mapping for guest memory. If you have
to use vfio-pci driver, please insert vfio-pci kernel module in noiommu
mode.
* The consumer of zero copy mbufs should consume these mbufs as soon as
possible, otherwise it may block the operations in vhost.
- ``RTE_VHOST_USER_IOMMU_SUPPORT``
IOMMU support will be enabled when this flag is set. It is disabled by
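For reference, a minimal sketch of how an application passes these registration flags through the public API; the socket path, flag choice and error handling are illustrative only. Before this change, ORing in RTE_VHOST_USER_DEQUEUE_ZERO_COPY at this point is what turned the feature on. The starvation condition quoted above is plain arithmetic: with, say, 1024 i40e Tx descriptors and a 256-entry virtio Tx ring, 1024 - 256 = 768 exceeds any small tx_free_threshold, so transmitted mbufs are held back and the guest Tx used ring starves.

/*
 * Illustrative only: registering a vhost-user socket with a set of
 * RTE_VHOST_USER_* flags. After this commit the dequeue zero-copy flag
 * no longer exists; its former bit is kept reserved.
 */
#include <stdio.h>
#include <rte_vhost.h>

static int
setup_vhost_socket(const char *path)
{
	uint64_t flags = RTE_VHOST_USER_CLIENT | RTE_VHOST_USER_IOMMU_SUPPORT;

	/* e.g. path = "/tmp/vhost-user.sock" (placeholder) */
	if (rte_vhost_driver_register(path, flags) != 0) {
		fprintf(stderr, "failed to register %s\n", path);
		return -1;
	}
	return rte_vhost_driver_start(path);
}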
@@ -362,16 +318,16 @@ Guest memory requirement
* Memory pre-allocation
For non-zerocopy non-async data path, guest memory pre-allocation is not a
For non-async data path, guest memory pre-allocation is not a
must. This can help save of memory. If users really want the guest memory
to be pre-allocated (e.g., for performance reason), we can add option
``-mem-prealloc`` when starting QEMU. Or, we can lock all memory at vhost
side which will force memory to be allocated when mmap at vhost side;
option --mlockall in ovs-dpdk is an example in hand.
For async and zerocopy data path, we force the VM memory to be
pre-allocated at vhost lib when mapping the guest memory; and also we need
to lock the memory to prevent pages being swapped out to disk.
For async data path, we force the VM memory to be pre-allocated at vhost
lib when mapping the guest memory; and also we need to lock the memory to
prevent pages being swapped out to disk.
* Memory sharing
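As a companion to the memory pre-allocation item above, a minimal sketch of the vhost-side locking approach (in the spirit of the --mlockall option in ovs-dpdk); it assumes the process has CAP_IPC_LOCK or a sufficient RLIMIT_MEMLOCK.

#include <stdio.h>
#include <sys/mman.h>

/*
 * Illustrative only: lock all current and future mappings so that guest
 * memory mapped by the vhost library is faulted in and never swapped out.
 */
static int
lock_all_process_memory(void)
{
	if (mlockall(MCL_CURRENT | MCL_FUTURE) != 0) {
		perror("mlockall");
		return -1;
	}
	return 0;
}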


@@ -171,12 +171,6 @@ Deprecation Notices
following the IPv6 header, as proposed in RFC
https://mails.dpdk.org/archives/dev/2020-August/177257.html.
* vhost: Vhost-user dequeue zero-copy support will be removed in 20.11.
The only known user is OVS where the feature is still experimental,
and has not received any update for 2.5 years.
This feature faces reliability issues and is often conflicting with
new features being implemented.
* security: The API ``rte_security_session_create`` takes only single mempool
for session and session private data. So the application need to create
mempool for twice the number of sessions needed and will also lead to


@@ -119,6 +119,8 @@ Removed Items
Also, make sure to start the actual text at the margin.
=======================================================
* vhost: Dequeue zero-copy support has been removed.
API Changes
-----------


@@ -28,7 +28,7 @@ extern "C" {
#define RTE_VHOST_USER_CLIENT (1ULL << 0)
#define RTE_VHOST_USER_NO_RECONNECT (1ULL << 1)
#define RTE_VHOST_USER_DEQUEUE_ZERO_COPY (1ULL << 2)
#define RTE_VHOST_USER_RESERVED_1 (1ULL << 2)
#define RTE_VHOST_USER_IOMMU_SUPPORT (1ULL << 3)
#define RTE_VHOST_USER_POSTCOPY_SUPPORT (1ULL << 4)
/* support mbuf with external buffer attached */


@@ -37,7 +37,6 @@ struct vhost_user_socket {
struct sockaddr_un un;
bool is_server;
bool reconnect;
bool dequeue_zero_copy;
bool iommu_support;
bool use_builtin_virtio_net;
bool extbuf;
@@ -229,9 +228,6 @@ vhost_user_add_connection(int fd, struct vhost_user_socket *vsocket)
vhost_attach_vdpa_device(vid, vsocket->vdpa_dev);
if (vsocket->dequeue_zero_copy)
vhost_enable_dequeue_zero_copy(vid);
if (vsocket->extbuf)
vhost_enable_extbuf(vid);
@@ -878,18 +874,8 @@ rte_vhost_driver_register(const char *path, uint64_t flags)
goto out_free;
}
vsocket->vdpa_dev = NULL;
vsocket->dequeue_zero_copy = flags & RTE_VHOST_USER_DEQUEUE_ZERO_COPY;
vsocket->extbuf = flags & RTE_VHOST_USER_EXTBUF_SUPPORT;
vsocket->linearbuf = flags & RTE_VHOST_USER_LINEARBUF_SUPPORT;
if (vsocket->dequeue_zero_copy &&
(flags & RTE_VHOST_USER_IOMMU_SUPPORT)) {
VHOST_LOG_CONFIG(ERR,
"error: enabling dequeue zero copy and IOMMU features "
"simultaneously is not supported\n");
goto out_mutex;
}
vsocket->async_copy = flags & RTE_VHOST_USER_ASYNC_COPY;
if (vsocket->async_copy &&
@@ -918,39 +904,6 @@ rte_vhost_driver_register(const char *path, uint64_t flags)
vsocket->features = VIRTIO_NET_SUPPORTED_FEATURES;
vsocket->protocol_features = VHOST_USER_PROTOCOL_FEATURES;
/*
* Dequeue zero copy can't assure descriptors returned in order.
* Also, it requires that the guest memory is populated, which is
* not compatible with postcopy.
*/
if (vsocket->dequeue_zero_copy) {
if (vsocket->extbuf) {
VHOST_LOG_CONFIG(ERR,
"error: zero copy is incompatible with external buffers\n");
ret = -1;
goto out_mutex;
}
if (vsocket->linearbuf) {
VHOST_LOG_CONFIG(ERR,
"error: zero copy is incompatible with linear buffers\n");
ret = -1;
goto out_mutex;
}
if ((flags & RTE_VHOST_USER_CLIENT) != 0) {
VHOST_LOG_CONFIG(ERR,
"error: zero copy is incompatible with vhost client mode\n");
ret = -1;
goto out_mutex;
}
vsocket->supported_features &= ~(1ULL << VIRTIO_F_IN_ORDER);
vsocket->features &= ~(1ULL << VIRTIO_F_IN_ORDER);
VHOST_LOG_CONFIG(INFO,
"Dequeue zero copy requested, disabling postcopy support\n");
vsocket->protocol_features &=
~(1ULL << VHOST_USER_PROTOCOL_F_PAGEFAULT);
}
if (vsocket->async_copy) {
vsocket->supported_features &= ~(1ULL << VHOST_F_LOG_ALL);
vsocket->features &= ~(1ULL << VHOST_F_LOG_ALL);


@@ -539,8 +539,6 @@ init_vring_queue(struct virtio_net *dev, uint32_t vring_idx)
vhost_user_iotlb_init(dev, vring_idx);
/* Backends are set to -1 indicating an inactive device. */
vq->backend = -1;
TAILQ_INIT(&vq->zmbuf_list);
}
static void
@@ -704,18 +702,6 @@ vhost_set_ifname(int vid, const char *if_name, unsigned int if_len)
dev->ifname[sizeof(dev->ifname) - 1] = '\0';
}
void
vhost_enable_dequeue_zero_copy(int vid)
{
struct virtio_net *dev = get_device(vid);
if (dev == NULL)
return;
dev->dequeue_zero_copy = 1;
VHOST_LOG_CONFIG(INFO, "dequeue zero copy is enabled\n");
}
void
vhost_set_builtin_virtio_net(int vid, bool enable)
{


@@ -94,20 +94,6 @@ struct buf_vector {
uint32_t desc_idx;
};
/*
* A structure to hold some fields needed in zero copy code path,
* mainly for associating an mbuf with the right desc_idx.
*/
struct zcopy_mbuf {
struct rte_mbuf *mbuf;
uint32_t desc_idx;
uint16_t desc_count;
uint16_t in_use;
TAILQ_ENTRY(zcopy_mbuf) next;
};
TAILQ_HEAD(zcopy_mbuf_list, zcopy_mbuf);
/*
* Structure contains the info for each batched memory copy.
*/
@@ -185,12 +171,6 @@ struct vhost_virtqueue {
struct rte_vhost_resubmit_info *resubmit_inflight;
uint64_t global_counter;
uint16_t nr_zmbuf;
uint16_t zmbuf_size;
uint16_t last_zmbuf_idx;
struct zcopy_mbuf *zmbufs;
struct zcopy_mbuf_list zmbuf_list;
union {
struct vring_used_elem *shadow_used_split;
struct vring_used_elem_packed *shadow_used_packed;
@@ -380,7 +360,6 @@ struct virtio_net {
/* to tell if we need broadcast rarp packet */
int16_t broadcast_rarp;
uint32_t nr_vring;
int dequeue_zero_copy;
int async_copy;
int extbuf;
int linearbuf;
@@ -718,7 +697,6 @@ int alloc_vring_queue(struct virtio_net *dev, uint32_t vring_idx);
void vhost_attach_vdpa_device(int vid, struct rte_vdpa_device *dev);
void vhost_set_ifname(int, const char *if_name, unsigned int if_len);
void vhost_enable_dequeue_zero_copy(int vid);
void vhost_set_builtin_virtio_net(int vid, bool enable);
void vhost_enable_extbuf(int vid);
void vhost_enable_linearbuf(int vid);
@@ -898,10 +876,4 @@ mbuf_is_consumed(struct rte_mbuf *m)
return true;
}
static __rte_always_inline void
put_zmbuf(struct zcopy_mbuf *zmbuf)
{
zmbuf->in_use = 0;
}
#endif /* _VHOST_NET_CDEV_H_ */


@@ -134,47 +134,15 @@ get_blk_size(int fd)
return ret == -1 ? (uint64_t)-1 : (uint64_t)stat.st_blksize;
}
/*
* Reclaim all the outstanding zmbufs for a virtqueue.
*/
static void
drain_zmbuf_list(struct vhost_virtqueue *vq)
{
struct zcopy_mbuf *zmbuf, *next;
for (zmbuf = TAILQ_FIRST(&vq->zmbuf_list);
zmbuf != NULL; zmbuf = next) {
next = TAILQ_NEXT(zmbuf, next);
while (!mbuf_is_consumed(zmbuf->mbuf))
usleep(1000);
TAILQ_REMOVE(&vq->zmbuf_list, zmbuf, next);
restore_mbuf(zmbuf->mbuf);
rte_pktmbuf_free(zmbuf->mbuf);
put_zmbuf(zmbuf);
vq->nr_zmbuf -= 1;
}
}
static void
free_mem_region(struct virtio_net *dev)
{
uint32_t i;
struct rte_vhost_mem_region *reg;
struct vhost_virtqueue *vq;
if (!dev || !dev->mem)
return;
if (dev->dequeue_zero_copy) {
for (i = 0; i < dev->nr_vring; i++) {
vq = dev->virtqueue[i];
if (vq)
drain_zmbuf_list(vq);
}
}
for (i = 0; i < dev->mem->nregions; i++) {
reg = &dev->mem->regions[i];
if (reg->host_user_addr) {
@@ -454,23 +422,6 @@ vhost_user_set_vring_num(struct virtio_net **pdev,
return RTE_VHOST_MSG_RESULT_ERR;
}
if (dev->dequeue_zero_copy) {
vq->nr_zmbuf = 0;
vq->last_zmbuf_idx = 0;
vq->zmbuf_size = vq->size;
if (vq->zmbufs)
rte_free(vq->zmbufs);
vq->zmbufs = rte_zmalloc(NULL, vq->zmbuf_size *
sizeof(struct zcopy_mbuf), 0);
if (vq->zmbufs == NULL) {
VHOST_LOG_CONFIG(WARNING,
"failed to allocate mem for zero copy; "
"zero copy is force disabled\n");
dev->dequeue_zero_copy = 0;
}
TAILQ_INIT(&vq->zmbuf_list);
}
if (vq_is_packed(dev)) {
if (vq->shadow_used_packed)
rte_free(vq->shadow_used_packed);
@@ -524,7 +475,6 @@ numa_realloc(struct virtio_net *dev, int index)
int oldnode, newnode;
struct virtio_net *old_dev;
struct vhost_virtqueue *old_vq, *vq;
struct zcopy_mbuf *new_zmbuf;
struct vring_used_elem *new_shadow_used_split;
struct vring_used_elem_packed *new_shadow_used_packed;
struct batch_copy_elem *new_batch_copy_elems;
@@ -555,16 +505,6 @@ numa_realloc(struct virtio_net *dev, int index)
return dev;
memcpy(vq, old_vq, sizeof(*vq));
TAILQ_INIT(&vq->zmbuf_list);
if (dev->dequeue_zero_copy) {
new_zmbuf = rte_malloc_socket(NULL, vq->zmbuf_size *
sizeof(struct zcopy_mbuf), 0, newnode);
if (new_zmbuf) {
rte_free(vq->zmbufs);
vq->zmbufs = new_zmbuf;
}
}
if (vq_is_packed(dev)) {
new_shadow_used_packed = rte_malloc_socket(NULL,
@@ -1179,8 +1119,7 @@ vhost_user_set_mem_table(struct virtio_net **pdev, struct VhostUserMsg *msg,
goto err_mmap;
}
populate = (dev->dequeue_zero_copy || dev->async_copy) ?
MAP_POPULATE : 0;
populate = dev->async_copy ? MAP_POPULATE : 0;
mmap_addr = mmap(NULL, mmap_size, PROT_READ | PROT_WRITE,
MAP_SHARED | populate, fd, 0);
@@ -1195,7 +1134,7 @@ vhost_user_set_mem_table(struct virtio_net **pdev, struct VhostUserMsg *msg,
reg->host_user_addr = (uint64_t)(uintptr_t)mmap_addr +
mmap_offset;
if (dev->dequeue_zero_copy || dev->async_copy)
if (dev->async_copy)
if (add_guest_pages(dev, reg, alignment) < 0) {
VHOST_LOG_CONFIG(ERR,
"adding guest pages to region %u failed.\n",
@@ -1940,15 +1879,6 @@ vhost_user_set_vring_kick(struct virtio_net **pdev, struct VhostUserMsg *msg,
return RTE_VHOST_MSG_RESULT_OK;
}
static void
free_zmbufs(struct vhost_virtqueue *vq)
{
drain_zmbuf_list(vq);
rte_free(vq->zmbufs);
vq->zmbufs = NULL;
}
/*
* when virtio is stopped, qemu will send us the GET_VRING_BASE message.
*/
@@ -2003,8 +1933,6 @@ vhost_user_get_vring_base(struct virtio_net **pdev,
vq->signalled_used_valid = false;
if (dev->dequeue_zero_copy)
free_zmbufs(vq);
if (vq_is_packed(dev)) {
rte_free(vq->shadow_used_packed);
vq->shadow_used_packed = NULL;
@@ -2058,10 +1986,6 @@ vhost_user_set_vring_enable(struct virtio_net **pdev,
}
}
/* On disable, rings have to be stopped being processed. */
if (!enable && dev->dequeue_zero_copy)
drain_zmbuf_list(dev->virtqueue[index]);
dev->virtqueue[index]->enabled = enable;
return RTE_VHOST_MSG_RESULT_OK;


@@ -1946,7 +1946,7 @@ copy_desc_to_mbuf(struct virtio_net *dev, struct vhost_virtqueue *vq,
struct rte_mbuf *m, struct rte_mempool *mbuf_pool)
{
uint32_t buf_avail, buf_offset;
uint64_t buf_addr, buf_iova, buf_len;
uint64_t buf_addr, buf_len;
uint32_t mbuf_avail, mbuf_offset;
uint32_t cpy_len;
struct rte_mbuf *cur = m, *prev = m;
@@ -1958,7 +1958,6 @@ copy_desc_to_mbuf(struct virtio_net *dev, struct vhost_virtqueue *vq,
int error = 0;
buf_addr = buf_vec[vec_idx].buf_addr;
buf_iova = buf_vec[vec_idx].buf_iova;
buf_len = buf_vec[vec_idx].buf_len;
if (unlikely(buf_len < dev->vhost_hlen && nr_vec <= 1)) {
@@ -1988,14 +1987,12 @@ copy_desc_to_mbuf(struct virtio_net *dev, struct vhost_virtqueue *vq,
buf_offset = dev->vhost_hlen - buf_len;
vec_idx++;
buf_addr = buf_vec[vec_idx].buf_addr;
buf_iova = buf_vec[vec_idx].buf_iova;
buf_len = buf_vec[vec_idx].buf_len;
buf_avail = buf_len - buf_offset;
} else if (buf_len == dev->vhost_hlen) {
if (unlikely(++vec_idx >= nr_vec))
goto out;
buf_addr = buf_vec[vec_idx].buf_addr;
buf_iova = buf_vec[vec_idx].buf_iova;
buf_len = buf_vec[vec_idx].buf_len;
buf_offset = 0;
@@ -2012,48 +2009,23 @@ copy_desc_to_mbuf(struct virtio_net *dev, struct vhost_virtqueue *vq,
mbuf_offset = 0;
mbuf_avail = m->buf_len - RTE_PKTMBUF_HEADROOM;
while (1) {
uint64_t hpa;
cpy_len = RTE_MIN(buf_avail, mbuf_avail);
/*
* A desc buf might across two host physical pages that are
* not continuous. In such case (gpa_to_hpa returns 0), data
* will be copied even though zero copy is enabled.
*/
if (unlikely(dev->dequeue_zero_copy && (hpa = gpa_to_hpa(dev,
buf_iova + buf_offset, cpy_len)))) {
cur->data_len = cpy_len;
cur->data_off = 0;
cur->buf_addr =
(void *)(uintptr_t)(buf_addr + buf_offset);
cur->buf_iova = hpa;
/*
* In zero copy mode, one mbuf can only reference data
* for one or partial of one desc buff.
*/
mbuf_avail = cpy_len;
} else {
if (likely(cpy_len > MAX_BATCH_LEN ||
vq->batch_copy_nb_elems >= vq->size ||
(hdr && cur == m))) {
rte_memcpy(rte_pktmbuf_mtod_offset(cur, void *,
mbuf_offset),
(void *)((uintptr_t)(buf_addr +
buf_offset)),
cpy_len);
} else {
batch_copy[vq->batch_copy_nb_elems].dst =
rte_pktmbuf_mtod_offset(cur, void *,
mbuf_offset);
batch_copy[vq->batch_copy_nb_elems].src =
if (likely(cpy_len > MAX_BATCH_LEN ||
vq->batch_copy_nb_elems >= vq->size ||
(hdr && cur == m))) {
rte_memcpy(rte_pktmbuf_mtod_offset(cur, void *,
mbuf_offset),
(void *)((uintptr_t)(buf_addr +
buf_offset));
batch_copy[vq->batch_copy_nb_elems].len =
cpy_len;
vq->batch_copy_nb_elems++;
}
buf_offset)), cpy_len);
} else {
batch_copy[vq->batch_copy_nb_elems].dst =
rte_pktmbuf_mtod_offset(cur, void *,
mbuf_offset);
batch_copy[vq->batch_copy_nb_elems].src =
(void *)((uintptr_t)(buf_addr + buf_offset));
batch_copy[vq->batch_copy_nb_elems].len = cpy_len;
vq->batch_copy_nb_elems++;
}
mbuf_avail -= cpy_len;
@@ -2067,7 +2039,6 @@ copy_desc_to_mbuf(struct virtio_net *dev, struct vhost_virtqueue *vq,
break;
buf_addr = buf_vec[vec_idx].buf_addr;
buf_iova = buf_vec[vec_idx].buf_iova;
buf_len = buf_vec[vec_idx].buf_len;
buf_offset = 0;
@@ -2089,8 +2060,6 @@ copy_desc_to_mbuf(struct virtio_net *dev, struct vhost_virtqueue *vq,
error = -1;
goto out;
}
if (unlikely(dev->dequeue_zero_copy))
rte_mbuf_refcnt_update(cur, 1);
prev->next = cur;
prev->data_len = mbuf_offset;
@@ -2114,37 +2083,6 @@ out:
return error;
}
static __rte_always_inline struct zcopy_mbuf *
get_zmbuf(struct vhost_virtqueue *vq)
{
uint16_t i;
uint16_t last;
int tries = 0;
/* search [last_zmbuf_idx, zmbuf_size) */
i = vq->last_zmbuf_idx;
last = vq->zmbuf_size;
again:
for (; i < last; i++) {
if (vq->zmbufs[i].in_use == 0) {
vq->last_zmbuf_idx = i + 1;
vq->zmbufs[i].in_use = 1;
return &vq->zmbufs[i];
}
}
tries++;
if (tries == 1) {
/* search [0, last_zmbuf_idx) */
i = 0;
last = vq->last_zmbuf_idx;
goto again;
}
return NULL;
}
static void
virtio_dev_extbuf_free(void *addr __rte_unused, void *opaque)
{
@@ -2244,30 +2182,6 @@ virtio_dev_tx_split(struct virtio_net *dev, struct vhost_virtqueue *vq,
uint16_t dropped = 0;
static bool allocerr_warned;
if (unlikely(dev->dequeue_zero_copy)) {
struct zcopy_mbuf *zmbuf, *next;
for (zmbuf = TAILQ_FIRST(&vq->zmbuf_list);
zmbuf != NULL; zmbuf = next) {
next = TAILQ_NEXT(zmbuf, next);
if (mbuf_is_consumed(zmbuf->mbuf)) {
update_shadow_used_ring_split(vq,
zmbuf->desc_idx, 0);
TAILQ_REMOVE(&vq->zmbuf_list, zmbuf, next);
restore_mbuf(zmbuf->mbuf);
rte_pktmbuf_free(zmbuf->mbuf);
put_zmbuf(zmbuf);
vq->nr_zmbuf -= 1;
}
}
if (likely(vq->shadow_used_idx)) {
flush_shadow_used_ring_split(dev, vq);
vhost_vring_call_split(dev, vq);
}
}
/*
* The ordering between avail index and
* desc reads needs to be enforced.
@@ -2300,8 +2214,7 @@ virtio_dev_tx_split(struct virtio_net *dev, struct vhost_virtqueue *vq,
VHOST_ACCESS_RO) < 0))
break;
if (likely(dev->dequeue_zero_copy == 0))
update_shadow_used_ring_split(vq, head_idx, 0);
update_shadow_used_ring_split(vq, head_idx, 0);
pkts[i] = virtio_dev_pktmbuf_alloc(dev, mbuf_pool, buf_len);
if (unlikely(pkts[i] == NULL)) {
@@ -2335,42 +2248,16 @@ virtio_dev_tx_split(struct virtio_net *dev, struct vhost_virtqueue *vq,
i++;
break;
}
if (unlikely(dev->dequeue_zero_copy)) {
struct zcopy_mbuf *zmbuf;
zmbuf = get_zmbuf(vq);
if (!zmbuf) {
rte_pktmbuf_free(pkts[i]);
dropped += 1;
i++;
break;
}
zmbuf->mbuf = pkts[i];
zmbuf->desc_idx = head_idx;
/*
* Pin lock the mbuf; we will check later to see
* whether the mbuf is freed (when we are the last
* user) or not. If that's the case, we then could
* update the used ring safely.
*/
rte_mbuf_refcnt_update(pkts[i], 1);
vq->nr_zmbuf += 1;
TAILQ_INSERT_TAIL(&vq->zmbuf_list, zmbuf, next);
}
}
vq->last_avail_idx += i;
if (likely(dev->dequeue_zero_copy == 0)) {
do_data_copy_dequeue(vq);
if (unlikely(i < count))
vq->shadow_used_idx = i;
if (likely(vq->shadow_used_idx)) {
flush_shadow_used_ring_split(dev, vq);
vhost_vring_call_split(dev, vq);
}
do_data_copy_dequeue(vq);
if (unlikely(i < count))
vq->shadow_used_idx = i;
if (likely(vq->shadow_used_idx)) {
flush_shadow_used_ring_split(dev, vq);
vhost_vring_call_split(dev, vq);
}
return (i - dropped);
@@ -2570,162 +2457,6 @@ virtio_dev_tx_single_packed(struct virtio_net *dev,
return ret;
}
static __rte_always_inline int
virtio_dev_tx_batch_packed_zmbuf(struct virtio_net *dev,
struct vhost_virtqueue *vq,
struct rte_mempool *mbuf_pool,
struct rte_mbuf **pkts)
{
struct zcopy_mbuf *zmbufs[PACKED_BATCH_SIZE];
uintptr_t desc_addrs[PACKED_BATCH_SIZE];
uint16_t ids[PACKED_BATCH_SIZE];
uint16_t i;
uint16_t avail_idx = vq->last_avail_idx;
if (vhost_reserve_avail_batch_packed(dev, vq, mbuf_pool, pkts,
avail_idx, desc_addrs, ids))
return -1;
vhost_for_each_try_unroll(i, 0, PACKED_BATCH_SIZE)
zmbufs[i] = get_zmbuf(vq);
vhost_for_each_try_unroll(i, 0, PACKED_BATCH_SIZE) {
if (!zmbufs[i])
goto free_pkt;
}
vhost_for_each_try_unroll(i, 0, PACKED_BATCH_SIZE) {
zmbufs[i]->mbuf = pkts[i];
zmbufs[i]->desc_idx = ids[i];
zmbufs[i]->desc_count = 1;
}
vhost_for_each_try_unroll(i, 0, PACKED_BATCH_SIZE)
rte_mbuf_refcnt_update(pkts[i], 1);
vhost_for_each_try_unroll(i, 0, PACKED_BATCH_SIZE)
TAILQ_INSERT_TAIL(&vq->zmbuf_list, zmbufs[i], next);
vq->nr_zmbuf += PACKED_BATCH_SIZE;
vq_inc_last_avail_packed(vq, PACKED_BATCH_SIZE);
return 0;
free_pkt:
vhost_for_each_try_unroll(i, 0, PACKED_BATCH_SIZE)
rte_pktmbuf_free(pkts[i]);
return -1;
}
static __rte_always_inline int
virtio_dev_tx_single_packed_zmbuf(struct virtio_net *dev,
struct vhost_virtqueue *vq,
struct rte_mempool *mbuf_pool,
struct rte_mbuf **pkts)
{
uint16_t buf_id, desc_count;
struct zcopy_mbuf *zmbuf;
if (vhost_dequeue_single_packed(dev, vq, mbuf_pool, pkts, &buf_id,
&desc_count))
return -1;
zmbuf = get_zmbuf(vq);
if (!zmbuf) {
rte_pktmbuf_free(*pkts);
return -1;
}
zmbuf->mbuf = *pkts;
zmbuf->desc_idx = buf_id;
zmbuf->desc_count = desc_count;
rte_mbuf_refcnt_update(*pkts, 1);
vq->nr_zmbuf += 1;
TAILQ_INSERT_TAIL(&vq->zmbuf_list, zmbuf, next);
vq_inc_last_avail_packed(vq, desc_count);
return 0;
}
static __rte_always_inline void
free_zmbuf(struct vhost_virtqueue *vq)
{
struct zcopy_mbuf *next = NULL;
struct zcopy_mbuf *zmbuf;
for (zmbuf = TAILQ_FIRST(&vq->zmbuf_list);
zmbuf != NULL; zmbuf = next) {
next = TAILQ_NEXT(zmbuf, next);
uint16_t last_used_idx = vq->last_used_idx;
if (mbuf_is_consumed(zmbuf->mbuf)) {
uint16_t flags;
flags = vq->desc_packed[last_used_idx].flags;
if (vq->used_wrap_counter) {
flags |= VRING_DESC_F_USED;
flags |= VRING_DESC_F_AVAIL;
} else {
flags &= ~VRING_DESC_F_USED;
flags &= ~VRING_DESC_F_AVAIL;
}
vq->desc_packed[last_used_idx].id = zmbuf->desc_idx;
vq->desc_packed[last_used_idx].len = 0;
rte_smp_wmb();
vq->desc_packed[last_used_idx].flags = flags;
vq_inc_last_used_packed(vq, zmbuf->desc_count);
TAILQ_REMOVE(&vq->zmbuf_list, zmbuf, next);
restore_mbuf(zmbuf->mbuf);
rte_pktmbuf_free(zmbuf->mbuf);
put_zmbuf(zmbuf);
vq->nr_zmbuf -= 1;
}
}
}
static __rte_noinline uint16_t
virtio_dev_tx_packed_zmbuf(struct virtio_net *dev,
struct vhost_virtqueue *__rte_restrict vq,
struct rte_mempool *mbuf_pool,
struct rte_mbuf **__rte_restrict pkts,
uint32_t count)
{
uint32_t pkt_idx = 0;
uint32_t remained = count;
free_zmbuf(vq);
do {
if (remained >= PACKED_BATCH_SIZE) {
if (!virtio_dev_tx_batch_packed_zmbuf(dev, vq,
mbuf_pool, &pkts[pkt_idx])) {
pkt_idx += PACKED_BATCH_SIZE;
remained -= PACKED_BATCH_SIZE;
continue;
}
}
if (virtio_dev_tx_single_packed_zmbuf(dev, vq, mbuf_pool,
&pkts[pkt_idx]))
break;
pkt_idx++;
remained--;
} while (remained);
if (pkt_idx)
vhost_vring_call_packed(dev, vq);
return pkt_idx;
}
static __rte_noinline uint16_t
virtio_dev_tx_packed(struct virtio_net *dev,
struct vhost_virtqueue *__rte_restrict vq,
@@ -2841,14 +2572,9 @@ rte_vhost_dequeue_burst(int vid, uint16_t queue_id,
count -= 1;
}
if (vq_is_packed(dev)) {
if (unlikely(dev->dequeue_zero_copy))
count = virtio_dev_tx_packed_zmbuf(dev, vq, mbuf_pool,
pkts, count);
else
count = virtio_dev_tx_packed(dev, vq, mbuf_pool, pkts,
count);
} else
if (vq_is_packed(dev))
count = virtio_dev_tx_packed(dev, vq, mbuf_pool, pkts, count);
else
count = virtio_dev_tx_split(dev, vq, mbuf_pool, pkts, count);
out: