Merge pull request #105461 from damemi/wire-contexts-autoscaling

Wire contexts to Autoscaling controllers
Kubernetes Prow Robot, 2021-10-14 06:59:33 -07:00 (committed by GitHub)
commit 3aafe75698
8 changed files with 57 additions and 52 deletions


@@ -88,6 +88,6 @@ func startHPAControllerWithMetricsClient(ctx context.Context, controllerContext
controllerContext.ComponentConfig.HPAController.HorizontalPodAutoscalerTolerance,
controllerContext.ComponentConfig.HPAController.HorizontalPodAutoscalerCPUInitializationPeriod.Duration,
controllerContext.ComponentConfig.HPAController.HorizontalPodAutoscalerInitialReadinessDelay.Duration,
-).Run(ctx.Done())
+).Run(ctx)
return nil, true, nil
}
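
The call-site change above is the whole wiring pattern in miniature: the caller already owns a context, so Run now receives it directly instead of the unwrapped ctx.Done() channel. Callers that still hold only a legacy stop channel can bridge in the other direction with a few lines of plain Go. A minimal sketch using only the standard library; contextForStopCh is a hypothetical helper, not part of this PR:

    package main

    import (
        "context"
        "fmt"
        "time"
    )

    // contextForStopCh adapts a legacy stop channel into a context:
    // when stopCh closes, the returned context is cancelled.
    func contextForStopCh(stopCh <-chan struct{}) (context.Context, context.CancelFunc) {
        ctx, cancel := context.WithCancel(context.Background())
        go func() {
            select {
            case <-stopCh:
                cancel() // stop signal received: propagate as cancellation
            case <-ctx.Done():
                // cancelled elsewhere: let the goroutine exit
            }
        }()
        return ctx, cancel
    }

    func main() {
        stopCh := make(chan struct{})
        ctx, cancel := contextForStopCh(stopCh)
        defer cancel()

        go func() {
            time.Sleep(100 * time.Millisecond)
            close(stopCh) // simulate the legacy shutdown signal
        }()

        <-ctx.Done() // behaves exactly like <-stopCh for context-aware code
        fmt.Println("context cancelled:", ctx.Err())
    }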


@@ -162,21 +162,21 @@ func NewHorizontalController(
}
// Run begins watching and syncing.
-func (a *HorizontalController) Run(stopCh <-chan struct{}) {
+func (a *HorizontalController) Run(ctx context.Context) {
defer utilruntime.HandleCrash()
defer a.queue.ShutDown()
klog.Infof("Starting HPA controller")
defer klog.Infof("Shutting down HPA controller")
-if !cache.WaitForNamedCacheSync("HPA", stopCh, a.hpaListerSynced, a.podListerSynced) {
+if !cache.WaitForNamedCacheSync("HPA", ctx.Done(), a.hpaListerSynced, a.podListerSynced) {
return
}
// start a single worker (we may wish to start more in the future)
-go wait.Until(a.worker, time.Second, stopCh)
+go wait.UntilWithContext(ctx, a.worker, time.Second)
-<-stopCh
+<-ctx.Done()
}
// obj could be an *v1.HorizontalPodAutoscaler, or a DeletionFinalStateUnknown marker item.
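
Two channel idioms disappear from Run: wait.Until(f, period, stopCh) becomes wait.UntilWithContext(ctx, f, period), whose loop function receives the context and whose loop ends when the context is cancelled, and the final <-stopCh park becomes <-ctx.Done(). A standalone sketch of the new loop, assuming k8s.io/apimachinery/pkg/util/wait; the period and timeout are arbitrary:

    package main

    import (
        "context"
        "fmt"
        "time"

        "k8s.io/apimachinery/pkg/util/wait"
    )

    func main() {
        // Cancel automatically after roughly three ticks.
        ctx, cancel := context.WithTimeout(context.Background(), 350*time.Millisecond)
        defer cancel()

        // The loop body now receives the context, so it can forward it
        // to API calls instead of capturing a stop channel.
        worker := func(ctx context.Context) {
            fmt.Println("worker tick")
        }

        // Blocks until ctx is cancelled. The controller launches this with
        // `go` and then parks on <-ctx.Done() instead of <-stopCh.
        wait.UntilWithContext(ctx, worker, 100*time.Millisecond)
        fmt.Println("loop exited:", ctx.Err())
    }
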
@@ -209,20 +209,20 @@ func (a *HorizontalController) deleteHPA(obj interface{}) {
a.queue.Forget(key)
}
-func (a *HorizontalController) worker() {
-for a.processNextWorkItem() {
+func (a *HorizontalController) worker(ctx context.Context) {
+for a.processNextWorkItem(ctx) {
}
klog.Infof("horizontal pod autoscaler controller worker shutting down")
}
-func (a *HorizontalController) processNextWorkItem() bool {
+func (a *HorizontalController) processNextWorkItem(ctx context.Context) bool {
key, quit := a.queue.Get()
if quit {
return false
}
defer a.queue.Done(key)
-deleted, err := a.reconcileKey(key.(string))
+deleted, err := a.reconcileKey(ctx, key.(string))
if err != nil {
utilruntime.HandleError(err)
}
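
worker and processNextWorkItem are the standard client-go workqueue loop; the only change is that the context now rides along into each reconcile. The shape of that loop in isolation, as a sketch assuming k8s.io/client-go/util/workqueue, with reconcile standing in for reconcileKey:

    package main

    import (
        "context"
        "fmt"
        "time"

        "k8s.io/client-go/util/workqueue"
    )

    // reconcile stands in for reconcileKey: it receives the context so it
    // can pass it on to API calls.
    func reconcile(ctx context.Context, key string) error {
        fmt.Println("reconciling", key)
        return nil
    }

    func processNextWorkItem(ctx context.Context, queue workqueue.Interface) bool {
        key, quit := queue.Get()
        if quit {
            return false // queue shut down: end the worker loop
        }
        defer queue.Done(key)
        if err := reconcile(ctx, key.(string)); err != nil {
            fmt.Println("reconcile error:", err)
        }
        return true
    }

    func main() {
        ctx, cancel := context.WithCancel(context.Background())
        defer cancel()

        queue := workqueue.New()
        queue.Add("default/my-hpa")

        go func() {
            time.Sleep(50 * time.Millisecond)
            queue.ShutDown() // ends the loop below
        }()

        for processNextWorkItem(ctx, queue) {
        }
        fmt.Println("worker shut down")
    }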
@@ -245,7 +245,7 @@ func (a *HorizontalController) processNextWorkItem() bool {
// computeReplicasForMetrics computes the desired number of replicas for the metric specifications listed in the HPA,
// returning the maximum of the computed replica counts, a description of the associated metric, and the statuses of
// all metrics computed.
-func (a *HorizontalController) computeReplicasForMetrics(hpa *autoscalingv2.HorizontalPodAutoscaler, scale *autoscalingv1.Scale,
+func (a *HorizontalController) computeReplicasForMetrics(ctx context.Context, hpa *autoscalingv2.HorizontalPodAutoscaler, scale *autoscalingv1.Scale,
metricSpecs []autoscalingv2.MetricSpec) (replicas int32, metric string, statuses []autoscalingv2.MetricStatus, timestamp time.Time, err error) {
if scale.Status.Selector == "" {
@@ -272,7 +272,7 @@ func (a *HorizontalController) computeReplicasForMetrics(hpa *autoscalingv2.Hori
var invalidMetricCondition autoscalingv2.HorizontalPodAutoscalerCondition
for i, metricSpec := range metricSpecs {
-replicaCountProposal, metricNameProposal, timestampProposal, condition, err := a.computeReplicasForMetric(hpa, metricSpec, specReplicas, statusReplicas, selector, &statuses[i])
+replicaCountProposal, metricNameProposal, timestampProposal, condition, err := a.computeReplicasForMetric(ctx, hpa, metricSpec, specReplicas, statusReplicas, selector, &statuses[i])
if err != nil {
if invalidMetricsCount <= 0 {
@@ -301,7 +301,7 @@ func (a *HorizontalController) computeReplicasForMetrics(hpa *autoscalingv2.Hori
// Computes the desired number of replicas for a specific hpa and metric specification,
// returning the metric status and a proposed condition to be set on the HPA object.
-func (a *HorizontalController) computeReplicasForMetric(hpa *autoscalingv2.HorizontalPodAutoscaler, spec autoscalingv2.MetricSpec,
+func (a *HorizontalController) computeReplicasForMetric(ctx context.Context, hpa *autoscalingv2.HorizontalPodAutoscaler, spec autoscalingv2.MetricSpec,
specReplicas, statusReplicas int32, selector labels.Selector, status *autoscalingv2.MetricStatus) (replicaCountProposal int32, metricNameProposal string,
timestampProposal time.Time, condition autoscalingv2.HorizontalPodAutoscalerCondition, err error) {
@@ -327,12 +327,12 @@ func (a *HorizontalController) computeReplicasForMetric(hpa *autoscalingv2.Horiz
return 0, "", time.Time{}, condition, fmt.Errorf("failed to get pods metric value: %v", err)
}
case autoscalingv2.ResourceMetricSourceType:
-replicaCountProposal, timestampProposal, metricNameProposal, condition, err = a.computeStatusForResourceMetric(specReplicas, spec, hpa, selector, status)
+replicaCountProposal, timestampProposal, metricNameProposal, condition, err = a.computeStatusForResourceMetric(ctx, specReplicas, spec, hpa, selector, status)
if err != nil {
return 0, "", time.Time{}, condition, err
}
case autoscalingv2.ContainerResourceMetricSourceType:
-replicaCountProposal, timestampProposal, metricNameProposal, condition, err = a.computeStatusForContainerResourceMetric(specReplicas, spec, hpa, selector, status)
+replicaCountProposal, timestampProposal, metricNameProposal, condition, err = a.computeStatusForContainerResourceMetric(ctx, specReplicas, spec, hpa, selector, status)
if err != nil {
return 0, "", time.Time{}, condition, err
}
@@ -350,7 +350,7 @@ func (a *HorizontalController) computeReplicasForMetric(hpa *autoscalingv2.Horiz
return replicaCountProposal, metricNameProposal, timestampProposal, autoscalingv2.HorizontalPodAutoscalerCondition{}, nil
}
-func (a *HorizontalController) reconcileKey(key string) (deleted bool, err error) {
+func (a *HorizontalController) reconcileKey(ctx context.Context, key string) (deleted bool, err error) {
namespace, name, err := cache.SplitMetaNamespaceKey(key)
if err != nil {
return true, err
@@ -368,7 +368,7 @@ func (a *HorizontalController) reconcileKey(key string) (deleted bool, err error
return false, err
}
-return false, a.reconcileAutoscaler(hpa, key)
+return false, a.reconcileAutoscaler(ctx, hpa, key)
}
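
reconcileKey itself only gains the context parameter and forwards it to reconcileAutoscaler; the queue key is still a flat "namespace/name" string, split back apart before the lister lookup. A tiny sketch, assuming k8s.io/client-go/tools/cache:

    package main

    import (
        "fmt"

        "k8s.io/client-go/tools/cache"
    )

    func main() {
        // Workqueue keys are "namespace/name" strings; reconcileKey splits
        // them before the lister lookup.
        namespace, name, err := cache.SplitMetaNamespaceKey("default/my-hpa")
        fmt.Println(namespace, name, err) // default my-hpa <nil>
    }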
// computeStatusForObjectMetric computes the desired number of replicas for the specified metric of type ObjectMetricSourceType.
@@ -442,13 +442,13 @@ func (a *HorizontalController) computeStatusForPodsMetric(currentReplicas int32,
return replicaCountProposal, timestampProposal, fmt.Sprintf("pods metric %s", metricSpec.Pods.Metric.Name), autoscalingv2.HorizontalPodAutoscalerCondition{}, nil
}
-func (a *HorizontalController) computeStatusForResourceMetricGeneric(currentReplicas int32, target autoscalingv2.MetricTarget,
+func (a *HorizontalController) computeStatusForResourceMetricGeneric(ctx context.Context, currentReplicas int32, target autoscalingv2.MetricTarget,
resourceName v1.ResourceName, namespace string, container string, selector labels.Selector) (replicaCountProposal int32,
metricStatus *autoscalingv2.MetricValueStatus, timestampProposal time.Time, metricNameProposal string,
condition autoscalingv2.HorizontalPodAutoscalerCondition, err error) {
if target.AverageValue != nil {
var rawProposal int64
-replicaCountProposal, rawProposal, timestampProposal, err := a.replicaCalc.GetRawResourceReplicas(currentReplicas, target.AverageValue.MilliValue(), resourceName, namespace, selector, container)
+replicaCountProposal, rawProposal, timestampProposal, err := a.replicaCalc.GetRawResourceReplicas(ctx, currentReplicas, target.AverageValue.MilliValue(), resourceName, namespace, selector, container)
if err != nil {
return 0, nil, time.Time{}, "", condition, fmt.Errorf("failed to get %s utilization: %v", resourceName, err)
}
@@ -465,7 +465,7 @@ func (a *HorizontalController) computeStatusForResourceMetricGeneric(currentRepl
}
targetUtilization := *target.AverageUtilization
-replicaCountProposal, percentageProposal, rawProposal, timestampProposal, err := a.replicaCalc.GetResourceReplicas(currentReplicas, targetUtilization, resourceName, namespace, selector, container)
+replicaCountProposal, percentageProposal, rawProposal, timestampProposal, err := a.replicaCalc.GetResourceReplicas(ctx, currentReplicas, targetUtilization, resourceName, namespace, selector, container)
if err != nil {
return 0, nil, time.Time{}, "", condition, fmt.Errorf("failed to get %s utilization: %v", resourceName, err)
}
@@ -479,10 +479,10 @@ func (a *HorizontalController) computeStatusForResourceMetricGeneric(currentRepl
}
// computeStatusForResourceMetric computes the desired number of replicas for the specified metric of type ResourceMetricSourceType.
-func (a *HorizontalController) computeStatusForResourceMetric(currentReplicas int32, metricSpec autoscalingv2.MetricSpec, hpa *autoscalingv2.HorizontalPodAutoscaler,
+func (a *HorizontalController) computeStatusForResourceMetric(ctx context.Context, currentReplicas int32, metricSpec autoscalingv2.MetricSpec, hpa *autoscalingv2.HorizontalPodAutoscaler,
selector labels.Selector, status *autoscalingv2.MetricStatus) (replicaCountProposal int32, timestampProposal time.Time,
metricNameProposal string, condition autoscalingv2.HorizontalPodAutoscalerCondition, err error) {
-replicaCountProposal, metricValueStatus, timestampProposal, metricNameProposal, condition, err := a.computeStatusForResourceMetricGeneric(currentReplicas, metricSpec.Resource.Target, metricSpec.Resource.Name, hpa.Namespace, "", selector)
+replicaCountProposal, metricValueStatus, timestampProposal, metricNameProposal, condition, err := a.computeStatusForResourceMetricGeneric(ctx, currentReplicas, metricSpec.Resource.Target, metricSpec.Resource.Name, hpa.Namespace, "", selector)
if err != nil {
condition = a.getUnableComputeReplicaCountCondition(hpa, "FailedGetResourceMetric", err)
return replicaCountProposal, timestampProposal, metricNameProposal, condition, err
@@ -498,10 +498,10 @@ func (a *HorizontalController) computeStatusForResourceMetric(currentReplicas in
}
// computeStatusForContainerResourceMetric computes the desired number of replicas for the specified metric of type ContainerResourceMetricSourceType.
-func (a *HorizontalController) computeStatusForContainerResourceMetric(currentReplicas int32, metricSpec autoscalingv2.MetricSpec, hpa *autoscalingv2.HorizontalPodAutoscaler,
+func (a *HorizontalController) computeStatusForContainerResourceMetric(ctx context.Context, currentReplicas int32, metricSpec autoscalingv2.MetricSpec, hpa *autoscalingv2.HorizontalPodAutoscaler,
selector labels.Selector, status *autoscalingv2.MetricStatus) (replicaCountProposal int32, timestampProposal time.Time,
metricNameProposal string, condition autoscalingv2.HorizontalPodAutoscalerCondition, err error) {
-replicaCountProposal, metricValueStatus, timestampProposal, metricNameProposal, condition, err := a.computeStatusForResourceMetricGeneric(currentReplicas, metricSpec.ContainerResource.Target, metricSpec.ContainerResource.Name, hpa.Namespace, metricSpec.ContainerResource.Container, selector)
+replicaCountProposal, metricValueStatus, timestampProposal, metricNameProposal, condition, err := a.computeStatusForResourceMetricGeneric(ctx, currentReplicas, metricSpec.ContainerResource.Target, metricSpec.ContainerResource.Name, hpa.Namespace, metricSpec.ContainerResource.Container, selector)
if err != nil {
condition = a.getUnableComputeReplicaCountCondition(hpa, "FailedGetContainerResourceMetric", err)
return replicaCountProposal, timestampProposal, metricNameProposal, condition, err
@@ -571,7 +571,7 @@ func (a *HorizontalController) recordInitialRecommendation(currentReplicas int32
}
}
-func (a *HorizontalController) reconcileAutoscaler(hpav1Shared *autoscalingv1.HorizontalPodAutoscaler, key string) error {
+func (a *HorizontalController) reconcileAutoscaler(ctx context.Context, hpav1Shared *autoscalingv1.HorizontalPodAutoscaler, key string) error {
// make a copy so that we never mutate the shared informer cache (conversion can mutate the object)
hpav1 := hpav1Shared.DeepCopy()
// then, convert to autoscaling/v2, which makes our lives easier when calculating metrics
@@ -589,7 +589,7 @@ func (a *HorizontalController) reconcileAutoscaler(hpav1Shared *autoscalingv1.Ho
if err != nil {
a.eventRecorder.Event(hpa, v1.EventTypeWarning, "FailedGetScale", err.Error())
setCondition(hpa, autoscalingv2.AbleToScale, v1.ConditionFalse, "FailedGetScale", "the HPA controller was unable to get the target's current scale: %v", err)
-a.updateStatusIfNeeded(hpaStatusOriginal, hpa)
+a.updateStatusIfNeeded(ctx, hpaStatusOriginal, hpa)
return fmt.Errorf("invalid API version in scale target reference: %v", err)
}
@@ -602,15 +602,15 @@ func (a *HorizontalController) reconcileAutoscaler(hpav1Shared *autoscalingv1.Ho
if err != nil {
a.eventRecorder.Event(hpa, v1.EventTypeWarning, "FailedGetScale", err.Error())
setCondition(hpa, autoscalingv2.AbleToScale, v1.ConditionFalse, "FailedGetScale", "the HPA controller was unable to get the target's current scale: %v", err)
-a.updateStatusIfNeeded(hpaStatusOriginal, hpa)
+a.updateStatusIfNeeded(ctx, hpaStatusOriginal, hpa)
return fmt.Errorf("unable to determine resource for scale target reference: %v", err)
}
-scale, targetGR, err := a.scaleForResourceMappings(hpa.Namespace, hpa.Spec.ScaleTargetRef.Name, mappings)
+scale, targetGR, err := a.scaleForResourceMappings(ctx, hpa.Namespace, hpa.Spec.ScaleTargetRef.Name, mappings)
if err != nil {
a.eventRecorder.Event(hpa, v1.EventTypeWarning, "FailedGetScale", err.Error())
setCondition(hpa, autoscalingv2.AbleToScale, v1.ConditionFalse, "FailedGetScale", "the HPA controller was unable to get the target's current scale: %v", err)
-a.updateStatusIfNeeded(hpaStatusOriginal, hpa)
+a.updateStatusIfNeeded(ctx, hpaStatusOriginal, hpa)
return fmt.Errorf("failed to query scale subresource for %s: %v", reference, err)
}
setCondition(hpa, autoscalingv2.AbleToScale, v1.ConditionTrue, "SucceededGetScale", "the HPA controller was able to get the target's current scale")
@@ -650,10 +650,10 @@ func (a *HorizontalController) reconcileAutoscaler(hpav1Shared *autoscalingv1.Ho
desiredReplicas = minReplicas
} else {
var metricTimestamp time.Time
-metricDesiredReplicas, metricName, metricStatuses, metricTimestamp, err = a.computeReplicasForMetrics(hpa, scale, hpa.Spec.Metrics)
+metricDesiredReplicas, metricName, metricStatuses, metricTimestamp, err = a.computeReplicasForMetrics(ctx, hpa, scale, hpa.Spec.Metrics)
if err != nil {
a.setCurrentReplicasInStatus(hpa, currentReplicas)
-if err := a.updateStatusIfNeeded(hpaStatusOriginal, hpa); err != nil {
+if err := a.updateStatusIfNeeded(ctx, hpaStatusOriginal, hpa); err != nil {
utilruntime.HandleError(err)
}
a.eventRecorder.Event(hpa, v1.EventTypeWarning, "FailedComputeMetricsReplicas", err.Error())
@@ -683,12 +683,12 @@ func (a *HorizontalController) reconcileAutoscaler(hpav1Shared *autoscalingv1.Ho
if rescale {
scale.Spec.Replicas = desiredReplicas
-_, err = a.scaleNamespacer.Scales(hpa.Namespace).Update(context.TODO(), targetGR, scale, metav1.UpdateOptions{})
+_, err = a.scaleNamespacer.Scales(hpa.Namespace).Update(ctx, targetGR, scale, metav1.UpdateOptions{})
if err != nil {
a.eventRecorder.Eventf(hpa, v1.EventTypeWarning, "FailedRescale", "New size: %d; reason: %s; error: %v", desiredReplicas, rescaleReason, err.Error())
setCondition(hpa, autoscalingv2.AbleToScale, v1.ConditionFalse, "FailedUpdateScale", "the HPA controller was unable to update the target scale: %v", err)
a.setCurrentReplicasInStatus(hpa, currentReplicas)
-if err := a.updateStatusIfNeeded(hpaStatusOriginal, hpa); err != nil {
+if err := a.updateStatusIfNeeded(ctx, hpaStatusOriginal, hpa); err != nil {
utilruntime.HandleError(err)
}
return fmt.Errorf("failed to rescale %s: %v", reference, err)
@@ -704,7 +704,7 @@ func (a *HorizontalController) reconcileAutoscaler(hpav1Shared *autoscalingv1.Ho
}
a.setStatus(hpa, currentReplicas, desiredReplicas, metricStatuses, rescale)
-return a.updateStatusIfNeeded(hpaStatusOriginal, hpa)
+return a.updateStatusIfNeeded(ctx, hpaStatusOriginal, hpa)
}
// stabilizeRecommendation:
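
This Update call on the scale subresource is where the plumbing pays off: with context.TODO() the request could never be interrupted, whereas the wired ctx aborts it as soon as the controller is told to stop. The effect can be shown with nothing but the standard library; the slow server below is illustrative, not the real API server:

    package main

    import (
        "context"
        "fmt"
        "net/http"
        "net/http/httptest"
        "time"
    )

    func main() {
        // A deliberately slow "API server": responses take one second.
        srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
            select {
            case <-time.After(time.Second):
                w.WriteHeader(http.StatusOK)
            case <-r.Context().Done():
                // client gave up; nothing to write
            }
        }))
        defer srv.Close()

        // With context.TODO() this request would block for the full second.
        // A wired, cancellable context aborts it at the 100ms mark.
        ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond)
        defer cancel()

        req, _ := http.NewRequestWithContext(ctx, http.MethodGet, srv.URL, nil)
        resp, err := http.DefaultClient.Do(req)
        if resp != nil {
            resp.Body.Close()
        }
        fmt.Println("request aborted by context:", err) // context deadline exceeded
    }
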
@@ -1099,11 +1099,11 @@ func calculateScaleDownLimitWithBehaviors(currentReplicas int32, scaleEvents []t
// in turn until a working one is found. If none work, the first error
// is returned. It returns both the scale, as well as the group-resource from
// the working mapping.
-func (a *HorizontalController) scaleForResourceMappings(namespace, name string, mappings []*apimeta.RESTMapping) (*autoscalingv1.Scale, schema.GroupResource, error) {
+func (a *HorizontalController) scaleForResourceMappings(ctx context.Context, namespace, name string, mappings []*apimeta.RESTMapping) (*autoscalingv1.Scale, schema.GroupResource, error) {
var firstErr error
for i, mapping := range mappings {
targetGR := mapping.Resource.GroupResource()
-scale, err := a.scaleNamespacer.Scales(namespace).Get(context.TODO(), targetGR, name, metav1.GetOptions{})
+scale, err := a.scaleNamespacer.Scales(namespace).Get(ctx, targetGR, name, metav1.GetOptions{})
if err == nil {
return scale, targetGR, nil
}
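
As the surrounding doc comment says, scaleForResourceMappings tries each RESTMapping in turn and, if every Get fails, reports the first error rather than the last. That control flow in isolation, as a generic sketch with no client-go types:

    package main

    import (
        "errors"
        "fmt"
    )

    // firstSuccess tries each candidate in order and returns the first
    // result that works; if none works, the first error is returned,
    // mirroring scaleForResourceMappings above.
    func firstSuccess(candidates []string, try func(string) (string, error)) (string, error) {
        var firstErr error
        for i, c := range candidates {
            result, err := try(c)
            if err == nil {
                return result, nil
            }
            if i == 0 {
                firstErr = err
            }
        }
        return "", firstErr
    }

    func main() {
        try := func(mapping string) (string, error) {
            if mapping == "apps/v1" {
                return "scale of " + mapping, nil
            }
            return "", errors.New("no scale subresource via " + mapping)
        }
        result, err := firstSuccess([]string{"extensions/v1beta1", "apps/v1"}, try)
        fmt.Println(result, err) // scale of apps/v1 <nil>
    }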
@@ -1146,16 +1146,16 @@ func (a *HorizontalController) setStatus(hpa *autoscalingv2.HorizontalPodAutosca
}
// updateStatusIfNeeded calls updateStatus only if the status of the new HPA is not the same as the old status
-func (a *HorizontalController) updateStatusIfNeeded(oldStatus *autoscalingv2.HorizontalPodAutoscalerStatus, newHPA *autoscalingv2.HorizontalPodAutoscaler) error {
+func (a *HorizontalController) updateStatusIfNeeded(ctx context.Context, oldStatus *autoscalingv2.HorizontalPodAutoscalerStatus, newHPA *autoscalingv2.HorizontalPodAutoscaler) error {
// skip a write if we wouldn't need to update
if apiequality.Semantic.DeepEqual(oldStatus, &newHPA.Status) {
return nil
}
-return a.updateStatus(newHPA)
+return a.updateStatus(ctx, newHPA)
}
// updateStatus actually does the update request for the status of the given HPA
-func (a *HorizontalController) updateStatus(hpa *autoscalingv2.HorizontalPodAutoscaler) error {
+func (a *HorizontalController) updateStatus(ctx context.Context, hpa *autoscalingv2.HorizontalPodAutoscaler) error {
// convert back to autoscalingv1
hpaRaw, err := unsafeConvertToVersionVia(hpa, autoscalingv1.SchemeGroupVersion)
if err != nil {
@@ -1164,7 +1164,7 @@ func (a *HorizontalController) updateStatus(hpa *autoscalingv2.HorizontalPodAuto
}
hpav1 := hpaRaw.(*autoscalingv1.HorizontalPodAutoscaler)
-_, err = a.hpaNamespacer.HorizontalPodAutoscalers(hpav1.Namespace).UpdateStatus(context.TODO(), hpav1, metav1.UpdateOptions{})
+_, err = a.hpaNamespacer.HorizontalPodAutoscalers(hpav1.Namespace).UpdateStatus(ctx, hpav1, metav1.UpdateOptions{})
if err != nil {
a.eventRecorder.Event(hpa, v1.EventTypeWarning, "FailedUpdateStatus", err.Error())
return fmt.Errorf("failed to update status for %s: %v", hpa.Name, err)


@@ -17,6 +17,7 @@ limitations under the License.
package podautoscaler
import (
"context"
"encoding/json"
"fmt"
"math"
@@ -744,10 +745,10 @@ func coolCPUCreationTime() metav1.Time {
}
func (tc *testCase) runTestWithController(t *testing.T, hpaController *HorizontalController, informerFactory informers.SharedInformerFactory) {
-stop := make(chan struct{})
-defer close(stop)
-informerFactory.Start(stop)
-go hpaController.Run(stop)
+ctx, cancel := context.WithCancel(context.Background())
+defer cancel()
+informerFactory.Start(ctx.Done())
+go hpaController.Run(ctx)
tc.Lock()
shouldWait := tc.verifyEvents
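
The test harness swaps its stop channel for a cancellable context: defer close(stop) becomes defer cancel(), while ctx.Done() still feeds channel-based helpers such as informerFactory.Start. The same shape in a self-contained sketch; fakeController is a hypothetical stand-in for HorizontalController:

    package main

    import (
        "context"
        "fmt"
        "time"
    )

    // fakeController is a hypothetical stand-in for HorizontalController.
    type fakeController struct{}

    func (f *fakeController) Run(ctx context.Context) {
        <-ctx.Done() // block until the test cancels
        fmt.Println("controller stopped:", ctx.Err())
    }

    func main() {
        ctx, cancel := context.WithCancel(context.Background())
        defer cancel() // replaces `defer close(stop)`

        go (&fakeController{}).Run(ctx)

        time.Sleep(50 * time.Millisecond) // the test body would run here
        cancel()                          // explicit shutdown, like closing stop
        time.Sleep(10 * time.Millisecond) // give Run a moment to observe it
    }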


@@ -64,8 +64,8 @@ type resourceMetricsClient struct {
// GetResourceMetric gets the given resource metric (and an associated oldest timestamp)
// for all pods matching the specified selector in the given namespace
-func (c *resourceMetricsClient) GetResourceMetric(resource v1.ResourceName, namespace string, selector labels.Selector, container string) (PodMetricsInfo, time.Time, error) {
-metrics, err := c.client.PodMetricses(namespace).List(context.TODO(), metav1.ListOptions{LabelSelector: selector.String()})
+func (c *resourceMetricsClient) GetResourceMetric(ctx context.Context, resource v1.ResourceName, namespace string, selector labels.Selector, container string) (PodMetricsInfo, time.Time, error) {
+metrics, err := c.client.PodMetricses(namespace).List(ctx, metav1.ListOptions{LabelSelector: selector.String()})
if err != nil {
return nil, time.Time{}, fmt.Errorf("unable to fetch metrics from resource metrics API: %v", err)
}


@@ -17,6 +17,7 @@ limitations under the License.
package metrics
import (
"context"
"fmt"
"testing"
"time"
@@ -239,7 +240,7 @@ func (tc *restClientTestCase) runTest(t *testing.T) {
isResource := len(tc.resourceName) > 0
isExternal := tc.metricSelector != nil
if isResource {
-info, timestamp, err := metricsClient.GetResourceMetric(v1.ResourceName(tc.resourceName), tc.namespace, tc.selector, tc.container)
+info, timestamp, err := metricsClient.GetResourceMetric(context.TODO(), v1.ResourceName(tc.resourceName), tc.namespace, tc.selector, tc.container)
tc.verifyResults(t, info, timestamp, err)
} else if isExternal {
tc.metricLabelSelector, err = metav1.LabelSelectorAsSelector(tc.metricSelector)


@@ -17,6 +17,7 @@ limitations under the License.
package metrics
import (
"context"
"time"
autoscaling "k8s.io/api/autoscaling/v2beta2"
@@ -40,7 +41,7 @@ type MetricsClient interface {
// GetResourceMetric gets the given resource metric (and an associated oldest timestamp)
// for the specified named container in all pods matching the specified selector in the given namespace and when
// the container is an empty string it returns the sum of all the container metrics.
-GetResourceMetric(resource v1.ResourceName, namespace string, selector labels.Selector, container string) (PodMetricsInfo, time.Time, error)
+GetResourceMetric(ctx context.Context, resource v1.ResourceName, namespace string, selector labels.Selector, container string) (PodMetricsInfo, time.Time, error)
// GetRawMetric gets the given metric (and an associated oldest timestamp)
// for all pods matching the specified selector in the given namespace
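
Because GetResourceMetric is an interface method, adding the context parameter forces every implementation and every test fake to accept it, which is what lets tests exercise cancellation at all. A minimal sketch of a conforming fake; the types are deliberately simplified stand-ins (the real signature uses v1.ResourceName, labels.Selector, and a richer PodMetricsInfo):

    package main

    import (
        "context"
        "fmt"
        "time"
    )

    // Simplified stand-ins for the real podautoscaler/metrics types.
    type PodMetricsInfo map[string]int64

    type MetricsClient interface {
        GetResourceMetric(ctx context.Context, resource, namespace, selector, container string) (PodMetricsInfo, time.Time, error)
    }

    // fakeMetricsClient returns canned data but still honors cancellation,
    // so tests can verify that callers propagate their context.
    type fakeMetricsClient struct {
        metrics PodMetricsInfo
    }

    func (f *fakeMetricsClient) GetResourceMetric(ctx context.Context, resource, namespace, selector, container string) (PodMetricsInfo, time.Time, error) {
        if err := ctx.Err(); err != nil {
            return nil, time.Time{}, fmt.Errorf("unable to fetch metrics from resource metrics API: %w", err)
        }
        return f.metrics, time.Now(), nil
    }

    func main() {
        var c MetricsClient = &fakeMetricsClient{metrics: PodMetricsInfo{"pod-1": 250}}

        ctx, cancel := context.WithCancel(context.Background())
        cancel() // simulate an already-stopped controller

        _, _, err := c.GetResourceMetric(ctx, "cpu", "default", "app=web", "")
        fmt.Println(err) // ...: context canceled
    }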


@@ -17,6 +17,7 @@ limitations under the License.
package podautoscaler
import (
"context"
"fmt"
"math"
"time"
@@ -61,8 +62,8 @@ func NewReplicaCalculator(metricsClient metricsclient.MetricsClient, podLister c
// GetResourceReplicas calculates the desired replica count based on a target resource utilization percentage
// of the given resource for pods matching the given selector in the given namespace, and the current replica count
-func (c *ReplicaCalculator) GetResourceReplicas(currentReplicas int32, targetUtilization int32, resource v1.ResourceName, namespace string, selector labels.Selector, container string) (replicaCount int32, utilization int32, rawUtilization int64, timestamp time.Time, err error) {
-metrics, timestamp, err := c.metricsClient.GetResourceMetric(resource, namespace, selector, container)
+func (c *ReplicaCalculator) GetResourceReplicas(ctx context.Context, currentReplicas int32, targetUtilization int32, resource v1.ResourceName, namespace string, selector labels.Selector, container string) (replicaCount int32, utilization int32, rawUtilization int64, timestamp time.Time, err error) {
+metrics, timestamp, err := c.metricsClient.GetResourceMetric(ctx, resource, namespace, selector, container)
if err != nil {
return 0, 0, 0, time.Time{}, fmt.Errorf("unable to get metrics for resource %s: %v", resource, err)
}
@@ -150,8 +151,8 @@ func (c *ReplicaCalculator) GetResourceReplicas(currentReplicas int32, targetUti
// GetRawResourceReplicas calculates the desired replica count based on a target resource utilization (as a raw milli-value)
// for pods matching the given selector in the given namespace, and the current replica count
-func (c *ReplicaCalculator) GetRawResourceReplicas(currentReplicas int32, targetUtilization int64, resource v1.ResourceName, namespace string, selector labels.Selector, container string) (replicaCount int32, utilization int64, timestamp time.Time, err error) {
-metrics, timestamp, err := c.metricsClient.GetResourceMetric(resource, namespace, selector, container)
+func (c *ReplicaCalculator) GetRawResourceReplicas(ctx context.Context, currentReplicas int32, targetUtilization int64, resource v1.ResourceName, namespace string, selector labels.Selector, container string) (replicaCount int32, utilization int64, timestamp time.Time, err error) {
+metrics, timestamp, err := c.metricsClient.GetResourceMetric(ctx, resource, namespace, selector, container)
if err != nil {
return 0, 0, time.Time{}, fmt.Errorf("unable to get metrics for resource %s: %v", resource, err)
}
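
GetResourceReplicas and GetRawResourceReplicas only pass the context through to the metrics fetch; the replica arithmetic is untouched. For reference, the rule they implement is the usual HPA ratio, desired = ceil(current * currentUtilization / targetUtilization). A simplified sketch that ignores the real calculator's tolerance band and per-pod readiness handling:

    package main

    import (
        "fmt"
        "math"
    )

    // desiredReplicas implements the textbook HPA ratio. The real
    // ReplicaCalculator layers a tolerance band and per-pod readiness
    // adjustments on top of this.
    func desiredReplicas(current, currentUtilization, targetUtilization int32) int32 {
        ratio := float64(currentUtilization) / float64(targetUtilization)
        return int32(math.Ceil(float64(current) * ratio))
    }

    func main() {
        // 3 replicas at 90% average CPU against a 60% target -> 5 replicas.
        fmt.Println(desiredReplicas(3, 90, 60))
    }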


@@ -17,6 +17,7 @@ limitations under the License.
package podautoscaler
import (
"context"
"fmt"
"math"
"testing"
@@ -359,7 +360,7 @@ func (tc *replicaCalcTestCase) runTest(t *testing.T) {
}
if tc.resource != nil {
-outReplicas, outUtilization, outRawValue, outTimestamp, err := replicaCalc.GetResourceReplicas(tc.currentReplicas, tc.resource.targetUtilization, tc.resource.name, testNamespace, selector, tc.container)
+outReplicas, outUtilization, outRawValue, outTimestamp, err := replicaCalc.GetResourceReplicas(context.TODO(), tc.currentReplicas, tc.resource.targetUtilization, tc.resource.name, testNamespace, selector, tc.container)
if tc.expectedError != nil {
require.Error(t, err, "there should be an error calculating the replica count")