diff --git a/pkg/kubelet/cm/dra/manager_test.go b/pkg/kubelet/cm/dra/manager_test.go
index dc0e697d0dd..9fd66a56e1a 100644
--- a/pkg/kubelet/cm/dra/manager_test.go
+++ b/pkg/kubelet/cm/dra/manager_test.go
@@ -306,6 +306,10 @@ func TestGetResources(t *testing.T) {
 	}
 }
 
+func getFakeNode() (*v1.Node, error) {
+	return &v1.Node{ObjectMeta: metav1.ObjectMeta{Name: "worker"}}, nil
+}
+
 func TestPrepareResources(t *testing.T) {
 	fakeKubeClient := fake.NewSimpleClientset()
 
@@ -760,7 +764,7 @@ func TestPrepareResources(t *testing.T) {
 			}
 			defer draServerInfo.teardownFn()
 
-			plg := plugin.NewRegistrationHandler(nil, "worker")
+			plg := plugin.NewRegistrationHandler(nil, getFakeNode)
 			if err := plg.RegisterPlugin(test.driverName, draServerInfo.socketName, []string{"1.27"}); err != nil {
 				t.Fatalf("failed to register plugin %s, err: %v", test.driverName, err)
 			}
@@ -1060,7 +1064,7 @@ func TestUnprepareResources(t *testing.T) {
 			}
 			defer draServerInfo.teardownFn()
 
-			plg := plugin.NewRegistrationHandler(nil, "worker")
+			plg := plugin.NewRegistrationHandler(nil, getFakeNode)
 			if err := plg.RegisterPlugin(test.driverName, draServerInfo.socketName, []string{"1.27"}); err != nil {
 				t.Fatalf("failed to register plugin %s, err: %v", test.driverName, err)
 			}
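Note: manager_test.go above and plugin_test.go below define an identical getFakeNode stub. If more tests grow around the new callback, a parameterized fixture could avoid the duplication. The sketch below is hypothetical (fakeNodeGetter and failingNodeGetter are not part of this patch); it only illustrates the shape of the getNode contract, including the error case that the controller now retries on.

package dra

import (
	"errors"

	v1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/types"
)

// fakeNodeGetter (hypothetical) builds a getNode callback for a given
// node identity instead of hard-coding "worker".
func fakeNodeGetter(name string, uid types.UID) func() (*v1.Node, error) {
	return func() (*v1.Node, error) {
		return &v1.Node{ObjectMeta: metav1.ObjectMeta{Name: name, UID: uid}}, nil
	}
}

// failingNodeGetter (hypothetical) always errors, which the controller's
// run() loop treats as "node not available yet" and retries once per second.
func failingNodeGetter() (*v1.Node, error) {
	return nil, errors.New("node informer cache not populated yet")
}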
diff --git a/pkg/kubelet/cm/dra/plugin/noderesources.go b/pkg/kubelet/cm/dra/plugin/noderesources.go
index fc4148b78ea..0a02af69834 100644
--- a/pkg/kubelet/cm/dra/plugin/noderesources.go
+++ b/pkg/kubelet/cm/dra/plugin/noderesources.go
@@ -28,6 +28,7 @@ import (
 	"google.golang.org/grpc/codes"
 	"google.golang.org/grpc/status"
 
+	v1 "k8s.io/api/core/v1"
 	resourceapi "k8s.io/api/resource/v1alpha2"
 	apiequality "k8s.io/apimachinery/pkg/api/equality"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
@@ -39,6 +40,7 @@ import (
 	"k8s.io/client-go/util/workqueue"
 	"k8s.io/klog/v2"
 	drapb "k8s.io/kubelet/pkg/apis/dra/v1alpha3"
+	"k8s.io/utils/ptr"
 )
 
 const (
@@ -52,7 +54,7 @@ const (
 type nodeResourcesController struct {
 	ctx        context.Context
 	kubeClient kubernetes.Interface
-	nodeName   string
+	getNode    func() (*v1.Node, error)
 	wg         sync.WaitGroup
 	queue      workqueue.RateLimitingInterface
 	sliceStore cache.Store
@@ -84,7 +86,7 @@ type activePlugin struct {
 // the controller is inactive. This can happen when kubelet is run stand-alone
 // without an apiserver. In that case we can't and don't need to publish
 // ResourceSlices.
-func startNodeResourcesController(ctx context.Context, kubeClient kubernetes.Interface, nodeName string) *nodeResourcesController {
+func startNodeResourcesController(ctx context.Context, kubeClient kubernetes.Interface, getNode func() (*v1.Node, error)) *nodeResourcesController {
 	if kubeClient == nil {
 		return nil
 	}
@@ -96,7 +98,7 @@ func startNodeResourcesController(ctx context.Context, kubeClient kubernetes.Int
 	c := &nodeResourcesController{
 		ctx:           ctx,
 		kubeClient:    kubeClient,
-		nodeName:      nodeName,
+		getNode:       getNode,
 		queue:         workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "node_resource_slices"),
 		activePlugins: make(map[string]*activePlugin),
 	}
@@ -252,16 +254,29 @@ func (c *nodeResourcesController) run(ctx context.Context) {
 	// For now syncing starts immediately, with no DeleteCollection. This
 	// can be reconsidered later.
 
-	// While kubelet starts up, there are errors:
-	// E0226 13:41:19.880621 126334 reflector.go:150] k8s.io/client-go@v0.0.0/tools/cache/reflector.go:232: Failed to watch *v1alpha2.ResourceSlice: failed to list *v1alpha2.ResourceSlice: resourceslices.resource.k8s.io is forbidden: User "system:anonymous" cannot list resource "resourceslices" in API group "resource.k8s.io" at the cluster scope
-	//
-	// The credentials used by kubeClient seem to get swapped out later,
-	// because eventually these list calls succeed.
-	// TODO (https://github.com/kubernetes/kubernetes/issues/123691): can we avoid these error log entries? Perhaps wait here?
+	// Wait until we're able to get a Node object.
+	// This means that the object has been created on the API server,
+	// the kubeclient is functional, and the node informer cache has been
+	// populated with the node object. Without this it makes no sense to
+	// proceed further: the controller needs the node name and UID to work.
+	var node *v1.Node
+	var err error
+	for {
+		node, err = c.getNode()
+		if err == nil {
+			break
+		}
+		logger.V(5).Info("Getting Node object failed, waiting", "err", err)
+		select {
+		case <-ctx.Done():
+			return
+		case <-time.After(time.Second):
+		}
+	}
 
 	// We could use an indexer on driver name, but that seems overkill.
 	informer := resourceinformers.NewFilteredResourceSliceInformer(c.kubeClient, resyncPeriod, nil, func(options *metav1.ListOptions) {
-		options.FieldSelector = "nodeName=" + c.nodeName
+		options.FieldSelector = "nodeName=" + node.Name
 	})
 	c.sliceStore = informer.GetStore()
 	handler, err := informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
@@ -441,13 +456,29 @@ func (c *nodeResourcesController) sync(ctx context.Context, driverName string) e
 			continue
 		}
 
+		// Although the node name and UID are unlikely to change,
+		// fetch an up-to-date node object just to be on the safe side.
+		// This is cheap because the object comes from the node informer cache.
+		node, err := c.getNode()
+		if err != nil {
+			return fmt.Errorf("retrieve node object: %w", err)
+		}
+
 		// Create a new slice.
 		slice := &resourceapi.ResourceSlice{
 			ObjectMeta: metav1.ObjectMeta{
-				GenerateName: c.nodeName + "-" + driverName + "-",
-				// TODO (https://github.com/kubernetes/kubernetes/issues/123692): node object as owner
+				GenerateName: node.Name + "-" + driverName + "-",
+				OwnerReferences: []metav1.OwnerReference{
+					{
+						APIVersion: v1.SchemeGroupVersion.WithKind("Node").Version,
+						Kind:       v1.SchemeGroupVersion.WithKind("Node").Kind,
+						Name:       node.Name,
+						UID:        node.UID,
+						Controller: ptr.To(true),
+					},
+				},
 			},
-			NodeName:      c.nodeName,
+			NodeName:      node.Name,
 			DriverName:    driverName,
 			ResourceModel: *resource,
 		}
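For context on the new OwnerReferences block: marking the Node as the controlling owner lets the API server's garbage collector remove a node's ResourceSlices once the Node object is deleted, which resolves the TODO from issue 123692. Below is a minimal standalone sketch of the same reference; the helper name nodeOwnerReference is hypothetical, only the field values come from the patch.

package main

import (
	"fmt"

	v1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/utils/ptr"
)

// nodeOwnerReference mirrors the reference built in sync() above.
// Controller=true makes the Node the managing controller, so the garbage
// collector deletes the ResourceSlice when the Node object is gone.
// BlockOwnerDeletion stays unset: nothing needs to delay node deletion
// on behalf of a slice.
func nodeOwnerReference(node *v1.Node) metav1.OwnerReference {
	return metav1.OwnerReference{
		// For the core group, GroupVersionKind.Version ("v1") happens to
		// equal the full APIVersion string because the group name is empty;
		// a non-core group would need GroupVersion().String() instead.
		APIVersion: v1.SchemeGroupVersion.WithKind("Node").Version,
		Kind:       v1.SchemeGroupVersion.WithKind("Node").Kind,
		Name:       node.Name,
		UID:        node.UID,
		Controller: ptr.To(true),
	}
}

func main() {
	node := &v1.Node{ObjectMeta: metav1.ObjectMeta{Name: "worker", UID: "1234-5678"}}
	fmt.Printf("%+v\n", nodeOwnerReference(node))
}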
diff --git a/pkg/kubelet/cm/dra/plugin/plugin.go b/pkg/kubelet/cm/dra/plugin/plugin.go
index f85a14b8369..01470f94e84 100644
--- a/pkg/kubelet/cm/dra/plugin/plugin.go
+++ b/pkg/kubelet/cm/dra/plugin/plugin.go
@@ -28,6 +28,7 @@ import (
 	"google.golang.org/grpc"
 	"google.golang.org/grpc/connectivity"
 	"google.golang.org/grpc/credentials/insecure"
+	v1 "k8s.io/api/core/v1"
 	utilversion "k8s.io/apimachinery/pkg/util/version"
 	"k8s.io/client-go/kubernetes"
 	"k8s.io/klog/v2"
@@ -104,12 +105,12 @@ type RegistrationHandler struct {
 // Must only be called once per process because it manages global state.
 // If a kubeClient is provided, then it synchronizes ResourceSlices
 // with the resource information provided by plugins.
-func NewRegistrationHandler(kubeClient kubernetes.Interface, nodeName string) *RegistrationHandler {
+func NewRegistrationHandler(kubeClient kubernetes.Interface, getNode func() (*v1.Node, error)) *RegistrationHandler {
 	handler := &RegistrationHandler{}
 
 	// If kubelet ever gets an API for stopping registration handlers, then
 	// that would need to be hooked up with stopping the controller.
-	handler.controller = startNodeResourcesController(context.TODO(), kubeClient, nodeName)
+	handler.controller = startNodeResourcesController(context.TODO(), kubeClient, getNode)
 
 	return handler
 }
diff --git a/pkg/kubelet/cm/dra/plugin/plugin_test.go b/pkg/kubelet/cm/dra/plugin/plugin_test.go
index f9d70238f60..d5d927bbccd 100644
--- a/pkg/kubelet/cm/dra/plugin/plugin_test.go
+++ b/pkg/kubelet/cm/dra/plugin/plugin_test.go
@@ -20,11 +20,17 @@ import (
 	"testing"
 
 	"github.com/stretchr/testify/assert"
+	v1 "k8s.io/api/core/v1"
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 )
 
+func getFakeNode() (*v1.Node, error) {
+	return &v1.Node{ObjectMeta: metav1.ObjectMeta{Name: "worker"}}, nil
+}
+
 func TestRegistrationHandler_ValidatePlugin(t *testing.T) {
 	newRegistrationHandler := func() *RegistrationHandler {
-		return NewRegistrationHandler(nil, "worker")
+		return NewRegistrationHandler(nil, getFakeNode)
 	}
 
 	for _, test := range []struct {
diff --git a/pkg/kubelet/kubelet.go b/pkg/kubelet/kubelet.go
index 5b8545d683f..17179571de7 100644
--- a/pkg/kubelet/kubelet.go
+++ b/pkg/kubelet/kubelet.go
@@ -1557,7 +1557,7 @@ func (kl *Kubelet) initializeRuntimeDependentModules() {
 	kl.pluginManager.AddHandler(pluginwatcherapi.CSIPlugin, plugincache.PluginHandler(csi.PluginHandler))
 	// Adding Registration Callback function for DRA Plugin
 	if utilfeature.DefaultFeatureGate.Enabled(features.DynamicResourceAllocation) {
-		kl.pluginManager.AddHandler(pluginwatcherapi.DRAPlugin, plugincache.PluginHandler(draplugin.NewRegistrationHandler(kl.kubeClient, kl.hostname)))
+		kl.pluginManager.AddHandler(pluginwatcherapi.DRAPlugin, plugincache.PluginHandler(draplugin.NewRegistrationHandler(kl.kubeClient, kl.getNodeAnyWay)))
 	}
 	// Adding Registration Callback function for Device Manager
 	kl.pluginManager.AddHandler(pluginwatcherapi.DevicePlugin, kl.containerManager.GetPluginRegistrationHandler())
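In kubelet the callback is satisfied by the existing kl.getNodeAnyWay helper, which prefers the node informer's copy of the Node object; early calls may not return a usable result until that cache fills, which is exactly what the new wait loop in run() tolerates. For a consumer outside kubelet, an equivalent getter could be assembled from a shared informer factory. This is a sketch under that assumption; newNodeGetter is not part of the patch.

package main

import (
	"context"
	"time"

	v1 "k8s.io/api/core/v1"
	"k8s.io/client-go/informers"
	"k8s.io/client-go/kubernetes"
)

// newNodeGetter returns a getNode-style callback backed by a node
// informer. Lookups fail until the cache syncs, mirroring the
// "not available yet" behavior the DRA controller retries on.
func newNodeGetter(ctx context.Context, client kubernetes.Interface, nodeName string) func() (*v1.Node, error) {
	factory := informers.NewSharedInformerFactory(client, 10*time.Minute)
	lister := factory.Core().V1().Nodes().Lister()
	factory.Start(ctx.Done())
	// Deliberately no WaitForCacheSync here: the controller's run()
	// loop polls the callback until it succeeds, so early "not found"
	// errors are expected and harmless.
	return func() (*v1.Node, error) {
		return lister.Get(nodeName)
	}
}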
diff --git a/test/e2e/dra/dra.go b/test/e2e/dra/dra.go
index 322153536b8..520e6db9b10 100644
--- a/test/e2e/dra/dra.go
+++ b/test/e2e/dra/dra.go
@@ -905,10 +905,23 @@ var _ = framework.SIGDescribe("node")("DRA", feature.DynamicResourceAllocation,
 			resourceClient := f.ClientSet.ResourceV1alpha2().ResourceSlices()
 			var expectedObjects []any
 			for _, nodeName := range nodes.NodeNames {
+				node, err := f.ClientSet.CoreV1().Nodes().Get(ctx, nodeName, metav1.GetOptions{})
+				framework.ExpectNoError(err, "get node")
 				expectedObjects = append(expectedObjects,
 					gstruct.MatchAllFields(gstruct.Fields{
-						"TypeMeta":   gstruct.Ignore(),
-						"ObjectMeta": gstruct.Ignore(), // TODO (https://github.com/kubernetes/kubernetes/issues/123692): validate ownerref
+						"TypeMeta": gstruct.Ignore(),
+						"ObjectMeta": gstruct.MatchFields(gstruct.IgnoreExtras, gstruct.Fields{
+							"OwnerReferences": gomega.ContainElements(
+								gstruct.MatchAllFields(gstruct.Fields{
+									"APIVersion":         gomega.Equal("v1"),
+									"Kind":               gomega.Equal("Node"),
+									"Name":               gomega.Equal(nodeName),
+									"UID":                gomega.Equal(node.UID),
+									"Controller":         gomega.Equal(ptr.To(true)),
+									"BlockOwnerDeletion": gomega.BeNil(),
+								}),
+							),
+						}),
 						"NodeName":      gomega.Equal(nodeName),
 						"DriverName":    gomega.Equal(driver.Name),
 						"ResourceModel": gomega.Equal(resourcev1alpha2.ResourceModel{NamedResources: &resourcev1alpha2.NamedResourcesResources{
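The e2e matcher above pins down every OwnerReference field via gstruct.MatchAllFields. The same shape can be checked at unit level; the following test is a hypothetical analogue (not part of the patch) showing the gstruct pattern in isolation.

package dra_test

import (
	"testing"

	"github.com/onsi/gomega"
	"github.com/onsi/gomega/gstruct"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/types"
	"k8s.io/utils/ptr"
)

func TestNodeOwnerReferenceShape(t *testing.T) {
	g := gomega.NewWithT(t)
	ref := metav1.OwnerReference{
		APIVersion: "v1",
		Kind:       "Node",
		Name:       "worker",
		UID:        types.UID("1234"),
		Controller: ptr.To(true),
	}
	// MatchAllFields forces a decision on every field, including
	// BlockOwnerDeletion, which the controller leaves nil.
	g.Expect(ref).To(gstruct.MatchAllFields(gstruct.Fields{
		"APIVersion":         gomega.Equal("v1"),
		"Kind":               gomega.Equal("Node"),
		"Name":               gomega.Equal("worker"),
		"UID":                gomega.Equal(types.UID("1234")),
		"Controller":         gomega.Equal(ptr.To(true)),
		"BlockOwnerDeletion": gomega.BeNil(),
	}))
}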