diff --git a/alpine/packages/transfused/transfused.c b/alpine/packages/transfused/transfused.c index 11275ab71..bf2e947ac 100644 --- a/alpine/packages/transfused/transfused.c +++ b/alpine/packages/transfused/transfused.c @@ -24,6 +24,7 @@ #include "transfused_log.h" #include "transfused_vsock.h" +#include "transfused_perfstat.h" char *default_fusermount = DEFAULT_FUSERMOUNT; char *default_socket = DEFAULT_SOCKET; @@ -235,6 +236,7 @@ void copy_into_fuse(copy_thread_state *copy_state) void *buf; connection_t *conn = copy_state->connection; parameters_t *params = conn->params; + uint64_t unique; buf = must_malloc(descr, IN_BUFSZ); @@ -251,6 +253,12 @@ void copy_into_fuse(copy_thread_state *copy_state) die(1, params, NULL, "copy %s: read %d but only wrote %d", descr, read_count, write_count); + + unique = *((uint64_t *)buf + 1); + if (perfstat_close(unique, conn)) + log_time(params, + "dropping perfstat for edge message %lld", + unique); } free(buf); @@ -326,6 +334,7 @@ void copy_outof_fuse(copy_thread_state *copy_state) void *buf; connection_t *conn = copy_state->connection; parameters_t *params = conn->params; + uint64_t unique; buf = must_malloc(descr, OUT_BUFSZ); @@ -336,6 +345,12 @@ void copy_outof_fuse(copy_thread_state *copy_state) die(1, params, "", "copy %s: error reading: ", descr); write_exactly(descr, to, (char *)buf, read_count); + + unique = *((uint64_t *)buf + 1); + if (perfstat_open(unique, conn)) + die(1, params, NULL, + "copy %s: could not open perfstat for %d", + descr, unique); } free(buf); @@ -928,6 +943,38 @@ void *determine_mount_suitability(parameters_t *params, int allow_empty, return (void *)reply; } +void *error_reply(uint16_t id, const char *fmt, ...) +{ + char *reply; + char *message; + size_t mlen; + va_list args; + + va_start(args, fmt); + vasprintf(&message, fmt, args); + va_end(args); + + mlen = strlen(message); + reply = (char *)must_malloc("error_reply", 8 + mlen); + *((uint32_t *)reply) = 8 + mlen; + *((uint16_t *) (reply + 4)) = ERROR_REPLY; + *((uint16_t *) (reply + 6)) = id; + memcpy(reply + 8, message, mlen); + free(message); + + return (void *)reply; +} + +connection_t *find_connection(connection_t *conn, char *name, size_t len) +{ + while (conn) { + if (strncmp(name, conn->mount_point, len) == 0) + return conn; + conn = conn->next; + } + return NULL; +} + void *init_thread(void *params_ptr) { parameters_t *params = params_ptr; @@ -953,7 +1000,7 @@ void *init_thread(void *params_ptr) while (1) { read_count = read_message("control", params, params->ctl_sock, - buf, CTL_BUFSZ); + buf, CTL_BUFSZ - 1); msg_type = *((uint16_t *)buf + 2); switch (msg_type) { case MOUNT_SUITABILITY_REQUEST: @@ -976,6 +1023,26 @@ void *init_thread(void *params_ptr) free(response); break; + case START_PERFSTAT_REQUEST: + ((char *)buf)[read_count] = '\0'; + response = start_perfstat(params, (char *)buf + 6, + read_count - 6); + len = *((size_t *) response); + write_exactly("init thread: start perfstat response", + params->ctl_sock, response, len); + free(response); + break; + + case STOP_PERFSTAT_REQUEST: + ((char *)buf)[read_count] = '\0'; + response = stop_perfstat(params, (char *)buf + 6, + read_count - 6); + len = *((size_t *) response); + write_exactly("init thread: stop perfstat response", + params->ctl_sock, response, len); + free(response); + break; + default: die(1, params, NULL, "init thread: unknown message %d", msg_type); @@ -1109,6 +1176,9 @@ void serve(parameters_t *params) conn->next = params->connections; params->connections = conn; conn->mount_point = ""; + conn->perfstat = 0; + conn->perfstats = NULL; + lock_init("perfstat lock",&conn->perfstat_lock,NULL); conn->sock = accept(params->data_sock, &conn->sa_client, &conn->socklen_client); diff --git a/alpine/packages/transfused/transfused.h b/alpine/packages/transfused/transfused.h index 65535afba..b1e66f96e 100644 --- a/alpine/packages/transfused/transfused.h +++ b/alpine/packages/transfused/transfused.h @@ -3,11 +3,14 @@ #include #include +#include "transfused_perfstat.h" #define IN_BUFSZ ((1 << 20) + 16) #define OUT_BUFSZ ((1 << 20) + 64) #define EVENT_BUFSZ 4096 #define CTL_BUFSZ 65536 +#define PERFSTATS_PER_SEGMENT 2730 /* (64k - 16) / 24 */ +#define MAX_PERFSTAT_CHECK 64 #define DEFAULT_FUSERMOUNT "/bin/fusermount" #define DEFAULT_SOCKET "v:_:1525" @@ -25,16 +28,21 @@ #define MOUNT_SUITABILITY_REQUEST 1 #define EXPORT_SUITABILITY_REQUEST 2 +#define START_PERFSTAT_REQUEST 3 +#define STOP_PERFSTAT_REQUEST 4 #define TRANSFUSE_LOG_ERROR 1 #define TRANSFUSE_LOG_NOTICE 2 #define PONG_REPLY 3 #define MOUNT_SUITABILITY_REPLY 4 #define TRANSFUSE_NOTIFY_CHANNEL 5 +#define PERFSTAT_REPLY 6 +#define ERROR_REPLY 7 +struct parameters; struct connection; -typedef struct { +struct parameters { char *server; char *socket; char *fusermount; @@ -45,7 +53,9 @@ typedef struct { int data_sock; pthread_mutex_t ctl_lock; struct connection *connections; -} parameters_t; +}; + +typedef struct parameters parameters_t; struct connection { struct connection *next; @@ -55,6 +65,9 @@ struct connection { struct sockaddr sa_client; socklen_t socklen_client; int sock; + int perfstat; + perfstats_t *perfstats; + pthread_mutex_t perfstat_lock; }; typedef struct connection connection_t; @@ -66,4 +79,8 @@ void lock(char *const descr, pthread_mutex_t *mutex); void unlock(char *const descr, pthread_mutex_t *mutex); void write_exactly(char *descr, int fd, void *buf, size_t nbyte); +void *error_reply(uint16_t id, const char *fmt, ...); + +connection_t *find_connection(connection_t *conn, char *name, size_t len); + #endif /* _TRANSFUSED_H_ */ diff --git a/alpine/packages/transfused/transfused_perfstat.c b/alpine/packages/transfused/transfused_perfstat.c new file mode 100644 index 000000000..972331e0e --- /dev/null +++ b/alpine/packages/transfused/transfused_perfstat.c @@ -0,0 +1,172 @@ +#include +#include +#include "transfused.h" +#include "transfused_log.h" + +uint64_t now(parameters_t *params) +{ + uint64_t ns_in_s = 1000000000; + struct timespec now; + if (clock_gettime(CLOCK_MONOTONIC, &now)) + die(1, params, "now", ""); + return (uint64_t)now.tv_sec * ns_in_s + (uint64_t)now.tv_nsec; +} + +size_t size_of_perfstats(perfstats_t *p) +{ + size_t len = 0; + + while (p) { + len += sizeof(perfstat_t) * p->len + sizeof(perfstats_t); + p = p->next; + } + + return len; +} + +int perfstat_open(uint64_t unique, connection_t *conn) +{ + size_t sz; + perfstats_t *old_perfstats = conn->perfstats; + perfstats_t *stats = conn->perfstats; + perfstat_t stat; + + if (!conn->perfstat) + return 0; + + lock("perfstat lock: perfstat_open", &conn->perfstat_lock); + if (conn->perfstat) { + if (!stats || stats->len >= PERFSTATS_PER_SEGMENT) { + sz = sizeof(perfstats_t); + sz += PERFSTATS_PER_SEGMENT * sizeof(perfstat_t); + stats = must_malloc("perfstats",sz); + stats->next = old_perfstats; + stats->len = 0; + conn->perfstats = stats; + } + + stat = (perfstat_t) { + .id = unique, + .start = now(conn->params), + .stop = 0 + }; + stats->perfstat[stats->len] = stat; + stats->len++; + } + unlock("perfstat unlock: perfstat_close", &conn->perfstat_lock); + + return 0; +} + +int perfstat_close_locked(uint64_t unique, parameters_t *params, + perfstats_t *perfstats, int to_check) +{ + int i; + perfstat_t *stat; + + if (!perfstats) + return 1; + + i = perfstats->len - 1; + while (i >= 0 && to_check > 0) { + stat = &perfstats->perfstat[i]; + if (stat->id == unique) { + stat->stop = now(params); + return 0; + } else { + i--; + to_check--; + } + } + + if (to_check && !perfstat_close_locked(unique, params, perfstats->next, + to_check)) + return 0; + return 1; +} + +int perfstat_close(uint64_t unique, connection_t *conn) +{ + int rc = 0; + pthread_mutex_t *perfstat_lock = &conn->perfstat_lock; + + if (!conn->perfstat) + return 0; + + lock("perfstat lock: perfstat_close", perfstat_lock); + if (conn->perfstat) + rc = perfstat_close_locked(unique, conn->params, + conn->perfstats, + MAX_PERFSTAT_CHECK); + unlock("perfstat unlock: perfstat_close", perfstat_lock); + + return rc; +} + +void *start_perfstat(parameters_t *params, char *req, size_t len) +{ + char *reply; + uint16_t id = *((uint16_t *) req); + char *mount = (char *) req + 2; + connection_t *conn = find_connection(params->connections, mount, + len - 2); + if (conn == NULL) + return (void *)error_reply(id, "Mount %s unknown", mount); + + lock("perfstat lock: start_perfstat", &conn->perfstat_lock); + conn->perfstat = 1; + unlock("perfstat lock: start_perfstat", &conn->perfstat_lock); + + reply = (char *)must_malloc("start_perfstat", 8); + *((uint32_t *)reply) = 16; + *((uint16_t *) (reply + 4)) = PERFSTAT_REPLY; + *((uint16_t *) (reply + 6)) = id; + *((uint64_t *) (reply + 8)) = now(params); + + return (void *)reply; +} + +void copy_and_free_perfstats(perfstats_t *p, char *buf) +{ + size_t len; + perfstats_t *p_next; + + while (p) { + p_next = p->next; + len = p->len * sizeof(perfstat_t); + memcpy(buf, p->perfstat, len); + buf += len; + free(p); + p = p_next; + } +} + +void *stop_perfstat(parameters_t *params, char *req, size_t len) +{ + char *reply; + uint16_t id = *((uint16_t *) req); + char *mount = (char *) req + 2; + connection_t *conn = find_connection(params->connections, mount, + len - 2); + if (conn == NULL) + return (void *)error_reply(id, "Mount %s unknown", mount); + + size_t out_len = 16; + + lock("perfstat lock: stop_perfstat", &conn->perfstat_lock); + conn->perfstat = 0; + + out_len += size_of_perfstats(conn->perfstats); + + reply = (char *)must_malloc("stop_perfstat", out_len); + *((uint32_t *)reply) = out_len; + *((uint16_t *) (reply + 4)) = PERFSTAT_REPLY; + *((uint16_t *) (reply + 6)) = id; + *((uint64_t *) (reply + 8)) = now(params); + + copy_and_free_perfstats(conn->perfstats, reply + 16); + + unlock("perfstat lock: stop_perfstat", &conn->perfstat_lock); + + return (void *)reply; +} diff --git a/alpine/packages/transfused/transfused_perfstat.h b/alpine/packages/transfused/transfused_perfstat.h new file mode 100644 index 000000000..33bb9047f --- /dev/null +++ b/alpine/packages/transfused/transfused_perfstat.h @@ -0,0 +1,29 @@ +#ifndef _TRANSFUSED_PERFSTAT_H_ +#define _TRANSFUSED_PERFSTAT_H_ + +#include + +struct connection; +struct parameters; + +typedef struct { + uint64_t id; + uint64_t start; + uint64_t stop; +} perfstat_t; + +struct perfstats { + uint32_t len; + uint32_t nothing; + struct perfstats *next; + perfstat_t perfstat[0]; +}; + +typedef struct perfstats perfstats_t; + +int perfstat_open(uint64_t unique, struct connection *conn); +int perfstat_close(uint64_t unique, struct connection *conn); +void *start_perfstat(struct parameters *params, char *req, size_t len); +void *stop_perfstat(struct parameters *params, char *req, size_t len); + +#endif /* _TRANSFUSED_PERFSTAT_H_ */