vhost: replaced tasks ring with dynamic array

Each virtqueue will now contain
it's own buffer for I/O tasks.

Some of the task struct fields
are now the same across subsequent
I/Os, so they can be now set
just once - at the vdev start/stop.

This simplifies the code, the
debugging process and also introduces
additional sanity check preventing
vhost from processing two requests
with the same id at the same time.

Change-Id: Idcf388e8bf7c92e5536199c35eb0eb6339c00d84
Signed-off-by: Dariusz Stojaczyk <dariuszx.stojaczyk@intel.com>
Reviewed-on: https://review.gerrithub.io/369114
Tested-by: SPDK Automated Test System <sys_sgsw@intel.com>
Reviewed-by: Pawel Wodkowski <pawelx.wodkowski@intel.com>
Reviewed-by: Jim Harris <james.r.harris@intel.com>
This commit is contained in:
Dariusz Stojaczyk 2017-10-10 22:19:40 +02:00 committed by Daniel Verkamp
parent 3a33129a6d
commit 04b2fee404
4 changed files with 153 additions and 175 deletions

View File

@ -781,6 +781,11 @@ start_device(int vid)
goto out;
}
if (vdev->virtqueue[i].vring.size == 0) {
SPDK_ERRLOG("vhost device %d: Queue %"PRIu16" has size 0.\n", vid, i);
goto out;
}
/* Disable notifications. */
if (rte_vhost_enable_guest_notification(vid, i, 0) != 0) {
SPDK_ERRLOG("vhost device %d: Failed to disable guest notification on queue %"PRIu16"\n", vid, i);

View File

@ -53,6 +53,9 @@ struct spdk_vhost_blk_task {
uint16_t req_idx;
/* If set, the task is currently used for I/O processing. */
bool used;
uint32_t length;
uint16_t iovcnt;
struct iovec iovs[SPDK_VHOST_IOVS_MAX];
@ -64,42 +67,15 @@ struct spdk_vhost_blk_dev {
struct spdk_bdev_desc *bdev_desc;
struct spdk_io_channel *bdev_io_channel;
struct spdk_poller *requestq_poller;
struct spdk_ring *tasks_pool;
bool readonly;
};
static void
spdk_vhost_blk_get_tasks(struct spdk_vhost_blk_dev *bvdev, struct spdk_vhost_blk_task **tasks,
size_t count)
blk_task_finish(struct spdk_vhost_blk_task *task)
{
size_t res_count;
bvdev->vdev.task_cnt += count;
res_count = spdk_ring_dequeue(bvdev->tasks_pool, (void **)tasks, count);
/* Allocated task count in init function is equal queue depth so dequeue must not fail. */
assert(res_count == count);
for (res_count = 0; res_count < count; res_count++) {
SPDK_DEBUGLOG(SPDK_TRACE_VHOST_BLK_TASK, "GET task %p\n", tasks[res_count]);
}
}
static void
spdk_vhost_blk_put_tasks(struct spdk_vhost_blk_dev *bvdev, struct spdk_vhost_blk_task **tasks,
size_t count)
{
size_t res_count;
for (res_count = 0; res_count < count; res_count++) {
SPDK_DEBUGLOG(SPDK_TRACE_VHOST_BLK_TASK, "PUT task %p\n", tasks[res_count]);
}
res_count = spdk_ring_enqueue(bvdev->tasks_pool, (void **)tasks, count);
/* Allocated task count in init function is equal queue depth so enqueue must not fail. */
assert(res_count == count);
bvdev->vdev.task_cnt -= count;
assert(task->bvdev->vdev.task_cnt > 0);
task->bvdev->vdev.task_cnt--;
task->used = false;
}
static void
@ -110,7 +86,7 @@ invalid_blk_request(struct spdk_vhost_blk_task *task, uint8_t status)
}
spdk_vhost_vq_used_ring_enqueue(&task->bvdev->vdev, task->vq, task->req_idx, 0);
spdk_vhost_blk_put_tasks(task->bvdev, &task, 1);
blk_task_finish(task);
SPDK_DEBUGLOG(SPDK_TRACE_VHOST_BLK_DATA, "Invalid request (status=%" PRIu8")\n", status);
}
@ -189,7 +165,7 @@ blk_request_finish(bool success, struct spdk_vhost_blk_task *task)
task->length);
SPDK_DEBUGLOG(SPDK_TRACE_VHOST_BLK, "Finished task (%p) req_idx=%d\n status: %s\n", task,
task->req_idx, success ? "OK" : "FAIL");
spdk_vhost_blk_put_tasks(task->bvdev, &task, 1);
blk_task_finish(task);
}
static void
@ -203,22 +179,15 @@ blk_request_complete_cb(struct spdk_bdev_io *bdev_io, bool success, void *cb_arg
static int
process_blk_request(struct spdk_vhost_blk_task *task, struct spdk_vhost_blk_dev *bvdev,
struct spdk_vhost_virtqueue *vq,
uint16_t req_idx)
struct spdk_vhost_virtqueue *vq)
{
const struct virtio_blk_outhdr *req;
struct iovec *iov;
uint32_t type;
int rc;
assert(task->bvdev == bvdev);
task->req_idx = req_idx;
task->vq = vq;
task->iovcnt = SPDK_COUNTOF(task->iovs);
task->status = NULL;
if (blk_iovs_setup(&bvdev->vdev, vq, req_idx, task->iovs, &task->iovcnt, &task->length)) {
SPDK_DEBUGLOG(SPDK_TRACE_VHOST_BLK, "Invalid request (req_idx = %"PRIu16").\n", req_idx);
if (blk_iovs_setup(&bvdev->vdev, vq, task->req_idx, task->iovs, &task->iovcnt, &task->length)) {
SPDK_DEBUGLOG(SPDK_TRACE_VHOST_BLK, "Invalid request (req_idx = %"PRIu16").\n", task->req_idx);
/* Only READ and WRITE are supported for now. */
invalid_blk_request(task, VIRTIO_BLK_S_UNSUPP);
return -1;
@ -228,7 +197,7 @@ process_blk_request(struct spdk_vhost_blk_task *task, struct spdk_vhost_blk_dev
if (spdk_unlikely(iov->iov_len != sizeof(*req))) {
SPDK_DEBUGLOG(SPDK_TRACE_VHOST_BLK,
"First descriptor size is %zu but expected %zu (req_idx = %"PRIu16").\n",
iov->iov_len, sizeof(*req), req_idx);
iov->iov_len, sizeof(*req), task->req_idx);
invalid_blk_request(task, VIRTIO_BLK_S_UNSUPP);
return -1;
}
@ -239,7 +208,7 @@ process_blk_request(struct spdk_vhost_blk_task *task, struct spdk_vhost_blk_dev
if (spdk_unlikely(iov->iov_len != 1)) {
SPDK_DEBUGLOG(SPDK_TRACE_VHOST_BLK,
"Last descriptor size is %zu but expected %d (req_idx = %"PRIu16").\n",
iov->iov_len, 1, req_idx);
iov->iov_len, 1, task->req_idx);
invalid_blk_request(task, VIRTIO_BLK_S_UNSUPP);
return -1;
}
@ -259,7 +228,7 @@ process_blk_request(struct spdk_vhost_blk_task *task, struct spdk_vhost_blk_dev
case VIRTIO_BLK_T_OUT:
if (spdk_unlikely((task->length & (512 - 1)) != 0)) {
SPDK_ERRLOG("%s - passed IO buffer is not multiple of 512b (req_idx = %"PRIu16").\n",
type ? "WRITE" : "READ", req_idx);
type ? "WRITE" : "READ", task->req_idx);
invalid_blk_request(task, VIRTIO_BLK_S_UNSUPP);
return -1;
}
@ -303,7 +272,7 @@ process_blk_request(struct spdk_vhost_blk_task *task, struct spdk_vhost_blk_dev
static void
process_vq(struct spdk_vhost_blk_dev *bvdev, struct spdk_vhost_virtqueue *vq)
{
struct spdk_vhost_blk_task *tasks[32] = {0};
struct spdk_vhost_blk_task *task;
int rc;
uint16_t reqs[32];
uint16_t reqs_cnt, i;
@ -313,16 +282,37 @@ process_vq(struct spdk_vhost_blk_dev *bvdev, struct spdk_vhost_virtqueue *vq)
return;
}
spdk_vhost_blk_get_tasks(bvdev, tasks, reqs_cnt);
for (i = 0; i < reqs_cnt; i++) {
SPDK_DEBUGLOG(SPDK_TRACE_VHOST_BLK, "====== Starting processing request idx %"PRIu16"======\n",
reqs[i]);
rc = process_blk_request(tasks[i], bvdev, vq, reqs[i]);
if (spdk_unlikely(reqs[i] >= vq->vring.size)) {
SPDK_ERRLOG("%s: request idx '%"PRIu16"' exceeds virtqueue size (%"PRIu16").\n",
bvdev->vdev.name, reqs[i], vq->vring.size);
spdk_vhost_vq_used_ring_enqueue(&bvdev->vdev, vq, reqs[i], 0);
continue;
}
task = &((struct spdk_vhost_blk_task *)vq->tasks)[reqs[i]];
if (spdk_unlikely(task->used)) {
SPDK_ERRLOG("%s: request with idx '%"PRIu16"' is already pending.\n",
bvdev->vdev.name, reqs[i]);
spdk_vhost_vq_used_ring_enqueue(&bvdev->vdev, vq, reqs[i], 0);
continue;
}
bvdev->vdev.task_cnt++;
task->used = true;
task->iovcnt = SPDK_COUNTOF(task->iovs);
task->status = NULL;
rc = process_blk_request(task, bvdev, vq);
if (rc == 0) {
SPDK_DEBUGLOG(SPDK_TRACE_VHOST_BLK, "====== Task %p req_idx %d submitted ======\n", tasks[i],
SPDK_DEBUGLOG(SPDK_TRACE_VHOST_BLK, "====== Task %p req_idx %d submitted ======\n", task,
reqs[i]);
} else {
SPDK_DEBUGLOG(SPDK_TRACE_VHOST_BLK, "====== Task %p req_idx %d failed ======\n", tasks[i], reqs[i]);
SPDK_DEBUGLOG(SPDK_TRACE_VHOST_BLK, "====== Task %p req_idx %d failed ======\n", task, reqs[i]);
}
}
}
@ -418,70 +408,56 @@ bdev_remove_cb(void *remove_ctx)
spdk_vhost_call_external_event(bvdev->vdev.name, _bdev_remove_cb, bvdev);
}
static void
free_task_pool(struct spdk_vhost_blk_dev *bvdev)
{
struct spdk_vhost_task *task;
struct spdk_vhost_virtqueue *vq;
uint16_t i;
if (!bvdev->tasks_pool) {
return;
for (i = 0; i < bvdev->vdev.num_queues; i++) {
vq = &bvdev->vdev.virtqueue[i];
if (vq->tasks == NULL) {
continue;
}
while (spdk_ring_dequeue(bvdev->tasks_pool, (void **)&task, 1) == 1) {
spdk_dma_free(task);
spdk_dma_free(vq->tasks);
vq->tasks = NULL;
}
spdk_ring_free(bvdev->tasks_pool);
bvdev->tasks_pool = NULL;
}
static int
alloc_task_pool(struct spdk_vhost_blk_dev *bvdev)
{
struct spdk_vhost_virtqueue *vq;
struct spdk_vhost_blk_task *task;
uint32_t task_cnt = 0;
uint32_t ring_size, socket_id;
uint32_t task_cnt;
uint16_t i;
int rc;
uint32_t j;
for (i = 0; i < bvdev->vdev.num_queues; i++) {
/*
* FIXME:
* this is too big because we need only size/2 from each queue but for now
* lets leave it as is to be sure we are not mistaken.
*
* Limit the pool size to 1024 * num_queues. This should be enough as QEMU have the
* same hard limit for queue size.
*/
task_cnt += spdk_min(bvdev->vdev.virtqueue[i].vring.size, 1024);
}
ring_size = spdk_align32pow2(task_cnt + 1);
socket_id = spdk_env_get_socket_id(bvdev->vdev.lcore);
bvdev->tasks_pool = spdk_ring_create(SPDK_RING_TYPE_SP_SC, ring_size, socket_id);
if (bvdev->tasks_pool == NULL) {
SPDK_ERRLOG("Controller %s: Failed to init vhost blk task pool\n", bvdev->vdev.name);
vq = &bvdev->vdev.virtqueue[i];
task_cnt = vq->vring.size;
if (task_cnt > SPDK_VHOST_MAX_VQ_SIZE) {
/* sanity check */
SPDK_ERRLOG("Controller %s: virtuque %"PRIu16" is too big. (size = %"PRIu32", max = %"PRIu32")\n",
bvdev->vdev.name, i, task_cnt, SPDK_VHOST_MAX_VQ_SIZE);
free_task_pool(bvdev);
return -1;
}
for (i = 0; i < task_cnt; ++i) {
task = spdk_dma_malloc_socket(sizeof(*task), SPDK_CACHE_LINE_SIZE, NULL, socket_id);
if (task == NULL) {
SPDK_ERRLOG("Controller %s: Failed to allocate task\n", bvdev->vdev.name);
vq->tasks = spdk_dma_zmalloc(sizeof(struct spdk_vhost_blk_task) * task_cnt,
SPDK_CACHE_LINE_SIZE, NULL);
if (vq->tasks == NULL) {
SPDK_ERRLOG("Controller %s: failed to allocate %"PRIu32" tasks for virtqueue %"PRIu16"\n",
bvdev->vdev.name, task_cnt, i);
free_task_pool(bvdev);
return -1;
}
for (j = 0; j < task_cnt; j++) {
task = &((struct spdk_vhost_blk_task *)vq->tasks)[j];
task->bvdev = bvdev;
rc = spdk_ring_enqueue(bvdev->tasks_pool, (void **)&task, 1);
if (rc != 1) {
SPDK_ERRLOG("Controller %s: Failed to enqueue %"PRIu32" vhost blk tasks\n", bvdev->vdev.name,
task_cnt);
free_task_pool(bvdev);
return -1;
task->req_idx = j;
task->vq = vq;
}
}
@ -763,5 +739,4 @@ spdk_vhost_blk_destroy(struct spdk_vhost_dev *vdev)
}
SPDK_LOG_REGISTER_TRACE_FLAG("vhost_blk", SPDK_TRACE_VHOST_BLK)
SPDK_LOG_REGISTER_TRACE_FLAG("vhost_blk_task", SPDK_TRACE_VHOST_BLK_TASK)
SPDK_LOG_REGISTER_TRACE_FLAG("vhost_blk_data", SPDK_TRACE_VHOST_BLK_DATA)

View File

@ -61,6 +61,7 @@
#endif
#define SPDK_VHOST_MAX_VQUEUES 256
#define SPDK_VHOST_MAX_VQ_SIZE 1024
#define SPDK_VHOST_SCSI_CTRLR_MAX_DEVS 8
@ -84,6 +85,7 @@ enum spdk_vhost_dev_type {
struct spdk_vhost_virtqueue {
struct rte_vhost_vring vring;
void *tasks;
} __attribute((aligned(SPDK_CACHE_LINE_SIZE)));
struct spdk_vhost_dev_backend {

View File

@ -77,7 +77,6 @@ struct spdk_vhost_scsi_dev {
struct spdk_scsi_dev *scsi_dev[SPDK_VHOST_SCSI_CTRLR_MAX_DEVS];
struct spdk_scsi_dev_vhost_state scsi_dev_state[SPDK_VHOST_SCSI_CTRLR_MAX_DEVS];
struct spdk_ring *task_pool;
struct spdk_poller *requestq_poller;
struct spdk_poller *mgmt_poller;
} __rte_cache_aligned;
@ -96,6 +95,9 @@ struct spdk_vhost_scsi_task {
int req_idx;
/* If set, the task is currently used for I/O processing. */
bool used;
struct spdk_vhost_virtqueue *vq;
};
@ -125,24 +127,7 @@ spdk_vhost_scsi_task_free_cb(struct spdk_scsi_task *scsi_task)
assert(task->svdev->vdev.task_cnt > 0);
task->svdev->vdev.task_cnt--;
spdk_ring_enqueue(task->svdev->task_pool, (void **) &task, 1);
}
static void
spdk_vhost_get_tasks(struct spdk_vhost_scsi_dev *svdev, struct spdk_vhost_scsi_task **tasks,
size_t count)
{
size_t res_count;
res_count = spdk_ring_dequeue(svdev->task_pool, (void **)tasks, count);
if (res_count != count) {
SPDK_ERRLOG("%s: couldn't get %zu tasks from task_pool\n", svdev->vdev.name, count);
/* FIXME: we should never run out of tasks, but what if we do? */
abort();
}
assert(svdev->vdev.task_cnt <= INT_MAX - (int) res_count);
svdev->vdev.task_cnt += res_count;
task->used = false;
}
static void
@ -546,20 +531,31 @@ process_request(struct spdk_vhost_scsi_task *task)
static void
process_controlq(struct spdk_vhost_scsi_dev *svdev, struct spdk_vhost_virtqueue *vq)
{
struct spdk_vhost_scsi_task *tasks[32];
struct spdk_vhost_scsi_task *task;
uint16_t reqs[32];
uint16_t reqs_cnt, i;
reqs_cnt = spdk_vhost_vq_avail_ring_get(vq, reqs, SPDK_COUNTOF(reqs));
spdk_vhost_get_tasks(svdev, tasks, reqs_cnt);
for (i = 0; i < reqs_cnt; i++) {
task = tasks[i];
memset(task, 0, sizeof(*task));
task->vq = vq;
task->svdev = svdev;
task->req_idx = reqs[i];
if (spdk_unlikely(reqs[i] >= vq->vring.size)) {
SPDK_ERRLOG("%s: invalid entry in avail ring. Buffer '%"PRIu16"' exceeds virtqueue size (%"PRIu16")\n",
svdev->vdev.name, reqs[i], vq->vring.size);
spdk_vhost_vq_used_ring_enqueue(&svdev->vdev, vq, reqs[i], 0);
continue;
}
task = &((struct spdk_vhost_scsi_task *)vq->tasks)[reqs[i]];
if (spdk_unlikely(task->used)) {
SPDK_ERRLOG("%s: invalid entry in avail ring. Buffer '%"PRIu16"' is still in use!\n",
svdev->vdev.name, reqs[i]);
spdk_vhost_vq_used_ring_enqueue(&svdev->vdev, vq, reqs[i], 0);
continue;
}
svdev->vdev.task_cnt++;
memset(&task->scsi, 0, sizeof(task->scsi));
task->tmf_resp = NULL;
task->used = true;
process_ctrl_request(task);
}
}
@ -567,7 +563,6 @@ process_controlq(struct spdk_vhost_scsi_dev *svdev, struct spdk_vhost_virtqueue
static void
process_requestq(struct spdk_vhost_scsi_dev *svdev, struct spdk_vhost_virtqueue *vq)
{
struct spdk_vhost_scsi_task *tasks[32];
struct spdk_vhost_scsi_task *task;
uint16_t reqs[32];
uint16_t reqs_cnt, i;
@ -576,17 +571,29 @@ process_requestq(struct spdk_vhost_scsi_dev *svdev, struct spdk_vhost_virtqueue
reqs_cnt = spdk_vhost_vq_avail_ring_get(vq, reqs, SPDK_COUNTOF(reqs));
assert(reqs_cnt <= 32);
spdk_vhost_get_tasks(svdev, tasks, reqs_cnt);
for (i = 0; i < reqs_cnt; i++) {
SPDK_DEBUGLOG(SPDK_TRACE_VHOST_SCSI, "====== Starting processing request idx %"PRIu16"======\n",
reqs[i]);
task = tasks[i];
memset(task, 0, sizeof(*task));
task->vq = vq;
task->svdev = svdev;
task->req_idx = reqs[i];
if (spdk_unlikely(reqs[i] >= vq->vring.size)) {
SPDK_ERRLOG("%s: request idx '%"PRIu16"' exceeds virtqueue size (%"PRIu16").\n",
svdev->vdev.name, reqs[i], vq->vring.size);
spdk_vhost_vq_used_ring_enqueue(&svdev->vdev, vq, reqs[i], 0);
continue;
}
task = &((struct spdk_vhost_scsi_task *)vq->tasks)[reqs[i]];
if (spdk_unlikely(task->used)) {
SPDK_ERRLOG("%s: request with idx '%"PRIu16"' is already pending.\n",
svdev->vdev.name, reqs[i]);
spdk_vhost_vq_used_ring_enqueue(&svdev->vdev, vq, reqs[i], 0);
continue;
}
svdev->vdev.task_cnt++;
memset(&task->scsi, 0, sizeof(task->scsi));
task->resp = NULL;
task->used = true;
result = process_request(task);
if (likely(result == 0)) {
task_submit(task);
@ -926,64 +933,53 @@ spdk_vhost_scsi_controller_construct(void)
static void
free_task_pool(struct spdk_vhost_scsi_dev *svdev)
{
struct spdk_vhost_task *task;
struct spdk_vhost_virtqueue *vq;
uint16_t i;
if (!svdev->task_pool) {
return;
for (i = 0; i < svdev->vdev.num_queues; i++) {
vq = &svdev->vdev.virtqueue[i];
if (vq->tasks == NULL) {
continue;
}
while (spdk_ring_dequeue(svdev->task_pool, (void **)&task, 1) == 1) {
spdk_dma_free(task);
spdk_dma_free(vq->tasks);
vq->tasks = NULL;
}
spdk_ring_free(svdev->task_pool);
svdev->task_pool = NULL;
}
static int
alloc_task_pool(struct spdk_vhost_scsi_dev *svdev)
{
struct spdk_vhost_virtqueue *vq;
struct spdk_vhost_scsi_task *task;
uint32_t task_cnt = 0;
uint32_t ring_size, socket_id;
uint32_t task_cnt;
uint16_t i;
int rc;
uint32_t j;
for (i = 0; i < svdev->vdev.num_queues; i++) {
/*
* FIXME:
* this is too big because we need only size/2 from each queue but for now
* lets leave it as is to be sure we are not mistaken.
*
* Limit the pool size to 1024 * num_queues. This should be enough as QEMU have the
* same hard limit for queue size.
*/
task_cnt += spdk_min(svdev->vdev.virtqueue[i].vring.size, 1024);
}
ring_size = spdk_align32pow2(task_cnt + 1);
socket_id = spdk_env_get_socket_id(svdev->vdev.lcore);
svdev->task_pool = spdk_ring_create(SPDK_RING_TYPE_SP_SC, ring_size, socket_id);
if (svdev->task_pool == NULL) {
SPDK_ERRLOG("Controller %s: Failed to init vhost scsi task pool\n", svdev->vdev.name);
vq = &svdev->vdev.virtqueue[i];
task_cnt = vq->vring.size;
if (task_cnt > SPDK_VHOST_MAX_VQ_SIZE) {
/* sanity check */
SPDK_ERRLOG("Controller %s: virtuque %"PRIu16" is too big. (size = %"PRIu32", max = %"PRIu32")\n",
svdev->vdev.name, i, task_cnt, SPDK_VHOST_MAX_VQ_SIZE);
free_task_pool(svdev);
return -1;
}
for (i = 0; i < task_cnt; ++i) {
task = spdk_dma_zmalloc_socket(sizeof(*task), SPDK_CACHE_LINE_SIZE, NULL, socket_id);
if (task == NULL) {
SPDK_ERRLOG("Controller %s: Failed to allocate task\n", svdev->vdev.name);
vq->tasks = spdk_dma_zmalloc(sizeof(struct spdk_vhost_scsi_task) * task_cnt,
SPDK_CACHE_LINE_SIZE, NULL);
if (vq->tasks == NULL) {
SPDK_ERRLOG("Controller %s: failed to allocate %"PRIu32" tasks for virtqueue %"PRIu16"\n",
svdev->vdev.name, task_cnt, i);
free_task_pool(svdev);
return -1;
}
rc = spdk_ring_enqueue(svdev->task_pool, (void **)&task, 1);
if (rc != 1) {
SPDK_ERRLOG("Controller %s: Failed to enuqueue %"PRIu32" vhost scsi tasks\n", svdev->vdev.name,
task_cnt);
free_task_pool(svdev);
return -1;
for (j = 0; j < task_cnt; j++) {
task = &((struct spdk_vhost_scsi_task *)vq->tasks)[j];
task->svdev = svdev;
task->vq = vq;
task->req_idx = j;
}
}