transfused: add separate FUSE notify channel to avoid deadlock

This adds a new notify channel to the transfuse protocol. The channel is
not optional yet but could be made optional. A notify protocol and
notify channel are required because writing FUSE response messages to
the FUSE device has different semantics from writing asynchronous
notifications. In particular, response message writes only error on
malformed messages, do not take locks, and do not block. In contrast,
asynchronous notifications can error under normal conditions
(e.g. invalidating a cache entry that doesn't exist), can take locks
during the write call, and can block.

If responses and notifications occur in the same thread, the file system
can become deadlocked when syscalls lock resources waiting for a
response and a notification is written that blocks attempting to acquire
those same locks. The response that would unlock the contended lock
could be queued behind the notification write but the notification write
can't unblock until the response is written in the future. This patch
enables file systems to avoid that fate by offering a secondary channel
on which to send notifications.

Signed-off-by: David Sheets <dsheets@docker.com>
This commit is contained in:
David Sheets 2016-07-21 21:20:08 +01:00
parent 19f04c7c6d
commit 821b329f7b
2 changed files with 84 additions and 0 deletions

View File

@ -226,6 +226,43 @@ void copy_into_fuse(copy_thread_state * copy_state) {
free(buf);
}
/*
 * Relay asynchronous FUSE notifications from the notify socket to the FUSE
 * device and acknowledge each one back on the socket.
 *
 * For every message read from copy_state->from, the message is written to
 * copy_state->to. A 4-byte status word (host byte order) is then written
 * back on copy_state->from: the errno of the failed write, or zero on
 * success. A failed notification write is reported to the peer and the loop
 * carries on; a failed acknowledgement write or a short notification write
 * is handed to die().
 *
 * The loop has no exit condition, so the trailing free() is only reached if
 * one is added later.
 */
void copy_notify_fuse(copy_thread_state * copy_state) {
  int from = copy_state->from;
  int to = copy_state->to;
  char * descr = copy_state->connection->mount_point;
  int read_count, write_count;
  uint32_t zero = 0, err;
  void * buf;
  parameters * params = copy_state->connection->params;

  buf = must_malloc(descr, IN_BUFSZ);

  while (1) {
    read_count = read_message(descr, params, from, (char *) buf, IN_BUFSZ);
    write_count = write(to, buf, read_count);

    if (write_count < 0) {
      /* Capture errno before the next syscall can overwrite it. */
      err = errno;
      write_count = write(from, &err, 4);
      if (write_count < 0) {
        /* Both conversions need an argument: the mount point, then the
           error text of the notification write that failed first. */
        log_time(params, "copy notify %s write error: %s",
                 descr, strerror(err));
        die(1, params, "", "copy notify %s reply write error: ",
            descr);
      }
      /* The peer has been told about the failure; take the next message. */
      continue;
    } else {
      if (write(from, &zero, 4) < 0)
        die(1, params, "", "copy notify %s reply write error: ",
            descr);
    }

    /* A short write to the FUSE device is treated as fatal. */
    if (write_count != read_count)
      die(1, params, NULL, "copy notify %s: read %d but only wrote %d",
          descr, read_count, write_count);
  }

  free(buf);
}
void write_exactly(char * descr, int fd, void * p, size_t nbyte) {
int write_count;
char * buf = p;
@ -276,6 +313,20 @@ void * copy_clean_into_fuse_thread(void * copy_state) {
return (copy_clean_into_fuse((copy_thread_state *) copy_state));
}
/*
 * Run the notify copy loop for copy_state and, once it returns, close the
 * notify socket (copy_state->from) and free the state structure.
 * Always returns NULL.
 */
void * copy_clean_notify_fuse(copy_thread_state * copy_state) {
  int notify_fd;

  copy_notify_fuse(copy_state);

  /* Read the descriptor only after the loop is done, then tear down. */
  notify_fd = copy_state->from;
  close(notify_fd);
  free(copy_state);

  return NULL;
}
void * copy_clean_notify_fuse_thread(void * copy_state) {
return (copy_clean_notify_fuse((copy_thread_state *) copy_state));
}
void * copy_clean_outof_fuse(copy_thread_state * copy_state) {
copy_outof_fuse(copy_state);
@ -436,6 +487,37 @@ void start_writer(connection_t * connection, int fuse) {
connection->mount_point);
}
/*
 * Announce on notify_sock that this connection is the notify channel for
 * mount_point.
 *
 * The message is a 6-byte header followed by the mount point bytes (no
 * terminating NUL):
 *   bytes 0-3: uint32_t total message length (header + mount point)
 *   bytes 4-5: uint16_t message type, TRANSFUSE_NOTIFY_CHANNEL
 * Both fields are written in host byte order.
 */
void negotiate_notify_channel(char * mount_point, int notify_sock) {
  size_t len = strlen(mount_point);
  char hdr[6];
  uint32_t msg_len = (uint32_t) (sizeof(hdr) + len);
  uint16_t msg_type = TRANSFUSE_NOTIFY_CHANNEL;

  /* memcpy instead of storing through casted pointers: hdr is a char array
     with no alignment guarantee, and such stores break strict aliasing. */
  memcpy(hdr, &msg_len, sizeof(msg_len));
  memcpy(hdr + sizeof(msg_len), &msg_type, sizeof(msg_type));

  write_exactly("negotiate_notify_channel hdr", notify_sock, hdr, sizeof(hdr));
  write_exactly("negotiate_notify_channel mnt", notify_sock, mount_point, len);
}
/*
 * Open a second connection to the server for FUSE notifications and start a
 * thread that copies notifications from that connection to the FUSE
 * descriptor `fuse`.
 *
 * The thread is created with the `detached` attributes and is handed the
 * copy state allocated here, which copy_clean_notify_fuse frees when the
 * thread finishes. If thread creation fails, die() is called.
 */
void start_notify(connection_t * connection, int fuse) {
  pthread_t tid;
  copy_thread_state * state;

  state = (copy_thread_state *) must_malloc("start_notify copy_state",
                                            sizeof(copy_thread_state));
  state->connection = connection;
  state->to = fuse;
  state->from = connect_socket(connection->params->server);

  /* Identify the new socket as the notify channel before any traffic. */
  negotiate_notify_channel(connection->mount_point, state->from);

  /* pthread_create returns the error number rather than setting errno;
     store it there so die() can report it. */
  errno = pthread_create(&tid, &detached,
                         copy_clean_notify_fuse_thread, state);
  if (errno != 0) {
    die(1, connection->params, "",
        "Couldn't create notify copy thread for mount %s: ",
        connection->mount_point);
  }
}
char * alloc_dirname(connection_t * conn, char * path) {
size_t len = strlen(path) + 1;
char * input = must_malloc("alloc_dirname input", len);
@ -545,6 +627,7 @@ void * mount_connection(connection_t * conn) {
start_reader(conn, fuse);
start_writer(conn, fuse);
start_notify(conn, fuse);
lock("copy lock", &copy_lock);
while (!should_halt)

View File

@ -58,3 +58,4 @@ void write_exactly(char * descr, int fd, void * buf, size_t nbyte);
#define TRANSFUSE_LOG_NOTICE 2
#define PONG_REPLY 3
#define MOUNT_SUITABILITY_REPLY 4
/* Message type written in the header that negotiate_notify_channel sends
   to mark a connection as the notify channel for a mount point. */
#define TRANSFUSE_NOTIFY_CHANNEL 5