Merge pull request #41050 from marun/fed-ctlr-refactor

Automatic merge from submit-queue

[Federation] refactor controller for secrets

First pass of a simple federation controller that can be configured via an adapter to support multiple types.  The controller is a strict refactor of the secrets controller.

Targets #40989 

cc: @kubernetes/sig-federation-pr-reviews
This commit is contained in:
Kubernetes Submit Queue 2017-04-07 01:38:34 -07:00 committed by GitHub
commit fce31358c5
6 changed files with 277 additions and 202 deletions

View File

@ -18,6 +18,7 @@ go_library(
"//federation/pkg/federation-controller/util:go_default_library",
"//federation/pkg/federation-controller/util/deletionhelper:go_default_library",
"//federation/pkg/federation-controller/util/eventsink:go_default_library",
"//federation/pkg/typeadapters:go_default_library",
"//pkg/api:go_default_library",
"//pkg/api/v1:go_default_library",
"//pkg/client/clientset_generated/clientset:go_default_library",

View File

@ -18,6 +18,7 @@ package secret
import (
"fmt"
"strings"
"time"
"k8s.io/apimachinery/pkg/api/errors"
@ -36,6 +37,7 @@ import (
"k8s.io/kubernetes/federation/pkg/federation-controller/util"
"k8s.io/kubernetes/federation/pkg/federation-controller/util/deletionhelper"
"k8s.io/kubernetes/federation/pkg/federation-controller/util/eventsink"
"k8s.io/kubernetes/federation/pkg/typeadapters"
"k8s.io/kubernetes/pkg/api"
apiv1 "k8s.io/kubernetes/pkg/api/v1"
kubeclientset "k8s.io/kubernetes/pkg/client/clientset_generated/clientset"
@ -53,185 +55,194 @@ var (
RequiredResources = []schema.GroupVersionResource{apiv1.SchemeGroupVersion.WithResource("secrets")}
)
type SecretController struct {
// For triggering single secret reconciliation. This is used when there is an
// add/update/delete operation on a secret in either federated API server or
// in some member of the federation.
secretDeliverer *util.DelayingDeliverer
// FederationSyncController synchronizes the state of a federated type
// to clusters that are members of the federation.
type FederationSyncController struct {
// For triggering reconciliation of a single resource. This is
// used when there is an add/update/delete operation on a resource
// in either federated API server or in some member of the
// federation.
deliverer *util.DelayingDeliverer
// For triggering all secrets reconciliation. This is used when
// a new cluster becomes available.
// For triggering reconciliation of all target resources. This is
// used when a new cluster becomes available.
clusterDeliverer *util.DelayingDeliverer
// Contains secrets present in members of federation.
secretFederatedInformer util.FederatedInformer
// Contains resources present in members of federation.
informer util.FederatedInformer
// For updating members of federation.
federatedUpdater util.FederatedUpdater
// Definitions of secrets that should be federated.
secretInformerStore cache.Store
// Informer controller for secrets that should be federated.
secretInformerController cache.Controller
updater util.FederatedUpdater
// Definitions of resources that should be federated.
store cache.Store
// Informer controller for resources that should be federated.
controller cache.Controller
// Client to federated api server.
federatedApiClient federationclientset.Interface
// Backoff manager for secrets
secretBackoff *flowcontrol.Backoff
// Backoff manager
backoff *flowcontrol.Backoff
// For events
eventRecorder record.EventRecorder
deletionHelper *deletionhelper.DeletionHelper
secretReviewDelay time.Duration
reviewDelay time.Duration
clusterAvailableDelay time.Duration
smallDelay time.Duration
updateTimeout time.Duration
adapter typeadapters.FederatedTypeAdapter
}
// StartSecretController starts a new secret controller
func StartSecretController(config *restclient.Config, stopChan <-chan struct{}, minimizeLatency bool) {
restclient.AddUserAgent(config, "secret-controller")
client := federationclientset.NewForConfigOrDie(config)
controller := newSecretController(client)
if minimizeLatency {
controller.minimizeLatency()
}
glog.Infof("Starting Secret controller")
controller.Run(stopChan)
startFederationSyncController(&typeadapters.SecretAdapter{}, config, stopChan, minimizeLatency)
}
// newSecretController returns a new secret controller
func newSecretController(client federationclientset.Interface) *SecretController {
func newSecretController(client federationclientset.Interface) *FederationSyncController {
return newFederationSyncController(client, typeadapters.NewSecretAdapter(client))
}
// startFederationSyncController starts a new sync controller for the given type adapter
func startFederationSyncController(adapter typeadapters.FederatedTypeAdapter, config *restclient.Config, stopChan <-chan struct{}, minimizeLatency bool) {
restclient.AddUserAgent(config, fmt.Sprintf("%s-controller", adapter.Kind()))
client := federationclientset.NewForConfigOrDie(config)
adapter.SetClient(client)
controller := newFederationSyncController(client, adapter)
if minimizeLatency {
controller.minimizeLatency()
}
glog.Infof(fmt.Sprintf("Starting federated sync controller for %s resources", adapter.Kind()))
controller.Run(stopChan)
}
// newFederationSyncController returns a new sync controller for the given client and type adapter
func newFederationSyncController(client federationclientset.Interface, adapter typeadapters.FederatedTypeAdapter) *FederationSyncController {
broadcaster := record.NewBroadcaster()
broadcaster.StartRecordingToSink(eventsink.NewFederatedEventSink(client))
recorder := broadcaster.NewRecorder(api.Scheme, clientv1.EventSource{Component: "federated-secrets-controller"})
recorder := broadcaster.NewRecorder(api.Scheme, clientv1.EventSource{Component: fmt.Sprintf("federated-%v-controller", adapter.Kind())})
secretcontroller := &SecretController{
federatedApiClient: client,
secretReviewDelay: time.Second * 10,
s := &FederationSyncController{
reviewDelay: time.Second * 10,
clusterAvailableDelay: time.Second * 20,
smallDelay: time.Second * 3,
updateTimeout: time.Second * 30,
secretBackoff: flowcontrol.NewBackOff(5*time.Second, time.Minute),
backoff: flowcontrol.NewBackOff(5*time.Second, time.Minute),
eventRecorder: recorder,
adapter: adapter,
}
// Build delivereres for triggering reconciliations.
secretcontroller.secretDeliverer = util.NewDelayingDeliverer()
secretcontroller.clusterDeliverer = util.NewDelayingDeliverer()
s.deliverer = util.NewDelayingDeliverer()
s.clusterDeliverer = util.NewDelayingDeliverer()
// Start informer in federated API servers on secrets that should be federated.
secretcontroller.secretInformerStore, secretcontroller.secretInformerController = cache.NewInformer(
// Start informer in federated API servers on the resource type that should be federated.
s.store, s.controller = cache.NewInformer(
&cache.ListWatch{
ListFunc: func(options metav1.ListOptions) (pkgruntime.Object, error) {
return client.Core().Secrets(metav1.NamespaceAll).List(options)
return adapter.FedList(metav1.NamespaceAll, options)
},
WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
return client.Core().Secrets(metav1.NamespaceAll).Watch(options)
return adapter.FedWatch(metav1.NamespaceAll, options)
},
},
&apiv1.Secret{},
adapter.ObjectType(),
controller.NoResyncPeriodFunc(),
util.NewTriggerOnAllChanges(func(obj pkgruntime.Object) { secretcontroller.deliverSecretObj(obj, 0, false) }))
util.NewTriggerOnAllChanges(func(obj pkgruntime.Object) { s.deliverObj(obj, 0, false) }))
// Federated informer on secrets in members of federation.
secretcontroller.secretFederatedInformer = util.NewFederatedInformer(
// Federated informer on the resource type in members of federation.
s.informer = util.NewFederatedInformer(
client,
func(cluster *federationapi.Cluster, targetClient kubeclientset.Interface) (cache.Store, cache.Controller) {
return cache.NewInformer(
&cache.ListWatch{
ListFunc: func(options metav1.ListOptions) (pkgruntime.Object, error) {
return targetClient.Core().Secrets(metav1.NamespaceAll).List(options)
return adapter.ClusterList(targetClient, metav1.NamespaceAll, options)
},
WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
return targetClient.Core().Secrets(metav1.NamespaceAll).Watch(options)
return adapter.ClusterWatch(targetClient, metav1.NamespaceAll, options)
},
},
&apiv1.Secret{},
adapter.ObjectType(),
controller.NoResyncPeriodFunc(),
// Trigger reconciliation whenever something in federated cluster is changed. In most cases it
// would be just confirmation that some secret operation succeeded.
// would be just confirmation that some operation on the target resource type had succeeded.
util.NewTriggerOnAllChanges(
func(obj pkgruntime.Object) {
secretcontroller.deliverSecretObj(obj, secretcontroller.secretReviewDelay, false)
s.deliverObj(obj, s.reviewDelay, false)
},
))
},
&util.ClusterLifecycleHandlerFuncs{
ClusterAvailable: func(cluster *federationapi.Cluster) {
// When new cluster becomes available process all the secrets again.
secretcontroller.clusterDeliverer.DeliverAt(allClustersKey, nil, time.Now().Add(secretcontroller.clusterAvailableDelay))
// When new cluster becomes available process all the target resources again.
s.clusterDeliverer.DeliverAt(allClustersKey, nil, time.Now().Add(s.clusterAvailableDelay))
},
},
)
// Federated updeater along with Create/Update/Delete operations.
secretcontroller.federatedUpdater = util.NewFederatedUpdater(secretcontroller.secretFederatedInformer,
s.updater = util.NewFederatedUpdater(s.informer,
func(client kubeclientset.Interface, obj pkgruntime.Object) error {
secret := obj.(*apiv1.Secret)
_, err := client.Core().Secrets(secret.Namespace).Create(secret)
_, err := adapter.ClusterCreate(client, obj)
return err
},
func(client kubeclientset.Interface, obj pkgruntime.Object) error {
secret := obj.(*apiv1.Secret)
_, err := client.Core().Secrets(secret.Namespace).Update(secret)
_, err := adapter.ClusterUpdate(client, obj)
return err
},
func(client kubeclientset.Interface, obj pkgruntime.Object) error {
secret := obj.(*apiv1.Secret)
namespacedName := adapter.NamespacedName(obj)
orphanDependents := false
err := client.Core().Secrets(secret.Namespace).Delete(secret.Name, &metav1.DeleteOptions{OrphanDependents: &orphanDependents})
err := adapter.ClusterDelete(client, namespacedName, &metav1.DeleteOptions{OrphanDependents: &orphanDependents})
return err
})
secretcontroller.deletionHelper = deletionhelper.NewDeletionHelper(
secretcontroller.hasFinalizerFunc,
secretcontroller.removeFinalizerFunc,
secretcontroller.addFinalizerFunc,
s.deletionHelper = deletionhelper.NewDeletionHelper(
s.hasFinalizerFunc,
s.removeFinalizerFunc,
s.addFinalizerFunc,
// objNameFunc
func(obj pkgruntime.Object) string {
secret := obj.(*apiv1.Secret)
return secret.Name
return adapter.ObjectMeta(obj).Name
},
secretcontroller.updateTimeout,
secretcontroller.eventRecorder,
secretcontroller.secretFederatedInformer,
secretcontroller.federatedUpdater,
s.updateTimeout,
s.eventRecorder,
s.informer,
s.updater,
)
return secretcontroller
return s
}
// minimizeLatency reduces delays and timeouts to make the controller more responsive (useful for testing).
func (secretcontroller *SecretController) minimizeLatency() {
secretcontroller.clusterAvailableDelay = time.Second
secretcontroller.secretReviewDelay = 50 * time.Millisecond
secretcontroller.smallDelay = 20 * time.Millisecond
secretcontroller.updateTimeout = 5 * time.Second
func (s *FederationSyncController) minimizeLatency() {
s.clusterAvailableDelay = time.Second
s.reviewDelay = 50 * time.Millisecond
s.smallDelay = 20 * time.Millisecond
s.updateTimeout = 5 * time.Second
}
// Returns true if the given object has the given finalizer in its ObjectMeta.
func (secretcontroller *SecretController) hasFinalizerFunc(obj pkgruntime.Object, finalizer string) bool {
secret := obj.(*apiv1.Secret)
for i := range secret.ObjectMeta.Finalizers {
if string(secret.ObjectMeta.Finalizers[i]) == finalizer {
func (s *FederationSyncController) hasFinalizerFunc(obj pkgruntime.Object, finalizer string) bool {
meta := s.adapter.ObjectMeta(obj)
for i := range meta.Finalizers {
if string(meta.Finalizers[i]) == finalizer {
return true
}
}
return false
}
// Removes the finalizers from the given objects ObjectMeta.
// Assumes that the given object is a secret.
func (secretcontroller *SecretController) removeFinalizerFunc(obj pkgruntime.Object, finalizers []string) (pkgruntime.Object, error) {
secret := obj.(*apiv1.Secret)
// Removes the finalizer from the given objects ObjectMeta.
func (s *FederationSyncController) removeFinalizerFunc(obj pkgruntime.Object, finalizers []string) (pkgruntime.Object, error) {
meta := s.adapter.ObjectMeta(obj)
newFinalizers := []string{}
hasFinalizer := false
for i := range secret.ObjectMeta.Finalizers {
if !deletionhelper.ContainsString(finalizers, secret.ObjectMeta.Finalizers[i]) {
newFinalizers = append(newFinalizers, secret.ObjectMeta.Finalizers[i])
for i := range meta.Finalizers {
if !deletionhelper.ContainsString(finalizers, meta.Finalizers[i]) {
newFinalizers = append(newFinalizers, meta.Finalizers[i])
} else {
hasFinalizer = true
}
@ -240,184 +251,185 @@ func (secretcontroller *SecretController) removeFinalizerFunc(obj pkgruntime.Obj
// Nothing to do.
return obj, nil
}
secret.ObjectMeta.Finalizers = newFinalizers
secret, err := secretcontroller.federatedApiClient.Core().Secrets(secret.Namespace).Update(secret)
meta.Finalizers = newFinalizers
secret, err := s.adapter.FedUpdate(obj)
if err != nil {
return nil, fmt.Errorf("failed to remove finalizers %v from secret %s: %v", finalizers, secret.Name, err)
return nil, fmt.Errorf("failed to remove finalizers %v from %s %s: %v", finalizers, s.adapter.Kind(), meta.Name, err)
}
return secret, nil
}
// Adds the given finalizers to the given objects ObjectMeta.
// Assumes that the given object is a secret.
func (secretcontroller *SecretController) addFinalizerFunc(obj pkgruntime.Object, finalizers []string) (pkgruntime.Object, error) {
secret := obj.(*apiv1.Secret)
secret.ObjectMeta.Finalizers = append(secret.ObjectMeta.Finalizers, finalizers...)
secret, err := secretcontroller.federatedApiClient.Core().Secrets(secret.Namespace).Update(secret)
func (s *FederationSyncController) addFinalizerFunc(obj pkgruntime.Object, finalizers []string) (pkgruntime.Object, error) {
meta := s.adapter.ObjectMeta(obj)
meta.Finalizers = append(meta.Finalizers, finalizers...)
secret, err := s.adapter.FedUpdate(obj)
if err != nil {
return nil, fmt.Errorf("failed to add finalizers %v to secret %s: %v", finalizers, secret.Name, err)
return nil, fmt.Errorf("failed to add finalizers %v to %s %s: %v", finalizers, s.adapter.Kind(), meta.Name, err)
}
return secret, nil
}
func (secretcontroller *SecretController) Run(stopChan <-chan struct{}) {
go secretcontroller.secretInformerController.Run(stopChan)
secretcontroller.secretFederatedInformer.Start()
func (s *FederationSyncController) Run(stopChan <-chan struct{}) {
go s.controller.Run(stopChan)
s.informer.Start()
go func() {
<-stopChan
secretcontroller.secretFederatedInformer.Stop()
s.informer.Stop()
}()
secretcontroller.secretDeliverer.StartWithHandler(func(item *util.DelayingDelivererItem) {
secret := item.Value.(*types.NamespacedName)
secretcontroller.reconcileSecret(*secret)
s.deliverer.StartWithHandler(func(item *util.DelayingDelivererItem) {
namespacedName := item.Value.(*types.NamespacedName)
s.reconcile(*namespacedName)
})
secretcontroller.clusterDeliverer.StartWithHandler(func(_ *util.DelayingDelivererItem) {
secretcontroller.reconcileSecretsOnClusterChange()
s.clusterDeliverer.StartWithHandler(func(_ *util.DelayingDelivererItem) {
s.reconcileOnClusterChange()
})
util.StartBackoffGC(secretcontroller.secretBackoff, stopChan)
util.StartBackoffGC(s.backoff, stopChan)
}
func (secretcontroller *SecretController) deliverSecretObj(obj interface{}, delay time.Duration, failed bool) {
secret := obj.(*apiv1.Secret)
secretcontroller.deliverSecret(types.NamespacedName{Namespace: secret.Namespace, Name: secret.Name}, delay, failed)
func (s *FederationSyncController) deliverObj(obj pkgruntime.Object, delay time.Duration, failed bool) {
namespacedName := s.adapter.NamespacedName(obj)
s.deliver(namespacedName, delay, failed)
}
// Adds backoff to delay if this delivery is related to some failure. Resets backoff if there was no failure.
func (secretcontroller *SecretController) deliverSecret(secret types.NamespacedName, delay time.Duration, failed bool) {
key := secret.String()
func (s *FederationSyncController) deliver(namespacedName types.NamespacedName, delay time.Duration, failed bool) {
key := namespacedName.String()
if failed {
secretcontroller.secretBackoff.Next(key, time.Now())
delay = delay + secretcontroller.secretBackoff.Get(key)
s.backoff.Next(key, time.Now())
delay = delay + s.backoff.Get(key)
} else {
secretcontroller.secretBackoff.Reset(key)
s.backoff.Reset(key)
}
secretcontroller.secretDeliverer.DeliverAfter(key, &secret, delay)
s.deliverer.DeliverAfter(key, &namespacedName, delay)
}
// Check whether all data stores are in sync. False is returned if any of the informer/stores is not yet
// synced with the corresponding api server.
func (secretcontroller *SecretController) isSynced() bool {
if !secretcontroller.secretFederatedInformer.ClustersSynced() {
func (s *FederationSyncController) isSynced() bool {
if !s.informer.ClustersSynced() {
glog.V(2).Infof("Cluster list not synced")
return false
}
clusters, err := secretcontroller.secretFederatedInformer.GetReadyClusters()
clusters, err := s.informer.GetReadyClusters()
if err != nil {
glog.Errorf("Failed to get ready clusters: %v", err)
return false
}
if !secretcontroller.secretFederatedInformer.GetTargetStore().ClustersSynced(clusters) {
if !s.informer.GetTargetStore().ClustersSynced(clusters) {
return false
}
return true
}
// The function triggers reconciliation of all federated secrets.
func (secretcontroller *SecretController) reconcileSecretsOnClusterChange() {
if !secretcontroller.isSynced() {
secretcontroller.clusterDeliverer.DeliverAt(allClustersKey, nil, time.Now().Add(secretcontroller.clusterAvailableDelay))
// The function triggers reconciliation of all target federated resources.
func (s *FederationSyncController) reconcileOnClusterChange() {
if !s.isSynced() {
s.clusterDeliverer.DeliverAt(allClustersKey, nil, time.Now().Add(s.clusterAvailableDelay))
}
for _, obj := range secretcontroller.secretInformerStore.List() {
secret := obj.(*apiv1.Secret)
secretcontroller.deliverSecret(types.NamespacedName{Namespace: secret.Namespace, Name: secret.Name}, secretcontroller.smallDelay, false)
for _, obj := range s.store.List() {
namespacedName := s.adapter.NamespacedName(obj.(pkgruntime.Object))
s.deliver(namespacedName, s.smallDelay, false)
}
}
func (secretcontroller *SecretController) reconcileSecret(secret types.NamespacedName) {
if !secretcontroller.isSynced() {
secretcontroller.deliverSecret(secret, secretcontroller.clusterAvailableDelay, false)
func (s *FederationSyncController) reconcile(namespacedName types.NamespacedName) {
if !s.isSynced() {
s.deliver(namespacedName, s.clusterAvailableDelay, false)
return
}
key := secret.String()
baseSecretObjFromStore, exist, err := secretcontroller.secretInformerStore.GetByKey(key)
key := namespacedName.String()
kind := s.adapter.Kind()
cachedObj, exist, err := s.store.GetByKey(key)
if err != nil {
glog.Errorf("Failed to query main secret store for %v: %v", key, err)
secretcontroller.deliverSecret(secret, 0, true)
glog.Errorf("Failed to query main %s store for %v: %v", kind, key, err)
s.deliver(namespacedName, 0, true)
return
}
if !exist {
// Not federated secret, ignoring.
// Not federated, ignoring.
return
}
// Create a copy before modifying the obj to prevent race condition with
// other readers of obj from store.
baseSecretObj, err := api.Scheme.DeepCopy(baseSecretObjFromStore)
baseSecret, ok := baseSecretObj.(*apiv1.Secret)
if err != nil || !ok {
glog.Errorf("Error in retrieving obj from store: %v, %v", ok, err)
secretcontroller.deliverSecret(secret, 0, true)
// Create a copy before modifying the resource to prevent racing
// with other readers.
copiedObj, err := api.Scheme.DeepCopy(cachedObj)
if err != nil {
glog.Errorf("Error in retrieving %s from store: %v", kind, err)
s.deliver(namespacedName, 0, true)
return
}
if baseSecret.DeletionTimestamp != nil {
if err := secretcontroller.delete(baseSecret); err != nil {
glog.Errorf("Failed to delete %s: %v", secret, err)
secretcontroller.eventRecorder.Eventf(baseSecret, api.EventTypeNormal, "DeleteFailed",
"Secret delete failed: %v", err)
secretcontroller.deliverSecret(secret, 0, true)
if !s.adapter.IsExpectedType(copiedObj) {
glog.Errorf("Object is not the expected type: %v", copiedObj)
s.deliver(namespacedName, 0, true)
return
}
obj := copiedObj.(pkgruntime.Object)
meta := s.adapter.ObjectMeta(obj)
if meta.DeletionTimestamp != nil {
if err := s.delete(obj, namespacedName); err != nil {
glog.Errorf("Failed to delete %s %s: %v", kind, namespacedName, err)
s.eventRecorder.Eventf(obj, api.EventTypeNormal, "DeleteFailed",
"%s delete failed: %v", strings.ToTitle(kind), err)
s.deliver(namespacedName, 0, true)
}
return
}
glog.V(3).Infof("Ensuring delete object from underlying clusters finalizer for secret: %s",
baseSecret.Name)
// Add the required finalizers before creating a secret in underlying clusters.
updatedSecretObj, err := secretcontroller.deletionHelper.EnsureFinalizers(baseSecret)
glog.V(3).Infof("Ensuring delete object from underlying clusters finalizer for %s: %s",
kind, namespacedName)
// Add the required finalizers before creating the resource in underlying clusters.
obj, err = s.deletionHelper.EnsureFinalizers(obj)
if err != nil {
glog.Errorf("Failed to ensure delete object from underlying clusters finalizer in secret %s: %v",
baseSecret.Name, err)
secretcontroller.deliverSecret(secret, 0, false)
glog.Errorf("Failed to ensure delete object from underlying clusters finalizer in %s %s: %v",
kind, namespacedName, err)
s.deliver(namespacedName, 0, false)
return
}
baseSecret = updatedSecretObj.(*apiv1.Secret)
glog.V(3).Infof("Syncing secret %s in underlying clusters", baseSecret.Name)
glog.V(3).Infof("Syncing %s %s in underlying clusters", kind, namespacedName)
clusters, err := secretcontroller.secretFederatedInformer.GetReadyClusters()
clusters, err := s.informer.GetReadyClusters()
if err != nil {
glog.Errorf("Failed to get cluster list: %v", err)
secretcontroller.deliverSecret(secret, secretcontroller.clusterAvailableDelay, false)
s.deliver(namespacedName, s.clusterAvailableDelay, false)
return
}
operations := make([]util.FederatedOperation, 0)
for _, cluster := range clusters {
clusterSecretObj, found, err := secretcontroller.secretFederatedInformer.GetTargetStore().GetByKey(cluster.Name, key)
clusterObj, found, err := s.informer.GetTargetStore().GetByKey(cluster.Name, key)
if err != nil {
glog.Errorf("Failed to get %s from %s: %v", key, cluster.Name, err)
secretcontroller.deliverSecret(secret, 0, true)
s.deliver(namespacedName, 0, true)
return
}
// The data should not be modified.
desiredSecret := &apiv1.Secret{
ObjectMeta: util.DeepCopyRelevantObjectMeta(baseSecret.ObjectMeta),
Data: baseSecret.Data,
Type: baseSecret.Type,
}
desiredObj := s.adapter.Copy(obj)
if !found {
secretcontroller.eventRecorder.Eventf(baseSecret, api.EventTypeNormal, "CreateInCluster",
"Creating secret in cluster %s", cluster.Name)
s.eventRecorder.Eventf(obj, api.EventTypeNormal, "CreateInCluster",
"Creating %s in cluster %s", kind, cluster.Name)
operations = append(operations, util.FederatedOperation{
Type: util.OperationTypeAdd,
Obj: desiredSecret,
Obj: desiredObj,
ClusterName: cluster.Name,
})
} else {
clusterSecret := clusterSecretObj.(*apiv1.Secret)
clusterObj := clusterObj.(pkgruntime.Object)
// Update existing secret, if needed.
if !util.SecretEquivalent(*desiredSecret, *clusterSecret) {
secretcontroller.eventRecorder.Eventf(baseSecret, api.EventTypeNormal, "UpdateInCluster",
"Updating secret in cluster %s", cluster.Name)
// Update existing resource, if needed.
if !s.adapter.Equivalent(desiredObj, clusterObj) {
s.eventRecorder.Eventf(obj, api.EventTypeNormal, "UpdateInCluster",
"Updating %s in cluster %s", kind, cluster.Name)
operations = append(operations, util.FederatedOperation{
Type: util.OperationTypeUpdate,
Obj: desiredSecret,
Obj: desiredObj,
ClusterName: cluster.Name,
})
}
@ -428,37 +440,38 @@ func (secretcontroller *SecretController) reconcileSecret(secret types.Namespace
// Everything is in order
return
}
err = secretcontroller.federatedUpdater.UpdateWithOnError(operations, secretcontroller.updateTimeout,
err = s.updater.UpdateWithOnError(operations, s.updateTimeout,
func(op util.FederatedOperation, operror error) {
secretcontroller.eventRecorder.Eventf(baseSecret, api.EventTypeNormal, "UpdateInClusterFailed",
"Secret update in cluster %s failed: %v", op.ClusterName, operror)
s.eventRecorder.Eventf(obj, api.EventTypeNormal, "UpdateInClusterFailed",
"%s update in cluster %s failed: %v", strings.ToTitle(kind), op.ClusterName, operror)
})
if err != nil {
glog.Errorf("Failed to execute updates for %s: %v", key, err)
secretcontroller.deliverSecret(secret, 0, true)
s.deliver(namespacedName, 0, true)
return
}
// Evertyhing is in order but lets be double sure
secretcontroller.deliverSecret(secret, secretcontroller.secretReviewDelay, false)
s.deliver(namespacedName, s.reviewDelay, false)
}
// delete deletes the given secret or returns error if the deletion was not complete.
func (secretcontroller *SecretController) delete(secret *apiv1.Secret) error {
glog.V(3).Infof("Handling deletion of secret: %v", *secret)
_, err := secretcontroller.deletionHelper.HandleObjectInUnderlyingClusters(secret)
// delete deletes the given resource or returns error if the deletion was not complete.
func (s *FederationSyncController) delete(obj pkgruntime.Object, namespacedName types.NamespacedName) error {
kind := s.adapter.Kind()
glog.V(3).Infof("Handling deletion of %s: %v", kind, namespacedName)
_, err := s.deletionHelper.HandleObjectInUnderlyingClusters(obj)
if err != nil {
return err
}
err = secretcontroller.federatedApiClient.Core().Secrets(secret.Namespace).Delete(secret.Name, nil)
err = s.adapter.FedDelete(namespacedName, nil)
if err != nil {
// Its all good if the error is not found error. That means it is deleted already and we do not have to do anything.
// This is expected when we are processing an update as a result of secret finalizer deletion.
// The process that deleted the last finalizer is also going to delete the secret and we do not have to do anything.
// This is expected when we are processing an update as a result of finalizer deletion.
// The process that deleted the last finalizer is also going to delete the resource and we do not have to do anything.
if !errors.IsNotFound(err) {
return fmt.Errorf("failed to delete secret: %v", err)
return fmt.Errorf("failed to delete %s: %v", kind, err)
}
}
return nil

View File

@ -77,7 +77,7 @@ func TestSecretController(t *testing.T) {
return nil, fmt.Errorf("Unknown cluster")
}
}
setClientFactory(secretController.secretFederatedInformer, informerClientFactory)
setClientFactory(secretController.informer, informerClientFactory)
secretController.minimizeLatency()
@ -115,7 +115,7 @@ func TestSecretController(t *testing.T) {
// Wait for the secret to appear in the informer store
err := WaitForStoreUpdate(
secretController.secretFederatedInformer.GetTargetStore(),
secretController.informer.GetTargetStore(),
cluster1.Name, types.NamespacedName{Namespace: secret1.Namespace, Name: secret1.Name}.String(), wait.ForeverTestTimeout)
assert.Nil(t, err, "secret should have appeared in the informer store")
@ -146,7 +146,7 @@ func TestSecretController(t *testing.T) {
// Wait for the secret to be updated in the informer store.
err = WaitForSecretStoreUpdate(
secretController.secretFederatedInformer.GetTargetStore(),
secretController.informer.GetTargetStore(),
cluster1.Name, types.NamespacedName{Namespace: secret1.Namespace, Name: secret1.Name}.String(),
&secret1, wait.ForeverTestTimeout)
assert.NoError(t, err, "secret should have been updated in the informer store")

View File

@ -22,6 +22,7 @@ go_library(
"//vendor:k8s.io/apimachinery/pkg/apis/meta/v1",
"//vendor:k8s.io/apimachinery/pkg/runtime",
"//vendor:k8s.io/apimachinery/pkg/types",
"//vendor:k8s.io/apimachinery/pkg/watch",
],
)

View File

@ -20,8 +20,9 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
pkgruntime "k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/watch"
federationclientset "k8s.io/kubernetes/federation/client/clientset_generated/federation_clientset"
"k8s.io/kubernetes/pkg/client/clientset_generated/clientset"
kubeclientset "k8s.io/kubernetes/pkg/client/clientset_generated/clientset"
)
// FederatedTypeAdapter defines operations for interacting with a
@ -31,18 +32,28 @@ type FederatedTypeAdapter interface {
SetClient(client federationclientset.Interface)
Kind() string
ObjectType() pkgruntime.Object
IsExpectedType(obj interface{}) bool
Copy(obj pkgruntime.Object) pkgruntime.Object
Equivalent(obj1, obj2 pkgruntime.Object) bool
ObjectMeta(obj pkgruntime.Object) *metav1.ObjectMeta
NamespacedName(obj pkgruntime.Object) types.NamespacedName
ObjectMeta(obj pkgruntime.Object) *metav1.ObjectMeta
// Fed* operations target the federation control plane
FedCreate(obj pkgruntime.Object) (pkgruntime.Object, error)
FedGet(namespacedName types.NamespacedName) (pkgruntime.Object, error)
FedUpdate(obj pkgruntime.Object) (pkgruntime.Object, error)
FedDelete(namespacedName types.NamespacedName, options *metav1.DeleteOptions) error
FedGet(namespacedName types.NamespacedName) (pkgruntime.Object, error)
FedList(namespace string, options metav1.ListOptions) (pkgruntime.Object, error)
FedUpdate(obj pkgruntime.Object) (pkgruntime.Object, error)
FedWatch(namespace string, options metav1.ListOptions) (watch.Interface, error)
// The following operations are intended to target a cluster that is a member of a federation
ClusterGet(client clientset.Interface, namespacedName types.NamespacedName) (pkgruntime.Object, error)
ClusterCreate(client kubeclientset.Interface, obj pkgruntime.Object) (pkgruntime.Object, error)
ClusterDelete(client kubeclientset.Interface, nsName types.NamespacedName, options *metav1.DeleteOptions) error
ClusterGet(client kubeclientset.Interface, namespacedName types.NamespacedName) (pkgruntime.Object, error)
ClusterList(client kubeclientset.Interface, namespace string, options metav1.ListOptions) (pkgruntime.Object, error)
ClusterUpdate(client kubeclientset.Interface, obj pkgruntime.Object) (pkgruntime.Object, error)
ClusterWatch(client kubeclientset.Interface, namespace string, options metav1.ListOptions) (watch.Interface, error)
NewTestObject(namespace string) pkgruntime.Object
}

View File

@ -20,10 +20,11 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
pkgruntime "k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/watch"
federationclientset "k8s.io/kubernetes/federation/client/clientset_generated/federation_clientset"
"k8s.io/kubernetes/federation/pkg/federation-controller/util"
apiv1 "k8s.io/kubernetes/pkg/api/v1"
"k8s.io/kubernetes/pkg/client/clientset_generated/clientset"
kubeclientset "k8s.io/kubernetes/pkg/client/clientset_generated/clientset"
)
type SecretAdapter struct {
@ -42,43 +43,91 @@ func (a *SecretAdapter) Kind() string {
return "secret"
}
func (a *SecretAdapter) ObjectType() pkgruntime.Object {
return &apiv1.Secret{}
}
func (a *SecretAdapter) IsExpectedType(obj interface{}) bool {
_, ok := obj.(*apiv1.Secret)
return ok
}
func (a *SecretAdapter) Copy(obj pkgruntime.Object) pkgruntime.Object {
secret := obj.(*apiv1.Secret)
return &apiv1.Secret{
ObjectMeta: util.DeepCopyRelevantObjectMeta(secret.ObjectMeta),
Data: secret.Data,
Type: secret.Type,
}
}
func (a *SecretAdapter) Equivalent(obj1, obj2 pkgruntime.Object) bool {
secret1 := obj1.(*apiv1.Secret)
secret2 := obj2.(*apiv1.Secret)
return util.SecretEquivalent(*secret1, *secret2)
}
func (a *SecretAdapter) ObjectMeta(obj pkgruntime.Object) *metav1.ObjectMeta {
return &obj.(*apiv1.Secret).ObjectMeta
}
func (a *SecretAdapter) NamespacedName(obj pkgruntime.Object) types.NamespacedName {
secret := obj.(*apiv1.Secret)
return types.NamespacedName{Namespace: secret.Namespace, Name: secret.Name}
}
func (a *SecretAdapter) ObjectMeta(obj pkgruntime.Object) *metav1.ObjectMeta {
return &obj.(*apiv1.Secret).ObjectMeta
}
func (a *SecretAdapter) FedCreate(obj pkgruntime.Object) (pkgruntime.Object, error) {
secret := obj.(*apiv1.Secret)
return a.client.CoreV1().Secrets(secret.Namespace).Create(secret)
}
func (a *SecretAdapter) FedDelete(namespacedName types.NamespacedName, options *metav1.DeleteOptions) error {
return a.client.CoreV1().Secrets(namespacedName.Namespace).Delete(namespacedName.Name, options)
}
func (a *SecretAdapter) FedGet(namespacedName types.NamespacedName) (pkgruntime.Object, error) {
return a.client.CoreV1().Secrets(namespacedName.Namespace).Get(namespacedName.Name, metav1.GetOptions{})
}
func (a *SecretAdapter) FedList(namespace string, options metav1.ListOptions) (pkgruntime.Object, error) {
return a.client.CoreV1().Secrets(namespace).List(options)
}
func (a *SecretAdapter) FedUpdate(obj pkgruntime.Object) (pkgruntime.Object, error) {
secret := obj.(*apiv1.Secret)
return a.client.CoreV1().Secrets(secret.Namespace).Update(secret)
}
func (a *SecretAdapter) FedDelete(namespacedName types.NamespacedName, options *metav1.DeleteOptions) error {
return a.client.CoreV1().Secrets(namespacedName.Namespace).Delete(namespacedName.Name, options)
func (a *SecretAdapter) FedWatch(namespace string, options metav1.ListOptions) (watch.Interface, error) {
return a.client.CoreV1().Secrets(namespace).Watch(options)
}
func (a *SecretAdapter) ClusterGet(client clientset.Interface, namespacedName types.NamespacedName) (pkgruntime.Object, error) {
func (a *SecretAdapter) ClusterCreate(client kubeclientset.Interface, obj pkgruntime.Object) (pkgruntime.Object, error) {
secret := obj.(*apiv1.Secret)
return client.CoreV1().Secrets(secret.Namespace).Create(secret)
}
func (a *SecretAdapter) ClusterDelete(client kubeclientset.Interface, nsName types.NamespacedName, options *metav1.DeleteOptions) error {
return client.CoreV1().Secrets(nsName.Namespace).Delete(nsName.Name, options)
}
func (a *SecretAdapter) ClusterGet(client kubeclientset.Interface, namespacedName types.NamespacedName) (pkgruntime.Object, error) {
return client.CoreV1().Secrets(namespacedName.Namespace).Get(namespacedName.Name, metav1.GetOptions{})
}
func (a *SecretAdapter) ClusterList(client kubeclientset.Interface, namespace string, options metav1.ListOptions) (pkgruntime.Object, error) {
return client.CoreV1().Secrets(namespace).List(options)
}
func (a *SecretAdapter) ClusterUpdate(client kubeclientset.Interface, obj pkgruntime.Object) (pkgruntime.Object, error) {
secret := obj.(*apiv1.Secret)
return client.CoreV1().Secrets(secret.Namespace).Update(secret)
}
func (a *SecretAdapter) ClusterWatch(client kubeclientset.Interface, namespace string, options metav1.ListOptions) (watch.Interface, error) {
return client.CoreV1().Secrets(namespace).Watch(options)
}
func (a *SecretAdapter) NewTestObject(namespace string) pkgruntime.Object {
return &apiv1.Secret{
ObjectMeta: metav1.ObjectMeta{