From 7ffd3063cef280b6221ebe51bb1962ffb01f617e Mon Sep 17 00:00:00 2001 From: TommyStarK Date: Wed, 12 Jul 2023 21:02:32 +0200 Subject: [PATCH] dynamic resource allocation: reuse gRPC connection Signed-off-by: TommyStarK --- pkg/kubelet/cm/dra/plugin/client.go | 73 +++------------------- pkg/kubelet/cm/dra/plugin/plugin.go | 1 + pkg/kubelet/cm/dra/plugin/plugins_store.go | 41 ++++++++++++ 3 files changed, 52 insertions(+), 63 deletions(-) diff --git a/pkg/kubelet/cm/dra/plugin/client.go b/pkg/kubelet/cm/dra/plugin/client.go index a18dcba2172..b33620616d2 100644 --- a/pkg/kubelet/cm/dra/plugin/client.go +++ b/pkg/kubelet/cm/dra/plugin/client.go @@ -18,15 +18,11 @@ package plugin import ( "context" - "errors" "fmt" - "io" - "net" "time" "google.golang.org/grpc" grpccodes "google.golang.org/grpc/codes" - "google.golang.org/grpc/credentials/insecure" grpcstatus "google.golang.org/grpc/status" "k8s.io/klog/v2" @@ -36,39 +32,10 @@ import ( const PluginClientTimeout = 45 * time.Second -// Strongly typed address. -type draAddr string - // draPluginClient encapsulates all dra plugin methods. type draPluginClient struct { - pluginName string - addr draAddr - nodeClientCreator nodeClientCreator -} - -var _ drapb.NodeClient = &draPluginClient{} - -type nodeClientCreator func(addr draAddr) ( - nodeClient drapb.NodeClient, - nodeClientOld drapbv1alpha2.NodeClient, - closer io.Closer, - err error, -) - -// newNodeClient creates a new NodeClient with the internally used gRPC -// connection set up. It also returns a closer which must be called to close -// the gRPC connection when the NodeClient is not used anymore. -// This is the default implementation for the nodeClientCreator, used in -// newDRAPluginClient. -func newNodeClient(addr draAddr) (nodeClient drapb.NodeClient, nodeClientOld drapbv1alpha2.NodeClient, closer io.Closer, err error) { - var conn *grpc.ClientConn - - conn, err = newGrpcConn(addr) - if err != nil { - return nil, nil, nil, err - } - - return drapb.NewNodeClient(conn), drapbv1alpha2.NewNodeClient(conn), conn, nil + pluginName string + plugin *Plugin } func NewDRAPluginClient(pluginName string) (drapb.NodeClient, error) { @@ -82,9 +49,8 @@ func NewDRAPluginClient(pluginName string) (drapb.NodeClient, error) { } return &draPluginClient{ - pluginName: pluginName, - addr: draAddr(existingPlugin.endpoint), - nodeClientCreator: newNodeClient, + pluginName: pluginName, + plugin: existingPlugin, }, nil } @@ -97,15 +63,12 @@ func (r *draPluginClient) NodePrepareResources( logger.V(4).Info(log("calling NodePrepareResources rpc"), "request", req) defer logger.V(4).Info(log("done calling NodePrepareResources rpc"), "response", resp, "err", err) - if r.nodeClientCreator == nil { - return nil, errors.New("failed to call NodePrepareResources. nodeClientCreator is nil") - } - - nodeClient, nodeClientOld, closer, err := r.nodeClientCreator(r.addr) + conn, err := r.plugin.GetOrCreateGRPCConn() if err != nil { return nil, err } - defer closer.Close() + nodeClient := drapb.NewNodeClient(conn) + nodeClientOld := drapbv1alpha2.NewNodeClient(conn) ctx, cancel := context.WithTimeout(ctx, PluginClientTimeout) defer cancel() @@ -150,15 +113,12 @@ func (r *draPluginClient) NodeUnprepareResources( logger.V(4).Info(log("calling NodeUnprepareResource rpc"), "request", req) defer logger.V(4).Info(log("done calling NodeUnprepareResources rpc"), "response", resp, "err", err) - if r.nodeClientCreator == nil { - return nil, errors.New("failed to call NodeUnprepareResources. nodeClientCreator is nil") - } - - nodeClient, nodeClientOld, closer, err := r.nodeClientCreator(r.addr) + conn, err := r.plugin.GetOrCreateGRPCConn() if err != nil { return nil, err } - defer closer.Close() + nodeClient := drapb.NewNodeClient(conn) + nodeClientOld := drapbv1alpha2.NewNodeClient(conn) ctx, cancel := context.WithTimeout(ctx, PluginClientTimeout) defer cancel() @@ -191,16 +151,3 @@ func (r *draPluginClient) NodeUnprepareResources( return } - -func newGrpcConn(addr draAddr) (*grpc.ClientConn, error) { - network := "unix" - klog.V(4).InfoS(log("creating new gRPC connection"), "protocol", network, "endpoint", addr) - - return grpc.Dial( - string(addr), - grpc.WithTransportCredentials(insecure.NewCredentials()), - grpc.WithContextDialer(func(ctx context.Context, target string) (net.Conn, error) { - return (&net.Dialer{}).DialContext(ctx, network, target) - }), - ) -} diff --git a/pkg/kubelet/cm/dra/plugin/plugin.go b/pkg/kubelet/cm/dra/plugin/plugin.go index 0f2a1ff4cb4..b5eabe4e6bb 100644 --- a/pkg/kubelet/cm/dra/plugin/plugin.go +++ b/pkg/kubelet/cm/dra/plugin/plugin.go @@ -54,6 +54,7 @@ func (h *RegistrationHandler) RegisterPlugin(pluginName string, endpoint string, // Storing endpoint of newly registered DRA Plugin into the map, where plugin name will be the key // all other DRA components will be able to get the actual socket of DRA plugins by its name. draPlugins.Set(pluginName, &Plugin{ + conn: nil, endpoint: endpoint, highestSupportedVersion: highestSupportedVersion, }) diff --git a/pkg/kubelet/cm/dra/plugin/plugins_store.go b/pkg/kubelet/cm/dra/plugin/plugins_store.go index 90adb702ac7..c90bc758110 100644 --- a/pkg/kubelet/cm/dra/plugin/plugins_store.go +++ b/pkg/kubelet/cm/dra/plugin/plugins_store.go @@ -17,18 +17,59 @@ limitations under the License. package plugin import ( + "context" + "net" "sync" + "google.golang.org/grpc" + "google.golang.org/grpc/connectivity" + "google.golang.org/grpc/credentials/insecure" utilversion "k8s.io/apimachinery/pkg/util/version" + "k8s.io/klog/v2" ) // Plugin is a description of a DRA Plugin, defined by an endpoint // and the highest DRA version supported. type Plugin struct { + sync.RWMutex + conn *grpc.ClientConn endpoint string highestSupportedVersion *utilversion.Version } +func (p *Plugin) CreateGRPCConn() error { + network := "unix" + klog.V(4).InfoS(log("creating new gRPC connection"), "protocol", network, "endpoint", p.endpoint) + + p.Lock() + defer p.Unlock() + conn, err := grpc.Dial( + p.endpoint, + grpc.WithTransportCredentials(insecure.NewCredentials()), + grpc.WithContextDialer(func(ctx context.Context, target string) (net.Conn, error) { + return (&net.Dialer{}).DialContext(ctx, network, target) + }), + ) + if err != nil { + return err + } + + p.conn = conn + return nil +} + +func (p *Plugin) GetGRPCConn() *grpc.ClientConn { + p.RLock() + defer p.RUnlock() + return p.conn +} + +func (p *Plugin) IsConnReady() bool { + p.RLock() + defer p.RUnlock() + return p.conn != nil && p.conn.GetState() == connectivity.Ready +} + // PluginsStore holds a list of DRA Plugins. type PluginsStore struct { sync.RWMutex