Change the way we transfer links (again). The previous
method avoided all race conditions, but suffered from sometimes running out of buffer space if enough clients were piled up at the same time. Now, the client pushes the link descriptor, one end of a socketpair() and the ppp version via sendmsg() at the server. The server replies with a pid. The client then transfers any link lock with uu_lock_txfr() and writev()s the actual link contents. The socketpair is now the only place we need to have large socket buffers and the bind()ed socket can keep the default 4k buffer while still handling around 90 racing clients.
This commit is contained in:
parent
3d1d437531
commit
cbee975442
Notes:
svn2git
2020-12-20 02:59:44 +00:00
svn path=/head/; revision=53970
@ -1353,13 +1353,12 @@ void
|
||||
bundle_ReceiveDatalink(struct bundle *bundle, int s)
|
||||
{
|
||||
char cmsgbuf[sizeof(struct cmsghdr) + sizeof(int) * SEND_MAXFD];
|
||||
int niov, expect, f, *fd, nfd, onfd;
|
||||
int niov, expect, f, *fd, nfd, onfd, got;
|
||||
struct iovec iov[SCATTER_SEGMENTS];
|
||||
struct cmsghdr *cmsg;
|
||||
struct msghdr msg;
|
||||
struct datalink *dl;
|
||||
pid_t pid;
|
||||
char ack;
|
||||
|
||||
log_Printf(LogPHASE, "Receiving datalink\n");
|
||||
|
||||
@ -1382,7 +1381,8 @@ bundle_ReceiveDatalink(struct bundle *bundle, int s)
|
||||
log_Printf(LogERROR, "Cannot allocate space to receive link\n");
|
||||
return;
|
||||
}
|
||||
expect += iov[f].iov_len;
|
||||
if (f)
|
||||
expect += iov[f].iov_len;
|
||||
}
|
||||
|
||||
/* Set up our message */
|
||||
@ -1395,17 +1395,18 @@ bundle_ReceiveDatalink(struct bundle *bundle, int s)
|
||||
msg.msg_name = NULL;
|
||||
msg.msg_namelen = 0;
|
||||
msg.msg_iov = iov;
|
||||
msg.msg_iovlen = niov;
|
||||
msg.msg_iovlen = 1; /* Only send the version at the first pass */
|
||||
msg.msg_control = cmsgbuf;
|
||||
msg.msg_controllen = sizeof cmsgbuf;
|
||||
|
||||
log_Printf(LogDEBUG, "Expecting %d scatter/gather bytes\n", expect);
|
||||
log_Printf(LogDEBUG, "Expecting %d scatter/gather bytes\n", iov[0].iov_len);
|
||||
|
||||
if ((f = recvmsg(s, &msg, MSG_WAITALL)) != expect) {
|
||||
if (f == -1)
|
||||
if ((got = recvmsg(s, &msg, MSG_WAITALL)) != iov[0].iov_len) {
|
||||
if (got == -1)
|
||||
log_Printf(LogERROR, "Failed recvmsg: %s\n", strerror(errno));
|
||||
else
|
||||
log_Printf(LogERROR, "Failed recvmsg: Got %d, not %d\n", f, expect);
|
||||
log_Printf(LogERROR, "Failed recvmsg: Got %d, not %d\n",
|
||||
got, iov[0].iov_len);
|
||||
while (niov--)
|
||||
free(iov[niov].iov_base);
|
||||
return;
|
||||
@ -1432,11 +1433,10 @@ bundle_ReceiveDatalink(struct bundle *bundle, int s)
|
||||
}
|
||||
|
||||
/*
|
||||
* We've successfully received one or more open file descriptors
|
||||
* through our socket
|
||||
* We've successfully received two or more open file descriptors
|
||||
* through our socket, plus a version string. Make sure it's the
|
||||
* correct version, and drop the connection if it's not.
|
||||
*/
|
||||
log_Printf(LogDEBUG, "Receiving device descriptor\n");
|
||||
|
||||
if (strncmp(Version, iov[0].iov_base, iov[0].iov_len)) {
|
||||
log_Printf(LogWARN, "Cannot receive datalink, incorrect version"
|
||||
" (\"%.*s\", not \"%s\")\n", (int)iov[0].iov_len,
|
||||
@ -1448,16 +1448,44 @@ bundle_ReceiveDatalink(struct bundle *bundle, int s)
|
||||
return;
|
||||
}
|
||||
|
||||
/*
|
||||
* Everything looks good. Send the other side our process id so that
|
||||
* they can transfer lock ownership, and wait for them to send the
|
||||
* actual link data.
|
||||
*/
|
||||
pid = getpid();
|
||||
if ((got = write(fd[1], &pid, sizeof pid)) != sizeof pid) {
|
||||
if (got == -1)
|
||||
log_Printf(LogERROR, "Failed write: %s\n", strerror(errno));
|
||||
else
|
||||
log_Printf(LogERROR, "Failed write: Got %d, not %d\n", got,
|
||||
(int)(sizeof pid));
|
||||
while (nfd--)
|
||||
close(fd[nfd]);
|
||||
while (niov--)
|
||||
free(iov[niov].iov_base);
|
||||
return;
|
||||
}
|
||||
|
||||
if ((got = readv(fd[1], iov + 1, niov - 1)) != expect) {
|
||||
if (got == -1)
|
||||
log_Printf(LogERROR, "Failed write: %s\n", strerror(errno));
|
||||
else
|
||||
log_Printf(LogERROR, "Failed write: Got %d, not %d\n", got, expect);
|
||||
while (nfd--)
|
||||
close(fd[nfd]);
|
||||
while (niov--)
|
||||
free(iov[niov].iov_base);
|
||||
return;
|
||||
}
|
||||
close(fd[1]);
|
||||
|
||||
onfd = nfd; /* We've got this many in our array */
|
||||
nfd -= 2; /* Don't include p->fd and our reply descriptor */
|
||||
niov = 1; /* Skip the version id */
|
||||
dl = iov2datalink(bundle, iov, &niov, sizeof iov / sizeof *iov, fd[0],
|
||||
fd + 2, &nfd);
|
||||
if (dl) {
|
||||
pid = getpid();
|
||||
write(fd[1], &pid, sizeof pid); /* Please hand me any locks */
|
||||
read(fd[1], &ack, 1); /* Thanks (ACK) ! */
|
||||
close(fd[1]);
|
||||
|
||||
if (nfd) {
|
||||
log_Printf(LogERROR, "bundle_ReceiveDatalink: Failed to handle %d "
|
||||
@ -1526,8 +1554,13 @@ bundle_SendDatalink(struct datalink *dl, int s, struct sockaddr_un *sun)
|
||||
|
||||
msg.msg_name = NULL;
|
||||
msg.msg_namelen = 0;
|
||||
/*
|
||||
* Only send the version to start... We used to send the whole lot, but
|
||||
* this caused problems with our RECVBUF size as a single link is about
|
||||
* 22k ! This way, we should bump into no limits.
|
||||
*/
|
||||
msg.msg_iovlen = 1;
|
||||
msg.msg_iov = iov;
|
||||
msg.msg_iovlen = niov;
|
||||
msg.msg_control = cmsgbuf;
|
||||
msg.msg_controllen = sizeof *cmsg + sizeof(int) * nfd;
|
||||
msg.msg_flags = 0;
|
||||
@ -1540,28 +1573,50 @@ bundle_SendDatalink(struct datalink *dl, int s, struct sockaddr_un *sun)
|
||||
for (f = 0; f < nfd; f++)
|
||||
*((int *)(cmsg + 1) + f) = fd[f];
|
||||
|
||||
for (f = expect = 0; f < niov; f++)
|
||||
for (f = 1, expect = 0; f < niov; f++)
|
||||
expect += iov[f].iov_len;
|
||||
|
||||
if (setsockopt(reply[0], SOL_SOCKET, SO_SNDBUF, &expect, sizeof(int)) == -1)
|
||||
log_Printf(LogERROR, "setsockopt(SO_RCVBUF, %d): %s\n", expect,
|
||||
strerror(errno));
|
||||
if (setsockopt(reply[1], SOL_SOCKET, SO_RCVBUF, &expect, sizeof(int)) == -1)
|
||||
log_Printf(LogERROR, "setsockopt(SO_RCVBUF, %d): %s\n", expect,
|
||||
strerror(errno));
|
||||
|
||||
log_Printf(LogDEBUG, "Sending %d descriptor%s and %d bytes in scatter"
|
||||
"/gather array\n", nfd, nfd == 1 ? "" : "s", expect);
|
||||
"/gather array\n", nfd, nfd == 1 ? "" : "s", iov[0].iov_len);
|
||||
|
||||
if ((got = sendmsg(s, &msg, 0)) == -1)
|
||||
log_Printf(LogERROR, "Failed sendmsg: %s: %s\n",
|
||||
sun->sun_path, strerror(errno));
|
||||
else if (got != expect)
|
||||
log_Printf(LogERROR, "Failed sendmsg: %s: Only sent %d of %d\n",
|
||||
sun->sun_path, got, expect);
|
||||
else if (got != iov[0].iov_len)
|
||||
log_Printf(LogERROR, "%s: Failed initial sendmsg: Only sent %d of %d\n",
|
||||
sun->sun_path, got, iov[0].iov_len);
|
||||
else {
|
||||
/* We must get the ACK before closing the descriptor ! */
|
||||
int res;
|
||||
|
||||
read(reply[0], &newpid, sizeof newpid);
|
||||
log_Printf(LogDEBUG, "Received confirmation from pid %d\n", (int)newpid);
|
||||
if (lock && (res = ID0uu_lock_txfr(lock, newpid)) != UU_LOCK_OK)
|
||||
log_Printf(LogPHASE, "uu_lock_txfr: %s\n", uu_lockerr(res));
|
||||
if ((got = read(reply[0], &newpid, sizeof newpid)) == sizeof newpid) {
|
||||
log_Printf(LogDEBUG, "Received confirmation from pid %d\n",
|
||||
(int)newpid);
|
||||
if (lock && (res = ID0uu_lock_txfr(lock, newpid)) != UU_LOCK_OK)
|
||||
log_Printf(LogPHASE, "uu_lock_txfr: %s\n", uu_lockerr(res));
|
||||
|
||||
write(reply[1], "!", 1); /* Thanks (ACK) ! */
|
||||
log_Printf(LogDEBUG, "Transmitting link (%d bytes)\n", expect);
|
||||
if ((got = writev(reply[0], iov + 1, niov - 1)) != expect) {
|
||||
if (got == -1)
|
||||
log_Printf(LogERROR, "%s: Failed writev: %s\n",
|
||||
sun->sun_path, strerror(errno));
|
||||
else
|
||||
log_Printf(LogERROR, "%s: Failed writev: Wrote %d of %d\n",
|
||||
sun->sun_path, got, expect);
|
||||
}
|
||||
} else if (got == -1)
|
||||
log_Printf(LogERROR, "%s: Failed socketpair read: %s\n",
|
||||
sun->sun_path, strerror(errno));
|
||||
else
|
||||
log_Printf(LogERROR, "%s: Failed socketpair read: Got %d of %d\n",
|
||||
sun->sun_path, got, (int)(sizeof newpid));
|
||||
}
|
||||
|
||||
close(reply[0]);
|
||||
|
@ -151,7 +151,7 @@
|
||||
#define NEG_SHORTSEQ 52
|
||||
#define NEG_VJCOMP 53
|
||||
|
||||
const char Version[] = "2.25";
|
||||
const char Version[] = "2.26";
|
||||
|
||||
static int ShowCommand(struct cmdargs const *);
|
||||
static int TerminalCommand(struct cmdargs const *);
|
||||
|
@ -1046,7 +1046,7 @@ mpserver_Init(struct mpserver *s)
|
||||
int
|
||||
mpserver_Open(struct mpserver *s, struct peerid *peer)
|
||||
{
|
||||
int f, l, bufsz;
|
||||
int f, l;
|
||||
mode_t mask;
|
||||
|
||||
if (s->fd != -1) {
|
||||
@ -1076,17 +1076,10 @@ mpserver_Open(struct mpserver *s, struct peerid *peer)
|
||||
mask = umask(0177);
|
||||
|
||||
/*
|
||||
* Calculate how big a link is. It's vital that we set our receive
|
||||
* buffer size before binding the socket, otherwise we'll end up with
|
||||
* a sendmsg() failing with ENOBUFS.
|
||||
* Try to bind the socket. If we succeed we play server, if we fail
|
||||
* we connect() and hand the link off.
|
||||
*/
|
||||
|
||||
bufsz = bundle_LinkSize() + SOCKET_OVERHEAD;
|
||||
log_Printf(LogDEBUG, "Setting MP socket buffer size to %d\n", bufsz);
|
||||
if (setsockopt(s->fd, SOL_SOCKET, SO_RCVBUF, &bufsz, sizeof bufsz) == -1)
|
||||
log_Printf(LogERROR, "setsockopt(SO_RCVBUF, %d): %s\n", bufsz,
|
||||
strerror(errno));
|
||||
|
||||
if (ID0bind_un(s->fd, &s->socket) < 0) {
|
||||
if (errno != EADDRINUSE) {
|
||||
log_Printf(LogPHASE, "mpserver: can't create bundle socket %s (%s)\n",
|
||||
@ -1097,10 +1090,7 @@ mpserver_Open(struct mpserver *s, struct peerid *peer)
|
||||
return MPSERVER_FAILED;
|
||||
}
|
||||
|
||||
/* Ok, so we'll play sender... set the send buffer size */
|
||||
if (setsockopt(s->fd, SOL_SOCKET, SO_SNDBUF, &bufsz, sizeof bufsz) == -1)
|
||||
log_Printf(LogERROR, "setsockopt(SO_SNDBUF, %d): %s\n", bufsz,
|
||||
strerror(errno));
|
||||
/* So we're the sender */
|
||||
umask(mask);
|
||||
if (ID0connect_un(s->fd, &s->socket) < 0) {
|
||||
log_Printf(LogPHASE, "mpserver: can't connect to bundle socket %s (%s)\n",
|
||||
|
Loading…
Reference in New Issue
Block a user