diff --git a/federation/pkg/federatedtypes/BUILD b/federation/pkg/federatedtypes/BUILD index faa20fcda88..692c63eae15 100644 --- a/federation/pkg/federatedtypes/BUILD +++ b/federation/pkg/federatedtypes/BUILD @@ -10,11 +10,16 @@ load( go_test( name = "go_default_test", - srcs = ["scheduling_test.go"], + srcs = [ + "hpa_test.go", + "scheduling_test.go", + ], library = ":go_default_library", tags = ["automanaged"], deps = [ + "//federation/pkg/federation-controller/util/test:go_default_library", "//vendor/github.com/stretchr/testify/assert:go_default_library", + "//vendor/k8s.io/api/autoscaling/v1:go_default_library", "//vendor/k8s.io/api/core/v1:go_default_library", "//vendor/k8s.io/api/extensions/v1beta1:go_default_library", "//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", @@ -29,6 +34,7 @@ go_library( "configmap.go", "daemonset.go", "deployment.go", + "hpa.go", "namespace.go", "qualifiedname.go", "registry.go", @@ -48,12 +54,14 @@ go_library( "//pkg/api:go_default_library", "//pkg/controller/namespace/deletion:go_default_library", "//vendor/github.com/golang/glog:go_default_library", + "//vendor/k8s.io/api/autoscaling/v1:go_default_library", "//vendor/k8s.io/api/core/v1:go_default_library", "//vendor/k8s.io/api/extensions/v1beta1:go_default_library", "//vendor/k8s.io/apimachinery/pkg/api/meta:go_default_library", "//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", "//vendor/k8s.io/apimachinery/pkg/runtime:go_default_library", "//vendor/k8s.io/apimachinery/pkg/runtime/schema:go_default_library", + "//vendor/k8s.io/apimachinery/pkg/util/sets:go_default_library", "//vendor/k8s.io/apimachinery/pkg/watch:go_default_library", "//vendor/k8s.io/client-go/dynamic:go_default_library", "//vendor/k8s.io/client-go/kubernetes:go_default_library", diff --git a/federation/pkg/federatedtypes/deployment.go b/federation/pkg/federatedtypes/deployment.go index af61f0f92bc..230501db0ef 100644 --- a/federation/pkg/federatedtypes/deployment.go +++ b/federation/pkg/federatedtypes/deployment.go @@ -40,16 +40,16 @@ func init() { } type DeploymentAdapter struct { - *schedulingAdapter + *replicaSchedulingAdapter client federationclientset.Interface } func NewDeploymentAdapter(client federationclientset.Interface, config *restclient.Config) FederatedTypeAdapter { - schedulingAdapter := schedulingAdapter{ + schedulingAdapter := replicaSchedulingAdapter{ preferencesAnnotationName: FedDeploymentPreferencesAnnotation, - updateStatusFunc: func(obj pkgruntime.Object, status interface{}) error { + updateStatusFunc: func(obj pkgruntime.Object, schedulingInfo interface{}) error { deployment := obj.(*extensionsv1.Deployment) - typedStatus := status.(ReplicaSchedulingStatus) + typedStatus := schedulingInfo.(*ReplicaSchedulingInfo).Status if typedStatus.Replicas != deployment.Status.Replicas || typedStatus.UpdatedReplicas != deployment.Status.UpdatedReplicas || typedStatus.ReadyReplicas != deployment.Status.ReadyReplicas || typedStatus.AvailableReplicas != deployment.Status.AvailableReplicas { deployment.Status = extensionsv1.DeploymentStatus{ diff --git a/federation/pkg/federatedtypes/hpa.go b/federation/pkg/federatedtypes/hpa.go new file mode 100644 index 00000000000..3479d7c4b3b --- /dev/null +++ b/federation/pkg/federatedtypes/hpa.go @@ -0,0 +1,927 @@ +/* +Copyright 2017 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. 
+You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package federatedtypes + +import ( + "time" + + "fmt" + autoscalingv1 "k8s.io/api/autoscaling/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + pkgruntime "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/apimachinery/pkg/util/sets" + "k8s.io/apimachinery/pkg/watch" + kubeclientset "k8s.io/client-go/kubernetes" + restclient "k8s.io/client-go/rest" + federationapi "k8s.io/kubernetes/federation/apis/federation/v1beta1" + federationclientset "k8s.io/kubernetes/federation/client/clientset_generated/federation_clientset" + fedutil "k8s.io/kubernetes/federation/pkg/federation-controller/util" +) + +const ( + HpaKind = "horizontalpodautoscaler" + HpaControllerName = "horizontalpodautoscalers" + // This is a tunable which does not change replica nums + // on an existing local hpa, before this timeout, if it + // did scale already (avoids thrashing of replicas around). + scaleForbiddenWindow = 5 * time.Minute + // This is used as the default min for hpa object submitted + // to federation, in a situation where the default is for + // some reason not present (Spec.MinReplicas == nil) + hpaMinReplicaDefault = int32(1) +) + +func init() { + RegisterFederatedType(HpaKind, HpaControllerName, []schema.GroupVersionResource{autoscalingv1.SchemeGroupVersion.WithResource(HpaControllerName)}, NewHpaAdapter) +} + +type HpaAdapter struct { + client federationclientset.Interface +} + +func NewHpaAdapter(client federationclientset.Interface, config *restclient.Config) FederatedTypeAdapter { + return &HpaAdapter{client: client} +} + +func (a *HpaAdapter) Kind() string { + return HpaKind +} + +func (a *HpaAdapter) ObjectType() pkgruntime.Object { + return &autoscalingv1.HorizontalPodAutoscaler{} +} + +func (a *HpaAdapter) IsExpectedType(obj interface{}) bool { + _, ok := obj.(*autoscalingv1.HorizontalPodAutoscaler) + return ok +} + +func (a *HpaAdapter) Copy(obj pkgruntime.Object) pkgruntime.Object { + hpa := obj.(*autoscalingv1.HorizontalPodAutoscaler) + return &autoscalingv1.HorizontalPodAutoscaler{ + ObjectMeta: fedutil.DeepCopyRelevantObjectMeta(hpa.ObjectMeta), + Spec: *fedutil.DeepCopyApiTypeOrPanic(&hpa.Spec).(*autoscalingv1.HorizontalPodAutoscalerSpec), + } +} + +func (a *HpaAdapter) Equivalent(obj1, obj2 pkgruntime.Object) bool { + return fedutil.ObjectMetaAndSpecEquivalent(obj1, obj2) +} + +func (a *HpaAdapter) QualifiedName(obj pkgruntime.Object) QualifiedName { + hpa := obj.(*autoscalingv1.HorizontalPodAutoscaler) + return QualifiedName{Namespace: hpa.Namespace, Name: hpa.Name} +} + +func (a *HpaAdapter) ObjectMeta(obj pkgruntime.Object) *metav1.ObjectMeta { + return &obj.(*autoscalingv1.HorizontalPodAutoscaler).ObjectMeta +} + +func (a *HpaAdapter) FedCreate(obj pkgruntime.Object) (pkgruntime.Object, error) { + hpa := obj.(*autoscalingv1.HorizontalPodAutoscaler) + return a.client.AutoscalingV1().HorizontalPodAutoscalers(hpa.Namespace).Create(hpa) +} + +func (a *HpaAdapter) FedDelete(qualifiedName QualifiedName, options *metav1.DeleteOptions) error { + return a.client.AutoscalingV1().HorizontalPodAutoscalers(qualifiedName.Namespace).Delete(qualifiedName.Name, 
options) +} + +func (a *HpaAdapter) FedGet(qualifiedName QualifiedName) (pkgruntime.Object, error) { + return a.client.AutoscalingV1().HorizontalPodAutoscalers(qualifiedName.Namespace).Get(qualifiedName.Name, metav1.GetOptions{}) +} + +func (a *HpaAdapter) FedList(namespace string, options metav1.ListOptions) (pkgruntime.Object, error) { + return a.client.AutoscalingV1().HorizontalPodAutoscalers(namespace).List(options) +} + +func (a *HpaAdapter) FedUpdate(obj pkgruntime.Object) (pkgruntime.Object, error) { + hpa := obj.(*autoscalingv1.HorizontalPodAutoscaler) + return a.client.AutoscalingV1().HorizontalPodAutoscalers(hpa.Namespace).Update(hpa) +} + +func (a *HpaAdapter) FedWatch(namespace string, options metav1.ListOptions) (watch.Interface, error) { + return a.client.AutoscalingV1().HorizontalPodAutoscalers(namespace).Watch(options) +} + +func (a *HpaAdapter) ClusterCreate(client kubeclientset.Interface, obj pkgruntime.Object) (pkgruntime.Object, error) { + hpa := obj.(*autoscalingv1.HorizontalPodAutoscaler) + return client.AutoscalingV1().HorizontalPodAutoscalers(hpa.Namespace).Create(hpa) +} + +func (a *HpaAdapter) ClusterDelete(client kubeclientset.Interface, qualifiedName QualifiedName, options *metav1.DeleteOptions) error { + return client.AutoscalingV1().HorizontalPodAutoscalers(qualifiedName.Namespace).Delete(qualifiedName.Name, options) +} + +func (a *HpaAdapter) ClusterGet(client kubeclientset.Interface, qualifiedName QualifiedName) (pkgruntime.Object, error) { + return client.AutoscalingV1().HorizontalPodAutoscalers(qualifiedName.Namespace).Get(qualifiedName.Name, metav1.GetOptions{}) +} + +func (a *HpaAdapter) ClusterList(client kubeclientset.Interface, namespace string, options metav1.ListOptions) (pkgruntime.Object, error) { + return client.AutoscalingV1().HorizontalPodAutoscalers(namespace).List(options) +} + +func (a *HpaAdapter) ClusterUpdate(client kubeclientset.Interface, obj pkgruntime.Object) (pkgruntime.Object, error) { + hpa := obj.(*autoscalingv1.HorizontalPodAutoscaler) + return client.AutoscalingV1().HorizontalPodAutoscalers(hpa.Namespace).Update(hpa) +} + +func (a *HpaAdapter) ClusterWatch(client kubeclientset.Interface, namespace string, options metav1.ListOptions) (watch.Interface, error) { + return client.AutoscalingV1().HorizontalPodAutoscalers(namespace).Watch(options) +} + +func (a *HpaAdapter) NewTestObject(namespace string) pkgruntime.Object { + var min int32 = 4 + var targetCPU int32 = 70 + return &autoscalingv1.HorizontalPodAutoscaler{ + ObjectMeta: metav1.ObjectMeta{ + GenerateName: "test-hpa-", + Namespace: namespace, + }, + Spec: autoscalingv1.HorizontalPodAutoscalerSpec{ + ScaleTargetRef: autoscalingv1.CrossVersionObjectReference{ + Kind: "replicaset", + Name: "myrs", + }, + MinReplicas: &min, + MaxReplicas: int32(10), + TargetCPUUtilizationPercentage: &targetCPU, + }, + } +} + +func (a *HpaAdapter) IsSchedulingAdapter() bool { + return true +} + +func (a *HpaAdapter) EquivalentIgnoringSchedule(obj1, obj2 pkgruntime.Object) bool { + hpa1 := obj1.(*autoscalingv1.HorizontalPodAutoscaler) + hpa2 := a.Copy(obj2).(*autoscalingv1.HorizontalPodAutoscaler) + if hpa1.Spec.MinReplicas == nil { + hpa2.Spec.MinReplicas = nil + } else if hpa2.Spec.MinReplicas == nil { + var r int32 = *hpa1.Spec.MinReplicas + hpa2.Spec.MinReplicas = &r + } else { + *hpa2.Spec.MinReplicas = *hpa1.Spec.MinReplicas + } + hpa2.Spec.MaxReplicas = hpa1.Spec.MaxReplicas + return fedutil.ObjectMetaAndSpecEquivalent(hpa1, hpa2) +} + +type replicaNums struct { + min int32 + max int32 +} + 
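Note on EquivalentIgnoringSchedule above: because the federated scheduler owns Spec.MinReplicas and Spec.MaxReplicas on the cluster-local objects, only differences outside those two fields should trigger an update. The short standalone sketch below illustrates that neutralisation idea; it is not part of this diff, and the helper name equalizeScheduledFields is made up for illustration.

package main

import (
    "fmt"
    "reflect"

    autoscalingv1 "k8s.io/api/autoscaling/v1"
)

// equalizeScheduledFields copies the scheduler-owned fields (min/max replicas)
// from a onto a shallow copy of b, so that a subsequent spec comparison only
// reflects user-driven differences. Illustrative sketch only.
func equalizeScheduledFields(a, b autoscalingv1.HorizontalPodAutoscalerSpec) autoscalingv1.HorizontalPodAutoscalerSpec {
    out := b
    if a.MinReplicas == nil {
        out.MinReplicas = nil
    } else {
        m := *a.MinReplicas
        out.MinReplicas = &m
    }
    out.MaxReplicas = a.MaxReplicas
    return out
}

func main() {
    cpu := int32(70)
    minA, minB := int32(2), int32(5)
    a := autoscalingv1.HorizontalPodAutoscalerSpec{MinReplicas: &minA, MaxReplicas: 10, TargetCPUUtilizationPercentage: &cpu}
    b := autoscalingv1.HorizontalPodAutoscalerSpec{MinReplicas: &minB, MaxReplicas: 4, TargetCPUUtilizationPercentage: &cpu}
    // Prints true: once min/max are equalized, the two specs only differed by
    // scheduler-owned fields, so they are considered equivalent.
    fmt.Println(reflect.DeepEqual(a, equalizeScheduledFields(a, b)))
}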
+type hpaFederatedStatus struct {
+    lastScaleTime *metav1.Time
+    // Indicates how many clusters have hpa/replicas.
+    // Used to average the cpu utilization which is
+    // reflected to the federation user.
+    count                             int32
+    aggregateCPUUtilizationPercentage *int32
+    currentReplicas                   int32
+    desiredReplicas                   int32
+}
+
+type hpaSchedulingInfo struct {
+    scheduleState map[string]*replicaNums
+    fedStatus     hpaFederatedStatus
+}
+
+// hpaLists holds lists of cluster names used during scheduling.
+type hpaLists struct {
+    // Stores names of those clusters which can offer min.
+    availableMin sets.String
+    // Stores names of those clusters which can offer max.
+    availableMax sets.String
+    // Stores names of those clusters which do not have hpa yet.
+    noHpa sets.String
+}
+
+func (a *HpaAdapter) GetSchedule(obj pkgruntime.Object, key string, clusters []*federationapi.Cluster, informer fedutil.FederatedInformer) (interface{}, error) {
+    currentClusterObjs, err := getCurrentClusterObjs(informer, key, clusters)
+    if err != nil {
+        return nil, err
+    }
+
+    // Initialise the averaged cpu utilisation for this reconcile.
+    var ccup int32 = 0
+    fedStatus := hpaFederatedStatus{
+        aggregateCPUUtilizationPercentage: &ccup,
+        count:           int32(0),
+        desiredReplicas: int32(0),
+        currentReplicas: int32(0),
+    }
+    fedHpa := obj.(*autoscalingv1.HorizontalPodAutoscaler)
+    // We assign the last known scale time here, and update it with
+    // the latest time from among all clusters in ScheduleObject().
+    if fedHpa.Status.LastScaleTime != nil {
+        t := metav1.NewTime(fedHpa.Status.LastScaleTime.Time)
+        fedStatus.lastScaleTime = &t
+    }
+
+    return &hpaSchedulingInfo{
+        scheduleState: getHpaScheduleState(obj, currentClusterObjs),
+        fedStatus:     fedStatus,
+    }, nil
+}
+
+func getCurrentClusterObjs(informer fedutil.FederatedInformer, key string, clusters []*federationapi.Cluster) (map[string]pkgruntime.Object, error) {
+    currentClusterObjs := make(map[string]pkgruntime.Object)
+    for _, cluster := range clusters {
+        clusterName := cluster.Name
+        clusterObj, found, err := informer.GetTargetStore().GetByKey(clusterName, key)
+        if err != nil {
+            return nil, err
+        }
+        currentClusterObjs[clusterName] = nil
+        if found {
+            currentClusterObjs[clusterName] = clusterObj.(pkgruntime.Object)
+        }
+    }
+    return currentClusterObjs, nil
+}
+
+// The scheduling algorithm, in brief:
+//
+// 1. Find the clusters which can offer max and min, if any (lists.availableMax and
+// lists.availableMin), in one pass over all clusters.
+//
+// 2. Reduce the replicas (both min and max) if needed (the situation where the fedHpa
+// has fewer replicas than all cluster-local hpa replicas totalled together).
+// In this step, reduce first from those hpas which already have max (and min)
+// reducible. Once such clusters are exhausted and reduction is still needed, reduce
+// one at a time from all clusters, randomly. This step ensures that the
+// excess replicas in local hpas are reduced to match the fedHpa.
+// This step would ideally be a noop in most cases, because it is rare for the fedHpa
+// to have fewer replicas than the cluster-local total (probably only when the user
+// forces an update of the fedHpa).
+//
+// 3. Distribute the replicas. In this step we have replicas to distribute (the
+// fed replicas exceeding the sum total of local cluster replicas). If clusters
+// already have replicas, one replica from each cluster which can offer replicas
+// (both for max and min) is also added to these replicas-to-distribute numbers (min
+// and max).
+// 3a. We first do a sub-pass distributing to clusters which need replicas, treating
+// those as clusters in crucial need of replicas.
+// 3b. After the previous sub-pass, if we still have replicas remaining, we distribute
+// to those clusters which do not yet have any hpa.
+// 3c. After the previous sub-pass, if we still have more to distribute, we distribute to all
+// clusters randomly, giving the replica distribution count (rdc = total-fed-replicas/no-of-clusters)
+// to each at a time.
+//
+// The above algorithm is run to first distribute max, and then to distribute min to those
+// clusters which get max.
+func getHpaScheduleState(fedObj pkgruntime.Object, currentObjs map[string]pkgruntime.Object) map[string]*replicaNums {
+    fedHpa := fedObj.(*autoscalingv1.HorizontalPodAutoscaler)
+    requestedMin := hpaMinReplicaDefault
+    if fedHpa.Spec.MinReplicas != nil {
+        requestedMin = *fedHpa.Spec.MinReplicas
+    }
+    requestedReplicas := replicaNums{
+        min: requestedMin,
+        max: fedHpa.Spec.MaxReplicas,
+    }
+    // Replica distribution count, per cluster.
+    rdc := replicaNums{
+        min: requestedReplicas.min / int32(len(currentObjs)),
+        max: requestedReplicas.max / int32(len(currentObjs)),
+    }
+    if rdc.min < 1 {
+        rdc.min = 1
+    }
+    // TODO: Is there a better way?
+    // We need to cap the lowest limit of max to 2, because in a
+    // situation where both min and max become 1 (the same) for all clusters,
+    // no rebalancing would happen.
+    if rdc.max < 2 {
+        rdc.max = 2
+    }
+
+    // Pass 1: Analyse the existing local hpas, if any.
+    // clusterLists holds the list of those clusters which can offer
+    // min and max replicas to those which want them,
+    // for example new clusters joining the federation and/or
+    // those clusters which need to increase or reduce replicas
+    // beyond the min/max limits.
+    // scheduleState currently holds the status of the existing hpas.
+    // It will eventually hold the desired state for this reconcile.
+    clusterLists, currentReplicas, scheduleState := prepareForScheduling(currentObjs)
+
+    remainingReplicas := replicaNums{
+        min: requestedReplicas.min - currentReplicas.min,
+        max: requestedReplicas.max - currentReplicas.max,
+    }
+
+    // Pass 2: Reduce replicas if needed (the situation where the fedHpa was updated
+    // to fewer replicas than the existing cluster-local total).
+    // In this pass, we remain pessimistic and reduce one replica per cluster at a time.
+    if remainingReplicas.min < 0 {
+        excessMin := (remainingReplicas.min * int32(-1))
+        remainingReplicas.min = reduceMinReplicas(excessMin, clusterLists.availableMin, scheduleState)
+    }
+    if remainingReplicas.max < 0 {
+        excessMax := (remainingReplicas.max * int32(-1))
+        remainingReplicas.max = reduceMaxReplicas(excessMax, clusterLists.availableMax, scheduleState)
+    }
+
+    toDistribute := replicaNums{
+        min: remainingReplicas.min + int32(clusterLists.availableMin.Len()),
+        max: remainingReplicas.max + int32(clusterLists.availableMax.Len()),
+    }
+
+    // Pass 3: Distribute max and then min.
+    // Here we first distribute max and then (in the next loop)
+    // distribute min into those clusters which already got the
+    // max fixed.
+    // In this process we might not meet the min limit, and the total of
+    // min limits might remain more than the requested federated min.
+    // This is partially because a min per cluster cannot be less
+    // than 1, while min could be requested as 1 at the federation.
+    // Additionally, we first increase replicas in those clusters
+    // which already have hpas and are in a condition to increase.
+ // This will save cluster related resources for the user, such that + // if an already existing cluster can satisfy users request why send + // the workload to another. + // We then go ahead to give the replicas to those which do not + // have any hpa. In this pass however we try to ensure that all + // our Max are consumed in this reconcile. + distributeMaxReplicas(toDistribute.max, clusterLists, rdc, currentObjs, scheduleState) + + // We distribute min to those clusters which: + // 1 - can adjust min (our increase step would be only 1) + // 2 - which do not have this hpa and got max(increase step rdcMin) + // We might exhaust all min replicas here, with + // some clusters still needing them. We adjust this in finalise by + // assigning min replicas to 1 into those clusters which got max + // but min remains 0. + distributeMinReplicas(toDistribute.min, clusterLists, rdc, currentObjs, scheduleState) + + return finaliseScheduleState(scheduleState) +} + +func (a *HpaAdapter) ScheduleObject(cluster *federationapi.Cluster, clusterObj pkgruntime.Object, federationObjCopy pkgruntime.Object, schedulingInfo interface{}) (pkgruntime.Object, ScheduleAction, error) { + // Update federated status info + typedInfo := schedulingInfo.(*hpaSchedulingInfo) + if clusterObj != nil { + clusterHpa := clusterObj.(*autoscalingv1.HorizontalPodAutoscaler) + if clusterHpa.Status.CurrentCPUUtilizationPercentage != nil { + *typedInfo.fedStatus.aggregateCPUUtilizationPercentage += + (*clusterHpa.Status.CurrentCPUUtilizationPercentage * clusterHpa.Status.CurrentReplicas) + typedInfo.fedStatus.count += clusterHpa.Status.CurrentReplicas + } + if clusterHpa.Status.LastScaleTime != nil { + t := metav1.NewTime(clusterHpa.Status.LastScaleTime.Time) + if typedInfo.fedStatus.lastScaleTime != nil && + t.After(typedInfo.fedStatus.lastScaleTime.Time) { + typedInfo.fedStatus.lastScaleTime = &t + } + } + + typedInfo.fedStatus.currentReplicas += clusterHpa.Status.CurrentReplicas + typedInfo.fedStatus.desiredReplicas += clusterHpa.Status.DesiredReplicas + } + + // Update the cluster obj and the needed action on the cluster + clusterHpaState := typedInfo.scheduleState[cluster.Name] + desiredHpa := federationObjCopy.(*autoscalingv1.HorizontalPodAutoscaler) + if clusterHpaState != nil { + desiredHpa.Spec.MaxReplicas = clusterHpaState.max + if desiredHpa.Spec.MinReplicas == nil { + min := int32(0) + desiredHpa.Spec.MinReplicas = &min + } + *desiredHpa.Spec.MinReplicas = clusterHpaState.min + } + + var defaultAction ScheduleAction = "" + switch { + case clusterHpaState != nil && clusterObj != nil: + return desiredHpa, defaultAction, nil + case clusterHpaState != nil && clusterObj == nil: + return desiredHpa, ActionAdd, nil + case clusterHpaState == nil && clusterObj != nil: + return nil, ActionDelete, nil + } + return nil, defaultAction, nil +} + +func (a *HpaAdapter) UpdateFederatedStatus(obj pkgruntime.Object, schedulingInfo interface{}) error { + fedHpa := obj.(*autoscalingv1.HorizontalPodAutoscaler) + needUpdate, newFedHpaStatus := updateStatus(fedHpa, schedulingInfo.(*hpaSchedulingInfo).fedStatus) + if needUpdate { + fedHpa.Status = newFedHpaStatus + _, err := a.client.AutoscalingV1().HorizontalPodAutoscalers(fedHpa.Namespace).UpdateStatus(fedHpa) + if err != nil { + return fmt.Errorf("Error updating hpa: %s status in federation: %v", fedHpa.Name, err) + } + } + return nil +} + +func updateStatus(fedHpa *autoscalingv1.HorizontalPodAutoscaler, newStatus hpaFederatedStatus) (bool, autoscalingv1.HorizontalPodAutoscalerStatus) { + 
averageCPUUtilizationPercentage := int32(0) + // Average out the available current utilisation + if *newStatus.aggregateCPUUtilizationPercentage != 0 && newStatus.count != 0 { + averageCPUUtilizationPercentage = *newStatus.aggregateCPUUtilizationPercentage / newStatus.count + } + gen := fedHpa.Generation + newFedHpaStatus := autoscalingv1.HorizontalPodAutoscalerStatus{ObservedGeneration: &gen} + needUpdate := false + if (fedHpa.Status.CurrentCPUUtilizationPercentage == nil && + averageCPUUtilizationPercentage != 0) || + (fedHpa.Status.CurrentCPUUtilizationPercentage != nil && + averageCPUUtilizationPercentage != + *fedHpa.Status.CurrentCPUUtilizationPercentage) { + needUpdate = true + newFedHpaStatus.CurrentCPUUtilizationPercentage = &averageCPUUtilizationPercentage + } + if (fedHpa.Status.LastScaleTime == nil && newStatus.lastScaleTime != nil) || + (fedHpa.Status.LastScaleTime != nil && newStatus.lastScaleTime == nil) || + ((fedHpa.Status.LastScaleTime != nil && newStatus.lastScaleTime != nil) && + newStatus.lastScaleTime.After(fedHpa.Status.LastScaleTime.Time)) { + needUpdate = true + newFedHpaStatus.LastScaleTime = newStatus.lastScaleTime + } + if fedHpa.Status.DesiredReplicas != newStatus.desiredReplicas { + needUpdate = true + newFedHpaStatus.CurrentReplicas = newStatus.currentReplicas + } + if fedHpa.Status.CurrentReplicas != newStatus.currentReplicas { + needUpdate = true + newFedHpaStatus.DesiredReplicas = newStatus.desiredReplicas + } + return needUpdate, newFedHpaStatus +} + +// prepareForScheduling prepares the lists and totals from the +// existing objs. +// currentObjs has the list of all clusters, with obj as nil +// for those clusters which do not have hpa yet. +func prepareForScheduling(currentObjs map[string]pkgruntime.Object) (hpaLists, replicaNums, map[string]*replicaNums) { + lists := hpaLists{ + availableMax: sets.NewString(), + availableMin: sets.NewString(), + noHpa: sets.NewString(), + } + existingTotal := replicaNums{ + min: int32(0), + max: int32(0), + } + + scheduleState := make(map[string]*replicaNums) + for cluster, obj := range currentObjs { + if obj == nil { + lists.noHpa.Insert(cluster) + scheduleState[cluster] = nil + continue + } + + if maxReplicasReducible(obj) { + lists.availableMax.Insert(cluster) + } + if minReplicasReducible(obj) { + lists.availableMin.Insert(cluster) + } + + replicas := replicaNums{min: 0, max: 0} + scheduleState[cluster] = &replicas + if obj.(*autoscalingv1.HorizontalPodAutoscaler).Spec.MinReplicas != nil { + existingTotal.min += *obj.(*autoscalingv1.HorizontalPodAutoscaler).Spec.MinReplicas + replicas.min = *obj.(*autoscalingv1.HorizontalPodAutoscaler).Spec.MinReplicas + } + existingTotal.max += obj.(*autoscalingv1.HorizontalPodAutoscaler).Spec.MaxReplicas + replicas.max = obj.(*autoscalingv1.HorizontalPodAutoscaler).Spec.MaxReplicas + } + + return lists, existingTotal, scheduleState +} + +// Note: reduceMinReplicas and reduceMaxReplicas, look quite similar in flow +// and code, however there are subtle differences. They together can be made +// into 1 function with an arg governing the functionality difference and +// additional args (superset of args in both) as needed. Doing so however +// makes the logical flow quite less readable. They are thus left as 2 for +// readability. + +// reduceMinReplicas reduces the min replicas from existing clusters. +// At the end of the function excessMin should be 0 and the MinList +// and the scheduledReplicas properly updated in place. 
+func reduceMinReplicas(excessMin int32, availableMinList sets.String, scheduled map[string]*replicaNums) int32 { + if excessMin > 0 { + // first we try reducing from those clusters which already offer min + if availableMinList.Len() > 0 { + for _, cluster := range availableMinList.List() { + replicas := scheduled[cluster] + if replicas.min > 1 { + replicas.min-- + availableMinList.Delete(cluster) + excessMin-- + if excessMin <= 0 { + break + } + } + } + } + } + + // If we could not get needed replicas from already offered min above + // we abruptly start removing replicas from some/all clusters. + // Here we might make some min to 0 signalling that this hpa might be a + // candidate to be removed from this cluster altogether. + for excessMin > 0 { + for _, replicas := range scheduled { + if replicas != nil && + replicas.min > 0 { + replicas.min-- + excessMin-- + if excessMin <= 0 { + break + } + } + } + } + + return excessMin +} + +// reduceMaxReplicas reduces the max replicas from existing clusters. +// At the end of the function excessMax should be 0 and the MaxList +// and the scheduledReplicas properly updated in place. +func reduceMaxReplicas(excessMax int32, availableMaxList sets.String, scheduled map[string]*replicaNums) int32 { + if excessMax > 0 { + // first we try reducing from those clusters which already offer max + if availableMaxList.Len() > 0 { + for _, cluster := range availableMaxList.List() { + replicas := scheduled[cluster] + if replicas != nil && !((replicas.max - replicas.min) < 0) { + replicas.max-- + availableMaxList.Delete(cluster) + excessMax-- + if excessMax <= 0 { + break + } + } + } + } + } + // If we could not get needed replicas to reduce from already offered + // max above we abruptly start removing replicas from some/all clusters. + // Here we might make some max and min to 0, signalling that this hpa be + // removed from this cluster altogether + for excessMax > 0 { + for _, replicas := range scheduled { + if replicas != nil && + !((replicas.max - replicas.min) < 0) { + replicas.max-- + excessMax-- + if excessMax <= 0 { + break + } + } + } + } + + return excessMax +} + +// distributeMaxReplicas +// Takes input: +// toDistributeMax: number of replicas to distribute. +// lists: cluster name lists, which have clusters with available max, +// available min and those with no hpas yet. +// rdc: replicadistributioncount for max and min. +// currentObjs: list of current cluster hpas. +// scheduled: schedule state which will be updated in place. +func distributeMaxReplicas(toDistributeMax int32, lists hpaLists, rdc replicaNums, + currentObjs map[string]pkgruntime.Object, scheduled map[string]*replicaNums) int32 { + for cluster, replicas := range scheduled { + if toDistributeMax == 0 { + break + } + if replicas == nil { + continue + } + if maxReplicasNeeded(currentObjs[cluster]) { + replicas.max++ + if lists.availableMax.Len() > 0 { + popped, notEmpty := lists.availableMax.PopAny() + if notEmpty { + // Boundary checks have happened earlier in + // minReplicasReducible(). + scheduled[popped].max-- + } + } + // Any which ways utilise available map replicas + toDistributeMax-- + } + } + + // If we have new clusters where we can give our replicas, + // then give away all our replicas to the new clusters first. 
+ if lists.noHpa.Len() > 0 { + for toDistributeMax > 0 { + for _, cluster := range lists.noHpa.UnsortedList() { + if scheduled[cluster] == nil { + scheduled[cluster] = &replicaNums{min: 0, max: 0} + } + replicas := scheduled[cluster] + // first give away max from clusters offering them + // this case especially helps getting hpa into newly joining + // clusters. + if lists.availableMax.Len() > 0 { + popped, notEmpty := lists.availableMax.PopAny() + if notEmpty { + // Boundary checks to reduce max have happened earlier in + // minReplicasReducible(). + replicas.max++ + scheduled[popped].max-- + toDistributeMax-- + continue + } + } + if toDistributeMax < rdc.max { + replicas.max += toDistributeMax + toDistributeMax = 0 + break + } + replicas.max += rdc.max + toDistributeMax -= rdc.max + } + } + } else { // we have no new clusters but if still have max replicas to distribute; + // just distribute all in current clusters. + for toDistributeMax > 0 { + for cluster, replicas := range scheduled { + if replicas == nil { + replicas = &replicaNums{min: 0, max: 0} + scheduled[cluster] = replicas + } + // First give away max from clusters offering them. + // This case especially helps getting hpa into newly joining + // clusters. + if lists.availableMax.Len() > 0 { + popped, notEmpty := lists.availableMax.PopAny() + if notEmpty { + // Boundary checks have happened earlier in + // minReplicasReducible(). + replicas.max++ + scheduled[popped].max-- + toDistributeMax-- + continue + } + } + if toDistributeMax < rdc.max { + replicas.max += toDistributeMax + toDistributeMax = 0 + break + } + replicas.max += rdc.max + toDistributeMax -= rdc.max + } + } + } + return toDistributeMax +} + +// distributeMinReplicas +// Takes input: +// toDistributeMin: number of replicas to distribute. +// lists: cluster name lists, which have clusters with available max, +// available min and those with no hpas yet. +// rdc: replicadistributioncount for max and min. +// currentObjs: list of current cluster hpas. +// scheduled: schedule state which will be updated in place. +func distributeMinReplicas(toDistributeMin int32, lists hpaLists, rdc replicaNums, + currentObjs map[string]pkgruntime.Object, scheduled map[string]*replicaNums) int32 { + for cluster, replicas := range scheduled { + if toDistributeMin == 0 { + break + } + // We have distriubted Max and thus scheduled might not be nil + // but probably current (what we got originally) is nil(no hpa) + if replicas == nil || currentObjs[cluster] == nil { + continue + } + if minReplicasIncreasable(currentObjs[cluster]) { + if lists.availableMin.Len() > 0 { + popped, notEmpty := lists.availableMin.PopAny() + if notEmpty { + // Boundary checks have happened earlier. + scheduled[popped].min-- + replicas.min++ + toDistributeMin-- + } + } + } + } + + if lists.noHpa.Len() > 0 { + // TODO: can this become an infinite loop? + for toDistributeMin > 0 { + for _, cluster := range lists.noHpa.UnsortedList() { + replicas := scheduled[cluster] + if replicas == nil { + // We did not get max here so this cluster + // remains without hpa + continue + } + var replicaNum int32 = 0 + if toDistributeMin < rdc.min { + replicaNum = toDistributeMin + } else { + replicaNum = rdc.min + } + if (replicas.max - replicaNum) < replicas.min { + // Cannot increase the min in this cluster + // as it will go beyond max + continue + } + if lists.availableMin.Len() > 0 { + popped, notEmpty := lists.availableMin.PopAny() + if notEmpty { + // Boundary checks have happened earlier. 
+                        scheduled[popped].min--
+                        replicas.min++
+                        toDistributeMin--
+                        continue
+                    }
+                }
+                replicas.min += replicaNum
+                toDistributeMin -= replicaNum
+            }
+        }
+    } else { // We have no new clusters, but we still have min replicas to distribute;
+        // just distribute them all among the current clusters.
+        for toDistributeMin > 0 {
+            for _, replicas := range scheduled {
+                if replicas == nil {
+                    // We did not get max here, so this cluster
+                    // remains without hpa.
+                    continue
+                }
+                var replicaNum int32 = 0
+                if toDistributeMin < rdc.min {
+                    replicaNum = toDistributeMin
+                } else {
+                    replicaNum = rdc.min
+                }
+                if (replicas.max - replicaNum) < replicas.min {
+                    // Cannot increase the min in this cluster
+                    // as it would go beyond max.
+                    continue
+                }
+                if lists.availableMin.Len() > 0 {
+                    popped, notEmpty := lists.availableMin.PopAny()
+                    if notEmpty {
+                        // Boundary checks have happened earlier.
+                        scheduled[popped].min--
+                        replicas.min++
+                        toDistributeMin--
+                        continue
+                    }
+                }
+                replicas.min += replicaNum
+                toDistributeMin -= replicaNum
+            }
+        }
+    }
+    return toDistributeMin
+}
+
+// finaliseScheduleState ensures that the min replica count is made 1
+// for those clusters which got max but did not get min. This is because
+// the k8s hpa does not accept hpas with 0 min replicas.
+// The replica distribution can thus have more mins than the fedHpa requested,
+// but that is better than having all replicas go into one cluster (if the fedHpa
+// requested min=1, which is the most usual case).
+func finaliseScheduleState(scheduled map[string]*replicaNums) map[string]*replicaNums {
+    for _, replicas := range scheduled {
+        if (replicas != nil) && (replicas.min <= 0) && (replicas.max > 0) {
+            // Min total does not necessarily meet the federated min limit.
+            replicas.min = 1
+        }
+    }
+    return scheduled
+}
+
+// isPristine is used to determine whether the local controller has so far
+// been able to actually determine what the desired replica number for
+// this cluster should be.
+// This is used to get hpas into those clusters which might join fresh,
+// while the other cluster hpas haven't really reached anywhere yet.
+// TODO: There is a flaw here: a just-born object would also offer its
+// replicas, which can also lead to fast thrashing.
+// The only better way is to either ensure that the object creation timestamp is set
+// and can be used authoritatively, or to have another field on the local object
+// which is mandatorily set on creation and can be used authoritatively.
+// Should we abuse annotations again for this, or should this be a proper requirement?
+func isPristine(hpa *autoscalingv1.HorizontalPodAutoscaler) bool {
+    if hpa.Status.LastScaleTime == nil &&
+        hpa.Status.DesiredReplicas == 0 {
+        return true
+    }
+    return false
+}
+
+// isScaleable tells whether a reasonable amount of time has already passed
+// since this hpa last scaled. It is used to avoid fast thrashing.
+func isScaleable(hpa *autoscalingv1.HorizontalPodAutoscaler) bool { + if hpa.Status.LastScaleTime == nil { + return false + } + t := hpa.Status.LastScaleTime.Add(scaleForbiddenWindow) + if t.After(time.Now()) { + return false + } + return true +} + +func maxReplicasReducible(obj pkgruntime.Object) bool { + hpa := obj.(*autoscalingv1.HorizontalPodAutoscaler) + if (hpa.Spec.MinReplicas != nil) && + (((hpa.Spec.MaxReplicas - 1) - *hpa.Spec.MinReplicas) < 0) { + return false + } + if isPristine(hpa) { + return true + } + if !isScaleable(hpa) { + return false + } + if (hpa.Status.DesiredReplicas < hpa.Status.CurrentReplicas) || + ((hpa.Status.DesiredReplicas == hpa.Status.CurrentReplicas) && + (hpa.Status.DesiredReplicas < hpa.Spec.MaxReplicas)) { + return true + } + return false +} + +// minReplicasReducible checks if this cluster (hpa) can offer replicas which are +// stuck here because of min limit. +// Its noteworthy, that min and max are adjusted separately, but if the replicas +// are not being used here, the max adjustment will lead it to become equal to min, +// but will not be able to scale down further and offer max to some other cluster +// which needs replicas. +func minReplicasReducible(obj pkgruntime.Object) bool { + hpa := obj.(*autoscalingv1.HorizontalPodAutoscaler) + if isPristine(hpa) && (hpa.Spec.MinReplicas != nil) && + (*hpa.Spec.MinReplicas > 1) && + (*hpa.Spec.MinReplicas <= hpa.Spec.MaxReplicas) { + return true + } + if !isScaleable(hpa) { + return false + } + if (hpa.Spec.MinReplicas != nil) && + (*hpa.Spec.MinReplicas > 1) && + (hpa.Status.DesiredReplicas == hpa.Status.CurrentReplicas) && + (hpa.Status.CurrentReplicas == *hpa.Spec.MinReplicas) { + return true + } + return false +} + +func maxReplicasNeeded(obj pkgruntime.Object) bool { + hpa := obj.(*autoscalingv1.HorizontalPodAutoscaler) + if !isScaleable(hpa) { + return false + } + + if (hpa.Status.CurrentReplicas == hpa.Status.DesiredReplicas) && + (hpa.Status.CurrentReplicas == hpa.Spec.MaxReplicas) { + return true + } + return false +} + +func minReplicasIncreasable(obj pkgruntime.Object) bool { + hpa := obj.(*autoscalingv1.HorizontalPodAutoscaler) + if !isScaleable(hpa) || + ((hpa.Spec.MinReplicas != nil) && + (*hpa.Spec.MinReplicas) >= hpa.Spec.MaxReplicas) { + return false + } + + if (hpa.Spec.MinReplicas != nil) && + (hpa.Status.DesiredReplicas > *hpa.Spec.MinReplicas) { + return true + } + return false +} diff --git a/federation/pkg/federatedtypes/hpa_test.go b/federation/pkg/federatedtypes/hpa_test.go new file mode 100644 index 00000000000..2168fb894f6 --- /dev/null +++ b/federation/pkg/federatedtypes/hpa_test.go @@ -0,0 +1,262 @@ +/* +Copyright 2017 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package federatedtypes + +import ( + "testing" + "time" + + autoscalingv1 "k8s.io/api/autoscaling/v1" + apiv1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + pkgruntime "k8s.io/apimachinery/pkg/runtime" + . 
"k8s.io/kubernetes/federation/pkg/federation-controller/util/test" + + "github.com/stretchr/testify/assert" +) + +type replicas struct { + min int32 + max int32 +} + +func TestGetHpaScheduleState(t *testing.T) { + defaultFedHpa := newHpaWithReplicas(NewInt32(1), NewInt32(70), 10) + testCases := map[string]struct { + fedHpa *autoscalingv1.HorizontalPodAutoscaler + localHpas map[string]pkgruntime.Object + expectedReplicas map[string]*replicas + }{ + "Distribiutes replicas randomly if no existing hpa in any local cluster": { + localHpas: func() map[string]pkgruntime.Object { + hpas := make(map[string]pkgruntime.Object) + hpas["c1"] = nil + hpas["c2"] = nil + return hpas + }(), + }, + "Cluster with no hpa gets replicas if other clusters have replicas": { + localHpas: func() map[string]pkgruntime.Object { + hpas := make(map[string]pkgruntime.Object) + hpas["c1"] = newHpaWithReplicas(NewInt32(1), NewInt32(70), 10) + hpas["c2"] = nil + return hpas + }(), + expectedReplicas: map[string]*replicas{ + "c1": { + min: int32(1), + max: int32(9), + }, + "c2": { + min: int32(1), + max: int32(1), + }, + }, + }, + "Cluster needing max replicas gets it if there is another cluster to offer max": { + localHpas: func() map[string]pkgruntime.Object { + hpa1 := newHpaWithReplicas(NewInt32(1), NewInt32(70), 7) + hpa1 = updateHpaStatus(hpa1, NewInt32(50), 5, 5, true) + hpa2 := newHpaWithReplicas(NewInt32(1), NewInt32(70), 1) + hpa2 = updateHpaStatus(hpa2, NewInt32(70), 1, 1, true) + // include third object to ensure, it does not break the test + hpa3 := newHpaWithReplicas(NewInt32(1), NewInt32(70), 2) + hpa3 = updateHpaStatus(hpa3, NewInt32(70), 1, 1, false) + hpas := make(map[string]pkgruntime.Object) + hpas["c1"] = hpa1 + hpas["c2"] = hpa2 + hpas["c3"] = hpa3 + return hpas + }(), + expectedReplicas: map[string]*replicas{ + "c1": { + min: int32(1), + max: int32(6), + }, + "c2": { + min: int32(1), + max: int32(2), + }, + "c3": { + min: int32(1), + max: int32(2), + }, + }, + }, + "Cluster needing max replicas does not get it if there is no cluster offerring max": { + localHpas: func() map[string]pkgruntime.Object { + hpa1 := newHpaWithReplicas(NewInt32(1), NewInt32(70), 9) + hpa1 = updateHpaStatus(hpa1, NewInt32(70), 9, 9, false) + hpa2 := newHpaWithReplicas(NewInt32(1), NewInt32(70), 1) + hpa2 = updateHpaStatus(hpa2, NewInt32(70), 1, 1, true) + hpas := make(map[string]pkgruntime.Object) + hpas["c1"] = hpa1 + hpas["c2"] = hpa2 + return hpas + }(), + expectedReplicas: map[string]*replicas{ + "c1": { + min: int32(1), + max: int32(9), + }, + "c2": { + min: int32(1), + max: int32(1), + }, + }, + }, + "Cluster which can increase min replicas gets to increase min if there is a cluster offering min": { + fedHpa: newHpaWithReplicas(NewInt32(4), NewInt32(70), 10), + localHpas: func() map[string]pkgruntime.Object { + hpa1 := newHpaWithReplicas(NewInt32(3), NewInt32(70), 6) + hpa1 = updateHpaStatus(hpa1, NewInt32(50), 3, 3, true) + hpa2 := newHpaWithReplicas(NewInt32(1), NewInt32(70), 4) + hpa2 = updateHpaStatus(hpa2, NewInt32(50), 3, 3, true) + hpas := make(map[string]pkgruntime.Object) + hpas["c1"] = hpa1 + hpas["c2"] = hpa2 + return hpas + }(), + expectedReplicas: map[string]*replicas{ + "c1": { + min: int32(2), + max: int32(6), + }, + "c2": { + min: int32(2), + max: int32(4), + }, + }, + }, + "Cluster which can increase min replicas does not increase if there are no clusters offering min": { + fedHpa: newHpaWithReplicas(NewInt32(4), NewInt32(70), 10), + localHpas: func() map[string]pkgruntime.Object { + hpa1 := 
newHpaWithReplicas(NewInt32(3), NewInt32(70), 6) + hpa1 = updateHpaStatus(hpa1, NewInt32(50), 4, 4, true) + hpa2 := newHpaWithReplicas(NewInt32(1), NewInt32(70), 4) + hpa2 = updateHpaStatus(hpa2, NewInt32(50), 3, 3, true) + hpas := make(map[string]pkgruntime.Object) + hpas["c1"] = hpa1 + hpas["c2"] = hpa2 + return hpas + }(), + expectedReplicas: map[string]*replicas{ + "c1": { + min: int32(3), + max: int32(6), + }, + "c2": { + min: int32(1), + max: int32(4), + }, + }, + }, + "Increasing replicas on fed object increases the same on clusters": { + // Existing total of local min, max = 1+1, 5+5 decreasing to below + fedHpa: newHpaWithReplicas(NewInt32(4), NewInt32(70), 14), + localHpas: func() map[string]pkgruntime.Object { + // does not matter if scaleability is true + hpas := make(map[string]pkgruntime.Object) + hpas["c1"] = newHpaWithReplicas(NewInt32(1), NewInt32(70), 5) + hpas["c2"] = newHpaWithReplicas(NewInt32(1), NewInt32(70), 5) + return hpas + }(), + // We dont know which cluster gets how many, but the resultant total should match + }, + "Decreasing replicas on fed object decreases the same on clusters": { + // Existing total of local min, max = 2+2, 8+8 decreasing to below + fedHpa: newHpaWithReplicas(NewInt32(3), NewInt32(70), 8), + localHpas: func() map[string]pkgruntime.Object { + // does not matter if scaleability is true + hpas := make(map[string]pkgruntime.Object) + hpas["c1"] = newHpaWithReplicas(NewInt32(2), NewInt32(70), 8) + hpas["c2"] = newHpaWithReplicas(NewInt32(2), NewInt32(70), 8) + return hpas + }(), + // We dont know which cluster gets how many, but the resultant total should match + }, + } + + for testName, testCase := range testCases { + t.Run(testName, func(t *testing.T) { + if testCase.fedHpa == nil { + testCase.fedHpa = defaultFedHpa + } + scheduledState := getHpaScheduleState(testCase.fedHpa, testCase.localHpas) + checkClusterConditions(t, testCase.fedHpa, scheduledState) + if testCase.expectedReplicas != nil { + for cluster, replicas := range testCase.expectedReplicas { + scheduledReplicas := scheduledState[cluster] + assert.Equal(t, replicas.min, scheduledReplicas.min) + assert.Equal(t, replicas.max, scheduledReplicas.max) + } + } + }) + } +} + +func updateHpaStatus(hpa *autoscalingv1.HorizontalPodAutoscaler, currentUtilisation *int32, current, desired int32, scaleable bool) *autoscalingv1.HorizontalPodAutoscaler { + hpa.Status.CurrentReplicas = current + hpa.Status.DesiredReplicas = desired + hpa.Status.CurrentCPUUtilizationPercentage = currentUtilisation + now := metav1.Now() + scaledTime := now + if scaleable { + // definitely more then 5 minutes ago + scaledTime = metav1.NewTime(now.Time.Add(-6 * time.Minute)) + } + hpa.Status.LastScaleTime = &scaledTime + return hpa +} + +func checkClusterConditions(t *testing.T, fedHpa *autoscalingv1.HorizontalPodAutoscaler, scheduled map[string]*replicaNums) { + minTotal := int32(0) + maxTotal := int32(0) + for _, replicas := range scheduled { + minTotal += replicas.min + maxTotal += replicas.max + } + + // - Total of max matches the fed max + assert.Equal(t, fedHpa.Spec.MaxReplicas, maxTotal) + // - Total of min is not less then fed min + assert.Condition(t, func() bool { + if *fedHpa.Spec.MinReplicas <= minTotal { + return true + } + return false + }) +} + +func newHpaWithReplicas(min, targetUtilisation *int32, max int32) *autoscalingv1.HorizontalPodAutoscaler { + return &autoscalingv1.HorizontalPodAutoscaler{ + ObjectMeta: metav1.ObjectMeta{ + Name: "myhpa", + Namespace: apiv1.NamespaceDefault, + SelfLink: 
"/api/mylink", + }, + Spec: autoscalingv1.HorizontalPodAutoscalerSpec{ + ScaleTargetRef: autoscalingv1.CrossVersionObjectReference{ + Kind: "HorizontalPodAutoscaler", + Name: "target-", + }, + MinReplicas: min, + MaxReplicas: max, + TargetCPUUtilizationPercentage: targetUtilisation, + }, + } +} diff --git a/federation/pkg/federatedtypes/replicaset.go b/federation/pkg/federatedtypes/replicaset.go index d439c7a361a..909b12b7d0a 100644 --- a/federation/pkg/federatedtypes/replicaset.go +++ b/federation/pkg/federatedtypes/replicaset.go @@ -40,16 +40,16 @@ func init() { } type ReplicaSetAdapter struct { - *schedulingAdapter + *replicaSchedulingAdapter client federationclientset.Interface } func NewReplicaSetAdapter(client federationclientset.Interface, config *restclient.Config) FederatedTypeAdapter { - schedulingAdapter := schedulingAdapter{ + replicaSchedulingAdapter := replicaSchedulingAdapter{ preferencesAnnotationName: FedReplicaSetPreferencesAnnotation, - updateStatusFunc: func(obj pkgruntime.Object, status interface{}) error { + updateStatusFunc: func(obj pkgruntime.Object, schedulingInfo interface{}) error { rs := obj.(*extensionsv1.ReplicaSet) - typedStatus := status.(ReplicaSchedulingStatus) + typedStatus := schedulingInfo.(*ReplicaSchedulingInfo).Status if typedStatus.Replicas != rs.Status.Replicas || typedStatus.FullyLabeledReplicas != rs.Status.FullyLabeledReplicas || typedStatus.ReadyReplicas != rs.Status.ReadyReplicas || typedStatus.AvailableReplicas != rs.Status.AvailableReplicas { rs.Status = extensionsv1.ReplicaSetStatus{ @@ -64,7 +64,7 @@ func NewReplicaSetAdapter(client federationclientset.Interface, config *restclie return nil }, } - return &ReplicaSetAdapter{&schedulingAdapter, client} + return &ReplicaSetAdapter{&replicaSchedulingAdapter, client} } func (a *ReplicaSetAdapter) Kind() string { diff --git a/federation/pkg/federatedtypes/scheduling.go b/federation/pkg/federatedtypes/scheduling.go index a6f9bcc9905..3cad44bd59e 100644 --- a/federation/pkg/federatedtypes/scheduling.go +++ b/federation/pkg/federatedtypes/scheduling.go @@ -37,6 +37,16 @@ import ( "github.com/golang/glog" ) +// ScheduleAction is used by the interface ScheduleObject of SchedulingAdapter +// to sync controller reconcile to convey the action type needed for the +// particular cluster local object in ScheduleObject +type ScheduleAction string + +const ( + ActionAdd = "add" + ActionDelete = "delete" +) + // ReplicaSchedulingStatus contains the status of the replica type objects (rs or deployment) // that are being scheduled into joined clusters. type ReplicaSchedulingStatus struct { @@ -58,26 +68,26 @@ type ReplicaSchedulingInfo struct { // federated type that requires more complex synchronization logic. type SchedulingAdapter interface { GetSchedule(obj pkgruntime.Object, key string, clusters []*federationapi.Cluster, informer fedutil.FederatedInformer) (interface{}, error) - ScheduleObject(cluster *federationapi.Cluster, clusterObj pkgruntime.Object, federationObjCopy pkgruntime.Object, schedulingInfo interface{}) (pkgruntime.Object, bool, error) - UpdateFederatedStatus(obj pkgruntime.Object, status interface{}) error + ScheduleObject(cluster *federationapi.Cluster, clusterObj pkgruntime.Object, federationObjCopy pkgruntime.Object, schedulingInfo interface{}) (pkgruntime.Object, ScheduleAction, error) + UpdateFederatedStatus(obj pkgruntime.Object, schedulingInfo interface{}) error // EquivalentIgnoringSchedule returns whether obj1 and obj2 are // equivalent ignoring differences due to scheduling. 
EquivalentIgnoringSchedule(obj1, obj2 pkgruntime.Object) bool } -// schedulingAdapter is meant to be embedded in other type adapters that require -// workload scheduling. -type schedulingAdapter struct { +// replicaSchedulingAdapter is meant to be embedded in other type adapters that require +// workload scheduling with actual pod replicas. +type replicaSchedulingAdapter struct { preferencesAnnotationName string updateStatusFunc func(pkgruntime.Object, interface{}) error } -func (a *schedulingAdapter) IsSchedulingAdapter() bool { +func (a *replicaSchedulingAdapter) IsSchedulingAdapter() bool { return true } -func (a *schedulingAdapter) GetSchedule(obj pkgruntime.Object, key string, clusters []*federationapi.Cluster, informer fedutil.FederatedInformer) (interface{}, error) { +func (a *replicaSchedulingAdapter) GetSchedule(obj pkgruntime.Object, key string, clusters []*federationapi.Cluster, informer fedutil.FederatedInformer) (interface{}, error) { var clusterNames []string for _, cluster := range clusters { clusterNames = append(clusterNames, cluster.Name) @@ -128,7 +138,7 @@ func (a *schedulingAdapter) GetSchedule(obj pkgruntime.Object, key string, clust }, nil } -func (a *schedulingAdapter) ScheduleObject(cluster *federationapi.Cluster, clusterObj pkgruntime.Object, federationObjCopy pkgruntime.Object, schedulingInfo interface{}) (pkgruntime.Object, bool, error) { +func (a *replicaSchedulingAdapter) ScheduleObject(cluster *federationapi.Cluster, clusterObj pkgruntime.Object, federationObjCopy pkgruntime.Object, schedulingInfo interface{}) (pkgruntime.Object, ScheduleAction, error) { typedSchedulingInfo := schedulingInfo.(*ReplicaSchedulingInfo) replicas, ok := typedSchedulingInfo.Schedule[cluster.Name] if !ok { @@ -152,11 +162,15 @@ func (a *schedulingAdapter) ScheduleObject(cluster *federationapi.Cluster, clust } } } - return federationObjCopy, replicas > 0, nil + var action ScheduleAction = "" + if replicas > 0 { + action = ActionAdd + } + return federationObjCopy, action, nil } -func (a *schedulingAdapter) UpdateFederatedStatus(obj pkgruntime.Object, status interface{}) error { - return a.updateStatusFunc(obj, status) +func (a *replicaSchedulingAdapter) UpdateFederatedStatus(obj pkgruntime.Object, schedulingInfo interface{}) error { + return a.updateStatusFunc(obj, schedulingInfo) } func schedule(planner *planner.Planner, obj pkgruntime.Object, key string, clusterNames []string, currentReplicasPerCluster map[string]int64, estimatedCapacity map[string]int64) map[string]int64 { diff --git a/federation/pkg/federation-controller/sync/controller.go b/federation/pkg/federation-controller/sync/controller.go index 800a1484f8a..41cee680fd3 100644 --- a/federation/pkg/federation-controller/sync/controller.go +++ b/federation/pkg/federation-controller/sync/controller.go @@ -490,8 +490,7 @@ func syncToClusters(clustersAccessor clustersAccessorFunc, operationsAccessor op if !ok { glog.Fatalf("Adapter for kind %q does not properly implement SchedulingAdapter.", kind) } - typedScheduleInfo := schedulingInfo.(*federatedtypes.ReplicaSchedulingInfo) - err = schedulingAdapter.UpdateFederatedStatus(obj, typedScheduleInfo.Status) + err = schedulingAdapter.UpdateFederatedStatus(obj, schedulingInfo) if err != nil { runtime.HandleError(fmt.Errorf("adapter.UpdateFinished() failed on adapter for %s %q: %v", kind, key, err)) return statusError @@ -548,7 +547,7 @@ func clusterOperations(adapter federatedtypes.FederatedTypeAdapter, selectedClus return nil, wrappedErr } - shouldCreateIfNeeded := true + var 
scheduleAction federatedtypes.ScheduleAction = federatedtypes.ActionAdd if adapter.IsSchedulingAdapter() { schedulingAdapter, ok := adapter.(federatedtypes.SchedulingAdapter) if !ok { @@ -559,7 +558,7 @@ func clusterOperations(adapter federatedtypes.FederatedTypeAdapter, selectedClus if clusterObj != nil { clusterTypedObj = clusterObj.(pkgruntime.Object) } - desiredObj, shouldCreateIfNeeded, err = schedulingAdapter.ScheduleObject(cluster, clusterTypedObj, desiredObj, schedulingInfo) + desiredObj, scheduleAction, err = schedulingAdapter.ScheduleObject(cluster, clusterTypedObj, desiredObj, schedulingInfo) if err != nil { runtime.HandleError(err) return nil, err @@ -568,11 +567,15 @@ func clusterOperations(adapter federatedtypes.FederatedTypeAdapter, selectedClus var operationType util.FederatedOperationType = "" if found { - clusterObj := clusterObj.(pkgruntime.Object) - if !adapter.Equivalent(desiredObj, clusterObj) { - operationType = util.OperationTypeUpdate + if scheduleAction == federatedtypes.ActionDelete { + operationType = util.OperationTypeDelete + } else { + clusterObj := clusterObj.(pkgruntime.Object) + if !adapter.Equivalent(desiredObj, clusterObj) { + operationType = util.OperationTypeUpdate + } } - } else if shouldCreateIfNeeded { + } else if scheduleAction == federatedtypes.ActionAdd { operationType = util.OperationTypeAdd } diff --git a/federation/pkg/federation-controller/util/test/test_helper.go b/federation/pkg/federation-controller/util/test/test_helper.go index de6307b80e4..534111d7424 100644 --- a/federation/pkg/federation-controller/util/test/test_helper.go +++ b/federation/pkg/federation-controller/util/test/test_helper.go @@ -446,3 +446,9 @@ func AssertHasFinalizer(t *testing.T, obj runtime.Object, finalizer string) { require.Nil(t, err) assert.True(t, hasFinalizer) } + +func NewInt32(val int32) *int32 { + p := new(int32) + *p = val + return p +}
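To summarise how the new ScheduleAction plumbs into the sync controller above: for each cluster, clusterOperations now derives the federated operation from whether the object already exists in that cluster, the action returned by ScheduleObject, and spec equivalence. The standalone sketch below distils that decision table; the type and function names in it are illustrative stand-ins, not identifiers from this diff.

package main

import "fmt"

// scheduleAction mirrors the role of federatedtypes.ScheduleAction; the name
// and values here are stand-ins for illustration.
type scheduleAction string

const (
    actionAdd    scheduleAction = "add"
    actionDelete scheduleAction = "delete"
)

// operationFor distils the branch added to clusterOperations: decide which
// federated operation to queue for one cluster, given whether the object is
// already present there (found), what the scheduling adapter asked for
// (action), and whether the existing object already matches the desired one
// (equivalent).
func operationFor(found bool, action scheduleAction, equivalent bool) string {
    switch {
    case found && action == actionDelete:
        return "delete"
    case found && !equivalent:
        return "update"
    case found:
        return "" // present, scheduled, and already equivalent: nothing to do
    case action == actionAdd:
        return "add"
    default:
        return "" // absent and not scheduled to this cluster: nothing to do
    }
}

func main() {
    fmt.Println(operationFor(true, actionDelete, true))  // delete
    fmt.Println(operationFor(true, actionAdd, false))    // update
    fmt.Println(operationFor(false, actionAdd, false))   // add
    fmt.Println(operationFor(false, actionDelete, true)) // (no-op)
}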