From 7b33495d9db7be16f6f2c98044b1ef030ea49df8 Mon Sep 17 00:00:00 2001 From: carlory Date: Tue, 27 Aug 2024 00:54:37 +0800 Subject: [PATCH 1/2] DRA: rename pkg/cm/dra/plugin files --- pkg/kubelet/cm/dra/plugin/client.go | 146 --------- pkg/kubelet/cm/dra/plugin/client_test.go | 279 ----------------- pkg/kubelet/cm/dra/plugin/plugin.go | 296 ++++++------------ pkg/kubelet/cm/dra/plugin/plugin_test.go | 272 +++++++++++++--- pkg/kubelet/cm/dra/plugin/registration.go | 234 ++++++++++++++ .../cm/dra/plugin/registration_test.go | 91 ++++++ 6 files changed, 659 insertions(+), 659 deletions(-) delete mode 100644 pkg/kubelet/cm/dra/plugin/client.go delete mode 100644 pkg/kubelet/cm/dra/plugin/client_test.go create mode 100644 pkg/kubelet/cm/dra/plugin/registration.go create mode 100644 pkg/kubelet/cm/dra/plugin/registration_test.go diff --git a/pkg/kubelet/cm/dra/plugin/client.go b/pkg/kubelet/cm/dra/plugin/client.go deleted file mode 100644 index 01a9de105c8..00000000000 --- a/pkg/kubelet/cm/dra/plugin/client.go +++ /dev/null @@ -1,146 +0,0 @@ -/* -Copyright 2022 The Kubernetes Authors. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package plugin - -import ( - "context" - "errors" - "fmt" - "net" - "sync" - "time" - - "google.golang.org/grpc" - "google.golang.org/grpc/connectivity" - "google.golang.org/grpc/credentials/insecure" - - utilversion "k8s.io/apimachinery/pkg/util/version" - "k8s.io/klog/v2" - drapb "k8s.io/kubelet/pkg/apis/dra/v1alpha4" -) - -// NewDRAPluginClient returns a wrapper around those gRPC methods of a DRA -// driver kubelet plugin which need to be called by kubelet. The wrapper -// handles gRPC connection management and logging. Connections are reused -// across different NewDRAPluginClient calls. -func NewDRAPluginClient(pluginName string) (*Plugin, error) { - if pluginName == "" { - return nil, fmt.Errorf("plugin name is empty") - } - - existingPlugin := draPlugins.get(pluginName) - if existingPlugin == nil { - return nil, fmt.Errorf("plugin name %s not found in the list of registered DRA plugins", pluginName) - } - - return existingPlugin, nil -} - -type Plugin struct { - backgroundCtx context.Context - cancel func(cause error) - - mutex sync.Mutex - conn *grpc.ClientConn - endpoint string - highestSupportedVersion *utilversion.Version - clientCallTimeout time.Duration -} - -func (p *Plugin) getOrCreateGRPCConn() (*grpc.ClientConn, error) { - p.mutex.Lock() - defer p.mutex.Unlock() - - if p.conn != nil { - return p.conn, nil - } - - ctx := p.backgroundCtx - logger := klog.FromContext(ctx) - - network := "unix" - logger.V(4).Info("Creating new gRPC connection", "protocol", network, "endpoint", p.endpoint) - // grpc.Dial is deprecated. grpc.NewClient should be used instead. - // For now this gets ignored because this function is meant to establish - // the connection, with the one second timeout below. Perhaps that - // approach should be reconsidered? - //nolint:staticcheck - conn, err := grpc.Dial( - p.endpoint, - grpc.WithTransportCredentials(insecure.NewCredentials()), - grpc.WithContextDialer(func(ctx context.Context, target string) (net.Conn, error) { - return (&net.Dialer{}).DialContext(ctx, network, target) - }), - ) - if err != nil { - return nil, err - } - - ctx, cancel := context.WithTimeout(context.Background(), time.Second) - defer cancel() - - if ok := conn.WaitForStateChange(ctx, connectivity.Connecting); !ok { - return nil, errors.New("timed out waiting for gRPC connection to be ready") - } - - p.conn = conn - return p.conn, nil -} - -func (p *Plugin) NodePrepareResources( - ctx context.Context, - req *drapb.NodePrepareResourcesRequest, - opts ...grpc.CallOption, -) (*drapb.NodePrepareResourcesResponse, error) { - logger := klog.FromContext(ctx) - logger.V(4).Info("Calling NodePrepareResources rpc", "request", req) - - conn, err := p.getOrCreateGRPCConn() - if err != nil { - return nil, err - } - - ctx, cancel := context.WithTimeout(ctx, p.clientCallTimeout) - defer cancel() - - nodeClient := drapb.NewNodeClient(conn) - response, err := nodeClient.NodePrepareResources(ctx, req) - logger.V(4).Info("Done calling NodePrepareResources rpc", "response", response, "err", err) - return response, err -} - -func (p *Plugin) NodeUnprepareResources( - ctx context.Context, - req *drapb.NodeUnprepareResourcesRequest, - opts ...grpc.CallOption, -) (*drapb.NodeUnprepareResourcesResponse, error) { - logger := klog.FromContext(ctx) - logger.V(4).Info("Calling NodeUnprepareResource rpc", "request", req) - - conn, err := p.getOrCreateGRPCConn() - if err != nil { - return nil, err - } - - ctx, cancel := context.WithTimeout(ctx, p.clientCallTimeout) - defer cancel() - - nodeClient := drapb.NewNodeClient(conn) - response, err := nodeClient.NodeUnprepareResources(ctx, req) - logger.V(4).Info("Done calling NodeUnprepareResources rpc", "response", response, "err", err) - return response, err -} diff --git a/pkg/kubelet/cm/dra/plugin/client_test.go b/pkg/kubelet/cm/dra/plugin/client_test.go deleted file mode 100644 index d78ed6bd393..00000000000 --- a/pkg/kubelet/cm/dra/plugin/client_test.go +++ /dev/null @@ -1,279 +0,0 @@ -/* -Copyright 2023 The Kubernetes Authors. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package plugin - -import ( - "context" - "fmt" - "net" - "os" - "path/filepath" - "sync" - "testing" - - "github.com/stretchr/testify/assert" - "google.golang.org/grpc" - drapb "k8s.io/kubelet/pkg/apis/dra/v1alpha4" - "k8s.io/kubernetes/test/utils/ktesting" -) - -const ( - v1alpha4Version = "v1alpha4" -) - -type fakeV1alpha4GRPCServer struct { - drapb.UnimplementedNodeServer -} - -var _ drapb.NodeServer = &fakeV1alpha4GRPCServer{} - -func (f *fakeV1alpha4GRPCServer) NodePrepareResources(ctx context.Context, in *drapb.NodePrepareResourcesRequest) (*drapb.NodePrepareResourcesResponse, error) { - return &drapb.NodePrepareResourcesResponse{Claims: map[string]*drapb.NodePrepareResourceResponse{"claim-uid": { - Devices: []*drapb.Device{ - { - RequestNames: []string{"test-request"}, - CDIDeviceIDs: []string{"test-cdi-id"}, - }, - }, - }}}, nil -} - -func (f *fakeV1alpha4GRPCServer) NodeUnprepareResources(ctx context.Context, in *drapb.NodeUnprepareResourcesRequest) (*drapb.NodeUnprepareResourcesResponse, error) { - - return &drapb.NodeUnprepareResourcesResponse{}, nil -} - -type tearDown func() - -func setupFakeGRPCServer(version string) (string, tearDown, error) { - p, err := os.MkdirTemp("", "dra_plugin") - if err != nil { - return "", nil, err - } - - closeCh := make(chan struct{}) - addr := filepath.Join(p, "server.sock") - teardown := func() { - close(closeCh) - os.RemoveAll(addr) - } - - listener, err := net.Listen("unix", addr) - if err != nil { - teardown() - return "", nil, err - } - - s := grpc.NewServer() - switch version { - case v1alpha4Version: - fakeGRPCServer := &fakeV1alpha4GRPCServer{} - drapb.RegisterNodeServer(s, fakeGRPCServer) - default: - return "", nil, fmt.Errorf("unsupported version: %s", version) - } - - go func() { - go s.Serve(listener) - <-closeCh - s.GracefulStop() - }() - - return addr, teardown, nil -} - -func TestGRPCConnIsReused(t *testing.T) { - tCtx := ktesting.Init(t) - addr, teardown, err := setupFakeGRPCServer(v1alpha4Version) - if err != nil { - t.Fatal(err) - } - defer teardown() - - reusedConns := make(map[*grpc.ClientConn]int) - wg := sync.WaitGroup{} - m := sync.Mutex{} - - p := &Plugin{ - backgroundCtx: tCtx, - endpoint: addr, - clientCallTimeout: defaultClientCallTimeout, - } - - conn, err := p.getOrCreateGRPCConn() - defer func() { - err := conn.Close() - if err != nil { - t.Error(err) - } - }() - if err != nil { - t.Fatal(err) - } - - // ensure the plugin we are using is registered - draPlugins.add("dummy-plugin", p) - defer draPlugins.delete("dummy-plugin") - - // we call `NodePrepareResource` 2 times and check whether a new connection is created or the same is reused - for i := 0; i < 2; i++ { - wg.Add(1) - go func() { - defer wg.Done() - client, err := NewDRAPluginClient("dummy-plugin") - if err != nil { - t.Error(err) - return - } - - req := &drapb.NodePrepareResourcesRequest{ - Claims: []*drapb.Claim{ - { - Namespace: "dummy-namespace", - UID: "dummy-uid", - Name: "dummy-claim", - }, - }, - } - - _, err = client.NodePrepareResources(tCtx, req) - assert.NoError(t, err) - - client.mutex.Lock() - conn := client.conn - client.mutex.Unlock() - - m.Lock() - defer m.Unlock() - reusedConns[conn]++ - }() - } - - wg.Wait() - // We should have only one entry otherwise it means another gRPC connection has been created - if len(reusedConns) != 1 { - t.Errorf("expected length to be 1 but got %d", len(reusedConns)) - } - if counter, ok := reusedConns[conn]; ok && counter != 2 { - t.Errorf("expected counter to be 2 but got %d", counter) - } -} - -func TestNewDRAPluginClient(t *testing.T) { - for _, test := range []struct { - description string - setup func(string) tearDown - pluginName string - shouldError bool - }{ - { - description: "plugin name is empty", - setup: func(_ string) tearDown { - return func() {} - }, - pluginName: "", - shouldError: true, - }, - { - description: "plugin name not found in the list", - setup: func(_ string) tearDown { - return func() {} - }, - pluginName: "plugin-name-not-found-in-the-list", - shouldError: true, - }, - { - description: "plugin exists", - setup: func(name string) tearDown { - draPlugins.add(name, &Plugin{}) - return func() { - draPlugins.delete(name) - } - }, - pluginName: "dummy-plugin", - }, - } { - t.Run(test.description, func(t *testing.T) { - teardown := test.setup(test.pluginName) - defer teardown() - - client, err := NewDRAPluginClient(test.pluginName) - if test.shouldError { - assert.Nil(t, client) - assert.Error(t, err) - } else { - assert.NotNil(t, client) - assert.Nil(t, err) - } - }) - } -} - -func TestNodeUnprepareResources(t *testing.T) { - for _, test := range []struct { - description string - serverSetup func(string) (string, tearDown, error) - serverVersion string - request *drapb.NodeUnprepareResourcesRequest - }{ - { - description: "server supports v1alpha4", - serverSetup: setupFakeGRPCServer, - serverVersion: v1alpha4Version, - request: &drapb.NodeUnprepareResourcesRequest{}, - }, - } { - t.Run(test.description, func(t *testing.T) { - tCtx := ktesting.Init(t) - addr, teardown, err := setupFakeGRPCServer(test.serverVersion) - if err != nil { - t.Fatal(err) - } - defer teardown() - - p := &Plugin{ - backgroundCtx: tCtx, - endpoint: addr, - clientCallTimeout: defaultClientCallTimeout, - } - - conn, err := p.getOrCreateGRPCConn() - defer func() { - err := conn.Close() - if err != nil { - t.Error(err) - } - }() - if err != nil { - t.Fatal(err) - } - - draPlugins.add("dummy-plugin", p) - defer draPlugins.delete("dummy-plugin") - - client, err := NewDRAPluginClient("dummy-plugin") - if err != nil { - t.Fatal(err) - } - - _, err = client.NodeUnprepareResources(tCtx, test.request) - if err != nil { - t.Fatal(err) - } - }) - } -} diff --git a/pkg/kubelet/cm/dra/plugin/plugin.go b/pkg/kubelet/cm/dra/plugin/plugin.go index 99e577a4259..01a9de105c8 100644 --- a/pkg/kubelet/cm/dra/plugin/plugin.go +++ b/pkg/kubelet/cm/dra/plugin/plugin.go @@ -20,215 +20,127 @@ import ( "context" "errors" "fmt" + "net" + "sync" "time" - v1 "k8s.io/api/core/v1" - resourceapi "k8s.io/api/resource/v1alpha3" - apierrors "k8s.io/apimachinery/pkg/api/errors" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/fields" + "google.golang.org/grpc" + "google.golang.org/grpc/connectivity" + "google.golang.org/grpc/credentials/insecure" + utilversion "k8s.io/apimachinery/pkg/util/version" - "k8s.io/apimachinery/pkg/util/wait" - "k8s.io/client-go/kubernetes" "k8s.io/klog/v2" - "k8s.io/kubernetes/pkg/kubelet/pluginmanager/cache" + drapb "k8s.io/kubelet/pkg/apis/dra/v1alpha4" ) -// defaultClientCallTimeout is the default amount of time that a DRA driver has -// to respond to any of the gRPC calls. kubelet uses this value by passing nil -// to RegisterPlugin. Some tests use a different, usually shorter timeout to -// speed up testing. -// -// This is half of the kubelet retry period (according to -// https://github.com/kubernetes/kubernetes/commit/0449cef8fd5217d394c5cd331d852bd50983e6b3). -const defaultClientCallTimeout = 45 * time.Second - -// RegistrationHandler is the handler which is fed to the pluginwatcher API. -type RegistrationHandler struct { - // backgroundCtx is used for all future activities of the handler. - // This is necessary because it implements APIs which don't - // provide a context. - backgroundCtx context.Context - kubeClient kubernetes.Interface - getNode func() (*v1.Node, error) -} - -var _ cache.PluginHandler = &RegistrationHandler{} - -// NewPluginHandler returns new registration handler. -// -// Must only be called once per process because it manages global state. -// If a kubeClient is provided, then it synchronizes ResourceSlices -// with the resource information provided by plugins. -func NewRegistrationHandler(kubeClient kubernetes.Interface, getNode func() (*v1.Node, error)) *RegistrationHandler { - handler := &RegistrationHandler{ - // The context and thus logger should come from the caller. - backgroundCtx: klog.NewContext(context.TODO(), klog.LoggerWithName(klog.TODO(), "DRA registration handler")), - kubeClient: kubeClient, - getNode: getNode, - } - - // When kubelet starts up, no DRA driver has registered yet. None of - // the drivers are usable until they come back, which might not happen - // at all. Therefore it is better to not advertise any local resources - // because pods could get stuck on the node waiting for the driver - // to start up. - // - // This has to run in the background. - go handler.wipeResourceSlices("") - - return handler -} - -// wipeResourceSlices deletes ResourceSlices of the node, optionally just for a specific driver. -func (h *RegistrationHandler) wipeResourceSlices(driver string) { - if h.kubeClient == nil { - return - } - ctx := h.backgroundCtx - logger := klog.FromContext(ctx) - - backoff := wait.Backoff{ - Duration: time.Second, - Factor: 2, - Jitter: 0.2, - Cap: 5 * time.Minute, - Steps: 100, - } - - // Error logging is done inside the loop. Context cancellation doesn't get logged. - _ = wait.ExponentialBackoffWithContext(ctx, backoff, func(ctx context.Context) (bool, error) { - node, err := h.getNode() - if apierrors.IsNotFound(err) { - return false, nil - } - if err != nil { - logger.Error(err, "Unexpected error checking for node") - return false, nil - } - fieldSelector := fields.Set{resourceapi.ResourceSliceSelectorNodeName: node.Name} - if driver != "" { - fieldSelector[resourceapi.ResourceSliceSelectorDriver] = driver - } - - err = h.kubeClient.ResourceV1alpha3().ResourceSlices().DeleteCollection(ctx, metav1.DeleteOptions{}, metav1.ListOptions{FieldSelector: fieldSelector.String()}) - switch { - case err == nil: - logger.V(3).Info("Deleted ResourceSlices", "fieldSelector", fieldSelector) - return true, nil - case apierrors.IsUnauthorized(err): - // This can happen while kubelet is still figuring out - // its credentials. - logger.V(5).Info("Deleting ResourceSlice failed, retrying", "fieldSelector", fieldSelector, "err", err) - return false, nil - default: - // Log and retry for other errors. - logger.V(3).Info("Deleting ResourceSlice failed, retrying", "fieldSelector", fieldSelector, "err", err) - return false, nil - } - }) -} - -// RegisterPlugin is called when a plugin can be registered. -func (h *RegistrationHandler) RegisterPlugin(pluginName string, endpoint string, versions []string, pluginClientTimeout *time.Duration) error { - // Prepare a context with its own logger for the plugin. - // - // The lifecycle of the plugin's background activities is tied to our - // root context, so canceling that will also cancel the plugin. - // - // The logger injects the plugin name as additional value - // into all log output related to the plugin. - ctx := h.backgroundCtx - logger := klog.FromContext(ctx) - logger = klog.LoggerWithValues(logger, "pluginName", pluginName) - ctx = klog.NewContext(ctx, logger) - - logger.V(3).Info("Register new DRA plugin", "endpoint", endpoint) - - highestSupportedVersion, err := h.validateVersions(pluginName, versions) - if err != nil { - return fmt.Errorf("version check of plugin %s failed: %w", pluginName, err) - } - - var timeout time.Duration - if pluginClientTimeout == nil { - timeout = defaultClientCallTimeout - } else { - timeout = *pluginClientTimeout - } - - ctx, cancel := context.WithCancelCause(ctx) - - pluginInstance := &Plugin{ - backgroundCtx: ctx, - cancel: cancel, - conn: nil, - endpoint: endpoint, - highestSupportedVersion: highestSupportedVersion, - clientCallTimeout: timeout, - } - - // Storing endpoint of newly registered DRA Plugin into the map, where plugin name will be the key - // all other DRA components will be able to get the actual socket of DRA plugins by its name. - if draPlugins.add(pluginName, pluginInstance) { - logger.V(1).Info("Already registered, previous plugin was replaced") - } - - return nil -} - -func (h *RegistrationHandler) validateVersions( - pluginName string, - versions []string, -) (*utilversion.Version, error) { - if len(versions) == 0 { - return nil, errors.New("empty list for supported versions") - } - - // Validate version - newPluginHighestVersion, err := utilversion.HighestSupportedVersion(versions) - if err != nil { - // HighestSupportedVersion includes the list of versions in its error - // if relevant, no need to repeat it here. - return nil, fmt.Errorf("none of the versions are supported: %w", err) +// NewDRAPluginClient returns a wrapper around those gRPC methods of a DRA +// driver kubelet plugin which need to be called by kubelet. The wrapper +// handles gRPC connection management and logging. Connections are reused +// across different NewDRAPluginClient calls. +func NewDRAPluginClient(pluginName string) (*Plugin, error) { + if pluginName == "" { + return nil, fmt.Errorf("plugin name is empty") } existingPlugin := draPlugins.get(pluginName) if existingPlugin == nil { - return newPluginHighestVersion, nil + return nil, fmt.Errorf("plugin name %s not found in the list of registered DRA plugins", pluginName) } - if existingPlugin.highestSupportedVersion.LessThan(newPluginHighestVersion) { - return newPluginHighestVersion, nil - } - return nil, fmt.Errorf("another plugin instance is already registered with a higher supported version: %q < %q", newPluginHighestVersion, existingPlugin.highestSupportedVersion) + + return existingPlugin, nil } -// DeRegisterPlugin is called when a plugin has removed its socket, -// signaling it is no longer available. -func (h *RegistrationHandler) DeRegisterPlugin(pluginName string) { - if p := draPlugins.delete(pluginName); p != nil { - logger := klog.FromContext(p.backgroundCtx) - logger.V(3).Info("Deregister DRA plugin", "endpoint", p.endpoint) +type Plugin struct { + backgroundCtx context.Context + cancel func(cause error) - // Clean up the ResourceSlices for the deleted Plugin since it - // may have died without doing so itself and might never come - // back. - go h.wipeResourceSlices(pluginName) - - return - } - - logger := klog.FromContext(h.backgroundCtx) - logger.V(3).Info("Deregister DRA plugin not necessary, was already removed") + mutex sync.Mutex + conn *grpc.ClientConn + endpoint string + highestSupportedVersion *utilversion.Version + clientCallTimeout time.Duration } -// ValidatePlugin is called by kubelet's plugin watcher upon detection -// of a new registration socket opened by DRA plugin. -func (h *RegistrationHandler) ValidatePlugin(pluginName string, endpoint string, versions []string) error { - _, err := h.validateVersions(pluginName, versions) +func (p *Plugin) getOrCreateGRPCConn() (*grpc.ClientConn, error) { + p.mutex.Lock() + defer p.mutex.Unlock() + + if p.conn != nil { + return p.conn, nil + } + + ctx := p.backgroundCtx + logger := klog.FromContext(ctx) + + network := "unix" + logger.V(4).Info("Creating new gRPC connection", "protocol", network, "endpoint", p.endpoint) + // grpc.Dial is deprecated. grpc.NewClient should be used instead. + // For now this gets ignored because this function is meant to establish + // the connection, with the one second timeout below. Perhaps that + // approach should be reconsidered? + //nolint:staticcheck + conn, err := grpc.Dial( + p.endpoint, + grpc.WithTransportCredentials(insecure.NewCredentials()), + grpc.WithContextDialer(func(ctx context.Context, target string) (net.Conn, error) { + return (&net.Dialer{}).DialContext(ctx, network, target) + }), + ) if err != nil { - return fmt.Errorf("invalid versions of plugin %s: %w", pluginName, err) + return nil, err } - return err + ctx, cancel := context.WithTimeout(context.Background(), time.Second) + defer cancel() + + if ok := conn.WaitForStateChange(ctx, connectivity.Connecting); !ok { + return nil, errors.New("timed out waiting for gRPC connection to be ready") + } + + p.conn = conn + return p.conn, nil +} + +func (p *Plugin) NodePrepareResources( + ctx context.Context, + req *drapb.NodePrepareResourcesRequest, + opts ...grpc.CallOption, +) (*drapb.NodePrepareResourcesResponse, error) { + logger := klog.FromContext(ctx) + logger.V(4).Info("Calling NodePrepareResources rpc", "request", req) + + conn, err := p.getOrCreateGRPCConn() + if err != nil { + return nil, err + } + + ctx, cancel := context.WithTimeout(ctx, p.clientCallTimeout) + defer cancel() + + nodeClient := drapb.NewNodeClient(conn) + response, err := nodeClient.NodePrepareResources(ctx, req) + logger.V(4).Info("Done calling NodePrepareResources rpc", "response", response, "err", err) + return response, err +} + +func (p *Plugin) NodeUnprepareResources( + ctx context.Context, + req *drapb.NodeUnprepareResourcesRequest, + opts ...grpc.CallOption, +) (*drapb.NodeUnprepareResourcesResponse, error) { + logger := klog.FromContext(ctx) + logger.V(4).Info("Calling NodeUnprepareResource rpc", "request", req) + + conn, err := p.getOrCreateGRPCConn() + if err != nil { + return nil, err + } + + ctx, cancel := context.WithTimeout(ctx, p.clientCallTimeout) + defer cancel() + + nodeClient := drapb.NewNodeClient(conn) + response, err := nodeClient.NodeUnprepareResources(ctx, req) + logger.V(4).Info("Done calling NodeUnprepareResources rpc", "response", response, "err", err) + return response, err } diff --git a/pkg/kubelet/cm/dra/plugin/plugin_test.go b/pkg/kubelet/cm/dra/plugin/plugin_test.go index 7495a4d7751..d78ed6bd393 100644 --- a/pkg/kubelet/cm/dra/plugin/plugin_test.go +++ b/pkg/kubelet/cm/dra/plugin/plugin_test.go @@ -17,75 +17,263 @@ limitations under the License. package plugin import ( + "context" + "fmt" + "net" + "os" + "path/filepath" + "sync" "testing" "github.com/stretchr/testify/assert" - v1 "k8s.io/api/core/v1" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "google.golang.org/grpc" + drapb "k8s.io/kubelet/pkg/apis/dra/v1alpha4" + "k8s.io/kubernetes/test/utils/ktesting" ) -func getFakeNode() (*v1.Node, error) { - return &v1.Node{ObjectMeta: metav1.ObjectMeta{Name: "worker"}}, nil +const ( + v1alpha4Version = "v1alpha4" +) + +type fakeV1alpha4GRPCServer struct { + drapb.UnimplementedNodeServer } -func TestRegistrationHandler_ValidatePlugin(t *testing.T) { - newRegistrationHandler := func() *RegistrationHandler { - return NewRegistrationHandler(nil, getFakeNode) +var _ drapb.NodeServer = &fakeV1alpha4GRPCServer{} + +func (f *fakeV1alpha4GRPCServer) NodePrepareResources(ctx context.Context, in *drapb.NodePrepareResourcesRequest) (*drapb.NodePrepareResourcesResponse, error) { + return &drapb.NodePrepareResourcesResponse{Claims: map[string]*drapb.NodePrepareResourceResponse{"claim-uid": { + Devices: []*drapb.Device{ + { + RequestNames: []string{"test-request"}, + CDIDeviceIDs: []string{"test-cdi-id"}, + }, + }, + }}}, nil +} + +func (f *fakeV1alpha4GRPCServer) NodeUnprepareResources(ctx context.Context, in *drapb.NodeUnprepareResourcesRequest) (*drapb.NodeUnprepareResourcesResponse, error) { + + return &drapb.NodeUnprepareResourcesResponse{}, nil +} + +type tearDown func() + +func setupFakeGRPCServer(version string) (string, tearDown, error) { + p, err := os.MkdirTemp("", "dra_plugin") + if err != nil { + return "", nil, err } + closeCh := make(chan struct{}) + addr := filepath.Join(p, "server.sock") + teardown := func() { + close(closeCh) + os.RemoveAll(addr) + } + + listener, err := net.Listen("unix", addr) + if err != nil { + teardown() + return "", nil, err + } + + s := grpc.NewServer() + switch version { + case v1alpha4Version: + fakeGRPCServer := &fakeV1alpha4GRPCServer{} + drapb.RegisterNodeServer(s, fakeGRPCServer) + default: + return "", nil, fmt.Errorf("unsupported version: %s", version) + } + + go func() { + go s.Serve(listener) + <-closeCh + s.GracefulStop() + }() + + return addr, teardown, nil +} + +func TestGRPCConnIsReused(t *testing.T) { + tCtx := ktesting.Init(t) + addr, teardown, err := setupFakeGRPCServer(v1alpha4Version) + if err != nil { + t.Fatal(err) + } + defer teardown() + + reusedConns := make(map[*grpc.ClientConn]int) + wg := sync.WaitGroup{} + m := sync.Mutex{} + + p := &Plugin{ + backgroundCtx: tCtx, + endpoint: addr, + clientCallTimeout: defaultClientCallTimeout, + } + + conn, err := p.getOrCreateGRPCConn() + defer func() { + err := conn.Close() + if err != nil { + t.Error(err) + } + }() + if err != nil { + t.Fatal(err) + } + + // ensure the plugin we are using is registered + draPlugins.add("dummy-plugin", p) + defer draPlugins.delete("dummy-plugin") + + // we call `NodePrepareResource` 2 times and check whether a new connection is created or the same is reused + for i := 0; i < 2; i++ { + wg.Add(1) + go func() { + defer wg.Done() + client, err := NewDRAPluginClient("dummy-plugin") + if err != nil { + t.Error(err) + return + } + + req := &drapb.NodePrepareResourcesRequest{ + Claims: []*drapb.Claim{ + { + Namespace: "dummy-namespace", + UID: "dummy-uid", + Name: "dummy-claim", + }, + }, + } + + _, err = client.NodePrepareResources(tCtx, req) + assert.NoError(t, err) + + client.mutex.Lock() + conn := client.conn + client.mutex.Unlock() + + m.Lock() + defer m.Unlock() + reusedConns[conn]++ + }() + } + + wg.Wait() + // We should have only one entry otherwise it means another gRPC connection has been created + if len(reusedConns) != 1 { + t.Errorf("expected length to be 1 but got %d", len(reusedConns)) + } + if counter, ok := reusedConns[conn]; ok && counter != 2 { + t.Errorf("expected counter to be 2 but got %d", counter) + } +} + +func TestNewDRAPluginClient(t *testing.T) { for _, test := range []struct { description string - handler func() *RegistrationHandler + setup func(string) tearDown pluginName string - endpoint string - versions []string shouldError bool }{ { - description: "no versions provided", - handler: newRegistrationHandler, - shouldError: true, - }, - { - description: "unsupported version", - handler: newRegistrationHandler, - versions: []string{"v2.0.0"}, - shouldError: true, - }, - { - description: "plugin already registered with a higher supported version", - handler: func() *RegistrationHandler { - handler := newRegistrationHandler() - if err := handler.RegisterPlugin("this-plugin-already-exists-and-has-a-long-name-so-it-doesnt-collide", "", []string{"v1.1.0"}, nil); err != nil { - t.Fatal(err) - } - return handler + description: "plugin name is empty", + setup: func(_ string) tearDown { + return func() {} }, - pluginName: "this-plugin-already-exists-and-has-a-long-name-so-it-doesnt-collide", - versions: []string{"v1.0.0"}, + pluginName: "", shouldError: true, }, { - description: "should validate the plugin", - handler: newRegistrationHandler, - pluginName: "this-is-a-dummy-plugin-with-a-long-name-so-it-doesnt-collide", - versions: []string{"v1.3.0"}, + description: "plugin name not found in the list", + setup: func(_ string) tearDown { + return func() {} + }, + pluginName: "plugin-name-not-found-in-the-list", + shouldError: true, + }, + { + description: "plugin exists", + setup: func(name string) tearDown { + draPlugins.add(name, &Plugin{}) + return func() { + draPlugins.delete(name) + } + }, + pluginName: "dummy-plugin", }, } { t.Run(test.description, func(t *testing.T) { - handler := test.handler() - err := handler.ValidatePlugin(test.pluginName, test.endpoint, test.versions) + teardown := test.setup(test.pluginName) + defer teardown() + + client, err := NewDRAPluginClient(test.pluginName) if test.shouldError { + assert.Nil(t, client) assert.Error(t, err) } else { + assert.NotNil(t, client) assert.Nil(t, err) } }) } - - t.Cleanup(func() { - handler := newRegistrationHandler() - handler.DeRegisterPlugin("this-plugin-already-exists-and-has-a-long-name-so-it-doesnt-collide") - handler.DeRegisterPlugin("this-is-a-dummy-plugin-with-a-long-name-so-it-doesnt-collide") - }) +} + +func TestNodeUnprepareResources(t *testing.T) { + for _, test := range []struct { + description string + serverSetup func(string) (string, tearDown, error) + serverVersion string + request *drapb.NodeUnprepareResourcesRequest + }{ + { + description: "server supports v1alpha4", + serverSetup: setupFakeGRPCServer, + serverVersion: v1alpha4Version, + request: &drapb.NodeUnprepareResourcesRequest{}, + }, + } { + t.Run(test.description, func(t *testing.T) { + tCtx := ktesting.Init(t) + addr, teardown, err := setupFakeGRPCServer(test.serverVersion) + if err != nil { + t.Fatal(err) + } + defer teardown() + + p := &Plugin{ + backgroundCtx: tCtx, + endpoint: addr, + clientCallTimeout: defaultClientCallTimeout, + } + + conn, err := p.getOrCreateGRPCConn() + defer func() { + err := conn.Close() + if err != nil { + t.Error(err) + } + }() + if err != nil { + t.Fatal(err) + } + + draPlugins.add("dummy-plugin", p) + defer draPlugins.delete("dummy-plugin") + + client, err := NewDRAPluginClient("dummy-plugin") + if err != nil { + t.Fatal(err) + } + + _, err = client.NodeUnprepareResources(tCtx, test.request) + if err != nil { + t.Fatal(err) + } + }) + } } diff --git a/pkg/kubelet/cm/dra/plugin/registration.go b/pkg/kubelet/cm/dra/plugin/registration.go new file mode 100644 index 00000000000..99e577a4259 --- /dev/null +++ b/pkg/kubelet/cm/dra/plugin/registration.go @@ -0,0 +1,234 @@ +/* +Copyright 2022 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package plugin + +import ( + "context" + "errors" + "fmt" + "time" + + v1 "k8s.io/api/core/v1" + resourceapi "k8s.io/api/resource/v1alpha3" + apierrors "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/fields" + utilversion "k8s.io/apimachinery/pkg/util/version" + "k8s.io/apimachinery/pkg/util/wait" + "k8s.io/client-go/kubernetes" + "k8s.io/klog/v2" + "k8s.io/kubernetes/pkg/kubelet/pluginmanager/cache" +) + +// defaultClientCallTimeout is the default amount of time that a DRA driver has +// to respond to any of the gRPC calls. kubelet uses this value by passing nil +// to RegisterPlugin. Some tests use a different, usually shorter timeout to +// speed up testing. +// +// This is half of the kubelet retry period (according to +// https://github.com/kubernetes/kubernetes/commit/0449cef8fd5217d394c5cd331d852bd50983e6b3). +const defaultClientCallTimeout = 45 * time.Second + +// RegistrationHandler is the handler which is fed to the pluginwatcher API. +type RegistrationHandler struct { + // backgroundCtx is used for all future activities of the handler. + // This is necessary because it implements APIs which don't + // provide a context. + backgroundCtx context.Context + kubeClient kubernetes.Interface + getNode func() (*v1.Node, error) +} + +var _ cache.PluginHandler = &RegistrationHandler{} + +// NewPluginHandler returns new registration handler. +// +// Must only be called once per process because it manages global state. +// If a kubeClient is provided, then it synchronizes ResourceSlices +// with the resource information provided by plugins. +func NewRegistrationHandler(kubeClient kubernetes.Interface, getNode func() (*v1.Node, error)) *RegistrationHandler { + handler := &RegistrationHandler{ + // The context and thus logger should come from the caller. + backgroundCtx: klog.NewContext(context.TODO(), klog.LoggerWithName(klog.TODO(), "DRA registration handler")), + kubeClient: kubeClient, + getNode: getNode, + } + + // When kubelet starts up, no DRA driver has registered yet. None of + // the drivers are usable until they come back, which might not happen + // at all. Therefore it is better to not advertise any local resources + // because pods could get stuck on the node waiting for the driver + // to start up. + // + // This has to run in the background. + go handler.wipeResourceSlices("") + + return handler +} + +// wipeResourceSlices deletes ResourceSlices of the node, optionally just for a specific driver. +func (h *RegistrationHandler) wipeResourceSlices(driver string) { + if h.kubeClient == nil { + return + } + ctx := h.backgroundCtx + logger := klog.FromContext(ctx) + + backoff := wait.Backoff{ + Duration: time.Second, + Factor: 2, + Jitter: 0.2, + Cap: 5 * time.Minute, + Steps: 100, + } + + // Error logging is done inside the loop. Context cancellation doesn't get logged. + _ = wait.ExponentialBackoffWithContext(ctx, backoff, func(ctx context.Context) (bool, error) { + node, err := h.getNode() + if apierrors.IsNotFound(err) { + return false, nil + } + if err != nil { + logger.Error(err, "Unexpected error checking for node") + return false, nil + } + fieldSelector := fields.Set{resourceapi.ResourceSliceSelectorNodeName: node.Name} + if driver != "" { + fieldSelector[resourceapi.ResourceSliceSelectorDriver] = driver + } + + err = h.kubeClient.ResourceV1alpha3().ResourceSlices().DeleteCollection(ctx, metav1.DeleteOptions{}, metav1.ListOptions{FieldSelector: fieldSelector.String()}) + switch { + case err == nil: + logger.V(3).Info("Deleted ResourceSlices", "fieldSelector", fieldSelector) + return true, nil + case apierrors.IsUnauthorized(err): + // This can happen while kubelet is still figuring out + // its credentials. + logger.V(5).Info("Deleting ResourceSlice failed, retrying", "fieldSelector", fieldSelector, "err", err) + return false, nil + default: + // Log and retry for other errors. + logger.V(3).Info("Deleting ResourceSlice failed, retrying", "fieldSelector", fieldSelector, "err", err) + return false, nil + } + }) +} + +// RegisterPlugin is called when a plugin can be registered. +func (h *RegistrationHandler) RegisterPlugin(pluginName string, endpoint string, versions []string, pluginClientTimeout *time.Duration) error { + // Prepare a context with its own logger for the plugin. + // + // The lifecycle of the plugin's background activities is tied to our + // root context, so canceling that will also cancel the plugin. + // + // The logger injects the plugin name as additional value + // into all log output related to the plugin. + ctx := h.backgroundCtx + logger := klog.FromContext(ctx) + logger = klog.LoggerWithValues(logger, "pluginName", pluginName) + ctx = klog.NewContext(ctx, logger) + + logger.V(3).Info("Register new DRA plugin", "endpoint", endpoint) + + highestSupportedVersion, err := h.validateVersions(pluginName, versions) + if err != nil { + return fmt.Errorf("version check of plugin %s failed: %w", pluginName, err) + } + + var timeout time.Duration + if pluginClientTimeout == nil { + timeout = defaultClientCallTimeout + } else { + timeout = *pluginClientTimeout + } + + ctx, cancel := context.WithCancelCause(ctx) + + pluginInstance := &Plugin{ + backgroundCtx: ctx, + cancel: cancel, + conn: nil, + endpoint: endpoint, + highestSupportedVersion: highestSupportedVersion, + clientCallTimeout: timeout, + } + + // Storing endpoint of newly registered DRA Plugin into the map, where plugin name will be the key + // all other DRA components will be able to get the actual socket of DRA plugins by its name. + if draPlugins.add(pluginName, pluginInstance) { + logger.V(1).Info("Already registered, previous plugin was replaced") + } + + return nil +} + +func (h *RegistrationHandler) validateVersions( + pluginName string, + versions []string, +) (*utilversion.Version, error) { + if len(versions) == 0 { + return nil, errors.New("empty list for supported versions") + } + + // Validate version + newPluginHighestVersion, err := utilversion.HighestSupportedVersion(versions) + if err != nil { + // HighestSupportedVersion includes the list of versions in its error + // if relevant, no need to repeat it here. + return nil, fmt.Errorf("none of the versions are supported: %w", err) + } + + existingPlugin := draPlugins.get(pluginName) + if existingPlugin == nil { + return newPluginHighestVersion, nil + } + if existingPlugin.highestSupportedVersion.LessThan(newPluginHighestVersion) { + return newPluginHighestVersion, nil + } + return nil, fmt.Errorf("another plugin instance is already registered with a higher supported version: %q < %q", newPluginHighestVersion, existingPlugin.highestSupportedVersion) +} + +// DeRegisterPlugin is called when a plugin has removed its socket, +// signaling it is no longer available. +func (h *RegistrationHandler) DeRegisterPlugin(pluginName string) { + if p := draPlugins.delete(pluginName); p != nil { + logger := klog.FromContext(p.backgroundCtx) + logger.V(3).Info("Deregister DRA plugin", "endpoint", p.endpoint) + + // Clean up the ResourceSlices for the deleted Plugin since it + // may have died without doing so itself and might never come + // back. + go h.wipeResourceSlices(pluginName) + + return + } + + logger := klog.FromContext(h.backgroundCtx) + logger.V(3).Info("Deregister DRA plugin not necessary, was already removed") +} + +// ValidatePlugin is called by kubelet's plugin watcher upon detection +// of a new registration socket opened by DRA plugin. +func (h *RegistrationHandler) ValidatePlugin(pluginName string, endpoint string, versions []string) error { + _, err := h.validateVersions(pluginName, versions) + if err != nil { + return fmt.Errorf("invalid versions of plugin %s: %w", pluginName, err) + } + + return err +} diff --git a/pkg/kubelet/cm/dra/plugin/registration_test.go b/pkg/kubelet/cm/dra/plugin/registration_test.go new file mode 100644 index 00000000000..7495a4d7751 --- /dev/null +++ b/pkg/kubelet/cm/dra/plugin/registration_test.go @@ -0,0 +1,91 @@ +/* +Copyright 2023 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package plugin + +import ( + "testing" + + "github.com/stretchr/testify/assert" + v1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" +) + +func getFakeNode() (*v1.Node, error) { + return &v1.Node{ObjectMeta: metav1.ObjectMeta{Name: "worker"}}, nil +} + +func TestRegistrationHandler_ValidatePlugin(t *testing.T) { + newRegistrationHandler := func() *RegistrationHandler { + return NewRegistrationHandler(nil, getFakeNode) + } + + for _, test := range []struct { + description string + handler func() *RegistrationHandler + pluginName string + endpoint string + versions []string + shouldError bool + }{ + { + description: "no versions provided", + handler: newRegistrationHandler, + shouldError: true, + }, + { + description: "unsupported version", + handler: newRegistrationHandler, + versions: []string{"v2.0.0"}, + shouldError: true, + }, + { + description: "plugin already registered with a higher supported version", + handler: func() *RegistrationHandler { + handler := newRegistrationHandler() + if err := handler.RegisterPlugin("this-plugin-already-exists-and-has-a-long-name-so-it-doesnt-collide", "", []string{"v1.1.0"}, nil); err != nil { + t.Fatal(err) + } + return handler + }, + pluginName: "this-plugin-already-exists-and-has-a-long-name-so-it-doesnt-collide", + versions: []string{"v1.0.0"}, + shouldError: true, + }, + { + description: "should validate the plugin", + handler: newRegistrationHandler, + pluginName: "this-is-a-dummy-plugin-with-a-long-name-so-it-doesnt-collide", + versions: []string{"v1.3.0"}, + }, + } { + t.Run(test.description, func(t *testing.T) { + handler := test.handler() + err := handler.ValidatePlugin(test.pluginName, test.endpoint, test.versions) + if test.shouldError { + assert.Error(t, err) + } else { + assert.Nil(t, err) + } + }) + } + + t.Cleanup(func() { + handler := newRegistrationHandler() + handler.DeRegisterPlugin("this-plugin-already-exists-and-has-a-long-name-so-it-doesnt-collide") + handler.DeRegisterPlugin("this-is-a-dummy-plugin-with-a-long-name-so-it-doesnt-collide") + }) +} From 3372c056cd6074569d80aeb7e42f00684ff9b460 Mon Sep 17 00:00:00 2001 From: carlory Date: Tue, 27 Aug 2024 00:56:36 +0800 Subject: [PATCH 2/2] fix linter hints --- pkg/kubelet/cm/dra/plugin/plugin_test.go | 10 ++++++++-- pkg/kubelet/cm/dra/plugin/registration_test.go | 2 +- 2 files changed, 9 insertions(+), 3 deletions(-) diff --git a/pkg/kubelet/cm/dra/plugin/plugin_test.go b/pkg/kubelet/cm/dra/plugin/plugin_test.go index d78ed6bd393..12219e06517 100644 --- a/pkg/kubelet/cm/dra/plugin/plugin_test.go +++ b/pkg/kubelet/cm/dra/plugin/plugin_test.go @@ -69,7 +69,9 @@ func setupFakeGRPCServer(version string) (string, tearDown, error) { addr := filepath.Join(p, "server.sock") teardown := func() { close(closeCh) - os.RemoveAll(addr) + if err := os.RemoveAll(addr); err != nil { + panic(err) + } } listener, err := net.Listen("unix", addr) @@ -88,7 +90,11 @@ func setupFakeGRPCServer(version string) (string, tearDown, error) { } go func() { - go s.Serve(listener) + go func() { + if err := s.Serve(listener); err != nil { + panic(err) + } + }() <-closeCh s.GracefulStop() }() diff --git a/pkg/kubelet/cm/dra/plugin/registration_test.go b/pkg/kubelet/cm/dra/plugin/registration_test.go index 7495a4d7751..91258be8e69 100644 --- a/pkg/kubelet/cm/dra/plugin/registration_test.go +++ b/pkg/kubelet/cm/dra/plugin/registration_test.go @@ -78,7 +78,7 @@ func TestRegistrationHandler_ValidatePlugin(t *testing.T) { if test.shouldError { assert.Error(t, err) } else { - assert.Nil(t, err) + assert.NoError(t, err) } }) }