diff --git a/cmd/kube-proxy/app/server.go b/cmd/kube-proxy/app/server.go index 6272b23d88d..2466bc4a561 100644 --- a/cmd/kube-proxy/app/server.go +++ b/cmd/kube-proxy/app/server.go @@ -218,6 +218,7 @@ func NewProxyServerDefault(config *options.ProxyServerConfig) (*ProxyServer, err recorder := eventBroadcaster.NewRecorder(api.Scheme, clientv1.EventSource{Component: "kube-proxy", Host: hostname}) var proxier proxy.ProxyProvider + var servicesHandler proxyconfig.ServiceConfigHandler var endpointsHandler proxyconfig.EndpointsConfigHandler proxyMode := getProxyMode(string(config.Mode), client.Core().Nodes(), hostname, iptInterface, iptables.LinuxKernelCompatTester{}) @@ -244,22 +245,20 @@ func NewProxyServerDefault(config *options.ProxyServerConfig) (*ProxyServer, err glog.Fatalf("Unable to create proxier: %v", err) } proxier = proxierIPTables + servicesHandler = proxierIPTables endpointsHandler = proxierIPTables // No turning back. Remove artifacts that might still exist from the userspace Proxier. glog.V(0).Info("Tearing down userspace rules.") userspace.CleanupLeftovers(iptInterface) } else { glog.V(0).Info("Using userspace Proxier.") - - var proxierUserspace proxy.ProxyProvider - if runtime.GOOS == "windows" { // This is a proxy.LoadBalancer which NewProxier needs but has methods we don't need for // our config.EndpointsConfigHandler. loadBalancer := winuserspace.NewLoadBalancerRR() // set EndpointsConfigHandler to our loadBalancer endpointsHandler = loadBalancer - proxierUserspace, err = winuserspace.NewProxier( + proxierUserspace, err := winuserspace.NewProxier( loadBalancer, net.ParseIP(config.BindAddress), netshInterface, @@ -268,13 +267,18 @@ func NewProxyServerDefault(config *options.ProxyServerConfig) (*ProxyServer, err config.IPTablesSyncPeriod.Duration, config.UDPIdleTimeout.Duration, ) + if err != nil { + glog.Fatalf("Unable to create proxier: %v", err) + } + servicesHandler = proxierUserspace + proxier = proxierUserspace } else { // This is a proxy.LoadBalancer which NewProxier needs but has methods we don't need for // our config.EndpointsConfigHandler. loadBalancer := userspace.NewLoadBalancerRR() // set EndpointsConfigHandler to our loadBalancer endpointsHandler = loadBalancer - proxierUserspace, err = userspace.NewProxier( + proxierUserspace, err := userspace.NewProxier( loadBalancer, net.ParseIP(config.BindAddress), iptInterface, @@ -284,11 +288,12 @@ func NewProxyServerDefault(config *options.ProxyServerConfig) (*ProxyServer, err config.IPTablesMinSyncPeriod.Duration, config.UDPIdleTimeout.Duration, ) + if err != nil { + glog.Fatalf("Unable to create proxier: %v", err) + } + servicesHandler = proxierUserspace + proxier = proxierUserspace } - if err != nil { - glog.Fatalf("Unable to create proxier: %v", err) - } - proxier = proxierUserspace // Remove artifacts from the pure-iptables Proxier, if not on Windows. if runtime.GOOS != "windows" { glog.V(0).Info("Tearing down pure-iptables proxy rules.") @@ -306,7 +311,7 @@ func NewProxyServerDefault(config *options.ProxyServerConfig) (*ProxyServer, err // only notify on changes, and the initial update (on process start) may be lost if no handlers // are registered yet. serviceConfig := proxyconfig.NewServiceConfig(client.Core().RESTClient(), config.ConfigSyncPeriod) - serviceConfig.RegisterHandler(proxier) + serviceConfig.RegisterHandler(servicesHandler) go serviceConfig.Run(wait.NeverStop) endpointsConfig := proxyconfig.NewEndpointsConfig(client.Core().RESTClient(), config.ConfigSyncPeriod) diff --git a/pkg/kubemark/hollow_proxy.go b/pkg/kubemark/hollow_proxy.go index 6cee72f2b40..7dba0d01fd0 100644 --- a/pkg/kubemark/hollow_proxy.go +++ b/pkg/kubemark/hollow_proxy.go @@ -39,13 +39,13 @@ type HollowProxy struct { type FakeProxyHandler struct{} -func (*FakeProxyHandler) OnServiceUpdate(services []api.Service) {} +func (*FakeProxyHandler) OnServiceUpdate(services []*api.Service) {} func (*FakeProxyHandler) OnEndpointsUpdate(endpoints []*api.Endpoints) {} type FakeProxier struct{} -func (*FakeProxier) OnServiceUpdate(services []api.Service) {} -func (*FakeProxier) Sync() {} +func (*FakeProxier) OnServiceUpdate(services []*api.Service) {} +func (*FakeProxier) Sync() {} func (*FakeProxier) SyncLoop() { select {} } diff --git a/pkg/proxy/BUILD b/pkg/proxy/BUILD index a21f2bd8d55..a81464696ae 100644 --- a/pkg/proxy/BUILD +++ b/pkg/proxy/BUILD @@ -14,10 +14,7 @@ go_library( "types.go", ], tags = ["automanaged"], - deps = [ - "//pkg/api:go_default_library", - "//vendor:k8s.io/apimachinery/pkg/types", - ], + deps = ["//vendor:k8s.io/apimachinery/pkg/types"], ) filegroup( diff --git a/pkg/proxy/config/api_test.go b/pkg/proxy/config/api_test.go index bc55748b9cf..b8b6265240b 100644 --- a/pkg/proxy/config/api_test.go +++ b/pkg/proxy/config/api_test.go @@ -74,27 +74,27 @@ func TestNewServicesSourceApi_UpdatesAndMultipleServices(t *testing.T) { go serviceConfig.Run(stopCh) // Add the first service - handler.expected = []api.Service{*service1v1} + handler.expected = []*api.Service{service1v1} fakeWatch.Add(service1v1) <-ch // Add another service - handler.expected = []api.Service{*service1v1, *service2} + handler.expected = []*api.Service{service1v1, service2} fakeWatch.Add(service2) <-ch // Modify service1 - handler.expected = []api.Service{*service1v2, *service2} + handler.expected = []*api.Service{service1v2, service2} fakeWatch.Modify(service1v2) <-ch // Delete service1 - handler.expected = []api.Service{*service2} + handler.expected = []*api.Service{service2} fakeWatch.Delete(service1v2) <-ch // Delete service2 - handler.expected = []api.Service{} + handler.expected = []*api.Service{} fakeWatch.Delete(service2) <-ch } @@ -174,15 +174,15 @@ func TestNewEndpointsSourceApi_UpdatesAndMultipleEndpoints(t *testing.T) { type svcHandler struct { t *testing.T - expected []api.Service + expected []*api.Service done func() } -func newSvcHandler(t *testing.T, svcs []api.Service, done func()) *svcHandler { +func newSvcHandler(t *testing.T, svcs []*api.Service, done func()) *svcHandler { return &svcHandler{t: t, expected: svcs, done: done} } -func (s *svcHandler) OnServiceUpdate(services []api.Service) { +func (s *svcHandler) OnServiceUpdate(services []*api.Service) { defer s.done() sort.Sort(sortedServices(services)) if !reflect.DeepEqual(s.expected, services) { @@ -242,7 +242,7 @@ func TestInitialSync(t *testing.T) { svcConfig := newServiceConfig(svcLW, time.Minute) epsConfig := newEndpointsConfig(epsLW, time.Minute) - svcHandler := newSvcHandler(t, []api.Service{*svc2, *svc1}, wg.Done) + svcHandler := newSvcHandler(t, []*api.Service{svc2, svc1}, wg.Done) svcConfig.RegisterHandler(svcHandler) epsHandler := newEpsHandler(t, []*api.Endpoints{eps2, eps1}, wg.Done) epsConfig.RegisterHandler(epsHandler) diff --git a/pkg/proxy/config/config.go b/pkg/proxy/config/config.go index bb558316f73..3d1552add0f 100644 --- a/pkg/proxy/config/config.go +++ b/pkg/proxy/config/config.go @@ -33,9 +33,17 @@ import ( // ServiceConfigHandler is an abstract interface of objects which receive update notifications for the set of services. type ServiceConfigHandler interface { - // OnServiceUpdate gets called when a configuration has been changed by one of the sources. - // This is the union of all the configuration sources. - OnServiceUpdate(services []api.Service) + // OnServiceUpdate gets called when a service is created, removed or changed + // on any of the configuration sources. An example is when a new service + // comes up. + // + // NOTE: For efficiency, services are being passed by reference, thus, + // OnServiceUpdate should NOT modify pointers of a given slice. + // Those service objects are shared with other layers of the system and + // are guaranteed to be immutable with the assumption that are also + // not mutated by those handlers. Make a deep copy if you need to modify + // them in your code. + OnServiceUpdate(services []*api.Service) } // EndpointsConfigHandler is an abstract interface of objects which receive update notifications for the set of endpoints. @@ -208,24 +216,23 @@ func (c *ServiceConfig) Run(stopCh <-chan struct{}) { return } - // We hanve synced informers. Now we can start delivering updates + // We have synced informers. Now we can start delivering updates // to the registered handler. go func() { for range c.updates { services, err := c.lister.List(labels.Everything()) if err != nil { glog.Errorf("Error while listing services from cache: %v", err) - // This will cause a retry (if there isnt' any other trigger in-flight). + // This will cause a retry (if there isn't any other trigger in-flight). c.dispatchUpdate() continue } - svcs := make([]api.Service, 0, len(services)) - for i := range services { - svcs = append(svcs, *services[i]) + if services == nil { + services = []*api.Service{} } for i := range c.handlers { glog.V(3).Infof("Calling handler.OnServiceUpdate()") - c.handlers[i].OnServiceUpdate(svcs) + c.handlers[i].OnServiceUpdate(services) } } }() diff --git a/pkg/proxy/config/config_test.go b/pkg/proxy/config/config_test.go index 8608e88f732..e07fd603ac6 100644 --- a/pkg/proxy/config/config_test.go +++ b/pkg/proxy/config/config_test.go @@ -28,7 +28,7 @@ import ( "k8s.io/kubernetes/pkg/api" ) -type sortedServices []api.Service +type sortedServices []*api.Service func (s sortedServices) Len() int { return len(s) @@ -41,24 +41,24 @@ func (s sortedServices) Less(i, j int) bool { } type ServiceHandlerMock struct { - updated chan []api.Service + updated chan []*api.Service waits int } func NewServiceHandlerMock() *ServiceHandlerMock { - return &ServiceHandlerMock{updated: make(chan []api.Service, 5)} + return &ServiceHandlerMock{updated: make(chan []*api.Service, 5)} } -func (h *ServiceHandlerMock) OnServiceUpdate(services []api.Service) { +func (h *ServiceHandlerMock) OnServiceUpdate(services []*api.Service) { sort.Sort(sortedServices(services)) h.updated <- services } -func (h *ServiceHandlerMock) ValidateServices(t *testing.T, expectedServices []api.Service) { +func (h *ServiceHandlerMock) ValidateServices(t *testing.T, expectedServices []*api.Service) { // We might get 1 or more updates for N service updates, because we // over write older snapshots of services from the producer go-routine // if the consumer falls behind. - var services []api.Service + var services []*api.Service for { select { case services = <-h.updated: @@ -139,7 +139,7 @@ func TestNewServiceAddedAndNotified(t *testing.T) { Spec: api.ServiceSpec{Ports: []api.ServicePort{{Protocol: "TCP", Port: 10}}}, } fakeWatch.Add(service) - handler.ValidateServices(t, []api.Service{*service}) + handler.ValidateServices(t, []*api.Service{service}) } func TestServiceAddedRemovedSetAndNotified(t *testing.T) { @@ -161,18 +161,18 @@ func TestServiceAddedRemovedSetAndNotified(t *testing.T) { Spec: api.ServiceSpec{Ports: []api.ServicePort{{Protocol: "TCP", Port: 10}}}, } fakeWatch.Add(service1) - handler.ValidateServices(t, []api.Service{*service1}) + handler.ValidateServices(t, []*api.Service{service1}) service2 := &api.Service{ ObjectMeta: metav1.ObjectMeta{Namespace: "testnamespace", Name: "bar"}, Spec: api.ServiceSpec{Ports: []api.ServicePort{{Protocol: "TCP", Port: 20}}}, } fakeWatch.Add(service2) - services := []api.Service{*service2, *service1} + services := []*api.Service{service2, service1} handler.ValidateServices(t, services) fakeWatch.Delete(service1) - services = []api.Service{*service2} + services = []*api.Service{service2} handler.ValidateServices(t, services) } @@ -203,7 +203,7 @@ func TestNewServicesMultipleHandlersAddedAndNotified(t *testing.T) { fakeWatch.Add(service1) fakeWatch.Add(service2) - services := []api.Service{*service2, *service1} + services := []*api.Service{service2, service1} handler.ValidateServices(t, services) handler2.ValidateServices(t, services) } diff --git a/pkg/proxy/iptables/proxier.go b/pkg/proxy/iptables/proxier.go index f79f9053a00..fc00c52003a 100644 --- a/pkg/proxy/iptables/proxier.go +++ b/pkg/proxy/iptables/proxier.go @@ -457,13 +457,12 @@ type healthCheckPort struct { // Accepts a list of Services and the existing service map. Returns the new // service map, a list of healthcheck ports to add to or remove from the health // checking listener service, and a set of stale UDP services. -func buildServiceMap(allServices []api.Service, oldServiceMap proxyServiceMap) (proxyServiceMap, []healthCheckPort, []healthCheckPort, sets.String) { +func buildServiceMap(allServices []*api.Service, oldServiceMap proxyServiceMap) (proxyServiceMap, []healthCheckPort, []healthCheckPort, sets.String) { newServiceMap := make(proxyServiceMap) healthCheckAdd := make([]healthCheckPort, 0) healthCheckDel := make([]healthCheckPort, 0) - for i := range allServices { - service := &allServices[i] + for _, service := range allServices { svcName := types.NamespacedName{ Namespace: service.Namespace, Name: service.Name, @@ -529,7 +528,7 @@ func buildServiceMap(allServices []api.Service, oldServiceMap proxyServiceMap) ( // OnServiceUpdate tracks the active set of service proxies. // They will be synchronized using syncProxyRules() -func (proxier *Proxier) OnServiceUpdate(allServices []api.Service) { +func (proxier *Proxier) OnServiceUpdate(allServices []*api.Service) { start := time.Now() defer func() { glog.V(4).Infof("OnServiceUpdate took %v for %d services", time.Since(start), len(allServices)) diff --git a/pkg/proxy/iptables/proxier_test.go b/pkg/proxy/iptables/proxier_test.go index abb6a41b66e..afcee91ee71 100644 --- a/pkg/proxy/iptables/proxier_test.go +++ b/pkg/proxy/iptables/proxier_test.go @@ -858,8 +858,8 @@ func onlyLocalNodePorts(t *testing.T, fp *Proxier, ipt *iptablestest.FakeIPTable } } -func makeTestService(namespace, name string, svcFunc func(*api.Service)) api.Service { - svc := api.Service{ +func makeTestService(namespace, name string, svcFunc func(*api.Service)) *api.Service { + svc := &api.Service{ ObjectMeta: metav1.ObjectMeta{ Name: name, Namespace: namespace, @@ -867,7 +867,7 @@ func makeTestService(namespace, name string, svcFunc func(*api.Service)) api.Ser Spec: api.ServiceSpec{}, Status: api.ServiceStatus{}, } - svcFunc(&svc) + svcFunc(svc) return svc } @@ -883,7 +883,7 @@ func addTestPort(array []api.ServicePort, name string, protocol api.Protocol, po } func TestBuildServiceMapAddRemove(t *testing.T) { - services := []api.Service{ + services := []*api.Service{ makeTestService("somewhere-else", "cluster-ip", func(svc *api.Service) { svc.Spec.Type = api.ServiceTypeClusterIP svc.Spec.ClusterIP = "172.16.55.4" @@ -959,7 +959,7 @@ func TestBuildServiceMapAddRemove(t *testing.T) { } // Remove some stuff - services = []api.Service{services[0]} + services = []*api.Service{services[0]} services[0].Spec.Ports = []api.ServicePort{services[0].Spec.Ports[1]} serviceMap, hcAdd, hcDel, staleUDPServices = buildServiceMap(services, serviceMap) if len(serviceMap) != 1 { @@ -999,7 +999,7 @@ func TestBuildServiceMapAddRemove(t *testing.T) { } func TestBuildServiceMapServiceHeadless(t *testing.T) { - services := []api.Service{ + services := []*api.Service{ makeTestService("somewhere-else", "headless", func(svc *api.Service) { svc.Spec.Type = api.ServiceTypeClusterIP svc.Spec.ClusterIP = api.ClusterIPNone @@ -1027,7 +1027,7 @@ func TestBuildServiceMapServiceHeadless(t *testing.T) { } func TestBuildServiceMapServiceTypeExternalName(t *testing.T) { - services := []api.Service{ + services := []*api.Service{ makeTestService("somewhere-else", "external-name", func(svc *api.Service) { svc.Spec.Type = api.ServiceTypeExternalName svc.Spec.ClusterIP = "172.16.55.4" // Should be ignored @@ -1053,7 +1053,7 @@ func TestBuildServiceMapServiceTypeExternalName(t *testing.T) { } func TestBuildServiceMapServiceUpdate(t *testing.T) { - first := []api.Service{ + first := []*api.Service{ makeTestService("somewhere", "some-service", func(svc *api.Service) { svc.Spec.Type = api.ServiceTypeClusterIP svc.Spec.ClusterIP = "172.16.55.4" @@ -1062,7 +1062,7 @@ func TestBuildServiceMapServiceUpdate(t *testing.T) { }), } - second := []api.Service{ + second := []*api.Service{ makeTestService("somewhere", "some-service", func(svc *api.Service) { svc.ObjectMeta.Annotations = map[string]string{ service.BetaAnnotationExternalTraffic: service.AnnotationValueExternalTrafficLocal, diff --git a/pkg/proxy/types.go b/pkg/proxy/types.go index d9ff569cbb7..578baff693d 100644 --- a/pkg/proxy/types.go +++ b/pkg/proxy/types.go @@ -20,15 +20,10 @@ import ( "fmt" "k8s.io/apimachinery/pkg/types" - "k8s.io/kubernetes/pkg/api" ) // ProxyProvider is the interface provided by proxier implementations. type ProxyProvider interface { - // OnServiceUpdate manages the active set of service proxies. - // Active service proxies are reinitialized if found in the update set or - // removed if missing from the update set. - OnServiceUpdate(services []api.Service) // Sync immediately synchronizes the ProxyProvider's current state to iptables. Sync() // SyncLoop runs periodic work. diff --git a/pkg/proxy/userspace/proxier.go b/pkg/proxy/userspace/proxier.go index 79b9b61a76d..c91db19419e 100644 --- a/pkg/proxy/userspace/proxier.go +++ b/pkg/proxy/userspace/proxier.go @@ -400,12 +400,10 @@ func (proxier *Proxier) addServiceOnPort(service proxy.ServicePortName, serviceR // OnServiceUpdate manages the active set of service proxies. // Active service proxies are reinitialized if found in the update set or // shutdown if missing from the update set. -func (proxier *Proxier) OnServiceUpdate(services []api.Service) { +func (proxier *Proxier) OnServiceUpdate(services []*api.Service) { glog.V(4).Infof("Received update notice: %+v", services) activeServices := make(map[proxy.ServicePortName]bool) // use a map as a set - for i := range services { - service := &services[i] - + for _, service := range services { // if ClusterIP is "None" or empty, skip proxying if !api.IsServiceIPSet(service) { glog.V(3).Infof("Skipping service %s due to clusterIP = %q", types.NamespacedName{Namespace: service.Namespace, Name: service.Name}, service.Spec.ClusterIP) diff --git a/pkg/proxy/userspace/proxier_test.go b/pkg/proxy/userspace/proxier_test.go index dfbb0c6593e..ef07a158ad7 100644 --- a/pkg/proxy/userspace/proxier_test.go +++ b/pkg/proxy/userspace/proxier_test.go @@ -350,7 +350,7 @@ func TestMultiPortOnServiceUpdate(t *testing.T) { } waitForNumProxyLoops(t, p, 0) - p.OnServiceUpdate([]api.Service{{ + p.OnServiceUpdate([]*api.Service{{ ObjectMeta: metav1.ObjectMeta{Name: serviceP.Name, Namespace: serviceP.Namespace}, Spec: api.ServiceSpec{ClusterIP: "1.2.3.4", Ports: []api.ServicePort{{ Name: "p", @@ -515,7 +515,7 @@ func TestTCPProxyUpdateDelete(t *testing.T) { conn.Close() waitForNumProxyLoops(t, p, 1) - p.OnServiceUpdate([]api.Service{}) + p.OnServiceUpdate([]*api.Service{}) if err := waitForClosedPortTCP(p, svcInfo.proxyPort); err != nil { t.Fatalf(err.Error()) } @@ -555,7 +555,7 @@ func TestUDPProxyUpdateDelete(t *testing.T) { conn.Close() waitForNumProxyLoops(t, p, 1) - p.OnServiceUpdate([]api.Service{}) + p.OnServiceUpdate([]*api.Service{}) if err := waitForClosedPortUDP(p, svcInfo.proxyPort); err != nil { t.Fatalf(err.Error()) } @@ -594,7 +594,7 @@ func TestTCPProxyUpdateDeleteUpdate(t *testing.T) { conn.Close() waitForNumProxyLoops(t, p, 1) - p.OnServiceUpdate([]api.Service{}) + p.OnServiceUpdate([]*api.Service{}) if err := waitForClosedPortTCP(p, svcInfo.proxyPort); err != nil { t.Fatalf(err.Error()) } @@ -602,7 +602,7 @@ func TestTCPProxyUpdateDeleteUpdate(t *testing.T) { // need to add endpoint here because it got clean up during service delete lb.OnEndpointsUpdate([]*api.Endpoints{endpoint}) - p.OnServiceUpdate([]api.Service{{ + p.OnServiceUpdate([]*api.Service{{ ObjectMeta: metav1.ObjectMeta{Name: service.Name, Namespace: service.Namespace}, Spec: api.ServiceSpec{ClusterIP: "1.2.3.4", Ports: []api.ServicePort{{ Name: "p", @@ -650,7 +650,7 @@ func TestUDPProxyUpdateDeleteUpdate(t *testing.T) { conn.Close() waitForNumProxyLoops(t, p, 1) - p.OnServiceUpdate([]api.Service{}) + p.OnServiceUpdate([]*api.Service{}) if err := waitForClosedPortUDP(p, svcInfo.proxyPort); err != nil { t.Fatalf(err.Error()) } @@ -658,7 +658,7 @@ func TestUDPProxyUpdateDeleteUpdate(t *testing.T) { // need to add endpoint here because it got clean up during service delete lb.OnEndpointsUpdate([]*api.Endpoints{endpoint}) - p.OnServiceUpdate([]api.Service{{ + p.OnServiceUpdate([]*api.Service{{ ObjectMeta: metav1.ObjectMeta{Name: service.Name, Namespace: service.Namespace}, Spec: api.ServiceSpec{ClusterIP: "1.2.3.4", Ports: []api.ServicePort{{ Name: "p", @@ -703,7 +703,7 @@ func TestTCPProxyUpdatePort(t *testing.T) { testEchoTCP(t, "127.0.0.1", svcInfo.proxyPort) waitForNumProxyLoops(t, p, 1) - p.OnServiceUpdate([]api.Service{{ + p.OnServiceUpdate([]*api.Service{{ ObjectMeta: metav1.ObjectMeta{Name: service.Name, Namespace: service.Namespace}, Spec: api.ServiceSpec{ClusterIP: "1.2.3.4", Ports: []api.ServicePort{{ Name: "p", @@ -753,7 +753,7 @@ func TestUDPProxyUpdatePort(t *testing.T) { } waitForNumProxyLoops(t, p, 1) - p.OnServiceUpdate([]api.Service{{ + p.OnServiceUpdate([]*api.Service{{ ObjectMeta: metav1.ObjectMeta{Name: service.Name, Namespace: service.Namespace}, Spec: api.ServiceSpec{ClusterIP: "1.2.3.4", Ports: []api.ServicePort{{ Name: "p", @@ -802,7 +802,7 @@ func TestProxyUpdatePublicIPs(t *testing.T) { testEchoTCP(t, "127.0.0.1", svcInfo.proxyPort) waitForNumProxyLoops(t, p, 1) - p.OnServiceUpdate([]api.Service{{ + p.OnServiceUpdate([]*api.Service{{ ObjectMeta: metav1.ObjectMeta{Name: service.Name, Namespace: service.Namespace}, Spec: api.ServiceSpec{ Ports: []api.ServicePort{{ @@ -856,7 +856,7 @@ func TestProxyUpdatePortal(t *testing.T) { testEchoTCP(t, "127.0.0.1", svcInfo.proxyPort) waitForNumProxyLoops(t, p, 1) - p.OnServiceUpdate([]api.Service{{ + p.OnServiceUpdate([]*api.Service{{ ObjectMeta: metav1.ObjectMeta{Name: service.Name, Namespace: service.Namespace}, Spec: api.ServiceSpec{ClusterIP: "", Ports: []api.ServicePort{{ Name: "p", @@ -869,7 +869,7 @@ func TestProxyUpdatePortal(t *testing.T) { t.Fatalf("service with empty ClusterIP should not be included in the proxy") } - p.OnServiceUpdate([]api.Service{{ + p.OnServiceUpdate([]*api.Service{{ ObjectMeta: metav1.ObjectMeta{Name: service.Name, Namespace: service.Namespace}, Spec: api.ServiceSpec{ClusterIP: "None", Ports: []api.ServicePort{{ Name: "p", @@ -882,7 +882,7 @@ func TestProxyUpdatePortal(t *testing.T) { t.Fatalf("service with 'None' as ClusterIP should not be included in the proxy") } - p.OnServiceUpdate([]api.Service{{ + p.OnServiceUpdate([]*api.Service{{ ObjectMeta: metav1.ObjectMeta{Name: service.Name, Namespace: service.Namespace}, Spec: api.ServiceSpec{ClusterIP: "1.2.3.4", Ports: []api.ServicePort{{ Name: "p", diff --git a/pkg/proxy/winuserspace/proxier.go b/pkg/proxy/winuserspace/proxier.go index 86bd9083a2d..bbc25877ea5 100644 --- a/pkg/proxy/winuserspace/proxier.go +++ b/pkg/proxy/winuserspace/proxier.go @@ -317,12 +317,10 @@ func getListenIPPortMap(service *api.Service, listenPort int, nodePort int) map[ // OnServiceUpdate manages the active set of service proxies. // Active service proxies are reinitialized if found in the update set or // shutdown if missing from the update set. -func (proxier *Proxier) OnServiceUpdate(services []api.Service) { +func (proxier *Proxier) OnServiceUpdate(services []*api.Service) { glog.V(4).Infof("Received update notice: %+v", services) activeServicePortPortals := make(map[ServicePortPortalName]bool) // use a map as a set - for i := range services { - service := &services[i] - + for _, service := range services { // if ClusterIP is "None" or empty, skip proxying if !api.IsServiceIPSet(service) { glog.V(3).Infof("Skipping service %s due to clusterIP = %q", types.NamespacedName{Namespace: service.Namespace, Name: service.Name}, service.Spec.ClusterIP) diff --git a/pkg/proxy/winuserspace/proxier_test.go b/pkg/proxy/winuserspace/proxier_test.go index 3f3cfbbd83c..59bb6667b7a 100644 --- a/pkg/proxy/winuserspace/proxier_test.go +++ b/pkg/proxy/winuserspace/proxier_test.go @@ -359,7 +359,7 @@ func TestMultiPortOnServiceUpdate(t *testing.T) { } waitForNumProxyLoops(t, p, 0) - p.OnServiceUpdate([]api.Service{{ + p.OnServiceUpdate([]*api.Service{{ ObjectMeta: metav1.ObjectMeta{Name: serviceP.Name, Namespace: serviceP.Namespace}, Spec: api.ServiceSpec{ClusterIP: "0.0.0.0", Ports: []api.ServicePort{{ Name: "p", @@ -526,7 +526,7 @@ func TestTCPProxyUpdateDelete(t *testing.T) { conn.Close() waitForNumProxyLoops(t, p, 1) - p.OnServiceUpdate([]api.Service{}) + p.OnServiceUpdate([]*api.Service{}) if err := waitForClosedPortTCP(p, getPortNum(t, svcInfo.socket.Addr().String())); err != nil { t.Fatalf(err.Error()) } @@ -565,7 +565,7 @@ func TestUDPProxyUpdateDelete(t *testing.T) { conn.Close() waitForNumProxyLoops(t, p, 1) - p.OnServiceUpdate([]api.Service{}) + p.OnServiceUpdate([]*api.Service{}) if err := waitForClosedPortUDP(p, getPortNum(t, svcInfo.socket.Addr().String())); err != nil { t.Fatalf(err.Error()) } @@ -603,7 +603,7 @@ func TestTCPProxyUpdateDeleteUpdate(t *testing.T) { conn.Close() waitForNumProxyLoops(t, p, 1) - p.OnServiceUpdate([]api.Service{}) + p.OnServiceUpdate([]*api.Service{}) if err := waitForClosedPortTCP(p, getPortNum(t, svcInfo.socket.Addr().String())); err != nil { t.Fatalf(err.Error()) } @@ -611,7 +611,7 @@ func TestTCPProxyUpdateDeleteUpdate(t *testing.T) { // need to add endpoint here because it got clean up during service delete lb.OnEndpointsUpdate([]*api.Endpoints{endpoint}) - p.OnServiceUpdate([]api.Service{{ + p.OnServiceUpdate([]*api.Service{{ ObjectMeta: metav1.ObjectMeta{Name: service.Name, Namespace: service.Namespace}, Spec: api.ServiceSpec{ClusterIP: listenIP, Ports: []api.ServicePort{{ Name: "p", @@ -658,7 +658,7 @@ func TestUDPProxyUpdateDeleteUpdate(t *testing.T) { conn.Close() waitForNumProxyLoops(t, p, 1) - p.OnServiceUpdate([]api.Service{}) + p.OnServiceUpdate([]*api.Service{}) if err := waitForClosedPortUDP(p, getPortNum(t, svcInfo.socket.Addr().String())); err != nil { t.Fatalf(err.Error()) } @@ -666,7 +666,7 @@ func TestUDPProxyUpdateDeleteUpdate(t *testing.T) { // need to add endpoint here because it got clean up during service delete lb.OnEndpointsUpdate([]*api.Endpoints{endpoint}) - p.OnServiceUpdate([]api.Service{{ + p.OnServiceUpdate([]*api.Service{{ ObjectMeta: metav1.ObjectMeta{Name: service.Name, Namespace: service.Namespace}, Spec: api.ServiceSpec{ClusterIP: listenIP, Ports: []api.ServicePort{{ Name: "p", @@ -710,7 +710,7 @@ func TestTCPProxyUpdatePort(t *testing.T) { testEchoTCP(t, "127.0.0.1", getPortNum(t, svcInfo.socket.Addr().String())) waitForNumProxyLoops(t, p, 1) - p.OnServiceUpdate([]api.Service{{ + p.OnServiceUpdate([]*api.Service{{ ObjectMeta: metav1.ObjectMeta{Name: service.Name, Namespace: service.Namespace}, Spec: api.ServiceSpec{ClusterIP: listenIP, Ports: []api.ServicePort{{ Name: "p", @@ -759,7 +759,7 @@ func TestUDPProxyUpdatePort(t *testing.T) { } waitForNumProxyLoops(t, p, 1) - p.OnServiceUpdate([]api.Service{{ + p.OnServiceUpdate([]*api.Service{{ ObjectMeta: metav1.ObjectMeta{Name: service.Name, Namespace: service.Namespace}, Spec: api.ServiceSpec{ClusterIP: listenIP, Ports: []api.ServicePort{{ Name: "p", @@ -807,7 +807,7 @@ func TestProxyUpdatePublicIPs(t *testing.T) { testEchoTCP(t, "127.0.0.1", getPortNum(t, svcInfo.socket.Addr().String())) waitForNumProxyLoops(t, p, 1) - p.OnServiceUpdate([]api.Service{{ + p.OnServiceUpdate([]*api.Service{{ ObjectMeta: metav1.ObjectMeta{Name: service.Name, Namespace: service.Namespace}, Spec: api.ServiceSpec{ Ports: []api.ServicePort{{ @@ -860,7 +860,7 @@ func TestProxyUpdatePortal(t *testing.T) { testEchoTCP(t, "127.0.0.1", getPortNum(t, svcInfo.socket.Addr().String())) waitForNumProxyLoops(t, p, 1) - p.OnServiceUpdate([]api.Service{{ + p.OnServiceUpdate([]*api.Service{{ ObjectMeta: metav1.ObjectMeta{Name: service.Name, Namespace: service.Namespace}, Spec: api.ServiceSpec{ClusterIP: "", Ports: []api.ServicePort{{ Name: "p", @@ -873,7 +873,7 @@ func TestProxyUpdatePortal(t *testing.T) { t.Fatalf("service with empty ClusterIP should not be included in the proxy") } - p.OnServiceUpdate([]api.Service{{ + p.OnServiceUpdate([]*api.Service{{ ObjectMeta: metav1.ObjectMeta{Name: service.Name, Namespace: service.Namespace}, Spec: api.ServiceSpec{ClusterIP: "None", Ports: []api.ServicePort{{ Name: "p", @@ -886,7 +886,7 @@ func TestProxyUpdatePortal(t *testing.T) { t.Fatalf("service with 'None' as ClusterIP should not be included in the proxy") } - p.OnServiceUpdate([]api.Service{{ + p.OnServiceUpdate([]*api.Service{{ ObjectMeta: metav1.ObjectMeta{Name: service.Name, Namespace: service.Namespace}, Spec: api.ServiceSpec{ClusterIP: listenIP, Ports: []api.ServicePort{{ Name: "p",