diff --git a/federation/pkg/federation-controller/secret/BUILD b/federation/pkg/federation-controller/secret/BUILD index 3ad927bed97..a2ce2020f27 100644 --- a/federation/pkg/federation-controller/secret/BUILD +++ b/federation/pkg/federation-controller/secret/BUILD @@ -18,6 +18,7 @@ go_library( "//federation/pkg/federation-controller/util:go_default_library", "//federation/pkg/federation-controller/util/deletionhelper:go_default_library", "//federation/pkg/federation-controller/util/eventsink:go_default_library", + "//federation/pkg/typeadapters:go_default_library", "//pkg/api:go_default_library", "//pkg/api/v1:go_default_library", "//pkg/client/clientset_generated/clientset:go_default_library", diff --git a/federation/pkg/federation-controller/secret/secret_controller.go b/federation/pkg/federation-controller/secret/secret_controller.go index c054e7c6157..128bcb3649f 100644 --- a/federation/pkg/federation-controller/secret/secret_controller.go +++ b/federation/pkg/federation-controller/secret/secret_controller.go @@ -18,6 +18,7 @@ package secret import ( "fmt" + "strings" "time" "k8s.io/apimachinery/pkg/api/errors" @@ -36,6 +37,7 @@ import ( "k8s.io/kubernetes/federation/pkg/federation-controller/util" "k8s.io/kubernetes/federation/pkg/federation-controller/util/deletionhelper" "k8s.io/kubernetes/federation/pkg/federation-controller/util/eventsink" + "k8s.io/kubernetes/federation/pkg/typeadapters" "k8s.io/kubernetes/pkg/api" apiv1 "k8s.io/kubernetes/pkg/api/v1" kubeclientset "k8s.io/kubernetes/pkg/client/clientset_generated/clientset" @@ -53,185 +55,194 @@ var ( RequiredResources = []schema.GroupVersionResource{apiv1.SchemeGroupVersion.WithResource("secrets")} ) -type SecretController struct { - // For triggering single secret reconciliation. This is used when there is an - // add/update/delete operation on a secret in either federated API server or - // in some member of the federation. - secretDeliverer *util.DelayingDeliverer +// FederationSyncController synchronizes the state of a federated type +// to clusters that are members of the federation. +type FederationSyncController struct { + // For triggering reconciliation of a single resource. This is + // used when there is an add/update/delete operation on a resource + // in either federated API server or in some member of the + // federation. + deliverer *util.DelayingDeliverer - // For triggering all secrets reconciliation. This is used when - // a new cluster becomes available. + // For triggering reconciliation of all target resources. This is + // used when a new cluster becomes available. clusterDeliverer *util.DelayingDeliverer - // Contains secrets present in members of federation. - secretFederatedInformer util.FederatedInformer + // Contains resources present in members of federation. + informer util.FederatedInformer // For updating members of federation. - federatedUpdater util.FederatedUpdater - // Definitions of secrets that should be federated. - secretInformerStore cache.Store - // Informer controller for secrets that should be federated. - secretInformerController cache.Controller + updater util.FederatedUpdater + // Definitions of resources that should be federated. + store cache.Store + // Informer controller for resources that should be federated. + controller cache.Controller - // Client to federated api server. - federatedApiClient federationclientset.Interface - - // Backoff manager for secrets - secretBackoff *flowcontrol.Backoff + // Backoff manager + backoff *flowcontrol.Backoff // For events eventRecorder record.EventRecorder deletionHelper *deletionhelper.DeletionHelper - secretReviewDelay time.Duration + reviewDelay time.Duration clusterAvailableDelay time.Duration smallDelay time.Duration updateTimeout time.Duration + + adapter typeadapters.FederatedTypeAdapter } // StartSecretController starts a new secret controller func StartSecretController(config *restclient.Config, stopChan <-chan struct{}, minimizeLatency bool) { - restclient.AddUserAgent(config, "secret-controller") - client := federationclientset.NewForConfigOrDie(config) - controller := newSecretController(client) - if minimizeLatency { - controller.minimizeLatency() - } - glog.Infof("Starting Secret controller") - controller.Run(stopChan) + startFederationSyncController(&typeadapters.SecretAdapter{}, config, stopChan, minimizeLatency) } // newSecretController returns a new secret controller -func newSecretController(client federationclientset.Interface) *SecretController { +func newSecretController(client federationclientset.Interface) *FederationSyncController { + return newFederationSyncController(client, typeadapters.NewSecretAdapter(client)) +} + +// startFederationSyncController starts a new sync controller for the given type adapter +func startFederationSyncController(adapter typeadapters.FederatedTypeAdapter, config *restclient.Config, stopChan <-chan struct{}, minimizeLatency bool) { + restclient.AddUserAgent(config, fmt.Sprintf("%s-controller", adapter.Kind())) + client := federationclientset.NewForConfigOrDie(config) + adapter.SetClient(client) + controller := newFederationSyncController(client, adapter) + if minimizeLatency { + controller.minimizeLatency() + } + glog.Infof(fmt.Sprintf("Starting federated sync controller for %s resources", adapter.Kind())) + controller.Run(stopChan) +} + +// newFederationSyncController returns a new sync controller for the given client and type adapter +func newFederationSyncController(client federationclientset.Interface, adapter typeadapters.FederatedTypeAdapter) *FederationSyncController { broadcaster := record.NewBroadcaster() broadcaster.StartRecordingToSink(eventsink.NewFederatedEventSink(client)) - recorder := broadcaster.NewRecorder(api.Scheme, clientv1.EventSource{Component: "federated-secrets-controller"}) + recorder := broadcaster.NewRecorder(api.Scheme, clientv1.EventSource{Component: fmt.Sprintf("federated-%v-controller", adapter.Kind())}) - secretcontroller := &SecretController{ - federatedApiClient: client, - secretReviewDelay: time.Second * 10, + s := &FederationSyncController{ + reviewDelay: time.Second * 10, clusterAvailableDelay: time.Second * 20, smallDelay: time.Second * 3, updateTimeout: time.Second * 30, - secretBackoff: flowcontrol.NewBackOff(5*time.Second, time.Minute), + backoff: flowcontrol.NewBackOff(5*time.Second, time.Minute), eventRecorder: recorder, + adapter: adapter, } // Build delivereres for triggering reconciliations. - secretcontroller.secretDeliverer = util.NewDelayingDeliverer() - secretcontroller.clusterDeliverer = util.NewDelayingDeliverer() + s.deliverer = util.NewDelayingDeliverer() + s.clusterDeliverer = util.NewDelayingDeliverer() - // Start informer in federated API servers on secrets that should be federated. - secretcontroller.secretInformerStore, secretcontroller.secretInformerController = cache.NewInformer( + // Start informer in federated API servers on the resource type that should be federated. + s.store, s.controller = cache.NewInformer( &cache.ListWatch{ ListFunc: func(options metav1.ListOptions) (pkgruntime.Object, error) { - return client.Core().Secrets(metav1.NamespaceAll).List(options) + return adapter.FedList(metav1.NamespaceAll, options) }, WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) { - return client.Core().Secrets(metav1.NamespaceAll).Watch(options) + return adapter.FedWatch(metav1.NamespaceAll, options) }, }, - &apiv1.Secret{}, + adapter.ObjectType(), controller.NoResyncPeriodFunc(), - util.NewTriggerOnAllChanges(func(obj pkgruntime.Object) { secretcontroller.deliverSecretObj(obj, 0, false) })) + util.NewTriggerOnAllChanges(func(obj pkgruntime.Object) { s.deliverObj(obj, 0, false) })) - // Federated informer on secrets in members of federation. - secretcontroller.secretFederatedInformer = util.NewFederatedInformer( + // Federated informer on the resource type in members of federation. + s.informer = util.NewFederatedInformer( client, func(cluster *federationapi.Cluster, targetClient kubeclientset.Interface) (cache.Store, cache.Controller) { return cache.NewInformer( &cache.ListWatch{ ListFunc: func(options metav1.ListOptions) (pkgruntime.Object, error) { - return targetClient.Core().Secrets(metav1.NamespaceAll).List(options) + return adapter.ClusterList(targetClient, metav1.NamespaceAll, options) }, WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) { - return targetClient.Core().Secrets(metav1.NamespaceAll).Watch(options) + return adapter.ClusterWatch(targetClient, metav1.NamespaceAll, options) }, }, - &apiv1.Secret{}, + adapter.ObjectType(), controller.NoResyncPeriodFunc(), // Trigger reconciliation whenever something in federated cluster is changed. In most cases it - // would be just confirmation that some secret operation succeeded. + // would be just confirmation that some operation on the target resource type had succeeded. util.NewTriggerOnAllChanges( func(obj pkgruntime.Object) { - secretcontroller.deliverSecretObj(obj, secretcontroller.secretReviewDelay, false) + s.deliverObj(obj, s.reviewDelay, false) }, )) }, &util.ClusterLifecycleHandlerFuncs{ ClusterAvailable: func(cluster *federationapi.Cluster) { - // When new cluster becomes available process all the secrets again. - secretcontroller.clusterDeliverer.DeliverAt(allClustersKey, nil, time.Now().Add(secretcontroller.clusterAvailableDelay)) + // When new cluster becomes available process all the target resources again. + s.clusterDeliverer.DeliverAt(allClustersKey, nil, time.Now().Add(s.clusterAvailableDelay)) }, }, ) // Federated updeater along with Create/Update/Delete operations. - secretcontroller.federatedUpdater = util.NewFederatedUpdater(secretcontroller.secretFederatedInformer, + s.updater = util.NewFederatedUpdater(s.informer, func(client kubeclientset.Interface, obj pkgruntime.Object) error { - secret := obj.(*apiv1.Secret) - _, err := client.Core().Secrets(secret.Namespace).Create(secret) + _, err := adapter.ClusterCreate(client, obj) return err }, func(client kubeclientset.Interface, obj pkgruntime.Object) error { - secret := obj.(*apiv1.Secret) - _, err := client.Core().Secrets(secret.Namespace).Update(secret) + _, err := adapter.ClusterUpdate(client, obj) return err }, func(client kubeclientset.Interface, obj pkgruntime.Object) error { - secret := obj.(*apiv1.Secret) + namespacedName := adapter.NamespacedName(obj) orphanDependents := false - err := client.Core().Secrets(secret.Namespace).Delete(secret.Name, &metav1.DeleteOptions{OrphanDependents: &orphanDependents}) + err := adapter.ClusterDelete(client, namespacedName, &metav1.DeleteOptions{OrphanDependents: &orphanDependents}) return err }) - secretcontroller.deletionHelper = deletionhelper.NewDeletionHelper( - secretcontroller.hasFinalizerFunc, - secretcontroller.removeFinalizerFunc, - secretcontroller.addFinalizerFunc, + s.deletionHelper = deletionhelper.NewDeletionHelper( + s.hasFinalizerFunc, + s.removeFinalizerFunc, + s.addFinalizerFunc, // objNameFunc func(obj pkgruntime.Object) string { - secret := obj.(*apiv1.Secret) - return secret.Name + return adapter.ObjectMeta(obj).Name }, - secretcontroller.updateTimeout, - secretcontroller.eventRecorder, - secretcontroller.secretFederatedInformer, - secretcontroller.federatedUpdater, + s.updateTimeout, + s.eventRecorder, + s.informer, + s.updater, ) - return secretcontroller + return s } // minimizeLatency reduces delays and timeouts to make the controller more responsive (useful for testing). -func (secretcontroller *SecretController) minimizeLatency() { - secretcontroller.clusterAvailableDelay = time.Second - secretcontroller.secretReviewDelay = 50 * time.Millisecond - secretcontroller.smallDelay = 20 * time.Millisecond - secretcontroller.updateTimeout = 5 * time.Second +func (s *FederationSyncController) minimizeLatency() { + s.clusterAvailableDelay = time.Second + s.reviewDelay = 50 * time.Millisecond + s.smallDelay = 20 * time.Millisecond + s.updateTimeout = 5 * time.Second } // Returns true if the given object has the given finalizer in its ObjectMeta. -func (secretcontroller *SecretController) hasFinalizerFunc(obj pkgruntime.Object, finalizer string) bool { - secret := obj.(*apiv1.Secret) - for i := range secret.ObjectMeta.Finalizers { - if string(secret.ObjectMeta.Finalizers[i]) == finalizer { +func (s *FederationSyncController) hasFinalizerFunc(obj pkgruntime.Object, finalizer string) bool { + meta := s.adapter.ObjectMeta(obj) + for i := range meta.Finalizers { + if string(meta.Finalizers[i]) == finalizer { return true } } return false } -// Removes the finalizers from the given objects ObjectMeta. -// Assumes that the given object is a secret. -func (secretcontroller *SecretController) removeFinalizerFunc(obj pkgruntime.Object, finalizers []string) (pkgruntime.Object, error) { - secret := obj.(*apiv1.Secret) +// Removes the finalizer from the given objects ObjectMeta. +func (s *FederationSyncController) removeFinalizerFunc(obj pkgruntime.Object, finalizers []string) (pkgruntime.Object, error) { + meta := s.adapter.ObjectMeta(obj) newFinalizers := []string{} hasFinalizer := false - for i := range secret.ObjectMeta.Finalizers { - if !deletionhelper.ContainsString(finalizers, secret.ObjectMeta.Finalizers[i]) { - newFinalizers = append(newFinalizers, secret.ObjectMeta.Finalizers[i]) + for i := range meta.Finalizers { + if !deletionhelper.ContainsString(finalizers, meta.Finalizers[i]) { + newFinalizers = append(newFinalizers, meta.Finalizers[i]) } else { hasFinalizer = true } @@ -240,184 +251,185 @@ func (secretcontroller *SecretController) removeFinalizerFunc(obj pkgruntime.Obj // Nothing to do. return obj, nil } - secret.ObjectMeta.Finalizers = newFinalizers - secret, err := secretcontroller.federatedApiClient.Core().Secrets(secret.Namespace).Update(secret) + meta.Finalizers = newFinalizers + secret, err := s.adapter.FedUpdate(obj) if err != nil { - return nil, fmt.Errorf("failed to remove finalizers %v from secret %s: %v", finalizers, secret.Name, err) + return nil, fmt.Errorf("failed to remove finalizers %v from %s %s: %v", finalizers, s.adapter.Kind(), meta.Name, err) } return secret, nil } // Adds the given finalizers to the given objects ObjectMeta. -// Assumes that the given object is a secret. -func (secretcontroller *SecretController) addFinalizerFunc(obj pkgruntime.Object, finalizers []string) (pkgruntime.Object, error) { - secret := obj.(*apiv1.Secret) - secret.ObjectMeta.Finalizers = append(secret.ObjectMeta.Finalizers, finalizers...) - secret, err := secretcontroller.federatedApiClient.Core().Secrets(secret.Namespace).Update(secret) +func (s *FederationSyncController) addFinalizerFunc(obj pkgruntime.Object, finalizers []string) (pkgruntime.Object, error) { + meta := s.adapter.ObjectMeta(obj) + meta.Finalizers = append(meta.Finalizers, finalizers...) + secret, err := s.adapter.FedUpdate(obj) if err != nil { - return nil, fmt.Errorf("failed to add finalizers %v to secret %s: %v", finalizers, secret.Name, err) + return nil, fmt.Errorf("failed to add finalizers %v to %s %s: %v", finalizers, s.adapter.Kind(), meta.Name, err) } return secret, nil } -func (secretcontroller *SecretController) Run(stopChan <-chan struct{}) { - go secretcontroller.secretInformerController.Run(stopChan) - secretcontroller.secretFederatedInformer.Start() +func (s *FederationSyncController) Run(stopChan <-chan struct{}) { + go s.controller.Run(stopChan) + s.informer.Start() go func() { <-stopChan - secretcontroller.secretFederatedInformer.Stop() + s.informer.Stop() }() - secretcontroller.secretDeliverer.StartWithHandler(func(item *util.DelayingDelivererItem) { - secret := item.Value.(*types.NamespacedName) - secretcontroller.reconcileSecret(*secret) + s.deliverer.StartWithHandler(func(item *util.DelayingDelivererItem) { + namespacedName := item.Value.(*types.NamespacedName) + s.reconcile(*namespacedName) }) - secretcontroller.clusterDeliverer.StartWithHandler(func(_ *util.DelayingDelivererItem) { - secretcontroller.reconcileSecretsOnClusterChange() + s.clusterDeliverer.StartWithHandler(func(_ *util.DelayingDelivererItem) { + s.reconcileOnClusterChange() }) - util.StartBackoffGC(secretcontroller.secretBackoff, stopChan) + util.StartBackoffGC(s.backoff, stopChan) } -func (secretcontroller *SecretController) deliverSecretObj(obj interface{}, delay time.Duration, failed bool) { - secret := obj.(*apiv1.Secret) - secretcontroller.deliverSecret(types.NamespacedName{Namespace: secret.Namespace, Name: secret.Name}, delay, failed) +func (s *FederationSyncController) deliverObj(obj pkgruntime.Object, delay time.Duration, failed bool) { + namespacedName := s.adapter.NamespacedName(obj) + s.deliver(namespacedName, delay, failed) } // Adds backoff to delay if this delivery is related to some failure. Resets backoff if there was no failure. -func (secretcontroller *SecretController) deliverSecret(secret types.NamespacedName, delay time.Duration, failed bool) { - key := secret.String() +func (s *FederationSyncController) deliver(namespacedName types.NamespacedName, delay time.Duration, failed bool) { + key := namespacedName.String() if failed { - secretcontroller.secretBackoff.Next(key, time.Now()) - delay = delay + secretcontroller.secretBackoff.Get(key) + s.backoff.Next(key, time.Now()) + delay = delay + s.backoff.Get(key) } else { - secretcontroller.secretBackoff.Reset(key) + s.backoff.Reset(key) } - secretcontroller.secretDeliverer.DeliverAfter(key, &secret, delay) + s.deliverer.DeliverAfter(key, &namespacedName, delay) } // Check whether all data stores are in sync. False is returned if any of the informer/stores is not yet // synced with the corresponding api server. -func (secretcontroller *SecretController) isSynced() bool { - if !secretcontroller.secretFederatedInformer.ClustersSynced() { +func (s *FederationSyncController) isSynced() bool { + if !s.informer.ClustersSynced() { glog.V(2).Infof("Cluster list not synced") return false } - clusters, err := secretcontroller.secretFederatedInformer.GetReadyClusters() + clusters, err := s.informer.GetReadyClusters() if err != nil { glog.Errorf("Failed to get ready clusters: %v", err) return false } - if !secretcontroller.secretFederatedInformer.GetTargetStore().ClustersSynced(clusters) { + if !s.informer.GetTargetStore().ClustersSynced(clusters) { return false } return true } -// The function triggers reconciliation of all federated secrets. -func (secretcontroller *SecretController) reconcileSecretsOnClusterChange() { - if !secretcontroller.isSynced() { - secretcontroller.clusterDeliverer.DeliverAt(allClustersKey, nil, time.Now().Add(secretcontroller.clusterAvailableDelay)) +// The function triggers reconciliation of all target federated resources. +func (s *FederationSyncController) reconcileOnClusterChange() { + if !s.isSynced() { + s.clusterDeliverer.DeliverAt(allClustersKey, nil, time.Now().Add(s.clusterAvailableDelay)) } - for _, obj := range secretcontroller.secretInformerStore.List() { - secret := obj.(*apiv1.Secret) - secretcontroller.deliverSecret(types.NamespacedName{Namespace: secret.Namespace, Name: secret.Name}, secretcontroller.smallDelay, false) + for _, obj := range s.store.List() { + namespacedName := s.adapter.NamespacedName(obj.(pkgruntime.Object)) + s.deliver(namespacedName, s.smallDelay, false) } } -func (secretcontroller *SecretController) reconcileSecret(secret types.NamespacedName) { - if !secretcontroller.isSynced() { - secretcontroller.deliverSecret(secret, secretcontroller.clusterAvailableDelay, false) +func (s *FederationSyncController) reconcile(namespacedName types.NamespacedName) { + if !s.isSynced() { + s.deliver(namespacedName, s.clusterAvailableDelay, false) return } - key := secret.String() - baseSecretObjFromStore, exist, err := secretcontroller.secretInformerStore.GetByKey(key) + key := namespacedName.String() + kind := s.adapter.Kind() + cachedObj, exist, err := s.store.GetByKey(key) if err != nil { - glog.Errorf("Failed to query main secret store for %v: %v", key, err) - secretcontroller.deliverSecret(secret, 0, true) + glog.Errorf("Failed to query main %s store for %v: %v", kind, key, err) + s.deliver(namespacedName, 0, true) return } if !exist { - // Not federated secret, ignoring. + // Not federated, ignoring. return } - // Create a copy before modifying the obj to prevent race condition with - // other readers of obj from store. - baseSecretObj, err := api.Scheme.DeepCopy(baseSecretObjFromStore) - baseSecret, ok := baseSecretObj.(*apiv1.Secret) - if err != nil || !ok { - glog.Errorf("Error in retrieving obj from store: %v, %v", ok, err) - secretcontroller.deliverSecret(secret, 0, true) + // Create a copy before modifying the resource to prevent racing + // with other readers. + copiedObj, err := api.Scheme.DeepCopy(cachedObj) + if err != nil { + glog.Errorf("Error in retrieving %s from store: %v", kind, err) + s.deliver(namespacedName, 0, true) return } - if baseSecret.DeletionTimestamp != nil { - if err := secretcontroller.delete(baseSecret); err != nil { - glog.Errorf("Failed to delete %s: %v", secret, err) - secretcontroller.eventRecorder.Eventf(baseSecret, api.EventTypeNormal, "DeleteFailed", - "Secret delete failed: %v", err) - secretcontroller.deliverSecret(secret, 0, true) + if !s.adapter.IsExpectedType(copiedObj) { + glog.Errorf("Object is not the expected type: %v", copiedObj) + s.deliver(namespacedName, 0, true) + return + } + obj := copiedObj.(pkgruntime.Object) + meta := s.adapter.ObjectMeta(obj) + + if meta.DeletionTimestamp != nil { + if err := s.delete(obj, namespacedName); err != nil { + glog.Errorf("Failed to delete %s %s: %v", kind, namespacedName, err) + s.eventRecorder.Eventf(obj, api.EventTypeNormal, "DeleteFailed", + "%s delete failed: %v", strings.ToTitle(kind), err) + s.deliver(namespacedName, 0, true) } return } - glog.V(3).Infof("Ensuring delete object from underlying clusters finalizer for secret: %s", - baseSecret.Name) - // Add the required finalizers before creating a secret in underlying clusters. - updatedSecretObj, err := secretcontroller.deletionHelper.EnsureFinalizers(baseSecret) + glog.V(3).Infof("Ensuring delete object from underlying clusters finalizer for %s: %s", + kind, namespacedName) + // Add the required finalizers before creating the resource in underlying clusters. + obj, err = s.deletionHelper.EnsureFinalizers(obj) if err != nil { - glog.Errorf("Failed to ensure delete object from underlying clusters finalizer in secret %s: %v", - baseSecret.Name, err) - secretcontroller.deliverSecret(secret, 0, false) + glog.Errorf("Failed to ensure delete object from underlying clusters finalizer in %s %s: %v", + kind, namespacedName, err) + s.deliver(namespacedName, 0, false) return } - baseSecret = updatedSecretObj.(*apiv1.Secret) - glog.V(3).Infof("Syncing secret %s in underlying clusters", baseSecret.Name) + glog.V(3).Infof("Syncing %s %s in underlying clusters", kind, namespacedName) - clusters, err := secretcontroller.secretFederatedInformer.GetReadyClusters() + clusters, err := s.informer.GetReadyClusters() if err != nil { glog.Errorf("Failed to get cluster list: %v", err) - secretcontroller.deliverSecret(secret, secretcontroller.clusterAvailableDelay, false) + s.deliver(namespacedName, s.clusterAvailableDelay, false) return } operations := make([]util.FederatedOperation, 0) for _, cluster := range clusters { - clusterSecretObj, found, err := secretcontroller.secretFederatedInformer.GetTargetStore().GetByKey(cluster.Name, key) + clusterObj, found, err := s.informer.GetTargetStore().GetByKey(cluster.Name, key) if err != nil { glog.Errorf("Failed to get %s from %s: %v", key, cluster.Name, err) - secretcontroller.deliverSecret(secret, 0, true) + s.deliver(namespacedName, 0, true) return } // The data should not be modified. - desiredSecret := &apiv1.Secret{ - ObjectMeta: util.DeepCopyRelevantObjectMeta(baseSecret.ObjectMeta), - Data: baseSecret.Data, - Type: baseSecret.Type, - } + desiredObj := s.adapter.Copy(obj) if !found { - secretcontroller.eventRecorder.Eventf(baseSecret, api.EventTypeNormal, "CreateInCluster", - "Creating secret in cluster %s", cluster.Name) + s.eventRecorder.Eventf(obj, api.EventTypeNormal, "CreateInCluster", + "Creating %s in cluster %s", kind, cluster.Name) operations = append(operations, util.FederatedOperation{ Type: util.OperationTypeAdd, - Obj: desiredSecret, + Obj: desiredObj, ClusterName: cluster.Name, }) } else { - clusterSecret := clusterSecretObj.(*apiv1.Secret) + clusterObj := clusterObj.(pkgruntime.Object) - // Update existing secret, if needed. - if !util.SecretEquivalent(*desiredSecret, *clusterSecret) { - - secretcontroller.eventRecorder.Eventf(baseSecret, api.EventTypeNormal, "UpdateInCluster", - "Updating secret in cluster %s", cluster.Name) + // Update existing resource, if needed. + if !s.adapter.Equivalent(desiredObj, clusterObj) { + s.eventRecorder.Eventf(obj, api.EventTypeNormal, "UpdateInCluster", + "Updating %s in cluster %s", kind, cluster.Name) operations = append(operations, util.FederatedOperation{ Type: util.OperationTypeUpdate, - Obj: desiredSecret, + Obj: desiredObj, ClusterName: cluster.Name, }) } @@ -428,37 +440,38 @@ func (secretcontroller *SecretController) reconcileSecret(secret types.Namespace // Everything is in order return } - err = secretcontroller.federatedUpdater.UpdateWithOnError(operations, secretcontroller.updateTimeout, + err = s.updater.UpdateWithOnError(operations, s.updateTimeout, func(op util.FederatedOperation, operror error) { - secretcontroller.eventRecorder.Eventf(baseSecret, api.EventTypeNormal, "UpdateInClusterFailed", - "Secret update in cluster %s failed: %v", op.ClusterName, operror) + s.eventRecorder.Eventf(obj, api.EventTypeNormal, "UpdateInClusterFailed", + "%s update in cluster %s failed: %v", strings.ToTitle(kind), op.ClusterName, operror) }) if err != nil { glog.Errorf("Failed to execute updates for %s: %v", key, err) - secretcontroller.deliverSecret(secret, 0, true) + s.deliver(namespacedName, 0, true) return } // Evertyhing is in order but lets be double sure - secretcontroller.deliverSecret(secret, secretcontroller.secretReviewDelay, false) + s.deliver(namespacedName, s.reviewDelay, false) } -// delete deletes the given secret or returns error if the deletion was not complete. -func (secretcontroller *SecretController) delete(secret *apiv1.Secret) error { - glog.V(3).Infof("Handling deletion of secret: %v", *secret) - _, err := secretcontroller.deletionHelper.HandleObjectInUnderlyingClusters(secret) +// delete deletes the given resource or returns error if the deletion was not complete. +func (s *FederationSyncController) delete(obj pkgruntime.Object, namespacedName types.NamespacedName) error { + kind := s.adapter.Kind() + glog.V(3).Infof("Handling deletion of %s: %v", kind, namespacedName) + _, err := s.deletionHelper.HandleObjectInUnderlyingClusters(obj) if err != nil { return err } - err = secretcontroller.federatedApiClient.Core().Secrets(secret.Namespace).Delete(secret.Name, nil) + err = s.adapter.FedDelete(namespacedName, nil) if err != nil { // Its all good if the error is not found error. That means it is deleted already and we do not have to do anything. - // This is expected when we are processing an update as a result of secret finalizer deletion. - // The process that deleted the last finalizer is also going to delete the secret and we do not have to do anything. + // This is expected when we are processing an update as a result of finalizer deletion. + // The process that deleted the last finalizer is also going to delete the resource and we do not have to do anything. if !errors.IsNotFound(err) { - return fmt.Errorf("failed to delete secret: %v", err) + return fmt.Errorf("failed to delete %s: %v", kind, err) } } return nil diff --git a/federation/pkg/federation-controller/secret/secret_controller_test.go b/federation/pkg/federation-controller/secret/secret_controller_test.go index b7c68f87ce6..3bfeb4848c4 100644 --- a/federation/pkg/federation-controller/secret/secret_controller_test.go +++ b/federation/pkg/federation-controller/secret/secret_controller_test.go @@ -77,7 +77,7 @@ func TestSecretController(t *testing.T) { return nil, fmt.Errorf("Unknown cluster") } } - setClientFactory(secretController.secretFederatedInformer, informerClientFactory) + setClientFactory(secretController.informer, informerClientFactory) secretController.minimizeLatency() @@ -115,7 +115,7 @@ func TestSecretController(t *testing.T) { // Wait for the secret to appear in the informer store err := WaitForStoreUpdate( - secretController.secretFederatedInformer.GetTargetStore(), + secretController.informer.GetTargetStore(), cluster1.Name, types.NamespacedName{Namespace: secret1.Namespace, Name: secret1.Name}.String(), wait.ForeverTestTimeout) assert.Nil(t, err, "secret should have appeared in the informer store") @@ -146,7 +146,7 @@ func TestSecretController(t *testing.T) { // Wait for the secret to be updated in the informer store. err = WaitForSecretStoreUpdate( - secretController.secretFederatedInformer.GetTargetStore(), + secretController.informer.GetTargetStore(), cluster1.Name, types.NamespacedName{Namespace: secret1.Namespace, Name: secret1.Name}.String(), &secret1, wait.ForeverTestTimeout) assert.NoError(t, err, "secret should have been updated in the informer store") diff --git a/federation/pkg/typeadapters/BUILD b/federation/pkg/typeadapters/BUILD index e2acb8be78b..05f4234a95d 100644 --- a/federation/pkg/typeadapters/BUILD +++ b/federation/pkg/typeadapters/BUILD @@ -22,6 +22,7 @@ go_library( "//vendor:k8s.io/apimachinery/pkg/apis/meta/v1", "//vendor:k8s.io/apimachinery/pkg/runtime", "//vendor:k8s.io/apimachinery/pkg/types", + "//vendor:k8s.io/apimachinery/pkg/watch", ], ) diff --git a/federation/pkg/typeadapters/adapter.go b/federation/pkg/typeadapters/adapter.go index 62bc2d86825..b5799666a6c 100644 --- a/federation/pkg/typeadapters/adapter.go +++ b/federation/pkg/typeadapters/adapter.go @@ -20,8 +20,9 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" pkgruntime "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/types" + "k8s.io/apimachinery/pkg/watch" federationclientset "k8s.io/kubernetes/federation/client/clientset_generated/federation_clientset" - "k8s.io/kubernetes/pkg/client/clientset_generated/clientset" + kubeclientset "k8s.io/kubernetes/pkg/client/clientset_generated/clientset" ) // FederatedTypeAdapter defines operations for interacting with a @@ -31,18 +32,28 @@ type FederatedTypeAdapter interface { SetClient(client federationclientset.Interface) Kind() string + ObjectType() pkgruntime.Object + IsExpectedType(obj interface{}) bool + Copy(obj pkgruntime.Object) pkgruntime.Object Equivalent(obj1, obj2 pkgruntime.Object) bool - ObjectMeta(obj pkgruntime.Object) *metav1.ObjectMeta NamespacedName(obj pkgruntime.Object) types.NamespacedName + ObjectMeta(obj pkgruntime.Object) *metav1.ObjectMeta // Fed* operations target the federation control plane FedCreate(obj pkgruntime.Object) (pkgruntime.Object, error) - FedGet(namespacedName types.NamespacedName) (pkgruntime.Object, error) - FedUpdate(obj pkgruntime.Object) (pkgruntime.Object, error) FedDelete(namespacedName types.NamespacedName, options *metav1.DeleteOptions) error + FedGet(namespacedName types.NamespacedName) (pkgruntime.Object, error) + FedList(namespace string, options metav1.ListOptions) (pkgruntime.Object, error) + FedUpdate(obj pkgruntime.Object) (pkgruntime.Object, error) + FedWatch(namespace string, options metav1.ListOptions) (watch.Interface, error) // The following operations are intended to target a cluster that is a member of a federation - ClusterGet(client clientset.Interface, namespacedName types.NamespacedName) (pkgruntime.Object, error) + ClusterCreate(client kubeclientset.Interface, obj pkgruntime.Object) (pkgruntime.Object, error) + ClusterDelete(client kubeclientset.Interface, nsName types.NamespacedName, options *metav1.DeleteOptions) error + ClusterGet(client kubeclientset.Interface, namespacedName types.NamespacedName) (pkgruntime.Object, error) + ClusterList(client kubeclientset.Interface, namespace string, options metav1.ListOptions) (pkgruntime.Object, error) + ClusterUpdate(client kubeclientset.Interface, obj pkgruntime.Object) (pkgruntime.Object, error) + ClusterWatch(client kubeclientset.Interface, namespace string, options metav1.ListOptions) (watch.Interface, error) NewTestObject(namespace string) pkgruntime.Object } diff --git a/federation/pkg/typeadapters/secret.go b/federation/pkg/typeadapters/secret.go index c7e2018f0ff..aae439c611e 100644 --- a/federation/pkg/typeadapters/secret.go +++ b/federation/pkg/typeadapters/secret.go @@ -20,10 +20,11 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" pkgruntime "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/types" + "k8s.io/apimachinery/pkg/watch" federationclientset "k8s.io/kubernetes/federation/client/clientset_generated/federation_clientset" "k8s.io/kubernetes/federation/pkg/federation-controller/util" apiv1 "k8s.io/kubernetes/pkg/api/v1" - "k8s.io/kubernetes/pkg/client/clientset_generated/clientset" + kubeclientset "k8s.io/kubernetes/pkg/client/clientset_generated/clientset" ) type SecretAdapter struct { @@ -42,43 +43,91 @@ func (a *SecretAdapter) Kind() string { return "secret" } +func (a *SecretAdapter) ObjectType() pkgruntime.Object { + return &apiv1.Secret{} +} + +func (a *SecretAdapter) IsExpectedType(obj interface{}) bool { + _, ok := obj.(*apiv1.Secret) + return ok +} + +func (a *SecretAdapter) Copy(obj pkgruntime.Object) pkgruntime.Object { + secret := obj.(*apiv1.Secret) + return &apiv1.Secret{ + ObjectMeta: util.DeepCopyRelevantObjectMeta(secret.ObjectMeta), + Data: secret.Data, + Type: secret.Type, + } +} + func (a *SecretAdapter) Equivalent(obj1, obj2 pkgruntime.Object) bool { secret1 := obj1.(*apiv1.Secret) secret2 := obj2.(*apiv1.Secret) return util.SecretEquivalent(*secret1, *secret2) } -func (a *SecretAdapter) ObjectMeta(obj pkgruntime.Object) *metav1.ObjectMeta { - return &obj.(*apiv1.Secret).ObjectMeta -} - func (a *SecretAdapter) NamespacedName(obj pkgruntime.Object) types.NamespacedName { secret := obj.(*apiv1.Secret) return types.NamespacedName{Namespace: secret.Namespace, Name: secret.Name} } +func (a *SecretAdapter) ObjectMeta(obj pkgruntime.Object) *metav1.ObjectMeta { + return &obj.(*apiv1.Secret).ObjectMeta +} + func (a *SecretAdapter) FedCreate(obj pkgruntime.Object) (pkgruntime.Object, error) { secret := obj.(*apiv1.Secret) return a.client.CoreV1().Secrets(secret.Namespace).Create(secret) } +func (a *SecretAdapter) FedDelete(namespacedName types.NamespacedName, options *metav1.DeleteOptions) error { + return a.client.CoreV1().Secrets(namespacedName.Namespace).Delete(namespacedName.Name, options) +} + func (a *SecretAdapter) FedGet(namespacedName types.NamespacedName) (pkgruntime.Object, error) { return a.client.CoreV1().Secrets(namespacedName.Namespace).Get(namespacedName.Name, metav1.GetOptions{}) } +func (a *SecretAdapter) FedList(namespace string, options metav1.ListOptions) (pkgruntime.Object, error) { + return a.client.CoreV1().Secrets(namespace).List(options) +} + func (a *SecretAdapter) FedUpdate(obj pkgruntime.Object) (pkgruntime.Object, error) { secret := obj.(*apiv1.Secret) return a.client.CoreV1().Secrets(secret.Namespace).Update(secret) } -func (a *SecretAdapter) FedDelete(namespacedName types.NamespacedName, options *metav1.DeleteOptions) error { - return a.client.CoreV1().Secrets(namespacedName.Namespace).Delete(namespacedName.Name, options) +func (a *SecretAdapter) FedWatch(namespace string, options metav1.ListOptions) (watch.Interface, error) { + return a.client.CoreV1().Secrets(namespace).Watch(options) } -func (a *SecretAdapter) ClusterGet(client clientset.Interface, namespacedName types.NamespacedName) (pkgruntime.Object, error) { +func (a *SecretAdapter) ClusterCreate(client kubeclientset.Interface, obj pkgruntime.Object) (pkgruntime.Object, error) { + secret := obj.(*apiv1.Secret) + return client.CoreV1().Secrets(secret.Namespace).Create(secret) +} + +func (a *SecretAdapter) ClusterDelete(client kubeclientset.Interface, nsName types.NamespacedName, options *metav1.DeleteOptions) error { + return client.CoreV1().Secrets(nsName.Namespace).Delete(nsName.Name, options) +} + +func (a *SecretAdapter) ClusterGet(client kubeclientset.Interface, namespacedName types.NamespacedName) (pkgruntime.Object, error) { return client.CoreV1().Secrets(namespacedName.Namespace).Get(namespacedName.Name, metav1.GetOptions{}) } +func (a *SecretAdapter) ClusterList(client kubeclientset.Interface, namespace string, options metav1.ListOptions) (pkgruntime.Object, error) { + return client.CoreV1().Secrets(namespace).List(options) +} + +func (a *SecretAdapter) ClusterUpdate(client kubeclientset.Interface, obj pkgruntime.Object) (pkgruntime.Object, error) { + secret := obj.(*apiv1.Secret) + return client.CoreV1().Secrets(secret.Namespace).Update(secret) +} + +func (a *SecretAdapter) ClusterWatch(client kubeclientset.Interface, namespace string, options metav1.ListOptions) (watch.Interface, error) { + return client.CoreV1().Secrets(namespace).Watch(options) +} + func (a *SecretAdapter) NewTestObject(namespace string) pkgruntime.Object { return &apiv1.Secret{ ObjectMeta: metav1.ObjectMeta{