Merge pull request #49583 from irfanurrehman/fed-hpa-configTimeout

Automatic merge from submit-queue (batch tested with PRs 50016, 49583, 49930, 46254, 50337)

[Federation] Make the hpa scale time window configurable

This PR is on top of the open PR https://github.com/kubernetes/kubernetes/pull/45993.
Please review only the last commit in this PR.
It adds a config parameter to the controller manager, whose value is passed to the HPA adapter via the sync controller.
This is needed to reduce the HPA scaling window to much less than the default 2 minutes, so that e2e tests run faster. Please see the comment on the newly added parameter.

**Special notes for your reviewer**:
@kubernetes/sig-federation-pr-reviews 
@quinton-hoole 
@marun, please validate the mechanism used to pass a parameter from the command line to the adapter.

**Release note**:

``` 
federation-controller-manager gets a new flag --hpa-scale-forbidden-window.
This flag configures the duration used by the federation HPA controller to determine whether it can move the max and/or min
replicas of a cluster-local HPA object, by comparing the current time with that HPA's last scale time.
A lower value results in a faster response to scalability conditions reached by cluster-local HPAs on local replicas, but too
low a value can result in thrashing. Higher values result in a slower response to scalability conditions on local replicas.
```
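For a concrete feel of the new flag, here is a minimal stand-alone sketch — not the controller-manager itself, just an equivalently named duration flag registered directly with pflag, the flag library the diffs below use; the 30s value is only an illustration of an e2e-style setting:

```
package main

import (
	"fmt"
	"time"

	"github.com/spf13/pflag"
)

func main() {
	// Default mirrors NewCMServer in the diff below: 2 minutes.
	window := 2 * time.Minute
	fs := pflag.NewFlagSet("federation-controller-manager", pflag.ExitOnError)
	fs.DurationVar(&window, "hpa-scale-forbidden-window", window,
		"Window since the cluster-local HPA's last scale during which the federated HPA will not move max/min replicas")
	// An e2e run would pass a much lower value to speed up scaling decisions.
	_ = fs.Parse([]string{"--hpa-scale-forbidden-window=30s"})
	fmt.Println(window) // 30s
}
```

The real wiring hangs off CMServer.AddFlags, as the options diff below shows.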
Kubernetes Submit Queue 2017-08-09 14:14:27 -07:00 committed by GitHub
commit 82b3a80ad1
17 changed files with 72 additions and 47 deletions

View File

@@ -150,9 +150,11 @@ func StartControllers(s *options.CMServer, restClientCfg *restclient.Config) err
go serviceController.Run(s.ConcurrentServiceSyncs, wait.NeverStop)
}
+adapterSpecificArgs := make(map[string]interface{})
+adapterSpecificArgs[federatedtypes.HpaKind] = &s.HpaScaleForbiddenWindow
for kind, federatedType := range federatedtypes.FederatedTypes() {
if controllerEnabled(s.Controllers, serverResources, federatedType.ControllerName, federatedType.RequiredResources, true) {
-synccontroller.StartFederationSyncController(kind, federatedType.AdapterFactory, restClientCfg, stopChan, minimizeLatency)
+synccontroller.StartFederationSyncController(kind, federatedType.AdapterFactory, restClientCfg, stopChan, minimizeLatency, adapterSpecificArgs)
}
}
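The mechanism under review here is deliberately simple: StartControllers builds one untyped args bag keyed by kind, and the sync controller threads it, unchanged, to every adapter factory; only the HPA factory looks up its key. A condensed, self-contained sketch of that flow (a local Duration type stands in for metav1.Duration):

```
package main

import (
	"fmt"
	"time"
)

// HpaKind mirrors federatedtypes.HpaKind.
const HpaKind = "horizontalpodautoscaler"

// Duration stands in for metav1.Duration in this sketch.
type Duration struct{ Duration time.Duration }

// hpaWindowFrom condenses what NewHpaAdapter does with the args bag
// when its entry is present.
func hpaWindowFrom(args map[string]interface{}) time.Duration {
	return args[HpaKind].(*Duration).Duration
}

func main() {
	// Controller-manager side: one bag, keyed by kind, holding a pointer
	// to the flag-backed duration.
	window := Duration{Duration: 30 * time.Second}
	adapterSpecificArgs := map[string]interface{}{HpaKind: &window}

	// Sync-controller side: the bag reaches the factory untouched.
	fmt.Println(hpaWindowFrom(adapterSpecificArgs)) // 30s
}
```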

View File

@@ -76,6 +76,13 @@ type ControllerManagerConfiguration struct {
ContentType string `json:"contentType"`
// ConfigurationMap determining which controllers should be enabled or disabled
Controllers utilflag.ConfigurationMap `json:"controllers"`
+// HpaScaleForbiddenWindow is the duration used by the federation HPA controller to
+// determine whether it can move the max and/or min replicas of a cluster-local
+// hpa object, by comparing the current time with the last scaled time of that cluster-local hpa.
+// A lower value will result in a faster response to scalability conditions achieved
+// by cluster-local hpas on local replicas, but too low a value can result in thrashing.
+// Higher values will result in a slower response to scalability conditions on local replicas.
+HpaScaleForbiddenWindow metav1.Duration `json:"HpaScaleForbiddenWindow"`
}
// CMServer is the main context object for the controller manager.
@@ -105,6 +112,7 @@ func NewCMServer() *CMServer {
APIServerBurst: 30,
LeaderElection: leaderelectionconfig.DefaultLeaderElectionConfiguration(),
Controllers: make(utilflag.ConfigurationMap),
+HpaScaleForbiddenWindow: metav1.Duration{Duration: 2 * time.Minute},
},
}
return &s
@@ -131,6 +139,7 @@ func (s *CMServer) AddFlags(fs *pflag.FlagSet) {
fs.IntVar(&s.APIServerBurst, "federated-api-burst", s.APIServerBurst, "Burst to use while talking with federation apiserver")
fs.StringVar(&s.DnsProvider, "dns-provider", s.DnsProvider, "DNS provider. Valid values are: "+fmt.Sprintf("%q", dnsprovider.RegisteredDnsProviders()))
fs.StringVar(&s.DnsConfigFile, "dns-provider-config", s.DnsConfigFile, "Path to config file for configuring DNS provider.")
+fs.DurationVar(&s.HpaScaleForbiddenWindow.Duration, "hpa-scale-forbidden-window", s.HpaScaleForbiddenWindow.Duration, "The time window, measured from the cluster-local HPA's last scale time, during which the federated HPA will not move the HPA's max/min replicas around")
fs.Var(&s.Controllers, "controllers", ""+
"A set of key=value pairs that describe controller configuration "+
"to enable/disable specific controllers. Key should be the resource name (like services) and value should be true or false. "+

View File

@@ -62,7 +62,7 @@ type FederatedTypeAdapter interface {
// that create instances of FederatedTypeAdapter. Such methods should
// be registered with RegisterAdapterFactory to ensure the type
// adapter is discoverable.
-type AdapterFactory func(client federationclientset.Interface, config *restclient.Config) FederatedTypeAdapter
+type AdapterFactory func(client federationclientset.Interface, config *restclient.Config, adapterSpecificArgs map[string]interface{}) FederatedTypeAdapter
// SetAnnotation sets the given key and value in the given object's ObjectMeta.Annotations map
func SetAnnotation(adapter FederatedTypeAdapter, obj pkgruntime.Object, key, value string) {

View File

@@ -41,7 +41,7 @@ type ConfigMapAdapter struct {
client federationclientset.Interface
}
-func NewConfigMapAdapter(client federationclientset.Interface, config *restclient.Config) FederatedTypeAdapter {
+func NewConfigMapAdapter(client federationclientset.Interface, config *restclient.Config, adapterSpecificArgs map[string]interface{}) FederatedTypeAdapter {
return &ConfigMapAdapter{client: client}
}

View File

@@ -44,7 +44,7 @@ type DaemonSetAdapter struct {
client federationclientset.Interface
}
-func NewDaemonSetAdapter(client federationclientset.Interface, config *restclient.Config) FederatedTypeAdapter {
+func NewDaemonSetAdapter(client federationclientset.Interface, config *restclient.Config, adapterSpecificArgs map[string]interface{}) FederatedTypeAdapter {
return &DaemonSetAdapter{client: client}
}

View File

@@ -44,7 +44,7 @@ type DeploymentAdapter struct {
client federationclientset.Interface
}
-func NewDeploymentAdapter(client federationclientset.Interface, config *restclient.Config) FederatedTypeAdapter {
+func NewDeploymentAdapter(client federationclientset.Interface, config *restclient.Config, adapterSpecificArgs map[string]interface{}) FederatedTypeAdapter {
schedulingAdapter := replicaSchedulingAdapter{
preferencesAnnotationName: FedDeploymentPreferencesAnnotation,
updateStatusFunc: func(obj pkgruntime.Object, schedulingInfo interface{}) error {

View File

@@ -36,14 +36,14 @@ import (
const (
HpaKind = "horizontalpodautoscaler"
HpaControllerName = "horizontalpodautoscalers"
-// This is a tunable which does not change replica nums
-// on an existing local hpa, before this timeout, if it
-// did scale already (avoids thrashing of replicas around).
-scaleForbiddenWindow = 5 * time.Minute
// This is used as the default min for hpa object submitted
// to federation, in a situation where the default is for
// some reason not present (Spec.MinReplicas == nil)
hpaMinReplicaDefault = int32(1)
+// This is a tunable which does not change replica nums
+// on an existing local hpa, before this timeout, if it
+// did scale already (avoids thrashing of replicas around).
+ScaleForbiddenWindow = 2 * time.Minute
)
func init() {
@@ -52,10 +52,21 @@ func init() {
type HpaAdapter struct {
client federationclientset.Interface
+scaleForbiddenWindow time.Duration
}
-func NewHpaAdapter(client federationclientset.Interface, config *restclient.Config) FederatedTypeAdapter {
-return &HpaAdapter{client: client}
+func NewHpaAdapter(client federationclientset.Interface, config *restclient.Config, adapterSpecificArgs map[string]interface{}) FederatedTypeAdapter {
+var scaleForbiddenWindow time.Duration
+if adapterSpecificArgs != nil && adapterSpecificArgs[HpaKind] != nil {
+scaleForbiddenWindow = adapterSpecificArgs[HpaKind].(*metav1.Duration).Duration
+} else {
+scaleForbiddenWindow = ScaleForbiddenWindow
+}
+return &HpaAdapter{
+client: client,
+scaleForbiddenWindow: scaleForbiddenWindow,
+}
}
func (a *HpaAdapter) Kind() string {
@@ -237,7 +248,7 @@ func (a *HpaAdapter) GetSchedule(obj pkgruntime.Object, key string, clusters []*
}
return &hpaSchedulingInfo{
-scheduleState: getHpaScheduleState(obj, currentClusterObjs),
+scheduleState: a.getHpaScheduleState(obj, currentClusterObjs),
fedStatus: fedStatus,
}, nil
}
@@ -288,7 +299,7 @@ func getCurrentClusterObjs(informer fedutil.FederatedInformer, key string, clust
//
// The above algorithm is run to first distribute max and then distribute min to those clusters
// which get max.
-func getHpaScheduleState(fedObj pkgruntime.Object, currentObjs map[string]pkgruntime.Object) map[string]*replicaNums {
+func (a *HpaAdapter) getHpaScheduleState(fedObj pkgruntime.Object, currentObjs map[string]pkgruntime.Object) map[string]*replicaNums {
fedHpa := fedObj.(*autoscalingv1.HorizontalPodAutoscaler)
requestedMin := hpaMinReplicaDefault
if fedHpa.Spec.MinReplicas != nil {
@@ -322,7 +333,7 @@ func getHpaScheduleState(fedObj pkgruntime.Object, currentObjs map[string]pkgrun
// beyond min/max limits.
// schedStatus currently have status of existing hpas.
// It will eventually have desired status for this reconcile.
-clusterLists, currentReplicas, scheduleState := prepareForScheduling(currentObjs)
+clusterLists, currentReplicas, scheduleState := a.prepareForScheduling(currentObjs)
remainingReplicas := replicaNums{
min: requestedReplicas.min - currentReplicas.min,
@@ -362,7 +373,7 @@ func getHpaScheduleState(fedObj pkgruntime.Object, currentObjs map[string]pkgrun
// We then go ahead to give the replicas to those which do not
// have any hpa. In this pass however we try to ensure that all
// our Max are consumed in this reconcile.
-distributeMaxReplicas(toDistribute.max, clusterLists, rdc, currentObjs, scheduleState)
+a.distributeMaxReplicas(toDistribute.max, clusterLists, rdc, currentObjs, scheduleState)
// We distribute min to those clusters which:
// 1 - can adjust min (our increase step would be only 1)
@@ -371,7 +382,7 @@ func getHpaScheduleState(fedObj pkgruntime.Object, currentObjs map[string]pkgrun
// some clusters still needing them. We adjust this in finalise by
// assigning min replicas to 1 into those clusters which got max
// but min remains 0.
-distributeMinReplicas(toDistribute.min, clusterLists, rdc, currentObjs, scheduleState)
+a.distributeMinReplicas(toDistribute.min, clusterLists, rdc, currentObjs, scheduleState)
return finaliseScheduleState(scheduleState)
}
@@ -474,7 +485,7 @@ func updateStatus(fedHpa *autoscalingv1.HorizontalPodAutoscaler, newStatus hpaFe
// existing objs.
// currentObjs has the list of all clusters, with obj as nil
// for those clusters which do not have hpa yet.
-func prepareForScheduling(currentObjs map[string]pkgruntime.Object) (hpaLists, replicaNums, map[string]*replicaNums) {
+func (a *HpaAdapter) prepareForScheduling(currentObjs map[string]pkgruntime.Object) (hpaLists, replicaNums, map[string]*replicaNums) {
lists := hpaLists{
availableMax: sets.NewString(),
availableMin: sets.NewString(),
@@ -493,10 +504,10 @@ func prepareForScheduling(currentObjs map[string]pkgruntime.Object) (hpaLists, r
continue
}
-if maxReplicasReducible(obj) {
+if a.maxReplicasReducible(obj) {
lists.availableMax.Insert(cluster)
}
-if minReplicasReducible(obj) {
+if a.minReplicasReducible(obj) {
lists.availableMin.Insert(cluster)
}
@@ -609,7 +620,7 @@ func reduceMaxReplicas(excessMax int32, availableMaxList sets.String, scheduled
// rdc: replicadistributioncount for max and min.
// currentObjs: list of current cluster hpas.
// scheduled: schedule state which will be updated in place.
-func distributeMaxReplicas(toDistributeMax int32, lists hpaLists, rdc replicaNums,
+func (a *HpaAdapter) distributeMaxReplicas(toDistributeMax int32, lists hpaLists, rdc replicaNums,
currentObjs map[string]pkgruntime.Object, scheduled map[string]*replicaNums) int32 {
for cluster, replicas := range scheduled {
if toDistributeMax == 0 {
@@ -618,7 +629,7 @@ func distributeMaxReplicas(toDistributeMax int32, lists hpaLists, rdc replicaNum
if replicas == nil {
continue
}
-if maxReplicasNeeded(currentObjs[cluster]) {
+if a.maxReplicasNeeded(currentObjs[cluster]) {
replicas.max++
if lists.availableMax.Len() > 0 {
popped, notEmpty := lists.availableMax.PopAny()
@@ -708,7 +719,7 @@ func distributeMaxReplicas(toDistributeMax int32, lists hpaLists, rdc replicaNum
// rdc: replicadistributioncount for max and min.
// currentObjs: list of current cluster hpas.
// scheduled: schedule state which will be updated in place.
-func distributeMinReplicas(toDistributeMin int32, lists hpaLists, rdc replicaNums,
+func (a *HpaAdapter) distributeMinReplicas(toDistributeMin int32, lists hpaLists, rdc replicaNums,
currentObjs map[string]pkgruntime.Object, scheduled map[string]*replicaNums) int32 {
for cluster, replicas := range scheduled {
if toDistributeMin == 0 {
@@ -719,7 +730,7 @@ func distributeMinReplicas(toDistributeMin int32, lists hpaLists, rdc replicaNum
if replicas == nil || currentObjs[cluster] == nil {
continue
}
-if minReplicasIncreasable(currentObjs[cluster]) {
+if a.minReplicasIncreasable(currentObjs[cluster]) {
if lists.availableMin.Len() > 0 {
popped, notEmpty := lists.availableMin.PopAny()
if notEmpty {
@@ -842,18 +853,18 @@ func isPristine(hpa *autoscalingv1.HorizontalPodAutoscaler) bool {
// isScaleable tells whether a reasonable amount of time has already
// passed since this hpa last scaled. It's used to avoid fast thrashing.
-func isScaleable(hpa *autoscalingv1.HorizontalPodAutoscaler) bool {
+func (a *HpaAdapter) isScaleable(hpa *autoscalingv1.HorizontalPodAutoscaler) bool {
if hpa.Status.LastScaleTime == nil {
return false
}
-t := hpa.Status.LastScaleTime.Add(scaleForbiddenWindow)
+t := hpa.Status.LastScaleTime.Add(a.scaleForbiddenWindow)
if t.After(time.Now()) {
return false
}
return true
}
-func maxReplicasReducible(obj pkgruntime.Object) bool {
+func (a *HpaAdapter) maxReplicasReducible(obj pkgruntime.Object) bool {
hpa := obj.(*autoscalingv1.HorizontalPodAutoscaler)
if (hpa.Spec.MinReplicas != nil) &&
(((hpa.Spec.MaxReplicas - 1) - *hpa.Spec.MinReplicas) < 0) {
@@ -862,7 +873,7 @@ func maxReplicasReducible(obj pkgruntime.Object) bool {
if isPristine(hpa) {
return true
}
-if !isScaleable(hpa) {
+if !a.isScaleable(hpa) {
return false
}
if (hpa.Status.DesiredReplicas < hpa.Status.CurrentReplicas) ||
@@ -879,14 +890,14 @@ func maxReplicasReducible(obj pkgruntime.Object) bool {
// are not being used here, the max adjustment will lead it to become equal to min,
// but will not be able to scale down further and offer max to some other cluster
// which needs replicas.
-func minReplicasReducible(obj pkgruntime.Object) bool {
+func (a *HpaAdapter) minReplicasReducible(obj pkgruntime.Object) bool {
hpa := obj.(*autoscalingv1.HorizontalPodAutoscaler)
if isPristine(hpa) && (hpa.Spec.MinReplicas != nil) &&
(*hpa.Spec.MinReplicas > 1) &&
(*hpa.Spec.MinReplicas <= hpa.Spec.MaxReplicas) {
return true
}
-if !isScaleable(hpa) {
+if !a.isScaleable(hpa) {
return false
}
if (hpa.Spec.MinReplicas != nil) &&
@@ -898,9 +909,9 @@ func minReplicasReducible(obj pkgruntime.Object) bool {
return false
}
-func maxReplicasNeeded(obj pkgruntime.Object) bool {
+func (a *HpaAdapter) maxReplicasNeeded(obj pkgruntime.Object) bool {
hpa := obj.(*autoscalingv1.HorizontalPodAutoscaler)
-if !isScaleable(hpa) {
+if !a.isScaleable(hpa) {
return false
}
@@ -911,9 +922,9 @@ func maxReplicasNeeded(obj pkgruntime.Object) bool {
return false
}
-func minReplicasIncreasable(obj pkgruntime.Object) bool {
+func (a *HpaAdapter) minReplicasIncreasable(obj pkgruntime.Object) bool {
hpa := obj.(*autoscalingv1.HorizontalPodAutoscaler)
-if !isScaleable(hpa) ||
+if !a.isScaleable(hpa) ||
((hpa.Spec.MinReplicas != nil) &&
(*hpa.Spec.MinReplicas) >= hpa.Spec.MaxReplicas) {
return false
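The behavioral core of this file's change is that isScaleable now reads the window from the adapter instead of a package constant. A minimal stand-alone sketch of the same check with concrete numbers (the nil LastScaleTime guard is omitted and the names are local to the sketch):

```
package main

import (
	"fmt"
	"time"
)

// isScaleable mirrors the method above: an HPA may be rescheduled only
// once the forbidden window has fully elapsed since its last scale.
func isScaleable(lastScale time.Time, window time.Duration) bool {
	return !lastScale.Add(window).After(time.Now())
}

func main() {
	window := 2 * time.Minute // the package default
	fmt.Println(isScaleable(time.Now().Add(-90*time.Second), window)) // false: still inside the window
	fmt.Println(isScaleable(time.Now().Add(-3*time.Minute), window))  // true: the window has elapsed
}
```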

View File

@@ -18,7 +18,6 @@ package federatedtypes
import (
"testing"
-"time"
autoscalingv1 "k8s.io/api/autoscaling/v1"
apiv1 "k8s.io/api/core/v1"
@@ -191,12 +190,15 @@ func TestGetHpaScheduleState(t *testing.T) {
},
}
+adapter := &HpaAdapter{
+scaleForbiddenWindow: ScaleForbiddenWindow,
+}
for testName, testCase := range testCases {
t.Run(testName, func(t *testing.T) {
if testCase.fedHpa == nil {
testCase.fedHpa = defaultFedHpa
}
-scheduledState := getHpaScheduleState(testCase.fedHpa, testCase.localHpas)
+scheduledState := adapter.getHpaScheduleState(testCase.fedHpa, testCase.localHpas)
checkClusterConditions(t, testCase.fedHpa, scheduledState)
if testCase.expectedReplicas != nil {
for cluster, replicas := range testCase.expectedReplicas {
@@ -216,8 +218,8 @@ func updateHpaStatus(hpa *autoscalingv1.HorizontalPodAutoscaler, currentUtilisat
now := metav1.Now()
scaledTime := now
if scaleable {
-// definitely more than 5 minutes ago
-scaledTime = metav1.NewTime(now.Time.Add(-6 * time.Minute))
+// definitely more than ScaleForbiddenWindow ago
+scaledTime = metav1.NewTime(now.Time.Add(-2 * ScaleForbiddenWindow))
}
hpa.Status.LastScaleTime = &scaledTime
return hpa

View File

@@ -50,7 +50,7 @@ type NamespaceAdapter struct {
deleter deletion.NamespacedResourcesDeleterInterface
}
-func NewNamespaceAdapter(client federationclientset.Interface, config *restclient.Config) FederatedTypeAdapter {
+func NewNamespaceAdapter(client federationclientset.Interface, config *restclient.Config, adapterSpecificArgs map[string]interface{}) FederatedTypeAdapter {
dynamicClientPool := dynamic.NewDynamicClientPool(config)
discoverResourcesFunc := client.Discovery().ServerPreferredNamespacedResources
deleter := deletion.NewNamespacedResourcesDeleter(

View File

@@ -44,7 +44,7 @@ type ReplicaSetAdapter struct {
client federationclientset.Interface
}
-func NewReplicaSetAdapter(client federationclientset.Interface, config *restclient.Config) FederatedTypeAdapter {
+func NewReplicaSetAdapter(client federationclientset.Interface, config *restclient.Config, adapterSpecificArgs map[string]interface{}) FederatedTypeAdapter {
replicaSchedulingAdapter := replicaSchedulingAdapter{
preferencesAnnotationName: FedReplicaSetPreferencesAnnotation,
updateStatusFunc: func(obj pkgruntime.Object, schedulingInfo interface{}) error {

View File

@@ -41,7 +41,7 @@ type SecretAdapter struct {
client federationclientset.Interface
}
-func NewSecretAdapter(client federationclientset.Interface, config *restclient.Config) FederatedTypeAdapter {
+func NewSecretAdapter(client federationclientset.Interface, config *restclient.Config, adapterSpecificArgs map[string]interface{}) FederatedTypeAdapter {
return &SecretAdapter{client: client}
}

View File

@@ -93,10 +93,10 @@ type FederationSyncController struct {
}
// StartFederationSyncController starts a new sync controller for a type adapter
-func StartFederationSyncController(kind string, adapterFactory federatedtypes.AdapterFactory, config *restclient.Config, stopChan <-chan struct{}, minimizeLatency bool) {
+func StartFederationSyncController(kind string, adapterFactory federatedtypes.AdapterFactory, config *restclient.Config, stopChan <-chan struct{}, minimizeLatency bool, adapterSpecificArgs map[string]interface{}) {
restclient.AddUserAgent(config, fmt.Sprintf("federation-%s-controller", kind))
client := federationclientset.NewForConfigOrDie(config)
-adapter := adapterFactory(client, config)
+adapter := adapterFactory(client, config, adapterSpecificArgs)
controller := newFederationSyncController(client, adapter)
if minimizeLatency {
controller.minimizeLatency()

View File

@@ -333,6 +333,7 @@ host-network-sources
host-pid-sources
host-port-endpoints
host-system-namespace
+hpa-scale-forbidden-window
http-check-frequency
http-port
ignore-daemonsets

View File

@@ -45,7 +45,7 @@ var _ = framework.KubeDescribe("Federated types [Feature:Federation][Experimenta
if clusterClients == nil {
clusterClients = f.GetClusterClients()
}
-adapter := fedType.AdapterFactory(f.FederationClientset, f.FederationConfig)
+adapter := fedType.AdapterFactory(f.FederationClientset, f.FederationConfig, nil)
crudTester := fedframework.NewFederatedTypeCRUDTester(adapter, clusterClients)
obj := adapter.NewTestObject(f.FederationNamespace.Name)
crudTester.CheckLifecycle(obj)

View File

@@ -38,7 +38,7 @@ type SimpleUpgradeTest struct {
// Setup creates a resource and validates its propagation to member clusters
func (ut *SimpleUpgradeTest) Setup(f *fedframework.Framework) {
-adapter := ut.adapterFactory(f.FederationClientset, f.FederationConfig)
+adapter := ut.adapterFactory(f.FederationClientset, f.FederationConfig, nil)
clients := f.GetClusterClients()
ut.crudTester = fedframework.NewFederatedTypeCRUDTester(adapter, clients)

View File

@@ -104,7 +104,7 @@ func initCRUDTest(t *testing.T, fedFixture *framework.FederationFixture, adapter
fixture := framework.NewControllerFixture(t, kind, adapterFactory, config)
client := fedFixture.APIFixture.NewClient(fmt.Sprintf("crud-test-%s", kind))
-adapter := adapterFactory(client, config)
+adapter := adapterFactory(client, config, nil)
crudTester := framework.NewFederatedTypeCRUDTester(t, adapter, fedFixture.ClusterClients)

View File

@@ -34,7 +34,7 @@ func NewControllerFixture(t *testing.T, kind string, adapterFactory federatedtyp
f := &ControllerFixture{
stopChan: make(chan struct{}),
}
-synccontroller.StartFederationSyncController(kind, adapterFactory, config, f.stopChan, true)
+synccontroller.StartFederationSyncController(kind, adapterFactory, config, f.stopChan, true, nil)
return f
}
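Note that the tests and fixtures above pass nil for adapterSpecificArgs, so NewHpaAdapter falls back to the package default of 2 minutes. A self-contained sketch of that fallback branch (local stand-ins for metav1.Duration and the package constants):

```
package main

import (
	"fmt"
	"time"
)

const (
	HpaKind              = "horizontalpodautoscaler" // mirrors federatedtypes.HpaKind
	ScaleForbiddenWindow = 2 * time.Minute           // the package default
)

// Duration stands in for metav1.Duration in this sketch.
type Duration struct{ Duration time.Duration }

// windowFrom condenses the branch in NewHpaAdapter above.
func windowFrom(args map[string]interface{}) time.Duration {
	if args != nil && args[HpaKind] != nil {
		return args[HpaKind].(*Duration).Duration
	}
	return ScaleForbiddenWindow
}

func main() {
	fmt.Println(windowFrom(nil)) // 2m0s: what the fixtures above get
	fmt.Println(windowFrom(map[string]interface{}{
		HpaKind: &Duration{Duration: 30 * time.Second},
	})) // 30s: what a flag-configured controller manager gets
}
```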