transfused: update the event protocol for streaming vsock

Signed-off-by: David Sheets <dsheets@docker.com>
This commit is contained in:
David Sheets 2016-04-14 14:57:06 +01:00
parent 32e840da1b
commit b894b67417

View File

@ -180,7 +180,7 @@ uint64_t message_id(uint64_t * message) {
return message[1];
}
int read_message(char * descr, int fd, char * buf) {
int read_message(char * descr, int fd, char * buf, size_t max_read) {
int read_count;
size_t nbyte;
uint32_t len;
@ -188,14 +188,14 @@ int read_message(char * descr, int fd, char * buf) {
// TODO: socket read conditions e.g. EAGAIN
read_count = read(fd, buf, 4);
if (read_count != 4) {
if (read_count < 0) die(1, "", "copy %s: error reading: ", descr);
if (read_count == 0) die(1, NULL, "copy %s: EOF reading length", descr);
die(1, NULL, "copy %s: short read length %d", descr, read_count);
if (read_count < 0) die(1, "", "read %s: error reading: ", descr);
if (read_count == 0) die(1, NULL, "read %s: EOF reading length", descr);
die(1, NULL, "read %s: short read length %d", descr, read_count);
}
len = *((uint32_t *) buf);
if (len > IN_BUFSZ)
die(1, NULL, "copy %s: message size %d exceeds buffer capacity %d",
len, IN_BUFSZ);
if (len > max_read)
die(1, NULL, "read %s: message size %d exceeds buffer capacity %d",
len, max_read);
nbyte = (size_t) (len - 4);
buf += 4;
@ -203,8 +203,8 @@ int read_message(char * descr, int fd, char * buf) {
do {
// TODO: socket read conditions e.g. EAGAIN
read_count = read(fd, buf, nbyte);
if (read_count < 0) die(1, "", "copy %s: error reading: ", descr);
if (read_count == 0) die(1, NULL, "copy %s: EOF reading", descr);
if (read_count < 0) die(1, "", "read %s: error reading: ", descr);
if (read_count == 0) die(1, NULL, "read %s: EOF reading", descr);
nbyte -= read_count;
buf += read_count;
} while (nbyte != 0);
@ -222,7 +222,7 @@ void copy_into_fuse(copy_thread_state * copy_state) {
buf = must_malloc(descr, IN_BUFSZ);
while(1) {
read_count = read_message(descr, from, (char *) buf);
read_count = read_message(descr, from, (char *) buf, IN_BUFSZ);
write_count = write(to, buf, read_count);
if (write_count < 0) die(1, "", "copy %s: error writing: ", descr);
@ -559,13 +559,12 @@ void perform_syscall(connection_t * conn, uint8_t syscall, char path[]) {
}
void * event_thread(void * connection_ptr) {
int read_count, event_len, path_len;
int read_count, path_len;
void * buf;
connection_t * connection = connection_ptr;
char * path;
uint8_t syscall;
void * msg;
// This thread registers with the file system server as being an
// fsnotify event actuator. Other mounted file system interactions
@ -576,36 +575,17 @@ void * event_thread(void * connection_ptr) {
buf = must_malloc("incoming event buffer", EVENT_BUFSZ);
while(1) {
read_count = read(connection->sock, buf, EVENT_BUFSZ);
if (read_count < 0) die(1, "event thread: error reading", "");
event_len = (int) ntohs(*((uint16_t *) buf));
read_count = read_message("events", connection->sock, buf, EVENT_BUFSZ);
if (debug)
thread_log_time(connection, "read %d bytes from event connection\n",
read_count);
// TODO: handle short read
if (read_count != event_len) {
thread_log_time(connection, "event thread: only read %d of %d\n",
read_count, event_len);
msg = must_malloc("event hex", read_count * 2 + 1);
for (int i = 0; i < read_count; i++) {
sprintf(((char *) msg) + (i * 2),"%02x",(int) (((char *) buf)[i]));
}
((char *) msg)[read_count * 2] = 0;
thread_log_time(connection, "message: %s\n", (char *) msg);
free(msg);
continue;
}
path_len = (int) ntohs(*(((uint16_t *) buf) + 1));
path_len = (int) ntohs(*(((uint32_t *) buf) + 1));
// TODO: could check the path length isn't a lie here
path = (char *) (((uint8_t *) buf) + 4);
path = (char *) (((uint8_t *) buf) + 6);
// TODO: could check the path is NUL terminated here
syscall = *(((uint8_t *) buf) + 4 + path_len);
syscall = *(((uint8_t *) buf) + 6 + path_len);
// TODO: should this be in another thread?
perform_syscall(connection, syscall, path);