From 723c7768299a077a93f19b26e8c27b4641751e34 Mon Sep 17 00:00:00 2001 From: Alan Somers Date: Wed, 17 Apr 2019 23:32:38 +0000 Subject: [PATCH] fusefs: WIP making FUSE operations interruptible The fuse protocol includes a FUSE_INTERRUPT operation that the client can send to the server to indicate that it wants to abort an in-progress operation. It's required to interrupt any syscall that is blocking on a fuse operation. This commit adds basic FUSE_INTERRUPT support. If a process receives any signal while it's blocking on a FUSE operation, it will send a FUSE_INTERRUPT and wait for the original operation to complete. But there is still much to do: * The current code will leak memory if the server ignores FUSE_INTERRUPT, which many do. It will also leak memory if the server completes the original operation before it receives the FUSE_INTERRUPT. * An interrupted read(2) will incorrectly appear to be successful. * fusefs should return immediately for fatal signals. * Operations that haven't been sent to the server yet should be aborted without sending FUSE_INTERRUPT. * Test coverage should be better. * It would be great if write operations could be made restartable. That would require delaying uiomove until the last possible moment, which would be sometime during fuse_device_read. PR: 236530 Sponsored by: The FreeBSD Foundation --- sys/fs/fuse/fuse_device.c | 13 ++- sys/fs/fuse/fuse_io.c | 19 +++- sys/fs/fuse/fuse_ipc.c | 144 ++++++++++++++++++++++++++----- sys/fs/fuse/fuse_ipc.h | 13 ++- tests/sys/fs/fusefs/interrupt.cc | 56 +++++++----- tests/sys/fs/fusefs/mockfs.cc | 3 + 6 files changed, 197 insertions(+), 51 deletions(-) diff --git a/sys/fs/fuse/fuse_device.c b/sys/fs/fuse/fuse_device.c index 23271d4c2c1a..a9aa0d7e64c9 100644 --- a/sys/fs/fuse/fuse_device.c +++ b/sys/fs/fuse/fuse_device.c @@ -346,8 +346,8 @@ fuse_ohead_audit(struct fuse_out_header *ohead, struct uio *uio) return (0); } -SDT_PROBE_DEFINE1(fuse, , device, fuse_device_write_bumped_into_callback, - "uint64_t"); +SDT_PROBE_DEFINE1(fuse, , device, fuse_device_write_missing_ticket, "uint64_t"); +SDT_PROBE_DEFINE1(fuse, , device, fuse_device_write_found, "struct fuse_ticket*"); /* * fuse_device_write first reads the header sent by the daemon. * If that's OK, looks up ticket/callback node by the unique id seen in header. @@ -393,10 +393,9 @@ fuse_device_write(struct cdev *dev, struct uio *uio, int ioflag) fuse_lck_mtx_lock(data->aw_mtx); TAILQ_FOREACH_SAFE(tick, &data->aw_head, tk_aw_link, x_tick) { - SDT_PROBE1(fuse, , device, - fuse_device_write_bumped_into_callback, - tick->tk_unique); if (tick->tk_unique == ohead.unique) { + SDT_PROBE1(fuse, , device, fuse_device_write_found, + tick); found = 1; fuse_aw_remove(tick); break; @@ -432,8 +431,8 @@ fuse_device_write(struct cdev *dev, struct uio *uio, int ioflag) fuse_ticket_drop(tick); } else { /* no callback at all! */ - SDT_PROBE2(fuse, , device, trace, 1, - "erhm, no handler for this response"); + SDT_PROBE1(fuse, , device, fuse_device_write_missing_ticket, + ohead.unique); err = EINVAL; } diff --git a/sys/fs/fuse/fuse_io.c b/sys/fs/fuse/fuse_io.c index df6d8d71e53d..258e1221c8cd 100644 --- a/sys/fs/fuse/fuse_io.c +++ b/sys/fs/fuse/fuse_io.c @@ -379,8 +379,25 @@ fuse_write_directbackend(struct vnode *vp, struct uio *uio, break; retry: - if ((err = fdisp_wait_answ(&fdi))) + err = fdisp_wait_answ(&fdi); + if (err == ERESTART || err == EINTR || err == EWOULDBLOCK) { + /* + * Rewind the uio so dofilewrite will know it's + * incomplete + */ + uio->uio_resid += fwi->size; + uio->uio_offset -= fwi->size; + /* + * Change ERESTART into EINTR because we can't rewind + * uio->uio_iov. Basically, once uiomove(9) has been + * called, it's impossible to restart a syscall. + */ + if (err == ERESTART) + err = EINTR; break; + } else if (err) { + break; + } fwo = ((struct fuse_write_out *)fdi.answ); diff --git a/sys/fs/fuse/fuse_ipc.c b/sys/fs/fuse/fuse_ipc.c index 4dde1132df9d..a19d38f99dea 100644 --- a/sys/fs/fuse/fuse_ipc.c +++ b/sys/fs/fuse/fuse_ipc.c @@ -92,7 +92,10 @@ SDT_PROVIDER_DECLARE(fuse); */ SDT_PROBE_DEFINE2(fuse, , ipc, trace, "int", "char*"); +static void fdisp_make_pid(struct fuse_dispatcher *fdip, enum fuse_opcode op, + struct fuse_data *data, uint64_t nid, pid_t pid, struct ucred *cred); static void fiov_clear(struct fuse_iov *fiov); +static void fuse_interrupt_send(struct fuse_ticket *otick); static struct fuse_ticket *fticket_alloc(struct fuse_data *data); static void fticket_refresh(struct fuse_ticket *ftick); static void fticket_destroy(struct fuse_ticket *ftick); @@ -126,25 +129,76 @@ SYSCTL_INT(_vfs_fusefs, OID_AUTO, iov_credit, CTLFLAG_RW, MALLOC_DEFINE(M_FUSEMSG, "fuse_msgbuf", "fuse message buffer"); static uma_zone_t ticket_zone; -static void -fuse_block_sigs(sigset_t *oldset) +/* + * TODO: figure out how to timeout INTERRUPT requests, because the daemon may + * leagally never respond + * + * TODO: remove an INTERRUPT request if the daemon responds to the original + */ +static int +fuse_interrupt_callback(struct fuse_ticket *tick, struct uio *uio) { - sigset_t newset; + if (tick->tk_aw_ohead.error == EAGAIN) { + /* + * There are two reasons we might get this: + * 1) the daemon received the INTERRUPT request before the + * original, or + * 2) the daemon received the INTERRUPT request after it + * completed the original request. + * In the first case we should re-send the INTERRUPT. In the + * second, we should ignore it. + */ + struct fuse_interrupt_in *fii; + struct fuse_data *data; + struct fuse_ticket *otick, *x_tick; + bool found = false; - SIGFILLSET(newset); - SIGDELSET(newset, SIGKILL); - if (kern_sigprocmask(curthread, SIG_BLOCK, &newset, oldset, 0)) - panic("%s: Invalid operation for kern_sigprocmask()", - __func__); + data = tick->tk_data; + fii = (struct fuse_interrupt_in*)((char*)tick->tk_ms_fiov.base + + sizeof(struct fuse_in_header)); + fuse_lck_mtx_lock(data->aw_mtx); + TAILQ_FOREACH_SAFE(otick, &data->aw_head, tk_aw_link, x_tick) { + if (otick->tk_unique == fii->unique) { + found = true; + break; + } + } + fuse_lck_mtx_unlock(data->aw_mtx); + if (found) { + /* Resend */ + fuse_interrupt_send(otick); + } else { + /* Original is already complete; nothing to do */ + } + return 0; + } else { + /* Illegal FUSE_INTERRUPT response */ + return EINVAL; + } } -static void -fuse_restore_sigs(sigset_t *oldset) +void +fuse_interrupt_send(struct fuse_ticket *otick) { + struct fuse_dispatcher fdi; + struct fuse_interrupt_in *fii; + struct fuse_in_header *ftick_hdr; + struct fuse_data *data = otick->tk_data; + struct ucred reused_creds; - if (kern_sigprocmask(curthread, SIG_SETMASK, oldset, NULL, 0)) - panic("%s: Invalid operation for kern_sigprocmask()", - __func__); + ftick_hdr = fticket_in_header(otick); + reused_creds.cr_uid = ftick_hdr->uid; + reused_creds.cr_rgid = ftick_hdr->gid; + fdisp_init(&fdi, sizeof(*fii)); + fdisp_make_pid(&fdi, FUSE_INTERRUPT, data, ftick_hdr->nodeid, + ftick_hdr->pid, &reused_creds); + + fii = fdi.indata; + fii->unique = otick->tk_unique; + fuse_insert_callback(fdi.tick, fuse_interrupt_callback); + + fuse_insert_message(fdi.tick); + fdisp_destroy(&fdi); } void @@ -329,12 +383,16 @@ fticket_reset(struct fuse_ticket *ftick) static int fticket_wait_answer(struct fuse_ticket *ftick) { - sigset_t tset; + struct thread *td = curthread; + sigset_t blockedset, oldset; int err = 0; struct fuse_data *data; fuse_lck_mtx_lock(ftick->tk_aw_mtx); + SIGEMPTYSET(blockedset); + kern_sigprocmask(td, SIG_BLOCK, &blockedset, &oldset, 0); +retry: if (fticket_answered(ftick)) { goto out; } @@ -345,11 +403,12 @@ fticket_wait_answer(struct fuse_ticket *ftick) fticket_set_answered(ftick); goto out; } - fuse_block_sigs(&tset); err = msleep(ftick, &ftick->tk_aw_mtx, PCATCH, "fu_ans", data->daemon_timeout * hz); - fuse_restore_sigs(&tset); - if (err == EAGAIN) { /* same as EWOULDBLOCK */ + kern_sigprocmask(td, SIG_SETMASK, &oldset, NULL, 0); + if (err == EWOULDBLOCK) { + SDT_PROBE2(fuse, , ipc, trace, 3, + "fticket_wait_answer: EWOULDBLOCK"); #ifdef XXXIP /* die conditionally */ if (!fdata_get_dead(data)) { fdata_set_dead(data); @@ -357,6 +416,45 @@ fticket_wait_answer(struct fuse_ticket *ftick) #endif err = ETIMEDOUT; fticket_set_answered(ftick); + } else if ((err == EINTR || err == ERESTART)) { + /* + * Whether we get EINTR or ERESTART depends on whether + * SA_RESTART was set by sigaction(2). + * + * Try to interrupt the operation and wait for an EINTR response + * to the original operation. If the file system does not + * support FUSE_INTERRUPT, then we'll just wait for it to + * complete like normal. If it does support FUSE_INTERRUPT, + * then it will either respond EINTR to the original operation, + * or EAGAIN to the interrupt. + */ + int sig; + + SDT_PROBE2(fuse, , ipc, trace, 4, + "fticket_wait_answer: interrupt"); + fuse_lck_mtx_unlock(ftick->tk_aw_mtx); + fuse_interrupt_send(ftick); + fuse_lck_mtx_lock(ftick->tk_aw_mtx); + + /* TODO: return, rather than retry, for fatal signals */ + + /* + * Block the just-delivered signal while we wait for an + * interrupt response + */ + PROC_LOCK(td->td_proc); + mtx_lock(&td->td_proc->p_sigacts->ps_mtx); + sig = cursig(td); + mtx_unlock(&td->td_proc->p_sigacts->ps_mtx); + PROC_UNLOCK(td->td_proc); + SIGADDSET(blockedset, sig); + kern_sigprocmask(curthread, SIG_BLOCK, &blockedset, NULL, 0); + goto retry; + } else if (err) { + SDT_PROBE2(fuse, , ipc, trace, 6, + "fticket_wait_answer: other error"); + } else { + SDT_PROBE2(fuse, , ipc, trace, 7, "fticket_wait_answer: OK"); } out: if (!(err || fticket_answered(ftick))) { @@ -762,10 +860,8 @@ fdisp_refresh_pid(struct fuse_dispatcher *fdip, enum fuse_opcode op, /* Initialize a dispatcher from a pid and node id */ static void fdisp_make_pid(struct fuse_dispatcher *fdip, enum fuse_opcode op, - struct mount *mp, uint64_t nid, pid_t pid, struct ucred *cred) + struct fuse_data *data, uint64_t nid, pid_t pid, struct ucred *cred) { - struct fuse_data *data = fuse_get_mpdata(mp); - if (fdip->tick) { fticket_refresh(fdip->tick); } else { @@ -783,17 +879,21 @@ void fdisp_make(struct fuse_dispatcher *fdip, enum fuse_opcode op, struct mount *mp, uint64_t nid, struct thread *td, struct ucred *cred) { + struct fuse_data *data = fuse_get_mpdata(mp); RECTIFY_TDCR(td, cred); - return fdisp_make_pid(fdip, op, mp, nid, td->td_proc->p_pid, cred); + return fdisp_make_pid(fdip, op, data, nid, td->td_proc->p_pid, cred); } void fdisp_make_vp(struct fuse_dispatcher *fdip, enum fuse_opcode op, struct vnode *vp, struct thread *td, struct ucred *cred) { + struct mount *mp = vnode_mount(vp); + struct fuse_data *data = fuse_get_mpdata(mp); + RECTIFY_TDCR(td, cred); - return fdisp_make_pid(fdip, op, vnode_mount(vp), VTOI(vp), + return fdisp_make_pid(fdip, op, data, VTOI(vp), td->td_proc->p_pid, cred); } diff --git a/sys/fs/fuse/fuse_ipc.h b/sys/fs/fuse/fuse_ipc.h index 863234acf549..3a2f9fb0c1e1 100644 --- a/sys/fs/fuse/fuse_ipc.h +++ b/sys/fs/fuse/fuse_ipc.h @@ -147,10 +147,16 @@ fticket_set_answered(struct fuse_ticket *ftick) ftick->tk_flag |= FT_ANSW; } +static inline struct fuse_in_header* +fticket_in_header(struct fuse_ticket *ftick) +{ + return (struct fuse_in_header *)(ftick->tk_ms_fiov.base); +} + static inline enum fuse_opcode fticket_opcode(struct fuse_ticket *ftick) { - return (((struct fuse_in_header *)(ftick->tk_ms_fiov.base))->opcode); + return fticket_in_header(ftick)->opcode; } int fticket_pull(struct fuse_ticket *ftick, struct uio *uio); @@ -174,6 +180,11 @@ struct fuse_data { struct mtx aw_mtx; TAILQ_HEAD(, fuse_ticket) aw_head; + /* + * Holds the next value of the FUSE operation unique value. + * Also, serves as a wakeup channel to prevent any operations from + * being created before INIT completes. + */ u_long ticketer; struct sx rename_lock; diff --git a/tests/sys/fs/fusefs/interrupt.cc b/tests/sys/fs/fusefs/interrupt.cc index 86135162587c..30ce446af69c 100644 --- a/tests/sys/fs/fusefs/interrupt.cc +++ b/tests/sys/fs/fusefs/interrupt.cc @@ -41,8 +41,10 @@ using namespace testing; /* Don't do anything; all we care about is that the syscall gets interrupted */ void sigusr2_handler(int __unused sig) { - if (verbosity > 1) - printf("Signaled!\n"); + if (verbosity > 1) { + printf("Signaled! thread %p\n", pthread_self()); + } + } void* killer(void* target) { @@ -52,8 +54,8 @@ void* killer(void* target) { */ usleep(250'000); if (verbosity > 1) - printf("Signalling!\n"); - pthread_kill(*(pthread_t*)target, SIGUSR2); + printf("Signalling! thread %p\n", target); + pthread_kill((pthread_t)target, SIGUSR2); return(NULL); } @@ -94,9 +96,14 @@ void setup_interruptor(pthread_t self) } void TearDown() { + struct sigaction sa; + if (m_child != NULL) { pthread_join(m_child, NULL); } + bzero(&sa, sizeof(sa)); + sa.sa_handler = SIG_DFL; + sigaction(SIGUSR2, &sa, NULL); FuseTest::TearDown(); } @@ -107,7 +114,7 @@ void TearDown() { * complete should generate an EAGAIN response. */ /* https://bugs.freebsd.org/bugzilla/show_bug.cgi?id=236530 */ -TEST_F(Interrupt, DISABLED_already_complete) +TEST_F(Interrupt, already_complete) { const char FULLPATH[] = "mountpoint/some_file.txt"; const char RELPATH[] = "some_file.txt"; @@ -122,7 +129,6 @@ TEST_F(Interrupt, DISABLED_already_complete) expect_lookup(RELPATH, ino); expect_open(ino, 0, 1); - expect_getattr(ino, 0); expect_write(ino, &write_unique); EXPECT_CALL(*m_mock, process( ResultOf([&](auto in) { @@ -150,7 +156,7 @@ TEST_F(Interrupt, DISABLED_already_complete) ASSERT_LE(0, fd) << strerror(errno); setup_interruptor(self); - ASSERT_EQ(bufsize, write(fd, CONTENTS, bufsize)) << strerror(errno); + EXPECT_EQ(bufsize, write(fd, CONTENTS, bufsize)) << strerror(errno); /* Deliberately leak fd. close(2) will be tested in release.cc */ } @@ -160,7 +166,7 @@ TEST_F(Interrupt, DISABLED_already_complete) * complete the original operation whenever it damn well pleases. */ /* https://bugs.freebsd.org/bugzilla/show_bug.cgi?id=236530 */ -TEST_F(Interrupt, DISABLED_ignore) +TEST_F(Interrupt, ignore) { const char FULLPATH[] = "mountpoint/some_file.txt"; const char RELPATH[] = "some_file.txt"; @@ -175,10 +181,9 @@ TEST_F(Interrupt, DISABLED_ignore) expect_lookup(RELPATH, ino); expect_open(ino, 0, 1); - expect_getattr(ino, 0); expect_write(ino, &write_unique); EXPECT_CALL(*m_mock, process( - ResultOf([=](auto in) { + ResultOf([&](auto in) { return (in->header.opcode == FUSE_INTERRUPT && in->body.interrupt.unique == write_unique); }, Eq(true)), @@ -208,7 +213,7 @@ TEST_F(Interrupt, DISABLED_ignore) * return EINTR to userspace */ /* https://bugs.freebsd.org/bugzilla/show_bug.cgi?id=236530 */ -TEST_F(Interrupt, DISABLED_in_progress) +TEST_F(Interrupt, in_progress) { const char FULLPATH[] = "mountpoint/some_file.txt"; const char RELPATH[] = "some_file.txt"; @@ -223,10 +228,9 @@ TEST_F(Interrupt, DISABLED_in_progress) expect_lookup(RELPATH, ino); expect_open(ino, 0, 1); - expect_getattr(ino, 0); expect_write(ino, &write_unique); EXPECT_CALL(*m_mock, process( - ResultOf([=](auto in) { + ResultOf([&](auto in) { return (in->header.opcode == FUSE_INTERRUPT && in->body.interrupt.unique == write_unique); }, Eq(true)), @@ -260,11 +264,12 @@ TEST_F(Interrupt, DISABLED_in_progress) * successfully interrupts the original */ /* https://bugs.freebsd.org/bugzilla/show_bug.cgi?id=236530 */ -TEST_F(Interrupt, DISABLED_too_soon) +TEST_F(Interrupt, too_soon) { const char FULLPATH[] = "mountpoint/some_file.txt"; const char RELPATH[] = "some_file.txt"; const char *CONTENTS = "abcdefgh"; + Sequence seq; uint64_t ino = 42; int fd; ssize_t bufsize = strlen(CONTENTS); @@ -275,25 +280,25 @@ TEST_F(Interrupt, DISABLED_too_soon) expect_lookup(RELPATH, ino); expect_open(ino, 0, 1); - expect_getattr(ino, 0); expect_write(ino, &write_unique); EXPECT_CALL(*m_mock, process( - ResultOf([=](auto in) { + ResultOf([&](auto in) { return (in->header.opcode == FUSE_INTERRUPT && in->body.interrupt.unique == write_unique); }, Eq(true)), _) - ).WillOnce(Invoke(ReturnErrno(EAGAIN))) - .RetiresOnSaturation(); + ).InSequence(seq) + .WillOnce(Invoke(ReturnErrno(EAGAIN))); EXPECT_CALL(*m_mock, process( - ResultOf([=](auto in) { + ResultOf([&](auto in) { return (in->header.opcode == FUSE_INTERRUPT && in->body.interrupt.unique == write_unique); }, Eq(true)), _) - ).WillOnce(Invoke([&](auto in __unused, auto &out __unused) { + ).InSequence(seq) + .WillOnce(Invoke([&](auto in __unused, auto &out __unused) { auto out0 = new mockfs_buf_out; out0->header.error = -EINTR; out0->header.unique = write_unique; @@ -310,3 +315,14 @@ TEST_F(Interrupt, DISABLED_too_soon) /* Deliberately leak fd. close(2) will be tested in release.cc */ } + + +// TODO: add a test that uses siginterrupt and an interruptible signal +// TODO: add a test that verifies a process can be cleanly killed even if a +// FUSE_WRITE command never returns. +// TODO: write in-progress tests for read and other operations +// TODO: add a test where write returns EWOULDBLOCK +// TODO: test that if a fatal signal is received, fticket_wait_answer will +// return without waiting for a response to the interrupted operation. +// TODO: test that operations that haven't been received by the server can be +// interrupted without generating a FUSE_INTERRUPT. diff --git a/tests/sys/fs/fusefs/mockfs.cc b/tests/sys/fs/fusefs/mockfs.cc index ecac3e1d5c78..c5c6e30366f2 100644 --- a/tests/sys/fs/fusefs/mockfs.cc +++ b/tests/sys/fs/fusefs/mockfs.cc @@ -184,6 +184,9 @@ void debug_fuseop(const mockfs_buf_in *in) case FUSE_FSYNCDIR: printf(" flags=%#x", in->body.fsyncdir.fsync_flags); break; + case FUSE_INTERRUPT: + printf(" unique=%lu", in->body.interrupt.unique); + break; case FUSE_LOOKUP: printf(" %s", in->body.lookup); break;