Merge pull request #118209 from pohly/dra-pre-scheduled-pods

dra: pre-scheduled pods
Kubernetes Prow Robot 2023-07-13 14:43:37 -07:00 committed by GitHub
commit bea27f82d3
8 changed files with 545 additions and 141 deletions
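
The scenario this change targets is a pod that reaches the API server with spec.nodeName already set, for example because a user created it that way for testing or because a DRA-unaware scheduler placed it; kube-scheduler then never creates a PodSchedulingContext or reserves the claims, so the resource claim controller has to. Below is a minimal sketch of such a pre-scheduled pod with an inline claim template reference (illustrative names, not code from this commit):

package main

import (
	"fmt"

	v1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

func main() {
	// The referenced template is assumed to exist in the namespace.
	templateName := "my-template"
	pod := &v1.Pod{
		ObjectMeta: metav1.ObjectMeta{Name: "pre-scheduled", Namespace: "default"},
		Spec: v1.PodSpec{
			// Setting NodeName directly bypasses kube-scheduler, so nothing
			// else triggers delayed allocation or reservation for the claim.
			NodeName: "worker",
			Containers: []v1.Container{{
				Name:  "ctr",
				Image: "registry.k8s.io/pause:3.9",
			}},
			ResourceClaims: []v1.PodResourceClaim{{
				Name:   "acme-resource",
				Source: v1.ClaimSource{ResourceClaimTemplateName: &templateName},
			}},
		},
	}
	fmt.Println("pre-scheduled pod:", pod.Namespace+"/"+pod.Name, "on node", pod.Spec.NodeName)
}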


@ -356,6 +356,7 @@ func startResourceClaimController(ctx context.Context, controllerContext Control
klog.FromContext(ctx),
controllerContext.ClientBuilder.ClientOrDie("resource-claim-controller"),
controllerContext.InformerFactory.Core().V1().Pods(),
controllerContext.InformerFactory.Resource().V1alpha2().PodSchedulingContexts(),
controllerContext.InformerFactory.Resource().V1alpha2().ResourceClaims(),
controllerContext.InformerFactory.Resource().V1alpha2().ResourceClaimTemplates())
if err != nil {


@ -87,6 +87,13 @@ type Controller struct {
podLister v1listers.PodLister
podSynced cache.InformerSynced
// podSchedulingList is the shared PodSchedulingContext lister used to
// fetch scheduling objects from the API server. It is shared with other
// controllers and therefore the objects in its store should be treated
// as immutable.
podSchedulingLister resourcev1alpha2listers.PodSchedulingContextLister
podSchedulingSynced cache.InformerSynced
// templateLister is the shared ResourceClaimTemplate lister used to
// fetch template objects from the API server. It is shared with other
// controllers and therefore the objects in its store should be treated
@ -119,20 +126,23 @@ func NewController(
logger klog.Logger,
kubeClient clientset.Interface,
podInformer v1informers.PodInformer,
podSchedulingInformer resourcev1alpha2informers.PodSchedulingContextInformer,
claimInformer resourcev1alpha2informers.ResourceClaimInformer,
templateInformer resourcev1alpha2informers.ResourceClaimTemplateInformer) (*Controller, error) {
ec := &Controller{
kubeClient: kubeClient,
podLister: podInformer.Lister(),
podIndexer: podInformer.Informer().GetIndexer(),
podSynced: podInformer.Informer().HasSynced,
claimLister: claimInformer.Lister(),
claimsSynced: claimInformer.Informer().HasSynced,
templateLister: templateInformer.Lister(),
templatesSynced: templateInformer.Informer().HasSynced,
queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "resource_claim"),
deletedObjects: newUIDCache(maxUIDCacheEntries),
kubeClient: kubeClient,
podLister: podInformer.Lister(),
podIndexer: podInformer.Informer().GetIndexer(),
podSynced: podInformer.Informer().HasSynced,
podSchedulingLister: podSchedulingInformer.Lister(),
podSchedulingSynced: podSchedulingInformer.Informer().HasSynced,
claimLister: claimInformer.Lister(),
claimsSynced: claimInformer.Informer().HasSynced,
templateLister: templateInformer.Lister(),
templatesSynced: templateInformer.Informer().HasSynced,
queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "resource_claim"),
deletedObjects: newUIDCache(maxUIDCacheEntries),
}
metrics.RegisterMetrics()
@ -152,13 +162,16 @@ func NewController(
}
if _, err := claimInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
ec.onResourceClaimAddOrUpdate(logger, obj)
logger.V(6).Info("new claim", "claimDump", obj)
ec.enqueueResourceClaim(logger, obj, false)
},
UpdateFunc: func(old, updated interface{}) {
ec.onResourceClaimAddOrUpdate(logger, updated)
logger.V(6).Info("updated claim", "claimDump", updated)
ec.enqueueResourceClaim(logger, updated, false)
},
DeleteFunc: func(obj interface{}) {
ec.onResourceClaimDelete(logger, obj)
logger.V(6).Info("deleted claim", "claimDump", obj)
ec.enqueueResourceClaim(logger, obj, true)
},
}); err != nil {
return nil, err
@ -199,22 +212,24 @@ func (ec *Controller) enqueuePod(logger klog.Logger, obj interface{}, deleted bo
pod, ok := obj.(*v1.Pod)
if !ok {
// Not a pod?!
logger.Error(nil, "enqueuePod called for unexpected object", "type", fmt.Sprintf("%T", obj))
return
}
if deleted {
ec.deletedObjects.Add(pod.UID)
}
if len(pod.Spec.ResourceClaims) == 0 {
// Nothing to do for it at all.
return
}
if deleted {
logger.V(6).Info("pod got deleted", "pod", klog.KObj(pod))
ec.deletedObjects.Add(pod.UID)
}
logger.V(6).Info("pod with resource claims changed", "pod", klog.KObj(pod), "deleted", deleted)
// Release reservations of a deleted or completed pod?
if deleted || isPodDone(pod) {
if needsClaims, reason := podNeedsClaims(pod, deleted); !needsClaims {
for _, podClaim := range pod.Spec.ResourceClaims {
claimName, _, err := resourceclaim.Name(pod, &podClaim)
switch {
@ -222,68 +237,150 @@ func (ec *Controller) enqueuePod(logger klog.Logger, obj interface{}, deleted bo
// Either the claim was not created (nothing to do here) or
// the API changed. The latter will also get reported elsewhere,
// so here it's just a debug message.
klog.TODO().V(6).Info("Nothing to do for claim during pod change", "err", err)
logger.V(6).Info("Nothing to do for claim during pod change", "err", err, "reason", reason)
case claimName != nil:
key := claimKeyPrefix + pod.Namespace + "/" + *claimName
logger.V(6).Info("pod is deleted or done, process claim", "pod", klog.KObj(pod), "key", key)
ec.queue.Add(claimKeyPrefix + pod.Namespace + "/" + *claimName)
logger.V(6).Info("Process claim", "pod", klog.KObj(pod), "key", key, "reason", reason)
ec.queue.Add(key)
default:
// Nothing to do, claim wasn't generated.
klog.TODO().V(6).Info("Nothing to do for skipped claim during pod change")
logger.V(6).Info("Nothing to do for skipped claim during pod change", "reason", reason)
}
}
}
// Create ResourceClaim for inline templates?
if pod.DeletionTimestamp == nil {
for _, podClaim := range pod.Spec.ResourceClaims {
needsWork, reason := ec.podNeedsWork(pod)
if needsWork {
logger.V(6).Info("enqueing pod", "pod", klog.KObj(pod), "reason", reason)
ec.queue.Add(podKeyPrefix + pod.Namespace + "/" + pod.Name)
return
}
logger.V(6).Info("not enqueing pod", "pod", klog.KObj(pod), "reason", reason)
}
func podNeedsClaims(pod *v1.Pod, deleted bool) (bool, string) {
if deleted {
return false, "pod got removed"
}
if podutil.IsPodTerminal(pod) {
return false, "pod has terminated"
}
if pod.DeletionTimestamp != nil && pod.Spec.NodeName == "" {
return false, "pod got deleted before scheduling"
}
// Still needs claims.
return true, "pod might run"
}
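
// Illustrative sketch (not part of this commit): podNeedsClaims is a pure
// function, so the branches above can be pinned down with a small
// table-driven test in this package.
package resourceclaim

import (
	"testing"

	v1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

func TestPodNeedsClaimsSketch(t *testing.T) {
	now := metav1.Now()
	deleting := &v1.Pod{ObjectMeta: metav1.ObjectMeta{DeletionTimestamp: &now}}
	scheduledDeleting := deleting.DeepCopy()
	scheduledDeleting.Spec.NodeName = "worker"
	terminated := &v1.Pod{Status: v1.PodStatus{Phase: v1.PodSucceeded}}
	for name, tc := range map[string]struct {
		pod     *v1.Pod
		deleted bool
		want    bool
	}{
		"removed":                   {pod: &v1.Pod{}, deleted: true, want: false},
		"terminal":                  {pod: terminated, want: false},
		"deleted-before-scheduling": {pod: deleting, want: false},
		"deleted-after-scheduling":  {pod: scheduledDeleting, want: true},
		"might-run":                 {pod: &v1.Pod{}, want: true},
	} {
		if got, reason := podNeedsClaims(tc.pod, tc.deleted); got != tc.want {
			t.Errorf("%s: got %v (%s), want %v", name, got, reason, tc.want)
		}
	}
}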
// podNeedsWork checks whether a new or modified pod needs to be processed
// further by a worker. It returns a boolean with the result and an explanation
// for it.
func (ec *Controller) podNeedsWork(pod *v1.Pod) (bool, string) {
if pod.DeletionTimestamp != nil {
// Nothing else to do for the pod.
return false, "pod is deleted"
}
for _, podClaim := range pod.Spec.ResourceClaims {
claimName, checkOwner, err := resourceclaim.Name(pod, &podClaim)
if err != nil {
return true, err.Error()
}
// If the claimName is nil, then it has been determined before
// that the claim is not needed.
if claimName == nil {
return false, "claim is not needed"
}
claim, err := ec.claimLister.ResourceClaims(pod.Namespace).Get(*claimName)
if apierrors.IsNotFound(err) {
if podClaim.Source.ResourceClaimTemplateName != nil {
// It has at least one inline template, work on it.
key := podKeyPrefix + pod.Namespace + "/" + pod.Name
logger.V(6).Info("pod is not deleted, process it", "pod", klog.KObj(pod), "key", key)
ec.queue.Add(key)
break
return true, "must create ResourceClaim from template"
}
// User needs to create claim.
return false, "claim is missing and must be created by user"
}
if err != nil {
// Shouldn't happen.
return true, fmt.Sprintf("internal error while checking for claim: %v", err)
}
if checkOwner &&
resourceclaim.IsForPod(pod, claim) != nil {
// Cannot proceed with the pod unless that other claim gets deleted.
return false, "conflicting claim needs to be removed by user"
}
// This check skips over the reasons below that only apply
// when a pod has been scheduled already. We need to keep checking
// for more claims that might need to be created.
if pod.Spec.NodeName == "" {
continue
}
// Create PodSchedulingContext if the pod got scheduled without triggering
// delayed allocation.
//
// These can happen when:
// - a user created a pod with spec.nodeName set, perhaps for testing
// - some scheduler was used which is unaware of DRA
// - DRA was not enabled in kube-scheduler (version skew, configuration)
if claim.Spec.AllocationMode == resourcev1alpha2.AllocationModeWaitForFirstConsumer &&
claim.Status.Allocation == nil {
scheduling, err := ec.podSchedulingLister.PodSchedulingContexts(pod.Namespace).Get(pod.Name)
if apierrors.IsNotFound(err) {
return true, "need to create PodSchedulingContext for scheduled pod"
}
if err != nil {
// Shouldn't happen.
return true, fmt.Sprintf("internal error while checking for PodSchedulingContext: %v", err)
}
if scheduling.Spec.SelectedNode != pod.Spec.NodeName {
// Need to update PodSchedulingContext.
return true, "need to updated PodSchedulingContext for scheduled pod"
}
}
if claim.Status.Allocation != nil &&
!resourceclaim.IsReservedForPod(pod, claim) &&
resourceclaim.CanBeReserved(claim) {
// Need to reserve it.
return true, "need to reserve claim for pod"
}
}
return false, "nothing to do"
}
func (ec *Controller) onResourceClaimAddOrUpdate(logger klog.Logger, obj interface{}) {
func (ec *Controller) enqueueResourceClaim(logger klog.Logger, obj interface{}, deleted bool) {
if d, ok := obj.(cache.DeletedFinalStateUnknown); ok {
obj = d.Obj
}
claim, ok := obj.(*resourcev1alpha2.ResourceClaim)
if !ok {
return
}
// When starting up, we have to check all claims to find those with
// stale pods in ReservedFor. During an update, a pod might get added
// that already no longer exists.
key := claimKeyPrefix + claim.Namespace + "/" + claim.Name
logger.V(6).Info("claim is new or updated, process it", "key", key)
ec.queue.Add(key)
}
func (ec *Controller) onResourceClaimDelete(logger klog.Logger, obj interface{}) {
claim, ok := obj.(*resourcev1alpha2.ResourceClaim)
if !ok {
return
if !deleted {
// When starting up, we have to check all claims to find those with
// stale pods in ReservedFor. During an update, a pod might get added
// that already no longer exists.
key := claimKeyPrefix + claim.Namespace + "/" + claim.Name
logger.V(6).Info("enqueing new or updated claim", "claim", klog.KObj(claim), "key", key)
ec.queue.Add(key)
} else {
logger.V(6).Info("not enqueing deleted claim", "claim", klog.KObj(claim))
}
// Someone deleted a ResourceClaim, either intentionally or
// accidentally. If there is a pod referencing it because of
// an inline resource, then we should re-create the ResourceClaim.
// The common indexer does some prefiltering for us by
// limiting the list to those pods which reference
// the ResourceClaim.
// Also check whether this causes work for any of the currently
// known pods which use the ResourceClaim.
objs, err := ec.podIndexer.ByIndex(podResourceClaimIndex, fmt.Sprintf("%s/%s", claim.Namespace, claim.Name))
if err != nil {
runtime.HandleError(fmt.Errorf("listing pods from cache: %v", err))
logger.Error(err, "listing pods from cache")
return
}
if len(objs) == 0 {
logger.V(6).Info("claim got deleted while not needed by any pod, nothing to do", "claim", klog.KObj(claim))
return
}
logger = klog.LoggerWithValues(logger, "claim", klog.KObj(claim))
for _, obj := range objs {
ec.enqueuePod(logger, obj, false)
}
@ -403,6 +500,49 @@ func (ec *Controller) syncPod(ctx context.Context, namespace, name string) error
}
}
if pod.Spec.NodeName == "" {
// Scheduler will handle PodSchedulingContext and reservations.
logger.V(5).Info("nothing to do for pod, scheduler will deal with it")
return nil
}
for _, podClaim := range pod.Spec.ResourceClaims {
claimName, checkOwner, err := resourceclaim.Name(pod, &podClaim)
if err != nil {
return err
}
// If nil, then it has been determined that the claim is not needed
// and can be skipped.
if claimName == nil {
continue
}
claim, err := ec.claimLister.ResourceClaims(pod.Namespace).Get(*claimName)
if apierrors.IsNotFound(err) {
return nil
}
if err != nil {
return fmt.Errorf("retrieve claim: %v", err)
}
if checkOwner {
if err := resourceclaim.IsForPod(pod, claim); err != nil {
return err
}
}
if claim.Spec.AllocationMode == resourcev1alpha2.AllocationModeWaitForFirstConsumer &&
claim.Status.Allocation == nil {
logger.V(5).Info("create PodSchedulingContext because claim needs to be allocated", "resourceClaim", klog.KObj(claim))
return ec.ensurePodSchedulingContext(ctx, pod)
}
if claim.Status.Allocation != nil &&
!resourceclaim.IsReservedForPod(pod, claim) &&
resourceclaim.CanBeReserved(claim) {
logger.V(5).Info("reserve claim for pod", "resourceClaim", klog.KObj(claim))
if err := ec.reserveForPod(ctx, pod, claim); err != nil {
return err
}
}
}
return nil
}
@ -562,6 +702,64 @@ func (ec *Controller) findPodResourceClaim(pod *v1.Pod, podClaim v1.PodResourceC
return nil, nil
}
func (ec *Controller) ensurePodSchedulingContext(ctx context.Context, pod *v1.Pod) error {
scheduling, err := ec.podSchedulingLister.PodSchedulingContexts(pod.Namespace).Get(pod.Name)
if err != nil && !apierrors.IsNotFound(err) {
return fmt.Errorf("retrieve PodSchedulingContext: %v", err)
}
if scheduling == nil {
scheduling = &resourcev1alpha2.PodSchedulingContext{
ObjectMeta: metav1.ObjectMeta{
Name: pod.Name,
Namespace: pod.Namespace,
OwnerReferences: []metav1.OwnerReference{
{
APIVersion: "v1",
Kind: "Pod",
Name: pod.Name,
UID: pod.UID,
Controller: pointer.Bool(true),
},
},
},
Spec: resourcev1alpha2.PodSchedulingContextSpec{
SelectedNode: pod.Spec.NodeName,
// There is no need for negotiation about
// potential and suitable nodes anymore, so
// PotentialNodes can be left empty.
},
}
if _, err := ec.kubeClient.ResourceV1alpha2().PodSchedulingContexts(pod.Namespace).Create(ctx, scheduling, metav1.CreateOptions{}); err != nil {
return fmt.Errorf("create PodSchedulingContext: %v", err)
}
return nil
}
if scheduling.Spec.SelectedNode != pod.Spec.NodeName {
scheduling := scheduling.DeepCopy()
scheduling.Spec.SelectedNode = pod.Spec.NodeName
if _, err := ec.kubeClient.ResourceV1alpha2().PodSchedulingContexts(pod.Namespace).Update(ctx, scheduling, metav1.UpdateOptions{}); err != nil {
return fmt.Errorf("update spec.selectedNode in PodSchedulingContext: %v", err)
}
}
return nil
}
func (ec *Controller) reserveForPod(ctx context.Context, pod *v1.Pod, claim *resourcev1alpha2.ResourceClaim) error {
claim = claim.DeepCopy()
claim.Status.ReservedFor = append(claim.Status.ReservedFor,
resourcev1alpha2.ResourceClaimConsumerReference{
Resource: "pods",
Name: pod.Name,
UID: pod.UID,
})
if _, err := ec.kubeClient.ResourceV1alpha2().ResourceClaims(claim.Namespace).UpdateStatus(ctx, claim, metav1.UpdateOptions{}); err != nil {
return fmt.Errorf("reserve claim for pod: %v", err)
}
return nil
}
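
// Illustrative sketch (not part of this commit): what a caller or test could
// check once the controller has processed a pre-scheduled pod, using only the
// APIs that appear above. The clientset, pod, and claim name are assumed to
// come from the surrounding test setup.
package sketch

import (
	"context"
	"fmt"

	v1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/kubernetes"
	"k8s.io/dynamic-resource-allocation/resourceclaim"
)

func checkPreScheduledPod(ctx context.Context, client kubernetes.Interface, pod *v1.Pod, claimName string) error {
	scheduling, err := client.ResourceV1alpha2().PodSchedulingContexts(pod.Namespace).Get(ctx, pod.Name, metav1.GetOptions{})
	if err != nil {
		return fmt.Errorf("retrieve PodSchedulingContext: %v", err)
	}
	if scheduling.Spec.SelectedNode != pod.Spec.NodeName {
		return fmt.Errorf("selectedNode %q does not match pod node %q", scheduling.Spec.SelectedNode, pod.Spec.NodeName)
	}
	claim, err := client.ResourceV1alpha2().ResourceClaims(pod.Namespace).Get(ctx, claimName, metav1.GetOptions{})
	if err != nil {
		return fmt.Errorf("retrieve claim: %v", err)
	}
	if claim.Status.Allocation != nil && !resourceclaim.IsReservedForPod(pod, claim) {
		return fmt.Errorf("claim %s allocated but not reserved for pod %s", claim.Name, pod.Name)
	}
	return nil
}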
func (ec *Controller) syncClaim(ctx context.Context, namespace, name string) error {
logger := klog.LoggerWithValues(klog.FromContext(ctx), "claim", klog.KRef(namespace, name))
ctx = klog.NewContext(ctx, logger)
@ -716,7 +914,7 @@ func owningPod(claim *resourcev1alpha2.ResourceClaim) (string, types.UID) {
}
// podResourceClaimIndexFunc is an index function that returns ResourceClaim keys (=
// namespace/name) for ResourceClaimTemplates in a given pod.
// namespace/name) for ResourceClaim or ResourceClaimTemplates in a given pod.
func podResourceClaimIndexFunc(obj interface{}) ([]string, error) {
pod, ok := obj.(*v1.Pod)
if !ok {
@ -724,16 +922,14 @@ func podResourceClaimIndexFunc(obj interface{}) ([]string, error) {
}
keys := []string{}
for _, podClaim := range pod.Spec.ResourceClaims {
if podClaim.Source.ResourceClaimTemplateName != nil {
claimName, _, err := resourceclaim.Name(pod, &podClaim)
if err != nil || claimName == nil {
// Index functions are not supposed to fail, the caller will panic.
// For both error reasons (claim not created yet, unknown API)
// we simply don't index.
continue
}
keys = append(keys, fmt.Sprintf("%s/%s", pod.Namespace, *claimName))
claimName, _, err := resourceclaim.Name(pod, &podClaim)
if err != nil || claimName == nil {
// Index functions are not supposed to fail, the caller will panic.
// For both error reasons (claim not created yet, unknown API)
// we simply don't index.
continue
}
keys = append(keys, fmt.Sprintf("%s/%s", pod.Namespace, *claimName))
}
return keys, nil
}
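
// Illustrative sketch (not part of this commit): the index function above gets
// registered on the pod informer so that enqueueResourceClaim (earlier in this
// diff) can find every pod referencing a claim with one ByIndex lookup. The
// index name and claim key below are illustrative, not the constants used by
// this controller.
package main

import (
	"fmt"

	v1 "k8s.io/api/core/v1"
	"k8s.io/client-go/informers"
	"k8s.io/client-go/kubernetes/fake"
	"k8s.io/client-go/tools/cache"
)

const claimIndexName = "pod-resource-claims"

func main() {
	client := fake.NewSimpleClientset()
	factory := informers.NewSharedInformerFactory(client, 0)
	podInformer := factory.Core().V1().Pods().Informer()

	// Register the index; the controller's real function also resolves
	// generated claim names via resourceclaim.Name.
	_ = podInformer.AddIndexers(cache.Indexers{
		claimIndexName: func(obj interface{}) ([]string, error) {
			pod, ok := obj.(*v1.Pod)
			if !ok {
				return nil, nil
			}
			var keys []string
			for _, pc := range pod.Spec.ResourceClaims {
				if pc.Source.ResourceClaimName != nil {
					keys = append(keys, pod.Namespace+"/"+*pc.Source.ResourceClaimName)
				}
			}
			return keys, nil
		},
	})

	// Later: all pods that reference a particular ResourceClaim.
	pods, _ := podInformer.GetIndexer().ByIndex(claimIndexName, "default/my-claim")
	fmt.Println(len(pods), "pods reference default/my-claim")
}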


@ -40,6 +40,7 @@ import (
"k8s.io/klog/v2"
"k8s.io/kubernetes/pkg/controller"
ephemeralvolumemetrics "k8s.io/kubernetes/pkg/controller/resourceclaim/metrics"
"k8s.io/utils/pointer"
)
var (
@ -50,37 +51,53 @@ var (
podResourceClaimName = "acme-resource"
templateName = "my-template"
className = "my-resource-class"
nodeName = "worker"
testPod = makePod(testPodName, testNamespace, testPodUID)
testPodWithResource = makePod(testPodName, testNamespace, testPodUID, *makePodResourceClaim(podResourceClaimName, templateName))
otherTestPod = makePod(testPodName+"-II", testNamespace, testPodUID+"-II")
testClaim = makeClaim(testPodName+"-"+podResourceClaimName, testNamespace, className, makeOwnerReference(testPodWithResource, true))
generatedTestClaim = makeGeneratedClaim(podResourceClaimName, testPodName+"-"+podResourceClaimName, testNamespace, className, 1, makeOwnerReference(testPodWithResource, true))
testClaimReserved = func() *resourcev1alpha2.ResourceClaim {
claim := testClaim.DeepCopy()
claim.Status.ReservedFor = append(claim.Status.ReservedFor,
resourcev1alpha2.ResourceClaimConsumerReference{
Resource: "pods",
Name: testPodWithResource.Name,
UID: testPodWithResource.UID,
},
)
return claim
}()
testClaimReservedTwice = func() *resourcev1alpha2.ResourceClaim {
claim := testClaimReserved.DeepCopy()
claim.Status.ReservedFor = append(claim.Status.ReservedFor,
resourcev1alpha2.ResourceClaimConsumerReference{
Resource: "pods",
Name: otherTestPod.Name,
UID: otherTestPod.UID,
},
)
return claim
}()
testClaim = makeClaim(testPodName+"-"+podResourceClaimName, testNamespace, className, makeOwnerReference(testPodWithResource, true))
testClaimAllocated = allocateClaim(testClaim)
testClaimReserved = reserveClaim(testClaimAllocated, testPodWithResource)
testClaimReservedTwice = reserveClaim(testClaimReserved, otherTestPod)
generatedTestClaim = makeGeneratedClaim(podResourceClaimName, testPodName+"-"+podResourceClaimName, testNamespace, className, 1, makeOwnerReference(testPodWithResource, true))
generatedTestClaimAllocated = allocateClaim(generatedTestClaim)
generatedTestClaimReserved = reserveClaim(generatedTestClaimAllocated, testPodWithResource)
conflictingClaim = makeClaim(testPodName+"-"+podResourceClaimName, testNamespace, className, nil)
otherNamespaceClaim = makeClaim(testPodName+"-"+podResourceClaimName, otherNamespace, className, nil)
template = makeTemplate(templateName, testNamespace, className)
testPodWithNodeName = func() *v1.Pod {
pod := testPodWithResource.DeepCopy()
pod.Spec.NodeName = nodeName
pod.Status.ResourceClaimStatuses = append(pod.Status.ResourceClaimStatuses, v1.PodResourceClaimStatus{
Name: pod.Spec.ResourceClaims[0].Name,
ResourceClaimName: &generatedTestClaim.Name,
})
return pod
}()
podSchedulingContext = resourcev1alpha2.PodSchedulingContext{
ObjectMeta: metav1.ObjectMeta{
Name: testPodName,
Namespace: testNamespace,
OwnerReferences: []metav1.OwnerReference{
{
APIVersion: "v1",
Kind: "Pod",
Name: testPodName,
UID: testPodUID,
Controller: pointer.Bool(true),
},
},
},
Spec: resourcev1alpha2.PodSchedulingContextSpec{
SelectedNode: nodeName,
},
}
)
func init() {
@ -89,17 +106,18 @@ func init() {
func TestSyncHandler(t *testing.T) {
tests := []struct {
name string
key string
claims []*resourcev1alpha2.ResourceClaim
claimsInCache []*resourcev1alpha2.ResourceClaim
pods []*v1.Pod
podsLater []*v1.Pod
templates []*resourcev1alpha2.ResourceClaimTemplate
expectedClaims []resourcev1alpha2.ResourceClaim
expectedStatuses map[string][]v1.PodResourceClaimStatus
expectedError bool
expectedMetrics expectedMetrics
name string
key string
claims []*resourcev1alpha2.ResourceClaim
claimsInCache []*resourcev1alpha2.ResourceClaim
pods []*v1.Pod
podsLater []*v1.Pod
templates []*resourcev1alpha2.ResourceClaimTemplate
expectedClaims []resourcev1alpha2.ResourceClaim
expectedPodSchedulingContexts []resourcev1alpha2.PodSchedulingContext
expectedStatuses map[string][]v1.PodResourceClaimStatus
expectedError bool
expectedMetrics expectedMetrics
}{
{
name: "create",
@ -264,15 +282,35 @@ func TestSyncHandler(t *testing.T) {
expectedMetrics: expectedMetrics{0, 0},
},
{
name: "clear-reserved",
pods: []*v1.Pod{},
key: claimKey(testClaimReserved),
claims: []*resourcev1alpha2.ResourceClaim{testClaimReserved},
expectedClaims: []resourcev1alpha2.ResourceClaim{*testClaim},
name: "clear-reserved-delayed-allocation",
pods: []*v1.Pod{},
key: claimKey(testClaimReserved),
claims: []*resourcev1alpha2.ResourceClaim{testClaimReserved},
expectedClaims: func() []resourcev1alpha2.ResourceClaim {
claim := testClaimAllocated.DeepCopy()
claim.Status.DeallocationRequested = true
return []resourcev1alpha2.ResourceClaim{*claim}
}(),
expectedMetrics: expectedMetrics{0, 0},
},
{
name: "clear-reserved-when-done",
name: "clear-reserved-immediate-allocation",
pods: []*v1.Pod{},
key: claimKey(testClaimReserved),
claims: func() []*resourcev1alpha2.ResourceClaim {
claim := testClaimReserved.DeepCopy()
claim.Spec.AllocationMode = resourcev1alpha2.AllocationModeImmediate
return []*resourcev1alpha2.ResourceClaim{claim}
}(),
expectedClaims: func() []resourcev1alpha2.ResourceClaim {
claim := testClaimAllocated.DeepCopy()
claim.Spec.AllocationMode = resourcev1alpha2.AllocationModeImmediate
return []resourcev1alpha2.ResourceClaim{*claim}
}(),
expectedMetrics: expectedMetrics{0, 0},
},
{
name: "clear-reserved-when-done-delayed-allocation",
pods: func() []*v1.Pod {
pods := []*v1.Pod{testPodWithResource.DeepCopy()}
pods[0].Status.Phase = v1.PodSucceeded
@ -285,9 +323,31 @@ func TestSyncHandler(t *testing.T) {
return claims
}(),
expectedClaims: func() []resourcev1alpha2.ResourceClaim {
claims := []resourcev1alpha2.ResourceClaim{*testClaimReserved.DeepCopy()}
claims := []resourcev1alpha2.ResourceClaim{*testClaimAllocated.DeepCopy()}
claims[0].OwnerReferences = nil
claims[0].Status.ReservedFor = nil
claims[0].Status.DeallocationRequested = true
return claims
}(),
expectedMetrics: expectedMetrics{0, 0},
},
{
name: "clear-reserved-when-done-immediate-allocation",
pods: func() []*v1.Pod {
pods := []*v1.Pod{testPodWithResource.DeepCopy()}
pods[0].Status.Phase = v1.PodSucceeded
return pods
}(),
key: claimKey(testClaimReserved),
claims: func() []*resourcev1alpha2.ResourceClaim {
claims := []*resourcev1alpha2.ResourceClaim{testClaimReserved.DeepCopy()}
claims[0].OwnerReferences = nil
claims[0].Spec.AllocationMode = resourcev1alpha2.AllocationModeImmediate
return claims
}(),
expectedClaims: func() []resourcev1alpha2.ResourceClaim {
claims := []resourcev1alpha2.ResourceClaim{*testClaimAllocated.DeepCopy()}
claims[0].OwnerReferences = nil
claims[0].Spec.AllocationMode = resourcev1alpha2.AllocationModeImmediate
return claims
}(),
expectedMetrics: expectedMetrics{0, 0},
@ -312,6 +372,35 @@ func TestSyncHandler(t *testing.T) {
expectedClaims: nil,
expectedMetrics: expectedMetrics{0, 0},
},
{
name: "trigger-allocation",
pods: []*v1.Pod{testPodWithNodeName},
key: podKey(testPodWithNodeName),
templates: []*resourcev1alpha2.ResourceClaimTemplate{template},
claims: []*resourcev1alpha2.ResourceClaim{generatedTestClaim},
expectedClaims: []resourcev1alpha2.ResourceClaim{*generatedTestClaim},
expectedStatuses: map[string][]v1.PodResourceClaimStatus{
testPodWithNodeName.Name: {
{Name: testPodWithNodeName.Spec.ResourceClaims[0].Name, ResourceClaimName: &generatedTestClaim.Name},
},
},
expectedPodSchedulingContexts: []resourcev1alpha2.PodSchedulingContext{podSchedulingContext},
expectedMetrics: expectedMetrics{0, 0},
},
{
name: "add-reserved",
pods: []*v1.Pod{testPodWithNodeName},
key: podKey(testPodWithNodeName),
templates: []*resourcev1alpha2.ResourceClaimTemplate{template},
claims: []*resourcev1alpha2.ResourceClaim{generatedTestClaimAllocated},
expectedClaims: []resourcev1alpha2.ResourceClaim{*generatedTestClaimReserved},
expectedStatuses: map[string][]v1.PodResourceClaimStatus{
testPodWithNodeName.Name: {
{Name: testPodWithNodeName.Spec.ResourceClaims[0].Name, ResourceClaimName: &generatedTestClaim.Name},
},
},
expectedMetrics: expectedMetrics{0, 0},
},
}
for _, tc := range tests {
@ -340,10 +429,11 @@ func TestSyncHandler(t *testing.T) {
setupMetrics()
informerFactory := informers.NewSharedInformerFactory(fakeKubeClient, controller.NoResyncPeriodFunc())
podInformer := informerFactory.Core().V1().Pods()
podSchedulingInformer := informerFactory.Resource().V1alpha2().PodSchedulingContexts()
claimInformer := informerFactory.Resource().V1alpha2().ResourceClaims()
templateInformer := informerFactory.Resource().V1alpha2().ResourceClaimTemplates()
ec, err := NewController(klog.TODO(), fakeKubeClient, podInformer, claimInformer, templateInformer)
ec, err := NewController(klog.FromContext(ctx), fakeKubeClient, podInformer, podSchedulingInformer, claimInformer, templateInformer)
if err != nil {
t.Fatalf("error creating ephemeral controller : %v", err)
}
@ -402,6 +492,12 @@ func TestSyncHandler(t *testing.T) {
}
assert.Equal(t, tc.expectedStatuses, actualStatuses, "pod resource claim statuses")
scheduling, err := fakeKubeClient.ResourceV1alpha2().PodSchedulingContexts("").List(ctx, metav1.ListOptions{})
if err != nil {
t.Fatalf("unexpected error while listing claims: %v", err)
}
assert.Equal(t, normalizeScheduling(tc.expectedPodSchedulingContexts), normalizeScheduling(scheduling.Items))
expectMetrics(t, tc.expectedMetrics)
})
}
@ -412,6 +508,7 @@ func makeClaim(name, namespace, classname string, owner *metav1.OwnerReference)
ObjectMeta: metav1.ObjectMeta{Name: name, Namespace: namespace},
Spec: resourcev1alpha2.ResourceClaimSpec{
ResourceClassName: classname,
AllocationMode: resourcev1alpha2.AllocationModeWaitForFirstConsumer,
},
}
if owner != nil {
@ -431,6 +528,7 @@ func makeGeneratedClaim(podClaimName, generateName, namespace, classname string,
},
Spec: resourcev1alpha2.ResourceClaimSpec{
ResourceClassName: classname,
AllocationMode: resourcev1alpha2.AllocationModeWaitForFirstConsumer,
},
}
if owner != nil {
@ -440,6 +538,26 @@ func makeGeneratedClaim(podClaimName, generateName, namespace, classname string,
return claim
}
func allocateClaim(claim *resourcev1alpha2.ResourceClaim) *resourcev1alpha2.ResourceClaim {
claim = claim.DeepCopy()
claim.Status.Allocation = &resourcev1alpha2.AllocationResult{
Shareable: true,
}
return claim
}
func reserveClaim(claim *resourcev1alpha2.ResourceClaim, pod *v1.Pod) *resourcev1alpha2.ResourceClaim {
claim = claim.DeepCopy()
claim.Status.ReservedFor = append(claim.Status.ReservedFor,
resourcev1alpha2.ResourceClaimConsumerReference{
Resource: "pods",
Name: pod.Name,
UID: pod.UID,
},
)
return claim
}
func makePodResourceClaim(name, templateName string) *v1.PodResourceClaim {
return &v1.PodResourceClaim{
Name: name,
@ -506,10 +624,22 @@ func normalizeClaims(claims []resourcev1alpha2.ResourceClaim) []resourcev1alpha2
if len(claims[i].Status.ReservedFor) == 0 {
claims[i].Status.ReservedFor = nil
}
if claims[i].Spec.AllocationMode == "" {
// This emulates defaulting.
claims[i].Spec.AllocationMode = resourcev1alpha2.AllocationModeWaitForFirstConsumer
}
}
return claims
}
func normalizeScheduling(scheduling []resourcev1alpha2.PodSchedulingContext) []resourcev1alpha2.PodSchedulingContext {
sort.Slice(scheduling, func(i, j int) bool {
if scheduling[i].Namespace != scheduling[j].Namespace {
return scheduling[i].Namespace < scheduling[j].Namespace
}
return scheduling[i].Name < scheduling[j].Name
})
return scheduling
}
func createTestClient(objects ...runtime.Object) *fake.Clientset {
fakeClient := fake.NewSimpleClientset(objects...)
fakeClient.PrependReactor("create", "resourceclaims", createResourceClaimReactor())


@ -213,6 +213,7 @@ func buildControllerRoles() ([]rbacv1.ClusterRole, []rbacv1.ClusterRoleBinding)
rbacv1helpers.NewRule("get", "list", "watch").Groups(legacyGroup).Resources("pods").RuleOrDie(),
rbacv1helpers.NewRule("update").Groups(legacyGroup).Resources("pods/finalizers").RuleOrDie(),
rbacv1helpers.NewRule("get", "list", "watch", "create", "delete").Groups(resourceGroup).Resources("resourceclaims").RuleOrDie(),
rbacv1helpers.NewRule("get", "list", "watch", "create", "update", "patch").Groups(resourceGroup).Resources("podschedulingcontexts").RuleOrDie(),
rbacv1helpers.NewRule("update", "patch").Groups(resourceGroup).Resources("resourceclaims/status").RuleOrDie(),
rbacv1helpers.NewRule("update", "patch").Groups(legacyGroup).Resources("pods/status").RuleOrDie(),
eventsRule(),


@ -50,6 +50,14 @@ import (
type Controller interface {
// Run starts the controller.
Run(workers int)
// SetReservedFor can be used to disable adding the Pod which
// triggered allocation to the status.reservedFor. Normally,
// DRA drivers should always do that, so it's the default.
// But nothing in the protocol between the scheduler and
// a driver requires it, so at least for testing the control
// plane components it is useful to disable it.
SetReservedFor(enabled bool)
}
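
// Illustrative sketch (not part of this commit): a test driver can opt out of
// automatic reservation right after constructing the controller; the example
// driver later in this diff wires the same call through
// Resources.DontSetReservedFor.
package sketch

import (
	"context"

	"k8s.io/client-go/informers"
	"k8s.io/client-go/kubernetes"
	"k8s.io/dynamic-resource-allocation/controller"
)

// driver is assumed to be some Driver implementation provided elsewhere.
func startTestDriver(ctx context.Context, driverName string, driver controller.Driver, client kubernetes.Interface) {
	informerFactory := informers.NewSharedInformerFactory(client, 0)
	ctrl := controller.New(ctx, driverName, driver, client, informerFactory)
	// Allocate claims, but leave status.reservedFor to the scheduler and the
	// resource claim controller so that their reservation path gets exercised.
	ctrl.SetReservedFor(false)
	informerFactory.Start(ctx.Done())
	ctrl.Run(1)
}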
// Driver provides the actual allocation and deallocation operations.
@ -146,6 +154,7 @@ type controller struct {
name string
finalizer string
driver Driver
setReservedFor bool
kubeClient kubernetes.Interface
queue workqueue.RateLimitingInterface
eventRecorder record.EventRecorder
@ -207,6 +216,7 @@ func New(
name: name,
finalizer: name + "/deletion-protection",
driver: driver,
setReservedFor: true,
kubeClient: kubeClient,
rcLister: rcInformer.Lister(),
rcSynced: rcInformer.Informer().HasSynced,
@ -232,6 +242,10 @@ func New(
return ctrl
}
func (ctrl *controller) SetReservedFor(enabled bool) {
ctrl.setReservedFor = enabled
}
func resourceEventHandlerFuncs(logger *klog.Logger, ctrl *controller) cache.ResourceEventHandlerFuncs {
return cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
@ -609,7 +623,7 @@ func (ctrl *controller) allocateClaims(ctx context.Context, claims []*ClaimAlloc
claim := claimAllocation.Claim.DeepCopy()
claim.Status.Allocation = claimAllocation.Allocation
claim.Status.DriverName = ctrl.name
if selectedUser != nil {
if selectedUser != nil && ctrl.setReservedFor {
claim.Status.ReservedFor = append(claim.Status.ReservedFor, *selectedUser)
}
logger.V(6).Info("Updating claim after allocation", "claim", claim)


@ -98,16 +98,31 @@ var _ = ginkgo.Describe("[sig-node] DRA [Feature:DynamicResourceAllocation]", fu
})
ginkgo.It("must not run a pod if a claim is not reserved for it", func(ctx context.Context) {
parameters := b.parameters()
claim := b.externalClaim(resourcev1alpha2.AllocationModeImmediate)
// Pretend that the resource is allocated and reserved for some other entity.
// Until the resourceclaim controller learns to remove reservations for
// arbitrary types, we can simply fake something here.
claim := b.externalClaim(resourcev1alpha2.AllocationModeWaitForFirstConsumer)
b.create(ctx, claim)
claim, err := f.ClientSet.ResourceV1alpha2().ResourceClaims(f.Namespace.Name).Get(ctx, claim.Name, metav1.GetOptions{})
framework.ExpectNoError(err, "get claim")
claim.Status.Allocation = &resourcev1alpha2.AllocationResult{}
claim.Status.DriverName = driver.Name
claim.Status.ReservedFor = append(claim.Status.ReservedFor, resourcev1alpha2.ResourceClaimConsumerReference{
APIGroup: "example.com",
Resource: "some",
Name: "thing",
UID: "12345",
})
_, err = f.ClientSet.ResourceV1alpha2().ResourceClaims(f.Namespace.Name).UpdateStatus(ctx, claim, metav1.UpdateOptions{})
framework.ExpectNoError(err, "update claim")
pod := b.podExternal()
// This bypasses scheduling and therefore the pod gets
// to run on the node although it never gets added to
// the `ReservedFor` field of the claim.
pod.Spec.NodeName = nodes.NodeNames[0]
b.create(ctx, parameters, claim, pod)
b.create(ctx, pod)
gomega.Consistently(func() error {
testPod, err := b.f.ClientSet.CoreV1().Pods(pod.Namespace).Get(context.TODO(), pod.Name, metav1.GetOptions{})
@ -178,7 +193,7 @@ var _ = ginkgo.Describe("[sig-node] DRA [Feature:DynamicResourceAllocation]", fu
})
ginkgo.Context("cluster", func() {
nodes := NewNodes(f, 1, 4)
nodes := NewNodes(f, 1, 1)
driver := NewDriver(f, nodes, networkResources)
b := newBuilder(f, driver)
@ -210,10 +225,14 @@ var _ = ginkgo.Describe("[sig-node] DRA [Feature:DynamicResourceAllocation]", fu
framework.ExpectNoError(err)
framework.ExpectNoError(e2epod.WaitForPodRunningInNamespace(ctx, f.ClientSet, pod))
})
})
ginkgo.Context("cluster", func() {
nodes := NewNodes(f, 1, 4)
// claimTests tries out several different combinations of pods with
// claims, both inline and external.
claimTests := func(allocationMode resourcev1alpha2.AllocationMode) {
claimTests := func(b *builder, driver *Driver, allocationMode resourcev1alpha2.AllocationMode) {
ginkgo.It("supports simple pod referencing inline resource claim", func(ctx context.Context) {
parameters := b.parameters()
pod, template := b.podInline(allocationMode)
@ -322,35 +341,75 @@ var _ = ginkgo.Describe("[sig-node] DRA [Feature:DynamicResourceAllocation]", fu
}).WithTimeout(f.Timeouts.PodStartSlow).Should(gomega.HaveField("Status.ContainerStatuses", gomega.ContainElements(gomega.HaveField("RestartCount", gomega.BeNumerically(">=", 2)))))
gomega.Expect(driver.Controller.GetNumAllocations()).To(gomega.Equal(int64(1)), "number of allocations")
})
ginkgo.It("must deallocate after use when using delayed allocation", func(ctx context.Context) {
parameters := b.parameters()
pod := b.podExternal()
claim := b.externalClaim(resourcev1alpha2.AllocationModeWaitForFirstConsumer)
b.create(ctx, parameters, claim, pod)
gomega.Eventually(ctx, func(ctx context.Context) (*resourcev1alpha2.ResourceClaim, error) {
return b.f.ClientSet.ResourceV1alpha2().ResourceClaims(b.f.Namespace.Name).Get(ctx, claim.Name, metav1.GetOptions{})
}).WithTimeout(f.Timeouts.PodDelete).ShouldNot(gomega.HaveField("Status.Allocation", (*resourcev1alpha2.AllocationResult)(nil)))
b.testPod(ctx, f.ClientSet, pod)
ginkgo.By(fmt.Sprintf("deleting pod %s", klog.KObj(pod)))
framework.ExpectNoError(b.f.ClientSet.CoreV1().Pods(b.f.Namespace.Name).Delete(ctx, pod.Name, metav1.DeleteOptions{}))
ginkgo.By("waiting for claim to get deallocated")
gomega.Eventually(ctx, func(ctx context.Context) (*resourcev1alpha2.ResourceClaim, error) {
return b.f.ClientSet.ResourceV1alpha2().ResourceClaims(b.f.Namespace.Name).Get(ctx, claim.Name, metav1.GetOptions{})
}).WithTimeout(f.Timeouts.PodDelete).Should(gomega.HaveField("Status.Allocation", (*resourcev1alpha2.AllocationResult)(nil)))
})
// kube-controller-manager can trigger delayed allocation for pods where the
// node name was already selected when creating the pod. For immediate
// allocation, the creator has to ensure that the node matches the claims.
// This does not work for resource claim templates; it is not a problem
// here only because the resource is network-attached and available
// on all nodes.
ginkgo.It("supports scheduled pod referencing inline resource claim", func(ctx context.Context) {
parameters := b.parameters()
pod, template := b.podInline(allocationMode)
pod.Spec.NodeName = nodes.NodeNames[0]
b.create(ctx, parameters, pod, template)
b.testPod(ctx, f.ClientSet, pod)
})
ginkgo.It("supports scheduled pod referencing external resource claim", func(ctx context.Context) {
parameters := b.parameters()
claim := b.externalClaim(allocationMode)
pod := b.podExternal()
pod.Spec.NodeName = nodes.NodeNames[0]
b.create(ctx, parameters, claim, pod)
b.testPod(ctx, f.ClientSet, pod)
})
}
ginkgo.Context("with delayed allocation", func() {
claimTests(resourcev1alpha2.AllocationModeWaitForFirstConsumer)
ginkgo.Context("with delayed allocation and setting ReservedFor", func() {
driver := NewDriver(f, nodes, networkResources)
b := newBuilder(f, driver)
claimTests(b, driver, resourcev1alpha2.AllocationModeWaitForFirstConsumer)
})
ginkgo.Context("with delayed allocation and not setting ReservedFor", func() {
driver := NewDriver(f, nodes, func() app.Resources {
resources := networkResources()
resources.DontSetReservedFor = true
return resources
})
b := newBuilder(f, driver)
claimTests(b, driver, resourcev1alpha2.AllocationModeWaitForFirstConsumer)
})
ginkgo.Context("with immediate allocation", func() {
claimTests(resourcev1alpha2.AllocationModeImmediate)
})
ginkgo.It("must deallocate after use when using delayed allocation", func(ctx context.Context) {
parameters := b.parameters()
pod := b.podExternal()
claim := b.externalClaim(resourcev1alpha2.AllocationModeWaitForFirstConsumer)
b.create(ctx, parameters, claim, pod)
gomega.Eventually(ctx, func(ctx context.Context) (*resourcev1alpha2.ResourceClaim, error) {
return b.f.ClientSet.ResourceV1alpha2().ResourceClaims(b.f.Namespace.Name).Get(ctx, claim.Name, metav1.GetOptions{})
}).WithTimeout(f.Timeouts.PodDelete).ShouldNot(gomega.HaveField("Status.Allocation", (*resourcev1alpha2.AllocationResult)(nil)))
b.testPod(ctx, f.ClientSet, pod)
ginkgo.By(fmt.Sprintf("deleting pod %s", klog.KObj(pod)))
framework.ExpectNoError(b.f.ClientSet.CoreV1().Pods(b.f.Namespace.Name).Delete(ctx, pod.Name, metav1.DeleteOptions{}))
ginkgo.By("waiting for claim to get deallocated")
gomega.Eventually(ctx, func(ctx context.Context) (*resourcev1alpha2.ResourceClaim, error) {
return b.f.ClientSet.ResourceV1alpha2().ResourceClaims(b.f.Namespace.Name).Get(ctx, claim.Name, metav1.GetOptions{})
}).WithTimeout(f.Timeouts.PodDelete).Should(gomega.HaveField("Status.Allocation", (*resourcev1alpha2.AllocationResult)(nil)))
driver := NewDriver(f, nodes, networkResources)
b := newBuilder(f, driver)
claimTests(b, driver, resourcev1alpha2.AllocationModeImmediate)
})
})


@ -38,10 +38,11 @@ import (
)
type Resources struct {
NodeLocal bool
Nodes []string
MaxAllocations int
Shareable bool
DontSetReservedFor bool
NodeLocal bool
Nodes []string
MaxAllocations int
Shareable bool
// AllocateWrapper, if set, gets called for each Allocate call.
AllocateWrapper AllocateWrapperType
@ -80,6 +81,7 @@ func NewController(clientset kubernetes.Interface, driverName string, resources
func (c *ExampleController) Run(ctx context.Context, workers int) {
informerFactory := informers.NewSharedInformerFactory(c.clientset, 0 /* resync period */)
ctrl := controller.New(ctx, c.driverName, c, c.clientset, informerFactory)
ctrl.SetReservedFor(!c.resources.DontSetReservedFor)
informerFactory.Start(ctx.Done())
ctrl.Run(workers)
// If we get here, the context was canceled and we can wait for informer factory goroutines.


@ -121,9 +121,10 @@ func StartScheduler(ctx context.Context, clientSet clientset.Interface, kubeConf
func CreateResourceClaimController(ctx context.Context, tb testing.TB, clientSet clientset.Interface, informerFactory informers.SharedInformerFactory) func() {
podInformer := informerFactory.Core().V1().Pods()
schedulingInformer := informerFactory.Resource().V1alpha2().PodSchedulingContexts()
claimInformer := informerFactory.Resource().V1alpha2().ResourceClaims()
claimTemplateInformer := informerFactory.Resource().V1alpha2().ResourceClaimTemplates()
claimController, err := resourceclaim.NewController(klog.FromContext(ctx), clientSet, podInformer, claimInformer, claimTemplateInformer)
claimController, err := resourceclaim.NewController(klog.FromContext(ctx), clientSet, podInformer, schedulingInformer, claimInformer, claimTemplateInformer)
if err != nil {
tb.Fatalf("Error creating claim controller: %v", err)
}