DRA scheduler: add special ActionType for ResourceClaim changes

Having a dedicated ActionType which only gets used when the scheduler itself
already detects some change in the list of generated ResourceClaims of a pod
avoids calling the DRA plugin for unrelated Pod changes.
This commit is contained in:
Patrick Ohly 2024-09-20 16:54:53 +02:00
parent d425353c13
commit aee77bfc84
9 changed files with 366 additions and 99 deletions

View File

@ -20,6 +20,7 @@ import (
v1 "k8s.io/api/core/v1" v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/equality" "k8s.io/apimachinery/pkg/api/equality"
utilfeature "k8s.io/apiserver/pkg/util/feature" utilfeature "k8s.io/apiserver/pkg/util/feature"
"k8s.io/dynamic-resource-allocation/resourceclaim"
"k8s.io/kubernetes/pkg/api/v1/resource" "k8s.io/kubernetes/pkg/api/v1/resource"
"k8s.io/kubernetes/pkg/features" "k8s.io/kubernetes/pkg/features"
) )
@ -65,6 +66,8 @@ var (
PodTolerationChange = ClusterEvent{Resource: Pod, ActionType: UpdatePodTolerations, Label: "PodTolerationChange"} PodTolerationChange = ClusterEvent{Resource: Pod, ActionType: UpdatePodTolerations, Label: "PodTolerationChange"}
// PodSchedulingGateEliminatedChange is the event when a pod's scheduling gate is changed. // PodSchedulingGateEliminatedChange is the event when a pod's scheduling gate is changed.
PodSchedulingGateEliminatedChange = ClusterEvent{Resource: Pod, ActionType: UpdatePodSchedulingGatesEliminated, Label: "PodSchedulingGateChange"} PodSchedulingGateEliminatedChange = ClusterEvent{Resource: Pod, ActionType: UpdatePodSchedulingGatesEliminated, Label: "PodSchedulingGateChange"}
// PodGeneratedResourceClaimChange is the event when a pod's list of generated ResourceClaims changes.
PodGeneratedResourceClaimChange = ClusterEvent{Resource: Pod, ActionType: UpdatePodGeneratedResourceClaim, Label: "PodGeneratedResourceClaimChange"}
// NodeSpecUnschedulableChange is the event when unschedulable node spec is changed. // NodeSpecUnschedulableChange is the event when unschedulable node spec is changed.
NodeSpecUnschedulableChange = ClusterEvent{Resource: Node, ActionType: UpdateNodeTaint, Label: "NodeSpecUnschedulableChange"} NodeSpecUnschedulableChange = ClusterEvent{Resource: Node, ActionType: UpdateNodeTaint, Label: "NodeSpecUnschedulableChange"}
// NodeAllocatableChange is the event when node allocatable is changed. // NodeAllocatableChange is the event when node allocatable is changed.
@ -152,6 +155,9 @@ func PodSchedulingPropertiesChange(newPod *v1.Pod, oldPod *v1.Pod) (events []Clu
extractPodSchedulingGateEliminatedChange, extractPodSchedulingGateEliminatedChange,
extractPodTolerationChange, extractPodTolerationChange,
} }
if utilfeature.DefaultFeatureGate.Enabled(features.DynamicResourceAllocation) {
podChangeExtracters = append(podChangeExtracters, extractPodGeneratedResourceClaimChange)
}
for _, fn := range podChangeExtracters { for _, fn := range podChangeExtracters {
if event := fn(newPod, oldPod); event != nil { if event := fn(newPod, oldPod); event != nil {
@ -222,6 +228,14 @@ func extractPodSchedulingGateEliminatedChange(newPod *v1.Pod, oldPod *v1.Pod) *C
return nil return nil
} }
// extractPodGeneratedResourceClaimChange returns the PodGeneratedResourceClaimChange
// event when the pod's list of generated ResourceClaims (tracked in
// Status.ResourceClaimStatuses) changed between oldPod and newPod, and nil otherwise.
func extractPodGeneratedResourceClaimChange(newPod *v1.Pod, oldPod *v1.Pod) *ClusterEvent {
	if resourceclaim.PodStatusEqual(newPod.Status.ResourceClaimStatuses, oldPod.Status.ResourceClaimStatuses) {
		// No semantic change in the generated claim statuses.
		return nil
	}
	return &PodGeneratedResourceClaimChange
}
// NodeSchedulingPropertiesChange interprets the update of a node and returns corresponding UpdateNodeXYZ event(s). // NodeSchedulingPropertiesChange interprets the update of a node and returns corresponding UpdateNodeXYZ event(s).
func NodeSchedulingPropertiesChange(newNode *v1.Node, oldNode *v1.Node) (events []ClusterEvent) { func NodeSchedulingPropertiesChange(newNode *v1.Node, oldNode *v1.Node) (events []ClusterEvent) {
nodeChangeExtracters := []nodeChangeExtractor{ nodeChangeExtracters := []nodeChangeExtractor{

View File

@ -24,7 +24,11 @@ import (
v1 "k8s.io/api/core/v1" v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource" "k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
utilfeature "k8s.io/apiserver/pkg/util/feature"
featuregatetesting "k8s.io/component-base/featuregate/testing"
"k8s.io/kubernetes/pkg/features"
st "k8s.io/kubernetes/pkg/scheduler/testing" st "k8s.io/kubernetes/pkg/scheduler/testing"
"k8s.io/utils/ptr"
) )
func TestNodeAllocatableChange(t *testing.T) { func TestNodeAllocatableChange(t *testing.T) {
@ -334,11 +338,20 @@ func Test_podSchedulingPropertiesChange(t *testing.T) {
}, },
}, },
} }
claimStatusA := v1.PodResourceClaimStatus{
Name: "my-claim",
ResourceClaimName: ptr.To("claim"),
}
claimStatusB := v1.PodResourceClaimStatus{
Name: "my-claim-2",
ResourceClaimName: ptr.To("claim-2"),
}
tests := []struct { tests := []struct {
name string name string
newPod *v1.Pod newPod *v1.Pod
oldPod *v1.Pod oldPod *v1.Pod
want []ClusterEvent draDisabled bool
want []ClusterEvent
}{ }{
{ {
name: "only label is updated", name: "only label is updated",
@ -388,9 +401,35 @@ func Test_podSchedulingPropertiesChange(t *testing.T) {
oldPod: st.MakePod().Toleration("key").Obj(), oldPod: st.MakePod().Toleration("key").Obj(),
want: []ClusterEvent{PodTolerationChange}, want: []ClusterEvent{PodTolerationChange},
}, },
{
name: "pod claim statuses change, feature disabled",
draDisabled: true,
newPod: st.MakePod().ResourceClaimStatuses(claimStatusA).Obj(),
oldPod: st.MakePod().Obj(),
want: []ClusterEvent{assignedPodOtherUpdate},
},
{
name: "pod claim statuses change, feature enabled",
newPod: st.MakePod().ResourceClaimStatuses(claimStatusA).Obj(),
oldPod: st.MakePod().Obj(),
want: []ClusterEvent{PodGeneratedResourceClaimChange},
},
{
name: "pod claim statuses swapped",
newPod: st.MakePod().ResourceClaimStatuses(claimStatusA, claimStatusB).Obj(),
oldPod: st.MakePod().ResourceClaimStatuses(claimStatusB, claimStatusA).Obj(),
want: []ClusterEvent{PodGeneratedResourceClaimChange},
},
{
name: "pod claim statuses extended",
newPod: st.MakePod().ResourceClaimStatuses(claimStatusA, claimStatusB).Obj(),
oldPod: st.MakePod().ResourceClaimStatuses(claimStatusA).Obj(),
want: []ClusterEvent{PodGeneratedResourceClaimChange},
},
} }
for _, tt := range tests { for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) { t.Run(tt.name, func(t *testing.T) {
featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.DynamicResourceAllocation, !tt.draDisabled)
got := PodSchedulingPropertiesChange(tt.newPod, tt.oldPod) got := PodSchedulingPropertiesChange(tt.newPod, tt.oldPod)
if diff := cmp.Diff(tt.want, got); diff != "" { if diff := cmp.Diff(tt.want, got); diff != "" {
t.Errorf("unexpected event is returned from podSchedulingPropertiesChange (-want, +got):\n%s", diff) t.Errorf("unexpected event is returned from podSchedulingPropertiesChange (-want, +got):\n%s", diff)

View File

@ -400,7 +400,7 @@ func (pl *dynamicResources) EventsToRegister(_ context.Context) ([]framework.Clu
// Allocation is tracked in ResourceClaims, so any changes may make the pods schedulable. // Allocation is tracked in ResourceClaims, so any changes may make the pods schedulable.
{Event: framework.ClusterEvent{Resource: framework.ResourceClaim, ActionType: framework.Add | framework.Update}, QueueingHintFn: pl.isSchedulableAfterClaimChange}, {Event: framework.ClusterEvent{Resource: framework.ResourceClaim, ActionType: framework.Add | framework.Update}, QueueingHintFn: pl.isSchedulableAfterClaimChange},
// Adding the ResourceClaim name to the pod status makes pods waiting for their ResourceClaim schedulable. // Adding the ResourceClaim name to the pod status makes pods waiting for their ResourceClaim schedulable.
{Event: framework.ClusterEvent{Resource: framework.Pod, ActionType: framework.Update}, QueueingHintFn: pl.isSchedulableAfterPodChange}, {Event: framework.ClusterEvent{Resource: framework.Pod, ActionType: framework.UpdatePodGeneratedResourceClaim}, QueueingHintFn: pl.isSchedulableAfterPodChange},
// A pod might be waiting for a class to get created or modified. // A pod might be waiting for a class to get created or modified.
{Event: framework.ClusterEvent{Resource: framework.DeviceClass, ActionType: framework.Add | framework.Update}}, {Event: framework.ClusterEvent{Resource: framework.DeviceClass, ActionType: framework.Add | framework.Update}},
// Adding or updating a ResourceSlice might make a pod schedulable because new resources became available. // Adding or updating a ResourceSlice might make a pod schedulable because new resources became available.

View File

@ -95,6 +95,11 @@ var (
PodResourceClaims(v1.PodResourceClaim{Name: resourceName, ResourceClaimName: &claimName}). PodResourceClaims(v1.PodResourceClaim{Name: resourceName, ResourceClaimName: &claimName}).
PodResourceClaims(v1.PodResourceClaim{Name: resourceName2, ResourceClaimName: &claimName2}). PodResourceClaims(v1.PodResourceClaim{Name: resourceName2, ResourceClaimName: &claimName2}).
Obj() Obj()
podWithTwoClaimTemplates = st.MakePod().Name(podName).Namespace(namespace).
UID(podUID).
PodResourceClaims(v1.PodResourceClaim{Name: resourceName, ResourceClaimTemplateName: &claimName}).
PodResourceClaims(v1.PodResourceClaim{Name: resourceName2, ResourceClaimTemplateName: &claimName}).
Obj()
// Node with "instance-1" device and no device attributes. // Node with "instance-1" device and no device attributes.
workerNode = &st.MakeNode().Name(nodeName).Label("kubernetes.io/hostname", nodeName).Node workerNode = &st.MakeNode().Name(nodeName).Label("kubernetes.io/hostname", nodeName).Node
@ -1356,19 +1361,19 @@ func Test_isSchedulableAfterClaimChange(t *testing.T) {
pod *v1.Pod pod *v1.Pod
claims []*resourceapi.ResourceClaim claims []*resourceapi.ResourceClaim
oldObj, newObj interface{} oldObj, newObj interface{}
expectedHint framework.QueueingHint wantHint framework.QueueingHint
expectedErr bool wantErr bool
}{ }{
"skip-deletes": { "skip-deletes": {
pod: podWithClaimTemplate, pod: podWithClaimTemplate,
oldObj: allocatedClaim, oldObj: allocatedClaim,
newObj: nil, newObj: nil,
expectedHint: framework.QueueSkip, wantHint: framework.QueueSkip,
}, },
"backoff-wrong-new-object": { "backoff-wrong-new-object": {
pod: podWithClaimTemplate, pod: podWithClaimTemplate,
newObj: "not-a-claim", newObj: "not-a-claim",
expectedErr: true, wantErr: true,
}, },
"skip-wrong-claim": { "skip-wrong-claim": {
pod: podWithClaimTemplate, pod: podWithClaimTemplate,
@ -1377,7 +1382,7 @@ func Test_isSchedulableAfterClaimChange(t *testing.T) {
claim.OwnerReferences[0].UID += "123" claim.OwnerReferences[0].UID += "123"
return claim return claim
}(), }(),
expectedHint: framework.QueueSkip, wantHint: framework.QueueSkip,
}, },
"skip-unrelated-claim": { "skip-unrelated-claim": {
pod: podWithClaimTemplate, pod: podWithClaimTemplate,
@ -1388,19 +1393,19 @@ func Test_isSchedulableAfterClaimChange(t *testing.T) {
claim.UID += "123" claim.UID += "123"
return claim return claim
}(), }(),
expectedHint: framework.QueueSkip, wantHint: framework.QueueSkip,
}, },
"queue-on-add": { "queue-on-add": {
pod: podWithClaimName, pod: podWithClaimName,
newObj: pendingClaim, newObj: pendingClaim,
expectedHint: framework.Queue, wantHint: framework.Queue,
}, },
"backoff-wrong-old-object": { "backoff-wrong-old-object": {
pod: podWithClaimName, pod: podWithClaimName,
claims: []*resourceapi.ResourceClaim{pendingClaim}, claims: []*resourceapi.ResourceClaim{pendingClaim},
oldObj: "not-a-claim", oldObj: "not-a-claim",
newObj: pendingClaim, newObj: pendingClaim,
expectedErr: true, wantErr: true,
}, },
"skip-adding-finalizer": { "skip-adding-finalizer": {
pod: podWithClaimName, pod: podWithClaimName,
@ -1411,7 +1416,7 @@ func Test_isSchedulableAfterClaimChange(t *testing.T) {
claim.Finalizers = append(claim.Finalizers, "foo") claim.Finalizers = append(claim.Finalizers, "foo")
return claim return claim
}(), }(),
expectedHint: framework.QueueSkip, wantHint: framework.QueueSkip,
}, },
"queue-on-status-change": { "queue-on-status-change": {
pod: podWithClaimName, pod: podWithClaimName,
@ -1422,7 +1427,7 @@ func Test_isSchedulableAfterClaimChange(t *testing.T) {
claim.Status.Allocation = &resourceapi.AllocationResult{} claim.Status.Allocation = &resourceapi.AllocationResult{}
return claim return claim
}(), }(),
expectedHint: framework.Queue, wantHint: framework.Queue,
}, },
"structured-claim-deallocate": { "structured-claim-deallocate": {
pod: podWithClaimName, pod: podWithClaimName,
@ -1435,7 +1440,7 @@ func Test_isSchedulableAfterClaimChange(t *testing.T) {
}(), }(),
// TODO (https://github.com/kubernetes/kubernetes/issues/123697): don't wake up // TODO (https://github.com/kubernetes/kubernetes/issues/123697): don't wake up
// claims not using structured parameters. // claims not using structured parameters.
expectedHint: framework.Queue, wantHint: framework.Queue,
}, },
} }
@ -1456,18 +1461,24 @@ func Test_isSchedulableAfterClaimChange(t *testing.T) {
createClaim := claim.DeepCopy() createClaim := claim.DeepCopy()
createClaim.UID = "" createClaim.UID = ""
storedClaim, err := testCtx.client.ResourceV1alpha3().ResourceClaims(createClaim.Namespace).Create(tCtx, createClaim, metav1.CreateOptions{}) storedClaim, err := testCtx.client.ResourceV1alpha3().ResourceClaims(createClaim.Namespace).Create(tCtx, createClaim, metav1.CreateOptions{})
require.NoError(t, err, "create claim") if err != nil {
t.Fatalf("create claim: expected no error, got: %v", err)
}
claim = storedClaim claim = storedClaim
} else { } else {
cachedClaim, err := testCtx.claimAssumeCache.Get(claimKey) cachedClaim, err := testCtx.claimAssumeCache.Get(claimKey)
require.NoError(t, err, "retrieve old claim") if err != nil {
t.Fatalf("retrieve old claim: expected no error, got: %v", err)
}
updateClaim := claim.DeepCopy() updateClaim := claim.DeepCopy()
// The test claim doesn't have those (generated dynamically), so copy them. // The test claim doesn't have those (generated dynamically), so copy them.
updateClaim.UID = cachedClaim.(*resourceapi.ResourceClaim).UID updateClaim.UID = cachedClaim.(*resourceapi.ResourceClaim).UID
updateClaim.ResourceVersion = cachedClaim.(*resourceapi.ResourceClaim).ResourceVersion updateClaim.ResourceVersion = cachedClaim.(*resourceapi.ResourceClaim).ResourceVersion
storedClaim, err := testCtx.client.ResourceV1alpha3().ResourceClaims(updateClaim.Namespace).Update(tCtx, updateClaim, metav1.UpdateOptions{}) storedClaim, err := testCtx.client.ResourceV1alpha3().ResourceClaims(updateClaim.Namespace).Update(tCtx, updateClaim, metav1.UpdateOptions{})
require.NoError(t, err, "update claim") if err != nil {
t.Fatalf("update claim: expected no error, got: %v", err)
}
claim = storedClaim claim = storedClaim
} }
@ -1485,14 +1496,98 @@ func Test_isSchedulableAfterClaimChange(t *testing.T) {
// isSchedulableAfterClaimChange. // isSchedulableAfterClaimChange.
newObj = claim newObj = claim
} }
actualHint, err := testCtx.p.isSchedulableAfterClaimChange(logger, tc.pod, oldObj, newObj) gotHint, err := testCtx.p.isSchedulableAfterClaimChange(logger, tc.pod, oldObj, newObj)
if tc.expectedErr { if tc.wantErr {
require.Error(t, err) if err == nil {
t.Fatal("expected an error, got none")
}
return return
} }
require.NoError(t, err) if err != nil {
require.Equal(t, tc.expectedHint, actualHint) t.Fatalf("expected no error, got: %v", err)
}
if tc.wantHint != gotHint {
t.Fatalf("expected hint %#v, got %#v", tc.wantHint, gotHint)
}
})
}
}
// Test_isSchedulableAfterPodChange verifies the queueing hint that
// isSchedulableAfterPodChange returns for updates of a pod, in particular
// changes to the generated ResourceClaims recorded in the pod status.
func Test_isSchedulableAfterPodChange(t *testing.T) {
	testcases := map[string]struct {
		objs     []apiruntime.Object
		pod      *v1.Pod
		claims   []*resourceapi.ResourceClaim
		obj      interface{}
		wantHint framework.QueueingHint
		wantErr  bool
	}{
		// A non-pod object must be rejected with an error, which triggers backoff.
		"backoff-wrong-new-object": {
			pod:     podWithClaimTemplate,
			obj:     "not-a-claim",
			wantErr: true,
		},
		// All claims of the pod are created and known -> the pod becomes schedulable.
		"complete": {
			objs:     []apiruntime.Object{pendingClaim},
			pod:      podWithClaimTemplate,
			obj:      podWithClaimTemplateInStatus,
			wantHint: framework.Queue,
		},
		// The updated pod is not the pod being scheduled -> skip.
		"wrong-pod": {
			objs: []apiruntime.Object{pendingClaim},
			pod: func() *v1.Pod {
				pod := podWithClaimTemplate.DeepCopy()
				pod.Name += "2"
				pod.UID += "2" // This is the relevant difference.
				return pod
			}(),
			obj:      podWithClaimTemplateInStatus,
			wantHint: framework.QueueSkip,
		},
		// The claim referenced by the status does not exist -> skip.
		"missing-claim": {
			objs:     nil,
			pod:      podWithClaimTemplate,
			obj:      podWithClaimTemplateInStatus,
			wantHint: framework.QueueSkip,
		},
		// Not all generated claims exist yet -> skip until the rest get created.
		"incomplete": {
			objs: []apiruntime.Object{pendingClaim},
			pod:  podWithTwoClaimTemplates,
			obj: func() *v1.Pod {
				pod := podWithTwoClaimTemplates.DeepCopy()
				// Only one of two claims created.
				pod.Status.ResourceClaimStatuses = []v1.PodResourceClaimStatus{{
					Name:              pod.Spec.ResourceClaims[0].Name,
					ResourceClaimName: &claimName,
				}}
				return pod
			}(),
			wantHint: framework.QueueSkip,
		},
	}

	for name, tc := range testcases {
		t.Run(name, func(t *testing.T) {
			logger, _ := ktesting.NewTestContext(t)
			features := feature.Features{
				EnableDynamicResourceAllocation: true,
			}
			testCtx := setup(t, nil, tc.claims, nil, nil, tc.objs, features)
			// oldObj is intentionally nil: the hint only looks at the new pod.
			gotHint, err := testCtx.p.isSchedulableAfterPodChange(logger, tc.pod, nil, tc.obj)
			if tc.wantErr {
				if err == nil {
					t.Fatal("expected an error, got none")
				}
				return
			}
			if err != nil {
				t.Fatalf("expected no error, got: %v", err)
			}
			if tc.wantHint != gotHint {
				t.Fatalf("expected hint %#v, got %#v", tc.wantHint, gotHint)
			}
		})
	}
}
@ -1503,34 +1598,34 @@ func Test_isSchedulableAfterPodSchedulingContextChange(t *testing.T) {
schedulings []*resourceapi.PodSchedulingContext schedulings []*resourceapi.PodSchedulingContext
claims []*resourceapi.ResourceClaim claims []*resourceapi.ResourceClaim
oldObj, newObj interface{} oldObj, newObj interface{}
expectedHint framework.QueueingHint wantHint framework.QueueingHint
expectedErr bool wantErr bool
}{ }{
"skip-deleted": { "skip-deleted": {
pod: podWithClaimTemplate, pod: podWithClaimTemplate,
oldObj: scheduling, oldObj: scheduling,
expectedHint: framework.QueueSkip, wantHint: framework.QueueSkip,
}, },
"skip-missed-deleted": { "skip-missed-deleted": {
pod: podWithClaimTemplate, pod: podWithClaimTemplate,
oldObj: cache.DeletedFinalStateUnknown{ oldObj: cache.DeletedFinalStateUnknown{
Obj: scheduling, Obj: scheduling,
}, },
expectedHint: framework.QueueSkip, wantHint: framework.QueueSkip,
}, },
"backoff-wrong-old-object": { "backoff-wrong-old-object": {
pod: podWithClaimTemplate, pod: podWithClaimTemplate,
oldObj: "not-a-scheduling-context", oldObj: "not-a-scheduling-context",
newObj: scheduling, newObj: scheduling,
expectedErr: true, wantErr: true,
}, },
"backoff-missed-wrong-old-object": { "backoff-missed-wrong-old-object": {
pod: podWithClaimTemplate, pod: podWithClaimTemplate,
oldObj: cache.DeletedFinalStateUnknown{ oldObj: cache.DeletedFinalStateUnknown{
Obj: "not-a-scheduling-context", Obj: "not-a-scheduling-context",
}, },
newObj: scheduling, newObj: scheduling,
expectedErr: true, wantErr: true,
}, },
"skip-unrelated-object": { "skip-unrelated-object": {
pod: podWithClaimTemplate, pod: podWithClaimTemplate,
@ -1540,33 +1635,33 @@ func Test_isSchedulableAfterPodSchedulingContextChange(t *testing.T) {
scheduling.Name += "-foo" scheduling.Name += "-foo"
return scheduling return scheduling
}(), }(),
expectedHint: framework.QueueSkip, wantHint: framework.QueueSkip,
}, },
"backoff-wrong-new-object": { "backoff-wrong-new-object": {
pod: podWithClaimTemplate, pod: podWithClaimTemplate,
oldObj: scheduling, oldObj: scheduling,
newObj: "not-a-scheduling-context", newObj: "not-a-scheduling-context",
expectedErr: true, wantErr: true,
}, },
"skip-missing-claim": { "skip-missing-claim": {
pod: podWithClaimTemplate, pod: podWithClaimTemplate,
oldObj: scheduling, oldObj: scheduling,
newObj: schedulingInfo, newObj: schedulingInfo,
expectedHint: framework.QueueSkip, wantHint: framework.QueueSkip,
}, },
"skip-missing-infos": { "skip-missing-infos": {
pod: podWithClaimTemplateInStatus, pod: podWithClaimTemplateInStatus,
claims: []*resourceapi.ResourceClaim{pendingClaim}, claims: []*resourceapi.ResourceClaim{pendingClaim},
oldObj: scheduling, oldObj: scheduling,
newObj: scheduling, newObj: scheduling,
expectedHint: framework.QueueSkip, wantHint: framework.QueueSkip,
}, },
"queue-new-infos": { "queue-new-infos": {
pod: podWithClaimTemplateInStatus, pod: podWithClaimTemplateInStatus,
claims: []*resourceapi.ResourceClaim{pendingClaim}, claims: []*resourceapi.ResourceClaim{pendingClaim},
oldObj: scheduling, oldObj: scheduling,
newObj: schedulingInfo, newObj: schedulingInfo,
expectedHint: framework.Queue, wantHint: framework.Queue,
}, },
"queue-bad-selected-node": { "queue-bad-selected-node": {
pod: podWithClaimTemplateInStatus, pod: podWithClaimTemplateInStatus,
@ -1582,7 +1677,7 @@ func Test_isSchedulableAfterPodSchedulingContextChange(t *testing.T) {
scheduling.Status.ResourceClaims[0].UnsuitableNodes = append(scheduling.Status.ResourceClaims[0].UnsuitableNodes, scheduling.Spec.SelectedNode) scheduling.Status.ResourceClaims[0].UnsuitableNodes = append(scheduling.Status.ResourceClaims[0].UnsuitableNodes, scheduling.Spec.SelectedNode)
return scheduling return scheduling
}(), }(),
expectedHint: framework.Queue, wantHint: framework.Queue,
}, },
"skip-spec-changes": { "skip-spec-changes": {
pod: podWithClaimTemplateInStatus, pod: podWithClaimTemplateInStatus,
@ -1593,7 +1688,7 @@ func Test_isSchedulableAfterPodSchedulingContextChange(t *testing.T) {
scheduling.Spec.SelectedNode = workerNode.Name scheduling.Spec.SelectedNode = workerNode.Name
return scheduling return scheduling
}(), }(),
expectedHint: framework.QueueSkip, wantHint: framework.QueueSkip,
}, },
"backoff-other-changes": { "backoff-other-changes": {
pod: podWithClaimTemplateInStatus, pod: podWithClaimTemplateInStatus,
@ -1604,7 +1699,7 @@ func Test_isSchedulableAfterPodSchedulingContextChange(t *testing.T) {
scheduling.Finalizers = append(scheduling.Finalizers, "foo") scheduling.Finalizers = append(scheduling.Finalizers, "foo")
return scheduling return scheduling
}(), }(),
expectedHint: framework.Queue, wantHint: framework.Queue,
}, },
} }
@ -1618,14 +1713,20 @@ func Test_isSchedulableAfterPodSchedulingContextChange(t *testing.T) {
EnableDRAControlPlaneController: true, EnableDRAControlPlaneController: true,
} }
testCtx := setup(t, nil, tc.claims, nil, tc.schedulings, nil, features) testCtx := setup(t, nil, tc.claims, nil, tc.schedulings, nil, features)
actualHint, err := testCtx.p.isSchedulableAfterPodSchedulingContextChange(logger, tc.pod, tc.oldObj, tc.newObj) gotHint, err := testCtx.p.isSchedulableAfterPodSchedulingContextChange(logger, tc.pod, tc.oldObj, tc.newObj)
if tc.expectedErr { if tc.wantErr {
require.Error(t, err) if err == nil {
t.Fatal("expected an error, got none")
}
return return
} }
require.NoError(t, err) if err != nil {
require.Equal(t, tc.expectedHint, actualHint) t.Fatalf("expected no error, got: %v", err)
}
if tc.wantHint != gotHint {
t.Fatalf("expected hint %#v, got %#v", tc.wantHint, gotHint)
}
}) })
} }
} }

View File

@ -73,6 +73,9 @@ const (
UpdatePodTolerations UpdatePodTolerations
// UpdatePodSchedulingGatesEliminated is an update for pod's scheduling gates, which eliminates all scheduling gates in the Pod. // UpdatePodSchedulingGatesEliminated is an update for pod's scheduling gates, which eliminates all scheduling gates in the Pod.
UpdatePodSchedulingGatesEliminated UpdatePodSchedulingGatesEliminated
// UpdatePodGeneratedResourceClaim is an update of the list of ResourceClaims generated for the pod.
// Depends on the DynamicResourceAllocation feature gate.
UpdatePodGeneratedResourceClaim
// updatePodOther is a update for pod's other fields. // updatePodOther is a update for pod's other fields.
// It's used only for the internal event handling, and thus unexported. // It's used only for the internal event handling, and thus unexported.
@ -81,7 +84,7 @@ const (
All ActionType = 1<<iota - 1 All ActionType = 1<<iota - 1
// Use the general Update type if you don't either know or care the specific sub-Update type to use. // Use the general Update type if you don't either know or care the specific sub-Update type to use.
Update = UpdateNodeAllocatable | UpdateNodeLabel | UpdateNodeTaint | UpdateNodeCondition | UpdateNodeAnnotation | UpdatePodLabel | UpdatePodScaleDown | UpdatePodTolerations | UpdatePodSchedulingGatesEliminated | updatePodOther Update = UpdateNodeAllocatable | UpdateNodeLabel | UpdateNodeTaint | UpdateNodeCondition | UpdateNodeAnnotation | UpdatePodLabel | UpdatePodScaleDown | UpdatePodTolerations | UpdatePodSchedulingGatesEliminated | UpdatePodGeneratedResourceClaim | updatePodOther
) )
// GVK is short for group/version/kind, which can uniquely represent a particular API resource. // GVK is short for group/version/kind, which can uniquely represent a particular API resource.

View File

@ -292,6 +292,12 @@ func (p *PodWrapper) PodResourceClaims(podResourceClaims ...v1.PodResourceClaim)
return p return p
} }
// ResourceClaimStatuses appends the given claim statuses to the PodStatus of the inner pod.
func (p *PodWrapper) ResourceClaimStatuses(resourceClaimStatuses ...v1.PodResourceClaimStatus) *PodWrapper {
	p.Status.ResourceClaimStatuses = append(p.Status.ResourceClaimStatuses, resourceClaimStatuses...)
	return p
}
// Priority sets a priority value into PodSpec of the inner pod. // Priority sets a priority value into PodSpec of the inner pod.
func (p *PodWrapper) Priority(val int32) *PodWrapper { func (p *PodWrapper) Priority(val int32) *PodWrapper {
p.Spec.Priority = &val p.Spec.Priority = &val

View File

@ -29,6 +29,7 @@ import (
resourceinformers "k8s.io/client-go/informers/resource/v1alpha3" resourceinformers "k8s.io/client-go/informers/resource/v1alpha3"
storageinformers "k8s.io/client-go/informers/storage/v1" storageinformers "k8s.io/client-go/informers/storage/v1"
"k8s.io/client-go/tools/cache" "k8s.io/client-go/tools/cache"
"k8s.io/dynamic-resource-allocation/resourceclaim"
) )
type graphPopulator struct { type graphPopulator struct {
@ -94,7 +95,7 @@ func (g *graphPopulator) updatePod(oldObj, obj interface{}) {
} }
if oldPod, ok := oldObj.(*corev1.Pod); ok && oldPod != nil { if oldPod, ok := oldObj.(*corev1.Pod); ok && oldPod != nil {
if (pod.Spec.NodeName == oldPod.Spec.NodeName) && (pod.UID == oldPod.UID) && if (pod.Spec.NodeName == oldPod.Spec.NodeName) && (pod.UID == oldPod.UID) &&
resourceClaimStatusesEqual(oldPod.Status.ResourceClaimStatuses, pod.Status.ResourceClaimStatuses) { resourceclaim.PodStatusEqual(oldPod.Status.ResourceClaimStatuses, pod.Status.ResourceClaimStatuses) {
// Node and uid are unchanged, all object references in the pod spec are immutable respectively unmodified (claim statuses). // Node and uid are unchanged, all object references in the pod spec are immutable respectively unmodified (claim statuses).
klog.V(5).Infof("updatePod %s/%s, node unchanged", pod.Namespace, pod.Name) klog.V(5).Infof("updatePod %s/%s, node unchanged", pod.Namespace, pod.Name)
return return
@ -107,29 +108,6 @@ func (g *graphPopulator) updatePod(oldObj, obj interface{}) {
klog.V(5).Infof("updatePod %s/%s for node %s completed in %v", pod.Namespace, pod.Name, pod.Spec.NodeName, time.Since(startTime)) klog.V(5).Infof("updatePod %s/%s for node %s completed in %v", pod.Namespace, pod.Name, pod.Spec.NodeName, time.Since(startTime))
} }
func resourceClaimStatusesEqual(statusA, statusB []corev1.PodResourceClaimStatus) bool {
if len(statusA) != len(statusB) {
return false
}
// In most cases, status entries only get added once and not modified.
// But this cannot be guaranteed, so for the sake of correctness in all
// cases this code here has to check.
for i := range statusA {
if statusA[i].Name != statusB[i].Name {
return false
}
claimNameA := statusA[i].ResourceClaimName
claimNameB := statusB[i].ResourceClaimName
if (claimNameA == nil) != (claimNameB == nil) {
return false
}
if claimNameA != nil && *claimNameA != *claimNameB {
return false
}
}
return true
}
func (g *graphPopulator) deletePod(obj interface{}) { func (g *graphPopulator) deletePod(obj interface{}) {
if tombstone, ok := obj.(cache.DeletedFinalStateUnknown); ok { if tombstone, ok := obj.(cache.DeletedFinalStateUnknown); ok {
obj = tombstone.Obj obj = tombstone.Obj

View File

@ -0,0 +1,48 @@
/*
Copyright 2024 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package resourceclaim
import (
corev1 "k8s.io/api/core/v1"
"k8s.io/utils/ptr"
)
// PodStatusEqual checks that both slices have the same number
// of entries and that the pairs of entries are semantically
// equivalent.
//
// The order of the entries matters: two slices with semantically
// equivalent entries in different order are not equal. This is
// done for the sake of performance because typically the
// order of entries doesn't change.
func PodStatusEqual(statusA, statusB []corev1.PodResourceClaimStatus) bool {
	if len(statusA) != len(statusB) {
		return false
	}
	// Entries usually get appended once and are never modified afterwards,
	// but that cannot be guaranteed, so every pair has to be compared.
	for i, entryA := range statusA {
		entryB := statusB[i]
		if entryA.Name != entryB.Name ||
			!ptr.Equal(entryA.ResourceClaimName, entryB.ResourceClaimName) {
			return false
		}
	}
	return true
}

View File

@ -0,0 +1,78 @@
/*
Copyright 2024 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package resourceclaim
import (
"fmt"
"testing"
"github.com/stretchr/testify/assert"
corev1 "k8s.io/api/core/v1"
"k8s.io/utils/ptr"
)
// TestPodStatusEqual covers the order-sensitive, semantic comparison of
// PodResourceClaimStatus slices.
func TestPodStatusEqual(t *testing.T) {
	statusX := corev1.PodResourceClaimStatus{
		Name:              "a",
		ResourceClaimName: ptr.To("x"),
	}
	statusY := corev1.PodResourceClaimStatus{
		Name:              "b",
		ResourceClaimName: ptr.To("y"),
	}
	statusEmpty := corev1.PodResourceClaimStatus{}

	testcases := map[string]struct {
		a, b      []corev1.PodResourceClaimStatus
		wantEqual bool
	}{
		"identical": {
			a:         []corev1.PodResourceClaimStatus{statusX, statusY, statusEmpty},
			b:         []corev1.PodResourceClaimStatus{statusX, statusY, statusEmpty},
			wantEqual: true,
		},
		"different-order": {
			a:         []corev1.PodResourceClaimStatus{statusX, statusY, statusEmpty},
			b:         []corev1.PodResourceClaimStatus{statusEmpty, statusX, statusY},
			wantEqual: false,
		},
		"different-length": {
			a:         []corev1.PodResourceClaimStatus{statusX, statusY, statusEmpty},
			b:         []corev1.PodResourceClaimStatus{statusX, statusY},
			wantEqual: false,
		},
		"semantically-equal-empty": {
			a:         []corev1.PodResourceClaimStatus{},
			b:         nil,
			wantEqual: true,
		},
		"semantically-equivalent": {
			a:         []corev1.PodResourceClaimStatus{{Name: "a", ResourceClaimName: ptr.To("x")}},
			b:         []corev1.PodResourceClaimStatus{{Name: "a", ResourceClaimName: ptr.To("x")}},
			wantEqual: true,
		},
	}

	for name, tc := range testcases {
		t.Run(name, func(t *testing.T) {
			// Each slice must compare equal to itself (reflexivity).
			assert.True(t, PodStatusEqual(tc.a, tc.a), fmt.Sprintf("%v", tc.a))
			assert.True(t, PodStatusEqual(tc.b, tc.b), fmt.Sprintf("%v", tc.b))
			assert.Equal(t, tc.wantEqual, PodStatusEqual(tc.a, tc.b), fmt.Sprintf("%v and %v", tc.a, tc.b))
		})
	}
}