Use kqueue(2) instead of poll(2) to wait for replies and timeouts

in the UDP RPC client code. As a side-effect, this fixes some bugs
that might prevent the RPC call from ever timing out for example
if the server keeps responding with the wrong xid. This could
probably be simplified further by using the EVFILT_TIMER filter.
This commit is contained in:
iedowse 2002-10-15 22:28:59 +00:00
parent c470759591
commit ff07b93884

View File

@ -46,8 +46,8 @@ __FBSDID("$FreeBSD$");
#include "namespace.h"
#include "reentrant.h"
#include <sys/poll.h>
#include <sys/types.h>
#include <sys/event.h>
#include <sys/time.h>
#include <sys/socket.h>
#include <sys/ioctl.h>
@ -75,7 +75,6 @@ static bool_t clnt_dg_freeres(CLIENT *, xdrproc_t, void *);
static void clnt_dg_abort(CLIENT *);
static bool_t clnt_dg_control(CLIENT *, u_int, void *);
static void clnt_dg_destroy(CLIENT *);
static int __rpc_timeval_to_msec(struct timeval *);
@ -100,7 +99,7 @@ static cond_t *dg_cv;
mutex_lock(&clnt_fd_lock); \
dg_fd_locks[fd] = 0; \
mutex_unlock(&clnt_fd_lock); \
thr_sigsetmask(SIG_SETMASK, &(mask), (sigset_t *) NULL); \
thr_sigsetmask(SIG_SETMASK, &(mask), NULL); \
cond_signal(&dg_cv[fd]); \
}
@ -124,10 +123,11 @@ struct cu_data {
u_int cu_sendsz; /* send size */
char *cu_outbuf;
u_int cu_recvsz; /* recv size */
struct pollfd pfdp;
int cu_async;
int cu_connect; /* Use connect(). */
int cu_connected; /* Have done connect(). */
struct kevent cu_kin;
int cu_kq;
char cu_inbuf[1];
};
@ -273,8 +273,8 @@ clnt_dg_create(fd, svcaddr, program, version, sendsz, recvsz)
cl->cl_auth = authnone_create();
cl->cl_tp = NULL;
cl->cl_netid = NULL;
cu->pfdp.fd = cu->cu_fd;
cu->pfdp.events = POLLIN | POLLPRI | POLLRDNORM | POLLRDBAND;
cu->cu_kq = -1;
EV_SET(&cu->cu_kin, cu->cu_fd, EVFILT_READ, EV_ADD, 0, 0, 0);
return (cl);
err1:
warnx(mem_err_clnt_dg);
@ -301,22 +301,22 @@ clnt_dg_call(cl, proc, xargs, argsp, xresults, resultsp, utimeout)
{
struct cu_data *cu = (struct cu_data *)cl->cl_private;
XDR *xdrs;
size_t outlen;
size_t outlen = 0;
struct rpc_msg reply_msg;
XDR reply_xdrs;
struct timeval time_waited;
bool_t ok;
int nrefreshes = 2; /* number of times to refresh cred */
struct timeval timeout;
struct timeval retransmit_time;
struct timeval startime, curtime;
int firsttimeout = 1;
struct timeval next_sendtime, starttime, time_waited, tv;
struct timespec ts;
struct kevent kv;
struct sockaddr *sa;
sigset_t mask;
sigset_t newmask;
socklen_t inlen, salen;
ssize_t recvlen = 0;
int rpc_lock_value;
int kin_len, n, rpc_lock_value;
u_int32_t xid;
outlen = 0;
@ -340,9 +340,9 @@ clnt_dg_call(cl, proc, xargs, argsp, xresults, resultsp, utimeout)
if (cu->cu_connect && !cu->cu_connected) {
if (_connect(cu->cu_fd, (struct sockaddr *)&cu->cu_raddr,
cu->cu_rlen) < 0) {
release_fd_lock(cu->cu_fd, mask);
cu->cu_error.re_errno = errno;
return (cu->cu_error.re_status = RPC_CANTSEND);
cu->cu_error.re_status = RPC_CANTSEND;
goto out;
}
cu->cu_connected = 1;
}
@ -355,7 +355,18 @@ clnt_dg_call(cl, proc, xargs, argsp, xresults, resultsp, utimeout)
}
time_waited.tv_sec = 0;
time_waited.tv_usec = 0;
retransmit_time = cu->cu_wait;
retransmit_time = next_sendtime = cu->cu_wait;
gettimeofday(&starttime, NULL);
/* Clean up in case the last call ended in a longjmp(3) call. */
if (cu->cu_kq >= 0)
_close(cu->cu_kq);
if ((cu->cu_kq = kqueue()) < 0) {
cu->cu_error.re_errno = errno;
cu->cu_error.re_status = RPC_CANTSEND;
goto out;
}
kin_len = 1;
call_again:
xdrs = &(cu->cu_outxdrs);
@ -375,24 +386,24 @@ call_again:
if ((! XDR_PUTINT32(xdrs, &proc)) ||
(! AUTH_MARSHALL(cl->cl_auth, xdrs)) ||
(! (*xargs)(xdrs, argsp))) {
release_fd_lock(cu->cu_fd, mask);
return (cu->cu_error.re_status = RPC_CANTENCODEARGS);
cu->cu_error.re_status = RPC_CANTENCODEARGS;
goto out;
}
outlen = (size_t)XDR_GETPOS(xdrs);
send_again:
if (_sendto(cu->cu_fd, cu->cu_outbuf, outlen, 0, sa, salen) != outlen) {
cu->cu_error.re_errno = errno;
release_fd_lock(cu->cu_fd, mask);
return (cu->cu_error.re_status = RPC_CANTSEND);
cu->cu_error.re_status = RPC_CANTSEND;
goto out;
}
/*
* Hack to provide rpc-based message passing
*/
if (timeout.tv_sec == 0 && timeout.tv_usec == 0) {
release_fd_lock(cu->cu_fd, mask);
return (cu->cu_error.re_status = RPC_TIMEDOUT);
cu->cu_error.re_status = RPC_TIMEDOUT;
goto out;
}
get_reply:
@ -406,130 +417,68 @@ get_reply:
reply_msg.acpted_rply.ar_results.where = resultsp;
reply_msg.acpted_rply.ar_results.proc = xresults;
for (;;) {
switch (_poll(&cu->pfdp, 1,
__rpc_timeval_to_msec(&retransmit_time))) {
case 0:
time_waited.tv_sec += retransmit_time.tv_sec;
time_waited.tv_usec += retransmit_time.tv_usec;
while (time_waited.tv_usec >= 1000000) {
time_waited.tv_sec++;
time_waited.tv_usec -= 1000000;
}
/* update retransmit_time */
if (retransmit_time.tv_sec < RPC_MAX_BACKOFF) {
retransmit_time.tv_usec *= 2;
retransmit_time.tv_sec *= 2;
while (retransmit_time.tv_usec >= 1000000) {
retransmit_time.tv_sec++;
retransmit_time.tv_usec -= 1000000;
}
}
/* Decide how long to wait. */
if (timercmp(&next_sendtime, &timeout, <))
timersub(&next_sendtime, &time_waited, &tv);
else
timersub(&timeout, &time_waited, &tv);
if (tv.tv_sec < 0 || tv.tv_usec < 0)
tv.tv_sec = tv.tv_usec = 0;
TIMEVAL_TO_TIMESPEC(&tv, &ts);
if ((time_waited.tv_sec < timeout.tv_sec) ||
((time_waited.tv_sec == timeout.tv_sec) &&
(time_waited.tv_usec < timeout.tv_usec)))
goto send_again;
release_fd_lock(cu->cu_fd, mask);
return (cu->cu_error.re_status = RPC_TIMEDOUT);
n = _kevent(cu->cu_kq, &cu->cu_kin, kin_len, &kv, 1, &ts);
/* We don't need to register the event again. */
kin_len = 0;
case -1:
if (errno == EBADF) {
if (n == 1) {
if (kv.flags & EV_ERROR) {
cu->cu_error.re_errno = kv.data;
cu->cu_error.re_status = RPC_CANTRECV;
goto out;
}
/* We have some data now */
do {
recvlen = _recvfrom(cu->cu_fd, cu->cu_inbuf,
cu->cu_recvsz, 0, NULL, NULL);
} while (recvlen < 0 && errno == EINTR);
if (recvlen < 0 && errno != EWOULDBLOCK) {
cu->cu_error.re_errno = errno;
release_fd_lock(cu->cu_fd, mask);
return (cu->cu_error.re_status = RPC_CANTRECV);
cu->cu_error.re_status = RPC_CANTRECV;
goto out;
}
if (errno != EINTR) {
errno = 0; /* reset it */
continue;
if (recvlen >= sizeof(u_int32_t) &&
(cu->cu_async == TRUE ||
*((u_int32_t *)(void *)(cu->cu_inbuf)) ==
*((u_int32_t *)(void *)(cu->cu_outbuf)))) {
/* We now assume we have the proper reply. */
break;
}
/* interrupted by another signal, update time_waited */
if (firsttimeout) {
/*
* Could have done gettimeofday before clnt_call
* but that means 1 more system call per each
* clnt_call, so do it after first time out
*/
if (gettimeofday(&startime,
(struct timezone *) NULL) == -1) {
errno = 0;
continue;
}
firsttimeout = 0;
errno = 0;
continue;
};
if (gettimeofday(&curtime,
(struct timezone *) NULL) == -1) {
errno = 0;
continue;
};
time_waited.tv_sec += curtime.tv_sec - startime.tv_sec;
time_waited.tv_usec += curtime.tv_usec -
startime.tv_usec;
while (time_waited.tv_usec < 0) {
time_waited.tv_sec--;
time_waited.tv_usec += 1000000;
};
while (time_waited.tv_usec >= 1000000) {
time_waited.tv_sec++;
time_waited.tv_usec -= 1000000;
}
startime.tv_sec = curtime.tv_sec;
startime.tv_usec = curtime.tv_usec;
if ((time_waited.tv_sec > timeout.tv_sec) ||
((time_waited.tv_sec == timeout.tv_sec) &&
(time_waited.tv_usec > timeout.tv_usec))) {
release_fd_lock(cu->cu_fd, mask);
return (cu->cu_error.re_status = RPC_TIMEDOUT);
}
errno = 0; /* reset it */
continue;
};
if (cu->pfdp.revents & POLLNVAL || (cu->pfdp.revents == 0)) {
cu->cu_error.re_status = RPC_CANTRECV;
/*
* Note: we're faking errno here because we
* previously would have expected _poll() to
* return -1 with errno EBADF. Poll(BA_OS)
* returns 0 and sets the POLLNVAL revents flag
* instead.
*/
cu->cu_error.re_errno = errno = EBADF;
release_fd_lock(cu->cu_fd, mask);
return (-1);
}
/* We have some data now */
do {
if (errno == EINTR) {
/*
* Must make sure errno was not already
* EINTR in case _recvfrom() returns -1.
*/
errno = 0;
}
recvlen = _recvfrom(cu->cu_fd, cu->cu_inbuf,
cu->cu_recvsz, 0, NULL, NULL);
} while (recvlen < 0 && errno == EINTR);
if (recvlen < 0) {
if (errno == EWOULDBLOCK)
continue;
if (n == -1 && errno != EINTR) {
cu->cu_error.re_errno = errno;
release_fd_lock(cu->cu_fd, mask);
return (cu->cu_error.re_status = RPC_CANTRECV);
cu->cu_error.re_status = RPC_CANTRECV;
goto out;
}
gettimeofday(&tv, NULL);
timersub(&tv, &starttime, &time_waited);
/* Check for timeout. */
if (timercmp(&time_waited, &timeout, >)) {
cu->cu_error.re_status = RPC_TIMEDOUT;
goto out;
}
/* Retransmit if necessary. */
if (timercmp(&time_waited, &next_sendtime, >)) {
/* update retransmit_time */
if (retransmit_time.tv_sec < RPC_MAX_BACKOFF)
timeradd(&retransmit_time, &retransmit_time,
&retransmit_time);
timeradd(&next_sendtime, &retransmit_time,
&next_sendtime);
goto send_again;
}
if (recvlen < sizeof (u_int32_t))
continue;
/* see if reply transaction id matches sent id */
if (cu->cu_async == FALSE &&
*((u_int32_t *)(void *)(cu->cu_inbuf)) !=
*((u_int32_t *)(void *)(cu->cu_outbuf)))
continue;
/* we now assume we have the proper reply */
break;
}
inlen = (socklen_t)recvlen;
@ -576,6 +525,10 @@ get_reply:
cu->cu_error.re_status = RPC_CANTDECODERES;
}
out:
if (cu->cu_kq >= 0)
_close(cu->cu_kq);
cu->cu_kq = -1;
release_fd_lock(cu->cu_fd, mask);
return (cu->cu_error.re_status);
}
@ -784,6 +737,8 @@ clnt_dg_destroy(cl)
cond_wait(&dg_cv[cu_fd], &clnt_fd_lock);
if (cu->cu_closeit)
(void)_close(cu_fd);
if (cu->cu_kq >= 0)
_close(cu->cu_kq);
XDR_DESTROY(&(cu->cu_outxdrs));
mem_free(cu, (sizeof (*cu) + cu->cu_sendsz + cu->cu_recvsz));
if (cl->cl_netid && cl->cl_netid[0])
@ -833,26 +788,3 @@ time_not_ok(t)
t->tv_usec < -1 || t->tv_usec > 1000000);
}
/*
* Convert from timevals (used by select) to milliseconds (used by poll).
*/
static int
__rpc_timeval_to_msec(t)
struct timeval *t;
{
int t1, tmp;
/*
* We're really returning t->tv_sec * 1000 + (t->tv_usec / 1000)
* but try to do so efficiently. Note: 1000 = 1024 - 16 - 8.
*/
tmp = (int)t->tv_sec << 3;
t1 = -tmp;
t1 += t1 << 1;
t1 += tmp << 7;
if (t->tv_usec)
t1 += (int)(t->tv_usec / 1000);
return (t1);
}