DRA kubeletplugin: turn helper into wrapper

The goal is to simplify writing DRA drivers. This is also a first step towards
supporting seamless upgrades.

DRA drivers no longer need to implement the kubelet plugin API
directly. Instead, the helper wraps an implementation of an interface. The
helper then provides common functionality:

- retrieve and validate ResourceClaims
- serialize gRPC calls (enabled by default; drivers can opt out)
- gRPC logging

The definition of that interface is meant to be comprehensive enough that a
correct DRA driver can be implemented by following the documentation of the
package, without having to cross-reference KEPs.

The DRAPlugin interface used to be the abstract API of the helper. Now it's
what the DRA driver kubelet plugin needs to implement. The helper is a concrete
Server struct with no exported fields. It only exports the methods that
drivers need when using the helper.

While at it, support for the v1alpha4 API gets removed from the helper, which
implies removing the corresponding E2E tests. The kubelet implementation will
be dropped separately.
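
For illustration, a minimal driver built on the wrapper might look like the
sketch below. The driver name and the empty device list are placeholders, and
the `DriverName`/`NodeName`/`KubeClient` option names are assumptions based on
this package's functional options:

```
package example

import (
	"context"

	resourceapi "k8s.io/api/resource/v1beta1"
	"k8s.io/apimachinery/pkg/types"
	"k8s.io/client-go/kubernetes"
	"k8s.io/dynamic-resource-allocation/kubeletplugin"
)

// examplePlugin is a hypothetical driver. Only the DRAPlugin methods need to
// be implemented; registration, gRPC serving, logging, claim retrieval and
// validation are handled by the helper.
type examplePlugin struct{}

func (examplePlugin) PrepareResourceClaims(ctx context.Context, claims []*resourceapi.ResourceClaim) (map[types.UID]kubeletplugin.PrepareResult, error) {
	result := make(map[types.UID]kubeletplugin.PrepareResult, len(claims))
	for _, claim := range claims {
		// The helper has already checked that each claim is allocated.
		// A real driver would set up devices and fill in PrepareResult.Devices.
		result[claim.UID] = kubeletplugin.PrepareResult{}
	}
	return result, nil
}

func (examplePlugin) UnprepareResourceClaims(ctx context.Context, claims []kubeletplugin.NamespacedObject) (map[types.UID]error, error) {
	result := make(map[types.UID]error, len(claims))
	for _, claim := range claims {
		result[claim.UID] = nil // nil marks successful unpreparation
	}
	return result, nil
}

func run(ctx context.Context, kubeClient kubernetes.Interface, nodeName string) error {
	helper, err := kubeletplugin.Start(ctx, examplePlugin{},
		kubeletplugin.DriverName("example.test"), // hypothetical driver name
		kubeletplugin.KubeClient(kubeClient),
		kubeletplugin.NodeName(nodeName),
	)
	if err != nil {
		return err
	}
	defer helper.Stop()
	<-ctx.Done()
	return nil
}
```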
Patrick Ohly 2025-03-10 21:28:11 +01:00
parent be32ca61a6
commit c6252daccb
7 changed files with 398 additions and 200 deletions


@ -26,51 +26,122 @@ import (
"google.golang.org/grpc"
"k8s.io/klog/v2"
resourceapi "k8s.io/api/resource/v1beta1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/kubernetes"
"k8s.io/dynamic-resource-allocation/resourceclaim"
"k8s.io/dynamic-resource-allocation/resourceslice"
drapbv1alpha4 "k8s.io/kubelet/pkg/apis/dra/v1alpha4"
drapbv1beta1 "k8s.io/kubelet/pkg/apis/dra/v1beta1"
drapb "k8s.io/kubelet/pkg/apis/dra/v1beta1"
registerapi "k8s.io/kubelet/pkg/apis/pluginregistration/v1"
)
// DRAPlugin gets returned by Start and defines the public API of the generic
// dynamic resource allocation plugin.
// DRAPlugin is the interface that needs to be implemented by a DRA driver to
// use this helper package. The helper package then implements the gRPC
// interface expected by the kubelet by wrapping the DRAPlugin implementation.
type DRAPlugin interface {
// Stop ensures that all spawned goroutines are stopped and frees
// resources.
Stop()
// PrepareResourceClaims is called to prepare all resources allocated
// for the given ResourceClaims. This is used to implement
// the gRPC NodePrepareResources call.
//
// It gets called with the complete list of claims that are needed
// by some pod. In contrast to the gRPC call, the helper has
// already retrieved the actual ResourceClaim objects.
//
// In addition to that, the helper also:
// - verifies that all claims are really allocated
// - increments a numeric counter for each call and
// adds its value to a per-context logger with "requestID" as key
// - adds the method name with "method" as key to that logger
// - logs the gRPC call and response (configurable with GRPCVerbosity)
// - serializes all gRPC calls unless the driver explicitly opted out of that
//
// This call must be idempotent because the kubelet might have to ask
// for preparation multiple times, for example if it gets restarted.
//
// A DRA driver should verify that all devices listed in a
// [resourceapi.DeviceRequestAllocationResult] are not already in use
// for some other ResourceClaim. Kubernetes tries very hard to ensure
// that, but if something went wrong, then the DRA driver is the last
// line of defense against using the same device for two different
// unrelated workloads.
//
// If an error is returned, the result is ignored. Otherwise the result
// must have exactly one entry for each claim, identified by the UID of
// the corresponding ResourceClaim. For each claim, preparation
// can be either successful (no error set in the per-ResourceClaim PrepareResult)
// or reported as failed.
//
// It is possible to create the CDI spec files which define the CDI devices
// on-the-fly in PrepareResourceClaims. UnprepareResourceClaims can then
// remove them. Container runtimes may cache CDI specs but must reload
// files in case of a cache miss. To avoid false cache hits, the unique
// name in the CDI device ID should not be reused. A DRA driver can use
// the claim UID for it.
PrepareResourceClaims(ctx context.Context, claims []*resourceapi.ResourceClaim) (result map[types.UID]PrepareResult, err error)
// RegistrationStatus returns the result of registration, nil if none
// received yet.
RegistrationStatus() *registerapi.RegistrationStatus
// UnprepareResourceClaims must undo whatever work PrepareResourceClaims did.
//
// At the time when this gets called, the original ResourceClaims may have
// been deleted already. They also don't get cached by the kubelet. Therefore
// parameters for each ResourceClaim are only the UID, namespace and name.
// It is the responsibility of the DRA driver to cache whatever additional
// information it might need about prepared resources.
//
// This call must be idempotent because the kubelet might have to ask
// for un-preparation multiple times, for example if it gets restarted.
// Therefore it is not an error if this gets called for a ResourceClaim
// which is not currently prepared.
//
// As with PrepareResourceClaims, the helper takes care of logging
// and serialization.
//
// The conventions for returning one overall error and several per-ResourceClaim
// errors are the same as in PrepareResourceClaims.
UnprepareResourceClaims(ctx context.Context, claims []NamespacedObject) (result map[types.UID]error, err error)
}
// PublishResources may be called one or more times to publish
// resource information in ResourceSlice objects. If it never gets
// called, then the kubelet plugin does not manage any ResourceSlice
// objects.
//
// PublishResources does not block, so it might still take a while
// after it returns before all information is actually written
// to the API server.
//
// It is the responsibility of the caller to ensure that the pools and
// slices described in the driver resources parameters are valid
// according to the restrictions defined in the resource.k8s.io API.
//
// Invalid ResourceSlices will be rejected by the apiserver during
// publishing, which happens asynchronously and thus does not
// get returned as error here. The only error returned here is
// when publishing was not set up properly, for example missing
// [KubeClient] or [NodeName] options.
//
// The caller may modify the resources after this call returns.
PublishResources(ctx context.Context, resources resourceslice.DriverResources) error
// PrepareResult contains the result of preparing one particular ResourceClaim.
type PrepareResult struct {
// Err, if non-nil, describes a problem that occurred while preparing
// the ResourceClaim. The devices are then ignored and the kubelet will
// try to prepare the ResourceClaim again later.
Err error
// This unexported method ensures that we can modify the interface
// without causing an API break of the package
// (https://pkg.go.dev/golang.org/x/exp/apidiff#section-readme).
internal()
// Devices contains the IDs of CDI devices associated with specific requests
// in a ResourceClaim. Those IDs will be passed on to the container runtime
// by the kubelet.
//
// The empty slice is also valid.
Devices []Device
}
// Device provides the CDI device IDs for one request in a ResourceClaim.
type Device struct {
// Requests lists the names of requests or subrequests in the
// ResourceClaim that this device is associated with. The subrequest
// name may be included here, but it is also okay to just return
// the request name.
//
// A DRA driver can get this string from the Request field in
// [resourceapi.DeviceRequestAllocationResult], which includes the
// subrequest name if there is one.
//
// If empty, the device is associated with all requests.
Requests []string
// PoolName identifies the DRA driver's pool which contains the device.
// Must not be empty.
PoolName string
// DeviceName identifies the device inside that pool.
// Must not be empty.
DeviceName string
// CDIDeviceIDs lists all CDI devices associated with this DRA device.
// Each ID must be of the form "<vendor ID>/<class>=<unique name>".
// May be empty.
CDIDeviceIDs []string
}
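
As a sketch of how these types get filled in, a hypothetical helper that maps
the allocation results of one claim to CDI device IDs; the "test" class and
the UID-based unique name follow the recommendation above, everything else
(function name, package) is made up:

```
package example

import (
	resourceapi "k8s.io/api/resource/v1beta1"
	"k8s.io/dynamic-resource-allocation/kubeletplugin"
)

// buildPrepareResult converts the allocation results of one claim into
// the Devices of a PrepareResult.
func buildPrepareResult(driverName string, claim *resourceapi.ResourceClaim) kubeletplugin.PrepareResult {
	var result kubeletplugin.PrepareResult
	for _, alloc := range claim.Status.Allocation.Devices.Results {
		if alloc.Driver != driverName {
			continue // allocated by some other driver
		}
		result.Devices = append(result.Devices, kubeletplugin.Device{
			Requests:   []string{alloc.Request}, // includes the subrequest name, if any
			PoolName:   alloc.Pool,
			DeviceName: alloc.Device,
			// "<vendor ID>/<class>=<unique name>": the claim UID keeps the name unique.
			CDIDeviceIDs: []string{driverName + "/test=claim-" + string(claim.UID) + "-" + alloc.Device},
		})
	}
	return result
}
```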
// Option implements the functional options pattern for Start.
@ -173,17 +244,11 @@ func GRPCStreamInterceptor(interceptor grpc.StreamServerInterceptor) Option {
}
}
// NodeV1alpha4 explicitly chooses whether the DRA gRPC API v1alpha4
// gets enabled.
func NodeV1alpha4(enabled bool) Option {
return func(o *options) error {
o.nodeV1alpha4 = enabled
return nil
}
}
// NodeV1beta1 explicitly chooses whether the DRA gRPC API v1beta1
// gets enabled.
// gets enabled. True by default.
//
// This is used in Kubernetes for end-to-end testing. The default should
// be fine for DRA drivers.
func NodeV1beta1(enabled bool) Option {
return func(o *options) error {
o.nodeV1beta1 = enabled
@ -223,6 +288,18 @@ func NodeUID(nodeUID types.UID) Option {
}
}
// Serialize overrides whether the helper serializes the prepare and unprepare
// calls. The default is to serialize.
//
// A DRA driver can opt out of that to speed up parallel processing, but then
// must handle concurrency itself.
func Serialize(enabled bool) Option {
return func(o *options) error {
o.serialize = enabled
return nil
}
}
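
A sketch of opting out (the wrapper function is hypothetical); the example
test driver later in this commit does the same:

```
package example

import (
	"context"

	"k8s.io/dynamic-resource-allocation/kubeletplugin"
)

// startWithOwnLocking starts the helper for a plugin that protects its
// own state against concurrent Prepare/Unprepare calls.
func startWithOwnLocking(ctx context.Context, plugin kubeletplugin.DRAPlugin) (*kubeletplugin.Helper, error) {
	return kubeletplugin.Start(ctx, plugin,
		kubeletplugin.Serialize(false),
	)
}
```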
type options struct {
logger klog.Logger
grpcVerbosity int
@ -235,25 +312,27 @@ type options struct {
unaryInterceptors []grpc.UnaryServerInterceptor
streamInterceptors []grpc.StreamServerInterceptor
kubeClient kubernetes.Interface
nodeV1alpha4 bool
nodeV1beta1 bool
serialize bool
nodeV1beta1 bool
}
// draPlugin combines the kubelet registration service and the DRA node plugin
// service.
type draPlugin struct {
// Helper combines the kubelet registration service and the DRA node plugin
// service and implements them by calling a [DRAPlugin] implementation.
type Helper struct {
// backgroundCtx is for activities that are started later.
backgroundCtx context.Context
// cancel cancels the backgroundCtx.
cancel func(cause error)
wg sync.WaitGroup
registrar *nodeRegistrar
plugin *grpcServer
driverName string
nodeName string
nodeUID types.UID
kubeClient kubernetes.Interface
cancel func(cause error)
wg sync.WaitGroup
registrar *nodeRegistrar
pluginServer *grpcServer
plugin DRAPlugin
driverName string
nodeName string
nodeUID types.UID
kubeClient kubernetes.Interface
serialize bool
grpcMutex sync.Mutex
// Information about resource publishing changes concurrently and thus
// must be protected by the mutex. The controller gets started only
@ -263,7 +342,7 @@ type draPlugin struct {
}
// Start sets up two gRPC servers (one for registration, one for the DRA node
// client). By default, all APIs implemented by the nodeServer get registered.
// client) and implements them by calling a [DRAPlugin] implementation.
//
// The context and/or DRAPlugin.Stop can be used to stop all background activity.
// Stop also blocks. A logger can be stored in the context to add values or
@ -271,18 +350,12 @@ type draPlugin struct {
//
// If the plugin will be used to publish resources, [KubeClient] and [NodeName]
// options are mandatory.
//
// The DRA driver decides which gRPC interfaces it implements. At least one
// implementation of [drapbv1alpha4.NodeServer] or [drapbv1beta1.DRAPluginServer]
// is required. Implementing drapbv1beta1.DRAPluginServer is recommended for
// DRA driver targeting Kubernetes >= 1.32. To be compatible with Kubernetes 1.31,
// DRA drivers must implement only [drapbv1alpha4.NodeServer].
func Start(ctx context.Context, nodeServers []interface{}, opts ...Option) (result DRAPlugin, finalErr error) {
func Start(ctx context.Context, plugin DRAPlugin, opts ...Option) (result *Helper, finalErr error) {
logger := klog.FromContext(ctx)
o := options{
logger: klog.Background(),
grpcVerbosity: 6, // Logs requests and responses, which can be large.
nodeV1alpha4: true,
serialize: true,
nodeV1beta1: true,
}
for _, option := range opts {
@ -305,11 +378,13 @@ func Start(ctx context.Context, nodeServers []interface{}, opts ...Option) (resu
return nil, errors.New("a Unix domain socket path and/or listener must be set for the registrar")
}
d := &draPlugin{
d := &Helper{
driverName: o.driverName,
nodeName: o.nodeName,
nodeUID: o.nodeUID,
kubeClient: o.kubeClient,
serialize: o.serialize,
plugin: plugin,
}
// Stop calls cancel and therefore both cancellation
@ -337,37 +412,21 @@ func Start(ctx context.Context, nodeServers []interface{}, opts ...Option) (resu
// Run the node plugin gRPC server first to ensure that it is ready.
var supportedServices []string
plugin, err := startGRPCServer(klog.NewContext(ctx, klog.LoggerWithName(logger, "dra")), o.grpcVerbosity, o.unaryInterceptors, o.streamInterceptors, o.draEndpoint, func(grpcServer *grpc.Server) {
for _, nodeServer := range nodeServers {
if nodeServer, ok := nodeServer.(drapbv1alpha4.NodeServer); ok && o.nodeV1alpha4 {
logger.V(5).Info("registering v1alpha4.Node gGRPC service")
drapbv1alpha4.RegisterNodeServer(grpcServer, nodeServer)
supportedServices = append(supportedServices, drapbv1alpha4.NodeService)
}
if nodeServer, ok := nodeServer.(drapbv1beta1.DRAPluginServer); ok && o.nodeV1beta1 {
logger.V(5).Info("registering v1beta1.DRAPlugin gRPC service")
drapbv1beta1.RegisterDRAPluginServer(grpcServer, nodeServer)
supportedServices = append(supportedServices, drapbv1beta1.DRAPluginService)
}
pluginServer, err := startGRPCServer(klog.NewContext(ctx, klog.LoggerWithName(logger, "dra")), o.grpcVerbosity, o.unaryInterceptors, o.streamInterceptors, o.draEndpoint, func(grpcServer *grpc.Server) {
if o.nodeV1beta1 {
logger.V(5).Info("registering v1beta1.DRAPlugin gRPC service")
drapb.RegisterDRAPluginServer(grpcServer, &nodePluginImplementation{Helper: d})
supportedServices = append(supportedServices, drapb.DRAPluginService)
}
})
if err != nil {
return nil, fmt.Errorf("start node client: %v", err)
}
d.plugin = plugin
d.pluginServer = pluginServer
if len(supportedServices) == 0 {
return nil, errors.New("no supported DRA gRPC API is implemented and enabled")
}
// Backwards compatibility hack: if only the alpha gRPC service is enabled,
// then we can support registration against a 1.31 kubelet by reporting "1.0.0"
// as version. That also works with 1.32 because 1.32 supports that legacy
// behavior and 1.31 works because it doesn't fail while parsing "v1alpha3.Node"
// as version.
if len(supportedServices) == 1 && supportedServices[0] == drapbv1alpha4.NodeService {
supportedServices = []string{"1.0.0"}
}
// Now make it available to kubelet.
registrar, err := startRegistrar(klog.NewContext(ctx, klog.LoggerWithName(logger, "registrar")), o.grpcVerbosity, o.unaryInterceptors, o.streamInterceptors, o.driverName, supportedServices, o.draAddress, o.pluginRegistrationEndpoint)
if err != nil {
@ -383,7 +442,7 @@ func Start(ctx context.Context, nodeServers []interface{}, opts ...Option) (resu
<-ctx.Done()
// Time to stop.
d.plugin.stop()
d.pluginServer.stop()
d.registrar.stop()
// d.resourceSliceController is set concurrently.
@ -395,8 +454,8 @@ func Start(ctx context.Context, nodeServers []interface{}, opts ...Option) (resu
return d, nil
}
// Stop implements [DRAPlugin.Stop].
func (d *draPlugin) Stop() {
// Stop ensures that all spawned goroutines are stopped and frees resources.
func (d *Helper) Stop() {
if d == nil {
return
}
@ -405,9 +464,27 @@ func (d *draPlugin) Stop() {
d.wg.Wait()
}
// PublishResources implements [DRAPlugin.PublishResources]. Returns an error if
// kubeClient or nodeName is unset.
func (d *draPlugin) PublishResources(_ context.Context, resources resourceslice.DriverResources) error {
// PublishResources may be called one or more times to publish
// resource information in ResourceSlice objects. If it never gets
// called, then the kubelet plugin does not manage any ResourceSlice
// objects.
//
// PublishResources does not block, so it might still take a while
// after it returns before all information is actually written
// to the API server.
//
// It is the responsibility of the caller to ensure that the pools and
// slices described in the driver resources parameters are valid
// according to the restrictions defined in the resource.k8s.io API.
//
// Invalid ResourceSlices will be rejected by the apiserver during
// publishing, which happens asynchronously and thus does not
// get returned as error here. The only error returned here is
// when publishing was not set up properly, for example missing
// [KubeClient] or [NodeName] options.
//
// The caller may modify the resources after this call returns.
func (d *Helper) PublishResources(_ context.Context, resources resourceslice.DriverResources) error {
if d.kubeClient == nil {
return errors.New("no KubeClient found to publish resources")
}
@ -456,12 +533,118 @@ func (d *draPlugin) PublishResources(_ context.Context, resources resourceslice.
return nil
}
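
For illustration, publishing one node-local pool with a single device might
look like the sketch below; the pool layout (one pool named after the node)
and the device name are assumptions:

```
package example

import (
	"context"
	"fmt"

	resourceapi "k8s.io/api/resource/v1beta1"
	"k8s.io/dynamic-resource-allocation/kubeletplugin"
	"k8s.io/dynamic-resource-allocation/resourceslice"
)

func publishOneDevice(ctx context.Context, helper *kubeletplugin.Helper, nodeName string) error {
	resources := resourceslice.DriverResources{
		Pools: map[string]resourceslice.Pool{
			nodeName: {
				Slices: []resourceslice.Slice{{
					Devices: []resourceapi.Device{{
						Name:  "device-0",
						Basic: &resourceapi.BasicDevice{},
					}},
				}},
			},
		},
	}
	// Publishing happens asynchronously; an error here only indicates
	// missing setup (KubeClient, NodeName).
	if err := helper.PublishResources(ctx, resources); err != nil {
		return fmt.Errorf("publish resources: %w", err)
	}
	return nil
}
```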
// RegistrationStatus implements [DRAPlugin.RegistrationStatus].
func (d *draPlugin) RegistrationStatus() *registerapi.RegistrationStatus {
// RegistrationStatus returns the result of registration, nil if none received yet.
func (d *Helper) RegistrationStatus() *registerapi.RegistrationStatus {
if d.registrar == nil {
return nil
}
// TODO: protect against concurrency issues.
return d.registrar.status
}
func (d *draPlugin) internal() {}
// serializeGRPCIfEnabled locks a mutex if serialization is enabled.
// Either way it returns a function that the caller must invoke
// via defer.
func (d *Helper) serializeGRPCIfEnabled() func() {
if !d.serialize {
return func() {}
}
d.grpcMutex.Lock()
return d.grpcMutex.Unlock
}
// nodePluginImplementation is a thin wrapper around the helper instance.
// It prevents polluting the public API with these implementation details.
type nodePluginImplementation struct {
*Helper
}
// NodePrepareResources implements [drapb.NodePrepareResources].
func (d *nodePluginImplementation) NodePrepareResources(ctx context.Context, req *drapb.NodePrepareResourcesRequest) (*drapb.NodePrepareResourcesResponse, error) {
// Do slow API calls before serializing.
claims, err := d.getResourceClaims(ctx, req.Claims)
if err != nil {
return nil, fmt.Errorf("get resource claims: %w", err)
}
defer d.serializeGRPCIfEnabled()()
result, err := d.plugin.PrepareResourceClaims(ctx, claims)
if err != nil {
return nil, fmt.Errorf("prepare resource claims: %w", err)
}
resp := &drapb.NodePrepareResourcesResponse{Claims: map[string]*drapb.NodePrepareResourceResponse{}}
for uid, claimResult := range result {
var devices []*drapb.Device
for _, result := range claimResult.Devices {
device := &drapb.Device{
RequestNames: stripSubrequestNames(result.Requests),
PoolName: result.PoolName,
DeviceName: result.DeviceName,
CDIDeviceIDs: result.CDIDeviceIDs,
}
devices = append(devices, device)
}
resp.Claims[string(uid)] = &drapb.NodePrepareResourceResponse{
Error: errorString(claimResult.Err),
Devices: devices,
}
}
return resp, nil
}
func errorString(err error) string {
if err == nil {
return ""
}
return err.Error()
}
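// stripSubrequestNames replaces each request name with its base request
// name: a subrequest reference like "<request>/<subrequest>" becomes
// "<request>".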
func stripSubrequestNames(names []string) []string {
stripped := make([]string, len(names))
for i, name := range names {
stripped[i] = resourceclaim.BaseRequestRef(name)
}
return stripped
}
func (d *nodePluginImplementation) getResourceClaims(ctx context.Context, claims []*drapb.Claim) ([]*resourceapi.ResourceClaim, error) {
var resourceClaims []*resourceapi.ResourceClaim
for _, claimReq := range claims {
claim, err := d.kubeClient.ResourceV1beta1().ResourceClaims(claimReq.Namespace).Get(ctx, claimReq.Name, metav1.GetOptions{})
if err != nil {
return resourceClaims, fmt.Errorf("retrieve claim %s/%s: %w", claimReq.Namespace, claimReq.Name, err)
}
if claim.Status.Allocation == nil {
return resourceClaims, fmt.Errorf("claim %s/%s not allocated", claimReq.Namespace, claimReq.Name)
}
if claim.UID != types.UID(claimReq.UID) {
return resourceClaims, fmt.Errorf("claim %s/%s got replaced", claimReq.Namespace, claimReq.Name)
}
resourceClaims = append(resourceClaims, claim)
}
return resourceClaims, nil
}
// NodeUnprepareResources implements [draapi.NodeUnprepareResources].
func (d *nodePluginImplementation) NodeUnprepareResources(ctx context.Context, req *drapb.NodeUnprepareResourcesRequest) (*drapb.NodeUnprepareResourcesResponse, error) {
defer d.serializeGRPCIfEnabled()()
claims := make([]NamespacedObject, 0, len(req.Claims))
for _, claim := range req.Claims {
claims = append(claims, NamespacedObject{UID: types.UID(claim.UID), NamespacedName: types.NamespacedName{Name: claim.Name, Namespace: claim.Namespace}})
}
result, err := d.plugin.UnprepareResourceClaims(ctx, claims)
if err != nil {
return nil, fmt.Errorf("unprepare resource claims: %w", err)
}
resp := &drapb.NodeUnprepareResourcesResponse{Claims: map[string]*drapb.NodeUnprepareResourceResponse{}}
for uid, err := range result {
resp.Claims[string(uid)] = &drapb.NodeUnprepareResourceResponse{
Error: errorString(err),
}
}
return resp, nil
}


@ -0,0 +1,50 @@
/*
Copyright 2025 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package kubeletplugin
import (
"k8s.io/apimachinery/pkg/types"
)
// NamespacedObject comprises a resource name with a mandatory namespace
// and optional UID. It gets rendered as "<namespace>/<name>:[<uid>]"
// (text output) or as an object (JSON output).
type NamespacedObject struct {
types.NamespacedName
UID types.UID
}
// String returns the general purpose string representation
func (n NamespacedObject) String() string {
if n.UID != "" {
return n.Namespace + string(types.Separator) + n.Name + ":" + string(n.UID)
}
return n.Namespace + string(types.Separator) + n.Name
}
// MarshalLog emits a struct containing required key/value pair
func (n NamespacedObject) MarshalLog() interface{} {
return struct {
Name string `json:"name"`
Namespace string `json:"namespace,omitempty"`
UID types.UID `json:"uid,omitempty"`
}{
Name: n.Name,
Namespace: n.Namespace,
UID: n.UID,
}
}
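
For illustration (a sketch of the text output; the UID is made up):

```
package example

import (
	"fmt"

	"k8s.io/apimachinery/pkg/types"
	"k8s.io/dynamic-resource-allocation/kubeletplugin"
)

func demo() {
	obj := kubeletplugin.NamespacedObject{
		NamespacedName: types.NamespacedName{Namespace: "kube-system", Name: "claim-1"},
		UID:            "3c6f52bc-0000-0000-0000-000000000000",
	}
	// Println uses the String method and prints:
	// kube-system/claim-1:3c6f52bc-0000-0000-0000-000000000000
	fmt.Println(obj)
}
```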


@ -82,14 +82,18 @@ func startGRPCServer(valueCtx context.Context, grpcVerbosity int, unaryIntercept
// Run a gRPC server. It will close the listening socket when
// shutting down, so we don't need to do that.
// The per-context logger always gets initialized because
// there might be log output inside the method implementations.
var opts []grpc.ServerOption
finalUnaryInterceptors := []grpc.UnaryServerInterceptor{unaryContextInterceptor(valueCtx)}
finalStreamInterceptors := []grpc.StreamServerInterceptor{streamContextInterceptor(valueCtx)}
if grpcVerbosity >= 0 {
finalUnaryInterceptors = append(finalUnaryInterceptors, s.interceptor)
finalStreamInterceptors = append(finalStreamInterceptors, s.streamInterceptor)
finalUnaryInterceptors := []grpc.UnaryServerInterceptor{
unaryContextInterceptor(valueCtx),
s.interceptor,
}
finalUnaryInterceptors = append(finalUnaryInterceptors, unaryInterceptors...)
finalStreamInterceptors := []grpc.StreamServerInterceptor{
streamContextInterceptor(valueCtx),
s.streamInterceptor,
}
finalStreamInterceptors = append(finalStreamInterceptors, streamInterceptors...)
opts = append(opts, grpc.ChainUnaryInterceptor(finalUnaryInterceptors...))
opts = append(opts, grpc.ChainStreamInterceptor(finalStreamInterceptors...))
@ -164,7 +168,6 @@ func (m mergeCtx) Value(i interface{}) interface{} {
// sequentially increasing request ID and adds that logger to the context. It
// also logs request and response.
func (s *grpcServer) interceptor(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (resp interface{}, err error) {
requestID := atomic.AddInt64(&requestID, 1)
logger := klog.FromContext(ctx)
logger = klog.LoggerWithValues(logger, "requestID", requestID, "method", info.FullMethod)


@ -430,7 +430,6 @@ func (d *Driver) SetUp(nodes *Nodes, resources Resources, devicesPerNode ...map[
kubeletplugin.PluginListener(listen(ctx, d.f, pod.Name, "plugin", 9001)),
kubeletplugin.RegistrarListener(listen(ctx, d.f, pod.Name, "registrar", 9000)),
kubeletplugin.KubeletPluginSocketPath(draAddr),
kubeletplugin.NodeV1alpha4(d.NodeV1alpha4),
kubeletplugin.NodeV1beta1(d.NodeV1beta1),
)
framework.ExpectNoError(err, "start kubelet plugin for node %s", pod.Spec.NodeName)


@ -1737,15 +1737,13 @@ var _ = framework.SIGDescribe("node")("DRA", feature.DynamicResourceAllocation,
})
})
multipleDrivers := func(nodeV1alpha4, nodeV1beta1 bool) {
multipleDrivers := func(nodeV1beta1 bool) {
nodes := NewNodes(f, 1, 4)
driver1 := NewDriver(f, nodes, perNode(2, nodes))
driver1.NodeV1alpha4 = nodeV1alpha4
driver1.NodeV1beta1 = nodeV1beta1
b1 := newBuilder(f, driver1)
driver2 := NewDriver(f, nodes, perNode(2, nodes))
driver2.NodeV1alpha4 = nodeV1alpha4
driver2.NodeV1beta1 = nodeV1beta1
driver2.NameSuffix = "-other"
b2 := newBuilder(f, driver2)
@ -1769,16 +1767,14 @@ var _ = framework.SIGDescribe("node")("DRA", feature.DynamicResourceAllocation,
b1.testPod(ctx, f, pod)
})
}
multipleDriversContext := func(prefix string, nodeV1alpha4, nodeV1beta1 bool) {
multipleDriversContext := func(prefix string, nodeV1beta1 bool) {
ginkgo.Context(prefix, func() {
multipleDrivers(nodeV1alpha4, nodeV1beta1)
multipleDrivers(nodeV1beta1)
})
}
ginkgo.Context("multiple drivers", func() {
multipleDriversContext("using only drapbv1alpha4", true, false)
multipleDriversContext("using only drapbv1beta1", false, true)
multipleDriversContext("using both drav1alpha4 and drapbv1beta1", true, true)
multipleDriversContext("using only drapbv1beta1", true)
})
ginkgo.It("runs pod after driver starts", func(ctx context.Context) {


@ -60,7 +60,11 @@ RUNTIME_CONFIG="resource.k8s.io/v1alpha3" FEATURE_GATES=DynamicResourceAllocatio
In another:
```
sudo mkdir -p /var/run/cdi && sudo chmod a+rwx /var/run/cdi /var/lib/kubelet/plugins_registry
sudo mkdir -p /var/run/cdi
sudo mkdir -p /var/lib/kubelet/plugins
sudo mkdir -p /var/lib/kubelet/plugins_registry
sudo chmod a+rx /var/lib/kubelet /var/lib/kubelet/plugins
sudo chmod a+rwx /var/run/cdi /var/lib/kubelet/plugins_registry
KUBECONFIG=/var/run/kubernetes/admin.kubeconfig go run ./test/e2e/dra/test-driver -v=5 kubelet-plugin --node-name=127.0.0.1
```


@ -40,15 +40,13 @@ import (
"k8s.io/dynamic-resource-allocation/resourceclaim"
"k8s.io/dynamic-resource-allocation/resourceslice"
"k8s.io/klog/v2"
drapbv1alpha4 "k8s.io/kubelet/pkg/apis/dra/v1alpha4"
drapb "k8s.io/kubelet/pkg/apis/dra/v1beta1"
)
type ExamplePlugin struct {
stopCh <-chan struct{}
logger klog.Logger
kubeClient kubernetes.Interface
d kubeletplugin.DRAPlugin
d *kubeletplugin.Helper
fileOps FileOperations
cdiDir string
@ -56,8 +54,11 @@ type ExamplePlugin struct {
nodeName string
deviceNames sets.Set[string]
// The mutex is needed because there are other goroutines checking the state.
// Serializing in the gRPC server alone is not enough because writing would
// race with reading.
mutex sync.Mutex
prepared map[ClaimID][]Device // prepared claims -> result of nodePrepareResource
prepared map[ClaimID][]kubeletplugin.Device // prepared claims -> result of nodePrepareResource
gRPCCalls []GRPCCall
blockPrepareResourcesMutex sync.Mutex
@ -89,7 +90,7 @@ type GRPCCall struct {
// sufficient to make the ClaimID unique.
type ClaimID struct {
Name string
UID string
UID types.UID
}
type Device struct {
@ -99,10 +100,10 @@ type Device struct {
CDIDeviceID string
}
var _ drapb.DRAPluginServer = &ExamplePlugin{}
var _ kubeletplugin.DRAPlugin = &ExamplePlugin{}
// getJSONFilePath returns the absolute path where the CDI file is/should be.
func (ex *ExamplePlugin) getJSONFilePath(claimUID string, requestName string) string {
func (ex *ExamplePlugin) getJSONFilePath(claimUID types.UID, requestName string) string {
baseRequestRef := resourceclaim.BaseRequestRef(requestName)
return filepath.Join(ex.cdiDir, fmt.Sprintf("%s-%s-%s.json", ex.driverName, claimUID, baseRequestRef))
}
@ -151,7 +152,7 @@ func StartPlugin(ctx context.Context, cdiDir, driverName string, kubeClient kube
cdiDir: cdiDir,
driverName: driverName,
nodeName: nodeName,
prepared: make(map[ClaimID][]Device),
prepared: make(map[ClaimID][]kubeletplugin.Device),
deviceNames: sets.New[string](),
}
@ -167,16 +168,9 @@ func StartPlugin(ctx context.Context, cdiDir, driverName string, kubeClient kube
kubeletplugin.KubeClient(kubeClient),
kubeletplugin.GRPCInterceptor(ex.recordGRPCCall),
kubeletplugin.GRPCStreamInterceptor(ex.recordGRPCStream),
kubeletplugin.Serialize(false), // The example plugin does its own locking.
)
// Both APIs get provided, the legacy one via wrapping. The options
// determine which one(s) really get served (by default, both).
// The options are a bit redundant now because a single instance cannot
// implement both, but that might be different in the future.
nodeServers := []any{
drapb.DRAPluginServer(ex), // Casting is done only for clarity here, it's not needed.
drapbv1alpha4.V1Beta1ServerWrapper{DRAPluginServer: ex},
}
d, err := kubeletplugin.Start(ctx, nodeServers, opts...)
d, err := kubeletplugin.Start(ctx, ex, opts...)
if err != nil {
return nil, fmt.Errorf("start kubelet plugin: %w", err)
}
@ -300,34 +294,21 @@ func (ex *ExamplePlugin) getUnprepareResourcesFailure() error {
// a deterministic name to simplify NodeUnprepareResource (no need to remember
// or discover the name) and idempotency (when called again, the file simply
// gets written again).
func (ex *ExamplePlugin) nodePrepareResource(ctx context.Context, claimReq *drapb.Claim) ([]Device, error) {
func (ex *ExamplePlugin) nodePrepareResource(ctx context.Context, claim *resourceapi.ResourceClaim) ([]kubeletplugin.Device, error) {
logger := klog.FromContext(ctx)
// The plugin must retrieve the claim itself to get it in the version
// that it understands.
claim, err := ex.kubeClient.ResourceV1beta1().ResourceClaims(claimReq.Namespace).Get(ctx, claimReq.Name, metav1.GetOptions{})
if err != nil {
return nil, fmt.Errorf("retrieve claim %s/%s: %w", claimReq.Namespace, claimReq.Name, err)
}
if claim.Status.Allocation == nil {
return nil, fmt.Errorf("claim %s/%s not allocated", claimReq.Namespace, claimReq.Name)
}
if claim.UID != types.UID(claimReq.UID) {
return nil, fmt.Errorf("claim %s/%s got replaced", claimReq.Namespace, claimReq.Name)
}
ex.mutex.Lock()
defer ex.mutex.Unlock()
ex.blockPrepareResourcesMutex.Lock()
defer ex.blockPrepareResourcesMutex.Unlock()
claimID := ClaimID{Name: claimReq.Name, UID: claimReq.UID}
claimID := ClaimID{Name: claim.Name, UID: claim.UID}
if result, ok := ex.prepared[claimID]; ok {
// Idempotent call, nothing to do.
return result, nil
}
var devices []Device
var devices []kubeletplugin.Device
for _, result := range claim.Status.Allocation.Devices.Results {
// Only handle allocations for the current driver.
if ex.driverName != result.Driver {
@ -356,7 +337,7 @@ func (ex *ExamplePlugin) nodePrepareResource(ctx context.Context, claimReq *drap
claimReqName = regexp.MustCompile(`[^a-zA-Z0-9]`).ReplaceAllString(claimReqName, "_")
env[claimReqName] = "true"
deviceName := "claim-" + claimReq.UID + "-" + baseRequestName
deviceName := "claim-" + string(claim.UID) + "-" + baseRequestName
vendor := ex.driverName
class := "test"
cdiDeviceID := vendor + "/" + class + "=" + deviceName
@ -391,7 +372,7 @@ func (ex *ExamplePlugin) nodePrepareResource(ctx context.Context, claimReq *drap
},
},
}
filePath := ex.getJSONFilePath(claimReq.UID, baseRequestName)
filePath := ex.getJSONFilePath(claim.UID, baseRequestName)
buffer, err := json.Marshal(spec)
if err != nil {
return nil, fmt.Errorf("marshal spec: %w", err)
@ -399,11 +380,11 @@ func (ex *ExamplePlugin) nodePrepareResource(ctx context.Context, claimReq *drap
if err := ex.fileOps.Create(filePath, buffer); err != nil {
return nil, fmt.Errorf("failed to write CDI file: %w", err)
}
device := Device{
PoolName: result.Pool,
DeviceName: result.Device,
RequestName: baseRequestName,
CDIDeviceID: cdiDeviceID,
device := kubeletplugin.Device{
PoolName: result.Pool,
DeviceName: result.Device,
Requests: []string{result.Request}, // May also return baseRequestName here.
CDIDeviceIDs: []string{cdiDeviceID},
}
devices = append(devices, device)
}
@ -434,48 +415,35 @@ func extractParameters(parameters runtime.RawExtension, env *map[string]string,
return nil
}
func (ex *ExamplePlugin) NodePrepareResources(ctx context.Context, req *drapb.NodePrepareResourcesRequest) (*drapb.NodePrepareResourcesResponse, error) {
resp := &drapb.NodePrepareResourcesResponse{
Claims: make(map[string]*drapb.NodePrepareResourceResponse),
}
func (ex *ExamplePlugin) PrepareResourceClaims(ctx context.Context, claims []*resourceapi.ResourceClaim) (map[types.UID]kubeletplugin.PrepareResult, error) {
if failure := ex.getPrepareResourcesFailure(); failure != nil {
return resp, failure
return nil, failure
}
for _, claimReq := range req.Claims {
devices, err := ex.nodePrepareResource(ctx, claimReq)
result := make(map[types.UID]kubeletplugin.PrepareResult)
for _, claim := range claims {
devices, err := ex.nodePrepareResource(ctx, claim)
var claimResult kubeletplugin.PrepareResult
if err != nil {
resp.Claims[claimReq.UID] = &drapb.NodePrepareResourceResponse{
Error: err.Error(),
}
claimResult.Err = err
} else {
r := &drapb.NodePrepareResourceResponse{}
for _, device := range devices {
pbDevice := &drapb.Device{
PoolName: device.PoolName,
DeviceName: device.DeviceName,
RequestNames: []string{device.RequestName},
CDIDeviceIDs: []string{device.CDIDeviceID},
}
r.Devices = append(r.Devices, pbDevice)
}
resp.Claims[claimReq.UID] = r
claimResult.Devices = devices
}
result[claim.UID] = claimResult
}
return resp, nil
return result, nil
}
// NodeUnprepareResource removes the CDI file created by
// NodePrepareResource. It's idempotent, therefore it is not an error when that
// file is already gone.
func (ex *ExamplePlugin) nodeUnprepareResource(ctx context.Context, claimReq *drapb.Claim) error {
func (ex *ExamplePlugin) nodeUnprepareResource(ctx context.Context, claimRef kubeletplugin.NamespacedObject) error {
ex.blockUnprepareResourcesMutex.Lock()
defer ex.blockUnprepareResourcesMutex.Unlock()
logger := klog.FromContext(ctx)
claimID := ClaimID{Name: claimReq.Name, UID: claimReq.UID}
claimID := ClaimID{Name: claimRef.Name, UID: claimRef.UID}
devices, ok := ex.prepared[claimID]
if !ok {
// Idempotent call, nothing to do.
@ -483,11 +451,14 @@ func (ex *ExamplePlugin) nodeUnprepareResource(ctx context.Context, claimReq *dr
}
for _, device := range devices {
filePath := ex.getJSONFilePath(claimReq.UID, device.RequestName)
if err := ex.fileOps.Remove(filePath); err != nil {
return fmt.Errorf("error removing CDI file: %w", err)
// In practice we only prepare one, but let's not assume that here.
for _, request := range device.Requests {
filePath := ex.getJSONFilePath(claimRef.UID, request)
if err := ex.fileOps.Remove(filePath); err != nil {
return fmt.Errorf("error removing CDI file: %w", err)
}
logger.V(3).Info("CDI file removed", "path", filePath)
}
logger.V(3).Info("CDI file removed", "path", filePath)
}
delete(ex.prepared, claimID)
@ -495,26 +466,18 @@ func (ex *ExamplePlugin) nodeUnprepareResource(ctx context.Context, claimReq *dr
return nil
}
func (ex *ExamplePlugin) NodeUnprepareResources(ctx context.Context, req *drapb.NodeUnprepareResourcesRequest) (*drapb.NodeUnprepareResourcesResponse, error) {
resp := &drapb.NodeUnprepareResourcesResponse{
Claims: make(map[string]*drapb.NodeUnprepareResourceResponse),
}
func (ex *ExamplePlugin) UnprepareResourceClaims(ctx context.Context, claims []kubeletplugin.NamespacedObject) (map[types.UID]error, error) {
result := make(map[types.UID]error)
if failure := ex.getUnprepareResourcesFailure(); failure != nil {
return resp, failure
return nil, failure
}
for _, claimReq := range req.Claims {
err := ex.nodeUnprepareResource(ctx, claimReq)
if err != nil {
resp.Claims[claimReq.UID] = &drapb.NodeUnprepareResourceResponse{
Error: err.Error(),
}
} else {
resp.Claims[claimReq.UID] = &drapb.NodeUnprepareResourceResponse{}
}
for _, claimRef := range claims {
err := ex.nodeUnprepareResource(ctx, claimRef)
result[claimRef.UID] = err
}
return resp, nil
return result, nil
}
func (ex *ExamplePlugin) GetPreparedResources() []ClaimID {