From e1fa6e9fb817ef3cb1fb009c7d5110914def1529 Mon Sep 17 00:00:00 2001
From: Prashanth Balasubramanian
Date: Mon, 1 Feb 2016 19:02:23 -0800
Subject: [PATCH 1/2] kube-proxy applies latest snapshot of endpoints and services.

---
 pkg/proxy/config/config.go      | 28 +++++++++++++------
 pkg/proxy/config/config_test.go | 49 ++++++++++++++++++++-------------
 pkg/proxy/iptables/proxier.go   | 14 +++++++++-
 3 files changed, 63 insertions(+), 28 deletions(-)

diff --git a/pkg/proxy/config/config.go b/pkg/proxy/config/config.go
index 7e6b1858220..e1d1e1523f8 100644
--- a/pkg/proxy/config/config.go
+++ b/pkg/proxy/config/config.go
@@ -128,19 +128,19 @@ func (s *endpointsStore) Merge(source string, change interface{}) error {
 	update := change.(EndpointsUpdate)
 	switch update.Op {
 	case ADD:
-		glog.V(4).Infof("Adding new endpoint from source %s : %s", source, spew.Sdump(update.Endpoints))
+		glog.V(5).Infof("Adding new endpoint from source %s : %s", source, spew.Sdump(update.Endpoints))
 		for _, value := range update.Endpoints {
 			name := types.NamespacedName{Namespace: value.Namespace, Name: value.Name}
 			endpoints[name] = value
 		}
 	case REMOVE:
-		glog.V(4).Infof("Removing an endpoint %s", spew.Sdump(update))
+		glog.V(5).Infof("Removing an endpoint %s", spew.Sdump(update))
 		for _, value := range update.Endpoints {
 			name := types.NamespacedName{Namespace: value.Namespace, Name: value.Name}
 			delete(endpoints, name)
 		}
 	case SET:
-		glog.V(4).Infof("Setting endpoints %s", spew.Sdump(update))
+		glog.V(5).Infof("Setting endpoints %s", spew.Sdump(update))
 		// Clear the old map entries by just creating a new map
 		endpoints = make(map[types.NamespacedName]api.Endpoints)
 		for _, value := range update.Endpoints {
@@ -153,7 +153,13 @@ func (s *endpointsStore) Merge(source string, change interface{}) error {
 	s.endpoints[source] = endpoints
 	s.endpointLock.Unlock()
 	if s.updates != nil {
-		s.updates <- struct{}{}
+		// Since we record the snapshot before sending this signal, it's
+		// possible that the consumer ends up performing an extra update.
+		select {
+		case s.updates <- struct{}{}:
+		default:
+			glog.V(4).Infof("Endpoints handler already has a pending interrupt.")
+		}
 	}
 	return nil
 }
@@ -227,19 +233,19 @@ func (s *serviceStore) Merge(source string, change interface{}) error {
 	update := change.(ServiceUpdate)
 	switch update.Op {
 	case ADD:
-		glog.V(4).Infof("Adding new service from source %s : %s", source, spew.Sdump(update.Services))
+		glog.V(5).Infof("Adding new service from source %s : %s", source, spew.Sdump(update.Services))
 		for _, value := range update.Services {
 			name := types.NamespacedName{Namespace: value.Namespace, Name: value.Name}
 			services[name] = value
 		}
 	case REMOVE:
-		glog.V(4).Infof("Removing a service %s", spew.Sdump(update))
+		glog.V(5).Infof("Removing a service %s", spew.Sdump(update))
 		for _, value := range update.Services {
 			name := types.NamespacedName{Namespace: value.Namespace, Name: value.Name}
 			delete(services, name)
 		}
 	case SET:
-		glog.V(4).Infof("Setting services %s", spew.Sdump(update))
+		glog.V(5).Infof("Setting services %s", spew.Sdump(update))
 		// Clear the old map entries by just creating a new map
 		services = make(map[types.NamespacedName]api.Service)
 		for _, value := range update.Services {
@@ -252,7 +258,13 @@ func (s *serviceStore) Merge(source string, change interface{}) error {
 	s.services[source] = services
 	s.serviceLock.Unlock()
 	if s.updates != nil {
-		s.updates <- struct{}{}
+		// Since we record the snapshot before sending this signal, it's
+		// possible that the consumer ends up performing an extra update.
+		select {
+		case s.updates <- struct{}{}:
+		default:
+			glog.V(4).Infof("Service handler already has a pending interrupt.")
+		}
 	}
 	return nil
 }
diff --git a/pkg/proxy/config/config_test.go b/pkg/proxy/config/config_test.go
index 8dd373b18de..fdd3a7385a7 100644
--- a/pkg/proxy/config/config_test.go
+++ b/pkg/proxy/config/config_test.go
@@ -19,7 +19,6 @@ package config_test
 import (
 	"reflect"
 	"sort"
-	"sync"
 	"testing"
 
 	"k8s.io/kubernetes/pkg/api"
@@ -49,29 +48,35 @@ func (s sortedServices) Less(i, j int) bool {
 }
 
 type ServiceHandlerMock struct {
-	services []api.Service
-	updated  sync.WaitGroup
+	updated chan []api.Service
+	waits   int
 }
 
 func NewServiceHandlerMock() *ServiceHandlerMock {
-	return &ServiceHandlerMock{services: make([]api.Service, 0)}
+	return &ServiceHandlerMock{updated: make(chan []api.Service, 5)}
 }
 
 func (h *ServiceHandlerMock) OnServiceUpdate(services []api.Service) {
 	sort.Sort(sortedServices(services))
-	h.services = services
-	h.updated.Done()
+	h.updated <- services
 }
 
 func (h *ServiceHandlerMock) ValidateServices(t *testing.T, expectedServices []api.Service) {
-	h.updated.Wait()
-	if !reflect.DeepEqual(h.services, expectedServices) {
-		t.Errorf("Expected %#v, Got %#v", expectedServices, h.services)
+	// We might get 1 or more updates for N service updates, because we
+	// overwrite older snapshots of services from the producer goroutine
+	// if the consumer falls behind. Unit tests will hard-timeout in 5m.
+	var services []api.Service
+	for ; h.waits > 0; h.waits = h.waits - 1 {
+		services = <-h.updated
+		if reflect.DeepEqual(services, expectedServices) {
+			return
+		}
 	}
+	t.Errorf("Expected %#v, Got %#v", expectedServices, services)
 }
 
 func (h *ServiceHandlerMock) Wait(waits int) {
-	h.updated.Add(waits)
+	h.waits = waits
 }
 
 type sortedEndpoints []api.Endpoints
@@ -87,29 +92,35 @@ func (s sortedEndpoints) Less(i, j int) bool {
 }
 
 type EndpointsHandlerMock struct {
-	endpoints []api.Endpoints
-	updated   sync.WaitGroup
+	updated chan []api.Endpoints
+	waits   int
 }
 
 func NewEndpointsHandlerMock() *EndpointsHandlerMock {
-	return &EndpointsHandlerMock{endpoints: make([]api.Endpoints, 0)}
+	return &EndpointsHandlerMock{updated: make(chan []api.Endpoints, 5)}
 }
 
 func (h *EndpointsHandlerMock) OnEndpointsUpdate(endpoints []api.Endpoints) {
 	sort.Sort(sortedEndpoints(endpoints))
-	h.endpoints = endpoints
-	h.updated.Done()
+	h.updated <- endpoints
 }
 
 func (h *EndpointsHandlerMock) ValidateEndpoints(t *testing.T, expectedEndpoints []api.Endpoints) {
-	h.updated.Wait()
-	if !reflect.DeepEqual(h.endpoints, expectedEndpoints) {
-		t.Errorf("Expected %#v, Got %#v", expectedEndpoints, h.endpoints)
+	// We might get 1 or more updates for N endpoint updates, because we
+	// overwrite older snapshots of endpoints from the producer goroutine
+	// if the consumer falls behind. Unit tests will hard-timeout in 5m.
+	var endpoints []api.Endpoints
+	for ; h.waits > 0; h.waits = h.waits - 1 {
+		endpoints = <-h.updated
+		if reflect.DeepEqual(endpoints, expectedEndpoints) {
+			return
+		}
 	}
+	t.Errorf("Expected %#v, Got %#v", expectedEndpoints, endpoints)
 }
 
 func (h *EndpointsHandlerMock) Wait(waits int) {
-	h.updated.Add(waits)
+	h.waits = waits
 }
 
 func CreateServiceUpdate(op Operation, services ...api.Service) ServiceUpdate {
diff --git a/pkg/proxy/iptables/proxier.go b/pkg/proxy/iptables/proxier.go
index 236aeebf300..c3ffd882339 100644
--- a/pkg/proxy/iptables/proxier.go
+++ b/pkg/proxy/iptables/proxier.go
@@ -279,6 +279,10 @@ func (proxier *Proxier) SyncLoop() {
 // OnServiceUpdate tracks the active set of service proxies.
 // They will be synchronized using syncProxyRules()
 func (proxier *Proxier) OnServiceUpdate(allServices []api.Service) {
+	start := time.Now()
+	defer func() {
+		glog.V(4).Infof("OnServiceUpdate took %v for %d services", time.Since(start), len(allServices))
+	}()
 	proxier.mu.Lock()
 	defer proxier.mu.Unlock()
 	proxier.haveReceivedServiceUpdate = true
@@ -316,7 +320,6 @@ func (proxier *Proxier) OnServiceUpdate(allServices []api.Service) {
 				glog.V(3).Infof("Something changed for service %q: removing it", serviceName)
 				delete(proxier.serviceMap, serviceName)
 			}
-
 			serviceIP := net.ParseIP(service.Spec.ClusterIP)
 			glog.V(1).Infof("Adding new service %q at %s:%d/%s", serviceName, serviceIP, servicePort.Port, servicePort.Protocol)
 			info = newServiceInfo(serviceName)
@@ -347,6 +350,11 @@ func (proxier *Proxier) OnServiceUpdate(allServices []api.Service) {
 
 // OnEndpointsUpdate takes in a slice of updated endpoints.
 func (proxier *Proxier) OnEndpointsUpdate(allEndpoints []api.Endpoints) {
+	start := time.Now()
+	defer func() {
+		glog.V(4).Infof("OnEndpointsUpdate took %v for %d endpoints", time.Since(start), len(allEndpoints))
+	}()
+
 	proxier.mu.Lock()
 	defer proxier.mu.Unlock()
 	proxier.haveReceivedEndpointsUpdate = true
@@ -451,6 +459,10 @@ func servicePortEndpointChainName(s proxy.ServicePortName, protocol string, endp
 // The only other iptables rules are those that are setup in iptablesInit()
 // assumes proxier.mu is held
 func (proxier *Proxier) syncProxyRules() {
+	start := time.Now()
+	defer func() {
+		glog.V(4).Infof("syncProxyRules took %v", time.Since(start))
+	}()
 	// don't sync rules till we've received services and endpoints
 	if !proxier.haveReceivedEndpointsUpdate || !proxier.haveReceivedServiceUpdate {
 		glog.V(2).Info("Not syncing iptables until Services and Endpoints have been received from master")
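The heart of the first patch is replacing the blocking `s.updates <- struct{}{}` send with a `select`/`default`, so the producer never waits on a busy consumer; because the snapshot is recorded before the signal, a dropped signal only means the consumer folds several changes into one pass. The standalone Go sketch below is not kube-proxy code: `store`, `merge`, and `run` are made-up names, and it assumes a signal channel of capacity 1. It only illustrates the same coalescing pattern in isolation.

package main

import (
	"fmt"
	"sync"
	"time"
)

type store struct {
	mu       sync.Mutex
	snapshot []string      // latest state; overwritten on every merge
	updates  chan struct{} // capacity 1: at most one pending wake-up
}

func newStore() *store {
	return &store{updates: make(chan struct{}, 1)}
}

// merge records the newest snapshot and signals the consumer without blocking.
func (s *store) merge(items []string) {
	s.mu.Lock()
	s.snapshot = items
	s.mu.Unlock()
	select {
	case s.updates <- struct{}{}:
	default:
		// A wake-up is already pending; the consumer will read the latest
		// snapshot when it runs, so dropping this signal loses nothing.
	}
}

// run is the consumer loop: one wake-up may cover many merges.
func (s *store) run(done <-chan struct{}) {
	for {
		select {
		case <-s.updates:
			s.mu.Lock()
			latest := s.snapshot
			s.mu.Unlock()
			fmt.Println("applying snapshot:", latest)
		case <-done:
			return
		}
	}
}

func main() {
	s := newStore()
	done := make(chan struct{})
	go s.run(done)
	for i := 0; i < 5; i++ {
		s.merge([]string{fmt.Sprintf("endpoints-v%d", i)})
	}
	time.Sleep(100 * time.Millisecond) // give the consumer time to drain the last signal
	close(done)
}

Running this may print fewer "applying snapshot" lines than there were merge calls, but the last snapshot applied is always the newest one, which is exactly the behavior the reworked unit-test mocks above are written to tolerate.
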
From 47f7f4417dcfe550e2ac06342eaad28ed2525a76 Mon Sep 17 00:00:00 2001
From: Prashanth Balasubramanian
Date: Tue, 2 Feb 2016 11:34:36 -0800
Subject: [PATCH 2/2] Poll w/ timeout for nodeport to disappear.

---
 test/e2e/service.go | 18 ++++++++++++++----
 1 file changed, 14 insertions(+), 4 deletions(-)

diff --git a/test/e2e/service.go b/test/e2e/service.go
index db60705247f..7cc12a86adb 100644
--- a/test/e2e/service.go
+++ b/test/e2e/service.go
@@ -42,7 +42,9 @@ import (
 
 // Maximum time a kube-proxy daemon on a node is allowed to not
 // notice a Service update, such as type=NodePort.
-const kubeProxyLagTimeout = 45 * time.Second
+// TODO: This timeout should be O(10s), observed values are O(1m), 5m is very
+// liberal. Fix tracked in #20567.
+const kubeProxyLagTimeout = 5 * time.Minute
 
 // This should match whatever the default/configured range is
 var ServiceNodePortRange = utilnet.PortRange{Base: 30000, Size: 2768}
@@ -769,9 +771,17 @@ var _ = Describe("Services", func() {
 		hostExec := LaunchHostExecPod(f.Client, f.Namespace.Name, "hostexec")
 		cmd := fmt.Sprintf(`! ss -ant46 'sport = :%d' | tail -n +2 | grep LISTEN`, nodePort)
-		stdout, err := RunHostCmd(hostExec.Namespace, hostExec.Name, cmd)
-		if err != nil {
-			Failf("expected node port (%d) to not be in use, stdout: %v", nodePort, stdout)
+		var stdout string
+		if pollErr := wait.PollImmediate(poll, kubeProxyLagTimeout, func() (bool, error) {
+			var err error
+			stdout, err = RunHostCmd(hostExec.Namespace, hostExec.Name, cmd)
+			if err != nil {
+				Logf("expected node port (%d) to not be in use, stdout: %v", nodePort, stdout)
+				return false, nil
+			}
+			return true, nil
+		}); pollErr != nil {
+			Failf("expected node port (%d) to not be in use in %v, stdout: %v", nodePort, kubeProxyLagTimeout, stdout)
 		}
 
 		By(fmt.Sprintf("creating service "+serviceName+" with same NodePort %d", nodePort))
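The second patch swaps a one-shot `ss` check for a poll bounded by `kubeProxyLagTimeout`, using the e2e framework's `wait.PollImmediate`. As a minimal sketch of that poll-until-timeout shape, the self-contained program below defines a hypothetical `pollImmediate` helper; it illustrates the pattern and is not the real library function: check immediately, retry on an interval, and fail only once the deadline passes.

package main

import (
	"errors"
	"fmt"
	"time"
)

// pollImmediate runs condition right away and then once per interval until it
// returns true, it returns an error, or timeout elapses.
func pollImmediate(interval, timeout time.Duration, condition func() (bool, error)) error {
	deadline := time.Now().Add(timeout)
	for {
		ok, err := condition()
		if err != nil {
			return err
		}
		if ok {
			return nil
		}
		if time.Now().After(deadline) {
			return errors.New("timed out waiting for the condition")
		}
		time.Sleep(interval)
	}
}

func main() {
	start := time.Now()
	// Stand-in for "is the node port free yet?": succeeds after roughly 300ms.
	err := pollImmediate(50*time.Millisecond, 2*time.Second, func() (bool, error) {
		return time.Since(start) > 300*time.Millisecond, nil
	})
	fmt.Println("poll result:", err) // prints <nil> once the condition holds
}

Polling matters here because the first patch lets kube-proxy coalesce updates, so the gap between the Service change and the node port actually being released can exceed what a single check would tolerate.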