From c0c41aa083ed490527b94f3cf142f25bb16285d6 Mon Sep 17 00:00:00 2001 From: Wojciech Tyczynski Date: Mon, 15 May 2017 12:13:46 +0200 Subject: [PATCH] Check whether service changed --- pkg/proxy/iptables/proxier.go | 179 +++++++++++++++++++--------------- 1 file changed, 100 insertions(+), 79 deletions(-) diff --git a/pkg/proxy/iptables/proxier.go b/pkg/proxy/iptables/proxier.go index 4d996c70be4..ca9c2cd183e 100644 --- a/pkg/proxy/iptables/proxier.go +++ b/pkg/proxy/iptables/proxier.go @@ -205,8 +205,8 @@ type endpointsChangeMap struct { } type serviceChange struct { - previous *api.Service - current *api.Service + previous proxyServiceMap + current proxyServiceMap } type serviceChangeMap struct { @@ -253,17 +253,60 @@ func newServiceChangeMap() serviceChangeMap { } } -func (scm *serviceChangeMap) update(namespacedName *types.NamespacedName, previous, current *api.Service) { +func (scm *serviceChangeMap) update(namespacedName *types.NamespacedName, previous, current *api.Service) bool { scm.Lock() defer scm.Unlock() change, exists := scm.items[*namespacedName] if !exists { change = &serviceChange{} - change.previous = previous + change.previous = serviceToServiceMap(previous) scm.items[*namespacedName] = change } - change.current = current + change.current = serviceToServiceMap(current) + if reflect.DeepEqual(change.previous, change.current) { + delete(scm.items, *namespacedName) + return false + } + // TODO: Instead of returning true/false, we should consider returning whether + // the map contains some element or not. Currently, if the change is + // "reverting" some previous endpoints update, but there are still some other + // modified endpoints, we will return false, even though there are some change + // to apply. + return true +} + +func (sm *proxyServiceMap) merge(other proxyServiceMap) sets.String { + existingPorts := sets.NewString() + for serviceName, info := range other { + existingPorts.Insert(serviceName.Port) + _, exists := (*sm)[serviceName] + if !exists { + glog.V(1).Infof("Adding new service %q at %s:%d/%s", serviceName, info.clusterIP, info.port, info.protocol) + } else { + glog.V(1).Infof("Updating existing service %q at %s:%d/%s", serviceName, info.clusterIP, info.port, info.protocol) + } + (*sm)[serviceName] = info + } + return existingPorts +} + +func (sm *proxyServiceMap) unmerge(other proxyServiceMap, existingPorts, staleServices sets.String) { + for serviceName := range other { + if existingPorts.Has(serviceName.Port) { + continue + } + info, exists := (*sm)[serviceName] + if exists { + glog.V(1).Infof("Removing service %q", serviceName) + if info.protocol == api.ProtocolUDP { + staleServices.Insert(info.clusterIP.String()) + } + delete(*sm, serviceName) + } else { + glog.Errorf("Service %q removed, but doesn't exists", serviceName) + } + } } func (em proxyEndpointsMap) merge(other proxyEndpointsMap) { @@ -554,30 +597,48 @@ func (proxier *Proxier) SyncLoop() { func (proxier *Proxier) OnServiceAdd(service *api.Service) { namespacedName := types.NamespacedName{Namespace: service.Namespace, Name: service.Name} - proxier.serviceChanges.update(&namespacedName, nil, service) - - proxier.syncProxyRules(syncReasonServices) + if proxier.serviceChanges.update(&namespacedName, nil, service) { + // TODO(wojtek-t): If the initial sync of informer either for endpoints or + // services is not finished, it doesn't make sense to call syncProxyRules + // because it will early-return (to avoid resyncing iptables with partial + // state right after kube-proxy restart). This can eat a token for calling + // syncProxyRules, but is not that critical since it can happen only + // after kube-proxy was (re)started. + proxier.syncProxyRules(syncReasonServices) + } } func (proxier *Proxier) OnServiceUpdate(oldService, service *api.Service) { namespacedName := types.NamespacedName{Namespace: service.Namespace, Name: service.Name} - proxier.serviceChanges.update(&namespacedName, oldService, service) - - proxier.syncProxyRules(syncReasonServices) + if proxier.serviceChanges.update(&namespacedName, oldService, service) { + // TODO(wojtek-t): If the initial sync of informer either for endpoints or + // services is not finished, it doesn't make sense to call syncProxyRules + // because it will early-return (to avoid resyncing iptables with partial + // state right after kube-proxy restart). This can eat a token for calling + // syncProxyRules, but is not that critical since it can happen only + // after kube-proxy was (re)started. + proxier.syncProxyRules(syncReasonServices) + } } func (proxier *Proxier) OnServiceDelete(service *api.Service) { namespacedName := types.NamespacedName{Namespace: service.Namespace, Name: service.Name} - proxier.serviceChanges.update(&namespacedName, service, nil) - - proxier.syncProxyRules(syncReasonServices) + if proxier.serviceChanges.update(&namespacedName, service, nil) { + // TODO(wojtek-t): If the initial sync of informer either for endpoints or + // services is not finished, it doesn't make sense to call syncProxyRules + // because it will early-return (to avoid resyncing iptables with partial + // state right after kube-proxy restart). This can eat a token for calling + // syncProxyRules, but is not that critical since it can happen only + // after kube-proxy was (re)started. + proxier.syncProxyRules(syncReasonServices) + } } func (proxier *Proxier) OnServiceSynced() { proxier.mu.Lock() proxier.servicesSynced = true proxier.mu.Unlock() - + // Call it unconditionally - this is called once per lifetime. proxier.syncProxyRules(syncReasonServices) } @@ -595,67 +656,6 @@ func shouldSkipService(svcName types.NamespacedName, service *api.Service) bool return false } -func (sm *proxyServiceMap) mergeService(service *api.Service) (bool, sets.String) { - if service == nil { - return false, nil - } - svcName := types.NamespacedName{Namespace: service.Namespace, Name: service.Name} - if shouldSkipService(svcName, service) { - return false, nil - } - syncRequired := false - existingPorts := sets.NewString() - for i := range service.Spec.Ports { - servicePort := &service.Spec.Ports[i] - serviceName := proxy.ServicePortName{NamespacedName: svcName, Port: servicePort.Name} - existingPorts.Insert(servicePort.Name) - info := newServiceInfo(serviceName, servicePort, service) - oldInfo, exists := (*sm)[serviceName] - equal := reflect.DeepEqual(info, oldInfo) - if exists { - glog.V(1).Infof("Adding new service %q at %s:%d/%s", serviceName, info.clusterIP, servicePort.Port, servicePort.Protocol) - } else if !equal { - glog.V(1).Infof("Updating existing service %q at %s:%d/%s", serviceName, info.clusterIP, servicePort.Port, servicePort.Protocol) - } - if !equal { - (*sm)[serviceName] = info - syncRequired = true - } - } - return syncRequired, existingPorts -} - -// are modified by this function with detected stale services. -func (sm *proxyServiceMap) unmergeService(service *api.Service, existingPorts, staleServices sets.String) bool { - if service == nil { - return false - } - svcName := types.NamespacedName{Namespace: service.Namespace, Name: service.Name} - if shouldSkipService(svcName, service) { - return false - } - syncRequired := false - for i := range service.Spec.Ports { - servicePort := &service.Spec.Ports[i] - if existingPorts.Has(servicePort.Name) { - continue - } - serviceName := proxy.ServicePortName{NamespacedName: svcName, Port: servicePort.Name} - info, exists := (*sm)[serviceName] - if exists { - glog.V(1).Infof("Removing service %q", serviceName) - if info.protocol == api.ProtocolUDP { - staleServices.Insert(info.clusterIP.String()) - } - delete(*sm, serviceName) - syncRequired = true - } else { - glog.Errorf("Service %q removed, but doesn't exists", serviceName) - } - } - return syncRequired -} - // is updated by this function (based on the given changes). // map is cleared after applying them. func updateServiceMap( @@ -668,9 +668,9 @@ func updateServiceMap( changes.Lock() defer changes.Unlock() for _, change := range changes.items { - mergeSyncRequired, existingPorts := serviceMap.mergeService(change.current) - unmergeSyncRequired := serviceMap.unmergeService(change.previous, existingPorts, staleServices) - syncRequired = syncRequired || mergeSyncRequired || unmergeSyncRequired + existingPorts := serviceMap.merge(change.current) + serviceMap.unmerge(change.previous, existingPorts, staleServices) + syncRequired = true } changes.items = make(map[types.NamespacedName]*serviceChange) }() @@ -855,6 +855,27 @@ func endpointsToEndpointsMap(endpoints *api.Endpoints, hostname string) proxyEnd return endpointsMap } +// Translates single Service object to proxyServiceMap. +// +// NOTE: service object should NOT be modified. +func serviceToServiceMap(service *api.Service) proxyServiceMap { + if service == nil { + return nil + } + svcName := types.NamespacedName{Namespace: service.Namespace, Name: service.Name} + if shouldSkipService(svcName, service) { + return nil + } + + serviceMap := make(proxyServiceMap) + for i := range service.Spec.Ports { + servicePort := &service.Spec.Ports[i] + serviceName := proxy.ServicePortName{NamespacedName: svcName, Port: servicePort.Name} + serviceMap[serviceName] = newServiceInfo(serviceName, servicePort, service) + } + return serviceMap +} + // portProtoHash takes the ServicePortName and protocol for a service // returns the associated 16 character hash. This is computed by hashing (sha256) // then encoding to base32 and truncating to 16 chars. We do this because IPTables