[Federation] Replace the indexing store with a regular store in the replicaset controller.

This commit is contained in:
Jonathan MacMillan 2017-04-28 10:31:35 -07:00
parent 899f2f4d4c
commit 6f6955819f
2 changed files with 10 additions and 34 deletions

View File

@ -25,15 +25,12 @@ go_library(
"//pkg/api/v1:go_default_library", "//pkg/api/v1:go_default_library",
"//pkg/apis/extensions/v1beta1:go_default_library", "//pkg/apis/extensions/v1beta1:go_default_library",
"//pkg/client/clientset_generated/clientset:go_default_library", "//pkg/client/clientset_generated/clientset:go_default_library",
"//pkg/client/listers/extensions/v1beta1:go_default_library",
"//pkg/controller:go_default_library", "//pkg/controller:go_default_library",
"//vendor/github.com/golang/glog:go_default_library", "//vendor/github.com/golang/glog:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/api/errors:go_default_library", "//vendor/k8s.io/apimachinery/pkg/api/errors:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", "//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/labels:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/runtime:go_default_library", "//vendor/k8s.io/apimachinery/pkg/runtime:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/runtime/schema:go_default_library", "//vendor/k8s.io/apimachinery/pkg/runtime/schema:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/util/runtime:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/util/wait:go_default_library", "//vendor/k8s.io/apimachinery/pkg/util/wait:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/watch:go_default_library", "//vendor/k8s.io/apimachinery/pkg/watch:go_default_library",
"//vendor/k8s.io/client-go/pkg/api/v1:go_default_library", "//vendor/k8s.io/client-go/pkg/api/v1:go_default_library",

View File

@ -27,10 +27,8 @@ import (
"k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/runtime/schema"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/wait" "k8s.io/apimachinery/pkg/util/wait"
"k8s.io/apimachinery/pkg/watch" "k8s.io/apimachinery/pkg/watch"
clientv1 "k8s.io/client-go/pkg/api/v1" clientv1 "k8s.io/client-go/pkg/api/v1"
@ -50,7 +48,6 @@ import (
apiv1 "k8s.io/kubernetes/pkg/api/v1" apiv1 "k8s.io/kubernetes/pkg/api/v1"
extensionsv1 "k8s.io/kubernetes/pkg/apis/extensions/v1beta1" extensionsv1 "k8s.io/kubernetes/pkg/apis/extensions/v1beta1"
kubeclientset "k8s.io/kubernetes/pkg/client/clientset_generated/clientset" kubeclientset "k8s.io/kubernetes/pkg/client/clientset_generated/clientset"
extensionslisters "k8s.io/kubernetes/pkg/client/listers/extensions/v1beta1"
"k8s.io/kubernetes/pkg/controller" "k8s.io/kubernetes/pkg/controller"
) )
@ -88,8 +85,8 @@ func parseFederationReplicaSetReference(frs *extensionsv1.ReplicaSet) (*fed.Fede
type ReplicaSetController struct { type ReplicaSetController struct {
fedClient fedclientset.Interface fedClient fedclientset.Interface
replicaSetStore cache.Store
replicaSetController cache.Controller replicaSetController cache.Controller
replicaSetLister extensionslisters.ReplicaSetLister
fedReplicaSetInformer fedutil.FederatedInformer fedReplicaSetInformer fedutil.FederatedInformer
fedPodInformer fedutil.FederatedInformer fedPodInformer fedutil.FederatedInformer
@ -176,9 +173,7 @@ func NewReplicaSetController(federationClient fedclientset.Interface) *ReplicaSe
) )
} }
frsc.fedPodInformer = fedutil.NewFederatedInformer(federationClient, podFedInformerFactory, &fedutil.ClusterLifecycleHandlerFuncs{}) frsc.fedPodInformer = fedutil.NewFederatedInformer(federationClient, podFedInformerFactory, &fedutil.ClusterLifecycleHandlerFuncs{})
frsc.replicaSetStore, frsc.replicaSetController = cache.NewInformer(
var replicaSetIndexer cache.Indexer
replicaSetIndexer, frsc.replicaSetController = cache.NewIndexerInformer(
&cache.ListWatch{ &cache.ListWatch{
ListFunc: func(options metav1.ListOptions) (runtime.Object, error) { ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
return frsc.fedClient.Extensions().ReplicaSets(metav1.NamespaceAll).List(options) return frsc.fedClient.Extensions().ReplicaSets(metav1.NamespaceAll).List(options)
@ -190,11 +185,7 @@ func NewReplicaSetController(federationClient fedclientset.Interface) *ReplicaSe
&extensionsv1.ReplicaSet{}, &extensionsv1.ReplicaSet{},
controller.NoResyncPeriodFunc(), controller.NoResyncPeriodFunc(),
fedutil.NewTriggerOnMetaAndSpecChanges( fedutil.NewTriggerOnMetaAndSpecChanges(
func(obj runtime.Object) { frsc.deliverFedReplicaSetObj(obj, replicaSetReviewDelay) }, func(obj runtime.Object) { frsc.deliverFedReplicaSetObj(obj, replicaSetReviewDelay) }))
),
cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc},
)
frsc.replicaSetLister = extensionslisters.NewReplicaSetLister(replicaSetIndexer)
frsc.fedUpdater = fedutil.NewFederatedUpdater(frsc.fedReplicaSetInformer, "replicaset", frsc.eventRecorder, frsc.fedUpdater = fedutil.NewFederatedUpdater(frsc.fedReplicaSetInformer, "replicaset", frsc.eventRecorder,
func(client kubeclientset.Interface, obj runtime.Object) error { func(client kubeclientset.Interface, obj runtime.Object) error {
@ -313,13 +304,9 @@ func (frsc *ReplicaSetController) deliverLocalReplicaSet(obj interface{}, durati
glog.Errorf("Couldn't get key for object %v: %v", obj, err) glog.Errorf("Couldn't get key for object %v: %v", obj, err)
return return
} }
namespace, name, err := cache.SplitMetaNamespaceKey(key) _, exists, err := frsc.replicaSetStore.GetByKey(key)
if err != nil {
glog.Errorf("Error splitting key for object %v: %v", obj, err)
}
_, err = frsc.replicaSetLister.ReplicaSets(namespace).Get(name)
switch { switch {
case errors.IsNotFound(err): case !exists:
// do nothing // do nothing
case err != nil: case err != nil:
glog.Errorf("Couldn't get federation replicaset %v: %v", key, err) glog.Errorf("Couldn't get federation replicaset %v: %v", key, err)
@ -447,18 +434,14 @@ func (frsc *ReplicaSetController) reconcileReplicaSet(key string) (reconciliatio
startTime := time.Now() startTime := time.Now()
defer glog.V(4).Infof("Finished reconcile replicaset %q (%v)", key, time.Now().Sub(startTime)) defer glog.V(4).Infof("Finished reconcile replicaset %q (%v)", key, time.Now().Sub(startTime))
namespace, name, err := cache.SplitMetaNamespaceKey(key) objFromStore, exists, err := frsc.replicaSetStore.GetByKey(key)
if err != nil { if err != nil {
return statusError, err return statusError, err
} }
objFromStore, err := frsc.replicaSetLister.ReplicaSets(namespace).Get(name) if !exists {
if errors.IsNotFound(err) {
// don't delete local replicasets for now. Do not reconcile it anymore.
return statusAllOk, nil return statusAllOk, nil
} }
if err != nil {
return statusError, err
}
obj, err := api.Scheme.DeepCopy(objFromStore) obj, err := api.Scheme.DeepCopy(objFromStore)
frs, ok := obj.(*extensionsv1.ReplicaSet) frs, ok := obj.(*extensionsv1.ReplicaSet)
if err != nil || !ok { if err != nil || !ok {
@ -593,12 +576,8 @@ func (frsc *ReplicaSetController) reconcileReplicaSetsOnClusterChange() {
if !frsc.isSynced() { if !frsc.isSynced() {
frsc.clusterDeliverer.DeliverAfter(allClustersKey, nil, clusterAvailableDelay) frsc.clusterDeliverer.DeliverAfter(allClustersKey, nil, clusterAvailableDelay)
} }
rss, err := frsc.replicaSetLister.List(labels.Everything())
if err != nil { for _, rs := range frsc.replicaSetStore.List() {
utilruntime.HandleError(fmt.Errorf("error listing replica sets: %v", err))
return
}
for _, rs := range rss {
key, _ := controller.KeyFunc(rs) key, _ := controller.KeyFunc(rs)
frsc.deliverReplicaSetByKey(key, 0, false) frsc.deliverReplicaSetByKey(key, 0, false)
} }