diff --git a/module/sock/posix/posix.c b/module/sock/posix/posix.c index 59125f823e..a23751357d 100644 --- a/module/sock/posix/posix.c +++ b/module/sock/posix/posix.c @@ -68,7 +68,8 @@ struct spdk_posix_sock { struct spdk_pipe *recv_pipe; void *recv_buf; int recv_buf_sz; - bool pending_events; + bool pipe_has_data; + bool socket_has_data; bool zcopy; int placement_id; @@ -76,12 +77,12 @@ struct spdk_posix_sock { TAILQ_ENTRY(spdk_posix_sock) link; }; -TAILQ_HEAD(spdk_pending_events_list, spdk_posix_sock); +TAILQ_HEAD(spdk_has_data_list, spdk_posix_sock); struct spdk_posix_sock_group_impl { struct spdk_sock_group_impl base; int fd; - struct spdk_pending_events_list pending_events; + struct spdk_has_data_list socks_with_data; int placement_id; }; @@ -904,13 +905,16 @@ posix_sock_recv_from_pipe(struct spdk_posix_sock *sock, struct iovec *diov, int spdk_pipe_reader_advance(sock->recv_pipe, bytes); - /* If we drained the pipe, take it off the pending_events list. The socket may still have data buffered - * in the kernel to receive, but this will be handled on the next poll call when we get the same EPOLLIN - * event again. */ - if (sock->base.group_impl && spdk_pipe_reader_bytes_available(sock->recv_pipe) == 0) { + /* If we drained the pipe, mark it appropriately */ + if (spdk_pipe_reader_bytes_available(sock->recv_pipe) == 0) { + assert(sock->pipe_has_data == true); + group = __posix_group_impl(sock->base.group_impl); - TAILQ_REMOVE(&group->pending_events, sock, link); - sock->pending_events = false; + if (group && !sock->socket_has_data) { + TAILQ_REMOVE(&group->socks_with_data, sock, link); + } + + sock->pipe_has_data = false; } return bytes; @@ -920,34 +924,46 @@ static inline ssize_t posix_sock_read(struct spdk_posix_sock *sock) { struct iovec iov[2]; - int bytes; + int bytes_avail, bytes_recvd; struct spdk_posix_sock_group_impl *group; - bytes = spdk_pipe_writer_get_buffer(sock->recv_pipe, sock->recv_buf_sz, iov); + bytes_avail = spdk_pipe_writer_get_buffer(sock->recv_pipe, sock->recv_buf_sz, iov); - if (bytes > 0) { - bytes = readv(sock->fd, iov, 2); - if (bytes > 0) { - spdk_pipe_writer_advance(sock->recv_pipe, bytes); - - /* For normal operation, this function is called in response to an EPOLLIN - * event, which already placed the socket onto the pending_events list. - * But between polls the user may repeatedly call posix_sock_read - * and if they clear the pipe on one of those earlier calls, the - * socket will be removed from the pending_events list. In that case, - * if we now found more data, put it back on. - * This essentially never happens in practice because the application - * will stop trying to receive and wait for the next EPOLLIN event, but - * for correctness let's handle it. */ - if (!sock->pending_events && sock->base.group_impl) { - group = __posix_group_impl(sock->base.group_impl); - TAILQ_INSERT_TAIL(&group->pending_events, sock, link); - sock->pending_events = true; - } - } + if (bytes_avail <= 0) { + return bytes_avail; } - return bytes; + bytes_recvd = readv(sock->fd, iov, 2); + + assert(sock->pipe_has_data == false); + + if (bytes_recvd <= 0) { + /* Errors count as draining the socket data */ + if (sock->base.group_impl && sock->socket_has_data) { + group = __posix_group_impl(sock->base.group_impl); + TAILQ_REMOVE(&group->socks_with_data, sock, link); + } + + sock->socket_has_data = false; + + return bytes_recvd; + } + + spdk_pipe_writer_advance(sock->recv_pipe, bytes_recvd); + +#if DEBUG + if (sock->base.group_impl) { + assert(sock->socket_has_data == true); + } +#endif + + sock->pipe_has_data = true; + if (bytes_recvd < bytes_avail) { + /* We drained the kernel socket entirely. */ + sock->socket_has_data = false; + } + + return bytes_recvd; } static ssize_t @@ -959,26 +975,26 @@ posix_sock_readv(struct spdk_sock *_sock, struct iovec *iov, int iovcnt) size_t len; if (sock->recv_pipe == NULL) { - if (group && sock->pending_events) { - sock->pending_events = false; - TAILQ_REMOVE(&group->pending_events, sock, link); + assert(sock->pipe_has_data == false); + if (group && sock->socket_has_data) { + sock->socket_has_data = false; + TAILQ_REMOVE(&group->socks_with_data, sock, link); } return readv(sock->fd, iov, iovcnt); } - len = 0; - for (i = 0; i < iovcnt; i++) { - len += iov[i].iov_len; - } - - if (spdk_pipe_reader_bytes_available(sock->recv_pipe) == 0) { + /* If the socket is not in a group, we must assume it always has + * data waiting for us because it is not epolled */ + if (!sock->pipe_has_data && (group == NULL || sock->socket_has_data)) { /* If the user is receiving a sufficiently large amount of data, * receive directly to their buffers. */ + len = 0; + for (i = 0; i < iovcnt; i++) { + len += iov[i].iov_len; + } + if (len >= MIN_SOCK_PIPE_SIZE) { - if (group && sock->pending_events) { - sock->pending_events = false; - TAILQ_REMOVE(&group->pending_events, sock, link); - } + /* TODO: Should this detect if kernel socket is drained? */ return readv(sock->fd, iov, iovcnt); } @@ -1160,7 +1176,7 @@ posix_sock_group_impl_create(void) } group_impl->fd = fd; - TAILQ_INIT(&group_impl->pending_events); + TAILQ_INIT(&group_impl->socks_with_data); group_impl->placement_id = -1; if (g_spdk_posix_sock_impl_opts.enable_placement_id == PLACEMENT_CPU) { @@ -1256,9 +1272,9 @@ posix_sock_group_impl_add_sock(struct spdk_sock_group_impl *_group, struct spdk_ /* switched from another polling group due to scheduling */ if (spdk_unlikely(sock->recv_pipe != NULL && (spdk_pipe_reader_bytes_available(sock->recv_pipe) > 0))) { - assert(sock->pending_events == false); - sock->pending_events = true; - TAILQ_INSERT_TAIL(&group->pending_events, sock, link); + sock->pipe_has_data = true; + sock->socket_has_data = false; + TAILQ_INSERT_TAIL(&group->socks_with_data, sock, link); } if (g_spdk_posix_sock_impl_opts.enable_placement_id == PLACEMENT_MARK) { @@ -1281,9 +1297,10 @@ posix_sock_group_impl_remove_sock(struct spdk_sock_group_impl *_group, struct sp struct spdk_posix_sock *sock = __posix_sock(_sock); int rc; - if (sock->pending_events) { - TAILQ_REMOVE(&group->pending_events, sock, link); - sock->pending_events = false; + if (sock->pipe_has_data || sock->socket_has_data) { + TAILQ_REMOVE(&group->socks_with_data, sock, link); + sock->pipe_has_data = false; + sock->socket_has_data = false; } if (sock->placement_id != -1) { @@ -1362,7 +1379,7 @@ posix_sock_group_impl_poll(struct spdk_sock_group_impl *_group, int max_events, */ int last_placement_id = -1; - TAILQ_FOREACH(psock, &group->pending_events, link) { + TAILQ_FOREACH(psock, &group->socks_with_data, link) { if (psock->zcopy && psock->placement_id >= 0 && psock->placement_id != last_placement_id) { struct pollfd pfd = {psock->fd, POLLIN | POLLERR, 0}; @@ -1433,16 +1450,16 @@ posix_sock_group_impl_poll(struct spdk_sock_group_impl *_group, int max_events, psock = __posix_sock(sock); #endif - /* If the socket does not already have recv pending, add it now */ - if (!psock->pending_events) { - psock->pending_events = true; - TAILQ_INSERT_TAIL(&group->pending_events, psock, link); + /* If the socket is not already in the list, add it now */ + if (!psock->socket_has_data && !psock->pipe_has_data) { + TAILQ_INSERT_TAIL(&group->socks_with_data, psock, link); } + psock->socket_has_data = true; } num_events = 0; - TAILQ_FOREACH_SAFE(psock, &group->pending_events, link, ptmp) { + TAILQ_FOREACH_SAFE(psock, &group->socks_with_data, link, ptmp) { if (num_events == max_events) { break; } @@ -1450,15 +1467,16 @@ posix_sock_group_impl_poll(struct spdk_sock_group_impl *_group, int max_events, /* If the socket's cb_fn is NULL, just remove it from the * list and do not add it to socks array */ if (spdk_unlikely(psock->base.cb_fn == NULL)) { - psock->pending_events = false; - TAILQ_REMOVE(&group->pending_events, psock, link); + psock->socket_has_data = false; + psock->pipe_has_data = false; + TAILQ_REMOVE(&group->socks_with_data, psock, link); continue; } socks[num_events++] = &psock->base; } - /* Cycle the pending_events list so that each time we poll things aren't + /* Cycle the has_data list so that each time we poll things aren't * in the same order. Say we have 6 sockets in the list, named as follows: * A B C D E F * And all 6 sockets had epoll events, but max_events is only 3. That means @@ -1473,9 +1491,9 @@ posix_sock_group_impl_poll(struct spdk_sock_group_impl *_group, int max_events, /* Capture pointers to the elements we need */ pd = psock; - pc = TAILQ_PREV(pd, spdk_pending_events_list, link); - pa = TAILQ_FIRST(&group->pending_events); - pf = TAILQ_LAST(&group->pending_events, spdk_pending_events_list); + pc = TAILQ_PREV(pd, spdk_has_data_list, link); + pa = TAILQ_FIRST(&group->socks_with_data); + pf = TAILQ_LAST(&group->socks_with_data, spdk_has_data_list); /* Break the link between C and D */ pc->link.tqe_next = NULL; @@ -1486,8 +1504,8 @@ posix_sock_group_impl_poll(struct spdk_sock_group_impl *_group, int max_events, pa->link.tqe_prev = &pf->link.tqe_next; /* Fix up the list first/last pointers */ - group->pending_events.tqh_first = pd; - group->pending_events.tqh_last = &pc->link.tqe_next; + group->socks_with_data.tqh_first = pd; + group->socks_with_data.tqh_last = &pc->link.tqe_next; } return num_events;