diff --git a/pkg/kubelet/cm/dra/plugin/plugin_test.go b/pkg/kubelet/cm/dra/plugin/plugin_test.go index 87c3cef13f3..c375b8c6f20 100644 --- a/pkg/kubelet/cm/dra/plugin/plugin_test.go +++ b/pkg/kubelet/cm/dra/plugin/plugin_test.go @@ -22,6 +22,7 @@ import ( "net" "os" "path/filepath" + "strings" "sync" "testing" @@ -232,24 +233,41 @@ func TestNewDRAPluginClient(t *testing.T) { } } -func TestNodeUnprepareResources(t *testing.T) { +func TestGRPCMethods(t *testing.T) { for _, test := range []struct { - description string - serverSetup func(string) (string, tearDown, error) - service string - request *drapbv1beta1.NodeUnprepareResourcesRequest + description string + serverSetup func(string) (string, tearDown, error) + service string + chosenService string + expectError string }{ { - description: "server supports v1alpha4", - serverSetup: setupFakeGRPCServer, - service: drapbv1alpha4.NodeService, - request: &drapbv1beta1.NodeUnprepareResourcesRequest{}, + description: "v1alpha4", + serverSetup: setupFakeGRPCServer, + service: drapbv1alpha4.NodeService, + chosenService: drapbv1alpha4.NodeService, }, { - description: "server supports v1beta1", - serverSetup: setupFakeGRPCServer, - service: drapbv1beta1.DRAPluginService, - request: &drapbv1beta1.NodeUnprepareResourcesRequest{}, + description: "v1beta1", + serverSetup: setupFakeGRPCServer, + service: drapbv1beta1.DRAPluginService, + chosenService: drapbv1beta1.DRAPluginService, + }, + { + // In practice, such a mismatch between plugin and kubelet should not happen. + description: "mismatch", + serverSetup: setupFakeGRPCServer, + service: drapbv1beta1.DRAPluginService, + chosenService: drapbv1alpha4.NodeService, + expectError: "unknown service v1alpha3.Node", + }, + { + // In practice, kubelet wouldn't choose an invalid service. + description: "internal-error", + serverSetup: setupFakeGRPCServer, + service: drapbv1beta1.DRAPluginService, + chosenService: "some-other-service", + expectError: "unsupported chosen service", }, } { t.Run(test.description, func(t *testing.T) { @@ -265,7 +283,7 @@ func TestNodeUnprepareResources(t *testing.T) { name: pluginName, backgroundCtx: tCtx, endpoint: addr, - chosenService: test.service, + chosenService: test.chosenService, clientCallTimeout: defaultClientCallTimeout, } @@ -288,10 +306,23 @@ func TestNodeUnprepareResources(t *testing.T) { t.Fatal(err) } - _, err = client.NodeUnprepareResources(tCtx, test.request) - if err != nil { - t.Fatal(err) - } + _, err = client.NodePrepareResources(tCtx, &drapbv1beta1.NodePrepareResourcesRequest{}) + assertError(t, test.expectError, err) + + _, err = client.NodeUnprepareResources(tCtx, &drapbv1beta1.NodeUnprepareResourcesRequest{}) + assertError(t, test.expectError, err) }) } } + +func assertError(t *testing.T, expectError string, err error) { + t.Helper() + switch { + case err != nil && expectError == "": + t.Errorf("Expected no error, got: %v", err) + case err == nil && expectError != "": + t.Errorf("Expected error %q, got none", expectError) + case err != nil && !strings.Contains(err.Error(), expectError): + t.Errorf("Expected error %q, got: %v", expectError, err) + } +} diff --git a/pkg/kubelet/cm/dra/plugin/registration_test.go b/pkg/kubelet/cm/dra/plugin/registration_test.go index 880d134824b..c8e17f034fb 100644 --- a/pkg/kubelet/cm/dra/plugin/registration_test.go +++ b/pkg/kubelet/cm/dra/plugin/registration_test.go @@ -17,57 +17,207 @@ limitations under the License. package plugin import ( + "sort" + "strings" + "sync/atomic" "testing" + "time" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + v1 "k8s.io/api/core/v1" + resourceapi "k8s.io/api/resource/v1beta1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/fields" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/client-go/kubernetes" + "k8s.io/client-go/kubernetes/fake" + cgotesting "k8s.io/client-go/testing" + drapbv1alpha4 "k8s.io/kubelet/pkg/apis/dra/v1alpha4" drapb "k8s.io/kubelet/pkg/apis/dra/v1beta1" + "k8s.io/kubernetes/test/utils/ktesting" + "k8s.io/utils/ptr" +) + +const ( + nodeName = "worker" + pluginA = "pluginA" + endpointA = "endpointA" + pluginB = "pluginB" + endpointB = "endpointB" ) func getFakeNode() (*v1.Node, error) { - return &v1.Node{ObjectMeta: metav1.ObjectMeta{Name: "worker"}}, nil + return &v1.Node{ObjectMeta: metav1.ObjectMeta{Name: nodeName}}, nil } -func TestRegistrationHandler_ValidatePlugin(t *testing.T) { - newRegistrationHandler := func() *RegistrationHandler { - return NewRegistrationHandler(nil, getFakeNode) +func TestRegistrationHandler(t *testing.T) { + slice := &resourceapi.ResourceSlice{ + ObjectMeta: metav1.ObjectMeta{Name: "test-slice"}, + Spec: resourceapi.ResourceSliceSpec{ + NodeName: nodeName, + }, } for _, test := range []struct { description string - handler func() *RegistrationHandler pluginName string endpoint string + withClient bool supportedServices []string shouldError bool + chosenService string }{ { - description: "no versions provided", - handler: newRegistrationHandler, + description: "no-services-provided", + pluginName: pluginB, + endpoint: endpointB, shouldError: true, }, { - description: "should validate the plugin", - handler: newRegistrationHandler, - pluginName: "this-is-a-dummy-plugin-with-a-long-name-so-it-doesnt-collide", + description: "current-service", + pluginName: pluginB, + endpoint: endpointB, supportedServices: []string{drapb.DRAPluginService}, + chosenService: drapb.DRAPluginService, + }, + { + description: "two-services", + pluginName: pluginB, + endpoint: endpointB, + supportedServices: []string{drapbv1alpha4.NodeService, drapb.DRAPluginService}, + chosenService: drapb.DRAPluginService, + }, + { + description: "old-service", + pluginName: pluginB, + endpoint: endpointB, + supportedServices: []string{drapbv1alpha4.NodeService}, + chosenService: drapbv1alpha4.NodeService, + }, + { + // Legacy behavior. + description: "version", + pluginName: pluginB, + endpoint: endpointB, + supportedServices: []string{"1.0.0"}, + chosenService: drapbv1alpha4.NodeService, + }, + { + description: "replace", + pluginName: pluginA, + endpoint: endpointB, + supportedServices: []string{drapb.DRAPluginService}, + chosenService: drapb.DRAPluginService, + }, + { + description: "manage-resource-slices", + withClient: true, + pluginName: pluginB, + endpoint: endpointB, + supportedServices: []string{drapb.DRAPluginService}, + chosenService: drapb.DRAPluginService, }, } { t.Run(test.description, func(t *testing.T) { - handler := test.handler() - err := handler.ValidatePlugin(test.pluginName, test.endpoint, test.supportedServices) - if test.shouldError { - assert.Error(t, err) - } else { - assert.NoError(t, err) + tCtx := ktesting.Init(t) + + // Stand-alone kubelet has no connection to an + // apiserver, so faking one is optional. + var client kubernetes.Interface + var deleteCollectionForDriver atomic.Pointer[string] + if test.withClient { + fakeClient := fake.NewClientset(slice) + fakeClient.AddReactor("delete-collection", "resourceslices", func(action cgotesting.Action) (bool, runtime.Object, error) { + deleteAction := action.(cgotesting.DeleteCollectionAction) + restrictions := deleteAction.GetListRestrictions() + sliceFields := fields.Set{"spec.nodeName": nodeName} + forDriver := deleteCollectionForDriver.Load() + if forDriver != nil { + sliceFields["spec.driver"] = *forDriver + } + fieldsSelector := fields.SelectorFromSet(sliceFields) + // The order of field requirements is random because it comes + // from a map. We need to sort. + normalize := func(selector string) string { + requirements := strings.Split(selector, ",") + sort.Strings(requirements) + return strings.Join(requirements, ",") + } + assert.Equal(t, "", restrictions.Labels.String(), "label selector in DeleteCollection") + assert.Equal(t, normalize(fieldsSelector.String()), normalize(restrictions.Fields.String()), "field selector in DeleteCollection") + + // There's only one object that could get matched, so delete it. + // Delete doesn't return an error if already deleted, which is what + // we need here (no error when nothing to delete). + err := fakeClient.Tracker().Delete(resourceapi.SchemeGroupVersion.WithResource("resourceslices"), "", slice.Name) + return true, nil, err + }) + client = fakeClient } + + // The handler wipes all slices at startup. + handler := NewRegistrationHandler(client, getFakeNode) + requireNoSlices := func() { + t.Helper() + if client == nil { + return + } + require.EventuallyWithT(t, func(t *assert.CollectT) { + slices, err := client.ResourceV1beta1().ResourceSlices().List(tCtx, metav1.ListOptions{}) + if !assert.NoError(t, err, "list slices") { + return + } + assert.Empty(t, slices.Items, "slices") + }, time.Minute, time.Second) + } + requireNoSlices() + + // Simulate one existing plugin A. + err := handler.RegisterPlugin(pluginA, endpointA, []string{drapb.DRAPluginService}, nil) + require.NoError(t, err) + + err = handler.ValidatePlugin(test.pluginName, test.endpoint, test.supportedServices) + if test.shouldError { + require.Error(t, err) + } else { + require.NoError(t, err) + } + if err != nil { + return + } + if test.pluginName != pluginA { + require.Nil(t, draPlugins.get(test.pluginName), "not registered yet") + } + + // Add plugin for the first time. + err = handler.RegisterPlugin(test.pluginName, test.endpoint, test.supportedServices, nil) + if test.shouldError { + require.Error(t, err) + } else { + require.NoError(t, err) + } + plugin := draPlugins.get(test.pluginName) + assert.NotNil(t, plugin, "plugin should be registered") + t.Cleanup(func() { + if client != nil { + // Create the slice as if the plugin had done that while it runs. + _, err := client.ResourceV1beta1().ResourceSlices().Create(tCtx, slice, metav1.CreateOptions{}) + assert.NoError(t, err, "recreate slice") + } + + handler.DeRegisterPlugin(test.pluginName) + // Nop. + handler.DeRegisterPlugin(test.pluginName) + + // Deleted by the kubelet after deregistration, now specifically + // for that plugin (checked by the fake client reactor). + deleteCollectionForDriver.Store(ptr.To(test.pluginName)) + requireNoSlices() + }) + assert.Equal(t, test.endpoint, plugin.endpoint, "plugin endpoint") + assert.Equal(t, test.chosenService, plugin.chosenService, "chosen service") }) } - - t.Cleanup(func() { - handler := newRegistrationHandler() - handler.DeRegisterPlugin("this-plugin-already-exists-and-has-a-long-name-so-it-doesnt-collide") - handler.DeRegisterPlugin("this-is-a-dummy-plugin-with-a-long-name-so-it-doesnt-collide") - }) }