Account for AIO socket operations in thread/process resource usage.
File and disk-backed I/O requests store counts of read/written disk blocks in each AIO job so that they can be charged to the thread that completes an AIO request via aio_return() or aio_waitcomplete(). This change extends AIO jobs to store counts of received/sent messages and updates socket backends to set these counts accordingly. Note that the socket backends are careful to only charge a single messages for each AIO request even though a single request on a blocking socket might invoke sosend or soreceive multiple times. This is to mimic the resource accounting of synchronous read/write. Adjust the UNIX socketpair AIO test to verify that the message resource usage counts update accordingly for aio_read and aio_write. Approved by: re (hrs) Sponsored by: Chelsio Communications Differential Revision: https://reviews.freebsd.org/D6911
This commit is contained in:
parent
8f9b4b00a2
commit
a6270457f8
@ -360,6 +360,8 @@ insert_ddp_data(struct toepcb *toep, uint32_t n)
|
|||||||
placed = n;
|
placed = n;
|
||||||
if (placed > job->uaiocb.aio_nbytes - copied)
|
if (placed > job->uaiocb.aio_nbytes - copied)
|
||||||
placed = job->uaiocb.aio_nbytes - copied;
|
placed = job->uaiocb.aio_nbytes - copied;
|
||||||
|
if (placed > 0)
|
||||||
|
job->msgrcv = 1;
|
||||||
if (!aio_clear_cancel_function(job)) {
|
if (!aio_clear_cancel_function(job)) {
|
||||||
/*
|
/*
|
||||||
* Update the copied length for when
|
* Update the copied length for when
|
||||||
@ -602,6 +604,7 @@ handle_ddp_data(struct toepcb *toep, __be32 ddp_report, __be32 rcv_nxt, int len)
|
|||||||
toep->rx_credits += len;
|
toep->rx_credits += len;
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
|
job->msgrcv = 1;
|
||||||
if (db->cancel_pending) {
|
if (db->cancel_pending) {
|
||||||
/*
|
/*
|
||||||
* Update the job's length but defer completion to the
|
* Update the job's length but defer completion to the
|
||||||
@ -756,6 +759,8 @@ handle_ddp_close(struct toepcb *toep, struct tcpcb *tp, __be32 rcv_nxt)
|
|||||||
placed = len;
|
placed = len;
|
||||||
if (placed > job->uaiocb.aio_nbytes - copied)
|
if (placed > job->uaiocb.aio_nbytes - copied)
|
||||||
placed = job->uaiocb.aio_nbytes - copied;
|
placed = job->uaiocb.aio_nbytes - copied;
|
||||||
|
if (placed > 0)
|
||||||
|
job->msgrcv = 1;
|
||||||
if (!aio_clear_cancel_function(job)) {
|
if (!aio_clear_cancel_function(job)) {
|
||||||
/*
|
/*
|
||||||
* Update the copied length for when
|
* Update the copied length for when
|
||||||
@ -1458,6 +1463,7 @@ aio_ddp_requeue(struct toepcb *toep)
|
|||||||
if (copied != 0) {
|
if (copied != 0) {
|
||||||
sbdrop_locked(sb, copied);
|
sbdrop_locked(sb, copied);
|
||||||
job->aio_received += copied;
|
job->aio_received += copied;
|
||||||
|
job->msgrcv = 1;
|
||||||
copied = job->aio_received;
|
copied = job->aio_received;
|
||||||
inp = sotoinpcb(so);
|
inp = sotoinpcb(so);
|
||||||
if (!INP_TRY_WLOCK(inp)) {
|
if (!INP_TRY_WLOCK(inp)) {
|
||||||
|
@ -563,6 +563,7 @@ soaio_process_job(struct socket *so, struct sockbuf *sb, struct kaiocb *job)
|
|||||||
struct uio uio;
|
struct uio uio;
|
||||||
struct iovec iov;
|
struct iovec iov;
|
||||||
size_t cnt, done;
|
size_t cnt, done;
|
||||||
|
long ru_before;
|
||||||
int error, flags;
|
int error, flags;
|
||||||
|
|
||||||
SOCKBUF_UNLOCK(sb);
|
SOCKBUF_UNLOCK(sb);
|
||||||
@ -585,23 +586,33 @@ soaio_process_job(struct socket *so, struct sockbuf *sb, struct kaiocb *job)
|
|||||||
uio.uio_td = td;
|
uio.uio_td = td;
|
||||||
flags = MSG_NBIO;
|
flags = MSG_NBIO;
|
||||||
|
|
||||||
/* TODO: Charge ru_msg* to job. */
|
/*
|
||||||
|
* For resource usage accounting, only count a completed request
|
||||||
|
* as a single message to avoid counting multiple calls to
|
||||||
|
* sosend/soreceive on a blocking socket.
|
||||||
|
*/
|
||||||
|
|
||||||
if (sb == &so->so_rcv) {
|
if (sb == &so->so_rcv) {
|
||||||
uio.uio_rw = UIO_READ;
|
uio.uio_rw = UIO_READ;
|
||||||
|
ru_before = td->td_ru.ru_msgrcv;
|
||||||
#ifdef MAC
|
#ifdef MAC
|
||||||
error = mac_socket_check_receive(fp->f_cred, so);
|
error = mac_socket_check_receive(fp->f_cred, so);
|
||||||
if (error == 0)
|
if (error == 0)
|
||||||
|
|
||||||
#endif
|
#endif
|
||||||
error = soreceive(so, NULL, &uio, NULL, NULL, &flags);
|
error = soreceive(so, NULL, &uio, NULL, NULL, &flags);
|
||||||
|
if (td->td_ru.ru_msgrcv != ru_before)
|
||||||
|
job->msgrcv = 1;
|
||||||
} else {
|
} else {
|
||||||
uio.uio_rw = UIO_WRITE;
|
uio.uio_rw = UIO_WRITE;
|
||||||
|
ru_before = td->td_ru.ru_msgsnd;
|
||||||
#ifdef MAC
|
#ifdef MAC
|
||||||
error = mac_socket_check_send(fp->f_cred, so);
|
error = mac_socket_check_send(fp->f_cred, so);
|
||||||
if (error == 0)
|
if (error == 0)
|
||||||
#endif
|
#endif
|
||||||
error = sosend(so, NULL, &uio, NULL, NULL, flags, td);
|
error = sosend(so, NULL, &uio, NULL, NULL, flags, td);
|
||||||
|
if (td->td_ru.ru_msgsnd != ru_before)
|
||||||
|
job->msgsnd = 1;
|
||||||
if (error == EPIPE && (so->so_options & SO_NOSIGPIPE) == 0) {
|
if (error == EPIPE && (so->so_options & SO_NOSIGPIPE) == 0) {
|
||||||
PROC_LOCK(job->userproc);
|
PROC_LOCK(job->userproc);
|
||||||
kern_psignal(job->userproc, SIGPIPE);
|
kern_psignal(job->userproc, SIGPIPE);
|
||||||
|
@ -743,9 +743,11 @@ aio_process_rw(struct kaiocb *job)
|
|||||||
struct uio auio;
|
struct uio auio;
|
||||||
struct iovec aiov;
|
struct iovec aiov;
|
||||||
ssize_t cnt;
|
ssize_t cnt;
|
||||||
|
long msgsnd_st, msgsnd_end;
|
||||||
|
long msgrcv_st, msgrcv_end;
|
||||||
|
long oublock_st, oublock_end;
|
||||||
|
long inblock_st, inblock_end;
|
||||||
int error;
|
int error;
|
||||||
int oublock_st, oublock_end;
|
|
||||||
int inblock_st, inblock_end;
|
|
||||||
|
|
||||||
KASSERT(job->uaiocb.aio_lio_opcode == LIO_READ ||
|
KASSERT(job->uaiocb.aio_lio_opcode == LIO_READ ||
|
||||||
job->uaiocb.aio_lio_opcode == LIO_WRITE,
|
job->uaiocb.aio_lio_opcode == LIO_WRITE,
|
||||||
@ -769,8 +771,11 @@ aio_process_rw(struct kaiocb *job)
|
|||||||
auio.uio_segflg = UIO_USERSPACE;
|
auio.uio_segflg = UIO_USERSPACE;
|
||||||
auio.uio_td = td;
|
auio.uio_td = td;
|
||||||
|
|
||||||
|
msgrcv_st = td->td_ru.ru_msgrcv;
|
||||||
|
msgsnd_st = td->td_ru.ru_msgsnd;
|
||||||
inblock_st = td->td_ru.ru_inblock;
|
inblock_st = td->td_ru.ru_inblock;
|
||||||
oublock_st = td->td_ru.ru_oublock;
|
oublock_st = td->td_ru.ru_oublock;
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* aio_aqueue() acquires a reference to the file that is
|
* aio_aqueue() acquires a reference to the file that is
|
||||||
* released in aio_free_entry().
|
* released in aio_free_entry().
|
||||||
@ -787,11 +792,15 @@ aio_process_rw(struct kaiocb *job)
|
|||||||
auio.uio_rw = UIO_WRITE;
|
auio.uio_rw = UIO_WRITE;
|
||||||
error = fo_write(fp, &auio, fp->f_cred, FOF_OFFSET, td);
|
error = fo_write(fp, &auio, fp->f_cred, FOF_OFFSET, td);
|
||||||
}
|
}
|
||||||
|
msgrcv_end = td->td_ru.ru_msgrcv;
|
||||||
|
msgsnd_end = td->td_ru.ru_msgsnd;
|
||||||
inblock_end = td->td_ru.ru_inblock;
|
inblock_end = td->td_ru.ru_inblock;
|
||||||
oublock_end = td->td_ru.ru_oublock;
|
oublock_end = td->td_ru.ru_oublock;
|
||||||
|
|
||||||
job->inputcharge = inblock_end - inblock_st;
|
job->msgrcv = msgrcv_end - msgrcv_st;
|
||||||
job->outputcharge = oublock_end - oublock_st;
|
job->msgsnd = msgsnd_end - msgsnd_st;
|
||||||
|
job->inblock = inblock_end - inblock_st;
|
||||||
|
job->outblock = oublock_end - oublock_st;
|
||||||
|
|
||||||
if ((error) && (auio.uio_resid != cnt)) {
|
if ((error) && (auio.uio_resid != cnt)) {
|
||||||
if (error == ERESTART || error == EINTR || error == EWOULDBLOCK)
|
if (error == ERESTART || error == EINTR || error == EWOULDBLOCK)
|
||||||
@ -1805,13 +1814,10 @@ kern_aio_return(struct thread *td, struct aiocb *ujob, struct aiocb_ops *ops)
|
|||||||
status = job->uaiocb._aiocb_private.status;
|
status = job->uaiocb._aiocb_private.status;
|
||||||
error = job->uaiocb._aiocb_private.error;
|
error = job->uaiocb._aiocb_private.error;
|
||||||
td->td_retval[0] = status;
|
td->td_retval[0] = status;
|
||||||
if (job->uaiocb.aio_lio_opcode == LIO_WRITE) {
|
td->td_ru.ru_oublock += job->outblock;
|
||||||
td->td_ru.ru_oublock += job->outputcharge;
|
td->td_ru.ru_inblock += job->inblock;
|
||||||
job->outputcharge = 0;
|
td->td_ru.ru_msgsnd += job->msgsnd;
|
||||||
} else if (job->uaiocb.aio_lio_opcode == LIO_READ) {
|
td->td_ru.ru_msgrcv += job->msgrcv;
|
||||||
td->td_ru.ru_inblock += job->inputcharge;
|
|
||||||
job->inputcharge = 0;
|
|
||||||
}
|
|
||||||
aio_free_entry(job);
|
aio_free_entry(job);
|
||||||
AIO_UNLOCK(ki);
|
AIO_UNLOCK(ki);
|
||||||
ops->store_error(ujob, error);
|
ops->store_error(ujob, error);
|
||||||
@ -2327,9 +2333,9 @@ aio_physwakeup(struct bio *bp)
|
|||||||
error = bp->bio_error;
|
error = bp->bio_error;
|
||||||
nblks = btodb(nbytes);
|
nblks = btodb(nbytes);
|
||||||
if (job->uaiocb.aio_lio_opcode == LIO_WRITE)
|
if (job->uaiocb.aio_lio_opcode == LIO_WRITE)
|
||||||
job->outputcharge += nblks;
|
job->outblock += nblks;
|
||||||
else
|
else
|
||||||
job->inputcharge += nblks;
|
job->inblock += nblks;
|
||||||
|
|
||||||
if (error)
|
if (error)
|
||||||
aio_complete(job, -1, error);
|
aio_complete(job, -1, error);
|
||||||
@ -2395,13 +2401,10 @@ kern_aio_waitcomplete(struct thread *td, struct aiocb **ujobp,
|
|||||||
status = job->uaiocb._aiocb_private.status;
|
status = job->uaiocb._aiocb_private.status;
|
||||||
error = job->uaiocb._aiocb_private.error;
|
error = job->uaiocb._aiocb_private.error;
|
||||||
td->td_retval[0] = status;
|
td->td_retval[0] = status;
|
||||||
if (job->uaiocb.aio_lio_opcode == LIO_WRITE) {
|
td->td_ru.ru_oublock += job->outblock;
|
||||||
td->td_ru.ru_oublock += job->outputcharge;
|
td->td_ru.ru_inblock += job->inblock;
|
||||||
job->outputcharge = 0;
|
td->td_ru.ru_msgsnd += job->msgsnd;
|
||||||
} else if (job->uaiocb.aio_lio_opcode == LIO_READ) {
|
td->td_ru.ru_msgrcv += job->msgrcv;
|
||||||
td->td_ru.ru_inblock += job->inputcharge;
|
|
||||||
job->inputcharge = 0;
|
|
||||||
}
|
|
||||||
aio_free_entry(job);
|
aio_free_entry(job);
|
||||||
AIO_UNLOCK(ki);
|
AIO_UNLOCK(ki);
|
||||||
ops->store_aiocb(ujobp, ujob);
|
ops->store_aiocb(ujobp, ujob);
|
||||||
|
@ -119,8 +119,10 @@ struct kaiocb {
|
|||||||
TAILQ_ENTRY(kaiocb) plist; /* (a) lists of pending / done jobs */
|
TAILQ_ENTRY(kaiocb) plist; /* (a) lists of pending / done jobs */
|
||||||
TAILQ_ENTRY(kaiocb) allist; /* (a) list of all jobs in proc */
|
TAILQ_ENTRY(kaiocb) allist; /* (a) list of all jobs in proc */
|
||||||
int jobflags; /* (a) job flags */
|
int jobflags; /* (a) job flags */
|
||||||
int inputcharge; /* (*) input blocks */
|
int inblock; /* (*) input blocks */
|
||||||
int outputcharge; /* (*) output blocks */
|
int outblock; /* (*) output blocks */
|
||||||
|
int msgsnd; /* (*) messages sent */
|
||||||
|
int msgrcv; /* (*) messages received */
|
||||||
struct proc *userproc; /* (*) user process */
|
struct proc *userproc; /* (*) user process */
|
||||||
struct ucred *cred; /* (*) active credential when created */
|
struct ucred *cred; /* (*) active credential when created */
|
||||||
struct file *fd_file; /* (*) pointer to file structure */
|
struct file *fd_file; /* (*) pointer to file structure */
|
||||||
|
@ -40,6 +40,7 @@
|
|||||||
|
|
||||||
#include <sys/param.h>
|
#include <sys/param.h>
|
||||||
#include <sys/module.h>
|
#include <sys/module.h>
|
||||||
|
#include <sys/resource.h>
|
||||||
#include <sys/socket.h>
|
#include <sys/socket.h>
|
||||||
#include <sys/stat.h>
|
#include <sys/stat.h>
|
||||||
#include <sys/mdioctl.h>
|
#include <sys/mdioctl.h>
|
||||||
@ -455,6 +456,7 @@ ATF_TC_BODY(aio_unix_socketpair_test, tc)
|
|||||||
{
|
{
|
||||||
struct aio_unix_socketpair_arg arg;
|
struct aio_unix_socketpair_arg arg;
|
||||||
struct aio_context ac;
|
struct aio_context ac;
|
||||||
|
struct rusage ru_before, ru_after;
|
||||||
int sockets[2];
|
int sockets[2];
|
||||||
|
|
||||||
ATF_REQUIRE_KERNEL_MODULE("aio");
|
ATF_REQUIRE_KERNEL_MODULE("aio");
|
||||||
@ -467,8 +469,17 @@ ATF_TC_BODY(aio_unix_socketpair_test, tc)
|
|||||||
aio_context_init(&ac, sockets[0],
|
aio_context_init(&ac, sockets[0],
|
||||||
sockets[1], UNIX_SOCKETPAIR_LEN, UNIX_SOCKETPAIR_TIMEOUT,
|
sockets[1], UNIX_SOCKETPAIR_LEN, UNIX_SOCKETPAIR_TIMEOUT,
|
||||||
aio_unix_socketpair_cleanup, &arg);
|
aio_unix_socketpair_cleanup, &arg);
|
||||||
|
ATF_REQUIRE_MSG(getrusage(RUSAGE_SELF, &ru_before) != -1,
|
||||||
|
"getrusage failed: %s", strerror(errno));
|
||||||
aio_write_test(&ac);
|
aio_write_test(&ac);
|
||||||
|
ATF_REQUIRE_MSG(getrusage(RUSAGE_SELF, &ru_after) != -1,
|
||||||
|
"getrusage failed: %s", strerror(errno));
|
||||||
|
ATF_REQUIRE(ru_after.ru_msgsnd == ru_before.ru_msgsnd + 1);
|
||||||
|
ru_before = ru_after;
|
||||||
aio_read_test(&ac);
|
aio_read_test(&ac);
|
||||||
|
ATF_REQUIRE_MSG(getrusage(RUSAGE_SELF, &ru_after) != -1,
|
||||||
|
"getrusage failed: %s", strerror(errno));
|
||||||
|
ATF_REQUIRE(ru_after.ru_msgrcv == ru_before.ru_msgrcv + 1);
|
||||||
|
|
||||||
aio_unix_socketpair_cleanup(&arg);
|
aio_unix_socketpair_cleanup(&arg);
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user