diff --git a/staging/src/k8s.io/dynamic-resource-allocation/kubeletplugin/draplugin.go b/staging/src/k8s.io/dynamic-resource-allocation/kubeletplugin/draplugin.go index 5966e59abba..f25a4bd0157 100644 --- a/staging/src/k8s.io/dynamic-resource-allocation/kubeletplugin/draplugin.go +++ b/staging/src/k8s.io/dynamic-resource-allocation/kubeletplugin/draplugin.go @@ -26,51 +26,122 @@ import ( "google.golang.org/grpc" "k8s.io/klog/v2" + resourceapi "k8s.io/api/resource/v1beta1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" "k8s.io/client-go/kubernetes" + "k8s.io/dynamic-resource-allocation/resourceclaim" "k8s.io/dynamic-resource-allocation/resourceslice" - drapbv1alpha4 "k8s.io/kubelet/pkg/apis/dra/v1alpha4" - drapbv1beta1 "k8s.io/kubelet/pkg/apis/dra/v1beta1" + drapb "k8s.io/kubelet/pkg/apis/dra/v1beta1" registerapi "k8s.io/kubelet/pkg/apis/pluginregistration/v1" ) -// DRAPlugin gets returned by Start and defines the public API of the generic -// dynamic resource allocation plugin. +// DRAPlugin is the interface that needs to be implemented by a DRA driver to +// use this helper package. The helper package then implements the gRPC +// interface expected by the kubelet by wrapping the DRAPlugin implementation. type DRAPlugin interface { - // Stop ensures that all spawned goroutines are stopped and frees - // resources. - Stop() + // PrepareResourceClaims is called to prepare all resources allocated + // for the given ResourceClaims. This is used to implement + // the gRPC NodePrepareResources call. + // + // It gets called with the complete list of claims that are needed + // by some pod. In contrast to the gRPC call, the helper has + // already retrieved the actual ResourceClaim objects. + // + // In addition to that, the helper also: + // - verifies that all claims are really allocated + // - increments a numeric counter for each call and + // adds its value to a per-context logger with "requestID" as key + // - adds the method name with "method" as key to that logger + // - logs the gRPC call and response (configurable with GRPCVerbosity) + // - serializes all gRPC calls unless the driver explicitly opted out of that + // + // This call must be idempotent because the kubelet might have to ask + // for preparation multiple times, for example if it gets restarted. + // + // A DRA driver should verify that all devices listed in a + // [resourceapi.DeviceRequestAllocationResult] are not already in use + // for some other ResourceClaim. Kubernetes tries very hard to ensure + // that, but if something went wrong, then the DRA driver is the last + // line of defense against using the same device for two different + // unrelated workloads. + // + // If an error is returned, the result is ignored. Otherwise the result + // must have exactly one entry for each claim, identified by the UID of + // the corresponding ResourceClaim. For each claim, preparation + // can be either successful (no error set in the per-ResourceClaim PrepareResult) + // or can be reported as failed. + // + // It is possible to create the CDI spec files which define the CDI devices + // on-the-fly in PrepareResourceClaims. UnprepareResourceClaims then can + // remove them. Container runtimes may cache CDI specs but must reload + // files in case of a cache miss. To avoid false cache hits, the unique + // name in the CDI device ID should not be reused. A DRA driver can use + // the claim UID for it. 
+	PrepareResourceClaims(ctx context.Context, claims []*resourceapi.ResourceClaim) (result map[types.UID]PrepareResult, err error)

-	// RegistrationStatus returns the result of registration, nil if none
-	// received yet.
-	RegistrationStatus() *registerapi.RegistrationStatus
+	// UnprepareResourceClaims must undo whatever work PrepareResourceClaims did.
+	//
+	// At the time when this gets called, the original ResourceClaims may have
+	// been deleted already. They also don't get cached by the kubelet. Therefore
+	// the parameters for each ResourceClaim are only the UID, namespace and name.
+	// It is the responsibility of the DRA driver to cache whatever additional
+	// information it might need about prepared resources.
+	//
+	// This call must be idempotent because the kubelet might have to ask
+	// for un-preparation multiple times, for example if it gets restarted.
+	// Therefore it is not an error if this gets called for a ResourceClaim
+	// which is not currently prepared.
+	//
+	// As with PrepareResourceClaims, the helper takes care of logging
+	// and serialization.
+	//
+	// The conventions for returning one overall error and several per-ResourceClaim
+	// errors are the same as in PrepareResourceClaims.
+	UnprepareResourceClaims(ctx context.Context, claims []NamespacedObject) (result map[types.UID]error, err error)
+}

-	// PublishResources may be called one or more times to publish
-	// resource information in ResourceSlice objects. If it never gets
-	// called, then the kubelet plugin does not manage any ResourceSlice
-	// objects.
-	//
-	// PublishResources does not block, so it might still take a while
-	// after it returns before all information is actually written
-	// to the API server.
-	//
-	// It is the responsibility of the caller to ensure that the pools and
-	// slices described in the driver resources parameters are valid
-	// according to the restrictions defined in the resource.k8s.io API.
-	//
-	// Invalid ResourceSlices will be rejected by the apiserver during
-	// publishing, which happens asynchronously and thus does not
-	// get returned as error here. The only error returned here is
-	// when publishing was not set up properly, for example missing
-	// [KubeClient] or [NodeName] options.
-	//
-	// The caller may modify the resources after this call returns.
-	PublishResources(ctx context.Context, resources resourceslice.DriverResources) error
+// PrepareResult contains the result of preparing one particular ResourceClaim.
+type PrepareResult struct {
+	// Err, if non-nil, describes a problem that occurred while preparing
+	// the ResourceClaim. The devices are then ignored and the kubelet will
+	// try to prepare the ResourceClaim again later.
+	Err error

-	// This unexported method ensures that we can modify the interface
-	// without causing an API break of the package
-	// (https://pkg.go.dev/golang.org/x/exp/apidiff#section-readme).
-	internal()
+	// Devices contains the IDs of CDI devices associated with specific requests
+	// in a ResourceClaim. Those IDs will be passed on to the container runtime
+	// by the kubelet.
+	//
+	// The empty slice is also valid.
+	Devices []Device
+}
+
+// Device provides the CDI device IDs for one request in a ResourceClaim.
+type Device struct {
+	// Requests lists the names of requests or subrequests in the
+	// ResourceClaim that this device is associated with. The subrequest
+	// name may be included here, but it is also okay to just return
+	// the request name.
+ // + // A DRA driver can get this string from the Request field in + // [resourceapi.DeviceRequestAllocationResult], which includes the + // subrequest name if there is one. + // + // If empty, the device is associated with all requests. + Requests []string + + // PoolName identifies the DRA driver's pool which contains the device. + // Must not be empty. + PoolName string + + // DeviceName identifies the device inside that pool. + // Must not be empty. + DeviceName string + + // CDIDeviceIDs lists all CDI devices associated with this DRA device. + // Each ID must be of the form "/=". + // May be empty. + CDIDeviceIDs []string } // Option implements the functional options pattern for Start. @@ -173,17 +244,11 @@ func GRPCStreamInterceptor(interceptor grpc.StreamServerInterceptor) Option { } } -// NodeV1alpha4 explicitly chooses whether the DRA gRPC API v1alpha4 -// gets enabled. -func NodeV1alpha4(enabled bool) Option { - return func(o *options) error { - o.nodeV1alpha4 = enabled - return nil - } -} - // NodeV1beta1 explicitly chooses whether the DRA gRPC API v1beta1 -// gets enabled. +// gets enabled. True by default. +// +// This is used in Kubernetes for end-to-end testing. The default should +// be fine for DRA drivers. func NodeV1beta1(enabled bool) Option { return func(o *options) error { o.nodeV1beta1 = enabled @@ -223,6 +288,18 @@ func NodeUID(nodeUID types.UID) Option { } } +// Serialize overrides whether the helper serializes the prepare and unprepare +// calls. The default is to serialize. +// +// A DRA driver can opt out of that to speed up parallel processing, but then +// must handle concurrency itself. +func Serialize(enabled bool) Option { + return func(o *options) error { + o.serialize = enabled + return nil + } +} + type options struct { logger klog.Logger grpcVerbosity int @@ -235,25 +312,27 @@ type options struct { unaryInterceptors []grpc.UnaryServerInterceptor streamInterceptors []grpc.StreamServerInterceptor kubeClient kubernetes.Interface - - nodeV1alpha4 bool - nodeV1beta1 bool + serialize bool + nodeV1beta1 bool } -// draPlugin combines the kubelet registration service and the DRA node plugin -// service. -type draPlugin struct { +// Helper combines the kubelet registration service and the DRA node plugin +// service and implements them by calling a [DRAPlugin] implementation. +type Helper struct { // backgroundCtx is for activities that are started later. backgroundCtx context.Context // cancel cancels the backgroundCtx. - cancel func(cause error) - wg sync.WaitGroup - registrar *nodeRegistrar - plugin *grpcServer - driverName string - nodeName string - nodeUID types.UID - kubeClient kubernetes.Interface + cancel func(cause error) + wg sync.WaitGroup + registrar *nodeRegistrar + pluginServer *grpcServer + plugin DRAPlugin + driverName string + nodeName string + nodeUID types.UID + kubeClient kubernetes.Interface + serialize bool + grpcMutex sync.Mutex // Information about resource publishing changes concurrently and thus // must be protected by the mutex. The controller gets started only @@ -263,7 +342,7 @@ type draPlugin struct { } // Start sets up two gRPC servers (one for registration, one for the DRA node -// client). By default, all APIs implemented by the nodeServer get registered. +// client) and implements them by calling a [DRAPlugin] implementation. // // The context and/or DRAPlugin.Stop can be used to stop all background activity. // Stop also blocks. 
A logger can be stored in the context to add values or @@ -271,18 +350,12 @@ type draPlugin struct { // // If the plugin will be used to publish resources, [KubeClient] and [NodeName] // options are mandatory. -// -// The DRA driver decides which gRPC interfaces it implements. At least one -// implementation of [drapbv1alpha4.NodeServer] or [drapbv1beta1.DRAPluginServer] -// is required. Implementing drapbv1beta1.DRAPluginServer is recommended for -// DRA driver targeting Kubernetes >= 1.32. To be compatible with Kubernetes 1.31, -// DRA drivers must implement only [drapbv1alpha4.NodeServer]. -func Start(ctx context.Context, nodeServers []interface{}, opts ...Option) (result DRAPlugin, finalErr error) { +func Start(ctx context.Context, plugin DRAPlugin, opts ...Option) (result *Helper, finalErr error) { logger := klog.FromContext(ctx) o := options{ logger: klog.Background(), grpcVerbosity: 6, // Logs requests and responses, which can be large. - nodeV1alpha4: true, + serialize: true, nodeV1beta1: true, } for _, option := range opts { @@ -305,11 +378,13 @@ func Start(ctx context.Context, nodeServers []interface{}, opts ...Option) (resu return nil, errors.New("a Unix domain socket path and/or listener must be set for the registrar") } - d := &draPlugin{ + d := &Helper{ driverName: o.driverName, nodeName: o.nodeName, nodeUID: o.nodeUID, kubeClient: o.kubeClient, + serialize: o.serialize, + plugin: plugin, } // Stop calls cancel and therefore both cancellation @@ -337,37 +412,21 @@ func Start(ctx context.Context, nodeServers []interface{}, opts ...Option) (resu // Run the node plugin gRPC server first to ensure that it is ready. var supportedServices []string - plugin, err := startGRPCServer(klog.NewContext(ctx, klog.LoggerWithName(logger, "dra")), o.grpcVerbosity, o.unaryInterceptors, o.streamInterceptors, o.draEndpoint, func(grpcServer *grpc.Server) { - for _, nodeServer := range nodeServers { - if nodeServer, ok := nodeServer.(drapbv1alpha4.NodeServer); ok && o.nodeV1alpha4 { - logger.V(5).Info("registering v1alpha4.Node gGRPC service") - drapbv1alpha4.RegisterNodeServer(grpcServer, nodeServer) - supportedServices = append(supportedServices, drapbv1alpha4.NodeService) - } - if nodeServer, ok := nodeServer.(drapbv1beta1.DRAPluginServer); ok && o.nodeV1beta1 { - logger.V(5).Info("registering v1beta1.DRAPlugin gRPC service") - drapbv1beta1.RegisterDRAPluginServer(grpcServer, nodeServer) - supportedServices = append(supportedServices, drapbv1beta1.DRAPluginService) - } + pluginServer, err := startGRPCServer(klog.NewContext(ctx, klog.LoggerWithName(logger, "dra")), o.grpcVerbosity, o.unaryInterceptors, o.streamInterceptors, o.draEndpoint, func(grpcServer *grpc.Server) { + if o.nodeV1beta1 { + logger.V(5).Info("registering v1beta1.DRAPlugin gRPC service") + drapb.RegisterDRAPluginServer(grpcServer, &nodePluginImplementation{Helper: d}) + supportedServices = append(supportedServices, drapb.DRAPluginService) } }) if err != nil { return nil, fmt.Errorf("start node client: %v", err) } - d.plugin = plugin + d.pluginServer = pluginServer if len(supportedServices) == 0 { return nil, errors.New("no supported DRA gRPC API is implemented and enabled") } - // Backwards compatibility hack: if only the alpha gRPC service is enabled, - // then we can support registration against a 1.31 kubelet by reporting "1.0.0" - // as version. That also works with 1.32 because 1.32 supports that legacy - // behavior and 1.31 works because it doesn't fail while parsing "v1alpha3.Node" - // as version. 
- if len(supportedServices) == 1 && supportedServices[0] == drapbv1alpha4.NodeService { - supportedServices = []string{"1.0.0"} - } - // Now make it available to kubelet. registrar, err := startRegistrar(klog.NewContext(ctx, klog.LoggerWithName(logger, "registrar")), o.grpcVerbosity, o.unaryInterceptors, o.streamInterceptors, o.driverName, supportedServices, o.draAddress, o.pluginRegistrationEndpoint) if err != nil { @@ -383,7 +442,7 @@ func Start(ctx context.Context, nodeServers []interface{}, opts ...Option) (resu <-ctx.Done() // Time to stop. - d.plugin.stop() + d.pluginServer.stop() d.registrar.stop() // d.resourceSliceController is set concurrently. @@ -395,8 +454,8 @@ func Start(ctx context.Context, nodeServers []interface{}, opts ...Option) (resu return d, nil } -// Stop implements [DRAPlugin.Stop]. -func (d *draPlugin) Stop() { +// Stop ensures that all spawned goroutines are stopped and frees resources. +func (d *Helper) Stop() { if d == nil { return } @@ -405,9 +464,27 @@ func (d *draPlugin) Stop() { d.wg.Wait() } -// PublishResources implements [DRAPlugin.PublishResources]. Returns en error if -// kubeClient or nodeName are unset. -func (d *draPlugin) PublishResources(_ context.Context, resources resourceslice.DriverResources) error { +// PublishResources may be called one or more times to publish +// resource information in ResourceSlice objects. If it never gets +// called, then the kubelet plugin does not manage any ResourceSlice +// objects. +// +// PublishResources does not block, so it might still take a while +// after it returns before all information is actually written +// to the API server. +// +// It is the responsibility of the caller to ensure that the pools and +// slices described in the driver resources parameters are valid +// according to the restrictions defined in the resource.k8s.io API. +// +// Invalid ResourceSlices will be rejected by the apiserver during +// publishing, which happens asynchronously and thus does not +// get returned as error here. The only error returned here is +// when publishing was not set up properly, for example missing +// [KubeClient] or [NodeName] options. +// +// The caller may modify the resources after this call returns. +func (d *Helper) PublishResources(_ context.Context, resources resourceslice.DriverResources) error { if d.kubeClient == nil { return errors.New("no KubeClient found to publish resources") } @@ -456,12 +533,118 @@ func (d *draPlugin) PublishResources(_ context.Context, resources resourceslice. return nil } -// RegistrationStatus implements [DRAPlugin.RegistrationStatus]. -func (d *draPlugin) RegistrationStatus() *registerapi.RegistrationStatus { +// RegistrationStatus returns the result of registration, nil if none received yet. +func (d *Helper) RegistrationStatus() *registerapi.RegistrationStatus { if d.registrar == nil { return nil } + // TODO: protect against concurrency issues. return d.registrar.status } -func (d *draPlugin) internal() {} +// serializeGRPCIfEnabled locks a mutex if serialization is enabled. +// Either way it returns a method that the caller must invoke +// via defer. +func (d *Helper) serializeGRPCIfEnabled() func() { + if !d.serialize { + return func() {} + } + d.grpcMutex.Lock() + return d.grpcMutex.Unlock +} + +// nodePluginImplementation is a thin wrapper around the helper instance. +// It prevents polluting the public API with these implementation details. +type nodePluginImplementation struct { + *Helper +} + +// NodePrepareResources implements [drapb.NodePrepareResources]. 
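For orientation between the helper internals above and the gRPC wrappers below, here is a minimal sketch of a driver written against this API. It is not part of the change; the driver name, the CDI class `test`, and the exact option set (the diff shows `KubeClient`, and the docs reference `NodeName`; a matching `DriverName` option is assumed) are illustrative only.

```go
package example

import (
	"context"

	resourceapi "k8s.io/api/resource/v1beta1"
	"k8s.io/apimachinery/pkg/types"
	"k8s.io/client-go/kubernetes"

	"k8s.io/dynamic-resource-allocation/kubeletplugin"
)

const driverName = "dra.example.com" // illustrative

type exampleDriver struct{}

// PrepareResourceClaims turns each allocated device into the CDI device IDs
// which the kubelet passes on to the container runtime. The helper has
// already verified that every claim is allocated.
func (d exampleDriver) PrepareResourceClaims(ctx context.Context, claims []*resourceapi.ResourceClaim) (map[types.UID]kubeletplugin.PrepareResult, error) {
	results := make(map[types.UID]kubeletplugin.PrepareResult)
	for _, claim := range claims {
		var devices []kubeletplugin.Device
		for _, alloc := range claim.Status.Allocation.Devices.Results {
			if alloc.Driver != driverName {
				// Allocated by some other driver, not handled here.
				continue
			}
			devices = append(devices, kubeletplugin.Device{
				Requests:   []string{alloc.Request},
				PoolName:   alloc.Pool,
				DeviceName: alloc.Device,
				// CDI device IDs use the "vendor/class=name" form; the claim
				// UID keeps the name unique, as recommended above.
				CDIDeviceIDs: []string{driverName + "/test=" + string(claim.UID)},
			})
		}
		// A per-claim failure would be reported via PrepareResult.Err instead
		// of failing the whole call.
		results[claim.UID] = kubeletplugin.PrepareResult{Devices: devices}
	}
	return results, nil
}

// UnprepareResourceClaims undoes the work above. Only UID, namespace and name
// of each claim are available here.
func (d exampleDriver) UnprepareResourceClaims(ctx context.Context, claims []kubeletplugin.NamespacedObject) (map[types.UID]error, error) {
	results := make(map[types.UID]error)
	for _, claim := range claims {
		results[claim.UID] = nil // nothing to clean up in this sketch
	}
	return results, nil
}

// run starts the helper and blocks until the context is canceled.
func run(ctx context.Context, kubeClient kubernetes.Interface, nodeName string) error {
	helper, err := kubeletplugin.Start(ctx, exampleDriver{},
		kubeletplugin.DriverName(driverName),
		kubeletplugin.NodeName(nodeName),
		kubeletplugin.KubeClient(kubeClient),
	)
	if err != nil {
		return err
	}
	defer helper.Stop()
	<-ctx.Done()
	return nil
}
```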
+func (d *nodePluginImplementation) NodePrepareResources(ctx context.Context, req *drapb.NodePrepareResourcesRequest) (*drapb.NodePrepareResourcesResponse, error) { + // Do slow API calls before serializing. + claims, err := d.getResourceClaims(ctx, req.Claims) + if err != nil { + return nil, fmt.Errorf("get resource claims: %w", err) + } + + defer d.serializeGRPCIfEnabled()() + + result, err := d.plugin.PrepareResourceClaims(ctx, claims) + if err != nil { + return nil, fmt.Errorf("prepare resource claims: %w", err) + } + + resp := &drapb.NodePrepareResourcesResponse{Claims: map[string]*drapb.NodePrepareResourceResponse{}} + for uid, claimResult := range result { + var devices []*drapb.Device + for _, result := range claimResult.Devices { + device := &drapb.Device{ + RequestNames: stripSubrequestNames(result.Requests), + PoolName: result.PoolName, + DeviceName: result.DeviceName, + CDIDeviceIDs: result.CDIDeviceIDs, + } + devices = append(devices, device) + } + resp.Claims[string(uid)] = &drapb.NodePrepareResourceResponse{ + Error: errorString(claimResult.Err), + Devices: devices, + } + } + return resp, nil +} + +func errorString(err error) string { + if err == nil { + return "" + } + return err.Error() +} + +func stripSubrequestNames(names []string) []string { + stripped := make([]string, len(names)) + for i, name := range names { + stripped[i] = resourceclaim.BaseRequestRef(name) + } + return stripped +} + +func (d *nodePluginImplementation) getResourceClaims(ctx context.Context, claims []*drapb.Claim) ([]*resourceapi.ResourceClaim, error) { + var resourceClaims []*resourceapi.ResourceClaim + for _, claimReq := range claims { + claim, err := d.kubeClient.ResourceV1beta1().ResourceClaims(claimReq.Namespace).Get(ctx, claimReq.Name, metav1.GetOptions{}) + if err != nil { + return resourceClaims, fmt.Errorf("retrieve claim %s/%s: %w", claimReq.Namespace, claimReq.Name, err) + } + if claim.Status.Allocation == nil { + return resourceClaims, fmt.Errorf("claim %s/%s not allocated", claimReq.Namespace, claimReq.Name) + } + if claim.UID != types.UID(claimReq.UID) { + return resourceClaims, fmt.Errorf("claim %s/%s got replaced", claimReq.Namespace, claimReq.Name) + } + resourceClaims = append(resourceClaims, claim) + } + return resourceClaims, nil +} + +// NodeUnprepareResources implements [draapi.NodeUnprepareResources]. 
+func (d *nodePluginImplementation) NodeUnprepareResources(ctx context.Context, req *drapb.NodeUnprepareResourcesRequest) (*drapb.NodeUnprepareResourcesResponse, error) {
+	defer d.serializeGRPCIfEnabled()()
+
+	claims := make([]NamespacedObject, 0, len(req.Claims))
+	for _, claim := range req.Claims {
+		claims = append(claims, NamespacedObject{UID: types.UID(claim.UID), NamespacedName: types.NamespacedName{Name: claim.Name, Namespace: claim.Namespace}})
+	}
+	result, err := d.plugin.UnprepareResourceClaims(ctx, claims)
+	if err != nil {
+		return nil, fmt.Errorf("unprepare resource claims: %w", err)
+	}
+
+	resp := &drapb.NodeUnprepareResourcesResponse{Claims: map[string]*drapb.NodeUnprepareResourceResponse{}}
+	for uid, err := range result {
+		resp.Claims[string(uid)] = &drapb.NodeUnprepareResourceResponse{
+			Error: errorString(err),
+		}
+	}
+	return resp, nil
+}
diff --git a/staging/src/k8s.io/dynamic-resource-allocation/kubeletplugin/namespacedobject.go b/staging/src/k8s.io/dynamic-resource-allocation/kubeletplugin/namespacedobject.go
new file mode 100644
index 00000000000..d8bf451794a
--- /dev/null
+++ b/staging/src/k8s.io/dynamic-resource-allocation/kubeletplugin/namespacedobject.go
@@ -0,0 +1,50 @@
+/*
+Copyright 2025 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package kubeletplugin
+
+import (
+	"k8s.io/apimachinery/pkg/types"
+)
+
+// NamespacedObject comprises a resource name with a mandatory namespace
+// and optional UID. It gets rendered as "<namespace>/<name>:[<uid>]"
+// (text output) or as an object (JSON output).
+type NamespacedObject struct {
+	types.NamespacedName
+	UID types.UID
+}
+
+// String returns the general purpose string representation
+func (n NamespacedObject) String() string {
+	if n.UID != "" {
+		return n.Namespace + string(types.Separator) + n.Name + ":" + string(n.UID)
+	}
+	return n.Namespace + string(types.Separator) + n.Name
+}
+
+// MarshalLog emits a struct containing required key/value pair
+func (n NamespacedObject) MarshalLog() interface{} {
+	return struct {
+		Name      string    `json:"name"`
+		Namespace string    `json:"namespace,omitempty"`
+		UID       types.UID `json:"uid,omitempty"`
+	}{
+		Name:      n.Name,
+		Namespace: n.Namespace,
+		UID:       n.UID,
+	}
+}
diff --git a/staging/src/k8s.io/dynamic-resource-allocation/kubeletplugin/nonblockinggrpcserver.go b/staging/src/k8s.io/dynamic-resource-allocation/kubeletplugin/nonblockinggrpcserver.go
index c8b0a2594d7..13d4f5ac46a 100644
--- a/staging/src/k8s.io/dynamic-resource-allocation/kubeletplugin/nonblockinggrpcserver.go
+++ b/staging/src/k8s.io/dynamic-resource-allocation/kubeletplugin/nonblockinggrpcserver.go
@@ -82,14 +82,18 @@ func startGRPCServer(valueCtx context.Context, grpcVerbosity int, unaryIntercept
 	// Run a gRPC server. It will close the listening socket when
 	// shutting down, so we don't need to do that.
+	// The per-context logger always gets initialized because
+	// there might be log output inside the method implementations.
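The next hunk makes the helper's context and logging interceptors unconditional; interceptors supplied by the driver through the `GRPCInterceptor`/`GRPCStreamInterceptor` options are chained after them. As a rough sketch (not part of the diff), a driver-side unary interceptor in the spirit of the test driver's `recordGRPCCall` further down could look like this; the `callCounter` type is hypothetical:

```go
package example

import (
	"context"
	"sync"

	"google.golang.org/grpc"

	"k8s.io/dynamic-resource-allocation/kubeletplugin"
)

// callCounter tallies unary gRPC calls per method. Purely illustrative.
type callCounter struct {
	mutex sync.Mutex
	calls map[string]int
}

func newCallCounter() *callCounter {
	return &callCounter{calls: map[string]int{}}
}

// intercept records the call and then hands over to the helper's handler.
// The signature is the standard grpc.UnaryServerInterceptor.
func (c *callCounter) intercept(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) {
	c.mutex.Lock()
	c.calls[info.FullMethod]++
	c.mutex.Unlock()
	return handler(ctx, req)
}

// option wires the interceptor into kubeletplugin.Start; it ends up chained
// after the helper's own context and logging interceptors.
func (c *callCounter) option() kubeletplugin.Option {
	return kubeletplugin.GRPCInterceptor(c.intercept)
}
```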
 	var opts []grpc.ServerOption
-	finalUnaryInterceptors := []grpc.UnaryServerInterceptor{unaryContextInterceptor(valueCtx)}
-	finalStreamInterceptors := []grpc.StreamServerInterceptor{streamContextInterceptor(valueCtx)}
-	if grpcVerbosity >= 0 {
-		finalUnaryInterceptors = append(finalUnaryInterceptors, s.interceptor)
-		finalStreamInterceptors = append(finalStreamInterceptors, s.streamInterceptor)
+	finalUnaryInterceptors := []grpc.UnaryServerInterceptor{
+		unaryContextInterceptor(valueCtx),
+		s.interceptor,
 	}
 	finalUnaryInterceptors = append(finalUnaryInterceptors, unaryInterceptors...)
+	finalStreamInterceptors := []grpc.StreamServerInterceptor{
+		streamContextInterceptor(valueCtx),
+		s.streamInterceptor,
+	}
 	finalStreamInterceptors = append(finalStreamInterceptors, streamInterceptors...)
 	opts = append(opts, grpc.ChainUnaryInterceptor(finalUnaryInterceptors...))
 	opts = append(opts, grpc.ChainStreamInterceptor(finalStreamInterceptors...))
diff --git a/test/e2e/dra/deploy.go b/test/e2e/dra/deploy.go
index ea2ef1f2675..61fdb22fc5f 100644
--- a/test/e2e/dra/deploy.go
+++ b/test/e2e/dra/deploy.go
@@ -430,7 +430,6 @@ func (d *Driver) SetUp(nodes *Nodes, resources Resources, devicesPerNode ...map[
 		kubeletplugin.PluginListener(listen(ctx, d.f, pod.Name, "plugin", 9001)),
 		kubeletplugin.RegistrarListener(listen(ctx, d.f, pod.Name, "registrar", 9000)),
 		kubeletplugin.KubeletPluginSocketPath(draAddr),
-		kubeletplugin.NodeV1alpha4(d.NodeV1alpha4),
 		kubeletplugin.NodeV1beta1(d.NodeV1beta1),
 	)
 	framework.ExpectNoError(err, "start kubelet plugin for node %s", pod.Spec.NodeName)
diff --git a/test/e2e/dra/dra.go b/test/e2e/dra/dra.go
index e03479ecaa2..b93c44a9b1e 100644
--- a/test/e2e/dra/dra.go
+++ b/test/e2e/dra/dra.go
@@ -1737,15 +1737,13 @@ var _ = framework.SIGDescribe("node")("DRA", feature.DynamicResourceAllocation,
 		})
 	})
 
-	multipleDrivers := func(nodeV1alpha4, nodeV1beta1 bool) {
+	multipleDrivers := func(nodeV1beta1 bool) {
 		nodes := NewNodes(f, 1, 4)
 		driver1 := NewDriver(f, nodes, perNode(2, nodes))
-		driver1.NodeV1alpha4 = nodeV1alpha4
 		driver1.NodeV1beta1 = nodeV1beta1
 		b1 := newBuilder(f, driver1)
 
 		driver2 := NewDriver(f, nodes, perNode(2, nodes))
-		driver2.NodeV1alpha4 = nodeV1alpha4
 		driver2.NodeV1beta1 = nodeV1beta1
 		driver2.NameSuffix = "-other"
 		b2 := newBuilder(f, driver2)
@@ -1769,16 +1767,14 @@ var _ = framework.SIGDescribe("node")("DRA", feature.DynamicResourceAllocation,
 			b1.testPod(ctx, f, pod)
 		})
 	}
-	multipleDriversContext := func(prefix string, nodeV1alpha4, nodeV1beta1 bool) {
+	multipleDriversContext := func(prefix string, nodeV1beta1 bool) {
 		ginkgo.Context(prefix, func() {
-			multipleDrivers(nodeV1alpha4, nodeV1beta1)
+			multipleDrivers(nodeV1beta1)
 		})
 	}
 
 	ginkgo.Context("multiple drivers", func() {
-		multipleDriversContext("using only drapbv1alpha4", true, false)
-		multipleDriversContext("using only drapbv1beta1", false, true)
-		multipleDriversContext("using both drav1alpha4 and drapbv1beta1", true, true)
+		multipleDriversContext("using only drapbv1beta1", true)
 	})
 
 	ginkgo.It("runs pod after
driver starts", func(ctx context.Context) { diff --git a/test/e2e/dra/test-driver/README.md b/test/e2e/dra/test-driver/README.md index 81d89389c99..abae6a8c0c5 100644 --- a/test/e2e/dra/test-driver/README.md +++ b/test/e2e/dra/test-driver/README.md @@ -60,7 +60,11 @@ RUNTIME_CONFIG="resource.k8s.io/v1alpha3" FEATURE_GATES=DynamicResourceAllocatio In another: ``` -sudo mkdir -p /var/run/cdi && sudo chmod a+rwx /var/run/cdi /var/lib/kubelet/plugins_registry +sudo mkdir -p /var/run/cdi +sudo mkdir -p /var/lib/kubelet/plugins +sudo mkdir -p /var/lib/kubelet/plugins_registry +sudo chmod a+rx /var/lib/kubelet /var/lib/kubelet/plugins +sudo chmod a+rwx /var/run/cdi /var/lib/kubelet/plugins_registry KUBECONFIG=/var/run/kubernetes/admin.kubeconfig go run ./test/e2e/dra/test-driver -v=5 kubelet-plugin --node-name=127.0.0.1 ``` diff --git a/test/e2e/dra/test-driver/app/kubeletplugin.go b/test/e2e/dra/test-driver/app/kubeletplugin.go index 863477c54e7..498e02fc715 100644 --- a/test/e2e/dra/test-driver/app/kubeletplugin.go +++ b/test/e2e/dra/test-driver/app/kubeletplugin.go @@ -40,15 +40,13 @@ import ( "k8s.io/dynamic-resource-allocation/resourceclaim" "k8s.io/dynamic-resource-allocation/resourceslice" "k8s.io/klog/v2" - drapbv1alpha4 "k8s.io/kubelet/pkg/apis/dra/v1alpha4" - drapb "k8s.io/kubelet/pkg/apis/dra/v1beta1" ) type ExamplePlugin struct { stopCh <-chan struct{} logger klog.Logger kubeClient kubernetes.Interface - d kubeletplugin.DRAPlugin + d *kubeletplugin.Helper fileOps FileOperations cdiDir string @@ -56,8 +54,11 @@ type ExamplePlugin struct { nodeName string deviceNames sets.Set[string] + // The mutex is needed because there are other goroutines checking the state. + // Serializing in the gRPC server alone is not enough because writing would + // race with reading. mutex sync.Mutex - prepared map[ClaimID][]Device // prepared claims -> result of nodePrepareResource + prepared map[ClaimID][]kubeletplugin.Device // prepared claims -> result of nodePrepareResource gRPCCalls []GRPCCall blockPrepareResourcesMutex sync.Mutex @@ -89,7 +90,7 @@ type GRPCCall struct { // sufficient to make the ClaimID unique. type ClaimID struct { Name string - UID string + UID types.UID } type Device struct { @@ -99,10 +100,10 @@ type Device struct { CDIDeviceID string } -var _ drapb.DRAPluginServer = &ExamplePlugin{} +var _ kubeletplugin.DRAPlugin = &ExamplePlugin{} // getJSONFilePath returns the absolute path where CDI file is/should be. -func (ex *ExamplePlugin) getJSONFilePath(claimUID string, requestName string) string { +func (ex *ExamplePlugin) getJSONFilePath(claimUID types.UID, requestName string) string { baseRequestRef := resourceclaim.BaseRequestRef(requestName) return filepath.Join(ex.cdiDir, fmt.Sprintf("%s-%s-%s.json", ex.driverName, claimUID, baseRequestRef)) } @@ -151,7 +152,7 @@ func StartPlugin(ctx context.Context, cdiDir, driverName string, kubeClient kube cdiDir: cdiDir, driverName: driverName, nodeName: nodeName, - prepared: make(map[ClaimID][]Device), + prepared: make(map[ClaimID][]kubeletplugin.Device), deviceNames: sets.New[string](), } @@ -167,16 +168,9 @@ func StartPlugin(ctx context.Context, cdiDir, driverName string, kubeClient kube kubeletplugin.KubeClient(kubeClient), kubeletplugin.GRPCInterceptor(ex.recordGRPCCall), kubeletplugin.GRPCStreamInterceptor(ex.recordGRPCStream), + kubeletplugin.Serialize(false), // The example plugin does its own locking. ) - // Both APIs get provided, the legacy one via wrapping. 
The options - // determine which one(s) really get served (by default, both). - // The options are a bit redundant now because a single instance cannot - // implement both, but that might be different in the future. - nodeServers := []any{ - drapb.DRAPluginServer(ex), // Casting is done only for clarity here, it's not needed. - drapbv1alpha4.V1Beta1ServerWrapper{DRAPluginServer: ex}, - } - d, err := kubeletplugin.Start(ctx, nodeServers, opts...) + d, err := kubeletplugin.Start(ctx, ex, opts...) if err != nil { return nil, fmt.Errorf("start kubelet plugin: %w", err) } @@ -300,34 +294,21 @@ func (ex *ExamplePlugin) getUnprepareResourcesFailure() error { // a deterministic name to simplify NodeUnprepareResource (no need to remember // or discover the name) and idempotency (when called again, the file simply // gets written again). -func (ex *ExamplePlugin) nodePrepareResource(ctx context.Context, claimReq *drapb.Claim) ([]Device, error) { +func (ex *ExamplePlugin) nodePrepareResource(ctx context.Context, claim *resourceapi.ResourceClaim) ([]kubeletplugin.Device, error) { logger := klog.FromContext(ctx) - // The plugin must retrieve the claim itself to get it in the version - // that it understands. - claim, err := ex.kubeClient.ResourceV1beta1().ResourceClaims(claimReq.Namespace).Get(ctx, claimReq.Name, metav1.GetOptions{}) - if err != nil { - return nil, fmt.Errorf("retrieve claim %s/%s: %w", claimReq.Namespace, claimReq.Name, err) - } - if claim.Status.Allocation == nil { - return nil, fmt.Errorf("claim %s/%s not allocated", claimReq.Namespace, claimReq.Name) - } - if claim.UID != types.UID(claimReq.UID) { - return nil, fmt.Errorf("claim %s/%s got replaced", claimReq.Namespace, claimReq.Name) - } - ex.mutex.Lock() defer ex.mutex.Unlock() ex.blockPrepareResourcesMutex.Lock() defer ex.blockPrepareResourcesMutex.Unlock() - claimID := ClaimID{Name: claimReq.Name, UID: claimReq.UID} + claimID := ClaimID{Name: claim.Name, UID: claim.UID} if result, ok := ex.prepared[claimID]; ok { // Idempotent call, nothing to do. return result, nil } - var devices []Device + var devices []kubeletplugin.Device for _, result := range claim.Status.Allocation.Devices.Results { // Only handle allocations for the current driver. 
if ex.driverName != result.Driver { @@ -356,7 +337,7 @@ func (ex *ExamplePlugin) nodePrepareResource(ctx context.Context, claimReq *drap claimReqName = regexp.MustCompile(`[^a-zA-Z0-9]`).ReplaceAllString(claimReqName, "_") env[claimReqName] = "true" - deviceName := "claim-" + claimReq.UID + "-" + baseRequestName + deviceName := "claim-" + string(claim.UID) + "-" + baseRequestName vendor := ex.driverName class := "test" cdiDeviceID := vendor + "/" + class + "=" + deviceName @@ -391,7 +372,7 @@ func (ex *ExamplePlugin) nodePrepareResource(ctx context.Context, claimReq *drap }, }, } - filePath := ex.getJSONFilePath(claimReq.UID, baseRequestName) + filePath := ex.getJSONFilePath(claim.UID, baseRequestName) buffer, err := json.Marshal(spec) if err != nil { return nil, fmt.Errorf("marshal spec: %w", err) @@ -399,11 +380,11 @@ func (ex *ExamplePlugin) nodePrepareResource(ctx context.Context, claimReq *drap if err := ex.fileOps.Create(filePath, buffer); err != nil { return nil, fmt.Errorf("failed to write CDI file: %w", err) } - device := Device{ - PoolName: result.Pool, - DeviceName: result.Device, - RequestName: baseRequestName, - CDIDeviceID: cdiDeviceID, + device := kubeletplugin.Device{ + PoolName: result.Pool, + DeviceName: result.Device, + Requests: []string{result.Request}, // May also return baseRequestName here. + CDIDeviceIDs: []string{cdiDeviceID}, } devices = append(devices, device) } @@ -434,48 +415,35 @@ func extractParameters(parameters runtime.RawExtension, env *map[string]string, return nil } -func (ex *ExamplePlugin) NodePrepareResources(ctx context.Context, req *drapb.NodePrepareResourcesRequest) (*drapb.NodePrepareResourcesResponse, error) { - resp := &drapb.NodePrepareResourcesResponse{ - Claims: make(map[string]*drapb.NodePrepareResourceResponse), - } - +func (ex *ExamplePlugin) PrepareResourceClaims(ctx context.Context, claims []*resourceapi.ResourceClaim) (map[types.UID]kubeletplugin.PrepareResult, error) { if failure := ex.getPrepareResourcesFailure(); failure != nil { - return resp, failure + return nil, failure } - for _, claimReq := range req.Claims { - devices, err := ex.nodePrepareResource(ctx, claimReq) + result := make(map[types.UID]kubeletplugin.PrepareResult) + for _, claim := range claims { + devices, err := ex.nodePrepareResource(ctx, claim) + var claimResult kubeletplugin.PrepareResult if err != nil { - resp.Claims[claimReq.UID] = &drapb.NodePrepareResourceResponse{ - Error: err.Error(), - } + claimResult.Err = err } else { - r := &drapb.NodePrepareResourceResponse{} - for _, device := range devices { - pbDevice := &drapb.Device{ - PoolName: device.PoolName, - DeviceName: device.DeviceName, - RequestNames: []string{device.RequestName}, - CDIDeviceIDs: []string{device.CDIDeviceID}, - } - r.Devices = append(r.Devices, pbDevice) - } - resp.Claims[claimReq.UID] = r + claimResult.Devices = devices } + result[claim.UID] = claimResult } - return resp, nil + return result, nil } // NodeUnprepareResource removes the CDI file created by // NodePrepareResource. It's idempotent, therefore it is not an error when that // file is already gone. 
-func (ex *ExamplePlugin) nodeUnprepareResource(ctx context.Context, claimReq *drapb.Claim) error { +func (ex *ExamplePlugin) nodeUnprepareResource(ctx context.Context, claimRef kubeletplugin.NamespacedObject) error { ex.blockUnprepareResourcesMutex.Lock() defer ex.blockUnprepareResourcesMutex.Unlock() logger := klog.FromContext(ctx) - claimID := ClaimID{Name: claimReq.Name, UID: claimReq.UID} + claimID := ClaimID{Name: claimRef.Name, UID: claimRef.UID} devices, ok := ex.prepared[claimID] if !ok { // Idempotent call, nothing to do. @@ -483,11 +451,14 @@ func (ex *ExamplePlugin) nodeUnprepareResource(ctx context.Context, claimReq *dr } for _, device := range devices { - filePath := ex.getJSONFilePath(claimReq.UID, device.RequestName) - if err := ex.fileOps.Remove(filePath); err != nil { - return fmt.Errorf("error removing CDI file: %w", err) + // In practice we only prepare one, but let's not assume that here. + for _, request := range device.Requests { + filePath := ex.getJSONFilePath(claimRef.UID, request) + if err := ex.fileOps.Remove(filePath); err != nil { + return fmt.Errorf("error removing CDI file: %w", err) + } + logger.V(3).Info("CDI file removed", "path", filePath) } - logger.V(3).Info("CDI file removed", "path", filePath) } delete(ex.prepared, claimID) @@ -495,26 +466,18 @@ func (ex *ExamplePlugin) nodeUnprepareResource(ctx context.Context, claimReq *dr return nil } -func (ex *ExamplePlugin) NodeUnprepareResources(ctx context.Context, req *drapb.NodeUnprepareResourcesRequest) (*drapb.NodeUnprepareResourcesResponse, error) { - resp := &drapb.NodeUnprepareResourcesResponse{ - Claims: make(map[string]*drapb.NodeUnprepareResourceResponse), - } +func (ex *ExamplePlugin) UnprepareResourceClaims(ctx context.Context, claims []kubeletplugin.NamespacedObject) (map[types.UID]error, error) { + result := make(map[types.UID]error) if failure := ex.getUnprepareResourcesFailure(); failure != nil { - return resp, failure + return nil, failure } - for _, claimReq := range req.Claims { - err := ex.nodeUnprepareResource(ctx, claimReq) - if err != nil { - resp.Claims[claimReq.UID] = &drapb.NodeUnprepareResourceResponse{ - Error: err.Error(), - } - } else { - resp.Claims[claimReq.UID] = &drapb.NodeUnprepareResourceResponse{} - } + for _, claimRef := range claims { + err := ex.nodeUnprepareResource(ctx, claimRef) + result[claimRef.UID] = err } - return resp, nil + return result, nil } func (ex *ExamplePlugin) GetPreparedResources() []ClaimID {
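Since the example plugin now passes `Serialize(false)` and guards its own state, it may help to spell out the locking pattern that option implies. A rough sketch with illustrative names; only `Serialize`, `Device`, `NamespacedObject`, `DRAPlugin`, `Helper` and `Start` come from the package above:

```go
package example

import (
	"context"
	"sync"

	resourceapi "k8s.io/api/resource/v1beta1"
	"k8s.io/apimachinery/pkg/types"

	"k8s.io/dynamic-resource-allocation/kubeletplugin"
)

// driverState is the driver-owned bookkeeping that the helper no longer
// protects once Serialize(false) is used.
type driverState struct {
	mutex    sync.Mutex
	prepared map[types.UID][]kubeletplugin.Device
}

// remember records the devices prepared for a claim. With Serialize(false),
// PrepareResourceClaims and UnprepareResourceClaims may run concurrently,
// so every access to shared state has to take the lock.
func (s *driverState) remember(claim *resourceapi.ResourceClaim, devices []kubeletplugin.Device) {
	s.mutex.Lock()
	defer s.mutex.Unlock()
	s.prepared[claim.UID] = devices
}

// forget returns and removes whatever was recorded for a claim; an empty
// result means the claim was never prepared (idempotent unprepare).
func (s *driverState) forget(claim kubeletplugin.NamespacedObject) []kubeletplugin.Device {
	s.mutex.Lock()
	defer s.mutex.Unlock()
	devices := s.prepared[claim.UID]
	delete(s.prepared, claim.UID)
	return devices
}

// startConcurrent opts out of the helper's per-call serialization.
func startConcurrent(ctx context.Context, plugin kubeletplugin.DRAPlugin) (*kubeletplugin.Helper, error) {
	return kubeletplugin.Start(ctx, plugin,
		kubeletplugin.Serialize(false),
		// ... plus the usual driver name, node and endpoint options ...
	)
}
```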