Merge pull request #45374 from marun/fed-refactor-sync-controller

Automatic merge from submit-queue (batch tested with PRs 45374, 44537, 45739, 44474, 45888)

[Federation] Refactor sync controller's reconcile method for maintainability

This PR refactors the sync controllers reconcile method for maintainability with the goal of eliminating the need for type-specific controller unit tests.  The unit test coverage for reconcile is not complete, but I think it's a good start.

cc: @kubernetes/sig-federation-pr-reviews
This commit is contained in:
Kubernetes Submit Queue 2017-05-16 18:10:53 -07:00 committed by GitHub
commit 0e73596141
5 changed files with 293 additions and 93 deletions

View File

@ -61,3 +61,17 @@ type FederatedTypeAdapter interface {
// be registered with RegisterAdapterFactory to ensure the type
// adapter is discoverable.
type AdapterFactory func(client federationclientset.Interface) FederatedTypeAdapter
// SetAnnotation sets the given key and value in the given object's ObjectMeta.Annotations map
func SetAnnotation(adapter FederatedTypeAdapter, obj pkgruntime.Object, key, value string) {
meta := adapter.ObjectMeta(obj)
if meta.Annotations == nil {
meta.Annotations = make(map[string]string)
}
meta.Annotations[key] = value
}
// ObjectKey returns a cluster-unique key for the given object
func ObjectKey(adapter FederatedTypeAdapter, obj pkgruntime.Object) string {
return adapter.NamespacedName(obj).String()
}

View File

@ -193,11 +193,7 @@ func (c *FederatedTypeCRUDTester) waitForResource(client clientset.Interface, ob
func (c *FederatedTypeCRUDTester) updateFedObject(obj pkgruntime.Object) (pkgruntime.Object, error) {
err := wait.PollImmediate(c.waitInterval, wait.ForeverTestTimeout, func() (bool, error) {
// Target the metadata for simplicity (it's type-agnostic)
meta := c.adapter.ObjectMeta(obj)
if meta.Annotations == nil {
meta.Annotations = make(map[string]string)
}
meta.Annotations[AnnotationTestFederationCRUDUpdate] = "updated"
federatedtypes.SetAnnotation(c.adapter, obj, AnnotationTestFederationCRUDUpdate, "updated")
_, err := c.adapter.FedUpdate(obj)
if errors.IsConflict(err) {

View File

@ -27,6 +27,7 @@ go_library(
"//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/runtime:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/types:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/util/runtime:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/util/wait:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/watch:go_default_library",
"//vendor/k8s.io/client-go/pkg/api/v1:go_default_library",
@ -55,6 +56,7 @@ go_test(
name = "go_default_test",
srcs = [
"configmap_controller_test.go",
"controller_test.go",
"daemonset_controller_test.go",
"secret_controller_test.go",
],
@ -73,6 +75,7 @@ go_test(
"//pkg/client/clientset_generated/clientset/fake:go_default_library",
"//vendor/github.com/golang/glog:go_default_library",
"//vendor/github.com/stretchr/testify/assert:go_default_library",
"//vendor/github.com/stretchr/testify/require:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/runtime:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/types:go_default_library",

View File

@ -18,13 +18,13 @@ package sync
import (
"fmt"
"strings"
"time"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
pkgruntime "k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/apimachinery/pkg/watch"
clientv1 "k8s.io/client-go/pkg/api/v1"
@ -217,11 +217,6 @@ func (s *FederationSyncController) updateObject(obj pkgruntime.Object) (pkgrunti
func (s *FederationSyncController) Run(stopChan <-chan struct{}) {
go s.controller.Run(stopChan)
s.informer.Start()
go func() {
<-stopChan
s.informer.Stop()
s.workQueue.ShutDown()
}()
s.deliverer.StartWithHandler(func(item *util.DelayingDelivererItem) {
s.workQueue.Add(item)
})
@ -233,6 +228,15 @@ func (s *FederationSyncController) Run(stopChan <-chan struct{}) {
go wait.Until(s.worker, time.Second, stopChan)
util.StartBackoffGC(s.backoff, stopChan)
// Ensure all goroutines are cleaned up when the stop channel closes
go func() {
<-stopChan
s.informer.Stop()
s.workQueue.ShutDown()
s.deliverer.Stop()
s.clusterDeliverer.Stop()
}()
}
type reconciliationStatus int
@ -264,7 +268,7 @@ func (s *FederationSyncController) worker() {
case statusNeedsRecheck:
s.deliver(*namespacedName, s.reviewDelay, false)
case statusNotSynced:
s.deliver(*namespacedName, s.reviewDelay, false)
s.deliver(*namespacedName, s.clusterAvailableDelay, false)
}
}
}
@ -295,7 +299,7 @@ func (s *FederationSyncController) isSynced() bool {
}
clusters, err := s.informer.GetReadyClusters()
if err != nil {
glog.Errorf("Failed to get ready clusters: %v", err)
runtime.HandleError(fmt.Errorf("Failed to get ready clusters: %v", err))
return false
}
if !s.informer.GetTargetStore().ClustersSynced(clusters) {
@ -320,113 +324,80 @@ func (s *FederationSyncController) reconcile(namespacedName types.NamespacedName
return statusNotSynced
}
key := namespacedName.String()
kind := s.adapter.Kind()
cachedObj, exist, err := s.store.GetByKey(key)
key := namespacedName.String()
obj, err := s.objFromCache(kind, key)
if err != nil {
glog.Errorf("failed to query main %s store for %v: %v", kind, key, err)
return statusError
}
if !exist {
// Not federated, ignoring.
if obj == nil {
return statusAllOK
}
// Create a copy before modifying the resource to prevent racing
// with other readers.
copiedObj, err := api.Scheme.DeepCopy(cachedObj)
if err != nil {
glog.Errorf("error in retrieving %s from store: %v", kind, err)
return statusError
}
if !s.adapter.IsExpectedType(copiedObj) {
glog.Errorf("object is not the expected type: %v", copiedObj)
return statusError
}
obj := copiedObj.(pkgruntime.Object)
meta := s.adapter.ObjectMeta(obj)
if meta.DeletionTimestamp != nil {
if err := s.delete(obj, namespacedName); err != nil {
s.eventRecorder.Eventf(obj, api.EventTypeWarning, "DeleteFailed",
"%s delete failed: %v", strings.ToTitle(kind), err)
glog.Errorf("failed to delete %s %s: %v", kind, namespacedName, err)
err := s.delete(obj, kind, namespacedName)
if err != nil {
msg := "Failed to delete %s %q: %v"
args := []interface{}{kind, namespacedName, err}
runtime.HandleError(fmt.Errorf(msg, args...))
s.eventRecorder.Eventf(obj, api.EventTypeWarning, "DeleteFailed", msg, args...)
return statusError
}
return statusAllOK
}
glog.V(3).Infof("Ensuring delete object from underlying clusters finalizer for %s: %s",
kind, namespacedName)
// Add the required finalizers before creating the resource in underlying clusters.
glog.V(3).Infof("Ensuring finalizers exist on %s %q", kind, key)
obj, err = s.deletionHelper.EnsureFinalizers(obj)
if err != nil {
glog.Errorf("failed to ensure delete object from underlying clusters finalizer in %s %s: %v",
kind, namespacedName, err)
runtime.HandleError(fmt.Errorf("Failed to ensure finalizers for %s %q: %v", kind, key, err))
return statusError
}
glog.V(3).Infof("Syncing %s %s in underlying clusters", kind, namespacedName)
operationsAccessor := func(adapter federatedtypes.FederatedTypeAdapter, clusters []*federationapi.Cluster, obj pkgruntime.Object) ([]util.FederatedOperation, error) {
return clusterOperations(adapter, clusters, obj, func(clusterName string) (interface{}, bool, error) {
return s.informer.GetTargetStore().GetByKey(clusterName, key)
})
}
return syncToClusters(
s.informer.GetReadyClusters,
operationsAccessor,
s.updater.Update,
s.adapter,
obj,
)
}
clusters, err := s.informer.GetReadyClusters()
func (s *FederationSyncController) objFromCache(kind, key string) (pkgruntime.Object, error) {
cachedObj, exist, err := s.store.GetByKey(key)
if err != nil {
glog.Errorf("failed to get cluster list: %v", err)
return statusNotSynced
wrappedErr := fmt.Errorf("Failed to query %s store for %q: %v", kind, key, err)
runtime.HandleError(wrappedErr)
return nil, err
}
if !exist {
return nil, nil
}
operations := make([]util.FederatedOperation, 0)
for _, cluster := range clusters {
clusterObj, found, err := s.informer.GetTargetStore().GetByKey(cluster.Name, key)
if err != nil {
glog.Errorf("failed to get %s from %s: %v", key, cluster.Name, err)
return statusError
}
// The data should not be modified.
desiredObj := s.adapter.Copy(obj)
if !found {
operations = append(operations, util.FederatedOperation{
Type: util.OperationTypeAdd,
Obj: desiredObj,
ClusterName: cluster.Name,
Key: key,
})
} else {
clusterObj := clusterObj.(pkgruntime.Object)
// Update existing resource, if needed.
if !s.adapter.Equivalent(desiredObj, clusterObj) {
operations = append(operations, util.FederatedOperation{
Type: util.OperationTypeUpdate,
Obj: desiredObj,
ClusterName: cluster.Name,
Key: key,
})
}
}
}
if len(operations) == 0 {
// Everything is in order
return statusAllOK
}
err = s.updater.Update(operations)
// Create a copy before modifying the resource to prevent racing with other readers.
copiedObj, err := api.Scheme.DeepCopy(cachedObj)
if err != nil {
glog.Errorf("failed to execute updates for %s: %v", key, err)
return statusError
wrappedErr := fmt.Errorf("Error in retrieving %s %q from store: %v", kind, key, err)
runtime.HandleError(wrappedErr)
return nil, err
}
// Evertyhing is in order but let's be double sure
return statusNeedsRecheck
if !s.adapter.IsExpectedType(copiedObj) {
err = fmt.Errorf("Object is not the expected type: %v", copiedObj)
runtime.HandleError(err)
return nil, err
}
return copiedObj.(pkgruntime.Object), nil
}
// delete deletes the given resource or returns error if the deletion was not complete.
func (s *FederationSyncController) delete(obj pkgruntime.Object, namespacedName types.NamespacedName) error {
kind := s.adapter.Kind()
glog.V(3).Infof("Handling deletion of %s: %v", kind, namespacedName)
func (s *FederationSyncController) delete(obj pkgruntime.Object, kind string, namespacedName types.NamespacedName) error {
glog.V(3).Infof("Handling deletion of %s %q", kind, namespacedName)
_, err := s.deletionHelper.HandleObjectInUnderlyingClusters(obj)
if err != nil {
return err
@ -438,8 +409,80 @@ func (s *FederationSyncController) delete(obj pkgruntime.Object, namespacedName
// This is expected when we are processing an update as a result of finalizer deletion.
// The process that deleted the last finalizer is also going to delete the resource and we do not have to do anything.
if !errors.IsNotFound(err) {
return fmt.Errorf("failed to delete %s: %v", kind, err)
return err
}
}
return nil
}
type clustersAccessorFunc func() ([]*federationapi.Cluster, error)
type operationsFunc func(federatedtypes.FederatedTypeAdapter, []*federationapi.Cluster, pkgruntime.Object) ([]util.FederatedOperation, error)
type executionFunc func([]util.FederatedOperation) error
// syncToClusters ensures that the state of the given object is synchronized to member clusters.
func syncToClusters(clustersAccessor clustersAccessorFunc, operationsAccessor operationsFunc, execute executionFunc, adapter federatedtypes.FederatedTypeAdapter, obj pkgruntime.Object) reconciliationStatus {
kind := adapter.Kind()
key := federatedtypes.ObjectKey(adapter, obj)
glog.V(3).Infof("Syncing %s %q in underlying clusters", kind, key)
clusters, err := clustersAccessor()
if err != nil {
runtime.HandleError(fmt.Errorf("Failed to get cluster list: %v", err))
return statusNotSynced
}
operations, err := operationsAccessor(adapter, clusters, obj)
if err != nil {
return statusError
}
if len(operations) == 0 {
return statusAllOK
}
err = execute(operations)
if err != nil {
runtime.HandleError(fmt.Errorf("Failed to execute updates for %s %q: %v", kind, key, err))
return statusError
}
// Evertyhing is in order but let's be double sure
return statusNeedsRecheck
}
type clusterObjectAccessorFunc func(clusterName string) (interface{}, bool, error)
// clusterOperations returns the list of operations needed to synchronize the state of the given object to the provided clusters
func clusterOperations(adapter federatedtypes.FederatedTypeAdapter, clusters []*federationapi.Cluster, obj pkgruntime.Object, accessor clusterObjectAccessorFunc) ([]util.FederatedOperation, error) {
key := federatedtypes.ObjectKey(adapter, obj)
operations := make([]util.FederatedOperation, 0)
for _, cluster := range clusters {
clusterObj, found, err := accessor(cluster.Name)
if err != nil {
wrappedErr := fmt.Errorf("Failed to get %s %q from cluster %q: %v", adapter.Kind(), key, cluster.Name, err)
runtime.HandleError(wrappedErr)
return nil, wrappedErr
}
// The data should not be modified.
desiredObj := adapter.Copy(obj)
var operationType util.FederatedOperationType = ""
if found {
clusterObj := clusterObj.(pkgruntime.Object)
if !adapter.Equivalent(desiredObj, clusterObj) {
operationType = util.OperationTypeUpdate
}
} else {
operationType = util.OperationTypeAdd
}
if len(operationType) > 0 {
operations = append(operations, util.FederatedOperation{
Type: operationType,
Obj: desiredObj,
ClusterName: cluster.Name,
Key: key,
})
}
}
return operations, nil
}

View File

@ -0,0 +1,144 @@
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package sync
import (
"errors"
"testing"
pkgruntime "k8s.io/apimachinery/pkg/runtime"
federationapi "k8s.io/kubernetes/federation/apis/federation/v1beta1"
"k8s.io/kubernetes/federation/pkg/federatedtypes"
"k8s.io/kubernetes/federation/pkg/federation-controller/util"
fedtest "k8s.io/kubernetes/federation/pkg/federation-controller/util/test"
apiv1 "k8s.io/kubernetes/pkg/api/v1"
"github.com/stretchr/testify/require"
)
var awfulError error = errors.New("Something bad happened")
func TestSyncToClusters(t *testing.T) {
adapter := &federatedtypes.SecretAdapter{}
obj := adapter.NewTestObject("foo")
testCases := map[string]struct {
clusterError bool
operationsError bool
executionError bool
operations []util.FederatedOperation
status reconciliationStatus
}{
"Error listing clusters redelivers with cluster delay": {
clusterError: true,
status: statusNotSynced,
},
"Error retrieving cluster operations redelivers": {
operationsError: true,
status: statusError,
},
"No operations returns ok": {
status: statusAllOK,
},
"Execution error redelivers": {
executionError: true,
operations: []util.FederatedOperation{{}},
status: statusError,
},
"Successful update indicates recheck": {
operations: []util.FederatedOperation{{}},
status: statusNeedsRecheck,
},
}
for testName, testCase := range testCases {
t.Run(testName, func(t *testing.T) {
status := syncToClusters(
func() ([]*federationapi.Cluster, error) {
if testCase.clusterError {
return nil, awfulError
}
return nil, nil
},
func(federatedtypes.FederatedTypeAdapter, []*federationapi.Cluster, pkgruntime.Object) ([]util.FederatedOperation, error) {
if testCase.operationsError {
return nil, awfulError
}
return testCase.operations, nil
},
func([]util.FederatedOperation) error {
if testCase.executionError {
return awfulError
}
return nil
},
adapter,
obj,
)
require.Equal(t, testCase.status, status, "Unexpected status!")
})
}
}
func TestClusterOperations(t *testing.T) {
adapter := &federatedtypes.SecretAdapter{}
obj := adapter.NewTestObject("foo")
differingObj := adapter.Copy(obj)
federatedtypes.SetAnnotation(adapter, differingObj, "foo", "bar")
testCases := map[string]struct {
clusterObject pkgruntime.Object
expectedErr bool
operationType util.FederatedOperationType
}{
"Accessor error returned": {
expectedErr: true,
},
"Missing cluster object should result in add operation": {
operationType: util.OperationTypeAdd,
},
"Differing cluster object should result in update operation": {
clusterObject: differingObj,
operationType: util.OperationTypeUpdate,
},
"Matching cluster object should not result in an operation": {
clusterObject: obj,
},
}
for testName, testCase := range testCases {
t.Run(testName, func(t *testing.T) {
clusters := []*federationapi.Cluster{fedtest.NewCluster("cluster1", apiv1.ConditionTrue)}
operations, err := clusterOperations(adapter, clusters, obj, func(string) (interface{}, bool, error) {
if testCase.expectedErr {
return nil, false, awfulError
}
return testCase.clusterObject, (testCase.clusterObject != nil), nil
})
if testCase.expectedErr {
require.Error(t, err, "An error was expected")
} else {
require.NoError(t, err, "An error was not expected")
}
if len(testCase.operationType) == 0 {
require.True(t, len(operations) == 0, "An operation was not expected")
} else {
require.True(t, len(operations) == 1, "A single operation was expected")
require.Equal(t, testCase.operationType, operations[0].Type, "Unexpected operation returned")
}
})
}
}