[Federation] hpa controller
This commit is contained in:
parent d8205661b7
commit 0bea0ca1d9
@@ -10,11 +10,16 @@ load(
go_test(
    name = "go_default_test",
    srcs = ["scheduling_test.go"],
    srcs = [
        "hpa_test.go",
        "scheduling_test.go",
    ],
    library = ":go_default_library",
    tags = ["automanaged"],
    deps = [
        "//federation/pkg/federation-controller/util/test:go_default_library",
        "//vendor/github.com/stretchr/testify/assert:go_default_library",
        "//vendor/k8s.io/api/autoscaling/v1:go_default_library",
        "//vendor/k8s.io/api/core/v1:go_default_library",
        "//vendor/k8s.io/api/extensions/v1beta1:go_default_library",
        "//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",

@@ -29,6 +34,7 @@ go_library(
        "configmap.go",
        "daemonset.go",
        "deployment.go",
        "hpa.go",
        "namespace.go",
        "qualifiedname.go",
        "registry.go",

@@ -48,12 +54,14 @@ go_library(
        "//pkg/api:go_default_library",
        "//pkg/controller/namespace/deletion:go_default_library",
        "//vendor/github.com/golang/glog:go_default_library",
        "//vendor/k8s.io/api/autoscaling/v1:go_default_library",
        "//vendor/k8s.io/api/core/v1:go_default_library",
        "//vendor/k8s.io/api/extensions/v1beta1:go_default_library",
        "//vendor/k8s.io/apimachinery/pkg/api/meta:go_default_library",
        "//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
        "//vendor/k8s.io/apimachinery/pkg/runtime:go_default_library",
        "//vendor/k8s.io/apimachinery/pkg/runtime/schema:go_default_library",
        "//vendor/k8s.io/apimachinery/pkg/util/sets:go_default_library",
        "//vendor/k8s.io/apimachinery/pkg/watch:go_default_library",
        "//vendor/k8s.io/client-go/dynamic:go_default_library",
        "//vendor/k8s.io/client-go/kubernetes:go_default_library",
@@ -40,16 +40,16 @@ func init() {
}

type DeploymentAdapter struct {
-    *schedulingAdapter
+    *replicaSchedulingAdapter
    client federationclientset.Interface
}

func NewDeploymentAdapter(client federationclientset.Interface, config *restclient.Config) FederatedTypeAdapter {
-    schedulingAdapter := schedulingAdapter{
+    schedulingAdapter := replicaSchedulingAdapter{
        preferencesAnnotationName: FedDeploymentPreferencesAnnotation,
-        updateStatusFunc: func(obj pkgruntime.Object, status interface{}) error {
+        updateStatusFunc: func(obj pkgruntime.Object, schedulingInfo interface{}) error {
            deployment := obj.(*extensionsv1.Deployment)
-            typedStatus := status.(ReplicaSchedulingStatus)
+            typedStatus := schedulingInfo.(*ReplicaSchedulingInfo).Status
            if typedStatus.Replicas != deployment.Status.Replicas || typedStatus.UpdatedReplicas != deployment.Status.UpdatedReplicas ||
                typedStatus.ReadyReplicas != deployment.Status.ReadyReplicas || typedStatus.AvailableReplicas != deployment.Status.AvailableReplicas {
                deployment.Status = extensionsv1.DeploymentStatus{
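Editor's note: the updateStatusFunc callbacks above now receive the whole scheduling info instead of a bare status value and extract the typed status themselves. A minimal standalone sketch of that pattern, using simplified stand-in types rather than the real federation types:

package main

import "fmt"

// Simplified stand-ins for the federation types referenced in the diff.
type ReplicaSchedulingStatus struct {
    Replicas, UpdatedReplicas, ReadyReplicas, AvailableReplicas int32
}

type ReplicaSchedulingInfo struct {
    Schedule map[string]int64
    Status   ReplicaSchedulingStatus
}

// updateStatus mirrors the new callback shape: it is handed an opaque
// schedulingInfo and digs the typed status out of it itself.
func updateStatus(schedulingInfo interface{}) error {
    typedStatus := schedulingInfo.(*ReplicaSchedulingInfo).Status
    fmt.Printf("aggregated replicas: %d ready, %d available\n",
        typedStatus.ReadyReplicas, typedStatus.AvailableReplicas)
    return nil
}

func main() {
    info := &ReplicaSchedulingInfo{
        Status: ReplicaSchedulingStatus{Replicas: 5, ReadyReplicas: 4, AvailableReplicas: 4},
    }
    _ = updateStatus(info)
}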
federation/pkg/federatedtypes/hpa.go (new file, 927 lines)
@@ -0,0 +1,927 @@
/*
Copyright 2017 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package federatedtypes

import (
    "time"

    "fmt"

    autoscalingv1 "k8s.io/api/autoscaling/v1"
    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    pkgruntime "k8s.io/apimachinery/pkg/runtime"
    "k8s.io/apimachinery/pkg/runtime/schema"
    "k8s.io/apimachinery/pkg/util/sets"
    "k8s.io/apimachinery/pkg/watch"
    kubeclientset "k8s.io/client-go/kubernetes"
    restclient "k8s.io/client-go/rest"
    federationapi "k8s.io/kubernetes/federation/apis/federation/v1beta1"
    federationclientset "k8s.io/kubernetes/federation/client/clientset_generated/federation_clientset"
    fedutil "k8s.io/kubernetes/federation/pkg/federation-controller/util"
)

const (
    HpaKind           = "horizontalpodautoscaler"
    HpaControllerName = "horizontalpodautoscalers"
    // This is a tunable which does not change replica nums
    // on an existing local hpa, before this timeout, if it
    // did scale already (avoids thrashing of replicas around).
    scaleForbiddenWindow = 5 * time.Minute
    // This is used as the default min for hpa object submitted
    // to federation, in a situation where the default is for
    // some reason not present (Spec.MinReplicas == nil)
    hpaMinReplicaDefault = int32(1)
)

func init() {
    RegisterFederatedType(HpaKind, HpaControllerName, []schema.GroupVersionResource{autoscalingv1.SchemeGroupVersion.WithResource(HpaControllerName)}, NewHpaAdapter)
}

type HpaAdapter struct {
    client federationclientset.Interface
}

func NewHpaAdapter(client federationclientset.Interface, config *restclient.Config) FederatedTypeAdapter {
    return &HpaAdapter{client: client}
}

func (a *HpaAdapter) Kind() string {
    return HpaKind
}

func (a *HpaAdapter) ObjectType() pkgruntime.Object {
    return &autoscalingv1.HorizontalPodAutoscaler{}
}

func (a *HpaAdapter) IsExpectedType(obj interface{}) bool {
    _, ok := obj.(*autoscalingv1.HorizontalPodAutoscaler)
    return ok
}

func (a *HpaAdapter) Copy(obj pkgruntime.Object) pkgruntime.Object {
    hpa := obj.(*autoscalingv1.HorizontalPodAutoscaler)
    return &autoscalingv1.HorizontalPodAutoscaler{
        ObjectMeta: fedutil.DeepCopyRelevantObjectMeta(hpa.ObjectMeta),
        Spec:       *fedutil.DeepCopyApiTypeOrPanic(&hpa.Spec).(*autoscalingv1.HorizontalPodAutoscalerSpec),
    }
}

func (a *HpaAdapter) Equivalent(obj1, obj2 pkgruntime.Object) bool {
    return fedutil.ObjectMetaAndSpecEquivalent(obj1, obj2)
}

func (a *HpaAdapter) QualifiedName(obj pkgruntime.Object) QualifiedName {
    hpa := obj.(*autoscalingv1.HorizontalPodAutoscaler)
    return QualifiedName{Namespace: hpa.Namespace, Name: hpa.Name}
}

func (a *HpaAdapter) ObjectMeta(obj pkgruntime.Object) *metav1.ObjectMeta {
    return &obj.(*autoscalingv1.HorizontalPodAutoscaler).ObjectMeta
}

func (a *HpaAdapter) FedCreate(obj pkgruntime.Object) (pkgruntime.Object, error) {
    hpa := obj.(*autoscalingv1.HorizontalPodAutoscaler)
    return a.client.AutoscalingV1().HorizontalPodAutoscalers(hpa.Namespace).Create(hpa)
}

func (a *HpaAdapter) FedDelete(qualifiedName QualifiedName, options *metav1.DeleteOptions) error {
    return a.client.AutoscalingV1().HorizontalPodAutoscalers(qualifiedName.Namespace).Delete(qualifiedName.Name, options)
}

func (a *HpaAdapter) FedGet(qualifiedName QualifiedName) (pkgruntime.Object, error) {
    return a.client.AutoscalingV1().HorizontalPodAutoscalers(qualifiedName.Namespace).Get(qualifiedName.Name, metav1.GetOptions{})
}

func (a *HpaAdapter) FedList(namespace string, options metav1.ListOptions) (pkgruntime.Object, error) {
    return a.client.AutoscalingV1().HorizontalPodAutoscalers(namespace).List(options)
}

func (a *HpaAdapter) FedUpdate(obj pkgruntime.Object) (pkgruntime.Object, error) {
    hpa := obj.(*autoscalingv1.HorizontalPodAutoscaler)
    return a.client.AutoscalingV1().HorizontalPodAutoscalers(hpa.Namespace).Update(hpa)
}

func (a *HpaAdapter) FedWatch(namespace string, options metav1.ListOptions) (watch.Interface, error) {
    return a.client.AutoscalingV1().HorizontalPodAutoscalers(namespace).Watch(options)
}

func (a *HpaAdapter) ClusterCreate(client kubeclientset.Interface, obj pkgruntime.Object) (pkgruntime.Object, error) {
    hpa := obj.(*autoscalingv1.HorizontalPodAutoscaler)
    return client.AutoscalingV1().HorizontalPodAutoscalers(hpa.Namespace).Create(hpa)
}

func (a *HpaAdapter) ClusterDelete(client kubeclientset.Interface, qualifiedName QualifiedName, options *metav1.DeleteOptions) error {
    return client.AutoscalingV1().HorizontalPodAutoscalers(qualifiedName.Namespace).Delete(qualifiedName.Name, options)
}

func (a *HpaAdapter) ClusterGet(client kubeclientset.Interface, qualifiedName QualifiedName) (pkgruntime.Object, error) {
    return client.AutoscalingV1().HorizontalPodAutoscalers(qualifiedName.Namespace).Get(qualifiedName.Name, metav1.GetOptions{})
}

func (a *HpaAdapter) ClusterList(client kubeclientset.Interface, namespace string, options metav1.ListOptions) (pkgruntime.Object, error) {
    return client.AutoscalingV1().HorizontalPodAutoscalers(namespace).List(options)
}

func (a *HpaAdapter) ClusterUpdate(client kubeclientset.Interface, obj pkgruntime.Object) (pkgruntime.Object, error) {
    hpa := obj.(*autoscalingv1.HorizontalPodAutoscaler)
    return client.AutoscalingV1().HorizontalPodAutoscalers(hpa.Namespace).Update(hpa)
}

func (a *HpaAdapter) ClusterWatch(client kubeclientset.Interface, namespace string, options metav1.ListOptions) (watch.Interface, error) {
    return client.AutoscalingV1().HorizontalPodAutoscalers(namespace).Watch(options)
}

func (a *HpaAdapter) NewTestObject(namespace string) pkgruntime.Object {
    var min int32 = 4
    var targetCPU int32 = 70
    return &autoscalingv1.HorizontalPodAutoscaler{
        ObjectMeta: metav1.ObjectMeta{
            GenerateName: "test-hpa-",
            Namespace:    namespace,
        },
        Spec: autoscalingv1.HorizontalPodAutoscalerSpec{
            ScaleTargetRef: autoscalingv1.CrossVersionObjectReference{
                Kind: "replicaset",
                Name: "myrs",
            },
            MinReplicas:                    &min,
            MaxReplicas:                    int32(10),
            TargetCPUUtilizationPercentage: &targetCPU,
        },
    }
}

func (a *HpaAdapter) IsSchedulingAdapter() bool {
    return true
}

func (a *HpaAdapter) EquivalentIgnoringSchedule(obj1, obj2 pkgruntime.Object) bool {
    hpa1 := obj1.(*autoscalingv1.HorizontalPodAutoscaler)
    hpa2 := a.Copy(obj2).(*autoscalingv1.HorizontalPodAutoscaler)
    if hpa1.Spec.MinReplicas == nil {
        hpa2.Spec.MinReplicas = nil
    } else if hpa2.Spec.MinReplicas == nil {
        var r int32 = *hpa1.Spec.MinReplicas
        hpa2.Spec.MinReplicas = &r
    } else {
        *hpa2.Spec.MinReplicas = *hpa1.Spec.MinReplicas
    }
    hpa2.Spec.MaxReplicas = hpa1.Spec.MaxReplicas
    return fedutil.ObjectMetaAndSpecEquivalent(hpa1, hpa2)
}

type replicaNums struct {
    min int32
    max int32
}

type hpaFederatedStatus struct {
    lastScaleTime *metav1.Time
    // Indicates how many clusters have hpa/replicas.
    // Used to average the cpu utilization which is
    // reflected to the federation user.
    count                             int32
    aggregateCPUUtilizationPercentage *int32
    currentReplicas                   int32
    desiredReplicas                   int32
}

type hpaSchedulingInfo struct {
    scheduleState map[string]*replicaNums
    fedStatus     hpaFederatedStatus
}

// List of cluster names.
type hpaLists struct {
    // Stores names of those clusters which can offer min.
    availableMin sets.String
    // Stores names of those clusters which can offer max.
    availableMax sets.String
    // Stores names of those clusters which do not have hpa yet.
    noHpa sets.String
}

func (a *HpaAdapter) GetSchedule(obj pkgruntime.Object, key string, clusters []*federationapi.Cluster, informer fedutil.FederatedInformer) (interface{}, error) {
    currentClusterObjs, err := getCurrentClusterObjs(informer, key, clusters)
    if err != nil {
        return nil, err
    }

    // Initialise averaged cpu utilisation for this reconcile.
    var ccup int32 = 0
    fedStatus := hpaFederatedStatus{
        aggregateCPUUtilizationPercentage: &ccup,
        count:           int32(0),
        desiredReplicas: int32(0),
        currentReplicas: int32(0),
    }
    fedHpa := obj.(*autoscalingv1.HorizontalPodAutoscaler)
    // We assign the last known scale time here, which we update with
    // the latest time from among all clusters in ScheduleObject()
    if fedHpa.Status.LastScaleTime != nil {
        t := metav1.NewTime(fedHpa.Status.LastScaleTime.Time)
        fedStatus.lastScaleTime = &t
    }

    return &hpaSchedulingInfo{
        scheduleState: getHpaScheduleState(obj, currentClusterObjs),
        fedStatus:     fedStatus,
    }, nil
}

func getCurrentClusterObjs(informer fedutil.FederatedInformer, key string, clusters []*federationapi.Cluster) (map[string]pkgruntime.Object, error) {
    currentClusterObjs := make(map[string]pkgruntime.Object)
    for _, cluster := range clusters {
        clusterName := cluster.Name
        clusterObj, found, err := informer.GetTargetStore().GetByKey(clusterName, key)
        if err != nil {
            return nil, err
        }
        currentClusterObjs[clusterName] = nil
        if found {
            currentClusterObjs[clusterName] = clusterObj.(pkgruntime.Object)
        }
    }
    return currentClusterObjs, nil
}

// The algorithm used for scheduling is briefed as below:
//
// 1. Find clusters which can offer max and min, if any (lists.availableMax and
// lists.availableMin) in one pass on all clusters.
//
// 2. Reduce the replicas (both min and max) if needed (situation when fedHpa
// has fewer replicas than all cluster local hpa replicas totalled together).
// In this step reduce first from those hpas which already have max (and min)
// reducible. Once such clusters are over and reduction is still needed, reduce
// one at a time from all clusters, randomly. This step will ensure that the
// exceeding replicas in local hpas are reduced to match the fedHpa.
// This step would ideally be a noop in most cases because it is rare that fedHpa
// would have fewer replicas than the cluster local total (probably when a user
// forces an update of fedHpa).
//
// 3. Distribute the replicas. In this step we have replicas to distribute (which
// are fed replicas exceeding the sum total of local cluster replicas). If clusters
// already have replicas, one replica from each cluster which can offer replicas
// (both for max and min) is also added to these replicas-to-distribute numbers (min
// and max).
// 3a. We first do a sub-pass to distribute to clusters which need replicas, considering
// those as clusters in crucial need of replicas.
// 3b. After the previous sub-pass, if we still have replicas remaining, in this sub-pass
// we distribute to those clusters which do not yet have any hpa.
// 3c. After the previous sub-pass, if we still have more to distribute, then we distribute to all
// clusters randomly, giving the replica distribution count (rdc=total-fed-replicas/no-of-clusters)
// to each at a time.
//
// The above algorithm is run to first distribute max and then distribute min to those clusters
// which get max.
func getHpaScheduleState(fedObj pkgruntime.Object, currentObjs map[string]pkgruntime.Object) map[string]*replicaNums {
    fedHpa := fedObj.(*autoscalingv1.HorizontalPodAutoscaler)
    requestedMin := hpaMinReplicaDefault
    if fedHpa.Spec.MinReplicas != nil {
        requestedMin = *fedHpa.Spec.MinReplicas
    }
    requestedReplicas := replicaNums{
        min: requestedMin,
        max: fedHpa.Spec.MaxReplicas,
    }
    // replica distribution count, per cluster
    rdc := replicaNums{
        min: requestedReplicas.min / int32(len(currentObjs)),
        max: requestedReplicas.max / int32(len(currentObjs)),
    }
    if rdc.min < 1 {
        rdc.min = 1
    }
    // TODO: Is there a better way?
    // We need to cap the lowest limit of Max to 2, because in a
    // situation like both min and max become 1 (same) for all clusters,
    // no rebalancing would happen.
    if rdc.max < 2 {
        rdc.max = 2
    }

    // Pass 1: Analyse existing local hpa's if any.
    // clusterLists holds the list of those clusters which can offer
    // min and max replicas, to those which want them.
    // For example new clusters joining the federation and/or
    // those clusters which need to increase or reduce replicas
    // beyond min/max limits.
    // schedStatus currently has the status of existing hpas.
    // It will eventually have the desired status for this reconcile.
    clusterLists, currentReplicas, scheduleState := prepareForScheduling(currentObjs)

    remainingReplicas := replicaNums{
        min: requestedReplicas.min - currentReplicas.min,
        max: requestedReplicas.max - currentReplicas.max,
    }

    // Pass 2: reduction of replicas if needed (situation where fedHpa updated replicas
    // to less than the existing total).
    // In this pass, we remain pessimistic and reduce one replica per cluster at a time.
    if remainingReplicas.min < 0 {
        excessMin := (remainingReplicas.min * int32(-1))
        remainingReplicas.min = reduceMinReplicas(excessMin, clusterLists.availableMin, scheduleState)
    }
    if remainingReplicas.max < 0 {
        excessMax := (remainingReplicas.max * int32(-1))
        remainingReplicas.max = reduceMaxReplicas(excessMax, clusterLists.availableMax, scheduleState)
    }

    toDistribute := replicaNums{
        min: remainingReplicas.min + int32(clusterLists.availableMin.Len()),
        max: remainingReplicas.max + int32(clusterLists.availableMax.Len()),
    }

    // Pass 3: Distribute Max and then Min.
    // Here we first distribute max and then (in the next loop)
    // distribute min into those clusters which already get the
    // max fixed.
    // In this process we might not meet the min limit and the total of
    // min limits might remain more than the requested federated min.
    // This is partially because a min per cluster cannot be less
    // than 1, but min could be requested as 1 at federation.
    // Additionally we first increase replicas into those clusters
    // which already have hpa's and are in a condition to increase.
    // This will save cluster related resources for the user, such that
    // if an already existing cluster can satisfy the user's request, why send
    // the workload to another.
    // We then go ahead and give the replicas to those which do not
    // have any hpa. In this pass however we try to ensure that all
    // our Max are consumed in this reconcile.
    distributeMaxReplicas(toDistribute.max, clusterLists, rdc, currentObjs, scheduleState)

    // We distribute min to those clusters which:
    // 1 - can adjust min (our increase step would be only 1)
    // 2 - which do not have this hpa and got max (increase step rdcMin)
    // We might exhaust all min replicas here, with
    // some clusters still needing them. We adjust this in finalise by
    // assigning min replicas to 1 into those clusters which got max
    // but min remains 0.
    distributeMinReplicas(toDistribute.min, clusterLists, rdc, currentObjs, scheduleState)

    return finaliseScheduleState(scheduleState)
}

func (a *HpaAdapter) ScheduleObject(cluster *federationapi.Cluster, clusterObj pkgruntime.Object, federationObjCopy pkgruntime.Object, schedulingInfo interface{}) (pkgruntime.Object, ScheduleAction, error) {
    // Update federated status info
    typedInfo := schedulingInfo.(*hpaSchedulingInfo)
    if clusterObj != nil {
        clusterHpa := clusterObj.(*autoscalingv1.HorizontalPodAutoscaler)
        if clusterHpa.Status.CurrentCPUUtilizationPercentage != nil {
            *typedInfo.fedStatus.aggregateCPUUtilizationPercentage +=
                (*clusterHpa.Status.CurrentCPUUtilizationPercentage * clusterHpa.Status.CurrentReplicas)
            typedInfo.fedStatus.count += clusterHpa.Status.CurrentReplicas
        }
        if clusterHpa.Status.LastScaleTime != nil {
            t := metav1.NewTime(clusterHpa.Status.LastScaleTime.Time)
            if typedInfo.fedStatus.lastScaleTime != nil &&
                t.After(typedInfo.fedStatus.lastScaleTime.Time) {
                typedInfo.fedStatus.lastScaleTime = &t
            }
        }

        typedInfo.fedStatus.currentReplicas += clusterHpa.Status.CurrentReplicas
        typedInfo.fedStatus.desiredReplicas += clusterHpa.Status.DesiredReplicas
    }

    // Update the cluster obj and the needed action on the cluster
    clusterHpaState := typedInfo.scheduleState[cluster.Name]
    desiredHpa := federationObjCopy.(*autoscalingv1.HorizontalPodAutoscaler)
    if clusterHpaState != nil {
        desiredHpa.Spec.MaxReplicas = clusterHpaState.max
        if desiredHpa.Spec.MinReplicas == nil {
            min := int32(0)
            desiredHpa.Spec.MinReplicas = &min
        }
        *desiredHpa.Spec.MinReplicas = clusterHpaState.min
    }

    var defaultAction ScheduleAction = ""
    switch {
    case clusterHpaState != nil && clusterObj != nil:
        return desiredHpa, defaultAction, nil
    case clusterHpaState != nil && clusterObj == nil:
        return desiredHpa, ActionAdd, nil
    case clusterHpaState == nil && clusterObj != nil:
        return nil, ActionDelete, nil
    }
    return nil, defaultAction, nil
}

func (a *HpaAdapter) UpdateFederatedStatus(obj pkgruntime.Object, schedulingInfo interface{}) error {
    fedHpa := obj.(*autoscalingv1.HorizontalPodAutoscaler)
    needUpdate, newFedHpaStatus := updateStatus(fedHpa, schedulingInfo.(*hpaSchedulingInfo).fedStatus)
    if needUpdate {
        fedHpa.Status = newFedHpaStatus
        _, err := a.client.AutoscalingV1().HorizontalPodAutoscalers(fedHpa.Namespace).UpdateStatus(fedHpa)
        if err != nil {
            return fmt.Errorf("Error updating hpa: %s status in federation: %v", fedHpa.Name, err)
        }
    }
    return nil
}

func updateStatus(fedHpa *autoscalingv1.HorizontalPodAutoscaler, newStatus hpaFederatedStatus) (bool, autoscalingv1.HorizontalPodAutoscalerStatus) {
    averageCPUUtilizationPercentage := int32(0)
    // Average out the available current utilisation
    if *newStatus.aggregateCPUUtilizationPercentage != 0 && newStatus.count != 0 {
        averageCPUUtilizationPercentage = *newStatus.aggregateCPUUtilizationPercentage / newStatus.count
    }
    gen := fedHpa.Generation
    newFedHpaStatus := autoscalingv1.HorizontalPodAutoscalerStatus{ObservedGeneration: &gen}
    needUpdate := false
    if (fedHpa.Status.CurrentCPUUtilizationPercentage == nil &&
        averageCPUUtilizationPercentage != 0) ||
        (fedHpa.Status.CurrentCPUUtilizationPercentage != nil &&
            averageCPUUtilizationPercentage !=
                *fedHpa.Status.CurrentCPUUtilizationPercentage) {
        needUpdate = true
        newFedHpaStatus.CurrentCPUUtilizationPercentage = &averageCPUUtilizationPercentage
    }
    if (fedHpa.Status.LastScaleTime == nil && newStatus.lastScaleTime != nil) ||
        (fedHpa.Status.LastScaleTime != nil && newStatus.lastScaleTime == nil) ||
        ((fedHpa.Status.LastScaleTime != nil && newStatus.lastScaleTime != nil) &&
            newStatus.lastScaleTime.After(fedHpa.Status.LastScaleTime.Time)) {
        needUpdate = true
        newFedHpaStatus.LastScaleTime = newStatus.lastScaleTime
    }
    if fedHpa.Status.DesiredReplicas != newStatus.desiredReplicas {
        needUpdate = true
        newFedHpaStatus.CurrentReplicas = newStatus.currentReplicas
    }
    if fedHpa.Status.CurrentReplicas != newStatus.currentReplicas {
        needUpdate = true
        newFedHpaStatus.DesiredReplicas = newStatus.desiredReplicas
    }
    return needUpdate, newFedHpaStatus
}

// prepareForScheduling prepares the lists and totals from the
// existing objs.
// currentObjs has the list of all clusters, with obj as nil
// for those clusters which do not have hpa yet.
func prepareForScheduling(currentObjs map[string]pkgruntime.Object) (hpaLists, replicaNums, map[string]*replicaNums) {
    lists := hpaLists{
        availableMax: sets.NewString(),
        availableMin: sets.NewString(),
        noHpa:        sets.NewString(),
    }
    existingTotal := replicaNums{
        min: int32(0),
        max: int32(0),
    }

    scheduleState := make(map[string]*replicaNums)
    for cluster, obj := range currentObjs {
        if obj == nil {
            lists.noHpa.Insert(cluster)
            scheduleState[cluster] = nil
            continue
        }

        if maxReplicasReducible(obj) {
            lists.availableMax.Insert(cluster)
        }
        if minReplicasReducible(obj) {
            lists.availableMin.Insert(cluster)
        }

        replicas := replicaNums{min: 0, max: 0}
        scheduleState[cluster] = &replicas
        if obj.(*autoscalingv1.HorizontalPodAutoscaler).Spec.MinReplicas != nil {
            existingTotal.min += *obj.(*autoscalingv1.HorizontalPodAutoscaler).Spec.MinReplicas
            replicas.min = *obj.(*autoscalingv1.HorizontalPodAutoscaler).Spec.MinReplicas
        }
        existingTotal.max += obj.(*autoscalingv1.HorizontalPodAutoscaler).Spec.MaxReplicas
        replicas.max = obj.(*autoscalingv1.HorizontalPodAutoscaler).Spec.MaxReplicas
    }

    return lists, existingTotal, scheduleState
}

// Note: reduceMinReplicas and reduceMaxReplicas look quite similar in flow
// and code, however there are subtle differences. They together can be made
// into 1 function with an arg governing the functionality difference and
// additional args (superset of args in both) as needed. Doing so however
// makes the logical flow quite less readable. They are thus left as 2 for
// readability.

// reduceMinReplicas reduces the min replicas from existing clusters.
// At the end of the function excessMin should be 0 and the MinList
// and the scheduledReplicas properly updated in place.
func reduceMinReplicas(excessMin int32, availableMinList sets.String, scheduled map[string]*replicaNums) int32 {
    if excessMin > 0 {
        // first we try reducing from those clusters which already offer min
        if availableMinList.Len() > 0 {
            for _, cluster := range availableMinList.List() {
                replicas := scheduled[cluster]
                if replicas.min > 1 {
                    replicas.min--
                    availableMinList.Delete(cluster)
                    excessMin--
                    if excessMin <= 0 {
                        break
                    }
                }
            }
        }
    }

    // If we could not get the needed replicas from the already offered min above,
    // we abruptly start removing replicas from some/all clusters.
    // Here we might make some min 0, signalling that this hpa might be a
    // candidate to be removed from this cluster altogether.
    for excessMin > 0 {
        for _, replicas := range scheduled {
            if replicas != nil &&
                replicas.min > 0 {
                replicas.min--
                excessMin--
                if excessMin <= 0 {
                    break
                }
            }
        }
    }

    return excessMin
}

// reduceMaxReplicas reduces the max replicas from existing clusters.
// At the end of the function excessMax should be 0 and the MaxList
// and the scheduledReplicas properly updated in place.
func reduceMaxReplicas(excessMax int32, availableMaxList sets.String, scheduled map[string]*replicaNums) int32 {
    if excessMax > 0 {
        // first we try reducing from those clusters which already offer max
        if availableMaxList.Len() > 0 {
            for _, cluster := range availableMaxList.List() {
                replicas := scheduled[cluster]
                if replicas != nil && !((replicas.max - replicas.min) < 0) {
                    replicas.max--
                    availableMaxList.Delete(cluster)
                    excessMax--
                    if excessMax <= 0 {
                        break
                    }
                }
            }
        }
    }
    // If we could not get the needed replicas to reduce from the already offered
    // max above, we abruptly start removing replicas from some/all clusters.
    // Here we might make some max and min 0, signalling that this hpa be
    // removed from this cluster altogether.
    for excessMax > 0 {
        for _, replicas := range scheduled {
            if replicas != nil &&
                !((replicas.max - replicas.min) < 0) {
                replicas.max--
                excessMax--
                if excessMax <= 0 {
                    break
                }
            }
        }
    }

    return excessMax
}

// distributeMaxReplicas
// Takes input:
// toDistributeMax: number of replicas to distribute.
// lists: cluster name lists, which have clusters with available max,
// available min and those with no hpas yet.
// rdc: replica distribution count for max and min.
// currentObjs: list of current cluster hpas.
// scheduled: schedule state which will be updated in place.
func distributeMaxReplicas(toDistributeMax int32, lists hpaLists, rdc replicaNums,
    currentObjs map[string]pkgruntime.Object, scheduled map[string]*replicaNums) int32 {
    for cluster, replicas := range scheduled {
        if toDistributeMax == 0 {
            break
        }
        if replicas == nil {
            continue
        }
        if maxReplicasNeeded(currentObjs[cluster]) {
            replicas.max++
            if lists.availableMax.Len() > 0 {
                popped, notEmpty := lists.availableMax.PopAny()
                if notEmpty {
                    // Boundary checks have happened earlier in
                    // minReplicasReducible().
                    scheduled[popped].max--
                }
            }
            // In any case, utilise the available max replicas
            toDistributeMax--
        }
    }

    // If we have new clusters where we can give our replicas,
    // then give away all our replicas to the new clusters first.
    if lists.noHpa.Len() > 0 {
        for toDistributeMax > 0 {
            for _, cluster := range lists.noHpa.UnsortedList() {
                if scheduled[cluster] == nil {
                    scheduled[cluster] = &replicaNums{min: 0, max: 0}
                }
                replicas := scheduled[cluster]
                // first give away max from clusters offering them
                // this case especially helps getting hpa into newly joining
                // clusters.
                if lists.availableMax.Len() > 0 {
                    popped, notEmpty := lists.availableMax.PopAny()
                    if notEmpty {
                        // Boundary checks to reduce max have happened earlier in
                        // minReplicasReducible().
                        replicas.max++
                        scheduled[popped].max--
                        toDistributeMax--
                        continue
                    }
                }
                if toDistributeMax < rdc.max {
                    replicas.max += toDistributeMax
                    toDistributeMax = 0
                    break
                }
                replicas.max += rdc.max
                toDistributeMax -= rdc.max
            }
        }
    } else { // we have no new clusters but still have max replicas to distribute;
        // just distribute all in current clusters.
        for toDistributeMax > 0 {
            for cluster, replicas := range scheduled {
                if replicas == nil {
                    replicas = &replicaNums{min: 0, max: 0}
                    scheduled[cluster] = replicas
                }
                // First give away max from clusters offering them.
                // This case especially helps getting hpa into newly joining
                // clusters.
                if lists.availableMax.Len() > 0 {
                    popped, notEmpty := lists.availableMax.PopAny()
                    if notEmpty {
                        // Boundary checks have happened earlier in
                        // minReplicasReducible().
                        replicas.max++
                        scheduled[popped].max--
                        toDistributeMax--
                        continue
                    }
                }
                if toDistributeMax < rdc.max {
                    replicas.max += toDistributeMax
                    toDistributeMax = 0
                    break
                }
                replicas.max += rdc.max
                toDistributeMax -= rdc.max
            }
        }
    }
    return toDistributeMax
}

// distributeMinReplicas
// Takes input:
// toDistributeMin: number of replicas to distribute.
// lists: cluster name lists, which have clusters with available max,
// available min and those with no hpas yet.
// rdc: replica distribution count for max and min.
// currentObjs: list of current cluster hpas.
// scheduled: schedule state which will be updated in place.
func distributeMinReplicas(toDistributeMin int32, lists hpaLists, rdc replicaNums,
    currentObjs map[string]pkgruntime.Object, scheduled map[string]*replicaNums) int32 {
    for cluster, replicas := range scheduled {
        if toDistributeMin == 0 {
            break
        }
        // We have distributed max and thus scheduled might not be nil,
        // but probably current (what we got originally) is nil (no hpa)
        if replicas == nil || currentObjs[cluster] == nil {
            continue
        }
        if minReplicasIncreasable(currentObjs[cluster]) {
            if lists.availableMin.Len() > 0 {
                popped, notEmpty := lists.availableMin.PopAny()
                if notEmpty {
                    // Boundary checks have happened earlier.
                    scheduled[popped].min--
                    replicas.min++
                    toDistributeMin--
                }
            }
        }
    }

    if lists.noHpa.Len() > 0 {
        // TODO: can this become an infinite loop?
        for toDistributeMin > 0 {
            for _, cluster := range lists.noHpa.UnsortedList() {
                replicas := scheduled[cluster]
                if replicas == nil {
                    // We did not get max here so this cluster
                    // remains without hpa
                    continue
                }
                var replicaNum int32 = 0
                if toDistributeMin < rdc.min {
                    replicaNum = toDistributeMin
                } else {
                    replicaNum = rdc.min
                }
                if (replicas.max - replicaNum) < replicas.min {
                    // Cannot increase the min in this cluster
                    // as it will go beyond max
                    continue
                }
                if lists.availableMin.Len() > 0 {
                    popped, notEmpty := lists.availableMin.PopAny()
                    if notEmpty {
                        // Boundary checks have happened earlier.
                        scheduled[popped].min--
                        replicas.min++
                        toDistributeMin--
                        continue
                    }
                }
                replicas.min += replicaNum
                toDistributeMin -= replicaNum
            }
        }
    } else { // we have no new clusters but still have min replicas to distribute;
        // just distribute all in current clusters.
        for toDistributeMin > 0 {
            for _, replicas := range scheduled {
                if replicas == nil {
                    // We did not get max here so this cluster
                    // remains without hpa
                    continue
                }
                var replicaNum int32 = 0
                if toDistributeMin < rdc.min {
                    replicaNum = toDistributeMin
                } else {
                    replicaNum = rdc.min
                }
                if (replicas.max - replicaNum) < replicas.min {
                    // Cannot increase the min in this cluster
                    // as it will go beyond max
                    continue
                }
                if lists.availableMin.Len() > 0 {
                    popped, notEmpty := lists.availableMin.PopAny()
                    if notEmpty {
                        // Boundary checks have happened earlier.
                        scheduled[popped].min--
                        replicas.min++
                        toDistributeMin--
                        continue
                    }
                }
                replicas.min += replicaNum
                toDistributeMin -= replicaNum
            }
        }
    }
    return toDistributeMin
}

// finaliseScheduleState ensures that the minReplica count is made to 1
// for those clusters which got max, but did not get min. This is because
// k8s hpa does not accept hpas with 0 min replicas.
// The replica num distribution can thus have more mins than fedHpa requested,
// but that is better than having all replicas go into one cluster (given that
// fedHpa requesting min=1 is the most usual case).
func finaliseScheduleState(scheduled map[string]*replicaNums) map[string]*replicaNums {
    for _, replicas := range scheduled {
        if (replicas != nil) && (replicas.min <= 0) && (replicas.max > 0) {
            // Min total does not necessarily meet the federated min limit.
            replicas.min = 1
        }
    }
    return scheduled
}

// isPristine is used to determine if so far the local controller has been
// able to really determine what should be the desired replica number for
// this cluster.
// This is used to get hpas into those clusters which might join fresh,
// and so far other cluster hpas haven't really reached anywhere.
// TODO: There is a flaw here, that a just born object would also offer its
// replicas, which can also lead to fast thrashing.
// The only better way is to either ensure that the object creation time stamp is set
// and can be used authoritatively; or have another field on the local object
// which is mandatorily set on creation and can be used authoritatively.
// Should we abuse annotations again for this, or can this be a proper requirement?
func isPristine(hpa *autoscalingv1.HorizontalPodAutoscaler) bool {
    if hpa.Status.LastScaleTime == nil &&
        hpa.Status.DesiredReplicas == 0 {
        return true
    }
    return false
}

// isScaleable tells if it already has been a reasonable amount of
// time since this hpa scaled. It is used to avoid fast thrashing.
func isScaleable(hpa *autoscalingv1.HorizontalPodAutoscaler) bool {
    if hpa.Status.LastScaleTime == nil {
        return false
    }
    t := hpa.Status.LastScaleTime.Add(scaleForbiddenWindow)
    if t.After(time.Now()) {
        return false
    }
    return true
}

func maxReplicasReducible(obj pkgruntime.Object) bool {
    hpa := obj.(*autoscalingv1.HorizontalPodAutoscaler)
    if (hpa.Spec.MinReplicas != nil) &&
        (((hpa.Spec.MaxReplicas - 1) - *hpa.Spec.MinReplicas) < 0) {
        return false
    }
    if isPristine(hpa) {
        return true
    }
    if !isScaleable(hpa) {
        return false
    }
    if (hpa.Status.DesiredReplicas < hpa.Status.CurrentReplicas) ||
        ((hpa.Status.DesiredReplicas == hpa.Status.CurrentReplicas) &&
            (hpa.Status.DesiredReplicas < hpa.Spec.MaxReplicas)) {
        return true
    }
    return false
}

// minReplicasReducible checks if this cluster (hpa) can offer replicas which are
// stuck here because of the min limit.
// It is noteworthy that min and max are adjusted separately, but if the replicas
// are not being used here, the max adjustment will lead it to become equal to min,
// but it will not be able to scale down further and offer max to some other cluster
// which needs replicas.
func minReplicasReducible(obj pkgruntime.Object) bool {
    hpa := obj.(*autoscalingv1.HorizontalPodAutoscaler)
    if isPristine(hpa) && (hpa.Spec.MinReplicas != nil) &&
        (*hpa.Spec.MinReplicas > 1) &&
        (*hpa.Spec.MinReplicas <= hpa.Spec.MaxReplicas) {
        return true
    }
    if !isScaleable(hpa) {
        return false
    }
    if (hpa.Spec.MinReplicas != nil) &&
        (*hpa.Spec.MinReplicas > 1) &&
        (hpa.Status.DesiredReplicas == hpa.Status.CurrentReplicas) &&
        (hpa.Status.CurrentReplicas == *hpa.Spec.MinReplicas) {
        return true
    }
    return false
}

func maxReplicasNeeded(obj pkgruntime.Object) bool {
    hpa := obj.(*autoscalingv1.HorizontalPodAutoscaler)
    if !isScaleable(hpa) {
        return false
    }

    if (hpa.Status.CurrentReplicas == hpa.Status.DesiredReplicas) &&
        (hpa.Status.CurrentReplicas == hpa.Spec.MaxReplicas) {
        return true
    }
    return false
}

func minReplicasIncreasable(obj pkgruntime.Object) bool {
    hpa := obj.(*autoscalingv1.HorizontalPodAutoscaler)
    if !isScaleable(hpa) ||
        ((hpa.Spec.MinReplicas != nil) &&
            (*hpa.Spec.MinReplicas) >= hpa.Spec.MaxReplicas) {
        return false
    }

    if (hpa.Spec.MinReplicas != nil) &&
        (hpa.Status.DesiredReplicas > *hpa.Spec.MinReplicas) {
        return true
    }
    return false
}
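Editor's note: a quick numeric sketch of the per-cluster replica-distribution count (rdc) arithmetic used by getHpaScheduleState above, with made-up figures (a federated HPA of min=1, max=10 spread over 4 clusters). This is an illustration only, not part of the commit:

package main

import "fmt"

// rdc mirrors the "replica distribution count" arithmetic from
// getHpaScheduleState: integer division of the federated min/max by the
// number of clusters, with min clamped to at least 1 and max to at least 2.
func rdc(fedMin, fedMax, clusters int32) (int32, int32) {
    min := fedMin / clusters
    max := fedMax / clusters
    if min < 1 {
        min = 1
    }
    if max < 2 {
        max = 2
    }
    return min, max
}

func main() {
    // Federated HPA with min=1, max=10 over 4 clusters: the per-cluster
    // quota becomes min=1 (clamped up from 0) and max=2 (10/4 truncated).
    min, max := rdc(1, 10, 4)
    fmt.Printf("per-cluster rdc: min=%d max=%d\n", min, max)
}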
federation/pkg/federatedtypes/hpa_test.go (new file, 262 lines)
@@ -0,0 +1,262 @@
/*
Copyright 2017 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package federatedtypes

import (
    "testing"
    "time"

    autoscalingv1 "k8s.io/api/autoscaling/v1"
    apiv1 "k8s.io/api/core/v1"
    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    pkgruntime "k8s.io/apimachinery/pkg/runtime"
    . "k8s.io/kubernetes/federation/pkg/federation-controller/util/test"

    "github.com/stretchr/testify/assert"
)

type replicas struct {
    min int32
    max int32
}

func TestGetHpaScheduleState(t *testing.T) {
    defaultFedHpa := newHpaWithReplicas(NewInt32(1), NewInt32(70), 10)
    testCases := map[string]struct {
        fedHpa           *autoscalingv1.HorizontalPodAutoscaler
        localHpas        map[string]pkgruntime.Object
        expectedReplicas map[string]*replicas
    }{
        "Distribiutes replicas randomly if no existing hpa in any local cluster": {
            localHpas: func() map[string]pkgruntime.Object {
                hpas := make(map[string]pkgruntime.Object)
                hpas["c1"] = nil
                hpas["c2"] = nil
                return hpas
            }(),
        },
        "Cluster with no hpa gets replicas if other clusters have replicas": {
            localHpas: func() map[string]pkgruntime.Object {
                hpas := make(map[string]pkgruntime.Object)
                hpas["c1"] = newHpaWithReplicas(NewInt32(1), NewInt32(70), 10)
                hpas["c2"] = nil
                return hpas
            }(),
            expectedReplicas: map[string]*replicas{
                "c1": {
                    min: int32(1),
                    max: int32(9),
                },
                "c2": {
                    min: int32(1),
                    max: int32(1),
                },
            },
        },
        "Cluster needing max replicas gets it if there is another cluster to offer max": {
            localHpas: func() map[string]pkgruntime.Object {
                hpa1 := newHpaWithReplicas(NewInt32(1), NewInt32(70), 7)
                hpa1 = updateHpaStatus(hpa1, NewInt32(50), 5, 5, true)
                hpa2 := newHpaWithReplicas(NewInt32(1), NewInt32(70), 1)
                hpa2 = updateHpaStatus(hpa2, NewInt32(70), 1, 1, true)
                // include a third object to ensure it does not break the test
                hpa3 := newHpaWithReplicas(NewInt32(1), NewInt32(70), 2)
                hpa3 = updateHpaStatus(hpa3, NewInt32(70), 1, 1, false)
                hpas := make(map[string]pkgruntime.Object)
                hpas["c1"] = hpa1
                hpas["c2"] = hpa2
                hpas["c3"] = hpa3
                return hpas
            }(),
            expectedReplicas: map[string]*replicas{
                "c1": {
                    min: int32(1),
                    max: int32(6),
                },
                "c2": {
                    min: int32(1),
                    max: int32(2),
                },
                "c3": {
                    min: int32(1),
                    max: int32(2),
                },
            },
        },
        "Cluster needing max replicas does not get it if there is no cluster offerring max": {
            localHpas: func() map[string]pkgruntime.Object {
                hpa1 := newHpaWithReplicas(NewInt32(1), NewInt32(70), 9)
                hpa1 = updateHpaStatus(hpa1, NewInt32(70), 9, 9, false)
                hpa2 := newHpaWithReplicas(NewInt32(1), NewInt32(70), 1)
                hpa2 = updateHpaStatus(hpa2, NewInt32(70), 1, 1, true)
                hpas := make(map[string]pkgruntime.Object)
                hpas["c1"] = hpa1
                hpas["c2"] = hpa2
                return hpas
            }(),
            expectedReplicas: map[string]*replicas{
                "c1": {
                    min: int32(1),
                    max: int32(9),
                },
                "c2": {
                    min: int32(1),
                    max: int32(1),
                },
            },
        },
"Cluster which can increase min replicas gets to increase min if there is a cluster offering min": {
|
||||
fedHpa: newHpaWithReplicas(NewInt32(4), NewInt32(70), 10),
|
||||
localHpas: func() map[string]pkgruntime.Object {
|
||||
hpa1 := newHpaWithReplicas(NewInt32(3), NewInt32(70), 6)
|
||||
hpa1 = updateHpaStatus(hpa1, NewInt32(50), 3, 3, true)
|
||||
hpa2 := newHpaWithReplicas(NewInt32(1), NewInt32(70), 4)
|
||||
hpa2 = updateHpaStatus(hpa2, NewInt32(50), 3, 3, true)
|
||||
hpas := make(map[string]pkgruntime.Object)
|
||||
hpas["c1"] = hpa1
|
||||
hpas["c2"] = hpa2
|
||||
return hpas
|
||||
}(),
|
||||
expectedReplicas: map[string]*replicas{
|
||||
"c1": {
|
||||
min: int32(2),
|
||||
max: int32(6),
|
||||
},
|
||||
"c2": {
|
||||
min: int32(2),
|
||||
max: int32(4),
|
||||
},
|
||||
},
|
||||
},
|
||||
"Cluster which can increase min replicas does not increase if there are no clusters offering min": {
|
||||
fedHpa: newHpaWithReplicas(NewInt32(4), NewInt32(70), 10),
|
||||
localHpas: func() map[string]pkgruntime.Object {
|
||||
hpa1 := newHpaWithReplicas(NewInt32(3), NewInt32(70), 6)
|
||||
hpa1 = updateHpaStatus(hpa1, NewInt32(50), 4, 4, true)
|
||||
hpa2 := newHpaWithReplicas(NewInt32(1), NewInt32(70), 4)
|
||||
hpa2 = updateHpaStatus(hpa2, NewInt32(50), 3, 3, true)
|
||||
hpas := make(map[string]pkgruntime.Object)
|
||||
hpas["c1"] = hpa1
|
||||
hpas["c2"] = hpa2
|
||||
return hpas
|
||||
}(),
|
||||
expectedReplicas: map[string]*replicas{
|
||||
"c1": {
|
||||
min: int32(3),
|
||||
max: int32(6),
|
||||
},
|
||||
"c2": {
|
||||
min: int32(1),
|
||||
max: int32(4),
|
||||
},
|
||||
},
|
||||
},
|
||||
"Increasing replicas on fed object increases the same on clusters": {
|
||||
// Existing total of local min, max = 1+1, 5+5 decreasing to below
|
||||
fedHpa: newHpaWithReplicas(NewInt32(4), NewInt32(70), 14),
|
||||
localHpas: func() map[string]pkgruntime.Object {
|
||||
// does not matter if scaleability is true
|
||||
hpas := make(map[string]pkgruntime.Object)
|
||||
hpas["c1"] = newHpaWithReplicas(NewInt32(1), NewInt32(70), 5)
|
||||
hpas["c2"] = newHpaWithReplicas(NewInt32(1), NewInt32(70), 5)
|
||||
return hpas
|
||||
}(),
|
||||
// We dont know which cluster gets how many, but the resultant total should match
|
||||
},
|
||||
"Decreasing replicas on fed object decreases the same on clusters": {
|
||||
// Existing total of local min, max = 2+2, 8+8 decreasing to below
|
||||
fedHpa: newHpaWithReplicas(NewInt32(3), NewInt32(70), 8),
|
||||
localHpas: func() map[string]pkgruntime.Object {
|
||||
// does not matter if scaleability is true
|
||||
hpas := make(map[string]pkgruntime.Object)
|
||||
hpas["c1"] = newHpaWithReplicas(NewInt32(2), NewInt32(70), 8)
|
||||
hpas["c2"] = newHpaWithReplicas(NewInt32(2), NewInt32(70), 8)
|
||||
return hpas
|
||||
}(),
|
||||
// We dont know which cluster gets how many, but the resultant total should match
|
||||
},
|
||||
}
|
||||
|
||||
for testName, testCase := range testCases {
|
||||
t.Run(testName, func(t *testing.T) {
|
||||
if testCase.fedHpa == nil {
|
||||
testCase.fedHpa = defaultFedHpa
|
||||
}
|
||||
scheduledState := getHpaScheduleState(testCase.fedHpa, testCase.localHpas)
|
||||
checkClusterConditions(t, testCase.fedHpa, scheduledState)
|
||||
if testCase.expectedReplicas != nil {
|
||||
for cluster, replicas := range testCase.expectedReplicas {
|
||||
scheduledReplicas := scheduledState[cluster]
|
||||
assert.Equal(t, replicas.min, scheduledReplicas.min)
|
||||
assert.Equal(t, replicas.max, scheduledReplicas.max)
|
||||
}
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func updateHpaStatus(hpa *autoscalingv1.HorizontalPodAutoscaler, currentUtilisation *int32, current, desired int32, scaleable bool) *autoscalingv1.HorizontalPodAutoscaler {
|
||||
hpa.Status.CurrentReplicas = current
|
||||
hpa.Status.DesiredReplicas = desired
|
||||
hpa.Status.CurrentCPUUtilizationPercentage = currentUtilisation
|
||||
now := metav1.Now()
|
||||
scaledTime := now
|
||||
if scaleable {
|
||||
// definitely more then 5 minutes ago
|
||||
scaledTime = metav1.NewTime(now.Time.Add(-6 * time.Minute))
|
||||
}
|
||||
hpa.Status.LastScaleTime = &scaledTime
|
||||
return hpa
|
||||
}
|
||||
|
||||
func checkClusterConditions(t *testing.T, fedHpa *autoscalingv1.HorizontalPodAutoscaler, scheduled map[string]*replicaNums) {
|
||||
minTotal := int32(0)
|
||||
maxTotal := int32(0)
|
||||
for _, replicas := range scheduled {
|
||||
minTotal += replicas.min
|
||||
maxTotal += replicas.max
|
||||
}
|
||||
|
||||
// - Total of max matches the fed max
|
||||
assert.Equal(t, fedHpa.Spec.MaxReplicas, maxTotal)
|
||||
// - Total of min is not less then fed min
|
||||
assert.Condition(t, func() bool {
|
||||
if *fedHpa.Spec.MinReplicas <= minTotal {
|
||||
return true
|
||||
}
|
||||
return false
|
||||
})
|
||||
}
|
||||
|
||||
func newHpaWithReplicas(min, targetUtilisation *int32, max int32) *autoscalingv1.HorizontalPodAutoscaler {
|
||||
return &autoscalingv1.HorizontalPodAutoscaler{
|
||||
ObjectMeta: metav1.ObjectMeta{
|
||||
Name: "myhpa",
|
||||
Namespace: apiv1.NamespaceDefault,
|
||||
SelfLink: "/api/mylink",
|
||||
},
|
||||
Spec: autoscalingv1.HorizontalPodAutoscalerSpec{
|
||||
ScaleTargetRef: autoscalingv1.CrossVersionObjectReference{
|
||||
Kind: "HorizontalPodAutoscaler",
|
||||
Name: "target-",
|
||||
},
|
||||
MinReplicas: min,
|
||||
MaxReplicas: max,
|
||||
TargetCPUUtilizationPercentage: targetUtilisation,
|
||||
},
|
||||
}
|
||||
}
|
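Editor's note: the scaleable flag in updateHpaStatus above works by back-dating LastScaleTime past the five-minute scaleForbiddenWindow. A self-contained sketch of that check (the window constant is copied here purely for illustration):

package main

import (
    "fmt"
    "time"
)

const scaleForbiddenWindow = 5 * time.Minute

// scaleable reports whether enough time has passed since the last scale,
// mirroring the isScaleable check exercised by the tests.
func scaleable(lastScale time.Time) bool {
    return !lastScale.Add(scaleForbiddenWindow).After(time.Now())
}

func main() {
    fmt.Println(scaleable(time.Now()))                       // false: scaled just now
    fmt.Println(scaleable(time.Now().Add(-6 * time.Minute))) // true: more than 5 minutes ago
}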
@@ -40,16 +40,16 @@ func init() {
}

type ReplicaSetAdapter struct {
-    *schedulingAdapter
+    *replicaSchedulingAdapter
    client federationclientset.Interface
}

func NewReplicaSetAdapter(client federationclientset.Interface, config *restclient.Config) FederatedTypeAdapter {
-    schedulingAdapter := schedulingAdapter{
+    replicaSchedulingAdapter := replicaSchedulingAdapter{
        preferencesAnnotationName: FedReplicaSetPreferencesAnnotation,
-        updateStatusFunc: func(obj pkgruntime.Object, status interface{}) error {
+        updateStatusFunc: func(obj pkgruntime.Object, schedulingInfo interface{}) error {
            rs := obj.(*extensionsv1.ReplicaSet)
-            typedStatus := status.(ReplicaSchedulingStatus)
+            typedStatus := schedulingInfo.(*ReplicaSchedulingInfo).Status
            if typedStatus.Replicas != rs.Status.Replicas || typedStatus.FullyLabeledReplicas != rs.Status.FullyLabeledReplicas ||
                typedStatus.ReadyReplicas != rs.Status.ReadyReplicas || typedStatus.AvailableReplicas != rs.Status.AvailableReplicas {
                rs.Status = extensionsv1.ReplicaSetStatus{

@@ -64,7 +64,7 @@ func NewReplicaSetAdapter(client federationclientset.Interface, config *restclie
            return nil
        },
    }
-    return &ReplicaSetAdapter{&schedulingAdapter, client}
+    return &ReplicaSetAdapter{&replicaSchedulingAdapter, client}
}

func (a *ReplicaSetAdapter) Kind() string {
@@ -37,6 +37,16 @@ import (
    "github.com/golang/glog"
)

+// ScheduleAction is returned from ScheduleObject of a SchedulingAdapter to
+// convey to the sync controller's reconcile the action needed for the
+// particular cluster-local object.
+type ScheduleAction string
+
+const (
+    ActionAdd    = "add"
+    ActionDelete = "delete"
+)
+
// ReplicaSchedulingStatus contains the status of the replica type objects (rs or deployment)
// that are being scheduled into joined clusters.
type ReplicaSchedulingStatus struct {

@@ -58,26 +68,26 @@ type ReplicaSchedulingInfo struct {
// federated type that requires more complex synchronization logic.
type SchedulingAdapter interface {
    GetSchedule(obj pkgruntime.Object, key string, clusters []*federationapi.Cluster, informer fedutil.FederatedInformer) (interface{}, error)
-    ScheduleObject(cluster *federationapi.Cluster, clusterObj pkgruntime.Object, federationObjCopy pkgruntime.Object, schedulingInfo interface{}) (pkgruntime.Object, bool, error)
-    UpdateFederatedStatus(obj pkgruntime.Object, status interface{}) error
+    ScheduleObject(cluster *federationapi.Cluster, clusterObj pkgruntime.Object, federationObjCopy pkgruntime.Object, schedulingInfo interface{}) (pkgruntime.Object, ScheduleAction, error)
+    UpdateFederatedStatus(obj pkgruntime.Object, schedulingInfo interface{}) error

    // EquivalentIgnoringSchedule returns whether obj1 and obj2 are
    // equivalent ignoring differences due to scheduling.
    EquivalentIgnoringSchedule(obj1, obj2 pkgruntime.Object) bool
}

-// schedulingAdapter is meant to be embedded in other type adapters that require
-// workload scheduling.
-type schedulingAdapter struct {
+// replicaSchedulingAdapter is meant to be embedded in other type adapters that require
+// workload scheduling with actual pod replicas.
+type replicaSchedulingAdapter struct {
    preferencesAnnotationName string
    updateStatusFunc          func(pkgruntime.Object, interface{}) error
}

-func (a *schedulingAdapter) IsSchedulingAdapter() bool {
+func (a *replicaSchedulingAdapter) IsSchedulingAdapter() bool {
    return true
}

-func (a *schedulingAdapter) GetSchedule(obj pkgruntime.Object, key string, clusters []*federationapi.Cluster, informer fedutil.FederatedInformer) (interface{}, error) {
+func (a *replicaSchedulingAdapter) GetSchedule(obj pkgruntime.Object, key string, clusters []*federationapi.Cluster, informer fedutil.FederatedInformer) (interface{}, error) {
    var clusterNames []string
    for _, cluster := range clusters {
        clusterNames = append(clusterNames, cluster.Name)

@@ -128,7 +138,7 @@ func (a *schedulingAdapter) GetSchedule(obj pkgruntime.Object, key string, clust
    }, nil
}

-func (a *schedulingAdapter) ScheduleObject(cluster *federationapi.Cluster, clusterObj pkgruntime.Object, federationObjCopy pkgruntime.Object, schedulingInfo interface{}) (pkgruntime.Object, bool, error) {
+func (a *replicaSchedulingAdapter) ScheduleObject(cluster *federationapi.Cluster, clusterObj pkgruntime.Object, federationObjCopy pkgruntime.Object, schedulingInfo interface{}) (pkgruntime.Object, ScheduleAction, error) {
    typedSchedulingInfo := schedulingInfo.(*ReplicaSchedulingInfo)
    replicas, ok := typedSchedulingInfo.Schedule[cluster.Name]
    if !ok {

@@ -152,11 +162,15 @@ func (a *schedulingAdapter) ScheduleObject(cluster *federationapi.Cluster, clust
            }
        }
    }
-    return federationObjCopy, replicas > 0, nil
+    var action ScheduleAction = ""
+    if replicas > 0 {
+        action = ActionAdd
+    }
+    return federationObjCopy, action, nil
}

-func (a *schedulingAdapter) UpdateFederatedStatus(obj pkgruntime.Object, status interface{}) error {
-    return a.updateStatusFunc(obj, status)
+func (a *replicaSchedulingAdapter) UpdateFederatedStatus(obj pkgruntime.Object, schedulingInfo interface{}) error {
+    return a.updateStatusFunc(obj, schedulingInfo)
}

func schedule(planner *planner.Planner, obj pkgruntime.Object, key string, clusterNames []string, currentReplicasPerCluster map[string]int64, estimatedCapacity map[string]int64) map[string]int64 {
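Editor's note: a minimal sketch of how an adapter might map a scheduled replica count to the new ScheduleAction values, using simplified local types rather than the real federation interfaces:

package main

import "fmt"

// ScheduleAction mirrors the new tri-state result: "", "add" or "delete".
type ScheduleAction string

const (
    ActionAdd    ScheduleAction = "add"
    ActionDelete ScheduleAction = "delete"
)

// actionForReplicas reproduces the shape of the replicaSchedulingAdapter
// change: a positive replica count means the object should exist in the
// cluster, zero leaves the default (no-op) action.
func actionForReplicas(replicas int64) ScheduleAction {
    if replicas > 0 {
        return ActionAdd
    }
    return ""
}

func main() {
    fmt.Println(actionForReplicas(3)) // add
    fmt.Println(actionForReplicas(0)) // empty: leave as is
}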
@@ -490,8 +490,7 @@ func syncToClusters(clustersAccessor clustersAccessorFunc, operationsAccessor op
        if !ok {
            glog.Fatalf("Adapter for kind %q does not properly implement SchedulingAdapter.", kind)
        }
-        typedScheduleInfo := schedulingInfo.(*federatedtypes.ReplicaSchedulingInfo)
-        err = schedulingAdapter.UpdateFederatedStatus(obj, typedScheduleInfo.Status)
+        err = schedulingAdapter.UpdateFederatedStatus(obj, schedulingInfo)
        if err != nil {
            runtime.HandleError(fmt.Errorf("adapter.UpdateFinished() failed on adapter for %s %q: %v", kind, key, err))
            return statusError

@@ -548,7 +547,7 @@ func clusterOperations(adapter federatedtypes.FederatedTypeAdapter, selectedClus
        return nil, wrappedErr
    }

-    shouldCreateIfNeeded := true
+    var scheduleAction federatedtypes.ScheduleAction = federatedtypes.ActionAdd
    if adapter.IsSchedulingAdapter() {
        schedulingAdapter, ok := adapter.(federatedtypes.SchedulingAdapter)
        if !ok {

@@ -559,7 +558,7 @@ func clusterOperations(adapter federatedtypes.FederatedTypeAdapter, selectedClus
        if clusterObj != nil {
            clusterTypedObj = clusterObj.(pkgruntime.Object)
        }
-        desiredObj, shouldCreateIfNeeded, err = schedulingAdapter.ScheduleObject(cluster, clusterTypedObj, desiredObj, schedulingInfo)
+        desiredObj, scheduleAction, err = schedulingAdapter.ScheduleObject(cluster, clusterTypedObj, desiredObj, schedulingInfo)
        if err != nil {
            runtime.HandleError(err)
            return nil, err

@@ -568,11 +567,15 @@ func clusterOperations(adapter federatedtypes.FederatedTypeAdapter, selectedClus

    var operationType util.FederatedOperationType = ""
    if found {
-        clusterObj := clusterObj.(pkgruntime.Object)
-        if !adapter.Equivalent(desiredObj, clusterObj) {
-            operationType = util.OperationTypeUpdate
+        if scheduleAction == federatedtypes.ActionDelete {
+            operationType = util.OperationTypeDelete
+        } else {
+            clusterObj := clusterObj.(pkgruntime.Object)
+            if !adapter.Equivalent(desiredObj, clusterObj) {
+                operationType = util.OperationTypeUpdate
+            }
        }
-    } else if shouldCreateIfNeeded {
+    } else if scheduleAction == federatedtypes.ActionAdd {
        operationType = util.OperationTypeAdd
    }
@@ -446,3 +446,9 @@ func AssertHasFinalizer(t *testing.T, obj runtime.Object, finalizer string) {
    require.Nil(t, err)
    assert.True(t, hasFinalizer)
}
+
+func NewInt32(val int32) *int32 {
+    p := new(int32)
+    *p = val
+    return p
+}
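Editor's note: the revised clusterOperations branch above boils down to a small decision table over (object found in cluster, schedule action). A standalone sketch of that mapping, with the federation utility types replaced by plain strings:

package main

import "fmt"

// operationType condenses the updated clusterOperations logic:
//   found + ActionDelete        -> delete
//   found + desired != current  -> update
//   not found + ActionAdd       -> add
//   otherwise                   -> no operation
func operationType(found, equivalent bool, action string) string {
    if found {
        if action == "delete" {
            return "delete"
        }
        if !equivalent {
            return "update"
        }
        return ""
    }
    if action == "add" {
        return "add"
    }
    return ""
}

func main() {
    fmt.Println(operationType(true, true, "delete")) // delete
    fmt.Println(operationType(true, false, "add"))   // update
    fmt.Println(operationType(false, true, "add"))   // add
}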