diff --git a/staging/src/k8s.io/dynamic-resource-allocation/kubeletplugin/endpoint_test.go b/staging/src/k8s.io/dynamic-resource-allocation/kubeletplugin/endpoint_test.go index 1a7cc8ec0df..77944f38f80 100644 --- a/staging/src/k8s.io/dynamic-resource-allocation/kubeletplugin/endpoint_test.go +++ b/staging/src/k8s.io/dynamic-resource-allocation/kubeletplugin/endpoint_test.go @@ -24,7 +24,7 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" - "k8s.io/kubernetes/test/utils/ktesting" + "k8s.io/klog/v2/ktesting" ) func TestEndpointLifecycle(t *testing.T) { diff --git a/test/e2e/dra/deploy.go b/test/e2e/dra/deploy.go index 6dad7033077..37fc5226e0e 100644 --- a/test/e2e/dra/deploy.go +++ b/test/e2e/dra/deploy.go @@ -22,11 +22,14 @@ import ( _ "embed" "errors" "fmt" + "io" "net" + "net/url" "path" "sort" "strings" "sync" + "sync/atomic" "time" "github.com/google/go-cmp/cmp" @@ -49,11 +52,13 @@ import ( "k8s.io/client-go/discovery/cached/memory" resourceapiinformer "k8s.io/client-go/informers/resource/v1beta1" "k8s.io/client-go/kubernetes" + "k8s.io/client-go/kubernetes/scheme" "k8s.io/client-go/rest" "k8s.io/client-go/restmapper" "k8s.io/client-go/tools/cache" "k8s.io/dynamic-resource-allocation/kubeletplugin" "k8s.io/klog/v2" + "k8s.io/kubectl/pkg/cmd/exec" "k8s.io/kubernetes/test/e2e/dra/test-driver/app" "k8s.io/kubernetes/test/e2e/framework" e2enode "k8s.io/kubernetes/test/e2e/framework/node" @@ -356,11 +361,9 @@ func (d *Driver) SetUp(nodes *Nodes, resources Resources, devicesPerNode ...map[ }, } item.Spec.Template.Spec.Volumes[0].HostPath.Path = pluginDataDirectoryPath - item.Spec.Template.Spec.Volumes[2].HostPath.Path = registrarDirectoryPath - item.Spec.Template.Spec.Containers[0].Args = append(item.Spec.Template.Spec.Containers[0].Args, "--endpoint="+path.Join(registrarDirectoryPath, registrarSocketFilename)) - item.Spec.Template.Spec.Containers[0].VolumeMounts[0].MountPath = registrarDirectoryPath - 
item.Spec.Template.Spec.Containers[1].Args = append(item.Spec.Template.Spec.Containers[1].Args, "--endpoint="+path.Join(pluginDataDirectoryPath, "dra.sock")) - item.Spec.Template.Spec.Containers[1].VolumeMounts[0].MountPath = pluginDataDirectoryPath + item.Spec.Template.Spec.Volumes[1].HostPath.Path = registrarDirectoryPath + item.Spec.Template.Spec.Containers[0].VolumeMounts[0].MountPath = pluginDataDirectoryPath + item.Spec.Template.Spec.Containers[0].VolumeMounts[1].MountPath = registrarDirectoryPath } return nil }, manifests...) @@ -423,6 +426,9 @@ func (d *Driver) SetUp(nodes *Nodes, resources Resources, devicesPerNode ...map[ } else { fileOps.NumDevices = numDevices } + // All listeners running in this pod use a new unique local port number + // by atomically incrementing this variable. + listenerPort := int32(9000) plugin, err := app.StartPlugin(loggerCtx, "/cdi", d.Name, driverClient, nodename, fileOps, kubeletplugin.GRPCVerbosity(0), kubeletplugin.GRPCInterceptor(func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (resp interface{}, err error) { @@ -432,15 +438,12 @@ func (d *Driver) SetUp(nodes *Nodes, resources Resources, devicesPerNode ...map[ return d.streamInterceptor(nodename, srv, ss, info, handler) }), - // TODO: start socat on-demand in listen. Then we can properly test - // the socket path handling in kubeletplugin and do rolling updates. 
- kubeletplugin.PluginDataDirectoryPath(pluginDataDirectoryPath), - kubeletplugin.PluginListener(listen(d.f, pod.Name, "plugin", 9001)), + kubeletplugin.PluginListener(listen(d.f, &pod, &listenerPort)), kubeletplugin.RegistrarDirectoryPath(registrarDirectoryPath), kubeletplugin.RegistrarSocketFilename(registrarSocketFilename), - kubeletplugin.RegistrarListener(listen(d.f, pod.Name, "registrar", 9000)), + kubeletplugin.RegistrarListener(listen(d.f, &pod, &listenerPort)), ) framework.ExpectNoError(err, "start kubelet plugin for node %s", pod.Spec.NodeName) d.cleanup = append(d.cleanup, func() { @@ -557,27 +560,163 @@ func (d *Driver) podIO(pod *v1.Pod) proxy.PodDirIO { F: d.f, Namespace: pod.Namespace, PodName: pod.Name, - ContainerName: "plugin", + ContainerName: pod.Spec.Containers[0].Name, Logger: &logger, } } -func listen(f *framework.Framework, podName, containerName string, port int) func(ctx context.Context, path string) (net.Listener, error) { - return func(ctx context.Context, path string) (net.Listener, error) { +// errListenerDone is the special error that we use to shut down. +// It doesn't need to be logged. +var errListenerDone = errors.New("listener is shutting down") + +// listen returns the function which the kubeletplugin helper needs to open a listening socket. +// For that it spins up hostpathplugin in the pod for the desired node +// and connects to hostpathplugin via port forwarding. +func listen(f *framework.Framework, pod *v1.Pod, port *int32) func(ctx context.Context, path string) (net.Listener, error) { + return func(ctx context.Context, path string) (l net.Listener, e error) { + // "Allocate" a new port by bumping the per-pod counter by one. 
+ port := atomic.AddInt32(port, 1) + + logger := klog.FromContext(ctx) + logger = klog.LoggerWithName(logger, "socket-listener") + logger = klog.LoggerWithValues(logger, "endpoint", path, "port", port) + ctx = klog.NewContext(ctx, logger) + + // Start hostpathplugin in proxy mode and keep it running until the listener gets closed. + req := f.ClientSet.CoreV1().RESTClient().Post(). + Resource("pods"). + Namespace(f.Namespace.Name). + Name(pod.Name). + SubResource("exec"). + VersionedParams(&v1.PodExecOptions{ + Container: pod.Spec.Containers[0].Name, + Command: []string{ + "/hostpathplugin", + "--v=5", + "--endpoint=" + path, + fmt.Sprintf("--proxy-endpoint=tcp://:%d", port), + }, + Stdout: true, + Stderr: true, + }, scheme.ParameterCodec) + var wg sync.WaitGroup + wg.Add(1) + cmdCtx, cmdCancel := context.WithCancelCause(ctx) + go func() { + defer wg.Done() + cmdLogger := klog.LoggerWithName(logger, "hostpathplugin") + cmdCtx := klog.NewContext(cmdCtx, cmdLogger) + logger.V(1).Info("Starting...") + defer logger.V(1).Info("Stopped") + if err := execute(cmdCtx, req.URL(), f.ClientConfig(), 5); err != nil { + // errors.Is(err, errListenerDone) would be nicer, but we don't get + // that error from remotecommand. Instead forgo logging when we already shut down. + if cmdCtx.Err() == nil { + logger.Error(err, "execution failed") + } + } + + // Killing hostpathplugin does not remove the socket. Need to do that manually. + req := f.ClientSet.CoreV1().RESTClient().Post(). + Resource("pods"). + Namespace(f.Namespace.Name). + Name(pod.Name). + SubResource("exec"). 
+ VersionedParams(&v1.PodExecOptions{ + Container: pod.Spec.Containers[0].Name, + Command: []string{ + "rm", + "-f", + path, + }, + Stdout: true, + Stderr: true, + }, scheme.ParameterCodec) + cleanupLogger := klog.LoggerWithName(logger, "cleanup") + cleanupCtx := klog.NewContext(ctx, cleanupLogger) + if err := execute(cleanupCtx, req.URL(), f.ClientConfig(), 0); err != nil { + cleanupLogger.Error(err, "Socket removal failed") + } + }() + defer func() { + // If we don't return a functional listener, then clean up. + if e != nil { + cmdCancel(e) + } + }() + stopHostpathplugin := func() { + cmdCancel(errListenerDone) + wg.Wait() + } + addr := proxy.Addr{ Namespace: f.Namespace.Name, - PodName: podName, - ContainerName: containerName, - Port: port, + PodName: pod.Name, + ContainerName: pod.Spec.Containers[0].Name, + Port: int(port), } listener, err := proxy.Listen(ctx, f.ClientSet, f.ClientConfig(), addr) if err != nil { return nil, fmt.Errorf("listen for connections from %+v: %w", addr, err) } - return listener, nil + return &listenerWithClose{Listener: listener, close: stopHostpathplugin}, nil } } +// listenerWithClose wraps Close so that it also shuts down hostpathplugin. +type listenerWithClose struct { + net.Listener + close func() +} + +func (l *listenerWithClose) Close() error { + // First close connections, then shut down the remote command. + // Otherwise the connection code is unhappy and logs errors. + err := l.Listener.Close() + l.close() + return err +} + +// execute runs a remote command with stdout/stderr redirected to log messages at the chosen verbosity level. +func execute(ctx context.Context, url *url.URL, config *rest.Config, verbosity int) error { + // Stream output as long as we run, i.e. ignore cancellation. 
+ stdout := pipe(context.WithoutCancel(ctx), "STDOUT", verbosity) + stderr := pipe(context.WithoutCancel(ctx), "STDERR", verbosity) + defer func() { _ = stdout.Close() }() + defer func() { _ = stderr.Close() }() + + executor := exec.DefaultRemoteExecutor{} + return executor.ExecuteWithContext(ctx, url, config, nil, stdout, stderr, false, nil) +} + +// pipe creates an in-memory pipe and starts logging whatever is sent through that pipe in the background. +func pipe(ctx context.Context, msg string, verbosity int) *io.PipeWriter { + logger := klog.FromContext(ctx) + + reader, writer := io.Pipe() + go func() { + buffer := make([]byte, 10*1024) + for { + n, err := reader.Read(buffer) + if n > 0 { + logger.V(verbosity).Info(msg, "msg", string(buffer[0:n])) + } + if err != nil { + if !errors.Is(err, io.EOF) { + logger.Error(err, msg) + } + reader.CloseWithError(err) + return + } + if ctx.Err() != nil { + reader.CloseWithError(context.Cause(ctx)) + return + } + } + }() + return writer +} + func (d *Driver) TearDown() { for _, c := range d.cleanup { c() diff --git a/test/e2e/testing-manifests/dra/dra-test-driver-proxy.yaml b/test/e2e/testing-manifests/dra/dra-test-driver-proxy.yaml index 0e7c508bb48..ed32a26acac 100644 --- a/test/e2e/testing-manifests/dra/dra-test-driver-proxy.yaml +++ b/test/e2e/testing-manifests/dra/dra-test-driver-proxy.yaml @@ -46,29 +46,21 @@ spec: topologyKey: kubernetes.io/hostname containers: - - name: registrar + - name: pause image: registry.k8s.io/sig-storage/hostpathplugin:v1.7.3 - args: - - "--v=5" - - "--endpoint=/var/lib/kubelet/plugins_registry/test-driver.dra.k8s.io-reg.sock" - - "--proxy-endpoint=tcp://:9000" - volumeMounts: - - mountPath: /var/lib/kubelet/plugins_registry - name: registration-dir - - - name: plugin - image: registry.k8s.io/sig-storage/hostpathplugin:v1.7.3 - args: - - "--v=5" - - "--endpoint=/var/lib/kubelet/plugins/test-driver.dra.k8s.io/dra.sock" - - "--proxy-endpoint=tcp://:9001" - securityContext: - privileged: true + 
command: + - /bin/sh + - -c + - while true; do sleep 10000; done volumeMounts: - mountPath: /var/lib/kubelet/plugins/test-driver.dra.k8s.io name: socket-dir + - mountPath: /var/lib/kubelet/plugins_registry + name: registration-dir - mountPath: /cdi name: cdi-dir + securityContext: + privileged: true volumes: - hostPath: @@ -78,11 +70,11 @@ spec: path: /var/lib/kubelet/plugins/test-driver.dra.k8s.io type: DirectoryOrCreate name: socket-dir - - hostPath: - path: /var/run/cdi - type: DirectoryOrCreate - name: cdi-dir - hostPath: path: /var/lib/kubelet/plugins_registry type: DirectoryOrCreate name: registration-dir + - hostPath: + path: /var/run/cdi + type: DirectoryOrCreate + name: cdi-dir