dynamic resource allocation: reuse gRPC connection

Signed-off-by: TommyStarK <thomasmilox@gmail.com>
This commit is contained in:
TommyStarK 2023-07-12 21:02:32 +02:00
parent 90c362b343
commit 7ffd3063ce
3 changed files with 52 additions and 63 deletions

View File

@ -18,15 +18,11 @@ package plugin
import (
"context"
"errors"
"fmt"
"io"
"net"
"time"
"google.golang.org/grpc"
grpccodes "google.golang.org/grpc/codes"
"google.golang.org/grpc/credentials/insecure"
grpcstatus "google.golang.org/grpc/status"
"k8s.io/klog/v2"
@ -36,39 +32,10 @@ import (
const PluginClientTimeout = 45 * time.Second
// Strongly typed address.
type draAddr string
// draPluginClient encapsulates all dra plugin methods.
type draPluginClient struct {
pluginName string
addr draAddr
nodeClientCreator nodeClientCreator
}
var _ drapb.NodeClient = &draPluginClient{}
type nodeClientCreator func(addr draAddr) (
nodeClient drapb.NodeClient,
nodeClientOld drapbv1alpha2.NodeClient,
closer io.Closer,
err error,
)
// newNodeClient creates a new NodeClient with the internally used gRPC
// connection set up. It also returns a closer which must be called to close
// the gRPC connection when the NodeClient is not used anymore.
// This is the default implementation for the nodeClientCreator, used in
// newDRAPluginClient.
func newNodeClient(addr draAddr) (nodeClient drapb.NodeClient, nodeClientOld drapbv1alpha2.NodeClient, closer io.Closer, err error) {
var conn *grpc.ClientConn
conn, err = newGrpcConn(addr)
if err != nil {
return nil, nil, nil, err
}
return drapb.NewNodeClient(conn), drapbv1alpha2.NewNodeClient(conn), conn, nil
plugin *Plugin
}
func NewDRAPluginClient(pluginName string) (drapb.NodeClient, error) {
@ -83,8 +50,7 @@ func NewDRAPluginClient(pluginName string) (drapb.NodeClient, error) {
return &draPluginClient{
pluginName: pluginName,
addr: draAddr(existingPlugin.endpoint),
nodeClientCreator: newNodeClient,
plugin: existingPlugin,
}, nil
}
@ -97,15 +63,12 @@ func (r *draPluginClient) NodePrepareResources(
logger.V(4).Info(log("calling NodePrepareResources rpc"), "request", req)
defer logger.V(4).Info(log("done calling NodePrepareResources rpc"), "response", resp, "err", err)
if r.nodeClientCreator == nil {
return nil, errors.New("failed to call NodePrepareResources. nodeClientCreator is nil")
}
nodeClient, nodeClientOld, closer, err := r.nodeClientCreator(r.addr)
conn, err := r.plugin.GetOrCreateGRPCConn()
if err != nil {
return nil, err
}
defer closer.Close()
nodeClient := drapb.NewNodeClient(conn)
nodeClientOld := drapbv1alpha2.NewNodeClient(conn)
ctx, cancel := context.WithTimeout(ctx, PluginClientTimeout)
defer cancel()
@ -150,15 +113,12 @@ func (r *draPluginClient) NodeUnprepareResources(
logger.V(4).Info(log("calling NodeUnprepareResource rpc"), "request", req)
defer logger.V(4).Info(log("done calling NodeUnprepareResources rpc"), "response", resp, "err", err)
if r.nodeClientCreator == nil {
return nil, errors.New("failed to call NodeUnprepareResources. nodeClientCreator is nil")
}
nodeClient, nodeClientOld, closer, err := r.nodeClientCreator(r.addr)
conn, err := r.plugin.GetOrCreateGRPCConn()
if err != nil {
return nil, err
}
defer closer.Close()
nodeClient := drapb.NewNodeClient(conn)
nodeClientOld := drapbv1alpha2.NewNodeClient(conn)
ctx, cancel := context.WithTimeout(ctx, PluginClientTimeout)
defer cancel()
@ -191,16 +151,3 @@ func (r *draPluginClient) NodeUnprepareResources(
return
}
func newGrpcConn(addr draAddr) (*grpc.ClientConn, error) {
network := "unix"
klog.V(4).InfoS(log("creating new gRPC connection"), "protocol", network, "endpoint", addr)
return grpc.Dial(
string(addr),
grpc.WithTransportCredentials(insecure.NewCredentials()),
grpc.WithContextDialer(func(ctx context.Context, target string) (net.Conn, error) {
return (&net.Dialer{}).DialContext(ctx, network, target)
}),
)
}

View File

@ -54,6 +54,7 @@ func (h *RegistrationHandler) RegisterPlugin(pluginName string, endpoint string,
// Storing endpoint of newly registered DRA Plugin into the map, where plugin name will be the key
// all other DRA components will be able to get the actual socket of DRA plugins by its name.
draPlugins.Set(pluginName, &Plugin{
conn: nil,
endpoint: endpoint,
highestSupportedVersion: highestSupportedVersion,
})

View File

@ -17,18 +17,59 @@ limitations under the License.
package plugin
import (
"context"
"net"
"sync"
"google.golang.org/grpc"
"google.golang.org/grpc/connectivity"
"google.golang.org/grpc/credentials/insecure"
utilversion "k8s.io/apimachinery/pkg/util/version"
"k8s.io/klog/v2"
)
// Plugin is a description of a DRA Plugin, defined by an endpoint
// and the highest DRA version supported.
type Plugin struct {
sync.RWMutex
conn *grpc.ClientConn
endpoint string
highestSupportedVersion *utilversion.Version
}
func (p *Plugin) CreateGRPCConn() error {
network := "unix"
klog.V(4).InfoS(log("creating new gRPC connection"), "protocol", network, "endpoint", p.endpoint)
p.Lock()
defer p.Unlock()
conn, err := grpc.Dial(
p.endpoint,
grpc.WithTransportCredentials(insecure.NewCredentials()),
grpc.WithContextDialer(func(ctx context.Context, target string) (net.Conn, error) {
return (&net.Dialer{}).DialContext(ctx, network, target)
}),
)
if err != nil {
return err
}
p.conn = conn
return nil
}
func (p *Plugin) GetGRPCConn() *grpc.ClientConn {
p.RLock()
defer p.RUnlock()
return p.conn
}
func (p *Plugin) IsConnReady() bool {
p.RLock()
defer p.RUnlock()
return p.conn != nil && p.conn.GetState() == connectivity.Ready
}
// PluginsStore holds a list of DRA Plugins.
type PluginsStore struct {
sync.RWMutex