Merge pull request #45563 from perotinus/rssyncconversion

Automatic merge from submit-queue (batch tested with PRs 46801, 45184, 45930, 46192, 45563)

[Federation] Add a SchedulingAdapter that can extend the FederatedTypeAdapter and that provides hooks for scheduling objects into clusters.

**Release note**:
```release-note
NONE
```
This commit is contained in:
Kubernetes Submit Queue 2017-06-02 18:06:00 -07:00 committed by GitHub
commit 77710c41bb
8 changed files with 128 additions and 16 deletions

View File

@ -14,10 +14,12 @@ go_library(
"configmap.go", "configmap.go",
"daemonset.go", "daemonset.go",
"registry.go", "registry.go",
"scheduling.go",
"secret.go", "secret.go",
], ],
tags = ["automanaged"], tags = ["automanaged"],
deps = [ deps = [
"//federation/apis/federation/v1beta1:go_default_library",
"//federation/client/clientset_generated/federation_clientset:go_default_library", "//federation/client/clientset_generated/federation_clientset:go_default_library",
"//federation/pkg/federation-controller/util:go_default_library", "//federation/pkg/federation-controller/util:go_default_library",
"//pkg/api/v1:go_default_library", "//pkg/api/v1:go_default_library",

View File

@ -53,6 +53,8 @@ type FederatedTypeAdapter interface {
ClusterUpdate(client kubeclientset.Interface, obj pkgruntime.Object) (pkgruntime.Object, error) ClusterUpdate(client kubeclientset.Interface, obj pkgruntime.Object) (pkgruntime.Object, error)
ClusterWatch(client kubeclientset.Interface, namespace string, options metav1.ListOptions) (watch.Interface, error) ClusterWatch(client kubeclientset.Interface, namespace string, options metav1.ListOptions) (watch.Interface, error)
IsSchedulingAdapter() bool
NewTestObject(namespace string) pkgruntime.Object NewTestObject(namespace string) pkgruntime.Object
} }

View File

@ -133,6 +133,10 @@ func (a *ConfigMapAdapter) ClusterWatch(client kubeclientset.Interface, namespac
return client.CoreV1().ConfigMaps(namespace).Watch(options) return client.CoreV1().ConfigMaps(namespace).Watch(options)
} }
func (a *ConfigMapAdapter) IsSchedulingAdapter() bool {
return false
}
func (a *ConfigMapAdapter) NewTestObject(namespace string) pkgruntime.Object { func (a *ConfigMapAdapter) NewTestObject(namespace string) pkgruntime.Object {
return &apiv1.ConfigMap{ return &apiv1.ConfigMap{
ObjectMeta: metav1.ObjectMeta{ ObjectMeta: metav1.ObjectMeta{

View File

@ -136,6 +136,10 @@ func (a *DaemonSetAdapter) ClusterWatch(client kubeclientset.Interface, namespac
return client.Extensions().DaemonSets(namespace).Watch(options) return client.Extensions().DaemonSets(namespace).Watch(options)
} }
func (a *DaemonSetAdapter) IsSchedulingAdapter() bool {
return false
}
func (a *DaemonSetAdapter) NewTestObject(namespace string) pkgruntime.Object { func (a *DaemonSetAdapter) NewTestObject(namespace string) pkgruntime.Object {
return &extensionsv1.DaemonSet{ return &extensionsv1.DaemonSet{
ObjectMeta: metav1.ObjectMeta{ ObjectMeta: metav1.ObjectMeta{

View File

@ -0,0 +1,47 @@
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package federatedtypes
import (
pkgruntime "k8s.io/apimachinery/pkg/runtime"
federationapi "k8s.io/kubernetes/federation/apis/federation/v1beta1"
fedutil "k8s.io/kubernetes/federation/pkg/federation-controller/util"
)
// SchedulingStatus contains the status of the objects that are being
// scheduled into joined clusters.
type SchedulingStatus struct {
Replicas int32
FullyLabeledReplicas int32
ReadyReplicas int32
AvailableReplicas int32
}
// SchedulingInfo wraps the information that a SchedulingAdapter needs
// to update objects per a schedule.
type SchedulingInfo struct {
Schedule map[string]int64
Status SchedulingStatus
}
// SchedulingAdapter defines operations for interacting with a
// federated type that requires more complex synchronization logic.
type SchedulingAdapter interface {
GetSchedule(obj pkgruntime.Object, key string, clusters []*federationapi.Cluster, informer fedutil.FederatedInformer) (*SchedulingInfo, error)
ScheduleObject(cluster *federationapi.Cluster, clusterObj pkgruntime.Object, federationObjCopy pkgruntime.Object, schedulingInfo *SchedulingInfo) (pkgruntime.Object, bool, error)
UpdateFederatedStatus(obj pkgruntime.Object, status SchedulingStatus) error
}

View File

@ -134,6 +134,10 @@ func (a *SecretAdapter) ClusterWatch(client kubeclientset.Interface, namespace s
return client.CoreV1().Secrets(namespace).Watch(options) return client.CoreV1().Secrets(namespace).Watch(options)
} }
func (a *SecretAdapter) IsSchedulingAdapter() bool {
return false
}
func (a *SecretAdapter) NewTestObject(namespace string) pkgruntime.Object { func (a *SecretAdapter) NewTestObject(namespace string) pkgruntime.Object {
return &apiv1.Secret{ return &apiv1.Secret{
ObjectMeta: metav1.ObjectMeta{ ObjectMeta: metav1.ObjectMeta{

View File

@ -356,8 +356,8 @@ func (s *FederationSyncController) reconcile(namespacedName types.NamespacedName
return statusError return statusError
} }
operationsAccessor := func(adapter federatedtypes.FederatedTypeAdapter, selectedClusters []*federationapi.Cluster, unselectedClusters []*federationapi.Cluster, obj pkgruntime.Object) ([]util.FederatedOperation, error) { operationsAccessor := func(adapter federatedtypes.FederatedTypeAdapter, selectedClusters []*federationapi.Cluster, unselectedClusters []*federationapi.Cluster, obj pkgruntime.Object, schedulingInfo *federatedtypes.SchedulingInfo) ([]util.FederatedOperation, error) {
operations, err := clusterOperations(adapter, selectedClusters, unselectedClusters, obj, key, func(clusterName string) (interface{}, bool, error) { operations, err := clusterOperations(adapter, selectedClusters, unselectedClusters, obj, key, schedulingInfo, func(clusterName string) (interface{}, bool, error) {
return s.informer.GetTargetStore().GetByKey(clusterName, key) return s.informer.GetTargetStore().GetByKey(clusterName, key)
}) })
if err != nil { if err != nil {
@ -372,6 +372,7 @@ func (s *FederationSyncController) reconcile(namespacedName types.NamespacedName
selectedClusters, selectedClusters,
s.updater.Update, s.updater.Update,
s.adapter, s.adapter,
s.informer,
obj, obj,
) )
} }
@ -423,12 +424,12 @@ func (s *FederationSyncController) delete(obj pkgruntime.Object, kind string, na
} }
type clustersAccessorFunc func() ([]*federationapi.Cluster, error) type clustersAccessorFunc func() ([]*federationapi.Cluster, error)
type operationsFunc func(federatedtypes.FederatedTypeAdapter, []*federationapi.Cluster, []*federationapi.Cluster, pkgruntime.Object) ([]util.FederatedOperation, error) type operationsFunc func(federatedtypes.FederatedTypeAdapter, []*federationapi.Cluster, []*federationapi.Cluster, pkgruntime.Object, *federatedtypes.SchedulingInfo) ([]util.FederatedOperation, error)
type clusterSelectorFunc func(*metav1.ObjectMeta, func(map[string]string, map[string]string) (bool, error), []*federationapi.Cluster) ([]*federationapi.Cluster, []*federationapi.Cluster, error) type clusterSelectorFunc func(*metav1.ObjectMeta, func(map[string]string, map[string]string) (bool, error), []*federationapi.Cluster) ([]*federationapi.Cluster, []*federationapi.Cluster, error)
type executionFunc func([]util.FederatedOperation) error type executionFunc func([]util.FederatedOperation) error
// syncToClusters ensures that the state of the given object is synchronized to member clusters. // syncToClusters ensures that the state of the given object is synchronized to member clusters.
func syncToClusters(clustersAccessor clustersAccessorFunc, operationsAccessor operationsFunc, selector clusterSelectorFunc, execute executionFunc, adapter federatedtypes.FederatedTypeAdapter, obj pkgruntime.Object) reconciliationStatus { func syncToClusters(clustersAccessor clustersAccessorFunc, operationsAccessor operationsFunc, selector clusterSelectorFunc, execute executionFunc, adapter federatedtypes.FederatedTypeAdapter, informer util.FederatedInformer, obj pkgruntime.Object) reconciliationStatus {
kind := adapter.Kind() kind := adapter.Kind()
key := federatedtypes.ObjectKey(adapter, obj) key := federatedtypes.ObjectKey(adapter, obj)
@ -445,10 +446,36 @@ func syncToClusters(clustersAccessor clustersAccessorFunc, operationsAccessor op
return statusError return statusError
} }
operations, err := operationsAccessor(adapter, selectedClusters, unselectedClusters, obj) var schedulingInfo *federatedtypes.SchedulingInfo
if adapter.IsSchedulingAdapter() {
schedulingAdapter, ok := adapter.(federatedtypes.SchedulingAdapter)
if !ok {
glog.Fatalf("Adapter for kind %q does not properly implement SchedulingAdapter.", kind)
}
schedulingInfo, err = schedulingAdapter.GetSchedule(obj, key, selectedClusters, informer)
if err != nil {
runtime.HandleError(fmt.Errorf("adapter.GetSchedule() failed on adapter for %s %q: %v", kind, key, err))
return statusError
}
}
operations, err := operationsAccessor(adapter, selectedClusters, unselectedClusters, obj, schedulingInfo)
if err != nil { if err != nil {
return statusError return statusError
} }
if adapter.IsSchedulingAdapter() {
schedulingAdapter, ok := adapter.(federatedtypes.SchedulingAdapter)
if !ok {
glog.Fatalf("Adapter for kind %q does not properly implement SchedulingAdapter.", kind)
}
err = schedulingAdapter.UpdateFederatedStatus(obj, schedulingInfo.Status)
if err != nil {
runtime.HandleError(fmt.Errorf("adapter.UpdateFinished() failed on adapter for %s %q: %v", kind, key, err))
return statusError
}
}
if len(operations) == 0 { if len(operations) == 0 {
return statusAllOK return statusAllOK
} }
@ -459,7 +486,7 @@ func syncToClusters(clustersAccessor clustersAccessorFunc, operationsAccessor op
return statusError return statusError
} }
// Evertyhing is in order but let's be double sure // Everything is in order but let's be double sure
return statusNeedsRecheck return statusNeedsRecheck
} }
@ -484,26 +511,46 @@ func selectedClusters(objMeta *metav1.ObjectMeta, selector func(map[string]strin
type clusterObjectAccessorFunc func(clusterName string) (interface{}, bool, error) type clusterObjectAccessorFunc func(clusterName string) (interface{}, bool, error)
// clusterOperations returns the list of operations needed to synchronize the state of the given object to the provided clusters // clusterOperations returns the list of operations needed to synchronize the state of the given object to the provided clusters
func clusterOperations(adapter federatedtypes.FederatedTypeAdapter, selectedClusters []*federationapi.Cluster, unselectedClusters []*federationapi.Cluster, obj pkgruntime.Object, key string, accessor clusterObjectAccessorFunc) ([]util.FederatedOperation, error) { func clusterOperations(adapter federatedtypes.FederatedTypeAdapter, selectedClusters []*federationapi.Cluster, unselectedClusters []*federationapi.Cluster, obj pkgruntime.Object, key string, schedulingInfo *federatedtypes.SchedulingInfo, accessor clusterObjectAccessorFunc) ([]util.FederatedOperation, error) {
// The data should not be modified.
desiredObj := adapter.Copy(obj)
operations := make([]util.FederatedOperation, 0) operations := make([]util.FederatedOperation, 0)
kind := adapter.Kind()
for _, cluster := range selectedClusters { for _, cluster := range selectedClusters {
// The data should not be modified.
desiredObj := adapter.Copy(obj)
clusterObj, found, err := accessor(cluster.Name) clusterObj, found, err := accessor(cluster.Name)
if err != nil { if err != nil {
wrappedErr := fmt.Errorf("Failed to get %s %q from cluster %q: %v", adapter.Kind(), key, cluster.Name, err) wrappedErr := fmt.Errorf("Failed to get %s %q from cluster %q: %v", kind, key, cluster.Name, err)
runtime.HandleError(wrappedErr) runtime.HandleError(wrappedErr)
return nil, wrappedErr return nil, wrappedErr
} }
shouldCreateIfNeeded := true
if adapter.IsSchedulingAdapter() {
schedulingAdapter, ok := adapter.(federatedtypes.SchedulingAdapter)
if !ok {
err = fmt.Errorf("adapter for kind %s does not properly implement SchedulingAdapter.", kind)
glog.Fatalf("Error: %v", err)
}
var clusterTypedObj pkgruntime.Object = nil
if clusterObj != nil {
clusterTypedObj = clusterObj.(pkgruntime.Object)
}
desiredObj, shouldCreateIfNeeded, err = schedulingAdapter.ScheduleObject(cluster, clusterTypedObj, desiredObj, schedulingInfo)
if err != nil {
runtime.HandleError(err)
return nil, err
}
}
var operationType util.FederatedOperationType = "" var operationType util.FederatedOperationType = ""
if found { if found {
clusterObj := clusterObj.(pkgruntime.Object) clusterObj := clusterObj.(pkgruntime.Object)
if !adapter.Equivalent(desiredObj, clusterObj) { if !adapter.Equivalent(desiredObj, clusterObj) {
operationType = util.OperationTypeUpdate operationType = util.OperationTypeUpdate
} }
} else { } else if shouldCreateIfNeeded {
operationType = util.OperationTypeAdd operationType = util.OperationTypeAdd
} }
@ -518,16 +565,16 @@ func clusterOperations(adapter federatedtypes.FederatedTypeAdapter, selectedClus
} }
for _, cluster := range unselectedClusters { for _, cluster := range unselectedClusters {
_, found, err := accessor(cluster.Name) clusterObj, found, err := accessor(cluster.Name)
if err != nil { if err != nil {
wrappedErr := fmt.Errorf("Failed to get %s %q from cluster %q: %v", adapter.Kind(), key, cluster.Name, err) wrappedErr := fmt.Errorf("Failed to get %s %q from cluster %q: %v", kind, key, cluster.Name, err)
runtime.HandleError(wrappedErr) runtime.HandleError(wrappedErr)
return nil, wrappedErr return nil, wrappedErr
} }
if found { if found {
operations = append(operations, util.FederatedOperation{ operations = append(operations, util.FederatedOperation{
Type: util.OperationTypeDelete, Type: util.OperationTypeDelete,
Obj: desiredObj, Obj: clusterObj.(pkgruntime.Object),
ClusterName: cluster.Name, ClusterName: cluster.Name,
Key: key, Key: key,
}) })

View File

@ -75,7 +75,7 @@ func TestSyncToClusters(t *testing.T) {
} }
return nil, nil return nil, nil
}, },
func(federatedtypes.FederatedTypeAdapter, []*federationapi.Cluster, []*federationapi.Cluster, pkgruntime.Object) ([]util.FederatedOperation, error) { func(federatedtypes.FederatedTypeAdapter, []*federationapi.Cluster, []*federationapi.Cluster, pkgruntime.Object, *federatedtypes.SchedulingInfo) ([]util.FederatedOperation, error) {
if testCase.operationsError { if testCase.operationsError {
return nil, awfulError return nil, awfulError
} }
@ -91,6 +91,7 @@ func TestSyncToClusters(t *testing.T) {
return nil return nil
}, },
adapter, adapter,
nil,
obj, obj,
) )
require.Equal(t, testCase.status, status, "Unexpected status!") require.Equal(t, testCase.status, status, "Unexpected status!")
@ -207,7 +208,8 @@ func TestClusterOperations(t *testing.T) {
selectedClusters = []*federationapi.Cluster{} selectedClusters = []*federationapi.Cluster{}
unselectedClusters = clusters unselectedClusters = clusters
} }
operations, err := clusterOperations(adapter, selectedClusters, unselectedClusters, obj, key, func(string) (interface{}, bool, error) { // TODO: Tests for ScheduleObject on type adapter
operations, err := clusterOperations(adapter, selectedClusters, unselectedClusters, obj, key, nil, func(string) (interface{}, bool, error) {
if testCase.expectedErr { if testCase.expectedErr {
return nil, false, awfulError return nil, false, awfulError
} }