From 1f6baa6549e3b7faf3200ed136e1929ed78b3688 Mon Sep 17 00:00:00 2001 From: BenTheElder Date: Fri, 7 Aug 2015 20:07:15 -0400 Subject: [PATCH 1/3] Move userspace code to sub-package in proxy. Moves the userspace code in proxy to a sub-package and adds the ProxyProvider interface. This is in preparation for landing an implementation of https://github.com/GoogleCloudPlatform/kubernetes/issues/3760, which will mostly be in another sub package for iptables. --- cmd/kube-proxy/app/server.go | 6 +- pkg/proxy/proxy_provider.go | 33 +++++++++++ .../{loadbalancer.go => service_port_name.go} | 13 +---- pkg/proxy/userspace/loadbalancer.go | 33 +++++++++++ pkg/proxy/{ => userspace}/port_allocator.go | 2 +- .../{ => userspace}/port_allocator_test.go | 2 +- pkg/proxy/{ => userspace}/proxier.go | 56 ++++++++++--------- pkg/proxy/{ => userspace}/proxier_test.go | 39 ++++++------- pkg/proxy/{ => userspace}/proxysocket.go | 13 +++-- pkg/proxy/{ => userspace}/roundrobin.go | 23 ++++---- pkg/proxy/{ => userspace}/roundrobin_test.go | 34 +++++------ pkg/proxy/{ => userspace}/udp_server.go | 2 +- 12 files changed, 159 insertions(+), 97 deletions(-) create mode 100644 pkg/proxy/proxy_provider.go rename pkg/proxy/{loadbalancer.go => service_port_name.go} (61%) create mode 100644 pkg/proxy/userspace/loadbalancer.go rename pkg/proxy/{ => userspace}/port_allocator.go (99%) rename pkg/proxy/{ => userspace}/port_allocator_test.go (99%) rename pkg/proxy/{ => userspace}/proxier.go (93%) rename pkg/proxy/{ => userspace}/proxier_test.go (93%) rename pkg/proxy/{ => userspace}/proxysocket.go (93%) rename pkg/proxy/{ => userspace}/roundrobin.go (90%) rename pkg/proxy/{ => userspace}/roundrobin_test.go (95%) rename pkg/proxy/{ => userspace}/udp_server.go (98%) diff --git a/cmd/kube-proxy/app/server.go b/cmd/kube-proxy/app/server.go index accc4bb1191..72b5585fd65 100644 --- a/cmd/kube-proxy/app/server.go +++ b/cmd/kube-proxy/app/server.go @@ -29,8 +29,8 @@ import ( "k8s.io/kubernetes/pkg/client/clientcmd" clientcmdapi "k8s.io/kubernetes/pkg/client/clientcmd/api" "k8s.io/kubernetes/pkg/kubelet/qos" - "k8s.io/kubernetes/pkg/proxy" "k8s.io/kubernetes/pkg/proxy/config" + "k8s.io/kubernetes/pkg/proxy/userspace" "k8s.io/kubernetes/pkg/util" "k8s.io/kubernetes/pkg/util/exec" "k8s.io/kubernetes/pkg/util/iptables" @@ -97,8 +97,8 @@ func (s *ProxyServer) Run(_ []string) error { if net.IP(s.BindAddress).To4() == nil { protocol = iptables.ProtocolIpv6 } - loadBalancer := proxy.NewLoadBalancerRR() - proxier, err := proxy.NewProxier(loadBalancer, net.IP(s.BindAddress), iptables.New(exec.New(), protocol), s.PortRange) + loadBalancer := userspace.NewLoadBalancerRR() + proxier, err := userspace.NewProxier(loadBalancer, net.IP(s.BindAddress), iptables.New(exec.New(), protocol), s.PortRange) if err != nil { glog.Fatalf("Unable to create proxer: %v", err) } diff --git a/pkg/proxy/proxy_provider.go b/pkg/proxy/proxy_provider.go new file mode 100644 index 00000000000..161a6fc0720 --- /dev/null +++ b/pkg/proxy/proxy_provider.go @@ -0,0 +1,33 @@ +/* +Copyright 2015 The Kubernetes Authors All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+See the License for the specific language governing permissions and +limitations under the License. +*/ + +package proxy + +import ( + "k8s.io/kubernetes/pkg/api" +) + +// ProxyProvider is the interface provided by proxier implementations. +type ProxyProvider interface { + // OnUpdate manages the active set of service proxies. + // Active service proxies are reinitialized if found in the update set or + // removed if missing from the update set. + OnUpdate(services []api.Service) + // SyncLoop runs periodic work. + // This is expected to run as a goroutine or as the main loop of the app. + // It does not return. + SyncLoop() +} diff --git a/pkg/proxy/loadbalancer.go b/pkg/proxy/service_port_name.go similarity index 61% rename from pkg/proxy/loadbalancer.go rename to pkg/proxy/service_port_name.go index d03c4285bf0..9ac763d3abd 100644 --- a/pkg/proxy/loadbalancer.go +++ b/pkg/proxy/service_port_name.go @@ -1,5 +1,5 @@ /* -Copyright 2014 The Kubernetes Authors All rights reserved. +Copyright 2015 The Kubernetes Authors All rights reserved. Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. @@ -18,21 +18,10 @@ package proxy import ( "fmt" - "net" - "k8s.io/kubernetes/pkg/api" "k8s.io/kubernetes/pkg/types" ) -// LoadBalancer is an interface for distributing incoming requests to service endpoints. -type LoadBalancer interface { - // NextEndpoint returns the endpoint to handle a request for the given - // service-port and source address. - NextEndpoint(service ServicePortName, srcAddr net.Addr) (string, error) - NewService(service ServicePortName, sessionAffinityType api.ServiceAffinity, stickyMaxAgeMinutes int) error - CleanupStaleStickySessions(service ServicePortName) -} - // ServicePortName carries a namespace + name + portname. This is the unique // identfier for a load-balanced service. type ServicePortName struct { diff --git a/pkg/proxy/userspace/loadbalancer.go b/pkg/proxy/userspace/loadbalancer.go new file mode 100644 index 00000000000..f32f05fc8a1 --- /dev/null +++ b/pkg/proxy/userspace/loadbalancer.go @@ -0,0 +1,33 @@ +/* +Copyright 2014 The Kubernetes Authors All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package userspace + +import ( + "net" + + "k8s.io/kubernetes/pkg/api" + "k8s.io/kubernetes/pkg/proxy" +) + +// LoadBalancer is an interface for distributing incoming requests to service endpoints. +type LoadBalancer interface { + // NextEndpoint returns the endpoint to handle a request for the given + // service-port and source address. 
+ NextEndpoint(service proxy.ServicePortName, srcAddr net.Addr) (string, error) + NewService(service proxy.ServicePortName, sessionAffinityType api.ServiceAffinity, stickyMaxAgeMinutes int) error + CleanupStaleStickySessions(service proxy.ServicePortName) +} diff --git a/pkg/proxy/port_allocator.go b/pkg/proxy/userspace/port_allocator.go similarity index 99% rename from pkg/proxy/port_allocator.go rename to pkg/proxy/userspace/port_allocator.go index 7b8872d9030..c9fcd7692af 100644 --- a/pkg/proxy/port_allocator.go +++ b/pkg/proxy/userspace/port_allocator.go @@ -14,7 +14,7 @@ See the License for the specific language governing permissions and limitations under the License. */ -package proxy +package userspace import ( "errors" diff --git a/pkg/proxy/port_allocator_test.go b/pkg/proxy/userspace/port_allocator_test.go similarity index 99% rename from pkg/proxy/port_allocator_test.go rename to pkg/proxy/userspace/port_allocator_test.go index 0a6c8c3ab38..2573d5f168f 100644 --- a/pkg/proxy/port_allocator_test.go +++ b/pkg/proxy/userspace/port_allocator_test.go @@ -14,7 +14,7 @@ See the License for the specific language governing permissions and limitations under the License. */ -package proxy +package userspace import ( "reflect" diff --git a/pkg/proxy/proxier.go b/pkg/proxy/userspace/proxier.go similarity index 93% rename from pkg/proxy/proxier.go rename to pkg/proxy/userspace/proxier.go index ca8c0b9ea35..4b3ab339a62 100644 --- a/pkg/proxy/proxier.go +++ b/pkg/proxy/userspace/proxier.go @@ -14,7 +14,7 @@ See the License for the specific language governing permissions and limitations under the License. */ -package proxy +package userspace import ( "fmt" @@ -27,6 +27,7 @@ import ( "github.com/golang/glog" "k8s.io/kubernetes/pkg/api" + "k8s.io/kubernetes/pkg/proxy" "k8s.io/kubernetes/pkg/types" "k8s.io/kubernetes/pkg/util" "k8s.io/kubernetes/pkg/util/errors" @@ -67,9 +68,9 @@ func logTimeout(err error) bool { type Proxier struct { loadBalancer LoadBalancer mu sync.Mutex // protects serviceMap - serviceMap map[ServicePortName]*serviceInfo + serviceMap map[proxy.ServicePortName]*serviceInfo portMapMutex sync.Mutex - portMap map[portMapKey]ServicePortName + portMap map[portMapKey]proxy.ServicePortName numProxyLoops int32 // use atomic ops to access this; mostly for testing listenIP net.IP iptables iptables.Interface @@ -77,6 +78,9 @@ type Proxier struct { proxyPorts PortAllocator } +// assert Proxier is a ProxyProvider +var _ proxy.ProxyProvider = &Proxier{} + // A key for the portMap type portMapKey struct { port int @@ -138,8 +142,8 @@ func createProxier(loadBalancer LoadBalancer, listenIP net.IP, iptables iptables } return &Proxier{ loadBalancer: loadBalancer, - serviceMap: make(map[ServicePortName]*serviceInfo), - portMap: make(map[portMapKey]ServicePortName), + serviceMap: make(map[proxy.ServicePortName]*serviceInfo), + portMap: make(map[portMapKey]proxy.ServicePortName), listenIP: listenIP, iptables: iptables, hostIP: hostIP, @@ -188,14 +192,14 @@ func (proxier *Proxier) cleanupStaleStickySessions() { } // This assumes proxier.mu is not locked. -func (proxier *Proxier) stopProxy(service ServicePortName, info *serviceInfo) error { +func (proxier *Proxier) stopProxy(service proxy.ServicePortName, info *serviceInfo) error { proxier.mu.Lock() defer proxier.mu.Unlock() return proxier.stopProxyInternal(service, info) } // This assumes proxier.mu is locked. 
-func (proxier *Proxier) stopProxyInternal(service ServicePortName, info *serviceInfo) error { +func (proxier *Proxier) stopProxyInternal(service proxy.ServicePortName, info *serviceInfo) error { delete(proxier.serviceMap, service) err := info.socket.Close() port := info.socket.ListenPort() @@ -203,14 +207,14 @@ func (proxier *Proxier) stopProxyInternal(service ServicePortName, info *service return err } -func (proxier *Proxier) getServiceInfo(service ServicePortName) (*serviceInfo, bool) { +func (proxier *Proxier) getServiceInfo(service proxy.ServicePortName) (*serviceInfo, bool) { proxier.mu.Lock() defer proxier.mu.Unlock() info, ok := proxier.serviceMap[service] return info, ok } -func (proxier *Proxier) setServiceInfo(service ServicePortName, info *serviceInfo) { +func (proxier *Proxier) setServiceInfo(service proxy.ServicePortName, info *serviceInfo) { proxier.mu.Lock() defer proxier.mu.Unlock() proxier.serviceMap[service] = info @@ -219,7 +223,7 @@ func (proxier *Proxier) setServiceInfo(service ServicePortName, info *serviceInf // addServiceOnPort starts listening for a new service, returning the serviceInfo. // Pass proxyPort=0 to allocate a random port. The timeout only applies to UDP // connections, for now. -func (proxier *Proxier) addServiceOnPort(service ServicePortName, protocol api.Protocol, proxyPort int, timeout time.Duration) (*serviceInfo, error) { +func (proxier *Proxier) addServiceOnPort(service proxy.ServicePortName, protocol api.Protocol, proxyPort int, timeout time.Duration) (*serviceInfo, error) { sock, err := newProxySocket(protocol, proxier.listenIP, proxyPort) if err != nil { return nil, err @@ -245,7 +249,7 @@ func (proxier *Proxier) addServiceOnPort(service ServicePortName, protocol api.P proxier.setServiceInfo(service, si) glog.V(2).Infof("Proxying for service %q on %s port %d", service, protocol, portNum) - go func(service ServicePortName, proxier *Proxier) { + go func(service proxy.ServicePortName, proxier *Proxier) { defer util.HandleCrash() atomic.AddInt32(&proxier.numProxyLoops, 1) sock.ProxyLoop(service, si, proxier) @@ -263,7 +267,7 @@ const udpIdleTimeout = 1 * time.Second // shutdown if missing from the update set. 
func (proxier *Proxier) OnUpdate(services []api.Service) { glog.V(4).Infof("Received update notice: %+v", services) - activeServices := make(map[ServicePortName]bool) // use a map as a set + activeServices := make(map[proxy.ServicePortName]bool) // use a map as a set for i := range services { service := &services[i] @@ -276,7 +280,7 @@ func (proxier *Proxier) OnUpdate(services []api.Service) { for i := range service.Spec.Ports { servicePort := &service.Spec.Ports[i] - serviceName := ServicePortName{types.NamespacedName{service.Namespace, service.Name}, servicePort.Name} + serviceName := proxy.ServicePortName{types.NamespacedName{service.Namespace, service.Name}, servicePort.Name} activeServices[serviceName] = true serviceIP := net.ParseIP(service.Spec.ClusterIP) info, exists := proxier.getServiceInfo(serviceName) @@ -373,7 +377,7 @@ func ipsEqual(lhs, rhs []string) bool { return true } -func (proxier *Proxier) openPortal(service ServicePortName, info *serviceInfo) error { +func (proxier *Proxier) openPortal(service proxy.ServicePortName, info *serviceInfo) error { err := proxier.openOnePortal(info.portal, info.protocol, proxier.listenIP, info.proxyPort, service) if err != nil { return err @@ -401,7 +405,7 @@ func (proxier *Proxier) openPortal(service ServicePortName, info *serviceInfo) e return nil } -func (proxier *Proxier) openOnePortal(portal portal, protocol api.Protocol, proxyIP net.IP, proxyPort int, name ServicePortName) error { +func (proxier *Proxier) openOnePortal(portal portal, protocol api.Protocol, proxyIP net.IP, proxyPort int, name proxy.ServicePortName) error { // Handle traffic from containers. args := proxier.iptablesContainerPortalArgs(portal.ip, portal.port, protocol, proxyIP, proxyPort, name) existed, err := proxier.iptables.EnsureRule(iptables.Append, iptables.TableNAT, iptablesContainerPortalChain, args...) @@ -428,7 +432,7 @@ func (proxier *Proxier) openOnePortal(portal portal, protocol api.Protocol, prox // Marks a port as being owned by a particular service, or returns error if already claimed. // Idempotent: reclaiming with the same owner is not an error -func (proxier *Proxier) claimPort(port int, protocol api.Protocol, owner ServicePortName) error { +func (proxier *Proxier) claimPort(port int, protocol api.Protocol, owner proxy.ServicePortName) error { proxier.portMapMutex.Lock() defer proxier.portMapMutex.Unlock() @@ -449,7 +453,7 @@ func (proxier *Proxier) claimPort(port int, protocol api.Protocol, owner Service // Release a claim on a port. Returns an error if the owner does not match the claim. // Tolerates release on an unclaimed port, to simplify . -func (proxier *Proxier) releasePort(port int, protocol api.Protocol, owner ServicePortName) error { +func (proxier *Proxier) releasePort(port int, protocol api.Protocol, owner proxy.ServicePortName) error { proxier.portMapMutex.Lock() defer proxier.portMapMutex.Unlock() @@ -467,7 +471,7 @@ func (proxier *Proxier) releasePort(port int, protocol api.Protocol, owner Servi return nil } -func (proxier *Proxier) openNodePort(nodePort int, protocol api.Protocol, proxyIP net.IP, proxyPort int, name ServicePortName) error { +func (proxier *Proxier) openNodePort(nodePort int, protocol api.Protocol, proxyIP net.IP, proxyPort int, name proxy.ServicePortName) error { // TODO: Do we want to allow containers to access public services? Probably yes. 
// TODO: We could refactor this to be the same code as portal, but with IP == nil @@ -500,7 +504,7 @@ func (proxier *Proxier) openNodePort(nodePort int, protocol api.Protocol, proxyI return nil } -func (proxier *Proxier) closePortal(service ServicePortName, info *serviceInfo) error { +func (proxier *Proxier) closePortal(service proxy.ServicePortName, info *serviceInfo) error { // Collect errors and report them all at the end. el := proxier.closeOnePortal(info.portal, info.protocol, proxier.listenIP, info.proxyPort, service) for _, publicIP := range info.deprecatedPublicIPs { @@ -522,7 +526,7 @@ func (proxier *Proxier) closePortal(service ServicePortName, info *serviceInfo) return errors.NewAggregate(el) } -func (proxier *Proxier) closeOnePortal(portal portal, protocol api.Protocol, proxyIP net.IP, proxyPort int, name ServicePortName) []error { +func (proxier *Proxier) closeOnePortal(portal portal, protocol api.Protocol, proxyIP net.IP, proxyPort int, name proxy.ServicePortName) []error { el := []error{} // Handle traffic from containers. @@ -542,7 +546,7 @@ func (proxier *Proxier) closeOnePortal(portal portal, protocol api.Protocol, pro return el } -func (proxier *Proxier) closeNodePort(nodePort int, protocol api.Protocol, proxyIP net.IP, proxyPort int, name ServicePortName) []error { +func (proxier *Proxier) closeNodePort(nodePort int, protocol api.Protocol, proxyIP net.IP, proxyPort int, name proxy.ServicePortName) []error { el := []error{} // Handle traffic from containers. @@ -662,7 +666,7 @@ var zeroIPv6 = net.ParseIP("::0") var localhostIPv6 = net.ParseIP("::1") // Build a slice of iptables args that are common to from-container and from-host portal rules. -func iptablesCommonPortalArgs(destIP net.IP, destPort int, protocol api.Protocol, service ServicePortName) []string { +func iptablesCommonPortalArgs(destIP net.IP, destPort int, protocol api.Protocol, service proxy.ServicePortName) []string { // This list needs to include all fields as they are eventually spit out // by iptables-save. This is because some systems do not support the // 'iptables -C' arg, and so fall back on parsing iptables-save output. @@ -687,7 +691,7 @@ func iptablesCommonPortalArgs(destIP net.IP, destPort int, protocol api.Protocol } // Build a slice of iptables args for a from-container portal rule. -func (proxier *Proxier) iptablesContainerPortalArgs(destIP net.IP, destPort int, protocol api.Protocol, proxyIP net.IP, proxyPort int, service ServicePortName) []string { +func (proxier *Proxier) iptablesContainerPortalArgs(destIP net.IP, destPort int, protocol api.Protocol, proxyIP net.IP, proxyPort int, service proxy.ServicePortName) []string { args := iptablesCommonPortalArgs(destIP, destPort, protocol, service) // This is tricky. @@ -734,7 +738,7 @@ func (proxier *Proxier) iptablesContainerPortalArgs(destIP net.IP, destPort int, } // Build a slice of iptables args for a from-host portal rule. -func (proxier *Proxier) iptablesHostPortalArgs(destIP net.IP, destPort int, protocol api.Protocol, proxyIP net.IP, proxyPort int, service ServicePortName) []string { +func (proxier *Proxier) iptablesHostPortalArgs(destIP net.IP, destPort int, protocol api.Protocol, proxyIP net.IP, proxyPort int, service proxy.ServicePortName) []string { args := iptablesCommonPortalArgs(destIP, destPort, protocol, service) // This is tricky. @@ -769,7 +773,7 @@ func (proxier *Proxier) iptablesHostPortalArgs(destIP net.IP, destPort int, prot // Build a slice of iptables args for a from-container public-port rule. 
// See iptablesContainerPortalArgs // TODO: Should we just reuse iptablesContainerPortalArgs? -func (proxier *Proxier) iptablesContainerNodePortArgs(nodePort int, protocol api.Protocol, proxyIP net.IP, proxyPort int, service ServicePortName) []string { +func (proxier *Proxier) iptablesContainerNodePortArgs(nodePort int, protocol api.Protocol, proxyIP net.IP, proxyPort int, service proxy.ServicePortName) []string { args := iptablesCommonPortalArgs(nil, nodePort, protocol, service) if proxyIP.Equal(zeroIPv4) || proxyIP.Equal(zeroIPv6) { @@ -786,7 +790,7 @@ func (proxier *Proxier) iptablesContainerNodePortArgs(nodePort int, protocol api // Build a slice of iptables args for a from-host public-port rule. // See iptablesHostPortalArgs // TODO: Should we just reuse iptablesHostPortalArgs? -func (proxier *Proxier) iptablesHostNodePortArgs(nodePort int, protocol api.Protocol, proxyIP net.IP, proxyPort int, service ServicePortName) []string { +func (proxier *Proxier) iptablesHostNodePortArgs(nodePort int, protocol api.Protocol, proxyIP net.IP, proxyPort int, service proxy.ServicePortName) []string { args := iptablesCommonPortalArgs(nil, nodePort, protocol, service) if proxyIP.Equal(zeroIPv4) || proxyIP.Equal(zeroIPv6) { diff --git a/pkg/proxy/proxier_test.go b/pkg/proxy/userspace/proxier_test.go similarity index 93% rename from pkg/proxy/proxier_test.go rename to pkg/proxy/userspace/proxier_test.go index e5714eca7da..fa841d1c70c 100644 --- a/pkg/proxy/proxier_test.go +++ b/pkg/proxy/userspace/proxier_test.go @@ -14,7 +14,7 @@ See the License for the specific language governing permissions and limitations under the License. */ -package proxy +package userspace import ( "fmt" @@ -29,6 +29,7 @@ import ( "time" "k8s.io/kubernetes/pkg/api" + "k8s.io/kubernetes/pkg/proxy" "k8s.io/kubernetes/pkg/types" "k8s.io/kubernetes/pkg/util" "k8s.io/kubernetes/pkg/util/iptables" @@ -211,7 +212,7 @@ func waitForNumProxyLoops(t *testing.T, p *Proxier, want int32) { func TestTCPProxy(t *testing.T) { lb := NewLoadBalancerRR() - service := ServicePortName{types.NamespacedName{"testnamespace", "echo"}, "p"} + service := proxy.ServicePortName{types.NamespacedName{"testnamespace", "echo"}, "p"} lb.OnUpdate([]api.Endpoints{ { ObjectMeta: api.ObjectMeta{Name: service.Name, Namespace: service.Namespace}, @@ -238,7 +239,7 @@ func TestTCPProxy(t *testing.T) { func TestUDPProxy(t *testing.T) { lb := NewLoadBalancerRR() - service := ServicePortName{types.NamespacedName{"testnamespace", "echo"}, "p"} + service := proxy.ServicePortName{types.NamespacedName{"testnamespace", "echo"}, "p"} lb.OnUpdate([]api.Endpoints{ { ObjectMeta: api.ObjectMeta{Name: service.Name, Namespace: service.Namespace}, @@ -265,8 +266,8 @@ func TestUDPProxy(t *testing.T) { func TestMultiPortProxy(t *testing.T) { lb := NewLoadBalancerRR() - serviceP := ServicePortName{types.NamespacedName{"testnamespace", "echo-p"}, "p"} - serviceQ := ServicePortName{types.NamespacedName{"testnamespace", "echo-q"}, "q"} + serviceP := proxy.ServicePortName{types.NamespacedName{"testnamespace", "echo-p"}, "p"} + serviceQ := proxy.ServicePortName{types.NamespacedName{"testnamespace", "echo-q"}, "q"} lb.OnUpdate([]api.Endpoints{{ ObjectMeta: api.ObjectMeta{Name: serviceP.Name, Namespace: serviceP.Namespace}, Subsets: []api.EndpointSubset{{ @@ -304,9 +305,9 @@ func TestMultiPortProxy(t *testing.T) { func TestMultiPortOnUpdate(t *testing.T) { lb := NewLoadBalancerRR() - serviceP := ServicePortName{types.NamespacedName{"testnamespace", "echo"}, "p"} - serviceQ := 
ServicePortName{types.NamespacedName{"testnamespace", "echo"}, "q"} - serviceX := ServicePortName{types.NamespacedName{"testnamespace", "echo"}, "x"} + serviceP := proxy.ServicePortName{types.NamespacedName{"testnamespace", "echo"}, "p"} + serviceQ := proxy.ServicePortName{types.NamespacedName{"testnamespace", "echo"}, "q"} + serviceX := proxy.ServicePortName{types.NamespacedName{"testnamespace", "echo"}, "x"} p, err := createProxier(lb, net.ParseIP("0.0.0.0"), &fakeIptables{}, net.ParseIP("127.0.0.1"), nil) if err != nil { @@ -350,7 +351,7 @@ func TestMultiPortOnUpdate(t *testing.T) { } // Helper: Stops the proxy for the named service. -func stopProxyByName(proxier *Proxier, service ServicePortName) error { +func stopProxyByName(proxier *Proxier, service proxy.ServicePortName) error { info, found := proxier.getServiceInfo(service) if !found { return fmt.Errorf("unknown service: %s", service) @@ -360,7 +361,7 @@ func stopProxyByName(proxier *Proxier, service ServicePortName) error { func TestTCPProxyStop(t *testing.T) { lb := NewLoadBalancerRR() - service := ServicePortName{types.NamespacedName{"testnamespace", "echo"}, "p"} + service := proxy.ServicePortName{types.NamespacedName{"testnamespace", "echo"}, "p"} lb.OnUpdate([]api.Endpoints{ { ObjectMeta: api.ObjectMeta{Namespace: service.Namespace, Name: service.Name}, @@ -398,7 +399,7 @@ func TestTCPProxyStop(t *testing.T) { func TestUDPProxyStop(t *testing.T) { lb := NewLoadBalancerRR() - service := ServicePortName{types.NamespacedName{"testnamespace", "echo"}, "p"} + service := proxy.ServicePortName{types.NamespacedName{"testnamespace", "echo"}, "p"} lb.OnUpdate([]api.Endpoints{ { ObjectMeta: api.ObjectMeta{Namespace: service.Namespace, Name: service.Name}, @@ -436,7 +437,7 @@ func TestUDPProxyStop(t *testing.T) { func TestTCPProxyUpdateDelete(t *testing.T) { lb := NewLoadBalancerRR() - service := ServicePortName{types.NamespacedName{"testnamespace", "echo"}, "p"} + service := proxy.ServicePortName{types.NamespacedName{"testnamespace", "echo"}, "p"} lb.OnUpdate([]api.Endpoints{ { ObjectMeta: api.ObjectMeta{Namespace: service.Namespace, Name: service.Name}, @@ -473,7 +474,7 @@ func TestTCPProxyUpdateDelete(t *testing.T) { func TestUDPProxyUpdateDelete(t *testing.T) { lb := NewLoadBalancerRR() - service := ServicePortName{types.NamespacedName{"testnamespace", "echo"}, "p"} + service := proxy.ServicePortName{types.NamespacedName{"testnamespace", "echo"}, "p"} lb.OnUpdate([]api.Endpoints{ { ObjectMeta: api.ObjectMeta{Namespace: service.Namespace, Name: service.Name}, @@ -510,7 +511,7 @@ func TestUDPProxyUpdateDelete(t *testing.T) { func TestTCPProxyUpdateDeleteUpdate(t *testing.T) { lb := NewLoadBalancerRR() - service := ServicePortName{types.NamespacedName{"testnamespace", "echo"}, "p"} + service := proxy.ServicePortName{types.NamespacedName{"testnamespace", "echo"}, "p"} lb.OnUpdate([]api.Endpoints{ { ObjectMeta: api.ObjectMeta{Name: service.Name, Namespace: service.Namespace}, @@ -562,7 +563,7 @@ func TestTCPProxyUpdateDeleteUpdate(t *testing.T) { func TestUDPProxyUpdateDeleteUpdate(t *testing.T) { lb := NewLoadBalancerRR() - service := ServicePortName{types.NamespacedName{"testnamespace", "echo"}, "p"} + service := proxy.ServicePortName{types.NamespacedName{"testnamespace", "echo"}, "p"} lb.OnUpdate([]api.Endpoints{ { ObjectMeta: api.ObjectMeta{Name: service.Name, Namespace: service.Namespace}, @@ -614,7 +615,7 @@ func TestUDPProxyUpdateDeleteUpdate(t *testing.T) { func TestTCPProxyUpdatePort(t *testing.T) { lb := NewLoadBalancerRR() - 
service := ServicePortName{types.NamespacedName{"testnamespace", "echo"}, "p"} + service := proxy.ServicePortName{types.NamespacedName{"testnamespace", "echo"}, "p"} lb.OnUpdate([]api.Endpoints{ { ObjectMeta: api.ObjectMeta{Name: service.Name, Namespace: service.Namespace}, @@ -662,7 +663,7 @@ func TestTCPProxyUpdatePort(t *testing.T) { func TestUDPProxyUpdatePort(t *testing.T) { lb := NewLoadBalancerRR() - service := ServicePortName{types.NamespacedName{"testnamespace", "echo"}, "p"} + service := proxy.ServicePortName{types.NamespacedName{"testnamespace", "echo"}, "p"} lb.OnUpdate([]api.Endpoints{ { ObjectMeta: api.ObjectMeta{Name: service.Name, Namespace: service.Namespace}, @@ -707,7 +708,7 @@ func TestUDPProxyUpdatePort(t *testing.T) { func TestProxyUpdatePublicIPs(t *testing.T) { lb := NewLoadBalancerRR() - service := ServicePortName{types.NamespacedName{"testnamespace", "echo"}, "p"} + service := proxy.ServicePortName{types.NamespacedName{"testnamespace", "echo"}, "p"} lb.OnUpdate([]api.Endpoints{ { ObjectMeta: api.ObjectMeta{Name: service.Name, Namespace: service.Namespace}, @@ -759,7 +760,7 @@ func TestProxyUpdatePublicIPs(t *testing.T) { func TestProxyUpdatePortal(t *testing.T) { lb := NewLoadBalancerRR() - service := ServicePortName{types.NamespacedName{"testnamespace", "echo"}, "p"} + service := proxy.ServicePortName{types.NamespacedName{"testnamespace", "echo"}, "p"} lb.OnUpdate([]api.Endpoints{ { ObjectMeta: api.ObjectMeta{Name: service.Name, Namespace: service.Namespace}, diff --git a/pkg/proxy/proxysocket.go b/pkg/proxy/userspace/proxysocket.go similarity index 93% rename from pkg/proxy/proxysocket.go rename to pkg/proxy/userspace/proxysocket.go index ab94b8e69a9..94978f1c7bd 100644 --- a/pkg/proxy/proxysocket.go +++ b/pkg/proxy/userspace/proxysocket.go @@ -14,7 +14,7 @@ See the License for the specific language governing permissions and limitations under the License. */ -package proxy +package userspace import ( "fmt" @@ -27,6 +27,7 @@ import ( "github.com/golang/glog" "k8s.io/kubernetes/pkg/api" + "k8s.io/kubernetes/pkg/proxy" "k8s.io/kubernetes/pkg/util" ) @@ -39,7 +40,7 @@ type proxySocket interface { // while sessions are active. Close() error // ProxyLoop proxies incoming connections for the specified service to the service endpoints. - ProxyLoop(service ServicePortName, info *serviceInfo, proxier *Proxier) + ProxyLoop(service proxy.ServicePortName, info *serviceInfo, proxier *Proxier) // ListenPort returns the host port that the proxySocket is listening on ListenPort() int } @@ -81,7 +82,7 @@ func (tcp *tcpProxySocket) ListenPort() int { return tcp.port } -func tryConnect(service ServicePortName, srcAddr net.Addr, protocol string, proxier *Proxier) (out net.Conn, err error) { +func tryConnect(service proxy.ServicePortName, srcAddr net.Addr, protocol string, proxier *Proxier) (out net.Conn, err error) { for _, retryTimeout := range endpointDialTimeout { endpoint, err := proxier.loadBalancer.NextEndpoint(service, srcAddr) if err != nil { @@ -104,7 +105,7 @@ func tryConnect(service ServicePortName, srcAddr net.Addr, protocol string, prox return nil, fmt.Errorf("failed to connect to an endpoint.") } -func (tcp *tcpProxySocket) ProxyLoop(service ServicePortName, myInfo *serviceInfo, proxier *Proxier) { +func (tcp *tcpProxySocket) ProxyLoop(service proxy.ServicePortName, myInfo *serviceInfo, proxier *Proxier) { for { if info, exists := proxier.getServiceInfo(service); !exists || info != myInfo { // The service port was closed or replaced. 
@@ -190,7 +191,7 @@ func newClientCache() *clientCache { return &clientCache{clients: map[string]net.Conn{}} } -func (udp *udpProxySocket) ProxyLoop(service ServicePortName, myInfo *serviceInfo, proxier *Proxier) { +func (udp *udpProxySocket) ProxyLoop(service proxy.ServicePortName, myInfo *serviceInfo, proxier *Proxier) { activeClients := newClientCache() var buffer [4096]byte // 4KiB should be enough for most whole-packets for { @@ -235,7 +236,7 @@ func (udp *udpProxySocket) ProxyLoop(service ServicePortName, myInfo *serviceInf } } -func (udp *udpProxySocket) getBackendConn(activeClients *clientCache, cliAddr net.Addr, proxier *Proxier, service ServicePortName, timeout time.Duration) (net.Conn, error) { +func (udp *udpProxySocket) getBackendConn(activeClients *clientCache, cliAddr net.Addr, proxier *Proxier, service proxy.ServicePortName, timeout time.Duration) (net.Conn, error) { activeClients.mu.Lock() defer activeClients.mu.Unlock() diff --git a/pkg/proxy/roundrobin.go b/pkg/proxy/userspace/roundrobin.go similarity index 90% rename from pkg/proxy/roundrobin.go rename to pkg/proxy/userspace/roundrobin.go index 82ba34e5c21..e3599878e7e 100644 --- a/pkg/proxy/roundrobin.go +++ b/pkg/proxy/userspace/roundrobin.go @@ -14,7 +14,7 @@ See the License for the specific language governing permissions and limitations under the License. */ -package proxy +package userspace import ( "errors" @@ -27,6 +27,7 @@ import ( "github.com/golang/glog" "k8s.io/kubernetes/pkg/api" + "k8s.io/kubernetes/pkg/proxy" "k8s.io/kubernetes/pkg/types" "k8s.io/kubernetes/pkg/util/slice" ) @@ -53,7 +54,7 @@ type affinityPolicy struct { // LoadBalancerRR is a round-robin load balancer. type LoadBalancerRR struct { lock sync.RWMutex - services map[ServicePortName]*balancerState + services map[proxy.ServicePortName]*balancerState } // Ensure this implements LoadBalancer. @@ -76,11 +77,11 @@ func newAffinityPolicy(affinityType api.ServiceAffinity, ttlMinutes int) *affini // NewLoadBalancerRR returns a new LoadBalancerRR. func NewLoadBalancerRR() *LoadBalancerRR { return &LoadBalancerRR{ - services: map[ServicePortName]*balancerState{}, + services: map[proxy.ServicePortName]*balancerState{}, } } -func (lb *LoadBalancerRR) NewService(svcPort ServicePortName, affinityType api.ServiceAffinity, ttlMinutes int) error { +func (lb *LoadBalancerRR) NewService(svcPort proxy.ServicePortName, affinityType api.ServiceAffinity, ttlMinutes int) error { lb.lock.Lock() defer lb.lock.Unlock() lb.newServiceInternal(svcPort, affinityType, ttlMinutes) @@ -88,7 +89,7 @@ func (lb *LoadBalancerRR) NewService(svcPort ServicePortName, affinityType api.S } // This assumes that lb.lock is already held. -func (lb *LoadBalancerRR) newServiceInternal(svcPort ServicePortName, affinityType api.ServiceAffinity, ttlMinutes int) *balancerState { +func (lb *LoadBalancerRR) newServiceInternal(svcPort proxy.ServicePortName, affinityType api.ServiceAffinity, ttlMinutes int) *balancerState { if ttlMinutes == 0 { ttlMinutes = 180 //default to 3 hours if not specified. Should 0 be unlimeted instead???? } @@ -113,7 +114,7 @@ func isSessionAffinity(affinity *affinityPolicy) bool { // NextEndpoint returns a service endpoint. // The service endpoint is chosen using the round-robin algorithm. -func (lb *LoadBalancerRR) NextEndpoint(svcPort ServicePortName, srcAddr net.Addr) (string, error) { +func (lb *LoadBalancerRR) NextEndpoint(svcPort proxy.ServicePortName, srcAddr net.Addr) (string, error) { // Coarse locking is simple. 
We can get more fine-grained if/when we // can prove it matters. lb.lock.Lock() @@ -190,7 +191,7 @@ func flattenValidEndpoints(endpoints []hostPortPair) []string { } // Remove any session affinity records associated to a particular endpoint (for example when a pod goes down). -func removeSessionAffinityByEndpoint(state *balancerState, svcPort ServicePortName, endpoint string) { +func removeSessionAffinityByEndpoint(state *balancerState, svcPort proxy.ServicePortName, endpoint string) { for _, affinity := range state.affinity.affinityMap { if affinity.endpoint == endpoint { glog.V(4).Infof("Removing client: %s from affinityMap for service %q", affinity.endpoint, svcPort) @@ -202,7 +203,7 @@ func removeSessionAffinityByEndpoint(state *balancerState, svcPort ServicePortNa // Loop through the valid endpoints and then the endpoints associated with the Load Balancer. // Then remove any session affinity records that are not in both lists. // This assumes the lb.lock is held. -func (lb *LoadBalancerRR) updateAffinityMap(svcPort ServicePortName, newEndpoints []string) { +func (lb *LoadBalancerRR) updateAffinityMap(svcPort proxy.ServicePortName, newEndpoints []string) { allEndpoints := map[string]int{} for _, newEndpoint := range newEndpoints { allEndpoints[newEndpoint] = 1 @@ -226,7 +227,7 @@ func (lb *LoadBalancerRR) updateAffinityMap(svcPort ServicePortName, newEndpoint // Registered endpoints are updated if found in the update set or // unregistered if missing from the update set. func (lb *LoadBalancerRR) OnUpdate(allEndpoints []api.Endpoints) { - registeredEndpoints := make(map[ServicePortName]bool) + registeredEndpoints := make(map[proxy.ServicePortName]bool) lb.lock.Lock() defer lb.lock.Unlock() @@ -250,7 +251,7 @@ func (lb *LoadBalancerRR) OnUpdate(allEndpoints []api.Endpoints) { } for portname := range portsToEndpoints { - svcPort := ServicePortName{types.NamespacedName{svcEndpoints.Namespace, svcEndpoints.Name}, portname} + svcPort := proxy.ServicePortName{types.NamespacedName{svcEndpoints.Namespace, svcEndpoints.Name}, portname} state, exists := lb.services[svcPort] curEndpoints := []string{} if state != nil { @@ -294,7 +295,7 @@ func slicesEquiv(lhs, rhs []string) bool { return false } -func (lb *LoadBalancerRR) CleanupStaleStickySessions(svcPort ServicePortName) { +func (lb *LoadBalancerRR) CleanupStaleStickySessions(svcPort proxy.ServicePortName) { lb.lock.Lock() defer lb.lock.Unlock() diff --git a/pkg/proxy/roundrobin_test.go b/pkg/proxy/userspace/roundrobin_test.go similarity index 95% rename from pkg/proxy/roundrobin_test.go rename to pkg/proxy/userspace/roundrobin_test.go index d7863bf3517..22beb65b2c1 100644 --- a/pkg/proxy/roundrobin_test.go +++ b/pkg/proxy/userspace/roundrobin_test.go @@ -14,7 +14,7 @@ See the License for the specific language governing permissions and limitations under the License. 
*/ -package proxy +package userspace import ( "net" @@ -67,7 +67,7 @@ func TestLoadBalanceFailsWithNoEndpoints(t *testing.T) { loadBalancer := NewLoadBalancerRR() var endpoints []api.Endpoints loadBalancer.OnUpdate(endpoints) - service := ServicePortName{types.NamespacedName{"testnamespace", "foo"}, "does-not-exist"} + service := proxy.servicePortName{types.NamespacedName{"testnamespace", "foo"}, "does-not-exist"} endpoint, err := loadBalancer.NextEndpoint(service, nil) if err == nil { t.Errorf("Didn't fail with non-existent service") @@ -77,7 +77,7 @@ func TestLoadBalanceFailsWithNoEndpoints(t *testing.T) { } } -func expectEndpoint(t *testing.T, loadBalancer *LoadBalancerRR, service ServicePortName, expected string, netaddr net.Addr) { +func expectEndpoint(t *testing.T, loadBalancer *LoadBalancerRR, service proxy.ServicePortName, expected string, netaddr net.Addr) { endpoint, err := loadBalancer.NextEndpoint(service, netaddr) if err != nil { t.Errorf("Didn't find a service for %s, expected %s, failed with: %v", service, expected, err) @@ -89,7 +89,7 @@ func expectEndpoint(t *testing.T, loadBalancer *LoadBalancerRR, service ServiceP func TestLoadBalanceWorksWithSingleEndpoint(t *testing.T) { loadBalancer := NewLoadBalancerRR() - service := ServicePortName{types.NamespacedName{"testnamespace", "foo"}, "p"} + service := proxy.ServicePortName{types.NamespacedName{"testnamespace", "foo"}, "p"} endpoint, err := loadBalancer.NextEndpoint(service, nil) if err == nil || len(endpoint) != 0 { t.Errorf("Didn't fail with non-existent service") @@ -127,7 +127,7 @@ func stringsInSlice(haystack []string, needles ...string) bool { func TestLoadBalanceWorksWithMultipleEndpoints(t *testing.T) { loadBalancer := NewLoadBalancerRR() - service := ServicePortName{types.NamespacedName{"testnamespace", "foo"}, "p"} + service := proxy.ServicePortName{types.NamespacedName{"testnamespace", "foo"}, "p"} endpoint, err := loadBalancer.NextEndpoint(service, nil) if err == nil || len(endpoint) != 0 { t.Errorf("Didn't fail with non-existent service") @@ -154,8 +154,8 @@ func TestLoadBalanceWorksWithMultipleEndpoints(t *testing.T) { func TestLoadBalanceWorksWithMultipleEndpointsMultiplePorts(t *testing.T) { loadBalancer := NewLoadBalancerRR() - serviceP := ServicePortName{types.NamespacedName{"testnamespace", "foo"}, "p"} - serviceQ := ServicePortName{types.NamespacedName{"testnamespace", "foo"}, "q"} + serviceP := proxy.ServicePortName{types.NamespacedName{"testnamespace", "foo"}, "p"} + serviceQ := proxy.ServicePortName{types.NamespacedName{"testnamespace", "foo"}, "q"} endpoint, err := loadBalancer.NextEndpoint(serviceP, nil) if err == nil || len(endpoint) != 0 { t.Errorf("Didn't fail with non-existent service") @@ -197,8 +197,8 @@ func TestLoadBalanceWorksWithMultipleEndpointsMultiplePorts(t *testing.T) { func TestLoadBalanceWorksWithMultipleEndpointsAndUpdates(t *testing.T) { loadBalancer := NewLoadBalancerRR() - serviceP := ServicePortName{types.NamespacedName{"testnamespace", "foo"}, "p"} - serviceQ := ServicePortName{types.NamespacedName{"testnamespace", "foo"}, "q"} + serviceP := proxy.ServicePortName{types.NamespacedName{"testnamespace", "foo"}, "p"} + serviceQ := proxy.ServicePortName{types.NamespacedName{"testnamespace", "foo"}, "q"} endpoint, err := loadBalancer.NextEndpoint(serviceP, nil) if err == nil || len(endpoint) != 0 { t.Errorf("Didn't fail with non-existent service") @@ -288,8 +288,8 @@ func TestLoadBalanceWorksWithMultipleEndpointsAndUpdates(t *testing.T) { func 
TestLoadBalanceWorksWithServiceRemoval(t *testing.T) { loadBalancer := NewLoadBalancerRR() - fooServiceP := ServicePortName{types.NamespacedName{"testnamespace", "foo"}, "p"} - barServiceP := ServicePortName{types.NamespacedName{"testnamespace", "bar"}, "p"} + fooServiceP := proxy.ServicePortName{types.NamespacedName{"testnamespace", "foo"}, "p"} + barServiceP := proxy.ServicePortName{types.NamespacedName{"testnamespace", "bar"}, "p"} endpoint, err := loadBalancer.NextEndpoint(fooServiceP, nil) if err == nil || len(endpoint) != 0 { t.Errorf("Didn't fail with non-existent service") @@ -344,7 +344,7 @@ func TestLoadBalanceWorksWithServiceRemoval(t *testing.T) { func TestStickyLoadBalanceWorksWithNewServiceCalledFirst(t *testing.T) { loadBalancer := NewLoadBalancerRR() - service := ServicePortName{types.NamespacedName{"testnamespace", "foo"}, ""} + service := proxy.ServicePortName{types.NamespacedName{"testnamespace", "foo"}, ""} endpoint, err := loadBalancer.NextEndpoint(service, nil) if err == nil || len(endpoint) != 0 { t.Errorf("Didn't fail with non-existent service") @@ -401,7 +401,7 @@ func TestStickyLoadBalanceWorksWithNewServiceCalledFirst(t *testing.T) { func TestStickyLoadBalanceWorksWithNewServiceCalledSecond(t *testing.T) { loadBalancer := NewLoadBalancerRR() - service := ServicePortName{types.NamespacedName{"testnamespace", "foo"}, ""} + service := proxy.ServicePortName{types.NamespacedName{"testnamespace", "foo"}, ""} endpoint, err := loadBalancer.NextEndpoint(service, nil) if err == nil || len(endpoint) != 0 { t.Errorf("Didn't fail with non-existent service") @@ -463,7 +463,7 @@ func TestStickyLoadBalanaceWorksWithMultipleEndpointsRemoveOne(t *testing.T) { client5 := &net.TCPAddr{IP: net.IPv4(127, 0, 0, 5), Port: 0} client6 := &net.TCPAddr{IP: net.IPv4(127, 0, 0, 6), Port: 0} loadBalancer := NewLoadBalancerRR() - service := ServicePortName{types.NamespacedName{"testnamespace", "foo"}, ""} + service := proxy.ServicePortName{types.NamespacedName{"testnamespace", "foo"}, ""} endpoint, err := loadBalancer.NextEndpoint(service, nil) if err == nil || len(endpoint) != 0 { t.Errorf("Didn't fail with non-existent service") @@ -537,7 +537,7 @@ func TestStickyLoadBalanceWorksWithMultipleEndpointsAndUpdates(t *testing.T) { client2 := &net.TCPAddr{IP: net.IPv4(127, 0, 0, 2), Port: 0} client3 := &net.TCPAddr{IP: net.IPv4(127, 0, 0, 3), Port: 0} loadBalancer := NewLoadBalancerRR() - service := ServicePortName{types.NamespacedName{"testnamespace", "foo"}, ""} + service := proxy.ServicePortName{types.NamespacedName{"testnamespace", "foo"}, ""} endpoint, err := loadBalancer.NextEndpoint(service, nil) if err == nil || len(endpoint) != 0 { t.Errorf("Didn't fail with non-existent service") @@ -598,7 +598,7 @@ func TestStickyLoadBalanceWorksWithServiceRemoval(t *testing.T) { client2 := &net.TCPAddr{IP: net.IPv4(127, 0, 0, 2), Port: 0} client3 := &net.TCPAddr{IP: net.IPv4(127, 0, 0, 3), Port: 0} loadBalancer := NewLoadBalancerRR() - fooService := ServicePortName{types.NamespacedName{"testnamespace", "foo"}, ""} + fooService := proxy.ServicePortName{types.NamespacedName{"testnamespace", "foo"}, ""} endpoint, err := loadBalancer.NextEndpoint(fooService, nil) if err == nil || len(endpoint) != 0 { t.Errorf("Didn't fail with non-existent service") @@ -614,7 +614,7 @@ func TestStickyLoadBalanceWorksWithServiceRemoval(t *testing.T) { }, }, } - barService := ServicePortName{types.NamespacedName{"testnamespace", "bar"}, ""} + barService := proxy.ServicePortName{types.NamespacedName{"testnamespace", "bar"}, 
""} loadBalancer.NewService(barService, api.ServiceAffinityClientIP, 0) endpoints[1] = api.Endpoints{ ObjectMeta: api.ObjectMeta{Name: barService.Name, Namespace: barService.Namespace}, diff --git a/pkg/proxy/udp_server.go b/pkg/proxy/userspace/udp_server.go similarity index 98% rename from pkg/proxy/udp_server.go rename to pkg/proxy/userspace/udp_server.go index 0185ef1d267..fdc85b47df1 100644 --- a/pkg/proxy/udp_server.go +++ b/pkg/proxy/userspace/udp_server.go @@ -14,7 +14,7 @@ See the License for the specific language governing permissions and limitations under the License. */ -package proxy +package userspace import ( "fmt" From 962a7b492ba7792305ee7e3f221bb8754feae3ce Mon Sep 17 00:00:00 2001 From: BenTheElder Date: Fri, 7 Aug 2015 20:54:22 -0400 Subject: [PATCH 2/3] in pkg/proxy, merge proxy_provider.go and service_port_name.go to types.go --- pkg/proxy/service_port_name.go | 34 ----------------------- pkg/proxy/{proxy_provider.go => types.go} | 14 ++++++++++ 2 files changed, 14 insertions(+), 34 deletions(-) delete mode 100644 pkg/proxy/service_port_name.go rename pkg/proxy/{proxy_provider.go => types.go} (76%) diff --git a/pkg/proxy/service_port_name.go b/pkg/proxy/service_port_name.go deleted file mode 100644 index 9ac763d3abd..00000000000 --- a/pkg/proxy/service_port_name.go +++ /dev/null @@ -1,34 +0,0 @@ -/* -Copyright 2015 The Kubernetes Authors All rights reserved. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package proxy - -import ( - "fmt" - - "k8s.io/kubernetes/pkg/types" -) - -// ServicePortName carries a namespace + name + portname. This is the unique -// identfier for a load-balanced service. -type ServicePortName struct { - types.NamespacedName - Port string -} - -func (spn ServicePortName) String() string { - return fmt.Sprintf("%s:%s", spn.NamespacedName.String(), spn.Port) -} diff --git a/pkg/proxy/proxy_provider.go b/pkg/proxy/types.go similarity index 76% rename from pkg/proxy/proxy_provider.go rename to pkg/proxy/types.go index 161a6fc0720..fb759db744d 100644 --- a/pkg/proxy/proxy_provider.go +++ b/pkg/proxy/types.go @@ -17,7 +17,10 @@ limitations under the License. package proxy import ( + "fmt" + "k8s.io/kubernetes/pkg/api" + "k8s.io/kubernetes/pkg/types" ) // ProxyProvider is the interface provided by proxier implementations. @@ -31,3 +34,14 @@ type ProxyProvider interface { // It does not return. SyncLoop() } + +// ServicePortName carries a namespace + name + portname. This is the unique +// identfier for a load-balanced service. 
+type ServicePortName struct { + types.NamespacedName + Port string +} + +func (spn ServicePortName) String() string { + return fmt.Sprintf("%s:%s", spn.NamespacedName.String(), spn.Port) +} From f6d257c0f3c216afaa05ad83939cb77827111c4c Mon Sep 17 00:00:00 2001 From: BenTheElder Date: Fri, 7 Aug 2015 23:41:07 -0400 Subject: [PATCH 3/3] fix missing import in roundrobin_test.go --- pkg/proxy/userspace/roundrobin_test.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/pkg/proxy/userspace/roundrobin_test.go b/pkg/proxy/userspace/roundrobin_test.go index 22beb65b2c1..bb0155dffdb 100644 --- a/pkg/proxy/userspace/roundrobin_test.go +++ b/pkg/proxy/userspace/roundrobin_test.go @@ -21,6 +21,7 @@ import ( "testing" "k8s.io/kubernetes/pkg/api" + "k8s.io/kubernetes/pkg/proxy" "k8s.io/kubernetes/pkg/types" ) @@ -67,7 +68,7 @@ func TestLoadBalanceFailsWithNoEndpoints(t *testing.T) { loadBalancer := NewLoadBalancerRR() var endpoints []api.Endpoints loadBalancer.OnUpdate(endpoints) - service := proxy.servicePortName{types.NamespacedName{"testnamespace", "foo"}, "does-not-exist"} + service := proxy.ServicePortName{types.NamespacedName{"testnamespace", "foo"}, "does-not-exist"} endpoint, err := loadBalancer.NextEndpoint(service, nil) if err == nil { t.Errorf("Didn't fail with non-existent service")
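
For reference, the ProxyProvider interface introduced in the first patch is the seam that the planned iptables proxier (issue #3760, mentioned in the commit message) is expected to plug into. What follows is a minimal, hypothetical sketch of such an implementation; it is not part of these patches, the package path, type, and fields are assumptions for illustration, and only OnUpdate and SyncLoop are dictated by the interface itself.

    // Hypothetical sketch only; not part of this patch series.
    package iptables

    import (
        "sync"
        "time"

        "k8s.io/kubernetes/pkg/api"
        "k8s.io/kubernetes/pkg/proxy"
        "k8s.io/kubernetes/pkg/types"
    )

    // Proxier is a hypothetical iptables-mode proxier.
    type Proxier struct {
        mu         sync.Mutex
        serviceMap map[proxy.ServicePortName]*api.ServicePort // illustrative state only
        syncPeriod time.Duration
    }

    // Compile-time check that Proxier satisfies proxy.ProxyProvider, mirroring
    // the assertion added to the userspace Proxier in the first patch.
    var _ proxy.ProxyProvider = &Proxier{}

    // OnUpdate rebuilds the desired service state from the full update set.
    func (p *Proxier) OnUpdate(services []api.Service) {
        p.mu.Lock()
        defer p.mu.Unlock()
        p.serviceMap = make(map[proxy.ServicePortName]*api.ServicePort)
        for i := range services {
            svc := &services[i]
            for j := range svc.Spec.Ports {
                port := &svc.Spec.Ports[j]
                name := proxy.ServicePortName{
                    NamespacedName: types.NamespacedName{Namespace: svc.Namespace, Name: svc.Name},
                    Port:           port.Name,
                }
                p.serviceMap[name] = port
            }
        }
        // A real implementation would program iptables rules from serviceMap here.
    }

    // SyncLoop periodically re-applies the desired state and never returns,
    // matching the contract documented on proxy.ProxyProvider.
    func (p *Proxier) SyncLoop() {
        t := time.NewTicker(p.syncPeriod)
        defer t.Stop()
        for range t.C {
            // A real implementation would resync iptables rules here.
        }
    }

Because both backends would sit behind the same interface, cmd/kube-proxy could construct either a userspace.Proxier or an iptables-based proxier and drive it identically through OnUpdate and SyncLoop.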