rdma: add function to create qpair resources.

Change-Id: Id865e2a2821fe04c1f927038d6dd967848cd9a55
Signed-off-by: Seth Howell <seth.howell@intel.com>
Reviewed-on: https://review.gerrithub.io/c/spdk/spdk/+/446999
Tested-by: SPDK CI Jenkins <sys_sgci@intel.com>
Reviewed-by: Ben Walker <benjamin.walker@intel.com>
Reviewed-by: Jim Harris <james.r.harris@intel.com>
Author: Seth Howell <seth.howell@intel.com>
Date: 2019-03-04 16:29:39 -07:00
Committed-by: Jim Harris
parent 353fbcdaf0
commit 0d3fcd10e9


@@ -259,6 +259,16 @@ enum spdk_nvmf_rdma_qpair_disconnect_flags {
RDMA_QP_SEND_DRAINED = 1 << 2
};
+struct spdk_nvmf_rdma_resource_opts {
+    struct spdk_nvmf_rdma_qpair *qpair;
+    /* qp points either to an ibv_qp object or an ibv_srq object depending on the value of shared. */
+    void *qp;
+    struct ibv_pd *pd;
+    uint32_t max_queue_depth;
+    uint32_t in_capsule_data_size;
+    bool shared;
+};
+
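The shared flag tells nvmf_rdma_resources_create() how to interpret the type-erased qp pointer. As a minimal sketch, the two intended ways of filling this struct look as follows; the variable names qp_opts and srq_opts are illustrative, but the field values mirror the assignments made later in this change:

    /* Dedicated mode: qp carries the connection's ibv_qp, and every request
     * in the pool is tied back to the owning qpair. */
    struct spdk_nvmf_rdma_resource_opts qp_opts = {
        .qpair = rqpair,
        .qp = rqpair->cm_id->qp,
        .pd = rqpair->cm_id->pd,
        .shared = false,
        .max_queue_depth = rqpair->max_queue_depth,
        .in_capsule_data_size = transport->opts.in_capsule_data_size,
    };

    /* Shared mode: qp carries the poller's ibv_srq; no single qpair owns the
     * pool, so qpair stays NULL until a command actually arrives. */
    struct spdk_nvmf_rdma_resource_opts srq_opts = {
        .qpair = NULL,
        .qp = poller->srq,
        .pd = device->pd,
        .shared = true,
        .max_queue_depth = poller->max_srq_depth,
        .in_capsule_data_size = transport->opts.in_capsule_data_size,
    };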
struct spdk_nvmf_rdma_resources {
/* Array of size "max_queue_depth" containing RDMA requests. */
struct spdk_nvmf_rdma_request *reqs;
@@ -623,6 +633,163 @@ nvmf_rdma_resources_destroy(struct spdk_nvmf_rdma_resources *resources)
free(resources);
}
+
+static struct spdk_nvmf_rdma_resources *
+nvmf_rdma_resources_create(struct spdk_nvmf_rdma_resource_opts *opts)
+{
+    struct spdk_nvmf_rdma_resources *resources;
+    struct spdk_nvmf_rdma_request *rdma_req;
+    struct spdk_nvmf_rdma_recv *rdma_recv;
+    struct ibv_qp *qp;
+    struct ibv_srq *srq;
+    uint32_t i;
+    int rc;
+
+    resources = calloc(1, sizeof(struct spdk_nvmf_rdma_resources));
+    if (!resources) {
+        SPDK_ERRLOG("Unable to allocate resources for receive queue.\n");
+        return NULL;
+    }
+
+    resources->reqs = calloc(opts->max_queue_depth, sizeof(*resources->reqs));
+    resources->recvs = calloc(opts->max_queue_depth, sizeof(*resources->recvs));
+    resources->cmds = spdk_dma_zmalloc(opts->max_queue_depth * sizeof(*resources->cmds),
+                                       0x1000, NULL);
+    resources->cpls = spdk_dma_zmalloc(opts->max_queue_depth * sizeof(*resources->cpls),
+                                       0x1000, NULL);
+
+    if (opts->in_capsule_data_size > 0) {
+        resources->bufs = spdk_dma_zmalloc(opts->max_queue_depth * opts->in_capsule_data_size,
+                                           0x1000, NULL);
+    }
+
+    if (!resources->reqs || !resources->recvs || !resources->cmds ||
+        !resources->cpls || (opts->in_capsule_data_size && !resources->bufs)) {
+        SPDK_ERRLOG("Unable to allocate sufficient memory for RDMA queue.\n");
+        goto cleanup;
+    }
+
+    resources->cmds_mr = ibv_reg_mr(opts->pd, resources->cmds,
+                                    opts->max_queue_depth * sizeof(*resources->cmds),
+                                    IBV_ACCESS_LOCAL_WRITE);
+    resources->cpls_mr = ibv_reg_mr(opts->pd, resources->cpls,
+                                    opts->max_queue_depth * sizeof(*resources->cpls),
+                                    0);
+
+    if (opts->in_capsule_data_size) {
+        resources->bufs_mr = ibv_reg_mr(opts->pd, resources->bufs,
+                                        opts->max_queue_depth * opts->in_capsule_data_size,
+                                        IBV_ACCESS_LOCAL_WRITE | IBV_ACCESS_REMOTE_WRITE);
+    }
+
+    if (!resources->cmds_mr || !resources->cpls_mr ||
+        (opts->in_capsule_data_size && !resources->bufs_mr)) {
+        SPDK_ERRLOG("Unable to register required memory for RDMA queue.\n");
+        goto cleanup;
+    }
+
+    SPDK_DEBUGLOG(SPDK_LOG_RDMA, "Command Array: %p Length: %lx LKey: %x\n",
+                  resources->cmds, opts->max_queue_depth * sizeof(*resources->cmds),
+                  resources->cmds_mr->lkey);
+    SPDK_DEBUGLOG(SPDK_LOG_RDMA, "Completion Array: %p Length: %lx LKey: %x\n",
+                  resources->cpls, opts->max_queue_depth * sizeof(*resources->cpls),
+                  resources->cpls_mr->lkey);
+    if (resources->bufs && resources->bufs_mr) {
+        SPDK_DEBUGLOG(SPDK_LOG_RDMA, "In Capsule Data Array: %p Length: %x LKey: %x\n",
+                      resources->bufs, opts->max_queue_depth * opts->in_capsule_data_size,
+                      resources->bufs_mr->lkey);
+    }
+
+    /* Initialize queues */
+    STAILQ_INIT(&resources->incoming_queue);
+    STAILQ_INIT(&resources->free_queue);
+
+    for (i = 0; i < opts->max_queue_depth; i++) {
+        struct ibv_recv_wr *bad_wr = NULL;
+
+        rdma_recv = &resources->recvs[i];
+        rdma_recv->qpair = opts->qpair;
+
+        /* Set up memory to receive commands */
+        if (resources->bufs) {
+            rdma_recv->buf = (void *)((uintptr_t)resources->bufs +
+                                      (i * opts->in_capsule_data_size));
+        }
+
+        rdma_recv->rdma_wr.type = RDMA_WR_TYPE_RECV;
+
+        rdma_recv->sgl[0].addr = (uintptr_t)&resources->cmds[i];
+        rdma_recv->sgl[0].length = sizeof(resources->cmds[i]);
+        rdma_recv->sgl[0].lkey = resources->cmds_mr->lkey;
+        rdma_recv->wr.num_sge = 1;
+
+        if (rdma_recv->buf && resources->bufs_mr) {
+            rdma_recv->sgl[1].addr = (uintptr_t)rdma_recv->buf;
+            rdma_recv->sgl[1].length = opts->in_capsule_data_size;
+            rdma_recv->sgl[1].lkey = resources->bufs_mr->lkey;
+            rdma_recv->wr.num_sge++;
+        }
+
+        rdma_recv->wr.wr_id = (uintptr_t)&rdma_recv->rdma_wr;
+        rdma_recv->wr.sg_list = rdma_recv->sgl;
+
+        if (opts->shared) {
+            srq = (struct ibv_srq *)opts->qp;
+            rc = ibv_post_srq_recv(srq, &rdma_recv->wr, &bad_wr);
+        } else {
+            qp = (struct ibv_qp *)opts->qp;
+            rc = ibv_post_recv(qp, &rdma_recv->wr, &bad_wr);
+        }
+        if (rc) {
+            SPDK_ERRLOG("Unable to post capsule for RDMA RECV\n");
+            goto cleanup;
+        }
+    }
+
+    for (i = 0; i < opts->max_queue_depth; i++) {
+        rdma_req = &resources->reqs[i];
+
+        if (opts->qpair != NULL) {
+            rdma_req->req.qpair = &opts->qpair->qpair;
+        } else {
+            rdma_req->req.qpair = NULL;
+        }
+        rdma_req->req.cmd = NULL;
+
+        /* Set up memory to send responses */
+        rdma_req->req.rsp = &resources->cpls[i];
+
+        rdma_req->rsp.sgl[0].addr = (uintptr_t)&resources->cpls[i];
+        rdma_req->rsp.sgl[0].length = sizeof(resources->cpls[i]);
+        rdma_req->rsp.sgl[0].lkey = resources->cpls_mr->lkey;
+
+        rdma_req->rsp.rdma_wr.type = RDMA_WR_TYPE_SEND;
+        rdma_req->rsp.wr.wr_id = (uintptr_t)&rdma_req->rsp.rdma_wr;
+        rdma_req->rsp.wr.next = NULL;
+        rdma_req->rsp.wr.opcode = IBV_WR_SEND;
+        rdma_req->rsp.wr.send_flags = IBV_SEND_SIGNALED;
+        rdma_req->rsp.wr.sg_list = rdma_req->rsp.sgl;
+        rdma_req->rsp.wr.num_sge = SPDK_COUNTOF(rdma_req->rsp.sgl);
+
+        /* Set up memory for data buffers */
+        rdma_req->data.rdma_wr.type = RDMA_WR_TYPE_DATA;
+        rdma_req->data.wr.wr_id = (uintptr_t)&rdma_req->data.rdma_wr;
+        rdma_req->data.wr.next = NULL;
+        rdma_req->data.wr.send_flags = IBV_SEND_SIGNALED;
+        rdma_req->data.wr.sg_list = rdma_req->data.sgl;
+        rdma_req->data.wr.num_sge = SPDK_COUNTOF(rdma_req->data.sgl);
+
+        /* Initialize request state to FREE */
+        rdma_req->state = RDMA_REQUEST_STATE_FREE;
+        STAILQ_INSERT_TAIL(&resources->free_queue, rdma_req, state_link);
+    }
+
+    return resources;
+
+cleanup:
+    nvmf_rdma_resources_destroy(resources);
+    return NULL;
+}
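Both loops above feed simple singly-linked pools. A hypothetical consumer of the returned object (the STAILQ accessors are the standard <sys/queue.h> macros; error handling is elided) checks a request out of free_queue and recycles it once the command completes:

    struct spdk_nvmf_rdma_request *rdma_req;

    /* Check a free request out of the pool. */
    rdma_req = STAILQ_FIRST(&resources->free_queue);
    if (rdma_req == NULL) {
        return -ENOMEM; /* pool exhausted; the caller must retry later */
    }
    STAILQ_REMOVE_HEAD(&resources->free_queue, state_link);

    /* ... execute the NVMe command carried in rdma_req->req ... */

    /* On completion, return the request to the free list. */
    rdma_req->state = RDMA_REQUEST_STATE_FREE;
    STAILQ_INSERT_TAIL(&resources->free_queue, rdma_req, state_link);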
static void
spdk_nvmf_rdma_qpair_destroy(struct spdk_nvmf_rdma_qpair *rqpair)
{
@@ -680,18 +847,16 @@ spdk_nvmf_rdma_qpair_destroy(struct spdk_nvmf_rdma_qpair *rqpair)
static int
spdk_nvmf_rdma_qpair_initialize(struct spdk_nvmf_qpair *qpair)
{
    struct spdk_nvmf_rdma_qpair *rqpair;
    struct spdk_nvmf_rdma_poller *rpoller;
    int rc, num_cqe, required_num_wr;
#ifndef SPDK_CONFIG_RDMA_SRQ
-    int i;
-    struct spdk_nvmf_rdma_recv *rdma_recv;
-    struct spdk_nvmf_rdma_request *rdma_req;
    struct spdk_nvmf_rdma_transport *rtransport;
    struct spdk_nvmf_transport *transport;
+    struct spdk_nvmf_rdma_resource_opts opts;
#endif
    struct spdk_nvmf_rdma_device *device;
    struct ibv_qp_init_attr ibv_init_attr;
rqpair = SPDK_CONTAINEROF(qpair, struct spdk_nvmf_rdma_qpair, qpair);
device = rqpair->port->device;
@@ -767,145 +932,30 @@ spdk_nvmf_rdma_qpair_initialize(struct spdk_nvmf_qpair *qpair)
rtransport = SPDK_CONTAINEROF(qpair->transport, struct spdk_nvmf_rdma_transport, transport);
transport = &rtransport->transport;
-    rqpair->resources->reqs = calloc(rqpair->max_queue_depth, sizeof(*rqpair->resources->reqs));
-    rqpair->resources->recvs = calloc(rqpair->max_queue_depth, sizeof(*rqpair->resources->recvs));
-    rqpair->resources->cmds = spdk_dma_zmalloc(rqpair->max_queue_depth * sizeof(*rqpair->resources->cmds),
-                                               0x1000, NULL);
-    rqpair->resources->cpls = spdk_dma_zmalloc(rqpair->max_queue_depth * sizeof(*rqpair->resources->cpls),
-                                               0x1000, NULL);
+    opts.qp = rqpair->cm_id->qp;
+    opts.pd = rqpair->cm_id->pd;
+    opts.qpair = rqpair;
+    opts.shared = false;
+    opts.max_queue_depth = rqpair->max_queue_depth;
+    opts.in_capsule_data_size = transport->opts.in_capsule_data_size;
+
+    rqpair->resources = nvmf_rdma_resources_create(&opts);
-    if (transport->opts.in_capsule_data_size > 0) {
-        rqpair->resources->bufs = spdk_dma_zmalloc(rqpair->max_queue_depth *
-                                                   transport->opts.in_capsule_data_size,
-                                                   0x1000, NULL);
-    }
-
-    if (!rqpair->resources->reqs || !rqpair->resources->recvs || !rqpair->resources->cmds ||
-        !rqpair->resources->cpls || (transport->opts.in_capsule_data_size && !rqpair->resources->bufs)) {
-        SPDK_ERRLOG("Unable to allocate sufficient memory for RDMA queue.\n");
+    if (!rqpair->resources) {
+        SPDK_ERRLOG("Unable to allocate resources for receive queue.\n");
+        rdma_destroy_id(rqpair->cm_id);
+        rqpair->cm_id = NULL;
+        spdk_nvmf_rdma_qpair_destroy(rqpair);
+        return -1;
+    }
-    rqpair->resources->cmds_mr = ibv_reg_mr(rqpair->cm_id->pd, rqpair->resources->cmds,
-                                            rqpair->max_queue_depth * sizeof(*rqpair->resources->cmds),
-                                            IBV_ACCESS_LOCAL_WRITE);
-    rqpair->resources->cpls_mr = ibv_reg_mr(rqpair->cm_id->pd, rqpair->resources->cpls,
-                                            rqpair->max_queue_depth * sizeof(*rqpair->resources->cpls),
-                                            0);
-
-    if (transport->opts.in_capsule_data_size) {
-        rqpair->resources->bufs_mr = ibv_reg_mr(rqpair->cm_id->pd, rqpair->resources->bufs,
-                                                rqpair->max_queue_depth *
-                                                transport->opts.in_capsule_data_size,
-                                                IBV_ACCESS_LOCAL_WRITE | IBV_ACCESS_REMOTE_WRITE);
-    }
-
-    if (!rqpair->resources->cmds_mr || !rqpair->resources->cpls_mr ||
-        (transport->opts.in_capsule_data_size && !rqpair->resources->bufs_mr)) {
-        SPDK_ERRLOG("Unable to register required memory for RDMA queue.\n");
-        spdk_nvmf_rdma_qpair_destroy(rqpair);
-        return -1;
-    }
-
-    SPDK_DEBUGLOG(SPDK_LOG_RDMA, "Command Array: %p Length: %lx LKey: %x\n",
-                  rqpair->resources->cmds, rqpair->max_queue_depth * sizeof(*rqpair->resources->cmds),
-                  rqpair->resources->cmds_mr->lkey);
-    SPDK_DEBUGLOG(SPDK_LOG_RDMA, "Completion Array: %p Length: %lx LKey: %x\n",
-                  rqpair->resources->cpls, rqpair->max_queue_depth * sizeof(*rqpair->resources->cpls),
-                  rqpair->resources->cpls_mr->lkey);
-
-    if (rqpair->resources->bufs && rqpair->resources->bufs_mr) {
-        SPDK_DEBUGLOG(SPDK_LOG_RDMA, "In Capsule Data Array: %p Length: %x LKey: %x\n",
-                      rqpair->resources->bufs, rqpair->max_queue_depth *
-                      transport->opts.in_capsule_data_size, rqpair->resources->bufs_mr->lkey);
-    }
#else
rqpair->resources = rqpair->poller->resources;
#endif
-    rqpair->current_recv_depth = rqpair->max_queue_depth;
+    rqpair->current_recv_depth = 0;
STAILQ_INIT(&rqpair->pending_rdma_read_queue);
STAILQ_INIT(&rqpair->pending_rdma_write_queue);
-#ifndef SPDK_CONFIG_RDMA_SRQ
-    STAILQ_INIT(&rqpair->resources->free_queue);
-
-    for (i = 0; i < rqpair->max_queue_depth; i++) {
-        struct ibv_recv_wr *bad_wr = NULL;
-
-        rdma_recv = &rqpair->resources->recvs[i];
-        rdma_recv->qpair = rqpair;
-
-        /* Set up memory to receive commands */
-        if (rqpair->resources->bufs) {
-            rdma_recv->buf = (void *)((uintptr_t)rqpair->resources->bufs +
-                                      (i * transport->opts.in_capsule_data_size));
-        }
-
-        rdma_recv->rdma_wr.type = RDMA_WR_TYPE_RECV;
-
-        rdma_recv->sgl[0].addr = (uintptr_t)&rqpair->resources->cmds[i];
-        rdma_recv->sgl[0].length = sizeof(rqpair->resources->cmds[i]);
-        rdma_recv->sgl[0].lkey = rqpair->resources->cmds_mr->lkey;
-        rdma_recv->wr.num_sge = 1;
-
-        if (rdma_recv->buf && rqpair->resources->bufs_mr) {
-            rdma_recv->sgl[1].addr = (uintptr_t)rdma_recv->buf;
-            rdma_recv->sgl[1].length = transport->opts.in_capsule_data_size;
-            rdma_recv->sgl[1].lkey = rqpair->resources->bufs_mr->lkey;
-            rdma_recv->wr.num_sge++;
-        }
-
-        rdma_recv->wr.wr_id = (uintptr_t)&rdma_recv->rdma_wr;
-        rdma_recv->wr.sg_list = rdma_recv->sgl;
-
-        rc = ibv_post_recv(rqpair->cm_id->qp, &rdma_recv->wr, &bad_wr);
-        assert(rqpair->current_recv_depth > 0);
-        rqpair->current_recv_depth--;
-        if (rc) {
-            SPDK_ERRLOG("Unable to post capsule for RDMA RECV\n");
-            spdk_nvmf_rdma_qpair_destroy(rqpair);
-            return -1;
-        }
-    }
-    assert(rqpair->current_recv_depth == 0);
-
-    for (i = 0; i < rqpair->max_queue_depth; i++) {
-        rdma_req = &rqpair->resources->reqs[i];
-
-        rdma_req->req.qpair = &rqpair->qpair;
-        rdma_req->req.cmd = NULL;
-
-        /* Set up memory to send responses */
-        rdma_req->req.rsp = &rqpair->resources->cpls[i];
-
-        rdma_req->rsp.sgl[0].addr = (uintptr_t)&rqpair->resources->cpls[i];
-        rdma_req->rsp.sgl[0].length = sizeof(rqpair->resources->cpls[i]);
-        rdma_req->rsp.sgl[0].lkey = rqpair->resources->cpls_mr->lkey;
-
-        rdma_req->rsp.rdma_wr.type = RDMA_WR_TYPE_SEND;
-        rdma_req->rsp.wr.wr_id = (uintptr_t)&rdma_req->rsp.rdma_wr;
-        rdma_req->rsp.wr.next = NULL;
-        rdma_req->rsp.wr.opcode = IBV_WR_SEND;
-        rdma_req->rsp.wr.send_flags = IBV_SEND_SIGNALED;
-        rdma_req->rsp.wr.sg_list = rdma_req->rsp.sgl;
-        rdma_req->rsp.wr.num_sge = SPDK_COUNTOF(rdma_req->rsp.sgl);
-
-        /* Set up memory for data buffers */
-        rdma_req->data.rdma_wr.type = RDMA_WR_TYPE_DATA;
-        rdma_req->data.wr.wr_id = (uintptr_t)&rdma_req->data.rdma_wr;
-        rdma_req->data.wr.next = NULL;
-        rdma_req->data.wr.send_flags = IBV_SEND_SIGNALED;
-        rdma_req->data.wr.sg_list = rdma_req->data.sgl;
-        rdma_req->data.wr.num_sge = SPDK_COUNTOF(rdma_req->data.sgl);
-
-        /* Initialize request state to FREE */
-        rdma_req->state = RDMA_REQUEST_STATE_FREE;
-        STAILQ_INSERT_HEAD(&rqpair->resources->free_queue, rdma_req, state_link);
-    }
-#endif
return 0;
}
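For a sense of scale in the non-SRQ build: an NVMe command capsule header is 64 bytes and a completion is 16 bytes, so with an illustrative max_queue_depth of 128 and in_capsule_data_size of 4096, one qpair's pool registers 128 * (64 + 16 + 4096) = 534,528 bytes (about 522 KiB) with ibv_reg_mr(), plus the unregistered reqs and recvs bookkeeping arrays.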
@@ -1130,18 +1180,6 @@ nvmf_rdma_connect(struct spdk_nvmf_transport *transport, struct rdma_cm_event *e
return -1;
}
-#ifndef SPDK_CONFIG_RDMA_SRQ
-    rqpair->resources = calloc(1, sizeof(struct spdk_nvmf_rdma_resources));
-    if (rqpair->resources == NULL) {
-        SPDK_ERRLOG("Unable to allocate resources for rdma qpair\n");
-        free(rqpair);
-        spdk_nvmf_rdma_event_reject(event->id, SPDK_NVMF_RDMA_ERROR_NO_RESOURCES);
-        return -1;
-    }
-    STAILQ_INIT(&rqpair->resources->incoming_queue);
-    STAILQ_INIT(&rqpair->resources->free_queue);
-#endif
rqpair->port = port;
rqpair->max_queue_depth = max_queue_depth;
rqpair->max_read_depth = max_read_depth;
@@ -2556,10 +2594,9 @@ spdk_nvmf_rdma_poll_group_create(struct spdk_nvmf_transport *transport)
struct spdk_nvmf_rdma_poller *poller;
struct spdk_nvmf_rdma_device *device;
#ifdef SPDK_CONFIG_RDMA_SRQ
int i, rc;
struct ibv_srq_init_attr srq_init_attr;
-    struct spdk_nvmf_rdma_recv *rdma_recv;
-    struct spdk_nvmf_rdma_request *rdma_req;
+    struct spdk_nvmf_rdma_resource_opts opts;
#endif
rtransport = SPDK_CONTAINEROF(transport, struct spdk_nvmf_rdma_transport, transport);
@@ -2612,150 +2649,19 @@ spdk_nvmf_rdma_poll_group_create(struct spdk_nvmf_transport *transport)
return NULL;
}
-    poller->resources = calloc(1, sizeof(struct spdk_nvmf_rdma_resources));
+    opts.qp = poller->srq;
+    opts.pd = device->pd;
+    opts.qpair = NULL;
+    opts.shared = true;
+    opts.max_queue_depth = poller->max_srq_depth;
+    opts.in_capsule_data_size = transport->opts.in_capsule_data_size;
+
+    poller->resources = nvmf_rdma_resources_create(&opts);
+    if (!poller->resources) {
+        SPDK_ERRLOG("Unable to allocate resources for shared receive queue.\n");
+        spdk_nvmf_rdma_poll_group_destroy(&rgroup->group);
+        pthread_mutex_unlock(&rtransport->lock);
+        return NULL;
+    }
SPDK_DEBUGLOG(SPDK_LOG_RDMA, "Created RDMA SRQ %p: max_wr %u, max_sge %u, srq_limit %u\n",
poller->srq,
srq_init_attr.attr.max_wr,
srq_init_attr.attr.max_sge,
srq_init_attr.attr.srq_limit);
-    poller->resources->reqs = calloc(poller->max_srq_depth, sizeof(*poller->resources->reqs));
-    poller->resources->recvs = calloc(poller->max_srq_depth, sizeof(*poller->resources->recvs));
-    poller->resources->cmds = spdk_dma_zmalloc(poller->max_srq_depth * sizeof(*poller->resources->cmds),
-                                               0x1000, NULL);
-    poller->resources->cpls = spdk_dma_zmalloc(poller->max_srq_depth * sizeof(*poller->resources->cpls),
-                                               0x1000, NULL);
-
-    if (transport->opts.in_capsule_data_size > 0) {
-        poller->resources->bufs = spdk_dma_zmalloc(poller->max_srq_depth *
-                                                   transport->opts.in_capsule_data_size,
-                                                   0x1000, NULL);
-    }
-
-    if (!poller->resources->reqs || !poller->resources->recvs || !poller->resources->cmds ||
-        !poller->resources->cpls || (transport->opts.in_capsule_data_size && !poller->resources->bufs)) {
-        SPDK_ERRLOG("Unable to allocate sufficient memory for RDMA shared receive queue.\n");
-        spdk_nvmf_rdma_poll_group_destroy(&rgroup->group);
-        pthread_mutex_unlock(&rtransport->lock);
-        return NULL;
-    }
-
-    poller->resources->cmds_mr = ibv_reg_mr(device->pd, poller->resources->cmds,
-                                            poller->max_srq_depth * sizeof(*poller->resources->cmds),
-                                            IBV_ACCESS_LOCAL_WRITE);
-    poller->resources->cpls_mr = ibv_reg_mr(device->pd, poller->resources->cpls,
-                                            poller->max_srq_depth * sizeof(*poller->resources->cpls),
-                                            0);
-
-    if (transport->opts.in_capsule_data_size) {
-        poller->resources->bufs_mr = ibv_reg_mr(device->pd, poller->resources->bufs,
-                                                poller->max_srq_depth *
-                                                transport->opts.in_capsule_data_size,
-                                                IBV_ACCESS_LOCAL_WRITE | IBV_ACCESS_REMOTE_WRITE);
-    }
-
-    if (!poller->resources->cmds_mr || !poller->resources->cpls_mr ||
-        (transport->opts.in_capsule_data_size && !poller->resources->bufs_mr)) {
-        SPDK_ERRLOG("Unable to register required memory for RDMA shared receive queue.\n");
-        spdk_nvmf_rdma_poll_group_destroy(&rgroup->group);
-        pthread_mutex_unlock(&rtransport->lock);
-        return NULL;
-    }
-
-    SPDK_DEBUGLOG(SPDK_LOG_RDMA, "Command Array: %p Length: %lx LKey: %x\n",
-                  poller->resources->cmds, poller->max_srq_depth * sizeof(*poller->resources->cmds),
-                  poller->resources->cmds_mr->lkey);
-    SPDK_DEBUGLOG(SPDK_LOG_RDMA, "Completion Array: %p Length: %lx LKey: %x\n",
-                  poller->resources->cpls, poller->max_srq_depth * sizeof(*poller->resources->cpls),
-                  poller->resources->cpls_mr->lkey);
-
-    if (poller->resources->bufs && poller->resources->bufs_mr) {
-        SPDK_DEBUGLOG(SPDK_LOG_RDMA, "In Capsule Data Array: %p Length: %x LKey: %x\n",
-                      poller->resources->bufs, poller->max_srq_depth *
-                      transport->opts.in_capsule_data_size, poller->resources->bufs_mr->lkey);
-    }
-
-    /* Initialize queues */
-    STAILQ_INIT(&poller->resources->incoming_queue);
-    STAILQ_INIT(&poller->resources->free_queue);
-
-    for (i = 0; i < poller->max_srq_depth; i++) {
-        struct ibv_recv_wr *bad_wr = NULL;
-
-        rdma_recv = &poller->resources->recvs[i];
-        rdma_recv->qpair = NULL;
-
-        /* Set up memory to receive commands */
-        if (poller->resources->bufs) {
-            rdma_recv->buf = (void *)((uintptr_t)poller->resources->bufs +
-                                      (i * transport->opts.in_capsule_data_size));
-        }
-
-        rdma_recv->rdma_wr.type = RDMA_WR_TYPE_RECV;
-
-        rdma_recv->sgl[0].addr = (uintptr_t)&poller->resources->cmds[i];
-        rdma_recv->sgl[0].length = sizeof(poller->resources->cmds[i]);
-        rdma_recv->sgl[0].lkey = poller->resources->cmds_mr->lkey;
-        rdma_recv->wr.num_sge = 1;
-
-        if (rdma_recv->buf && poller->resources->bufs_mr) {
-            rdma_recv->sgl[1].addr = (uintptr_t)rdma_recv->buf;
-            rdma_recv->sgl[1].length = transport->opts.in_capsule_data_size;
-            rdma_recv->sgl[1].lkey = poller->resources->bufs_mr->lkey;
-            rdma_recv->wr.num_sge++;
-        }
-
-        rdma_recv->wr.wr_id = (uintptr_t)&rdma_recv->rdma_wr;
-        rdma_recv->wr.sg_list = rdma_recv->sgl;
-
-        rc = ibv_post_srq_recv(poller->srq, &rdma_recv->wr, &bad_wr);
-        if (rc) {
-            SPDK_ERRLOG("Unable to post capsule for RDMA RECV\n");
-            spdk_nvmf_rdma_poll_group_destroy(&rgroup->group);
-            pthread_mutex_unlock(&rtransport->lock);
-            return NULL;
-        }
-    }
-
-    for (i = 0; i < poller->max_srq_depth; i++) {
-        rdma_req = &poller->resources->reqs[i];
-
-        rdma_req->req.qpair = NULL;
-        rdma_req->req.cmd = NULL;
-
-        /* Set up memory to send responses */
-        rdma_req->req.rsp = &poller->resources->cpls[i];
-
-        rdma_req->rsp.sgl[0].addr = (uintptr_t)&poller->resources->cpls[i];
-        rdma_req->rsp.sgl[0].length = sizeof(poller->resources->cpls[i]);
-        rdma_req->rsp.sgl[0].lkey = poller->resources->cpls_mr->lkey;
-
-        rdma_req->rsp.rdma_wr.type = RDMA_WR_TYPE_SEND;
-        rdma_req->rsp.wr.wr_id = (uintptr_t)&rdma_req->rsp.rdma_wr;
-        rdma_req->rsp.wr.next = NULL;
-        rdma_req->rsp.wr.opcode = IBV_WR_SEND;
-        rdma_req->rsp.wr.send_flags = IBV_SEND_SIGNALED;
-        rdma_req->rsp.wr.sg_list = rdma_req->rsp.sgl;
-        rdma_req->rsp.wr.num_sge = SPDK_COUNTOF(rdma_req->rsp.sgl);
-
-        /* Set up memory for data buffers */
-        rdma_req->data.rdma_wr.type = RDMA_WR_TYPE_DATA;
-        rdma_req->data.wr.wr_id = (uintptr_t)&rdma_req->data.rdma_wr;
-        rdma_req->data.wr.next = NULL;
-        rdma_req->data.wr.send_flags = IBV_SEND_SIGNALED;
-        rdma_req->data.wr.sg_list = rdma_req->data.sgl;
-        rdma_req->data.wr.num_sge = SPDK_COUNTOF(rdma_req->data.sgl);
-
-        /* Initialize request state to FREE */
-        rdma_req->state = RDMA_REQUEST_STATE_FREE;
-        STAILQ_INSERT_TAIL(&poller->resources->free_queue, rdma_req, state_link);
-    }
#endif
}
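The opts block above assumes poller->srq was created just before it (the "Created RDMA SRQ" log still prints its attributes). A sketch of that creation, with illustrative attribute values; ibv_create_srq() and struct ibv_srq_init_attr come from <infiniband/verbs.h>:

    struct ibv_srq_init_attr srq_init_attr;

    memset(&srq_init_attr, 0, sizeof(srq_init_attr));
    srq_init_attr.attr.max_wr = poller->max_srq_depth;
    srq_init_attr.attr.max_sge = 2; /* one SGE for the command, one for in-capsule data */

    poller->srq = ibv_create_srq(device->pd, &srq_init_attr);
    if (poller->srq == NULL) {
        SPDK_ERRLOG("Unable to create shared receive queue, errno %d\n", errno);
        /* tear down the poll group, as in the error paths above */
    }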
@@ -2841,10 +2747,6 @@ spdk_nvmf_rdma_poll_group_add(struct spdk_nvmf_transport_poll_group *group,
TAILQ_INSERT_TAIL(&poller->qpairs, rqpair, link);
rqpair->poller = poller;
-#ifdef SPDK_CONFIG_RDMA_SRQ
-    rqpair->resources = rqpair->poller->resources;
-#endif
rc = spdk_nvmf_rdma_qpair_initialize(qpair);
if (rc < 0) {
SPDK_ERRLOG("Failed to initialize nvmf_rdma_qpair with qpair=%p\n", qpair);