bdev_virtio: implement multiqueue

Virtqueues now have to be "acquired"
by a logical CPU core before any
traffic can be performed on them.
The acquire mechanism is thread-safe,
preventing two reactors from accessing
the same virtqueue at the same time.

For now, a single virtqueue may be
used by only one io_channel. Support
for shared virtqueues will be
implemented in the future.
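
As a reference, below is a minimal
sketch of the acquire/release flow an
io_channel goes through, mirroring
bdev_virtio_create_cb()/destroy_cb()
in the diff. The channel_attach_queue()
and channel_detach_queue() names and
the error handling are illustrative
only, not part of this patch:

static int
channel_attach_queue(struct virtio_dev *vdev, struct bdev_virtio_io_channel *ch)
{
	/* Queues 0 (controlq) and 1 (eventq) are reserved by virtio-scsi,
	 * so request queues start at index 2. */
	int32_t queue_idx = virtio_dev_find_and_acquire_queue(vdev, 2);

	if (queue_idx < 0) {
		return -1; /* every request queue is already owned by an lcore */
	}

	ch->vdev = vdev;
	ch->vq = vdev->vqs[queue_idx];
	return 0;
}

static void
channel_detach_queue(struct bdev_virtio_io_channel *ch)
{
	/* Must run on the lcore that acquired the queue. */
	virtio_dev_release_queue(ch->vdev, ch->vq->vq_queue_index);
}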

Added a new "Queues" parameter to the
virtio config file for VirtioUser
bdevs. VirtioPci bdevs will use the
maximum number of queues available,
as negotiated during QEMU startup.
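
For example, a VirtioUser section
requesting 16 I/O queues looks like
this (the path is illustrative):

[VirtioUser0]
  Path /tmp/vhost.0
  Queues 16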

Change-Id: I3fd4b9d8c470f26ca9b84838b3c64de6f9e48300
Signed-off-by: Dariusz Stojaczyk <dariuszx.stojaczyk@intel.com>
Reviewed-on: https://review.gerrithub.io/377337
Reviewed-by: Jim Harris <james.r.harris@intel.com>
Tested-by: SPDK Automated Test System <sys_sgsw@intel.com>
Reviewed-by: Daniel Verkamp <daniel.verkamp@intel.com>
Author: Dariusz Stojaczyk, 2017-10-10 20:44:35 +02:00 (committed by Daniel Verkamp)
parent 14db5b6646
commit 8b0a4a3c53
7 changed files with 191 additions and 31 deletions

View File

@@ -70,7 +70,9 @@ struct virtio_scsi_io_ctx {
 struct virtio_scsi_scan_base {
 	struct virtio_dev *vdev;
-	struct spdk_bdev_poller *scan_poller;
+
+	/** Virtqueue used for the scan I/O. */
+	struct virtqueue *vq;
 
 	/* Currently queried target */
 	unsigned target;
@@ -94,7 +96,9 @@ struct virtio_scsi_disk {
 struct bdev_virtio_io_channel {
 	struct virtio_dev *vdev;
-	struct spdk_bdev_poller *poller;
+
+	/** Virtqueue exclusively assigned to this channel. */
+	struct virtqueue *vq;
 };
 
 static void scan_target(struct virtio_scsi_scan_base *base);
@@ -144,6 +148,7 @@ bdev_virtio_rw(struct spdk_io_channel *ch, struct spdk_bdev_io *bdev_io)
 	struct virtio_scsi_disk *disk = SPDK_CONTAINEROF(bdev_io->bdev, struct virtio_scsi_disk, bdev);
 	struct virtio_req *vreq = bdev_virtio_init_io_vreq(ch, bdev_io);
 	struct virtio_scsi_cmd_req *req = vreq->iov_req.iov_base;
+	struct bdev_virtio_io_channel *virtio_channel = spdk_io_channel_get_ctx(ch);
 
 	vreq->iov = bdev_io->u.bdev.iovs;
 	vreq->iovcnt = bdev_io->u.bdev.iovcnt;
@@ -158,15 +163,15 @@ bdev_virtio_rw(struct spdk_io_channel *ch, struct spdk_bdev_io *bdev_io)
 		to_be16(&req->cdb[7], bdev_io->u.bdev.num_blocks);
 	}
 
-	virtio_xmit_pkts(disk->vdev->vqs[2], vreq);
+	virtio_xmit_pkts(virtio_channel->vq, vreq);
 }
 
 static void
 bdev_virtio_unmap(struct spdk_io_channel *ch, struct spdk_bdev_io *bdev_io)
 {
-	struct virtio_scsi_disk *disk = SPDK_CONTAINEROF(bdev_io->bdev, struct virtio_scsi_disk, bdev);
 	struct virtio_req *vreq = bdev_virtio_init_io_vreq(ch, bdev_io);
 	struct virtio_scsi_cmd_req *req = vreq->iov_req.iov_base;
+	struct bdev_virtio_io_channel *virtio_channel = spdk_io_channel_get_ctx(ch);
 	struct spdk_scsi_unmap_bdesc *desc, *first_desc;
 	uint8_t *buf;
 	uint64_t offset_blocks, num_blocks;
@@ -206,7 +211,7 @@ bdev_virtio_unmap(struct spdk_io_channel *ch, struct spdk_bdev_io *bdev_io)
 	to_be16(&buf[2], cmd_len - 8); /* length of block descriptors */
 	memset(&buf[4], 0, 4); /* reserved */
 
-	virtio_xmit_pkts(disk->vdev->vqs[2], vreq);
+	virtio_xmit_pkts(virtio_channel->vq, vreq);
 }
 
 static int _bdev_virtio_submit_request(struct spdk_io_channel *ch, struct spdk_bdev_io *bdev_io)
@@ -331,7 +336,7 @@ bdev_virtio_poll(void *arg)
 	struct virtio_req *req[32];
 	uint16_t i, cnt;
 
-	cnt = virtio_recv_pkts(ch->vdev->vqs[2], req, SPDK_COUNTOF(req));
+	cnt = virtio_recv_pkts(ch->vq, req, SPDK_COUNTOF(req));
 	for (i = 0; i < cnt; ++i) {
 		bdev_virtio_io_cpl(req[i]);
 	}
@@ -340,12 +345,26 @@ bdev_virtio_poll(void *arg)
 static int
 bdev_virtio_create_cb(void *io_device, void *ctx_buf)
 {
-	struct virtio_dev **vdev = io_device;
+	struct virtio_dev **vdev_ptr = io_device;
+	struct virtio_dev *vdev = *vdev_ptr;
 	struct bdev_virtio_io_channel *ch = ctx_buf;
+	struct virtqueue *vq;
+	int32_t queue_idx;
+
+	queue_idx = virtio_dev_find_and_acquire_queue(vdev, 2);
+	if (queue_idx < 0) {
+		SPDK_ERRLOG("Couldn't get an unused queue for the io_channel.\n");
+		return queue_idx;
+	}
+
+	vq = vdev->vqs[queue_idx];
 
-	ch->vdev = *vdev;
-	spdk_bdev_poller_start(&ch->poller, bdev_virtio_poll, ch,
-			       spdk_env_get_current_core(), 0);
+	ch->vdev = vdev;
+	ch->vq = vq;
+
+	spdk_bdev_poller_start(&vq->poller, bdev_virtio_poll, ch,
+			       vq->owner_lcore, 0);
 	return 0;
 }
@@ -353,8 +372,11 @@ static void
 bdev_virtio_destroy_cb(void *io_device, void *ctx_buf)
 {
 	struct bdev_virtio_io_channel *io_channel = ctx_buf;
+	struct virtio_dev *vdev = io_channel->vdev;
+	struct virtqueue *vq = io_channel->vq;
 
-	spdk_bdev_poller_stop(&io_channel->poller);
+	spdk_bdev_poller_stop(&vq->poller);
+	virtio_dev_release_queue(vdev, vq->vq_queue_index);
 }
 
 static void
@@ -368,7 +390,8 @@ scan_target_finish(struct virtio_scsi_scan_base *base)
 		return;
 	}
 
-	spdk_bdev_poller_stop(&base->scan_poller);
+	spdk_bdev_poller_stop(&base->vq->poller);
+	virtio_dev_release_queue(base->vdev, base->vq->vq_queue_index);
 
 	while ((disk = TAILQ_FIRST(&base->found_disks))) {
 		TAILQ_REMOVE(&base->found_disks, disk, link);
@@ -400,7 +423,7 @@ send_read_cap_10(struct virtio_scsi_scan_base *base, uint8_t target_id, struct v
 	iov[0].iov_len = 8;
 	req->cdb[0] = SPDK_SBC_READ_CAPACITY_10;
 
-	virtio_xmit_pkts(base->vdev->vqs[2], vreq);
+	virtio_xmit_pkts(base->vq, vreq);
 }
 
 static void
@@ -418,7 +441,7 @@ send_read_cap_16(struct virtio_scsi_scan_base *base, uint8_t target_id, struct v
 	req->cdb[1] = SPDK_SBC_SAI_READ_CAPACITY_16;
 	to_be32(&req->cdb[10], iov[0].iov_len);
 
-	virtio_xmit_pkts(base->vdev->vqs[2], vreq);
+	virtio_xmit_pkts(base->vq, vreq);
 }
 
 static int
@@ -556,7 +579,7 @@ bdev_scan_poll(void *arg)
 	struct virtio_req *req;
 	uint16_t cnt;
 
-	cnt = virtio_recv_pkts(base->vdev->vqs[2], &req, 1);
+	cnt = virtio_recv_pkts(base->vq, &req, 1);
 	if (cnt > 0) {
 		process_scan_resp(base, req);
 	}
@@ -596,7 +619,7 @@ scan_target(struct virtio_scsi_scan_base *base)
 	cdb->opcode = SPDK_SPC_INQUIRY;
 	cdb->alloc_len[1] = 255;
 
-	virtio_xmit_pkts(base->vdev->vqs[2], vreq);
+	virtio_xmit_pkts(base->vq, vreq);
 }
 
 static int
@@ -606,6 +629,7 @@ bdev_virtio_process_config(void)
 	struct virtio_dev *vdev = NULL;
 	char *path;
 	unsigned vdev_num;
+	int num_queues;
 	bool enable_pci;
 	int rc = 0;
@@ -628,7 +652,12 @@ bdev_virtio_process_config(void)
 			goto out;
 		}
 
-		vdev = virtio_user_dev_init(path, 512);
+		num_queues = spdk_conf_section_get_intval(sp, "Queues");
+		if (num_queues < 1) {
+			num_queues = 1;
+		}
+
+		vdev = virtio_user_dev_init(path, num_queues + 2, 512);
 		if (vdev == NULL) {
 			rc = -1;
 			goto out;
@@ -659,6 +688,7 @@ bdev_virtio_initialize(void)
 {
 	struct virtio_scsi_scan_base *base;
 	struct virtio_dev *vdev = NULL;
+	struct virtqueue *vq;
 	int rc = 0;
 
 	rc = bdev_virtio_process_config();
@@ -692,9 +722,16 @@ bdev_virtio_initialize(void)
 		base->vdev = vdev;
 		TAILQ_INIT(&base->found_disks);
 
-		spdk_bdev_poller_start(&base->scan_poller, bdev_scan_poll, base,
-				       spdk_env_get_current_core(), 0);
+		rc = virtio_dev_acquire_queue(vdev, 2);
+		if (rc != 0) {
+			SPDK_ERRLOG("Couldn't acquire requestq for the target scan.\n");
+			goto out;
+		}
+
+		vq = vdev->vqs[2];
+		base->vq = vq;
+
+		spdk_bdev_poller_start(&vq->poller, bdev_scan_poll, base,
+				       vq->owner_lcore, 0);
 		scan_target(base);
 	}

View File

@@ -165,6 +165,9 @@ virtio_init_queue(struct virtio_dev *dev, uint16_t vtpci_queue_idx)
 	vq->mz = mz;
+	vq->owner_lcore = SPDK_VIRTIO_QUEUE_LCORE_ID_UNUSED;
+	vq->poller = NULL;
+
 	if (vtpci_ops(dev)->setup_queue(dev, vq) < 0) {
 		SPDK_ERRLOG("setup_queue failed\n");
 		return -EINVAL;
@@ -276,13 +279,6 @@ virtio_dev_init(struct virtio_dev *dev, uint64_t req_features)
 	if (virtio_negotiate_features(dev, req_features) < 0)
 		return -1;
 
-	/* FIXME
-	 * Hardcode num_queues to 3 until we add proper
-	 * multi-queue support. This value should be limited
-	 * by number of cores assigned to SPDK.
-	 */
-	dev->max_queues = 3;
-
 	ret = virtio_alloc_queues(dev);
 	if (ret < 0)
 		return ret;
@@ -532,4 +528,78 @@ virtio_xmit_pkts(struct virtqueue *vq, struct virtio_req *req)
 	return 1;
 }
 
+int
+virtio_dev_acquire_queue(struct virtio_dev *vdev, uint16_t index)
+{
+	struct virtqueue *vq = NULL;
+
+	if (index >= vdev->max_queues) {
+		SPDK_ERRLOG("requested vq index %"PRIu16" exceeds max queue count %"PRIu16".\n",
+			    index, vdev->max_queues);
+		return -1;
+	}
+
+	pthread_mutex_lock(&vdev->mutex);
+	vq = vdev->vqs[index];
+	if (vq == NULL || vq->owner_lcore != SPDK_VIRTIO_QUEUE_LCORE_ID_UNUSED) {
+		pthread_mutex_unlock(&vdev->mutex);
+		return -1;
+	}
+
+	assert(vq->poller == NULL);
+	vq->owner_lcore = spdk_env_get_current_core();
+	pthread_mutex_unlock(&vdev->mutex);
+	return 0;
+}
+
+int32_t
+virtio_dev_find_and_acquire_queue(struct virtio_dev *vdev, uint16_t start_index)
+{
+	struct virtqueue *vq = NULL;
+	uint16_t i;
+
+	pthread_mutex_lock(&vdev->mutex);
+	for (i = start_index; i < vdev->max_queues; ++i) {
+		vq = vdev->vqs[i];
+		if (vq != NULL && vq->owner_lcore == SPDK_VIRTIO_QUEUE_LCORE_ID_UNUSED) {
+			break;
+		}
+	}
+
+	if (vq == NULL || i == vdev->max_queues) {
+		SPDK_ERRLOG("no more unused virtio queues with idx >= %"PRIu16".\n", start_index);
+		pthread_mutex_unlock(&vdev->mutex);
+		return -1;
+	}
+
+	assert(vq->poller == NULL);
+	vq->owner_lcore = spdk_env_get_current_core();
+	pthread_mutex_unlock(&vdev->mutex);
+	return i;
+}
+
+void
+virtio_dev_release_queue(struct virtio_dev *vdev, uint16_t index)
+{
+	struct virtqueue *vq = NULL;
+
+	if (index >= vdev->max_queues) {
+		SPDK_ERRLOG("given vq index %"PRIu16" exceeds max queue count %"PRIu16".\n",
+			    index, vdev->max_queues);
+		return;
+	}
+
+	pthread_mutex_lock(&vdev->mutex);
+	vq = vdev->vqs[index];
+	if (vq == NULL) {
+		SPDK_ERRLOG("virtqueue at index %"PRIu16" is not initialized.\n", index);
+		pthread_mutex_unlock(&vdev->mutex);
+		return;
+	}
+
+	assert(vq->poller == NULL);
+	assert(vq->owner_lcore == spdk_env_get_current_core());
+	vq->owner_lcore = SPDK_VIRTIO_QUEUE_LCORE_ID_UNUSED;
+	pthread_mutex_unlock(&vdev->mutex);
+}
+
 SPDK_LOG_REGISTER_TRACE_FLAG("virtio_dev", SPDK_TRACE_VIRTIO_DEV)

View File

@@ -67,6 +67,11 @@
  */
 #define VQ_RING_DESC_CHAIN_END 32768
 
+/* This is a work-around for a fio-plugin bug, where each
+ * fio job thread returns a local lcore id of -1.
+ */
+#define SPDK_VIRTIO_QUEUE_LCORE_ID_UNUSED (UINT32_MAX - 1)
+
 struct virtio_dev {
 	struct virtqueue **vqs;
 	uint16_t started;
@@ -85,6 +90,9 @@ struct virtio_dev {
 	/** Modern/legacy virtio device flag. */
 	uint8_t modern;
 
+	/** Mutex for asynchronous virtqueue-changing operations. */
+	pthread_mutex_t mutex;
+
 	TAILQ_ENTRY(virtio_dev) tailq;
 };
@@ -128,6 +136,12 @@ struct virtqueue {
 	uint16_t vq_queue_index; /**< PCI queue index */
 	uint16_t *notify_addr;
 
+	/** Logical CPU ID that's polling this queue. */
+	uint32_t owner_lcore;
+
+	/** Response poller. */
+	struct spdk_bdev_poller *poller;
+
 	struct vq_desc_extra vq_descx[0];
 };
@@ -212,4 +226,40 @@ virtqueue_kick_prepare(struct virtqueue *vq)
 	return !(vq->vq_ring.used->flags & VRING_USED_F_NO_NOTIFY);
 }
 
+/**
+ * Bind the virtqueue with the given index to the current CPU core.
+ *
+ * This function is thread-safe.
+ *
+ * \param vdev vhost device
+ * \param index virtqueue index
+ * \return 0 on success, -1 if a virtqueue with the given index either
+ * does not exist or has already been acquired.
+ */
+int virtio_dev_acquire_queue(struct virtio_dev *vdev, uint16_t index);
+
+/**
+ * Look for an unused queue and bind it to the current CPU core. This
+ * scans the queues in the range from *start_index* (inclusive) up to
+ * vdev->max_queues (exclusive).
+ *
+ * This function is thread-safe.
+ *
+ * \param vdev vhost device
+ * \param start_index virtqueue index to start the search from
+ * \return index of the acquired queue, or -1 if no unused queue in the
+ * given range could be found
+ */
+int32_t virtio_dev_find_and_acquire_queue(struct virtio_dev *vdev, uint16_t start_index);
+
+/**
+ * Release a previously acquired queue.
+ *
+ * This function must be called from the thread that acquired the queue.
+ *
+ * \param vdev vhost device
+ * \param index index of the virtqueue to release
+ */
+void virtio_dev_release_queue(struct virtio_dev *vdev, uint16_t index);
+
 #endif /* _VIRTIO_DEV_H_ */
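
To illustrate the contract of these helpers, here is a minimal sketch of claiming a fixed queue index, as the target scan in this commit does (the surrounding code and error handling are illustrative only):

	/* Claim request queue 2 for exclusive use by the current lcore. */
	if (virtio_dev_acquire_queue(vdev, 2) != 0) {
		SPDK_ERRLOG("Couldn't acquire requestq for the target scan.\n");
		return -1;
	}

	vq = vdev->vqs[2];

	/* Only the owning lcore may now call virtio_xmit_pkts() and
	 * virtio_recv_pkts() on vq. */

	/* The release must happen on the same lcore that acquired the queue. */
	virtio_dev_release_queue(vdev, vq->vq_queue_index);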

View File

@@ -714,6 +714,7 @@ vtpci_init(struct virtio_dev *vdev, const struct virtio_pci_ops *ops)
 	}
 
 	vdev->id = vdev_num;
+	pthread_mutex_init(&vdev->mutex, NULL);
 	g_virtio_driver.internal[vdev_num].vtpci_ops = ops;
 
 	return 0;

View File

@@ -174,7 +174,7 @@ virtio_user_dev_setup(struct virtio_user_dev *dev)
 }
 
 struct virtio_dev *
-virtio_user_dev_init(char *path, int queue_size)
+virtio_user_dev_init(char *path, uint16_t requested_queues, uint32_t queue_size)
 {
 	struct virtio_dev *vdev;
 	struct virtio_user_dev *dev;
@@ -206,12 +206,13 @@ virtio_user_dev_init(char *path, int queue_size)
 		goto err;
 	}
 
-	if (max_queues >= VIRTIO_MAX_VIRTQUEUES) {
-		SPDK_ERRLOG("invalid get_queue_num value: %"PRIu64"\n", max_queues);
+	if (requested_queues > max_queues) {
+		SPDK_ERRLOG("requested %"PRIu16" queues but only %"PRIu64" available\n",
+			    requested_queues, max_queues);
 		goto err;
 	}
 
-	vdev->max_queues = max_queues;
+	vdev->max_queues = requested_queues;
 
 	if (dev->ops->send_request(dev, VHOST_USER_SET_OWNER, NULL) < 0) {
 		SPDK_ERRLOG("set_owner fails: %s\n", strerror(errno));

View File

@@ -62,7 +62,7 @@ struct virtio_user_dev {
 int virtio_user_start_device(struct virtio_user_dev *dev);
 int virtio_user_stop_device(struct virtio_user_dev *dev);
 
-struct virtio_dev *virtio_user_dev_init(char *path, int queue_size);
+struct virtio_dev *virtio_user_dev_init(char *path, uint16_t requested_queues, uint32_t queue_size);
 void virtio_user_dev_uninit(struct virtio_user_dev *dev);
 
 #endif

View File

@@ -1,5 +1,6 @@
 [VirtioUser0]
   Path /tmp/vhost.0
+  Queues 16
 
 [Rpc]
   Enable Yes