Merge pull request #129909 from YamasouA/refactor/scheduler_perf

Refactor scheduler_perf runWorkload

Commit: 672f57e2a4
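The refactor in the diff below follows a common Go pattern: the monolithic type switch inside runWorkload, which used to abort via tCtx.Fatalf in every branch, is moved onto a WorkloadExecutor struct whose per-op methods return errors, and the single caller decides whether a failure is fatal. The following is a minimal, self-contained sketch of that pattern only; the names used here (op, workloadExecutor, sleepOp, barrierOp) are illustrative stand-ins, not the actual scheduler_perf code shown in the diff.

package main

import "fmt"

// Illustrative stand-ins for the scheduler_perf op types; the real ops
// (createNodesOp, createPodsOp, churnOp, ...) carry much more state.
type op interface{}

type sleepOp struct{}

type barrierOp struct{ Namespaces []string }

// workloadExecutor mirrors the shape of the new WorkloadExecutor: shared
// state lives on the struct, and every op handler returns an error instead
// of failing the test or benchmark itself.
type workloadExecutor struct {
	numPodsScheduledPerNamespace map[string]int
}

func (e *workloadExecutor) runOp(o op) error {
	switch concreteOp := o.(type) {
	case *sleepOp:
		return e.runSleepOp(concreteOp)
	case *barrierOp:
		return e.runBarrierOp(concreteOp)
	default:
		return fmt.Errorf("invalid op %v", concreteOp)
	}
}

func (e *workloadExecutor) runSleepOp(*sleepOp) error { return nil }

func (e *workloadExecutor) runBarrierOp(o *barrierOp) error {
	for _, ns := range o.Namespaces {
		if _, ok := e.numPodsScheduledPerNamespace[ns]; !ok {
			return fmt.Errorf("unknown namespace %s", ns)
		}
	}
	return nil
}

// runWorkload wraps every failure with the op index once, at the dispatch
// site; the single test/benchmark caller decides whether that is fatal.
func runWorkload(ops []op) error {
	e := &workloadExecutor{numPodsScheduledPerNamespace: map[string]int{"namespace-0": 0}}
	for opIndex, o := range ops {
		if err := e.runOp(o); err != nil {
			return fmt.Errorf("op %d: %w", opIndex, err)
		}
	}
	return nil
}

func main() {
	ops := []op{&sleepOp{}, &barrierOp{Namespaces: []string{"missing"}}}
	if err := runWorkload(ops); err != nil {
		fmt.Println("workload failed:", err)
	}
}

Running the sketch prints "workload failed: op 1: unknown namespace missing", which shows the op index being attached once at the dispatch site rather than in every handler. The actual change, reconstructed as a unified diff, follows.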
@@ -1194,7 +1194,10 @@ func RunBenchmarkPerfScheduling(b *testing.B, configFile string, topicName strin
                b.Fatalf("workload %s is not valid: %v", w.Name, err)
            }

-           results := runWorkload(tCtx, tc, w, informerFactory)
+           results, err := runWorkload(tCtx, tc, w, informerFactory)
+           if err != nil {
+               tCtx.Fatalf("Error running workload %s: %s", w.Name, err)
+           }
            dataItems.DataItems = append(dataItems.DataItems, results...)

            if len(results) > 0 {
@@ -1292,7 +1295,10 @@ func RunIntegrationPerfScheduling(t *testing.T, configFile string) {
                t.Fatalf("workload %s is not valid: %v", w.Name, err)
            }

-           runWorkload(tCtx, tc, w, informerFactory)
+           _, err = runWorkload(tCtx, tc, w, informerFactory)
+           if err != nil {
+               tCtx.Fatalf("Error running workload %s: %s", w.Name, err)
+           }

            if featureGates[features.SchedulerQueueingHints] {
                // In any case, we should make sure InFlightEvents is empty after running the scenario.
@@ -1410,7 +1416,7 @@ func checkEmptyInFlightEvents() error {
    return nil
 }

-func startCollectingMetrics(tCtx ktesting.TContext, collectorWG *sync.WaitGroup, podInformer coreinformers.PodInformer, mcc *metricsCollectorConfig, throughputErrorMargin float64, opIndex int, name string, namespaces []string, labelSelector map[string]string) (ktesting.TContext, []testDataCollector) {
+func startCollectingMetrics(tCtx ktesting.TContext, collectorWG *sync.WaitGroup, podInformer coreinformers.PodInformer, mcc *metricsCollectorConfig, throughputErrorMargin float64, opIndex int, name string, namespaces []string, labelSelector map[string]string) (ktesting.TContext, []testDataCollector, error) {
    collectorCtx := ktesting.WithCancel(tCtx)
    workloadName := tCtx.Name()
    // The first part is the same for each workload, therefore we can strip it.
@@ -1421,20 +1427,23 @@ func startCollectingMetrics(tCtx ktesting.TContext, collectorWG *sync.WaitGroup,
        collector := collector
        err := collector.init()
        if err != nil {
-           tCtx.Fatalf("op %d: Failed to initialize data collector: %v", opIndex, err)
+           return nil, nil, fmt.Errorf("failed to initialize data collector: %w", err)
        }
+       tCtx.TB().Cleanup(func() {
+           collectorCtx.Cancel("cleaning up")
+       })
        collectorWG.Add(1)
        go func() {
            defer collectorWG.Done()
            collector.run(collectorCtx)
        }()
    }
-   return collectorCtx, collectors
+   return collectorCtx, collectors, nil
 }

-func stopCollectingMetrics(tCtx ktesting.TContext, collectorCtx ktesting.TContext, collectorWG *sync.WaitGroup, threshold float64, tms thresholdMetricSelector, opIndex int, collectors []testDataCollector) []DataItem {
+func stopCollectingMetrics(tCtx ktesting.TContext, collectorCtx ktesting.TContext, collectorWG *sync.WaitGroup, threshold float64, tms thresholdMetricSelector, opIndex int, collectors []testDataCollector) ([]DataItem, error) {
    if collectorCtx == nil {
-       tCtx.Fatalf("op %d: Missing startCollectingMetrics operation before stopping", opIndex)
+       return nil, fmt.Errorf("missing startCollectingMetrics operation before stopping")
    }
    collectorCtx.Cancel("collecting metrics, collector must stop first")
    collectorWG.Wait()
@@ -1447,10 +1456,25 @@ func stopCollectingMetrics(tCtx ktesting.TContext, collectorCtx ktesting.TContex
            tCtx.Errorf("op %d: %s", opIndex, err)
        }
    }
-   return dataItems
+   return dataItems, nil
 }

-func runWorkload(tCtx ktesting.TContext, tc *testCase, w *workload, informerFactory informers.SharedInformerFactory) []DataItem {
+type WorkloadExecutor struct {
+   tCtx                         ktesting.TContext
+   wg                           sync.WaitGroup
+   collectorCtx                 ktesting.TContext
+   collectorWG                  sync.WaitGroup
+   collectors                   []testDataCollector
+   dataItems                    []DataItem
+   numPodsScheduledPerNamespace map[string]int
+   podInformer                  coreinformers.PodInformer
+   throughputErrorMargin        float64
+   testCase                     *testCase
+   workload                     *workload
+   nextNodeIndex                int
+}
+
+func runWorkload(tCtx ktesting.TContext, tc *testCase, w *workload, informerFactory informers.SharedInformerFactory) ([]DataItem, error) {
    b, benchmarking := tCtx.TB().(*testing.B)
    if benchmarking {
        start := time.Now()
@@ -1481,343 +1505,408 @@ func runWorkload(tCtx ktesting.TContext, tc *testCase, w *workload, informerFact

    // Everything else started by this function gets stopped before it returns.
    tCtx = ktesting.WithCancel(tCtx)
-   var wg sync.WaitGroup
-   defer wg.Wait()
-   defer tCtx.Cancel("workload is done")

-   var dataItems []DataItem
-   nextNodeIndex := 0
-   // numPodsScheduledPerNamespace has all namespaces created in workload and the number of pods they (will) have.
-   // All namespaces listed in numPodsScheduledPerNamespace will be cleaned up.
-   numPodsScheduledPerNamespace := make(map[string]int)
+   executor := WorkloadExecutor{
+       tCtx:                         tCtx,
+       numPodsScheduledPerNamespace: make(map[string]int),
+       podInformer:                  podInformer,
+       throughputErrorMargin:        throughputErrorMargin,
+       testCase:                     tc,
+       workload:                     w,
+   }

-   var collectors []testDataCollector
-   // This needs a separate context and wait group because
-   // the metrics collecting needs to be sure that the goroutines
-   // are stopped.
-   var collectorCtx ktesting.TContext
-   var collectorWG sync.WaitGroup
-   defer collectorWG.Wait()
+   tCtx.TB().Cleanup(func() {
+       tCtx.Cancel("workload is done")
+       executor.collectorWG.Wait()
+       executor.wg.Wait()
+   })

    for opIndex, op := range unrollWorkloadTemplate(tCtx, tc.WorkloadTemplate, w) {
        realOp, err := op.realOp.patchParams(w)
        if err != nil {
-           tCtx.Fatalf("op %d: %v", opIndex, err)
+           return nil, fmt.Errorf("op %d: %w", opIndex, err)
        }
        select {
        case <-tCtx.Done():
-           tCtx.Fatalf("op %d: %v", opIndex, context.Cause(tCtx))
+           return nil, fmt.Errorf("op %d: %w", opIndex, context.Cause(tCtx))
        default:
        }
-       switch concreteOp := realOp.(type) {
-       case *createNodesOp:
-           nodePreparer, err := getNodePreparer(fmt.Sprintf("node-%d-", opIndex), concreteOp, tCtx.Client())
-           if err != nil {
-               tCtx.Fatalf("op %d: %v", opIndex, err)
-           }
-           if err := nodePreparer.PrepareNodes(tCtx, nextNodeIndex); err != nil {
-               tCtx.Fatalf("op %d: %v", opIndex, err)
-           }
-           nextNodeIndex += concreteOp.Count
-
-       case *createNamespacesOp:
-           nsPreparer, err := newNamespacePreparer(tCtx, concreteOp)
-           if err != nil {
-               tCtx.Fatalf("op %d: %v", opIndex, err)
-           }
-           if err := nsPreparer.prepare(tCtx); err != nil {
-               err2 := nsPreparer.cleanup(tCtx)
-               if err2 != nil {
-                   err = fmt.Errorf("prepare: %v; cleanup: %v", err, err2)
-               }
-               tCtx.Fatalf("op %d: %v", opIndex, err)
-           }
-           for _, n := range nsPreparer.namespaces() {
-               if _, ok := numPodsScheduledPerNamespace[n]; ok {
-                   // this namespace has been already created.
-                   continue
-               }
-               numPodsScheduledPerNamespace[n] = 0
-           }
-
-       case *createPodsOp:
-           var namespace string
-           // define Pod's namespace automatically, and create that namespace.
-           namespace = fmt.Sprintf("namespace-%d", opIndex)
-           if concreteOp.Namespace != nil {
-               namespace = *concreteOp.Namespace
-           }
-           createNamespaceIfNotPresent(tCtx, namespace, &numPodsScheduledPerNamespace)
-           if concreteOp.PodTemplatePath == nil {
-               concreteOp.PodTemplatePath = tc.DefaultPodTemplatePath
-           }
-
-           if concreteOp.CollectMetrics {
-               if collectorCtx != nil {
-                   tCtx.Fatalf("op %d: Metrics collection is overlapping. Probably second collector was started before stopping a previous one", opIndex)
-               }
-               collectorCtx, collectors = startCollectingMetrics(tCtx, &collectorWG, podInformer, tc.MetricsCollectorConfig, throughputErrorMargin, opIndex, namespace, []string{namespace}, nil)
-               defer collectorCtx.Cancel("cleaning up")
-           }
-           if err := createPodsRapidly(tCtx, namespace, concreteOp); err != nil {
-               tCtx.Fatalf("op %d: %v", opIndex, err)
-           }
-           switch {
-           case concreteOp.SkipWaitToCompletion:
-               // Only record those namespaces that may potentially require barriers
-               // in the future.
-               numPodsScheduledPerNamespace[namespace] += concreteOp.Count
-           case concreteOp.SteadyState:
-               if err := createPodsSteadily(tCtx, namespace, podInformer, concreteOp); err != nil {
-                   tCtx.Fatalf("op %d: %v", opIndex, err)
-               }
-           default:
-               if err := waitUntilPodsScheduledInNamespace(tCtx, podInformer, nil, namespace, concreteOp.Count); err != nil {
-                   tCtx.Fatalf("op %d: error in waiting for pods to get scheduled: %v", opIndex, err)
-               }
-           }
-           if concreteOp.CollectMetrics {
-               // CollectMetrics and SkipWaitToCompletion can never be true at the
-               // same time, so if we're here, it means that all pods have been
-               // scheduled.
-               items := stopCollectingMetrics(tCtx, collectorCtx, &collectorWG, w.Threshold, *w.ThresholdMetricSelector, opIndex, collectors)
-               dataItems = append(dataItems, items...)
-               collectorCtx = nil
-           }
-
-       case *deletePodsOp:
-           labelSelector := labels.ValidatedSetSelector(concreteOp.LabelSelector)
-
-           podsToDelete, err := podInformer.Lister().Pods(concreteOp.Namespace).List(labelSelector)
-           if err != nil {
-               tCtx.Fatalf("op %d: error in listing pods in the namespace %s: %v", opIndex, concreteOp.Namespace, err)
-           }
-
-           deletePods := func(opIndex int) {
-               if concreteOp.DeletePodsPerSecond > 0 {
-                   ticker := time.NewTicker(time.Second / time.Duration(concreteOp.DeletePodsPerSecond))
-                   defer ticker.Stop()
-
-                   for i := 0; i < len(podsToDelete); i++ {
-                       select {
-                       case <-ticker.C:
-                           if err := tCtx.Client().CoreV1().Pods(concreteOp.Namespace).Delete(tCtx, podsToDelete[i].Name, metav1.DeleteOptions{}); err != nil {
-                               if errors.Is(err, context.Canceled) {
-                                   return
-                               }
-                               tCtx.Errorf("op %d: unable to delete pod %v: %v", opIndex, podsToDelete[i].Name, err)
-                           }
-                       case <-tCtx.Done():
-                           return
-                       }
-                   }
-                   return
-               }
-               listOpts := metav1.ListOptions{
-                   LabelSelector: labelSelector.String(),
-               }
-               if err := tCtx.Client().CoreV1().Pods(concreteOp.Namespace).DeleteCollection(tCtx, metav1.DeleteOptions{}, listOpts); err != nil {
-                   if errors.Is(err, context.Canceled) {
-                       return
-                   }
-                   tCtx.Errorf("op %d: unable to delete pods in namespace %v: %v", opIndex, concreteOp.Namespace, err)
-               }
-           }
-
-           if concreteOp.SkipWaitToCompletion {
-               wg.Add(1)
-               go func(opIndex int) {
-                   defer wg.Done()
-                   deletePods(opIndex)
-               }(opIndex)
-           } else {
-               deletePods(opIndex)
-           }
-
-       case *churnOp:
-           var namespace string
-           if concreteOp.Namespace != nil {
-               namespace = *concreteOp.Namespace
-           } else {
-               namespace = fmt.Sprintf("namespace-%d", opIndex)
-           }
-           restMapper := restmapper.NewDeferredDiscoveryRESTMapper(cacheddiscovery.NewMemCacheClient(tCtx.Client().Discovery()))
-           // Ensure the namespace exists.
-           nsObj := &v1.Namespace{ObjectMeta: metav1.ObjectMeta{Name: namespace}}
-           if _, err := tCtx.Client().CoreV1().Namespaces().Create(tCtx, nsObj, metav1.CreateOptions{}); err != nil && !apierrors.IsAlreadyExists(err) {
-               tCtx.Fatalf("op %d: unable to create namespace %v: %v", opIndex, namespace, err)
-           }
-
-           var churnFns []func(name string) string
-
-           for i, path := range concreteOp.TemplatePaths {
-               unstructuredObj, gvk, err := getUnstructuredFromFile(path)
-               if err != nil {
-                   tCtx.Fatalf("op %d: unable to parse the %v-th template path: %v", opIndex, i, err)
-               }
-               // Obtain GVR.
-               mapping, err := restMapper.RESTMapping(gvk.GroupKind(), gvk.Version)
-               if err != nil {
-                   tCtx.Fatalf("op %d: unable to find GVR for %v: %v", opIndex, gvk, err)
-               }
-               gvr := mapping.Resource
-               // Distinguish cluster-scoped with namespaced API objects.
-               var dynRes dynamic.ResourceInterface
-               if mapping.Scope.Name() == meta.RESTScopeNameNamespace {
-                   dynRes = tCtx.Dynamic().Resource(gvr).Namespace(namespace)
-               } else {
-                   dynRes = tCtx.Dynamic().Resource(gvr)
-               }
-
-               churnFns = append(churnFns, func(name string) string {
-                   if name != "" {
-                       if err := dynRes.Delete(tCtx, name, metav1.DeleteOptions{}); err != nil && !errors.Is(err, context.Canceled) {
-                           tCtx.Errorf("op %d: unable to delete %v: %v", opIndex, name, err)
-                       }
-                       return ""
-                   }
-
-                   live, err := dynRes.Create(tCtx, unstructuredObj, metav1.CreateOptions{})
-                   if err != nil {
-                       return ""
-                   }
-                   return live.GetName()
-               })
-           }
-
-           var interval int64 = 500
-           if concreteOp.IntervalMilliseconds != 0 {
-               interval = concreteOp.IntervalMilliseconds
-           }
-           ticker := time.NewTicker(time.Duration(interval) * time.Millisecond)
-           defer ticker.Stop()
-
-           switch concreteOp.Mode {
-           case Create:
-               wg.Add(1)
-               go func() {
-                   defer wg.Done()
-                   count, threshold := 0, concreteOp.Number
-                   if threshold == 0 {
-                       threshold = math.MaxInt32
-                   }
-                   for count < threshold {
-                       select {
-                       case <-ticker.C:
-                           for i := range churnFns {
-                               churnFns[i]("")
-                           }
-                           count++
-                       case <-tCtx.Done():
-                           return
-                       }
-                   }
-               }()
-           case Recreate:
-               wg.Add(1)
-               go func() {
-                   defer wg.Done()
-                   retVals := make([][]string, len(churnFns))
-                   // For each churn function, instantiate a slice of strings with length "concreteOp.Number".
-                   for i := range retVals {
-                       retVals[i] = make([]string, concreteOp.Number)
-                   }
-
-                   count := 0
-                   for {
-                       select {
-                       case <-ticker.C:
-                           for i := range churnFns {
-                               retVals[i][count%concreteOp.Number] = churnFns[i](retVals[i][count%concreteOp.Number])
-                           }
-                           count++
-                       case <-tCtx.Done():
-                           return
-                       }
-                   }
-               }()
-           }
-
-       case *barrierOp:
-           for _, namespace := range concreteOp.Namespaces {
-               if _, ok := numPodsScheduledPerNamespace[namespace]; !ok {
-                   tCtx.Fatalf("op %d: unknown namespace %s", opIndex, namespace)
-               }
-           }
-           switch concreteOp.StageRequirement {
-           case Attempted:
-               if err := waitUntilPodsAttempted(tCtx, podInformer, concreteOp.LabelSelector, concreteOp.Namespaces, numPodsScheduledPerNamespace); err != nil {
-                   tCtx.Fatalf("op %d: %v", opIndex, err)
-               }
-           case Scheduled:
-               // Default should be treated like "Scheduled", so handling both in the same way.
-               fallthrough
-           default:
-               if err := waitUntilPodsScheduled(tCtx, podInformer, concreteOp.LabelSelector, concreteOp.Namespaces, numPodsScheduledPerNamespace); err != nil {
-                   tCtx.Fatalf("op %d: %v", opIndex, err)
-               }
-               // At the end of the barrier, we can be sure that there are no pods
-               // pending scheduling in the namespaces that we just blocked on.
-               if len(concreteOp.Namespaces) == 0 {
-                   numPodsScheduledPerNamespace = make(map[string]int)
-               } else {
-                   for _, namespace := range concreteOp.Namespaces {
-                       delete(numPodsScheduledPerNamespace, namespace)
-                   }
-               }
-           }
-
-       case *sleepOp:
-           select {
-           case <-tCtx.Done():
-           case <-time.After(concreteOp.Duration.Duration):
-           }
-
-       case *startCollectingMetricsOp:
-           if collectorCtx != nil {
-               tCtx.Fatalf("op %d: Metrics collection is overlapping. Probably second collector was started before stopping a previous one", opIndex)
-           }
-           collectorCtx, collectors = startCollectingMetrics(tCtx, &collectorWG, podInformer, tc.MetricsCollectorConfig, throughputErrorMargin, opIndex, concreteOp.Name, concreteOp.Namespaces, concreteOp.LabelSelector)
-           defer collectorCtx.Cancel("cleaning up")
-
-       case *stopCollectingMetricsOp:
-           items := stopCollectingMetrics(tCtx, collectorCtx, &collectorWG, w.Threshold, *w.ThresholdMetricSelector, opIndex, collectors)
-           dataItems = append(dataItems, items...)
-           collectorCtx = nil
-
-       default:
-           runable, ok := concreteOp.(runnableOp)
-           if !ok {
-               tCtx.Fatalf("op %d: invalid op %v", opIndex, concreteOp)
-           }
-           for _, namespace := range runable.requiredNamespaces() {
-               createNamespaceIfNotPresent(tCtx, namespace, &numPodsScheduledPerNamespace)
-           }
-           runable.run(tCtx)
+       err = executor.runOp(realOp, opIndex)
+       if err != nil {
+           return nil, fmt.Errorf("op %d: %w", opIndex, err)
        }
    }

    // check unused params and inform users
    unusedParams := w.unusedParams()
    if len(unusedParams) != 0 {
-       tCtx.Fatalf("the parameters %v are defined on workload %s, but unused.\nPlease make sure there are no typos.", unusedParams, w.Name)
+       return nil, fmt.Errorf("the parameters %v are defined on workload %s, but unused.\nPlease make sure there are no typos", unusedParams, w.Name)
    }

    // Some tests have unschedulable pods. Do not add an implicit barrier at the
    // end as we do not want to wait for them.
-   return dataItems
+   return executor.dataItems, nil
 }

-func createNamespaceIfNotPresent(tCtx ktesting.TContext, namespace string, podsPerNamespace *map[string]int) {
+func (e *WorkloadExecutor) runOp(op realOp, opIndex int) error {
+   switch concreteOp := op.(type) {
+   case *createNodesOp:
+       return e.runCreateNodesOp(opIndex, concreteOp)
+   case *createNamespacesOp:
+       return e.runCreateNamespaceOp(opIndex, concreteOp)
+   case *createPodsOp:
+       return e.runCreatePodsOp(opIndex, concreteOp)
+   case *deletePodsOp:
+       return e.runDeletePodsOp(opIndex, concreteOp)
+   case *churnOp:
+       return e.runChurnOp(opIndex, concreteOp)
+   case *barrierOp:
+       return e.runBarrierOp(opIndex, concreteOp)
+   case *sleepOp:
+       return e.runSleepOp(concreteOp)
+   case *startCollectingMetricsOp:
+       return e.runStartCollectingMetricsOp(opIndex, concreteOp)
+   case *stopCollectingMetricsOp:
+       return e.runStopCollectingMetrics(opIndex)
+   default:
+       return e.runDefaultOp(opIndex, concreteOp)
+   }
+}
+
+func (e *WorkloadExecutor) runCreateNodesOp(opIndex int, op *createNodesOp) error {
+   nodePreparer, err := getNodePreparer(fmt.Sprintf("node-%d-", opIndex), op, e.tCtx.Client())
+   if err != nil {
+       return err
+   }
+   if err := nodePreparer.PrepareNodes(e.tCtx, e.nextNodeIndex); err != nil {
+       return err
+   }
+   e.nextNodeIndex += op.Count
+   return nil
+}
+
+func (e *WorkloadExecutor) runCreateNamespaceOp(opIndex int, op *createNamespacesOp) error {
+   nsPreparer, err := newNamespacePreparer(e.tCtx, op)
+   if err != nil {
+       return err
+   }
+   if err := nsPreparer.prepare(e.tCtx); err != nil {
+       err2 := nsPreparer.cleanup(e.tCtx)
+       if err2 != nil {
+           err = fmt.Errorf("prepare: %w; cleanup: %w", err, err2)
+       }
+       return err
+   }
+   for _, n := range nsPreparer.namespaces() {
+       if _, ok := e.numPodsScheduledPerNamespace[n]; ok {
+           // this namespace has been already created.
+           continue
+       }
+       e.numPodsScheduledPerNamespace[n] = 0
+   }
+   return nil
+}
+
+func (e *WorkloadExecutor) runBarrierOp(opIndex int, op *barrierOp) error {
+   for _, namespace := range op.Namespaces {
+       if _, ok := e.numPodsScheduledPerNamespace[namespace]; !ok {
+           return fmt.Errorf("unknown namespace %s", namespace)
+       }
+   }
+   switch op.StageRequirement {
+   case Attempted:
+       if err := waitUntilPodsAttempted(e.tCtx, e.podInformer, op.LabelSelector, op.Namespaces, e.numPodsScheduledPerNamespace); err != nil {
+           return err
+       }
+   case Scheduled:
+       // Default should be treated like "Scheduled", so handling both in the same way.
+       fallthrough
+   default:
+       if err := waitUntilPodsScheduled(e.tCtx, e.podInformer, op.LabelSelector, op.Namespaces, e.numPodsScheduledPerNamespace); err != nil {
+           return err
+       }
+       // At the end of the barrier, we can be sure that there are no pods
+       // pending scheduling in the namespaces that we just blocked on.
+       if len(op.Namespaces) == 0 {
+           e.numPodsScheduledPerNamespace = make(map[string]int)
+       } else {
+           for _, namespace := range op.Namespaces {
+               delete(e.numPodsScheduledPerNamespace, namespace)
+           }
+       }
+   }
+   return nil
+}
+
+func (e *WorkloadExecutor) runSleepOp(op *sleepOp) error {
+   select {
+   case <-e.tCtx.Done():
+   case <-time.After(op.Duration.Duration):
+   }
+   return nil
+}
+
+func (e *WorkloadExecutor) runStopCollectingMetrics(opIndex int) error {
+   items, err := stopCollectingMetrics(e.tCtx, e.collectorCtx, &e.collectorWG, e.workload.Threshold, *e.workload.ThresholdMetricSelector, opIndex, e.collectors)
+   if err != nil {
+       return err
+   }
+   e.dataItems = append(e.dataItems, items...)
+   e.collectorCtx = nil
+   return nil
+}
+
+func (e *WorkloadExecutor) runCreatePodsOp(opIndex int, op *createPodsOp) error {
+   // define Pod's namespace automatically, and create that namespace.
+   namespace := fmt.Sprintf("namespace-%d", opIndex)
+   if op.Namespace != nil {
+       namespace = *op.Namespace
+   }
+   err := createNamespaceIfNotPresent(e.tCtx, namespace, &e.numPodsScheduledPerNamespace)
+   if err != nil {
+       return err
+   }
+   if op.PodTemplatePath == nil {
+       op.PodTemplatePath = e.testCase.DefaultPodTemplatePath
+   }
+
+   if op.CollectMetrics {
+       if e.collectorCtx != nil {
+           return fmt.Errorf("metrics collection is overlapping. Probably second collector was started before stopping a previous one")
+       }
+       var err error
+       e.collectorCtx, e.collectors, err = startCollectingMetrics(e.tCtx, &e.collectorWG, e.podInformer, e.testCase.MetricsCollectorConfig, e.throughputErrorMargin, opIndex, namespace, []string{namespace}, nil)
+       if err != nil {
+           return err
+       }
+   }
+   if err := createPodsRapidly(e.tCtx, namespace, op); err != nil {
+       return err
+   }
+   switch {
+   case op.SkipWaitToCompletion:
+       // Only record those namespaces that may potentially require barriers
+       // in the future.
+       e.numPodsScheduledPerNamespace[namespace] += op.Count
+   case op.SteadyState:
+       if err := createPodsSteadily(e.tCtx, namespace, e.podInformer, op); err != nil {
+           return err
+       }
+   default:
+       if err := waitUntilPodsScheduledInNamespace(e.tCtx, e.podInformer, nil, namespace, op.Count); err != nil {
+           return fmt.Errorf("error in waiting for pods to get scheduled: %w", err)
+       }
+   }
+   if op.CollectMetrics {
+       // CollectMetrics and SkipWaitToCompletion can never be true at the
+       // same time, so if we're here, it means that all pods have been
+       // scheduled.
+       items, err := stopCollectingMetrics(e.tCtx, e.collectorCtx, &e.collectorWG, e.workload.Threshold, *e.workload.ThresholdMetricSelector, opIndex, e.collectors)
+       if err != nil {
+           return err
+       }
+       e.dataItems = append(e.dataItems, items...)
+       e.collectorCtx = nil
+   }
+   return nil
+}
+
+func (e *WorkloadExecutor) runDeletePodsOp(opIndex int, op *deletePodsOp) error {
+   labelSelector := labels.ValidatedSetSelector(op.LabelSelector)
+
+   podsToDelete, err := e.podInformer.Lister().Pods(op.Namespace).List(labelSelector)
+   if err != nil {
+       return fmt.Errorf("error in listing pods in the namespace %s: %w", op.Namespace, err)
+   }
+
+   deletePods := func(opIndex int) {
+       if op.DeletePodsPerSecond > 0 {
+           ticker := time.NewTicker(time.Second / time.Duration(op.DeletePodsPerSecond))
+           defer ticker.Stop()
+
+           for i := 0; i < len(podsToDelete); i++ {
+               select {
+               case <-ticker.C:
+                   if err := e.tCtx.Client().CoreV1().Pods(op.Namespace).Delete(e.tCtx, podsToDelete[i].Name, metav1.DeleteOptions{}); err != nil {
+                       if errors.Is(err, context.Canceled) {
+                           return
+                       }
+                       e.tCtx.Errorf("op %d: unable to delete pod %v: %v", opIndex, podsToDelete[i].Name, err)
+                   }
+               case <-e.tCtx.Done():
+                   return
+               }
+           }
+           return
+       }
+       listOpts := metav1.ListOptions{
+           LabelSelector: labelSelector.String(),
+       }
+       if err := e.tCtx.Client().CoreV1().Pods(op.Namespace).DeleteCollection(e.tCtx, metav1.DeleteOptions{}, listOpts); err != nil {
+           if errors.Is(err, context.Canceled) {
+               return
+           }
+           e.tCtx.Errorf("op %d: unable to delete pods in namespace %v: %v", opIndex, op.Namespace, err)
+       }
+   }
+
+   if op.SkipWaitToCompletion {
+       e.wg.Add(1)
+       go func(opIndex int) {
+           defer e.wg.Done()
+           deletePods(opIndex)
+       }(opIndex)
+   } else {
+       deletePods(opIndex)
+   }
+   return nil
+}
+
+func (e *WorkloadExecutor) runChurnOp(opIndex int, op *churnOp) error {
+   var namespace string
+   if op.Namespace != nil {
+       namespace = *op.Namespace
+   } else {
+       namespace = fmt.Sprintf("namespace-%d", opIndex)
+   }
+   restMapper := restmapper.NewDeferredDiscoveryRESTMapper(cacheddiscovery.NewMemCacheClient(e.tCtx.Client().Discovery()))
+   // Ensure the namespace exists.
+   nsObj := &v1.Namespace{ObjectMeta: metav1.ObjectMeta{Name: namespace}}
+   if _, err := e.tCtx.Client().CoreV1().Namespaces().Create(e.tCtx, nsObj, metav1.CreateOptions{}); err != nil && !apierrors.IsAlreadyExists(err) {
+       return fmt.Errorf("unable to create namespace %v: %w", namespace, err)
+   }
+
+   var churnFns []func(name string) string
+
+   for i, path := range op.TemplatePaths {
+       unstructuredObj, gvk, err := getUnstructuredFromFile(path)
+       if err != nil {
+           return fmt.Errorf("unable to parse the %v-th template path: %w", i, err)
+       }
+       // Obtain GVR.
+       mapping, err := restMapper.RESTMapping(gvk.GroupKind(), gvk.Version)
+       if err != nil {
+           return fmt.Errorf("unable to find GVR for %v: %w", gvk, err)
+       }
+       gvr := mapping.Resource
+       // Distinguish cluster-scoped with namespaced API objects.
+       var dynRes dynamic.ResourceInterface
+       if mapping.Scope.Name() == meta.RESTScopeNameNamespace {
+           dynRes = e.tCtx.Dynamic().Resource(gvr).Namespace(namespace)
+       } else {
+           dynRes = e.tCtx.Dynamic().Resource(gvr)
+       }
+
+       churnFns = append(churnFns, func(name string) string {
+           if name != "" {
+               if err := dynRes.Delete(e.tCtx, name, metav1.DeleteOptions{}); err != nil && !errors.Is(err, context.Canceled) {
+                   e.tCtx.Errorf("op %d: unable to delete %v: %v", opIndex, name, err)
+               }
+               return ""
+           }
+
+           live, err := dynRes.Create(e.tCtx, unstructuredObj, metav1.CreateOptions{})
+           if err != nil {
+               return ""
+           }
+           return live.GetName()
+       })
+   }
+
+   var interval int64 = 500
+   if op.IntervalMilliseconds != 0 {
+       interval = op.IntervalMilliseconds
+   }
+   ticker := time.NewTicker(time.Duration(interval) * time.Millisecond)
+
+   switch op.Mode {
+   case Create:
+       e.wg.Add(1)
+       go func() {
+           defer e.wg.Done()
+           defer ticker.Stop()
+           count, threshold := 0, op.Number
+           if threshold == 0 {
+               threshold = math.MaxInt32
+           }
+           for count < threshold {
+               select {
+               case <-ticker.C:
+                   for i := range churnFns {
+                       churnFns[i]("")
+                   }
+                   count++
+               case <-e.tCtx.Done():
+                   return
+               }
+           }
+       }()
+   case Recreate:
+       e.wg.Add(1)
+       go func() {
+           defer e.wg.Done()
+           defer ticker.Stop()
+           retVals := make([][]string, len(churnFns))
+           // For each churn function, instantiate a slice of strings with length "op.Number".
+           for i := range retVals {
+               retVals[i] = make([]string, op.Number)
+           }
+
+           count := 0
+           for {
+               select {
+               case <-ticker.C:
+                   for i := range churnFns {
+                       retVals[i][count%op.Number] = churnFns[i](retVals[i][count%op.Number])
+                   }
+                   count++
+               case <-e.tCtx.Done():
+                   return
+               }
+           }
+       }()
+   }
+   return nil
+}
+
+func (e *WorkloadExecutor) runDefaultOp(opIndex int, op realOp) error {
+   runable, ok := op.(runnableOp)
+   if !ok {
+       return fmt.Errorf("invalid op %v", op)
+   }
+   for _, namespace := range runable.requiredNamespaces() {
+       err := createNamespaceIfNotPresent(e.tCtx, namespace, &e.numPodsScheduledPerNamespace)
+       if err != nil {
+           return err
+       }
+   }
+   runable.run(e.tCtx)
+   return nil
+}
+
+func (e *WorkloadExecutor) runStartCollectingMetricsOp(opIndex int, op *startCollectingMetricsOp) error {
+   if e.collectorCtx != nil {
+       return fmt.Errorf("metrics collection is overlapping. Probably second collector was started before stopping a previous one")
+   }
+   var err error
+   e.collectorCtx, e.collectors, err = startCollectingMetrics(e.tCtx, &e.collectorWG, e.podInformer, e.testCase.MetricsCollectorConfig, e.throughputErrorMargin, opIndex, op.Name, op.Namespaces, op.LabelSelector)
+   if err != nil {
+       return err
+   }
+   return nil
+}
+
+func createNamespaceIfNotPresent(tCtx ktesting.TContext, namespace string, podsPerNamespace *map[string]int) error {
    if _, ok := (*podsPerNamespace)[namespace]; !ok {
        // The namespace has not created yet.
        // So, create that and register it.
        _, err := tCtx.Client().CoreV1().Namespaces().Create(tCtx, &v1.Namespace{ObjectMeta: metav1.ObjectMeta{Name: namespace}}, metav1.CreateOptions{})
        if err != nil {
-           tCtx.Fatalf("failed to create namespace for Pod: %v", namespace)
+           return fmt.Errorf("failed to create namespace for Pod: %v", namespace)
        }
        (*podsPerNamespace)[namespace] = 0
    }
+   return nil
 }

 type testDataCollector interface {