diff --git a/federation/pkg/federatedtypes/BUILD b/federation/pkg/federatedtypes/BUILD index a9bcfce4f56..c9bebb008ca 100644 --- a/federation/pkg/federatedtypes/BUILD +++ b/federation/pkg/federatedtypes/BUILD @@ -14,10 +14,12 @@ go_library( "configmap.go", "daemonset.go", "registry.go", + "scheduling.go", "secret.go", ], tags = ["automanaged"], deps = [ + "//federation/apis/federation/v1beta1:go_default_library", "//federation/client/clientset_generated/federation_clientset:go_default_library", "//federation/pkg/federation-controller/util:go_default_library", "//pkg/api/v1:go_default_library", diff --git a/federation/pkg/federatedtypes/adapter.go b/federation/pkg/federatedtypes/adapter.go index 39e5d234ee4..1da54a75547 100644 --- a/federation/pkg/federatedtypes/adapter.go +++ b/federation/pkg/federatedtypes/adapter.go @@ -53,6 +53,8 @@ type FederatedTypeAdapter interface { ClusterUpdate(client kubeclientset.Interface, obj pkgruntime.Object) (pkgruntime.Object, error) ClusterWatch(client kubeclientset.Interface, namespace string, options metav1.ListOptions) (watch.Interface, error) + IsSchedulingAdapter() bool + NewTestObject(namespace string) pkgruntime.Object } diff --git a/federation/pkg/federatedtypes/configmap.go b/federation/pkg/federatedtypes/configmap.go index b60cea286df..4b529320281 100644 --- a/federation/pkg/federatedtypes/configmap.go +++ b/federation/pkg/federatedtypes/configmap.go @@ -133,6 +133,10 @@ func (a *ConfigMapAdapter) ClusterWatch(client kubeclientset.Interface, namespac return client.CoreV1().ConfigMaps(namespace).Watch(options) } +func (a *ConfigMapAdapter) IsSchedulingAdapter() bool { + return false +} + func (a *ConfigMapAdapter) NewTestObject(namespace string) pkgruntime.Object { return &apiv1.ConfigMap{ ObjectMeta: metav1.ObjectMeta{ diff --git a/federation/pkg/federatedtypes/daemonset.go b/federation/pkg/federatedtypes/daemonset.go index 593c5ee9bd3..dabd8804728 100644 --- a/federation/pkg/federatedtypes/daemonset.go +++ b/federation/pkg/federatedtypes/daemonset.go @@ -136,6 +136,10 @@ func (a *DaemonSetAdapter) ClusterWatch(client kubeclientset.Interface, namespac return client.Extensions().DaemonSets(namespace).Watch(options) } +func (a *DaemonSetAdapter) IsSchedulingAdapter() bool { + return false +} + func (a *DaemonSetAdapter) NewTestObject(namespace string) pkgruntime.Object { return &extensionsv1.DaemonSet{ ObjectMeta: metav1.ObjectMeta{ diff --git a/federation/pkg/federatedtypes/scheduling.go b/federation/pkg/federatedtypes/scheduling.go new file mode 100644 index 00000000000..86d6dc785cd --- /dev/null +++ b/federation/pkg/federatedtypes/scheduling.go @@ -0,0 +1,47 @@ +/* +Copyright 2017 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package federatedtypes + +import ( + pkgruntime "k8s.io/apimachinery/pkg/runtime" + federationapi "k8s.io/kubernetes/federation/apis/federation/v1beta1" + fedutil "k8s.io/kubernetes/federation/pkg/federation-controller/util" +) + +// SchedulingStatus contains the status of the objects that are being +// scheduled into joined clusters. +type SchedulingStatus struct { + Replicas int32 + FullyLabeledReplicas int32 + ReadyReplicas int32 + AvailableReplicas int32 +} + +// SchedulingInfo wraps the information that a SchedulingAdapter needs +// to update objects per a schedule. +type SchedulingInfo struct { + Schedule map[string]int64 + Status SchedulingStatus +} + +// SchedulingAdapter defines operations for interacting with a +// federated type that requires more complex synchronization logic. +type SchedulingAdapter interface { + GetSchedule(obj pkgruntime.Object, key string, clusters []*federationapi.Cluster, informer fedutil.FederatedInformer) (*SchedulingInfo, error) + ScheduleObject(cluster *federationapi.Cluster, clusterObj pkgruntime.Object, federationObjCopy pkgruntime.Object, schedulingInfo *SchedulingInfo) (pkgruntime.Object, bool, error) + UpdateFederatedStatus(obj pkgruntime.Object, status SchedulingStatus) error +} diff --git a/federation/pkg/federatedtypes/secret.go b/federation/pkg/federatedtypes/secret.go index 5bf1e9808c2..d623bc95f9a 100644 --- a/federation/pkg/federatedtypes/secret.go +++ b/federation/pkg/federatedtypes/secret.go @@ -134,6 +134,10 @@ func (a *SecretAdapter) ClusterWatch(client kubeclientset.Interface, namespace s return client.CoreV1().Secrets(namespace).Watch(options) } +func (a *SecretAdapter) IsSchedulingAdapter() bool { + return false +} + func (a *SecretAdapter) NewTestObject(namespace string) pkgruntime.Object { return &apiv1.Secret{ ObjectMeta: metav1.ObjectMeta{ diff --git a/federation/pkg/federation-controller/sync/controller.go b/federation/pkg/federation-controller/sync/controller.go index 77221449eeb..69f62fc619e 100644 --- a/federation/pkg/federation-controller/sync/controller.go +++ b/federation/pkg/federation-controller/sync/controller.go @@ -356,8 +356,8 @@ func (s *FederationSyncController) reconcile(namespacedName types.NamespacedName return statusError } - operationsAccessor := func(adapter federatedtypes.FederatedTypeAdapter, selectedClusters []*federationapi.Cluster, unselectedClusters []*federationapi.Cluster, obj pkgruntime.Object) ([]util.FederatedOperation, error) { - operations, err := clusterOperations(adapter, selectedClusters, unselectedClusters, obj, key, func(clusterName string) (interface{}, bool, error) { + operationsAccessor := func(adapter federatedtypes.FederatedTypeAdapter, selectedClusters []*federationapi.Cluster, unselectedClusters []*federationapi.Cluster, obj pkgruntime.Object, schedulingInfo *federatedtypes.SchedulingInfo) ([]util.FederatedOperation, error) { + operations, err := clusterOperations(adapter, selectedClusters, unselectedClusters, obj, key, schedulingInfo, func(clusterName string) (interface{}, bool, error) { return s.informer.GetTargetStore().GetByKey(clusterName, key) }) if err != nil { @@ -372,6 +372,7 @@ func (s *FederationSyncController) reconcile(namespacedName types.NamespacedName selectedClusters, s.updater.Update, s.adapter, + s.informer, obj, ) } @@ -423,12 +424,12 @@ func (s *FederationSyncController) delete(obj pkgruntime.Object, kind string, na } type clustersAccessorFunc func() ([]*federationapi.Cluster, error) -type operationsFunc func(federatedtypes.FederatedTypeAdapter, []*federationapi.Cluster, []*federationapi.Cluster, pkgruntime.Object) ([]util.FederatedOperation, error) +type operationsFunc func(federatedtypes.FederatedTypeAdapter, []*federationapi.Cluster, []*federationapi.Cluster, pkgruntime.Object, *federatedtypes.SchedulingInfo) ([]util.FederatedOperation, error) type clusterSelectorFunc func(*metav1.ObjectMeta, func(map[string]string, map[string]string) (bool, error), []*federationapi.Cluster) ([]*federationapi.Cluster, []*federationapi.Cluster, error) type executionFunc func([]util.FederatedOperation) error // syncToClusters ensures that the state of the given object is synchronized to member clusters. -func syncToClusters(clustersAccessor clustersAccessorFunc, operationsAccessor operationsFunc, selector clusterSelectorFunc, execute executionFunc, adapter federatedtypes.FederatedTypeAdapter, obj pkgruntime.Object) reconciliationStatus { +func syncToClusters(clustersAccessor clustersAccessorFunc, operationsAccessor operationsFunc, selector clusterSelectorFunc, execute executionFunc, adapter federatedtypes.FederatedTypeAdapter, informer util.FederatedInformer, obj pkgruntime.Object) reconciliationStatus { kind := adapter.Kind() key := federatedtypes.ObjectKey(adapter, obj) @@ -445,10 +446,36 @@ func syncToClusters(clustersAccessor clustersAccessorFunc, operationsAccessor op return statusError } - operations, err := operationsAccessor(adapter, selectedClusters, unselectedClusters, obj) + var schedulingInfo *federatedtypes.SchedulingInfo + if adapter.IsSchedulingAdapter() { + schedulingAdapter, ok := adapter.(federatedtypes.SchedulingAdapter) + if !ok { + glog.Fatalf("Adapter for kind %q does not properly implement SchedulingAdapter.", kind) + } + schedulingInfo, err = schedulingAdapter.GetSchedule(obj, key, selectedClusters, informer) + if err != nil { + runtime.HandleError(fmt.Errorf("adapter.GetSchedule() failed on adapter for %s %q: %v", kind, key, err)) + return statusError + } + } + + operations, err := operationsAccessor(adapter, selectedClusters, unselectedClusters, obj, schedulingInfo) if err != nil { return statusError } + + if adapter.IsSchedulingAdapter() { + schedulingAdapter, ok := adapter.(federatedtypes.SchedulingAdapter) + if !ok { + glog.Fatalf("Adapter for kind %q does not properly implement SchedulingAdapter.", kind) + } + err = schedulingAdapter.UpdateFederatedStatus(obj, schedulingInfo.Status) + if err != nil { + runtime.HandleError(fmt.Errorf("adapter.UpdateFinished() failed on adapter for %s %q: %v", kind, key, err)) + return statusError + } + } + if len(operations) == 0 { return statusAllOK } @@ -459,7 +486,7 @@ func syncToClusters(clustersAccessor clustersAccessorFunc, operationsAccessor op return statusError } - // Evertyhing is in order but let's be double sure + // Everything is in order but let's be double sure return statusNeedsRecheck } @@ -484,26 +511,46 @@ func selectedClusters(objMeta *metav1.ObjectMeta, selector func(map[string]strin type clusterObjectAccessorFunc func(clusterName string) (interface{}, bool, error) // clusterOperations returns the list of operations needed to synchronize the state of the given object to the provided clusters -func clusterOperations(adapter federatedtypes.FederatedTypeAdapter, selectedClusters []*federationapi.Cluster, unselectedClusters []*federationapi.Cluster, obj pkgruntime.Object, key string, accessor clusterObjectAccessorFunc) ([]util.FederatedOperation, error) { - // The data should not be modified. - desiredObj := adapter.Copy(obj) +func clusterOperations(adapter federatedtypes.FederatedTypeAdapter, selectedClusters []*federationapi.Cluster, unselectedClusters []*federationapi.Cluster, obj pkgruntime.Object, key string, schedulingInfo *federatedtypes.SchedulingInfo, accessor clusterObjectAccessorFunc) ([]util.FederatedOperation, error) { operations := make([]util.FederatedOperation, 0) + kind := adapter.Kind() for _, cluster := range selectedClusters { + // The data should not be modified. + desiredObj := adapter.Copy(obj) + clusterObj, found, err := accessor(cluster.Name) if err != nil { - wrappedErr := fmt.Errorf("Failed to get %s %q from cluster %q: %v", adapter.Kind(), key, cluster.Name, err) + wrappedErr := fmt.Errorf("Failed to get %s %q from cluster %q: %v", kind, key, cluster.Name, err) runtime.HandleError(wrappedErr) return nil, wrappedErr } + shouldCreateIfNeeded := true + if adapter.IsSchedulingAdapter() { + schedulingAdapter, ok := adapter.(federatedtypes.SchedulingAdapter) + if !ok { + err = fmt.Errorf("adapter for kind %s does not properly implement SchedulingAdapter.", kind) + glog.Fatalf("Error: %v", err) + } + var clusterTypedObj pkgruntime.Object = nil + if clusterObj != nil { + clusterTypedObj = clusterObj.(pkgruntime.Object) + } + desiredObj, shouldCreateIfNeeded, err = schedulingAdapter.ScheduleObject(cluster, clusterTypedObj, desiredObj, schedulingInfo) + if err != nil { + runtime.HandleError(err) + return nil, err + } + } + var operationType util.FederatedOperationType = "" if found { clusterObj := clusterObj.(pkgruntime.Object) if !adapter.Equivalent(desiredObj, clusterObj) { operationType = util.OperationTypeUpdate } - } else { + } else if shouldCreateIfNeeded { operationType = util.OperationTypeAdd } @@ -518,16 +565,16 @@ func clusterOperations(adapter federatedtypes.FederatedTypeAdapter, selectedClus } for _, cluster := range unselectedClusters { - _, found, err := accessor(cluster.Name) + clusterObj, found, err := accessor(cluster.Name) if err != nil { - wrappedErr := fmt.Errorf("Failed to get %s %q from cluster %q: %v", adapter.Kind(), key, cluster.Name, err) + wrappedErr := fmt.Errorf("Failed to get %s %q from cluster %q: %v", kind, key, cluster.Name, err) runtime.HandleError(wrappedErr) return nil, wrappedErr } if found { operations = append(operations, util.FederatedOperation{ Type: util.OperationTypeDelete, - Obj: desiredObj, + Obj: clusterObj.(pkgruntime.Object), ClusterName: cluster.Name, Key: key, }) diff --git a/federation/pkg/federation-controller/sync/controller_test.go b/federation/pkg/federation-controller/sync/controller_test.go index 667e8471095..46becc260be 100644 --- a/federation/pkg/federation-controller/sync/controller_test.go +++ b/federation/pkg/federation-controller/sync/controller_test.go @@ -75,7 +75,7 @@ func TestSyncToClusters(t *testing.T) { } return nil, nil }, - func(federatedtypes.FederatedTypeAdapter, []*federationapi.Cluster, []*federationapi.Cluster, pkgruntime.Object) ([]util.FederatedOperation, error) { + func(federatedtypes.FederatedTypeAdapter, []*federationapi.Cluster, []*federationapi.Cluster, pkgruntime.Object, *federatedtypes.SchedulingInfo) ([]util.FederatedOperation, error) { if testCase.operationsError { return nil, awfulError } @@ -91,6 +91,7 @@ func TestSyncToClusters(t *testing.T) { return nil }, adapter, + nil, obj, ) require.Equal(t, testCase.status, status, "Unexpected status!") @@ -207,7 +208,8 @@ func TestClusterOperations(t *testing.T) { selectedClusters = []*federationapi.Cluster{} unselectedClusters = clusters } - operations, err := clusterOperations(adapter, selectedClusters, unselectedClusters, obj, key, func(string) (interface{}, bool, error) { + // TODO: Tests for ScheduleObject on type adapter + operations, err := clusterOperations(adapter, selectedClusters, unselectedClusters, obj, key, nil, func(string) (interface{}, bool, error) { if testCase.expectedErr { return nil, false, awfulError }