From f5a9281a26386478cd0cb0379854419ba7dd3246 Mon Sep 17 00:00:00 2001 From: Tim Hockin Date: Wed, 17 Jun 2015 15:26:14 -0700 Subject: [PATCH] Actually hold NodePorts open in kube-proxy --- pkg/proxy/userspace/proxier.go | 40 +++++++++++++++++++++++++--------- test/e2e/service.go | 27 ++++++++++++++++++++--- 2 files changed, 54 insertions(+), 13 deletions(-) diff --git a/pkg/proxy/userspace/proxier.go b/pkg/proxy/userspace/proxier.go index 255442071b1..859c8d63d88 100644 --- a/pkg/proxy/userspace/proxier.go +++ b/pkg/proxy/userspace/proxier.go @@ -73,7 +73,7 @@ type Proxier struct { serviceMap map[proxy.ServicePortName]*serviceInfo syncPeriod time.Duration portMapMutex sync.Mutex - portMap map[portMapKey]proxy.ServicePortName + portMap map[portMapKey]*portMapValue numProxyLoops int32 // use atomic ops to access this; mostly for testing listenIP net.IP iptables iptables.Interface @@ -94,6 +94,14 @@ func (k *portMapKey) String() string { return fmt.Sprintf("%s/%d", k.protocol, k.port) } +// A value for the portMap +type portMapValue struct { + owner proxy.ServicePortName + socket interface { + Close() error + } +} + var ( // ErrProxyOnLocalhost is returned by NewProxier if the user requests a proxier on // the loopback address. May be checked for by callers of NewProxier to know whether @@ -146,7 +154,7 @@ func createProxier(loadBalancer LoadBalancer, listenIP net.IP, iptables iptables return &Proxier{ loadBalancer: loadBalancer, serviceMap: make(map[proxy.ServicePortName]*serviceInfo), - portMap: make(map[portMapKey]proxy.ServicePortName), + portMap: make(map[portMapKey]*portMapValue), syncPeriod: syncPeriod, listenIP: listenIP, iptables: iptables, @@ -499,7 +507,7 @@ func (proxier *Proxier) openOnePortal(portal portal, protocol api.Protocol, prox // Marks a port as being owned by a particular service, or returns error if already claimed. // Idempotent: reclaiming with the same owner is not an error -func (proxier *Proxier) claimPort(port int, protocol api.Protocol, owner proxy.ServicePortName) error { +func (proxier *Proxier) claimNodePort(port int, protocol api.Protocol, owner proxy.ServicePortName) error { proxier.portMapMutex.Lock() defer proxier.portMapMutex.Unlock() @@ -508,19 +516,30 @@ func (proxier *Proxier) claimPort(port int, protocol api.Protocol, owner proxy.S key := portMapKey{port: port, protocol: protocol} existing, found := proxier.portMap[key] if !found { - proxier.portMap[key] = owner + // Hold the actual port open, even though we use iptables to redirect + // it. This ensures that a) it's safe to take and b) that stays true. + // NOTE: We should not need to have a real listen()ing socket - bind() + // should be enough, but I can't figure out a way to e2e test without + // it. Tools like 'ss' and 'netstat' do not show sockets that are + // bind()ed but not listen()ed, and at least the default debian netcat + // has no way to avoid about 10 seconds of retries. + socket, err := newProxySocket(protocol, net.ParseIP("127.0.0.1"), port) + if err != nil { + return fmt.Errorf("can't open node port for %s: %v", key.String(), err) + } + proxier.portMap[key] = &portMapValue{owner: owner, socket: socket} return nil } - if existing == owner { + if existing.owner == owner { // We are idempotent return nil } - return fmt.Errorf("Port conflict detected on port %v. %v vs %v", key, owner, existing) + return fmt.Errorf("Port conflict detected on port %s. %v vs %v", key.String(), owner, existing) } // Release a claim on a port. Returns an error if the owner does not match the claim. // Tolerates release on an unclaimed port, to simplify . -func (proxier *Proxier) releasePort(port int, protocol api.Protocol, owner proxy.ServicePortName) error { +func (proxier *Proxier) releaseNodePort(port int, protocol api.Protocol, owner proxy.ServicePortName) error { proxier.portMapMutex.Lock() defer proxier.portMapMutex.Unlock() @@ -531,10 +550,11 @@ func (proxier *Proxier) releasePort(port int, protocol api.Protocol, owner proxy glog.Infof("Ignoring release on unowned port: %v", key) return nil } - if existing != owner { + if existing.owner != owner { return fmt.Errorf("Port conflict detected on port %v (unowned unlock). %v vs %v", key, owner, existing) } delete(proxier.portMap, key) + existing.socket.Close() return nil } @@ -542,7 +562,7 @@ func (proxier *Proxier) openNodePort(nodePort int, protocol api.Protocol, proxyI // TODO: Do we want to allow containers to access public services? Probably yes. // TODO: We could refactor this to be the same code as portal, but with IP == nil - err := proxier.claimPort(nodePort, protocol, name) + err := proxier.claimNodePort(nodePort, protocol, name) if err != nil { return err } @@ -645,7 +665,7 @@ func (proxier *Proxier) closeNodePort(nodePort int, protocol api.Protocol, proxy el = append(el, err) } - if err := proxier.releasePort(nodePort, protocol, name); err != nil { + if err := proxier.releaseNodePort(nodePort, protocol, name); err != nil { el = append(el, err) } diff --git a/test/e2e/service.go b/test/e2e/service.go index c73425d332d..c0bd456c612 100644 --- a/test/e2e/service.go +++ b/test/e2e/service.go @@ -69,6 +69,7 @@ var _ = Describe("Services", func() { } } }) + // TODO: We get coverage of TCP/UDP and multi-port services through the DNS test. We should have a simpler test for multi-port TCP here. It("should provide secure master service", func() { _, err := c.Services(api.NamespaceDefault).Get("kubernetes") @@ -460,6 +461,16 @@ var _ = Describe("Services", func() { By("hitting the pod through the service's NodePort") ip := pickMinionIP(c) testReachable(ip, nodePort) + + hosts, err := NodeSSHHosts(c) + if err != nil { + Expect(err).NotTo(HaveOccurred()) + } + cmd := fmt.Sprintf(`test -n "$(ss -ant46 'sport = :%d' | tail -n +2 | grep LISTEN)"`, nodePort) + _, _, code, err := SSH(cmd, hosts[0], testContext.Provider) + if code != 0 { + Failf("expected node port (%d) to be in use", nodePort) + } }) It("should be able to change the type and nodeport settings of a service", func() { @@ -878,16 +889,26 @@ var _ = Describe("Services", func() { if !ServiceNodePortRange.Contains(port.NodePort) { Failf("got unexpected (out-of-range) port for new service: %v", service) } - port1 := port.NodePort + nodePort := port.NodePort By("deleting original service " + serviceName) err = t.DeleteService(serviceName) Expect(err).NotTo(HaveOccurred()) - By(fmt.Sprintf("creating service "+serviceName+" with same NodePort %d", port1)) + hosts, err := NodeSSHHosts(c) + if err != nil { + Expect(err).NotTo(HaveOccurred()) + } + cmd := fmt.Sprintf(`test -n "$(ss -ant46 'sport = :%d' | tail -n +2 | grep LISTEN)"`, nodePort) + _, _, code, err := SSH(cmd, hosts[0], testContext.Provider) + if code == 0 { + Failf("expected node port (%d) to not be in use", nodePort) + } + + By(fmt.Sprintf("creating service "+serviceName+" with same NodePort %d", nodePort)) service = t.BuildServiceSpec() service.Spec.Type = api.ServiceTypeNodePort - service.Spec.Ports[0].NodePort = port1 + service.Spec.Ports[0].NodePort = nodePort service, err = t.CreateService(service) Expect(err).NotTo(HaveOccurred()) })