diff --git a/cmd/kubelet/app/server.go b/cmd/kubelet/app/server.go index c4c42cc8802..3899df80ee3 100644 --- a/cmd/kubelet/app/server.go +++ b/cmd/kubelet/app/server.go @@ -41,7 +41,7 @@ import ( cadvisorapi "github.com/google/cadvisor/info/v1" "go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc" otelsdkresource "go.opentelemetry.io/otel/sdk/resource" - "go.opentelemetry.io/otel/semconv/v1.12.0" + semconv "go.opentelemetry.io/otel/semconv/v1.12.0" oteltrace "go.opentelemetry.io/otel/trace" v1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/resource" @@ -756,7 +756,9 @@ func run(ctx context.Context, s *options.KubeletServer, kubeDeps *kubelet.Depend ExperimentalTopologyManagerPolicyOptions: topologyManagerPolicyOptions, }, s.FailSwapOn, - kubeDeps.Recorder) + kubeDeps.Recorder, + kubeDeps.KubeClient, + ) if err != nil { return err diff --git a/hack/make-rules/update.sh b/hack/make-rules/update.sh index 25f8dc3be3a..447fc6624c8 100755 --- a/hack/make-rules/update.sh +++ b/hack/make-rules/update.sh @@ -40,6 +40,7 @@ BASH_TARGETS=" update-codegen update-generated-runtime update-generated-device-plugin + update-generated-dynamic-resource-allocation update-generated-api-compatibility-data update-generated-docs update-generated-swagger-docs diff --git a/hack/update-generated-dynamic-resource-allocation-dockerized.sh b/hack/update-generated-dynamic-resource-allocation-dockerized.sh new file mode 100755 index 00000000000..ad9f75f114d --- /dev/null +++ b/hack/update-generated-dynamic-resource-allocation-dockerized.sh @@ -0,0 +1,29 @@ +#!/usr/bin/env bash + +# Copyright 2017 The Kubernetes Authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# This script generates `*/api.pb.go` from the protobuf file `*/api.proto`. +# Example: +# kube::protoc::generate_proto "${DYNAMIC_RESOURCE_ALLOCATION_ALPHA}" + +set -o errexit +set -o nounset +set -o pipefail + +KUBE_ROOT="$(cd "$(dirname "${BASH_SOURCE[0]}")/../" && pwd -P)" +DYNAMIC_RESOURCE_ALLOCATION_V1ALPHA1="${KUBE_ROOT}/staging/src/k8s.io/kubelet/pkg/apis/dra/v1alpha1/" + +source "${KUBE_ROOT}/hack/lib/protoc.sh" +kube::protoc::generate_proto "${DYNAMIC_RESOURCE_ALLOCATION_V1ALPHA1}" diff --git a/hack/update-generated-dynamic-resource-allocation.sh b/hack/update-generated-dynamic-resource-allocation.sh new file mode 100755 index 00000000000..d20db5edb5e --- /dev/null +++ b/hack/update-generated-dynamic-resource-allocation.sh @@ -0,0 +1,27 @@ +#!/usr/bin/env bash + +# Copyright 2017 The Kubernetes Authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +set -o errexit +set -o nounset +set -o pipefail + +KUBE_ROOT=$(dirname "${BASH_SOURCE[0]}")/.. + +# NOTE: All output from this script needs to be copied back to the calling +# source tree. This is managed in kube::build::copy_output in build/common.sh. +# If the output set is changed update that function. + +"${KUBE_ROOT}/build/run.sh" hack/update-generated-dynamic-resource-allocation-dockerized.sh "$@" diff --git a/hack/verify-generated-dynamic-resource-allocation.sh b/hack/verify-generated-dynamic-resource-allocation.sh new file mode 100755 index 00000000000..3f38cbf5671 --- /dev/null +++ b/hack/verify-generated-dynamic-resource-allocation.sh @@ -0,0 +1,44 @@ +#!/usr/bin/env bash + +# Copyright 2022 The Kubernetes Authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# This script checks whether updating of device plugin API is needed or not. We +# should run `hack/update-generated-dynamic-resource-allocation.sh` if device plugin API is +# out of date. +# Usage: `hack/verify-generated-dynamic-resource-allocation.sh`. + +set -o errexit +set -o nounset +set -o pipefail + +KUBE_ROOT=$(dirname "${BASH_SOURCE[0]}")/.. +ERROR="Dynamic resource allocation kubelet plugin api is out of date. Please run hack/update-generated-dynamic-resource-allocation.sh" +DYNAMIC_RESOURCE_ALLOCATION_V1ALPHA1="${KUBE_ROOT}/staging/src/k8s.io/kubelet/pkg/apis/dra/v1alpha1/" + +source "${KUBE_ROOT}/hack/lib/protoc.sh" +kube::golang::setup_env + +function cleanup { + rm -rf "${DYNAMIC_RESOURCE_ALLOCATION_V1ALPHA1}/_tmp/" +} + +trap cleanup EXIT + +mkdir -p "${DYNAMIC_RESOURCE_ALLOCATION_V1ALPHA1}/_tmp" +cp "${DYNAMIC_RESOURCE_ALLOCATION_V1ALPHA1}/api.pb.go" "${DYNAMIC_RESOURCE_ALLOCATION_V1ALPHA1}/_tmp/" + +KUBE_VERBOSE=3 "${KUBE_ROOT}/hack/update-generated-dynamic-resource-allocation.sh" +kube::protoc::diff "${DYNAMIC_RESOURCE_ALLOCATION_V1ALPHA1}/api.pb.go" "${DYNAMIC_RESOURCE_ALLOCATION_V1ALPHA1}/_tmp/api.pb.go" "${ERROR}" +echo "Generated dynamic resource allocation kubelet plugin alpha api is up to date." diff --git a/pkg/kubelet/cm/container_manager.go b/pkg/kubelet/cm/container_manager.go index f39bb64f66f..3937b57139f 100644 --- a/pkg/kubelet/cm/container_manager.go +++ b/pkg/kubelet/cm/container_manager.go @@ -31,6 +31,7 @@ import ( "k8s.io/kubernetes/pkg/kubelet/apis/podresources" "k8s.io/kubernetes/pkg/kubelet/cm/cpuset" "k8s.io/kubernetes/pkg/kubelet/cm/devicemanager" + "k8s.io/kubernetes/pkg/kubelet/cm/dra" "k8s.io/kubernetes/pkg/kubelet/config" kubecontainer "k8s.io/kubernetes/pkg/kubelet/container" evictionapi "k8s.io/kubernetes/pkg/kubelet/eviction/api" @@ -115,6 +116,12 @@ type ContainerManager interface { // GetNodeAllocatableAbsolute returns the absolute value of Node Allocatable which is primarily useful for enforcement. GetNodeAllocatableAbsolute() v1.ResourceList + // PrepareResource prepares pod resources + PrepareResources(pod *v1.Pod, container *v1.Container) (*dra.ContainerInfo, error) + + // UnrepareResources unprepares pod resources + UnprepareResources(*v1.Pod) error + // Implements the podresources Provider API for CPUs, Memory and Devices podresources.CPUsProvider podresources.DevicesProvider diff --git a/pkg/kubelet/cm/container_manager_linux.go b/pkg/kubelet/cm/container_manager_linux.go index dfbfc2f7325..3268955911e 100644 --- a/pkg/kubelet/cm/container_manager_linux.go +++ b/pkg/kubelet/cm/container_manager_linux.go @@ -43,6 +43,7 @@ import ( "k8s.io/apimachinery/pkg/util/sets" "k8s.io/apimachinery/pkg/util/wait" utilfeature "k8s.io/apiserver/pkg/util/feature" + clientset "k8s.io/client-go/kubernetes" "k8s.io/client-go/tools/record" utilsysctl "k8s.io/component-helpers/node/util/sysctl" internalapi "k8s.io/cri-api/pkg/apis" @@ -53,6 +54,7 @@ import ( "k8s.io/kubernetes/pkg/kubelet/cm/containermap" "k8s.io/kubernetes/pkg/kubelet/cm/cpumanager" "k8s.io/kubernetes/pkg/kubelet/cm/devicemanager" + "k8s.io/kubernetes/pkg/kubelet/cm/dra" "k8s.io/kubernetes/pkg/kubelet/cm/memorymanager" memorymanagerstate "k8s.io/kubernetes/pkg/kubelet/cm/memorymanager/state" "k8s.io/kubernetes/pkg/kubelet/cm/topologymanager" @@ -128,6 +130,8 @@ type containerManagerImpl struct { memoryManager memorymanager.Manager // Interface for Topology resource co-ordination topologyManager topologymanager.Manager + // Interface for Dynamic Resource Allocation management. + draManager dra.Manager } type features struct { @@ -195,7 +199,7 @@ func validateSystemRequirements(mountUtil mount.Interface) (features, error) { // TODO(vmarmol): Add limits to the system containers. // Takes the absolute name of the specified containers. // Empty container name disables use of the specified container. -func NewContainerManager(mountUtil mount.Interface, cadvisorInterface cadvisor.Interface, nodeConfig NodeConfig, failSwapOn bool, recorder record.EventRecorder) (ContainerManager, error) { +func NewContainerManager(mountUtil mount.Interface, cadvisorInterface cadvisor.Interface, nodeConfig NodeConfig, failSwapOn bool, recorder record.EventRecorder, kubeClient clientset.Interface) (ContainerManager, error) { subsystems, err := GetCgroupSubsystems() if err != nil { return nil, fmt.Errorf("failed to get mounted cgroup subsystems: %v", err) @@ -307,6 +311,15 @@ func NewContainerManager(mountUtil mount.Interface, cadvisorInterface cadvisor.I } cm.topologyManager.AddHintProvider(cm.deviceManager) + // initialize DRA manager + if utilfeature.DefaultFeatureGate.Enabled(kubefeatures.DynamicResourceAllocation) { + klog.InfoS("Creating Dynamic Resource Allocation (DRA) manager") + cm.draManager, err = dra.NewManagerImpl(kubeClient) + if err != nil { + return nil, err + } + } + // Initialize CPU manager cm.cpuManager, err = cpumanager.NewManager( nodeConfig.CPUManagerPolicy, @@ -642,6 +655,13 @@ func (cm *containerManagerImpl) GetPluginRegistrationHandler() cache.PluginHandl // TODO: move the GetResources logic to PodContainerManager. func (cm *containerManagerImpl) GetResources(pod *v1.Pod, container *v1.Container) (*kubecontainer.RunContainerOptions, error) { opts := &kubecontainer.RunContainerOptions{} + if cm.draManager != nil { + resOpts, err := cm.PrepareResources(pod, container) + if err != nil { + return nil, err + } + opts.Annotations = append(opts.Annotations, resOpts.Annotations...) + } // Allocate should already be called during predicateAdmitHandler.Admit(), // just try to fetch device runtime information from cached state here devOpts, err := cm.deviceManager.GetDeviceRunContainerOptions(pod, container) @@ -671,13 +691,14 @@ func (cm *containerManagerImpl) GetAllocateResourcesPodAdmitHandler() lifecycle. // work as we add more and more hint providers that the TopologyManager // needs to call Allocate() on (that may not be directly intstantiated // inside this component). - return &resourceAllocator{cm.cpuManager, cm.memoryManager, cm.deviceManager} + return &resourceAllocator{cm.cpuManager, cm.memoryManager, cm.deviceManager, cm.draManager} } type resourceAllocator struct { cpuManager cpumanager.Manager memoryManager memorymanager.Manager deviceManager devicemanager.Manager + draManager dra.Manager } func (m *resourceAllocator) Admit(attrs *lifecycle.PodAdmitAttributes) lifecycle.PodAdmitResult { @@ -1009,3 +1030,11 @@ func containerMemoryFromBlock(blocks []memorymanagerstate.Block) []*podresources return containerMemories } + +func (cm *containerManagerImpl) PrepareResources(pod *v1.Pod, container *v1.Container) (*dra.ContainerInfo, error) { + return cm.draManager.PrepareResources(pod, container) +} + +func (cm *containerManagerImpl) UnprepareResources(pod *v1.Pod) error { + return cm.draManager.UnprepareResources(pod) +} diff --git a/pkg/kubelet/cm/container_manager_stub.go b/pkg/kubelet/cm/container_manager_stub.go index 6bd5f3e3689..8441befd626 100644 --- a/pkg/kubelet/cm/container_manager_stub.go +++ b/pkg/kubelet/cm/container_manager_stub.go @@ -24,6 +24,7 @@ import ( internalapi "k8s.io/cri-api/pkg/apis" podresourcesapi "k8s.io/kubelet/pkg/apis/podresources/v1" "k8s.io/kubernetes/pkg/kubelet/cm/cpumanager" + "k8s.io/kubernetes/pkg/kubelet/cm/dra" "k8s.io/kubernetes/pkg/kubelet/cm/memorymanager" "k8s.io/kubernetes/pkg/kubelet/cm/topologymanager" "k8s.io/kubernetes/pkg/kubelet/config" @@ -154,6 +155,14 @@ func (cm *containerManagerStub) GetNodeAllocatableAbsolute() v1.ResourceList { return nil } +func (cm *containerManagerStub) PrepareResources(pod *v1.Pod, container *v1.Container) (*dra.ContainerInfo, error) { + return nil, nil +} + +func (cm *containerManagerStub) UnprepareResources(*v1.Pod) error { + return nil +} + func NewStubContainerManager() ContainerManager { return &containerManagerStub{shouldResetExtendedResourceCapacity: false} } diff --git a/pkg/kubelet/cm/container_manager_unsupported.go b/pkg/kubelet/cm/container_manager_unsupported.go index e58c236b50f..ba4f9a1b6ad 100644 --- a/pkg/kubelet/cm/container_manager_unsupported.go +++ b/pkg/kubelet/cm/container_manager_unsupported.go @@ -25,6 +25,7 @@ import ( "k8s.io/mount-utils" v1 "k8s.io/api/core/v1" + clientset "k8s.io/client-go/kubernetes" "k8s.io/client-go/tools/record" internalapi "k8s.io/cri-api/pkg/apis" "k8s.io/kubernetes/pkg/kubelet/cadvisor" @@ -42,6 +43,6 @@ func (unsupportedContainerManager) Start(_ *v1.Node, _ ActivePodsFunc, _ config. return fmt.Errorf("Container Manager is unsupported in this build") } -func NewContainerManager(_ mount.Interface, _ cadvisor.Interface, _ NodeConfig, failSwapOn bool, recorder record.EventRecorder) (ContainerManager, error) { +func NewContainerManager(_ mount.Interface, _ cadvisor.Interface, _ NodeConfig, failSwapOn bool, recorder record.EventRecorder, kubeClient clientset.Interface) (ContainerManager, error) { return &unsupportedContainerManager{}, nil } diff --git a/pkg/kubelet/cm/container_manager_windows.go b/pkg/kubelet/cm/container_manager_windows.go index 742506a78b4..fb9df4b721f 100644 --- a/pkg/kubelet/cm/container_manager_windows.go +++ b/pkg/kubelet/cm/container_manager_windows.go @@ -30,6 +30,7 @@ import ( v1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/resource" + clientset "k8s.io/client-go/kubernetes" "k8s.io/client-go/tools/record" internalapi "k8s.io/cri-api/pkg/apis" podresourcesapi "k8s.io/kubelet/pkg/apis/podresources/v1" @@ -37,6 +38,7 @@ import ( "k8s.io/kubernetes/pkg/kubelet/cm/admission" "k8s.io/kubernetes/pkg/kubelet/cm/cpumanager" "k8s.io/kubernetes/pkg/kubelet/cm/devicemanager" + "k8s.io/kubernetes/pkg/kubelet/cm/dra" "k8s.io/kubernetes/pkg/kubelet/cm/memorymanager" "k8s.io/kubernetes/pkg/kubelet/cm/topologymanager" "k8s.io/kubernetes/pkg/kubelet/config" @@ -93,7 +95,7 @@ func (cm *containerManagerImpl) Start(node *v1.Node, } // NewContainerManager creates windows container manager. -func NewContainerManager(mountUtil mount.Interface, cadvisorInterface cadvisor.Interface, nodeConfig NodeConfig, failSwapOn bool, recorder record.EventRecorder) (ContainerManager, error) { +func NewContainerManager(mountUtil mount.Interface, cadvisorInterface cadvisor.Interface, nodeConfig NodeConfig, failSwapOn bool, recorder record.EventRecorder, kubeClient clientset.Interface) (ContainerManager, error) { // It is safe to invoke `MachineInfo` on cAdvisor before logically initializing cAdvisor here because // machine info is computed and cached once as part of cAdvisor object creation. // But `RootFsInfo` and `ImagesFsInfo` are not available at this moment so they will be called later during manager starts @@ -250,3 +252,11 @@ func (cm *containerManagerImpl) GetAllocatableMemory() []*podresourcesapi.Contai func (cm *containerManagerImpl) GetNodeAllocatableAbsolute() v1.ResourceList { return nil } + +func (cm *containerManagerImpl) PrepareResources(pod *v1.Pod, container *v1.Container) (*dra.ContainerInfo, error) { + return nil, nil +} + +func (cm *containerManagerImpl) UnprepareResources(*v1.Pod) error { + return nil +} diff --git a/pkg/kubelet/cm/dra/cdi.go b/pkg/kubelet/cm/dra/cdi.go new file mode 100644 index 00000000000..3d80891f9d2 --- /dev/null +++ b/pkg/kubelet/cm/dra/cdi.go @@ -0,0 +1,283 @@ +/* +Copyright 2022 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +// The code below was copied from +// https://github.com/container-orchestrated-devices/container-device-interface/blob/v0.5.3/pkg/cdi/annotations.go +// https://github.com/container-orchestrated-devices/container-device-interface/blob/v0.5.3/pkg/cdi/qualified-device.go +// to avoid a dependency on that package and the indirect dependencies that +// this would have implied. +// +// Long term it would be good to avoid this duplication: +// https://github.com/container-orchestrated-devices/container-device-interface/issues/97 + +package dra + +import ( + "errors" + "fmt" + "strings" +) + +const ( + // annotationPrefix is the prefix for CDI container annotation keys. + annotationPrefix = "cdi.k8s.io/" +) + +// updateAnnotations updates annotations with a plugin-specific CDI device +// injection request for the given devices. Upon any error a non-nil error +// is returned and annotations are left intact. By convention plugin should +// be in the format of "vendor.device-type". +func updateAnnotations(annotations map[string]string, plugin string, deviceID string, devices []string) (map[string]string, error) { + key, err := annotationKey(plugin, deviceID) + if err != nil { + return annotations, fmt.Errorf("CDI annotation failed: %v", err) + } + if _, ok := annotations[key]; ok { + return annotations, fmt.Errorf("CDI annotation failed, key %q used", key) + } + value, err := annotationValue(devices) + if err != nil { + return annotations, fmt.Errorf("CDI annotation failed: %v", err) + } + + if annotations == nil { + annotations = make(map[string]string) + } + annotations[key] = value + + return annotations, nil +} + +// annotationKey returns a unique annotation key for an device allocation +// by a K8s device plugin. pluginName should be in the format of +// "vendor.device-type". deviceID is the ID of the device the plugin is +// allocating. It is used to make sure that the generated key is unique +// even if multiple allocations by a single plugin needs to be annotated. +func annotationKey(pluginName, deviceID string) (string, error) { + const maxNameLen = 63 + + if pluginName == "" { + return "", errors.New("invalid plugin name, empty") + } + if deviceID == "" { + return "", errors.New("invalid deviceID, empty") + } + + name := pluginName + "_" + strings.ReplaceAll(deviceID, "/", "_") + + if len(name) > maxNameLen { + return "", fmt.Errorf("invalid plugin+deviceID %q, too long", name) + } + + if c := rune(name[0]); !isAlphaNumeric(c) { + return "", fmt.Errorf("invalid name %q, first '%c' should be alphanumeric", name, c) + } + if len(name) > 2 { + for _, c := range name[1 : len(name)-1] { + switch { + case isAlphaNumeric(c): + case c == '_' || c == '-' || c == '.': + default: + return "", fmt.Errorf("invalid name %q, invalid charcter '%c'", name, c) + } + } + } + if c := rune(name[len(name)-1]); !isAlphaNumeric(c) { + return "", fmt.Errorf("invalid name %q, last '%c' should be alphanumeric", name, c) + } + + return annotationPrefix + name, nil +} + +// annotationValue returns an annotation value for the given devices. +func annotationValue(devices []string) (string, error) { + value, sep := "", "" + for _, d := range devices { + if _, _, _, err := parseQualifiedName(d); err != nil { + return "", err + } + value += sep + d + sep = "," + } + + return value, nil +} + +// parseQualifiedName splits a qualified name into device vendor, class, +// and name. If the device fails to parse as a qualified name, or if any +// of the split components fail to pass syntax validation, vendor and +// class are returned as empty, together with the verbatim input as the +// name and an error describing the reason for failure. +func parseQualifiedName(device string) (string, string, string, error) { + vendor, class, name := parseDevice(device) + + if vendor == "" { + return "", "", device, fmt.Errorf("unqualified device %q, missing vendor", device) + } + if class == "" { + return "", "", device, fmt.Errorf("unqualified device %q, missing class", device) + } + if name == "" { + return "", "", device, fmt.Errorf("unqualified device %q, missing device name", device) + } + + if err := validateVendorName(vendor); err != nil { + return "", "", device, fmt.Errorf("invalid device %q: %v", device, err) + } + if err := validateClassName(class); err != nil { + return "", "", device, fmt.Errorf("invalid device %q: %v", device, err) + } + if err := validateDeviceName(name); err != nil { + return "", "", device, fmt.Errorf("invalid device %q: %v", device, err) + } + + return vendor, class, name, nil +} + +// parseDevice tries to split a device name into vendor, class, and name. +// If this fails, for instance in the case of unqualified device names, +// parseDevice returns an empty vendor and class together with name set +// to the verbatim input. +func parseDevice(device string) (string, string, string) { + if device == "" || device[0] == '/' { + return "", "", device + } + + parts := strings.SplitN(device, "=", 2) + if len(parts) != 2 || parts[0] == "" || parts[1] == "" { + return "", "", device + } + + name := parts[1] + vendor, class := parseQualifier(parts[0]) + if vendor == "" { + return "", "", device + } + + return vendor, class, name +} + +// parseQualifier splits a device qualifier into vendor and class. +// The syntax for a device qualifier is +// +// "/" +// +// If parsing fails, an empty vendor and the class set to the +// verbatim input is returned. +func parseQualifier(kind string) (string, string) { + parts := strings.SplitN(kind, "/", 2) + if len(parts) != 2 || parts[0] == "" || parts[1] == "" { + return "", kind + } + return parts[0], parts[1] +} + +// validateVendorName checks the validity of a vendor name. +// A vendor name may contain the following ASCII characters: +// - upper- and lowercase letters ('A'-'Z', 'a'-'z') +// - digits ('0'-'9') +// - underscore, dash, and dot ('_', '-', and '.') +func validateVendorName(vendor string) error { + if vendor == "" { + return fmt.Errorf("invalid (empty) vendor name") + } + if !isLetter(rune(vendor[0])) { + return fmt.Errorf("invalid vendor %q, should start with letter", vendor) + } + for _, c := range string(vendor[1 : len(vendor)-1]) { + switch { + case isAlphaNumeric(c): + case c == '_' || c == '-' || c == '.': + default: + return fmt.Errorf("invalid character '%c' in vendor name %q", + c, vendor) + } + } + if !isAlphaNumeric(rune(vendor[len(vendor)-1])) { + return fmt.Errorf("invalid vendor %q, should end with a letter or digit", vendor) + } + + return nil +} + +// validateClassName checks the validity of class name. +// A class name may contain the following ASCII characters: +// - upper- and lowercase letters ('A'-'Z', 'a'-'z') +// - digits ('0'-'9') +// - underscore and dash ('_', '-') +func validateClassName(class string) error { + if class == "" { + return fmt.Errorf("invalid (empty) device class") + } + if !isLetter(rune(class[0])) { + return fmt.Errorf("invalid class %q, should start with letter", class) + } + for _, c := range string(class[1 : len(class)-1]) { + switch { + case isAlphaNumeric(c): + case c == '_' || c == '-': + default: + return fmt.Errorf("invalid character '%c' in device class %q", + c, class) + } + } + if !isAlphaNumeric(rune(class[len(class)-1])) { + return fmt.Errorf("invalid class %q, should end with a letter or digit", class) + } + return nil +} + +// validateDeviceName checks the validity of a device name. +// A device name may contain the following ASCII characters: +// - upper- and lowercase letters ('A'-'Z', 'a'-'z') +// - digits ('0'-'9') +// - underscore, dash, dot, colon ('_', '-', '.', ':') +func validateDeviceName(name string) error { + if name == "" { + return fmt.Errorf("invalid (empty) device name") + } + if !isAlphaNumeric(rune(name[0])) { + return fmt.Errorf("invalid class %q, should start with a letter or digit", name) + } + if len(name) == 1 { + return nil + } + for _, c := range string(name[1 : len(name)-1]) { + switch { + case isAlphaNumeric(c): + case c == '_' || c == '-' || c == '.' || c == ':': + default: + return fmt.Errorf("invalid character '%c' in device name %q", + c, name) + } + } + if !isAlphaNumeric(rune(name[len(name)-1])) { + return fmt.Errorf("invalid name %q, should end with a letter or digit", name) + } + return nil +} + +func isLetter(c rune) bool { + return ('A' <= c && c <= 'Z') || ('a' <= c && c <= 'z') +} + +func isDigit(c rune) bool { + return '0' <= c && c <= '9' +} + +func isAlphaNumeric(c rune) bool { + return isLetter(c) || isDigit(c) +} diff --git a/pkg/kubelet/cm/dra/claiminfo.go b/pkg/kubelet/cm/dra/claiminfo.go new file mode 100644 index 00000000000..cea774237f4 --- /dev/null +++ b/pkg/kubelet/cm/dra/claiminfo.go @@ -0,0 +1,110 @@ +/* +Copyright 2022 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package dra + +import ( + "fmt" + "sync" + + "k8s.io/apimachinery/pkg/types" + "k8s.io/apimachinery/pkg/util/sets" + kubecontainer "k8s.io/kubernetes/pkg/kubelet/container" +) + +// claimInfo holds information required +// to prepare and unprepare a resource claim. +type claimInfo struct { + sync.RWMutex + + // name of the DRA driver + driverName string + + // claimUID is an UID of the resource claim + claimUID types.UID + + // claimName is a name of the resource claim + claimName string + + // namespace is a claim namespace + namespace string + + // podUIDs is a set of pod UIDs that reference a resource + podUIDs sets.Set[string] + + // cdiDevices is a list of CDI devices returned by the + // GRPC API call NodePrepareResource + cdiDevices []string + + // annotations is a list of container annotations associated with + // a prepared resource + annotations []kubecontainer.Annotation +} + +func (res *claimInfo) addPodReference(podUID types.UID) { + res.Lock() + defer res.Unlock() + + res.podUIDs.Insert(string(podUID)) +} + +func (res *claimInfo) deletePodReference(podUID types.UID) { + res.Lock() + defer res.Unlock() + + res.podUIDs.Delete(string(podUID)) +} + +// claimInfoCache is a cache of processed resource claims keyed by namespace + claim name. +type claimInfoCache struct { + sync.RWMutex + claimInfo map[string]*claimInfo +} + +// newClaimInfoCache is a function that returns an instance of the claimInfoCache. +func newClaimInfoCache() *claimInfoCache { + return &claimInfoCache{ + claimInfo: make(map[string]*claimInfo), + } +} + +func (cache *claimInfoCache) add(claim, namespace string, res *claimInfo) error { + cache.Lock() + defer cache.Unlock() + + key := claim + namespace + if _, ok := cache.claimInfo[key]; ok { + return fmt.Errorf("claim %s, namespace %s already cached", claim, namespace) + } + + cache.claimInfo[claim+namespace] = res + + return nil +} + +func (cache *claimInfoCache) get(claimName, namespace string) *claimInfo { + cache.RLock() + defer cache.RUnlock() + + return cache.claimInfo[claimName+namespace] +} + +func (cache *claimInfoCache) delete(claimName, namespace string) { + cache.Lock() + defer cache.Unlock() + + delete(cache.claimInfo, claimName+namespace) +} diff --git a/pkg/kubelet/cm/dra/manager.go b/pkg/kubelet/cm/dra/manager.go new file mode 100644 index 00000000000..c5d257354df --- /dev/null +++ b/pkg/kubelet/cm/dra/manager.go @@ -0,0 +1,245 @@ +/* +Copyright 2022 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package dra + +import ( + "context" + "fmt" + + v1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/types" + "k8s.io/apimachinery/pkg/util/sets" + clientset "k8s.io/client-go/kubernetes" + "k8s.io/dynamic-resource-allocation/resourceclaim" + "k8s.io/klog/v2" + dra "k8s.io/kubernetes/pkg/kubelet/cm/dra/plugin" + kubecontainer "k8s.io/kubernetes/pkg/kubelet/container" +) + +// ManagerImpl is the structure in charge of managing DRA resource Plugins. +type ManagerImpl struct { + // cache contains cached claim info + cache *claimInfoCache + + // KubeClient reference + kubeClient clientset.Interface +} + +// NewManagerImpl creates a new manager. +func NewManagerImpl(kubeClient clientset.Interface) (*ManagerImpl, error) { + klog.V(2).InfoS("Creating DRA manager") + + manager := &ManagerImpl{ + cache: newClaimInfoCache(), + kubeClient: kubeClient, + } + + return manager, nil +} + +// Generate container annotations using CDI UpdateAnnotations API. +func generateCDIAnnotations( + claimUID types.UID, + driverName string, + cdiDevices []string, +) ([]kubecontainer.Annotation, error) { + annotations, err := updateAnnotations(map[string]string{}, driverName, string(claimUID), cdiDevices) + if err != nil { + return nil, fmt.Errorf("can't generate CDI annotations: %+v", err) + } + + kubeAnnotations := []kubecontainer.Annotation{} + for key, value := range annotations { + kubeAnnotations = append(kubeAnnotations, kubecontainer.Annotation{Name: key, Value: value}) + } + + return kubeAnnotations, nil +} + +// prepareContainerResources attempts to prepare all of required resource +// plugin resources for the input container, issue an NodePrepareResource rpc request +// for each new resource requirement, process their responses and update the cached +// containerResources on success. +func (m *ManagerImpl) prepareContainerResources(pod *v1.Pod, container *v1.Container) error { + // Process resources for each resource claim referenced by container + for range container.Resources.Claims { + for i, podResourceClaim := range pod.Spec.ResourceClaims { + claimName := resourceclaim.Name(pod, &pod.Spec.ResourceClaims[i]) + klog.V(3).InfoS("Processing resource", "claim", claimName, "pod", pod.Name) + + if claimInfo := m.cache.get(claimName, pod.Namespace); claimInfo != nil { + // resource is already prepared, add pod UID to it + claimInfo.addPodReference(pod.UID) + + continue + } + + // Query claim object from the API server + resourceClaim, err := m.kubeClient.ResourceV1alpha1().ResourceClaims(pod.Namespace).Get( + context.TODO(), + claimName, + metav1.GetOptions{}) + if err != nil { + return fmt.Errorf("failed to fetch ResourceClaim %s referenced by pod %s: %+v", claimName, pod.Name, err) + } + + // Check if pod is in the ReservedFor for the claim + if !resourceclaim.IsReservedForPod(pod, resourceClaim) { + return fmt.Errorf("pod %s(%s) is not allowed to use resource claim %s(%s)", + pod.Name, pod.UID, podResourceClaim.Name, resourceClaim.UID) + } + + // Call NodePrepareResource RPC + driverName := resourceClaim.Status.DriverName + + client, err := dra.NewDRAPluginClient(driverName) + if err != nil { + return fmt.Errorf("failed to get DRA Plugin client for plugin name %s, err=%+v", driverName, err) + } + + response, err := client.NodePrepareResource( + context.Background(), + resourceClaim.Namespace, + resourceClaim.UID, + resourceClaim.Name, + resourceClaim.Status.Allocation.ResourceHandle) + if err != nil { + return fmt.Errorf("NodePrepareResource failed, claim UID: %s, claim name: %s, resource handle: %s, err: %+v", + resourceClaim.UID, resourceClaim.Name, resourceClaim.Status.Allocation.ResourceHandle, err) + } + + klog.V(3).InfoS("NodePrepareResource succeeded", "response", response) + + annotations, err := generateCDIAnnotations(resourceClaim.UID, driverName, response.CdiDevices) + if err != nil { + return fmt.Errorf("failed to generate container annotations, err: %+v", err) + } + + // Cache prepared resource + err = m.cache.add( + resourceClaim.Name, + resourceClaim.Namespace, + &claimInfo{ + driverName: driverName, + claimUID: resourceClaim.UID, + claimName: resourceClaim.Name, + namespace: resourceClaim.Namespace, + podUIDs: sets.New(string(pod.UID)), + cdiDevices: response.CdiDevices, + annotations: annotations, + }) + if err != nil { + return fmt.Errorf( + "failed to cache prepared resource, claim: %s(%s), err: %+v", + resourceClaim.Name, + resourceClaim.UID, + err, + ) + } + } + } + + return nil +} + +// getContainerInfo gets a container info from the claimInfo cache. +// This information is used by the caller to update a container config. +func (m *ManagerImpl) getContainerInfo(pod *v1.Pod, container *v1.Container) (*ContainerInfo, error) { + annotations := []kubecontainer.Annotation{} + + for i, podResourceClaim := range pod.Spec.ResourceClaims { + claimName := resourceclaim.Name(pod, &pod.Spec.ResourceClaims[i]) + + for _, claim := range container.Resources.Claims { + if podResourceClaim.Name != claim.Name { + continue + } + + claimInfo := m.cache.get(claimName, pod.Namespace) + if claimInfo == nil { + return nil, fmt.Errorf("unable to get resource for namespace: %s, claim: %s", pod.Namespace, claimName) + } + + klog.V(3).InfoS("add resource annotations", "claim", claimName, "annotations", claimInfo.annotations) + annotations = append(annotations, claimInfo.annotations...) + } + } + + return &ContainerInfo{Annotations: annotations}, nil +} + +// PrepareResources calls plugin NodePrepareResource from the registered DRA resource plugins. +func (m *ManagerImpl) PrepareResources(pod *v1.Pod, container *v1.Container) (*ContainerInfo, error) { + if err := m.prepareContainerResources(pod, container); err != nil { + return nil, err + } + + return m.getContainerInfo(pod, container) +} + +// UnprepareResources calls a plugin's NodeUnprepareResource API for each resource claim owned by a pod. +// This function is idempotent and may be called multiple times against the same pod. +// As such, calls to the underlying NodeUnprepareResource API are skipped for claims that have +// already been successfully unprepared. +func (m *ManagerImpl) UnprepareResources(pod *v1.Pod) error { + // Call NodeUnprepareResource RPC for every resource claim referenced by the pod + for i := range pod.Spec.ResourceClaims { + claimName := resourceclaim.Name(pod, &pod.Spec.ResourceClaims[i]) + claimInfo := m.cache.get(claimName, pod.Namespace) + + // Skip calling NodeUnprepareResource if claim info is not cached + if claimInfo == nil { + continue + } + + // Delete pod UID from the cache + claimInfo.deletePodReference(pod.UID) + + // Skip calling NodeUnprepareResource if other pods are still referencing it + if len(claimInfo.podUIDs) > 0 { + continue + } + + // Call NodeUnprepareResource only for the last pod that references the claim + client, err := dra.NewDRAPluginClient(claimInfo.driverName) + if err != nil { + return fmt.Errorf("failed to get DRA Plugin client for plugin name %s, err=%+v", claimInfo.driverName, err) + } + + response, err := client.NodeUnprepareResource( + context.Background(), + claimInfo.namespace, + claimInfo.claimUID, + claimInfo.claimName, + claimInfo.cdiDevices) + if err != nil { + return fmt.Errorf( + "NodeUnprepareResource failed, pod: %s, claim UID: %s, claim name: %s, CDI devices: %s, err: %+v", + pod.Name, + claimInfo.claimUID, + claimInfo.claimName, + claimInfo.cdiDevices, err) + } + + klog.V(3).InfoS("NodeUnprepareResource succeeded", "response", response) + // delete resource from the cache + m.cache.delete(claimInfo.claimName, pod.Namespace) + } + + return nil +} diff --git a/pkg/kubelet/cm/dra/plugin/client.go b/pkg/kubelet/cm/dra/plugin/client.go new file mode 100644 index 00000000000..49ab5cb1e3c --- /dev/null +++ b/pkg/kubelet/cm/dra/plugin/client.go @@ -0,0 +1,182 @@ +/* +Copyright 2022 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package plugin + +import ( + "context" + "errors" + "fmt" + "io" + "net" + + "google.golang.org/grpc" + "google.golang.org/grpc/credentials/insecure" + "k8s.io/apimachinery/pkg/types" + "k8s.io/klog/v2" + + drapbv1 "k8s.io/kubelet/pkg/apis/dra/v1alpha1" +) + +type Client interface { + NodePrepareResource( + ctx context.Context, + namespace string, + claimUID types.UID, + claimName string, + resourceHandle string, + ) (*drapbv1.NodePrepareResourceResponse, error) + + NodeUnprepareResource( + ctx context.Context, + namespace string, + claimUID types.UID, + claimName string, + cdiDevice []string, + ) (*drapbv1.NodeUnprepareResourceResponse, error) +} + +// Strongly typed address. +type draAddr string + +// draPluginClient encapsulates all dra plugin methods. +type draPluginClient struct { + pluginName string + addr draAddr + nodeV1ClientCreator nodeV1ClientCreator +} + +var _ Client = &draPluginClient{} + +type nodeV1ClientCreator func(addr draAddr) ( + nodeClient drapbv1.NodeClient, + closer io.Closer, + err error, +) + +// newV1NodeClient creates a new NodeClient with the internally used gRPC +// connection set up. It also returns a closer which must be called to close +// the gRPC connection when the NodeClient is not used anymore. +// This is the default implementation for the nodeV1ClientCreator, used in +// newDRAPluginClient. +func newV1NodeClient(addr draAddr) (nodeClient drapbv1.NodeClient, closer io.Closer, err error) { + var conn *grpc.ClientConn + + conn, err = newGrpcConn(addr) + if err != nil { + return nil, nil, err + } + + return drapbv1.NewNodeClient(conn), conn, nil +} + +func NewDRAPluginClient(pluginName string) (Client, error) { + if pluginName == "" { + return nil, fmt.Errorf("plugin name is empty") + } + + existingPlugin := draPlugins.Get(pluginName) + if existingPlugin == nil { + return nil, fmt.Errorf("plugin name %s not found in the list of registered DRA plugins", pluginName) + } + + return &draPluginClient{ + pluginName: pluginName, + addr: draAddr(existingPlugin.endpoint), + nodeV1ClientCreator: newV1NodeClient, + }, nil +} + +func (r *draPluginClient) NodePrepareResource( + ctx context.Context, + namespace string, + claimUID types.UID, + claimName string, + resourceHandle string, +) (*drapbv1.NodePrepareResourceResponse, error) { + klog.V(4).InfoS( + log("calling NodePrepareResource rpc"), + "namespace", namespace, + "claim UID", claimUID, + "claim name", claimName, + "resource handle", resourceHandle) + + if r.nodeV1ClientCreator == nil { + return nil, errors.New("failed to call NodePrepareResource. nodeV1ClientCreator is nil") + } + + nodeClient, closer, err := r.nodeV1ClientCreator(r.addr) + if err != nil { + return nil, err + } + defer closer.Close() + + req := &drapbv1.NodePrepareResourceRequest{ + Namespace: namespace, + ClaimUid: string(claimUID), + ClaimName: claimName, + ResourceHandle: resourceHandle, + } + + return nodeClient.NodePrepareResource(ctx, req) +} + +func (r *draPluginClient) NodeUnprepareResource( + ctx context.Context, + namespace string, + claimUID types.UID, + claimName string, + cdiDevices []string, +) (*drapbv1.NodeUnprepareResourceResponse, error) { + klog.V(4).InfoS( + log("calling NodeUnprepareResource rpc"), + "namespace", namespace, + "claim UID", claimUID, + "claim name", claimName, + "cdi devices", cdiDevices) + + if r.nodeV1ClientCreator == nil { + return nil, errors.New("nodeV1ClientCreate is nil") + } + + nodeClient, closer, err := r.nodeV1ClientCreator(r.addr) + if err != nil { + return nil, err + } + defer closer.Close() + + req := &drapbv1.NodeUnprepareResourceRequest{ + Namespace: namespace, + ClaimUid: string(claimUID), + ClaimName: claimName, + CdiDevices: cdiDevices, + } + + return nodeClient.NodeUnprepareResource(ctx, req) +} + +func newGrpcConn(addr draAddr) (*grpc.ClientConn, error) { + network := "unix" + klog.V(4).InfoS(log("creating new gRPC connection"), "protocol", network, "endpoint", addr) + + return grpc.Dial( + string(addr), + grpc.WithTransportCredentials(insecure.NewCredentials()), + grpc.WithContextDialer(func(ctx context.Context, target string) (net.Conn, error) { + return (&net.Dialer{}).DialContext(ctx, network, target) + }), + ) +} diff --git a/pkg/kubelet/cm/dra/plugin/plugin.go b/pkg/kubelet/cm/dra/plugin/plugin.go new file mode 100644 index 00000000000..0f2a1ff4cb4 --- /dev/null +++ b/pkg/kubelet/cm/dra/plugin/plugin.go @@ -0,0 +1,178 @@ +/* +Copyright 2022 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package plugin + +import ( + "errors" + "fmt" + "strings" + + utilversion "k8s.io/apimachinery/pkg/util/version" + "k8s.io/klog/v2" +) + +const ( + // DRAPluginName is the name of the in-tree DRA Plugin. + DRAPluginName = "kubernetes.io/dra" +) + +// draPlugins map keeps track of all registered DRA plugins on the node +// and their corresponding sockets. +var draPlugins = &PluginsStore{} + +// RegistrationHandler is the handler which is fed to the pluginwatcher API. +type RegistrationHandler struct{} + +// NewPluginHandler returns new registration handler. +func NewRegistrationHandler() *RegistrationHandler { + return &RegistrationHandler{} +} + +// RegisterPlugin is called when a plugin can be registered. +func (h *RegistrationHandler) RegisterPlugin(pluginName string, endpoint string, versions []string) error { + klog.InfoS("Register new DRA plugin", "name", pluginName, "endpoint", endpoint) + + highestSupportedVersion, err := h.validateVersions("RegisterPlugin", pluginName, versions) + if err != nil { + return err + } + + // Storing endpoint of newly registered DRA Plugin into the map, where plugin name will be the key + // all other DRA components will be able to get the actual socket of DRA plugins by its name. + draPlugins.Set(pluginName, &Plugin{ + endpoint: endpoint, + highestSupportedVersion: highestSupportedVersion, + }) + + return nil +} + +// Return the highest supported version. +func highestSupportedVersion(versions []string) (*utilversion.Version, error) { + if len(versions) == 0 { + return nil, errors.New(log("DRA plugin reporting empty array for supported versions")) + } + + var highestSupportedVersion *utilversion.Version + + var theErr error + + for i := len(versions) - 1; i >= 0; i-- { + currentHighestVer, err := utilversion.ParseGeneric(versions[i]) + if err != nil { + theErr = err + + continue + } + + if currentHighestVer.Major() > 1 { + // DRA currently only has version 1.x + continue + } + + if highestSupportedVersion == nil || highestSupportedVersion.LessThan(currentHighestVer) { + highestSupportedVersion = currentHighestVer + } + } + + if highestSupportedVersion == nil { + return nil, fmt.Errorf( + "could not find a highest supported version from versions (%v) reported by this plugin: %+v", + versions, theErr) + } + + if highestSupportedVersion.Major() != 1 { + return nil, fmt.Errorf("highest supported version reported by plugin is %v, must be v1.x", highestSupportedVersion) + } + + return highestSupportedVersion, nil +} + +func (h *RegistrationHandler) validateVersions( + callerName string, + pluginName string, + versions []string, +) (*utilversion.Version, error) { + if len(versions) == 0 { + return nil, errors.New( + log( + "%s for DRA plugin %q failed. Plugin returned an empty list for supported versions", + callerName, + pluginName, + ), + ) + } + + // Validate version + newPluginHighestVersion, err := highestSupportedVersion(versions) + if err != nil { + return nil, errors.New( + log( + "%s for DRA plugin %q failed. None of the versions specified %q are supported. err=%v", + callerName, + pluginName, + versions, + err, + ), + ) + } + + existingPlugin := draPlugins.Get(pluginName) + if existingPlugin != nil { + if !existingPlugin.highestSupportedVersion.LessThan(newPluginHighestVersion) { + return nil, errors.New( + log( + "%s for DRA plugin %q failed. Another plugin with the same name is already registered with a higher supported version: %q", + callerName, + pluginName, + existingPlugin.highestSupportedVersion, + ), + ) + } + } + + return newPluginHighestVersion, nil +} + +func unregisterPlugin(pluginName string) { + draPlugins.Delete(pluginName) +} + +// DeRegisterPlugin is called when a plugin has removed its socket, +// signaling it is no longer available. +func (h *RegistrationHandler) DeRegisterPlugin(pluginName string) { + klog.InfoS("DeRegister DRA plugin", "name", pluginName) + unregisterPlugin(pluginName) +} + +// ValidatePlugin is called by kubelet's plugin watcher upon detection +// of a new registration socket opened by DRA plugin. +func (h *RegistrationHandler) ValidatePlugin(pluginName string, endpoint string, versions []string) error { + klog.InfoS("Validate DRA plugin", "name", pluginName, "endpoint", endpoint, "versions", strings.Join(versions, ",")) + + _, err := h.validateVersions("ValidatePlugin", pluginName, versions) + if err != nil { + return fmt.Errorf("validation failed for DRA plugin %s at endpoint %s: %+v", pluginName, endpoint, err) + } + + return err +} + +// log prepends log string with `kubernetes.io/dra`. +func log(msg string, parts ...interface{}) string { + return fmt.Sprintf(fmt.Sprintf("%s: %s", DRAPluginName, msg), parts...) +} diff --git a/pkg/kubelet/cm/dra/plugin/plugins_store.go b/pkg/kubelet/cm/dra/plugin/plugins_store.go new file mode 100644 index 00000000000..90adb702ac7 --- /dev/null +++ b/pkg/kubelet/cm/dra/plugin/plugins_store.go @@ -0,0 +1,76 @@ +/* +Copyright 2019 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package plugin + +import ( + "sync" + + utilversion "k8s.io/apimachinery/pkg/util/version" +) + +// Plugin is a description of a DRA Plugin, defined by an endpoint +// and the highest DRA version supported. +type Plugin struct { + endpoint string + highestSupportedVersion *utilversion.Version +} + +// PluginsStore holds a list of DRA Plugins. +type PluginsStore struct { + sync.RWMutex + store map[string]*Plugin +} + +// Get lets you retrieve a DRA Plugin by name. +// This method is protected by a mutex. +func (s *PluginsStore) Get(pluginName string) *Plugin { + s.RLock() + defer s.RUnlock() + + return s.store[pluginName] +} + +// Set lets you save a DRA Plugin to the list and give it a specific name. +// This method is protected by a mutex. +func (s *PluginsStore) Set(pluginName string, plugin *Plugin) { + s.Lock() + defer s.Unlock() + + if s.store == nil { + s.store = make(map[string]*Plugin) + } + + s.store[pluginName] = plugin +} + +// Delete lets you delete a DRA Plugin by name. +// This method is protected by a mutex. +func (s *PluginsStore) Delete(pluginName string) { + s.Lock() + defer s.Unlock() + + delete(s.store, pluginName) +} + +// Clear deletes all entries in the store. +// This methiod is protected by a mutex. +func (s *PluginsStore) Clear() { + s.Lock() + defer s.Unlock() + + s.store = make(map[string]*Plugin) +} diff --git a/pkg/kubelet/cm/dra/types.go b/pkg/kubelet/cm/dra/types.go new file mode 100644 index 00000000000..0f1ab678bc1 --- /dev/null +++ b/pkg/kubelet/cm/dra/types.go @@ -0,0 +1,39 @@ +/* +Copyright 2022 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package dra + +import ( + v1 "k8s.io/api/core/v1" + kubecontainer "k8s.io/kubernetes/pkg/kubelet/container" +) + +// Manager manages all the DRA resource plugins running on a node. +type Manager interface { + // PrepareResources prepares resources for a container in a pod. + // It communicates with the DRA resource plugin to prepare resources and + // returns resource info to trigger CDI injection by the runtime. + PrepareResources(pod *v1.Pod, container *v1.Container) (*ContainerInfo, error) + + // UnprepareResources calls NodeUnprepareResource GRPC from DRA plugin to unprepare pod resources + UnprepareResources(pod *v1.Pod) error +} + +// ContainerInfo contains information required by the runtime to consume prepared resources. +type ContainerInfo struct { + // The Annotations for the container + Annotations []kubecontainer.Annotation +} diff --git a/pkg/kubelet/cm/fake_container_manager.go b/pkg/kubelet/cm/fake_container_manager.go index 685246276c5..06ba8b872d4 100644 --- a/pkg/kubelet/cm/fake_container_manager.go +++ b/pkg/kubelet/cm/fake_container_manager.go @@ -25,6 +25,7 @@ import ( internalapi "k8s.io/cri-api/pkg/apis" podresourcesapi "k8s.io/kubelet/pkg/apis/podresources/v1" "k8s.io/kubernetes/pkg/kubelet/cm/cpumanager" + "k8s.io/kubernetes/pkg/kubelet/cm/dra" "k8s.io/kubernetes/pkg/kubelet/cm/memorymanager" "k8s.io/kubernetes/pkg/kubelet/cm/topologymanager" "k8s.io/kubernetes/pkg/kubelet/config" @@ -236,3 +237,11 @@ func (cm *FakeContainerManager) GetNodeAllocatableAbsolute() v1.ResourceList { defer cm.Unlock() return nil } + +func (cm *FakeContainerManager) PrepareResources(pod *v1.Pod, container *v1.Container) (*dra.ContainerInfo, error) { + return nil, nil +} + +func (cm *FakeContainerManager) UnprepareResources(*v1.Pod) error { + return nil +} diff --git a/pkg/kubelet/kubelet.go b/pkg/kubelet/kubelet.go index 1a3916a3faa..1a03b11ed46 100644 --- a/pkg/kubelet/kubelet.go +++ b/pkg/kubelet/kubelet.go @@ -72,6 +72,7 @@ import ( kubeletcertificate "k8s.io/kubernetes/pkg/kubelet/certificate" "k8s.io/kubernetes/pkg/kubelet/cloudresource" "k8s.io/kubernetes/pkg/kubelet/cm" + draplugin "k8s.io/kubernetes/pkg/kubelet/cm/dra/plugin" "k8s.io/kubernetes/pkg/kubelet/config" "k8s.io/kubernetes/pkg/kubelet/configmap" kubecontainer "k8s.io/kubernetes/pkg/kubelet/container" @@ -1473,8 +1474,13 @@ func (kl *Kubelet) initializeRuntimeDependentModules() { kl.containerLogManager.Start() // Adding Registration Callback function for CSI Driver kl.pluginManager.AddHandler(pluginwatcherapi.CSIPlugin, plugincache.PluginHandler(csi.PluginHandler)) + // Adding Registration Callback function for DRA Plugin + if utilfeature.DefaultFeatureGate.Enabled(features.DynamicResourceAllocation) { + kl.pluginManager.AddHandler(pluginwatcherapi.DRAPlugin, plugincache.PluginHandler(draplugin.NewRegistrationHandler())) + } // Adding Registration Callback function for Device Manager kl.pluginManager.AddHandler(pluginwatcherapi.DevicePlugin, kl.containerManager.GetPluginRegistrationHandler()) + // Start the plugin manager klog.V(4).InfoS("Starting plugin manager") go kl.pluginManager.Run(kl.sourcesReady, wait.NeverStop) @@ -1936,6 +1942,15 @@ func (kl *Kubelet) syncTerminatingPod(_ context.Context, pod *v1.Pod, podStatus return fmt.Errorf("detected running containers after a successful KillPod, CRI violation: %v", runningContainers) } + // NOTE: resources must be unprepared AFTER all containers have stopped + // and BEFORE the pod status is changed on the API server + // to avoid race conditions with the resource deallocation code in kubernetes core. + if utilfeature.DefaultFeatureGate.Enabled(features.DynamicResourceAllocation) { + if err := kl.containerManager.UnprepareResources(pod); err != nil { + return err + } + } + // we have successfully stopped all containers, the pod is terminating, our status is "done" klog.V(4).InfoS("Pod termination stopped all running containers", "pod", klog.KObj(pod), "podUID", pod.UID) @@ -1953,6 +1968,7 @@ func (kl *Kubelet) syncTerminatedPod(ctx context.Context, pod *v1.Pod, podStatus // generate the final status of the pod // TODO: should we simply fold this into TerminatePod? that would give a single pod update apiPodStatus := kl.generateAPIPodStatus(pod, podStatus) + kl.statusManager.SetPodStatus(pod, apiPodStatus) // volumes are unmounted after the pod worker reports ShouldPodRuntimeBeRemoved (which is satisfied diff --git a/plugin/pkg/auth/authorizer/rbac/bootstrappolicy/policy.go b/plugin/pkg/auth/authorizer/rbac/bootstrappolicy/policy.go index 882c8b49b0a..d0c023da03d 100644 --- a/plugin/pkg/auth/authorizer/rbac/bootstrappolicy/policy.go +++ b/plugin/pkg/auth/authorizer/rbac/bootstrappolicy/policy.go @@ -175,6 +175,12 @@ func NodeRules() []rbacv1.PolicyRule { // RuntimeClass nodePolicyRules = append(nodePolicyRules, rbacv1helpers.NewRule("get", "list", "watch").Groups("node.k8s.io").Resources("runtimeclasses").RuleOrDie()) + + // DRA Resource Claims + if utilfeature.DefaultFeatureGate.Enabled(features.DynamicResourceAllocation) { + nodePolicyRules = append(nodePolicyRules, rbacv1helpers.NewRule("get").Groups(resourceGroup).Resources("resourceclaims").RuleOrDie()) + } + return nodePolicyRules } diff --git a/staging/src/k8s.io/kubelet/pkg/apis/dra/OWNERS b/staging/src/k8s.io/kubelet/pkg/apis/dra/OWNERS new file mode 100644 index 00000000000..03cc9aff5b4 --- /dev/null +++ b/staging/src/k8s.io/kubelet/pkg/apis/dra/OWNERS @@ -0,0 +1,11 @@ +# See the OWNERS docs at https://go.k8s.io/owners + +approvers: + - klueska + - pohly +reviewers: + - klueska + - pohly + - bart0sh +labels: + - sig/node diff --git a/staging/src/k8s.io/kubelet/pkg/apis/dra/v1alpha1/api.pb.go b/staging/src/k8s.io/kubelet/pkg/apis/dra/v1alpha1/api.pb.go new file mode 100644 index 00000000000..bd50df30554 --- /dev/null +++ b/staging/src/k8s.io/kubelet/pkg/apis/dra/v1alpha1/api.pb.go @@ -0,0 +1,1314 @@ +/* +Copyright The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +// Code generated by protoc-gen-gogo. DO NOT EDIT. +// source: api.proto + +package dra + +import ( + context "context" + fmt "fmt" + _ "github.com/gogo/protobuf/gogoproto" + proto "github.com/gogo/protobuf/proto" + grpc "google.golang.org/grpc" + codes "google.golang.org/grpc/codes" + status "google.golang.org/grpc/status" + io "io" + math "math" + math_bits "math/bits" + reflect "reflect" + strings "strings" +) + +// Reference imports to suppress errors if they are not otherwise used. +var _ = proto.Marshal +var _ = fmt.Errorf +var _ = math.Inf + +// This is a compile-time assertion to ensure that this generated file +// is compatible with the proto package it is being compiled against. +// A compilation error at this line likely means your copy of the +// proto package needs to be updated. +const _ = proto.GoGoProtoPackageIsVersion3 // please upgrade the proto package + +type NodePrepareResourceRequest struct { + // The ResourceClaim namespace (ResourceClaim.meta.Namespace). + // This field is REQUIRED. + Namespace string `protobuf:"bytes,1,opt,name=namespace,proto3" json:"namespace,omitempty"` + // The UID of the Resource claim (ResourceClaim.meta.UUID). + // This field is REQUIRED. + ClaimUid string `protobuf:"bytes,2,opt,name=claim_uid,json=claimUid,proto3" json:"claim_uid,omitempty"` + // The name of the Resource claim (ResourceClaim.meta.Name) + // This field is REQUIRED. + ClaimName string `protobuf:"bytes,3,opt,name=claim_name,json=claimName,proto3" json:"claim_name,omitempty"` + // Resource handle (AllocationResult.ResourceHandle) + // This field is REQUIRED. + ResourceHandle string `protobuf:"bytes,4,opt,name=resource_handle,json=resourceHandle,proto3" json:"resource_handle,omitempty"` + XXX_NoUnkeyedLiteral struct{} `json:"-"` + XXX_sizecache int32 `json:"-"` +} + +func (m *NodePrepareResourceRequest) Reset() { *m = NodePrepareResourceRequest{} } +func (*NodePrepareResourceRequest) ProtoMessage() {} +func (*NodePrepareResourceRequest) Descriptor() ([]byte, []int) { + return fileDescriptor_00212fb1f9d3bf1c, []int{0} +} +func (m *NodePrepareResourceRequest) XXX_Unmarshal(b []byte) error { + return m.Unmarshal(b) +} +func (m *NodePrepareResourceRequest) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + if deterministic { + return xxx_messageInfo_NodePrepareResourceRequest.Marshal(b, m, deterministic) + } else { + b = b[:cap(b)] + n, err := m.MarshalToSizedBuffer(b) + if err != nil { + return nil, err + } + return b[:n], nil + } +} +func (m *NodePrepareResourceRequest) XXX_Merge(src proto.Message) { + xxx_messageInfo_NodePrepareResourceRequest.Merge(m, src) +} +func (m *NodePrepareResourceRequest) XXX_Size() int { + return m.Size() +} +func (m *NodePrepareResourceRequest) XXX_DiscardUnknown() { + xxx_messageInfo_NodePrepareResourceRequest.DiscardUnknown(m) +} + +var xxx_messageInfo_NodePrepareResourceRequest proto.InternalMessageInfo + +func (m *NodePrepareResourceRequest) GetNamespace() string { + if m != nil { + return m.Namespace + } + return "" +} + +func (m *NodePrepareResourceRequest) GetClaimUid() string { + if m != nil { + return m.ClaimUid + } + return "" +} + +func (m *NodePrepareResourceRequest) GetClaimName() string { + if m != nil { + return m.ClaimName + } + return "" +} + +func (m *NodePrepareResourceRequest) GetResourceHandle() string { + if m != nil { + return m.ResourceHandle + } + return "" +} + +type NodePrepareResourceResponse struct { + // These are the additional devices that kubelet must + // make available via the container runtime. A resource + // may have zero or more devices. + CdiDevices []string `protobuf:"bytes,1,rep,name=cdi_devices,json=cdiDevices,proto3" json:"cdi_devices,omitempty"` + XXX_NoUnkeyedLiteral struct{} `json:"-"` + XXX_sizecache int32 `json:"-"` +} + +func (m *NodePrepareResourceResponse) Reset() { *m = NodePrepareResourceResponse{} } +func (*NodePrepareResourceResponse) ProtoMessage() {} +func (*NodePrepareResourceResponse) Descriptor() ([]byte, []int) { + return fileDescriptor_00212fb1f9d3bf1c, []int{1} +} +func (m *NodePrepareResourceResponse) XXX_Unmarshal(b []byte) error { + return m.Unmarshal(b) +} +func (m *NodePrepareResourceResponse) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + if deterministic { + return xxx_messageInfo_NodePrepareResourceResponse.Marshal(b, m, deterministic) + } else { + b = b[:cap(b)] + n, err := m.MarshalToSizedBuffer(b) + if err != nil { + return nil, err + } + return b[:n], nil + } +} +func (m *NodePrepareResourceResponse) XXX_Merge(src proto.Message) { + xxx_messageInfo_NodePrepareResourceResponse.Merge(m, src) +} +func (m *NodePrepareResourceResponse) XXX_Size() int { + return m.Size() +} +func (m *NodePrepareResourceResponse) XXX_DiscardUnknown() { + xxx_messageInfo_NodePrepareResourceResponse.DiscardUnknown(m) +} + +var xxx_messageInfo_NodePrepareResourceResponse proto.InternalMessageInfo + +func (m *NodePrepareResourceResponse) GetCdiDevices() []string { + if m != nil { + return m.CdiDevices + } + return nil +} + +type NodeUnprepareResourceRequest struct { + // The ResourceClaim namespace (ResourceClaim.meta.Namespace). + // This field is REQUIRED. + Namespace string `protobuf:"bytes,1,opt,name=namespace,proto3" json:"namespace,omitempty"` + // The UID of the Resource claim (ResourceClaim.meta.UUID). + // This field is REQUIRED. + ClaimUid string `protobuf:"bytes,2,opt,name=claim_uid,json=claimUid,proto3" json:"claim_uid,omitempty"` + // The name of the Resource claim (ResourceClaim.meta.Name) + // This field is REQUIRED. + ClaimName string `protobuf:"bytes,3,opt,name=claim_name,json=claimName,proto3" json:"claim_name,omitempty"` + // List of fully qualified CDI device names + // Kubelet plugin returns them in the NodePrepareResourceResponse + CdiDevices []string `protobuf:"bytes,4,rep,name=cdi_devices,json=cdiDevices,proto3" json:"cdi_devices,omitempty"` + XXX_NoUnkeyedLiteral struct{} `json:"-"` + XXX_sizecache int32 `json:"-"` +} + +func (m *NodeUnprepareResourceRequest) Reset() { *m = NodeUnprepareResourceRequest{} } +func (*NodeUnprepareResourceRequest) ProtoMessage() {} +func (*NodeUnprepareResourceRequest) Descriptor() ([]byte, []int) { + return fileDescriptor_00212fb1f9d3bf1c, []int{2} +} +func (m *NodeUnprepareResourceRequest) XXX_Unmarshal(b []byte) error { + return m.Unmarshal(b) +} +func (m *NodeUnprepareResourceRequest) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + if deterministic { + return xxx_messageInfo_NodeUnprepareResourceRequest.Marshal(b, m, deterministic) + } else { + b = b[:cap(b)] + n, err := m.MarshalToSizedBuffer(b) + if err != nil { + return nil, err + } + return b[:n], nil + } +} +func (m *NodeUnprepareResourceRequest) XXX_Merge(src proto.Message) { + xxx_messageInfo_NodeUnprepareResourceRequest.Merge(m, src) +} +func (m *NodeUnprepareResourceRequest) XXX_Size() int { + return m.Size() +} +func (m *NodeUnprepareResourceRequest) XXX_DiscardUnknown() { + xxx_messageInfo_NodeUnprepareResourceRequest.DiscardUnknown(m) +} + +var xxx_messageInfo_NodeUnprepareResourceRequest proto.InternalMessageInfo + +func (m *NodeUnprepareResourceRequest) GetNamespace() string { + if m != nil { + return m.Namespace + } + return "" +} + +func (m *NodeUnprepareResourceRequest) GetClaimUid() string { + if m != nil { + return m.ClaimUid + } + return "" +} + +func (m *NodeUnprepareResourceRequest) GetClaimName() string { + if m != nil { + return m.ClaimName + } + return "" +} + +func (m *NodeUnprepareResourceRequest) GetCdiDevices() []string { + if m != nil { + return m.CdiDevices + } + return nil +} + +type NodeUnprepareResourceResponse struct { + XXX_NoUnkeyedLiteral struct{} `json:"-"` + XXX_sizecache int32 `json:"-"` +} + +func (m *NodeUnprepareResourceResponse) Reset() { *m = NodeUnprepareResourceResponse{} } +func (*NodeUnprepareResourceResponse) ProtoMessage() {} +func (*NodeUnprepareResourceResponse) Descriptor() ([]byte, []int) { + return fileDescriptor_00212fb1f9d3bf1c, []int{3} +} +func (m *NodeUnprepareResourceResponse) XXX_Unmarshal(b []byte) error { + return m.Unmarshal(b) +} +func (m *NodeUnprepareResourceResponse) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + if deterministic { + return xxx_messageInfo_NodeUnprepareResourceResponse.Marshal(b, m, deterministic) + } else { + b = b[:cap(b)] + n, err := m.MarshalToSizedBuffer(b) + if err != nil { + return nil, err + } + return b[:n], nil + } +} +func (m *NodeUnprepareResourceResponse) XXX_Merge(src proto.Message) { + xxx_messageInfo_NodeUnprepareResourceResponse.Merge(m, src) +} +func (m *NodeUnprepareResourceResponse) XXX_Size() int { + return m.Size() +} +func (m *NodeUnprepareResourceResponse) XXX_DiscardUnknown() { + xxx_messageInfo_NodeUnprepareResourceResponse.DiscardUnknown(m) +} + +var xxx_messageInfo_NodeUnprepareResourceResponse proto.InternalMessageInfo + +func init() { + proto.RegisterType((*NodePrepareResourceRequest)(nil), "v1alpha1.NodePrepareResourceRequest") + proto.RegisterType((*NodePrepareResourceResponse)(nil), "v1alpha1.NodePrepareResourceResponse") + proto.RegisterType((*NodeUnprepareResourceRequest)(nil), "v1alpha1.NodeUnprepareResourceRequest") + proto.RegisterType((*NodeUnprepareResourceResponse)(nil), "v1alpha1.NodeUnprepareResourceResponse") +} + +func init() { proto.RegisterFile("api.proto", fileDescriptor_00212fb1f9d3bf1c) } + +var fileDescriptor_00212fb1f9d3bf1c = []byte{ + // 348 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xbc, 0x52, 0x31, 0x4f, 0x32, 0x41, + 0x10, 0x65, 0x3f, 0xc8, 0x17, 0x6e, 0x4c, 0x34, 0x59, 0x63, 0x72, 0x39, 0xe0, 0xc0, 0x8b, 0x0a, + 0x8d, 0x47, 0xd0, 0xde, 0xc2, 0x58, 0x58, 0x11, 0x73, 0x09, 0x8d, 0x0d, 0x59, 0x76, 0x47, 0x58, + 0xc3, 0xdd, 0xae, 0x77, 0x1c, 0xb5, 0x3f, 0xc1, 0xd6, 0xca, 0xbf, 0x43, 0x69, 0x49, 0x29, 0xe7, + 0x1f, 0x31, 0xec, 0x49, 0x8c, 0x0a, 0xa1, 0xb3, 0xdb, 0x79, 0xf3, 0x66, 0xde, 0x9b, 0x99, 0x05, + 0x8b, 0x69, 0xe9, 0xeb, 0x58, 0x4d, 0x14, 0x2d, 0x4f, 0x3b, 0x6c, 0xac, 0x47, 0xac, 0xe3, 0x9c, + 0x0e, 0xe5, 0x64, 0x94, 0x0e, 0x7c, 0xae, 0xc2, 0xf6, 0x50, 0x0d, 0x55, 0xdb, 0x10, 0x06, 0xe9, + 0x9d, 0x89, 0x4c, 0x60, 0x5e, 0x79, 0xa1, 0xf7, 0x42, 0xc0, 0xe9, 0x2a, 0x81, 0x37, 0x31, 0x6a, + 0x16, 0x63, 0x80, 0x89, 0x4a, 0x63, 0x8e, 0x01, 0x3e, 0xa4, 0x98, 0x4c, 0x68, 0x15, 0xac, 0x88, + 0x85, 0x98, 0x68, 0xc6, 0xd1, 0x26, 0x0d, 0xd2, 0xb2, 0x82, 0x2f, 0x80, 0x56, 0xc0, 0xe2, 0x63, + 0x26, 0xc3, 0x7e, 0x2a, 0x85, 0xfd, 0xcf, 0x64, 0xcb, 0x06, 0xe8, 0x49, 0x41, 0x6b, 0x00, 0x79, + 0x72, 0xc9, 0xb7, 0x8b, 0x79, 0xad, 0x41, 0xba, 0x2c, 0x44, 0xda, 0x84, 0xbd, 0xf8, 0x53, 0xac, + 0x3f, 0x62, 0x91, 0x18, 0xa3, 0x5d, 0x32, 0x9c, 0xdd, 0x15, 0x7c, 0x6d, 0x50, 0xef, 0x02, 0x2a, + 0x6b, 0x0d, 0x26, 0x5a, 0x45, 0x09, 0xd2, 0x3a, 0xec, 0x70, 0x21, 0xfb, 0x02, 0xa7, 0x92, 0x63, + 0x62, 0x93, 0x46, 0xb1, 0x65, 0x05, 0xc0, 0x85, 0xbc, 0xca, 0x11, 0xef, 0x99, 0x40, 0x75, 0xd9, + 0xa0, 0x17, 0xe9, 0xbf, 0x9e, 0xf1, 0x87, 0xb7, 0xd2, 0x2f, 0x6f, 0x75, 0xa8, 0x6d, 0xb0, 0x96, + 0x4f, 0x77, 0x36, 0x27, 0x50, 0x5a, 0x32, 0xa8, 0x80, 0xfd, 0x35, 0x5b, 0xa0, 0x47, 0xfe, 0xea, + 0xf0, 0xfe, 0xe6, 0x2b, 0x3a, 0xc7, 0x5b, 0x58, 0xb9, 0x98, 0x57, 0xa0, 0xf7, 0x70, 0xb0, 0xd6, + 0x0f, 0x3d, 0xf9, 0xde, 0x61, 0xd3, 0x2e, 0x9d, 0xe6, 0x56, 0xde, 0x4a, 0xeb, 0xf2, 0x70, 0xb6, + 0x70, 0xc9, 0x7c, 0xe1, 0x16, 0x1e, 0x33, 0x97, 0xcc, 0x32, 0x97, 0xbc, 0x66, 0x2e, 0x79, 0xcb, + 0x5c, 0xf2, 0xf4, 0xee, 0x16, 0x6e, 0x8b, 0x22, 0x66, 0x83, 0xff, 0xe6, 0x8f, 0x9e, 0x7f, 0x04, + 0x00, 0x00, 0xff, 0xff, 0xab, 0xb6, 0x01, 0xe3, 0xe9, 0x02, 0x00, 0x00, +} + +// Reference imports to suppress errors if they are not otherwise used. +var _ context.Context +var _ grpc.ClientConn + +// This is a compile-time assertion to ensure that this generated file +// is compatible with the grpc package it is being compiled against. +const _ = grpc.SupportPackageIsVersion4 + +// NodeClient is the client API for Node service. +// +// For semantics around ctx use and closing/ending streaming RPCs, please refer to https://godoc.org/google.golang.org/grpc#ClientConn.NewStream. +type NodeClient interface { + NodePrepareResource(ctx context.Context, in *NodePrepareResourceRequest, opts ...grpc.CallOption) (*NodePrepareResourceResponse, error) + NodeUnprepareResource(ctx context.Context, in *NodeUnprepareResourceRequest, opts ...grpc.CallOption) (*NodeUnprepareResourceResponse, error) +} + +type nodeClient struct { + cc *grpc.ClientConn +} + +func NewNodeClient(cc *grpc.ClientConn) NodeClient { + return &nodeClient{cc} +} + +func (c *nodeClient) NodePrepareResource(ctx context.Context, in *NodePrepareResourceRequest, opts ...grpc.CallOption) (*NodePrepareResourceResponse, error) { + out := new(NodePrepareResourceResponse) + err := c.cc.Invoke(ctx, "/v1alpha1.Node/NodePrepareResource", in, out, opts...) + if err != nil { + return nil, err + } + return out, nil +} + +func (c *nodeClient) NodeUnprepareResource(ctx context.Context, in *NodeUnprepareResourceRequest, opts ...grpc.CallOption) (*NodeUnprepareResourceResponse, error) { + out := new(NodeUnprepareResourceResponse) + err := c.cc.Invoke(ctx, "/v1alpha1.Node/NodeUnprepareResource", in, out, opts...) + if err != nil { + return nil, err + } + return out, nil +} + +// NodeServer is the server API for Node service. +type NodeServer interface { + NodePrepareResource(context.Context, *NodePrepareResourceRequest) (*NodePrepareResourceResponse, error) + NodeUnprepareResource(context.Context, *NodeUnprepareResourceRequest) (*NodeUnprepareResourceResponse, error) +} + +// UnimplementedNodeServer can be embedded to have forward compatible implementations. +type UnimplementedNodeServer struct { +} + +func (*UnimplementedNodeServer) NodePrepareResource(ctx context.Context, req *NodePrepareResourceRequest) (*NodePrepareResourceResponse, error) { + return nil, status.Errorf(codes.Unimplemented, "method NodePrepareResource not implemented") +} +func (*UnimplementedNodeServer) NodeUnprepareResource(ctx context.Context, req *NodeUnprepareResourceRequest) (*NodeUnprepareResourceResponse, error) { + return nil, status.Errorf(codes.Unimplemented, "method NodeUnprepareResource not implemented") +} + +func RegisterNodeServer(s *grpc.Server, srv NodeServer) { + s.RegisterService(&_Node_serviceDesc, srv) +} + +func _Node_NodePrepareResource_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { + in := new(NodePrepareResourceRequest) + if err := dec(in); err != nil { + return nil, err + } + if interceptor == nil { + return srv.(NodeServer).NodePrepareResource(ctx, in) + } + info := &grpc.UnaryServerInfo{ + Server: srv, + FullMethod: "/v1alpha1.Node/NodePrepareResource", + } + handler := func(ctx context.Context, req interface{}) (interface{}, error) { + return srv.(NodeServer).NodePrepareResource(ctx, req.(*NodePrepareResourceRequest)) + } + return interceptor(ctx, in, info, handler) +} + +func _Node_NodeUnprepareResource_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { + in := new(NodeUnprepareResourceRequest) + if err := dec(in); err != nil { + return nil, err + } + if interceptor == nil { + return srv.(NodeServer).NodeUnprepareResource(ctx, in) + } + info := &grpc.UnaryServerInfo{ + Server: srv, + FullMethod: "/v1alpha1.Node/NodeUnprepareResource", + } + handler := func(ctx context.Context, req interface{}) (interface{}, error) { + return srv.(NodeServer).NodeUnprepareResource(ctx, req.(*NodeUnprepareResourceRequest)) + } + return interceptor(ctx, in, info, handler) +} + +var _Node_serviceDesc = grpc.ServiceDesc{ + ServiceName: "v1alpha1.Node", + HandlerType: (*NodeServer)(nil), + Methods: []grpc.MethodDesc{ + { + MethodName: "NodePrepareResource", + Handler: _Node_NodePrepareResource_Handler, + }, + { + MethodName: "NodeUnprepareResource", + Handler: _Node_NodeUnprepareResource_Handler, + }, + }, + Streams: []grpc.StreamDesc{}, + Metadata: "api.proto", +} + +func (m *NodePrepareResourceRequest) Marshal() (dAtA []byte, err error) { + size := m.Size() + dAtA = make([]byte, size) + n, err := m.MarshalToSizedBuffer(dAtA[:size]) + if err != nil { + return nil, err + } + return dAtA[:n], nil +} + +func (m *NodePrepareResourceRequest) MarshalTo(dAtA []byte) (int, error) { + size := m.Size() + return m.MarshalToSizedBuffer(dAtA[:size]) +} + +func (m *NodePrepareResourceRequest) MarshalToSizedBuffer(dAtA []byte) (int, error) { + i := len(dAtA) + _ = i + var l int + _ = l + if len(m.ResourceHandle) > 0 { + i -= len(m.ResourceHandle) + copy(dAtA[i:], m.ResourceHandle) + i = encodeVarintApi(dAtA, i, uint64(len(m.ResourceHandle))) + i-- + dAtA[i] = 0x22 + } + if len(m.ClaimName) > 0 { + i -= len(m.ClaimName) + copy(dAtA[i:], m.ClaimName) + i = encodeVarintApi(dAtA, i, uint64(len(m.ClaimName))) + i-- + dAtA[i] = 0x1a + } + if len(m.ClaimUid) > 0 { + i -= len(m.ClaimUid) + copy(dAtA[i:], m.ClaimUid) + i = encodeVarintApi(dAtA, i, uint64(len(m.ClaimUid))) + i-- + dAtA[i] = 0x12 + } + if len(m.Namespace) > 0 { + i -= len(m.Namespace) + copy(dAtA[i:], m.Namespace) + i = encodeVarintApi(dAtA, i, uint64(len(m.Namespace))) + i-- + dAtA[i] = 0xa + } + return len(dAtA) - i, nil +} + +func (m *NodePrepareResourceResponse) Marshal() (dAtA []byte, err error) { + size := m.Size() + dAtA = make([]byte, size) + n, err := m.MarshalToSizedBuffer(dAtA[:size]) + if err != nil { + return nil, err + } + return dAtA[:n], nil +} + +func (m *NodePrepareResourceResponse) MarshalTo(dAtA []byte) (int, error) { + size := m.Size() + return m.MarshalToSizedBuffer(dAtA[:size]) +} + +func (m *NodePrepareResourceResponse) MarshalToSizedBuffer(dAtA []byte) (int, error) { + i := len(dAtA) + _ = i + var l int + _ = l + if len(m.CdiDevices) > 0 { + for iNdEx := len(m.CdiDevices) - 1; iNdEx >= 0; iNdEx-- { + i -= len(m.CdiDevices[iNdEx]) + copy(dAtA[i:], m.CdiDevices[iNdEx]) + i = encodeVarintApi(dAtA, i, uint64(len(m.CdiDevices[iNdEx]))) + i-- + dAtA[i] = 0xa + } + } + return len(dAtA) - i, nil +} + +func (m *NodeUnprepareResourceRequest) Marshal() (dAtA []byte, err error) { + size := m.Size() + dAtA = make([]byte, size) + n, err := m.MarshalToSizedBuffer(dAtA[:size]) + if err != nil { + return nil, err + } + return dAtA[:n], nil +} + +func (m *NodeUnprepareResourceRequest) MarshalTo(dAtA []byte) (int, error) { + size := m.Size() + return m.MarshalToSizedBuffer(dAtA[:size]) +} + +func (m *NodeUnprepareResourceRequest) MarshalToSizedBuffer(dAtA []byte) (int, error) { + i := len(dAtA) + _ = i + var l int + _ = l + if len(m.CdiDevices) > 0 { + for iNdEx := len(m.CdiDevices) - 1; iNdEx >= 0; iNdEx-- { + i -= len(m.CdiDevices[iNdEx]) + copy(dAtA[i:], m.CdiDevices[iNdEx]) + i = encodeVarintApi(dAtA, i, uint64(len(m.CdiDevices[iNdEx]))) + i-- + dAtA[i] = 0x22 + } + } + if len(m.ClaimName) > 0 { + i -= len(m.ClaimName) + copy(dAtA[i:], m.ClaimName) + i = encodeVarintApi(dAtA, i, uint64(len(m.ClaimName))) + i-- + dAtA[i] = 0x1a + } + if len(m.ClaimUid) > 0 { + i -= len(m.ClaimUid) + copy(dAtA[i:], m.ClaimUid) + i = encodeVarintApi(dAtA, i, uint64(len(m.ClaimUid))) + i-- + dAtA[i] = 0x12 + } + if len(m.Namespace) > 0 { + i -= len(m.Namespace) + copy(dAtA[i:], m.Namespace) + i = encodeVarintApi(dAtA, i, uint64(len(m.Namespace))) + i-- + dAtA[i] = 0xa + } + return len(dAtA) - i, nil +} + +func (m *NodeUnprepareResourceResponse) Marshal() (dAtA []byte, err error) { + size := m.Size() + dAtA = make([]byte, size) + n, err := m.MarshalToSizedBuffer(dAtA[:size]) + if err != nil { + return nil, err + } + return dAtA[:n], nil +} + +func (m *NodeUnprepareResourceResponse) MarshalTo(dAtA []byte) (int, error) { + size := m.Size() + return m.MarshalToSizedBuffer(dAtA[:size]) +} + +func (m *NodeUnprepareResourceResponse) MarshalToSizedBuffer(dAtA []byte) (int, error) { + i := len(dAtA) + _ = i + var l int + _ = l + return len(dAtA) - i, nil +} + +func encodeVarintApi(dAtA []byte, offset int, v uint64) int { + offset -= sovApi(v) + base := offset + for v >= 1<<7 { + dAtA[offset] = uint8(v&0x7f | 0x80) + v >>= 7 + offset++ + } + dAtA[offset] = uint8(v) + return base +} +func (m *NodePrepareResourceRequest) Size() (n int) { + if m == nil { + return 0 + } + var l int + _ = l + l = len(m.Namespace) + if l > 0 { + n += 1 + l + sovApi(uint64(l)) + } + l = len(m.ClaimUid) + if l > 0 { + n += 1 + l + sovApi(uint64(l)) + } + l = len(m.ClaimName) + if l > 0 { + n += 1 + l + sovApi(uint64(l)) + } + l = len(m.ResourceHandle) + if l > 0 { + n += 1 + l + sovApi(uint64(l)) + } + return n +} + +func (m *NodePrepareResourceResponse) Size() (n int) { + if m == nil { + return 0 + } + var l int + _ = l + if len(m.CdiDevices) > 0 { + for _, s := range m.CdiDevices { + l = len(s) + n += 1 + l + sovApi(uint64(l)) + } + } + return n +} + +func (m *NodeUnprepareResourceRequest) Size() (n int) { + if m == nil { + return 0 + } + var l int + _ = l + l = len(m.Namespace) + if l > 0 { + n += 1 + l + sovApi(uint64(l)) + } + l = len(m.ClaimUid) + if l > 0 { + n += 1 + l + sovApi(uint64(l)) + } + l = len(m.ClaimName) + if l > 0 { + n += 1 + l + sovApi(uint64(l)) + } + if len(m.CdiDevices) > 0 { + for _, s := range m.CdiDevices { + l = len(s) + n += 1 + l + sovApi(uint64(l)) + } + } + return n +} + +func (m *NodeUnprepareResourceResponse) Size() (n int) { + if m == nil { + return 0 + } + var l int + _ = l + return n +} + +func sovApi(x uint64) (n int) { + return (math_bits.Len64(x|1) + 6) / 7 +} +func sozApi(x uint64) (n int) { + return sovApi(uint64((x << 1) ^ uint64((int64(x) >> 63)))) +} +func (this *NodePrepareResourceRequest) String() string { + if this == nil { + return "nil" + } + s := strings.Join([]string{`&NodePrepareResourceRequest{`, + `Namespace:` + fmt.Sprintf("%v", this.Namespace) + `,`, + `ClaimUid:` + fmt.Sprintf("%v", this.ClaimUid) + `,`, + `ClaimName:` + fmt.Sprintf("%v", this.ClaimName) + `,`, + `ResourceHandle:` + fmt.Sprintf("%v", this.ResourceHandle) + `,`, + `}`, + }, "") + return s +} +func (this *NodePrepareResourceResponse) String() string { + if this == nil { + return "nil" + } + s := strings.Join([]string{`&NodePrepareResourceResponse{`, + `CdiDevices:` + fmt.Sprintf("%v", this.CdiDevices) + `,`, + `}`, + }, "") + return s +} +func (this *NodeUnprepareResourceRequest) String() string { + if this == nil { + return "nil" + } + s := strings.Join([]string{`&NodeUnprepareResourceRequest{`, + `Namespace:` + fmt.Sprintf("%v", this.Namespace) + `,`, + `ClaimUid:` + fmt.Sprintf("%v", this.ClaimUid) + `,`, + `ClaimName:` + fmt.Sprintf("%v", this.ClaimName) + `,`, + `CdiDevices:` + fmt.Sprintf("%v", this.CdiDevices) + `,`, + `}`, + }, "") + return s +} +func (this *NodeUnprepareResourceResponse) String() string { + if this == nil { + return "nil" + } + s := strings.Join([]string{`&NodeUnprepareResourceResponse{`, + `}`, + }, "") + return s +} +func valueToStringApi(v interface{}) string { + rv := reflect.ValueOf(v) + if rv.IsNil() { + return "nil" + } + pv := reflect.Indirect(rv).Interface() + return fmt.Sprintf("*%v", pv) +} +func (m *NodePrepareResourceRequest) Unmarshal(dAtA []byte) error { + l := len(dAtA) + iNdEx := 0 + for iNdEx < l { + preIndex := iNdEx + var wire uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowApi + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + wire |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + fieldNum := int32(wire >> 3) + wireType := int(wire & 0x7) + if wireType == 4 { + return fmt.Errorf("proto: NodePrepareResourceRequest: wiretype end group for non-group") + } + if fieldNum <= 0 { + return fmt.Errorf("proto: NodePrepareResourceRequest: illegal tag %d (wire type %d)", fieldNum, wire) + } + switch fieldNum { + case 1: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field Namespace", wireType) + } + var stringLen uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowApi + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + stringLen |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + intStringLen := int(stringLen) + if intStringLen < 0 { + return ErrInvalidLengthApi + } + postIndex := iNdEx + intStringLen + if postIndex < 0 { + return ErrInvalidLengthApi + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + m.Namespace = string(dAtA[iNdEx:postIndex]) + iNdEx = postIndex + case 2: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field ClaimUid", wireType) + } + var stringLen uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowApi + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + stringLen |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + intStringLen := int(stringLen) + if intStringLen < 0 { + return ErrInvalidLengthApi + } + postIndex := iNdEx + intStringLen + if postIndex < 0 { + return ErrInvalidLengthApi + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + m.ClaimUid = string(dAtA[iNdEx:postIndex]) + iNdEx = postIndex + case 3: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field ClaimName", wireType) + } + var stringLen uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowApi + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + stringLen |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + intStringLen := int(stringLen) + if intStringLen < 0 { + return ErrInvalidLengthApi + } + postIndex := iNdEx + intStringLen + if postIndex < 0 { + return ErrInvalidLengthApi + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + m.ClaimName = string(dAtA[iNdEx:postIndex]) + iNdEx = postIndex + case 4: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field ResourceHandle", wireType) + } + var stringLen uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowApi + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + stringLen |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + intStringLen := int(stringLen) + if intStringLen < 0 { + return ErrInvalidLengthApi + } + postIndex := iNdEx + intStringLen + if postIndex < 0 { + return ErrInvalidLengthApi + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + m.ResourceHandle = string(dAtA[iNdEx:postIndex]) + iNdEx = postIndex + default: + iNdEx = preIndex + skippy, err := skipApi(dAtA[iNdEx:]) + if err != nil { + return err + } + if (skippy < 0) || (iNdEx+skippy) < 0 { + return ErrInvalidLengthApi + } + if (iNdEx + skippy) > l { + return io.ErrUnexpectedEOF + } + iNdEx += skippy + } + } + + if iNdEx > l { + return io.ErrUnexpectedEOF + } + return nil +} +func (m *NodePrepareResourceResponse) Unmarshal(dAtA []byte) error { + l := len(dAtA) + iNdEx := 0 + for iNdEx < l { + preIndex := iNdEx + var wire uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowApi + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + wire |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + fieldNum := int32(wire >> 3) + wireType := int(wire & 0x7) + if wireType == 4 { + return fmt.Errorf("proto: NodePrepareResourceResponse: wiretype end group for non-group") + } + if fieldNum <= 0 { + return fmt.Errorf("proto: NodePrepareResourceResponse: illegal tag %d (wire type %d)", fieldNum, wire) + } + switch fieldNum { + case 1: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field CdiDevices", wireType) + } + var stringLen uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowApi + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + stringLen |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + intStringLen := int(stringLen) + if intStringLen < 0 { + return ErrInvalidLengthApi + } + postIndex := iNdEx + intStringLen + if postIndex < 0 { + return ErrInvalidLengthApi + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + m.CdiDevices = append(m.CdiDevices, string(dAtA[iNdEx:postIndex])) + iNdEx = postIndex + default: + iNdEx = preIndex + skippy, err := skipApi(dAtA[iNdEx:]) + if err != nil { + return err + } + if (skippy < 0) || (iNdEx+skippy) < 0 { + return ErrInvalidLengthApi + } + if (iNdEx + skippy) > l { + return io.ErrUnexpectedEOF + } + iNdEx += skippy + } + } + + if iNdEx > l { + return io.ErrUnexpectedEOF + } + return nil +} +func (m *NodeUnprepareResourceRequest) Unmarshal(dAtA []byte) error { + l := len(dAtA) + iNdEx := 0 + for iNdEx < l { + preIndex := iNdEx + var wire uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowApi + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + wire |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + fieldNum := int32(wire >> 3) + wireType := int(wire & 0x7) + if wireType == 4 { + return fmt.Errorf("proto: NodeUnprepareResourceRequest: wiretype end group for non-group") + } + if fieldNum <= 0 { + return fmt.Errorf("proto: NodeUnprepareResourceRequest: illegal tag %d (wire type %d)", fieldNum, wire) + } + switch fieldNum { + case 1: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field Namespace", wireType) + } + var stringLen uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowApi + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + stringLen |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + intStringLen := int(stringLen) + if intStringLen < 0 { + return ErrInvalidLengthApi + } + postIndex := iNdEx + intStringLen + if postIndex < 0 { + return ErrInvalidLengthApi + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + m.Namespace = string(dAtA[iNdEx:postIndex]) + iNdEx = postIndex + case 2: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field ClaimUid", wireType) + } + var stringLen uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowApi + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + stringLen |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + intStringLen := int(stringLen) + if intStringLen < 0 { + return ErrInvalidLengthApi + } + postIndex := iNdEx + intStringLen + if postIndex < 0 { + return ErrInvalidLengthApi + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + m.ClaimUid = string(dAtA[iNdEx:postIndex]) + iNdEx = postIndex + case 3: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field ClaimName", wireType) + } + var stringLen uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowApi + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + stringLen |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + intStringLen := int(stringLen) + if intStringLen < 0 { + return ErrInvalidLengthApi + } + postIndex := iNdEx + intStringLen + if postIndex < 0 { + return ErrInvalidLengthApi + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + m.ClaimName = string(dAtA[iNdEx:postIndex]) + iNdEx = postIndex + case 4: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field CdiDevices", wireType) + } + var stringLen uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowApi + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + stringLen |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + intStringLen := int(stringLen) + if intStringLen < 0 { + return ErrInvalidLengthApi + } + postIndex := iNdEx + intStringLen + if postIndex < 0 { + return ErrInvalidLengthApi + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + m.CdiDevices = append(m.CdiDevices, string(dAtA[iNdEx:postIndex])) + iNdEx = postIndex + default: + iNdEx = preIndex + skippy, err := skipApi(dAtA[iNdEx:]) + if err != nil { + return err + } + if (skippy < 0) || (iNdEx+skippy) < 0 { + return ErrInvalidLengthApi + } + if (iNdEx + skippy) > l { + return io.ErrUnexpectedEOF + } + iNdEx += skippy + } + } + + if iNdEx > l { + return io.ErrUnexpectedEOF + } + return nil +} +func (m *NodeUnprepareResourceResponse) Unmarshal(dAtA []byte) error { + l := len(dAtA) + iNdEx := 0 + for iNdEx < l { + preIndex := iNdEx + var wire uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowApi + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + wire |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + fieldNum := int32(wire >> 3) + wireType := int(wire & 0x7) + if wireType == 4 { + return fmt.Errorf("proto: NodeUnprepareResourceResponse: wiretype end group for non-group") + } + if fieldNum <= 0 { + return fmt.Errorf("proto: NodeUnprepareResourceResponse: illegal tag %d (wire type %d)", fieldNum, wire) + } + switch fieldNum { + default: + iNdEx = preIndex + skippy, err := skipApi(dAtA[iNdEx:]) + if err != nil { + return err + } + if (skippy < 0) || (iNdEx+skippy) < 0 { + return ErrInvalidLengthApi + } + if (iNdEx + skippy) > l { + return io.ErrUnexpectedEOF + } + iNdEx += skippy + } + } + + if iNdEx > l { + return io.ErrUnexpectedEOF + } + return nil +} +func skipApi(dAtA []byte) (n int, err error) { + l := len(dAtA) + iNdEx := 0 + depth := 0 + for iNdEx < l { + var wire uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return 0, ErrIntOverflowApi + } + if iNdEx >= l { + return 0, io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + wire |= (uint64(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + wireType := int(wire & 0x7) + switch wireType { + case 0: + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return 0, ErrIntOverflowApi + } + if iNdEx >= l { + return 0, io.ErrUnexpectedEOF + } + iNdEx++ + if dAtA[iNdEx-1] < 0x80 { + break + } + } + case 1: + iNdEx += 8 + case 2: + var length int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return 0, ErrIntOverflowApi + } + if iNdEx >= l { + return 0, io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + length |= (int(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + if length < 0 { + return 0, ErrInvalidLengthApi + } + iNdEx += length + case 3: + depth++ + case 4: + if depth == 0 { + return 0, ErrUnexpectedEndOfGroupApi + } + depth-- + case 5: + iNdEx += 4 + default: + return 0, fmt.Errorf("proto: illegal wireType %d", wireType) + } + if iNdEx < 0 { + return 0, ErrInvalidLengthApi + } + if depth == 0 { + return iNdEx, nil + } + } + return 0, io.ErrUnexpectedEOF +} + +var ( + ErrInvalidLengthApi = fmt.Errorf("proto: negative length found during unmarshaling") + ErrIntOverflowApi = fmt.Errorf("proto: integer overflow") + ErrUnexpectedEndOfGroupApi = fmt.Errorf("proto: unexpected end of group") +) diff --git a/staging/src/k8s.io/kubelet/pkg/apis/dra/v1alpha1/api.proto b/staging/src/k8s.io/kubelet/pkg/apis/dra/v1alpha1/api.proto new file mode 100644 index 00000000000..2ba5c5c34d4 --- /dev/null +++ b/staging/src/k8s.io/kubelet/pkg/apis/dra/v1alpha1/api.proto @@ -0,0 +1,81 @@ +/* +Copyright 2022 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +// To regenerate api.pb.go run hack/update-generated-kubelet-plugin-registration.sh + +syntax = "proto3"; + +package v1alpha1; + +import "github.com/gogo/protobuf/gogoproto/gogo.proto"; + +option go_package = "dra"; +option (gogoproto.goproto_stringer_all) = false; +option (gogoproto.stringer_all) = true; +option (gogoproto.goproto_getters_all) = true; +option (gogoproto.marshaler_all) = true; +option (gogoproto.sizer_all) = true; +option (gogoproto.unmarshaler_all) = true; +option (gogoproto.goproto_unrecognized_all) = false; + +service Node { + rpc NodePrepareResource (NodePrepareResourceRequest) + returns (NodePrepareResourceResponse) {} + + rpc NodeUnprepareResource (NodeUnprepareResourceRequest) + returns (NodeUnprepareResourceResponse) {} +} + +message NodePrepareResourceRequest { + // The ResourceClaim namespace (ResourceClaim.meta.Namespace). + // This field is REQUIRED. + string namespace = 1; + // The UID of the Resource claim (ResourceClaim.meta.UUID). + // This field is REQUIRED. + string claim_uid = 2; + // The name of the Resource claim (ResourceClaim.meta.Name) + // This field is REQUIRED. + string claim_name = 3; + // Resource handle (AllocationResult.ResourceHandle) + // This field is REQUIRED. + string resource_handle = 4; +} + +message NodePrepareResourceResponse { + // These are the additional devices that kubelet must + // make available via the container runtime. A resource + // may have zero or more devices. + repeated string cdi_devices = 1; +} + +message NodeUnprepareResourceRequest { + // The ResourceClaim namespace (ResourceClaim.meta.Namespace). + // This field is REQUIRED. + string namespace = 1; + // The UID of the Resource claim (ResourceClaim.meta.UUID). + // This field is REQUIRED. + string claim_uid = 2; + // The name of the Resource claim (ResourceClaim.meta.Name) + // This field is REQUIRED. + string claim_name = 3; + // List of fully qualified CDI device names + // Kubelet plugin returns them in the NodePrepareResourceResponse + repeated string cdi_devices = 4; +} + +message NodeUnprepareResourceResponse { + // Intentionally empty. +} \ No newline at end of file diff --git a/staging/src/k8s.io/kubelet/pkg/apis/pluginregistration/v1/constants.go b/staging/src/k8s.io/kubelet/pkg/apis/pluginregistration/v1/constants.go index 7708f758fa2..5efe743d368 100644 --- a/staging/src/k8s.io/kubelet/pkg/apis/pluginregistration/v1/constants.go +++ b/staging/src/k8s.io/kubelet/pkg/apis/pluginregistration/v1/constants.go @@ -21,4 +21,6 @@ const ( CSIPlugin = "CSIPlugin" // DevicePlugin identifier for registered device plugins DevicePlugin = "DevicePlugin" + // DRAPlugin identifier for registered Dynamic Resourc Allocation plugins + DRAPlugin = "DRAPlugin" ) diff --git a/vendor/modules.txt b/vendor/modules.txt index 97707583e6f..883b8869ccb 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -2255,6 +2255,7 @@ k8s.io/kubelet/pkg/apis/credentialprovider/v1 k8s.io/kubelet/pkg/apis/credentialprovider/v1alpha1 k8s.io/kubelet/pkg/apis/credentialprovider/v1beta1 k8s.io/kubelet/pkg/apis/deviceplugin/v1beta1 +k8s.io/kubelet/pkg/apis/dra/v1alpha1 k8s.io/kubelet/pkg/apis/pluginregistration/v1 k8s.io/kubelet/pkg/apis/podresources/v1 k8s.io/kubelet/pkg/apis/podresources/v1alpha1