fusefs: improvements to interruptibility

* If a process receives a fatal signal while blocked on a fuse operation,
  return ASAP without waiting for the operation to complete.  But still send
  the FUSE_INTERRUPT op to the daemon.
* Plug memory leaks from r346339

Interruptibility is now fully functional, but it could be better:
* Operations that haven't been sent to the server yet should be aborted
  without sending FUSE_INTERRUPT.
* It would be great if write operations could be made restartable.
  That would require delaying uiomove until the last possible moment, which
  would be sometime during fuse_device_read.
* It would be nice if we didn't have to guess which EAGAIN responses were
  for FUSE_INTERRUPT operations.

PR:		236530
Sponsored by:	The FreeBSD Foundation
This commit is contained in:
Alan Somers 2019-04-18 19:16:34 +00:00
parent 723c776829
commit a154214620
Notes: svn2git 2020-12-20 02:59:44 +00:00
svn path=/projects/fuse2/; revision=346357
8 changed files with 254 additions and 69 deletions

View File

@ -360,7 +360,7 @@ fuse_device_write(struct cdev *dev, struct uio *uio, int ioflag)
struct fuse_out_header ohead;
int err = 0;
struct fuse_data *data;
struct fuse_ticket *tick, *x_tick;
struct fuse_ticket *tick, *itick, *x_tick;
int found = 0;
err = devfs_get_cdevpriv((void **)&data);
@ -401,6 +401,20 @@ fuse_device_write(struct cdev *dev, struct uio *uio, int ioflag)
break;
}
}
if (found && tick->irq_unique > 0) {
/*
* Discard the FUSE_INTERRUPT ticket that tried to interrupt
* this operation
*/
TAILQ_FOREACH_SAFE(itick, &data->aw_head, tk_aw_link,
x_tick) {
if (itick->tk_unique == tick->irq_unique) {
fuse_aw_remove(itick);
break;
}
}
tick->irq_unique = 0;
}
fuse_lck_mtx_unlock(data->aw_mtx);
if (found) {
@ -433,7 +447,20 @@ fuse_device_write(struct cdev *dev, struct uio *uio, int ioflag)
/* no callback at all! */
SDT_PROBE1(fuse, , device, fuse_device_write_missing_ticket,
ohead.unique);
err = EINVAL;
if (ohead.error == EAGAIN) {
/*
* This was probably a response to a FUSE_INTERRUPT
* operation whose original operation is already
* complete. We can't store FUSE_INTERRUPT tickets
* indefinitely because their responses are optional.
* So we delete them when the original operation
* completes. And sadly the fuse_header_out doesn't
* identify the opcode, so we have to guess.
*/
err = 0;
} else {
err = EINVAL;
}
}
return (err);

View File

@ -138,6 +138,32 @@ static uma_zone_t ticket_zone;
static int
fuse_interrupt_callback(struct fuse_ticket *tick, struct uio *uio)
{
struct fuse_ticket *otick, *x_tick;
struct fuse_interrupt_in *fii;
struct fuse_data *data;
data = tick->tk_data;
bool found = false;
fii = (struct fuse_interrupt_in*)((char*)tick->tk_ms_fiov.base +
sizeof(struct fuse_in_header));
fuse_lck_mtx_lock(data->aw_mtx);
TAILQ_FOREACH_SAFE(otick, &data->aw_head, tk_aw_link, x_tick) {
if (otick->tk_unique == fii->unique) {
found = true;
break;
}
}
fuse_lck_mtx_unlock(data->aw_mtx);
if (!found) {
/* Original is already complete. Just return */
return 0;
}
/* Clear the original ticket's interrupt association */
otick->irq_unique = 0;
if (tick->tk_aw_ohead.error == EAGAIN) {
/*
* There are two reasons we might get this:
@ -148,28 +174,8 @@ fuse_interrupt_callback(struct fuse_ticket *tick, struct uio *uio)
* In the first case we should re-send the INTERRUPT. In the
* second, we should ignore it.
*/
struct fuse_interrupt_in *fii;
struct fuse_data *data;
struct fuse_ticket *otick, *x_tick;
bool found = false;
data = tick->tk_data;
fii = (struct fuse_interrupt_in*)((char*)tick->tk_ms_fiov.base +
sizeof(struct fuse_in_header));
fuse_lck_mtx_lock(data->aw_mtx);
TAILQ_FOREACH_SAFE(otick, &data->aw_head, tk_aw_link, x_tick) {
if (otick->tk_unique == fii->unique) {
found = true;
break;
}
}
fuse_lck_mtx_unlock(data->aw_mtx);
if (found) {
/* Resend */
fuse_interrupt_send(otick);
} else {
/* Original is already complete; nothing to do */
}
/* Resend */
fuse_interrupt_send(otick);
return 0;
} else {
/* Illegal FUSE_INTERRUPT response */
@ -186,19 +192,24 @@ fuse_interrupt_send(struct fuse_ticket *otick)
struct fuse_data *data = otick->tk_data;
struct ucred reused_creds;
ftick_hdr = fticket_in_header(otick);
reused_creds.cr_uid = ftick_hdr->uid;
reused_creds.cr_rgid = ftick_hdr->gid;
fdisp_init(&fdi, sizeof(*fii));
fdisp_make_pid(&fdi, FUSE_INTERRUPT, data, ftick_hdr->nodeid,
ftick_hdr->pid, &reused_creds);
if (otick->irq_unique == 0) {
ftick_hdr = fticket_in_header(otick);
reused_creds.cr_uid = ftick_hdr->uid;
reused_creds.cr_rgid = ftick_hdr->gid;
fdisp_init(&fdi, sizeof(*fii));
fdisp_make_pid(&fdi, FUSE_INTERRUPT, data, ftick_hdr->nodeid,
ftick_hdr->pid, &reused_creds);
fii = fdi.indata;
fii->unique = otick->tk_unique;
fuse_insert_callback(fdi.tick, fuse_interrupt_callback);
fii = fdi.indata;
fii->unique = otick->tk_unique;
fuse_insert_callback(fdi.tick, fuse_interrupt_callback);
fuse_insert_message(fdi.tick);
fdisp_destroy(&fdi);
otick->irq_unique = fdi.tick->tk_unique;
fuse_insert_message(fdi.tick);
fdisp_destroy(&fdi);
} else {
/* This ticket has already been interrupted */
}
}
void
@ -278,6 +289,8 @@ fticket_ctor(void *mem, int size, void *arg, int flags)
if (ftick->tk_unique == 0)
ftick->tk_unique = atomic_fetchadd_long(&data->ticketer, 1);
ftick->irq_unique = 0;
refcount_init(&ftick->tk_refcount, 1);
atomic_add_acq_int(&fuse_ticket_count, 1);
@ -436,20 +449,23 @@ fticket_wait_answer(struct fuse_ticket *ftick)
fuse_interrupt_send(ftick);
fuse_lck_mtx_lock(ftick->tk_aw_mtx);
/* TODO: return, rather than retry, for fatal signals */
/*
* Block the just-delivered signal while we wait for an
* interrupt response
*/
PROC_LOCK(td->td_proc);
mtx_lock(&td->td_proc->p_sigacts->ps_mtx);
sig = cursig(td);
mtx_unlock(&td->td_proc->p_sigacts->ps_mtx);
PROC_UNLOCK(td->td_proc);
SIGADDSET(blockedset, sig);
kern_sigprocmask(curthread, SIG_BLOCK, &blockedset, NULL, 0);
goto retry;
if (!sig_isfatal(td->td_proc, sig)) {
/*
* Block the just-delivered signal while we wait for an
* interrupt response
*/
SIGADDSET(blockedset, sig);
kern_sigprocmask(curthread, SIG_BLOCK, &blockedset,
NULL, 0);
goto retry;
} else {
/* Return immediately for fatal signals */
}
} else if (err) {
SDT_PROBE2(fuse, , ipc, trace, 6,
"fticket_wait_answer: other error");

View File

@ -103,6 +103,12 @@ struct fuse_ticket {
struct fuse_data *tk_data;
int tk_flag;
u_int tk_refcount;
/*
* If this ticket's operation has been interrupted, this will hold the
* unique value of the FUSE_INTERRUPT operation. Otherwise, it will be
* 0.
*/
uint64_t irq_unique;
/* fields for initiating an upgoing message */
struct fuse_iov tk_ms_fiov;

View File

@ -36,6 +36,7 @@
extern "C" {
#include <sys/types.h>
#include <sys/extattr.h>
#include <sys/wait.h>
#include <fcntl.h>
#include <unistd.h>
}
@ -74,7 +75,9 @@ virtual void SetUp() {
TEST_F(AllowOther, allowed)
{
fork(true, [&] {
int status;
fork(true, &status, [&] {
uint64_t ino = 42;
expect_lookup(RELPATH, ino, S_IFREG | 0644, 0, 1);
@ -92,6 +95,7 @@ TEST_F(AllowOther, allowed)
return 0;
}
);
ASSERT_EQ(0, WEXITSTATUS(status));
}
/*
@ -104,12 +108,12 @@ TEST_F(AllowOther, privilege_escalation)
{
const static char FULLPATH[] = "mountpoint/some_file.txt";
const static char RELPATH[] = "some_file.txt";
int fd1;
int fd1, status;
const static uint64_t ino = 42;
const static uint64_t fh = 100;
/* Fork a child to open the file with different credentials */
fork(true, [&] {
fork(true, &status, [&] {
expect_lookup(RELPATH, ino, S_IFREG | 0600, 0, 2);
EXPECT_CALL(*m_mock, process(
@ -156,12 +160,15 @@ TEST_F(AllowOther, privilege_escalation)
return 0;
}
);
ASSERT_EQ(0, WEXITSTATUS(status));
/* Deliberately leak fd1. close(2) will be tested in release.cc */
}
TEST_F(NoAllowOther, disallowed)
{
fork(true, [] {
int status;
fork(true, &status, [] {
}, []() {
int fd;
@ -177,6 +184,7 @@ TEST_F(NoAllowOther, disallowed)
return 0;
}
);
ASSERT_EQ(0, WEXITSTATUS(status));
}
/*
@ -191,7 +199,7 @@ TEST_F(NoAllowOther, disallowed_beneath_root)
const static char RELPATH2[] = "other_dir";
const static uint64_t ino = 42;
const static uint64_t ino2 = 43;
int dfd;
int dfd, status;
expect_lookup(RELPATH, ino, S_IFDIR | 0755, 0, 1);
EXPECT_LOOKUP(ino, RELPATH2)
@ -206,7 +214,7 @@ TEST_F(NoAllowOther, disallowed_beneath_root)
dfd = open(FULLPATH, O_DIRECTORY);
ASSERT_LE(0, dfd) << strerror(errno);
fork(true, [] {
fork(true, &status, [] {
}, [&]() {
int fd;
@ -222,6 +230,7 @@ TEST_F(NoAllowOther, disallowed_beneath_root)
return 0;
}
);
ASSERT_EQ(0, WEXITSTATUS(status));
}
/*
@ -230,9 +239,9 @@ TEST_F(NoAllowOther, disallowed_beneath_root)
*/
TEST_F(NoAllowOther, setextattr)
{
int ino = 42;
int ino = 42, status;
fork(true, [&] {
fork(true, &status, [&] {
EXPECT_LOOKUP(1, RELPATH)
.WillOnce(Invoke(
ReturnImmediate([=](auto in __unused, auto out) {
@ -268,4 +277,5 @@ TEST_F(NoAllowOther, setextattr)
return 0;
}
);
ASSERT_EQ(0, WEXITSTATUS(status));
}

View File

@ -29,6 +29,7 @@
*/
extern "C" {
#include <sys/wait.h>
#include <fcntl.h>
#include <pthread.h>
#include <signal.h>
@ -68,7 +69,24 @@ Interrupt(): m_child(NULL) {};
void expect_lookup(const char *relpath, uint64_t ino)
{
FuseTest::expect_lookup(relpath, ino, S_IFREG | 0644, 0, 1);
FuseTest::expect_lookup(relpath, ino, S_IFREG | 0644, 1000, 1);
}
/*
* Expect a FUSE_READ but don't reply. Instead, just record the unique value
* to the provided pointer
*/
void expect_read(uint64_t ino, uint64_t *read_unique)
{
EXPECT_CALL(*m_mock, process(
ResultOf([=](auto in) {
return (in->header.opcode == FUSE_READ &&
in->header.nodeid == ino);
}, Eq(true)),
_)
).WillOnce(Invoke([=](auto in, auto &out __unused) {
*read_unique = in->header.unique;
}));
}
/*
@ -90,7 +108,7 @@ void expect_write(uint64_t ino, uint64_t *write_unique)
void setup_interruptor(pthread_t self)
{
ASSERT_EQ(0, signal(SIGUSR2, sigusr2_handler)) << strerror(errno);
ASSERT_NE(SIG_ERR, signal(SIGUSR2, sigusr2_handler)) << strerror(errno);
ASSERT_EQ(0, pthread_create(&m_child, NULL, killer, (void*)self))
<< strerror(errno);
}
@ -161,6 +179,74 @@ TEST_F(Interrupt, already_complete)
/* Deliberately leak fd. close(2) will be tested in release.cc */
}
/*
* Upon receipt of a fatal signal, fusefs should return ASAP after sending
* FUSE_INTERRUPT.
*/
TEST_F(Interrupt, fatal_signal)
{
const char FULLPATH[] = "mountpoint/some_file.txt";
const char *CONTENTS = "abcdefgh";
const char RELPATH[] = "some_file.txt";
ssize_t bufsize = strlen(CONTENTS);
uint64_t ino = 42;
int status;
pthread_t self;
uint64_t write_unique;
self = pthread_self();
expect_lookup(RELPATH, ino);
expect_open(ino, 0, 1);
expect_write(ino, &write_unique);
EXPECT_CALL(*m_mock, process(
ResultOf([&](auto in) {
return (in->header.opcode == FUSE_INTERRUPT &&
in->body.interrupt.unique == write_unique);
}, Eq(true)),
_)
).WillOnce(Invoke([&](auto in __unused, auto &out __unused) {
/* Don't respond. The process should exit anyway */
}));
expect_flush(ino, 1, ReturnErrno(0));
expect_release(ino, FH);
fork(false, &status, [&] {
}, [&]() {
struct sigaction sa;
int fd, r;
pthread_t killer_th;
pthread_t self;
fd = open(FULLPATH, O_WRONLY);
if (fd < 0) {
perror("open");
return 1;
}
/* SIGUSR2 terminates the process by default */
bzero(&sa, sizeof(sa));
sa.sa_handler = SIG_DFL;
r = sigaction(SIGUSR2, &sa, NULL);
if (r != 0) {
perror("sigaction");
return 1;
}
self = pthread_self();
r = pthread_create(&killer_th, NULL, killer, (void*)self);
if (r != 0) {
perror("pthread_create");
return 1;
}
write(fd, CONTENTS, bufsize);
return 1;
});
ASSERT_EQ(SIGUSR2, WTERMSIG(status));
/* Deliberately leak fd. close(2) will be tested in release.cc */
}
/*
* A FUSE filesystem is legally allowed to ignore INTERRUPT operations, and
* complete the original operation whenever it damn well pleases.
@ -253,6 +339,47 @@ TEST_F(Interrupt, in_progress)
/* Deliberately leak fd. close(2) will be tested in release.cc */
}
/* Reads should also be interruptible */
TEST_F(Interrupt, in_progress_read)
{
const char FULLPATH[] = "mountpoint/some_file.txt";
const char RELPATH[] = "some_file.txt";
const size_t bufsize = 80;
char buf[bufsize];
uint64_t ino = 42;
int fd;
pthread_t self;
uint64_t read_unique;
self = pthread_self();
expect_lookup(RELPATH, ino);
expect_open(ino, 0, 1);
expect_read(ino, &read_unique);
EXPECT_CALL(*m_mock, process(
ResultOf([&](auto in) {
return (in->header.opcode == FUSE_INTERRUPT &&
in->body.interrupt.unique == read_unique);
}, Eq(true)),
_)
).WillOnce(Invoke([&](auto in __unused, auto &out) {
auto out0 = new mockfs_buf_out;
out0->header.error = -EINTR;
out0->header.unique = read_unique;
out0->header.len = sizeof(out0->header);
out.push_back(out0);
}));
fd = open(FULLPATH, O_RDONLY);
ASSERT_LE(0, fd) << strerror(errno);
setup_interruptor(self);
ASSERT_EQ(-1, read(fd, buf, bufsize));
EXPECT_EQ(EINTR, errno);
/* Deliberately leak fd. close(2) will be tested in release.cc */
}
/*
* If the FUSE filesystem receives the FUSE_INTERRUPT operation before
* processing the original, then it should wait for "some timeout" for the
@ -320,9 +447,7 @@ TEST_F(Interrupt, too_soon)
// TODO: add a test that uses siginterrupt and an interruptible signal
// TODO: add a test that verifies a process can be cleanly killed even if a
// FUSE_WRITE command never returns.
// TODO: write in-progress tests for read and other operations
// TODO: write in-progress tests for other operations
// TODO: add a test where write returns EWOULDBLOCK
// TODO: test that if a fatal signal is received, fticket_wait_answer will
// return without waiting for a response to the interrupted operation.
// TODO: test that operations that haven't been received by the server can be
// interrupted without generating a FUSE_INTERRUPT.

View File

@ -29,6 +29,7 @@
*/
extern "C" {
#include <sys/wait.h>
#include <fcntl.h>
}
@ -166,12 +167,12 @@ TEST_F(Open, multiple_creds)
{
const static char FULLPATH[] = "mountpoint/some_file.txt";
const static char RELPATH[] = "some_file.txt";
int fd1;
int fd1, status;
const static uint64_t ino = 42;
const static uint64_t fh0 = 100, fh1 = 200;
/* Fork a child to open the file with different credentials */
fork(false, [&] {
fork(false, &status, [&] {
expect_lookup(RELPATH, ino, S_IFREG | 0644, 0, 2);
EXPECT_CALL(*m_mock, process(
@ -218,6 +219,7 @@ TEST_F(Open, multiple_creds)
return 0;
}
);
ASSERT_EQ(0, WEXITSTATUS(status));
close(fd1);
}

View File

@ -336,7 +336,8 @@ get_unprivileged_uid(uid_t *uid)
}
void
FuseTest::fork(bool drop_privs, std::function<void()> parent_func,
FuseTest::fork(bool drop_privs, int *child_status,
std::function<void()> parent_func,
std::function<int()> child_func)
{
sem_t *sem;
@ -376,8 +377,6 @@ out:
sem_destroy(sem);
_exit(err);
} else if (child > 0) {
int child_status;
/*
* In parent. Cleanup must happen here, because it's still
* privileged.
@ -388,12 +387,12 @@ out:
/* Signal the child process to go */
ASSERT_EQ(0, sem_post(sem)) << strerror(errno);
ASSERT_LE(0, wait(&child_status)) << strerror(errno);
ASSERT_EQ(0, WEXITSTATUS(child_status));
ASSERT_LE(0, wait(child_status)) << strerror(errno);
} else {
FAIL() << strerror(errno);
}
munmap(sem, sizeof(*sem));
return;
}
static void usage(char* progname) {

View File

@ -106,7 +106,7 @@ class FuseTest : public ::testing::Test {
uid_t uid = 0);
/*
* Create an expectation that FUSE_GETATTR will be called for the given
* Create an expectation that FUSE_OPEN will be called for the given
* inode exactly times times. It will return with open_flags flags and
* file handle FH.
*/
@ -162,11 +162,11 @@ class FuseTest : public ::testing::Test {
*
* # Returns
*
* fusetest_fork will FAIL the test if child_func returns nonzero.
* It may SKIP the test, which the caller should detect with the
* IsSkipped() method.
* fusetest_fork may SKIP the test, which the caller should detect with
* the IsSkipped() method. If not, then the child's exit status will
* be returned in status.
*/
void fork(bool drop_privs,
void fork(bool drop_privs, int *status,
std::function<void()> parent_func,
std::function<int()> child_func);
};