diff --git a/federation/pkg/federatedtypes/adapter.go b/federation/pkg/federatedtypes/adapter.go index 4d6ab764c88..39e5d234ee4 100644 --- a/federation/pkg/federatedtypes/adapter.go +++ b/federation/pkg/federatedtypes/adapter.go @@ -70,3 +70,8 @@ func SetAnnotation(adapter FederatedTypeAdapter, obj pkgruntime.Object, key, val } meta.Annotations[key] = value } + +// ObjectKey returns a cluster-unique key for the given object +func ObjectKey(adapter FederatedTypeAdapter, obj pkgruntime.Object) string { + return adapter.NamespacedName(obj).String() +} diff --git a/federation/pkg/federation-controller/sync/controller.go b/federation/pkg/federation-controller/sync/controller.go index e1aca959d0b..5786a35d0f1 100644 --- a/federation/pkg/federation-controller/sync/controller.go +++ b/federation/pkg/federation-controller/sync/controller.go @@ -348,40 +348,25 @@ func (s *FederationSyncController) reconcile(namespacedName types.NamespacedName return statusAllOK } - glog.V(3).Infof("Ensuring finalizers for %s %q", kind, key) + glog.V(3).Infof("Ensuring finalizers exist on %s %q", kind, key) obj, err = s.deletionHelper.EnsureFinalizers(obj) if err != nil { glog.Errorf("Failed to ensure finalizers for %s %q: %v", kind, key, err) return statusError } - glog.V(3).Infof("Syncing %s %q in underlying clusters", kind, key) - - clusters, err := s.informer.GetReadyClusters() - if err != nil { - glog.Errorf("Failed to get cluster list: %v", err) - return statusNotSynced + operationsAccessor := func(adapter federatedtypes.FederatedTypeAdapter, clusters []*federationapi.Cluster, obj pkgruntime.Object) ([]util.FederatedOperation, error) { + return clusterOperations(adapter, clusters, obj, func(clusterName string) (interface{}, bool, error) { + return s.informer.GetTargetStore().GetByKey(clusterName, key) + }) } - - operations, err := clusterOperations(s.adapter, clusters, obj, key, func(clusterName string) (interface{}, bool, error) { - return s.informer.GetTargetStore().GetByKey(clusterName, key) - }) - if err != nil { - glog.Error(err) - return statusError - } - if len(operations) == 0 { - return statusAllOK - } - - err = s.updater.Update(operations) - if err != nil { - glog.Errorf("Failed to execute updates for %s %q: %v", kind, key, err) - return statusError - } - - // Evertyhing is in order but let's be double sure - return statusNeedsRecheck + return syncToClusters( + s.informer.GetReadyClusters, + operationsAccessor, + s.updater.Update, + s.adapter, + obj, + ) } func (s *FederationSyncController) objFromCache(kind, key string) (pkgruntime.Object, error) { @@ -424,10 +409,47 @@ func (s *FederationSyncController) delete(obj pkgruntime.Object, kind string, na return nil } -type clusterAccessor func(clusterName string) (interface{}, bool, error) +type clustersAccessorFunc func() ([]*federationapi.Cluster, error) +type operationsFunc func(federatedtypes.FederatedTypeAdapter, []*federationapi.Cluster, pkgruntime.Object) ([]util.FederatedOperation, error) +type executionFunc func([]util.FederatedOperation) error + +// syncToClusters ensures that the state of the given object is synchronized to member clusters. +func syncToClusters(clustersAccessor clustersAccessorFunc, operationsAccessor operationsFunc, execute executionFunc, adapter federatedtypes.FederatedTypeAdapter, obj pkgruntime.Object) reconciliationStatus { + kind := adapter.Kind() + key := federatedtypes.ObjectKey(adapter, obj) + + glog.V(3).Infof("Syncing %s %q in underlying clusters", kind, key) + + clusters, err := clustersAccessor() + if err != nil { + glog.Errorf("Failed to get cluster list: %v", err) + return statusNotSynced + } + + operations, err := operationsAccessor(adapter, clusters, obj) + if err != nil { + glog.Error(err) + return statusError + } + if len(operations) == 0 { + return statusAllOK + } + + err = execute(operations) + if err != nil { + glog.Errorf("Failed to execute updates for %s %q: %v", kind, key, err) + return statusError + } + + // Evertyhing is in order but let's be double sure + return statusNeedsRecheck +} + +type clusterObjectAccessorFunc func(clusterName string) (interface{}, bool, error) // clusterOperations returns the list of operations needed to synchronize the state of the given object to the provided clusters -func clusterOperations(adapter federatedtypes.FederatedTypeAdapter, clusters []*federationapi.Cluster, obj pkgruntime.Object, key string, accessor clusterAccessor) ([]util.FederatedOperation, error) { +func clusterOperations(adapter federatedtypes.FederatedTypeAdapter, clusters []*federationapi.Cluster, obj pkgruntime.Object, accessor clusterObjectAccessorFunc) ([]util.FederatedOperation, error) { + key := federatedtypes.ObjectKey(adapter, obj) operations := make([]util.FederatedOperation, 0) for _, cluster := range clusters { clusterObj, found, err := accessor(cluster.Name) diff --git a/federation/pkg/federation-controller/sync/controller_test.go b/federation/pkg/federation-controller/sync/controller_test.go index e67db423085..9f077ad7878 100644 --- a/federation/pkg/federation-controller/sync/controller_test.go +++ b/federation/pkg/federation-controller/sync/controller_test.go @@ -17,7 +17,7 @@ limitations under the License. package sync import ( - "fmt" + "errors" "testing" pkgruntime "k8s.io/apimachinery/pkg/runtime" @@ -30,6 +30,70 @@ import ( "github.com/stretchr/testify/require" ) +var awfulError error = errors.New("Something bad happened") + +func TestSyncToClusters(t *testing.T) { + adapter := &federatedtypes.SecretAdapter{} + obj := adapter.NewTestObject("foo") + + testCases := map[string]struct { + clusterError bool + operationsError bool + executionError bool + operations []util.FederatedOperation + status reconciliationStatus + }{ + "Error listing clusters redelivers with cluster delay": { + clusterError: true, + status: statusNotSynced, + }, + "Error retrieving cluster operations redelivers": { + operationsError: true, + status: statusError, + }, + "No operations returns ok": { + status: statusAllOK, + }, + "Execution error redelivers": { + executionError: true, + operations: []util.FederatedOperation{{}}, + status: statusError, + }, + "Successful update indicates recheck": { + operations: []util.FederatedOperation{{}}, + status: statusNeedsRecheck, + }, + } + + for testName, testCase := range testCases { + t.Run(testName, func(t *testing.T) { + status := syncToClusters( + func() ([]*federationapi.Cluster, error) { + if testCase.clusterError { + return nil, awfulError + } + return nil, nil + }, + func(federatedtypes.FederatedTypeAdapter, []*federationapi.Cluster, pkgruntime.Object) ([]util.FederatedOperation, error) { + if testCase.operationsError { + return nil, awfulError + } + return testCase.operations, nil + }, + func([]util.FederatedOperation) error { + if testCase.executionError { + return awfulError + } + return nil + }, + adapter, + obj, + ) + require.Equal(t, testCase.status, status, "Unexpected status!") + }) + } +} + func TestClusterOperations(t *testing.T) { adapter := &federatedtypes.SecretAdapter{} obj := adapter.NewTestObject("foo") @@ -58,9 +122,9 @@ func TestClusterOperations(t *testing.T) { for testName, testCase := range testCases { t.Run(testName, func(t *testing.T) { clusters := []*federationapi.Cluster{fedtest.NewCluster("cluster1", apiv1.ConditionTrue)} - operations, err := clusterOperations(adapter, clusters, obj, "key", func(string) (interface{}, bool, error) { + operations, err := clusterOperations(adapter, clusters, obj, func(string) (interface{}, bool, error) { if testCase.expectedErr { - return nil, false, fmt.Errorf("Not found!") + return nil, false, awfulError } return testCase.clusterObject, (testCase.clusterObject != nil), nil })