mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-08-11 04:52:08 +00:00
fed: Further refactor of sync controller reconcile
This change breaks out non-delete cluster updates into a method for testability.
This commit is contained in:
parent
547ece5b83
commit
23b2cee8de
@ -70,3 +70,8 @@ func SetAnnotation(adapter FederatedTypeAdapter, obj pkgruntime.Object, key, val
|
|||||||
}
|
}
|
||||||
meta.Annotations[key] = value
|
meta.Annotations[key] = value
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// ObjectKey returns a cluster-unique key for the given object
|
||||||
|
func ObjectKey(adapter FederatedTypeAdapter, obj pkgruntime.Object) string {
|
||||||
|
return adapter.NamespacedName(obj).String()
|
||||||
|
}
|
||||||
|
@ -348,40 +348,25 @@ func (s *FederationSyncController) reconcile(namespacedName types.NamespacedName
|
|||||||
return statusAllOK
|
return statusAllOK
|
||||||
}
|
}
|
||||||
|
|
||||||
glog.V(3).Infof("Ensuring finalizers for %s %q", kind, key)
|
glog.V(3).Infof("Ensuring finalizers exist on %s %q", kind, key)
|
||||||
obj, err = s.deletionHelper.EnsureFinalizers(obj)
|
obj, err = s.deletionHelper.EnsureFinalizers(obj)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
glog.Errorf("Failed to ensure finalizers for %s %q: %v", kind, key, err)
|
glog.Errorf("Failed to ensure finalizers for %s %q: %v", kind, key, err)
|
||||||
return statusError
|
return statusError
|
||||||
}
|
}
|
||||||
|
|
||||||
glog.V(3).Infof("Syncing %s %q in underlying clusters", kind, key)
|
operationsAccessor := func(adapter federatedtypes.FederatedTypeAdapter, clusters []*federationapi.Cluster, obj pkgruntime.Object) ([]util.FederatedOperation, error) {
|
||||||
|
return clusterOperations(adapter, clusters, obj, func(clusterName string) (interface{}, bool, error) {
|
||||||
clusters, err := s.informer.GetReadyClusters()
|
return s.informer.GetTargetStore().GetByKey(clusterName, key)
|
||||||
if err != nil {
|
})
|
||||||
glog.Errorf("Failed to get cluster list: %v", err)
|
|
||||||
return statusNotSynced
|
|
||||||
}
|
}
|
||||||
|
return syncToClusters(
|
||||||
operations, err := clusterOperations(s.adapter, clusters, obj, key, func(clusterName string) (interface{}, bool, error) {
|
s.informer.GetReadyClusters,
|
||||||
return s.informer.GetTargetStore().GetByKey(clusterName, key)
|
operationsAccessor,
|
||||||
})
|
s.updater.Update,
|
||||||
if err != nil {
|
s.adapter,
|
||||||
glog.Error(err)
|
obj,
|
||||||
return statusError
|
)
|
||||||
}
|
|
||||||
if len(operations) == 0 {
|
|
||||||
return statusAllOK
|
|
||||||
}
|
|
||||||
|
|
||||||
err = s.updater.Update(operations)
|
|
||||||
if err != nil {
|
|
||||||
glog.Errorf("Failed to execute updates for %s %q: %v", kind, key, err)
|
|
||||||
return statusError
|
|
||||||
}
|
|
||||||
|
|
||||||
// Evertyhing is in order but let's be double sure
|
|
||||||
return statusNeedsRecheck
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *FederationSyncController) objFromCache(kind, key string) (pkgruntime.Object, error) {
|
func (s *FederationSyncController) objFromCache(kind, key string) (pkgruntime.Object, error) {
|
||||||
@ -424,10 +409,47 @@ func (s *FederationSyncController) delete(obj pkgruntime.Object, kind string, na
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
type clusterAccessor func(clusterName string) (interface{}, bool, error)
|
type clustersAccessorFunc func() ([]*federationapi.Cluster, error)
|
||||||
|
type operationsFunc func(federatedtypes.FederatedTypeAdapter, []*federationapi.Cluster, pkgruntime.Object) ([]util.FederatedOperation, error)
|
||||||
|
type executionFunc func([]util.FederatedOperation) error
|
||||||
|
|
||||||
|
// syncToClusters ensures that the state of the given object is synchronized to member clusters.
|
||||||
|
func syncToClusters(clustersAccessor clustersAccessorFunc, operationsAccessor operationsFunc, execute executionFunc, adapter federatedtypes.FederatedTypeAdapter, obj pkgruntime.Object) reconciliationStatus {
|
||||||
|
kind := adapter.Kind()
|
||||||
|
key := federatedtypes.ObjectKey(adapter, obj)
|
||||||
|
|
||||||
|
glog.V(3).Infof("Syncing %s %q in underlying clusters", kind, key)
|
||||||
|
|
||||||
|
clusters, err := clustersAccessor()
|
||||||
|
if err != nil {
|
||||||
|
glog.Errorf("Failed to get cluster list: %v", err)
|
||||||
|
return statusNotSynced
|
||||||
|
}
|
||||||
|
|
||||||
|
operations, err := operationsAccessor(adapter, clusters, obj)
|
||||||
|
if err != nil {
|
||||||
|
glog.Error(err)
|
||||||
|
return statusError
|
||||||
|
}
|
||||||
|
if len(operations) == 0 {
|
||||||
|
return statusAllOK
|
||||||
|
}
|
||||||
|
|
||||||
|
err = execute(operations)
|
||||||
|
if err != nil {
|
||||||
|
glog.Errorf("Failed to execute updates for %s %q: %v", kind, key, err)
|
||||||
|
return statusError
|
||||||
|
}
|
||||||
|
|
||||||
|
// Evertyhing is in order but let's be double sure
|
||||||
|
return statusNeedsRecheck
|
||||||
|
}
|
||||||
|
|
||||||
|
type clusterObjectAccessorFunc func(clusterName string) (interface{}, bool, error)
|
||||||
|
|
||||||
// clusterOperations returns the list of operations needed to synchronize the state of the given object to the provided clusters
|
// clusterOperations returns the list of operations needed to synchronize the state of the given object to the provided clusters
|
||||||
func clusterOperations(adapter federatedtypes.FederatedTypeAdapter, clusters []*federationapi.Cluster, obj pkgruntime.Object, key string, accessor clusterAccessor) ([]util.FederatedOperation, error) {
|
func clusterOperations(adapter federatedtypes.FederatedTypeAdapter, clusters []*federationapi.Cluster, obj pkgruntime.Object, accessor clusterObjectAccessorFunc) ([]util.FederatedOperation, error) {
|
||||||
|
key := federatedtypes.ObjectKey(adapter, obj)
|
||||||
operations := make([]util.FederatedOperation, 0)
|
operations := make([]util.FederatedOperation, 0)
|
||||||
for _, cluster := range clusters {
|
for _, cluster := range clusters {
|
||||||
clusterObj, found, err := accessor(cluster.Name)
|
clusterObj, found, err := accessor(cluster.Name)
|
||||||
|
@ -17,7 +17,7 @@ limitations under the License.
|
|||||||
package sync
|
package sync
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"fmt"
|
"errors"
|
||||||
"testing"
|
"testing"
|
||||||
|
|
||||||
pkgruntime "k8s.io/apimachinery/pkg/runtime"
|
pkgruntime "k8s.io/apimachinery/pkg/runtime"
|
||||||
@ -30,6 +30,70 @@ import (
|
|||||||
"github.com/stretchr/testify/require"
|
"github.com/stretchr/testify/require"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
var awfulError error = errors.New("Something bad happened")
|
||||||
|
|
||||||
|
func TestSyncToClusters(t *testing.T) {
|
||||||
|
adapter := &federatedtypes.SecretAdapter{}
|
||||||
|
obj := adapter.NewTestObject("foo")
|
||||||
|
|
||||||
|
testCases := map[string]struct {
|
||||||
|
clusterError bool
|
||||||
|
operationsError bool
|
||||||
|
executionError bool
|
||||||
|
operations []util.FederatedOperation
|
||||||
|
status reconciliationStatus
|
||||||
|
}{
|
||||||
|
"Error listing clusters redelivers with cluster delay": {
|
||||||
|
clusterError: true,
|
||||||
|
status: statusNotSynced,
|
||||||
|
},
|
||||||
|
"Error retrieving cluster operations redelivers": {
|
||||||
|
operationsError: true,
|
||||||
|
status: statusError,
|
||||||
|
},
|
||||||
|
"No operations returns ok": {
|
||||||
|
status: statusAllOK,
|
||||||
|
},
|
||||||
|
"Execution error redelivers": {
|
||||||
|
executionError: true,
|
||||||
|
operations: []util.FederatedOperation{{}},
|
||||||
|
status: statusError,
|
||||||
|
},
|
||||||
|
"Successful update indicates recheck": {
|
||||||
|
operations: []util.FederatedOperation{{}},
|
||||||
|
status: statusNeedsRecheck,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
for testName, testCase := range testCases {
|
||||||
|
t.Run(testName, func(t *testing.T) {
|
||||||
|
status := syncToClusters(
|
||||||
|
func() ([]*federationapi.Cluster, error) {
|
||||||
|
if testCase.clusterError {
|
||||||
|
return nil, awfulError
|
||||||
|
}
|
||||||
|
return nil, nil
|
||||||
|
},
|
||||||
|
func(federatedtypes.FederatedTypeAdapter, []*federationapi.Cluster, pkgruntime.Object) ([]util.FederatedOperation, error) {
|
||||||
|
if testCase.operationsError {
|
||||||
|
return nil, awfulError
|
||||||
|
}
|
||||||
|
return testCase.operations, nil
|
||||||
|
},
|
||||||
|
func([]util.FederatedOperation) error {
|
||||||
|
if testCase.executionError {
|
||||||
|
return awfulError
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
},
|
||||||
|
adapter,
|
||||||
|
obj,
|
||||||
|
)
|
||||||
|
require.Equal(t, testCase.status, status, "Unexpected status!")
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func TestClusterOperations(t *testing.T) {
|
func TestClusterOperations(t *testing.T) {
|
||||||
adapter := &federatedtypes.SecretAdapter{}
|
adapter := &federatedtypes.SecretAdapter{}
|
||||||
obj := adapter.NewTestObject("foo")
|
obj := adapter.NewTestObject("foo")
|
||||||
@ -58,9 +122,9 @@ func TestClusterOperations(t *testing.T) {
|
|||||||
for testName, testCase := range testCases {
|
for testName, testCase := range testCases {
|
||||||
t.Run(testName, func(t *testing.T) {
|
t.Run(testName, func(t *testing.T) {
|
||||||
clusters := []*federationapi.Cluster{fedtest.NewCluster("cluster1", apiv1.ConditionTrue)}
|
clusters := []*federationapi.Cluster{fedtest.NewCluster("cluster1", apiv1.ConditionTrue)}
|
||||||
operations, err := clusterOperations(adapter, clusters, obj, "key", func(string) (interface{}, bool, error) {
|
operations, err := clusterOperations(adapter, clusters, obj, func(string) (interface{}, bool, error) {
|
||||||
if testCase.expectedErr {
|
if testCase.expectedErr {
|
||||||
return nil, false, fmt.Errorf("Not found!")
|
return nil, false, awfulError
|
||||||
}
|
}
|
||||||
return testCase.clusterObject, (testCase.clusterObject != nil), nil
|
return testCase.clusterObject, (testCase.clusterObject != nil), nil
|
||||||
})
|
})
|
||||||
|
Loading…
Reference in New Issue
Block a user