bdev_virtio: implement multiqueue

Virtqueues now have to be "acquired"
by a logical CPU core in order to
perform any traffic on them. The
acquire mechanism is thread-safe to
prevent two reactors accessing the
same virtqueue at the same time.

For now a single virtqueue
may be used by only one io_channel.
Support for shared virtqueues will
be implemented in the future.

Added new param "Queues" to the
virtio config file for VirtioUser
bdevs. VirtioPci will use the
max available queues num -
negotiated during QEMU startup.

Change-Id: I3fd4b9d8c470f26ca9b84838b3c64de6f9e48300
Signed-off-by: Dariusz Stojaczyk <dariuszx.stojaczyk@intel.com>
Reviewed-on: https://review.gerrithub.io/377337
Reviewed-by: Jim Harris <james.r.harris@intel.com>
Tested-by: SPDK Automated Test System <sys_sgsw@intel.com>
Reviewed-by: Daniel Verkamp <daniel.verkamp@intel.com>
This commit is contained in:
Dariusz Stojaczyk 2017-10-10 20:44:35 +02:00 committed by Daniel Verkamp
parent 14db5b6646
commit 8b0a4a3c53
7 changed files with 191 additions and 31 deletions

View File

@ -70,7 +70,9 @@ struct virtio_scsi_io_ctx {
struct virtio_scsi_scan_base { struct virtio_scsi_scan_base {
struct virtio_dev *vdev; struct virtio_dev *vdev;
struct spdk_bdev_poller *scan_poller;
/** Virtqueue used for the scan I/O. */
struct virtqueue *vq;
/* Currently queried target */ /* Currently queried target */
unsigned target; unsigned target;
@ -94,7 +96,9 @@ struct virtio_scsi_disk {
struct bdev_virtio_io_channel { struct bdev_virtio_io_channel {
struct virtio_dev *vdev; struct virtio_dev *vdev;
struct spdk_bdev_poller *poller;
/** Virtqueue exclusively assigned to this channel. */
struct virtqueue *vq;
}; };
static void scan_target(struct virtio_scsi_scan_base *base); static void scan_target(struct virtio_scsi_scan_base *base);
@ -144,6 +148,7 @@ bdev_virtio_rw(struct spdk_io_channel *ch, struct spdk_bdev_io *bdev_io)
struct virtio_scsi_disk *disk = SPDK_CONTAINEROF(bdev_io->bdev, struct virtio_scsi_disk, bdev); struct virtio_scsi_disk *disk = SPDK_CONTAINEROF(bdev_io->bdev, struct virtio_scsi_disk, bdev);
struct virtio_req *vreq = bdev_virtio_init_io_vreq(ch, bdev_io); struct virtio_req *vreq = bdev_virtio_init_io_vreq(ch, bdev_io);
struct virtio_scsi_cmd_req *req = vreq->iov_req.iov_base; struct virtio_scsi_cmd_req *req = vreq->iov_req.iov_base;
struct bdev_virtio_io_channel *virtio_channel = spdk_io_channel_get_ctx(ch);
vreq->iov = bdev_io->u.bdev.iovs; vreq->iov = bdev_io->u.bdev.iovs;
vreq->iovcnt = bdev_io->u.bdev.iovcnt; vreq->iovcnt = bdev_io->u.bdev.iovcnt;
@ -158,15 +163,15 @@ bdev_virtio_rw(struct spdk_io_channel *ch, struct spdk_bdev_io *bdev_io)
to_be16(&req->cdb[7], bdev_io->u.bdev.num_blocks); to_be16(&req->cdb[7], bdev_io->u.bdev.num_blocks);
} }
virtio_xmit_pkts(disk->vdev->vqs[2], vreq); virtio_xmit_pkts(virtio_channel->vq, vreq);
} }
static void static void
bdev_virtio_unmap(struct spdk_io_channel *ch, struct spdk_bdev_io *bdev_io) bdev_virtio_unmap(struct spdk_io_channel *ch, struct spdk_bdev_io *bdev_io)
{ {
struct virtio_scsi_disk *disk = SPDK_CONTAINEROF(bdev_io->bdev, struct virtio_scsi_disk, bdev);
struct virtio_req *vreq = bdev_virtio_init_io_vreq(ch, bdev_io); struct virtio_req *vreq = bdev_virtio_init_io_vreq(ch, bdev_io);
struct virtio_scsi_cmd_req *req = vreq->iov_req.iov_base; struct virtio_scsi_cmd_req *req = vreq->iov_req.iov_base;
struct bdev_virtio_io_channel *virtio_channel = spdk_io_channel_get_ctx(ch);
struct spdk_scsi_unmap_bdesc *desc, *first_desc; struct spdk_scsi_unmap_bdesc *desc, *first_desc;
uint8_t *buf; uint8_t *buf;
uint64_t offset_blocks, num_blocks; uint64_t offset_blocks, num_blocks;
@ -206,7 +211,7 @@ bdev_virtio_unmap(struct spdk_io_channel *ch, struct spdk_bdev_io *bdev_io)
to_be16(&buf[2], cmd_len - 8); /* length of block descriptors */ to_be16(&buf[2], cmd_len - 8); /* length of block descriptors */
memset(&buf[4], 0, 4); /* reserved */ memset(&buf[4], 0, 4); /* reserved */
virtio_xmit_pkts(disk->vdev->vqs[2], vreq); virtio_xmit_pkts(virtio_channel->vq, vreq);
} }
static int _bdev_virtio_submit_request(struct spdk_io_channel *ch, struct spdk_bdev_io *bdev_io) static int _bdev_virtio_submit_request(struct spdk_io_channel *ch, struct spdk_bdev_io *bdev_io)
@ -331,7 +336,7 @@ bdev_virtio_poll(void *arg)
struct virtio_req *req[32]; struct virtio_req *req[32];
uint16_t i, cnt; uint16_t i, cnt;
cnt = virtio_recv_pkts(ch->vdev->vqs[2], req, SPDK_COUNTOF(req)); cnt = virtio_recv_pkts(ch->vq, req, SPDK_COUNTOF(req));
for (i = 0; i < cnt; ++i) { for (i = 0; i < cnt; ++i) {
bdev_virtio_io_cpl(req[i]); bdev_virtio_io_cpl(req[i]);
} }
@ -340,12 +345,26 @@ bdev_virtio_poll(void *arg)
static int static int
bdev_virtio_create_cb(void *io_device, void *ctx_buf) bdev_virtio_create_cb(void *io_device, void *ctx_buf)
{ {
struct virtio_dev **vdev = io_device; struct virtio_dev **vdev_ptr = io_device;
struct virtio_dev *vdev = *vdev_ptr;
struct bdev_virtio_io_channel *ch = ctx_buf; struct bdev_virtio_io_channel *ch = ctx_buf;
struct virtqueue *vq;
int32_t queue_idx;
queue_idx = virtio_dev_find_and_acquire_queue(vdev, 2);
if (queue_idx < 0) {
SPDK_ERRLOG("Couldn't get an unused queue for the io_channel.\n");
return queue_idx;
}
vq = vdev->vqs[queue_idx];
ch->vdev = vdev;
ch->vq = vq;
spdk_bdev_poller_start(&vq->poller, bdev_virtio_poll, ch,
vq->owner_lcore, 0);
ch->vdev = *vdev;
spdk_bdev_poller_start(&ch->poller, bdev_virtio_poll, ch,
spdk_env_get_current_core(), 0);
return 0; return 0;
} }
@ -353,8 +372,11 @@ static void
bdev_virtio_destroy_cb(void *io_device, void *ctx_buf) bdev_virtio_destroy_cb(void *io_device, void *ctx_buf)
{ {
struct bdev_virtio_io_channel *io_channel = ctx_buf; struct bdev_virtio_io_channel *io_channel = ctx_buf;
struct virtio_dev *vdev = io_channel->vdev;
struct virtqueue *vq = io_channel->vq;
spdk_bdev_poller_stop(&io_channel->poller); spdk_bdev_poller_stop(&vq->poller);
virtio_dev_release_queue(vdev, vq->vq_queue_index);
} }
static void static void
@ -368,7 +390,8 @@ scan_target_finish(struct virtio_scsi_scan_base *base)
return; return;
} }
spdk_bdev_poller_stop(&base->scan_poller); spdk_bdev_poller_stop(&base->vq->poller);
virtio_dev_release_queue(base->vdev, base->vq->vq_queue_index);
while ((disk = TAILQ_FIRST(&base->found_disks))) { while ((disk = TAILQ_FIRST(&base->found_disks))) {
TAILQ_REMOVE(&base->found_disks, disk, link); TAILQ_REMOVE(&base->found_disks, disk, link);
@ -400,7 +423,7 @@ send_read_cap_10(struct virtio_scsi_scan_base *base, uint8_t target_id, struct v
iov[0].iov_len = 8; iov[0].iov_len = 8;
req->cdb[0] = SPDK_SBC_READ_CAPACITY_10; req->cdb[0] = SPDK_SBC_READ_CAPACITY_10;
virtio_xmit_pkts(base->vdev->vqs[2], vreq); virtio_xmit_pkts(base->vq, vreq);
} }
static void static void
@ -418,7 +441,7 @@ send_read_cap_16(struct virtio_scsi_scan_base *base, uint8_t target_id, struct v
req->cdb[1] = SPDK_SBC_SAI_READ_CAPACITY_16; req->cdb[1] = SPDK_SBC_SAI_READ_CAPACITY_16;
to_be32(&req->cdb[10], iov[0].iov_len); to_be32(&req->cdb[10], iov[0].iov_len);
virtio_xmit_pkts(base->vdev->vqs[2], vreq); virtio_xmit_pkts(base->vq, vreq);
} }
static int static int
@ -556,7 +579,7 @@ bdev_scan_poll(void *arg)
struct virtio_req *req; struct virtio_req *req;
uint16_t cnt; uint16_t cnt;
cnt = virtio_recv_pkts(base->vdev->vqs[2], &req, 1); cnt = virtio_recv_pkts(base->vq, &req, 1);
if (cnt > 0) { if (cnt > 0) {
process_scan_resp(base, req); process_scan_resp(base, req);
} }
@ -596,7 +619,7 @@ scan_target(struct virtio_scsi_scan_base *base)
cdb->opcode = SPDK_SPC_INQUIRY; cdb->opcode = SPDK_SPC_INQUIRY;
cdb->alloc_len[1] = 255; cdb->alloc_len[1] = 255;
virtio_xmit_pkts(base->vdev->vqs[2], vreq); virtio_xmit_pkts(base->vq, vreq);
} }
static int static int
@ -606,6 +629,7 @@ bdev_virtio_process_config(void)
struct virtio_dev *vdev = NULL; struct virtio_dev *vdev = NULL;
char *path; char *path;
unsigned vdev_num; unsigned vdev_num;
int num_queues;
bool enable_pci; bool enable_pci;
int rc = 0; int rc = 0;
@ -628,7 +652,12 @@ bdev_virtio_process_config(void)
goto out; goto out;
} }
vdev = virtio_user_dev_init(path, 512); num_queues = spdk_conf_section_get_intval(sp, "Queues");
if (num_queues < 1) {
num_queues = 1;
}
vdev = virtio_user_dev_init(path, num_queues + 2, 512);
if (vdev == NULL) { if (vdev == NULL) {
rc = -1; rc = -1;
goto out; goto out;
@ -659,6 +688,7 @@ bdev_virtio_initialize(void)
{ {
struct virtio_scsi_scan_base *base; struct virtio_scsi_scan_base *base;
struct virtio_dev *vdev = NULL; struct virtio_dev *vdev = NULL;
struct virtqueue *vq;
int rc = 0; int rc = 0;
rc = bdev_virtio_process_config(); rc = bdev_virtio_process_config();
@ -692,9 +722,16 @@ bdev_virtio_initialize(void)
base->vdev = vdev; base->vdev = vdev;
TAILQ_INIT(&base->found_disks); TAILQ_INIT(&base->found_disks);
spdk_bdev_poller_start(&base->scan_poller, bdev_scan_poll, base, rc = virtio_dev_acquire_queue(vdev, 2);
spdk_env_get_current_core(), 0); if (rc != 0) {
SPDK_ERRLOG("Couldn't acquire requestq for the target scan.\n");
goto out;
}
vq = vdev->vqs[2];
base->vq = vq;
spdk_bdev_poller_start(&vq->poller, bdev_scan_poll, base,
vq->owner_lcore, 0);
scan_target(base); scan_target(base);
} }

View File

@ -165,6 +165,9 @@ virtio_init_queue(struct virtio_dev *dev, uint16_t vtpci_queue_idx)
vq->mz = mz; vq->mz = mz;
vq->owner_lcore = SPDK_VIRTIO_QUEUE_LCORE_ID_UNUSED;
vq->poller = NULL;
if (vtpci_ops(dev)->setup_queue(dev, vq) < 0) { if (vtpci_ops(dev)->setup_queue(dev, vq) < 0) {
SPDK_ERRLOG("setup_queue failed\n"); SPDK_ERRLOG("setup_queue failed\n");
return -EINVAL; return -EINVAL;
@ -276,13 +279,6 @@ virtio_dev_init(struct virtio_dev *dev, uint64_t req_features)
if (virtio_negotiate_features(dev, req_features) < 0) if (virtio_negotiate_features(dev, req_features) < 0)
return -1; return -1;
/* FIXME
* Hardcode num_queues to 3 until we add proper
* mutli-queue support. This value should be limited
* by number of cores assigned to SPDK
*/
dev->max_queues = 3;
ret = virtio_alloc_queues(dev); ret = virtio_alloc_queues(dev);
if (ret < 0) if (ret < 0)
return ret; return ret;
@ -532,4 +528,78 @@ virtio_xmit_pkts(struct virtqueue *vq, struct virtio_req *req)
return 1; return 1;
} }
/* Claim the virtqueue at the given index for the calling CPU core.
 * Thread-safe: the device mutex serializes competing reactors.
 * Returns 0 on success, -1 if the index is out of range, the queue
 * is not initialized, or another core already owns it.
 */
int
virtio_dev_acquire_queue(struct virtio_dev *vdev, uint16_t index)
{
	struct virtqueue *queue;
	int rc = -1;

	if (index >= vdev->max_queues) {
		SPDK_ERRLOG("requested vq index %"PRIu16" exceeds max queue count %"PRIu16".\n",
			    index, vdev->max_queues);
		return -1;
	}

	pthread_mutex_lock(&vdev->mutex);
	queue = vdev->vqs[index];
	if (queue != NULL && queue->owner_lcore == SPDK_VIRTIO_QUEUE_LCORE_ID_UNUSED) {
		/* an unowned queue must not have a poller registered yet */
		assert(queue->poller == NULL);
		queue->owner_lcore = spdk_env_get_current_core();
		rc = 0;
	}
	pthread_mutex_unlock(&vdev->mutex);

	return rc;
}
/* Scan queues from start_index (inclusive) up to vdev->max_queues
 * (exclusive) and bind the first unused one to the calling CPU core.
 * Thread-safe: the device mutex serializes competing reactors.
 * Returns the acquired queue index, or -1 if no unused queue exists
 * in the given range.
 */
int32_t
virtio_dev_find_and_acquire_queue(struct virtio_dev *vdev, uint16_t start_index)
{
	struct virtqueue *candidate;
	uint16_t idx;
	int32_t rc = -1;

	pthread_mutex_lock(&vdev->mutex);
	for (idx = start_index; idx < vdev->max_queues; ++idx) {
		candidate = vdev->vqs[idx];
		if (candidate == NULL ||
		    candidate->owner_lcore != SPDK_VIRTIO_QUEUE_LCORE_ID_UNUSED) {
			continue;
		}

		/* an unowned queue must not have a poller registered yet */
		assert(candidate->poller == NULL);
		candidate->owner_lcore = spdk_env_get_current_core();
		rc = idx;
		break;
	}
	pthread_mutex_unlock(&vdev->mutex);

	if (rc < 0) {
		SPDK_ERRLOG("no more unused virtio queues with idx >= %"PRIu16".\n", start_index);
	}

	return rc;
}
/* Release a previously acquired virtqueue.
 * Must be called from the core that acquired the queue (asserted below)
 * and only after its poller has been stopped.
 */
void
virtio_dev_release_queue(struct virtio_dev *vdev, uint16_t index)
{
	struct virtqueue *vq = NULL;

	if (index >= vdev->max_queues) {
		SPDK_ERRLOG("given vq index %"PRIu16" exceeds max queue count %"PRIu16".\n",
			    index, vdev->max_queues);
		return;
	}

	pthread_mutex_lock(&vdev->mutex);
	vq = vdev->vqs[index];
	if (vq == NULL) {
		SPDK_ERRLOG("virtqueue at index %"PRIu16" is not initialized.\n", index);
		/* BUGFIX: must drop the mutex on this error path; the original
		 * returned while still holding it, deadlocking the next
		 * acquire/release on this device.
		 */
		pthread_mutex_unlock(&vdev->mutex);
		return;
	}

	/* the caller must have stopped the queue's poller first */
	assert(vq->poller == NULL);
	/* only the owning core may release the queue */
	assert(vq->owner_lcore == spdk_env_get_current_core());
	vq->owner_lcore = SPDK_VIRTIO_QUEUE_LCORE_ID_UNUSED;
	pthread_mutex_unlock(&vdev->mutex);
}
SPDK_LOG_REGISTER_TRACE_FLAG("virtio_dev", SPDK_TRACE_VIRTIO_DEV) SPDK_LOG_REGISTER_TRACE_FLAG("virtio_dev", SPDK_TRACE_VIRTIO_DEV)

View File

@ -67,6 +67,11 @@
*/ */
#define VQ_RING_DESC_CHAIN_END 32768 #define VQ_RING_DESC_CHAIN_END 32768
/* This is a work-around for fio-plugin bug, where each
* fio job thread returns local lcore id = -1
*/
#define SPDK_VIRTIO_QUEUE_LCORE_ID_UNUSED (UINT32_MAX - 1)
struct virtio_dev { struct virtio_dev {
struct virtqueue **vqs; struct virtqueue **vqs;
uint16_t started; uint16_t started;
@ -85,6 +90,9 @@ struct virtio_dev {
/** Modern/legacy virtio device flag. */ /** Modern/legacy virtio device flag. */
uint8_t modern; uint8_t modern;
/** Mutex for asynchronous virtqueue-changing operations. */
pthread_mutex_t mutex;
TAILQ_ENTRY(virtio_dev) tailq; TAILQ_ENTRY(virtio_dev) tailq;
}; };
@ -128,6 +136,12 @@ struct virtqueue {
uint16_t vq_queue_index; /**< PCI queue index */ uint16_t vq_queue_index; /**< PCI queue index */
uint16_t *notify_addr; uint16_t *notify_addr;
/** Logical CPU ID that's polling this queue. */
uint32_t owner_lcore;
/** Response poller. */
struct spdk_bdev_poller *poller;
struct vq_desc_extra vq_descx[0]; struct vq_desc_extra vq_descx[0];
}; };
@ -212,4 +226,40 @@ virtqueue_kick_prepare(struct virtqueue *vq)
return !(vq->vq_ring.used->flags & VRING_USED_F_NO_NOTIFY); return !(vq->vq_ring.used->flags & VRING_USED_F_NO_NOTIFY);
} }
/**
* Bind a virtqueue with given index to the current CPU core.
*
* This function is thread-safe.
*
* \param vdev vhost device
* \param index virtqueue index
* \return 0 on success, -1 in case a virtqueue with given index either
* does not exist or is already acquired.
*/
int virtio_dev_acquire_queue(struct virtio_dev *vdev, uint16_t index);
/**
* Look for unused queue and bind it to the current CPU core. This will
* scan the queues in range from *start_index* (inclusive) up to
* vdev->max_queues (exclusive).
*
* This function is thread-safe.
*
* \param vdev vhost device
* \param start_index virtqueue index to start looking from
* \return index of acquired queue or -1 in case no unused queue in given range
* has been found
*/
int32_t virtio_dev_find_and_acquire_queue(struct virtio_dev *vdev, uint16_t start_index);
/**
* Release previously acquired queue.
*
* This function must be called from the thread that acquired the queue.
*
* \param vdev vhost device
* \param index index of virtqueue to release
*/
void virtio_dev_release_queue(struct virtio_dev *vdev, uint16_t index);
#endif /* _VIRTIO_DEV_H_ */ #endif /* _VIRTIO_DEV_H_ */

View File

@ -714,6 +714,7 @@ vtpci_init(struct virtio_dev *vdev, const struct virtio_pci_ops *ops)
} }
vdev->id = vdev_num; vdev->id = vdev_num;
pthread_mutex_init(&vdev->mutex, NULL);
g_virtio_driver.internal[vdev_num].vtpci_ops = ops; g_virtio_driver.internal[vdev_num].vtpci_ops = ops;
return 0; return 0;

View File

@ -174,7 +174,7 @@ virtio_user_dev_setup(struct virtio_user_dev *dev)
} }
struct virtio_dev * struct virtio_dev *
virtio_user_dev_init(char *path, int queue_size) virtio_user_dev_init(char *path, uint16_t requested_queues, uint32_t queue_size)
{ {
struct virtio_dev *vdev; struct virtio_dev *vdev;
struct virtio_user_dev *dev; struct virtio_user_dev *dev;
@ -206,12 +206,13 @@ virtio_user_dev_init(char *path, int queue_size)
goto err; goto err;
} }
if (max_queues >= VIRTIO_MAX_VIRTQUEUES) { if (requested_queues > max_queues) {
SPDK_ERRLOG("invalid get_queue_num value: %"PRIu64"\n", max_queues); SPDK_ERRLOG("requested %"PRIu16" queues but only %"PRIu64" available\n",
requested_queues, max_queues);
goto err; goto err;
} }
vdev->max_queues = max_queues; vdev->max_queues = requested_queues;
if (dev->ops->send_request(dev, VHOST_USER_SET_OWNER, NULL) < 0) { if (dev->ops->send_request(dev, VHOST_USER_SET_OWNER, NULL) < 0) {
SPDK_ERRLOG("set_owner fails: %s\n", strerror(errno)); SPDK_ERRLOG("set_owner fails: %s\n", strerror(errno));

View File

@ -62,7 +62,7 @@ struct virtio_user_dev {
int virtio_user_start_device(struct virtio_user_dev *dev); int virtio_user_start_device(struct virtio_user_dev *dev);
int virtio_user_stop_device(struct virtio_user_dev *dev); int virtio_user_stop_device(struct virtio_user_dev *dev);
struct virtio_dev *virtio_user_dev_init(char *path, int queue_size); struct virtio_dev *virtio_user_dev_init(char *path, uint16_t requested_queues, uint32_t queue_size);
void virtio_user_dev_uninit(struct virtio_user_dev *dev); void virtio_user_dev_uninit(struct virtio_user_dev *dev);
#endif #endif

View File

@ -1,5 +1,6 @@
[VirtioUser0] [VirtioUser0]
Path /tmp/vhost.0 Path /tmp/vhost.0
Queues 16
[Rpc] [Rpc]
Enable Yes Enable Yes