Merge pull request #45993 from irfanurrehman/fed-hpa

Automatic merge from submit-queue (batch tested with PRs 45993, 50293)

[Federation] HPA controller

This PR implements the design listed in https://github.com/kubernetes/community/pull/593.
This is still a work in progress and needs more unit tests. I will add the integration and e2e tests in separate PRs.

@kubernetes/sig-federation-pr-reviews 

**Release note**:

```
Horizontal Pod Autoscaling is now available as an alpha feature in federation.
It can be used to distribute and scale workloads across clusters joined in a federation.
In its current form it works only on CPU utilization; support for other metrics has not yet been built in.
```
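
As a rough usage sketch (the kubeconfig path and the scale target below are assumptions, not part of this PR), an HPA created against the federation API server is picked up by the new controller, which partitions its min/max replicas across the joined clusters:

```go
// Hypothetical usage sketch: the kubeconfig path and the scale target are assumptions.
package main

import (
	"fmt"

	autoscalingv1 "k8s.io/api/autoscaling/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/tools/clientcmd"
	fedclientset "k8s.io/kubernetes/federation/client/clientset_generated/federation_clientset"
)

func main() {
	// Build a client against the federation API server (path is hypothetical).
	config, err := clientcmd.BuildConfigFromFlags("", "/path/to/federation.kubeconfig")
	if err != nil {
		panic(err)
	}
	client, err := fedclientset.NewForConfig(config)
	if err != nil {
		panic(err)
	}

	min := int32(4)
	targetCPU := int32(70)
	hpa := &autoscalingv1.HorizontalPodAutoscaler{
		ObjectMeta: metav1.ObjectMeta{Name: "my-hpa", Namespace: "default"},
		Spec: autoscalingv1.HorizontalPodAutoscalerSpec{
			// A replicaset assumed to already exist in the federation.
			ScaleTargetRef: autoscalingv1.CrossVersionObjectReference{
				Kind: "ReplicaSet",
				Name: "my-rs",
			},
			MinReplicas:                    &min,
			MaxReplicas:                    10,
			TargetCPUUtilizationPercentage: &targetCPU,
		},
	}

	// The federated HPA controller reconciles this object, partitioning
	// min/max replicas across the joined clusters.
	created, err := client.AutoscalingV1().HorizontalPodAutoscalers("default").Create(hpa)
	if err != nil {
		panic(err)
	}
	fmt.Printf("created federated hpa %s/%s\n", created.Namespace, created.Name)
}
```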
This commit is contained in:
Kubernetes Submit Queue 2017-08-08 02:54:53 -07:00 committed by GitHub
commit f0ff280f42
8 changed files with 1249 additions and 29 deletions

View File

@ -10,11 +10,16 @@ load(
go_test(
name = "go_default_test",
srcs = ["scheduling_test.go"],
srcs = [
"hpa_test.go",
"scheduling_test.go",
],
library = ":go_default_library",
tags = ["automanaged"],
deps = [
"//federation/pkg/federation-controller/util/test:go_default_library",
"//vendor/github.com/stretchr/testify/assert:go_default_library",
"//vendor/k8s.io/api/autoscaling/v1:go_default_library",
"//vendor/k8s.io/api/core/v1:go_default_library",
"//vendor/k8s.io/api/extensions/v1beta1:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
@ -29,6 +34,7 @@ go_library(
"configmap.go",
"daemonset.go",
"deployment.go",
"hpa.go",
"namespace.go",
"qualifiedname.go",
"registry.go",
@ -48,12 +54,14 @@ go_library(
"//pkg/api:go_default_library",
"//pkg/controller/namespace/deletion:go_default_library",
"//vendor/github.com/golang/glog:go_default_library",
"//vendor/k8s.io/api/autoscaling/v1:go_default_library",
"//vendor/k8s.io/api/core/v1:go_default_library",
"//vendor/k8s.io/api/extensions/v1beta1:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/api/meta:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/runtime:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/runtime/schema:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/util/sets:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/watch:go_default_library",
"//vendor/k8s.io/client-go/dynamic:go_default_library",
"//vendor/k8s.io/client-go/kubernetes:go_default_library",

View File

@ -40,16 +40,16 @@ func init() {
}
type DeploymentAdapter struct {
*schedulingAdapter
*replicaSchedulingAdapter
client federationclientset.Interface
}
func NewDeploymentAdapter(client federationclientset.Interface, config *restclient.Config) FederatedTypeAdapter {
schedulingAdapter := schedulingAdapter{
schedulingAdapter := replicaSchedulingAdapter{
preferencesAnnotationName: FedDeploymentPreferencesAnnotation,
updateStatusFunc: func(obj pkgruntime.Object, status interface{}) error {
updateStatusFunc: func(obj pkgruntime.Object, schedulingInfo interface{}) error {
deployment := obj.(*extensionsv1.Deployment)
typedStatus := status.(ReplicaSchedulingStatus)
typedStatus := schedulingInfo.(*ReplicaSchedulingInfo).Status
if typedStatus.Replicas != deployment.Status.Replicas || typedStatus.UpdatedReplicas != deployment.Status.UpdatedReplicas ||
typedStatus.ReadyReplicas != deployment.Status.ReadyReplicas || typedStatus.AvailableReplicas != deployment.Status.AvailableReplicas {
deployment.Status = extensionsv1.DeploymentStatus{

View File

@ -0,0 +1,927 @@
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package federatedtypes
import (
"time"
"fmt"
autoscalingv1 "k8s.io/api/autoscaling/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
pkgruntime "k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apimachinery/pkg/watch"
kubeclientset "k8s.io/client-go/kubernetes"
restclient "k8s.io/client-go/rest"
federationapi "k8s.io/kubernetes/federation/apis/federation/v1beta1"
federationclientset "k8s.io/kubernetes/federation/client/clientset_generated/federation_clientset"
fedutil "k8s.io/kubernetes/federation/pkg/federation-controller/util"
)
const (
HpaKind = "horizontalpodautoscaler"
HpaControllerName = "horizontalpodautoscalers"
// scaleForbiddenWindow is a tunable window within which the replica counts
// of an existing local hpa are not changed if it already scaled recently
// (avoids thrashing replicas around).
scaleForbiddenWindow = 5 * time.Minute
// hpaMinReplicaDefault is used as the default min for an hpa object submitted
// to federation when the default is for some reason not present
// (Spec.MinReplicas == nil).
hpaMinReplicaDefault = int32(1)
)
func init() {
RegisterFederatedType(HpaKind, HpaControllerName, []schema.GroupVersionResource{autoscalingv1.SchemeGroupVersion.WithResource(HpaControllerName)}, NewHpaAdapter)
}
type HpaAdapter struct {
client federationclientset.Interface
}
func NewHpaAdapter(client federationclientset.Interface, config *restclient.Config) FederatedTypeAdapter {
return &HpaAdapter{client: client}
}
func (a *HpaAdapter) Kind() string {
return HpaKind
}
func (a *HpaAdapter) ObjectType() pkgruntime.Object {
return &autoscalingv1.HorizontalPodAutoscaler{}
}
func (a *HpaAdapter) IsExpectedType(obj interface{}) bool {
_, ok := obj.(*autoscalingv1.HorizontalPodAutoscaler)
return ok
}
func (a *HpaAdapter) Copy(obj pkgruntime.Object) pkgruntime.Object {
hpa := obj.(*autoscalingv1.HorizontalPodAutoscaler)
return &autoscalingv1.HorizontalPodAutoscaler{
ObjectMeta: fedutil.DeepCopyRelevantObjectMeta(hpa.ObjectMeta),
Spec: *fedutil.DeepCopyApiTypeOrPanic(&hpa.Spec).(*autoscalingv1.HorizontalPodAutoscalerSpec),
}
}
func (a *HpaAdapter) Equivalent(obj1, obj2 pkgruntime.Object) bool {
return fedutil.ObjectMetaAndSpecEquivalent(obj1, obj2)
}
func (a *HpaAdapter) QualifiedName(obj pkgruntime.Object) QualifiedName {
hpa := obj.(*autoscalingv1.HorizontalPodAutoscaler)
return QualifiedName{Namespace: hpa.Namespace, Name: hpa.Name}
}
func (a *HpaAdapter) ObjectMeta(obj pkgruntime.Object) *metav1.ObjectMeta {
return &obj.(*autoscalingv1.HorizontalPodAutoscaler).ObjectMeta
}
func (a *HpaAdapter) FedCreate(obj pkgruntime.Object) (pkgruntime.Object, error) {
hpa := obj.(*autoscalingv1.HorizontalPodAutoscaler)
return a.client.AutoscalingV1().HorizontalPodAutoscalers(hpa.Namespace).Create(hpa)
}
func (a *HpaAdapter) FedDelete(qualifiedName QualifiedName, options *metav1.DeleteOptions) error {
return a.client.AutoscalingV1().HorizontalPodAutoscalers(qualifiedName.Namespace).Delete(qualifiedName.Name, options)
}
func (a *HpaAdapter) FedGet(qualifiedName QualifiedName) (pkgruntime.Object, error) {
return a.client.AutoscalingV1().HorizontalPodAutoscalers(qualifiedName.Namespace).Get(qualifiedName.Name, metav1.GetOptions{})
}
func (a *HpaAdapter) FedList(namespace string, options metav1.ListOptions) (pkgruntime.Object, error) {
return a.client.AutoscalingV1().HorizontalPodAutoscalers(namespace).List(options)
}
func (a *HpaAdapter) FedUpdate(obj pkgruntime.Object) (pkgruntime.Object, error) {
hpa := obj.(*autoscalingv1.HorizontalPodAutoscaler)
return a.client.AutoscalingV1().HorizontalPodAutoscalers(hpa.Namespace).Update(hpa)
}
func (a *HpaAdapter) FedWatch(namespace string, options metav1.ListOptions) (watch.Interface, error) {
return a.client.AutoscalingV1().HorizontalPodAutoscalers(namespace).Watch(options)
}
func (a *HpaAdapter) ClusterCreate(client kubeclientset.Interface, obj pkgruntime.Object) (pkgruntime.Object, error) {
hpa := obj.(*autoscalingv1.HorizontalPodAutoscaler)
return client.AutoscalingV1().HorizontalPodAutoscalers(hpa.Namespace).Create(hpa)
}
func (a *HpaAdapter) ClusterDelete(client kubeclientset.Interface, qualifiedName QualifiedName, options *metav1.DeleteOptions) error {
return client.AutoscalingV1().HorizontalPodAutoscalers(qualifiedName.Namespace).Delete(qualifiedName.Name, options)
}
func (a *HpaAdapter) ClusterGet(client kubeclientset.Interface, qualifiedName QualifiedName) (pkgruntime.Object, error) {
return client.AutoscalingV1().HorizontalPodAutoscalers(qualifiedName.Namespace).Get(qualifiedName.Name, metav1.GetOptions{})
}
func (a *HpaAdapter) ClusterList(client kubeclientset.Interface, namespace string, options metav1.ListOptions) (pkgruntime.Object, error) {
return client.AutoscalingV1().HorizontalPodAutoscalers(namespace).List(options)
}
func (a *HpaAdapter) ClusterUpdate(client kubeclientset.Interface, obj pkgruntime.Object) (pkgruntime.Object, error) {
hpa := obj.(*autoscalingv1.HorizontalPodAutoscaler)
return client.AutoscalingV1().HorizontalPodAutoscalers(hpa.Namespace).Update(hpa)
}
func (a *HpaAdapter) ClusterWatch(client kubeclientset.Interface, namespace string, options metav1.ListOptions) (watch.Interface, error) {
return client.AutoscalingV1().HorizontalPodAutoscalers(namespace).Watch(options)
}
func (a *HpaAdapter) NewTestObject(namespace string) pkgruntime.Object {
var min int32 = 4
var targetCPU int32 = 70
return &autoscalingv1.HorizontalPodAutoscaler{
ObjectMeta: metav1.ObjectMeta{
GenerateName: "test-hpa-",
Namespace: namespace,
},
Spec: autoscalingv1.HorizontalPodAutoscalerSpec{
ScaleTargetRef: autoscalingv1.CrossVersionObjectReference{
Kind: "replicaset",
Name: "myrs",
},
MinReplicas: &min,
MaxReplicas: int32(10),
TargetCPUUtilizationPercentage: &targetCPU,
},
}
}
func (a *HpaAdapter) IsSchedulingAdapter() bool {
return true
}
func (a *HpaAdapter) EquivalentIgnoringSchedule(obj1, obj2 pkgruntime.Object) bool {
hpa1 := obj1.(*autoscalingv1.HorizontalPodAutoscaler)
hpa2 := a.Copy(obj2).(*autoscalingv1.HorizontalPodAutoscaler)
if hpa1.Spec.MinReplicas == nil {
hpa2.Spec.MinReplicas = nil
} else if hpa2.Spec.MinReplicas == nil {
var r int32 = *hpa1.Spec.MinReplicas
hpa2.Spec.MinReplicas = &r
} else {
*hpa2.Spec.MinReplicas = *hpa1.Spec.MinReplicas
}
hpa2.Spec.MaxReplicas = hpa1.Spec.MaxReplicas
return fedutil.ObjectMetaAndSpecEquivalent(hpa1, hpa2)
}
type replicaNums struct {
min int32
max int32
}
type hpaFederatedStatus struct {
lastScaleTime *metav1.Time
// Counts the replicas (accumulated across clusters) reporting hpa status.
// Used as the weight when averaging the CPU utilization which is
// reflected back to the federation user.
count int32
aggregateCPUUtilizationPercentage *int32
currentReplicas int32
desiredReplicas int32
}
type hpaSchedulingInfo struct {
scheduleState map[string]*replicaNums
fedStatus hpaFederatedStatus
}
// List of cluster names.
type hpaLists struct {
// Stores names of those clusters which can offer min.
availableMin sets.String
// Stores names of those clusters which can offer max.
availableMax sets.String
// Stores names of those clusters which do not have hpa yet.
noHpa sets.String
}
func (a *HpaAdapter) GetSchedule(obj pkgruntime.Object, key string, clusters []*federationapi.Cluster, informer fedutil.FederatedInformer) (interface{}, error) {
currentClusterObjs, err := getCurrentClusterObjs(informer, key, clusters)
if err != nil {
return nil, err
}
// Initialise the averaged CPU utilisation for this reconcile.
var ccup int32 = 0
fedStatus := hpaFederatedStatus{
aggregateCPUUtilizationPercentage: &ccup,
count: int32(0),
desiredReplicas: int32(0),
currentReplicas: int32(0),
}
fedHpa := obj.(*autoscalingv1.HorizontalPodAutoscaler)
// We assign the last known scale time here, which we update with
// the latest time from among all clusters in ScheduleObject()
if fedHpa.Status.LastScaleTime != nil {
t := metav1.NewTime(fedHpa.Status.LastScaleTime.Time)
fedStatus.lastScaleTime = &t
}
return &hpaSchedulingInfo{
scheduleState: getHpaScheduleState(obj, currentClusterObjs),
fedStatus: fedStatus,
}, nil
}
func getCurrentClusterObjs(informer fedutil.FederatedInformer, key string, clusters []*federationapi.Cluster) (map[string]pkgruntime.Object, error) {
currentClusterObjs := make(map[string]pkgruntime.Object)
for _, cluster := range clusters {
clusterName := cluster.Name
clusterObj, found, err := informer.GetTargetStore().GetByKey(clusterName, key)
if err != nil {
return nil, err
}
currentClusterObjs[clusterName] = nil
if found {
currentClusterObjs[clusterName] = clusterObj.(pkgruntime.Object)
}
}
return currentClusterObjs, nil
}
// The scheduling algorithm is summarised below:
//
// 1. Find the clusters which can offer max and min, if any (lists.availableMax and
// lists.availableMin), in one pass over all clusters.
//
// 2. Reduce the replicas (both min and max) if needed (the situation where fedHpa
// has fewer replicas than all cluster-local hpa replicas totalled together).
// In this step, reduce first from those hpas which already have max (and min)
// reducible. Once such clusters are exhausted and reduction is still needed,
// reduce one at a time from all clusters, randomly. This step ensures that the
// exceeding replicas in local hpas are reduced to match the fedHpa.
// This step would ideally be a noop in most cases, because it is rare that fedHpa
// would have fewer replicas than the cluster-local total (probably only when the
// user forces an update of fedHpa).
//
// 3. Distribute the replicas. In this step we have replicas to distribute (the
// fed replicas exceeding the sum total of local cluster replicas). If clusters
// already have replicas, one replica from each cluster which can offer replicas
// (both for max and min) is also added to the replicas-to-distribute numbers (min
// and max).
// 3a. We first do a sub-pass distributing to clusters which need replicas,
// treating those as clusters in crucial need of replicas.
// 3b. If, after the previous sub-pass, we still have replicas remaining, the next
// sub-pass distributes to those clusters which do not yet have any hpa.
// 3c. If we still have more to distribute after that, we distribute to all
// clusters randomly, giving the replica distribution count
// (rdc = total-fed-replicas / no-of-clusters) to each at a time.
//
// The above algorithm is run to first distribute max and then distribute min to
// those clusters which got max.
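//
// For illustration (the numbers mirror the "Cluster with no hpa gets replicas"
// unit test case in hpa_test.go): a federated hpa of min=1, max=10 over clusters
// c1 (existing local hpa with min=1, max=10) and c2 (no hpa yet) gives
// rdc = {min: 1, max: 5}. Pass 1 finds c1 able to offer max (it is pristine),
// so nothing needs reduction and toDistribute = {min: 0, max: 1}. Pass 3
// consumes c1's offer, moving one max to c2 (c1 becomes {1, 9}, c2 becomes
// {0, 1}), and finalisation raises c2's min to 1, yielding
// c1={min:1, max:9} and c2={min:1, max:1}.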
func getHpaScheduleState(fedObj pkgruntime.Object, currentObjs map[string]pkgruntime.Object) map[string]*replicaNums {
fedHpa := fedObj.(*autoscalingv1.HorizontalPodAutoscaler)
requestedMin := hpaMinReplicaDefault
if fedHpa.Spec.MinReplicas != nil {
requestedMin = *fedHpa.Spec.MinReplicas
}
requestedReplicas := replicaNums{
min: requestedMin,
max: fedHpa.Spec.MaxReplicas,
}
// replica distribution count, per cluster
rdc := replicaNums{
min: requestedReplicas.min / int32(len(currentObjs)),
max: requestedReplicas.max / int32(len(currentObjs)),
}
if rdc.min < 1 {
rdc.min = 1
}
// TODO: Is there a better way?
// We need to cap the lower limit of max to 2, because in a
// situation where both min and max become 1 (the same) for all clusters,
// no rebalancing would happen.
if rdc.max < 2 {
rdc.max = 2
}
// Pass 1: Analyse the existing local hpas, if any.
// clusterLists holds the list of those clusters which can offer
// min and max replicas to those which want them
// (for example, new clusters joining the federation and/or
// those clusters which need to increase or reduce replicas
// beyond their min/max limits).
// scheduleState currently holds the status of the existing hpas;
// it will eventually hold the desired state for this reconcile.
clusterLists, currentReplicas, scheduleState := prepareForScheduling(currentObjs)
remainingReplicas := replicaNums{
min: requestedReplicas.min - currentReplicas.min,
max: requestedReplicas.max - currentReplicas.max,
}
// Pass 2: Reduce replicas if needed (the situation where fedHpa was updated
// to fewer replicas than already exist locally).
// In this pass, we remain pessimistic and reduce one replica per cluster at a time.
if remainingReplicas.min < 0 {
excessMin := (remainingReplicas.min * int32(-1))
remainingReplicas.min = reduceMinReplicas(excessMin, clusterLists.availableMin, scheduleState)
}
if remainingReplicas.max < 0 {
excessMax := (remainingReplicas.max * int32(-1))
remainingReplicas.max = reduceMaxReplicas(excessMax, clusterLists.availableMax, scheduleState)
}
toDistribute := replicaNums{
min: remainingReplicas.min + int32(clusterLists.availableMin.Len()),
max: remainingReplicas.max + int32(clusterLists.availableMax.Len()),
}
// Pass 3: Distribute max and then min.
// Here we first distribute max and then (in the next loop)
// distribute min into those clusters which already have their
// max fixed.
// In this process we might not meet the min limit, and the total of the
// min limits might remain more than the requested federated min.
// This is partly because a per-cluster min cannot be less
// than 1, while min could be requested as 1 at the federation level.
// Additionally, we first increase replicas in those clusters
// which already have hpas and are in a condition to increase.
// This saves cluster-related resources for the user: if an already
// existing cluster can satisfy the user's request, why send
// the workload to another?
// We then go ahead and give the replicas to those which do not
// have any hpa. In this pass, however, we try to ensure that all
// our max replicas are consumed in this reconcile.
distributeMaxReplicas(toDistribute.max, clusterLists, rdc, currentObjs, scheduleState)
// We distribute min to those clusters which:
// 1 - can adjust min (our increase step is only 1), or
// 2 - do not have this hpa yet and got max (increase step rdc.min).
// We might exhaust all min replicas here, with
// some clusters still needing them. We adjust this in finalise by
// assigning a min of 1 to those clusters which got max
// but whose min remains 0.
distributeMinReplicas(toDistribute.min, clusterLists, rdc, currentObjs, scheduleState)
return finaliseScheduleState(scheduleState)
}
func (a *HpaAdapter) ScheduleObject(cluster *federationapi.Cluster, clusterObj pkgruntime.Object, federationObjCopy pkgruntime.Object, schedulingInfo interface{}) (pkgruntime.Object, ScheduleAction, error) {
// Update federated status info
typedInfo := schedulingInfo.(*hpaSchedulingInfo)
if clusterObj != nil {
clusterHpa := clusterObj.(*autoscalingv1.HorizontalPodAutoscaler)
if clusterHpa.Status.CurrentCPUUtilizationPercentage != nil {
*typedInfo.fedStatus.aggregateCPUUtilizationPercentage +=
(*clusterHpa.Status.CurrentCPUUtilizationPercentage * clusterHpa.Status.CurrentReplicas)
typedInfo.fedStatus.count += clusterHpa.Status.CurrentReplicas
}
if clusterHpa.Status.LastScaleTime != nil {
t := metav1.NewTime(clusterHpa.Status.LastScaleTime.Time)
if typedInfo.fedStatus.lastScaleTime != nil &&
t.After(typedInfo.fedStatus.lastScaleTime.Time) {
typedInfo.fedStatus.lastScaleTime = &t
}
}
typedInfo.fedStatus.currentReplicas += clusterHpa.Status.CurrentReplicas
typedInfo.fedStatus.desiredReplicas += clusterHpa.Status.DesiredReplicas
}
// Update the cluster obj and the needed action on the cluster
clusterHpaState := typedInfo.scheduleState[cluster.Name]
desiredHpa := federationObjCopy.(*autoscalingv1.HorizontalPodAutoscaler)
if clusterHpaState != nil {
desiredHpa.Spec.MaxReplicas = clusterHpaState.max
if desiredHpa.Spec.MinReplicas == nil {
min := int32(0)
desiredHpa.Spec.MinReplicas = &min
}
*desiredHpa.Spec.MinReplicas = clusterHpaState.min
}
var defaultAction ScheduleAction = ""
switch {
case clusterHpaState != nil && clusterObj != nil:
return desiredHpa, defaultAction, nil
case clusterHpaState != nil && clusterObj == nil:
return desiredHpa, ActionAdd, nil
case clusterHpaState == nil && clusterObj != nil:
return nil, ActionDelete, nil
}
return nil, defaultAction, nil
}
func (a *HpaAdapter) UpdateFederatedStatus(obj pkgruntime.Object, schedulingInfo interface{}) error {
fedHpa := obj.(*autoscalingv1.HorizontalPodAutoscaler)
needUpdate, newFedHpaStatus := updateStatus(fedHpa, schedulingInfo.(*hpaSchedulingInfo).fedStatus)
if needUpdate {
fedHpa.Status = newFedHpaStatus
_, err := a.client.AutoscalingV1().HorizontalPodAutoscalers(fedHpa.Namespace).UpdateStatus(fedHpa)
if err != nil {
return fmt.Errorf("Error updating status of hpa %s in federation: %v", fedHpa.Name, err)
}
}
return nil
}
func updateStatus(fedHpa *autoscalingv1.HorizontalPodAutoscaler, newStatus hpaFederatedStatus) (bool, autoscalingv1.HorizontalPodAutoscalerStatus) {
averageCPUUtilizationPercentage := int32(0)
// Average out the available current utilisation
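// For example (illustrative numbers): clusters reporting 60% across 4 replicas
// and 80% across 2 replicas aggregate to (60*4 + 80*2) / 6 = 66%.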
if *newStatus.aggregateCPUUtilizationPercentage != 0 && newStatus.count != 0 {
averageCPUUtilizationPercentage = *newStatus.aggregateCPUUtilizationPercentage / newStatus.count
}
gen := fedHpa.Generation
newFedHpaStatus := autoscalingv1.HorizontalPodAutoscalerStatus{ObservedGeneration: &gen}
needUpdate := false
if (fedHpa.Status.CurrentCPUUtilizationPercentage == nil &&
averageCPUUtilizationPercentage != 0) ||
(fedHpa.Status.CurrentCPUUtilizationPercentage != nil &&
averageCPUUtilizationPercentage !=
*fedHpa.Status.CurrentCPUUtilizationPercentage) {
needUpdate = true
newFedHpaStatus.CurrentCPUUtilizationPercentage = &averageCPUUtilizationPercentage
}
if (fedHpa.Status.LastScaleTime == nil && newStatus.lastScaleTime != nil) ||
(fedHpa.Status.LastScaleTime != nil && newStatus.lastScaleTime == nil) ||
((fedHpa.Status.LastScaleTime != nil && newStatus.lastScaleTime != nil) &&
newStatus.lastScaleTime.After(fedHpa.Status.LastScaleTime.Time)) {
needUpdate = true
newFedHpaStatus.LastScaleTime = newStatus.lastScaleTime
}
if fedHpa.Status.DesiredReplicas != newStatus.desiredReplicas {
needUpdate = true
newFedHpaStatus.CurrentReplicas = newStatus.currentReplicas
}
if fedHpa.Status.CurrentReplicas != newStatus.currentReplicas {
needUpdate = true
newFedHpaStatus.DesiredReplicas = newStatus.desiredReplicas
}
return needUpdate, newFedHpaStatus
}
// prepareForScheduling prepares the lists and totals from the
// existing objs.
// currentObjs has the list of all clusters, with obj as nil
// for those clusters which do not have hpa yet.
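// As an illustration, for an input of c1 holding a pristine local hpa
// (min=1, max=10) and c2 holding none, this returns lists {availableMax: {c1},
// availableMin: {}, noHpa: {c2}}, an existing total of {min: 1, max: 10}, and
// scheduleState {c1: &{1, 10}, c2: nil}.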
func prepareForScheduling(currentObjs map[string]pkgruntime.Object) (hpaLists, replicaNums, map[string]*replicaNums) {
lists := hpaLists{
availableMax: sets.NewString(),
availableMin: sets.NewString(),
noHpa: sets.NewString(),
}
existingTotal := replicaNums{
min: int32(0),
max: int32(0),
}
scheduleState := make(map[string]*replicaNums)
for cluster, obj := range currentObjs {
if obj == nil {
lists.noHpa.Insert(cluster)
scheduleState[cluster] = nil
continue
}
if maxReplicasReducible(obj) {
lists.availableMax.Insert(cluster)
}
if minReplicasReducible(obj) {
lists.availableMin.Insert(cluster)
}
replicas := replicaNums{min: 0, max: 0}
scheduleState[cluster] = &replicas
if obj.(*autoscalingv1.HorizontalPodAutoscaler).Spec.MinReplicas != nil {
existingTotal.min += *obj.(*autoscalingv1.HorizontalPodAutoscaler).Spec.MinReplicas
replicas.min = *obj.(*autoscalingv1.HorizontalPodAutoscaler).Spec.MinReplicas
}
existingTotal.max += obj.(*autoscalingv1.HorizontalPodAutoscaler).Spec.MaxReplicas
replicas.max = obj.(*autoscalingv1.HorizontalPodAutoscaler).Spec.MaxReplicas
}
return lists, existingTotal, scheduleState
}
// Note: reduceMinReplicas and reduceMaxReplicas look quite similar in flow
// and code, but there are subtle differences. They could be merged into one
// function with an argument governing the difference in behaviour and
// additional arguments (the superset of both argument lists) as needed.
// Doing so, however, makes the logical flow much less readable. They are thus
// left as two functions for readability.
// reduceMinReplicas reduces the min replicas from existing clusters.
// At the end of the function excessMin should be 0 and the MinList
// and the scheduledReplicas properly updated in place.
func reduceMinReplicas(excessMin int32, availableMinList sets.String, scheduled map[string]*replicaNums) int32 {
if excessMin > 0 {
// first we try reducing from those clusters which already offer min
if availableMinList.Len() > 0 {
for _, cluster := range availableMinList.List() {
replicas := scheduled[cluster]
if replicas.min > 1 {
replicas.min--
availableMinList.Delete(cluster)
excessMin--
if excessMin <= 0 {
break
}
}
}
}
}
// If we could not get the needed replicas from the min already offered above,
// we abruptly start removing replicas from some/all clusters.
// Here we might reduce some min to 0, signalling that this hpa might be a
// candidate for removal from this cluster altogether.
for excessMin > 0 {
for _, replicas := range scheduled {
if replicas != nil &&
replicas.min > 0 {
replicas.min--
excessMin--
if excessMin <= 0 {
break
}
}
}
}
return excessMin
}
// reduceMaxReplicas reduces the max replicas from existing clusters.
// At the end of the function excessMax should be 0 and the MaxList
// and the scheduledReplicas properly updated in place.
func reduceMaxReplicas(excessMax int32, availableMaxList sets.String, scheduled map[string]*replicaNums) int32 {
if excessMax > 0 {
// first we try reducing from those clusters which already offer max
if availableMaxList.Len() > 0 {
for _, cluster := range availableMaxList.List() {
replicas := scheduled[cluster]
if replicas != nil && !((replicas.max - replicas.min) < 0) {
replicas.max--
availableMaxList.Delete(cluster)
excessMax--
if excessMax <= 0 {
break
}
}
}
}
}
// If we could not get the needed reduction from the max already offered
// above, we abruptly start removing replicas from some/all clusters.
// Here we might reduce some max and min to 0, signalling that this hpa should
// be removed from this cluster altogether.
for excessMax > 0 {
for _, replicas := range scheduled {
if replicas != nil &&
!((replicas.max - replicas.min) < 0) {
replicas.max--
excessMax--
if excessMax <= 0 {
break
}
}
}
}
return excessMax
}
// distributeMaxReplicas
// Takes input:
// toDistributeMax: number of replicas to distribute.
// lists: cluster name lists, which have clusters with available max,
// available min and those with no hpas yet.
// rdc: replica distribution count for max and min.
// currentObjs: list of current cluster hpas.
// scheduled: schedule state which will be updated in place.
func distributeMaxReplicas(toDistributeMax int32, lists hpaLists, rdc replicaNums,
currentObjs map[string]pkgruntime.Object, scheduled map[string]*replicaNums) int32 {
for cluster, replicas := range scheduled {
if toDistributeMax == 0 {
break
}
if replicas == nil {
continue
}
if maxReplicasNeeded(currentObjs[cluster]) {
replicas.max++
if lists.availableMax.Len() > 0 {
popped, notEmpty := lists.availableMax.PopAny()
if notEmpty {
// Boundary checks have happened earlier in
// maxReplicasReducible().
scheduled[popped].max--
}
}
// Either way, utilise available max replicas.
toDistributeMax--
}
}
// If we have new clusters where we can give our replicas,
// then give away all our replicas to the new clusters first.
if lists.noHpa.Len() > 0 {
for toDistributeMax > 0 {
for _, cluster := range lists.noHpa.UnsortedList() {
if scheduled[cluster] == nil {
scheduled[cluster] = &replicaNums{min: 0, max: 0}
}
replicas := scheduled[cluster]
// First give away max from clusters offering them.
// This case especially helps getting hpa into newly joining
// clusters.
if lists.availableMax.Len() > 0 {
popped, notEmpty := lists.availableMax.PopAny()
if notEmpty {
// Boundary checks to reduce max have happened earlier in
// maxReplicasReducible().
replicas.max++
scheduled[popped].max--
toDistributeMax--
continue
}
}
if toDistributeMax < rdc.max {
replicas.max += toDistributeMax
toDistributeMax = 0
break
}
replicas.max += rdc.max
toDistributeMax -= rdc.max
}
}
} else { // We have no new clusters, but we still have max replicas to distribute;
// just distribute them all among the current clusters.
for toDistributeMax > 0 {
for cluster, replicas := range scheduled {
if replicas == nil {
replicas = &replicaNums{min: 0, max: 0}
scheduled[cluster] = replicas
}
// First give away max from clusters offering them.
// This case especially helps getting hpa into newly joining
// clusters.
if lists.availableMax.Len() > 0 {
popped, notEmpty := lists.availableMax.PopAny()
if notEmpty {
// Boundary checks have happened earlier in
// maxReplicasReducible().
replicas.max++
scheduled[popped].max--
toDistributeMax--
continue
}
}
if toDistributeMax < rdc.max {
replicas.max += toDistributeMax
toDistributeMax = 0
break
}
replicas.max += rdc.max
toDistributeMax -= rdc.max
}
}
}
return toDistributeMax
}
// distributeMinReplicas
// Takes input:
// toDistributeMin: number of replicas to distribute.
// lists: cluster name lists, which have clusters with available max,
// available min and those with no hpas yet.
// rdc: replica distribution count for max and min.
// currentObjs: list of current cluster hpas.
// scheduled: schedule state which will be updated in place.
func distributeMinReplicas(toDistributeMin int32, lists hpaLists, rdc replicaNums,
currentObjs map[string]pkgruntime.Object, scheduled map[string]*replicaNums) int32 {
for cluster, replicas := range scheduled {
if toDistributeMin == 0 {
break
}
// We have distributed max, and thus scheduled might not be nil,
// but current (what we originally got) is probably nil (no hpa).
if replicas == nil || currentObjs[cluster] == nil {
continue
}
if minReplicasIncreasable(currentObjs[cluster]) {
if lists.availableMin.Len() > 0 {
popped, notEmpty := lists.availableMin.PopAny()
if notEmpty {
// Boundary checks have happened earlier.
scheduled[popped].min--
replicas.min++
toDistributeMin--
}
}
}
}
if lists.noHpa.Len() > 0 {
// TODO: can this become an infinite loop?
for toDistributeMin > 0 {
for _, cluster := range lists.noHpa.UnsortedList() {
replicas := scheduled[cluster]
if replicas == nil {
// We did not get max here so this cluster
// remains without hpa
continue
}
var replicaNum int32 = 0
if toDistributeMin < rdc.min {
replicaNum = toDistributeMin
} else {
replicaNum = rdc.min
}
if (replicas.max - replicaNum) < replicas.min {
// Cannot increase the min in this cluster
// as it will go beyond max
continue
}
if lists.availableMin.Len() > 0 {
popped, notEmpty := lists.availableMin.PopAny()
if notEmpty {
// Boundary checks have happened earlier.
scheduled[popped].min--
replicas.min++
toDistributeMin--
continue
}
}
replicas.min += replicaNum
toDistributeMin -= replicaNum
}
}
} else { // We have no new clusters, but we still have min replicas to distribute;
// just distribute them all among the current clusters.
for toDistributeMin > 0 {
for _, replicas := range scheduled {
if replicas == nil {
// We did not get max here so this cluster
// remains without hpa
continue
}
var replicaNum int32 = 0
if toDistributeMin < rdc.min {
replicaNum = toDistributeMin
} else {
replicaNum = rdc.min
}
if (replicas.max - replicaNum) < replicas.min {
// Cannot increase the min in this cluster
// as it will go beyond max
continue
}
if lists.availableMin.Len() > 0 {
popped, notEmpty := lists.availableMin.PopAny()
if notEmpty {
// Boundary checks have happened earlier.
scheduled[popped].min--
replicas.min++
toDistributeMin--
continue
}
}
replicas.min += replicaNum
toDistributeMin -= replicaNum
}
}
}
return toDistributeMin
}
// finaliseScheduleState ensures that the min replica count is set to 1
// for those clusters which got max but did not get min. This is because
// the k8s hpa does not accept hpas with 0 min replicas.
// The replica distribution can thus have a higher min total than fedHpa requested,
// but that is better than having all replicas go into one cluster (when fedHpa
// requested min=1, which is the most usual case).
func finaliseScheduleState(scheduled map[string]*replicaNums) map[string]*replicaNums {
for _, replicas := range scheduled {
if (replicas != nil) && (replicas.min <= 0) && (replicas.max > 0) {
// Min total does not necessarily meet the federated min limit.
replicas.min = 1
}
}
return scheduled
}
// isPristine is used to determine whether the local controller has so far been
// able to really determine what the desired replica number for
// this cluster should be.
// This is used to get hpas into those clusters which have joined freshly,
// while the hpas in the other clusters haven't really gotten anywhere yet.
// TODO: There is a flaw here: a just-born object would also offer its
// replicas, which can likewise lead to fast thrashing.
// The only better way is to either ensure that the object creation timestamp is set
// and can be used authoritatively, or to have another field on the local object
// which is mandatorily set on creation and can be used authoritatively.
// Should we abuse annotations again for this, or can this be a proper requirement?
func isPristine(hpa *autoscalingv1.HorizontalPodAutoscaler) bool {
if hpa.Status.LastScaleTime == nil &&
hpa.Status.DesiredReplicas == 0 {
return true
}
return false
}
// isScaleable tells whether a reasonable amount of time has already passed
// since this hpa last scaled. It is used to avoid fast thrashing.
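// For example, with the 5-minute scaleForbiddenWindow, an hpa that last scaled
// 2 minutes ago is not yet scaleable, while one that last scaled 6 minutes ago
// (as the unit tests simulate) is.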
func isScaleable(hpa *autoscalingv1.HorizontalPodAutoscaler) bool {
if hpa.Status.LastScaleTime == nil {
return false
}
t := hpa.Status.LastScaleTime.Add(scaleForbiddenWindow)
if t.After(time.Now()) {
return false
}
return true
}
func maxReplicasReducible(obj pkgruntime.Object) bool {
hpa := obj.(*autoscalingv1.HorizontalPodAutoscaler)
if (hpa.Spec.MinReplicas != nil) &&
(((hpa.Spec.MaxReplicas - 1) - *hpa.Spec.MinReplicas) < 0) {
return false
}
if isPristine(hpa) {
return true
}
if !isScaleable(hpa) {
return false
}
if (hpa.Status.DesiredReplicas < hpa.Status.CurrentReplicas) ||
((hpa.Status.DesiredReplicas == hpa.Status.CurrentReplicas) &&
(hpa.Status.DesiredReplicas < hpa.Spec.MaxReplicas)) {
return true
}
return false
}
// minReplicasReducible checks whether this cluster (hpa) can offer replicas which
// are stuck here because of the min limit.
// It is noteworthy that min and max are adjusted separately; if the replicas
// are not being used here, the max adjustment will cause it to become equal to min,
// but it will not be able to scale down further and offer max to some other cluster
// which needs replicas.
func minReplicasReducible(obj pkgruntime.Object) bool {
hpa := obj.(*autoscalingv1.HorizontalPodAutoscaler)
if isPristine(hpa) && (hpa.Spec.MinReplicas != nil) &&
(*hpa.Spec.MinReplicas > 1) &&
(*hpa.Spec.MinReplicas <= hpa.Spec.MaxReplicas) {
return true
}
if !isScaleable(hpa) {
return false
}
if (hpa.Spec.MinReplicas != nil) &&
(*hpa.Spec.MinReplicas > 1) &&
(hpa.Status.DesiredReplicas == hpa.Status.CurrentReplicas) &&
(hpa.Status.CurrentReplicas == *hpa.Spec.MinReplicas) {
return true
}
return false
}
func maxReplicasNeeded(obj pkgruntime.Object) bool {
hpa := obj.(*autoscalingv1.HorizontalPodAutoscaler)
if !isScaleable(hpa) {
return false
}
if (hpa.Status.CurrentReplicas == hpa.Status.DesiredReplicas) &&
(hpa.Status.CurrentReplicas == hpa.Spec.MaxReplicas) {
return true
}
return false
}
func minReplicasIncreasable(obj pkgruntime.Object) bool {
hpa := obj.(*autoscalingv1.HorizontalPodAutoscaler)
if !isScaleable(hpa) ||
((hpa.Spec.MinReplicas != nil) &&
(*hpa.Spec.MinReplicas) >= hpa.Spec.MaxReplicas) {
return false
}
if (hpa.Spec.MinReplicas != nil) &&
(hpa.Status.DesiredReplicas > *hpa.Spec.MinReplicas) {
return true
}
return false
}

View File

@ -0,0 +1,262 @@
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package federatedtypes
import (
"testing"
"time"
autoscalingv1 "k8s.io/api/autoscaling/v1"
apiv1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
pkgruntime "k8s.io/apimachinery/pkg/runtime"
. "k8s.io/kubernetes/federation/pkg/federation-controller/util/test"
"github.com/stretchr/testify/assert"
)
type replicas struct {
min int32
max int32
}
func TestGetHpaScheduleState(t *testing.T) {
defaultFedHpa := newHpaWithReplicas(NewInt32(1), NewInt32(70), 10)
testCases := map[string]struct {
fedHpa *autoscalingv1.HorizontalPodAutoscaler
localHpas map[string]pkgruntime.Object
expectedReplicas map[string]*replicas
}{
"Distribiutes replicas randomly if no existing hpa in any local cluster": {
localHpas: func() map[string]pkgruntime.Object {
hpas := make(map[string]pkgruntime.Object)
hpas["c1"] = nil
hpas["c2"] = nil
return hpas
}(),
},
"Cluster with no hpa gets replicas if other clusters have replicas": {
localHpas: func() map[string]pkgruntime.Object {
hpas := make(map[string]pkgruntime.Object)
hpas["c1"] = newHpaWithReplicas(NewInt32(1), NewInt32(70), 10)
hpas["c2"] = nil
return hpas
}(),
expectedReplicas: map[string]*replicas{
"c1": {
min: int32(1),
max: int32(9),
},
"c2": {
min: int32(1),
max: int32(1),
},
},
},
"Cluster needing max replicas gets it if there is another cluster to offer max": {
localHpas: func() map[string]pkgruntime.Object {
hpa1 := newHpaWithReplicas(NewInt32(1), NewInt32(70), 7)
hpa1 = updateHpaStatus(hpa1, NewInt32(50), 5, 5, true)
hpa2 := newHpaWithReplicas(NewInt32(1), NewInt32(70), 1)
hpa2 = updateHpaStatus(hpa2, NewInt32(70), 1, 1, true)
// Include a third object to ensure it does not break the test.
hpa3 := newHpaWithReplicas(NewInt32(1), NewInt32(70), 2)
hpa3 = updateHpaStatus(hpa3, NewInt32(70), 1, 1, false)
hpas := make(map[string]pkgruntime.Object)
hpas["c1"] = hpa1
hpas["c2"] = hpa2
hpas["c3"] = hpa3
return hpas
}(),
expectedReplicas: map[string]*replicas{
"c1": {
min: int32(1),
max: int32(6),
},
"c2": {
min: int32(1),
max: int32(2),
},
"c3": {
min: int32(1),
max: int32(2),
},
},
},
"Cluster needing max replicas does not get it if there is no cluster offerring max": {
localHpas: func() map[string]pkgruntime.Object {
hpa1 := newHpaWithReplicas(NewInt32(1), NewInt32(70), 9)
hpa1 = updateHpaStatus(hpa1, NewInt32(70), 9, 9, false)
hpa2 := newHpaWithReplicas(NewInt32(1), NewInt32(70), 1)
hpa2 = updateHpaStatus(hpa2, NewInt32(70), 1, 1, true)
hpas := make(map[string]pkgruntime.Object)
hpas["c1"] = hpa1
hpas["c2"] = hpa2
return hpas
}(),
expectedReplicas: map[string]*replicas{
"c1": {
min: int32(1),
max: int32(9),
},
"c2": {
min: int32(1),
max: int32(1),
},
},
},
"Cluster which can increase min replicas gets to increase min if there is a cluster offering min": {
fedHpa: newHpaWithReplicas(NewInt32(4), NewInt32(70), 10),
localHpas: func() map[string]pkgruntime.Object {
hpa1 := newHpaWithReplicas(NewInt32(3), NewInt32(70), 6)
hpa1 = updateHpaStatus(hpa1, NewInt32(50), 3, 3, true)
hpa2 := newHpaWithReplicas(NewInt32(1), NewInt32(70), 4)
hpa2 = updateHpaStatus(hpa2, NewInt32(50), 3, 3, true)
hpas := make(map[string]pkgruntime.Object)
hpas["c1"] = hpa1
hpas["c2"] = hpa2
return hpas
}(),
expectedReplicas: map[string]*replicas{
"c1": {
min: int32(2),
max: int32(6),
},
"c2": {
min: int32(2),
max: int32(4),
},
},
},
"Cluster which can increase min replicas does not increase if there are no clusters offering min": {
fedHpa: newHpaWithReplicas(NewInt32(4), NewInt32(70), 10),
localHpas: func() map[string]pkgruntime.Object {
hpa1 := newHpaWithReplicas(NewInt32(3), NewInt32(70), 6)
hpa1 = updateHpaStatus(hpa1, NewInt32(50), 4, 4, true)
hpa2 := newHpaWithReplicas(NewInt32(1), NewInt32(70), 4)
hpa2 = updateHpaStatus(hpa2, NewInt32(50), 3, 3, true)
hpas := make(map[string]pkgruntime.Object)
hpas["c1"] = hpa1
hpas["c2"] = hpa2
return hpas
}(),
expectedReplicas: map[string]*replicas{
"c1": {
min: int32(3),
max: int32(6),
},
"c2": {
min: int32(1),
max: int32(4),
},
},
},
"Increasing replicas on fed object increases the same on clusters": {
// Existing total of local min, max = 1+1, 5+5, increasing to the fed values below.
fedHpa: newHpaWithReplicas(NewInt32(4), NewInt32(70), 14),
localHpas: func() map[string]pkgruntime.Object {
// It does not matter here whether the hpas are scaleable.
hpas := make(map[string]pkgruntime.Object)
hpas["c1"] = newHpaWithReplicas(NewInt32(1), NewInt32(70), 5)
hpas["c2"] = newHpaWithReplicas(NewInt32(1), NewInt32(70), 5)
return hpas
}(),
// We don't know which cluster gets how many, but the resulting total should match.
},
"Decreasing replicas on fed object decreases the same on clusters": {
// Existing total of local min, max = 2+2, 8+8, decreasing to the fed values below.
fedHpa: newHpaWithReplicas(NewInt32(3), NewInt32(70), 8),
localHpas: func() map[string]pkgruntime.Object {
// It does not matter here whether the hpas are scaleable.
hpas := make(map[string]pkgruntime.Object)
hpas["c1"] = newHpaWithReplicas(NewInt32(2), NewInt32(70), 8)
hpas["c2"] = newHpaWithReplicas(NewInt32(2), NewInt32(70), 8)
return hpas
}(),
// We don't know which cluster gets how many, but the resulting total should match.
},
}
for testName, testCase := range testCases {
t.Run(testName, func(t *testing.T) {
if testCase.fedHpa == nil {
testCase.fedHpa = defaultFedHpa
}
scheduledState := getHpaScheduleState(testCase.fedHpa, testCase.localHpas)
checkClusterConditions(t, testCase.fedHpa, scheduledState)
if testCase.expectedReplicas != nil {
for cluster, replicas := range testCase.expectedReplicas {
scheduledReplicas := scheduledState[cluster]
assert.Equal(t, replicas.min, scheduledReplicas.min)
assert.Equal(t, replicas.max, scheduledReplicas.max)
}
}
})
}
}
func updateHpaStatus(hpa *autoscalingv1.HorizontalPodAutoscaler, currentUtilisation *int32, current, desired int32, scaleable bool) *autoscalingv1.HorizontalPodAutoscaler {
hpa.Status.CurrentReplicas = current
hpa.Status.DesiredReplicas = desired
hpa.Status.CurrentCPUUtilizationPercentage = currentUtilisation
now := metav1.Now()
scaledTime := now
if scaleable {
// definitely more than 5 minutes ago
scaledTime = metav1.NewTime(now.Time.Add(-6 * time.Minute))
}
hpa.Status.LastScaleTime = &scaledTime
return hpa
}
func checkClusterConditions(t *testing.T, fedHpa *autoscalingv1.HorizontalPodAutoscaler, scheduled map[string]*replicaNums) {
minTotal := int32(0)
maxTotal := int32(0)
for _, replicas := range scheduled {
minTotal += replicas.min
maxTotal += replicas.max
}
// - Total of max matches the fed max
assert.Equal(t, fedHpa.Spec.MaxReplicas, maxTotal)
// - Total of min is not less than the fed min
assert.Condition(t, func() bool {
if *fedHpa.Spec.MinReplicas <= minTotal {
return true
}
return false
})
}
func newHpaWithReplicas(min, targetUtilisation *int32, max int32) *autoscalingv1.HorizontalPodAutoscaler {
return &autoscalingv1.HorizontalPodAutoscaler{
ObjectMeta: metav1.ObjectMeta{
Name: "myhpa",
Namespace: apiv1.NamespaceDefault,
SelfLink: "/api/mylink",
},
Spec: autoscalingv1.HorizontalPodAutoscalerSpec{
ScaleTargetRef: autoscalingv1.CrossVersionObjectReference{
Kind: "HorizontalPodAutoscaler",
Name: "target-",
},
MinReplicas: min,
MaxReplicas: max,
TargetCPUUtilizationPercentage: targetUtilisation,
},
}
}

View File

@ -40,16 +40,16 @@ func init() {
}
type ReplicaSetAdapter struct {
*schedulingAdapter
*replicaSchedulingAdapter
client federationclientset.Interface
}
func NewReplicaSetAdapter(client federationclientset.Interface, config *restclient.Config) FederatedTypeAdapter {
schedulingAdapter := schedulingAdapter{
replicaSchedulingAdapter := replicaSchedulingAdapter{
preferencesAnnotationName: FedReplicaSetPreferencesAnnotation,
updateStatusFunc: func(obj pkgruntime.Object, status interface{}) error {
updateStatusFunc: func(obj pkgruntime.Object, schedulingInfo interface{}) error {
rs := obj.(*extensionsv1.ReplicaSet)
typedStatus := status.(ReplicaSchedulingStatus)
typedStatus := schedulingInfo.(*ReplicaSchedulingInfo).Status
if typedStatus.Replicas != rs.Status.Replicas || typedStatus.FullyLabeledReplicas != rs.Status.FullyLabeledReplicas ||
typedStatus.ReadyReplicas != rs.Status.ReadyReplicas || typedStatus.AvailableReplicas != rs.Status.AvailableReplicas {
rs.Status = extensionsv1.ReplicaSetStatus{
@ -64,7 +64,7 @@ func NewReplicaSetAdapter(client federationclientset.Interface, config *restclie
return nil
},
}
return &ReplicaSetAdapter{&schedulingAdapter, client}
return &ReplicaSetAdapter{&replicaSchedulingAdapter, client}
}
func (a *ReplicaSetAdapter) Kind() string {

View File

@ -37,6 +37,16 @@ import (
"github.com/golang/glog"
)
// ScheduleAction is returned by the ScheduleObject method of the SchedulingAdapter
// interface to convey to the sync controller reconcile the type of action needed
// for the particular cluster-local object.
const (
ActionAdd = "add"
ActionDelete = "delete"
)
// ReplicaSchedulingStatus contains the status of the replica type objects (rs or deployment)
// that are being scheduled into joined clusters.
type ReplicaSchedulingStatus struct {
@ -58,26 +68,26 @@ type ReplicaSchedulingInfo struct {
// federated type that requires more complex synchronization logic.
type SchedulingAdapter interface {
GetSchedule(obj pkgruntime.Object, key string, clusters []*federationapi.Cluster, informer fedutil.FederatedInformer) (interface{}, error)
ScheduleObject(cluster *federationapi.Cluster, clusterObj pkgruntime.Object, federationObjCopy pkgruntime.Object, schedulingInfo interface{}) (pkgruntime.Object, bool, error)
UpdateFederatedStatus(obj pkgruntime.Object, status interface{}) error
ScheduleObject(cluster *federationapi.Cluster, clusterObj pkgruntime.Object, federationObjCopy pkgruntime.Object, schedulingInfo interface{}) (pkgruntime.Object, ScheduleAction, error)
UpdateFederatedStatus(obj pkgruntime.Object, schedulingInfo interface{}) error
// EquivalentIgnoringSchedule returns whether obj1 and obj2 are
// equivalent ignoring differences due to scheduling.
EquivalentIgnoringSchedule(obj1, obj2 pkgruntime.Object) bool
}
// schedulingAdapter is meant to be embedded in other type adapters that require
// workload scheduling.
type schedulingAdapter struct {
// replicaSchedulingAdapter is meant to be embedded in other type adapters that require
// workload scheduling with actual pod replicas.
type replicaSchedulingAdapter struct {
preferencesAnnotationName string
updateStatusFunc func(pkgruntime.Object, interface{}) error
}
func (a *schedulingAdapter) IsSchedulingAdapter() bool {
func (a *replicaSchedulingAdapter) IsSchedulingAdapter() bool {
return true
}
func (a *schedulingAdapter) GetSchedule(obj pkgruntime.Object, key string, clusters []*federationapi.Cluster, informer fedutil.FederatedInformer) (interface{}, error) {
func (a *replicaSchedulingAdapter) GetSchedule(obj pkgruntime.Object, key string, clusters []*federationapi.Cluster, informer fedutil.FederatedInformer) (interface{}, error) {
var clusterNames []string
for _, cluster := range clusters {
clusterNames = append(clusterNames, cluster.Name)
@ -128,7 +138,7 @@ func (a *schedulingAdapter) GetSchedule(obj pkgruntime.Object, key string, clust
}, nil
}
func (a *schedulingAdapter) ScheduleObject(cluster *federationapi.Cluster, clusterObj pkgruntime.Object, federationObjCopy pkgruntime.Object, schedulingInfo interface{}) (pkgruntime.Object, bool, error) {
func (a *replicaSchedulingAdapter) ScheduleObject(cluster *federationapi.Cluster, clusterObj pkgruntime.Object, federationObjCopy pkgruntime.Object, schedulingInfo interface{}) (pkgruntime.Object, ScheduleAction, error) {
typedSchedulingInfo := schedulingInfo.(*ReplicaSchedulingInfo)
replicas, ok := typedSchedulingInfo.Schedule[cluster.Name]
if !ok {
@ -152,11 +162,15 @@ func (a *schedulingAdapter) ScheduleObject(cluster *federationapi.Cluster, clust
}
}
}
return federationObjCopy, replicas > 0, nil
var action ScheduleAction = ""
if replicas > 0 {
action = ActionAdd
}
return federationObjCopy, action, nil
}
func (a *schedulingAdapter) UpdateFederatedStatus(obj pkgruntime.Object, status interface{}) error {
return a.updateStatusFunc(obj, status)
func (a *replicaSchedulingAdapter) UpdateFederatedStatus(obj pkgruntime.Object, schedulingInfo interface{}) error {
return a.updateStatusFunc(obj, schedulingInfo)
}
func schedule(planner *planner.Planner, obj pkgruntime.Object, key string, clusterNames []string, currentReplicasPerCluster map[string]int64, estimatedCapacity map[string]int64) map[string]int64 {

View File

@ -490,8 +490,7 @@ func syncToClusters(clustersAccessor clustersAccessorFunc, operationsAccessor op
if !ok {
glog.Fatalf("Adapter for kind %q does not properly implement SchedulingAdapter.", kind)
}
typedScheduleInfo := schedulingInfo.(*federatedtypes.ReplicaSchedulingInfo)
err = schedulingAdapter.UpdateFederatedStatus(obj, typedScheduleInfo.Status)
err = schedulingAdapter.UpdateFederatedStatus(obj, schedulingInfo)
if err != nil {
runtime.HandleError(fmt.Errorf("adapter.UpdateFinished() failed on adapter for %s %q: %v", kind, key, err))
return statusError
@ -548,7 +547,7 @@ func clusterOperations(adapter federatedtypes.FederatedTypeAdapter, selectedClus
return nil, wrappedErr
}
shouldCreateIfNeeded := true
var scheduleAction federatedtypes.ScheduleAction = federatedtypes.ActionAdd
if adapter.IsSchedulingAdapter() {
schedulingAdapter, ok := adapter.(federatedtypes.SchedulingAdapter)
if !ok {
@ -559,7 +558,7 @@ func clusterOperations(adapter federatedtypes.FederatedTypeAdapter, selectedClus
if clusterObj != nil {
clusterTypedObj = clusterObj.(pkgruntime.Object)
}
desiredObj, shouldCreateIfNeeded, err = schedulingAdapter.ScheduleObject(cluster, clusterTypedObj, desiredObj, schedulingInfo)
desiredObj, scheduleAction, err = schedulingAdapter.ScheduleObject(cluster, clusterTypedObj, desiredObj, schedulingInfo)
if err != nil {
runtime.HandleError(err)
return nil, err
@ -568,11 +567,15 @@ func clusterOperations(adapter federatedtypes.FederatedTypeAdapter, selectedClus
var operationType util.FederatedOperationType = ""
if found {
clusterObj := clusterObj.(pkgruntime.Object)
if !adapter.Equivalent(desiredObj, clusterObj) {
operationType = util.OperationTypeUpdate
if scheduleAction == federatedtypes.ActionDelete {
operationType = util.OperationTypeDelete
} else {
clusterObj := clusterObj.(pkgruntime.Object)
if !adapter.Equivalent(desiredObj, clusterObj) {
operationType = util.OperationTypeUpdate
}
}
} else if shouldCreateIfNeeded {
} else if scheduleAction == federatedtypes.ActionAdd {
operationType = util.OperationTypeAdd
}

View File

@ -446,3 +446,9 @@ func AssertHasFinalizer(t *testing.T, obj runtime.Object, finalizer string) {
require.Nil(t, err)
assert.True(t, hasFinalizer)
}
func NewInt32(val int32) *int32 {
p := new(int32)
*p = val
return p
}