proxy: add a multiplexing server frontend

On a Hyper-V system we can only register one listening endpoint (with
a GUID), so we need to accept connections, read a header and then
start the proxy.

If the binary has argv[0] == "proxy-vsockd" then run this new frontend.

Signed-off-by: David Scott <dave.scott@docker.com>
This commit is contained in:
David Scott 2016-05-17 22:14:59 +01:00
parent c779594ab4
commit 980588b68f
7 changed files with 240 additions and 77 deletions

View File

@ -37,6 +37,7 @@ COPY kernel/kernel-source-info /etc/
ADD kernel/kernel-patches.tar /etc/kernel-patches
COPY packages/proxy/proxy /sbin/
COPY packages/proxy/proxy /sbin/proxy-vsockd
COPY packages/proxy/etc /etc/
COPY packages/transfused/transfused /sbin/
COPY packages/transfused/etc /etc/

View File

@ -7,10 +7,31 @@ depend()
start()
{
ebegin "Setting up proxy port mount"
ebegin "Setting up proxy port service"
mkdir -p /port
mount -t 9p -o trans=virtio,dfltuid=1001,dfltgid=50,version=9p2000 port /port
eend $? "Failed to mount proxy port filesystem"
[ -n "${PIDFILE}" ] || PIDFILE=/var/run/proxy-vsockd.pid
[ -n "${LOGFILE}" ] || LOGFILE=/var/log/proxy-vsockd.log
start-stop-daemon --start --quiet \
--background \
--exec /sbin/proxy-vsockd \
--make-pidfile --pidfile ${PIDFILE} \
--stderr "${LOGFILE}" --stdout "${LOGFILE}" \
-- -vsockPort 62373 -hvGuid 0B95756A-9985-48AD-9470-78E060895BE7
eend $? "Failed to start proxy port service"
}
stop()
{
ebegin "Stopping proxy port service"
[ -n "${PIDFILE}" ] || PIDFILE=/var/run/proxy-vsockd.pid
start-stop-daemon --stop --quiet --pidfile ${PIDFILE}
eend $? "Failed to stop proxy-vsockd"
}

View File

@ -35,10 +35,10 @@ func NewTCPProxy(listener net.Listener, backendAddr *net.TCPAddr) (*TCPProxy, er
}, nil
}
func (proxy *TCPProxy) clientLoop(client Conn, quit chan bool) {
backend, err := net.DialTCP("tcp", nil, proxy.backendAddr)
func HandleTCPConnection(client Conn, backendAddr *net.TCPAddr, quit chan bool) {
backend, err := net.DialTCP("tcp", nil, backendAddr)
if err != nil {
logrus.Printf("Can't forward traffic to backend tcp/%v: %s\n", proxy.backendAddr, err)
logrus.Printf("Can't forward traffic to backend tcp/%v: %s\n", backendAddr, err)
client.Close()
return
}
@ -89,7 +89,7 @@ func (proxy *TCPProxy) Run() {
logrus.Printf("Stopping proxy on tcp/%v for tcp/%v (%s)", proxy.frontendAddr, proxy.backendAddr, err)
return
}
go proxy.clientLoop(client.(Conn), quit)
go HandleTCPConnection(client.(Conn), proxy.backendAddr, quit)
}
}

View File

@ -75,6 +75,19 @@ func (u *udpEncapsulator) Close() error {
return nil
}
func NewUDPConn(conn net.Conn) udpListener {
var m sync.Mutex
var r sync.Mutex
var w sync.Mutex
return &udpEncapsulator{
conn: &conn,
listener: nil,
m: &m,
r: &r,
w: &w,
}
}
func NewUDPListener(listener net.Listener) udpListener {
var m sync.Mutex
var r sync.Mutex

View File

@ -1,80 +1,14 @@
package main
import (
"errors"
"fmt"
"log"
"net"
"os"
"proxy/libproxy"
"strings"
"github.com/rneugeba/virtsock/go/vsock"
"path"
)
func main() {
host, port, container := parseHostContainerAddrs()
vsockP, err := libproxy.NewVsockProxy(&vsock.VsockAddr{Port: uint(port)}, container)
if err != nil {
sendError(err)
if path.Base(os.Args[0]) == "proxy-vsockd" {
manyPorts()
return
}
ipP, err := libproxy.NewIPProxy(host, container)
if err != nil {
sendError(err)
}
ctl, err := exposePort(host, port)
if err != nil {
sendError(err)
}
go handleStopSignals(ipP)
// TODO: avoid this line if we are running in a TTY
sendOK()
go ipP.Run()
vsockP.Run()
ctl.Close() // ensure ctl remains alive and un-GCed until here
os.Exit(0)
}
func exposePort(host net.Addr, port int) (*os.File, error) {
name := host.Network() + ":" + host.String()
log.Printf("exposePort %s\n", name)
err := os.Mkdir("/port/"+name, 0)
if err != nil {
log.Printf("Failed to mkdir /port/%s: %#v\n", name, err)
return nil, err
}
ctl, err := os.OpenFile("/port/"+name+"/ctl", os.O_RDWR, 0)
if err != nil {
log.Printf("Failed to open /port/%s/ctl: %#v\n", name, err)
return nil, err
}
_, err = ctl.WriteString(fmt.Sprintf("%s:%08x", name, port))
if err != nil {
log.Printf("Failed to open /port/%s/ctl: %#v\n", name, err)
return nil, err
}
_, err = ctl.Seek(0, 0)
if err != nil {
log.Printf("Failed to seek on /port/%s/ctl: %#v\n", name, err)
return nil, err
}
results := make([]byte, 100)
count, err := ctl.Read(results)
if err != nil {
log.Printf("Failed to read from /port/%s/ctl: %#v\n", name, err)
return nil, err
}
// We deliberately keep the control file open since 9P clunk
// will trigger a shutdown on the host side.
response := string(results[0:count])
if strings.HasPrefix(response, "ERROR ") {
os.Remove("/port/" + name + "/ctl")
response = strings.Trim(response[6:], " \t\r\n")
return nil, errors.New(response)
}
// Hold on to a reference to prevent premature GC and close
return ctl, nil
onePort()
}

View File

@ -0,0 +1,114 @@
package main
import (
"encoding/binary"
"flag"
"github.com/rneugeba/virtsock/go/vsock"
"github.com/rneugeba/virtsock/go/hvsock"
"log"
"net"
"proxy/libproxy"
)
// Listen on virtio-vsock and AF_HYPERV for multiplexed connections
func manyPorts() {
var (
vsockPort = flag.Int("vsockPort", 62373, "virtio-vsock port")
hvGuid = flag.String("hvGuid", "0B95756A-9985-48AD-9470-78E060895BE7", "Hyper-V service GUID")
)
flag.Parse()
listeners := make([]net.Listener, 0)
vsock, err := vsock.Listen(uint(*vsockPort))
if err != nil {
log.Printf("Failed to bind to vsock port %d: %#v", vsockPort, err)
} else {
listeners = append(listeners, vsock)
}
svcid, _ := hvsock.GuidFromString(*hvGuid)
hvsock, err := hvsock.Listen(hvsock.HypervAddr{VmId: hvsock.GUID_WILDCARD, ServiceId: svcid})
if err != nil {
log.Printf("Failed to bind hvsock guid: %s: %#v", *hvGuid, err)
} else {
listeners = append(listeners, hvsock)
}
quit := make(chan bool)
defer close(quit)
for _, l := range listeners {
go func(l net.Listener) {
for {
conn, err := l.Accept()
if err != nil {
log.Printf("Error accepting connection: %#v", err)
return // no more listening
}
go func(conn net.Conn) {
// Read header which describes TCP/UDP and destination IP:port
d, err := unmarshalDestination(conn)
if err != nil {
log.Printf("Failed to unmarshal header: %#v", err)
conn.Close()
return
}
switch d.Proto {
case TCP:
backendAddr := net.TCPAddr{IP: d.IP, Port: int(d.Port), Zone: ""}
libproxy.HandleTCPConnection(conn.(libproxy.Conn), &backendAddr, quit)
break
case UDP:
backendAddr := &net.UDPAddr{IP: d.IP, Port: int(d.Port), Zone: ""}
proxy, err := libproxy.NewUDPProxy(backendAddr, libproxy.NewUDPConn(conn), backendAddr)
if err != nil {
log.Printf("Failed to setup UDP proxy for %s: %#v", backendAddr, err)
conn.Close()
return
}
proxy.Run()
break
default:
log.Printf("Unknown protocol: %d", d.Proto)
conn.Close()
return
}
}(conn)
}
}(l)
}
forever := make(chan int)
<-forever
}
const (
TCP = 1
UDP = 2
)
type destination struct {
Proto uint8
IP net.IP
Port uint16
}
func unmarshalDestination(conn net.Conn) (destination, error) {
d := destination{}
if err := binary.Read(conn, binary.LittleEndian, &d.Proto); err != nil {
return d, err
}
var length uint16
// IP length
if err := binary.Read(conn, binary.LittleEndian, &length); err != nil {
return d, err
}
d.IP = make([]byte, length)
if err := binary.Read(conn, binary.LittleEndian, &d.IP); err != nil {
return d, err
}
if err := binary.Read(conn, binary.LittleEndian, &d.Port); err != nil {
return d, err
}
return d, nil
}

View File

@ -0,0 +1,80 @@
package main
import (
"errors"
"fmt"
"github.com/rneugeba/virtsock/go/vsock"
"log"
"net"
"os"
"proxy/libproxy"
"strings"
)
func onePort() {
host, port, container := parseHostContainerAddrs()
vsockP, err := libproxy.NewVsockProxy(&vsock.VsockAddr{Port: uint(port)}, container)
if err != nil {
sendError(err)
}
ipP, err := libproxy.NewIPProxy(host, container)
if err != nil {
sendError(err)
}
ctl, err := exposePort(host, port)
if err != nil {
sendError(err)
}
go handleStopSignals(ipP)
// TODO: avoid this line if we are running in a TTY
sendOK()
go ipP.Run()
vsockP.Run()
ctl.Close() // ensure ctl remains alive and un-GCed until here
os.Exit(0)
}
func exposePort(host net.Addr, port int) (*os.File, error) {
name := host.Network() + ":" + host.String()
log.Printf("exposePort %s\n", name)
err := os.Mkdir("/port/"+name, 0)
if err != nil {
log.Printf("Failed to mkdir /port/%s: %#v\n", name, err)
return nil, err
}
ctl, err := os.OpenFile("/port/"+name+"/ctl", os.O_RDWR, 0)
if err != nil {
log.Printf("Failed to open /port/%s/ctl: %#v\n", name, err)
return nil, err
}
_, err = ctl.WriteString(fmt.Sprintf("%s:%08x", name, port))
if err != nil {
log.Printf("Failed to open /port/%s/ctl: %#v\n", name, err)
return nil, err
}
_, err = ctl.Seek(0, 0)
if err != nil {
log.Printf("Failed to seek on /port/%s/ctl: %#v\n", name, err)
return nil, err
}
results := make([]byte, 100)
count, err := ctl.Read(results)
if err != nil {
log.Printf("Failed to read from /port/%s/ctl: %#v\n", name, err)
return nil, err
}
// We deliberately keep the control file open since 9P clunk
// will trigger a shutdown on the host side.
response := string(results[0:count])
if strings.HasPrefix(response, "ERROR ") {
os.Remove("/port/" + name + "/ctl")
response = strings.Trim(response[6:], " \t\r\n")
return nil, errors.New(response)
}
// Hold on to a reference to prevent premature GC and close
return ctl, nil
}