diff --git a/staging/src/k8s.io/dynamic-resource-allocation/kubeletplugin/doc.go b/staging/src/k8s.io/dynamic-resource-allocation/kubeletplugin/doc.go index cdc33100a3f..4541ad0143d 100644 --- a/staging/src/k8s.io/dynamic-resource-allocation/kubeletplugin/doc.go +++ b/staging/src/k8s.io/dynamic-resource-allocation/kubeletplugin/doc.go @@ -16,4 +16,52 @@ limitations under the License. // Package kubeletplugin provides helper functions for running a dynamic // resource allocation kubelet plugin. +// +// A DRA driver using this package can be deployed as a DaemonSet on suitable +// nodes. Node labeling, for example through NFD +// (https://github.com/kubernetes-sigs/node-feature-discovery), can be used +// to run the driver only on nodes which have the necessary hardware. +// +// The service account of the DaemonSet must have sufficient RBAC permissions +// to read ResourceClaims and to create and update ResourceSlices, if +// the driver intends to publish per-node ResourceSlices. It is good +// security practice (but not required) to limit access to ResourceSlices +// associated with the node a specific Pod is running on. This can be done +// with a Validating Admission Policy (VAP). For more information, +// see the deployment of the DRA example driver +// (https://github.com/kubernetes-sigs/dra-example-driver/tree/main/deployments/helm/dra-example-driver/templates). +// +// Traditionally, the kubelet has not supported rolling updates of plugins. +// Therefore the DaemonSet must not set `maxSurge` to a value larger than +// zero. With the default `maxSurge: 0`, updating the DaemonSet of the driver +// will first shut down the old driver Pod, then start the replacement. +// +// This leads to a short downtime for operations that need the driver: +// - Pods cannot start unless the claims they depend on were already +// prepared for use. +// - Cleanup after the last pod which used a claim gets delayed +// until the driver is available again. The pod is not marked +// as terminated. This prevents reusing the resources used by +// the pod for other pods. +// - Running pods are *not* affected as far as Kubernetes is +// concerned. However, a DRA driver might provide required runtime +// services. Vendors need to document this. +// +// Note that the second point also means that draining a node should +// first evict normal pods, then the driver DaemonSet Pod. +// +// Starting with Kubernetes 1.33, the kubelet supports rolling updates +// such that old and new Pod run at the same time for a short while +// and hand over work gracefully, with no downtime. +// However, there is no mechanism for determining in advance whether +// the node the DaemonSet runs on supports that. Trying +// to do a rolling update with a kubelet which does not support it yet +// will fail because shutting down the old Pod unregisters the driver +// even though the new Pod is running. See https://github.com/kubernetes/kubernetes/pull/129832 +// for details (TODO: link to doc after merging instead). +// +// A DRA driver can either require 1.33 as minimal Kubernetes version or +// provide two variants of its DaemonSet deployment. In the variant with +// support for rolling updates, `maxSurge` can be set to a non-zero +// value. Administrators have to be careful about running the right variant. package kubeletplugin diff --git a/staging/src/k8s.io/dynamic-resource-allocation/kubeletplugin/draplugin.go b/staging/src/k8s.io/dynamic-resource-allocation/kubeletplugin/draplugin.go index f25a4bd0157..57602cdaee0 100644 --- a/staging/src/k8s.io/dynamic-resource-allocation/kubeletplugin/draplugin.go +++ b/staging/src/k8s.io/dynamic-resource-allocation/kubeletplugin/draplugin.go @@ -21,6 +21,7 @@ import ( "errors" "fmt" "net" + "path" "sync" "google.golang.org/grpc" @@ -36,6 +37,13 @@ import ( registerapi "k8s.io/kubelet/pkg/apis/pluginregistration/v1" ) +const ( + // KubeletPluginsDir is the default directory for [PluginDataDirectoryPath]. + KubeletPluginsDir = "/var/lib/kubelet/plugins" + // KubeletRegistryDir is the default for [RegistrarDirectoryPath] + KubeletRegistryDir = "/var/lib/kubelet/plugins_registry" +) + // DRAPlugin is the interface that needs to be implemented by a DRA driver to // use this helper package. The helper package then implements the gRPC // interface expected by the kubelet by wrapping the DRAPlugin implementation. @@ -165,63 +173,77 @@ func GRPCVerbosity(level int) Option { } } -// RegistrarSocketPath sets the file path for a Unix domain socket. -// If RegistrarListener is not used, then Start will remove -// a file at that path, should one exist, and creates a socket -// itself. Otherwise it uses the provided listener and only -// removes the socket at the specified path during shutdown. +// RegistrarDirectoryPath sets the path to the directory where the kubelet +// expects to find registration sockets of plugins. Typically this is +// /var/lib/kubelet/plugins_registry with /var/lib/kubelet being the kubelet's +// data directory. // -// At least one of these two options is required. -func RegistrarSocketPath(path string) Option { +// This is also the default. Some Kubernetes clusters may use a different data directory. +// This path must be the same inside and outside of the driver's container. +// The directory must exist. +func RegistrarDirectoryPath(path string) Option { return func(o *options) error { - o.pluginRegistrationEndpoint.path = path + o.pluginRegistrationEndpoint.dir = path return nil } } -// RegistrarListener sets an already created listener for the plugin -// registration API. Can be combined with RegistrarSocketPath. +// RegistrarSocketFilename sets the name of the socket inside the directory where +// the kubelet watches for registration sockets (see RegistrarDirectoryPath). // -// At least one of these two options is required. -func RegistrarListener(listener net.Listener) Option { +// Usually DRA drivers should not need this option. It is provided to +// support updates from an installation which used an older release of +// of the helper code. +// +// The default is -reg.sock. When rolling updates are enabled (not supported yet), +// it is --reg.sock. +func RegistrarSocketFilename(name string) Option { return func(o *options) error { - o.pluginRegistrationEndpoint.listener = listener + o.pluginRegistrationEndpoint.file = name return nil } } -// PluginSocketPath sets the file path for a Unix domain socket. -// If PluginListener is not used, then Start will remove -// a file at that path, should one exist, and creates a socket -// itself. Otherwise it uses the provided listener and only -// removes the socket at the specified path during shutdown. +// RegistrarListener configures how to create the registrar socket. +// The default is to remove the file if it exists and to then +// create a socket. // -// At least one of these two options is required. -func PluginSocketPath(path string) Option { +// This is used in Kubernetes for end-to-end testing. The default should +// be fine for DRA drivers. +func RegistrarListener(listen func(ctx context.Context, path string) (net.Listener, error)) Option { return func(o *options) error { - o.draEndpoint.path = path + o.pluginRegistrationEndpoint.listenFunc = listen return nil } } -// PluginListener sets an already created listener for the dynamic resource -// allocation plugin API. Can be combined with PluginSocketPath. +// PluginDataDirectoryPath sets the path where the DRA driver creates the +// "dra.sock" socket that the kubelet connects to for the DRA-specific gRPC calls. +// It is also used to coordinate between different Pods when using rolling +// updates. It must not be shared with other kubelet plugins. // -// At least one of these two options is required. -func PluginListener(listener net.Listener) Option { +// The default is /var/lib/kubelet/plugins/. This directory +// does not need to be inside the kubelet data directory, as long as +// the kubelet can access it. +// +// This path must be the same inside and outside of the driver's container. +// The directory must exist. +func PluginDataDirectoryPath(path string) Option { return func(o *options) error { - o.draEndpoint.listener = listener + o.pluginDataDirectoryPath = path return nil } } -// KubeletPluginSocketPath defines how kubelet will connect to the dynamic -// resource allocation plugin. This corresponds to PluginSocketPath, except -// that PluginSocketPath defines the path in the filesystem of the caller and -// KubeletPluginSocketPath in the filesystem of kubelet. -func KubeletPluginSocketPath(path string) Option { +// PluginListener configures how to create the registrar socket. +// The default is to remove the file if it exists and to then +// create a socket. +// +// This is used in Kubernetes for end-to-end testing. The default should +// be fine for DRA drivers. +func PluginListener(listen func(ctx context.Context, path string) (net.Listener, error)) Option { return func(o *options) error { - o.draAddress = path + o.draEndpointListen = listen return nil } } @@ -306,9 +328,9 @@ type options struct { driverName string nodeName string nodeUID types.UID - draEndpoint endpoint - draAddress string pluginRegistrationEndpoint endpoint + pluginDataDirectoryPath string + draEndpointListen func(ctx context.Context, path string) (net.Listener, error) unaryInterceptors []grpc.UnaryServerInterceptor streamInterceptors []grpc.StreamServerInterceptor kubeClient kubernetes.Interface @@ -349,7 +371,7 @@ type Helper struct { // a name to all log entries. // // If the plugin will be used to publish resources, [KubeClient] and [NodeName] -// options are mandatory. +// options are mandatory. Otherwise only [DriverName] is mandatory. func Start(ctx context.Context, plugin DRAPlugin, opts ...Option) (result *Helper, finalErr error) { logger := klog.FromContext(ctx) o := options{ @@ -357,6 +379,9 @@ func Start(ctx context.Context, plugin DRAPlugin, opts ...Option) (result *Helpe grpcVerbosity: 6, // Logs requests and responses, which can be large. serialize: true, nodeV1beta1: true, + pluginRegistrationEndpoint: endpoint{ + dir: KubeletRegistryDir, + }, } for _, option := range opts { if err := option(&o); err != nil { @@ -367,17 +392,12 @@ func Start(ctx context.Context, plugin DRAPlugin, opts ...Option) (result *Helpe if o.driverName == "" { return nil, errors.New("driver name must be set") } - if o.draAddress == "" { - return nil, errors.New("DRA address must be set") + if o.pluginRegistrationEndpoint.file == "" { + o.pluginRegistrationEndpoint.file = o.driverName + "-reg.sock" } - var emptyEndpoint endpoint - if o.draEndpoint == emptyEndpoint { - return nil, errors.New("a Unix domain socket path and/or listener must be set for the kubelet plugin") + if o.pluginDataDirectoryPath == "" { + o.pluginDataDirectoryPath = path.Join(KubeletPluginsDir, o.driverName) } - if o.pluginRegistrationEndpoint == emptyEndpoint { - return nil, errors.New("a Unix domain socket path and/or listener must be set for the registrar") - } - d := &Helper{ driverName: o.driverName, nodeName: o.nodeName, @@ -412,7 +432,12 @@ func Start(ctx context.Context, plugin DRAPlugin, opts ...Option) (result *Helpe // Run the node plugin gRPC server first to ensure that it is ready. var supportedServices []string - pluginServer, err := startGRPCServer(klog.NewContext(ctx, klog.LoggerWithName(logger, "dra")), o.grpcVerbosity, o.unaryInterceptors, o.streamInterceptors, o.draEndpoint, func(grpcServer *grpc.Server) { + draEndpoint := endpoint{ + dir: o.pluginDataDirectoryPath, + file: "dra.sock", // "dra" is hard-coded. + listenFunc: o.draEndpointListen, + } + pluginServer, err := startGRPCServer(klog.LoggerWithName(logger, "dra"), o.grpcVerbosity, o.unaryInterceptors, o.streamInterceptors, draEndpoint, func(grpcServer *grpc.Server) { if o.nodeV1beta1 { logger.V(5).Info("registering v1beta1.DRAPlugin gRPC service") drapb.RegisterDRAPluginServer(grpcServer, &nodePluginImplementation{Helper: d}) @@ -428,7 +453,7 @@ func Start(ctx context.Context, plugin DRAPlugin, opts ...Option) (result *Helpe } // Now make it available to kubelet. - registrar, err := startRegistrar(klog.NewContext(ctx, klog.LoggerWithName(logger, "registrar")), o.grpcVerbosity, o.unaryInterceptors, o.streamInterceptors, o.driverName, supportedServices, o.draAddress, o.pluginRegistrationEndpoint) + registrar, err := startRegistrar(klog.LoggerWithName(logger, "registrar"), o.grpcVerbosity, o.unaryInterceptors, o.streamInterceptors, o.driverName, supportedServices, draEndpoint.path(), o.pluginRegistrationEndpoint) if err != nil { return nil, fmt.Errorf("start registrar: %v", err) } @@ -510,6 +535,11 @@ func (d *Helper) PublishResources(_ context.Context, resources resourceslice.Dri // our background context, not the one passed into this // function, and thus is connected to the lifecycle of the // plugin. + // + // TODO: don't delete ResourceSlices, not even on a clean shutdown. + // We either support rolling updates and want to hand over seamlessly + // or don't and then perhaps restart the pod quickly enough that + // the kubelet hasn't deleted ResourceSlices yet. controllerCtx := d.backgroundCtx controllerLogger := klog.FromContext(controllerCtx) controllerLogger = klog.LoggerWithName(controllerLogger, "ResourceSlice controller") diff --git a/staging/src/k8s.io/dynamic-resource-allocation/kubeletplugin/endpoint.go b/staging/src/k8s.io/dynamic-resource-allocation/kubeletplugin/endpoint.go new file mode 100644 index 00000000000..45916999c39 --- /dev/null +++ b/staging/src/k8s.io/dynamic-resource-allocation/kubeletplugin/endpoint.go @@ -0,0 +1,83 @@ +/* +Copyright 2025 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package kubeletplugin + +import ( + "context" + "errors" + "fmt" + "net" + "os" + "path" +) + +// endpoint defines where and how to listen for incoming connections. +// The listener always gets closed when shutting down. +// +// If the listen function is not set, a new listener for a Unix domain socket gets +// created at the path. +type endpoint struct { + dir, file string + listenFunc func(ctx context.Context, socketpath string) (net.Listener, error) +} + +func (e endpoint) path() string { + return path.Join(e.dir, e.file) +} + +func (e endpoint) listen(ctx context.Context) (net.Listener, error) { + socketpath := e.path() + + if e.listenFunc != nil { + return e.listenFunc(ctx, socketpath) + } + + // Remove stale sockets, listen would fail otherwise. + if err := e.removeSocket(); err != nil { + return nil, err + } + cfg := net.ListenConfig{} + listener, err := cfg.Listen(ctx, "unix", socketpath) + if err != nil { + if removeErr := e.removeSocket(); removeErr != nil { + err = errors.Join(err, err) + } + return nil, err + } + return &unixListener{Listener: listener, endpoint: e}, nil +} + +func (e endpoint) removeSocket() error { + if err := os.Remove(e.path()); err != nil && !os.IsNotExist(err) { + return fmt.Errorf("remove Unix domain socket: %w", err) + } + return nil +} + +// unixListener adds removing of the Unix domain socket on Close. +type unixListener struct { + net.Listener + endpoint endpoint +} + +func (l *unixListener) Close() error { + err := l.Listener.Close() + if removeErr := l.endpoint.removeSocket(); removeErr != nil { + err = errors.Join(err, err) + } + return err +} diff --git a/staging/src/k8s.io/dynamic-resource-allocation/kubeletplugin/endpoint_test.go b/staging/src/k8s.io/dynamic-resource-allocation/kubeletplugin/endpoint_test.go new file mode 100644 index 00000000000..1a7cc8ec0df --- /dev/null +++ b/staging/src/k8s.io/dynamic-resource-allocation/kubeletplugin/endpoint_test.go @@ -0,0 +1,55 @@ +/* +Copyright 2025 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package kubeletplugin + +import ( + "context" + "net" + "path" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "k8s.io/kubernetes/test/utils/ktesting" +) + +func TestEndpointLifecycle(t *testing.T) { + _, ctx := ktesting.NewTestContext(t) + tempDir := t.TempDir() + socketname := "test.sock" + e := endpoint{dir: tempDir, file: socketname} + listener, err := e.listen(ctx) + require.NoError(t, err, "listen") + assert.FileExists(t, path.Join(tempDir, socketname)) + require.NoError(t, listener.Close(), "close") + assert.NoFileExists(t, path.Join(tempDir, socketname)) +} + +func TestEndpointListener(t *testing.T) { + _, ctx := ktesting.NewTestContext(t) + tempDir := t.TempDir() + socketname := "test.sock" + listen := func(ctx2 context.Context, socketpath string) (net.Listener, error) { + assert.Equal(t, path.Join(tempDir, socketname), socketpath) + return nil, nil + } + e := endpoint{dir: tempDir, file: socketname, listenFunc: listen} + listener, err := e.listen(ctx) + require.NoError(t, err, "listen") + assert.NoFileExists(t, path.Join(tempDir, socketname)) + assert.Nil(t, listener) +} diff --git a/staging/src/k8s.io/dynamic-resource-allocation/kubeletplugin/noderegistrar.go b/staging/src/k8s.io/dynamic-resource-allocation/kubeletplugin/noderegistrar.go index 6384ca0c180..595b0a0bd62 100644 --- a/staging/src/k8s.io/dynamic-resource-allocation/kubeletplugin/noderegistrar.go +++ b/staging/src/k8s.io/dynamic-resource-allocation/kubeletplugin/noderegistrar.go @@ -17,10 +17,10 @@ limitations under the License. package kubeletplugin import ( - "context" "fmt" "google.golang.org/grpc" + "k8s.io/klog/v2" registerapi "k8s.io/kubelet/pkg/apis/pluginregistration/v1" ) @@ -30,17 +30,15 @@ type nodeRegistrar struct { } // startRegistrar returns a running instance. -// -// The context is only used for additional values, cancellation is ignored. -func startRegistrar(valueCtx context.Context, grpcVerbosity int, interceptors []grpc.UnaryServerInterceptor, streamInterceptors []grpc.StreamServerInterceptor, driverName string, supportedServices []string, endpoint string, pluginRegistrationEndpoint endpoint) (*nodeRegistrar, error) { +func startRegistrar(logger klog.Logger, grpcVerbosity int, interceptors []grpc.UnaryServerInterceptor, streamInterceptors []grpc.StreamServerInterceptor, driverName string, supportedServices []string, socketpath string, pluginRegistrationEndpoint endpoint) (*nodeRegistrar, error) { n := &nodeRegistrar{ registrationServer: registrationServer{ driverName: driverName, - endpoint: endpoint, + endpoint: socketpath, supportedVersions: supportedServices, // DRA uses this field to describe provided services (e.g. "v1beta1.DRAPlugin"). }, } - s, err := startGRPCServer(valueCtx, grpcVerbosity, interceptors, streamInterceptors, pluginRegistrationEndpoint, func(grpcServer *grpc.Server) { + s, err := startGRPCServer(logger, grpcVerbosity, interceptors, streamInterceptors, pluginRegistrationEndpoint, func(grpcServer *grpc.Server) { registerapi.RegisterRegistrationServer(grpcServer, n) }) if err != nil { diff --git a/staging/src/k8s.io/dynamic-resource-allocation/kubeletplugin/nonblockinggrpcserver.go b/staging/src/k8s.io/dynamic-resource-allocation/kubeletplugin/nonblockinggrpcserver.go index 13d4f5ac46a..a148b83d696 100644 --- a/staging/src/k8s.io/dynamic-resource-allocation/kubeletplugin/nonblockinggrpcserver.go +++ b/staging/src/k8s.io/dynamic-resource-allocation/kubeletplugin/nonblockinggrpcserver.go @@ -19,15 +19,11 @@ package kubeletplugin import ( "context" "fmt" - "net" - "os" "sync" "sync/atomic" "google.golang.org/grpc" "k8s.io/klog/v2" - - utilruntime "k8s.io/apimachinery/pkg/util/runtime" ) var requestID int64 @@ -36,48 +32,25 @@ type grpcServer struct { grpcVerbosity int wg sync.WaitGroup endpoint endpoint + socketpath string server *grpc.Server } type registerService func(s *grpc.Server) -// endpoint defines where to listen for incoming connections. -// The listener always gets closed when shutting down. -// -// If the listener is not set, a new listener for a Unix domain socket gets -// created at the path. -// -// If the path is non-empty, then the socket will get removed when shutting -// down, regardless of who created the listener. -type endpoint struct { - path string - listener net.Listener -} - // startGRPCServer sets up the GRPC server on a Unix domain socket and spawns a goroutine // which handles requests for arbitrary services. -// -// The context is only used for additional values, cancellation is ignored. -func startGRPCServer(valueCtx context.Context, grpcVerbosity int, unaryInterceptors []grpc.UnaryServerInterceptor, streamInterceptors []grpc.StreamServerInterceptor, endpoint endpoint, services ...registerService) (*grpcServer, error) { - logger := klog.FromContext(valueCtx) +func startGRPCServer(logger klog.Logger, grpcVerbosity int, unaryInterceptors []grpc.UnaryServerInterceptor, streamInterceptors []grpc.StreamServerInterceptor, endpoint endpoint, services ...registerService) (*grpcServer, error) { + ctx := klog.NewContext(context.Background(), logger) + s := &grpcServer{ endpoint: endpoint, grpcVerbosity: grpcVerbosity, } - listener := endpoint.listener - if listener == nil { - // Remove any (probably stale) existing socket. - if err := os.Remove(endpoint.path); err != nil && !os.IsNotExist(err) { - return nil, fmt.Errorf("remove Unix domain socket: %v", err) - } - - // Now we can use the endpoint for listening. - l, err := net.Listen("unix", endpoint.path) - if err != nil { - return nil, fmt.Errorf("listen on %q: %v", endpoint.path, err) - } - listener = l + listener, err := endpoint.listen(ctx) + if err != nil { + return nil, fmt.Errorf("listen on %q: %w", s.socketpath, err) } // Run a gRPC server. It will close the listening socket when @@ -86,12 +59,12 @@ func startGRPCServer(valueCtx context.Context, grpcVerbosity int, unaryIntercept // there might be log output inside the method implementations. var opts []grpc.ServerOption finalUnaryInterceptors := []grpc.UnaryServerInterceptor{ - unaryContextInterceptor(valueCtx), + unaryContextInterceptor(ctx), s.interceptor, } finalUnaryInterceptors = append(finalUnaryInterceptors, unaryInterceptors...) finalStreamInterceptors := []grpc.StreamServerInterceptor{ - streamContextInterceptor(valueCtx), + streamContextInterceptor(ctx), s.streamInterceptor, } finalStreamInterceptors = append(finalStreamInterceptors, streamInterceptors...) @@ -243,9 +216,4 @@ func (s *grpcServer) stop() { } s.wg.Wait() s.server = nil - if s.endpoint.path != "" { - if err := os.Remove(s.endpoint.path); err != nil && !os.IsNotExist(err) { - utilruntime.HandleError(fmt.Errorf("remove Unix socket: %w", err)) - } - } } diff --git a/test/e2e/dra/deploy.go b/test/e2e/dra/deploy.go index 61fdb22fc5f..6dad7033077 100644 --- a/test/e2e/dra/deploy.go +++ b/test/e2e/dra/deploy.go @@ -326,8 +326,10 @@ func (d *Driver) SetUp(nodes *Nodes, resources Resources, devicesPerNode ...map[ instanceKey := "app.kubernetes.io/instance" rsName := "" - draAddr := path.Join(framework.TestContext.KubeletRootDir, "plugins", d.Name+".sock") numNodes := int32(len(nodes.NodeNames)) + pluginDataDirectoryPath := path.Join(framework.TestContext.KubeletRootDir, "plugins", d.Name) + registrarDirectoryPath := path.Join(framework.TestContext.KubeletRootDir, "plugins_registry") + registrarSocketFilename := d.Name + "-reg.sock" err := utils.CreateFromManifests(ctx, d.f, d.f.Namespace, func(item interface{}) error { switch item := item.(type) { case *appsv1.ReplicaSet: @@ -353,10 +355,12 @@ func (d *Driver) SetUp(nodes *Nodes, resources Resources, devicesPerNode ...map[ }, }, } - item.Spec.Template.Spec.Volumes[0].HostPath.Path = path.Join(framework.TestContext.KubeletRootDir, "plugins") - item.Spec.Template.Spec.Volumes[2].HostPath.Path = path.Join(framework.TestContext.KubeletRootDir, "plugins_registry") - item.Spec.Template.Spec.Containers[0].Args = append(item.Spec.Template.Spec.Containers[0].Args, "--endpoint=/plugins_registry/"+d.Name+"-reg.sock") - item.Spec.Template.Spec.Containers[1].Args = append(item.Spec.Template.Spec.Containers[1].Args, "--endpoint=/dra/"+d.Name+".sock") + item.Spec.Template.Spec.Volumes[0].HostPath.Path = pluginDataDirectoryPath + item.Spec.Template.Spec.Volumes[2].HostPath.Path = registrarDirectoryPath + item.Spec.Template.Spec.Containers[0].Args = append(item.Spec.Template.Spec.Containers[0].Args, "--endpoint="+path.Join(registrarDirectoryPath, registrarSocketFilename)) + item.Spec.Template.Spec.Containers[0].VolumeMounts[0].MountPath = registrarDirectoryPath + item.Spec.Template.Spec.Containers[1].Args = append(item.Spec.Template.Spec.Containers[1].Args, "--endpoint="+path.Join(pluginDataDirectoryPath, "dra.sock")) + item.Spec.Template.Spec.Containers[1].VolumeMounts[0].MountPath = pluginDataDirectoryPath } return nil }, manifests...) @@ -427,10 +431,16 @@ func (d *Driver) SetUp(nodes *Nodes, resources Resources, devicesPerNode ...map[ kubeletplugin.GRPCStreamInterceptor(func(srv interface{}, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) (err error) { return d.streamInterceptor(nodename, srv, ss, info, handler) }), - kubeletplugin.PluginListener(listen(ctx, d.f, pod.Name, "plugin", 9001)), - kubeletplugin.RegistrarListener(listen(ctx, d.f, pod.Name, "registrar", 9000)), - kubeletplugin.KubeletPluginSocketPath(draAddr), - kubeletplugin.NodeV1beta1(d.NodeV1beta1), + + // TODO: start socat on-demand in listen. Then we can properly test + // the socket path handling in kubeletplugin and do rolling updates. + + kubeletplugin.PluginDataDirectoryPath(pluginDataDirectoryPath), + kubeletplugin.PluginListener(listen(d.f, pod.Name, "plugin", 9001)), + + kubeletplugin.RegistrarDirectoryPath(registrarDirectoryPath), + kubeletplugin.RegistrarSocketFilename(registrarSocketFilename), + kubeletplugin.RegistrarListener(listen(d.f, pod.Name, "registrar", 9000)), ) framework.ExpectNoError(err, "start kubelet plugin for node %s", pod.Spec.NodeName) d.cleanup = append(d.cleanup, func() { @@ -552,16 +562,20 @@ func (d *Driver) podIO(pod *v1.Pod) proxy.PodDirIO { } } -func listen(ctx context.Context, f *framework.Framework, podName, containerName string, port int) net.Listener { - addr := proxy.Addr{ - Namespace: f.Namespace.Name, - PodName: podName, - ContainerName: containerName, - Port: port, +func listen(f *framework.Framework, podName, containerName string, port int) func(ctx context.Context, path string) (net.Listener, error) { + return func(ctx context.Context, path string) (net.Listener, error) { + addr := proxy.Addr{ + Namespace: f.Namespace.Name, + PodName: podName, + ContainerName: containerName, + Port: port, + } + listener, err := proxy.Listen(ctx, f.ClientSet, f.ClientConfig(), addr) + if err != nil { + return nil, fmt.Errorf("listen for connections from %+v: %w", addr, err) + } + return listener, nil } - listener, err := proxy.Listen(ctx, f.ClientSet, f.ClientConfig(), addr) - framework.ExpectNoError(err, "listen for connections from %+v", addr) - return listener } func (d *Driver) TearDown() { diff --git a/test/e2e/dra/test-driver/README.md b/test/e2e/dra/test-driver/README.md index abae6a8c0c5..11a7ea9dbc9 100644 --- a/test/e2e/dra/test-driver/README.md +++ b/test/e2e/dra/test-driver/README.md @@ -61,10 +61,10 @@ RUNTIME_CONFIG="resource.k8s.io/v1alpha3" FEATURE_GATES=DynamicResourceAllocatio In another: ``` sudo mkdir -p /var/run/cdi -sudo mkdir -p /var/lib/kubelet/plugins +sudo mkdir -p /var/lib/kubelet/plugins/test-driver.cdi.k8s.io sudo mkdir -p /var/lib/kubelet/plugins_registry sudo chmod a+rx /var/lib/kubelet /var/lib/kubelet/plugins -sudo chmod a+rwx /var/run/cdi /var/lib/kubelet/plugins_registry +sudo chmod a+rwx /var/run/cdi /var/lib/kubelet/plugins_registry /var/lib/kubelet/plugins/test-driver.cdi.k8s.io KUBECONFIG=/var/run/kubernetes/admin.kubeconfig go run ./test/e2e/dra/test-driver -v=5 kubelet-plugin --node-name=127.0.0.1 ``` diff --git a/test/e2e/dra/test-driver/app/server.go b/test/e2e/dra/test-driver/app/server.go index 043bd239656..d3900f581b0 100644 --- a/test/e2e/dra/test-driver/app/server.go +++ b/test/e2e/dra/test-driver/app/server.go @@ -178,9 +178,8 @@ func NewCommand() *cobra.Command { } kubeletPluginFlagSets := cliflag.NamedFlagSets{} fs = kubeletPluginFlagSets.FlagSet("kubelet") - pluginRegistrationPath := fs.String("plugin-registration-path", "/var/lib/kubelet/plugins_registry", "The directory where kubelet looks for plugin registration sockets, in the filesystem of the driver.") - endpoint := fs.String("endpoint", "/var/lib/kubelet/plugins/test-driver/dra.sock", "The Unix domain socket where the driver will listen for kubelet requests, in the filesystem of the driver.") - draAddress := fs.String("dra-address", "/var/lib/kubelet/plugins/test-driver/dra.sock", "The Unix domain socket that kubelet will connect to for dynamic resource allocation requests, in the filesystem of kubelet.") + kubeletRegistryDir := fs.String("plugin-registration-path", kubeletplugin.KubeletRegistryDir, "The directory where kubelet looks for plugin registration sockets.") + kubeletPluginsDir := fs.String("datadir", kubeletplugin.KubeletPluginsDir, "The per-driver directory where the DRA Unix domain socket will be created.") fs = kubeletPluginFlagSets.FlagSet("CDI") cdiDir := fs.String("cdi-dir", "/var/run/cdi", "directory for dynamically created CDI JSON files") nodeName := fs.String("node-name", "", "name of the node that the kubelet plugin is responsible for") @@ -196,7 +195,8 @@ func NewCommand() *cobra.Command { if err := os.MkdirAll(*cdiDir, os.FileMode(0750)); err != nil { return fmt.Errorf("create CDI directory: %w", err) } - if err := os.MkdirAll(filepath.Dir(*endpoint), 0750); err != nil { + datadir := path.Join(*kubeletPluginsDir, *driverName) + if err := os.MkdirAll(filepath.Dir(datadir), 0750); err != nil { return fmt.Errorf("create socket directory: %w", err) } @@ -205,9 +205,8 @@ func NewCommand() *cobra.Command { } plugin, err := StartPlugin(cmd.Context(), *cdiDir, *driverName, clientset, *nodeName, FileOperations{NumDevices: *numDevices}, - kubeletplugin.PluginSocketPath(*endpoint), - kubeletplugin.RegistrarSocketPath(path.Join(*pluginRegistrationPath, *driverName+"-reg.sock")), - kubeletplugin.KubeletPluginSocketPath(*draAddress), + kubeletplugin.PluginDataDirectoryPath(datadir), + kubeletplugin.RegistrarDirectoryPath(*kubeletRegistryDir), ) if err != nil { return fmt.Errorf("start example plugin: %w", err) diff --git a/test/e2e/testing-manifests/dra/dra-test-driver-proxy.yaml b/test/e2e/testing-manifests/dra/dra-test-driver-proxy.yaml index 377bb2e587b..0e7c508bb48 100644 --- a/test/e2e/testing-manifests/dra/dra-test-driver-proxy.yaml +++ b/test/e2e/testing-manifests/dra/dra-test-driver-proxy.yaml @@ -50,29 +50,32 @@ spec: image: registry.k8s.io/sig-storage/hostpathplugin:v1.7.3 args: - "--v=5" - - "--endpoint=/plugins_registry/dra-test-driver-reg.sock" + - "--endpoint=/var/lib/kubelet/plugins_registry/test-driver.dra.k8s.io-reg.sock" - "--proxy-endpoint=tcp://:9000" volumeMounts: - - mountPath: /plugins_registry + - mountPath: /var/lib/kubelet/plugins_registry name: registration-dir - name: plugin image: registry.k8s.io/sig-storage/hostpathplugin:v1.7.3 args: - "--v=5" - - "--endpoint=/dra/dra-test-driver.sock" + - "--endpoint=/var/lib/kubelet/plugins/test-driver.dra.k8s.io/dra.sock" - "--proxy-endpoint=tcp://:9001" securityContext: privileged: true volumeMounts: - - mountPath: /dra + - mountPath: /var/lib/kubelet/plugins/test-driver.dra.k8s.io name: socket-dir - mountPath: /cdi name: cdi-dir volumes: - hostPath: - path: /var/lib/kubelet/plugins + # There's no way to remove this. Conceptually, it may have to + # survive Pod restarts and there's no way to tell the kubelet + # that this isn't the case for this Pod. + path: /var/lib/kubelet/plugins/test-driver.dra.k8s.io type: DirectoryOrCreate name: socket-dir - hostPath: diff --git a/test/e2e_node/dra_test.go b/test/e2e_node/dra_test.go index f58d0dab408..6c18fdcf8e2 100644 --- a/test/e2e_node/dra_test.go +++ b/test/e2e_node/dra_test.go @@ -29,7 +29,6 @@ import ( "fmt" "os" "path" - "path/filepath" "regexp" "sort" "strings" @@ -62,8 +61,6 @@ const ( kubeletPlugin1Name = "test-driver1.cdi.k8s.io" kubeletPlugin2Name = "test-driver2.cdi.k8s.io" cdiDir = "/var/run/cdi" - endpointTemplate = "/var/lib/kubelet/plugins/%s/dra.sock" - pluginRegistrationPath = "/var/lib/kubelet/plugins_registry" pluginRegistrationTimeout = time.Second * 60 // how long to wait for a node plugin to be registered podInPendingStateTimeout = time.Second * 60 // how long to wait for a pod to stay in pending state @@ -554,9 +551,9 @@ func newKubeletPlugin(ctx context.Context, clientSet kubernetes.Interface, nodeN // creating those directories. err := os.MkdirAll(cdiDir, os.FileMode(0750)) framework.ExpectNoError(err, "create CDI directory") - endpoint := fmt.Sprintf(endpointTemplate, pluginName) - err = os.MkdirAll(filepath.Dir(endpoint), 0750) - framework.ExpectNoError(err, "create socket directory") + datadir := path.Join(kubeletplugin.KubeletPluginsDir, pluginName) // The default, not set below. + err = os.MkdirAll(datadir, 0750) + framework.ExpectNoError(err, "create DRA socket directory") plugin, err := testdriver.StartPlugin( ctx, @@ -565,9 +562,6 @@ func newKubeletPlugin(ctx context.Context, clientSet kubernetes.Interface, nodeN clientSet, nodeName, testdriver.FileOperations{}, - kubeletplugin.PluginSocketPath(endpoint), - kubeletplugin.RegistrarSocketPath(path.Join(pluginRegistrationPath, pluginName+"-reg.sock")), - kubeletplugin.KubeletPluginSocketPath(endpoint), ) framework.ExpectNoError(err)