fusefs: WIP making FUSE operations interruptible

The fuse protocol includes a FUSE_INTERRUPT operation that the client can
send to the server to indicate that it wants to abort an in-progress
operation.  It's required to interrupt any syscall that is blocking on a
fuse operation.

This commit adds basic FUSE_INTERRUPT support.  If a process receives any
signal while it's blocking on a FUSE operation, it will send a
FUSE_INTERRUPT and wait for the original operation to complete.  But there
is still much to do:

* The current code will leak memory if the server ignores FUSE_INTERRUPT,
  which many do.  It will also leak memory if the server completes the
  original operation before it receives the FUSE_INTERRUPT.
* An interrupted read(2) will incorrectly appear to be successful.
* fusefs should return immediately for fatal signals.
* Operations that haven't been sent to the server yet should be aborted
  without sending FUSE_INTERRUPT.
* Test coverage should be better.
* It would be great if write operations could be made restartable.
  That would require delaying uiomove until the last possible moment, which
  would be sometime during fuse_device_read.

PR:		236530
Sponsored by:	The FreeBSD Foundation
This commit is contained in:
Alan Somers 2019-04-17 23:32:38 +00:00
parent f067b60946
commit 723c776829
6 changed files with 197 additions and 51 deletions

View File

@ -346,8 +346,8 @@ fuse_ohead_audit(struct fuse_out_header *ohead, struct uio *uio)
return (0);
}
SDT_PROBE_DEFINE1(fuse, , device, fuse_device_write_bumped_into_callback,
"uint64_t");
SDT_PROBE_DEFINE1(fuse, , device, fuse_device_write_missing_ticket, "uint64_t");
SDT_PROBE_DEFINE1(fuse, , device, fuse_device_write_found, "struct fuse_ticket*");
/*
* fuse_device_write first reads the header sent by the daemon.
* If that's OK, looks up ticket/callback node by the unique id seen in header.
@ -393,10 +393,9 @@ fuse_device_write(struct cdev *dev, struct uio *uio, int ioflag)
fuse_lck_mtx_lock(data->aw_mtx);
TAILQ_FOREACH_SAFE(tick, &data->aw_head, tk_aw_link,
x_tick) {
SDT_PROBE1(fuse, , device,
fuse_device_write_bumped_into_callback,
tick->tk_unique);
if (tick->tk_unique == ohead.unique) {
SDT_PROBE1(fuse, , device, fuse_device_write_found,
tick);
found = 1;
fuse_aw_remove(tick);
break;
@ -432,8 +431,8 @@ fuse_device_write(struct cdev *dev, struct uio *uio, int ioflag)
fuse_ticket_drop(tick);
} else {
/* no callback at all! */
SDT_PROBE2(fuse, , device, trace, 1,
"erhm, no handler for this response");
SDT_PROBE1(fuse, , device, fuse_device_write_missing_ticket,
ohead.unique);
err = EINVAL;
}

View File

@ -379,8 +379,25 @@ fuse_write_directbackend(struct vnode *vp, struct uio *uio,
break;
retry:
if ((err = fdisp_wait_answ(&fdi)))
err = fdisp_wait_answ(&fdi);
if (err == ERESTART || err == EINTR || err == EWOULDBLOCK) {
/*
* Rewind the uio so dofilewrite will know it's
* incomplete
*/
uio->uio_resid += fwi->size;
uio->uio_offset -= fwi->size;
/*
* Change ERESTART into EINTR because we can't rewind
* uio->uio_iov. Basically, once uiomove(9) has been
* called, it's impossible to restart a syscall.
*/
if (err == ERESTART)
err = EINTR;
break;
} else if (err) {
break;
}
fwo = ((struct fuse_write_out *)fdi.answ);

View File

@ -92,7 +92,10 @@ SDT_PROVIDER_DECLARE(fuse);
*/
SDT_PROBE_DEFINE2(fuse, , ipc, trace, "int", "char*");
static void fdisp_make_pid(struct fuse_dispatcher *fdip, enum fuse_opcode op,
struct fuse_data *data, uint64_t nid, pid_t pid, struct ucred *cred);
static void fiov_clear(struct fuse_iov *fiov);
static void fuse_interrupt_send(struct fuse_ticket *otick);
static struct fuse_ticket *fticket_alloc(struct fuse_data *data);
static void fticket_refresh(struct fuse_ticket *ftick);
static void fticket_destroy(struct fuse_ticket *ftick);
@ -126,25 +129,76 @@ SYSCTL_INT(_vfs_fusefs, OID_AUTO, iov_credit, CTLFLAG_RW,
MALLOC_DEFINE(M_FUSEMSG, "fuse_msgbuf", "fuse message buffer");
static uma_zone_t ticket_zone;
static void
fuse_block_sigs(sigset_t *oldset)
/*
* TODO: figure out how to timeout INTERRUPT requests, because the daemon may
* leagally never respond
*
* TODO: remove an INTERRUPT request if the daemon responds to the original
*/
static int
fuse_interrupt_callback(struct fuse_ticket *tick, struct uio *uio)
{
sigset_t newset;
if (tick->tk_aw_ohead.error == EAGAIN) {
/*
* There are two reasons we might get this:
* 1) the daemon received the INTERRUPT request before the
* original, or
* 2) the daemon received the INTERRUPT request after it
* completed the original request.
* In the first case we should re-send the INTERRUPT. In the
* second, we should ignore it.
*/
struct fuse_interrupt_in *fii;
struct fuse_data *data;
struct fuse_ticket *otick, *x_tick;
bool found = false;
SIGFILLSET(newset);
SIGDELSET(newset, SIGKILL);
if (kern_sigprocmask(curthread, SIG_BLOCK, &newset, oldset, 0))
panic("%s: Invalid operation for kern_sigprocmask()",
__func__);
data = tick->tk_data;
fii = (struct fuse_interrupt_in*)((char*)tick->tk_ms_fiov.base +
sizeof(struct fuse_in_header));
fuse_lck_mtx_lock(data->aw_mtx);
TAILQ_FOREACH_SAFE(otick, &data->aw_head, tk_aw_link, x_tick) {
if (otick->tk_unique == fii->unique) {
found = true;
break;
}
}
fuse_lck_mtx_unlock(data->aw_mtx);
if (found) {
/* Resend */
fuse_interrupt_send(otick);
} else {
/* Original is already complete; nothing to do */
}
return 0;
} else {
/* Illegal FUSE_INTERRUPT response */
return EINVAL;
}
}
static void
fuse_restore_sigs(sigset_t *oldset)
void
fuse_interrupt_send(struct fuse_ticket *otick)
{
struct fuse_dispatcher fdi;
struct fuse_interrupt_in *fii;
struct fuse_in_header *ftick_hdr;
struct fuse_data *data = otick->tk_data;
struct ucred reused_creds;
if (kern_sigprocmask(curthread, SIG_SETMASK, oldset, NULL, 0))
panic("%s: Invalid operation for kern_sigprocmask()",
__func__);
ftick_hdr = fticket_in_header(otick);
reused_creds.cr_uid = ftick_hdr->uid;
reused_creds.cr_rgid = ftick_hdr->gid;
fdisp_init(&fdi, sizeof(*fii));
fdisp_make_pid(&fdi, FUSE_INTERRUPT, data, ftick_hdr->nodeid,
ftick_hdr->pid, &reused_creds);
fii = fdi.indata;
fii->unique = otick->tk_unique;
fuse_insert_callback(fdi.tick, fuse_interrupt_callback);
fuse_insert_message(fdi.tick);
fdisp_destroy(&fdi);
}
void
@ -329,12 +383,16 @@ fticket_reset(struct fuse_ticket *ftick)
static int
fticket_wait_answer(struct fuse_ticket *ftick)
{
sigset_t tset;
struct thread *td = curthread;
sigset_t blockedset, oldset;
int err = 0;
struct fuse_data *data;
fuse_lck_mtx_lock(ftick->tk_aw_mtx);
SIGEMPTYSET(blockedset);
kern_sigprocmask(td, SIG_BLOCK, &blockedset, &oldset, 0);
retry:
if (fticket_answered(ftick)) {
goto out;
}
@ -345,11 +403,12 @@ fticket_wait_answer(struct fuse_ticket *ftick)
fticket_set_answered(ftick);
goto out;
}
fuse_block_sigs(&tset);
err = msleep(ftick, &ftick->tk_aw_mtx, PCATCH, "fu_ans",
data->daemon_timeout * hz);
fuse_restore_sigs(&tset);
if (err == EAGAIN) { /* same as EWOULDBLOCK */
kern_sigprocmask(td, SIG_SETMASK, &oldset, NULL, 0);
if (err == EWOULDBLOCK) {
SDT_PROBE2(fuse, , ipc, trace, 3,
"fticket_wait_answer: EWOULDBLOCK");
#ifdef XXXIP /* die conditionally */
if (!fdata_get_dead(data)) {
fdata_set_dead(data);
@ -357,6 +416,45 @@ fticket_wait_answer(struct fuse_ticket *ftick)
#endif
err = ETIMEDOUT;
fticket_set_answered(ftick);
} else if ((err == EINTR || err == ERESTART)) {
/*
* Whether we get EINTR or ERESTART depends on whether
* SA_RESTART was set by sigaction(2).
*
* Try to interrupt the operation and wait for an EINTR response
* to the original operation. If the file system does not
* support FUSE_INTERRUPT, then we'll just wait for it to
* complete like normal. If it does support FUSE_INTERRUPT,
* then it will either respond EINTR to the original operation,
* or EAGAIN to the interrupt.
*/
int sig;
SDT_PROBE2(fuse, , ipc, trace, 4,
"fticket_wait_answer: interrupt");
fuse_lck_mtx_unlock(ftick->tk_aw_mtx);
fuse_interrupt_send(ftick);
fuse_lck_mtx_lock(ftick->tk_aw_mtx);
/* TODO: return, rather than retry, for fatal signals */
/*
* Block the just-delivered signal while we wait for an
* interrupt response
*/
PROC_LOCK(td->td_proc);
mtx_lock(&td->td_proc->p_sigacts->ps_mtx);
sig = cursig(td);
mtx_unlock(&td->td_proc->p_sigacts->ps_mtx);
PROC_UNLOCK(td->td_proc);
SIGADDSET(blockedset, sig);
kern_sigprocmask(curthread, SIG_BLOCK, &blockedset, NULL, 0);
goto retry;
} else if (err) {
SDT_PROBE2(fuse, , ipc, trace, 6,
"fticket_wait_answer: other error");
} else {
SDT_PROBE2(fuse, , ipc, trace, 7, "fticket_wait_answer: OK");
}
out:
if (!(err || fticket_answered(ftick))) {
@ -762,10 +860,8 @@ fdisp_refresh_pid(struct fuse_dispatcher *fdip, enum fuse_opcode op,
/* Initialize a dispatcher from a pid and node id */
static void
fdisp_make_pid(struct fuse_dispatcher *fdip, enum fuse_opcode op,
struct mount *mp, uint64_t nid, pid_t pid, struct ucred *cred)
struct fuse_data *data, uint64_t nid, pid_t pid, struct ucred *cred)
{
struct fuse_data *data = fuse_get_mpdata(mp);
if (fdip->tick) {
fticket_refresh(fdip->tick);
} else {
@ -783,17 +879,21 @@ void
fdisp_make(struct fuse_dispatcher *fdip, enum fuse_opcode op, struct mount *mp,
uint64_t nid, struct thread *td, struct ucred *cred)
{
struct fuse_data *data = fuse_get_mpdata(mp);
RECTIFY_TDCR(td, cred);
return fdisp_make_pid(fdip, op, mp, nid, td->td_proc->p_pid, cred);
return fdisp_make_pid(fdip, op, data, nid, td->td_proc->p_pid, cred);
}
void
fdisp_make_vp(struct fuse_dispatcher *fdip, enum fuse_opcode op,
struct vnode *vp, struct thread *td, struct ucred *cred)
{
struct mount *mp = vnode_mount(vp);
struct fuse_data *data = fuse_get_mpdata(mp);
RECTIFY_TDCR(td, cred);
return fdisp_make_pid(fdip, op, vnode_mount(vp), VTOI(vp),
return fdisp_make_pid(fdip, op, data, VTOI(vp),
td->td_proc->p_pid, cred);
}

View File

@ -147,10 +147,16 @@ fticket_set_answered(struct fuse_ticket *ftick)
ftick->tk_flag |= FT_ANSW;
}
static inline struct fuse_in_header*
fticket_in_header(struct fuse_ticket *ftick)
{
return (struct fuse_in_header *)(ftick->tk_ms_fiov.base);
}
static inline enum fuse_opcode
fticket_opcode(struct fuse_ticket *ftick)
{
return (((struct fuse_in_header *)(ftick->tk_ms_fiov.base))->opcode);
return fticket_in_header(ftick)->opcode;
}
int fticket_pull(struct fuse_ticket *ftick, struct uio *uio);
@ -174,6 +180,11 @@ struct fuse_data {
struct mtx aw_mtx;
TAILQ_HEAD(, fuse_ticket) aw_head;
/*
* Holds the next value of the FUSE operation unique value.
* Also, serves as a wakeup channel to prevent any operations from
* being created before INIT completes.
*/
u_long ticketer;
struct sx rename_lock;

View File

@ -41,8 +41,10 @@ using namespace testing;
/* Don't do anything; all we care about is that the syscall gets interrupted */
void sigusr2_handler(int __unused sig) {
if (verbosity > 1)
printf("Signaled!\n");
if (verbosity > 1) {
printf("Signaled! thread %p\n", pthread_self());
}
}
void* killer(void* target) {
@ -52,8 +54,8 @@ void* killer(void* target) {
*/
usleep(250'000);
if (verbosity > 1)
printf("Signalling!\n");
pthread_kill(*(pthread_t*)target, SIGUSR2);
printf("Signalling! thread %p\n", target);
pthread_kill((pthread_t)target, SIGUSR2);
return(NULL);
}
@ -94,9 +96,14 @@ void setup_interruptor(pthread_t self)
}
void TearDown() {
struct sigaction sa;
if (m_child != NULL) {
pthread_join(m_child, NULL);
}
bzero(&sa, sizeof(sa));
sa.sa_handler = SIG_DFL;
sigaction(SIGUSR2, &sa, NULL);
FuseTest::TearDown();
}
@ -107,7 +114,7 @@ void TearDown() {
* complete should generate an EAGAIN response.
*/
/* https://bugs.freebsd.org/bugzilla/show_bug.cgi?id=236530 */
TEST_F(Interrupt, DISABLED_already_complete)
TEST_F(Interrupt, already_complete)
{
const char FULLPATH[] = "mountpoint/some_file.txt";
const char RELPATH[] = "some_file.txt";
@ -122,7 +129,6 @@ TEST_F(Interrupt, DISABLED_already_complete)
expect_lookup(RELPATH, ino);
expect_open(ino, 0, 1);
expect_getattr(ino, 0);
expect_write(ino, &write_unique);
EXPECT_CALL(*m_mock, process(
ResultOf([&](auto in) {
@ -150,7 +156,7 @@ TEST_F(Interrupt, DISABLED_already_complete)
ASSERT_LE(0, fd) << strerror(errno);
setup_interruptor(self);
ASSERT_EQ(bufsize, write(fd, CONTENTS, bufsize)) << strerror(errno);
EXPECT_EQ(bufsize, write(fd, CONTENTS, bufsize)) << strerror(errno);
/* Deliberately leak fd. close(2) will be tested in release.cc */
}
@ -160,7 +166,7 @@ TEST_F(Interrupt, DISABLED_already_complete)
* complete the original operation whenever it damn well pleases.
*/
/* https://bugs.freebsd.org/bugzilla/show_bug.cgi?id=236530 */
TEST_F(Interrupt, DISABLED_ignore)
TEST_F(Interrupt, ignore)
{
const char FULLPATH[] = "mountpoint/some_file.txt";
const char RELPATH[] = "some_file.txt";
@ -175,10 +181,9 @@ TEST_F(Interrupt, DISABLED_ignore)
expect_lookup(RELPATH, ino);
expect_open(ino, 0, 1);
expect_getattr(ino, 0);
expect_write(ino, &write_unique);
EXPECT_CALL(*m_mock, process(
ResultOf([=](auto in) {
ResultOf([&](auto in) {
return (in->header.opcode == FUSE_INTERRUPT &&
in->body.interrupt.unique == write_unique);
}, Eq(true)),
@ -208,7 +213,7 @@ TEST_F(Interrupt, DISABLED_ignore)
* return EINTR to userspace
*/
/* https://bugs.freebsd.org/bugzilla/show_bug.cgi?id=236530 */
TEST_F(Interrupt, DISABLED_in_progress)
TEST_F(Interrupt, in_progress)
{
const char FULLPATH[] = "mountpoint/some_file.txt";
const char RELPATH[] = "some_file.txt";
@ -223,10 +228,9 @@ TEST_F(Interrupt, DISABLED_in_progress)
expect_lookup(RELPATH, ino);
expect_open(ino, 0, 1);
expect_getattr(ino, 0);
expect_write(ino, &write_unique);
EXPECT_CALL(*m_mock, process(
ResultOf([=](auto in) {
ResultOf([&](auto in) {
return (in->header.opcode == FUSE_INTERRUPT &&
in->body.interrupt.unique == write_unique);
}, Eq(true)),
@ -260,11 +264,12 @@ TEST_F(Interrupt, DISABLED_in_progress)
* successfully interrupts the original
*/
/* https://bugs.freebsd.org/bugzilla/show_bug.cgi?id=236530 */
TEST_F(Interrupt, DISABLED_too_soon)
TEST_F(Interrupt, too_soon)
{
const char FULLPATH[] = "mountpoint/some_file.txt";
const char RELPATH[] = "some_file.txt";
const char *CONTENTS = "abcdefgh";
Sequence seq;
uint64_t ino = 42;
int fd;
ssize_t bufsize = strlen(CONTENTS);
@ -275,25 +280,25 @@ TEST_F(Interrupt, DISABLED_too_soon)
expect_lookup(RELPATH, ino);
expect_open(ino, 0, 1);
expect_getattr(ino, 0);
expect_write(ino, &write_unique);
EXPECT_CALL(*m_mock, process(
ResultOf([=](auto in) {
ResultOf([&](auto in) {
return (in->header.opcode == FUSE_INTERRUPT &&
in->body.interrupt.unique == write_unique);
}, Eq(true)),
_)
).WillOnce(Invoke(ReturnErrno(EAGAIN)))
.RetiresOnSaturation();
).InSequence(seq)
.WillOnce(Invoke(ReturnErrno(EAGAIN)));
EXPECT_CALL(*m_mock, process(
ResultOf([=](auto in) {
ResultOf([&](auto in) {
return (in->header.opcode == FUSE_INTERRUPT &&
in->body.interrupt.unique == write_unique);
}, Eq(true)),
_)
).WillOnce(Invoke([&](auto in __unused, auto &out __unused) {
).InSequence(seq)
.WillOnce(Invoke([&](auto in __unused, auto &out __unused) {
auto out0 = new mockfs_buf_out;
out0->header.error = -EINTR;
out0->header.unique = write_unique;
@ -310,3 +315,14 @@ TEST_F(Interrupt, DISABLED_too_soon)
/* Deliberately leak fd. close(2) will be tested in release.cc */
}
// TODO: add a test that uses siginterrupt and an interruptible signal
// TODO: add a test that verifies a process can be cleanly killed even if a
// FUSE_WRITE command never returns.
// TODO: write in-progress tests for read and other operations
// TODO: add a test where write returns EWOULDBLOCK
// TODO: test that if a fatal signal is received, fticket_wait_answer will
// return without waiting for a response to the interrupted operation.
// TODO: test that operations that haven't been received by the server can be
// interrupted without generating a FUSE_INTERRUPT.

View File

@ -184,6 +184,9 @@ void debug_fuseop(const mockfs_buf_in *in)
case FUSE_FSYNCDIR:
printf(" flags=%#x", in->body.fsyncdir.fsync_flags);
break;
case FUSE_INTERRUPT:
printf(" unique=%lu", in->body.interrupt.unique);
break;
case FUSE_LOOKUP:
printf(" %s", in->body.lookup);
break;