Merge pull request #45256 from perotinus/rs_noindexer

Automatic merge from submit-queue (batch tested with PRs 45556, 45561, 45256)

[Federation] Replace the indexing lister with a regular store in the replicaset controller

This is part of the refactoring work to allow the replicaset controller to use the generic sync controller.

None of the other controllers use a lister, including the deployment controller

**Release note**:
```release-note
NONE
```
This commit is contained in:
Kubernetes Submit Queue 2017-05-10 22:24:43 -07:00 committed by GitHub
commit c2f6ccf0ef
2 changed files with 10 additions and 34 deletions

View File

@ -25,15 +25,12 @@ go_library(
"//pkg/api/v1:go_default_library",
"//pkg/apis/extensions/v1beta1:go_default_library",
"//pkg/client/clientset_generated/clientset:go_default_library",
"//pkg/client/listers/extensions/v1beta1:go_default_library",
"//pkg/controller:go_default_library",
"//vendor/github.com/golang/glog:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/api/errors:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/labels:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/runtime:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/runtime/schema:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/util/runtime:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/util/wait:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/watch:go_default_library",
"//vendor/k8s.io/client-go/pkg/api/v1:go_default_library",

View File

@ -27,10 +27,8 @@ import (
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/apimachinery/pkg/watch"
clientv1 "k8s.io/client-go/pkg/api/v1"
@ -50,7 +48,6 @@ import (
apiv1 "k8s.io/kubernetes/pkg/api/v1"
extensionsv1 "k8s.io/kubernetes/pkg/apis/extensions/v1beta1"
kubeclientset "k8s.io/kubernetes/pkg/client/clientset_generated/clientset"
extensionslisters "k8s.io/kubernetes/pkg/client/listers/extensions/v1beta1"
"k8s.io/kubernetes/pkg/controller"
)
@ -88,8 +85,8 @@ func parseFederationReplicaSetReference(frs *extensionsv1.ReplicaSet) (*fed.Fede
type ReplicaSetController struct {
fedClient fedclientset.Interface
replicaSetStore cache.Store
replicaSetController cache.Controller
replicaSetLister extensionslisters.ReplicaSetLister
fedReplicaSetInformer fedutil.FederatedInformer
fedPodInformer fedutil.FederatedInformer
@ -176,9 +173,7 @@ func NewReplicaSetController(federationClient fedclientset.Interface) *ReplicaSe
)
}
frsc.fedPodInformer = fedutil.NewFederatedInformer(federationClient, podFedInformerFactory, &fedutil.ClusterLifecycleHandlerFuncs{})
var replicaSetIndexer cache.Indexer
replicaSetIndexer, frsc.replicaSetController = cache.NewIndexerInformer(
frsc.replicaSetStore, frsc.replicaSetController = cache.NewInformer(
&cache.ListWatch{
ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
return frsc.fedClient.Extensions().ReplicaSets(metav1.NamespaceAll).List(options)
@ -190,11 +185,7 @@ func NewReplicaSetController(federationClient fedclientset.Interface) *ReplicaSe
&extensionsv1.ReplicaSet{},
controller.NoResyncPeriodFunc(),
fedutil.NewTriggerOnMetaAndSpecChanges(
func(obj runtime.Object) { frsc.deliverFedReplicaSetObj(obj, replicaSetReviewDelay) },
),
cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc},
)
frsc.replicaSetLister = extensionslisters.NewReplicaSetLister(replicaSetIndexer)
func(obj runtime.Object) { frsc.deliverFedReplicaSetObj(obj, replicaSetReviewDelay) }))
frsc.fedUpdater = fedutil.NewFederatedUpdater(frsc.fedReplicaSetInformer, "replicaset", updateTimeout, frsc.eventRecorder,
func(client kubeclientset.Interface, obj runtime.Object) error {
@ -312,13 +303,9 @@ func (frsc *ReplicaSetController) deliverLocalReplicaSet(obj interface{}, durati
glog.Errorf("Couldn't get key for object %v: %v", obj, err)
return
}
namespace, name, err := cache.SplitMetaNamespaceKey(key)
if err != nil {
glog.Errorf("Error splitting key for object %v: %v", obj, err)
}
_, err = frsc.replicaSetLister.ReplicaSets(namespace).Get(name)
_, exists, err := frsc.replicaSetStore.GetByKey(key)
switch {
case errors.IsNotFound(err):
case !exists:
// do nothing
case err != nil:
glog.Errorf("Couldn't get federation replicaset %v: %v", key, err)
@ -446,18 +433,14 @@ func (frsc *ReplicaSetController) reconcileReplicaSet(key string) (reconciliatio
startTime := time.Now()
defer glog.V(4).Infof("Finished reconcile replicaset %q (%v)", key, time.Now().Sub(startTime))
namespace, name, err := cache.SplitMetaNamespaceKey(key)
objFromStore, exists, err := frsc.replicaSetStore.GetByKey(key)
if err != nil {
return statusError, err
}
objFromStore, err := frsc.replicaSetLister.ReplicaSets(namespace).Get(name)
if errors.IsNotFound(err) {
// don't delete local replicasets for now. Do not reconcile it anymore.
if !exists {
return statusAllOk, nil
}
if err != nil {
return statusError, err
}
obj, err := api.Scheme.DeepCopy(objFromStore)
frs, ok := obj.(*extensionsv1.ReplicaSet)
if err != nil || !ok {
@ -592,12 +575,8 @@ func (frsc *ReplicaSetController) reconcileReplicaSetsOnClusterChange() {
if !frsc.isSynced() {
frsc.clusterDeliverer.DeliverAfter(allClustersKey, nil, clusterAvailableDelay)
}
rss, err := frsc.replicaSetLister.List(labels.Everything())
if err != nil {
utilruntime.HandleError(fmt.Errorf("error listing replica sets: %v", err))
return
}
for _, rs := range rss {
for _, rs := range frsc.replicaSetStore.List() {
key, _ := controller.KeyFunc(rs)
frsc.deliverReplicaSetByKey(key, 0, false)
}