mirror of
https://github.com/k3s-io/kubernetes.git
synced 2026-01-30 05:43:58 +00:00
Merge pull request #123516 from pohly/dra-structured-parameters
DRA: structured parameters
This commit is contained in:
@@ -24,6 +24,7 @@ import (
|
||||
"net"
|
||||
"path"
|
||||
"sort"
|
||||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
@@ -33,6 +34,8 @@ import (
|
||||
|
||||
appsv1 "k8s.io/api/apps/v1"
|
||||
v1 "k8s.io/api/core/v1"
|
||||
resourcev1alpha2 "k8s.io/api/resource/v1alpha2"
|
||||
apiextensionsv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1"
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
"k8s.io/apimachinery/pkg/labels"
|
||||
"k8s.io/apimachinery/pkg/selection"
|
||||
@@ -48,10 +51,11 @@ import (
|
||||
)
|
||||
|
||||
const (
|
||||
NodePrepareResourceMethod = "/v1alpha2.Node/NodePrepareResource"
|
||||
NodePrepareResourcesMethod = "/v1alpha3.Node/NodePrepareResources"
|
||||
NodeUnprepareResourceMethod = "/v1alpha2.Node/NodeUnprepareResource"
|
||||
NodeUnprepareResourcesMethod = "/v1alpha3.Node/NodeUnprepareResources"
|
||||
NodePrepareResourceMethod = "/v1alpha2.Node/NodePrepareResource"
|
||||
NodePrepareResourcesMethod = "/v1alpha3.Node/NodePrepareResources"
|
||||
NodeUnprepareResourceMethod = "/v1alpha2.Node/NodeUnprepareResource"
|
||||
NodeUnprepareResourcesMethod = "/v1alpha3.Node/NodeUnprepareResources"
|
||||
NodeListAndWatchResourcesMethod = "/v1alpha3.Node/NodeListAndWatchResources"
|
||||
)
|
||||
|
||||
type Nodes struct {
|
||||
@@ -103,6 +107,7 @@ func NewDriver(f *framework.Framework, nodes *Nodes, configureResources func() a
|
||||
// not run on all nodes.
|
||||
resources.Nodes = nodes.NodeNames
|
||||
}
|
||||
ginkgo.DeferCleanup(d.IsGone) // Register first so it gets called last.
|
||||
d.SetUp(nodes, resources)
|
||||
ginkgo.DeferCleanup(d.TearDown)
|
||||
})
|
||||
@@ -125,6 +130,12 @@ type Driver struct {
|
||||
Name string
|
||||
Nodes map[string]*app.ExamplePlugin
|
||||
|
||||
parameterMode parameterMode
|
||||
parameterAPIGroup string
|
||||
parameterAPIVersion string
|
||||
claimParameterAPIKind string
|
||||
classParameterAPIKind string
|
||||
|
||||
NodeV1alpha2, NodeV1alpha3 bool
|
||||
|
||||
mutex sync.Mutex
|
||||
@@ -132,6 +143,14 @@ type Driver struct {
|
||||
callCounts map[MethodInstance]int64
|
||||
}
|
||||
|
||||
type parameterMode string
|
||||
|
||||
const (
|
||||
parameterModeConfigMap parameterMode = "configmap" // ConfigMap parameters, control plane controller.
|
||||
parameterModeStructured parameterMode = "structured" // No ConfigMaps, directly create and reference in-tree parameter objects.
|
||||
parameterModeTranslated parameterMode = "translated" // Reference ConfigMaps in claim and class, generate in-tree parameter objects.
|
||||
)
|
||||
|
||||
func (d *Driver) SetUp(nodes *Nodes, resources app.Resources) {
|
||||
ginkgo.By(fmt.Sprintf("deploying driver on nodes %v", nodes.NodeNames))
|
||||
d.Nodes = map[string]*app.ExamplePlugin{}
|
||||
@@ -147,19 +166,44 @@ func (d *Driver) SetUp(nodes *Nodes, resources app.Resources) {
|
||||
d.ctx = ctx
|
||||
d.cleanup = append(d.cleanup, cancel)
|
||||
|
||||
// The controller is easy: we simply connect to the API server.
|
||||
d.Controller = app.NewController(d.f.ClientSet, resources)
|
||||
d.wg.Add(1)
|
||||
go func() {
|
||||
defer d.wg.Done()
|
||||
d.Controller.Run(d.ctx, 5 /* workers */)
|
||||
}()
|
||||
switch d.parameterMode {
|
||||
case "", parameterModeConfigMap:
|
||||
// The controller is easy: we simply connect to the API server.
|
||||
d.Controller = app.NewController(d.f.ClientSet, resources)
|
||||
d.wg.Add(1)
|
||||
go func() {
|
||||
defer d.wg.Done()
|
||||
d.Controller.Run(d.ctx, 5 /* workers */)
|
||||
}()
|
||||
}
|
||||
|
||||
manifests := []string{
|
||||
// The code below matches the content of this manifest (ports,
|
||||
// container names, etc.).
|
||||
"test/e2e/testing-manifests/dra/dra-test-driver-proxy.yaml",
|
||||
}
|
||||
if d.parameterMode == "" {
|
||||
d.parameterMode = parameterModeConfigMap
|
||||
}
|
||||
var numResourceInstances = -1 // disabled
|
||||
if d.parameterMode != parameterModeConfigMap {
|
||||
numResourceInstances = resources.MaxAllocations
|
||||
}
|
||||
switch d.parameterMode {
|
||||
case parameterModeConfigMap, parameterModeTranslated:
|
||||
d.parameterAPIGroup = ""
|
||||
d.parameterAPIVersion = "v1"
|
||||
d.claimParameterAPIKind = "ConfigMap"
|
||||
d.classParameterAPIKind = "ConfigMap"
|
||||
case parameterModeStructured:
|
||||
d.parameterAPIGroup = "resource.k8s.io"
|
||||
d.parameterAPIVersion = "v1alpha2"
|
||||
d.claimParameterAPIKind = "ResourceClaimParameters"
|
||||
d.classParameterAPIKind = "ResourceClassParameters"
|
||||
default:
|
||||
framework.Failf("unknown test driver parameter mode: %s", d.parameterMode)
|
||||
}
|
||||
|
||||
instanceKey := "app.kubernetes.io/instance"
|
||||
rsName := ""
|
||||
draAddr := path.Join(framework.TestContext.KubeletRootDir, "plugins", d.Name+".sock")
|
||||
@@ -192,6 +236,10 @@ func (d *Driver) SetUp(nodes *Nodes, resources app.Resources) {
|
||||
item.Spec.Template.Spec.Volumes[2].HostPath.Path = path.Join(framework.TestContext.KubeletRootDir, "plugins_registry")
|
||||
item.Spec.Template.Spec.Containers[0].Args = append(item.Spec.Template.Spec.Containers[0].Args, "--endpoint=/plugins_registry/"+d.Name+"-reg.sock")
|
||||
item.Spec.Template.Spec.Containers[1].Args = append(item.Spec.Template.Spec.Containers[1].Args, "--endpoint=/dra/"+d.Name+".sock")
|
||||
case *apiextensionsv1.CustomResourceDefinition:
|
||||
item.Name = strings.ReplaceAll(item.Name, "dra.e2e.example.com", d.parameterAPIGroup)
|
||||
item.Spec.Group = d.parameterAPIGroup
|
||||
|
||||
}
|
||||
return nil
|
||||
}, manifests...)
|
||||
@@ -218,7 +266,8 @@ func (d *Driver) SetUp(nodes *Nodes, resources app.Resources) {
|
||||
pod := pod
|
||||
nodename := pod.Spec.NodeName
|
||||
logger := klog.LoggerWithValues(klog.LoggerWithName(klog.Background(), "kubelet plugin"), "node", pod.Spec.NodeName, "pod", klog.KObj(&pod))
|
||||
plugin, err := app.StartPlugin(logger, "/cdi", d.Name, nodename,
|
||||
loggerCtx := klog.NewContext(ctx, logger)
|
||||
plugin, err := app.StartPlugin(loggerCtx, "/cdi", d.Name, nodename,
|
||||
app.FileOperations{
|
||||
Create: func(name string, content []byte) error {
|
||||
klog.Background().Info("creating CDI file", "node", nodename, "filename", name, "content", string(content))
|
||||
@@ -228,11 +277,15 @@ func (d *Driver) SetUp(nodes *Nodes, resources app.Resources) {
|
||||
klog.Background().Info("deleting CDI file", "node", nodename, "filename", name)
|
||||
return d.removeFile(&pod, name)
|
||||
},
|
||||
NumResourceInstances: numResourceInstances,
|
||||
},
|
||||
kubeletplugin.GRPCVerbosity(0),
|
||||
kubeletplugin.GRPCInterceptor(func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (resp interface{}, err error) {
|
||||
return d.interceptor(nodename, ctx, req, info, handler)
|
||||
}),
|
||||
kubeletplugin.GRPCStreamInterceptor(func(srv interface{}, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) (err error) {
|
||||
return d.streamInterceptor(nodename, srv, ss, info, handler)
|
||||
}),
|
||||
kubeletplugin.PluginListener(listen(ctx, d.f, pod.Name, "plugin", 9001)),
|
||||
kubeletplugin.RegistrarListener(listen(ctx, d.f, pod.Name, "registrar", 9000)),
|
||||
kubeletplugin.KubeletPluginSocketPath(draAddr),
|
||||
@@ -308,6 +361,16 @@ func (d *Driver) TearDown() {
|
||||
d.wg.Wait()
|
||||
}
|
||||
|
||||
func (d *Driver) IsGone(ctx context.Context) {
|
||||
gomega.Eventually(ctx, func(ctx context.Context) ([]resourcev1alpha2.ResourceSlice, error) {
|
||||
slices, err := d.f.ClientSet.ResourceV1alpha2().ResourceSlices().List(ctx, metav1.ListOptions{FieldSelector: "driverName=" + d.Name})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return slices.Items, err
|
||||
}).Should(gomega.BeEmpty())
|
||||
}
|
||||
|
||||
func (d *Driver) interceptor(nodename string, ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (resp interface{}, err error) {
|
||||
d.mutex.Lock()
|
||||
defer d.mutex.Unlock()
|
||||
@@ -321,6 +384,22 @@ func (d *Driver) interceptor(nodename string, ctx context.Context, req interface
|
||||
return handler(ctx, req)
|
||||
}
|
||||
|
||||
func (d *Driver) streamInterceptor(nodename string, srv interface{}, stream grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error {
|
||||
// Stream calls block for a long time. We must not hold the lock while
|
||||
// they are running.
|
||||
d.mutex.Lock()
|
||||
m := MethodInstance{nodename, info.FullMethod}
|
||||
d.callCounts[m]++
|
||||
fail := d.fail[m]
|
||||
d.mutex.Unlock()
|
||||
|
||||
if fail {
|
||||
return errors.New("injected error")
|
||||
}
|
||||
|
||||
return handler(srv, stream)
|
||||
}
|
||||
|
||||
func (d *Driver) Fail(m MethodInstance, injectError bool) {
|
||||
d.mutex.Lock()
|
||||
defer d.mutex.Unlock()
|
||||
|
||||
1472
test/e2e/dra/dra.go
1472
test/e2e/dra/dra.go
File diff suppressed because it is too large
Load Diff
@@ -19,13 +19,18 @@ package app
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"fmt"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"sync"
|
||||
|
||||
"google.golang.org/grpc"
|
||||
"google.golang.org/grpc/codes"
|
||||
"google.golang.org/grpc/status"
|
||||
|
||||
resourceapi "k8s.io/api/resource/v1alpha2"
|
||||
"k8s.io/apimachinery/pkg/runtime"
|
||||
"k8s.io/dynamic-resource-allocation/kubeletplugin"
|
||||
"k8s.io/klog/v2"
|
||||
drapbv1alpha2 "k8s.io/kubelet/pkg/apis/dra/v1alpha2"
|
||||
@@ -33,6 +38,7 @@ import (
|
||||
)
|
||||
|
||||
type ExamplePlugin struct {
|
||||
stopCh <-chan struct{}
|
||||
logger klog.Logger
|
||||
d kubeletplugin.DRAPlugin
|
||||
fileOps FileOperations
|
||||
@@ -71,13 +77,15 @@ type ClaimID struct {
|
||||
}
|
||||
|
||||
var _ drapbv1alpha2.NodeServer = &ExamplePlugin{}
|
||||
var _ drapbv1alpha3.NodeServer = &ExamplePlugin{}
|
||||
|
||||
// getJSONFilePath returns the absolute path where CDI file is/should be.
|
||||
func (ex *ExamplePlugin) getJSONFilePath(claimUID string) string {
|
||||
return filepath.Join(ex.cdiDir, fmt.Sprintf("%s-%s.json", ex.driverName, claimUID))
|
||||
}
|
||||
|
||||
// FileOperations defines optional callbacks for handling CDI files.
|
||||
// FileOperations defines optional callbacks for handling CDI files
|
||||
// and some other configuration.
|
||||
type FileOperations struct {
|
||||
// Create must overwrite the file.
|
||||
Create func(name string, content []byte) error
|
||||
@@ -85,10 +93,16 @@ type FileOperations struct {
|
||||
// Remove must remove the file. It must not return an error when the
|
||||
// file does not exist.
|
||||
Remove func(name string) error
|
||||
|
||||
// NumResourceInstances determines whether the plugin reports resources
|
||||
// instances and how many. A negative value causes it to report "not implemented"
|
||||
// in the NodeListAndWatchResources gRPC call.
|
||||
NumResourceInstances int
|
||||
}
|
||||
|
||||
// StartPlugin sets up the servers that are necessary for a DRA kubelet plugin.
|
||||
func StartPlugin(logger klog.Logger, cdiDir, driverName string, nodeName string, fileOps FileOperations, opts ...kubeletplugin.Option) (*ExamplePlugin, error) {
|
||||
func StartPlugin(ctx context.Context, cdiDir, driverName string, nodeName string, fileOps FileOperations, opts ...kubeletplugin.Option) (*ExamplePlugin, error) {
|
||||
logger := klog.FromContext(ctx)
|
||||
if fileOps.Create == nil {
|
||||
fileOps.Create = func(name string, content []byte) error {
|
||||
return os.WriteFile(name, content, os.FileMode(0644))
|
||||
@@ -103,6 +117,7 @@ func StartPlugin(logger klog.Logger, cdiDir, driverName string, nodeName string,
|
||||
}
|
||||
}
|
||||
ex := &ExamplePlugin{
|
||||
stopCh: ctx.Done(),
|
||||
logger: logger,
|
||||
fileOps: fileOps,
|
||||
cdiDir: cdiDir,
|
||||
@@ -115,6 +130,7 @@ func StartPlugin(logger klog.Logger, cdiDir, driverName string, nodeName string,
|
||||
kubeletplugin.Logger(logger),
|
||||
kubeletplugin.DriverName(driverName),
|
||||
kubeletplugin.GRPCInterceptor(ex.recordGRPCCall),
|
||||
kubeletplugin.GRPCStreamInterceptor(ex.recordGRPCStream),
|
||||
)
|
||||
d, err := kubeletplugin.Start(ex, opts...)
|
||||
if err != nil {
|
||||
@@ -160,8 +176,33 @@ func (ex *ExamplePlugin) NodePrepareResource(ctx context.Context, req *drapbv1al
|
||||
|
||||
// Determine environment variables.
|
||||
var p parameters
|
||||
if err := json.Unmarshal([]byte(req.ResourceHandle), &p); err != nil {
|
||||
return nil, fmt.Errorf("unmarshal resource handle: %w", err)
|
||||
switch len(req.StructuredResourceHandle) {
|
||||
case 0:
|
||||
// Control plane controller did the allocation.
|
||||
if err := json.Unmarshal([]byte(req.ResourceHandle), &p); err != nil {
|
||||
return nil, fmt.Errorf("unmarshal resource handle: %w", err)
|
||||
}
|
||||
case 1:
|
||||
// Scheduler did the allocation with structured parameters.
|
||||
handle := req.StructuredResourceHandle[0]
|
||||
if handle == nil {
|
||||
return nil, errors.New("unexpected nil StructuredResourceHandle")
|
||||
}
|
||||
p.NodeName = handle.NodeName
|
||||
if err := extractParameters(handle.VendorClassParameters, &p.EnvVars, "admin"); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if err := extractParameters(handle.VendorClaimParameters, &p.EnvVars, "user"); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
for _, result := range handle.Results {
|
||||
if err := extractParameters(result.VendorRequestParameters, &p.EnvVars, "user"); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
default:
|
||||
// Huh?
|
||||
return nil, fmt.Errorf("invalid length of NodePrepareResourceRequest.StructuredResourceHandle: %d", len(req.StructuredResourceHandle))
|
||||
}
|
||||
|
||||
// Sanity check scheduling.
|
||||
@@ -212,16 +253,34 @@ func (ex *ExamplePlugin) NodePrepareResource(ctx context.Context, req *drapbv1al
|
||||
return resp, nil
|
||||
}
|
||||
|
||||
func extractParameters(parameters runtime.RawExtension, env *map[string]string, kind string) error {
|
||||
if len(parameters.Raw) == 0 {
|
||||
return nil
|
||||
}
|
||||
var data map[string]string
|
||||
if err := json.Unmarshal(parameters.Raw, &data); err != nil {
|
||||
return fmt.Errorf("decoding %s parameters: %v", kind, err)
|
||||
}
|
||||
if len(data) > 0 && *env == nil {
|
||||
*env = make(map[string]string)
|
||||
}
|
||||
for key, value := range data {
|
||||
(*env)[kind+"_"+key] = value
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (ex *ExamplePlugin) NodePrepareResources(ctx context.Context, req *drapbv1alpha3.NodePrepareResourcesRequest) (*drapbv1alpha3.NodePrepareResourcesResponse, error) {
|
||||
resp := &drapbv1alpha3.NodePrepareResourcesResponse{
|
||||
Claims: make(map[string]*drapbv1alpha3.NodePrepareResourceResponse),
|
||||
}
|
||||
for _, claimReq := range req.Claims {
|
||||
claimResp, err := ex.NodePrepareResource(ctx, &drapbv1alpha2.NodePrepareResourceRequest{
|
||||
Namespace: claimReq.Namespace,
|
||||
ClaimName: claimReq.Name,
|
||||
ClaimUid: claimReq.Uid,
|
||||
ResourceHandle: claimReq.ResourceHandle,
|
||||
Namespace: claimReq.Namespace,
|
||||
ClaimName: claimReq.Name,
|
||||
ClaimUid: claimReq.Uid,
|
||||
ResourceHandle: claimReq.ResourceHandle,
|
||||
StructuredResourceHandle: claimReq.StructuredResourceHandle,
|
||||
})
|
||||
if err != nil {
|
||||
resp.Claims[claimReq.Uid] = &drapbv1alpha3.NodePrepareResourceResponse{
|
||||
@@ -284,6 +343,39 @@ func (ex *ExamplePlugin) NodeUnprepareResources(ctx context.Context, req *drapbv
|
||||
return resp, nil
|
||||
}
|
||||
|
||||
func (ex *ExamplePlugin) NodeListAndWatchResources(req *drapbv1alpha3.NodeListAndWatchResourcesRequest, stream drapbv1alpha3.Node_NodeListAndWatchResourcesServer) error {
|
||||
if ex.fileOps.NumResourceInstances < 0 {
|
||||
ex.logger.Info("Sending no NodeResourcesResponse")
|
||||
return status.New(codes.Unimplemented, "node resource support disabled").Err()
|
||||
}
|
||||
|
||||
instances := make([]resourceapi.NamedResourcesInstance, ex.fileOps.NumResourceInstances)
|
||||
for i := 0; i < ex.fileOps.NumResourceInstances; i++ {
|
||||
instances[i].Name = fmt.Sprintf("instance-%d", i)
|
||||
}
|
||||
resp := &drapbv1alpha3.NodeListAndWatchResourcesResponse{
|
||||
Resources: []*resourceapi.NodeResourceModel{
|
||||
{
|
||||
NamedResources: &resourceapi.NamedResourcesResources{
|
||||
Instances: instances,
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
ex.logger.Info("Sending NodeListAndWatchResourcesResponse", "response", resp)
|
||||
if err := stream.Send(resp); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Keep the stream open until the test is done.
|
||||
// TODO: test sending more updates later
|
||||
<-ex.stopCh
|
||||
ex.logger.Info("Done sending NodeListAndWatchResourcesResponse, closing stream")
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (ex *ExamplePlugin) GetPreparedResources() []ClaimID {
|
||||
ex.mutex.Lock()
|
||||
defer ex.mutex.Unlock()
|
||||
@@ -314,6 +406,25 @@ func (ex *ExamplePlugin) recordGRPCCall(ctx context.Context, req interface{}, in
|
||||
return call.Response, call.Err
|
||||
}
|
||||
|
||||
func (ex *ExamplePlugin) recordGRPCStream(srv interface{}, stream grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error {
|
||||
call := GRPCCall{
|
||||
FullMethod: info.FullMethod,
|
||||
}
|
||||
ex.mutex.Lock()
|
||||
ex.gRPCCalls = append(ex.gRPCCalls, call)
|
||||
index := len(ex.gRPCCalls) - 1
|
||||
ex.mutex.Unlock()
|
||||
|
||||
// We don't hold the mutex here to allow concurrent calls.
|
||||
call.Err = handler(srv, stream)
|
||||
|
||||
ex.mutex.Lock()
|
||||
ex.gRPCCalls[index] = call
|
||||
ex.mutex.Unlock()
|
||||
|
||||
return call.Err
|
||||
}
|
||||
|
||||
func (ex *ExamplePlugin) GetGRPCCalls() []GRPCCall {
|
||||
ex.mutex.Lock()
|
||||
defer ex.mutex.Unlock()
|
||||
|
||||
@@ -287,7 +287,7 @@ func NewCommand() *cobra.Command {
|
||||
return fmt.Errorf("create socket directory: %w", err)
|
||||
}
|
||||
|
||||
plugin, err := StartPlugin(logger, *cdiDir, *driverName, "", FileOperations{},
|
||||
plugin, err := StartPlugin(cmd.Context(), *cdiDir, *driverName, "", FileOperations{},
|
||||
kubeletplugin.PluginSocketPath(*endpoint),
|
||||
kubeletplugin.RegistrarSocketPath(path.Join(*pluginRegistrationPath, *driverName+"-reg.sock")),
|
||||
kubeletplugin.KubeletPluginSocketPath(*draAddress),
|
||||
|
||||
Reference in New Issue
Block a user