Merge pull request #1123 from dsheets/transfused-perfstat

transfused perfstat
This commit is contained in:
Justin Cormack 2017-02-01 11:04:14 +00:00 committed by GitHub
commit ad528b4bea
6 changed files with 338 additions and 39 deletions

View File

@ -24,6 +24,7 @@
#include "transfused_log.h" #include "transfused_log.h"
#include "transfused_vsock.h" #include "transfused_vsock.h"
#include "transfused_perfstat.h"
char *default_fusermount = DEFAULT_FUSERMOUNT; char *default_fusermount = DEFAULT_FUSERMOUNT;
char *default_socket = DEFAULT_SOCKET; char *default_socket = DEFAULT_SOCKET;
@ -201,7 +202,7 @@ void read_exactly(char *descr, int fd, void *p, size_t nbyte)
} }
} }
int read_message(char *descr, parameters *params, int fd, int read_message(char *descr, parameters_t *params, int fd,
char *buf, size_t max_read) char *buf, size_t max_read)
{ {
size_t nbyte = sizeof(uint32_t); size_t nbyte = sizeof(uint32_t);
@ -233,7 +234,9 @@ void copy_into_fuse(copy_thread_state *copy_state)
char *descr = copy_state->connection->mount_point; char *descr = copy_state->connection->mount_point;
int read_count, write_count; int read_count, write_count;
void *buf; void *buf;
parameters *params = copy_state->connection->params; connection_t *conn = copy_state->connection;
parameters_t *params = conn->params;
uint64_t unique;
buf = must_malloc(descr, IN_BUFSZ); buf = must_malloc(descr, IN_BUFSZ);
@ -250,6 +253,12 @@ void copy_into_fuse(copy_thread_state *copy_state)
die(1, params, NULL, die(1, params, NULL,
"copy %s: read %d but only wrote %d", "copy %s: read %d but only wrote %d",
descr, read_count, write_count); descr, read_count, write_count);
unique = *((uint64_t *)buf + 1);
if (perfstat_close(unique, conn))
log_time(params,
"dropping perfstat for edge message %lld",
unique);
} }
free(buf); free(buf);
@ -263,7 +272,7 @@ void copy_notify_fuse(copy_thread_state *copy_state)
int read_count, write_count; int read_count, write_count;
uint32_t zero = 0, err; uint32_t zero = 0, err;
void *buf; void *buf;
parameters *params = copy_state->connection->params; parameters_t *params = copy_state->connection->params;
buf = must_malloc(descr, IN_BUFSZ); buf = must_malloc(descr, IN_BUFSZ);
@ -323,7 +332,9 @@ void copy_outof_fuse(copy_thread_state *copy_state)
char *descr = copy_state->connection->mount_point; char *descr = copy_state->connection->mount_point;
int read_count; int read_count;
void *buf; void *buf;
parameters *params = copy_state->connection->params; connection_t *conn = copy_state->connection;
parameters_t *params = conn->params;
uint64_t unique;
buf = must_malloc(descr, OUT_BUFSZ); buf = must_malloc(descr, OUT_BUFSZ);
@ -334,6 +345,12 @@ void copy_outof_fuse(copy_thread_state *copy_state)
die(1, params, "", "copy %s: error reading: ", descr); die(1, params, "", "copy %s: error reading: ", descr);
write_exactly(descr, to, (char *)buf, read_count); write_exactly(descr, to, (char *)buf, read_count);
unique = *((uint64_t *)buf + 1);
if (perfstat_open(unique, conn))
die(1, params, NULL,
"copy %s: could not open perfstat for %d",
descr, unique);
} }
free(buf); free(buf);
@ -387,7 +404,7 @@ void *copy_clean_outof_fuse_thread(void *copy_state)
return copy_clean_outof_fuse((copy_thread_state *) copy_state); return copy_clean_outof_fuse((copy_thread_state *) copy_state);
} }
int recv_fd(parameters *params, int sock) int recv_fd(parameters_t *params, int sock)
{ {
int ret; int ret;
int fd = -1; int fd = -1;
@ -625,7 +642,7 @@ void mkdir_p(connection_t *conn, char *path)
} }
} }
int is_next_child_ok(parameters *params, char *path, DIR *dir) int is_next_child_ok(parameters_t *params, char *path, DIR *dir)
{ {
struct dirent *child; struct dirent *child;
@ -641,7 +658,7 @@ int is_next_child_ok(parameters *params, char *path, DIR *dir)
return 1; return 1;
} }
int is_path_mountable(parameters *params, int allow_empty, char *path) int is_path_mountable(parameters_t *params, int allow_empty, char *path)
{ {
DIR *dir; DIR *dir;
@ -737,9 +754,9 @@ void *mount_connection(connection_t *conn)
return NULL; return NULL;
} }
void *mount_thread(void *connection) void *mount_thread(void *conn)
{ {
return mount_connection((connection_t *) connection); return mount_connection((connection_t *) conn);
} }
void write_pid(connection_t *connection) void write_pid(connection_t *connection)
@ -758,7 +775,7 @@ void write_pid(connection_t *connection)
free(pid_s); free(pid_s);
} }
void pong(parameters *params) void pong(parameters_t *params)
{ {
char pong_msg[6] = {'\6', '\0', '\0', '\0', PONG_REPLY, '\0'}; char pong_msg[6] = {'\6', '\0', '\0', '\0', PONG_REPLY, '\0'};
@ -862,7 +879,7 @@ void *event_thread(void *connection_ptr)
return NULL; return NULL;
} }
void write_pidfile(parameters *params) void write_pidfile(parameters_t *params)
{ {
int fd; int fd;
pid_t pid = getpid(); pid_t pid = getpid();
@ -894,7 +911,7 @@ void write_pidfile(parameters *params)
} }
/* TODO: the message parsing here is rickety, do it properly */ /* TODO: the message parsing here is rickety, do it properly */
void *determine_mount_suitability(parameters *params, int allow_empty, void *determine_mount_suitability(parameters_t *params, int allow_empty,
char *req, int len) char *req, int len)
{ {
void *buf = (void *)req; void *buf = (void *)req;
@ -926,9 +943,41 @@ void *determine_mount_suitability(parameters *params, int allow_empty,
return (void *)reply; return (void *)reply;
} }
void *error_reply(uint16_t id, const char *fmt, ...)
{
char *reply;
char *message;
size_t mlen;
va_list args;
va_start(args, fmt);
vasprintf(&message, fmt, args);
va_end(args);
mlen = strlen(message);
reply = (char *)must_malloc("error_reply", 8 + mlen);
*((uint32_t *)reply) = 8 + mlen;
*((uint16_t *) (reply + 4)) = ERROR_REPLY;
*((uint16_t *) (reply + 6)) = id;
memcpy(reply + 8, message, mlen);
free(message);
return (void *)reply;
}
connection_t *find_connection(connection_t *conn, char *name, size_t len)
{
while (conn) {
if (strncmp(name, conn->mount_point, len) == 0)
return conn;
conn = conn->next;
}
return NULL;
}
void *init_thread(void *params_ptr) void *init_thread(void *params_ptr)
{ {
parameters *params = params_ptr; parameters_t *params = params_ptr;
int read_count, len; int read_count, len;
char init_msg[6] = {'\6', '\0', '\0', '\0', '\0', '\0'}; char init_msg[6] = {'\6', '\0', '\0', '\0', '\0', '\0'};
void *buf, *response; void *buf, *response;
@ -951,7 +1000,7 @@ void *init_thread(void *params_ptr)
while (1) { while (1) {
read_count = read_message("control", params, params->ctl_sock, read_count = read_message("control", params, params->ctl_sock,
buf, CTL_BUFSZ); buf, CTL_BUFSZ - 1);
msg_type = *((uint16_t *)buf + 2); msg_type = *((uint16_t *)buf + 2);
switch (msg_type) { switch (msg_type) {
case MOUNT_SUITABILITY_REQUEST: case MOUNT_SUITABILITY_REQUEST:
@ -974,6 +1023,26 @@ void *init_thread(void *params_ptr)
free(response); free(response);
break; break;
case START_PERFSTAT_REQUEST:
((char *)buf)[read_count] = '\0';
response = start_perfstat(params, (char *)buf + 6,
read_count - 6);
len = *((size_t *) response);
write_exactly("init thread: start perfstat response",
params->ctl_sock, response, len);
free(response);
break;
case STOP_PERFSTAT_REQUEST:
((char *)buf)[read_count] = '\0';
response = stop_perfstat(params, (char *)buf + 6,
read_count - 6);
len = *((size_t *) response);
write_exactly("init thread: stop perfstat response",
params->ctl_sock, response, len);
free(response);
break;
default: default:
die(1, params, NULL, die(1, params, NULL,
"init thread: unknown message %d", msg_type); "init thread: unknown message %d", msg_type);
@ -998,7 +1067,7 @@ void setup_debug(void)
die(1, NULL, "Couldn't set siginterrupt for SIGHUP", ""); die(1, NULL, "Couldn't set siginterrupt for SIGHUP", "");
} }
void parse_parameters(int argc, char *argv[], parameters *params) void parse_parameters(int argc, char *argv[], parameters_t *params)
{ {
int c; int c;
int errflg = 0; int errflg = 0;
@ -1011,6 +1080,7 @@ void parse_parameters(int argc, char *argv[], parameters *params)
params->data_sock = 0; params->data_sock = 0;
params->ctl_sock = 0; params->ctl_sock = 0;
lock_init("ctl_lock", &params->ctl_lock, NULL); lock_init("ctl_lock", &params->ctl_lock, NULL);
params->connections = NULL;
while ((c = getopt(argc, argv, ":p:d:s:f:l:")) != -1) { while ((c = getopt(argc, argv, ":p:d:s:f:l:")) != -1) {
switch (c) { switch (c) {
@ -1085,7 +1155,7 @@ void parse_parameters(int argc, char *argv[], parameters *params)
} }
} }
void serve(parameters *params) void serve(parameters_t *params)
{ {
char subproto_selector; char subproto_selector;
pthread_t child; pthread_t child;
@ -1103,7 +1173,12 @@ void serve(parameters *params)
conn = (connection_t *)must_malloc("connection state", conn = (connection_t *)must_malloc("connection state",
sizeof(connection_t)); sizeof(connection_t));
conn->params = params; conn->params = params;
conn->next = params->connections;
params->connections = conn;
conn->mount_point = ""; conn->mount_point = "";
conn->perfstat = 0;
conn->perfstats = NULL;
lock_init("perfstat lock",&conn->perfstat_lock,NULL);
conn->sock = accept(params->data_sock, conn->sock = accept(params->data_sock,
&conn->sa_client, &conn->socklen_client); &conn->sa_client, &conn->socklen_client);
@ -1140,7 +1215,7 @@ void serve(parameters *params)
int main(int argc, char *argv[]) int main(int argc, char *argv[])
{ {
parameters params; parameters_t params;
struct rlimit core_limit; struct rlimit core_limit;
core_limit.rlim_cur = RLIM_INFINITY; core_limit.rlim_cur = RLIM_INFINITY;

View File

@ -3,11 +3,14 @@
#include <pthread.h> #include <pthread.h>
#include <sys/socket.h> #include <sys/socket.h>
#include "transfused_perfstat.h"
#define IN_BUFSZ ((1 << 20) + 16) #define IN_BUFSZ ((1 << 20) + 16)
#define OUT_BUFSZ ((1 << 20) + 64) #define OUT_BUFSZ ((1 << 20) + 64)
#define EVENT_BUFSZ 4096 #define EVENT_BUFSZ 4096
#define CTL_BUFSZ 65536 #define CTL_BUFSZ 65536
#define PERFSTATS_PER_SEGMENT 2730 /* (64k - 16) / 24 */
#define MAX_PERFSTAT_CHECK 64
#define DEFAULT_FUSERMOUNT "/bin/fusermount" #define DEFAULT_FUSERMOUNT "/bin/fusermount"
#define DEFAULT_SOCKET "v:_:1525" #define DEFAULT_SOCKET "v:_:1525"
@ -25,14 +28,21 @@
#define MOUNT_SUITABILITY_REQUEST 1 #define MOUNT_SUITABILITY_REQUEST 1
#define EXPORT_SUITABILITY_REQUEST 2 #define EXPORT_SUITABILITY_REQUEST 2
#define START_PERFSTAT_REQUEST 3
#define STOP_PERFSTAT_REQUEST 4
#define TRANSFUSE_LOG_ERROR 1 #define TRANSFUSE_LOG_ERROR 1
#define TRANSFUSE_LOG_NOTICE 2 #define TRANSFUSE_LOG_NOTICE 2
#define PONG_REPLY 3 #define PONG_REPLY 3
#define MOUNT_SUITABILITY_REPLY 4 #define MOUNT_SUITABILITY_REPLY 4
#define TRANSFUSE_NOTIFY_CHANNEL 5 #define TRANSFUSE_NOTIFY_CHANNEL 5
#define PERFSTAT_REPLY 6
#define ERROR_REPLY 7
typedef struct { struct parameters;
struct connection;
struct parameters {
char *server; char *server;
char *socket; char *socket;
char *fusermount; char *fusermount;
@ -42,16 +52,25 @@ typedef struct {
int ctl_sock; int ctl_sock;
int data_sock; int data_sock;
pthread_mutex_t ctl_lock; pthread_mutex_t ctl_lock;
} parameters; struct connection *connections;
};
typedef struct { typedef struct parameters parameters_t;
parameters *params;
struct connection {
struct connection *next;
parameters_t *params;
char *type_descr; char *type_descr;
char *mount_point; char *mount_point;
struct sockaddr sa_client; struct sockaddr sa_client;
socklen_t socklen_client; socklen_t socklen_client;
int sock; int sock;
} connection_t; int perfstat;
perfstats_t *perfstats;
pthread_mutex_t perfstat_lock;
};
typedef struct connection connection_t;
pthread_attr_t detached; pthread_attr_t detached;
@ -60,4 +79,8 @@ void lock(char *const descr, pthread_mutex_t *mutex);
void unlock(char *const descr, pthread_mutex_t *mutex); void unlock(char *const descr, pthread_mutex_t *mutex);
void write_exactly(char *descr, int fd, void *buf, size_t nbyte); void write_exactly(char *descr, int fd, void *buf, size_t nbyte);
void *error_reply(uint16_t id, const char *fmt, ...);
connection_t *find_connection(connection_t *conn, char *name, size_t len);
#endif /* _TRANSFUSED_H_ */ #endif /* _TRANSFUSED_H_ */

View File

@ -80,7 +80,7 @@ void log_sock_locked(int fd, uint16_t msg_type, const char *fmt, ...)
va_end(args); va_end(args);
} }
void die(int exit_code, parameters *params, const char *parg, void die(int exit_code, parameters_t *params, const char *parg,
const char *fmt, ...) const char *fmt, ...)
{ {
va_list argp, targs; va_list argp, targs;
@ -121,7 +121,7 @@ void die(int exit_code, parameters *params, const char *parg,
unlock("die ctl_lock", &params->ctl_lock); unlock("die ctl_lock", &params->ctl_lock);
} }
void vlog_locked(parameters *params, uint16_t msg_type, void vlog_locked(parameters_t *params, uint16_t msg_type,
const char *fmt, va_list args) const char *fmt, va_list args)
{ {
int rc; int rc;
@ -149,7 +149,7 @@ void vlog_locked(parameters *params, uint16_t msg_type,
} }
} }
void vlog_time_locked(parameters *params, uint16_t msg_type, void vlog_time_locked(parameters_t *params, uint16_t msg_type,
const char *fmt, va_list args) const char *fmt, va_list args)
{ {
int fd = params->logfile_fd; int fd = params->logfile_fd;
@ -159,7 +159,7 @@ void vlog_time_locked(parameters *params, uint16_t msg_type,
vlog_locked(params, msg_type, fmt, args); vlog_locked(params, msg_type, fmt, args);
} }
void log_time_locked(parameters *params, uint16_t msg_type, void log_time_locked(parameters_t *params, uint16_t msg_type,
const char *fmt, ...) const char *fmt, ...)
{ {
va_list args; va_list args;
@ -169,7 +169,7 @@ void log_time_locked(parameters *params, uint16_t msg_type,
va_end(args); va_end(args);
} }
void log_time(parameters *params, const char *fmt, ...) void log_time(parameters_t *params, const char *fmt, ...)
{ {
va_list args; va_list args;
@ -180,7 +180,7 @@ void log_time(parameters *params, const char *fmt, ...)
va_end(args); va_end(args);
} }
void log_notice_time(parameters *params, const char *fmt, ...) void log_notice_time(parameters_t *params, const char *fmt, ...)
{ {
va_list args; va_list args;
@ -193,7 +193,7 @@ void log_notice_time(parameters *params, const char *fmt, ...)
} }
typedef struct { typedef struct {
parameters *params; parameters_t *params;
char *msg; char *msg;
} log_thread_state; } log_thread_state;
@ -234,7 +234,7 @@ void thread_log_time(connection_t *conn, const char *fmt, ...)
conn->type_descr, conn->mount_point); conn->type_descr, conn->mount_point);
} }
void log_continue_locked(parameters *params, const char *fmt, ...) void log_continue_locked(parameters_t *params, const char *fmt, ...)
{ {
va_list args; va_list args;
@ -243,7 +243,7 @@ void log_continue_locked(parameters *params, const char *fmt, ...)
va_end(args); va_end(args);
} }
void log_continue(parameters *params, const char *fmt, ...) void log_continue(parameters_t *params, const char *fmt, ...)
{ {
va_list args; va_list args;

View File

@ -6,21 +6,21 @@
#include "transfused.h" #include "transfused.h"
void die(int exit_code, parameters *params, const char *perror_arg, void die(int exit_code, parameters_t *params, const char *perror_arg,
const char *fmt, ...); const char *fmt, ...);
void vlog_locked(parameters *params, uint16_t msg_type, void vlog_locked(parameters_t *params, uint16_t msg_type,
const char *fmt, va_list args); const char *fmt, va_list args);
void vlog_time_locked(parameters *params, uint16_t msg_type, void vlog_time_locked(parameters_t *params, uint16_t msg_type,
const char *fmt, va_list args); const char *fmt, va_list args);
void log_time_locked(parameters *params, uint16_t msg_type, void log_time_locked(parameters_t *params, uint16_t msg_type,
const char *fmt, ...); const char *fmt, ...);
void log_time(parameters *params, const char *fmt, ...); void log_time(parameters_t *params, const char *fmt, ...);
void log_notice_time(parameters *params, const char *fmt, ...); void log_notice_time(parameters_t *params, const char *fmt, ...);
void thread_log_time(connection_t *conn, const char *fmt, ...); void thread_log_time(connection_t *conn, const char *fmt, ...);
void log_continue_locked(parameters *params, const char *fmt, ...); void log_continue_locked(parameters_t *params, const char *fmt, ...);
void log_continue(parameters *params, const char *fmt, ...); void log_continue(parameters_t *params, const char *fmt, ...);
#endif /* _TRANSFUSED_LOG_H_ */ #endif /* _TRANSFUSED_LOG_H_ */

View File

@ -0,0 +1,172 @@
#include <stdlib.h>
#include <string.h>
#include "transfused.h"
#include "transfused_log.h"
uint64_t now(parameters_t *params)
{
uint64_t ns_in_s = 1000000000;
struct timespec now;
if (clock_gettime(CLOCK_MONOTONIC, &now))
die(1, params, "now", "");
return (uint64_t)now.tv_sec * ns_in_s + (uint64_t)now.tv_nsec;
}
size_t size_of_perfstats(perfstats_t *p)
{
size_t len = 0;
while (p) {
len += sizeof(perfstat_t) * p->len + sizeof(perfstats_t);
p = p->next;
}
return len;
}
int perfstat_open(uint64_t unique, connection_t *conn)
{
size_t sz;
perfstats_t *old_perfstats = conn->perfstats;
perfstats_t *stats = conn->perfstats;
perfstat_t stat;
if (!conn->perfstat)
return 0;
lock("perfstat lock: perfstat_open", &conn->perfstat_lock);
if (conn->perfstat) {
if (!stats || stats->len >= PERFSTATS_PER_SEGMENT) {
sz = sizeof(perfstats_t);
sz += PERFSTATS_PER_SEGMENT * sizeof(perfstat_t);
stats = must_malloc("perfstats",sz);
stats->next = old_perfstats;
stats->len = 0;
conn->perfstats = stats;
}
stat = (perfstat_t) {
.id = unique,
.start = now(conn->params),
.stop = 0
};
stats->perfstat[stats->len] = stat;
stats->len++;
}
unlock("perfstat unlock: perfstat_close", &conn->perfstat_lock);
return 0;
}
int perfstat_close_locked(uint64_t unique, parameters_t *params,
perfstats_t *perfstats, int to_check)
{
int i;
perfstat_t *stat;
if (!perfstats)
return 1;
i = perfstats->len - 1;
while (i >= 0 && to_check > 0) {
stat = &perfstats->perfstat[i];
if (stat->id == unique) {
stat->stop = now(params);
return 0;
} else {
i--;
to_check--;
}
}
if (to_check && !perfstat_close_locked(unique, params, perfstats->next,
to_check))
return 0;
return 1;
}
int perfstat_close(uint64_t unique, connection_t *conn)
{
int rc = 0;
pthread_mutex_t *perfstat_lock = &conn->perfstat_lock;
if (!conn->perfstat)
return 0;
lock("perfstat lock: perfstat_close", perfstat_lock);
if (conn->perfstat)
rc = perfstat_close_locked(unique, conn->params,
conn->perfstats,
MAX_PERFSTAT_CHECK);
unlock("perfstat unlock: perfstat_close", perfstat_lock);
return rc;
}
void *start_perfstat(parameters_t *params, char *req, size_t len)
{
char *reply;
uint16_t id = *((uint16_t *) req);
char *mount = (char *) req + 2;
connection_t *conn = find_connection(params->connections, mount,
len - 2);
if (conn == NULL)
return (void *)error_reply(id, "Mount %s unknown", mount);
lock("perfstat lock: start_perfstat", &conn->perfstat_lock);
conn->perfstat = 1;
unlock("perfstat lock: start_perfstat", &conn->perfstat_lock);
reply = (char *)must_malloc("start_perfstat", 8);
*((uint32_t *)reply) = 16;
*((uint16_t *) (reply + 4)) = PERFSTAT_REPLY;
*((uint16_t *) (reply + 6)) = id;
*((uint64_t *) (reply + 8)) = now(params);
return (void *)reply;
}
void copy_and_free_perfstats(perfstats_t *p, char *buf)
{
size_t len;
perfstats_t *p_next;
while (p) {
p_next = p->next;
len = p->len * sizeof(perfstat_t);
memcpy(buf, p->perfstat, len);
buf += len;
free(p);
p = p_next;
}
}
void *stop_perfstat(parameters_t *params, char *req, size_t len)
{
char *reply;
uint16_t id = *((uint16_t *) req);
char *mount = (char *) req + 2;
connection_t *conn = find_connection(params->connections, mount,
len - 2);
if (conn == NULL)
return (void *)error_reply(id, "Mount %s unknown", mount);
size_t out_len = 16;
lock("perfstat lock: stop_perfstat", &conn->perfstat_lock);
conn->perfstat = 0;
out_len += size_of_perfstats(conn->perfstats);
reply = (char *)must_malloc("stop_perfstat", out_len);
*((uint32_t *)reply) = out_len;
*((uint16_t *) (reply + 4)) = PERFSTAT_REPLY;
*((uint16_t *) (reply + 6)) = id;
*((uint64_t *) (reply + 8)) = now(params);
copy_and_free_perfstats(conn->perfstats, reply + 16);
unlock("perfstat lock: stop_perfstat", &conn->perfstat_lock);
return (void *)reply;
}

View File

@ -0,0 +1,29 @@
#ifndef _TRANSFUSED_PERFSTAT_H_
#define _TRANSFUSED_PERFSTAT_H_
#include <inttypes.h>
struct connection;
struct parameters;
typedef struct {
uint64_t id;
uint64_t start;
uint64_t stop;
} perfstat_t;
struct perfstats {
uint32_t len;
uint32_t nothing;
struct perfstats *next;
perfstat_t perfstat[0];
};
typedef struct perfstats perfstats_t;
int perfstat_open(uint64_t unique, struct connection *conn);
int perfstat_close(uint64_t unique, struct connection *conn);
void *start_perfstat(struct parameters *params, char *req, size_t len);
void *stop_perfstat(struct parameters *params, char *req, size_t len);
#endif /* _TRANSFUSED_PERFSTAT_H_ */