diff --git a/mk/spdk.lib_deps.mk b/mk/spdk.lib_deps.mk index cd8a345264..c0f9d69316 100644 --- a/mk/spdk.lib_deps.mk +++ b/mk/spdk.lib_deps.mk @@ -107,7 +107,7 @@ DEPDIRS-accel_ioat := log ioat conf thread $(JSON_LIBS) accel DEPDIRS-env_dpdk_rpc := log $(JSON_LIBS) # module/sock -DEPDIRS-sock_posix := log sock +DEPDIRS-sock_posix := log sock util DEPDIRS-sock_vpp := log sock util thread # module/bdev diff --git a/module/sock/posix/posix.c b/module/sock/posix/posix.c index 4860d0a403..3d78769ea2 100644 --- a/module/sock/posix/posix.c +++ b/module/sock/posix/posix.c @@ -41,7 +41,9 @@ #endif #include "spdk/log.h" +#include "spdk/pipe.h" #include "spdk/sock.h" +#include "spdk/util.h" #include "spdk_internal/sock.h" #define MAX_TMPBUF 1024 @@ -60,11 +62,19 @@ struct spdk_posix_sock { uint32_t sendmsg_idx; bool zcopy; + + struct spdk_pipe *recv_pipe; + void *recv_buf; + int recv_buf_sz; + bool pending_recv; + + TAILQ_ENTRY(spdk_posix_sock) link; }; struct spdk_posix_sock_group_impl { struct spdk_sock_group_impl base; int fd; + TAILQ_HEAD(, spdk_posix_sock) pending_recv; }; static int @@ -175,6 +185,70 @@ enum spdk_posix_sock_create_type { SPDK_SOCK_CREATE_CONNECT, }; +static int +spdk_posix_sock_alloc_pipe(struct spdk_posix_sock *sock, int sz) +{ + uint8_t *new_buf; + struct spdk_pipe *new_pipe; + struct iovec siov[2]; + struct iovec diov[2]; + int sbytes; + ssize_t bytes; + + if (sock->recv_buf_sz == sz) { + return 0; + } + + /* If the new size is 0, just free the pipe */ + if (sz == 0) { + spdk_pipe_destroy(sock->recv_pipe); + free(sock->recv_buf); + sock->recv_pipe = NULL; + sock->recv_buf = NULL; + return 0; + } + + /* Round up to next 64 byte multiple */ + new_buf = calloc(((sz + 1) >> 6) << 6, sizeof(uint8_t)); + if (!new_buf) { + SPDK_ERRLOG("socket recv buf allocation failed\n"); + return -ENOMEM; + } + + new_pipe = spdk_pipe_create(new_buf, sz + 1); + if (new_pipe == NULL) { + SPDK_ERRLOG("socket pipe allocation failed\n"); + free(new_buf); + return -ENOMEM; + } + + if (sock->recv_pipe != NULL) { + /* Pull all of the data out of the old pipe */ + sbytes = spdk_pipe_reader_get_buffer(sock->recv_pipe, sock->recv_buf_sz, siov); + if (sbytes > sz) { + /* Too much data to fit into the new pipe size */ + spdk_pipe_destroy(new_pipe); + free(new_buf); + return -EINVAL; + } + + sbytes = spdk_pipe_writer_get_buffer(new_pipe, sz, diov); + assert(sbytes == sz); + + bytes = spdk_iovcpy(siov, 2, diov, 2); + spdk_pipe_writer_advance(new_pipe, bytes); + + spdk_pipe_destroy(sock->recv_pipe); + free(sock->recv_buf); + } + + sock->recv_buf_sz = sz; + sock->recv_buf = new_buf; + sock->recv_pipe = new_pipe; + + return 0; +} + static int spdk_posix_sock_set_recvbuf(struct spdk_sock *_sock, int sz) { @@ -219,8 +293,9 @@ static struct spdk_posix_sock * _spdk_posix_sock_alloc(int fd) { struct spdk_posix_sock *sock; + int rc; #ifdef SPDK_ZEROCOPY - int flag, rc; + int flag; #endif sock = calloc(1, sizeof(*sock)); @@ -231,6 +306,16 @@ _spdk_posix_sock_alloc(int fd) sock->fd = fd; +#ifndef __aarch64__ + /* On ARM systems, this buffering does not help. Skip it. */ + /* The size of the pipe is purely derived from benchmarks. It seems to work well. */ + rc = spdk_posix_sock_alloc_pipe(sock, 8192); + if (rc) { + SPDK_ERRLOG("unable to allocate sufficient recvbuf\n"); + free(sock); + return NULL; + } +#endif #ifdef SPDK_ZEROCOPY /* Try to turn on zero copy sends */ @@ -459,6 +544,8 @@ spdk_posix_sock_close(struct spdk_sock *_sock) * memory. */ close(sock->fd); + spdk_pipe_destroy(sock->recv_pipe); + free(sock->recv_buf); free(sock); return 0; @@ -675,19 +762,108 @@ spdk_posix_sock_flush(struct spdk_sock *_sock) } static ssize_t -spdk_posix_sock_recv(struct spdk_sock *_sock, void *buf, size_t len) +spdk_posix_sock_recv_from_pipe(struct spdk_posix_sock *sock, struct iovec *diov, int diovcnt) { - struct spdk_posix_sock *sock = __posix_sock(_sock); + struct iovec siov[2]; + int sbytes; + ssize_t bytes; + struct spdk_posix_sock_group_impl *group; - return recv(sock->fd, buf, len, MSG_DONTWAIT); + sbytes = spdk_pipe_reader_get_buffer(sock->recv_pipe, sock->recv_buf_sz, siov); + if (sbytes < 0) { + errno = EINVAL; + return -1; + } else if (sbytes == 0) { + errno = EAGAIN; + return -1; + } + + bytes = spdk_iovcpy(siov, 2, diov, diovcnt); + + if (bytes == 0) { + /* The only way this happens is if diov is 0 length */ + errno = EINVAL; + return -1; + } + + spdk_pipe_reader_advance(sock->recv_pipe, bytes); + + /* If we drained the pipe, take it off the level-triggered list */ + if (sock->base.group_impl && spdk_pipe_reader_bytes_available(sock->recv_pipe) == 0) { + group = __posix_group_impl(sock->base.group_impl); + TAILQ_REMOVE(&group->pending_recv, sock, link); + sock->pending_recv = false; + } + + return bytes; +} + +static inline ssize_t +_spdk_posix_sock_read(struct spdk_posix_sock *sock) +{ + struct iovec iov[2]; + int bytes; + struct spdk_posix_sock_group_impl *group; + + bytes = spdk_pipe_writer_get_buffer(sock->recv_pipe, sock->recv_buf_sz, iov); + + if (bytes > 0) { + bytes = readv(sock->fd, iov, 2); + if (bytes > 0) { + spdk_pipe_writer_advance(sock->recv_pipe, bytes); + if (sock->base.group_impl) { + group = __posix_group_impl(sock->base.group_impl); + TAILQ_INSERT_TAIL(&group->pending_recv, sock, link); + sock->pending_recv = true; + } + } + } + + return bytes; } static ssize_t spdk_posix_sock_readv(struct spdk_sock *_sock, struct iovec *iov, int iovcnt) { struct spdk_posix_sock *sock = __posix_sock(_sock); + int rc, i; + size_t len; - return readv(sock->fd, iov, iovcnt); + if (sock->recv_pipe == NULL) { + return readv(sock->fd, iov, iovcnt); + } + + len = 0; + for (i = 0; i < iovcnt; i++) { + len += iov[i].iov_len; + } + + if (spdk_pipe_reader_bytes_available(sock->recv_pipe) == 0) { + /* If the user is receiving a sufficiently large amount of data, + * receive directly to their buffers. */ + if (len >= 1024) { + return readv(sock->fd, iov, iovcnt); + } + + /* Otherwise, do a big read into our pipe */ + rc = _spdk_posix_sock_read(sock); + if (rc <= 0) { + return rc; + } + } + + return spdk_posix_sock_recv_from_pipe(sock, iov, iovcnt); +} + +static ssize_t +spdk_posix_sock_recv(struct spdk_sock *sock, void *buf, size_t len) +{ + struct iovec iov[1]; + + iov[0].iov_base = buf; + iov[0].iov_len = len; + + return spdk_posix_sock_readv(sock, iov, 1); } static ssize_t @@ -867,6 +1043,7 @@ spdk_posix_sock_group_impl_create(void) } group_impl->fd = fd; + TAILQ_INIT(&group_impl->pending_recv); return &group_impl->base; } @@ -905,6 +1082,13 @@ spdk_posix_sock_group_impl_remove_sock(struct spdk_sock_group_impl *_group, stru struct spdk_posix_sock *sock = __posix_sock(_sock); int rc; + if (sock->recv_pipe != NULL) { + if (spdk_pipe_reader_bytes_available(sock->recv_pipe) > 0) { + TAILQ_REMOVE(&group->pending_recv, sock, link); + sock->pending_recv = false; + } + } + #if defined(__linux__) struct epoll_event event; @@ -934,7 +1118,8 @@ spdk_posix_sock_group_impl_poll(struct spdk_sock_group_impl *_group, int max_eve { struct spdk_posix_sock_group_impl *group = __posix_group_impl(_group); struct spdk_sock *sock, *tmp; - int num_events, i, j, rc; + int num_events, i, rc; + struct spdk_posix_sock *psock, *ptmp; #if defined(__linux__) struct epoll_event events[MAX_EVENTS_PER_POLL]; #elif defined(__FreeBSD__) @@ -962,9 +1147,10 @@ spdk_posix_sock_group_impl_poll(struct spdk_sock_group_impl *_group, int max_eve return -1; } - for (i = 0, j = 0; i < num_events; i++) { + for (i = 0; i < num_events; i++) { #if defined(__linux__) sock = events[i].data.ptr; + psock = __posix_sock(sock); #ifdef SPDK_ZEROCOPY if (events[i].events & EPOLLERR) { @@ -977,18 +1163,48 @@ spdk_posix_sock_group_impl_poll(struct spdk_sock_group_impl *_group, int max_eve } } #endif - - if (events[i].events & EPOLLIN) { - socks[j++] = sock; + if ((events[i].events & EPOLLIN) == 0) { + continue; } #elif defined(__FreeBSD__) - socks[j++] = events[i].udata; + sock = events[i].udata; + psock = __posix_sock(sock); #endif + /* If the socket does not already have recv pending, add it now */ + if (!psock->pending_recv) { + psock->pending_recv = true; + TAILQ_INSERT_TAIL(&group->pending_recv, psock, link); + } + } + + num_events = 0; + + TAILQ_FOREACH_SAFE(psock, &group->pending_recv, link, ptmp) { + if (num_events == max_events) { + break; + } + + socks[num_events++] = &psock->base; + } + + /* Cycle the pending_recv list so that each time we poll things aren't + * in the same order. */ + for (i = 0; i < num_events; i++) { + psock = __posix_sock(socks[i]); + + TAILQ_REMOVE(&group->pending_recv, psock, link); + + if (psock->recv_pipe != NULL || spdk_pipe_reader_bytes_available(psock->recv_pipe) == 0) { + psock->pending_recv = false; + } else { + TAILQ_INSERT_TAIL(&group->pending_recv, psock, link); + } + } - return j; + return num_events; } static int