DRA kubeletplugin: revise socket path handling

When supporting rolling updates, we cannot use the same fixed socket paths for
old and new pod. With the revised API, the caller no longer specifies the full
socket paths, only directories. The logic for naming the sockets can then live
in the helper.

While at it, avoid passing a context to the gRPC helper code when
all that the helper code needs is a logger. That leads to confusion
about whether cancellation has an effect.
This commit is contained in:
Patrick Ohly 2025-03-11 16:03:35 +01:00
parent c6252daccb
commit ec12727957
11 changed files with 326 additions and 134 deletions

View File

@ -16,4 +16,52 @@ limitations under the License.
// Package kubeletplugin provides helper functions for running a dynamic // Package kubeletplugin provides helper functions for running a dynamic
// resource allocation kubelet plugin. // resource allocation kubelet plugin.
//
// A DRA driver using this package can be deployed as a DaemonSet on suitable
// nodes. Node labeling, for example through NFD
// (https://github.com/kubernetes-sigs/node-feature-discovery), can be used
// to run the driver only on nodes which have the necessary hardware.
//
// The service account of the DaemonSet must have sufficient RBAC permissions
// to read ResourceClaims and to create and update ResourceSlices, if
// the driver intends to publish per-node ResourceSlices. It is good
// security practice (but not required) to limit access to ResourceSlices
// associated with the node a specific Pod is running on. This can be done
// with a Validating Admission Policy (VAP). For more information,
// see the deployment of the DRA example driver
// (https://github.com/kubernetes-sigs/dra-example-driver/tree/main/deployments/helm/dra-example-driver/templates).
//
// Traditionally, the kubelet has not supported rolling updates of plugins.
// Therefore the DaemonSet must not set `maxSurge` to a value larger than
// zero. With the default `maxSurge: 0`, updating the DaemonSet of the driver
// will first shut down the old driver Pod, then start the replacement.
//
// This leads to a short downtime for operations that need the driver:
// - Pods cannot start unless the claims they depend on were already
// prepared for use.
// - Cleanup after the last pod which used a claim gets delayed
// until the driver is available again. The pod is not marked
// as terminated. This prevents reusing the resources used by
// the pod for other pods.
// - Running pods are *not* affected as far as Kubernetes is
// concerned. However, a DRA driver might provide required runtime
// services. Vendors need to document this.
//
// Note that the second point also means that draining a node should
// first evict normal pods, then the driver DaemonSet Pod.
//
// Starting with Kubernetes 1.33, the kubelet supports rolling updates
// such that old and new Pod run at the same time for a short while
// and hand over work gracefully, with no downtime.
// However, there is no mechanism for determining in advance whether
// the node the DaemonSet runs on supports that. Trying
// to do a rolling update with a kubelet which does not support it yet
// will fail because shutting down the old Pod unregisters the driver
// even though the new Pod is running. See https://github.com/kubernetes/kubernetes/pull/129832
// for details (TODO: link to doc after merging instead).
//
// A DRA driver can either require 1.33 as minimal Kubernetes version or
// provide two variants of its DaemonSet deployment. In the variant with
// support for rolling updates, `maxSurge` can be set to a non-zero
// value. Administrators have to be careful about running the right variant.
package kubeletplugin package kubeletplugin

View File

@ -21,6 +21,7 @@ import (
"errors" "errors"
"fmt" "fmt"
"net" "net"
"path"
"sync" "sync"
"google.golang.org/grpc" "google.golang.org/grpc"
@ -36,6 +37,13 @@ import (
registerapi "k8s.io/kubelet/pkg/apis/pluginregistration/v1" registerapi "k8s.io/kubelet/pkg/apis/pluginregistration/v1"
) )
const (
// KubeletPluginsDir is the default directory for [PluginDataDirectoryPath].
KubeletPluginsDir = "/var/lib/kubelet/plugins"
// KubeletRegistryDir is the default directory for [RegistrarDirectoryPath].
KubeletRegistryDir = "/var/lib/kubelet/plugins_registry"
)
// DRAPlugin is the interface that needs to be implemented by a DRA driver to // DRAPlugin is the interface that needs to be implemented by a DRA driver to
// use this helper package. The helper package then implements the gRPC // use this helper package. The helper package then implements the gRPC
// interface expected by the kubelet by wrapping the DRAPlugin implementation. // interface expected by the kubelet by wrapping the DRAPlugin implementation.
@ -165,63 +173,77 @@ func GRPCVerbosity(level int) Option {
} }
} }
// RegistrarSocketPath sets the file path for a Unix domain socket. // RegistrarDirectoryPath sets the path to the directory where the kubelet
// If RegistrarListener is not used, then Start will remove // expects to find registration sockets of plugins. Typically this is
// a file at that path, should one exist, and creates a socket // /var/lib/kubelet/plugins_registry with /var/lib/kubelet being the kubelet's
// itself. Otherwise it uses the provided listener and only // data directory.
// removes the socket at the specified path during shutdown.
// //
// At least one of these two options is required. // This is also the default. Some Kubernetes clusters may use a different data directory.
func RegistrarSocketPath(path string) Option { // This path must be the same inside and outside of the driver's container.
// The directory must exist.
func RegistrarDirectoryPath(path string) Option {
return func(o *options) error { return func(o *options) error {
o.pluginRegistrationEndpoint.path = path o.pluginRegistrationEndpoint.dir = path
return nil return nil
} }
} }
// RegistrarListener sets an already created listener for the plugin // RegistrarSocketFilename sets the name of the socket inside the directory where
// registration API. Can be combined with RegistrarSocketPath. // the kubelet watches for registration sockets (see RegistrarDirectoryPath).
// //
// At least one of these two options is required. // Usually DRA drivers should not need this option. It is provided to
func RegistrarListener(listener net.Listener) Option { // support updates from an installation which used an older release of
// the helper code.
//
// The default is <driver name>-reg.sock. When rolling updates are enabled (not supported yet),
// it is <driver name>-<uid>-reg.sock.
func RegistrarSocketFilename(name string) Option {
return func(o *options) error { return func(o *options) error {
o.pluginRegistrationEndpoint.listener = listener o.pluginRegistrationEndpoint.file = name
return nil return nil
} }
} }
// PluginSocketPath sets the file path for a Unix domain socket. // RegistrarListener configures how to create the registrar socket.
// If PluginListener is not used, then Start will remove // The default is to remove the file if it exists and to then
// a file at that path, should one exist, and creates a socket // create a socket.
// itself. Otherwise it uses the provided listener and only
// removes the socket at the specified path during shutdown.
// //
// At least one of these two options is required. // This is used in Kubernetes for end-to-end testing. The default should
func PluginSocketPath(path string) Option { // be fine for DRA drivers.
func RegistrarListener(listen func(ctx context.Context, path string) (net.Listener, error)) Option {
return func(o *options) error { return func(o *options) error {
o.draEndpoint.path = path o.pluginRegistrationEndpoint.listenFunc = listen
return nil return nil
} }
} }
// PluginListener sets an already created listener for the dynamic resource // PluginDataDirectoryPath sets the path where the DRA driver creates the
// allocation plugin API. Can be combined with PluginSocketPath. // "dra.sock" socket that the kubelet connects to for the DRA-specific gRPC calls.
// It is also used to coordinate between different Pods when using rolling
// updates. It must not be shared with other kubelet plugins.
// //
// At least one of these two options is required. // The default is /var/lib/kubelet/plugins/<driver name>. This directory
func PluginListener(listener net.Listener) Option { // does not need to be inside the kubelet data directory, as long as
// the kubelet can access it.
//
// This path must be the same inside and outside of the driver's container.
// The directory must exist.
func PluginDataDirectoryPath(path string) Option {
return func(o *options) error { return func(o *options) error {
o.draEndpoint.listener = listener o.pluginDataDirectoryPath = path
return nil return nil
} }
} }
// KubeletPluginSocketPath defines how kubelet will connect to the dynamic // PluginListener configures how to create the socket for the DRA plugin API.
// resource allocation plugin. This corresponds to PluginSocketPath, except // The default is to remove the file if it exists and to then
// that PluginSocketPath defines the path in the filesystem of the caller and // create a socket.
// KubeletPluginSocketPath in the filesystem of kubelet. //
func KubeletPluginSocketPath(path string) Option { // This is used in Kubernetes for end-to-end testing. The default should
// be fine for DRA drivers.
func PluginListener(listen func(ctx context.Context, path string) (net.Listener, error)) Option {
return func(o *options) error { return func(o *options) error {
o.draAddress = path o.draEndpointListen = listen
return nil return nil
} }
} }
@ -306,9 +328,9 @@ type options struct {
driverName string driverName string
nodeName string nodeName string
nodeUID types.UID nodeUID types.UID
draEndpoint endpoint
draAddress string
pluginRegistrationEndpoint endpoint pluginRegistrationEndpoint endpoint
pluginDataDirectoryPath string
draEndpointListen func(ctx context.Context, path string) (net.Listener, error)
unaryInterceptors []grpc.UnaryServerInterceptor unaryInterceptors []grpc.UnaryServerInterceptor
streamInterceptors []grpc.StreamServerInterceptor streamInterceptors []grpc.StreamServerInterceptor
kubeClient kubernetes.Interface kubeClient kubernetes.Interface
@ -349,7 +371,7 @@ type Helper struct {
// a name to all log entries. // a name to all log entries.
// //
// If the plugin will be used to publish resources, [KubeClient] and [NodeName] // If the plugin will be used to publish resources, [KubeClient] and [NodeName]
// options are mandatory. // options are mandatory. Otherwise only [DriverName] is mandatory.
func Start(ctx context.Context, plugin DRAPlugin, opts ...Option) (result *Helper, finalErr error) { func Start(ctx context.Context, plugin DRAPlugin, opts ...Option) (result *Helper, finalErr error) {
logger := klog.FromContext(ctx) logger := klog.FromContext(ctx)
o := options{ o := options{
@ -357,6 +379,9 @@ func Start(ctx context.Context, plugin DRAPlugin, opts ...Option) (result *Helpe
grpcVerbosity: 6, // Logs requests and responses, which can be large. grpcVerbosity: 6, // Logs requests and responses, which can be large.
serialize: true, serialize: true,
nodeV1beta1: true, nodeV1beta1: true,
pluginRegistrationEndpoint: endpoint{
dir: KubeletRegistryDir,
},
} }
for _, option := range opts { for _, option := range opts {
if err := option(&o); err != nil { if err := option(&o); err != nil {
@ -367,17 +392,12 @@ func Start(ctx context.Context, plugin DRAPlugin, opts ...Option) (result *Helpe
if o.driverName == "" { if o.driverName == "" {
return nil, errors.New("driver name must be set") return nil, errors.New("driver name must be set")
} }
if o.draAddress == "" { if o.pluginRegistrationEndpoint.file == "" {
return nil, errors.New("DRA address must be set") o.pluginRegistrationEndpoint.file = o.driverName + "-reg.sock"
} }
var emptyEndpoint endpoint if o.pluginDataDirectoryPath == "" {
if o.draEndpoint == emptyEndpoint { o.pluginDataDirectoryPath = path.Join(KubeletPluginsDir, o.driverName)
return nil, errors.New("a Unix domain socket path and/or listener must be set for the kubelet plugin")
} }
if o.pluginRegistrationEndpoint == emptyEndpoint {
return nil, errors.New("a Unix domain socket path and/or listener must be set for the registrar")
}
d := &Helper{ d := &Helper{
driverName: o.driverName, driverName: o.driverName,
nodeName: o.nodeName, nodeName: o.nodeName,
@ -412,7 +432,12 @@ func Start(ctx context.Context, plugin DRAPlugin, opts ...Option) (result *Helpe
// Run the node plugin gRPC server first to ensure that it is ready. // Run the node plugin gRPC server first to ensure that it is ready.
var supportedServices []string var supportedServices []string
pluginServer, err := startGRPCServer(klog.NewContext(ctx, klog.LoggerWithName(logger, "dra")), o.grpcVerbosity, o.unaryInterceptors, o.streamInterceptors, o.draEndpoint, func(grpcServer *grpc.Server) { draEndpoint := endpoint{
dir: o.pluginDataDirectoryPath,
file: "dra.sock", // "dra" is hard-coded.
listenFunc: o.draEndpointListen,
}
pluginServer, err := startGRPCServer(klog.LoggerWithName(logger, "dra"), o.grpcVerbosity, o.unaryInterceptors, o.streamInterceptors, draEndpoint, func(grpcServer *grpc.Server) {
if o.nodeV1beta1 { if o.nodeV1beta1 {
logger.V(5).Info("registering v1beta1.DRAPlugin gRPC service") logger.V(5).Info("registering v1beta1.DRAPlugin gRPC service")
drapb.RegisterDRAPluginServer(grpcServer, &nodePluginImplementation{Helper: d}) drapb.RegisterDRAPluginServer(grpcServer, &nodePluginImplementation{Helper: d})
@ -428,7 +453,7 @@ func Start(ctx context.Context, plugin DRAPlugin, opts ...Option) (result *Helpe
} }
// Now make it available to kubelet. // Now make it available to kubelet.
registrar, err := startRegistrar(klog.NewContext(ctx, klog.LoggerWithName(logger, "registrar")), o.grpcVerbosity, o.unaryInterceptors, o.streamInterceptors, o.driverName, supportedServices, o.draAddress, o.pluginRegistrationEndpoint) registrar, err := startRegistrar(klog.LoggerWithName(logger, "registrar"), o.grpcVerbosity, o.unaryInterceptors, o.streamInterceptors, o.driverName, supportedServices, draEndpoint.path(), o.pluginRegistrationEndpoint)
if err != nil { if err != nil {
return nil, fmt.Errorf("start registrar: %v", err) return nil, fmt.Errorf("start registrar: %v", err)
} }
@ -510,6 +535,11 @@ func (d *Helper) PublishResources(_ context.Context, resources resourceslice.Dri
// our background context, not the one passed into this // our background context, not the one passed into this
// function, and thus is connected to the lifecycle of the // function, and thus is connected to the lifecycle of the
// plugin. // plugin.
//
// TODO: don't delete ResourceSlices, not even on a clean shutdown.
// We either support rolling updates and want to hand over seamlessly
// or don't and then perhaps restart the pod quickly enough that
// the kubelet hasn't deleted ResourceSlices yet.
controllerCtx := d.backgroundCtx controllerCtx := d.backgroundCtx
controllerLogger := klog.FromContext(controllerCtx) controllerLogger := klog.FromContext(controllerCtx)
controllerLogger = klog.LoggerWithName(controllerLogger, "ResourceSlice controller") controllerLogger = klog.LoggerWithName(controllerLogger, "ResourceSlice controller")

View File

@ -0,0 +1,83 @@
/*
Copyright 2025 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package kubeletplugin
import (
"context"
"errors"
"fmt"
"net"
"os"
"path"
)
// endpoint defines where and how to listen for incoming connections.
// The listener always gets closed when shutting down.
//
// The socket path is the file inside the directory (see path).
//
// If the listen function is not set, a new listener for a Unix domain socket gets
// created at that path and the socket file gets removed again when that
// listener is closed. If the listen function is set, its result is returned
// unchanged and the endpoint does not touch the file system.
type endpoint struct {
	dir, file  string
	listenFunc func(ctx context.Context, socketpath string) (net.Listener, error)
}

// path returns the full path of the socket, i.e. file inside dir.
func (e endpoint) path() string {
	return path.Join(e.dir, e.file)
}

// listen creates the listener for the endpoint.
//
// With a custom listen function, that function is called with the socket
// path and its result is passed through as-is. Otherwise any existing file
// at the socket path is removed and a Unix domain socket is created there.
//
// All errors which occur are returned, including a failure to clean up
// after a failed attempt to listen.
func (e endpoint) listen(ctx context.Context) (net.Listener, error) {
	socketpath := e.path()

	if e.listenFunc != nil {
		return e.listenFunc(ctx, socketpath)
	}

	// Remove stale sockets, listen would fail otherwise.
	if err := e.removeSocket(); err != nil {
		return nil, err
	}
	cfg := net.ListenConfig{}
	listener, err := cfg.Listen(ctx, "unix", socketpath)
	if err != nil {
		// Don't leave a partially created socket behind. If that cleanup
		// fails as well, report both problems to the caller.
		if removeErr := e.removeSocket(); removeErr != nil {
			err = errors.Join(err, removeErr)
		}
		return nil, err
	}

	return &unixListener{Listener: listener, endpoint: e}, nil
}

// removeSocket deletes the file at the socket path.
// A file which does not exist is not an error.
func (e endpoint) removeSocket() error {
	if err := os.Remove(e.path()); err != nil && !os.IsNotExist(err) {
		return fmt.Errorf("remove Unix domain socket: %w", err)
	}
	return nil
}

// unixListener adds removing of the Unix domain socket on Close.
type unixListener struct {
	net.Listener
	endpoint endpoint
}

// Close closes the wrapped listener and then removes the socket file.
// The removal is attempted even if closing failed. The returned error
// contains both the close error and the removal error, if any.
func (l *unixListener) Close() error {
	err := l.Listener.Close()
	if removeErr := l.endpoint.removeSocket(); removeErr != nil {
		// errors.Join drops nil values, so this also works when
		// closing the listener itself succeeded.
		err = errors.Join(err, removeErr)
	}
	return err
}

View File

@ -0,0 +1,55 @@
/*
Copyright 2025 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package kubeletplugin
import (
"context"
"net"
"path"
"testing"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"k8s.io/kubernetes/test/utils/ktesting"
)
// TestEndpointLifecycle checks that an endpoint without a custom listen
// function creates the socket file when listening and that the file is gone
// again after the returned listener was closed.
func TestEndpointLifecycle(t *testing.T) {
	_, ctx := ktesting.NewTestContext(t)

	const name = "test.sock"
	dir := t.TempDir()
	socketPath := path.Join(dir, name)

	ep := endpoint{dir: dir, file: name}
	l, err := ep.listen(ctx)
	require.NoError(t, err, "listen")
	assert.FileExists(t, socketPath)

	require.NoError(t, l.Close(), "close")
	assert.NoFileExists(t, socketPath)
}
// TestEndpointListener checks that a custom listen function gets called with
// the full socket path, that its result is returned unchanged, and that no
// socket file is created in that case.
func TestEndpointListener(t *testing.T) {
	_, ctx := ktesting.NewTestContext(t)

	const name = "test.sock"
	dir := t.TempDir()
	wantPath := path.Join(dir, name)

	ep := endpoint{
		dir:  dir,
		file: name,
		listenFunc: func(_ context.Context, socketpath string) (net.Listener, error) {
			assert.Equal(t, wantPath, socketpath)
			return nil, nil
		},
	}

	l, err := ep.listen(ctx)
	require.NoError(t, err, "listen")
	assert.NoFileExists(t, wantPath)
	assert.Nil(t, l)
}

View File

@ -17,10 +17,10 @@ limitations under the License.
package kubeletplugin package kubeletplugin
import ( import (
"context"
"fmt" "fmt"
"google.golang.org/grpc" "google.golang.org/grpc"
"k8s.io/klog/v2"
registerapi "k8s.io/kubelet/pkg/apis/pluginregistration/v1" registerapi "k8s.io/kubelet/pkg/apis/pluginregistration/v1"
) )
@ -30,17 +30,15 @@ type nodeRegistrar struct {
} }
// startRegistrar returns a running instance. // startRegistrar returns a running instance.
// func startRegistrar(logger klog.Logger, grpcVerbosity int, interceptors []grpc.UnaryServerInterceptor, streamInterceptors []grpc.StreamServerInterceptor, driverName string, supportedServices []string, socketpath string, pluginRegistrationEndpoint endpoint) (*nodeRegistrar, error) {
// The context is only used for additional values, cancellation is ignored.
func startRegistrar(valueCtx context.Context, grpcVerbosity int, interceptors []grpc.UnaryServerInterceptor, streamInterceptors []grpc.StreamServerInterceptor, driverName string, supportedServices []string, endpoint string, pluginRegistrationEndpoint endpoint) (*nodeRegistrar, error) {
n := &nodeRegistrar{ n := &nodeRegistrar{
registrationServer: registrationServer{ registrationServer: registrationServer{
driverName: driverName, driverName: driverName,
endpoint: endpoint, endpoint: socketpath,
supportedVersions: supportedServices, // DRA uses this field to describe provided services (e.g. "v1beta1.DRAPlugin"). supportedVersions: supportedServices, // DRA uses this field to describe provided services (e.g. "v1beta1.DRAPlugin").
}, },
} }
s, err := startGRPCServer(valueCtx, grpcVerbosity, interceptors, streamInterceptors, pluginRegistrationEndpoint, func(grpcServer *grpc.Server) { s, err := startGRPCServer(logger, grpcVerbosity, interceptors, streamInterceptors, pluginRegistrationEndpoint, func(grpcServer *grpc.Server) {
registerapi.RegisterRegistrationServer(grpcServer, n) registerapi.RegisterRegistrationServer(grpcServer, n)
}) })
if err != nil { if err != nil {

View File

@ -19,15 +19,11 @@ package kubeletplugin
import ( import (
"context" "context"
"fmt" "fmt"
"net"
"os"
"sync" "sync"
"sync/atomic" "sync/atomic"
"google.golang.org/grpc" "google.golang.org/grpc"
"k8s.io/klog/v2" "k8s.io/klog/v2"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
) )
var requestID int64 var requestID int64
@ -36,48 +32,25 @@ type grpcServer struct {
grpcVerbosity int grpcVerbosity int
wg sync.WaitGroup wg sync.WaitGroup
endpoint endpoint endpoint endpoint
socketpath string
server *grpc.Server server *grpc.Server
} }
type registerService func(s *grpc.Server) type registerService func(s *grpc.Server)
// endpoint defines where to listen for incoming connections.
// The listener always gets closed when shutting down.
//
// If the listener is not set, a new listener for a Unix domain socket gets
// created at the path.
//
// If the path is non-empty, then the socket will get removed when shutting
// down, regardless of who created the listener.
type endpoint struct {
path string
listener net.Listener
}
// startGRPCServer sets up the GRPC server on a Unix domain socket and spawns a goroutine // startGRPCServer sets up the GRPC server on a Unix domain socket and spawns a goroutine
// which handles requests for arbitrary services. // which handles requests for arbitrary services.
// func startGRPCServer(logger klog.Logger, grpcVerbosity int, unaryInterceptors []grpc.UnaryServerInterceptor, streamInterceptors []grpc.StreamServerInterceptor, endpoint endpoint, services ...registerService) (*grpcServer, error) {
// The context is only used for additional values, cancellation is ignored. ctx := klog.NewContext(context.Background(), logger)
func startGRPCServer(valueCtx context.Context, grpcVerbosity int, unaryInterceptors []grpc.UnaryServerInterceptor, streamInterceptors []grpc.StreamServerInterceptor, endpoint endpoint, services ...registerService) (*grpcServer, error) {
logger := klog.FromContext(valueCtx)
s := &grpcServer{ s := &grpcServer{
endpoint: endpoint, endpoint: endpoint,
grpcVerbosity: grpcVerbosity, grpcVerbosity: grpcVerbosity,
} }
listener := endpoint.listener listener, err := endpoint.listen(ctx)
if listener == nil { if err != nil {
// Remove any (probably stale) existing socket. return nil, fmt.Errorf("listen on %q: %w", s.socketpath, err)
if err := os.Remove(endpoint.path); err != nil && !os.IsNotExist(err) {
return nil, fmt.Errorf("remove Unix domain socket: %v", err)
}
// Now we can use the endpoint for listening.
l, err := net.Listen("unix", endpoint.path)
if err != nil {
return nil, fmt.Errorf("listen on %q: %v", endpoint.path, err)
}
listener = l
} }
// Run a gRPC server. It will close the listening socket when // Run a gRPC server. It will close the listening socket when
@ -86,12 +59,12 @@ func startGRPCServer(valueCtx context.Context, grpcVerbosity int, unaryIntercept
// there might be log output inside the method implementations. // there might be log output inside the method implementations.
var opts []grpc.ServerOption var opts []grpc.ServerOption
finalUnaryInterceptors := []grpc.UnaryServerInterceptor{ finalUnaryInterceptors := []grpc.UnaryServerInterceptor{
unaryContextInterceptor(valueCtx), unaryContextInterceptor(ctx),
s.interceptor, s.interceptor,
} }
finalUnaryInterceptors = append(finalUnaryInterceptors, unaryInterceptors...) finalUnaryInterceptors = append(finalUnaryInterceptors, unaryInterceptors...)
finalStreamInterceptors := []grpc.StreamServerInterceptor{ finalStreamInterceptors := []grpc.StreamServerInterceptor{
streamContextInterceptor(valueCtx), streamContextInterceptor(ctx),
s.streamInterceptor, s.streamInterceptor,
} }
finalStreamInterceptors = append(finalStreamInterceptors, streamInterceptors...) finalStreamInterceptors = append(finalStreamInterceptors, streamInterceptors...)
@ -243,9 +216,4 @@ func (s *grpcServer) stop() {
} }
s.wg.Wait() s.wg.Wait()
s.server = nil s.server = nil
if s.endpoint.path != "" {
if err := os.Remove(s.endpoint.path); err != nil && !os.IsNotExist(err) {
utilruntime.HandleError(fmt.Errorf("remove Unix socket: %w", err))
}
}
} }

View File

@ -326,8 +326,10 @@ func (d *Driver) SetUp(nodes *Nodes, resources Resources, devicesPerNode ...map[
instanceKey := "app.kubernetes.io/instance" instanceKey := "app.kubernetes.io/instance"
rsName := "" rsName := ""
draAddr := path.Join(framework.TestContext.KubeletRootDir, "plugins", d.Name+".sock")
numNodes := int32(len(nodes.NodeNames)) numNodes := int32(len(nodes.NodeNames))
pluginDataDirectoryPath := path.Join(framework.TestContext.KubeletRootDir, "plugins", d.Name)
registrarDirectoryPath := path.Join(framework.TestContext.KubeletRootDir, "plugins_registry")
registrarSocketFilename := d.Name + "-reg.sock"
err := utils.CreateFromManifests(ctx, d.f, d.f.Namespace, func(item interface{}) error { err := utils.CreateFromManifests(ctx, d.f, d.f.Namespace, func(item interface{}) error {
switch item := item.(type) { switch item := item.(type) {
case *appsv1.ReplicaSet: case *appsv1.ReplicaSet:
@ -353,10 +355,12 @@ func (d *Driver) SetUp(nodes *Nodes, resources Resources, devicesPerNode ...map[
}, },
}, },
} }
item.Spec.Template.Spec.Volumes[0].HostPath.Path = path.Join(framework.TestContext.KubeletRootDir, "plugins") item.Spec.Template.Spec.Volumes[0].HostPath.Path = pluginDataDirectoryPath
item.Spec.Template.Spec.Volumes[2].HostPath.Path = path.Join(framework.TestContext.KubeletRootDir, "plugins_registry") item.Spec.Template.Spec.Volumes[2].HostPath.Path = registrarDirectoryPath
item.Spec.Template.Spec.Containers[0].Args = append(item.Spec.Template.Spec.Containers[0].Args, "--endpoint=/plugins_registry/"+d.Name+"-reg.sock") item.Spec.Template.Spec.Containers[0].Args = append(item.Spec.Template.Spec.Containers[0].Args, "--endpoint="+path.Join(registrarDirectoryPath, registrarSocketFilename))
item.Spec.Template.Spec.Containers[1].Args = append(item.Spec.Template.Spec.Containers[1].Args, "--endpoint=/dra/"+d.Name+".sock") item.Spec.Template.Spec.Containers[0].VolumeMounts[0].MountPath = registrarDirectoryPath
item.Spec.Template.Spec.Containers[1].Args = append(item.Spec.Template.Spec.Containers[1].Args, "--endpoint="+path.Join(pluginDataDirectoryPath, "dra.sock"))
item.Spec.Template.Spec.Containers[1].VolumeMounts[0].MountPath = pluginDataDirectoryPath
} }
return nil return nil
}, manifests...) }, manifests...)
@ -427,10 +431,16 @@ func (d *Driver) SetUp(nodes *Nodes, resources Resources, devicesPerNode ...map[
kubeletplugin.GRPCStreamInterceptor(func(srv interface{}, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) (err error) { kubeletplugin.GRPCStreamInterceptor(func(srv interface{}, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) (err error) {
return d.streamInterceptor(nodename, srv, ss, info, handler) return d.streamInterceptor(nodename, srv, ss, info, handler)
}), }),
kubeletplugin.PluginListener(listen(ctx, d.f, pod.Name, "plugin", 9001)),
kubeletplugin.RegistrarListener(listen(ctx, d.f, pod.Name, "registrar", 9000)), // TODO: start socat on-demand in listen. Then we can properly test
kubeletplugin.KubeletPluginSocketPath(draAddr), // the socket path handling in kubeletplugin and do rolling updates.
kubeletplugin.NodeV1beta1(d.NodeV1beta1),
kubeletplugin.PluginDataDirectoryPath(pluginDataDirectoryPath),
kubeletplugin.PluginListener(listen(d.f, pod.Name, "plugin", 9001)),
kubeletplugin.RegistrarDirectoryPath(registrarDirectoryPath),
kubeletplugin.RegistrarSocketFilename(registrarSocketFilename),
kubeletplugin.RegistrarListener(listen(d.f, pod.Name, "registrar", 9000)),
) )
framework.ExpectNoError(err, "start kubelet plugin for node %s", pod.Spec.NodeName) framework.ExpectNoError(err, "start kubelet plugin for node %s", pod.Spec.NodeName)
d.cleanup = append(d.cleanup, func() { d.cleanup = append(d.cleanup, func() {
@ -552,16 +562,20 @@ func (d *Driver) podIO(pod *v1.Pod) proxy.PodDirIO {
} }
} }
func listen(ctx context.Context, f *framework.Framework, podName, containerName string, port int) net.Listener { func listen(f *framework.Framework, podName, containerName string, port int) func(ctx context.Context, path string) (net.Listener, error) {
addr := proxy.Addr{ return func(ctx context.Context, path string) (net.Listener, error) {
Namespace: f.Namespace.Name, addr := proxy.Addr{
PodName: podName, Namespace: f.Namespace.Name,
ContainerName: containerName, PodName: podName,
Port: port, ContainerName: containerName,
Port: port,
}
listener, err := proxy.Listen(ctx, f.ClientSet, f.ClientConfig(), addr)
if err != nil {
return nil, fmt.Errorf("listen for connections from %+v: %w", addr, err)
}
return listener, nil
} }
listener, err := proxy.Listen(ctx, f.ClientSet, f.ClientConfig(), addr)
framework.ExpectNoError(err, "listen for connections from %+v", addr)
return listener
} }
func (d *Driver) TearDown() { func (d *Driver) TearDown() {

View File

@ -61,10 +61,10 @@ RUNTIME_CONFIG="resource.k8s.io/v1alpha3" FEATURE_GATES=DynamicResourceAllocatio
In another: In another:
``` ```
sudo mkdir -p /var/run/cdi sudo mkdir -p /var/run/cdi
sudo mkdir -p /var/lib/kubelet/plugins sudo mkdir -p /var/lib/kubelet/plugins/test-driver.cdi.k8s.io
sudo mkdir -p /var/lib/kubelet/plugins_registry sudo mkdir -p /var/lib/kubelet/plugins_registry
sudo chmod a+rx /var/lib/kubelet /var/lib/kubelet/plugins sudo chmod a+rx /var/lib/kubelet /var/lib/kubelet/plugins
sudo chmod a+rwx /var/run/cdi /var/lib/kubelet/plugins_registry sudo chmod a+rwx /var/run/cdi /var/lib/kubelet/plugins_registry /var/lib/kubelet/plugins/test-driver.cdi.k8s.io
KUBECONFIG=/var/run/kubernetes/admin.kubeconfig go run ./test/e2e/dra/test-driver -v=5 kubelet-plugin --node-name=127.0.0.1 KUBECONFIG=/var/run/kubernetes/admin.kubeconfig go run ./test/e2e/dra/test-driver -v=5 kubelet-plugin --node-name=127.0.0.1
``` ```

View File

@ -178,9 +178,8 @@ func NewCommand() *cobra.Command {
} }
kubeletPluginFlagSets := cliflag.NamedFlagSets{} kubeletPluginFlagSets := cliflag.NamedFlagSets{}
fs = kubeletPluginFlagSets.FlagSet("kubelet") fs = kubeletPluginFlagSets.FlagSet("kubelet")
pluginRegistrationPath := fs.String("plugin-registration-path", "/var/lib/kubelet/plugins_registry", "The directory where kubelet looks for plugin registration sockets, in the filesystem of the driver.") kubeletRegistryDir := fs.String("plugin-registration-path", kubeletplugin.KubeletRegistryDir, "The directory where kubelet looks for plugin registration sockets.")
endpoint := fs.String("endpoint", "/var/lib/kubelet/plugins/test-driver/dra.sock", "The Unix domain socket where the driver will listen for kubelet requests, in the filesystem of the driver.") kubeletPluginsDir := fs.String("datadir", kubeletplugin.KubeletPluginsDir, "The per-driver directory where the DRA Unix domain socket will be created.")
draAddress := fs.String("dra-address", "/var/lib/kubelet/plugins/test-driver/dra.sock", "The Unix domain socket that kubelet will connect to for dynamic resource allocation requests, in the filesystem of kubelet.")
fs = kubeletPluginFlagSets.FlagSet("CDI") fs = kubeletPluginFlagSets.FlagSet("CDI")
cdiDir := fs.String("cdi-dir", "/var/run/cdi", "directory for dynamically created CDI JSON files") cdiDir := fs.String("cdi-dir", "/var/run/cdi", "directory for dynamically created CDI JSON files")
nodeName := fs.String("node-name", "", "name of the node that the kubelet plugin is responsible for") nodeName := fs.String("node-name", "", "name of the node that the kubelet plugin is responsible for")
@ -196,7 +195,8 @@ func NewCommand() *cobra.Command {
if err := os.MkdirAll(*cdiDir, os.FileMode(0750)); err != nil { if err := os.MkdirAll(*cdiDir, os.FileMode(0750)); err != nil {
return fmt.Errorf("create CDI directory: %w", err) return fmt.Errorf("create CDI directory: %w", err)
} }
if err := os.MkdirAll(filepath.Dir(*endpoint), 0750); err != nil { datadir := path.Join(*kubeletPluginsDir, *driverName)
if err := os.MkdirAll(filepath.Dir(datadir), 0750); err != nil {
return fmt.Errorf("create socket directory: %w", err) return fmt.Errorf("create socket directory: %w", err)
} }
@ -205,9 +205,8 @@ func NewCommand() *cobra.Command {
} }
plugin, err := StartPlugin(cmd.Context(), *cdiDir, *driverName, clientset, *nodeName, FileOperations{NumDevices: *numDevices}, plugin, err := StartPlugin(cmd.Context(), *cdiDir, *driverName, clientset, *nodeName, FileOperations{NumDevices: *numDevices},
kubeletplugin.PluginSocketPath(*endpoint), kubeletplugin.PluginDataDirectoryPath(datadir),
kubeletplugin.RegistrarSocketPath(path.Join(*pluginRegistrationPath, *driverName+"-reg.sock")), kubeletplugin.RegistrarDirectoryPath(*kubeletRegistryDir),
kubeletplugin.KubeletPluginSocketPath(*draAddress),
) )
if err != nil { if err != nil {
return fmt.Errorf("start example plugin: %w", err) return fmt.Errorf("start example plugin: %w", err)

View File

@ -50,29 +50,32 @@ spec:
image: registry.k8s.io/sig-storage/hostpathplugin:v1.7.3 image: registry.k8s.io/sig-storage/hostpathplugin:v1.7.3
args: args:
- "--v=5" - "--v=5"
- "--endpoint=/plugins_registry/dra-test-driver-reg.sock" - "--endpoint=/var/lib/kubelet/plugins_registry/test-driver.dra.k8s.io-reg.sock"
- "--proxy-endpoint=tcp://:9000" - "--proxy-endpoint=tcp://:9000"
volumeMounts: volumeMounts:
- mountPath: /plugins_registry - mountPath: /var/lib/kubelet/plugins_registry
name: registration-dir name: registration-dir
- name: plugin - name: plugin
image: registry.k8s.io/sig-storage/hostpathplugin:v1.7.3 image: registry.k8s.io/sig-storage/hostpathplugin:v1.7.3
args: args:
- "--v=5" - "--v=5"
- "--endpoint=/dra/dra-test-driver.sock" - "--endpoint=/var/lib/kubelet/plugins/test-driver.dra.k8s.io/dra.sock"
- "--proxy-endpoint=tcp://:9001" - "--proxy-endpoint=tcp://:9001"
securityContext: securityContext:
privileged: true privileged: true
volumeMounts: volumeMounts:
- mountPath: /dra - mountPath: /var/lib/kubelet/plugins/test-driver.dra.k8s.io
name: socket-dir name: socket-dir
- mountPath: /cdi - mountPath: /cdi
name: cdi-dir name: cdi-dir
volumes: volumes:
- hostPath: - hostPath:
path: /var/lib/kubelet/plugins # There's no way to have this directory removed again.
# Conceptually, it may have to survive Pod restarts, and there's no way
# to tell the kubelet that this isn't the case for this Pod.
path: /var/lib/kubelet/plugins/test-driver.dra.k8s.io
type: DirectoryOrCreate type: DirectoryOrCreate
name: socket-dir name: socket-dir
- hostPath: - hostPath:

View File

@ -29,7 +29,6 @@ import (
"fmt" "fmt"
"os" "os"
"path" "path"
"path/filepath"
"regexp" "regexp"
"sort" "sort"
"strings" "strings"
@ -62,8 +61,6 @@ const (
kubeletPlugin1Name = "test-driver1.cdi.k8s.io" kubeletPlugin1Name = "test-driver1.cdi.k8s.io"
kubeletPlugin2Name = "test-driver2.cdi.k8s.io" kubeletPlugin2Name = "test-driver2.cdi.k8s.io"
cdiDir = "/var/run/cdi" cdiDir = "/var/run/cdi"
endpointTemplate = "/var/lib/kubelet/plugins/%s/dra.sock"
pluginRegistrationPath = "/var/lib/kubelet/plugins_registry"
pluginRegistrationTimeout = time.Second * 60 // how long to wait for a node plugin to be registered pluginRegistrationTimeout = time.Second * 60 // how long to wait for a node plugin to be registered
podInPendingStateTimeout = time.Second * 60 // how long to wait for a pod to stay in pending state podInPendingStateTimeout = time.Second * 60 // how long to wait for a pod to stay in pending state
@ -554,9 +551,9 @@ func newKubeletPlugin(ctx context.Context, clientSet kubernetes.Interface, nodeN
// creating those directories. // creating those directories.
err := os.MkdirAll(cdiDir, os.FileMode(0750)) err := os.MkdirAll(cdiDir, os.FileMode(0750))
framework.ExpectNoError(err, "create CDI directory") framework.ExpectNoError(err, "create CDI directory")
endpoint := fmt.Sprintf(endpointTemplate, pluginName) datadir := path.Join(kubeletplugin.KubeletPluginsDir, pluginName) // This is the default data directory, so no option sets it below.
err = os.MkdirAll(filepath.Dir(endpoint), 0750) err = os.MkdirAll(datadir, 0750)
framework.ExpectNoError(err, "create socket directory") framework.ExpectNoError(err, "create DRA socket directory")
plugin, err := testdriver.StartPlugin( plugin, err := testdriver.StartPlugin(
ctx, ctx,
@ -565,9 +562,6 @@ func newKubeletPlugin(ctx context.Context, clientSet kubernetes.Interface, nodeN
clientSet, clientSet,
nodeName, nodeName,
testdriver.FileOperations{}, testdriver.FileOperations{},
kubeletplugin.PluginSocketPath(endpoint),
kubeletplugin.RegistrarSocketPath(path.Join(pluginRegistrationPath, pluginName+"-reg.sock")),
kubeletplugin.KubeletPluginSocketPath(endpoint),
) )
framework.ExpectNoError(err) framework.ExpectNoError(err)