diff --git a/test/e2e/dra/deploy.go b/test/e2e/dra/deploy.go
index 2eab75dc589..71378705ebb 100644
--- a/test/e2e/dra/deploy.go
+++ b/test/e2e/dra/deploy.go
@@ -136,6 +136,7 @@ func (d *Driver) SetUp(nodes *Nodes, resources app.Resources) {
 	ginkgo.By(fmt.Sprintf("deploying driver on nodes %v", nodes.NodeNames))
 	d.Nodes = map[string]*app.ExamplePlugin{}
 	d.Name = d.f.UniqueName + d.NameSuffix + ".k8s.io"
+	resources.DriverName = d.Name
 
 	ctx, cancel := context.WithCancel(context.Background())
 	if d.NameSuffix != "" {
@@ -147,7 +148,7 @@
 	d.cleanup = append(d.cleanup, cancel)
 
 	// The controller is easy: we simply connect to the API server.
-	d.Controller = app.NewController(d.f.ClientSet, d.Name, resources)
+	d.Controller = app.NewController(d.f.ClientSet, resources)
 	d.wg.Add(1)
 	go func() {
 		defer d.wg.Done()
diff --git a/test/e2e/dra/test-driver/app/controller.go b/test/e2e/dra/test-driver/app/controller.go
index 68f7d90fb58..c647796678f 100644
--- a/test/e2e/dra/test-driver/app/controller.go
+++ b/test/e2e/dra/test-driver/app/controller.go
@@ -30,24 +30,101 @@ import (
 	v1 "k8s.io/api/core/v1"
 	resourcev1alpha2 "k8s.io/api/resource/v1alpha2"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	"k8s.io/apimachinery/pkg/labels"
 	"k8s.io/apimachinery/pkg/types"
 	"k8s.io/client-go/informers"
 	"k8s.io/client-go/kubernetes"
+	listersv1 "k8s.io/client-go/listers/core/v1"
 	"k8s.io/dynamic-resource-allocation/controller"
 	"k8s.io/klog/v2"
 )
 
 type Resources struct {
+	DriverName         string
 	DontSetReservedFor bool
 	NodeLocal          bool
-	Nodes              []string
-	MaxAllocations     int
-	Shareable          bool
+	// Nodes is a fixed list of node names on which resources are
+	// available. Mutually exclusive with NodeLabels.
+	Nodes []string
+	// NodeLabels are labels which determine on which nodes resources are
+	// available. Mutually exclusive with Nodes.
+	NodeLabels labels.Set
+	MaxAllocations int
+	Shareable      bool
 	// AllocateWrapper, if set, gets called for each Allocate call.
 	AllocateWrapper AllocateWrapperType
 }
 
+func (r Resources) AllNodes(nodeLister listersv1.NodeLister) []string {
+	if len(r.NodeLabels) > 0 {
+		// Determine nodes with resources dynamically.
+		nodes, _ := nodeLister.List(labels.SelectorFromValidatedSet(r.NodeLabels))
+		nodeNames := make([]string, 0, len(nodes))
+		for _, node := range nodes {
+			nodeNames = append(nodeNames, node.Name)
+		}
+		return nodeNames
+	}
+	return r.Nodes
+}
+
+func (r Resources) NewAllocation(node string, data []byte) *resourcev1alpha2.AllocationResult {
+	allocation := &resourcev1alpha2.AllocationResult{
+		Shareable: r.Shareable,
+	}
+	allocation.ResourceHandles = []resourcev1alpha2.ResourceHandle{
+		{
+			DriverName: r.DriverName,
+			Data:       string(data),
+		},
+	}
+	if node == "" && len(r.NodeLabels) > 0 {
+		// Available on all nodes matching the labels.
+		var requirements []v1.NodeSelectorRequirement
+		for key, value := range r.NodeLabels {
+			requirements = append(requirements, v1.NodeSelectorRequirement{
+				Key:      key,
+				Operator: v1.NodeSelectorOpIn,
+				Values:   []string{value},
+			})
+		}
+		allocation.AvailableOnNodes = &v1.NodeSelector{
+			NodeSelectorTerms: []v1.NodeSelectorTerm{
+				{
+					MatchExpressions: requirements,
+				},
+			},
+		}
+	} else {
+		var nodes []string
+		if node != "" {
+			// Local to one node.
+			nodes = append(nodes, node)
+		} else {
+			// Available on the fixed set of nodes.
+			nodes = r.Nodes
+		}
+		if len(nodes) > 0 {
+			allocation.AvailableOnNodes = &v1.NodeSelector{
+				NodeSelectorTerms: []v1.NodeSelectorTerm{
+					{
+						MatchExpressions: []v1.NodeSelectorRequirement{
+							{
+								Key:      "kubernetes.io/hostname",
+								Operator: v1.NodeSelectorOpIn,
+								Values:   nodes,
+							},
+						},
+					},
+				},
+			}
+		}
+	}
+
+	return allocation
+}
+
 type AllocateWrapperType func(ctx context.Context, claimAllocations []*controller.ClaimAllocation,
 	selectedNode string,
 	handler func(ctx context.Context,
@@ -57,8 +134,8 @@ type AllocateWrapperType func(ctx context.Context, claimAllocations []*controlle
 
 type ExampleController struct {
 	clientset kubernetes.Interface
+	nodeLister listersv1.NodeLister
 	resources Resources
-	driverName string
 
 	mutex sync.Mutex
 	// allocated maps claim.UID to the node (if network-attached) or empty (if not).
@@ -70,11 +147,10 @@ type ExampleController struct {
 	numAllocations, numDeallocations int64
 }
 
-func NewController(clientset kubernetes.Interface, driverName string, resources Resources) *ExampleController {
+func NewController(clientset kubernetes.Interface, resources Resources) *ExampleController {
 	c := &ExampleController{
-		clientset:  clientset,
-		resources:  resources,
-		driverName: driverName,
+		clientset: clientset,
+		resources: resources,
 
 		allocated:     make(map[types.UID]string),
 		claimsPerNode: make(map[string]int),
@@ -84,7 +160,8 @@ func (c *ExampleController) Run(ctx context.Context, workers int) {
 	informerFactory := informers.NewSharedInformerFactory(c.clientset, 0 /* resync period */)
-	ctrl := controller.New(ctx, c.driverName, c, c.clientset, informerFactory)
+	ctrl := controller.New(ctx, c.resources.DriverName, c, c.clientset, informerFactory)
+	c.nodeLister = informerFactory.Core().V1().Nodes().Lister()
 	ctrl.SetReservedFor(!c.resources.DontSetReservedFor)
 	informerFactory.Start(ctx.Done())
 	ctrl.Run(workers)
@@ -190,13 +267,14 @@ func (c *ExampleController) allocateOne(ctx context.Context, claim *resourcev1al
 		logger.V(3).Info("already allocated")
 	} else {
 		logger.V(3).Info("starting", "selectedNode", selectedNode)
+		nodes := c.resources.AllNodes(c.nodeLister)
 		if c.resources.NodeLocal {
 			node = selectedNode
 			if node == "" {
 				// If none has been selected because we do immediate allocation,
 				// then we need to pick one ourselves.
 				var viableNodes []string
-				for _, n := range c.resources.Nodes {
+				for _, n := range nodes {
 					if c.resources.MaxAllocations == 0 ||
 						c.claimsPerNode[n] < c.resources.MaxAllocations {
 						viableNodes = append(viableNodes, n)
@@ -209,7 +287,7 @@ func (c *ExampleController) allocateOne(ctx context.Context, claim *resourcev1al
 				// number of allocations (even spreading) or the most (packing).
 				node = viableNodes[rand.Intn(len(viableNodes))]
 				logger.V(3).Info("picked a node ourselves", "selectedNode", selectedNode)
-			} else if !contains(c.resources.Nodes, node) ||
+			} else if !contains(nodes, node) ||
 				c.resources.MaxAllocations > 0 &&
 					c.claimsPerNode[node] >= c.resources.MaxAllocations {
 				return nil, fmt.Errorf("resources exhausted on node %q", node)
@@ -222,9 +300,6 @@ func (c *ExampleController) allocateOne(ctx context.Context, claim *resourcev1al
 		}
 	}
 
-	allocation := &resourcev1alpha2.AllocationResult{
-		Shareable: c.resources.Shareable,
-	}
 	p := parameters{
 		EnvVars:  make(map[string]string),
 		NodeName: node,
@@ -235,33 +310,7 @@ func (c *ExampleController) allocateOne(ctx context.Context, claim *resourcev1al
 	if err != nil {
 		return nil, fmt.Errorf("encode parameters: %w", err)
 	}
-	allocation.ResourceHandles = []resourcev1alpha2.ResourceHandle{
-		{
-			DriverName: c.driverName,
-			Data:       string(data),
-		},
-	}
-	var nodes []string
-	if node != "" {
-		nodes = append(nodes, node)
-	} else {
-		nodes = c.resources.Nodes
-	}
-	if len(nodes) > 0 {
-		allocation.AvailableOnNodes = &v1.NodeSelector{
-			NodeSelectorTerms: []v1.NodeSelectorTerm{
-				{
-					MatchExpressions: []v1.NodeSelectorRequirement{
-						{
-							Key:      "kubernetes.io/hostname",
-							Operator: v1.NodeSelectorOpIn,
-							Values:   nodes,
-						},
-					},
-				},
-			},
-		}
-	}
+	allocation := c.resources.NewAllocation(node, data)
 	if !alreadyAllocated {
 		c.numAllocations++
 		c.allocated[claim.UID] = node
@@ -303,6 +352,7 @@ func (c *ExampleController) UnsuitableNodes(ctx context.Context, pod *v1.Pod, cl
 		// All nodes are suitable.
 		return nil
 	}
+	nodes := c.resources.AllNodes(c.nodeLister)
 	if c.resources.NodeLocal {
 		for _, claim := range claims {
 			claim.UnsuitableNodes = nil
@@ -312,7 +362,7 @@ func (c *ExampleController) UnsuitableNodes(ctx context.Context, pod *v1.Pod, cl
 			// can only work if a node has capacity left
 			// for all of them. Also, nodes that the driver
 			// doesn't run on cannot be used.
-			if !contains(c.resources.Nodes, node) ||
+			if !contains(nodes, node) ||
 				c.claimsPerNode[node]+len(claims) > c.resources.MaxAllocations {
 				claim.UnsuitableNodes = append(claim.UnsuitableNodes, node)
 			}
@@ -325,7 +375,7 @@ func (c *ExampleController) UnsuitableNodes(ctx context.Context, pod *v1.Pod, cl
 	for _, claim := range claims {
 		claim.UnsuitableNodes = nil
 		for _, node := range potentialNodes {
-			if !contains(c.resources.Nodes, node) ||
+			if !contains(nodes, node) ||
 				allocations+len(claims) > c.resources.MaxAllocations {
 				claim.UnsuitableNodes = append(claim.UnsuitableNodes, node)
 			}
diff --git a/test/e2e/dra/test-driver/app/server.go b/test/e2e/dra/test-driver/app/server.go
index b9127a18d2b..8bafdb649fc 100644
--- a/test/e2e/dra/test-driver/app/server.go
+++ b/test/e2e/dra/test-driver/app/server.go
@@ -81,7 +81,9 @@ func NewCommand() *cobra.Command {
 	profilePath := fs.String("pprof-path", "", "The HTTP path where pprof profiling will be available, disabled if empty.")
 
 	fs = sharedFlagSets.FlagSet("CDI")
-	driverName := fs.String("drivername", "test-driver.cdi.k8s.io", "Resource driver name.")
+	driverNameFlagName := "drivername"
+	driverName := fs.String(driverNameFlagName, "test-driver.cdi.k8s.io", "Resource driver name.")
+	driverNameFlag := fs.Lookup(driverNameFlagName)
 
 	fs = sharedFlagSets.FlagSet("other")
 	featureGate := featuregate.NewFeatureGate()
@@ -192,6 +194,7 @@ func NewCommand() *cobra.Command {
 		"Duration, in seconds, that the acting leader will retry refreshing leadership before giving up.")
 	leaderElectionRetryPeriod := fs.Duration("leader-election-retry-period", 5*time.Second,
 		"Duration, in seconds, the LeaderElector clients should wait between tries of actions.")
+	fs = controllerFlagSets.FlagSet("controller")
 	resourceConfig := fs.String("resource-config", "", "A JSON file containing a Resources struct. Defaults are unshared, network-attached resources.")
 	fs = controller.Flags()
 	for _, f := range controllerFlagSets.FlagSets {
@@ -211,9 +214,12 @@ func NewCommand() *cobra.Command {
 				return fmt.Errorf("parse resource config %q: %w", *resourceConfig, err)
 			}
 		}
+		if resources.DriverName == "" || driverNameFlag.Changed {
+			resources.DriverName = *driverName
+		}
 
 		run := func() {
-			controller := NewController(clientset, *driverName, resources)
+			controller := NewController(clientset, resources)
 			controller.Run(ctx, *workers)
 		}
diff --git a/test/integration/scheduler_perf/dra_test.go b/test/integration/scheduler_perf/dra_test.go
index 3078df0728e..eef0b2c1abd 100644
--- a/test/integration/scheduler_perf/dra_test.go
+++ b/test/integration/scheduler_perf/dra_test.go
@@ -192,6 +192,7 @@ func (op *createResourceDriverOp) run(ctx context.Context, tb testing.TB, client
 	// Start the controller side of the DRA test driver such that it simulates
 	// per-node resources.
 	resources := draapp.Resources{
+		DriverName:     op.DriverName,
 		NodeLocal:      true,
 		MaxAllocations: op.MaxClaimsPerNode,
 	}
@@ -210,7 +211,7 @@ func (op *createResourceDriverOp) run(ctx context.Context, tb testing.TB, client
 		}
 	}
 
-	controller := draapp.NewController(clientset, op.DriverName, resources)
+	controller := draapp.NewController(clientset, resources)
 	ctx, cancel := context.WithCancel(ctx)
 	var wg sync.WaitGroup
 	wg.Add(1)
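
Editorial note on the new lookup path (not part of the patch): AllNodes serves node names either from the static Nodes list or, when NodeLabels is set, from a node lister backed by an informer cache. The sketch below exercises that second path with a fake clientset; the node names and the dra.example.com/test label are invented for illustration, while the lister and selector calls are the same ones the patch uses.

package main

import (
	"context"
	"fmt"

	v1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/labels"
	"k8s.io/client-go/informers"
	"k8s.io/client-go/kubernetes/fake"
)

func main() {
	ctx := context.Background()

	// Two fake nodes, only one of which carries the (invented) driver label.
	clientset := fake.NewSimpleClientset(
		&v1.Node{ObjectMeta: metav1.ObjectMeta{
			Name:   "node-1",
			Labels: map[string]string{"dra.example.com/test": "true"},
		}},
		&v1.Node{ObjectMeta: metav1.ObjectMeta{Name: "node-2"}},
	)

	// Same wiring as ExampleController.Run: obtain the lister first so the
	// informer is registered, then start the factory and wait for the cache.
	factory := informers.NewSharedInformerFactory(clientset, 0)
	nodeLister := factory.Core().V1().Nodes().Lister()
	factory.Start(ctx.Done())
	factory.WaitForCacheSync(ctx.Done())

	// The same List call AllNodes issues for a NodeLabels set.
	nodes, err := nodeLister.List(labels.SelectorFromValidatedSet(
		labels.Set{"dra.example.com/test": "true"}))
	if err != nil {
		panic(err)
	}
	for _, node := range nodes {
		fmt.Println(node.Name) // prints only "node-1"
	}
}

The lister reads from the in-memory informer cache, so listing is cheap enough to repeat on every Allocate and UnsuitableNodes call; that is presumably also why AllNodes can afford to ignore the List error.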
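
With DriverName and NodeLabels now part of Resources, a --resource-config file can name the driver and select nodes by label instead of listing them. The sketch below (again editorial, not from the patch) shows a plausible config and its decode: the JSON keys follow the Go field names since the struct carries no JSON tags, and the driver name, label, and limits are invented. The trimmed-down local Resources mirror exists only to keep the sketch self-contained.

package main

import (
	"encoding/json"
	"fmt"

	"k8s.io/apimachinery/pkg/labels"
)

// Trimmed-down mirror of the test driver's Resources struct.
type Resources struct {
	DriverName     string
	NodeLabels     labels.Set
	MaxAllocations int
	Shareable      bool
}

func main() {
	// Hypothetical contents of a file passed via --resource-config.
	config := []byte(`{
		"DriverName": "test-driver.cdi.k8s.io",
		"NodeLabels": {"dra.example.com/test": "true"},
		"MaxAllocations": 10,
		"Shareable": true
	}`)

	var resources Resources
	if err := json.Unmarshal(config, &resources); err != nil {
		panic(err)
	}
	// {DriverName:test-driver.cdi.k8s.io NodeLabels:map[dra.example.com/test:true] ...}
	fmt.Printf("%+v\n", resources)
}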
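
Finally, the fs.Lookup call added to server.go makes the precedence between --drivername and the config file explicit: a driver name from the config beats the flag's default, but an explicitly passed flag beats the config. A minimal standalone sketch of that rule, with an invented command line and config value:

package main

import (
	"fmt"

	"github.com/spf13/pflag"
)

func main() {
	fs := pflag.NewFlagSet("test-driver", pflag.ExitOnError)
	driverNameFlagName := "drivername"
	driverName := fs.String(driverNameFlagName, "test-driver.cdi.k8s.io", "Resource driver name.")
	driverNameFlag := fs.Lookup(driverNameFlagName)

	// Simulate a user passing the flag explicitly.
	_ = fs.Parse([]string{"--drivername=cli.example.com"})

	// Simulate a DriverName read from --resource-config.
	resourcesDriverName := "config.example.com"

	// The same condition the patch adds: the config value survives only if it
	// is non-empty and the flag was left at its default (Changed is false).
	if resourcesDriverName == "" || driverNameFlag.Changed {
		resourcesDriverName = *driverName
	}
	fmt.Println(resourcesDriverName) // prints "cli.example.com"
}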