kubelet: add support for dynamic resource allocation

Dependencies need to be updated to use
github.com/container-orchestrated-devices/container-device-interface.

It's not decided yet whether we will implement Topology support
for DRA or not. Not having any toppology-related code
will help to avoid wrong impression that DRA is used as a hint
provider for the Topology Manager.
This commit is contained in:
Ed Bartosh 2022-07-15 14:28:18 +03:00 committed by Patrick Ohly
parent d2ff210c20
commit ae0f38437c
25 changed files with 2718 additions and 6 deletions

View File

@ -41,7 +41,7 @@ import (
cadvisorapi "github.com/google/cadvisor/info/v1"
"go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc"
otelsdkresource "go.opentelemetry.io/otel/sdk/resource"
"go.opentelemetry.io/otel/semconv/v1.12.0"
semconv "go.opentelemetry.io/otel/semconv/v1.12.0"
oteltrace "go.opentelemetry.io/otel/trace"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
@ -756,7 +756,9 @@ func run(ctx context.Context, s *options.KubeletServer, kubeDeps *kubelet.Depend
ExperimentalTopologyManagerPolicyOptions: topologyManagerPolicyOptions,
},
s.FailSwapOn,
kubeDeps.Recorder)
kubeDeps.Recorder,
kubeDeps.KubeClient,
)
if err != nil {
return err

View File

@ -40,6 +40,7 @@ BASH_TARGETS="
update-codegen
update-generated-runtime
update-generated-device-plugin
update-generated-dynamic-resource-allocation
update-generated-api-compatibility-data
update-generated-docs
update-generated-swagger-docs

View File

@ -0,0 +1,29 @@
#!/usr/bin/env bash
# Copyright 2017 The Kubernetes Authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# This script generates `*/api.pb.go` from the protobuf file `*/api.proto`.
# Example:
# kube::protoc::generate_proto "${DYNAMIC_RESOURCE_ALLOCATION_ALPHA}"
set -o errexit
set -o nounset
set -o pipefail
KUBE_ROOT="$(cd "$(dirname "${BASH_SOURCE[0]}")/../" && pwd -P)"
DYNAMIC_RESOURCE_ALLOCATION_V1ALPHA1="${KUBE_ROOT}/staging/src/k8s.io/kubelet/pkg/apis/dra/v1alpha1/"
source "${KUBE_ROOT}/hack/lib/protoc.sh"
kube::protoc::generate_proto "${DYNAMIC_RESOURCE_ALLOCATION_V1ALPHA1}"

View File

@ -0,0 +1,27 @@
#!/usr/bin/env bash
# Copyright 2017 The Kubernetes Authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
set -o errexit
set -o nounset
set -o pipefail
KUBE_ROOT=$(dirname "${BASH_SOURCE[0]}")/..
# NOTE: All output from this script needs to be copied back to the calling
# source tree. This is managed in kube::build::copy_output in build/common.sh.
# If the output set is changed update that function.
"${KUBE_ROOT}/build/run.sh" hack/update-generated-dynamic-resource-allocation-dockerized.sh "$@"

View File

@ -0,0 +1,44 @@
#!/usr/bin/env bash
# Copyright 2022 The Kubernetes Authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# This script checks whether updating of device plugin API is needed or not. We
# should run `hack/update-generated-dynamic-resource-allocation.sh` if device plugin API is
# out of date.
# Usage: `hack/verify-generated-dynamic-resource-allocation.sh`.
set -o errexit
set -o nounset
set -o pipefail
KUBE_ROOT=$(dirname "${BASH_SOURCE[0]}")/..
ERROR="Dynamic resource allocation kubelet plugin api is out of date. Please run hack/update-generated-dynamic-resource-allocation.sh"
DYNAMIC_RESOURCE_ALLOCATION_V1ALPHA1="${KUBE_ROOT}/staging/src/k8s.io/kubelet/pkg/apis/dra/v1alpha1/"
source "${KUBE_ROOT}/hack/lib/protoc.sh"
kube::golang::setup_env
function cleanup {
rm -rf "${DYNAMIC_RESOURCE_ALLOCATION_V1ALPHA1}/_tmp/"
}
trap cleanup EXIT
mkdir -p "${DYNAMIC_RESOURCE_ALLOCATION_V1ALPHA1}/_tmp"
cp "${DYNAMIC_RESOURCE_ALLOCATION_V1ALPHA1}/api.pb.go" "${DYNAMIC_RESOURCE_ALLOCATION_V1ALPHA1}/_tmp/"
KUBE_VERBOSE=3 "${KUBE_ROOT}/hack/update-generated-dynamic-resource-allocation.sh"
kube::protoc::diff "${DYNAMIC_RESOURCE_ALLOCATION_V1ALPHA1}/api.pb.go" "${DYNAMIC_RESOURCE_ALLOCATION_V1ALPHA1}/_tmp/api.pb.go" "${ERROR}"
echo "Generated dynamic resource allocation kubelet plugin alpha api is up to date."

View File

@ -31,6 +31,7 @@ import (
"k8s.io/kubernetes/pkg/kubelet/apis/podresources"
"k8s.io/kubernetes/pkg/kubelet/cm/cpuset"
"k8s.io/kubernetes/pkg/kubelet/cm/devicemanager"
"k8s.io/kubernetes/pkg/kubelet/cm/dra"
"k8s.io/kubernetes/pkg/kubelet/config"
kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
evictionapi "k8s.io/kubernetes/pkg/kubelet/eviction/api"
@ -115,6 +116,12 @@ type ContainerManager interface {
// GetNodeAllocatableAbsolute returns the absolute value of Node Allocatable which is primarily useful for enforcement.
GetNodeAllocatableAbsolute() v1.ResourceList
// PrepareResource prepares pod resources
PrepareResources(pod *v1.Pod, container *v1.Container) (*dra.ContainerInfo, error)
// UnrepareResources unprepares pod resources
UnprepareResources(*v1.Pod) error
// Implements the podresources Provider API for CPUs, Memory and Devices
podresources.CPUsProvider
podresources.DevicesProvider

View File

@ -43,6 +43,7 @@ import (
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apimachinery/pkg/util/wait"
utilfeature "k8s.io/apiserver/pkg/util/feature"
clientset "k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/record"
utilsysctl "k8s.io/component-helpers/node/util/sysctl"
internalapi "k8s.io/cri-api/pkg/apis"
@ -53,6 +54,7 @@ import (
"k8s.io/kubernetes/pkg/kubelet/cm/containermap"
"k8s.io/kubernetes/pkg/kubelet/cm/cpumanager"
"k8s.io/kubernetes/pkg/kubelet/cm/devicemanager"
"k8s.io/kubernetes/pkg/kubelet/cm/dra"
"k8s.io/kubernetes/pkg/kubelet/cm/memorymanager"
memorymanagerstate "k8s.io/kubernetes/pkg/kubelet/cm/memorymanager/state"
"k8s.io/kubernetes/pkg/kubelet/cm/topologymanager"
@ -128,6 +130,8 @@ type containerManagerImpl struct {
memoryManager memorymanager.Manager
// Interface for Topology resource co-ordination
topologyManager topologymanager.Manager
// Interface for Dynamic Resource Allocation management.
draManager dra.Manager
}
type features struct {
@ -195,7 +199,7 @@ func validateSystemRequirements(mountUtil mount.Interface) (features, error) {
// TODO(vmarmol): Add limits to the system containers.
// Takes the absolute name of the specified containers.
// Empty container name disables use of the specified container.
func NewContainerManager(mountUtil mount.Interface, cadvisorInterface cadvisor.Interface, nodeConfig NodeConfig, failSwapOn bool, recorder record.EventRecorder) (ContainerManager, error) {
func NewContainerManager(mountUtil mount.Interface, cadvisorInterface cadvisor.Interface, nodeConfig NodeConfig, failSwapOn bool, recorder record.EventRecorder, kubeClient clientset.Interface) (ContainerManager, error) {
subsystems, err := GetCgroupSubsystems()
if err != nil {
return nil, fmt.Errorf("failed to get mounted cgroup subsystems: %v", err)
@ -307,6 +311,15 @@ func NewContainerManager(mountUtil mount.Interface, cadvisorInterface cadvisor.I
}
cm.topologyManager.AddHintProvider(cm.deviceManager)
// initialize DRA manager
if utilfeature.DefaultFeatureGate.Enabled(kubefeatures.DynamicResourceAllocation) {
klog.InfoS("Creating Dynamic Resource Allocation (DRA) manager")
cm.draManager, err = dra.NewManagerImpl(kubeClient)
if err != nil {
return nil, err
}
}
// Initialize CPU manager
cm.cpuManager, err = cpumanager.NewManager(
nodeConfig.CPUManagerPolicy,
@ -642,6 +655,13 @@ func (cm *containerManagerImpl) GetPluginRegistrationHandler() cache.PluginHandl
// TODO: move the GetResources logic to PodContainerManager.
func (cm *containerManagerImpl) GetResources(pod *v1.Pod, container *v1.Container) (*kubecontainer.RunContainerOptions, error) {
opts := &kubecontainer.RunContainerOptions{}
if cm.draManager != nil {
resOpts, err := cm.PrepareResources(pod, container)
if err != nil {
return nil, err
}
opts.Annotations = append(opts.Annotations, resOpts.Annotations...)
}
// Allocate should already be called during predicateAdmitHandler.Admit(),
// just try to fetch device runtime information from cached state here
devOpts, err := cm.deviceManager.GetDeviceRunContainerOptions(pod, container)
@ -671,13 +691,14 @@ func (cm *containerManagerImpl) GetAllocateResourcesPodAdmitHandler() lifecycle.
// work as we add more and more hint providers that the TopologyManager
// needs to call Allocate() on (that may not be directly intstantiated
// inside this component).
return &resourceAllocator{cm.cpuManager, cm.memoryManager, cm.deviceManager}
return &resourceAllocator{cm.cpuManager, cm.memoryManager, cm.deviceManager, cm.draManager}
}
type resourceAllocator struct {
cpuManager cpumanager.Manager
memoryManager memorymanager.Manager
deviceManager devicemanager.Manager
draManager dra.Manager
}
func (m *resourceAllocator) Admit(attrs *lifecycle.PodAdmitAttributes) lifecycle.PodAdmitResult {
@ -1009,3 +1030,11 @@ func containerMemoryFromBlock(blocks []memorymanagerstate.Block) []*podresources
return containerMemories
}
func (cm *containerManagerImpl) PrepareResources(pod *v1.Pod, container *v1.Container) (*dra.ContainerInfo, error) {
return cm.draManager.PrepareResources(pod, container)
}
func (cm *containerManagerImpl) UnprepareResources(pod *v1.Pod) error {
return cm.draManager.UnprepareResources(pod)
}

View File

@ -24,6 +24,7 @@ import (
internalapi "k8s.io/cri-api/pkg/apis"
podresourcesapi "k8s.io/kubelet/pkg/apis/podresources/v1"
"k8s.io/kubernetes/pkg/kubelet/cm/cpumanager"
"k8s.io/kubernetes/pkg/kubelet/cm/dra"
"k8s.io/kubernetes/pkg/kubelet/cm/memorymanager"
"k8s.io/kubernetes/pkg/kubelet/cm/topologymanager"
"k8s.io/kubernetes/pkg/kubelet/config"
@ -154,6 +155,14 @@ func (cm *containerManagerStub) GetNodeAllocatableAbsolute() v1.ResourceList {
return nil
}
func (cm *containerManagerStub) PrepareResources(pod *v1.Pod, container *v1.Container) (*dra.ContainerInfo, error) {
return nil, nil
}
func (cm *containerManagerStub) UnprepareResources(*v1.Pod) error {
return nil
}
func NewStubContainerManager() ContainerManager {
return &containerManagerStub{shouldResetExtendedResourceCapacity: false}
}

View File

@ -25,6 +25,7 @@ import (
"k8s.io/mount-utils"
v1 "k8s.io/api/core/v1"
clientset "k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/record"
internalapi "k8s.io/cri-api/pkg/apis"
"k8s.io/kubernetes/pkg/kubelet/cadvisor"
@ -42,6 +43,6 @@ func (unsupportedContainerManager) Start(_ *v1.Node, _ ActivePodsFunc, _ config.
return fmt.Errorf("Container Manager is unsupported in this build")
}
func NewContainerManager(_ mount.Interface, _ cadvisor.Interface, _ NodeConfig, failSwapOn bool, recorder record.EventRecorder) (ContainerManager, error) {
func NewContainerManager(_ mount.Interface, _ cadvisor.Interface, _ NodeConfig, failSwapOn bool, recorder record.EventRecorder, kubeClient clientset.Interface) (ContainerManager, error) {
return &unsupportedContainerManager{}, nil
}

View File

@ -30,6 +30,7 @@ import (
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
clientset "k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/record"
internalapi "k8s.io/cri-api/pkg/apis"
podresourcesapi "k8s.io/kubelet/pkg/apis/podresources/v1"
@ -37,6 +38,7 @@ import (
"k8s.io/kubernetes/pkg/kubelet/cm/admission"
"k8s.io/kubernetes/pkg/kubelet/cm/cpumanager"
"k8s.io/kubernetes/pkg/kubelet/cm/devicemanager"
"k8s.io/kubernetes/pkg/kubelet/cm/dra"
"k8s.io/kubernetes/pkg/kubelet/cm/memorymanager"
"k8s.io/kubernetes/pkg/kubelet/cm/topologymanager"
"k8s.io/kubernetes/pkg/kubelet/config"
@ -93,7 +95,7 @@ func (cm *containerManagerImpl) Start(node *v1.Node,
}
// NewContainerManager creates windows container manager.
func NewContainerManager(mountUtil mount.Interface, cadvisorInterface cadvisor.Interface, nodeConfig NodeConfig, failSwapOn bool, recorder record.EventRecorder) (ContainerManager, error) {
func NewContainerManager(mountUtil mount.Interface, cadvisorInterface cadvisor.Interface, nodeConfig NodeConfig, failSwapOn bool, recorder record.EventRecorder, kubeClient clientset.Interface) (ContainerManager, error) {
// It is safe to invoke `MachineInfo` on cAdvisor before logically initializing cAdvisor here because
// machine info is computed and cached once as part of cAdvisor object creation.
// But `RootFsInfo` and `ImagesFsInfo` are not available at this moment so they will be called later during manager starts
@ -250,3 +252,11 @@ func (cm *containerManagerImpl) GetAllocatableMemory() []*podresourcesapi.Contai
func (cm *containerManagerImpl) GetNodeAllocatableAbsolute() v1.ResourceList {
return nil
}
func (cm *containerManagerImpl) PrepareResources(pod *v1.Pod, container *v1.Container) (*dra.ContainerInfo, error) {
return nil, nil
}
func (cm *containerManagerImpl) UnprepareResources(*v1.Pod) error {
return nil
}

283
pkg/kubelet/cm/dra/cdi.go Normal file
View File

@ -0,0 +1,283 @@
/*
Copyright 2022 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
// The code below was copied from
// https://github.com/container-orchestrated-devices/container-device-interface/blob/v0.5.3/pkg/cdi/annotations.go
// https://github.com/container-orchestrated-devices/container-device-interface/blob/v0.5.3/pkg/cdi/qualified-device.go
// to avoid a dependency on that package and the indirect dependencies that
// this would have implied.
//
// Long term it would be good to avoid this duplication:
// https://github.com/container-orchestrated-devices/container-device-interface/issues/97
package dra
import (
"errors"
"fmt"
"strings"
)
const (
// annotationPrefix is the prefix for CDI container annotation keys.
annotationPrefix = "cdi.k8s.io/"
)
// updateAnnotations updates annotations with a plugin-specific CDI device
// injection request for the given devices. Upon any error a non-nil error
// is returned and annotations are left intact. By convention plugin should
// be in the format of "vendor.device-type".
func updateAnnotations(annotations map[string]string, plugin string, deviceID string, devices []string) (map[string]string, error) {
key, err := annotationKey(plugin, deviceID)
if err != nil {
return annotations, fmt.Errorf("CDI annotation failed: %v", err)
}
if _, ok := annotations[key]; ok {
return annotations, fmt.Errorf("CDI annotation failed, key %q used", key)
}
value, err := annotationValue(devices)
if err != nil {
return annotations, fmt.Errorf("CDI annotation failed: %v", err)
}
if annotations == nil {
annotations = make(map[string]string)
}
annotations[key] = value
return annotations, nil
}
// annotationKey returns a unique annotation key for an device allocation
// by a K8s device plugin. pluginName should be in the format of
// "vendor.device-type". deviceID is the ID of the device the plugin is
// allocating. It is used to make sure that the generated key is unique
// even if multiple allocations by a single plugin needs to be annotated.
func annotationKey(pluginName, deviceID string) (string, error) {
const maxNameLen = 63
if pluginName == "" {
return "", errors.New("invalid plugin name, empty")
}
if deviceID == "" {
return "", errors.New("invalid deviceID, empty")
}
name := pluginName + "_" + strings.ReplaceAll(deviceID, "/", "_")
if len(name) > maxNameLen {
return "", fmt.Errorf("invalid plugin+deviceID %q, too long", name)
}
if c := rune(name[0]); !isAlphaNumeric(c) {
return "", fmt.Errorf("invalid name %q, first '%c' should be alphanumeric", name, c)
}
if len(name) > 2 {
for _, c := range name[1 : len(name)-1] {
switch {
case isAlphaNumeric(c):
case c == '_' || c == '-' || c == '.':
default:
return "", fmt.Errorf("invalid name %q, invalid charcter '%c'", name, c)
}
}
}
if c := rune(name[len(name)-1]); !isAlphaNumeric(c) {
return "", fmt.Errorf("invalid name %q, last '%c' should be alphanumeric", name, c)
}
return annotationPrefix + name, nil
}
// annotationValue returns an annotation value for the given devices.
func annotationValue(devices []string) (string, error) {
value, sep := "", ""
for _, d := range devices {
if _, _, _, err := parseQualifiedName(d); err != nil {
return "", err
}
value += sep + d
sep = ","
}
return value, nil
}
// parseQualifiedName splits a qualified name into device vendor, class,
// and name. If the device fails to parse as a qualified name, or if any
// of the split components fail to pass syntax validation, vendor and
// class are returned as empty, together with the verbatim input as the
// name and an error describing the reason for failure.
func parseQualifiedName(device string) (string, string, string, error) {
vendor, class, name := parseDevice(device)
if vendor == "" {
return "", "", device, fmt.Errorf("unqualified device %q, missing vendor", device)
}
if class == "" {
return "", "", device, fmt.Errorf("unqualified device %q, missing class", device)
}
if name == "" {
return "", "", device, fmt.Errorf("unqualified device %q, missing device name", device)
}
if err := validateVendorName(vendor); err != nil {
return "", "", device, fmt.Errorf("invalid device %q: %v", device, err)
}
if err := validateClassName(class); err != nil {
return "", "", device, fmt.Errorf("invalid device %q: %v", device, err)
}
if err := validateDeviceName(name); err != nil {
return "", "", device, fmt.Errorf("invalid device %q: %v", device, err)
}
return vendor, class, name, nil
}
// parseDevice tries to split a device name into vendor, class, and name.
// If this fails, for instance in the case of unqualified device names,
// parseDevice returns an empty vendor and class together with name set
// to the verbatim input.
func parseDevice(device string) (string, string, string) {
if device == "" || device[0] == '/' {
return "", "", device
}
parts := strings.SplitN(device, "=", 2)
if len(parts) != 2 || parts[0] == "" || parts[1] == "" {
return "", "", device
}
name := parts[1]
vendor, class := parseQualifier(parts[0])
if vendor == "" {
return "", "", device
}
return vendor, class, name
}
// parseQualifier splits a device qualifier into vendor and class.
// The syntax for a device qualifier is
//
// "<vendor>/<class>"
//
// If parsing fails, an empty vendor and the class set to the
// verbatim input is returned.
func parseQualifier(kind string) (string, string) {
parts := strings.SplitN(kind, "/", 2)
if len(parts) != 2 || parts[0] == "" || parts[1] == "" {
return "", kind
}
return parts[0], parts[1]
}
// validateVendorName checks the validity of a vendor name.
// A vendor name may contain the following ASCII characters:
// - upper- and lowercase letters ('A'-'Z', 'a'-'z')
// - digits ('0'-'9')
// - underscore, dash, and dot ('_', '-', and '.')
func validateVendorName(vendor string) error {
if vendor == "" {
return fmt.Errorf("invalid (empty) vendor name")
}
if !isLetter(rune(vendor[0])) {
return fmt.Errorf("invalid vendor %q, should start with letter", vendor)
}
for _, c := range string(vendor[1 : len(vendor)-1]) {
switch {
case isAlphaNumeric(c):
case c == '_' || c == '-' || c == '.':
default:
return fmt.Errorf("invalid character '%c' in vendor name %q",
c, vendor)
}
}
if !isAlphaNumeric(rune(vendor[len(vendor)-1])) {
return fmt.Errorf("invalid vendor %q, should end with a letter or digit", vendor)
}
return nil
}
// validateClassName checks the validity of class name.
// A class name may contain the following ASCII characters:
// - upper- and lowercase letters ('A'-'Z', 'a'-'z')
// - digits ('0'-'9')
// - underscore and dash ('_', '-')
func validateClassName(class string) error {
if class == "" {
return fmt.Errorf("invalid (empty) device class")
}
if !isLetter(rune(class[0])) {
return fmt.Errorf("invalid class %q, should start with letter", class)
}
for _, c := range string(class[1 : len(class)-1]) {
switch {
case isAlphaNumeric(c):
case c == '_' || c == '-':
default:
return fmt.Errorf("invalid character '%c' in device class %q",
c, class)
}
}
if !isAlphaNumeric(rune(class[len(class)-1])) {
return fmt.Errorf("invalid class %q, should end with a letter or digit", class)
}
return nil
}
// validateDeviceName checks the validity of a device name.
// A device name may contain the following ASCII characters:
// - upper- and lowercase letters ('A'-'Z', 'a'-'z')
// - digits ('0'-'9')
// - underscore, dash, dot, colon ('_', '-', '.', ':')
func validateDeviceName(name string) error {
if name == "" {
return fmt.Errorf("invalid (empty) device name")
}
if !isAlphaNumeric(rune(name[0])) {
return fmt.Errorf("invalid class %q, should start with a letter or digit", name)
}
if len(name) == 1 {
return nil
}
for _, c := range string(name[1 : len(name)-1]) {
switch {
case isAlphaNumeric(c):
case c == '_' || c == '-' || c == '.' || c == ':':
default:
return fmt.Errorf("invalid character '%c' in device name %q",
c, name)
}
}
if !isAlphaNumeric(rune(name[len(name)-1])) {
return fmt.Errorf("invalid name %q, should end with a letter or digit", name)
}
return nil
}
func isLetter(c rune) bool {
return ('A' <= c && c <= 'Z') || ('a' <= c && c <= 'z')
}
func isDigit(c rune) bool {
return '0' <= c && c <= '9'
}
func isAlphaNumeric(c rune) bool {
return isLetter(c) || isDigit(c)
}

View File

@ -0,0 +1,110 @@
/*
Copyright 2022 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package dra
import (
"fmt"
"sync"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/sets"
kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
)
// claimInfo holds information required
// to prepare and unprepare a resource claim.
type claimInfo struct {
sync.RWMutex
// name of the DRA driver
driverName string
// claimUID is an UID of the resource claim
claimUID types.UID
// claimName is a name of the resource claim
claimName string
// namespace is a claim namespace
namespace string
// podUIDs is a set of pod UIDs that reference a resource
podUIDs sets.Set[string]
// cdiDevices is a list of CDI devices returned by the
// GRPC API call NodePrepareResource
cdiDevices []string
// annotations is a list of container annotations associated with
// a prepared resource
annotations []kubecontainer.Annotation
}
func (res *claimInfo) addPodReference(podUID types.UID) {
res.Lock()
defer res.Unlock()
res.podUIDs.Insert(string(podUID))
}
func (res *claimInfo) deletePodReference(podUID types.UID) {
res.Lock()
defer res.Unlock()
res.podUIDs.Delete(string(podUID))
}
// claimInfoCache is a cache of processed resource claims keyed by namespace + claim name.
type claimInfoCache struct {
sync.RWMutex
claimInfo map[string]*claimInfo
}
// newClaimInfoCache is a function that returns an instance of the claimInfoCache.
func newClaimInfoCache() *claimInfoCache {
return &claimInfoCache{
claimInfo: make(map[string]*claimInfo),
}
}
func (cache *claimInfoCache) add(claim, namespace string, res *claimInfo) error {
cache.Lock()
defer cache.Unlock()
key := claim + namespace
if _, ok := cache.claimInfo[key]; ok {
return fmt.Errorf("claim %s, namespace %s already cached", claim, namespace)
}
cache.claimInfo[claim+namespace] = res
return nil
}
func (cache *claimInfoCache) get(claimName, namespace string) *claimInfo {
cache.RLock()
defer cache.RUnlock()
return cache.claimInfo[claimName+namespace]
}
func (cache *claimInfoCache) delete(claimName, namespace string) {
cache.Lock()
defer cache.Unlock()
delete(cache.claimInfo, claimName+namespace)
}

View File

@ -0,0 +1,245 @@
/*
Copyright 2022 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package dra
import (
"context"
"fmt"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/sets"
clientset "k8s.io/client-go/kubernetes"
"k8s.io/dynamic-resource-allocation/resourceclaim"
"k8s.io/klog/v2"
dra "k8s.io/kubernetes/pkg/kubelet/cm/dra/plugin"
kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
)
// ManagerImpl is the structure in charge of managing DRA resource Plugins.
type ManagerImpl struct {
// cache contains cached claim info
cache *claimInfoCache
// KubeClient reference
kubeClient clientset.Interface
}
// NewManagerImpl creates a new manager.
func NewManagerImpl(kubeClient clientset.Interface) (*ManagerImpl, error) {
klog.V(2).InfoS("Creating DRA manager")
manager := &ManagerImpl{
cache: newClaimInfoCache(),
kubeClient: kubeClient,
}
return manager, nil
}
// Generate container annotations using CDI UpdateAnnotations API.
func generateCDIAnnotations(
claimUID types.UID,
driverName string,
cdiDevices []string,
) ([]kubecontainer.Annotation, error) {
annotations, err := updateAnnotations(map[string]string{}, driverName, string(claimUID), cdiDevices)
if err != nil {
return nil, fmt.Errorf("can't generate CDI annotations: %+v", err)
}
kubeAnnotations := []kubecontainer.Annotation{}
for key, value := range annotations {
kubeAnnotations = append(kubeAnnotations, kubecontainer.Annotation{Name: key, Value: value})
}
return kubeAnnotations, nil
}
// prepareContainerResources attempts to prepare all of required resource
// plugin resources for the input container, issue an NodePrepareResource rpc request
// for each new resource requirement, process their responses and update the cached
// containerResources on success.
func (m *ManagerImpl) prepareContainerResources(pod *v1.Pod, container *v1.Container) error {
// Process resources for each resource claim referenced by container
for range container.Resources.Claims {
for i, podResourceClaim := range pod.Spec.ResourceClaims {
claimName := resourceclaim.Name(pod, &pod.Spec.ResourceClaims[i])
klog.V(3).InfoS("Processing resource", "claim", claimName, "pod", pod.Name)
if claimInfo := m.cache.get(claimName, pod.Namespace); claimInfo != nil {
// resource is already prepared, add pod UID to it
claimInfo.addPodReference(pod.UID)
continue
}
// Query claim object from the API server
resourceClaim, err := m.kubeClient.ResourceV1alpha1().ResourceClaims(pod.Namespace).Get(
context.TODO(),
claimName,
metav1.GetOptions{})
if err != nil {
return fmt.Errorf("failed to fetch ResourceClaim %s referenced by pod %s: %+v", claimName, pod.Name, err)
}
// Check if pod is in the ReservedFor for the claim
if !resourceclaim.IsReservedForPod(pod, resourceClaim) {
return fmt.Errorf("pod %s(%s) is not allowed to use resource claim %s(%s)",
pod.Name, pod.UID, podResourceClaim.Name, resourceClaim.UID)
}
// Call NodePrepareResource RPC
driverName := resourceClaim.Status.DriverName
client, err := dra.NewDRAPluginClient(driverName)
if err != nil {
return fmt.Errorf("failed to get DRA Plugin client for plugin name %s, err=%+v", driverName, err)
}
response, err := client.NodePrepareResource(
context.Background(),
resourceClaim.Namespace,
resourceClaim.UID,
resourceClaim.Name,
resourceClaim.Status.Allocation.ResourceHandle)
if err != nil {
return fmt.Errorf("NodePrepareResource failed, claim UID: %s, claim name: %s, resource handle: %s, err: %+v",
resourceClaim.UID, resourceClaim.Name, resourceClaim.Status.Allocation.ResourceHandle, err)
}
klog.V(3).InfoS("NodePrepareResource succeeded", "response", response)
annotations, err := generateCDIAnnotations(resourceClaim.UID, driverName, response.CdiDevices)
if err != nil {
return fmt.Errorf("failed to generate container annotations, err: %+v", err)
}
// Cache prepared resource
err = m.cache.add(
resourceClaim.Name,
resourceClaim.Namespace,
&claimInfo{
driverName: driverName,
claimUID: resourceClaim.UID,
claimName: resourceClaim.Name,
namespace: resourceClaim.Namespace,
podUIDs: sets.New(string(pod.UID)),
cdiDevices: response.CdiDevices,
annotations: annotations,
})
if err != nil {
return fmt.Errorf(
"failed to cache prepared resource, claim: %s(%s), err: %+v",
resourceClaim.Name,
resourceClaim.UID,
err,
)
}
}
}
return nil
}
// getContainerInfo gets a container info from the claimInfo cache.
// This information is used by the caller to update a container config.
func (m *ManagerImpl) getContainerInfo(pod *v1.Pod, container *v1.Container) (*ContainerInfo, error) {
annotations := []kubecontainer.Annotation{}
for i, podResourceClaim := range pod.Spec.ResourceClaims {
claimName := resourceclaim.Name(pod, &pod.Spec.ResourceClaims[i])
for _, claim := range container.Resources.Claims {
if podResourceClaim.Name != claim.Name {
continue
}
claimInfo := m.cache.get(claimName, pod.Namespace)
if claimInfo == nil {
return nil, fmt.Errorf("unable to get resource for namespace: %s, claim: %s", pod.Namespace, claimName)
}
klog.V(3).InfoS("add resource annotations", "claim", claimName, "annotations", claimInfo.annotations)
annotations = append(annotations, claimInfo.annotations...)
}
}
return &ContainerInfo{Annotations: annotations}, nil
}
// PrepareResources calls plugin NodePrepareResource from the registered DRA resource plugins.
func (m *ManagerImpl) PrepareResources(pod *v1.Pod, container *v1.Container) (*ContainerInfo, error) {
if err := m.prepareContainerResources(pod, container); err != nil {
return nil, err
}
return m.getContainerInfo(pod, container)
}
// UnprepareResources calls a plugin's NodeUnprepareResource API for each resource claim owned by a pod.
// This function is idempotent and may be called multiple times against the same pod.
// As such, calls to the underlying NodeUnprepareResource API are skipped for claims that have
// already been successfully unprepared.
func (m *ManagerImpl) UnprepareResources(pod *v1.Pod) error {
// Call NodeUnprepareResource RPC for every resource claim referenced by the pod
for i := range pod.Spec.ResourceClaims {
claimName := resourceclaim.Name(pod, &pod.Spec.ResourceClaims[i])
claimInfo := m.cache.get(claimName, pod.Namespace)
// Skip calling NodeUnprepareResource if claim info is not cached
if claimInfo == nil {
continue
}
// Delete pod UID from the cache
claimInfo.deletePodReference(pod.UID)
// Skip calling NodeUnprepareResource if other pods are still referencing it
if len(claimInfo.podUIDs) > 0 {
continue
}
// Call NodeUnprepareResource only for the last pod that references the claim
client, err := dra.NewDRAPluginClient(claimInfo.driverName)
if err != nil {
return fmt.Errorf("failed to get DRA Plugin client for plugin name %s, err=%+v", claimInfo.driverName, err)
}
response, err := client.NodeUnprepareResource(
context.Background(),
claimInfo.namespace,
claimInfo.claimUID,
claimInfo.claimName,
claimInfo.cdiDevices)
if err != nil {
return fmt.Errorf(
"NodeUnprepareResource failed, pod: %s, claim UID: %s, claim name: %s, CDI devices: %s, err: %+v",
pod.Name,
claimInfo.claimUID,
claimInfo.claimName,
claimInfo.cdiDevices, err)
}
klog.V(3).InfoS("NodeUnprepareResource succeeded", "response", response)
// delete resource from the cache
m.cache.delete(claimInfo.claimName, pod.Namespace)
}
return nil
}

View File

@ -0,0 +1,182 @@
/*
Copyright 2022 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package plugin
import (
"context"
"errors"
"fmt"
"io"
"net"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
"k8s.io/apimachinery/pkg/types"
"k8s.io/klog/v2"
drapbv1 "k8s.io/kubelet/pkg/apis/dra/v1alpha1"
)
type Client interface {
NodePrepareResource(
ctx context.Context,
namespace string,
claimUID types.UID,
claimName string,
resourceHandle string,
) (*drapbv1.NodePrepareResourceResponse, error)
NodeUnprepareResource(
ctx context.Context,
namespace string,
claimUID types.UID,
claimName string,
cdiDevice []string,
) (*drapbv1.NodeUnprepareResourceResponse, error)
}
// Strongly typed address.
type draAddr string
// draPluginClient encapsulates all dra plugin methods.
type draPluginClient struct {
pluginName string
addr draAddr
nodeV1ClientCreator nodeV1ClientCreator
}
var _ Client = &draPluginClient{}
type nodeV1ClientCreator func(addr draAddr) (
nodeClient drapbv1.NodeClient,
closer io.Closer,
err error,
)
// newV1NodeClient creates a new NodeClient with the internally used gRPC
// connection set up. It also returns a closer which must be called to close
// the gRPC connection when the NodeClient is not used anymore.
// This is the default implementation for the nodeV1ClientCreator, used in
// newDRAPluginClient.
func newV1NodeClient(addr draAddr) (nodeClient drapbv1.NodeClient, closer io.Closer, err error) {
var conn *grpc.ClientConn
conn, err = newGrpcConn(addr)
if err != nil {
return nil, nil, err
}
return drapbv1.NewNodeClient(conn), conn, nil
}
func NewDRAPluginClient(pluginName string) (Client, error) {
if pluginName == "" {
return nil, fmt.Errorf("plugin name is empty")
}
existingPlugin := draPlugins.Get(pluginName)
if existingPlugin == nil {
return nil, fmt.Errorf("plugin name %s not found in the list of registered DRA plugins", pluginName)
}
return &draPluginClient{
pluginName: pluginName,
addr: draAddr(existingPlugin.endpoint),
nodeV1ClientCreator: newV1NodeClient,
}, nil
}
func (r *draPluginClient) NodePrepareResource(
ctx context.Context,
namespace string,
claimUID types.UID,
claimName string,
resourceHandle string,
) (*drapbv1.NodePrepareResourceResponse, error) {
klog.V(4).InfoS(
log("calling NodePrepareResource rpc"),
"namespace", namespace,
"claim UID", claimUID,
"claim name", claimName,
"resource handle", resourceHandle)
if r.nodeV1ClientCreator == nil {
return nil, errors.New("failed to call NodePrepareResource. nodeV1ClientCreator is nil")
}
nodeClient, closer, err := r.nodeV1ClientCreator(r.addr)
if err != nil {
return nil, err
}
defer closer.Close()
req := &drapbv1.NodePrepareResourceRequest{
Namespace: namespace,
ClaimUid: string(claimUID),
ClaimName: claimName,
ResourceHandle: resourceHandle,
}
return nodeClient.NodePrepareResource(ctx, req)
}
func (r *draPluginClient) NodeUnprepareResource(
ctx context.Context,
namespace string,
claimUID types.UID,
claimName string,
cdiDevices []string,
) (*drapbv1.NodeUnprepareResourceResponse, error) {
klog.V(4).InfoS(
log("calling NodeUnprepareResource rpc"),
"namespace", namespace,
"claim UID", claimUID,
"claim name", claimName,
"cdi devices", cdiDevices)
if r.nodeV1ClientCreator == nil {
return nil, errors.New("nodeV1ClientCreate is nil")
}
nodeClient, closer, err := r.nodeV1ClientCreator(r.addr)
if err != nil {
return nil, err
}
defer closer.Close()
req := &drapbv1.NodeUnprepareResourceRequest{
Namespace: namespace,
ClaimUid: string(claimUID),
ClaimName: claimName,
CdiDevices: cdiDevices,
}
return nodeClient.NodeUnprepareResource(ctx, req)
}
func newGrpcConn(addr draAddr) (*grpc.ClientConn, error) {
network := "unix"
klog.V(4).InfoS(log("creating new gRPC connection"), "protocol", network, "endpoint", addr)
return grpc.Dial(
string(addr),
grpc.WithTransportCredentials(insecure.NewCredentials()),
grpc.WithContextDialer(func(ctx context.Context, target string) (net.Conn, error) {
return (&net.Dialer{}).DialContext(ctx, network, target)
}),
)
}

View File

@ -0,0 +1,178 @@
/*
Copyright 2022 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package plugin
import (
"errors"
"fmt"
"strings"
utilversion "k8s.io/apimachinery/pkg/util/version"
"k8s.io/klog/v2"
)
const (
// DRAPluginName is the name of the in-tree DRA Plugin.
DRAPluginName = "kubernetes.io/dra"
)
// draPlugins map keeps track of all registered DRA plugins on the node
// and their corresponding sockets.
var draPlugins = &PluginsStore{}
// RegistrationHandler is the handler which is fed to the pluginwatcher API.
type RegistrationHandler struct{}
// NewPluginHandler returns new registration handler.
func NewRegistrationHandler() *RegistrationHandler {
return &RegistrationHandler{}
}
// RegisterPlugin is called when a plugin can be registered.
func (h *RegistrationHandler) RegisterPlugin(pluginName string, endpoint string, versions []string) error {
klog.InfoS("Register new DRA plugin", "name", pluginName, "endpoint", endpoint)
highestSupportedVersion, err := h.validateVersions("RegisterPlugin", pluginName, versions)
if err != nil {
return err
}
// Storing endpoint of newly registered DRA Plugin into the map, where plugin name will be the key
// all other DRA components will be able to get the actual socket of DRA plugins by its name.
draPlugins.Set(pluginName, &Plugin{
endpoint: endpoint,
highestSupportedVersion: highestSupportedVersion,
})
return nil
}
// Return the highest supported version.
func highestSupportedVersion(versions []string) (*utilversion.Version, error) {
if len(versions) == 0 {
return nil, errors.New(log("DRA plugin reporting empty array for supported versions"))
}
var highestSupportedVersion *utilversion.Version
var theErr error
for i := len(versions) - 1; i >= 0; i-- {
currentHighestVer, err := utilversion.ParseGeneric(versions[i])
if err != nil {
theErr = err
continue
}
if currentHighestVer.Major() > 1 {
// DRA currently only has version 1.x
continue
}
if highestSupportedVersion == nil || highestSupportedVersion.LessThan(currentHighestVer) {
highestSupportedVersion = currentHighestVer
}
}
if highestSupportedVersion == nil {
return nil, fmt.Errorf(
"could not find a highest supported version from versions (%v) reported by this plugin: %+v",
versions, theErr)
}
if highestSupportedVersion.Major() != 1 {
return nil, fmt.Errorf("highest supported version reported by plugin is %v, must be v1.x", highestSupportedVersion)
}
return highestSupportedVersion, nil
}
func (h *RegistrationHandler) validateVersions(
callerName string,
pluginName string,
versions []string,
) (*utilversion.Version, error) {
if len(versions) == 0 {
return nil, errors.New(
log(
"%s for DRA plugin %q failed. Plugin returned an empty list for supported versions",
callerName,
pluginName,
),
)
}
// Validate version
newPluginHighestVersion, err := highestSupportedVersion(versions)
if err != nil {
return nil, errors.New(
log(
"%s for DRA plugin %q failed. None of the versions specified %q are supported. err=%v",
callerName,
pluginName,
versions,
err,
),
)
}
existingPlugin := draPlugins.Get(pluginName)
if existingPlugin != nil {
if !existingPlugin.highestSupportedVersion.LessThan(newPluginHighestVersion) {
return nil, errors.New(
log(
"%s for DRA plugin %q failed. Another plugin with the same name is already registered with a higher supported version: %q",
callerName,
pluginName,
existingPlugin.highestSupportedVersion,
),
)
}
}
return newPluginHighestVersion, nil
}
func unregisterPlugin(pluginName string) {
draPlugins.Delete(pluginName)
}
// DeRegisterPlugin is called when a plugin has removed its socket,
// signaling it is no longer available.
func (h *RegistrationHandler) DeRegisterPlugin(pluginName string) {
klog.InfoS("DeRegister DRA plugin", "name", pluginName)
unregisterPlugin(pluginName)
}
// ValidatePlugin is called by kubelet's plugin watcher upon detection
// of a new registration socket opened by DRA plugin.
func (h *RegistrationHandler) ValidatePlugin(pluginName string, endpoint string, versions []string) error {
klog.InfoS("Validate DRA plugin", "name", pluginName, "endpoint", endpoint, "versions", strings.Join(versions, ","))
_, err := h.validateVersions("ValidatePlugin", pluginName, versions)
if err != nil {
return fmt.Errorf("validation failed for DRA plugin %s at endpoint %s: %+v", pluginName, endpoint, err)
}
return err
}
// log prepends log string with `kubernetes.io/dra`.
func log(msg string, parts ...interface{}) string {
return fmt.Sprintf(fmt.Sprintf("%s: %s", DRAPluginName, msg), parts...)
}

View File

@ -0,0 +1,76 @@
/*
Copyright 2019 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package plugin
import (
"sync"
utilversion "k8s.io/apimachinery/pkg/util/version"
)
// Plugin is a description of a DRA Plugin, defined by an endpoint
// and the highest DRA version supported.
type Plugin struct {
endpoint string
highestSupportedVersion *utilversion.Version
}
// PluginsStore holds a list of DRA Plugins.
type PluginsStore struct {
sync.RWMutex
store map[string]*Plugin
}
// Get lets you retrieve a DRA Plugin by name.
// This method is protected by a mutex.
func (s *PluginsStore) Get(pluginName string) *Plugin {
s.RLock()
defer s.RUnlock()
return s.store[pluginName]
}
// Set lets you save a DRA Plugin to the list and give it a specific name.
// This method is protected by a mutex.
func (s *PluginsStore) Set(pluginName string, plugin *Plugin) {
s.Lock()
defer s.Unlock()
if s.store == nil {
s.store = make(map[string]*Plugin)
}
s.store[pluginName] = plugin
}
// Delete lets you delete a DRA Plugin by name.
// This method is protected by a mutex.
func (s *PluginsStore) Delete(pluginName string) {
s.Lock()
defer s.Unlock()
delete(s.store, pluginName)
}
// Clear deletes all entries in the store.
// This methiod is protected by a mutex.
func (s *PluginsStore) Clear() {
s.Lock()
defer s.Unlock()
s.store = make(map[string]*Plugin)
}

View File

@ -0,0 +1,39 @@
/*
Copyright 2022 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package dra
import (
v1 "k8s.io/api/core/v1"
kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
)
// Manager manages all the DRA resource plugins running on a node.
type Manager interface {
// PrepareResources prepares resources for a container in a pod.
// It communicates with the DRA resource plugin to prepare resources and
// returns resource info to trigger CDI injection by the runtime.
PrepareResources(pod *v1.Pod, container *v1.Container) (*ContainerInfo, error)
// UnprepareResources calls NodeUnprepareResource GRPC from DRA plugin to unprepare pod resources
UnprepareResources(pod *v1.Pod) error
}
// ContainerInfo contains information required by the runtime to consume prepared resources.
type ContainerInfo struct {
// The Annotations for the container
Annotations []kubecontainer.Annotation
}

View File

@ -25,6 +25,7 @@ import (
internalapi "k8s.io/cri-api/pkg/apis"
podresourcesapi "k8s.io/kubelet/pkg/apis/podresources/v1"
"k8s.io/kubernetes/pkg/kubelet/cm/cpumanager"
"k8s.io/kubernetes/pkg/kubelet/cm/dra"
"k8s.io/kubernetes/pkg/kubelet/cm/memorymanager"
"k8s.io/kubernetes/pkg/kubelet/cm/topologymanager"
"k8s.io/kubernetes/pkg/kubelet/config"
@ -236,3 +237,11 @@ func (cm *FakeContainerManager) GetNodeAllocatableAbsolute() v1.ResourceList {
defer cm.Unlock()
return nil
}
func (cm *FakeContainerManager) PrepareResources(pod *v1.Pod, container *v1.Container) (*dra.ContainerInfo, error) {
return nil, nil
}
func (cm *FakeContainerManager) UnprepareResources(*v1.Pod) error {
return nil
}

View File

@ -72,6 +72,7 @@ import (
kubeletcertificate "k8s.io/kubernetes/pkg/kubelet/certificate"
"k8s.io/kubernetes/pkg/kubelet/cloudresource"
"k8s.io/kubernetes/pkg/kubelet/cm"
draplugin "k8s.io/kubernetes/pkg/kubelet/cm/dra/plugin"
"k8s.io/kubernetes/pkg/kubelet/config"
"k8s.io/kubernetes/pkg/kubelet/configmap"
kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
@ -1473,8 +1474,13 @@ func (kl *Kubelet) initializeRuntimeDependentModules() {
kl.containerLogManager.Start()
// Adding Registration Callback function for CSI Driver
kl.pluginManager.AddHandler(pluginwatcherapi.CSIPlugin, plugincache.PluginHandler(csi.PluginHandler))
// Adding Registration Callback function for DRA Plugin
if utilfeature.DefaultFeatureGate.Enabled(features.DynamicResourceAllocation) {
kl.pluginManager.AddHandler(pluginwatcherapi.DRAPlugin, plugincache.PluginHandler(draplugin.NewRegistrationHandler()))
}
// Adding Registration Callback function for Device Manager
kl.pluginManager.AddHandler(pluginwatcherapi.DevicePlugin, kl.containerManager.GetPluginRegistrationHandler())
// Start the plugin manager
klog.V(4).InfoS("Starting plugin manager")
go kl.pluginManager.Run(kl.sourcesReady, wait.NeverStop)
@ -1936,6 +1942,15 @@ func (kl *Kubelet) syncTerminatingPod(_ context.Context, pod *v1.Pod, podStatus
return fmt.Errorf("detected running containers after a successful KillPod, CRI violation: %v", runningContainers)
}
// NOTE: resources must be unprepared AFTER all containers have stopped
// and BEFORE the pod status is changed on the API server
// to avoid race conditions with the resource deallocation code in kubernetes core.
if utilfeature.DefaultFeatureGate.Enabled(features.DynamicResourceAllocation) {
if err := kl.containerManager.UnprepareResources(pod); err != nil {
return err
}
}
// we have successfully stopped all containers, the pod is terminating, our status is "done"
klog.V(4).InfoS("Pod termination stopped all running containers", "pod", klog.KObj(pod), "podUID", pod.UID)
@ -1953,6 +1968,7 @@ func (kl *Kubelet) syncTerminatedPod(ctx context.Context, pod *v1.Pod, podStatus
// generate the final status of the pod
// TODO: should we simply fold this into TerminatePod? that would give a single pod update
apiPodStatus := kl.generateAPIPodStatus(pod, podStatus)
kl.statusManager.SetPodStatus(pod, apiPodStatus)
// volumes are unmounted after the pod worker reports ShouldPodRuntimeBeRemoved (which is satisfied

View File

@ -175,6 +175,12 @@ func NodeRules() []rbacv1.PolicyRule {
// RuntimeClass
nodePolicyRules = append(nodePolicyRules, rbacv1helpers.NewRule("get", "list", "watch").Groups("node.k8s.io").Resources("runtimeclasses").RuleOrDie())
// DRA Resource Claims
if utilfeature.DefaultFeatureGate.Enabled(features.DynamicResourceAllocation) {
nodePolicyRules = append(nodePolicyRules, rbacv1helpers.NewRule("get").Groups(resourceGroup).Resources("resourceclaims").RuleOrDie())
}
return nodePolicyRules
}

View File

@ -0,0 +1,11 @@
# See the OWNERS docs at https://go.k8s.io/owners
approvers:
- klueska
- pohly
reviewers:
- klueska
- pohly
- bart0sh
labels:
- sig/node

File diff suppressed because it is too large Load Diff

View File

@ -0,0 +1,81 @@
/*
Copyright 2022 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
// To regenerate api.pb.go run hack/update-generated-kubelet-plugin-registration.sh
syntax = "proto3";
package v1alpha1;
import "github.com/gogo/protobuf/gogoproto/gogo.proto";
option go_package = "dra";
option (gogoproto.goproto_stringer_all) = false;
option (gogoproto.stringer_all) = true;
option (gogoproto.goproto_getters_all) = true;
option (gogoproto.marshaler_all) = true;
option (gogoproto.sizer_all) = true;
option (gogoproto.unmarshaler_all) = true;
option (gogoproto.goproto_unrecognized_all) = false;
service Node {
rpc NodePrepareResource (NodePrepareResourceRequest)
returns (NodePrepareResourceResponse) {}
rpc NodeUnprepareResource (NodeUnprepareResourceRequest)
returns (NodeUnprepareResourceResponse) {}
}
message NodePrepareResourceRequest {
// The ResourceClaim namespace (ResourceClaim.meta.Namespace).
// This field is REQUIRED.
string namespace = 1;
// The UID of the Resource claim (ResourceClaim.meta.UUID).
// This field is REQUIRED.
string claim_uid = 2;
// The name of the Resource claim (ResourceClaim.meta.Name)
// This field is REQUIRED.
string claim_name = 3;
// Resource handle (AllocationResult.ResourceHandle)
// This field is REQUIRED.
string resource_handle = 4;
}
message NodePrepareResourceResponse {
// These are the additional devices that kubelet must
// make available via the container runtime. A resource
// may have zero or more devices.
repeated string cdi_devices = 1;
}
message NodeUnprepareResourceRequest {
// The ResourceClaim namespace (ResourceClaim.meta.Namespace).
// This field is REQUIRED.
string namespace = 1;
// The UID of the Resource claim (ResourceClaim.meta.UUID).
// This field is REQUIRED.
string claim_uid = 2;
// The name of the Resource claim (ResourceClaim.meta.Name)
// This field is REQUIRED.
string claim_name = 3;
// List of fully qualified CDI device names
// Kubelet plugin returns them in the NodePrepareResourceResponse
repeated string cdi_devices = 4;
}
message NodeUnprepareResourceResponse {
// Intentionally empty.
}

View File

@ -21,4 +21,6 @@ const (
CSIPlugin = "CSIPlugin"
// DevicePlugin identifier for registered device plugins
DevicePlugin = "DevicePlugin"
// DRAPlugin identifier for registered Dynamic Resourc Allocation plugins
DRAPlugin = "DRAPlugin"
)

1
vendor/modules.txt vendored
View File

@ -2255,6 +2255,7 @@ k8s.io/kubelet/pkg/apis/credentialprovider/v1
k8s.io/kubelet/pkg/apis/credentialprovider/v1alpha1
k8s.io/kubelet/pkg/apis/credentialprovider/v1beta1
k8s.io/kubelet/pkg/apis/deviceplugin/v1beta1
k8s.io/kubelet/pkg/apis/dra/v1alpha1
k8s.io/kubelet/pkg/apis/pluginregistration/v1
k8s.io/kubelet/pkg/apis/podresources/v1
k8s.io/kubelet/pkg/apis/podresources/v1alpha1