diff --git a/federation/pkg/federatedtypes/adapter.go b/federation/pkg/federatedtypes/adapter.go index c3ea3b2b635..39e5d234ee4 100644 --- a/federation/pkg/federatedtypes/adapter.go +++ b/federation/pkg/federatedtypes/adapter.go @@ -61,3 +61,17 @@ type FederatedTypeAdapter interface { // be registered with RegisterAdapterFactory to ensure the type // adapter is discoverable. type AdapterFactory func(client federationclientset.Interface) FederatedTypeAdapter + +// SetAnnotation sets the given key and value in the given object's ObjectMeta.Annotations map +func SetAnnotation(adapter FederatedTypeAdapter, obj pkgruntime.Object, key, value string) { + meta := adapter.ObjectMeta(obj) + if meta.Annotations == nil { + meta.Annotations = make(map[string]string) + } + meta.Annotations[key] = value +} + +// ObjectKey returns a cluster-unique key for the given object +func ObjectKey(adapter FederatedTypeAdapter, obj pkgruntime.Object) string { + return adapter.NamespacedName(obj).String() +} diff --git a/federation/pkg/federatedtypes/crudtester/crudtester.go b/federation/pkg/federatedtypes/crudtester/crudtester.go index f8ca4ef17e6..8145d604398 100644 --- a/federation/pkg/federatedtypes/crudtester/crudtester.go +++ b/federation/pkg/federatedtypes/crudtester/crudtester.go @@ -193,11 +193,7 @@ func (c *FederatedTypeCRUDTester) waitForResource(client clientset.Interface, ob func (c *FederatedTypeCRUDTester) updateFedObject(obj pkgruntime.Object) (pkgruntime.Object, error) { err := wait.PollImmediate(c.waitInterval, wait.ForeverTestTimeout, func() (bool, error) { // Target the metadata for simplicity (it's type-agnostic) - meta := c.adapter.ObjectMeta(obj) - if meta.Annotations == nil { - meta.Annotations = make(map[string]string) - } - meta.Annotations[AnnotationTestFederationCRUDUpdate] = "updated" + federatedtypes.SetAnnotation(c.adapter, obj, AnnotationTestFederationCRUDUpdate, "updated") _, err := c.adapter.FedUpdate(obj) if errors.IsConflict(err) { diff --git a/federation/pkg/federation-controller/sync/BUILD b/federation/pkg/federation-controller/sync/BUILD index 7b4870c2bd8..a597ede5e78 100644 --- a/federation/pkg/federation-controller/sync/BUILD +++ b/federation/pkg/federation-controller/sync/BUILD @@ -27,6 +27,7 @@ go_library( "//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", "//vendor/k8s.io/apimachinery/pkg/runtime:go_default_library", "//vendor/k8s.io/apimachinery/pkg/types:go_default_library", + "//vendor/k8s.io/apimachinery/pkg/util/runtime:go_default_library", "//vendor/k8s.io/apimachinery/pkg/util/wait:go_default_library", "//vendor/k8s.io/apimachinery/pkg/watch:go_default_library", "//vendor/k8s.io/client-go/pkg/api/v1:go_default_library", @@ -55,6 +56,7 @@ go_test( name = "go_default_test", srcs = [ "configmap_controller_test.go", + "controller_test.go", "daemonset_controller_test.go", "secret_controller_test.go", ], @@ -73,6 +75,7 @@ go_test( "//pkg/client/clientset_generated/clientset/fake:go_default_library", "//vendor/github.com/golang/glog:go_default_library", "//vendor/github.com/stretchr/testify/assert:go_default_library", + "//vendor/github.com/stretchr/testify/require:go_default_library", "//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", "//vendor/k8s.io/apimachinery/pkg/runtime:go_default_library", "//vendor/k8s.io/apimachinery/pkg/types:go_default_library", diff --git a/federation/pkg/federation-controller/sync/controller.go b/federation/pkg/federation-controller/sync/controller.go index 651bd6c5259..0d607e6915b 100644 --- a/federation/pkg/federation-controller/sync/controller.go +++ b/federation/pkg/federation-controller/sync/controller.go @@ -18,13 +18,13 @@ package sync import ( "fmt" - "strings" "time" "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" pkgruntime "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/types" + "k8s.io/apimachinery/pkg/util/runtime" "k8s.io/apimachinery/pkg/util/wait" "k8s.io/apimachinery/pkg/watch" clientv1 "k8s.io/client-go/pkg/api/v1" @@ -217,11 +217,6 @@ func (s *FederationSyncController) updateObject(obj pkgruntime.Object) (pkgrunti func (s *FederationSyncController) Run(stopChan <-chan struct{}) { go s.controller.Run(stopChan) s.informer.Start() - go func() { - <-stopChan - s.informer.Stop() - s.workQueue.ShutDown() - }() s.deliverer.StartWithHandler(func(item *util.DelayingDelivererItem) { s.workQueue.Add(item) }) @@ -233,6 +228,15 @@ func (s *FederationSyncController) Run(stopChan <-chan struct{}) { go wait.Until(s.worker, time.Second, stopChan) util.StartBackoffGC(s.backoff, stopChan) + + // Ensure all goroutines are cleaned up when the stop channel closes + go func() { + <-stopChan + s.informer.Stop() + s.workQueue.ShutDown() + s.deliverer.Stop() + s.clusterDeliverer.Stop() + }() } type reconciliationStatus int @@ -264,7 +268,7 @@ func (s *FederationSyncController) worker() { case statusNeedsRecheck: s.deliver(*namespacedName, s.reviewDelay, false) case statusNotSynced: - s.deliver(*namespacedName, s.reviewDelay, false) + s.deliver(*namespacedName, s.clusterAvailableDelay, false) } } } @@ -295,7 +299,7 @@ func (s *FederationSyncController) isSynced() bool { } clusters, err := s.informer.GetReadyClusters() if err != nil { - glog.Errorf("Failed to get ready clusters: %v", err) + runtime.HandleError(fmt.Errorf("Failed to get ready clusters: %v", err)) return false } if !s.informer.GetTargetStore().ClustersSynced(clusters) { @@ -320,113 +324,80 @@ func (s *FederationSyncController) reconcile(namespacedName types.NamespacedName return statusNotSynced } - key := namespacedName.String() kind := s.adapter.Kind() - cachedObj, exist, err := s.store.GetByKey(key) + key := namespacedName.String() + + obj, err := s.objFromCache(kind, key) if err != nil { - glog.Errorf("failed to query main %s store for %v: %v", kind, key, err) return statusError } - - if !exist { - // Not federated, ignoring. + if obj == nil { return statusAllOK } - // Create a copy before modifying the resource to prevent racing - // with other readers. - copiedObj, err := api.Scheme.DeepCopy(cachedObj) - if err != nil { - glog.Errorf("error in retrieving %s from store: %v", kind, err) - return statusError - } - if !s.adapter.IsExpectedType(copiedObj) { - glog.Errorf("object is not the expected type: %v", copiedObj) - return statusError - } - obj := copiedObj.(pkgruntime.Object) meta := s.adapter.ObjectMeta(obj) - if meta.DeletionTimestamp != nil { - if err := s.delete(obj, namespacedName); err != nil { - s.eventRecorder.Eventf(obj, api.EventTypeWarning, "DeleteFailed", - "%s delete failed: %v", strings.ToTitle(kind), err) - glog.Errorf("failed to delete %s %s: %v", kind, namespacedName, err) + err := s.delete(obj, kind, namespacedName) + if err != nil { + msg := "Failed to delete %s %q: %v" + args := []interface{}{kind, namespacedName, err} + runtime.HandleError(fmt.Errorf(msg, args...)) + s.eventRecorder.Eventf(obj, api.EventTypeWarning, "DeleteFailed", msg, args...) return statusError } return statusAllOK } - glog.V(3).Infof("Ensuring delete object from underlying clusters finalizer for %s: %s", - kind, namespacedName) - // Add the required finalizers before creating the resource in underlying clusters. + glog.V(3).Infof("Ensuring finalizers exist on %s %q", kind, key) obj, err = s.deletionHelper.EnsureFinalizers(obj) if err != nil { - glog.Errorf("failed to ensure delete object from underlying clusters finalizer in %s %s: %v", - kind, namespacedName, err) + runtime.HandleError(fmt.Errorf("Failed to ensure finalizers for %s %q: %v", kind, key, err)) return statusError } - glog.V(3).Infof("Syncing %s %s in underlying clusters", kind, namespacedName) + operationsAccessor := func(adapter federatedtypes.FederatedTypeAdapter, clusters []*federationapi.Cluster, obj pkgruntime.Object) ([]util.FederatedOperation, error) { + return clusterOperations(adapter, clusters, obj, func(clusterName string) (interface{}, bool, error) { + return s.informer.GetTargetStore().GetByKey(clusterName, key) + }) + } + return syncToClusters( + s.informer.GetReadyClusters, + operationsAccessor, + s.updater.Update, + s.adapter, + obj, + ) +} - clusters, err := s.informer.GetReadyClusters() +func (s *FederationSyncController) objFromCache(kind, key string) (pkgruntime.Object, error) { + cachedObj, exist, err := s.store.GetByKey(key) if err != nil { - glog.Errorf("failed to get cluster list: %v", err) - return statusNotSynced + wrappedErr := fmt.Errorf("Failed to query %s store for %q: %v", kind, key, err) + runtime.HandleError(wrappedErr) + return nil, err + } + if !exist { + return nil, nil } - operations := make([]util.FederatedOperation, 0) - for _, cluster := range clusters { - clusterObj, found, err := s.informer.GetTargetStore().GetByKey(cluster.Name, key) - if err != nil { - glog.Errorf("failed to get %s from %s: %v", key, cluster.Name, err) - return statusError - } - - // The data should not be modified. - desiredObj := s.adapter.Copy(obj) - - if !found { - operations = append(operations, util.FederatedOperation{ - Type: util.OperationTypeAdd, - Obj: desiredObj, - ClusterName: cluster.Name, - Key: key, - }) - } else { - clusterObj := clusterObj.(pkgruntime.Object) - - // Update existing resource, if needed. - if !s.adapter.Equivalent(desiredObj, clusterObj) { - operations = append(operations, util.FederatedOperation{ - Type: util.OperationTypeUpdate, - Obj: desiredObj, - ClusterName: cluster.Name, - Key: key, - }) - } - } - } - - if len(operations) == 0 { - // Everything is in order - return statusAllOK - } - - err = s.updater.Update(operations) + // Create a copy before modifying the resource to prevent racing with other readers. + copiedObj, err := api.Scheme.DeepCopy(cachedObj) if err != nil { - glog.Errorf("failed to execute updates for %s: %v", key, err) - return statusError + wrappedErr := fmt.Errorf("Error in retrieving %s %q from store: %v", kind, key, err) + runtime.HandleError(wrappedErr) + return nil, err } - - // Evertyhing is in order but let's be double sure - return statusNeedsRecheck + if !s.adapter.IsExpectedType(copiedObj) { + err = fmt.Errorf("Object is not the expected type: %v", copiedObj) + runtime.HandleError(err) + return nil, err + } + return copiedObj.(pkgruntime.Object), nil } // delete deletes the given resource or returns error if the deletion was not complete. -func (s *FederationSyncController) delete(obj pkgruntime.Object, namespacedName types.NamespacedName) error { - kind := s.adapter.Kind() - glog.V(3).Infof("Handling deletion of %s: %v", kind, namespacedName) +func (s *FederationSyncController) delete(obj pkgruntime.Object, kind string, namespacedName types.NamespacedName) error { + glog.V(3).Infof("Handling deletion of %s %q", kind, namespacedName) _, err := s.deletionHelper.HandleObjectInUnderlyingClusters(obj) if err != nil { return err @@ -438,8 +409,80 @@ func (s *FederationSyncController) delete(obj pkgruntime.Object, namespacedName // This is expected when we are processing an update as a result of finalizer deletion. // The process that deleted the last finalizer is also going to delete the resource and we do not have to do anything. if !errors.IsNotFound(err) { - return fmt.Errorf("failed to delete %s: %v", kind, err) + return err } } return nil } + +type clustersAccessorFunc func() ([]*federationapi.Cluster, error) +type operationsFunc func(federatedtypes.FederatedTypeAdapter, []*federationapi.Cluster, pkgruntime.Object) ([]util.FederatedOperation, error) +type executionFunc func([]util.FederatedOperation) error + +// syncToClusters ensures that the state of the given object is synchronized to member clusters. +func syncToClusters(clustersAccessor clustersAccessorFunc, operationsAccessor operationsFunc, execute executionFunc, adapter federatedtypes.FederatedTypeAdapter, obj pkgruntime.Object) reconciliationStatus { + kind := adapter.Kind() + key := federatedtypes.ObjectKey(adapter, obj) + + glog.V(3).Infof("Syncing %s %q in underlying clusters", kind, key) + + clusters, err := clustersAccessor() + if err != nil { + runtime.HandleError(fmt.Errorf("Failed to get cluster list: %v", err)) + return statusNotSynced + } + + operations, err := operationsAccessor(adapter, clusters, obj) + if err != nil { + return statusError + } + if len(operations) == 0 { + return statusAllOK + } + + err = execute(operations) + if err != nil { + runtime.HandleError(fmt.Errorf("Failed to execute updates for %s %q: %v", kind, key, err)) + return statusError + } + + // Evertyhing is in order but let's be double sure + return statusNeedsRecheck +} + +type clusterObjectAccessorFunc func(clusterName string) (interface{}, bool, error) + +// clusterOperations returns the list of operations needed to synchronize the state of the given object to the provided clusters +func clusterOperations(adapter federatedtypes.FederatedTypeAdapter, clusters []*federationapi.Cluster, obj pkgruntime.Object, accessor clusterObjectAccessorFunc) ([]util.FederatedOperation, error) { + key := federatedtypes.ObjectKey(adapter, obj) + operations := make([]util.FederatedOperation, 0) + for _, cluster := range clusters { + clusterObj, found, err := accessor(cluster.Name) + if err != nil { + wrappedErr := fmt.Errorf("Failed to get %s %q from cluster %q: %v", adapter.Kind(), key, cluster.Name, err) + runtime.HandleError(wrappedErr) + return nil, wrappedErr + } + // The data should not be modified. + desiredObj := adapter.Copy(obj) + + var operationType util.FederatedOperationType = "" + if found { + clusterObj := clusterObj.(pkgruntime.Object) + if !adapter.Equivalent(desiredObj, clusterObj) { + operationType = util.OperationTypeUpdate + } + } else { + operationType = util.OperationTypeAdd + } + if len(operationType) > 0 { + operations = append(operations, util.FederatedOperation{ + Type: operationType, + Obj: desiredObj, + ClusterName: cluster.Name, + Key: key, + }) + } + } + return operations, nil +} diff --git a/federation/pkg/federation-controller/sync/controller_test.go b/federation/pkg/federation-controller/sync/controller_test.go new file mode 100644 index 00000000000..9f077ad7878 --- /dev/null +++ b/federation/pkg/federation-controller/sync/controller_test.go @@ -0,0 +1,144 @@ +/* +Copyright 2017 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package sync + +import ( + "errors" + "testing" + + pkgruntime "k8s.io/apimachinery/pkg/runtime" + federationapi "k8s.io/kubernetes/federation/apis/federation/v1beta1" + "k8s.io/kubernetes/federation/pkg/federatedtypes" + "k8s.io/kubernetes/federation/pkg/federation-controller/util" + fedtest "k8s.io/kubernetes/federation/pkg/federation-controller/util/test" + apiv1 "k8s.io/kubernetes/pkg/api/v1" + + "github.com/stretchr/testify/require" +) + +var awfulError error = errors.New("Something bad happened") + +func TestSyncToClusters(t *testing.T) { + adapter := &federatedtypes.SecretAdapter{} + obj := adapter.NewTestObject("foo") + + testCases := map[string]struct { + clusterError bool + operationsError bool + executionError bool + operations []util.FederatedOperation + status reconciliationStatus + }{ + "Error listing clusters redelivers with cluster delay": { + clusterError: true, + status: statusNotSynced, + }, + "Error retrieving cluster operations redelivers": { + operationsError: true, + status: statusError, + }, + "No operations returns ok": { + status: statusAllOK, + }, + "Execution error redelivers": { + executionError: true, + operations: []util.FederatedOperation{{}}, + status: statusError, + }, + "Successful update indicates recheck": { + operations: []util.FederatedOperation{{}}, + status: statusNeedsRecheck, + }, + } + + for testName, testCase := range testCases { + t.Run(testName, func(t *testing.T) { + status := syncToClusters( + func() ([]*federationapi.Cluster, error) { + if testCase.clusterError { + return nil, awfulError + } + return nil, nil + }, + func(federatedtypes.FederatedTypeAdapter, []*federationapi.Cluster, pkgruntime.Object) ([]util.FederatedOperation, error) { + if testCase.operationsError { + return nil, awfulError + } + return testCase.operations, nil + }, + func([]util.FederatedOperation) error { + if testCase.executionError { + return awfulError + } + return nil + }, + adapter, + obj, + ) + require.Equal(t, testCase.status, status, "Unexpected status!") + }) + } +} + +func TestClusterOperations(t *testing.T) { + adapter := &federatedtypes.SecretAdapter{} + obj := adapter.NewTestObject("foo") + differingObj := adapter.Copy(obj) + federatedtypes.SetAnnotation(adapter, differingObj, "foo", "bar") + + testCases := map[string]struct { + clusterObject pkgruntime.Object + expectedErr bool + operationType util.FederatedOperationType + }{ + "Accessor error returned": { + expectedErr: true, + }, + "Missing cluster object should result in add operation": { + operationType: util.OperationTypeAdd, + }, + "Differing cluster object should result in update operation": { + clusterObject: differingObj, + operationType: util.OperationTypeUpdate, + }, + "Matching cluster object should not result in an operation": { + clusterObject: obj, + }, + } + for testName, testCase := range testCases { + t.Run(testName, func(t *testing.T) { + clusters := []*federationapi.Cluster{fedtest.NewCluster("cluster1", apiv1.ConditionTrue)} + operations, err := clusterOperations(adapter, clusters, obj, func(string) (interface{}, bool, error) { + if testCase.expectedErr { + return nil, false, awfulError + } + return testCase.clusterObject, (testCase.clusterObject != nil), nil + }) + if testCase.expectedErr { + require.Error(t, err, "An error was expected") + } else { + require.NoError(t, err, "An error was not expected") + } + if len(testCase.operationType) == 0 { + require.True(t, len(operations) == 0, "An operation was not expected") + } else { + require.True(t, len(operations) == 1, "A single operation was expected") + require.Equal(t, testCase.operationType, operations[0].Type, "Unexpected operation returned") + } + }) + } +}