fed: Refactor sync controller's reconcile method

This commit is contained in:
Maru Newby 2017-05-11 11:33:36 -07:00
parent cf71a8ef11
commit 547ece5b83
3 changed files with 171 additions and 75 deletions

View File

@ -55,6 +55,7 @@ go_test(
name = "go_default_test", name = "go_default_test",
srcs = [ srcs = [
"configmap_controller_test.go", "configmap_controller_test.go",
"controller_test.go",
"daemonset_controller_test.go", "daemonset_controller_test.go",
"secret_controller_test.go", "secret_controller_test.go",
], ],
@ -73,6 +74,7 @@ go_test(
"//pkg/client/clientset_generated/clientset/fake:go_default_library", "//pkg/client/clientset_generated/clientset/fake:go_default_library",
"//vendor/github.com/golang/glog:go_default_library", "//vendor/github.com/golang/glog:go_default_library",
"//vendor/github.com/stretchr/testify/assert:go_default_library", "//vendor/github.com/stretchr/testify/assert:go_default_library",
"//vendor/github.com/stretchr/testify/require:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", "//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/runtime:go_default_library", "//vendor/k8s.io/apimachinery/pkg/runtime:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/types:go_default_library", "//vendor/k8s.io/apimachinery/pkg/types:go_default_library",

View File

@ -18,7 +18,6 @@ package sync
import ( import (
"fmt" "fmt"
"strings"
"time" "time"
"k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/api/errors"
@ -217,11 +216,6 @@ func (s *FederationSyncController) updateObject(obj pkgruntime.Object) (pkgrunti
func (s *FederationSyncController) Run(stopChan <-chan struct{}) { func (s *FederationSyncController) Run(stopChan <-chan struct{}) {
go s.controller.Run(stopChan) go s.controller.Run(stopChan)
s.informer.Start() s.informer.Start()
go func() {
<-stopChan
s.informer.Stop()
s.workQueue.ShutDown()
}()
s.deliverer.StartWithHandler(func(item *util.DelayingDelivererItem) { s.deliverer.StartWithHandler(func(item *util.DelayingDelivererItem) {
s.workQueue.Add(item) s.workQueue.Add(item)
}) })
@ -233,6 +227,15 @@ func (s *FederationSyncController) Run(stopChan <-chan struct{}) {
go wait.Until(s.worker, time.Second, stopChan) go wait.Until(s.worker, time.Second, stopChan)
util.StartBackoffGC(s.backoff, stopChan) util.StartBackoffGC(s.backoff, stopChan)
// Ensure all goroutines are cleaned up when the stop channel closes
go func() {
<-stopChan
s.informer.Stop()
s.workQueue.ShutDown()
s.deliverer.Stop()
s.clusterDeliverer.Stop()
}()
} }
type reconciliationStatus int type reconciliationStatus int
@ -264,7 +267,7 @@ func (s *FederationSyncController) worker() {
case statusNeedsRecheck: case statusNeedsRecheck:
s.deliver(*namespacedName, s.reviewDelay, false) s.deliver(*namespacedName, s.reviewDelay, false)
case statusNotSynced: case statusNotSynced:
s.deliver(*namespacedName, s.reviewDelay, false) s.deliver(*namespacedName, s.clusterAvailableDelay, false)
} }
} }
} }
@ -320,102 +323,60 @@ func (s *FederationSyncController) reconcile(namespacedName types.NamespacedName
return statusNotSynced return statusNotSynced
} }
key := namespacedName.String()
kind := s.adapter.Kind() kind := s.adapter.Kind()
cachedObj, exist, err := s.store.GetByKey(key) key := namespacedName.String()
obj, err := s.objFromCache(kind, key)
if err != nil { if err != nil {
glog.Errorf("failed to query main %s store for %v: %v", kind, key, err) glog.Error(err)
return statusError return statusError
} }
if obj == nil {
if !exist {
// Not federated, ignoring.
return statusAllOK return statusAllOK
} }
// Create a copy before modifying the resource to prevent racing
// with other readers.
copiedObj, err := api.Scheme.DeepCopy(cachedObj)
if err != nil {
glog.Errorf("error in retrieving %s from store: %v", kind, err)
return statusError
}
if !s.adapter.IsExpectedType(copiedObj) {
glog.Errorf("object is not the expected type: %v", copiedObj)
return statusError
}
obj := copiedObj.(pkgruntime.Object)
meta := s.adapter.ObjectMeta(obj) meta := s.adapter.ObjectMeta(obj)
if meta.DeletionTimestamp != nil { if meta.DeletionTimestamp != nil {
if err := s.delete(obj, namespacedName); err != nil { err := s.delete(obj, kind, namespacedName)
s.eventRecorder.Eventf(obj, api.EventTypeWarning, "DeleteFailed", if err != nil {
"%s delete failed: %v", strings.ToTitle(kind), err) msg := "Failed to delete %s %q: %v"
glog.Errorf("failed to delete %s %s: %v", kind, namespacedName, err) args := []interface{}{kind, namespacedName, err}
glog.Errorf(msg, args...)
s.eventRecorder.Eventf(obj, api.EventTypeWarning, "DeleteFailed", msg, args...)
return statusError return statusError
} }
return statusAllOK return statusAllOK
} }
glog.V(3).Infof("Ensuring delete object from underlying clusters finalizer for %s: %s", glog.V(3).Infof("Ensuring finalizers for %s %q", kind, key)
kind, namespacedName)
// Add the required finalizers before creating the resource in underlying clusters.
obj, err = s.deletionHelper.EnsureFinalizers(obj) obj, err = s.deletionHelper.EnsureFinalizers(obj)
if err != nil { if err != nil {
glog.Errorf("failed to ensure delete object from underlying clusters finalizer in %s %s: %v", glog.Errorf("Failed to ensure finalizers for %s %q: %v", kind, key, err)
kind, namespacedName, err)
return statusError return statusError
} }
glog.V(3).Infof("Syncing %s %s in underlying clusters", kind, namespacedName) glog.V(3).Infof("Syncing %s %q in underlying clusters", kind, key)
clusters, err := s.informer.GetReadyClusters() clusters, err := s.informer.GetReadyClusters()
if err != nil { if err != nil {
glog.Errorf("failed to get cluster list: %v", err) glog.Errorf("Failed to get cluster list: %v", err)
return statusNotSynced return statusNotSynced
} }
operations := make([]util.FederatedOperation, 0) operations, err := clusterOperations(s.adapter, clusters, obj, key, func(clusterName string) (interface{}, bool, error) {
for _, cluster := range clusters { return s.informer.GetTargetStore().GetByKey(clusterName, key)
clusterObj, found, err := s.informer.GetTargetStore().GetByKey(cluster.Name, key) })
if err != nil { if err != nil {
glog.Errorf("failed to get %s from %s: %v", key, cluster.Name, err) glog.Error(err)
return statusError return statusError
} }
// The data should not be modified.
desiredObj := s.adapter.Copy(obj)
if !found {
operations = append(operations, util.FederatedOperation{
Type: util.OperationTypeAdd,
Obj: desiredObj,
ClusterName: cluster.Name,
Key: key,
})
} else {
clusterObj := clusterObj.(pkgruntime.Object)
// Update existing resource, if needed.
if !s.adapter.Equivalent(desiredObj, clusterObj) {
operations = append(operations, util.FederatedOperation{
Type: util.OperationTypeUpdate,
Obj: desiredObj,
ClusterName: cluster.Name,
Key: key,
})
}
}
}
if len(operations) == 0 { if len(operations) == 0 {
// Everything is in order
return statusAllOK return statusAllOK
} }
err = s.updater.Update(operations) err = s.updater.Update(operations)
if err != nil { if err != nil {
glog.Errorf("failed to execute updates for %s: %v", key, err) glog.Errorf("Failed to execute updates for %s %q: %v", kind, key, err)
return statusError return statusError
} }
@ -423,10 +384,29 @@ func (s *FederationSyncController) reconcile(namespacedName types.NamespacedName
return statusNeedsRecheck return statusNeedsRecheck
} }
func (s *FederationSyncController) objFromCache(kind, key string) (pkgruntime.Object, error) {
cachedObj, exist, err := s.store.GetByKey(key)
if err != nil {
return nil, fmt.Errorf("Failed to query %s store for %q: %v", kind, key, err)
}
if !exist {
return nil, nil
}
// Create a copy before modifying the resource to prevent racing with other readers.
copiedObj, err := api.Scheme.DeepCopy(cachedObj)
if err != nil {
return nil, fmt.Errorf("Error in retrieving %s %q from store: %v", kind, key, err)
}
if !s.adapter.IsExpectedType(copiedObj) {
return nil, fmt.Errorf("Object is not the expected type: %v", copiedObj)
}
return copiedObj.(pkgruntime.Object), nil
}
// delete deletes the given resource or returns error if the deletion was not complete. // delete deletes the given resource or returns error if the deletion was not complete.
func (s *FederationSyncController) delete(obj pkgruntime.Object, namespacedName types.NamespacedName) error { func (s *FederationSyncController) delete(obj pkgruntime.Object, kind string, namespacedName types.NamespacedName) error {
kind := s.adapter.Kind() glog.V(3).Infof("Handling deletion of %s %q", kind, namespacedName)
glog.V(3).Infof("Handling deletion of %s: %v", kind, namespacedName)
_, err := s.deletionHelper.HandleObjectInUnderlyingClusters(obj) _, err := s.deletionHelper.HandleObjectInUnderlyingClusters(obj)
if err != nil { if err != nil {
return err return err
@ -438,8 +418,42 @@ func (s *FederationSyncController) delete(obj pkgruntime.Object, namespacedName
// This is expected when we are processing an update as a result of finalizer deletion. // This is expected when we are processing an update as a result of finalizer deletion.
// The process that deleted the last finalizer is also going to delete the resource and we do not have to do anything. // The process that deleted the last finalizer is also going to delete the resource and we do not have to do anything.
if !errors.IsNotFound(err) { if !errors.IsNotFound(err) {
return fmt.Errorf("failed to delete %s: %v", kind, err) return err
} }
} }
return nil return nil
} }
type clusterAccessor func(clusterName string) (interface{}, bool, error)
// clusterOperations returns the list of operations needed to synchronize the state of the given object to the provided clusters
func clusterOperations(adapter federatedtypes.FederatedTypeAdapter, clusters []*federationapi.Cluster, obj pkgruntime.Object, key string, accessor clusterAccessor) ([]util.FederatedOperation, error) {
operations := make([]util.FederatedOperation, 0)
for _, cluster := range clusters {
clusterObj, found, err := accessor(cluster.Name)
if err != nil {
return nil, fmt.Errorf("Failed to get %s %q from cluster %q: %v", adapter.Kind(), key, cluster.Name, err)
}
// The data should not be modified.
desiredObj := adapter.Copy(obj)
var operationType util.FederatedOperationType = ""
if found {
clusterObj := clusterObj.(pkgruntime.Object)
if !adapter.Equivalent(desiredObj, clusterObj) {
operationType = util.OperationTypeUpdate
}
} else {
operationType = util.OperationTypeAdd
}
if len(operationType) > 0 {
operations = append(operations, util.FederatedOperation{
Type: operationType,
Obj: desiredObj,
ClusterName: cluster.Name,
Key: key,
})
}
}
return operations, nil
}

View File

@ -0,0 +1,80 @@
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package sync
import (
"fmt"
"testing"
pkgruntime "k8s.io/apimachinery/pkg/runtime"
federationapi "k8s.io/kubernetes/federation/apis/federation/v1beta1"
"k8s.io/kubernetes/federation/pkg/federatedtypes"
"k8s.io/kubernetes/federation/pkg/federation-controller/util"
fedtest "k8s.io/kubernetes/federation/pkg/federation-controller/util/test"
apiv1 "k8s.io/kubernetes/pkg/api/v1"
"github.com/stretchr/testify/require"
)
func TestClusterOperations(t *testing.T) {
adapter := &federatedtypes.SecretAdapter{}
obj := adapter.NewTestObject("foo")
differingObj := adapter.Copy(obj)
federatedtypes.SetAnnotation(adapter, differingObj, "foo", "bar")
testCases := map[string]struct {
clusterObject pkgruntime.Object
expectedErr bool
operationType util.FederatedOperationType
}{
"Accessor error returned": {
expectedErr: true,
},
"Missing cluster object should result in add operation": {
operationType: util.OperationTypeAdd,
},
"Differing cluster object should result in update operation": {
clusterObject: differingObj,
operationType: util.OperationTypeUpdate,
},
"Matching cluster object should not result in an operation": {
clusterObject: obj,
},
}
for testName, testCase := range testCases {
t.Run(testName, func(t *testing.T) {
clusters := []*federationapi.Cluster{fedtest.NewCluster("cluster1", apiv1.ConditionTrue)}
operations, err := clusterOperations(adapter, clusters, obj, "key", func(string) (interface{}, bool, error) {
if testCase.expectedErr {
return nil, false, fmt.Errorf("Not found!")
}
return testCase.clusterObject, (testCase.clusterObject != nil), nil
})
if testCase.expectedErr {
require.Error(t, err, "An error was expected")
} else {
require.NoError(t, err, "An error was not expected")
}
if len(testCase.operationType) == 0 {
require.True(t, len(operations) == 0, "An operation was not expected")
} else {
require.True(t, len(operations) == 1, "A single operation was expected")
require.Equal(t, testCase.operationType, operations[0].Type, "Unexpected operation returned")
}
})
}
}