- Rework the underlying ALQ storage to be a circular buffer, which amongst
  other things allows variable length messages to be easily supported.

- Extend KPI with alq_writen() and alq_getn() to support variable length
  messages, which is enabled at ALQ creation time depending on the
  arguments passed to alq_open(). Also add variants of alq_open() and
  alq_post() that accept a flags argument. The KPI is still fully
  backwards compatible and shouldn't require any change in ALQ consumers
  unless they wish to utilise the new features.

- Introduce the ALQ_NOACTIVATE and ALQ_ORDERED flags to allow ALQ consumers
  to have more control over IO scheduling and resource acquisition
  respectively.

- Strengthen invariants checking.

- Document ALQ changes in ALQ(9) man page.
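
The circular-buffer storage at the heart of this rework can be sketched as a
minimal userspace model (hypothetical `struct ring` and `ring_write`, not the
kernel implementation; the real code tracks the same state in `struct alq`'s
`aq_writehead`, `aq_writetail` and `aq_freebytes` fields):

```c
#include <assert.h>
#include <string.h>

/*
 * Minimal userspace model of the circular message buffer (hypothetical
 * names; the kernel tracks equivalent state inside struct alq).
 */
struct ring {
	char	buf[16];
	int	head;		/* next write offset */
	int	tail;		/* flush starts here */
	int	freebytes;	/* bytes available */
};

/* Append len bytes, wrapping across the end of the buffer if required. */
static int
ring_write(struct ring *r, const void *data, int len)
{
	int first;

	if (len > r->freebytes)
		return (-1);	/* caller would sleep or get EWOULDBLOCK */

	first = (int)sizeof(r->buf) - r->head;
	if (first > len)
		first = len;
	memcpy(r->buf + r->head, data, first);
	memcpy(r->buf, (const char *)data + first, len - first);
	r->head = (r->head + len) % (int)sizeof(r->buf);
	r->freebytes -= len;
	return (0);
}
```

A 4 byte write starting at offset 14 of the 16 byte buffer lands as two
copies: 2 bytes at the end of the buffer and 2 bytes at the start.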

Sponsored by:	FreeBSD Foundation
Reviewed by:	gnn, jeff, rpaulo, rwatson
MFC after:	1 month
Author:	Lawrence Stewart 2010-04-26 13:48:22 +00:00
parent 699f853bed
commit 7d11e744c1
Notes: svn2git 2020-12-20 02:59:44 +00:00
svn path=/head/; revision=207223
3 changed files with 782 additions and 203 deletions


@ -1,7 +1,13 @@
.\"
.\" Copyright (c) 2003 Hiten Pandya <hmp@FreeBSD.org>
.\" Copyright (c) 2009-2010 The FreeBSD Foundation
.\" All rights reserved.
.\"
.\" Portions of this software were developed at the Centre for Advanced
.\" Internet Architectures, Swinburne University of Technology, Melbourne,
.\" Australia by Lawrence Stewart under sponsorship from the FreeBSD
.\" Foundation.
.\"
.\" Redistribution and use in source and binary forms, with or without
.\" modification, are permitted provided that the following conditions
.\" are met:
@ -25,21 +31,34 @@
.\"
.\" $FreeBSD$
.\"
.Dd April 26, 2010
.Dt ALQ 9
.Os
.Sh NAME
.Nm alq ,
.Nm alq_open_flags ,
.Nm alq_open ,
.Nm alq_writen ,
.Nm alq_write ,
.Nm alq_flush ,
.Nm alq_close ,
.Nm alq_getn ,
.Nm alq_get ,
.Nm alq_post_flags ,
.Nm alq_post
.Nd Asynchronous Logging Queues
.Sh SYNOPSIS
.In sys/alq.h
.Ft int
.Fo alq_open_flags
.Fa "struct alq **app"
.Fa "const char *file"
.Fa "struct ucred *cred"
.Fa "int cmode"
.Fa "int size"
.Fa "int flags"
.Fc
.Ft int
.Fo alq_open
.Fa "struct alq **app"
.Fa "const char *file"
@ -49,19 +68,25 @@
.Fa "int count"
.Fc
.Ft int
.Fn alq_writen "struct alq *alq" "void *data" "int len" "int flags"
.Ft int
.Fn alq_write "struct alq *alq" "void *data" "int flags"
.Ft void
.Fn alq_flush "struct alq *alq"
.Ft void
.Fn alq_close "struct alq *alq"
.Ft struct ale *
.Fn alq_getn "struct alq *alq" "int len" "int flags"
.Ft struct ale *
.Fn alq_get "struct alq *alq" "int flags"
.Ft void
.Fn alq_post_flags "struct alq *alq" "struct ale *ale" "int flags"
.Ft void
.Fn alq_post "struct alq *alq" "struct ale *ale"
.Sh DESCRIPTION
The
.Nm
facility provides an asynchronous fixed or variable length recording
mechanism, known as Asynchronous Logging Queues.
It can record to any
.Xr vnode 9 ,
@ -81,26 +106,37 @@ is defined as
which has the following members:
.Bd -literal -offset indent
struct ale {
intptr_t ae_bytesused; /* # bytes written to ALE. */
char *ae_data; /* Write ptr. */
int ae_pad; /* Unused, compat. */
};
.Ed
.Pp
An
.Nm
can be created in either fixed or variable length mode.
A variable length
.Nm
accommodates writes of varying length using
.Fn alq_writen
and
.Fn alq_getn .
A fixed length
.Nm
accommodates a fixed number of writes using
.Fn alq_write
and
.Fn alq_get ,
each of fixed size (set at queue creation time).
Fixed length mode is deprecated in favour of variable length mode.
.Sh FUNCTIONS
The
.Fn alq_open_flags
function creates a new variable length asynchronous logging queue.
The
.Fa file
argument is the name of the file to open for logging.
If the file does not yet exist,
.Fn alq_open
will attempt to create it.
The
@ -112,33 +148,99 @@ as the requested creation mode, to be used if the file will be created by
Consumers of this API may wish to pass
.Dv ALQ_DEFAULT_CMODE ,
a default creation mode suitable for most applications.
The
.Fa cred
argument specifies the credentials to use when opening and performing I/O on the file.
The
.Fa size
argument sets the size (in bytes) of the underlying queue.
The ALQ_ORDERED flag may be passed in via
.Fa flags
to indicate that the ordering of writer threads waiting for a busy
.Nm
to free up resources should be preserved.
.Pp
The deprecated
.Fn alq_open
function is implemented as a wrapper around
.Fn alq_open_flags
to provide backwards compatibility to consumers that have not been updated to
utilise the newer
.Fn alq_open_flags
function.
It passes all arguments through to
.Fn alq_open_flags
untouched except for
.Fa size
and
.Fa count ,
and sets
.Fa flags
to 0.
To create a variable length mode
.Nm ,
the
.Fa size
argument should be set to the size (in bytes) of the underlying queue and the
.Fa count
argument should be set to 0.
To create a fixed length mode
.Nm ,
the
.Fa size
argument should be set to the size (in bytes) of each write and the
.Fa count
argument should be set to the number of
.Fa size
byte chunks to reserve capacity for.
.Pp
The
.Fn alq_writen
function writes
.Fa len
bytes from
.Fa data
to the designated variable length mode queue
.Fa alq .
If
.Fn alq_writen
could not write the entry immediately and
.Dv ALQ_WAITOK
is set in
.Fa flags ,
the function will be allowed to
.Xr msleep_spin 9
with the
.Dq Li alqwnord
or
.Dq Li alqwnres
wait message.
A write will automatically schedule the queue
.Fa alq
to be flushed to disk.
This behaviour can be controlled by passing ALQ_NOACTIVATE via
.Fa flags
to indicate that the write should not schedule
.Fa alq
to be flushed to disk.
.Pp
The deprecated
.Fn alq_write
function is implemented as a wrapper around
.Fn alq_writen
to provide backwards compatibility to consumers that have not been updated to
utilise variable length mode queues.
The function will write
.Fa size
bytes of data (where
.Fa size
was specified at queue creation time) from the
.Fa data
buffer to the
.Fa alq .
Note that it is an error to call
.Fn alq_write
on a variable length mode queue.
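.Pp
The write-side admission test described above (fail rather than block when the
message cannot fit) can be modelled in userspace C. The flag and field names
below are local to this sketch, not the kernel API:

```c
#include <assert.h>
#include <errno.h>

/*
 * Userspace sketch of alq_writen()'s fail-fast test (hypothetical
 * flag values; the kernel uses ALQ_NOWAIT, AQ_SHUTDOWN and AQ_ACTIVE).
 */
#define	MQ_NOWAIT	0x01	/* caller cannot sleep */
#define	MQ_SHUTDOWN	0x02	/* queue is going away */
#define	MQ_ACTIVE	0x04	/* queue scheduled for flushing */

static int
writen_would_block(int len, int buflen, int freebytes, int qflags, int wflags)
{
	/* HAS_PENDING_DATA() analogue: buffer not empty. */
	int pending = (freebytes != buflen);

	if (len > buflen || (qflags & MQ_SHUTDOWN) ||
	    (((wflags & MQ_NOWAIT) ||
	    (!(qflags & MQ_ACTIVE) && pending)) && freebytes < len))
		return (EWOULDBLOCK);

	return (0);
}
```

Note the deadlock-avoidance case: an inactive queue with pending data cannot
free space on its own, so a too-large write must fail even if the caller could
sleep.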
.Pp
The
.Fn alq_flush
@ -146,61 +248,136 @@ function is used for flushing
.Fa alq
to the log medium that was passed to
.Fn alq_open .
If
.Fa alq
has data to flush and is not already in the process of being flushed, the
function will block doing IO.
Otherwise, the function will return immediately.
.Pp
The
.Fn alq_close
function will close the asynchronous logging queue
.Fa alq
and flush all pending write requests to the log medium.
It will free all resources that were previously allocated.
.Pp
The
.Fn alq_getn
function returns an asynchronous log entry from
.Fa alq ,
initialised to point at a buffer capable of receiving
.Fa len
bytes of data.
This function leaves
.Fa alq
in a locked state, until a subsequent
.Fn alq_post
or
.Fn alq_post_flags
call is made.
If
.Fn alq_getn
could not obtain
.Fa len
bytes of buffer immediately and
.Dv ALQ_WAITOK
is set in
.Fa flags ,
the function will be allowed to
.Xr msleep_spin 9
with the
.Dq Li alqgnord
or
.Dq Li alqgnres
wait message.
The caller can choose to write less than
.Fa len
bytes of data to the returned asynchronous log entry by setting the entry's
.Va ae_bytesused
field to the number of bytes actually written.
This must be done prior to calling
.Fn alq_post .
.Pp
The deprecated
.Fn alq_get
function is implemented as a wrapper around
.Fn alq_getn
to provide backwards compatibility to consumers that have not been updated to
utilise variable length mode queues.
The asynchronous log entry returned will be initialised to point at a buffer
capable of receiving
.Fa size
bytes of data (where
.Fa size
was specified at queue creation time).
Note that it is an error to call
.Fn alq_get
on a variable length mode queue.
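.Pp
The contiguous-space calculation behind
.Fn alq_getn ,
including the early wrap that leaves a hole at the end of the buffer, can be
modelled in userspace C (local names throughout; the kernel tracks the hole in
.Va aq_wrapearly ) :

```c
#include <assert.h>

/*
 * Userspace model of alq_getn()'s contiguous-space calculation,
 * including the early wrap that leaves a hole at the end of the
 * buffer. All names here are local to this sketch.
 */
struct getn_state {
	int	buflen, head, tail, freebytes, wrapearly;
};

static int
contig_space(struct getn_state *q, int len)
{
	int contigbytes;

	if (q->head <= q->tail)
		/* Empty buffer, or the free region does not wrap. */
		contigbytes = q->freebytes;
	else {
		contigbytes = q->buflen - q->head;
		if (contigbytes < len && q->tail >= len) {
			/* Wrap early; remember the bytes left blank. */
			q->wrapearly = contigbytes;
			contigbytes = q->freebytes = q->tail;
			q->head = 0;
		}
	}
	return (contigbytes);
}
```

With a 16 byte buffer, head at 14 and tail at 6, a 4 byte get cannot fit in
the 2 bytes at the end, so the model wraps early and offers the 6 bytes at the
start of the buffer instead.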
.Pp
The
.Fn alq_post_flags
function schedules the asynchronous log entry
.Fa ale
(obtained from
.Fn alq_getn
or
.Fn alq_get )
for writing to
.Fa alq .
The ALQ_NOACTIVATE flag may be passed in via
.Fa flags
to indicate that the queue should not be immediately scheduled to be flushed to
disk.
This function leaves
.Fa alq
in an unlocked state.
.Pp
The
.Fn alq_post
function is implemented as a wrapper around
.Fn alq_post_flags
to provide backwards compatibility to consumers that have not been updated to
utilise the newer
.Fn alq_post_flags
function.
It simply passes all arguments through to
.Fn alq_post_flags
untouched, and sets
.Fa flags
to 0.
.Sh IMPLEMENTATION NOTES
The
.Fn alq_writen
and
.Fn alq_write
functions both perform a
.Xr bcopy 3
from the supplied
.Fa data
buffer into the underlying
.Nm
buffer.
Performance critical code paths may wish to consider using
.Fn alq_getn
(variable length queues) or
.Fn alq_get
(fixed length queues) to avoid the extra memory copy.
Note that a queue remains locked between calls to
.Fn alq_getn
or
.Fn alq_get
and
.Fn alq_post
or
.Fn alq_post_flags ,
so this method of writing to a queue is unsuitable for situations where the
time between calls may be substantial.
.Sh LOCKING
Each asynchronous logging queue is protected by a spin mutex.
.Pp
Functions
.Fn alq_flush
and
.Fn alq_open
may attempt to acquire an internal sleep mutex, and should
consequently not be used in contexts where sleeping is
not allowed.
@ -214,32 +391,36 @@ if it fails to open
or else it returns 0.
.Pp
The
.Fn alq_writen
and
.Fn alq_write
functions return
.Er EWOULDBLOCK
if
.Dv ALQ_NOWAIT
was set in
.Fa flags
and either the queue is full or the system is shutting down.
.Pp
The
.Fn alq_getn
and
.Fn alq_get
functions return
.Dv NULL
if
.Dv ALQ_NOWAIT
was set in
.Fa flags
and either the queue is full or the system is shutting down.
.Pp
NOTE: invalid arguments to non-void functions will result in
undefined behaviour.
.Sh SEE ALSO
.Xr syslog 3 ,
.Xr kproc 9 ,
.Xr ktr 9 ,
.Xr msleep_spin 9 ,
.Xr vnode 9
.Sh HISTORY
The
@ -250,7 +431,11 @@ Asynchronous Logging Queues (ALQ) facility first appeared in
The
.Nm
facility was written by
.An Jeffrey Roberson Aq jeff@FreeBSD.org
and extended by
.An Lawrence Stewart Aq lstewart@freebsd.org .
.Pp
This manual page was written by
.An Hiten Pandya Aq hmp@FreeBSD.org
and revised by
.An Lawrence Stewart Aq lstewart@freebsd.org .


@ -55,16 +55,23 @@ __FBSDID("$FreeBSD$");
/* Async. Logging Queue */
struct alq {
char *aq_entbuf; /* Buffer for stored entries */
int aq_entmax; /* Max entries */
int aq_entlen; /* Entry length */
int aq_freebytes; /* Bytes available in buffer */
int aq_buflen; /* Total length of our buffer */
int aq_writehead; /* Location for next write */
int aq_writetail; /* Flush starts at this location */
int aq_wrapearly; /* # bytes left blank at end of buf */
int aq_flags; /* Queue flags */
int aq_waiters; /* Num threads waiting for resources
* NB: Used as a wait channel so must
* not be first field in the alq struct
*/
struct ale aq_getpost; /* ALE for use by get/post */
struct mtx aq_mtx; /* Queue lock */
struct vnode *aq_vp; /* Open vnode handle */
struct ucred *aq_cred; /* Credentials of the opening thread */
LIST_ENTRY(alq) aq_act; /* List of active queues */
LIST_ENTRY(alq) aq_link; /* List of all queues */
};
@ -73,10 +80,14 @@ struct alq {
#define AQ_ACTIVE 0x0002 /* on the active list */
#define AQ_FLUSHING 0x0004 /* doing IO */
#define AQ_SHUTDOWN 0x0008 /* Queue no longer valid */
#define AQ_ORDERED 0x0010 /* Queue enforces ordered writes */
#define AQ_LEGACY 0x0020 /* Legacy queue (fixed length writes) */
#define ALQ_LOCK(alq) mtx_lock_spin(&(alq)->aq_mtx)
#define ALQ_UNLOCK(alq) mtx_unlock_spin(&(alq)->aq_mtx)
#define HAS_PENDING_DATA(alq) ((alq)->aq_freebytes != (alq)->aq_buflen)
static MALLOC_DEFINE(M_ALD, "ALD", "ALD");
/*
@ -205,7 +216,7 @@ ald_daemon(void)
needwakeup = alq_doio(alq);
ALQ_UNLOCK(alq);
if (needwakeup)
wakeup_one(alq);
ALD_LOCK();
}
@ -252,6 +263,20 @@ alq_shutdown(struct alq *alq)
/* Stop any new writers. */
alq->aq_flags |= AQ_SHUTDOWN;
/*
* If the ALQ isn't active but has unwritten data (possible if
* the ALQ_NOACTIVATE flag has been used), explicitly activate the
* ALQ here so that the pending data gets flushed by the ald_daemon.
*/
if (!(alq->aq_flags & AQ_ACTIVE) && HAS_PENDING_DATA(alq)) {
alq->aq_flags |= AQ_ACTIVE;
ALQ_UNLOCK(alq);
ALD_LOCK();
ald_activate(alq);
ALD_UNLOCK();
ALQ_LOCK(alq);
}
/* Drain IO */
while (alq->aq_flags & AQ_ACTIVE) {
alq->aq_flags |= AQ_WANTED;
@ -271,7 +296,6 @@ alq_destroy(struct alq *alq)
alq_shutdown(alq);
mtx_destroy(&alq->aq_mtx);
free(alq->aq_first, M_ALD);
free(alq->aq_entbuf, M_ALD);
free(alq, M_ALD);
}
@ -287,46 +311,54 @@ alq_doio(struct alq *alq)
struct vnode *vp;
struct uio auio;
struct iovec aiov[2];
int totlen;
int iov;
int vfslocked;
int wrapearly;
KASSERT((HAS_PENDING_DATA(alq)), ("%s: queue empty!", __func__));
vp = alq->aq_vp;
td = curthread;
totlen = 0;
iov = 1;
wrapearly = alq->aq_wrapearly;
bzero(&aiov, sizeof(aiov));
bzero(&auio, sizeof(auio));
/* Start the write from the location of our buffer tail pointer. */
aiov[0].iov_base = alq->aq_entbuf + alq->aq_writetail;
if (alq->aq_writetail < alq->aq_writehead) {
/* Buffer not wrapped. */
totlen = aiov[0].iov_len = alq->aq_writehead - alq->aq_writetail;
} else if (alq->aq_writehead == 0) {
/* Buffer not wrapped (special case to avoid an empty iov). */
totlen = aiov[0].iov_len = alq->aq_buflen - alq->aq_writetail -
wrapearly;
} else {
/*
* Buffer wrapped, requires 2 aiov entries:
* - first is from writetail to end of buffer
* - second is from start of buffer to writehead
*/
aiov[0].iov_len = alq->aq_buflen - alq->aq_writetail -
wrapearly;
iov++;
aiov[1].iov_base = alq->aq_entbuf;
aiov[1].iov_len = alq->aq_writehead;
totlen = aiov[0].iov_len + aiov[1].iov_len;
}
alq->aq_flags |= AQ_FLUSHING;
ALQ_UNLOCK(alq);
auio.uio_iov = &aiov[0];
auio.uio_offset = 0;
auio.uio_segflg = UIO_SYSSPACE;
auio.uio_rw = UIO_WRITE;
auio.uio_iovcnt = iov;
auio.uio_resid = totlen;
auio.uio_td = td;
@ -350,8 +382,28 @@ alq_doio(struct alq *alq)
ALQ_LOCK(alq);
alq->aq_flags &= ~AQ_FLUSHING;
/* Adjust writetail as required, taking into account wrapping. */
alq->aq_writetail = (alq->aq_writetail + totlen + wrapearly) %
alq->aq_buflen;
alq->aq_freebytes += totlen + wrapearly;
/*
* If we just flushed part of the buffer which wrapped, reset the
* wrapearly indicator.
*/
if (wrapearly)
alq->aq_wrapearly = 0;
/*
* If we just flushed the buffer completely, reset indexes to 0 to
* minimise buffer wraps.
* This is also required to ensure alq_getn() can't wedge itself.
*/
if (!HAS_PENDING_DATA(alq))
alq->aq_writehead = alq->aq_writetail = 0;
KASSERT((alq->aq_writetail >= 0 && alq->aq_writetail < alq->aq_buflen),
("%s: aq_writetail < 0 || aq_writetail >= aq_buflen", __func__));
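The tail bookkeeping above can be checked with a small userspace sketch
(hypothetical flush_advance, local names; the kernel does the same arithmetic
inline in alq_doio()):

```c
#include <assert.h>

/*
 * Sketch of the tail bookkeeping alq_doio() performs after flushing
 * totlen bytes plus any early-wrap hole (userspace model, local names).
 */
static void
flush_advance(int *tail, int *freebytes, int *wrapearly, int buflen,
    int totlen)
{
	/* Skip flushed data and any bytes left blank by an early wrap. */
	*tail = (*tail + totlen + *wrapearly) % buflen;
	*freebytes += totlen + *wrapearly;
	*wrapearly = 0;
}
```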
if (alq->aq_flags & AQ_WANTED) {
alq->aq_flags &= ~AQ_WANTED;
@ -376,27 +428,27 @@ SYSINIT(ald, SI_SUB_LOCK, SI_ORDER_ANY, ald_startup, NULL);
/*
* Create the queue data structure, allocate the buffer, and open the file.
*/
int
alq_open_flags(struct alq **alqp, const char *file, struct ucred *cred, int cmode,
int size, int flags)
{
struct thread *td;
struct nameidata nd;
struct alq *alq;
int oflags;
int error;
int vfslocked;
KASSERT((size > 0), ("%s: size <= 0", __func__));
*alqp = NULL;
td = curthread;
NDINIT(&nd, LOOKUP, NOFOLLOW | MPSAFE, UIO_SYSSPACE, file, td);
oflags = FWRITE | O_NOFOLLOW | O_CREAT;
error = vn_open_cred(&nd, &oflags, cmode, 0, cred, NULL);
if (error)
return (error);
@ -407,31 +459,20 @@ alq_open(struct alq **alqp, const char *file, struct ucred *cred, int cmode,
VFS_UNLOCK_GIANT(vfslocked);
alq = malloc(sizeof(*alq), M_ALD, M_WAITOK|M_ZERO);
alq->aq_vp = nd.ni_vp;
alq->aq_cred = crhold(cred);
mtx_init(&alq->aq_mtx, "ALD Queue", NULL, MTX_SPIN|MTX_QUIET);
alq->aq_buflen = size;
alq->aq_entmax = 0;
alq->aq_entlen = 0;
alq->aq_freebytes = alq->aq_buflen;
alq->aq_entbuf = malloc(alq->aq_buflen, M_ALD, M_WAITOK|M_ZERO);
alq->aq_writehead = alq->aq_writetail = 0;
if (flags & ALQ_ORDERED)
alq->aq_flags |= AQ_ORDERED;
if ((error = ald_add(alq)) != 0) {
alq_destroy(alq);
@ -443,77 +484,405 @@ alq_open(struct alq **alqp, const char *file, struct ucred *cred, int cmode,
return (0);
}
int
alq_open(struct alq **alqp, const char *file, struct ucred *cred, int cmode,
int size, int count)
{
int ret;
KASSERT((count >= 0), ("%s: count < 0", __func__));
if (count > 0) {
ret = alq_open_flags(alqp, file, cred, cmode, size*count, 0);
(*alqp)->aq_flags |= AQ_LEGACY;
(*alqp)->aq_entmax = count;
(*alqp)->aq_entlen = size;
} else
ret = alq_open_flags(alqp, file, cred, cmode, size, 0);
return (ret);
}
/*
* Copy a new entry into the queue. If the operation would block either
* wait or return an error depending on the value of waitok.
*/
int
alq_writen(struct alq *alq, void *data, int len, int flags)
{
int activate, copy, ret;
void *waitchan;
KASSERT((len > 0 && len <= alq->aq_buflen),
("%s: len <= 0 || len > aq_buflen", __func__));
activate = ret = 0;
copy = len;
waitchan = NULL;
ALQ_LOCK(alq);
/*
* Fail to perform the write and return EWOULDBLOCK if:
* - The message is larger than our underlying buffer.
* - The ALQ is being shutdown.
* - There is insufficient free space in our underlying buffer
* to accept the message and the user can't wait for space.
* - There is insufficient free space in our underlying buffer
* to accept the message and the alq is inactive due to prior
* use of the ALQ_NOACTIVATE flag (which would lead to deadlock).
*/
if (len > alq->aq_buflen ||
alq->aq_flags & AQ_SHUTDOWN ||
(((flags & ALQ_NOWAIT) || (!(alq->aq_flags & AQ_ACTIVE) &&
HAS_PENDING_DATA(alq))) && alq->aq_freebytes < len)) {
ALQ_UNLOCK(alq);
return (EWOULDBLOCK);
}
/*
* If we want ordered writes and there is already at least one thread
* waiting for resources to become available, sleep until we're woken.
*/
if (alq->aq_flags & AQ_ORDERED && alq->aq_waiters > 0) {
KASSERT(!(flags & ALQ_NOWAIT),
("%s: ALQ_NOWAIT set but incorrectly ignored!", __func__));
alq->aq_waiters++;
msleep_spin(&alq->aq_waiters, &alq->aq_mtx, "alqwnord", 0);
alq->aq_waiters--;
}
/*
* (ALQ_WAITOK && aq_freebytes < len) or aq_freebytes >= len, either
* enter while loop and sleep until we have enough free bytes (former)
* or skip (latter). If AQ_ORDERED is set, only 1 thread at a time will
* be in this loop. Otherwise, multiple threads may be sleeping here
* competing for ALQ resources.
*/
while (alq->aq_freebytes < len && !(alq->aq_flags & AQ_SHUTDOWN)) {
KASSERT(!(flags & ALQ_NOWAIT),
("%s: ALQ_NOWAIT set but incorrectly ignored!", __func__));
alq->aq_flags |= AQ_WANTED;
alq->aq_waiters++;
if (waitchan)
wakeup(waitchan);
msleep_spin(alq, &alq->aq_mtx, "alqwnres", 0);
alq->aq_waiters--;
/*
* If we're the first thread to wake after an AQ_WANTED wakeup
* but there isn't enough free space for us, we're going to loop
* and sleep again. If there are other threads waiting in this
* loop, schedule a wakeup so that they can see if the space
* they require is available.
*/
if (alq->aq_waiters > 0 && !(alq->aq_flags & AQ_ORDERED) &&
alq->aq_freebytes < len && !(alq->aq_flags & AQ_WANTED))
waitchan = alq;
else
waitchan = NULL;
}
/*
* If there are waiters, we need to signal the waiting threads after we
* complete our work. The alq ptr is used as a wait channel for threads
* requiring resources to be freed up. In the AQ_ORDERED case, threads
* are not allowed to concurrently compete for resources in the above
* while loop, so we use a different wait channel in this case.
*/
if (alq->aq_waiters > 0) {
if (alq->aq_flags & AQ_ORDERED)
waitchan = &alq->aq_waiters;
else
waitchan = alq;
} else
waitchan = NULL;
/* Bail if we're shutting down. */
if (alq->aq_flags & AQ_SHUTDOWN) {
ret = EWOULDBLOCK;
goto unlock;
}
/*
* If we need to wrap the buffer to accommodate the write,
* we'll need 2 calls to bcopy.
*/
if ((alq->aq_buflen - alq->aq_writehead) < len)
copy = alq->aq_buflen - alq->aq_writehead;
/* Copy message (or part thereof if wrap required) to the buffer. */
bcopy(data, alq->aq_entbuf + alq->aq_writehead, copy);
alq->aq_writehead += copy;
if (alq->aq_writehead >= alq->aq_buflen) {
KASSERT((alq->aq_writehead == alq->aq_buflen),
("%s: alq->aq_writehead (%d) > alq->aq_buflen (%d)",
__func__,
alq->aq_writehead,
alq->aq_buflen));
alq->aq_writehead = 0;
}
if (copy != len) {
/*
* Wrap the buffer by copying the remainder of our message
* to the start of the buffer and resetting aq_writehead.
*/
bcopy(((uint8_t *)data)+copy, alq->aq_entbuf, len - copy);
alq->aq_writehead = len - copy;
}
KASSERT((alq->aq_writehead >= 0 && alq->aq_writehead < alq->aq_buflen),
("%s: aq_writehead < 0 || aq_writehead >= aq_buflen", __func__));
alq->aq_freebytes -= len;
if (!(alq->aq_flags & AQ_ACTIVE) && !(flags & ALQ_NOACTIVATE)) {
alq->aq_flags |= AQ_ACTIVE;
activate = 1;
}
KASSERT((HAS_PENDING_DATA(alq)), ("%s: queue empty!", __func__));
unlock:
ALQ_UNLOCK(alq);
if (activate) {
ALD_LOCK();
ald_activate(alq);
ALD_UNLOCK();
}
/* NB: We rely on wakeup_one waking threads in a FIFO manner. */
if (waitchan != NULL)
wakeup_one(waitchan);
return (ret);
}
int
alq_write(struct alq *alq, void *data, int flags)
{
/* Should only be called in fixed length message (legacy) mode. */
KASSERT((alq->aq_flags & AQ_LEGACY),
("%s: fixed length write on variable length queue", __func__));
return (alq_writen(alq, data, alq->aq_entlen, flags));
}
/*
* Retrieve a pointer for the ALQ to write directly into, avoiding bcopy.
*/
struct ale *
alq_getn(struct alq *alq, int len, int flags)
{
int contigbytes;
void *waitchan;
KASSERT((len > 0 && len <= alq->aq_buflen),
("%s: len <= 0 || len > alq->aq_buflen", __func__));
waitchan = NULL;
ALQ_LOCK(alq);
/*
* Determine the number of free contiguous bytes.
* We ensure elsewhere that if aq_writehead == aq_writetail because
* the buffer is empty, they will both be set to 0 and therefore
* aq_freebytes == aq_buflen and is fully contiguous.
* If they are equal and the buffer is not empty, aq_freebytes will
* be 0 indicating the buffer is full.
*/
if (alq->aq_writehead <= alq->aq_writetail)
contigbytes = alq->aq_freebytes;
else {
contigbytes = alq->aq_buflen - alq->aq_writehead;
if (contigbytes < len) {
/*
* Insufficient space at end of buffer to handle a
* contiguous write. Wrap early if there's space at
* the beginning. This will leave a hole at the end
* of the buffer which we will have to skip over when
* flushing the buffer to disk.
*/
if (alq->aq_writetail >= len || flags & ALQ_WAITOK) {
/* Keep track of # bytes left blank. */
alq->aq_wrapearly = contigbytes;
/* Do the wrap and adjust counters. */
contigbytes = alq->aq_freebytes =
alq->aq_writetail;
alq->aq_writehead = 0;
}
}
}
/*
* Return a NULL ALE if:
* - The message is larger than our underlying buffer.
* - The ALQ is being shutdown.
* - There is insufficient free space in our underlying buffer
* to accept the message and the user can't wait for space.
* - There is insufficient free space in our underlying buffer
* to accept the message and the alq is inactive due to prior
* use of the ALQ_NOACTIVATE flag (which would lead to deadlock).
*/
if (len > alq->aq_buflen ||
alq->aq_flags & AQ_SHUTDOWN ||
(((flags & ALQ_NOWAIT) || (!(alq->aq_flags & AQ_ACTIVE) &&
HAS_PENDING_DATA(alq))) && contigbytes < len)) {
ALQ_UNLOCK(alq);
return (NULL);
}
/*
* If we want ordered writes and there is already at least one thread
* waiting for resources to become available, sleep until we're woken.
*/
if (alq->aq_flags & AQ_ORDERED && alq->aq_waiters > 0) {
KASSERT(!(flags & ALQ_NOWAIT),
("%s: ALQ_NOWAIT set but incorrectly ignored!", __func__));
alq->aq_waiters++;
msleep_spin(&alq->aq_waiters, &alq->aq_mtx, "alqgnord", 0);
alq->aq_waiters--;
}
/*
* (ALQ_WAITOK && contigbytes < len) or contigbytes >= len, either enter
* while loop and sleep until we have enough contiguous free bytes
* (former) or skip (latter). If AQ_ORDERED is set, only 1 thread at a
* time will be in this loop. Otherwise, multiple threads may be
* sleeping here competing for ALQ resources.
*/
while (contigbytes < len && !(alq->aq_flags & AQ_SHUTDOWN)) {
KASSERT(!(flags & ALQ_NOWAIT),
("%s: ALQ_NOWAIT set but incorrectly ignored!", __func__));
alq->aq_flags |= AQ_WANTED;
alq->aq_waiters++;
if (waitchan)
wakeup(waitchan);
msleep_spin(alq, &alq->aq_mtx, "alqgnres", 0);
alq->aq_waiters--;
if (alq->aq_writehead <= alq->aq_writetail)
contigbytes = alq->aq_freebytes;
else
contigbytes = alq->aq_buflen - alq->aq_writehead;
/*
* If we're the first thread to wake after an AQ_WANTED wakeup
* but there isn't enough free space for us, we're going to loop
* and sleep again. If there are other threads waiting in this
* loop, schedule a wakeup so that they can see if the space
* they require is available.
*/
if (alq->aq_waiters > 0 && !(alq->aq_flags & AQ_ORDERED) &&
contigbytes < len && !(alq->aq_flags & AQ_WANTED))
waitchan = alq;
else
waitchan = NULL;
}
/*
* If there are waiters, we need to signal the waiting threads after we
* complete our work. The alq ptr is used as a wait channel for threads
* requiring resources to be freed up. In the AQ_ORDERED case, threads
* are not allowed to concurrently compete for resources in the above
* while loop, so we use a different wait channel in this case.
*/
if (alq->aq_waiters > 0) {
if (alq->aq_flags & AQ_ORDERED)
waitchan = &alq->aq_waiters;
else
waitchan = alq;
} else
waitchan = NULL;
/* Bail if we're shutting down. */
if (alq->aq_flags & AQ_SHUTDOWN) {
ALQ_UNLOCK(alq);
if (waitchan != NULL)
wakeup_one(waitchan);
return (NULL);
}
/*
* If we are here, we have a contiguous number of bytes >= len
* available in our buffer starting at aq_writehead.
*/
alq->aq_getpost.ae_data = alq->aq_entbuf + alq->aq_writehead;
alq->aq_getpost.ae_bytesused = len;
return (&alq->aq_getpost);
}
struct ale *
alq_get(struct alq *alq, int flags)
{
/* Should only be called in fixed length message (legacy) mode. */
KASSERT((alq->aq_flags & AQ_LEGACY),
("%s: fixed length get on variable length queue", __func__));
return (alq_getn(alq, alq->aq_entlen, flags));
}
void
alq_post_flags(struct alq *alq, struct ale *ale, int flags)
{
int activate;
void *waitchan;
activate = 0;
if (ale->ae_bytesused > 0) {
if (!(alq->aq_flags & AQ_ACTIVE) &&
!(flags & ALQ_NOACTIVATE)) {
alq->aq_flags |= AQ_ACTIVE;
activate = 1;
}
alq->aq_writehead += ale->ae_bytesused;
alq->aq_freebytes -= ale->ae_bytesused;
/* Wrap aq_writehead if we filled to the end of the buffer. */
if (alq->aq_writehead == alq->aq_buflen)
alq->aq_writehead = 0;
KASSERT((alq->aq_writehead >= 0 &&
alq->aq_writehead < alq->aq_buflen),
("%s: aq_writehead < 0 || aq_writehead >= aq_buflen",
__func__));
KASSERT((HAS_PENDING_DATA(alq)), ("%s: queue empty!", __func__));
}
/*
* If there are waiters, we need to signal the waiting threads after we
* complete our work. The alq ptr is used as a wait channel for threads
* requiring resources to be freed up. In the AQ_ORDERED case, threads
* are not allowed to concurrently compete for resources in the
* alq_getn() while loop, so we use a different wait channel in this case.
*/
if (alq->aq_waiters > 0) {
if (alq->aq_flags & AQ_ORDERED)
waitchan = &alq->aq_waiters;
else
waitchan = alq;
} else
waitchan = NULL;
ALQ_UNLOCK(alq);
if (activate) {
ALD_LOCK();
ald_activate(alq);
ALD_UNLOCK();
}
/* NB: We rely on wakeup_one waking threads in a FIFO manner. */
if (waitchan != NULL)
wakeup_one(waitchan);
}
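The counter updates a successful post performs can be modelled in userspace
(hypothetical post_advance, not the kernel function):

```c
#include <assert.h>

/*
 * Userspace sketch of the bookkeeping alq_post_flags() performs once a
 * caller has filled the entry returned by alq_getn() (local names).
 */
static void
post_advance(int *head, int *freebytes, int buflen, int bytesused)
{
	*head += bytesused;
	*freebytes -= bytesused;
	/* Wrap the write head if the entry filled exactly to the end. */
	if (*head == buflen)
		*head = 0;
}
```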
void
@ -523,16 +892,24 @@ alq_flush(struct alq *alq)
ALD_LOCK();
ALQ_LOCK(alq);
/*
* Pull the lever iff there is data to flush and we're
* not already in the middle of a flush operation.
*/
if (HAS_PENDING_DATA(alq) && !(alq->aq_flags & AQ_FLUSHING)) {
if (alq->aq_flags & AQ_ACTIVE)
ald_deactivate(alq);
ALD_UNLOCK();
needwakeup = alq_doio(alq);
} else
ALD_UNLOCK();
ALQ_UNLOCK(alq);
if (needwakeup)
wakeup(alq);
wakeup_one(alq);
}
/*-
* Copyright (c) 2002, Jeffrey Roberson <jeff@freebsd.org>
* Copyright (c) 2008-2009, Lawrence Stewart <lstewart@freebsd.org>
* Copyright (c) 2010, The FreeBSD Foundation
* All rights reserved.
*
* Portions of this software were developed at the Centre for Advanced
* Internet Architectures, Swinburne University of Technology, Melbourne,
* Australia by Lawrence Stewart under sponsorship from the FreeBSD Foundation.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions
* are met:
extern struct thread *ald_thread;

/*
 * Async. Logging Entry
 */
struct ale {
	intptr_t	ae_bytesused;	/* # bytes written to ALE. */
	char		*ae_data;	/* Write ptr. */
	int		ae_pad;		/* Unused, compat. */
};

/* Flag options. */
#define	ALQ_NOWAIT	0x0001	/* ALQ may not sleep. */
#define	ALQ_WAITOK	0x0002	/* ALQ may sleep. */
#define	ALQ_NOACTIVATE	0x0004	/* Don't activate ALQ after write. */
#define	ALQ_ORDERED	0x0010	/* Maintain write ordering between threads. */
/* Suggested mode for file creation. */
#define ALQ_DEFAULT_CMODE 0600
/*
 * alq_open_flags:  Creates a new queue
 *
 * Arguments:
 *	alq	Storage for a pointer to the newly created queue.
 *	file	The filename to open for logging.
 *	cred	Credential to authorize open and I/O with.
 *	cmode	Creation mode for file, if new.
 *	size	The size of the queue in bytes.
 *	flags	ALQ_ORDERED
 * Returns:
 *	error from open or 0 on success
 */
struct ucred;
int alq_open_flags(struct alq **alqp, const char *file, struct ucred *cred,
    int cmode, int size, int flags);
int alq_open(struct alq **alqp, const char *file, struct ucred *cred, int cmode,
    int size, int count);
/*
 * alq_writen:  Write data into the queue
 *
 * Arguments:
 *	alq	The queue we're writing to
 *	data	The entry to be recorded
 *	len	The number of bytes to write from *data
 *	flags	(ALQ_NOWAIT || ALQ_WAITOK), ALQ_NOACTIVATE
 *
 * Returns:
 *	EWOULDBLOCK if:
 *		Waitok is ALQ_NOWAIT and the queue is full.
 *		The system is shutting down.
 *	0 on success.
 */
int alq_writen(struct alq *alq, void *data, int len, int flags);
int alq_write(struct alq *alq, void *data, int flags);
/*
 * alq_flush:  Flush the queue out to disk
 */
void alq_flush(struct alq *alq);

void alq_close(struct alq *alq);
/*
 * alq_getn:  Return an entry for direct access
 *
 * Arguments:
 *	alq	The queue to retrieve an entry from
 *	len	Max number of bytes required
 *	flags	(ALQ_NOWAIT || ALQ_WAITOK)
 *
 * Returns:
 *	The next available ale on success.
 *	NULL if:
 *		flags is ALQ_NOWAIT and the queue is full.
 *		The system is shutting down.
 *
 * This leaves the queue locked until a subsequent alq_post.
 */
struct ale *alq_getn(struct alq *alq, int len, int flags);
struct ale *alq_get(struct alq *alq, int flags);
/*
 * alq_post_flags:  Schedule the ale retrieved by alq_get/alq_getn for writing.
 *	alq	The queue to post the entry to.
 *	ale	An asynch logging entry returned by alq_get.
 *	flags	ALQ_NOACTIVATE
 */
void alq_post_flags(struct alq *alq, struct ale *ale, int flags);
static __inline void
alq_post(struct alq *alq, struct ale *ale)
{
alq_post_flags(alq, ale, 0);
}
#endif /* _SYS_ALQ_H_ */