transfused: add perfstat machinery

This adds two new control channel requests, START_PERFSTAT and
STOP_PERFSTAT, that begin and end performance statistics collection
respectively. START_PERFSTAT takes the name of the mountpoint on which
to enable statistics collection and returns the current monotonic time.
STOP_PERFSTAT takes the name of the mountpoint on which to disable
statistics collection and returns the current monotonic time and the
message timings that have been collected.

Performance statistics are collected in a linked list of blocks. pthread
locks are used for synchronization.

Signed-off-by: David Sheets <dsheets@docker.com>
This commit is contained in:
David Sheets 2017-01-30 11:05:04 +00:00
parent 85f401ac79
commit dc7069db1c
4 changed files with 291 additions and 3 deletions

View File

@ -24,6 +24,7 @@
#include "transfused_log.h"
#include "transfused_vsock.h"
#include "transfused_perfstat.h"
char *default_fusermount = DEFAULT_FUSERMOUNT;
char *default_socket = DEFAULT_SOCKET;
@ -235,6 +236,7 @@ void copy_into_fuse(copy_thread_state *copy_state)
void *buf;
connection_t *conn = copy_state->connection;
parameters_t *params = conn->params;
uint64_t unique;
buf = must_malloc(descr, IN_BUFSZ);
@ -251,6 +253,12 @@ void copy_into_fuse(copy_thread_state *copy_state)
die(1, params, NULL,
"copy %s: read %d but only wrote %d",
descr, read_count, write_count);
unique = *((uint64_t *)buf + 1);
if (perfstat_close(unique, conn))
log_time(params,
"dropping perfstat for edge message %lld",
unique);
}
free(buf);
@ -326,6 +334,7 @@ void copy_outof_fuse(copy_thread_state *copy_state)
void *buf;
connection_t *conn = copy_state->connection;
parameters_t *params = conn->params;
uint64_t unique;
buf = must_malloc(descr, OUT_BUFSZ);
@ -336,6 +345,12 @@ void copy_outof_fuse(copy_thread_state *copy_state)
die(1, params, "", "copy %s: error reading: ", descr);
write_exactly(descr, to, (char *)buf, read_count);
unique = *((uint64_t *)buf + 1);
if (perfstat_open(unique, conn))
die(1, params, NULL,
"copy %s: could not open perfstat for %d",
descr, unique);
}
free(buf);
@ -928,6 +943,38 @@ void *determine_mount_suitability(parameters_t *params, int allow_empty,
return (void *)reply;
}
void *error_reply(uint16_t id, const char *fmt, ...)
{
char *reply;
char *message;
size_t mlen;
va_list args;
va_start(args, fmt);
vasprintf(&message, fmt, args);
va_end(args);
mlen = strlen(message);
reply = (char *)must_malloc("error_reply", 8 + mlen);
*((uint32_t *)reply) = 8 + mlen;
*((uint16_t *) (reply + 4)) = ERROR_REPLY;
*((uint16_t *) (reply + 6)) = id;
memcpy(reply + 8, message, mlen);
free(message);
return (void *)reply;
}
connection_t *find_connection(connection_t *conn, char *name, size_t len)
{
while (conn) {
if (strncmp(name, conn->mount_point, len) == 0)
return conn;
conn = conn->next;
}
return NULL;
}
void *init_thread(void *params_ptr)
{
parameters_t *params = params_ptr;
@ -953,7 +1000,7 @@ void *init_thread(void *params_ptr)
while (1) {
read_count = read_message("control", params, params->ctl_sock,
buf, CTL_BUFSZ);
buf, CTL_BUFSZ - 1);
msg_type = *((uint16_t *)buf + 2);
switch (msg_type) {
case MOUNT_SUITABILITY_REQUEST:
@ -976,6 +1023,26 @@ void *init_thread(void *params_ptr)
free(response);
break;
case START_PERFSTAT_REQUEST:
((char *)buf)[read_count] = '\0';
response = start_perfstat(params, (char *)buf + 6,
read_count - 6);
len = *((size_t *) response);
write_exactly("init thread: start perfstat response",
params->ctl_sock, response, len);
free(response);
break;
case STOP_PERFSTAT_REQUEST:
((char *)buf)[read_count] = '\0';
response = stop_perfstat(params, (char *)buf + 6,
read_count - 6);
len = *((size_t *) response);
write_exactly("init thread: stop perfstat response",
params->ctl_sock, response, len);
free(response);
break;
default:
die(1, params, NULL,
"init thread: unknown message %d", msg_type);
@ -1109,6 +1176,9 @@ void serve(parameters_t *params)
conn->next = params->connections;
params->connections = conn;
conn->mount_point = "";
conn->perfstat = 0;
conn->perfstats = NULL;
lock_init("perfstat lock",&conn->perfstat_lock,NULL);
conn->sock = accept(params->data_sock,
&conn->sa_client, &conn->socklen_client);

View File

@ -3,11 +3,14 @@
#include <pthread.h>
#include <sys/socket.h>
#include "transfused_perfstat.h"
#define IN_BUFSZ ((1 << 20) + 16)
#define OUT_BUFSZ ((1 << 20) + 64)
#define EVENT_BUFSZ 4096
#define CTL_BUFSZ 65536
#define PERFSTATS_PER_SEGMENT 2730 /* (64k - 16) / 24 */
#define MAX_PERFSTAT_CHECK 64
#define DEFAULT_FUSERMOUNT "/bin/fusermount"
#define DEFAULT_SOCKET "v:_:1525"
@ -25,16 +28,21 @@
#define MOUNT_SUITABILITY_REQUEST 1
#define EXPORT_SUITABILITY_REQUEST 2
#define START_PERFSTAT_REQUEST 3
#define STOP_PERFSTAT_REQUEST 4
#define TRANSFUSE_LOG_ERROR 1
#define TRANSFUSE_LOG_NOTICE 2
#define PONG_REPLY 3
#define MOUNT_SUITABILITY_REPLY 4
#define TRANSFUSE_NOTIFY_CHANNEL 5
#define PERFSTAT_REPLY 6
#define ERROR_REPLY 7
struct parameters;
struct connection;
typedef struct {
struct parameters {
char *server;
char *socket;
char *fusermount;
@ -45,7 +53,9 @@ typedef struct {
int data_sock;
pthread_mutex_t ctl_lock;
struct connection *connections;
} parameters_t;
};
typedef struct parameters parameters_t;
struct connection {
struct connection *next;
@ -55,6 +65,9 @@ struct connection {
struct sockaddr sa_client;
socklen_t socklen_client;
int sock;
int perfstat;
perfstats_t *perfstats;
pthread_mutex_t perfstat_lock;
};
typedef struct connection connection_t;
@ -66,4 +79,8 @@ void lock(char *const descr, pthread_mutex_t *mutex);
void unlock(char *const descr, pthread_mutex_t *mutex);
void write_exactly(char *descr, int fd, void *buf, size_t nbyte);
void *error_reply(uint16_t id, const char *fmt, ...);
connection_t *find_connection(connection_t *conn, char *name, size_t len);
#endif /* _TRANSFUSED_H_ */

View File

@ -0,0 +1,172 @@
#include <stdlib.h>
#include <string.h>
#include "transfused.h"
#include "transfused_log.h"
uint64_t now(parameters_t *params)
{
uint64_t ns_in_s = 1000000000;
struct timespec now;
if (clock_gettime(CLOCK_MONOTONIC, &now))
die(1, params, "now", "");
return (uint64_t)now.tv_sec * ns_in_s + (uint64_t)now.tv_nsec;
}
size_t size_of_perfstats(perfstats_t *p)
{
size_t len = 0;
while (p) {
len += sizeof(perfstat_t) * p->len + sizeof(perfstats_t);
p = p->next;
}
return len;
}
int perfstat_open(uint64_t unique, connection_t *conn)
{
size_t sz;
perfstats_t *old_perfstats = conn->perfstats;
perfstats_t *stats = conn->perfstats;
perfstat_t stat;
if (!conn->perfstat)
return 0;
lock("perfstat lock: perfstat_open", &conn->perfstat_lock);
if (conn->perfstat) {
if (!stats || stats->len >= PERFSTATS_PER_SEGMENT) {
sz = sizeof(perfstats_t);
sz += PERFSTATS_PER_SEGMENT * sizeof(perfstat_t);
stats = must_malloc("perfstats",sz);
stats->next = old_perfstats;
stats->len = 0;
conn->perfstats = stats;
}
stat = (perfstat_t) {
.id = unique,
.start = now(conn->params),
.stop = 0
};
stats->perfstat[stats->len] = stat;
stats->len++;
}
unlock("perfstat unlock: perfstat_close", &conn->perfstat_lock);
return 0;
}
int perfstat_close_locked(uint64_t unique, parameters_t *params,
perfstats_t *perfstats, int to_check)
{
int i;
perfstat_t *stat;
if (!perfstats)
return 1;
i = perfstats->len - 1;
while (i >= 0 && to_check > 0) {
stat = &perfstats->perfstat[i];
if (stat->id == unique) {
stat->stop = now(params);
return 0;
} else {
i--;
to_check--;
}
}
if (to_check && !perfstat_close_locked(unique, params, perfstats->next,
to_check))
return 0;
return 1;
}
int perfstat_close(uint64_t unique, connection_t *conn)
{
int rc = 0;
pthread_mutex_t *perfstat_lock = &conn->perfstat_lock;
if (!conn->perfstat)
return 0;
lock("perfstat lock: perfstat_close", perfstat_lock);
if (conn->perfstat)
rc = perfstat_close_locked(unique, conn->params,
conn->perfstats,
MAX_PERFSTAT_CHECK);
unlock("perfstat unlock: perfstat_close", perfstat_lock);
return rc;
}
void *start_perfstat(parameters_t *params, char *req, size_t len)
{
char *reply;
uint16_t id = *((uint16_t *) req);
char *mount = (char *) req + 2;
connection_t *conn = find_connection(params->connections, mount,
len - 2);
if (conn == NULL)
return (void *)error_reply(id, "Mount %s unknown", mount);
lock("perfstat lock: start_perfstat", &conn->perfstat_lock);
conn->perfstat = 1;
unlock("perfstat lock: start_perfstat", &conn->perfstat_lock);
reply = (char *)must_malloc("start_perfstat", 8);
*((uint32_t *)reply) = 16;
*((uint16_t *) (reply + 4)) = PERFSTAT_REPLY;
*((uint16_t *) (reply + 6)) = id;
*((uint64_t *) (reply + 8)) = now(params);
return (void *)reply;
}
void copy_and_free_perfstats(perfstats_t *p, char *buf)
{
size_t len;
perfstats_t *p_next;
while (p) {
p_next = p->next;
len = p->len * sizeof(perfstat_t);
memcpy(buf, p->perfstat, len);
buf += len;
free(p);
p = p_next;
}
}
void *stop_perfstat(parameters_t *params, char *req, size_t len)
{
char *reply;
uint16_t id = *((uint16_t *) req);
char *mount = (char *) req + 2;
connection_t *conn = find_connection(params->connections, mount,
len - 2);
if (conn == NULL)
return (void *)error_reply(id, "Mount %s unknown", mount);
size_t out_len = 16;
lock("perfstat lock: stop_perfstat", &conn->perfstat_lock);
conn->perfstat = 0;
out_len += size_of_perfstats(conn->perfstats);
reply = (char *)must_malloc("stop_perfstat", out_len);
*((uint32_t *)reply) = out_len;
*((uint16_t *) (reply + 4)) = PERFSTAT_REPLY;
*((uint16_t *) (reply + 6)) = id;
*((uint64_t *) (reply + 8)) = now(params);
copy_and_free_perfstats(conn->perfstats, reply + 16);
unlock("perfstat lock: stop_perfstat", &conn->perfstat_lock);
return (void *)reply;
}

View File

@ -0,0 +1,29 @@
#ifndef _TRANSFUSED_PERFSTAT_H_
#define _TRANSFUSED_PERFSTAT_H_
#include <inttypes.h>
struct connection;
struct parameters;
typedef struct {
uint64_t id;
uint64_t start;
uint64_t stop;
} perfstat_t;
struct perfstats {
uint32_t len;
uint32_t nothing;
struct perfstats *next;
perfstat_t perfstat[0];
};
typedef struct perfstats perfstats_t;
int perfstat_open(uint64_t unique, struct connection *conn);
int perfstat_close(uint64_t unique, struct connection *conn);
void *start_perfstat(struct parameters *params, char *req, size_t len);
void *stop_perfstat(struct parameters *params, char *req, size_t len);
#endif /* _TRANSFUSED_PERFSTAT_H_ */