nvmf/rdma: Send data and completion at the same time

For an NVMe read, send the completion immediately
following the RDMA WRITE, without waiting for
the WRITE to be acknowledged. RDMA send queues
are strictly ordered, so the WRITE will arrive
before the completion.

Change-Id: I7e4e01d7a02c2130b655ef90f5fdaec992d9361a
Signed-off-by: Ben Walker <benjamin.walker@intel.com>
Ben Walker 2017-03-15 11:19:08 -07:00
parent d0fb728ad2
commit cc294653ca
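
The ordering argument can be made concrete with a small libibverbs sketch. This is illustrative only and is not the code in the diff below (which posts the WRITE and the SEND as two consecutive posts via nvmf_post_rdma_write() and nvmf_post_rdma_send()); every name other than the verbs API is hypothetical. On a reliably connected (RC) queue pair, send-queue work requests execute in order, so the data WRITE and the completion SEND can be issued back to back:

#include <stdint.h>
#include <stddef.h>
#include <infiniband/verbs.h>

static int
post_data_then_completion(struct ibv_qp *qp,
			  struct ibv_sge *data_sge, uint64_t remote_addr, uint32_t rkey,
			  struct ibv_sge *cpl_sge)
{
	struct ibv_send_wr write_wr = {0};
	struct ibv_send_wr send_wr = {0};
	struct ibv_send_wr *bad_wr = NULL;

	/* RDMA WRITE that places the read data directly into host memory. */
	write_wr.wr_id = 1;
	write_wr.opcode = IBV_WR_RDMA_WRITE;
	write_wr.sg_list = data_sge;
	write_wr.num_sge = 1;
	write_wr.send_flags = IBV_SEND_SIGNALED;
	write_wr.wr.rdma.remote_addr = remote_addr;
	write_wr.wr.rdma.rkey = rkey;
	write_wr.next = &send_wr;	/* chain the completion right behind the data */

	/* SEND carrying the NVMe completion capsule. On an RC queue pair it
	 * cannot be delivered before the WRITE above has been placed. */
	send_wr.wr_id = 2;
	send_wr.opcode = IBV_WR_SEND;
	send_wr.sg_list = cpl_sge;
	send_wr.num_sge = 1;
	send_wr.send_flags = IBV_SEND_SIGNALED;

	/* One post submits both; no need to wait for the WRITE to complete. */
	return ibv_post_send(qp, &write_wr, &bad_wr);
}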


@@ -287,7 +287,7 @@ spdk_nvmf_rdma_conn_create(struct rdma_cm_id *id, struct ibv_comp_channel *chann
TAILQ_INIT(&rdma_conn->pending_data_buf_queue);
TAILQ_INIT(&rdma_conn->pending_rdma_rw_queue);
-rdma_conn->cq = ibv_create_cq(id->verbs, max_queue_depth * 2, rdma_conn, channel, 0);
+rdma_conn->cq = ibv_create_cq(id->verbs, max_queue_depth * 3, rdma_conn, channel, 0);
if (!rdma_conn->cq) {
SPDK_ERRLOG("Unable to create completion queue\n");
SPDK_ERRLOG("Completion Channel: %p Id: %p Verbs: %p\n", channel, id, id->verbs);
@@ -301,7 +301,7 @@ spdk_nvmf_rdma_conn_create(struct rdma_cm_id *id, struct ibv_comp_channel *chann
attr.qp_type = IBV_QPT_RC;
attr.send_cq = rdma_conn->cq;
attr.recv_cq = rdma_conn->cq;
-attr.cap.max_send_wr = max_queue_depth; /* SEND, READ, and WRITE operations */
+attr.cap.max_send_wr = max_queue_depth * 2; /* SEND, READ, and WRITE operations */
attr.cap.max_recv_wr = max_queue_depth; /* RECV operations */
attr.cap.max_send_sge = NVMF_DEFAULT_TX_SGE;
attr.cap.max_recv_sge = NVMF_DEFAULT_RX_SGE;
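
The two sizing changes above follow from per-request worst-case accounting. A minimal sketch of that arithmetic, assuming the same max_queue_depth as in the diff (the helper names are hypothetical, not SPDK code):

#include <stdint.h>

/*
 * Worst case per admitted request: the shared CQ sees up to three
 * completions (RECV of the command capsule, one RDMA READ or WRITE for
 * the data, SEND of the response), and a controller-to-host request can
 * now have two send-queue WRs outstanding at once (data WRITE + SEND).
 */
static inline uint32_t
cq_entries_needed(uint32_t max_queue_depth)
{
	return max_queue_depth * 3;	/* RECV + (READ or WRITE) + SEND */
}

static inline uint32_t
send_wrs_needed(uint32_t max_queue_depth)
{
	return max_queue_depth * 2;	/* data WRITE + response SEND */
}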
@@ -539,75 +539,34 @@ nvmf_post_rdma_send(struct spdk_nvmf_request *req)
return rc;
}
-/**
-* REQUEST COMPLETION HANDLING
-*
-* Request completion consists of three steps:
-*
-* 1) Transfer any data to the host using an RDMA Write. If no data or an NVMe write,
-* this step is unnecessary. (spdk_nvmf_rdma_request_transfer_data)
-* 2) Upon transfer completion, update sq_head, re-post the recv capsule,
-* and send the completion. (spdk_nvmf_rdma_request_send_completion)
-* 3) Upon getting acknowledgement of the completion, decrement the internal
-* count of number of outstanding requests.
-*
-* The public interface to initiate the process of completing a request is
-* spdk_nvmf_rdma_request_complete(), which calls a a callback in the transport layer.
-**/
static int
-spdk_nvmf_rdma_request_transfer_data(struct spdk_nvmf_request *req)
+request_transfer_in(struct spdk_nvmf_request *req)
{
int rc;
-struct spdk_nvmf_rdma_request *rdma_req = get_rdma_req(req);
-struct spdk_nvmf_conn *conn = req->conn;
-struct spdk_nvmf_rdma_conn *rdma_conn = get_rdma_conn(conn);
+struct spdk_nvmf_conn *conn = req->conn;
+struct spdk_nvmf_rdma_conn *rdma_conn = get_rdma_conn(conn);
-assert(req->xfer != SPDK_NVME_DATA_NONE);
+assert(req->xfer == SPDK_NVME_DATA_HOST_TO_CONTROLLER);
-if (rdma_conn->cur_rdma_rw_depth < rdma_conn->max_rw_depth) {
-if (req->xfer == SPDK_NVME_DATA_CONTROLLER_TO_HOST) {
-rc = nvmf_post_rdma_write(req);
-if (rc) {
-SPDK_ERRLOG("Unable to transfer data from target to host\n");
-return -1;
-}
-} else if (req->xfer == SPDK_NVME_DATA_HOST_TO_CONTROLLER) {
-rc = nvmf_post_rdma_read(req);
-if (rc) {
-SPDK_ERRLOG("Unable to transfer data from host to target\n");
-return -1;
-}
-}
-rdma_conn->cur_rdma_rw_depth++;
-} else {
-TAILQ_INSERT_TAIL(&rdma_conn->pending_rdma_rw_queue, rdma_req, link);
-}
+rdma_conn->cur_rdma_rw_depth++;
+rc = nvmf_post_rdma_read(req);
+if (rc) {
+SPDK_ERRLOG("Unable to transfer data from host to target\n");
+return -1;
+}
return 0;
}
static int
-spdk_nvmf_rdma_request_send_completion(struct spdk_nvmf_request *req)
+request_transfer_out(struct spdk_nvmf_request *req)
{
-int rc;
-struct spdk_nvmf_conn *conn = req->conn;
-struct spdk_nvmf_rdma_conn *rdma_conn = get_rdma_conn(conn);
+int rc;
+struct spdk_nvmf_rdma_request *rdma_req = get_rdma_req(req);
+struct spdk_nvmf_conn *conn = req->conn;
+struct spdk_nvmf_rdma_conn *rdma_conn = get_rdma_conn(conn);
+struct spdk_nvme_cpl *rsp = &req->rsp->nvme_cpl;
-struct spdk_nvmf_rdma_session *rdma_sess;
-struct spdk_nvmf_rdma_buf *buf;
-if (rdma_req->data_from_pool) {
-/* Put the buffer back in the pool */
-rdma_sess = get_rdma_sess(conn->sess);
-buf = req->data;
-SLIST_INSERT_HEAD(&rdma_sess->data_buf_pool, buf, link);
-req->data = NULL;
-req->length = 0;
-rdma_req->data_from_pool = false;
-}
/* Advance our sq_head pointer */
if (conn->sq_head == conn->sq_head_max) {
@@ -630,6 +589,17 @@ spdk_nvmf_rdma_request_send_completion(struct spdk_nvmf_request *req)
}
rdma_req->recv = NULL;
+if (rsp->status.sc == SPDK_NVME_SC_SUCCESS &&
+req->xfer == SPDK_NVME_DATA_CONTROLLER_TO_HOST) {
+/* Send the write */
+rdma_conn->cur_rdma_rw_depth++;
+rc = nvmf_post_rdma_write(req);
+if (rc) {
+SPDK_ERRLOG("Unable to transfer data from target to host\n");
+return -1;
+}
+}
/* Send the completion */
rc = nvmf_post_rdma_send(req);
if (rc) {
@@ -639,6 +609,31 @@ spdk_nvmf_rdma_request_send_completion(struct spdk_nvmf_request *req)
return rc;
}
+static int
+spdk_nvmf_rdma_request_transfer_data(struct spdk_nvmf_request *req)
+{
+struct spdk_nvmf_rdma_request *rdma_req = get_rdma_req(req);
+struct spdk_nvmf_conn *conn = req->conn;
+struct spdk_nvmf_rdma_conn *rdma_conn = get_rdma_conn(conn);
+if (req->xfer == SPDK_NVME_DATA_NONE) {
+/* If no data transfer, this can bypass the queue */
+return request_transfer_out(req);
+}
+if (rdma_conn->cur_rdma_rw_depth < rdma_conn->max_rw_depth) {
+if (req->xfer == SPDK_NVME_DATA_CONTROLLER_TO_HOST) {
+return request_transfer_out(req);
+} else if (req->xfer == SPDK_NVME_DATA_HOST_TO_CONTROLLER) {
+return request_transfer_in(req);
+}
+} else {
+TAILQ_INSERT_TAIL(&rdma_conn->pending_rdma_rw_queue, rdma_req, link);
+}
+return 0;
+}
static int
nvmf_rdma_connect(struct rdma_cm_event *event)
{
@@ -1405,12 +1400,32 @@ spdk_nvmf_rdma_request_complete(struct spdk_nvmf_request *req)
req->xfer == SPDK_NVME_DATA_CONTROLLER_TO_HOST) {
rc = spdk_nvmf_rdma_request_transfer_data(req);
} else {
-rc = spdk_nvmf_rdma_request_send_completion(req);
+rc = request_transfer_out(req);
}
return rc;
}
+static void
+request_release_buffer(struct spdk_nvmf_request *req)
+{
+struct spdk_nvmf_rdma_request *rdma_req = get_rdma_req(req);
+struct spdk_nvmf_conn *conn = req->conn;
+struct spdk_nvmf_rdma_session *rdma_sess;
+struct spdk_nvmf_rdma_buf *buf;
+if (rdma_req->data_from_pool) {
+/* Put the buffer back in the pool */
+rdma_sess = get_rdma_sess(conn->sess);
+buf = req->data;
+SLIST_INSERT_HEAD(&rdma_sess->data_buf_pool, buf, link);
+req->data = NULL;
+req->length = 0;
+rdma_req->data_from_pool = false;
+}
+}
static void
spdk_nvmf_rdma_close_conn(struct spdk_nvmf_conn *conn)
{
@@ -1557,7 +1572,14 @@ spdk_nvmf_rdma_poll(struct spdk_nvmf_conn *conn)
"RDMA SEND Complete. Request: %p Connection: %p Outstanding I/O: %d\n",
req, conn, rdma_conn->cur_queue_depth - 1);
rdma_conn->cur_queue_depth--;
+/* The request may still own a data buffer. Release it */
+request_release_buffer(req);
+/* Put the request back on the free list */
+TAILQ_INSERT_TAIL(&rdma_conn->free_queue, rdma_req, link);
+/* Try to process queued incoming requests */
rc = process_incoming_queue(rdma_conn);
if (rc < 0) {
error = true;
@@ -1573,14 +1595,13 @@ spdk_nvmf_rdma_poll(struct spdk_nvmf_conn *conn)
SPDK_TRACELOG(SPDK_TRACE_RDMA, "RDMA WRITE Complete. Request: %p Connection: %p\n",
req, conn);
spdk_trace_record(TRACE_RDMA_WRITE_COMPLETE, 0, 0, (uint64_t)req, 0);
-rc = spdk_nvmf_rdma_request_send_completion(req);
-if (rc) {
-error = true;
-continue;
-}
+/* Now that the write has completed, the data buffer can be released */
+request_release_buffer(req);
+rdma_conn->cur_rdma_rw_depth--;
/* Since an RDMA R/W operation completed, try to submit from the pending list. */
-rdma_conn->cur_rdma_rw_depth--;
rc = spdk_nvmf_rdma_handle_pending_rdma_rw(conn);
if (rc < 0) {
error = true;
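
Taken together, the poller hunks change what each send-queue completion means: the SEND completion now marks the end of the request, and the WRITE completion only returns resources, since the NVMe completion was already posted behind it. A rough sketch of that interpretation (illustrative; struct toy_request and the function below are hypothetical, only the opcode handling mirrors the diff):

#include <stdbool.h>
#include <stdint.h>
#include <infiniband/verbs.h>

/* Hypothetical, trimmed-down request state used only for this illustration. */
struct toy_request {
	bool owns_data_buffer;
};

static void
on_send_queue_completion(const struct ibv_wc *wc, struct toy_request *req,
			 uint32_t *cur_queue_depth, uint32_t *cur_rdma_rw_depth)
{
	switch (wc->opcode) {
	case IBV_WC_SEND:
		/* Response delivered: release any buffer still held and
		 * recycle the request (the free_queue in the diff). */
		req->owns_data_buffer = false;
		(*cur_queue_depth)--;
		break;
	case IBV_WC_RDMA_WRITE:
		/* Read data landed in host memory; the completion SEND was
		 * already posted behind it, so only return the buffer and
		 * the R/W slot, then drain the pending R/W queue. */
		req->owns_data_buffer = false;
		(*cur_rdma_rw_depth)--;
		break;
	case IBV_WC_RDMA_READ:
		/* Host-to-controller data has arrived; the request can now
		 * be executed. */
		(*cur_rdma_rw_depth)--;
		break;
	default:
		break;
	}
}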