From d2462c79bd6b8971645550b73f5514bfea5dd4e0 Mon Sep 17 00:00:00 2001 From: shashidharatd Date: Fri, 21 Apr 2017 14:11:50 +0530 Subject: [PATCH] Remove unused code in federation service controller --- .../service/cluster_helper.go | 212 ------ .../federation-controller/service/dns_test.go | 36 +- .../pkg/federation-controller/service/doc.go | 19 - .../service/endpoint_helper.go | 202 ------ .../service/endpoint_helper_test.go | 167 ----- .../service/service_helper.go | 302 -------- .../service/service_helper_test.go | 163 ----- .../service/servicecontroller.go | 683 +----------------- .../service/servicecontroller_test.go | 60 -- 9 files changed, 12 insertions(+), 1832 deletions(-) delete mode 100644 federation/pkg/federation-controller/service/cluster_helper.go delete mode 100644 federation/pkg/federation-controller/service/doc.go delete mode 100644 federation/pkg/federation-controller/service/endpoint_helper.go delete mode 100644 federation/pkg/federation-controller/service/endpoint_helper_test.go delete mode 100644 federation/pkg/federation-controller/service/service_helper.go delete mode 100644 federation/pkg/federation-controller/service/service_helper_test.go diff --git a/federation/pkg/federation-controller/service/cluster_helper.go b/federation/pkg/federation-controller/service/cluster_helper.go deleted file mode 100644 index b3446a19559..00000000000 --- a/federation/pkg/federation-controller/service/cluster_helper.go +++ /dev/null @@ -1,212 +0,0 @@ -/* -Copyright 2016 The Kubernetes Authors. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package service - -import ( - "sync" - - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - pkgruntime "k8s.io/apimachinery/pkg/runtime" - "k8s.io/apimachinery/pkg/util/wait" - "k8s.io/apimachinery/pkg/watch" - restclient "k8s.io/client-go/rest" - cache "k8s.io/client-go/tools/cache" - "k8s.io/client-go/util/workqueue" - v1beta1 "k8s.io/kubernetes/federation/apis/federation/v1beta1" - v1 "k8s.io/kubernetes/pkg/api/v1" - kubeclientset "k8s.io/kubernetes/pkg/client/clientset_generated/clientset" - - "reflect" - - "github.com/golang/glog" - "k8s.io/kubernetes/federation/pkg/federation-controller/util" - corelisters "k8s.io/kubernetes/pkg/client/listers/core/v1" -) - -type clusterCache struct { - clientset *kubeclientset.Clientset - cluster *v1beta1.Cluster - // A store of services, populated by the serviceController - serviceStore corelisters.ServiceLister - // Watches changes to all services - serviceController cache.Controller - // A store of endpoint, populated by the serviceController - endpointStore corelisters.EndpointsLister - // Watches changes to all endpoints - endpointController cache.Controller - // services that need to be synced - serviceQueue *workqueue.Type - // endpoints that need to be synced - endpointQueue *workqueue.Type -} - -type clusterClientCache struct { - rwlock sync.Mutex // protects serviceMap - clientMap map[string]*clusterCache -} - -func (cc *clusterClientCache) startClusterLW(cluster *v1beta1.Cluster, clusterName string) { - cachedClusterClient, ok := cc.clientMap[clusterName] - // only create when no existing cachedClusterClient - if ok { - if !reflect.DeepEqual(cachedClusterClient.cluster.Spec, cluster.Spec) { - //rebuild clientset when cluster spec is changed - clientset, err := newClusterClientset(cluster) - if err != nil || clientset == nil { - glog.Errorf("Failed to create corresponding restclient of kubernetes cluster: %v", err) - } - glog.V(4).Infof("Cluster spec changed, rebuild clientset for cluster %s", clusterName) - cachedClusterClient.clientset = clientset - go cachedClusterClient.serviceController.Run(wait.NeverStop) - go cachedClusterClient.endpointController.Run(wait.NeverStop) - glog.V(2).Infof("Start watching services and endpoints on cluster %s", clusterName) - } else { - // do nothing when there is no spec change - glog.V(4).Infof("Keep clientset for cluster %s", clusterName) - return - } - } else { - glog.V(4).Infof("No client cache for cluster %s, building new", clusterName) - clientset, err := newClusterClientset(cluster) - if err != nil || clientset == nil { - glog.Errorf("Failed to create corresponding restclient of kubernetes cluster: %v", err) - } - cachedClusterClient = &clusterCache{ - cluster: cluster, - clientset: clientset, - serviceQueue: workqueue.New(), - endpointQueue: workqueue.New(), - } - var endpointIndexer cache.Indexer - endpointIndexer, cachedClusterClient.endpointController = cache.NewIndexerInformer( - &cache.ListWatch{ - ListFunc: func(options metav1.ListOptions) (pkgruntime.Object, error) { - return clientset.Core().Endpoints(metav1.NamespaceAll).List(options) - }, - WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) { - return clientset.Core().Endpoints(metav1.NamespaceAll).Watch(options) - }, - }, - &v1.Endpoints{}, - serviceSyncPeriod, - cache.ResourceEventHandlerFuncs{ - AddFunc: func(obj interface{}) { - cc.enqueueEndpoint(obj, clusterName) - }, - UpdateFunc: func(old, cur interface{}) { - cc.enqueueEndpoint(cur, clusterName) - }, - DeleteFunc: func(obj interface{}) { - cc.enqueueEndpoint(obj, clusterName) - }, - }, - cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}, - ) - cachedClusterClient.endpointStore = corelisters.NewEndpointsLister(endpointIndexer) - - var serviceIndexer cache.Indexer - serviceIndexer, cachedClusterClient.serviceController = cache.NewIndexerInformer( - &cache.ListWatch{ - ListFunc: func(options metav1.ListOptions) (pkgruntime.Object, error) { - return clientset.Core().Services(metav1.NamespaceAll).List(options) - }, - WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) { - return clientset.Core().Services(metav1.NamespaceAll).Watch(options) - }, - }, - &v1.Service{}, - serviceSyncPeriod, - cache.ResourceEventHandlerFuncs{ - AddFunc: func(obj interface{}) { - cc.enqueueService(obj, clusterName) - }, - UpdateFunc: func(old, cur interface{}) { - oldService, ok := old.(*v1.Service) - - if !ok { - return - } - curService, ok := cur.(*v1.Service) - if !ok { - return - } - if !reflect.DeepEqual(oldService.Status.LoadBalancer, curService.Status.LoadBalancer) { - cc.enqueueService(cur, clusterName) - } - }, - DeleteFunc: func(obj interface{}) { - service, _ := obj.(*v1.Service) - cc.enqueueService(obj, clusterName) - glog.V(2).Infof("Service %s/%s deletion found and enqueue to service store %s", service.Namespace, service.Name, clusterName) - }, - }, - cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}, - ) - cachedClusterClient.serviceStore = corelisters.NewServiceLister(serviceIndexer) - cc.clientMap[clusterName] = cachedClusterClient - go cachedClusterClient.serviceController.Run(wait.NeverStop) - go cachedClusterClient.endpointController.Run(wait.NeverStop) - glog.V(2).Infof("Start watching services and endpoints on cluster %s", clusterName) - } - -} - -//TODO: copied from cluster controller, to make this as common function in pass 2 -// delFromClusterSet delete a cluster from clusterSet and -// delete the corresponding restclient from the map clusterKubeClientMap -func (cc *clusterClientCache) delFromClusterSet(obj interface{}) { - cluster, ok := obj.(*v1beta1.Cluster) - cc.rwlock.Lock() - defer cc.rwlock.Unlock() - if ok { - delete(cc.clientMap, cluster.Name) - } else { - tombstone, ok := obj.(cache.DeletedFinalStateUnknown) - if !ok { - glog.Infof("Object contained wasn't a cluster or a deleted key: %+v", obj) - return - } - glog.Infof("Found tombstone for %v", obj) - delete(cc.clientMap, tombstone.Key) - } -} - -// addToClusterSet inserts the new cluster to clusterSet and creates a corresponding -// restclient to map clusterKubeClientMap -func (cc *clusterClientCache) addToClientMap(obj interface{}) { - cc.rwlock.Lock() - defer cc.rwlock.Unlock() - cluster, ok := obj.(*v1beta1.Cluster) - if !ok { - return - } - pred := getClusterConditionPredicate() - // check status - // skip if not ready - if pred(*cluster) { - cc.startClusterLW(cluster, cluster.Name) - } -} - -func newClusterClientset(c *v1beta1.Cluster) (*kubeclientset.Clientset, error) { - clusterConfig, err := util.BuildClusterConfig(c) - if clusterConfig != nil { - clientset := kubeclientset.NewForConfigOrDie(restclient.AddUserAgent(clusterConfig, UserAgentName)) - return clientset, nil - } - return nil, err -} diff --git a/federation/pkg/federation-controller/service/dns_test.go b/federation/pkg/federation-controller/service/dns_test.go index d057b8f3a52..552ec91e5ba 100644 --- a/federation/pkg/federation-controller/service/dns_test.go +++ b/federation/pkg/federation-controller/service/dns_test.go @@ -20,12 +20,10 @@ import ( "fmt" "reflect" "sort" - "sync" "testing" "time" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/util/sets" "k8s.io/kubernetes/federation/apis/federation/v1beta1" fakefedclientset "k8s.io/kubernetes/federation/client/clientset_generated/federation_clientset/fake" "k8s.io/kubernetes/federation/pkg/dnsprovider/providers/google/clouddns" // Only for unit testing purposes. @@ -45,10 +43,9 @@ func TestServiceController_ensureDnsRecords(t *testing.T) { barZoneDNSName := "servicename.servicenamespace.myfederation.svc.barzone.barregion.federation.example.com" tests := []struct { - name string - service v1.Service - expected []string - serviceStatus v1.LoadBalancerStatus + name string + service v1.Service + expected []string }{ { name: "ServiceWithSingleLBIngress", @@ -62,7 +59,6 @@ func TestServiceController_ensureDnsRecords(t *testing.T) { String()}, }, }, - serviceStatus: buildServiceStatus([][]string{{"198.51.100.1", ""}}), expected: []string{ "example.com:" + globalDNSName + ":A:180:[198.51.100.1]", "example.com:" + fooRegionDNSName + ":A:180:[198.51.100.1]", @@ -82,7 +78,6 @@ func TestServiceController_ensureDnsRecords(t *testing.T) { Namespace: "servicenamespace", }, }, - serviceStatus: buildServiceStatus([][]string{{"", "randomstring.amazonelb.example.com"}}), expected: []string{ "example.com:"+globalDNSName+":A:180:[198.51.100.1]", "example.com:"+fooRegionDNSName+":A:180:[198.51.100.1]", @@ -207,31 +202,6 @@ func TestServiceController_ensureDnsRecords(t *testing.T) { serviceDnsSuffix: "federation.example.com", zoneName: "example.com", federationName: "myfederation", - serviceCache: &serviceCache{fedServiceMap: make(map[string]*cachedService)}, - clusterCache: &clusterClientCache{ - rwlock: sync.Mutex{}, - clientMap: make(map[string]*clusterCache), - }, - knownClusterSet: make(sets.String), - } - - serviceController.clusterCache.clientMap[cluster1Name] = &clusterCache{ - cluster: &v1beta1.Cluster{ - Status: v1beta1.ClusterStatus{ - Zones: []string{"foozone"}, - Region: "fooregion", - }, - }, - } - - cachedService := &cachedService{ - lastState: &test.service, - endpointMap: make(map[string]int), - serviceStatusMap: make(map[string]v1.LoadBalancerStatus), - } - cachedService.endpointMap[cluster1Name] = 1 - if !reflect.DeepEqual(&test.serviceStatus, &v1.LoadBalancerStatus{}) { - cachedService.serviceStatusMap[cluster1Name] = test.serviceStatus } err := serviceController.ensureDnsRecords(cluster1Name, &test.service) diff --git a/federation/pkg/federation-controller/service/doc.go b/federation/pkg/federation-controller/service/doc.go deleted file mode 100644 index a346ef6f44b..00000000000 --- a/federation/pkg/federation-controller/service/doc.go +++ /dev/null @@ -1,19 +0,0 @@ -/* -Copyright 2016 The Kubernetes Authors. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -// Package service contains code for syncing Kubernetes services, -// and cloud DNS servers with the federated service registry. -package service // import "k8s.io/kubernetes/federation/pkg/federation-controller/service" diff --git a/federation/pkg/federation-controller/service/endpoint_helper.go b/federation/pkg/federation-controller/service/endpoint_helper.go deleted file mode 100644 index b95d50082e2..00000000000 --- a/federation/pkg/federation-controller/service/endpoint_helper.go +++ /dev/null @@ -1,202 +0,0 @@ -/* -Copyright 2016 The Kubernetes Authors. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package service - -import ( - "time" - - "k8s.io/apimachinery/pkg/api/errors" - cache "k8s.io/client-go/tools/cache" - fedclientset "k8s.io/kubernetes/federation/client/clientset_generated/federation_clientset" - v1 "k8s.io/kubernetes/pkg/api/v1" - "k8s.io/kubernetes/pkg/controller" - - "github.com/golang/glog" -) - -// worker runs a worker thread that just dequeues items, processes them, and marks them done. -// It enforces that the syncHandler is never invoked concurrently with the same key. -func (sc *ServiceController) clusterEndpointWorker() { - // process all pending events in endpointWorkerDoneChan -ForLoop: - for { - select { - case clusterName := <-sc.endpointWorkerDoneChan: - sc.endpointWorkerMap[clusterName] = false - default: - // non-blocking, comes here if all existing events are processed - break ForLoop - } - } - - for clusterName, cache := range sc.clusterCache.clientMap { - workerExist, found := sc.endpointWorkerMap[clusterName] - if found && workerExist { - continue - } - - // create a worker only if the previous worker has finished and gone out of scope - go func(cache *clusterCache, clusterName string) { - fedClient := sc.federationClient - for { - func() { - key, quit := cache.endpointQueue.Get() - // update endpoint cache - if quit { - // send signal that current worker has finished tasks and is going out of scope - sc.endpointWorkerDoneChan <- clusterName - return - } - defer cache.endpointQueue.Done(key) - err := sc.clusterCache.syncEndpoint(key.(string), clusterName, cache, sc.serviceCache, fedClient, sc) - if err != nil { - glog.V(2).Infof("Failed to sync endpoint: %+v", err) - } - }() - } - }(cache, clusterName) - sc.endpointWorkerMap[clusterName] = true - } -} - -// Whenever there is change on endpoint, the federation service should be updated -// key is the namespaced name of endpoint -func (cc *clusterClientCache) syncEndpoint(key, clusterName string, clusterCache *clusterCache, serviceCache *serviceCache, fedClient fedclientset.Interface, serviceController *ServiceController) error { - cachedService, ok := serviceCache.get(key) - if !ok { - // here we filtered all non-federation services - return nil - } - namespace, name, err := cache.SplitMetaNamespaceKey(key) - if err != nil { - glog.Errorf("Did not successfully get %v from store: %v, will retry later", key, err) - clusterCache.endpointQueue.Add(key) - return err - } - endpoint, err := clusterCache.endpointStore.Endpoints(namespace).Get(name) - switch { - case errors.IsNotFound(err): - // service absence in store means watcher caught the deletion, ensure LB info is cleaned - glog.Infof("Can not get endpoint %v for cluster %s from endpointStore", key, clusterName) - err = cc.processEndpointDeletion(cachedService, clusterName, serviceController) - case err != nil: - glog.Errorf("Did not successfully get %v from store: %v, will retry later", key, err) - clusterCache.endpointQueue.Add(key) - return err - default: - glog.V(4).Infof("Found endpoint for federation service %s/%s from cluster %s", endpoint.Namespace, endpoint.Name, clusterName) - err = cc.processEndpointUpdate(cachedService, endpoint, clusterName, serviceController) - } - if err != nil { - glog.Errorf("Failed to sync service: %+v, put back to service queue", err) - clusterCache.endpointQueue.Add(key) - } - cachedService.resetDNSUpdateDelay() - return nil -} - -func (cc *clusterClientCache) processEndpointDeletion(cachedService *cachedService, clusterName string, serviceController *ServiceController) error { - glog.V(4).Infof("Processing endpoint deletion for %s/%s, cluster %s", cachedService.lastState.Namespace, cachedService.lastState.Name, clusterName) - var err error - cachedService.rwlock.Lock() - defer cachedService.rwlock.Unlock() - _, ok := cachedService.endpointMap[clusterName] - // TODO remove ok checking? if service controller is restarted, then endpointMap for the cluster does not exist - // need to query dns info from dnsprovider and make sure of if deletion is needed - if ok { - // endpoints lost, clean dns record - glog.V(4).Infof("Cached endpoint was found for %s/%s, cluster %s, removing", cachedService.lastState.Namespace, cachedService.lastState.Name, clusterName) - delete(cachedService.endpointMap, clusterName) - for i := 0; i < clientRetryCount; i++ { - err := serviceController.ensureDnsRecords(clusterName, cachedService.lastState) - if err == nil { - return nil - } - glog.V(4).Infof("Error ensuring DNS Records: %v", err) - time.Sleep(cachedService.nextDNSUpdateDelay()) - } - } - return err -} - -// Update dns info when endpoint update event received -// We do not care about the endpoint info, what we need to make sure here is len(endpoints.subsets)>0 -func (cc *clusterClientCache) processEndpointUpdate(cachedService *cachedService, endpoint *v1.Endpoints, clusterName string, serviceController *ServiceController) error { - glog.V(4).Infof("Processing endpoint update for %s/%s, cluster %s", endpoint.Namespace, endpoint.Name, clusterName) - var err error - cachedService.rwlock.Lock() - var reachable bool - defer cachedService.rwlock.Unlock() - _, ok := cachedService.endpointMap[clusterName] - if !ok { - for _, subset := range endpoint.Subsets { - if len(subset.Addresses) > 0 { - reachable = true - break - } - } - if reachable { - // first time get endpoints, update dns record - glog.V(4).Infof("Reachable endpoint was found for %s/%s, cluster %s, building endpointMap", endpoint.Namespace, endpoint.Name, clusterName) - cachedService.endpointMap[clusterName] = 1 - for i := 0; i < clientRetryCount; i++ { - err := serviceController.ensureDnsRecords(clusterName, cachedService.lastState) - if err == nil { - return nil - } - glog.V(4).Infof("Error ensuring DNS Records: %v", err) - time.Sleep(cachedService.nextDNSUpdateDelay()) - } - return err - } - } else { - for _, subset := range endpoint.Subsets { - if len(subset.Addresses) > 0 { - reachable = true - break - } - } - if !reachable { - // first time get endpoints, update dns record - glog.V(4).Infof("Reachable endpoint was lost for %s/%s, cluster %s, deleting endpointMap", endpoint.Namespace, endpoint.Name, clusterName) - delete(cachedService.endpointMap, clusterName) - for i := 0; i < clientRetryCount; i++ { - err := serviceController.ensureDnsRecords(clusterName, cachedService.lastState) - if err == nil { - return nil - } - glog.V(4).Infof("Error ensuring DNS Records: %v", err) - time.Sleep(cachedService.nextDNSUpdateDelay()) - } - return err - } - } - return nil -} - -// obj could be an *api.Endpoints, or a DeletionFinalStateUnknown marker item. -func (cc *clusterClientCache) enqueueEndpoint(obj interface{}, clusterName string) { - key, err := controller.KeyFunc(obj) - if err != nil { - glog.Errorf("Couldn't get key for object %+v: %v", obj, err) - return - } - _, ok := cc.clientMap[clusterName] - if ok { - cc.clientMap[clusterName].endpointQueue.Add(key) - } -} diff --git a/federation/pkg/federation-controller/service/endpoint_helper_test.go b/federation/pkg/federation-controller/service/endpoint_helper_test.go deleted file mode 100644 index 1de1b81fbfa..00000000000 --- a/federation/pkg/federation-controller/service/endpoint_helper_test.go +++ /dev/null @@ -1,167 +0,0 @@ -/* -Copyright 2016 The Kubernetes Authors. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package service - -import ( - "testing" - - "k8s.io/apimachinery/pkg/util/sets" - "k8s.io/kubernetes/federation/apis/federation/v1beta1" - fakefedclientset "k8s.io/kubernetes/federation/client/clientset_generated/federation_clientset/fake" - "k8s.io/kubernetes/federation/pkg/dnsprovider/providers/google/clouddns" // Only for unit testing purposes. - . "k8s.io/kubernetes/federation/pkg/federation-controller/util/test" - v1 "k8s.io/kubernetes/pkg/api/v1" -) - -var fakeDns, _ = clouddns.NewFakeInterface() // No need to check for unsupported interfaces, as the fake interface supports everything that's required. -var fakeDnsZones, _ = fakeDns.Zones() -var fakeClient = &fakefedclientset.Clientset{} - -var fakeServiceController = ServiceController{ - federationClient: fakeClient, - dns: fakeDns, - dnsZones: fakeDnsZones, - federationName: "fed1", - zoneName: "example.com", - serviceDnsSuffix: "federation.example.com", - serviceCache: &serviceCache{fedServiceMap: make(map[string]*cachedService)}, - clusterCache: &clusterClientCache{ - clientMap: make(map[string]*clusterCache), - }, - knownClusterSet: make(sets.String), -} - -func buildEndpoint(subsets [][]string) *v1.Endpoints { - endpoint := &v1.Endpoints{ - Subsets: []v1.EndpointSubset{ - {Addresses: []v1.EndpointAddress{}}, - }, - } - for _, element := range subsets { - address := v1.EndpointAddress{IP: element[0], Hostname: element[1], TargetRef: nil} - endpoint.Subsets[0].Addresses = append(endpoint.Subsets[0].Addresses, address) - } - return endpoint -} - -func TestProcessEndpointUpdate(t *testing.T) { - clusterName := "foo" - cc := clusterClientCache{ - clientMap: map[string]*clusterCache{ - clusterName: { - cluster: &v1beta1.Cluster{ - Status: v1beta1.ClusterStatus{ - Zones: []string{"foozone"}, - Region: "fooregion", - }, - }, - }, - }, - } - RegisterFakeClusterGet(&fakeClient.Fake, &v1beta1.ClusterList{Items: []v1beta1.Cluster{*NewCluster(clusterName, v1.ConditionTrue)}}) - tests := []struct { - name string - cachedService *cachedService - endpoint *v1.Endpoints - clusterName string - expectResult int - }{ - { - "no-cache", - &cachedService{ - lastState: &v1.Service{}, - endpointMap: make(map[string]int), - }, - buildEndpoint([][]string{{"ip1", ""}}), - clusterName, - 1, - }, - { - "has-cache", - &cachedService{ - lastState: &v1.Service{}, - endpointMap: map[string]int{ - "foo": 1, - }, - }, - buildEndpoint([][]string{{"ip1", ""}}), - clusterName, - 1, - }, - } - fakeServiceController.clusterCache = &cc - for _, test := range tests { - cc.processEndpointUpdate(test.cachedService, test.endpoint, test.clusterName, &fakeServiceController) - if test.expectResult != test.cachedService.endpointMap[test.clusterName] { - t.Errorf("Test failed for %s, expected %v, saw %v", test.name, test.expectResult, test.cachedService.endpointMap[test.clusterName]) - } - } -} - -func TestProcessEndpointDeletion(t *testing.T) { - clusterName := "foo" - cc := clusterClientCache{ - clientMap: map[string]*clusterCache{ - clusterName: { - cluster: &v1beta1.Cluster{ - Status: v1beta1.ClusterStatus{ - Zones: []string{"foozone"}, - Region: "fooregion", - }, - }, - }, - }, - } - RegisterFakeClusterGet(&fakeClient.Fake, &v1beta1.ClusterList{Items: []v1beta1.Cluster{*NewCluster(clusterName, v1.ConditionTrue)}}) - tests := []struct { - name string - cachedService *cachedService - endpoint *v1.Endpoints - clusterName string - expectResult int - }{ - { - "no-cache", - &cachedService{ - lastState: &v1.Service{}, - endpointMap: make(map[string]int), - }, - buildEndpoint([][]string{{"ip1", ""}}), - clusterName, - 0, - }, - { - "has-cache", - &cachedService{ - lastState: &v1.Service{}, - endpointMap: map[string]int{ - clusterName: 1, - }, - }, - buildEndpoint([][]string{{"ip1", ""}}), - clusterName, - 0, - }, - } - fakeServiceController.clusterCache = &cc - for _, test := range tests { - cc.processEndpointDeletion(test.cachedService, test.clusterName, &fakeServiceController) - if test.expectResult != test.cachedService.endpointMap[test.clusterName] { - t.Errorf("Test failed for %s, expected %v, saw %v", test.name, test.expectResult, test.cachedService.endpointMap[test.clusterName]) - } - } -} diff --git a/federation/pkg/federation-controller/service/service_helper.go b/federation/pkg/federation-controller/service/service_helper.go deleted file mode 100644 index 7efcfff8774..00000000000 --- a/federation/pkg/federation-controller/service/service_helper.go +++ /dev/null @@ -1,302 +0,0 @@ -/* -Copyright 2016 The Kubernetes Authors. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package service - -import ( - "time" - - "k8s.io/apimachinery/pkg/api/errors" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - cache "k8s.io/client-go/tools/cache" - fedclientset "k8s.io/kubernetes/federation/client/clientset_generated/federation_clientset" - v1 "k8s.io/kubernetes/pkg/api/v1" - "k8s.io/kubernetes/pkg/controller" - - "reflect" - "sort" - - "github.com/golang/glog" -) - -// worker runs a worker thread that just dequeues items, processes them, and marks them done. -// It enforces that the syncHandler is never invoked concurrently with the same key. -func (sc *ServiceController) clusterServiceWorker() { - // process all pending events in serviceWorkerDoneChan -ForLoop: - for { - select { - case clusterName := <-sc.serviceWorkerDoneChan: - sc.serviceWorkerMap[clusterName] = false - default: - // non-blocking, comes here if all existing events are processed - break ForLoop - } - } - - for clusterName, cache := range sc.clusterCache.clientMap { - workerExist, found := sc.serviceWorkerMap[clusterName] - if found && workerExist { - continue - } - - // create a worker only if the previous worker has finished and gone out of scope - go func(cache *clusterCache, clusterName string) { - fedClient := sc.federationClient - for { - func() { - key, quit := cache.serviceQueue.Get() - if quit { - // send signal that current worker has finished tasks and is going out of scope - sc.serviceWorkerDoneChan <- clusterName - return - } - defer cache.serviceQueue.Done(key) - err := sc.clusterCache.syncService(key.(string), clusterName, cache, sc.serviceCache, fedClient, sc) - if err != nil { - glog.Errorf("Failed to sync service: %+v", err) - } - }() - } - }(cache, clusterName) - sc.serviceWorkerMap[clusterName] = true - } -} - -// Whenever there is change on service, the federation service should be updated -func (cc *clusterClientCache) syncService(key, clusterName string, clusterCache *clusterCache, serviceCache *serviceCache, fedClient fedclientset.Interface, sc *ServiceController) error { - // obj holds the latest service info from apiserver, return if there is no federation cache for the service - cachedService, ok := serviceCache.get(key) - if !ok { - // if serviceCache does not exists, that means the service is not created by federation, we should skip it - return nil - } - namespace, name, err := cache.SplitMetaNamespaceKey(key) - if err != nil { - glog.Errorf("Did not successfully get %v from store: %v, will retry later", key, err) - clusterCache.serviceQueue.Add(key) - return err - } - var needUpdate, isDeletion bool - service, err := clusterCache.serviceStore.Services(namespace).Get(name) - switch { - case errors.IsNotFound(err): - glog.Infof("Can not get service %v for cluster %s from serviceStore", key, clusterName) - needUpdate = cc.processServiceDeletion(cachedService, clusterName) - isDeletion = true - case err != nil: - glog.Errorf("Did not successfully get %v from store: %v, will retry later", key, err) - clusterCache.serviceQueue.Add(key) - return err - default: - glog.V(4).Infof("Found service for federation service %s/%s from cluster %s", service.Namespace, service.Name, clusterName) - needUpdate = cc.processServiceUpdate(cachedService, service, clusterName) - } - - if needUpdate { - for i := 0; i < clientRetryCount; i++ { - err := sc.ensureDnsRecords(clusterName, service) - if err == nil { - break - } - glog.V(4).Infof("Error ensuring DNS Records for service %s on cluster %s: %v", key, clusterName, err) - time.Sleep(cachedService.nextDNSUpdateDelay()) - clusterCache.serviceQueue.Add(key) - // did not retry here as we still want to persist federation apiserver even ensure dns records fails - } - err := cc.persistFedServiceUpdate(cachedService, fedClient) - if err == nil { - cachedService.appliedState = cachedService.lastState - cachedService.resetFedUpdateDelay() - } else { - if err != nil { - glog.Errorf("Failed to sync service: %+v, put back to service queue", err) - clusterCache.serviceQueue.Add(key) - } - } - } - if isDeletion { - // cachedService is not reliable here as - // deleting cache is the last step of federation service deletion - service, err := fedClient.Core().Services(cachedService.lastState.Namespace).Get(cachedService.lastState.Name, metav1.GetOptions{}) - // rebuild service if federation service still exists - if err == nil || !errors.IsNotFound(err) { - if err == nil && service.DeletionTimestamp != nil { - glog.V(4).Infof("Skipping sync of service %v in underlying clusters as it has already been marked for deletion", name) - return nil - } - return sc.ensureClusterService(cachedService, clusterName, cachedService.appliedState, clusterCache.clientset) - } - } - return nil -} - -// processServiceDeletion is triggered when a service is delete from underlying k8s cluster -// the deletion function will wip out the cached ingress info of the service from federation service ingress -// the function returns a bool to indicate if actual update happened on federation service cache -// and if the federation service cache is updated, the updated info should be post to federation apiserver -func (cc *clusterClientCache) processServiceDeletion(cachedService *cachedService, clusterName string) bool { - cachedService.rwlock.Lock() - defer cachedService.rwlock.Unlock() - cachedStatus, ok := cachedService.serviceStatusMap[clusterName] - // cached status found, remove ingress info from federation service cache - if ok { - cachedFedServiceStatus := cachedService.lastState.Status.LoadBalancer - removeIndexes := []int{} - for i, fed := range cachedFedServiceStatus.Ingress { - for _, new := range cachedStatus.Ingress { - // remove if same ingress record found - if new.IP == fed.IP && new.Hostname == fed.Hostname { - removeIndexes = append(removeIndexes, i) - } - } - } - sort.Ints(removeIndexes) - for i := len(removeIndexes) - 1; i >= 0; i-- { - cachedFedServiceStatus.Ingress = append(cachedFedServiceStatus.Ingress[:removeIndexes[i]], cachedFedServiceStatus.Ingress[removeIndexes[i]+1:]...) - glog.V(4).Infof("Remove old ingress %d for service %s/%s", removeIndexes[i], cachedService.lastState.Namespace, cachedService.lastState.Name) - } - delete(cachedService.serviceStatusMap, clusterName) - delete(cachedService.endpointMap, clusterName) - cachedService.lastState.Status.LoadBalancer = cachedFedServiceStatus - return true - } else { - glog.V(4).Infof("Service removal %s/%s from cluster %s observed.", cachedService.lastState.Namespace, cachedService.lastState.Name, clusterName) - } - return false -} - -// processServiceUpdate Update ingress info when service updated -// the function returns a bool to indicate if actual update happened on federation service cache -// and if the federation service cache is updated, the updated info should be post to federation apiserver -func (cc *clusterClientCache) processServiceUpdate(cachedService *cachedService, service *v1.Service, clusterName string) bool { - glog.V(4).Infof("Processing service update for %s/%s, cluster %s", service.Namespace, service.Name, clusterName) - cachedService.rwlock.Lock() - defer cachedService.rwlock.Unlock() - var needUpdate bool - newServiceLB := service.Status.LoadBalancer - cachedFedServiceStatus := cachedService.lastState.Status.LoadBalancer - if len(newServiceLB.Ingress) == 0 { - // not yet get LB IP - return false - } - - cachedStatus, ok := cachedService.serviceStatusMap[clusterName] - if ok { - if reflect.DeepEqual(cachedStatus, newServiceLB) { - glog.V(4).Infof("Same ingress info observed for service %s/%s: %+v ", service.Namespace, service.Name, cachedStatus.Ingress) - } else { - glog.V(4).Infof("Ingress info was changed for service %s/%s: cache: %+v, new: %+v ", - service.Namespace, service.Name, cachedStatus.Ingress, newServiceLB) - needUpdate = true - } - } else { - glog.V(4).Infof("Cached service status was not found for %s/%s, cluster %s, building one", service.Namespace, service.Name, clusterName) - - // cache is not always reliable(cache will be cleaned when service controller restart) - // two cases will run into this branch: - // 1. new service loadbalancer info received -> no info in cache, and no in federation service - // 2. service controller being restarted -> no info in cache, but it is in federation service - - // check if the lb info is already in federation service - - cachedService.serviceStatusMap[clusterName] = newServiceLB - needUpdate = false - // iterate service ingress info - for _, new := range newServiceLB.Ingress { - var found bool - // if it is known by federation service - for _, fed := range cachedFedServiceStatus.Ingress { - if new.IP == fed.IP && new.Hostname == fed.Hostname { - found = true - break - } - } - if !found { - needUpdate = true - break - } - } - } - - if needUpdate { - // new status = cached federation status - cached status + new status from k8s cluster - - removeIndexes := []int{} - for i, fed := range cachedFedServiceStatus.Ingress { - for _, new := range cachedStatus.Ingress { - // remove if same ingress record found - if new.IP == fed.IP && new.Hostname == fed.Hostname { - removeIndexes = append(removeIndexes, i) - } - } - } - sort.Ints(removeIndexes) - for i := len(removeIndexes) - 1; i >= 0; i-- { - cachedFedServiceStatus.Ingress = append(cachedFedServiceStatus.Ingress[:removeIndexes[i]], cachedFedServiceStatus.Ingress[removeIndexes[i]+1:]...) - } - cachedFedServiceStatus.Ingress = append(cachedFedServiceStatus.Ingress, service.Status.LoadBalancer.Ingress...) - cachedService.lastState.Status.LoadBalancer = cachedFedServiceStatus - glog.V(4).Infof("Add new ingress info %+v for service %s/%s", service.Status.LoadBalancer, service.Namespace, service.Name) - } else { - glog.V(4).Infof("Same ingress info found for %s/%s, cluster %s", service.Namespace, service.Name, clusterName) - } - return needUpdate -} - -func (cc *clusterClientCache) persistFedServiceUpdate(cachedService *cachedService, fedClient fedclientset.Interface) error { - service := cachedService.lastState - glog.V(5).Infof("Persist federation service status %s/%s", service.Namespace, service.Name) - var err error - for i := 0; i < clientRetryCount; i++ { - _, err := fedClient.Core().Services(service.Namespace).Get(service.Name, metav1.GetOptions{}) - if errors.IsNotFound(err) { - glog.Infof("Not persisting update to service '%s/%s' that no longer exists: %v", - service.Namespace, service.Name, err) - return nil - } - _, err = fedClient.Core().Services(service.Namespace).UpdateStatus(service) - if err == nil { - glog.V(2).Infof("Successfully update service %s/%s to federation apiserver", service.Namespace, service.Name) - return nil - } - if errors.IsNotFound(err) { - glog.Infof("Not persisting update to service '%s/%s' that no longer exists: %v", - service.Namespace, service.Name, err) - return nil - } - if errors.IsConflict(err) { - glog.V(4).Infof("Not persisting update to service '%s/%s' that has been changed since we received it: %v", - service.Namespace, service.Name, err) - return err - } - time.Sleep(cachedService.nextFedUpdateDelay()) - } - return err -} - -// obj could be an *api.Service, or a DeletionFinalStateUnknown marker item. -func (cc *clusterClientCache) enqueueService(obj interface{}, clusterName string) { - key, err := controller.KeyFunc(obj) - if err != nil { - glog.Errorf("Couldn't get key for object %+v: %v", obj, err) - return - } - _, ok := cc.clientMap[clusterName] - if ok { - cc.clientMap[clusterName].serviceQueue.Add(key) - } -} diff --git a/federation/pkg/federation-controller/service/service_helper_test.go b/federation/pkg/federation-controller/service/service_helper_test.go deleted file mode 100644 index 03c154a2ad9..00000000000 --- a/federation/pkg/federation-controller/service/service_helper_test.go +++ /dev/null @@ -1,163 +0,0 @@ -/* -Copyright 2016 The Kubernetes Authors. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package service - -import ( - "reflect" - "testing" - - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/kubernetes/pkg/api/v1" -) - -func buildServiceStatus(ingresses [][]string) v1.LoadBalancerStatus { - status := v1.LoadBalancerStatus{ - Ingress: []v1.LoadBalancerIngress{}, - } - for _, element := range ingresses { - ingress := v1.LoadBalancerIngress{IP: element[0], Hostname: element[1]} - status.Ingress = append(status.Ingress, ingress) - } - return status -} - -func TestProcessServiceUpdate(t *testing.T) { - cc := clusterClientCache{ - clientMap: make(map[string]*clusterCache), - } - tests := []struct { - name string - cachedService *cachedService - service *v1.Service - clusterName string - expectNeedUpdate bool - expectStatus v1.LoadBalancerStatus - }{ - { - "no-cache", - &cachedService{ - lastState: &v1.Service{}, - serviceStatusMap: make(map[string]v1.LoadBalancerStatus), - }, - &v1.Service{Status: v1.ServiceStatus{LoadBalancer: buildServiceStatus([][]string{{"ip1", ""}})}}, - "foo", - true, - buildServiceStatus([][]string{{"ip1", ""}}), - }, - { - "same-ingress", - &cachedService{ - lastState: &v1.Service{Status: v1.ServiceStatus{LoadBalancer: buildServiceStatus([][]string{{"ip1", ""}})}}, - serviceStatusMap: map[string]v1.LoadBalancerStatus{ - "foo1": {Ingress: []v1.LoadBalancerIngress{{IP: "ip1", Hostname: ""}}}, - }, - }, - &v1.Service{Status: v1.ServiceStatus{LoadBalancer: buildServiceStatus([][]string{{"ip1", ""}})}}, - "foo1", - false, - buildServiceStatus([][]string{{"ip1", ""}}), - }, - { - "diff-cluster", - &cachedService{ - lastState: &v1.Service{ - ObjectMeta: metav1.ObjectMeta{Name: "bar1"}, - }, - serviceStatusMap: map[string]v1.LoadBalancerStatus{ - "foo2": {Ingress: []v1.LoadBalancerIngress{{IP: "ip1", Hostname: ""}}}, - }, - }, - &v1.Service{Status: v1.ServiceStatus{LoadBalancer: buildServiceStatus([][]string{{"ip1", ""}})}}, - "foo1", - true, - buildServiceStatus([][]string{{"ip1", ""}}), - }, - { - "diff-ingress", - &cachedService{ - lastState: &v1.Service{Status: v1.ServiceStatus{LoadBalancer: buildServiceStatus([][]string{{"ip4", ""}, {"ip1", ""}, {"ip2", ""}})}}, - serviceStatusMap: map[string]v1.LoadBalancerStatus{ - "foo1": buildServiceStatus([][]string{{"ip4", ""}, {"ip1", ""}, {"ip2", ""}}), - }, - }, - &v1.Service{Status: v1.ServiceStatus{LoadBalancer: buildServiceStatus([][]string{{"ip2", ""}, {"ip3", ""}, {"ip5", ""}})}}, - "foo1", - true, - buildServiceStatus([][]string{{"ip2", ""}, {"ip3", ""}, {"ip5", ""}}), - }, - } - for _, test := range tests { - result := cc.processServiceUpdate(test.cachedService, test.service, test.clusterName) - if test.expectNeedUpdate != result { - t.Errorf("Test failed for %s, expected %v, saw %v", test.name, test.expectNeedUpdate, result) - } - if !reflect.DeepEqual(test.expectStatus, test.cachedService.lastState.Status.LoadBalancer) { - t.Errorf("Test failed for %s, expected %v, saw %v", test.name, test.expectStatus, test.cachedService.lastState.Status.LoadBalancer) - } - } -} - -func TestProcessServiceDeletion(t *testing.T) { - cc := clusterClientCache{ - clientMap: make(map[string]*clusterCache), - } - tests := []struct { - name string - cachedService *cachedService - service *v1.Service - clusterName string - expectNeedUpdate bool - expectStatus v1.LoadBalancerStatus - }{ - { - "same-ingress", - &cachedService{ - lastState: &v1.Service{Status: v1.ServiceStatus{LoadBalancer: buildServiceStatus([][]string{{"ip1", ""}})}}, - serviceStatusMap: map[string]v1.LoadBalancerStatus{ - "foo1": {Ingress: []v1.LoadBalancerIngress{{IP: "ip1", Hostname: ""}}}, - }, - }, - &v1.Service{Status: v1.ServiceStatus{LoadBalancer: buildServiceStatus([][]string{{"ip1", ""}})}}, - "foo1", - true, - buildServiceStatus([][]string{}), - }, - { - "diff-ingress", - &cachedService{ - lastState: &v1.Service{Status: v1.ServiceStatus{LoadBalancer: buildServiceStatus([][]string{{"ip4", ""}, {"ip1", ""}, {"ip2", ""}, {"ip3", ""}, {"ip5", ""}, {"ip6", ""}, {"ip8", ""}})}}, - serviceStatusMap: map[string]v1.LoadBalancerStatus{ - "foo1": buildServiceStatus([][]string{{"ip1", ""}, {"ip2", ""}, {"ip3", ""}}), - "foo2": buildServiceStatus([][]string{{"ip5", ""}, {"ip6", ""}, {"ip8", ""}}), - }, - }, - &v1.Service{Status: v1.ServiceStatus{LoadBalancer: buildServiceStatus([][]string{{"ip1", ""}, {"ip2", ""}, {"ip3", ""}})}}, - "foo1", - true, - buildServiceStatus([][]string{{"ip4", ""}, {"ip5", ""}, {"ip6", ""}, {"ip8", ""}}), - }, - } - for _, test := range tests { - result := cc.processServiceDeletion(test.cachedService, test.clusterName) - if test.expectNeedUpdate != result { - t.Errorf("Test failed for %s, expected %v, saw %v", test.name, test.expectNeedUpdate, result) - } - if !reflect.DeepEqual(test.expectStatus, test.cachedService.lastState.Status.LoadBalancer) { - t.Errorf("Test failed for %s, expected %+v, saw %+v", test.name, test.expectStatus, test.cachedService.lastState.Status.LoadBalancer) - } - } -} diff --git a/federation/pkg/federation-controller/service/servicecontroller.go b/federation/pkg/federation-controller/service/servicecontroller.go index 91c480438f7..a1f783d01f1 100644 --- a/federation/pkg/federation-controller/service/servicecontroller.go +++ b/federation/pkg/federation-controller/service/servicecontroller.go @@ -18,23 +18,19 @@ package service import ( "fmt" + "reflect" "sort" "strings" - "sync" "time" - "reflect" - "github.com/golang/glog" "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/conversion" "k8s.io/apimachinery/pkg/labels" pkgruntime "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/runtime" - "k8s.io/apimachinery/pkg/util/sets" "k8s.io/apimachinery/pkg/util/wait" "k8s.io/apimachinery/pkg/watch" clientv1 "k8s.io/client-go/pkg/api/v1" @@ -59,27 +55,9 @@ import ( const ( serviceSyncPeriod = 10 * time.Minute - clusterSyncPeriod = 10 * time.Minute - - // How long to wait before retrying the processing of a service change. - // If this changes, the sleep in hack/jenkins/e2e.sh before downing a cluster - // should be changed appropriately. - minRetryDelay = 5 * time.Second - maxRetryDelay = 300 * time.Second - - // client retry count and interval is when accessing a remote kube-apiserver or federation apiserver - // how many times should be attempted and how long it should sleep when failure occurs - // the retry should be in short time so no exponential backoff - clientRetryCount = 5 - - retryable = true - - doNotRetry = time.Duration(0) UserAgentName = "federation-service-controller" - maxNoOfClusters = 100 - reviewDelay = 10 * time.Second updateTimeout = 30 * time.Second allClustersKey = "ALL_CLUSTERS" @@ -91,33 +69,6 @@ var ( RequiredResources = []schema.GroupVersionResource{v1.SchemeGroupVersion.WithResource("services")} ) -type cachedService struct { - lastState *v1.Service - // The state as successfully applied to the DNS server - appliedState *v1.Service - // cluster endpoint map hold subset info from kubernetes clusters - // key clusterName - // value is a flag that if there is ready address, 1 means there is ready address - endpointMap map[string]int - // serviceStatusMap map holds service status info from kubernetes clusters, keyed on clusterName - serviceStatusMap map[string]v1.LoadBalancerStatus - // Ensures only one goroutine can operate on this service at any given time. - rwlock sync.Mutex - // Controls error back-off for proceeding federation service to k8s clusters - lastRetryDelay time.Duration - // Controls error back-off for updating federation service back to federation apiserver - lastFedUpdateDelay time.Duration - // Controls error back-off for dns record update - lastDNSUpdateDelay time.Duration -} - -type serviceCache struct { - rwlock sync.Mutex // protects serviceMap - // federation service map contains all service received from federation apiserver - // key serviceName - fedServiceMap map[string]*cachedService -} - type ServiceController struct { dns dnsprovider.Interface federationClient fedclientset.Interface @@ -128,9 +79,7 @@ type ServiceController struct { zoneName string zoneID string // each federation should be configured with a single zone (e.g. "mycompany.com") - dnsZones dnsprovider.Zones - serviceCache *serviceCache - clusterCache *clusterClientCache + dnsZones dnsprovider.Zones // A store of services, populated by the serviceController serviceStore corelisters.ServiceLister // Watches changes to all services @@ -143,18 +92,7 @@ type ServiceController struct { eventBroadcaster record.EventBroadcaster eventRecorder record.EventRecorder // services that need to be synced - queue *workqueue.Type - knownClusterSet sets.String - // endpoint worker map contains all the clusters registered with an indication that worker exist - // key clusterName - endpointWorkerMap map[string]bool - // channel for worker to signal that it is going out of existence - endpointWorkerDoneChan chan string - // service worker map contains all the clusters registered with an indication that worker exist - // key clusterName - serviceWorkerMap map[string]bool - // channel for worker to signal that it is going out of existence - serviceWorkerDoneChan chan string + queue *workqueue.Type // For triggering all services reconciliation. This is used when // a new cluster becomes available. @@ -174,7 +112,6 @@ type ServiceController struct { // New returns a new service controller to keep DNS provider service resources // (like Kubernetes Services and DNS server records for service discovery) in sync with the registry. - func New(federationClient fedclientset.Interface, dns dnsprovider.Interface, federationName, serviceDnsSuffix, zoneName string, zoneID string) *ServiceController { broadcaster := record.NewBroadcaster() @@ -183,21 +120,15 @@ func New(federationClient fedclientset.Interface, dns dnsprovider.Interface, recorder := broadcaster.NewRecorder(api.Scheme, clientv1.EventSource{Component: UserAgentName}) s := &ServiceController{ - dns: dns, - federationClient: federationClient, - federationName: federationName, - serviceDnsSuffix: serviceDnsSuffix, - zoneName: zoneName, - zoneID: zoneID, - serviceCache: &serviceCache{fedServiceMap: make(map[string]*cachedService)}, - clusterCache: &clusterClientCache{ - rwlock: sync.Mutex{}, - clientMap: make(map[string]*clusterCache), - }, + dns: dns, + federationClient: federationClient, + federationName: federationName, + serviceDnsSuffix: serviceDnsSuffix, + zoneName: zoneName, + zoneID: zoneID, eventBroadcaster: broadcaster, eventRecorder: recorder, queue: workqueue.New(), - knownClusterSet: make(sets.String), reviewDelay: reviewDelay, clusterAvailableDelay: clusterAvailableDelay, updateTimeout: updateTimeout, @@ -317,10 +248,6 @@ func New(federationClient fedclientset.Interface, dns dnsprovider.Interface, s.federatedUpdater, ) - s.endpointWorkerMap = make(map[string]bool) - s.serviceWorkerMap = make(map[string]bool) - s.endpointWorkerDoneChan = make(chan string, maxNoOfClusters) - s.serviceWorkerDoneChan = make(chan string, maxNoOfClusters) return s } @@ -331,16 +258,6 @@ func (s *ServiceController) updateService(obj pkgruntime.Object) (pkgruntime.Obj return s.federationClient.Core().Services(service.Namespace).Update(service) } -// obj could be an *api.Service, or a DeletionFinalStateUnknown marker item. -func (s *ServiceController) enqueueService(obj interface{}) { - key, err := controller.KeyFunc(obj) - if err != nil { - glog.Errorf("Couldn't get key for object %+v: %v", obj, err) - return - } - s.queue.Add(key) -} - // Run starts a background goroutine that watches for changes to federation services // and ensures that they have Kubernetes services created, updated or deleted appropriately. // federationSyncPeriod controls how often we check the federation's services to @@ -472,579 +389,6 @@ func wantsDNSRecords(service *v1.Service) bool { return service.Spec.Type == v1.ServiceTypeLoadBalancer } -// processServiceForCluster creates or updates service to all registered running clusters, -// update DNS records and update the service info with DNS entries to federation apiserver. -// the function returns any error caught -func (s *ServiceController) processServiceForCluster(cachedService *cachedService, clusterName string, service *v1.Service, client *kubeclientset.Clientset) error { - if service.DeletionTimestamp != nil { - glog.V(4).Infof("Service has already been marked for deletion %v", service.Name) - return nil - } - glog.V(4).Infof("Process service %s/%s for cluster %s", service.Namespace, service.Name, clusterName) - // Create or Update k8s Service - err := s.ensureClusterService(cachedService, clusterName, service, client) - if err != nil { - glog.V(4).Infof("Failed to process service %s/%s for cluster %s", service.Namespace, service.Name, clusterName) - return err - } - glog.V(4).Infof("Successfully process service %s/%s for cluster %s", service.Namespace, service.Name, clusterName) - return nil -} - -// updateFederationService Returns whatever error occurred along with a boolean indicator of whether it -// should be retried. -func (s *ServiceController) updateFederationService(key string, cachedService *cachedService) (error, bool) { - // Clone federation service, and create them in underlying k8s cluster - desiredService := &v1.Service{ - ObjectMeta: util.DeepCopyRelevantObjectMeta(cachedService.lastState.ObjectMeta), - Spec: *(util.DeepCopyApiTypeOrPanic(&cachedService.lastState.Spec).(*v1.ServiceSpec)), - } - - // handle available clusters one by one - hasErr := false - var wg sync.WaitGroup - for clusterName, cache := range s.clusterCache.clientMap { - wg.Add(1) - go func(cache *clusterCache, clusterName string) { - defer wg.Done() - err := s.processServiceForCluster(cachedService, clusterName, desiredService, cache.clientset) - if err != nil { - hasErr = true - } - }(cache, clusterName) - } - wg.Wait() - if hasErr { - // detail error has been dumped inside the loop - return fmt.Errorf("Service %s/%s was not successfully updated to all clusters", desiredService.Namespace, desiredService.Name), retryable - } - return nil, !retryable -} - -func (s *ServiceController) ensureClusterService(cachedService *cachedService, clusterName string, service *v1.Service, client *kubeclientset.Clientset) error { - var err error - var needUpdate bool - for i := 0; i < clientRetryCount; i++ { - svc, err := client.Core().Services(service.Namespace).Get(service.Name, metav1.GetOptions{}) - if err == nil { - // service exists - glog.V(5).Infof("Found service %s/%s from cluster %s", service.Namespace, service.Name, clusterName) - //reserve immutable fields - service.Spec.ClusterIP = svc.Spec.ClusterIP - - //reserve auto assigned field - for i, oldPort := range svc.Spec.Ports { - for _, port := range service.Spec.Ports { - if port.NodePort == 0 { - if !portEqualExcludeNodePort(&oldPort, &port) { - svc.Spec.Ports[i] = port - needUpdate = true - } - } else { - if !portEqualForLB(&oldPort, &port) { - svc.Spec.Ports[i] = port - needUpdate = true - } - } - } - } - - if needUpdate { - // we only apply spec update - svc.Spec = service.Spec - _, err = client.Core().Services(svc.Namespace).Update(svc) - if err == nil { - glog.V(5).Infof("Service %s/%s successfully updated to cluster %s", svc.Namespace, svc.Name, clusterName) - return nil - } else { - glog.V(4).Infof("Failed to update %+v", err) - } - } else { - glog.V(5).Infof("Service %s/%s is not updated to cluster %s as the spec are identical", svc.Namespace, svc.Name, clusterName) - return nil - } - } else if errors.IsNotFound(err) { - // Create service if it is not found - glog.Infof("Service '%s/%s' is not found in cluster %s, trying to create new", - service.Namespace, service.Name, clusterName) - service.ResourceVersion = "" - _, err = client.Core().Services(service.Namespace).Create(service) - if err == nil { - glog.V(5).Infof("Service %s/%s successfully created to cluster %s", service.Namespace, service.Name, clusterName) - return nil - } - glog.V(4).Infof("Failed to create %+v", err) - if errors.IsAlreadyExists(err) { - glog.V(5).Infof("service %s/%s already exists in cluster %s", service.Namespace, service.Name, clusterName) - return nil - } - } - if errors.IsConflict(err) { - glog.V(4).Infof("Not persisting update to service '%s/%s' that has been changed since we received it: %v", - service.Namespace, service.Name, err) - } - // should we reuse same retry delay for all clusters? - time.Sleep(cachedService.nextRetryDelay()) - } - return err -} - -func (s *serviceCache) allServices() []*cachedService { - s.rwlock.Lock() - defer s.rwlock.Unlock() - services := make([]*cachedService, 0, len(s.fedServiceMap)) - for _, v := range s.fedServiceMap { - services = append(services, v) - } - return services -} - -func (s *serviceCache) get(serviceName string) (*cachedService, bool) { - s.rwlock.Lock() - defer s.rwlock.Unlock() - service, ok := s.fedServiceMap[serviceName] - return service, ok -} - -func (s *serviceCache) getOrCreate(serviceName string) *cachedService { - s.rwlock.Lock() - defer s.rwlock.Unlock() - service, ok := s.fedServiceMap[serviceName] - if !ok { - service = &cachedService{ - endpointMap: make(map[string]int), - serviceStatusMap: make(map[string]v1.LoadBalancerStatus), - } - s.fedServiceMap[serviceName] = service - } - return service -} - -func (s *serviceCache) set(serviceName string, service *cachedService) { - s.rwlock.Lock() - defer s.rwlock.Unlock() - s.fedServiceMap[serviceName] = service -} - -func (s *serviceCache) delete(serviceName string) { - s.rwlock.Lock() - defer s.rwlock.Unlock() - delete(s.fedServiceMap, serviceName) -} - -// needsUpdateDNS check if the dns records of the given service should be updated -func (s *ServiceController) needsUpdateDNS(oldService *v1.Service, newService *v1.Service) bool { - if !wantsDNSRecords(oldService) && !wantsDNSRecords(newService) { - return false - } - if wantsDNSRecords(oldService) != wantsDNSRecords(newService) { - s.eventRecorder.Eventf(newService, v1.EventTypeNormal, "Type", "%v -> %v", - oldService.Spec.Type, newService.Spec.Type) - return true - } - if !portsEqualForLB(oldService, newService) || oldService.Spec.SessionAffinity != newService.Spec.SessionAffinity { - return true - } - if !LoadBalancerIPsAreEqual(oldService, newService) { - s.eventRecorder.Eventf(newService, v1.EventTypeNormal, "LoadbalancerIP", "%v -> %v", - oldService.Spec.LoadBalancerIP, newService.Spec.LoadBalancerIP) - return true - } - if len(oldService.Spec.ExternalIPs) != len(newService.Spec.ExternalIPs) { - s.eventRecorder.Eventf(newService, v1.EventTypeNormal, "ExternalIP", "Count: %v -> %v", - len(oldService.Spec.ExternalIPs), len(newService.Spec.ExternalIPs)) - return true - } - for i := range oldService.Spec.ExternalIPs { - if oldService.Spec.ExternalIPs[i] != newService.Spec.ExternalIPs[i] { - s.eventRecorder.Eventf(newService, v1.EventTypeNormal, "ExternalIP", "Added: %v", - newService.Spec.ExternalIPs[i]) - return true - } - } - if !reflect.DeepEqual(oldService.Annotations, newService.Annotations) { - return true - } - if oldService.UID != newService.UID { - s.eventRecorder.Eventf(newService, v1.EventTypeNormal, "UID", "%v -> %v", - oldService.UID, newService.UID) - return true - } - - return false -} - -func getPortsForLB(service *v1.Service) ([]*v1.ServicePort, error) { - // TODO: quinton: Probably applies for DNS SVC records. Come back to this. - //var protocol api.Protocol - - ports := []*v1.ServicePort{} - for i := range service.Spec.Ports { - sp := &service.Spec.Ports[i] - // The check on protocol was removed here. The DNS provider itself is now responsible for all protocol validation - ports = append(ports, sp) - } - return ports, nil -} - -func portsEqualForLB(x, y *v1.Service) bool { - xPorts, err := getPortsForLB(x) - if err != nil { - return false - } - yPorts, err := getPortsForLB(y) - if err != nil { - return false - } - return portSlicesEqualForLB(xPorts, yPorts) -} - -func portSlicesEqualForLB(x, y []*v1.ServicePort) bool { - if len(x) != len(y) { - return false - } - - for i := range x { - if !portEqualForLB(x[i], y[i]) { - return false - } - } - return true -} - -func portEqualForLB(x, y *v1.ServicePort) bool { - // TODO: Should we check name? (In theory, an LB could expose it) - if x.Name != y.Name { - return false - } - - if x.Protocol != y.Protocol { - return false - } - - if x.Port != y.Port { - return false - } - if x.NodePort != y.NodePort { - return false - } - return true -} - -func portEqualExcludeNodePort(x, y *v1.ServicePort) bool { - // TODO: Should we check name? (In theory, an LB could expose it) - if x.Name != y.Name { - return false - } - - if x.Protocol != y.Protocol { - return false - } - - if x.Port != y.Port { - return false - } - return true -} - -func clustersFromList(list *v1beta1.ClusterList) []string { - result := []string{} - for ix := range list.Items { - result = append(result, list.Items[ix].Name) - } - return result -} - -// getClusterConditionPredicate filter all clusters meet condition of -// condition.type=Ready and condition.status=true -func getClusterConditionPredicate() federationcache.ClusterConditionPredicate { - return func(cluster v1beta1.Cluster) bool { - // If we have no info, don't accept - if len(cluster.Status.Conditions) == 0 { - return false - } - for _, cond := range cluster.Status.Conditions { - //We consider the cluster for load balancing only when its ClusterReady condition status - //is ConditionTrue - if cond.Type == v1beta1.ClusterReady && cond.Status != v1.ConditionTrue { - glog.V(4).Infof("Ignoring cluster %v with %v condition status %v", cluster.Name, cond.Type, cond.Status) - return false - } - } - return true - } -} - -// clusterSyncLoop observes running clusters changes, and apply all services to new added cluster -// and add dns records for the changes -func (s *ServiceController) clusterSyncLoop() { - var servicesToUpdate []*cachedService - // should we remove cache for cluster from ready to not ready? should remove the condition predicate if no - clusters, err := s.clusterStore.ClusterCondition(getClusterConditionPredicate()).List() - if err != nil { - glog.Infof("Fail to get cluster list") - return - } - newClusters := clustersFromList(&clusters) - var newSet, increase sets.String - newSet = sets.NewString(newClusters...) - if newSet.Equal(s.knownClusterSet) { - // The set of cluster names in the services in the federation hasn't changed, but we can retry - // updating any services that we failed to update last time around. - servicesToUpdate = s.updateDNSRecords(servicesToUpdate, newClusters) - return - } - glog.Infof("Detected change in list of cluster names. New set: %v, Old set: %v", newSet, s.knownClusterSet) - increase = newSet.Difference(s.knownClusterSet) - // do nothing when cluster is removed. - if increase != nil { - // Try updating all services, and save the ones that fail to try again next - // round. - servicesToUpdate = s.serviceCache.allServices() - numServices := len(servicesToUpdate) - for newCluster := range increase { - glog.Infof("New cluster observed %s", newCluster) - s.updateAllServicesToCluster(servicesToUpdate, newCluster) - } - servicesToUpdate = s.updateDNSRecords(servicesToUpdate, newClusters) - glog.Infof("Successfully updated %d out of %d DNS records to direct traffic to the updated cluster", - numServices-len(servicesToUpdate), numServices) - } - s.knownClusterSet = newSet -} - -func (s *ServiceController) updateAllServicesToCluster(services []*cachedService, clusterName string) { - cluster, ok := s.clusterCache.clientMap[clusterName] - if ok { - for _, cachedService := range services { - appliedState := cachedService.lastState - s.processServiceForCluster(cachedService, clusterName, appliedState, cluster.clientset) - } - } -} - -// updateDNSRecords updates all existing federation service DNS Records so that -// they will match the list of cluster names provided. -// Returns the list of services that couldn't be updated. -func (s *ServiceController) updateDNSRecords(services []*cachedService, clusters []string) (servicesToRetry []*cachedService) { - for _, service := range services { - func() { - service.rwlock.Lock() - defer service.rwlock.Unlock() - // If the applied state is nil, that means it hasn't yet been successfully dealt - // with by the DNS Record reconciler. We can trust the DNS Record - // reconciler to ensure the federation service's DNS records are created to target - // the correct backend service IP's - if service.appliedState == nil { - return - } - if err := s.lockedUpdateDNSRecords(service, clusters); err != nil { - glog.Errorf("External error while updating DNS Records: %v.", err) - servicesToRetry = append(servicesToRetry, service) - } - }() - } - return servicesToRetry -} - -// lockedUpdateDNSRecords Updates the DNS records of a service, assuming we hold the mutex -// associated with the service. -func (s *ServiceController) lockedUpdateDNSRecords(service *cachedService, clusterNames []string) error { - if !wantsDNSRecords(service.appliedState) { - return nil - } - - ensuredCount := 0 - unensuredCount := 0 - for key := range s.clusterCache.clientMap { - for _, clusterName := range clusterNames { - if key == clusterName { - err := s.ensureDnsRecords(clusterName, service.lastState) - if err != nil { - unensuredCount += 1 - glog.V(4).Infof("Failed to update DNS records for service %v from cluster %s: %v", service, clusterName, err) - } else { - ensuredCount += 1 - } - } - } - } - missedCount := len(clusterNames) - ensuredCount - unensuredCount - if missedCount > 0 || unensuredCount > 0 { - return fmt.Errorf("Failed to update DNS records for %d clusters for service %v due to missing clients [missed count: %d] and/or failing to ensure DNS records [unensured count: %d]", - len(clusterNames), service, missedCount, unensuredCount) - } - return nil -} - -func LoadBalancerIPsAreEqual(oldService, newService *v1.Service) bool { - return oldService.Spec.LoadBalancerIP == newService.Spec.LoadBalancerIP -} - -// Computes the next retry, using exponential backoff -// mutex must be held. -func (s *cachedService) nextRetryDelay() time.Duration { - s.lastRetryDelay = s.lastRetryDelay * 2 - if s.lastRetryDelay < minRetryDelay { - s.lastRetryDelay = minRetryDelay - } - if s.lastRetryDelay > maxRetryDelay { - s.lastRetryDelay = maxRetryDelay - } - return s.lastRetryDelay -} - -// resetRetryDelay Resets the retry exponential backoff. mutex must be held. -func (s *cachedService) resetRetryDelay() { - s.lastRetryDelay = time.Duration(0) -} - -// Computes the next retry, using exponential backoff -// mutex must be held. -func (s *cachedService) nextFedUpdateDelay() time.Duration { - s.lastFedUpdateDelay = s.lastFedUpdateDelay * 2 - if s.lastFedUpdateDelay < minRetryDelay { - s.lastFedUpdateDelay = minRetryDelay - } - if s.lastFedUpdateDelay > maxRetryDelay { - s.lastFedUpdateDelay = maxRetryDelay - } - return s.lastFedUpdateDelay -} - -// resetRetryDelay Resets the retry exponential backoff. mutex must be held. -func (s *cachedService) resetFedUpdateDelay() { - s.lastFedUpdateDelay = time.Duration(0) -} - -// Computes the next retry, using exponential backoff -// mutex must be held. -func (s *cachedService) nextDNSUpdateDelay() time.Duration { - s.lastDNSUpdateDelay = s.lastDNSUpdateDelay * 2 - if s.lastDNSUpdateDelay < minRetryDelay { - s.lastDNSUpdateDelay = minRetryDelay - } - if s.lastDNSUpdateDelay > maxRetryDelay { - s.lastDNSUpdateDelay = maxRetryDelay - } - return s.lastDNSUpdateDelay -} - -// resetRetryDelay Resets the retry exponential backoff. mutex must be held. -func (s *cachedService) resetDNSUpdateDelay() { - s.lastDNSUpdateDelay = time.Duration(0) -} - -// syncService will sync the Service with the given key if it has had its expectations fulfilled, -// meaning it did not expect to see any more of its pods created or deleted. This function is not meant to be -// invoked concurrently with the same key. -func (s *ServiceController) syncService(key string) error { - startTime := time.Now() - var cachedService *cachedService - var retryDelay time.Duration - defer func() { - glog.V(4).Infof("Finished syncing service %q (%v)", key, time.Now().Sub(startTime)) - }() - - namespace, name, err := cache.SplitMetaNamespaceKey(key) - if err != nil { - glog.Errorf("Unable to retrieve service %v from store: %v", key, err) - s.queue.Add(key) - return err - } - - service, err := s.serviceStore.Services(namespace).Get(name) - switch { - case errors.IsNotFound(err): - // service absence in store means watcher caught the deletion, ensure LB info is cleaned - glog.Infof("Service has been deleted %v", key) - err, retryDelay = s.processServiceDeletion(key) - case err != nil: - glog.Errorf("Unable to retrieve service %v from store: %v", key, err) - s.queue.Add(key) - return err - default: - // Create a copy before modifying the obj to prevent race condition with - // other readers of obj from store. - copy, err := conversion.NewCloner().DeepCopy(service) - if err != nil { - glog.Errorf("Error in deep copying service %v retrieved from store: %v", key, err) - s.queue.Add(key) - return err - } - service := copy.(*v1.Service) - cachedService = s.serviceCache.getOrCreate(key) - err, retryDelay = s.processServiceUpdate(cachedService, service, key) - } - - if retryDelay != 0 { - s.enqueueService(service) - } else if err != nil { - runtime.HandleError(fmt.Errorf("Failed to process service. Not retrying: %v", err)) - } - return nil -} - -// processServiceUpdate returns an error if processing the service update failed, along with a time.Duration -// indicating whether processing should be retried; zero means no-retry; otherwise -// we should retry in that Duration. -func (s *ServiceController) processServiceUpdate(cachedService *cachedService, service *v1.Service, key string) (error, time.Duration) { - // Ensure that no other goroutine will interfere with our processing of the - // service. - cachedService.rwlock.Lock() - defer cachedService.rwlock.Unlock() - - if service.DeletionTimestamp != nil { - if err := s.delete(service); err != nil { - glog.Errorf("Failed to delete %s: %v", service, err) - s.eventRecorder.Eventf(service, api.EventTypeWarning, "DeleteFailed", - "Service delete failed: %v", err) - return err, cachedService.nextRetryDelay() - } - return nil, doNotRetry - } - - glog.V(3).Infof("Ensuring delete object from underlying clusters finalizer for service: %s", - service.Name) - // Add the required finalizers before creating a service in underlying clusters. - updatedServiceObj, err := s.deletionHelper.EnsureFinalizers(service) - if err != nil { - glog.Errorf("Failed to ensure delete object from underlying clusters finalizer in service %s: %v", - service.Name, err) - return err, cachedService.nextRetryDelay() - } - service = updatedServiceObj.(*v1.Service) - - glog.V(3).Infof("Syncing service %s in underlying clusters", service.Name) - - // Update the cached service (used above for populating synthetic deletes) - // alway trust service, which is retrieve from serviceStore, which keeps the latest service info getting from apiserver - // if the same service is changed before this go routine finished, there will be another queue entry to handle that. - cachedService.lastState = service - err, retry := s.updateFederationService(key, cachedService) - if err != nil { - message := "Error occurs when updating service to all clusters" - if retry { - message += " (will retry): " - } else { - message += " (will not retry): " - } - message += err.Error() - s.eventRecorder.Event(service, v1.EventTypeWarning, "UpdateServiceFail", message) - return err, cachedService.nextRetryDelay() - } - // Always update the cache upon success. - // NOTE: Since we update the cached service if and only if we successfully - // processed it, a cached service being nil implies that it hasn't yet - // been successfully processed. - - cachedService.appliedState = service - s.serviceCache.set(key, cachedService) - glog.V(4).Infof("Successfully proceeded services %s", key) - cachedService.resetRetryDelay() - return nil, doNotRetry -} - // delete deletes the given service or returns error if the deletion was not complete. func (s *ServiceController) delete(service *v1.Service) error { glog.V(3).Infof("Handling deletion of service: %v", *service) @@ -1083,15 +427,6 @@ func (s *ServiceController) delete(service *v1.Service) error { return nil } -// processServiceDeletion returns an error if processing the service deletion failed, along with a time.Duration -// indicating whether processing should be retried; zero means no-retry; otherwise -// we should retry in that Duration. -func (s *ServiceController) processServiceDeletion(key string) (error, time.Duration) { - glog.V(2).Infof("Process service deletion for %v", key) - s.serviceCache.delete(key) - return nil, doNotRetry -} - func (s *ServiceController) deliverServicesOnClusterChange() { if !s.isSynced() { s.clusterDeliverer.DeliverAfter(allClustersKey, nil, s.clusterAvailableDelay) diff --git a/federation/pkg/federation-controller/service/servicecontroller_test.go b/federation/pkg/federation-controller/service/servicecontroller_test.go index 93a560da309..063b8c8f8b8 100644 --- a/federation/pkg/federation-controller/service/servicecontroller_test.go +++ b/federation/pkg/federation-controller/service/servicecontroller_test.go @@ -20,7 +20,6 @@ import ( "fmt" "reflect" "strings" - "sync" "testing" "time" @@ -29,7 +28,6 @@ import ( "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" - "k8s.io/apimachinery/pkg/util/sets" "k8s.io/apimachinery/pkg/util/wait" "k8s.io/client-go/tools/cache" "k8s.io/kubernetes/federation/apis/federation/v1beta1" @@ -43,64 +41,6 @@ import ( corelisters "k8s.io/kubernetes/pkg/client/listers/core/v1" ) -func TestGetClusterConditionPredicate(t *testing.T) { - fakedns, _ := clouddns.NewFakeInterface() // No need to check for unsupported interfaces, as the fake interface supports everything that's required. - serviceController := ServiceController{ - dns: fakedns, - serviceCache: &serviceCache{fedServiceMap: make(map[string]*cachedService)}, - clusterCache: &clusterClientCache{ - rwlock: sync.Mutex{}, - clientMap: make(map[string]*clusterCache), - }, - knownClusterSet: make(sets.String), - } - - tests := []struct { - cluster v1beta1.Cluster - expectAccept bool - name string - serviceController *ServiceController - }{ - { - cluster: v1beta1.Cluster{}, - expectAccept: false, - name: "empty", - serviceController: &serviceController, - }, - { - cluster: v1beta1.Cluster{ - Status: v1beta1.ClusterStatus{ - Conditions: []v1beta1.ClusterCondition{ - {Type: v1beta1.ClusterReady, Status: v1.ConditionTrue}, - }, - }, - }, - expectAccept: true, - name: "basic", - serviceController: &serviceController, - }, - { - cluster: v1beta1.Cluster{ - Status: v1beta1.ClusterStatus{ - Conditions: []v1beta1.ClusterCondition{ - {Type: v1beta1.ClusterReady, Status: v1.ConditionFalse}, - }, - }, - }, - expectAccept: false, - name: "notready", - serviceController: &serviceController, - }, - } - pred := getClusterConditionPredicate() - for _, test := range tests { - accept := pred(test.cluster) - if accept != test.expectAccept { - t.Errorf("Test failed for %s, expected %v, saw %v", test.name, test.expectAccept, accept) - } - } -} - const ( retryInterval = 100 * time.Millisecond