Implement non-blocking tcp-connections.

Reviewed by:	rwatson
Obtained from:	NetBSD
MFC after:	1 day
This commit is contained in:
Martin Blapp 2003-01-16 07:13:51 +00:00
parent 41f294aba1
commit 08497c026c
Notes: svn2git 2020-12-20 02:59:44 +00:00
svn path=/head/; revision=109359
7 changed files with 446 additions and 64 deletions

View File

@ -74,7 +74,7 @@ int __rpc_sockisbound(int);
struct netbuf *__rpcb_findaddr(rpcprog_t, rpcvers_t, const struct netconfig *,
const char *, CLIENT **);
bool_t __rpc_control(int,void *);
bool_t rpc_control(int,void *);
char *_get_next_token(char *, int);

View File

@ -70,7 +70,14 @@
*/
#define SVCGET_VERSQUIET 1
#define SVCSET_VERSQUIET 2
#define SVCGET_CONNMAXREC 3
#define SVCSET_CONNMAXREC 4
/*
* Operations for rpc_control().
*/
#define RPC_SVC_CONNMAXREC_SET 0 /* set max rec size, enable nonblock */
#define RPC_SVC_CONNMAXREC_GET 1
enum xprt_stat {
XPRT_DIED,

View File

@ -84,6 +84,9 @@ bool_t __rpc_control(int,void *);
char *_get_next_token(char *, int);
SVCXPRT **__svc_xports;
int __svc_maxrec;
__END_DECLS
#endif /* _RPC_RPCCOM_H */

View File

@ -63,8 +63,6 @@ __FBSDID("$FreeBSD$");
#include "rpc_com.h"
static SVCXPRT **xports;
#define RQCRED_SIZE 400 /* this size is excessive */
#define SVC_VERSQUIET 0x0001 /* keep quiet about vers mismatch */
@ -91,6 +89,7 @@ extern rwlock_t svc_fd_lock;
static struct svc_callout *svc_find(rpcprog_t, rpcvers_t,
struct svc_callout **, char *);
static void __xprt_do_unregister (SVCXPRT *xprt, bool_t dolock);
/* *************** SVCXPRT related stuff **************** */
@ -108,27 +107,40 @@ xprt_register(xprt)
sock = xprt->xp_fd;
rwlock_wrlock(&svc_fd_lock);
if (xports == NULL) {
xports = (SVCXPRT **)
if (__svc_xports == NULL) {
__svc_xports = (SVCXPRT **)
mem_alloc(FD_SETSIZE * sizeof(SVCXPRT *));
if (xports == NULL)
if (__svc_xports == NULL)
return;
memset(xports, '\0', FD_SETSIZE * sizeof(SVCXPRT *));
memset(__svc_xports, '\0', FD_SETSIZE * sizeof(SVCXPRT *));
}
if (sock < FD_SETSIZE) {
xports[sock] = xprt;
__svc_xports[sock] = xprt;
FD_SET(sock, &svc_fdset);
svc_maxfd = max(svc_maxfd, sock);
}
rwlock_unlock(&svc_fd_lock);
}
void
xprt_unregister(SVCXPRT *xprt)
{
__xprt_do_unregister(xprt, TRUE);
}
void
__xprt_unregister_unlocked(SVCXPRT *xprt)
{
__xprt_do_unregister(xprt, FALSE);
}
/*
* De-activate a transport handle.
*/
void
xprt_unregister(xprt)
static void
__xprt_do_unregister(xprt, dolock)
SVCXPRT *xprt;
bool_t dolock;
{
int sock;
@ -136,17 +148,19 @@ xprt_unregister(xprt)
sock = xprt->xp_fd;
rwlock_wrlock(&svc_fd_lock);
if ((sock < FD_SETSIZE) && (xports[sock] == xprt)) {
xports[sock] = NULL;
if (dolock)
rwlock_wrlock(&svc_fd_lock);
if ((sock < FD_SETSIZE) && (__svc_xports[sock] == xprt)) {
__svc_xports[sock] = NULL;
FD_CLR(sock, &svc_fdset);
if (sock >= svc_maxfd) {
for (svc_maxfd--; svc_maxfd>=0; svc_maxfd--)
if (xports[svc_maxfd])
if (__svc_xports[svc_maxfd])
break;
}
}
rwlock_unlock(&svc_fd_lock);
if (dolock)
rwlock_unlock(&svc_fd_lock);
}
/*
@ -611,7 +625,7 @@ svc_getreq_common(fd)
r.rq_clntcred = &(cred_area[2*MAX_AUTH_BYTES]);
rwlock_rdlock(&svc_fd_lock);
xprt = xports[fd];
xprt = __svc_xports[fd];
rwlock_unlock(&svc_fd_lock);
if (xprt == NULL)
/* But do we control sock? */
@ -667,7 +681,7 @@ svc_getreq_common(fd)
* If so, then break.
*/
rwlock_rdlock(&svc_fd_lock);
if (xprt != xports[fd]) {
if (xprt != __svc_xports[fd]) {
rwlock_unlock(&svc_fd_lock);
break;
}
@ -715,3 +729,24 @@ svc_getreq_poll(pfdp, pollretval)
}
}
}
bool_t
rpc_control(int what, void *arg)
{
int val;
switch (what) {
case RPC_SVC_CONNMAXREC_SET:
val = *(int *)arg;
if (val <= 0)
return FALSE;
__svc_maxrec = val;
return TRUE;
case RPC_SVC_CONNMAXREC_GET:
*(int *)arg = __svc_maxrec;
return TRUE;
default:
break;
}
return FALSE;
}

View File

@ -55,14 +55,19 @@ __FBSDID("$FreeBSD$");
void
svc_run()
{
fd_set readfds;
fd_set readfds, cleanfds;
struct timeval timeout;
extern rwlock_t svc_fd_lock;
timeout.tv_sec = 30;
timeout.tv_usec = 0;
for (;;) {
rwlock_rdlock(&svc_fd_lock);
readfds = svc_fdset;
cleanfds = svc_fdset;
rwlock_unlock(&svc_fd_lock);
switch (_select(svc_maxfd+1, &readfds, NULL, NULL, NULL)) {
switch (select(svc_maxfd+1, &readfds, NULL, NULL, &timeout)) {
case -1:
FD_ZERO(&readfds);
if (errno == EINTR) {
@ -71,6 +76,7 @@ svc_run()
_warn("svc_run: - select failed");
return;
case 0:
__svc_clean_idle(&cleanfds, 30, FALSE);
continue;
default:
svc_getreqset(&readfds);

View File

@ -51,6 +51,7 @@ __FBSDID("$FreeBSD$");
#include <sys/poll.h>
#include <sys/socket.h>
#include <sys/un.h>
#include <sys/time.h>
#include <sys/uio.h>
#include <netinet/in.h>
#include <netinet/tcp.h>
@ -58,6 +59,7 @@ __FBSDID("$FreeBSD$");
#include <assert.h>
#include <err.h>
#include <errno.h>
#include <fcntl.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
@ -73,10 +75,13 @@ struct cmessage {
struct cmsgcred cmcred;
};
extern rwlock_t svc_fd_lock;
static SVCXPRT *makefd_xprt(int, u_int, u_int);
static bool_t rendezvous_request(SVCXPRT *, struct rpc_msg *);
static enum xprt_stat rendezvous_stat(SVCXPRT *);
static void svc_vc_destroy(SVCXPRT *);
static void __svc_vc_dodestroy (SVCXPRT *);
static int read_vc(void *, void *, int);
static int write_vc(void *, void *, int);
static enum xprt_stat svc_vc_stat(SVCXPRT *);
@ -87,12 +92,15 @@ static bool_t svc_vc_reply(SVCXPRT *, struct rpc_msg *);
static void svc_vc_rendezvous_ops(SVCXPRT *);
static void svc_vc_ops(SVCXPRT *);
static bool_t svc_vc_control(SVCXPRT *xprt, const u_int rq, void *in);
static bool_t svc_vc_rendezvous_control (SVCXPRT *xprt, const u_int rq,
void *in);
static int __msgread_withcred(int, void *, size_t, struct cmessage *);
static int __msgwrite(int, void *, size_t);
struct cf_rendezvous { /* kept in xprt->xp_p1 for rendezvouser */
u_int sendsize;
u_int recvsize;
int maxrec;
};
struct cf_conn { /* kept in xprt->xp_p1 for actual connection */
@ -100,6 +108,11 @@ struct cf_conn { /* kept in xprt->xp_p1 for actual connection */
u_int32_t x_id;
XDR xdrs;
char verf_body[MAX_AUTH_BYTES];
u_int sendsize;
u_int recvsize;
int maxrec;
bool_t nonblock;
struct timeval last_recv_time;
};
/*
@ -139,6 +152,7 @@ svc_vc_create(fd, sendsize, recvsize)
return NULL;
r->sendsize = __rpc_get_t_size(si.si_af, si.si_proto, (int)sendsize);
r->recvsize = __rpc_get_t_size(si.si_af, si.si_proto, (int)recvsize);
r->maxrec = __svc_maxrec;
xprt = mem_alloc(sizeof(SVCXPRT));
if (xprt == NULL) {
warnx("svc_vc_create: out of memory");
@ -285,11 +299,14 @@ rendezvous_request(xprt, msg)
SVCXPRT *xprt;
struct rpc_msg *msg;
{
int sock;
int sock, flags;
struct cf_rendezvous *r;
struct cf_conn *cd;
struct sockaddr_storage addr;
socklen_t len;
struct __rpc_sockinfo si;
SVCXPRT *newxprt;
fd_set cleanfds;
assert(xprt != NULL);
assert(msg != NULL);
@ -301,21 +318,30 @@ rendezvous_request(xprt, msg)
&len)) < 0) {
if (errno == EINTR)
goto again;
return (FALSE);
/*
* Clean out the most idle file descriptor when we're
* running out.
*/
if (errno == EMFILE || errno == ENFILE) {
cleanfds = svc_fdset;
__svc_clean_idle(&cleanfds, 0, FALSE);
goto again;
}
return (FALSE);
}
/*
* make a new transporter (re-uses xprt)
*/
xprt = makefd_xprt(sock, r->sendsize, r->recvsize);
xprt->xp_rtaddr.buf = mem_alloc(len);
if (xprt->xp_rtaddr.buf == NULL)
newxprt = makefd_xprt(sock, r->sendsize, r->recvsize);
newxprt->xp_rtaddr.buf = mem_alloc(len);
if (newxprt->xp_rtaddr.buf == NULL)
return (FALSE);
memcpy(xprt->xp_rtaddr.buf, &addr, len);
xprt->xp_rtaddr.len = len;
memcpy(newxprt->xp_rtaddr.buf, &addr, len);
newxprt->xp_rtaddr.len = len;
#ifdef PORTMAP
if (addr.ss_family == AF_INET || addr.ss_family == AF_LOCAL) {
xprt->xp_raddr = *(struct sockaddr_in *)xprt->xp_rtaddr.buf;
xprt->xp_addrlen = sizeof (struct sockaddr_in);
newxprt->xp_raddr = *(struct sockaddr_in *)newxprt->xp_rtaddr.buf;
newxprt->xp_addrlen = sizeof (struct sockaddr_in);
}
#endif /* PORTMAP */
if (__rpc_fd2sockinfo(sock, &si) && si.si_proto == IPPROTO_TCP) {
@ -323,6 +349,28 @@ rendezvous_request(xprt, msg)
/* XXX fvdl - is this useful? */
_setsockopt(sock, IPPROTO_TCP, TCP_NODELAY, &len, sizeof (len));
}
cd = (struct cf_conn *)newxprt->xp_p1;
cd->recvsize = r->recvsize;
cd->sendsize = r->sendsize;
cd->maxrec = r->maxrec;
if (cd->maxrec != 0) {
flags = fcntl(sock, F_GETFL, 0);
if (flags == -1)
return (FALSE);
if (fcntl(sock, F_SETFL, flags | O_NONBLOCK) == -1)
return (FALSE);
if (cd->recvsize > cd->maxrec)
cd->recvsize = cd->maxrec;
cd->nonblock = TRUE;
__xdrrec_setnonblock(&cd->xdrs, cd->maxrec);
} else
cd->nonblock = FALSE;
gettimeofday(&cd->last_recv_time, NULL);
return (FALSE); /* there is never an rpc msg to be processed */
}
@ -338,15 +386,22 @@ rendezvous_stat(xprt)
static void
svc_vc_destroy(xprt)
SVCXPRT *xprt;
{
assert(xprt != NULL);
xprt_unregister(xprt);
__svc_vc_dodestroy(xprt);
}
static void
__svc_vc_dodestroy(xprt)
SVCXPRT *xprt;
{
struct cf_conn *cd;
struct cf_rendezvous *r;
assert(xprt != NULL);
cd = (struct cf_conn *)xprt->xp_p1;
xprt_unregister(xprt);
if (xprt->xp_fd != RPC_ANYFD)
(void)_close(xprt->xp_fd);
if (xprt->xp_port != 0) {
@ -380,6 +435,30 @@ svc_vc_control(xprt, rq, in)
return (FALSE);
}
static bool_t
svc_vc_rendezvous_control(xprt, rq, in)
SVCXPRT *xprt;
const u_int rq;
void *in;
{
struct cf_rendezvous *cfp;
cfp = (struct cf_rendezvous *)xprt->xp_p1;
if (cfp == NULL)
return (FALSE);
switch (rq) {
case SVCGET_CONNMAXREC:
*(int *)in = cfp->maxrec;
break;
case SVCSET_CONNMAXREC:
cfp->maxrec = *(int *)in;
break;
default:
return (FALSE);
}
return (TRUE);
}
/*
* reads data from the tcp or uip connection.
* any error is fatal and the connection is closed.
@ -399,12 +478,28 @@ read_vc(xprtp, buf, len)
struct pollfd pollfd;
struct sockaddr *sa;
struct cmessage *cm;
struct cf_conn *cfp;
xprt = (SVCXPRT *)xprtp;
assert(xprt != NULL);
sock = xprt->xp_fd;
cfp = (struct cf_conn *)xprt->xp_p1;
if (cfp->nonblock) {
len = read(sock, buf, (size_t)len);
if (len < 0) {
if (errno == EAGAIN)
len = 0;
else
goto fatal_err;
}
if (len != 0)
gettimeofday(&cfp->last_recv_time, NULL);
return len;
}
do {
pollfd.fd = sock;
pollfd.events = POLLIN;
@ -432,8 +527,10 @@ read_vc(xprtp, buf, len)
} else
goto fatal_err;
} else {
if ((len = _read(sock, buf, (size_t)len)) > 0)
if ((len = read(sock, buf, (size_t)len)) > 0) {
gettimeofday(&cfp->last_recv_time, NULL);
return (len);
}
}
fatal_err:
@ -454,27 +551,41 @@ write_vc(xprtp, buf, len)
SVCXPRT *xprt;
int i, cnt;
struct sockaddr *sa;
struct cf_conn *cd;
struct timeval tv0, tv1;
xprt = (SVCXPRT *)xprtp;
assert(xprt != NULL);
cd = (struct cf_conn *)xprt->xp_p1;
if (cd->nonblock)
gettimeofday(&tv0, NULL);
sa = (struct sockaddr *)xprt->xp_rtaddr.buf;
if (sa->sa_family == AF_LOCAL) {
for (cnt = len; cnt > 0; cnt -= i, buf += i) {
if ((i = __msgwrite(xprt->xp_fd, buf,
(size_t)cnt)) < 0) {
((struct cf_conn *)(xprt->xp_p1))->strm_stat =
XPRT_DIED;
for (cnt = len; cnt > 0; cnt -= i, buf += i) {
if (sa->sa_family == AF_LOCAL)
i = __msgwrite(xprt->xp_fd, buf, (size_t)cnt);
else
i = _write(xprt->xp_fd, buf, (size_t)cnt);
if (i < 0) {
if (errno != EAGAIN || !cd->nonblock) {
cd->strm_stat = XPRT_DIED;
return (-1);
}
}
} else {
for (cnt = len; cnt > 0; cnt -= i, buf += i) {
if ((i = _write(xprt->xp_fd, buf,
(size_t)cnt)) < 0) {
((struct cf_conn *)(xprt->xp_p1))->strm_stat =
XPRT_DIED;
return (-1);
if (cd->nonblock && i != cnt) {
/*
* For non-blocking connections, do not
* take more than 2 seconds writing the
* data out.
*
* XXX 2 is an arbitrary amount.
*/
gettimeofday(&tv1, NULL);
if (tv1.tv_sec - tv0.tv_sec >= 2) {
cd->strm_stat = XPRT_DIED;
return (-1);
}
}
}
}
@ -513,6 +624,11 @@ svc_vc_recv(xprt, msg)
cd = (struct cf_conn *)(xprt->xp_p1);
xdrs = &(cd->xdrs);
if (cd->nonblock) {
if (!__xdrrec_getrec(xdrs, &cd->strm_stat, TRUE))
return FALSE;
}
xdrs->x_op = XDR_DECODE;
(void)xdrrec_skiprecord(xdrs);
if (xdr_callmsg(xdrs, msg)) {
@ -560,7 +676,7 @@ svc_vc_reply(xprt, msg)
{
struct cf_conn *cd;
XDR *xdrs;
bool_t stat;
bool_t rstat;
assert(xprt != NULL);
assert(msg != NULL);
@ -570,9 +686,9 @@ svc_vc_reply(xprt, msg)
xdrs->x_op = XDR_ENCODE;
msg->rm_xid = cd->x_id;
stat = xdr_replymsg(xdrs, msg);
rstat = xdr_replymsg(xdrs, msg);
(void)xdrrec_endofrecord(xdrs, TRUE);
return (stat);
return (rstat);
}
static void
@ -619,7 +735,7 @@ svc_vc_rendezvous_ops(xprt)
ops.xp_freeargs =
(bool_t (*)(SVCXPRT *, xdrproc_t, void *))abort,
ops.xp_destroy = svc_vc_destroy;
ops2.xp_control = svc_vc_control;
ops2.xp_control = svc_vc_rendezvous_control;
}
xprt->xp_ops = &ops;
xprt->xp_ops2 = &ops2;
@ -720,3 +836,53 @@ __rpc_get_local_uid(SVCXPRT *transp, uid_t *uid)
*uid = cmcred->cmcred_euid;
return (0);
}
/*
* Destroy xprts that have not have had any activity in 'timeout' seconds.
* If 'cleanblock' is true, blocking connections (the default) are also
* cleaned. If timeout is 0, the least active connection is picked.
*/
bool_t
__svc_clean_idle(fd_set *fds, int timeout, bool_t cleanblock)
{
int i, ncleaned;
SVCXPRT *xprt, *least_active;
struct timeval tv, tdiff, tmax;
struct cf_conn *cd;
gettimeofday(&tv, NULL);
tmax.tv_sec = tmax.tv_usec = 0;
least_active = NULL;
rwlock_wrlock(&svc_fd_lock);
for (i = ncleaned = 0; i <= svc_maxfd; i++) {
if (FD_ISSET(i, fds)) {
xprt = __svc_xports[i];
if (xprt == NULL || xprt->xp_ops == NULL ||
xprt->xp_ops->xp_recv != svc_vc_recv)
continue;
cd = (struct cf_conn *)xprt->xp_p1;
if (!cleanblock && !cd->nonblock)
continue;
if (timeout == 0) {
timersub(&tv, &cd->last_recv_time, &tdiff);
if (timercmp(&tdiff, &tmax, >)) {
tmax = tdiff;
least_active = xprt;
}
continue;
}
if (tv.tv_sec - cd->last_recv_time.tv_sec > timeout) {
__xprt_unregister_unlocked(xprt);
__svc_vc_dodestroy(xprt);
ncleaned++;
}
}
}
if (timeout == 0 && least_active != NULL) {
__xprt_unregister_unlocked(least_active);
__svc_vc_dodestroy(least_active);
ncleaned++;
}
rwlock_unlock(&svc_fd_lock);
return ncleaned > 0 ? TRUE : FALSE;
}

View File

@ -66,6 +66,10 @@ __FBSDID("$FreeBSD$");
#include <rpc/types.h>
#include <rpc/xdr.h>
#include <rpc/auth.h>
#include <rpc/svc.h>
#include <rpc/clnt.h>
#include <sys/stddef.h>
#include "un-namespace.h"
static bool_t xdrrec_getlong(XDR *, long *);
@ -91,7 +95,7 @@ static const struct xdr_ops xdrrec_ops = {
/*
* A record is composed of one or more record fragments.
* A record fragment is a two-byte header followed by zero to
* A record fragment is a four-byte header followed by zero to
* 2**32-1 bytes. The header is treated as a long unsigned and is
* encode/decoded to the network via htonl/ntohl. The low order 31 bits
* are a byte count of the fragment. The highest order bit is a boolean:
@ -106,7 +110,6 @@ static const struct xdr_ops xdrrec_ops = {
typedef struct rec_strm {
char *tcp_handle;
char *the_buffer;
/*
* out-goung bits
*/
@ -128,6 +131,15 @@ typedef struct rec_strm {
bool_t last_frag;
u_int sendsize;
u_int recvsize;
bool_t nonblock;
bool_t in_haveheader;
u_int32_t in_header;
char *in_hdrp;
int in_hdrlen;
int in_reclen;
int in_received;
int in_maxrec;
} RECSTREAM;
static u_int fix_buf_size(u_int);
@ -136,6 +148,7 @@ static bool_t fill_input_buf(RECSTREAM *);
static bool_t get_input_bytes(RECSTREAM *, char *, int);
static bool_t set_input_fragment(RECSTREAM *);
static bool_t skip_input_bytes(RECSTREAM *, long);
static bool_t realloc_stream(RECSTREAM *, int);
/*
@ -168,20 +181,21 @@ xdrrec_create(xdrs, sendsize, recvsize, tcp_handle, readit, writeit)
*/
return;
}
/*
* adjust sizes and allocate buffer quad byte aligned
*/
rstrm->sendsize = sendsize = fix_buf_size(sendsize);
rstrm->recvsize = recvsize = fix_buf_size(recvsize);
rstrm->the_buffer = mem_alloc(sendsize + recvsize + BYTES_PER_XDR_UNIT);
if (rstrm->the_buffer == NULL) {
rstrm->out_base = mem_alloc(rstrm->sendsize);
if (rstrm->out_base == NULL) {
warnx("xdrrec_create: out of memory");
mem_free(rstrm, sizeof(RECSTREAM));
return;
}
rstrm->recvsize = recvsize = fix_buf_size(recvsize);
rstrm->in_base = mem_alloc(recvsize);
if (rstrm->in_base == NULL) {
warnx("xdrrec_create: out of memory");
mem_free(rstrm->out_base, sendsize);
mem_free(rstrm, sizeof(RECSTREAM));
return;
}
for (rstrm->out_base = rstrm->the_buffer;
(u_long)rstrm->out_base % BYTES_PER_XDR_UNIT != 0;
rstrm->out_base++);
rstrm->in_base = rstrm->out_base + sendsize;
/*
* now the rest ...
*/
@ -200,6 +214,12 @@ xdrrec_create(xdrs, sendsize, recvsize, tcp_handle, readit, writeit)
rstrm->in_finger = (rstrm->in_boundry += recvsize);
rstrm->fbtbc = 0;
rstrm->last_frag = TRUE;
rstrm->in_haveheader = FALSE;
rstrm->in_hdrlen = 0;
rstrm->in_hdrp = (char *)(void *)&rstrm->in_header;
rstrm->nonblock = FALSE;
rstrm->in_reclen = 0;
rstrm->in_received = 0;
}
@ -413,8 +433,8 @@ xdrrec_destroy(xdrs)
{
RECSTREAM *rstrm = (RECSTREAM *)xdrs->x_private;
mem_free(rstrm->the_buffer,
rstrm->sendsize + rstrm->recvsize + BYTES_PER_XDR_UNIT);
mem_free(rstrm->out_base, rstrm->sendsize);
mem_free(rstrm->in_base, rstrm->recvsize);
mem_free(rstrm, sizeof(RECSTREAM));
}
@ -432,6 +452,20 @@ xdrrec_skiprecord(xdrs)
XDR *xdrs;
{
RECSTREAM *rstrm = (RECSTREAM *)(xdrs->x_private);
enum xprt_stat xstat;
if (rstrm->nonblock) {
if (__xdrrec_getrec(xdrs, &xstat, FALSE)) {
rstrm->fbtbc = 0;
return TRUE;
}
if (rstrm->in_finger == rstrm->in_boundry &&
xstat == XPRT_MOREREQS) {
rstrm->fbtbc = 0;
return TRUE;
}
return FALSE;
}
while (rstrm->fbtbc > 0 || (! rstrm->last_frag)) {
if (! skip_input_bytes(rstrm, rstrm->fbtbc))
@ -454,6 +488,15 @@ xdrrec_eof(xdrs)
XDR *xdrs;
{
RECSTREAM *rstrm = (RECSTREAM *)(xdrs->x_private);
enum xprt_stat xstat;
if (rstrm->nonblock) {
if (__xdrrec_getrec(xdrs, &xstat, FALSE))
return FALSE;
if (!rstrm->in_haveheader && xstat == XPRT_IDLE)
return TRUE;
return FALSE;
}
while (rstrm->fbtbc > 0 || (! rstrm->last_frag)) {
if (! skip_input_bytes(rstrm, rstrm->fbtbc))
@ -495,6 +538,99 @@ xdrrec_endofrecord(xdrs, sendnow)
return (TRUE);
}
/*
* Fill the stream buffer with a record for a non-blocking connection.
* Return true if a record is available in the buffer, false if not.
*/
bool_t
__xdrrec_getrec(xdrs, statp, expectdata)
XDR *xdrs;
enum xprt_stat *statp;
bool_t expectdata;
{
RECSTREAM *rstrm = (RECSTREAM *)(xdrs->x_private);
ssize_t n;
int fraglen;
if (!rstrm->in_haveheader) {
n = rstrm->readit(rstrm->tcp_handle, rstrm->in_hdrp,
(int)sizeof (rstrm->in_header) - rstrm->in_hdrlen);
if (n == 0) {
*statp = expectdata ? XPRT_DIED : XPRT_IDLE;
return FALSE;
}
if (n < 0) {
*statp = XPRT_DIED;
return FALSE;
}
rstrm->in_hdrp += n;
rstrm->in_hdrlen += n;
if (rstrm->in_hdrlen < sizeof (rstrm->in_header)) {
*statp = XPRT_MOREREQS;
return FALSE;
}
rstrm->in_header = ntohl(rstrm->in_header);
fraglen = (int)(rstrm->in_header & ~LAST_FRAG);
if (fraglen == 0 || fraglen > rstrm->in_maxrec ||
(rstrm->in_reclen + fraglen) > rstrm->in_maxrec) {
*statp = XPRT_DIED;
return FALSE;
}
rstrm->in_reclen += fraglen;
if (rstrm->in_reclen > rstrm->recvsize)
realloc_stream(rstrm, rstrm->in_reclen);
if (rstrm->in_header & LAST_FRAG) {
rstrm->in_header &= ~LAST_FRAG;
rstrm->last_frag = TRUE;
}
}
n = rstrm->readit(rstrm->tcp_handle,
rstrm->in_base + rstrm->in_received,
(rstrm->in_reclen - rstrm->in_received));
if (n < 0) {
*statp = XPRT_DIED;
return FALSE;
}
if (n == 0) {
*statp = expectdata ? XPRT_DIED : XPRT_IDLE;
return FALSE;
}
rstrm->in_received += n;
if (rstrm->in_received == rstrm->in_reclen) {
rstrm->in_haveheader = FALSE;
rstrm->in_hdrp = (char *)(void *)&rstrm->in_header;
rstrm->in_hdrlen = 0;
if (rstrm->last_frag) {
rstrm->fbtbc = rstrm->in_reclen;
rstrm->in_boundry = rstrm->in_base + rstrm->in_reclen;
rstrm->in_finger = rstrm->in_base;
*statp = XPRT_MOREREQS;
return TRUE;
}
}
*statp = XPRT_MOREREQS;
return FALSE;
}
bool_t
__xdrrec_setnonblock(xdrs, maxrec)
XDR *xdrs;
int maxrec;
{
RECSTREAM *rstrm = (RECSTREAM *)(xdrs->x_private);
rstrm->nonblock = TRUE;
if (maxrec == 0)
maxrec = rstrm->recvsize;
rstrm->in_maxrec = maxrec;
return TRUE;
}
/*
* Internal useful routines
@ -527,6 +663,9 @@ fill_input_buf(rstrm)
u_int32_t i;
int len;
if (rstrm->nonblock)
return FALSE;
where = rstrm->in_base;
i = (u_int32_t)((u_long)rstrm->in_boundry % BYTES_PER_XDR_UNIT);
where += i;
@ -619,3 +758,29 @@ fix_buf_size(s)
s = 4000;
return (RNDUP(s));
}
/*
* Reallocate the input buffer for a non-block stream.
*/
static bool_t
realloc_stream(rstrm, size)
RECSTREAM *rstrm;
int size;
{
ptrdiff_t diff;
char *buf;
if (size > rstrm->recvsize) {
buf = realloc(rstrm->in_base, (size_t)size);
if (buf == NULL)
return FALSE;
diff = buf - rstrm->in_base;
rstrm->in_finger += diff;
rstrm->in_base = buf;
rstrm->in_boundry = buf + size;
rstrm->recvsize = size;
rstrm->in_size = size;
}
return TRUE;
}