Merge pull request #8417 from jdef/proxy_port_allocator

proxy port range allocation
This commit is contained in:
Brian Grant 2015-06-02 14:17:41 -07:00
commit 0cb0a56895
6 changed files with 307 additions and 22 deletions

View File

@ -48,6 +48,7 @@ type ProxyServer struct {
ResourceContainer string
Master string
Kubeconfig string
PortRange util.PortRange
}
// NewProxyServer creates a new ProxyServer object with default parameters
@ -70,6 +71,7 @@ func (s *ProxyServer) AddFlags(fs *pflag.FlagSet) {
fs.IntVar(&s.OOMScoreAdj, "oom-score-adj", s.OOMScoreAdj, "The oom_score_adj value for kube-proxy process. Values must be within the range [-1000, 1000]")
fs.StringVar(&s.ResourceContainer, "resource-container", s.ResourceContainer, "Absolute name of the resource-only container to create and run the Kube-proxy in (Default: /kube-proxy).")
fs.StringVar(&s.Kubeconfig, "kubeconfig", s.Kubeconfig, "Path to kubeconfig file with authorization information (the master location is set by the master flag).")
fs.Var(&s.PortRange, "proxy-port-range", "Range of host ports (beginPort-endPort, inclusive) that may be consumed in order to proxy service traffic. If unspecified (0-0) then ports will be randomly chosen.")
}
// Run runs the specified ProxyServer. This should never exit.
@ -94,7 +96,7 @@ func (s *ProxyServer) Run(_ []string) error {
protocol = iptables.ProtocolIpv6
}
loadBalancer := proxy.NewLoadBalancerRR()
proxier, err := proxy.NewProxier(loadBalancer, net.IP(s.BindAddress), iptables.New(exec.New(), protocol))
proxier, err := proxy.NewProxier(loadBalancer, net.IP(s.BindAddress), iptables.New(exec.New(), protocol), s.PortRange)
if err != nil {
glog.Fatalf("Unable to create proxer: %v", err)
}

152
pkg/proxy/port_allocator.go Normal file
View File

@ -0,0 +1,152 @@
/*
Copyright 2015 The Kubernetes Authors All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package proxy
import (
"errors"
"math/big"
"math/rand"
"sync"
"time"
"github.com/GoogleCloudPlatform/kubernetes/pkg/util"
)
var (
errPortRangeNoPortsRemaining = errors.New("port allocation failed; there are no remaining ports left to allocate in the accepted range")
)
type PortAllocator interface {
AllocateNext() (int, error)
Release(int)
}
// randomAllocator is a PortAllocator implementation that allocates random ports, yielding
// a port value of 0 for every call to AllocateNext().
type randomAllocator struct{}
// AllocateNext always returns 0
func (r *randomAllocator) AllocateNext() (int, error) {
return 0, nil
}
// Release is a noop
func (r *randomAllocator) Release(_ int) {
// noop
}
// newPortAllocator builds PortAllocator for a given PortRange. If the PortRange is empty
// then a random port allocator is returned; otherwise, a new range-based allocator
// is returned.
func newPortAllocator(r util.PortRange) PortAllocator {
if r.Base == 0 {
return &randomAllocator{}
}
return newPortRangeAllocator(r)
}
const (
portsBufSize = 16
nextFreePortCooldown = 500 * time.Millisecond
allocateNextTimeout = 1 * time.Second
)
type rangeAllocator struct {
util.PortRange
ports chan int
used big.Int
lock sync.Mutex
rand *rand.Rand
}
func newPortRangeAllocator(r util.PortRange) PortAllocator {
if r.Base == 0 || r.Size == 0 {
panic("illegal argument: may not specify an empty port range")
}
ra := &rangeAllocator{
PortRange: r,
ports: make(chan int, portsBufSize),
rand: rand.New(rand.NewSource(time.Now().UnixNano())),
}
go util.Until(func() { ra.fillPorts(util.NeverStop) }, nextFreePortCooldown, util.NeverStop)
return ra
}
// fillPorts loops, always searching for the next free port and, if found, fills the ports buffer with it.
// this func blocks until either there are no remaining free ports, or else the stopCh chan is closed.
func (r *rangeAllocator) fillPorts(stopCh <-chan struct{}) {
for {
port := r.nextFreePort()
if port == -1 {
return
}
select {
case <-stopCh:
return
case r.ports <- port:
}
}
}
// nextFreePort finds a free port, first picking a random port. if that port is already in use
// then the port range is scanned sequentially until either a port is found or the scan completes
// unsuccessfully. an unsuccessful scan returns a port of -1.
func (r *rangeAllocator) nextFreePort() int {
r.lock.Lock()
defer r.lock.Unlock()
// choose random port
j := r.rand.Intn(r.Size)
if b := r.used.Bit(j); b == 0 {
r.used.SetBit(&r.used, j, 1)
return j + r.Base
}
// search sequentially
for i := j + 1; i < r.Size; i++ {
if b := r.used.Bit(i); b == 0 {
r.used.SetBit(&r.used, i, 1)
return i + r.Base
}
}
for i := 0; i < j; i++ {
if b := r.used.Bit(i); b == 0 {
r.used.SetBit(&r.used, i, 1)
return i + r.Base
}
}
return -1
}
func (r *rangeAllocator) AllocateNext() (port int, err error) {
select {
case port = <-r.ports:
case <-time.After(allocateNextTimeout):
err = errPortRangeNoPortsRemaining
}
return
}
func (r *rangeAllocator) Release(port int) {
port -= r.Base
if port < 0 || port >= r.Size {
return
}
r.lock.Lock()
defer r.lock.Unlock()
r.used.SetBit(&r.used, port, 0)
}

View File

@ -0,0 +1,101 @@
/*
Copyright 2015 The Kubernetes Authors All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package proxy
import (
"reflect"
"testing"
"github.com/GoogleCloudPlatform/kubernetes/pkg/util"
)
func TestRangeAllocatorEmpty(t *testing.T) {
r := &util.PortRange{}
r.Set("0-0")
defer func() {
if rv := recover(); rv == nil {
t.Fatalf("expected panic because of empty port range: %+v", r)
}
}()
_ = newPortRangeAllocator(*r)
}
func TestRangeAllocatorFullyAllocated(t *testing.T) {
r := &util.PortRange{}
r.Set("1-1")
a := newPortRangeAllocator(*r)
p, err := a.AllocateNext()
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
if p != 1 {
t.Fatalf("unexpected allocated port: %d", p)
}
_, err = a.AllocateNext()
if err == nil {
t.Fatalf("expected error because of fully-allocated range")
}
a.Release(p)
p, err = a.AllocateNext()
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
if p != 1 {
t.Fatalf("unexpected allocated port: %d", p)
}
_, err = a.AllocateNext()
if err == nil {
t.Fatalf("expected error because of fully-allocated range")
}
}
func TestRangeAllocator_RandomishAllocation(t *testing.T) {
r := &util.PortRange{}
r.Set("1-100")
a := newPortRangeAllocator(*r)
// allocate all the ports
var err error
ports := make([]int, 100, 100)
for i := 0; i < 100; i++ {
ports[i], err = a.AllocateNext()
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
}
// release them all
for i := 0; i < 100; i++ {
a.Release(ports[i])
}
// allocate the ports again
rports := make([]int, 100, 100)
for i := 0; i < 100; i++ {
rports[i], err = a.AllocateNext()
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
}
if reflect.DeepEqual(ports, rports) {
t.Fatalf("expected re-allocated ports to be in a somewhat random order")
}
}

View File

@ -74,6 +74,7 @@ type Proxier struct {
listenIP net.IP
iptables iptables.Interface
hostIP net.IP
proxyPorts PortAllocator
}
// A key for the portMap
@ -105,7 +106,7 @@ func IsProxyLocked(err error) bool {
// if iptables fails to update or acquire the initial lock. Once a proxier is
// created, it will keep iptables up to date in the background and will not
// terminate if a particular iptables call fails.
func NewProxier(loadBalancer LoadBalancer, listenIP net.IP, iptables iptables.Interface) (*Proxier, error) {
func NewProxier(loadBalancer LoadBalancer, listenIP net.IP, iptables iptables.Interface, pr util.PortRange) (*Proxier, error) {
if listenIP.Equal(localhostIPv4) || listenIP.Equal(localhostIPv6) {
return nil, ErrProxyOnLocalhost
}
@ -115,11 +116,17 @@ func NewProxier(loadBalancer LoadBalancer, listenIP net.IP, iptables iptables.In
return nil, fmt.Errorf("failed to select a host interface: %v", err)
}
proxyPorts := newPortAllocator(pr)
glog.V(2).Infof("Setting proxy IP to %v and initializing iptables", hostIP)
return createProxier(loadBalancer, listenIP, iptables, hostIP)
return createProxier(loadBalancer, listenIP, iptables, hostIP, proxyPorts)
}
func createProxier(loadBalancer LoadBalancer, listenIP net.IP, iptables iptables.Interface, hostIP net.IP) (*Proxier, error) {
func createProxier(loadBalancer LoadBalancer, listenIP net.IP, iptables iptables.Interface, hostIP net.IP, proxyPorts PortAllocator) (*Proxier, error) {
// convenient to pass nil for tests..
if proxyPorts == nil {
proxyPorts = newPortAllocator(util.PortRange{})
}
// Set up the iptables foundations we need.
if err := iptablesInit(iptables); err != nil {
return nil, fmt.Errorf("failed to initialize iptables: %v", err)
@ -136,6 +143,7 @@ func createProxier(loadBalancer LoadBalancer, listenIP net.IP, iptables iptables
listenIP: listenIP,
iptables: iptables,
hostIP: hostIP,
proxyPorts: proxyPorts,
}, nil
}
@ -189,7 +197,10 @@ func (proxier *Proxier) stopProxy(service ServicePortName, info *serviceInfo) er
// This assumes proxier.mu is locked.
func (proxier *Proxier) stopProxyInternal(service ServicePortName, info *serviceInfo) error {
delete(proxier.serviceMap, service)
return info.socket.Close()
err := info.socket.Close()
port := info.socket.ListenPort()
proxier.proxyPorts.Release(port)
return err
}
func (proxier *Proxier) getServiceInfo(service ServicePortName) (*serviceInfo, bool) {
@ -285,8 +296,15 @@ func (proxier *Proxier) OnUpdate(services []api.Service) {
glog.Errorf("Failed to stop service %q: %v", serviceName, err)
}
}
proxyPort, err := proxier.proxyPorts.AllocateNext()
if err != nil {
glog.Errorf("failed to allocate proxy port for service %q: %v", serviceName, err)
continue
}
glog.V(1).Infof("Adding new service %q at %s:%d/%s", serviceName, serviceIP, servicePort.Port, servicePort.Protocol)
info, err := proxier.addServiceOnPort(serviceName, servicePort.Protocol, 0, udpIdleTimeout)
info, err = proxier.addServiceOnPort(serviceName, servicePort.Protocol, proxyPort, udpIdleTimeout)
if err != nil {
glog.Errorf("Failed to start proxy for %q: %v", serviceName, err)
continue

View File

@ -206,7 +206,7 @@ func TestTCPProxy(t *testing.T) {
},
})
p, err := createProxier(lb, net.ParseIP("0.0.0.0"), &fakeIptables{}, net.ParseIP("127.0.0.1"))
p, err := createProxier(lb, net.ParseIP("0.0.0.0"), &fakeIptables{}, net.ParseIP("127.0.0.1"), nil)
if err != nil {
t.Fatal(err)
}
@ -233,7 +233,7 @@ func TestUDPProxy(t *testing.T) {
},
})
p, err := createProxier(lb, net.ParseIP("0.0.0.0"), &fakeIptables{}, net.ParseIP("127.0.0.1"))
p, err := createProxier(lb, net.ParseIP("0.0.0.0"), &fakeIptables{}, net.ParseIP("127.0.0.1"), nil)
if err != nil {
t.Fatal(err)
}
@ -265,7 +265,7 @@ func TestMultiPortProxy(t *testing.T) {
}},
}})
p, err := createProxier(lb, net.ParseIP("0.0.0.0"), &fakeIptables{}, net.ParseIP("127.0.0.1"))
p, err := createProxier(lb, net.ParseIP("0.0.0.0"), &fakeIptables{}, net.ParseIP("127.0.0.1"), nil)
if err != nil {
t.Fatal(err)
}
@ -292,7 +292,7 @@ func TestMultiPortOnUpdate(t *testing.T) {
serviceQ := ServicePortName{types.NamespacedName{"testnamespace", "echo"}, "q"}
serviceX := ServicePortName{types.NamespacedName{"testnamespace", "echo"}, "x"}
p, err := createProxier(lb, net.ParseIP("0.0.0.0"), &fakeIptables{}, net.ParseIP("127.0.0.1"))
p, err := createProxier(lb, net.ParseIP("0.0.0.0"), &fakeIptables{}, net.ParseIP("127.0.0.1"), nil)
if err != nil {
t.Fatal(err)
}
@ -355,7 +355,7 @@ func TestTCPProxyStop(t *testing.T) {
},
})
p, err := createProxier(lb, net.ParseIP("0.0.0.0"), &fakeIptables{}, net.ParseIP("127.0.0.1"))
p, err := createProxier(lb, net.ParseIP("0.0.0.0"), &fakeIptables{}, net.ParseIP("127.0.0.1"), nil)
if err != nil {
t.Fatal(err)
}
@ -393,7 +393,7 @@ func TestUDPProxyStop(t *testing.T) {
},
})
p, err := createProxier(lb, net.ParseIP("0.0.0.0"), &fakeIptables{}, net.ParseIP("127.0.0.1"))
p, err := createProxier(lb, net.ParseIP("0.0.0.0"), &fakeIptables{}, net.ParseIP("127.0.0.1"), nil)
if err != nil {
t.Fatal(err)
}
@ -431,7 +431,7 @@ func TestTCPProxyUpdateDelete(t *testing.T) {
},
})
p, err := createProxier(lb, net.ParseIP("0.0.0.0"), &fakeIptables{}, net.ParseIP("127.0.0.1"))
p, err := createProxier(lb, net.ParseIP("0.0.0.0"), &fakeIptables{}, net.ParseIP("127.0.0.1"), nil)
if err != nil {
t.Fatal(err)
}
@ -468,7 +468,7 @@ func TestUDPProxyUpdateDelete(t *testing.T) {
},
})
p, err := createProxier(lb, net.ParseIP("0.0.0.0"), &fakeIptables{}, net.ParseIP("127.0.0.1"))
p, err := createProxier(lb, net.ParseIP("0.0.0.0"), &fakeIptables{}, net.ParseIP("127.0.0.1"), nil)
if err != nil {
t.Fatal(err)
}
@ -505,7 +505,7 @@ func TestTCPProxyUpdateDeleteUpdate(t *testing.T) {
},
})
p, err := createProxier(lb, net.ParseIP("0.0.0.0"), &fakeIptables{}, net.ParseIP("127.0.0.1"))
p, err := createProxier(lb, net.ParseIP("0.0.0.0"), &fakeIptables{}, net.ParseIP("127.0.0.1"), nil)
if err != nil {
t.Fatal(err)
}
@ -557,7 +557,7 @@ func TestUDPProxyUpdateDeleteUpdate(t *testing.T) {
},
})
p, err := createProxier(lb, net.ParseIP("0.0.0.0"), &fakeIptables{}, net.ParseIP("127.0.0.1"))
p, err := createProxier(lb, net.ParseIP("0.0.0.0"), &fakeIptables{}, net.ParseIP("127.0.0.1"), nil)
if err != nil {
t.Fatal(err)
}
@ -609,7 +609,7 @@ func TestTCPProxyUpdatePort(t *testing.T) {
},
})
p, err := createProxier(lb, net.ParseIP("0.0.0.0"), &fakeIptables{}, net.ParseIP("127.0.0.1"))
p, err := createProxier(lb, net.ParseIP("0.0.0.0"), &fakeIptables{}, net.ParseIP("127.0.0.1"), nil)
if err != nil {
t.Fatal(err)
}
@ -657,7 +657,7 @@ func TestUDPProxyUpdatePort(t *testing.T) {
},
})
p, err := createProxier(lb, net.ParseIP("0.0.0.0"), &fakeIptables{}, net.ParseIP("127.0.0.1"))
p, err := createProxier(lb, net.ParseIP("0.0.0.0"), &fakeIptables{}, net.ParseIP("127.0.0.1"), nil)
if err != nil {
t.Fatal(err)
}
@ -702,7 +702,7 @@ func TestProxyUpdatePublicIPs(t *testing.T) {
},
})
p, err := createProxier(lb, net.ParseIP("0.0.0.0"), &fakeIptables{}, net.ParseIP("127.0.0.1"))
p, err := createProxier(lb, net.ParseIP("0.0.0.0"), &fakeIptables{}, net.ParseIP("127.0.0.1"), nil)
if err != nil {
t.Fatal(err)
}
@ -754,7 +754,7 @@ func TestProxyUpdatePortal(t *testing.T) {
},
})
p, err := createProxier(lb, net.ParseIP("0.0.0.0"), &fakeIptables{}, net.ParseIP("127.0.0.1"))
p, err := createProxier(lb, net.ParseIP("0.0.0.0"), &fakeIptables{}, net.ParseIP("127.0.0.1"), nil)
if err != nil {
t.Fatal(err)
}

View File

@ -40,6 +40,8 @@ type proxySocket interface {
Close() error
// ProxyLoop proxies incoming connections for the specified service to the service endpoints.
ProxyLoop(service ServicePortName, info *serviceInfo, proxier *Proxier)
// ListenPort returns the host port that the proxySocket is listening on
ListenPort() int
}
func newProxySocket(protocol api.Protocol, ip net.IP, port int) (proxySocket, error) {
@ -50,7 +52,7 @@ func newProxySocket(protocol api.Protocol, ip net.IP, port int) (proxySocket, er
if err != nil {
return nil, err
}
return &tcpProxySocket{listener}, nil
return &tcpProxySocket{Listener: listener, port: port}, nil
case "UDP":
addr, err := net.ResolveUDPAddr("udp", net.JoinHostPort(host, strconv.Itoa(port)))
if err != nil {
@ -60,7 +62,7 @@ func newProxySocket(protocol api.Protocol, ip net.IP, port int) (proxySocket, er
if err != nil {
return nil, err
}
return &udpProxySocket{conn}, nil
return &udpProxySocket{UDPConn: conn, port: port}, nil
}
return nil, fmt.Errorf("unknown protocol %q", protocol)
}
@ -72,6 +74,11 @@ var endpointDialTimeout = []time.Duration{1, 2, 4, 8}
// no new connections are allowed but existing connections are left untouched.
type tcpProxySocket struct {
net.Listener
port int
}
func (tcp *tcpProxySocket) ListenPort() int {
return tcp.port
}
func tryConnect(service ServicePortName, srcAddr net.Addr, protocol string, proxier *Proxier) (out net.Conn, err error) {
@ -162,6 +169,11 @@ func copyBytes(direction string, dest, src *net.TCPConn, wg *sync.WaitGroup) {
// TODO: We could lame-duck this ourselves, if it becomes important.
type udpProxySocket struct {
*net.UDPConn
port int
}
func (udp *udpProxySocket) ListenPort() int {
return udp.port
}
func (udp *udpProxySocket) Addr() net.Addr {