Merge pull request #120534 from pohly/dra-scheduler-ssa-as-fallback

dra scheduler: fall back to SSA for PodSchedulingContext updates
Commit 5a4e792e06, authored by Kubernetes Prow Robot on 2023-10-23 21:06:58 +02:00 and committed via GitHub.
3 changed files with 229 additions and 5 deletions
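
The core change makes the DRA scheduler plugin try a plain Update first and switch to server-side apply (SSA) only when that Update returns a 409 conflict. For orientation, here is a minimal sketch of the same pattern applied to an ordinary ConfigMap; the resource, function, and field-manager names are placeholders and are not taken from this PR.

package example

import (
    "context"

    apierrors "k8s.io/apimachinery/pkg/api/errors"
    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    corev1apply "k8s.io/client-go/applyconfigurations/core/v1"
    "k8s.io/client-go/kubernetes"
)

// publishData illustrates the fallback: the cheap Update is tried first and
// usually succeeds; server-side apply with Force is used only on a conflict.
func publishData(ctx context.Context, c kubernetes.Interface, ns, name string, data map[string]string) error {
    cm, err := c.CoreV1().ConfigMaps(ns).Get(ctx, name, metav1.GetOptions{})
    if err != nil {
        return err
    }
    cm.Data = data
    _, err = c.CoreV1().ConfigMaps(ns).Update(ctx, cm, metav1.UpdateOptions{})
    if !apierrors.IsConflict(err) {
        return err // nil on success, or an unrelated error
    }
    // Conflict: fall back to SSA. The apply configuration carries the full
    // intent for the fields this manager owns, and Force takes ownership.
    applyCfg := corev1apply.ConfigMap(name, ns).WithData(data)
    _, err = c.CoreV1().ConfigMaps(ns).Apply(ctx, applyCfg,
        metav1.ApplyOptions{FieldManager: "example-manager", Force: true})
    return err
}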


@@ -33,6 +33,7 @@ import (
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/util/sets"
resourcev1alpha2apply "k8s.io/client-go/applyconfigurations/resource/v1alpha2"
"k8s.io/client-go/kubernetes"
resourcev1alpha2listers "k8s.io/client-go/listers/resource/v1alpha2"
"k8s.io/component-helpers/scheduling/corev1/nodeaffinity"
@@ -187,6 +188,41 @@ func (p *podSchedulingState) publish(ctx context.Context, pod *v1.Pod, clientset
logger.V(5).Info("Updating PodSchedulingContext", "podSchedulingCtx", klog.KObj(schedulingCtx))
}
_, err = clientset.ResourceV1alpha2().PodSchedulingContexts(schedulingCtx.Namespace).Update(ctx, schedulingCtx, metav1.UpdateOptions{})
if apierrors.IsConflict(err) {
// We don't use SSA by default for performance reasons
// (https://github.com/kubernetes/kubernetes/issues/113700#issuecomment-1698563918)
// because most of the time an Update doesn't encounter
// a conflict and is faster.
//
// We could return an error here and rely on
// backoff+retry, but scheduling attempts are expensive
// and the backoff delay would cause a (small)
// slowdown. Therefore we fall back to SSA here if needed.
//
// Using SSA instead of Get+Update has the advantage that
// there is no delay for the Get. SSA is safe because only
// the scheduler updates these fields.
spec := resourcev1alpha2apply.PodSchedulingContextSpec()
spec.SelectedNode = p.selectedNode
if p.potentialNodes != nil {
spec.PotentialNodes = *p.potentialNodes
} else {
// Unchanged. Has to be set because the object that we send
// must represent the "fully specified intent". Not sending
// the list would clear it.
spec.PotentialNodes = p.schedulingCtx.Spec.PotentialNodes
}
schedulingCtxApply := resourcev1alpha2apply.PodSchedulingContext(pod.Name, pod.Namespace).WithSpec(spec)
if loggerV := logger.V(6); loggerV.Enabled() {
// At a high enough log level, dump the entire object.
loggerV.Info("Patching PodSchedulingContext", "podSchedulingCtx", klog.KObj(pod), "podSchedulingCtxApply", klog.Format(schedulingCtxApply))
} else {
logger.V(5).Info("Patching PodSchedulingContext", "podSchedulingCtx", klog.KObj(pod))
}
_, err = clientset.ResourceV1alpha2().PodSchedulingContexts(pod.Namespace).Apply(ctx, schedulingCtxApply, metav1.ApplyOptions{FieldManager: "kube-scheduler", Force: true})
}
} else {
// Create it.
schedulingCtx := &resourcev1alpha2.PodSchedulingContext{

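The in-code comment about the "fully specified intent" above is the key subtlety of the SSA fallback: a field the scheduler owns but omits from the apply configuration is treated as removed, not as unchanged. A small hedged sketch (field values invented) of building such an apply configuration with the generated helpers:

package example

import (
    resourcev1alpha2apply "k8s.io/client-go/applyconfigurations/resource/v1alpha2"
)

// buildApply assembles the scheduler's full intent for the spec. Leaving
// WithPotentialNodes out would not mean "keep the current list"; server-side
// apply would interpret the omission as clearing spec.potentialNodes.
func buildApply(podName, namespace, selectedNode string, potentialNodes []string) *resourcev1alpha2apply.PodSchedulingContextApplyConfiguration {
    spec := resourcev1alpha2apply.PodSchedulingContextSpec().
        WithSelectedNode(selectedNode).
        WithPotentialNodes(potentialNodes...)
    return resourcev1alpha2apply.PodSchedulingContext(podName, namespace).WithSpec(spec)
}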

@@ -21,24 +21,33 @@ package scheduler
import (
"context"
"fmt"
"net/http"
"strings"
"sync/atomic"
"testing"
"time"
"github.com/google/go-cmp/cmp"
v1 "k8s.io/api/core/v1"
resourcev1alpha2 "k8s.io/api/resource/v1alpha2"
apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/apimachinery/pkg/watch"
utilfeature "k8s.io/apiserver/pkg/util/feature"
clientset "k8s.io/client-go/kubernetes"
corelisters "k8s.io/client-go/listers/core/v1"
"k8s.io/client-go/tools/cache"
featuregatetesting "k8s.io/component-base/featuregate/testing"
configv1 "k8s.io/kube-scheduler/config/v1"
"k8s.io/kubernetes/pkg/features"
"k8s.io/kubernetes/pkg/scheduler"
configtesting "k8s.io/kubernetes/pkg/scheduler/apis/config/testing"
st "k8s.io/kubernetes/pkg/scheduler/testing"
testutils "k8s.io/kubernetes/test/integration/util"
"k8s.io/kubernetes/test/utils/format"
"k8s.io/utils/pointer"
)
@@ -611,3 +620,131 @@ func TestNodeEvents(t *testing.T) {
}
}
// TestPodSchedulingContextSSA checks that the dynamicresources plugin falls
// back to SSA successfully when the normal Update call encountered
// a conflict.
//
// This is an integration test because:
// - Unit testing does not cover RBAC rules.
// - Triggering this particular race is harder in E2E testing
// and harder to verify (needs apiserver metrics and there's
// no standard API for those).
func TestPodSchedulingContextSSA(t *testing.T) {
defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.DynamicResourceAllocation, true)()
testCtx := testutils.InitTestAPIServer(t, "podschedulingcontext-ssa", nil)
testCtx.DisableEventSink = true
testCtx = testutils.InitTestSchedulerWithOptions(t, testCtx, 0)
testutils.SyncSchedulerInformerFactory(testCtx)
go testCtx.Scheduler.Run(testCtx.SchedulerCtx)
// Set up enough objects that the scheduler will start trying to
// schedule the pod and create the PodSchedulingContext.
nodeRes := map[v1.ResourceName]string{
v1.ResourcePods: "32",
v1.ResourceCPU: "30m",
v1.ResourceMemory: "30",
}
for _, name := range []string{"node-a", "node-b"} {
if _, err := testutils.CreateNode(testCtx.ClientSet, st.MakeNode().Name(name).Capacity(nodeRes).Obj()); err != nil {
t.Fatalf("Failed to create node: %v", err)
}
}
defer func() {
if err := testCtx.ClientSet.ResourceV1alpha2().ResourceClasses().DeleteCollection(testCtx.Ctx, metav1.DeleteOptions{}, metav1.ListOptions{}); err != nil {
t.Errorf("Unexpected error deleting ResourceClasses: %v", err)
}
}()
class := &resourcev1alpha2.ResourceClass{
ObjectMeta: metav1.ObjectMeta{
Name: "my-class",
},
DriverName: "does-not-matter",
}
if _, err := testCtx.ClientSet.ResourceV1alpha2().ResourceClasses().Create(testCtx.Ctx, class, metav1.CreateOptions{}); err != nil {
t.Fatalf("Failed to create class: %v", err)
}
claim := &resourcev1alpha2.ResourceClaim{
ObjectMeta: metav1.ObjectMeta{
Name: "my-claim",
Namespace: testCtx.NS.Name,
},
Spec: resourcev1alpha2.ResourceClaimSpec{
ResourceClassName: class.Name,
},
}
if _, err := testCtx.ClientSet.ResourceV1alpha2().ResourceClaims(claim.Namespace).Create(testCtx.Ctx, claim, metav1.CreateOptions{}); err != nil {
t.Fatalf("Failed to create claim: %v", err)
}
podConf := testutils.PausePodConfig{
Name: "testpod",
Namespace: testCtx.NS.Name,
}
pod := testutils.InitPausePod(&podConf)
podClaimName := "myclaim"
pod.Spec.Containers[0].Resources.Claims = []v1.ResourceClaim{{Name: podClaimName}}
pod.Spec.ResourceClaims = []v1.PodResourceClaim{{Name: podClaimName, Source: v1.ClaimSource{ResourceClaimName: &claim.Name}}}
if _, err := testCtx.ClientSet.CoreV1().Pods(pod.Namespace).Create(testCtx.Ctx, pod, metav1.CreateOptions{}); err != nil {
t.Fatalf("Failed to create pod: %v", err)
}
// Check that the PodSchedulingContext exists and has a selected node.
var schedulingCtx *resourcev1alpha2.PodSchedulingContext
if err := wait.PollUntilContextTimeout(testCtx.Ctx, 10*time.Microsecond, 30*time.Second, true,
func(context.Context) (bool, error) {
var err error
schedulingCtx, err = testCtx.ClientSet.ResourceV1alpha2().PodSchedulingContexts(pod.Namespace).Get(testCtx.Ctx, pod.Name, metav1.GetOptions{})
if apierrors.IsNotFound(err) {
return false, nil
}
if err == nil && schedulingCtx.Spec.SelectedNode != "" {
return true, nil
}
return false, err
}); err != nil {
t.Fatalf("Failed while waiting for PodSchedulingContext with selected node: %v\nLast PodSchedulingContext:\n%s", err, format.Object(schedulingCtx, 1))
}
// Force the plugin to use SSA.
var podSchedulingContextPatchCounter atomic.Int64
roundTrip := testutils.RoundTripWrapper(func(transport http.RoundTripper, req *http.Request) (*http.Response, error) {
if strings.HasPrefix(req.URL.Path, "/apis/resource.k8s.io/") &&
strings.HasSuffix(req.URL.Path, "/podschedulingcontexts/"+pod.Name) {
switch req.Method {
case http.MethodPut, http.MethodPost:
return &http.Response{
Status: fmt.Sprintf("%d %s", http.StatusConflict, metav1.StatusReasonConflict),
StatusCode: http.StatusConflict,
}, nil
case http.MethodPatch:
podSchedulingContextPatchCounter.Add(1)
}
}
return transport.RoundTrip(req)
})
testCtx.RoundTrip.Store(&roundTrip)
// Now force the scheduler to update the PodSchedulingContext by setting UnsuitableNodes so that
// the selected node is not suitable.
schedulingCtx.Status.ResourceClaims = []resourcev1alpha2.ResourceClaimSchedulingStatus{{
Name: podClaimName,
UnsuitableNodes: []string{schedulingCtx.Spec.SelectedNode},
}}
if _, err := testCtx.ClientSet.ResourceV1alpha2().PodSchedulingContexts(pod.Namespace).UpdateStatus(testCtx.Ctx, schedulingCtx, metav1.UpdateOptions{}); err != nil {
t.Fatalf("Unexpected PodSchedulingContext status update error: %v", err)
}
// We know that the scheduler has to use SSA because above we inject a conflict
// error whenever it tries to use a plain update. We just need to wait for it...
if err := wait.PollUntilContextTimeout(testCtx.Ctx, 10*time.Microsecond, time.Minute, true,
func(context.Context) (bool, error) {
return podSchedulingContextPatchCounter.Load() > 0, nil
}); err != nil {
t.Fatalf("Failed while waiting for PodSchedulingContext Patch: %v", err)
}
}
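
The test drives the fallback by intercepting HTTP traffic: every PUT or POST to the pod's PodSchedulingContext is answered with a synthetic 409, so only the SSA PATCH can reach the apiserver. As a standalone hedged sketch of that interception idea, here is a plain net/http RoundTripper (type and field names invented, response body details simplified):

package example

import (
    "io"
    "net/http"
    "strings"
)

// conflictInjector wraps another RoundTripper and short-circuits selected
// write requests with a synthetic 409 Conflict before they reach the server.
type conflictInjector struct {
    next       http.RoundTripper
    pathSuffix string // e.g. "/podschedulingcontexts/testpod"
}

func (c conflictInjector) RoundTrip(req *http.Request) (*http.Response, error) {
    if (req.Method == http.MethodPut || req.Method == http.MethodPost) &&
        strings.HasSuffix(req.URL.Path, c.pathSuffix) {
        return &http.Response{
            Status:     "409 Conflict",
            StatusCode: http.StatusConflict,
            Header:     http.Header{},
            Body:       io.NopCloser(strings.NewReader("")),
            Request:    req,
        }, nil
    }
    return c.next.RoundTrip(req)
}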


@@ -21,17 +21,21 @@ import (
"encoding/json"
"errors"
"fmt"
"net/http"
"sync/atomic"
"testing"
"time"
v1 "k8s.io/api/core/v1"
policy "k8s.io/api/policy/v1"
resourcev1alpha2 "k8s.io/api/resource/v1alpha2"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/uuid"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/apiserver/pkg/admission"
utilfeature "k8s.io/apiserver/pkg/util/feature"
cacheddiscovery "k8s.io/client-go/discovery/cached/memory"
"k8s.io/client-go/dynamic"
"k8s.io/client-go/dynamic/dynamicinformer"
@@ -45,6 +49,7 @@ import (
"k8s.io/client-go/scale"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/events"
cliflag "k8s.io/component-base/cli/flag"
pvutil "k8s.io/component-helpers/storage/volume"
"k8s.io/controller-manager/pkg/informerfactory"
"k8s.io/klog/v2"
@@ -56,6 +61,7 @@ import (
"k8s.io/kubernetes/pkg/controller/namespace"
"k8s.io/kubernetes/pkg/controller/resourceclaim"
"k8s.io/kubernetes/pkg/controlplane"
"k8s.io/kubernetes/pkg/features"
"k8s.io/kubernetes/pkg/scheduler"
kubeschedulerconfig "k8s.io/kubernetes/pkg/scheduler/apis/config"
configtesting "k8s.io/kubernetes/pkg/scheduler/apis/config/testing"
@@ -239,8 +245,15 @@ func CreateNamespaceController(ctx context.Context, tb testing.TB, restConfig re
}
}
// TestContext store necessary context info
// TestContext store necessary context info.
// It also contains some optional parameters for InitTestScheduler.
type TestContext struct {
// DisableEventSink, if set to true before calling InitTestScheduler,
// will skip the eventBroadcaster.StartRecordingToSink and thus
// some extra goroutines which are tricky to get rid of after
// a test.
DisableEventSink bool
NS *v1.Namespace
ClientSet clientset.Interface
KubeConfig *restclient.Config
@@ -257,8 +270,29 @@ type TestContext struct {
// SchedulerCloseFn will tear down the resources in creating scheduler,
// including the scheduler itself.
SchedulerCloseFn framework.TearDownFunc
// RoundTrip, if set, will be called for every HTTP request going to the apiserver.
// It can be used for error injection.
RoundTrip atomic.Pointer[RoundTripWrapper]
}
type RoundTripWrapper func(http.RoundTripper, *http.Request) (*http.Response, error)
type roundTripWrapper struct {
tc *TestContext
transport http.RoundTripper
}
func (r roundTripWrapper) RoundTrip(req *http.Request) (*http.Response, error) {
wrapper := r.tc.RoundTrip.Load()
if wrapper != nil {
return (*wrapper)(r.transport, req)
}
return r.transport.RoundTrip(req)
}
var _ http.RoundTripper = roundTripWrapper{}
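A hedged usage sketch of this hook from a test's point of view (the counting helper is invented; only RoundTripWrapper, TestContext, and the RoundTrip field come from this package): store a wrapper for the interesting phase of the test and clear it afterwards so later requests pass through unchanged.

package example

import (
    "net/http"
    "sync/atomic"

    testutils "k8s.io/kubernetes/test/integration/util"
)

// countPatches installs a pass-through wrapper that counts PATCH requests and
// returns a function that removes the wrapper and reports the count.
func countPatches(testCtx *testutils.TestContext) (stop func() int64) {
    var patches atomic.Int64
    wrapper := testutils.RoundTripWrapper(func(next http.RoundTripper, req *http.Request) (*http.Response, error) {
        if req.Method == http.MethodPatch {
            patches.Add(1) // observe without altering the request
        }
        return next.RoundTrip(req)
    })
    testCtx.RoundTrip.Store(&wrapper)
    return func() int64 {
        testCtx.RoundTrip.Store(nil) // revert to pass-through
        return patches.Load()
    }
}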
// CleanupNodes cleans all nodes which were created during integration test
func CleanupNodes(cs clientset.Interface, t *testing.T) {
err := cs.CoreV1().Nodes().DeleteCollection(context.TODO(), *metav1.NewDeleteOptions(0), metav1.ListOptions{})
@@ -468,11 +502,16 @@ func UpdateNodeStatus(cs clientset.Interface, node *v1.Node) error {
func InitTestAPIServer(t *testing.T, nsPrefix string, admission admission.Interface) *TestContext {
_, ctx := ktesting.NewTestContext(t)
ctx, cancel := context.WithCancel(ctx)
testCtx := TestContext{Ctx: ctx}
testCtx := &TestContext{Ctx: ctx}
testCtx.ClientSet, testCtx.KubeConfig, testCtx.CloseFn = framework.StartTestServer(ctx, t, framework.TestServerSetup{
ModifyServerRunOptions: func(options *options.ServerRunOptions) {
options.Admission.GenericAdmission.DisablePlugins = []string{"ServiceAccount", "TaintNodesByCondition", "Priority", "StorageObjectInUseProtection"}
if utilfeature.DefaultFeatureGate.Enabled(features.DynamicResourceAllocation) {
options.APIEnablement.RuntimeConfig = cliflag.ConfigurationMap{
resourcev1alpha2.SchemeGroupVersion.String(): "true",
}
}
},
ModifyServerConfig: func(config *controlplane.Config) {
if admission != nil {
@@ -481,6 +520,16 @@ func InitTestAPIServer(t *testing.T, nsPrefix string, admission admission.Interf
},
})
// Support wrapping HTTP requests.
testCtx.KubeConfig.Wrap(func(transport http.RoundTripper) http.RoundTripper {
return roundTripWrapper{tc: testCtx, transport: transport}
})
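// Re-create the client from the now-wrapped config: the ClientSet returned by
// StartTestServer above was built before Wrap was called and would bypass the wrapper.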
var err error
testCtx.ClientSet, err = clientset.NewForConfig(testCtx.KubeConfig)
if err != nil {
t.Fatal(err)
}
oldCloseFn := testCtx.CloseFn
testCtx.CloseFn = func() {
cancel()
@@ -494,10 +543,10 @@ func InitTestAPIServer(t *testing.T, nsPrefix string, admission admission.Interf
}
t.Cleanup(func() {
CleanupTest(t, &testCtx)
CleanupTest(t, testCtx)
})
return &testCtx
return testCtx
}
// WaitForSchedulerCacheCleanup waits for cleanup of scheduler's cache to complete
@@ -560,7 +609,9 @@ func InitTestSchedulerWithOptions(
t.Fatalf("Couldn't create scheduler: %v", err)
}
eventBroadcaster.StartRecordingToSink(ctx.Done())
if !testCtx.DisableEventSink {
eventBroadcaster.StartRecordingToSink(ctx.Done())
}
oldCloseFn := testCtx.CloseFn
testCtx.CloseFn = func() {