From a3a2246f734cad04656a396fc1f0d3ba4b47dacc Mon Sep 17 00:00:00 2001 From: Andy Goldstein Date: Wed, 22 Feb 2017 16:42:20 -0500 Subject: [PATCH] federation: use generated listers --- .../federation-controller/replicaset/BUILD | 4 +- .../replicaset/replicasetcontroller.go | 38 ++++++++++---- .../pkg/federation-controller/service/BUILD | 2 +- .../service/cluster_helper.go | 15 ++++-- .../service/endpoint_helper.go | 28 +++++----- .../service/service_helper.go | 28 ++++------ .../service/servicecontroller.go | 52 +++++++++---------- 7 files changed, 90 insertions(+), 77 deletions(-) diff --git a/federation/pkg/federation-controller/replicaset/BUILD b/federation/pkg/federation-controller/replicaset/BUILD index 77a0b598c4d..ea16d8c9e76 100644 --- a/federation/pkg/federation-controller/replicaset/BUILD +++ b/federation/pkg/federation-controller/replicaset/BUILD @@ -25,12 +25,14 @@ go_library( "//pkg/api/v1:go_default_library", "//pkg/apis/extensions/v1beta1:go_default_library", "//pkg/client/clientset_generated/clientset:go_default_library", - "//pkg/client/legacylisters:go_default_library", + "//pkg/client/listers/extensions/v1beta1:go_default_library", "//pkg/controller:go_default_library", "//vendor:github.com/golang/glog", "//vendor:k8s.io/apimachinery/pkg/api/errors", "//vendor:k8s.io/apimachinery/pkg/apis/meta/v1", + "//vendor:k8s.io/apimachinery/pkg/labels", "//vendor:k8s.io/apimachinery/pkg/runtime", + "//vendor:k8s.io/apimachinery/pkg/util/runtime", "//vendor:k8s.io/apimachinery/pkg/util/wait", "//vendor:k8s.io/apimachinery/pkg/watch", "//vendor:k8s.io/client-go/pkg/api/v1", diff --git a/federation/pkg/federation-controller/replicaset/replicasetcontroller.go b/federation/pkg/federation-controller/replicaset/replicasetcontroller.go index c31b1c38350..070b8eb0c7a 100644 --- a/federation/pkg/federation-controller/replicaset/replicasetcontroller.go +++ b/federation/pkg/federation-controller/replicaset/replicasetcontroller.go @@ -27,7 +27,9 @@ import ( "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/runtime" + utilruntime "k8s.io/apimachinery/pkg/util/runtime" "k8s.io/apimachinery/pkg/util/wait" "k8s.io/apimachinery/pkg/watch" clientv1 "k8s.io/client-go/pkg/api/v1" @@ -47,7 +49,7 @@ import ( apiv1 "k8s.io/kubernetes/pkg/api/v1" extensionsv1 "k8s.io/kubernetes/pkg/apis/extensions/v1beta1" kubeclientset "k8s.io/kubernetes/pkg/client/clientset_generated/clientset" - "k8s.io/kubernetes/pkg/client/legacylisters" + extensionslisters "k8s.io/kubernetes/pkg/client/listers/extensions/v1beta1" "k8s.io/kubernetes/pkg/controller" ) @@ -84,7 +86,7 @@ type ReplicaSetController struct { fedClient fedclientset.Interface replicaSetController cache.Controller - replicaSetStore listers.StoreToReplicaSetLister + replicaSetLister extensionslisters.ReplicaSetLister fedReplicaSetInformer fedutil.FederatedInformer fedPodInformer fedutil.FederatedInformer @@ -172,7 +174,8 @@ func NewReplicaSetController(federationClient fedclientset.Interface) *ReplicaSe } frsc.fedPodInformer = fedutil.NewFederatedInformer(federationClient, podFedInformerFactory, &fedutil.ClusterLifecycleHandlerFuncs{}) - frsc.replicaSetStore.Indexer, frsc.replicaSetController = cache.NewIndexerInformer( + var replicaSetIndexer cache.Indexer + replicaSetIndexer, frsc.replicaSetController = cache.NewIndexerInformer( &cache.ListWatch{ ListFunc: func(options metav1.ListOptions) (runtime.Object, error) { return frsc.fedClient.Extensions().ReplicaSets(metav1.NamespaceAll).List(options) @@ -188,6 +191,7 @@ func NewReplicaSetController(federationClient fedclientset.Interface) *ReplicaSe ), cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}, ) + frsc.replicaSetLister = extensionslisters.NewReplicaSetLister(replicaSetIndexer) frsc.fedUpdater = fedutil.NewFederatedUpdater(frsc.fedReplicaSetInformer, func(client kubeclientset.Interface, obj runtime.Object) error { @@ -349,12 +353,18 @@ func (frsc *ReplicaSetController) deliverLocalReplicaSet(obj interface{}, durati glog.Errorf("Couldn't get key for object %v: %v", obj, err) return } - _, exists, err := frsc.replicaSetStore.Indexer.GetByKey(key) + namespace, name, err := cache.SplitMetaNamespaceKey(key) if err != nil { - glog.Errorf("Couldn't get federation replicaset %v: %v", key, err) - return + glog.Errorf("Error splitting key for object %v: %v", obj, err) } - if exists { // ignore replicasets exists only in local k8s + _, err = frsc.replicaSetLister.ReplicaSets(namespace).Get(name) + switch { + case errors.IsNotFound(err): + // do nothing + case err != nil: + glog.Errorf("Couldn't get federation replicaset %v: %v", key, err) + default: + // ReplicaSet exists. Ignore ReplicaSets that exist only in local k8s frsc.deliverReplicaSetByKey(key, duration, false) } } @@ -477,14 +487,18 @@ func (frsc *ReplicaSetController) reconcileReplicaSet(key string) (reconciliatio startTime := time.Now() defer glog.V(4).Infof("Finished reconcile replicaset %q (%v)", key, time.Now().Sub(startTime)) - objFromStore, exists, err := frsc.replicaSetStore.Indexer.GetByKey(key) + namespace, name, err := cache.SplitMetaNamespaceKey(key) if err != nil { return statusError, err } - if !exists { + objFromStore, err := frsc.replicaSetLister.ReplicaSets(namespace).Get(name) + if errors.IsNotFound(err) { // don't delete local replicasets for now. Do not reconcile it anymore. return statusAllOk, nil } + if err != nil { + return statusError, err + } obj, err := api.Scheme.DeepCopy(objFromStore) frs, ok := obj.(*extensionsv1.ReplicaSet) if err != nil || !ok { @@ -626,7 +640,11 @@ func (frsc *ReplicaSetController) reconcileReplicaSetsOnClusterChange() { if !frsc.isSynced() { frsc.clusterDeliverer.DeliverAfter(allClustersKey, nil, clusterAvailableDelay) } - rss := frsc.replicaSetStore.Indexer.List() + rss, err := frsc.replicaSetLister.List(labels.Everything()) + if err != nil { + utilruntime.HandleError(fmt.Errorf("error listing replica sets: %v", err)) + return + } for _, rs := range rss { key, _ := controller.KeyFunc(rs) frsc.deliverReplicaSetByKey(key, 0, false) diff --git a/federation/pkg/federation-controller/service/BUILD b/federation/pkg/federation-controller/service/BUILD index bd54106394e..8bc3d772c4a 100644 --- a/federation/pkg/federation-controller/service/BUILD +++ b/federation/pkg/federation-controller/service/BUILD @@ -30,7 +30,7 @@ go_library( "//pkg/api:go_default_library", "//pkg/api/v1:go_default_library", "//pkg/client/clientset_generated/clientset:go_default_library", - "//pkg/client/legacylisters:go_default_library", + "//pkg/client/listers/core/v1:go_default_library", "//pkg/controller:go_default_library", "//vendor:github.com/golang/glog", "//vendor:k8s.io/apimachinery/pkg/api/errors", diff --git a/federation/pkg/federation-controller/service/cluster_helper.go b/federation/pkg/federation-controller/service/cluster_helper.go index 180f2e23667..b3446a19559 100644 --- a/federation/pkg/federation-controller/service/cluster_helper.go +++ b/federation/pkg/federation-controller/service/cluster_helper.go @@ -29,23 +29,23 @@ import ( v1beta1 "k8s.io/kubernetes/federation/apis/federation/v1beta1" v1 "k8s.io/kubernetes/pkg/api/v1" kubeclientset "k8s.io/kubernetes/pkg/client/clientset_generated/clientset" - "k8s.io/kubernetes/pkg/client/legacylisters" "reflect" "github.com/golang/glog" "k8s.io/kubernetes/federation/pkg/federation-controller/util" + corelisters "k8s.io/kubernetes/pkg/client/listers/core/v1" ) type clusterCache struct { clientset *kubeclientset.Clientset cluster *v1beta1.Cluster // A store of services, populated by the serviceController - serviceStore listers.StoreToServiceLister + serviceStore corelisters.ServiceLister // Watches changes to all services serviceController cache.Controller // A store of endpoint, populated by the serviceController - endpointStore listers.StoreToEndpointsLister + endpointStore corelisters.EndpointsLister // Watches changes to all endpoints endpointController cache.Controller // services that need to be synced @@ -91,7 +91,8 @@ func (cc *clusterClientCache) startClusterLW(cluster *v1beta1.Cluster, clusterNa serviceQueue: workqueue.New(), endpointQueue: workqueue.New(), } - cachedClusterClient.endpointStore.Store, cachedClusterClient.endpointController = cache.NewInformer( + var endpointIndexer cache.Indexer + endpointIndexer, cachedClusterClient.endpointController = cache.NewIndexerInformer( &cache.ListWatch{ ListFunc: func(options metav1.ListOptions) (pkgruntime.Object, error) { return clientset.Core().Endpoints(metav1.NamespaceAll).List(options) @@ -113,9 +114,12 @@ func (cc *clusterClientCache) startClusterLW(cluster *v1beta1.Cluster, clusterNa cc.enqueueEndpoint(obj, clusterName) }, }, + cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}, ) + cachedClusterClient.endpointStore = corelisters.NewEndpointsLister(endpointIndexer) - cachedClusterClient.serviceStore.Indexer, cachedClusterClient.serviceController = cache.NewIndexerInformer( + var serviceIndexer cache.Indexer + serviceIndexer, cachedClusterClient.serviceController = cache.NewIndexerInformer( &cache.ListWatch{ ListFunc: func(options metav1.ListOptions) (pkgruntime.Object, error) { return clientset.Core().Services(metav1.NamespaceAll).List(options) @@ -152,6 +156,7 @@ func (cc *clusterClientCache) startClusterLW(cluster *v1beta1.Cluster, clusterNa }, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}, ) + cachedClusterClient.serviceStore = corelisters.NewServiceLister(serviceIndexer) cc.clientMap[clusterName] = cachedClusterClient go cachedClusterClient.serviceController.Run(wait.NeverStop) go cachedClusterClient.endpointController.Run(wait.NeverStop) diff --git a/federation/pkg/federation-controller/service/endpoint_helper.go b/federation/pkg/federation-controller/service/endpoint_helper.go index 17bfe0b0c04..53295c4e523 100644 --- a/federation/pkg/federation-controller/service/endpoint_helper.go +++ b/federation/pkg/federation-controller/service/endpoint_helper.go @@ -17,9 +17,9 @@ limitations under the License. package service import ( - "fmt" "time" + "k8s.io/apimachinery/pkg/api/errors" cache "k8s.io/client-go/tools/cache" fedclientset "k8s.io/kubernetes/federation/client/clientset_generated/federation_clientset" v1 "k8s.io/kubernetes/pkg/api/v1" @@ -81,29 +81,25 @@ func (cc *clusterClientCache) syncEndpoint(key, clusterName string, clusterCache // here we filtered all non-federation services return nil } - endpointInterface, exists, err := clusterCache.endpointStore.GetByKey(key) + namespace, name, err := cache.SplitMetaNamespaceKey(key) if err != nil { glog.Errorf("Did not successfully get %v from store: %v, will retry later", key, err) clusterCache.endpointQueue.Add(key) return err } - if exists { - endpoint, ok := endpointInterface.(*v1.Endpoints) - if ok { - glog.V(4).Infof("Found endpoint for federation service %s/%s from cluster %s", endpoint.Namespace, endpoint.Name, clusterName) - err = cc.processEndpointUpdate(cachedService, endpoint, clusterName, serviceController) - } else { - _, ok := endpointInterface.(cache.DeletedFinalStateUnknown) - if !ok { - return fmt.Errorf("Object contained wasn't a service or a deleted key: %+v", endpointInterface) - } - glog.Infof("Found tombstone for %v", key) - err = cc.processEndpointDeletion(cachedService, clusterName, serviceController) - } - } else { + endpoint, err := clusterCache.endpointStore.Endpoints(namespace).Get(name) + switch { + case errors.IsNotFound(err): // service absence in store means watcher caught the deletion, ensure LB info is cleaned glog.Infof("Can not get endpoint %v for cluster %s from endpointStore", key, clusterName) err = cc.processEndpointDeletion(cachedService, clusterName, serviceController) + case err != nil: + glog.Errorf("Did not successfully get %v from store: %v, will retry later", key, err) + clusterCache.endpointQueue.Add(key) + return err + default: + glog.V(4).Infof("Found endpoint for federation service %s/%s from cluster %s", endpoint.Namespace, endpoint.Name, clusterName) + err = cc.processEndpointUpdate(cachedService, endpoint, clusterName, serviceController) } if err != nil { glog.Errorf("Failed to sync service: %+v, put back to service queue", err) diff --git a/federation/pkg/federation-controller/service/service_helper.go b/federation/pkg/federation-controller/service/service_helper.go index f775946ae7c..887769bec29 100644 --- a/federation/pkg/federation-controller/service/service_helper.go +++ b/federation/pkg/federation-controller/service/service_helper.go @@ -17,7 +17,6 @@ limitations under the License. package service import ( - "fmt" "time" "k8s.io/apimachinery/pkg/api/errors" @@ -85,31 +84,26 @@ func (cc *clusterClientCache) syncService(key, clusterName string, clusterCache // if serviceCache does not exists, that means the service is not created by federation, we should skip it return nil } - serviceInterface, exists, err := clusterCache.serviceStore.Indexer.GetByKey(key) + namespace, name, err := cache.SplitMetaNamespaceKey(key) if err != nil { glog.Errorf("Did not successfully get %v from store: %v, will retry later", key, err) clusterCache.serviceQueue.Add(key) return err } var needUpdate, isDeletion bool - if exists { - service, ok := serviceInterface.(*v1.Service) - if ok { - glog.V(4).Infof("Found service for federation service %s/%s from cluster %s", service.Namespace, service.Name, clusterName) - needUpdate = cc.processServiceUpdate(cachedService, service, clusterName) - } else { - _, ok := serviceInterface.(cache.DeletedFinalStateUnknown) - if !ok { - return fmt.Errorf("Object contained wasn't a service or a deleted key: %+v", serviceInterface) - } - glog.Infof("Found tombstone for %v", key) - needUpdate = cc.processServiceDeletion(cachedService, clusterName) - isDeletion = true - } - } else { + service, err := clusterCache.serviceStore.Services(namespace).Get(name) + switch { + case errors.IsNotFound(err): glog.Infof("Can not get service %v for cluster %s from serviceStore", key, clusterName) needUpdate = cc.processServiceDeletion(cachedService, clusterName) isDeletion = true + case err != nil: + glog.Errorf("Did not successfully get %v from store: %v, will retry later", key, err) + clusterCache.serviceQueue.Add(key) + return err + default: + glog.V(4).Infof("Found service for federation service %s/%s from cluster %s", service.Namespace, service.Name, clusterName) + needUpdate = cc.processServiceUpdate(cachedService, service, clusterName) } if needUpdate { diff --git a/federation/pkg/federation-controller/service/servicecontroller.go b/federation/pkg/federation-controller/service/servicecontroller.go index 5c07aa46c94..610bac20e9b 100644 --- a/federation/pkg/federation-controller/service/servicecontroller.go +++ b/federation/pkg/federation-controller/service/servicecontroller.go @@ -46,7 +46,7 @@ import ( "k8s.io/kubernetes/pkg/api" v1 "k8s.io/kubernetes/pkg/api/v1" kubeclientset "k8s.io/kubernetes/pkg/client/clientset_generated/clientset" - "k8s.io/kubernetes/pkg/client/legacylisters" + corelisters "k8s.io/kubernetes/pkg/client/listers/core/v1" "k8s.io/kubernetes/pkg/controller" ) @@ -121,7 +121,7 @@ type ServiceController struct { serviceCache *serviceCache clusterCache *clusterClientCache // A store of services, populated by the serviceController - serviceStore listers.StoreToServiceLister + serviceStore corelisters.ServiceLister // Watches changes to all services serviceController cache.Controller federatedInformer fedutil.FederatedInformer @@ -180,7 +180,8 @@ func New(federationClient fedclientset.Interface, dns dnsprovider.Interface, knownClusterSet: make(sets.String), } s.clusterDeliverer = util.NewDelayingDeliverer() - s.serviceStore.Indexer, s.serviceController = cache.NewIndexerInformer( + var serviceIndexer cache.Indexer + serviceIndexer, s.serviceController = cache.NewIndexerInformer( &cache.ListWatch{ ListFunc: func(options metav1.ListOptions) (pkgruntime.Object, error) { return s.federationClient.Core().Services(metav1.NamespaceAll).List(options) @@ -203,6 +204,7 @@ func New(federationClient fedclientset.Interface, dns dnsprovider.Interface, }, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}, ) + s.serviceStore = corelisters.NewServiceLister(serviceIndexer) s.clusterStore.Store, s.clusterController = cache.NewInformer( &cache.ListWatch{ ListFunc: func(options metav1.ListOptions) (pkgruntime.Object, error) { @@ -938,44 +940,40 @@ func (s *ServiceController) syncService(key string) error { defer func() { glog.V(4).Infof("Finished syncing service %q (%v)", key, time.Now().Sub(startTime)) }() - // obj holds the latest service info from apiserver - objFromStore, exists, err := s.serviceStore.Indexer.GetByKey(key) + + namespace, name, err := cache.SplitMetaNamespaceKey(key) if err != nil { glog.Errorf("Unable to retrieve service %v from store: %v", key, err) s.queue.Add(key) return err } - if !exists { + + service, err := s.serviceStore.Services(namespace).Get(name) + switch { + case errors.IsNotFound(err): // service absence in store means watcher caught the deletion, ensure LB info is cleaned glog.Infof("Service has been deleted %v", key) err, retryDelay = s.processServiceDeletion(key) - } - // Create a copy before modifying the obj to prevent race condition with - // other readers of obj from store. - obj, err := conversion.NewCloner().DeepCopy(objFromStore) - if err != nil { - glog.Errorf("Error in deep copying service %v retrieved from store: %v", key, err) + case err != nil: + glog.Errorf("Unable to retrieve service %v from store: %v", key, err) s.queue.Add(key) return err - } - - if exists { - service, ok := obj.(*v1.Service) - if ok { - cachedService = s.serviceCache.getOrCreate(key) - err, retryDelay = s.processServiceUpdate(cachedService, service, key) - } else { - tombstone, ok := obj.(cache.DeletedFinalStateUnknown) - if !ok { - return fmt.Errorf("Object contained wasn't a service or a deleted key: %+v", obj) - } - glog.Infof("Found tombstone for %v", key) - err, retryDelay = s.processServiceDeletion(tombstone.Key) + default: + // Create a copy before modifying the obj to prevent race condition with + // other readers of obj from store. + copy, err := conversion.NewCloner().DeepCopy(service) + if err != nil { + glog.Errorf("Error in deep copying service %v retrieved from store: %v", key, err) + s.queue.Add(key) + return err } + service := copy.(*v1.Service) + cachedService = s.serviceCache.getOrCreate(key) + err, retryDelay = s.processServiceUpdate(cachedService, service, key) } if retryDelay != 0 { - s.enqueueService(obj) + s.enqueueService(service) } else if err != nil { runtime.HandleError(fmt.Errorf("Failed to process service. Not retrying: %v", err)) }