Merge pull request #86 from dsheets/transfused-event-stream

transfused: update the event protocol for streaming vsock
This commit is contained in:
David Sheets 2016-04-14 15:07:02 +01:00
commit a49bb9a4b2

View File

@ -180,7 +180,7 @@ uint64_t message_id(uint64_t * message) {
return message[1]; return message[1];
} }
int read_message(char * descr, int fd, char * buf) { int read_message(char * descr, int fd, char * buf, size_t max_read) {
int read_count; int read_count;
size_t nbyte; size_t nbyte;
uint32_t len; uint32_t len;
@ -188,14 +188,14 @@ int read_message(char * descr, int fd, char * buf) {
// TODO: socket read conditions e.g. EAGAIN // TODO: socket read conditions e.g. EAGAIN
read_count = read(fd, buf, 4); read_count = read(fd, buf, 4);
if (read_count != 4) { if (read_count != 4) {
if (read_count < 0) die(1, "", "copy %s: error reading: ", descr); if (read_count < 0) die(1, "", "read %s: error reading: ", descr);
if (read_count == 0) die(1, NULL, "copy %s: EOF reading length", descr); if (read_count == 0) die(1, NULL, "read %s: EOF reading length", descr);
die(1, NULL, "copy %s: short read length %d", descr, read_count); die(1, NULL, "read %s: short read length %d", descr, read_count);
} }
len = *((uint32_t *) buf); len = *((uint32_t *) buf);
if (len > IN_BUFSZ) if (len > max_read)
die(1, NULL, "copy %s: message size %d exceeds buffer capacity %d", die(1, NULL, "read %s: message size %d exceeds buffer capacity %d",
len, IN_BUFSZ); len, max_read);
nbyte = (size_t) (len - 4); nbyte = (size_t) (len - 4);
buf += 4; buf += 4;
@ -203,8 +203,8 @@ int read_message(char * descr, int fd, char * buf) {
do { do {
// TODO: socket read conditions e.g. EAGAIN // TODO: socket read conditions e.g. EAGAIN
read_count = read(fd, buf, nbyte); read_count = read(fd, buf, nbyte);
if (read_count < 0) die(1, "", "copy %s: error reading: ", descr); if (read_count < 0) die(1, "", "read %s: error reading: ", descr);
if (read_count == 0) die(1, NULL, "copy %s: EOF reading", descr); if (read_count == 0) die(1, NULL, "read %s: EOF reading", descr);
nbyte -= read_count; nbyte -= read_count;
buf += read_count; buf += read_count;
} while (nbyte != 0); } while (nbyte != 0);
@ -222,7 +222,7 @@ void copy_into_fuse(copy_thread_state * copy_state) {
buf = must_malloc(descr, IN_BUFSZ); buf = must_malloc(descr, IN_BUFSZ);
while(1) { while(1) {
read_count = read_message(descr, from, (char *) buf); read_count = read_message(descr, from, (char *) buf, IN_BUFSZ);
write_count = write(to, buf, read_count); write_count = write(to, buf, read_count);
if (write_count < 0) die(1, "", "copy %s: error writing: ", descr); if (write_count < 0) die(1, "", "copy %s: error writing: ", descr);
@ -559,13 +559,12 @@ void perform_syscall(connection_t * conn, uint8_t syscall, char path[]) {
} }
void * event_thread(void * connection_ptr) { void * event_thread(void * connection_ptr) {
int read_count, event_len, path_len; int read_count, path_len;
void * buf; void * buf;
connection_t * connection = connection_ptr; connection_t * connection = connection_ptr;
char * path; char * path;
uint8_t syscall; uint8_t syscall;
void * msg;
// This thread registers with the file system server as being an // This thread registers with the file system server as being an
// fsnotify event actuator. Other mounted file system interactions // fsnotify event actuator. Other mounted file system interactions
@ -576,36 +575,17 @@ void * event_thread(void * connection_ptr) {
buf = must_malloc("incoming event buffer", EVENT_BUFSZ); buf = must_malloc("incoming event buffer", EVENT_BUFSZ);
while(1) { while(1) {
read_count = read(connection->sock, buf, EVENT_BUFSZ); read_count = read_message("events", connection->sock, buf, EVENT_BUFSZ);
if (read_count < 0) die(1, "event thread: error reading", "");
event_len = (int) ntohs(*((uint16_t *) buf));
if (debug) if (debug)
thread_log_time(connection, "read %d bytes from event connection\n", thread_log_time(connection, "read %d bytes from event connection\n",
read_count); read_count);
// TODO: handle short read path_len = (int) ntohs(*(((uint32_t *) buf) + 1));
if (read_count != event_len) {
thread_log_time(connection, "event thread: only read %d of %d\n",
read_count, event_len);
msg = must_malloc("event hex", read_count * 2 + 1);
for (int i = 0; i < read_count; i++) {
sprintf(((char *) msg) + (i * 2),"%02x",(int) (((char *) buf)[i]));
}
((char *) msg)[read_count * 2] = 0;
thread_log_time(connection, "message: %s\n", (char *) msg);
free(msg);
continue;
}
path_len = (int) ntohs(*(((uint16_t *) buf) + 1));
// TODO: could check the path length isn't a lie here // TODO: could check the path length isn't a lie here
path = (char *) (((uint8_t *) buf) + 4); path = (char *) (((uint8_t *) buf) + 6);
// TODO: could check the path is NUL terminated here // TODO: could check the path is NUL terminated here
syscall = *(((uint8_t *) buf) + 4 + path_len); syscall = *(((uint8_t *) buf) + 6 + path_len);
// TODO: should this be in another thread? // TODO: should this be in another thread?
perform_syscall(connection, syscall, path); perform_syscall(connection, syscall, path);