dra scheduler: unit tests

Coverage was checked with a cover profile. The biggest remaining gap is
isSchedulableAfterClaimParametersChange and
isSchedulableAfterClassParametersChange, which will be handled when refactoring
foreachPodResourceClaim (https://github.com/kubernetes/kubernetes/issues/123697).
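For reference, such a profile can be regenerated with the standard Go tooling; the package path below is an assumption:

go test -coverprofile=cover.out ./pkg/scheduler/framework/plugins/dynamicresources/...
go tool cover -func=cover.out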
Patrick Ohly 2024-03-14 13:58:18 +01:00
parent 607261e4c5
commit 458e227de0
6 changed files with 2372 additions and 28 deletions


@@ -1411,7 +1411,11 @@ func (pl *dynamicResources) Reserve(ctx context.Context, cs *framework.CycleStat
}
state.informationsForClaim[index].allocation = allocation
state.informationsForClaim[index].allocationDriverName = driverName
// Strictly speaking, we don't need to store the full modified object.
// The allocation would be enough. The full object is useful for
// debugging and testing, so let's make it realistic.
claim = claim.DeepCopy()
claim.Finalizers = append(claim.Finalizers, resourcev1alpha2.Finalizer)
claim.Status.DriverName = driverName
claim.Status.Allocation = allocation
pl.inFlightAllocations.Store(claim.UID, claim)
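pl.inFlightAllocations is a sync.Map keyed by claim UID (its consumers Range over it, see listInFlightClaims in the test file below). A minimal sketch of the assumed lifecycle, not the actual plugin code:

var inFlightAllocations sync.Map // types.UID -> *resourcev1alpha2.ResourceClaim
inFlightAllocations.Store(claim.UID, claim) // Reserve: remember the pending allocation
if v, ok := inFlightAllocations.Load(claim.UID); ok { // later phases read the snapshot back
_ = v.(*resourcev1alpha2.ResourceClaim)
}
inFlightAllocations.Delete(claim.UID) // assumed to be dropped again once the claim is written or unreserved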


@@ -39,12 +39,12 @@ import (
"k8s.io/client-go/kubernetes/fake"
cgotesting "k8s.io/client-go/testing"
"k8s.io/client-go/tools/cache"
-"k8s.io/klog/v2/ktesting"
-_ "k8s.io/klog/v2/ktesting/init"
"k8s.io/kubernetes/pkg/scheduler/framework"
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/feature"
"k8s.io/kubernetes/pkg/scheduler/framework/runtime"
st "k8s.io/kubernetes/pkg/scheduler/testing"
+"k8s.io/kubernetes/test/utils/ktesting"
+"k8s.io/utils/ptr"
)

var (
@@ -65,6 +65,39 @@ var (
},
DriverName: "some-driver",
}
structuredResourceClass = &resourcev1alpha2.ResourceClass{
ObjectMeta: metav1.ObjectMeta{
Name: className,
},
DriverName: "some-driver",
StructuredParameters: ptr.To(true),
}
structuredResourceClassWithParams = &resourcev1alpha2.ResourceClass{
ObjectMeta: metav1.ObjectMeta{
Name: className,
},
DriverName: "some-driver",
StructuredParameters: ptr.To(true),
ParametersRef: &resourcev1alpha2.ResourceClassParametersReference{
Name: className,
Namespace: namespace,
Kind: "ResourceClassParameters",
APIGroup: "resource.k8s.io",
},
}
structuredResourceClassWithCRD = &resourcev1alpha2.ResourceClass{
ObjectMeta: metav1.ObjectMeta{
Name: className,
},
DriverName: "some-driver",
StructuredParameters: ptr.To(true),
ParametersRef: &resourcev1alpha2.ResourceClassParametersReference{
Name: className,
Namespace: namespace,
Kind: "ResourceClassParameters",
APIGroup: "example.com",
},
}
podWithClaimName = st.MakePod().Name(podName).Namespace(namespace).
UID(podUID).
@@ -94,7 +127,27 @@ var (
PodResourceClaims(v1.PodResourceClaim{Name: resourceName2, Source: v1.ClaimSource{ResourceClaimName: &claimName2}}).
Obj()
-workerNode = &st.MakeNode().Name("worker").Label("nodename", "worker").Node
+workerNode = &st.MakeNode().Name("worker").Label("kubernetes.io/hostname", "worker").Node
workerNodeSlice = st.MakeResourceSlice("worker", "some-driver").NamedResourcesInstances("instance-1").Obj()
claimParameters = st.MakeClaimParameters().Name(claimName).Namespace(namespace).
NamedResourcesRequests("some-driver", "true").
Shareable(true).
GeneratedFrom(&resourcev1alpha2.ResourceClaimParametersReference{
Name: claimName,
Kind: "ResourceClaimParameters",
APIGroup: "example.com",
}).
Obj()
classParameters = st.MakeClassParameters().Name(className).Namespace(namespace).
NamedResourcesFilters("some-driver", "true").
GeneratedFrom(&resourcev1alpha2.ResourceClassParametersReference{
Name: className,
Namespace: namespace,
Kind: "ResourceClassParameters",
APIGroup: "example.com",
}).
Obj()
claim = st.MakeResourceClaim().
Name(claimName).
@@ -104,6 +157,10 @@ var (
pendingImmediateClaim = st.FromResourceClaim(claim).
AllocationMode(resourcev1alpha2.AllocationModeImmediate).
Obj()
structuredAllocatedImmediateClaim = st.FromResourceClaim(pendingImmediateClaim).
Allocation("some-driver", &resourcev1alpha2.AllocationResult{}).
Structured("worker", "instance-1").
Obj()
pendingDelayedClaim = st.FromResourceClaim(claim).
OwnerReference(podName, podUID, podKind).
AllocationMode(resourcev1alpha2.AllocationModeWaitForFirstConsumer).
@@ -112,25 +169,44 @@ var (
Name(claimName2).
Obj()
deallocatingClaim = st.FromResourceClaim(pendingImmediateClaim).
-Allocation(&resourcev1alpha2.AllocationResult{}).
+Allocation("some-driver", &resourcev1alpha2.AllocationResult{}).
DeallocationRequested(true).
Obj()
inUseClaim = st.FromResourceClaim(pendingImmediateClaim).
-Allocation(&resourcev1alpha2.AllocationResult{}).
+Allocation("some-driver", &resourcev1alpha2.AllocationResult{}).
-ReservedFor(resourcev1alpha2.ResourceClaimConsumerReference{Resource: "pods", Name: podName, UID: types.UID(podUID)}).
+ReservedForPod(podName, types.UID(podUID)).
Obj()
structuredInUseClaim = st.FromResourceClaim(inUseClaim).
Structured("worker", "instance-1").
Obj()
allocatedClaim = st.FromResourceClaim(pendingDelayedClaim).
-Allocation(&resourcev1alpha2.AllocationResult{}).
+Allocation("some-driver", &resourcev1alpha2.AllocationResult{}).
Obj()
pendingDelayedClaimWithParams = st.FromResourceClaim(pendingDelayedClaim).ParametersRef(claimName).Obj()
structuredAllocatedClaim = st.FromResourceClaim(allocatedClaim).Structured("worker", "instance-1").Obj()
structuredAllocatedClaimWithParams = st.FromResourceClaim(structuredAllocatedClaim).ParametersRef(claimName).Obj()
otherStructuredAllocatedClaim = st.FromResourceClaim(structuredAllocatedClaim).Name(structuredAllocatedClaim.Name + "-other").Obj()
allocatedDelayedClaimWithWrongTopology = st.FromResourceClaim(allocatedClaim).
-Allocation(&resourcev1alpha2.AllocationResult{AvailableOnNodes: st.MakeNodeSelector().In("no-such-label", []string{"no-such-value"}).Obj()}).
+Allocation("some-driver", &resourcev1alpha2.AllocationResult{AvailableOnNodes: st.MakeNodeSelector().In("no-such-label", []string{"no-such-value"}).Obj()}).
Obj()
structuredAllocatedDelayedClaimWithWrongTopology = st.FromResourceClaim(allocatedDelayedClaimWithWrongTopology).
Structured("worker-2", "instance-1").
Obj()
allocatedImmediateClaimWithWrongTopology = st.FromResourceClaim(allocatedDelayedClaimWithWrongTopology).
AllocationMode(resourcev1alpha2.AllocationModeImmediate).
Obj()
structuredAllocatedImmediateClaimWithWrongTopology = st.FromResourceClaim(allocatedImmediateClaimWithWrongTopology).
Structured("worker-2", "instance-1").
Obj()
allocatedClaimWithGoodTopology = st.FromResourceClaim(allocatedClaim).
-Allocation(&resourcev1alpha2.AllocationResult{AvailableOnNodes: st.MakeNodeSelector().In("nodename", []string{"worker"}).Obj()}).
+Allocation("some-driver", &resourcev1alpha2.AllocationResult{AvailableOnNodes: st.MakeNodeSelector().In("kubernetes.io/hostname", []string{"worker"}).Obj()}).
Obj()
structuredAllocatedClaimWithGoodTopology = st.FromResourceClaim(allocatedClaimWithGoodTopology).
Structured("worker", "instance-1").
Obj()
otherClaim = st.MakeResourceClaim().
Name("not-my-claim").
Namespace(namespace).
@@ -152,6 +228,44 @@ var (
Obj()
)
func reserve(claim *resourcev1alpha2.ResourceClaim, pod *v1.Pod) *resourcev1alpha2.ResourceClaim {
return st.FromResourceClaim(claim).
ReservedForPod(pod.Name, types.UID(pod.UID)).
Obj()
}
// claimWithCRD replaces the in-tree group with "example.com".
func claimWithCRD(claim *resourcev1alpha2.ResourceClaim) *resourcev1alpha2.ResourceClaim {
claim = claim.DeepCopy()
claim.Spec.ParametersRef.APIGroup = "example.com"
return claim
}
// classWithCRD replaces the in-tree group with "example.com".
func classWithCRD(class *resourcev1alpha2.ResourceClass) *resourcev1alpha2.ResourceClass {
class = class.DeepCopy()
class.ParametersRef.APIGroup = "example.com"
return class
}
func breakCELInClaimParameters(parameters *resourcev1alpha2.ResourceClaimParameters) *resourcev1alpha2.ResourceClaimParameters {
parameters = parameters.DeepCopy()
for i := range parameters.DriverRequests {
for e := range parameters.DriverRequests[i].Requests {
parameters.DriverRequests[i].Requests[e].NamedResources.Selector = `attributes.bool["no-such-attribute"]`
}
}
return parameters
}
func breakCELInClassParameters(parameters *resourcev1alpha2.ResourceClassParameters) *resourcev1alpha2.ResourceClassParameters {
parameters = parameters.DeepCopy()
for i := range parameters.Filters {
parameters.Filters[i].NamedResources.Selector = `attributes.bool["no-such-attribute"]`
}
return parameters
}
// result defines the expected outcome of some operation. It covers
// operation's status and the state of the world (= objects).
type result struct {
@@ -166,6 +280,14 @@ type result struct {
// removed contains objects deleted by the operation.
removed []metav1.Object
// assumedClaim is the one claim which is expected to be assumed,
// nil if none.
assumedClaim *resourcev1alpha2.ResourceClaim
// inFlightClaim is the one claim which is expected to be tracked as
// in flight, nil if none.
inFlightClaim *resourcev1alpha2.ResourceClaim
}

// change contains functions for modifying objects of a certain type. These
@@ -200,6 +322,10 @@ type want struct {
// unreserveAfterBindFailure, if set, triggers a call to Unreserve
// after PreBind, as if the actual Bind had failed.
unreserveAfterBindFailure *result
// unreserveBeforePreBind, if set, triggers a call to Unreserve
// before PreBind, as if some other PreBind plugin had failed.
unreserveBeforePreBind *result
}
// prepare contains changes for objects in the API server.
@@ -224,6 +350,10 @@ func TestPlugin(t *testing.T) {
classes []*resourcev1alpha2.ResourceClass
schedulings []*resourcev1alpha2.PodSchedulingContext
// objs get stored directly in the fake client, without passing
// through reactors, in contrast to the types above.
objs []apiruntime.Object
prepare prepare
want want
disable bool
@@ -256,6 +386,23 @@
},
},
},
"claim-reference-structured": {
pod: podWithClaimName,
claims: []*resourcev1alpha2.ResourceClaim{structuredAllocatedClaim, otherClaim},
want: want{
prebind: result{
changes: change{
claim: func(claim *resourcev1alpha2.ResourceClaim) *resourcev1alpha2.ResourceClaim {
if claim.Name == claimName {
claim = claim.DeepCopy()
claim.Status.ReservedFor = inUseClaim.Status.ReservedFor
}
return claim
},
},
},
},
},
"claim-template": { "claim-template": {
pod: podWithClaimTemplateInStatus, pod: podWithClaimTemplateInStatus,
claims: []*resourcev1alpha2.ResourceClaim{allocatedClaim, otherClaim}, claims: []*resourcev1alpha2.ResourceClaim{allocatedClaim, otherClaim},
@ -273,6 +420,23 @@ func TestPlugin(t *testing.T) {
}, },
}, },
}, },
"claim-template-structured": {
pod: podWithClaimTemplateInStatus,
claims: []*resourcev1alpha2.ResourceClaim{structuredAllocatedClaim, otherClaim},
want: want{
prebind: result{
changes: change{
claim: func(claim *resourcev1alpha2.ResourceClaim) *resourcev1alpha2.ResourceClaim {
if claim.Name == claimName {
claim = claim.DeepCopy()
claim.Status.ReservedFor = inUseClaim.Status.ReservedFor
}
return claim
},
},
},
},
},
"missing-claim": { "missing-claim": {
pod: podWithClaimTemplate, // status not set pod: podWithClaimTemplate, // status not set
claims: []*resourcev1alpha2.ResourceClaim{allocatedClaim, otherClaim}, claims: []*resourcev1alpha2.ResourceClaim{allocatedClaim, otherClaim},
@ -321,6 +485,239 @@ func TestPlugin(t *testing.T) {
}, },
}, },
}, },
"immediate-allocation-structured-no-resources": {
pod: podWithClaimName,
claims: []*resourcev1alpha2.ResourceClaim{pendingImmediateClaim},
classes: []*resourcev1alpha2.ResourceClass{structuredResourceClass},
want: want{
filter: perNodeResult{
workerNode.Name: {
status: framework.NewStatus(framework.UnschedulableAndUnresolvable, `resourceclaim cannot be allocated for the node (unsuitable)`),
},
},
postfilter: result{
status: framework.NewStatus(framework.Unschedulable, `still not schedulable`),
},
},
},
"immediate-allocation-structured-with-resources": {
pod: podWithClaimName,
claims: []*resourcev1alpha2.ResourceClaim{pendingImmediateClaim},
classes: []*resourcev1alpha2.ResourceClass{structuredResourceClass},
objs: []apiruntime.Object{workerNodeSlice},
want: want{
reserve: result{
inFlightClaim: structuredAllocatedImmediateClaim,
},
prebind: result{
assumedClaim: reserve(structuredAllocatedImmediateClaim, podWithClaimName),
changes: change{
claim: func(claim *resourcev1alpha2.ResourceClaim) *resourcev1alpha2.ResourceClaim {
if claim.Name == claimName {
claim = claim.DeepCopy()
claim.Finalizers = structuredAllocatedImmediateClaim.Finalizers
claim.Status = structuredInUseClaim.Status
}
return claim
},
},
},
postbind: result{
assumedClaim: reserve(structuredAllocatedImmediateClaim, podWithClaimName),
},
},
},
"delayed-allocation-structured-no-resources": {
pod: podWithClaimName,
claims: []*resourcev1alpha2.ResourceClaim{pendingDelayedClaim},
classes: []*resourcev1alpha2.ResourceClass{structuredResourceClass},
want: want{
filter: perNodeResult{
workerNode.Name: {
status: framework.NewStatus(framework.UnschedulableAndUnresolvable, `resourceclaim cannot be allocated for the node (unsuitable)`),
},
},
postfilter: result{
status: framework.NewStatus(framework.Unschedulable, `still not schedulable`),
},
},
},
"delayed-allocation-structured-with-resources": {
pod: podWithClaimName,
claims: []*resourcev1alpha2.ResourceClaim{pendingDelayedClaim},
classes: []*resourcev1alpha2.ResourceClass{structuredResourceClass},
objs: []apiruntime.Object{workerNodeSlice},
want: want{
reserve: result{
inFlightClaim: structuredAllocatedClaim,
},
prebind: result{
assumedClaim: reserve(structuredAllocatedClaim, podWithClaimName),
changes: change{
claim: func(claim *resourcev1alpha2.ResourceClaim) *resourcev1alpha2.ResourceClaim {
if claim.Name == claimName {
claim = claim.DeepCopy()
claim.Finalizers = structuredAllocatedClaim.Finalizers
claim.Status = structuredInUseClaim.Status
}
return claim
},
},
},
postbind: result{
assumedClaim: reserve(structuredAllocatedClaim, podWithClaimName),
},
},
},
"delayed-allocation-structured-skip-bind": {
pod: podWithClaimName,
claims: []*resourcev1alpha2.ResourceClaim{pendingDelayedClaim},
classes: []*resourcev1alpha2.ResourceClass{structuredResourceClass},
objs: []apiruntime.Object{workerNodeSlice},
want: want{
reserve: result{
inFlightClaim: structuredAllocatedClaim,
},
unreserveBeforePreBind: &result{},
},
},
"delayed-allocation-structured-exhausted-resources": {
pod: podWithClaimName,
claims: []*resourcev1alpha2.ResourceClaim{pendingDelayedClaim, otherStructuredAllocatedClaim},
classes: []*resourcev1alpha2.ResourceClass{structuredResourceClass},
objs: []apiruntime.Object{workerNodeSlice},
want: want{
filter: perNodeResult{
workerNode.Name: {
status: framework.NewStatus(framework.UnschedulableAndUnresolvable, `resourceclaim cannot be allocated for the node (unsuitable)`),
},
},
postfilter: result{
status: framework.NewStatus(framework.Unschedulable, `still not schedulable`),
},
},
},
"with-parameters": {
pod: podWithClaimName,
claims: []*resourcev1alpha2.ResourceClaim{pendingDelayedClaimWithParams},
classes: []*resourcev1alpha2.ResourceClass{structuredResourceClassWithParams},
objs: []apiruntime.Object{claimParameters, classParameters, workerNodeSlice},
want: want{
reserve: result{
inFlightClaim: structuredAllocatedClaimWithParams,
},
prebind: result{
assumedClaim: reserve(structuredAllocatedClaimWithParams, podWithClaimName),
changes: change{
claim: func(claim *resourcev1alpha2.ResourceClaim) *resourcev1alpha2.ResourceClaim {
if claim.Name == claimName {
claim = claim.DeepCopy()
claim.Finalizers = structuredAllocatedClaim.Finalizers
claim.Status = structuredInUseClaim.Status
}
return claim
},
},
},
postbind: result{
assumedClaim: reserve(structuredAllocatedClaimWithParams, podWithClaimName),
},
},
},
"with-translated-parameters": {
pod: podWithClaimName,
claims: []*resourcev1alpha2.ResourceClaim{claimWithCRD(pendingDelayedClaimWithParams)},
classes: []*resourcev1alpha2.ResourceClass{classWithCRD(structuredResourceClassWithCRD)},
objs: []apiruntime.Object{claimParameters, classParameters, workerNodeSlice},
want: want{
reserve: result{
inFlightClaim: claimWithCRD(structuredAllocatedClaimWithParams),
},
prebind: result{
assumedClaim: reserve(claimWithCRD(structuredAllocatedClaimWithParams), podWithClaimName),
changes: change{
claim: func(claim *resourcev1alpha2.ResourceClaim) *resourcev1alpha2.ResourceClaim {
if claim.Name == claimName {
claim = claim.DeepCopy()
claim.Finalizers = structuredAllocatedClaim.Finalizers
claim.Status = structuredInUseClaim.Status
}
return claim
},
},
},
postbind: result{
assumedClaim: reserve(claimWithCRD(structuredAllocatedClaimWithParams), podWithClaimName),
},
},
},
"missing-class-parameters": {
pod: podWithClaimName,
claims: []*resourcev1alpha2.ResourceClaim{pendingDelayedClaimWithParams},
classes: []*resourcev1alpha2.ResourceClass{structuredResourceClassWithParams},
objs: []apiruntime.Object{claimParameters /* classParameters, */, workerNodeSlice},
want: want{
prefilter: result{
status: framework.NewStatus(framework.UnschedulableAndUnresolvable, `class parameters default/my-resource-class not found`),
},
postfilter: result{
status: framework.NewStatus(framework.Unschedulable, `no new claims to deallocate`),
},
},
},
"missing-claim-parameters": {
pod: podWithClaimName,
claims: []*resourcev1alpha2.ResourceClaim{pendingDelayedClaimWithParams},
classes: []*resourcev1alpha2.ResourceClass{structuredResourceClassWithParams},
objs: []apiruntime.Object{ /* claimParameters, */ classParameters, workerNodeSlice},
want: want{
prefilter: result{
status: framework.NewStatus(framework.UnschedulableAndUnresolvable, `claim parameters default/my-pod-my-resource not found`),
},
postfilter: result{
status: framework.NewStatus(framework.Unschedulable, `no new claims to deallocate`),
},
},
},
"claim-parameters-CEL-runtime-error": {
pod: podWithClaimName,
claims: []*resourcev1alpha2.ResourceClaim{pendingDelayedClaimWithParams},
classes: []*resourcev1alpha2.ResourceClass{structuredResourceClassWithParams},
objs: []apiruntime.Object{breakCELInClaimParameters(claimParameters), classParameters, workerNodeSlice},
want: want{
filter: perNodeResult{
workerNode.Name: {
status: framework.NewStatus(framework.UnschedulableAndUnresolvable, `checking structured parameters failed: checking node "worker" and resources of driver "some-driver": evaluate request CEL expression: no such key: no-such-attribute`),
},
},
postfilter: result{
status: framework.NewStatus(framework.Unschedulable, `still not schedulable`),
},
},
},
"class-parameters-CEL-runtime-error": {
pod: podWithClaimName,
claims: []*resourcev1alpha2.ResourceClaim{pendingDelayedClaimWithParams},
classes: []*resourcev1alpha2.ResourceClass{structuredResourceClassWithParams},
objs: []apiruntime.Object{claimParameters, breakCELInClassParameters(classParameters), workerNodeSlice},
want: want{
filter: perNodeResult{
workerNode.Name: {
status: framework.NewStatus(framework.UnschedulableAndUnresolvable, `checking structured parameters failed: checking node "worker" and resources of driver "some-driver": evaluate filter CEL expression: no such key: no-such-attribute`),
},
},
postfilter: result{
status: framework.NewStatus(framework.Unschedulable, `still not schedulable`),
},
},
},
"waiting-for-deallocation": { "waiting-for-deallocation": {
pod: podWithClaimName, pod: podWithClaimName,
claims: []*resourcev1alpha2.ResourceClaim{deallocatingClaim}, claims: []*resourcev1alpha2.ResourceClaim{deallocatingClaim},
@ -496,6 +893,49 @@ func TestPlugin(t *testing.T) {
}, },
}, },
}, },
"wrong-topology-delayed-allocation-structured": {
// PostFilter tries to get the pod schedulable by
// deallocating the claim.
pod: podWithClaimName,
claims: []*resourcev1alpha2.ResourceClaim{structuredAllocatedDelayedClaimWithWrongTopology},
want: want{
filter: perNodeResult{
workerNode.Name: {
status: framework.NewStatus(framework.UnschedulableAndUnresolvable, `resourceclaim not available on the node`),
},
},
postfilter: result{
// Claims with delayed allocation and structured parameters get deallocated immediately.
changes: change{
claim: func(in *resourcev1alpha2.ResourceClaim) *resourcev1alpha2.ResourceClaim {
return st.FromResourceClaim(in).
Allocation("", nil).
Obj()
},
},
status: framework.NewStatus(framework.Unschedulable, `deallocation of ResourceClaim completed`),
},
},
},
"wrong-topology-immediate-allocation-structured": {
// PostFilter tries to get the pod schedulable by
// deallocating the claim.
pod: podWithClaimName,
claims: []*resourcev1alpha2.ResourceClaim{structuredAllocatedImmediateClaimWithWrongTopology},
want: want{
filter: perNodeResult{
workerNode.Name: {
status: framework.NewStatus(framework.UnschedulableAndUnresolvable, `resourceclaim not available on the node`),
},
},
postfilter: result{
// Claims with immediate allocation don't. The allocation is considered
// more important than the pod and pods need to wait for the node to
// become available again.
status: framework.NewStatus(framework.Unschedulable, `still not schedulable`),
},
},
},
"good-topology": { "good-topology": {
pod: podWithClaimName, pod: podWithClaimName,
claims: []*resourcev1alpha2.ResourceClaim{allocatedClaimWithGoodTopology}, claims: []*resourcev1alpha2.ResourceClaim{allocatedClaimWithGoodTopology},
@ -535,6 +975,30 @@ func TestPlugin(t *testing.T) {
}, },
}, },
}, },
"bind-failure-structured": {
pod: podWithClaimName,
claims: []*resourcev1alpha2.ResourceClaim{structuredAllocatedClaimWithGoodTopology},
want: want{
prebind: result{
changes: change{
claim: func(in *resourcev1alpha2.ResourceClaim) *resourcev1alpha2.ResourceClaim {
return st.FromResourceClaim(in).
ReservedFor(resourcev1alpha2.ResourceClaimConsumerReference{Resource: "pods", Name: podName, UID: types.UID(podUID)}).
Obj()
},
},
},
unreserveAfterBindFailure: &result{
changes: change{
claim: func(in *resourcev1alpha2.ResourceClaim) *resourcev1alpha2.ResourceClaim {
out := in.DeepCopy()
out.Status.ReservedFor = []resourcev1alpha2.ResourceClaimConsumerReference{}
return out
},
},
},
},
},
"reserved-okay": { "reserved-okay": {
pod: podWithClaimName, pod: podWithClaimName,
claims: []*resourcev1alpha2.ResourceClaim{inUseClaim}, claims: []*resourcev1alpha2.ResourceClaim{inUseClaim},
@ -560,7 +1024,7 @@ func TestPlugin(t *testing.T) {
if nodes == nil { if nodes == nil {
nodes = []*v1.Node{workerNode} nodes = []*v1.Node{workerNode}
} }
testCtx := setup(t, nodes, tc.claims, tc.classes, tc.schedulings) testCtx := setup(t, nodes, tc.claims, tc.classes, tc.schedulings, tc.objs)
testCtx.p.enabled = !tc.disable testCtx.p.enabled = !tc.disable
initialObjects := testCtx.listAll(t) initialObjects := testCtx.listAll(t)
@@ -638,6 +1102,15 @@
testCtx.verify(t, tc.want.unreserve, initialObjects, nil, status)
})
} else {
if tc.want.unreserveBeforePreBind != nil {
initialObjects = testCtx.listAll(t)
testCtx.p.Unreserve(testCtx.ctx, testCtx.state, tc.pod, selectedNode.Node().Name)
t.Run("unreserveBeforePreBind", func(t *testing.T) {
testCtx.verify(t, *tc.want.unreserveBeforePreBind, initialObjects, nil, status)
})
return
}
initialObjects = testCtx.listAll(t)
initialObjects = testCtx.updateAPIServer(t, initialObjects, tc.prepare.prebind)
status := testCtx.p.PreBind(testCtx.ctx, testCtx.state, tc.pod, selectedNode.Node().Name)
@@ -701,7 +1174,24 @@ func (tc *testContext) verify(t *testing.T, expected result, initialObjects []me
sortObjects(wantObjects)
stripObjects(wantObjects)
stripObjects(objects)
-assert.Equal(t, wantObjects, objects)
+// Sometimes assert strips the diff too much, let's do it ourselves...
if diff := cmp.Diff(wantObjects, objects); diff != "" {
t.Errorf("Stored objects are different (- expected, + actual):\n%s", diff)
}
var expectAssumedClaims []metav1.Object
if expected.assumedClaim != nil {
expectAssumedClaims = append(expectAssumedClaims, expected.assumedClaim)
}
actualAssumedClaims := tc.listAssumedClaims()
assert.Equal(t, expectAssumedClaims, actualAssumedClaims, "assumed claims")
var expectInFlightClaims []metav1.Object
if expected.inFlightClaim != nil {
expectInFlightClaims = append(expectInFlightClaims, expected.inFlightClaim)
}
actualInFlightClaims := tc.listInFlightClaims()
assert.Equal(t, expectInFlightClaims, actualInFlightClaims, "in-flight claims")
}

// setGVK is implemented by metav1.TypeMeta and thus all API objects, in
@@ -741,6 +1231,32 @@ func (tc *testContext) listAll(t *testing.T) (objects []metav1.Object) {
return
}
func (tc *testContext) listAssumedClaims() []metav1.Object {
var assumedClaims []metav1.Object
for _, obj := range tc.p.claimAssumeCache.List(nil) {
claim := obj.(*resourcev1alpha2.ResourceClaim)
obj, _ := tc.p.claimAssumeCache.Get(claim.Namespace + "/" + claim.Name)
apiObj, _ := tc.p.claimAssumeCache.GetAPIObj(claim.Namespace + "/" + claim.Name)
if obj != apiObj {
assumedClaims = append(assumedClaims, claim)
}
}
sortObjects(assumedClaims)
stripObjects(assumedClaims)
return assumedClaims
}
func (tc *testContext) listInFlightClaims() []metav1.Object {
var inFlightClaims []metav1.Object
tc.p.inFlightAllocations.Range(func(key, value any) bool {
inFlightClaims = append(inFlightClaims, value.(*resourcev1alpha2.ResourceClaim))
return true
})
sortObjects(inFlightClaims)
stripObjects(inFlightClaims)
return inFlightClaims
}
// updateAPIServer modifies objects and stores any changed object in the API server.
func (tc *testContext) updateAPIServer(t *testing.T, objects []metav1.Object, updates change) []metav1.Object {
modified := update(t, objects, updates)
@@ -801,30 +1317,36 @@ func update(t *testing.T, objects []metav1.Object, updates change) []metav1.Obje
return updated
}

-func setup(t *testing.T, nodes []*v1.Node, claims []*resourcev1alpha2.ResourceClaim, classes []*resourcev1alpha2.ResourceClass, schedulings []*resourcev1alpha2.PodSchedulingContext) (result *testContext) {
+func setup(t *testing.T, nodes []*v1.Node, claims []*resourcev1alpha2.ResourceClaim, classes []*resourcev1alpha2.ResourceClass, schedulings []*resourcev1alpha2.PodSchedulingContext, objs []apiruntime.Object) (result *testContext) {
t.Helper()
tc := &testContext{}

-_, ctx := ktesting.NewTestContext(t)
-ctx, cancel := context.WithCancel(ctx)
-t.Cleanup(cancel)
-tc.ctx = ctx
+tCtx := ktesting.Init(t)
+tc.ctx = tCtx

-tc.client = fake.NewSimpleClientset()
+tc.client = fake.NewSimpleClientset(objs...)
reactor := createReactor(tc.client.Tracker())
tc.client.PrependReactor("*", "*", reactor)
// Quick-and-dirty workaround for fake client storing ResourceClassParameters and
// ResourceClaimParameters as "resourceclassparameterses" and "resourceclaimparameterses":
// intercept the correct LIST from the informers and reply to them with the incorrect
// LIST result.
tc.client.PrependReactor("list", "resourceclaimparameters", createListReactor(tc.client.Tracker(), "ResourceClaimParameters"))
tc.client.PrependReactor("list", "resourceclassparameters", createListReactor(tc.client.Tracker(), "ResourceClassParameters"))
tc.informerFactory = informers.NewSharedInformerFactory(tc.client, 0)

opts := []runtime.Option{
runtime.WithClientSet(tc.client),
runtime.WithInformerFactory(tc.informerFactory),
}
-fh, err := runtime.NewFramework(ctx, nil, nil, opts...)
+fh, err := runtime.NewFramework(tCtx, nil, nil, opts...)
if err != nil {
t.Fatal(err)
}
-pl, err := New(ctx, nil, fh, feature.Features{EnableDynamicResourceAllocation: true})
+pl, err := New(tCtx, nil, fh, feature.Features{EnableDynamicResourceAllocation: true})
if err != nil {
t.Fatal(err)
}
@@ -848,7 +1370,7 @@ func setup(t *testing.T, nodes []*v1.Node, claims []*resourcev1alpha2.ResourceCl
tc.informerFactory.Start(tc.ctx.Done())
t.Cleanup(func() {
// Need to cancel before waiting for the shutdown.
-cancel()
+tCtx.Cancel("test is done")
// Now we can wait for all goroutines to stop.
tc.informerFactory.Shutdown()
})
@@ -896,7 +1418,7 @@ func createReactor(tracker cgotesting.ObjectTracker) func(action cgotesting.Acti
}
obj.SetUID(types.UID(fmt.Sprintf("UID-%d", uidCounter)))
uidCounter++
-obj.SetResourceVersion(fmt.Sprintf("REV-%d", resourceVersionCounter))
+obj.SetResourceVersion(fmt.Sprintf("%d", resourceVersionCounter))
resourceVersionCounter++
case "update":
uid := obj.GetUID()
@@ -920,13 +1442,24 @@ func createReactor(tracker cgotesting.ObjectTracker) func(action cgotesting.Acti
return true, nil, errors.New("ResourceVersion must match the object that gets updated")
}

-obj.SetResourceVersion(fmt.Sprintf("REV-%d", resourceVersionCounter))
+obj.SetResourceVersion(fmt.Sprintf("%d", resourceVersionCounter))
resourceVersionCounter++
}

return false, nil, nil
}
}
func createListReactor(tracker cgotesting.ObjectTracker, kind string) func(action cgotesting.Action) (handled bool, ret apiruntime.Object, err error) {
return func(action cgotesting.Action) (handled bool, ret apiruntime.Object, err error) {
// listAction := action.(cgotesting.ListAction)
gvr := action.GetResource()
ns := action.GetNamespace()
gvr.Resource += "es"
list, err := tracker.List(gvr, schema.GroupVersionKind{Group: gvr.Group, Version: gvr.Version, Kind: kind}, ns)
return true, list, err
}
}
func Test_isSchedulableAfterClaimChange(t *testing.T) {
testcases := map[string]struct {
pod *v1.Pod
@@ -999,12 +1532,30 @@ func Test_isSchedulableAfterClaimChange(t *testing.T) {
}(),
expectedHint: framework.Queue,
},
"structured-claim-deallocate": {
pod: podWithClaimName,
claims: []*resourcev1alpha2.ResourceClaim{pendingDelayedClaim},
oldObj: func() *resourcev1alpha2.ResourceClaim {
claim := structuredAllocatedClaim.DeepCopy()
claim.Name += "-other"
return claim
}(),
newObj: func() *resourcev1alpha2.ResourceClaim {
claim := structuredAllocatedClaim.DeepCopy()
claim.Name += "-other"
claim.Status.Allocation = nil
return claim
}(),
// TODO (https://github.com/kubernetes/kubernetes/issues/123697): don't wake up
// claims not using structured parameters.
expectedHint: framework.Queue,
},
}

for name, tc := range testcases {
t.Run(name, func(t *testing.T) {
logger, _ := ktesting.NewTestContext(t)
-testCtx := setup(t, nil, tc.claims, nil, nil)
+testCtx := setup(t, nil, tc.claims, nil, nil, nil)
if claim, ok := tc.newObj.(*resourcev1alpha2.ResourceClaim); ok {
// Update the informer because the lister gets called and must have the claim.
store := testCtx.informerFactory.Resource().V1alpha2().ResourceClaims().Informer().GetStore()
@@ -1142,7 +1693,7 @@ func Test_isSchedulableAfterPodSchedulingContextChange(t *testing.T) {
t.Run(name, func(t *testing.T) {
t.Parallel()
logger, _ := ktesting.NewTestContext(t)
-testCtx := setup(t, nil, tc.claims, nil, tc.schedulings)
+testCtx := setup(t, nil, tc.claims, nil, tc.schedulings, nil)
actualHint, err := testCtx.p.isSchedulableAfterPodSchedulingContextChange(logger, tc.pod, tc.oldObj, tc.newObj)
if tc.expectedErr {
require.Error(t, err)


@@ -0,0 +1,329 @@
/*
Copyright 2024 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package namedresources
import (
"testing"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
resourceapi "k8s.io/api/resource/v1alpha2"
"k8s.io/kubernetes/test/utils/ktesting"
"k8s.io/utils/ptr"
)
func instance(allocated bool, name string, attributes ...resourceapi.NamedResourcesAttribute) InstanceAllocation {
return InstanceAllocation{
Allocated: allocated,
Instance: &resourceapi.NamedResourcesInstance{
Name: name,
Attributes: attributes,
},
}
}
func TestModel(t *testing.T) {
testcases := map[string]struct {
resources []*resourceapi.NamedResourcesResources
allocations []*resourceapi.NamedResourcesAllocationResult
expectModel Model
}{
"empty": {},
"nil": {
resources: []*resourceapi.NamedResourcesResources{nil},
allocations: []*resourceapi.NamedResourcesAllocationResult{nil},
},
"available": {
resources: []*resourceapi.NamedResourcesResources{
{
Instances: []resourceapi.NamedResourcesInstance{
{Name: "a"},
{Name: "b"},
},
},
{
Instances: []resourceapi.NamedResourcesInstance{
{Name: "x"},
{Name: "y"},
},
},
},
expectModel: Model{Instances: []InstanceAllocation{instance(false, "a"), instance(false, "b"), instance(false, "x"), instance(false, "y")}},
},
"allocated": {
resources: []*resourceapi.NamedResourcesResources{
{
Instances: []resourceapi.NamedResourcesInstance{
{Name: "a"},
{Name: "b"},
},
},
{
Instances: []resourceapi.NamedResourcesInstance{
{Name: "x"},
{Name: "y"},
},
},
},
allocations: []*resourceapi.NamedResourcesAllocationResult{
{
Name: "something-else",
},
{
Name: "a",
},
},
expectModel: Model{Instances: []InstanceAllocation{instance(true, "a"), instance(false, "b"), instance(false, "x"), instance(false, "y")}},
},
}
for name, tc := range testcases {
t.Run(name, func(t *testing.T) {
var actualModel Model
for _, resources := range tc.resources {
AddResources(&actualModel, resources)
}
for _, allocation := range tc.allocations {
AddAllocation(&actualModel, allocation)
}
require.Equal(t, tc.expectModel, actualModel)
})
}
}
func TestController(t *testing.T) {
filterAny := &resourceapi.NamedResourcesFilter{
Selector: "true",
}
filterNone := &resourceapi.NamedResourcesFilter{
Selector: "false",
}
filterBrokenType := &resourceapi.NamedResourcesFilter{
Selector: "1",
}
filterBrokenEvaluation := &resourceapi.NamedResourcesFilter{
Selector: `attributes.bool["no-such-attribute"]`,
}
filterAttribute := &resourceapi.NamedResourcesFilter{
Selector: `attributes.bool["usable"]`,
}
requestAny := &resourceapi.NamedResourcesRequest{
Selector: "true",
}
requestNone := &resourceapi.NamedResourcesRequest{
Selector: "false",
}
requestBrokenType := &resourceapi.NamedResourcesRequest{
Selector: "1",
}
requestBrokenEvaluation := &resourceapi.NamedResourcesRequest{
Selector: `attributes.bool["no-such-attribute"]`,
}
requestAttribute := &resourceapi.NamedResourcesRequest{
Selector: `attributes.bool["usable"]`,
}
instance1 := "instance-1"
oneInstance := Model{
Instances: []InstanceAllocation{{
Instance: &resourceapi.NamedResourcesInstance{
Name: instance1,
},
}},
}
instance2 := "instance-2"
twoInstances := Model{
Instances: []InstanceAllocation{
{
Instance: &resourceapi.NamedResourcesInstance{
Name: instance1,
Attributes: []resourceapi.NamedResourcesAttribute{{
Name: "usable",
NamedResourcesAttributeValue: resourceapi.NamedResourcesAttributeValue{
BoolValue: ptr.To(false),
},
}},
},
},
{
Instance: &resourceapi.NamedResourcesInstance{
Name: instance2,
Attributes: []resourceapi.NamedResourcesAttribute{{
Name: "usable",
NamedResourcesAttributeValue: resourceapi.NamedResourcesAttributeValue{
BoolValue: ptr.To(true),
},
}},
},
},
},
}
testcases := map[string]struct {
model Model
filter *resourceapi.NamedResourcesFilter
requests []*resourceapi.NamedResourcesRequest
expectCreateErr string
expectAllocation []string
expectAllocateErr string
}{
"empty": {},
"broken-filter": {
filter: filterBrokenType,
expectCreateErr: "compile class filter CEL expression: must evaluate to bool",
},
"broken-request": {
requests: []*resourceapi.NamedResourcesRequest{requestBrokenType},
expectCreateErr: "compile request CEL expression: must evaluate to bool",
},
"no-resources": {
filter: filterAny,
requests: []*resourceapi.NamedResourcesRequest{requestAny},
expectAllocateErr: "insufficient resources",
},
"okay": {
model: oneInstance,
filter: filterAny,
requests: []*resourceapi.NamedResourcesRequest{requestAny},
expectAllocation: []string{instance1},
},
"filter-mismatch": {
model: oneInstance,
filter: filterNone,
requests: []*resourceapi.NamedResourcesRequest{requestAny},
expectAllocateErr: "insufficient resources",
},
"request-mismatch": {
model: oneInstance,
filter: filterAny,
requests: []*resourceapi.NamedResourcesRequest{requestNone},
expectAllocateErr: "insufficient resources",
},
"many": {
model: twoInstances,
filter: filterAny,
requests: []*resourceapi.NamedResourcesRequest{requestAny, requestAny},
expectAllocation: []string{instance1, instance2},
},
"too-many": {
model: oneInstance,
filter: filterAny,
requests: []*resourceapi.NamedResourcesRequest{requestAny, requestAny},
expectAllocateErr: "insufficient resources",
},
"filter-evaluation-error": {
model: oneInstance,
filter: filterBrokenEvaluation,
requests: []*resourceapi.NamedResourcesRequest{requestAny},
expectAllocateErr: "evaluate filter CEL expression: no such key: no-such-attribute",
},
"request-evaluation-error": {
model: oneInstance,
filter: filterAny,
requests: []*resourceapi.NamedResourcesRequest{requestBrokenEvaluation},
expectAllocateErr: "evaluate request CEL expression: no such key: no-such-attribute",
},
"filter-attribute": {
model: twoInstances,
filter: filterAttribute,
requests: []*resourceapi.NamedResourcesRequest{requestAny},
expectAllocation: []string{instance2},
},
"request-attribute": {
model: twoInstances,
filter: filterAny,
requests: []*resourceapi.NamedResourcesRequest{requestAttribute},
expectAllocation: []string{instance2},
},
}
for name, tc := range testcases {
t.Run(name, func(t *testing.T) {
tCtx := ktesting.Init(t)
controller, createErr := NewClaimController(tc.filter, tc.requests)
if createErr != nil {
if tc.expectCreateErr == "" {
tCtx.Fatalf("unexpected create error: %v", createErr)
}
require.Equal(tCtx, tc.expectCreateErr, createErr.Error())
return
}
if tc.expectCreateErr != "" {
tCtx.Fatalf("did not get expected create error: %v", tc.expectCreateErr)
}
allocation, createErr := controller.Allocate(tCtx, tc.model)
if createErr != nil {
if tc.expectAllocateErr == "" {
tCtx.Fatalf("unexpected allocate error: %v", createErr)
}
require.Equal(tCtx, tc.expectAllocateErr, createErr.Error())
return
}
if tc.expectAllocateErr != "" {
tCtx.Fatalf("did not get expected allocate error: %v", tc.expectAllocateErr)
}
expectAllocation := []*resourceapi.NamedResourcesAllocationResult{}
for _, name := range tc.expectAllocation {
expectAllocation = append(expectAllocation, &resourceapi.NamedResourcesAllocationResult{Name: name})
}
require.Equal(tCtx, expectAllocation, allocation)
isSuitable, isSuitableErr := controller.NodeIsSuitable(tCtx, tc.model)
assert.Equal(tCtx, len(expectAllocation) == len(tc.requests), isSuitable, "is suitable")
assert.Equal(tCtx, createErr, isSuitableErr)
})
}
}
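Read together, the controller API exercised above is small; a sketch of the flow reusing this test's own fixtures:

controller, err := NewClaimController(filterAny, []*resourceapi.NamedResourcesRequest{requestAny})
// err here can only be a CEL compile error (the expectCreateErr cases)
ok, _ := controller.NodeIsSuitable(tCtx, oneInstance) // true when every request can be satisfied
results, _ := controller.Allocate(tCtx, oneInstance) // one NamedResourcesAllocationResult per request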


@@ -25,10 +25,8 @@ import (
resourcev1alpha2 "k8s.io/api/resource/v1alpha2"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime"
-resourcev1alpha2listers "k8s.io/client-go/listers/resource/v1alpha2"
"k8s.io/klog/v2"
namedresourcesmodel "k8s.io/kubernetes/pkg/scheduler/framework/plugins/dynamicresources/structured/namedresources"
-"k8s.io/kubernetes/pkg/scheduler/framework/plugins/volumebinding"
)

// resources is a map "node name" -> "driver name" -> available and
@@ -41,11 +39,22 @@ type ResourceModels struct {
NamedResources namedresourcesmodel.Model
}
// resourceSliceLister is the subset of resourcev1alpha2listers.ResourceSliceLister needed by
// newResourceModel.
type resourceSliceLister interface {
List(selector labels.Selector) (ret []*resourcev1alpha2.ResourceSlice, err error)
}
// assumeCacheLister is the subset of volumebinding.AssumeCache needed by newResourceModel.
type assumeCacheLister interface {
List(indexObj interface{}) []interface{}
}
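These narrowed interfaces are what let newResourceModel be driven by test doubles instead of a real informer-backed lister and assume cache. A minimal sketch of conforming fakes (hypothetical names, not part of this commit):

type fakeSliceLister struct{ slices []*resourcev1alpha2.ResourceSlice }

func (f fakeSliceLister) List(labels.Selector) ([]*resourcev1alpha2.ResourceSlice, error) { return f.slices, nil }

type fakeAssumeCache struct{ objs []interface{} }

func (f fakeAssumeCache) List(interface{}) []interface{} { return f.objs }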
// newResourceModel parses the available information about resources. Objects
// with an unknown structured parameter model are silently ignored. An error gets
// logged later when parameters required for a pod depend on such an unknown
// model.
-func newResourceModel(logger klog.Logger, resourceSliceLister resourcev1alpha2listers.ResourceSliceLister, claimAssumeCache volumebinding.AssumeCache, inFlightAllocations *sync.Map) (resources, error) {
+func newResourceModel(logger klog.Logger, resourceSliceLister resourceSliceLister, claimAssumeCache assumeCacheLister, inFlightAllocations *sync.Map) (resources, error) {
model := make(resources)

slices, err := resourceSliceLister.List(labels.Everything())

File diff suppressed because it is too large.


@@ -938,6 +938,16 @@ func (wrapper *ResourceClaimWrapper) AllocationMode(a resourcev1alpha2.Allocatio
return wrapper
}
// ParametersRef sets a reference to a ResourceClaimParameters.resource.k8s.io.
func (wrapper *ResourceClaimWrapper) ParametersRef(name string) *ResourceClaimWrapper {
wrapper.ResourceClaim.Spec.ParametersRef = &resourcev1alpha2.ResourceClaimParametersReference{
Name: name,
Kind: "ResourceClaimParameters",
APIGroup: "resource.k8s.io",
}
return wrapper
}
// ResourceClassName sets the resource class name of the inner object.
func (wrapper *ResourceClaimWrapper) ResourceClassName(name string) *ResourceClaimWrapper {
wrapper.ResourceClaim.Spec.ResourceClassName = name
@@ -945,11 +955,60 @@ func (wrapper *ResourceClaimWrapper) ResourceClassName(name string) *ResourceCla
}

// Allocation sets the allocation of the inner object.
-func (wrapper *ResourceClaimWrapper) Allocation(allocation *resourcev1alpha2.AllocationResult) *ResourceClaimWrapper {
+func (wrapper *ResourceClaimWrapper) Allocation(driverName string, allocation *resourcev1alpha2.AllocationResult) *ResourceClaimWrapper {
wrapper.ResourceClaim.Status.DriverName = driverName
wrapper.ResourceClaim.Status.Allocation = allocation
return wrapper
}
// Structured turns a "normal" claim into one which was allocated via structured parameters.
// This modifies the allocation result and adds the reserved finalizer if the claim
// is allocated. The claim has to become local to a node. The assumption is that
// "named resources" are used.
func (wrapper *ResourceClaimWrapper) Structured(nodeName string, namedResourcesInstances ...string) *ResourceClaimWrapper {
if wrapper.ResourceClaim.Status.Allocation != nil {
wrapper.ResourceClaim.Finalizers = append(wrapper.ResourceClaim.Finalizers, resourcev1alpha2.Finalizer)
for i, resourceHandle := range wrapper.ResourceClaim.Status.Allocation.ResourceHandles {
resourceHandle.Data = ""
resourceHandle.StructuredData = &resourcev1alpha2.StructuredResourceHandle{
NodeName: nodeName,
}
wrapper.ResourceClaim.Status.Allocation.ResourceHandles[i] = resourceHandle
}
if len(wrapper.ResourceClaim.Status.Allocation.ResourceHandles) == 0 {
wrapper.ResourceClaim.Status.Allocation.ResourceHandles = []resourcev1alpha2.ResourceHandle{{
DriverName: wrapper.ResourceClaim.Status.DriverName,
StructuredData: &resourcev1alpha2.StructuredResourceHandle{
NodeName: nodeName,
},
}}
}
for _, resourceHandle := range wrapper.ResourceClaim.Status.Allocation.ResourceHandles {
for _, name := range namedResourcesInstances {
result := resourcev1alpha2.DriverAllocationResult{
AllocationResultModel: resourcev1alpha2.AllocationResultModel{
NamedResources: &resourcev1alpha2.NamedResourcesAllocationResult{
Name: name,
},
},
}
resourceHandle.StructuredData.Results = append(resourceHandle.StructuredData.Results, result)
}
}
wrapper.ResourceClaim.Status.Allocation.Shareable = true
wrapper.ResourceClaim.Status.Allocation.AvailableOnNodes = &v1.NodeSelector{
NodeSelectorTerms: []v1.NodeSelectorTerm{{
MatchExpressions: []v1.NodeSelectorRequirement{{
Key: "kubernetes.io/hostname",
Operator: v1.NodeSelectorOpIn,
Values: []string{nodeName},
}},
}},
}
}
return wrapper
}
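With the new Allocation signature and the Structured helper, the wrappers compose the way the fixtures in the scheduler plugin test do; for example (values taken from those fixtures):

claim := st.MakeResourceClaim().
Name("my-claim").
Allocation("some-driver", &resourcev1alpha2.AllocationResult{}).
Structured("worker", "instance-1").
Obj()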
// DeallocationRequested sets that field of the inner object.
func (wrapper *ResourceClaimWrapper) DeallocationRequested(deallocationRequested bool) *ResourceClaimWrapper {
wrapper.ResourceClaim.Status.DeallocationRequested = deallocationRequested
@@ -962,6 +1021,11 @@ func (wrapper *ResourceClaimWrapper) ReservedFor(consumers ...resourcev1alpha2.R
return wrapper
}
// ReservedForPod sets the ReservedFor field of the inner object given information about one pod.
func (wrapper *ResourceClaimWrapper) ReservedForPod(podName string, podUID types.UID) *ResourceClaimWrapper {
return wrapper.ReservedFor(resourcev1alpha2.ResourceClaimConsumerReference{Resource: "pods", Name: podName, UID: podUID})
}
// PodSchedulingWrapper wraps a PodSchedulingContext inside.
type PodSchedulingWrapper struct {
resourcev1alpha2.PodSchedulingContext
@@ -1041,3 +1105,131 @@ func (wrapper *PodSchedulingWrapper) ResourceClaims(statuses ...resourcev1alpha2
wrapper.Status.ResourceClaims = statuses
return wrapper
}
type ResourceSliceWrapper struct {
resourcev1alpha2.ResourceSlice
}
func MakeResourceSlice(nodeName, driverName string) *ResourceSliceWrapper {
wrapper := new(ResourceSliceWrapper)
wrapper.Name = nodeName + "-" + driverName
wrapper.NodeName = nodeName
wrapper.DriverName = driverName
return wrapper
}
func (wrapper *ResourceSliceWrapper) Obj() *resourcev1alpha2.ResourceSlice {
return &wrapper.ResourceSlice
}
func (wrapper *ResourceSliceWrapper) NamedResourcesInstances(names ...string) *ResourceSliceWrapper {
wrapper.ResourceModel = resourcev1alpha2.ResourceModel{NamedResources: &resourcev1alpha2.NamedResourcesResources{}}
for _, name := range names {
wrapper.ResourceModel.NamedResources.Instances = append(wrapper.ResourceModel.NamedResources.Instances,
resourcev1alpha2.NamedResourcesInstance{Name: name},
)
}
return wrapper
}
type ClaimParametersWrapper struct {
resourcev1alpha2.ResourceClaimParameters
}
func MakeClaimParameters() *ClaimParametersWrapper {
return &ClaimParametersWrapper{}
}
func (wrapper *ClaimParametersWrapper) Obj() *resourcev1alpha2.ResourceClaimParameters {
return &wrapper.ResourceClaimParameters
}
func (wrapper *ClaimParametersWrapper) Name(s string) *ClaimParametersWrapper {
wrapper.SetName(s)
return wrapper
}
func (wrapper *ClaimParametersWrapper) UID(s string) *ClaimParametersWrapper {
wrapper.SetUID(types.UID(s))
return wrapper
}
func (wrapper *ClaimParametersWrapper) Namespace(s string) *ClaimParametersWrapper {
wrapper.SetNamespace(s)
return wrapper
}
func (wrapper *ClaimParametersWrapper) Shareable(value bool) *ClaimParametersWrapper {
wrapper.ResourceClaimParameters.Shareable = value
return wrapper
}
func (wrapper *ClaimParametersWrapper) GeneratedFrom(value *resourcev1alpha2.ResourceClaimParametersReference) *ClaimParametersWrapper {
wrapper.ResourceClaimParameters.GeneratedFrom = value
return wrapper
}
func (wrapper *ClaimParametersWrapper) NamedResourcesRequests(driverName string, selectors ...string) *ClaimParametersWrapper {
requests := resourcev1alpha2.DriverRequests{
DriverName: driverName,
}
for _, selector := range selectors {
request := resourcev1alpha2.ResourceRequest{
ResourceRequestModel: resourcev1alpha2.ResourceRequestModel{
NamedResources: &resourcev1alpha2.NamedResourcesRequest{
Selector: selector,
},
},
}
requests.Requests = append(requests.Requests, request)
}
wrapper.DriverRequests = append(wrapper.DriverRequests, requests)
return wrapper
}
type ClassParametersWrapper struct {
resourcev1alpha2.ResourceClassParameters
}
func MakeClassParameters() *ClassParametersWrapper {
return &ClassParametersWrapper{}
}
func (wrapper *ClassParametersWrapper) Obj() *resourcev1alpha2.ResourceClassParameters {
return &wrapper.ResourceClassParameters
}
func (wrapper *ClassParametersWrapper) Name(s string) *ClassParametersWrapper {
wrapper.SetName(s)
return wrapper
}
func (wrapper *ClassParametersWrapper) UID(s string) *ClassParametersWrapper {
wrapper.SetUID(types.UID(s))
return wrapper
}
func (wrapper *ClassParametersWrapper) Namespace(s string) *ClassParametersWrapper {
wrapper.SetNamespace(s)
return wrapper
}
func (wrapper *ClassParametersWrapper) GeneratedFrom(value *resourcev1alpha2.ResourceClassParametersReference) *ClassParametersWrapper {
wrapper.ResourceClassParameters.GeneratedFrom = value
return wrapper
}
func (wrapper *ClassParametersWrapper) NamedResourcesFilters(driverName string, selectors ...string) *ClassParametersWrapper {
for _, selector := range selectors {
filter := resourcev1alpha2.ResourceFilter{
DriverName: driverName,
ResourceFilterModel: resourcev1alpha2.ResourceFilterModel{
NamedResources: &resourcev1alpha2.NamedResourcesFilter{
Selector: selector,
},
},
}
wrapper.Filters = append(wrapper.Filters, filter)
}
return wrapper
}