nvme/rdma: Wait for completions of both RDMA RECV and SEND

In some situations we may get the RDMA_RECV completion before the
RDMA_SEND completion, which can lead to the bug described in #1292.
To avoid this, the nvme_request must be completed only when both the
RDMA_RECV and RDMA_SEND completions have been received.

Add a new field to spdk_nvme_rdma_req to store the response index; it
is used to complete the nvme request when RDMA_RECV completes before
RDMA_SEND. Repost the RDMA_RECV work request once both RDMA_SEND and
RDMA_RECV have completed.

Side changes: change the type of spdk_nvme_rdma_req::id to uint16_t
and repack struct nvme_rdma_qpair.
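
For illustration, the ordering-independent completion handling boils
down to the minimal sketch below. The names (fake_req, on_send_done,
on_recv_done, fake_req_finish) are hypothetical and not part of this
patch; the actual change is in the diff that follows.

/* Minimal sketch, not the SPDK patch itself: it only demonstrates the
 * two-bit completion mask that makes request completion order-independent. */
#include <stdint.h>
#include <stdio.h>

enum {
	FAKE_SEND_COMPLETED = 1u << 0,
	FAKE_RECV_COMPLETED = 1u << 1,
};

struct fake_req {
	uint16_t id;
	uint16_t completion_flags : 2;
	uint16_t reserved : 14;
	uint16_t rsp_idx;	/* response slot recorded when the RECV completion arrives */
};

static void
fake_req_finish(struct fake_req *req)
{
	/* In the real driver this is where the NVMe completion is reported
	 * upward, the request is returned to the free list and the RECV
	 * descriptor for rsp_idx is re-posted. */
	printf("req %d finished, rsp_idx %d\n", req->id, req->rsp_idx);
	req->completion_flags = 0;
}

static void
on_recv_done(struct fake_req *req, uint16_t rsp_idx)
{
	req->completion_flags |= FAKE_RECV_COMPLETED;
	req->rsp_idx = rsp_idx;	/* remember the response for the SEND path */
	if (req->completion_flags & FAKE_SEND_COMPLETED) {
		fake_req_finish(req);
	}
}

static void
on_send_done(struct fake_req *req)
{
	req->completion_flags |= FAKE_SEND_COMPLETED;
	if (req->completion_flags & FAKE_RECV_COMPLETED) {
		fake_req_finish(req);
	}
}

int
main(void)
{
	struct fake_req req = { .id = 7 };

	/* RECV completion first: the request is finished on the SEND path. */
	on_recv_done(&req, 3);
	on_send_done(&req);

	/* SEND completion first: the request is finished on the RECV path. */
	on_send_done(&req);
	on_recv_done(&req, 3);
	return 0;
}

Either completion path may fire first; whichever observes the other flag
already set finishes the request, mirroring how nvme_rdma_request_ready()
is called from both the IBV_WC_RECV and IBV_WC_SEND branches below.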

Fixes #1292

Signed-off-by: Alexey Marchuk <alexeymar@mellanox.com>
Reviewed-on: https://review.spdk.io/gerrit/c/spdk/spdk/+/1770 (master)

(cherry picked from commit 581e1bb576)
Change-Id: Ie51fbbba425acf37c306c5af031479bc9de08955
Signed-off-by: Tomasz Zawadzki <tomasz.zawadzki@intel.com>
Reviewed-on: https://review.spdk.io/gerrit/c/spdk/spdk/+/2640
Reviewed-by: Jim Harris <james.r.harris@intel.com>
Reviewed-by: Ben Walker <benjamin.walker@intel.com>
Reviewed-by: Aleksey Marchuk <alexeymar@mellanox.com>

@@ -143,13 +143,14 @@ struct nvme_rdma_qpair {
 	uint16_t			num_entries;
+	bool				delay_cmd_submit;
 	/* Parallel arrays of response buffers + response SGLs of size num_entries */
 	struct ibv_sge			*rsp_sgls;
 	struct spdk_nvme_cpl		*rsps;
 	struct ibv_recv_wr		*rsp_recv_wrs;
-	bool				delay_cmd_submit;
 	struct spdk_nvme_send_wr_list	sends_to_post;
 	struct spdk_nvme_recv_wr_list	recvs_to_post;
@@ -174,8 +175,19 @@ struct nvme_rdma_qpair {
 	struct rdma_cm_event		*evt;
 };

+enum NVME_RDMA_COMPLETION_FLAGS {
+	NVME_RDMA_SEND_COMPLETED = 1u << 0,
+	NVME_RDMA_RECV_COMPLETED = 1u << 1,
+};
+
 struct spdk_nvme_rdma_req {
-	int				id;
+	uint16_t			id;
+	uint16_t			completion_flags: 2;
+	uint16_t			reserved: 14;
+	/* if completion of RDMA_RECV received before RDMA_SEND, we will complete nvme request
+	 * during processing of RDMA_SEND. To complete the request we must know the index
+	 * of nvme_cpl received in RDMA_RECV, so store it in this field */
+	uint16_t			rsp_idx;

 	struct ibv_send_wr		send_wr;
@@ -184,8 +196,6 @@ struct spdk_nvme_rdma_req {
 	struct ibv_sge			send_sgl[NVME_RDMA_DEFAULT_TX_SGE];

 	TAILQ_ENTRY(spdk_nvme_rdma_req) link;
-
-	bool				request_ready_to_put;
 };

 static const char *rdma_cm_event_str[] = {
@ -244,7 +254,7 @@ nvme_rdma_req_get(struct nvme_rdma_qpair *rqpair)
static void static void
nvme_rdma_req_put(struct nvme_rdma_qpair *rqpair, struct spdk_nvme_rdma_req *rdma_req) nvme_rdma_req_put(struct nvme_rdma_qpair *rqpair, struct spdk_nvme_rdma_req *rdma_req)
{ {
rdma_req->request_ready_to_put = false; rdma_req->completion_flags = 0;
TAILQ_REMOVE(&rqpair->outstanding_reqs, rdma_req, link); TAILQ_REMOVE(&rqpair->outstanding_reqs, rdma_req, link);
TAILQ_INSERT_HEAD(&rqpair->free_reqs, rdma_req, link); TAILQ_INSERT_HEAD(&rqpair->free_reqs, rdma_req, link);
} }
@@ -668,7 +678,8 @@ fail:
 static int
 nvme_rdma_register_rsps(struct nvme_rdma_qpair *rqpair)
 {
-	int i, rc;
+	uint16_t i;
+	int rc;

 	rqpair->rsp_mr = rdma_reg_msgs(rqpair->cm_id, rqpair->rsps,
 				       rqpair->num_entries * sizeof(*rqpair->rsps));
@@ -734,7 +745,7 @@ nvme_rdma_free_reqs(struct nvme_rdma_qpair *rqpair)
 static int
 nvme_rdma_alloc_reqs(struct nvme_rdma_qpair *rqpair)
 {
-	int i;
+	uint16_t i;

 	rqpair->rdma_reqs = calloc(rqpair->num_entries, sizeof(struct spdk_nvme_rdma_req));
 	if (rqpair->rdma_reqs == NULL) {
@@ -804,35 +815,6 @@ fail:
 	return -ENOMEM;
 }

-static int
-nvme_rdma_recv(struct nvme_rdma_qpair *rqpair, uint64_t rsp_idx, int *reaped)
-{
-	struct spdk_nvme_rdma_req *rdma_req;
-	struct spdk_nvme_cpl *rsp;
-	struct nvme_request *req;
-
-	assert(rsp_idx < rqpair->num_entries);
-	rsp = &rqpair->rsps[rsp_idx];
-	rdma_req = &rqpair->rdma_reqs[rsp->cid];
-
-	req = rdma_req->req;
-	nvme_rdma_req_complete(req, rsp);
-
-	if (rdma_req->request_ready_to_put) {
-		(*reaped)++;
-		nvme_rdma_req_put(rqpair, rdma_req);
-	} else {
-		rdma_req->request_ready_to_put = true;
-	}
-
-	if (nvme_rdma_post_recv(rqpair, rsp_idx)) {
-		SPDK_ERRLOG("Unable to re-post rx descriptor\n");
-		return -1;
-	}
-
-	return 0;
-}
-
 static int
 nvme_rdma_resolve_addr(struct nvme_rdma_qpair *rqpair,
 		       struct sockaddr *src_addr,
@ -1944,6 +1926,14 @@ nvme_rdma_qpair_check_timeout(struct spdk_nvme_qpair *qpair)
} }
} }
static inline int
nvme_rdma_request_ready(struct nvme_rdma_qpair *rqpair, struct spdk_nvme_rdma_req *rdma_req)
{
nvme_rdma_req_complete(rdma_req->req, &rqpair->rsps[rdma_req->rsp_idx]);
nvme_rdma_req_put(rqpair, rdma_req);
return nvme_rdma_post_recv(rqpair, rdma_req->rsp_idx);
}
#define MAX_COMPLETIONS_PER_POLL 128 #define MAX_COMPLETIONS_PER_POLL 128
int int
@@ -1953,10 +1943,12 @@ nvme_rdma_qpair_process_completions(struct spdk_nvme_qpair *qpair,
 	struct nvme_rdma_qpair	*rqpair = nvme_rdma_qpair(qpair);
 	struct ibv_wc		wc[MAX_COMPLETIONS_PER_POLL];
 	int			i, rc = 0, batch_size;
-	uint32_t		reaped;
+	uint32_t		reaped = 0;
+	uint16_t		rsp_idx;
 	struct ibv_cq		*cq;
 	struct spdk_nvme_rdma_req *rdma_req;
 	struct nvme_rdma_ctrlr	*rctrlr;
+	struct spdk_nvme_cpl	*rsp;

 	if (spdk_unlikely(nvme_rdma_qpair_submit_sends(rqpair) ||
 			  nvme_rdma_qpair_submit_recvs(rqpair))) {
@@ -1981,7 +1973,6 @@ nvme_rdma_qpair_process_completions(struct spdk_nvme_qpair *qpair,
 	cq = rqpair->cq;

-	reaped = 0;
 	do {
 		batch_size = spdk_min((max_completions - reaped),
 				      MAX_COMPLETIONS_PER_POLL);
@@ -2011,20 +2002,32 @@ nvme_rdma_qpair_process_completions(struct spdk_nvme_qpair *qpair,
 				goto fail;
 			}

-			if (nvme_rdma_recv(rqpair, wc[i].wr_id, &reaped)) {
-				SPDK_ERRLOG("nvme_rdma_recv processing failure\n");
-				goto fail;
-			}
+			assert(wc[i].wr_id < rqpair->num_entries);
+			rsp_idx = (uint16_t)wc[i].wr_id;
+			rsp = &rqpair->rsps[rsp_idx];
+			rdma_req = &rqpair->rdma_reqs[rsp->cid];
+			rdma_req->completion_flags |= NVME_RDMA_RECV_COMPLETED;
+			rdma_req->rsp_idx = rsp_idx;
+
+			if ((rdma_req->completion_flags & NVME_RDMA_SEND_COMPLETED) != 0) {
+				if (spdk_unlikely(nvme_rdma_request_ready(rqpair, rdma_req))) {
+					SPDK_ERRLOG("Unable to re-post rx descriptor\n");
+					goto fail;
+				}
+				reaped++;
+			}
 			break;

 		case IBV_WC_SEND:
 			rdma_req = (struct spdk_nvme_rdma_req *)wc[i].wr_id;
+			rdma_req->completion_flags |= NVME_RDMA_SEND_COMPLETED;

-			if (rdma_req->request_ready_to_put) {
+			if ((rdma_req->completion_flags & NVME_RDMA_RECV_COMPLETED) != 0) {
+				if (spdk_unlikely(nvme_rdma_request_ready(rqpair, rdma_req))) {
+					SPDK_ERRLOG("Unable to re-post rx descriptor\n");
+					goto fail;
+				}
 				reaped++;
-				nvme_rdma_req_put(rqpair, rdma_req);
-			} else {
-				rdma_req->request_ready_to_put = true;
 			}
 			break;