DRA: remove support for v1alpha2 kubelet API

The v1alpha2 API is several releases old. No current drivers should still
depend on it.
This commit is contained in:
Patrick Ohly 2024-04-10 14:50:46 +02:00
parent f39ece24b2
commit 77341f7595
9 changed files with 50 additions and 1809 deletions

View File

@ -22,112 +22,13 @@ import (
"time"
"google.golang.org/grpc"
grpccodes "google.golang.org/grpc/codes"
grpcstatus "google.golang.org/grpc/status"
"k8s.io/klog/v2"
drapbv1alpha2 "k8s.io/kubelet/pkg/apis/dra/v1alpha2"
drapb "k8s.io/kubelet/pkg/apis/dra/v1alpha3"
)
const PluginClientTimeout = 45 * time.Second
type (
nodeResourceManager interface {
Prepare(context.Context, *grpc.ClientConn, *plugin, *drapb.NodePrepareResourcesRequest) (*drapb.NodePrepareResourcesResponse, error)
Unprepare(context.Context, *grpc.ClientConn, *plugin, *drapb.NodeUnprepareResourcesRequest) (*drapb.NodeUnprepareResourcesResponse, error)
}
v1alpha2NodeResourceManager struct{}
v1alpha3NodeResourceManager struct{}
)
var nodeResourceManagers = map[string]nodeResourceManager{
v1alpha2Version: v1alpha2NodeResourceManager{},
v1alpha3Version: v1alpha3NodeResourceManager{},
}
func (v1alpha2rm v1alpha2NodeResourceManager) Prepare(ctx context.Context, conn *grpc.ClientConn, _ *plugin, req *drapb.NodePrepareResourcesRequest) (*drapb.NodePrepareResourcesResponse, error) {
nodeClient := drapbv1alpha2.NewNodeClient(conn)
response := &drapb.NodePrepareResourcesResponse{
Claims: make(map[string]*drapb.NodePrepareResourceResponse),
}
for _, claim := range req.Claims {
req := &drapbv1alpha2.NodePrepareResourceRequest{
Namespace: claim.Namespace,
ClaimUid: claim.Uid,
ClaimName: claim.Name,
ResourceHandle: claim.ResourceHandle,
StructuredResourceHandle: claim.StructuredResourceHandle,
}
res, err := nodeClient.NodePrepareResource(ctx, req)
result := &drapb.NodePrepareResourceResponse{}
if err != nil {
result.Error = err.Error()
} else {
result.CDIDevices = res.CdiDevices
}
response.Claims[claim.Uid] = result
}
return response, nil
}
func (v1alpha2rm v1alpha2NodeResourceManager) Unprepare(ctx context.Context, conn *grpc.ClientConn, _ *plugin, req *drapb.NodeUnprepareResourcesRequest) (*drapb.NodeUnprepareResourcesResponse, error) {
nodeClient := drapbv1alpha2.NewNodeClient(conn)
response := &drapb.NodeUnprepareResourcesResponse{
Claims: make(map[string]*drapb.NodeUnprepareResourceResponse),
}
for _, claim := range req.Claims {
_, err := nodeClient.NodeUnprepareResource(ctx,
&drapbv1alpha2.NodeUnprepareResourceRequest{
Namespace: claim.Namespace,
ClaimUid: claim.Uid,
ClaimName: claim.Name,
ResourceHandle: claim.ResourceHandle,
})
result := &drapb.NodeUnprepareResourceResponse{}
if err != nil {
result.Error = err.Error()
}
response.Claims[claim.Uid] = result
}
return response, nil
}
func (v1alpha3rm v1alpha3NodeResourceManager) Prepare(ctx context.Context, conn *grpc.ClientConn, p *plugin, req *drapb.NodePrepareResourcesRequest) (*drapb.NodePrepareResourcesResponse, error) {
nodeClient := drapb.NewNodeClient(conn)
response, err := nodeClient.NodePrepareResources(ctx, req)
if err != nil {
status, _ := grpcstatus.FromError(err)
if status.Code() == grpccodes.Unimplemented {
p.setVersion(v1alpha2Version)
return nodeResourceManagers[v1alpha2Version].Prepare(ctx, conn, p, req)
}
return nil, err
}
return response, nil
}
func (v1alpha3rm v1alpha3NodeResourceManager) Unprepare(ctx context.Context, conn *grpc.ClientConn, p *plugin, req *drapb.NodeUnprepareResourcesRequest) (*drapb.NodeUnprepareResourcesResponse, error) {
nodeClient := drapb.NewNodeClient(conn)
response, err := nodeClient.NodeUnprepareResources(ctx, req)
if err != nil {
status, _ := grpcstatus.FromError(err)
if status.Code() == grpccodes.Unimplemented {
p.setVersion(v1alpha2Version)
return nodeResourceManagers[v1alpha2Version].Unprepare(ctx, conn, p, req)
}
return nil, err
}
return response, nil
}
func NewDRAPluginClient(pluginName string) (drapb.NodeClient, error) {
if pluginName == "" {
return nil, fmt.Errorf("plugin name is empty")
@ -157,15 +58,8 @@ func (p *plugin) NodePrepareResources(
ctx, cancel := context.WithTimeout(ctx, p.clientTimeout)
defer cancel()
version := p.getVersion()
resourceManager, exists := nodeResourceManagers[version]
if !exists {
err := fmt.Errorf("unsupported plugin version: %s", version)
logger.V(4).Info(log("done calling NodePrepareResources rpc"), "response", nil, "err", err)
return nil, err
}
response, err := resourceManager.Prepare(ctx, conn, p, req)
nodeClient := drapb.NewNodeClient(conn)
response, err := nodeClient.NodePrepareResources(ctx, req)
logger.V(4).Info(log("done calling NodePrepareResources rpc"), "response", response, "err", err)
return response, err
}
@ -186,15 +80,8 @@ func (p *plugin) NodeUnprepareResources(
ctx, cancel := context.WithTimeout(ctx, p.clientTimeout)
defer cancel()
version := p.getVersion()
resourceManager, exists := nodeResourceManagers[version]
if !exists {
err := fmt.Errorf("unsupported plugin version: %s", version)
logger.V(4).Info(log("done calling NodeUnprepareResources rpc"), "response", nil, "err", err)
return nil, err
}
response, err := resourceManager.Unprepare(ctx, conn, p, req)
nodeClient := drapb.NewNodeClient(conn)
response, err := nodeClient.NodeUnprepareResources(ctx, req)
logger.V(4).Info(log("done calling NodeUnprepareResources rpc"), "response", response, "err", err)
return response, err
}

View File

@ -27,7 +27,6 @@ import (
"github.com/stretchr/testify/assert"
"google.golang.org/grpc"
drapbv1alpha2 "k8s.io/kubelet/pkg/apis/dra/v1alpha2"
drapbv1alpha3 "k8s.io/kubelet/pkg/apis/dra/v1alpha3"
)
@ -35,11 +34,14 @@ type fakeV1alpha3GRPCServer struct {
drapbv1alpha3.UnimplementedNodeServer
}
func (f *fakeV1alpha3GRPCServer) NodePrepareResource(ctx context.Context, in *drapbv1alpha3.NodePrepareResourcesRequest) (*drapbv1alpha3.NodePrepareResourcesResponse, error) {
var _ drapbv1alpha3.NodeServer = &fakeV1alpha3GRPCServer{}
func (f *fakeV1alpha3GRPCServer) NodePrepareResources(ctx context.Context, in *drapbv1alpha3.NodePrepareResourcesRequest) (*drapbv1alpha3.NodePrepareResourcesResponse, error) {
return &drapbv1alpha3.NodePrepareResourcesResponse{Claims: map[string]*drapbv1alpha3.NodePrepareResourceResponse{"dummy": {CDIDevices: []string{"dummy"}}}}, nil
}
func (f *fakeV1alpha3GRPCServer) NodeUnprepareResource(ctx context.Context, in *drapbv1alpha3.NodeUnprepareResourcesRequest) (*drapbv1alpha3.NodeUnprepareResourcesResponse, error) {
func (f *fakeV1alpha3GRPCServer) NodeUnprepareResources(ctx context.Context, in *drapbv1alpha3.NodeUnprepareResourcesRequest) (*drapbv1alpha3.NodeUnprepareResourcesResponse, error) {
return &drapbv1alpha3.NodeUnprepareResourcesResponse{}, nil
}
@ -53,18 +55,6 @@ func (f *fakeV1alpha3GRPCServer) NodeListAndWatchResources(req *drapbv1alpha3.No
return nil
}
type fakeV1alpha2GRPCServer struct {
drapbv1alpha2.UnimplementedNodeServer
}
func (f *fakeV1alpha2GRPCServer) NodePrepareResource(ctx context.Context, in *drapbv1alpha2.NodePrepareResourceRequest) (*drapbv1alpha2.NodePrepareResourceResponse, error) {
return &drapbv1alpha2.NodePrepareResourceResponse{CdiDevices: []string{"dummy"}}, nil
}
func (f *fakeV1alpha2GRPCServer) NodeUnprepareResource(ctx context.Context, in *drapbv1alpha2.NodeUnprepareResourceRequest) (*drapbv1alpha2.NodeUnprepareResourceResponse, error) {
return &drapbv1alpha2.NodeUnprepareResourceResponse{}, nil
}
type tearDown func()
func setupFakeGRPCServer(version string) (string, tearDown, error) {
@ -88,9 +78,6 @@ func setupFakeGRPCServer(version string) (string, tearDown, error) {
s := grpc.NewServer()
switch version {
case v1alpha2Version:
fakeGRPCServer := &fakeV1alpha2GRPCServer{}
drapbv1alpha2.RegisterNodeServer(s, fakeGRPCServer)
case v1alpha3Version:
fakeGRPCServer := &fakeV1alpha3GRPCServer{}
drapbv1alpha3.RegisterNodeServer(s, fakeGRPCServer)
@ -120,7 +107,6 @@ func TestGRPCConnIsReused(t *testing.T) {
p := &plugin{
endpoint: addr,
version: v1alpha3Version,
}
conn, err := p.getOrCreateGRPCConn()
@ -231,7 +217,7 @@ func TestNewDRAPluginClient(t *testing.T) {
}
}
func TestNodeUnprepareResource(t *testing.T) {
func TestNodeUnprepareResources(t *testing.T) {
for _, test := range []struct {
description string
serverSetup func(string) (string, tearDown, error)
@ -244,21 +230,6 @@ func TestNodeUnprepareResource(t *testing.T) {
serverVersion: v1alpha3Version,
request: &drapbv1alpha3.NodeUnprepareResourcesRequest{},
},
{
description: "server supports v1alpha2, plugin client should fallback",
serverSetup: setupFakeGRPCServer,
serverVersion: v1alpha2Version,
request: &drapbv1alpha3.NodeUnprepareResourcesRequest{
Claims: []*drapbv1alpha3.Claim{
{
Namespace: "dummy-namespace",
Uid: "dummy-uid",
Name: "dummy-claim",
ResourceHandle: "dummy-resource",
},
},
},
},
} {
t.Run(test.description, func(t *testing.T) {
addr, teardown, err := setupFakeGRPCServer(test.serverVersion)
@ -269,7 +240,6 @@ func TestNodeUnprepareResource(t *testing.T) {
p := &plugin{
endpoint: addr,
version: v1alpha3Version,
clientTimeout: PluginClientTimeout,
}
@ -320,13 +290,6 @@ func TestListAndWatchResources(t *testing.T) {
},
expectError: "EOF",
},
{
description: "server doesn't support NodeResources API",
serverSetup: setupFakeGRPCServer,
serverVersion: v1alpha2Version,
request: new(drapbv1alpha3.NodeListAndWatchResourcesRequest),
expectError: "Unimplemented",
},
} {
t.Run(test.description, func(t *testing.T) {
addr, teardown, err := setupFakeGRPCServer(test.serverVersion)
@ -337,7 +300,6 @@ func TestListAndWatchResources(t *testing.T) {
p := &plugin{
endpoint: addr,
version: v1alpha3Version,
}
conn, err := p.getOrCreateGRPCConn()

View File

@ -38,16 +38,13 @@ const (
// DRAPluginName is the name of the in-tree DRA Plugin.
DRAPluginName = "kubernetes.io/dra"
v1alpha3Version = "v1alpha3"
v1alpha2Version = "v1alpha2"
)
// Plugin is a description of a DRA Plugin, defined by an endpoint
// and the highest DRA version supported.
// Plugin is a description of a DRA Plugin, defined by an endpoint.
type plugin struct {
sync.Mutex
conn *grpc.ClientConn
endpoint string
version string
highestSupportedVersion *utilversion.Version
clientTimeout time.Duration
}
@ -84,18 +81,6 @@ func (p *plugin) getOrCreateGRPCConn() (*grpc.ClientConn, error) {
return p.conn, nil
}
func (p *plugin) getVersion() string {
p.Lock()
defer p.Unlock()
return p.version
}
func (p *plugin) setVersion(version string) {
p.Lock()
p.version = version
p.Unlock()
}
// RegistrationHandler is the handler which is fed to the pluginwatcher API.
type RegistrationHandler struct {
controller *nodeResourcesController
@ -135,7 +120,6 @@ func (h *RegistrationHandler) RegisterPlugin(pluginName string, endpoint string,
pluginInstance := &plugin{
conn: nil,
endpoint: endpoint,
version: v1alpha3Version,
highestSupportedVersion: highestSupportedVersion,
clientTimeout: timeout,
}

View File

@ -24,7 +24,6 @@ import (
"google.golang.org/grpc"
"k8s.io/klog/v2"
drapbv1alpha2 "k8s.io/kubelet/pkg/apis/dra/v1alpha2"
drapbv1alpha3 "k8s.io/kubelet/pkg/apis/dra/v1alpha3"
registerapi "k8s.io/kubelet/pkg/apis/pluginregistration/v1"
)
@ -154,16 +153,7 @@ func GRPCStreamInterceptor(interceptor grpc.StreamServerInterceptor) Option {
}
}
// NodeV1alpha2 explicitly chooses whether the DRA gRPC API v1alpha2
// gets enabled.
func NodeV1alpha2(enabled bool) Option {
return func(o *options) error {
o.nodeV1alpha2 = enabled
return nil
}
}
// NodeV1alpha2 explicitly chooses whether the DRA gRPC API v1alpha3
// NodeV1alpha3 explicitly chooses whether the DRA gRPC API v1alpha3
// gets enabled.
func NodeV1alpha3(enabled bool) Option {
return func(o *options) error {
@ -182,7 +172,7 @@ type options struct {
unaryInterceptors []grpc.UnaryServerInterceptor
streamInterceptors []grpc.StreamServerInterceptor
nodeV1alpha2, nodeV1alpha3 bool
nodeV1alpha3 bool
}
// draPlugin combines the kubelet registration service and the DRA node plugin
@ -200,7 +190,6 @@ func Start(nodeServer interface{}, opts ...Option) (result DRAPlugin, finalErr e
o := options{
logger: klog.Background(),
grpcVerbosity: 4,
nodeV1alpha2: true,
nodeV1alpha3: true,
}
for _, option := range opts {
@ -231,11 +220,6 @@ func Start(nodeServer interface{}, opts ...Option) (result DRAPlugin, finalErr e
drapbv1alpha3.RegisterNodeServer(grpcServer, nodeServer)
implemented = true
}
if nodeServer, ok := nodeServer.(drapbv1alpha2.NodeServer); ok && o.nodeV1alpha2 {
o.logger.V(5).Info("registering drapbv1alpha2.NodeServer")
drapbv1alpha2.RegisterNodeServer(grpcServer, nodeServer)
implemented = true
}
})
if err != nil {
return nil, fmt.Errorf("start node client: %v", err)

File diff suppressed because it is too large Load Diff

View File

@ -1,92 +0,0 @@
/*
Copyright 2023 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
// To regenerate api.pb.go run `hack/update-codegen.sh protobindings`
syntax = "proto3";
package v1alpha2;
option go_package = "k8s.io/kubelet/pkg/apis/dra/v1alpha2";
import "github.com/gogo/protobuf/gogoproto/gogo.proto";
import "k8s.io/api/resource/v1alpha2/generated.proto";
option (gogoproto.goproto_stringer_all) = false;
option (gogoproto.stringer_all) = true;
option (gogoproto.goproto_getters_all) = true;
option (gogoproto.marshaler_all) = true;
option (gogoproto.sizer_all) = true;
option (gogoproto.unmarshaler_all) = true;
option (gogoproto.goproto_unrecognized_all) = false;
service Node {
rpc NodePrepareResource (NodePrepareResourceRequest)
returns (NodePrepareResourceResponse) {}
rpc NodeUnprepareResource (NodeUnprepareResourceRequest)
returns (NodeUnprepareResourceResponse) {}
}
message NodePrepareResourceRequest {
// The ResourceClaim namespace (ResourceClaim.meta.Namespace).
// This field is REQUIRED.
string namespace = 1;
// The UID of the Resource claim (ResourceClaim.meta.UUID).
// This field is REQUIRED.
string claim_uid = 2;
// The name of the Resource claim (ResourceClaim.meta.Name)
// This field is REQUIRED.
string claim_name = 3;
// Resource handle (AllocationResult.ResourceHandles[*].Data)
// This field is REQUIRED.
string resource_handle = 4;
// Structured parameter resource handle (AllocationResult.ResourceHandles[*].StructuredData).
// This field is OPTIONAL. If present, it needs to be used
// instead of resource_handle. It will only have a single entry.
//
// Using "repeated" instead of "optional" is a workaround for https://github.com/gogo/protobuf/issues/713.
repeated k8s.io.api.resource.v1alpha2.StructuredResourceHandle structured_resource_handle = 5;
}
message NodePrepareResourceResponse {
// These are the additional devices that kubelet must
// make available via the container runtime. A resource
// may have zero or more devices.
repeated string cdi_devices = 1;
}
message NodeUnprepareResourceRequest {
// The ResourceClaim namespace (ResourceClaim.meta.Namespace).
// This field is REQUIRED.
string namespace = 1;
// The UID of the Resource claim (ResourceClaim.meta.UUID).
// This field is REQUIRED.
string claim_uid = 2;
// The name of the Resource claim (ResourceClaim.meta.Name)
// This field is REQUIRED.
string claim_name = 3;
// Resource handle (AllocationResult.ResourceHandles[*].Data)
// This field is REQUIRED.
string resource_handle = 4;
// Structured parameter resource handle (AllocationResult.ResourceHandles[*].StructuredData).
// This field is OPTIONAL. If present, it needs to be used
// instead of resource_handle. It will only have a single entry.
repeated k8s.io.api.resource.v1alpha2.StructuredResourceHandle structured_resource_handle = 5;
}
message NodeUnprepareResourceResponse {
// Intentionally empty.
}

View File

@ -55,9 +55,7 @@ import (
)
const (
NodePrepareResourceMethod = "/v1alpha2.Node/NodePrepareResource"
NodePrepareResourcesMethod = "/v1alpha3.Node/NodePrepareResources"
NodeUnprepareResourceMethod = "/v1alpha2.Node/NodeUnprepareResource"
NodeUnprepareResourcesMethod = "/v1alpha3.Node/NodeUnprepareResources"
NodeListAndWatchResourcesMethod = "/v1alpha3.Node/NodeListAndWatchResources"
)
@ -146,7 +144,6 @@ func NewDriver(f *framework.Framework, nodes *Nodes, configureResources func() a
f: f,
fail: map[MethodInstance]bool{},
callCounts: map[MethodInstance]int64{},
NodeV1alpha2: true,
NodeV1alpha3: true,
}
@ -186,7 +183,7 @@ type Driver struct {
claimParameterAPIKind string
classParameterAPIKind string
NodeV1alpha2, NodeV1alpha3 bool
NodeV1alpha3 bool
mutex sync.Mutex
fail map[MethodInstance]bool
@ -339,7 +336,6 @@ func (d *Driver) SetUp(nodes *Nodes, resources app.Resources) {
kubeletplugin.PluginListener(listen(ctx, d.f, pod.Name, "plugin", 9001)),
kubeletplugin.RegistrarListener(listen(ctx, d.f, pod.Name, "registrar", 9000)),
kubeletplugin.KubeletPluginSocketPath(draAddr),
kubeletplugin.NodeV1alpha2(d.NodeV1alpha2),
kubeletplugin.NodeV1alpha3(d.NodeV1alpha3),
)
framework.ExpectNoError(err, "start kubelet plugin for node %s", pod.Spec.NodeName)

View File

@ -1114,16 +1114,14 @@ var _ = framework.SIGDescribe("node")("DRA", feature.DynamicResourceAllocation,
})
})
multipleDrivers := func(nodeV1alpha2, nodeV1alpha3 bool) {
multipleDrivers := func(nodeV1alpha3 bool) {
nodes := NewNodes(f, 1, 4)
driver1 := NewDriver(f, nodes, perNode(2, nodes))
driver1.NodeV1alpha2 = nodeV1alpha2
driver1.NodeV1alpha3 = nodeV1alpha3
b1 := newBuilder(f, driver1)
driver2 := NewDriver(f, nodes, perNode(2, nodes))
driver2.NameSuffix = "-other"
driver2.NodeV1alpha2 = nodeV1alpha2
driver2.NodeV1alpha3 = nodeV1alpha3
b2 := newBuilder(f, driver2)
@ -1150,16 +1148,14 @@ var _ = framework.SIGDescribe("node")("DRA", feature.DynamicResourceAllocation,
b1.testPod(ctx, f.ClientSet, pod)
})
}
multipleDriversContext := func(prefix string, nodeV1alpha2, nodeV1alpha3 bool) {
multipleDriversContext := func(prefix string, nodeV1alpha3 bool) {
ginkgo.Context(prefix, func() {
multipleDrivers(nodeV1alpha2, nodeV1alpha3)
multipleDrivers(nodeV1alpha3)
})
}
ginkgo.Context("multiple drivers", func() {
multipleDriversContext("using only drapbv1alpha2", true, false)
multipleDriversContext("using only drapbv1alpha3", false, true)
multipleDriversContext("using both drapbv1alpha2 and drapbv1alpha3", true, true)
multipleDriversContext("using only drapbv1alpha3", true)
})
})

View File

@ -35,7 +35,6 @@ import (
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/dynamic-resource-allocation/kubeletplugin"
"k8s.io/klog/v2"
drapbv1alpha2 "k8s.io/kubelet/pkg/apis/dra/v1alpha2"
drapbv1alpha3 "k8s.io/kubelet/pkg/apis/dra/v1alpha3"
)
@ -80,7 +79,6 @@ type ClaimID struct {
UID string
}
var _ drapbv1alpha2.NodeServer = &ExamplePlugin{}
var _ drapbv1alpha3.NodeServer = &ExamplePlugin{}
// getJSONFilePath returns the absolute path where CDI file is/should be.
@ -174,7 +172,7 @@ func (ex *ExamplePlugin) Block() {
// a deterministic name to simplify NodeUnprepareResource (no need to remember
// or discover the name) and idempotency (when called again, the file simply
// gets written again).
func (ex *ExamplePlugin) NodePrepareResource(ctx context.Context, req *drapbv1alpha2.NodePrepareResourceRequest) (*drapbv1alpha2.NodePrepareResourceResponse, error) {
func (ex *ExamplePlugin) nodePrepareResource(ctx context.Context, claimName string, claimUID string, resourceHandle string, structuredResourceHandle []*resourceapi.StructuredResourceHandle) ([]string, error) {
logger := klog.FromContext(ctx)
// Block to emulate plugin stuckness or slowness.
@ -187,32 +185,30 @@ func (ex *ExamplePlugin) NodePrepareResource(ctx context.Context, req *drapbv1al
ex.mutex.Lock()
defer ex.mutex.Unlock()
deviceName := "claim-" + req.ClaimUid
deviceName := "claim-" + claimUID
vendor := ex.driverName
class := "test"
dev := vendor + "/" + class + "=" + deviceName
resp := &drapbv1alpha2.NodePrepareResourceResponse{CdiDevices: []string{dev}}
claimID := ClaimID{Name: req.ClaimName, UID: req.ClaimUid}
claimID := ClaimID{Name: claimName, UID: claimUID}
if _, ok := ex.prepared[claimID]; ok {
// Idempotent call, nothing to do.
return resp, nil
return []string{dev}, nil
}
// Determine environment variables.
var p parameters
var resourceHandle any
var actualResourceHandle any
var instanceNames []string
switch len(req.StructuredResourceHandle) {
switch len(structuredResourceHandle) {
case 0:
// Control plane controller did the allocation.
if err := json.Unmarshal([]byte(req.ResourceHandle), &p); err != nil {
if err := json.Unmarshal([]byte(resourceHandle), &p); err != nil {
return nil, fmt.Errorf("unmarshal resource handle: %w", err)
}
resourceHandle = req.ResourceHandle
actualResourceHandle = resourceHandle
case 1:
// Scheduler did the allocation with structured parameters.
handle := req.StructuredResourceHandle[0]
handle := structuredResourceHandle[0]
if handle == nil {
return nil, errors.New("unexpected nil StructuredResourceHandle")
}
@ -243,10 +239,10 @@ func (ex *ExamplePlugin) NodePrepareResource(ctx context.Context, req *drapbv1al
}
instanceNames = append(instanceNames, instanceName)
}
resourceHandle = handle
actualResourceHandle = handle
default:
// Huh?
return nil, fmt.Errorf("invalid length of NodePrepareResourceRequest.StructuredResourceHandle: %d", len(req.StructuredResourceHandle))
return nil, fmt.Errorf("invalid length of NodePrepareResourceRequest.StructuredResourceHandle: %d", len(structuredResourceHandle))
}
// Sanity check scheduling.
@ -274,7 +270,7 @@ func (ex *ExamplePlugin) NodePrepareResource(ctx context.Context, req *drapbv1al
},
},
}
filePath := ex.getJSONFilePath(req.ClaimUid)
filePath := ex.getJSONFilePath(claimUID)
buffer, err := json.Marshal(spec)
if err != nil {
return nil, fmt.Errorf("marshal spec: %w", err)
@ -283,13 +279,13 @@ func (ex *ExamplePlugin) NodePrepareResource(ctx context.Context, req *drapbv1al
return nil, fmt.Errorf("failed to write CDI file %v", err)
}
ex.prepared[claimID] = resourceHandle
ex.prepared[claimID] = actualResourceHandle
for _, instanceName := range instanceNames {
ex.instancesInUse.Insert(instanceName)
}
logger.V(3).Info("CDI file created", "path", filePath, "device", dev)
return resp, nil
return []string{dev}, nil
}
func extractParameters(parameters runtime.RawExtension, env *map[string]string, kind string) error {
@ -314,20 +310,14 @@ func (ex *ExamplePlugin) NodePrepareResources(ctx context.Context, req *drapbv1a
Claims: make(map[string]*drapbv1alpha3.NodePrepareResourceResponse),
}
for _, claimReq := range req.Claims {
claimResp, err := ex.NodePrepareResource(ctx, &drapbv1alpha2.NodePrepareResourceRequest{
Namespace: claimReq.Namespace,
ClaimName: claimReq.Name,
ClaimUid: claimReq.Uid,
ResourceHandle: claimReq.ResourceHandle,
StructuredResourceHandle: claimReq.StructuredResourceHandle,
})
cdiDevices, err := ex.nodePrepareResource(ctx, claimReq.Name, claimReq.Uid, claimReq.ResourceHandle, claimReq.StructuredResourceHandle)
if err != nil {
resp.Claims[claimReq.Uid] = &drapbv1alpha3.NodePrepareResourceResponse{
Error: err.Error(),
}
} else {
resp.Claims[claimReq.Uid] = &drapbv1alpha3.NodePrepareResourceResponse{
CDIDevices: claimResp.CdiDevices,
CDIDevices: cdiDevices,
}
}
}
@ -337,45 +327,44 @@ func (ex *ExamplePlugin) NodePrepareResources(ctx context.Context, req *drapbv1a
// NodeUnprepareResource removes the CDI file created by
// NodePrepareResource. It's idempotent, therefore it is not an error when that
// file is already gone.
func (ex *ExamplePlugin) NodeUnprepareResource(ctx context.Context, req *drapbv1alpha2.NodeUnprepareResourceRequest) (*drapbv1alpha2.NodeUnprepareResourceResponse, error) {
func (ex *ExamplePlugin) nodeUnprepareResource(ctx context.Context, claimName string, claimUID string, resourceHandle string, structuredResourceHandle []*resourceapi.StructuredResourceHandle) error {
logger := klog.FromContext(ctx)
// Block to emulate plugin stuckness or slowness.
// By default the call will not be blocked as ex.block = false.
if ex.block {
<-ctx.Done()
return nil, ctx.Err()
return ctx.Err()
}
filePath := ex.getJSONFilePath(req.ClaimUid)
filePath := ex.getJSONFilePath(claimUID)
if err := ex.fileOps.Remove(filePath); err != nil {
return nil, fmt.Errorf("error removing CDI file: %w", err)
return fmt.Errorf("error removing CDI file: %w", err)
}
logger.V(3).Info("CDI file removed", "path", filePath)
ex.mutex.Lock()
defer ex.mutex.Unlock()
claimID := ClaimID{Name: req.ClaimName, UID: req.ClaimUid}
resp := &drapbv1alpha2.NodeUnprepareResourceResponse{}
claimID := ClaimID{Name: claimName, UID: claimUID}
expectedResourceHandle, ok := ex.prepared[claimID]
if !ok {
// Idempotent call, nothing to do.
return resp, nil
return nil
}
var actualResourceHandle any = req.ResourceHandle
if req.StructuredResourceHandle != nil {
if len(req.StructuredResourceHandle) != 1 {
return nil, fmt.Errorf("unexpected number of entries in StructuredResourceHandle: %d", len(req.StructuredResourceHandle))
var actualResourceHandle any = resourceHandle
if structuredResourceHandle != nil {
if len(structuredResourceHandle) != 1 {
return fmt.Errorf("unexpected number of entries in StructuredResourceHandle: %d", len(structuredResourceHandle))
}
actualResourceHandle = req.StructuredResourceHandle[0]
actualResourceHandle = structuredResourceHandle[0]
}
if diff := cmp.Diff(expectedResourceHandle, actualResourceHandle); diff != "" {
return nil, fmt.Errorf("difference between expected (-) and actual resource handle (+):\n%s", diff)
return fmt.Errorf("difference between expected (-) and actual resource handle (+):\n%s", diff)
}
delete(ex.prepared, claimID)
if structuredResourceHandle := req.StructuredResourceHandle; structuredResourceHandle != nil {
if structuredResourceHandle := structuredResourceHandle; structuredResourceHandle != nil {
for _, handle := range structuredResourceHandle {
for _, result := range handle.Results {
instanceName := result.NamedResources.Name
@ -383,8 +372,9 @@ func (ex *ExamplePlugin) NodeUnprepareResource(ctx context.Context, req *drapbv1
}
}
}
delete(ex.prepared, ClaimID{Name: claimName, UID: claimUID})
return &drapbv1alpha2.NodeUnprepareResourceResponse{}, nil
return nil
}
func (ex *ExamplePlugin) NodeUnprepareResources(ctx context.Context, req *drapbv1alpha3.NodeUnprepareResourcesRequest) (*drapbv1alpha3.NodeUnprepareResourcesResponse, error) {
@ -392,13 +382,7 @@ func (ex *ExamplePlugin) NodeUnprepareResources(ctx context.Context, req *drapbv
Claims: make(map[string]*drapbv1alpha3.NodeUnprepareResourceResponse),
}
for _, claimReq := range req.Claims {
_, err := ex.NodeUnprepareResource(ctx, &drapbv1alpha2.NodeUnprepareResourceRequest{
Namespace: claimReq.Namespace,
ClaimName: claimReq.Name,
ClaimUid: claimReq.Uid,
ResourceHandle: claimReq.ResourceHandle,
StructuredResourceHandle: claimReq.StructuredResourceHandle,
})
err := ex.nodeUnprepareResource(ctx, claimReq.Name, claimReq.Uid, claimReq.ResourceHandle, claimReq.StructuredResourceHandle)
if err != nil {
resp.Claims[claimReq.Uid] = &drapbv1alpha3.NodeUnprepareResourceResponse{
Error: err.Error(),