diff --git a/lib/nvmf/rdma.c b/lib/nvmf/rdma.c
index f267d0eb87..fb1be0db3f 100644
--- a/lib/nvmf/rdma.c
+++ b/lib/nvmf/rdma.c
@@ -66,10 +66,12 @@
 #define NVMF_DEFAULT_RX_SGE		2
 
 struct spdk_nvmf_rdma_request {
-	struct spdk_nvmf_request		req;
+	struct spdk_nvmf_request		req;
 
 	/* In Capsule data buffer */
-	void					*buf;
+	void					*buf;
+
+	TAILQ_ENTRY(spdk_nvmf_rdma_request)	link;
 };
 
@@ -79,30 +81,39 @@ struct spdk_nvmf_rdma_conn {
 	struct rdma_cm_id			*cm_id;
 
 	/* The maximum number of I/O outstanding on this connection at one time */
-	uint16_t				queue_depth;
+	uint16_t				max_queue_depth;
 
 	/* The maximum number of active RDMA READ and WRITE operations at one time */
-	uint16_t				rw_depth;
+	uint16_t				max_rw_depth;
 
-	/* The current number of I/O outstanding on this connection */
-	int					num_outstanding_reqs;
+	/* The current number of I/O outstanding on this connection. This number
+	 * includes all I/O from the time the capsule is first received until it is
+	 * completed.
+	 */
+	uint16_t				cur_queue_depth;
 
-	/* Array of size "queue_depth" containing RDMA requests. */
+	/* The number of RDMA READ and WRITE requests that are outstanding */
+	uint16_t				cur_rdma_rw_depth;
+
+	/* Requests that are waiting to perform an RDMA READ or WRITE */
+	TAILQ_HEAD(, spdk_nvmf_rdma_request)	pending_rdma_rw_queue;
+
+	/* Array of size "max_queue_depth" containing RDMA requests. */
 	struct spdk_nvmf_rdma_request		*reqs;
 
-	/* Array of size "queue_depth" containing 64 byte capsules
+	/* Array of size "max_queue_depth" containing 64 byte capsules
 	 * used for receive.
 	 */
 	union nvmf_h2c_msg			*cmds;
 	struct ibv_mr				*cmds_mr;
 
-	/* Array of size "queue_depth" containing 16 byte completions
+	/* Array of size "max_queue_depth" containing 16 byte completions
 	 * to be sent back to the user.
 	 */
 	union nvmf_c2h_msg			*cpls;
 	struct ibv_mr				*cpls_mr;
 
-	/* Array of size "queue_depth * InCapsuleDataSize" containing
+	/* Array of size "max_queue_depth * InCapsuleDataSize" containing
 	 * buffers to be used for in capsule data. TODO: Currently, all data
 	 * is in capsule.
 	 */
@@ -178,7 +189,7 @@ spdk_nvmf_rdma_conn_destroy(struct spdk_nvmf_rdma_conn *rdma_conn)
 }
 
 static struct spdk_nvmf_rdma_conn *
-spdk_nvmf_rdma_conn_create(struct rdma_cm_id *id, uint16_t queue_depth, uint16_t rw_depth)
+spdk_nvmf_rdma_conn_create(struct rdma_cm_id *id, uint16_t max_queue_depth, uint16_t max_rw_depth)
 {
 	struct spdk_nvmf_rdma_conn	*rdma_conn;
 	struct spdk_nvmf_conn		*conn;
@@ -192,14 +203,15 @@ spdk_nvmf_rdma_conn_create(struct rdma_cm_id *id, uint16_t queue_depth, uint16_t
 		return NULL;
 	}
 
-	rdma_conn->queue_depth = queue_depth;
-	rdma_conn->rw_depth = rw_depth;
+	rdma_conn->max_queue_depth = max_queue_depth;
+	rdma_conn->max_rw_depth = max_rw_depth;
 	rdma_conn->cm_id = id;
+	TAILQ_INIT(&rdma_conn->pending_rdma_rw_queue);
 
 	memset(&attr, 0, sizeof(struct ibv_qp_init_attr));
 	attr.qp_type = IBV_QPT_RC;
-	attr.cap.max_send_wr = rdma_conn->queue_depth * 2; /* SEND, READ, and WRITE operations */
-	attr.cap.max_recv_wr = rdma_conn->queue_depth; /* RECV operations */
+	attr.cap.max_send_wr = rdma_conn->max_queue_depth * 2; /* SEND, READ, and WRITE operations */
+	attr.cap.max_recv_wr = rdma_conn->max_queue_depth; /* RECV operations */
 	attr.cap.max_send_sge = NVMF_DEFAULT_TX_SGE;
 	attr.cap.max_recv_sge = NVMF_DEFAULT_RX_SGE;
 
@@ -234,12 +246,12 @@ spdk_nvmf_rdma_conn_create(struct rdma_cm_id *id, uint16_t queue_depth, uint16_t
 
 	SPDK_TRACELOG(SPDK_TRACE_RDMA, "New RDMA Connection: %p\n", conn);
 
-	rdma_conn->reqs = calloc(rdma_conn->queue_depth, sizeof(*rdma_conn->reqs));
-	rdma_conn->cmds = rte_calloc("nvmf_rdma_cmd", rdma_conn->queue_depth,
+	rdma_conn->reqs = calloc(max_queue_depth, sizeof(*rdma_conn->reqs));
+	rdma_conn->cmds = rte_calloc("nvmf_rdma_cmd", max_queue_depth,
				     sizeof(*rdma_conn->cmds), 0);
-	rdma_conn->cpls = rte_calloc("nvmf_rdma_cpl", rdma_conn->queue_depth,
+	rdma_conn->cpls = rte_calloc("nvmf_rdma_cpl", max_queue_depth,
				     sizeof(*rdma_conn->cpls), 0);
-	rdma_conn->bufs = rte_calloc("nvmf_rdma_buf", rdma_conn->queue_depth,
+	rdma_conn->bufs = rte_calloc("nvmf_rdma_buf", max_queue_depth,
				     g_rdma.in_capsule_data_size, 0);
 	if (!rdma_conn->reqs || !rdma_conn->cmds || !rdma_conn->cpls || !rdma_conn->bufs) {
 		SPDK_ERRLOG("Unable to allocate sufficient memory for RDMA queue.\n");
@@ -248,18 +260,18 @@ spdk_nvmf_rdma_conn_create(struct rdma_cm_id *id, uint16_t queue_depth, uint16_t
 	}
 
 	rdma_conn->cmds_mr = rdma_reg_msgs(rdma_conn->cm_id, rdma_conn->cmds,
-					   queue_depth * sizeof(*rdma_conn->cmds));
+					   max_queue_depth * sizeof(*rdma_conn->cmds));
 	rdma_conn->cpls_mr = rdma_reg_msgs(rdma_conn->cm_id, rdma_conn->cpls,
-					   queue_depth * sizeof(*rdma_conn->cpls));
+					   max_queue_depth * sizeof(*rdma_conn->cpls));
 	rdma_conn->bufs_mr = rdma_reg_msgs(rdma_conn->cm_id, rdma_conn->bufs,
-					   rdma_conn->queue_depth * g_rdma.in_capsule_data_size);
+					   max_queue_depth * g_rdma.in_capsule_data_size);
 	if (!rdma_conn->cmds_mr || !rdma_conn->cpls_mr || !rdma_conn->bufs_mr) {
 		SPDK_ERRLOG("Unable to register required memory for RDMA queue.\n");
 		spdk_nvmf_rdma_conn_destroy(rdma_conn);
 		return NULL;
 	}
 
-	for (i = 0; i < queue_depth; i++) {
+	for (i = 0; i < max_queue_depth; i++) {
 		rdma_req = &rdma_conn->reqs[i];
 		rdma_req->buf = (void *)((uintptr_t)rdma_conn->bufs + (i * g_rdma.in_capsule_data_size));
 		rdma_req->req.cmd = &rdma_conn->cmds[i];
@@ -462,16 +474,33 @@ nvmf_post_rdma_send(struct spdk_nvmf_request *req)
 static int
 spdk_nvmf_rdma_request_transfer_data(struct spdk_nvmf_request *req)
 {
-	if (req->xfer == SPDK_NVME_DATA_CONTROLLER_TO_HOST) {
-		return nvmf_post_rdma_write(req);
-	} else if (req->xfer == SPDK_NVME_DATA_HOST_TO_CONTROLLER) {
-		return nvmf_post_rdma_read(req);
+	int rc;
+	struct spdk_nvmf_rdma_request *rdma_req = get_rdma_req(req);
+	struct spdk_nvmf_conn *conn = req->conn;
+	struct spdk_nvmf_rdma_conn *rdma_conn = get_rdma_conn(conn);
+
+	assert(req->xfer != SPDK_NVME_DATA_NONE);
+
+	if (rdma_conn->cur_rdma_rw_depth < rdma_conn->max_rw_depth) {
+		if (req->xfer == SPDK_NVME_DATA_CONTROLLER_TO_HOST) {
+			rc = nvmf_post_rdma_write(req);
+			if (rc) {
+				SPDK_ERRLOG("Unable to transfer data from target to host\n");
+				return -1;
+			}
+		} else if (req->xfer == SPDK_NVME_DATA_HOST_TO_CONTROLLER) {
+			rc = nvmf_post_rdma_read(req);
+			if (rc) {
+				SPDK_ERRLOG("Unable to transfer data from host to target\n");
+				return -1;
+			}
+		}
+		rdma_conn->cur_rdma_rw_depth++;
+	} else {
+		TAILQ_INSERT_TAIL(&rdma_conn->pending_rdma_rw_queue, rdma_req, link);
 	}
 
-	/* This should not have been called if there is no data to xfer */
-	assert(false);
-
-	return -1;
+	return 0;
 }
 
 static int
@@ -484,7 +513,7 @@ spdk_nvmf_rdma_request_send_completion(struct spdk_nvmf_request *req)
 
 	/* Advance our sq_head pointer */
 	conn->sq_head++;
-	if (conn->sq_head == rdma_conn->queue_depth) {
+	if (conn->sq_head == rdma_conn->max_queue_depth) {
 		conn->sq_head = 0;
 	}
 	rsp->sqhd = conn->sq_head;
@@ -513,11 +542,11 @@ spdk_nvmf_rdma_request_ack_completion(struct spdk_nvmf_request *req)
 
 	/* Advance our sq_head pointer */
 	conn->sq_head++;
-	if (conn->sq_head == rdma_conn->queue_depth) {
+	if (conn->sq_head == rdma_conn->max_queue_depth) {
 		conn->sq_head = 0;
 	}
 
-	rdma_conn->num_outstanding_reqs--;
+	rdma_conn->cur_queue_depth--;
 
 	return 0;
 }
@@ -554,8 +583,8 @@ nvmf_rdma_connect(struct rdma_cm_event *event)
 	const struct spdk_nvmf_rdma_request_private_data *private_data = NULL;
 	struct spdk_nvmf_rdma_accept_private_data accept_data;
 	uint16_t sts = 0;
-	uint16_t queue_depth;
-	uint16_t rw_depth;
+	uint16_t max_queue_depth;
+	uint16_t max_rw_depth;
 	int rc;
 
@@ -579,8 +608,8 @@ nvmf_rdma_connect(struct rdma_cm_event *event)
 	SPDK_TRACELOG(SPDK_TRACE_RDMA, "Calculating Queue Depth\n");
 
 	/* Start with the maximum queue depth allowed by the target */
-	queue_depth = g_rdma.max_queue_depth;
-	rw_depth = g_rdma.max_queue_depth;
+	max_queue_depth = g_rdma.max_queue_depth;
+	max_rw_depth = g_rdma.max_queue_depth;
 	SPDK_TRACELOG(SPDK_TRACE_RDMA, "Target Max Queue Depth: %d\n", g_rdma.max_queue_depth);
 
 	/* Next check the local NIC's hardware limitations */
@@ -593,15 +622,15 @@ nvmf_rdma_connect(struct rdma_cm_event *event)
 	SPDK_TRACELOG(SPDK_TRACE_RDMA, "Local NIC Max Send/Recv Queue Depth: %d Max Read/Write Queue Depth: %d\n",
		      ibdev_attr.max_qp_wr, ibdev_attr.max_qp_rd_atom);
-	queue_depth = nvmf_min(queue_depth, ibdev_attr.max_qp_wr);
-	rw_depth = nvmf_min(rw_depth, ibdev_attr.max_qp_rd_atom);
+	max_queue_depth = nvmf_min(max_queue_depth, ibdev_attr.max_qp_wr);
+	max_rw_depth = nvmf_min(max_rw_depth, ibdev_attr.max_qp_rd_atom);
 
 	/* Next check the remote NIC's hardware limitations */
 	rdma_param = &event->param.conn;
 	SPDK_TRACELOG(SPDK_TRACE_RDMA, "Host NIC Max Incoming RDMA R/W operations: %d Max Outgoing RDMA R/W operations: %d\n",
		      rdma_param->initiator_depth, rdma_param->responder_resources);
-	rw_depth = nvmf_min(rw_depth, rdma_param->initiator_depth);
+	max_rw_depth = nvmf_min(max_rw_depth, rdma_param->initiator_depth);
 
 	/* Finally check for the host software requested values, which are
 	 * optional.
 	 */
@@ -610,18 +639,16 @@ nvmf_rdma_connect(struct rdma_cm_event *event)
 		private_data = rdma_param->private_data;
 		SPDK_TRACELOG(SPDK_TRACE_RDMA, "Host Receive Queue Size: %d\n", private_data->hrqsize);
 		SPDK_TRACELOG(SPDK_TRACE_RDMA, "Host Send Queue Size: %d\n", private_data->hsqsize);
-		queue_depth = nvmf_min(queue_depth, private_data->hrqsize);
-		queue_depth = nvmf_min(queue_depth, private_data->hsqsize);
+		max_queue_depth = nvmf_min(max_queue_depth, private_data->hrqsize);
+		max_queue_depth = nvmf_min(max_queue_depth, private_data->hsqsize);
 	}
 
-	/* Queue Depth cannot exceed R/W depth */
-	queue_depth = nvmf_min(queue_depth, rw_depth);
 	SPDK_TRACELOG(SPDK_TRACE_RDMA, "Final Negotiated Queue Depth: %d R/W Depth: %d\n",
-		      queue_depth, rw_depth);
+		      max_queue_depth, max_rw_depth);
 
 	/* Init the NVMf rdma transport connection */
-	rdma_conn = spdk_nvmf_rdma_conn_create(event->id, queue_depth, rw_depth);
+	rdma_conn = spdk_nvmf_rdma_conn_create(event->id, max_queue_depth, max_rw_depth);
 	if (rdma_conn == NULL) {
 		SPDK_ERRLOG("Error on nvmf connection creation\n");
 		goto err1;
@@ -632,13 +659,13 @@ nvmf_rdma_connect(struct rdma_cm_event *event)
 	TAILQ_INSERT_TAIL(&g_pending_conns, rdma_conn, link);
 
 	accept_data.recfmt = 0;
-	accept_data.crqsize = rdma_conn->queue_depth;
+	accept_data.crqsize = max_queue_depth;
 	ctrlr_event_data = *rdma_param;
 	ctrlr_event_data.private_data = &accept_data;
 	ctrlr_event_data.private_data_len = sizeof(accept_data);
 	if (event->id->ps == RDMA_PS_TCP) {
 		ctrlr_event_data.responder_resources = 0; /* We accept 0 reads from the host */
-		ctrlr_event_data.initiator_depth = rdma_conn->queue_depth;
+		ctrlr_event_data.initiator_depth = max_rw_depth;
 	}
 
 	rc = rdma_accept(event->id, &ctrlr_event_data);
@@ -1098,6 +1125,32 @@ spdk_nvmf_rdma_close_conn(struct spdk_nvmf_conn *conn)
 	return spdk_nvmf_rdma_conn_destroy(rdma_conn);
 }
 
+static int
+spdk_nvmf_rdma_handle_pending_rdma_rw(struct spdk_nvmf_conn *conn)
+{
+	struct spdk_nvmf_rdma_conn *rdma_conn = get_rdma_conn(conn);
+	struct spdk_nvmf_rdma_request *rdma_req;
+	int rc;
+
+	while (rdma_conn->cur_rdma_rw_depth < rdma_conn->max_rw_depth) {
+		if (TAILQ_EMPTY(&rdma_conn->pending_rdma_rw_queue)) {
+			break;
+		}
+
+		rdma_req = TAILQ_FIRST(&rdma_conn->pending_rdma_rw_queue);
+		TAILQ_REMOVE(&rdma_conn->pending_rdma_rw_queue, rdma_req, link);
+
+		SPDK_TRACELOG(SPDK_TRACE_RDMA, "Submitting previously queued for RDMA R/W request %p\n", rdma_req);
+
+		rc = spdk_nvmf_rdma_request_transfer_data(&rdma_req->req);
+		if (rc) {
+			return -1;
+		}
+	}
+
+	return 0;
+}
+
 /* Returns the number of times that spdk_nvmf_request_exec was called,
  * or -1 on error.
 */
@@ -1139,10 +1192,10 @@ spdk_nvmf_rdma_poll(struct spdk_nvmf_conn *conn)
 
 		switch (wc.opcode) {
 		case IBV_WC_SEND:
-			assert(rdma_conn->num_outstanding_reqs > 0);
+			assert(rdma_conn->cur_queue_depth > 0);
 			SPDK_TRACELOG(SPDK_TRACE_RDMA, "RDMA SEND Complete. Request: %p Connection: %p Outstanding I/O: %d\n",
-				      req, conn, rdma_conn->num_outstanding_reqs - 1);
+				      req, conn, rdma_conn->cur_queue_depth - 1);
 			rc = spdk_nvmf_rdma_request_ack_completion(req);
 			if (rc) {
 				return -1;
 			}
@@ -1157,6 +1210,13 @@ spdk_nvmf_rdma_poll(struct spdk_nvmf_conn *conn)
 			if (rc) {
 				return -1;
 			}
+
+			/* Since an RDMA R/W operation completed, try to submit from the pending list. */
+			rdma_conn->cur_rdma_rw_depth--;
+			rc = spdk_nvmf_rdma_handle_pending_rdma_rw(conn);
+			if (rc) {
+				return -1;
+			}
 			break;
 
 		case IBV_WC_RDMA_READ:
@@ -1168,6 +1228,13 @@ spdk_nvmf_rdma_poll(struct spdk_nvmf_conn *conn)
 				return -1;
 			}
 			count++;
+
+			/* Since an RDMA R/W operation completed, try to submit from the pending list. */
+			rdma_conn->cur_rdma_rw_depth--;
+			rc = spdk_nvmf_rdma_handle_pending_rdma_rw(conn);
+			if (rc) {
+				return -1;
+			}
 			break;
 
 		case IBV_WC_RECV:
@@ -1182,7 +1249,7 @@ spdk_nvmf_rdma_poll(struct spdk_nvmf_conn *conn)
 	}
 
 	/* Poll the recv completion queue for incoming requests */
-	while (rdma_conn->num_outstanding_reqs < rdma_conn->queue_depth) {
+	while (rdma_conn->cur_queue_depth < rdma_conn->max_queue_depth) {
 		rc = ibv_poll_cq(rdma_conn->cm_id->recv_cq, 1, &wc);
 		if (rc == 0) {
 			break;
		}
@@ -1213,10 +1280,10 @@ spdk_nvmf_rdma_poll(struct spdk_nvmf_conn *conn)
 			return -1;
 		}
 
-		rdma_conn->num_outstanding_reqs++;
+		rdma_conn->cur_queue_depth++;
 
 		SPDK_TRACELOG(SPDK_TRACE_RDMA, "RDMA RECV Complete. Request: %p Connection: %p Outstanding I/O: %d\n",
-			      req, conn, rdma_conn->num_outstanding_reqs);
+			      req, conn, rdma_conn->cur_queue_depth);
 		spdk_trace_record(TRACE_NVMF_IO_START, 0, 0, (uint64_t)req, 0);
 		memset(req->rsp, 0, sizeof(*req->rsp));
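
The snippet below is a minimal, self-contained sketch (not part of the patch) of the flow-control pattern this change introduces: work that would push the connection past max_rw_depth is parked on a per-connection TAILQ and resubmitted as completions free slots. All names here (struct conn, submit_rw, complete_rw) are illustrative stand-ins, not SPDK identifiers; in the patch itself this logic lives in spdk_nvmf_rdma_request_transfer_data() and spdk_nvmf_rdma_handle_pending_rdma_rw().

/*
 * Simplified model of the queue-and-drain pattern above.
 * Names are hypothetical; only the TAILQ usage mirrors the patch.
 */
#include <stdio.h>
#include <sys/queue.h>

struct request {
	int id;
	TAILQ_ENTRY(request) link;
};

struct conn {
	unsigned max_rw_depth;		/* negotiated limit, cf. max_rw_depth */
	unsigned cur_rw_depth;		/* in-flight R/W ops, cf. cur_rdma_rw_depth */
	TAILQ_HEAD(, request) pending;	/* cf. pending_rdma_rw_queue */
};

/* Start the transfer immediately if a slot is free; otherwise queue it. */
static void submit_rw(struct conn *c, struct request *req)
{
	if (c->cur_rw_depth < c->max_rw_depth) {
		c->cur_rw_depth++;
		printf("request %d: transfer started\n", req->id);
	} else {
		TAILQ_INSERT_TAIL(&c->pending, req, link);
		printf("request %d: queued\n", req->id);
	}
}

/* On completion, release the slot and drain the pending queue in FIFO order. */
static void complete_rw(struct conn *c)
{
	c->cur_rw_depth--;
	while (c->cur_rw_depth < c->max_rw_depth && !TAILQ_EMPTY(&c->pending)) {
		struct request *req = TAILQ_FIRST(&c->pending);

		TAILQ_REMOVE(&c->pending, req, link);
		c->cur_rw_depth++;
		printf("request %d: transfer started from pending queue\n", req->id);
	}
}

int main(void)
{
	struct conn c = { .max_rw_depth = 2, .cur_rw_depth = 0 };
	struct request reqs[4] = { {.id = 0}, {.id = 1}, {.id = 2}, {.id = 3} };
	int i;

	TAILQ_INIT(&c.pending);

	for (i = 0; i < 4; i++) {
		submit_rw(&c, &reqs[i]);	/* requests 2 and 3 exceed the depth and are queued */
	}
	complete_rw(&c);			/* frees a slot; request 2 is drained */
	complete_rw(&c);			/* frees a slot; request 3 is drained */
	return 0;
}

Draining only from completion context, as the patch does in spdk_nvmf_rdma_poll(), keeps the pending queue FIFO and bounds in-flight RDMA READ/WRITE operations at the negotiated max_rw_depth while still allowing the receive queue to run at the full max_queue_depth.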