From 7cac1dcf6789aaa9b4ef25d0d894bdba7a4cf9ad Mon Sep 17 00:00:00 2001 From: Patrick Ohly Date: Wed, 6 Sep 2023 19:44:29 +0200 Subject: [PATCH] dra scheduler: fall back to SSA for PodSchedulingContext updates During scheduler_perf testing, roughly 10% of the PodSchedulingContext update operations failed with a conflict error. Using SSA would avoid that, but performance measurements showed that this causes a considerable slowdown (primarily because of the slower encoding with JSON instead of protobuf, but also because server-side processing is more expensive). Therefore a normal update is tried first and SSA only gets used when there has been a conflict. Using SSA in that case instead of giving up outright is better because it avoids another scheduling attempt. --- .../dynamicresources/dynamicresources.go | 36 +++++ test/integration/scheduler/scheduler_test.go | 137 ++++++++++++++++++ test/integration/util/util.go | 61 +++++++- 3 files changed, 229 insertions(+), 5 deletions(-) diff --git a/pkg/scheduler/framework/plugins/dynamicresources/dynamicresources.go b/pkg/scheduler/framework/plugins/dynamicresources/dynamicresources.go index 2c0541b1edd..c33518058bd 100644 --- a/pkg/scheduler/framework/plugins/dynamicresources/dynamicresources.go +++ b/pkg/scheduler/framework/plugins/dynamicresources/dynamicresources.go @@ -33,6 +33,7 @@ import ( "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/util/sets" + resourcev1alpha2apply "k8s.io/client-go/applyconfigurations/resource/v1alpha2" "k8s.io/client-go/kubernetes" resourcev1alpha2listers "k8s.io/client-go/listers/resource/v1alpha2" "k8s.io/component-helpers/scheduling/corev1/nodeaffinity" @@ -187,6 +188,41 @@ func (p *podSchedulingState) publish(ctx context.Context, pod *v1.Pod, clientset logger.V(5).Info("Updating PodSchedulingContext", "podSchedulingCtx", klog.KObj(schedulingCtx)) } _, err = clientset.ResourceV1alpha2().PodSchedulingContexts(schedulingCtx.Namespace).Update(ctx, schedulingCtx, metav1.UpdateOptions{}) + if apierrors.IsConflict(err) { + // We don't use SSA by default for performance reasons + // (https://github.com/kubernetes/kubernetes/issues/113700#issuecomment-1698563918) + // because most of the time an Update doesn't encounter + // a conflict and is faster. + // + // We could return an error here and rely on + // backoff+retry, but scheduling attempts are expensive + // and the backoff delay would cause a (small) + // slowdown. Therefore we fall back to SSA here if needed. + // + // Using SSA instead of Get+Update has the advantage that + // there is no delay for the Get. SSA is safe because only + // the scheduler updates these fields. + spec := resourcev1alpha2apply.PodSchedulingContextSpec() + spec.SelectedNode = p.selectedNode + if p.potentialNodes != nil { + spec.PotentialNodes = *p.potentialNodes + } else { + // Unchanged. Has to be set because the object that we send + // must represent the "fully specified intent". Not sending + // the list would clear it. + spec.PotentialNodes = p.schedulingCtx.Spec.PotentialNodes + } + schedulingCtxApply := resourcev1alpha2apply.PodSchedulingContext(pod.Name, pod.Namespace).WithSpec(spec) + + if loggerV := logger.V(6); loggerV.Enabled() { + // At a high enough log level, dump the entire object. 
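+				// klog.KObj logs only namespace/name; klog.Format renders the whole apply configuration.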
+ loggerV.Info("Patching PodSchedulingContext", "podSchedulingCtx", klog.KObj(pod), "podSchedulingCtxApply", klog.Format(schedulingCtxApply)) + } else { + logger.V(5).Info("Patching PodSchedulingContext", "podSchedulingCtx", klog.KObj(pod)) + } + _, err = clientset.ResourceV1alpha2().PodSchedulingContexts(pod.Namespace).Apply(ctx, schedulingCtxApply, metav1.ApplyOptions{FieldManager: "kube-scheduler", Force: true}) + } + } else { // Create it. schedulingCtx := &resourcev1alpha2.PodSchedulingContext{ diff --git a/test/integration/scheduler/scheduler_test.go b/test/integration/scheduler/scheduler_test.go index bb2d3851aa4..66fe6460470 100644 --- a/test/integration/scheduler/scheduler_test.go +++ b/test/integration/scheduler/scheduler_test.go @@ -21,24 +21,33 @@ package scheduler import ( "context" "fmt" + "net/http" + "strings" + "sync/atomic" "testing" "time" "github.com/google/go-cmp/cmp" v1 "k8s.io/api/core/v1" + resourcev1alpha2 "k8s.io/api/resource/v1alpha2" + apierrors "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/api/resource" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/util/wait" "k8s.io/apimachinery/pkg/watch" + utilfeature "k8s.io/apiserver/pkg/util/feature" clientset "k8s.io/client-go/kubernetes" corelisters "k8s.io/client-go/listers/core/v1" "k8s.io/client-go/tools/cache" + featuregatetesting "k8s.io/component-base/featuregate/testing" configv1 "k8s.io/kube-scheduler/config/v1" + "k8s.io/kubernetes/pkg/features" "k8s.io/kubernetes/pkg/scheduler" configtesting "k8s.io/kubernetes/pkg/scheduler/apis/config/testing" st "k8s.io/kubernetes/pkg/scheduler/testing" testutils "k8s.io/kubernetes/test/integration/util" + "k8s.io/kubernetes/test/utils/format" "k8s.io/utils/pointer" ) @@ -611,3 +620,131 @@ func TestNodeEvents(t *testing.T) { } } + +// TestPodSchedulingContextSSA checks that the dynamicresources plugin falls +// back to SSA successfully when the normal Update call encountered +// a conflict. +// +// This is an integration test because: +// - Unit testing does not cover RBAC rules. +// - Triggering this particular race is harder in E2E testing +// and harder to verify (needs apiserver metrics and there's +// no standard API for those). +func TestPodSchedulingContextSSA(t *testing.T) { + defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.DynamicResourceAllocation, true)() + + testCtx := testutils.InitTestAPIServer(t, "podschedulingcontext-ssa", nil) + testCtx.DisableEventSink = true + testCtx = testutils.InitTestSchedulerWithOptions(t, testCtx, 0) + testutils.SyncSchedulerInformerFactory(testCtx) + go testCtx.Scheduler.Run(testCtx.SchedulerCtx) + + // Set up enough objects that the scheduler will start trying to + // schedule the pod and create the PodSchedulingContext. 
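+	// Concretely: two small nodes, a ResourceClass, a ResourceClaim using
+	// that class, and a pod which references the claim.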
+ nodeRes := map[v1.ResourceName]string{ + v1.ResourcePods: "32", + v1.ResourceCPU: "30m", + v1.ResourceMemory: "30", + } + for _, name := range []string{"node-a", "node-b"} { + if _, err := testutils.CreateNode(testCtx.ClientSet, st.MakeNode().Name(name).Capacity(nodeRes).Obj()); err != nil { + t.Fatalf("Failed to create node: %v", err) + } + } + + defer func() { + if err := testCtx.ClientSet.ResourceV1alpha2().ResourceClasses().DeleteCollection(testCtx.Ctx, metav1.DeleteOptions{}, metav1.ListOptions{}); err != nil { + t.Errorf("Unexpected error deleting ResourceClasses: %v", err) + } + }() + class := &resourcev1alpha2.ResourceClass{ + ObjectMeta: metav1.ObjectMeta{ + Name: "my-class", + }, + DriverName: "does-not-matter", + } + if _, err := testCtx.ClientSet.ResourceV1alpha2().ResourceClasses().Create(testCtx.Ctx, class, metav1.CreateOptions{}); err != nil { + t.Fatalf("Failed to create class: %v", err) + } + + claim := &resourcev1alpha2.ResourceClaim{ + ObjectMeta: metav1.ObjectMeta{ + Name: "my-claim", + Namespace: testCtx.NS.Name, + }, + Spec: resourcev1alpha2.ResourceClaimSpec{ + ResourceClassName: class.Name, + }, + } + if _, err := testCtx.ClientSet.ResourceV1alpha2().ResourceClaims(claim.Namespace).Create(testCtx.Ctx, claim, metav1.CreateOptions{}); err != nil { + t.Fatalf("Failed to create claim: %v", err) + } + + podConf := testutils.PausePodConfig{ + Name: "testpod", + Namespace: testCtx.NS.Name, + } + pod := testutils.InitPausePod(&podConf) + podClaimName := "myclaim" + pod.Spec.Containers[0].Resources.Claims = []v1.ResourceClaim{{Name: podClaimName}} + pod.Spec.ResourceClaims = []v1.PodResourceClaim{{Name: podClaimName, Source: v1.ClaimSource{ResourceClaimName: &claim.Name}}} + if _, err := testCtx.ClientSet.CoreV1().Pods(pod.Namespace).Create(testCtx.Ctx, pod, metav1.CreateOptions{}); err != nil { + t.Fatalf("Failed to create pod: %v", err) + } + + // Check that the PodSchedulingContext exists and has a selected node. + var schedulingCtx *resourcev1alpha2.PodSchedulingContext + if err := wait.PollUntilContextTimeout(testCtx.Ctx, 10*time.Microsecond, 30*time.Second, true, + func(context.Context) (bool, error) { + var err error + schedulingCtx, err = testCtx.ClientSet.ResourceV1alpha2().PodSchedulingContexts(pod.Namespace).Get(testCtx.Ctx, pod.Name, metav1.GetOptions{}) + if apierrors.IsNotFound(err) { + return false, nil + } + if err == nil && schedulingCtx.Spec.SelectedNode != "" { + return true, nil + } + return false, err + }); err != nil { + t.Fatalf("Failed while waiting for PodSchedulingContext with selected node: %v\nLast PodSchedulingContext:\n%s", err, format.Object(schedulingCtx, 1)) + } + + // Force the plugin to use SSA. + var podSchedulingContextPatchCounter atomic.Int64 + roundTrip := testutils.RoundTripWrapper(func(transport http.RoundTripper, req *http.Request) (*http.Response, error) { + if strings.HasPrefix(req.URL.Path, "/apis/resource.k8s.io/") && + strings.HasSuffix(req.URL.Path, "/podschedulingcontexts/"+pod.Name) { + switch req.Method { + case http.MethodPut, http.MethodPost: + return &http.Response{ + Status: fmt.Sprintf("%d %s", http.StatusConflict, metav1.StatusReasonConflict), + StatusCode: http.StatusConflict, + }, nil + case http.MethodPatch: + podSchedulingContextPatchCounter.Add(1) + } + } + return transport.RoundTrip(req) + }) + testCtx.RoundTrip.Store(&roundTrip) + + // Now force the scheduler to update the PodSchedulingContext by setting UnsuitableNodes so that + // the selected node is not suitable. 
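+	// The resulting scheduling attempt has to publish an updated spec. The
+	// plain update hits the injected conflict above, so the plugin must fall
+	// back to SSA, which is what podSchedulingContextPatchCounter tracks.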
+ schedulingCtx.Status.ResourceClaims = []resourcev1alpha2.ResourceClaimSchedulingStatus{{ + Name: podClaimName, + UnsuitableNodes: []string{schedulingCtx.Spec.SelectedNode}, + }} + + if _, err := testCtx.ClientSet.ResourceV1alpha2().PodSchedulingContexts(pod.Namespace).UpdateStatus(testCtx.Ctx, schedulingCtx, metav1.UpdateOptions{}); err != nil { + t.Fatalf("Unexpected PodSchedulingContext status update error: %v", err) + } + + // We know that the scheduler has to use SSA because above we inject a conflict + // error whenever it tries to use a plain update. We just need to wait for it... + if err := wait.PollUntilContextTimeout(testCtx.Ctx, 10*time.Microsecond, time.Minute, true, + func(context.Context) (bool, error) { + return podSchedulingContextPatchCounter.Load() > 0, nil + }); err != nil { + t.Fatalf("Failed while waiting for PodSchedulingContext Patch: %v", err) + } +} diff --git a/test/integration/util/util.go b/test/integration/util/util.go index e8d860b8ada..445a9bf727b 100644 --- a/test/integration/util/util.go +++ b/test/integration/util/util.go @@ -21,17 +21,21 @@ import ( "encoding/json" "errors" "fmt" + "net/http" + "sync/atomic" "testing" "time" v1 "k8s.io/api/core/v1" policy "k8s.io/api/policy/v1" + resourcev1alpha2 "k8s.io/api/resource/v1alpha2" apierrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/uuid" "k8s.io/apimachinery/pkg/util/wait" "k8s.io/apiserver/pkg/admission" + utilfeature "k8s.io/apiserver/pkg/util/feature" cacheddiscovery "k8s.io/client-go/discovery/cached/memory" "k8s.io/client-go/dynamic" "k8s.io/client-go/dynamic/dynamicinformer" @@ -45,6 +49,7 @@ import ( "k8s.io/client-go/scale" "k8s.io/client-go/tools/cache" "k8s.io/client-go/tools/events" + cliflag "k8s.io/component-base/cli/flag" pvutil "k8s.io/component-helpers/storage/volume" "k8s.io/controller-manager/pkg/informerfactory" "k8s.io/klog/v2" @@ -56,6 +61,7 @@ import ( "k8s.io/kubernetes/pkg/controller/namespace" "k8s.io/kubernetes/pkg/controller/resourceclaim" "k8s.io/kubernetes/pkg/controlplane" + "k8s.io/kubernetes/pkg/features" "k8s.io/kubernetes/pkg/scheduler" kubeschedulerconfig "k8s.io/kubernetes/pkg/scheduler/apis/config" configtesting "k8s.io/kubernetes/pkg/scheduler/apis/config/testing" @@ -239,8 +245,15 @@ func CreateNamespaceController(ctx context.Context, tb testing.TB, restConfig re } } -// TestContext store necessary context info +// TestContext store necessary context info. +// It also contains some optional parameters for InitTestScheduler. type TestContext struct { + // DisableEventSink, if set to true before calling InitTestScheduler, + // will skip the eventBroadcaster.StartRecordingToSink and thus + // some extra goroutines which are tricky to get rid of after + // a test. + DisableEventSink bool + NS *v1.Namespace ClientSet clientset.Interface KubeConfig *restclient.Config @@ -257,8 +270,29 @@ type TestContext struct { // SchedulerCloseFn will tear down the resources in creating scheduler, // including the scheduler itself. SchedulerCloseFn framework.TearDownFunc + + // RoundTrip, if set, will be called for every HTTP request going to the apiserver. + // It can be used for error injection. 
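+	// The wrapper is stored in an atomic pointer so that a test can install
+	// or replace it while the scheduler is already issuing requests, without
+	// a data race.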
+ RoundTrip atomic.Pointer[RoundTripWrapper] } +type RoundTripWrapper func(http.RoundTripper, *http.Request) (*http.Response, error) + +type roundTripWrapper struct { + tc *TestContext + transport http.RoundTripper +} + +func (r roundTripWrapper) RoundTrip(req *http.Request) (*http.Response, error) { + wrapper := r.tc.RoundTrip.Load() + if wrapper != nil { + return (*wrapper)(r.transport, req) + } + return r.transport.RoundTrip(req) +} + +var _ http.RoundTripper = roundTripWrapper{} + // CleanupNodes cleans all nodes which were created during integration test func CleanupNodes(cs clientset.Interface, t *testing.T) { err := cs.CoreV1().Nodes().DeleteCollection(context.TODO(), *metav1.NewDeleteOptions(0), metav1.ListOptions{}) @@ -468,11 +502,16 @@ func UpdateNodeStatus(cs clientset.Interface, node *v1.Node) error { func InitTestAPIServer(t *testing.T, nsPrefix string, admission admission.Interface) *TestContext { _, ctx := ktesting.NewTestContext(t) ctx, cancel := context.WithCancel(ctx) - testCtx := TestContext{Ctx: ctx} + testCtx := &TestContext{Ctx: ctx} testCtx.ClientSet, testCtx.KubeConfig, testCtx.CloseFn = framework.StartTestServer(ctx, t, framework.TestServerSetup{ ModifyServerRunOptions: func(options *options.ServerRunOptions) { options.Admission.GenericAdmission.DisablePlugins = []string{"ServiceAccount", "TaintNodesByCondition", "Priority", "StorageObjectInUseProtection"} + if utilfeature.DefaultFeatureGate.Enabled(features.DynamicResourceAllocation) { + options.APIEnablement.RuntimeConfig = cliflag.ConfigurationMap{ + resourcev1alpha2.SchemeGroupVersion.String(): "true", + } + } }, ModifyServerConfig: func(config *controlplane.Config) { if admission != nil { @@ -481,6 +520,16 @@ func InitTestAPIServer(t *testing.T, nsPrefix string, admission admission.Interf }, }) + // Support wrapping HTTP requests. + testCtx.KubeConfig.Wrap(func(transport http.RoundTripper) http.RoundTripper { + return roundTripWrapper{tc: testCtx, transport: transport} + }) + var err error + testCtx.ClientSet, err = clientset.NewForConfig(testCtx.KubeConfig) + if err != nil { + t.Fatal(err) + } + oldCloseFn := testCtx.CloseFn testCtx.CloseFn = func() { cancel() @@ -494,10 +543,10 @@ func InitTestAPIServer(t *testing.T, nsPrefix string, admission admission.Interf } t.Cleanup(func() { - CleanupTest(t, &testCtx) + CleanupTest(t, testCtx) }) - return &testCtx + return testCtx } // WaitForSchedulerCacheCleanup waits for cleanup of scheduler's cache to complete @@ -560,7 +609,9 @@ func InitTestSchedulerWithOptions( t.Fatalf("Couldn't create scheduler: %v", err) } - eventBroadcaster.StartRecordingToSink(ctx.Done()) + if !testCtx.DisableEventSink { + eventBroadcaster.StartRecordingToSink(ctx.Done()) + } oldCloseFn := testCtx.CloseFn testCtx.CloseFn = func() {