diff --git a/alpine/packages/transfused/transfused.c b/alpine/packages/transfused/transfused.c index 7853e441d..bf2e947ac 100644 --- a/alpine/packages/transfused/transfused.c +++ b/alpine/packages/transfused/transfused.c @@ -24,6 +24,7 @@ #include "transfused_log.h" #include "transfused_vsock.h" +#include "transfused_perfstat.h" char *default_fusermount = DEFAULT_FUSERMOUNT; char *default_socket = DEFAULT_SOCKET; @@ -201,7 +202,7 @@ void read_exactly(char *descr, int fd, void *p, size_t nbyte) } } -int read_message(char *descr, parameters *params, int fd, +int read_message(char *descr, parameters_t *params, int fd, char *buf, size_t max_read) { size_t nbyte = sizeof(uint32_t); @@ -233,7 +234,9 @@ void copy_into_fuse(copy_thread_state *copy_state) char *descr = copy_state->connection->mount_point; int read_count, write_count; void *buf; - parameters *params = copy_state->connection->params; + connection_t *conn = copy_state->connection; + parameters_t *params = conn->params; + uint64_t unique; buf = must_malloc(descr, IN_BUFSZ); @@ -250,6 +253,12 @@ void copy_into_fuse(copy_thread_state *copy_state) die(1, params, NULL, "copy %s: read %d but only wrote %d", descr, read_count, write_count); + + unique = *((uint64_t *)buf + 1); + if (perfstat_close(unique, conn)) + log_time(params, + "dropping perfstat for edge message %lld", + unique); } free(buf); @@ -263,7 +272,7 @@ void copy_notify_fuse(copy_thread_state *copy_state) int read_count, write_count; uint32_t zero = 0, err; void *buf; - parameters *params = copy_state->connection->params; + parameters_t *params = copy_state->connection->params; buf = must_malloc(descr, IN_BUFSZ); @@ -323,7 +332,9 @@ void copy_outof_fuse(copy_thread_state *copy_state) char *descr = copy_state->connection->mount_point; int read_count; void *buf; - parameters *params = copy_state->connection->params; + connection_t *conn = copy_state->connection; + parameters_t *params = conn->params; + uint64_t unique; buf = must_malloc(descr, OUT_BUFSZ); @@ -334,6 +345,12 @@ void copy_outof_fuse(copy_thread_state *copy_state) die(1, params, "", "copy %s: error reading: ", descr); write_exactly(descr, to, (char *)buf, read_count); + + unique = *((uint64_t *)buf + 1); + if (perfstat_open(unique, conn)) + die(1, params, NULL, + "copy %s: could not open perfstat for %d", + descr, unique); } free(buf); @@ -387,7 +404,7 @@ void *copy_clean_outof_fuse_thread(void *copy_state) return copy_clean_outof_fuse((copy_thread_state *) copy_state); } -int recv_fd(parameters *params, int sock) +int recv_fd(parameters_t *params, int sock) { int ret; int fd = -1; @@ -625,7 +642,7 @@ void mkdir_p(connection_t *conn, char *path) } } -int is_next_child_ok(parameters *params, char *path, DIR *dir) +int is_next_child_ok(parameters_t *params, char *path, DIR *dir) { struct dirent *child; @@ -641,7 +658,7 @@ int is_next_child_ok(parameters *params, char *path, DIR *dir) return 1; } -int is_path_mountable(parameters *params, int allow_empty, char *path) +int is_path_mountable(parameters_t *params, int allow_empty, char *path) { DIR *dir; @@ -737,9 +754,9 @@ void *mount_connection(connection_t *conn) return NULL; } -void *mount_thread(void *connection) +void *mount_thread(void *conn) { - return mount_connection((connection_t *) connection); + return mount_connection((connection_t *) conn); } void write_pid(connection_t *connection) @@ -758,7 +775,7 @@ void write_pid(connection_t *connection) free(pid_s); } -void pong(parameters *params) +void pong(parameters_t *params) { char pong_msg[6] = {'\6', '\0', '\0', '\0', PONG_REPLY, '\0'}; @@ -862,7 +879,7 @@ void *event_thread(void *connection_ptr) return NULL; } -void write_pidfile(parameters *params) +void write_pidfile(parameters_t *params) { int fd; pid_t pid = getpid(); @@ -894,7 +911,7 @@ void write_pidfile(parameters *params) } /* TODO: the message parsing here is rickety, do it properly */ -void *determine_mount_suitability(parameters *params, int allow_empty, +void *determine_mount_suitability(parameters_t *params, int allow_empty, char *req, int len) { void *buf = (void *)req; @@ -926,9 +943,41 @@ void *determine_mount_suitability(parameters *params, int allow_empty, return (void *)reply; } +void *error_reply(uint16_t id, const char *fmt, ...) +{ + char *reply; + char *message; + size_t mlen; + va_list args; + + va_start(args, fmt); + vasprintf(&message, fmt, args); + va_end(args); + + mlen = strlen(message); + reply = (char *)must_malloc("error_reply", 8 + mlen); + *((uint32_t *)reply) = 8 + mlen; + *((uint16_t *) (reply + 4)) = ERROR_REPLY; + *((uint16_t *) (reply + 6)) = id; + memcpy(reply + 8, message, mlen); + free(message); + + return (void *)reply; +} + +connection_t *find_connection(connection_t *conn, char *name, size_t len) +{ + while (conn) { + if (strncmp(name, conn->mount_point, len) == 0) + return conn; + conn = conn->next; + } + return NULL; +} + void *init_thread(void *params_ptr) { - parameters *params = params_ptr; + parameters_t *params = params_ptr; int read_count, len; char init_msg[6] = {'\6', '\0', '\0', '\0', '\0', '\0'}; void *buf, *response; @@ -951,7 +1000,7 @@ void *init_thread(void *params_ptr) while (1) { read_count = read_message("control", params, params->ctl_sock, - buf, CTL_BUFSZ); + buf, CTL_BUFSZ - 1); msg_type = *((uint16_t *)buf + 2); switch (msg_type) { case MOUNT_SUITABILITY_REQUEST: @@ -974,6 +1023,26 @@ void *init_thread(void *params_ptr) free(response); break; + case START_PERFSTAT_REQUEST: + ((char *)buf)[read_count] = '\0'; + response = start_perfstat(params, (char *)buf + 6, + read_count - 6); + len = *((size_t *) response); + write_exactly("init thread: start perfstat response", + params->ctl_sock, response, len); + free(response); + break; + + case STOP_PERFSTAT_REQUEST: + ((char *)buf)[read_count] = '\0'; + response = stop_perfstat(params, (char *)buf + 6, + read_count - 6); + len = *((size_t *) response); + write_exactly("init thread: stop perfstat response", + params->ctl_sock, response, len); + free(response); + break; + default: die(1, params, NULL, "init thread: unknown message %d", msg_type); @@ -998,7 +1067,7 @@ void setup_debug(void) die(1, NULL, "Couldn't set siginterrupt for SIGHUP", ""); } -void parse_parameters(int argc, char *argv[], parameters *params) +void parse_parameters(int argc, char *argv[], parameters_t *params) { int c; int errflg = 0; @@ -1011,6 +1080,7 @@ void parse_parameters(int argc, char *argv[], parameters *params) params->data_sock = 0; params->ctl_sock = 0; lock_init("ctl_lock", ¶ms->ctl_lock, NULL); + params->connections = NULL; while ((c = getopt(argc, argv, ":p:d:s:f:l:")) != -1) { switch (c) { @@ -1085,7 +1155,7 @@ void parse_parameters(int argc, char *argv[], parameters *params) } } -void serve(parameters *params) +void serve(parameters_t *params) { char subproto_selector; pthread_t child; @@ -1103,7 +1173,12 @@ void serve(parameters *params) conn = (connection_t *)must_malloc("connection state", sizeof(connection_t)); conn->params = params; + conn->next = params->connections; + params->connections = conn; conn->mount_point = ""; + conn->perfstat = 0; + conn->perfstats = NULL; + lock_init("perfstat lock",&conn->perfstat_lock,NULL); conn->sock = accept(params->data_sock, &conn->sa_client, &conn->socklen_client); @@ -1140,7 +1215,7 @@ void serve(parameters *params) int main(int argc, char *argv[]) { - parameters params; + parameters_t params; struct rlimit core_limit; core_limit.rlim_cur = RLIM_INFINITY; diff --git a/alpine/packages/transfused/transfused.h b/alpine/packages/transfused/transfused.h index ba979bf68..b1e66f96e 100644 --- a/alpine/packages/transfused/transfused.h +++ b/alpine/packages/transfused/transfused.h @@ -3,11 +3,14 @@ #include #include +#include "transfused_perfstat.h" #define IN_BUFSZ ((1 << 20) + 16) #define OUT_BUFSZ ((1 << 20) + 64) #define EVENT_BUFSZ 4096 #define CTL_BUFSZ 65536 +#define PERFSTATS_PER_SEGMENT 2730 /* (64k - 16) / 24 */ +#define MAX_PERFSTAT_CHECK 64 #define DEFAULT_FUSERMOUNT "/bin/fusermount" #define DEFAULT_SOCKET "v:_:1525" @@ -25,14 +28,21 @@ #define MOUNT_SUITABILITY_REQUEST 1 #define EXPORT_SUITABILITY_REQUEST 2 +#define START_PERFSTAT_REQUEST 3 +#define STOP_PERFSTAT_REQUEST 4 #define TRANSFUSE_LOG_ERROR 1 #define TRANSFUSE_LOG_NOTICE 2 #define PONG_REPLY 3 #define MOUNT_SUITABILITY_REPLY 4 #define TRANSFUSE_NOTIFY_CHANNEL 5 +#define PERFSTAT_REPLY 6 +#define ERROR_REPLY 7 -typedef struct { +struct parameters; +struct connection; + +struct parameters { char *server; char *socket; char *fusermount; @@ -42,16 +52,25 @@ typedef struct { int ctl_sock; int data_sock; pthread_mutex_t ctl_lock; -} parameters; + struct connection *connections; +}; -typedef struct { - parameters *params; +typedef struct parameters parameters_t; + +struct connection { + struct connection *next; + parameters_t *params; char *type_descr; char *mount_point; struct sockaddr sa_client; socklen_t socklen_client; int sock; -} connection_t; + int perfstat; + perfstats_t *perfstats; + pthread_mutex_t perfstat_lock; +}; + +typedef struct connection connection_t; pthread_attr_t detached; @@ -60,4 +79,8 @@ void lock(char *const descr, pthread_mutex_t *mutex); void unlock(char *const descr, pthread_mutex_t *mutex); void write_exactly(char *descr, int fd, void *buf, size_t nbyte); +void *error_reply(uint16_t id, const char *fmt, ...); + +connection_t *find_connection(connection_t *conn, char *name, size_t len); + #endif /* _TRANSFUSED_H_ */ diff --git a/alpine/packages/transfused/transfused_log.c b/alpine/packages/transfused/transfused_log.c index 287cfbaf9..3c464ae06 100644 --- a/alpine/packages/transfused/transfused_log.c +++ b/alpine/packages/transfused/transfused_log.c @@ -80,7 +80,7 @@ void log_sock_locked(int fd, uint16_t msg_type, const char *fmt, ...) va_end(args); } -void die(int exit_code, parameters *params, const char *parg, +void die(int exit_code, parameters_t *params, const char *parg, const char *fmt, ...) { va_list argp, targs; @@ -121,7 +121,7 @@ void die(int exit_code, parameters *params, const char *parg, unlock("die ctl_lock", ¶ms->ctl_lock); } -void vlog_locked(parameters *params, uint16_t msg_type, +void vlog_locked(parameters_t *params, uint16_t msg_type, const char *fmt, va_list args) { int rc; @@ -149,7 +149,7 @@ void vlog_locked(parameters *params, uint16_t msg_type, } } -void vlog_time_locked(parameters *params, uint16_t msg_type, +void vlog_time_locked(parameters_t *params, uint16_t msg_type, const char *fmt, va_list args) { int fd = params->logfile_fd; @@ -159,7 +159,7 @@ void vlog_time_locked(parameters *params, uint16_t msg_type, vlog_locked(params, msg_type, fmt, args); } -void log_time_locked(parameters *params, uint16_t msg_type, +void log_time_locked(parameters_t *params, uint16_t msg_type, const char *fmt, ...) { va_list args; @@ -169,7 +169,7 @@ void log_time_locked(parameters *params, uint16_t msg_type, va_end(args); } -void log_time(parameters *params, const char *fmt, ...) +void log_time(parameters_t *params, const char *fmt, ...) { va_list args; @@ -180,7 +180,7 @@ void log_time(parameters *params, const char *fmt, ...) va_end(args); } -void log_notice_time(parameters *params, const char *fmt, ...) +void log_notice_time(parameters_t *params, const char *fmt, ...) { va_list args; @@ -193,7 +193,7 @@ void log_notice_time(parameters *params, const char *fmt, ...) } typedef struct { - parameters *params; + parameters_t *params; char *msg; } log_thread_state; @@ -234,7 +234,7 @@ void thread_log_time(connection_t *conn, const char *fmt, ...) conn->type_descr, conn->mount_point); } -void log_continue_locked(parameters *params, const char *fmt, ...) +void log_continue_locked(parameters_t *params, const char *fmt, ...) { va_list args; @@ -243,7 +243,7 @@ void log_continue_locked(parameters *params, const char *fmt, ...) va_end(args); } -void log_continue(parameters *params, const char *fmt, ...) +void log_continue(parameters_t *params, const char *fmt, ...) { va_list args; diff --git a/alpine/packages/transfused/transfused_log.h b/alpine/packages/transfused/transfused_log.h index e2176a1e4..f500d2434 100644 --- a/alpine/packages/transfused/transfused_log.h +++ b/alpine/packages/transfused/transfused_log.h @@ -6,21 +6,21 @@ #include "transfused.h" -void die(int exit_code, parameters *params, const char *perror_arg, +void die(int exit_code, parameters_t *params, const char *perror_arg, const char *fmt, ...); -void vlog_locked(parameters *params, uint16_t msg_type, +void vlog_locked(parameters_t *params, uint16_t msg_type, const char *fmt, va_list args); -void vlog_time_locked(parameters *params, uint16_t msg_type, +void vlog_time_locked(parameters_t *params, uint16_t msg_type, const char *fmt, va_list args); -void log_time_locked(parameters *params, uint16_t msg_type, +void log_time_locked(parameters_t *params, uint16_t msg_type, const char *fmt, ...); -void log_time(parameters *params, const char *fmt, ...); -void log_notice_time(parameters *params, const char *fmt, ...); +void log_time(parameters_t *params, const char *fmt, ...); +void log_notice_time(parameters_t *params, const char *fmt, ...); void thread_log_time(connection_t *conn, const char *fmt, ...); -void log_continue_locked(parameters *params, const char *fmt, ...); -void log_continue(parameters *params, const char *fmt, ...); +void log_continue_locked(parameters_t *params, const char *fmt, ...); +void log_continue(parameters_t *params, const char *fmt, ...); #endif /* _TRANSFUSED_LOG_H_ */ diff --git a/alpine/packages/transfused/transfused_perfstat.c b/alpine/packages/transfused/transfused_perfstat.c new file mode 100644 index 000000000..972331e0e --- /dev/null +++ b/alpine/packages/transfused/transfused_perfstat.c @@ -0,0 +1,172 @@ +#include +#include +#include "transfused.h" +#include "transfused_log.h" + +uint64_t now(parameters_t *params) +{ + uint64_t ns_in_s = 1000000000; + struct timespec now; + if (clock_gettime(CLOCK_MONOTONIC, &now)) + die(1, params, "now", ""); + return (uint64_t)now.tv_sec * ns_in_s + (uint64_t)now.tv_nsec; +} + +size_t size_of_perfstats(perfstats_t *p) +{ + size_t len = 0; + + while (p) { + len += sizeof(perfstat_t) * p->len + sizeof(perfstats_t); + p = p->next; + } + + return len; +} + +int perfstat_open(uint64_t unique, connection_t *conn) +{ + size_t sz; + perfstats_t *old_perfstats = conn->perfstats; + perfstats_t *stats = conn->perfstats; + perfstat_t stat; + + if (!conn->perfstat) + return 0; + + lock("perfstat lock: perfstat_open", &conn->perfstat_lock); + if (conn->perfstat) { + if (!stats || stats->len >= PERFSTATS_PER_SEGMENT) { + sz = sizeof(perfstats_t); + sz += PERFSTATS_PER_SEGMENT * sizeof(perfstat_t); + stats = must_malloc("perfstats",sz); + stats->next = old_perfstats; + stats->len = 0; + conn->perfstats = stats; + } + + stat = (perfstat_t) { + .id = unique, + .start = now(conn->params), + .stop = 0 + }; + stats->perfstat[stats->len] = stat; + stats->len++; + } + unlock("perfstat unlock: perfstat_close", &conn->perfstat_lock); + + return 0; +} + +int perfstat_close_locked(uint64_t unique, parameters_t *params, + perfstats_t *perfstats, int to_check) +{ + int i; + perfstat_t *stat; + + if (!perfstats) + return 1; + + i = perfstats->len - 1; + while (i >= 0 && to_check > 0) { + stat = &perfstats->perfstat[i]; + if (stat->id == unique) { + stat->stop = now(params); + return 0; + } else { + i--; + to_check--; + } + } + + if (to_check && !perfstat_close_locked(unique, params, perfstats->next, + to_check)) + return 0; + return 1; +} + +int perfstat_close(uint64_t unique, connection_t *conn) +{ + int rc = 0; + pthread_mutex_t *perfstat_lock = &conn->perfstat_lock; + + if (!conn->perfstat) + return 0; + + lock("perfstat lock: perfstat_close", perfstat_lock); + if (conn->perfstat) + rc = perfstat_close_locked(unique, conn->params, + conn->perfstats, + MAX_PERFSTAT_CHECK); + unlock("perfstat unlock: perfstat_close", perfstat_lock); + + return rc; +} + +void *start_perfstat(parameters_t *params, char *req, size_t len) +{ + char *reply; + uint16_t id = *((uint16_t *) req); + char *mount = (char *) req + 2; + connection_t *conn = find_connection(params->connections, mount, + len - 2); + if (conn == NULL) + return (void *)error_reply(id, "Mount %s unknown", mount); + + lock("perfstat lock: start_perfstat", &conn->perfstat_lock); + conn->perfstat = 1; + unlock("perfstat lock: start_perfstat", &conn->perfstat_lock); + + reply = (char *)must_malloc("start_perfstat", 8); + *((uint32_t *)reply) = 16; + *((uint16_t *) (reply + 4)) = PERFSTAT_REPLY; + *((uint16_t *) (reply + 6)) = id; + *((uint64_t *) (reply + 8)) = now(params); + + return (void *)reply; +} + +void copy_and_free_perfstats(perfstats_t *p, char *buf) +{ + size_t len; + perfstats_t *p_next; + + while (p) { + p_next = p->next; + len = p->len * sizeof(perfstat_t); + memcpy(buf, p->perfstat, len); + buf += len; + free(p); + p = p_next; + } +} + +void *stop_perfstat(parameters_t *params, char *req, size_t len) +{ + char *reply; + uint16_t id = *((uint16_t *) req); + char *mount = (char *) req + 2; + connection_t *conn = find_connection(params->connections, mount, + len - 2); + if (conn == NULL) + return (void *)error_reply(id, "Mount %s unknown", mount); + + size_t out_len = 16; + + lock("perfstat lock: stop_perfstat", &conn->perfstat_lock); + conn->perfstat = 0; + + out_len += size_of_perfstats(conn->perfstats); + + reply = (char *)must_malloc("stop_perfstat", out_len); + *((uint32_t *)reply) = out_len; + *((uint16_t *) (reply + 4)) = PERFSTAT_REPLY; + *((uint16_t *) (reply + 6)) = id; + *((uint64_t *) (reply + 8)) = now(params); + + copy_and_free_perfstats(conn->perfstats, reply + 16); + + unlock("perfstat lock: stop_perfstat", &conn->perfstat_lock); + + return (void *)reply; +} diff --git a/alpine/packages/transfused/transfused_perfstat.h b/alpine/packages/transfused/transfused_perfstat.h new file mode 100644 index 000000000..33bb9047f --- /dev/null +++ b/alpine/packages/transfused/transfused_perfstat.h @@ -0,0 +1,29 @@ +#ifndef _TRANSFUSED_PERFSTAT_H_ +#define _TRANSFUSED_PERFSTAT_H_ + +#include + +struct connection; +struct parameters; + +typedef struct { + uint64_t id; + uint64_t start; + uint64_t stop; +} perfstat_t; + +struct perfstats { + uint32_t len; + uint32_t nothing; + struct perfstats *next; + perfstat_t perfstat[0]; +}; + +typedef struct perfstats perfstats_t; + +int perfstat_open(uint64_t unique, struct connection *conn); +int perfstat_close(uint64_t unique, struct connection *conn); +void *start_perfstat(struct parameters *params, char *req, size_t len); +void *stop_perfstat(struct parameters *params, char *req, size_t len); + +#endif /* _TRANSFUSED_PERFSTAT_H_ */