Mirror of https://github.com/k3s-io/kubernetes.git
scheduler_perf: use ktesting.TContext + staging StartTestServer
ktesting.TContext combines several different interfaces. This makes the code simpler because fewer parameters need to be passed around.

An intentional side effect is that the apiextensions client interface becomes available, which makes it possible to use CRDs. This will be needed for future DRA tests.

Support for CRDs depends on starting the apiserver via k8s.io/kubernetes/cmd/kube-apiserver/app/testing because only that enables the CRD extensions. As discussed on Slack, the long-term goal is to replace the in-tree StartTestServer with the one in staging, so this is going in the right direction.
Commit: c46ae1b26a
Parent: 63aa261583
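To make the new pattern concrete before the diff: below is a minimal, hypothetical sketch (not part of this commit) of an op written against ktesting.TContext instead of the old (ctx, tb, client) parameter triple. The type exampleNamespaceOp and the helper runExampleOp are invented for illustration; ktesting.Init, ktesting.WithRESTConfig, tCtx.Client(), and tCtx.Fatalf are the calls the hunks below rely on.

package sketch

import (
	"testing"

	v1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	restclient "k8s.io/client-go/rest"
	"k8s.io/kubernetes/test/utils/ktesting"
)

// exampleNamespaceOp is a hypothetical op. Before this commit its run method
// would have taken (ctx context.Context, tb testing.TB, client clientset.Interface);
// now a single ktesting.TContext carries the context, the failure reporting,
// and the clients.
type exampleNamespaceOp struct {
	Name string
}

func (op *exampleNamespaceOp) run(tCtx ktesting.TContext) {
	ns := &v1.Namespace{ObjectMeta: metav1.ObjectMeta{Name: op.Name}}
	// tCtx serves as the context.Context passed to the client call, as the
	// testing.TB-like failure reporter (Fatalf), and as the client provider
	// (Client()).
	if _, err := tCtx.Client().CoreV1().Namespaces().Create(tCtx, ns, metav1.CreateOptions{}); err != nil {
		tCtx.Fatalf("create namespace %q: %v", op.Name, err)
	}
}

// runExampleOp shows how such an op would be wired up: Init derives a TContext
// from the test, WithRESTConfig attaches clients for a running apiserver
// (mustSetupCluster in the diff does the same after starting the staging test
// server).
func runExampleOp(t *testing.T, cfg *restclient.Config) {
	tCtx := ktesting.Init(t)
	tCtx = ktesting.WithRESTConfig(tCtx, cfg)
	(&exampleNamespaceOp{Name: "example"}).run(tCtx)
}

Compare this with the run(ktesting.TContext) signature and the mustSetupCluster changes in the hunks that follow.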
@@ -19,10 +19,10 @@ package benchmark
import (
"context"
"fmt"
"testing"

metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
clientset "k8s.io/client-go/kubernetes"
"k8s.io/kubernetes/test/utils/ktesting"
)

// createOp defines an op where some object gets created from a template.

@@ -69,14 +69,14 @@ func (cro *createOp[T, P]) requiredNamespaces() []string {
return []string{cro.Namespace}
}

func (cro *createOp[T, P]) run(ctx context.Context, tb testing.TB, client clientset.Interface) {
func (cro *createOp[T, P]) run(tCtx ktesting.TContext) {
var obj *T
var p P
if err := getSpecFromFile(&cro.TemplatePath, &obj); err != nil {
tb.Fatalf("parsing %s %q: %v", p.Name(), cro.TemplatePath, err)
tCtx.Fatalf("parsing %s %q: %v", p.Name(), cro.TemplatePath, err)
}
if _, err := p.CreateCall(client, cro.Namespace)(ctx, obj, metav1.CreateOptions{}); err != nil {
tb.Fatalf("create %s: %v", p.Name(), err)
if _, err := p.CreateCall(tCtx.Client(), cro.Namespace)(tCtx, obj, metav1.CreateOptions{}); err != nil {
tCtx.Fatalf("create %s: %v", p.Name(), err)
}
}
@@ -21,7 +21,6 @@ import (
"fmt"
"path/filepath"
"sync"
"testing"

resourcev1alpha2 "k8s.io/api/resource/v1alpha2"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

@@ -29,6 +28,7 @@ import (
"k8s.io/client-go/util/workqueue"
"k8s.io/klog/v2"
draapp "k8s.io/kubernetes/test/e2e/dra/test-driver/app"
"k8s.io/kubernetes/test/utils/ktesting"
)

// createResourceClaimsOp defines an op where resource claims are created.

@@ -82,18 +82,18 @@ func (op *createResourceClaimsOp) requiredNamespaces() []string {
return []string{op.Namespace}
}

func (op *createResourceClaimsOp) run(ctx context.Context, tb testing.TB, clientset clientset.Interface) {
tb.Logf("creating %d claims in namespace %q", op.Count, op.Namespace)
func (op *createResourceClaimsOp) run(tCtx ktesting.TContext) {
tCtx.Logf("creating %d claims in namespace %q", op.Count, op.Namespace)

var claimTemplate *resourcev1alpha2.ResourceClaim
if err := getSpecFromFile(&op.TemplatePath, &claimTemplate); err != nil {
tb.Fatalf("parsing ResourceClaim %q: %v", op.TemplatePath, err)
tCtx.Fatalf("parsing ResourceClaim %q: %v", op.TemplatePath, err)
}
var createErr error
var mutex sync.Mutex
create := func(i int) {
err := func() error {
if _, err := clientset.ResourceV1alpha2().ResourceClaims(op.Namespace).Create(ctx, claimTemplate.DeepCopy(), metav1.CreateOptions{}); err != nil {
if _, err := tCtx.Client().ResourceV1alpha2().ResourceClaims(op.Namespace).Create(tCtx, claimTemplate.DeepCopy(), metav1.CreateOptions{}); err != nil {
return fmt.Errorf("create claim: %v", err)
}
return nil

@@ -109,9 +109,9 @@ func (op *createResourceClaimsOp) run(ctx context.Context, tb testing.TB, client
if workers > 30 {
workers = 30
}
workqueue.ParallelizeUntil(ctx, workers, op.Count, create)
workqueue.ParallelizeUntil(tCtx, workers, op.Count, create)
if createErr != nil {
tb.Fatal(createErr.Error())
tCtx.Fatal(createErr.Error())
}
}

@@ -186,8 +186,8 @@ func (op *createResourceDriverOp) patchParams(w *workload) (realOp, error) {

func (op *createResourceDriverOp) requiredNamespaces() []string { return nil }

func (op *createResourceDriverOp) run(ctx context.Context, tb testing.TB, clientset clientset.Interface) {
tb.Logf("creating resource driver %q for nodes matching %q", op.DriverName, op.Nodes)
func (op *createResourceDriverOp) run(tCtx ktesting.TContext) {
tCtx.Logf("creating resource driver %q for nodes matching %q", op.DriverName, op.Nodes)

// Start the controller side of the DRA test driver such that it simulates
// per-node resources.

@@ -197,22 +197,22 @@ func (op *createResourceDriverOp) run(ctx context.Context, tb testing.TB, client
MaxAllocations: op.MaxClaimsPerNode,
}

nodes, err := clientset.CoreV1().Nodes().List(ctx, metav1.ListOptions{})
nodes, err := tCtx.Client().CoreV1().Nodes().List(tCtx, metav1.ListOptions{})
if err != nil {
tb.Fatalf("list nodes: %v", err)
tCtx.Fatalf("list nodes: %v", err)
}
for _, node := range nodes.Items {
match, err := filepath.Match(op.Nodes, node.Name)
if err != nil {
tb.Fatalf("matching glob pattern %q against node name %q: %v", op.Nodes, node.Name, err)
tCtx.Fatalf("matching glob pattern %q against node name %q: %v", op.Nodes, node.Name, err)
}
if match {
resources.Nodes = append(resources.Nodes, node.Name)
}
}

controller := draapp.NewController(clientset, resources)
ctx, cancel := context.WithCancel(ctx)
controller := draapp.NewController(tCtx.Client(), resources)
ctx, cancel := context.WithCancel(tCtx)
var wg sync.WaitGroup
wg.Add(1)
go func() {

@@ -220,11 +220,11 @@ func (op *createResourceDriverOp) run(ctx context.Context, tb testing.TB, client
ctx := klog.NewContext(ctx, klog.LoggerWithName(klog.FromContext(ctx), op.DriverName))
controller.Run(ctx, 5 /* workers */)
}()
tb.Cleanup(func() {
tb.Logf("stopping resource driver %q", op.DriverName)
tCtx.Cleanup(func() {
tCtx.Logf("stopping resource driver %q", op.DriverName)
// We must cancel before waiting.
cancel()
wg.Wait()
tb.Logf("stopped resource driver %q", op.DriverName)
tCtx.Logf("stopped resource driver %q", op.DriverName)
})
}
@@ -59,6 +59,7 @@ import (
"k8s.io/kubernetes/test/integration/framework"
testutils "k8s.io/kubernetes/test/utils"
"k8s.io/kubernetes/test/utils/ktesting"
"k8s.io/kubernetes/test/utils/ktesting/initoption"
"sigs.k8s.io/yaml"
)

@@ -293,7 +294,7 @@ type runnableOp interface {
// before running the operation.
requiredNamespaces() []string
// run executes the steps provided by the operation.
run(context.Context, testing.TB, clientset.Interface)
run(ktesting.TContext)
}

func isValidParameterizable(val string) bool {

@@ -674,6 +675,7 @@ func RunBenchmarkPerfScheduling(b *testing.B, outOfTreePluginRegistry frameworkr
if !enabled(*perfSchedulingLabelFilter, append(tc.Labels, w.Labels...)...) {
b.Skipf("disabled by label filter %q", *perfSchedulingLabelFilter)
}
tCtx := ktesting.Init(b, initoption.PerTestOutput(*useTestingLog))

// Ensure that there are no leaked
// goroutines. They could influence

@@ -684,28 +686,19 @@ func RunBenchmarkPerfScheduling(b *testing.B, outOfTreePluginRegistry frameworkr
// quit *before* restoring klog settings.
framework.GoleakCheck(b)

ctx := context.Background()

if *useTestingLog {
// In addition to redirection klog
// output, also enable contextual
// logging.
_, ctx = ktesting.NewTestContext(b)
}

// Now that we are ready to run, start
// etcd.
framework.StartEtcd(b, output)

// 30 minutes should be plenty enough even for the 5000-node tests.
ctx, cancel := context.WithTimeout(ctx, 30*time.Minute)
b.Cleanup(cancel)
timeout := 30 * time.Minute
tCtx = ktesting.WithTimeout(tCtx, timeout, fmt.Sprintf("timed out after the %s per-test timeout", timeout))

for feature, flag := range tc.FeatureGates {
defer featuregatetesting.SetFeatureGateDuringTest(b, utilfeature.DefaultFeatureGate, feature, flag)()
}
informerFactory, client, dyncClient := setupClusterForWorkload(ctx, b, tc.SchedulerConfigPath, tc.FeatureGates, outOfTreePluginRegistry)
results := runWorkload(ctx, b, tc, w, informerFactory, client, dyncClient, false)
informerFactory, tCtx := setupClusterForWorkload(tCtx, tc.SchedulerConfigPath, tc.FeatureGates, outOfTreePluginRegistry)
results := runWorkload(tCtx, tc, w, informerFactory, false)
dataItems.DataItems = append(dataItems.DataItems, results...)

if len(results) > 0 {

@@ -771,7 +764,7 @@ func loadSchedulerConfig(file string) (*config.KubeSchedulerConfiguration, error
return nil, fmt.Errorf("couldn't decode as KubeSchedulerConfiguration, got %s: ", gvk)
}

func unrollWorkloadTemplate(tb testing.TB, wt []op, w *workload) []op {
func unrollWorkloadTemplate(tb ktesting.TB, wt []op, w *workload) []op {
var unrolled []op
for opIndex, o := range wt {
realOp, err := o.realOp.patchParams(w)

@@ -794,23 +787,23 @@ func unrollWorkloadTemplate(tb testing.TB, wt []op, w *workload) []op {
return unrolled
}

func setupClusterForWorkload(ctx context.Context, tb testing.TB, configPath string, featureGates map[featuregate.Feature]bool, outOfTreePluginRegistry frameworkruntime.Registry) (informers.SharedInformerFactory, clientset.Interface, dynamic.Interface) {
func setupClusterForWorkload(tCtx ktesting.TContext, configPath string, featureGates map[featuregate.Feature]bool, outOfTreePluginRegistry frameworkruntime.Registry) (informers.SharedInformerFactory, ktesting.TContext) {
var cfg *config.KubeSchedulerConfiguration
var err error
if configPath != "" {
cfg, err = loadSchedulerConfig(configPath)
if err != nil {
tb.Fatalf("error loading scheduler config file: %v", err)
tCtx.Fatalf("error loading scheduler config file: %v", err)
}
if err = validation.ValidateKubeSchedulerConfiguration(cfg); err != nil {
tb.Fatalf("validate scheduler config file failed: %v", err)
tCtx.Fatalf("validate scheduler config file failed: %v", err)
}
}
return mustSetupCluster(ctx, tb, cfg, featureGates, outOfTreePluginRegistry)
return mustSetupCluster(tCtx, cfg, featureGates, outOfTreePluginRegistry)
}

func runWorkload(ctx context.Context, tb testing.TB, tc *testCase, w *workload, informerFactory informers.SharedInformerFactory, client clientset.Interface, dynClient dynamic.Interface, cleanup bool) []DataItem {
b, benchmarking := tb.(*testing.B)
func runWorkload(tCtx ktesting.TContext, tc *testCase, w *workload, informerFactory informers.SharedInformerFactory, cleanup bool) []DataItem {
b, benchmarking := tCtx.TB().(*testing.B)
if benchmarking {
start := time.Now()
b.Cleanup(func() {

@@ -839,10 +832,10 @@ func runWorkload(ctx context.Context, tb testing.TB, tc *testCase, w *workload,
podInformer := informerFactory.Core().V1().Pods()

// Everything else started by this function gets stopped before it returns.
ctx, cancel := context.WithCancel(ctx)
tCtx = ktesting.WithCancel(tCtx)
var wg sync.WaitGroup
defer wg.Wait()
defer cancel()
defer tCtx.Cancel("workload is done")

var mu sync.Mutex
var dataItems []DataItem

@@ -853,48 +846,48 @@ func runWorkload(ctx context.Context, tb testing.TB, tc *testCase, w *workload,

if cleanup {
// This must run before controllers get shut down.
defer cleanupWorkload(ctx, tb, tc, client, numPodsScheduledPerNamespace)
defer cleanupWorkload(tCtx, tc, numPodsScheduledPerNamespace)
}

for opIndex, op := range unrollWorkloadTemplate(tb, tc.WorkloadTemplate, w) {
for opIndex, op := range unrollWorkloadTemplate(tCtx, tc.WorkloadTemplate, w) {
realOp, err := op.realOp.patchParams(w)
if err != nil {
tb.Fatalf("op %d: %v", opIndex, err)
tCtx.Fatalf("op %d: %v", opIndex, err)
}
select {
case <-ctx.Done():
tb.Fatalf("op %d: %v", opIndex, ctx.Err())
case <-tCtx.Done():
tCtx.Fatalf("op %d: %v", opIndex, context.Cause(tCtx))
default:
}
switch concreteOp := realOp.(type) {
case *createNodesOp:
nodePreparer, err := getNodePreparer(fmt.Sprintf("node-%d-", opIndex), concreteOp, client)
nodePreparer, err := getNodePreparer(fmt.Sprintf("node-%d-", opIndex), concreteOp, tCtx.Client())
if err != nil {
tb.Fatalf("op %d: %v", opIndex, err)
tCtx.Fatalf("op %d: %v", opIndex, err)
}
if err := nodePreparer.PrepareNodes(ctx, nextNodeIndex); err != nil {
tb.Fatalf("op %d: %v", opIndex, err)
if err := nodePreparer.PrepareNodes(tCtx, nextNodeIndex); err != nil {
tCtx.Fatalf("op %d: %v", opIndex, err)
}
if cleanup {
defer func() {
if err := nodePreparer.CleanupNodes(ctx); err != nil {
tb.Fatalf("failed to clean up nodes, error: %v", err)
if err := nodePreparer.CleanupNodes(tCtx); err != nil {
tCtx.Fatalf("failed to clean up nodes, error: %v", err)
}
}()
}
nextNodeIndex += concreteOp.Count

case *createNamespacesOp:
nsPreparer, err := newNamespacePreparer(concreteOp, client, tb)
nsPreparer, err := newNamespacePreparer(tCtx, concreteOp)
if err != nil {
tb.Fatalf("op %d: %v", opIndex, err)
tCtx.Fatalf("op %d: %v", opIndex, err)
}
if err := nsPreparer.prepare(ctx); err != nil {
err2 := nsPreparer.cleanup(ctx)
if err := nsPreparer.prepare(tCtx); err != nil {
err2 := nsPreparer.cleanup(tCtx)
if err2 != nil {
err = fmt.Errorf("prepare: %v; cleanup: %v", err, err2)
}
tb.Fatalf("op %d: %v", opIndex, err)
tCtx.Fatalf("op %d: %v", opIndex, err)
}
for _, n := range nsPreparer.namespaces() {
if _, ok := numPodsScheduledPerNamespace[n]; ok {

@@ -911,7 +904,7 @@ func runWorkload(ctx context.Context, tb testing.TB, tc *testCase, w *workload,
if concreteOp.Namespace != nil {
namespace = *concreteOp.Namespace
}
createNamespaceIfNotPresent(ctx, tb, client, namespace, &numPodsScheduledPerNamespace)
createNamespaceIfNotPresent(tCtx, namespace, &numPodsScheduledPerNamespace)
if concreteOp.PodTemplatePath == nil {
concreteOp.PodTemplatePath = tc.DefaultPodTemplatePath
}

@@ -919,18 +912,17 @@ func runWorkload(ctx context.Context, tb testing.TB, tc *testCase, w *workload,
// This needs a separate context and wait group because
// the code below needs to be sure that the goroutines
// are stopped.
var collectorCtx context.Context
var collectorCancel func()
var collectorCtx ktesting.TContext
var collectorWG sync.WaitGroup
defer collectorWG.Wait()

if concreteOp.CollectMetrics {
collectorCtx, collectorCancel = context.WithCancel(ctx)
defer collectorCancel()
name := tb.Name()
collectorCtx = ktesting.WithCancel(tCtx)
defer collectorCtx.Cancel("cleaning up")
name := tCtx.Name()
// The first part is the same for each work load, therefore we can strip it.
name = name[strings.Index(name, "/")+1:]
collectors = getTestDataCollectors(tb, podInformer, fmt.Sprintf("%s/%s", name, namespace), namespace, tc.MetricsCollectorConfig, throughputErrorMargin)
collectors = getTestDataCollectors(collectorCtx, podInformer, fmt.Sprintf("%s/%s", name, namespace), namespace, tc.MetricsCollectorConfig, throughputErrorMargin)
for _, collector := range collectors {
// Need loop-local variable for function below.
collector := collector

@@ -941,23 +933,23 @@ func runWorkload(ctx context.Context, tb testing.TB, tc *testCase, w *workload,
}()
}
}
if err := createPods(ctx, tb, namespace, concreteOp, client); err != nil {
tb.Fatalf("op %d: %v", opIndex, err)
if err := createPods(tCtx, namespace, concreteOp); err != nil {
tCtx.Fatalf("op %d: %v", opIndex, err)
}
if concreteOp.SkipWaitToCompletion {
// Only record those namespaces that may potentially require barriers
// in the future.
numPodsScheduledPerNamespace[namespace] += concreteOp.Count
} else {
if err := waitUntilPodsScheduledInNamespace(ctx, tb, podInformer, namespace, concreteOp.Count); err != nil {
tb.Fatalf("op %d: error in waiting for pods to get scheduled: %v", opIndex, err)
if err := waitUntilPodsScheduledInNamespace(tCtx, podInformer, namespace, concreteOp.Count); err != nil {
tCtx.Fatalf("op %d: error in waiting for pods to get scheduled: %v", opIndex, err)
}
}
if concreteOp.CollectMetrics {
// CollectMetrics and SkipWaitToCompletion can never be true at the
// same time, so if we're here, it means that all pods have been
// scheduled.
collectorCancel()
collectorCtx.Cancel("collecting metrix, collector must stop first")
collectorWG.Wait()
mu.Lock()
for _, collector := range collectors {

@@ -980,11 +972,11 @@ func runWorkload(ctx context.Context, tb testing.TB, tc *testCase, w *workload,
} else {
namespace = fmt.Sprintf("namespace-%d", opIndex)
}
restMapper := restmapper.NewDeferredDiscoveryRESTMapper(cacheddiscovery.NewMemCacheClient(client.Discovery()))
restMapper := restmapper.NewDeferredDiscoveryRESTMapper(cacheddiscovery.NewMemCacheClient(tCtx.Client().Discovery()))
// Ensure the namespace exists.
nsObj := &v1.Namespace{ObjectMeta: metav1.ObjectMeta{Name: namespace}}
if _, err := client.CoreV1().Namespaces().Create(ctx, nsObj, metav1.CreateOptions{}); err != nil && !apierrors.IsAlreadyExists(err) {
tb.Fatalf("op %d: unable to create namespace %v: %v", opIndex, namespace, err)
if _, err := tCtx.Client().CoreV1().Namespaces().Create(tCtx, nsObj, metav1.CreateOptions{}); err != nil && !apierrors.IsAlreadyExists(err) {
tCtx.Fatalf("op %d: unable to create namespace %v: %v", opIndex, namespace, err)
}

var churnFns []func(name string) string

@@ -992,31 +984,31 @@ func runWorkload(ctx context.Context, tb testing.TB, tc *testCase, w *workload,
for i, path := range concreteOp.TemplatePaths {
unstructuredObj, gvk, err := getUnstructuredFromFile(path)
if err != nil {
tb.Fatalf("op %d: unable to parse the %v-th template path: %v", opIndex, i, err)
tCtx.Fatalf("op %d: unable to parse the %v-th template path: %v", opIndex, i, err)
}
// Obtain GVR.
mapping, err := restMapper.RESTMapping(gvk.GroupKind(), gvk.Version)
if err != nil {
tb.Fatalf("op %d: unable to find GVR for %v: %v", opIndex, gvk, err)
tCtx.Fatalf("op %d: unable to find GVR for %v: %v", opIndex, gvk, err)
}
gvr := mapping.Resource
// Distinguish cluster-scoped with namespaced API objects.
var dynRes dynamic.ResourceInterface
if mapping.Scope.Name() == meta.RESTScopeNameNamespace {
dynRes = dynClient.Resource(gvr).Namespace(namespace)
dynRes = tCtx.Dynamic().Resource(gvr).Namespace(namespace)
} else {
dynRes = dynClient.Resource(gvr)
dynRes = tCtx.Dynamic().Resource(gvr)
}

churnFns = append(churnFns, func(name string) string {
if name != "" {
if err := dynRes.Delete(ctx, name, metav1.DeleteOptions{}); err != nil {
tb.Errorf("op %d: unable to delete %v: %v", opIndex, name, err)
if err := dynRes.Delete(tCtx, name, metav1.DeleteOptions{}); err != nil {
tCtx.Errorf("op %d: unable to delete %v: %v", opIndex, name, err)
}
return ""
}

live, err := dynRes.Create(ctx, unstructuredObj, metav1.CreateOptions{})
live, err := dynRes.Create(tCtx, unstructuredObj, metav1.CreateOptions{})
if err != nil {
return ""
}

@@ -1047,7 +1039,7 @@ func runWorkload(ctx context.Context, tb testing.TB, tc *testCase, w *workload,
churnFns[i]("")
}
count++
case <-ctx.Done():
case <-tCtx.Done():
return
}
}

@@ -1070,7 +1062,7 @@ func runWorkload(ctx context.Context, tb testing.TB, tc *testCase, w *workload,
retVals[i][count%concreteOp.Number] = churnFns[i](retVals[i][count%concreteOp.Number])
}
count++
case <-ctx.Done():
case <-tCtx.Done():
return
}
}

@@ -1080,11 +1072,11 @@ func runWorkload(ctx context.Context, tb testing.TB, tc *testCase, w *workload,
case *barrierOp:
for _, namespace := range concreteOp.Namespaces {
if _, ok := numPodsScheduledPerNamespace[namespace]; !ok {
tb.Fatalf("op %d: unknown namespace %s", opIndex, namespace)
tCtx.Fatalf("op %d: unknown namespace %s", opIndex, namespace)
}
}
if err := waitUntilPodsScheduled(ctx, tb, podInformer, concreteOp.Namespaces, numPodsScheduledPerNamespace); err != nil {
tb.Fatalf("op %d: %v", opIndex, err)
if err := waitUntilPodsScheduled(tCtx, podInformer, concreteOp.Namespaces, numPodsScheduledPerNamespace); err != nil {
tCtx.Fatalf("op %d: %v", opIndex, err)
}
// At the end of the barrier, we can be sure that there are no pods
// pending scheduling in the namespaces that we just blocked on.

@@ -1098,25 +1090,25 @@ func runWorkload(ctx context.Context, tb testing.TB, tc *testCase, w *workload,

case *sleepOp:
select {
case <-ctx.Done():
case <-tCtx.Done():
case <-time.After(concreteOp.Duration):
}
default:
runable, ok := concreteOp.(runnableOp)
if !ok {
tb.Fatalf("op %d: invalid op %v", opIndex, concreteOp)
tCtx.Fatalf("op %d: invalid op %v", opIndex, concreteOp)
}
for _, namespace := range runable.requiredNamespaces() {
createNamespaceIfNotPresent(ctx, tb, client, namespace, &numPodsScheduledPerNamespace)
createNamespaceIfNotPresent(tCtx, namespace, &numPodsScheduledPerNamespace)
}
runable.run(ctx, tb, client)
runable.run(tCtx)
}
}

// check unused params and inform users
unusedParams := w.unusedParams()
if len(unusedParams) != 0 {
tb.Fatalf("the parameters %v are defined on workload %s, but unused.\nPlease make sure there are no typos.", unusedParams, w.Name)
tCtx.Fatalf("the parameters %v are defined on workload %s, but unused.\nPlease make sure there are no typos.", unusedParams, w.Name)
}

// Some tests have unschedulable pods. Do not add an implicit barrier at the

@@ -1133,17 +1125,17 @@ func runWorkload(ctx context.Context, tb testing.TB, tc *testCase, w *workload,
//
// Calling cleanupWorkload can be skipped if it is known that the next workload
// will run with a fresh etcd instance.
func cleanupWorkload(ctx context.Context, tb testing.TB, tc *testCase, client clientset.Interface, numPodsScheduledPerNamespace map[string]int) {
func cleanupWorkload(tCtx ktesting.TContext, tc *testCase, numPodsScheduledPerNamespace map[string]int) {
deleteNow := *metav1.NewDeleteOptions(0)
for namespace := range numPodsScheduledPerNamespace {
// Pods have to be deleted explicitly, with no grace period. Normally
// kubelet will set the DeletionGracePeriodSeconds to zero when it's okay
// to remove a deleted pod, but we don't run kubelet...
if err := client.CoreV1().Pods(namespace).DeleteCollection(ctx, deleteNow, metav1.ListOptions{}); err != nil {
tb.Fatalf("failed to delete pods in namespace %q: %v", namespace, err)
if err := tCtx.Client().CoreV1().Pods(namespace).DeleteCollection(tCtx, deleteNow, metav1.ListOptions{}); err != nil {
tCtx.Fatalf("failed to delete pods in namespace %q: %v", namespace, err)
}
if err := client.CoreV1().Namespaces().Delete(ctx, namespace, deleteNow); err != nil {
tb.Fatalf("Deleting Namespace %q in numPodsScheduledPerNamespace: %v", namespace, err)
if err := tCtx.Client().CoreV1().Namespaces().Delete(tCtx, namespace, deleteNow); err != nil {
tCtx.Fatalf("Deleting Namespace %q in numPodsScheduledPerNamespace: %v", namespace, err)
}
}

@@ -1151,8 +1143,8 @@ func cleanupWorkload(ctx context.Context, tb testing.TB, tc *testCase, client cl
// actually removing a namespace can take some time (garbage collecting
// other generated object like secrets, etc.) and we don't want to
// start the next workloads while that cleanup is still going on.
if err := wait.PollUntilContextTimeout(ctx, time.Second, 5*time.Minute, false, func(ctx context.Context) (bool, error) {
namespaces, err := client.CoreV1().Namespaces().List(ctx, metav1.ListOptions{})
if err := wait.PollUntilContextTimeout(tCtx, time.Second, 5*time.Minute, false, func(ctx context.Context) (bool, error) {
namespaces, err := tCtx.Client().CoreV1().Namespaces().List(ctx, metav1.ListOptions{})
if err != nil {
return false, err
}

@@ -1165,33 +1157,33 @@ func cleanupWorkload(ctx context.Context, tb testing.TB, tc *testCase, client cl
// All namespaces gone.
return true, nil
}); err != nil {
tb.Fatalf("failed while waiting for namespace removal: %v", err)
tCtx.Fatalf("failed while waiting for namespace removal: %v", err)
}
}

func createNamespaceIfNotPresent(ctx context.Context, tb testing.TB, client clientset.Interface, namespace string, podsPerNamespace *map[string]int) {
func createNamespaceIfNotPresent(tCtx ktesting.TContext, namespace string, podsPerNamespace *map[string]int) {
if _, ok := (*podsPerNamespace)[namespace]; !ok {
// The namespace has not created yet.
// So, create that and register it.
_, err := client.CoreV1().Namespaces().Create(ctx, &v1.Namespace{ObjectMeta: metav1.ObjectMeta{Name: namespace}}, metav1.CreateOptions{})
_, err := tCtx.Client().CoreV1().Namespaces().Create(tCtx, &v1.Namespace{ObjectMeta: metav1.ObjectMeta{Name: namespace}}, metav1.CreateOptions{})
if err != nil {
tb.Fatalf("failed to create namespace for Pod: %v", namespace)
tCtx.Fatalf("failed to create namespace for Pod: %v", namespace)
}
(*podsPerNamespace)[namespace] = 0
}
}

type testDataCollector interface {
run(ctx context.Context)
run(tCtx ktesting.TContext)
collect() []DataItem
}

func getTestDataCollectors(tb testing.TB, podInformer coreinformers.PodInformer, name, namespace string, mcc *metricsCollectorConfig, throughputErrorMargin float64) []testDataCollector {
func getTestDataCollectors(tCtx ktesting.TContext, podInformer coreinformers.PodInformer, name, namespace string, mcc *metricsCollectorConfig, throughputErrorMargin float64) []testDataCollector {
if mcc == nil {
mcc = &defaultMetricsCollectorConfig
}
return []testDataCollector{
newThroughputCollector(tb, podInformer, map[string]string{"Name": name}, []string{namespace}, throughputErrorMargin),
newThroughputCollector(tCtx, podInformer, map[string]string{"Name": name}, []string{namespace}, throughputErrorMargin),
newMetricsCollector(mcc, map[string]string{"Name": name}),
}
}

@@ -1224,25 +1216,25 @@ func getNodePreparer(prefix string, cno *createNodesOp, clientset clientset.Inte
), nil
}

func createPods(ctx context.Context, tb testing.TB, namespace string, cpo *createPodsOp, clientset clientset.Interface) error {
func createPods(tCtx ktesting.TContext, namespace string, cpo *createPodsOp) error {
strategy, err := getPodStrategy(cpo)
if err != nil {
return err
}
tb.Logf("creating %d pods in namespace %q", cpo.Count, namespace)
tCtx.Logf("creating %d pods in namespace %q", cpo.Count, namespace)
config := testutils.NewTestPodCreatorConfig()
config.AddStrategy(namespace, cpo.Count, strategy)
podCreator := testutils.NewTestPodCreator(clientset, config)
return podCreator.CreatePods(ctx)
podCreator := testutils.NewTestPodCreator(tCtx.Client(), config)
return podCreator.CreatePods(tCtx)
}

// waitUntilPodsScheduledInNamespace blocks until all pods in the given
// namespace are scheduled. Times out after 10 minutes because even at the
// lowest observed QPS of ~10 pods/sec, a 5000-node test should complete.
func waitUntilPodsScheduledInNamespace(ctx context.Context, tb testing.TB, podInformer coreinformers.PodInformer, namespace string, wantCount int) error {
func waitUntilPodsScheduledInNamespace(tCtx ktesting.TContext, podInformer coreinformers.PodInformer, namespace string, wantCount int) error {
var pendingPod *v1.Pod

err := wait.PollUntilContextTimeout(ctx, 1*time.Second, 10*time.Minute, true, func(ctx context.Context) (bool, error) {
err := wait.PollUntilContextTimeout(tCtx, 1*time.Second, 10*time.Minute, true, func(ctx context.Context) (bool, error) {
select {
case <-ctx.Done():
return true, ctx.Err()

@@ -1253,10 +1245,10 @@ func waitUntilPodsScheduledInNamespace(ctx context.Context, tb testing.TB, podIn
return false, err
}
if len(scheduled) >= wantCount {
tb.Logf("scheduling succeed")
tCtx.Logf("scheduling succeed")
return true, nil
}
tb.Logf("namespace: %s, pods: want %d, got %d", namespace, wantCount, len(scheduled))
tCtx.Logf("namespace: %s, pods: want %d, got %d", namespace, wantCount, len(scheduled))
if len(unscheduled) > 0 {
pendingPod = unscheduled[0]
} else {

@@ -1273,7 +1265,7 @@ func waitUntilPodsScheduledInNamespace(ctx context.Context, tb testing.TB, podIn

// waitUntilPodsScheduled blocks until the all pods in the given namespaces are
// scheduled.
func waitUntilPodsScheduled(ctx context.Context, tb testing.TB, podInformer coreinformers.PodInformer, namespaces []string, numPodsScheduledPerNamespace map[string]int) error {
func waitUntilPodsScheduled(tCtx ktesting.TContext, podInformer coreinformers.PodInformer, namespaces []string, numPodsScheduledPerNamespace map[string]int) error {
// If unspecified, default to all known namespaces.
if len(namespaces) == 0 {
for namespace := range numPodsScheduledPerNamespace {

@@ -1282,15 +1274,15 @@ func waitUntilPodsScheduled(ctx context.Context, tb testing.TB, podInformer core
}
for _, namespace := range namespaces {
select {
case <-ctx.Done():
return ctx.Err()
case <-tCtx.Done():
return context.Cause(tCtx)
default:
}
wantCount, ok := numPodsScheduledPerNamespace[namespace]
if !ok {
return fmt.Errorf("unknown namespace %s", namespace)
}
if err := waitUntilPodsScheduledInNamespace(ctx, tb, podInformer, namespace, wantCount); err != nil {
if err := waitUntilPodsScheduledInNamespace(tCtx, podInformer, namespace, wantCount); err != nil {
return fmt.Errorf("error waiting for pods in namespace %q: %w", namespace, err)
}
}

@@ -1440,14 +1432,12 @@ func getCustomVolumeFactory(pvTemplate *v1.PersistentVolume) func(id int) *v1.Pe

// namespacePreparer holds configuration information for the test namespace preparer.
type namespacePreparer struct {
client clientset.Interface
count int
prefix string
spec *v1.Namespace
tb testing.TB
}

func newNamespacePreparer(cno *createNamespacesOp, clientset clientset.Interface, tb testing.TB) (*namespacePreparer, error) {
func newNamespacePreparer(tCtx ktesting.TContext, cno *createNamespacesOp) (*namespacePreparer, error) {
ns := &v1.Namespace{}
if cno.NamespaceTemplatePath != nil {
if err := getSpecFromFile(cno.NamespaceTemplatePath, ns); err != nil {

@@ -1456,11 +1446,9 @@ func newNamespacePreparer(cno *createNamespacesOp, clientset clientset.Interface
}

return &namespacePreparer{
client: clientset,
count: cno.Count,
prefix: cno.Prefix,
spec: ns,
tb: tb,
}, nil
}

@@ -1474,17 +1462,17 @@ func (p *namespacePreparer) namespaces() []string {
}

// prepare creates the namespaces.
func (p *namespacePreparer) prepare(ctx context.Context) error {
func (p *namespacePreparer) prepare(tCtx ktesting.TContext) error {
base := &v1.Namespace{}
if p.spec != nil {
base = p.spec
}
p.tb.Logf("Making %d namespaces with prefix %q and template %v", p.count, p.prefix, *base)
tCtx.Logf("Making %d namespaces with prefix %q and template %v", p.count, p.prefix, *base)
for i := 0; i < p.count; i++ {
n := base.DeepCopy()
n.Name = fmt.Sprintf("%s-%d", p.prefix, i)
if err := testutils.RetryWithExponentialBackOff(func() (bool, error) {
_, err := p.client.CoreV1().Namespaces().Create(ctx, n, metav1.CreateOptions{})
_, err := tCtx.Client().CoreV1().Namespaces().Create(tCtx, n, metav1.CreateOptions{})
return err == nil || apierrors.IsAlreadyExists(err), nil
}); err != nil {
return err

@@ -1494,12 +1482,12 @@ func (p *namespacePreparer) prepare(ctx context.Context) error {
}

// cleanup deletes existing test namespaces.
func (p *namespacePreparer) cleanup(ctx context.Context) error {
func (p *namespacePreparer) cleanup(tCtx ktesting.TContext) error {
var errRet error
for i := 0; i < p.count; i++ {
n := fmt.Sprintf("%s-%d", p.prefix, i)
if err := p.client.CoreV1().Namespaces().Delete(ctx, n, metav1.DeleteOptions{}); err != nil {
p.tb.Errorf("Deleting Namespace: %v", err)
if err := tCtx.Client().CoreV1().Namespaces().Delete(tCtx, n, metav1.DeleteOptions{}); err != nil {
tCtx.Errorf("Deleting Namespace: %v", err)
errRet = err
}
}
@@ -17,7 +17,6 @@ limitations under the License.
package benchmark

import (
"context"
"testing"

utilfeature "k8s.io/apiserver/pkg/util/feature"

@@ -70,16 +69,15 @@ func TestScheduling(t *testing.T) {
for _, config := range configs {
// Not a sub test because we don't have a good name for it.
func() {
_, ctx := ktesting.NewTestContext(t)
tCtx := ktesting.Init(t)

// No timeout here because the `go test -timeout` will ensure that
// the test doesn't get stuck forever.
ctx, cancel := context.WithCancel(ctx)
defer cancel()

for feature, flag := range config.featureGates {
defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, feature, flag)()
}
informerFactory, client, dynClient := setupClusterForWorkload(ctx, t, config.schedulerConfigPath, config.featureGates, nil)
informerFactory, tCtx := setupClusterForWorkload(tCtx, config.schedulerConfigPath, config.featureGates, nil)

for _, tc := range testCases {
if !config.equals(tc) {

@@ -93,8 +91,8 @@ func TestScheduling(t *testing.T) {
if !enabled(*testSchedulingLabelFilter, append(tc.Labels, w.Labels...)...) {
t.Skipf("disabled by label filter %q", *testSchedulingLabelFilter)
}
_, ctx := ktesting.NewTestContext(t)
runWorkload(ctx, t, tc, w, informerFactory, client, dynClient, true)
tCtx := ktesting.WithTB(tCtx, t)
runWorkload(tCtx, tc, w, informerFactory, true)
})
}
})
@@ -18,7 +18,6 @@ package benchmark

import (
"bytes"
"context"
"encoding/json"
"flag"
"fmt"

@@ -26,26 +25,22 @@ import (
"os"
"path"
"sort"
"testing"
"strings"
"time"

v1 "k8s.io/api/core/v1"
resourcev1alpha2 "k8s.io/api/resource/v1alpha2"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/client-go/dynamic"
"k8s.io/client-go/informers"
coreinformers "k8s.io/client-go/informers/core/v1"
clientset "k8s.io/client-go/kubernetes"
restclient "k8s.io/client-go/rest"
cliflag "k8s.io/component-base/cli/flag"
"k8s.io/component-base/featuregate"
"k8s.io/component-base/metrics/legacyregistry"
"k8s.io/component-base/metrics/testutil"
"k8s.io/klog/v2"
kubeschedulerconfigv1 "k8s.io/kube-scheduler/config/v1"
"k8s.io/kubernetes/cmd/kube-apiserver/app/options"
apiservertesting "k8s.io/kubernetes/cmd/kube-apiserver/app/testing"
"k8s.io/kubernetes/pkg/features"
"k8s.io/kubernetes/pkg/scheduler/apis/config"
kubeschedulerscheme "k8s.io/kubernetes/pkg/scheduler/apis/config/scheme"

@@ -53,6 +48,7 @@ import (
"k8s.io/kubernetes/test/integration/framework"
"k8s.io/kubernetes/test/integration/util"
testutils "k8s.io/kubernetes/test/utils"
"k8s.io/kubernetes/test/utils/ktesting"
)

const (

@@ -83,33 +79,34 @@ func newDefaultComponentConfig() (*config.KubeSchedulerConfiguration, error) {
// remove resources after finished.
// Notes on rate limiter:
// - client rate limit is set to 5000.
func mustSetupCluster(ctx context.Context, tb testing.TB, config *config.KubeSchedulerConfiguration, enabledFeatures map[featuregate.Feature]bool, outOfTreePluginRegistry frameworkruntime.Registry) (informers.SharedInformerFactory, clientset.Interface, dynamic.Interface) {
func mustSetupCluster(tCtx ktesting.TContext, config *config.KubeSchedulerConfiguration, enabledFeatures map[featuregate.Feature]bool, outOfTreePluginRegistry frameworkruntime.Registry) (informers.SharedInformerFactory, ktesting.TContext) {
// Run API server with minimimal logging by default. Can be raised with -v.
framework.MinVerbosity = 0

_, kubeConfig, tearDownFn := framework.StartTestServer(ctx, tb, framework.TestServerSetup{
ModifyServerRunOptions: func(opts *options.ServerRunOptions) {
// Disable ServiceAccount admission plugin as we don't have serviceaccount controller running.
opts.Admission.GenericAdmission.DisablePlugins = []string{"ServiceAccount", "TaintNodesByCondition", "Priority"}

// Enable DRA API group.
if enabledFeatures[features.DynamicResourceAllocation] {
opts.APIEnablement.RuntimeConfig = cliflag.ConfigurationMap{
resourcev1alpha2.SchemeGroupVersion.String(): "true",
}
}
},
})
tb.Cleanup(tearDownFn)
// No alpha APIs (overrides api/all=true in https://github.com/kubernetes/kubernetes/blob/d647d19f6aef811bace300eec96a67644ff303d4/staging/src/k8s.io/apiextensions-apiserver/pkg/cmd/server/testing/testserver.go#L136),
// except for DRA API group when needed.
runtimeConfig := []string{"api/alpha=false"}
if enabledFeatures[features.DynamicResourceAllocation] {
runtimeConfig = append(runtimeConfig, "resource.k8s.io/v1alpha2=true")
}
customFlags := []string{
// Disable ServiceAccount admission plugin as we don't have serviceaccount controller running.
"--disable-admission-plugins=ServiceAccount,TaintNodesByCondition,Priority",
"--runtime-config=" + strings.Join(runtimeConfig, ","),
}
server, err := apiservertesting.StartTestServer(tCtx, apiservertesting.NewDefaultTestServerOptions(), customFlags, framework.SharedEtcd())
if err != nil {
tCtx.Fatalf("start apiserver: %v", err)
}
tCtx.Cleanup(server.TearDownFn)

// Cleanup will be in reverse order: first the clients get cancelled,
// then the apiserver is torn down.
ctx, cancel := context.WithCancel(ctx)
tb.Cleanup(cancel)
// then the apiserver is torn down via the automatic cancelation of
// tCtx.

// TODO: client connection configuration, such as QPS or Burst is configurable in theory, this could be derived from the `config`, need to
// support this when there is any testcase that depends on such configuration.
cfg := restclient.CopyConfig(kubeConfig)
cfg := restclient.CopyConfig(server.ClientConfig)
cfg.QPS = 5000.0
cfg.Burst = 5000

@@ -118,34 +115,33 @@ func mustSetupCluster(ctx context.Context, tb testing.TB, config *config.KubeSch
var err error
config, err = newDefaultComponentConfig()
if err != nil {
tb.Fatalf("Error creating default component config: %v", err)
tCtx.Fatalf("Error creating default component config: %v", err)
}
}

client := clientset.NewForConfigOrDie(cfg)
dynClient := dynamic.NewForConfigOrDie(cfg)
tCtx = ktesting.WithRESTConfig(tCtx, cfg)

// Not all config options will be effective but only those mostly related with scheduler performance will
// be applied to start a scheduler, most of them are defined in `scheduler.schedulerOptions`.
_, informerFactory := util.StartScheduler(ctx, client, cfg, config, outOfTreePluginRegistry)
util.StartFakePVController(ctx, client, informerFactory)
runGC := util.CreateGCController(ctx, tb, *cfg, informerFactory)
runNS := util.CreateNamespaceController(ctx, tb, *cfg, informerFactory)
_, informerFactory := util.StartScheduler(tCtx, tCtx.Client(), cfg, config, outOfTreePluginRegistry)
util.StartFakePVController(tCtx, tCtx.Client(), informerFactory)
runGC := util.CreateGCController(tCtx, tCtx, *cfg, informerFactory)
runNS := util.CreateNamespaceController(tCtx, tCtx, *cfg, informerFactory)

runResourceClaimController := func() {}
if enabledFeatures[features.DynamicResourceAllocation] {
// Testing of DRA with inline resource claims depends on this
// controller for creating and removing ResourceClaims.
runResourceClaimController = util.CreateResourceClaimController(ctx, tb, client, informerFactory)
runResourceClaimController = util.CreateResourceClaimController(tCtx, tCtx, tCtx.Client(), informerFactory)
}

informerFactory.Start(ctx.Done())
informerFactory.WaitForCacheSync(ctx.Done())
informerFactory.Start(tCtx.Done())
informerFactory.WaitForCacheSync(tCtx.Done())
go runGC()
go runNS()
go runResourceClaimController()

return informerFactory, client, dynClient
return informerFactory, tCtx
}

// Returns the list of scheduled and unscheduled pods in the specified namespaces.

@@ -268,7 +264,7 @@ func newMetricsCollector(config *metricsCollectorConfig, labels map[string]strin
}
}

func (*metricsCollector) run(ctx context.Context) {
func (*metricsCollector) run(tCtx ktesting.TContext) {
// metricCollector doesn't need to start before the tests, so nothing to do here.
}

@@ -342,7 +338,6 @@ func collectHistogramVec(metric string, labels map[string]string, lvMap map[stri
}

type throughputCollector struct {
tb testing.TB
podInformer coreinformers.PodInformer
schedulingThroughputs []float64
labels map[string]string

@@ -350,9 +345,8 @@ type throughputCollector struct {
errorMargin float64
}

func newThroughputCollector(tb testing.TB, podInformer coreinformers.PodInformer, labels map[string]string, namespaces []string, errorMargin float64) *throughputCollector {
func newThroughputCollector(tb ktesting.TB, podInformer coreinformers.PodInformer, labels map[string]string, namespaces []string, errorMargin float64) *throughputCollector {
return &throughputCollector{
tb: tb,
podInformer: podInformer,
labels: labels,
namespaces: namespaces,

@@ -360,7 +354,7 @@ func newThroughputCollector(tb testing.TB, podInformer coreinformers.PodInformer
}
}

func (tc *throughputCollector) run(ctx context.Context) {
func (tc *throughputCollector) run(tCtx ktesting.TContext) {
podsScheduled, _, err := getScheduledPods(tc.podInformer, tc.namespaces...)
if err != nil {
klog.Fatalf("%v", err)

@@ -374,7 +368,7 @@ func (tc *throughputCollector) run(ctx context.Context) {

for {
select {
case <-ctx.Done():
case <-tCtx.Done():
return
case <-ticker.C:
now := time.Now()

@@ -419,7 +413,7 @@ func (tc *throughputCollector) run(ctx context.Context) {
errorMargin := (duration - expectedDuration).Seconds() / expectedDuration.Seconds() * 100
if tc.errorMargin > 0 && math.Abs(errorMargin) > tc.errorMargin {
// This might affect the result, report it.
tc.tb.Errorf("ERROR: Expected throuput collector to sample at regular time intervals. The %d most recent intervals took %s instead of %s, a difference of %0.1f%%.", skipped+1, duration, expectedDuration, errorMargin)
tCtx.Errorf("ERROR: Expected throuput collector to sample at regular time intervals. The %d most recent intervals took %s instead of %s, a difference of %0.1f%%.", skipped+1, duration, expectedDuration, errorMargin)
}

// To keep percentiles accurate, we have to record multiple samples with the same
@@ -128,7 +128,7 @@ func StartScheduler(ctx context.Context, clientSet clientset.Interface, kubeConf
return sched, informerFactory
}

func CreateResourceClaimController(ctx context.Context, tb testing.TB, clientSet clientset.Interface, informerFactory informers.SharedInformerFactory) func() {
func CreateResourceClaimController(ctx context.Context, tb ktesting.TB, clientSet clientset.Interface, informerFactory informers.SharedInformerFactory) func() {
podInformer := informerFactory.Core().V1().Pods()
schedulingInformer := informerFactory.Resource().V1alpha2().PodSchedulingContexts()
claimInformer := informerFactory.Resource().V1alpha2().ResourceClaims()

@@ -190,7 +190,7 @@ func StartFakePVController(ctx context.Context, clientSet clientset.Interface, i
// CreateGCController creates a garbage controller and returns a run function
// for it. The informer factory needs to be started before invoking that
// function.
func CreateGCController(ctx context.Context, tb testing.TB, restConfig restclient.Config, informerSet informers.SharedInformerFactory) func() {
func CreateGCController(ctx context.Context, tb ktesting.TB, restConfig restclient.Config, informerSet informers.SharedInformerFactory) func() {
restclient.AddUserAgent(&restConfig, "gc-controller")
clientSet := clientset.NewForConfigOrDie(&restConfig)
metadataClient, err := metadata.NewForConfig(&restConfig)

@@ -227,7 +227,7 @@ func CreateGCController(ctx context.Context, tb testing.TB, restConfig restclien
// CreateNamespaceController creates a namespace controller and returns a run
// function for it. The informer factory needs to be started before invoking
// that function.
func CreateNamespaceController(ctx context.Context, tb testing.TB, restConfig restclient.Config, informerSet informers.SharedInformerFactory) func() {
func CreateNamespaceController(ctx context.Context, tb ktesting.TB, restConfig restclient.Config, informerSet informers.SharedInformerFactory) func() {
restclient.AddUserAgent(&restConfig, "namespace-controller")
clientSet := clientset.NewForConfigOrDie(&restConfig)
metadataClient, err := metadata.NewForConfig(&restConfig)