This commit changes the YPPROC_ALL procecdure so that it handles requests

_without_ using fork().

The problem with YPPROC_ALL is that it transmits an entire map through
a TCP pipe as the result of a single RPC call. First of all, this requires
certain hackery in the XDR filter. Second, if the map being sent is
large, the server can end up spending lots of time in the XDR filter
sending to just the one client, while requests for other clients will
go unanswered.

My original solution for this was to fork() the request into a child
process which terminates after the map has been transmitted (or the
transfer is interrupted due to an error). This leaves the parent free
to handle other requests. But this solution is kind of lame: fork()
is relatively expensive, and we have to keep a cap on the number of
child processes to keep from swamping the system.

What we do now is grab control of the service transport handle and XDR
handle from the RPC library and send the records one at a time ourselves
instead of letting the RPC library do it. We send a record, then go
back to the svc_run() loop and select() on the socket. If select() says
we can still write data, we send the next record. Then we call
svc_getreqset() and handle other RPCs and loop around again. This way,
we can handle other RPCs between records.

We manage multiple YPPROC_ALL requests using a circular queue. When a
request is done, we dequeue it and destroy the handle. We also tag
each request with a ttl which is decremented whevever we run the queue
and a handle isn't serviced. This lets us nuke requests that have sat
idle for too long (if we didn't do this, we might run out of socket
descriptors.)

Now all I have to do is come up with an async resolver, and ypserv
won't need to fork() at all. :)

Note: these changes should not go into 2.2 unless they get a very
throrough shakedown before the final cutoff date.
This commit is contained in:
Bill Paul 1996-11-30 22:38:44 +00:00
parent 84e1b7d26b
commit faf215c7ad
Notes: svn2git 2020-12-20 02:59:44 +00:00
svn path=/head/; revision=20053
5 changed files with 302 additions and 95 deletions

View File

@ -1,8 +1,8 @@
# $Id: Makefile,v 1.5 1996/06/25 20:27:55 wpaul Exp $
# $Id: Makefile,v 1.6 1996/09/15 00:39:19 wpaul Exp $
PROG= ypserv
SRCS= yp_svc.c yp_server.c yp_dblookup.c yp_dnslookup.c \
ypxfr_clnt.c yp_main.c yp_error.c yp_access.c
ypxfr_clnt.c yp_main.c yp_error.c yp_access.c yp_async.c
MAN8= ypserv.8

276
usr.sbin/ypserv/yp_async.c Normal file
View File

@ -0,0 +1,276 @@
/*
* Copyright (c) 1995, 1996
* Bill Paul <wpaul@ctr.columbia.edu>. All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions
* are met:
* 1. Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* 2. Redistributions in binary form must reproduce the above copyright
* notice, this list of conditions and the following disclaimer in the
* documentation and/or other materials provided with the distribution.
* 3. All advertising materials mentioning features or use of this software
* must display the following acknowledgement:
* This product includes software developed by Bill Paul.
* 4. Neither the name of the author nor the names of any co-contributors
* may be used to endorse or promote products derived from this software
* without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY Bill Paul AND CONTRIBUTORS ``AS IS'' AND
* ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
* ARE DISCLAIMED. IN NO EVENT SHALL Bill Paul OR CONTRIBUTORS BE LIABLE
* FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
* DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
* OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
* HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
* LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
* OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
* SUCH DAMAGE.
*
* $Id: yp_async.c,v 1.5 1996/11/30 21:22:48 wpaul Exp $
*/
#include <sys/types.h>
#include <sys/time.h>
#include <sys/resource.h>
#include <limits.h>
#include <db.h>
#include <errno.h>
#include <stdlib.h>
#include <unistd.h>
#include <sys/queue.h>
#include <rpc/rpc.h>
#include <rpcsvc/yp.h>
#include "yp_extern.h"
#ifndef lint
static const char rcsid[] = "$Id: yp_async.c,v 1.5 1996/11/30 21:22:48 wpaul Exp $";
#endif
/*
* This is the code that lets us handle yp_all() requests without
* using fork(). The idea is that we steal away the transport and
* XDR handles from the RPC library and handle the transactions
* ourselves. Normally, for a TCP request, the RPC library will
* send all the entire response at once, which means that if the
* response is large (like, say, a huge password map), then we could
* block for a long time inside the XDR filter. This would cause other
* clients to block while waiting for us to finish, which is bad.
*
* Previously, we handled this by fork()ing off the request into a
* child process, thereby allowing the parent (us) to service other
* requests. But this can incurr a lot of overhead and we have to
* limit the number of simultaneous children to avoid swapming the
* system.
*
* What we do now is handle the request one record at a time. We send
* the first record, then head back to the svc_run() loop to handle
* more requests. If select() says we can still write data to the
* socket, we send the next record. When we reach the end of the
* map or the client stops receiving, we dequeue the request and
* destroy the handle.
*
* The mechanism we use to steal the transport and XDR handles away
* from the RPC library is quite evil. Basically, we call svc_sendreply()
* and hand it a custom XDR filter that calls yp_add_async() to register
* the request, then bails out via longjmp() back to the ypproc_all_2_svc()
* routine before the library can shut down the pipe on us. We then
* unregister the transport from the library (so that svc_getreqset()
* will no longer talk to it) and handle it ourselves.
*/
extern int _rpc_dtablesize __P(( void ));
#ifndef QUEUE_TTL
#define QUEUE_TTL 255
#endif
static fd_set writefds;
static int fd_setsize;
static CIRCLEQ_HEAD(asynchead, asyncq_entry) ahead;
struct asyncq_entry {
DB *dbp; /* database handle */
XDR *xdrs; /* XDR handle */
SVCXPRT *xprt; /* transport handle */
DBT key;
int ttl;
CIRCLEQ_ENTRY(asyncq_entry) links;
};
void yp_init_async()
{
struct rlimit rlim;
CIRCLEQ_INIT(&ahead);
/*
* Jack up the number of file descriptors.
* We may need them if we end up with a lot
* of requests queued.
*/
if (getrlimit(RLIMIT_NOFILE, &rlim) == -1) {
yp_error("couldn't get filedesc limit: %s", strerror(errno));
return;
}
rlim.rlim_cur = rlim.rlim_max;
if (setrlimit(RLIMIT_NOFILE, &rlim) == -1) {
yp_error("couldn't set filedesc limit: %s", strerror(errno));
return;
}
return;
}
static bool_t yp_xmit_ypresp_all(q)
struct asyncq_entry *q;
{
DBT data = { NULL, 0 };
ypresp_all obj;
/* Get a record */
if ((obj.ypresp_all_u.val.stat =
yp_next_record(q->dbp, &q->key, &data, 1, 0)) == YP_TRUE) {
obj.ypresp_all_u.val.val.valdat_len = data.size;
obj.ypresp_all_u.val.val.valdat_val = data.data;
obj.ypresp_all_u.val.key.keydat_len = q->key.size;
obj.ypresp_all_u.val.key.keydat_val = q->key.data;
obj.more = TRUE;
} else {
obj.more = FALSE;
}
/* Serialize */
q->xdrs->x_op = XDR_ENCODE;
if (xdr_ypresp_all(q->xdrs, &obj) == FALSE)
return(FALSE);
return(obj.more);
}
static void yp_remove_async(q)
struct asyncq_entry *q;
{
xdrrec_endofrecord(q->xdrs, TRUE);
svc_destroy(q->xprt);
(void)(q->dbp->close)(q->dbp);
CIRCLEQ_REMOVE(&ahead, q, links);
free(q);
return;
}
bool_t yp_add_async(xdrs, xprt, dbp)
XDR *xdrs;
SVCXPRT *xprt;
DB *dbp;
{
register struct asyncq_entry *q;
q = (struct asyncq_entry *)calloc(1, sizeof(struct asyncq_entry));
if (q == NULL) {
yp_error("failed to malloc() asyncq entry: %s",
strerror(errno));
return(FALSE);
}
xprt_unregister(xprt);
q->xdrs = xdrs;
q->xdrs->x_op = XDR_ENCODE;
q->dbp = dbp;
q->xprt = xprt;
q->key.size = 0;
q->key.data = NULL;
q->ttl = QUEUE_TTL;
CIRCLEQ_INSERT_HEAD(&ahead, q, links);
return(TRUE);
}
static void yp_handle_async()
{
register struct asyncq_entry *q;
restart:
for (q = ahead.cqh_first; q != (void *)&ahead; q = q->links.cqe_next) {
if (FD_ISSET(q->xprt->xp_sock, &writefds)) {
q->ttl = QUEUE_TTL;
if (yp_xmit_ypresp_all(q) == FALSE) {
yp_remove_async(q);
goto restart;
}
}
}
}
static int yp_set_async_fds()
{
register struct asyncq_entry *q;
int havefds;
restart:
havefds = 0;
FD_ZERO(&writefds);
for (q = ahead.cqh_first; q != (void *)&ahead; q = q->links.cqe_next) {
q->ttl--;
if (q->ttl <= 0) {
yp_remove_async(q);
goto restart;
} else {
FD_SET(q->xprt->xp_sock, &writefds);
havefds++;
}
}
return(havefds);
}
void
yp_svc_run()
{
#ifdef FD_SETSIZE
fd_set readfds;
#else
int readfds;
#endif /* def FD_SETSIZE */
extern int forked;
int pid;
int w;
fd_setsize = _rpc_dtablesize();
/* Establish the identity of the parent ypserv process. */
pid = getpid();
for (;;) {
#ifdef FD_SETSIZE
readfds = svc_fdset;
#else
readfds = svc_fds;
#endif /* def FD_SETSIZE */
w = yp_set_async_fds();
switch (select(fd_setsize, &readfds, w ? &writefds : NULL,
NULL, (struct timeval *)0)) {
case -1:
if (errno == EINTR) {
continue;
}
perror("svc_run: - select failed");
return;
case 0:
continue;
default:
yp_handle_async();
svc_getreqset(&readfds);
if (forked && pid != getpid())
exit(0);
}
}
}

View File

@ -29,7 +29,7 @@
* OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
* SUCH DAMAGE.
*
* $Id: yp_extern.h,v 1.4 1996/04/28 04:38:50 wpaul Exp $
* $Id: yp_extern.h,v 1.5 1996/10/24 18:58:24 wpaul Exp $
*/
#include <stdio.h>
#include <string.h>
@ -85,3 +85,7 @@ extern void yp_flush_all __P(( void ));
extern void yp_init_dbs __P(( void ));
extern int yp_testflag __P(( char *, char *, int ));
extern void load_securenets __P(( void ));
extern void yp_svc_run __P(( void ));
extern void yp_init_async __P(( void ));
extern bool_t yp_add_async __P(( XDR *, SVCXPRT *, DB * ));

View File

@ -29,7 +29,7 @@
* OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
* SUCH DAMAGE.
*
* $Id: yp_main.c,v 1.3 1996/05/01 02:39:34 wpaul Exp wpaul $
* $Id: yp_main.c,v 1.6 1996/05/31 16:01:50 wpaul Exp $
*/
/*
@ -66,7 +66,7 @@
#define _RPCSVC_CLOSEDOWN 120
#ifndef lint
static const char rcsid[] = "$Id: yp_main.c,v 1.3 1996/05/01 02:39:34 wpaul Exp wpaul $";
static const char rcsid[] = "$Id: yp_main.c,v 1.6 1996/05/31 16:01:50 wpaul Exp $";
#endif /* not lint */
int _rpcpmstart; /* Started by a port monitor ? */
static int _rpcfdtype;
@ -79,7 +79,6 @@ static int _rpcfdtype;
extern void ypprog_1 __P((struct svc_req, register SVCXPRT));
extern void ypprog_2 __P((struct svc_req, register SVCXPRT));
extern int _rpc_dtablesize __P((void));
extern int _rpcsvcstate; /* Set when a request is serviced */
char *progname = "ypserv";
char *yp_dir = _PATH_YP;
@ -98,45 +97,6 @@ void _msgout(char* msg)
syslog(LOG_ERR, msg);
}
static void
yp_svc_run()
{
#ifdef FD_SETSIZE
fd_set readfds;
#else
int readfds;
#endif /* def FD_SETSIZE */
extern int forked;
int pid;
int fd_setsize = _rpc_dtablesize();
/* Establish the identity of the parent ypserv process. */
pid = getpid();
for (;;) {
#ifdef FD_SETSIZE
readfds = svc_fdset;
#else
readfds = svc_fds;
#endif /* def FD_SETSIZE */
switch (select(fd_setsize, &readfds, NULL, NULL,
(struct timeval *)0)) {
case -1:
if (errno == EINTR) {
continue;
}
perror("svc_run: - select failed");
return;
case 0:
continue;
default:
svc_getreqset(&readfds);
if (forked && pid != getpid())
exit(0);
}
}
}
static void unregister()
{
(void) pmap_unset(YPPROG, YPVERS);
@ -230,6 +190,7 @@ main(argc, argv)
}
load_securenets();
yp_init_async();
#ifdef DB_CACHE
yp_init_dbs();
#endif

View File

@ -43,6 +43,7 @@
#include <netinet/in.h>
#include <arpa/inet.h>
#include <rpc/rpc.h>
#include <setjmp.h>
#ifndef lint
static const char rcsid[] = "$Id: yp_server.c,v 1.10 1996/05/31 16:01:51 wpaul Exp $";
@ -51,10 +52,12 @@ static const char rcsid[] = "$Id: yp_server.c,v 1.10 1996/05/31 16:01:51 wpaul E
int forked = 0;
int children = 0;
static DB *spec_dbp = NULL; /* Special global DB handle for ypproc_all. */
static SVCXPRT *xprt; /* Special SVCXPRT handle for ypproc_all. */
static char *master_string = "YP_MASTER_NAME";
static char *order_string = "YP_LAST_MODIFIED";
static int master_sz = sizeof("YP_MASTER_NAME") - 1;
static int order_sz = sizeof("YP_LAST_MODIFIED") - 1;
static jmp_buf env;
/*
* NIS v2 support. This is where most of the action happens.
@ -473,34 +476,19 @@ ypproc_clear_2_svc(void *argp, struct svc_req *rqstp)
*/
/*
* Custom XDR routine for serialzing results of ypproc_all: keep
* reading from the database and spew until we run out of records
* or encounter an error.
* Custom XDR routine for serialzing results of ypproc_all: grab control
* of the transport and xdr handle from the RPC library and this request
* to the async queue. It will multiplex the record transmission in such
* a way that we can service other requests between transmissions and
* avoid blocking. (It will also close the DB handle for us when the
* request is done.)
*/
static bool_t
xdr_my_ypresp_all(register XDR *xdrs, ypresp_all *objp)
{
DBT key = { NULL, 0 } , data = { NULL, 0 };
while (1) {
/* Get a record. */
if ((objp->ypresp_all_u.val.stat =
yp_next_record(spec_dbp,&key,&data,1,0)) == YP_TRUE) {
objp->ypresp_all_u.val.val.valdat_len = data.size;
objp->ypresp_all_u.val.val.valdat_val = data.data;
objp->ypresp_all_u.val.key.keydat_len = key.size;
objp->ypresp_all_u.val.key.keydat_val = key.data;
objp->more = TRUE;
} else {
objp->more = FALSE;
}
/* Serialize. */
if (!xdr_ypresp_all(xdrs, objp))
return(FALSE);
if (objp->more == FALSE)
return(TRUE);
}
if (yp_add_async(xdrs, xprt, spec_dbp) == FALSE)
return(FALSE);
longjmp(env, 1); /* XXX EVIL!! */
}
ypresp_all *
@ -531,38 +519,16 @@ ypproc_all_2_svc(ypreq_nokey *argp, struct svc_req *rqstp)
return (&result);
}
/*
* The ypproc_all procedure can take a while to complete.
* Best to handle it in a subprocess so the parent doesn't
* block. (Is there a better way to do this? Maybe with
* async socket I/O?)
*/
if (!debug && children < MAX_CHILDREN && fork()) {
children++;
forked = 0;
return (NULL);
} else {
forked++;
}
#ifndef DB_CACHE
if ((spec_dbp = yp_open_db(argp->domain, argp->map)) == NULL) {
result.ypresp_all_u.val.stat = yp_errno;
return(&result);
}
#else
if ((spec_dbp = yp_open_db_cache(argp->domain, argp->map, NULL, 0)) == NULL) {
result.ypresp_all_u.val.stat = yp_errno;
return(&result);
}
#endif
/* Kick off the actual data transfer. */
svc_sendreply(rqstp->rq_xprt, xdr_my_ypresp_all, (char *)&result);
#ifndef DB_CACHE
(void)(spec_dbp->close)(spec_dbp);
#endif
xprt = rqstp->rq_xprt;
if (!setjmp(env)) /* XXX EVIL!!! */
svc_sendreply(rqstp->rq_xprt, xdr_my_ypresp_all,
(char *)&result);
/*
* Returning NULL prevents the dispatcher from calling
* svc_sendreply() since we already did it.