diff --git a/pkg/kubelet/apis/podresources/client.go b/pkg/kubelet/apis/podresources/client.go
index fcc6f1b903d..3b2700b9977 100644
--- a/pkg/kubelet/apis/podresources/client.go
+++ b/pkg/kubelet/apis/podresources/client.go
@@ -32,6 +32,13 @@ import (
 // Note: Consumers of the pod resources API should not be importing this package.
 // They should copy paste the function in their project.
 
+const (
+	// DefaultTimeout is the connection timeout GetClient passes to GetV1Client; WaitForReady also uses it to bound its readiness probe.
+	DefaultTimeout = 10 * time.Second
+	// DefaultMaxMsgSize is the maximum message size GetClient passes to GetV1Client.
+	DefaultMaxMsgSize = 1024 * 1024 * 16 // 16 MiB
+)
+
 // GetV1alpha1Client returns a client for the PodResourcesLister grpc service
 // Note: This is deprecated
 func GetV1alpha1Client(socket string, connectionTimeout time.Duration, maxMsgSize int) (v1alpha1.PodResourcesListerClient, *grpc.ClientConn, error) {
@@ -70,3 +77,37 @@ func GetV1Client(socket string, connectionTimeout time.Duration, maxMsgSize int)
 	}
 	return v1.NewPodResourcesListerClient(conn), conn, nil
 }
+
+// GetClient returns a client for the recommended version of the PodResourcesLister grpc service with the recommended settings.
+// It is a thin wrapper around GetV1Client called with DefaultTimeout and DefaultMaxMsgSize.
+func GetClient(endpoint string) (v1.PodResourcesListerClient, *grpc.ClientConn, error) {
+	return GetV1Client(endpoint, DefaultTimeout, DefaultMaxMsgSize)
+}
+
+// WaitForReady ensures the communication has been established.
+// It is meant to wrap a client constructor directly, e.g. WaitForReady(GetClient(endpoint)): a non-nil err is passed
+// through untouched together with cli and conn; otherwise a List call is issued and awaited for at most DefaultTimeout.
+// When that call fails, the connection is returned still open alongside the wrapped error; closing it is up to the caller.
+// We provide a composable WaitForReady instead of setting flags in the Dialing function to enable client code flexibility.
+// In general, using `grpc.Dial` with the blocking flag enabled is an anti-pattern https://github.com/grpc/grpc-go/blob/master/Documentation/anti-patterns.md
+// But things are a bit different in the very narrow case we use here, over local UNIX domain socket. The transport is very stable and lossless,
+// and the most common cause for failures bubbling up is for kubelet not yet ready, which is very common in the e2e tests but much less
+// in the expected normal operation.
+func WaitForReady(cli v1.PodResourcesListerClient, conn *grpc.ClientConn, err error) (v1.PodResourcesListerClient, *grpc.ClientConn, error) {
+	if err != nil {
+		return cli, conn, err
+	}
+	// Bound the probe: with grpc.WaitForReady(true) and a context that carries no deadline, the call
+	// would block forever if the endpoint never starts serving.
+	ctx, cancel := context.WithTimeout(context.Background(), DefaultTimeout)
+	defer cancel()
+	// we use List because it's the oldest endpoint and the one guaranteed to be available.
+	// Note we only set WaitForReady explicitly here effectively triggering eager connection. This way we force the connection to happen
+	// (or fail critically) without forcing the client code to use `grpc.WaitForReady` in their code everywhere.
+	// TODO: evaluate more lightweight option like GetAllocatableResources - we will discard the return value anyway.
+	_, listErr := cli.List(ctx, &v1.ListPodResourcesRequest{}, grpc.WaitForReady(true))
+	if listErr != nil {
+		return cli, conn, fmt.Errorf("WaitForReady failed: %w", listErr)
+	}
+	return cli, conn, nil
+}
diff --git a/test/e2e_node/device_manager_test.go b/test/e2e_node/device_manager_test.go
index 55d6b7f979c..3832e5d6881 100644
--- a/test/e2e_node/device_manager_test.go
+++ b/test/e2e_node/device_manager_test.go
@@ -176,7 +176,7 @@ var _ = SIGDescribe("Device Manager [Serial] [Feature:DeviceManager][NodeFeatur
 			sd := setupSRIOVConfigOrFail(ctx, f, configMap)
 			waitForSRIOVResources(ctx, f, sd)
 
-			cli, conn, err := podresources.GetV1Client(endpoint, defaultPodResourcesTimeout, defaultPodResourcesMaxSize)
+			cli, conn, err := podresources.WaitForReady(podresources.GetClient(endpoint))
 			framework.ExpectNoError(err)
 
 			resp, err := cli.GetAllocatableResources(ctx, &kubeletpodresourcesv1.AllocatableResourcesRequest{})
@@ -260,7 +260,7 @@ var _ = SIGDescribe("Device Manager [Serial] [Feature:DeviceManager][NodeFeatur
 
 			compareSRIOVResources(sd, sd2)
 
-			cli, conn, err = podresources.GetV1Client(endpoint, defaultPodResourcesTimeout, defaultPodResourcesMaxSize)
+			cli, conn, err = podresources.WaitForReady(podresources.GetClient(endpoint))
 			framework.ExpectNoError(err)
 			defer conn.Close()
 
diff --git a/test/e2e_node/memory_manager_test.go
b/test/e2e_node/memory_manager_test.go index 9e73bb8c5cf..329e0b4ec08 100644 --- a/test/e2e_node/memory_manager_test.go +++ b/test/e2e_node/memory_manager_test.go @@ -382,7 +382,7 @@ var _ = SIGDescribe("Memory Manager [Disruptive] [Serial] [Feature:MemoryManager endpoint, err := util.LocalEndpoint(defaultPodResourcesPath, podresources.Socket) framework.ExpectNoError(err) - cli, conn, err := podresources.GetV1Client(endpoint, defaultPodResourcesTimeout, defaultPodResourcesMaxSize) + cli, conn, err := podresources.WaitForReady(podresources.GetClient(endpoint)) framework.ExpectNoError(err) defer conn.Close() @@ -522,7 +522,7 @@ var _ = SIGDescribe("Memory Manager [Disruptive] [Serial] [Feature:MemoryManager endpoint, err := util.LocalEndpoint(defaultPodResourcesPath, podresources.Socket) framework.ExpectNoError(err) - cli, conn, err := podresources.GetV1Client(endpoint, defaultPodResourcesTimeout, defaultPodResourcesMaxSize) + cli, conn, err := podresources.WaitForReady(podresources.GetClient(endpoint)) framework.ExpectNoError(err) defer conn.Close() @@ -667,7 +667,7 @@ var _ = SIGDescribe("Memory Manager [Disruptive] [Serial] [Feature:MemoryManager endpoint, err := util.LocalEndpoint(defaultPodResourcesPath, podresources.Socket) framework.ExpectNoError(err) - cli, conn, err := podresources.GetV1Client(endpoint, defaultPodResourcesTimeout, defaultPodResourcesMaxSize) + cli, conn, err := podresources.WaitForReady(podresources.GetClient(endpoint)) framework.ExpectNoError(err) defer conn.Close() @@ -684,7 +684,7 @@ var _ = SIGDescribe("Memory Manager [Disruptive] [Serial] [Feature:MemoryManager endpoint, err := util.LocalEndpoint(defaultPodResourcesPath, podresources.Socket) framework.ExpectNoError(err) - cli, conn, err := podresources.GetV1Client(endpoint, defaultPodResourcesTimeout, defaultPodResourcesMaxSize) + cli, conn, err := podresources.WaitForReady(podresources.GetClient(endpoint)) framework.ExpectNoError(err) defer conn.Close() diff --git 
a/test/e2e_node/podresources_test.go b/test/e2e_node/podresources_test.go index ab16c859114..1e8a06ae527 100644 --- a/test/e2e_node/podresources_test.go +++ b/test/e2e_node/podresources_test.go @@ -667,8 +667,8 @@ var _ = SIGDescribe("POD Resources [Serial] [Feature:PodResources][NodeFeature:P endpoint, err := util.LocalEndpoint(defaultPodResourcesPath, podresources.Socket) framework.ExpectNoError(err, "LocalEndpoint() failed err: %v", err) - cli, conn, err := podresources.GetV1Client(endpoint, defaultPodResourcesTimeout, defaultPodResourcesMaxSize) - framework.ExpectNoError(err, "GetV1Client() failed err: %v", err) + cli, conn, err := podresources.WaitForReady(podresources.GetClient(endpoint)) + framework.ExpectNoError(err, "GetClient() failed err: %v", err) defer conn.Close() waitForSRIOVResources(ctx, f, sd) @@ -696,8 +696,8 @@ var _ = SIGDescribe("POD Resources [Serial] [Feature:PodResources][NodeFeature:P endpoint, err := util.LocalEndpoint(defaultPodResourcesPath, podresources.Socket) framework.ExpectNoError(err, "LocalEndpoint() failed err: %v", err) - cli, conn, err := podresources.GetV1Client(endpoint, defaultPodResourcesTimeout, defaultPodResourcesMaxSize) - framework.ExpectNoError(err, "GetV1Client() failed err: %v", err) + cli, conn, err := podresources.WaitForReady(podresources.GetClient(endpoint)) + framework.ExpectNoError(err, "GetClient() failed err: %v", err) defer conn.Close() waitForSRIOVResources(ctx, f, sd) @@ -749,8 +749,8 @@ var _ = SIGDescribe("POD Resources [Serial] [Feature:PodResources][NodeFeature:P endpoint, err := util.LocalEndpoint(defaultPodResourcesPath, podresources.Socket) framework.ExpectNoError(err, "LocalEndpoint() failed err: %v", err) - cli, conn, err := podresources.GetV1Client(endpoint, defaultPodResourcesTimeout, defaultPodResourcesMaxSize) - framework.ExpectNoError(err, "GetV1Client() failed err: %v", err) + cli, conn, err := podresources.WaitForReady(podresources.GetClient(endpoint)) + framework.ExpectNoError(err, 
"GetClient() failed err: %v", err) defer conn.Close() podresourcesListTests(ctx, f, cli, nil) @@ -765,8 +765,8 @@ var _ = SIGDescribe("POD Resources [Serial] [Feature:PodResources][NodeFeature:P endpoint, err := util.LocalEndpoint(defaultPodResourcesPath, podresources.Socket) framework.ExpectNoError(err, "LocalEndpoint() failed err: %v", err) - cli, conn, err := podresources.GetV1Client(endpoint, defaultPodResourcesTimeout, defaultPodResourcesMaxSize) - framework.ExpectNoError(err, "GetV1Client() failed err: %v", err) + cli, conn, err := podresources.WaitForReady(podresources.GetClient(endpoint)) + framework.ExpectNoError(err, "GetClient() failed err: %v", err) defer conn.Close() // intentionally passing empty cpuset instead of onlineCPUs because with none policy @@ -781,8 +781,8 @@ var _ = SIGDescribe("POD Resources [Serial] [Feature:PodResources][NodeFeature:P endpoint, err := util.LocalEndpoint(defaultPodResourcesPath, podresources.Socket) framework.ExpectNoError(err, "LocalEndpoint() faild err %v", err) - cli, conn, err := podresources.GetV1Client(endpoint, defaultPodResourcesTimeout, defaultPodResourcesMaxSize) - framework.ExpectNoError(err, "GetV1Client() failed err %v", err) + cli, conn, err := podresources.WaitForReady(podresources.GetClient(endpoint)) + framework.ExpectNoError(err, "GetClient() failed err %v", err) defer conn.Close() ginkgo.By("checking Get fail if the feature gate is not enabled") @@ -827,8 +827,8 @@ var _ = SIGDescribe("POD Resources [Serial] [Feature:PodResources][NodeFeature:P endpoint, err := util.LocalEndpoint(defaultPodResourcesPath, podresources.Socket) framework.ExpectNoError(err, "LocalEndpoint() failed err: %v", err) - cli, conn, err := podresources.GetV1Client(endpoint, defaultPodResourcesTimeout, defaultPodResourcesMaxSize) - framework.ExpectNoError(err, "GetV1Client() failed err: %v", err) + cli, conn, err := podresources.WaitForReady(podresources.GetClient(endpoint)) + framework.ExpectNoError(err, "GetClient() failed err: 
%v", err) defer conn.Close() ginkgo.By("checking List and resources topology unaware resource should be without topology") @@ -884,8 +884,8 @@ var _ = SIGDescribe("POD Resources [Serial] [Feature:PodResources][NodeFeature:P endpoint, err := util.LocalEndpoint(defaultPodResourcesPath, podresources.Socket) framework.ExpectNoError(err, "LocalEndpoint() failed err %v", err) - cli, conn, err := podresources.GetV1Client(endpoint, defaultPodResourcesTimeout, defaultPodResourcesMaxSize) - framework.ExpectNoError(err, "GetV1Client() failed err %v", err) + cli, conn, err := podresources.WaitForReady(podresources.GetClient(endpoint)) + framework.ExpectNoError(err, "GetClient() failed err %v", err) defer conn.Close() _, err = cli.List(ctx, &kubeletpodresourcesv1.ListPodResourcesRequest{}) @@ -949,8 +949,8 @@ var _ = SIGDescribe("POD Resources [Serial] [Feature:PodResources][NodeFeature:P framework.ExpectNoError(err, "LocalEndpoint() failed err %v", err) ginkgo.By("Connecting to the kubelet endpoint") - cli, conn, err := podresources.GetV1Client(endpoint, defaultPodResourcesTimeout, defaultPodResourcesMaxSize) - framework.ExpectNoError(err, "GetV1Client() failed err %v", err) + cli, conn, err := podresources.WaitForReady(podresources.GetClient(endpoint)) + framework.ExpectNoError(err, "GetClient() failed err %v", err) defer conn.Close() tries := podresourcesgrpc.DefaultQPS * 2 // This should also be greater than DefaultBurstTokens diff --git a/test/e2e_node/util.go b/test/e2e_node/util.go index 8c2a15bbb04..962e4943ea8 100644 --- a/test/e2e_node/util.go +++ b/test/e2e_node/util.go @@ -150,7 +150,7 @@ func getV1NodeDevices(ctx context.Context) (*kubeletpodresourcesv1.ListPodResour if err != nil { return nil, fmt.Errorf("Error getting local endpoint: %w", err) } - client, conn, err := podresources.GetV1Client(endpoint, defaultPodResourcesTimeout, defaultPodResourcesMaxSize) + client, conn, err := podresources.WaitForReady(podresources.GetClient(endpoint)) if err != nil { return 
nil, fmt.Errorf("Error getting gRPC client: %w", err) }