Merge pull request #123938 from pohly/dra-structured-parameters-tests

DRA: test for structured parameters
Kubernetes Prow Robot 2024-04-18 02:10:08 -07:00 committed by GitHub
commit d2ce87eb94
10 changed files with 2623 additions and 59 deletions
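For orientation, a minimal sketch (not part of this PR's diff) of the claim shape these tests revolve around: a ResourceClaim that was allocated via structured parameters carries the in-tree finalizer and a StructuredResourceHandle instead of opaque driver data. The names "some-driver", "worker" and "instance-1" are the same placeholders the tests below use; "my-claim" is made up here.

package main

import (
	"fmt"

	resourcev1alpha2 "k8s.io/api/resource/v1alpha2"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

func main() {
	claim := &resourcev1alpha2.ResourceClaim{
		ObjectMeta: metav1.ObjectMeta{
			Name:      "my-claim",
			Namespace: "default",
			// The scheduler adds the in-tree finalizer when it allocates a
			// claim itself; the claim controller treats that finalizer as
			// "allocated via structured parameters".
			Finalizers: []string{resourcev1alpha2.Finalizer},
		},
		Status: resourcev1alpha2.ResourceClaimStatus{
			DriverName: "some-driver",
			Allocation: &resourcev1alpha2.AllocationResult{
				ResourceHandles: []resourcev1alpha2.ResourceHandle{{
					DriverName: "some-driver",
					// StructuredData instead of an opaque Data string is what
					// distinguishes a structured allocation.
					StructuredData: &resourcev1alpha2.StructuredResourceHandle{
						NodeName: "worker",
						Results: []resourcev1alpha2.DriverAllocationResult{{
							AllocationResultModel: resourcev1alpha2.AllocationResultModel{
								NamedResources: &resourcev1alpha2.NamedResourcesAllocationResult{Name: "instance-1"},
							},
						}},
					},
				}},
			},
		},
	}
	fmt.Printf("claim %s allocated on node %s\n",
		claim.Name, claim.Status.Allocation.ResourceHandles[0].StructuredData.NodeName)
}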


@ -293,6 +293,31 @@ func TestSyncHandler(t *testing.T) {
}(),
expectedMetrics: expectedMetrics{0, 0},
},
{
name: "clear-reserved-delayed-allocation-structured",
pods: []*v1.Pod{},
key: claimKey(testClaimReserved),
claims: []*resourcev1alpha2.ResourceClaim{structuredParameters(testClaimReserved)},
expectedClaims: func() []resourcev1alpha2.ResourceClaim {
claim := testClaimAllocated.DeepCopy()
claim.Finalizers = []string{}
claim.Status.Allocation = nil
return []resourcev1alpha2.ResourceClaim{*claim}
}(),
expectedMetrics: expectedMetrics{0, 0},
},
{
name: "dont-clear-reserved-delayed-allocation-structured",
pods: []*v1.Pod{testPodWithResource},
key: claimKey(testClaimReserved),
claims: func() []*resourcev1alpha2.ResourceClaim {
claim := structuredParameters(testClaimReserved)
claim = reserveClaim(claim, otherTestPod)
return []*resourcev1alpha2.ResourceClaim{claim}
}(),
expectedClaims: []resourcev1alpha2.ResourceClaim{*structuredParameters(testClaimReserved)},
expectedMetrics: expectedMetrics{0, 0},
},
{
name: "clear-reserved-immediate-allocation",
pods: []*v1.Pod{},
@ -309,6 +334,62 @@ func TestSyncHandler(t *testing.T) {
}(),
expectedMetrics: expectedMetrics{0, 0},
},
{
name: "clear-reserved-immediate-allocation-structured",
pods: []*v1.Pod{},
key: claimKey(testClaimReserved),
claims: func() []*resourcev1alpha2.ResourceClaim {
claim := structuredParameters(testClaimReserved.DeepCopy())
claim.Spec.AllocationMode = resourcev1alpha2.AllocationModeImmediate
return []*resourcev1alpha2.ResourceClaim{claim}
}(),
expectedClaims: func() []resourcev1alpha2.ResourceClaim {
claim := structuredParameters(testClaimAllocated.DeepCopy())
claim.Spec.AllocationMode = resourcev1alpha2.AllocationModeImmediate
return []resourcev1alpha2.ResourceClaim{*claim}
}(),
expectedMetrics: expectedMetrics{0, 0},
},
{
name: "clear-reserved-immediate-allocation-structured-deleted",
pods: []*v1.Pod{},
key: claimKey(testClaimReserved),
claims: func() []*resourcev1alpha2.ResourceClaim {
claim := structuredParameters(testClaimReserved.DeepCopy())
claim.Spec.AllocationMode = resourcev1alpha2.AllocationModeImmediate
claim.DeletionTimestamp = &metav1.Time{}
return []*resourcev1alpha2.ResourceClaim{claim}
}(),
expectedClaims: func() []resourcev1alpha2.ResourceClaim {
claim := structuredParameters(testClaimAllocated.DeepCopy())
claim.Spec.AllocationMode = resourcev1alpha2.AllocationModeImmediate
claim.DeletionTimestamp = &metav1.Time{}
claim.Finalizers = []string{}
claim.Status.Allocation = nil
return []resourcev1alpha2.ResourceClaim{*claim}
}(),
expectedMetrics: expectedMetrics{0, 0},
},
{
name: "immediate-allocation-structured-deleted",
pods: []*v1.Pod{},
key: claimKey(testClaimReserved),
claims: func() []*resourcev1alpha2.ResourceClaim {
claim := structuredParameters(testClaimAllocated.DeepCopy())
claim.Spec.AllocationMode = resourcev1alpha2.AllocationModeImmediate
claim.DeletionTimestamp = &metav1.Time{}
return []*resourcev1alpha2.ResourceClaim{claim}
}(),
expectedClaims: func() []resourcev1alpha2.ResourceClaim {
claim := structuredParameters(testClaimAllocated.DeepCopy())
claim.Spec.AllocationMode = resourcev1alpha2.AllocationModeImmediate
claim.DeletionTimestamp = &metav1.Time{}
claim.Finalizers = []string{}
claim.Status.Allocation = nil
return []resourcev1alpha2.ResourceClaim{*claim}
}(),
expectedMetrics: expectedMetrics{0, 0},
},
{
name: "clear-reserved-when-done-delayed-allocation",
pods: func() []*v1.Pod {
@ -546,6 +627,14 @@ func allocateClaim(claim *resourcev1alpha2.ResourceClaim) *resourcev1alpha2.Reso
return claim
}
func structuredParameters(claim *resourcev1alpha2.ResourceClaim) *resourcev1alpha2.ResourceClaim {
claim = claim.DeepCopy()
// As far as the controller is concerned, a claim was allocated by us if it has
// this finalizer. For testing we don't need to update the allocation result.
claim.Finalizers = append(claim.Finalizers, resourcev1alpha2.Finalizer)
return claim
}
func reserveClaim(claim *resourcev1alpha2.ResourceClaim, pod *v1.Pod) *resourcev1alpha2.ResourceClaim {
claim = claim.DeepCopy()
claim.Status.ReservedFor = append(claim.Status.ReservedFor,

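As a side note on the convention described in the structuredParameters helper above, a hypothetical check (not part of the diff) for "allocated by us via structured parameters" amounts to looking for the in-tree finalizer:

package main

import (
	"fmt"

	resourcev1alpha2 "k8s.io/api/resource/v1alpha2"
)

// allocatedViaStructuredParameters reports whether the claim carries the
// in-tree finalizer, which is the marker the controller relies on to decide
// that it may clear the allocation itself.
func allocatedViaStructuredParameters(claim *resourcev1alpha2.ResourceClaim) bool {
	for _, finalizer := range claim.Finalizers {
		if finalizer == resourcev1alpha2.Finalizer {
			return true
		}
	}
	return false
}

func main() {
	claim := &resourcev1alpha2.ResourceClaim{}
	claim.Finalizers = append(claim.Finalizers, resourcev1alpha2.Finalizer)
	fmt.Println(allocatedViaStructuredParameters(claim)) // true
}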

@ -1218,7 +1218,7 @@ func (pl *dynamicResources) PostFilter(ctx context.Context, cs *framework.CycleS
// Then we can simply clear the allocation. Once the
// claim informer catches up, the controllers will
// be notified about this change.
clearAllocation := state.informationsForClaim[index].controller != nil
clearAllocation := state.informationsForClaim[index].structuredParameters
// Before we tell a driver to deallocate a claim, we
// have to stop telling it to allocate. Otherwise,
@ -1237,6 +1237,7 @@ func (pl *dynamicResources) PostFilter(ctx context.Context, cs *framework.CycleS
claim := claim.DeepCopy()
claim.Status.ReservedFor = nil
if clearAllocation {
claim.Status.DriverName = ""
claim.Status.Allocation = nil
} else {
claim.Status.DeallocationRequested = true
@ -1419,7 +1420,11 @@ func (pl *dynamicResources) Reserve(ctx context.Context, cs *framework.CycleStat
}
state.informationsForClaim[index].allocation = allocation
state.informationsForClaim[index].allocationDriverName = driverName
// Strictly speaking, we don't need to store the full modified object.
// The allocation would be enough. The full object is useful for
// debugging and testing, so let's make it realistic.
claim = claim.DeepCopy()
claim.Finalizers = append(claim.Finalizers, resourcev1alpha2.Finalizer)
claim.Status.DriverName = driverName
claim.Status.Allocation = allocation
pl.inFlightAllocations.Store(claim.UID, claim)


@ -39,12 +39,12 @@ import (
"k8s.io/client-go/kubernetes/fake"
cgotesting "k8s.io/client-go/testing"
"k8s.io/client-go/tools/cache"
"k8s.io/klog/v2/ktesting"
_ "k8s.io/klog/v2/ktesting/init"
"k8s.io/kubernetes/pkg/scheduler/framework"
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/feature"
"k8s.io/kubernetes/pkg/scheduler/framework/runtime"
st "k8s.io/kubernetes/pkg/scheduler/testing"
"k8s.io/kubernetes/test/utils/ktesting"
"k8s.io/utils/ptr"
)
var (
@ -65,6 +65,39 @@ var (
},
DriverName: "some-driver",
}
structuredResourceClass = &resourcev1alpha2.ResourceClass{
ObjectMeta: metav1.ObjectMeta{
Name: className,
},
DriverName: "some-driver",
StructuredParameters: ptr.To(true),
}
structuredResourceClassWithParams = &resourcev1alpha2.ResourceClass{
ObjectMeta: metav1.ObjectMeta{
Name: className,
},
DriverName: "some-driver",
StructuredParameters: ptr.To(true),
ParametersRef: &resourcev1alpha2.ResourceClassParametersReference{
Name: className,
Namespace: namespace,
Kind: "ResourceClassParameters",
APIGroup: "resource.k8s.io",
},
}
structuredResourceClassWithCRD = &resourcev1alpha2.ResourceClass{
ObjectMeta: metav1.ObjectMeta{
Name: className,
},
DriverName: "some-driver",
StructuredParameters: ptr.To(true),
ParametersRef: &resourcev1alpha2.ResourceClassParametersReference{
Name: className,
Namespace: namespace,
Kind: "ResourceClassParameters",
APIGroup: "example.com",
},
}
podWithClaimName = st.MakePod().Name(podName).Namespace(namespace).
UID(podUID).
@ -94,7 +127,27 @@ var (
PodResourceClaims(v1.PodResourceClaim{Name: resourceName2, Source: v1.ClaimSource{ResourceClaimName: &claimName2}}).
Obj()
workerNode = &st.MakeNode().Name("worker").Label("nodename", "worker").Node
workerNode = &st.MakeNode().Name("worker").Label("kubernetes.io/hostname", "worker").Node
workerNodeSlice = st.MakeResourceSlice("worker", "some-driver").NamedResourcesInstances("instance-1").Obj()
claimParameters = st.MakeClaimParameters().Name(claimName).Namespace(namespace).
NamedResourcesRequests("some-driver", "true").
Shareable(true).
GeneratedFrom(&resourcev1alpha2.ResourceClaimParametersReference{
Name: claimName,
Kind: "ResourceClaimParameters",
APIGroup: "example.com",
}).
Obj()
classParameters = st.MakeClassParameters().Name(className).Namespace(namespace).
NamedResourcesFilters("some-driver", "true").
GeneratedFrom(&resourcev1alpha2.ResourceClassParametersReference{
Name: className,
Namespace: namespace,
Kind: "ResourceClassParameters",
APIGroup: "example.com",
}).
Obj()
claim = st.MakeResourceClaim().
Name(claimName).
@ -104,6 +157,10 @@ var (
pendingImmediateClaim = st.FromResourceClaim(claim).
AllocationMode(resourcev1alpha2.AllocationModeImmediate).
Obj()
structuredAllocatedImmediateClaim = st.FromResourceClaim(pendingImmediateClaim).
Allocation("some-driver", &resourcev1alpha2.AllocationResult{}).
Structured("worker", "instance-1").
Obj()
pendingDelayedClaim = st.FromResourceClaim(claim).
OwnerReference(podName, podUID, podKind).
AllocationMode(resourcev1alpha2.AllocationModeWaitForFirstConsumer).
@ -112,25 +169,44 @@ var (
Name(claimName2).
Obj()
deallocatingClaim = st.FromResourceClaim(pendingImmediateClaim).
Allocation(&resourcev1alpha2.AllocationResult{}).
Allocation("some-driver", &resourcev1alpha2.AllocationResult{}).
DeallocationRequested(true).
Obj()
inUseClaim = st.FromResourceClaim(pendingImmediateClaim).
Allocation(&resourcev1alpha2.AllocationResult{}).
ReservedFor(resourcev1alpha2.ResourceClaimConsumerReference{Resource: "pods", Name: podName, UID: types.UID(podUID)}).
Allocation("some-driver", &resourcev1alpha2.AllocationResult{}).
ReservedForPod(podName, types.UID(podUID)).
Obj()
structuredInUseClaim = st.FromResourceClaim(inUseClaim).
Structured("worker", "instance-1").
Obj()
allocatedClaim = st.FromResourceClaim(pendingDelayedClaim).
Allocation(&resourcev1alpha2.AllocationResult{}).
Allocation("some-driver", &resourcev1alpha2.AllocationResult{}).
Obj()
pendingDelayedClaimWithParams = st.FromResourceClaim(pendingDelayedClaim).ParametersRef(claimName).Obj()
structuredAllocatedClaim = st.FromResourceClaim(allocatedClaim).Structured("worker", "instance-1").Obj()
structuredAllocatedClaimWithParams = st.FromResourceClaim(structuredAllocatedClaim).ParametersRef(claimName).Obj()
otherStructuredAllocatedClaim = st.FromResourceClaim(structuredAllocatedClaim).Name(structuredAllocatedClaim.Name + "-other").Obj()
allocatedDelayedClaimWithWrongTopology = st.FromResourceClaim(allocatedClaim).
Allocation(&resourcev1alpha2.AllocationResult{AvailableOnNodes: st.MakeNodeSelector().In("no-such-label", []string{"no-such-value"}).Obj()}).
Allocation("some-driver", &resourcev1alpha2.AllocationResult{AvailableOnNodes: st.MakeNodeSelector().In("no-such-label", []string{"no-such-value"}).Obj()}).
Obj()
structuredAllocatedDelayedClaimWithWrongTopology = st.FromResourceClaim(allocatedDelayedClaimWithWrongTopology).
Structured("worker-2", "instance-1").
Obj()
allocatedImmediateClaimWithWrongTopology = st.FromResourceClaim(allocatedDelayedClaimWithWrongTopology).
AllocationMode(resourcev1alpha2.AllocationModeImmediate).
Obj()
structuredAllocatedImmediateClaimWithWrongTopology = st.FromResourceClaim(allocatedImmediateClaimWithWrongTopology).
Structured("worker-2", "instance-1").
Obj()
allocatedClaimWithGoodTopology = st.FromResourceClaim(allocatedClaim).
Allocation(&resourcev1alpha2.AllocationResult{AvailableOnNodes: st.MakeNodeSelector().In("nodename", []string{"worker"}).Obj()}).
Allocation("some-driver", &resourcev1alpha2.AllocationResult{AvailableOnNodes: st.MakeNodeSelector().In("kubernetes.io/hostname", []string{"worker"}).Obj()}).
Obj()
structuredAllocatedClaimWithGoodTopology = st.FromResourceClaim(allocatedClaimWithGoodTopology).
Structured("worker", "instance-1").
Obj()
otherClaim = st.MakeResourceClaim().
Name("not-my-claim").
Namespace(namespace).
@ -152,6 +228,44 @@ var (
Obj()
)
func reserve(claim *resourcev1alpha2.ResourceClaim, pod *v1.Pod) *resourcev1alpha2.ResourceClaim {
return st.FromResourceClaim(claim).
ReservedForPod(pod.Name, types.UID(pod.UID)).
Obj()
}
// claimWithCRD replaces the in-tree group with "example.com".
func claimWithCRD(claim *resourcev1alpha2.ResourceClaim) *resourcev1alpha2.ResourceClaim {
claim = claim.DeepCopy()
claim.Spec.ParametersRef.APIGroup = "example.com"
return claim
}
// classWithCRD replaces the in-tree group with "example.com".
func classWithCRD(class *resourcev1alpha2.ResourceClass) *resourcev1alpha2.ResourceClass {
class = class.DeepCopy()
class.ParametersRef.APIGroup = "example.com"
return class
}
func breakCELInClaimParameters(parameters *resourcev1alpha2.ResourceClaimParameters) *resourcev1alpha2.ResourceClaimParameters {
parameters = parameters.DeepCopy()
for i := range parameters.DriverRequests {
for e := range parameters.DriverRequests[i].Requests {
parameters.DriverRequests[i].Requests[e].NamedResources.Selector = `attributes.bool["no-such-attribute"]`
}
}
return parameters
}
func breakCELInClassParameters(parameters *resourcev1alpha2.ResourceClassParameters) *resourcev1alpha2.ResourceClassParameters {
parameters = parameters.DeepCopy()
for i := range parameters.Filters {
parameters.Filters[i].NamedResources.Selector = `attributes.bool["no-such-attribute"]`
}
return parameters
}
// result defines the expected outcome of some operation. It covers
// the operation's status and the state of the world (= objects).
type result struct {
@ -166,6 +280,14 @@ type result struct {
// removed contains objects deleted by the operation.
removed []metav1.Object
// assumedClaim is the one claim which is expected to be assumed,
// nil if none.
assumedClaim *resourcev1alpha2.ResourceClaim
// inFlightClaim is the one claim which is expected to be tracked as
// in flight, nil if none.
inFlightClaim *resourcev1alpha2.ResourceClaim
}
// change contains functions for modifying objects of a certain type. These
@ -200,6 +322,10 @@ type want struct {
// unreserveAfterBindFailure, if set, triggers a call to Unreserve
// after PreBind, as if the actual Bind had failed.
unreserveAfterBindFailure *result
// unreserveBeforePreBind, if set, triggers a call to Unreserve
// before PreBind, as if some other PreBind plugin had failed.
unreserveBeforePreBind *result
}
// prepare contains changes for objects in the API server.
@ -224,6 +350,10 @@ func TestPlugin(t *testing.T) {
classes []*resourcev1alpha2.ResourceClass
schedulings []*resourcev1alpha2.PodSchedulingContext
// objs get stored directly in the fake client, without passing
// through reactors, in contrast to the types above.
objs []apiruntime.Object
prepare prepare
want want
disable bool
@ -256,6 +386,23 @@ func TestPlugin(t *testing.T) {
},
},
},
"claim-reference-structured": {
pod: podWithClaimName,
claims: []*resourcev1alpha2.ResourceClaim{structuredAllocatedClaim, otherClaim},
want: want{
prebind: result{
changes: change{
claim: func(claim *resourcev1alpha2.ResourceClaim) *resourcev1alpha2.ResourceClaim {
if claim.Name == claimName {
claim = claim.DeepCopy()
claim.Status.ReservedFor = inUseClaim.Status.ReservedFor
}
return claim
},
},
},
},
},
"claim-template": {
pod: podWithClaimTemplateInStatus,
claims: []*resourcev1alpha2.ResourceClaim{allocatedClaim, otherClaim},
@ -273,6 +420,23 @@ func TestPlugin(t *testing.T) {
},
},
},
"claim-template-structured": {
pod: podWithClaimTemplateInStatus,
claims: []*resourcev1alpha2.ResourceClaim{structuredAllocatedClaim, otherClaim},
want: want{
prebind: result{
changes: change{
claim: func(claim *resourcev1alpha2.ResourceClaim) *resourcev1alpha2.ResourceClaim {
if claim.Name == claimName {
claim = claim.DeepCopy()
claim.Status.ReservedFor = inUseClaim.Status.ReservedFor
}
return claim
},
},
},
},
},
"missing-claim": {
pod: podWithClaimTemplate, // status not set
claims: []*resourcev1alpha2.ResourceClaim{allocatedClaim, otherClaim},
@ -321,6 +485,239 @@ func TestPlugin(t *testing.T) {
},
},
},
"immediate-allocation-structured-no-resources": {
pod: podWithClaimName,
claims: []*resourcev1alpha2.ResourceClaim{pendingImmediateClaim},
classes: []*resourcev1alpha2.ResourceClass{structuredResourceClass},
want: want{
filter: perNodeResult{
workerNode.Name: {
status: framework.NewStatus(framework.UnschedulableAndUnresolvable, `resourceclaim cannot be allocated for the node (unsuitable)`),
},
},
postfilter: result{
status: framework.NewStatus(framework.Unschedulable, `still not schedulable`),
},
},
},
"immediate-allocation-structured-with-resources": {
pod: podWithClaimName,
claims: []*resourcev1alpha2.ResourceClaim{pendingImmediateClaim},
classes: []*resourcev1alpha2.ResourceClass{structuredResourceClass},
objs: []apiruntime.Object{workerNodeSlice},
want: want{
reserve: result{
inFlightClaim: structuredAllocatedImmediateClaim,
},
prebind: result{
assumedClaim: reserve(structuredAllocatedImmediateClaim, podWithClaimName),
changes: change{
claim: func(claim *resourcev1alpha2.ResourceClaim) *resourcev1alpha2.ResourceClaim {
if claim.Name == claimName {
claim = claim.DeepCopy()
claim.Finalizers = structuredAllocatedImmediateClaim.Finalizers
claim.Status = structuredInUseClaim.Status
}
return claim
},
},
},
postbind: result{
assumedClaim: reserve(structuredAllocatedImmediateClaim, podWithClaimName),
},
},
},
"delayed-allocation-structured-no-resources": {
pod: podWithClaimName,
claims: []*resourcev1alpha2.ResourceClaim{pendingDelayedClaim},
classes: []*resourcev1alpha2.ResourceClass{structuredResourceClass},
want: want{
filter: perNodeResult{
workerNode.Name: {
status: framework.NewStatus(framework.UnschedulableAndUnresolvable, `resourceclaim cannot be allocated for the node (unsuitable)`),
},
},
postfilter: result{
status: framework.NewStatus(framework.Unschedulable, `still not schedulable`),
},
},
},
"delayed-allocation-structured-with-resources": {
pod: podWithClaimName,
claims: []*resourcev1alpha2.ResourceClaim{pendingDelayedClaim},
classes: []*resourcev1alpha2.ResourceClass{structuredResourceClass},
objs: []apiruntime.Object{workerNodeSlice},
want: want{
reserve: result{
inFlightClaim: structuredAllocatedClaim,
},
prebind: result{
assumedClaim: reserve(structuredAllocatedClaim, podWithClaimName),
changes: change{
claim: func(claim *resourcev1alpha2.ResourceClaim) *resourcev1alpha2.ResourceClaim {
if claim.Name == claimName {
claim = claim.DeepCopy()
claim.Finalizers = structuredAllocatedClaim.Finalizers
claim.Status = structuredInUseClaim.Status
}
return claim
},
},
},
postbind: result{
assumedClaim: reserve(structuredAllocatedClaim, podWithClaimName),
},
},
},
"delayed-allocation-structured-skip-bind": {
pod: podWithClaimName,
claims: []*resourcev1alpha2.ResourceClaim{pendingDelayedClaim},
classes: []*resourcev1alpha2.ResourceClass{structuredResourceClass},
objs: []apiruntime.Object{workerNodeSlice},
want: want{
reserve: result{
inFlightClaim: structuredAllocatedClaim,
},
unreserveBeforePreBind: &result{},
},
},
"delayed-allocation-structured-exhausted-resources": {
pod: podWithClaimName,
claims: []*resourcev1alpha2.ResourceClaim{pendingDelayedClaim, otherStructuredAllocatedClaim},
classes: []*resourcev1alpha2.ResourceClass{structuredResourceClass},
objs: []apiruntime.Object{workerNodeSlice},
want: want{
filter: perNodeResult{
workerNode.Name: {
status: framework.NewStatus(framework.UnschedulableAndUnresolvable, `resourceclaim cannot be allocated for the node (unsuitable)`),
},
},
postfilter: result{
status: framework.NewStatus(framework.Unschedulable, `still not schedulable`),
},
},
},
"with-parameters": {
pod: podWithClaimName,
claims: []*resourcev1alpha2.ResourceClaim{pendingDelayedClaimWithParams},
classes: []*resourcev1alpha2.ResourceClass{structuredResourceClassWithParams},
objs: []apiruntime.Object{claimParameters, classParameters, workerNodeSlice},
want: want{
reserve: result{
inFlightClaim: structuredAllocatedClaimWithParams,
},
prebind: result{
assumedClaim: reserve(structuredAllocatedClaimWithParams, podWithClaimName),
changes: change{
claim: func(claim *resourcev1alpha2.ResourceClaim) *resourcev1alpha2.ResourceClaim {
if claim.Name == claimName {
claim = claim.DeepCopy()
claim.Finalizers = structuredAllocatedClaim.Finalizers
claim.Status = structuredInUseClaim.Status
}
return claim
},
},
},
postbind: result{
assumedClaim: reserve(structuredAllocatedClaimWithParams, podWithClaimName),
},
},
},
"with-translated-parameters": {
pod: podWithClaimName,
claims: []*resourcev1alpha2.ResourceClaim{claimWithCRD(pendingDelayedClaimWithParams)},
classes: []*resourcev1alpha2.ResourceClass{classWithCRD(structuredResourceClassWithCRD)},
objs: []apiruntime.Object{claimParameters, classParameters, workerNodeSlice},
want: want{
reserve: result{
inFlightClaim: claimWithCRD(structuredAllocatedClaimWithParams),
},
prebind: result{
assumedClaim: reserve(claimWithCRD(structuredAllocatedClaimWithParams), podWithClaimName),
changes: change{
claim: func(claim *resourcev1alpha2.ResourceClaim) *resourcev1alpha2.ResourceClaim {
if claim.Name == claimName {
claim = claim.DeepCopy()
claim.Finalizers = structuredAllocatedClaim.Finalizers
claim.Status = structuredInUseClaim.Status
}
return claim
},
},
},
postbind: result{
assumedClaim: reserve(claimWithCRD(structuredAllocatedClaimWithParams), podWithClaimName),
},
},
},
"missing-class-parameters": {
pod: podWithClaimName,
claims: []*resourcev1alpha2.ResourceClaim{pendingDelayedClaimWithParams},
classes: []*resourcev1alpha2.ResourceClass{structuredResourceClassWithParams},
objs: []apiruntime.Object{claimParameters /* classParameters, */, workerNodeSlice},
want: want{
prefilter: result{
status: framework.NewStatus(framework.UnschedulableAndUnresolvable, `class parameters default/my-resource-class not found`),
},
postfilter: result{
status: framework.NewStatus(framework.Unschedulable, `no new claims to deallocate`),
},
},
},
"missing-claim-parameters": {
pod: podWithClaimName,
claims: []*resourcev1alpha2.ResourceClaim{pendingDelayedClaimWithParams},
classes: []*resourcev1alpha2.ResourceClass{structuredResourceClassWithParams},
objs: []apiruntime.Object{ /* claimParameters, */ classParameters, workerNodeSlice},
want: want{
prefilter: result{
status: framework.NewStatus(framework.UnschedulableAndUnresolvable, `claim parameters default/my-pod-my-resource not found`),
},
postfilter: result{
status: framework.NewStatus(framework.Unschedulable, `no new claims to deallocate`),
},
},
},
"claim-parameters-CEL-runtime-error": {
pod: podWithClaimName,
claims: []*resourcev1alpha2.ResourceClaim{pendingDelayedClaimWithParams},
classes: []*resourcev1alpha2.ResourceClass{structuredResourceClassWithParams},
objs: []apiruntime.Object{breakCELInClaimParameters(claimParameters), classParameters, workerNodeSlice},
want: want{
filter: perNodeResult{
workerNode.Name: {
status: framework.NewStatus(framework.UnschedulableAndUnresolvable, `checking structured parameters failed: checking node "worker" and resources of driver "some-driver": evaluate request CEL expression: no such key: no-such-attribute`),
},
},
postfilter: result{
status: framework.NewStatus(framework.Unschedulable, `still not schedulable`),
},
},
},
"class-parameters-CEL-runtime-error": {
pod: podWithClaimName,
claims: []*resourcev1alpha2.ResourceClaim{pendingDelayedClaimWithParams},
classes: []*resourcev1alpha2.ResourceClass{structuredResourceClassWithParams},
objs: []apiruntime.Object{claimParameters, breakCELInClassParameters(classParameters), workerNodeSlice},
want: want{
filter: perNodeResult{
workerNode.Name: {
status: framework.NewStatus(framework.UnschedulableAndUnresolvable, `checking structured parameters failed: checking node "worker" and resources of driver "some-driver": evaluate filter CEL expression: no such key: no-such-attribute`),
},
},
postfilter: result{
status: framework.NewStatus(framework.Unschedulable, `still not schedulable`),
},
},
},
"waiting-for-deallocation": {
pod: podWithClaimName,
claims: []*resourcev1alpha2.ResourceClaim{deallocatingClaim},
@ -496,6 +893,49 @@ func TestPlugin(t *testing.T) {
},
},
},
"wrong-topology-delayed-allocation-structured": {
// PostFilter tries to get the pod schedulable by
// deallocating the claim.
pod: podWithClaimName,
claims: []*resourcev1alpha2.ResourceClaim{structuredAllocatedDelayedClaimWithWrongTopology},
want: want{
filter: perNodeResult{
workerNode.Name: {
status: framework.NewStatus(framework.UnschedulableAndUnresolvable, `resourceclaim not available on the node`),
},
},
postfilter: result{
// Claims with delayed allocation and structured parameters get deallocated immediately.
changes: change{
claim: func(in *resourcev1alpha2.ResourceClaim) *resourcev1alpha2.ResourceClaim {
return st.FromResourceClaim(in).
Allocation("", nil).
Obj()
},
},
status: framework.NewStatus(framework.Unschedulable, `deallocation of ResourceClaim completed`),
},
},
},
"wrong-topology-immediate-allocation-structured": {
// PostFilter tries to get the pod schedulable by
// deallocating the claim.
pod: podWithClaimName,
claims: []*resourcev1alpha2.ResourceClaim{structuredAllocatedImmediateClaimWithWrongTopology},
want: want{
filter: perNodeResult{
workerNode.Name: {
status: framework.NewStatus(framework.UnschedulableAndUnresolvable, `resourceclaim not available on the node`),
},
},
postfilter: result{
// Claims with immediate allocation don't get deallocated. The allocation is considered
// more important than the pod and pods need to wait for the node to
// become available again.
status: framework.NewStatus(framework.Unschedulable, `still not schedulable`),
},
},
},
"good-topology": {
pod: podWithClaimName,
claims: []*resourcev1alpha2.ResourceClaim{allocatedClaimWithGoodTopology},
@ -535,6 +975,30 @@ func TestPlugin(t *testing.T) {
},
},
},
"bind-failure-structured": {
pod: podWithClaimName,
claims: []*resourcev1alpha2.ResourceClaim{structuredAllocatedClaimWithGoodTopology},
want: want{
prebind: result{
changes: change{
claim: func(in *resourcev1alpha2.ResourceClaim) *resourcev1alpha2.ResourceClaim {
return st.FromResourceClaim(in).
ReservedFor(resourcev1alpha2.ResourceClaimConsumerReference{Resource: "pods", Name: podName, UID: types.UID(podUID)}).
Obj()
},
},
},
unreserveAfterBindFailure: &result{
changes: change{
claim: func(in *resourcev1alpha2.ResourceClaim) *resourcev1alpha2.ResourceClaim {
out := in.DeepCopy()
out.Status.ReservedFor = []resourcev1alpha2.ResourceClaimConsumerReference{}
return out
},
},
},
},
},
"reserved-okay": {
pod: podWithClaimName,
claims: []*resourcev1alpha2.ResourceClaim{inUseClaim},
@ -560,7 +1024,7 @@ func TestPlugin(t *testing.T) {
if nodes == nil {
nodes = []*v1.Node{workerNode}
}
testCtx := setup(t, nodes, tc.claims, tc.classes, tc.schedulings)
testCtx := setup(t, nodes, tc.claims, tc.classes, tc.schedulings, tc.objs)
testCtx.p.enabled = !tc.disable
initialObjects := testCtx.listAll(t)
@ -638,6 +1102,15 @@ func TestPlugin(t *testing.T) {
testCtx.verify(t, tc.want.unreserve, initialObjects, nil, status)
})
} else {
if tc.want.unreserveBeforePreBind != nil {
initialObjects = testCtx.listAll(t)
testCtx.p.Unreserve(testCtx.ctx, testCtx.state, tc.pod, selectedNode.Node().Name)
t.Run("unreserveBeforePreBind", func(t *testing.T) {
testCtx.verify(t, *tc.want.unreserveBeforePreBind, initialObjects, nil, status)
})
return
}
initialObjects = testCtx.listAll(t)
initialObjects = testCtx.updateAPIServer(t, initialObjects, tc.prepare.prebind)
status := testCtx.p.PreBind(testCtx.ctx, testCtx.state, tc.pod, selectedNode.Node().Name)
@ -701,7 +1174,24 @@ func (tc *testContext) verify(t *testing.T, expected result, initialObjects []me
sortObjects(wantObjects)
stripObjects(wantObjects)
stripObjects(objects)
assert.Equal(t, wantObjects, objects)
// Sometimes assert strips the diff too much, let's do it ourselves...
if diff := cmp.Diff(wantObjects, objects); diff != "" {
t.Errorf("Stored objects are different (- expected, + actual):\n%s", diff)
}
var expectAssumedClaims []metav1.Object
if expected.assumedClaim != nil {
expectAssumedClaims = append(expectAssumedClaims, expected.assumedClaim)
}
actualAssumedClaims := tc.listAssumedClaims()
assert.Equal(t, expectAssumedClaims, actualAssumedClaims, "assumed claims")
var expectInFlightClaims []metav1.Object
if expected.inFlightClaim != nil {
expectInFlightClaims = append(expectInFlightClaims, expected.inFlightClaim)
}
actualInFlightClaims := tc.listInFlightClaims()
assert.Equal(t, expectInFlightClaims, actualInFlightClaims, "in-flight claims")
}
// setGVK is implemented by metav1.TypeMeta and thus all API objects, in
@ -741,6 +1231,32 @@ func (tc *testContext) listAll(t *testing.T) (objects []metav1.Object) {
return
}
func (tc *testContext) listAssumedClaims() []metav1.Object {
var assumedClaims []metav1.Object
for _, obj := range tc.p.claimAssumeCache.List(nil) {
claim := obj.(*resourcev1alpha2.ResourceClaim)
obj, _ := tc.p.claimAssumeCache.Get(claim.Namespace + "/" + claim.Name)
apiObj, _ := tc.p.claimAssumeCache.GetAPIObj(claim.Namespace + "/" + claim.Name)
if obj != apiObj {
assumedClaims = append(assumedClaims, claim)
}
}
sortObjects(assumedClaims)
stripObjects(assumedClaims)
return assumedClaims
}
func (tc *testContext) listInFlightClaims() []metav1.Object {
var inFlightClaims []metav1.Object
tc.p.inFlightAllocations.Range(func(key, value any) bool {
inFlightClaims = append(inFlightClaims, value.(*resourcev1alpha2.ResourceClaim))
return true
})
sortObjects(inFlightClaims)
stripObjects(inFlightClaims)
return inFlightClaims
}
// updateAPIServer modifies objects and stores any changed object in the API server.
func (tc *testContext) updateAPIServer(t *testing.T, objects []metav1.Object, updates change) []metav1.Object {
modified := update(t, objects, updates)
@ -801,30 +1317,36 @@ func update(t *testing.T, objects []metav1.Object, updates change) []metav1.Obje
return updated
}
func setup(t *testing.T, nodes []*v1.Node, claims []*resourcev1alpha2.ResourceClaim, classes []*resourcev1alpha2.ResourceClass, schedulings []*resourcev1alpha2.PodSchedulingContext) (result *testContext) {
func setup(t *testing.T, nodes []*v1.Node, claims []*resourcev1alpha2.ResourceClaim, classes []*resourcev1alpha2.ResourceClass, schedulings []*resourcev1alpha2.PodSchedulingContext, objs []apiruntime.Object) (result *testContext) {
t.Helper()
tc := &testContext{}
_, ctx := ktesting.NewTestContext(t)
ctx, cancel := context.WithCancel(ctx)
t.Cleanup(cancel)
tc.ctx = ctx
tCtx := ktesting.Init(t)
tc.ctx = tCtx
tc.client = fake.NewSimpleClientset()
tc.client = fake.NewSimpleClientset(objs...)
reactor := createReactor(tc.client.Tracker())
tc.client.PrependReactor("*", "*", reactor)
// Quick-and-dirty workaround for fake client storing ResourceClassParameters and
// ResourceClaimParameters as "resourceclassparameterses" and "resourceclaimparameterses":
// intercept the correct LIST from the informers and reply to them with the incorrect
// LIST result.
tc.client.PrependReactor("list", "resourceclaimparameters", createListReactor(tc.client.Tracker(), "ResourceClaimParameters"))
tc.client.PrependReactor("list", "resourceclassparameters", createListReactor(tc.client.Tracker(), "ResourceClassParameters"))
tc.informerFactory = informers.NewSharedInformerFactory(tc.client, 0)
opts := []runtime.Option{
runtime.WithClientSet(tc.client),
runtime.WithInformerFactory(tc.informerFactory),
}
fh, err := runtime.NewFramework(ctx, nil, nil, opts...)
fh, err := runtime.NewFramework(tCtx, nil, nil, opts...)
if err != nil {
t.Fatal(err)
}
pl, err := New(ctx, nil, fh, feature.Features{EnableDynamicResourceAllocation: true})
pl, err := New(tCtx, nil, fh, feature.Features{EnableDynamicResourceAllocation: true})
if err != nil {
t.Fatal(err)
}
@ -848,7 +1370,7 @@ func setup(t *testing.T, nodes []*v1.Node, claims []*resourcev1alpha2.ResourceCl
tc.informerFactory.Start(tc.ctx.Done())
t.Cleanup(func() {
// Need to cancel before waiting for the shutdown.
cancel()
tCtx.Cancel("test is done")
// Now we can wait for all goroutines to stop.
tc.informerFactory.Shutdown()
})
@ -896,7 +1418,7 @@ func createReactor(tracker cgotesting.ObjectTracker) func(action cgotesting.Acti
}
obj.SetUID(types.UID(fmt.Sprintf("UID-%d", uidCounter)))
uidCounter++
obj.SetResourceVersion(fmt.Sprintf("REV-%d", resourceVersionCounter))
obj.SetResourceVersion(fmt.Sprintf("%d", resourceVersionCounter))
resourceVersionCounter++
case "update":
uid := obj.GetUID()
@ -920,13 +1442,24 @@ func createReactor(tracker cgotesting.ObjectTracker) func(action cgotesting.Acti
return true, nil, errors.New("ResourceVersion must match the object that gets updated")
}
obj.SetResourceVersion(fmt.Sprintf("REV-%d", resourceVersionCounter))
obj.SetResourceVersion(fmt.Sprintf("%d", resourceVersionCounter))
resourceVersionCounter++
}
return false, nil, nil
}
}
func createListReactor(tracker cgotesting.ObjectTracker, kind string) func(action cgotesting.Action) (handled bool, ret apiruntime.Object, err error) {
return func(action cgotesting.Action) (handled bool, ret apiruntime.Object, err error) {
// listAction := action.(cgotesting.ListAction)
gvr := action.GetResource()
ns := action.GetNamespace()
gvr.Resource += "es"
list, err := tracker.List(gvr, schema.GroupVersionKind{Group: gvr.Group, Version: gvr.Version, Kind: kind}, ns)
return true, list, err
}
}
func Test_isSchedulableAfterClaimChange(t *testing.T) {
testcases := map[string]struct {
pod *v1.Pod
@ -999,12 +1532,30 @@ func Test_isSchedulableAfterClaimChange(t *testing.T) {
}(),
expectedHint: framework.Queue,
},
"structured-claim-deallocate": {
pod: podWithClaimName,
claims: []*resourcev1alpha2.ResourceClaim{pendingDelayedClaim},
oldObj: func() *resourcev1alpha2.ResourceClaim {
claim := structuredAllocatedClaim.DeepCopy()
claim.Name += "-other"
return claim
}(),
newObj: func() *resourcev1alpha2.ResourceClaim {
claim := structuredAllocatedClaim.DeepCopy()
claim.Name += "-other"
claim.Status.Allocation = nil
return claim
}(),
// TODO (https://github.com/kubernetes/kubernetes/issues/123697): don't wake up
// claims not using structured parameters.
expectedHint: framework.Queue,
},
}
for name, tc := range testcases {
t.Run(name, func(t *testing.T) {
logger, _ := ktesting.NewTestContext(t)
testCtx := setup(t, nil, tc.claims, nil, nil)
testCtx := setup(t, nil, tc.claims, nil, nil, nil)
if claim, ok := tc.newObj.(*resourcev1alpha2.ResourceClaim); ok {
// Update the informer because the lister gets called and must have the claim.
store := testCtx.informerFactory.Resource().V1alpha2().ResourceClaims().Informer().GetStore()
@ -1142,7 +1693,7 @@ func Test_isSchedulableAfterPodSchedulingContextChange(t *testing.T) {
t.Run(name, func(t *testing.T) {
t.Parallel()
logger, _ := ktesting.NewTestContext(t)
testCtx := setup(t, nil, tc.claims, nil, tc.schedulings)
testCtx := setup(t, nil, tc.claims, nil, tc.schedulings, nil)
actualHint, err := testCtx.p.isSchedulableAfterPodSchedulingContextChange(logger, tc.pod, tc.oldObj, tc.newObj)
if tc.expectedErr {
require.Error(t, err)


@ -0,0 +1,327 @@
/*
Copyright 2024 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package namedresources
import (
"testing"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
resourceapi "k8s.io/api/resource/v1alpha2"
"k8s.io/kubernetes/test/utils/ktesting"
"k8s.io/utils/ptr"
)
func instance(allocated bool, name string, attributes ...resourceapi.NamedResourcesAttribute) InstanceAllocation {
return InstanceAllocation{
Allocated: allocated,
Instance: &resourceapi.NamedResourcesInstance{
Name: name,
Attributes: attributes,
},
}
}
func TestModel(t *testing.T) {
testcases := map[string]struct {
resources []*resourceapi.NamedResourcesResources
allocations []*resourceapi.NamedResourcesAllocationResult
expectModel Model
}{
"empty": {},
"nil": {
resources: []*resourceapi.NamedResourcesResources{nil},
allocations: []*resourceapi.NamedResourcesAllocationResult{nil},
},
"available": {
resources: []*resourceapi.NamedResourcesResources{
{
Instances: []resourceapi.NamedResourcesInstance{
{Name: "a"},
{Name: "b"},
},
},
{
Instances: []resourceapi.NamedResourcesInstance{
{Name: "x"},
{Name: "y"},
},
},
},
expectModel: Model{Instances: []InstanceAllocation{instance(false, "a"), instance(false, "b"), instance(false, "x"), instance(false, "y")}},
},
"allocated": {
resources: []*resourceapi.NamedResourcesResources{
{
Instances: []resourceapi.NamedResourcesInstance{
{Name: "a"},
{Name: "b"},
},
},
{
Instances: []resourceapi.NamedResourcesInstance{
{Name: "x"},
{Name: "y"},
},
},
},
allocations: []*resourceapi.NamedResourcesAllocationResult{
{
Name: "something-else",
},
{
Name: "a",
},
},
expectModel: Model{Instances: []InstanceAllocation{instance(true, "a"), instance(false, "b"), instance(false, "x"), instance(false, "y")}},
},
}
for name, tc := range testcases {
t.Run(name, func(t *testing.T) {
var actualModel Model
for _, resources := range tc.resources {
AddResources(&actualModel, resources)
}
for _, allocation := range tc.allocations {
AddAllocation(&actualModel, allocation)
}
require.Equal(t, tc.expectModel, actualModel)
})
}
}
func TestController(t *testing.T) {
filterAny := &resourceapi.NamedResourcesFilter{
Selector: "true",
}
filterNone := &resourceapi.NamedResourcesFilter{
Selector: "false",
}
filterBrokenType := &resourceapi.NamedResourcesFilter{
Selector: "1",
}
filterBrokenEvaluation := &resourceapi.NamedResourcesFilter{
Selector: `attributes.bool["no-such-attribute"]`,
}
filterAttribute := &resourceapi.NamedResourcesFilter{
Selector: `attributes.bool["usable"]`,
}
requestAny := &resourceapi.NamedResourcesRequest{
Selector: "true",
}
requestNone := &resourceapi.NamedResourcesRequest{
Selector: "false",
}
requestBrokenType := &resourceapi.NamedResourcesRequest{
Selector: "1",
}
requestBrokenEvaluation := &resourceapi.NamedResourcesRequest{
Selector: `attributes.bool["no-such-attribute"]`,
}
requestAttribute := &resourceapi.NamedResourcesRequest{
Selector: `attributes.bool["usable"]`,
}
instance1 := "instance-1"
oneInstance := Model{
Instances: []InstanceAllocation{{
Instance: &resourceapi.NamedResourcesInstance{
Name: instance1,
},
}},
}
instance2 := "instance-2"
twoInstances := Model{
Instances: []InstanceAllocation{
{
Instance: &resourceapi.NamedResourcesInstance{
Name: instance1,
Attributes: []resourceapi.NamedResourcesAttribute{{
Name: "usable",
NamedResourcesAttributeValue: resourceapi.NamedResourcesAttributeValue{
BoolValue: ptr.To(false),
},
}},
},
},
{
Instance: &resourceapi.NamedResourcesInstance{
Name: instance2,
Attributes: []resourceapi.NamedResourcesAttribute{{
Name: "usable",
NamedResourcesAttributeValue: resourceapi.NamedResourcesAttributeValue{
BoolValue: ptr.To(true),
},
}},
},
},
},
}
testcases := map[string]struct {
model Model
filter *resourceapi.NamedResourcesFilter
requests []*resourceapi.NamedResourcesRequest
expectCreateErr bool
expectAllocation []string
expectAllocateErr bool
}{
"empty": {},
"broken-filter": {
filter: filterBrokenType,
expectCreateErr: true,
},
"broken-request": {
requests: []*resourceapi.NamedResourcesRequest{requestBrokenType},
expectCreateErr: true,
},
"no-resources": {
filter: filterAny,
requests: []*resourceapi.NamedResourcesRequest{requestAny},
expectAllocateErr: true,
},
"okay": {
model: oneInstance,
filter: filterAny,
requests: []*resourceapi.NamedResourcesRequest{requestAny},
expectAllocation: []string{instance1},
},
"filter-mismatch": {
model: oneInstance,
filter: filterNone,
requests: []*resourceapi.NamedResourcesRequest{requestAny},
expectAllocateErr: true,
},
"request-mismatch": {
model: oneInstance,
filter: filterAny,
requests: []*resourceapi.NamedResourcesRequest{requestNone},
expectAllocateErr: true,
},
"many": {
model: twoInstances,
filter: filterAny,
requests: []*resourceapi.NamedResourcesRequest{requestAny, requestAny},
expectAllocation: []string{instance1, instance2},
},
"too-many": {
model: oneInstance,
filter: filterAny,
requests: []*resourceapi.NamedResourcesRequest{requestAny, requestAny},
expectAllocateErr: true,
},
"filter-evaluation-error": {
model: oneInstance,
filter: filterBrokenEvaluation,
requests: []*resourceapi.NamedResourcesRequest{requestAny},
expectAllocateErr: true,
},
"request-evaluation-error": {
model: oneInstance,
filter: filterAny,
requests: []*resourceapi.NamedResourcesRequest{requestBrokenEvaluation},
expectAllocateErr: true,
},
"filter-attribute": {
model: twoInstances,
filter: filterAttribute,
requests: []*resourceapi.NamedResourcesRequest{requestAny},
expectAllocation: []string{instance2},
},
"request-attribute": {
model: twoInstances,
filter: filterAny,
requests: []*resourceapi.NamedResourcesRequest{requestAttribute},
expectAllocation: []string{instance2},
},
}
for name, tc := range testcases {
t.Run(name, func(t *testing.T) {
tCtx := ktesting.Init(t)
controller, createErr := NewClaimController(tc.filter, tc.requests)
if createErr != nil {
if !tc.expectCreateErr {
tCtx.Fatalf("unexpected create error: %v", createErr)
}
return
}
if tc.expectCreateErr {
tCtx.Fatalf("did not get expected create error")
}
allocation, createErr := controller.Allocate(tCtx, tc.model)
if createErr != nil {
if !tc.expectAllocateErr {
tCtx.Fatalf("unexpected allocate error: %v", createErr)
}
return
}
if tc.expectAllocateErr {
tCtx.Fatalf("did not get expected allocate error")
}
expectAllocation := []*resourceapi.NamedResourcesAllocationResult{}
for _, name := range tc.expectAllocation {
expectAllocation = append(expectAllocation, &resourceapi.NamedResourcesAllocationResult{Name: name})
}
require.Equal(tCtx, expectAllocation, allocation)
isSuitable, isSuitableErr := controller.NodeIsSuitable(tCtx, tc.model)
assert.Equal(tCtx, len(expectAllocation) == len(tc.requests), isSuitable, "is suitable")
assert.Equal(tCtx, createErr, isSuitableErr)
})
}
}
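A rough sketch of the flow TestModel and TestController exercise, written as a hypothetical helper in the same package (it assumes an additional "context" import and that Allocate and NodeIsSuitable accept a plain context.Context, as suggested by the test passing tCtx):

// allocateOne builds a Model from one NamedResourcesResources object and asks
// a claim controller for an allocation, mirroring what the tests above do.
func allocateOne(ctx context.Context, resources *resourceapi.NamedResourcesResources) ([]*resourceapi.NamedResourcesAllocationResult, error) {
	var model Model
	AddResources(&model, resources)

	filter := &resourceapi.NamedResourcesFilter{Selector: "true"}
	requests := []*resourceapi.NamedResourcesRequest{
		{Selector: `attributes.bool["usable"]`},
	}
	controller, err := NewClaimController(filter, requests)
	if err != nil {
		return nil, err
	}

	// NodeIsSuitable answers the scheduler's Filter question for a node;
	// Allocate produces the per-request results once the node is chosen.
	if ok, err := controller.NodeIsSuitable(ctx, model); err != nil || !ok {
		return nil, err
	}
	return controller.Allocate(ctx, model)
}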


@ -25,10 +25,8 @@ import (
resourcev1alpha2 "k8s.io/api/resource/v1alpha2"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime"
resourcev1alpha2listers "k8s.io/client-go/listers/resource/v1alpha2"
"k8s.io/klog/v2"
namedresourcesmodel "k8s.io/kubernetes/pkg/scheduler/framework/plugins/dynamicresources/structured/namedresources"
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/volumebinding"
)
// resources is a map "node name" -> "driver name" -> available and
@ -41,11 +39,22 @@ type ResourceModels struct {
NamedResources namedresourcesmodel.Model
}
// resourceSliceLister is the subset of resourcev1alpha2listers.ResourceSliceLister needed by
// newResourceModel.
type resourceSliceLister interface {
List(selector labels.Selector) (ret []*resourcev1alpha2.ResourceSlice, err error)
}
// assumeCacheLister is the subset of volumebinding.AssumeCache needed by newResourceModel.
type assumeCacheLister interface {
List(indexObj interface{}) []interface{}
}
// newResourceModel parses the available information about resources. Objects
// with an unknown structured parameter model are silently ignored. An error gets
// logged later when parameters required for a pod depend on such an unknown
// model.
func newResourceModel(logger klog.Logger, resourceSliceLister resourcev1alpha2listers.ResourceSliceLister, claimAssumeCache volumebinding.AssumeCache, inFlightAllocations *sync.Map) (resources, error) {
func newResourceModel(logger klog.Logger, resourceSliceLister resourceSliceLister, claimAssumeCache assumeCacheLister, inFlightAllocations *sync.Map) (resources, error) {
model := make(resources)
slices, err := resourceSliceLister.List(labels.Everything())
@ -112,7 +121,7 @@ func newClaimController(logger klog.Logger, class *resourcev1alpha2.ResourceClas
p.parameters = append(p.parameters, request.VendorParameters)
p.requests = append(p.requests, request.ResourceRequestModel.NamedResources)
default:
return nil, fmt.Errorf("claim parameters %s: driverRequersts[%d].requests[%d]: no supported structured parameters found", klog.KObj(claimParameters), i, e)
return nil, fmt.Errorf("claim parameters %s: driverRequests[%d].requests[%d]: no supported structured parameters found", klog.KObj(claimParameters), i, e)
}
}
if len(p.requests) > 0 {

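The two small interfaces above exist so tests can substitute trivial fakes for the informer lister and the assume cache. A hypothetical fake (not part of the diff, assuming the same imports as this file) only has to satisfy the narrow List signature:

// fakeSliceLister is a minimal stand-in for the resourceSliceLister interface
// defined above; tests can hand it a fixed set of slices.
type fakeSliceLister struct {
	slices []*resourcev1alpha2.ResourceSlice
}

func (f fakeSliceLister) List(_ labels.Selector) ([]*resourcev1alpha2.ResourceSlice, error) {
	return f.slices, nil
}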
File diff suppressed because it is too large


@ -938,6 +938,16 @@ func (wrapper *ResourceClaimWrapper) AllocationMode(a resourcev1alpha2.Allocatio
return wrapper
}
// ParametersRef sets a reference to a ResourceClaimParameters.resource.k8s.io.
func (wrapper *ResourceClaimWrapper) ParametersRef(name string) *ResourceClaimWrapper {
wrapper.ResourceClaim.Spec.ParametersRef = &resourcev1alpha2.ResourceClaimParametersReference{
Name: name,
Kind: "ResourceClaimParameters",
APIGroup: "resource.k8s.io",
}
return wrapper
}
// ResourceClassName sets the resource class name of the inner object.
func (wrapper *ResourceClaimWrapper) ResourceClassName(name string) *ResourceClaimWrapper {
wrapper.ResourceClaim.Spec.ResourceClassName = name
@ -945,11 +955,60 @@ func (wrapper *ResourceClaimWrapper) ResourceClassName(name string) *ResourceCla
}
// Allocation sets the allocation of the inner object.
func (wrapper *ResourceClaimWrapper) Allocation(allocation *resourcev1alpha2.AllocationResult) *ResourceClaimWrapper {
func (wrapper *ResourceClaimWrapper) Allocation(driverName string, allocation *resourcev1alpha2.AllocationResult) *ResourceClaimWrapper {
wrapper.ResourceClaim.Status.DriverName = driverName
wrapper.ResourceClaim.Status.Allocation = allocation
return wrapper
}
// Structured turns a "normal" claim into one which was allocated via structured parameters.
// This modifies the allocation result and adds the reserved finalizer if the claim
// is allocated. The claim has to become local to a node. The assumption is that
// "named resources" are used.
func (wrapper *ResourceClaimWrapper) Structured(nodeName string, namedResourcesInstances ...string) *ResourceClaimWrapper {
if wrapper.ResourceClaim.Status.Allocation != nil {
wrapper.ResourceClaim.Finalizers = append(wrapper.ResourceClaim.Finalizers, resourcev1alpha2.Finalizer)
for i, resourceHandle := range wrapper.ResourceClaim.Status.Allocation.ResourceHandles {
resourceHandle.Data = ""
resourceHandle.StructuredData = &resourcev1alpha2.StructuredResourceHandle{
NodeName: nodeName,
}
wrapper.ResourceClaim.Status.Allocation.ResourceHandles[i] = resourceHandle
}
if len(wrapper.ResourceClaim.Status.Allocation.ResourceHandles) == 0 {
wrapper.ResourceClaim.Status.Allocation.ResourceHandles = []resourcev1alpha2.ResourceHandle{{
DriverName: wrapper.ResourceClaim.Status.DriverName,
StructuredData: &resourcev1alpha2.StructuredResourceHandle{
NodeName: nodeName,
},
}}
}
for _, resourceHandle := range wrapper.ResourceClaim.Status.Allocation.ResourceHandles {
for _, name := range namedResourcesInstances {
result := resourcev1alpha2.DriverAllocationResult{
AllocationResultModel: resourcev1alpha2.AllocationResultModel{
NamedResources: &resourcev1alpha2.NamedResourcesAllocationResult{
Name: name,
},
},
}
resourceHandle.StructuredData.Results = append(resourceHandle.StructuredData.Results, result)
}
}
wrapper.ResourceClaim.Status.Allocation.Shareable = true
wrapper.ResourceClaim.Status.Allocation.AvailableOnNodes = &v1.NodeSelector{
NodeSelectorTerms: []v1.NodeSelectorTerm{{
MatchExpressions: []v1.NodeSelectorRequirement{{
Key: "kubernetes.io/hostname",
Operator: v1.NodeSelectorOpIn,
Values: []string{nodeName},
}},
}},
}
}
return wrapper
}
// DeallocationRequested sets that field of the inner object.
func (wrapper *ResourceClaimWrapper) DeallocationRequested(deallocationRequested bool) *ResourceClaimWrapper {
wrapper.ResourceClaim.Status.DeallocationRequested = deallocationRequested
@ -962,6 +1021,11 @@ func (wrapper *ResourceClaimWrapper) ReservedFor(consumers ...resourcev1alpha2.R
return wrapper
}
// ReservedForPod sets the ReservedFor field of the inner object given information about one pod.
func (wrapper *ResourceClaimWrapper) ReservedForPod(podName string, podUID types.UID) *ResourceClaimWrapper {
return wrapper.ReservedFor(resourcev1alpha2.ResourceClaimConsumerReference{Resource: "pods", Name: podName, UID: podUID})
}
// PodSchedulingWrapper wraps a PodSchedulingContext inside.
type PodSchedulingWrapper struct {
resourcev1alpha2.PodSchedulingContext
@ -1041,3 +1105,131 @@ func (wrapper *PodSchedulingWrapper) ResourceClaims(statuses ...resourcev1alpha2
wrapper.Status.ResourceClaims = statuses
return wrapper
}
type ResourceSliceWrapper struct {
resourcev1alpha2.ResourceSlice
}
func MakeResourceSlice(nodeName, driverName string) *ResourceSliceWrapper {
wrapper := new(ResourceSliceWrapper)
wrapper.Name = nodeName + "-" + driverName
wrapper.NodeName = nodeName
wrapper.DriverName = driverName
return wrapper
}
func (wrapper *ResourceSliceWrapper) Obj() *resourcev1alpha2.ResourceSlice {
return &wrapper.ResourceSlice
}
func (wrapper *ResourceSliceWrapper) NamedResourcesInstances(names ...string) *ResourceSliceWrapper {
wrapper.ResourceModel = resourcev1alpha2.ResourceModel{NamedResources: &resourcev1alpha2.NamedResourcesResources{}}
for _, name := range names {
wrapper.ResourceModel.NamedResources.Instances = append(wrapper.ResourceModel.NamedResources.Instances,
resourcev1alpha2.NamedResourcesInstance{Name: name},
)
}
return wrapper
}
type ClaimParametersWrapper struct {
resourcev1alpha2.ResourceClaimParameters
}
func MakeClaimParameters() *ClaimParametersWrapper {
return &ClaimParametersWrapper{}
}
func (wrapper *ClaimParametersWrapper) Obj() *resourcev1alpha2.ResourceClaimParameters {
return &wrapper.ResourceClaimParameters
}
func (wrapper *ClaimParametersWrapper) Name(s string) *ClaimParametersWrapper {
wrapper.SetName(s)
return wrapper
}
func (wrapper *ClaimParametersWrapper) UID(s string) *ClaimParametersWrapper {
wrapper.SetUID(types.UID(s))
return wrapper
}
func (wrapper *ClaimParametersWrapper) Namespace(s string) *ClaimParametersWrapper {
wrapper.SetNamespace(s)
return wrapper
}
func (wrapper *ClaimParametersWrapper) Shareable(value bool) *ClaimParametersWrapper {
wrapper.ResourceClaimParameters.Shareable = value
return wrapper
}
func (wrapper *ClaimParametersWrapper) GeneratedFrom(value *resourcev1alpha2.ResourceClaimParametersReference) *ClaimParametersWrapper {
wrapper.ResourceClaimParameters.GeneratedFrom = value
return wrapper
}
func (wrapper *ClaimParametersWrapper) NamedResourcesRequests(driverName string, selectors ...string) *ClaimParametersWrapper {
requests := resourcev1alpha2.DriverRequests{
DriverName: driverName,
}
for _, selector := range selectors {
request := resourcev1alpha2.ResourceRequest{
ResourceRequestModel: resourcev1alpha2.ResourceRequestModel{
NamedResources: &resourcev1alpha2.NamedResourcesRequest{
Selector: selector,
},
},
}
requests.Requests = append(requests.Requests, request)
}
wrapper.DriverRequests = append(wrapper.DriverRequests, requests)
return wrapper
}
type ClassParametersWrapper struct {
resourcev1alpha2.ResourceClassParameters
}
func MakeClassParameters() *ClassParametersWrapper {
return &ClassParametersWrapper{}
}
func (wrapper *ClassParametersWrapper) Obj() *resourcev1alpha2.ResourceClassParameters {
return &wrapper.ResourceClassParameters
}
func (wrapper *ClassParametersWrapper) Name(s string) *ClassParametersWrapper {
wrapper.SetName(s)
return wrapper
}
func (wrapper *ClassParametersWrapper) UID(s string) *ClassParametersWrapper {
wrapper.SetUID(types.UID(s))
return wrapper
}
func (wrapper *ClassParametersWrapper) Namespace(s string) *ClassParametersWrapper {
wrapper.SetNamespace(s)
return wrapper
}
func (wrapper *ClassParametersWrapper) GeneratedFrom(value *resourcev1alpha2.ResourceClassParametersReference) *ClassParametersWrapper {
wrapper.ResourceClassParameters.GeneratedFrom = value
return wrapper
}
func (wrapper *ClassParametersWrapper) NamedResourcesFilters(driverName string, selectors ...string) *ClassParametersWrapper {
for _, selector := range selectors {
filter := resourcev1alpha2.ResourceFilter{
DriverName: driverName,
ResourceFilterModel: resourcev1alpha2.ResourceFilterModel{
NamedResources: &resourcev1alpha2.NamedResourcesFilter{
Selector: selector,
},
},
}
wrapper.Filters = append(wrapper.Filters, filter)
}
return wrapper
}
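Taken together, the new wrappers compose like this; a hypothetical snippet (not part of the diff) that reuses the placeholder names from the scheduler plugin test above:

package example

import (
	resourcev1alpha2 "k8s.io/api/resource/v1alpha2"
	st "k8s.io/kubernetes/pkg/scheduler/testing"
)

// buildFixtures creates a node-local ResourceSlice plus a claim that looks as
// if the scheduler had allocated it via structured parameters.
func buildFixtures() (*resourcev1alpha2.ResourceSlice, *resourcev1alpha2.ResourceClaim) {
	slice := st.MakeResourceSlice("worker", "some-driver").
		NamedResourcesInstances("instance-1").
		Obj()

	claim := st.MakeResourceClaim().
		Name("my-claim").
		Namespace("default").
		ResourceClassName("my-resource-class").
		Allocation("some-driver", &resourcev1alpha2.AllocationResult{}).
		Structured("worker", "instance-1").
		Obj()

	return slice, claim
}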


@ -28,8 +28,10 @@ import (
"sync"
"time"
"github.com/google/go-cmp/cmp"
"github.com/onsi/ginkgo/v2"
"github.com/onsi/gomega"
"github.com/onsi/gomega/format"
"google.golang.org/grpc"
appsv1 "k8s.io/api/apps/v1"
@ -39,6 +41,8 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/selection"
resourceapiinformer "k8s.io/client-go/informers/resource/v1alpha2"
"k8s.io/client-go/tools/cache"
"k8s.io/dynamic-resource-allocation/kubeletplugin"
"k8s.io/klog/v2"
"k8s.io/kubernetes/test/e2e/dra/test-driver/app"
@ -84,10 +88,56 @@ func NewNodes(f *framework.Framework, minNodes, maxNodes int) *Nodes {
nodes.NodeNames = append(nodes.NodeNames, node.Name)
}
framework.Logf("testing on nodes %v", nodes.NodeNames)
// Watch claims in the namespace. This is useful for monitoring a test
// and enables additional sanity checks.
claimInformer := resourceapiinformer.NewResourceClaimInformer(f.ClientSet, f.Namespace.Name, 100*time.Hour /* resync */, nil)
cancelCtx, cancel := context.WithCancelCause(context.Background())
var wg sync.WaitGroup
ginkgo.DeferCleanup(func() {
cancel(errors.New("test has completed"))
wg.Wait()
})
_, err = claimInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: func(obj any) {
defer ginkgo.GinkgoRecover()
claim := obj.(*resourcev1alpha2.ResourceClaim)
framework.Logf("New claim:\n%s", format.Object(claim, 1))
validateClaim(claim)
},
UpdateFunc: func(oldObj, newObj any) {
defer ginkgo.GinkgoRecover()
oldClaim := oldObj.(*resourcev1alpha2.ResourceClaim)
newClaim := newObj.(*resourcev1alpha2.ResourceClaim)
framework.Logf("Updated claim:\n%s\nDiff:\n%s", format.Object(newClaim, 1), cmp.Diff(oldClaim, newClaim))
validateClaim(newClaim)
},
DeleteFunc: func(obj any) {
defer ginkgo.GinkgoRecover()
claim := obj.(*resourcev1alpha2.ResourceClaim)
framework.Logf("Deleted claim:\n%s", format.Object(claim, 1))
},
})
framework.ExpectNoError(err, "AddEventHandler")
wg.Add(1)
go func() {
defer wg.Done()
claimInformer.Run(cancelCtx.Done())
}()
})
return nodes
}
func validateClaim(claim *resourcev1alpha2.ResourceClaim) {
// The apiserver doesn't enforce that a claim always has a finalizer
// while being allocated. This is a convention that whoever allocates a
// claim has to follow to prevent using a claim that is at risk of
// being deleted.
if claim.Status.Allocation != nil && len(claim.Finalizers) == 0 {
framework.Failf("Invalid claim: allocated without any finalizer:\n%s", format.Object(claim, 1))
}
}
// NewDriver sets up controller (as client of the cluster) and
// kubelet plugin (via proxy) before the test runs. It cleans
// up after the test.


@ -128,8 +128,25 @@ var _ = framework.SIGDescribe("node")("DRA", feature.DynamicResourceAllocation,
// arbitrary types we can simply fake something here.
claim := b.externalClaim(resourcev1alpha2.AllocationModeWaitForFirstConsumer)
b.create(ctx, claim)
claim, err := f.ClientSet.ResourceV1alpha2().ResourceClaims(f.Namespace.Name).Get(ctx, claim.Name, metav1.GetOptions{})
framework.ExpectNoError(err, "get claim")
claim.Finalizers = append(claim.Finalizers, "e2e.test/delete-protection")
claim, err = f.ClientSet.ResourceV1alpha2().ResourceClaims(f.Namespace.Name).Update(ctx, claim, metav1.UpdateOptions{})
framework.ExpectNoError(err, "add claim finalizer")
ginkgo.DeferCleanup(func(ctx context.Context) {
claim.Status.Allocation = nil
claim.Status.ReservedFor = nil
claim, err = f.ClientSet.ResourceV1alpha2().ResourceClaims(f.Namespace.Name).UpdateStatus(ctx, claim, metav1.UpdateOptions{})
framework.ExpectNoError(err, "update claim")
claim.Finalizers = nil
_, err = f.ClientSet.ResourceV1alpha2().ResourceClaims(f.Namespace.Name).Update(ctx, claim, metav1.UpdateOptions{})
framework.ExpectNoError(err, "remove claim finalizer")
})
claim.Status.Allocation = &resourcev1alpha2.AllocationResult{}
claim.Status.DriverName = driver.Name
claim.Status.ReservedFor = append(claim.Status.ReservedFor, resourcev1alpha2.ResourceClaimConsumerReference{
@ -138,7 +155,7 @@ var _ = framework.SIGDescribe("node")("DRA", feature.DynamicResourceAllocation,
Name: "thing",
UID: "12345",
})
_, err = f.ClientSet.ResourceV1alpha2().ResourceClaims(f.Namespace.Name).UpdateStatus(ctx, claim, metav1.UpdateOptions{})
claim, err = f.ClientSet.ResourceV1alpha2().ResourceClaims(f.Namespace.Name).UpdateStatus(ctx, claim, metav1.UpdateOptions{})
framework.ExpectNoError(err, "update claim")
pod := b.podExternal()
@ -925,7 +942,7 @@ var _ = framework.SIGDescribe("node")("DRA", feature.DynamicResourceAllocation,
"NodeName": gomega.Equal(nodeName),
"DriverName": gomega.Equal(driver.Name),
"ResourceModel": gomega.Equal(resourcev1alpha2.ResourceModel{NamedResources: &resourcev1alpha2.NamedResourcesResources{
Instances: []resourcev1alpha2.NamedResourcesInstance{{Name: "instance-0"}},
Instances: []resourcev1alpha2.NamedResourcesInstance{{Name: "instance-00"}},
}}),
}),
)


@ -25,12 +25,14 @@ import (
"path/filepath"
"sync"
"github.com/google/go-cmp/cmp"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
resourceapi "k8s.io/api/resource/v1alpha2"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/dynamic-resource-allocation/kubeletplugin"
"k8s.io/klog/v2"
drapbv1alpha2 "k8s.io/kubelet/pkg/apis/dra/v1alpha2"
@ -46,10 +48,12 @@ type ExamplePlugin struct {
cdiDir string
driverName string
nodeName string
instances sets.Set[string]
mutex sync.Mutex
prepared map[ClaimID]bool
gRPCCalls []GRPCCall
mutex sync.Mutex
instancesInUse sets.Set[string]
prepared map[ClaimID]any
gRPCCalls []GRPCCall
block bool
}
@ -117,13 +121,19 @@ func StartPlugin(ctx context.Context, cdiDir, driverName string, nodeName string
}
}
ex := &ExamplePlugin{
stopCh: ctx.Done(),
logger: logger,
fileOps: fileOps,
cdiDir: cdiDir,
driverName: driverName,
nodeName: nodeName,
prepared: make(map[ClaimID]bool),
stopCh: ctx.Done(),
logger: logger,
fileOps: fileOps,
cdiDir: cdiDir,
driverName: driverName,
nodeName: nodeName,
instances: sets.New[string](),
instancesInUse: sets.New[string](),
prepared: make(map[ClaimID]any),
}
for i := 0; i < ex.fileOps.NumResourceInstances; i++ {
ex.instances.Insert(fmt.Sprintf("instance-%02d", i))
}
opts = append(opts,
@ -174,14 +184,32 @@ func (ex *ExamplePlugin) NodePrepareResource(ctx context.Context, req *drapbv1al
return nil, ctx.Err()
}
ex.mutex.Lock()
defer ex.mutex.Unlock()
deviceName := "claim-" + req.ClaimUid
vendor := ex.driverName
class := "test"
dev := vendor + "/" + class + "=" + deviceName
resp := &drapbv1alpha2.NodePrepareResourceResponse{CdiDevices: []string{dev}}
claimID := ClaimID{Name: req.ClaimName, UID: req.ClaimUid}
if _, ok := ex.prepared[claimID]; ok {
// Idempotent call, nothing to do.
return resp, nil
}
// Determine environment variables.
var p parameters
var resourceHandle any
var instanceNames []string
switch len(req.StructuredResourceHandle) {
case 0:
// Control plane controller did the allocation.
if err := json.Unmarshal([]byte(req.ResourceHandle), &p); err != nil {
return nil, fmt.Errorf("unmarshal resource handle: %w", err)
}
resourceHandle = req.ResourceHandle
case 1:
// Scheduler did the allocation with structured parameters.
handle := req.StructuredResourceHandle[0]
@ -199,7 +227,23 @@ func (ex *ExamplePlugin) NodePrepareResource(ctx context.Context, req *drapbv1al
if err := extractParameters(result.VendorRequestParameters, &p.EnvVars, "user"); err != nil {
return nil, err
}
namedResources := result.NamedResources
if namedResources == nil {
return nil, errors.New("missing named resources allocation result")
}
instanceName := namedResources.Name
if instanceName == "" {
return nil, errors.New("empty named resources instance name")
}
if !ex.instances.Has(instanceName) {
return nil, fmt.Errorf("unknown allocated instance %q", instanceName)
}
if ex.instancesInUse.Has(instanceName) {
return nil, fmt.Errorf("resource instance %q used more than once", instanceName)
}
instanceNames = append(instanceNames, instanceName)
}
resourceHandle = handle
default:
// Huh?
return nil, fmt.Errorf("invalid length of NodePrepareResourceRequest.StructuredResourceHandle: %d", len(req.StructuredResourceHandle))
@ -216,9 +260,6 @@ func (ex *ExamplePlugin) NodePrepareResource(ctx context.Context, req *drapbv1al
envs = append(envs, key+"="+val)
}
deviceName := "claim-" + req.ClaimUid
vendor := ex.driverName
class := "test"
spec := &spec{
Version: "0.3.0", // This has to be a version accepted by the runtimes.
Kind: vendor + "/" + class,
@ -242,12 +283,10 @@ func (ex *ExamplePlugin) NodePrepareResource(ctx context.Context, req *drapbv1al
return nil, fmt.Errorf("failed to write CDI file %v", err)
}
dev := vendor + "/" + class + "=" + deviceName
resp := &drapbv1alpha2.NodePrepareResourceResponse{CdiDevices: []string{dev}}
ex.mutex.Lock()
defer ex.mutex.Unlock()
ex.prepared[ClaimID{Name: req.ClaimName, UID: req.ClaimUid}] = true
ex.prepared[claimID] = resourceHandle
for _, instanceName := range instanceNames {
ex.instancesInUse.Insert(instanceName)
}
logger.V(3).Info("CDI file created", "path", filePath, "device", dev)
return resp, nil
@ -316,7 +355,34 @@ func (ex *ExamplePlugin) NodeUnprepareResource(ctx context.Context, req *drapbv1
ex.mutex.Lock()
defer ex.mutex.Unlock()
delete(ex.prepared, ClaimID{Name: req.ClaimName, UID: req.ClaimUid})
claimID := ClaimID{Name: req.ClaimName, UID: req.ClaimUid}
resp := &drapbv1alpha2.NodeUnprepareResourceResponse{}
expectedResourceHandle, ok := ex.prepared[claimID]
if !ok {
// Idempotent call, nothing to do.
return resp, nil
}
var actualResourceHandle any = req.ResourceHandle
if req.StructuredResourceHandle != nil {
if len(req.StructuredResourceHandle) != 1 {
return nil, fmt.Errorf("unexpected number of entries in StructuredResourceHandle: %d", len(req.StructuredResourceHandle))
}
actualResourceHandle = req.StructuredResourceHandle[0]
}
if diff := cmp.Diff(expectedResourceHandle, actualResourceHandle); diff != "" {
return nil, fmt.Errorf("difference between expected (-) and actual resource handle (+):\n%s", diff)
}
delete(ex.prepared, claimID)
if structuredResourceHandle := req.StructuredResourceHandle; structuredResourceHandle != nil {
for _, handle := range structuredResourceHandle {
for _, result := range handle.Results {
instanceName := result.NamedResources.Name
ex.instancesInUse.Delete(instanceName)
}
}
}
return &drapbv1alpha2.NodeUnprepareResourceResponse{}, nil
}
@ -327,10 +393,11 @@ func (ex *ExamplePlugin) NodeUnprepareResources(ctx context.Context, req *drapbv
}
for _, claimReq := range req.Claims {
_, err := ex.NodeUnprepareResource(ctx, &drapbv1alpha2.NodeUnprepareResourceRequest{
Namespace: claimReq.Namespace,
ClaimName: claimReq.Name,
ClaimUid: claimReq.Uid,
ResourceHandle: claimReq.ResourceHandle,
Namespace: claimReq.Namespace,
ClaimName: claimReq.Name,
ClaimUid: claimReq.Uid,
ResourceHandle: claimReq.ResourceHandle,
StructuredResourceHandle: claimReq.StructuredResourceHandle,
})
if err != nil {
resp.Claims[claimReq.Uid] = &drapbv1alpha3.NodeUnprepareResourceResponse{
@ -349,9 +416,9 @@ func (ex *ExamplePlugin) NodeListAndWatchResources(req *drapbv1alpha3.NodeListAn
return status.New(codes.Unimplemented, "node resource support disabled").Err()
}
instances := make([]resourceapi.NamedResourcesInstance, ex.fileOps.NumResourceInstances)
for i := 0; i < ex.fileOps.NumResourceInstances; i++ {
instances[i].Name = fmt.Sprintf("instance-%d", i)
instances := make([]resourceapi.NamedResourcesInstance, len(ex.instances))
for i, name := range sets.List(ex.instances) {
instances[i].Name = name
}
resp := &drapbv1alpha3.NodeListAndWatchResourcesResponse{
Resources: []*resourceapi.ResourceModel{