mirror of
https://github.com/linuxkit/linuxkit.git
synced 2025-07-20 17:49:10 +00:00
Merge pull request #1077 from djs55/benchmark-ipc
tap-vsockd: add buffering
This commit is contained in:
commit
aaa2e30a75
@ -24,6 +24,8 @@ start()
|
||||
--pidfile ${PIDFILE} \
|
||||
-- \
|
||||
--tap eth0 \
|
||||
--message-size 8192 \
|
||||
--buffer-size 262144 \
|
||||
--pidfile "${PIDFILE}" \
|
||||
--listen
|
||||
|
||||
|
269
alpine/packages/tap-vsockd/ring.c
Normal file
269
alpine/packages/tap-vsockd/ring.c
Normal file
@ -0,0 +1,269 @@
|
||||
#include <stdlib.h>
|
||||
#include <pthread.h>
|
||||
#include <unistd.h>
|
||||
#include <sys/uio.h>
|
||||
#include <errno.h>
|
||||
#include <assert.h>
|
||||
#include <stdio.h>
|
||||
#include "ring.h"
|
||||
|
||||
extern void fatal(const char *msg);
|
||||
|
||||
|
||||
/*
 * A fixed-size circular byte buffer shared between one producer thread and
 * one consumer thread.
 *
 * producer and consumer are sequence numbers in [0, 2 * size); all
 * additions are taken modulo 2 * size. The extra bit distinguishes the
 * "empty" state (consumer == producer) from the "full" state
 * (consumer + size == producer).
 */
struct ring {
	int producer;      /* Next sequence number to be written */
	int consumer;      /* Next sequence number to be read */
	int last;          /* Sequence number of end of stream, or -1 */
	int size;          /* Maximum number of buffered bytes */
	pthread_cond_t c;  /* Broadcast whenever producer/consumer/last move */
	pthread_mutex_t m; /* Protects every field of this struct */
	char *data;        /* The payload bytes themselves */
};
|
||||
|
||||
struct ring *ring_allocate(int size)
|
||||
{
|
||||
struct ring *ring = (struct ring*)malloc(sizeof(struct ring));
|
||||
if (!ring) {
|
||||
fatal("Failed to allocate ring buffer metadata");
|
||||
}
|
||||
ring->data = (char*)malloc(size);
|
||||
if (!ring->data) {
|
||||
fatal("Failed to allocate ring buffer data");
|
||||
}
|
||||
int err = 0;
|
||||
if ((err = pthread_cond_init(&ring->c, NULL)) != 0) {
|
||||
errno = err;
|
||||
fatal("Failed to create condition variable");
|
||||
}
|
||||
if ((err = pthread_mutex_init(&ring->m, NULL)) != 0) {
|
||||
errno = err;
|
||||
fatal("Failed to create mutex");
|
||||
}
|
||||
ring->size = size;
|
||||
ring->producer = ring->consumer = 0;
|
||||
ring->last = -1;
|
||||
return ring;
|
||||
}
|
||||
|
||||
/* Number of bytes ready to be consumed. Sequence numbers run modulo
   2 * size, so when the producer counter has wrapped past the consumer we
   add 2 * size back before taking the difference. Arguments are fully
   parenthesised (the original expanded bare `r` and `seq`, an
   operator-precedence hazard for expression arguments such as
   RING_GET(r, r->size - 1)). */
#define RING_DATA_AVAILABLE(r) \
	(((r)->producer >= (r)->consumer) ? \
	 ((r)->producer - (r)->consumer) : \
	 (2 * (r)->size + (r)->producer - (r)->consumer))

/* Number of free bytes available to the producer. */
#define RING_FREE_REQUESTS(r) ((r)->size - RING_DATA_AVAILABLE(r))

/* Address of the byte with sequence number seq. */
#define RING_GET(r, seq) (&((r)->data[(seq) % (r)->size]))
|
||||
|
||||
/* Signal that new data is been produced */
|
||||
void ring_producer_advance(struct ring *ring, int n)
|
||||
{
|
||||
int err = 0;
|
||||
assert(n >= 0);
|
||||
if ((err = pthread_mutex_lock(&ring->m)) != 0) {
|
||||
errno = err;
|
||||
fatal("Failed to lock mutex");
|
||||
}
|
||||
ring->producer = (ring->producer + n) % (2 * ring->size);
|
||||
if ((err = pthread_cond_broadcast(&ring->c)) != 0) {
|
||||
errno = err;
|
||||
fatal("Failed to signal condition variable");
|
||||
}
|
||||
if ((err = pthread_mutex_unlock(&ring->m)) != 0) {
|
||||
errno = err;
|
||||
fatal("Failed to unlock mutex");
|
||||
}
|
||||
return;
|
||||
}
|
||||
|
||||
/* Signal that data has been consumed */
|
||||
void ring_consumer_advance(struct ring *ring, int n)
|
||||
{
|
||||
int err = 0;
|
||||
assert(n >= 0);
|
||||
if ((err = pthread_mutex_lock(&ring->m)) != 0) {
|
||||
errno = err;
|
||||
fatal("Failed to lock mutex");
|
||||
}
|
||||
ring->consumer = (ring->consumer + n) % (2 * ring->size);
|
||||
|
||||
if ((err = pthread_cond_broadcast(&ring->c)) != 0) {
|
||||
errno = err;
|
||||
fatal("Failed to signal condition variable");
|
||||
}
|
||||
if ((err = pthread_mutex_unlock(&ring->m)) != 0) {
|
||||
errno = err;
|
||||
fatal("Failed to unlock mutex");
|
||||
}
|
||||
return;
|
||||
}
|
||||
|
||||
/* The producer sends Eof */
|
||||
void ring_producer_eof(struct ring *ring)
|
||||
{
|
||||
int err = 0;
|
||||
if ((err = pthread_mutex_lock(&ring->m)) != 0) {
|
||||
errno = err;
|
||||
fatal("Failed to lock mutex");
|
||||
}
|
||||
ring->last = ring->producer - 1;
|
||||
if ((err = pthread_cond_broadcast(&ring->c)) != 0) {
|
||||
errno = err;
|
||||
fatal("Failed to signal condition variable");
|
||||
}
|
||||
if ((err = pthread_mutex_unlock(&ring->m)) != 0) {
|
||||
errno = err;
|
||||
fatal("Failed to unlock mutex");
|
||||
}
|
||||
return;
|
||||
}
|
||||
|
||||
/* Wait for n bytes to become available. If the ring has shutdown, return
|
||||
non-zero. If data is available then return zero and fill in the first
|
||||
iovec_len entries of the iovec. */
|
||||
int ring_producer_wait_available(
|
||||
struct ring *ring, size_t n, struct iovec *iovec, int *iovec_len
|
||||
) {
|
||||
int ret = 1;
|
||||
int err = 0;
|
||||
if ((err = pthread_mutex_lock(&ring->m)) != 0) {
|
||||
errno = err;
|
||||
fatal("Failed to lock mutex");
|
||||
}
|
||||
while ((RING_FREE_REQUESTS(ring) < n) && (ring->last == -1)) {
|
||||
if ((err = pthread_cond_wait(&ring->c, &ring->m)) != 0) {
|
||||
errno = err;
|
||||
fatal("Failed to wait on condition variable");
|
||||
}
|
||||
}
|
||||
if (ring->last != -1) {
|
||||
goto out;
|
||||
}
|
||||
char *producer = RING_GET(ring, ring->producer);
|
||||
char *consumer = RING_GET(ring, ring->consumer);
|
||||
assert (producer >= RING_GET(ring, 0));
|
||||
assert (producer <= RING_GET(ring, ring->size-1));
|
||||
assert (consumer >= RING_GET(ring, 0));
|
||||
assert (consumer <= RING_GET(ring, ring->size-1));
|
||||
if (*iovec_len <= 0) {
|
||||
ret = 0;
|
||||
fprintf(stderr, "no iovecs\n");
|
||||
goto out;
|
||||
}
|
||||
if (consumer > producer) {
|
||||
/* producer has not wrapped around the buffer yet */
|
||||
iovec[0].iov_base = producer;
|
||||
iovec[0].iov_len = consumer - producer;
|
||||
assert(iovec[0].iov_len > 0);
|
||||
*iovec_len = 1;
|
||||
ret = 0;
|
||||
goto out;
|
||||
}
|
||||
/* consumer has wrapped around, so the first chunk is from the producer to
|
||||
the end of the buffer */
|
||||
iovec[0].iov_base = producer;
|
||||
iovec[0].iov_len = ring->size - (int) (producer - RING_GET(ring, 0));
|
||||
assert(iovec[0].iov_len > 0);
|
||||
if (*iovec_len == 1) {
|
||||
ret = 0;
|
||||
goto out;
|
||||
}
|
||||
*iovec_len = 1;
|
||||
/* also include the chunk from the beginning of the buffer to the consumer */
|
||||
iovec[1].iov_base = RING_GET(ring, 0);
|
||||
iovec[1].iov_len = consumer - RING_GET(ring, 0);
|
||||
if (iovec[1].iov_len > 0) {
|
||||
/* ... but don't bother if it's zero */
|
||||
*iovec_len = 2;
|
||||
}
|
||||
ret = 0;
|
||||
out:
|
||||
if ((err = pthread_mutex_unlock(&ring->m)) != 0) {
|
||||
errno = err;
|
||||
fatal("Failed to unlock mutex");
|
||||
}
|
||||
if (ret == 0) {
|
||||
for (int i = 0; i < *iovec_len; i++) {
|
||||
assert(iovec[i].iov_base >= (void*)RING_GET(ring, 0));
|
||||
assert(iovec[i].iov_base + iovec[i].iov_len - 1 <= (void*)RING_GET(ring, ring->size - 1));
|
||||
}
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
/* Wait for n bytes to become available. If the ring has shutdown, return
|
||||
non-zero. If data is available then return zero and fill in the first
|
||||
iovec_len entries of the iovec. */
|
||||
int ring_consumer_wait_available(
|
||||
struct ring *ring, size_t n, struct iovec *iovec, int *iovec_len
|
||||
) {
|
||||
|
||||
int ret = 1;
|
||||
int err = 0;
|
||||
if ((err = pthread_mutex_lock(&ring->m)) != 0) {
|
||||
errno = err;
|
||||
fatal("Failed to lock mutex");
|
||||
}
|
||||
while ((RING_DATA_AVAILABLE(ring) < n) && (ring->last == -1)) {
|
||||
if ((err = pthread_cond_wait(&ring->c, &ring->m)) != 0) {
|
||||
errno = err;
|
||||
fatal("Failed to wait on condition variable");
|
||||
}
|
||||
}
|
||||
if (ring->last != -1) {
|
||||
goto out;
|
||||
}
|
||||
char *producer = RING_GET(ring, ring->producer);
|
||||
char *consumer = RING_GET(ring, ring->consumer);
|
||||
assert (producer >= RING_GET(ring, 0));
|
||||
assert (producer <= RING_GET(ring, ring->size-1));
|
||||
assert (consumer >= RING_GET(ring, 0));
|
||||
assert (consumer <= RING_GET(ring, ring->size-1));
|
||||
if (*iovec_len <= 0) {
|
||||
ret = 0;
|
||||
goto out;
|
||||
}
|
||||
if (producer > consumer) {
|
||||
/* producer has not wrapped around the buffer yet */
|
||||
iovec[0].iov_base = consumer;
|
||||
iovec[0].iov_len = producer - consumer;
|
||||
assert(iovec[0].iov_len > 0);
|
||||
*iovec_len = 1;
|
||||
ret = 0;
|
||||
goto out;
|
||||
}
|
||||
/* producer has wrapped around, so the first chunk is from the consumer to
|
||||
the end of the buffer */
|
||||
iovec[0].iov_base = consumer;
|
||||
iovec[0].iov_len = ring->size - (int) (consumer - RING_GET(ring, 0));
|
||||
assert(iovec[0].iov_len > 0);
|
||||
if (*iovec_len == 1) {
|
||||
ret = 0;
|
||||
goto out;
|
||||
}
|
||||
*iovec_len = 1;
|
||||
/* also include the chunk from the beginning of the buffer to the producer */
|
||||
iovec[1].iov_base = RING_GET(ring, 0);
|
||||
iovec[1].iov_len = producer - RING_GET(ring, 0);
|
||||
if (iovec[1].iov_len > 0) {
|
||||
/* ... but don't bother if its zero */
|
||||
*iovec_len = 2;
|
||||
}
|
||||
ret = 0;
|
||||
out:
|
||||
if ((err = pthread_mutex_unlock(&ring->m)) != 0) {
|
||||
errno = err;
|
||||
fatal("Failed to unlock mutex");
|
||||
}
|
||||
if (ret == 0) {
|
||||
for (int i = 0; i < *iovec_len; i++) {
|
||||
assert(iovec[i].iov_base >= (void*)RING_GET(ring, 0));
|
||||
assert(iovec[i].iov_base + iovec[i].iov_len - 1 <= (void*)RING_GET(ring, ring->size - 1));
|
||||
}
|
||||
}
|
||||
return ret;
|
||||
}
|
35
alpine/packages/tap-vsockd/ring.h
Normal file
35
alpine/packages/tap-vsockd/ring.h
Normal file
@ -0,0 +1,35 @@
|
||||
#ifndef TAP_VSOCKD_RING_H
#define TAP_VSOCKD_RING_H
/* Include guard added: the original header had none, so a double include
   would redeclare everything. */

#include <unistd.h>
#include <sys/uio.h>

/* A fixed-size circular buffer shared by one producer and one consumer. */
struct ring;

/* Allocate a circular buffer with the given payload size.
   Size must be < INT_MAX / 2. */
extern struct ring *ring_allocate(int size);

/* Signal that n new bytes of data have been produced */
extern void ring_producer_advance(struct ring *ring, int n);

/* Signal that n bytes of data have been consumed */
extern void ring_consumer_advance(struct ring *ring, int n);

/* The producer signals end-of-stream. This will cause
   ring_consumer_wait_available and ring_producer_wait_available to return
   an error. */
extern void ring_producer_eof(struct ring *ring);

/* Wait for n bytes of space for new data to become available. If
   ring_producer_eof has been called, return non-zero. If space is available
   then fill the first *iovec_len entries of the iovec and set *iovec_len to
   the number of iovecs used. */
extern int ring_producer_wait_available(
	struct ring *ring, size_t n, struct iovec *iovec, int *iovec_len
);

/* Wait for n bytes to become available for reading. If ring_producer_eof has
   been called, return non-zero. If data is available then fill the first
   *iovec_len entries of the iovec and set *iovec_len to the number of iovecs
   used. */
extern int ring_consumer_wait_available(
	struct ring *ring, size_t n, struct iovec *iovec, int *iovec_len
);

#endif /* TAP_VSOCKD_RING_H */
|
@ -10,12 +10,15 @@
|
||||
#include <unistd.h>
|
||||
#include <fcntl.h>
|
||||
#include <err.h>
|
||||
#include <sys/uio.h>
|
||||
#include <stdint.h>
|
||||
#include <pthread.h>
|
||||
#include <arpa/inet.h>
|
||||
#include <sys/ioctl.h>
|
||||
#include <net/if.h>
|
||||
#include <linux/if_tun.h>
|
||||
#include <net/if_arp.h>
|
||||
#include <assert.h>
|
||||
|
||||
#include <sys/types.h>
|
||||
#include <sys/socket.h>
|
||||
@ -24,13 +27,41 @@
|
||||
|
||||
#include "hvsock.h"
|
||||
#include "protocol.h"
|
||||
#include "ring.h"
|
||||
|
||||
int daemon_flag;
|
||||
int nofork_flag;
|
||||
int listen_flag;
|
||||
int connect_flag;
|
||||
|
||||
char *default_sid = "30D48B34-7D27-4B0B-AAAF-BBBED334DD59";
|
||||
|
||||
/* Support big frames if the server requests it */
|
||||
const int max_packet_size = 16384;
|
||||
|
||||
static int verbose;
|
||||
#define INFO(...) \
|
||||
do { \
|
||||
if (verbose) { \
|
||||
printf(__VA_ARGS__); \
|
||||
fflush(stdout); \
|
||||
} \
|
||||
} while (0)
|
||||
#define DBG(...) \
|
||||
do { \
|
||||
if (verbose > 1) { \
|
||||
printf(__VA_ARGS__); \
|
||||
fflush(stdout); \
|
||||
} \
|
||||
} while (0)
|
||||
#define TRC(...) \
|
||||
do { \
|
||||
if (verbose > 2) { \
|
||||
printf(__VA_ARGS__); \
|
||||
fflush(stdout); \
|
||||
} \
|
||||
} while (0)
|
||||
|
||||
void fatal(const char *msg)
|
||||
{
|
||||
syslog(LOG_CRIT, "%s Error: %d. %s", msg, errno, strerror(errno));
|
||||
@ -79,6 +110,23 @@ void set_macaddr(const char *dev, uint8_t *mac)
|
||||
close(fd);
|
||||
}
|
||||
|
||||
void set_mtu(const char *dev, int mtu)
|
||||
{
|
||||
struct ifreq ifq;
|
||||
int fd;
|
||||
|
||||
fd = socket(PF_INET, SOCK_DGRAM, 0);
|
||||
if (fd == -1)
|
||||
fatal("Could not get socket to set MTU");
|
||||
strcpy(ifq.ifr_name, dev);
|
||||
ifq.ifr_mtu = mtu;
|
||||
|
||||
if (ioctl(fd, SIOCSIFMTU, &ifq) == -1)
|
||||
fatal("SIOCSIFMTU failed");
|
||||
|
||||
close(fd);
|
||||
}
|
||||
|
||||
/* Negotiate a vmnet connection, returns 0 on success and 1 on error. */
|
||||
int negotiate(int fd, struct vif_info *vif)
|
||||
{
|
||||
@ -122,57 +170,181 @@ err:
|
||||
return 1;
|
||||
}
|
||||
|
||||
|
||||
/* Argument passed to proxy threads */
|
||||
struct connection {
|
||||
int fd; /* Hyper-V socket with vmnet protocol */
|
||||
int tapfd; /* TAP device with ethernet frames */
|
||||
struct vif_info vif; /* Contains MAC, MTU etc, received from server */
|
||||
struct ring* from_vmnet_ring;
|
||||
struct ring* to_vmnet_ring;
|
||||
int message_size; /* Maximum size of a Hyper-V read or write */
|
||||
};
|
||||
|
||||
/* Trim the iovec array in place so that it describes at most len bytes.
   *iovec_len is updated to the number of entries still in use; entries
   beyond the limit are dropped. */
void trim_iovec(struct iovec *iovec, int *iovec_len, size_t len)
{
	for (int i = 0; i < *iovec_len; i++) {
		if (iovec[i].iov_len >= len) {
			/* This entry reaches the remaining budget: clamp it
			   and drop the rest. Using >= (the original used >)
			   avoids keeping a useless zero-length trailing entry
			   when an entry's length equals the budget exactly. */
			iovec[i].iov_len = len;
			*iovec_len = i + 1;
			return;
		}
		len -= iovec[i].iov_len;
	}
}
|
/* Total number of bytes described by the first iovec_len entries. */
size_t len_iovec(struct iovec *iovec, int iovec_len)
{
	size_t total = 0;

	for (int i = 0; i < iovec_len; i++)
		total += iovec[i].iov_len;
	return total;
}
|
||||
|
||||
/* Read bytes from vmnet into the from_vmnet_ring */
|
||||
static void* vmnet_to_ring(void *arg)
|
||||
{
|
||||
struct connection *c = (struct connection *)arg;
|
||||
struct ring *ring = c->from_vmnet_ring;
|
||||
struct iovec iovec[2]; /* We won't need more than 2 for the ring */
|
||||
int iovec_len;
|
||||
while (1) {
|
||||
iovec_len = sizeof(iovec) / sizeof(struct iovec);
|
||||
TRC("vmnet_to_ring: ring_producer_wait_available n=%d iovec_len=%d\n", 1, iovec_len);
|
||||
if (ring_producer_wait_available(ring, 1, &iovec[0], &iovec_len) != 0) {
|
||||
fatal("Failed to read a data from vmnet");
|
||||
}
|
||||
trim_iovec(iovec, &iovec_len, c->message_size);
|
||||
{
|
||||
int length = 0;
|
||||
for (int i = 0; i < iovec_len; i ++) {
|
||||
length += iovec[i].iov_len;
|
||||
}
|
||||
TRC("vmnet_to_ring readv len %d\n", length);
|
||||
}
|
||||
ssize_t n = readv(c->fd, &iovec[0], iovec_len);
|
||||
TRC("vmnet_to_ring: read %zd\n", n);
|
||||
if (n == 0) {
|
||||
syslog(LOG_CRIT, "EOF reading from socket: closing\n");
|
||||
ring_producer_eof(ring);
|
||||
goto err;
|
||||
}
|
||||
if (n < 0) {
|
||||
syslog(LOG_CRIT,
|
||||
"Failure reading from socket: closing: %s (%d)",
|
||||
strerror(errno), errno);
|
||||
ring_producer_eof(ring);
|
||||
goto err;
|
||||
}
|
||||
TRC("vmnet_to_ring: advance producer %zd\n", n);
|
||||
ring_producer_advance(ring, (size_t) n);
|
||||
}
|
||||
err:
|
||||
/*
|
||||
* On error: stop reading from the socket and trigger a clean
|
||||
* shutdown
|
||||
*/
|
||||
TRC("vmnet_to_ring: shutdown\n");
|
||||
shutdown(c->fd, SHUT_RD);
|
||||
return NULL;
|
||||
}
|
||||
|
||||
/* Decode packets on the from_vmnet_ring and write to the tap device */
|
||||
static void* ring_to_tap(void *arg)
|
||||
{
|
||||
struct connection *c = (struct connection *)arg;
|
||||
struct iovec iovec[2]; /* We won't need more than 2 for the ring */
|
||||
int iovec_len;
|
||||
int length;
|
||||
struct ring *ring = c->from_vmnet_ring;
|
||||
while (1) {
|
||||
/* Read the packet length: this requires 2 bytes */
|
||||
iovec_len = sizeof(iovec) / sizeof(struct iovec);
|
||||
TRC("ring_to_tap: ring_consumer_wait_available n=%d iovec_len=%d\n", 2, iovec_len);
|
||||
if (ring_consumer_wait_available(ring, 2, &iovec[0], &iovec_len) != 0) {
|
||||
fatal("Failed to read a packet header from host");
|
||||
|
||||
length = (header[0] & 0xff) | ((header[1] & 0xff) << 8);
|
||||
if (length > sizeof(buffer)) {
|
||||
}
|
||||
length = *((uint8_t*)iovec[0].iov_base) & 0xff;
|
||||
/* The second byte might be in the second iovec array */
|
||||
if (iovec[0].iov_len >= 2) {
|
||||
length |= (*((uint8_t*)iovec[0].iov_base + 1) & 0xff) << 8;
|
||||
} else {
|
||||
length |= (*((uint8_t*)iovec[1].iov_base) & 0xff) << 8;
|
||||
}
|
||||
assert(length > 0);
|
||||
TRC("ring_to_tap: packet of length %d\n", length);
|
||||
if (length > max_packet_size) {
|
||||
syslog(LOG_CRIT,
|
||||
"Received an over-large packet: %d > %ld",
|
||||
length, sizeof(buffer));
|
||||
length, max_packet_size);
|
||||
exit(1);
|
||||
}
|
||||
ring_consumer_advance(ring, 2);
|
||||
|
||||
if (really_read(connection->fd, &buffer[0], length) == -1) {
|
||||
syslog(LOG_CRIT,
|
||||
"Failed to read packet contents from host");
|
||||
exit(1);
|
||||
/* Read the variable length packet */
|
||||
iovec_len = sizeof(iovec) / sizeof(struct iovec);
|
||||
TRC("ring_to_tap: ring_consumer_wait_available n=%d iovec_len=%d\n", length, iovec_len);
|
||||
if (ring_consumer_wait_available(ring, length, &iovec[0], &iovec_len) != 0) {
|
||||
fatal("Failed to read a packet body from host");
|
||||
}
|
||||
|
||||
n = write(connection->tapfd, &buffer[0], length);
|
||||
assert(len_iovec(&iovec[0], iovec_len) >= length);
|
||||
trim_iovec(iovec, &iovec_len, length);
|
||||
ssize_t n = writev(c->tapfd, &iovec[0], iovec_len);
|
||||
if (n != length) {
|
||||
syslog(LOG_CRIT,
|
||||
"Failed to write %d bytes to tap device (wrote %d)", length, n);
|
||||
exit(1);
|
||||
//exit(1);
|
||||
}
|
||||
TRC("ring_to_tap: ring_consumer_advance n=%zd\n", n);
|
||||
ring_consumer_advance(ring, (size_t) length);
|
||||
}
|
||||
return NULL;
|
||||
}
|
||||
|
||||
static void *tap_to_vmnet(void *arg)
|
||||
/* Write packets with header from the tap device onto the to_vmnet_ring */
|
||||
static void *tap_to_ring(void *arg)
|
||||
{
|
||||
struct connection *connection = (struct connection *)arg;
|
||||
uint8_t buffer[2048];
|
||||
uint8_t header[2];
|
||||
int length;
|
||||
struct ring *ring = connection->to_vmnet_ring;
|
||||
struct iovec iovec[2]; /* We won't need more than 2 for the ring */
|
||||
int iovec_len;
|
||||
struct iovec payload[2]; /* The packet body after the 2 byte header */
|
||||
int payload_len;
|
||||
size_t length;
|
||||
while (1) {
|
||||
/* Wait for space for a 2 byte header + max_packet_size */
|
||||
length = 2 + connection->vif.max_packet_size;
|
||||
iovec_len = sizeof(iovec) / sizeof(struct iovec);
|
||||
TRC("tap_to_ring: ring_producer_wait_available n=%zd iovec_len=%d\n", length, iovec_len);
|
||||
if (ring_producer_wait_available(ring, length, &iovec[0], &iovec_len) != 0) {
|
||||
fatal("Failed to find enough free space for a packet");
|
||||
}
|
||||
assert(iovec_len > 0);
|
||||
assert(iovec[0].iov_len > 0);
|
||||
memcpy(&payload[0], &iovec[0], sizeof(struct iovec) * iovec_len);
|
||||
payload_len = iovec_len;
|
||||
|
||||
/* take the first 2 bytes of the free space which will contain the header */
|
||||
char *header1 = payload[0].iov_base;
|
||||
payload[0].iov_base++;
|
||||
payload[0].iov_len--;
|
||||
if (payload[0].iov_len == 0) {
|
||||
assert(payload_len == 2); /* because `length` > 1 */
|
||||
payload[0].iov_base = payload[1].iov_base;
|
||||
payload[0].iov_len = payload[1].iov_len;
|
||||
payload_len --;
|
||||
}
|
||||
char *header2 = payload[0].iov_base;
|
||||
payload[0].iov_base++;
|
||||
payload[0].iov_len--;
|
||||
/* payload is now where the packet should go */
|
||||
|
||||
/* limit the message size */
|
||||
trim_iovec(payload, &payload_len, connection->message_size);
|
||||
|
||||
length = readv(connection->tapfd, payload, payload_len);
|
||||
|
||||
for (;;) {
|
||||
length = read(connection->tapfd, &buffer[0], sizeof(buffer));
|
||||
if (length == -1) {
|
||||
if (errno == ENXIO)
|
||||
fatal("tap device has gone down");
|
||||
@ -182,18 +354,43 @@ static void *tap_to_vmnet(void *arg)
|
||||
* This is what mirage-net-unix does. Is it a good
|
||||
* idea really?
|
||||
*/
|
||||
continue;
|
||||
exit(1);
|
||||
}
|
||||
*header1 = (length >> 0) & 0xff;
|
||||
*header2 = (length >> 8) & 0xff;
|
||||
TRC("tap_to_ring: ring_producer_advance n=%zd\n", length + 2);
|
||||
|
||||
ring_producer_advance(ring, (size_t) (length + 2));
|
||||
}
|
||||
return NULL;
|
||||
}
|
||||
|
||||
header[0] = (length >> 0) & 0xff;
|
||||
header[1] = (length >> 8) & 0xff;
|
||||
if (really_write(connection->fd, &header[0], 2) == -1)
|
||||
fatal("Failed to write packet header");
|
||||
|
||||
if (really_write(connection->fd, &buffer[0], length) == -1)
|
||||
fatal("Failed to write packet body");
|
||||
/* Write bytes from the to_vmnet_ring to the vmnet fd */
|
||||
static void *ring_to_vmnet(void *arg)
|
||||
{
|
||||
struct connection *c = (struct connection *)arg;
|
||||
struct iovec iovec[2]; /* We won't need more than 2 for the ring */
|
||||
int iovec_len;
|
||||
int length;
|
||||
struct ring *ring = c->to_vmnet_ring;
|
||||
while (1) {
|
||||
/* Read the packet length: this requires 2 bytes */
|
||||
iovec_len = sizeof(iovec) / sizeof(struct iovec);
|
||||
TRC("ring_to_vmnet: ring_producer_wait_available n=%d iovec_len=%d\n", 1, iovec_len);
|
||||
if (ring_consumer_wait_available(ring, 1, &iovec[0], &iovec_len) != 0) {
|
||||
fatal("Failed to read data from ring");
|
||||
}
|
||||
trim_iovec(iovec, &iovec_len, c->message_size);
|
||||
length = 0;
|
||||
for (int i = 0; i < iovec_len; i++ ) {
|
||||
length += iovec[i].iov_len;
|
||||
}
|
||||
TRC("ring_to_vmnet: read %d bytes\n", length);
|
||||
ssize_t n = writev(c->fd, &iovec[0], iovec_len);
|
||||
|
||||
TRC("ring_to_vmnet: advance consumer %zd\n", n);
|
||||
ring_consumer_advance(ring, (size_t) n);
|
||||
}
|
||||
return NULL;
|
||||
}
|
||||
|
||||
@ -202,19 +399,31 @@ static void *tap_to_vmnet(void *arg)
|
||||
*/
|
||||
static void handle(struct connection *connection)
|
||||
{
|
||||
pthread_t v2t, t2v;
|
||||
pthread_t v2r, r2t, t2r, r2v;
|
||||
|
||||
if (pthread_create(&v2t, NULL, vmnet_to_tap, connection) != 0)
|
||||
if (pthread_create(&t2r, NULL, tap_to_ring, connection) != 0)
|
||||
fatal("Failed to create the tap_to_ring thread");
|
||||
|
||||
if (pthread_create(&v2r, NULL, vmnet_to_ring, connection) != 0)
|
||||
fatal("Failed to create the vmnet_to_tap thread");
|
||||
|
||||
if (pthread_create(&t2v, NULL, tap_to_vmnet, connection) != 0)
|
||||
fatal("Failed to create the tap_to_vmnet thread");
|
||||
if (pthread_create(&r2t, NULL, ring_to_tap, connection) != 0)
|
||||
fatal("Failed to create the ring_to_tap thread");
|
||||
|
||||
if (pthread_join(v2t, NULL) != 0)
|
||||
fatal("Failed to join the vmnet_to_tap thread");
|
||||
if (pthread_create(&r2v, NULL, ring_to_vmnet, connection) != 0)
|
||||
fatal("Failed to create the ring_to_vmnet thread");
|
||||
|
||||
if (pthread_join(t2v, NULL) != 0)
|
||||
fatal("Failed to join the tap_to_vmnet thread");
|
||||
if (pthread_join(t2r, NULL) != 0)
|
||||
fatal("Failed to join the tap_to_ring thread");
|
||||
|
||||
if (pthread_join(v2r, NULL) != 0)
|
||||
fatal("Failed to join the vmnet_to_ring thread");
|
||||
|
||||
if (pthread_join(t2r, NULL) != 0)
|
||||
fatal("Failed to join the tap_to_ring thread");
|
||||
|
||||
if (pthread_join(r2v, NULL) != 0)
|
||||
fatal("Failed to join the ring_to_vmnet thread");
|
||||
}
|
||||
|
||||
static int create_listening_socket(GUID serviceid)
|
||||
@ -338,14 +547,18 @@ void usage(char *name)
|
||||
{
|
||||
printf("%s usage:\n", name);
|
||||
printf("\t[--daemon] [--tap <name>] [--serviceid <guid>] [--pid <file>]\n");
|
||||
printf("\t[--message-size <bytes>] [--buffer-size <bytes>]\n");
|
||||
printf("\t[--listen | --connect]\n\n");
|
||||
printf("where\n");
|
||||
printf("\t--daemonize: run as a background daemon\n");
|
||||
printf("\t--nofork: don't run handlers in subprocesses\n");
|
||||
printf("\t--tap <name>: create a tap device with the given name\n");
|
||||
printf("\t (defaults to eth1)\n");
|
||||
printf("\t--serviceid <guid>: use <guid> as the well-known service GUID\n");
|
||||
printf("\t (defaults to %s)\n", default_sid);
|
||||
printf("\t--pid <file>: write a pid to the given file\n");
|
||||
printf("\t--message-size <bytes>: dictates the maximum transfer size for AF_HVSOCK\n");
|
||||
printf("\t--buffer-size <bytes>: dictates the buffer size for AF_HVSOCK\n");
|
||||
printf("\t--listen: listen forever for incoming AF_HVSOCK connections\n");
|
||||
printf("\t--connect: connect to the parent partition\n");
|
||||
}
|
||||
@ -362,6 +575,8 @@ int main(int argc, char **argv)
|
||||
int status;
|
||||
pid_t child;
|
||||
int tapfd;
|
||||
int ring_size = 1048576;
|
||||
int message_size = 8192; /* Well known to work across Hyper-V versions */
|
||||
GUID sid;
|
||||
int c;
|
||||
|
||||
@ -370,11 +585,14 @@ int main(int argc, char **argv)
|
||||
static struct option long_options[] = {
|
||||
/* These options set a flag. */
|
||||
{"daemon", no_argument, &daemon_flag, 1},
|
||||
{"nofork", no_argument, &nofork_flag, 1},
|
||||
{"serviceid", required_argument, NULL, 's'},
|
||||
{"tap", required_argument, NULL, 't'},
|
||||
{"pidfile", required_argument, NULL, 'p'},
|
||||
{"listen", no_argument, &listen_flag, 1},
|
||||
{"connect", no_argument, &connect_flag, 1},
|
||||
{"buffer-size", required_argument, NULL, 'b'},
|
||||
{"message-size", required_argument, NULL, 'm'},
|
||||
{0, 0, 0, 0}
|
||||
};
|
||||
|
||||
@ -382,7 +600,7 @@ int main(int argc, char **argv)
|
||||
while (1) {
|
||||
option_index = 0;
|
||||
|
||||
c = getopt_long(argc, argv, "ds:t:p:",
|
||||
c = getopt_long(argc, argv, "ds:t:p:r:m:v",
|
||||
long_options, &option_index);
|
||||
if (c == -1)
|
||||
break;
|
||||
@ -391,6 +609,9 @@ int main(int argc, char **argv)
|
||||
case 'd':
|
||||
daemon_flag = 1;
|
||||
break;
|
||||
case 'n':
|
||||
nofork_flag = 1;
|
||||
break;
|
||||
case 's':
|
||||
serviceid = optarg;
|
||||
break;
|
||||
@ -400,6 +621,15 @@ int main(int argc, char **argv)
|
||||
case 'p':
|
||||
pidfile = optarg;
|
||||
break;
|
||||
case 'b':
|
||||
ring_size = atoi(optarg);
|
||||
break;
|
||||
case 'm':
|
||||
message_size = atoi(optarg);
|
||||
break;
|
||||
case 'v':
|
||||
verbose ++;
|
||||
break;
|
||||
case 0:
|
||||
break;
|
||||
default:
|
||||
@ -432,7 +662,9 @@ int main(int argc, char **argv)
|
||||
|
||||
tapfd = alloc_tap(tap);
|
||||
connection.tapfd = tapfd;
|
||||
|
||||
connection.to_vmnet_ring = ring_allocate(ring_size);
|
||||
connection.from_vmnet_ring = ring_allocate(ring_size);
|
||||
connection.message_size = message_size;
|
||||
if (listen_flag) {
|
||||
syslog(LOG_INFO, "starting in listening mode with serviceid=%s and tap=%s", serviceid, tap);
|
||||
lsocket = create_listening_socket(sid);
|
||||
@ -462,13 +694,17 @@ int main(int argc, char **argv)
|
||||
connection.vif.mac[3], connection.vif.mac[4], connection.vif.mac[5]
|
||||
);
|
||||
set_macaddr(tap, &connection.vif.mac[0]);
|
||||
set_mtu(tap, connection.vif.mtu);
|
||||
|
||||
/* Daemonize after we've made our first reliable connection */
|
||||
if (daemon_flag) {
|
||||
daemon_flag = 0;
|
||||
daemonize(pidfile);
|
||||
}
|
||||
|
||||
if (nofork_flag) {
|
||||
handle(&connection);
|
||||
exit(1);
|
||||
}
|
||||
/*
|
||||
* Run the multithreaded part in a subprocess. On error the
|
||||
* process will exit() which tears down all the threads
|
||||
|
Loading…
Reference in New Issue
Block a user