diff --git a/federation/pkg/federation-controller/sync/BUILD b/federation/pkg/federation-controller/sync/BUILD index 710519cfeff..7b4870c2bd8 100644 --- a/federation/pkg/federation-controller/sync/BUILD +++ b/federation/pkg/federation-controller/sync/BUILD @@ -27,12 +27,14 @@ go_library( "//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", "//vendor/k8s.io/apimachinery/pkg/runtime:go_default_library", "//vendor/k8s.io/apimachinery/pkg/types:go_default_library", + "//vendor/k8s.io/apimachinery/pkg/util/wait:go_default_library", "//vendor/k8s.io/apimachinery/pkg/watch:go_default_library", "//vendor/k8s.io/client-go/pkg/api/v1:go_default_library", "//vendor/k8s.io/client-go/rest:go_default_library", "//vendor/k8s.io/client-go/tools/cache:go_default_library", "//vendor/k8s.io/client-go/tools/record:go_default_library", "//vendor/k8s.io/client-go/util/flowcontrol:go_default_library", + "//vendor/k8s.io/client-go/util/workqueue:go_default_library", ], ) diff --git a/federation/pkg/federation-controller/sync/controller.go b/federation/pkg/federation-controller/sync/controller.go index 0ce7fa5af6f..91833ae9ddc 100644 --- a/federation/pkg/federation-controller/sync/controller.go +++ b/federation/pkg/federation-controller/sync/controller.go @@ -25,12 +25,14 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" pkgruntime "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/types" + "k8s.io/apimachinery/pkg/util/wait" "k8s.io/apimachinery/pkg/watch" clientv1 "k8s.io/client-go/pkg/api/v1" restclient "k8s.io/client-go/rest" "k8s.io/client-go/tools/cache" "k8s.io/client-go/tools/record" "k8s.io/client-go/util/flowcontrol" + "k8s.io/client-go/util/workqueue" federationapi "k8s.io/kubernetes/federation/apis/federation/v1beta1" federationclientset "k8s.io/kubernetes/federation/client/clientset_generated/federation_clientset" "k8s.io/kubernetes/federation/pkg/federatedtypes" @@ -70,6 +72,9 @@ type FederationSyncController struct { // Informer controller for resources that should be federated. controller cache.Controller + // Work queue allowing parallel processing of resources + workQueue workqueue.Interface + // Backoff manager backoff *flowcontrol.Backoff @@ -110,6 +115,7 @@ func newFederationSyncController(client federationclientset.Interface, adapter f clusterAvailableDelay: time.Second * 20, smallDelay: time.Second * 3, updateTimeout: time.Second * 30, + workQueue: workqueue.New(), backoff: flowcontrol.NewBackOff(5*time.Second, time.Minute), eventRecorder: recorder, adapter: adapter, @@ -215,17 +221,55 @@ func (s *FederationSyncController) Run(stopChan <-chan struct{}) { go func() { <-stopChan s.informer.Stop() + s.workQueue.ShutDown() }() s.deliverer.StartWithHandler(func(item *util.DelayingDelivererItem) { - namespacedName := item.Value.(*types.NamespacedName) - s.reconcile(*namespacedName) + s.workQueue.Add(item) }) s.clusterDeliverer.StartWithHandler(func(_ *util.DelayingDelivererItem) { s.reconcileOnClusterChange() }) + + // TODO: Allow multiple workers. + go wait.Until(s.worker, time.Second, stopChan) + util.StartBackoffGC(s.backoff, stopChan) } +type reconciliationStatus int + +const ( + statusAllOK reconciliationStatus = iota + statusNeedsRecheck + statusError + statusNotSynced +) + +func (s *FederationSyncController) worker() { + for { + obj, quit := s.workQueue.Get() + if quit { + return + } + + item := obj.(*util.DelayingDelivererItem) + namespacedName := item.Value.(*types.NamespacedName) + status := s.reconcile(*namespacedName) + s.workQueue.Done(item) + + switch status { + case statusAllOK: + break + case statusError: + s.deliver(*namespacedName, 0, true) + case statusNeedsRecheck: + s.deliver(*namespacedName, s.reviewDelay, false) + case statusNotSynced: + s.deliver(*namespacedName, s.reviewDelay, false) + } + } +} + func (s *FederationSyncController) deliverObj(obj pkgruntime.Object, delay time.Duration, failed bool) { namespacedName := s.adapter.NamespacedName(obj) s.deliver(namespacedName, delay, failed) @@ -272,50 +316,46 @@ func (s *FederationSyncController) reconcileOnClusterChange() { } } -func (s *FederationSyncController) reconcile(namespacedName types.NamespacedName) { +func (s *FederationSyncController) reconcile(namespacedName types.NamespacedName) reconciliationStatus { if !s.isSynced() { - s.deliver(namespacedName, s.clusterAvailableDelay, false) - return + return statusNotSynced } key := namespacedName.String() kind := s.adapter.Kind() cachedObj, exist, err := s.store.GetByKey(key) if err != nil { - glog.Errorf("Failed to query main %s store for %v: %v", kind, key, err) - s.deliver(namespacedName, 0, true) - return + glog.Errorf("failed to query main %s store for %v: %v", kind, key, err) + return statusError } if !exist { // Not federated, ignoring. - return + return statusAllOK } // Create a copy before modifying the resource to prevent racing // with other readers. copiedObj, err := api.Scheme.DeepCopy(cachedObj) if err != nil { - glog.Errorf("Error in retrieving %s from store: %v", kind, err) - s.deliver(namespacedName, 0, true) - return + glog.Errorf("error in retrieving %s from store: %v", kind, err) + return statusError } if !s.adapter.IsExpectedType(copiedObj) { - glog.Errorf("Object is not the expected type: %v", copiedObj) - s.deliver(namespacedName, 0, true) - return + glog.Errorf("object is not the expected type: %v", copiedObj) + return statusError } obj := copiedObj.(pkgruntime.Object) meta := s.adapter.ObjectMeta(obj) if meta.DeletionTimestamp != nil { if err := s.delete(obj, namespacedName); err != nil { - glog.Errorf("Failed to delete %s %s: %v", kind, namespacedName, err) s.eventRecorder.Eventf(obj, api.EventTypeWarning, "DeleteFailed", "%s delete failed: %v", strings.ToTitle(kind), err) - s.deliver(namespacedName, 0, true) + glog.Errorf("failed to delete %s %s: %v", kind, namespacedName, err) + return statusError } - return + return statusAllOK } glog.V(3).Infof("Ensuring delete object from underlying clusters finalizer for %s: %s", @@ -323,28 +363,25 @@ func (s *FederationSyncController) reconcile(namespacedName types.NamespacedName // Add the required finalizers before creating the resource in underlying clusters. obj, err = s.deletionHelper.EnsureFinalizers(obj) if err != nil { - glog.Errorf("Failed to ensure delete object from underlying clusters finalizer in %s %s: %v", + glog.Errorf("failed to ensure delete object from underlying clusters finalizer in %s %s: %v", kind, namespacedName, err) - s.deliver(namespacedName, 0, false) - return + return statusError } glog.V(3).Infof("Syncing %s %s in underlying clusters", kind, namespacedName) clusters, err := s.informer.GetReadyClusters() if err != nil { - glog.Errorf("Failed to get cluster list: %v", err) - s.deliver(namespacedName, s.clusterAvailableDelay, false) - return + glog.Errorf("failed to get cluster list: %v", err) + return statusNotSynced } operations := make([]util.FederatedOperation, 0) for _, cluster := range clusters { clusterObj, found, err := s.informer.GetTargetStore().GetByKey(cluster.Name, key) if err != nil { - glog.Errorf("Failed to get %s from %s: %v", key, cluster.Name, err) - s.deliver(namespacedName, 0, true) - return + glog.Errorf("failed to get %s from %s: %v", key, cluster.Name, err) + return statusError } // The data should not be modified. @@ -374,18 +411,17 @@ func (s *FederationSyncController) reconcile(namespacedName types.NamespacedName if len(operations) == 0 { // Everything is in order - return + return statusAllOK } + err = s.updater.Update(operations, s.updateTimeout) - if err != nil { - glog.Errorf("Failed to execute updates for %s: %v", key, err) - s.deliver(namespacedName, 0, true) - return + glog.Errorf("failed to execute updates for %s: %v", key, err) + return statusError } - // Evertyhing is in order but lets be double sure - s.deliver(namespacedName, s.reviewDelay, false) + // Evertyhing is in order but let's be double sure + return statusNeedsRecheck } // delete deletes the given resource or returns error if the deletion was not complete.