Merge pull request #127154 from macsko/check_if_inflight_events_empty_in_testcase_end_scheduler_perf

Check if InFlightEvents is empty after scheduler_perf workload
This commit is contained in:
Kubernetes Prow Robot 2024-09-09 20:43:33 +01:00 committed by GitHub
commit 0c5e832aa2
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
4 changed files with 96 additions and 12 deletions

View File

@ -105,6 +105,42 @@ var (
WildCardEvent = ClusterEvent{Resource: WildCard, ActionType: All, Label: "WildCardEvent"} WildCardEvent = ClusterEvent{Resource: WildCard, ActionType: All, Label: "WildCardEvent"}
// UnschedulableTimeout is the event when a pod stays in unschedulable for longer than timeout. // UnschedulableTimeout is the event when a pod stays in unschedulable for longer than timeout.
UnschedulableTimeout = ClusterEvent{Resource: WildCard, ActionType: All, Label: "UnschedulableTimeout"} UnschedulableTimeout = ClusterEvent{Resource: WildCard, ActionType: All, Label: "UnschedulableTimeout"}
// AllEvents contains all events defined above.
// The slice is consumed by callers that need to enumerate every known
// cluster-event label (e.g. tests iterating metric label values), so new
// ClusterEvent definitions should be appended here as well.
AllEvents = []ClusterEvent{
AssignedPodAdd,
NodeAdd,
NodeDelete,
AssignedPodUpdate,
UnscheduledPodAdd,
UnscheduledPodUpdate,
UnscheduledPodDelete,
assignedPodOtherUpdate,
AssignedPodDelete,
PodRequestScaledDown,
PodLabelChange,
PodTolerationChange,
PodSchedulingGateEliminatedChange,
NodeSpecUnschedulableChange,
NodeAllocatableChange,
NodeLabelChange,
NodeAnnotationChange,
NodeTaintChange,
NodeConditionChange,
PvAdd,
PvUpdate,
PvcAdd,
PvcUpdate,
StorageClassAdd,
StorageClassUpdate,
CSINodeAdd,
CSINodeUpdate,
CSIDriverAdd,
CSIDriverUpdate,
CSIStorageCapacityAdd,
CSIStorageCapacityUpdate,
WildCardEvent,
UnschedulableTimeout,
}
) )
// PodSchedulingPropertiesChange interprets the update of a pod and returns corresponding UpdatePodXYZ event(s). // PodSchedulingPropertiesChange interprets the update of a pod and returns corresponding UpdatePodXYZ event(s).

View File

@ -52,10 +52,13 @@ import (
featuregatetesting "k8s.io/component-base/featuregate/testing" featuregatetesting "k8s.io/component-base/featuregate/testing"
logsapi "k8s.io/component-base/logs/api/v1" logsapi "k8s.io/component-base/logs/api/v1"
"k8s.io/component-base/metrics/legacyregistry" "k8s.io/component-base/metrics/legacyregistry"
"k8s.io/component-base/metrics/testutil"
"k8s.io/klog/v2" "k8s.io/klog/v2"
"k8s.io/kubernetes/pkg/features"
"k8s.io/kubernetes/pkg/scheduler/apis/config" "k8s.io/kubernetes/pkg/scheduler/apis/config"
"k8s.io/kubernetes/pkg/scheduler/apis/config/scheme" "k8s.io/kubernetes/pkg/scheduler/apis/config/scheme"
"k8s.io/kubernetes/pkg/scheduler/apis/config/validation" "k8s.io/kubernetes/pkg/scheduler/apis/config/validation"
schedframework "k8s.io/kubernetes/pkg/scheduler/framework"
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/names" "k8s.io/kubernetes/pkg/scheduler/framework/plugins/names"
frameworkruntime "k8s.io/kubernetes/pkg/scheduler/framework/runtime" frameworkruntime "k8s.io/kubernetes/pkg/scheduler/framework/runtime"
"k8s.io/kubernetes/pkg/scheduler/metrics" "k8s.io/kubernetes/pkg/scheduler/metrics"
@ -927,6 +930,13 @@ func RunBenchmarkPerfScheduling(b *testing.B, outOfTreePluginRegistry frameworkr
} }
} }
if tc.FeatureGates[features.SchedulerQueueingHints] {
// In any case, we should make sure InFlightEvents is empty after running the scenario.
if err = checkEmptyInFlightEvents(); err != nil {
tCtx.Errorf("%s: %s", w.Name, err)
}
}
// Reset metrics to prevent metrics generated in current workload gets // Reset metrics to prevent metrics generated in current workload gets
// carried over to the next workload. // carried over to the next workload.
legacyregistry.Reset() legacyregistry.Reset()
@ -1027,6 +1037,23 @@ func compareMetricWithThreshold(items []DataItem, threshold float64, metricSelec
return nil return nil
} }
func checkEmptyInFlightEvents() error {
labels := []string{metrics.PodPoppedInFlightEvent}
for _, event := range schedframework.AllEvents {
labels = append(labels, event.Label)
}
for _, label := range labels {
value, err := testutil.GetGaugeMetricValue(metrics.InFlightEvents.WithLabelValues(label))
if err != nil {
return fmt.Errorf("failed to get InFlightEvents metric for label %s", label)
}
if value > 0 {
return fmt.Errorf("InFlightEvents for label %s should be empty, but has %v items", label, value)
}
}
return nil
}
func runWorkload(tCtx ktesting.TContext, tc *testCase, w *workload, informerFactory informers.SharedInformerFactory) []DataItem { func runWorkload(tCtx ktesting.TContext, tc *testCase, w *workload, informerFactory informers.SharedInformerFactory) []DataItem {
b, benchmarking := tCtx.TB().(*testing.B) b, benchmarking := tCtx.TB().(*testing.B)
if benchmarking { if benchmarking {
@ -1139,7 +1166,10 @@ func runWorkload(tCtx ktesting.TContext, tc *testCase, w *workload, informerFact
for _, collector := range collectors { for _, collector := range collectors {
// Need loop-local variable for function below. // Need loop-local variable for function below.
collector := collector collector := collector
collector.init() err = collector.init()
if err != nil {
tCtx.Fatalf("op %d: Failed to initialize data collector: %v", opIndex, err)
}
collectorWG.Add(1) collectorWG.Add(1)
go func() { go func() {
defer collectorWG.Done() defer collectorWG.Done()
@ -1205,13 +1235,6 @@ func runWorkload(tCtx ktesting.TContext, tc *testCase, w *workload, informerFact
}() }()
} }
if !concreteOp.SkipWaitToCompletion {
// SkipWaitToCompletion=false indicates this step has waited for the Pods to be scheduled.
// So we reset the metrics in global registry; otherwise metrics gathered in this step
// will be carried over to next step.
legacyregistry.Reset()
}
case *churnOp: case *churnOp:
var namespace string var namespace string
if concreteOp.Namespace != nil { if concreteOp.Namespace != nil {
@ -1376,7 +1399,7 @@ func createNamespaceIfNotPresent(tCtx ktesting.TContext, namespace string, podsP
} }
type testDataCollector interface { type testDataCollector interface {
init() init() error
run(tCtx ktesting.TContext) run(tCtx ktesting.TContext)
collect() []DataItem collect() []DataItem
} }

View File

@ -18,6 +18,9 @@ package benchmark
import ( import (
"testing" "testing"
"k8s.io/component-base/metrics/legacyregistry"
"k8s.io/kubernetes/pkg/features"
) )
func TestScheduling(t *testing.T) { func TestScheduling(t *testing.T) {
@ -43,6 +46,17 @@ func TestScheduling(t *testing.T) {
informerFactory, tCtx := setupTestCase(t, tc, nil, nil) informerFactory, tCtx := setupTestCase(t, tc, nil, nil)
runWorkload(tCtx, tc, w, informerFactory) runWorkload(tCtx, tc, w, informerFactory)
if tc.FeatureGates[features.SchedulerQueueingHints] {
// In any case, we should make sure InFlightEvents is empty after running the scenario.
if err = checkEmptyInFlightEvents(); err != nil {
tCtx.Errorf("%s: %s", w.Name, err)
}
}
// Reset metrics to prevent metrics generated in current workload gets
// carried over to the next workload.
legacyregistry.Reset()
}) })
} }
}) })

View File

@ -263,9 +263,19 @@ func newMetricsCollector(config *metricsCollectorConfig, labels map[string]strin
} }
} }
func (mc *metricsCollector) init() { func (mc *metricsCollector) init() error {
// Reset the metrics so that the measurements do not interfere with those collected during the previous steps. // Reset the metrics so that the measurements do not interfere with those collected during the previous steps.
legacyregistry.Reset() m, err := legacyregistry.DefaultGatherer.Gather()
if err != nil {
return fmt.Errorf("failed to gather metrics to reset: %w", err)
}
for _, mFamily := range m {
// Reset only metrics defined in the collector.
if _, ok := mc.Metrics[mFamily.GetName()]; ok {
mFamily.Reset()
}
}
return nil
} }
func (*metricsCollector) run(tCtx ktesting.TContext) { func (*metricsCollector) run(tCtx ktesting.TContext) {
@ -381,7 +391,8 @@ func newThroughputCollector(podInformer coreinformers.PodInformer, labels map[st
} }
} }
func (tc *throughputCollector) init() { func (tc *throughputCollector) init() error {
return nil
} }
func (tc *throughputCollector) run(tCtx ktesting.TContext) { func (tc *throughputCollector) run(tCtx ktesting.TContext) {