dra kubelet: publish NodeResourceSlices
The information is received from the DRA driver plugin through a new gRPC streaming interface. This is backwards compatible with old DRA driver kubelet plugins: their gRPC server returns "not implemented", and kubelet handles that, so no API break is needed. However, DRA drivers need to be updated because the Go API changed. Drivers that do not support the new NodeListAndWatchResources method and structured parameters can return status.New(codes.Unimplemented, "no node resource support").Err(). The controller in kubelet then synchronizes the information received from each driver with NodeResourceSlice objects, creating, updating and deleting them as needed.
parent 5e40afca06
commit d59676a545
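The backward-compatibility story described in the commit message can be illustrated with a short sketch of a driver that implements the new Go API but opts out of node resource reporting. This sketch is not part of the commit; the package name and the driver type are hypothetical, and only the NodeListAndWatchResources signature and the Unimplemented status come from the v1alpha3 API changed below.

package example

import (
	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/status"

	drapbv1alpha3 "k8s.io/kubelet/pkg/apis/dra/v1alpha3"
)

// exampleDriver is a hypothetical DRA kubelet plugin; NodePrepareResources
// and NodeUnprepareResources would be implemented elsewhere.
type exampleDriver struct{}

// NodeListAndWatchResources declines to publish node resources. Kubelet
// treats codes.Unimplemented as "this driver has no NodeResourceSlices"
// and stops watching until the plugin re-registers.
func (d *exampleDriver) NodeListAndWatchResources(
	req *drapbv1alpha3.NodeListAndWatchResourcesRequest,
	stream drapbv1alpha3.Node_NodeListAndWatchResourcesServer,
) error {
	return status.New(codes.Unimplemented, "no node resource support").Err()
}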
@@ -815,6 +815,7 @@ func run(ctx context.Context, s *options.KubeletServer, kubeDeps *kubelet.Depend
kubeDeps.Mounter,
kubeDeps.CAdvisorInterface,
cm.NodeConfig{
NodeName: nodeName,
RuntimeCgroupsName: s.RuntimeCgroups,
SystemCgroupsName: s.SystemCgroups,
KubeletCgroupsName: s.KubeletCgroups,

@@ -135,6 +135,7 @@ type ContainerManager interface {
}

type NodeConfig struct {
NodeName types.NodeName
RuntimeCgroupsName string
SystemCgroupsName string
KubeletCgroupsName string
@@ -308,7 +308,7 @@ func NewContainerManager(mountUtil mount.Interface, cadvisorInterface cadvisor.I
// initialize DRA manager
if utilfeature.DefaultFeatureGate.Enabled(kubefeatures.DynamicResourceAllocation) {
klog.InfoS("Creating Dynamic Resource Allocation (DRA) manager")
cm.draManager, err = dra.NewManagerImpl(kubeClient, nodeConfig.KubeletRootDir)
cm.draManager, err = dra.NewManagerImpl(kubeClient, nodeConfig.KubeletRootDir, nodeConfig.NodeName)
if err != nil {
return nil, err
}
@@ -45,7 +45,7 @@ type ManagerImpl struct {
}

// NewManagerImpl creates a new manager.
func NewManagerImpl(kubeClient clientset.Interface, stateFileDirectory string) (*ManagerImpl, error) {
func NewManagerImpl(kubeClient clientset.Interface, stateFileDirectory string, nodeName types.NodeName) (*ManagerImpl, error) {
klog.V(2).InfoS("Creating DRA manager")

claimInfoCache, err := newClaimInfoCache(stateFileDirectory, draManagerStateFileName)
@@ -154,7 +154,7 @@ func TestNewManagerImpl(t *testing.T) {
},
} {
t.Run(test.description, func(t *testing.T) {
manager, err := NewManagerImpl(kubeClient, test.stateFileDirectory)
manager, err := NewManagerImpl(kubeClient, test.stateFileDirectory, "worker")
if test.wantErr {
assert.Error(t, err)
return

@@ -287,7 +287,7 @@ func TestGetResources(t *testing.T) {
},
} {
t.Run(test.description, func(t *testing.T) {
manager, err := NewManagerImpl(kubeClient, t.TempDir())
manager, err := NewManagerImpl(kubeClient, t.TempDir(), "worker")
assert.NoError(t, err)

if test.claimInfo != nil {

@@ -760,7 +760,7 @@ func TestPrepareResources(t *testing.T) {
}
defer draServerInfo.teardownFn()

plg := plugin.NewRegistrationHandler()
plg := plugin.NewRegistrationHandler(nil, "worker")
if err := plg.RegisterPlugin(test.driverName, draServerInfo.socketName, []string{"1.27"}); err != nil {
t.Fatalf("failed to register plugin %s, err: %v", test.driverName, err)
}

@@ -1060,7 +1060,7 @@ func TestUnprepareResources(t *testing.T) {
}
defer draServerInfo.teardownFn()

plg := plugin.NewRegistrationHandler()
plg := plugin.NewRegistrationHandler(nil, "worker")
if err := plg.RegisterPlugin(test.driverName, draServerInfo.socketName, []string{"1.27"}); err != nil {
t.Fatalf("failed to register plugin %s, err: %v", test.driverName, err)
}
@@ -198,3 +198,20 @@ func (p *plugin) NodeUnprepareResources(
logger.V(4).Info(log("done calling NodeUnprepareResources rpc"), "response", response, "err", err)
return response, err
}

func (p *plugin) NodeListAndWatchResources(
ctx context.Context,
req *drapb.NodeListAndWatchResourcesRequest,
opts ...grpc.CallOption,
) (drapb.Node_NodeListAndWatchResourcesClient, error) {
logger := klog.FromContext(ctx)
logger.V(4).Info(log("calling NodeListAndWatchResources rpc"), "request", req)

conn, err := p.getOrCreateGRPCConn()
if err != nil {
return nil, err
}

nodeClient := drapb.NewNodeClient(conn)
return nodeClient.NodeListAndWatchResources(ctx, req, opts...)
}
@@ -43,6 +43,16 @@ func (f *fakeV1alpha3GRPCServer) NodeUnprepareResource(ctx context.Context, in *
return &drapbv1alpha3.NodeUnprepareResourcesResponse{}, nil
}

func (f *fakeV1alpha3GRPCServer) NodeListAndWatchResources(req *drapbv1alpha3.NodeListAndWatchResourcesRequest, srv drapbv1alpha3.Node_NodeListAndWatchResourcesServer) error {
if err := srv.Send(&drapbv1alpha3.NodeListAndWatchResourcesResponse{}); err != nil {
return err
}
if err := srv.Send(&drapbv1alpha3.NodeListAndWatchResourcesResponse{}); err != nil {
return err
}
return nil
}

type fakeV1alpha2GRPCServer struct {
drapbv1alpha2.UnimplementedNodeServer
}
@@ -288,3 +298,82 @@ func TestNodeUnprepareResource(t *testing.T) {
})
}
}

func TestListAndWatchResources(t *testing.T) {
for _, test := range []struct {
description string
serverSetup func(string) (string, tearDown, error)
serverVersion string
request *drapbv1alpha3.NodeListAndWatchResourcesRequest
responses []*drapbv1alpha3.NodeListAndWatchResourcesResponse
expectError string
}{
{
description: "server supports NodeResources API",
serverSetup: setupFakeGRPCServer,
serverVersion: v1alpha3Version,
request: &drapbv1alpha3.NodeListAndWatchResourcesRequest{},
responses: []*drapbv1alpha3.NodeListAndWatchResourcesResponse{
{},
{},
},
expectError: "EOF",
},
{
description: "server doesn't support NodeResources API",
serverSetup: setupFakeGRPCServer,
serverVersion: v1alpha2Version,
request: new(drapbv1alpha3.NodeListAndWatchResourcesRequest),
expectError: "Unimplemented",
},
} {
t.Run(test.description, func(t *testing.T) {
addr, teardown, err := setupFakeGRPCServer(test.serverVersion)
if err != nil {
t.Fatal(err)
}
defer teardown()

p := &plugin{
endpoint: addr,
version: v1alpha3Version,
}

conn, err := p.getOrCreateGRPCConn()
defer func() {
err := conn.Close()
if err != nil {
t.Error(err)
}
}()
if err != nil {
t.Fatal(err)
}

draPlugins.add("dummy-plugin", p)
defer draPlugins.delete("dummy-plugin")

client, err := NewDRAPluginClient("dummy-plugin")
if err != nil {
t.Fatal(err)
}

stream, err := client.NodeListAndWatchResources(context.Background(), test.request)
if err != nil {
t.Fatal(err)
}
var actualResponses []*drapbv1alpha3.NodeListAndWatchResourcesResponse
var actualErr error
for {
resp, err := stream.Recv()
if err != nil {
actualErr = err
break
}
actualResponses = append(actualResponses, resp)
}
assert.Equal(t, test.responses, actualResponses)
assert.Contains(t, actualErr.Error(), test.expectError)
})
}
}
pkg/kubelet/cm/dra/plugin/noderesources.go (new file, 479 lines)
@@ -0,0 +1,479 @@
|
||||
/*
|
||||
Copyright 2024 The Kubernetes Authors.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package plugin
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"io"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/google/go-cmp/cmp"
|
||||
"google.golang.org/grpc/codes"
|
||||
"google.golang.org/grpc/status"
|
||||
|
||||
resourceapi "k8s.io/api/resource/v1alpha2"
|
||||
apiequality "k8s.io/apimachinery/pkg/api/equality"
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
|
||||
"k8s.io/apimachinery/pkg/util/sets"
|
||||
resourceinformers "k8s.io/client-go/informers/resource/v1alpha2"
|
||||
"k8s.io/client-go/kubernetes"
|
||||
"k8s.io/client-go/tools/cache"
|
||||
"k8s.io/client-go/util/workqueue"
|
||||
"k8s.io/klog/v2"
|
||||
drapb "k8s.io/kubelet/pkg/apis/dra/v1alpha3"
|
||||
)
|
||||
|
||||
const (
|
||||
// resyncPeriod for informer
|
||||
// TODO (https://github.com/kubernetes/kubernetes/issues/123688): disable?
|
||||
resyncPeriod = time.Duration(10 * time.Minute)
|
||||
)
|
||||
|
||||
// nodeResourcesController collects resource information from all registered
|
||||
// plugins and synchronizes that information with NodeResourceSlice objects.
|
||||
type nodeResourcesController struct {
|
||||
ctx context.Context
|
||||
kubeClient kubernetes.Interface
|
||||
nodeName string
|
||||
wg sync.WaitGroup
|
||||
queue workqueue.RateLimitingInterface
|
||||
sliceStore cache.Store
|
||||
|
||||
mutex sync.RWMutex
|
||||
activePlugins map[string]*activePlugin
|
||||
}
|
||||
|
||||
// activePlugin holds the resource information about one plugin
|
||||
// and the gRPC stream that is used to retrieve that. The context
|
||||
// used by that stream can be canceled separately to stop
|
||||
// the monitoring.
|
||||
type activePlugin struct {
|
||||
// cancel is the function which cancels the monitorPlugin goroutine
|
||||
// for this plugin.
|
||||
cancel func(reason error)
|
||||
|
||||
// resources is protected by the nodeResourcesController read/write lock.
|
||||
// When receiving updates from the driver, the entire slice gets replaced,
|
||||
// so it is okay to not do a deep copy of it. Only retrieving the slice
|
||||
// must be protected by a read lock.
|
||||
resources []*resourceapi.NodeResourceModel
|
||||
}
|
||||
|
||||
// startNodeResourcesController constructs a new controller and starts it.
|
||||
//
|
||||
// If a kubeClient is provided, then it synchronizes NodeResourceSlices
|
||||
// with the resource information provided by plugins. Without it,
|
||||
// the controller is inactive. This can happen when kubelet is run stand-alone
|
||||
// without an apiserver. In that case we can't and don't need to publish
|
||||
// NodeResourceSlices.
|
||||
func startNodeResourcesController(ctx context.Context, kubeClient kubernetes.Interface, nodeName string) *nodeResourcesController {
|
||||
if kubeClient == nil {
|
||||
return nil
|
||||
}
|
||||
|
||||
logger := klog.FromContext(ctx)
|
||||
logger = klog.LoggerWithName(logger, "node resources controller")
|
||||
ctx = klog.NewContext(ctx, logger)
|
||||
|
||||
c := &nodeResourcesController{
|
||||
ctx: ctx,
|
||||
kubeClient: kubeClient,
|
||||
nodeName: nodeName,
|
||||
queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "node_resource_slices"),
|
||||
activePlugins: make(map[string]*activePlugin),
|
||||
}
|
||||
|
||||
c.wg.Add(1)
|
||||
go func() {
|
||||
defer c.wg.Done()
|
||||
c.run(ctx)
|
||||
}()
|
||||
|
||||
return c
|
||||
}
|
||||
|
||||
// waitForStop blocks until all background activity spawned by
|
||||
// the controller has stopped. The context passed to start must
|
||||
// be canceled for that to happen.
|
||||
//
|
||||
// Not needed at the moment, but if it was, this is what it would
|
||||
// look like...
|
||||
// func (c *nodeResourcesController) waitForStop() {
|
||||
// if c == nil {
|
||||
// return
|
||||
// }
|
||||
//
|
||||
// c.wg.Wait()
|
||||
// }
|
||||
|
||||
// addPlugin is called whenever a plugin has been (re-)registered.
|
||||
func (c *nodeResourcesController) addPlugin(driverName string, pluginInstance *plugin) {
|
||||
if c == nil {
|
||||
return
|
||||
}
|
||||
|
||||
klog.FromContext(c.ctx).V(2).Info("Adding plugin", "driverName", driverName)
|
||||
c.mutex.Lock()
|
||||
defer c.mutex.Unlock()
|
||||
|
||||
if active := c.activePlugins[driverName]; active != nil {
|
||||
active.cancel(errors.New("plugin has re-registered"))
|
||||
}
|
||||
active := &activePlugin{}
|
||||
cancelCtx, cancel := context.WithCancelCause(c.ctx)
|
||||
active.cancel = cancel
|
||||
c.activePlugins[driverName] = active
|
||||
c.queue.Add(driverName)
|
||||
|
||||
c.wg.Add(1)
|
||||
go func() {
|
||||
defer c.wg.Done()
|
||||
c.monitorPlugin(cancelCtx, active, driverName, pluginInstance)
|
||||
}()
|
||||
}
|
||||
|
||||
// removePlugin is called whenever a plugin has been unregistered.
|
||||
func (c *nodeResourcesController) removePlugin(driverName string) {
|
||||
if c == nil {
|
||||
return
|
||||
}
|
||||
|
||||
klog.FromContext(c.ctx).V(2).Info("Removing plugin", "driverName", driverName)
|
||||
c.mutex.Lock()
|
||||
defer c.mutex.Unlock()
|
||||
if active, ok := c.activePlugins[driverName]; ok {
|
||||
active.cancel(errors.New("plugin has unregistered"))
|
||||
delete(c.activePlugins, driverName)
|
||||
c.queue.Add(driverName)
|
||||
}
|
||||
}
|
||||
|
||||
// monitorPlugin calls the plugin to retrieve resource information and caches
|
||||
// all responses that it gets for processing in the sync method. It keeps
|
||||
// retrying until an error or EOF response indicates that no further data is
|
||||
// going to be sent, then watch resources of the plugin stops until it
|
||||
// re-registers.
|
||||
func (c *nodeResourcesController) monitorPlugin(ctx context.Context, active *activePlugin, driverName string, pluginInstance *plugin) {
|
||||
logger := klog.FromContext(ctx)
|
||||
logger = klog.LoggerWithValues(logger, "driverName", driverName)
|
||||
logger.Info("Starting to monitor node resources of the plugin")
|
||||
defer func() {
|
||||
r := recover()
|
||||
logger.Info("Stopping to monitor node resources of the plugin", "reason", context.Cause(ctx), "err", ctx.Err(), "recover", r)
|
||||
}()
|
||||
|
||||
// Keep trying until canceled.
|
||||
for ctx.Err() == nil {
|
||||
logger.V(5).Info("Calling NodeListAndWatchResources")
|
||||
stream, err := pluginInstance.NodeListAndWatchResources(ctx, new(drapb.NodeListAndWatchResourcesRequest))
|
||||
if err != nil {
|
||||
switch {
|
||||
case status.Convert(err).Code() == codes.Unimplemented:
|
||||
// The plugin simply doesn't provide node resources.
|
||||
active.cancel(errors.New("plugin does not support node resource reporting"))
|
||||
default:
|
||||
// This is a problem, report it and retry.
|
||||
logger.Error(err, "Creating gRPC stream for node resources failed")
|
||||
// TODO (https://github.com/kubernetes/kubernetes/issues/123689): expontential backoff?
|
||||
select {
|
||||
case <-time.After(5 * time.Second):
|
||||
case <-ctx.Done():
|
||||
}
|
||||
}
|
||||
continue
|
||||
}
|
||||
for {
|
||||
response, err := stream.Recv()
|
||||
if err != nil {
|
||||
switch {
|
||||
case errors.Is(err, io.EOF):
|
||||
// This is okay. Some plugins might never change their
|
||||
// resources after reporting them once.
|
||||
active.cancel(errors.New("plugin has closed the stream"))
|
||||
case status.Convert(err).Code() == codes.Unimplemented:
|
||||
// The plugin has the method, does not really implement it.
|
||||
active.cancel(errors.New("plugin does not support node resource reporting"))
|
||||
case ctx.Err() == nil:
|
||||
// This is a problem, report it and retry.
|
||||
logger.Error(err, "Reading node resources from gRPC stream failed")
|
||||
// TODO (https://github.com/kubernetes/kubernetes/issues/123689): expontential backoff?
|
||||
select {
|
||||
case <-time.After(5 * time.Second):
|
||||
case <-ctx.Done():
|
||||
}
|
||||
}
|
||||
break
|
||||
}
|
||||
|
||||
if loggerV := logger.V(6); loggerV.Enabled() {
|
||||
loggerV.Info("Driver resources updated", "resources", response.Resources)
|
||||
} else {
|
||||
logger.V(5).Info("Driver resources updated", "numResources", len(response.Resources))
|
||||
}
|
||||
|
||||
c.mutex.Lock()
|
||||
active.resources = response.Resources
|
||||
c.mutex.Unlock()
|
||||
c.queue.Add(driverName)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// run is running in the background. It handles blocking initialization (like
|
||||
// syncing the informer) and then syncs the actual with the desired state.
|
||||
func (c *nodeResourcesController) run(ctx context.Context) {
|
||||
logger := klog.FromContext(ctx)
|
||||
|
||||
// When kubelet starts, we have two choices:
|
||||
// - Sync immediately, which in practice will delete all NodeResourceSlices
|
||||
// because no plugin has registered yet. We could do a DeleteCollection
|
||||
// to speed this up.
|
||||
// - Wait a bit, then sync. If all plugins have re-registered in the meantime,
|
||||
// we might not need to change any NodeResourceSlice.
|
||||
//
|
||||
// For now syncing starts immediately, with no DeleteCollection. This
|
||||
// can be reconsidered later.
|
||||
|
||||
// While kubelet starts up, there are errors:
|
||||
// E0226 13:41:19.880621 126334 reflector.go:150] k8s.io/client-go@v0.0.0/tools/cache/reflector.go:232: Failed to watch *v1alpha2.NodeResourceSlice: failed to list *v1alpha2.NodeResourceSlice: noderesourceslices.resource.k8s.io is forbidden: User "system:anonymous" cannot list resource "noderesourceslices" in API group "resource.k8s.io" at the cluster scope
|
||||
//
|
||||
// The credentials used by kubeClient seem to get swapped out later,
|
||||
// because eventually these list calls succeed.
|
||||
// TODO (https://github.com/kubernetes/kubernetes/issues/123691): can we avoid these error log entries? Perhaps wait here?
|
||||
|
||||
// We could use an indexer on driver name, but that seems overkill.
|
||||
informer := resourceinformers.NewFilteredNodeResourceSliceInformer(c.kubeClient, resyncPeriod, nil, func(options *metav1.ListOptions) {
|
||||
options.FieldSelector = "nodeName=" + c.nodeName
|
||||
})
|
||||
c.sliceStore = informer.GetStore()
|
||||
handler, err := informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
|
||||
AddFunc: func(obj any) {
|
||||
slice, ok := obj.(*resourceapi.NodeResourceSlice)
|
||||
if !ok {
|
||||
return
|
||||
}
|
||||
logger.V(5).Info("NodeResourceSlice add", "slice", klog.KObj(slice))
|
||||
c.queue.Add(slice.DriverName)
|
||||
},
|
||||
UpdateFunc: func(old, new any) {
|
||||
oldSlice, ok := old.(*resourceapi.NodeResourceSlice)
|
||||
if !ok {
|
||||
return
|
||||
}
|
||||
newSlice, ok := new.(*resourceapi.NodeResourceSlice)
|
||||
if !ok {
|
||||
return
|
||||
}
|
||||
if loggerV := logger.V(6); loggerV.Enabled() {
|
||||
loggerV.Info("NodeResourceSlice update", "slice", klog.KObj(newSlice), "diff", cmp.Diff(oldSlice, newSlice))
|
||||
} else {
|
||||
logger.V(5).Info("NodeResourceSlice update", "slice", klog.KObj(newSlice))
|
||||
}
|
||||
c.queue.Add(newSlice.DriverName)
|
||||
},
|
||||
DeleteFunc: func(obj any) {
|
||||
if tombstone, ok := obj.(cache.DeletedFinalStateUnknown); ok {
|
||||
obj = tombstone.Obj
|
||||
}
|
||||
slice, ok := obj.(*resourceapi.NodeResourceSlice)
|
||||
if !ok {
|
||||
return
|
||||
}
|
||||
logger.V(5).Info("NodeResourceSlice delete", "slice", klog.KObj(slice))
|
||||
c.queue.Add(slice.DriverName)
|
||||
},
|
||||
})
|
||||
if err != nil {
|
||||
logger.Error(err, "Registering event handler on the NodeResourceSlice informer failed, disabling resource monitoring")
|
||||
return
|
||||
}
|
||||
|
||||
// Start informer and wait for our cache to be populated.
|
||||
c.wg.Add(1)
|
||||
go func() {
|
||||
defer c.wg.Done()
|
||||
informer.Run(ctx.Done())
|
||||
}()
|
||||
for !handler.HasSynced() {
|
||||
select {
|
||||
case <-time.After(time.Second):
|
||||
case <-ctx.Done():
|
||||
return
|
||||
}
|
||||
}
|
||||
logger.Info("NodeResourceSlice informer has synced")
|
||||
|
||||
for c.processNextWorkItem(ctx) {
|
||||
}
|
||||
}
|
||||
|
||||
func (c *nodeResourcesController) processNextWorkItem(ctx context.Context) bool {
|
||||
key, shutdown := c.queue.Get()
|
||||
if shutdown {
|
||||
return false
|
||||
}
|
||||
defer c.queue.Done(key)
|
||||
|
||||
driverName := key.(string)
|
||||
|
||||
// Panics are caught and treated like errors.
|
||||
var err error
|
||||
func() {
|
||||
defer func() {
|
||||
if r := recover(); r != nil {
|
||||
err = fmt.Errorf("internal error: %v", r)
|
||||
}
|
||||
}()
|
||||
err = c.sync(ctx, driverName)
|
||||
}()
|
||||
|
||||
if err != nil {
|
||||
// TODO (https://github.com/kubernetes/enhancements/issues/3077): contextual logging in utilruntime
|
||||
utilruntime.HandleError(fmt.Errorf("processing driver %v: %v", driverName, err))
|
||||
c.queue.AddRateLimited(key)
|
||||
|
||||
// Return without removing the work item from the queue.
|
||||
// It will be retried.
|
||||
return true
|
||||
}
|
||||
|
||||
c.queue.Forget(key)
|
||||
return true
|
||||
}
|
||||
|
||||
func (c *nodeResourcesController) sync(ctx context.Context, driverName string) error {
|
||||
logger := klog.FromContext(ctx)
|
||||
|
||||
// Gather information about the actual and desired state.
|
||||
slices := c.sliceStore.List()
|
||||
var driverResources []*resourceapi.NodeResourceModel
|
||||
c.mutex.RLock()
|
||||
if active, ok := c.activePlugins[driverName]; ok {
|
||||
// No need for a deep copy, the entire slice gets replaced on writes.
|
||||
driverResources = active.resources
|
||||
}
|
||||
c.mutex.RUnlock()
|
||||
|
||||
// Resources that are not yet stored in any slice need to be published.
|
||||
// Here we track the indices of any resources that are already stored.
|
||||
storedResourceIndices := sets.New[int]()
|
||||
|
||||
// Slices that don't match any driver resource can either be updated (if there
|
||||
// are new driver resources that need to be stored) or they need to be deleted.
|
||||
obsoleteSlices := make([]*resourceapi.NodeResourceSlice, 0, len(slices))
|
||||
|
||||
// Match slices with resource information.
|
||||
for _, obj := range slices {
|
||||
slice := obj.(*resourceapi.NodeResourceSlice)
|
||||
if slice.DriverName != driverName {
|
||||
continue
|
||||
}
|
||||
|
||||
index := indexOfModel(driverResources, &slice.NodeResourceModel)
|
||||
if index >= 0 {
|
||||
storedResourceIndices.Insert(index)
|
||||
continue
|
||||
}
|
||||
|
||||
obsoleteSlices = append(obsoleteSlices, slice)
|
||||
}
|
||||
|
||||
if loggerV := logger.V(6); loggerV.Enabled() {
|
||||
// Dump entire resource information.
|
||||
loggerV.Info("Syncing existing driver node resource slices with driver resources", "slices", klog.KObjSlice(slices), "resources", driverResources)
|
||||
} else {
|
||||
logger.V(5).Info("Syncing existing driver node resource slices with driver resources", "slices", klog.KObjSlice(slices), "numResources", len(driverResources))
|
||||
}
|
||||
|
||||
// Update stale slices before removing what's left.
|
||||
//
|
||||
// We don't really know which of these slices might have
|
||||
// been used for "the" driver resource because they don't
|
||||
// have a unique ID. In practice, a driver is most likely
|
||||
// to just give us one NodeResourceModel, in which case
|
||||
// this isn't a problem at all. If we have more than one,
|
||||
// then at least conceptually it currently doesn't matter
|
||||
// where we publish it.
|
||||
//
|
||||
// The long-term goal is to move the handling of
|
||||
// NodeResourceSlice objects into the driver, with kubelet
|
||||
// just acting as a REST proxy. The advantage of that will
|
||||
// be that kubelet won't need to support the same
|
||||
// resource API version as the driver and the control plane.
|
||||
// With that approach, the driver will be able to match
|
||||
// up objects more intelligently.
|
||||
numObsoleteSlices := len(obsoleteSlices)
|
||||
for index, resource := range driverResources {
|
||||
if storedResourceIndices.Has(index) {
|
||||
// No need to do anything, it is already stored exactly
|
||||
// like this in an existing slice.
|
||||
continue
|
||||
}
|
||||
|
||||
if numObsoleteSlices > 0 {
|
||||
// Update one existing slice.
|
||||
slice := obsoleteSlices[numObsoleteSlices-1]
|
||||
numObsoleteSlices--
|
||||
slice = slice.DeepCopy()
|
||||
slice.NodeResourceModel = *resource
|
||||
logger.V(5).Info("Reusing existing node resource slice", "slice", klog.KObj(slice))
|
||||
if _, err := c.kubeClient.ResourceV1alpha2().NodeResourceSlices().Update(ctx, slice, metav1.UpdateOptions{}); err != nil {
|
||||
return fmt.Errorf("update node resource slice: %w", err)
|
||||
}
|
||||
continue
|
||||
}
|
||||
|
||||
// Create a new slice.
|
||||
slice := &resourceapi.NodeResourceSlice{
|
||||
ObjectMeta: metav1.ObjectMeta{
|
||||
GenerateName: c.nodeName + "-" + driverName + "-",
|
||||
// TODO (https://github.com/kubernetes/kubernetes/issues/123692): node object as owner
|
||||
},
|
||||
NodeName: c.nodeName,
|
||||
DriverName: driverName,
|
||||
NodeResourceModel: *resource,
|
||||
}
|
||||
logger.V(5).Info("Creating new node resource slice", "slice", klog.KObj(slice))
|
||||
if _, err := c.kubeClient.ResourceV1alpha2().NodeResourceSlices().Create(ctx, slice, metav1.CreateOptions{}); err != nil {
|
||||
return fmt.Errorf("create node resource slice: %w", err)
|
||||
}
|
||||
}
|
||||
|
||||
// All remaining slices are truly orphaned.
|
||||
for i := 0; i < numObsoleteSlices; i++ {
|
||||
slice := obsoleteSlices[i]
|
||||
logger.V(5).Info("Deleting obsolete node resource slice", "slice", klog.KObj(slice))
|
||||
if err := c.kubeClient.ResourceV1alpha2().NodeResourceSlices().Delete(ctx, slice.Name, metav1.DeleteOptions{}); err != nil {
|
||||
return fmt.Errorf("delete node resource slice: %w", err)
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func indexOfModel(models []*resourceapi.NodeResourceModel, model *resourceapi.NodeResourceModel) int {
|
||||
for index, m := range models {
|
||||
if apiequality.Semantic.DeepEqual(m, model) {
|
||||
return index
|
||||
}
|
||||
}
|
||||
return -1
|
||||
}
|
@ -29,6 +29,7 @@ import (
|
||||
"google.golang.org/grpc/connectivity"
|
||||
"google.golang.org/grpc/credentials/insecure"
|
||||
utilversion "k8s.io/apimachinery/pkg/util/version"
|
||||
"k8s.io/client-go/kubernetes"
|
||||
"k8s.io/klog/v2"
|
||||
)
|
||||
|
||||
@ -94,11 +95,23 @@ func (p *plugin) setVersion(version string) {
|
||||
}
|
||||
|
||||
// RegistrationHandler is the handler which is fed to the pluginwatcher API.
|
||||
type RegistrationHandler struct{}
|
||||
type RegistrationHandler struct {
|
||||
controller *nodeResourcesController
|
||||
}
|
||||
|
||||
// NewPluginHandler returns new registration handler.
|
||||
func NewRegistrationHandler() *RegistrationHandler {
|
||||
return &RegistrationHandler{}
|
||||
//
|
||||
// Must only be called once per process because it manages global state.
|
||||
// If a kubeClient is provided, then it synchronizes NodeResourceSlices
|
||||
// with the resource information provided by plugins.
|
||||
func NewRegistrationHandler(kubeClient kubernetes.Interface, nodeName string) *RegistrationHandler {
|
||||
handler := &RegistrationHandler{}
|
||||
|
||||
// If kubelet ever gets an API for stopping registration handlers, then
|
||||
// that would need to be hooked up with stopping the controller.
|
||||
handler.controller = startNodeResourcesController(context.TODO(), kubeClient, nodeName)
|
||||
|
||||
return handler
|
||||
}
|
||||
|
||||
// RegisterPlugin is called when a plugin can be registered.
|
||||
@ -110,15 +123,18 @@ func (h *RegistrationHandler) RegisterPlugin(pluginName string, endpoint string,
|
||||
return err
|
||||
}
|
||||
|
||||
// Storing endpoint of newly registered DRA Plugin into the map, where plugin name will be the key
|
||||
// all other DRA components will be able to get the actual socket of DRA plugins by its name.
|
||||
// By default we assume the supported plugin version is v1alpha3
|
||||
draPlugins.add(pluginName, &plugin{
|
||||
pluginInstance := &plugin{
|
||||
conn: nil,
|
||||
endpoint: endpoint,
|
||||
version: v1alpha3Version,
|
||||
highestSupportedVersion: highestSupportedVersion,
|
||||
})
|
||||
}
|
||||
|
||||
// Storing endpoint of newly registered DRA Plugin into the map, where plugin name will be the key
|
||||
// all other DRA components will be able to get the actual socket of DRA plugins by its name.
|
||||
// By default we assume the supported plugin version is v1alpha3
|
||||
draPlugins.add(pluginName, pluginInstance)
|
||||
h.controller.addPlugin(pluginName, pluginInstance)
|
||||
|
||||
return nil
|
||||
}
|
||||
@ -178,6 +194,7 @@ func deregisterPlugin(pluginName string) {
|
||||
func (h *RegistrationHandler) DeRegisterPlugin(pluginName string) {
|
||||
klog.InfoS("DeRegister DRA plugin", "name", pluginName)
|
||||
deregisterPlugin(pluginName)
|
||||
h.controller.removePlugin(pluginName)
|
||||
}
|
||||
|
||||
// ValidatePlugin is called by kubelet's plugin watcher upon detection
|
||||
|
@ -23,6 +23,10 @@ import (
|
||||
)
|
||||
|
||||
func TestRegistrationHandler_ValidatePlugin(t *testing.T) {
|
||||
newRegistrationHandler := func() *RegistrationHandler {
|
||||
return NewRegistrationHandler(nil, "worker")
|
||||
}
|
||||
|
||||
for _, test := range []struct {
|
||||
description string
|
||||
handler func() *RegistrationHandler
|
||||
@ -33,19 +37,19 @@ func TestRegistrationHandler_ValidatePlugin(t *testing.T) {
|
||||
}{
|
||||
{
|
||||
description: "no versions provided",
|
||||
handler: NewRegistrationHandler,
|
||||
handler: newRegistrationHandler,
|
||||
shouldError: true,
|
||||
},
|
||||
{
|
||||
description: "unsupported version",
|
||||
handler: NewRegistrationHandler,
|
||||
handler: newRegistrationHandler,
|
||||
versions: []string{"v2.0.0"},
|
||||
shouldError: true,
|
||||
},
|
||||
{
|
||||
description: "plugin already registered with a higher supported version",
|
||||
handler: func() *RegistrationHandler {
|
||||
handler := NewRegistrationHandler()
|
||||
handler := newRegistrationHandler()
|
||||
if err := handler.RegisterPlugin("this-plugin-already-exists-and-has-a-long-name-so-it-doesnt-collide", "", []string{"v1.1.0"}); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
@ -57,7 +61,7 @@ func TestRegistrationHandler_ValidatePlugin(t *testing.T) {
|
||||
},
|
||||
{
|
||||
description: "should validate the plugin",
|
||||
handler: NewRegistrationHandler,
|
||||
handler: newRegistrationHandler,
|
||||
pluginName: "this-is-a-dummy-plugin-with-a-long-name-so-it-doesnt-collide",
|
||||
versions: []string{"v1.3.0"},
|
||||
},
|
||||
@ -74,7 +78,7 @@ func TestRegistrationHandler_ValidatePlugin(t *testing.T) {
|
||||
}
|
||||
|
||||
t.Cleanup(func() {
|
||||
handler := NewRegistrationHandler()
|
||||
handler := newRegistrationHandler()
|
||||
handler.DeRegisterPlugin("this-plugin-already-exists-and-has-a-long-name-so-it-doesnt-collide")
|
||||
handler.DeRegisterPlugin("this-is-a-dummy-plugin-with-a-long-name-so-it-doesnt-collide")
|
||||
})
|
||||
|
@@ -1561,7 +1561,7 @@ func (kl *Kubelet) initializeRuntimeDependentModules() {
kl.pluginManager.AddHandler(pluginwatcherapi.CSIPlugin, plugincache.PluginHandler(csi.PluginHandler))
// Adding Registration Callback function for DRA Plugin
if utilfeature.DefaultFeatureGate.Enabled(features.DynamicResourceAllocation) {
kl.pluginManager.AddHandler(pluginwatcherapi.DRAPlugin, plugincache.PluginHandler(draplugin.NewRegistrationHandler()))
kl.pluginManager.AddHandler(pluginwatcherapi.DRAPlugin, plugincache.PluginHandler(draplugin.NewRegistrationHandler(kl.kubeClient, kl.hostname)))
}
// Adding Registration Callback function for Device Manager
kl.pluginManager.AddHandler(pluginwatcherapi.DevicePlugin, kl.containerManager.GetPluginRegistrationHandler())
@ -140,7 +140,16 @@ func KubeletPluginSocketPath(path string) Option {
|
||||
// may be used more than once and each interceptor will get called.
|
||||
func GRPCInterceptor(interceptor grpc.UnaryServerInterceptor) Option {
|
||||
return func(o *options) error {
|
||||
o.interceptors = append(o.interceptors, interceptor)
|
||||
o.unaryInterceptors = append(o.unaryInterceptors, interceptor)
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
||||
// GRPCStreamInterceptor is called for each gRPC streaming method call. This option
|
||||
// may be used more than once and each interceptor will get called.
|
||||
func GRPCStreamInterceptor(interceptor grpc.StreamServerInterceptor) Option {
|
||||
return func(o *options) error {
|
||||
o.streamInterceptors = append(o.streamInterceptors, interceptor)
|
||||
return nil
|
||||
}
|
||||
}
|
||||
@ -170,7 +179,8 @@ type options struct {
|
||||
draEndpoint endpoint
|
||||
draAddress string
|
||||
pluginRegistrationEndpoint endpoint
|
||||
interceptors []grpc.UnaryServerInterceptor
|
||||
unaryInterceptors []grpc.UnaryServerInterceptor
|
||||
streamInterceptors []grpc.StreamServerInterceptor
|
||||
|
||||
nodeV1alpha2, nodeV1alpha3 bool
|
||||
}
|
||||
@ -215,7 +225,7 @@ func Start(nodeServer interface{}, opts ...Option) (result DRAPlugin, finalErr e
|
||||
|
||||
// Run the node plugin gRPC server first to ensure that it is ready.
|
||||
implemented := false
|
||||
plugin, err := startGRPCServer(klog.LoggerWithName(o.logger, "dra"), o.grpcVerbosity, o.interceptors, o.draEndpoint, func(grpcServer *grpc.Server) {
|
||||
plugin, err := startGRPCServer(klog.LoggerWithName(o.logger, "dra"), o.grpcVerbosity, o.unaryInterceptors, o.streamInterceptors, o.draEndpoint, func(grpcServer *grpc.Server) {
|
||||
if nodeServer, ok := nodeServer.(drapbv1alpha3.NodeServer); ok && o.nodeV1alpha3 {
|
||||
o.logger.V(5).Info("registering drapbv1alpha3.NodeServer")
|
||||
drapbv1alpha3.RegisterNodeServer(grpcServer, nodeServer)
|
||||
@ -246,7 +256,7 @@ func Start(nodeServer interface{}, opts ...Option) (result DRAPlugin, finalErr e
|
||||
}
|
||||
|
||||
// Now make it available to kubelet.
|
||||
registrar, err := startRegistrar(klog.LoggerWithName(o.logger, "registrar"), o.grpcVerbosity, o.interceptors, o.driverName, o.draAddress, o.pluginRegistrationEndpoint)
|
||||
registrar, err := startRegistrar(klog.LoggerWithName(o.logger, "registrar"), o.grpcVerbosity, o.unaryInterceptors, o.streamInterceptors, o.driverName, o.draAddress, o.pluginRegistrationEndpoint)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("start registrar: %v", err)
|
||||
}
|
||||
|
@ -31,7 +31,7 @@ type nodeRegistrar struct {
|
||||
}
|
||||
|
||||
// startRegistrar returns a running instance.
|
||||
func startRegistrar(logger klog.Logger, grpcVerbosity int, interceptors []grpc.UnaryServerInterceptor, driverName string, endpoint string, pluginRegistrationEndpoint endpoint) (*nodeRegistrar, error) {
|
||||
func startRegistrar(logger klog.Logger, grpcVerbosity int, interceptors []grpc.UnaryServerInterceptor, streamInterceptors []grpc.StreamServerInterceptor, driverName string, endpoint string, pluginRegistrationEndpoint endpoint) (*nodeRegistrar, error) {
|
||||
n := &nodeRegistrar{
|
||||
logger: logger,
|
||||
registrationServer: registrationServer{
|
||||
@ -40,7 +40,7 @@ func startRegistrar(logger klog.Logger, grpcVerbosity int, interceptors []grpc.U
|
||||
supportedVersions: []string{"1.0.0"}, // TODO: is this correct?
|
||||
},
|
||||
}
|
||||
s, err := startGRPCServer(logger, grpcVerbosity, interceptors, pluginRegistrationEndpoint, func(grpcServer *grpc.Server) {
|
||||
s, err := startGRPCServer(logger, grpcVerbosity, interceptors, streamInterceptors, pluginRegistrationEndpoint, func(grpcServer *grpc.Server) {
|
||||
registerapi.RegisterRegistrationServer(grpcServer, n)
|
||||
})
|
||||
if err != nil {
|
||||
|
@ -54,7 +54,7 @@ type endpoint struct {
|
||||
|
||||
// startGRPCServer sets up the GRPC server on a Unix domain socket and spawns a goroutine
|
||||
// which handles requests for arbitrary services.
|
||||
func startGRPCServer(logger klog.Logger, grpcVerbosity int, interceptors []grpc.UnaryServerInterceptor, endpoint endpoint, services ...registerService) (*grpcServer, error) {
|
||||
func startGRPCServer(logger klog.Logger, grpcVerbosity int, unaryInterceptors []grpc.UnaryServerInterceptor, streamInterceptors []grpc.StreamServerInterceptor, endpoint endpoint, services ...registerService) (*grpcServer, error) {
|
||||
s := &grpcServer{
|
||||
logger: logger,
|
||||
endpoint: endpoint,
|
||||
@ -79,12 +79,15 @@ func startGRPCServer(logger klog.Logger, grpcVerbosity int, interceptors []grpc.
|
||||
// Run a gRPC server. It will close the listening socket when
|
||||
// shutting down, so we don't need to do that.
|
||||
var opts []grpc.ServerOption
|
||||
var finalInterceptors []grpc.UnaryServerInterceptor
|
||||
var finalUnaryInterceptors []grpc.UnaryServerInterceptor
|
||||
var finalStreamInterceptors []grpc.StreamServerInterceptor
|
||||
if grpcVerbosity >= 0 {
|
||||
finalInterceptors = append(finalInterceptors, s.interceptor)
|
||||
finalUnaryInterceptors = append(finalUnaryInterceptors, s.interceptor)
|
||||
}
|
||||
finalInterceptors = append(finalInterceptors, interceptors...)
|
||||
opts = append(opts, grpc.ChainUnaryInterceptor(finalInterceptors...))
|
||||
finalUnaryInterceptors = append(finalUnaryInterceptors, unaryInterceptors...)
|
||||
finalStreamInterceptors = append(finalStreamInterceptors, streamInterceptors...)
|
||||
opts = append(opts, grpc.ChainUnaryInterceptor(finalUnaryInterceptors...))
|
||||
opts = append(opts, grpc.ChainStreamInterceptor(finalStreamInterceptors...))
|
||||
s.server = grpc.NewServer(opts...)
|
||||
for _, service := range services {
|
||||
service(s.server)
|
||||
|
@ -342,6 +342,88 @@ func (m *NodeUnprepareResourceResponse) GetError() string {
|
||||
return ""
|
||||
}
|
||||
|
||||
type NodeListAndWatchResourcesRequest struct {
|
||||
XXX_NoUnkeyedLiteral struct{} `json:"-"`
|
||||
XXX_sizecache int32 `json:"-"`
|
||||
}
|
||||
|
||||
func (m *NodeListAndWatchResourcesRequest) Reset() { *m = NodeListAndWatchResourcesRequest{} }
|
||||
func (*NodeListAndWatchResourcesRequest) ProtoMessage() {}
|
||||
func (*NodeListAndWatchResourcesRequest) Descriptor() ([]byte, []int) {
|
||||
return fileDescriptor_00212fb1f9d3bf1c, []int{6}
|
||||
}
|
||||
func (m *NodeListAndWatchResourcesRequest) XXX_Unmarshal(b []byte) error {
|
||||
return m.Unmarshal(b)
|
||||
}
|
||||
func (m *NodeListAndWatchResourcesRequest) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
|
||||
if deterministic {
|
||||
return xxx_messageInfo_NodeListAndWatchResourcesRequest.Marshal(b, m, deterministic)
|
||||
} else {
|
||||
b = b[:cap(b)]
|
||||
n, err := m.MarshalToSizedBuffer(b)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return b[:n], nil
|
||||
}
|
||||
}
|
||||
func (m *NodeListAndWatchResourcesRequest) XXX_Merge(src proto.Message) {
|
||||
xxx_messageInfo_NodeListAndWatchResourcesRequest.Merge(m, src)
|
||||
}
|
||||
func (m *NodeListAndWatchResourcesRequest) XXX_Size() int {
|
||||
return m.Size()
|
||||
}
|
||||
func (m *NodeListAndWatchResourcesRequest) XXX_DiscardUnknown() {
|
||||
xxx_messageInfo_NodeListAndWatchResourcesRequest.DiscardUnknown(m)
|
||||
}
|
||||
|
||||
var xxx_messageInfo_NodeListAndWatchResourcesRequest proto.InternalMessageInfo
|
||||
|
||||
type NodeListAndWatchResourcesResponse struct {
|
||||
Resources []*v1alpha2.NodeResourceModel `protobuf:"bytes,1,rep,name=resources,proto3" json:"resources,omitempty"`
|
||||
XXX_NoUnkeyedLiteral struct{} `json:"-"`
|
||||
XXX_sizecache int32 `json:"-"`
|
||||
}
|
||||
|
||||
func (m *NodeListAndWatchResourcesResponse) Reset() { *m = NodeListAndWatchResourcesResponse{} }
|
||||
func (*NodeListAndWatchResourcesResponse) ProtoMessage() {}
|
||||
func (*NodeListAndWatchResourcesResponse) Descriptor() ([]byte, []int) {
|
||||
return fileDescriptor_00212fb1f9d3bf1c, []int{7}
|
||||
}
|
||||
func (m *NodeListAndWatchResourcesResponse) XXX_Unmarshal(b []byte) error {
|
||||
return m.Unmarshal(b)
|
||||
}
|
||||
func (m *NodeListAndWatchResourcesResponse) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
|
||||
if deterministic {
|
||||
return xxx_messageInfo_NodeListAndWatchResourcesResponse.Marshal(b, m, deterministic)
|
||||
} else {
|
||||
b = b[:cap(b)]
|
||||
n, err := m.MarshalToSizedBuffer(b)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return b[:n], nil
|
||||
}
|
||||
}
|
||||
func (m *NodeListAndWatchResourcesResponse) XXX_Merge(src proto.Message) {
|
||||
xxx_messageInfo_NodeListAndWatchResourcesResponse.Merge(m, src)
|
||||
}
|
||||
func (m *NodeListAndWatchResourcesResponse) XXX_Size() int {
|
||||
return m.Size()
|
||||
}
|
||||
func (m *NodeListAndWatchResourcesResponse) XXX_DiscardUnknown() {
|
||||
xxx_messageInfo_NodeListAndWatchResourcesResponse.DiscardUnknown(m)
|
||||
}
|
||||
|
||||
var xxx_messageInfo_NodeListAndWatchResourcesResponse proto.InternalMessageInfo
|
||||
|
||||
func (m *NodeListAndWatchResourcesResponse) GetResources() []*v1alpha2.NodeResourceModel {
|
||||
if m != nil {
|
||||
return m.Resources
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
type Claim struct {
|
||||
// The ResourceClaim namespace (ResourceClaim.meta.Namespace).
|
||||
// This field is REQUIRED.
|
||||
@ -368,7 +450,7 @@ type Claim struct {
|
||||
func (m *Claim) Reset() { *m = Claim{} }
|
||||
func (*Claim) ProtoMessage() {}
|
||||
func (*Claim) Descriptor() ([]byte, []int) {
|
||||
return fileDescriptor_00212fb1f9d3bf1c, []int{6}
|
||||
return fileDescriptor_00212fb1f9d3bf1c, []int{8}
|
||||
}
|
||||
func (m *Claim) XXX_Unmarshal(b []byte) error {
|
||||
return m.Unmarshal(b)
|
||||
@ -441,49 +523,55 @@ func init() {
|
||||
proto.RegisterType((*NodeUnprepareResourcesResponse)(nil), "v1alpha3.NodeUnprepareResourcesResponse")
|
||||
proto.RegisterMapType((map[string]*NodeUnprepareResourceResponse)(nil), "v1alpha3.NodeUnprepareResourcesResponse.ClaimsEntry")
|
||||
proto.RegisterType((*NodeUnprepareResourceResponse)(nil), "v1alpha3.NodeUnprepareResourceResponse")
|
||||
proto.RegisterType((*NodeListAndWatchResourcesRequest)(nil), "v1alpha3.NodeListAndWatchResourcesRequest")
|
||||
proto.RegisterType((*NodeListAndWatchResourcesResponse)(nil), "v1alpha3.NodeListAndWatchResourcesResponse")
|
||||
proto.RegisterType((*Claim)(nil), "v1alpha3.Claim")
|
||||
}
|
||||
|
||||
func init() { proto.RegisterFile("api.proto", fileDescriptor_00212fb1f9d3bf1c) }
|
||||
|
||||
var fileDescriptor_00212fb1f9d3bf1c = []byte{
|
||||
// 562 bytes of a gzipped FileDescriptorProto
|
||||
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xa4, 0x54, 0xcd, 0x6e, 0xd3, 0x40,
|
||||
0x10, 0xce, 0x36, 0x49, 0x45, 0x26, 0x52, 0x8b, 0x56, 0x15, 0xb2, 0x42, 0x31, 0x91, 0x45, 0x49,
|
||||
0x0e, 0x60, 0x0b, 0x07, 0x50, 0x05, 0xe2, 0x92, 0x16, 0x54, 0x10, 0x42, 0xc8, 0x88, 0x0b, 0x97,
|
||||
0xb0, 0xb1, 0x07, 0xc7, 0x4a, 0x62, 0x9b, 0x5d, 0x3b, 0x52, 0x6f, 0x3c, 0x02, 0x8f, 0xd5, 0x03,
|
||||
0x07, 0xc4, 0x89, 0x53, 0x45, 0xcd, 0x8d, 0xa7, 0x40, 0x5e, 0xdb, 0x69, 0x13, 0x39, 0x4d, 0xa5,
|
||||
0xde, 0x66, 0xe7, 0xef, 0x9b, 0xfd, 0xe6, 0x07, 0x1a, 0x2c, 0xf4, 0xf4, 0x90, 0x07, 0x51, 0x40,
|
||||
0x6f, 0xcc, 0x1e, 0xb1, 0x49, 0x38, 0x62, 0xbd, 0xd6, 0x43, 0xd7, 0x8b, 0x46, 0xf1, 0x50, 0xb7,
|
||||
0x83, 0xa9, 0xe1, 0x06, 0x6e, 0x60, 0x48, 0x87, 0x61, 0xfc, 0x45, 0xbe, 0xe4, 0x43, 0x4a, 0x59,
|
||||
0x60, 0xeb, 0xc1, 0x78, 0x5f, 0xe8, 0x5e, 0x60, 0xb0, 0xd0, 0x33, 0x38, 0x8a, 0x20, 0xe6, 0x36,
|
||||
0x1a, 0x79, 0x32, 0xd3, 0x70, 0xd1, 0x47, 0xce, 0x22, 0x74, 0x32, 0x6f, 0xed, 0x15, 0xdc, 0x7e,
|
||||
0x17, 0x38, 0xf8, 0x9e, 0x63, 0xc8, 0x38, 0x5a, 0xb9, 0xbf, 0xb0, 0xf0, 0x6b, 0x8c, 0x22, 0xa2,
|
||||
0x1d, 0xd8, 0xb4, 0x27, 0xcc, 0x9b, 0x0a, 0x85, 0xb4, 0xab, 0xdd, 0xa6, 0xb9, 0xad, 0x17, 0x65,
|
||||
0xe9, 0x07, 0xa9, 0xde, 0xca, 0xcd, 0xda, 0x0f, 0x02, 0xbb, 0xe5, 0x89, 0x44, 0x18, 0xf8, 0x02,
|
||||
0xe9, 0x9b, 0xa5, 0x4c, 0xe6, 0x79, 0xa6, 0xcb, 0xe2, 0x32, 0x18, 0xf1, 0xd2, 0x8f, 0xf8, 0x71,
|
||||
0x01, 0xd6, 0xfa, 0x0c, 0xcd, 0x0b, 0x6a, 0x7a, 0x13, 0xaa, 0x63, 0x3c, 0x56, 0x48, 0x9b, 0x74,
|
||||
0x1b, 0x56, 0x2a, 0xd2, 0xe7, 0x50, 0x9f, 0xb1, 0x49, 0x8c, 0xca, 0x46, 0x9b, 0x74, 0x9b, 0xe6,
|
||||
0xde, 0xa5, 0x58, 0x05, 0x94, 0x95, 0xc5, 0x3c, 0xdb, 0xd8, 0x27, 0x9a, 0x53, 0x4a, 0xcb, 0xfc,
|
||||
0x33, 0x06, 0x34, 0x6d, 0xc7, 0x1b, 0x38, 0x38, 0xf3, 0x6c, 0xcc, 0x7e, 0xd4, 0xe8, 0x6f, 0x25,
|
||||
0xa7, 0x77, 0xe1, 0xe0, 0xf0, 0xf5, 0x61, 0xa6, 0xb5, 0xc0, 0x76, 0xbc, 0x5c, 0xa6, 0x3b, 0x50,
|
||||
0x47, 0xce, 0x03, 0x2e, 0x0b, 0x6a, 0x58, 0xd9, 0x43, 0x3b, 0x82, 0x3b, 0x29, 0xca, 0x47, 0x3f,
|
||||
0xbc, 0x2e, 0xfd, 0xbf, 0x08, 0xa8, 0xab, 0x52, 0xe5, 0x35, 0xbf, 0x5d, 0xca, 0xf5, 0x78, 0x91,
|
||||
0x94, 0xd5, 0x91, 0xa5, 0x2d, 0x18, 0xae, 0x6b, 0xc1, 0x8b, 0xc5, 0x16, 0x74, 0xd6, 0xa0, 0x95,
|
||||
0x35, 0xe1, 0xc9, 0x0a, 0x7a, 0xe6, 0x5f, 0x9a, 0xb3, 0x4a, 0x2e, 0xb2, 0xfa, 0x8f, 0x40, 0x5d,
|
||||
0xd6, 0x46, 0x77, 0xa1, 0xe1, 0xb3, 0x29, 0x8a, 0x90, 0xd9, 0x98, 0xfb, 0x9c, 0x2b, 0xd2, 0x9a,
|
||||
0x63, 0xcf, 0xc9, 0x3b, 0x92, 0x8a, 0x94, 0x42, 0x2d, 0x35, 0x2b, 0x55, 0xa9, 0x92, 0x32, 0xed,
|
||||
0xc0, 0x76, 0xb1, 0x45, 0x83, 0x11, 0xf3, 0x9d, 0x09, 0x2a, 0x35, 0x69, 0xde, 0x2a, 0xd4, 0x47,
|
||||
0x52, 0x4b, 0x23, 0x68, 0x89, 0x88, 0xc7, 0x76, 0x14, 0x73, 0x74, 0x06, 0xcb, 0x31, 0x75, 0xc9,
|
||||
0xf9, 0x53, 0x3d, 0x5b, 0x4e, 0x3d, 0xdd, 0xf3, 0xc2, 0xa5, 0x60, 0xc6, 0xd4, 0x3f, 0xcc, 0xe3,
|
||||
0xad, 0x85, 0xdc, 0x96, 0x22, 0x56, 0x58, 0xcc, 0x53, 0x02, 0xb5, 0x94, 0x24, 0xea, 0xc2, 0x4e,
|
||||
0xd9, 0x1e, 0xd1, 0xbd, 0x75, 0x7b, 0x26, 0x27, 0xad, 0x75, 0xff, 0x6a, 0xeb, 0xa8, 0x55, 0xe8,
|
||||
0x14, 0x6e, 0x95, 0xcf, 0x0b, 0xed, 0xac, 0x9f, 0xa8, 0x0c, 0xac, 0x7b, 0xd5, 0xd1, 0xd3, 0x2a,
|
||||
0xfd, 0xfe, 0xc9, 0x99, 0x4a, 0x7e, 0x9f, 0xa9, 0x95, 0x6f, 0x89, 0x4a, 0x4e, 0x12, 0x95, 0xfc,
|
||||
0x4c, 0x54, 0xf2, 0x27, 0x51, 0xc9, 0xf7, 0xbf, 0x6a, 0xe5, 0xd3, 0xbd, 0xfc, 0xd8, 0x8d, 0xe3,
|
||||
0x21, 0x4e, 0x30, 0x32, 0xc2, 0xb1, 0x9b, 0x1e, 0x3e, 0x61, 0x38, 0x9c, 0x15, 0x47, 0xaf, 0x37,
|
||||
0xdc, 0x94, 0xb7, 0xae, 0xf7, 0x3f, 0x00, 0x00, 0xff, 0xff, 0xfd, 0x14, 0x30, 0xd4, 0x5f, 0x05,
|
||||
0x00, 0x00,
|
||||
// 631 bytes of a gzipped FileDescriptorProto
|
||||
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xa4, 0x55, 0xcd, 0x6e, 0xd3, 0x40,
|
||||
0x10, 0xce, 0xb6, 0x4d, 0x45, 0x26, 0x52, 0x8b, 0x56, 0x15, 0x0a, 0xa6, 0x98, 0x60, 0x51, 0x12,
|
||||
0xf1, 0x63, 0x83, 0x0b, 0xa8, 0x02, 0x71, 0xa0, 0x2d, 0xa8, 0xa0, 0x16, 0x21, 0x23, 0x84, 0xc4,
|
||||
0xa5, 0x6c, 0xbc, 0x8b, 0x63, 0xc5, 0xb1, 0xcd, 0xae, 0x5d, 0xd1, 0x1b, 0x8f, 0xc0, 0x63, 0xf5,
|
||||
0xc0, 0x01, 0x71, 0xea, 0x09, 0xd1, 0x70, 0xe3, 0x29, 0x90, 0xd7, 0xde, 0xb4, 0x89, 0x9c, 0xa4,
|
||||
0x12, 0xb7, 0xd9, 0xf9, 0xf9, 0x66, 0xe7, 0x9b, 0x99, 0x5d, 0xa8, 0x91, 0xd8, 0x37, 0x63, 0x1e,
|
||||
0x25, 0x11, 0xbe, 0x70, 0x70, 0x9f, 0x04, 0x71, 0x97, 0xac, 0x6b, 0x77, 0x3d, 0x3f, 0xe9, 0xa6,
|
||||
0x1d, 0xd3, 0x8d, 0xfa, 0x96, 0x17, 0x79, 0x91, 0x25, 0x1d, 0x3a, 0xe9, 0x27, 0x79, 0x92, 0x07,
|
||||
0x29, 0xe5, 0x81, 0xda, 0x9d, 0xde, 0x86, 0x30, 0xfd, 0xc8, 0x22, 0xb1, 0x6f, 0x71, 0x26, 0xa2,
|
||||
0x94, 0xbb, 0xcc, 0x2a, 0xc0, 0x6c, 0xcb, 0x63, 0x21, 0xe3, 0x24, 0x61, 0x34, 0xf7, 0x36, 0x5e,
|
||||
0xc0, 0x95, 0xd7, 0x11, 0x65, 0x6f, 0x38, 0x8b, 0x09, 0x67, 0x4e, 0xe1, 0x2f, 0x1c, 0xf6, 0x39,
|
||||
0x65, 0x22, 0xc1, 0x2d, 0x58, 0x74, 0x03, 0xe2, 0xf7, 0x45, 0x03, 0x35, 0xe7, 0xdb, 0x75, 0x7b,
|
||||
0xd9, 0x54, 0xd7, 0x32, 0xb7, 0x32, 0xbd, 0x53, 0x98, 0x8d, 0xef, 0x08, 0x56, 0xcb, 0x81, 0x44,
|
||||
0x1c, 0x85, 0x82, 0xe1, 0x57, 0x63, 0x48, 0xf6, 0x29, 0xd2, 0xb4, 0xb8, 0x3c, 0x8d, 0x78, 0x1e,
|
||||
0x26, 0xfc, 0x50, 0x25, 0xd3, 0x3e, 0x42, 0xfd, 0x8c, 0x1a, 0x5f, 0x84, 0xf9, 0x1e, 0x3b, 0x6c,
|
||||
0xa0, 0x26, 0x6a, 0xd7, 0x9c, 0x4c, 0xc4, 0x4f, 0xa0, 0x7a, 0x40, 0x82, 0x94, 0x35, 0xe6, 0x9a,
|
||||
0xa8, 0x5d, 0xb7, 0xd7, 0xa6, 0xe6, 0x52, 0xa9, 0x9c, 0x3c, 0xe6, 0xf1, 0xdc, 0x06, 0x32, 0x68,
|
||||
0x29, 0x2d, 0xc3, 0x62, 0x2c, 0xa8, 0xbb, 0xd4, 0xdf, 0xa7, 0xec, 0xc0, 0x77, 0x59, 0x5e, 0x51,
|
||||
0x6d, 0x73, 0x69, 0xf0, 0xeb, 0x1a, 0x6c, 0x6d, 0xbf, 0xdc, 0xce, 0xb5, 0x0e, 0xb8, 0xd4, 0x2f,
|
||||
0x64, 0xbc, 0x02, 0x55, 0xc6, 0x79, 0xc4, 0xe5, 0x85, 0x6a, 0x4e, 0x7e, 0x30, 0x76, 0xe0, 0x6a,
|
||||
0x96, 0xe5, 0x5d, 0x18, 0xff, 0x2f, 0xfd, 0x3f, 0x11, 0xe8, 0x93, 0xa0, 0x8a, 0x3b, 0xef, 0x8e,
|
||||
0x61, 0x3d, 0x18, 0x25, 0x65, 0x72, 0x64, 0x69, 0x0b, 0x3a, 0xb3, 0x5a, 0xf0, 0x74, 0xb4, 0x05,
|
||||
0xad, 0x19, 0xd9, 0xca, 0x9a, 0xf0, 0x70, 0x02, 0x3d, 0xc3, 0x92, 0x86, 0xac, 0xa2, 0xb3, 0xac,
|
||||
0x1a, 0xd0, 0xcc, 0xc2, 0x76, 0x7d, 0x91, 0x3c, 0x0b, 0xe9, 0x7b, 0x92, 0xb8, 0xdd, 0x71, 0x62,
|
||||
0x0d, 0x0e, 0xd7, 0xa7, 0xf8, 0x14, 0xf0, 0x7b, 0x50, 0x53, 0x0b, 0xa4, 0x48, 0xb3, 0xcc, 0x7c,
|
||||
0xbb, 0xcc, 0x6c, 0x51, 0x95, 0x51, 0x95, 0x66, 0xcb, 0xd2, 0x14, 0xce, 0x5e, 0x44, 0x59, 0xe0,
|
||||
0x9c, 0x22, 0x18, 0x7f, 0x11, 0x54, 0x25, 0x67, 0x78, 0x15, 0x6a, 0x21, 0xe9, 0x33, 0x11, 0x13,
|
||||
0x97, 0x15, 0x77, 0x3f, 0x55, 0x64, 0x5c, 0xa6, 0x3e, 0x2d, 0x26, 0x25, 0x13, 0x31, 0x86, 0x85,
|
||||
0xcc, 0xdc, 0x98, 0x97, 0x2a, 0x29, 0xe3, 0x16, 0x2c, 0x2b, 0xe8, 0xfd, 0x2e, 0x09, 0x69, 0xc0,
|
||||
0x1a, 0x0b, 0xd2, 0xbc, 0xa4, 0xd4, 0x3b, 0x52, 0x8b, 0x13, 0xd0, 0x44, 0xc2, 0x53, 0x37, 0x49,
|
||||
0x39, 0xa3, 0xfb, 0xe3, 0x31, 0x55, 0x59, 0xd6, 0xa3, 0xe9, 0x65, 0xbd, 0x1d, 0xc6, 0x3b, 0x23,
|
||||
0xd8, 0x4e, 0x43, 0x4c, 0xb0, 0xd8, 0xc7, 0x73, 0xb0, 0x90, 0xb1, 0x81, 0x3d, 0x58, 0x29, 0xdb,
|
||||
0x6f, 0xbc, 0x36, 0x6b, 0xff, 0x65, 0xa3, 0xb4, 0x9b, 0xe7, 0x7b, 0x26, 0x8c, 0x0a, 0xee, 0xc3,
|
||||
0xa5, 0xf2, 0x39, 0xc6, 0xad, 0xd9, 0x93, 0x9e, 0x27, 0x6b, 0x9f, 0x77, 0x25, 0x8c, 0x0a, 0xfe,
|
||||
0x02, 0x97, 0x27, 0x4e, 0x10, 0xbe, 0x35, 0x0a, 0x34, 0x6d, 0x14, 0xb5, 0xdb, 0xe7, 0xf2, 0x55,
|
||||
0x79, 0xef, 0xa1, 0xcd, 0xcd, 0xa3, 0x13, 0x1d, 0x1d, 0x9f, 0xe8, 0x95, 0xaf, 0x03, 0x1d, 0x1d,
|
||||
0x0d, 0x74, 0xf4, 0x63, 0xa0, 0xa3, 0xdf, 0x03, 0x1d, 0x7d, 0xfb, 0xa3, 0x57, 0x3e, 0xdc, 0x28,
|
||||
0x9e, 0xff, 0x5e, 0xda, 0x61, 0x01, 0x4b, 0xac, 0xb8, 0xe7, 0x65, 0x5f, 0x81, 0xb0, 0x28, 0x27,
|
||||
0xea, 0x1b, 0x58, 0xef, 0x2c, 0xca, 0xd7, 0x7f, 0xfd, 0x5f, 0x00, 0x00, 0x00, 0xff, 0xff, 0x20,
|
||||
0x6d, 0x9d, 0x90, 0x71, 0x06, 0x00, 0x00,
|
||||
}
|
||||
|
||||
// Reference imports to suppress errors if they are not otherwise used.
|
||||
@ -506,6 +594,10 @@ type NodeClient interface {
|
||||
// NodeUnprepareResources is the opposite of NodePrepareResources.
|
||||
// The same error handling rules apply,
|
||||
NodeUnprepareResources(ctx context.Context, in *NodeUnprepareResourcesRequest, opts ...grpc.CallOption) (*NodeUnprepareResourcesResponse, error)
|
||||
// NodeListAndWatchResources returns a stream of NodeResourcesResponse objects.
|
||||
// At the start and whenever resource availability changes, the
|
||||
// plugin must send one such object with all information to the Kubelet.
|
||||
NodeListAndWatchResources(ctx context.Context, in *NodeListAndWatchResourcesRequest, opts ...grpc.CallOption) (Node_NodeListAndWatchResourcesClient, error)
|
||||
}
|
||||
|
||||
type nodeClient struct {
|
||||
@ -534,6 +626,38 @@ func (c *nodeClient) NodeUnprepareResources(ctx context.Context, in *NodeUnprepa
|
||||
return out, nil
|
||||
}
|
||||
|
||||
func (c *nodeClient) NodeListAndWatchResources(ctx context.Context, in *NodeListAndWatchResourcesRequest, opts ...grpc.CallOption) (Node_NodeListAndWatchResourcesClient, error) {
|
||||
stream, err := c.cc.NewStream(ctx, &_Node_serviceDesc.Streams[0], "/v1alpha3.Node/NodeListAndWatchResources", opts...)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
x := &nodeNodeListAndWatchResourcesClient{stream}
|
||||
if err := x.ClientStream.SendMsg(in); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if err := x.ClientStream.CloseSend(); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return x, nil
|
||||
}
|
||||
|
||||
type Node_NodeListAndWatchResourcesClient interface {
|
||||
Recv() (*NodeListAndWatchResourcesResponse, error)
|
||||
grpc.ClientStream
|
||||
}
|
||||
|
||||
type nodeNodeListAndWatchResourcesClient struct {
|
||||
grpc.ClientStream
|
||||
}
|
||||
|
||||
func (x *nodeNodeListAndWatchResourcesClient) Recv() (*NodeListAndWatchResourcesResponse, error) {
|
||||
m := new(NodeListAndWatchResourcesResponse)
if err := x.ClientStream.RecvMsg(m); err != nil {
return nil, err
}
return m, nil
}

// NodeServer is the server API for Node service.
type NodeServer interface {
// NodePrepareResources prepares several ResourceClaims
@ -544,6 +668,10 @@ type NodeServer interface {
// NodeUnprepareResources is the opposite of NodePrepareResources.
// The same error handling rules apply,
NodeUnprepareResources(context.Context, *NodeUnprepareResourcesRequest) (*NodeUnprepareResourcesResponse, error)
// NodeListAndWatchResources returns a stream of NodeResourcesResponse objects.
// At the start and whenever resource availability changes, the
// plugin must send one such object with all information to the Kubelet.
NodeListAndWatchResources(*NodeListAndWatchResourcesRequest, Node_NodeListAndWatchResourcesServer) error
}

// UnimplementedNodeServer can be embedded to have forward compatible implementations.
@ -556,6 +684,9 @@ func (*UnimplementedNodeServer) NodePrepareResources(ctx context.Context, req *N
func (*UnimplementedNodeServer) NodeUnprepareResources(ctx context.Context, req *NodeUnprepareResourcesRequest) (*NodeUnprepareResourcesResponse, error) {
return nil, status.Errorf(codes.Unimplemented, "method NodeUnprepareResources not implemented")
}
func (*UnimplementedNodeServer) NodeListAndWatchResources(req *NodeListAndWatchResourcesRequest, srv Node_NodeListAndWatchResourcesServer) error {
return status.Errorf(codes.Unimplemented, "method NodeListAndWatchResources not implemented")
}

func RegisterNodeServer(s *grpc.Server, srv NodeServer) {
s.RegisterService(&_Node_serviceDesc, srv)
@ -597,6 +728,27 @@ func _Node_NodeUnprepareResources_Handler(srv interface{}, ctx context.Context,
return interceptor(ctx, in, info, handler)
}

func _Node_NodeListAndWatchResources_Handler(srv interface{}, stream grpc.ServerStream) error {
m := new(NodeListAndWatchResourcesRequest)
if err := stream.RecvMsg(m); err != nil {
return err
}
return srv.(NodeServer).NodeListAndWatchResources(m, &nodeNodeListAndWatchResourcesServer{stream})
}

type Node_NodeListAndWatchResourcesServer interface {
Send(*NodeListAndWatchResourcesResponse) error
grpc.ServerStream
}

type nodeNodeListAndWatchResourcesServer struct {
grpc.ServerStream
}

func (x *nodeNodeListAndWatchResourcesServer) Send(m *NodeListAndWatchResourcesResponse) error {
return x.ServerStream.SendMsg(m)
}

var _Node_serviceDesc = grpc.ServiceDesc{
ServiceName: "v1alpha3.Node",
HandlerType: (*NodeServer)(nil),
@ -610,7 +762,13 @@ var _Node_serviceDesc = grpc.ServiceDesc{
Handler: _Node_NodeUnprepareResources_Handler,
},
},
Streams: []grpc.StreamDesc{},
Streams: []grpc.StreamDesc{
{
StreamName: "NodeListAndWatchResources",
Handler: _Node_NodeListAndWatchResources_Handler,
ServerStreams: true,
},
},
Metadata: "api.proto",
}

@ -855,6 +1013,66 @@ func (m *NodeUnprepareResourceResponse) MarshalToSizedBuffer(dAtA []byte) (int,
return len(dAtA) - i, nil
}

func (m *NodeListAndWatchResourcesRequest) Marshal() (dAtA []byte, err error) {
size := m.Size()
dAtA = make([]byte, size)
n, err := m.MarshalToSizedBuffer(dAtA[:size])
if err != nil {
return nil, err
}
return dAtA[:n], nil
}

func (m *NodeListAndWatchResourcesRequest) MarshalTo(dAtA []byte) (int, error) {
size := m.Size()
return m.MarshalToSizedBuffer(dAtA[:size])
}

func (m *NodeListAndWatchResourcesRequest) MarshalToSizedBuffer(dAtA []byte) (int, error) {
i := len(dAtA)
_ = i
var l int
_ = l
return len(dAtA) - i, nil
}

func (m *NodeListAndWatchResourcesResponse) Marshal() (dAtA []byte, err error) {
size := m.Size()
dAtA = make([]byte, size)
n, err := m.MarshalToSizedBuffer(dAtA[:size])
if err != nil {
return nil, err
}
return dAtA[:n], nil
}

func (m *NodeListAndWatchResourcesResponse) MarshalTo(dAtA []byte) (int, error) {
size := m.Size()
return m.MarshalToSizedBuffer(dAtA[:size])
}

func (m *NodeListAndWatchResourcesResponse) MarshalToSizedBuffer(dAtA []byte) (int, error) {
i := len(dAtA)
_ = i
var l int
_ = l
if len(m.Resources) > 0 {
for iNdEx := len(m.Resources) - 1; iNdEx >= 0; iNdEx-- {
{
size, err := m.Resources[iNdEx].MarshalToSizedBuffer(dAtA[:i])
if err != nil {
return 0, err
}
i -= size
i = encodeVarintApi(dAtA, i, uint64(size))
}
i--
dAtA[i] = 0xa
}
}
return len(dAtA) - i, nil
}

func (m *Claim) Marshal() (dAtA []byte, err error) {
size := m.Size()
dAtA = make([]byte, size)
@ -1037,6 +1255,30 @@ func (m *NodeUnprepareResourceResponse) Size() (n int) {
return n
}

func (m *NodeListAndWatchResourcesRequest) Size() (n int) {
if m == nil {
return 0
}
var l int
_ = l
return n
}

func (m *NodeListAndWatchResourcesResponse) Size() (n int) {
if m == nil {
return 0
}
var l int
_ = l
if len(m.Resources) > 0 {
for _, e := range m.Resources {
l = e.Size()
n += 1 + l + sovApi(uint64(l))
}
}
return n
}

func (m *Claim) Size() (n int) {
if m == nil {
return 0
@ -1165,6 +1407,30 @@ func (this *NodeUnprepareResourceResponse) String() string {
}, "")
return s
}
func (this *NodeListAndWatchResourcesRequest) String() string {
if this == nil {
return "nil"
}
s := strings.Join([]string{`&NodeListAndWatchResourcesRequest{`,
`}`,
}, "")
return s
}
func (this *NodeListAndWatchResourcesResponse) String() string {
if this == nil {
return "nil"
}
repeatedStringForResources := "[]*NodeResourceModel{"
for _, f := range this.Resources {
repeatedStringForResources += strings.Replace(fmt.Sprintf("%v", f), "NodeResourceModel", "v1alpha2.NodeResourceModel", 1) + ","
}
repeatedStringForResources += "}"
s := strings.Join([]string{`&NodeListAndWatchResourcesResponse{`,
`Resources:` + repeatedStringForResources + `,`,
`}`,
}, "")
return s
}
func (this *Claim) String() string {
if this == nil {
return "nil"
@ -1914,6 +2180,140 @@ func (m *NodeUnprepareResourceResponse) Unmarshal(dAtA []byte) error {
}
return nil
}
func (m *NodeListAndWatchResourcesRequest) Unmarshal(dAtA []byte) error {
l := len(dAtA)
iNdEx := 0
for iNdEx < l {
preIndex := iNdEx
var wire uint64
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowApi
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
wire |= uint64(b&0x7F) << shift
if b < 0x80 {
break
}
}
fieldNum := int32(wire >> 3)
wireType := int(wire & 0x7)
if wireType == 4 {
return fmt.Errorf("proto: NodeListAndWatchResourcesRequest: wiretype end group for non-group")
}
if fieldNum <= 0 {
return fmt.Errorf("proto: NodeListAndWatchResourcesRequest: illegal tag %d (wire type %d)", fieldNum, wire)
}
switch fieldNum {
default:
iNdEx = preIndex
skippy, err := skipApi(dAtA[iNdEx:])
if err != nil {
return err
}
if (skippy < 0) || (iNdEx+skippy) < 0 {
return ErrInvalidLengthApi
}
if (iNdEx + skippy) > l {
return io.ErrUnexpectedEOF
}
iNdEx += skippy
}
}

if iNdEx > l {
return io.ErrUnexpectedEOF
}
return nil
}
func (m *NodeListAndWatchResourcesResponse) Unmarshal(dAtA []byte) error {
l := len(dAtA)
iNdEx := 0
for iNdEx < l {
preIndex := iNdEx
var wire uint64
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowApi
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
wire |= uint64(b&0x7F) << shift
if b < 0x80 {
break
}
}
fieldNum := int32(wire >> 3)
wireType := int(wire & 0x7)
if wireType == 4 {
return fmt.Errorf("proto: NodeListAndWatchResourcesResponse: wiretype end group for non-group")
}
if fieldNum <= 0 {
return fmt.Errorf("proto: NodeListAndWatchResourcesResponse: illegal tag %d (wire type %d)", fieldNum, wire)
}
switch fieldNum {
case 1:
if wireType != 2 {
return fmt.Errorf("proto: wrong wireType = %d for field Resources", wireType)
}
var msglen int
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowApi
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
msglen |= int(b&0x7F) << shift
if b < 0x80 {
break
}
}
if msglen < 0 {
return ErrInvalidLengthApi
}
postIndex := iNdEx + msglen
if postIndex < 0 {
return ErrInvalidLengthApi
}
if postIndex > l {
return io.ErrUnexpectedEOF
}
m.Resources = append(m.Resources, &v1alpha2.NodeResourceModel{})
if err := m.Resources[len(m.Resources)-1].Unmarshal(dAtA[iNdEx:postIndex]); err != nil {
return err
}
iNdEx = postIndex
default:
iNdEx = preIndex
skippy, err := skipApi(dAtA[iNdEx:])
if err != nil {
return err
}
if (skippy < 0) || (iNdEx+skippy) < 0 {
return ErrInvalidLengthApi
}
if (iNdEx + skippy) > l {
return io.ErrUnexpectedEOF
}
iNdEx += skippy
}
}

if iNdEx > l {
return io.ErrUnexpectedEOF
}
return nil
}
func (m *Claim) Unmarshal(dAtA []byte) error {
l := len(dAtA)
iNdEx := 0

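For orientation only, not part of this commit: a minimal sketch of how a consumer of this generated API could read the new stream. The import path of the generated package and the connection setup are assumptions; a driver that does not support the call is expected to answer with codes.Unimplemented, which the caller can treat as "no resources to publish".

package example

import (
	"context"
	"io"

	"google.golang.org/grpc"
	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/status"

	drapb "k8s.io/kubelet/pkg/apis/dra/v1alpha3" // assumed import path of this generated package
)

// watchResources drains the NodeListAndWatchResources stream until the driver
// closes it or reports that the call is not implemented.
func watchResources(ctx context.Context, conn *grpc.ClientConn) error {
	client := drapb.NewNodeClient(conn)
	stream, err := client.NodeListAndWatchResources(ctx, &drapb.NodeListAndWatchResourcesRequest{})
	if err != nil {
		return err
	}
	for {
		resp, err := stream.Recv()
		switch {
		case err == io.EOF:
			return nil // driver closed the stream
		case status.Code(err) == codes.Unimplemented:
			return nil // driver does not support resource streaming
		case err != nil:
			return err
		}
		// Each response carries the complete current resource list in resp.Resources.
		_ = resp
	}
}

Per the service comment above, every message replaces the previously reported state rather than sending a delta.
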
@ -44,6 +44,12 @@ service Node {
// The same error handling rules apply,
rpc NodeUnprepareResources (NodeUnprepareResourcesRequest)
returns (NodeUnprepareResourcesResponse) {}

// NodeListAndWatchResources returns a stream of NodeResourcesResponse objects.
// At the start and whenever resource availability changes, the
// plugin must send one such object with all information to the Kubelet.
rpc NodeListAndWatchResources(NodeListAndWatchResourcesRequest)
returns (stream NodeListAndWatchResourcesResponse) {}
}

message NodePrepareResourcesRequest {
@ -88,6 +94,13 @@ message NodeUnprepareResourceResponse {
string error = 1;
}

message NodeListAndWatchResourcesRequest {
}

message NodeListAndWatchResourcesResponse {
repeated k8s.io.api.resource.v1alpha2.NodeResourceModel resources = 1;
}

message Claim {
// The ResourceClaim namespace (ResourceClaim.meta.Namespace).
// This field is REQUIRED.

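Conversely, a driver built against the updated Go API that does not support structured parameters can decline the new RPC. Sketch only, not part of this commit; the import path is an assumption and the error string mirrors the test driver further down.

package example

import (
	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/status"

	drapbv1alpha3 "k8s.io/kubelet/pkg/apis/dra/v1alpha3" // assumed import path
)

type minimalDriver struct{}

// NodeListAndWatchResources declines resource streaming; callers treat
// codes.Unimplemented as "this driver publishes no node resources".
func (d *minimalDriver) NodeListAndWatchResources(req *drapbv1alpha3.NodeListAndWatchResourcesRequest, stream drapbv1alpha3.Node_NodeListAndWatchResourcesServer) error {
	return status.New(codes.Unimplemented, "node resource support disabled").Err()
}
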
@ -34,6 +34,7 @@ import (

appsv1 "k8s.io/api/apps/v1"
v1 "k8s.io/api/core/v1"
resourcev1alpha2 "k8s.io/api/resource/v1alpha2"
apiextensionsv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
@ -50,10 +51,11 @@ import (
)

const (
NodePrepareResourceMethod = "/v1alpha2.Node/NodePrepareResource"
NodePrepareResourcesMethod = "/v1alpha3.Node/NodePrepareResources"
NodeUnprepareResourceMethod = "/v1alpha2.Node/NodeUnprepareResource"
NodeUnprepareResourcesMethod = "/v1alpha3.Node/NodeUnprepareResources"
NodePrepareResourceMethod = "/v1alpha2.Node/NodePrepareResource"
NodePrepareResourcesMethod = "/v1alpha3.Node/NodePrepareResources"
NodeUnprepareResourceMethod = "/v1alpha2.Node/NodeUnprepareResource"
NodeUnprepareResourcesMethod = "/v1alpha3.Node/NodeUnprepareResources"
NodeListAndWatchResourcesMethod = "/v1alpha3.Node/NodeListAndWatchResources"
)

type Nodes struct {
@ -105,6 +107,7 @@ func NewDriver(f *framework.Framework, nodes *Nodes, configureResources func() a
// not run on all nodes.
resources.Nodes = nodes.NodeNames
}
ginkgo.DeferCleanup(d.IsGone) // Register first so it gets called last.
d.SetUp(nodes, resources)
ginkgo.DeferCleanup(d.TearDown)
})
@ -182,6 +185,10 @@ func (d *Driver) SetUp(nodes *Nodes, resources app.Resources) {
if d.parameterMode == "" {
d.parameterMode = parameterModeConfigMap
}
var numResourceInstances = -1 // disabled
if d.parameterMode != parameterModeConfigMap {
numResourceInstances = resources.MaxAllocations
}
switch d.parameterMode {
case parameterModeConfigMap, parameterModeTranslated:
d.parameterAPIGroup = ""
@ -259,7 +266,8 @@ func (d *Driver) SetUp(nodes *Nodes, resources app.Resources) {
pod := pod
nodename := pod.Spec.NodeName
logger := klog.LoggerWithValues(klog.LoggerWithName(klog.Background(), "kubelet plugin"), "node", pod.Spec.NodeName, "pod", klog.KObj(&pod))
plugin, err := app.StartPlugin(logger, "/cdi", d.Name, nodename,
loggerCtx := klog.NewContext(ctx, logger)
plugin, err := app.StartPlugin(loggerCtx, "/cdi", d.Name, nodename,
app.FileOperations{
Create: func(name string, content []byte) error {
klog.Background().Info("creating CDI file", "node", nodename, "filename", name, "content", string(content))
@ -269,11 +277,15 @@ func (d *Driver) SetUp(nodes *Nodes, resources app.Resources) {
klog.Background().Info("deleting CDI file", "node", nodename, "filename", name)
return d.removeFile(&pod, name)
},
NumResourceInstances: numResourceInstances,
},
kubeletplugin.GRPCVerbosity(0),
kubeletplugin.GRPCInterceptor(func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (resp interface{}, err error) {
return d.interceptor(nodename, ctx, req, info, handler)
}),
kubeletplugin.GRPCStreamInterceptor(func(srv interface{}, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) (err error) {
return d.streamInterceptor(nodename, srv, ss, info, handler)
}),
kubeletplugin.PluginListener(listen(ctx, d.f, pod.Name, "plugin", 9001)),
kubeletplugin.RegistrarListener(listen(ctx, d.f, pod.Name, "registrar", 9000)),
kubeletplugin.KubeletPluginSocketPath(draAddr),
@ -349,6 +361,16 @@ func (d *Driver) TearDown() {
d.wg.Wait()
}

func (d *Driver) IsGone(ctx context.Context) {
gomega.Eventually(ctx, func(ctx context.Context) ([]resourcev1alpha2.NodeResourceSlice, error) {
slices, err := d.f.ClientSet.ResourceV1alpha2().NodeResourceSlices().List(ctx, metav1.ListOptions{FieldSelector: "driverName=" + d.Name})
if err != nil {
return nil, err
}
return slices.Items, err
}).Should(gomega.BeEmpty())
}

func (d *Driver) interceptor(nodename string, ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (resp interface{}, err error) {
d.mutex.Lock()
defer d.mutex.Unlock()
@ -362,6 +384,22 @@ func (d *Driver) interceptor(nodename string, ctx context.Context, req interface
return handler(ctx, req)
}

func (d *Driver) streamInterceptor(nodename string, srv interface{}, stream grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error {
// Stream calls block for a long time. We must not hold the lock while
// they are running.
d.mutex.Lock()
m := MethodInstance{nodename, info.FullMethod}
d.callCounts[m]++
fail := d.fail[m]
d.mutex.Unlock()

if fail {
return errors.New("injected error")
}

return handler(srv, stream)
}

func (d *Driver) Fail(m MethodInstance, injectError bool) {
d.mutex.Lock()
defer d.mutex.Unlock()

@ -28,6 +28,7 @@ import (
"github.com/onsi/ginkgo/v2"
"github.com/onsi/gomega"
"github.com/onsi/gomega/gcustom"
"github.com/onsi/gomega/gstruct"

v1 "k8s.io/api/core/v1"
resourcev1alpha2 "k8s.io/api/resource/v1alpha2"
@ -82,116 +83,167 @@ var _ = framework.SIGDescribe("node")("DRA", feature.DynamicResourceAllocation,

ginkgo.Context("kubelet", func() {
nodes := NewNodes(f, 1, 1)
driver := NewDriver(f, nodes, networkResources) // All tests get their own driver instance.
b := newBuilder(f, driver)

ginkgo.It("registers plugin", func() {
ginkgo.By("the driver is running")
})
ginkgo.Context("with ConfigMap parameters", func() {
driver := NewDriver(f, nodes, networkResources)
b := newBuilder(f, driver)

ginkgo.It("must retry NodePrepareResources", func(ctx context.Context) {
// We have exactly one host.
m := MethodInstance{driver.Nodenames()[0], NodePrepareResourcesMethod}

driver.Fail(m, true)

ginkgo.By("waiting for container startup to fail")
parameters := b.parameters()
pod, template := b.podInline(resourcev1alpha2.AllocationModeWaitForFirstConsumer)

b.create(ctx, parameters, pod, template)

ginkgo.By("wait for NodePrepareResources call")
gomega.Eventually(ctx, func(ctx context.Context) error {
if driver.CallCount(m) == 0 {
return errors.New("NodePrepareResources not called yet")
}
return nil
}).WithTimeout(podStartTimeout).Should(gomega.Succeed())

ginkgo.By("allowing container startup to succeed")
callCount := driver.CallCount(m)
driver.Fail(m, false)
err := e2epod.WaitForPodNameRunningInNamespace(ctx, f.ClientSet, pod.Name, pod.Namespace)
framework.ExpectNoError(err, "start pod with inline resource claim")
if driver.CallCount(m) == callCount {
framework.Fail("NodePrepareResources should have been called again")
}
})

ginkgo.It("must not run a pod if a claim is not reserved for it", func(ctx context.Context) {
// Pretend that the resource is allocated and reserved for some other entity.
// Until the resourceclaim controller learns to remove reservations for
// arbitrary types we can simply fake something here.
claim := b.externalClaim(resourcev1alpha2.AllocationModeWaitForFirstConsumer)
b.create(ctx, claim)
claim, err := f.ClientSet.ResourceV1alpha2().ResourceClaims(f.Namespace.Name).Get(ctx, claim.Name, metav1.GetOptions{})
framework.ExpectNoError(err, "get claim")
claim.Status.Allocation = &resourcev1alpha2.AllocationResult{}
claim.Status.DriverName = driver.Name
claim.Status.ReservedFor = append(claim.Status.ReservedFor, resourcev1alpha2.ResourceClaimConsumerReference{
APIGroup: "example.com",
Resource: "some",
Name: "thing",
UID: "12345",
ginkgo.It("registers plugin", func() {
ginkgo.By("the driver is running")
})
_, err = f.ClientSet.ResourceV1alpha2().ResourceClaims(f.Namespace.Name).UpdateStatus(ctx, claim, metav1.UpdateOptions{})
framework.ExpectNoError(err, "update claim")

pod := b.podExternal()
ginkgo.It("must retry NodePrepareResources", func(ctx context.Context) {
// We have exactly one host.
m := MethodInstance{driver.Nodenames()[0], NodePrepareResourcesMethod}

// This bypasses scheduling and therefore the pod gets
// to run on the node although it never gets added to
// the `ReservedFor` field of the claim.
pod.Spec.NodeName = nodes.NodeNames[0]
b.create(ctx, pod)
driver.Fail(m, true)

gomega.Consistently(ctx, func(ctx context.Context) error {
testPod, err := b.f.ClientSet.CoreV1().Pods(pod.Namespace).Get(ctx, pod.Name, metav1.GetOptions{})
if err != nil {
return fmt.Errorf("expected the test pod %s to exist: %w", pod.Name, err)
ginkgo.By("waiting for container startup to fail")
parameters := b.parameters()
pod, template := b.podInline(resourcev1alpha2.AllocationModeWaitForFirstConsumer)

b.create(ctx, parameters, pod, template)

ginkgo.By("wait for NodePrepareResources call")
gomega.Eventually(ctx, func(ctx context.Context) error {
if driver.CallCount(m) == 0 {
return errors.New("NodePrepareResources not called yet")
}
return nil
}).WithTimeout(podStartTimeout).Should(gomega.Succeed())

ginkgo.By("allowing container startup to succeed")
callCount := driver.CallCount(m)
driver.Fail(m, false)
err := e2epod.WaitForPodNameRunningInNamespace(ctx, f.ClientSet, pod.Name, pod.Namespace)
framework.ExpectNoError(err, "start pod with inline resource claim")
if driver.CallCount(m) == callCount {
framework.Fail("NodePrepareResources should have been called again")
}
if testPod.Status.Phase != v1.PodPending {
return fmt.Errorf("pod %s: unexpected status %s, expected status: %s", pod.Name, testPod.Status.Phase, v1.PodPending)
})

ginkgo.It("must not run a pod if a claim is not reserved for it", func(ctx context.Context) {
// Pretend that the resource is allocated and reserved for some other entity.
// Until the resourceclaim controller learns to remove reservations for
// arbitrary types we can simply fake something here.
claim := b.externalClaim(resourcev1alpha2.AllocationModeWaitForFirstConsumer)
b.create(ctx, claim)
claim, err := f.ClientSet.ResourceV1alpha2().ResourceClaims(f.Namespace.Name).Get(ctx, claim.Name, metav1.GetOptions{})
framework.ExpectNoError(err, "get claim")
claim.Status.Allocation = &resourcev1alpha2.AllocationResult{}
claim.Status.DriverName = driver.Name
claim.Status.ReservedFor = append(claim.Status.ReservedFor, resourcev1alpha2.ResourceClaimConsumerReference{
APIGroup: "example.com",
Resource: "some",
Name: "thing",
UID: "12345",
})
_, err = f.ClientSet.ResourceV1alpha2().ResourceClaims(f.Namespace.Name).UpdateStatus(ctx, claim, metav1.UpdateOptions{})
framework.ExpectNoError(err, "update claim")

pod := b.podExternal()

// This bypasses scheduling and therefore the pod gets
// to run on the node although it never gets added to
// the `ReservedFor` field of the claim.
pod.Spec.NodeName = nodes.NodeNames[0]
b.create(ctx, pod)

gomega.Consistently(ctx, func(ctx context.Context) error {
testPod, err := b.f.ClientSet.CoreV1().Pods(pod.Namespace).Get(ctx, pod.Name, metav1.GetOptions{})
if err != nil {
return fmt.Errorf("expected the test pod %s to exist: %w", pod.Name, err)
}
if testPod.Status.Phase != v1.PodPending {
return fmt.Errorf("pod %s: unexpected status %s, expected status: %s", pod.Name, testPod.Status.Phase, v1.PodPending)
}
return nil
}, 20*time.Second, 200*time.Millisecond).Should(gomega.BeNil())
})

ginkgo.It("must unprepare resources for force-deleted pod", func(ctx context.Context) {
parameters := b.parameters()
claim := b.externalClaim(resourcev1alpha2.AllocationModeImmediate)
pod := b.podExternal()
zero := int64(0)
pod.Spec.TerminationGracePeriodSeconds = &zero

b.create(ctx, parameters, claim, pod)

b.testPod(ctx, f.ClientSet, pod)

ginkgo.By(fmt.Sprintf("force delete test pod %s", pod.Name))
err := b.f.ClientSet.CoreV1().Pods(b.f.Namespace.Name).Delete(ctx, pod.Name, metav1.DeleteOptions{GracePeriodSeconds: &zero})
if !apierrors.IsNotFound(err) {
framework.ExpectNoError(err, "force delete test pod")
}
return nil
}, 20*time.Second, 200*time.Millisecond).Should(gomega.BeNil())

for host, plugin := range b.driver.Nodes {
ginkgo.By(fmt.Sprintf("waiting for resources on %s to be unprepared", host))
gomega.Eventually(plugin.GetPreparedResources).WithTimeout(time.Minute).Should(gomega.BeEmpty(), "prepared claims on host %s", host)
}
})

ginkgo.It("must skip NodePrepareResource if not used by any container", func(ctx context.Context) {
parameters := b.parameters()
pod, template := b.podInline(resourcev1alpha2.AllocationModeWaitForFirstConsumer)
for i := range pod.Spec.Containers {
pod.Spec.Containers[i].Resources.Claims = nil
}
b.create(ctx, parameters, pod, template)
framework.ExpectNoError(e2epod.WaitForPodRunningInNamespace(ctx, f.ClientSet, pod), "start pod")
for host, plugin := range b.driver.Nodes {
gomega.Expect(plugin.GetPreparedResources()).Should(gomega.BeEmpty(), "no claims should be prepared on host %s while pod is running", host)
}
})

})

ginkgo.It("must unprepare resources for force-deleted pod", func(ctx context.Context) {
parameters := b.parameters()
claim := b.externalClaim(resourcev1alpha2.AllocationModeImmediate)
pod := b.podExternal()
zero := int64(0)
pod.Spec.TerminationGracePeriodSeconds = &zero
ginkgo.Context("with structured parameters", func() {
driver := NewDriver(f, nodes, perNode(1, nodes))
driver.parameterMode = parameterModeStructured

b.create(ctx, parameters, claim, pod)
f.It("must manage NodeResourceSlice", f.WithSlow(), func(ctx context.Context) {
nodeName := nodes.NodeNames[0]
driverName := driver.Name
m := MethodInstance{nodeName, NodeListAndWatchResourcesMethod}
ginkgo.By("wait for NodeListAndWatchResources call")
gomega.Eventually(ctx, func() int64 {
return driver.CallCount(m)
}).WithTimeout(podStartTimeout).Should(gomega.BeNumerically(">", int64(0)), "NodeListAndWatchResources call count")

b.testPod(ctx, f.ClientSet, pod)
ginkgo.By("check if NodeResourceSlice object exists on the API server")
resourceClient := f.ClientSet.ResourceV1alpha2().NodeResourceSlices()
matchSlices := gomega.And(
gomega.HaveLen(1),
gomega.ContainElement(gstruct.MatchAllFields(gstruct.Fields{
"TypeMeta": gstruct.Ignore(),
"ObjectMeta": gstruct.Ignore(), // TODO: validate ownerref
"NodeName": gomega.Equal(nodes.NodeNames[0]),
"DriverName": gomega.Equal(driver.Name),
"NodeResourceModel": gomega.Equal(resourcev1alpha2.NodeResourceModel{NamedResources: &resourcev1alpha2.NamedResourcesResources{
Instances: []resourcev1alpha2.NamedResourcesInstance{{Name: "instance-0"}},
}}),
})),
)
getSlices := func(ctx context.Context) ([]resourcev1alpha2.NodeResourceSlice, error) {
slices, err := resourceClient.List(ctx, metav1.ListOptions{FieldSelector: fmt.Sprintf("nodeName=%s,driverName=%s", nodeName, driverName)})
if err != nil {
return nil, err
}
return slices.Items, nil
}
gomega.Eventually(ctx, getSlices).WithTimeout(20 * time.Second).Should(matchSlices)
gomega.Consistently(ctx, getSlices).WithTimeout(20 * time.Second).Should(matchSlices)

ginkgo.By(fmt.Sprintf("force delete test pod %s", pod.Name))
err := b.f.ClientSet.CoreV1().Pods(b.f.Namespace.Name).Delete(ctx, pod.Name, metav1.DeleteOptions{GracePeriodSeconds: &zero})
if !apierrors.IsNotFound(err) {
framework.ExpectNoError(err, "force delete test pod")
}
// Removal of node resource slice is tested by the general driver removal code.
})

for host, plugin := range b.driver.Nodes {
ginkgo.By(fmt.Sprintf("waiting for resources on %s to be unprepared", host))
gomega.Eventually(plugin.GetPreparedResources).WithTimeout(time.Minute).Should(gomega.BeEmpty(), "prepared claims on host %s", host)
}
})

ginkgo.It("must skip NodePrepareResource if not used by any container", func(ctx context.Context) {
parameters := b.parameters()
pod, template := b.podInline(resourcev1alpha2.AllocationModeWaitForFirstConsumer)
for i := range pod.Spec.Containers {
pod.Spec.Containers[i].Resources.Claims = nil
}
b.create(ctx, parameters, pod, template)
framework.ExpectNoError(e2epod.WaitForPodRunningInNamespace(ctx, f.ClientSet, pod), "start pod")
for host, plugin := range b.driver.Nodes {
gomega.Expect(plugin.GetPreparedResources()).Should(gomega.BeEmpty(), "no claims should be prepared on host %s while pod is running", host)
}
// TODO: more test scenarios:
// - driver returns "unimplemented" as method response
// - driver returns "Unimplemented" as part of stream
// - driver returns EOF
// - driver changes resources
})
})

@ -241,11 +293,6 @@ var _ = framework.SIGDescribe("node")("DRA", feature.DynamicResourceAllocation,
ginkgo.It("supports claim and class parameters", func(ctx context.Context) {
objects := genParameters()

// TODO: replace with publishing NodeResourceSlice through kubelet
if parameterMode == parameterModeTranslated || parameterMode == parameterModeStructured {
objects = append(objects, b.nodeResourceSlice(nodes.NodeNames[0], maxAllocations))
}

pod, template := b.podInline(resourcev1alpha2.AllocationModeWaitForFirstConsumer)
objects = append(objects, pod, template)

@ -263,11 +310,6 @@ var _ = framework.SIGDescribe("node")("DRA", feature.DynamicResourceAllocation,
objects = append(objects, pod, template)
}

// TODO: replace with publishing NodeResourceSlice through kubelet
if parameterMode == parameterModeTranslated || parameterMode == parameterModeStructured {
objects = append(objects, b.nodeResourceSlice(nodes.NodeNames[0], maxAllocations))
}

b.create(ctx, objects...)

// We don't know the order. All that matters is that all of them get scheduled eventually.
@ -298,11 +340,6 @@ var _ = framework.SIGDescribe("node")("DRA", feature.DynamicResourceAllocation,
objects = append(objects, pod)
}

// TODO: replace with publishing NodeResourceSlice through kubelet
if parameterMode == parameterModeTranslated || parameterMode == parameterModeStructured {
objects = append(objects, b.nodeResourceSlice(nodes.NodeNames[0], maxAllocations))
}

b.create(ctx, objects...)

// We don't know the order. All that matters is that all of them get scheduled eventually.
@ -340,11 +377,6 @@ var _ = framework.SIGDescribe("node")("DRA", feature.DynamicResourceAllocation,
objects = append(objects, pod)
}

// TODO: replace with publishing NodeResourceSlice through kubelet
if parameterMode == parameterModeTranslated || parameterMode == parameterModeStructured {
objects = append(objects, b.nodeResourceSlice(nodes.NodeNames[0], maxAllocations))
}

b.create(ctx, objects...)

// We don't know the order. All that matters is that all of them get scheduled eventually.
@ -1278,31 +1310,6 @@ func (b *builder) rawParameterData(kv ...string) []byte {
return raw
}

func (b *builder) nodeResourceSlice(nodeName string, capacity int) *resourcev1alpha2.NodeResourceSlice {
slice := &resourcev1alpha2.NodeResourceSlice{
ObjectMeta: metav1.ObjectMeta{
Name: b.driver.Name + "-" + nodeName,
},

NodeName: nodeName,
DriverName: b.driver.Name,

NodeResourceModel: resourcev1alpha2.NodeResourceModel{
NamedResources: &resourcev1alpha2.NamedResourcesResources{},
},
}

for i := 0; i < capacity; i++ {
slice.NodeResourceModel.NamedResources.Instances = append(slice.NodeResourceModel.NamedResources.Instances,
resourcev1alpha2.NamedResourcesInstance{
Name: fmt.Sprintf("instance-%d", i),
},
)
}

return slice
}

// makePod returns a simple pod with no resource claims.
// The pod prints its env and waits.
func (b *builder) pod() *v1.Pod {

|
||||
"sync"
|
||||
|
||||
"google.golang.org/grpc"
|
||||
"google.golang.org/grpc/codes"
|
||||
"google.golang.org/grpc/status"
|
||||
|
||||
resourceapi "k8s.io/api/resource/v1alpha2"
|
||||
"k8s.io/apimachinery/pkg/runtime"
|
||||
"k8s.io/dynamic-resource-allocation/kubeletplugin"
|
||||
"k8s.io/klog/v2"
|
||||
@ -35,6 +38,7 @@ import (
|
||||
)
|
||||
|
||||
type ExamplePlugin struct {
|
||||
stopCh <-chan struct{}
|
||||
logger klog.Logger
|
||||
d kubeletplugin.DRAPlugin
|
||||
fileOps FileOperations
|
||||
@ -80,7 +84,8 @@ func (ex *ExamplePlugin) getJSONFilePath(claimUID string) string {
|
||||
return filepath.Join(ex.cdiDir, fmt.Sprintf("%s-%s.json", ex.driverName, claimUID))
|
||||
}
|
||||
|
||||
// FileOperations defines optional callbacks for handling CDI files.
|
||||
// FileOperations defines optional callbacks for handling CDI files
|
||||
// and some other configuration.
|
||||
type FileOperations struct {
|
||||
// Create must overwrite the file.
|
||||
Create func(name string, content []byte) error
|
||||
@ -88,10 +93,16 @@ type FileOperations struct {
|
||||
// Remove must remove the file. It must not return an error when the
|
||||
// file does not exist.
|
||||
Remove func(name string) error
|
||||
|
||||
// NumResourceInstances determines whether the plugin reports resources
|
||||
// instances and how many. A negative value causes it to report "not implemented"
|
||||
// in the NodeListAndWatchResources gRPC call.
|
||||
NumResourceInstances int
|
||||
}
|
||||
|
||||
// StartPlugin sets up the servers that are necessary for a DRA kubelet plugin.
|
||||
func StartPlugin(logger klog.Logger, cdiDir, driverName string, nodeName string, fileOps FileOperations, opts ...kubeletplugin.Option) (*ExamplePlugin, error) {
|
||||
func StartPlugin(ctx context.Context, cdiDir, driverName string, nodeName string, fileOps FileOperations, opts ...kubeletplugin.Option) (*ExamplePlugin, error) {
|
||||
logger := klog.FromContext(ctx)
|
||||
if fileOps.Create == nil {
|
||||
fileOps.Create = func(name string, content []byte) error {
|
||||
return os.WriteFile(name, content, os.FileMode(0644))
|
||||
@ -106,6 +117,7 @@ func StartPlugin(logger klog.Logger, cdiDir, driverName string, nodeName string,
|
||||
}
|
||||
}
|
||||
ex := &ExamplePlugin{
|
||||
stopCh: ctx.Done(),
|
||||
logger: logger,
|
||||
fileOps: fileOps,
|
||||
cdiDir: cdiDir,
|
||||
@ -118,6 +130,7 @@ func StartPlugin(logger klog.Logger, cdiDir, driverName string, nodeName string,
|
||||
kubeletplugin.Logger(logger),
|
||||
kubeletplugin.DriverName(driverName),
|
||||
kubeletplugin.GRPCInterceptor(ex.recordGRPCCall),
|
||||
kubeletplugin.GRPCStreamInterceptor(ex.recordGRPCStream),
|
||||
)
|
||||
d, err := kubeletplugin.Start(ex, opts...)
|
||||
if err != nil {
|
||||
@ -330,6 +343,39 @@ func (ex *ExamplePlugin) NodeUnprepareResources(ctx context.Context, req *drapbv
|
||||
return resp, nil
|
||||
}
|
||||
|
||||
func (ex *ExamplePlugin) NodeListAndWatchResources(req *drapbv1alpha3.NodeListAndWatchResourcesRequest, stream drapbv1alpha3.Node_NodeListAndWatchResourcesServer) error {
|
||||
if ex.fileOps.NumResourceInstances < 0 {
|
||||
ex.logger.Info("Sending no NodeResourcesResponse")
|
||||
return status.New(codes.Unimplemented, "node resource support disabled").Err()
|
||||
}
|
||||
|
||||
instances := make([]resourceapi.NamedResourcesInstance, ex.fileOps.NumResourceInstances)
|
||||
for i := 0; i < ex.fileOps.NumResourceInstances; i++ {
|
||||
instances[i].Name = fmt.Sprintf("instance-%d", i)
|
||||
}
|
||||
resp := &drapbv1alpha3.NodeListAndWatchResourcesResponse{
|
||||
Resources: []*resourceapi.NodeResourceModel{
|
||||
{
|
||||
NamedResources: &resourceapi.NamedResourcesResources{
|
||||
Instances: instances,
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
ex.logger.Info("Sending NodeListAndWatchResourcesResponse", "response", resp)
|
||||
if err := stream.Send(resp); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Keep the stream open until the test is done.
|
||||
// TODO: test sending more updates later
|
||||
<-ex.stopCh
|
||||
ex.logger.Info("Done sending NodeListAndWatchResourcesResponse, closing stream")
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (ex *ExamplePlugin) GetPreparedResources() []ClaimID {
|
||||
ex.mutex.Lock()
|
||||
defer ex.mutex.Unlock()
|
||||
@ -360,6 +406,25 @@ func (ex *ExamplePlugin) recordGRPCCall(ctx context.Context, req interface{}, in
|
||||
return call.Response, call.Err
|
||||
}
|
||||
|
||||
func (ex *ExamplePlugin) recordGRPCStream(srv interface{}, stream grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error {
|
||||
call := GRPCCall{
|
||||
FullMethod: info.FullMethod,
|
||||
}
|
||||
ex.mutex.Lock()
|
||||
ex.gRPCCalls = append(ex.gRPCCalls, call)
|
||||
index := len(ex.gRPCCalls) - 1
|
||||
ex.mutex.Unlock()
|
||||
|
||||
// We don't hold the mutex here to allow concurrent calls.
|
||||
call.Err = handler(srv, stream)
|
||||
|
||||
ex.mutex.Lock()
|
||||
ex.gRPCCalls[index] = call
|
||||
ex.mutex.Unlock()
|
||||
|
||||
return call.Err
|
||||
}
|
||||
|
||||
func (ex *ExamplePlugin) GetGRPCCalls() []GRPCCall {
|
||||
ex.mutex.Lock()
|
||||
defer ex.mutex.Unlock()
|
||||
|
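Usage sketch only, not part of this commit: starting this test driver so that it streams two named resource instances. With NumResourceInstances: -1 it would answer NodeListAndWatchResources with codes.Unimplemented instead. The import paths, driver name, node name, and socket paths below are placeholders.

package example

import (
	"context"

	"k8s.io/dynamic-resource-allocation/kubeletplugin"
	app "k8s.io/kubernetes/test/e2e/dra/test-driver/app" // assumed import path of the test driver
)

func startExampleDriver(ctx context.Context) error {
	// NumResourceInstances: 2 makes the driver publish instance-0 and instance-1.
	plugin, err := app.StartPlugin(ctx, "/cdi", "test-driver.example.com", "worker",
		app.FileOperations{NumResourceInstances: 2},
		kubeletplugin.PluginSocketPath("/var/lib/kubelet/plugins/test-driver/dra.sock"),
		kubeletplugin.RegistrarSocketPath("/var/lib/kubelet/plugins_registry/test-driver-reg.sock"),
		kubeletplugin.KubeletPluginSocketPath("/var/lib/kubelet/plugins/test-driver/dra.sock"),
	)
	if err != nil {
		return err
	}
	defer plugin.Stop()
	<-ctx.Done() // keep serving until the caller cancels
	return nil
}
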
@ -287,7 +287,7 @@ func NewCommand() *cobra.Command {
return fmt.Errorf("create socket directory: %w", err)
}

plugin, err := StartPlugin(logger, *cdiDir, *driverName, "", FileOperations{},
plugin, err := StartPlugin(cmd.Context(), *cdiDir, *driverName, "", FileOperations{},
kubeletplugin.PluginSocketPath(*endpoint),
kubeletplugin.RegistrarSocketPath(path.Join(*pluginRegistrationPath, *driverName+"-reg.sock")),
kubeletplugin.KubeletPluginSocketPath(*draAddress),

@ -68,7 +68,7 @@ var _ = framework.SIGDescribe("node")("DRA", feature.DynamicResourceAllocation,

f.Context("Resource Kubelet Plugin", f.WithSerial(), func() {
ginkgo.BeforeEach(func(ctx context.Context) {
kubeletPlugin = newKubeletPlugin(getNodeName(ctx, f))
kubeletPlugin = newKubeletPlugin(ctx, getNodeName(ctx, f))
})

ginkgo.It("must register after Kubelet restart", func(ctx context.Context) {
@ -88,7 +88,7 @@ var _ = framework.SIGDescribe("node")("DRA", feature.DynamicResourceAllocation,
ginkgo.It("must register after plugin restart", func(ctx context.Context) {
ginkgo.By("restart Kubelet Plugin")
kubeletPlugin.Stop()
kubeletPlugin = newKubeletPlugin(getNodeName(ctx, f))
kubeletPlugin = newKubeletPlugin(ctx, getNodeName(ctx, f))

ginkgo.By("wait for Kubelet plugin re-registration")
gomega.Eventually(kubeletPlugin.GetGRPCCalls).WithTimeout(pluginRegistrationTimeout).Should(testdriver.BeRegistered)
@ -134,9 +134,10 @@ var _ = framework.SIGDescribe("node")("DRA", feature.DynamicResourceAllocation,
})

// Run Kubelet plugin and wait until it's registered
func newKubeletPlugin(nodeName string) *testdriver.ExamplePlugin {
func newKubeletPlugin(ctx context.Context, nodeName string) *testdriver.ExamplePlugin {
ginkgo.By("start Kubelet plugin")
logger := klog.LoggerWithValues(klog.LoggerWithName(klog.Background(), "kubelet plugin"), "node", nodeName)
ctx = klog.NewContext(ctx, logger)

// Ensure that directories exist, creating them if necessary. We want
// to know early if there is a setup problem that would prevent
@ -147,7 +148,7 @@ func newKubeletPlugin(nodeName string) *testdriver.ExamplePlugin {
framework.ExpectNoError(err, "create socket directory")

plugin, err := testdriver.StartPlugin(
logger,
ctx,
cdiDir,
driverName,
"",