From 82e73efe8339da3a80bd1cfbbb77572f6e4e78fd Mon Sep 17 00:00:00 2001 From: Maru Newby Date: Sun, 5 Feb 2017 20:28:20 -0800 Subject: [PATCH 1/2] fed: Abstract secret controller interaction with the secret type This change uses an adapter class to abstracts the interaction of the secret controller with the secret api type. This is the first step to creating a generic controller that can target any type for which an adapter exists. --- .../pkg/federation-controller/secret/BUILD | 1 + .../secret/secret_controller.go | 146 +++++++++--------- federation/pkg/typeadapters/BUILD | 1 + federation/pkg/typeadapters/adapter.go | 21 ++- federation/pkg/typeadapters/secret.go | 65 +++++++- 5 files changed, 152 insertions(+), 82 deletions(-) diff --git a/federation/pkg/federation-controller/secret/BUILD b/federation/pkg/federation-controller/secret/BUILD index 3ad927bed97..a2ce2020f27 100644 --- a/federation/pkg/federation-controller/secret/BUILD +++ b/federation/pkg/federation-controller/secret/BUILD @@ -18,6 +18,7 @@ go_library( "//federation/pkg/federation-controller/util:go_default_library", "//federation/pkg/federation-controller/util/deletionhelper:go_default_library", "//federation/pkg/federation-controller/util/eventsink:go_default_library", + "//federation/pkg/typeadapters:go_default_library", "//pkg/api:go_default_library", "//pkg/api/v1:go_default_library", "//pkg/client/clientset_generated/clientset:go_default_library", diff --git a/federation/pkg/federation-controller/secret/secret_controller.go b/federation/pkg/federation-controller/secret/secret_controller.go index c054e7c6157..2119995a001 100644 --- a/federation/pkg/federation-controller/secret/secret_controller.go +++ b/federation/pkg/federation-controller/secret/secret_controller.go @@ -18,6 +18,7 @@ package secret import ( "fmt" + "strings" "time" "k8s.io/apimachinery/pkg/api/errors" @@ -36,6 +37,7 @@ import ( "k8s.io/kubernetes/federation/pkg/federation-controller/util" "k8s.io/kubernetes/federation/pkg/federation-controller/util/deletionhelper" "k8s.io/kubernetes/federation/pkg/federation-controller/util/eventsink" + "k8s.io/kubernetes/federation/pkg/typeadapters" "k8s.io/kubernetes/pkg/api" apiv1 "k8s.io/kubernetes/pkg/api/v1" kubeclientset "k8s.io/kubernetes/pkg/client/clientset_generated/clientset" @@ -72,9 +74,6 @@ type SecretController struct { // Informer controller for secrets that should be federated. secretInformerController cache.Controller - // Client to federated api server. - federatedApiClient federationclientset.Interface - // Backoff manager for secrets secretBackoff *flowcontrol.Backoff @@ -87,6 +86,8 @@ type SecretController struct { clusterAvailableDelay time.Duration smallDelay time.Duration updateTimeout time.Duration + + adapter typeadapters.FederatedTypeAdapter } // StartSecretController starts a new secret controller @@ -103,18 +104,20 @@ func StartSecretController(config *restclient.Config, stopChan <-chan struct{}, // newSecretController returns a new secret controller func newSecretController(client federationclientset.Interface) *SecretController { + adapter := typeadapters.NewSecretAdapter(client) + broadcaster := record.NewBroadcaster() broadcaster.StartRecordingToSink(eventsink.NewFederatedEventSink(client)) - recorder := broadcaster.NewRecorder(api.Scheme, clientv1.EventSource{Component: "federated-secrets-controller"}) + recorder := broadcaster.NewRecorder(api.Scheme, clientv1.EventSource{Component: fmt.Sprintf("federated-%v-controller", adapter.Kind())}) secretcontroller := &SecretController{ - federatedApiClient: client, secretReviewDelay: time.Second * 10, clusterAvailableDelay: time.Second * 20, smallDelay: time.Second * 3, updateTimeout: time.Second * 30, secretBackoff: flowcontrol.NewBackOff(5*time.Second, time.Minute), eventRecorder: recorder, + adapter: adapter, } // Build delivereres for triggering reconciliations. @@ -125,13 +128,13 @@ func newSecretController(client federationclientset.Interface) *SecretController secretcontroller.secretInformerStore, secretcontroller.secretInformerController = cache.NewInformer( &cache.ListWatch{ ListFunc: func(options metav1.ListOptions) (pkgruntime.Object, error) { - return client.Core().Secrets(metav1.NamespaceAll).List(options) + return adapter.FedList(metav1.NamespaceAll, options) }, WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) { - return client.Core().Secrets(metav1.NamespaceAll).Watch(options) + return adapter.FedWatch(metav1.NamespaceAll, options) }, }, - &apiv1.Secret{}, + adapter.ObjectType(), controller.NoResyncPeriodFunc(), util.NewTriggerOnAllChanges(func(obj pkgruntime.Object) { secretcontroller.deliverSecretObj(obj, 0, false) })) @@ -142,13 +145,13 @@ func newSecretController(client federationclientset.Interface) *SecretController return cache.NewInformer( &cache.ListWatch{ ListFunc: func(options metav1.ListOptions) (pkgruntime.Object, error) { - return targetClient.Core().Secrets(metav1.NamespaceAll).List(options) + return adapter.ClusterList(targetClient, metav1.NamespaceAll, options) }, WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) { - return targetClient.Core().Secrets(metav1.NamespaceAll).Watch(options) + return adapter.ClusterWatch(targetClient, metav1.NamespaceAll, options) }, }, - &apiv1.Secret{}, + adapter.ObjectType(), controller.NoResyncPeriodFunc(), // Trigger reconciliation whenever something in federated cluster is changed. In most cases it // would be just confirmation that some secret operation succeeded. @@ -170,19 +173,17 @@ func newSecretController(client federationclientset.Interface) *SecretController // Federated updeater along with Create/Update/Delete operations. secretcontroller.federatedUpdater = util.NewFederatedUpdater(secretcontroller.secretFederatedInformer, func(client kubeclientset.Interface, obj pkgruntime.Object) error { - secret := obj.(*apiv1.Secret) - _, err := client.Core().Secrets(secret.Namespace).Create(secret) + _, err := adapter.ClusterCreate(client, obj) return err }, func(client kubeclientset.Interface, obj pkgruntime.Object) error { - secret := obj.(*apiv1.Secret) - _, err := client.Core().Secrets(secret.Namespace).Update(secret) + _, err := adapter.ClusterUpdate(client, obj) return err }, func(client kubeclientset.Interface, obj pkgruntime.Object) error { - secret := obj.(*apiv1.Secret) + namespacedName := adapter.NamespacedName(obj) orphanDependents := false - err := client.Core().Secrets(secret.Namespace).Delete(secret.Name, &metav1.DeleteOptions{OrphanDependents: &orphanDependents}) + err := adapter.ClusterDelete(client, namespacedName, &metav1.DeleteOptions{OrphanDependents: &orphanDependents}) return err }) @@ -192,8 +193,7 @@ func newSecretController(client federationclientset.Interface) *SecretController secretcontroller.addFinalizerFunc, // objNameFunc func(obj pkgruntime.Object) string { - secret := obj.(*apiv1.Secret) - return secret.Name + return adapter.ObjectMeta(obj).Name }, secretcontroller.updateTimeout, secretcontroller.eventRecorder, @@ -214,9 +214,9 @@ func (secretcontroller *SecretController) minimizeLatency() { // Returns true if the given object has the given finalizer in its ObjectMeta. func (secretcontroller *SecretController) hasFinalizerFunc(obj pkgruntime.Object, finalizer string) bool { - secret := obj.(*apiv1.Secret) - for i := range secret.ObjectMeta.Finalizers { - if string(secret.ObjectMeta.Finalizers[i]) == finalizer { + meta := secretcontroller.adapter.ObjectMeta(obj) + for i := range meta.Finalizers { + if string(meta.Finalizers[i]) == finalizer { return true } } @@ -226,12 +226,13 @@ func (secretcontroller *SecretController) hasFinalizerFunc(obj pkgruntime.Object // Removes the finalizers from the given objects ObjectMeta. // Assumes that the given object is a secret. func (secretcontroller *SecretController) removeFinalizerFunc(obj pkgruntime.Object, finalizers []string) (pkgruntime.Object, error) { - secret := obj.(*apiv1.Secret) + adapter := secretcontroller.adapter + meta := adapter.ObjectMeta(obj) newFinalizers := []string{} hasFinalizer := false - for i := range secret.ObjectMeta.Finalizers { - if !deletionhelper.ContainsString(finalizers, secret.ObjectMeta.Finalizers[i]) { - newFinalizers = append(newFinalizers, secret.ObjectMeta.Finalizers[i]) + for i := range meta.Finalizers { + if !deletionhelper.ContainsString(finalizers, meta.Finalizers[i]) { + newFinalizers = append(newFinalizers, meta.Finalizers[i]) } else { hasFinalizer = true } @@ -240,10 +241,10 @@ func (secretcontroller *SecretController) removeFinalizerFunc(obj pkgruntime.Obj // Nothing to do. return obj, nil } - secret.ObjectMeta.Finalizers = newFinalizers - secret, err := secretcontroller.federatedApiClient.Core().Secrets(secret.Namespace).Update(secret) + meta.Finalizers = newFinalizers + secret, err := secretcontroller.adapter.FedUpdate(obj) if err != nil { - return nil, fmt.Errorf("failed to remove finalizers %v from secret %s: %v", finalizers, secret.Name, err) + return nil, fmt.Errorf("failed to remove finalizers %v from %s %s: %v", finalizers, adapter.Kind(), meta.Name, err) } return secret, nil } @@ -251,11 +252,12 @@ func (secretcontroller *SecretController) removeFinalizerFunc(obj pkgruntime.Obj // Adds the given finalizers to the given objects ObjectMeta. // Assumes that the given object is a secret. func (secretcontroller *SecretController) addFinalizerFunc(obj pkgruntime.Object, finalizers []string) (pkgruntime.Object, error) { - secret := obj.(*apiv1.Secret) - secret.ObjectMeta.Finalizers = append(secret.ObjectMeta.Finalizers, finalizers...) - secret, err := secretcontroller.federatedApiClient.Core().Secrets(secret.Namespace).Update(secret) + adapter := secretcontroller.adapter + meta := adapter.ObjectMeta(obj) + meta.Finalizers = append(meta.Finalizers, finalizers...) + secret, err := secretcontroller.adapter.FedUpdate(obj) if err != nil { - return nil, fmt.Errorf("failed to add finalizers %v to secret %s: %v", finalizers, secret.Name, err) + return nil, fmt.Errorf("failed to add finalizers %v to %s %s: %v", finalizers, adapter.Kind(), meta.Name, err) } return secret, nil } @@ -277,9 +279,9 @@ func (secretcontroller *SecretController) Run(stopChan <-chan struct{}) { util.StartBackoffGC(secretcontroller.secretBackoff, stopChan) } -func (secretcontroller *SecretController) deliverSecretObj(obj interface{}, delay time.Duration, failed bool) { - secret := obj.(*apiv1.Secret) - secretcontroller.deliverSecret(types.NamespacedName{Namespace: secret.Namespace, Name: secret.Name}, delay, failed) +func (secretcontroller *SecretController) deliverSecretObj(obj pkgruntime.Object, delay time.Duration, failed bool) { + namespacedName := secretcontroller.adapter.NamespacedName(obj) + secretcontroller.deliverSecret(namespacedName, delay, failed) } // Adds backoff to delay if this delivery is related to some failure. Resets backoff if there was no failure. @@ -318,12 +320,15 @@ func (secretcontroller *SecretController) reconcileSecretsOnClusterChange() { secretcontroller.clusterDeliverer.DeliverAt(allClustersKey, nil, time.Now().Add(secretcontroller.clusterAvailableDelay)) } for _, obj := range secretcontroller.secretInformerStore.List() { - secret := obj.(*apiv1.Secret) - secretcontroller.deliverSecret(types.NamespacedName{Namespace: secret.Namespace, Name: secret.Name}, secretcontroller.smallDelay, false) + namespacedName := secretcontroller.adapter.NamespacedName(obj.(pkgruntime.Object)) + secretcontroller.deliverSecret(namespacedName, secretcontroller.smallDelay, false) } } func (secretcontroller *SecretController) reconcileSecret(secret types.NamespacedName) { + adapter := secretcontroller.adapter + kind := adapter.Kind() + if !secretcontroller.isSynced() { secretcontroller.deliverSecret(secret, secretcontroller.clusterAvailableDelay, false) return @@ -332,7 +337,7 @@ func (secretcontroller *SecretController) reconcileSecret(secret types.Namespace key := secret.String() baseSecretObjFromStore, exist, err := secretcontroller.secretInformerStore.GetByKey(key) if err != nil { - glog.Errorf("Failed to query main secret store for %v: %v", key, err) + glog.Errorf("Failed to query main %s store for %v: %v", kind, key, err) secretcontroller.deliverSecret(secret, 0, true) return } @@ -345,35 +350,42 @@ func (secretcontroller *SecretController) reconcileSecret(secret types.Namespace // Create a copy before modifying the obj to prevent race condition with // other readers of obj from store. baseSecretObj, err := api.Scheme.DeepCopy(baseSecretObjFromStore) - baseSecret, ok := baseSecretObj.(*apiv1.Secret) - if err != nil || !ok { - glog.Errorf("Error in retrieving obj from store: %v, %v", ok, err) + if err != nil { + glog.Errorf("Error in retrieving obj from store: %v", err) secretcontroller.deliverSecret(secret, 0, true) return } - if baseSecret.DeletionTimestamp != nil { - if err := secretcontroller.delete(baseSecret); err != nil { - glog.Errorf("Failed to delete %s: %v", secret, err) + if !adapter.IsExpectedType(baseSecretObj) { + glog.Errorf("Object is not the expected type: %v", baseSecretObj) + secretcontroller.deliverSecret(secret, 0, true) + return + } + + baseSecret := baseSecretObj.(pkgruntime.Object) + meta := adapter.ObjectMeta(baseSecret) + + if meta.DeletionTimestamp != nil { + if err := secretcontroller.delete(baseSecret, secret); err != nil { + glog.Errorf("Failed to delete %s %s: %v", kind, secret, err) secretcontroller.eventRecorder.Eventf(baseSecret, api.EventTypeNormal, "DeleteFailed", - "Secret delete failed: %v", err) + "%s delete failed: %v", strings.ToTitle(kind), err) secretcontroller.deliverSecret(secret, 0, true) } return } - glog.V(3).Infof("Ensuring delete object from underlying clusters finalizer for secret: %s", - baseSecret.Name) + glog.V(3).Infof("Ensuring delete object from underlying clusters finalizer for %s: %s", + kind, secret) // Add the required finalizers before creating a secret in underlying clusters. - updatedSecretObj, err := secretcontroller.deletionHelper.EnsureFinalizers(baseSecret) + baseSecret, err = secretcontroller.deletionHelper.EnsureFinalizers(baseSecret) if err != nil { - glog.Errorf("Failed to ensure delete object from underlying clusters finalizer in secret %s: %v", - baseSecret.Name, err) + glog.Errorf("Failed to ensure delete object from underlying clusters finalizer in %s %s: %v", + kind, secret, err) secretcontroller.deliverSecret(secret, 0, false) return } - baseSecret = updatedSecretObj.(*apiv1.Secret) - glog.V(3).Infof("Syncing secret %s in underlying clusters", baseSecret.Name) + glog.V(3).Infof("Syncing %s %s in underlying clusters", kind, secret) clusters, err := secretcontroller.secretFederatedInformer.GetReadyClusters() if err != nil { @@ -392,15 +404,11 @@ func (secretcontroller *SecretController) reconcileSecret(secret types.Namespace } // The data should not be modified. - desiredSecret := &apiv1.Secret{ - ObjectMeta: util.DeepCopyRelevantObjectMeta(baseSecret.ObjectMeta), - Data: baseSecret.Data, - Type: baseSecret.Type, - } + desiredSecret := adapter.Copy(baseSecret) if !found { secretcontroller.eventRecorder.Eventf(baseSecret, api.EventTypeNormal, "CreateInCluster", - "Creating secret in cluster %s", cluster.Name) + "Creating %s in cluster %s", kind, cluster.Name) operations = append(operations, util.FederatedOperation{ Type: util.OperationTypeAdd, @@ -408,13 +416,12 @@ func (secretcontroller *SecretController) reconcileSecret(secret types.Namespace ClusterName: cluster.Name, }) } else { - clusterSecret := clusterSecretObj.(*apiv1.Secret) + clusterSecret := clusterSecretObj.(pkgruntime.Object) // Update existing secret, if needed. - if !util.SecretEquivalent(*desiredSecret, *clusterSecret) { - + if !adapter.Equivalent(desiredSecret, clusterSecret) { secretcontroller.eventRecorder.Eventf(baseSecret, api.EventTypeNormal, "UpdateInCluster", - "Updating secret in cluster %s", cluster.Name) + "Updating %s in cluster %s", kind, cluster.Name) operations = append(operations, util.FederatedOperation{ Type: util.OperationTypeUpdate, Obj: desiredSecret, @@ -431,7 +438,7 @@ func (secretcontroller *SecretController) reconcileSecret(secret types.Namespace err = secretcontroller.federatedUpdater.UpdateWithOnError(operations, secretcontroller.updateTimeout, func(op util.FederatedOperation, operror error) { secretcontroller.eventRecorder.Eventf(baseSecret, api.EventTypeNormal, "UpdateInClusterFailed", - "Secret update in cluster %s failed: %v", op.ClusterName, operror) + "%s update in cluster %s failed: %v", strings.ToTitle(kind), op.ClusterName, operror) }) if err != nil { @@ -445,20 +452,21 @@ func (secretcontroller *SecretController) reconcileSecret(secret types.Namespace } // delete deletes the given secret or returns error if the deletion was not complete. -func (secretcontroller *SecretController) delete(secret *apiv1.Secret) error { - glog.V(3).Infof("Handling deletion of secret: %v", *secret) - _, err := secretcontroller.deletionHelper.HandleObjectInUnderlyingClusters(secret) +func (secretcontroller *SecretController) delete(obj pkgruntime.Object, namespacedName types.NamespacedName) error { + kind := secretcontroller.adapter.Kind() + glog.V(3).Infof("Handling deletion of %s: %v", kind, namespacedName) + _, err := secretcontroller.deletionHelper.HandleObjectInUnderlyingClusters(obj) if err != nil { return err } - err = secretcontroller.federatedApiClient.Core().Secrets(secret.Namespace).Delete(secret.Name, nil) + err = secretcontroller.adapter.FedDelete(namespacedName, nil) if err != nil { // Its all good if the error is not found error. That means it is deleted already and we do not have to do anything. // This is expected when we are processing an update as a result of secret finalizer deletion. // The process that deleted the last finalizer is also going to delete the secret and we do not have to do anything. if !errors.IsNotFound(err) { - return fmt.Errorf("failed to delete secret: %v", err) + return fmt.Errorf("failed to delete %s: %v", kind, err) } } return nil diff --git a/federation/pkg/typeadapters/BUILD b/federation/pkg/typeadapters/BUILD index e2acb8be78b..05f4234a95d 100644 --- a/federation/pkg/typeadapters/BUILD +++ b/federation/pkg/typeadapters/BUILD @@ -22,6 +22,7 @@ go_library( "//vendor:k8s.io/apimachinery/pkg/apis/meta/v1", "//vendor:k8s.io/apimachinery/pkg/runtime", "//vendor:k8s.io/apimachinery/pkg/types", + "//vendor:k8s.io/apimachinery/pkg/watch", ], ) diff --git a/federation/pkg/typeadapters/adapter.go b/federation/pkg/typeadapters/adapter.go index 62bc2d86825..b5799666a6c 100644 --- a/federation/pkg/typeadapters/adapter.go +++ b/federation/pkg/typeadapters/adapter.go @@ -20,8 +20,9 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" pkgruntime "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/types" + "k8s.io/apimachinery/pkg/watch" federationclientset "k8s.io/kubernetes/federation/client/clientset_generated/federation_clientset" - "k8s.io/kubernetes/pkg/client/clientset_generated/clientset" + kubeclientset "k8s.io/kubernetes/pkg/client/clientset_generated/clientset" ) // FederatedTypeAdapter defines operations for interacting with a @@ -31,18 +32,28 @@ type FederatedTypeAdapter interface { SetClient(client federationclientset.Interface) Kind() string + ObjectType() pkgruntime.Object + IsExpectedType(obj interface{}) bool + Copy(obj pkgruntime.Object) pkgruntime.Object Equivalent(obj1, obj2 pkgruntime.Object) bool - ObjectMeta(obj pkgruntime.Object) *metav1.ObjectMeta NamespacedName(obj pkgruntime.Object) types.NamespacedName + ObjectMeta(obj pkgruntime.Object) *metav1.ObjectMeta // Fed* operations target the federation control plane FedCreate(obj pkgruntime.Object) (pkgruntime.Object, error) - FedGet(namespacedName types.NamespacedName) (pkgruntime.Object, error) - FedUpdate(obj pkgruntime.Object) (pkgruntime.Object, error) FedDelete(namespacedName types.NamespacedName, options *metav1.DeleteOptions) error + FedGet(namespacedName types.NamespacedName) (pkgruntime.Object, error) + FedList(namespace string, options metav1.ListOptions) (pkgruntime.Object, error) + FedUpdate(obj pkgruntime.Object) (pkgruntime.Object, error) + FedWatch(namespace string, options metav1.ListOptions) (watch.Interface, error) // The following operations are intended to target a cluster that is a member of a federation - ClusterGet(client clientset.Interface, namespacedName types.NamespacedName) (pkgruntime.Object, error) + ClusterCreate(client kubeclientset.Interface, obj pkgruntime.Object) (pkgruntime.Object, error) + ClusterDelete(client kubeclientset.Interface, nsName types.NamespacedName, options *metav1.DeleteOptions) error + ClusterGet(client kubeclientset.Interface, namespacedName types.NamespacedName) (pkgruntime.Object, error) + ClusterList(client kubeclientset.Interface, namespace string, options metav1.ListOptions) (pkgruntime.Object, error) + ClusterUpdate(client kubeclientset.Interface, obj pkgruntime.Object) (pkgruntime.Object, error) + ClusterWatch(client kubeclientset.Interface, namespace string, options metav1.ListOptions) (watch.Interface, error) NewTestObject(namespace string) pkgruntime.Object } diff --git a/federation/pkg/typeadapters/secret.go b/federation/pkg/typeadapters/secret.go index c7e2018f0ff..aae439c611e 100644 --- a/federation/pkg/typeadapters/secret.go +++ b/federation/pkg/typeadapters/secret.go @@ -20,10 +20,11 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" pkgruntime "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/types" + "k8s.io/apimachinery/pkg/watch" federationclientset "k8s.io/kubernetes/federation/client/clientset_generated/federation_clientset" "k8s.io/kubernetes/federation/pkg/federation-controller/util" apiv1 "k8s.io/kubernetes/pkg/api/v1" - "k8s.io/kubernetes/pkg/client/clientset_generated/clientset" + kubeclientset "k8s.io/kubernetes/pkg/client/clientset_generated/clientset" ) type SecretAdapter struct { @@ -42,43 +43,91 @@ func (a *SecretAdapter) Kind() string { return "secret" } +func (a *SecretAdapter) ObjectType() pkgruntime.Object { + return &apiv1.Secret{} +} + +func (a *SecretAdapter) IsExpectedType(obj interface{}) bool { + _, ok := obj.(*apiv1.Secret) + return ok +} + +func (a *SecretAdapter) Copy(obj pkgruntime.Object) pkgruntime.Object { + secret := obj.(*apiv1.Secret) + return &apiv1.Secret{ + ObjectMeta: util.DeepCopyRelevantObjectMeta(secret.ObjectMeta), + Data: secret.Data, + Type: secret.Type, + } +} + func (a *SecretAdapter) Equivalent(obj1, obj2 pkgruntime.Object) bool { secret1 := obj1.(*apiv1.Secret) secret2 := obj2.(*apiv1.Secret) return util.SecretEquivalent(*secret1, *secret2) } -func (a *SecretAdapter) ObjectMeta(obj pkgruntime.Object) *metav1.ObjectMeta { - return &obj.(*apiv1.Secret).ObjectMeta -} - func (a *SecretAdapter) NamespacedName(obj pkgruntime.Object) types.NamespacedName { secret := obj.(*apiv1.Secret) return types.NamespacedName{Namespace: secret.Namespace, Name: secret.Name} } +func (a *SecretAdapter) ObjectMeta(obj pkgruntime.Object) *metav1.ObjectMeta { + return &obj.(*apiv1.Secret).ObjectMeta +} + func (a *SecretAdapter) FedCreate(obj pkgruntime.Object) (pkgruntime.Object, error) { secret := obj.(*apiv1.Secret) return a.client.CoreV1().Secrets(secret.Namespace).Create(secret) } +func (a *SecretAdapter) FedDelete(namespacedName types.NamespacedName, options *metav1.DeleteOptions) error { + return a.client.CoreV1().Secrets(namespacedName.Namespace).Delete(namespacedName.Name, options) +} + func (a *SecretAdapter) FedGet(namespacedName types.NamespacedName) (pkgruntime.Object, error) { return a.client.CoreV1().Secrets(namespacedName.Namespace).Get(namespacedName.Name, metav1.GetOptions{}) } +func (a *SecretAdapter) FedList(namespace string, options metav1.ListOptions) (pkgruntime.Object, error) { + return a.client.CoreV1().Secrets(namespace).List(options) +} + func (a *SecretAdapter) FedUpdate(obj pkgruntime.Object) (pkgruntime.Object, error) { secret := obj.(*apiv1.Secret) return a.client.CoreV1().Secrets(secret.Namespace).Update(secret) } -func (a *SecretAdapter) FedDelete(namespacedName types.NamespacedName, options *metav1.DeleteOptions) error { - return a.client.CoreV1().Secrets(namespacedName.Namespace).Delete(namespacedName.Name, options) +func (a *SecretAdapter) FedWatch(namespace string, options metav1.ListOptions) (watch.Interface, error) { + return a.client.CoreV1().Secrets(namespace).Watch(options) } -func (a *SecretAdapter) ClusterGet(client clientset.Interface, namespacedName types.NamespacedName) (pkgruntime.Object, error) { +func (a *SecretAdapter) ClusterCreate(client kubeclientset.Interface, obj pkgruntime.Object) (pkgruntime.Object, error) { + secret := obj.(*apiv1.Secret) + return client.CoreV1().Secrets(secret.Namespace).Create(secret) +} + +func (a *SecretAdapter) ClusterDelete(client kubeclientset.Interface, nsName types.NamespacedName, options *metav1.DeleteOptions) error { + return client.CoreV1().Secrets(nsName.Namespace).Delete(nsName.Name, options) +} + +func (a *SecretAdapter) ClusterGet(client kubeclientset.Interface, namespacedName types.NamespacedName) (pkgruntime.Object, error) { return client.CoreV1().Secrets(namespacedName.Namespace).Get(namespacedName.Name, metav1.GetOptions{}) } +func (a *SecretAdapter) ClusterList(client kubeclientset.Interface, namespace string, options metav1.ListOptions) (pkgruntime.Object, error) { + return client.CoreV1().Secrets(namespace).List(options) +} + +func (a *SecretAdapter) ClusterUpdate(client kubeclientset.Interface, obj pkgruntime.Object) (pkgruntime.Object, error) { + secret := obj.(*apiv1.Secret) + return client.CoreV1().Secrets(secret.Namespace).Update(secret) +} + +func (a *SecretAdapter) ClusterWatch(client kubeclientset.Interface, namespace string, options metav1.ListOptions) (watch.Interface, error) { + return client.CoreV1().Secrets(namespace).Watch(options) +} + func (a *SecretAdapter) NewTestObject(namespace string) pkgruntime.Object { return &apiv1.Secret{ ObjectMeta: metav1.ObjectMeta{ From d08e9d96ef5f6359453c1ce3d5459a81e73a968e Mon Sep 17 00:00:00 2001 From: Maru Newby Date: Wed, 22 Mar 2017 11:52:50 -0700 Subject: [PATCH 2/2] fed: Create generic synchronizer from refactored secret controller --- .../secret/secret_controller.go | 315 +++++++++--------- .../secret/secret_controller_test.go | 6 +- 2 files changed, 163 insertions(+), 158 deletions(-) diff --git a/federation/pkg/federation-controller/secret/secret_controller.go b/federation/pkg/federation-controller/secret/secret_controller.go index 2119995a001..128bcb3649f 100644 --- a/federation/pkg/federation-controller/secret/secret_controller.go +++ b/federation/pkg/federation-controller/secret/secret_controller.go @@ -55,34 +55,37 @@ var ( RequiredResources = []schema.GroupVersionResource{apiv1.SchemeGroupVersion.WithResource("secrets")} ) -type SecretController struct { - // For triggering single secret reconciliation. This is used when there is an - // add/update/delete operation on a secret in either federated API server or - // in some member of the federation. - secretDeliverer *util.DelayingDeliverer +// FederationSyncController synchronizes the state of a federated type +// to clusters that are members of the federation. +type FederationSyncController struct { + // For triggering reconciliation of a single resource. This is + // used when there is an add/update/delete operation on a resource + // in either federated API server or in some member of the + // federation. + deliverer *util.DelayingDeliverer - // For triggering all secrets reconciliation. This is used when - // a new cluster becomes available. + // For triggering reconciliation of all target resources. This is + // used when a new cluster becomes available. clusterDeliverer *util.DelayingDeliverer - // Contains secrets present in members of federation. - secretFederatedInformer util.FederatedInformer + // Contains resources present in members of federation. + informer util.FederatedInformer // For updating members of federation. - federatedUpdater util.FederatedUpdater - // Definitions of secrets that should be federated. - secretInformerStore cache.Store - // Informer controller for secrets that should be federated. - secretInformerController cache.Controller + updater util.FederatedUpdater + // Definitions of resources that should be federated. + store cache.Store + // Informer controller for resources that should be federated. + controller cache.Controller - // Backoff manager for secrets - secretBackoff *flowcontrol.Backoff + // Backoff manager + backoff *flowcontrol.Backoff // For events eventRecorder record.EventRecorder deletionHelper *deletionhelper.DeletionHelper - secretReviewDelay time.Duration + reviewDelay time.Duration clusterAvailableDelay time.Duration smallDelay time.Duration updateTimeout time.Duration @@ -92,40 +95,49 @@ type SecretController struct { // StartSecretController starts a new secret controller func StartSecretController(config *restclient.Config, stopChan <-chan struct{}, minimizeLatency bool) { - restclient.AddUserAgent(config, "secret-controller") - client := federationclientset.NewForConfigOrDie(config) - controller := newSecretController(client) - if minimizeLatency { - controller.minimizeLatency() - } - glog.Infof("Starting Secret controller") - controller.Run(stopChan) + startFederationSyncController(&typeadapters.SecretAdapter{}, config, stopChan, minimizeLatency) } // newSecretController returns a new secret controller -func newSecretController(client federationclientset.Interface) *SecretController { - adapter := typeadapters.NewSecretAdapter(client) +func newSecretController(client federationclientset.Interface) *FederationSyncController { + return newFederationSyncController(client, typeadapters.NewSecretAdapter(client)) +} +// startFederationSyncController starts a new sync controller for the given type adapter +func startFederationSyncController(adapter typeadapters.FederatedTypeAdapter, config *restclient.Config, stopChan <-chan struct{}, minimizeLatency bool) { + restclient.AddUserAgent(config, fmt.Sprintf("%s-controller", adapter.Kind())) + client := federationclientset.NewForConfigOrDie(config) + adapter.SetClient(client) + controller := newFederationSyncController(client, adapter) + if minimizeLatency { + controller.minimizeLatency() + } + glog.Infof(fmt.Sprintf("Starting federated sync controller for %s resources", adapter.Kind())) + controller.Run(stopChan) +} + +// newFederationSyncController returns a new sync controller for the given client and type adapter +func newFederationSyncController(client federationclientset.Interface, adapter typeadapters.FederatedTypeAdapter) *FederationSyncController { broadcaster := record.NewBroadcaster() broadcaster.StartRecordingToSink(eventsink.NewFederatedEventSink(client)) recorder := broadcaster.NewRecorder(api.Scheme, clientv1.EventSource{Component: fmt.Sprintf("federated-%v-controller", adapter.Kind())}) - secretcontroller := &SecretController{ - secretReviewDelay: time.Second * 10, + s := &FederationSyncController{ + reviewDelay: time.Second * 10, clusterAvailableDelay: time.Second * 20, smallDelay: time.Second * 3, updateTimeout: time.Second * 30, - secretBackoff: flowcontrol.NewBackOff(5*time.Second, time.Minute), + backoff: flowcontrol.NewBackOff(5*time.Second, time.Minute), eventRecorder: recorder, adapter: adapter, } // Build delivereres for triggering reconciliations. - secretcontroller.secretDeliverer = util.NewDelayingDeliverer() - secretcontroller.clusterDeliverer = util.NewDelayingDeliverer() + s.deliverer = util.NewDelayingDeliverer() + s.clusterDeliverer = util.NewDelayingDeliverer() - // Start informer in federated API servers on secrets that should be federated. - secretcontroller.secretInformerStore, secretcontroller.secretInformerController = cache.NewInformer( + // Start informer in federated API servers on the resource type that should be federated. + s.store, s.controller = cache.NewInformer( &cache.ListWatch{ ListFunc: func(options metav1.ListOptions) (pkgruntime.Object, error) { return adapter.FedList(metav1.NamespaceAll, options) @@ -136,10 +148,10 @@ func newSecretController(client federationclientset.Interface) *SecretController }, adapter.ObjectType(), controller.NoResyncPeriodFunc(), - util.NewTriggerOnAllChanges(func(obj pkgruntime.Object) { secretcontroller.deliverSecretObj(obj, 0, false) })) + util.NewTriggerOnAllChanges(func(obj pkgruntime.Object) { s.deliverObj(obj, 0, false) })) - // Federated informer on secrets in members of federation. - secretcontroller.secretFederatedInformer = util.NewFederatedInformer( + // Federated informer on the resource type in members of federation. + s.informer = util.NewFederatedInformer( client, func(cluster *federationapi.Cluster, targetClient kubeclientset.Interface) (cache.Store, cache.Controller) { return cache.NewInformer( @@ -154,24 +166,24 @@ func newSecretController(client federationclientset.Interface) *SecretController adapter.ObjectType(), controller.NoResyncPeriodFunc(), // Trigger reconciliation whenever something in federated cluster is changed. In most cases it - // would be just confirmation that some secret operation succeeded. + // would be just confirmation that some operation on the target resource type had succeeded. util.NewTriggerOnAllChanges( func(obj pkgruntime.Object) { - secretcontroller.deliverSecretObj(obj, secretcontroller.secretReviewDelay, false) + s.deliverObj(obj, s.reviewDelay, false) }, )) }, &util.ClusterLifecycleHandlerFuncs{ ClusterAvailable: func(cluster *federationapi.Cluster) { - // When new cluster becomes available process all the secrets again. - secretcontroller.clusterDeliverer.DeliverAt(allClustersKey, nil, time.Now().Add(secretcontroller.clusterAvailableDelay)) + // When new cluster becomes available process all the target resources again. + s.clusterDeliverer.DeliverAt(allClustersKey, nil, time.Now().Add(s.clusterAvailableDelay)) }, }, ) // Federated updeater along with Create/Update/Delete operations. - secretcontroller.federatedUpdater = util.NewFederatedUpdater(secretcontroller.secretFederatedInformer, + s.updater = util.NewFederatedUpdater(s.informer, func(client kubeclientset.Interface, obj pkgruntime.Object) error { _, err := adapter.ClusterCreate(client, obj) return err @@ -187,34 +199,34 @@ func newSecretController(client federationclientset.Interface) *SecretController return err }) - secretcontroller.deletionHelper = deletionhelper.NewDeletionHelper( - secretcontroller.hasFinalizerFunc, - secretcontroller.removeFinalizerFunc, - secretcontroller.addFinalizerFunc, + s.deletionHelper = deletionhelper.NewDeletionHelper( + s.hasFinalizerFunc, + s.removeFinalizerFunc, + s.addFinalizerFunc, // objNameFunc func(obj pkgruntime.Object) string { return adapter.ObjectMeta(obj).Name }, - secretcontroller.updateTimeout, - secretcontroller.eventRecorder, - secretcontroller.secretFederatedInformer, - secretcontroller.federatedUpdater, + s.updateTimeout, + s.eventRecorder, + s.informer, + s.updater, ) - return secretcontroller + return s } // minimizeLatency reduces delays and timeouts to make the controller more responsive (useful for testing). -func (secretcontroller *SecretController) minimizeLatency() { - secretcontroller.clusterAvailableDelay = time.Second - secretcontroller.secretReviewDelay = 50 * time.Millisecond - secretcontroller.smallDelay = 20 * time.Millisecond - secretcontroller.updateTimeout = 5 * time.Second +func (s *FederationSyncController) minimizeLatency() { + s.clusterAvailableDelay = time.Second + s.reviewDelay = 50 * time.Millisecond + s.smallDelay = 20 * time.Millisecond + s.updateTimeout = 5 * time.Second } // Returns true if the given object has the given finalizer in its ObjectMeta. -func (secretcontroller *SecretController) hasFinalizerFunc(obj pkgruntime.Object, finalizer string) bool { - meta := secretcontroller.adapter.ObjectMeta(obj) +func (s *FederationSyncController) hasFinalizerFunc(obj pkgruntime.Object, finalizer string) bool { + meta := s.adapter.ObjectMeta(obj) for i := range meta.Finalizers { if string(meta.Finalizers[i]) == finalizer { return true @@ -223,11 +235,9 @@ func (secretcontroller *SecretController) hasFinalizerFunc(obj pkgruntime.Object return false } -// Removes the finalizers from the given objects ObjectMeta. -// Assumes that the given object is a secret. -func (secretcontroller *SecretController) removeFinalizerFunc(obj pkgruntime.Object, finalizers []string) (pkgruntime.Object, error) { - adapter := secretcontroller.adapter - meta := adapter.ObjectMeta(obj) +// Removes the finalizer from the given objects ObjectMeta. +func (s *FederationSyncController) removeFinalizerFunc(obj pkgruntime.Object, finalizers []string) (pkgruntime.Object, error) { + meta := s.adapter.ObjectMeta(obj) newFinalizers := []string{} hasFinalizer := false for i := range meta.Finalizers { @@ -242,189 +252,184 @@ func (secretcontroller *SecretController) removeFinalizerFunc(obj pkgruntime.Obj return obj, nil } meta.Finalizers = newFinalizers - secret, err := secretcontroller.adapter.FedUpdate(obj) + secret, err := s.adapter.FedUpdate(obj) if err != nil { - return nil, fmt.Errorf("failed to remove finalizers %v from %s %s: %v", finalizers, adapter.Kind(), meta.Name, err) + return nil, fmt.Errorf("failed to remove finalizers %v from %s %s: %v", finalizers, s.adapter.Kind(), meta.Name, err) } return secret, nil } // Adds the given finalizers to the given objects ObjectMeta. -// Assumes that the given object is a secret. -func (secretcontroller *SecretController) addFinalizerFunc(obj pkgruntime.Object, finalizers []string) (pkgruntime.Object, error) { - adapter := secretcontroller.adapter - meta := adapter.ObjectMeta(obj) +func (s *FederationSyncController) addFinalizerFunc(obj pkgruntime.Object, finalizers []string) (pkgruntime.Object, error) { + meta := s.adapter.ObjectMeta(obj) meta.Finalizers = append(meta.Finalizers, finalizers...) - secret, err := secretcontroller.adapter.FedUpdate(obj) + secret, err := s.adapter.FedUpdate(obj) if err != nil { - return nil, fmt.Errorf("failed to add finalizers %v to %s %s: %v", finalizers, adapter.Kind(), meta.Name, err) + return nil, fmt.Errorf("failed to add finalizers %v to %s %s: %v", finalizers, s.adapter.Kind(), meta.Name, err) } return secret, nil } -func (secretcontroller *SecretController) Run(stopChan <-chan struct{}) { - go secretcontroller.secretInformerController.Run(stopChan) - secretcontroller.secretFederatedInformer.Start() +func (s *FederationSyncController) Run(stopChan <-chan struct{}) { + go s.controller.Run(stopChan) + s.informer.Start() go func() { <-stopChan - secretcontroller.secretFederatedInformer.Stop() + s.informer.Stop() }() - secretcontroller.secretDeliverer.StartWithHandler(func(item *util.DelayingDelivererItem) { - secret := item.Value.(*types.NamespacedName) - secretcontroller.reconcileSecret(*secret) + s.deliverer.StartWithHandler(func(item *util.DelayingDelivererItem) { + namespacedName := item.Value.(*types.NamespacedName) + s.reconcile(*namespacedName) }) - secretcontroller.clusterDeliverer.StartWithHandler(func(_ *util.DelayingDelivererItem) { - secretcontroller.reconcileSecretsOnClusterChange() + s.clusterDeliverer.StartWithHandler(func(_ *util.DelayingDelivererItem) { + s.reconcileOnClusterChange() }) - util.StartBackoffGC(secretcontroller.secretBackoff, stopChan) + util.StartBackoffGC(s.backoff, stopChan) } -func (secretcontroller *SecretController) deliverSecretObj(obj pkgruntime.Object, delay time.Duration, failed bool) { - namespacedName := secretcontroller.adapter.NamespacedName(obj) - secretcontroller.deliverSecret(namespacedName, delay, failed) +func (s *FederationSyncController) deliverObj(obj pkgruntime.Object, delay time.Duration, failed bool) { + namespacedName := s.adapter.NamespacedName(obj) + s.deliver(namespacedName, delay, failed) } // Adds backoff to delay if this delivery is related to some failure. Resets backoff if there was no failure. -func (secretcontroller *SecretController) deliverSecret(secret types.NamespacedName, delay time.Duration, failed bool) { - key := secret.String() +func (s *FederationSyncController) deliver(namespacedName types.NamespacedName, delay time.Duration, failed bool) { + key := namespacedName.String() if failed { - secretcontroller.secretBackoff.Next(key, time.Now()) - delay = delay + secretcontroller.secretBackoff.Get(key) + s.backoff.Next(key, time.Now()) + delay = delay + s.backoff.Get(key) } else { - secretcontroller.secretBackoff.Reset(key) + s.backoff.Reset(key) } - secretcontroller.secretDeliverer.DeliverAfter(key, &secret, delay) + s.deliverer.DeliverAfter(key, &namespacedName, delay) } // Check whether all data stores are in sync. False is returned if any of the informer/stores is not yet // synced with the corresponding api server. -func (secretcontroller *SecretController) isSynced() bool { - if !secretcontroller.secretFederatedInformer.ClustersSynced() { +func (s *FederationSyncController) isSynced() bool { + if !s.informer.ClustersSynced() { glog.V(2).Infof("Cluster list not synced") return false } - clusters, err := secretcontroller.secretFederatedInformer.GetReadyClusters() + clusters, err := s.informer.GetReadyClusters() if err != nil { glog.Errorf("Failed to get ready clusters: %v", err) return false } - if !secretcontroller.secretFederatedInformer.GetTargetStore().ClustersSynced(clusters) { + if !s.informer.GetTargetStore().ClustersSynced(clusters) { return false } return true } -// The function triggers reconciliation of all federated secrets. -func (secretcontroller *SecretController) reconcileSecretsOnClusterChange() { - if !secretcontroller.isSynced() { - secretcontroller.clusterDeliverer.DeliverAt(allClustersKey, nil, time.Now().Add(secretcontroller.clusterAvailableDelay)) +// The function triggers reconciliation of all target federated resources. +func (s *FederationSyncController) reconcileOnClusterChange() { + if !s.isSynced() { + s.clusterDeliverer.DeliverAt(allClustersKey, nil, time.Now().Add(s.clusterAvailableDelay)) } - for _, obj := range secretcontroller.secretInformerStore.List() { - namespacedName := secretcontroller.adapter.NamespacedName(obj.(pkgruntime.Object)) - secretcontroller.deliverSecret(namespacedName, secretcontroller.smallDelay, false) + for _, obj := range s.store.List() { + namespacedName := s.adapter.NamespacedName(obj.(pkgruntime.Object)) + s.deliver(namespacedName, s.smallDelay, false) } } -func (secretcontroller *SecretController) reconcileSecret(secret types.NamespacedName) { - adapter := secretcontroller.adapter - kind := adapter.Kind() - - if !secretcontroller.isSynced() { - secretcontroller.deliverSecret(secret, secretcontroller.clusterAvailableDelay, false) +func (s *FederationSyncController) reconcile(namespacedName types.NamespacedName) { + if !s.isSynced() { + s.deliver(namespacedName, s.clusterAvailableDelay, false) return } - key := secret.String() - baseSecretObjFromStore, exist, err := secretcontroller.secretInformerStore.GetByKey(key) + key := namespacedName.String() + kind := s.adapter.Kind() + cachedObj, exist, err := s.store.GetByKey(key) if err != nil { glog.Errorf("Failed to query main %s store for %v: %v", kind, key, err) - secretcontroller.deliverSecret(secret, 0, true) + s.deliver(namespacedName, 0, true) return } if !exist { - // Not federated secret, ignoring. + // Not federated, ignoring. return } - // Create a copy before modifying the obj to prevent race condition with - // other readers of obj from store. - baseSecretObj, err := api.Scheme.DeepCopy(baseSecretObjFromStore) + // Create a copy before modifying the resource to prevent racing + // with other readers. + copiedObj, err := api.Scheme.DeepCopy(cachedObj) if err != nil { - glog.Errorf("Error in retrieving obj from store: %v", err) - secretcontroller.deliverSecret(secret, 0, true) + glog.Errorf("Error in retrieving %s from store: %v", kind, err) + s.deliver(namespacedName, 0, true) return } - if !adapter.IsExpectedType(baseSecretObj) { - glog.Errorf("Object is not the expected type: %v", baseSecretObj) - secretcontroller.deliverSecret(secret, 0, true) + if !s.adapter.IsExpectedType(copiedObj) { + glog.Errorf("Object is not the expected type: %v", copiedObj) + s.deliver(namespacedName, 0, true) return } - - baseSecret := baseSecretObj.(pkgruntime.Object) - meta := adapter.ObjectMeta(baseSecret) + obj := copiedObj.(pkgruntime.Object) + meta := s.adapter.ObjectMeta(obj) if meta.DeletionTimestamp != nil { - if err := secretcontroller.delete(baseSecret, secret); err != nil { - glog.Errorf("Failed to delete %s %s: %v", kind, secret, err) - secretcontroller.eventRecorder.Eventf(baseSecret, api.EventTypeNormal, "DeleteFailed", + if err := s.delete(obj, namespacedName); err != nil { + glog.Errorf("Failed to delete %s %s: %v", kind, namespacedName, err) + s.eventRecorder.Eventf(obj, api.EventTypeNormal, "DeleteFailed", "%s delete failed: %v", strings.ToTitle(kind), err) - secretcontroller.deliverSecret(secret, 0, true) + s.deliver(namespacedName, 0, true) } return } glog.V(3).Infof("Ensuring delete object from underlying clusters finalizer for %s: %s", - kind, secret) - // Add the required finalizers before creating a secret in underlying clusters. - baseSecret, err = secretcontroller.deletionHelper.EnsureFinalizers(baseSecret) + kind, namespacedName) + // Add the required finalizers before creating the resource in underlying clusters. + obj, err = s.deletionHelper.EnsureFinalizers(obj) if err != nil { glog.Errorf("Failed to ensure delete object from underlying clusters finalizer in %s %s: %v", - kind, secret, err) - secretcontroller.deliverSecret(secret, 0, false) + kind, namespacedName, err) + s.deliver(namespacedName, 0, false) return } - glog.V(3).Infof("Syncing %s %s in underlying clusters", kind, secret) + glog.V(3).Infof("Syncing %s %s in underlying clusters", kind, namespacedName) - clusters, err := secretcontroller.secretFederatedInformer.GetReadyClusters() + clusters, err := s.informer.GetReadyClusters() if err != nil { glog.Errorf("Failed to get cluster list: %v", err) - secretcontroller.deliverSecret(secret, secretcontroller.clusterAvailableDelay, false) + s.deliver(namespacedName, s.clusterAvailableDelay, false) return } operations := make([]util.FederatedOperation, 0) for _, cluster := range clusters { - clusterSecretObj, found, err := secretcontroller.secretFederatedInformer.GetTargetStore().GetByKey(cluster.Name, key) + clusterObj, found, err := s.informer.GetTargetStore().GetByKey(cluster.Name, key) if err != nil { glog.Errorf("Failed to get %s from %s: %v", key, cluster.Name, err) - secretcontroller.deliverSecret(secret, 0, true) + s.deliver(namespacedName, 0, true) return } // The data should not be modified. - desiredSecret := adapter.Copy(baseSecret) + desiredObj := s.adapter.Copy(obj) if !found { - secretcontroller.eventRecorder.Eventf(baseSecret, api.EventTypeNormal, "CreateInCluster", + s.eventRecorder.Eventf(obj, api.EventTypeNormal, "CreateInCluster", "Creating %s in cluster %s", kind, cluster.Name) operations = append(operations, util.FederatedOperation{ Type: util.OperationTypeAdd, - Obj: desiredSecret, + Obj: desiredObj, ClusterName: cluster.Name, }) } else { - clusterSecret := clusterSecretObj.(pkgruntime.Object) + clusterObj := clusterObj.(pkgruntime.Object) - // Update existing secret, if needed. - if !adapter.Equivalent(desiredSecret, clusterSecret) { - secretcontroller.eventRecorder.Eventf(baseSecret, api.EventTypeNormal, "UpdateInCluster", + // Update existing resource, if needed. + if !s.adapter.Equivalent(desiredObj, clusterObj) { + s.eventRecorder.Eventf(obj, api.EventTypeNormal, "UpdateInCluster", "Updating %s in cluster %s", kind, cluster.Name) operations = append(operations, util.FederatedOperation{ Type: util.OperationTypeUpdate, - Obj: desiredSecret, + Obj: desiredObj, ClusterName: cluster.Name, }) } @@ -435,36 +440,36 @@ func (secretcontroller *SecretController) reconcileSecret(secret types.Namespace // Everything is in order return } - err = secretcontroller.federatedUpdater.UpdateWithOnError(operations, secretcontroller.updateTimeout, + err = s.updater.UpdateWithOnError(operations, s.updateTimeout, func(op util.FederatedOperation, operror error) { - secretcontroller.eventRecorder.Eventf(baseSecret, api.EventTypeNormal, "UpdateInClusterFailed", + s.eventRecorder.Eventf(obj, api.EventTypeNormal, "UpdateInClusterFailed", "%s update in cluster %s failed: %v", strings.ToTitle(kind), op.ClusterName, operror) }) if err != nil { glog.Errorf("Failed to execute updates for %s: %v", key, err) - secretcontroller.deliverSecret(secret, 0, true) + s.deliver(namespacedName, 0, true) return } // Evertyhing is in order but lets be double sure - secretcontroller.deliverSecret(secret, secretcontroller.secretReviewDelay, false) + s.deliver(namespacedName, s.reviewDelay, false) } -// delete deletes the given secret or returns error if the deletion was not complete. -func (secretcontroller *SecretController) delete(obj pkgruntime.Object, namespacedName types.NamespacedName) error { - kind := secretcontroller.adapter.Kind() +// delete deletes the given resource or returns error if the deletion was not complete. +func (s *FederationSyncController) delete(obj pkgruntime.Object, namespacedName types.NamespacedName) error { + kind := s.adapter.Kind() glog.V(3).Infof("Handling deletion of %s: %v", kind, namespacedName) - _, err := secretcontroller.deletionHelper.HandleObjectInUnderlyingClusters(obj) + _, err := s.deletionHelper.HandleObjectInUnderlyingClusters(obj) if err != nil { return err } - err = secretcontroller.adapter.FedDelete(namespacedName, nil) + err = s.adapter.FedDelete(namespacedName, nil) if err != nil { // Its all good if the error is not found error. That means it is deleted already and we do not have to do anything. - // This is expected when we are processing an update as a result of secret finalizer deletion. - // The process that deleted the last finalizer is also going to delete the secret and we do not have to do anything. + // This is expected when we are processing an update as a result of finalizer deletion. + // The process that deleted the last finalizer is also going to delete the resource and we do not have to do anything. if !errors.IsNotFound(err) { return fmt.Errorf("failed to delete %s: %v", kind, err) } diff --git a/federation/pkg/federation-controller/secret/secret_controller_test.go b/federation/pkg/federation-controller/secret/secret_controller_test.go index b7c68f87ce6..3bfeb4848c4 100644 --- a/federation/pkg/federation-controller/secret/secret_controller_test.go +++ b/federation/pkg/federation-controller/secret/secret_controller_test.go @@ -77,7 +77,7 @@ func TestSecretController(t *testing.T) { return nil, fmt.Errorf("Unknown cluster") } } - setClientFactory(secretController.secretFederatedInformer, informerClientFactory) + setClientFactory(secretController.informer, informerClientFactory) secretController.minimizeLatency() @@ -115,7 +115,7 @@ func TestSecretController(t *testing.T) { // Wait for the secret to appear in the informer store err := WaitForStoreUpdate( - secretController.secretFederatedInformer.GetTargetStore(), + secretController.informer.GetTargetStore(), cluster1.Name, types.NamespacedName{Namespace: secret1.Namespace, Name: secret1.Name}.String(), wait.ForeverTestTimeout) assert.Nil(t, err, "secret should have appeared in the informer store") @@ -146,7 +146,7 @@ func TestSecretController(t *testing.T) { // Wait for the secret to be updated in the informer store. err = WaitForSecretStoreUpdate( - secretController.secretFederatedInformer.GetTargetStore(), + secretController.informer.GetTargetStore(), cluster1.Name, types.NamespacedName{Namespace: secret1.Namespace, Name: secret1.Name}.String(), &secret1, wait.ForeverTestTimeout) assert.NoError(t, err, "secret should have been updated in the informer store")