From b5c6af7b4248135604b001e504d3e72900eedd98 Mon Sep 17 00:00:00 2001 From: Jonathan MacMillan Date: Mon, 22 May 2017 15:09:29 -0700 Subject: [PATCH] [Federation] Refactor the cluster selection logic in the sync controller. This is intended to make it easier to define the interaction between cluster selection and scheduling preferences in the sync controller when used for workload types. --- .../pkg/federation-controller/sync/BUILD | 1 + .../federation-controller/sync/controller.go | 87 +++++++++++------ .../sync/controller_test.go | 94 ++++++++++++++++--- 3 files changed, 138 insertions(+), 44 deletions(-) diff --git a/federation/pkg/federation-controller/sync/BUILD b/federation/pkg/federation-controller/sync/BUILD index 020a836644c..9e6dee6d657 100644 --- a/federation/pkg/federation-controller/sync/BUILD +++ b/federation/pkg/federation-controller/sync/BUILD @@ -52,6 +52,7 @@ go_test( "//federation/pkg/federation-controller/util/test:go_default_library", "//pkg/api/v1:go_default_library", "//vendor/github.com/stretchr/testify/require:go_default_library", + "//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", "//vendor/k8s.io/apimachinery/pkg/runtime:go_default_library", ], ) diff --git a/federation/pkg/federation-controller/sync/controller.go b/federation/pkg/federation-controller/sync/controller.go index 7b5e3bef848..77221449eeb 100644 --- a/federation/pkg/federation-controller/sync/controller.go +++ b/federation/pkg/federation-controller/sync/controller.go @@ -356,10 +356,10 @@ func (s *FederationSyncController) reconcile(namespacedName types.NamespacedName return statusError } - operationsAccessor := func(adapter federatedtypes.FederatedTypeAdapter, clusters []*federationapi.Cluster, obj pkgruntime.Object) ([]util.FederatedOperation, error) { - operations, err := clusterOperations(adapter, clusters, obj, key, func(clusterName string) (interface{}, bool, error) { + operationsAccessor := func(adapter federatedtypes.FederatedTypeAdapter, selectedClusters []*federationapi.Cluster, unselectedClusters []*federationapi.Cluster, obj pkgruntime.Object) ([]util.FederatedOperation, error) { + operations, err := clusterOperations(adapter, selectedClusters, unselectedClusters, obj, key, func(clusterName string) (interface{}, bool, error) { return s.informer.GetTargetStore().GetByKey(clusterName, key) - }, clusterselector.SendToCluster) + }) if err != nil { s.eventRecorder.Eventf(obj, api.EventTypeWarning, "FedClusterOperationsError", "Error obtaining sync operations for %s: %s error: %s", kind, key, err.Error()) } @@ -369,6 +369,7 @@ func (s *FederationSyncController) reconcile(namespacedName types.NamespacedName return syncToClusters( s.informer.GetReadyClusters, operationsAccessor, + selectedClusters, s.updater.Update, s.adapter, obj, @@ -422,11 +423,12 @@ func (s *FederationSyncController) delete(obj pkgruntime.Object, kind string, na } type clustersAccessorFunc func() ([]*federationapi.Cluster, error) -type operationsFunc func(federatedtypes.FederatedTypeAdapter, []*federationapi.Cluster, pkgruntime.Object) ([]util.FederatedOperation, error) +type operationsFunc func(federatedtypes.FederatedTypeAdapter, []*federationapi.Cluster, []*federationapi.Cluster, pkgruntime.Object) ([]util.FederatedOperation, error) +type clusterSelectorFunc func(*metav1.ObjectMeta, func(map[string]string, map[string]string) (bool, error), []*federationapi.Cluster) ([]*federationapi.Cluster, []*federationapi.Cluster, error) type executionFunc func([]util.FederatedOperation) error // syncToClusters ensures that the state of the given object is synchronized to member clusters. -func syncToClusters(clustersAccessor clustersAccessorFunc, operationsAccessor operationsFunc, execute executionFunc, adapter federatedtypes.FederatedTypeAdapter, obj pkgruntime.Object) reconciliationStatus { +func syncToClusters(clustersAccessor clustersAccessorFunc, operationsAccessor operationsFunc, selector clusterSelectorFunc, execute executionFunc, adapter federatedtypes.FederatedTypeAdapter, obj pkgruntime.Object) reconciliationStatus { kind := adapter.Kind() key := federatedtypes.ObjectKey(adapter, obj) @@ -438,7 +440,12 @@ func syncToClusters(clustersAccessor clustersAccessorFunc, operationsAccessor op return statusNotSynced } - operations, err := operationsAccessor(adapter, clusters, obj) + selectedClusters, unselectedClusters, err := selector(adapter.ObjectMeta(obj), clusterselector.SendToCluster, clusters) + if err != nil { + return statusError + } + + operations, err := operationsAccessor(adapter, selectedClusters, unselectedClusters, obj) if err != nil { return statusError } @@ -456,18 +463,33 @@ func syncToClusters(clustersAccessor clustersAccessorFunc, operationsAccessor op return statusNeedsRecheck } -type clusterObjectAccessorFunc func(clusterName string) (interface{}, bool, error) -type clusterSelectorFunc func(map[string]string, map[string]string) (bool, error) - -// clusterOperations returns the list of operations needed to synchronize the state of the given object to the provided clusters -func clusterOperations(adapter federatedtypes.FederatedTypeAdapter, clusters []*federationapi.Cluster, obj pkgruntime.Object, key string, accessor clusterObjectAccessorFunc, selector clusterSelectorFunc) ([]util.FederatedOperation, error) { - // The data should not be modified. - desiredObj := adapter.Copy(obj) - objMeta := adapter.ObjectMeta(desiredObj) - kind := adapter.Kind() - operations := make([]util.FederatedOperation, 0) +// selectedClusters filters the provided clusters into two slices, one containing the clusters selected by selector and the other containing the rest of the provided clusters. +func selectedClusters(objMeta *metav1.ObjectMeta, selector func(map[string]string, map[string]string) (bool, error), clusters []*federationapi.Cluster) ([]*federationapi.Cluster, []*federationapi.Cluster, error) { + selectedClusters := []*federationapi.Cluster{} + unselectedClusters := []*federationapi.Cluster{} for _, cluster := range clusters { + send, err := selector(cluster.Labels, objMeta.Annotations) + if err != nil { + return nil, nil, err + } else if !send { + unselectedClusters = append(unselectedClusters, cluster) + } else { + selectedClusters = append(selectedClusters, cluster) + } + } + return selectedClusters, unselectedClusters, nil +} + +type clusterObjectAccessorFunc func(clusterName string) (interface{}, bool, error) + +// clusterOperations returns the list of operations needed to synchronize the state of the given object to the provided clusters +func clusterOperations(adapter federatedtypes.FederatedTypeAdapter, selectedClusters []*federationapi.Cluster, unselectedClusters []*federationapi.Cluster, obj pkgruntime.Object, key string, accessor clusterObjectAccessorFunc) ([]util.FederatedOperation, error) { + // The data should not be modified. + desiredObj := adapter.Copy(obj) + operations := make([]util.FederatedOperation, 0) + + for _, cluster := range selectedClusters { clusterObj, found, err := accessor(cluster.Name) if err != nil { wrappedErr := fmt.Errorf("Failed to get %s %q from cluster %q: %v", adapter.Kind(), key, cluster.Name, err) @@ -475,24 +497,13 @@ func clusterOperations(adapter federatedtypes.FederatedTypeAdapter, clusters []* return nil, wrappedErr } - send, err := selector(cluster.Labels, objMeta.Annotations) - if err != nil { - glog.Errorf("Error processing ClusterSelector cluster: %s for %s map: %s error: %s", cluster.Name, kind, key, err.Error()) - return nil, err - } else if !send { - glog.V(5).Infof("Skipping cluster: %s for %s: %s reason: cluster selectors do not match: %-v %-v", cluster.Name, kind, key, cluster.ObjectMeta.Labels, objMeta.Annotations[federationapi.FederationClusterSelectorAnnotation]) - } - var operationType util.FederatedOperationType = "" - switch { - case found && send: + if found { clusterObj := clusterObj.(pkgruntime.Object) if !adapter.Equivalent(desiredObj, clusterObj) { operationType = util.OperationTypeUpdate } - case found && !send: - operationType = util.OperationTypeDelete - case !found && send: + } else { operationType = util.OperationTypeAdd } @@ -505,5 +516,23 @@ func clusterOperations(adapter federatedtypes.FederatedTypeAdapter, clusters []* }) } } + + for _, cluster := range unselectedClusters { + _, found, err := accessor(cluster.Name) + if err != nil { + wrappedErr := fmt.Errorf("Failed to get %s %q from cluster %q: %v", adapter.Kind(), key, cluster.Name, err) + runtime.HandleError(wrappedErr) + return nil, wrappedErr + } + if found { + operations = append(operations, util.FederatedOperation{ + Type: util.OperationTypeDelete, + Obj: desiredObj, + ClusterName: cluster.Name, + Key: key, + }) + } + } + return operations, nil } diff --git a/federation/pkg/federation-controller/sync/controller_test.go b/federation/pkg/federation-controller/sync/controller_test.go index 10023a4eb38..667e8471095 100644 --- a/federation/pkg/federation-controller/sync/controller_test.go +++ b/federation/pkg/federation-controller/sync/controller_test.go @@ -20,6 +20,7 @@ import ( "errors" "testing" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" pkgruntime "k8s.io/apimachinery/pkg/runtime" federationapi "k8s.io/kubernetes/federation/apis/federation/v1beta1" "k8s.io/kubernetes/federation/pkg/federatedtypes" @@ -74,12 +75,15 @@ func TestSyncToClusters(t *testing.T) { } return nil, nil }, - func(federatedtypes.FederatedTypeAdapter, []*federationapi.Cluster, pkgruntime.Object) ([]util.FederatedOperation, error) { + func(federatedtypes.FederatedTypeAdapter, []*federationapi.Cluster, []*federationapi.Cluster, pkgruntime.Object) ([]util.FederatedOperation, error) { if testCase.operationsError { return nil, awfulError } return testCase.operations, nil }, + func(objMeta *metav1.ObjectMeta, selector func(map[string]string, map[string]string) (bool, error), clusters []*federationapi.Cluster) ([]*federationapi.Cluster, []*federationapi.Cluster, error) { + return clusters, []*federationapi.Cluster{}, nil + }, func([]util.FederatedOperation) error { if testCase.executionError { return awfulError @@ -94,6 +98,67 @@ func TestSyncToClusters(t *testing.T) { } } +func TestSelectedClusters(t *testing.T) { + clusterOne := fedtest.NewCluster("cluster1", apiv1.ConditionTrue) + clusterOne.Labels = map[string]string{"name": "cluster1"} + clusterTwo := fedtest.NewCluster("cluster2", apiv1.ConditionTrue) + clusterTwo.Labels = map[string]string{"name": "cluster2"} + + clusters := []*federationapi.Cluster{clusterOne, clusterTwo} + testCases := map[string]struct { + expectedSelectorError bool + clusterOneSelected bool + clusterTwoSelected bool + expectedSelectedClusters []*federationapi.Cluster + expectedUnselectedClusters []*federationapi.Cluster + }{ + "Selector returned error": { + expectedSelectorError: true, + }, + "All clusters selected": { + clusterOneSelected: true, + clusterTwoSelected: true, + expectedSelectedClusters: clusters, + expectedUnselectedClusters: []*federationapi.Cluster{}, + }, + "One cluster selected": { + clusterOneSelected: true, + expectedSelectedClusters: []*federationapi.Cluster{clusterOne}, + expectedUnselectedClusters: []*federationapi.Cluster{clusterTwo}, + }, + "No clusters selected": { + expectedSelectedClusters: []*federationapi.Cluster{}, + expectedUnselectedClusters: clusters, + }, + } + + for testName, testCase := range testCases { + t.Run(testName, func(t *testing.T) { + selectedClusters, unselectedClusters, err := selectedClusters(&metav1.ObjectMeta{}, func(labels map[string]string, annotations map[string]string) (bool, error) { + if testCase.expectedSelectorError { + return false, awfulError + } + if labels["name"] == "cluster1" { + return testCase.clusterOneSelected, nil + } + if labels["name"] == "cluster2" { + return testCase.clusterTwoSelected, nil + } + t.Errorf("Unexpected cluster") + return false, nil + }, clusters) + + if testCase.expectedSelectorError { + require.Error(t, err, "An error was expected") + } else { + require.NoError(t, err, "An error was not expected") + } + require.Equal(t, testCase.expectedSelectedClusters, selectedClusters, "Expected the correct clusters to be selected.") + require.Equal(t, testCase.expectedUnselectedClusters, unselectedClusters, "Expected the correct clusters to be unselected.") + }) + } +} + func TestClusterOperations(t *testing.T) { adapter := &federatedtypes.SecretAdapter{} obj := adapter.NewTestObject("foo") @@ -101,19 +166,15 @@ func TestClusterOperations(t *testing.T) { federatedtypes.SetAnnotation(adapter, differingObj, "foo", "bar") testCases := map[string]struct { - clusterObject pkgruntime.Object - expectedErr bool - expectedSendErr bool - sendToCluster bool + clusterObject pkgruntime.Object + expectedErr bool + sendToCluster bool operationType util.FederatedOperationType }{ "Accessor error returned": { expectedErr: true, }, - "sendToCluster error returned": { - expectedSendErr: true, - }, "Missing cluster object should result in add operation": { operationType: util.OperationTypeAdd, sendToCluster: true, @@ -138,18 +199,21 @@ func TestClusterOperations(t *testing.T) { clusters := []*federationapi.Cluster{fedtest.NewCluster("cluster1", apiv1.ConditionTrue)} key := federatedtypes.ObjectKey(adapter, obj) - operations, err := clusterOperations(adapter, clusters, obj, key, func(string) (interface{}, bool, error) { + var selectedClusters, unselectedClusters []*federationapi.Cluster + if testCase.sendToCluster { + selectedClusters = clusters + unselectedClusters = []*federationapi.Cluster{} + } else { + selectedClusters = []*federationapi.Cluster{} + unselectedClusters = clusters + } + operations, err := clusterOperations(adapter, selectedClusters, unselectedClusters, obj, key, func(string) (interface{}, bool, error) { if testCase.expectedErr { return nil, false, awfulError } return testCase.clusterObject, (testCase.clusterObject != nil), nil - }, func(map[string]string, map[string]string) (bool, error) { - if testCase.expectedSendErr { - return false, awfulError - } - return testCase.sendToCluster, nil }) - if testCase.expectedErr || testCase.expectedSendErr { + if testCase.expectedErr { require.Error(t, err, "An error was expected") } else { require.NoError(t, err, "An error was not expected")