Merge pull request #119012 from pohly/dra-batch-node-prepare

kubelet: support batched prepare/unprepare in v1alpha3 DRA plugin API
Kubernetes Prow Robot 2023-07-12 10:57:37 -07:00 committed by GitHub
commit 047d040ce7
11 changed files with 2604 additions and 171 deletions
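In short: where the kubelet previously issued one NodePrepareResource RPC per resource handle, it now groups a pod's claims by DRA driver and issues a single NodePrepareResources call per plugin, then checks the per-claim results in the response map (the same pattern applies to unprepare). A minimal sketch of that flow using the v1alpha3 types introduced in this PR; the function name prepareBatched is hypothetical and error handling is trimmed:

func prepareBatched(ctx context.Context, batches map[string][]*drapb.Claim) error {
	for pluginName, claims := range batches {
		client, err := dra.NewDRAPluginClient(pluginName)
		if err != nil {
			return err
		}
		// One RPC per plugin instead of one RPC per claim.
		resp, err := client.NodePrepareResources(ctx, &drapb.NodePrepareResourcesRequest{Claims: claims})
		if err != nil {
			return err // general failure, unrelated to any particular claim
		}
		for uid, result := range resp.Claims {
			if result.Error != "" {
				return fmt.Errorf("claim %s: %s", uid, result.Error)
			}
			// result.CDIDevices lists the CDI devices that the kubelet passes
			// on to the container runtime for this claim.
		}
		// Claims missing from resp.Claims are treated as unfinished and retried.
	}
	return nil
}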


@ -28,6 +28,7 @@ import (
clientset "k8s.io/client-go/kubernetes"
"k8s.io/dynamic-resource-allocation/resourceclaim"
"k8s.io/klog/v2"
drapb "k8s.io/kubelet/pkg/apis/dra/v1alpha3"
dra "k8s.io/kubernetes/pkg/kubelet/cm/dra/plugin"
kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
)
@ -62,10 +63,12 @@ func NewManagerImpl(kubeClient clientset.Interface, stateFileDirectory string) (
}
// PrepareResources attempts to prepare all of the required resource
// plugin resources for the input container, issue an NodePrepareResource rpc request
// plugin resources for the input container, issue NodePrepareResources rpc requests
// for each new resource requirement, process their responses and update the cached
// containerResources on success.
func (m *ManagerImpl) PrepareResources(pod *v1.Pod) error {
batches := make(map[string][]*drapb.Claim)
claimInfos := make(map[types.UID]*ClaimInfo)
for i := range pod.Spec.ResourceClaims {
podClaim := &pod.Spec.ResourceClaims[i]
klog.V(3).InfoS("Processing resource", "podClaim", podClaim.Name, "pod", pod.Name)
@ -139,7 +142,7 @@ func (m *ManagerImpl) PrepareResources(pod *v1.Pod) error {
sets.New(string(pod.UID)),
)
// Walk through each resourceHandle
// Loop through all plugins and prepare for calling NodePrepareResources.
for _, resourceHandle := range resourceHandles {
// If no DriverName is provided in the resourceHandle, we
// use the DriverName from the status
@ -147,48 +150,71 @@ func (m *ManagerImpl) PrepareResources(pod *v1.Pod) error {
if pluginName == "" {
pluginName = resourceClaim.Status.DriverName
}
// Call NodePrepareResource RPC for each resourceHandle
client, err := dra.NewDRAPluginClient(pluginName)
if err != nil {
return fmt.Errorf("failed to get DRA Plugin client for plugin name %s, err=%+v", pluginName, err)
claim := &drapb.Claim{
Namespace: resourceClaim.Namespace,
Uid: string(resourceClaim.UID),
Name: resourceClaim.Name,
ResourceHandle: resourceHandle.Data,
}
response, err := client.NodePrepareResource(
context.Background(),
resourceClaim.Namespace,
resourceClaim.UID,
resourceClaim.Name,
resourceHandle.Data)
if err != nil {
return fmt.Errorf("NodePrepareResource failed, claim UID: %s, claim name: %s, resource handle: %s, err: %+v",
resourceClaim.UID, resourceClaim.Name, resourceHandle.Data, err)
}
klog.V(3).InfoS("NodePrepareResource succeeded", "pluginName", pluginName, "response", response)
batches[pluginName] = append(batches[pluginName], claim)
}
claimInfos[resourceClaim.UID] = claimInfo
}
// Add the CDI Devices returned by NodePrepareResource to
// Call NodePrepareResources for all claims in each batch.
// If there is any error, processing gets aborted.
// We could try to continue, but that would make the code more complex.
for pluginName, claims := range batches {
// Call NodePrepareResources RPC for all resource handles.
client, err := dra.NewDRAPluginClient(pluginName)
if err != nil {
return fmt.Errorf("failed to get DRA Plugin client for plugin name %s: %v", pluginName, err)
}
response, err := client.NodePrepareResources(context.Background(), &drapb.NodePrepareResourcesRequest{Claims: claims})
if err != nil {
// General error unrelated to any particular claim.
return fmt.Errorf("NodePrepareResources failed: %v", err)
}
for claimUID, result := range response.Claims {
reqClaim := lookupClaimRequest(claims, claimUID)
if reqClaim == nil {
return fmt.Errorf("NodePrepareResources returned result for unknown claim UID %s", claimUID)
}
if result.Error != "" {
return fmt.Errorf("NodePrepareResources failed for claim %s/%s: %s", reqClaim.Namespace, reqClaim.Name, result.Error)
}
claimInfo := claimInfos[types.UID(claimUID)]
// Add the CDI Devices returned by NodePrepareResources to
// the claimInfo object.
err = claimInfo.addCDIDevices(pluginName, response.CdiDevices)
err = claimInfo.addCDIDevices(pluginName, result.CDIDevices)
if err != nil {
return fmt.Errorf("failed to add CDIDevices to claimInfo %+v: %+v", claimInfo, err)
}
// TODO: We (re)add the claimInfo object to the cache and
// sync it to the checkpoint *after* the
// NodePrepareResource call has completed. This will cause
// NodePrepareResources call has completed. This will cause
// issues if the kubelet gets restarted between
// NodePrepareResource and syncToCheckpoint. It will result
// in not calling NodeUnprepareResource for this claim
// NodePrepareResources and syncToCheckpoint. It will result
// in not calling NodeUnprepareResources for this claim
// because no claimInfo will be synced back to the cache
// for it after the restart. We need to resolve this issue
// before moving to beta.
m.cache.add(claimInfo)
}
// Checkpoint to reduce redundant calls to
// NodePrepareResource() after a kubelet restart.
err = m.cache.syncToCheckpoint()
if err != nil {
return fmt.Errorf("failed to checkpoint claimInfo state, err: %+v", err)
}
// Checkpoint to reduce redundant calls to
// NodePrepareResources after a kubelet restart.
err = m.cache.syncToCheckpoint()
if err != nil {
return fmt.Errorf("failed to checkpoint claimInfo state, err: %+v", err)
}
unfinished := len(claims) - len(response.Claims)
if unfinished != 0 {
return fmt.Errorf("NodePrepareResources left out %d claims", unfinished)
}
}
// Checkpoint to capture all of the previous addPodReference() calls.
@ -199,6 +225,15 @@ func (m *ManagerImpl) PrepareResources(pod *v1.Pod) error {
return nil
}
func lookupClaimRequest(claims []*drapb.Claim, claimUID string) *drapb.Claim {
for _, claim := range claims {
if claim.Uid == claimUID {
return claim
}
}
return nil
}
func claimIsUsedByPod(podClaim *v1.PodResourceClaim, pod *v1.Pod) bool {
if claimIsUsedByContainers(podClaim, pod.Spec.InitContainers) {
return true
@ -274,7 +309,8 @@ func (m *ManagerImpl) GetResources(pod *v1.Pod, container *v1.Container) (*Conta
// As such, calls to the underlying NodeUnprepareResource API are skipped for claims that have
// already been successfully unprepared.
func (m *ManagerImpl) UnprepareResources(pod *v1.Pod) error {
// Call NodeUnprepareResource RPC for every resource claim referenced by the pod
batches := make(map[string][]*drapb.Claim)
claimInfos := make(map[types.UID]*ClaimInfo)
for i := range pod.Spec.ResourceClaims {
claimName, _, err := resourceclaim.Name(pod, &pod.Spec.ResourceClaims[i])
if err != nil {
@ -324,8 +360,7 @@ func (m *ManagerImpl) UnprepareResources(pod *v1.Pod) error {
resourceHandles = make([]resourcev1alpha2.ResourceHandle, 1)
}
// Loop through all plugins and call NodeUnprepareResource only for the
// last pod that references the claim
// Loop through all plugins and prepare for calling NodeUnprepareResources.
for _, resourceHandle := range resourceHandles {
// If no DriverName is provided in the resourceHandle, we
// use the DriverName from the status
@ -334,38 +369,62 @@ func (m *ManagerImpl) UnprepareResources(pod *v1.Pod) error {
pluginName = claimInfo.DriverName
}
// Call NodeUnprepareResource RPC for each resourceHandle
client, err := dra.NewDRAPluginClient(pluginName)
if err != nil {
return fmt.Errorf("failed to get DRA Plugin client for plugin name %s, err=%+v", pluginName, err)
claim := &drapb.Claim{
Namespace: resourceClaim.Namespace,
Uid: string(resourceClaim.UID),
Name: resourceClaim.Name,
ResourceHandle: resourceHandle.Data,
}
response, err := client.NodeUnprepareResource(
context.Background(),
claimInfo.Namespace,
claimInfo.ClaimUID,
claimInfo.ClaimName,
resourceHandle.Data)
if err != nil {
return fmt.Errorf(
"NodeUnprepareResource failed, pod: %s, claim UID: %s, claim name: %s, resource handle: %s, err: %+v",
pod.Name, claimInfo.ClaimUID, claimInfo.ClaimName, resourceHandle.Data, err)
}
klog.V(3).InfoS("NodeUnprepareResource succeeded", "response", response)
batches[pluginName] = append(batches[pluginName], claim)
}
claimInfos[resourceClaim.UID] = claimInfo
}
// Call NodeUnprepareResources for all claims in each batch.
// If there is any error, processing gets aborted.
// We could try to continue, but that would make the code more complex.
for pluginName, claims := range batches {
// Call NodeUnprepareResources RPC for all resource handles.
client, err := dra.NewDRAPluginClient(pluginName)
if err != nil {
return fmt.Errorf("failed to get DRA Plugin client for plugin name %s: %v", pluginName, err)
}
response, err := client.NodeUnprepareResources(context.Background(), &drapb.NodeUnprepareResourcesRequest{Claims: claims})
if err != nil {
// General error unrelated to any particular claim.
return fmt.Errorf("NodeUnprepareResources failed: %v", err)
}
// Delete last pod UID only if all NodeUnprepareResource calls succeed.
// This ensures that the status manager doesn't enter termination status
// for the pod. This logic is implemented in
// m.PodMightNeedToUnprepareResources and claimInfo.hasPodReference.
claimInfo.deletePodReference(pod.UID)
m.cache.delete(claimInfo.ClaimName, pod.Namespace)
for claimUID, result := range response.Claims {
reqClaim := lookupClaimRequest(claims, claimUID)
if reqClaim == nil {
return fmt.Errorf("NodeUnprepareResources returned result for unknown claim UID %s", claimUID)
}
if result.Error != "" {
return fmt.Errorf("NodeUnprepareResources failed for claim %s/%s: %s", reqClaim.Namespace, reqClaim.Name, err)
}
// Checkpoint to reduce redundant calls to NodeUnPrepareResource() after a kubelet restart.
// Delete last pod UID only if unprepare succeeds.
// This ensures that the status manager doesn't enter termination status
// for the pod. This logic is implemented in
// m.PodMightNeedToUnprepareResources and claimInfo.hasPodReference.
claimInfo := claimInfos[types.UID(claimUID)]
claimInfo.deletePodReference(pod.UID)
m.cache.delete(claimInfo.ClaimName, pod.Namespace)
}
// Checkpoint to reduce redundant calls to NodeUnprepareResources after a kubelet restart.
err = m.cache.syncToCheckpoint()
if err != nil {
return fmt.Errorf("failed to checkpoint claimInfo state, err: %+v", err)
}
unfinished := len(claims) - len(response.Claims)
if unfinished != 0 {
return fmt.Errorf("NodeUnprepareResources left out %d claims", unfinished)
}
}
// Checkpoint to capture all of the previous deletePodReference() calls.
err := m.cache.syncToCheckpoint()
if err != nil {


@ -25,68 +25,53 @@ import (
"time"
"google.golang.org/grpc"
grpccodes "google.golang.org/grpc/codes"
"google.golang.org/grpc/credentials/insecure"
"k8s.io/apimachinery/pkg/types"
grpcstatus "google.golang.org/grpc/status"
"k8s.io/klog/v2"
drapbv1 "k8s.io/kubelet/pkg/apis/dra/v1alpha2"
drapbv1alpha2 "k8s.io/kubelet/pkg/apis/dra/v1alpha2"
drapb "k8s.io/kubelet/pkg/apis/dra/v1alpha3"
)
const PluginClientTimeout = 10 * time.Second
type Client interface {
NodePrepareResource(
ctx context.Context,
namespace string,
claimUID types.UID,
claimName string,
resourceHandle string,
) (*drapbv1.NodePrepareResourceResponse, error)
NodeUnprepareResource(
ctx context.Context,
namespace string,
claimUID types.UID,
claimName string,
resourceHandle string,
) (*drapbv1.NodeUnprepareResourceResponse, error)
}
// Strongly typed address.
type draAddr string
// draPluginClient encapsulates all dra plugin methods.
type draPluginClient struct {
pluginName string
addr draAddr
nodeV1ClientCreator nodeV1ClientCreator
pluginName string
addr draAddr
nodeClientCreator nodeClientCreator
}
var _ Client = &draPluginClient{}
var _ drapb.NodeClient = &draPluginClient{}
type nodeV1ClientCreator func(addr draAddr) (
nodeClient drapbv1.NodeClient,
type nodeClientCreator func(addr draAddr) (
nodeClient drapb.NodeClient,
nodeClientOld drapbv1alpha2.NodeClient,
closer io.Closer,
err error,
)
// newV1NodeClient creates a new NodeClient with the internally used gRPC
// newNodeClient creates a new NodeClient with the internally used gRPC
// connection set up. It also returns a closer which must be called to close
// the gRPC connection when the NodeClient is not used anymore.
// This is the default implementation for the nodeV1ClientCreator, used in
// This is the default implementation for the nodeClientCreator, used in
// newDRAPluginClient.
func newV1NodeClient(addr draAddr) (nodeClient drapbv1.NodeClient, closer io.Closer, err error) {
func newNodeClient(addr draAddr) (nodeClient drapb.NodeClient, nodeClientOld drapbv1alpha2.NodeClient, closer io.Closer, err error) {
var conn *grpc.ClientConn
conn, err = newGrpcConn(addr)
if err != nil {
return nil, nil, err
return nil, nil, nil, err
}
return drapbv1.NewNodeClient(conn), conn, nil
return drapb.NewNodeClient(conn), drapbv1alpha2.NewNodeClient(conn), conn, nil
}
func NewDRAPluginClient(pluginName string) (Client, error) {
func NewDRAPluginClient(pluginName string) (drapb.NodeClient, error) {
if pluginName == "" {
return nil, fmt.Errorf("plugin name is empty")
}
@ -97,84 +82,114 @@ func NewDRAPluginClient(pluginName string) (Client, error) {
}
return &draPluginClient{
pluginName: pluginName,
addr: draAddr(existingPlugin.endpoint),
nodeV1ClientCreator: newV1NodeClient,
pluginName: pluginName,
addr: draAddr(existingPlugin.endpoint),
nodeClientCreator: newNodeClient,
}, nil
}
func (r *draPluginClient) NodePrepareResource(
func (r *draPluginClient) NodePrepareResources(
ctx context.Context,
namespace string,
claimUID types.UID,
claimName string,
resourceHandle string,
) (*drapbv1.NodePrepareResourceResponse, error) {
klog.V(4).InfoS(
log("calling NodePrepareResource rpc"),
"namespace", namespace,
"claimUID", claimUID,
"claimName", claimName,
"resourceHandle", resourceHandle)
req *drapb.NodePrepareResourcesRequest,
opts ...grpc.CallOption,
) (resp *drapb.NodePrepareResourcesResponse, err error) {
logger := klog.FromContext(ctx)
logger.V(4).Info(log("calling NodePrepareResources rpc"), "request", req)
defer logger.V(4).Info(log("done calling NodePrepareResources rpc"), "response", resp, "err", err)
if r.nodeV1ClientCreator == nil {
return nil, errors.New("failed to call NodePrepareResource. nodeV1ClientCreator is nil")
if r.nodeClientCreator == nil {
return nil, errors.New("failed to call NodePrepareResources. nodeClientCreator is nil")
}
nodeClient, closer, err := r.nodeV1ClientCreator(r.addr)
nodeClient, nodeClientOld, closer, err := r.nodeClientCreator(r.addr)
if err != nil {
return nil, err
}
defer closer.Close()
req := &drapbv1.NodePrepareResourceRequest{
Namespace: namespace,
ClaimUid: string(claimUID),
ClaimName: claimName,
ResourceHandle: resourceHandle,
}
ctx, cancel := context.WithTimeout(ctx, PluginClientTimeout)
defer cancel()
return nodeClient.NodePrepareResource(ctx, req)
resp, err = nodeClient.NodePrepareResources(ctx, req)
if err != nil {
status, _ := grpcstatus.FromError(err)
if status.Code() == grpccodes.Unimplemented {
// Fall back to the older gRPC API.
resp = &drapb.NodePrepareResourcesResponse{
Claims: make(map[string]*drapb.NodePrepareResourceResponse),
}
err = nil
for _, claim := range req.Claims {
respOld, errOld := nodeClientOld.NodePrepareResource(ctx,
&drapbv1alpha2.NodePrepareResourceRequest{
Namespace: claim.Namespace,
ClaimUid: claim.Uid,
ClaimName: claim.Name,
ResourceHandle: claim.ResourceHandle,
})
result := &drapb.NodePrepareResourceResponse{}
if errOld != nil {
result.Error = errOld.Error()
} else {
result.CDIDevices = respOld.CdiDevices
}
resp.Claims[claim.Uid] = result
}
}
}
return
}
func (r *draPluginClient) NodeUnprepareResource(
func (r *draPluginClient) NodeUnprepareResources(
ctx context.Context,
namespace string,
claimUID types.UID,
claimName string,
resourceHandle string,
) (*drapbv1.NodeUnprepareResourceResponse, error) {
klog.V(4).InfoS(
log("calling NodeUnprepareResource rpc"),
"namespace", namespace,
"claimUID", claimUID,
"claimname", claimName,
"resourceHandle", resourceHandle)
req *drapb.NodeUnprepareResourcesRequest,
opts ...grpc.CallOption,
) (resp *drapb.NodeUnprepareResourcesResponse, err error) {
logger := klog.FromContext(ctx)
logger.V(4).Info(log("calling NodeUnprepareResource rpc"), "request", req)
defer logger.V(4).Info(log("done calling NodeUnprepareResources rpc"), "response", resp, "err", err)
if r.nodeV1ClientCreator == nil {
return nil, errors.New("nodeV1ClientCreate is nil")
if r.nodeClientCreator == nil {
return nil, errors.New("failed to call NodeUnprepareResources. nodeClientCreator is nil")
}
nodeClient, closer, err := r.nodeV1ClientCreator(r.addr)
nodeClient, nodeClientOld, closer, err := r.nodeClientCreator(r.addr)
if err != nil {
return nil, err
}
defer closer.Close()
req := &drapbv1.NodeUnprepareResourceRequest{
Namespace: namespace,
ClaimUid: string(claimUID),
ClaimName: claimName,
ResourceHandle: resourceHandle,
}
ctx, cancel := context.WithTimeout(ctx, PluginClientTimeout)
defer cancel()
return nodeClient.NodeUnprepareResource(ctx, req)
resp, err = nodeClient.NodeUnprepareResources(ctx, req)
if err != nil {
status, _ := grpcstatus.FromError(err)
if status.Code() == grpccodes.Unimplemented {
// Fall back to the older gRPC API.
resp = &drapb.NodeUnprepareResourcesResponse{
Claims: make(map[string]*drapb.NodeUnprepareResourceResponse),
}
err = nil
for _, claim := range req.Claims {
_, errOld := nodeClientOld.NodeUnprepareResource(ctx,
&drapbv1alpha2.NodeUnprepareResourceRequest{
Namespace: claim.Namespace,
ClaimUid: claim.Uid,
ClaimName: claim.Name,
ResourceHandle: claim.ResourceHandle,
})
result := &drapb.NodeUnprepareResourceResponse{}
if errOld != nil {
result.Error = errOld.Error()
}
resp.Claims[claim.Uid] = result
}
}
}
return
}
func newGrpcConn(addr draAddr) (*grpc.ClientConn, error) {


@ -24,7 +24,8 @@ import (
"google.golang.org/grpc"
"k8s.io/klog/v2"
drapbv1 "k8s.io/kubelet/pkg/apis/dra/v1alpha2"
drapbv1alpha2 "k8s.io/kubelet/pkg/apis/dra/v1alpha2"
drapbv1alpha3 "k8s.io/kubelet/pkg/apis/dra/v1alpha3"
registerapi "k8s.io/kubelet/pkg/apis/pluginregistration/v1"
)
@ -144,6 +145,24 @@ func GRPCInterceptor(interceptor grpc.UnaryServerInterceptor) Option {
}
}
// NodeV1alpha2 explicitly chooses whether the DRA gRPC API v1alpha2
// gets enabled.
func NodeV1alpha2(enabled bool) Option {
return func(o *options) error {
o.nodeV1alpha2 = enabled
return nil
}
}
// NodeV1alpha3 explicitly chooses whether the DRA gRPC API v1alpha3
// gets enabled.
func NodeV1alpha3(enabled bool) Option {
return func(o *options) error {
o.nodeV1alpha3 = enabled
return nil
}
}
type options struct {
logger klog.Logger
grpcVerbosity int
@ -152,6 +171,8 @@ type options struct {
draAddress string
pluginRegistrationEndpoint endpoint
interceptors []grpc.UnaryServerInterceptor
nodeV1alpha2, nodeV1alpha3 bool
}
// draPlugin combines the kubelet registration service and the DRA node plugin
@ -162,13 +183,15 @@ type draPlugin struct {
}
// Start sets up two gRPC servers (one for registration, one for the DRA node
// client).
func Start(nodeServer drapbv1.NodeServer, opts ...Option) (result DRAPlugin, finalErr error) {
// client). By default, all APIs implemented by the nodeServer get registered.
func Start(nodeServer interface{}, opts ...Option) (result DRAPlugin, finalErr error) {
d := &draPlugin{}
o := options{
logger: klog.Background(),
grpcVerbosity: 4,
nodeV1alpha2: true,
nodeV1alpha3: true,
}
for _, option := range opts {
if err := option(&o); err != nil {
@ -191,8 +214,18 @@ func Start(nodeServer drapbv1.NodeServer, opts ...Option) (result DRAPlugin, fin
}
// Run the node plugin gRPC server first to ensure that it is ready.
implemented := false
plugin, err := startGRPCServer(klog.LoggerWithName(o.logger, "dra"), o.grpcVerbosity, o.interceptors, o.draEndpoint, func(grpcServer *grpc.Server) {
drapbv1.RegisterNodeServer(grpcServer, nodeServer)
if nodeServer, ok := nodeServer.(drapbv1alpha3.NodeServer); ok && o.nodeV1alpha3 {
o.logger.V(5).Info("registering drapbv1alpha3.NodeServer")
drapbv1alpha3.RegisterNodeServer(grpcServer, nodeServer)
implemented = true
}
if nodeServer, ok := nodeServer.(drapbv1alpha2.NodeServer); ok && o.nodeV1alpha2 {
o.logger.V(5).Info("registering drapbv1alpha2.NodeServer")
drapbv1alpha2.RegisterNodeServer(grpcServer, nodeServer)
implemented = true
}
})
if err != nil {
return nil, fmt.Errorf("start node client: %v", err)
@ -208,6 +241,9 @@ func Start(nodeServer drapbv1.NodeServer, opts ...Option) (result DRAPlugin, fin
plugin.stop()
}
}()
if !implemented {
return nil, errors.New("no supported DRA gRPC API is implemented and enabled")
}
// Now make it available to kubelet.
registrar, err := startRegistrar(klog.LoggerWithName(o.logger, "registrar"), o.grpcVerbosity, o.interceptors, o.driverName, o.draAddress, o.pluginRegistrationEndpoint)
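For driver authors the startup call barely changes: kubeletplugin.Start now takes an arbitrary nodeServer and registers whichever of the two NodeServer interfaces it implements, and the new NodeV1alpha2/NodeV1alpha3 options can switch individual API versions off. A usage sketch, assuming examplePlugin implements drapbv1alpha3.NodeServer and draAddr is the plugin socket path; the socket option and version options are the ones used by the test driver elsewhere in this PR:

// Serve only the batched v1alpha3 API; do not register v1alpha2.
d, err := kubeletplugin.Start(&examplePlugin{},
	kubeletplugin.KubeletPluginSocketPath(draAddr),
	kubeletplugin.NodeV1alpha2(false),
	kubeletplugin.NodeV1alpha3(true),
)
if err != nil {
	return fmt.Errorf("start kubelet plugin: %v", err)
}
defer d.Stop()
// Start fails with "no supported DRA gRPC API is implemented and enabled"
// if the server implements neither enabled interface.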

File diff suppressed because it is too large.


@ -0,0 +1,103 @@
/*
Copyright 2023 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
// To regenerate api.pb.go run `hack/update-codegen.sh protobindings`
syntax = "proto3";
package v1alpha3;
option go_package = "k8s.io/kubelet/pkg/apis/dra/v1alpha3";
import "github.com/gogo/protobuf/gogoproto/gogo.proto";
option (gogoproto.goproto_stringer_all) = false;
option (gogoproto.stringer_all) = true;
option (gogoproto.goproto_getters_all) = true;
option (gogoproto.marshaler_all) = true;
option (gogoproto.sizer_all) = true;
option (gogoproto.unmarshaler_all) = true;
option (gogoproto.goproto_unrecognized_all) = false;
service Node {
// NodePrepareResources prepares several ResourceClaims
// for use on the node. If an error is returned, the
// response is ignored. Failures for individual claims
// can be reported inside NodePrepareResourcesResponse.
rpc NodePrepareResources (NodePrepareResourcesRequest)
returns (NodePrepareResourcesResponse) {}
// NodeUnprepareResources is the opposite of NodePrepareResources.
// The same error handling rules apply.
rpc NodeUnprepareResources (NodeUnprepareResourcesRequest)
returns (NodeUnprepareResourcesResponse) {}
}
message NodePrepareResourcesRequest {
// The list of ResourceClaims that are to be prepared.
repeated Claim claims = 1;
}
message NodePrepareResourcesResponse {
// The ResourceClaims for which preparation was done
// or attempted, with claim_uid as key.
//
// It is an error if some claim listed in NodePrepareResourcesRequest
// does not get prepared. NodePrepareResources
// will be called again for those that are missing.
map<string, NodePrepareResourceResponse> claims = 1;
}
message NodePrepareResourceResponse {
// These are the additional devices that kubelet must
// make available via the container runtime. A resource
// may have zero or more devices.
repeated string cdi_devices = 1 [(gogoproto.customname) = "CDIDevices"];
// If non-empty, preparing the ResourceClaim failed.
// cdi_devices is ignored in that case.
string error = 2;
}
message NodeUnprepareResourcesRequest {
// The list of ResourceClaims that are to be unprepared.
repeated Claim claims = 1;
}
message NodeUnprepareResourcesResponse {
// The ResourceClaims for which preparation was reverted.
// The same rules as for NodePrepareResourcesResponse.claims
// apply.
map<string, NodeUnprepareResourceResponse> claims = 1;
}
message NodeUnprepareResourceResponse {
// If non-empty, unpreparing the ResourceClaim failed.
string error = 1;
}
message Claim {
// The ResourceClaim namespace (ResourceClaim.meta.Namespace).
// This field is REQUIRED.
string namespace = 1;
// The UID of the Resource claim (ResourceClaim.meta.UID).
// This field is REQUIRED.
string uid = 2;
// The name of the Resource claim (ResourceClaim.meta.Name)
// This field is REQUIRED.
string name = 3;
// Resource handle (AllocationResult.ResourceHandles[*].Data)
// This field is REQUIRED.
string resource_handle = 4;
}
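The per-claim contract in practice: a driver fills the claims map keyed by claim UID, setting error for individual failures and cdi_devices on success; anything missing from the map gets retried by the kubelet. A minimal handler sketch, mirroring the test-only ExamplePlugin further down; the driver type and its prepareClaim helper are hypothetical:

func (d *driver) NodePrepareResources(ctx context.Context, req *drapbv1alpha3.NodePrepareResourcesRequest) (*drapbv1alpha3.NodePrepareResourcesResponse, error) {
	resp := &drapbv1alpha3.NodePrepareResourcesResponse{
		Claims: make(map[string]*drapbv1alpha3.NodePrepareResourceResponse),
	}
	for _, claim := range req.Claims {
		cdiDevices, err := d.prepareClaim(ctx, claim) // hypothetical per-claim helper
		if err != nil {
			// A per-claim failure goes into the result map; the RPC itself still succeeds.
			resp.Claims[claim.Uid] = &drapbv1alpha3.NodePrepareResourceResponse{Error: err.Error()}
			continue
		}
		resp.Claims[claim.Uid] = &drapbv1alpha3.NodePrepareResourceResponse{CDIDevices: cdiDevices}
	}
	return resp, nil
}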


@ -48,8 +48,10 @@ import (
)
const (
NodePrepareResourceMethod = "/v1alpha2.Node/NodePrepareResource"
NodeUnprepareResourceMethod = "/v1alpha2.Node/NodeUnprepareResource"
NodePrepareResourceMethod = "/v1alpha2.Node/NodePrepareResource"
NodePrepareResourcesMethod = "/v1alpha3.Node/NodePrepareResources"
NodeUnprepareResourceMethod = "/v1alpha2.Node/NodeUnprepareResource"
NodeUnprepareResourcesMethod = "/v1alpha3.Node/NodeUnprepareResources"
)
type Nodes struct {
@ -87,9 +89,11 @@ func NewNodes(f *framework.Framework, minNodes, maxNodes int) *Nodes {
// up after the test.
func NewDriver(f *framework.Framework, nodes *Nodes, configureResources func() app.Resources) *Driver {
d := &Driver{
f: f,
fail: map[MethodInstance]bool{},
callCounts: map[MethodInstance]int64{},
f: f,
fail: map[MethodInstance]bool{},
callCounts: map[MethodInstance]int64{},
NodeV1alpha2: true,
NodeV1alpha3: true,
}
ginkgo.BeforeEach(func() {
@ -121,6 +125,8 @@ type Driver struct {
Name string
Nodes map[string]*app.ExamplePlugin
NodeV1alpha2, NodeV1alpha3 bool
mutex sync.Mutex
fail map[MethodInstance]bool
callCounts map[MethodInstance]int64
@ -229,6 +235,8 @@ func (d *Driver) SetUp(nodes *Nodes, resources app.Resources) {
kubeletplugin.PluginListener(listen(ctx, d.f, pod.Name, "plugin", 9001)),
kubeletplugin.RegistrarListener(listen(ctx, d.f, pod.Name, "registrar", 9000)),
kubeletplugin.KubeletPluginSocketPath(draAddr),
kubeletplugin.NodeV1alpha2(d.NodeV1alpha2),
kubeletplugin.NodeV1alpha3(d.NodeV1alpha3),
)
framework.ExpectNoError(err, "start kubelet plugin for node %s", pod.Spec.NodeName)
d.cleanup = append(d.cleanup, func() {


@ -67,9 +67,9 @@ var _ = ginkgo.Describe("[sig-node] DRA [Feature:DynamicResourceAllocation]", fu
ginkgo.By("the driver is running")
})
ginkgo.It("must retry NodePrepareResource", func(ctx context.Context) {
ginkgo.It("must retry NodePrepareResources", func(ctx context.Context) {
// We have exactly one host.
m := MethodInstance{driver.Nodenames()[0], NodePrepareResourceMethod}
m := MethodInstance{driver.Nodenames()[0], NodePrepareResourcesMethod}
driver.Fail(m, true)
@ -79,10 +79,10 @@ var _ = ginkgo.Describe("[sig-node] DRA [Feature:DynamicResourceAllocation]", fu
b.create(ctx, parameters, pod, template)
ginkgo.By("wait for NodePrepareResource call")
ginkgo.By("wait for NodePrepareResources call")
gomega.Eventually(ctx, func(ctx context.Context) error {
if driver.CallCount(m) == 0 {
return errors.New("NodePrepareResource not called yet")
return errors.New("NodePrepareResources not called yet")
}
return nil
}).WithTimeout(podStartTimeout).Should(gomega.Succeed())
@ -93,7 +93,7 @@ var _ = ginkgo.Describe("[sig-node] DRA [Feature:DynamicResourceAllocation]", fu
err := e2epod.WaitForPodNameRunningInNamespace(ctx, f.ClientSet, pod.Name, pod.Namespace)
framework.ExpectNoError(err, "start pod with inline resource claim")
if driver.CallCount(m) == callCount {
framework.Fail("NodePrepareResource should have been called again")
framework.Fail("NodePrepareResources should have been called again")
}
})
@ -593,44 +593,64 @@ var _ = ginkgo.Describe("[sig-node] DRA [Feature:DynamicResourceAllocation]", fu
})
})
ginkgo.Context("multiple drivers", func() {
multipleDrivers := func(nodeV1alpha2, nodeV1alpha3 bool) {
nodes := NewNodes(f, 1, 4)
driver1 := NewDriver(f, nodes, func() app.Resources {
return app.Resources{
NodeLocal: true,
MaxAllocations: 1,
MaxAllocations: 2,
Nodes: nodes.NodeNames,
}
})
driver1.NodeV1alpha2 = nodeV1alpha2
driver1.NodeV1alpha3 = nodeV1alpha3
b1 := newBuilder(f, driver1)
driver2 := NewDriver(f, nodes, func() app.Resources {
return app.Resources{
NodeLocal: true,
MaxAllocations: 1,
MaxAllocations: 2,
Nodes: nodes.NodeNames,
}
})
driver2.NameSuffix = "-other"
driver2.NodeV1alpha2 = nodeV1alpha2
driver2.NodeV1alpha3 = nodeV1alpha3
b2 := newBuilder(f, driver2)
ginkgo.It("work", func(ctx context.Context) {
parameters1 := b1.parameters()
parameters2 := b2.parameters()
claim1 := b1.externalClaim(resourcev1alpha2.AllocationModeWaitForFirstConsumer)
claim1b := b1.externalClaim(resourcev1alpha2.AllocationModeWaitForFirstConsumer)
claim2 := b2.externalClaim(resourcev1alpha2.AllocationModeWaitForFirstConsumer)
claim2b := b2.externalClaim(resourcev1alpha2.AllocationModeWaitForFirstConsumer)
pod := b1.podExternal()
pod.Spec.ResourceClaims = append(pod.Spec.ResourceClaims,
v1.PodResourceClaim{
Name: "claim2",
Source: v1.ClaimSource{
ResourceClaimName: &claim2.Name,
for i, claim := range []*resourcev1alpha2.ResourceClaim{claim1b, claim2, claim2b} {
claim := claim
pod.Spec.ResourceClaims = append(pod.Spec.ResourceClaims,
v1.PodResourceClaim{
Name: fmt.Sprintf("claim%d", i+1),
Source: v1.ClaimSource{
ResourceClaimName: &claim.Name,
},
},
},
)
b1.create(ctx, parameters1, parameters2, claim1, claim2, pod)
)
}
b1.create(ctx, parameters1, parameters2, claim1, claim1b, claim2, claim2b, pod)
b1.testPod(ctx, f.ClientSet, pod)
})
}
multipleDriversContext := func(prefix string, nodeV1alpha2, nodeV1alpha3 bool) {
ginkgo.Context(prefix, func() {
multipleDrivers(nodeV1alpha2, nodeV1alpha3)
})
}
ginkgo.Context("multiple drivers", func() {
multipleDriversContext("using only drapbv1alpha2", true, false)
multipleDriversContext("using only drapbv1alpha3", false, true)
multipleDriversContext("using both drapbv1alpha2 and drapbv1alpha3", true, true)
})
})


@ -42,3 +42,13 @@ var NodePrepareResourceCalled = gcustom.MakeMatcher(func(actualCalls []GRPCCall)
}
return false, nil
}).WithMessage("contain NodePrepareResource call")
// NodePrepareResourcesCalled checks that NodePrepareResources API has been called
var NodePrepareResourcesCalled = gcustom.MakeMatcher(func(actualCalls []GRPCCall) (bool, error) {
for _, call := range actualCalls {
if strings.HasSuffix(call.FullMethod, "/NodePrepareResources") && call.Err == nil {
return true, nil
}
}
return false, nil
}).WithMessage("contain NodePrepareResources call")


@ -28,7 +28,8 @@ import (
"k8s.io/dynamic-resource-allocation/kubeletplugin"
"k8s.io/klog/v2"
drapbv1 "k8s.io/kubelet/pkg/apis/dra/v1alpha2"
drapbv1alpha2 "k8s.io/kubelet/pkg/apis/dra/v1alpha2"
drapbv1alpha3 "k8s.io/kubelet/pkg/apis/dra/v1alpha3"
)
type ExamplePlugin struct {
@ -69,7 +70,7 @@ type ClaimID struct {
UID string
}
var _ drapbv1.NodeServer = &ExamplePlugin{}
var _ drapbv1alpha2.NodeServer = &ExamplePlugin{}
// getJSONFilePath returns the absolute path where CDI file is/should be.
func (ex *ExamplePlugin) getJSONFilePath(claimUID string) string {
@ -147,7 +148,7 @@ func (ex *ExamplePlugin) Block() {
// a deterministic name to simplify NodeUnprepareResource (no need to remember
// or discover the name) and idempotency (when called again, the file simply
// gets written again).
func (ex *ExamplePlugin) NodePrepareResource(ctx context.Context, req *drapbv1.NodePrepareResourceRequest) (*drapbv1.NodePrepareResourceResponse, error) {
func (ex *ExamplePlugin) NodePrepareResource(ctx context.Context, req *drapbv1alpha2.NodePrepareResourceRequest) (*drapbv1alpha2.NodePrepareResourceResponse, error) {
logger := klog.FromContext(ctx)
// Block to emulate plugin stuckness or slowness.
@ -201,7 +202,7 @@ func (ex *ExamplePlugin) NodePrepareResource(ctx context.Context, req *drapbv1.N
}
dev := vendor + "/" + class + "=" + deviceName
resp := &drapbv1.NodePrepareResourceResponse{CdiDevices: []string{dev}}
resp := &drapbv1alpha2.NodePrepareResourceResponse{CdiDevices: []string{dev}}
ex.mutex.Lock()
defer ex.mutex.Unlock()
@ -211,10 +212,34 @@ func (ex *ExamplePlugin) NodePrepareResource(ctx context.Context, req *drapbv1.N
return resp, nil
}
func (ex *ExamplePlugin) NodePrepareResources(ctx context.Context, req *drapbv1alpha3.NodePrepareResourcesRequest) (*drapbv1alpha3.NodePrepareResourcesResponse, error) {
resp := &drapbv1alpha3.NodePrepareResourcesResponse{
Claims: make(map[string]*drapbv1alpha3.NodePrepareResourceResponse),
}
for _, claimReq := range req.Claims {
claimResp, err := ex.NodePrepareResource(ctx, &drapbv1alpha2.NodePrepareResourceRequest{
Namespace: claimReq.Namespace,
ClaimName: claimReq.Name,
ClaimUid: claimReq.Uid,
ResourceHandle: claimReq.ResourceHandle,
})
if err != nil {
resp.Claims[claimReq.Uid] = &drapbv1alpha3.NodePrepareResourceResponse{
Error: err.Error(),
}
} else {
resp.Claims[claimReq.Uid] = &drapbv1alpha3.NodePrepareResourceResponse{
CDIDevices: claimResp.CdiDevices,
}
}
}
return resp, nil
}
// NodeUnprepareResource removes the CDI file created by
// NodePrepareResource. It's idempotent, therefore it is not an error when that
// file is already gone.
func (ex *ExamplePlugin) NodeUnprepareResource(ctx context.Context, req *drapbv1.NodeUnprepareResourceRequest) (*drapbv1.NodeUnprepareResourceResponse, error) {
func (ex *ExamplePlugin) NodeUnprepareResource(ctx context.Context, req *drapbv1alpha2.NodeUnprepareResourceRequest) (*drapbv1alpha2.NodeUnprepareResourceResponse, error) {
logger := klog.FromContext(ctx)
// Block to emulate plugin stuckness or slowness.
@ -234,7 +259,29 @@ func (ex *ExamplePlugin) NodeUnprepareResource(ctx context.Context, req *drapbv1
defer ex.mutex.Unlock()
delete(ex.prepared, ClaimID{Name: req.ClaimName, UID: req.ClaimUid})
return &drapbv1.NodeUnprepareResourceResponse{}, nil
return &drapbv1alpha2.NodeUnprepareResourceResponse{}, nil
}
func (ex *ExamplePlugin) NodeUnprepareResources(ctx context.Context, req *drapbv1alpha3.NodeUnprepareResourcesRequest) (*drapbv1alpha3.NodeUnprepareResourcesResponse, error) {
resp := &drapbv1alpha3.NodeUnprepareResourcesResponse{
Claims: make(map[string]*drapbv1alpha3.NodeUnprepareResourceResponse),
}
for _, claimReq := range req.Claims {
_, err := ex.NodeUnprepareResource(ctx, &drapbv1alpha2.NodeUnprepareResourceRequest{
Namespace: claimReq.Namespace,
ClaimName: claimReq.Name,
ClaimUid: claimReq.Uid,
ResourceHandle: claimReq.ResourceHandle,
})
if err != nil {
resp.Claims[claimReq.Uid] = &drapbv1alpha3.NodeUnprepareResourceResponse{
Error: err.Error(),
}
} else {
resp.Claims[claimReq.Uid] = &drapbv1alpha3.NodeUnprepareResourceResponse{}
}
}
return resp, nil
}
func (ex *ExamplePlugin) GetPreparedResources() []ClaimID {


@ -109,8 +109,8 @@ var _ = ginkgo.Describe("[sig-node] DRA [Feature:DynamicResourceAllocation][Node
framework.ExpectNoError(err)
})
ginkgo.It("must keep pod in pending state if NodePrepareResource times out", func(ctx context.Context) {
ginkgo.By("set delay for the NodePrepareResource call")
ginkgo.It("must keep pod in pending state if NodePrepareResources times out", func(ctx context.Context) {
ginkgo.By("set delay for the NodePrepareResources call")
kubeletPlugin.Block()
pod := createTestObjects(ctx, f.ClientSet, getNodeName(ctx, f), f.Namespace.Name, "draclass", "external-claim", "drapod")
@ -120,8 +120,8 @@ var _ = ginkgo.Describe("[sig-node] DRA [Feature:DynamicResourceAllocation][Node
})
framework.ExpectNoError(err)
ginkgo.By("wait for NodePrepareResource call")
gomega.Eventually(kubeletPlugin.GetGRPCCalls).WithTimeout(dra.PluginClientTimeout * 2).Should(testdriver.NodePrepareResourceCalled)
ginkgo.By("wait for NodePrepareResources call")
gomega.Eventually(kubeletPlugin.GetGRPCCalls).WithTimeout(dra.PluginClientTimeout * 2).Should(testdriver.NodePrepareResourcesCalled)
// TODO: Check condition or event when implemented
// see https://github.com/kubernetes/kubernetes/issues/118468 for details

vendor/modules.txt

@ -2262,6 +2262,7 @@ k8s.io/kubelet/pkg/apis/credentialprovider/v1alpha1
k8s.io/kubelet/pkg/apis/credentialprovider/v1beta1
k8s.io/kubelet/pkg/apis/deviceplugin/v1beta1
k8s.io/kubelet/pkg/apis/dra/v1alpha2
k8s.io/kubelet/pkg/apis/dra/v1alpha3
k8s.io/kubelet/pkg/apis/pluginregistration/v1
k8s.io/kubelet/pkg/apis/podresources/v1
k8s.io/kubelet/pkg/apis/podresources/v1alpha1