diff --git a/cmd/kube-proxy/app/server.go b/cmd/kube-proxy/app/server.go
index 2113955d82b..add6594c2a9 100644
--- a/cmd/kube-proxy/app/server.go
+++ b/cmd/kube-proxy/app/server.go
@@ -48,6 +48,7 @@ type ProxyServer struct {
     ResourceContainer string
     Master            string
     Kubeconfig        string
+    PortRange         util.PortRange
 }

 // NewProxyServer creates a new ProxyServer object with default parameters
@@ -70,6 +71,7 @@ func (s *ProxyServer) AddFlags(fs *pflag.FlagSet) {
     fs.IntVar(&s.OOMScoreAdj, "oom-score-adj", s.OOMScoreAdj, "The oom_score_adj value for kube-proxy process. Values must be within the range [-1000, 1000]")
     fs.StringVar(&s.ResourceContainer, "resource-container", s.ResourceContainer, "Absolute name of the resource-only container to create and run the Kube-proxy in (Default: /kube-proxy).")
     fs.StringVar(&s.Kubeconfig, "kubeconfig", s.Kubeconfig, "Path to kubeconfig file with authorization information (the master location is set by the master flag).")
+    fs.Var(&s.PortRange, "proxy-port-range", "Range of host ports (beginPort-endPort, inclusive) that may be consumed in order to proxy service traffic. If unspecified (0-0) then ports will be randomly chosen.")
 }

 // Run runs the specified ProxyServer. This should never exit.
@@ -94,7 +96,7 @@ func (s *ProxyServer) Run(_ []string) error {
         protocol = iptables.ProtocolIpv6
     }
     loadBalancer := proxy.NewLoadBalancerRR()
-    proxier, err := proxy.NewProxier(loadBalancer, net.IP(s.BindAddress), iptables.New(exec.New(), protocol))
+    proxier, err := proxy.NewProxier(loadBalancer, net.IP(s.BindAddress), iptables.New(exec.New(), protocol), s.PortRange)
     if err != nil {
         glog.Fatalf("Unable to create proxer: %v", err)
     }
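
For reference, fs.Var above only requires a value type with Set/String methods; util.PortRange supplies them and, per the allocator and tests below, parses an inclusive beginPort-endPort string into a Base and a Size. A minimal sketch of such a flag value follows — the names and parsing details are assumed for illustration, this is not the util package's implementation:

    // Sketch only: an illustrative beginPort-endPort flag value, not util.PortRange's source.
    package main

    import (
        "flag"
        "fmt"
        "strconv"
        "strings"
    )

    // portRange mirrors the Base/Size shape used by the allocator below.
    type portRange struct {
        Base, Size int
    }

    // Set parses an inclusive "beginPort-endPort" string.
    func (pr *portRange) Set(value string) error {
        parts := strings.SplitN(value, "-", 2)
        if len(parts) != 2 {
            return fmt.Errorf("expected beginPort-endPort, got %q", value)
        }
        begin, err := strconv.Atoi(strings.TrimSpace(parts[0]))
        if err != nil {
            return err
        }
        end, err := strconv.Atoi(strings.TrimSpace(parts[1]))
        if err != nil {
            return err
        }
        if end < begin {
            return fmt.Errorf("invalid range %q", value)
        }
        pr.Base, pr.Size = begin, end-begin+1
        return nil
    }

    func (pr *portRange) String() string {
        return fmt.Sprintf("%d-%d", pr.Base, pr.Base+pr.Size-1)
    }

    func main() {
        var pr portRange
        fs := flag.NewFlagSet("kube-proxy-example", flag.ExitOnError)
        fs.Var(&pr, "proxy-port-range", "inclusive beginPort-endPort range")
        _ = fs.Parse([]string{"--proxy-port-range=30000-32767"})
        fmt.Println(pr.String(), pr.Size) // 30000-32767 2768
    }
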
diff --git a/pkg/proxy/port_allocator.go b/pkg/proxy/port_allocator.go
new file mode 100644
index 00000000000..c767c3e8bf2
--- /dev/null
+++ b/pkg/proxy/port_allocator.go
@@ -0,0 +1,152 @@
+/*
+Copyright 2015 The Kubernetes Authors All rights reserved.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package proxy
+
+import (
+    "errors"
+    "math/big"
+    "math/rand"
+    "sync"
+    "time"
+
+    "github.com/GoogleCloudPlatform/kubernetes/pkg/util"
+)
+
+var (
+    errPortRangeNoPortsRemaining = errors.New("port allocation failed; there are no remaining ports left to allocate in the accepted range")
+)
+
+type PortAllocator interface {
+    AllocateNext() (int, error)
+    Release(int)
+}
+
+// randomAllocator is a PortAllocator implementation that allocates random ports, yielding
+// a port value of 0 for every call to AllocateNext().
+type randomAllocator struct{}
+
+// AllocateNext always returns 0
+func (r *randomAllocator) AllocateNext() (int, error) {
+    return 0, nil
+}
+
+// Release is a noop
+func (r *randomAllocator) Release(_ int) {
+    // noop
+}
+
+// newPortAllocator builds PortAllocator for a given PortRange. If the PortRange is empty
+// then a random port allocator is returned; otherwise, a new range-based allocator
+// is returned.
+func newPortAllocator(r util.PortRange) PortAllocator {
+    if r.Base == 0 {
+        return &randomAllocator{}
+    }
+    return newPortRangeAllocator(r)
+}
+
+const (
+    portsBufSize         = 16
+    nextFreePortCooldown = 500 * time.Millisecond
+    allocateNextTimeout  = 1 * time.Second
+)
+
+type rangeAllocator struct {
+    util.PortRange
+    ports chan int
+    used  big.Int
+    lock  sync.Mutex
+    rand  *rand.Rand
+}
+
+func newPortRangeAllocator(r util.PortRange) PortAllocator {
+    if r.Base == 0 || r.Size == 0 {
+        panic("illegal argument: may not specify an empty port range")
+    }
+    ra := &rangeAllocator{
+        PortRange: r,
+        ports:     make(chan int, portsBufSize),
+        rand:      rand.New(rand.NewSource(time.Now().UnixNano())),
+    }
+    go util.Until(func() { ra.fillPorts(util.NeverStop) }, nextFreePortCooldown, util.NeverStop)
+    return ra
+}
+
+// fillPorts loops, always searching for the next free port and, if found, fills the ports buffer with it.
+// this func blocks until either there are no remaining free ports, or else the stopCh chan is closed.
+func (r *rangeAllocator) fillPorts(stopCh <-chan struct{}) {
+    for {
+        port := r.nextFreePort()
+        if port == -1 {
+            return
+        }
+        select {
+        case <-stopCh:
+            return
+        case r.ports <- port:
+        }
+    }
+}
+
+// nextFreePort finds a free port, first picking a random port. if that port is already in use
+// then the port range is scanned sequentially until either a port is found or the scan completes
+// unsuccessfully. an unsuccessful scan returns a port of -1.
+func (r *rangeAllocator) nextFreePort() int {
+    r.lock.Lock()
+    defer r.lock.Unlock()
+
+    // choose random port
+    j := r.rand.Intn(r.Size)
+    if b := r.used.Bit(j); b == 0 {
+        r.used.SetBit(&r.used, j, 1)
+        return j + r.Base
+    }
+
+    // search sequentially
+    for i := j + 1; i < r.Size; i++ {
+        if b := r.used.Bit(i); b == 0 {
+            r.used.SetBit(&r.used, i, 1)
+            return i + r.Base
+        }
+    }
+    for i := 0; i < j; i++ {
+        if b := r.used.Bit(i); b == 0 {
+            r.used.SetBit(&r.used, i, 1)
+            return i + r.Base
+        }
+    }
+    return -1
+}
+
+func (r *rangeAllocator) AllocateNext() (port int, err error) {
+    select {
+    case port = <-r.ports:
+    case <-time.After(allocateNextTimeout):
+        err = errPortRangeNoPortsRemaining
+    }
+    return
+}
+
+func (r *rangeAllocator) Release(port int) {
+    port -= r.Base
+    if port < 0 || port >= r.Size {
+        return
+    }
+    r.lock.Lock()
+    defer r.lock.Unlock()
+    r.used.SetBit(&r.used, port, 0)
+}
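
The bookkeeping above is a bitmap over range offsets (used big.Int) plus a small buffered channel that a background goroutine keeps topped up. The core scan — a random probe followed by a wrap-around sequential search — stripped of the lock, channel, and refill loop, looks roughly like this (a simplified sketch, not the code above):

    // Simplified sketch of the bitmap scan: random probe, then wrap-around search.
    package main

    import (
        "fmt"
        "math/big"
        "math/rand"
    )

    type bitmapRange struct {
        base, size int
        used       big.Int
        rng        *rand.Rand
    }

    func (b *bitmapRange) allocate() (int, bool) {
        j := b.rng.Intn(b.size) // random starting offset
        for k := 0; k < b.size; k++ {
            i := (j + k) % b.size
            if b.used.Bit(i) == 0 {
                b.used.SetBit(&b.used, i, 1)
                return b.base + i, true
            }
        }
        return 0, false // every offset is marked used
    }

    func (b *bitmapRange) release(port int) {
        if i := port - b.base; i >= 0 && i < b.size {
            b.used.SetBit(&b.used, i, 0)
        }
    }

    func main() {
        b := &bitmapRange{base: 30000, size: 4, rng: rand.New(rand.NewSource(1))}
        for i := 0; i < 5; i++ {
            p, ok := b.allocate()
            fmt.Println(p, ok) // the fifth call reports ok == false
        }
        b.release(30001)
        fmt.Println(b.allocate()) // 30001 true
    }

Starting the scan at a random offset keeps re-allocated ports from coming back in a predictable order, which is what TestRangeAllocator_RandomishAllocation in the next file checks.
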
diff --git a/pkg/proxy/port_allocator_test.go b/pkg/proxy/port_allocator_test.go
new file mode 100644
index 00000000000..9028758779a
--- /dev/null
+++ b/pkg/proxy/port_allocator_test.go
@@ -0,0 +1,101 @@
+/*
+Copyright 2015 The Kubernetes Authors All rights reserved.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package proxy
+
+import (
+    "reflect"
+    "testing"
+
+    "github.com/GoogleCloudPlatform/kubernetes/pkg/util"
+)
+
+func TestRangeAllocatorEmpty(t *testing.T) {
+    r := &util.PortRange{}
+    r.Set("0-0")
+    defer func() {
+        if rv := recover(); rv == nil {
+            t.Fatalf("expected panic because of empty port range: %+v", r)
+        }
+    }()
+    _ = newPortRangeAllocator(*r)
+}
+
+func TestRangeAllocatorFullyAllocated(t *testing.T) {
+    r := &util.PortRange{}
+    r.Set("1-1")
+    a := newPortRangeAllocator(*r)
+    p, err := a.AllocateNext()
+    if err != nil {
+        t.Fatalf("unexpected error: %v", err)
+    }
+    if p != 1 {
+        t.Fatalf("unexpected allocated port: %d", p)
+    }
+
+    _, err = a.AllocateNext()
+    if err == nil {
+        t.Fatalf("expected error because of fully-allocated range")
+    }
+
+    a.Release(p)
+    p, err = a.AllocateNext()
+    if err != nil {
+        t.Fatalf("unexpected error: %v", err)
+    }
+    if p != 1 {
+        t.Fatalf("unexpected allocated port: %d", p)
+    }
+
+    _, err = a.AllocateNext()
+    if err == nil {
+        t.Fatalf("expected error because of fully-allocated range")
+    }
+}
+
+func TestRangeAllocator_RandomishAllocation(t *testing.T) {
+    r := &util.PortRange{}
+    r.Set("1-100")
+    a := newPortRangeAllocator(*r)
+
+    // allocate all the ports
+    var err error
+    ports := make([]int, 100, 100)
+    for i := 0; i < 100; i++ {
+        ports[i], err = a.AllocateNext()
+        if err != nil {
+            t.Fatalf("unexpected error: %v", err)
+        }
+    }
+
+    // release them all
+    for i := 0; i < 100; i++ {
+        a.Release(ports[i])
+    }
+
+    // allocate the ports again
+    rports := make([]int, 100, 100)
+    for i := 0; i < 100; i++ {
+        rports[i], err = a.AllocateNext()
+        if err != nil {
+            t.Fatalf("unexpected error: %v", err)
+        }
+    }
+
+    if reflect.DeepEqual(ports, rports) {
+        t.Fatalf("expected re-allocated ports to be in a somewhat random order")
+    }
+}
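
The fully-allocated test above leans on AllocateNext timing out when the buffered channel stays empty. The underlying select-with-timeout shape, in isolation, looks like this (illustrative names and durations, not the constants from port_allocator.go):

    // Sketch of a buffered-channel read with a deadline: the caller either gets
    // a value promptly or sees an "exhausted" error after the timeout.
    package main

    import (
        "errors"
        "fmt"
        "time"
    )

    var errExhausted = errors.New("no ports available before timeout")

    func take(ports <-chan int, timeout time.Duration) (int, error) {
        select {
        case p := <-ports:
            return p, nil
        case <-time.After(timeout):
            return 0, errExhausted
        }
    }

    func main() {
        ports := make(chan int, 2)
        ports <- 30000
        ports <- 30001

        for i := 0; i < 3; i++ {
            p, err := take(ports, 50*time.Millisecond)
            fmt.Println(p, err) // third call times out because nothing refills the buffer
        }
    }
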
diff --git a/pkg/proxy/proxier.go b/pkg/proxy/proxier.go
index c2e78747197..a2eecf6d96f 100644
--- a/pkg/proxy/proxier.go
+++ b/pkg/proxy/proxier.go
@@ -74,6 +74,7 @@ type Proxier struct {
     listenIP   net.IP
     iptables   iptables.Interface
     hostIP     net.IP
+    proxyPorts PortAllocator
 }

 // A key for the portMap
@@ -105,7 +106,7 @@ func IsProxyLocked(err error) bool {
 // if iptables fails to update or acquire the initial lock. Once a proxier is
 // created, it will keep iptables up to date in the background and will not
 // terminate if a particular iptables call fails.
-func NewProxier(loadBalancer LoadBalancer, listenIP net.IP, iptables iptables.Interface) (*Proxier, error) {
+func NewProxier(loadBalancer LoadBalancer, listenIP net.IP, iptables iptables.Interface, pr util.PortRange) (*Proxier, error) {
     if listenIP.Equal(localhostIPv4) || listenIP.Equal(localhostIPv6) {
         return nil, ErrProxyOnLocalhost
     }
@@ -115,11 +116,17 @@ func NewProxier(loadBalancer LoadBalancer, listenIP net.IP, iptables iptables.In
         return nil, fmt.Errorf("failed to select a host interface: %v", err)
     }

+    proxyPorts := newPortAllocator(pr)
+
     glog.V(2).Infof("Setting proxy IP to %v and initializing iptables", hostIP)
-    return createProxier(loadBalancer, listenIP, iptables, hostIP)
+    return createProxier(loadBalancer, listenIP, iptables, hostIP, proxyPorts)
 }

-func createProxier(loadBalancer LoadBalancer, listenIP net.IP, iptables iptables.Interface, hostIP net.IP) (*Proxier, error) {
+func createProxier(loadBalancer LoadBalancer, listenIP net.IP, iptables iptables.Interface, hostIP net.IP, proxyPorts PortAllocator) (*Proxier, error) {
+    // convenient to pass nil for tests..
+    if proxyPorts == nil {
+        proxyPorts = newPortAllocator(util.PortRange{})
+    }
     // Set up the iptables foundations we need.
     if err := iptablesInit(iptables); err != nil {
         return nil, fmt.Errorf("failed to initialize iptables: %v", err)
@@ -136,6 +143,7 @@ func createProxier(loadBalancer LoadBalancer, listenIP net.IP, iptables iptables
         listenIP:   listenIP,
         iptables:   iptables,
         hostIP:     hostIP,
+        proxyPorts: proxyPorts,
     }, nil
 }

@@ -189,7 +197,10 @@ func (proxier *Proxier) stopProxy(service ServicePortName, info *serviceInfo) er
 // This assumes proxier.mu is locked.
 func (proxier *Proxier) stopProxyInternal(service ServicePortName, info *serviceInfo) error {
     delete(proxier.serviceMap, service)
-    return info.socket.Close()
+    err := info.socket.Close()
+    port := info.socket.ListenPort()
+    proxier.proxyPorts.Release(port)
+    return err
 }

 func (proxier *Proxier) getServiceInfo(service ServicePortName) (*serviceInfo, bool) {
@@ -285,8 +296,15 @@ func (proxier *Proxier) OnUpdate(services []api.Service) {
                 glog.Errorf("Failed to stop service %q: %v", serviceName, err)
             }
         }
+
+        proxyPort, err := proxier.proxyPorts.AllocateNext()
+        if err != nil {
+            glog.Errorf("failed to allocate proxy port for service %q: %v", serviceName, err)
+            continue
+        }
+
         glog.V(1).Infof("Adding new service %q at %s:%d/%s", serviceName, serviceIP, servicePort.Port, servicePort.Protocol)
-        info, err := proxier.addServiceOnPort(serviceName, servicePort.Protocol, 0, udpIdleTimeout)
+        info, err = proxier.addServiceOnPort(serviceName, servicePort.Protocol, proxyPort, udpIdleTimeout)
         if err != nil {
             glog.Errorf("Failed to start proxy for %q: %v", serviceName, err)
             continue
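
The proxier change pairs every AllocateNext in OnUpdate with a Release in stopProxyInternal, keyed off the socket's ListenPort (added in proxysocket.go below). A toy sketch of that acquire/serve/release pairing against a stub allocator — the stub is purely illustrative, the real allocators live in port_allocator.go above:

    // Sketch of the allocate-on-add / release-on-stop lifecycle.
    package main

    import (
        "errors"
        "fmt"
    )

    type PortAllocator interface {
        AllocateNext() (int, error)
        Release(int)
    }

    // stackAllocator hands out ports from a fixed list; illustration only.
    type stackAllocator struct{ free []int }

    func (s *stackAllocator) AllocateNext() (int, error) {
        if len(s.free) == 0 {
            return 0, errors.New("no ports left")
        }
        p := s.free[len(s.free)-1]
        s.free = s.free[:len(s.free)-1]
        return p, nil
    }

    func (s *stackAllocator) Release(p int) { s.free = append(s.free, p) }

    // serveOnce mirrors the OnUpdate/stopProxyInternal flow: allocate before
    // opening the socket, remember the port, release it when the socket closes.
    func serveOnce(alloc PortAllocator) error {
        port, err := alloc.AllocateNext()
        if err != nil {
            return err
        }
        fmt.Printf("proxying on port %d\n", port)
        // ... socket lifetime would go here ...
        alloc.Release(port) // paired with the allocation above
        return nil
    }

    func main() {
        alloc := &stackAllocator{free: []int{30000, 30001}}
        for i := 0; i < 3; i++ {
            // all three iterations succeed because each release returns the port
            if err := serveOnce(alloc); err != nil {
                fmt.Println("error:", err)
            }
        }
    }
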
diff --git a/pkg/proxy/proxier_test.go b/pkg/proxy/proxier_test.go
index 49c02e28873..0cb3db0a3cb 100644
--- a/pkg/proxy/proxier_test.go
+++ b/pkg/proxy/proxier_test.go
@@ -206,7 +206,7 @@ func TestTCPProxy(t *testing.T) {
         },
     })

-    p, err := createProxier(lb, net.ParseIP("0.0.0.0"), &fakeIptables{}, net.ParseIP("127.0.0.1"))
+    p, err := createProxier(lb, net.ParseIP("0.0.0.0"), &fakeIptables{}, net.ParseIP("127.0.0.1"), nil)
     if err != nil {
         t.Fatal(err)
     }
@@ -233,7 +233,7 @@ func TestUDPProxy(t *testing.T) {
         },
     })

-    p, err := createProxier(lb, net.ParseIP("0.0.0.0"), &fakeIptables{}, net.ParseIP("127.0.0.1"))
+    p, err := createProxier(lb, net.ParseIP("0.0.0.0"), &fakeIptables{}, net.ParseIP("127.0.0.1"), nil)
     if err != nil {
         t.Fatal(err)
     }
@@ -265,7 +265,7 @@ func TestMultiPortProxy(t *testing.T) {
         }},
     }})

-    p, err := createProxier(lb, net.ParseIP("0.0.0.0"), &fakeIptables{}, net.ParseIP("127.0.0.1"))
+    p, err := createProxier(lb, net.ParseIP("0.0.0.0"), &fakeIptables{}, net.ParseIP("127.0.0.1"), nil)
     if err != nil {
         t.Fatal(err)
     }
@@ -292,7 +292,7 @@ func TestMultiPortOnUpdate(t *testing.T) {
     serviceQ := ServicePortName{types.NamespacedName{"testnamespace", "echo"}, "q"}
     serviceX := ServicePortName{types.NamespacedName{"testnamespace", "echo"}, "x"}

-    p, err := createProxier(lb, net.ParseIP("0.0.0.0"), &fakeIptables{}, net.ParseIP("127.0.0.1"))
+    p, err := createProxier(lb, net.ParseIP("0.0.0.0"), &fakeIptables{}, net.ParseIP("127.0.0.1"), nil)
     if err != nil {
         t.Fatal(err)
     }
@@ -355,7 +355,7 @@ func TestTCPProxyStop(t *testing.T) {
         },
     })

-    p, err := createProxier(lb, net.ParseIP("0.0.0.0"), &fakeIptables{}, net.ParseIP("127.0.0.1"))
+    p, err := createProxier(lb, net.ParseIP("0.0.0.0"), &fakeIptables{}, net.ParseIP("127.0.0.1"), nil)
     if err != nil {
         t.Fatal(err)
     }
@@ -393,7 +393,7 @@ func TestUDPProxyStop(t *testing.T) {
         },
     })

-    p, err := createProxier(lb, net.ParseIP("0.0.0.0"), &fakeIptables{}, net.ParseIP("127.0.0.1"))
+    p, err := createProxier(lb, net.ParseIP("0.0.0.0"), &fakeIptables{}, net.ParseIP("127.0.0.1"), nil)
     if err != nil {
         t.Fatal(err)
     }
@@ -431,7 +431,7 @@ func TestTCPProxyUpdateDelete(t *testing.T) {
         },
     })

-    p, err := createProxier(lb, net.ParseIP("0.0.0.0"), &fakeIptables{}, net.ParseIP("127.0.0.1"))
+    p, err := createProxier(lb, net.ParseIP("0.0.0.0"), &fakeIptables{}, net.ParseIP("127.0.0.1"), nil)
     if err != nil {
         t.Fatal(err)
     }
@@ -468,7 +468,7 @@ func TestUDPProxyUpdateDelete(t *testing.T) {
         },
     })

-    p, err := createProxier(lb, net.ParseIP("0.0.0.0"), &fakeIptables{}, net.ParseIP("127.0.0.1"))
+    p, err := createProxier(lb, net.ParseIP("0.0.0.0"), &fakeIptables{}, net.ParseIP("127.0.0.1"), nil)
     if err != nil {
         t.Fatal(err)
     }
@@ -505,7 +505,7 @@ func TestTCPProxyUpdateDeleteUpdate(t *testing.T) {
         },
     })

-    p, err := createProxier(lb, net.ParseIP("0.0.0.0"), &fakeIptables{}, net.ParseIP("127.0.0.1"))
+    p, err := createProxier(lb, net.ParseIP("0.0.0.0"), &fakeIptables{}, net.ParseIP("127.0.0.1"), nil)
     if err != nil {
         t.Fatal(err)
     }
@@ -557,7 +557,7 @@ func TestUDPProxyUpdateDeleteUpdate(t *testing.T) {
         },
     })

-    p, err := createProxier(lb, net.ParseIP("0.0.0.0"), &fakeIptables{}, net.ParseIP("127.0.0.1"))
+    p, err := createProxier(lb, net.ParseIP("0.0.0.0"), &fakeIptables{}, net.ParseIP("127.0.0.1"), nil)
     if err != nil {
         t.Fatal(err)
     }
@@ -609,7 +609,7 @@ func TestTCPProxyUpdatePort(t *testing.T) {
         },
     })

-    p, err := createProxier(lb, net.ParseIP("0.0.0.0"), &fakeIptables{}, net.ParseIP("127.0.0.1"))
+    p, err := createProxier(lb, net.ParseIP("0.0.0.0"), &fakeIptables{}, net.ParseIP("127.0.0.1"), nil)
     if err != nil {
         t.Fatal(err)
     }
@@ -657,7 +657,7 @@ func TestUDPProxyUpdatePort(t *testing.T) {
         },
     })

-    p, err := createProxier(lb, net.ParseIP("0.0.0.0"), &fakeIptables{}, net.ParseIP("127.0.0.1"))
+    p, err := createProxier(lb, net.ParseIP("0.0.0.0"), &fakeIptables{}, net.ParseIP("127.0.0.1"), nil)
     if err != nil {
         t.Fatal(err)
     }
@@ -702,7 +702,7 @@ func TestProxyUpdatePublicIPs(t *testing.T) {
         },
     })

-    p, err := createProxier(lb, net.ParseIP("0.0.0.0"), &fakeIptables{}, net.ParseIP("127.0.0.1"))
+    p, err := createProxier(lb, net.ParseIP("0.0.0.0"), &fakeIptables{}, net.ParseIP("127.0.0.1"), nil)
     if err != nil {
         t.Fatal(err)
     }
@@ -754,7 +754,7 @@ func TestProxyUpdatePortal(t *testing.T) {
         },
     })

-    p, err := createProxier(lb, net.ParseIP("0.0.0.0"), &fakeIptables{}, net.ParseIP("127.0.0.1"))
+    p, err := createProxier(lb, net.ParseIP("0.0.0.0"), &fakeIptables{}, net.ParseIP("127.0.0.1"), nil)
     if err != nil {
         t.Fatal(err)
     }
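
Passing nil here makes createProxier fall back to the random allocator, so the tests keep listening on port 0 and let the kernel pick an ephemeral port. For background, that port-0 behaviour in isolation looks like this (standalone sketch, unrelated to the fake iptables plumbing the tests use):

    // Listening on port 0 asks the kernel for a free ephemeral port; the
    // chosen port can be read back from the listener's address.
    package main

    import (
        "fmt"
        "net"
    )

    func main() {
        l, err := net.Listen("tcp", "127.0.0.1:0") // 0 = let the kernel pick
        if err != nil {
            panic(err)
        }
        defer l.Close()

        fmt.Println("kernel-assigned port:", l.Addr().(*net.TCPAddr).Port)
    }
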
diff --git a/pkg/proxy/proxysocket.go b/pkg/proxy/proxysocket.go
index 8fbd2fbada1..4513fa95e33 100644
--- a/pkg/proxy/proxysocket.go
+++ b/pkg/proxy/proxysocket.go
@@ -40,6 +40,8 @@ type proxySocket interface {
     Close() error
     // ProxyLoop proxies incoming connections for the specified service to the service endpoints.
     ProxyLoop(service ServicePortName, info *serviceInfo, proxier *Proxier)
+    // ListenPort returns the host port that the proxySocket is listening on
+    ListenPort() int
 }

 func newProxySocket(protocol api.Protocol, ip net.IP, port int) (proxySocket, error) {
@@ -50,7 +52,7 @@ func newProxySocket(protocol api.Protocol, ip net.IP, port int) (proxySocket, er
         if err != nil {
             return nil, err
         }
-        return &tcpProxySocket{listener}, nil
+        return &tcpProxySocket{Listener: listener, port: port}, nil
     case "UDP":
         addr, err := net.ResolveUDPAddr("udp", net.JoinHostPort(host, strconv.Itoa(port)))
         if err != nil {
@@ -60,7 +62,7 @@ func newProxySocket(protocol api.Protocol, ip net.IP, port int) (proxySocket, er
         if err != nil {
             return nil, err
         }
-        return &udpProxySocket{conn}, nil
+        return &udpProxySocket{UDPConn: conn, port: port}, nil
     }
     return nil, fmt.Errorf("unknown protocol %q", protocol)
 }
@@ -72,6 +74,11 @@ var endpointDialTimeout = []time.Duration{1, 2, 4, 8}
 // no new connections are allowed but existing connections are left untouched.
 type tcpProxySocket struct {
     net.Listener
+    port int
+}
+
+func (tcp *tcpProxySocket) ListenPort() int {
+    return tcp.port
 }

 func tryConnect(service ServicePortName, srcAddr net.Addr, protocol string, proxier *Proxier) (out net.Conn, err error) {
@@ -162,6 +169,11 @@ func copyBytes(direction string, dest, src *net.TCPConn, wg *sync.WaitGroup) {
 // TODO: We could lame-duck this ourselves, if it becomes important.
 type udpProxySocket struct {
     *net.UDPConn
+    port int
+}
+
+func (udp *udpProxySocket) ListenPort() int {
+    return udp.port
 }

 func (udp *udpProxySocket) Addr() net.Addr {
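
Both socket types use the same embed-plus-field shape: the embedded net.Listener or *net.UDPConn keeps its full behaviour while the wrapper remembers the requested port for ListenPort. A standalone sketch of the pattern (the names here are invented for illustration):

    // A wrapper that embeds a net.Listener and records the requested port.
    package main

    import (
        "fmt"
        "net"
    )

    type trackedListener struct {
        net.Listener      // embedded: Accept/Close/Addr come for free
        requestedPort int // what the caller asked for (0 means "any")
    }

    func (t *trackedListener) ListenPort() int { return t.requestedPort }

    func main() {
        l, err := net.Listen("tcp", "127.0.0.1:0")
        if err != nil {
            panic(err)
        }
        defer l.Close()

        tl := &trackedListener{Listener: l, requestedPort: 0}
        fmt.Println("requested:", tl.ListenPort())
        fmt.Println("actually bound:", tl.Addr().(*net.TCPAddr).Port)
    }

Note that ListenPort reports the port that was requested, not the one the kernel actually bound: on the random-allocator path the request is 0, and releasing 0 is harmless because the random allocator's Release is a no-op and 0 falls outside any configured range.
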