diff --git a/federation/pkg/federation-controller/sync/BUILD b/federation/pkg/federation-controller/sync/BUILD index 020a836644c..9e6dee6d657 100644 --- a/federation/pkg/federation-controller/sync/BUILD +++ b/federation/pkg/federation-controller/sync/BUILD @@ -52,6 +52,7 @@ go_test( "//federation/pkg/federation-controller/util/test:go_default_library", "//pkg/api/v1:go_default_library", "//vendor/github.com/stretchr/testify/require:go_default_library", + "//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", "//vendor/k8s.io/apimachinery/pkg/runtime:go_default_library", ], ) diff --git a/federation/pkg/federation-controller/sync/controller.go b/federation/pkg/federation-controller/sync/controller.go index 7b5e3bef848..77221449eeb 100644 --- a/federation/pkg/federation-controller/sync/controller.go +++ b/federation/pkg/federation-controller/sync/controller.go @@ -356,10 +356,10 @@ func (s *FederationSyncController) reconcile(namespacedName types.NamespacedName return statusError } - operationsAccessor := func(adapter federatedtypes.FederatedTypeAdapter, clusters []*federationapi.Cluster, obj pkgruntime.Object) ([]util.FederatedOperation, error) { - operations, err := clusterOperations(adapter, clusters, obj, key, func(clusterName string) (interface{}, bool, error) { + operationsAccessor := func(adapter federatedtypes.FederatedTypeAdapter, selectedClusters []*federationapi.Cluster, unselectedClusters []*federationapi.Cluster, obj pkgruntime.Object) ([]util.FederatedOperation, error) { + operations, err := clusterOperations(adapter, selectedClusters, unselectedClusters, obj, key, func(clusterName string) (interface{}, bool, error) { return s.informer.GetTargetStore().GetByKey(clusterName, key) - }, clusterselector.SendToCluster) + }) if err != nil { s.eventRecorder.Eventf(obj, api.EventTypeWarning, "FedClusterOperationsError", "Error obtaining sync operations for %s: %s error: %s", kind, key, err.Error()) } @@ -369,6 +369,7 @@ func (s *FederationSyncController) reconcile(namespacedName types.NamespacedName return syncToClusters( s.informer.GetReadyClusters, operationsAccessor, + selectedClusters, s.updater.Update, s.adapter, obj, @@ -422,11 +423,12 @@ func (s *FederationSyncController) delete(obj pkgruntime.Object, kind string, na } type clustersAccessorFunc func() ([]*federationapi.Cluster, error) -type operationsFunc func(federatedtypes.FederatedTypeAdapter, []*federationapi.Cluster, pkgruntime.Object) ([]util.FederatedOperation, error) +type operationsFunc func(federatedtypes.FederatedTypeAdapter, []*federationapi.Cluster, []*federationapi.Cluster, pkgruntime.Object) ([]util.FederatedOperation, error) +type clusterSelectorFunc func(*metav1.ObjectMeta, func(map[string]string, map[string]string) (bool, error), []*federationapi.Cluster) ([]*federationapi.Cluster, []*federationapi.Cluster, error) type executionFunc func([]util.FederatedOperation) error // syncToClusters ensures that the state of the given object is synchronized to member clusters. -func syncToClusters(clustersAccessor clustersAccessorFunc, operationsAccessor operationsFunc, execute executionFunc, adapter federatedtypes.FederatedTypeAdapter, obj pkgruntime.Object) reconciliationStatus { +func syncToClusters(clustersAccessor clustersAccessorFunc, operationsAccessor operationsFunc, selector clusterSelectorFunc, execute executionFunc, adapter federatedtypes.FederatedTypeAdapter, obj pkgruntime.Object) reconciliationStatus { kind := adapter.Kind() key := federatedtypes.ObjectKey(adapter, obj) @@ -438,7 +440,12 @@ func syncToClusters(clustersAccessor clustersAccessorFunc, operationsAccessor op return statusNotSynced } - operations, err := operationsAccessor(adapter, clusters, obj) + selectedClusters, unselectedClusters, err := selector(adapter.ObjectMeta(obj), clusterselector.SendToCluster, clusters) + if err != nil { + return statusError + } + + operations, err := operationsAccessor(adapter, selectedClusters, unselectedClusters, obj) if err != nil { return statusError } @@ -456,18 +463,33 @@ func syncToClusters(clustersAccessor clustersAccessorFunc, operationsAccessor op return statusNeedsRecheck } -type clusterObjectAccessorFunc func(clusterName string) (interface{}, bool, error) -type clusterSelectorFunc func(map[string]string, map[string]string) (bool, error) - -// clusterOperations returns the list of operations needed to synchronize the state of the given object to the provided clusters -func clusterOperations(adapter federatedtypes.FederatedTypeAdapter, clusters []*federationapi.Cluster, obj pkgruntime.Object, key string, accessor clusterObjectAccessorFunc, selector clusterSelectorFunc) ([]util.FederatedOperation, error) { - // The data should not be modified. - desiredObj := adapter.Copy(obj) - objMeta := adapter.ObjectMeta(desiredObj) - kind := adapter.Kind() - operations := make([]util.FederatedOperation, 0) +// selectedClusters filters the provided clusters into two slices, one containing the clusters selected by selector and the other containing the rest of the provided clusters. +func selectedClusters(objMeta *metav1.ObjectMeta, selector func(map[string]string, map[string]string) (bool, error), clusters []*federationapi.Cluster) ([]*federationapi.Cluster, []*federationapi.Cluster, error) { + selectedClusters := []*federationapi.Cluster{} + unselectedClusters := []*federationapi.Cluster{} for _, cluster := range clusters { + send, err := selector(cluster.Labels, objMeta.Annotations) + if err != nil { + return nil, nil, err + } else if !send { + unselectedClusters = append(unselectedClusters, cluster) + } else { + selectedClusters = append(selectedClusters, cluster) + } + } + return selectedClusters, unselectedClusters, nil +} + +type clusterObjectAccessorFunc func(clusterName string) (interface{}, bool, error) + +// clusterOperations returns the list of operations needed to synchronize the state of the given object to the provided clusters +func clusterOperations(adapter federatedtypes.FederatedTypeAdapter, selectedClusters []*federationapi.Cluster, unselectedClusters []*federationapi.Cluster, obj pkgruntime.Object, key string, accessor clusterObjectAccessorFunc) ([]util.FederatedOperation, error) { + // The data should not be modified. + desiredObj := adapter.Copy(obj) + operations := make([]util.FederatedOperation, 0) + + for _, cluster := range selectedClusters { clusterObj, found, err := accessor(cluster.Name) if err != nil { wrappedErr := fmt.Errorf("Failed to get %s %q from cluster %q: %v", adapter.Kind(), key, cluster.Name, err) @@ -475,24 +497,13 @@ func clusterOperations(adapter federatedtypes.FederatedTypeAdapter, clusters []* return nil, wrappedErr } - send, err := selector(cluster.Labels, objMeta.Annotations) - if err != nil { - glog.Errorf("Error processing ClusterSelector cluster: %s for %s map: %s error: %s", cluster.Name, kind, key, err.Error()) - return nil, err - } else if !send { - glog.V(5).Infof("Skipping cluster: %s for %s: %s reason: cluster selectors do not match: %-v %-v", cluster.Name, kind, key, cluster.ObjectMeta.Labels, objMeta.Annotations[federationapi.FederationClusterSelectorAnnotation]) - } - var operationType util.FederatedOperationType = "" - switch { - case found && send: + if found { clusterObj := clusterObj.(pkgruntime.Object) if !adapter.Equivalent(desiredObj, clusterObj) { operationType = util.OperationTypeUpdate } - case found && !send: - operationType = util.OperationTypeDelete - case !found && send: + } else { operationType = util.OperationTypeAdd } @@ -505,5 +516,23 @@ func clusterOperations(adapter federatedtypes.FederatedTypeAdapter, clusters []* }) } } + + for _, cluster := range unselectedClusters { + _, found, err := accessor(cluster.Name) + if err != nil { + wrappedErr := fmt.Errorf("Failed to get %s %q from cluster %q: %v", adapter.Kind(), key, cluster.Name, err) + runtime.HandleError(wrappedErr) + return nil, wrappedErr + } + if found { + operations = append(operations, util.FederatedOperation{ + Type: util.OperationTypeDelete, + Obj: desiredObj, + ClusterName: cluster.Name, + Key: key, + }) + } + } + return operations, nil } diff --git a/federation/pkg/federation-controller/sync/controller_test.go b/federation/pkg/federation-controller/sync/controller_test.go index 10023a4eb38..667e8471095 100644 --- a/federation/pkg/federation-controller/sync/controller_test.go +++ b/federation/pkg/federation-controller/sync/controller_test.go @@ -20,6 +20,7 @@ import ( "errors" "testing" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" pkgruntime "k8s.io/apimachinery/pkg/runtime" federationapi "k8s.io/kubernetes/federation/apis/federation/v1beta1" "k8s.io/kubernetes/federation/pkg/federatedtypes" @@ -74,12 +75,15 @@ func TestSyncToClusters(t *testing.T) { } return nil, nil }, - func(federatedtypes.FederatedTypeAdapter, []*federationapi.Cluster, pkgruntime.Object) ([]util.FederatedOperation, error) { + func(federatedtypes.FederatedTypeAdapter, []*federationapi.Cluster, []*federationapi.Cluster, pkgruntime.Object) ([]util.FederatedOperation, error) { if testCase.operationsError { return nil, awfulError } return testCase.operations, nil }, + func(objMeta *metav1.ObjectMeta, selector func(map[string]string, map[string]string) (bool, error), clusters []*federationapi.Cluster) ([]*federationapi.Cluster, []*federationapi.Cluster, error) { + return clusters, []*federationapi.Cluster{}, nil + }, func([]util.FederatedOperation) error { if testCase.executionError { return awfulError @@ -94,6 +98,67 @@ func TestSyncToClusters(t *testing.T) { } } +func TestSelectedClusters(t *testing.T) { + clusterOne := fedtest.NewCluster("cluster1", apiv1.ConditionTrue) + clusterOne.Labels = map[string]string{"name": "cluster1"} + clusterTwo := fedtest.NewCluster("cluster2", apiv1.ConditionTrue) + clusterTwo.Labels = map[string]string{"name": "cluster2"} + + clusters := []*federationapi.Cluster{clusterOne, clusterTwo} + testCases := map[string]struct { + expectedSelectorError bool + clusterOneSelected bool + clusterTwoSelected bool + expectedSelectedClusters []*federationapi.Cluster + expectedUnselectedClusters []*federationapi.Cluster + }{ + "Selector returned error": { + expectedSelectorError: true, + }, + "All clusters selected": { + clusterOneSelected: true, + clusterTwoSelected: true, + expectedSelectedClusters: clusters, + expectedUnselectedClusters: []*federationapi.Cluster{}, + }, + "One cluster selected": { + clusterOneSelected: true, + expectedSelectedClusters: []*federationapi.Cluster{clusterOne}, + expectedUnselectedClusters: []*federationapi.Cluster{clusterTwo}, + }, + "No clusters selected": { + expectedSelectedClusters: []*federationapi.Cluster{}, + expectedUnselectedClusters: clusters, + }, + } + + for testName, testCase := range testCases { + t.Run(testName, func(t *testing.T) { + selectedClusters, unselectedClusters, err := selectedClusters(&metav1.ObjectMeta{}, func(labels map[string]string, annotations map[string]string) (bool, error) { + if testCase.expectedSelectorError { + return false, awfulError + } + if labels["name"] == "cluster1" { + return testCase.clusterOneSelected, nil + } + if labels["name"] == "cluster2" { + return testCase.clusterTwoSelected, nil + } + t.Errorf("Unexpected cluster") + return false, nil + }, clusters) + + if testCase.expectedSelectorError { + require.Error(t, err, "An error was expected") + } else { + require.NoError(t, err, "An error was not expected") + } + require.Equal(t, testCase.expectedSelectedClusters, selectedClusters, "Expected the correct clusters to be selected.") + require.Equal(t, testCase.expectedUnselectedClusters, unselectedClusters, "Expected the correct clusters to be unselected.") + }) + } +} + func TestClusterOperations(t *testing.T) { adapter := &federatedtypes.SecretAdapter{} obj := adapter.NewTestObject("foo") @@ -101,19 +166,15 @@ func TestClusterOperations(t *testing.T) { federatedtypes.SetAnnotation(adapter, differingObj, "foo", "bar") testCases := map[string]struct { - clusterObject pkgruntime.Object - expectedErr bool - expectedSendErr bool - sendToCluster bool + clusterObject pkgruntime.Object + expectedErr bool + sendToCluster bool operationType util.FederatedOperationType }{ "Accessor error returned": { expectedErr: true, }, - "sendToCluster error returned": { - expectedSendErr: true, - }, "Missing cluster object should result in add operation": { operationType: util.OperationTypeAdd, sendToCluster: true, @@ -138,18 +199,21 @@ func TestClusterOperations(t *testing.T) { clusters := []*federationapi.Cluster{fedtest.NewCluster("cluster1", apiv1.ConditionTrue)} key := federatedtypes.ObjectKey(adapter, obj) - operations, err := clusterOperations(adapter, clusters, obj, key, func(string) (interface{}, bool, error) { + var selectedClusters, unselectedClusters []*federationapi.Cluster + if testCase.sendToCluster { + selectedClusters = clusters + unselectedClusters = []*federationapi.Cluster{} + } else { + selectedClusters = []*federationapi.Cluster{} + unselectedClusters = clusters + } + operations, err := clusterOperations(adapter, selectedClusters, unselectedClusters, obj, key, func(string) (interface{}, bool, error) { if testCase.expectedErr { return nil, false, awfulError } return testCase.clusterObject, (testCase.clusterObject != nil), nil - }, func(map[string]string, map[string]string) (bool, error) { - if testCase.expectedSendErr { - return false, awfulError - } - return testCase.sendToCluster, nil }) - if testCase.expectedErr || testCase.expectedSendErr { + if testCase.expectedErr { require.Error(t, err, "An error was expected") } else { require.NoError(t, err, "An error was not expected")