Merge pull request #118619 from TommyStarK/gh_113832

dynamic resource allocation: reuse gRPC connection
This commit is contained in:
Kubernetes Prow Robot 2023-08-16 09:32:27 -07:00 committed by GitHub
commit 19deb04a90
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 202 additions and 63 deletions

View File

@ -18,15 +18,11 @@ package plugin
import (
"context"
"errors"
"fmt"
"io"
"net"
"time"
"google.golang.org/grpc"
grpccodes "google.golang.org/grpc/codes"
"google.golang.org/grpc/credentials/insecure"
grpcstatus "google.golang.org/grpc/status"
"k8s.io/klog/v2"
@ -36,39 +32,10 @@ import (
const PluginClientTimeout = 45 * time.Second
// Strongly typed address.
type draAddr string
// draPluginClient encapsulates all dra plugin methods.
type draPluginClient struct {
pluginName string
addr draAddr
nodeClientCreator nodeClientCreator
}
var _ drapb.NodeClient = &draPluginClient{}
type nodeClientCreator func(addr draAddr) (
nodeClient drapb.NodeClient,
nodeClientOld drapbv1alpha2.NodeClient,
closer io.Closer,
err error,
)
// newNodeClient creates a new NodeClient with the internally used gRPC
// connection set up. It also returns a closer which must be called to close
// the gRPC connection when the NodeClient is not used anymore.
// This is the default implementation for the nodeClientCreator, used in
// newDRAPluginClient.
func newNodeClient(addr draAddr) (nodeClient drapb.NodeClient, nodeClientOld drapbv1alpha2.NodeClient, closer io.Closer, err error) {
var conn *grpc.ClientConn
conn, err = newGrpcConn(addr)
if err != nil {
return nil, nil, nil, err
}
return drapb.NewNodeClient(conn), drapbv1alpha2.NewNodeClient(conn), conn, nil
plugin *Plugin
}
func NewDRAPluginClient(pluginName string) (drapb.NodeClient, error) {
@ -83,8 +50,7 @@ func NewDRAPluginClient(pluginName string) (drapb.NodeClient, error) {
return &draPluginClient{
pluginName: pluginName,
addr: draAddr(existingPlugin.endpoint),
nodeClientCreator: newNodeClient,
plugin: existingPlugin,
}, nil
}
@ -97,15 +63,12 @@ func (r *draPluginClient) NodePrepareResources(
logger.V(4).Info(log("calling NodePrepareResources rpc"), "request", req)
defer logger.V(4).Info(log("done calling NodePrepareResources rpc"), "response", resp, "err", err)
if r.nodeClientCreator == nil {
return nil, errors.New("failed to call NodePrepareResources. nodeClientCreator is nil")
}
nodeClient, nodeClientOld, closer, err := r.nodeClientCreator(r.addr)
conn, err := r.plugin.getOrCreateGRPCConn()
if err != nil {
return nil, err
}
defer closer.Close()
nodeClient := drapb.NewNodeClient(conn)
nodeClientOld := drapbv1alpha2.NewNodeClient(conn)
ctx, cancel := context.WithTimeout(ctx, PluginClientTimeout)
defer cancel()
@ -150,15 +113,12 @@ func (r *draPluginClient) NodeUnprepareResources(
logger.V(4).Info(log("calling NodeUnprepareResource rpc"), "request", req)
defer logger.V(4).Info(log("done calling NodeUnprepareResources rpc"), "response", resp, "err", err)
if r.nodeClientCreator == nil {
return nil, errors.New("failed to call NodeUnprepareResources. nodeClientCreator is nil")
}
nodeClient, nodeClientOld, closer, err := r.nodeClientCreator(r.addr)
conn, err := r.plugin.getOrCreateGRPCConn()
if err != nil {
return nil, err
}
defer closer.Close()
nodeClient := drapb.NewNodeClient(conn)
nodeClientOld := drapbv1alpha2.NewNodeClient(conn)
ctx, cancel := context.WithTimeout(ctx, PluginClientTimeout)
defer cancel()
@ -191,16 +151,3 @@ func (r *draPluginClient) NodeUnprepareResources(
return
}
func newGrpcConn(addr draAddr) (*grpc.ClientConn, error) {
network := "unix"
klog.V(4).InfoS(log("creating new gRPC connection"), "protocol", network, "endpoint", addr)
return grpc.Dial(
string(addr),
grpc.WithTransportCredentials(insecure.NewCredentials()),
grpc.WithContextDialer(func(ctx context.Context, target string) (net.Conn, error) {
return (&net.Dialer{}).DialContext(ctx, network, target)
}),
)
}

View File

@ -0,0 +1,149 @@
/*
Copyright 2023 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package plugin
import (
"context"
"net"
"os"
"path/filepath"
"sync"
"testing"
"google.golang.org/grpc"
drapbv1 "k8s.io/kubelet/pkg/apis/dra/v1alpha3"
)
type fakeGRPCServer struct {
drapbv1.UnimplementedNodeServer
}
func (f *fakeGRPCServer) NodePrepareResource(ctx context.Context, in *drapbv1.NodePrepareResourcesRequest) (*drapbv1.NodePrepareResourcesResponse, error) {
return &drapbv1.NodePrepareResourcesResponse{Claims: map[string]*drapbv1.NodePrepareResourceResponse{"dummy": {CDIDevices: []string{"dummy"}}}}, nil
}
func (f *fakeGRPCServer) NodeUnprepareResource(ctx context.Context, in *drapbv1.NodeUnprepareResourcesRequest) (*drapbv1.NodeUnprepareResourcesResponse, error) {
return &drapbv1.NodeUnprepareResourcesResponse{}, nil
}
type tearDown func()
func setupFakeGRPCServer() (string, tearDown, error) {
p, err := os.MkdirTemp("", "dra_plugin")
if err != nil {
return "", nil, err
}
closeCh := make(chan struct{})
addr := filepath.Join(p, "server.sock")
teardown := func() {
close(closeCh)
os.RemoveAll(addr)
}
listener, err := net.Listen("unix", addr)
if err != nil {
teardown()
return "", nil, err
}
s := grpc.NewServer()
fakeGRPCServer := &fakeGRPCServer{}
drapbv1.RegisterNodeServer(s, fakeGRPCServer)
go func() {
go s.Serve(listener)
<-closeCh
s.GracefulStop()
}()
return addr, teardown, nil
}
func TestGRPCConnIsReused(t *testing.T) {
addr, teardown, err := setupFakeGRPCServer()
if err != nil {
t.Fatal(err)
}
defer teardown()
reusedConns := make(map[*grpc.ClientConn]int)
wg := sync.WaitGroup{}
m := sync.Mutex{}
plugin := &Plugin{
endpoint: addr,
}
conn, err := plugin.getOrCreateGRPCConn()
defer func() {
err := conn.Close()
if err != nil {
t.Error(err)
}
}()
if err != nil {
t.Fatal(err)
}
// ensure the plugin we are using is registered
draPlugins.Set("dummy-plugin", plugin)
// we call `NodePrepareResource` 2 times and check whether a new connection is created or the same is reused
for i := 0; i < 2; i++ {
wg.Add(1)
go func() {
defer wg.Done()
client, err := NewDRAPluginClient("dummy-plugin")
if err != nil {
t.Error(err)
return
}
req := &drapbv1.NodePrepareResourcesRequest{
Claims: []*drapbv1.Claim{
{
Namespace: "dummy-namespace",
Uid: "dummy-uid",
Name: "dummy-claim",
ResourceHandle: "dummy-resource",
},
},
}
client.NodePrepareResources(context.TODO(), req)
client.(*draPluginClient).plugin.Lock()
conn := client.(*draPluginClient).plugin.conn
client.(*draPluginClient).plugin.Unlock()
m.Lock()
defer m.Unlock()
reusedConns[conn]++
}()
}
wg.Wait()
// We should have only one entry otherwise it means another gRPC connection has been created
if len(reusedConns) != 1 {
t.Errorf("expected length to be 1 but got %d", len(reusedConns))
}
if counter, ok := reusedConns[conn]; ok && counter != 2 {
t.Errorf("expected counter to be 2 but got %d", counter)
}
draPlugins.Delete("dummy-plugin")
}

View File

@ -54,6 +54,7 @@ func (h *RegistrationHandler) RegisterPlugin(pluginName string, endpoint string,
// Storing endpoint of newly registered DRA Plugin into the map, where plugin name will be the key
// all other DRA components will be able to get the actual socket of DRA plugins by its name.
draPlugins.Set(pluginName, &Plugin{
conn: nil,
endpoint: endpoint,
highestSupportedVersion: highestSupportedVersion,
})

View File

@ -17,18 +17,60 @@ limitations under the License.
package plugin
import (
"context"
"errors"
"net"
"sync"
"time"
"google.golang.org/grpc"
"google.golang.org/grpc/connectivity"
"google.golang.org/grpc/credentials/insecure"
utilversion "k8s.io/apimachinery/pkg/util/version"
"k8s.io/klog/v2"
)
// Plugin is a description of a DRA Plugin, defined by an endpoint
// and the highest DRA version supported.
type Plugin struct {
sync.RWMutex
conn *grpc.ClientConn
endpoint string
highestSupportedVersion *utilversion.Version
}
func (p *Plugin) getOrCreateGRPCConn() (*grpc.ClientConn, error) {
p.Lock()
defer p.Unlock()
if p.conn != nil {
return p.conn, nil
}
network := "unix"
klog.V(4).InfoS(log("creating new gRPC connection"), "protocol", network, "endpoint", p.endpoint)
conn, err := grpc.Dial(
p.endpoint,
grpc.WithTransportCredentials(insecure.NewCredentials()),
grpc.WithContextDialer(func(ctx context.Context, target string) (net.Conn, error) {
return (&net.Dialer{}).DialContext(ctx, network, target)
}),
)
if err != nil {
return nil, err
}
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()
if ok := conn.WaitForStateChange(ctx, connectivity.Connecting); !ok {
return nil, errors.New("timed out waiting for gRPC connection to be ready")
}
p.conn = conn
return p.conn, nil
}
// PluginsStore holds a list of DRA Plugins.
type PluginsStore struct {
sync.RWMutex