From 1057407cee34683fc0cf1a9ccdaec00f3c9b1973 Mon Sep 17 00:00:00 2001 From: Patrick Ohly Date: Wed, 12 Mar 2025 09:49:19 +0100 Subject: [PATCH] DRA e2e: more flexible socket listening Instead of hard-coding two instances of the hostpathplugin which listen on certain socket paths, the hostpathplugin now gets started through Pod exec as needed. The advantage is that the helper code is in charge of socket naming, just like it would be in real deployment. One nuisance is that exec.StreamWithContext always complains in copyFromStdout and copyFromStderr when the remote hostpathplugin gets killed via context cancellation: E0312 11:56:31.637669 289446 v2.go:167] "Unhandled Error" err="next reader: read tcp [::1]:59006->[::1]:6444: use of closed network connection" logger="UnhandledError" E0312 11:56:31.637749 289446 v2.go:150] "Unhandled Error" err="next reader: read tcp [::1]:59006->[::1]:6444: use of closed network connection" logger="UnhandledError" These can be ignored. --- .../kubeletplugin/endpoint_test.go | 2 +- test/e2e/dra/deploy.go | 173 ++++++++++++++++-- .../dra/dra-test-driver-proxy.yaml | 34 ++-- 3 files changed, 170 insertions(+), 39 deletions(-) diff --git a/staging/src/k8s.io/dynamic-resource-allocation/kubeletplugin/endpoint_test.go b/staging/src/k8s.io/dynamic-resource-allocation/kubeletplugin/endpoint_test.go index 1a7cc8ec0df..77944f38f80 100644 --- a/staging/src/k8s.io/dynamic-resource-allocation/kubeletplugin/endpoint_test.go +++ b/staging/src/k8s.io/dynamic-resource-allocation/kubeletplugin/endpoint_test.go @@ -24,7 +24,7 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" - "k8s.io/kubernetes/test/utils/ktesting" + "k8s.io/klog/v2/ktesting" ) func TestEndpointLifecycle(t *testing.T) { diff --git a/test/e2e/dra/deploy.go b/test/e2e/dra/deploy.go index 6dad7033077..37fc5226e0e 100644 --- a/test/e2e/dra/deploy.go +++ b/test/e2e/dra/deploy.go @@ -22,11 +22,14 @@ import ( _ "embed" "errors" "fmt" + "io" "net" + 
"net/url" "path" "sort" "strings" "sync" + "sync/atomic" "time" "github.com/google/go-cmp/cmp" @@ -49,11 +52,13 @@ import ( "k8s.io/client-go/discovery/cached/memory" resourceapiinformer "k8s.io/client-go/informers/resource/v1beta1" "k8s.io/client-go/kubernetes" + "k8s.io/client-go/kubernetes/scheme" "k8s.io/client-go/rest" "k8s.io/client-go/restmapper" "k8s.io/client-go/tools/cache" "k8s.io/dynamic-resource-allocation/kubeletplugin" "k8s.io/klog/v2" + "k8s.io/kubectl/pkg/cmd/exec" "k8s.io/kubernetes/test/e2e/dra/test-driver/app" "k8s.io/kubernetes/test/e2e/framework" e2enode "k8s.io/kubernetes/test/e2e/framework/node" @@ -356,11 +361,9 @@ func (d *Driver) SetUp(nodes *Nodes, resources Resources, devicesPerNode ...map[ }, } item.Spec.Template.Spec.Volumes[0].HostPath.Path = pluginDataDirectoryPath - item.Spec.Template.Spec.Volumes[2].HostPath.Path = registrarDirectoryPath - item.Spec.Template.Spec.Containers[0].Args = append(item.Spec.Template.Spec.Containers[0].Args, "--endpoint="+path.Join(registrarDirectoryPath, registrarSocketFilename)) - item.Spec.Template.Spec.Containers[0].VolumeMounts[0].MountPath = registrarDirectoryPath - item.Spec.Template.Spec.Containers[1].Args = append(item.Spec.Template.Spec.Containers[1].Args, "--endpoint="+path.Join(pluginDataDirectoryPath, "dra.sock")) - item.Spec.Template.Spec.Containers[1].VolumeMounts[0].MountPath = pluginDataDirectoryPath + item.Spec.Template.Spec.Volumes[1].HostPath.Path = registrarDirectoryPath + item.Spec.Template.Spec.Containers[0].VolumeMounts[0].MountPath = pluginDataDirectoryPath + item.Spec.Template.Spec.Containers[0].VolumeMounts[1].MountPath = registrarDirectoryPath } return nil }, manifests...) @@ -423,6 +426,9 @@ func (d *Driver) SetUp(nodes *Nodes, resources Resources, devicesPerNode ...map[ } else { fileOps.NumDevices = numDevices } + // All listeners running in this pod use a new unique local port number + // by atomically incrementing this variable. 
+ listenerPort := int32(9000) plugin, err := app.StartPlugin(loggerCtx, "/cdi", d.Name, driverClient, nodename, fileOps, kubeletplugin.GRPCVerbosity(0), kubeletplugin.GRPCInterceptor(func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (resp interface{}, err error) { @@ -432,15 +438,12 @@ func (d *Driver) SetUp(nodes *Nodes, resources Resources, devicesPerNode ...map[ return d.streamInterceptor(nodename, srv, ss, info, handler) }), - // TODO: start socat on-demand in listen. Then we can properly test - // the socket path handling in kubeletplugin and do rolling updates. - kubeletplugin.PluginDataDirectoryPath(pluginDataDirectoryPath), - kubeletplugin.PluginListener(listen(d.f, pod.Name, "plugin", 9001)), + kubeletplugin.PluginListener(listen(d.f, &pod, &listenerPort)), kubeletplugin.RegistrarDirectoryPath(registrarDirectoryPath), kubeletplugin.RegistrarSocketFilename(registrarSocketFilename), - kubeletplugin.RegistrarListener(listen(d.f, pod.Name, "registrar", 9000)), + kubeletplugin.RegistrarListener(listen(d.f, &pod, &listenerPort)), ) framework.ExpectNoError(err, "start kubelet plugin for node %s", pod.Spec.NodeName) d.cleanup = append(d.cleanup, func() { @@ -557,27 +560,163 @@ func (d *Driver) podIO(pod *v1.Pod) proxy.PodDirIO { F: d.f, Namespace: pod.Namespace, PodName: pod.Name, - ContainerName: "plugin", + ContainerName: pod.Spec.Containers[0].Name, Logger: &logger, } } -func listen(f *framework.Framework, podName, containerName string, port int) func(ctx context.Context, path string) (net.Listener, error) { - return func(ctx context.Context, path string) (net.Listener, error) { +// errListenerDone is the special error that we use to shut down. +// It doesn't need to be logged. +var errListenerDone = errors.New("listener is shutting down") + +// listen returns the function which the kubeletplugin helper needs to open a listening socket. 
+// For that it spins up hostpathplugin in the pod for the desired node
+// and connects to hostpathplugin via port forwarding.
+func listen(f *framework.Framework, pod *v1.Pod, port *int32) func(ctx context.Context, path string) (net.Listener, error) {
+	return func(ctx context.Context, path string) (l net.Listener, e error) {
+		// "Allocate" a new port by bumping the per-pod counter by one.
+		port := atomic.AddInt32(port, 1)
+
+		logger := klog.FromContext(ctx)
+		logger = klog.LoggerWithName(logger, "socket-listener")
+		logger = klog.LoggerWithValues(logger, "endpoint", path, "port", port)
+		ctx = klog.NewContext(ctx, logger)
+
+		// Start hostpathplugin in proxy mode and keep it running until the listener gets closed.
+		req := f.ClientSet.CoreV1().RESTClient().Post().
+			Resource("pods").
+			Namespace(f.Namespace.Name).
+			Name(pod.Name).
+			SubResource("exec").
+			VersionedParams(&v1.PodExecOptions{
+				Container: pod.Spec.Containers[0].Name,
+				Command: []string{
+					"/hostpathplugin",
+					"--v=5",
+					"--endpoint=" + path,
+					fmt.Sprintf("--proxy-endpoint=tcp://:%d", port),
+				},
+				Stdout: true,
+				Stderr: true,
+			}, scheme.ParameterCodec)
+		var wg sync.WaitGroup
+		wg.Add(1)
+		cmdCtx, cmdCancel := context.WithCancelCause(ctx)
+		go func() {
+			defer wg.Done()
+			cmdLogger := klog.LoggerWithName(logger, "hostpathplugin")
+			cmdCtx := klog.NewContext(cmdCtx, cmdLogger)
+			logger.V(1).Info("Starting...")
+			defer logger.V(1).Info("Stopped")
+			if err := execute(cmdCtx, req.URL(), f.ClientConfig(), 5); err != nil {
+				// errors.Is(err, errListenerDone) would be nicer, but we don't get
+				// that error from remotecommand. Instead forgo logging when we already shut down.
+				if cmdCtx.Err() == nil {
+					logger.Error(err, "execution failed")
+				}
+			}
+
+			// Killing hostpathplugin does not remove the socket. Need to do that manually.
+			req := f.ClientSet.CoreV1().RESTClient().Post().
+				Resource("pods").
+				Namespace(f.Namespace.Name).
+				Name(pod.Name).
+				SubResource("exec").
+ VersionedParams(&v1.PodExecOptions{ + Container: pod.Spec.Containers[0].Name, + Command: []string{ + "rm", + "-f", + path, + }, + Stdout: true, + Stderr: true, + }, scheme.ParameterCodec) + cleanupLogger := klog.LoggerWithName(logger, "cleanup") + cleanupCtx := klog.NewContext(ctx, cleanupLogger) + if err := execute(cleanupCtx, req.URL(), f.ClientConfig(), 0); err != nil { + cleanupLogger.Error(err, "Socket removal failed") + } + }() + defer func() { + // If we don't return a functional listener, then clean up. + if e != nil { + cmdCancel(e) + } + }() + stopHostpathplugin := func() { + cmdCancel(errListenerDone) + wg.Wait() + } + addr := proxy.Addr{ Namespace: f.Namespace.Name, - PodName: podName, - ContainerName: containerName, - Port: port, + PodName: pod.Name, + ContainerName: pod.Spec.Containers[0].Name, + Port: int(port), } listener, err := proxy.Listen(ctx, f.ClientSet, f.ClientConfig(), addr) if err != nil { return nil, fmt.Errorf("listen for connections from %+v: %w", addr, err) } - return listener, nil + return &listenerWithClose{Listener: listener, close: stopHostpathplugin}, nil } } +// listenerWithClose wraps Close so that it also shuts down hostpathplugin. +type listenerWithClose struct { + net.Listener + close func() +} + +func (l *listenerWithClose) Close() error { + // First close connections, then shut down the remote command. + // Otherwise the connection code is unhappy and logs errors. + err := l.Listener.Close() + l.close() + return err +} + +// execute runs a remote command with stdout/stderr redirected to log messages at the chosen verbosity level. +func execute(ctx context.Context, url *url.URL, config *rest.Config, verbosity int) error { + // Stream output as long as we run, i.e. ignore cancellation. 
+ stdout := pipe(context.WithoutCancel(ctx), "STDOUT", verbosity) + stderr := pipe(context.WithoutCancel(ctx), "STDERR", verbosity) + defer func() { _ = stdout.Close() }() + defer func() { _ = stderr.Close() }() + + executor := exec.DefaultRemoteExecutor{} + return executor.ExecuteWithContext(ctx, url, config, nil, stdout, stderr, false, nil) +} + +// pipe creates an in-memory pipe and starts logging whatever is sent through that pipe in the background. +func pipe(ctx context.Context, msg string, verbosity int) *io.PipeWriter { + logger := klog.FromContext(ctx) + + reader, writer := io.Pipe() + go func() { + buffer := make([]byte, 10*1024) + for { + n, err := reader.Read(buffer) + if n > 0 { + logger.V(verbosity).Info(msg, "msg", string(buffer[0:n])) + } + if err != nil { + if !errors.Is(err, io.EOF) { + logger.Error(err, msg) + } + reader.CloseWithError(err) + return + } + if ctx.Err() != nil { + reader.CloseWithError(context.Cause(ctx)) + return + } + } + }() + return writer +} + func (d *Driver) TearDown() { for _, c := range d.cleanup { c() diff --git a/test/e2e/testing-manifests/dra/dra-test-driver-proxy.yaml b/test/e2e/testing-manifests/dra/dra-test-driver-proxy.yaml index 0e7c508bb48..ed32a26acac 100644 --- a/test/e2e/testing-manifests/dra/dra-test-driver-proxy.yaml +++ b/test/e2e/testing-manifests/dra/dra-test-driver-proxy.yaml @@ -46,29 +46,21 @@ spec: topologyKey: kubernetes.io/hostname containers: - - name: registrar + - name: pause image: registry.k8s.io/sig-storage/hostpathplugin:v1.7.3 - args: - - "--v=5" - - "--endpoint=/var/lib/kubelet/plugins_registry/test-driver.dra.k8s.io-reg.sock" - - "--proxy-endpoint=tcp://:9000" - volumeMounts: - - mountPath: /var/lib/kubelet/plugins_registry - name: registration-dir - - - name: plugin - image: registry.k8s.io/sig-storage/hostpathplugin:v1.7.3 - args: - - "--v=5" - - "--endpoint=/var/lib/kubelet/plugins/test-driver.dra.k8s.io/dra.sock" - - "--proxy-endpoint=tcp://:9001" - securityContext: - privileged: true + 
command: + - /bin/sh + - -c + - while true; do sleep 10000; done volumeMounts: - mountPath: /var/lib/kubelet/plugins/test-driver.dra.k8s.io name: socket-dir + - mountPath: /var/lib/kubelet/plugins_registry + name: registration-dir - mountPath: /cdi name: cdi-dir + securityContext: + privileged: true volumes: - hostPath: @@ -78,11 +70,11 @@ spec: path: /var/lib/kubelet/plugins/test-driver.dra.k8s.io type: DirectoryOrCreate name: socket-dir - - hostPath: - path: /var/run/cdi - type: DirectoryOrCreate - name: cdi-dir - hostPath: path: /var/lib/kubelet/plugins_registry type: DirectoryOrCreate name: registration-dir + - hostPath: + path: /var/run/cdi + type: DirectoryOrCreate + name: cdi-dir