sock/posix: Add a pending list for asynchronous requests

Add an additional queue for requests that have been sent on the network
but aren't complete yet. As of this patch, the code
is still calling writev with no flags in the POSIX layer, so it completes
synchronously. That means requests pass through this new pending list
only very briefly inside of one function.

Change-Id: Iaab6efc118a6d5fe9589199515eb3a7293db4b8e
Signed-off-by: Ben Walker <benjamin.walker@intel.com>
Reviewed-on: https://review.gerrithub.io/c/spdk/spdk/+/471768
Tested-by: SPDK CI Jenkins <sys_sgci@intel.com>
Community-CI: SPDK CI Jenkins <sys_sgci@intel.com>
Reviewed-by: Shuhei Matsumoto <shuhei.matsumoto.xt@hitachi.com>
Reviewed-by: Or Gerlitz <gerlitz.or@gmail.com>
Reviewed-by: Jim Harris <james.r.harris@intel.com>
This commit is contained in:
Ben Walker 2019-10-18 10:50:27 -07:00 committed by Tomasz Zawadzki
parent b7ad942612
commit ab22d249e2
4 changed files with 28 additions and 3 deletions

View File

@ -58,6 +58,7 @@ struct spdk_sock {
int max_iovcnt;
TAILQ_HEAD(, spdk_sock_request) queued_reqs;
TAILQ_HEAD(, spdk_sock_request) pending_reqs;
int queued_iovcnt;
struct {
@ -127,15 +128,22 @@ spdk_sock_request_queue(struct spdk_sock *sock, struct spdk_sock_request *req)
sock->queued_iovcnt += req->iovcnt;
}
static inline void
spdk_sock_request_pend(struct spdk_sock *sock, struct spdk_sock_request *req)
{
TAILQ_REMOVE(&sock->queued_reqs, req, internal.link);
assert(sock->queued_iovcnt >= req->iovcnt);
sock->queued_iovcnt -= req->iovcnt;
TAILQ_INSERT_TAIL(&sock->pending_reqs, req, internal.link);
}
static inline int
spdk_sock_request_put(struct spdk_sock *sock, struct spdk_sock_request *req, int err)
{
bool closed;
int rc = 0;
TAILQ_REMOVE(&sock->queued_reqs, req, internal.link);
assert(sock->queued_iovcnt >= req->iovcnt);
sock->queued_iovcnt -= req->iovcnt;
TAILQ_REMOVE(&sock->pending_reqs, req, internal.link);
closed = sock->flags.closed;
sock->cb_cnt++;
@ -162,6 +170,15 @@ spdk_sock_abort_requests(struct spdk_sock *sock)
closed = sock->flags.closed;
sock->cb_cnt++;
req = TAILQ_FIRST(&sock->pending_reqs);
while (req) {
TAILQ_REMOVE(&sock->pending_reqs, req, internal.link);
req->cb_fn(req->cb_arg, -ECANCELED);
req = TAILQ_FIRST(&sock->pending_reqs);
}
req = TAILQ_FIRST(&sock->queued_reqs);
while (req) {
TAILQ_REMOVE(&sock->queued_reqs, req, internal.link);
@ -177,6 +194,7 @@ spdk_sock_abort_requests(struct spdk_sock *sock)
sock->cb_cnt--;
assert(TAILQ_EMPTY(&sock->queued_reqs));
assert(TAILQ_EMPTY(&sock->pending_reqs));
if (sock->cb_cnt == 0 && !closed && sock->flags.closed) {
/* The user closed the socket in response to a callback above. */

View File

@ -179,6 +179,7 @@ spdk_sock_connect(const char *ip, int port)
if (sock != NULL) {
sock->net_impl = impl;
TAILQ_INIT(&sock->queued_reqs);
TAILQ_INIT(&sock->pending_reqs);
return sock;
}
}
@ -214,6 +215,7 @@ spdk_sock_accept(struct spdk_sock *sock)
if (new_sock != NULL) {
new_sock->net_impl = sock->net_impl;
TAILQ_INIT(&new_sock->queued_reqs);
TAILQ_INIT(&new_sock->pending_reqs);
}
return new_sock;

View File

@ -524,6 +524,10 @@ _sock_flush(struct spdk_sock *sock)
/* Handled a full request. */
req->internal.offset = 0;
spdk_sock_request_pend(sock, req);
/* The writev syscall above isn't currently asynchronous,
* so it's already done. */
retval = spdk_sock_request_put(sock, req, 0);
if (rc == 0 || retval) {

View File

@ -62,6 +62,7 @@ flush(void)
/* Set up data structures */
TAILQ_INIT(&sock->queued_reqs);
TAILQ_INIT(&sock->pending_reqs);
sock->group_impl = &group.base;
req1 = calloc(1, sizeof(struct spdk_sock_request) + 2 * sizeof(struct iovec));