Mirror of https://github.com/k3s-io/kubernetes.git

Merge pull request #127497 from pohly/dra-scheduler-queueing-hints-fix

DRA scheduler: fix queuing hint support

Commit: 67cdc26214
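In short, this change gives the scheduler a dedicated cluster event, PodGeneratedResourceClaimChange (action type UpdatePodGeneratedResourceClaim), that fires when a pod's status.resourceClaimStatuses list changes, and wires it to a new DRA plugin queueing hint, isSchedulableAfterPodChange, so that a pod waiting for its generated ResourceClaims is retried as soon as the claim names appear in its status. The status-comparison helper moves out of the node authorizer into the shared k8s.io/dynamic-resource-allocation/resourceclaim package as PodStatusEqual.

The queueing-hint pattern the diff relies on can be illustrated with a minimal, self-contained sketch; all types and names below are simplified stand-ins, not the scheduler framework's own:

```go
package main

import "fmt"

// QueueingHint is a simplified stand-in for the scheduler framework's type.
type QueueingHint int

const (
	// QueueSkip: the observed change cannot make the pod schedulable.
	QueueSkip QueueingHint = iota
	// Queue: the change may help, retry scheduling the pod.
	Queue
)

// claimStatus stands in for v1.PodResourceClaimStatus: the name of the
// generated ResourceClaim stays nil until the claim has been created.
type claimStatus struct {
	Name              string
	ResourceClaimName *string
}

// hintForPodStatusChange mimics the intent of isSchedulableAfterPodChange:
// only re-queue the pod when the update added claim information.
func hintForPodStatusChange(oldStatuses, newStatuses []claimStatus) QueueingHint {
	if len(newStatuses) == len(oldStatuses) {
		return QueueSkip
	}
	return Queue
}

func main() {
	claim := "my-pod-my-claim"
	old := []claimStatus{}
	updated := []claimStatus{{Name: "my-claim", ResourceClaimName: &claim}}
	fmt.Println(hintForPodStatusChange(old, updated) == Queue) // true
}
```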
@ -20,6 +20,7 @@ import (
 v1 "k8s.io/api/core/v1"
 "k8s.io/apimachinery/pkg/api/equality"
 utilfeature "k8s.io/apiserver/pkg/util/feature"
+"k8s.io/dynamic-resource-allocation/resourceclaim"
 "k8s.io/kubernetes/pkg/api/v1/resource"
 "k8s.io/kubernetes/pkg/features"
 )
@ -65,6 +66,8 @@ var (
 PodTolerationChange = ClusterEvent{Resource: Pod, ActionType: UpdatePodTolerations, Label: "PodTolerationChange"}
 // PodSchedulingGateEliminatedChange is the event when a pod's scheduling gate is changed.
 PodSchedulingGateEliminatedChange = ClusterEvent{Resource: Pod, ActionType: UpdatePodSchedulingGatesEliminated, Label: "PodSchedulingGateChange"}
+// PodGeneratedResourceClaimChange is the event when a pod's list of generated ResourceClaims changes.
+PodGeneratedResourceClaimChange = ClusterEvent{Resource: Pod, ActionType: UpdatePodGeneratedResourceClaim, Label: "PodGeneratedResourceClaimChange"}
 // NodeSpecUnschedulableChange is the event when unschedulable node spec is changed.
 NodeSpecUnschedulableChange = ClusterEvent{Resource: Node, ActionType: UpdateNodeTaint, Label: "NodeSpecUnschedulableChange"}
 // NodeAllocatableChange is the event when node allocatable is changed.
@ -152,6 +155,9 @@ func PodSchedulingPropertiesChange(newPod *v1.Pod, oldPod *v1.Pod) (events []Clu
 extractPodSchedulingGateEliminatedChange,
 extractPodTolerationChange,
 }
+if utilfeature.DefaultFeatureGate.Enabled(features.DynamicResourceAllocation) {
+podChangeExtracters = append(podChangeExtracters, extractPodGeneratedResourceClaimChange)
+}

 for _, fn := range podChangeExtracters {
 if event := fn(newPod, oldPod); event != nil {
@ -222,6 +228,14 @@ func extractPodSchedulingGateEliminatedChange(newPod *v1.Pod, oldPod *v1.Pod) *C
 return nil
 }

+func extractPodGeneratedResourceClaimChange(newPod *v1.Pod, oldPod *v1.Pod) *ClusterEvent {
+if !resourceclaim.PodStatusEqual(newPod.Status.ResourceClaimStatuses, oldPod.Status.ResourceClaimStatuses) {
+return &PodGeneratedResourceClaimChange
+}
+
+return nil
+}
+
 // NodeSchedulingPropertiesChange interprets the update of a node and returns corresponding UpdateNodeXYZ event(s).
 func NodeSchedulingPropertiesChange(newNode *v1.Node, oldNode *v1.Node) (events []ClusterEvent) {
 nodeChangeExtracters := []nodeChangeExtractor{
@ -25,7 +25,11 @@ import (
 v1 "k8s.io/api/core/v1"
 "k8s.io/apimachinery/pkg/api/resource"
 metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+utilfeature "k8s.io/apiserver/pkg/util/feature"
+featuregatetesting "k8s.io/component-base/featuregate/testing"
+"k8s.io/kubernetes/pkg/features"
 st "k8s.io/kubernetes/pkg/scheduler/testing"
+"k8s.io/utils/ptr"
 )

 func TestNodeAllocatableChange(t *testing.T) {
@ -335,11 +339,20 @@ func Test_podSchedulingPropertiesChange(t *testing.T) {
 },
 },
 }
+claimStatusA := v1.PodResourceClaimStatus{
+Name: "my-claim",
+ResourceClaimName: ptr.To("claim"),
+}
+claimStatusB := v1.PodResourceClaimStatus{
+Name: "my-claim-2",
+ResourceClaimName: ptr.To("claim-2"),
+}
 tests := []struct {
-name string
-newPod *v1.Pod
-oldPod *v1.Pod
-want []ClusterEvent
+name string
+newPod *v1.Pod
+oldPod *v1.Pod
+draDisabled bool
+want []ClusterEvent
 }{
 {
 name: "only label is updated",
@ -389,9 +402,35 @@ func Test_podSchedulingPropertiesChange(t *testing.T) {
 oldPod: st.MakePod().Toleration("key").Obj(),
 want: []ClusterEvent{PodTolerationChange},
 },
+{
+name: "pod claim statuses change, feature disabled",
+draDisabled: true,
+newPod: st.MakePod().ResourceClaimStatuses(claimStatusA).Obj(),
+oldPod: st.MakePod().Obj(),
+want: []ClusterEvent{assignedPodOtherUpdate},
+},
+{
+name: "pod claim statuses change, feature enabled",
+newPod: st.MakePod().ResourceClaimStatuses(claimStatusA).Obj(),
+oldPod: st.MakePod().Obj(),
+want: []ClusterEvent{PodGeneratedResourceClaimChange},
+},
+{
+name: "pod claim statuses swapped",
+newPod: st.MakePod().ResourceClaimStatuses(claimStatusA, claimStatusB).Obj(),
+oldPod: st.MakePod().ResourceClaimStatuses(claimStatusB, claimStatusA).Obj(),
+want: []ClusterEvent{PodGeneratedResourceClaimChange},
+},
+{
+name: "pod claim statuses extended",
+newPod: st.MakePod().ResourceClaimStatuses(claimStatusA, claimStatusB).Obj(),
+oldPod: st.MakePod().ResourceClaimStatuses(claimStatusA).Obj(),
+want: []ClusterEvent{PodGeneratedResourceClaimChange},
+},
 }
 for _, tt := range tests {
 t.Run(tt.name, func(t *testing.T) {
+featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.DynamicResourceAllocation, !tt.draDisabled)
 got := PodSchedulingPropertiesChange(tt.newPod, tt.oldPod)
 if diff := cmp.Diff(tt.want, got); diff != "" {
 t.Errorf("unexpected event is returned from podSchedulingPropertiesChange (-want, +got):\n%s", diff)
@ -399,6 +399,8 @@ func (pl *dynamicResources) EventsToRegister(_ context.Context) ([]framework.Clu
 {Event: framework.ClusterEvent{Resource: framework.Node, ActionType: nodeActionType}},
 // Allocation is tracked in ResourceClaims, so any changes may make the pods schedulable.
 {Event: framework.ClusterEvent{Resource: framework.ResourceClaim, ActionType: framework.Add | framework.Update}, QueueingHintFn: pl.isSchedulableAfterClaimChange},
+// Adding the ResourceClaim name to the pod status makes pods waiting for their ResourceClaim schedulable.
+{Event: framework.ClusterEvent{Resource: framework.Pod, ActionType: framework.UpdatePodGeneratedResourceClaim}, QueueingHintFn: pl.isSchedulableAfterPodChange},
 // A pod might be waiting for a class to get created or modified.
 {Event: framework.ClusterEvent{Resource: framework.DeviceClass, ActionType: framework.Add | framework.Update}},
 // Adding or updating a ResourceSlice might make a pod schedulable because new resources became available.
@ -450,7 +452,10 @@ func (pl *dynamicResources) isSchedulableAfterClaimChange(logger klog.Logger, po
 // This is not an unexpected error: we know that
 // foreachPodResourceClaim only returns errors for "not
 // schedulable".
-logger.V(6).Info("pod is not schedulable after resource claim change", "pod", klog.KObj(pod), "claim", klog.KObj(modifiedClaim), "reason", err.Error())
+if loggerV := logger.V(6); loggerV.Enabled() {
+owner := metav1.GetControllerOf(modifiedClaim)
+loggerV.Info("pod is not schedulable after resource claim change", "pod", klog.KObj(pod), "claim", klog.KObj(modifiedClaim), "claimOwner", owner, "reason", err.Error())
+}
 return framework.QueueSkip, nil
 }

@ -474,7 +479,7 @@ func (pl *dynamicResources) isSchedulableAfterClaimChange(logger klog.Logger, po
 }

 if originalClaim == nil {
-logger.V(4).Info("claim for pod got created", "pod", klog.KObj(pod), "claim", klog.KObj(modifiedClaim))
+logger.V(5).Info("claim for pod got created", "pod", klog.KObj(pod), "claim", klog.KObj(modifiedClaim))
 return framework.Queue, nil
 }

@ -492,7 +497,34 @@ func (pl *dynamicResources) isSchedulableAfterClaimChange(logger klog.Logger, po
 return framework.QueueSkip, nil
 }

-logger.V(4).Info("status of claim for pod got updated", "pod", klog.KObj(pod), "claim", klog.KObj(modifiedClaim))
+logger.V(5).Info("status of claim for pod got updated", "pod", klog.KObj(pod), "claim", klog.KObj(modifiedClaim))
 return framework.Queue, nil
 }

+// isSchedulableAfterPodChange is invoked for update pod events reported by
+// an informer. It checks whether that change adds the ResourceClaim(s) that the
+// pod has been waiting for.
+func (pl *dynamicResources) isSchedulableAfterPodChange(logger klog.Logger, pod *v1.Pod, oldObj, newObj interface{}) (framework.QueueingHint, error) {
+_, modifiedPod, err := schedutil.As[*v1.Pod](nil, newObj)
+if err != nil {
+// Shouldn't happen.
+return framework.Queue, fmt.Errorf("unexpected object in isSchedulableAfterClaimChange: %w", err)
+}
+
+if pod.UID != modifiedPod.UID {
+logger.V(7).Info("pod is not schedulable after change in other pod", "pod", klog.KObj(pod), "modifiedPod", klog.KObj(modifiedPod))
+return framework.QueueSkip, nil
+}
+
+if err := pl.foreachPodResourceClaim(modifiedPod, nil); err != nil {
+// This is not an unexpected error: we know that
+// foreachPodResourceClaim only returns errors for "not
+// schedulable".
+logger.V(6).Info("pod is not schedulable after being updated", "pod", klog.KObj(pod))
+return framework.QueueSkip, nil
+}
+
+logger.V(5).Info("pod got updated and is schedulable", "pod", klog.KObj(pod))
+return framework.Queue, nil
+}
+
@ -537,7 +569,7 @@ func (pl *dynamicResources) isSchedulableAfterPodSchedulingContextChange(logger
 // Deleted? That can happen because we ourselves delete the PodSchedulingContext while
 // working on the pod. This can be ignored.
 if oldObj != nil && newObj == nil {
-logger.V(4).Info("PodSchedulingContext got deleted")
+logger.V(5).Info("PodSchedulingContext got deleted")
 return framework.QueueSkip, nil
 }

@ -568,7 +600,7 @@ func (pl *dynamicResources) isSchedulableAfterPodSchedulingContextChange(logger
 // This is not an unexpected error: we know that
 // foreachPodResourceClaim only returns errors for "not
 // schedulable".
-logger.V(4).Info("pod is not schedulable, keep waiting", "pod", klog.KObj(pod), "reason", err.Error())
+logger.V(5).Info("pod is not schedulable, keep waiting", "pod", klog.KObj(pod), "reason", err.Error())
 return framework.QueueSkip, nil
 }

@ -589,7 +621,7 @@ func (pl *dynamicResources) isSchedulableAfterPodSchedulingContextChange(logger
 if oldPodScheduling == nil /* create */ ||
 len(oldPodScheduling.Status.ResourceClaims) < len(podScheduling.Status.ResourceClaims) /* new information and not incomplete (checked above) */ {
 // This definitely is new information for the scheduler. Try again immediately.
-logger.V(4).Info("PodSchedulingContext for pod has all required information, schedule immediately", "pod", klog.KObj(pod))
+logger.V(5).Info("PodSchedulingContext for pod has all required information, schedule immediately", "pod", klog.KObj(pod))
 return framework.Queue, nil
 }
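One detail in the isSchedulableAfterClaimChange hunk above: the V(6) message is now wrapped in an `if loggerV := logger.V(6); loggerV.Enabled()` block, so that `metav1.GetControllerOf(modifiedClaim)` is only evaluated when that verbosity is actually enabled. A minimal, self-contained sketch of the same guarded-logging pattern with klog; the extra value computed here is hypothetical:

```go
package main

import (
	"k8s.io/klog/v2"
)

func expensiveDetail() string {
	// Stands in for work such as metav1.GetControllerOf: only worth doing
	// when the message will actually be emitted.
	return "detail"
}

func main() {
	logger := klog.Background()

	// Guarded logging: Enabled() is checked once, and the expensive value
	// is only computed inside the block.
	if loggerV := logger.V(6); loggerV.Enabled() {
		loggerV.Info("pod is not schedulable", "extra", expensiveDetail())
	}
}
```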
@ -95,6 +95,11 @@ var (
 PodResourceClaims(v1.PodResourceClaim{Name: resourceName, ResourceClaimName: &claimName}).
 PodResourceClaims(v1.PodResourceClaim{Name: resourceName2, ResourceClaimName: &claimName2}).
 Obj()
+podWithTwoClaimTemplates = st.MakePod().Name(podName).Namespace(namespace).
+UID(podUID).
+PodResourceClaims(v1.PodResourceClaim{Name: resourceName, ResourceClaimTemplateName: &claimName}).
+PodResourceClaims(v1.PodResourceClaim{Name: resourceName2, ResourceClaimTemplateName: &claimName}).
+Obj()

 // Node with "instance-1" device and no device attributes.
 workerNode = &st.MakeNode().Name(nodeName).Label("kubernetes.io/hostname", nodeName).Node
@ -1356,19 +1361,19 @@ func Test_isSchedulableAfterClaimChange(t *testing.T) {
 pod *v1.Pod
 claims []*resourceapi.ResourceClaim
 oldObj, newObj interface{}
-expectedHint framework.QueueingHint
-expectedErr bool
+wantHint framework.QueueingHint
+wantErr bool
 }{
 "skip-deletes": {
-pod: podWithClaimTemplate,
-oldObj: allocatedClaim,
-newObj: nil,
-expectedHint: framework.QueueSkip,
+pod: podWithClaimTemplate,
+oldObj: allocatedClaim,
+newObj: nil,
+wantHint: framework.QueueSkip,
 },
 "backoff-wrong-new-object": {
-pod: podWithClaimTemplate,
-newObj: "not-a-claim",
-expectedErr: true,
+pod: podWithClaimTemplate,
+newObj: "not-a-claim",
+wantErr: true,
 },
 "skip-wrong-claim": {
 pod: podWithClaimTemplate,
@ -1377,7 +1382,7 @@ func Test_isSchedulableAfterClaimChange(t *testing.T) {
 claim.OwnerReferences[0].UID += "123"
 return claim
 }(),
-expectedHint: framework.QueueSkip,
+wantHint: framework.QueueSkip,
 },
 "skip-unrelated-claim": {
 pod: podWithClaimTemplate,
@ -1388,19 +1393,19 @@ func Test_isSchedulableAfterClaimChange(t *testing.T) {
 claim.UID += "123"
 return claim
 }(),
-expectedHint: framework.QueueSkip,
+wantHint: framework.QueueSkip,
 },
 "queue-on-add": {
-pod: podWithClaimName,
-newObj: pendingClaim,
-expectedHint: framework.Queue,
+pod: podWithClaimName,
+newObj: pendingClaim,
+wantHint: framework.Queue,
 },
 "backoff-wrong-old-object": {
-pod: podWithClaimName,
-claims: []*resourceapi.ResourceClaim{pendingClaim},
-oldObj: "not-a-claim",
-newObj: pendingClaim,
-expectedErr: true,
+pod: podWithClaimName,
+claims: []*resourceapi.ResourceClaim{pendingClaim},
+oldObj: "not-a-claim",
+newObj: pendingClaim,
+wantErr: true,
 },
 "skip-adding-finalizer": {
 pod: podWithClaimName,
@ -1411,7 +1416,7 @@ func Test_isSchedulableAfterClaimChange(t *testing.T) {
 claim.Finalizers = append(claim.Finalizers, "foo")
 return claim
 }(),
-expectedHint: framework.QueueSkip,
+wantHint: framework.QueueSkip,
 },
 "queue-on-status-change": {
 pod: podWithClaimName,
@ -1422,7 +1427,7 @@ func Test_isSchedulableAfterClaimChange(t *testing.T) {
 claim.Status.Allocation = &resourceapi.AllocationResult{}
 return claim
 }(),
-expectedHint: framework.Queue,
+wantHint: framework.Queue,
 },
 "structured-claim-deallocate": {
 pod: podWithClaimName,
@ -1435,7 +1440,7 @@ func Test_isSchedulableAfterClaimChange(t *testing.T) {
 }(),
 // TODO (https://github.com/kubernetes/kubernetes/issues/123697): don't wake up
 // claims not using structured parameters.
-expectedHint: framework.Queue,
+wantHint: framework.Queue,
 },
 }

@ -1456,18 +1461,24 @@ func Test_isSchedulableAfterClaimChange(t *testing.T) {
 createClaim := claim.DeepCopy()
 createClaim.UID = ""
 storedClaim, err := testCtx.client.ResourceV1alpha3().ResourceClaims(createClaim.Namespace).Create(tCtx, createClaim, metav1.CreateOptions{})
-require.NoError(t, err, "create claim")
+if err != nil {
+t.Fatalf("create claim: expected no error, got: %v", err)
+}
 claim = storedClaim
 } else {
 cachedClaim, err := testCtx.claimAssumeCache.Get(claimKey)
-require.NoError(t, err, "retrieve old claim")
+if err != nil {
+t.Fatalf("retrieve old claim: expected no error, got: %v", err)
+}
 updateClaim := claim.DeepCopy()
 // The test claim doesn't have those (generated dynamically), so copy them.
 updateClaim.UID = cachedClaim.(*resourceapi.ResourceClaim).UID
 updateClaim.ResourceVersion = cachedClaim.(*resourceapi.ResourceClaim).ResourceVersion

 storedClaim, err := testCtx.client.ResourceV1alpha3().ResourceClaims(updateClaim.Namespace).Update(tCtx, updateClaim, metav1.UpdateOptions{})
-require.NoError(t, err, "update claim")
+if err != nil {
+t.Fatalf("update claim: expected no error, got: %v", err)
+}
 claim = storedClaim
 }

@ -1485,14 +1496,98 @@ func Test_isSchedulableAfterClaimChange(t *testing.T) {
 // isSchedulableAfterClaimChange.
 newObj = claim
 }
-actualHint, err := testCtx.p.isSchedulableAfterClaimChange(logger, tc.pod, oldObj, newObj)
-if tc.expectedErr {
-require.Error(t, err)
+gotHint, err := testCtx.p.isSchedulableAfterClaimChange(logger, tc.pod, oldObj, newObj)
+if tc.wantErr {
+if err == nil {
+t.Fatal("expected an error, got none")
+}
 return
 }

-require.NoError(t, err)
-require.Equal(t, tc.expectedHint, actualHint)
+if err != nil {
+t.Fatalf("expected no error, got: %v", err)
+}
+if tc.wantHint != gotHint {
+t.Fatalf("expected hint %#v, got %#v", tc.wantHint, gotHint)
+}
 })
 }
 }
+
+func Test_isSchedulableAfterPodChange(t *testing.T) {
+testcases := map[string]struct {
+objs []apiruntime.Object
+pod *v1.Pod
+claims []*resourceapi.ResourceClaim
+obj interface{}
+wantHint framework.QueueingHint
+wantErr bool
+}{
+"backoff-wrong-new-object": {
+pod: podWithClaimTemplate,
+obj: "not-a-claim",
+wantErr: true,
+},
+"complete": {
+objs: []apiruntime.Object{pendingClaim},
+pod: podWithClaimTemplate,
+obj: podWithClaimTemplateInStatus,
+wantHint: framework.Queue,
+},
+"wrong-pod": {
+objs: []apiruntime.Object{pendingClaim},
+pod: func() *v1.Pod {
+pod := podWithClaimTemplate.DeepCopy()
+pod.Name += "2"
+pod.UID += "2" // This is the relevant difference.
+return pod
+}(),
+obj: podWithClaimTemplateInStatus,
+wantHint: framework.QueueSkip,
+},
+"missing-claim": {
+objs: nil,
+pod: podWithClaimTemplate,
+obj: podWithClaimTemplateInStatus,
+wantHint: framework.QueueSkip,
+},
+"incomplete": {
+objs: []apiruntime.Object{pendingClaim},
+pod: podWithTwoClaimTemplates,
+obj: func() *v1.Pod {
+pod := podWithTwoClaimTemplates.DeepCopy()
+// Only one of two claims created.
+pod.Status.ResourceClaimStatuses = []v1.PodResourceClaimStatus{{
+Name: pod.Spec.ResourceClaims[0].Name,
+ResourceClaimName: &claimName,
+}}
+return pod
+}(),
+wantHint: framework.QueueSkip,
+},
+}
+
+for name, tc := range testcases {
+t.Run(name, func(t *testing.T) {
+logger, _ := ktesting.NewTestContext(t)
+features := feature.Features{
+EnableDynamicResourceAllocation: true,
+}
+testCtx := setup(t, nil, tc.claims, nil, nil, tc.objs, features)
+gotHint, err := testCtx.p.isSchedulableAfterPodChange(logger, tc.pod, nil, tc.obj)
+if tc.wantErr {
+if err == nil {
+t.Fatal("expected an error, got none")
+}
+return
+}
+
+if err != nil {
+t.Fatalf("expected no error, got: %v", err)
+}
+if tc.wantHint != gotHint {
+t.Fatalf("expected hint %#v, got %#v", tc.wantHint, gotHint)
+}
+})
+}
+}
@ -1503,34 +1598,34 @@ func Test_isSchedulableAfterPodSchedulingContextChange(t *testing.T) {
 schedulings []*resourceapi.PodSchedulingContext
 claims []*resourceapi.ResourceClaim
 oldObj, newObj interface{}
-expectedHint framework.QueueingHint
-expectedErr bool
+wantHint framework.QueueingHint
+wantErr bool
 }{
 "skip-deleted": {
-pod: podWithClaimTemplate,
-oldObj: scheduling,
-expectedHint: framework.QueueSkip,
+pod: podWithClaimTemplate,
+oldObj: scheduling,
+wantHint: framework.QueueSkip,
 },
 "skip-missed-deleted": {
 pod: podWithClaimTemplate,
 oldObj: cache.DeletedFinalStateUnknown{
 Obj: scheduling,
 },
-expectedHint: framework.QueueSkip,
+wantHint: framework.QueueSkip,
 },
 "backoff-wrong-old-object": {
-pod: podWithClaimTemplate,
-oldObj: "not-a-scheduling-context",
-newObj: scheduling,
-expectedErr: true,
+pod: podWithClaimTemplate,
+oldObj: "not-a-scheduling-context",
+newObj: scheduling,
+wantErr: true,
 },
 "backoff-missed-wrong-old-object": {
 pod: podWithClaimTemplate,
 oldObj: cache.DeletedFinalStateUnknown{
 Obj: "not-a-scheduling-context",
 },
-newObj: scheduling,
-expectedErr: true,
+newObj: scheduling,
+wantErr: true,
 },
 "skip-unrelated-object": {
 pod: podWithClaimTemplate,
@ -1540,33 +1635,33 @@ func Test_isSchedulableAfterPodSchedulingContextChange(t *testing.T) {
 scheduling.Name += "-foo"
 return scheduling
 }(),
-expectedHint: framework.QueueSkip,
+wantHint: framework.QueueSkip,
 },
 "backoff-wrong-new-object": {
-pod: podWithClaimTemplate,
-oldObj: scheduling,
-newObj: "not-a-scheduling-context",
-expectedErr: true,
+pod: podWithClaimTemplate,
+oldObj: scheduling,
+newObj: "not-a-scheduling-context",
+wantErr: true,
 },
 "skip-missing-claim": {
-pod: podWithClaimTemplate,
-oldObj: scheduling,
-newObj: schedulingInfo,
-expectedHint: framework.QueueSkip,
+pod: podWithClaimTemplate,
+oldObj: scheduling,
+newObj: schedulingInfo,
+wantHint: framework.QueueSkip,
 },
 "skip-missing-infos": {
-pod: podWithClaimTemplateInStatus,
-claims: []*resourceapi.ResourceClaim{pendingClaim},
-oldObj: scheduling,
-newObj: scheduling,
-expectedHint: framework.QueueSkip,
+pod: podWithClaimTemplateInStatus,
+claims: []*resourceapi.ResourceClaim{pendingClaim},
+oldObj: scheduling,
+newObj: scheduling,
+wantHint: framework.QueueSkip,
 },
 "queue-new-infos": {
-pod: podWithClaimTemplateInStatus,
-claims: []*resourceapi.ResourceClaim{pendingClaim},
-oldObj: scheduling,
-newObj: schedulingInfo,
-expectedHint: framework.Queue,
+pod: podWithClaimTemplateInStatus,
+claims: []*resourceapi.ResourceClaim{pendingClaim},
+oldObj: scheduling,
+newObj: schedulingInfo,
+wantHint: framework.Queue,
 },
 "queue-bad-selected-node": {
 pod: podWithClaimTemplateInStatus,
@ -1582,7 +1677,7 @@ func Test_isSchedulableAfterPodSchedulingContextChange(t *testing.T) {
 scheduling.Status.ResourceClaims[0].UnsuitableNodes = append(scheduling.Status.ResourceClaims[0].UnsuitableNodes, scheduling.Spec.SelectedNode)
 return scheduling
 }(),
-expectedHint: framework.Queue,
+wantHint: framework.Queue,
 },
 "skip-spec-changes": {
 pod: podWithClaimTemplateInStatus,
@ -1593,7 +1688,7 @@ func Test_isSchedulableAfterPodSchedulingContextChange(t *testing.T) {
 scheduling.Spec.SelectedNode = workerNode.Name
 return scheduling
 }(),
-expectedHint: framework.QueueSkip,
+wantHint: framework.QueueSkip,
 },
 "backoff-other-changes": {
 pod: podWithClaimTemplateInStatus,
@ -1604,7 +1699,7 @@ func Test_isSchedulableAfterPodSchedulingContextChange(t *testing.T) {
 scheduling.Finalizers = append(scheduling.Finalizers, "foo")
 return scheduling
 }(),
-expectedHint: framework.Queue,
+wantHint: framework.Queue,
 },
 }

@ -1618,14 +1713,20 @@ func Test_isSchedulableAfterPodSchedulingContextChange(t *testing.T) {
 EnableDRAControlPlaneController: true,
 }
 testCtx := setup(t, nil, tc.claims, nil, tc.schedulings, nil, features)
-actualHint, err := testCtx.p.isSchedulableAfterPodSchedulingContextChange(logger, tc.pod, tc.oldObj, tc.newObj)
-if tc.expectedErr {
-require.Error(t, err)
+gotHint, err := testCtx.p.isSchedulableAfterPodSchedulingContextChange(logger, tc.pod, tc.oldObj, tc.newObj)
+if tc.wantErr {
+if err == nil {
+t.Fatal("expected an error, got none")
+}
 return
 }

-require.NoError(t, err)
-require.Equal(t, tc.expectedHint, actualHint)
+if err != nil {
+t.Fatalf("expected no error, got: %v", err)
+}
+if tc.wantHint != gotHint {
+t.Fatalf("expected hint %#v, got %#v", tc.wantHint, gotHint)
+}
 })
 }
 }
@ -74,6 +74,9 @@ const (
 UpdatePodTolerations
 // UpdatePodSchedulingGatesEliminated is an update for pod's scheduling gates, which eliminates all scheduling gates in the Pod.
 UpdatePodSchedulingGatesEliminated
+// UpdatePodGeneratedResourceClaim is an update of the list of ResourceClaims generated for the pod.
+// Depends on the DynamicResourceAllocation feature gate.
+UpdatePodGeneratedResourceClaim

 // updatePodOther is a update for pod's other fields.
 // It's used only for the internal event handling, and thus unexported.
@ -82,7 +85,7 @@ const (
 All ActionType = 1<<iota - 1

 // Use the general Update type if you don't either know or care the specific sub-Update type to use.
-Update = UpdateNodeAllocatable | UpdateNodeLabel | UpdateNodeTaint | UpdateNodeCondition | UpdateNodeAnnotation | UpdatePodLabel | UpdatePodScaleDown | UpdatePodTolerations | UpdatePodSchedulingGatesEliminated | updatePodOther
+Update = UpdateNodeAllocatable | UpdateNodeLabel | UpdateNodeTaint | UpdateNodeCondition | UpdateNodeAnnotation | UpdatePodLabel | UpdatePodScaleDown | UpdatePodTolerations | UpdatePodSchedulingGatesEliminated | UpdatePodGeneratedResourceClaim | updatePodOther
 )

 // GVK is short for group/version/kind, which can uniquely represent a particular API resource.
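Because each ActionType is a single bit (1<<iota) and the aggregate Update mask is a bitwise OR of those bits, leaving the new UpdatePodGeneratedResourceClaim bit out of Update would make generic Update subscriptions miss those events; the hunk above adds exactly that bit. A self-contained sketch of the membership test, using simplified constants rather than the scheduler's real ones:

```go
package main

import "fmt"

// ActionType is a simplified stand-in: each action is one bit.
type ActionType int64

const (
	UpdatePodLabel ActionType = 1 << iota
	UpdatePodTolerations
	UpdatePodSchedulingGatesEliminated
	UpdatePodGeneratedResourceClaim

	// Update aggregates the sub-update bits; a new bit has to be OR-ed in
	// here for mask-based subscribers to see it.
	Update = UpdatePodLabel | UpdatePodTolerations | UpdatePodSchedulingGatesEliminated | UpdatePodGeneratedResourceClaim
)

// matches reports whether a registered mask covers an emitted action.
func matches(registered, emitted ActionType) bool {
	return registered&emitted != 0
}

func main() {
	// true only because UpdatePodGeneratedResourceClaim is part of Update.
	fmt.Println(matches(Update, UpdatePodGeneratedResourceClaim))
}
```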
@ -292,6 +292,12 @@ func (p *PodWrapper) PodResourceClaims(podResourceClaims ...v1.PodResourceClaim)
 return p
 }

+// ResourceClaimStatuses appends claim statuses into the status of the inner pod.
+func (p *PodWrapper) ResourceClaimStatuses(resourceClaimStatuses ...v1.PodResourceClaimStatus) *PodWrapper {
+p.Status.ResourceClaimStatuses = append(p.Status.ResourceClaimStatuses, resourceClaimStatuses...)
+return p
+}
+
 // Priority sets a priority value into PodSpec of the inner pod.
 func (p *PodWrapper) Priority(val int32) *PodWrapper {
 p.Spec.Priority = &val
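The new wrapper chains with the existing PodWrapper builder helpers the same way the tests above use it. An illustrative snippet (st is k8s.io/kubernetes/pkg/scheduler/testing; the pod and claim names are made up for the example):

```go
package main

import (
	"fmt"

	v1 "k8s.io/api/core/v1"
	st "k8s.io/kubernetes/pkg/scheduler/testing"
	"k8s.io/utils/ptr"
)

func main() {
	// Build a pod whose status already records the generated claim name,
	// mirroring the "pod claim statuses change" test cases above.
	pod := st.MakePod().Name("my-pod").Namespace("default").
		ResourceClaimStatuses(v1.PodResourceClaimStatus{
			Name:              "my-claim",
			ResourceClaimName: ptr.To("my-pod-my-claim"),
		}).
		Obj()

	fmt.Println(pod.Status.ResourceClaimStatuses[0].Name)
}
```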
@ -29,6 +29,7 @@ import (
 resourceinformers "k8s.io/client-go/informers/resource/v1alpha3"
 storageinformers "k8s.io/client-go/informers/storage/v1"
 "k8s.io/client-go/tools/cache"
+"k8s.io/dynamic-resource-allocation/resourceclaim"
 )

 type graphPopulator struct {
@ -94,7 +95,7 @@ func (g *graphPopulator) updatePod(oldObj, obj interface{}) {
 }
 if oldPod, ok := oldObj.(*corev1.Pod); ok && oldPod != nil {
 if (pod.Spec.NodeName == oldPod.Spec.NodeName) && (pod.UID == oldPod.UID) &&
-resourceClaimStatusesEqual(oldPod.Status.ResourceClaimStatuses, pod.Status.ResourceClaimStatuses) {
+resourceclaim.PodStatusEqual(oldPod.Status.ResourceClaimStatuses, pod.Status.ResourceClaimStatuses) {
 // Node and uid are unchanged, all object references in the pod spec are immutable respectively unmodified (claim statuses).
 klog.V(5).Infof("updatePod %s/%s, node unchanged", pod.Namespace, pod.Name)
 return
@ -107,29 +108,6 @@ func (g *graphPopulator) updatePod(oldObj, obj interface{}) {
 klog.V(5).Infof("updatePod %s/%s for node %s completed in %v", pod.Namespace, pod.Name, pod.Spec.NodeName, time.Since(startTime))
 }

-func resourceClaimStatusesEqual(statusA, statusB []corev1.PodResourceClaimStatus) bool {
-if len(statusA) != len(statusB) {
-return false
-}
-// In most cases, status entries only get added once and not modified.
-// But this cannot be guaranteed, so for the sake of correctness in all
-// cases this code here has to check.
-for i := range statusA {
-if statusA[i].Name != statusB[i].Name {
-return false
-}
-claimNameA := statusA[i].ResourceClaimName
-claimNameB := statusB[i].ResourceClaimName
-if (claimNameA == nil) != (claimNameB == nil) {
-return false
-}
-if claimNameA != nil && *claimNameA != *claimNameB {
-return false
-}
-}
-return true
-}
-
 func (g *graphPopulator) deletePod(obj interface{}) {
 if tombstone, ok := obj.(cache.DeletedFinalStateUnknown); ok {
 obj = tombstone.Obj
@ -0,0 +1,48 @@
+/*
+Copyright 2024 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package resourceclaim
+
+import (
+corev1 "k8s.io/api/core/v1"
+"k8s.io/utils/ptr"
+)
+
+// PodStatusEqual checks that both slices have the same number
+// of entries and that the pairs of entries are semantically
+// equivalent.
+//
+// The order of the entries matters: two slices with semantically
+// equivalent entries in different order are not equal. This is
+// done for the sake of performance because typically the
+// order of entries doesn't change.
+func PodStatusEqual(statusA, statusB []corev1.PodResourceClaimStatus) bool {
+if len(statusA) != len(statusB) {
+return false
+}
+// In most cases, status entries only get added once and not modified.
+// But this cannot be guaranteed, so for the sake of correctness in all
+// cases this code here has to check.
+for i := range statusA {
+if statusA[i].Name != statusB[i].Name {
+return false
+}
+if !ptr.Equal(statusA[i].ResourceClaimName, statusB[i].ResourceClaimName) {
+return false
+}
+}
+return true
+}
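A short usage sketch of the new helper as an importer would call it; the import path is the one added to the other files in this commit, and the pod/claim names are illustrative:

```go
package main

import (
	"fmt"

	corev1 "k8s.io/api/core/v1"
	"k8s.io/dynamic-resource-allocation/resourceclaim"
	"k8s.io/utils/ptr"
)

func main() {
	oldStatuses := []corev1.PodResourceClaimStatus{
		{Name: "my-claim"}, // claim not generated yet
	}
	newStatuses := []corev1.PodResourceClaimStatus{
		{Name: "my-claim", ResourceClaimName: ptr.To("my-pod-my-claim")},
	}

	// Prints false: the generated claim name was added, so the statuses
	// differ, which is what lets extractPodGeneratedResourceClaimChange
	// report PodGeneratedResourceClaimChange.
	fmt.Println(resourceclaim.PodStatusEqual(oldStatuses, newStatuses))
}
```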
@ -0,0 +1,78 @@
+/*
+Copyright 2024 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package resourceclaim
+
+import (
+"fmt"
+"testing"
+
+"github.com/stretchr/testify/assert"
+corev1 "k8s.io/api/core/v1"
+"k8s.io/utils/ptr"
+)
+
+func TestPodStatusEqual(t *testing.T) {
+a := corev1.PodResourceClaimStatus{
+Name: "a",
+ResourceClaimName: ptr.To("x"),
+}
+b := corev1.PodResourceClaimStatus{
+Name: "b",
+ResourceClaimName: ptr.To("y"),
+}
+empty := corev1.PodResourceClaimStatus{}
+
+testcases := map[string]struct {
+sliceA, sliceB []corev1.PodResourceClaimStatus
+expectEqual bool
+}{
+"identical": {
+sliceA: []corev1.PodResourceClaimStatus{a, b, empty},
+sliceB: []corev1.PodResourceClaimStatus{a, b, empty},
+expectEqual: true,
+},
+"different-order": {
+sliceA: []corev1.PodResourceClaimStatus{a, b, empty},
+sliceB: []corev1.PodResourceClaimStatus{empty, a, b},
+expectEqual: false,
+},
+"different-length": {
+sliceA: []corev1.PodResourceClaimStatus{a, b, empty},
+sliceB: []corev1.PodResourceClaimStatus{a, b},
+expectEqual: false,
+},
+"semantically-equal-empty": {
+sliceA: []corev1.PodResourceClaimStatus{},
+sliceB: nil,
+expectEqual: true,
+},
+"semantically-equivalent": {
+sliceA: []corev1.PodResourceClaimStatus{{Name: "a", ResourceClaimName: ptr.To("x")}},
+sliceB: []corev1.PodResourceClaimStatus{{Name: "a", ResourceClaimName: ptr.To("x")}},
+expectEqual: true,
+},
+}
+
+for name, tc := range testcases {
+t.Run(name, func(t *testing.T) {
+assert.True(t, PodStatusEqual(tc.sliceA, tc.sliceA), fmt.Sprintf("%v", tc.sliceA))
+assert.True(t, PodStatusEqual(tc.sliceB, tc.sliceB), fmt.Sprintf("%v", tc.sliceB))
+assert.Equal(t, tc.expectEqual, PodStatusEqual(tc.sliceA, tc.sliceB), fmt.Sprintf("%v and %v", tc.sliceA, tc.sliceB))
+})
+}
+}