Merge pull request #89 from djs55/proxy-vsock

Switch the port forwarding docker-proxy replacement to vsock
This commit is contained in:
Dave Scott 2016-04-16 13:44:28 +01:00
commit b2abc289ef
10 changed files with 249 additions and 83 deletions

View File

@ -1,18 +1,15 @@
FROM golang:alpine
RUN apk update && apk add alpine-sdk
RUN mkdir -p /go/src/proxy
WORKDIR /go/src/proxy
COPY * /go/src/proxy/
RUN mkdir -p /go/src/pkg/proxy
COPY pkg/* /go/src/pkg/proxy/
RUN mkdir -p /go/src/vendor/github.com/Sirupsen/logrus
COPY vendor/github.com/Sirupsen/logrus/* /go/src/vendor/github.com/Sirupsen/logrus/
COPY ./ /go/src/proxy/
ARG GOARCH
ARG GOOS
RUN go install
RUN go install --ldflags '-extldflags "-fno-PIC"'
RUN [ -f /go/bin/*/proxy ] && mv /go/bin/*/proxy /go/bin/ || true

View File

@ -1,4 +1,4 @@
package proxy
package libproxy
import (
"bytes"

View File

@ -1,10 +1,11 @@
// Package proxy provides a network Proxy interface and implementations for TCP
// and UDP.
package proxy
package libproxy
import (
"fmt"
"net"
"github.com/djs55/vsock"
)
// Proxy defines the behavior of a proxy. It forwards traffic back and forth
@ -24,13 +25,25 @@ type Proxy interface {
BackendAddr() net.Addr
}
// NewProxy creates a Proxy according to the specified frontendAddr and backendAddr.
func NewProxy(frontendAddr, backendAddr net.Addr) (Proxy, error) {
switch frontendAddr.(type) {
case *net.UDPAddr:
return NewUDPProxy(frontendAddr.(*net.UDPAddr), backendAddr.(*net.UDPAddr))
case *net.TCPAddr:
return NewTCPProxy(frontendAddr.(*net.TCPAddr), backendAddr.(*net.TCPAddr))
listener, err := net.Listen("tcp", frontendAddr.String())
if err != nil {
return nil, err
}
return NewTCPProxy(listener, backendAddr.(*net.TCPAddr))
case *vsock.VsockAddr:
listener, err := vsock.Listen(frontendAddr.(*vsock.VsockAddr).Port)
if err != nil {
return nil, err
}
return NewTCPProxy(listener, backendAddr.(*net.TCPAddr))
default:
panic(fmt.Errorf("Unsupported protocol"))
}

View File

@ -1,4 +1,4 @@
package proxy
package libproxy
import (
"net"

View File

@ -1,4 +1,4 @@
package proxy
package libproxy
import (
"io"
@ -8,30 +8,34 @@ import (
"github.com/Sirupsen/logrus"
)
type Conn interface {
io.Reader
io.Writer
io.Closer
CloseRead() error
CloseWrite() error
}
// TCPProxy is a proxy for TCP connections. It implements the Proxy interface to
// handle TCP traffic forwarding between the frontend and backend addresses.
type TCPProxy struct {
listener *net.TCPListener
frontendAddr *net.TCPAddr
listener net.Listener
frontendAddr net.Addr
backendAddr *net.TCPAddr
}
// NewTCPProxy creates a new TCPProxy.
func NewTCPProxy(frontendAddr, backendAddr *net.TCPAddr) (*TCPProxy, error) {
listener, err := net.ListenTCP("tcp", frontendAddr)
if err != nil {
return nil, err
}
func NewTCPProxy(listener net.Listener, backendAddr *net.TCPAddr) (*TCPProxy, error) {
// If the port in frontendAddr was 0 then ListenTCP will have a picked
// a port to listen on, hence the call to Addr to get that actual port:
return &TCPProxy{
listener: listener,
frontendAddr: listener.Addr().(*net.TCPAddr),
frontendAddr: listener.Addr(),
backendAddr: backendAddr,
}, nil
}
func (proxy *TCPProxy) clientLoop(client *net.TCPConn, quit chan bool) {
func (proxy *TCPProxy) clientLoop(client Conn, quit chan bool) {
backend, err := net.DialTCP("tcp", nil, proxy.backendAddr)
if err != nil {
logrus.Printf("Can't forward traffic to backend tcp/%v: %s\n", proxy.backendAddr, err)
@ -40,7 +44,7 @@ func (proxy *TCPProxy) clientLoop(client *net.TCPConn, quit chan bool) {
}
event := make(chan int64)
var broker = func(to, from *net.TCPConn) {
var broker = func(to, from Conn) {
written, err := io.Copy(to, from)
if err != nil {
// If the socket we are writing to is shutdown with
@ -85,7 +89,7 @@ func (proxy *TCPProxy) Run() {
logrus.Printf("Stopping proxy on tcp/%v for tcp/%v (%s)", proxy.frontendAddr, proxy.backendAddr, err)
return
}
go proxy.clientLoop(client.(*net.TCPConn), quit)
go proxy.clientLoop(client.(Conn), quit)
}
}

View File

@ -1,4 +1,4 @@
package proxy
package libproxy
import (
"encoding/binary"

View File

@ -6,62 +6,59 @@ import (
"log"
"net"
"os"
"pkg/proxy"
"proxy/libproxy"
"strings"
"github.com/djs55/vsock"
)
func main() {
host, port, container := parseHostContainerAddrs()
err := exposePort(host, port)
ctl, err := exposePort(host, port)
if err != nil {
sendError(err)
}
p, err := proxy.NewProxy(host, container)
p, err := libproxy.NewProxy(&vsock.VsockAddr{Port: uint(port)}, container)
if err != nil {
unexposePort(host)
sendError(err)
}
go handleStopSignals(p)
sendOK()
p.Run()
unexposePort(host)
ctl.Close() // ensure ctl remains alive and un-GCed until here
os.Exit(0)
}
func exposePort(host net.Addr, port int) error {
func exposePort(host net.Addr, port int) (*os.File, error) {
name := host.String()
log.Printf("exposePort %s\n", name)
err := os.Mkdir("/port/"+name, 0)
if err != nil {
log.Printf("Failed to mkdir /port/%s: %#v\n", name, err)
return err
return nil, err
}
ctl, err := os.OpenFile("/port/"+name+"/ctl", os.O_RDWR, 0)
if err != nil {
log.Printf("Failed to open /port/%s/ctl: %#v\n", name, err)
return err
return nil, err
}
me, err := getMyAddress()
if err != nil {
log.Printf("Failed to determine my local address: %#v\n", err)
return err
}
_, err = ctl.WriteString(fmt.Sprintf("%s:%s:%d", name, me, port))
_, err = ctl.WriteString(fmt.Sprintf("%s:%08x", name, port))
if err != nil {
log.Printf("Failed to open /port/%s/ctl: %#v\n", name, err)
return err
return nil, err
}
_, err = ctl.Seek(0, 0)
if err != nil {
log.Printf("Failed to seek on /port/%s/ctl: %#v\n", name, err)
return err
return nil, err
}
results := make([]byte, 100)
count, err := ctl.Read(results)
if err != nil {
log.Printf("Failed to read from /port/%s/ctl: %#v\n", name, err)
return err
return nil, err
}
// We deliberately keep the control file open since 9P clunk
// will trigger a shutdown on the host side.
@ -70,42 +67,8 @@ func exposePort(host net.Addr, port int) error {
if strings.HasPrefix(response, "ERROR ") {
os.Remove("/port/" + name + "/ctl")
response = strings.Trim(response[6:], " \t\r\n")
return errors.New(response)
return nil, errors.New(response)
}
return nil
}
func unexposePort(host net.Addr) {
name := host.String()
log.Printf("unexposePort %s\n", name)
err := os.Remove("/port/" + name)
if err != nil {
log.Printf("Failed to remove /port/%s: %#v\n", name, err)
}
}
var myAddress string
// getMyAddress returns a string representing my address from the host's
// point of view. For now this is an IP address but it soon should be a vsock
// port.
func getMyAddress() (string, error) {
if myAddress != "" {
return myAddress, nil
}
d, err := os.Open("/port/docker")
if err != nil {
return "", err
}
defer d.Close()
bytes := make([]byte, 100)
count, err := d.Read(bytes)
if err != nil {
return "", err
}
s := string(bytes)[0:count]
bits := strings.Split(s, ":")
myAddress = bits[2]
return myAddress, nil
// Hold on to a reference to prevent premature GC and close
return ctl, nil
}

View File

@ -9,7 +9,7 @@ import (
"os/signal"
"syscall"
"pkg/proxy"
"proxy/libproxy"
)
// sendError signals the error to the parent and quits the process.
@ -28,6 +28,9 @@ func sendOK() {
f.Close()
}
// Map dynamic ports onto vsock ports over this offset
var vSockPortOffset = 0x10000
// From docker/libnetwork/portmapper/proxy.go:
// parseHostContainerAddrs parses the flags passed on reexec to create the TCP or UDP
@ -46,11 +49,11 @@ func parseHostContainerAddrs() (host net.Addr, port int, container net.Addr) {
switch *proto {
case "tcp":
host = &net.TCPAddr{IP: net.ParseIP(*hostIP), Port: *hostPort}
port = *hostPort
port = vSockPortOffset + *hostPort
container = &net.TCPAddr{IP: net.ParseIP(*containerIP), Port: *containerPort}
case "udp":
host = &net.UDPAddr{IP: net.ParseIP(*hostIP), Port: *hostPort}
port = *hostPort
port = vSockPortOffset + *hostPort
container = &net.UDPAddr{IP: net.ParseIP(*containerIP), Port: *containerPort}
default:
log.Fatalf("unsupported protocol %s", *proto)
@ -59,11 +62,13 @@ func parseHostContainerAddrs() (host net.Addr, port int, container net.Addr) {
return host, port, container
}
func handleStopSignals(p proxy.Proxy) {
func handleStopSignals(p libproxy.Proxy) {
s := make(chan os.Signal, 10)
signal.Notify(s, os.Interrupt, syscall.SIGTERM, syscall.SIGSTOP)
for range s {
p.Close()
os.Exit(0)
// The vsock proxy cannot be shutdown the same way as the TCP one:
//p.Close()
}
}

View File

@ -0,0 +1,171 @@
package vsock
import (
"errors"
"fmt"
"net"
"os"
"syscall"
"time"
)
/* No way to teach net or syscall about vsock sockaddr, so go right to C */
/*
#include <sys/socket.h>
struct sockaddr_vm {
sa_family_t svm_family;
unsigned short svm_reserved1;
unsigned int svm_port;
unsigned int svm_cid;
unsigned char svm_zero[sizeof(struct sockaddr) -
sizeof(sa_family_t) - sizeof(unsigned short) -
sizeof(unsigned int) - sizeof(unsigned int)];
};
int bind_sockaddr_vm(int fd, const struct sockaddr_vm *sa_vm) {
return bind(fd, (const struct sockaddr*)sa_vm, sizeof(*sa_vm));
}
int connect_sockaddr_vm(int fd, const struct sockaddr_vm *sa_vm) {
return connect(fd, (const struct sockaddr*)sa_vm, sizeof(*sa_vm));
}
int accept_vm(int fd, struct sockaddr_vm *sa_vm, socklen_t *sa_vm_len) {
return accept4(fd, (struct sockaddr *)sa_vm, sa_vm_len, 0);
}
*/
import "C"
const (
AF_VSOCK = 40
VSOCK_CID_ANY = 4294967295 /* 2^32-1 */
VSOCK_CID_SELF = 3
)
// Listen returns a net.Listener which can accept connections on the given
// vhan port.
func Listen(port uint) (net.Listener, error) {
accept_fd, err := syscall.Socket(AF_VSOCK, syscall.SOCK_STREAM, 0)
if err != nil {
return nil, err
}
sa := C.struct_sockaddr_vm{}
sa.svm_family = AF_VSOCK
sa.svm_port = C.uint(port)
sa.svm_cid = VSOCK_CID_ANY
if ret := C.bind_sockaddr_vm(C.int(accept_fd), &sa); ret != 0 {
return nil, errors.New(fmt.Sprintf("failed bind vsock connection to %08x.%08x, returned %d", sa.svm_cid, sa.svm_port, ret))
}
err = syscall.Listen(accept_fd, syscall.SOMAXCONN)
if err != nil {
return nil, err
}
return &vsockListener{accept_fd, port}, nil
}
// Conn is a vsock connection which support half-close.
type Conn interface {
net.Conn
CloseRead() error
CloseWrite() error
}
type vsockListener struct {
accept_fd int
port uint
}
func (v *vsockListener) Accept() (net.Conn, error) {
var accept_sa C.struct_sockaddr_vm
var accept_sa_len C.socklen_t
accept_sa_len = C.sizeof_struct_sockaddr_vm
fd, err := C.accept_vm(C.int(v.accept_fd), &accept_sa, &accept_sa_len)
if err != nil {
return nil, err
}
return newVsockConn(uintptr(fd), v.port)
}
func (v *vsockListener) Close() error {
// Note this won't cause the Accept to unblock.
return syscall.Close(v.accept_fd)
}
type VsockAddr struct {
Port uint
}
func (a VsockAddr) Network() string {
return "vsock"
}
func (a VsockAddr) String() string {
return fmt.Sprintf("%08x", a.Port)
}
func (v *vsockListener) Addr() net.Addr {
return VsockAddr{Port: v.port}
}
// a wrapper around FileConn which supports CloseRead and CloseWrite
type vsockConn struct {
vsock *os.File
fd uintptr
local VsockAddr
remote VsockAddr
}
type VsockConn struct {
vsockConn
}
func newVsockConn(fd uintptr, localPort uint) (*VsockConn, error) {
vsock := os.NewFile(fd, fmt.Sprintf("vsock:%d", fd))
local := VsockAddr{Port: localPort}
remote := VsockAddr{Port: uint(0)} // FIXME
return &VsockConn{vsockConn{vsock: vsock, fd: fd, local: local, remote: remote}}, nil
}
func (v *VsockConn) LocalAddr() net.Addr {
return v.local
}
func (v *VsockConn) RemoteAddr() net.Addr {
return v.remote
}
func (v *VsockConn) CloseRead() error {
return syscall.Shutdown(int(v.fd), syscall.SHUT_RD)
}
func (v *VsockConn) CloseWrite() error {
return syscall.Shutdown(int(v.fd), syscall.SHUT_WR)
}
func (v *VsockConn) Close() error {
return v.vsock.Close()
}
func (v *VsockConn) Read(buf []byte) (int, error) {
return v.vsock.Read(buf)
}
func (v *VsockConn) Write(buf []byte) (int, error) {
return v.vsock.Write(buf)
}
func (v *VsockConn) SetDeadline(t time.Time) error {
return nil // FIXME
}
func (v *VsockConn) SetReadDeadline(t time.Time) error {
return nil // FIXME
}
func (v *VsockConn) SetWriteDeadline(t time.Time) error {
return nil // FIXME
}

13
alpine/packages/proxy/vendor/manifest vendored Normal file
View File

@ -0,0 +1,13 @@
{
"version": 0,
"dependencies": [
{
"importpath": "github.com/djs55/vsock",
"repository": "ssh://github.com/djs55/vsock",
"vcs": "git",
"revision": "7ea787de194e9a251ea746cf37f464c1e6cb822a",
"branch": "master",
"notests": true
}
]
}