Merge pull request #119882 from ffromani/podres-client-wait

podresources: e2e: force eager connection
Kubernetes Prow Robot 2023-10-12 15:59:55 +02:00 committed by GitHub
commit 38a1ec75f0
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 54 additions and 23 deletions
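
For context on what "eager connection" means here: grpc.Dial returns immediately without establishing a transport, and the connection is normally made lazily on the first RPC. Below is a minimal, hedged sketch of forcing the connection eagerly with the per-call WaitForReady option; the socket path and the use of the standard gRPC health service are illustrative assumptions, not part of this PR.

package main

import (
	"context"
	"log"
	"time"

	"google.golang.org/grpc"
	"google.golang.org/grpc/credentials/insecure"
	healthpb "google.golang.org/grpc/health/grpc_health_v1"
)

func main() {
	// Dial is non-blocking: it returns a ClientConn even if nothing is listening yet.
	conn, err := grpc.Dial("unix:///tmp/example.sock", // illustrative socket path
		grpc.WithTransportCredentials(insecure.NewCredentials()))
	if err != nil {
		log.Fatalf("dial: %v", err)
	}
	defer conn.Close()

	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
	defer cancel()

	// WaitForReady(true) makes this first RPC block until the transport is ready
	// (or the context deadline expires), i.e. it triggers an eager connection.
	_, err = healthpb.NewHealthClient(conn).Check(ctx, &healthpb.HealthCheckRequest{}, grpc.WaitForReady(true))
	if err != nil {
		log.Fatalf("server not ready: %v", err)
	}
	log.Println("connection established")
}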

View File

@@ -32,6 +32,11 @@ import (
// Note: Consumers of the pod resources API should not be importing this package.
// They should copy paste the function in their project.
+const (
+	DefaultTimeout    = 10 * time.Second
+	DefaultMaxMsgSize = 1024 * 1024 * 16 // 16 MiB
+)
// GetV1alpha1Client returns a client for the PodResourcesLister grpc service
// Note: This is deprecated
func GetV1alpha1Client(socket string, connectionTimeout time.Duration, maxMsgSize int) (v1alpha1.PodResourcesListerClient, *grpc.ClientConn, error) {
@@ -70,3 +75,29 @@ func GetV1Client(socket string, connectionTimeout time.Duration, maxMsgSize int)
}
return v1.NewPodResourcesListerClient(conn), conn, nil
}
+// GetClient returns a client for the recommended version of the PodResourcesLister grpc service with the recommended settings.
+func GetClient(endpoint string) (v1.PodResourcesListerClient, *grpc.ClientConn, error) {
+	return GetV1Client(endpoint, DefaultTimeout, DefaultMaxMsgSize)
+}
+
+// WaitForReady ensures the communication has been established.
+// We provide a composable WaitForReady instead of setting flags on the dial function to give client code more flexibility.
+// In general, using `grpc.Dial` with the blocking flag enabled is an anti-pattern: https://github.com/grpc/grpc-go/blob/master/Documentation/anti-patterns.md
+// But the very narrow case we handle here, over a local UNIX domain socket, is different: the transport is stable and lossless,
+// and the most common cause of failures bubbling up is the kubelet not being ready yet, which is very common in the e2e tests but much less so
+// in normal operation.
+func WaitForReady(cli v1.PodResourcesListerClient, conn *grpc.ClientConn, err error) (v1.PodResourcesListerClient, *grpc.ClientConn, error) {
+	if err != nil {
+		return cli, conn, err
+	}
+	// We use List because it is the oldest endpoint and the one guaranteed to be available.
+	// Note that we set WaitForReady explicitly only here, effectively triggering an eager connection. This forces the connection to be established
+	// (or to fail outright) without requiring client code to pass `grpc.WaitForReady` on every call.
+	// TODO: evaluate a more lightweight option such as GetAllocatableResources - we will discard the return value anyway.
+	_, listErr := cli.List(context.Background(), &v1.ListPodResourcesRequest{}, grpc.WaitForReady(true))
+	if listErr != nil {
+		return cli, conn, fmt.Errorf("WaitForReady failed: %w", listErr)
+	}
+	return cli, conn, nil
+}
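
For reference, a minimal sketch of how a consumer composes the two helpers above, mirroring the test-site changes in this PR; the import paths and the socket path are assumptions for illustration, not taken from the diff.

package main

import (
	"context"
	"log"

	kubeletpodresourcesv1 "k8s.io/kubelet/pkg/apis/podresources/v1"
	"k8s.io/kubernetes/pkg/kubelet/apis/podresources" // assumed import path for GetClient/WaitForReady
)

func main() {
	// Path of the kubelet pod-resources socket; illustrative only.
	endpoint := "unix:///var/lib/kubelet/pod-resources/kubelet.sock"

	// GetClient dials with the recommended defaults; WaitForReady then forces the
	// connection eagerly and returns an error if the kubelet is not serving yet.
	cli, conn, err := podresources.WaitForReady(podresources.GetClient(endpoint))
	if err != nil {
		log.Fatalf("connecting to the pod resources API: %v", err)
	}
	defer conn.Close()

	resp, err := cli.List(context.Background(), &kubeletpodresourcesv1.ListPodResourcesRequest{})
	if err != nil {
		log.Fatalf("List() failed: %v", err)
	}
	log.Printf("kubelet reports resources for %d pods", len(resp.GetPodResources()))
}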

View File

@@ -176,7 +176,7 @@ var _ = SIGDescribe("Device Manager [Serial] [Feature:DeviceManager][NodeFeatur
sd := setupSRIOVConfigOrFail(ctx, f, configMap)
waitForSRIOVResources(ctx, f, sd)
-cli, conn, err := podresources.GetV1Client(endpoint, defaultPodResourcesTimeout, defaultPodResourcesMaxSize)
+cli, conn, err := podresources.WaitForReady(podresources.GetClient(endpoint))
framework.ExpectNoError(err)
resp, err := cli.GetAllocatableResources(ctx, &kubeletpodresourcesv1.AllocatableResourcesRequest{})
@@ -260,7 +260,7 @@ var _ = SIGDescribe("Device Manager [Serial] [Feature:DeviceManager][NodeFeatur
compareSRIOVResources(sd, sd2)
-cli, conn, err = podresources.GetV1Client(endpoint, defaultPodResourcesTimeout, defaultPodResourcesMaxSize)
+cli, conn, err = podresources.WaitForReady(podresources.GetClient(endpoint))
framework.ExpectNoError(err)
defer conn.Close()

View File

@@ -382,7 +382,7 @@ var _ = SIGDescribe("Memory Manager [Disruptive] [Serial] [Feature:MemoryManager
endpoint, err := util.LocalEndpoint(defaultPodResourcesPath, podresources.Socket)
framework.ExpectNoError(err)
-cli, conn, err := podresources.GetV1Client(endpoint, defaultPodResourcesTimeout, defaultPodResourcesMaxSize)
+cli, conn, err := podresources.WaitForReady(podresources.GetClient(endpoint))
framework.ExpectNoError(err)
defer conn.Close()
@@ -522,7 +522,7 @@ var _ = SIGDescribe("Memory Manager [Disruptive] [Serial] [Feature:MemoryManager
endpoint, err := util.LocalEndpoint(defaultPodResourcesPath, podresources.Socket)
framework.ExpectNoError(err)
-cli, conn, err := podresources.GetV1Client(endpoint, defaultPodResourcesTimeout, defaultPodResourcesMaxSize)
+cli, conn, err := podresources.WaitForReady(podresources.GetClient(endpoint))
framework.ExpectNoError(err)
defer conn.Close()
@@ -667,7 +667,7 @@ var _ = SIGDescribe("Memory Manager [Disruptive] [Serial] [Feature:MemoryManager
endpoint, err := util.LocalEndpoint(defaultPodResourcesPath, podresources.Socket)
framework.ExpectNoError(err)
-cli, conn, err := podresources.GetV1Client(endpoint, defaultPodResourcesTimeout, defaultPodResourcesMaxSize)
+cli, conn, err := podresources.WaitForReady(podresources.GetClient(endpoint))
framework.ExpectNoError(err)
defer conn.Close()
@@ -684,7 +684,7 @@ var _ = SIGDescribe("Memory Manager [Disruptive] [Serial] [Feature:MemoryManager
endpoint, err := util.LocalEndpoint(defaultPodResourcesPath, podresources.Socket)
framework.ExpectNoError(err)
-cli, conn, err := podresources.GetV1Client(endpoint, defaultPodResourcesTimeout, defaultPodResourcesMaxSize)
+cli, conn, err := podresources.WaitForReady(podresources.GetClient(endpoint))
framework.ExpectNoError(err)
defer conn.Close()

View File

@@ -667,8 +667,8 @@ var _ = SIGDescribe("POD Resources [Serial] [Feature:PodResources][NodeFeature:P
endpoint, err := util.LocalEndpoint(defaultPodResourcesPath, podresources.Socket)
framework.ExpectNoError(err, "LocalEndpoint() failed err: %v", err)
-cli, conn, err := podresources.GetV1Client(endpoint, defaultPodResourcesTimeout, defaultPodResourcesMaxSize)
-framework.ExpectNoError(err, "GetV1Client() failed err: %v", err)
+cli, conn, err := podresources.WaitForReady(podresources.GetClient(endpoint))
+framework.ExpectNoError(err, "GetClient() failed err: %v", err)
defer conn.Close()
waitForSRIOVResources(ctx, f, sd)
@@ -696,8 +696,8 @@ var _ = SIGDescribe("POD Resources [Serial] [Feature:PodResources][NodeFeature:P
endpoint, err := util.LocalEndpoint(defaultPodResourcesPath, podresources.Socket)
framework.ExpectNoError(err, "LocalEndpoint() failed err: %v", err)
-cli, conn, err := podresources.GetV1Client(endpoint, defaultPodResourcesTimeout, defaultPodResourcesMaxSize)
-framework.ExpectNoError(err, "GetV1Client() failed err: %v", err)
+cli, conn, err := podresources.WaitForReady(podresources.GetClient(endpoint))
+framework.ExpectNoError(err, "GetClient() failed err: %v", err)
defer conn.Close()
waitForSRIOVResources(ctx, f, sd)
@@ -749,8 +749,8 @@ var _ = SIGDescribe("POD Resources [Serial] [Feature:PodResources][NodeFeature:P
endpoint, err := util.LocalEndpoint(defaultPodResourcesPath, podresources.Socket)
framework.ExpectNoError(err, "LocalEndpoint() failed err: %v", err)
-cli, conn, err := podresources.GetV1Client(endpoint, defaultPodResourcesTimeout, defaultPodResourcesMaxSize)
-framework.ExpectNoError(err, "GetV1Client() failed err: %v", err)
+cli, conn, err := podresources.WaitForReady(podresources.GetClient(endpoint))
+framework.ExpectNoError(err, "GetClient() failed err: %v", err)
defer conn.Close()
podresourcesListTests(ctx, f, cli, nil)
@@ -786,8 +786,8 @@ var _ = SIGDescribe("POD Resources [Serial] [Feature:PodResources][NodeFeature:P
endpoint, err := util.LocalEndpoint(defaultPodResourcesPath, podresources.Socket)
framework.ExpectNoError(err, "LocalEndpoint() failed err: %v", err)
-cli, conn, err := podresources.GetV1Client(endpoint, defaultPodResourcesTimeout, defaultPodResourcesMaxSize)
-framework.ExpectNoError(err, "GetV1Client() failed err: %v", err)
+cli, conn, err := podresources.WaitForReady(podresources.GetClient(endpoint))
+framework.ExpectNoError(err, "GetClient() failed err: %v", err)
defer conn.Close()
// intentionally passing empty cpuset instead of onlineCPUs because with none policy
@@ -802,8 +802,8 @@ var _ = SIGDescribe("POD Resources [Serial] [Feature:PodResources][NodeFeature:P
endpoint, err := util.LocalEndpoint(defaultPodResourcesPath, podresources.Socket)
framework.ExpectNoError(err, "LocalEndpoint() faild err %v", err)
cli, conn, err := podresources.GetV1Client(endpoint, defaultPodResourcesTimeout, defaultPodResourcesMaxSize)
framework.ExpectNoError(err, "GetV1Client() failed err %v", err)
cli, conn, err := podresources.WaitForReady(podresources.GetClient(endpoint))
framework.ExpectNoError(err, "GetClient() failed err %v", err)
defer conn.Close()
ginkgo.By("checking Get fail if the feature gate is not enabled")
@@ -848,8 +848,8 @@ var _ = SIGDescribe("POD Resources [Serial] [Feature:PodResources][NodeFeature:P
endpoint, err := util.LocalEndpoint(defaultPodResourcesPath, podresources.Socket)
framework.ExpectNoError(err, "LocalEndpoint() failed err: %v", err)
-cli, conn, err := podresources.GetV1Client(endpoint, defaultPodResourcesTimeout, defaultPodResourcesMaxSize)
-framework.ExpectNoError(err, "GetV1Client() failed err: %v", err)
+cli, conn, err := podresources.WaitForReady(podresources.GetClient(endpoint))
+framework.ExpectNoError(err, "GetClient() failed err: %v", err)
defer conn.Close()
ginkgo.By("checking List and resources topology unaware resource should be without topology")
@@ -904,8 +904,8 @@ var _ = SIGDescribe("POD Resources [Serial] [Feature:PodResources][NodeFeature:P
endpoint, err := util.LocalEndpoint(defaultPodResourcesPath, podresources.Socket)
framework.ExpectNoError(err, "LocalEndpoint() failed err %v", err)
-cli, conn, err := podresources.GetV1Client(endpoint, defaultPodResourcesTimeout, defaultPodResourcesMaxSize)
-framework.ExpectNoError(err, "GetV1Client() failed err %v", err)
+cli, conn, err := podresources.WaitForReady(podresources.GetClient(endpoint))
+framework.ExpectNoError(err, "GetClient() failed err %v", err)
defer conn.Close()
_, err = cli.List(ctx, &kubeletpodresourcesv1.ListPodResourcesRequest{})
@@ -969,8 +969,8 @@ var _ = SIGDescribe("POD Resources [Serial] [Feature:PodResources][NodeFeature:P
framework.ExpectNoError(err, "LocalEndpoint() failed err %v", err)
ginkgo.By("Connecting to the kubelet endpoint")
-cli, conn, err := podresources.GetV1Client(endpoint, defaultPodResourcesTimeout, defaultPodResourcesMaxSize)
-framework.ExpectNoError(err, "GetV1Client() failed err %v", err)
+cli, conn, err := podresources.WaitForReady(podresources.GetClient(endpoint))
+framework.ExpectNoError(err, "GetClient() failed err %v", err)
defer conn.Close()
tries := podresourcesgrpc.DefaultQPS * 2 // This should also be greater than DefaultBurstTokens

View File

@@ -150,7 +150,7 @@ func getV1NodeDevices(ctx context.Context) (*kubeletpodresourcesv1.ListPodResour
if err != nil {
return nil, fmt.Errorf("Error getting local endpoint: %w", err)
}
-client, conn, err := podresources.GetV1Client(endpoint, defaultPodResourcesTimeout, defaultPodResourcesMaxSize)
+client, conn, err := podresources.WaitForReady(podresources.GetClient(endpoint))
if err != nil {
return nil, fmt.Errorf("Error getting gRPC client: %w", err)
}