Mirror of https://github.com/k3s-io/kubernetes.git
Synced 2025-07-27 13:37:30 +00:00
Merge pull request #54488 from lichuqiang/plugin_base

Automatic merge from submit-queue. If you want to cherry-pick this change to another branch, please follow the instructions here: https://github.com/kubernetes/community/blob/master/contributors/devel/cherry-picks.md

Add admission handler for device resources allocation

**What this PR does / why we need it**: Adds an admission handler for device resource allocation so that pod creation fails fast when device plugin resources cannot be allocated.

**Which issue this PR fixes**: fixes #51592

**Special notes for your reviewer**: @jiayingz Sorry, something went wrong with my branch in #51895, and the existing comments in that PR had become too long for others to follow, so I closed it and opened this new one, since we had basically reached agreement on the implementation :) I have covered the functionality and unit tests here and will start on the e2e part as soon as possible.
/cc @jiayingz @vishh @RenaudWasTaken

**Release note**:
```release-note
NONE
```
This commit is contained in commit 2084f7f4f3.
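Before the diff itself, the idea of "failing fast during pod creation" can be pictured with a minimal, self-contained sketch: requested device-plugin resources are checked (and reserved) at admission time instead of being discovered as missing later, when the container starts. Everything below is illustrative only; the types and names are invented for this sketch and are not the kubelet's real API.

```go
package main

import "fmt"

// admitResult mimics an admission decision: the pod is either accepted,
// or rejected up front with a reason. Toy type, not the kubelet's.
type admitResult struct {
	Admit   bool
	Reason  string
	Message string
}

// admitPod checks every device-plugin resource the pod requests against the
// node's free device count and rejects the pod immediately on a shortfall,
// instead of letting the container fail later at start time.
func admitPod(requested map[string]int, freeDevices map[string]int) admitResult {
	for resource, want := range requested {
		if freeDevices[resource] < want {
			return admitResult{
				Admit:   false,
				Reason:  "OutOfDevices",
				Message: fmt.Sprintf("pod wants %d of %q, only %d free", want, resource, freeDevices[resource]),
			}
		}
		// Reserve the devices so a later pod cannot double-count them.
		freeDevices[resource] -= want
	}
	return admitResult{Admit: true}
}

func main() {
	free := map[string]int{"vendor.com/gpu": 1}
	fmt.Println(admitPod(map[string]int{"vendor.com/gpu": 1}, free)) // admitted
	fmt.Println(admitPod(map[string]int{"vendor.com/gpu": 1}, free)) // rejected: none left
}
```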
@@ -209,6 +209,7 @@ go_test(
         "//pkg/volume/host_path:go_default_library",
         "//pkg/volume/testing:go_default_library",
         "//pkg/volume/util/volumehelper:go_default_library",
+        "//plugin/pkg/scheduler/schedulercache:go_default_library",
         "//vendor/github.com/google/cadvisor/info/v1:go_default_library",
         "//vendor/github.com/google/cadvisor/info/v2:go_default_library",
         "//vendor/github.com/stretchr/testify/assert:go_default_library",

@@ -37,8 +37,10 @@ go_library(
         "//pkg/kubelet/cm/cpumanager:go_default_library",
         "//pkg/kubelet/container:go_default_library",
         "//pkg/kubelet/eviction/api:go_default_library",
+        "//pkg/kubelet/lifecycle:go_default_library",
         "//pkg/kubelet/status:go_default_library",
         "//pkg/util/mount:go_default_library",
+        "//plugin/pkg/scheduler/schedulercache:go_default_library",
         "//vendor/github.com/golang/glog:go_default_library",
         "//vendor/k8s.io/api/core/v1:go_default_library",
         "//vendor/k8s.io/apimachinery/pkg/types:go_default_library",

@@ -26,7 +26,9 @@ import (
     "k8s.io/kubernetes/pkg/kubelet/apis/kubeletconfig"
     kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
     evictionapi "k8s.io/kubernetes/pkg/kubelet/eviction/api"
+    "k8s.io/kubernetes/pkg/kubelet/lifecycle"
     "k8s.io/kubernetes/pkg/kubelet/status"
+    "k8s.io/kubernetes/plugin/pkg/scheduler/schedulercache"

     "fmt"
     "strconv"

@@ -74,7 +76,14 @@ type ContainerManager interface {

     // GetResources returns RunContainerOptions with devices, mounts, and env fields populated for
     // extended resources required by container.
-    GetResources(pod *v1.Pod, container *v1.Container, activePods []*v1.Pod) (*kubecontainer.RunContainerOptions, error)
+    GetResources(pod *v1.Pod, container *v1.Container) (*kubecontainer.RunContainerOptions, error)

+    // UpdatePluginResources calls Allocate of device plugin handler for potential
+    // requests for device plugin resources, and returns an error if fails.
+    // Otherwise, it updates allocatableResource in nodeInfo if necessary,
+    // to make sure it is at least equal to the pod's requested capacity for
+    // any registered device plugin resource
+    UpdatePluginResources(*schedulercache.NodeInfo, *lifecycle.PodAdmitAttributes) error
+
     InternalContainerLifecycle() InternalContainerLifecycle
 }

@@ -48,6 +48,7 @@ import (
     "k8s.io/kubernetes/pkg/kubelet/cm/deviceplugin"
     cmutil "k8s.io/kubernetes/pkg/kubelet/cm/util"
     kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
+    "k8s.io/kubernetes/pkg/kubelet/lifecycle"
     "k8s.io/kubernetes/pkg/kubelet/qos"
     "k8s.io/kubernetes/pkg/kubelet/status"
     utilfile "k8s.io/kubernetes/pkg/util/file"

@@ -56,6 +57,7 @@ import (
     "k8s.io/kubernetes/pkg/util/procfs"
     utilsysctl "k8s.io/kubernetes/pkg/util/sysctl"
     utilversion "k8s.io/kubernetes/pkg/util/version"
+    "k8s.io/kubernetes/plugin/pkg/scheduler/schedulercache"
 )

 const (

@@ -594,7 +596,7 @@ func (cm *containerManagerImpl) Start(node *v1.Node,
     }, time.Second, stopChan)

     // Starts device plugin manager.
-    if err := cm.devicePluginHandler.Start(); err != nil {
+    if err := cm.devicePluginHandler.Start(deviceplugin.ActivePodsFunc(activePods)); err != nil {
         return err
     }
     return nil

@@ -615,14 +617,10 @@ func (cm *containerManagerImpl) setFsCapacity() error {
 }

 // TODO: move the GetResources logic to PodContainerManager.
-func (cm *containerManagerImpl) GetResources(pod *v1.Pod, container *v1.Container, activePods []*v1.Pod) (*kubecontainer.RunContainerOptions, error) {
+func (cm *containerManagerImpl) GetResources(pod *v1.Pod, container *v1.Container) (*kubecontainer.RunContainerOptions, error) {
     opts := &kubecontainer.RunContainerOptions{}
-    // Gets devices, mounts, and envs from device plugin handler.
-    glog.V(3).Infof("Calling devicePluginHandler AllocateDevices")
-    err := cm.devicePluginHandler.Allocate(pod, container, activePods)
-    if err != nil {
-        return opts, err
-    }
+    // Allocate should already be called during predicateAdmitHandler.Admit(),
+    // just try to fetch device runtime information from cached state here
     devOpts := cm.devicePluginHandler.GetDeviceRunContainerOptions(pod, container)
     if devOpts == nil {
         return opts, nil

@@ -633,6 +631,10 @@ func (cm *containerManagerImpl) GetResources(pod *v1.Pod, container *v1.Containe
     return opts, nil
 }

+func (cm *containerManagerImpl) UpdatePluginResources(node *schedulercache.NodeInfo, attrs *lifecycle.PodAdmitAttributes) error {
+    return cm.devicePluginHandler.Allocate(node, attrs)
+}
+
 func (cm *containerManagerImpl) SystemCgroupsLimit() v1.ResourceList {
     cpuLimit := int64(0)
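The hunks above split the container manager's responsibilities into two phases: device allocation no longer happens inside GetResources at container-start time, but earlier, during pod admission, via the new UpdatePluginResources; GetResources then only reads what was cached at admission. A minimal sketch of that two-phase flow follows; every type and name here is invented for illustration and is not the kubelet's real code.

```go
package main

import (
	"errors"
	"fmt"
)

// Illustrative sketch of the two-phase flow; toy types only.

type runOptions struct{ Devices []string }

type deviceHandler struct {
	// cache of devices assigned per "pod/container" key, filled at admission time
	cache map[string][]string
	free  []string
}

// allocate runs at pod admission (the UpdatePluginResources step): it reserves
// devices and records them in the cache, failing fast if none are left.
func (h *deviceHandler) allocate(key string, n int) error {
	if len(h.free) < n {
		return errors.New("not enough devices")
	}
	h.cache[key] = h.free[:n]
	h.free = h.free[n:]
	return nil
}

// getResources runs later, at container start (the GetResources step): it only
// reads what admission already cached and never allocates.
func (h *deviceHandler) getResources(key string) runOptions {
	return runOptions{Devices: h.cache[key]}
}

func main() {
	h := &deviceHandler{cache: map[string][]string{}, free: []string{"dev0", "dev1"}}
	if err := h.allocate("pod1/ctr", 1); err != nil { // admission time
		fmt.Println("pod rejected:", err)
		return
	}
	fmt.Println(h.getResources("pod1/ctr")) // container start: cached result only
}
```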
@@ -23,7 +23,9 @@ import (
     internalapi "k8s.io/kubernetes/pkg/kubelet/apis/cri"
     "k8s.io/kubernetes/pkg/kubelet/cm/cpumanager"
     kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
+    "k8s.io/kubernetes/pkg/kubelet/lifecycle"
     "k8s.io/kubernetes/pkg/kubelet/status"
+    "k8s.io/kubernetes/plugin/pkg/scheduler/schedulercache"
 )

 type containerManagerStub struct{}

@@ -71,10 +73,14 @@ func (cm *containerManagerStub) NewPodContainerManager() PodContainerManager {
     return &podContainerManagerStub{}
 }

-func (cm *containerManagerStub) GetResources(pod *v1.Pod, container *v1.Container, activePods []*v1.Pod) (*kubecontainer.RunContainerOptions, error) {
+func (cm *containerManagerStub) GetResources(pod *v1.Pod, container *v1.Container) (*kubecontainer.RunContainerOptions, error) {
     return &kubecontainer.RunContainerOptions{}, nil
 }

+func (cm *containerManagerStub) UpdatePluginResources(*schedulercache.NodeInfo, *lifecycle.PodAdmitAttributes) error {
+    return nil
+}
+
 func (cm *containerManagerStub) InternalContainerLifecycle() InternalContainerLifecycle {
     return &internalContainerLifecycleImpl{cpumanager.NewFakeManager()}
 }

@@ -27,8 +27,10 @@ import (
     "k8s.io/kubernetes/pkg/kubelet/cadvisor"
     "k8s.io/kubernetes/pkg/kubelet/cm/cpumanager"
     kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
+    "k8s.io/kubernetes/pkg/kubelet/lifecycle"
     "k8s.io/kubernetes/pkg/kubelet/status"
     "k8s.io/kubernetes/pkg/util/mount"
+    "k8s.io/kubernetes/plugin/pkg/scheduler/schedulercache"
 )

 type unsupportedContainerManager struct {

@@ -76,10 +78,14 @@ func (cm *unsupportedContainerManager) NewPodContainerManager() PodContainerMana
     return &unsupportedPodContainerManager{}
 }

-func (cm *unsupportedContainerManager) GetResources(pod *v1.Pod, container *v1.Container, activePods []*v1.Pod) (*kubecontainer.RunContainerOptions, error) {
+func (cm *unsupportedContainerManager) GetResources(pod *v1.Pod, container *v1.Container) (*kubecontainer.RunContainerOptions, error) {
     return &kubecontainer.RunContainerOptions{}, nil
 }

+func (cm *unsupportedContainerManager) UpdatePluginResources(*schedulercache.NodeInfo, *lifecycle.PodAdmitAttributes) error {
+    return nil
+}
+
 func (cm *unsupportedContainerManager) InternalContainerLifecycle() InternalContainerLifecycle {
     return &internalContainerLifecycleImpl{cpumanager.NewFakeManager()}
 }

@@ -22,6 +22,8 @@ go_library(
         "//pkg/api/v1/helper:go_default_library",
         "//pkg/kubelet/apis/deviceplugin/v1alpha:go_default_library",
         "//pkg/kubelet/container:go_default_library",
+        "//pkg/kubelet/lifecycle:go_default_library",
+        "//plugin/pkg/scheduler/schedulercache:go_default_library",
         "//vendor/github.com/golang/glog:go_default_library",
         "//vendor/golang.org/x/net/context:go_default_library",
         "//vendor/google.golang.org/grpc:go_default_library",

@@ -55,6 +57,8 @@ go_test(
     library = ":go_default_library",
     deps = [
         "//pkg/kubelet/apis/deviceplugin/v1alpha:go_default_library",
+        "//pkg/kubelet/lifecycle:go_default_library",
+        "//plugin/pkg/scheduler/schedulercache:go_default_library",
         "//vendor/github.com/stretchr/testify/assert:go_default_library",
         "//vendor/github.com/stretchr/testify/require:go_default_library",
         "//vendor/k8s.io/api/core/v1:go_default_library",

@@ -29,19 +29,28 @@ import (
     "k8s.io/apimachinery/pkg/api/resource"
     "k8s.io/apimachinery/pkg/util/sets"
     pluginapi "k8s.io/kubernetes/pkg/kubelet/apis/deviceplugin/v1alpha"
+    "k8s.io/kubernetes/pkg/kubelet/lifecycle"
+    "k8s.io/kubernetes/plugin/pkg/scheduler/schedulercache"
 )

+// ActivePodsFunc is a function that returns a list of pods to reconcile.
+type ActivePodsFunc func() []*v1.Pod
+
 // Handler defines the functions used to manage and access device plugin resources.
 type Handler interface {
     // Start starts device plugin registration service.
-    Start() error
+    Start(activePods ActivePodsFunc) error
     // Devices returns all of registered devices keyed by resourceName.
     Devices() map[string][]pluginapi.Device
-    // Allocate attempts to allocate all of required extended resources for
-    // the input container, issues an Allocate rpc request for each of such
-    // resources, processes their AllocateResponses, and updates the cached
-    // containerDevices on success.
-    Allocate(pod *v1.Pod, container *v1.Container, activePods []*v1.Pod) error
+    // Allocate scans through containers in the pod spec
+    // If it finds the container requires device plugin resource, it:
+    // 1. Checks whether it already has this information in its cached state.
+    // 2. If not, it calls Allocate and populate its cached state afterwards.
+    // 3. If there is no cached state and Allocate fails, it returns an error.
+    // 4. Otherwise, it updates allocatableResource in nodeInfo if necessary,
+    // to make sure it is at least equal to the pod's requested capacity for
+    // any registered device plugin resource
+    Allocate(node *schedulercache.NodeInfo, attrs *lifecycle.PodAdmitAttributes) error
     // GetDeviceRunContainerOptions checks whether we have cached containerDevices
     // for the passed-in <pod, container> and returns its DeviceRunContainerOptions
     // for the found one. An empty struct is returned in case no cached state is found.

@@ -53,6 +62,10 @@ type HandlerImpl struct {
     // TODO: consider to change this to RWMutex.
     sync.Mutex
     devicePluginManager Manager
+    // activePods is a method for listing active pods on the node
+    // so the amount of pluginResources requested by existing pods
+    // could be counted when updating allocated devices
+    activePods ActivePodsFunc
     // devicePluginManagerMonitorCallback is used for testing only.
     devicePluginManagerMonitorCallback MonitorCallback
     // allDevices contains all of registered resourceNames and their exported device IDs.

@@ -103,16 +116,21 @@ func NewHandlerImpl(updateCapacityFunc func(v1.ResourceList)) (*HandlerImpl, err

     handler.devicePluginManager = mgr
     handler.devicePluginManagerMonitorCallback = deviceManagerMonitorCallback
-    // Loads in allocatedDevices information from disk.
-    err = handler.readCheckpoint()
-    if err != nil {
-        glog.Warningf("Continue after failing to read checkpoint file. Device allocation info may NOT be up-to-date. Err: %v", err)
-    }
     return handler, nil
 }

-// Start starts device plugin registration service.
-func (h *HandlerImpl) Start() error {
+// Start initializes podDevices and allocatedDevices information from checkpoint-ed state
+// and starts device plugin registration service.
+func (h *HandlerImpl) Start(activePods ActivePodsFunc) error {
+    h.activePods = activePods
+
+    // Loads in allocatedDevices information from disk.
+    err := h.readCheckpoint()
+    if err != nil {
+        glog.Warningf("Continue after failing to read checkpoint file. Device allocation info may NOT be up-to-date. Err: %v", err)
+    }
+
     return h.devicePluginManager.Start()
 }

@@ -166,11 +184,11 @@ func (h *HandlerImpl) devicesToAllocate(podUID, contName, resource string, requi
     return devices, nil
 }

-// Allocate attempts to allocate all of required extended resources for
-// the input container, issues an Allocate rpc request for each of such
-// resources, processes their AllocateResponses, and updates the cached
-// containerDevices on success.
-func (h *HandlerImpl) Allocate(pod *v1.Pod, container *v1.Container, activePods []*v1.Pod) error {
+// allocateContainerResources attempts to allocate all of required device
+// plugin resources for the input container, issues an Allocate rpc request
+// for each new device resource requirement, processes their AllocateResponses,
+// and updates the cached containerDevices on success.
+func (h *HandlerImpl) allocateContainerResources(pod *v1.Pod, container *v1.Container) error {
     podUID := string(pod.UID)
     contName := container.Name
     allocatedDevicesUpdated := false

@@ -184,7 +202,7 @@ func (h *HandlerImpl) Allocate(pod *v1.Pod, container *v1.Container, activePods
         // Updates allocatedDevices to garbage collect any stranded resources
         // before doing the device plugin allocation.
         if !allocatedDevicesUpdated {
-            h.updateAllocatedDevices(activePods)
+            h.updateAllocatedDevices(h.activePods())
             allocatedDevicesUpdated = true
         }
         allocDevices, err := h.devicesToAllocate(podUID, contName, resource, needed)

@@ -226,6 +244,60 @@ func (h *HandlerImpl) Allocate(pod *v1.Pod, container *v1.Container, activePods
     return h.writeCheckpoint()
 }

+// Allocate attempts to allocate all of required device plugin resources,
+// and update Allocatable resources in nodeInfo if necessary
+func (h *HandlerImpl) Allocate(node *schedulercache.NodeInfo, attrs *lifecycle.PodAdmitAttributes) error {
+    pod := attrs.Pod
+    // TODO: Reuse devices between init containers and regular containers.
+    for _, container := range pod.Spec.InitContainers {
+        if err := h.allocateContainerResources(pod, &container); err != nil {
+            return err
+        }
+    }
+    for _, container := range pod.Spec.Containers {
+        if err := h.allocateContainerResources(pod, &container); err != nil {
+            return err
+        }
+    }
+
+    // quick return if no pluginResources requested
+    if _, podRequireDevicePluginResource := h.podDevices[string(pod.UID)]; !podRequireDevicePluginResource {
+        return nil
+    }
+
+    h.sanitizeNodeAllocatable(node)
+
+    return nil
+}
+
+// sanitizeNodeAllocatable scans through allocatedDevices in DevicePluginHandler
+// and if necessary, updates allocatableResource in nodeInfo to at least equal to
+// the allocated capacity. This allows pods that have already been scheduled on
+// the node to pass GeneralPredicates admission checking even upon device plugin failure.
+func (h *HandlerImpl) sanitizeNodeAllocatable(node *schedulercache.NodeInfo) {
+    var newAllocatableResource *schedulercache.Resource
+    allocatableResource := node.AllocatableResource()
+    if allocatableResource.ScalarResources == nil {
+        allocatableResource.ScalarResources = make(map[v1.ResourceName]int64)
+    }
+    for resource, devices := range h.allocatedDevices {
+        needed := devices.Len()
+        quant, ok := allocatableResource.ScalarResources[v1.ResourceName(resource)]
+        if ok && int(quant) >= needed {
+            continue
+        }
+        // Needs to update nodeInfo.AllocatableResource to make sure
+        // NodeInfo.allocatableResource at least equal to the capacity already allocated.
+        if newAllocatableResource == nil {
+            newAllocatableResource = allocatableResource.Clone()
+        }
+        newAllocatableResource.ScalarResources[v1.ResourceName(resource)] = int64(needed)
+    }
+    if newAllocatableResource != nil {
+        node.SetAllocatableResource(newAllocatableResource)
+    }
+}
+
 // GetDeviceRunContainerOptions checks whether we have cached containerDevices
 // for the passed-in <pod, container> and returns its DeviceRunContainerOptions
 // for the found one. An empty struct is returned in case no cached state is found.
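The sanitizeNodeAllocatable logic added above raises the node's allocatable count for a device-plugin resource up to whatever has already been handed out, so pods that were admitted earlier still pass GeneralPredicates even if the device plugin later becomes unhealthy and stops reporting capacity. A toy restatement of that rule, using plain maps instead of the scheduler cache types (names invented for this sketch):

```go
package main

import "fmt"

// sanitizeAllocatable raises allocatable for each device-plugin resource to at
// least the number of devices already allocated, and never lowers it.
func sanitizeAllocatable(allocatable map[string]int, allocated map[string]int) {
	for resource, inUse := range allocated {
		if allocatable[resource] < inUse {
			allocatable[resource] = inUse
		}
	}
}

func main() {
	allocatable := map[string]int{"domain2.com/resource2": 2} // resource1 missing entirely
	allocated := map[string]int{
		"domain1.com/resource1": 1,
		"domain2.com/resource2": 1,
	}
	sanitizeAllocatable(allocatable, allocated)
	fmt.Println(allocatable) // resource1 raised to 1, resource2 stays at 2
}
```

This mirrors the expectations in the TestSanitizeNodeAllocatable hunk further down: an under-reported resource is bumped up to the allocated count, while a resource whose allocatable already exceeds the allocation is left untouched.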
@@ -19,6 +19,8 @@ package deviceplugin
 import (
     "k8s.io/api/core/v1"
     pluginapi "k8s.io/kubernetes/pkg/kubelet/apis/deviceplugin/v1alpha"
+    "k8s.io/kubernetes/pkg/kubelet/lifecycle"
+    "k8s.io/kubernetes/plugin/pkg/scheduler/schedulercache"
 )

 // HandlerStub provides a simple stub implementation for Handler.

@@ -30,7 +32,7 @@ func NewHandlerStub() (*HandlerStub, error) {
 }

 // Start simply returns nil.
-func (h *HandlerStub) Start() error {
+func (h *HandlerStub) Start(activePods ActivePodsFunc) error {
     return nil
 }

@@ -40,7 +42,7 @@ func (h *HandlerStub) Devices() map[string][]pluginapi.Device {
 }

 // Allocate simply returns nil.
-func (h *HandlerStub) Allocate(pod *v1.Pod, container *v1.Container, activePods []*v1.Pod) error {
+func (h *HandlerStub) Allocate(node *schedulercache.NodeInfo, attrs *lifecycle.PodAdmitAttributes) error {
     return nil
 }

@@ -30,6 +30,8 @@ import (
     "k8s.io/apimachinery/pkg/util/sets"
     "k8s.io/apimachinery/pkg/util/uuid"
     pluginapi "k8s.io/kubernetes/pkg/kubelet/apis/deviceplugin/v1alpha"
+    "k8s.io/kubernetes/pkg/kubelet/lifecycle"
+    "k8s.io/kubernetes/plugin/pkg/scheduler/schedulercache"
 )

 func TestUpdateCapacity(t *testing.T) {

@@ -224,13 +226,24 @@ func TestCheckpoint(t *testing.T) {
     as.True(reflect.DeepEqual(expectedAllocatedDevices, testHandler.allocatedDevices))
 }

+type activePodsStub struct {
+    activePods []*v1.Pod
+}
+
+func (a *activePodsStub) getActivePods() []*v1.Pod {
+    return a.activePods
+}
+
+func (a *activePodsStub) updateActivePods(newPods []*v1.Pod) {
+    a.activePods = newPods
+}
+
 func TestPodContainerDeviceAllocation(t *testing.T) {
     flag.Set("alsologtostderr", fmt.Sprintf("%t", true))
     var logLevel string
     flag.StringVar(&logLevel, "logLevel", "4", "test")
     flag.Lookup("v").Value.Set(logLevel)

-    var activePods []*v1.Pod
     resourceName1 := "domain1.com/resource1"
     resourceQuantity1 := *resource.NewQuantity(int64(2), resource.DecimalSI)
     devID1 := "dev1"

@@ -244,6 +257,16 @@ func TestPodContainerDeviceAllocation(t *testing.T) {
     as := assert.New(t)
     as.Nil(err)
     monitorCallback := func(resourceName string, added, updated, deleted []pluginapi.Device) {}
+    podsStub := activePodsStub{
+        activePods: []*v1.Pod{},
+    }
+    cachedNode := &v1.Node{
+        Status: v1.NodeStatus{
+            Allocatable: v1.ResourceList{},
+        },
+    }
+    nodeInfo := &schedulercache.NodeInfo{}
+    nodeInfo.SetNode(cachedNode)

     testHandler := &HandlerImpl{
         devicePluginManager: m,

@@ -251,6 +274,7 @@ func TestPodContainerDeviceAllocation(t *testing.T) {
         allDevices:       make(map[string]sets.String),
         allocatedDevices: make(map[string]sets.String),
         podDevices:       make(podDevices),
+        activePods:       podsStub.getActivePods,
     }
     testHandler.allDevices[resourceName1] = sets.NewString()
     testHandler.allDevices[resourceName1].Insert(devID1)

@@ -288,8 +312,8 @@ func TestPodContainerDeviceAllocation(t *testing.T) {
         },
     }

-    activePods = append(activePods, pod)
-    err = testHandler.Allocate(pod, &pod.Spec.Containers[0], activePods)
+    podsStub.updateActivePods([]*v1.Pod{pod})
+    err = testHandler.Allocate(nodeInfo, &lifecycle.PodAdmitAttributes{Pod: pod})
     as.Nil(err)
     runContainerOpts := testHandler.GetDeviceRunContainerOptions(pod, &pod.Spec.Containers[0])
     as.Equal(len(runContainerOpts.Devices), 3)

@@ -315,7 +339,7 @@ func TestPodContainerDeviceAllocation(t *testing.T) {
             },
         },
     }
-    err = testHandler.Allocate(failPod, &failPod.Spec.Containers[0], activePods)
+    err = testHandler.Allocate(nodeInfo, &lifecycle.PodAdmitAttributes{Pod: failPod})
     as.NotNil(err)
     runContainerOpts2 := testHandler.GetDeviceRunContainerOptions(failPod, &failPod.Spec.Containers[0])
     as.Nil(runContainerOpts2)

@@ -338,8 +362,53 @@ func TestPodContainerDeviceAllocation(t *testing.T) {
             },
         },
     }
-    err = testHandler.Allocate(newPod, &newPod.Spec.Containers[0], activePods)
+    err = testHandler.Allocate(nodeInfo, &lifecycle.PodAdmitAttributes{Pod: newPod})
     as.Nil(err)
     runContainerOpts3 := testHandler.GetDeviceRunContainerOptions(newPod, &newPod.Spec.Containers[0])
     as.Equal(1, len(runContainerOpts3.Envs))
 }
+
+func TestSanitizeNodeAllocatable(t *testing.T) {
+    resourceName1 := "domain1.com/resource1"
+    devID1 := "dev1"
+
+    resourceName2 := "domain2.com/resource2"
+    devID2 := "dev2"
+
+    m, err := NewDevicePluginManagerTestStub()
+    as := assert.New(t)
+    as.Nil(err)
+    monitorCallback := func(resourceName string, added, updated, deleted []pluginapi.Device) {}
+
+    testHandler := &HandlerImpl{
+        devicePluginManager:                m,
+        devicePluginManagerMonitorCallback: monitorCallback,
+        allDevices:                         make(map[string]sets.String),
+        allocatedDevices:                   make(map[string]sets.String),
+        podDevices:                         make(podDevices),
+    }
+    // require one of resource1 and one of resource2
+    testHandler.allocatedDevices[resourceName1] = sets.NewString()
+    testHandler.allocatedDevices[resourceName1].Insert(devID1)
+    testHandler.allocatedDevices[resourceName2] = sets.NewString()
+    testHandler.allocatedDevices[resourceName2].Insert(devID2)
+
+    cachedNode := &v1.Node{
+        Status: v1.NodeStatus{
+            Allocatable: v1.ResourceList{
+                // has no resource1 and two of resource2
+                v1.ResourceName(resourceName2): *resource.NewQuantity(int64(2), resource.DecimalSI),
+            },
+        },
+    }
+    nodeInfo := &schedulercache.NodeInfo{}
+    nodeInfo.SetNode(cachedNode)
+
+    testHandler.sanitizeNodeAllocatable(nodeInfo)
+
+    allocatableScalarResources := nodeInfo.AllocatableResource().ScalarResources
+    // allocatable in nodeInfo is less than needed, should update
+    as.Equal(1, int(allocatableScalarResources[v1.ResourceName(resourceName1)]))
+    // allocatable in nodeInfo is more than needed, should skip updating
+    as.Equal(2, int(allocatableScalarResources[v1.ResourceName(resourceName2)]))
+}

@@ -878,7 +878,7 @@ func NewMainKubelet(kubeCfg *kubeletconfiginternal.KubeletConfiguration,
     klet.AddPodSyncHandler(activeDeadlineHandler)

     criticalPodAdmissionHandler := preemption.NewCriticalPodAdmissionHandler(klet.GetActivePods, killPodNow(klet.podWorkers, kubeDeps.Recorder), kubeDeps.Recorder)
-    klet.admitHandlers.AddPodAdmitHandler(lifecycle.NewPredicateAdmitHandler(klet.getNodeAnyWay, criticalPodAdmissionHandler))
+    klet.admitHandlers.AddPodAdmitHandler(lifecycle.NewPredicateAdmitHandler(klet.getNodeAnyWay, criticalPodAdmissionHandler, klet.containerManager.UpdatePluginResources))
     // apply functional Option's
     for _, opt := range kubeDeps.Options {
         opt(klet)

@@ -383,7 +383,7 @@ func (kl *Kubelet) GetPodCgroupParent(pod *v1.Pod) string {
 // the container runtime to set parameters for launching a container.
 func (kl *Kubelet) GenerateRunContainerOptions(pod *v1.Pod, container *v1.Container, podIP string) (*kubecontainer.RunContainerOptions, bool, error) {
     useClusterFirstPolicy := false
-    opts, err := kl.containerManager.GetResources(pod, container, kl.GetActivePods())
+    opts, err := kl.containerManager.GetResources(pod, container)
     if err != nil {
         return nil, false, err
     }

@@ -72,6 +72,7 @@ import (
     _ "k8s.io/kubernetes/pkg/volume/host_path"
     volumetest "k8s.io/kubernetes/pkg/volume/testing"
     "k8s.io/kubernetes/pkg/volume/util/volumehelper"
+    "k8s.io/kubernetes/plugin/pkg/scheduler/schedulercache"
 )

 func init() {

@@ -284,7 +285,7 @@ func newTestKubeletWithImageList(
     kubelet.evictionManager = evictionManager
     kubelet.admitHandlers.AddPodAdmitHandler(evictionAdmitHandler)
     // Add this as cleanup predicate pod admitter
-    kubelet.admitHandlers.AddPodAdmitHandler(lifecycle.NewPredicateAdmitHandler(kubelet.getNodeAnyWay, lifecycle.NewAdmissionFailureHandlerStub()))
+    kubelet.admitHandlers.AddPodAdmitHandler(lifecycle.NewPredicateAdmitHandler(kubelet.getNodeAnyWay, lifecycle.NewAdmissionFailureHandlerStub(), kubelet.containerManager.UpdatePluginResources))

     plug := &volumetest.FakeVolumePlugin{PluginName: "fake", Host: nil}
     var prober volume.DynamicPluginProber = nil // TODO (#51147) inject mock

@@ -573,6 +574,116 @@ func TestHandleMemExceeded(t *testing.T) {
     checkPodStatus(t, kl, fittingPod, v1.PodPending)
 }

+// Tests that we handle result of interface UpdatePluginResources correctly
+// by setting corresponding status in status map.
+func TestHandlePluginResources(t *testing.T) {
+    testKubelet := newTestKubelet(t, false /* controllerAttachDetachEnabled */)
+    defer testKubelet.Cleanup()
+    testKubelet.chainMock()
+    kl := testKubelet.kubelet
+
+    adjustedResource := v1.ResourceName("domain1.com/adjustedResource")
+    unadjustedResouce := v1.ResourceName("domain2.com/unadjustedResouce")
+    failedResource := v1.ResourceName("domain2.com/failedResource")
+    resourceQuantity1 := *resource.NewQuantity(int64(1), resource.DecimalSI)
+    resourceQuantity2 := *resource.NewQuantity(int64(2), resource.DecimalSI)
+    resourceQuantityInvalid := *resource.NewQuantity(int64(-1), resource.DecimalSI)
+    allowedPodQuantity := *resource.NewQuantity(int64(10), resource.DecimalSI)
+    nodes := []*v1.Node{
+        {ObjectMeta: metav1.ObjectMeta{Name: testKubeletHostname},
+            Status: v1.NodeStatus{Capacity: v1.ResourceList{}, Allocatable: v1.ResourceList{
+                adjustedResource:  resourceQuantity1,
+                unadjustedResouce: resourceQuantity1,
+                v1.ResourcePods:   allowedPodQuantity,
+            }}},
+    }
+    kl.nodeInfo = testNodeInfo{nodes: nodes}
+
+    updatePluginResourcesFunc := func(node *schedulercache.NodeInfo, attrs *lifecycle.PodAdmitAttributes) error {
+        // Maps from resourceName to the value we use to set node.allocatableResource[resourceName].
+        // A resource with invalid value (< 0) causes the function to return an error
+        // to emulate resource Allocation failure.
+        // Resources not contained in this map will have their node.allocatableResource
+        // quantity unchanged.
+        updateResourceMap := map[v1.ResourceName]resource.Quantity{
+            adjustedResource: resourceQuantity2,
+            failedResource:   resourceQuantityInvalid,
+        }
+        pod := attrs.Pod
+        allocatableResource := node.AllocatableResource()
+        newAllocatableResource := allocatableResource.Clone()
+        for _, container := range pod.Spec.Containers {
+            for resource := range container.Resources.Requests {
+                newQuantity, exist := updateResourceMap[resource]
+                if !exist {
+                    continue
+                }
+                if newQuantity.Value() < 0 {
+                    return fmt.Errorf("Allocation failed")
+                }
+                newAllocatableResource.ScalarResources[resource] = newQuantity.Value()
+            }
+        }
+        node.SetAllocatableResource(newAllocatableResource)
+        return nil
+    }
+
+    // add updatePluginResourcesFunc to admission handler, to test it's behavior.
+    kl.admitHandlers = lifecycle.PodAdmitHandlers{}
+    kl.admitHandlers.AddPodAdmitHandler(lifecycle.NewPredicateAdmitHandler(kl.getNodeAnyWay, lifecycle.NewAdmissionFailureHandlerStub(), updatePluginResourcesFunc))
+
+    // pod requiring adjustedResource can be successfully allocated because updatePluginResourcesFunc
+    // adjusts node.allocatableResource for this resource to a sufficient value.
+    fittingPodspec := v1.PodSpec{NodeName: string(kl.nodeName),
+        Containers: []v1.Container{{Resources: v1.ResourceRequirements{
+            Limits: v1.ResourceList{
+                adjustedResource: resourceQuantity2,
+            },
+            Requests: v1.ResourceList{
+                adjustedResource: resourceQuantity2,
+            },
+        }}},
+    }
+    // pod requiring unadjustedResouce with insufficient quantity will still fail PredicateAdmit.
+    exceededPodSpec := v1.PodSpec{NodeName: string(kl.nodeName),
+        Containers: []v1.Container{{Resources: v1.ResourceRequirements{
+            Limits: v1.ResourceList{
+                unadjustedResouce: resourceQuantity2,
+            },
+            Requests: v1.ResourceList{
+                unadjustedResouce: resourceQuantity2,
+            },
+        }}},
+    }
+    // pod requiring failedResource will fail with the resource failed to be allocated.
+    failedPodSpec := v1.PodSpec{NodeName: string(kl.nodeName),
+        Containers: []v1.Container{{Resources: v1.ResourceRequirements{
+            Limits: v1.ResourceList{
+                failedResource: resourceQuantity1,
+            },
+            Requests: v1.ResourceList{
+                failedResource: resourceQuantity1,
+            },
+        }}},
+    }
+    pods := []*v1.Pod{
+        podWithUIDNameNsSpec("123", "fittingpod", "foo", fittingPodspec),
+        podWithUIDNameNsSpec("456", "exceededpod", "foo", exceededPodSpec),
+        podWithUIDNameNsSpec("789", "failedpod", "foo", failedPodSpec),
+    }
+    // The latter two pod should be rejected.
+    fittingPod := pods[0]
+    exceededPod := pods[1]
+    failedPod := pods[2]
+
+    kl.HandlePodAdditions(pods)
+
+    // Check pod status stored in the status map.
+    checkPodStatus(t, kl, fittingPod, v1.PodPending)
+    checkPodStatus(t, kl, exceededPod, v1.PodFailed)
+    checkPodStatus(t, kl, failedPod, v1.PodFailed)
+}
+
 // TODO(filipg): This test should be removed once StatusSyncer can do garbage collection without external signal.
 func TestPurgingObsoleteStatusMapEntries(t *testing.T) {
     testKubelet := newTestKubelet(t, false /* controllerAttachDetachEnabled */)

@@ -29,6 +29,8 @@ import (

 type getNodeAnyWayFuncType func() (*v1.Node, error)

+type pluginResourceUpdateFuncType func(*schedulercache.NodeInfo, *PodAdmitAttributes) error
+
 // AdmissionFailureHandler is an interface which defines how to deal with a failure to admit a pod.
 // This allows for the graceful handling of pod admission failure.
 type AdmissionFailureHandler interface {

@@ -37,14 +39,16 @@ type AdmissionFailureHandler interface {

 type predicateAdmitHandler struct {
     getNodeAnyWayFunc        getNodeAnyWayFuncType
+    pluginResourceUpdateFunc pluginResourceUpdateFuncType
     admissionFailureHandler  AdmissionFailureHandler
 }

 var _ PodAdmitHandler = &predicateAdmitHandler{}

-func NewPredicateAdmitHandler(getNodeAnyWayFunc getNodeAnyWayFuncType, admissionFailureHandler AdmissionFailureHandler) *predicateAdmitHandler {
+func NewPredicateAdmitHandler(getNodeAnyWayFunc getNodeAnyWayFuncType, admissionFailureHandler AdmissionFailureHandler, pluginResourceUpdateFunc pluginResourceUpdateFuncType) *predicateAdmitHandler {
     return &predicateAdmitHandler{
         getNodeAnyWayFunc,
+        pluginResourceUpdateFunc,
         admissionFailureHandler,
     }
 }

@@ -63,6 +67,16 @@ func (w *predicateAdmitHandler) Admit(attrs *PodAdmitAttributes) PodAdmitResult
     pods := attrs.OtherPods
     nodeInfo := schedulercache.NewNodeInfo(pods...)
     nodeInfo.SetNode(node)
+    // ensure the node has enough plugin resources for that required in pods
+    if err = w.pluginResourceUpdateFunc(nodeInfo, attrs); err != nil {
+        message := fmt.Sprintf("Update plugin resources failed due to %v, which is unexpected.", err)
+        glog.Warningf("Failed to admit pod %v - %s", format.Pod(pod), message)
+        return PodAdmitResult{
+            Admit:   false,
+            Reason:  "UnexpectedAdmissionError",
+            Message: message,
+        }
+    }
     fit, reasons, err := predicates.GeneralPredicates(pod, nil, nodeInfo)
     if err != nil {
         message := fmt.Sprintf("GeneralPredicates failed due to %v, which is unexpected.", err)
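With the predicate handler change above, admission now runs in two ordered steps: the plugin-resource update hook first (which may allocate devices and adjust the node's allocatable view), then the general "does it fit" predicates against the adjusted NodeInfo; a failure in the hook becomes an immediate rejection with reason "UnexpectedAdmissionError". A simplified, self-contained sketch of that ordering follows; all names and types are made up for illustration.

```go
package main

import "fmt"

// Toy admission result; not the kubelet's PodAdmitResult.
type admitResult struct {
	Admit  bool
	Reason string
}

// pluginUpdateFunc stands in for the pluginResourceUpdateFunc hook: it may
// allocate devices and adjust the node's allocatable view before predicates run.
type pluginUpdateFunc func(allocatable map[string]int, requests map[string]int) error

func admit(requests map[string]int, allocatable map[string]int, update pluginUpdateFunc) admitResult {
	// Step 1: run the plugin hook; a failure rejects the pod outright.
	if err := update(allocatable, requests); err != nil {
		return admitResult{Admit: false, Reason: "UnexpectedAdmissionError"}
	}
	// Step 2: run the general "fits on the node" check on the adjusted view.
	for resource, want := range requests {
		if allocatable[resource] < want {
			return admitResult{Admit: false, Reason: "OutOfResources"}
		}
	}
	return admitResult{Admit: true}
}

func main() {
	update := func(allocatable, requests map[string]int) error {
		// Pretend the device plugin can satisfy "domain1.com/adjusted" and
		// raises allocatable accordingly, but knows nothing about other resources.
		if want, ok := requests["domain1.com/adjusted"]; ok {
			allocatable["domain1.com/adjusted"] = want
		}
		return nil
	}
	fmt.Println(admit(map[string]int{"domain1.com/adjusted": 2}, map[string]int{}, update))   // admitted
	fmt.Println(admit(map[string]int{"domain2.com/unadjusted": 2}, map[string]int{}, update)) // rejected
}
```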
@@ -255,6 +255,11 @@ func (n *NodeInfo) AllocatableResource() Resource {
     return *n.allocatableResource
 }

+// SetAllocatableResource sets the allocatableResource information of given node.
+func (n *NodeInfo) SetAllocatableResource(allocatableResource *Resource) {
+    n.allocatableResource = allocatableResource
+}
+
 func (n *NodeInfo) Clone() *NodeInfo {
     clone := &NodeInfo{
         node: n.node,
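The new SetAllocatableResource setter pairs with the clone-then-mutate pattern used in sanitizeNodeAllocatable above: since AllocatableResource() hands back a copy of the Resource value, the handler clones it, edits the copy, and installs the result through the setter. A minimal illustration of that copy-then-swap pattern with hypothetical types:

```go
package main

import "fmt"

// Hypothetical holder mirroring the getter/setter pair added above.
type resource struct {
	Scalar map[string]int64
}

// clone copies the struct and its map so edits do not touch the original.
func (r *resource) clone() *resource {
	c := &resource{Scalar: make(map[string]int64, len(r.Scalar))}
	for k, v := range r.Scalar {
		c.Scalar[k] = v
	}
	return c
}

type nodeInfo struct {
	allocatable *resource
}

// AllocatableResource returns the current value by copy, like the real getter.
func (n *nodeInfo) AllocatableResource() resource { return *n.allocatable }

// SetAllocatableResource installs a freshly built value, like the new setter.
func (n *nodeInfo) SetAllocatableResource(r *resource) { n.allocatable = r }

func main() {
	n := &nodeInfo{allocatable: &resource{Scalar: map[string]int64{"example.com/dev": 1}}}
	updated := n.allocatable.clone()
	updated.Scalar["example.com/dev"] = 2
	n.SetAllocatableResource(updated)
	fmt.Println(n.AllocatableResource().Scalar["example.com/dev"]) // 2
}
```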