DRA e2e: more flexible socket listening

Instead of hard-coding two instances of hostpathplugin that listen on fixed
socket paths, hostpathplugin now gets started through pod exec as needed. The
advantage is that the helper code is in charge of socket naming, just as it
would be in a real deployment.
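
The contract between the helper and the test is small: the helper hands the
hook the socket path it wants, the hook returns a net.Listener for it. As a
point of reference, a minimal local implementation of the same hook signature
(hypothetical localListen, illustration only; imports: context, net — the real
listen helper in the diff below proxies into the driver pod instead):

    // localListen is a hypothetical stand-in for the listen helper below:
    // same signature, but it opens a Unix-domain socket in-process
    // instead of running hostpathplugin in a pod.
    func localListen() func(ctx context.Context, path string) (net.Listener, error) {
        return func(ctx context.Context, path string) (net.Listener, error) {
            // The kubeletplugin helper decides on the socket path;
            // the hook only has to open a listener for it.
            var lc net.ListenConfig
            return lc.Listen(ctx, "unix", path)
        }
    }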

One nuisance is that exec.StreamWithContext always complains in copyFromStdout
and copyFromStderr when the remote hostpathplugin gets killed via context
cancellation:

    E0312 11:56:31.637669  289446 v2.go:167] "Unhandled Error" err="next reader: read tcp [::1]:59006->[::1]:6444: use of closed network connection" logger="UnhandledError"
    E0312 11:56:31.637749  289446 v2.go:150] "Unhandled Error" err="next reader: read tcp [::1]:59006->[::1]:6444: use of closed network connection" logger="UnhandledError"

These can be ignored.
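
This is ordinary net.ErrClosed behavior in Go rather than anything specific to
remotecommand; a self-contained illustration (not from the commit):

    package main

    import (
        "errors"
        "fmt"
        "net"
    )

    func main() {
        l, err := net.Listen("tcp", "127.0.0.1:0")
        if err != nil {
            panic(err)
        }
        defer l.Close()

        conn, err := net.Dial("tcp", l.Addr().String())
        if err != nil {
            panic(err)
        }
        // Close the connection first, as if teardown raced ahead of the reader.
        conn.Close()

        // The pending read then fails with net.ErrClosed, which prints as
        // "read tcp ...: use of closed network connection".
        _, err = conn.Read(make([]byte, 1))
        fmt.Println(errors.Is(err, net.ErrClosed), err)
    }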
Patrick Ohly, 2025-03-12 09:49:19 +01:00
commit 1057407cee (parent ec12727957)
3 changed files with 170 additions and 39 deletions


@@ -24,7 +24,7 @@ import (
 	"github.com/stretchr/testify/assert"
 	"github.com/stretchr/testify/require"
-	"k8s.io/kubernetes/test/utils/ktesting"
+	"k8s.io/klog/v2/ktesting"
 )

 func TestEndpointLifecycle(t *testing.T) {


@@ -22,11 +22,14 @@ import (
 	_ "embed"
 	"errors"
 	"fmt"
+	"io"
 	"net"
+	"net/url"
 	"path"
 	"sort"
 	"strings"
 	"sync"
+	"sync/atomic"
 	"time"

 	"github.com/google/go-cmp/cmp"
@@ -49,11 +52,13 @@ import (
 	"k8s.io/client-go/discovery/cached/memory"
 	resourceapiinformer "k8s.io/client-go/informers/resource/v1beta1"
 	"k8s.io/client-go/kubernetes"
+	"k8s.io/client-go/kubernetes/scheme"
 	"k8s.io/client-go/rest"
 	"k8s.io/client-go/restmapper"
 	"k8s.io/client-go/tools/cache"
 	"k8s.io/dynamic-resource-allocation/kubeletplugin"
 	"k8s.io/klog/v2"
+	"k8s.io/kubectl/pkg/cmd/exec"
 	"k8s.io/kubernetes/test/e2e/dra/test-driver/app"
 	"k8s.io/kubernetes/test/e2e/framework"
 	e2enode "k8s.io/kubernetes/test/e2e/framework/node"
@@ -356,11 +361,9 @@ func (d *Driver) SetUp(nodes *Nodes, resources Resources, devicesPerNode ...map[
 				},
 			}
 			item.Spec.Template.Spec.Volumes[0].HostPath.Path = pluginDataDirectoryPath
-			item.Spec.Template.Spec.Volumes[2].HostPath.Path = registrarDirectoryPath
-			item.Spec.Template.Spec.Containers[0].Args = append(item.Spec.Template.Spec.Containers[0].Args, "--endpoint="+path.Join(registrarDirectoryPath, registrarSocketFilename))
-			item.Spec.Template.Spec.Containers[0].VolumeMounts[0].MountPath = registrarDirectoryPath
-			item.Spec.Template.Spec.Containers[1].Args = append(item.Spec.Template.Spec.Containers[1].Args, "--endpoint="+path.Join(pluginDataDirectoryPath, "dra.sock"))
-			item.Spec.Template.Spec.Containers[1].VolumeMounts[0].MountPath = pluginDataDirectoryPath
+			item.Spec.Template.Spec.Volumes[1].HostPath.Path = registrarDirectoryPath
+			item.Spec.Template.Spec.Containers[0].VolumeMounts[0].MountPath = pluginDataDirectoryPath
+			item.Spec.Template.Spec.Containers[0].VolumeMounts[1].MountPath = registrarDirectoryPath
 		}
 		return nil
 	}, manifests...)
@@ -423,6 +426,9 @@ func (d *Driver) SetUp(nodes *Nodes, resources Resources, devicesPerNode ...map[
 	} else {
 		fileOps.NumDevices = numDevices
 	}
+	// All listeners running in this pod use a new unique local port number
+	// by atomically incrementing this variable.
+	listenerPort := int32(9000)
 	plugin, err := app.StartPlugin(loggerCtx, "/cdi", d.Name, driverClient, nodename, fileOps,
 		kubeletplugin.GRPCVerbosity(0),
 		kubeletplugin.GRPCInterceptor(func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (resp interface{}, err error) {
@@ -432,15 +438,12 @@ func (d *Driver) SetUp(nodes *Nodes, resources Resources, devicesPerNode ...map[
 			return d.streamInterceptor(nodename, srv, ss, info, handler)
 		}),
-		// TODO: start socat on-demand in listen. Then we can properly test
-		// the socket path handling in kubeletplugin and do rolling updates.
 		kubeletplugin.PluginDataDirectoryPath(pluginDataDirectoryPath),
-		kubeletplugin.PluginListener(listen(d.f, pod.Name, "plugin", 9001)),
+		kubeletplugin.PluginListener(listen(d.f, &pod, &listenerPort)),
 		kubeletplugin.RegistrarDirectoryPath(registrarDirectoryPath),
 		kubeletplugin.RegistrarSocketFilename(registrarSocketFilename),
-		kubeletplugin.RegistrarListener(listen(d.f, pod.Name, "registrar", 9000)),
+		kubeletplugin.RegistrarListener(listen(d.f, &pod, &listenerPort)),
 	)
 	framework.ExpectNoError(err, "start kubelet plugin for node %s", pod.Spec.NodeName)
 	d.cleanup = append(d.cleanup, func() {
@@ -557,27 +560,163 @@ func (d *Driver) podIO(pod *v1.Pod) proxy.PodDirIO {
 		F:             d.f,
 		Namespace:     pod.Namespace,
 		PodName:       pod.Name,
-		ContainerName: "plugin",
+		ContainerName: pod.Spec.Containers[0].Name,
 		Logger:        &logger,
 	}
 }

-func listen(f *framework.Framework, podName, containerName string, port int) func(ctx context.Context, path string) (net.Listener, error) {
-	return func(ctx context.Context, path string) (net.Listener, error) {
+// errListenerDone is the special error that we use to shut down.
+// It doesn't need to be logged.
+var errListenerDone = errors.New("listener is shutting down")
+
+// listen returns the function that the kubeletplugin helper calls to open a listening socket.
+// To provide one, it spins up hostpathplugin in the pod for the desired node
+// and connects to that hostpathplugin via port forwarding.
+func listen(f *framework.Framework, pod *v1.Pod, port *int32) func(ctx context.Context, path string) (net.Listener, error) {
+	return func(ctx context.Context, path string) (l net.Listener, e error) {
+		// "Allocate" a new port by bumping the per-pod counter by one.
+		port := atomic.AddInt32(port, 1)
+
+		logger := klog.FromContext(ctx)
+		logger = klog.LoggerWithName(logger, "socket-listener")
+		logger = klog.LoggerWithValues(logger, "endpoint", path, "port", port)
+		ctx = klog.NewContext(ctx, logger)
+
+		// Start hostpathplugin in proxy mode and keep it running until the listener gets closed.
+		req := f.ClientSet.CoreV1().RESTClient().Post().
+			Resource("pods").
+			Namespace(f.Namespace.Name).
+			Name(pod.Name).
+			SubResource("exec").
+			VersionedParams(&v1.PodExecOptions{
+				Container: pod.Spec.Containers[0].Name,
+				Command: []string{
+					"/hostpathplugin",
+					"--v=5",
+					"--endpoint=" + path,
+					fmt.Sprintf("--proxy-endpoint=tcp://:%d", port),
+				},
+				Stdout: true,
+				Stderr: true,
+			}, scheme.ParameterCodec)
+		var wg sync.WaitGroup
+		wg.Add(1)
+		cmdCtx, cmdCancel := context.WithCancelCause(ctx)
+		go func() {
+			defer wg.Done()
+			cmdLogger := klog.LoggerWithName(logger, "hostpathplugin")
+			cmdCtx := klog.NewContext(cmdCtx, cmdLogger)
+			logger.V(1).Info("Starting...")
+			defer logger.V(1).Info("Stopped")
+			if err := execute(cmdCtx, req.URL(), f.ClientConfig(), 5); err != nil {
+				// errors.Is(err, errListenerDone) would be nicer, but we don't get
+				// that error from remotecommand. Instead, forgo logging when we already shut down.
+				if cmdCtx.Err() == nil {
+					logger.Error(err, "execution failed")
+				}
+			}
+
+			// Killing hostpathplugin does not remove the socket. Need to do that manually.
+			req := f.ClientSet.CoreV1().RESTClient().Post().
+				Resource("pods").
+				Namespace(f.Namespace.Name).
+				Name(pod.Name).
+				SubResource("exec").
+				VersionedParams(&v1.PodExecOptions{
+					Container: pod.Spec.Containers[0].Name,
+					Command: []string{
+						"rm",
+						"-f",
+						path,
+					},
+					Stdout: true,
+					Stderr: true,
+				}, scheme.ParameterCodec)
+			cleanupLogger := klog.LoggerWithName(logger, "cleanup")
+			cleanupCtx := klog.NewContext(ctx, cleanupLogger)
+			if err := execute(cleanupCtx, req.URL(), f.ClientConfig(), 0); err != nil {
+				cleanupLogger.Error(err, "Socket removal failed")
+			}
+		}()
+
+		defer func() {
+			// If we don't return a functional listener, then clean up.
+			if e != nil {
+				cmdCancel(e)
+			}
+		}()
+
+		stopHostpathplugin := func() {
+			cmdCancel(errListenerDone)
+			wg.Wait()
+		}
+
 		addr := proxy.Addr{
 			Namespace:     f.Namespace.Name,
-			PodName:       podName,
-			ContainerName: containerName,
-			Port:          port,
+			PodName:       pod.Name,
+			ContainerName: pod.Spec.Containers[0].Name,
+			Port:          int(port),
 		}
 		listener, err := proxy.Listen(ctx, f.ClientSet, f.ClientConfig(), addr)
 		if err != nil {
 			return nil, fmt.Errorf("listen for connections from %+v: %w", addr, err)
 		}
-		return listener, nil
+		return &listenerWithClose{Listener: listener, close: stopHostpathplugin}, nil
 	}
 }
+
+// listenerWithClose wraps Close so that it also shuts down hostpathplugin.
+type listenerWithClose struct {
+	net.Listener
+	close func()
+}
+
+func (l *listenerWithClose) Close() error {
+	// First close connections, then shut down the remote command.
+	// Otherwise the connection code is unhappy and logs errors.
+	err := l.Listener.Close()
+	l.close()
+	return err
+}
+
+// execute runs a remote command with stdout/stderr redirected to log messages at the chosen verbosity level.
+func execute(ctx context.Context, url *url.URL, config *rest.Config, verbosity int) error {
+	// Stream output as long as we run, i.e. ignore cancellation.
+	stdout := pipe(context.WithoutCancel(ctx), "STDOUT", verbosity)
+	stderr := pipe(context.WithoutCancel(ctx), "STDERR", verbosity)
+	defer func() { _ = stdout.Close() }()
+	defer func() { _ = stderr.Close() }()
+	executor := exec.DefaultRemoteExecutor{}
+	return executor.ExecuteWithContext(ctx, url, config, nil, stdout, stderr, false, nil)
+}
+
+// pipe creates an in-memory pipe and starts logging whatever is sent through that pipe in the background.
+func pipe(ctx context.Context, msg string, verbosity int) *io.PipeWriter {
+	logger := klog.FromContext(ctx)
+	reader, writer := io.Pipe()
+	go func() {
+		buffer := make([]byte, 10*1024)
+		for {
+			n, err := reader.Read(buffer)
+			if n > 0 {
+				logger.V(verbosity).Info(msg, "msg", string(buffer[0:n]))
+			}
+			if err != nil {
+				if !errors.Is(err, io.EOF) {
+					logger.Error(err, msg)
+				}
+				reader.CloseWithError(err)
+				return
+			}
+			if ctx.Err() != nil {
+				reader.CloseWithError(context.Cause(ctx))
+				return
+			}
+		}
+	}()
+	return writer
+}
+
 func (d *Driver) TearDown() {
 	for _, c := range d.cleanup {
 		c()
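
Aside: the pipe helper above is a generally useful pattern for turning anything
that expects an io.Writer (here: the remote executor's stdout/stderr) into
background log output. A self-contained sketch of the same idea, with
hypothetical names and fmt in place of klog:

    package main

    import (
        "fmt"
        "io"
    )

    // logPipe mirrors the pipe helper above: everything written to the
    // returned writer is consumed and logged by a background goroutine.
    func logPipe(msg string, done chan struct{}) *io.PipeWriter {
        reader, writer := io.Pipe()
        go func() {
            defer close(done)
            buffer := make([]byte, 1024)
            for {
                n, err := reader.Read(buffer)
                if n > 0 {
                    fmt.Printf("%s: %q\n", msg, buffer[:n])
                }
                if err != nil {
                    // io.EOF after the writer closes; anything else is a real error.
                    reader.CloseWithError(err)
                    return
                }
            }
        }()
        return writer
    }

    func main() {
        done := make(chan struct{})
        w := logPipe("STDOUT", done)
        fmt.Fprintln(w, "hello from the remote command")
        w.Close() // the reader sees io.EOF and stops
        <-done
    }

In the diff, the deferred stdout.Close()/stderr.Close() in execute plays the
role of w.Close() here: it is what eventually delivers io.EOF to the reader
goroutine, so nothing leaks.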


@@ -46,29 +46,21 @@ spec:
             topologyKey: kubernetes.io/hostname
       containers:
-      - name: registrar
+      - name: pause
         image: registry.k8s.io/sig-storage/hostpathplugin:v1.7.3
-        args:
-        - "--v=5"
-        - "--endpoint=/var/lib/kubelet/plugins_registry/test-driver.dra.k8s.io-reg.sock"
-        - "--proxy-endpoint=tcp://:9000"
-        volumeMounts:
-        - mountPath: /var/lib/kubelet/plugins_registry
-          name: registration-dir
-      - name: plugin
-        image: registry.k8s.io/sig-storage/hostpathplugin:v1.7.3
-        args:
-        - "--v=5"
-        - "--endpoint=/var/lib/kubelet/plugins/test-driver.dra.k8s.io/dra.sock"
-        - "--proxy-endpoint=tcp://:9001"
+        securityContext:
+          privileged: true
+        command:
+        - /bin/sh
+        - -c
+        - while true; do sleep 10000; done
         volumeMounts:
         - mountPath: /var/lib/kubelet/plugins/test-driver.dra.k8s.io
           name: socket-dir
+        - mountPath: /var/lib/kubelet/plugins_registry
+          name: registration-dir
         - mountPath: /cdi
           name: cdi-dir
-        securityContext:
-          privileged: true
       volumes:
       - hostPath:
@@ -78,11 +70,11 @@ spec:
           path: /var/lib/kubelet/plugins/test-driver.dra.k8s.io
           type: DirectoryOrCreate
         name: socket-dir
-      - hostPath:
-          path: /var/run/cdi
-          type: DirectoryOrCreate
-        name: cdi-dir
       - hostPath:
           path: /var/lib/kubelet/plugins_registry
           type: DirectoryOrCreate
         name: registration-dir
+      - hostPath:
+          path: /var/run/cdi
+          type: DirectoryOrCreate
+        name: cdi-dir