From 60a8bca507fe9feafdd79c1df8d7470421637080 Mon Sep 17 00:00:00 2001 From: TommyStarK Date: Wed, 12 Jul 2023 21:03:22 +0200 Subject: [PATCH] dynamic resource allocation: add unit test to check the reuse of the gRPC connection Signed-off-by: TommyStarK --- pkg/kubelet/cm/dra/plugin/client.go | 4 +- pkg/kubelet/cm/dra/plugin/client_test.go | 149 +++++++++++++++++++++ pkg/kubelet/cm/dra/plugin/plugins_store.go | 37 ++--- 3 files changed, 170 insertions(+), 20 deletions(-) create mode 100644 pkg/kubelet/cm/dra/plugin/client_test.go diff --git a/pkg/kubelet/cm/dra/plugin/client.go b/pkg/kubelet/cm/dra/plugin/client.go index b33620616d2..e3a1e96756c 100644 --- a/pkg/kubelet/cm/dra/plugin/client.go +++ b/pkg/kubelet/cm/dra/plugin/client.go @@ -63,7 +63,7 @@ func (r *draPluginClient) NodePrepareResources( logger.V(4).Info(log("calling NodePrepareResources rpc"), "request", req) defer logger.V(4).Info(log("done calling NodePrepareResources rpc"), "response", resp, "err", err) - conn, err := r.plugin.GetOrCreateGRPCConn() + conn, err := r.plugin.getOrCreateGRPCConn() if err != nil { return nil, err } @@ -113,7 +113,7 @@ func (r *draPluginClient) NodeUnprepareResources( logger.V(4).Info(log("calling NodeUnprepareResource rpc"), "request", req) defer logger.V(4).Info(log("done calling NodeUnprepareResources rpc"), "response", resp, "err", err) - conn, err := r.plugin.GetOrCreateGRPCConn() + conn, err := r.plugin.getOrCreateGRPCConn() if err != nil { return nil, err } diff --git a/pkg/kubelet/cm/dra/plugin/client_test.go b/pkg/kubelet/cm/dra/plugin/client_test.go new file mode 100644 index 00000000000..18c37a1d1ee --- /dev/null +++ b/pkg/kubelet/cm/dra/plugin/client_test.go @@ -0,0 +1,149 @@ +/* +Copyright 2023 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package plugin + +import ( + "context" + "net" + "os" + "path/filepath" + "sync" + "testing" + + "google.golang.org/grpc" + drapbv1 "k8s.io/kubelet/pkg/apis/dra/v1alpha3" +) + +type fakeGRPCServer struct { + drapbv1.UnimplementedNodeServer +} + +func (f *fakeGRPCServer) NodePrepareResource(ctx context.Context, in *drapbv1.NodePrepareResourcesRequest) (*drapbv1.NodePrepareResourcesResponse, error) { + return &drapbv1.NodePrepareResourcesResponse{Claims: map[string]*drapbv1.NodePrepareResourceResponse{"dummy": {CDIDevices: []string{"dummy"}}}}, nil +} + +func (f *fakeGRPCServer) NodeUnprepareResource(ctx context.Context, in *drapbv1.NodeUnprepareResourcesRequest) (*drapbv1.NodeUnprepareResourcesResponse, error) { + return &drapbv1.NodeUnprepareResourcesResponse{}, nil +} + +type tearDown func() + +func setupFakeGRPCServer() (string, tearDown, error) { + p, err := os.MkdirTemp("", "dra_plugin") + if err != nil { + return "", nil, err + } + + closeCh := make(chan struct{}) + addr := filepath.Join(p, "server.sock") + teardown := func() { + close(closeCh) + os.RemoveAll(addr) + } + + listener, err := net.Listen("unix", addr) + if err != nil { + teardown() + return "", nil, err + } + + s := grpc.NewServer() + fakeGRPCServer := &fakeGRPCServer{} + drapbv1.RegisterNodeServer(s, fakeGRPCServer) + + go func() { + go s.Serve(listener) + <-closeCh + s.GracefulStop() + }() + + return addr, teardown, nil +} + +func TestGRPCConnIsReused(t *testing.T) { + addr, teardown, err := setupFakeGRPCServer() + if err != nil { + t.Fatal(err) + } + defer teardown() + + reusedConns := make(map[*grpc.ClientConn]int) + wg := sync.WaitGroup{} + m := sync.Mutex{} + + plugin := &Plugin{ + endpoint: addr, + } + + conn, err := plugin.getOrCreateGRPCConn() + defer func() { + err := conn.Close() + if err != nil { + t.Error(err) + } + }() + if err != nil { + t.Fatal(err) + } + + // ensure the plugin we are using is registered + draPlugins.Set("dummy-plugin", plugin) + + // we call `NodePrepareResource` 2 times and check whether a new connection is created or the same is reused + for i := 0; i < 2; i++ { + wg.Add(1) + go func() { + defer wg.Done() + client, err := NewDRAPluginClient("dummy-plugin") + if err != nil { + t.Error(err) + return + } + + req := &drapbv1.NodePrepareResourcesRequest{ + Claims: []*drapbv1.Claim{ + { + Namespace: "dummy-namespace", + Uid: "dummy-uid", + Name: "dummy-claim", + ResourceHandle: "dummy-resource", + }, + }, + } + client.NodePrepareResources(context.TODO(), req) + + client.(*draPluginClient).plugin.Lock() + conn := client.(*draPluginClient).plugin.conn + client.(*draPluginClient).plugin.Unlock() + + m.Lock() + defer m.Unlock() + reusedConns[conn]++ + }() + } + + wg.Wait() + // We should have only one entry otherwise it means another gRPC connection has been created + if len(reusedConns) != 1 { + t.Errorf("expected length to be 1 but got %d", len(reusedConns)) + } + if counter, ok := reusedConns[conn]; ok && counter != 2 { + t.Errorf("expected counter to be 2 but got %d", counter) + } + + draPlugins.Delete("dummy-plugin") +} diff --git a/pkg/kubelet/cm/dra/plugin/plugins_store.go b/pkg/kubelet/cm/dra/plugin/plugins_store.go index c90bc758110..32f750af80d 100644 --- a/pkg/kubelet/cm/dra/plugin/plugins_store.go +++ b/pkg/kubelet/cm/dra/plugin/plugins_store.go @@ -18,8 +18,10 @@ package plugin import ( "context" + "errors" "net" "sync" + "time" "google.golang.org/grpc" "google.golang.org/grpc/connectivity" @@ -37,12 +39,16 @@ type Plugin struct { highestSupportedVersion *utilversion.Version } -func (p *Plugin) CreateGRPCConn() error { - network := "unix" - klog.V(4).InfoS(log("creating new gRPC connection"), "protocol", network, "endpoint", p.endpoint) - +func (p *Plugin) getOrCreateGRPCConn() (*grpc.ClientConn, error) { p.Lock() defer p.Unlock() + + if p.conn != nil { + return p.conn, nil + } + + network := "unix" + klog.V(4).InfoS(log("creating new gRPC connection"), "protocol", network, "endpoint", p.endpoint) conn, err := grpc.Dial( p.endpoint, grpc.WithTransportCredentials(insecure.NewCredentials()), @@ -51,23 +57,18 @@ func (p *Plugin) CreateGRPCConn() error { }), ) if err != nil { - return err + return nil, err + } + + ctx, cancel := context.WithTimeout(context.Background(), time.Second) + defer cancel() + + if ok := conn.WaitForStateChange(ctx, connectivity.Connecting); !ok { + return nil, errors.New("timed out waiting for gRPC connection to be ready") } p.conn = conn - return nil -} - -func (p *Plugin) GetGRPCConn() *grpc.ClientConn { - p.RLock() - defer p.RUnlock() - return p.conn -} - -func (p *Plugin) IsConnReady() bool { - p.RLock() - defer p.RUnlock() - return p.conn != nil && p.conn.GetState() == connectivity.Ready + return p.conn, nil } // PluginsStore holds a list of DRA Plugins.