Merge pull request #120873 from pohly/dra-e2e-test-driver-enhancements

e2e dra: enhance test driver
Kubernetes Prow Robot 2023-10-10 13:32:55 +02:00 committed by GitHub
commit 4b9e15e0fe
4 changed files with 105 additions and 47 deletions


@@ -136,6 +136,7 @@ func (d *Driver) SetUp(nodes *Nodes, resources app.Resources) {
ginkgo.By(fmt.Sprintf("deploying driver on nodes %v", nodes.NodeNames))
d.Nodes = map[string]*app.ExamplePlugin{}
d.Name = d.f.UniqueName + d.NameSuffix + ".k8s.io"
resources.DriverName = d.Name
ctx, cancel := context.WithCancel(context.Background())
if d.NameSuffix != "" {
@@ -147,7 +148,7 @@ func (d *Driver) SetUp(nodes *Nodes, resources app.Resources) {
d.cleanup = append(d.cleanup, cancel)
// The controller is easy: we simply connect to the API server.
d.Controller = app.NewController(d.f.ClientSet, d.Name, resources)
d.Controller = app.NewController(d.f.ClientSet, resources)
d.wg.Add(1)
go func() {
defer d.wg.Done()
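
The two hunks above show the API change that runs through the rest of this commit: the driver name now travels inside app.Resources (d.Name is copied into resources.DriverName during SetUp), so app.NewController takes only a clientset and the resources. A minimal sketch of the new call pattern, assuming the e2e test-driver app package and an existing clientset and context (all names are placeholders):

    // Assumed imports: "context", app "k8s.io/kubernetes/test/e2e/dra/test-driver/app".
    resources := app.Resources{
        NodeLocal:      true,
        MaxAllocations: 1,
    }
    resources.DriverName = "example.test-driver.k8s.io" // previously a separate NewController argument
    controller := app.NewController(clientSet, resources)
    go controller.Run(ctx, 1 /* workers */)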


@@ -30,24 +30,101 @@ import (
v1 "k8s.io/api/core/v1"
resourcev1alpha2 "k8s.io/api/resource/v1alpha2"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes"
listersv1 "k8s.io/client-go/listers/core/v1"
"k8s.io/dynamic-resource-allocation/controller"
"k8s.io/klog/v2"
)
type Resources struct {
DriverName string
DontSetReservedFor bool
NodeLocal bool
Nodes []string
MaxAllocations int
Shareable bool
// Nodes is a fixed list of node names on which resources are
// available. Mutually exclusive with NodeLabels.
Nodes []string
// NodeLabels are labels which determine on which nodes resources are
// available. Mutually exclusive with Nodes.
NodeLabels labels.Set
MaxAllocations int
Shareable bool
// AllocateWrapper, if set, gets called for each Allocate call.
AllocateWrapper AllocateWrapperType
}
func (r Resources) AllNodes(nodeLister listersv1.NodeLister) []string {
if len(r.NodeLabels) > 0 {
// Determine nodes with resources dynamically.
nodes, _ := nodeLister.List(labels.SelectorFromValidatedSet(r.NodeLabels))
nodeNames := make([]string, 0, len(nodes))
for _, node := range nodes {
nodeNames = append(nodeNames, node.Name)
}
return nodeNames
}
return r.Nodes
}
func (r Resources) NewAllocation(node string, data []byte) *resourcev1alpha2.AllocationResult {
allocation := &resourcev1alpha2.AllocationResult{
Shareable: r.Shareable,
}
allocation.ResourceHandles = []resourcev1alpha2.ResourceHandle{
{
DriverName: r.DriverName,
Data: string(data),
},
}
if node == "" && len(r.NodeLabels) > 0 {
// Available on all nodes matching the labels.
var requirements []v1.NodeSelectorRequirement
for key, value := range r.NodeLabels {
requirements = append(requirements, v1.NodeSelectorRequirement{
Key: key,
Operator: v1.NodeSelectorOpIn,
Values: []string{value},
})
}
allocation.AvailableOnNodes = &v1.NodeSelector{
NodeSelectorTerms: []v1.NodeSelectorTerm{
{
MatchExpressions: requirements,
},
},
}
} else {
var nodes []string
if node != "" {
// Local to one node.
nodes = append(nodes, node)
} else {
// Available on the fixed set of nodes.
nodes = r.Nodes
}
if len(nodes) > 0 {
allocation.AvailableOnNodes = &v1.NodeSelector{
NodeSelectorTerms: []v1.NodeSelectorTerm{
{
MatchExpressions: []v1.NodeSelectorRequirement{
{
Key: "kubernetes.io/hostname",
Operator: v1.NodeSelectorOpIn,
Values: nodes,
},
},
},
},
}
}
}
return allocation
}
type AllocateWrapperType func(ctx context.Context, claimAllocations []*controller.ClaimAllocation,
selectedNode string,
handler func(ctx context.Context,
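
The new NodeLabels field and the AllNodes/NewAllocation helpers added above let the test driver advertise resources on whichever nodes carry a given set of labels instead of a fixed node list. A hedged sketch of the label-based, network-attached case (label key and value, driver name, and the data payload are made up):

    // Assumed imports: "fmt", "k8s.io/apimachinery/pkg/labels",
    // app "k8s.io/kubernetes/test/e2e/dra/test-driver/app".
    resources := app.Resources{
        DriverName: "example.test-driver.k8s.io",
        NodeLabels: labels.Set{"app.example.com/dra": "true"},
        Shareable:  true,
    }
    // An empty node name plus NodeLabels yields an AllocationResult whose
    // AvailableOnNodes selector matches the labels rather than a hostname.
    allocation := resources.NewAllocation("", []byte(`{"resource":"example"}`))
    fmt.Println(allocation.AvailableOnNodes.NodeSelectorTerms[0].MatchExpressions)
    // Expected: one requirement per label, here Key "app.example.com/dra",
    // Operator In, Values ["true"].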
@@ -57,8 +134,8 @@ type AllocateWrapperType func(ctx context.Context, claimAllocations []*controlle
type ExampleController struct {
clientset kubernetes.Interface
nodeLister listersv1.NodeLister
resources Resources
driverName string
mutex sync.Mutex
// allocated maps claim.UID to the node (if network-attached) or empty (if not).
@@ -70,11 +147,10 @@ type ExampleController struct {
numAllocations, numDeallocations int64
}
func NewController(clientset kubernetes.Interface, driverName string, resources Resources) *ExampleController {
func NewController(clientset kubernetes.Interface, resources Resources) *ExampleController {
c := &ExampleController{
clientset: clientset,
resources: resources,
driverName: driverName,
clientset: clientset,
resources: resources,
allocated: make(map[types.UID]string),
claimsPerNode: make(map[string]int),
@@ -84,7 +160,8 @@ func NewController(clientset kubernetes.Interface, driverName string, resources
func (c *ExampleController) Run(ctx context.Context, workers int) {
informerFactory := informers.NewSharedInformerFactory(c.clientset, 0 /* resync period */)
ctrl := controller.New(ctx, c.driverName, c, c.clientset, informerFactory)
ctrl := controller.New(ctx, c.resources.DriverName, c, c.clientset, informerFactory)
c.nodeLister = informerFactory.Core().V1().Nodes().Lister()
ctrl.SetReservedFor(!c.resources.DontSetReservedFor)
informerFactory.Start(ctx.Done())
ctrl.Run(workers)
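
Run wires the node lister off the same shared informer factory that the DRA controller consumes, which is what lets AllNodes resolve NodeLabels to node names at allocation time. A rough standalone sketch of that wiring (clientSet and the label set are placeholders; this sketch syncs the informer caches explicitly before listing):

    // Assumed imports: "fmt", "k8s.io/apimachinery/pkg/labels",
    // "k8s.io/client-go/informers".
    informerFactory := informers.NewSharedInformerFactory(clientSet, 0 /* resync period */)
    nodeLister := informerFactory.Core().V1().Nodes().Lister()
    informerFactory.Start(ctx.Done())
    informerFactory.WaitForCacheSync(ctx.Done())
    nodes, err := nodeLister.List(labels.SelectorFromValidatedSet(labels.Set{"app.example.com/dra": "true"}))
    if err == nil {
        for _, node := range nodes {
            fmt.Println(node.Name)
        }
    }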
@@ -190,13 +267,14 @@ func (c *ExampleController) allocateOne(ctx context.Context, claim *resourcev1al
logger.V(3).Info("already allocated")
} else {
logger.V(3).Info("starting", "selectedNode", selectedNode)
nodes := c.resources.AllNodes(c.nodeLister)
if c.resources.NodeLocal {
node = selectedNode
if node == "" {
// If none has been selected because we do immediate allocation,
// then we need to pick one ourselves.
var viableNodes []string
for _, n := range c.resources.Nodes {
for _, n := range nodes {
if c.resources.MaxAllocations == 0 ||
c.claimsPerNode[n] < c.resources.MaxAllocations {
viableNodes = append(viableNodes, n)
@@ -209,7 +287,7 @@ func (c *ExampleController) allocateOne(ctx context.Context, claim *resourcev1al
// number of allocations (even spreading) or the most (packing).
node = viableNodes[rand.Intn(len(viableNodes))]
logger.V(3).Info("picked a node ourselves", "selectedNode", selectedNode)
} else if !contains(c.resources.Nodes, node) ||
} else if !contains(nodes, node) ||
c.resources.MaxAllocations > 0 &&
c.claimsPerNode[node] >= c.resources.MaxAllocations {
return nil, fmt.Errorf("resources exhausted on node %q", node)
@@ -222,9 +300,6 @@ func (c *ExampleController) allocateOne(ctx context.Context, claim *resourcev1al
}
}
allocation := &resourcev1alpha2.AllocationResult{
Shareable: c.resources.Shareable,
}
p := parameters{
EnvVars: make(map[string]string),
NodeName: node,
@@ -235,33 +310,7 @@ func (c *ExampleController) allocateOne(ctx context.Context, claim *resourcev1al
if err != nil {
return nil, fmt.Errorf("encode parameters: %w", err)
}
allocation.ResourceHandles = []resourcev1alpha2.ResourceHandle{
{
DriverName: c.driverName,
Data: string(data),
},
}
var nodes []string
if node != "" {
nodes = append(nodes, node)
} else {
nodes = c.resources.Nodes
}
if len(nodes) > 0 {
allocation.AvailableOnNodes = &v1.NodeSelector{
NodeSelectorTerms: []v1.NodeSelectorTerm{
{
MatchExpressions: []v1.NodeSelectorRequirement{
{
Key: "kubernetes.io/hostname",
Operator: v1.NodeSelectorOpIn,
Values: nodes,
},
},
},
},
}
}
allocation := c.resources.NewAllocation(node, data)
if !alreadyAllocated {
c.numAllocations++
c.allocated[claim.UID] = node
@@ -303,6 +352,7 @@ func (c *ExampleController) UnsuitableNodes(ctx context.Context, pod *v1.Pod, cl
// All nodes are suitable.
return nil
}
nodes := c.resources.AllNodes(c.nodeLister)
if c.resources.NodeLocal {
for _, claim := range claims {
claim.UnsuitableNodes = nil
@@ -312,7 +362,7 @@ func (c *ExampleController) UnsuitableNodes(ctx context.Context, pod *v1.Pod, cl
// can only work if a node has capacity left
// for all of them. Also, nodes that the driver
// doesn't run on cannot be used.
if !contains(c.resources.Nodes, node) ||
if !contains(nodes, node) ||
c.claimsPerNode[node]+len(claims) > c.resources.MaxAllocations {
claim.UnsuitableNodes = append(claim.UnsuitableNodes, node)
}
@@ -325,7 +375,7 @@ func (c *ExampleController) UnsuitableNodes(ctx context.Context, pod *v1.Pod, cl
for _, claim := range claims {
claim.UnsuitableNodes = nil
for _, node := range potentialNodes {
if !contains(c.resources.Nodes, node) ||
if !contains(nodes, node) ||
allocations+len(claims) > c.resources.MaxAllocations {
claim.UnsuitableNodes = append(claim.UnsuitableNodes, node)
}
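
For comparison with the label-based case, when an allocation is pinned to a single node (a NodeLocal driver, or a network-attached driver with a fixed Nodes list) the refactored NewAllocation falls back to a kubernetes.io/hostname selector, as the inline code removed from allocateOne used to. A small sketch with placeholder node names and an assumed driver name:

    // Assumed import: app "k8s.io/kubernetes/test/e2e/dra/test-driver/app".
    resources := app.Resources{
        DriverName: "example.test-driver.k8s.io",
        NodeLocal:  true,
        Nodes:      []string{"worker-1", "worker-2"},
    }
    allocation := resources.NewAllocation("worker-1", []byte(`{}`))
    // allocation.AvailableOnNodes now pins the claim to worker-1 via
    // "kubernetes.io/hostname" In ["worker-1"].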


@@ -81,7 +81,9 @@ func NewCommand() *cobra.Command {
profilePath := fs.String("pprof-path", "", "The HTTP path where pprof profiling will be available, disabled if empty.")
fs = sharedFlagSets.FlagSet("CDI")
driverName := fs.String("drivername", "test-driver.cdi.k8s.io", "Resource driver name.")
driverNameFlagName := "drivername"
driverName := fs.String(driverNameFlagName, "test-driver.cdi.k8s.io", "Resource driver name.")
driverNameFlag := fs.Lookup(driverNameFlagName)
fs = sharedFlagSets.FlagSet("other")
featureGate := featuregate.NewFeatureGate()
@@ -192,6 +194,7 @@
"Duration, in seconds, that the acting leader will retry refreshing leadership before giving up.")
leaderElectionRetryPeriod := fs.Duration("leader-election-retry-period", 5*time.Second,
"Duration, in seconds, the LeaderElector clients should wait between tries of actions.")
fs = controllerFlagSets.FlagSet("controller")
resourceConfig := fs.String("resource-config", "", "A JSON file containing a Resources struct. Defaults are unshared, network-attached resources.")
fs = controller.Flags()
for _, f := range controllerFlagSets.FlagSets {
@@ -211,9 +214,12 @@
return fmt.Errorf("parse resource config %q: %w", *resourceConfig, err)
}
}
if resources.DriverName == "" || driverNameFlag.Changed {
resources.DriverName = *driverName
}
run := func() {
controller := NewController(clientset, *driverName, resources)
controller := NewController(clientset, resources)
controller.Run(ctx, *workers)
}
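
The server-side change above is about precedence: the value of --drivername is used when the resource config file did not set a name, or when the flag was passed explicitly on the command line (detected via the looked-up flag's Changed field), so an explicit flag always beats the config file. A hedged, standalone sketch of the same rule (flag set, values, and package alias are illustrative):

    // Assumed imports: "github.com/spf13/pflag",
    // app "k8s.io/kubernetes/test/e2e/dra/test-driver/app".
    fs := pflag.NewFlagSet("example", pflag.ContinueOnError)
    driverName := fs.String("drivername", "test-driver.cdi.k8s.io", "Resource driver name.")
    _ = fs.Parse([]string{"--drivername=custom.cdi.k8s.io"}) // pretend command line
    resources := app.Resources{DriverName: "from-config.cdi.k8s.io"} // pretend JSON config
    if resources.DriverName == "" || fs.Lookup("drivername").Changed {
        resources.DriverName = *driverName // explicit flag wins: "custom.cdi.k8s.io"
    }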


@@ -192,6 +192,7 @@ func (op *createResourceDriverOp) run(ctx context.Context, tb testing.TB, client
// Start the controller side of the DRA test driver such that it simulates
// per-node resources.
resources := draapp.Resources{
DriverName: op.DriverName,
NodeLocal: true,
MaxAllocations: op.MaxClaimsPerNode,
}
@@ -210,7 +211,7 @@
}
}
controller := draapp.NewController(clientset, op.DriverName, resources)
controller := draapp.NewController(clientset, resources)
ctx, cancel := context.WithCancel(ctx)
var wg sync.WaitGroup
wg.Add(1)