From 547ece5b839b90d19633a8a735a2ab7d8a7ede7a Mon Sep 17 00:00:00 2001 From: Maru Newby Date: Thu, 11 May 2017 11:33:36 -0700 Subject: [PATCH] fed: Refactor sync controller's reconcile method --- .../pkg/federation-controller/sync/BUILD | 2 + .../federation-controller/sync/controller.go | 164 ++++++++++-------- .../sync/controller_test.go | 80 +++++++++ 3 files changed, 171 insertions(+), 75 deletions(-) create mode 100644 federation/pkg/federation-controller/sync/controller_test.go diff --git a/federation/pkg/federation-controller/sync/BUILD b/federation/pkg/federation-controller/sync/BUILD index 7b4870c2bd8..d7f595e4950 100644 --- a/federation/pkg/federation-controller/sync/BUILD +++ b/federation/pkg/federation-controller/sync/BUILD @@ -55,6 +55,7 @@ go_test( name = "go_default_test", srcs = [ "configmap_controller_test.go", + "controller_test.go", "daemonset_controller_test.go", "secret_controller_test.go", ], @@ -73,6 +74,7 @@ go_test( "//pkg/client/clientset_generated/clientset/fake:go_default_library", "//vendor/github.com/golang/glog:go_default_library", "//vendor/github.com/stretchr/testify/assert:go_default_library", + "//vendor/github.com/stretchr/testify/require:go_default_library", "//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", "//vendor/k8s.io/apimachinery/pkg/runtime:go_default_library", "//vendor/k8s.io/apimachinery/pkg/types:go_default_library", diff --git a/federation/pkg/federation-controller/sync/controller.go b/federation/pkg/federation-controller/sync/controller.go index 651bd6c5259..e1aca959d0b 100644 --- a/federation/pkg/federation-controller/sync/controller.go +++ b/federation/pkg/federation-controller/sync/controller.go @@ -18,7 +18,6 @@ package sync import ( "fmt" - "strings" "time" "k8s.io/apimachinery/pkg/api/errors" @@ -217,11 +216,6 @@ func (s *FederationSyncController) updateObject(obj pkgruntime.Object) (pkgrunti func (s *FederationSyncController) Run(stopChan <-chan struct{}) { go s.controller.Run(stopChan) s.informer.Start() - go func() { - <-stopChan - s.informer.Stop() - s.workQueue.ShutDown() - }() s.deliverer.StartWithHandler(func(item *util.DelayingDelivererItem) { s.workQueue.Add(item) }) @@ -233,6 +227,15 @@ func (s *FederationSyncController) Run(stopChan <-chan struct{}) { go wait.Until(s.worker, time.Second, stopChan) util.StartBackoffGC(s.backoff, stopChan) + + // Ensure all goroutines are cleaned up when the stop channel closes + go func() { + <-stopChan + s.informer.Stop() + s.workQueue.ShutDown() + s.deliverer.Stop() + s.clusterDeliverer.Stop() + }() } type reconciliationStatus int @@ -264,7 +267,7 @@ func (s *FederationSyncController) worker() { case statusNeedsRecheck: s.deliver(*namespacedName, s.reviewDelay, false) case statusNotSynced: - s.deliver(*namespacedName, s.reviewDelay, false) + s.deliver(*namespacedName, s.clusterAvailableDelay, false) } } } @@ -320,102 +323,60 @@ func (s *FederationSyncController) reconcile(namespacedName types.NamespacedName return statusNotSynced } - key := namespacedName.String() kind := s.adapter.Kind() - cachedObj, exist, err := s.store.GetByKey(key) + key := namespacedName.String() + + obj, err := s.objFromCache(kind, key) if err != nil { - glog.Errorf("failed to query main %s store for %v: %v", kind, key, err) + glog.Error(err) return statusError } - - if !exist { - // Not federated, ignoring. + if obj == nil { return statusAllOK } - // Create a copy before modifying the resource to prevent racing - // with other readers. - copiedObj, err := api.Scheme.DeepCopy(cachedObj) - if err != nil { - glog.Errorf("error in retrieving %s from store: %v", kind, err) - return statusError - } - if !s.adapter.IsExpectedType(copiedObj) { - glog.Errorf("object is not the expected type: %v", copiedObj) - return statusError - } - obj := copiedObj.(pkgruntime.Object) meta := s.adapter.ObjectMeta(obj) - if meta.DeletionTimestamp != nil { - if err := s.delete(obj, namespacedName); err != nil { - s.eventRecorder.Eventf(obj, api.EventTypeWarning, "DeleteFailed", - "%s delete failed: %v", strings.ToTitle(kind), err) - glog.Errorf("failed to delete %s %s: %v", kind, namespacedName, err) + err := s.delete(obj, kind, namespacedName) + if err != nil { + msg := "Failed to delete %s %q: %v" + args := []interface{}{kind, namespacedName, err} + glog.Errorf(msg, args...) + s.eventRecorder.Eventf(obj, api.EventTypeWarning, "DeleteFailed", msg, args...) return statusError } return statusAllOK } - glog.V(3).Infof("Ensuring delete object from underlying clusters finalizer for %s: %s", - kind, namespacedName) - // Add the required finalizers before creating the resource in underlying clusters. + glog.V(3).Infof("Ensuring finalizers for %s %q", kind, key) obj, err = s.deletionHelper.EnsureFinalizers(obj) if err != nil { - glog.Errorf("failed to ensure delete object from underlying clusters finalizer in %s %s: %v", - kind, namespacedName, err) + glog.Errorf("Failed to ensure finalizers for %s %q: %v", kind, key, err) return statusError } - glog.V(3).Infof("Syncing %s %s in underlying clusters", kind, namespacedName) + glog.V(3).Infof("Syncing %s %q in underlying clusters", kind, key) clusters, err := s.informer.GetReadyClusters() if err != nil { - glog.Errorf("failed to get cluster list: %v", err) + glog.Errorf("Failed to get cluster list: %v", err) return statusNotSynced } - operations := make([]util.FederatedOperation, 0) - for _, cluster := range clusters { - clusterObj, found, err := s.informer.GetTargetStore().GetByKey(cluster.Name, key) - if err != nil { - glog.Errorf("failed to get %s from %s: %v", key, cluster.Name, err) - return statusError - } - - // The data should not be modified. - desiredObj := s.adapter.Copy(obj) - - if !found { - operations = append(operations, util.FederatedOperation{ - Type: util.OperationTypeAdd, - Obj: desiredObj, - ClusterName: cluster.Name, - Key: key, - }) - } else { - clusterObj := clusterObj.(pkgruntime.Object) - - // Update existing resource, if needed. - if !s.adapter.Equivalent(desiredObj, clusterObj) { - operations = append(operations, util.FederatedOperation{ - Type: util.OperationTypeUpdate, - Obj: desiredObj, - ClusterName: cluster.Name, - Key: key, - }) - } - } + operations, err := clusterOperations(s.adapter, clusters, obj, key, func(clusterName string) (interface{}, bool, error) { + return s.informer.GetTargetStore().GetByKey(clusterName, key) + }) + if err != nil { + glog.Error(err) + return statusError } - if len(operations) == 0 { - // Everything is in order return statusAllOK } err = s.updater.Update(operations) if err != nil { - glog.Errorf("failed to execute updates for %s: %v", key, err) + glog.Errorf("Failed to execute updates for %s %q: %v", kind, key, err) return statusError } @@ -423,10 +384,29 @@ func (s *FederationSyncController) reconcile(namespacedName types.NamespacedName return statusNeedsRecheck } +func (s *FederationSyncController) objFromCache(kind, key string) (pkgruntime.Object, error) { + cachedObj, exist, err := s.store.GetByKey(key) + if err != nil { + return nil, fmt.Errorf("Failed to query %s store for %q: %v", kind, key, err) + } + if !exist { + return nil, nil + } + + // Create a copy before modifying the resource to prevent racing with other readers. + copiedObj, err := api.Scheme.DeepCopy(cachedObj) + if err != nil { + return nil, fmt.Errorf("Error in retrieving %s %q from store: %v", kind, key, err) + } + if !s.adapter.IsExpectedType(copiedObj) { + return nil, fmt.Errorf("Object is not the expected type: %v", copiedObj) + } + return copiedObj.(pkgruntime.Object), nil +} + // delete deletes the given resource or returns error if the deletion was not complete. -func (s *FederationSyncController) delete(obj pkgruntime.Object, namespacedName types.NamespacedName) error { - kind := s.adapter.Kind() - glog.V(3).Infof("Handling deletion of %s: %v", kind, namespacedName) +func (s *FederationSyncController) delete(obj pkgruntime.Object, kind string, namespacedName types.NamespacedName) error { + glog.V(3).Infof("Handling deletion of %s %q", kind, namespacedName) _, err := s.deletionHelper.HandleObjectInUnderlyingClusters(obj) if err != nil { return err @@ -438,8 +418,42 @@ func (s *FederationSyncController) delete(obj pkgruntime.Object, namespacedName // This is expected when we are processing an update as a result of finalizer deletion. // The process that deleted the last finalizer is also going to delete the resource and we do not have to do anything. if !errors.IsNotFound(err) { - return fmt.Errorf("failed to delete %s: %v", kind, err) + return err } } return nil } + +type clusterAccessor func(clusterName string) (interface{}, bool, error) + +// clusterOperations returns the list of operations needed to synchronize the state of the given object to the provided clusters +func clusterOperations(adapter federatedtypes.FederatedTypeAdapter, clusters []*federationapi.Cluster, obj pkgruntime.Object, key string, accessor clusterAccessor) ([]util.FederatedOperation, error) { + operations := make([]util.FederatedOperation, 0) + for _, cluster := range clusters { + clusterObj, found, err := accessor(cluster.Name) + if err != nil { + return nil, fmt.Errorf("Failed to get %s %q from cluster %q: %v", adapter.Kind(), key, cluster.Name, err) + } + // The data should not be modified. + desiredObj := adapter.Copy(obj) + + var operationType util.FederatedOperationType = "" + if found { + clusterObj := clusterObj.(pkgruntime.Object) + if !adapter.Equivalent(desiredObj, clusterObj) { + operationType = util.OperationTypeUpdate + } + } else { + operationType = util.OperationTypeAdd + } + if len(operationType) > 0 { + operations = append(operations, util.FederatedOperation{ + Type: operationType, + Obj: desiredObj, + ClusterName: cluster.Name, + Key: key, + }) + } + } + return operations, nil +} diff --git a/federation/pkg/federation-controller/sync/controller_test.go b/federation/pkg/federation-controller/sync/controller_test.go new file mode 100644 index 00000000000..e67db423085 --- /dev/null +++ b/federation/pkg/federation-controller/sync/controller_test.go @@ -0,0 +1,80 @@ +/* +Copyright 2017 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package sync + +import ( + "fmt" + "testing" + + pkgruntime "k8s.io/apimachinery/pkg/runtime" + federationapi "k8s.io/kubernetes/federation/apis/federation/v1beta1" + "k8s.io/kubernetes/federation/pkg/federatedtypes" + "k8s.io/kubernetes/federation/pkg/federation-controller/util" + fedtest "k8s.io/kubernetes/federation/pkg/federation-controller/util/test" + apiv1 "k8s.io/kubernetes/pkg/api/v1" + + "github.com/stretchr/testify/require" +) + +func TestClusterOperations(t *testing.T) { + adapter := &federatedtypes.SecretAdapter{} + obj := adapter.NewTestObject("foo") + differingObj := adapter.Copy(obj) + federatedtypes.SetAnnotation(adapter, differingObj, "foo", "bar") + + testCases := map[string]struct { + clusterObject pkgruntime.Object + expectedErr bool + operationType util.FederatedOperationType + }{ + "Accessor error returned": { + expectedErr: true, + }, + "Missing cluster object should result in add operation": { + operationType: util.OperationTypeAdd, + }, + "Differing cluster object should result in update operation": { + clusterObject: differingObj, + operationType: util.OperationTypeUpdate, + }, + "Matching cluster object should not result in an operation": { + clusterObject: obj, + }, + } + for testName, testCase := range testCases { + t.Run(testName, func(t *testing.T) { + clusters := []*federationapi.Cluster{fedtest.NewCluster("cluster1", apiv1.ConditionTrue)} + operations, err := clusterOperations(adapter, clusters, obj, "key", func(string) (interface{}, bool, error) { + if testCase.expectedErr { + return nil, false, fmt.Errorf("Not found!") + } + return testCase.clusterObject, (testCase.clusterObject != nil), nil + }) + if testCase.expectedErr { + require.Error(t, err, "An error was expected") + } else { + require.NoError(t, err, "An error was not expected") + } + if len(testCase.operationType) == 0 { + require.True(t, len(operations) == 0, "An operation was not expected") + } else { + require.True(t, len(operations) == 1, "A single operation was expected") + require.Equal(t, testCase.operationType, operations[0].Type, "Unexpected operation returned") + } + }) + } +}