diff --git a/cmd/kubelet/app/server.go b/cmd/kubelet/app/server.go index b8d406a2be3..d8d01b21e02 100644 --- a/cmd/kubelet/app/server.go +++ b/cmd/kubelet/app/server.go @@ -815,6 +815,7 @@ func run(ctx context.Context, s *options.KubeletServer, kubeDeps *kubelet.Depend kubeDeps.Mounter, kubeDeps.CAdvisorInterface, cm.NodeConfig{ + NodeName: nodeName, RuntimeCgroupsName: s.RuntimeCgroups, SystemCgroupsName: s.SystemCgroups, KubeletCgroupsName: s.KubeletCgroups, diff --git a/pkg/kubelet/cm/container_manager.go b/pkg/kubelet/cm/container_manager.go index d5ac7cbb459..dbd35dd71ef 100644 --- a/pkg/kubelet/cm/container_manager.go +++ b/pkg/kubelet/cm/container_manager.go @@ -135,6 +135,7 @@ type ContainerManager interface { } type NodeConfig struct { + NodeName types.NodeName RuntimeCgroupsName string SystemCgroupsName string KubeletCgroupsName string diff --git a/pkg/kubelet/cm/container_manager_linux.go b/pkg/kubelet/cm/container_manager_linux.go index 4b3d9ffe9dc..4ed1aa7b591 100644 --- a/pkg/kubelet/cm/container_manager_linux.go +++ b/pkg/kubelet/cm/container_manager_linux.go @@ -308,7 +308,7 @@ func NewContainerManager(mountUtil mount.Interface, cadvisorInterface cadvisor.I // initialize DRA manager if utilfeature.DefaultFeatureGate.Enabled(kubefeatures.DynamicResourceAllocation) { klog.InfoS("Creating Dynamic Resource Allocation (DRA) manager") - cm.draManager, err = dra.NewManagerImpl(kubeClient, nodeConfig.KubeletRootDir) + cm.draManager, err = dra.NewManagerImpl(kubeClient, nodeConfig.KubeletRootDir, nodeConfig.NodeName) if err != nil { return nil, err } diff --git a/pkg/kubelet/cm/dra/manager.go b/pkg/kubelet/cm/dra/manager.go index 085366ac0af..49cd60cb266 100644 --- a/pkg/kubelet/cm/dra/manager.go +++ b/pkg/kubelet/cm/dra/manager.go @@ -45,7 +45,7 @@ type ManagerImpl struct { } // NewManagerImpl creates a new manager. 
-func NewManagerImpl(kubeClient clientset.Interface, stateFileDirectory string) (*ManagerImpl, error) { +func NewManagerImpl(kubeClient clientset.Interface, stateFileDirectory string, nodeName types.NodeName) (*ManagerImpl, error) { klog.V(2).InfoS("Creating DRA manager") claimInfoCache, err := newClaimInfoCache(stateFileDirectory, draManagerStateFileName) diff --git a/pkg/kubelet/cm/dra/manager_test.go b/pkg/kubelet/cm/dra/manager_test.go index 9e5e8c4bd67..dc0e697d0dd 100644 --- a/pkg/kubelet/cm/dra/manager_test.go +++ b/pkg/kubelet/cm/dra/manager_test.go @@ -154,7 +154,7 @@ func TestNewManagerImpl(t *testing.T) { }, } { t.Run(test.description, func(t *testing.T) { - manager, err := NewManagerImpl(kubeClient, test.stateFileDirectory) + manager, err := NewManagerImpl(kubeClient, test.stateFileDirectory, "worker") if test.wantErr { assert.Error(t, err) return @@ -287,7 +287,7 @@ func TestGetResources(t *testing.T) { }, } { t.Run(test.description, func(t *testing.T) { - manager, err := NewManagerImpl(kubeClient, t.TempDir()) + manager, err := NewManagerImpl(kubeClient, t.TempDir(), "worker") assert.NoError(t, err) if test.claimInfo != nil { @@ -760,7 +760,7 @@ func TestPrepareResources(t *testing.T) { } defer draServerInfo.teardownFn() - plg := plugin.NewRegistrationHandler() + plg := plugin.NewRegistrationHandler(nil, "worker") if err := plg.RegisterPlugin(test.driverName, draServerInfo.socketName, []string{"1.27"}); err != nil { t.Fatalf("failed to register plugin %s, err: %v", test.driverName, err) } @@ -1060,7 +1060,7 @@ func TestUnprepareResources(t *testing.T) { } defer draServerInfo.teardownFn() - plg := plugin.NewRegistrationHandler() + plg := plugin.NewRegistrationHandler(nil, "worker") if err := plg.RegisterPlugin(test.driverName, draServerInfo.socketName, []string{"1.27"}); err != nil { t.Fatalf("failed to register plugin %s, err: %v", test.driverName, err) } diff --git a/pkg/kubelet/cm/dra/plugin/client.go b/pkg/kubelet/cm/dra/plugin/client.go index e7cb802181b..6002a2a87b3 100644 --- a/pkg/kubelet/cm/dra/plugin/client.go +++ b/pkg/kubelet/cm/dra/plugin/client.go @@ -198,3 +198,20 @@ func (p *plugin) NodeUnprepareResources( logger.V(4).Info(log("done calling NodeUnprepareResources rpc"), "response", response, "err", err) return response, err } + +func (p *plugin) NodeListAndWatchResources( + ctx context.Context, + req *drapb.NodeListAndWatchResourcesRequest, + opts ...grpc.CallOption, +) (drapb.Node_NodeListAndWatchResourcesClient, error) { + logger := klog.FromContext(ctx) + logger.V(4).Info(log("calling NodeListAndWatchResources rpc"), "request", req) + + conn, err := p.getOrCreateGRPCConn() + if err != nil { + return nil, err + } + + nodeClient := drapb.NewNodeClient(conn) + return nodeClient.NodeListAndWatchResources(ctx, req, opts...) 
+} diff --git a/pkg/kubelet/cm/dra/plugin/client_test.go b/pkg/kubelet/cm/dra/plugin/client_test.go index 3e1889e2c97..4b898b88300 100644 --- a/pkg/kubelet/cm/dra/plugin/client_test.go +++ b/pkg/kubelet/cm/dra/plugin/client_test.go @@ -43,6 +43,16 @@ func (f *fakeV1alpha3GRPCServer) NodeUnprepareResource(ctx context.Context, in * return &drapbv1alpha3.NodeUnprepareResourcesResponse{}, nil } +func (f *fakeV1alpha3GRPCServer) NodeListAndWatchResources(req *drapbv1alpha3.NodeListAndWatchResourcesRequest, srv drapbv1alpha3.Node_NodeListAndWatchResourcesServer) error { + if err := srv.Send(&drapbv1alpha3.NodeListAndWatchResourcesResponse{}); err != nil { + return err + } + if err := srv.Send(&drapbv1alpha3.NodeListAndWatchResourcesResponse{}); err != nil { + return err + } + return nil +} + type fakeV1alpha2GRPCServer struct { drapbv1alpha2.UnimplementedNodeServer } @@ -288,3 +298,82 @@ func TestNodeUnprepareResource(t *testing.T) { }) } } + +func TestListAndWatchResources(t *testing.T) { + for _, test := range []struct { + description string + serverSetup func(string) (string, tearDown, error) + serverVersion string + request *drapbv1alpha3.NodeListAndWatchResourcesRequest + responses []*drapbv1alpha3.NodeListAndWatchResourcesResponse + expectError string + }{ + { + description: "server supports NodeResources API", + serverSetup: setupFakeGRPCServer, + serverVersion: v1alpha3Version, + request: &drapbv1alpha3.NodeListAndWatchResourcesRequest{}, + responses: []*drapbv1alpha3.NodeListAndWatchResourcesResponse{ + {}, + {}, + }, + expectError: "EOF", + }, + { + description: "server doesn't support NodeResources API", + serverSetup: setupFakeGRPCServer, + serverVersion: v1alpha2Version, + request: new(drapbv1alpha3.NodeListAndWatchResourcesRequest), + expectError: "Unimplemented", + }, + } { + t.Run(test.description, func(t *testing.T) { + addr, teardown, err := setupFakeGRPCServer(test.serverVersion) + if err != nil { + t.Fatal(err) + } + defer teardown() + + p := &plugin{ + endpoint: addr, + version: v1alpha3Version, + } + + conn, err := p.getOrCreateGRPCConn() + defer func() { + err := conn.Close() + if err != nil { + t.Error(err) + } + }() + if err != nil { + t.Fatal(err) + } + + draPlugins.add("dummy-plugin", p) + defer draPlugins.delete("dummy-plugin") + + client, err := NewDRAPluginClient("dummy-plugin") + if err != nil { + t.Fatal(err) + } + + stream, err := client.NodeListAndWatchResources(context.Background(), test.request) + if err != nil { + t.Fatal(err) + } + var actualResponses []*drapbv1alpha3.NodeListAndWatchResourcesResponse + var actualErr error + for { + resp, err := stream.Recv() + if err != nil { + actualErr = err + break + } + actualResponses = append(actualResponses, resp) + } + assert.Equal(t, test.responses, actualResponses) + assert.Contains(t, actualErr.Error(), test.expectError) + }) + } +} diff --git a/pkg/kubelet/cm/dra/plugin/noderesources.go b/pkg/kubelet/cm/dra/plugin/noderesources.go new file mode 100644 index 00000000000..79cec7cef84 --- /dev/null +++ b/pkg/kubelet/cm/dra/plugin/noderesources.go @@ -0,0 +1,479 @@ +/* +Copyright 2024 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. 
+You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package plugin + +import ( + "context" + "errors" + "fmt" + "io" + "sync" + "time" + + "github.com/google/go-cmp/cmp" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" + + resourceapi "k8s.io/api/resource/v1alpha2" + apiequality "k8s.io/apimachinery/pkg/api/equality" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + utilruntime "k8s.io/apimachinery/pkg/util/runtime" + "k8s.io/apimachinery/pkg/util/sets" + resourceinformers "k8s.io/client-go/informers/resource/v1alpha2" + "k8s.io/client-go/kubernetes" + "k8s.io/client-go/tools/cache" + "k8s.io/client-go/util/workqueue" + "k8s.io/klog/v2" + drapb "k8s.io/kubelet/pkg/apis/dra/v1alpha3" +) + +const ( + // resyncPeriod for informer + // TODO (https://github.com/kubernetes/kubernetes/issues/123688): disable? + resyncPeriod = time.Duration(10 * time.Minute) +) + +// nodeResourcesController collects resource information from all registered +// plugins and synchronizes that information with NodeResourceSlice objects. +type nodeResourcesController struct { + ctx context.Context + kubeClient kubernetes.Interface + nodeName string + wg sync.WaitGroup + queue workqueue.RateLimitingInterface + sliceStore cache.Store + + mutex sync.RWMutex + activePlugins map[string]*activePlugin +} + +// activePlugin holds the resource information about one plugin +// and the gRPC stream that is used to retrieve that. The context +// used by that stream can be canceled separately to stop +// the monitoring. +type activePlugin struct { + // cancel is the function which cancels the monitorPlugin goroutine + // for this plugin. + cancel func(reason error) + + // resources is protected by the nodeResourcesController read/write lock. + // When receiving updates from the driver, the entire slice gets replaced, + // so it is okay to not do a deep copy of it. Only retrieving the slice + // must be protected by a read lock. + resources []*resourceapi.NodeResourceModel +} + +// startNodeResourcesController constructs a new controller and starts it. +// +// If a kubeClient is provided, then it synchronizes NodeResourceSlices +// with the resource information provided by plugins. Without it, +// the controller is inactive. This can happen when kubelet is run stand-alone +// without an apiserver. In that case we can't and don't need to publish +// NodeResourceSlices. +func startNodeResourcesController(ctx context.Context, kubeClient kubernetes.Interface, nodeName string) *nodeResourcesController { + if kubeClient == nil { + return nil + } + + logger := klog.FromContext(ctx) + logger = klog.LoggerWithName(logger, "node resources controller") + ctx = klog.NewContext(ctx, logger) + + c := &nodeResourcesController{ + ctx: ctx, + kubeClient: kubeClient, + nodeName: nodeName, + queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "node_resource_slices"), + activePlugins: make(map[string]*activePlugin), + } + + c.wg.Add(1) + go func() { + defer c.wg.Done() + c.run(ctx) + }() + + return c +} + +// waitForStop blocks until all background activity spawned by +// the controller has stopped. 
The context passed to start must +// be canceled for that to happen. +// +// Not needed at the moment, but if it was, this is what it would +// look like... +// func (c *nodeResourcesController) waitForStop() { +// if c == nil { +// return +// } +// +// c.wg.Wait() +// } + +// addPlugin is called whenever a plugin has been (re-)registered. +func (c *nodeResourcesController) addPlugin(driverName string, pluginInstance *plugin) { + if c == nil { + return + } + + klog.FromContext(c.ctx).V(2).Info("Adding plugin", "driverName", driverName) + c.mutex.Lock() + defer c.mutex.Unlock() + + if active := c.activePlugins[driverName]; active != nil { + active.cancel(errors.New("plugin has re-registered")) + } + active := &activePlugin{} + cancelCtx, cancel := context.WithCancelCause(c.ctx) + active.cancel = cancel + c.activePlugins[driverName] = active + c.queue.Add(driverName) + + c.wg.Add(1) + go func() { + defer c.wg.Done() + c.monitorPlugin(cancelCtx, active, driverName, pluginInstance) + }() +} + +// removePlugin is called whenever a plugin has been unregistered. +func (c *nodeResourcesController) removePlugin(driverName string) { + if c == nil { + return + } + + klog.FromContext(c.ctx).V(2).Info("Removing plugin", "driverName", driverName) + c.mutex.Lock() + defer c.mutex.Unlock() + if active, ok := c.activePlugins[driverName]; ok { + active.cancel(errors.New("plugin has unregistered")) + delete(c.activePlugins, driverName) + c.queue.Add(driverName) + } +} + +// monitorPlugin calls the plugin to retrieve resource information and caches +// all responses that it gets for processing in the sync method. It keeps +// retrying until an error or EOF response indicates that no further data is +// going to be sent, then watch resources of the plugin stops until it +// re-registers. +func (c *nodeResourcesController) monitorPlugin(ctx context.Context, active *activePlugin, driverName string, pluginInstance *plugin) { + logger := klog.FromContext(ctx) + logger = klog.LoggerWithValues(logger, "driverName", driverName) + logger.Info("Starting to monitor node resources of the plugin") + defer func() { + r := recover() + logger.Info("Stopping to monitor node resources of the plugin", "reason", context.Cause(ctx), "err", ctx.Err(), "recover", r) + }() + + // Keep trying until canceled. + for ctx.Err() == nil { + logger.V(5).Info("Calling NodeListAndWatchResources") + stream, err := pluginInstance.NodeListAndWatchResources(ctx, new(drapb.NodeListAndWatchResourcesRequest)) + if err != nil { + switch { + case status.Convert(err).Code() == codes.Unimplemented: + // The plugin simply doesn't provide node resources. + active.cancel(errors.New("plugin does not support node resource reporting")) + default: + // This is a problem, report it and retry. + logger.Error(err, "Creating gRPC stream for node resources failed") + // TODO (https://github.com/kubernetes/kubernetes/issues/123689): expontential backoff? + select { + case <-time.After(5 * time.Second): + case <-ctx.Done(): + } + } + continue + } + for { + response, err := stream.Recv() + if err != nil { + switch { + case errors.Is(err, io.EOF): + // This is okay. Some plugins might never change their + // resources after reporting them once. + active.cancel(errors.New("plugin has closed the stream")) + case status.Convert(err).Code() == codes.Unimplemented: + // The plugin has the method, does not really implement it. + active.cancel(errors.New("plugin does not support node resource reporting")) + case ctx.Err() == nil: + // This is a problem, report it and retry. 
+ logger.Error(err, "Reading node resources from gRPC stream failed") + // TODO (https://github.com/kubernetes/kubernetes/issues/123689): exponential backoff? + select { + case <-time.After(5 * time.Second): + case <-ctx.Done(): + } + } + break + } + + if loggerV := logger.V(6); loggerV.Enabled() { + loggerV.Info("Driver resources updated", "resources", response.Resources) + } else { + logger.V(5).Info("Driver resources updated", "numResources", len(response.Resources)) + } + + c.mutex.Lock() + active.resources = response.Resources + c.mutex.Unlock() + c.queue.Add(driverName) + } + } +} + +// run runs in the background. It handles blocking initialization (like +// syncing the informer) and then syncs the actual state with the desired state. +func (c *nodeResourcesController) run(ctx context.Context) { + logger := klog.FromContext(ctx) + + // When kubelet starts, we have two choices: + // - Sync immediately, which in practice will delete all NodeResourceSlices + // because no plugin has registered yet. We could do a DeleteCollection + // to speed this up. + // - Wait a bit, then sync. If all plugins have re-registered in the meantime, + // we might not need to change any NodeResourceSlice. + // + // For now syncing starts immediately, with no DeleteCollection. This + // can be reconsidered later. + + // While kubelet starts up, there are errors: + // E0226 13:41:19.880621 126334 reflector.go:150] k8s.io/client-go@v0.0.0/tools/cache/reflector.go:232: Failed to watch *v1alpha2.NodeResourceSlice: failed to list *v1alpha2.NodeResourceSlice: noderesourceslices.resource.k8s.io is forbidden: User "system:anonymous" cannot list resource "noderesourceslices" in API group "resource.k8s.io" at the cluster scope + // + // The credentials used by kubeClient seem to get swapped out later, + // because eventually these list calls succeed. + // TODO (https://github.com/kubernetes/kubernetes/issues/123691): can we avoid these error log entries? Perhaps wait here? + + // We could use an indexer on driver name, but that seems overkill.
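Illustrative aside, not part of the patch: the informer constructed below is restricted to this node's slices through a field selector. Assuming the same kubeClient and nodeName as in the surrounding controller, the equivalent one-shot query would be:

    slices, err := kubeClient.ResourceV1alpha2().NodeResourceSlices().List(ctx,
        metav1.ListOptions{FieldSelector: "nodeName=" + nodeName})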
+ informer := resourceinformers.NewFilteredNodeResourceSliceInformer(c.kubeClient, resyncPeriod, nil, func(options *metav1.ListOptions) { + options.FieldSelector = "nodeName=" + c.nodeName + }) + c.sliceStore = informer.GetStore() + handler, err := informer.AddEventHandler(cache.ResourceEventHandlerFuncs{ + AddFunc: func(obj any) { + slice, ok := obj.(*resourceapi.NodeResourceSlice) + if !ok { + return + } + logger.V(5).Info("NodeResourceSlice add", "slice", klog.KObj(slice)) + c.queue.Add(slice.DriverName) + }, + UpdateFunc: func(old, new any) { + oldSlice, ok := old.(*resourceapi.NodeResourceSlice) + if !ok { + return + } + newSlice, ok := new.(*resourceapi.NodeResourceSlice) + if !ok { + return + } + if loggerV := logger.V(6); loggerV.Enabled() { + loggerV.Info("NodeResourceSlice update", "slice", klog.KObj(newSlice), "diff", cmp.Diff(oldSlice, newSlice)) + } else { + logger.V(5).Info("NodeResourceSlice update", "slice", klog.KObj(newSlice)) + } + c.queue.Add(newSlice.DriverName) + }, + DeleteFunc: func(obj any) { + if tombstone, ok := obj.(cache.DeletedFinalStateUnknown); ok { + obj = tombstone.Obj + } + slice, ok := obj.(*resourceapi.NodeResourceSlice) + if !ok { + return + } + logger.V(5).Info("NodeResourceSlice delete", "slice", klog.KObj(slice)) + c.queue.Add(slice.DriverName) + }, + }) + if err != nil { + logger.Error(err, "Registering event handler on the NodeResourceSlice informer failed, disabling resource monitoring") + return + } + + // Start informer and wait for our cache to be populated. + c.wg.Add(1) + go func() { + defer c.wg.Done() + informer.Run(ctx.Done()) + }() + for !handler.HasSynced() { + select { + case <-time.After(time.Second): + case <-ctx.Done(): + return + } + } + logger.Info("NodeResourceSlice informer has synced") + + for c.processNextWorkItem(ctx) { + } +} + +func (c *nodeResourcesController) processNextWorkItem(ctx context.Context) bool { + key, shutdown := c.queue.Get() + if shutdown { + return false + } + defer c.queue.Done(key) + + driverName := key.(string) + + // Panics are caught and treated like errors. + var err error + func() { + defer func() { + if r := recover(); r != nil { + err = fmt.Errorf("internal error: %v", r) + } + }() + err = c.sync(ctx, driverName) + }() + + if err != nil { + // TODO (https://github.com/kubernetes/enhancements/issues/3077): contextual logging in utilruntime + utilruntime.HandleError(fmt.Errorf("processing driver %v: %v", driverName, err)) + c.queue.AddRateLimited(key) + + // Return without removing the work item from the queue. + // It will be retried. + return true + } + + c.queue.Forget(key) + return true +} + +func (c *nodeResourcesController) sync(ctx context.Context, driverName string) error { + logger := klog.FromContext(ctx) + + // Gather information about the actual and desired state. + slices := c.sliceStore.List() + var driverResources []*resourceapi.NodeResourceModel + c.mutex.RLock() + if active, ok := c.activePlugins[driverName]; ok { + // No need for a deep copy, the entire slice gets replaced on writes. + driverResources = active.resources + } + c.mutex.RUnlock() + + // Resources that are not yet stored in any slice need to be published. + // Here we track the indices of any resources that are already stored. + storedResourceIndices := sets.New[int]() + + // Slices that don't match any driver resource can either be updated (if there + // are new driver resources that need to be stored) or they need to be deleted. 
+ obsoleteSlices := make([]*resourceapi.NodeResourceSlice, 0, len(slices)) + + // Match slices with resource information. + for _, obj := range slices { + slice := obj.(*resourceapi.NodeResourceSlice) + if slice.DriverName != driverName { + continue + } + + index := indexOfModel(driverResources, &slice.NodeResourceModel) + if index >= 0 { + storedResourceIndices.Insert(index) + continue + } + + obsoleteSlices = append(obsoleteSlices, slice) + } + + if loggerV := logger.V(6); loggerV.Enabled() { + // Dump entire resource information. + loggerV.Info("Syncing existing driver node resource slices with driver resources", "slices", klog.KObjSlice(slices), "resources", driverResources) + } else { + logger.V(5).Info("Syncing existing driver node resource slices with driver resources", "slices", klog.KObjSlice(slices), "numResources", len(driverResources)) + } + + // Update stale slices before removing what's left. + // + // We don't really know which of these slices might have + // been used for "the" driver resource because they don't + // have a unique ID. In practice, a driver is most likely + // to just give us one NodeResourceModel, in which case + // this isn't a problem at all. If we have more than one, + // then at least conceptually it currently doesn't matter + // where we publish it. + // + // The long-term goal is to move the handling of + // NodeResourceSlice objects into the driver, with kubelet + // just acting as a REST proxy. The advantage of that will + // be that kubelet won't need to support the same + // resource API version as the driver and the control plane. + // With that approach, the driver will be able to match + // up objects more intelligently. + numObsoleteSlices := len(obsoleteSlices) + for index, resource := range driverResources { + if storedResourceIndices.Has(index) { + // No need to do anything, it is already stored exactly + // like this in an existing slice. + continue + } + + if numObsoleteSlices > 0 { + // Update one existing slice. + slice := obsoleteSlices[numObsoleteSlices-1] + numObsoleteSlices-- + slice = slice.DeepCopy() + slice.NodeResourceModel = *resource + logger.V(5).Info("Reusing existing node resource slice", "slice", klog.KObj(slice)) + if _, err := c.kubeClient.ResourceV1alpha2().NodeResourceSlices().Update(ctx, slice, metav1.UpdateOptions{}); err != nil { + return fmt.Errorf("update node resource slice: %w", err) + } + continue + } + + // Create a new slice. + slice := &resourceapi.NodeResourceSlice{ + ObjectMeta: metav1.ObjectMeta{ + GenerateName: c.nodeName + "-" + driverName + "-", + // TODO (https://github.com/kubernetes/kubernetes/issues/123692): node object as owner + }, + NodeName: c.nodeName, + DriverName: driverName, + NodeResourceModel: *resource, + } + logger.V(5).Info("Creating new node resource slice", "slice", klog.KObj(slice)) + if _, err := c.kubeClient.ResourceV1alpha2().NodeResourceSlices().Create(ctx, slice, metav1.CreateOptions{}); err != nil { + return fmt.Errorf("create node resource slice: %w", err) + } + } + + // All remaining slices are truly orphaned. 
+ for i := 0; i < numObsoleteSlices; i++ { + slice := obsoleteSlices[i] + logger.V(5).Info("Deleting obsolete node resource slice", "slice", klog.KObj(slice)) + if err := c.kubeClient.ResourceV1alpha2().NodeResourceSlices().Delete(ctx, slice.Name, metav1.DeleteOptions{}); err != nil { + return fmt.Errorf("delete node resource slice: %w", err) + } + } + + return nil +} + +func indexOfModel(models []*resourceapi.NodeResourceModel, model *resourceapi.NodeResourceModel) int { + for index, m := range models { + if apiequality.Semantic.DeepEqual(m, model) { + return index + } + } + return -1 +} diff --git a/pkg/kubelet/cm/dra/plugin/plugin.go b/pkg/kubelet/cm/dra/plugin/plugin.go index 94a9c7354de..7acdec5f530 100644 --- a/pkg/kubelet/cm/dra/plugin/plugin.go +++ b/pkg/kubelet/cm/dra/plugin/plugin.go @@ -29,6 +29,7 @@ import ( "google.golang.org/grpc/connectivity" "google.golang.org/grpc/credentials/insecure" utilversion "k8s.io/apimachinery/pkg/util/version" + "k8s.io/client-go/kubernetes" "k8s.io/klog/v2" ) @@ -94,11 +95,23 @@ func (p *plugin) setVersion(version string) { } // RegistrationHandler is the handler which is fed to the pluginwatcher API. -type RegistrationHandler struct{} +type RegistrationHandler struct { + controller *nodeResourcesController +} // NewPluginHandler returns new registration handler. -func NewRegistrationHandler() *RegistrationHandler { - return &RegistrationHandler{} +// +// Must only be called once per process because it manages global state. +// If a kubeClient is provided, then it synchronizes NodeResourceSlices +// with the resource information provided by plugins. +func NewRegistrationHandler(kubeClient kubernetes.Interface, nodeName string) *RegistrationHandler { + handler := &RegistrationHandler{} + + // If kubelet ever gets an API for stopping registration handlers, then + // that would need to be hooked up with stopping the controller. + handler.controller = startNodeResourcesController(context.TODO(), kubeClient, nodeName) + + return handler } // RegisterPlugin is called when a plugin can be registered. @@ -110,15 +123,18 @@ func (h *RegistrationHandler) RegisterPlugin(pluginName string, endpoint string, return err } - // Storing endpoint of newly registered DRA Plugin into the map, where plugin name will be the key - // all other DRA components will be able to get the actual socket of DRA plugins by its name. - // By default we assume the supported plugin version is v1alpha3 - draPlugins.add(pluginName, &plugin{ + pluginInstance := &plugin{ conn: nil, endpoint: endpoint, version: v1alpha3Version, highestSupportedVersion: highestSupportedVersion, - }) + } + + // Storing endpoint of newly registered DRA Plugin into the map, where plugin name will be the key + // all other DRA components will be able to get the actual socket of DRA plugins by its name. 
+ // By default we assume the supported plugin version is v1alpha3 + draPlugins.add(pluginName, pluginInstance) + h.controller.addPlugin(pluginName, pluginInstance) return nil } @@ -178,6 +194,7 @@ func deregisterPlugin(pluginName string) { func (h *RegistrationHandler) DeRegisterPlugin(pluginName string) { klog.InfoS("DeRegister DRA plugin", "name", pluginName) deregisterPlugin(pluginName) + h.controller.removePlugin(pluginName) } // ValidatePlugin is called by kubelet's plugin watcher upon detection diff --git a/pkg/kubelet/cm/dra/plugin/plugin_test.go b/pkg/kubelet/cm/dra/plugin/plugin_test.go index 70499b260c8..f9d70238f60 100644 --- a/pkg/kubelet/cm/dra/plugin/plugin_test.go +++ b/pkg/kubelet/cm/dra/plugin/plugin_test.go @@ -23,6 +23,10 @@ import ( ) func TestRegistrationHandler_ValidatePlugin(t *testing.T) { + newRegistrationHandler := func() *RegistrationHandler { + return NewRegistrationHandler(nil, "worker") + } + for _, test := range []struct { description string handler func() *RegistrationHandler @@ -33,19 +37,19 @@ func TestRegistrationHandler_ValidatePlugin(t *testing.T) { }{ { description: "no versions provided", - handler: NewRegistrationHandler, + handler: newRegistrationHandler, shouldError: true, }, { description: "unsupported version", - handler: NewRegistrationHandler, + handler: newRegistrationHandler, versions: []string{"v2.0.0"}, shouldError: true, }, { description: "plugin already registered with a higher supported version", handler: func() *RegistrationHandler { - handler := NewRegistrationHandler() + handler := newRegistrationHandler() if err := handler.RegisterPlugin("this-plugin-already-exists-and-has-a-long-name-so-it-doesnt-collide", "", []string{"v1.1.0"}); err != nil { t.Fatal(err) } @@ -57,7 +61,7 @@ func TestRegistrationHandler_ValidatePlugin(t *testing.T) { }, { description: "should validate the plugin", - handler: NewRegistrationHandler, + handler: newRegistrationHandler, pluginName: "this-is-a-dummy-plugin-with-a-long-name-so-it-doesnt-collide", versions: []string{"v1.3.0"}, }, @@ -74,7 +78,7 @@ func TestRegistrationHandler_ValidatePlugin(t *testing.T) { } t.Cleanup(func() { - handler := NewRegistrationHandler() + handler := newRegistrationHandler() handler.DeRegisterPlugin("this-plugin-already-exists-and-has-a-long-name-so-it-doesnt-collide") handler.DeRegisterPlugin("this-is-a-dummy-plugin-with-a-long-name-so-it-doesnt-collide") }) diff --git a/pkg/kubelet/kubelet.go b/pkg/kubelet/kubelet.go index 96900baaf44..b8b109df825 100644 --- a/pkg/kubelet/kubelet.go +++ b/pkg/kubelet/kubelet.go @@ -1561,7 +1561,7 @@ func (kl *Kubelet) initializeRuntimeDependentModules() { kl.pluginManager.AddHandler(pluginwatcherapi.CSIPlugin, plugincache.PluginHandler(csi.PluginHandler)) // Adding Registration Callback function for DRA Plugin if utilfeature.DefaultFeatureGate.Enabled(features.DynamicResourceAllocation) { - kl.pluginManager.AddHandler(pluginwatcherapi.DRAPlugin, plugincache.PluginHandler(draplugin.NewRegistrationHandler())) + kl.pluginManager.AddHandler(pluginwatcherapi.DRAPlugin, plugincache.PluginHandler(draplugin.NewRegistrationHandler(kl.kubeClient, kl.hostname))) } // Adding Registration Callback function for Device Manager kl.pluginManager.AddHandler(pluginwatcherapi.DevicePlugin, kl.containerManager.GetPluginRegistrationHandler()) diff --git a/staging/src/k8s.io/dynamic-resource-allocation/kubeletplugin/draplugin.go b/staging/src/k8s.io/dynamic-resource-allocation/kubeletplugin/draplugin.go index e3a0bafe2b8..282b407bd55 100644 --- 
a/staging/src/k8s.io/dynamic-resource-allocation/kubeletplugin/draplugin.go +++ b/staging/src/k8s.io/dynamic-resource-allocation/kubeletplugin/draplugin.go @@ -140,7 +140,16 @@ func KubeletPluginSocketPath(path string) Option { // may be used more than once and each interceptor will get called. func GRPCInterceptor(interceptor grpc.UnaryServerInterceptor) Option { return func(o *options) error { - o.interceptors = append(o.interceptors, interceptor) + o.unaryInterceptors = append(o.unaryInterceptors, interceptor) + return nil + } +} + +// GRPCStreamInterceptor is called for each gRPC streaming method call. This option +// may be used more than once and each interceptor will get called. +func GRPCStreamInterceptor(interceptor grpc.StreamServerInterceptor) Option { + return func(o *options) error { + o.streamInterceptors = append(o.streamInterceptors, interceptor) return nil } } @@ -170,7 +179,8 @@ type options struct { draEndpoint endpoint draAddress string pluginRegistrationEndpoint endpoint - interceptors []grpc.UnaryServerInterceptor + unaryInterceptors []grpc.UnaryServerInterceptor + streamInterceptors []grpc.StreamServerInterceptor nodeV1alpha2, nodeV1alpha3 bool } @@ -215,7 +225,7 @@ func Start(nodeServer interface{}, opts ...Option) (result DRAPlugin, finalErr e // Run the node plugin gRPC server first to ensure that it is ready. implemented := false - plugin, err := startGRPCServer(klog.LoggerWithName(o.logger, "dra"), o.grpcVerbosity, o.interceptors, o.draEndpoint, func(grpcServer *grpc.Server) { + plugin, err := startGRPCServer(klog.LoggerWithName(o.logger, "dra"), o.grpcVerbosity, o.unaryInterceptors, o.streamInterceptors, o.draEndpoint, func(grpcServer *grpc.Server) { if nodeServer, ok := nodeServer.(drapbv1alpha3.NodeServer); ok && o.nodeV1alpha3 { o.logger.V(5).Info("registering drapbv1alpha3.NodeServer") drapbv1alpha3.RegisterNodeServer(grpcServer, nodeServer) @@ -246,7 +256,7 @@ func Start(nodeServer interface{}, opts ...Option) (result DRAPlugin, finalErr e } // Now make it available to kubelet. - registrar, err := startRegistrar(klog.LoggerWithName(o.logger, "registrar"), o.grpcVerbosity, o.interceptors, o.driverName, o.draAddress, o.pluginRegistrationEndpoint) + registrar, err := startRegistrar(klog.LoggerWithName(o.logger, "registrar"), o.grpcVerbosity, o.unaryInterceptors, o.streamInterceptors, o.driverName, o.draAddress, o.pluginRegistrationEndpoint) if err != nil { return nil, fmt.Errorf("start registrar: %v", err) } diff --git a/staging/src/k8s.io/dynamic-resource-allocation/kubeletplugin/noderegistrar.go b/staging/src/k8s.io/dynamic-resource-allocation/kubeletplugin/noderegistrar.go index f5148e4c9c1..579f86245a7 100644 --- a/staging/src/k8s.io/dynamic-resource-allocation/kubeletplugin/noderegistrar.go +++ b/staging/src/k8s.io/dynamic-resource-allocation/kubeletplugin/noderegistrar.go @@ -31,7 +31,7 @@ type nodeRegistrar struct { } // startRegistrar returns a running instance. 
-func startRegistrar(logger klog.Logger, grpcVerbosity int, interceptors []grpc.UnaryServerInterceptor, driverName string, endpoint string, pluginRegistrationEndpoint endpoint) (*nodeRegistrar, error) { +func startRegistrar(logger klog.Logger, grpcVerbosity int, interceptors []grpc.UnaryServerInterceptor, streamInterceptors []grpc.StreamServerInterceptor, driverName string, endpoint string, pluginRegistrationEndpoint endpoint) (*nodeRegistrar, error) { n := &nodeRegistrar{ logger: logger, registrationServer: registrationServer{ @@ -40,7 +40,7 @@ func startRegistrar(logger klog.Logger, grpcVerbosity int, interceptors []grpc.U supportedVersions: []string{"1.0.0"}, // TODO: is this correct? }, } - s, err := startGRPCServer(logger, grpcVerbosity, interceptors, pluginRegistrationEndpoint, func(grpcServer *grpc.Server) { + s, err := startGRPCServer(logger, grpcVerbosity, interceptors, streamInterceptors, pluginRegistrationEndpoint, func(grpcServer *grpc.Server) { registerapi.RegisterRegistrationServer(grpcServer, n) }) if err != nil { diff --git a/staging/src/k8s.io/dynamic-resource-allocation/kubeletplugin/nonblockinggrpcserver.go b/staging/src/k8s.io/dynamic-resource-allocation/kubeletplugin/nonblockinggrpcserver.go index b840c91c7df..c2d6cf18267 100644 --- a/staging/src/k8s.io/dynamic-resource-allocation/kubeletplugin/nonblockinggrpcserver.go +++ b/staging/src/k8s.io/dynamic-resource-allocation/kubeletplugin/nonblockinggrpcserver.go @@ -54,7 +54,7 @@ type endpoint struct { // startGRPCServer sets up the GRPC server on a Unix domain socket and spawns a goroutine // which handles requests for arbitrary services. -func startGRPCServer(logger klog.Logger, grpcVerbosity int, interceptors []grpc.UnaryServerInterceptor, endpoint endpoint, services ...registerService) (*grpcServer, error) { +func startGRPCServer(logger klog.Logger, grpcVerbosity int, unaryInterceptors []grpc.UnaryServerInterceptor, streamInterceptors []grpc.StreamServerInterceptor, endpoint endpoint, services ...registerService) (*grpcServer, error) { s := &grpcServer{ logger: logger, endpoint: endpoint, @@ -79,12 +79,15 @@ func startGRPCServer(logger klog.Logger, grpcVerbosity int, interceptors []grpc. // Run a gRPC server. It will close the listening socket when // shutting down, so we don't need to do that. var opts []grpc.ServerOption - var finalInterceptors []grpc.UnaryServerInterceptor + var finalUnaryInterceptors []grpc.UnaryServerInterceptor + var finalStreamInterceptors []grpc.StreamServerInterceptor if grpcVerbosity >= 0 { - finalInterceptors = append(finalInterceptors, s.interceptor) + finalUnaryInterceptors = append(finalUnaryInterceptors, s.interceptor) } - finalInterceptors = append(finalInterceptors, interceptors...) - opts = append(opts, grpc.ChainUnaryInterceptor(finalInterceptors...)) + finalUnaryInterceptors = append(finalUnaryInterceptors, unaryInterceptors...) + finalStreamInterceptors = append(finalStreamInterceptors, streamInterceptors...) + opts = append(opts, grpc.ChainUnaryInterceptor(finalUnaryInterceptors...)) + opts = append(opts, grpc.ChainStreamInterceptor(finalStreamInterceptors...)) s.server = grpc.NewServer(opts...) 
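Illustrative aside, not part of the patch: with the interceptors now split into unary and stream chains, a driver can hook both call types when it starts the plugin. A rough sketch, assuming nodeServer and the listener/socket options are set up as in the e2e deploy.go shown later in this patch:

    plugin, err := kubeletplugin.Start(nodeServer,
        kubeletplugin.GRPCInterceptor(func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) {
            // Unary calls such as NodePrepareResources pass through here.
            return handler(ctx, req)
        }),
        kubeletplugin.GRPCStreamInterceptor(func(srv interface{}, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error {
            // Streaming calls such as NodeListAndWatchResources pass through here.
            return handler(srv, ss)
        }),
    )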
for _, service := range services { service(s.server) diff --git a/staging/src/k8s.io/kubelet/pkg/apis/dra/v1alpha3/api.pb.go b/staging/src/k8s.io/kubelet/pkg/apis/dra/v1alpha3/api.pb.go index 3a9929690b1..50e6c93fbfe 100644 --- a/staging/src/k8s.io/kubelet/pkg/apis/dra/v1alpha3/api.pb.go +++ b/staging/src/k8s.io/kubelet/pkg/apis/dra/v1alpha3/api.pb.go @@ -342,6 +342,88 @@ func (m *NodeUnprepareResourceResponse) GetError() string { return "" } +type NodeListAndWatchResourcesRequest struct { + XXX_NoUnkeyedLiteral struct{} `json:"-"` + XXX_sizecache int32 `json:"-"` +} + +func (m *NodeListAndWatchResourcesRequest) Reset() { *m = NodeListAndWatchResourcesRequest{} } +func (*NodeListAndWatchResourcesRequest) ProtoMessage() {} +func (*NodeListAndWatchResourcesRequest) Descriptor() ([]byte, []int) { + return fileDescriptor_00212fb1f9d3bf1c, []int{6} +} +func (m *NodeListAndWatchResourcesRequest) XXX_Unmarshal(b []byte) error { + return m.Unmarshal(b) +} +func (m *NodeListAndWatchResourcesRequest) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + if deterministic { + return xxx_messageInfo_NodeListAndWatchResourcesRequest.Marshal(b, m, deterministic) + } else { + b = b[:cap(b)] + n, err := m.MarshalToSizedBuffer(b) + if err != nil { + return nil, err + } + return b[:n], nil + } +} +func (m *NodeListAndWatchResourcesRequest) XXX_Merge(src proto.Message) { + xxx_messageInfo_NodeListAndWatchResourcesRequest.Merge(m, src) +} +func (m *NodeListAndWatchResourcesRequest) XXX_Size() int { + return m.Size() +} +func (m *NodeListAndWatchResourcesRequest) XXX_DiscardUnknown() { + xxx_messageInfo_NodeListAndWatchResourcesRequest.DiscardUnknown(m) +} + +var xxx_messageInfo_NodeListAndWatchResourcesRequest proto.InternalMessageInfo + +type NodeListAndWatchResourcesResponse struct { + Resources []*v1alpha2.NodeResourceModel `protobuf:"bytes,1,rep,name=resources,proto3" json:"resources,omitempty"` + XXX_NoUnkeyedLiteral struct{} `json:"-"` + XXX_sizecache int32 `json:"-"` +} + +func (m *NodeListAndWatchResourcesResponse) Reset() { *m = NodeListAndWatchResourcesResponse{} } +func (*NodeListAndWatchResourcesResponse) ProtoMessage() {} +func (*NodeListAndWatchResourcesResponse) Descriptor() ([]byte, []int) { + return fileDescriptor_00212fb1f9d3bf1c, []int{7} +} +func (m *NodeListAndWatchResourcesResponse) XXX_Unmarshal(b []byte) error { + return m.Unmarshal(b) +} +func (m *NodeListAndWatchResourcesResponse) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + if deterministic { + return xxx_messageInfo_NodeListAndWatchResourcesResponse.Marshal(b, m, deterministic) + } else { + b = b[:cap(b)] + n, err := m.MarshalToSizedBuffer(b) + if err != nil { + return nil, err + } + return b[:n], nil + } +} +func (m *NodeListAndWatchResourcesResponse) XXX_Merge(src proto.Message) { + xxx_messageInfo_NodeListAndWatchResourcesResponse.Merge(m, src) +} +func (m *NodeListAndWatchResourcesResponse) XXX_Size() int { + return m.Size() +} +func (m *NodeListAndWatchResourcesResponse) XXX_DiscardUnknown() { + xxx_messageInfo_NodeListAndWatchResourcesResponse.DiscardUnknown(m) +} + +var xxx_messageInfo_NodeListAndWatchResourcesResponse proto.InternalMessageInfo + +func (m *NodeListAndWatchResourcesResponse) GetResources() []*v1alpha2.NodeResourceModel { + if m != nil { + return m.Resources + } + return nil +} + type Claim struct { // The ResourceClaim namespace (ResourceClaim.meta.Namespace). // This field is REQUIRED. 
@@ -368,7 +450,7 @@ type Claim struct { func (m *Claim) Reset() { *m = Claim{} } func (*Claim) ProtoMessage() {} func (*Claim) Descriptor() ([]byte, []int) { - return fileDescriptor_00212fb1f9d3bf1c, []int{6} + return fileDescriptor_00212fb1f9d3bf1c, []int{8} } func (m *Claim) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) @@ -441,49 +523,55 @@ func init() { proto.RegisterType((*NodeUnprepareResourcesResponse)(nil), "v1alpha3.NodeUnprepareResourcesResponse") proto.RegisterMapType((map[string]*NodeUnprepareResourceResponse)(nil), "v1alpha3.NodeUnprepareResourcesResponse.ClaimsEntry") proto.RegisterType((*NodeUnprepareResourceResponse)(nil), "v1alpha3.NodeUnprepareResourceResponse") + proto.RegisterType((*NodeListAndWatchResourcesRequest)(nil), "v1alpha3.NodeListAndWatchResourcesRequest") + proto.RegisterType((*NodeListAndWatchResourcesResponse)(nil), "v1alpha3.NodeListAndWatchResourcesResponse") proto.RegisterType((*Claim)(nil), "v1alpha3.Claim") } func init() { proto.RegisterFile("api.proto", fileDescriptor_00212fb1f9d3bf1c) } var fileDescriptor_00212fb1f9d3bf1c = []byte{ - // 562 bytes of a gzipped FileDescriptorProto - 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xa4, 0x54, 0xcd, 0x6e, 0xd3, 0x40, - 0x10, 0xce, 0x36, 0x49, 0x45, 0x26, 0x52, 0x8b, 0x56, 0x15, 0xb2, 0x42, 0x31, 0x91, 0x45, 0x49, - 0x0e, 0x60, 0x0b, 0x07, 0x50, 0x05, 0xe2, 0x92, 0x16, 0x54, 0x10, 0x42, 0xc8, 0x88, 0x0b, 0x97, - 0xb0, 0xb1, 0x07, 0xc7, 0x4a, 0x62, 0x9b, 0x5d, 0x3b, 0x52, 0x6f, 0x3c, 0x02, 0x8f, 0xd5, 0x03, - 0x07, 0xc4, 0x89, 0x53, 0x45, 0xcd, 0x8d, 0xa7, 0x40, 0x5e, 0xdb, 0x69, 0x13, 0x39, 0x4d, 0xa5, - 0xde, 0x66, 0xe7, 0xef, 0x9b, 0xfd, 0xe6, 0x07, 0x1a, 0x2c, 0xf4, 0xf4, 0x90, 0x07, 0x51, 0x40, - 0x6f, 0xcc, 0x1e, 0xb1, 0x49, 0x38, 0x62, 0xbd, 0xd6, 0x43, 0xd7, 0x8b, 0x46, 0xf1, 0x50, 0xb7, - 0x83, 0xa9, 0xe1, 0x06, 0x6e, 0x60, 0x48, 0x87, 0x61, 0xfc, 0x45, 0xbe, 0xe4, 0x43, 0x4a, 0x59, - 0x60, 0xeb, 0xc1, 0x78, 0x5f, 0xe8, 0x5e, 0x60, 0xb0, 0xd0, 0x33, 0x38, 0x8a, 0x20, 0xe6, 0x36, - 0x1a, 0x79, 0x32, 0xd3, 0x70, 0xd1, 0x47, 0xce, 0x22, 0x74, 0x32, 0x6f, 0xed, 0x15, 0xdc, 0x7e, - 0x17, 0x38, 0xf8, 0x9e, 0x63, 0xc8, 0x38, 0x5a, 0xb9, 0xbf, 0xb0, 0xf0, 0x6b, 0x8c, 0x22, 0xa2, - 0x1d, 0xd8, 0xb4, 0x27, 0xcc, 0x9b, 0x0a, 0x85, 0xb4, 0xab, 0xdd, 0xa6, 0xb9, 0xad, 0x17, 0x65, - 0xe9, 0x07, 0xa9, 0xde, 0xca, 0xcd, 0xda, 0x0f, 0x02, 0xbb, 0xe5, 0x89, 0x44, 0x18, 0xf8, 0x02, - 0xe9, 0x9b, 0xa5, 0x4c, 0xe6, 0x79, 0xa6, 0xcb, 0xe2, 0x32, 0x18, 0xf1, 0xd2, 0x8f, 0xf8, 0x71, - 0x01, 0xd6, 0xfa, 0x0c, 0xcd, 0x0b, 0x6a, 0x7a, 0x13, 0xaa, 0x63, 0x3c, 0x56, 0x48, 0x9b, 0x74, - 0x1b, 0x56, 0x2a, 0xd2, 0xe7, 0x50, 0x9f, 0xb1, 0x49, 0x8c, 0xca, 0x46, 0x9b, 0x74, 0x9b, 0xe6, - 0xde, 0xa5, 0x58, 0x05, 0x94, 0x95, 0xc5, 0x3c, 0xdb, 0xd8, 0x27, 0x9a, 0x53, 0x4a, 0xcb, 0xfc, - 0x33, 0x06, 0x34, 0x6d, 0xc7, 0x1b, 0x38, 0x38, 0xf3, 0x6c, 0xcc, 0x7e, 0xd4, 0xe8, 0x6f, 0x25, - 0xa7, 0x77, 0xe1, 0xe0, 0xf0, 0xf5, 0x61, 0xa6, 0xb5, 0xc0, 0x76, 0xbc, 0x5c, 0xa6, 0x3b, 0x50, - 0x47, 0xce, 0x03, 0x2e, 0x0b, 0x6a, 0x58, 0xd9, 0x43, 0x3b, 0x82, 0x3b, 0x29, 0xca, 0x47, 0x3f, - 0xbc, 0x2e, 0xfd, 0xbf, 0x08, 0xa8, 0xab, 0x52, 0xe5, 0x35, 0xbf, 0x5d, 0xca, 0xf5, 0x78, 0x91, - 0x94, 0xd5, 0x91, 0xa5, 0x2d, 0x18, 0xae, 0x6b, 0xc1, 0x8b, 0xc5, 0x16, 0x74, 0xd6, 0xa0, 0x95, - 0x35, 0xe1, 0xc9, 0x0a, 0x7a, 0xe6, 0x5f, 0x9a, 0xb3, 0x4a, 0x2e, 0xb2, 0xfa, 0x8f, 0x40, 0x5d, - 0xd6, 0x46, 0x77, 0xa1, 0xe1, 0xb3, 0x29, 0x8a, 0x90, 0xd9, 0x98, 0xfb, 0x9c, 0x2b, 0xd2, 0x9a, - 0x63, 0xcf, 0xc9, 0x3b, 0x92, 0x8a, 0x94, 0x42, 0x2d, 
0x35, 0x2b, 0x55, 0xa9, 0x92, 0x32, 0xed, - 0xc0, 0x76, 0xb1, 0x45, 0x83, 0x11, 0xf3, 0x9d, 0x09, 0x2a, 0x35, 0x69, 0xde, 0x2a, 0xd4, 0x47, - 0x52, 0x4b, 0x23, 0x68, 0x89, 0x88, 0xc7, 0x76, 0x14, 0x73, 0x74, 0x06, 0xcb, 0x31, 0x75, 0xc9, - 0xf9, 0x53, 0x3d, 0x5b, 0x4e, 0x3d, 0xdd, 0xf3, 0xc2, 0xa5, 0x60, 0xc6, 0xd4, 0x3f, 0xcc, 0xe3, - 0xad, 0x85, 0xdc, 0x96, 0x22, 0x56, 0x58, 0xcc, 0x53, 0x02, 0xb5, 0x94, 0x24, 0xea, 0xc2, 0x4e, - 0xd9, 0x1e, 0xd1, 0xbd, 0x75, 0x7b, 0x26, 0x27, 0xad, 0x75, 0xff, 0x6a, 0xeb, 0xa8, 0x55, 0xe8, - 0x14, 0x6e, 0x95, 0xcf, 0x0b, 0xed, 0xac, 0x9f, 0xa8, 0x0c, 0xac, 0x7b, 0xd5, 0xd1, 0xd3, 0x2a, - 0xfd, 0xfe, 0xc9, 0x99, 0x4a, 0x7e, 0x9f, 0xa9, 0x95, 0x6f, 0x89, 0x4a, 0x4e, 0x12, 0x95, 0xfc, - 0x4c, 0x54, 0xf2, 0x27, 0x51, 0xc9, 0xf7, 0xbf, 0x6a, 0xe5, 0xd3, 0xbd, 0xfc, 0xd8, 0x8d, 0xe3, - 0x21, 0x4e, 0x30, 0x32, 0xc2, 0xb1, 0x9b, 0x1e, 0x3e, 0x61, 0x38, 0x9c, 0x15, 0x47, 0xaf, 0x37, - 0xdc, 0x94, 0xb7, 0xae, 0xf7, 0x3f, 0x00, 0x00, 0xff, 0xff, 0xfd, 0x14, 0x30, 0xd4, 0x5f, 0x05, - 0x00, 0x00, + // 631 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xa4, 0x55, 0xcd, 0x6e, 0xd3, 0x40, + 0x10, 0xce, 0xb6, 0x4d, 0x45, 0x26, 0x52, 0x8b, 0x56, 0x15, 0x0a, 0xa6, 0x98, 0x60, 0x51, 0x12, + 0xf1, 0x63, 0x83, 0x0b, 0xa8, 0x02, 0x71, 0xa0, 0x2d, 0xa8, 0xa0, 0x16, 0x21, 0x23, 0x84, 0xc4, + 0xa5, 0x6c, 0xbc, 0x8b, 0x63, 0xc5, 0xb1, 0xcd, 0xae, 0x5d, 0xd1, 0x1b, 0x8f, 0xc0, 0x63, 0xf5, + 0xc0, 0x01, 0x71, 0xea, 0x09, 0xd1, 0x70, 0xe3, 0x29, 0x90, 0xd7, 0xde, 0xb4, 0x89, 0x9c, 0xa4, + 0x12, 0xb7, 0xd9, 0xf9, 0xf9, 0x66, 0xe7, 0x9b, 0x99, 0x5d, 0xa8, 0x91, 0xd8, 0x37, 0x63, 0x1e, + 0x25, 0x11, 0xbe, 0x70, 0x70, 0x9f, 0x04, 0x71, 0x97, 0xac, 0x6b, 0x77, 0x3d, 0x3f, 0xe9, 0xa6, + 0x1d, 0xd3, 0x8d, 0xfa, 0x96, 0x17, 0x79, 0x91, 0x25, 0x1d, 0x3a, 0xe9, 0x27, 0x79, 0x92, 0x07, + 0x29, 0xe5, 0x81, 0xda, 0x9d, 0xde, 0x86, 0x30, 0xfd, 0xc8, 0x22, 0xb1, 0x6f, 0x71, 0x26, 0xa2, + 0x94, 0xbb, 0xcc, 0x2a, 0xc0, 0x6c, 0xcb, 0x63, 0x21, 0xe3, 0x24, 0x61, 0x34, 0xf7, 0x36, 0x5e, + 0xc0, 0x95, 0xd7, 0x11, 0x65, 0x6f, 0x38, 0x8b, 0x09, 0x67, 0x4e, 0xe1, 0x2f, 0x1c, 0xf6, 0x39, + 0x65, 0x22, 0xc1, 0x2d, 0x58, 0x74, 0x03, 0xe2, 0xf7, 0x45, 0x03, 0x35, 0xe7, 0xdb, 0x75, 0x7b, + 0xd9, 0x54, 0xd7, 0x32, 0xb7, 0x32, 0xbd, 0x53, 0x98, 0x8d, 0xef, 0x08, 0x56, 0xcb, 0x81, 0x44, + 0x1c, 0x85, 0x82, 0xe1, 0x57, 0x63, 0x48, 0xf6, 0x29, 0xd2, 0xb4, 0xb8, 0x3c, 0x8d, 0x78, 0x1e, + 0x26, 0xfc, 0x50, 0x25, 0xd3, 0x3e, 0x42, 0xfd, 0x8c, 0x1a, 0x5f, 0x84, 0xf9, 0x1e, 0x3b, 0x6c, + 0xa0, 0x26, 0x6a, 0xd7, 0x9c, 0x4c, 0xc4, 0x4f, 0xa0, 0x7a, 0x40, 0x82, 0x94, 0x35, 0xe6, 0x9a, + 0xa8, 0x5d, 0xb7, 0xd7, 0xa6, 0xe6, 0x52, 0xa9, 0x9c, 0x3c, 0xe6, 0xf1, 0xdc, 0x06, 0x32, 0x68, + 0x29, 0x2d, 0xc3, 0x62, 0x2c, 0xa8, 0xbb, 0xd4, 0xdf, 0xa7, 0xec, 0xc0, 0x77, 0x59, 0x5e, 0x51, + 0x6d, 0x73, 0x69, 0xf0, 0xeb, 0x1a, 0x6c, 0x6d, 0xbf, 0xdc, 0xce, 0xb5, 0x0e, 0xb8, 0xd4, 0x2f, + 0x64, 0xbc, 0x02, 0x55, 0xc6, 0x79, 0xc4, 0xe5, 0x85, 0x6a, 0x4e, 0x7e, 0x30, 0x76, 0xe0, 0x6a, + 0x96, 0xe5, 0x5d, 0x18, 0xff, 0x2f, 0xfd, 0x3f, 0x11, 0xe8, 0x93, 0xa0, 0x8a, 0x3b, 0xef, 0x8e, + 0x61, 0x3d, 0x18, 0x25, 0x65, 0x72, 0x64, 0x69, 0x0b, 0x3a, 0xb3, 0x5a, 0xf0, 0x74, 0xb4, 0x05, + 0xad, 0x19, 0xd9, 0xca, 0x9a, 0xf0, 0x70, 0x02, 0x3d, 0xc3, 0x92, 0x86, 0xac, 0xa2, 0xb3, 0xac, + 0x1a, 0xd0, 0xcc, 0xc2, 0x76, 0x7d, 0x91, 0x3c, 0x0b, 0xe9, 0x7b, 0x92, 0xb8, 0xdd, 0x71, 0x62, + 0x0d, 0x0e, 0xd7, 0xa7, 0xf8, 0x14, 0xf0, 0x7b, 0x50, 0x53, 0x0b, 0xa4, 0x48, 0xb3, 0xcc, 0x7c, + 0xbb, 0xcc, 0x6c, 
0x51, 0x95, 0x51, 0x95, 0x66, 0xcb, 0xd2, 0x14, 0xce, 0x5e, 0x44, 0x59, 0xe0, + 0x9c, 0x22, 0x18, 0x7f, 0x11, 0x54, 0x25, 0x67, 0x78, 0x15, 0x6a, 0x21, 0xe9, 0x33, 0x11, 0x13, + 0x97, 0x15, 0x77, 0x3f, 0x55, 0x64, 0x5c, 0xa6, 0x3e, 0x2d, 0x26, 0x25, 0x13, 0x31, 0x86, 0x85, + 0xcc, 0xdc, 0x98, 0x97, 0x2a, 0x29, 0xe3, 0x16, 0x2c, 0x2b, 0xe8, 0xfd, 0x2e, 0x09, 0x69, 0xc0, + 0x1a, 0x0b, 0xd2, 0xbc, 0xa4, 0xd4, 0x3b, 0x52, 0x8b, 0x13, 0xd0, 0x44, 0xc2, 0x53, 0x37, 0x49, + 0x39, 0xa3, 0xfb, 0xe3, 0x31, 0x55, 0x59, 0xd6, 0xa3, 0xe9, 0x65, 0xbd, 0x1d, 0xc6, 0x3b, 0x23, + 0xd8, 0x4e, 0x43, 0x4c, 0xb0, 0xd8, 0xc7, 0x73, 0xb0, 0x90, 0xb1, 0x81, 0x3d, 0x58, 0x29, 0xdb, + 0x6f, 0xbc, 0x36, 0x6b, 0xff, 0x65, 0xa3, 0xb4, 0x9b, 0xe7, 0x7b, 0x26, 0x8c, 0x0a, 0xee, 0xc3, + 0xa5, 0xf2, 0x39, 0xc6, 0xad, 0xd9, 0x93, 0x9e, 0x27, 0x6b, 0x9f, 0x77, 0x25, 0x8c, 0x0a, 0xfe, + 0x02, 0x97, 0x27, 0x4e, 0x10, 0xbe, 0x35, 0x0a, 0x34, 0x6d, 0x14, 0xb5, 0xdb, 0xe7, 0xf2, 0x55, + 0x79, 0xef, 0xa1, 0xcd, 0xcd, 0xa3, 0x13, 0x1d, 0x1d, 0x9f, 0xe8, 0x95, 0xaf, 0x03, 0x1d, 0x1d, + 0x0d, 0x74, 0xf4, 0x63, 0xa0, 0xa3, 0xdf, 0x03, 0x1d, 0x7d, 0xfb, 0xa3, 0x57, 0x3e, 0xdc, 0x28, + 0x9e, 0xff, 0x5e, 0xda, 0x61, 0x01, 0x4b, 0xac, 0xb8, 0xe7, 0x65, 0x5f, 0x81, 0xb0, 0x28, 0x27, + 0xea, 0x1b, 0x58, 0xef, 0x2c, 0xca, 0xd7, 0x7f, 0xfd, 0x5f, 0x00, 0x00, 0x00, 0xff, 0xff, 0x20, + 0x6d, 0x9d, 0x90, 0x71, 0x06, 0x00, 0x00, } // Reference imports to suppress errors if they are not otherwise used. @@ -506,6 +594,10 @@ type NodeClient interface { // NodeUnprepareResources is the opposite of NodePrepareResources. // The same error handling rules apply, NodeUnprepareResources(ctx context.Context, in *NodeUnprepareResourcesRequest, opts ...grpc.CallOption) (*NodeUnprepareResourcesResponse, error) + // NodeListAndWatchResources returns a stream of NodeResourcesResponse objects. + // At the start and whenever resource availability changes, the + // plugin must send one such object with all information to the Kubelet. + NodeListAndWatchResources(ctx context.Context, in *NodeListAndWatchResourcesRequest, opts ...grpc.CallOption) (Node_NodeListAndWatchResourcesClient, error) } type nodeClient struct { @@ -534,6 +626,38 @@ func (c *nodeClient) NodeUnprepareResources(ctx context.Context, in *NodeUnprepa return out, nil } +func (c *nodeClient) NodeListAndWatchResources(ctx context.Context, in *NodeListAndWatchResourcesRequest, opts ...grpc.CallOption) (Node_NodeListAndWatchResourcesClient, error) { + stream, err := c.cc.NewStream(ctx, &_Node_serviceDesc.Streams[0], "/v1alpha3.Node/NodeListAndWatchResources", opts...) + if err != nil { + return nil, err + } + x := &nodeNodeListAndWatchResourcesClient{stream} + if err := x.ClientStream.SendMsg(in); err != nil { + return nil, err + } + if err := x.ClientStream.CloseSend(); err != nil { + return nil, err + } + return x, nil +} + +type Node_NodeListAndWatchResourcesClient interface { + Recv() (*NodeListAndWatchResourcesResponse, error) + grpc.ClientStream +} + +type nodeNodeListAndWatchResourcesClient struct { + grpc.ClientStream +} + +func (x *nodeNodeListAndWatchResourcesClient) Recv() (*NodeListAndWatchResourcesResponse, error) { + m := new(NodeListAndWatchResourcesResponse) + if err := x.ClientStream.RecvMsg(m); err != nil { + return nil, err + } + return m, nil +} + // NodeServer is the server API for Node service. 
type NodeServer interface { // NodePrepareResources prepares several ResourceClaims @@ -544,6 +668,10 @@ type NodeServer interface { // NodeUnprepareResources is the opposite of NodePrepareResources. // The same error handling rules apply, NodeUnprepareResources(context.Context, *NodeUnprepareResourcesRequest) (*NodeUnprepareResourcesResponse, error) + // NodeListAndWatchResources returns a stream of NodeResourcesResponse objects. + // At the start and whenever resource availability changes, the + // plugin must send one such object with all information to the Kubelet. + NodeListAndWatchResources(*NodeListAndWatchResourcesRequest, Node_NodeListAndWatchResourcesServer) error } // UnimplementedNodeServer can be embedded to have forward compatible implementations. @@ -556,6 +684,9 @@ func (*UnimplementedNodeServer) NodePrepareResources(ctx context.Context, req *N func (*UnimplementedNodeServer) NodeUnprepareResources(ctx context.Context, req *NodeUnprepareResourcesRequest) (*NodeUnprepareResourcesResponse, error) { return nil, status.Errorf(codes.Unimplemented, "method NodeUnprepareResources not implemented") } +func (*UnimplementedNodeServer) NodeListAndWatchResources(req *NodeListAndWatchResourcesRequest, srv Node_NodeListAndWatchResourcesServer) error { + return status.Errorf(codes.Unimplemented, "method NodeListAndWatchResources not implemented") +} func RegisterNodeServer(s *grpc.Server, srv NodeServer) { s.RegisterService(&_Node_serviceDesc, srv) @@ -597,6 +728,27 @@ func _Node_NodeUnprepareResources_Handler(srv interface{}, ctx context.Context, return interceptor(ctx, in, info, handler) } +func _Node_NodeListAndWatchResources_Handler(srv interface{}, stream grpc.ServerStream) error { + m := new(NodeListAndWatchResourcesRequest) + if err := stream.RecvMsg(m); err != nil { + return err + } + return srv.(NodeServer).NodeListAndWatchResources(m, &nodeNodeListAndWatchResourcesServer{stream}) +} + +type Node_NodeListAndWatchResourcesServer interface { + Send(*NodeListAndWatchResourcesResponse) error + grpc.ServerStream +} + +type nodeNodeListAndWatchResourcesServer struct { + grpc.ServerStream +} + +func (x *nodeNodeListAndWatchResourcesServer) Send(m *NodeListAndWatchResourcesResponse) error { + return x.ServerStream.SendMsg(m) +} + var _Node_serviceDesc = grpc.ServiceDesc{ ServiceName: "v1alpha3.Node", HandlerType: (*NodeServer)(nil), @@ -610,7 +762,13 @@ var _Node_serviceDesc = grpc.ServiceDesc{ Handler: _Node_NodeUnprepareResources_Handler, }, }, - Streams: []grpc.StreamDesc{}, + Streams: []grpc.StreamDesc{ + { + StreamName: "NodeListAndWatchResources", + Handler: _Node_NodeListAndWatchResources_Handler, + ServerStreams: true, + }, + }, Metadata: "api.proto", } @@ -855,6 +1013,66 @@ func (m *NodeUnprepareResourceResponse) MarshalToSizedBuffer(dAtA []byte) (int, return len(dAtA) - i, nil } +func (m *NodeListAndWatchResourcesRequest) Marshal() (dAtA []byte, err error) { + size := m.Size() + dAtA = make([]byte, size) + n, err := m.MarshalToSizedBuffer(dAtA[:size]) + if err != nil { + return nil, err + } + return dAtA[:n], nil +} + +func (m *NodeListAndWatchResourcesRequest) MarshalTo(dAtA []byte) (int, error) { + size := m.Size() + return m.MarshalToSizedBuffer(dAtA[:size]) +} + +func (m *NodeListAndWatchResourcesRequest) MarshalToSizedBuffer(dAtA []byte) (int, error) { + i := len(dAtA) + _ = i + var l int + _ = l + return len(dAtA) - i, nil +} + +func (m *NodeListAndWatchResourcesResponse) Marshal() (dAtA []byte, err error) { + size := m.Size() + dAtA = make([]byte, size) + n, err := 
m.MarshalToSizedBuffer(dAtA[:size]) + if err != nil { + return nil, err + } + return dAtA[:n], nil +} + +func (m *NodeListAndWatchResourcesResponse) MarshalTo(dAtA []byte) (int, error) { + size := m.Size() + return m.MarshalToSizedBuffer(dAtA[:size]) +} + +func (m *NodeListAndWatchResourcesResponse) MarshalToSizedBuffer(dAtA []byte) (int, error) { + i := len(dAtA) + _ = i + var l int + _ = l + if len(m.Resources) > 0 { + for iNdEx := len(m.Resources) - 1; iNdEx >= 0; iNdEx-- { + { + size, err := m.Resources[iNdEx].MarshalToSizedBuffer(dAtA[:i]) + if err != nil { + return 0, err + } + i -= size + i = encodeVarintApi(dAtA, i, uint64(size)) + } + i-- + dAtA[i] = 0xa + } + } + return len(dAtA) - i, nil +} + func (m *Claim) Marshal() (dAtA []byte, err error) { size := m.Size() dAtA = make([]byte, size) @@ -1037,6 +1255,30 @@ func (m *NodeUnprepareResourceResponse) Size() (n int) { return n } +func (m *NodeListAndWatchResourcesRequest) Size() (n int) { + if m == nil { + return 0 + } + var l int + _ = l + return n +} + +func (m *NodeListAndWatchResourcesResponse) Size() (n int) { + if m == nil { + return 0 + } + var l int + _ = l + if len(m.Resources) > 0 { + for _, e := range m.Resources { + l = e.Size() + n += 1 + l + sovApi(uint64(l)) + } + } + return n +} + func (m *Claim) Size() (n int) { if m == nil { return 0 @@ -1165,6 +1407,30 @@ func (this *NodeUnprepareResourceResponse) String() string { }, "") return s } +func (this *NodeListAndWatchResourcesRequest) String() string { + if this == nil { + return "nil" + } + s := strings.Join([]string{`&NodeListAndWatchResourcesRequest{`, + `}`, + }, "") + return s +} +func (this *NodeListAndWatchResourcesResponse) String() string { + if this == nil { + return "nil" + } + repeatedStringForResources := "[]*NodeResourceModel{" + for _, f := range this.Resources { + repeatedStringForResources += strings.Replace(fmt.Sprintf("%v", f), "NodeResourceModel", "v1alpha2.NodeResourceModel", 1) + "," + } + repeatedStringForResources += "}" + s := strings.Join([]string{`&NodeListAndWatchResourcesResponse{`, + `Resources:` + repeatedStringForResources + `,`, + `}`, + }, "") + return s +} func (this *Claim) String() string { if this == nil { return "nil" @@ -1914,6 +2180,140 @@ func (m *NodeUnprepareResourceResponse) Unmarshal(dAtA []byte) error { } return nil } +func (m *NodeListAndWatchResourcesRequest) Unmarshal(dAtA []byte) error { + l := len(dAtA) + iNdEx := 0 + for iNdEx < l { + preIndex := iNdEx + var wire uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowApi + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + wire |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + fieldNum := int32(wire >> 3) + wireType := int(wire & 0x7) + if wireType == 4 { + return fmt.Errorf("proto: NodeListAndWatchResourcesRequest: wiretype end group for non-group") + } + if fieldNum <= 0 { + return fmt.Errorf("proto: NodeListAndWatchResourcesRequest: illegal tag %d (wire type %d)", fieldNum, wire) + } + switch fieldNum { + default: + iNdEx = preIndex + skippy, err := skipApi(dAtA[iNdEx:]) + if err != nil { + return err + } + if (skippy < 0) || (iNdEx+skippy) < 0 { + return ErrInvalidLengthApi + } + if (iNdEx + skippy) > l { + return io.ErrUnexpectedEOF + } + iNdEx += skippy + } + } + + if iNdEx > l { + return io.ErrUnexpectedEOF + } + return nil +} +func (m *NodeListAndWatchResourcesResponse) Unmarshal(dAtA []byte) error { + l := len(dAtA) + iNdEx := 0 + for iNdEx < l { + preIndex := iNdEx + var wire 
uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowApi + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + wire |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + fieldNum := int32(wire >> 3) + wireType := int(wire & 0x7) + if wireType == 4 { + return fmt.Errorf("proto: NodeListAndWatchResourcesResponse: wiretype end group for non-group") + } + if fieldNum <= 0 { + return fmt.Errorf("proto: NodeListAndWatchResourcesResponse: illegal tag %d (wire type %d)", fieldNum, wire) + } + switch fieldNum { + case 1: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field Resources", wireType) + } + var msglen int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowApi + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + msglen |= int(b&0x7F) << shift + if b < 0x80 { + break + } + } + if msglen < 0 { + return ErrInvalidLengthApi + } + postIndex := iNdEx + msglen + if postIndex < 0 { + return ErrInvalidLengthApi + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + m.Resources = append(m.Resources, &v1alpha2.NodeResourceModel{}) + if err := m.Resources[len(m.Resources)-1].Unmarshal(dAtA[iNdEx:postIndex]); err != nil { + return err + } + iNdEx = postIndex + default: + iNdEx = preIndex + skippy, err := skipApi(dAtA[iNdEx:]) + if err != nil { + return err + } + if (skippy < 0) || (iNdEx+skippy) < 0 { + return ErrInvalidLengthApi + } + if (iNdEx + skippy) > l { + return io.ErrUnexpectedEOF + } + iNdEx += skippy + } + } + + if iNdEx > l { + return io.ErrUnexpectedEOF + } + return nil +} func (m *Claim) Unmarshal(dAtA []byte) error { l := len(dAtA) iNdEx := 0 diff --git a/staging/src/k8s.io/kubelet/pkg/apis/dra/v1alpha3/api.proto b/staging/src/k8s.io/kubelet/pkg/apis/dra/v1alpha3/api.proto index e3ca0238249..6994aa25184 100644 --- a/staging/src/k8s.io/kubelet/pkg/apis/dra/v1alpha3/api.proto +++ b/staging/src/k8s.io/kubelet/pkg/apis/dra/v1alpha3/api.proto @@ -44,6 +44,12 @@ service Node { // The same error handling rules apply, rpc NodeUnprepareResources (NodeUnprepareResourcesRequest) returns (NodeUnprepareResourcesResponse) {} + + // NodeListAndWatchResources returns a stream of NodeResourcesResponse objects. + // At the start and whenever resource availability changes, the + // plugin must send one such object with all information to the Kubelet. + rpc NodeListAndWatchResources(NodeListAndWatchResourcesRequest) + returns (stream NodeListAndWatchResourcesResponse) {} } message NodePrepareResourcesRequest { @@ -88,6 +94,13 @@ message NodeUnprepareResourceResponse { string error = 1; } +message NodeListAndWatchResourcesRequest { +} + +message NodeListAndWatchResourcesResponse { + repeated k8s.io.api.resource.v1alpha2.NodeResourceModel resources = 1; +} + message Claim { // The ResourceClaim namespace (ResourceClaim.meta.Namespace). // This field is REQUIRED. 
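For orientation, a minimal sketch of how a DRA driver could serve the new streaming call, assuming the v1alpha3 Go package generated above and the NamedResources model from k8s.io/api/resource/v1alpha2. The sketchPlugin type, its inventoryChanged channel, and the fixed instance count are illustrative only and not part of this patch.

package sketch

import (
	"fmt"

	resourceapi "k8s.io/api/resource/v1alpha2"
	drapbv1alpha3 "k8s.io/kubelet/pkg/apis/dra/v1alpha3"
)

type sketchPlugin struct {
	// Embedding keeps the plugin forward compatible with future Node methods.
	drapbv1alpha3.UnimplementedNodeServer

	// Hypothetical notification channel, signalled whenever the driver's
	// local inventory changes.
	inventoryChanged <-chan struct{}
}

// snapshot builds one complete response describing everything the driver has;
// the kubelet always receives full state, never incremental diffs.
func (p *sketchPlugin) snapshot(numInstances int) *drapbv1alpha3.NodeListAndWatchResourcesResponse {
	instances := make([]resourceapi.NamedResourcesInstance, numInstances)
	for i := range instances {
		instances[i].Name = fmt.Sprintf("instance-%d", i)
	}
	return &drapbv1alpha3.NodeListAndWatchResourcesResponse{
		Resources: []*resourceapi.NodeResourceModel{{
			NamedResources: &resourceapi.NamedResourcesResources{Instances: instances},
		}},
	}
}

func (p *sketchPlugin) NodeListAndWatchResources(
	req *drapbv1alpha3.NodeListAndWatchResourcesRequest,
	stream drapbv1alpha3.Node_NodeListAndWatchResourcesServer,
) error {
	for {
		// Send the full current state: once immediately after the kubelet
		// opens the stream, then again after every change notification.
		if err := stream.Send(p.snapshot(2)); err != nil {
			return err
		}
		select {
		case <-stream.Context().Done():
			// The kubelet closed the stream or the connection was lost.
			return nil
		case <-p.inventoryChanged:
		}
	}
}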
diff --git a/test/e2e/dra/deploy.go b/test/e2e/dra/deploy.go index 8f5bacb854e..3864fe0f854 100644 --- a/test/e2e/dra/deploy.go +++ b/test/e2e/dra/deploy.go @@ -34,6 +34,7 @@ import ( appsv1 "k8s.io/api/apps/v1" v1 "k8s.io/api/core/v1" + resourcev1alpha2 "k8s.io/api/resource/v1alpha2" apiextensionsv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/labels" @@ -50,10 +51,11 @@ import ( ) const ( - NodePrepareResourceMethod = "/v1alpha2.Node/NodePrepareResource" - NodePrepareResourcesMethod = "/v1alpha3.Node/NodePrepareResources" - NodeUnprepareResourceMethod = "/v1alpha2.Node/NodeUnprepareResource" - NodeUnprepareResourcesMethod = "/v1alpha3.Node/NodeUnprepareResources" + NodePrepareResourceMethod = "/v1alpha2.Node/NodePrepareResource" + NodePrepareResourcesMethod = "/v1alpha3.Node/NodePrepareResources" + NodeUnprepareResourceMethod = "/v1alpha2.Node/NodeUnprepareResource" + NodeUnprepareResourcesMethod = "/v1alpha3.Node/NodeUnprepareResources" + NodeListAndWatchResourcesMethod = "/v1alpha3.Node/NodeListAndWatchResources" ) type Nodes struct { @@ -105,6 +107,7 @@ func NewDriver(f *framework.Framework, nodes *Nodes, configureResources func() a // not run on all nodes. resources.Nodes = nodes.NodeNames } + ginkgo.DeferCleanup(d.IsGone) // Register first so it gets called last. d.SetUp(nodes, resources) ginkgo.DeferCleanup(d.TearDown) }) @@ -182,6 +185,10 @@ func (d *Driver) SetUp(nodes *Nodes, resources app.Resources) { if d.parameterMode == "" { d.parameterMode = parameterModeConfigMap } + var numResourceInstances = -1 // disabled + if d.parameterMode != parameterModeConfigMap { + numResourceInstances = resources.MaxAllocations + } switch d.parameterMode { case parameterModeConfigMap, parameterModeTranslated: d.parameterAPIGroup = "" @@ -259,7 +266,8 @@ func (d *Driver) SetUp(nodes *Nodes, resources app.Resources) { pod := pod nodename := pod.Spec.NodeName logger := klog.LoggerWithValues(klog.LoggerWithName(klog.Background(), "kubelet plugin"), "node", pod.Spec.NodeName, "pod", klog.KObj(&pod)) - plugin, err := app.StartPlugin(logger, "/cdi", d.Name, nodename, + loggerCtx := klog.NewContext(ctx, logger) + plugin, err := app.StartPlugin(loggerCtx, "/cdi", d.Name, nodename, app.FileOperations{ Create: func(name string, content []byte) error { klog.Background().Info("creating CDI file", "node", nodename, "filename", name, "content", string(content)) @@ -269,11 +277,15 @@ func (d *Driver) SetUp(nodes *Nodes, resources app.Resources) { klog.Background().Info("deleting CDI file", "node", nodename, "filename", name) return d.removeFile(&pod, name) }, + NumResourceInstances: numResourceInstances, }, kubeletplugin.GRPCVerbosity(0), kubeletplugin.GRPCInterceptor(func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (resp interface{}, err error) { return d.interceptor(nodename, ctx, req, info, handler) }), + kubeletplugin.GRPCStreamInterceptor(func(srv interface{}, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) (err error) { + return d.streamInterceptor(nodename, srv, ss, info, handler) + }), kubeletplugin.PluginListener(listen(ctx, d.f, pod.Name, "plugin", 9001)), kubeletplugin.RegistrarListener(listen(ctx, d.f, pod.Name, "registrar", 9000)), kubeletplugin.KubeletPluginSocketPath(draAddr), @@ -349,6 +361,16 @@ func (d *Driver) TearDown() { d.wg.Wait() } +func (d *Driver) IsGone(ctx context.Context) { + gomega.Eventually(ctx, func(ctx 
context.Context) ([]resourcev1alpha2.NodeResourceSlice, error) { + slices, err := d.f.ClientSet.ResourceV1alpha2().NodeResourceSlices().List(ctx, metav1.ListOptions{FieldSelector: "driverName=" + d.Name}) + if err != nil { + return nil, err + } + return slices.Items, err + }).Should(gomega.BeEmpty()) +} + func (d *Driver) interceptor(nodename string, ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (resp interface{}, err error) { d.mutex.Lock() defer d.mutex.Unlock() @@ -362,6 +384,22 @@ func (d *Driver) interceptor(nodename string, ctx context.Context, req interface return handler(ctx, req) } +func (d *Driver) streamInterceptor(nodename string, srv interface{}, stream grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error { + // Stream calls block for a long time. We must not hold the lock while + // they are running. + d.mutex.Lock() + m := MethodInstance{nodename, info.FullMethod} + d.callCounts[m]++ + fail := d.fail[m] + d.mutex.Unlock() + + if fail { + return errors.New("injected error") + } + + return handler(srv, stream) +} + func (d *Driver) Fail(m MethodInstance, injectError bool) { d.mutex.Lock() defer d.mutex.Unlock() diff --git a/test/e2e/dra/dra.go b/test/e2e/dra/dra.go index 7e71eb90847..f97aee67af3 100644 --- a/test/e2e/dra/dra.go +++ b/test/e2e/dra/dra.go @@ -28,6 +28,7 @@ import ( "github.com/onsi/ginkgo/v2" "github.com/onsi/gomega" "github.com/onsi/gomega/gcustom" + "github.com/onsi/gomega/gstruct" v1 "k8s.io/api/core/v1" resourcev1alpha2 "k8s.io/api/resource/v1alpha2" @@ -82,116 +83,167 @@ var _ = framework.SIGDescribe("node")("DRA", feature.DynamicResourceAllocation, ginkgo.Context("kubelet", func() { nodes := NewNodes(f, 1, 1) - driver := NewDriver(f, nodes, networkResources) // All tests get their own driver instance. - b := newBuilder(f, driver) - ginkgo.It("registers plugin", func() { - ginkgo.By("the driver is running") - }) + ginkgo.Context("with ConfigMap parameters", func() { + driver := NewDriver(f, nodes, networkResources) + b := newBuilder(f, driver) - ginkgo.It("must retry NodePrepareResources", func(ctx context.Context) { - // We have exactly one host. - m := MethodInstance{driver.Nodenames()[0], NodePrepareResourcesMethod} - - driver.Fail(m, true) - - ginkgo.By("waiting for container startup to fail") - parameters := b.parameters() - pod, template := b.podInline(resourcev1alpha2.AllocationModeWaitForFirstConsumer) - - b.create(ctx, parameters, pod, template) - - ginkgo.By("wait for NodePrepareResources call") - gomega.Eventually(ctx, func(ctx context.Context) error { - if driver.CallCount(m) == 0 { - return errors.New("NodePrepareResources not called yet") - } - return nil - }).WithTimeout(podStartTimeout).Should(gomega.Succeed()) - - ginkgo.By("allowing container startup to succeed") - callCount := driver.CallCount(m) - driver.Fail(m, false) - err := e2epod.WaitForPodNameRunningInNamespace(ctx, f.ClientSet, pod.Name, pod.Namespace) - framework.ExpectNoError(err, "start pod with inline resource claim") - if driver.CallCount(m) == callCount { - framework.Fail("NodePrepareResources should have been called again") - } - }) - - ginkgo.It("must not run a pod if a claim is not reserved for it", func(ctx context.Context) { - // Pretend that the resource is allocated and reserved for some other entity. - // Until the resourceclaim controller learns to remove reservations for - // arbitrary types we can simply fake somthing here. 
- claim := b.externalClaim(resourcev1alpha2.AllocationModeWaitForFirstConsumer) - b.create(ctx, claim) - claim, err := f.ClientSet.ResourceV1alpha2().ResourceClaims(f.Namespace.Name).Get(ctx, claim.Name, metav1.GetOptions{}) - framework.ExpectNoError(err, "get claim") - claim.Status.Allocation = &resourcev1alpha2.AllocationResult{} - claim.Status.DriverName = driver.Name - claim.Status.ReservedFor = append(claim.Status.ReservedFor, resourcev1alpha2.ResourceClaimConsumerReference{ - APIGroup: "example.com", - Resource: "some", - Name: "thing", - UID: "12345", + ginkgo.It("registers plugin", func() { + ginkgo.By("the driver is running") }) - _, err = f.ClientSet.ResourceV1alpha2().ResourceClaims(f.Namespace.Name).UpdateStatus(ctx, claim, metav1.UpdateOptions{}) - framework.ExpectNoError(err, "update claim") - pod := b.podExternal() + ginkgo.It("must retry NodePrepareResources", func(ctx context.Context) { + // We have exactly one host. + m := MethodInstance{driver.Nodenames()[0], NodePrepareResourcesMethod} - // This bypasses scheduling and therefore the pod gets - // to run on the node although it never gets added to - // the `ReservedFor` field of the claim. - pod.Spec.NodeName = nodes.NodeNames[0] - b.create(ctx, pod) + driver.Fail(m, true) - gomega.Consistently(ctx, func(ctx context.Context) error { - testPod, err := b.f.ClientSet.CoreV1().Pods(pod.Namespace).Get(ctx, pod.Name, metav1.GetOptions{}) - if err != nil { - return fmt.Errorf("expected the test pod %s to exist: %w", pod.Name, err) + ginkgo.By("waiting for container startup to fail") + parameters := b.parameters() + pod, template := b.podInline(resourcev1alpha2.AllocationModeWaitForFirstConsumer) + + b.create(ctx, parameters, pod, template) + + ginkgo.By("wait for NodePrepareResources call") + gomega.Eventually(ctx, func(ctx context.Context) error { + if driver.CallCount(m) == 0 { + return errors.New("NodePrepareResources not called yet") + } + return nil + }).WithTimeout(podStartTimeout).Should(gomega.Succeed()) + + ginkgo.By("allowing container startup to succeed") + callCount := driver.CallCount(m) + driver.Fail(m, false) + err := e2epod.WaitForPodNameRunningInNamespace(ctx, f.ClientSet, pod.Name, pod.Namespace) + framework.ExpectNoError(err, "start pod with inline resource claim") + if driver.CallCount(m) == callCount { + framework.Fail("NodePrepareResources should have been called again") } - if testPod.Status.Phase != v1.PodPending { - return fmt.Errorf("pod %s: unexpected status %s, expected status: %s", pod.Name, testPod.Status.Phase, v1.PodPending) + }) + + ginkgo.It("must not run a pod if a claim is not reserved for it", func(ctx context.Context) { + // Pretend that the resource is allocated and reserved for some other entity. + // Until the resourceclaim controller learns to remove reservations for + // arbitrary types we can simply fake somthing here. 
+ claim := b.externalClaim(resourcev1alpha2.AllocationModeWaitForFirstConsumer) + b.create(ctx, claim) + claim, err := f.ClientSet.ResourceV1alpha2().ResourceClaims(f.Namespace.Name).Get(ctx, claim.Name, metav1.GetOptions{}) + framework.ExpectNoError(err, "get claim") + claim.Status.Allocation = &resourcev1alpha2.AllocationResult{} + claim.Status.DriverName = driver.Name + claim.Status.ReservedFor = append(claim.Status.ReservedFor, resourcev1alpha2.ResourceClaimConsumerReference{ + APIGroup: "example.com", + Resource: "some", + Name: "thing", + UID: "12345", + }) + _, err = f.ClientSet.ResourceV1alpha2().ResourceClaims(f.Namespace.Name).UpdateStatus(ctx, claim, metav1.UpdateOptions{}) + framework.ExpectNoError(err, "update claim") + + pod := b.podExternal() + + // This bypasses scheduling and therefore the pod gets + // to run on the node although it never gets added to + // the `ReservedFor` field of the claim. + pod.Spec.NodeName = nodes.NodeNames[0] + b.create(ctx, pod) + + gomega.Consistently(ctx, func(ctx context.Context) error { + testPod, err := b.f.ClientSet.CoreV1().Pods(pod.Namespace).Get(ctx, pod.Name, metav1.GetOptions{}) + if err != nil { + return fmt.Errorf("expected the test pod %s to exist: %w", pod.Name, err) + } + if testPod.Status.Phase != v1.PodPending { + return fmt.Errorf("pod %s: unexpected status %s, expected status: %s", pod.Name, testPod.Status.Phase, v1.PodPending) + } + return nil + }, 20*time.Second, 200*time.Millisecond).Should(gomega.BeNil()) + }) + + ginkgo.It("must unprepare resources for force-deleted pod", func(ctx context.Context) { + parameters := b.parameters() + claim := b.externalClaim(resourcev1alpha2.AllocationModeImmediate) + pod := b.podExternal() + zero := int64(0) + pod.Spec.TerminationGracePeriodSeconds = &zero + + b.create(ctx, parameters, claim, pod) + + b.testPod(ctx, f.ClientSet, pod) + + ginkgo.By(fmt.Sprintf("force delete test pod %s", pod.Name)) + err := b.f.ClientSet.CoreV1().Pods(b.f.Namespace.Name).Delete(ctx, pod.Name, metav1.DeleteOptions{GracePeriodSeconds: &zero}) + if !apierrors.IsNotFound(err) { + framework.ExpectNoError(err, "force delete test pod") } - return nil - }, 20*time.Second, 200*time.Millisecond).Should(gomega.BeNil()) + + for host, plugin := range b.driver.Nodes { + ginkgo.By(fmt.Sprintf("waiting for resources on %s to be unprepared", host)) + gomega.Eventually(plugin.GetPreparedResources).WithTimeout(time.Minute).Should(gomega.BeEmpty(), "prepared claims on host %s", host) + } + }) + + ginkgo.It("must skip NodePrepareResource if not used by any container", func(ctx context.Context) { + parameters := b.parameters() + pod, template := b.podInline(resourcev1alpha2.AllocationModeWaitForFirstConsumer) + for i := range pod.Spec.Containers { + pod.Spec.Containers[i].Resources.Claims = nil + } + b.create(ctx, parameters, pod, template) + framework.ExpectNoError(e2epod.WaitForPodRunningInNamespace(ctx, f.ClientSet, pod), "start pod") + for host, plugin := range b.driver.Nodes { + gomega.Expect(plugin.GetPreparedResources()).Should(gomega.BeEmpty(), "not claims should be prepared on host %s while pod is running", host) + } + }) + }) - ginkgo.It("must unprepare resources for force-deleted pod", func(ctx context.Context) { - parameters := b.parameters() - claim := b.externalClaim(resourcev1alpha2.AllocationModeImmediate) - pod := b.podExternal() - zero := int64(0) - pod.Spec.TerminationGracePeriodSeconds = &zero + ginkgo.Context("with structured parameters", func() { + driver := NewDriver(f, nodes, perNode(1, nodes)) + 
driver.parameterMode = parameterModeStructured - b.create(ctx, parameters, claim, pod) + f.It("must manage NodeResourceSlice", f.WithSlow(), func(ctx context.Context) { + nodeName := nodes.NodeNames[0] + driverName := driver.Name + m := MethodInstance{nodeName, NodeListAndWatchResourcesMethod} + ginkgo.By("wait for NodeListAndWatchResources call") + gomega.Eventually(ctx, func() int64 { + return driver.CallCount(m) + }).WithTimeout(podStartTimeout).Should(gomega.BeNumerically(">", int64(0)), "NodeListAndWatchResources call count") - b.testPod(ctx, f.ClientSet, pod) + ginkgo.By("check if NodeResourceSlice object exists on the API server") + resourceClient := f.ClientSet.ResourceV1alpha2().NodeResourceSlices() + matchSlices := gomega.And( + gomega.HaveLen(1), + gomega.ContainElement(gstruct.MatchAllFields(gstruct.Fields{ + "TypeMeta": gstruct.Ignore(), + "ObjectMeta": gstruct.Ignore(), // TODO: validate ownerref + "NodeName": gomega.Equal(nodes.NodeNames[0]), + "DriverName": gomega.Equal(driver.Name), + "NodeResourceModel": gomega.Equal(resourcev1alpha2.NodeResourceModel{NamedResources: &resourcev1alpha2.NamedResourcesResources{ + Instances: []resourcev1alpha2.NamedResourcesInstance{{Name: "instance-0"}}, + }}), + })), + ) + getSlices := func(ctx context.Context) ([]resourcev1alpha2.NodeResourceSlice, error) { + slices, err := resourceClient.List(ctx, metav1.ListOptions{FieldSelector: fmt.Sprintf("nodeName=%s,driverName=%s", nodeName, driverName)}) + if err != nil { + return nil, err + } + return slices.Items, nil + } + gomega.Eventually(ctx, getSlices).WithTimeout(20 * time.Second).Should(matchSlices) + gomega.Consistently(ctx, getSlices).WithTimeout(20 * time.Second).Should(matchSlices) - ginkgo.By(fmt.Sprintf("force delete test pod %s", pod.Name)) - err := b.f.ClientSet.CoreV1().Pods(b.f.Namespace.Name).Delete(ctx, pod.Name, metav1.DeleteOptions{GracePeriodSeconds: &zero}) - if !apierrors.IsNotFound(err) { - framework.ExpectNoError(err, "force delete test pod") - } + // Removal of node resource slice is tested by the general driver removal code. 
+ }) - for host, plugin := range b.driver.Nodes { - ginkgo.By(fmt.Sprintf("waiting for resources on %s to be unprepared", host)) - gomega.Eventually(plugin.GetPreparedResources).WithTimeout(time.Minute).Should(gomega.BeEmpty(), "prepared claims on host %s", host) - } - }) - - ginkgo.It("must skip NodePrepareResource if not used by any container", func(ctx context.Context) { - parameters := b.parameters() - pod, template := b.podInline(resourcev1alpha2.AllocationModeWaitForFirstConsumer) - for i := range pod.Spec.Containers { - pod.Spec.Containers[i].Resources.Claims = nil - } - b.create(ctx, parameters, pod, template) - framework.ExpectNoError(e2epod.WaitForPodRunningInNamespace(ctx, f.ClientSet, pod), "start pod") - for host, plugin := range b.driver.Nodes { - gomega.Expect(plugin.GetPreparedResources()).Should(gomega.BeEmpty(), "not claims should be prepared on host %s while pod is running", host) - } + // TODO: more test scenarios: + // - driver returns "unimplemented" as method response + // - driver returns "Unimplemented" as part of stream + // - driver returns EOF + // - driver changes resources }) }) @@ -241,11 +293,6 @@ var _ = framework.SIGDescribe("node")("DRA", feature.DynamicResourceAllocation, ginkgo.It("supports claim and class parameters", func(ctx context.Context) { objects := genParameters() - // TODO: replace with publishing NodeResourceSlice through kubelet - if parameterMode == parameterModeTranslated || parameterMode == parameterModeStructured { - objects = append(objects, b.nodeResourceSlice(nodes.NodeNames[0], maxAllocations)) - } - pod, template := b.podInline(resourcev1alpha2.AllocationModeWaitForFirstConsumer) objects = append(objects, pod, template) @@ -263,11 +310,6 @@ var _ = framework.SIGDescribe("node")("DRA", feature.DynamicResourceAllocation, objects = append(objects, pod, template) } - // TODO: replace with publishing NodeResourceSlice through kubelet - if parameterMode == parameterModeTranslated || parameterMode == parameterModeStructured { - objects = append(objects, b.nodeResourceSlice(nodes.NodeNames[0], maxAllocations)) - } - b.create(ctx, objects...) // We don't know the order. All that matters is that all of them get scheduled eventually. @@ -298,11 +340,6 @@ var _ = framework.SIGDescribe("node")("DRA", feature.DynamicResourceAllocation, objects = append(objects, pod) } - // TODO: replace with publishing NodeResourceSlice through kubelet - if parameterMode == parameterModeTranslated || parameterMode == parameterModeStructured { - objects = append(objects, b.nodeResourceSlice(nodes.NodeNames[0], maxAllocations)) - } - b.create(ctx, objects...) // We don't know the order. All that matters is that all of them get scheduled eventually. @@ -340,11 +377,6 @@ var _ = framework.SIGDescribe("node")("DRA", feature.DynamicResourceAllocation, objects = append(objects, pod) } - // TODO: replace with publishing NodeResourceSlice through kubelet - if parameterMode == parameterModeTranslated || parameterMode == parameterModeStructured { - objects = append(objects, b.nodeResourceSlice(nodes.NodeNames[0], maxAllocations)) - } - b.create(ctx, objects...) // We don't know the order. All that matters is that all of them get scheduled eventually. 
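As a small sketch outside this patch: the slices that the kubelet publishes from these streams can be inspected with client-go, one NodeResourceSlice per driver and node, using the same nodeName/driverName field selectors that the test above relies on. This assumes the resource.k8s.io/v1alpha2 client generation this patch builds against; the clientset argument is whatever the caller already has.

package sketch

import (
	"context"
	"fmt"

	resourcev1alpha2 "k8s.io/api/resource/v1alpha2"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/kubernetes"
)

// slicesForDriver lists the NodeResourceSlice objects published for one
// driver on one node, filtered server-side via field selectors.
func slicesForDriver(ctx context.Context, clientset kubernetes.Interface, nodeName, driverName string) ([]resourcev1alpha2.NodeResourceSlice, error) {
	list, err := clientset.ResourceV1alpha2().NodeResourceSlices().List(ctx, metav1.ListOptions{
		FieldSelector: fmt.Sprintf("nodeName=%s,driverName=%s", nodeName, driverName),
	})
	if err != nil {
		return nil, err
	}
	return list.Items, nil
}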
@@ -1278,31 +1310,6 @@ func (b *builder) rawParameterData(kv ...string) []byte { return raw } -func (b *builder) nodeResourceSlice(nodeName string, capacity int) *resourcev1alpha2.NodeResourceSlice { - slice := &resourcev1alpha2.NodeResourceSlice{ - ObjectMeta: metav1.ObjectMeta{ - Name: b.driver.Name + "-" + nodeName, - }, - - NodeName: nodeName, - DriverName: b.driver.Name, - - NodeResourceModel: resourcev1alpha2.NodeResourceModel{ - NamedResources: &resourcev1alpha2.NamedResourcesResources{}, - }, - } - - for i := 0; i < capacity; i++ { - slice.NodeResourceModel.NamedResources.Instances = append(slice.NodeResourceModel.NamedResources.Instances, - resourcev1alpha2.NamedResourcesInstance{ - Name: fmt.Sprintf("instance-%d", i), - }, - ) - } - - return slice -} - // makePod returns a simple pod with no resource claims. // The pod prints its env and waits. func (b *builder) pod() *v1.Pod { diff --git a/test/e2e/dra/test-driver/app/kubeletplugin.go b/test/e2e/dra/test-driver/app/kubeletplugin.go index 9a564f10e78..8a95669d13e 100644 --- a/test/e2e/dra/test-driver/app/kubeletplugin.go +++ b/test/e2e/dra/test-driver/app/kubeletplugin.go @@ -26,7 +26,10 @@ import ( "sync" "google.golang.org/grpc" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" + resourceapi "k8s.io/api/resource/v1alpha2" "k8s.io/apimachinery/pkg/runtime" "k8s.io/dynamic-resource-allocation/kubeletplugin" "k8s.io/klog/v2" @@ -35,6 +38,7 @@ import ( ) type ExamplePlugin struct { + stopCh <-chan struct{} logger klog.Logger d kubeletplugin.DRAPlugin fileOps FileOperations @@ -80,7 +84,8 @@ func (ex *ExamplePlugin) getJSONFilePath(claimUID string) string { return filepath.Join(ex.cdiDir, fmt.Sprintf("%s-%s.json", ex.driverName, claimUID)) } -// FileOperations defines optional callbacks for handling CDI files. +// FileOperations defines optional callbacks for handling CDI files +// and some other configuration. type FileOperations struct { // Create must overwrite the file. Create func(name string, content []byte) error @@ -88,10 +93,16 @@ type FileOperations struct { // Remove must remove the file. It must not return an error when the // file does not exist. Remove func(name string) error + + // NumResourceInstances determines whether the plugin reports resources + // instances and how many. A negative value causes it to report "not implemented" + // in the NodeListAndWatchResources gRPC call. + NumResourceInstances int } // StartPlugin sets up the servers that are necessary for a DRA kubelet plugin. -func StartPlugin(logger klog.Logger, cdiDir, driverName string, nodeName string, fileOps FileOperations, opts ...kubeletplugin.Option) (*ExamplePlugin, error) { +func StartPlugin(ctx context.Context, cdiDir, driverName string, nodeName string, fileOps FileOperations, opts ...kubeletplugin.Option) (*ExamplePlugin, error) { + logger := klog.FromContext(ctx) if fileOps.Create == nil { fileOps.Create = func(name string, content []byte) error { return os.WriteFile(name, content, os.FileMode(0644)) @@ -106,6 +117,7 @@ func StartPlugin(logger klog.Logger, cdiDir, driverName string, nodeName string, } } ex := &ExamplePlugin{ + stopCh: ctx.Done(), logger: logger, fileOps: fileOps, cdiDir: cdiDir, @@ -118,6 +130,7 @@ func StartPlugin(logger klog.Logger, cdiDir, driverName string, nodeName string, kubeletplugin.Logger(logger), kubeletplugin.DriverName(driverName), kubeletplugin.GRPCInterceptor(ex.recordGRPCCall), + kubeletplugin.GRPCStreamInterceptor(ex.recordGRPCStream), ) d, err := kubeletplugin.Start(ex, opts...) 
if err != nil { @@ -330,6 +343,39 @@ func (ex *ExamplePlugin) NodeUnprepareResources(ctx context.Context, req *drapbv return resp, nil } +func (ex *ExamplePlugin) NodeListAndWatchResources(req *drapbv1alpha3.NodeListAndWatchResourcesRequest, stream drapbv1alpha3.Node_NodeListAndWatchResourcesServer) error { + if ex.fileOps.NumResourceInstances < 0 { + ex.logger.Info("Sending no NodeResourcesResponse") + return status.New(codes.Unimplemented, "node resource support disabled").Err() + } + + instances := make([]resourceapi.NamedResourcesInstance, ex.fileOps.NumResourceInstances) + for i := 0; i < ex.fileOps.NumResourceInstances; i++ { + instances[i].Name = fmt.Sprintf("instance-%d", i) + } + resp := &drapbv1alpha3.NodeListAndWatchResourcesResponse{ + Resources: []*resourceapi.NodeResourceModel{ + { + NamedResources: &resourceapi.NamedResourcesResources{ + Instances: instances, + }, + }, + }, + } + + ex.logger.Info("Sending NodeListAndWatchResourcesResponse", "response", resp) + if err := stream.Send(resp); err != nil { + return err + } + + // Keep the stream open until the test is done. + // TODO: test sending more updates later + <-ex.stopCh + ex.logger.Info("Done sending NodeListAndWatchResourcesResponse, closing stream") + + return nil +} + func (ex *ExamplePlugin) GetPreparedResources() []ClaimID { ex.mutex.Lock() defer ex.mutex.Unlock() @@ -360,6 +406,25 @@ func (ex *ExamplePlugin) recordGRPCCall(ctx context.Context, req interface{}, in return call.Response, call.Err } +func (ex *ExamplePlugin) recordGRPCStream(srv interface{}, stream grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error { + call := GRPCCall{ + FullMethod: info.FullMethod, + } + ex.mutex.Lock() + ex.gRPCCalls = append(ex.gRPCCalls, call) + index := len(ex.gRPCCalls) - 1 + ex.mutex.Unlock() + + // We don't hold the mutex here to allow concurrent calls. 
+ call.Err = handler(srv, stream) + + ex.mutex.Lock() + ex.gRPCCalls[index] = call + ex.mutex.Unlock() + + return call.Err +} + func (ex *ExamplePlugin) GetGRPCCalls() []GRPCCall { ex.mutex.Lock() defer ex.mutex.Unlock() diff --git a/test/e2e/dra/test-driver/app/server.go b/test/e2e/dra/test-driver/app/server.go index 8bafdb649fc..3f55c541463 100644 --- a/test/e2e/dra/test-driver/app/server.go +++ b/test/e2e/dra/test-driver/app/server.go @@ -287,7 +287,7 @@ func NewCommand() *cobra.Command { return fmt.Errorf("create socket directory: %w", err) } - plugin, err := StartPlugin(logger, *cdiDir, *driverName, "", FileOperations{}, + plugin, err := StartPlugin(cmd.Context(), *cdiDir, *driverName, "", FileOperations{}, kubeletplugin.PluginSocketPath(*endpoint), kubeletplugin.RegistrarSocketPath(path.Join(*pluginRegistrationPath, *driverName+"-reg.sock")), kubeletplugin.KubeletPluginSocketPath(*draAddress), diff --git a/test/e2e_node/dra_test.go b/test/e2e_node/dra_test.go index 5344a05753e..ad2fab45d6d 100644 --- a/test/e2e_node/dra_test.go +++ b/test/e2e_node/dra_test.go @@ -68,7 +68,7 @@ var _ = framework.SIGDescribe("node")("DRA", feature.DynamicResourceAllocation, f.Context("Resource Kubelet Plugin", f.WithSerial(), func() { ginkgo.BeforeEach(func(ctx context.Context) { - kubeletPlugin = newKubeletPlugin(getNodeName(ctx, f)) + kubeletPlugin = newKubeletPlugin(ctx, getNodeName(ctx, f)) }) ginkgo.It("must register after Kubelet restart", func(ctx context.Context) { @@ -88,7 +88,7 @@ var _ = framework.SIGDescribe("node")("DRA", feature.DynamicResourceAllocation, ginkgo.It("must register after plugin restart", func(ctx context.Context) { ginkgo.By("restart Kubelet Plugin") kubeletPlugin.Stop() - kubeletPlugin = newKubeletPlugin(getNodeName(ctx, f)) + kubeletPlugin = newKubeletPlugin(ctx, getNodeName(ctx, f)) ginkgo.By("wait for Kubelet plugin re-registration") gomega.Eventually(kubeletPlugin.GetGRPCCalls).WithTimeout(pluginRegistrationTimeout).Should(testdriver.BeRegistered) @@ -134,9 +134,10 @@ var _ = framework.SIGDescribe("node")("DRA", feature.DynamicResourceAllocation, }) // Run Kubelet plugin and wait until it's registered -func newKubeletPlugin(nodeName string) *testdriver.ExamplePlugin { +func newKubeletPlugin(ctx context.Context, nodeName string) *testdriver.ExamplePlugin { ginkgo.By("start Kubelet plugin") logger := klog.LoggerWithValues(klog.LoggerWithName(klog.Background(), "kubelet plugin"), "node", nodeName) + ctx = klog.NewContext(ctx, logger) // Ensure that directories exist, creating them if necessary. We want // to know early if there is a setup problem that would prevent @@ -147,7 +148,7 @@ func newKubeletPlugin(nodeName string) *testdriver.ExamplePlugin { framework.ExpectNoError(err, "create socket directory") plugin, err := testdriver.StartPlugin( - logger, + ctx, cdiDir, driverName, "",
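A hypothetical usage sketch of the updated test-driver entry point: the caller now passes a context (its logger is picked up via klog.FromContext, and its cancellation ends the NodeListAndWatchResources stream) and opts in to resource reporting by setting NumResourceInstances to a non-negative value. The driver name and socket paths below are placeholders, not values taken from this patch.

package sketch

import (
	"context"

	"k8s.io/dynamic-resource-allocation/kubeletplugin"
	"k8s.io/klog/v2"
	testdriver "k8s.io/kubernetes/test/e2e/dra/test-driver/app"
)

func startExamplePlugin(ctx context.Context, nodeName string) (*testdriver.ExamplePlugin, error) {
	logger := klog.LoggerWithName(klog.FromContext(ctx), "kubelet plugin")
	ctx = klog.NewContext(ctx, logger)
	return testdriver.StartPlugin(ctx, "/var/run/cdi", "test-driver.cdi.example.org", nodeName,
		testdriver.FileOperations{
			// Report two resource instances via NodeListAndWatchResources;
			// a negative value keeps the "Unimplemented" behavior.
			NumResourceInstances: 2,
		},
		kubeletplugin.PluginSocketPath("/var/lib/kubelet/plugins/test-driver/dra.sock"),
		kubeletplugin.RegistrarSocketPath("/var/lib/kubelet/plugins_registry/test-driver-reg.sock"),
		kubeletplugin.KubeletPluginSocketPath("/var/lib/kubelet/plugins/test-driver/dra.sock"),
	)
}

Note the design choice visible in recordGRPCStream above: unlike the unary interceptor, it records the call and then releases the mutex before invoking the handler, because NodeListAndWatchResources blocks for the lifetime of the stream and holding the lock that long would serialize all other plugin calls.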