Use informer cache instead of active pod gets in HPA controller.

This commit is contained in:
Krzysztof Jastrzebski 2018-09-04 20:16:48 +02:00
parent be11540775
commit 985ba931b1
8 changed files with 178 additions and 87 deletions

View File

@ -80,22 +80,19 @@ func startHPAControllerWithMetricsClient(ctx ControllerContext, metricsClient me
return nil, false, err return nil, false, err
} }
replicaCalc := podautoscaler.NewReplicaCalculator(
metricsClient,
hpaClient.CoreV1(),
ctx.ComponentConfig.HPAController.HorizontalPodAutoscalerTolerance,
ctx.ComponentConfig.HPAController.HorizontalPodAutoscalerCPUInitializationPeriod.Duration,
ctx.ComponentConfig.HPAController.HorizontalPodAutoscalerInitialReadinessDelay.Duration,
)
go podautoscaler.NewHorizontalController( go podautoscaler.NewHorizontalController(
hpaClient.CoreV1(), hpaClient.CoreV1(),
scaleClient, scaleClient,
hpaClient.AutoscalingV1(), hpaClient.AutoscalingV1(),
ctx.RESTMapper, ctx.RESTMapper,
replicaCalc, metricsClient,
ctx.InformerFactory.Autoscaling().V1().HorizontalPodAutoscalers(), ctx.InformerFactory.Autoscaling().V1().HorizontalPodAutoscalers(),
ctx.InformerFactory.Core().V1().Pods(),
ctx.ComponentConfig.HPAController.HorizontalPodAutoscalerSyncPeriod.Duration, ctx.ComponentConfig.HPAController.HorizontalPodAutoscalerSyncPeriod.Duration,
ctx.ComponentConfig.HPAController.HorizontalPodAutoscalerDownscaleStabilizationWindow.Duration, ctx.ComponentConfig.HPAController.HorizontalPodAutoscalerDownscaleStabilizationWindow.Duration,
ctx.ComponentConfig.HPAController.HorizontalPodAutoscalerTolerance,
ctx.ComponentConfig.HPAController.HorizontalPodAutoscalerCPUInitializationPeriod.Duration,
ctx.ComponentConfig.HPAController.HorizontalPodAutoscalerInitialReadinessDelay.Duration,
).Run(ctx.Stop) ).Run(ctx.Stop)
return nil, true, nil return nil, true, nil
} }

View File

@ -30,10 +30,12 @@ go_library(
"//staging/src/k8s.io/apimachinery/pkg/util/sets:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/util/sets:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library",
"//staging/src/k8s.io/client-go/informers/autoscaling/v1:go_default_library", "//staging/src/k8s.io/client-go/informers/autoscaling/v1:go_default_library",
"//staging/src/k8s.io/client-go/informers/core/v1:go_default_library",
"//staging/src/k8s.io/client-go/kubernetes/scheme:go_default_library", "//staging/src/k8s.io/client-go/kubernetes/scheme:go_default_library",
"//staging/src/k8s.io/client-go/kubernetes/typed/autoscaling/v1:go_default_library", "//staging/src/k8s.io/client-go/kubernetes/typed/autoscaling/v1:go_default_library",
"//staging/src/k8s.io/client-go/kubernetes/typed/core/v1:go_default_library", "//staging/src/k8s.io/client-go/kubernetes/typed/core/v1:go_default_library",
"//staging/src/k8s.io/client-go/listers/autoscaling/v1:go_default_library", "//staging/src/k8s.io/client-go/listers/autoscaling/v1:go_default_library",
"//staging/src/k8s.io/client-go/listers/core/v1:go_default_library",
"//staging/src/k8s.io/client-go/scale:go_default_library", "//staging/src/k8s.io/client-go/scale:go_default_library",
"//staging/src/k8s.io/client-go/tools/cache:go_default_library", "//staging/src/k8s.io/client-go/tools/cache:go_default_library",
"//staging/src/k8s.io/client-go/tools/record:go_default_library", "//staging/src/k8s.io/client-go/tools/record:go_default_library",
@ -83,7 +85,6 @@ go_test(
"//staging/src/k8s.io/metrics/pkg/client/clientset/versioned/fake:go_default_library", "//staging/src/k8s.io/metrics/pkg/client/clientset/versioned/fake:go_default_library",
"//staging/src/k8s.io/metrics/pkg/client/custom_metrics/fake:go_default_library", "//staging/src/k8s.io/metrics/pkg/client/custom_metrics/fake:go_default_library",
"//staging/src/k8s.io/metrics/pkg/client/external_metrics/fake:go_default_library", "//staging/src/k8s.io/metrics/pkg/client/external_metrics/fake:go_default_library",
"//vendor/github.com/golang/glog:go_default_library",
"//vendor/github.com/stretchr/testify/assert:go_default_library", "//vendor/github.com/stretchr/testify/assert:go_default_library",
"//vendor/github.com/stretchr/testify/require:go_default_library", "//vendor/github.com/stretchr/testify/require:go_default_library",
"//vendor/k8s.io/heapster/metrics/api/v1/types:go_default_library", "//vendor/k8s.io/heapster/metrics/api/v1/types:go_default_library",

View File

@ -36,16 +36,19 @@ import (
utilruntime "k8s.io/apimachinery/pkg/util/runtime" utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/wait" "k8s.io/apimachinery/pkg/util/wait"
autoscalinginformers "k8s.io/client-go/informers/autoscaling/v1" autoscalinginformers "k8s.io/client-go/informers/autoscaling/v1"
coreinformers "k8s.io/client-go/informers/core/v1"
"k8s.io/client-go/kubernetes/scheme" "k8s.io/client-go/kubernetes/scheme"
autoscalingclient "k8s.io/client-go/kubernetes/typed/autoscaling/v1" autoscalingclient "k8s.io/client-go/kubernetes/typed/autoscaling/v1"
v1core "k8s.io/client-go/kubernetes/typed/core/v1" v1core "k8s.io/client-go/kubernetes/typed/core/v1"
autoscalinglisters "k8s.io/client-go/listers/autoscaling/v1" autoscalinglisters "k8s.io/client-go/listers/autoscaling/v1"
corelisters "k8s.io/client-go/listers/core/v1"
scaleclient "k8s.io/client-go/scale" scaleclient "k8s.io/client-go/scale"
"k8s.io/client-go/tools/cache" "k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/record" "k8s.io/client-go/tools/record"
"k8s.io/client-go/util/workqueue" "k8s.io/client-go/util/workqueue"
"k8s.io/kubernetes/pkg/api/legacyscheme" "k8s.io/kubernetes/pkg/api/legacyscheme"
"k8s.io/kubernetes/pkg/controller" "k8s.io/kubernetes/pkg/controller"
metricsclient "k8s.io/kubernetes/pkg/controller/podautoscaler/metrics"
) )
var ( var (
@ -76,6 +79,11 @@ type HorizontalController struct {
hpaLister autoscalinglisters.HorizontalPodAutoscalerLister hpaLister autoscalinglisters.HorizontalPodAutoscalerLister
hpaListerSynced cache.InformerSynced hpaListerSynced cache.InformerSynced
// podLister is able to list/get Pods from the shared cache from the informer passed in to
// NewHorizontalController.
podLister corelisters.PodLister
podListerSynced cache.InformerSynced
// Controllers that need to be synced // Controllers that need to be synced
queue workqueue.RateLimitingInterface queue workqueue.RateLimitingInterface
@ -89,10 +97,14 @@ func NewHorizontalController(
scaleNamespacer scaleclient.ScalesGetter, scaleNamespacer scaleclient.ScalesGetter,
hpaNamespacer autoscalingclient.HorizontalPodAutoscalersGetter, hpaNamespacer autoscalingclient.HorizontalPodAutoscalersGetter,
mapper apimeta.RESTMapper, mapper apimeta.RESTMapper,
replicaCalc *ReplicaCalculator, metricsClient metricsclient.MetricsClient,
hpaInformer autoscalinginformers.HorizontalPodAutoscalerInformer, hpaInformer autoscalinginformers.HorizontalPodAutoscalerInformer,
podInformer coreinformers.PodInformer,
resyncPeriod time.Duration, resyncPeriod time.Duration,
downscaleStabilisationWindow time.Duration, downscaleStabilisationWindow time.Duration,
tolerance float64,
cpuInitializationPeriod,
delayOfInitialReadinessStatus time.Duration,
) *HorizontalController { ) *HorizontalController {
broadcaster := record.NewBroadcaster() broadcaster := record.NewBroadcaster()
@ -101,7 +113,6 @@ func NewHorizontalController(
recorder := broadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "horizontal-pod-autoscaler"}) recorder := broadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "horizontal-pod-autoscaler"})
hpaController := &HorizontalController{ hpaController := &HorizontalController{
replicaCalc: replicaCalc,
eventRecorder: recorder, eventRecorder: recorder,
scaleNamespacer: scaleNamespacer, scaleNamespacer: scaleNamespacer,
hpaNamespacer: hpaNamespacer, hpaNamespacer: hpaNamespacer,
@ -122,6 +133,18 @@ func NewHorizontalController(
hpaController.hpaLister = hpaInformer.Lister() hpaController.hpaLister = hpaInformer.Lister()
hpaController.hpaListerSynced = hpaInformer.Informer().HasSynced hpaController.hpaListerSynced = hpaInformer.Informer().HasSynced
hpaController.podLister = podInformer.Lister()
hpaController.podListerSynced = podInformer.Informer().HasSynced
replicaCalc := NewReplicaCalculator(
metricsClient,
hpaController.podLister,
tolerance,
cpuInitializationPeriod,
delayOfInitialReadinessStatus,
)
hpaController.replicaCalc = replicaCalc
return hpaController return hpaController
} }
@ -133,7 +156,7 @@ func (a *HorizontalController) Run(stopCh <-chan struct{}) {
glog.Infof("Starting HPA controller") glog.Infof("Starting HPA controller")
defer glog.Infof("Shutting down HPA controller") defer glog.Infof("Shutting down HPA controller")
if !controller.WaitForCacheSync("HPA", stopCh, a.hpaListerSynced) { if !controller.WaitForCacheSync("HPA", stopCh, a.hpaListerSynced, a.podListerSynced) {
return return
} }

View File

@ -51,7 +51,6 @@ import (
"github.com/stretchr/testify/assert" "github.com/stretchr/testify/assert"
"github.com/golang/glog"
_ "k8s.io/kubernetes/pkg/apis/autoscaling/install" _ "k8s.io/kubernetes/pkg/apis/autoscaling/install"
_ "k8s.io/kubernetes/pkg/apis/extensions/install" _ "k8s.io/kubernetes/pkg/apis/extensions/install"
) )
@ -662,8 +661,6 @@ func (tc *testCase) setupController(t *testing.T) (*HorizontalController, inform
return true, obj, nil return true, obj, nil
}) })
replicaCalc := NewReplicaCalculator(metricsClient, testClient.Core(), defaultTestingTolerance, defaultTestingCpuInitializationPeriod, defaultTestingDelayOfInitialReadinessStatus)
informerFactory := informers.NewSharedInformerFactory(testClient, controller.NoResyncPeriodFunc()) informerFactory := informers.NewSharedInformerFactory(testClient, controller.NoResyncPeriodFunc())
defaultDownscalestabilizationWindow := 5 * time.Minute defaultDownscalestabilizationWindow := 5 * time.Minute
@ -672,10 +669,14 @@ func (tc *testCase) setupController(t *testing.T) (*HorizontalController, inform
testScaleClient, testScaleClient,
testClient.Autoscaling(), testClient.Autoscaling(),
testrestmapper.TestOnlyStaticRESTMapper(legacyscheme.Scheme), testrestmapper.TestOnlyStaticRESTMapper(legacyscheme.Scheme),
replicaCalc, metricsClient,
informerFactory.Autoscaling().V1().HorizontalPodAutoscalers(), informerFactory.Autoscaling().V1().HorizontalPodAutoscalers(),
informerFactory.Core().V1().Pods(),
controller.NoResyncPeriodFunc(), controller.NoResyncPeriodFunc(),
defaultDownscalestabilizationWindow, defaultDownscalestabilizationWindow,
defaultTestingTolerance,
defaultTestingCpuInitializationPeriod,
defaultTestingDelayOfInitialReadinessStatus,
) )
hpaController.hpaListerSynced = alwaysReady hpaController.hpaListerSynced = alwaysReady
if tc.recommendations != nil { if tc.recommendations != nil {
@ -715,7 +716,6 @@ func (tc *testCase) runTestWithController(t *testing.T, hpaController *Horizonta
func (tc *testCase) runTest(t *testing.T) { func (tc *testCase) runTest(t *testing.T) {
hpaController, informerFactory := tc.setupController(t) hpaController, informerFactory := tc.setupController(t)
tc.runTestWithController(t, hpaController, informerFactory) tc.runTestWithController(t, hpaController, informerFactory)
glog.Errorf("recommendations: %+v", hpaController.recommendations)
} }
func TestScaleUp(t *testing.T) { func TestScaleUp(t *testing.T) {
@ -2227,11 +2227,12 @@ func TestScaleDownRCImmediately(t *testing.T) {
} }
func TestAvoidUncessaryUpdates(t *testing.T) { func TestAvoidUncessaryUpdates(t *testing.T) {
now := metav1.Time{Time: time.Now().Add(-time.Hour)}
tc := testCase{ tc := testCase{
minReplicas: 2, minReplicas: 2,
maxReplicas: 6, maxReplicas: 6,
initialReplicas: 3, initialReplicas: 2,
expectedDesiredReplicas: 3, expectedDesiredReplicas: 2,
CPUTarget: 30, CPUTarget: 30,
CPUCurrent: 40, CPUCurrent: 40,
verifyCPUCurrent: true, verifyCPUCurrent: true,
@ -2239,41 +2240,95 @@ func TestAvoidUncessaryUpdates(t *testing.T) {
reportedCPURequests: []resource.Quantity{resource.MustParse("1.0"), resource.MustParse("1.0"), resource.MustParse("1.0")}, reportedCPURequests: []resource.Quantity{resource.MustParse("1.0"), resource.MustParse("1.0"), resource.MustParse("1.0")},
reportedPodStartTime: []metav1.Time{coolCpuCreationTime(), hotCpuCreationTime(), hotCpuCreationTime()}, reportedPodStartTime: []metav1.Time{coolCpuCreationTime(), hotCpuCreationTime(), hotCpuCreationTime()},
useMetricsAPI: true, useMetricsAPI: true,
lastScaleTime: &now,
} }
testClient, _, _, _, _ := tc.prepareTestClient(t) testClient, _, _, _, _ := tc.prepareTestClient(t)
tc.testClient = testClient tc.testClient = testClient
var savedHPA *autoscalingv1.HorizontalPodAutoscaler
testClient.PrependReactor("list", "horizontalpodautoscalers", func(action core.Action) (handled bool, ret runtime.Object, err error) { testClient.PrependReactor("list", "horizontalpodautoscalers", func(action core.Action) (handled bool, ret runtime.Object, err error) {
tc.Lock() tc.Lock()
defer tc.Unlock() defer tc.Unlock()
// fake out the verification logic and mark that we're done processing
go func() {
// wait a tick and then mark that we're finished (otherwise, we have no
// way to indicate that we're finished, because the function decides not to do anything)
time.Sleep(1 * time.Second)
tc.statusUpdated = true
tc.processed <- "test-hpa"
}()
if savedHPA != nil { quantity := resource.MustParse("400m")
// fake out the verification logic and mark that we're done processing obj := &autoscalingv2.HorizontalPodAutoscalerList{
go func() { Items: []autoscalingv2.HorizontalPodAutoscaler{
// wait a tick and then mark that we're finished (otherwise, we have no {
// way to indicate that we're finished, because the function decides not to do anything) ObjectMeta: metav1.ObjectMeta{
time.Sleep(1 * time.Second) Name: "test-hpa",
tc.statusUpdated = true Namespace: "test-namespace",
tc.processed <- "test-hpa" SelfLink: "experimental/v1/namespaces/test-namespace/horizontalpodautoscalers/test-hpa",
}() },
return true, &autoscalingv1.HorizontalPodAutoscalerList{ Spec: autoscalingv2.HorizontalPodAutoscalerSpec{
Items: []autoscalingv1.HorizontalPodAutoscaler{*savedHPA}, ScaleTargetRef: autoscalingv2.CrossVersionObjectReference{
}, nil Kind: "ReplicationController",
Name: "test-rc",
APIVersion: "v1",
},
MinReplicas: &tc.minReplicas,
MaxReplicas: tc.maxReplicas,
},
Status: autoscalingv2.HorizontalPodAutoscalerStatus{
CurrentReplicas: tc.initialReplicas,
DesiredReplicas: tc.initialReplicas,
LastScaleTime: tc.lastScaleTime,
CurrentMetrics: []autoscalingv2.MetricStatus{
{
Type: autoscalingv2.ResourceMetricSourceType,
Resource: &autoscalingv2.ResourceMetricStatus{
Name: v1.ResourceCPU,
Current: autoscalingv2.MetricValueStatus{
AverageValue: &quantity,
AverageUtilization: &tc.CPUCurrent,
},
},
},
},
Conditions: []autoscalingv2.HorizontalPodAutoscalerCondition{
{
Type: autoscalingv2.AbleToScale,
Status: v1.ConditionTrue,
LastTransitionTime: *tc.lastScaleTime,
Reason: "ReadyForNewScale",
Message: "recommended size matches current size",
},
{
Type: autoscalingv2.ScalingActive,
Status: v1.ConditionTrue,
LastTransitionTime: *tc.lastScaleTime,
Reason: "ValidMetricFound",
Message: "the HPA was able to successfully calculate a replica count from cpu resource utilization (percentage of request)",
},
{
Type: autoscalingv2.ScalingLimited,
Status: v1.ConditionTrue,
LastTransitionTime: *tc.lastScaleTime,
Reason: "TooFewReplicas",
Message: "the desired replica count is more than the maximum replica count",
},
},
},
},
},
}
// and... convert to autoscaling v1 to return the right type
objv1, err := unsafeConvertToVersionVia(obj, autoscalingv1.SchemeGroupVersion)
if err != nil {
return true, nil, err
} }
// fallthrough return true, objv1, nil
return false, nil, nil
}) })
testClient.PrependReactor("update", "horizontalpodautoscalers", func(action core.Action) (handled bool, ret runtime.Object, err error) { testClient.PrependReactor("update", "horizontalpodautoscalers", func(action core.Action) (handled bool, ret runtime.Object, err error) {
tc.Lock() tc.Lock()
defer tc.Unlock() defer tc.Unlock()
if savedHPA == nil {
// save the HPA and return it
savedHPA = action.(core.UpdateAction).GetObject().(*autoscalingv1.HorizontalPodAutoscaler)
return true, savedHPA, nil
}
assert.Fail(t, "should not have attempted to update the HPA when nothing changed") assert.Fail(t, "should not have attempted to update the HPA when nothing changed")
// mark that we've processed this HPA // mark that we've processed this HPA
tc.processed <- "" tc.processed <- ""
@ -2281,17 +2336,6 @@ func TestAvoidUncessaryUpdates(t *testing.T) {
}) })
controller, informerFactory := tc.setupController(t) controller, informerFactory := tc.setupController(t)
// fake an initial processing loop to populate savedHPA
initialHPAs, err := testClient.Autoscaling().HorizontalPodAutoscalers("test-namespace").List(metav1.ListOptions{})
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
if err := controller.reconcileAutoscaler(&initialHPAs.Items[0], ""); err != nil {
t.Fatalf("unexpected error: %v", err)
}
// actually run the test
tc.runTestWithController(t, controller, informerFactory) tc.runTestWithController(t, controller, informerFactory)
} }

View File

@ -485,8 +485,6 @@ func (tc *legacyTestCase) runTest(t *testing.T) {
return true, obj, nil return true, obj, nil
}) })
replicaCalc := NewReplicaCalculator(metricsClient, testClient.Core(), defaultTestingTolerance, defaultTestingCpuInitializationPeriod, defaultTestingDelayOfInitialReadinessStatus)
informerFactory := informers.NewSharedInformerFactory(testClient, controller.NoResyncPeriodFunc()) informerFactory := informers.NewSharedInformerFactory(testClient, controller.NoResyncPeriodFunc())
defaultDownscaleStabilisationWindow := 5 * time.Minute defaultDownscaleStabilisationWindow := 5 * time.Minute
@ -495,10 +493,14 @@ func (tc *legacyTestCase) runTest(t *testing.T) {
testScaleClient, testScaleClient,
testClient.Autoscaling(), testClient.Autoscaling(),
testrestmapper.TestOnlyStaticRESTMapper(legacyscheme.Scheme), testrestmapper.TestOnlyStaticRESTMapper(legacyscheme.Scheme),
replicaCalc, metricsClient,
informerFactory.Autoscaling().V1().HorizontalPodAutoscalers(), informerFactory.Autoscaling().V1().HorizontalPodAutoscalers(),
informerFactory.Core().V1().Pods(),
controller.NoResyncPeriodFunc(), controller.NoResyncPeriodFunc(),
defaultDownscaleStabilisationWindow, defaultDownscaleStabilisationWindow,
defaultTestingTolerance,
defaultTestingCpuInitializationPeriod,
defaultTestingDelayOfInitialReadinessStatus,
) )
hpaController.hpaListerSynced = alwaysReady hpaController.hpaListerSynced = alwaysReady

View File

@ -29,9 +29,11 @@ import (
"k8s.io/apimachinery/pkg/api/resource" "k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime"
"k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes/fake" "k8s.io/client-go/kubernetes/fake"
restclient "k8s.io/client-go/rest" restclient "k8s.io/client-go/rest"
core "k8s.io/client-go/testing" core "k8s.io/client-go/testing"
"k8s.io/kubernetes/pkg/controller"
"k8s.io/kubernetes/pkg/controller/podautoscaler/metrics" "k8s.io/kubernetes/pkg/controller/podautoscaler/metrics"
heapster "k8s.io/heapster/metrics/api/v1/types" heapster "k8s.io/heapster/metrics/api/v1/types"
@ -186,7 +188,17 @@ func (tc *legacyReplicaCalcTestCase) runTest(t *testing.T) {
testClient := tc.prepareTestClient(t) testClient := tc.prepareTestClient(t)
metricsClient := metrics.NewHeapsterMetricsClient(testClient, metrics.DefaultHeapsterNamespace, metrics.DefaultHeapsterScheme, metrics.DefaultHeapsterService, metrics.DefaultHeapsterPort) metricsClient := metrics.NewHeapsterMetricsClient(testClient, metrics.DefaultHeapsterNamespace, metrics.DefaultHeapsterScheme, metrics.DefaultHeapsterService, metrics.DefaultHeapsterPort)
replicaCalc := NewReplicaCalculator(metricsClient, testClient.Core(), defaultTestingTolerance, defaultTestingCpuInitializationPeriod, defaultTestingDelayOfInitialReadinessStatus) informerFactory := informers.NewSharedInformerFactory(testClient, controller.NoResyncPeriodFunc())
informer := informerFactory.Core().V1().Pods()
replicaCalc := NewReplicaCalculator(metricsClient, informer.Lister(), defaultTestingTolerance, defaultTestingCpuInitializationPeriod, defaultTestingDelayOfInitialReadinessStatus)
stop := make(chan struct{})
defer close(stop)
informerFactory.Start(stop)
if !controller.WaitForCacheSync("HPA", stop, informer.Informer().HasSynced) {
return
}
selector, err := metav1.LabelSelectorAsSelector(&metav1.LabelSelector{ selector, err := metav1.LabelSelectorAsSelector(&metav1.LabelSelector{
MatchLabels: map[string]string{"name": podNamePrefix}, MatchLabels: map[string]string{"name": podNamePrefix},

View File

@ -26,7 +26,7 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/util/sets" "k8s.io/apimachinery/pkg/util/sets"
v1coreclient "k8s.io/client-go/kubernetes/typed/core/v1" corelisters "k8s.io/client-go/listers/core/v1"
podutil "k8s.io/kubernetes/pkg/api/v1/pod" podutil "k8s.io/kubernetes/pkg/api/v1/pod"
metricsclient "k8s.io/kubernetes/pkg/controller/podautoscaler/metrics" metricsclient "k8s.io/kubernetes/pkg/controller/podautoscaler/metrics"
) )
@ -41,16 +41,16 @@ const (
type ReplicaCalculator struct { type ReplicaCalculator struct {
metricsClient metricsclient.MetricsClient metricsClient metricsclient.MetricsClient
podsGetter v1coreclient.PodsGetter podLister corelisters.PodLister
tolerance float64 tolerance float64
cpuInitializationPeriod time.Duration cpuInitializationPeriod time.Duration
delayOfInitialReadinessStatus time.Duration delayOfInitialReadinessStatus time.Duration
} }
func NewReplicaCalculator(metricsClient metricsclient.MetricsClient, podsGetter v1coreclient.PodsGetter, tolerance float64, cpuInitializationPeriod, delayOfInitialReadinessStatus time.Duration) *ReplicaCalculator { func NewReplicaCalculator(metricsClient metricsclient.MetricsClient, podLister corelisters.PodLister, tolerance float64, cpuInitializationPeriod, delayOfInitialReadinessStatus time.Duration) *ReplicaCalculator {
return &ReplicaCalculator{ return &ReplicaCalculator{
metricsClient: metricsClient, metricsClient: metricsClient,
podsGetter: podsGetter, podLister: podLister,
tolerance: tolerance, tolerance: tolerance,
cpuInitializationPeriod: cpuInitializationPeriod, cpuInitializationPeriod: cpuInitializationPeriod,
delayOfInitialReadinessStatus: delayOfInitialReadinessStatus, delayOfInitialReadinessStatus: delayOfInitialReadinessStatus,
@ -64,20 +64,19 @@ func (c *ReplicaCalculator) GetResourceReplicas(currentReplicas int32, targetUti
if err != nil { if err != nil {
return 0, 0, 0, time.Time{}, fmt.Errorf("unable to get metrics for resource %s: %v", resource, err) return 0, 0, 0, time.Time{}, fmt.Errorf("unable to get metrics for resource %s: %v", resource, err)
} }
podList, err := c.podLister.Pods(namespace).List(selector)
podList, err := c.podsGetter.Pods(namespace).List(metav1.ListOptions{LabelSelector: selector.String()})
if err != nil { if err != nil {
return 0, 0, 0, time.Time{}, fmt.Errorf("unable to get pods while calculating replica count: %v", err) return 0, 0, 0, time.Time{}, fmt.Errorf("unable to get pods while calculating replica count: %v", err)
} }
itemsLen := len(podList.Items) itemsLen := len(podList)
if itemsLen == 0 { if itemsLen == 0 {
return 0, 0, 0, time.Time{}, fmt.Errorf("no pods returned by selector while calculating replica count") return 0, 0, 0, time.Time{}, fmt.Errorf("no pods returned by selector while calculating replica count")
} }
readyPodCount, ignoredPods, missingPods := groupPods(podList.Items, metrics, resource, c.cpuInitializationPeriod, c.delayOfInitialReadinessStatus) readyPodCount, ignoredPods, missingPods := groupPods(podList, metrics, resource, c.cpuInitializationPeriod, c.delayOfInitialReadinessStatus)
removeMetricsForPods(metrics, ignoredPods) removeMetricsForPods(metrics, ignoredPods)
requests, err := calculatePodRequests(podList.Items, resource) requests, err := calculatePodRequests(podList, resource)
if err != nil { if err != nil {
return 0, 0, 0, time.Time{}, err return 0, 0, 0, time.Time{}, err
} }
@ -167,16 +166,17 @@ func (c *ReplicaCalculator) GetMetricReplicas(currentReplicas int32, targetUtili
// calcPlainMetricReplicas calculates the desired replicas for plain (i.e. non-utilization percentage) metrics. // calcPlainMetricReplicas calculates the desired replicas for plain (i.e. non-utilization percentage) metrics.
func (c *ReplicaCalculator) calcPlainMetricReplicas(metrics metricsclient.PodMetricsInfo, currentReplicas int32, targetUtilization int64, namespace string, selector labels.Selector, resource v1.ResourceName) (replicaCount int32, utilization int64, err error) { func (c *ReplicaCalculator) calcPlainMetricReplicas(metrics metricsclient.PodMetricsInfo, currentReplicas int32, targetUtilization int64, namespace string, selector labels.Selector, resource v1.ResourceName) (replicaCount int32, utilization int64, err error) {
podList, err := c.podsGetter.Pods(namespace).List(metav1.ListOptions{LabelSelector: selector.String()})
podList, err := c.podLister.Pods(namespace).List(selector)
if err != nil { if err != nil {
return 0, 0, fmt.Errorf("unable to get pods while calculating replica count: %v", err) return 0, 0, fmt.Errorf("unable to get pods while calculating replica count: %v", err)
} }
if len(podList.Items) == 0 { if len(podList) == 0 {
return 0, 0, fmt.Errorf("no pods returned by selector while calculating replica count") return 0, 0, fmt.Errorf("no pods returned by selector while calculating replica count")
} }
readyPodCount, ignoredPods, missingPods := groupPods(podList.Items, metrics, resource, c.cpuInitializationPeriod, c.delayOfInitialReadinessStatus) readyPodCount, ignoredPods, missingPods := groupPods(podList, metrics, resource, c.cpuInitializationPeriod, c.delayOfInitialReadinessStatus)
removeMetricsForPods(metrics, ignoredPods) removeMetricsForPods(metrics, ignoredPods)
if len(metrics) == 0 { if len(metrics) == 0 {
@ -261,19 +261,19 @@ func (c *ReplicaCalculator) GetObjectMetricReplicas(currentReplicas int32, targe
// of this function. Make this function generic, so we don't repeat the same // of this function. Make this function generic, so we don't repeat the same
// logic in multiple places. // logic in multiple places.
func (c *ReplicaCalculator) getReadyPodsCount(namespace string, selector labels.Selector) (int64, error) { func (c *ReplicaCalculator) getReadyPodsCount(namespace string, selector labels.Selector) (int64, error) {
podList, err := c.podsGetter.Pods(namespace).List(metav1.ListOptions{LabelSelector: selector.String()}) podList, err := c.podLister.Pods(namespace).List(selector)
if err != nil { if err != nil {
return 0, fmt.Errorf("unable to get pods while calculating replica count: %v", err) return 0, fmt.Errorf("unable to get pods while calculating replica count: %v", err)
} }
if len(podList.Items) == 0 { if len(podList) == 0 {
return 0, fmt.Errorf("no pods returned by selector while calculating replica count") return 0, fmt.Errorf("no pods returned by selector while calculating replica count")
} }
readyPodCount := 0 readyPodCount := 0
for _, pod := range podList.Items { for _, pod := range podList {
if pod.Status.Phase == v1.PodRunning && podutil.IsPodReady(&pod) { if pod.Status.Phase == v1.PodRunning && podutil.IsPodReady(pod) {
readyPodCount++ readyPodCount++
} }
} }
@ -340,7 +340,7 @@ func (c *ReplicaCalculator) GetExternalPerPodMetricReplicas(currentReplicas int3
return replicaCount, utilization, timestamp, nil return replicaCount, utilization, timestamp, nil
} }
func groupPods(pods []v1.Pod, metrics metricsclient.PodMetricsInfo, resource v1.ResourceName, cpuInitializationPeriod, delayOfInitialReadinessStatus time.Duration) (readyPodCount int, ignoredPods sets.String, missingPods sets.String) { func groupPods(pods []*v1.Pod, metrics metricsclient.PodMetricsInfo, resource v1.ResourceName, cpuInitializationPeriod, delayOfInitialReadinessStatus time.Duration) (readyPodCount int, ignoredPods sets.String, missingPods sets.String) {
missingPods = sets.NewString() missingPods = sets.NewString()
ignoredPods = sets.NewString() ignoredPods = sets.NewString()
for _, pod := range pods { for _, pod := range pods {
@ -377,7 +377,7 @@ func groupPods(pods []v1.Pod, metrics metricsclient.PodMetricsInfo, resource v1.
return return
} }
func calculatePodRequests(pods []v1.Pod, resource v1.ResourceName) (map[string]int64, error) { func calculatePodRequests(pods []*v1.Pod, resource v1.ResourceName) (map[string]int64, error) {
requests := make(map[string]int64, len(pods)) requests := make(map[string]int64, len(pods))
for _, pod := range pods { for _, pod := range pods {
podSum := int64(0) podSum := int64(0)

View File

@ -31,9 +31,11 @@ import (
"k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/util/sets" "k8s.io/apimachinery/pkg/util/sets"
"k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes/fake" "k8s.io/client-go/kubernetes/fake"
core "k8s.io/client-go/testing" core "k8s.io/client-go/testing"
"k8s.io/kubernetes/pkg/api/legacyscheme" "k8s.io/kubernetes/pkg/api/legacyscheme"
"k8s.io/kubernetes/pkg/controller"
metricsclient "k8s.io/kubernetes/pkg/controller/podautoscaler/metrics" metricsclient "k8s.io/kubernetes/pkg/controller/podautoscaler/metrics"
cmapi "k8s.io/metrics/pkg/apis/custom_metrics/v1beta2" cmapi "k8s.io/metrics/pkg/apis/custom_metrics/v1beta2"
emapi "k8s.io/metrics/pkg/apis/external_metrics/v1beta1" emapi "k8s.io/metrics/pkg/apis/external_metrics/v1beta1"
@ -338,7 +340,17 @@ func (tc *replicaCalcTestCase) runTest(t *testing.T) {
testClient, testMetricsClient, testCMClient, testEMClient := tc.prepareTestClient(t) testClient, testMetricsClient, testCMClient, testEMClient := tc.prepareTestClient(t)
metricsClient := metricsclient.NewRESTMetricsClient(testMetricsClient.MetricsV1beta1(), testCMClient, testEMClient) metricsClient := metricsclient.NewRESTMetricsClient(testMetricsClient.MetricsV1beta1(), testCMClient, testEMClient)
replicaCalc := NewReplicaCalculator(metricsClient, testClient.Core(), defaultTestingTolerance, defaultTestingCpuInitializationPeriod, defaultTestingDelayOfInitialReadinessStatus) informerFactory := informers.NewSharedInformerFactory(testClient, controller.NoResyncPeriodFunc())
informer := informerFactory.Core().V1().Pods()
replicaCalc := NewReplicaCalculator(metricsClient, informer.Lister(), defaultTestingTolerance, defaultTestingCpuInitializationPeriod, defaultTestingDelayOfInitialReadinessStatus)
stop := make(chan struct{})
defer close(stop)
informerFactory.Start(stop)
if !controller.WaitForCacheSync("HPA", stop, informer.Informer().HasSynced) {
return
}
selector, err := metav1.LabelSelectorAsSelector(&metav1.LabelSelector{ selector, err := metav1.LabelSelectorAsSelector(&metav1.LabelSelector{
MatchLabels: map[string]string{"name": podNamePrefix}, MatchLabels: map[string]string{"name": podNamePrefix},
@ -1224,7 +1236,7 @@ func TestReplicaCalcComputedToleranceAlgImplementation(t *testing.T) {
func TestGroupPods(t *testing.T) { func TestGroupPods(t *testing.T) {
tests := []struct { tests := []struct {
name string name string
pods []v1.Pod pods []*v1.Pod
metrics metricsclient.PodMetricsInfo metrics metricsclient.PodMetricsInfo
resource v1.ResourceName resource v1.ResourceName
expectReadyPodCount int expectReadyPodCount int
@ -1233,7 +1245,7 @@ func TestGroupPods(t *testing.T) {
}{ }{
{ {
"void", "void",
[]v1.Pod{}, []*v1.Pod{},
metricsclient.PodMetricsInfo{}, metricsclient.PodMetricsInfo{},
v1.ResourceCPU, v1.ResourceCPU,
0, 0,
@ -1242,7 +1254,7 @@ func TestGroupPods(t *testing.T) {
}, },
{ {
"count in a ready pod - memory", "count in a ready pod - memory",
[]v1.Pod{ []*v1.Pod{
{ {
ObjectMeta: metav1.ObjectMeta{ ObjectMeta: metav1.ObjectMeta{
Name: "bentham", Name: "bentham",
@ -1262,7 +1274,7 @@ func TestGroupPods(t *testing.T) {
}, },
{ {
"ignore a pod without ready condition - CPU", "ignore a pod without ready condition - CPU",
[]v1.Pod{ []*v1.Pod{
{ {
ObjectMeta: metav1.ObjectMeta{ ObjectMeta: metav1.ObjectMeta{
Name: "lucretius", Name: "lucretius",
@ -1285,7 +1297,7 @@ func TestGroupPods(t *testing.T) {
}, },
{ {
"count in a ready pod with fresh metrics during initialization period - CPU", "count in a ready pod with fresh metrics during initialization period - CPU",
[]v1.Pod{ []*v1.Pod{
{ {
ObjectMeta: metav1.ObjectMeta{ ObjectMeta: metav1.ObjectMeta{
Name: "bentham", Name: "bentham",
@ -1315,7 +1327,7 @@ func TestGroupPods(t *testing.T) {
}, },
{ {
"ignore a ready pod without fresh metrics during initialization period - CPU", "ignore a ready pod without fresh metrics during initialization period - CPU",
[]v1.Pod{ []*v1.Pod{
{ {
ObjectMeta: metav1.ObjectMeta{ ObjectMeta: metav1.ObjectMeta{
Name: "bentham", Name: "bentham",
@ -1345,7 +1357,7 @@ func TestGroupPods(t *testing.T) {
}, },
{ {
"ignore an unready pod during initialization period - CPU", "ignore an unready pod during initialization period - CPU",
[]v1.Pod{ []*v1.Pod{
{ {
ObjectMeta: metav1.ObjectMeta{ ObjectMeta: metav1.ObjectMeta{
Name: "lucretius", Name: "lucretius",
@ -1375,7 +1387,7 @@ func TestGroupPods(t *testing.T) {
}, },
{ {
"count in a ready pod without fresh metrics after initialization period - CPU", "count in a ready pod without fresh metrics after initialization period - CPU",
[]v1.Pod{ []*v1.Pod{
{ {
ObjectMeta: metav1.ObjectMeta{ ObjectMeta: metav1.ObjectMeta{
Name: "bentham", Name: "bentham",
@ -1406,7 +1418,7 @@ func TestGroupPods(t *testing.T) {
{ {
"count in an unready pod that was ready after initialization period - CPU", "count in an unready pod that was ready after initialization period - CPU",
[]v1.Pod{ []*v1.Pod{
{ {
ObjectMeta: metav1.ObjectMeta{ ObjectMeta: metav1.ObjectMeta{
Name: "lucretius", Name: "lucretius",
@ -1436,7 +1448,7 @@ func TestGroupPods(t *testing.T) {
}, },
{ {
"ignore pod that has never been ready after initialization period - CPU", "ignore pod that has never been ready after initialization period - CPU",
[]v1.Pod{ []*v1.Pod{
{ {
ObjectMeta: metav1.ObjectMeta{ ObjectMeta: metav1.ObjectMeta{
Name: "lucretius", Name: "lucretius",
@ -1466,7 +1478,7 @@ func TestGroupPods(t *testing.T) {
}, },
{ {
"a missing pod", "a missing pod",
[]v1.Pod{ []*v1.Pod{
{ {
ObjectMeta: metav1.ObjectMeta{ ObjectMeta: metav1.ObjectMeta{
Name: "epicurus", Name: "epicurus",
@ -1487,7 +1499,7 @@ func TestGroupPods(t *testing.T) {
}, },
{ {
"several pods", "several pods",
[]v1.Pod{ []*v1.Pod{
{ {
ObjectMeta: metav1.ObjectMeta{ ObjectMeta: metav1.ObjectMeta{
Name: "lucretius", Name: "lucretius",