First cut at implementing SOCK_SEQPACKET support for UNIX (local) domain

sockets.  This allows for reliable bi-directional datagram communication
over UNIX domain sockets, in contrast to SOCK_DGRAM (M:N, unreliable) or
SOCK_STERAM (bi-directional bytestream).  Largely, this reuses existing
UNIX domain socket code.  This allows applications requiring record-
oriented semantics to do so reliably via local IPC.

Some implementation notes (also present in XXX comments):

- Currently we lack an sbappend variant able to do datagrams and control
  data without doing addresses, so we mark SOCK_SEQPACKET as PR_ADDR.
  Adding a new variant will solve this problem.

- UNIX domain sockets on FreeBSD provide back-pressure/flow control
  notification for stream sockets by manipulating the send socket
  buffer's size during pru_send and pru_rcvd.  This trick works less well
  for SOCK_SEQPACKET as sosend_generic() uses sb_hiwat not just to
  manage blocking, but also to determine maximum datagram size.  Fixing
  this requires rethinking how back-pressure is done for SOCK_SEQPACKET;
  in the mean time, it's possible to get EMSGSIZE when buffers fill,
  instead of blocking.

Discussed with:	benl
Reviewed by:	bz, rpaulo
MFC after:	3 months
Sponsored by:	Google
This commit is contained in:
Robert Watson 2009-10-05 14:49:16 +00:00
parent 432e1942d2
commit 84d61770bc

View File

@ -50,7 +50,8 @@
* garbage collector to find and tear down cycles of disconnected sockets.
*
* TODO:
* SEQPACKET, RDM
* RDM
* distinguish datagram size limits from flow control limits in SEQPACKET
* rethink name space problems
* need a proper out-of-band
*/
@ -112,6 +113,7 @@ static ino_t unp_ino; /* Prototype for fake inode numbers. */
static int unp_rights; /* (g) File descriptors in flight. */
static struct unp_head unp_shead; /* (l) List of stream sockets. */
static struct unp_head unp_dhead; /* (l) List of datagram sockets. */
static struct unp_head unp_sphead; /* (l) List of seqpacket sockets. */
static const struct sockaddr sun_noname = { sizeof(sun_noname), AF_LOCAL };
@ -139,10 +141,14 @@ static u_long unpst_sendspace = PIPSIZ;
static u_long unpst_recvspace = PIPSIZ;
static u_long unpdg_sendspace = 2*1024; /* really max datagram size */
static u_long unpdg_recvspace = 4*1024;
static u_long unpsp_sendspace = PIPSIZ; /* really max datagram size */
static u_long unpsp_recvspace = PIPSIZ;
SYSCTL_NODE(_net, PF_LOCAL, local, CTLFLAG_RW, 0, "Local domain");
SYSCTL_NODE(_net_local, SOCK_STREAM, stream, CTLFLAG_RW, 0, "SOCK_STREAM");
SYSCTL_NODE(_net_local, SOCK_DGRAM, dgram, CTLFLAG_RW, 0, "SOCK_DGRAM");
SYSCTL_NODE(_net_local, SOCK_SEQPACKET, seqpacket, CTLFLAG_RW, 0,
"SOCK_SEQPACKET");
SYSCTL_ULONG(_net_local_stream, OID_AUTO, sendspace, CTLFLAG_RW,
&unpst_sendspace, 0, "Default stream send space.");
@ -152,6 +158,10 @@ SYSCTL_ULONG(_net_local_dgram, OID_AUTO, maxdgram, CTLFLAG_RW,
&unpdg_sendspace, 0, "Default datagram send space.");
SYSCTL_ULONG(_net_local_dgram, OID_AUTO, recvspace, CTLFLAG_RW,
&unpdg_recvspace, 0, "Default datagram receive space.");
SYSCTL_ULONG(_net_local_seqpacket, OID_AUTO, maxseqpacket, CTLFLAG_RW,
&unpsp_sendspace, 0, "Default seqpacket send space.");
SYSCTL_ULONG(_net_local_seqpacket, OID_AUTO, recvspace, CTLFLAG_RW,
&unpsp_recvspace, 0, "Default seqpacket receive space.");
SYSCTL_INT(_net_local, OID_AUTO, inflight, CTLFLAG_RD, &unp_rights, 0,
"File descriptors in flight.");
@ -257,6 +267,7 @@ static struct mbuf *unp_addsockcred(struct thread *, struct mbuf *);
*/
static struct domain localdomain;
static struct pr_usrreqs uipc_usrreqs_dgram, uipc_usrreqs_stream;
static struct pr_usrreqs uipc_usrreqs_seqpacket;
static struct protosw localsw[] = {
{
.pr_type = SOCK_STREAM,
@ -271,6 +282,19 @@ static struct protosw localsw[] = {
.pr_flags = PR_ATOMIC|PR_ADDR|PR_RIGHTS,
.pr_usrreqs = &uipc_usrreqs_dgram
},
{
.pr_type = SOCK_SEQPACKET,
.pr_domain = &localdomain,
/*
* XXXRW: For now, PR_ADDR because soreceive will bump into them
* due to our use of sbappendaddr. A new sbappend variants is needed
* that supports both atomic record writes and control data.
*/
.pr_flags = PR_ADDR|PR_ATOMIC|PR_CONNREQUIRED|PR_WANTRCVD|
PR_RIGHTS,
.pr_usrreqs = &uipc_usrreqs_seqpacket,
},
};
static struct domain localdomain = {
@ -353,6 +377,11 @@ uipc_attach(struct socket *so, int proto, struct thread *td)
recvspace = unpdg_recvspace;
break;
case SOCK_SEQPACKET:
sendspace = unpsp_sendspace;
recvspace = unpsp_recvspace;
break;
default:
panic("uipc_attach");
}
@ -372,8 +401,22 @@ uipc_attach(struct socket *so, int proto, struct thread *td)
UNP_LIST_LOCK();
unp->unp_gencnt = ++unp_gencnt;
unp_count++;
LIST_INSERT_HEAD(so->so_type == SOCK_DGRAM ? &unp_dhead : &unp_shead,
unp, unp_link);
switch (so->so_type) {
case SOCK_STREAM:
LIST_INSERT_HEAD(&unp_shead, unp, unp_link);
break;
case SOCK_DGRAM:
LIST_INSERT_HEAD(&unp_dhead, unp, unp_link);
break;
case SOCK_SEQPACKET:
LIST_INSERT_HEAD(&unp_sphead, unp, unp_link);
break;
default:
panic("uipc_attach");
}
UNP_LIST_UNLOCK();
return (0);
@ -705,11 +748,8 @@ uipc_rcvd(struct socket *so, int flags)
unp = sotounpcb(so);
KASSERT(unp != NULL, ("uipc_rcvd: unp == NULL"));
if (so->so_type == SOCK_DGRAM)
panic("uipc_rcvd DGRAM?");
if (so->so_type != SOCK_STREAM)
panic("uipc_rcvd unknown socktype");
if (so->so_type != SOCK_STREAM && so->so_type != SOCK_SEQPACKET)
panic("uipc_rcvd socktype %d", so->so_type);
/*
* Adjust backpressure on sender and wakeup any waiting to write.
@ -824,6 +864,7 @@ uipc_send(struct socket *so, int flags, struct mbuf *m, struct sockaddr *nam,
break;
}
case SOCK_SEQPACKET:
case SOCK_STREAM:
if ((so->so_state & SS_ISCONNECTED) == 0) {
if (nam != NULL) {
@ -875,11 +916,33 @@ uipc_send(struct socket *so, int flags, struct mbuf *m, struct sockaddr *nam,
* Send to paired receive port, and then reduce send buffer
* hiwater marks to maintain backpressure. Wake up readers.
*/
if (control != NULL) {
if (sbappendcontrol_locked(&so2->so_rcv, m, control))
switch (so->so_type) {
case SOCK_STREAM:
if (control != NULL) {
if (sbappendcontrol_locked(&so2->so_rcv, m,
control))
control = NULL;
} else
sbappend_locked(&so2->so_rcv, m);
break;
case SOCK_SEQPACKET: {
const struct sockaddr *from;
from = &sun_noname;
if (sbappendaddr_locked(&so2->so_rcv, from, m,
control))
control = NULL;
} else
sbappend_locked(&so2->so_rcv, m);
break;
}
}
/*
* XXXRW: While fine for SOCK_STREAM, this conflates maximum
* datagram size and back-pressure for SOCK_SEQPACKET, which
* can lead to undesired return of EMSGSIZE on send instead
* of more desirable blocking.
*/
mbcnt_delta = so2->so_rcv.sb_mbcnt - unp2->unp_mbcnt;
unp2->unp_mbcnt = so2->so_rcv.sb_mbcnt;
sbcc = so2->so_rcv.sb_cc;
@ -939,7 +1002,8 @@ uipc_sense(struct socket *so, struct stat *sb)
UNP_LINK_RLOCK();
UNP_PCB_LOCK(unp);
unp2 = unp->unp_conn;
if (so->so_type == SOCK_STREAM && unp2 != NULL) {
if ((so->so_type == SOCK_STREAM || so->so_type == SOCK_SEQPACKET) &&
unp2 != NULL) {
so2 = unp2->unp_socket;
sb->st_blksize += so2->so_rcv.sb_cc;
}
@ -1009,6 +1073,26 @@ static struct pr_usrreqs uipc_usrreqs_dgram = {
.pru_close = uipc_close,
};
static struct pr_usrreqs uipc_usrreqs_seqpacket = {
.pru_abort = uipc_abort,
.pru_accept = uipc_accept,
.pru_attach = uipc_attach,
.pru_bind = uipc_bind,
.pru_connect = uipc_connect,
.pru_connect2 = uipc_connect2,
.pru_detach = uipc_detach,
.pru_disconnect = uipc_disconnect,
.pru_listen = uipc_listen,
.pru_peeraddr = uipc_peeraddr,
.pru_rcvd = uipc_rcvd,
.pru_send = uipc_send,
.pru_sense = uipc_sense,
.pru_shutdown = uipc_shutdown,
.pru_sockaddr = uipc_sockaddr,
.pru_soreceive = soreceive_generic, /* XXX: or...? */
.pru_close = uipc_close,
};
static struct pr_usrreqs uipc_usrreqs_stream = {
.pru_abort = uipc_abort,
.pru_accept = uipc_accept,
@ -1306,6 +1390,7 @@ unp_connect2(struct socket *so, struct socket *so2, int req)
break;
case SOCK_STREAM:
case SOCK_SEQPACKET:
unp2->unp_conn = unp;
if (req == PRU_CONNECT &&
((unp->unp_flags | unp2->unp_flags) & UNP_CONNWAIT))
@ -1343,6 +1428,7 @@ unp_disconnect(struct unpcb *unp, struct unpcb *unp2)
break;
case SOCK_STREAM:
case SOCK_SEQPACKET:
soisdisconnected(unp->unp_socket);
unp2->unp_conn = NULL;
soisdisconnected(unp2->unp_socket);
@ -1368,7 +1454,22 @@ unp_pcblist(SYSCTL_HANDLER_ARGS)
struct unp_head *head;
struct xunpcb *xu;
head = ((intptr_t)arg1 == SOCK_DGRAM ? &unp_dhead : &unp_shead);
switch ((intptr_t)arg1) {
case SOCK_STREAM:
head = &unp_shead;
break;
case SOCK_DGRAM:
head = &unp_dhead;
break;
case SOCK_SEQPACKET:
head = &unp_sphead;
break;
default:
panic("unp_pcblist: arg1 %d", (intptr_t)arg1);
}
/*
* The process of preparing the PCB list is too time-consuming and
@ -1481,6 +1582,9 @@ SYSCTL_PROC(_net_local_dgram, OID_AUTO, pcblist, CTLFLAG_RD,
SYSCTL_PROC(_net_local_stream, OID_AUTO, pcblist, CTLFLAG_RD,
(caddr_t)(long)SOCK_STREAM, 0, unp_pcblist, "S,xunpcb",
"List of active local stream sockets");
SYSCTL_PROC(_net_local_seqpacket, OID_AUTO, pcblist, CTLFLAG_RD,
(caddr_t)(long)SOCK_SEQPACKET, 0, unp_pcblist, "S,xunpcb",
"List of active local seqpacket sockets");
static void
unp_shutdown(struct unpcb *unp)
@ -1492,7 +1596,8 @@ unp_shutdown(struct unpcb *unp)
UNP_PCB_LOCK_ASSERT(unp);
unp2 = unp->unp_conn;
if (unp->unp_socket->so_type == SOCK_STREAM && unp2 != NULL) {
if ((unp->unp_socket->so_type == SOCK_STREAM ||
(unp->unp_socket->so_type == SOCK_SEQPACKET)) && unp2 != NULL) {
so = unp2->unp_socket;
if (so != NULL)
socantrcvmore(so);
@ -1658,6 +1763,7 @@ unp_init(void)
NULL, EVENTHANDLER_PRI_ANY);
LIST_INIT(&unp_dhead);
LIST_INIT(&unp_shead);
LIST_INIT(&unp_sphead);
TASK_INIT(&unp_gc_task, 0, unp_gc, NULL);
UNP_LINK_LOCK_INIT();
UNP_LIST_LOCK_INIT();
@ -1974,7 +2080,8 @@ SYSCTL_INT(_net_local, OID_AUTO, taskcount, CTLFLAG_RD, &unp_taskcount, 0,
static void
unp_gc(__unused void *arg, int pending)
{
struct unp_head *heads[] = { &unp_dhead, &unp_shead, NULL };
struct unp_head *heads[] = { &unp_dhead, &unp_shead, &unp_sphead,
NULL };
struct unp_head **head;
struct file **unref;
struct unpcb *unp;