Merge pull request #46252 from perotinus/cs

Automatic merge from submit-queue (batch tested with PRs 46252, 45524, 46236, 46277, 46522)

[Federation] Refactor the cluster selection logic in the sync controller

This is intended to make it easier to define the interaction between cluster selection and scheduling preferences in the sync controller when used for workload types.

**Release note**:

```release-note
NONE
```
This commit is contained in:
Kubernetes Submit Queue 2017-05-26 19:47:56 -07:00 committed by GitHub
commit f8c90e04cd
3 changed files with 138 additions and 44 deletions

View File

@ -52,6 +52,7 @@ go_test(
"//federation/pkg/federation-controller/util/test:go_default_library", "//federation/pkg/federation-controller/util/test:go_default_library",
"//pkg/api/v1:go_default_library", "//pkg/api/v1:go_default_library",
"//vendor/github.com/stretchr/testify/require:go_default_library", "//vendor/github.com/stretchr/testify/require:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/runtime:go_default_library", "//vendor/k8s.io/apimachinery/pkg/runtime:go_default_library",
], ],
) )

View File

@ -356,10 +356,10 @@ func (s *FederationSyncController) reconcile(namespacedName types.NamespacedName
return statusError return statusError
} }
operationsAccessor := func(adapter federatedtypes.FederatedTypeAdapter, clusters []*federationapi.Cluster, obj pkgruntime.Object) ([]util.FederatedOperation, error) { operationsAccessor := func(adapter federatedtypes.FederatedTypeAdapter, selectedClusters []*federationapi.Cluster, unselectedClusters []*federationapi.Cluster, obj pkgruntime.Object) ([]util.FederatedOperation, error) {
operations, err := clusterOperations(adapter, clusters, obj, key, func(clusterName string) (interface{}, bool, error) { operations, err := clusterOperations(adapter, selectedClusters, unselectedClusters, obj, key, func(clusterName string) (interface{}, bool, error) {
return s.informer.GetTargetStore().GetByKey(clusterName, key) return s.informer.GetTargetStore().GetByKey(clusterName, key)
}, clusterselector.SendToCluster) })
if err != nil { if err != nil {
s.eventRecorder.Eventf(obj, api.EventTypeWarning, "FedClusterOperationsError", "Error obtaining sync operations for %s: %s error: %s", kind, key, err.Error()) s.eventRecorder.Eventf(obj, api.EventTypeWarning, "FedClusterOperationsError", "Error obtaining sync operations for %s: %s error: %s", kind, key, err.Error())
} }
@ -369,6 +369,7 @@ func (s *FederationSyncController) reconcile(namespacedName types.NamespacedName
return syncToClusters( return syncToClusters(
s.informer.GetReadyClusters, s.informer.GetReadyClusters,
operationsAccessor, operationsAccessor,
selectedClusters,
s.updater.Update, s.updater.Update,
s.adapter, s.adapter,
obj, obj,
@ -422,11 +423,12 @@ func (s *FederationSyncController) delete(obj pkgruntime.Object, kind string, na
} }
type clustersAccessorFunc func() ([]*federationapi.Cluster, error) type clustersAccessorFunc func() ([]*federationapi.Cluster, error)
type operationsFunc func(federatedtypes.FederatedTypeAdapter, []*federationapi.Cluster, pkgruntime.Object) ([]util.FederatedOperation, error) type operationsFunc func(federatedtypes.FederatedTypeAdapter, []*federationapi.Cluster, []*federationapi.Cluster, pkgruntime.Object) ([]util.FederatedOperation, error)
type clusterSelectorFunc func(*metav1.ObjectMeta, func(map[string]string, map[string]string) (bool, error), []*federationapi.Cluster) ([]*federationapi.Cluster, []*federationapi.Cluster, error)
type executionFunc func([]util.FederatedOperation) error type executionFunc func([]util.FederatedOperation) error
// syncToClusters ensures that the state of the given object is synchronized to member clusters. // syncToClusters ensures that the state of the given object is synchronized to member clusters.
func syncToClusters(clustersAccessor clustersAccessorFunc, operationsAccessor operationsFunc, execute executionFunc, adapter federatedtypes.FederatedTypeAdapter, obj pkgruntime.Object) reconciliationStatus { func syncToClusters(clustersAccessor clustersAccessorFunc, operationsAccessor operationsFunc, selector clusterSelectorFunc, execute executionFunc, adapter federatedtypes.FederatedTypeAdapter, obj pkgruntime.Object) reconciliationStatus {
kind := adapter.Kind() kind := adapter.Kind()
key := federatedtypes.ObjectKey(adapter, obj) key := federatedtypes.ObjectKey(adapter, obj)
@ -438,7 +440,12 @@ func syncToClusters(clustersAccessor clustersAccessorFunc, operationsAccessor op
return statusNotSynced return statusNotSynced
} }
operations, err := operationsAccessor(adapter, clusters, obj) selectedClusters, unselectedClusters, err := selector(adapter.ObjectMeta(obj), clusterselector.SendToCluster, clusters)
if err != nil {
return statusError
}
operations, err := operationsAccessor(adapter, selectedClusters, unselectedClusters, obj)
if err != nil { if err != nil {
return statusError return statusError
} }
@ -456,18 +463,33 @@ func syncToClusters(clustersAccessor clustersAccessorFunc, operationsAccessor op
return statusNeedsRecheck return statusNeedsRecheck
} }
type clusterObjectAccessorFunc func(clusterName string) (interface{}, bool, error) // selectedClusters filters the provided clusters into two slices, one containing the clusters selected by selector and the other containing the rest of the provided clusters.
type clusterSelectorFunc func(map[string]string, map[string]string) (bool, error) func selectedClusters(objMeta *metav1.ObjectMeta, selector func(map[string]string, map[string]string) (bool, error), clusters []*federationapi.Cluster) ([]*federationapi.Cluster, []*federationapi.Cluster, error) {
selectedClusters := []*federationapi.Cluster{}
// clusterOperations returns the list of operations needed to synchronize the state of the given object to the provided clusters unselectedClusters := []*federationapi.Cluster{}
func clusterOperations(adapter federatedtypes.FederatedTypeAdapter, clusters []*federationapi.Cluster, obj pkgruntime.Object, key string, accessor clusterObjectAccessorFunc, selector clusterSelectorFunc) ([]util.FederatedOperation, error) {
// The data should not be modified.
desiredObj := adapter.Copy(obj)
objMeta := adapter.ObjectMeta(desiredObj)
kind := adapter.Kind()
operations := make([]util.FederatedOperation, 0)
for _, cluster := range clusters { for _, cluster := range clusters {
send, err := selector(cluster.Labels, objMeta.Annotations)
if err != nil {
return nil, nil, err
} else if !send {
unselectedClusters = append(unselectedClusters, cluster)
} else {
selectedClusters = append(selectedClusters, cluster)
}
}
return selectedClusters, unselectedClusters, nil
}
type clusterObjectAccessorFunc func(clusterName string) (interface{}, bool, error)
// clusterOperations returns the list of operations needed to synchronize the state of the given object to the provided clusters
func clusterOperations(adapter federatedtypes.FederatedTypeAdapter, selectedClusters []*federationapi.Cluster, unselectedClusters []*federationapi.Cluster, obj pkgruntime.Object, key string, accessor clusterObjectAccessorFunc) ([]util.FederatedOperation, error) {
// The data should not be modified.
desiredObj := adapter.Copy(obj)
operations := make([]util.FederatedOperation, 0)
for _, cluster := range selectedClusters {
clusterObj, found, err := accessor(cluster.Name) clusterObj, found, err := accessor(cluster.Name)
if err != nil { if err != nil {
wrappedErr := fmt.Errorf("Failed to get %s %q from cluster %q: %v", adapter.Kind(), key, cluster.Name, err) wrappedErr := fmt.Errorf("Failed to get %s %q from cluster %q: %v", adapter.Kind(), key, cluster.Name, err)
@ -475,24 +497,13 @@ func clusterOperations(adapter federatedtypes.FederatedTypeAdapter, clusters []*
return nil, wrappedErr return nil, wrappedErr
} }
send, err := selector(cluster.Labels, objMeta.Annotations)
if err != nil {
glog.Errorf("Error processing ClusterSelector cluster: %s for %s map: %s error: %s", cluster.Name, kind, key, err.Error())
return nil, err
} else if !send {
glog.V(5).Infof("Skipping cluster: %s for %s: %s reason: cluster selectors do not match: %-v %-v", cluster.Name, kind, key, cluster.ObjectMeta.Labels, objMeta.Annotations[federationapi.FederationClusterSelectorAnnotation])
}
var operationType util.FederatedOperationType = "" var operationType util.FederatedOperationType = ""
switch { if found {
case found && send:
clusterObj := clusterObj.(pkgruntime.Object) clusterObj := clusterObj.(pkgruntime.Object)
if !adapter.Equivalent(desiredObj, clusterObj) { if !adapter.Equivalent(desiredObj, clusterObj) {
operationType = util.OperationTypeUpdate operationType = util.OperationTypeUpdate
} }
case found && !send: } else {
operationType = util.OperationTypeDelete
case !found && send:
operationType = util.OperationTypeAdd operationType = util.OperationTypeAdd
} }
@ -505,5 +516,23 @@ func clusterOperations(adapter federatedtypes.FederatedTypeAdapter, clusters []*
}) })
} }
} }
for _, cluster := range unselectedClusters {
_, found, err := accessor(cluster.Name)
if err != nil {
wrappedErr := fmt.Errorf("Failed to get %s %q from cluster %q: %v", adapter.Kind(), key, cluster.Name, err)
runtime.HandleError(wrappedErr)
return nil, wrappedErr
}
if found {
operations = append(operations, util.FederatedOperation{
Type: util.OperationTypeDelete,
Obj: desiredObj,
ClusterName: cluster.Name,
Key: key,
})
}
}
return operations, nil return operations, nil
} }

View File

@ -20,6 +20,7 @@ import (
"errors" "errors"
"testing" "testing"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
pkgruntime "k8s.io/apimachinery/pkg/runtime" pkgruntime "k8s.io/apimachinery/pkg/runtime"
federationapi "k8s.io/kubernetes/federation/apis/federation/v1beta1" federationapi "k8s.io/kubernetes/federation/apis/federation/v1beta1"
"k8s.io/kubernetes/federation/pkg/federatedtypes" "k8s.io/kubernetes/federation/pkg/federatedtypes"
@ -74,12 +75,15 @@ func TestSyncToClusters(t *testing.T) {
} }
return nil, nil return nil, nil
}, },
func(federatedtypes.FederatedTypeAdapter, []*federationapi.Cluster, pkgruntime.Object) ([]util.FederatedOperation, error) { func(federatedtypes.FederatedTypeAdapter, []*federationapi.Cluster, []*federationapi.Cluster, pkgruntime.Object) ([]util.FederatedOperation, error) {
if testCase.operationsError { if testCase.operationsError {
return nil, awfulError return nil, awfulError
} }
return testCase.operations, nil return testCase.operations, nil
}, },
func(objMeta *metav1.ObjectMeta, selector func(map[string]string, map[string]string) (bool, error), clusters []*federationapi.Cluster) ([]*federationapi.Cluster, []*federationapi.Cluster, error) {
return clusters, []*federationapi.Cluster{}, nil
},
func([]util.FederatedOperation) error { func([]util.FederatedOperation) error {
if testCase.executionError { if testCase.executionError {
return awfulError return awfulError
@ -94,6 +98,67 @@ func TestSyncToClusters(t *testing.T) {
} }
} }
func TestSelectedClusters(t *testing.T) {
clusterOne := fedtest.NewCluster("cluster1", apiv1.ConditionTrue)
clusterOne.Labels = map[string]string{"name": "cluster1"}
clusterTwo := fedtest.NewCluster("cluster2", apiv1.ConditionTrue)
clusterTwo.Labels = map[string]string{"name": "cluster2"}
clusters := []*federationapi.Cluster{clusterOne, clusterTwo}
testCases := map[string]struct {
expectedSelectorError bool
clusterOneSelected bool
clusterTwoSelected bool
expectedSelectedClusters []*federationapi.Cluster
expectedUnselectedClusters []*federationapi.Cluster
}{
"Selector returned error": {
expectedSelectorError: true,
},
"All clusters selected": {
clusterOneSelected: true,
clusterTwoSelected: true,
expectedSelectedClusters: clusters,
expectedUnselectedClusters: []*federationapi.Cluster{},
},
"One cluster selected": {
clusterOneSelected: true,
expectedSelectedClusters: []*federationapi.Cluster{clusterOne},
expectedUnselectedClusters: []*federationapi.Cluster{clusterTwo},
},
"No clusters selected": {
expectedSelectedClusters: []*federationapi.Cluster{},
expectedUnselectedClusters: clusters,
},
}
for testName, testCase := range testCases {
t.Run(testName, func(t *testing.T) {
selectedClusters, unselectedClusters, err := selectedClusters(&metav1.ObjectMeta{}, func(labels map[string]string, annotations map[string]string) (bool, error) {
if testCase.expectedSelectorError {
return false, awfulError
}
if labels["name"] == "cluster1" {
return testCase.clusterOneSelected, nil
}
if labels["name"] == "cluster2" {
return testCase.clusterTwoSelected, nil
}
t.Errorf("Unexpected cluster")
return false, nil
}, clusters)
if testCase.expectedSelectorError {
require.Error(t, err, "An error was expected")
} else {
require.NoError(t, err, "An error was not expected")
}
require.Equal(t, testCase.expectedSelectedClusters, selectedClusters, "Expected the correct clusters to be selected.")
require.Equal(t, testCase.expectedUnselectedClusters, unselectedClusters, "Expected the correct clusters to be unselected.")
})
}
}
func TestClusterOperations(t *testing.T) { func TestClusterOperations(t *testing.T) {
adapter := &federatedtypes.SecretAdapter{} adapter := &federatedtypes.SecretAdapter{}
obj := adapter.NewTestObject("foo") obj := adapter.NewTestObject("foo")
@ -103,7 +168,6 @@ func TestClusterOperations(t *testing.T) {
testCases := map[string]struct { testCases := map[string]struct {
clusterObject pkgruntime.Object clusterObject pkgruntime.Object
expectedErr bool expectedErr bool
expectedSendErr bool
sendToCluster bool sendToCluster bool
operationType util.FederatedOperationType operationType util.FederatedOperationType
@ -111,9 +175,6 @@ func TestClusterOperations(t *testing.T) {
"Accessor error returned": { "Accessor error returned": {
expectedErr: true, expectedErr: true,
}, },
"sendToCluster error returned": {
expectedSendErr: true,
},
"Missing cluster object should result in add operation": { "Missing cluster object should result in add operation": {
operationType: util.OperationTypeAdd, operationType: util.OperationTypeAdd,
sendToCluster: true, sendToCluster: true,
@ -138,18 +199,21 @@ func TestClusterOperations(t *testing.T) {
clusters := []*federationapi.Cluster{fedtest.NewCluster("cluster1", apiv1.ConditionTrue)} clusters := []*federationapi.Cluster{fedtest.NewCluster("cluster1", apiv1.ConditionTrue)}
key := federatedtypes.ObjectKey(adapter, obj) key := federatedtypes.ObjectKey(adapter, obj)
operations, err := clusterOperations(adapter, clusters, obj, key, func(string) (interface{}, bool, error) { var selectedClusters, unselectedClusters []*federationapi.Cluster
if testCase.sendToCluster {
selectedClusters = clusters
unselectedClusters = []*federationapi.Cluster{}
} else {
selectedClusters = []*federationapi.Cluster{}
unselectedClusters = clusters
}
operations, err := clusterOperations(adapter, selectedClusters, unselectedClusters, obj, key, func(string) (interface{}, bool, error) {
if testCase.expectedErr { if testCase.expectedErr {
return nil, false, awfulError return nil, false, awfulError
} }
return testCase.clusterObject, (testCase.clusterObject != nil), nil return testCase.clusterObject, (testCase.clusterObject != nil), nil
}, func(map[string]string, map[string]string) (bool, error) {
if testCase.expectedSendErr {
return false, awfulError
}
return testCase.sendToCluster, nil
}) })
if testCase.expectedErr || testCase.expectedSendErr { if testCase.expectedErr {
require.Error(t, err, "An error was expected") require.Error(t, err, "An error was expected")
} else { } else {
require.NoError(t, err, "An error was not expected") require.NoError(t, err, "An error was not expected")