blob: split readv/writev as separate blob calls

Now all operations (both single-buffer and iov-based
payloads) that span a cluster boundary get split into
separate blob calls for each cluster.

This will simplify upcoming patches that need to perform
special handling when a cluster must be allocated: that
handling can now be added to just the single-cluster
operations, without having to account for splits.  It
will also simplify the code that will eventually queue
requests needing a cluster allocation while another
allocation is already in progress.
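
For illustration only, here is a minimal standalone sketch of the
splitting idea (hypothetical toy_* names and cluster size, not the
SPDK code): each chunk is capped at the distance to the next cluster
boundary, so every resulting blob call touches exactly one cluster.

#include <stdint.h>
#include <stdio.h>

/* Hypothetical pages-per-cluster count, for this sketch only. */
#define TOY_CLUSTER_PAGES 64u

/* Pages left before the next cluster boundary, mirroring the idea
 * behind _spdk_bs_num_pages_to_cluster_boundary(). */
static uint64_t
toy_pages_to_boundary(uint64_t page_offset)
{
	return TOY_CLUSTER_PAGES - (page_offset % TOY_CLUSTER_PAGES);
}

/* Split an I/O of `length` pages starting at `page_offset` into
 * chunks that never cross a cluster boundary; each chunk would
 * become one single-cluster blob call. */
static void
toy_split_io(uint64_t page_offset, uint64_t length)
{
	while (length > 0) {
		uint64_t max = toy_pages_to_boundary(page_offset);
		uint64_t chunk = length < max ? length : max;

		printf("blob call: offset=%ju pages=%ju\n",
		       (uintmax_t)page_offset, (uintmax_t)chunk);
		page_offset += chunk;
		length -= chunk;
	}
}

int
main(void)
{
	/* 100 pages starting at page 60 -> calls of 4, 64 and 32 pages. */
	toy_split_io(60, 100);
	return 0;
}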

Signed-off-by: Jim Harris <james.r.harris@intel.com>
Change-Id: I22a850e6e23e3b3be31183b28d01d57c163b25b1

Reviewed-on: https://review.gerrithub.io/395035
Reviewed-by: Ben Walker <benjamin.walker@intel.com>
Reviewed-by: Maciej Szwed <maciej.szwed@intel.com>
Reviewed-by: Daniel Verkamp <daniel.verkamp@intel.com>
Tested-by: SPDK Automated Test System <sys_sgsw@intel.com>

@@ -1318,7 +1318,10 @@ _spdk_blob_request_submit_op(struct spdk_blob *_blob, struct spdk_io_channel *_c
 }
 
 struct rw_iov_ctx {
-	struct spdk_blob_data *blob;
+	struct spdk_blob *blob;
+	struct spdk_io_channel *channel;
+	spdk_blob_op_complete cb_fn;
+	void *cb_arg;
 	bool read;
 	int iovcnt;
 	struct iovec *orig_iov;
@@ -1336,27 +1339,25 @@ _spdk_rw_iov_done(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
 }
 
 static void
-_spdk_rw_iov_split_next(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
+_spdk_rw_iov_split_next(void *cb_arg, int bserrno)
 {
 	struct rw_iov_ctx *ctx = cb_arg;
+	struct spdk_blob_data *blob = __blob_to_data(ctx->blob);
 	struct iovec *iov, *orig_iov;
 	int iovcnt;
 	size_t orig_iovoff;
-	uint64_t lba;
-	uint64_t page_count, pages_to_boundary;
-	uint32_t lba_count;
+	uint64_t page_count, pages_to_boundary, page_offset;
 	uint64_t byte_count;
 
 	if (bserrno != 0 || ctx->pages_remaining == 0) {
+		ctx->cb_fn(ctx->cb_arg, bserrno);
 		free(ctx);
-		spdk_bs_sequence_finish(seq, bserrno);
 		return;
 	}
 
-	pages_to_boundary = _spdk_bs_num_pages_to_cluster_boundary(ctx->blob, ctx->page_offset);
+	page_offset = ctx->page_offset;
+	pages_to_boundary = _spdk_bs_num_pages_to_cluster_boundary(blob, page_offset);
 	page_count = spdk_min(ctx->pages_remaining, pages_to_boundary);
-	lba = _spdk_bs_blob_page_to_lba(ctx->blob, ctx->page_offset);
-	lba_count = _spdk_bs_page_to_lba(ctx->blob->bs, page_count);
 
 	/*
 	 * Get index and offset into the original iov array for our current position in the I/O sequence.
@@ -1399,9 +1400,11 @@ _spdk_rw_iov_split_next(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
 	iov = &ctx->iov[0];
 
 	if (ctx->read) {
-		spdk_bs_sequence_readv_dev(seq, iov, iovcnt, lba, lba_count, _spdk_rw_iov_split_next, ctx);
+		spdk_bs_io_readv_blob(ctx->blob, ctx->channel, iov, iovcnt, page_offset,
+				      page_count, _spdk_rw_iov_split_next, ctx);
 	} else {
-		spdk_bs_sequence_writev_dev(seq, iov, iovcnt, lba, lba_count, _spdk_rw_iov_split_next, ctx);
+		spdk_bs_io_writev_blob(ctx->blob, ctx->channel, iov, iovcnt, page_offset,
+				       page_count, _spdk_rw_iov_split_next, ctx);
 	}
 }
@@ -1431,10 +1434,6 @@ _spdk_blob_request_submit_rw_iov(struct spdk_blob *_blob, struct spdk_io_channel
 		return;
 	}
 
-	cpl.type = SPDK_BS_CPL_TYPE_BLOB_BASIC;
-	cpl.u.blob_basic.cb_fn = cb_fn;
-	cpl.u.blob_basic.cb_arg = cb_arg;
-
 	/*
 	 * For now, we implement readv/writev using a sequence (instead of a batch) to account for having
 	 * to split a request that spans a cluster boundary. For I/O that do not span a cluster boundary,
@@ -1449,16 +1448,20 @@ _spdk_blob_request_submit_rw_iov(struct spdk_blob *_blob, struct spdk_io_channel
 	 * in a batch. That would also require creating an intermediate spdk_bs_cpl that would get called
 	 * when the batch was completed, to allow for freeing the memory for the iov arrays.
 	 */
-	seq = spdk_bs_sequence_start(_channel, &cpl);
-	if (!seq) {
-		cb_fn(cb_arg, -ENOMEM);
-		return;
-	}
-
 	if (spdk_likely(length <= _spdk_bs_num_pages_to_cluster_boundary(blob, offset))) {
 		uint64_t lba = _spdk_bs_blob_page_to_lba(blob, offset);
 		uint32_t lba_count = _spdk_bs_page_to_lba(blob->bs, length);
 
+		cpl.type = SPDK_BS_CPL_TYPE_BLOB_BASIC;
+		cpl.u.blob_basic.cb_fn = cb_fn;
+		cpl.u.blob_basic.cb_arg = cb_arg;
+
+		seq = spdk_bs_sequence_start(_channel, &cpl);
+		if (!seq) {
+			cb_fn(cb_arg, -ENOMEM);
+			return;
+		}
+
 		if (read) {
 			spdk_bs_sequence_readv_dev(seq, iov, iovcnt, lba, lba_count, _spdk_rw_iov_done, NULL);
 		} else {
@@ -1469,11 +1472,14 @@ _spdk_blob_request_submit_rw_iov(struct spdk_blob *_blob, struct spdk_io_channel
 
 		ctx = calloc(1, sizeof(struct rw_iov_ctx) + iovcnt * sizeof(struct iovec));
 		if (ctx == NULL) {
-			spdk_bs_sequence_finish(seq, -ENOMEM);
+			cb_fn(cb_arg, -ENOMEM);
 			return;
 		}
 
-		ctx->blob = blob;
+		ctx->blob = _blob;
+		ctx->channel = _channel;
+		ctx->cb_fn = cb_fn;
+		ctx->cb_arg = cb_arg;
 		ctx->read = read;
 		ctx->orig_iov = iov;
 		ctx->iovcnt = iovcnt;
@@ -1481,7 +1487,7 @@ _spdk_blob_request_submit_rw_iov(struct spdk_blob *_blob, struct spdk_io_channel
 		ctx->pages_remaining = length;
 		ctx->pages_done = 0;
 
-		_spdk_rw_iov_split_next(seq, ctx, 0);
+		_spdk_rw_iov_split_next(ctx, 0);
 	}
 }
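
Note on the split path above: each per-cluster sub-I/O now names
_spdk_rw_iov_split_next() itself as its completion callback, so the
next chunk is submitted only when the previous one finishes, and the
caller's callback fires once pages_remaining reaches zero (or on the
first error).  Below is a rough, self-contained sketch of that
self-chaining pattern; the toy_* names are hypothetical and the
"async" call completes inline purely for brevity.

#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>

typedef void (*toy_op_complete)(void *cb_arg, int toyerrno);

struct toy_ctx {
	uint64_t page_offset;
	uint64_t pages_remaining;
	toy_op_complete cb_fn;	/* user's callback, fired once at the end */
	void *cb_arg;
};

/* Stand-in for an asynchronous single-cluster blob call; here it
 * "completes" immediately by invoking its callback inline. */
static void
toy_submit_one_cluster(uint64_t offset, uint64_t pages,
		       toy_op_complete cb_fn, void *cb_arg)
{
	printf("submit: offset=%ju pages=%ju\n",
	       (uintmax_t)offset, (uintmax_t)pages);
	cb_fn(cb_arg, 0);
}

static void
toy_split_next(void *cb_arg, int toyerrno)
{
	struct toy_ctx *ctx = cb_arg;
	uint64_t chunk;

	if (toyerrno != 0 || ctx->pages_remaining == 0) {
		/* Done or failed: complete the original request, free state. */
		ctx->cb_fn(ctx->cb_arg, toyerrno);
		free(ctx);
		return;
	}

	/* Toy stand-in for capping the chunk at the cluster boundary. */
	chunk = ctx->pages_remaining > 64 ? 64 : ctx->pages_remaining;
	ctx->page_offset += chunk;
	ctx->pages_remaining -= chunk;

	/* The sub-I/O's completion callback is this function itself. */
	toy_submit_one_cluster(ctx->page_offset - chunk, chunk,
			       toy_split_next, ctx);
}

static void
toy_done(void *cb_arg, int toyerrno)
{
	(void)cb_arg;
	printf("request complete, rc=%d\n", toyerrno);
}

int
main(void)
{
	struct toy_ctx *ctx = calloc(1, sizeof(*ctx));

	if (ctx == NULL) {
		return 1;
	}
	ctx->pages_remaining = 150;
	ctx->cb_fn = toy_done;
	/* Kick off the chain the same way the real code does: one call
	 * to the splitter with bserrno == 0. */
	toy_split_next(ctx, 0);
	return 0;
}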