From 2ea47038b90eb39cf9a6aed07d6184fce00a3cb6 Mon Sep 17 00:00:00 2001 From: Francesco Romani Date: Thu, 10 Aug 2023 11:54:26 +0200 Subject: [PATCH] podresources: e2e: force eager connection Add and use more facilities to the *internal* podresources client. Checking e2e test runs, we have quite some ``` rpc error: code = Unavailable desc = connection error: desc = "transport: Error while dialing: dial unix /var/lib/kubelet/pod-resources/kubelet.sock: connect: connection refused": rpc error: code = Unavailable desc = connection error: desc = "transport: Error while dialing: dial unix /var/lib/kubelet/pod-resources/kubelet.sock: connect: connection refused" ``` This is likely caused by kubelet restarts, which we do plenty in e2e tests, combined with the fact gRPC does lazy connection AND we don't really check the errors in client code - we just bubble them up. While it's arguably bad we don't check properly error codes, it's also true that in the main case, e2e tests, the functions should just never fail besides few well known cases, we're connecting over a super-reliable unix domain socket after all. So, we centralize the fix adding a function (alongside with minor cleanups) which wants to trigger and ensure the connection happens, localizing the changes just here. The main advantage is this approach is opt-in, composable, and doesn't leak gRPC details into the client code. Signed-off-by: Francesco Romani --- pkg/kubelet/apis/podresources/client.go | 31 ++++++++++++++++++++++++ test/e2e_node/device_manager_test.go | 4 ++-- test/e2e_node/memory_manager_test.go | 8 +++---- test/e2e_node/podresources_test.go | 32 ++++++++++++------------- test/e2e_node/util.go | 2 +- 5 files changed, 54 insertions(+), 23 deletions(-) diff --git a/pkg/kubelet/apis/podresources/client.go b/pkg/kubelet/apis/podresources/client.go index fcc6f1b903d..3b2700b9977 100644 --- a/pkg/kubelet/apis/podresources/client.go +++ b/pkg/kubelet/apis/podresources/client.go @@ -32,6 +32,11 @@ import ( // Note: Consumers of the pod resources API should not be importing this package. // They should copy paste the function in their project. +const ( + DefaultTimeout = 10 * time.Second + DefaultMaxMsgSize = 1024 * 1024 * 16 // 16 MiB +) + // GetV1alpha1Client returns a client for the PodResourcesLister grpc service // Note: This is deprecated func GetV1alpha1Client(socket string, connectionTimeout time.Duration, maxMsgSize int) (v1alpha1.PodResourcesListerClient, *grpc.ClientConn, error) { @@ -70,3 +75,29 @@ func GetV1Client(socket string, connectionTimeout time.Duration, maxMsgSize int) } return v1.NewPodResourcesListerClient(conn), conn, nil } + +// GetClient returns a client for the recommended version of the PodResourcesLister grpc service with the recommended settings +func GetClient(endpoint string) (v1.PodResourcesListerClient, *grpc.ClientConn, error) { + return GetV1Client(endpoint, DefaultTimeout, DefaultMaxMsgSize) +} + +// WaitForReady ensures the communication has been established. +// We provide a composable WaitForReady instead of setting flags in the Dialing function to enable client code flexibility. +// In general, using `grpc.Dial` with the blocking flag enabled is an anti-pattern https://github.com/grpc/grpc-go/blob/master/Documentation/anti-patterns.md +// But things are a bit different in the very narrow case we use here, over local UNIX domain socket. The transport is very stable and lossless, +// and the most common cause for failures bubbling up is for kubelet not yet ready, which is very common in the e2e tests but much less +// in the expected normal operation. +func WaitForReady(cli v1.PodResourcesListerClient, conn *grpc.ClientConn, err error) (v1.PodResourcesListerClient, *grpc.ClientConn, error) { + if err != nil { + return cli, conn, err + } + // we use List because it's the oldest endpoint and the one guaranteed to be available. + // Note we only set WaitForReady explicitly here effectively triggering eager connection. This way we force the connection to happen + // (or fail critically) without forcing the client code to use `grpc.WaitForReady` in their code everywhere. + // TODO: evaluate more lightweight option like GetAllocatableResources - we will discard the return value anyway. + _, listErr := cli.List(context.Background(), &v1.ListPodResourcesRequest{}, grpc.WaitForReady(true)) + if listErr != nil { + return cli, conn, fmt.Errorf("WaitForReady failed: %w", listErr) + } + return cli, conn, nil +} diff --git a/test/e2e_node/device_manager_test.go b/test/e2e_node/device_manager_test.go index 55d6b7f979c..3832e5d6881 100644 --- a/test/e2e_node/device_manager_test.go +++ b/test/e2e_node/device_manager_test.go @@ -176,7 +176,7 @@ var _ = SIGDescribe("Device Manager [Serial] [Feature:DeviceManager][NodeFeatur sd := setupSRIOVConfigOrFail(ctx, f, configMap) waitForSRIOVResources(ctx, f, sd) - cli, conn, err := podresources.GetV1Client(endpoint, defaultPodResourcesTimeout, defaultPodResourcesMaxSize) + cli, conn, err := podresources.WaitForReady(podresources.GetClient(endpoint)) framework.ExpectNoError(err) resp, err := cli.GetAllocatableResources(ctx, &kubeletpodresourcesv1.AllocatableResourcesRequest{}) @@ -260,7 +260,7 @@ var _ = SIGDescribe("Device Manager [Serial] [Feature:DeviceManager][NodeFeatur compareSRIOVResources(sd, sd2) - cli, conn, err = podresources.GetV1Client(endpoint, defaultPodResourcesTimeout, defaultPodResourcesMaxSize) + cli, conn, err = podresources.WaitForReady(podresources.GetClient(endpoint)) framework.ExpectNoError(err) defer conn.Close() diff --git a/test/e2e_node/memory_manager_test.go b/test/e2e_node/memory_manager_test.go index 9e73bb8c5cf..329e0b4ec08 100644 --- a/test/e2e_node/memory_manager_test.go +++ b/test/e2e_node/memory_manager_test.go @@ -382,7 +382,7 @@ var _ = SIGDescribe("Memory Manager [Disruptive] [Serial] [Feature:MemoryManager endpoint, err := util.LocalEndpoint(defaultPodResourcesPath, podresources.Socket) framework.ExpectNoError(err) - cli, conn, err := podresources.GetV1Client(endpoint, defaultPodResourcesTimeout, defaultPodResourcesMaxSize) + cli, conn, err := podresources.WaitForReady(podresources.GetClient(endpoint)) framework.ExpectNoError(err) defer conn.Close() @@ -522,7 +522,7 @@ var _ = SIGDescribe("Memory Manager [Disruptive] [Serial] [Feature:MemoryManager endpoint, err := util.LocalEndpoint(defaultPodResourcesPath, podresources.Socket) framework.ExpectNoError(err) - cli, conn, err := podresources.GetV1Client(endpoint, defaultPodResourcesTimeout, defaultPodResourcesMaxSize) + cli, conn, err := podresources.WaitForReady(podresources.GetClient(endpoint)) framework.ExpectNoError(err) defer conn.Close() @@ -667,7 +667,7 @@ var _ = SIGDescribe("Memory Manager [Disruptive] [Serial] [Feature:MemoryManager endpoint, err := util.LocalEndpoint(defaultPodResourcesPath, podresources.Socket) framework.ExpectNoError(err) - cli, conn, err := podresources.GetV1Client(endpoint, defaultPodResourcesTimeout, defaultPodResourcesMaxSize) + cli, conn, err := podresources.WaitForReady(podresources.GetClient(endpoint)) framework.ExpectNoError(err) defer conn.Close() @@ -684,7 +684,7 @@ var _ = SIGDescribe("Memory Manager [Disruptive] [Serial] [Feature:MemoryManager endpoint, err := util.LocalEndpoint(defaultPodResourcesPath, podresources.Socket) framework.ExpectNoError(err) - cli, conn, err := podresources.GetV1Client(endpoint, defaultPodResourcesTimeout, defaultPodResourcesMaxSize) + cli, conn, err := podresources.WaitForReady(podresources.GetClient(endpoint)) framework.ExpectNoError(err) defer conn.Close() diff --git a/test/e2e_node/podresources_test.go b/test/e2e_node/podresources_test.go index ab16c859114..1e8a06ae527 100644 --- a/test/e2e_node/podresources_test.go +++ b/test/e2e_node/podresources_test.go @@ -667,8 +667,8 @@ var _ = SIGDescribe("POD Resources [Serial] [Feature:PodResources][NodeFeature:P endpoint, err := util.LocalEndpoint(defaultPodResourcesPath, podresources.Socket) framework.ExpectNoError(err, "LocalEndpoint() failed err: %v", err) - cli, conn, err := podresources.GetV1Client(endpoint, defaultPodResourcesTimeout, defaultPodResourcesMaxSize) - framework.ExpectNoError(err, "GetV1Client() failed err: %v", err) + cli, conn, err := podresources.WaitForReady(podresources.GetClient(endpoint)) + framework.ExpectNoError(err, "GetClient() failed err: %v", err) defer conn.Close() waitForSRIOVResources(ctx, f, sd) @@ -696,8 +696,8 @@ var _ = SIGDescribe("POD Resources [Serial] [Feature:PodResources][NodeFeature:P endpoint, err := util.LocalEndpoint(defaultPodResourcesPath, podresources.Socket) framework.ExpectNoError(err, "LocalEndpoint() failed err: %v", err) - cli, conn, err := podresources.GetV1Client(endpoint, defaultPodResourcesTimeout, defaultPodResourcesMaxSize) - framework.ExpectNoError(err, "GetV1Client() failed err: %v", err) + cli, conn, err := podresources.WaitForReady(podresources.GetClient(endpoint)) + framework.ExpectNoError(err, "GetClient() failed err: %v", err) defer conn.Close() waitForSRIOVResources(ctx, f, sd) @@ -749,8 +749,8 @@ var _ = SIGDescribe("POD Resources [Serial] [Feature:PodResources][NodeFeature:P endpoint, err := util.LocalEndpoint(defaultPodResourcesPath, podresources.Socket) framework.ExpectNoError(err, "LocalEndpoint() failed err: %v", err) - cli, conn, err := podresources.GetV1Client(endpoint, defaultPodResourcesTimeout, defaultPodResourcesMaxSize) - framework.ExpectNoError(err, "GetV1Client() failed err: %v", err) + cli, conn, err := podresources.WaitForReady(podresources.GetClient(endpoint)) + framework.ExpectNoError(err, "GetClient() failed err: %v", err) defer conn.Close() podresourcesListTests(ctx, f, cli, nil) @@ -765,8 +765,8 @@ var _ = SIGDescribe("POD Resources [Serial] [Feature:PodResources][NodeFeature:P endpoint, err := util.LocalEndpoint(defaultPodResourcesPath, podresources.Socket) framework.ExpectNoError(err, "LocalEndpoint() failed err: %v", err) - cli, conn, err := podresources.GetV1Client(endpoint, defaultPodResourcesTimeout, defaultPodResourcesMaxSize) - framework.ExpectNoError(err, "GetV1Client() failed err: %v", err) + cli, conn, err := podresources.WaitForReady(podresources.GetClient(endpoint)) + framework.ExpectNoError(err, "GetClient() failed err: %v", err) defer conn.Close() // intentionally passing empty cpuset instead of onlineCPUs because with none policy @@ -781,8 +781,8 @@ var _ = SIGDescribe("POD Resources [Serial] [Feature:PodResources][NodeFeature:P endpoint, err := util.LocalEndpoint(defaultPodResourcesPath, podresources.Socket) framework.ExpectNoError(err, "LocalEndpoint() faild err %v", err) - cli, conn, err := podresources.GetV1Client(endpoint, defaultPodResourcesTimeout, defaultPodResourcesMaxSize) - framework.ExpectNoError(err, "GetV1Client() failed err %v", err) + cli, conn, err := podresources.WaitForReady(podresources.GetClient(endpoint)) + framework.ExpectNoError(err, "GetClient() failed err %v", err) defer conn.Close() ginkgo.By("checking Get fail if the feature gate is not enabled") @@ -827,8 +827,8 @@ var _ = SIGDescribe("POD Resources [Serial] [Feature:PodResources][NodeFeature:P endpoint, err := util.LocalEndpoint(defaultPodResourcesPath, podresources.Socket) framework.ExpectNoError(err, "LocalEndpoint() failed err: %v", err) - cli, conn, err := podresources.GetV1Client(endpoint, defaultPodResourcesTimeout, defaultPodResourcesMaxSize) - framework.ExpectNoError(err, "GetV1Client() failed err: %v", err) + cli, conn, err := podresources.WaitForReady(podresources.GetClient(endpoint)) + framework.ExpectNoError(err, "GetClient() failed err: %v", err) defer conn.Close() ginkgo.By("checking List and resources topology unaware resource should be without topology") @@ -884,8 +884,8 @@ var _ = SIGDescribe("POD Resources [Serial] [Feature:PodResources][NodeFeature:P endpoint, err := util.LocalEndpoint(defaultPodResourcesPath, podresources.Socket) framework.ExpectNoError(err, "LocalEndpoint() failed err %v", err) - cli, conn, err := podresources.GetV1Client(endpoint, defaultPodResourcesTimeout, defaultPodResourcesMaxSize) - framework.ExpectNoError(err, "GetV1Client() failed err %v", err) + cli, conn, err := podresources.WaitForReady(podresources.GetClient(endpoint)) + framework.ExpectNoError(err, "GetClient() failed err %v", err) defer conn.Close() _, err = cli.List(ctx, &kubeletpodresourcesv1.ListPodResourcesRequest{}) @@ -949,8 +949,8 @@ var _ = SIGDescribe("POD Resources [Serial] [Feature:PodResources][NodeFeature:P framework.ExpectNoError(err, "LocalEndpoint() failed err %v", err) ginkgo.By("Connecting to the kubelet endpoint") - cli, conn, err := podresources.GetV1Client(endpoint, defaultPodResourcesTimeout, defaultPodResourcesMaxSize) - framework.ExpectNoError(err, "GetV1Client() failed err %v", err) + cli, conn, err := podresources.WaitForReady(podresources.GetClient(endpoint)) + framework.ExpectNoError(err, "GetClient() failed err %v", err) defer conn.Close() tries := podresourcesgrpc.DefaultQPS * 2 // This should also be greater than DefaultBurstTokens diff --git a/test/e2e_node/util.go b/test/e2e_node/util.go index 8c2a15bbb04..962e4943ea8 100644 --- a/test/e2e_node/util.go +++ b/test/e2e_node/util.go @@ -150,7 +150,7 @@ func getV1NodeDevices(ctx context.Context) (*kubeletpodresourcesv1.ListPodResour if err != nil { return nil, fmt.Errorf("Error getting local endpoint: %w", err) } - client, conn, err := podresources.GetV1Client(endpoint, defaultPodResourcesTimeout, defaultPodResourcesMaxSize) + client, conn, err := podresources.WaitForReady(podresources.GetClient(endpoint)) if err != nil { return nil, fmt.Errorf("Error getting gRPC client: %w", err) }