Merge pull request #130700 from pohly/dra-kubeletplugin-helper

DRA kubeletplugin: turn helper into wrapper
commit f007012f5f
Kubernetes Prow Robot 2025-03-16 01:55:47 -07:00 committed by GitHub
14 changed files with 869 additions and 348 deletions

View File

@ -16,4 +16,52 @@ limitations under the License.
// Package kubeletplugin provides helper functions for running a dynamic
// resource allocation kubelet plugin.
//
// A DRA driver using this package can be deployed as a DaemonSet on suitable
// nodes. Node labeling, for example through NFD
// (https://github.com/kubernetes-sigs/node-feature-discovery), can be used
// to run the driver only on nodes which have the necessary hardware.
//
// The service account of the DaemonSet must have sufficient RBAC permissions
// to read ResourceClaims and to create and update ResourceSlices, if
// the driver intends to publish per-node ResourceSlices. It is good
// security practice (but not required) to limit access to ResourceSlices
// associated with the node a specific Pod is running on. This can be done
// with a Validating Admission Policy (VAP). For more information,
// see the deployment of the DRA example driver
// (https://github.com/kubernetes-sigs/dra-example-driver/tree/main/deployments/helm/dra-example-driver/templates).
//
// Traditionally, the kubelet has not supported rolling updates of plugins.
// Therefore the DaemonSet must not set `maxSurge` to a value larger than
// zero. With the default `maxSurge: 0`, updating the DaemonSet of the driver
// will first shut down the old driver Pod, then start the replacement.
//
// This leads to a short downtime for operations that need the driver:
// - Pods cannot start unless the claims they depend on were already
// prepared for use.
// - Cleanup after the last pod which used a claim gets delayed
// until the driver is available again. The pod is not marked
// as terminated. This prevents reusing the resources used by
// the pod for other pods.
// - Running pods are *not* affected as far as Kubernetes is
// concerned. However, a DRA driver might provide required runtime
// services. Vendors need to document this.
//
// Note that the second point also means that draining a node should
// first evict normal pods, then the driver DaemonSet Pod.
//
// Starting with Kubernetes 1.33, the kubelet supports rolling updates
// such that old and new Pod run at the same time for a short while
// and hand over work gracefully, with no downtime.
// However, there is no mechanism for determining in advance whether
// the node the DaemonSet runs on supports that. Trying
// to do a rolling update with a kubelet which does not support it yet
// will fail because shutting down the old Pod unregisters the driver
// even though the new Pod is running. See https://github.com/kubernetes/kubernetes/pull/129832
// for details (TODO: link to doc after merging instead).
//
// A DRA driver can either require 1.33 as minimal Kubernetes version or
// provide two variants of its DaemonSet deployment. In the variant with
// support for rolling updates, `maxSurge` can be set to a non-zero
// value. Administrators have to be careful about running the right variant.
package kubeletplugin
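To make the wrapper API concrete, here is a minimal sketch of a driver built on this package. The driver type, driver name, and run function are hypothetical; DriverName, KubeClient, and NodeName are the options referenced by the Start documentation further down.

```go
package main

import (
	"context"

	resourceapi "k8s.io/api/resource/v1beta1"
	"k8s.io/apimachinery/pkg/types"
	"k8s.io/client-go/kubernetes"
	"k8s.io/dynamic-resource-allocation/kubeletplugin"
)

// exampleDriver is a hypothetical DRAPlugin implementation.
type exampleDriver struct{}

func (d *exampleDriver) PrepareResourceClaims(ctx context.Context, claims []*resourceapi.ResourceClaim) (map[types.UID]kubeletplugin.PrepareResult, error) {
	// Exactly one entry per claim, keyed by the claim UID.
	results := make(map[types.UID]kubeletplugin.PrepareResult, len(claims))
	for _, claim := range claims {
		// Set up devices here; an empty Devices slice is also valid.
		results[claim.UID] = kubeletplugin.PrepareResult{}
	}
	return results, nil
}

func (d *exampleDriver) UnprepareResourceClaims(ctx context.Context, claims []kubeletplugin.NamespacedObject) (map[types.UID]error, error) {
	results := make(map[types.UID]error, len(claims))
	for _, claim := range claims {
		results[claim.UID] = nil // nil marks successful un-preparation.
	}
	return results, nil
}

// run starts the helper and blocks until the context is canceled.
func run(ctx context.Context, client kubernetes.Interface, nodeName string) error {
	helper, err := kubeletplugin.Start(ctx, &exampleDriver{},
		kubeletplugin.DriverName("example.dra.k8s.io"),
		kubeletplugin.KubeClient(client),
		kubeletplugin.NodeName(nodeName),
	)
	if err != nil {
		return err
	}
	defer helper.Stop()
	<-ctx.Done()
	return nil
}
```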

View File

@ -21,56 +21,135 @@ import (
"errors" "errors"
"fmt" "fmt"
"net" "net"
"path"
"sync" "sync"
"google.golang.org/grpc" "google.golang.org/grpc"
"k8s.io/klog/v2" "k8s.io/klog/v2"
resourceapi "k8s.io/api/resource/v1beta1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/kubernetes" "k8s.io/client-go/kubernetes"
"k8s.io/dynamic-resource-allocation/resourceclaim"
"k8s.io/dynamic-resource-allocation/resourceslice" "k8s.io/dynamic-resource-allocation/resourceslice"
drapbv1alpha4 "k8s.io/kubelet/pkg/apis/dra/v1alpha4" drapb "k8s.io/kubelet/pkg/apis/dra/v1beta1"
drapbv1beta1 "k8s.io/kubelet/pkg/apis/dra/v1beta1"
registerapi "k8s.io/kubelet/pkg/apis/pluginregistration/v1" registerapi "k8s.io/kubelet/pkg/apis/pluginregistration/v1"
) )
// DRAPlugin gets returned by Start and defines the public API of the generic const (
// dynamic resource allocation plugin. // KubeletPluginsDir is the default directory for [PluginDataDirectoryPath].
KubeletPluginsDir = "/var/lib/kubelet/plugins"
// KubeletRegistryDir is the default for [RegistrarDirectoryPath]
KubeletRegistryDir = "/var/lib/kubelet/plugins_registry"
)
// DRAPlugin is the interface that needs to be implemented by a DRA driver to
// use this helper package. The helper package then implements the gRPC
// interface expected by the kubelet by wrapping the DRAPlugin implementation.
type DRAPlugin interface {
// PrepareResourceClaims is called to prepare all resources allocated
// for the given ResourceClaims. This is used to implement
// the gRPC NodePrepareResources call.
//
// It gets called with the complete list of claims that are needed
// by some pod. In contrast to the gRPC call, the helper has
// already retrieved the actual ResourceClaim objects.
//
// In addition to that, the helper also:
// - verifies that all claims are really allocated
// - increments a numeric counter for each call and
// adds its value to a per-context logger with "requestID" as key
// - adds the method name with "method" as key to that logger
// - logs the gRPC call and response (configurable with GRPCVerbosity)
// - serializes all gRPC calls unless the driver explicitly opted out of that
//
// This call must be idempotent because the kubelet might have to ask
// for preparation multiple times, for example if it gets restarted.
//
// A DRA driver should verify that all devices listed in a
// [resourceapi.DeviceRequestAllocationResult] are not already in use
// for some other ResourceClaim. Kubernetes tries very hard to ensure
// that, but if something went wrong, then the DRA driver is the last
// line of defense against using the same device for two different
// unrelated workloads.
//
// If an error is returned, the result is ignored. Otherwise the result
// must have exactly one entry for each claim, identified by the UID of
// the corresponding ResourceClaim. For each claim, preparation
// can be either successful (no error set in the per-ResourceClaim PrepareResult)
// or can be reported as failed.
//
// It is possible to create the CDI spec files which define the CDI devices
// on-the-fly in PrepareResourceClaims. UnprepareResourceClaims then can
// remove them. Container runtimes may cache CDI specs but must reload
// files in case of a cache miss. To avoid false cache hits, the unique
// name in the CDI device ID should not be reused. A DRA driver can use
// the claim UID for it.
PrepareResourceClaims(ctx context.Context, claims []*resourceapi.ResourceClaim) (result map[types.UID]PrepareResult, err error)
// UnprepareResourceClaims must undo whatever work PrepareResourceClaims did.
//
// At the time when this gets called, the original ResourceClaims may have
// been deleted already. They also don't get cached by the kubelet. Therefore
// the parameters for each ResourceClaim are only the UID, namespace and name.
// It is the responsibility of the DRA driver to cache whatever additional
// information it might need about prepared resources.
//
// This call must be idempotent because the kubelet might have to ask
// for un-preparation multiple times, for example if it gets restarted.
// Therefore it is not an error if this gets called for a ResourceClaim
// which is not currently prepared.
//
// As with PrepareResourceClaims, the helper takes care of logging
// and serialization.
//
// The conventions for returning one overall error and several per-ResourceClaim
// errors are the same as in PrepareResourceClaims.
UnprepareResourceClaims(ctx context.Context, claims []NamespacedObject) (result map[types.UID]error, err error)
}
// PrepareResult contains the result of preparing one particular ResourceClaim.
type PrepareResult struct {
// Err, if non-nil, describes a problem that occurred while preparing
// the ResourceClaim. The devices are then ignored and the kubelet will
// try to prepare the ResourceClaim again later.
Err error

// Devices contains the IDs of CDI devices associated with specific requests
// in a ResourceClaim. Those IDs will be passed on to the container runtime
// by the kubelet.
//
// The empty slice is also valid.
Devices []Device
}
// Device provides the CDI device IDs for one request in a ResourceClaim.
type Device struct {
// Requests lists the names of requests or subrequests in the
// ResourceClaim that this device is associated with. The subrequest
// name may be included here, but it is also okay to just return
// the request name.
//
// A DRA driver can get this string from the Request field in
// [resourceapi.DeviceRequestAllocationResult], which includes the
// subrequest name if there is one.
//
// If empty, the device is associated with all requests.
Requests []string
// PoolName identifies the DRA driver's pool which contains the device.
// Must not be empty.
PoolName string
// DeviceName identifies the device inside that pool.
// Must not be empty.
DeviceName string
// CDIDeviceIDs lists all CDI devices associated with this DRA device.
// Each ID must be of the form "<vendor ID>/<class>=<unique name>".
// May be empty.
CDIDeviceIDs []string
}
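As a sketch of how these types fit together (request, pool, and device names are made up; the CDI ID uses the claim UID as its unique name, following the convention recommended above):

```go
// prepareOne builds the PrepareResult for a single allocated claim.
// Hypothetical names throughout; the CDI ID follows the
// "<vendor ID>/<class>=<unique name>" form with the claim UID as unique name.
func prepareOne(claim *resourceapi.ResourceClaim) kubeletplugin.PrepareResult {
	return kubeletplugin.PrepareResult{
		Devices: []kubeletplugin.Device{{
			Requests:     []string{"gpu"}, // request name from the claim spec
			PoolName:     "node-local",    // pool that contains the device
			DeviceName:   "gpu-0",         // device within that pool
			CDIDeviceIDs: []string{"example.com/gpu=" + string(claim.UID)},
		}},
	}
}
```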
// Option implements the functional options pattern for Start.
@ -94,63 +173,77 @@ func GRPCVerbosity(level int) Option {
}
}
// RegistrarDirectoryPath sets the path to the directory where the kubelet
// expects to find registration sockets of plugins. Typically this is
// /var/lib/kubelet/plugins_registry, with /var/lib/kubelet being the kubelet's
// data directory.
//
// This is also the default. Some Kubernetes clusters may use a different data directory.
// This path must be the same inside and outside of the driver's container.
// The directory must exist.
func RegistrarDirectoryPath(path string) Option {
return func(o *options) error {
o.pluginRegistrationEndpoint.dir = path
return nil
}
}
// RegistrarSocketFilename sets the name of the socket inside the directory where
// the kubelet watches for registration sockets (see RegistrarDirectoryPath).
//
// Usually DRA drivers should not need this option. It is provided to
// support updates from an installation which used an older release
// of the helper code.
//
// The default is <driver name>-reg.sock. When rolling updates are enabled (not supported yet),
// it is <driver name>-<uid>-reg.sock.
func RegistrarSocketFilename(name string) Option {
return func(o *options) error {
o.pluginRegistrationEndpoint.file = name
return nil
}
}
// RegistrarListener configures how to create the registrar socket.
// The default is to remove the file if it exists and to then
// create a socket.
//
// This is used in Kubernetes for end-to-end testing. The default should
// be fine for DRA drivers.
func RegistrarListener(listen func(ctx context.Context, path string) (net.Listener, error)) Option {
return func(o *options) error {
o.pluginRegistrationEndpoint.listenFunc = listen
return nil
}
}
// PluginDataDirectoryPath sets the path where the DRA driver creates the
// "dra.sock" socket that the kubelet connects to for the DRA-specific gRPC calls.
// It is also used to coordinate between different Pods when using rolling
// updates. It must not be shared with other kubelet plugins.
//
// The default is /var/lib/kubelet/plugins/<driver name>. This directory
// does not need to be inside the kubelet data directory, as long as
// the kubelet can access it.
//
// This path must be the same inside and outside of the driver's container.
// The directory must exist.
func PluginDataDirectoryPath(path string) Option {
return func(o *options) error {
o.pluginDataDirectoryPath = path
return nil
}
}
// PluginListener configures how to create the socket for the DRA gRPC
// service. The default is to remove the file if it exists and to then
// create a socket.
//
// This is used in Kubernetes for end-to-end testing. The default should
// be fine for DRA drivers.
func PluginListener(listen func(ctx context.Context, path string) (net.Listener, error)) Option {
return func(o *options) error {
o.draEndpointListen = listen
return nil
}
}
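For instance, a cluster whose kubelet uses a non-default data directory could combine the two directory options like this (a sketch; the /data/kubelet root and the driver name are hypothetical):

```go
// startWithCustomPaths shows the two directory options together.
func startWithCustomPaths(ctx context.Context, driver kubeletplugin.DRAPlugin) (*kubeletplugin.Helper, error) {
	return kubeletplugin.Start(ctx, driver,
		kubeletplugin.DriverName("example.dra.k8s.io"),
		kubeletplugin.RegistrarDirectoryPath("/data/kubelet/plugins_registry"),
		kubeletplugin.PluginDataDirectoryPath("/data/kubelet/plugins/example.dra.k8s.io"),
	)
}
```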
@ -173,17 +266,11 @@ func GRPCStreamInterceptor(interceptor grpc.StreamServerInterceptor) Option {
}
}
// NodeV1alpha4 explicitly chooses whether the DRA gRPC API v1alpha4
// gets enabled.
func NodeV1alpha4(enabled bool) Option {
return func(o *options) error {
o.nodeV1alpha4 = enabled
return nil
}
}
// NodeV1beta1 explicitly chooses whether the DRA gRPC API v1beta1
// gets enabled. True by default.
//
// This is used in Kubernetes for end-to-end testing. The default should
// be fine for DRA drivers.
func NodeV1beta1(enabled bool) Option {
return func(o *options) error {
o.nodeV1beta1 = enabled
@ -223,37 +310,51 @@ func NodeUID(nodeUID types.UID) Option {
}
}
// Serialize overrides whether the helper serializes the prepare and unprepare
// calls. The default is to serialize.
//
// A DRA driver can opt out of that to speed up parallel processing, but then
// must handle concurrency itself.
func Serialize(enabled bool) Option {
return func(o *options) error {
o.serialize = enabled
return nil
}
}
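A sketch of opting out, assuming the driver makes its own Prepare/Unprepare paths safe for concurrent use:

```go
// startConcurrent disables the helper's serialization of gRPC calls;
// the driver must then handle concurrency itself.
func startConcurrent(ctx context.Context, driver kubeletplugin.DRAPlugin) (*kubeletplugin.Helper, error) {
	return kubeletplugin.Start(ctx, driver,
		kubeletplugin.DriverName("example.dra.k8s.io"), // hypothetical name
		kubeletplugin.Serialize(false),
	)
}
```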
type options struct {
logger klog.Logger
grpcVerbosity int
driverName string
nodeName string
nodeUID types.UID
pluginRegistrationEndpoint endpoint
pluginDataDirectoryPath string
draEndpointListen func(ctx context.Context, path string) (net.Listener, error)
unaryInterceptors []grpc.UnaryServerInterceptor
streamInterceptors []grpc.StreamServerInterceptor
kubeClient kubernetes.Interface
serialize bool
nodeV1beta1 bool
}
// Helper combines the kubelet registration service and the DRA node plugin
// service and implements them by calling a [DRAPlugin] implementation.
type Helper struct {
// backgroundCtx is for activities that are started later.
backgroundCtx context.Context
// cancel cancels the backgroundCtx.
cancel func(cause error)
wg sync.WaitGroup
registrar *nodeRegistrar
pluginServer *grpcServer
plugin DRAPlugin
driverName string
nodeName string
nodeUID types.UID
kubeClient kubernetes.Interface
serialize bool
grpcMutex sync.Mutex

// Information about resource publishing changes concurrently and thus
// must be protected by the mutex. The controller gets started only
@ -263,27 +364,24 @@ type draPlugin struct {
}
// Start sets up two gRPC servers (one for registration, one for the DRA node
// client) and implements them by calling a [DRAPlugin] implementation.
//
// The context and/or [Helper.Stop] can be used to stop all background activity.
// Stop also blocks. A logger can be stored in the context to add values or
// a name to all log entries.
//
// If the plugin will be used to publish resources, [KubeClient] and [NodeName]
// options are mandatory. Otherwise only [DriverName] is mandatory.
func Start(ctx context.Context, plugin DRAPlugin, opts ...Option) (result *Helper, finalErr error) {
logger := klog.FromContext(ctx)
o := options{
logger: klog.Background(),
grpcVerbosity: 6, // Logs requests and responses, which can be large.
serialize: true,
nodeV1beta1: true,
pluginRegistrationEndpoint: endpoint{
dir: KubeletRegistryDir,
},
}
for _, option := range opts {
if err := option(&o); err != nil {
@ -294,22 +392,19 @@ func Start(ctx context.Context, nodeServers []interface{}, opts ...Option) (resu
if o.driverName == "" { if o.driverName == "" {
return nil, errors.New("driver name must be set") return nil, errors.New("driver name must be set")
} }
if o.draAddress == "" { if o.pluginRegistrationEndpoint.file == "" {
return nil, errors.New("DRA address must be set") o.pluginRegistrationEndpoint.file = o.driverName + "-reg.sock"
} }
var emptyEndpoint endpoint if o.pluginDataDirectoryPath == "" {
if o.draEndpoint == emptyEndpoint { o.pluginDataDirectoryPath = path.Join(KubeletPluginsDir, o.driverName)
return nil, errors.New("a Unix domain socket path and/or listener must be set for the kubelet plugin")
} }
if o.pluginRegistrationEndpoint == emptyEndpoint { d := &Helper{
return nil, errors.New("a Unix domain socket path and/or listener must be set for the registrar")
}
d := &draPlugin{
driverName: o.driverName, driverName: o.driverName,
nodeName: o.nodeName, nodeName: o.nodeName,
nodeUID: o.nodeUID, nodeUID: o.nodeUID,
kubeClient: o.kubeClient, kubeClient: o.kubeClient,
serialize: o.serialize,
plugin: plugin,
} }
// Stop calls cancel and therefore both cancellation
@ -337,39 +432,28 @@ func Start(ctx context.Context, nodeServers []interface{}, opts ...Option) (resu
// Run the node plugin gRPC server first to ensure that it is ready.
var supportedServices []string
draEndpoint := endpoint{
dir: o.pluginDataDirectoryPath,
file: "dra.sock", // "dra" is hard-coded.
listenFunc: o.draEndpointListen,
}
pluginServer, err := startGRPCServer(klog.LoggerWithName(logger, "dra"), o.grpcVerbosity, o.unaryInterceptors, o.streamInterceptors, draEndpoint, func(grpcServer *grpc.Server) {
if o.nodeV1beta1 {
logger.V(5).Info("registering v1beta1.DRAPlugin gRPC service")
drapb.RegisterDRAPluginServer(grpcServer, &nodePluginImplementation{Helper: d})
supportedServices = append(supportedServices, drapb.DRAPluginService)
}
})
if err != nil {
return nil, fmt.Errorf("start node client: %v", err)
}
d.pluginServer = pluginServer
if len(supportedServices) == 0 {
return nil, errors.New("no supported DRA gRPC API is implemented and enabled")
}
// Backwards compatibility hack: if only the alpha gRPC service is enabled,
// then we can support registration against a 1.31 kubelet by reporting "1.0.0"
// as version. That also works with 1.32 because 1.32 supports that legacy
// behavior and 1.31 works because it doesn't fail while parsing "v1alpha3.Node"
// as version.
if len(supportedServices) == 1 && supportedServices[0] == drapbv1alpha4.NodeService {
supportedServices = []string{"1.0.0"}
}
// Now make it available to kubelet.
registrar, err := startRegistrar(klog.LoggerWithName(logger, "registrar"), o.grpcVerbosity, o.unaryInterceptors, o.streamInterceptors, o.driverName, supportedServices, draEndpoint.path(), o.pluginRegistrationEndpoint)
if err != nil {
return nil, fmt.Errorf("start registrar: %v", err)
}
@ -383,7 +467,7 @@ func Start(ctx context.Context, nodeServers []interface{}, opts ...Option) (resu
<-ctx.Done()

// Time to stop.
d.pluginServer.stop()
d.registrar.stop()

// d.resourceSliceController is set concurrently.
@ -395,8 +479,8 @@ func Start(ctx context.Context, nodeServers []interface{}, opts ...Option) (resu
return d, nil
}

// Stop ensures that all spawned goroutines are stopped and frees resources.
func (d *Helper) Stop() {
if d == nil {
return
}
@ -405,9 +489,27 @@ func (d *draPlugin) Stop() {
d.wg.Wait()
}
// PublishResources may be called one or more times to publish
// resource information in ResourceSlice objects. If it never gets
// called, then the kubelet plugin does not manage any ResourceSlice
// objects.
//
// PublishResources does not block, so it might still take a while
// after it returns before all information is actually written
// to the API server.
//
// It is the responsibility of the caller to ensure that the pools and
// slices described in the driver resources parameters are valid
// according to the restrictions defined in the resource.k8s.io API.
//
// Invalid ResourceSlices will be rejected by the apiserver during
// publishing, which happens asynchronously and thus does not
// get returned as error here. The only error returned here is
// when publishing was not set up properly, for example missing
// [KubeClient] or [NodeName] options.
//
// The caller may modify the resources after this call returns.
func (d *Helper) PublishResources(_ context.Context, resources resourceslice.DriverResources) error {
if d.kubeClient == nil {
return errors.New("no KubeClient found to publish resources")
}
@ -433,6 +535,11 @@ func (d *draPlugin) PublishResources(_ context.Context, resources resourceslice.
// our background context, not the one passed into this
// function, and thus is connected to the lifecycle of the
// plugin.
//
// TODO: don't delete ResourceSlices, not even on a clean shutdown.
// We either support rolling updates and want to hand over seamlessly
// or don't and then perhaps restart the pod quickly enough that
// the kubelet hasn't deleted ResourceSlices yet.
controllerCtx := d.backgroundCtx
controllerLogger := klog.FromContext(controllerCtx)
controllerLogger = klog.LoggerWithName(controllerLogger, "ResourceSlice controller")
@ -456,12 +563,118 @@ func (d *draPlugin) PublishResources(_ context.Context, resources resourceslice.
return nil
}
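A sketch of a typical call, assuming a started Helper; the pool layout follows the resourceslice and resource.k8s.io v1beta1 APIs, with made-up device names:

```go
// publishExample publishes one pool with one slice and one device.
func publishExample(ctx context.Context, helper *kubeletplugin.Helper, nodeName string) error {
	resources := resourceslice.DriverResources{
		Pools: map[string]resourceslice.Pool{
			nodeName: {
				Slices: []resourceslice.Slice{{
					Devices: []resourceapi.Device{{
						Name:  "gpu-0", // hypothetical device
						Basic: &resourceapi.BasicDevice{},
					}},
				}},
			},
		},
	}
	// Publishing happens asynchronously; an error here only indicates
	// a setup problem such as a missing KubeClient or NodeName option.
	return helper.PublishResources(ctx, resources)
}
```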
// RegistrationStatus returns the result of registration, nil if none received yet.
func (d *Helper) RegistrationStatus() *registerapi.RegistrationStatus {
if d.registrar == nil {
return nil
}
// TODO: protect against concurrency issues.
return d.registrar.status
}

// serializeGRPCIfEnabled locks a mutex if serialization is enabled.
// Either way it returns a function that the caller must invoke
// via defer.
func (d *Helper) serializeGRPCIfEnabled() func() {
if !d.serialize {
return func() {}
}
d.grpcMutex.Lock()
return d.grpcMutex.Unlock
}
// nodePluginImplementation is a thin wrapper around the helper instance.
// It prevents polluting the public API with these implementation details.
type nodePluginImplementation struct {
*Helper
}
// NodePrepareResources implements [drapb.NodePrepareResources].
func (d *nodePluginImplementation) NodePrepareResources(ctx context.Context, req *drapb.NodePrepareResourcesRequest) (*drapb.NodePrepareResourcesResponse, error) {
// Do slow API calls before serializing.
claims, err := d.getResourceClaims(ctx, req.Claims)
if err != nil {
return nil, fmt.Errorf("get resource claims: %w", err)
}
defer d.serializeGRPCIfEnabled()()
result, err := d.plugin.PrepareResourceClaims(ctx, claims)
if err != nil {
return nil, fmt.Errorf("prepare resource claims: %w", err)
}
resp := &drapb.NodePrepareResourcesResponse{Claims: map[string]*drapb.NodePrepareResourceResponse{}}
for uid, claimResult := range result {
var devices []*drapb.Device
for _, result := range claimResult.Devices {
device := &drapb.Device{
RequestNames: stripSubrequestNames(result.Requests),
PoolName: result.PoolName,
DeviceName: result.DeviceName,
CDIDeviceIDs: result.CDIDeviceIDs,
}
devices = append(devices, device)
}
resp.Claims[string(uid)] = &drapb.NodePrepareResourceResponse{
Error: errorString(claimResult.Err),
Devices: devices,
}
}
return resp, nil
}
func errorString(err error) string {
if err == nil {
return ""
}
return err.Error()
}
func stripSubrequestNames(names []string) []string {
stripped := make([]string, len(names))
for i, name := range names {
stripped[i] = resourceclaim.BaseRequestRef(name)
}
return stripped
}
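For illustration, assuming the "<request>/<subrequest>" reference convention used for prioritized alternatives:

```go
// Hypothetical inputs; BaseRequestRef strips a subrequest suffix.
fmt.Println(resourceclaim.BaseRequestRef("gpus/large")) // "gpus"
fmt.Println(resourceclaim.BaseRequestRef("gpus"))       // "gpus"
```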
func (d *nodePluginImplementation) getResourceClaims(ctx context.Context, claims []*drapb.Claim) ([]*resourceapi.ResourceClaim, error) {
var resourceClaims []*resourceapi.ResourceClaim
for _, claimReq := range claims {
claim, err := d.kubeClient.ResourceV1beta1().ResourceClaims(claimReq.Namespace).Get(ctx, claimReq.Name, metav1.GetOptions{})
if err != nil {
return resourceClaims, fmt.Errorf("retrieve claim %s/%s: %w", claimReq.Namespace, claimReq.Name, err)
}
if claim.Status.Allocation == nil {
return resourceClaims, fmt.Errorf("claim %s/%s not allocated", claimReq.Namespace, claimReq.Name)
}
if claim.UID != types.UID(claimReq.UID) {
return resourceClaims, fmt.Errorf("claim %s/%s got replaced", claimReq.Namespace, claimReq.Name)
}
resourceClaims = append(resourceClaims, claim)
}
return resourceClaims, nil
}
// NodeUnprepareResources implements [drapb.NodeUnprepareResources].
func (d *nodePluginImplementation) NodeUnprepareResources(ctx context.Context, req *drapb.NodeUnprepareResourcesRequest) (*drapb.NodeUnprepareResourcesResponse, error) {
defer d.serializeGRPCIfEnabled()()
claims := make([]NamespacedObject, 0, len(req.Claims))
for _, claim := range req.Claims {
claims = append(claims, NamespacedObject{UID: types.UID(claim.UID), NamespacedName: types.NamespacedName{Name: claim.Name, Namespace: claim.Namespace}})
}
result, err := d.plugin.UnprepareResourceClaims(ctx, claims)
if err != nil {
return nil, fmt.Errorf("unprepare resource claims: %w", err)
}
resp := &drapb.NodeUnprepareResourcesResponse{Claims: map[string]*drapb.NodeUnprepareResourceResponse{}}
for uid, err := range result {
resp.Claims[string(uid)] = &drapb.NodeUnprepareResourceResponse{
Error: errorString(err),
}
}
return resp, nil
}

View File

@ -0,0 +1,83 @@
/*
Copyright 2025 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package kubeletplugin
import (
"context"
"errors"
"fmt"
"net"
"os"
"path"
)
// endpoint defines where and how to listen for incoming connections.
// The listener always gets closed when shutting down.
//
// If the listen function is not set, a new listener for a Unix domain socket gets
// created at the path.
type endpoint struct {
dir, file string
listenFunc func(ctx context.Context, socketpath string) (net.Listener, error)
}
func (e endpoint) path() string {
return path.Join(e.dir, e.file)
}
func (e endpoint) listen(ctx context.Context) (net.Listener, error) {
socketpath := e.path()
if e.listenFunc != nil {
return e.listenFunc(ctx, socketpath)
}
// Remove stale sockets, listen would fail otherwise.
if err := e.removeSocket(); err != nil {
return nil, err
}
cfg := net.ListenConfig{}
listener, err := cfg.Listen(ctx, "unix", socketpath)
if err != nil {
if removeErr := e.removeSocket(); removeErr != nil {
err = errors.Join(err, removeErr)
}
return nil, err
}
return &unixListener{Listener: listener, endpoint: e}, nil
}
func (e endpoint) removeSocket() error {
if err := os.Remove(e.path()); err != nil && !os.IsNotExist(err) {
return fmt.Errorf("remove Unix domain socket: %w", err)
}
return nil
}
// unixListener adds removing of the Unix domain socket on Close.
type unixListener struct {
net.Listener
endpoint endpoint
}
func (l *unixListener) Close() error {
err := l.Listener.Close()
if removeErr := l.endpoint.removeSocket(); removeErr != nil {
err = errors.Join(err, removeErr)
}
return err
}

View File

@ -0,0 +1,55 @@
/*
Copyright 2025 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package kubeletplugin
import (
"context"
"net"
"path"
"testing"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"k8s.io/klog/v2/ktesting"
)
func TestEndpointLifecycle(t *testing.T) {
_, ctx := ktesting.NewTestContext(t)
tempDir := t.TempDir()
socketname := "test.sock"
e := endpoint{dir: tempDir, file: socketname}
listener, err := e.listen(ctx)
require.NoError(t, err, "listen")
assert.FileExists(t, path.Join(tempDir, socketname))
require.NoError(t, listener.Close(), "close")
assert.NoFileExists(t, path.Join(tempDir, socketname))
}
func TestEndpointListener(t *testing.T) {
_, ctx := ktesting.NewTestContext(t)
tempDir := t.TempDir()
socketname := "test.sock"
listen := func(ctx2 context.Context, socketpath string) (net.Listener, error) {
assert.Equal(t, path.Join(tempDir, socketname), socketpath)
return nil, nil
}
e := endpoint{dir: tempDir, file: socketname, listenFunc: listen}
listener, err := e.listen(ctx)
require.NoError(t, err, "listen")
assert.NoFileExists(t, path.Join(tempDir, socketname))
assert.Nil(t, listener)
}

View File

@ -0,0 +1,50 @@
/*
Copyright 2025 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package kubeletplugin
import (
"k8s.io/apimachinery/pkg/types"
)
// NamespacedObject comprises a resource name with a mandatory namespace
// and optional UID. It gets rendered as "<namespace>/<name>:[<uid>]"
// (text output) or as an object (JSON output).
type NamespacedObject struct {
types.NamespacedName
UID types.UID
}
// String returns the general purpose string representation
func (n NamespacedObject) String() string {
if n.UID != "" {
return n.Namespace + string(types.Separator) + n.Name + ":" + string(n.UID)
}
return n.Namespace + string(types.Separator) + n.Name
}
// MarshalLog emits a struct containing required key/value pair
func (n NamespacedObject) MarshalLog() interface{} {
return struct {
Name string `json:"name"`
Namespace string `json:"namespace,omitempty"`
UID types.UID `json:"uid,omitempty"`
}{
Name: n.Name,
Namespace: n.Namespace,
UID: n.UID,
}
}
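A sketch of the text rendering (the UID is made up):

```go
obj := kubeletplugin.NamespacedObject{
	NamespacedName: types.NamespacedName{Namespace: "default", Name: "my-claim"},
	UID:            "4722d3f6-ce3c-4c8e-9c58-0c62f32661b1",
}
fmt.Println(obj.String()) // default/my-claim:4722d3f6-ce3c-4c8e-9c58-0c62f32661b1
```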

View File

@ -17,10 +17,10 @@ limitations under the License.
package kubeletplugin

import (
"fmt"

"google.golang.org/grpc"

"k8s.io/klog/v2"

registerapi "k8s.io/kubelet/pkg/apis/pluginregistration/v1"
)
@ -30,17 +30,15 @@ type nodeRegistrar struct {
}

// startRegistrar returns a running instance.
func startRegistrar(logger klog.Logger, grpcVerbosity int, interceptors []grpc.UnaryServerInterceptor, streamInterceptors []grpc.StreamServerInterceptor, driverName string, supportedServices []string, socketpath string, pluginRegistrationEndpoint endpoint) (*nodeRegistrar, error) {
n := &nodeRegistrar{
registrationServer: registrationServer{
driverName: driverName,
endpoint: socketpath,
supportedVersions: supportedServices, // DRA uses this field to describe provided services (e.g. "v1beta1.DRAPlugin").
},
}
s, err := startGRPCServer(logger, grpcVerbosity, interceptors, streamInterceptors, pluginRegistrationEndpoint, func(grpcServer *grpc.Server) {
registerapi.RegisterRegistrationServer(grpcServer, n)
})
if err != nil {

View File

@ -19,15 +19,11 @@ package kubeletplugin
import (
"context"
"fmt"
"sync"
"sync/atomic"

"google.golang.org/grpc"
"k8s.io/klog/v2"
)
var requestID int64
@ -36,60 +32,41 @@ type grpcServer struct {
grpcVerbosity int
wg sync.WaitGroup
endpoint endpoint
socketpath string
server *grpc.Server
}

type registerService func(s *grpc.Server)
// endpoint defines where to listen for incoming connections.
// The listener always gets closed when shutting down.
//
// If the listener is not set, a new listener for a Unix domain socket gets
// created at the path.
//
// If the path is non-empty, then the socket will get removed when shutting
// down, regardless of who created the listener.
type endpoint struct {
path string
listener net.Listener
}
// startGRPCServer sets up the GRPC server on a Unix domain socket and spawns a goroutine
// which handles requests for arbitrary services.
func startGRPCServer(logger klog.Logger, grpcVerbosity int, unaryInterceptors []grpc.UnaryServerInterceptor, streamInterceptors []grpc.StreamServerInterceptor, endpoint endpoint, services ...registerService) (*grpcServer, error) {
ctx := klog.NewContext(context.Background(), logger)
s := &grpcServer{
endpoint: endpoint,
grpcVerbosity: grpcVerbosity,
}
listener, err := endpoint.listen(ctx)
if err != nil {
return nil, fmt.Errorf("listen on %q: %w", s.socketpath, err)
}
// Run a gRPC server. It will close the listening socket when
// shutting down, so we don't need to do that.
// The per-context logger always gets initialized because
// there might be log output inside the method implementations.
var opts []grpc.ServerOption
finalUnaryInterceptors := []grpc.UnaryServerInterceptor{
unaryContextInterceptor(ctx),
s.interceptor,
}
finalUnaryInterceptors = append(finalUnaryInterceptors, unaryInterceptors...)
finalStreamInterceptors := []grpc.StreamServerInterceptor{
streamContextInterceptor(ctx),
s.streamInterceptor,
}
finalStreamInterceptors = append(finalStreamInterceptors, streamInterceptors...)
opts = append(opts, grpc.ChainUnaryInterceptor(finalUnaryInterceptors...))
opts = append(opts, grpc.ChainStreamInterceptor(finalStreamInterceptors...))
@ -164,7 +141,6 @@ func (m mergeCtx) Value(i interface{}) interface{} {
// sequentially increasing request ID and adds that logger to the context. It
// also logs request and response.
func (s *grpcServer) interceptor(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (resp interface{}, err error) {
requestID := atomic.AddInt64(&requestID, 1)
logger := klog.FromContext(ctx)
logger = klog.LoggerWithValues(logger, "requestID", requestID, "method", info.FullMethod)
@ -240,9 +216,4 @@ func (s *grpcServer) stop() {
}
s.wg.Wait()
s.server = nil
if s.endpoint.path != "" {
if err := os.Remove(s.endpoint.path); err != nil && !os.IsNotExist(err) {
utilruntime.HandleError(fmt.Errorf("remove Unix socket: %w", err))
}
}
}

View File

@ -22,11 +22,14 @@ import (
_ "embed" _ "embed"
"errors" "errors"
"fmt" "fmt"
"io"
"net" "net"
"net/url"
"path" "path"
"sort" "sort"
"strings" "strings"
"sync" "sync"
"sync/atomic"
"time" "time"
"github.com/google/go-cmp/cmp" "github.com/google/go-cmp/cmp"
@ -49,11 +52,13 @@ import (
"k8s.io/client-go/discovery/cached/memory" "k8s.io/client-go/discovery/cached/memory"
resourceapiinformer "k8s.io/client-go/informers/resource/v1beta1" resourceapiinformer "k8s.io/client-go/informers/resource/v1beta1"
"k8s.io/client-go/kubernetes" "k8s.io/client-go/kubernetes"
"k8s.io/client-go/kubernetes/scheme"
"k8s.io/client-go/rest" "k8s.io/client-go/rest"
"k8s.io/client-go/restmapper" "k8s.io/client-go/restmapper"
"k8s.io/client-go/tools/cache" "k8s.io/client-go/tools/cache"
"k8s.io/dynamic-resource-allocation/kubeletplugin" "k8s.io/dynamic-resource-allocation/kubeletplugin"
"k8s.io/klog/v2" "k8s.io/klog/v2"
"k8s.io/kubectl/pkg/cmd/exec"
"k8s.io/kubernetes/test/e2e/dra/test-driver/app" "k8s.io/kubernetes/test/e2e/dra/test-driver/app"
"k8s.io/kubernetes/test/e2e/framework" "k8s.io/kubernetes/test/e2e/framework"
e2enode "k8s.io/kubernetes/test/e2e/framework/node" e2enode "k8s.io/kubernetes/test/e2e/framework/node"
@ -326,8 +331,10 @@ func (d *Driver) SetUp(nodes *Nodes, resources Resources, devicesPerNode ...map[
instanceKey := "app.kubernetes.io/instance" instanceKey := "app.kubernetes.io/instance"
rsName := "" rsName := ""
draAddr := path.Join(framework.TestContext.KubeletRootDir, "plugins", d.Name+".sock")
numNodes := int32(len(nodes.NodeNames)) numNodes := int32(len(nodes.NodeNames))
pluginDataDirectoryPath := path.Join(framework.TestContext.KubeletRootDir, "plugins", d.Name)
registrarDirectoryPath := path.Join(framework.TestContext.KubeletRootDir, "plugins_registry")
registrarSocketFilename := d.Name + "-reg.sock"
err := utils.CreateFromManifests(ctx, d.f, d.f.Namespace, func(item interface{}) error {
switch item := item.(type) {
case *appsv1.ReplicaSet:
@ -353,10 +360,10 @@ func (d *Driver) SetUp(nodes *Nodes, resources Resources, devicesPerNode ...map[
},
},
}
item.Spec.Template.Spec.Volumes[0].HostPath.Path = pluginDataDirectoryPath
item.Spec.Template.Spec.Volumes[1].HostPath.Path = registrarDirectoryPath
item.Spec.Template.Spec.Containers[0].VolumeMounts[0].MountPath = pluginDataDirectoryPath
item.Spec.Template.Spec.Containers[0].VolumeMounts[1].MountPath = registrarDirectoryPath
}
return nil
}, manifests...)
@ -419,6 +426,9 @@ func (d *Driver) SetUp(nodes *Nodes, resources Resources, devicesPerNode ...map[
} else {
fileOps.NumDevices = numDevices
}
// All listeners running in this pod use a new unique local port number
// by atomically incrementing this variable.
listenerPort := int32(9000)
plugin, err := app.StartPlugin(loggerCtx, "/cdi", d.Name, driverClient, nodename, fileOps,
kubeletplugin.GRPCVerbosity(0),
kubeletplugin.GRPCInterceptor(func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (resp interface{}, err error) {
@ -427,11 +437,13 @@ func (d *Driver) SetUp(nodes *Nodes, resources Resources, devicesPerNode ...map[
kubeletplugin.GRPCStreamInterceptor(func(srv interface{}, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) (err error) {
return d.streamInterceptor(nodename, srv, ss, info, handler)
}),
kubeletplugin.PluginDataDirectoryPath(pluginDataDirectoryPath),
kubeletplugin.PluginListener(listen(d.f, &pod, &listenerPort)),
kubeletplugin.RegistrarDirectoryPath(registrarDirectoryPath),
kubeletplugin.RegistrarSocketFilename(registrarSocketFilename),
kubeletplugin.RegistrarListener(listen(d.f, &pod, &listenerPort)),
)
framework.ExpectNoError(err, "start kubelet plugin for node %s", pod.Spec.NodeName)
d.cleanup = append(d.cleanup, func() {
@ -548,21 +560,161 @@ func (d *Driver) podIO(pod *v1.Pod) proxy.PodDirIO {
F: d.f,
Namespace: pod.Namespace,
PodName: pod.Name,
ContainerName: pod.Spec.Containers[0].Name,
Logger: &logger,
}
}

// errListenerDone is the special error that we use to shut down.
// It doesn't need to be logged.
var errListenerDone = errors.New("listener is shutting down")
// listen returns the function which the kubeletplugin helper needs to open a listening socket.
// For that it spins up hostpathplugin in the pod for the desired node
// and connects to hostpathplugin via port forwarding.
func listen(f *framework.Framework, pod *v1.Pod, port *int32) func(ctx context.Context, path string) (net.Listener, error) {
return func(ctx context.Context, path string) (l net.Listener, e error) {
// "Allocate" a new port by by bumping the per-pod counter by one.
port := atomic.AddInt32(port, 1)
logger := klog.FromContext(ctx)
logger = klog.LoggerWithName(logger, "socket-listener")
logger = klog.LoggerWithValues(logger, "endpoint", path, "port", port)
ctx = klog.NewContext(ctx, logger)
// Start hostpathplugin in proxy mode and keep it running until the listener gets closed.
req := f.ClientSet.CoreV1().RESTClient().Post().
Resource("pods").
Namespace(f.Namespace.Name).
Name(pod.Name).
SubResource("exec").
VersionedParams(&v1.PodExecOptions{
Container: pod.Spec.Containers[0].Name,
Command: []string{
"/hostpathplugin",
"--v=5",
"--endpoint=" + path,
fmt.Sprintf("--proxy-endpoint=tcp://:%d", port),
},
Stdout: true,
Stderr: true,
}, scheme.ParameterCodec)
var wg sync.WaitGroup
wg.Add(1)
cmdCtx, cmdCancel := context.WithCancelCause(ctx)
go func() {
defer wg.Done()
cmdLogger := klog.LoggerWithName(logger, "hostpathplugin")
cmdCtx := klog.NewContext(cmdCtx, cmdLogger)
logger.V(1).Info("Starting...")
defer logger.V(1).Info("Stopped")
if err := execute(cmdCtx, req.URL(), f.ClientConfig(), 5); err != nil {
// errors.Is(err, errListenerDone) would be nicer, but we don't get
// that error from remotecommand. Instead forgo logging when we already shut down.
if cmdCtx.Err() == nil {
logger.Error(err, "execution failed")
}
}
// Killing hostpathplugin does not remove the socket. Need to do that manually.
req := f.ClientSet.CoreV1().RESTClient().Post().
Resource("pods").
Namespace(f.Namespace.Name).
Name(pod.Name).
SubResource("exec").
VersionedParams(&v1.PodExecOptions{
Container: pod.Spec.Containers[0].Name,
Command: []string{
"rm",
"-f",
path,
},
Stdout: true,
Stderr: true,
}, scheme.ParameterCodec)
cleanupLogger := klog.LoggerWithName(logger, "cleanup")
cleanupCtx := klog.NewContext(ctx, cleanupLogger)
if err := execute(cleanupCtx, req.URL(), f.ClientConfig(), 0); err != nil {
cleanupLogger.Error(err, "Socket removal failed")
}
}()
defer func() {
// If we don't return a functional listener, then clean up.
if e != nil {
cmdCancel(e)
}
}()
stopHostpathplugin := func() {
cmdCancel(errListenerDone)
wg.Wait()
}
addr := proxy.Addr{
Namespace: f.Namespace.Name,
PodName: pod.Name,
ContainerName: pod.Spec.Containers[0].Name,
Port: int(port),
}
listener, err := proxy.Listen(ctx, f.ClientSet, f.ClientConfig(), addr)
if err != nil {
return nil, fmt.Errorf("listen for connections from %+v: %w", addr, err)
}
return &listenerWithClose{Listener: listener, close: stopHostpathplugin}, nil
}
}
// listenerWithClose wraps Close so that it also shuts down hostpathplugin.
type listenerWithClose struct {
net.Listener
close func()
}
func (l *listenerWithClose) Close() error {
// First close connections, then shut down the remote command.
// Otherwise the connection code is unhappy and logs errors.
err := l.Listener.Close()
l.close()
return err
}
// execute runs a remote command with stdout/stderr redirected to log messages at the chosen verbosity level.
func execute(ctx context.Context, url *url.URL, config *rest.Config, verbosity int) error {
// Stream output as long as we run, i.e. ignore cancellation.
stdout := pipe(context.WithoutCancel(ctx), "STDOUT", verbosity)
stderr := pipe(context.WithoutCancel(ctx), "STDERR", verbosity)
defer func() { _ = stdout.Close() }()
defer func() { _ = stderr.Close() }()
executor := exec.DefaultRemoteExecutor{}
return executor.ExecuteWithContext(ctx, url, config, nil, stdout, stderr, false, nil)
}
// pipe creates an in-memory pipe and starts logging whatever is sent through that pipe in the background.
func pipe(ctx context.Context, msg string, verbosity int) *io.PipeWriter {
logger := klog.FromContext(ctx)
reader, writer := io.Pipe()
go func() {
buffer := make([]byte, 10*1024)
for {
n, err := reader.Read(buffer)
if n > 0 {
logger.V(verbosity).Info(msg, "msg", string(buffer[0:n]))
}
if err != nil {
if !errors.Is(err, io.EOF) {
logger.Error(err, msg)
}
reader.CloseWithError(err)
return
}
if ctx.Err() != nil {
reader.CloseWithError(context.Cause(ctx))
return
}
}
}()
return writer
} }
func (d *Driver) TearDown() {

View File

@ -1737,15 +1737,13 @@ var _ = framework.SIGDescribe("node")("DRA", feature.DynamicResourceAllocation,
})
})
multipleDrivers := func(nodeV1beta1 bool) {
nodes := NewNodes(f, 1, 4)
driver1 := NewDriver(f, nodes, perNode(2, nodes))
driver1.NodeV1beta1 = nodeV1beta1
b1 := newBuilder(f, driver1)

driver2 := NewDriver(f, nodes, perNode(2, nodes))
driver2.NodeV1beta1 = nodeV1beta1
driver2.NameSuffix = "-other"
b2 := newBuilder(f, driver2)
@ -1769,16 +1767,14 @@ var _ = framework.SIGDescribe("node")("DRA", feature.DynamicResourceAllocation,
b1.testPod(ctx, f, pod) b1.testPod(ctx, f, pod)
}) })
} }
multipleDriversContext := func(prefix string, nodeV1alpha4, nodeV1beta1 bool) { multipleDriversContext := func(prefix string, nodeV1beta1 bool) {
ginkgo.Context(prefix, func() { ginkgo.Context(prefix, func() {
multipleDrivers(nodeV1alpha4, nodeV1beta1) multipleDrivers(nodeV1beta1)
}) })
} }
ginkgo.Context("multiple drivers", func() { ginkgo.Context("multiple drivers", func() {
multipleDriversContext("using only drapbv1alpha4", true, false) multipleDriversContext("using only drapbv1beta1", true)
multipleDriversContext("using only drapbv1beta1", false, true)
multipleDriversContext("using both drav1alpha4 and drapbv1beta1", true, true)
}) })
ginkgo.It("runs pod after driver starts", func(ctx context.Context) { ginkgo.It("runs pod after driver starts", func(ctx context.Context) {

View File

@@ -60,7 +60,11 @@ RUNTIME_CONFIG="resource.k8s.io/v1alpha3" FEATURE_GATES=DynamicResourceAllocatio
 
 In another:
 ```
-sudo mkdir -p /var/run/cdi && sudo chmod a+rwx /var/run/cdi /var/lib/kubelet/plugins_registry
+sudo mkdir -p /var/run/cdi
+sudo mkdir -p /var/lib/kubelet/plugins/test-driver.cdi.k8s.io
+sudo mkdir -p /var/lib/kubelet/plugins_registry
+sudo chmod a+rx /var/lib/kubelet /var/lib/kubelet/plugins
+sudo chmod a+rwx /var/run/cdi /var/lib/kubelet/plugins_registry /var/lib/kubelet/plugins/test-driver.cdi.k8s.io
 KUBECONFIG=/var/run/kubernetes/admin.kubeconfig go run ./test/e2e/dra/test-driver -v=5 kubelet-plugin --node-name=127.0.0.1
 ```

View File

@@ -40,15 +40,13 @@ import (
 	"k8s.io/dynamic-resource-allocation/resourceclaim"
 	"k8s.io/dynamic-resource-allocation/resourceslice"
 	"k8s.io/klog/v2"
-	drapbv1alpha4 "k8s.io/kubelet/pkg/apis/dra/v1alpha4"
-	drapb "k8s.io/kubelet/pkg/apis/dra/v1beta1"
 )
 
 type ExamplePlugin struct {
 	stopCh     <-chan struct{}
 	logger     klog.Logger
 	kubeClient kubernetes.Interface
-	d          kubeletplugin.DRAPlugin
+	d          *kubeletplugin.Helper
 	fileOps    FileOperations
 
 	cdiDir string
@@ -56,8 +54,11 @@ type ExamplePlugin struct {
 	nodeName    string
 	deviceNames sets.Set[string]
 
+	// The mutex is needed because there are other goroutines checking the state.
+	// Serializing in the gRPC server alone is not enough because writing would
+	// race with reading.
 	mutex    sync.Mutex
-	prepared map[ClaimID][]Device // prepared claims -> result of nodePrepareResource
+	prepared map[ClaimID][]kubeletplugin.Device // prepared claims -> result of nodePrepareResource
 
 	gRPCCalls []GRPCCall
 
 	blockPrepareResourcesMutex sync.Mutex
@@ -89,7 +90,7 @@ type GRPCCall struct {
 // sufficient to make the ClaimID unique.
 type ClaimID struct {
 	Name string
-	UID  string
+	UID  types.UID
 }
 
 type Device struct {
@@ -99,10 +100,10 @@ type Device struct {
 	CDIDeviceID string
 }
 
-var _ drapb.DRAPluginServer = &ExamplePlugin{}
+var _ kubeletplugin.DRAPlugin = &ExamplePlugin{}
 
 // getJSONFilePath returns the absolute path where CDI file is/should be.
-func (ex *ExamplePlugin) getJSONFilePath(claimUID string, requestName string) string {
+func (ex *ExamplePlugin) getJSONFilePath(claimUID types.UID, requestName string) string {
 	baseRequestRef := resourceclaim.BaseRequestRef(requestName)
 	return filepath.Join(ex.cdiDir, fmt.Sprintf("%s-%s-%s.json", ex.driverName, claimUID, baseRequestRef))
 }
@@ -151,7 +152,7 @@ func StartPlugin(ctx context.Context, cdiDir, driverName string, kubeClient kube
 		cdiDir:      cdiDir,
 		driverName:  driverName,
 		nodeName:    nodeName,
-		prepared:    make(map[ClaimID][]Device),
+		prepared:    make(map[ClaimID][]kubeletplugin.Device),
 		deviceNames: sets.New[string](),
 	}
@@ -167,16 +168,9 @@ func StartPlugin(ctx context.Context, cdiDir, driverName string, kubeClient kube
 		kubeletplugin.KubeClient(kubeClient),
 		kubeletplugin.GRPCInterceptor(ex.recordGRPCCall),
 		kubeletplugin.GRPCStreamInterceptor(ex.recordGRPCStream),
+		kubeletplugin.Serialize(false), // The example plugin does its own locking.
 	)
-	// Both APIs get provided, the legacy one via wrapping. The options
-	// determine which one(s) really get served (by default, both).
-	// The options are a bit redundant now because a single instance cannot
-	// implement both, but that might be different in the future.
-	nodeServers := []any{
-		drapb.DRAPluginServer(ex), // Casting is done only for clarity here, it's not needed.
-		drapbv1alpha4.V1Beta1ServerWrapper{DRAPluginServer: ex},
-	}
-	d, err := kubeletplugin.Start(ctx, nodeServers, opts...)
+	d, err := kubeletplugin.Start(ctx, ex, opts...)
 	if err != nil {
 		return nil, fmt.Errorf("start kubelet plugin: %w", err)
 	}
@@ -300,34 +294,21 @@ func (ex *ExamplePlugin) getUnprepareResourcesFailure() error {
 // a deterministic name to simplify NodeUnprepareResource (no need to remember
 // or discover the name) and idempotency (when called again, the file simply
 // gets written again).
-func (ex *ExamplePlugin) nodePrepareResource(ctx context.Context, claimReq *drapb.Claim) ([]Device, error) {
+func (ex *ExamplePlugin) nodePrepareResource(ctx context.Context, claim *resourceapi.ResourceClaim) ([]kubeletplugin.Device, error) {
 	logger := klog.FromContext(ctx)
 
-	// The plugin must retrieve the claim itself to get it in the version
-	// that it understands.
-	claim, err := ex.kubeClient.ResourceV1beta1().ResourceClaims(claimReq.Namespace).Get(ctx, claimReq.Name, metav1.GetOptions{})
-	if err != nil {
-		return nil, fmt.Errorf("retrieve claim %s/%s: %w", claimReq.Namespace, claimReq.Name, err)
-	}
-	if claim.Status.Allocation == nil {
-		return nil, fmt.Errorf("claim %s/%s not allocated", claimReq.Namespace, claimReq.Name)
-	}
-	if claim.UID != types.UID(claimReq.UID) {
-		return nil, fmt.Errorf("claim %s/%s got replaced", claimReq.Namespace, claimReq.Name)
-	}
-
 	ex.mutex.Lock()
 	defer ex.mutex.Unlock()
 	ex.blockPrepareResourcesMutex.Lock()
 	defer ex.blockPrepareResourcesMutex.Unlock()
 
-	claimID := ClaimID{Name: claimReq.Name, UID: claimReq.UID}
+	claimID := ClaimID{Name: claim.Name, UID: claim.UID}
 	if result, ok := ex.prepared[claimID]; ok {
 		// Idempotent call, nothing to do.
 		return result, nil
 	}
 
-	var devices []Device
+	var devices []kubeletplugin.Device
 	for _, result := range claim.Status.Allocation.Devices.Results {
 		// Only handle allocations for the current driver.
 		if ex.driverName != result.Driver {
@@ -356,7 +337,7 @@ func (ex *ExamplePlugin) nodePrepareResource(ctx context.Context, claimReq *drap
 		claimReqName = regexp.MustCompile(`[^a-zA-Z0-9]`).ReplaceAllString(claimReqName, "_")
 		env[claimReqName] = "true"
 
-		deviceName := "claim-" + claimReq.UID + "-" + baseRequestName
+		deviceName := "claim-" + string(claim.UID) + "-" + baseRequestName
 		vendor := ex.driverName
 		class := "test"
 		cdiDeviceID := vendor + "/" + class + "=" + deviceName
@@ -391,7 +372,7 @@ func (ex *ExamplePlugin) nodePrepareResource(ctx context.Context, claimReq *drap
 				},
 			},
 		}
-		filePath := ex.getJSONFilePath(claimReq.UID, baseRequestName)
+		filePath := ex.getJSONFilePath(claim.UID, baseRequestName)
 		buffer, err := json.Marshal(spec)
 		if err != nil {
 			return nil, fmt.Errorf("marshal spec: %w", err)
@@ -399,11 +380,11 @@ func (ex *ExamplePlugin) nodePrepareResource(ctx context.Context, claimReq *drap
 		if err := ex.fileOps.Create(filePath, buffer); err != nil {
 			return nil, fmt.Errorf("failed to write CDI file: %w", err)
 		}
-		device := Device{
+		device := kubeletplugin.Device{
 			PoolName:   result.Pool,
 			DeviceName: result.Device,
-			RequestName: baseRequestName,
-			CDIDeviceID: cdiDeviceID,
+			Requests:     []string{result.Request}, // May also return baseRequestName here.
+			CDIDeviceIDs: []string{cdiDeviceID},
 		}
 		devices = append(devices, device)
 	}
@@ -434,48 +415,35 @@ func extractParameters(parameters runtime.RawExtension, env *map[string]string,
 	return nil
 }
 
-func (ex *ExamplePlugin) NodePrepareResources(ctx context.Context, req *drapb.NodePrepareResourcesRequest) (*drapb.NodePrepareResourcesResponse, error) {
-	resp := &drapb.NodePrepareResourcesResponse{
-		Claims: make(map[string]*drapb.NodePrepareResourceResponse),
-	}
-
+func (ex *ExamplePlugin) PrepareResourceClaims(ctx context.Context, claims []*resourceapi.ResourceClaim) (map[types.UID]kubeletplugin.PrepareResult, error) {
 	if failure := ex.getPrepareResourcesFailure(); failure != nil {
-		return resp, failure
+		return nil, failure
 	}
 
-	for _, claimReq := range req.Claims {
-		devices, err := ex.nodePrepareResource(ctx, claimReq)
+	result := make(map[types.UID]kubeletplugin.PrepareResult)
+	for _, claim := range claims {
+		devices, err := ex.nodePrepareResource(ctx, claim)
+		var claimResult kubeletplugin.PrepareResult
 		if err != nil {
-			resp.Claims[claimReq.UID] = &drapb.NodePrepareResourceResponse{
-				Error: err.Error(),
-			}
+			claimResult.Err = err
 		} else {
-			r := &drapb.NodePrepareResourceResponse{}
-			for _, device := range devices {
-				pbDevice := &drapb.Device{
-					PoolName:     device.PoolName,
-					DeviceName:   device.DeviceName,
-					RequestNames: []string{device.RequestName},
-					CDIDeviceIDs: []string{device.CDIDeviceID},
-				}
-				r.Devices = append(r.Devices, pbDevice)
-			}
-			resp.Claims[claimReq.UID] = r
+			claimResult.Devices = devices
 		}
+		result[claim.UID] = claimResult
 	}
-	return resp, nil
+	return result, nil
 }
 // NodeUnprepareResource removes the CDI file created by
 // NodePrepareResource. It's idempotent, therefore it is not an error when that
 // file is already gone.
-func (ex *ExamplePlugin) nodeUnprepareResource(ctx context.Context, claimReq *drapb.Claim) error {
+func (ex *ExamplePlugin) nodeUnprepareResource(ctx context.Context, claimRef kubeletplugin.NamespacedObject) error {
 	ex.blockUnprepareResourcesMutex.Lock()
 	defer ex.blockUnprepareResourcesMutex.Unlock()
 
 	logger := klog.FromContext(ctx)
-	claimID := ClaimID{Name: claimReq.Name, UID: claimReq.UID}
+	claimID := ClaimID{Name: claimRef.Name, UID: claimRef.UID}
 	devices, ok := ex.prepared[claimID]
 	if !ok {
 		// Idempotent call, nothing to do.
@@ -483,38 +451,33 @@ func (ex *ExamplePlugin) nodeUnprepareResource(ctx context.Context, claimReq *dr
 	}
 
 	for _, device := range devices {
-		filePath := ex.getJSONFilePath(claimReq.UID, device.RequestName)
-		if err := ex.fileOps.Remove(filePath); err != nil {
-			return fmt.Errorf("error removing CDI file: %w", err)
-		}
-		logger.V(3).Info("CDI file removed", "path", filePath)
+		// In practice we only prepare one, but let's not assume that here.
+		for _, request := range device.Requests {
+			filePath := ex.getJSONFilePath(claimRef.UID, request)
+			if err := ex.fileOps.Remove(filePath); err != nil {
+				return fmt.Errorf("error removing CDI file: %w", err)
+			}
+			logger.V(3).Info("CDI file removed", "path", filePath)
+		}
 	}
 
 	delete(ex.prepared, claimID)
 	return nil
 }
 
-func (ex *ExamplePlugin) NodeUnprepareResources(ctx context.Context, req *drapb.NodeUnprepareResourcesRequest) (*drapb.NodeUnprepareResourcesResponse, error) {
-	resp := &drapb.NodeUnprepareResourcesResponse{
-		Claims: make(map[string]*drapb.NodeUnprepareResourceResponse),
-	}
-
+func (ex *ExamplePlugin) UnprepareResourceClaims(ctx context.Context, claims []kubeletplugin.NamespacedObject) (map[types.UID]error, error) {
+	result := make(map[types.UID]error)
 	if failure := ex.getUnprepareResourcesFailure(); failure != nil {
-		return resp, failure
+		return nil, failure
 	}
 
-	for _, claimReq := range req.Claims {
-		err := ex.nodeUnprepareResource(ctx, claimReq)
-		if err != nil {
-			resp.Claims[claimReq.UID] = &drapb.NodeUnprepareResourceResponse{
-				Error: err.Error(),
-			}
-		} else {
-			resp.Claims[claimReq.UID] = &drapb.NodeUnprepareResourceResponse{}
-		}
-	}
-	return resp, nil
+	for _, claimRef := range claims {
+		err := ex.nodeUnprepareResource(ctx, claimRef)
+		result[claimRef.UID] = err
+	}
+	return result, nil
 }
 
 func (ex *ExamplePlugin) GetPreparedResources() []ClaimID {
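
Taken together, these hunks show the shape of the new wrapper API: instead of registering version-specific gRPC servers, a driver implements the version-independent `kubeletplugin.DRAPlugin` interface and hands itself to `kubeletplugin.Start`. A minimal skeleton, hedged against the signatures visible above (`myPlugin` is hypothetical, and the two methods shown are assumed to be all the interface requires here):

```go
type myPlugin struct{}

// PrepareResourceClaims maps each allocated claim to the devices (and CDI
// device IDs) that the kubelet should pass to the container runtime.
func (p *myPlugin) PrepareResourceClaims(ctx context.Context, claims []*resourceapi.ResourceClaim) (map[types.UID]kubeletplugin.PrepareResult, error) {
	result := make(map[types.UID]kubeletplugin.PrepareResult)
	for _, claim := range claims {
		// Inspect claim.Status.Allocation here and fill in Devices or Err.
		result[claim.UID] = kubeletplugin.PrepareResult{}
	}
	return result, nil
}

// UnprepareResourceClaims undoes PrepareResourceClaims; a nil map entry
// reports success for that claim.
func (p *myPlugin) UnprepareResourceClaims(ctx context.Context, claims []kubeletplugin.NamespacedObject) (map[types.UID]error, error) {
	result := make(map[types.UID]error)
	for _, claimRef := range claims {
		result[claimRef.UID] = nil
	}
	return result, nil
}
```

Note that the helper now fetches the ResourceClaim objects itself and validates the allocation and UID, which is why the retrieval block was removed from `nodePrepareResource` above.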

View File

@@ -178,9 +178,8 @@ func NewCommand() *cobra.Command {
 	}
 	kubeletPluginFlagSets := cliflag.NamedFlagSets{}
 	fs = kubeletPluginFlagSets.FlagSet("kubelet")
-	pluginRegistrationPath := fs.String("plugin-registration-path", "/var/lib/kubelet/plugins_registry", "The directory where kubelet looks for plugin registration sockets, in the filesystem of the driver.")
-	endpoint := fs.String("endpoint", "/var/lib/kubelet/plugins/test-driver/dra.sock", "The Unix domain socket where the driver will listen for kubelet requests, in the filesystem of the driver.")
-	draAddress := fs.String("dra-address", "/var/lib/kubelet/plugins/test-driver/dra.sock", "The Unix domain socket that kubelet will connect to for dynamic resource allocation requests, in the filesystem of kubelet.")
+	kubeletRegistryDir := fs.String("plugin-registration-path", kubeletplugin.KubeletRegistryDir, "The directory where kubelet looks for plugin registration sockets.")
+	kubeletPluginsDir := fs.String("datadir", kubeletplugin.KubeletPluginsDir, "The per-driver directory where the DRA Unix domain socket will be created.")
 	fs = kubeletPluginFlagSets.FlagSet("CDI")
 	cdiDir := fs.String("cdi-dir", "/var/run/cdi", "directory for dynamically created CDI JSON files")
 	nodeName := fs.String("node-name", "", "name of the node that the kubelet plugin is responsible for")
@@ -196,7 +195,8 @@ func NewCommand() *cobra.Command {
 	if err := os.MkdirAll(*cdiDir, os.FileMode(0750)); err != nil {
 		return fmt.Errorf("create CDI directory: %w", err)
 	}
-	if err := os.MkdirAll(filepath.Dir(*endpoint), 0750); err != nil {
+	datadir := path.Join(*kubeletPluginsDir, *driverName)
+	if err := os.MkdirAll(filepath.Dir(datadir), 0750); err != nil {
 		return fmt.Errorf("create socket directory: %w", err)
 	}
@@ -205,9 +205,8 @@ func NewCommand() *cobra.Command {
 	}
 
 	plugin, err := StartPlugin(cmd.Context(), *cdiDir, *driverName, clientset, *nodeName, FileOperations{NumDevices: *numDevices},
-		kubeletplugin.PluginSocketPath(*endpoint),
-		kubeletplugin.RegistrarSocketPath(path.Join(*pluginRegistrationPath, *driverName+"-reg.sock")),
-		kubeletplugin.KubeletPluginSocketPath(*draAddress),
+		kubeletplugin.PluginDataDirectoryPath(datadir),
+		kubeletplugin.RegistrarDirectoryPath(*kubeletRegistryDir),
 	)
 	if err != nil {
 		return fmt.Errorf("start example plugin: %w", err)

View File

@@ -46,40 +46,35 @@ spec:
               topologyKey: kubernetes.io/hostname
       containers:
-      - name: registrar
+      - name: pause
         image: registry.k8s.io/sig-storage/hostpathplugin:v1.7.3
-        args:
-        - "--v=5"
-        - "--endpoint=/plugins_registry/dra-test-driver-reg.sock"
-        - "--proxy-endpoint=tcp://:9000"
-        volumeMounts:
-        - mountPath: /plugins_registry
-          name: registration-dir
-      - name: plugin
-        image: registry.k8s.io/sig-storage/hostpathplugin:v1.7.3
-        args:
-        - "--v=5"
-        - "--endpoint=/dra/dra-test-driver.sock"
-        - "--proxy-endpoint=tcp://:9001"
-        securityContext:
-          privileged: true
+        command:
+        - /bin/sh
+        - -c
+        - while true; do sleep 10000; done
         volumeMounts:
-        - mountPath: /dra
+        - mountPath: /var/lib/kubelet/plugins/test-driver.dra.k8s.io
           name: socket-dir
+        - mountPath: /var/lib/kubelet/plugins_registry
+          name: registration-dir
        - mountPath: /cdi
          name: cdi-dir
+        securityContext:
+          privileged: true
      volumes:
      - hostPath:
-          path: /var/lib/kubelet/plugins
+          # There's no way to remove this. Conceptually, it may have to
+          # survive Pod restarts and there's no way to tell the kubelet
+          # that this isn't the case for this Pod.
+          path: /var/lib/kubelet/plugins/test-driver.dra.k8s.io
          type: DirectoryOrCreate
        name: socket-dir
-      - hostPath:
-          path: /var/run/cdi
-          type: DirectoryOrCreate
-        name: cdi-dir
      - hostPath:
          path: /var/lib/kubelet/plugins_registry
          type: DirectoryOrCreate
        name: registration-dir
+      - hostPath:
+          path: /var/run/cdi
+          type: DirectoryOrCreate
+        name: cdi-dir

View File

@@ -29,7 +29,6 @@ import (
 	"fmt"
 	"os"
 	"path"
-	"path/filepath"
 	"regexp"
 	"sort"
 	"strings"
@@ -62,8 +61,6 @@ const (
 	kubeletPlugin1Name     = "test-driver1.cdi.k8s.io"
 	kubeletPlugin2Name     = "test-driver2.cdi.k8s.io"
 	cdiDir                 = "/var/run/cdi"
-	endpointTemplate       = "/var/lib/kubelet/plugins/%s/dra.sock"
-	pluginRegistrationPath = "/var/lib/kubelet/plugins_registry"
 
 	pluginRegistrationTimeout = time.Second * 60 // how long to wait for a node plugin to be registered
 	podInPendingStateTimeout  = time.Second * 60 // how long to wait for a pod to stay in pending state
@@ -554,9 +551,9 @@ func newKubeletPlugin(ctx context.Context, clientSet kubernetes.Interface, nodeN
 	// creating those directories.
 	err := os.MkdirAll(cdiDir, os.FileMode(0750))
 	framework.ExpectNoError(err, "create CDI directory")
-	endpoint := fmt.Sprintf(endpointTemplate, pluginName)
-	err = os.MkdirAll(filepath.Dir(endpoint), 0750)
-	framework.ExpectNoError(err, "create socket directory")
+	datadir := path.Join(kubeletplugin.KubeletPluginsDir, pluginName) // The default, not set below.
+	err = os.MkdirAll(datadir, 0750)
+	framework.ExpectNoError(err, "create DRA socket directory")
 
 	plugin, err := testdriver.StartPlugin(
 		ctx,
@@ -565,9 +562,6 @@ func newKubeletPlugin(ctx context.Context, clientSet kubernetes.Interface, nodeN
 		clientSet,
 		nodeName,
 		testdriver.FileOperations{},
-		kubeletplugin.PluginSocketPath(endpoint),
-		kubeletplugin.RegistrarSocketPath(path.Join(pluginRegistrationPath, pluginName+"-reg.sock")),
-		kubeletplugin.KubeletPluginSocketPath(endpoint),
 	)
 	framework.ExpectNoError(err)