diff --git a/staging/src/k8s.io/dynamic-resource-allocation/kubeletplugin/doc.go b/staging/src/k8s.io/dynamic-resource-allocation/kubeletplugin/doc.go
index cdc33100a3f..4541ad0143d 100644
--- a/staging/src/k8s.io/dynamic-resource-allocation/kubeletplugin/doc.go
+++ b/staging/src/k8s.io/dynamic-resource-allocation/kubeletplugin/doc.go
@@ -16,4 +16,52 @@ limitations under the License.
 // Package kubeletplugin provides helper functions for running a dynamic
 // resource allocation kubelet plugin.
+//
+// A DRA driver using this package can be deployed as a DaemonSet on suitable
+// nodes. Node labeling, for example through NFD
+// (https://github.com/kubernetes-sigs/node-feature-discovery), can be used
+// to run the driver only on nodes which have the necessary hardware.
+//
+// The service account of the DaemonSet must have sufficient RBAC permissions
+// to read ResourceClaims and, if the driver intends to publish per-node
+// ResourceSlices, to create and update ResourceSlices. It is good
+// security practice (but not required) to limit access to ResourceSlices
+// associated with the node a specific Pod is running on. This can be done
+// with a Validating Admission Policy (VAP). For more information,
+// see the deployment of the DRA example driver
+// (https://github.com/kubernetes-sigs/dra-example-driver/tree/main/deployments/helm/dra-example-driver/templates).
+//
+// Traditionally, the kubelet has not supported rolling updates of plugins.
+// Therefore the DaemonSet must not set `maxSurge` to a value larger than
+// zero. With the default `maxSurge: 0`, updating the DaemonSet of the driver
+// will first shut down the old driver Pod, then start the replacement.
+//
+// This leads to a short downtime for operations that need the driver:
+// - Pods cannot start unless the claims they depend on were already
+//   prepared for use.
+// - Cleanup after the last pod which used a claim gets delayed
+//   until the driver is available again. The pod is not marked
+//   as terminated. This prevents reusing the resources used by
+//   the pod for other pods.
+// - Running pods are *not* affected as far as Kubernetes is
+//   concerned. However, a DRA driver might provide required runtime
+//   services. Vendors need to document this.
+//
+// Note that the second point also means that draining a node should
+// first evict normal pods, then the driver DaemonSet Pod.
+//
+// Starting with Kubernetes 1.33, the kubelet supports rolling updates
+// such that the old and the new Pod run at the same time for a short while
+// and hand over work gracefully, with no downtime.
+// However, there is no mechanism for determining in advance whether
+// the kubelet on the node that the DaemonSet runs on supports that. Trying
+// to do a rolling update with a kubelet which does not support it yet
+// will fail because shutting down the old Pod unregisters the driver
+// even though the new Pod is running. See https://github.com/kubernetes/kubernetes/pull/129832
+// for details (TODO: link to doc after merging instead).
+//
+// A DRA driver can either require Kubernetes 1.33 as the minimal version or
+// provide two variants of its DaemonSet deployment. In the variant with
+// support for rolling updates, `maxSurge` can be set to a non-zero
+// value. Administrators have to be careful about running the right variant.
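To make the deployment notes above concrete, here is a hedged sketch of a minimal driver built on the helper; it is not part of this PR. The driver name, CDI device IDs, and the NODE_NAME environment variable (typically injected via the DaemonSet's downward API) are invented for illustration, while the kubeletplugin types, options, and defaults are the ones introduced or referenced in this diff.

```go
// Hedged sketch only: names marked "hypothetical" are assumptions.
package main

import (
	"context"
	"fmt"
	"os"
	"os/signal"
	"syscall"

	resourceapi "k8s.io/api/resource/v1beta1"
	"k8s.io/apimachinery/pkg/types"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/rest"
	"k8s.io/dynamic-resource-allocation/kubeletplugin"
)

const driverName = "gpu.example.com" // hypothetical

type exampleDriver struct{}

// PrepareResourceClaims receives already-retrieved, allocated claims and
// must return exactly one entry per claim UID.
func (d *exampleDriver) PrepareResourceClaims(ctx context.Context, claims []*resourceapi.ResourceClaim) (map[types.UID]kubeletplugin.PrepareResult, error) {
	results := make(map[types.UID]kubeletplugin.PrepareResult, len(claims))
	for _, claim := range claims {
		var devices []kubeletplugin.Device
		for _, alloc := range claim.Status.Allocation.Devices.Results {
			if alloc.Driver != driverName {
				continue // allocated by some other driver
			}
			devices = append(devices, kubeletplugin.Device{
				Requests:   []string{alloc.Request},
				PoolName:   alloc.Pool,
				DeviceName: alloc.Device,
				// CDI device IDs use the "<vendor>/<class>=<name>" form; using the
				// claim UID keeps the name unique across claims.
				CDIDeviceIDs: []string{"example.com/gpu=claim-" + string(claim.UID)},
			})
		}
		results[claim.UID] = kubeletplugin.PrepareResult{Devices: devices}
	}
	return results, nil
}

// UnprepareResourceClaims undoes the work above; a nil entry reports success.
func (d *exampleDriver) UnprepareResourceClaims(ctx context.Context, claims []kubeletplugin.NamespacedObject) (map[types.UID]error, error) {
	results := make(map[types.UID]error, len(claims))
	for _, claim := range claims {
		results[claim.UID] = nil // remove CDI spec files etc. here
	}
	return results, nil
}

func main() {
	ctx, stop := signal.NotifyContext(context.Background(), os.Interrupt, syscall.SIGTERM)
	defer stop()

	config, err := rest.InClusterConfig()
	if err != nil {
		fmt.Fprintln(os.Stderr, err)
		os.Exit(1)
	}

	helper, err := kubeletplugin.Start(ctx, &exampleDriver{},
		kubeletplugin.DriverName(driverName),
		kubeletplugin.NodeName(os.Getenv("NODE_NAME")),
		kubeletplugin.KubeClient(kubernetes.NewForConfigOrDie(config)),
		// Without further options the sockets end up in the kubelet defaults:
		// /var/lib/kubelet/plugins/<driver name>/dra.sock plus a registration
		// socket in /var/lib/kubelet/plugins_registry.
	)
	if err != nil {
		fmt.Fprintln(os.Stderr, err)
		os.Exit(1)
	}

	// helper.PublishResources could now be called to create per-node ResourceSlices.

	<-ctx.Done() // run until asked to stop
	helper.Stop()
}
```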
package kubeletplugin diff --git a/staging/src/k8s.io/dynamic-resource-allocation/kubeletplugin/draplugin.go b/staging/src/k8s.io/dynamic-resource-allocation/kubeletplugin/draplugin.go index 5966e59abba..57602cdaee0 100644 --- a/staging/src/k8s.io/dynamic-resource-allocation/kubeletplugin/draplugin.go +++ b/staging/src/k8s.io/dynamic-resource-allocation/kubeletplugin/draplugin.go @@ -21,56 +21,135 @@ import ( "errors" "fmt" "net" + "path" "sync" "google.golang.org/grpc" "k8s.io/klog/v2" + resourceapi "k8s.io/api/resource/v1beta1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" "k8s.io/client-go/kubernetes" + "k8s.io/dynamic-resource-allocation/resourceclaim" "k8s.io/dynamic-resource-allocation/resourceslice" - drapbv1alpha4 "k8s.io/kubelet/pkg/apis/dra/v1alpha4" - drapbv1beta1 "k8s.io/kubelet/pkg/apis/dra/v1beta1" + drapb "k8s.io/kubelet/pkg/apis/dra/v1beta1" registerapi "k8s.io/kubelet/pkg/apis/pluginregistration/v1" ) -// DRAPlugin gets returned by Start and defines the public API of the generic -// dynamic resource allocation plugin. +const ( + // KubeletPluginsDir is the default directory for [PluginDataDirectoryPath]. + KubeletPluginsDir = "/var/lib/kubelet/plugins" + // KubeletRegistryDir is the default for [RegistrarDirectoryPath] + KubeletRegistryDir = "/var/lib/kubelet/plugins_registry" +) + +// DRAPlugin is the interface that needs to be implemented by a DRA driver to +// use this helper package. The helper package then implements the gRPC +// interface expected by the kubelet by wrapping the DRAPlugin implementation. type DRAPlugin interface { - // Stop ensures that all spawned goroutines are stopped and frees - // resources. - Stop() + // PrepareResourceClaims is called to prepare all resources allocated + // for the given ResourceClaims. This is used to implement + // the gRPC NodePrepareResources call. + // + // It gets called with the complete list of claims that are needed + // by some pod. In contrast to the gRPC call, the helper has + // already retrieved the actual ResourceClaim objects. + // + // In addition to that, the helper also: + // - verifies that all claims are really allocated + // - increments a numeric counter for each call and + // adds its value to a per-context logger with "requestID" as key + // - adds the method name with "method" as key to that logger + // - logs the gRPC call and response (configurable with GRPCVerbosity) + // - serializes all gRPC calls unless the driver explicitly opted out of that + // + // This call must be idempotent because the kubelet might have to ask + // for preparation multiple times, for example if it gets restarted. + // + // A DRA driver should verify that all devices listed in a + // [resourceapi.DeviceRequestAllocationResult] are not already in use + // for some other ResourceClaim. Kubernetes tries very hard to ensure + // that, but if something went wrong, then the DRA driver is the last + // line of defense against using the same device for two different + // unrelated workloads. + // + // If an error is returned, the result is ignored. Otherwise the result + // must have exactly one entry for each claim, identified by the UID of + // the corresponding ResourceClaim. For each claim, preparation + // can be either successful (no error set in the per-ResourceClaim PrepareResult) + // or can be reported as failed. + // + // It is possible to create the CDI spec files which define the CDI devices + // on-the-fly in PrepareResourceClaims. UnprepareResourceClaims then can + // remove them. 
Container runtimes may cache CDI specs but must reload + // files in case of a cache miss. To avoid false cache hits, the unique + // name in the CDI device ID should not be reused. A DRA driver can use + // the claim UID for it. + PrepareResourceClaims(ctx context.Context, claims []*resourceapi.ResourceClaim) (result map[types.UID]PrepareResult, err error) - // RegistrationStatus returns the result of registration, nil if none - // received yet. - RegistrationStatus() *registerapi.RegistrationStatus + // UnprepareResourceClaims must undo whatever work PrepareResourceClaims did. + // + // At the time when this gets called, the original ResourceClaims may have + // been deleted already. They also don't get cached by the kubelet. Therefore + // parameters for each ResourcClaim are only the UID, namespace and name. + // It is the responsibility of the DRA driver to cache whatever additional + // information it might need about prepared resources. + // + // This call must be idempotent because the kubelet might have to ask + // for un-preparation multiple times, for example if it gets restarted. + // Therefore it is not an error if this gets called for a ResourceClaim + // which is not currently prepared. + // + // As with PrepareResourceClaims, the helper takes care of logging + // and serialization. + // + // The conventions for returning one overall error and several per-ResourceClaim + // errors are the same as in PrepareResourceClaims. + UnprepareResourceClaims(ctx context.Context, claims []NamespacedObject) (result map[types.UID]error, err error) +} - // PublishResources may be called one or more times to publish - // resource information in ResourceSlice objects. If it never gets - // called, then the kubelet plugin does not manage any ResourceSlice - // objects. - // - // PublishResources does not block, so it might still take a while - // after it returns before all information is actually written - // to the API server. - // - // It is the responsibility of the caller to ensure that the pools and - // slices described in the driver resources parameters are valid - // according to the restrictions defined in the resource.k8s.io API. - // - // Invalid ResourceSlices will be rejected by the apiserver during - // publishing, which happens asynchronously and thus does not - // get returned as error here. The only error returned here is - // when publishing was not set up properly, for example missing - // [KubeClient] or [NodeName] options. - // - // The caller may modify the resources after this call returns. - PublishResources(ctx context.Context, resources resourceslice.DriverResources) error +// PrepareResult contains the result of preparing one particular ResourceClaim. +type PrepareResult struct { + // Err, if non-nil, describes a problem that occurred while preparing + // the ResourceClaim. The devices are then ignored and the kubelet will + // try to prepare the ResourceClaim again later. + Err error - // This unexported method ensures that we can modify the interface - // without causing an API break of the package - // (https://pkg.go.dev/golang.org/x/exp/apidiff#section-readme). - internal() + // Devices contains the IDs of CDI devices associated with specific requests + // in a ResourceClaim. Those IDs will be passed on to the container runtime + // by the kubelet. + // + // The empty slice is also valid. + Devices []Device +} + +// Device provides the CDI device IDs for one request in a ResourceClaim. 
+type Device struct { + // Requests lists the names of requests or subrequests in the + // ResourceClaim that this device is associated with. The subrequest + // name may be included here, but it is also okay to just return + // the request name. + // + // A DRA driver can get this string from the Request field in + // [resourceapi.DeviceRequestAllocationResult], which includes the + // subrequest name if there is one. + // + // If empty, the device is associated with all requests. + Requests []string + + // PoolName identifies the DRA driver's pool which contains the device. + // Must not be empty. + PoolName string + + // DeviceName identifies the device inside that pool. + // Must not be empty. + DeviceName string + + // CDIDeviceIDs lists all CDI devices associated with this DRA device. + // Each ID must be of the form "/=". + // May be empty. + CDIDeviceIDs []string } // Option implements the functional options pattern for Start. @@ -94,63 +173,77 @@ func GRPCVerbosity(level int) Option { } } -// RegistrarSocketPath sets the file path for a Unix domain socket. -// If RegistrarListener is not used, then Start will remove -// a file at that path, should one exist, and creates a socket -// itself. Otherwise it uses the provided listener and only -// removes the socket at the specified path during shutdown. +// RegistrarDirectoryPath sets the path to the directory where the kubelet +// expects to find registration sockets of plugins. Typically this is +// /var/lib/kubelet/plugins_registry with /var/lib/kubelet being the kubelet's +// data directory. // -// At least one of these two options is required. -func RegistrarSocketPath(path string) Option { +// This is also the default. Some Kubernetes clusters may use a different data directory. +// This path must be the same inside and outside of the driver's container. +// The directory must exist. +func RegistrarDirectoryPath(path string) Option { return func(o *options) error { - o.pluginRegistrationEndpoint.path = path + o.pluginRegistrationEndpoint.dir = path return nil } } -// RegistrarListener sets an already created listener for the plugin -// registration API. Can be combined with RegistrarSocketPath. +// RegistrarSocketFilename sets the name of the socket inside the directory where +// the kubelet watches for registration sockets (see RegistrarDirectoryPath). // -// At least one of these two options is required. -func RegistrarListener(listener net.Listener) Option { +// Usually DRA drivers should not need this option. It is provided to +// support updates from an installation which used an older release of +// of the helper code. +// +// The default is -reg.sock. When rolling updates are enabled (not supported yet), +// it is --reg.sock. +func RegistrarSocketFilename(name string) Option { return func(o *options) error { - o.pluginRegistrationEndpoint.listener = listener + o.pluginRegistrationEndpoint.file = name return nil } } -// PluginSocketPath sets the file path for a Unix domain socket. -// If PluginListener is not used, then Start will remove -// a file at that path, should one exist, and creates a socket -// itself. Otherwise it uses the provided listener and only -// removes the socket at the specified path during shutdown. +// RegistrarListener configures how to create the registrar socket. +// The default is to remove the file if it exists and to then +// create a socket. // -// At least one of these two options is required. -func PluginSocketPath(path string) Option { +// This is used in Kubernetes for end-to-end testing. 
The default should +// be fine for DRA drivers. +func RegistrarListener(listen func(ctx context.Context, path string) (net.Listener, error)) Option { return func(o *options) error { - o.draEndpoint.path = path + o.pluginRegistrationEndpoint.listenFunc = listen return nil } } -// PluginListener sets an already created listener for the dynamic resource -// allocation plugin API. Can be combined with PluginSocketPath. +// PluginDataDirectoryPath sets the path where the DRA driver creates the +// "dra.sock" socket that the kubelet connects to for the DRA-specific gRPC calls. +// It is also used to coordinate between different Pods when using rolling +// updates. It must not be shared with other kubelet plugins. // -// At least one of these two options is required. -func PluginListener(listener net.Listener) Option { +// The default is /var/lib/kubelet/plugins/. This directory +// does not need to be inside the kubelet data directory, as long as +// the kubelet can access it. +// +// This path must be the same inside and outside of the driver's container. +// The directory must exist. +func PluginDataDirectoryPath(path string) Option { return func(o *options) error { - o.draEndpoint.listener = listener + o.pluginDataDirectoryPath = path return nil } } -// KubeletPluginSocketPath defines how kubelet will connect to the dynamic -// resource allocation plugin. This corresponds to PluginSocketPath, except -// that PluginSocketPath defines the path in the filesystem of the caller and -// KubeletPluginSocketPath in the filesystem of kubelet. -func KubeletPluginSocketPath(path string) Option { +// PluginListener configures how to create the registrar socket. +// The default is to remove the file if it exists and to then +// create a socket. +// +// This is used in Kubernetes for end-to-end testing. The default should +// be fine for DRA drivers. +func PluginListener(listen func(ctx context.Context, path string) (net.Listener, error)) Option { return func(o *options) error { - o.draAddress = path + o.draEndpointListen = listen return nil } } @@ -173,17 +266,11 @@ func GRPCStreamInterceptor(interceptor grpc.StreamServerInterceptor) Option { } } -// NodeV1alpha4 explicitly chooses whether the DRA gRPC API v1alpha4 -// gets enabled. -func NodeV1alpha4(enabled bool) Option { - return func(o *options) error { - o.nodeV1alpha4 = enabled - return nil - } -} - // NodeV1beta1 explicitly chooses whether the DRA gRPC API v1beta1 -// gets enabled. +// gets enabled. True by default. +// +// This is used in Kubernetes for end-to-end testing. The default should +// be fine for DRA drivers. func NodeV1beta1(enabled bool) Option { return func(o *options) error { o.nodeV1beta1 = enabled @@ -223,37 +310,51 @@ func NodeUID(nodeUID types.UID) Option { } } +// Serialize overrides whether the helper serializes the prepare and unprepare +// calls. The default is to serialize. +// +// A DRA driver can opt out of that to speed up parallel processing, but then +// must handle concurrency itself. 
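Since opting out shifts the locking burden onto the driver, a brief hedged sketch of that pattern may help; the type and field names are invented, imports (context, sync, and the packages from the sketch near the top of this diff) are omitted, and the test driver later in this diff uses the same approach with its own mutex. The Serialize option itself follows below.

```go
// parallelDriver passes kubeletplugin.Serialize(false) to Start and therefore
// has to protect its own state: PrepareResourceClaims and
// UnprepareResourceClaims may now be invoked concurrently.
type parallelDriver struct {
	mutex    sync.Mutex
	prepared map[types.UID][]kubeletplugin.Device // shared between concurrent calls
}

func (d *parallelDriver) PrepareResourceClaims(ctx context.Context, claims []*resourceapi.ResourceClaim) (map[types.UID]kubeletplugin.PrepareResult, error) {
	d.mutex.Lock()
	defer d.mutex.Unlock()
	results := make(map[types.UID]kubeletplugin.PrepareResult, len(claims))
	for _, claim := range claims {
		// ... actual preparation omitted; d.prepared caches results so that
		// repeated calls for the same claim stay idempotent ...
		results[claim.UID] = kubeletplugin.PrepareResult{Devices: d.prepared[claim.UID]}
	}
	return results, nil
}
```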
+func Serialize(enabled bool) Option { + return func(o *options) error { + o.serialize = enabled + return nil + } +} + type options struct { logger klog.Logger grpcVerbosity int driverName string nodeName string nodeUID types.UID - draEndpoint endpoint - draAddress string pluginRegistrationEndpoint endpoint + pluginDataDirectoryPath string + draEndpointListen func(ctx context.Context, path string) (net.Listener, error) unaryInterceptors []grpc.UnaryServerInterceptor streamInterceptors []grpc.StreamServerInterceptor kubeClient kubernetes.Interface - - nodeV1alpha4 bool - nodeV1beta1 bool + serialize bool + nodeV1beta1 bool } -// draPlugin combines the kubelet registration service and the DRA node plugin -// service. -type draPlugin struct { +// Helper combines the kubelet registration service and the DRA node plugin +// service and implements them by calling a [DRAPlugin] implementation. +type Helper struct { // backgroundCtx is for activities that are started later. backgroundCtx context.Context // cancel cancels the backgroundCtx. - cancel func(cause error) - wg sync.WaitGroup - registrar *nodeRegistrar - plugin *grpcServer - driverName string - nodeName string - nodeUID types.UID - kubeClient kubernetes.Interface + cancel func(cause error) + wg sync.WaitGroup + registrar *nodeRegistrar + pluginServer *grpcServer + plugin DRAPlugin + driverName string + nodeName string + nodeUID types.UID + kubeClient kubernetes.Interface + serialize bool + grpcMutex sync.Mutex // Information about resource publishing changes concurrently and thus // must be protected by the mutex. The controller gets started only @@ -263,27 +364,24 @@ type draPlugin struct { } // Start sets up two gRPC servers (one for registration, one for the DRA node -// client). By default, all APIs implemented by the nodeServer get registered. +// client) and implements them by calling a [DRAPlugin] implementation. // // The context and/or DRAPlugin.Stop can be used to stop all background activity. // Stop also blocks. A logger can be stored in the context to add values or // a name to all log entries. // // If the plugin will be used to publish resources, [KubeClient] and [NodeName] -// options are mandatory. -// -// The DRA driver decides which gRPC interfaces it implements. At least one -// implementation of [drapbv1alpha4.NodeServer] or [drapbv1beta1.DRAPluginServer] -// is required. Implementing drapbv1beta1.DRAPluginServer is recommended for -// DRA driver targeting Kubernetes >= 1.32. To be compatible with Kubernetes 1.31, -// DRA drivers must implement only [drapbv1alpha4.NodeServer]. -func Start(ctx context.Context, nodeServers []interface{}, opts ...Option) (result DRAPlugin, finalErr error) { +// options are mandatory. Otherwise only [DriverName] is mandatory. +func Start(ctx context.Context, plugin DRAPlugin, opts ...Option) (result *Helper, finalErr error) { logger := klog.FromContext(ctx) o := options{ logger: klog.Background(), grpcVerbosity: 6, // Logs requests and responses, which can be large. 
- nodeV1alpha4: true, + serialize: true, nodeV1beta1: true, + pluginRegistrationEndpoint: endpoint{ + dir: KubeletRegistryDir, + }, } for _, option := range opts { if err := option(&o); err != nil { @@ -294,22 +392,19 @@ func Start(ctx context.Context, nodeServers []interface{}, opts ...Option) (resu if o.driverName == "" { return nil, errors.New("driver name must be set") } - if o.draAddress == "" { - return nil, errors.New("DRA address must be set") + if o.pluginRegistrationEndpoint.file == "" { + o.pluginRegistrationEndpoint.file = o.driverName + "-reg.sock" } - var emptyEndpoint endpoint - if o.draEndpoint == emptyEndpoint { - return nil, errors.New("a Unix domain socket path and/or listener must be set for the kubelet plugin") + if o.pluginDataDirectoryPath == "" { + o.pluginDataDirectoryPath = path.Join(KubeletPluginsDir, o.driverName) } - if o.pluginRegistrationEndpoint == emptyEndpoint { - return nil, errors.New("a Unix domain socket path and/or listener must be set for the registrar") - } - - d := &draPlugin{ + d := &Helper{ driverName: o.driverName, nodeName: o.nodeName, nodeUID: o.nodeUID, kubeClient: o.kubeClient, + serialize: o.serialize, + plugin: plugin, } // Stop calls cancel and therefore both cancellation @@ -337,39 +432,28 @@ func Start(ctx context.Context, nodeServers []interface{}, opts ...Option) (resu // Run the node plugin gRPC server first to ensure that it is ready. var supportedServices []string - plugin, err := startGRPCServer(klog.NewContext(ctx, klog.LoggerWithName(logger, "dra")), o.grpcVerbosity, o.unaryInterceptors, o.streamInterceptors, o.draEndpoint, func(grpcServer *grpc.Server) { - for _, nodeServer := range nodeServers { - if nodeServer, ok := nodeServer.(drapbv1alpha4.NodeServer); ok && o.nodeV1alpha4 { - logger.V(5).Info("registering v1alpha4.Node gGRPC service") - drapbv1alpha4.RegisterNodeServer(grpcServer, nodeServer) - supportedServices = append(supportedServices, drapbv1alpha4.NodeService) - } - if nodeServer, ok := nodeServer.(drapbv1beta1.DRAPluginServer); ok && o.nodeV1beta1 { - logger.V(5).Info("registering v1beta1.DRAPlugin gRPC service") - drapbv1beta1.RegisterDRAPluginServer(grpcServer, nodeServer) - supportedServices = append(supportedServices, drapbv1beta1.DRAPluginService) - } + draEndpoint := endpoint{ + dir: o.pluginDataDirectoryPath, + file: "dra.sock", // "dra" is hard-coded. + listenFunc: o.draEndpointListen, + } + pluginServer, err := startGRPCServer(klog.LoggerWithName(logger, "dra"), o.grpcVerbosity, o.unaryInterceptors, o.streamInterceptors, draEndpoint, func(grpcServer *grpc.Server) { + if o.nodeV1beta1 { + logger.V(5).Info("registering v1beta1.DRAPlugin gRPC service") + drapb.RegisterDRAPluginServer(grpcServer, &nodePluginImplementation{Helper: d}) + supportedServices = append(supportedServices, drapb.DRAPluginService) } }) if err != nil { return nil, fmt.Errorf("start node client: %v", err) } - d.plugin = plugin + d.pluginServer = pluginServer if len(supportedServices) == 0 { return nil, errors.New("no supported DRA gRPC API is implemented and enabled") } - // Backwards compatibility hack: if only the alpha gRPC service is enabled, - // then we can support registration against a 1.31 kubelet by reporting "1.0.0" - // as version. That also works with 1.32 because 1.32 supports that legacy - // behavior and 1.31 works because it doesn't fail while parsing "v1alpha3.Node" - // as version. 
- if len(supportedServices) == 1 && supportedServices[0] == drapbv1alpha4.NodeService { - supportedServices = []string{"1.0.0"} - } - // Now make it available to kubelet. - registrar, err := startRegistrar(klog.NewContext(ctx, klog.LoggerWithName(logger, "registrar")), o.grpcVerbosity, o.unaryInterceptors, o.streamInterceptors, o.driverName, supportedServices, o.draAddress, o.pluginRegistrationEndpoint) + registrar, err := startRegistrar(klog.LoggerWithName(logger, "registrar"), o.grpcVerbosity, o.unaryInterceptors, o.streamInterceptors, o.driverName, supportedServices, draEndpoint.path(), o.pluginRegistrationEndpoint) if err != nil { return nil, fmt.Errorf("start registrar: %v", err) } @@ -383,7 +467,7 @@ func Start(ctx context.Context, nodeServers []interface{}, opts ...Option) (resu <-ctx.Done() // Time to stop. - d.plugin.stop() + d.pluginServer.stop() d.registrar.stop() // d.resourceSliceController is set concurrently. @@ -395,8 +479,8 @@ func Start(ctx context.Context, nodeServers []interface{}, opts ...Option) (resu return d, nil } -// Stop implements [DRAPlugin.Stop]. -func (d *draPlugin) Stop() { +// Stop ensures that all spawned goroutines are stopped and frees resources. +func (d *Helper) Stop() { if d == nil { return } @@ -405,9 +489,27 @@ func (d *draPlugin) Stop() { d.wg.Wait() } -// PublishResources implements [DRAPlugin.PublishResources]. Returns en error if -// kubeClient or nodeName are unset. -func (d *draPlugin) PublishResources(_ context.Context, resources resourceslice.DriverResources) error { +// PublishResources may be called one or more times to publish +// resource information in ResourceSlice objects. If it never gets +// called, then the kubelet plugin does not manage any ResourceSlice +// objects. +// +// PublishResources does not block, so it might still take a while +// after it returns before all information is actually written +// to the API server. +// +// It is the responsibility of the caller to ensure that the pools and +// slices described in the driver resources parameters are valid +// according to the restrictions defined in the resource.k8s.io API. +// +// Invalid ResourceSlices will be rejected by the apiserver during +// publishing, which happens asynchronously and thus does not +// get returned as error here. The only error returned here is +// when publishing was not set up properly, for example missing +// [KubeClient] or [NodeName] options. +// +// The caller may modify the resources after this call returns. +func (d *Helper) PublishResources(_ context.Context, resources resourceslice.DriverResources) error { if d.kubeClient == nil { return errors.New("no KubeClient found to publish resources") } @@ -433,6 +535,11 @@ func (d *draPlugin) PublishResources(_ context.Context, resources resourceslice. // our background context, not the one passed into this // function, and thus is connected to the lifecycle of the // plugin. + // + // TODO: don't delete ResourceSlices, not even on a clean shutdown. + // We either support rolling updates and want to hand over seamlessly + // or don't and then perhaps restart the pod quickly enough that + // the kubelet hasn't deleted ResourceSlices yet. controllerCtx := d.backgroundCtx controllerLogger := klog.FromContext(controllerCtx) controllerLogger = klog.LoggerWithName(controllerLogger, "ResourceSlice controller") @@ -456,12 +563,118 @@ func (d *draPlugin) PublishResources(_ context.Context, resources resourceslice. return nil } -// RegistrationStatus implements [DRAPlugin.RegistrationStatus]. 
-func (d *draPlugin) RegistrationStatus() *registerapi.RegistrationStatus { +// RegistrationStatus returns the result of registration, nil if none received yet. +func (d *Helper) RegistrationStatus() *registerapi.RegistrationStatus { if d.registrar == nil { return nil } + // TODO: protect against concurrency issues. return d.registrar.status } -func (d *draPlugin) internal() {} +// serializeGRPCIfEnabled locks a mutex if serialization is enabled. +// Either way it returns a method that the caller must invoke +// via defer. +func (d *Helper) serializeGRPCIfEnabled() func() { + if !d.serialize { + return func() {} + } + d.grpcMutex.Lock() + return d.grpcMutex.Unlock +} + +// nodePluginImplementation is a thin wrapper around the helper instance. +// It prevents polluting the public API with these implementation details. +type nodePluginImplementation struct { + *Helper +} + +// NodePrepareResources implements [drapb.NodePrepareResources]. +func (d *nodePluginImplementation) NodePrepareResources(ctx context.Context, req *drapb.NodePrepareResourcesRequest) (*drapb.NodePrepareResourcesResponse, error) { + // Do slow API calls before serializing. + claims, err := d.getResourceClaims(ctx, req.Claims) + if err != nil { + return nil, fmt.Errorf("get resource claims: %w", err) + } + + defer d.serializeGRPCIfEnabled()() + + result, err := d.plugin.PrepareResourceClaims(ctx, claims) + if err != nil { + return nil, fmt.Errorf("prepare resource claims: %w", err) + } + + resp := &drapb.NodePrepareResourcesResponse{Claims: map[string]*drapb.NodePrepareResourceResponse{}} + for uid, claimResult := range result { + var devices []*drapb.Device + for _, result := range claimResult.Devices { + device := &drapb.Device{ + RequestNames: stripSubrequestNames(result.Requests), + PoolName: result.PoolName, + DeviceName: result.DeviceName, + CDIDeviceIDs: result.CDIDeviceIDs, + } + devices = append(devices, device) + } + resp.Claims[string(uid)] = &drapb.NodePrepareResourceResponse{ + Error: errorString(claimResult.Err), + Devices: devices, + } + } + return resp, nil +} + +func errorString(err error) string { + if err == nil { + return "" + } + return err.Error() +} + +func stripSubrequestNames(names []string) []string { + stripped := make([]string, len(names)) + for i, name := range names { + stripped[i] = resourceclaim.BaseRequestRef(name) + } + return stripped +} + +func (d *nodePluginImplementation) getResourceClaims(ctx context.Context, claims []*drapb.Claim) ([]*resourceapi.ResourceClaim, error) { + var resourceClaims []*resourceapi.ResourceClaim + for _, claimReq := range claims { + claim, err := d.kubeClient.ResourceV1beta1().ResourceClaims(claimReq.Namespace).Get(ctx, claimReq.Name, metav1.GetOptions{}) + if err != nil { + return resourceClaims, fmt.Errorf("retrieve claim %s/%s: %w", claimReq.Namespace, claimReq.Name, err) + } + if claim.Status.Allocation == nil { + return resourceClaims, fmt.Errorf("claim %s/%s not allocated", claimReq.Namespace, claimReq.Name) + } + if claim.UID != types.UID(claimReq.UID) { + return resourceClaims, fmt.Errorf("claim %s/%s got replaced", claimReq.Namespace, claimReq.Name) + } + resourceClaims = append(resourceClaims, claim) + } + return resourceClaims, nil +} + +// NodeUnprepareResources implements [draapi.NodeUnprepareResources]. 
+func (d *nodePluginImplementation) NodeUnprepareResources(ctx context.Context, req *drapb.NodeUnprepareResourcesRequest) (*drapb.NodeUnprepareResourcesResponse, error) { + defer d.serializeGRPCIfEnabled() + + claims := make([]NamespacedObject, 0, len(req.Claims)) + for _, claim := range req.Claims { + claims = append(claims, NamespacedObject{UID: types.UID(claim.UID), NamespacedName: types.NamespacedName{Name: claim.Name, Namespace: claim.Namespace}}) + } + result, err := d.plugin.UnprepareResourceClaims(ctx, claims) + if err != nil { + return nil, fmt.Errorf("unprepare resource claims: %w", err) + } + + resp := &drapb.NodeUnprepareResourcesResponse{Claims: map[string]*drapb.NodeUnprepareResourceResponse{}} + for uid, err := range result { + resp.Claims[string(uid)] = &drapb.NodeUnprepareResourceResponse{ + Error: errorString(err), + } + } + return resp, nil +} diff --git a/staging/src/k8s.io/dynamic-resource-allocation/kubeletplugin/endpoint.go b/staging/src/k8s.io/dynamic-resource-allocation/kubeletplugin/endpoint.go new file mode 100644 index 00000000000..45916999c39 --- /dev/null +++ b/staging/src/k8s.io/dynamic-resource-allocation/kubeletplugin/endpoint.go @@ -0,0 +1,83 @@ +/* +Copyright 2025 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package kubeletplugin + +import ( + "context" + "errors" + "fmt" + "net" + "os" + "path" +) + +// endpoint defines where and how to listen for incoming connections. +// The listener always gets closed when shutting down. +// +// If the listen function is not set, a new listener for a Unix domain socket gets +// created at the path. +type endpoint struct { + dir, file string + listenFunc func(ctx context.Context, socketpath string) (net.Listener, error) +} + +func (e endpoint) path() string { + return path.Join(e.dir, e.file) +} + +func (e endpoint) listen(ctx context.Context) (net.Listener, error) { + socketpath := e.path() + + if e.listenFunc != nil { + return e.listenFunc(ctx, socketpath) + } + + // Remove stale sockets, listen would fail otherwise. + if err := e.removeSocket(); err != nil { + return nil, err + } + cfg := net.ListenConfig{} + listener, err := cfg.Listen(ctx, "unix", socketpath) + if err != nil { + if removeErr := e.removeSocket(); removeErr != nil { + err = errors.Join(err, err) + } + return nil, err + } + return &unixListener{Listener: listener, endpoint: e}, nil +} + +func (e endpoint) removeSocket() error { + if err := os.Remove(e.path()); err != nil && !os.IsNotExist(err) { + return fmt.Errorf("remove Unix domain socket: %w", err) + } + return nil +} + +// unixListener adds removing of the Unix domain socket on Close. 
+type unixListener struct { + net.Listener + endpoint endpoint +} + +func (l *unixListener) Close() error { + err := l.Listener.Close() + if removeErr := l.endpoint.removeSocket(); removeErr != nil { + err = errors.Join(err, err) + } + return err +} diff --git a/staging/src/k8s.io/dynamic-resource-allocation/kubeletplugin/endpoint_test.go b/staging/src/k8s.io/dynamic-resource-allocation/kubeletplugin/endpoint_test.go new file mode 100644 index 00000000000..77944f38f80 --- /dev/null +++ b/staging/src/k8s.io/dynamic-resource-allocation/kubeletplugin/endpoint_test.go @@ -0,0 +1,55 @@ +/* +Copyright 2025 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package kubeletplugin + +import ( + "context" + "net" + "path" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "k8s.io/klog/v2/ktesting" +) + +func TestEndpointLifecycle(t *testing.T) { + _, ctx := ktesting.NewTestContext(t) + tempDir := t.TempDir() + socketname := "test.sock" + e := endpoint{dir: tempDir, file: socketname} + listener, err := e.listen(ctx) + require.NoError(t, err, "listen") + assert.FileExists(t, path.Join(tempDir, socketname)) + require.NoError(t, listener.Close(), "close") + assert.NoFileExists(t, path.Join(tempDir, socketname)) +} + +func TestEndpointListener(t *testing.T) { + _, ctx := ktesting.NewTestContext(t) + tempDir := t.TempDir() + socketname := "test.sock" + listen := func(ctx2 context.Context, socketpath string) (net.Listener, error) { + assert.Equal(t, path.Join(tempDir, socketname), socketpath) + return nil, nil + } + e := endpoint{dir: tempDir, file: socketname, listenFunc: listen} + listener, err := e.listen(ctx) + require.NoError(t, err, "listen") + assert.NoFileExists(t, path.Join(tempDir, socketname)) + assert.Nil(t, listener) +} diff --git a/staging/src/k8s.io/dynamic-resource-allocation/kubeletplugin/namespacedobject.go b/staging/src/k8s.io/dynamic-resource-allocation/kubeletplugin/namespacedobject.go new file mode 100644 index 00000000000..d8bf451794a --- /dev/null +++ b/staging/src/k8s.io/dynamic-resource-allocation/kubeletplugin/namespacedobject.go @@ -0,0 +1,50 @@ +/* +Copyright 2025 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package kubeletplugin + +import ( + "k8s.io/apimachinery/pkg/types" +) + +// NamespacedObject comprises a resource name with a mandatory namespace +// and optional UID. It gets rendered as "/:[]" +// (text output) or as an object (JSON output). 
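The rendering described in the comment above lost its placeholders in this copy; going by the String and MarshalLog implementations just below, a hedged example (namespace, name, and UID invented; imports omitted) looks like this:

```go
obj := kubeletplugin.NamespacedObject{
	NamespacedName: types.NamespacedName{Namespace: "default", Name: "gpu-claim"},
	UID:            "7f8a0e8b-0000-4000-8000-000000000000",
}
fmt.Println(obj.String())
// Text output: default/gpu-claim:7f8a0e8b-0000-4000-8000-000000000000
// Without a UID the suffix is omitted: default/gpu-claim
// Structured log output via MarshalLog becomes roughly:
//   {"name":"gpu-claim","namespace":"default","uid":"7f8a0e8b-0000-4000-8000-000000000000"}
```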
+type NamespacedObject struct { + types.NamespacedName + UID types.UID +} + +// String returns the general purpose string representation +func (n NamespacedObject) String() string { + if n.UID != "" { + return n.Namespace + string(types.Separator) + n.Name + ":" + string(n.UID) + } + return n.Namespace + string(types.Separator) + n.Name +} + +// MarshalLog emits a struct containing required key/value pair +func (n NamespacedObject) MarshalLog() interface{} { + return struct { + Name string `json:"name"` + Namespace string `json:"namespace,omitempty"` + UID types.UID `json:"uid,omitempty"` + }{ + Name: n.Name, + Namespace: n.Namespace, + UID: n.UID, + } +} diff --git a/staging/src/k8s.io/dynamic-resource-allocation/kubeletplugin/noderegistrar.go b/staging/src/k8s.io/dynamic-resource-allocation/kubeletplugin/noderegistrar.go index 6384ca0c180..595b0a0bd62 100644 --- a/staging/src/k8s.io/dynamic-resource-allocation/kubeletplugin/noderegistrar.go +++ b/staging/src/k8s.io/dynamic-resource-allocation/kubeletplugin/noderegistrar.go @@ -17,10 +17,10 @@ limitations under the License. package kubeletplugin import ( - "context" "fmt" "google.golang.org/grpc" + "k8s.io/klog/v2" registerapi "k8s.io/kubelet/pkg/apis/pluginregistration/v1" ) @@ -30,17 +30,15 @@ type nodeRegistrar struct { } // startRegistrar returns a running instance. -// -// The context is only used for additional values, cancellation is ignored. -func startRegistrar(valueCtx context.Context, grpcVerbosity int, interceptors []grpc.UnaryServerInterceptor, streamInterceptors []grpc.StreamServerInterceptor, driverName string, supportedServices []string, endpoint string, pluginRegistrationEndpoint endpoint) (*nodeRegistrar, error) { +func startRegistrar(logger klog.Logger, grpcVerbosity int, interceptors []grpc.UnaryServerInterceptor, streamInterceptors []grpc.StreamServerInterceptor, driverName string, supportedServices []string, socketpath string, pluginRegistrationEndpoint endpoint) (*nodeRegistrar, error) { n := &nodeRegistrar{ registrationServer: registrationServer{ driverName: driverName, - endpoint: endpoint, + endpoint: socketpath, supportedVersions: supportedServices, // DRA uses this field to describe provided services (e.g. "v1beta1.DRAPlugin"). }, } - s, err := startGRPCServer(valueCtx, grpcVerbosity, interceptors, streamInterceptors, pluginRegistrationEndpoint, func(grpcServer *grpc.Server) { + s, err := startGRPCServer(logger, grpcVerbosity, interceptors, streamInterceptors, pluginRegistrationEndpoint, func(grpcServer *grpc.Server) { registerapi.RegisterRegistrationServer(grpcServer, n) }) if err != nil { diff --git a/staging/src/k8s.io/dynamic-resource-allocation/kubeletplugin/nonblockinggrpcserver.go b/staging/src/k8s.io/dynamic-resource-allocation/kubeletplugin/nonblockinggrpcserver.go index c8b0a2594d7..a148b83d696 100644 --- a/staging/src/k8s.io/dynamic-resource-allocation/kubeletplugin/nonblockinggrpcserver.go +++ b/staging/src/k8s.io/dynamic-resource-allocation/kubeletplugin/nonblockinggrpcserver.go @@ -19,15 +19,11 @@ package kubeletplugin import ( "context" "fmt" - "net" - "os" "sync" "sync/atomic" "google.golang.org/grpc" "k8s.io/klog/v2" - - utilruntime "k8s.io/apimachinery/pkg/util/runtime" ) var requestID int64 @@ -36,60 +32,41 @@ type grpcServer struct { grpcVerbosity int wg sync.WaitGroup endpoint endpoint + socketpath string server *grpc.Server } type registerService func(s *grpc.Server) -// endpoint defines where to listen for incoming connections. -// The listener always gets closed when shutting down. 
-// -// If the listener is not set, a new listener for a Unix domain socket gets -// created at the path. -// -// If the path is non-empty, then the socket will get removed when shutting -// down, regardless of who created the listener. -type endpoint struct { - path string - listener net.Listener -} - // startGRPCServer sets up the GRPC server on a Unix domain socket and spawns a goroutine // which handles requests for arbitrary services. -// -// The context is only used for additional values, cancellation is ignored. -func startGRPCServer(valueCtx context.Context, grpcVerbosity int, unaryInterceptors []grpc.UnaryServerInterceptor, streamInterceptors []grpc.StreamServerInterceptor, endpoint endpoint, services ...registerService) (*grpcServer, error) { - logger := klog.FromContext(valueCtx) +func startGRPCServer(logger klog.Logger, grpcVerbosity int, unaryInterceptors []grpc.UnaryServerInterceptor, streamInterceptors []grpc.StreamServerInterceptor, endpoint endpoint, services ...registerService) (*grpcServer, error) { + ctx := klog.NewContext(context.Background(), logger) + s := &grpcServer{ endpoint: endpoint, grpcVerbosity: grpcVerbosity, } - listener := endpoint.listener - if listener == nil { - // Remove any (probably stale) existing socket. - if err := os.Remove(endpoint.path); err != nil && !os.IsNotExist(err) { - return nil, fmt.Errorf("remove Unix domain socket: %v", err) - } - - // Now we can use the endpoint for listening. - l, err := net.Listen("unix", endpoint.path) - if err != nil { - return nil, fmt.Errorf("listen on %q: %v", endpoint.path, err) - } - listener = l + listener, err := endpoint.listen(ctx) + if err != nil { + return nil, fmt.Errorf("listen on %q: %w", s.socketpath, err) } // Run a gRPC server. It will close the listening socket when // shutting down, so we don't need to do that. + // The per-context logger always gets initialized because + // there might be log output inside the method implementations. var opts []grpc.ServerOption - finalUnaryInterceptors := []grpc.UnaryServerInterceptor{unaryContextInterceptor(valueCtx)} - finalStreamInterceptors := []grpc.StreamServerInterceptor{streamContextInterceptor(valueCtx)} - if grpcVerbosity >= 0 { - finalUnaryInterceptors = append(finalUnaryInterceptors, s.interceptor) - finalStreamInterceptors = append(finalStreamInterceptors, s.streamInterceptor) + finalUnaryInterceptors := []grpc.UnaryServerInterceptor{ + unaryContextInterceptor(ctx), + s.interceptor, } finalUnaryInterceptors = append(finalUnaryInterceptors, unaryInterceptors...) + finalStreamInterceptors := []grpc.StreamServerInterceptor{ + streamContextInterceptor(ctx), + s.streamInterceptor, + } finalStreamInterceptors = append(finalStreamInterceptors, streamInterceptors...) opts = append(opts, grpc.ChainUnaryInterceptor(finalUnaryInterceptors...)) opts = append(opts, grpc.ChainStreamInterceptor(finalStreamInterceptors...)) @@ -164,7 +141,6 @@ func (m mergeCtx) Value(i interface{}) interface{} { // sequentially increasing request ID and adds that logger to the context. It // also logs request and response. 
func (s *grpcServer) interceptor(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (resp interface{}, err error) { - requestID := atomic.AddInt64(&requestID, 1) logger := klog.FromContext(ctx) logger = klog.LoggerWithValues(logger, "requestID", requestID, "method", info.FullMethod) @@ -240,9 +216,4 @@ func (s *grpcServer) stop() { } s.wg.Wait() s.server = nil - if s.endpoint.path != "" { - if err := os.Remove(s.endpoint.path); err != nil && !os.IsNotExist(err) { - utilruntime.HandleError(fmt.Errorf("remove Unix socket: %w", err)) - } - } } diff --git a/test/e2e/dra/deploy.go b/test/e2e/dra/deploy.go index ea2ef1f2675..37fc5226e0e 100644 --- a/test/e2e/dra/deploy.go +++ b/test/e2e/dra/deploy.go @@ -22,11 +22,14 @@ import ( _ "embed" "errors" "fmt" + "io" "net" + "net/url" "path" "sort" "strings" "sync" + "sync/atomic" "time" "github.com/google/go-cmp/cmp" @@ -49,11 +52,13 @@ import ( "k8s.io/client-go/discovery/cached/memory" resourceapiinformer "k8s.io/client-go/informers/resource/v1beta1" "k8s.io/client-go/kubernetes" + "k8s.io/client-go/kubernetes/scheme" "k8s.io/client-go/rest" "k8s.io/client-go/restmapper" "k8s.io/client-go/tools/cache" "k8s.io/dynamic-resource-allocation/kubeletplugin" "k8s.io/klog/v2" + "k8s.io/kubectl/pkg/cmd/exec" "k8s.io/kubernetes/test/e2e/dra/test-driver/app" "k8s.io/kubernetes/test/e2e/framework" e2enode "k8s.io/kubernetes/test/e2e/framework/node" @@ -326,8 +331,10 @@ func (d *Driver) SetUp(nodes *Nodes, resources Resources, devicesPerNode ...map[ instanceKey := "app.kubernetes.io/instance" rsName := "" - draAddr := path.Join(framework.TestContext.KubeletRootDir, "plugins", d.Name+".sock") numNodes := int32(len(nodes.NodeNames)) + pluginDataDirectoryPath := path.Join(framework.TestContext.KubeletRootDir, "plugins", d.Name) + registrarDirectoryPath := path.Join(framework.TestContext.KubeletRootDir, "plugins_registry") + registrarSocketFilename := d.Name + "-reg.sock" err := utils.CreateFromManifests(ctx, d.f, d.f.Namespace, func(item interface{}) error { switch item := item.(type) { case *appsv1.ReplicaSet: @@ -353,10 +360,10 @@ func (d *Driver) SetUp(nodes *Nodes, resources Resources, devicesPerNode ...map[ }, }, } - item.Spec.Template.Spec.Volumes[0].HostPath.Path = path.Join(framework.TestContext.KubeletRootDir, "plugins") - item.Spec.Template.Spec.Volumes[2].HostPath.Path = path.Join(framework.TestContext.KubeletRootDir, "plugins_registry") - item.Spec.Template.Spec.Containers[0].Args = append(item.Spec.Template.Spec.Containers[0].Args, "--endpoint=/plugins_registry/"+d.Name+"-reg.sock") - item.Spec.Template.Spec.Containers[1].Args = append(item.Spec.Template.Spec.Containers[1].Args, "--endpoint=/dra/"+d.Name+".sock") + item.Spec.Template.Spec.Volumes[0].HostPath.Path = pluginDataDirectoryPath + item.Spec.Template.Spec.Volumes[1].HostPath.Path = registrarDirectoryPath + item.Spec.Template.Spec.Containers[0].VolumeMounts[0].MountPath = pluginDataDirectoryPath + item.Spec.Template.Spec.Containers[0].VolumeMounts[1].MountPath = registrarDirectoryPath } return nil }, manifests...) @@ -419,6 +426,9 @@ func (d *Driver) SetUp(nodes *Nodes, resources Resources, devicesPerNode ...map[ } else { fileOps.NumDevices = numDevices } + // All listeners running in this pod use a new unique local port number + // by atomically incrementing this variable. 
+ listenerPort := int32(9000) plugin, err := app.StartPlugin(loggerCtx, "/cdi", d.Name, driverClient, nodename, fileOps, kubeletplugin.GRPCVerbosity(0), kubeletplugin.GRPCInterceptor(func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (resp interface{}, err error) { @@ -427,11 +437,13 @@ func (d *Driver) SetUp(nodes *Nodes, resources Resources, devicesPerNode ...map[ kubeletplugin.GRPCStreamInterceptor(func(srv interface{}, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) (err error) { return d.streamInterceptor(nodename, srv, ss, info, handler) }), - kubeletplugin.PluginListener(listen(ctx, d.f, pod.Name, "plugin", 9001)), - kubeletplugin.RegistrarListener(listen(ctx, d.f, pod.Name, "registrar", 9000)), - kubeletplugin.KubeletPluginSocketPath(draAddr), - kubeletplugin.NodeV1alpha4(d.NodeV1alpha4), - kubeletplugin.NodeV1beta1(d.NodeV1beta1), + + kubeletplugin.PluginDataDirectoryPath(pluginDataDirectoryPath), + kubeletplugin.PluginListener(listen(d.f, &pod, &listenerPort)), + + kubeletplugin.RegistrarDirectoryPath(registrarDirectoryPath), + kubeletplugin.RegistrarSocketFilename(registrarSocketFilename), + kubeletplugin.RegistrarListener(listen(d.f, &pod, &listenerPort)), ) framework.ExpectNoError(err, "start kubelet plugin for node %s", pod.Spec.NodeName) d.cleanup = append(d.cleanup, func() { @@ -548,21 +560,161 @@ func (d *Driver) podIO(pod *v1.Pod) proxy.PodDirIO { F: d.f, Namespace: pod.Namespace, PodName: pod.Name, - ContainerName: "plugin", + ContainerName: pod.Spec.Containers[0].Name, Logger: &logger, } } -func listen(ctx context.Context, f *framework.Framework, podName, containerName string, port int) net.Listener { - addr := proxy.Addr{ - Namespace: f.Namespace.Name, - PodName: podName, - ContainerName: containerName, - Port: port, +// errListenerDone is the special error that we use to shut down. +// It doesn't need to be logged. +var errListenerDone = errors.New("listener is shutting down") + +// listen returns the function which the kubeletplugin helper needs to open a listening socket. +// For that it spins up hostpathplugin in the pod for the desired node +// and connects to hostpathplugin via port forwarding. +func listen(f *framework.Framework, pod *v1.Pod, port *int32) func(ctx context.Context, path string) (net.Listener, error) { + return func(ctx context.Context, path string) (l net.Listener, e error) { + // "Allocate" a new port by by bumping the per-pod counter by one. + port := atomic.AddInt32(port, 1) + + logger := klog.FromContext(ctx) + logger = klog.LoggerWithName(logger, "socket-listener") + logger = klog.LoggerWithValues(logger, "endpoint", path, "port", port) + ctx = klog.NewContext(ctx, logger) + + // Start hostpathplugin in proxy mode and keep it running until the listener gets closed. + req := f.ClientSet.CoreV1().RESTClient().Post(). + Resource("pods"). + Namespace(f.Namespace.Name). + Name(pod.Name). + SubResource("exec"). 
+ VersionedParams(&v1.PodExecOptions{ + Container: pod.Spec.Containers[0].Name, + Command: []string{ + "/hostpathplugin", + "--v=5", + "--endpoint=" + path, + fmt.Sprintf("--proxy-endpoint=tcp://:%d", port), + }, + Stdout: true, + Stderr: true, + }, scheme.ParameterCodec) + var wg sync.WaitGroup + wg.Add(1) + cmdCtx, cmdCancel := context.WithCancelCause(ctx) + go func() { + defer wg.Done() + cmdLogger := klog.LoggerWithName(logger, "hostpathplugin") + cmdCtx := klog.NewContext(cmdCtx, cmdLogger) + logger.V(1).Info("Starting...") + defer logger.V(1).Info("Stopped") + if err := execute(cmdCtx, req.URL(), f.ClientConfig(), 5); err != nil { + // errors.Is(err, listenerDoneErr) would be nicer, but we don't get + // that error from remotecommand. Instead forgo logging when we already shut down. + if cmdCtx.Err() == nil { + logger.Error(err, "execution failed") + } + } + + // Killing hostpathplugin does not remove the socket. Need to do that manually. + req := f.ClientSet.CoreV1().RESTClient().Post(). + Resource("pods"). + Namespace(f.Namespace.Name). + Name(pod.Name). + SubResource("exec"). + VersionedParams(&v1.PodExecOptions{ + Container: pod.Spec.Containers[0].Name, + Command: []string{ + "rm", + "-f", + path, + }, + Stdout: true, + Stderr: true, + }, scheme.ParameterCodec) + cleanupLogger := klog.LoggerWithName(logger, "cleanup") + cleanupCtx := klog.NewContext(ctx, cleanupLogger) + if err := execute(cleanupCtx, req.URL(), f.ClientConfig(), 0); err != nil { + cleanupLogger.Error(err, "Socket removal failed") + } + }() + defer func() { + // If we don't return a functional listener, then clean up. + if e != nil { + cmdCancel(e) + } + }() + stopHostpathplugin := func() { + cmdCancel(errListenerDone) + wg.Wait() + } + + addr := proxy.Addr{ + Namespace: f.Namespace.Name, + PodName: pod.Name, + ContainerName: pod.Spec.Containers[0].Name, + Port: int(port), + } + listener, err := proxy.Listen(ctx, f.ClientSet, f.ClientConfig(), addr) + if err != nil { + return nil, fmt.Errorf("listen for connections from %+v: %w", addr, err) + } + return &listenerWithClose{Listener: listener, close: stopHostpathplugin}, nil } - listener, err := proxy.Listen(ctx, f.ClientSet, f.ClientConfig(), addr) - framework.ExpectNoError(err, "listen for connections from %+v", addr) - return listener +} + +// listenerWithClose wraps Close so that it also shuts down hostpathplugin. +type listenerWithClose struct { + net.Listener + close func() +} + +func (l *listenerWithClose) Close() error { + // First close connections, then shut down the remote command. + // Otherwise the connection code is unhappy and logs errors. + err := l.Listener.Close() + l.close() + return err +} + +// execute runs a remote command with stdout/stderr redirected to log messages at the chosen verbosity level. +func execute(ctx context.Context, url *url.URL, config *rest.Config, verbosity int) error { + // Stream output as long as we run, i.e. ignore cancellation. + stdout := pipe(context.WithoutCancel(ctx), "STDOUT", verbosity) + stderr := pipe(context.WithoutCancel(ctx), "STDERR", verbosity) + defer func() { _ = stdout.Close() }() + defer func() { _ = stderr.Close() }() + + executor := exec.DefaultRemoteExecutor{} + return executor.ExecuteWithContext(ctx, url, config, nil, stdout, stderr, false, nil) +} + +// pipe creates an in-memory pipe and starts logging whatever is sent through that pipe in the background. 
+func pipe(ctx context.Context, msg string, verbosity int) *io.PipeWriter { + logger := klog.FromContext(ctx) + + reader, writer := io.Pipe() + go func() { + buffer := make([]byte, 10*1024) + for { + n, err := reader.Read(buffer) + if n > 0 { + logger.V(verbosity).Info(msg, "msg", string(buffer[0:n])) + } + if err != nil { + if !errors.Is(err, io.EOF) { + logger.Error(err, msg) + } + reader.CloseWithError(err) + return + } + if ctx.Err() != nil { + reader.CloseWithError(context.Cause(ctx)) + return + } + } + }() + return writer } func (d *Driver) TearDown() { diff --git a/test/e2e/dra/dra.go b/test/e2e/dra/dra.go index e03479ecaa2..b93c44a9b1e 100644 --- a/test/e2e/dra/dra.go +++ b/test/e2e/dra/dra.go @@ -1737,15 +1737,13 @@ var _ = framework.SIGDescribe("node")("DRA", feature.DynamicResourceAllocation, }) }) - multipleDrivers := func(nodeV1alpha4, nodeV1beta1 bool) { + multipleDrivers := func(nodeV1beta1 bool) { nodes := NewNodes(f, 1, 4) driver1 := NewDriver(f, nodes, perNode(2, nodes)) - driver1.NodeV1alpha4 = nodeV1alpha4 driver1.NodeV1beta1 = nodeV1beta1 b1 := newBuilder(f, driver1) driver2 := NewDriver(f, nodes, perNode(2, nodes)) - driver2.NodeV1alpha4 = nodeV1alpha4 driver2.NodeV1beta1 = nodeV1beta1 driver2.NameSuffix = "-other" b2 := newBuilder(f, driver2) @@ -1769,16 +1767,14 @@ var _ = framework.SIGDescribe("node")("DRA", feature.DynamicResourceAllocation, b1.testPod(ctx, f, pod) }) } - multipleDriversContext := func(prefix string, nodeV1alpha4, nodeV1beta1 bool) { + multipleDriversContext := func(prefix string, nodeV1beta1 bool) { ginkgo.Context(prefix, func() { - multipleDrivers(nodeV1alpha4, nodeV1beta1) + multipleDrivers(nodeV1beta1) }) } ginkgo.Context("multiple drivers", func() { - multipleDriversContext("using only drapbv1alpha4", true, false) - multipleDriversContext("using only drapbv1beta1", false, true) - multipleDriversContext("using both drav1alpha4 and drapbv1beta1", true, true) + multipleDriversContext("using only drapbv1beta1", true) }) ginkgo.It("runs pod after driver starts", func(ctx context.Context) { diff --git a/test/e2e/dra/test-driver/README.md b/test/e2e/dra/test-driver/README.md index 81d89389c99..11a7ea9dbc9 100644 --- a/test/e2e/dra/test-driver/README.md +++ b/test/e2e/dra/test-driver/README.md @@ -60,7 +60,11 @@ RUNTIME_CONFIG="resource.k8s.io/v1alpha3" FEATURE_GATES=DynamicResourceAllocatio In another: ``` -sudo mkdir -p /var/run/cdi && sudo chmod a+rwx /var/run/cdi /var/lib/kubelet/plugins_registry +sudo mkdir -p /var/run/cdi +sudo mkdir -p /var/lib/kubelet/plugins/test-driver.cdi.k8s.io +sudo mkdir -p /var/lib/kubelet/plugins_registry +sudo chmod a+rx /var/lib/kubelet /var/lib/kubelet/plugins +sudo chmod a+rwx /var/run/cdi /var/lib/kubelet/plugins_registry /var/lib/kubelet/plugins/test-driver.cdi.k8s.io KUBECONFIG=/var/run/kubernetes/admin.kubeconfig go run ./test/e2e/dra/test-driver -v=5 kubelet-plugin --node-name=127.0.0.1 ``` diff --git a/test/e2e/dra/test-driver/app/kubeletplugin.go b/test/e2e/dra/test-driver/app/kubeletplugin.go index 863477c54e7..498e02fc715 100644 --- a/test/e2e/dra/test-driver/app/kubeletplugin.go +++ b/test/e2e/dra/test-driver/app/kubeletplugin.go @@ -40,15 +40,13 @@ import ( "k8s.io/dynamic-resource-allocation/resourceclaim" "k8s.io/dynamic-resource-allocation/resourceslice" "k8s.io/klog/v2" - drapbv1alpha4 "k8s.io/kubelet/pkg/apis/dra/v1alpha4" - drapb "k8s.io/kubelet/pkg/apis/dra/v1beta1" ) type ExamplePlugin struct { stopCh <-chan struct{} logger klog.Logger kubeClient kubernetes.Interface - d kubeletplugin.DRAPlugin 
+ d *kubeletplugin.Helper fileOps FileOperations cdiDir string @@ -56,8 +54,11 @@ type ExamplePlugin struct { nodeName string deviceNames sets.Set[string] + // The mutex is needed because there are other goroutines checking the state. + // Serializing in the gRPC server alone is not enough because writing would + // race with reading. mutex sync.Mutex - prepared map[ClaimID][]Device // prepared claims -> result of nodePrepareResource + prepared map[ClaimID][]kubeletplugin.Device // prepared claims -> result of nodePrepareResource gRPCCalls []GRPCCall blockPrepareResourcesMutex sync.Mutex @@ -89,7 +90,7 @@ type GRPCCall struct { // sufficient to make the ClaimID unique. type ClaimID struct { Name string - UID string + UID types.UID } type Device struct { @@ -99,10 +100,10 @@ type Device struct { CDIDeviceID string } -var _ drapb.DRAPluginServer = &ExamplePlugin{} +var _ kubeletplugin.DRAPlugin = &ExamplePlugin{} // getJSONFilePath returns the absolute path where CDI file is/should be. -func (ex *ExamplePlugin) getJSONFilePath(claimUID string, requestName string) string { +func (ex *ExamplePlugin) getJSONFilePath(claimUID types.UID, requestName string) string { baseRequestRef := resourceclaim.BaseRequestRef(requestName) return filepath.Join(ex.cdiDir, fmt.Sprintf("%s-%s-%s.json", ex.driverName, claimUID, baseRequestRef)) } @@ -151,7 +152,7 @@ func StartPlugin(ctx context.Context, cdiDir, driverName string, kubeClient kube cdiDir: cdiDir, driverName: driverName, nodeName: nodeName, - prepared: make(map[ClaimID][]Device), + prepared: make(map[ClaimID][]kubeletplugin.Device), deviceNames: sets.New[string](), } @@ -167,16 +168,9 @@ func StartPlugin(ctx context.Context, cdiDir, driverName string, kubeClient kube kubeletplugin.KubeClient(kubeClient), kubeletplugin.GRPCInterceptor(ex.recordGRPCCall), kubeletplugin.GRPCStreamInterceptor(ex.recordGRPCStream), + kubeletplugin.Serialize(false), // The example plugin does its own locking. ) - // Both APIs get provided, the legacy one via wrapping. The options - // determine which one(s) really get served (by default, both). - // The options are a bit redundant now because a single instance cannot - // implement both, but that might be different in the future. - nodeServers := []any{ - drapb.DRAPluginServer(ex), // Casting is done only for clarity here, it's not needed. - drapbv1alpha4.V1Beta1ServerWrapper{DRAPluginServer: ex}, - } - d, err := kubeletplugin.Start(ctx, nodeServers, opts...) + d, err := kubeletplugin.Start(ctx, ex, opts...) if err != nil { return nil, fmt.Errorf("start kubelet plugin: %w", err) } @@ -300,34 +294,21 @@ func (ex *ExamplePlugin) getUnprepareResourcesFailure() error { // a deterministic name to simplify NodeUnprepareResource (no need to remember // or discover the name) and idempotency (when called again, the file simply // gets written again). -func (ex *ExamplePlugin) nodePrepareResource(ctx context.Context, claimReq *drapb.Claim) ([]Device, error) { +func (ex *ExamplePlugin) nodePrepareResource(ctx context.Context, claim *resourceapi.ResourceClaim) ([]kubeletplugin.Device, error) { logger := klog.FromContext(ctx) - // The plugin must retrieve the claim itself to get it in the version - // that it understands. 
- claim, err := ex.kubeClient.ResourceV1beta1().ResourceClaims(claimReq.Namespace).Get(ctx, claimReq.Name, metav1.GetOptions{}) - if err != nil { - return nil, fmt.Errorf("retrieve claim %s/%s: %w", claimReq.Namespace, claimReq.Name, err) - } - if claim.Status.Allocation == nil { - return nil, fmt.Errorf("claim %s/%s not allocated", claimReq.Namespace, claimReq.Name) - } - if claim.UID != types.UID(claimReq.UID) { - return nil, fmt.Errorf("claim %s/%s got replaced", claimReq.Namespace, claimReq.Name) - } - ex.mutex.Lock() defer ex.mutex.Unlock() ex.blockPrepareResourcesMutex.Lock() defer ex.blockPrepareResourcesMutex.Unlock() - claimID := ClaimID{Name: claimReq.Name, UID: claimReq.UID} + claimID := ClaimID{Name: claim.Name, UID: claim.UID} if result, ok := ex.prepared[claimID]; ok { // Idempotent call, nothing to do. return result, nil } - var devices []Device + var devices []kubeletplugin.Device for _, result := range claim.Status.Allocation.Devices.Results { // Only handle allocations for the current driver. if ex.driverName != result.Driver { @@ -356,7 +337,7 @@ func (ex *ExamplePlugin) nodePrepareResource(ctx context.Context, claimReq *drap claimReqName = regexp.MustCompile(`[^a-zA-Z0-9]`).ReplaceAllString(claimReqName, "_") env[claimReqName] = "true" - deviceName := "claim-" + claimReq.UID + "-" + baseRequestName + deviceName := "claim-" + string(claim.UID) + "-" + baseRequestName vendor := ex.driverName class := "test" cdiDeviceID := vendor + "/" + class + "=" + deviceName @@ -391,7 +372,7 @@ func (ex *ExamplePlugin) nodePrepareResource(ctx context.Context, claimReq *drap }, }, } - filePath := ex.getJSONFilePath(claimReq.UID, baseRequestName) + filePath := ex.getJSONFilePath(claim.UID, baseRequestName) buffer, err := json.Marshal(spec) if err != nil { return nil, fmt.Errorf("marshal spec: %w", err) @@ -399,11 +380,11 @@ func (ex *ExamplePlugin) nodePrepareResource(ctx context.Context, claimReq *drap if err := ex.fileOps.Create(filePath, buffer); err != nil { return nil, fmt.Errorf("failed to write CDI file: %w", err) } - device := Device{ - PoolName: result.Pool, - DeviceName: result.Device, - RequestName: baseRequestName, - CDIDeviceID: cdiDeviceID, + device := kubeletplugin.Device{ + PoolName: result.Pool, + DeviceName: result.Device, + Requests: []string{result.Request}, // May also return baseRequestName here. 
+ CDIDeviceIDs: []string{cdiDeviceID}, } devices = append(devices, device) } @@ -434,48 +415,35 @@ func extractParameters(parameters runtime.RawExtension, env *map[string]string, return nil } -func (ex *ExamplePlugin) NodePrepareResources(ctx context.Context, req *drapb.NodePrepareResourcesRequest) (*drapb.NodePrepareResourcesResponse, error) { - resp := &drapb.NodePrepareResourcesResponse{ - Claims: make(map[string]*drapb.NodePrepareResourceResponse), - } - +func (ex *ExamplePlugin) PrepareResourceClaims(ctx context.Context, claims []*resourceapi.ResourceClaim) (map[types.UID]kubeletplugin.PrepareResult, error) { if failure := ex.getPrepareResourcesFailure(); failure != nil { - return resp, failure + return nil, failure } - for _, claimReq := range req.Claims { - devices, err := ex.nodePrepareResource(ctx, claimReq) + result := make(map[types.UID]kubeletplugin.PrepareResult) + for _, claim := range claims { + devices, err := ex.nodePrepareResource(ctx, claim) + var claimResult kubeletplugin.PrepareResult if err != nil { - resp.Claims[claimReq.UID] = &drapb.NodePrepareResourceResponse{ - Error: err.Error(), - } + claimResult.Err = err } else { - r := &drapb.NodePrepareResourceResponse{} - for _, device := range devices { - pbDevice := &drapb.Device{ - PoolName: device.PoolName, - DeviceName: device.DeviceName, - RequestNames: []string{device.RequestName}, - CDIDeviceIDs: []string{device.CDIDeviceID}, - } - r.Devices = append(r.Devices, pbDevice) - } - resp.Claims[claimReq.UID] = r + claimResult.Devices = devices } + result[claim.UID] = claimResult } - return resp, nil + return result, nil } // NodeUnprepareResource removes the CDI file created by // NodePrepareResource. It's idempotent, therefore it is not an error when that // file is already gone. -func (ex *ExamplePlugin) nodeUnprepareResource(ctx context.Context, claimReq *drapb.Claim) error { +func (ex *ExamplePlugin) nodeUnprepareResource(ctx context.Context, claimRef kubeletplugin.NamespacedObject) error { ex.blockUnprepareResourcesMutex.Lock() defer ex.blockUnprepareResourcesMutex.Unlock() logger := klog.FromContext(ctx) - claimID := ClaimID{Name: claimReq.Name, UID: claimReq.UID} + claimID := ClaimID{Name: claimRef.Name, UID: claimRef.UID} devices, ok := ex.prepared[claimID] if !ok { // Idempotent call, nothing to do. @@ -483,11 +451,14 @@ func (ex *ExamplePlugin) nodeUnprepareResource(ctx context.Context, claimReq *dr } for _, device := range devices { - filePath := ex.getJSONFilePath(claimReq.UID, device.RequestName) - if err := ex.fileOps.Remove(filePath); err != nil { - return fmt.Errorf("error removing CDI file: %w", err) + // In practice we only prepare one, but let's not assume that here. 
+ for _, request := range device.Requests { + filePath := ex.getJSONFilePath(claimRef.UID, request) + if err := ex.fileOps.Remove(filePath); err != nil { + return fmt.Errorf("error removing CDI file: %w", err) + } + logger.V(3).Info("CDI file removed", "path", filePath) } - logger.V(3).Info("CDI file removed", "path", filePath) } delete(ex.prepared, claimID) @@ -495,26 +466,18 @@ func (ex *ExamplePlugin) nodeUnprepareResource(ctx context.Context, claimReq *dr return nil } -func (ex *ExamplePlugin) NodeUnprepareResources(ctx context.Context, req *drapb.NodeUnprepareResourcesRequest) (*drapb.NodeUnprepareResourcesResponse, error) { - resp := &drapb.NodeUnprepareResourcesResponse{ - Claims: make(map[string]*drapb.NodeUnprepareResourceResponse), - } +func (ex *ExamplePlugin) UnprepareResourceClaims(ctx context.Context, claims []kubeletplugin.NamespacedObject) (map[types.UID]error, error) { + result := make(map[types.UID]error) if failure := ex.getUnprepareResourcesFailure(); failure != nil { - return resp, failure + return nil, failure } - for _, claimReq := range req.Claims { - err := ex.nodeUnprepareResource(ctx, claimReq) - if err != nil { - resp.Claims[claimReq.UID] = &drapb.NodeUnprepareResourceResponse{ - Error: err.Error(), - } - } else { - resp.Claims[claimReq.UID] = &drapb.NodeUnprepareResourceResponse{} - } + for _, claimRef := range claims { + err := ex.nodeUnprepareResource(ctx, claimRef) + result[claimRef.UID] = err } - return resp, nil + return result, nil } func (ex *ExamplePlugin) GetPreparedResources() []ClaimID { diff --git a/test/e2e/dra/test-driver/app/server.go b/test/e2e/dra/test-driver/app/server.go index 043bd239656..d3900f581b0 100644 --- a/test/e2e/dra/test-driver/app/server.go +++ b/test/e2e/dra/test-driver/app/server.go @@ -178,9 +178,8 @@ func NewCommand() *cobra.Command { } kubeletPluginFlagSets := cliflag.NamedFlagSets{} fs = kubeletPluginFlagSets.FlagSet("kubelet") - pluginRegistrationPath := fs.String("plugin-registration-path", "/var/lib/kubelet/plugins_registry", "The directory where kubelet looks for plugin registration sockets, in the filesystem of the driver.") - endpoint := fs.String("endpoint", "/var/lib/kubelet/plugins/test-driver/dra.sock", "The Unix domain socket where the driver will listen for kubelet requests, in the filesystem of the driver.") - draAddress := fs.String("dra-address", "/var/lib/kubelet/plugins/test-driver/dra.sock", "The Unix domain socket that kubelet will connect to for dynamic resource allocation requests, in the filesystem of kubelet.") + kubeletRegistryDir := fs.String("plugin-registration-path", kubeletplugin.KubeletRegistryDir, "The directory where kubelet looks for plugin registration sockets.") + kubeletPluginsDir := fs.String("datadir", kubeletplugin.KubeletPluginsDir, "The per-driver directory where the DRA Unix domain socket will be created.") fs = kubeletPluginFlagSets.FlagSet("CDI") cdiDir := fs.String("cdi-dir", "/var/run/cdi", "directory for dynamically created CDI JSON files") nodeName := fs.String("node-name", "", "name of the node that the kubelet plugin is responsible for") @@ -196,7 +195,8 @@ func NewCommand() *cobra.Command { if err := os.MkdirAll(*cdiDir, os.FileMode(0750)); err != nil { return fmt.Errorf("create CDI directory: %w", err) } - if err := os.MkdirAll(filepath.Dir(*endpoint), 0750); err != nil { + datadir := path.Join(*kubeletPluginsDir, *driverName) + if err := os.MkdirAll(filepath.Dir(datadir), 0750); err != nil { return fmt.Errorf("create socket directory: %w", err) } @@ -205,9 +205,8 @@ func 
NewCommand() *cobra.Command { } plugin, err := StartPlugin(cmd.Context(), *cdiDir, *driverName, clientset, *nodeName, FileOperations{NumDevices: *numDevices}, - kubeletplugin.PluginSocketPath(*endpoint), - kubeletplugin.RegistrarSocketPath(path.Join(*pluginRegistrationPath, *driverName+"-reg.sock")), - kubeletplugin.KubeletPluginSocketPath(*draAddress), + kubeletplugin.PluginDataDirectoryPath(datadir), + kubeletplugin.RegistrarDirectoryPath(*kubeletRegistryDir), ) if err != nil { return fmt.Errorf("start example plugin: %w", err) diff --git a/test/e2e/testing-manifests/dra/dra-test-driver-proxy.yaml b/test/e2e/testing-manifests/dra/dra-test-driver-proxy.yaml index 377bb2e587b..ed32a26acac 100644 --- a/test/e2e/testing-manifests/dra/dra-test-driver-proxy.yaml +++ b/test/e2e/testing-manifests/dra/dra-test-driver-proxy.yaml @@ -46,40 +46,35 @@ spec: topologyKey: kubernetes.io/hostname containers: - - name: registrar + - name: pause image: registry.k8s.io/sig-storage/hostpathplugin:v1.7.3 - args: - - "--v=5" - - "--endpoint=/plugins_registry/dra-test-driver-reg.sock" - - "--proxy-endpoint=tcp://:9000" + command: + - /bin/sh + - -c + - while true; do sleep 10000; done volumeMounts: - - mountPath: /plugins_registry - name: registration-dir - - - name: plugin - image: registry.k8s.io/sig-storage/hostpathplugin:v1.7.3 - args: - - "--v=5" - - "--endpoint=/dra/dra-test-driver.sock" - - "--proxy-endpoint=tcp://:9001" - securityContext: - privileged: true - volumeMounts: - - mountPath: /dra + - mountPath: /var/lib/kubelet/plugins/test-driver.dra.k8s.io name: socket-dir + - mountPath: /var/lib/kubelet/plugins_registry + name: registration-dir - mountPath: /cdi name: cdi-dir + securityContext: + privileged: true volumes: - hostPath: - path: /var/lib/kubelet/plugins + # There's no way to remove this. Conceptually, it may have to + # survive Pod restarts and there's no way to tell the kubelet + # that this isn't the case for this Pod. + path: /var/lib/kubelet/plugins/test-driver.dra.k8s.io type: DirectoryOrCreate name: socket-dir - - hostPath: - path: /var/run/cdi - type: DirectoryOrCreate - name: cdi-dir - hostPath: path: /var/lib/kubelet/plugins_registry type: DirectoryOrCreate name: registration-dir + - hostPath: + path: /var/run/cdi + type: DirectoryOrCreate + name: cdi-dir diff --git a/test/e2e_node/dra_test.go b/test/e2e_node/dra_test.go index f58d0dab408..6c18fdcf8e2 100644 --- a/test/e2e_node/dra_test.go +++ b/test/e2e_node/dra_test.go @@ -29,7 +29,6 @@ import ( "fmt" "os" "path" - "path/filepath" "regexp" "sort" "strings" @@ -62,8 +61,6 @@ const ( kubeletPlugin1Name = "test-driver1.cdi.k8s.io" kubeletPlugin2Name = "test-driver2.cdi.k8s.io" cdiDir = "/var/run/cdi" - endpointTemplate = "/var/lib/kubelet/plugins/%s/dra.sock" - pluginRegistrationPath = "/var/lib/kubelet/plugins_registry" pluginRegistrationTimeout = time.Second * 60 // how long to wait for a node plugin to be registered podInPendingStateTimeout = time.Second * 60 // how long to wait for a pod to stay in pending state @@ -554,9 +551,9 @@ func newKubeletPlugin(ctx context.Context, clientSet kubernetes.Interface, nodeN // creating those directories. err := os.MkdirAll(cdiDir, os.FileMode(0750)) framework.ExpectNoError(err, "create CDI directory") - endpoint := fmt.Sprintf(endpointTemplate, pluginName) - err = os.MkdirAll(filepath.Dir(endpoint), 0750) - framework.ExpectNoError(err, "create socket directory") + datadir := path.Join(kubeletplugin.KubeletPluginsDir, pluginName) // The default, not set below. 
+ err = os.MkdirAll(datadir, 0750) + framework.ExpectNoError(err, "create DRA socket directory") plugin, err := testdriver.StartPlugin( ctx, @@ -565,9 +562,6 @@ func newKubeletPlugin(ctx context.Context, clientSet kubernetes.Interface, nodeN clientSet, nodeName, testdriver.FileOperations{}, - kubeletplugin.PluginSocketPath(endpoint), - kubeletplugin.RegistrarSocketPath(path.Join(pluginRegistrationPath, pluginName+"-reg.sock")), - kubeletplugin.KubeletPluginSocketPath(endpoint), ) framework.ExpectNoError(err)
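
To tie the pieces above together: a driver built against the reworked kubeletplugin package implements the DRAPlugin interface and hands that implementation to kubeletplugin.Start, which takes care of plugin registration and of serving the kubelet gRPC API. The sketch below is not taken from this change; it is a minimal illustration of the wiring shown in these hunks. The DriverName and NodeName options and the Helper.Stop call are assumed from the package's pre-existing API rather than from this diff, and exampledriver / example.dra.k8s.io are hypothetical names.

```
package exampledriver

import (
	"context"
	"path"

	resourceapi "k8s.io/api/resource/v1beta1"
	"k8s.io/apimachinery/pkg/types"
	"k8s.io/client-go/kubernetes"
	"k8s.io/dynamic-resource-allocation/kubeletplugin"
)

// driverName is a hypothetical driver name, chosen only for this sketch.
const driverName = "example.dra.k8s.io"

// examplePlugin implements kubeletplugin.DRAPlugin. Only the two required
// methods are shown; a real driver would also create and remove CDI spec
// files, as the test driver in this change does.
type examplePlugin struct{}

func (p *examplePlugin) PrepareResourceClaims(ctx context.Context, claims []*resourceapi.ResourceClaim) (map[types.UID]kubeletplugin.PrepareResult, error) {
	results := make(map[types.UID]kubeletplugin.PrepareResult, len(claims))
	for _, claim := range claims {
		var devices []kubeletplugin.Device
		for _, alloc := range claim.Status.Allocation.Devices.Results {
			if alloc.Driver != driverName {
				// Allocations for other drivers are not ours to prepare.
				continue
			}
			devices = append(devices, kubeletplugin.Device{
				PoolName:     alloc.Pool,
				DeviceName:   alloc.Device,
				Requests:     []string{alloc.Request},
				CDIDeviceIDs: nil, // a real driver would list the CDI devices it set up
			})
		}
		results[claim.UID] = kubeletplugin.PrepareResult{Devices: devices}
	}
	return results, nil
}

func (p *examplePlugin) UnprepareResourceClaims(ctx context.Context, claims []kubeletplugin.NamespacedObject) (map[types.UID]error, error) {
	results := make(map[types.UID]error, len(claims))
	for _, claim := range claims {
		// Undo whatever PrepareResourceClaims did; must be idempotent.
		results[claim.UID] = nil
	}
	return results, nil
}

// run wires the plugin up with the helper, using the default kubelet
// directories that the test driver above also relies on.
func run(ctx context.Context, kubeClient kubernetes.Interface, nodeName string) error {
	helper, err := kubeletplugin.Start(ctx, &examplePlugin{},
		kubeletplugin.KubeClient(kubeClient),
		kubeletplugin.DriverName(driverName), // pre-existing option, not part of this diff
		kubeletplugin.NodeName(nodeName),     // pre-existing option, not part of this diff
		kubeletplugin.RegistrarDirectoryPath(kubeletplugin.KubeletRegistryDir),
		kubeletplugin.PluginDataDirectoryPath(path.Join(kubeletplugin.KubeletPluginsDir, driverName)),
	)
	if err != nil {
		return err
	}
	defer helper.Stop() // assumed; Stop moved from the old interface onto the Helper
	<-ctx.Done()
	return nil
}
```

Compared with the removed nodeServers wiring, the driver no longer deals with gRPC request and response types or retrieves ResourceClaims itself; the helper passes in claims that it has already fetched and verified as allocated.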