nvmf: New RDMA connections move to lcore after CONNECT capsule

Instead of starting the connection poller immediately upon the
connect event, wait until the first CONNECT capsule arrives to
start the poller.

This builds toward associating all connections that belong to the
same session with a single lcore.

Change-Id: I7f08b2dd34585d093ad36a4ebca63c5f782dcf14
Signed-off-by: Ben Walker <benjamin.walker@intel.com>
Authored by Ben Walker on 2016-07-14 13:52:03 -07:00; committed by Benjamin Walker
parent 0e1dc05efb
commit ca7a61e18a
2 changed files with 150 additions and 63 deletions
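
For context, the lifecycle this commit introduces can be sketched as follows. This is a condensed, illustrative walk-through stitched together from the hunks below (all identifiers are taken from the diff), not a verbatim excerpt:

/* 1. On the rdma_cm connect event, nvmf_rdma_connect() no longer starts
 *    a poller; it parks the new connection on a global pending list. */
TAILQ_INSERT_TAIL(&g_pending_conns, rdma_conn, link);

/* 2. The acceptor timer (nvmf_rdma_accept) polls each pending connection's
 *    completion queue for its first recv, which must be the CONNECT capsule. */
rc = ibv_poll_cq(rdma_conn->cq, 1, &wc);

/* 3. While handling that CONNECT capsule, nvmf_process_connect() starts the
 *    connection poller and hands the remaining work to the poller's lcore. */
rc = spdk_nvmf_startup_conn(conn);
event = spdk_event_allocate(conn->poller.lcore, nvmf_handle_connect, req, NULL, NULL);
spdk_event_call(event);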


@@ -75,8 +75,13 @@ struct spdk_nvmf_rdma_conn {
uint16_t queue_depth;
STAILQ_HEAD(, spdk_nvmf_rdma_request) rdma_reqs;
TAILQ_ENTRY(spdk_nvmf_rdma_conn) link;
};
/* List of RDMA connections that have not yet received a CONNECT capsule */
static TAILQ_HEAD(, spdk_nvmf_rdma_conn) g_pending_conns = TAILQ_HEAD_INITIALIZER(g_pending_conns);
struct spdk_nvmf_rdma_request {
struct spdk_nvmf_request req;
STAILQ_ENTRY(spdk_nvmf_rdma_request) link;
@@ -678,12 +683,9 @@ nvmf_rdma_connect(struct rdma_cm_event *event)
}
STAILQ_INSERT_TAIL(&rdma_conn->rdma_reqs, rdma_req, link);
rc = spdk_nvmf_startup_conn(conn);
if (rc) {
SPDK_ERRLOG("Error on startup connection\n");
goto err1;
}
SPDK_TRACELOG(SPDK_TRACE_DEBUG, "New Connection Scheduled\n");
/* Add this RDMA connection to the global list until a CONNECT capsule
* is received. */
TAILQ_INSERT_TAIL(&g_pending_conns, rdma_conn, link);
accept_data.recfmt = 0;
accept_data.crqsize = rdma_conn->queue_depth;
@@ -770,16 +772,116 @@ static const char *CM_EVENT_STR[] = {
};
#endif /* DEBUG */
static int
nvmf_recv(struct spdk_nvmf_rdma_request *rdma_req, struct ibv_wc *wc)
{
int ret;
struct spdk_nvmf_request *req;
if (wc->byte_len < sizeof(struct spdk_nvmf_capsule_cmd)) {
SPDK_ERRLOG("recv length %u less than capsule header\n", wc->byte_len);
return -1;
}
req = &rdma_req->req;
ret = spdk_nvmf_request_prep_data(req,
rdma_req->bb, wc->byte_len - sizeof(struct spdk_nvmf_capsule_cmd),
rdma_req->bb, rdma_req->recv_sgl[1].length);
if (ret < 0) {
SPDK_ERRLOG("prep_data failed\n");
return spdk_nvmf_request_complete(req);
}
if (ret == 0) {
/* Data is available now; execute command immediately. */
ret = spdk_nvmf_request_exec(req);
if (ret < 0) {
SPDK_ERRLOG("Command execution failed\n");
return -1;
}
return 1;
}
/*
* Pending transfer from host to controller; command will continue
* once transfer is complete.
*/
return 0;
}
static void
nvmf_rdma_accept(struct rte_timer *timer, void *arg)
{
struct rdma_cm_event *event;
int rc;
struct spdk_nvmf_rdma_conn *rdma_conn, *tmp;
struct spdk_nvmf_rdma_request *rdma_req;
if (g_rdma.acceptor_event_channel == NULL) {
return;
}
/* Process pending connections for incoming capsules. The only capsule
* this should ever find is a CONNECT request. */
TAILQ_FOREACH_SAFE(rdma_conn, &g_pending_conns, link, tmp) {
struct ibv_wc wc;
rc = ibv_poll_cq(rdma_conn->cq, 1, &wc);
if (rc == 0) {
continue;
} else if (rc < 0) {
SPDK_ERRLOG("Error polling RDMA completion queue: %d (%s)\n",
errno, strerror(errno));
TAILQ_REMOVE(&g_pending_conns, rdma_conn, link);
nvmf_rdma_conn_cleanup(&rdma_conn->conn);
continue;
}
if (wc.status) {
SPDK_ERRLOG("Error polling RDMA completion queue: %d (%s)\n",
wc.status, ibv_wc_status_str(wc.status));
TAILQ_REMOVE(&g_pending_conns, rdma_conn, link);
nvmf_rdma_conn_cleanup(&rdma_conn->conn);
continue;
}
if (wc.opcode == IBV_WC_RECV) {
/* New incoming capsule. */
SPDK_TRACELOG(SPDK_TRACE_RDMA, "Received new capsule on pending connection.\n");
spdk_trace_record(TRACE_NVMF_IO_START, 0, 0, wc.wr_id, 0);
rdma_req = (struct spdk_nvmf_rdma_request *)wc.wr_id;
rc = nvmf_recv(rdma_req, &wc);
if (rc < 0) {
SPDK_ERRLOG("nvmf_recv processing failure\n");
TAILQ_REMOVE(&g_pending_conns, rdma_conn, link);
nvmf_rdma_conn_cleanup(&rdma_conn->conn);
continue;
} else if (rc > 0) {
TAILQ_REMOVE(&g_pending_conns, rdma_conn, link);
}
} else if (wc.opcode == IBV_WC_RDMA_READ) {
/* A previously received capsule finished grabbing
* its associated data */
SPDK_TRACELOG(SPDK_TRACE_RDMA, "RDMA read for a request on the pending connection completed\n");
rdma_req = (struct spdk_nvmf_rdma_request *)wc.wr_id;
spdk_trace_record(TRACE_RDMA_READ_COMPLETE, 0, 0, (uint64_t)&rdma_req->req, 0);
rc = spdk_nvmf_request_exec(&rdma_req->req);
if (rc) {
SPDK_ERRLOG("request_exec error %d after RDMA Read completion\n", rc);
TAILQ_REMOVE(&g_pending_conns, rdma_conn, link);
nvmf_rdma_conn_cleanup(&rdma_conn->conn);
continue;
}
TAILQ_REMOVE(&g_pending_conns, rdma_conn, link);
} else {
/* No other completion types are expected here */
SPDK_ERRLOG("Unexpected RDMA completion.\n");
continue;
}
}
while (1) {
rc = rdma_get_cm_event(g_rdma.acceptor_event_channel, &event);
if (rc == 0) {
@@ -964,48 +1066,6 @@ spdk_nvmf_rdma_fini(void)
return 0;
}
static int
nvmf_recv(struct spdk_nvmf_conn *conn, struct ibv_wc *wc)
{
struct spdk_nvmf_rdma_request *rdma_req;
struct spdk_nvmf_request *req;
int ret;
rdma_req = (struct spdk_nvmf_rdma_request *)wc->wr_id;
if (wc->byte_len < sizeof(struct spdk_nvmf_capsule_cmd)) {
SPDK_ERRLOG("recv length %u less than capsule header\n", wc->byte_len);
return -1;
}
req = &rdma_req->req;
ret = spdk_nvmf_request_prep_data(req,
rdma_req->bb, wc->byte_len - sizeof(struct spdk_nvmf_capsule_cmd),
rdma_req->bb, rdma_req->recv_sgl[1].length);
if (ret < 0) {
SPDK_ERRLOG("prep_data failed\n");
return spdk_nvmf_request_complete(req);
}
if (ret == 0) {
/* Data is available now; execute command immediately. */
ret = spdk_nvmf_request_exec(req);
if (ret < 0) {
SPDK_ERRLOG("Command execution failed\n");
return -1;
}
return 0;
}
/*
* Pending transfer from host to controller; command will continue
* once transfer is complete.
*/
return 0;
}
int
nvmf_check_rdma_completions(struct spdk_nvmf_conn *conn)
{
@@ -1073,8 +1133,9 @@ nvmf_check_rdma_completions(struct spdk_nvmf_conn *conn)
case IBV_WC_RECV:
SPDK_TRACELOG(SPDK_TRACE_RDMA, "CQ recv completion\n");
spdk_trace_record(TRACE_NVMF_IO_START, 0, 0, wc.wr_id, 0);
rc = nvmf_recv(conn, &wc);
if (rc) {
rdma_req = (struct spdk_nvmf_rdma_request *)wc.wr_id;
rc = nvmf_recv(rdma_req, &wc);
if (rc < 0) {
SPDK_ERRLOG("nvmf_recv processing failure\n");
return -1;
}

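Note that the relocated nvmf_recv() above also changes its contract: it now takes the spdk_nvmf_rdma_request directly instead of deriving it from the connection, and it returns 1 (previously 0) when the command was executed immediately. The caller contract, as inferred from the nvmf_rdma_accept() loop above, is roughly:

/* Inferred return convention for the new nvmf_recv() (an assumption based
 * on the hunks above, not documented in the diff itself):
 *   rc < 0  -> fatal error: tear down the pending connection
 *   rc == 0 -> host-to-controller transfer still in flight: keep polling
 *   rc > 0  -> command executed: CONNECT is done and the connection now has
 *              its own poller, so drop it from g_pending_conns
 */
rc = nvmf_recv(rdma_req, &wc);
if (rc < 0) {
        TAILQ_REMOVE(&g_pending_conns, rdma_conn, link);
        nvmf_rdma_conn_cleanup(&rdma_conn->conn);
} else if (rc > 0) {
        TAILQ_REMOVE(&g_pending_conns, rdma_conn, link);
}

The second file's hunks below move poller startup into this CONNECT handling path.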

@@ -389,13 +389,40 @@ nvmf_process_property_set(struct spdk_nvmf_request *req)
return true;
}
static void
nvmf_handle_connect(spdk_event_t event)
{
struct spdk_nvmf_request *req = spdk_event_get_arg1(event);
struct spdk_nvmf_fabric_connect_cmd *connect = &req->cmd->connect_cmd;
struct spdk_nvmf_fabric_connect_data *connect_data = (struct spdk_nvmf_fabric_connect_data *)
req->data;
struct spdk_nvmf_fabric_connect_rsp *response = &req->rsp->connect_rsp;
struct spdk_nvmf_conn *conn = req->conn;
spdk_nvmf_session_connect(conn, connect, connect_data, response);
/* Allocate RDMA reqs according to the queue depth and conn type*/
if (spdk_nvmf_rdma_alloc_reqs(conn)) {
SPDK_ERRLOG("Unable to allocate sufficient RDMA work requests\n");
/* TODO: Needs to shutdown poller */
req->rsp->nvme_cpl.status.sc = SPDK_NVME_SC_INTERNAL_DEVICE_ERROR;
spdk_nvmf_request_complete(req);
return;
}
SPDK_TRACELOG(SPDK_TRACE_NVMF, "connect capsule response: cntlid = 0x%04x\n",
response->status_code_specific.success.cntlid);
spdk_nvmf_request_complete(req);
return;
}
static bool
nvmf_process_connect(struct spdk_nvmf_request *req)
{
struct spdk_nvmf_fabric_connect_cmd *connect;
struct spdk_nvmf_fabric_connect_data *connect_data;
struct spdk_nvmf_fabric_connect_rsp *response;
struct spdk_nvmf_conn *conn = req->conn;
int rc;
spdk_event_t event;
if (req->length < sizeof(struct spdk_nvmf_fabric_connect_data)) {
SPDK_ERRLOG("Connect command data length 0x%x too small\n", req->length);
@@ -403,20 +430,19 @@ nvmf_process_connect(struct spdk_nvmf_request *req)
return true;
}
connect = &req->cmd->connect_cmd;
response = &req->rsp->connect_rsp;
connect_data = (struct spdk_nvmf_fabric_connect_data *)req->data;
spdk_nvmf_session_connect(conn, connect, connect_data, response);
/* Allocate RDMA reqs according to the queue depth and conn type*/
if (spdk_nvmf_rdma_alloc_reqs(conn)) {
SPDK_ERRLOG("Unable to allocate sufficient RDMA work requests\n");
/* Start the connection poller */
rc = spdk_nvmf_startup_conn(conn);
if (rc) {
req->rsp->nvme_cpl.status.sc = SPDK_NVME_SC_INTERNAL_DEVICE_ERROR;
SPDK_ERRLOG("Unable to start connection poller\n");
return true;
}
return true;
/* Pass an event to the lcore that owns this connection */
event = spdk_event_allocate(conn->poller.lcore, nvmf_handle_connect, req, NULL, NULL);
spdk_event_call(event);
return false;
}
static bool
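
Also note the changed return value of nvmf_process_connect(): the success path now returns false, which (judging from the error paths, which still return true after filling in a status) tells the caller that the response is not ready yet; nvmf_handle_connect() completes it later on the connection's own lcore. A minimal sketch of that hand-off, using only the calls that appear in the diff:

/* Runs on the acceptor's core: defer the remainder of CONNECT handling
 * to the lcore that now owns this connection's poller. */
event = spdk_event_allocate(conn->poller.lcore,  /* destination lcore */
                            nvmf_handle_connect, /* function invoked there */
                            req, NULL, NULL);    /* arg1, arg2, next event */
spdk_event_call(event);
/* On that lcore, nvmf_handle_connect() calls spdk_nvmf_session_connect(),
 * allocates the RDMA work requests, and completes the capsule response. */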