Add support for backchannels to the kernel RPC. Backchannels
are used by NFSv4.1 for callbacks. A backchannel is a connection
established by the client, but used for RPCs done by the server
on the client (callbacks). As a result, this patch mixes some
client side calls in the server side and vice versa. Some
definitions in the .c files were extracted out into a file called
krpc.h, so that they could be included in multiple .c files.
This code has been in projects/nfsv4.1-client for some time.
Although no one has given it a formal review, I believe kib@
has taken a look at it.
This commit is contained in:
Rick Macklem 2012-12-08 00:29:16 +00:00
parent 33a38f7453
commit e2adc47dbb
6 changed files with 405 additions and 98 deletions

View File

@ -372,6 +372,7 @@ enum clnt_stat clnt_call_private(CLIENT *, struct rpc_callextra *, rpcproc_t,
#define CLGET_RETRIES 26 /* get retry count for reconnect */
#define CLSET_PRIVPORT 27 /* set privileged source port flag */
#define CLGET_PRIVPORT 28 /* get privileged source port flag */
#define CLSET_BACKCHANNEL 29 /* set backchannel for socket */
#endif

View File

@ -45,6 +45,7 @@ __FBSDID("$FreeBSD$");
#include <rpc/rpc.h>
#include <rpc/rpc_com.h>
#include <rpc/krpc.h>
static enum clnt_stat clnt_reconnect_call(CLIENT *, struct rpc_callextra *,
rpcproc_t, struct mbuf *, struct mbuf **, struct timeval);
@ -67,27 +68,6 @@ static struct clnt_ops clnt_reconnect_ops = {
static int fake_wchan;
struct rc_data {
struct mtx rc_lock;
struct sockaddr_storage rc_addr; /* server address */
struct netconfig* rc_nconf; /* network type */
rpcprog_t rc_prog; /* program number */
rpcvers_t rc_vers; /* version number */
size_t rc_sendsz;
size_t rc_recvsz;
struct timeval rc_timeout;
struct timeval rc_retry;
int rc_retries;
int rc_privport;
char *rc_waitchan;
int rc_intr;
int rc_connecting;
int rc_closed;
struct ucred *rc_ucred;
CLIENT* rc_client; /* underlying RPC client */
struct rpc_err rc_err;
};
CLIENT *
clnt_reconnect_create(
struct netconfig *nconf, /* network type */
@ -211,6 +191,8 @@ clnt_reconnect_connect(CLIENT *cl)
CLNT_CONTROL(newclient, CLSET_RETRY_TIMEOUT, &rc->rc_retry);
CLNT_CONTROL(newclient, CLSET_WAITCHAN, rc->rc_waitchan);
CLNT_CONTROL(newclient, CLSET_INTERRUPTIBLE, &rc->rc_intr);
if (rc->rc_backchannel != NULL)
CLNT_CONTROL(newclient, CLSET_BACKCHANNEL, rc->rc_backchannel);
stat = RPC_SUCCESS;
out:
@ -385,6 +367,7 @@ static bool_t
clnt_reconnect_control(CLIENT *cl, u_int request, void *info)
{
struct rc_data *rc = (struct rc_data *)cl->cl_private;
SVCXPRT *xprt;
if (info == NULL) {
return (FALSE);
@ -466,6 +449,13 @@ clnt_reconnect_control(CLIENT *cl, u_int request, void *info)
*(int *) info = rc->rc_privport;
break;
case CLSET_BACKCHANNEL:
xprt = (SVCXPRT *)info;
SVC_ACQUIRE(xprt);
xprt_register(xprt);
rc->rc_backchannel = info;
break;
default:
return (FALSE);
}
@ -502,9 +492,15 @@ static void
clnt_reconnect_destroy(CLIENT *cl)
{
struct rc_data *rc = (struct rc_data *)cl->cl_private;
SVCXPRT *xprt;
if (rc->rc_client)
CLNT_DESTROY(rc->rc_client);
if (rc->rc_backchannel) {
xprt = (SVCXPRT *)rc->rc_backchannel;
xprt_unregister(xprt);
SVC_RELEASE(xprt);
}
crfree(rc->rc_ucred);
mtx_destroy(&rc->rc_lock);
mem_free(rc, sizeof(*rc));

View File

@ -67,6 +67,7 @@ __FBSDID("$FreeBSD$");
#include <sys/protosw.h>
#include <sys/socket.h>
#include <sys/socketvar.h>
#include <sys/sx.h>
#include <sys/syslog.h>
#include <sys/time.h>
#include <sys/uio.h>
@ -77,8 +78,7 @@ __FBSDID("$FreeBSD$");
#include <rpc/rpc.h>
#include <rpc/rpc_com.h>
#define MCALL_MSG_SIZE 24
#include <rpc/krpc.h>
struct cmessage {
struct cmsghdr cmsg;
@ -106,43 +106,6 @@ static struct clnt_ops clnt_vc_ops = {
.cl_control = clnt_vc_control
};
/*
* A pending RPC request which awaits a reply. Requests which have
* received their reply will have cr_xid set to zero and cr_mrep to
* the mbuf chain of the reply.
*/
struct ct_request {
TAILQ_ENTRY(ct_request) cr_link;
uint32_t cr_xid; /* XID of request */
struct mbuf *cr_mrep; /* reply received by upcall */
int cr_error; /* any error from upcall */
char cr_verf[MAX_AUTH_BYTES]; /* reply verf */
};
TAILQ_HEAD(ct_request_list, ct_request);
struct ct_data {
struct mtx ct_lock;
int ct_threads; /* number of threads in clnt_vc_call */
bool_t ct_closing; /* TRUE if we are closing */
bool_t ct_closed; /* TRUE if we are closed */
struct socket *ct_socket; /* connection socket */
bool_t ct_closeit; /* close it on destroy */
struct timeval ct_wait; /* wait interval in milliseconds */
struct sockaddr_storage ct_addr; /* remote addr */
struct rpc_err ct_error;
uint32_t ct_xid;
char ct_mcallc[MCALL_MSG_SIZE]; /* marshalled callmsg */
size_t ct_mpos; /* pos after marshal */
const char *ct_waitchan;
int ct_waitflag;
struct mbuf *ct_record; /* current reply record */
size_t ct_record_resid; /* how much left of reply to read */
bool_t ct_record_eor; /* true if reading last fragment */
struct ct_request_list ct_pending;
int ct_upcallrefs; /* Ref cnt of upcalls in prog. */
};
static void clnt_vc_upcallsdone(struct ct_data *);
static const char clnt_vc_errstr[] = "%s : %s";
@ -641,6 +604,7 @@ clnt_vc_control(CLIENT *cl, u_int request, void *info)
{
struct ct_data *ct = (struct ct_data *)cl->cl_private;
void *infop = info;
SVCXPRT *xprt;
mtx_lock(&ct->ct_lock);
@ -752,6 +716,14 @@ clnt_vc_control(CLIENT *cl, u_int request, void *info)
*(int *) info = FALSE;
break;
case CLSET_BACKCHANNEL:
xprt = (SVCXPRT *)info;
if (ct->ct_backchannelxprt == NULL) {
xprt->xp_p2 = ct;
ct->ct_backchannelxprt = xprt;
}
break;
default:
mtx_unlock(&ct->ct_lock);
return (FALSE);
@ -817,10 +789,20 @@ clnt_vc_destroy(CLIENT *cl)
{
struct ct_data *ct = (struct ct_data *) cl->cl_private;
struct socket *so = NULL;
SVCXPRT *xprt;
clnt_vc_close(cl);
mtx_lock(&ct->ct_lock);
xprt = ct->ct_backchannelxprt;
ct->ct_backchannelxprt = NULL;
if (xprt != NULL) {
mtx_unlock(&ct->ct_lock); /* To avoid a LOR. */
sx_xlock(&xprt->xp_lock);
mtx_lock(&ct->ct_lock);
xprt->xp_p2 = NULL;
xprt_unregister(xprt);
}
if (ct->ct_socket) {
if (ct->ct_closeit) {
@ -829,6 +811,10 @@ clnt_vc_destroy(CLIENT *cl)
}
mtx_unlock(&ct->ct_lock);
if (xprt != NULL) {
sx_xunlock(&xprt->xp_lock);
SVC_RELEASE(xprt);
}
mtx_destroy(&ct->ct_lock);
if (so) {
@ -859,12 +845,15 @@ clnt_vc_soupcall(struct socket *so, void *arg, int waitflag)
{
struct ct_data *ct = (struct ct_data *) arg;
struct uio uio;
struct mbuf *m;
struct mbuf *m, *m2;
struct ct_request *cr;
int error, rcvflag, foundreq;
uint32_t xid, header;
uint32_t xid_plus_direction[2], header;
bool_t do_read;
SVCXPRT *xprt;
struct cf_conn *cd;
CTASSERT(sizeof(xid_plus_direction) == 2 * sizeof(uint32_t));
ct->ct_upcallrefs++;
uio.uio_td = curthread;
do {
@ -978,22 +967,64 @@ clnt_vc_soupcall(struct socket *so, void *arg, int waitflag)
&& ct->ct_record_eor) {
/*
* The XID is in the first uint32_t of
* the reply.
* the reply and the message direction
* is the second one.
*/
if (ct->ct_record->m_len < sizeof(xid) &&
if (ct->ct_record->m_len <
sizeof(xid_plus_direction) &&
m_length(ct->ct_record, NULL) <
sizeof(xid)) {
sizeof(xid_plus_direction)) {
m_freem(ct->ct_record);
break;
}
m_copydata(ct->ct_record, 0, sizeof(xid),
(char *)&xid);
xid = ntohl(xid);
m_copydata(ct->ct_record, 0,
sizeof(xid_plus_direction),
(char *)xid_plus_direction);
xid_plus_direction[0] =
ntohl(xid_plus_direction[0]);
xid_plus_direction[1] =
ntohl(xid_plus_direction[1]);
/* Check message direction. */
if (xid_plus_direction[1] == CALL) {
/* This is a backchannel request. */
mtx_lock(&ct->ct_lock);
xprt = ct->ct_backchannelxprt;
if (xprt == NULL) {
mtx_unlock(&ct->ct_lock);
/* Just throw it away. */
m_freem(ct->ct_record);
ct->ct_record = NULL;
} else {
cd = (struct cf_conn *)
xprt->xp_p1;
m2 = cd->mreq;
/*
* The requests are chained
* in the m_nextpkt list.
*/
while (m2 != NULL &&
m2->m_nextpkt != NULL)
/* Find end of list. */
m2 = m2->m_nextpkt;
if (m2 != NULL)
m2->m_nextpkt =
ct->ct_record;
else
cd->mreq =
ct->ct_record;
ct->ct_record->m_nextpkt =
NULL;
ct->ct_record = NULL;
xprt_active(xprt);
mtx_unlock(&ct->ct_lock);
}
} else {
mtx_lock(&ct->ct_lock);
foundreq = 0;
TAILQ_FOREACH(cr, &ct->ct_pending, cr_link) {
if (cr->cr_xid == xid) {
TAILQ_FOREACH(cr, &ct->ct_pending,
cr_link) {
if (cr->cr_xid ==
xid_plus_direction[0]) {
/*
* This one
* matches. We leave
@ -1001,11 +1032,12 @@ clnt_vc_soupcall(struct socket *so, void *arg, int waitflag)
* cr->cr_mrep. Set
* the XID to zero so
* that we will ignore
* any duplicaed
* any duplicated
* replies.
*/
cr->cr_xid = 0;
cr->cr_mrep = ct->ct_record;
cr->cr_mrep =
ct->ct_record;
cr->cr_error = 0;
foundreq = 1;
wakeup(cr);
@ -1019,6 +1051,7 @@ clnt_vc_soupcall(struct socket *so, void *arg, int waitflag)
ct->ct_record = NULL;
}
}
}
} while (m);
ct->ct_upcallrefs--;
if (ct->ct_upcallrefs < 0)

111
sys/rpc/krpc.h Normal file
View File

@ -0,0 +1,111 @@
/*-
* Sun RPC is a product of Sun Microsystems, Inc. and is provided for
* unrestricted use provided that this legend is included on all tape
* media and as a part of the software program in whole or part. Users
* may copy or modify Sun RPC without charge, but are not authorized
* to license or distribute it to anyone else except as part of a product or
* program developed by the user.
*
* SUN RPC IS PROVIDED AS IS WITH NO WARRANTIES OF ANY KIND INCLUDING THE
* WARRANTIES OF DESIGN, MERCHANTIBILITY AND FITNESS FOR A PARTICULAR
* PURPOSE, OR ARISING FROM A COURSE OF DEALING, USAGE OR TRADE PRACTICE.
*
* Sun RPC is provided with no support and without any obligation on the
* part of Sun Microsystems, Inc. to assist in its use, correction,
* modification or enhancement.
*
* SUN MICROSYSTEMS, INC. SHALL HAVE NO LIABILITY WITH RESPECT TO THE
* INFRINGEMENT OF COPYRIGHTS, TRADE SECRETS OR ANY PATENTS BY SUN RPC
* OR ANY PART THEREOF.
*
* In no event will Sun Microsystems, Inc. be liable for any lost revenue
* or profits or other special, indirect and consequential damages, even if
* Sun has been advised of the possibility of such damages.
*
* Sun Microsystems, Inc.
* 2550 Garcia Avenue
* Mountain View, California 94043
*
* $FreeBSD$
*/
#ifndef _RPC_KRPC_H_
#define _RPC_KRPC_H_
#ifdef _KERNEL
/*
* Definitions now shared between client and server RPC for backchannels.
*/
#define MCALL_MSG_SIZE 24
/*
* A pending RPC request which awaits a reply. Requests which have
* received their reply will have cr_xid set to zero and cr_mrep to
* the mbuf chain of the reply.
*
* The socket upcall matches an incoming reply against cr_xid, fills in
* cr_mrep/cr_error and then calls wakeup() on the request itself.
*/
struct ct_request {
/* Linkage on the owning connection's ct_pending list. */
TAILQ_ENTRY(ct_request) cr_link;
uint32_t cr_xid; /* XID of request */
struct mbuf *cr_mrep; /* reply received by upcall */
int cr_error; /* any error from upcall */
char cr_verf[MAX_AUTH_BYTES]; /* reply verf */
};
/* List head type used for ct_data.ct_pending. */
TAILQ_HEAD(ct_request_list, ct_request);
/*
* Private state of a reconnecting client handle (clnt_rc); it is kept in
* the handle's cl_private pointer. The saved settings are re-applied to
* each underlying client created when the connection is re-established.
*/
struct rc_data {
struct mtx rc_lock;
struct sockaddr_storage rc_addr; /* server address */
struct netconfig* rc_nconf; /* network type */
rpcprog_t rc_prog; /* program number */
rpcvers_t rc_vers; /* version number */
size_t rc_sendsz;
size_t rc_recvsz;
struct timeval rc_timeout;
struct timeval rc_retry;
int rc_retries;
int rc_privport;
char *rc_waitchan;
int rc_intr;
int rc_connecting;
int rc_closed;
struct ucred *rc_ucred;
CLIENT* rc_client; /* underlying RPC client */
struct rpc_err rc_err;
/*
* Backchannel transport (really an SVCXPRT *), or NULL when none is
* set. CLSET_BACKCHANNEL acquires a reference and registers the
* transport before storing it here; destroying the client
* unregisters and releases it. When non-NULL it is passed on to
* each new underlying client via CLSET_BACKCHANNEL.
*/
void *rc_backchannel;
};
/*
* Private state of a connection-oriented (clnt_vc) client handle; it is
* kept in the handle's cl_private pointer and is also the argument given
* to the socket receive upcall. It lives in this shared header so that
* the backchannel service transport can reach ct_lock and ct_socket
* through its xp_p2 pointer.
*/
struct ct_data {
struct mtx ct_lock;
int ct_threads; /* number of threads in clnt_vc_call */
bool_t ct_closing; /* TRUE if we are closing */
bool_t ct_closed; /* TRUE if we are closed */
struct socket *ct_socket; /* connection socket */
bool_t ct_closeit; /* close it on destroy */
struct timeval ct_wait; /* wait interval in milliseconds */
struct sockaddr_storage ct_addr; /* remote addr */
struct rpc_err ct_error;
uint32_t ct_xid;
char ct_mcallc[MCALL_MSG_SIZE]; /* marshalled callmsg */
size_t ct_mpos; /* pos after marshal */
const char *ct_waitchan;
int ct_waitflag;
struct mbuf *ct_record; /* current reply record */
size_t ct_record_resid; /* how much left of reply to read */
bool_t ct_record_eor; /* true if reading last fragment */
struct ct_request_list ct_pending;
int ct_upcallrefs; /* Ref cnt of upcalls in prog. */
/*
* Set by CLSET_BACKCHANNEL only while still NULL (the transport's
* xp_p2 is pointed back at this structure at the same time) and
* cleared when the client is destroyed. Complete records whose
* direction is CALL are queued on this transport by the upcall;
* with no transport set they are thrown away.
*/
SVCXPRT *ct_backchannelxprt; /* xprt for backchannel */
};
/*
* Per-connection state of a connection-oriented service transport.
*
* For a backchannel transport the client-side upcall appends each
* complete request record to the tail of mreq, chaining the records
* through m_nextpkt, and the backchannel receive routine removes them
* from the head.
*/
struct cf_conn { /* kept in xprt->xp_p1 for actual connection */
enum xprt_stat strm_stat;
struct mbuf *mpending; /* unparsed data read from the socket */
struct mbuf *mreq; /* current record being built from mpending */
uint32_t resid; /* number of bytes needed for fragment */
bool_t eor; /* reading last fragment of current record */
};
#endif /* _KERNEL */
#endif /* _RPC_KRPC_H_ */

View File

@ -703,6 +703,8 @@ extern SVCXPRT *svc_vc_create(SVCPOOL *, struct socket *,
* const size_t recvsize; -- max recv size
*/
extern SVCXPRT *svc_vc_create_backchannel(SVCPOOL *);
/*
* Generic TLI create routine
*/

View File

@ -65,6 +65,7 @@ __FBSDID("$FreeBSD$");
#include <rpc/rpc.h>
#include <rpc/krpc.h>
#include <rpc/rpc_com.h>
#include <security/mac/mac_framework.h>
@ -83,6 +84,14 @@ static bool_t svc_vc_reply(SVCXPRT *, struct rpc_msg *,
static bool_t svc_vc_control(SVCXPRT *xprt, const u_int rq, void *in);
static bool_t svc_vc_rendezvous_control (SVCXPRT *xprt, const u_int rq,
void *in);
static void svc_vc_backchannel_destroy(SVCXPRT *);
static enum xprt_stat svc_vc_backchannel_stat(SVCXPRT *);
static bool_t svc_vc_backchannel_recv(SVCXPRT *, struct rpc_msg *,
struct sockaddr **, struct mbuf **);
static bool_t svc_vc_backchannel_reply(SVCXPRT *, struct rpc_msg *,
struct sockaddr *, struct mbuf *);
static bool_t svc_vc_backchannel_control(SVCXPRT *xprt, const u_int rq,
void *in);
static SVCXPRT *svc_vc_create_conn(SVCPOOL *pool, struct socket *so,
struct sockaddr *raddr);
static int svc_vc_accept(struct socket *head, struct socket **sop);
@ -105,12 +114,12 @@ static struct xp_ops svc_vc_ops = {
.xp_control = svc_vc_control
};
struct cf_conn { /* kept in xprt->xp_p1 for actual connection */
enum xprt_stat strm_stat;
struct mbuf *mpending; /* unparsed data read from the socket */
struct mbuf *mreq; /* current record being built from mpending */
uint32_t resid; /* number of bytes needed for fragment */
bool_t eor; /* reading last fragment of current record */
/*
 * Method table for backchannel transports, i.e. service transports that
 * receive their requests from a clnt_vc connection rather than from a
 * socket of their own.
 */
static struct xp_ops svc_vc_backchannel_ops = {
	.xp_destroy =	svc_vc_backchannel_destroy,
	.xp_stat =	svc_vc_backchannel_stat,
	.xp_recv =	svc_vc_backchannel_recv,
	.xp_reply =	svc_vc_backchannel_reply,
	.xp_control =	svc_vc_backchannel_control,
};
/*
@ -266,6 +275,28 @@ svc_vc_create_conn(SVCPOOL *pool, struct socket *so, struct sockaddr *raddr)
return (NULL);
}
/*
 * Create a new transport for a backchannel on a clnt_vc socket.
 *
 * The transport owns no socket (xp_socket is NULL) and starts out with
 * xp_p2 == NULL, i.e. not yet attached to a client connection.  The
 * connection state hung off xp_p1 starts in XPRT_IDLE.
 */
SVCXPRT *
svc_vc_create_backchannel(SVCPOOL *pool)
{
	struct cf_conn *conn;
	SVCXPRT *bc;

	conn = mem_alloc(sizeof(*conn));
	conn->strm_stat = XPRT_IDLE;

	bc = svc_xprt_alloc();
	sx_init(&bc->xp_lock, "xprt->xp_lock");
	bc->xp_ops = &svc_vc_backchannel_ops;
	bc->xp_pool = pool;
	bc->xp_socket = NULL;
	bc->xp_p1 = conn;
	bc->xp_p2 = NULL;

	return (bc);
}
/*
* This does all of the accept except the final call to soaccept. The
* caller will call soaccept after dropping its locks (soaccept may
@ -452,6 +483,22 @@ svc_vc_destroy(SVCXPRT *xprt)
mem_free(cd, sizeof(*cd));
}
/*
 * Tear down a backchannel transport: destroy its lock, free the
 * transport, then free every request record still queued on it and the
 * connection state itself.
 */
static void
svc_vc_backchannel_destroy(SVCXPRT *xprt)
{
	/* Fetch cd first; xprt must not be touched once it has been freed. */
	struct cf_conn *cd = (struct cf_conn *)xprt->xp_p1;
	struct mbuf *m, *m2;

	/* Pair the sx_init() done when the transport was created. */
	sx_destroy(&xprt->xp_lock);
	svc_xprt_free(xprt);

	/*
	 * Queued requests are chained through m_nextpkt; m_freem() only
	 * follows m_next, so each record is freed individually.
	 */
	m = cd->mreq;
	while (m != NULL) {
		m2 = m;
		m = m->m_nextpkt;
		m_freem(m2);
	}
	mem_free(cd, sizeof(*cd));
}
/*ARGSUSED*/
static bool_t
svc_vc_control(SVCXPRT *xprt, const u_int rq, void *in)
@ -466,6 +513,13 @@ svc_vc_rendezvous_control(SVCXPRT *xprt, const u_int rq, void *in)
return (FALSE);
}
/*
 * Control requests are not supported on backchannel transports; every
 * request is rejected.
 */
/*ARGSUSED*/
static bool_t
svc_vc_backchannel_control(SVCXPRT *xprt, const u_int rq, void *in)
{

	(void)xprt;
	(void)rq;
	(void)in;
	return (FALSE);
}
static enum xprt_stat
svc_vc_stat(SVCXPRT *xprt)
{
@ -506,6 +560,19 @@ svc_vc_stat(SVCXPRT *xprt)
return (XPRT_IDLE);
}
/*
 * Report XPRT_MOREREQS while at least one request record is queued on
 * the backchannel transport, XPRT_IDLE otherwise.
 */
static enum xprt_stat
svc_vc_backchannel_stat(SVCXPRT *xprt)
{
	struct cf_conn *conn = (struct cf_conn *)(xprt->xp_p1);

	return (conn->mreq != NULL ? XPRT_MOREREQS : XPRT_IDLE);
}
static bool_t
svc_vc_recv(SVCXPRT *xprt, struct rpc_msg *msg,
struct sockaddr **addrp, struct mbuf **mp)
@ -679,6 +746,44 @@ svc_vc_recv(SVCXPRT *xprt, struct rpc_msg *msg,
}
}
/*
 * Take the next queued backchannel request off the transport and decode
 * its RPC call header.
 *
 * Returns TRUE with *mp set to the remaining mbufs of the request and
 * *addrp set to NULL.  Returns FALSE when the transport is not attached
 * to a client connection, when no request is queued (the transport is
 * then marked inactive), or when the call header cannot be decoded.
 */
static bool_t
svc_vc_backchannel_recv(SVCXPRT *xprt, struct rpc_msg *msg,
    struct sockaddr **addrp, struct mbuf **mp)
{
	struct cf_conn *cd = (struct cf_conn *) xprt->xp_p1;
	struct ct_data *ct;
	struct mbuf *m;
	XDR xdrs;

	/* xp_lock keeps xp_p2 stable; client destroy clears it under it. */
	sx_xlock(&xprt->xp_lock);
	ct = (struct ct_data *)xprt->xp_p2;
	if (ct == NULL) {
		sx_xunlock(&xprt->xp_lock);
		return (FALSE);
	}

	/* The request queue is appended to by the upcall under ct_lock. */
	mtx_lock(&ct->ct_lock);
	m = cd->mreq;
	if (m == NULL) {
		xprt_inactive(xprt);
		mtx_unlock(&ct->ct_lock);
		sx_xunlock(&xprt->xp_lock);
		return (FALSE);
	}
	cd->mreq = m->m_nextpkt;
	/*
	 * Detach the record completely so that it no longer carries a
	 * link into the queue once it is handed on.
	 */
	m->m_nextpkt = NULL;
	mtx_unlock(&ct->ct_lock);
	sx_xunlock(&xprt->xp_lock);

	xdrmbuf_create(&xdrs, m, XDR_DECODE);
	if (! xdr_callmsg(&xdrs, msg)) {
		XDR_DESTROY(&xdrs);
		return (FALSE);
	}
	*addrp = NULL;
	*mp = xdrmbuf_getall(&xdrs);
	XDR_DESTROY(&xdrs);
	return (TRUE);
}
static bool_t
svc_vc_reply(SVCXPRT *xprt, struct rpc_msg *msg,
struct sockaddr *addr, struct mbuf *m)
@ -732,6 +837,65 @@ svc_vc_reply(SVCXPRT *xprt, struct rpc_msg *msg,
return (stat);
}
/*
 * Encode an RPC reply for a backchannel request and send it on the
 * socket of the client connection the transport is attached to.
 *
 * For an accepted, successful reply the result mbufs in m are appended
 * after the encoded reply header.  Returns FALSE only when the reply
 * could not be encoded; as in the original code, a failure to send does
 * not change the return value.
 */
static bool_t
svc_vc_backchannel_reply(SVCXPRT *xprt, struct rpc_msg *msg,
    struct sockaddr *addr, struct mbuf *m)
{
	struct ct_data *ct;
	XDR xdrs;
	struct mbuf *mrep;
	bool_t stat = TRUE;
	int error;

	/*
	 * Leave space for record mark.
	 */
	MGETHDR(mrep, M_WAITOK, MT_DATA);
	mrep->m_len = 0;
	mrep->m_data += sizeof(uint32_t);

	xdrmbuf_create(&xdrs, mrep, XDR_ENCODE);

	if (msg->rm_reply.rp_stat == MSG_ACCEPTED &&
	    msg->rm_reply.rp_acpt.ar_stat == SUCCESS) {
		if (!xdr_replymsg(&xdrs, msg))
			stat = FALSE;
		else
			xdrmbuf_append(&xdrs, m);
	} else {
		stat = xdr_replymsg(&xdrs, msg);
	}

	if (stat) {
		m_fixhdr(mrep);

		/*
		 * Prepend a record marker containing the reply length.
		 */
		M_PREPEND(mrep, sizeof(uint32_t), M_WAITOK);
		*mtod(mrep, uint32_t *) =
		    htonl(0x80000000 | (mrep->m_pkthdr.len
		    - sizeof(uint32_t)));

		/* xp_lock keeps xp_p2 (and so the socket) stable here. */
		sx_xlock(&xprt->xp_lock);
		ct = (struct ct_data *)xprt->xp_p2;
		if (ct != NULL) {
			/* sosend() takes over responsibility for mrep. */
			error = sosend(ct->ct_socket, NULL, NULL, mrep, NULL,
			    0, curthread);
		} else {
			/*
			 * No client connection is attached any more.  The
			 * reply is never handed to sosend(), so free it
			 * here instead of leaking it.
			 */
			m_freem(mrep);
			error = EPIPE;
		}
		sx_xunlock(&xprt->xp_lock);
		if (!error) {
			stat = TRUE;
		}
	} else {
		m_freem(mrep);
	}

	XDR_DESTROY(&xdrs);

	return (stat);
}
static bool_t
svc_vc_null()
{