diff --git a/cmd/kube-proxy/app/server.go b/cmd/kube-proxy/app/server.go index 3f0c43d9449..3ac43c0a9ec 100644 --- a/cmd/kube-proxy/app/server.go +++ b/cmd/kube-proxy/app/server.go @@ -219,7 +219,10 @@ func NewProxyServerDefault(config *options.ProxyServerConfig) (*ProxyServer, err recorder := eventBroadcaster.NewRecorder(api.Scheme, clientv1.EventSource{Component: "kube-proxy", Host: hostname}) var proxier proxy.ProxyProvider - var servicesHandler proxyconfig.ServiceConfigHandler + var serviceEventHandler proxyconfig.ServiceHandler + // TODO: Migrate all handlers to ServiceHandler types and + // get rid of this one. + var serviceHandler proxyconfig.ServiceConfigHandler var endpointsEventHandler proxyconfig.EndpointsHandler proxyMode := getProxyMode(string(config.Mode), client.Core().Nodes(), hostname, iptInterface, iptables.LinuxKernelCompatTester{}) @@ -246,7 +249,7 @@ func NewProxyServerDefault(config *options.ProxyServerConfig) (*ProxyServer, err glog.Fatalf("Unable to create proxier: %v", err) } proxier = proxierIPTables - servicesHandler = proxierIPTables + serviceEventHandler = proxierIPTables endpointsEventHandler = proxierIPTables // No turning back. Remove artifacts that might still exist from the userspace Proxier. glog.V(0).Info("Tearing down userspace rules.") @@ -271,7 +274,7 @@ func NewProxyServerDefault(config *options.ProxyServerConfig) (*ProxyServer, err if err != nil { glog.Fatalf("Unable to create proxier: %v", err) } - servicesHandler = proxierUserspace + serviceHandler = proxierUserspace proxier = proxierUserspace } else { // This is a proxy.LoadBalancer which NewProxier needs but has methods we don't need for @@ -292,7 +295,7 @@ func NewProxyServerDefault(config *options.ProxyServerConfig) (*ProxyServer, err if err != nil { glog.Fatalf("Unable to create proxier: %v", err) } - servicesHandler = proxierUserspace + serviceHandler = proxierUserspace proxier = proxierUserspace } // Remove artifacts from the pure-iptables Proxier, if not on Windows. @@ -314,7 +317,12 @@ func NewProxyServerDefault(config *options.ProxyServerConfig) (*ProxyServer, err // only notify on changes, and the initial update (on process start) may be lost if no handlers // are registered yet. serviceConfig := proxyconfig.NewServiceConfig(informerFactory.Core().InternalVersion().Services(), config.ConfigSyncPeriod) - serviceConfig.RegisterHandler(servicesHandler) + if serviceHandler != nil { + serviceConfig.RegisterHandler(serviceHandler) + } + if serviceEventHandler != nil { + serviceConfig.RegisterEventHandler(serviceEventHandler) + } go serviceConfig.Run(wait.NeverStop) endpointsConfig := proxyconfig.NewEndpointsConfig(informerFactory.Core().InternalVersion().Endpoints(), config.ConfigSyncPeriod) diff --git a/pkg/proxy/config/BUILD b/pkg/proxy/config/BUILD index 3918c4de27b..b68c78286d9 100644 --- a/pkg/proxy/config/BUILD +++ b/pkg/proxy/config/BUILD @@ -20,7 +20,6 @@ go_library( "//pkg/client/informers/informers_generated/internalversion/core/internalversion:go_default_library", "//pkg/client/listers/core/internalversion:go_default_library", "//pkg/controller:go_default_library", - "//pkg/util/config:go_default_library", "//vendor/github.com/golang/glog:go_default_library", "//vendor/k8s.io/apimachinery/pkg/labels:go_default_library", "//vendor/k8s.io/apimachinery/pkg/util/runtime:go_default_library", diff --git a/pkg/proxy/config/config.go b/pkg/proxy/config/config.go index fbc1d41785e..3e99d1f9fa6 100644 --- a/pkg/proxy/config/config.go +++ b/pkg/proxy/config/config.go @@ -28,7 +28,6 @@ import ( coreinformers "k8s.io/kubernetes/pkg/client/informers/informers_generated/internalversion/core/internalversion" listers "k8s.io/kubernetes/pkg/client/listers/core/internalversion" "k8s.io/kubernetes/pkg/controller" - "k8s.io/kubernetes/pkg/util/config" ) // ServiceConfigHandler is an abstract interface of objects which receive update notifications for the set of services. @@ -47,7 +46,7 @@ type ServiceConfigHandler interface { OnServiceUpdate(services []*api.Service) } -// ServiceHandler is an abstract interface of objects whic receive +// ServiceHandler is an abstract interface of objects which receive // notifications about service object changes. type ServiceHandler interface { // OnServiceAdd is called whenever creation of new service object diff --git a/pkg/proxy/iptables/proxier.go b/pkg/proxy/iptables/proxier.go index 9c6cf16acb1..bd547b5aa60 100644 --- a/pkg/proxy/iptables/proxier.go +++ b/pkg/proxy/iptables/proxier.go @@ -196,6 +196,7 @@ func newServiceInfo(serviceName proxy.ServicePortName, port *api.ServicePort, se } type endpointsMap map[types.NamespacedName]*api.Endpoints +type serviceMap map[types.NamespacedName]*api.Service type proxyServiceMap map[proxy.ServicePortName]*serviceInfo type proxyEndpointMap map[proxy.ServicePortName][]*endpointsInfo @@ -211,13 +212,13 @@ type Proxier struct { // to not be modified in the meantime, but also require to be not modified // by Proxier. allEndpoints endpointsMap - // allServices is nil until we have seen an OnServiceUpdate event. - allServices []*api.Service + allServices serviceMap - // endpointsSynced is set to true when endpoints are synced after startup. - // This is used to avoid updating iptables with some partial data after - // kube-proxy restart. + // endpointsSynced and servicesSynced are set to true when corresponding + // objects are synced after startup. This is used to avoid updating iptables + // with some partial data after kube-proxy restart. endpointsSynced bool + servicesSynced bool throttle flowcontrol.RateLimiter @@ -333,6 +334,7 @@ func NewProxier(ipt utiliptables.Interface, endpointsMap: make(proxyEndpointMap), portsMap: make(map[localPort]closeable), allEndpoints: make(endpointsMap), + allServices: make(serviceMap), syncPeriod: syncPeriod, minSyncPeriod: minSyncPeriod, throttle: throttle, @@ -457,7 +459,7 @@ func (proxier *Proxier) SyncLoop() { // Accepts a list of Services and the existing service map. Returns the new // service map, a map of healthcheck ports, and a set of stale UDP // services. -func buildNewServiceMap(allServices []*api.Service, oldServiceMap proxyServiceMap) (proxyServiceMap, map[types.NamespacedName]uint16, sets.String) { +func buildNewServiceMap(allServices serviceMap, oldServiceMap proxyServiceMap) (proxyServiceMap, map[types.NamespacedName]uint16, sets.String) { newServiceMap := make(proxyServiceMap) hcPorts := make(map[types.NamespacedName]uint16) @@ -525,15 +527,37 @@ func buildNewServiceMap(allServices []*api.Service, oldServiceMap proxyServiceMa return newServiceMap, hcPorts, staleUDPServices } -// OnServiceUpdate tracks the active set of service proxies. -// They will be synchronized using syncProxyRules() -func (proxier *Proxier) OnServiceUpdate(allServices []*api.Service) { +func (proxier *Proxier) OnServiceAdd(service *api.Service) { + namespacedName := types.NamespacedName{Namespace: service.Namespace, Name: service.Name} + proxier.mu.Lock() defer proxier.mu.Unlock() - if proxier.allServices == nil { - glog.V(2).Info("Received first Services update") - } - proxier.allServices = allServices + proxier.allServices[namespacedName] = service + proxier.syncProxyRules(syncReasonServices) +} + +func (proxier *Proxier) OnServiceUpdate(_, service *api.Service) { + namespacedName := types.NamespacedName{Namespace: service.Namespace, Name: service.Name} + + proxier.mu.Lock() + defer proxier.mu.Unlock() + proxier.allServices[namespacedName] = service + proxier.syncProxyRules(syncReasonServices) +} + +func (proxier *Proxier) OnServiceDelete(service *api.Service) { + namespacedName := types.NamespacedName{Namespace: service.Namespace, Name: service.Name} + + proxier.mu.Lock() + defer proxier.mu.Unlock() + delete(proxier.allServices, namespacedName) + proxier.syncProxyRules(syncReasonServices) +} + +func (proxier *Proxier) OnServiceSynced() { + proxier.mu.Lock() + defer proxier.mu.Unlock() + proxier.servicesSynced = true proxier.syncProxyRules(syncReasonServices) } diff --git a/pkg/proxy/iptables/proxier_test.go b/pkg/proxy/iptables/proxier_test.go index 4a0d1575985..72f7c4a6ab2 100644 --- a/pkg/proxy/iptables/proxier_test.go +++ b/pkg/proxy/iptables/proxier_test.go @@ -388,7 +388,7 @@ func NewFakeProxier(ipt utiliptables.Interface) *Proxier { iptables: ipt, clusterCIDR: "10.0.0.0/24", allEndpoints: make(endpointsMap), - allServices: []*api.Service{}, + allServices: make(serviceMap), endpointsSynced: true, hostname: testHostname, portsMap: make(map[localPort]closeable), @@ -567,7 +567,7 @@ func TestClusterIPReject(t *testing.T) { Port: "p80", } - fp.allServices = []*api.Service{ + fp.allServices = makeServiceMap( makeTestService(svcPortName.Namespace, svcPortName.Namespace, func(svc *api.Service) { svc.Spec.ClusterIP = svcIP svc.Spec.Ports = []api.ServicePort{{ @@ -576,7 +576,7 @@ func TestClusterIPReject(t *testing.T) { Protocol: api.ProtocolTCP, }} }), - } + ) fp.syncProxyRules(syncReasonForce) svcChain := string(servicePortChainName(svcPortName.String(), strings.ToLower(string(api.ProtocolTCP)))) @@ -600,7 +600,7 @@ func TestClusterIPEndpointsJump(t *testing.T) { Port: "p80", } - fp.allServices = []*api.Service{ + fp.allServices = makeServiceMap( makeTestService(svcPortName.Namespace, svcPortName.Name, func(svc *api.Service) { svc.Spec.ClusterIP = svcIP svc.Spec.Ports = []api.ServicePort{{ @@ -609,7 +609,7 @@ func TestClusterIPEndpointsJump(t *testing.T) { Protocol: api.ProtocolTCP, }} }), - } + ) epIP := "10.180.0.1" fp.allEndpoints = makeEndpointsMap( @@ -659,7 +659,7 @@ func TestLoadBalancer(t *testing.T) { Port: "p80", } - fp.allServices = []*api.Service{ + fp.allServices = makeServiceMap( makeTestService(svcPortName.Namespace, svcPortName.Name, func(svc *api.Service) { svc.Spec.Type = "LoadBalancer" svc.Spec.ClusterIP = svcIP @@ -673,7 +673,7 @@ func TestLoadBalancer(t *testing.T) { IP: svcLBIP, }} }), - } + ) epIP := "10.180.0.1" fp.allEndpoints = makeEndpointsMap( @@ -719,7 +719,7 @@ func TestNodePort(t *testing.T) { Port: "p80", } - fp.allServices = []*api.Service{ + fp.allServices = makeServiceMap( makeTestService(svcPortName.Namespace, svcPortName.Name, func(svc *api.Service) { svc.Spec.Type = "NodePort" svc.Spec.ClusterIP = svcIP @@ -730,7 +730,7 @@ func TestNodePort(t *testing.T) { NodePort: int32(svcNodePort), }} }), - } + ) epIP := "10.180.0.1" fp.allEndpoints = makeEndpointsMap( @@ -769,7 +769,7 @@ func TestNodePortReject(t *testing.T) { Port: "p80", } - fp.allServices = []*api.Service{ + fp.allServices = makeServiceMap( makeTestService(svcPortName.Namespace, svcPortName.Name, func(svc *api.Service) { svc.Spec.Type = "NodePort" svc.Spec.ClusterIP = svcIP @@ -780,7 +780,7 @@ func TestNodePortReject(t *testing.T) { NodePort: int32(svcNodePort), }} }), - } + ) fp.syncProxyRules(syncReasonForce) @@ -806,7 +806,7 @@ func TestOnlyLocalLoadBalancing(t *testing.T) { Port: "p80", } - fp.allServices = []*api.Service{ + fp.allServices = makeServiceMap( makeTestService(svcPortName.Namespace, svcPortName.Name, func(svc *api.Service) { svc.Spec.Type = "LoadBalancer" svc.Spec.ClusterIP = svcIP @@ -821,7 +821,7 @@ func TestOnlyLocalLoadBalancing(t *testing.T) { }} svc.Annotations[service.BetaAnnotationExternalTraffic] = service.AnnotationValueExternalTrafficLocal }), - } + ) epIP1 := "10.180.0.1" epIP2 := "10.180.2.1" @@ -900,7 +900,7 @@ func onlyLocalNodePorts(t *testing.T, fp *Proxier, ipt *iptablestest.FakeIPTable Port: "p80", } - fp.allServices = []*api.Service{ + fp.allServices = makeServiceMap( makeTestService(svcPortName.Namespace, svcPortName.Name, func(svc *api.Service) { svc.Spec.Type = "NodePort" svc.Spec.ClusterIP = svcIP @@ -912,7 +912,7 @@ func onlyLocalNodePorts(t *testing.T, fp *Proxier, ipt *iptablestest.FakeIPTable }} svc.Annotations[service.BetaAnnotationExternalTraffic] = service.AnnotationValueExternalTrafficLocal }), - } + ) epIP1 := "10.180.0.1" epIP2 := "10.180.2.1" @@ -992,7 +992,7 @@ func addTestPort(array []api.ServicePort, name string, protocol api.Protocol, po } func TestBuildServiceMapAddRemove(t *testing.T) { - services := []*api.Service{ + services := makeServiceMap( makeTestService("somewhere-else", "cluster-ip", func(svc *api.Service) { svc.Spec.Type = api.ServiceTypeClusterIP svc.Spec.ClusterIP = "172.16.55.4" @@ -1033,7 +1033,7 @@ func TestBuildServiceMapAddRemove(t *testing.T) { }, } }), - } + ) serviceMap, hcPorts, staleUDPServices := buildNewServiceMap(services, make(proxyServiceMap)) if len(serviceMap) != 8 { @@ -1056,8 +1056,9 @@ func TestBuildServiceMapAddRemove(t *testing.T) { } // Remove some stuff - services = []*api.Service{services[0]} - services[0].Spec.Ports = []api.ServicePort{services[0].Spec.Ports[1]} + oneService := services[makeNSN("somewhere-else", "cluster-ip")] + oneService.Spec.Ports = []api.ServicePort{oneService.Spec.Ports[1]} + services = makeServiceMap(oneService) serviceMap, hcPorts, staleUDPServices = buildNewServiceMap(services, serviceMap) if len(serviceMap) != 1 { t.Errorf("expected service map length 1, got %v", serviceMap) @@ -1082,13 +1083,13 @@ func TestBuildServiceMapAddRemove(t *testing.T) { } func TestBuildServiceMapServiceHeadless(t *testing.T) { - services := []*api.Service{ + services := makeServiceMap( makeTestService("somewhere-else", "headless", func(svc *api.Service) { svc.Spec.Type = api.ServiceTypeClusterIP svc.Spec.ClusterIP = api.ClusterIPNone svc.Spec.Ports = addTestPort(svc.Spec.Ports, "rpc", "UDP", 1234, 0, 0) }), - } + ) // Headless service should be ignored serviceMap, hcPorts, staleUDPServices := buildNewServiceMap(services, make(proxyServiceMap)) @@ -1107,14 +1108,14 @@ func TestBuildServiceMapServiceHeadless(t *testing.T) { } func TestBuildServiceMapServiceTypeExternalName(t *testing.T) { - services := []*api.Service{ + services := makeServiceMap( makeTestService("somewhere-else", "external-name", func(svc *api.Service) { svc.Spec.Type = api.ServiceTypeExternalName svc.Spec.ClusterIP = "172.16.55.4" // Should be ignored svc.Spec.ExternalName = "foo2.bar.com" svc.Spec.Ports = addTestPort(svc.Spec.Ports, "blah", "UDP", 1235, 5321, 0) }), - } + ) serviceMap, hcPorts, staleUDPServices := buildNewServiceMap(services, make(proxyServiceMap)) if len(serviceMap) != 0 { @@ -1130,16 +1131,16 @@ func TestBuildServiceMapServiceTypeExternalName(t *testing.T) { } func TestBuildServiceMapServiceUpdate(t *testing.T) { - first := []*api.Service{ + first := makeServiceMap( makeTestService("somewhere", "some-service", func(svc *api.Service) { svc.Spec.Type = api.ServiceTypeClusterIP svc.Spec.ClusterIP = "172.16.55.4" svc.Spec.Ports = addTestPort(svc.Spec.Ports, "something", "UDP", 1234, 4321, 0) svc.Spec.Ports = addTestPort(svc.Spec.Ports, "somethingelse", "TCP", 1235, 5321, 0) }), - } + ) - second := []*api.Service{ + second := makeServiceMap( makeTestService("somewhere", "some-service", func(svc *api.Service) { svc.ObjectMeta.Annotations = map[string]string{ service.BetaAnnotationExternalTraffic: service.AnnotationValueExternalTrafficLocal, @@ -1156,7 +1157,7 @@ func TestBuildServiceMapServiceUpdate(t *testing.T) { }, } }), - } + ) serviceMap, hcPorts, staleUDPServices := buildNewServiceMap(first, make(proxyServiceMap)) if len(serviceMap) != 2 { @@ -1426,6 +1427,15 @@ func makeServicePortName(ns, name, port string) proxy.ServicePortName { } } +func makeServiceMap(allServices ...*api.Service) serviceMap { + result := make(serviceMap) + for _, service := range allServices { + namespacedName := types.NamespacedName{Namespace: service.Namespace, Name: service.Name} + result[namespacedName] = service + } + return result +} + func Test_buildNewEndpointsMap(t *testing.T) { var nodeName = "host"