From ebd445eb8c84803166575e30b4525c487f0ae62d Mon Sep 17 00:00:00 2001
From: lichuqiang
Date: Thu, 2 Nov 2017 09:17:48 +0800
Subject: [PATCH 1/2] add admission handler for device resources allocation

---
 pkg/kubelet/cm/BUILD                          |   2 +
 pkg/kubelet/cm/container_manager.go           |  11 +-
 pkg/kubelet/cm/container_manager_linux.go     |  18 +--
 pkg/kubelet/cm/container_manager_stub.go      |   8 +-
 .../cm/container_manager_unsupported.go       |   8 +-
 pkg/kubelet/cm/deviceplugin/BUILD             |   2 +
 .../cm/deviceplugin/device_plugin_handler.go  | 110 +++++++++++++++---
 .../device_plugin_handler_stub.go             |   6 +-
 pkg/kubelet/kubelet.go                        |   2 +-
 pkg/kubelet/kubelet_pods.go                   |   2 +-
 pkg/kubelet/lifecycle/predicate.go            |  20 +++-
 .../pkg/scheduler/schedulercache/node_info.go |   5 +
 12 files changed, 157 insertions(+), 37 deletions(-)

diff --git a/pkg/kubelet/cm/BUILD b/pkg/kubelet/cm/BUILD
index a50202332fe..dafbacebca3 100644
--- a/pkg/kubelet/cm/BUILD
+++ b/pkg/kubelet/cm/BUILD
@@ -37,8 +37,10 @@ go_library(
         "//pkg/kubelet/cm/cpumanager:go_default_library",
         "//pkg/kubelet/container:go_default_library",
         "//pkg/kubelet/eviction/api:go_default_library",
+        "//pkg/kubelet/lifecycle:go_default_library",
         "//pkg/kubelet/status:go_default_library",
         "//pkg/util/mount:go_default_library",
+        "//plugin/pkg/scheduler/schedulercache:go_default_library",
         "//vendor/github.com/golang/glog:go_default_library",
         "//vendor/k8s.io/api/core/v1:go_default_library",
         "//vendor/k8s.io/apimachinery/pkg/types:go_default_library",
diff --git a/pkg/kubelet/cm/container_manager.go b/pkg/kubelet/cm/container_manager.go
index eb556ef001e..ce19634d3f2 100644
--- a/pkg/kubelet/cm/container_manager.go
+++ b/pkg/kubelet/cm/container_manager.go
@@ -26,7 +26,9 @@ import (
 	"k8s.io/kubernetes/pkg/kubelet/apis/kubeletconfig"
 	kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
 	evictionapi "k8s.io/kubernetes/pkg/kubelet/eviction/api"
+	"k8s.io/kubernetes/pkg/kubelet/lifecycle"
 	"k8s.io/kubernetes/pkg/kubelet/status"
+	"k8s.io/kubernetes/plugin/pkg/scheduler/schedulercache"
 
 	"fmt"
 	"strconv"
@@ -74,7 +76,14 @@ type ContainerManager interface {
 
 	// GetResources returns RunContainerOptions with devices, mounts, and env fields populated for
 	// extended resources required by container.
-	GetResources(pod *v1.Pod, container *v1.Container, activePods []*v1.Pod) (*kubecontainer.RunContainerOptions, error)
+	GetResources(pod *v1.Pod, container *v1.Container) (*kubecontainer.RunContainerOptions, error)
+
+	// UpdatePluginResources calls Allocate of device plugin handler for potential
+	// requests for device plugin resources, and returns an error if it fails.
+ // Otherwise, it updates allocatableResource in nodeInfo if necessary, + // to make sure it is at least equal to the pod's requested capacity for + // any registered device plugin resource + UpdatePluginResources(*schedulercache.NodeInfo, *lifecycle.PodAdmitAttributes) error InternalContainerLifecycle() InternalContainerLifecycle } diff --git a/pkg/kubelet/cm/container_manager_linux.go b/pkg/kubelet/cm/container_manager_linux.go index 0bc5a333936..22c4dbf1597 100644 --- a/pkg/kubelet/cm/container_manager_linux.go +++ b/pkg/kubelet/cm/container_manager_linux.go @@ -48,6 +48,7 @@ import ( "k8s.io/kubernetes/pkg/kubelet/cm/deviceplugin" cmutil "k8s.io/kubernetes/pkg/kubelet/cm/util" kubecontainer "k8s.io/kubernetes/pkg/kubelet/container" + "k8s.io/kubernetes/pkg/kubelet/lifecycle" "k8s.io/kubernetes/pkg/kubelet/qos" "k8s.io/kubernetes/pkg/kubelet/status" utilfile "k8s.io/kubernetes/pkg/util/file" @@ -56,6 +57,7 @@ import ( "k8s.io/kubernetes/pkg/util/procfs" utilsysctl "k8s.io/kubernetes/pkg/util/sysctl" utilversion "k8s.io/kubernetes/pkg/util/version" + "k8s.io/kubernetes/plugin/pkg/scheduler/schedulercache" ) const ( @@ -594,7 +596,7 @@ func (cm *containerManagerImpl) Start(node *v1.Node, }, time.Second, stopChan) // Starts device plugin manager. - if err := cm.devicePluginHandler.Start(); err != nil { + if err := cm.devicePluginHandler.Start(deviceplugin.ActivePodsFunc(activePods)); err != nil { return err } return nil @@ -615,14 +617,10 @@ func (cm *containerManagerImpl) setFsCapacity() error { } // TODO: move the GetResources logic to PodContainerManager. -func (cm *containerManagerImpl) GetResources(pod *v1.Pod, container *v1.Container, activePods []*v1.Pod) (*kubecontainer.RunContainerOptions, error) { +func (cm *containerManagerImpl) GetResources(pod *v1.Pod, container *v1.Container) (*kubecontainer.RunContainerOptions, error) { opts := &kubecontainer.RunContainerOptions{} - // Gets devices, mounts, and envs from device plugin handler. 
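// Editorial sketch, not part of this patch (receiver and variable names are illustrative):
// after this change devices are allocated once, at pod admission time, and GetResources only
// reads the state cached by that earlier Allocate call. Roughly:
//
//	// during pod admission (predicateAdmitHandler.Admit):
//	nodeInfo := schedulercache.NewNodeInfo(otherPods...)
//	nodeInfo.SetNode(node)
//	if err := cm.UpdatePluginResources(nodeInfo, &lifecycle.PodAdmitAttributes{Pod: pod}); err != nil {
//		// the pod is rejected at admission
//	}
//
//	// later, when starting a container (GenerateRunContainerOptions):
//	opts, err := cm.GetResources(pod, &pod.Spec.Containers[0]) // no Allocate call here any more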
- glog.V(3).Infof("Calling devicePluginHandler AllocateDevices") - err := cm.devicePluginHandler.Allocate(pod, container, activePods) - if err != nil { - return opts, err - } + // Allocate should already be called during predicateAdmitHandler.Admit(), + // just try to fetch device runtime information from cached state here devOpts := cm.devicePluginHandler.GetDeviceRunContainerOptions(pod, container) if devOpts == nil { return opts, nil @@ -633,6 +631,10 @@ func (cm *containerManagerImpl) GetResources(pod *v1.Pod, container *v1.Containe return opts, nil } +func (cm *containerManagerImpl) UpdatePluginResources(node *schedulercache.NodeInfo, attrs *lifecycle.PodAdmitAttributes) error { + return cm.devicePluginHandler.Allocate(node, attrs) +} + func (cm *containerManagerImpl) SystemCgroupsLimit() v1.ResourceList { cpuLimit := int64(0) diff --git a/pkg/kubelet/cm/container_manager_stub.go b/pkg/kubelet/cm/container_manager_stub.go index ee5d7c73bf6..beaf57a342e 100644 --- a/pkg/kubelet/cm/container_manager_stub.go +++ b/pkg/kubelet/cm/container_manager_stub.go @@ -23,7 +23,9 @@ import ( internalapi "k8s.io/kubernetes/pkg/kubelet/apis/cri" "k8s.io/kubernetes/pkg/kubelet/cm/cpumanager" kubecontainer "k8s.io/kubernetes/pkg/kubelet/container" + "k8s.io/kubernetes/pkg/kubelet/lifecycle" "k8s.io/kubernetes/pkg/kubelet/status" + "k8s.io/kubernetes/plugin/pkg/scheduler/schedulercache" ) type containerManagerStub struct{} @@ -71,10 +73,14 @@ func (cm *containerManagerStub) NewPodContainerManager() PodContainerManager { return &podContainerManagerStub{} } -func (cm *containerManagerStub) GetResources(pod *v1.Pod, container *v1.Container, activePods []*v1.Pod) (*kubecontainer.RunContainerOptions, error) { +func (cm *containerManagerStub) GetResources(pod *v1.Pod, container *v1.Container) (*kubecontainer.RunContainerOptions, error) { return &kubecontainer.RunContainerOptions{}, nil } +func (cm *containerManagerStub) UpdatePluginResources(*schedulercache.NodeInfo, *lifecycle.PodAdmitAttributes) error { + return nil +} + func (cm *containerManagerStub) InternalContainerLifecycle() InternalContainerLifecycle { return &internalContainerLifecycleImpl{cpumanager.NewFakeManager()} } diff --git a/pkg/kubelet/cm/container_manager_unsupported.go b/pkg/kubelet/cm/container_manager_unsupported.go index 0602bb5de97..f3937703686 100644 --- a/pkg/kubelet/cm/container_manager_unsupported.go +++ b/pkg/kubelet/cm/container_manager_unsupported.go @@ -27,8 +27,10 @@ import ( "k8s.io/kubernetes/pkg/kubelet/cadvisor" "k8s.io/kubernetes/pkg/kubelet/cm/cpumanager" kubecontainer "k8s.io/kubernetes/pkg/kubelet/container" + "k8s.io/kubernetes/pkg/kubelet/lifecycle" "k8s.io/kubernetes/pkg/kubelet/status" "k8s.io/kubernetes/pkg/util/mount" + "k8s.io/kubernetes/plugin/pkg/scheduler/schedulercache" ) type unsupportedContainerManager struct { @@ -76,10 +78,14 @@ func (cm *unsupportedContainerManager) NewPodContainerManager() PodContainerMana return &unsupportedPodContainerManager{} } -func (cm *unsupportedContainerManager) GetResources(pod *v1.Pod, container *v1.Container, activePods []*v1.Pod) (*kubecontainer.RunContainerOptions, error) { +func (cm *unsupportedContainerManager) GetResources(pod *v1.Pod, container *v1.Container) (*kubecontainer.RunContainerOptions, error) { return &kubecontainer.RunContainerOptions{}, nil } +func (cm *unsupportedContainerManager) UpdatePluginResources(*schedulercache.NodeInfo, *lifecycle.PodAdmitAttributes) error { + return nil +} + func (cm *unsupportedContainerManager) InternalContainerLifecycle() 
InternalContainerLifecycle { return &internalContainerLifecycleImpl{cpumanager.NewFakeManager()} } diff --git a/pkg/kubelet/cm/deviceplugin/BUILD b/pkg/kubelet/cm/deviceplugin/BUILD index 437ba6fcf5f..679945e1fe0 100644 --- a/pkg/kubelet/cm/deviceplugin/BUILD +++ b/pkg/kubelet/cm/deviceplugin/BUILD @@ -22,6 +22,8 @@ go_library( "//pkg/api/v1/helper:go_default_library", "//pkg/kubelet/apis/deviceplugin/v1alpha:go_default_library", "//pkg/kubelet/container:go_default_library", + "//pkg/kubelet/lifecycle:go_default_library", + "//plugin/pkg/scheduler/schedulercache:go_default_library", "//vendor/github.com/golang/glog:go_default_library", "//vendor/golang.org/x/net/context:go_default_library", "//vendor/google.golang.org/grpc:go_default_library", diff --git a/pkg/kubelet/cm/deviceplugin/device_plugin_handler.go b/pkg/kubelet/cm/deviceplugin/device_plugin_handler.go index 23d67c3fd6d..b95ec226101 100644 --- a/pkg/kubelet/cm/deviceplugin/device_plugin_handler.go +++ b/pkg/kubelet/cm/deviceplugin/device_plugin_handler.go @@ -29,19 +29,28 @@ import ( "k8s.io/apimachinery/pkg/api/resource" "k8s.io/apimachinery/pkg/util/sets" pluginapi "k8s.io/kubernetes/pkg/kubelet/apis/deviceplugin/v1alpha" + "k8s.io/kubernetes/pkg/kubelet/lifecycle" + "k8s.io/kubernetes/plugin/pkg/scheduler/schedulercache" ) +// ActivePodsFunc is a function that returns a list of pods to reconcile. +type ActivePodsFunc func() []*v1.Pod + // Handler defines the functions used to manage and access device plugin resources. type Handler interface { // Start starts device plugin registration service. - Start() error + Start(activePods ActivePodsFunc) error // Devices returns all of registered devices keyed by resourceName. Devices() map[string][]pluginapi.Device - // Allocate attempts to allocate all of required extended resources for - // the input container, issues an Allocate rpc request for each of such - // resources, processes their AllocateResponses, and updates the cached - // containerDevices on success. - Allocate(pod *v1.Pod, container *v1.Container, activePods []*v1.Pod) error + // Allocate scans through containers in the pod spec + // If it finds the container requires device plugin resource, it: + // 1. Checks whether it already has this information in its cached state. + // 2. If not, it calls Allocate and populate its cached state afterwards. + // 3. If there is no cached state and Allocate fails, it returns an error. + // 4. Otherwise, it updates allocatableResource in nodeInfo if necessary, + // to make sure it is at least equal to the pod's requested capacity for + // any registered device plugin resource + Allocate(node *schedulercache.NodeInfo, attrs *lifecycle.PodAdmitAttributes) error // GetDeviceRunContainerOptions checks whether we have cached containerDevices // for the passed-in and returns its DeviceRunContainerOptions // for the found one. An empty struct is returned in case no cached state is found. @@ -53,6 +62,10 @@ type HandlerImpl struct { // TODO: consider to change this to RWMutex. sync.Mutex devicePluginManager Manager + // activePods is a method for listing active pods on the node + // so the amount of pluginResources requested by existing pods + // could be counted when updating allocated devices + activePods ActivePodsFunc // devicePluginManagerMonitorCallback is used for testing only. devicePluginManagerMonitorCallback MonitorCallback // allDevices contains all of registered resourceNames and their exported device IDs. 
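// Editorial sketch, not part of this patch (the surrounding wiring is illustrative): with the
// new signatures the kubelet drives the handler roughly as follows; the checkpoint is now read
// inside Start, and Allocate is called from pod admission rather than from container start:
//
//	handler, err := deviceplugin.NewHandlerImpl(updateCapacityFunc)
//	...
//	err = handler.Start(deviceplugin.ActivePodsFunc(activePods))
//	...
//	err = handler.Allocate(nodeInfo, &lifecycle.PodAdmitAttributes{Pod: pod})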
@@ -103,16 +116,21 @@ func NewHandlerImpl(updateCapacityFunc func(v1.ResourceList)) (*HandlerImpl, err handler.devicePluginManager = mgr handler.devicePluginManagerMonitorCallback = deviceManagerMonitorCallback - // Loads in allocatedDevices information from disk. - err = handler.readCheckpoint() - if err != nil { - glog.Warningf("Continue after failing to read checkpoint file. Device allocation info may NOT be up-to-date. Err: %v", err) - } + return handler, nil } -// Start starts device plugin registration service. -func (h *HandlerImpl) Start() error { +// Start initializes podDevices and allocatedDevices information from checkpoint-ed state +// and starts device plugin registration service. +func (h *HandlerImpl) Start(activePods ActivePodsFunc) error { + h.activePods = activePods + + // Loads in allocatedDevices information from disk. + err := h.readCheckpoint() + if err != nil { + glog.Warningf("Continue after failing to read checkpoint file. Device allocation info may NOT be up-to-date. Err: %v", err) + } + return h.devicePluginManager.Start() } @@ -166,11 +184,11 @@ func (h *HandlerImpl) devicesToAllocate(podUID, contName, resource string, requi return devices, nil } -// Allocate attempts to allocate all of required extended resources for -// the input container, issues an Allocate rpc request for each of such -// resources, processes their AllocateResponses, and updates the cached -// containerDevices on success. -func (h *HandlerImpl) Allocate(pod *v1.Pod, container *v1.Container, activePods []*v1.Pod) error { +// allocateContainerResources attempts to allocate all of required device +// plugin resources for the input container, issues an Allocate rpc request +// for each new device resource requirement, processes their AllocateResponses, +// and updates the cached containerDevices on success. +func (h *HandlerImpl) allocateContainerResources(pod *v1.Pod, container *v1.Container) error { podUID := string(pod.UID) contName := container.Name allocatedDevicesUpdated := false @@ -184,7 +202,7 @@ func (h *HandlerImpl) Allocate(pod *v1.Pod, container *v1.Container, activePods // Updates allocatedDevices to garbage collect any stranded resources // before doing the device plugin allocation. if !allocatedDevicesUpdated { - h.updateAllocatedDevices(activePods) + h.updateAllocatedDevices(h.activePods()) allocatedDevicesUpdated = true } allocDevices, err := h.devicesToAllocate(podUID, contName, resource, needed) @@ -226,6 +244,60 @@ func (h *HandlerImpl) Allocate(pod *v1.Pod, container *v1.Container, activePods return h.writeCheckpoint() } +// Allocate attempts to allocate all of required device plugin resources, +// and update Allocatable resources in nodeInfo if necessary +func (h *HandlerImpl) Allocate(node *schedulercache.NodeInfo, attrs *lifecycle.PodAdmitAttributes) error { + pod := attrs.Pod + // TODO: Reuse devices between init containers and regular containers. 
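// Editorial sketch, not part of this patch (the resource name is hypothetical): a pod reaches
// this path by requesting an extended resource in a container spec, for example:
//
//	pod.Spec.Containers[0].Resources.Limits = v1.ResourceList{
//		"domain1.com/resource1": *resource.NewQuantity(2, resource.DecimalSI),
//	}
//
// Both init containers and regular containers are walked below, and a failure for any one of
// them fails the whole Allocate call.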
+ for _, container := range pod.Spec.InitContainers { + if err := h.allocateContainerResources(pod, &container); err != nil { + return err + } + } + for _, container := range pod.Spec.Containers { + if err := h.allocateContainerResources(pod, &container); err != nil { + return err + } + } + + // quick return if no pluginResources requested + if _, podRequireDevicePluginResource := h.podDevices[string(pod.UID)]; !podRequireDevicePluginResource { + return nil + } + + h.sanitizeNodeAllocatable(node) + + return nil +} + +// sanitizeNodeAllocatable scans through allocatedDevices in DevicePluginHandler +// and if necessary, updates allocatableResource in nodeInfo to at least equal to +// the allocated capacity. This allows pods that have already been scheduled on +// the node to pass GeneralPredicates admission checking even upon device plugin failure. +func (h *HandlerImpl) sanitizeNodeAllocatable(node *schedulercache.NodeInfo) { + var newAllocatableResource *schedulercache.Resource + allocatableResource := node.AllocatableResource() + if allocatableResource.ScalarResources == nil { + allocatableResource.ScalarResources = make(map[v1.ResourceName]int64) + } + for resource, devices := range h.allocatedDevices { + needed := devices.Len() + quant, ok := allocatableResource.ScalarResources[v1.ResourceName(resource)] + if ok && int(quant) >= needed { + continue + } + // Needs to update nodeInfo.AllocatableResource to make sure + // NodeInfo.allocatableResource at least equal to the capacity already allocated. + if newAllocatableResource == nil { + newAllocatableResource = allocatableResource.Clone() + } + newAllocatableResource.ScalarResources[v1.ResourceName(resource)] = int64(needed) + } + if newAllocatableResource != nil { + node.SetAllocatableResource(newAllocatableResource) + } +} + // GetDeviceRunContainerOptions checks whether we have cached containerDevices // for the passed-in and returns its DeviceRunContainerOptions // for the found one. An empty struct is returned in case no cached state is found. diff --git a/pkg/kubelet/cm/deviceplugin/device_plugin_handler_stub.go b/pkg/kubelet/cm/deviceplugin/device_plugin_handler_stub.go index c3735d486c9..eb723090467 100644 --- a/pkg/kubelet/cm/deviceplugin/device_plugin_handler_stub.go +++ b/pkg/kubelet/cm/deviceplugin/device_plugin_handler_stub.go @@ -19,6 +19,8 @@ package deviceplugin import ( "k8s.io/api/core/v1" pluginapi "k8s.io/kubernetes/pkg/kubelet/apis/deviceplugin/v1alpha" + "k8s.io/kubernetes/pkg/kubelet/lifecycle" + "k8s.io/kubernetes/plugin/pkg/scheduler/schedulercache" ) // HandlerStub provides a simple stub implementation for Handler. @@ -30,7 +32,7 @@ func NewHandlerStub() (*HandlerStub, error) { } // Start simply returns nil. -func (h *HandlerStub) Start() error { +func (h *HandlerStub) Start(activePods ActivePodsFunc) error { return nil } @@ -40,7 +42,7 @@ func (h *HandlerStub) Devices() map[string][]pluginapi.Device { } // Allocate simply returns nil. 
-func (h *HandlerStub) Allocate(pod *v1.Pod, container *v1.Container, activePods []*v1.Pod) error { +func (h *HandlerStub) Allocate(node *schedulercache.NodeInfo, attrs *lifecycle.PodAdmitAttributes) error { return nil } diff --git a/pkg/kubelet/kubelet.go b/pkg/kubelet/kubelet.go index 6662772c32e..1c973131b33 100644 --- a/pkg/kubelet/kubelet.go +++ b/pkg/kubelet/kubelet.go @@ -873,7 +873,7 @@ func NewMainKubelet(kubeCfg *kubeletconfiginternal.KubeletConfiguration, klet.AddPodSyncHandler(activeDeadlineHandler) criticalPodAdmissionHandler := preemption.NewCriticalPodAdmissionHandler(klet.GetActivePods, killPodNow(klet.podWorkers, kubeDeps.Recorder), kubeDeps.Recorder) - klet.admitHandlers.AddPodAdmitHandler(lifecycle.NewPredicateAdmitHandler(klet.getNodeAnyWay, criticalPodAdmissionHandler)) + klet.admitHandlers.AddPodAdmitHandler(lifecycle.NewPredicateAdmitHandler(klet.getNodeAnyWay, criticalPodAdmissionHandler, klet.containerManager.UpdatePluginResources)) // apply functional Option's for _, opt := range kubeDeps.Options { opt(klet) diff --git a/pkg/kubelet/kubelet_pods.go b/pkg/kubelet/kubelet_pods.go index dfdb0f384d3..cf4ae321ec4 100644 --- a/pkg/kubelet/kubelet_pods.go +++ b/pkg/kubelet/kubelet_pods.go @@ -383,7 +383,7 @@ func (kl *Kubelet) GetPodCgroupParent(pod *v1.Pod) string { // the container runtime to set parameters for launching a container. func (kl *Kubelet) GenerateRunContainerOptions(pod *v1.Pod, container *v1.Container, podIP string) (*kubecontainer.RunContainerOptions, bool, error) { useClusterFirstPolicy := false - opts, err := kl.containerManager.GetResources(pod, container, kl.GetActivePods()) + opts, err := kl.containerManager.GetResources(pod, container) if err != nil { return nil, false, err } diff --git a/pkg/kubelet/lifecycle/predicate.go b/pkg/kubelet/lifecycle/predicate.go index 75c7663dcf4..9b8ad4d3cc1 100644 --- a/pkg/kubelet/lifecycle/predicate.go +++ b/pkg/kubelet/lifecycle/predicate.go @@ -29,6 +29,8 @@ import ( type getNodeAnyWayFuncType func() (*v1.Node, error) +type pluginResourceUpdateFuncType func(*schedulercache.NodeInfo, *PodAdmitAttributes) error + // AdmissionFailureHandler is an interface which defines how to deal with a failure to admit a pod. // This allows for the graceful handling of pod admission failure. type AdmissionFailureHandler interface { @@ -36,15 +38,17 @@ type AdmissionFailureHandler interface { } type predicateAdmitHandler struct { - getNodeAnyWayFunc getNodeAnyWayFuncType - admissionFailureHandler AdmissionFailureHandler + getNodeAnyWayFunc getNodeAnyWayFuncType + pluginResourceUpdateFunc pluginResourceUpdateFuncType + admissionFailureHandler AdmissionFailureHandler } var _ PodAdmitHandler = &predicateAdmitHandler{} -func NewPredicateAdmitHandler(getNodeAnyWayFunc getNodeAnyWayFuncType, admissionFailureHandler AdmissionFailureHandler) *predicateAdmitHandler { +func NewPredicateAdmitHandler(getNodeAnyWayFunc getNodeAnyWayFuncType, admissionFailureHandler AdmissionFailureHandler, pluginResourceUpdateFunc pluginResourceUpdateFuncType) *predicateAdmitHandler { return &predicateAdmitHandler{ getNodeAnyWayFunc, + pluginResourceUpdateFunc, admissionFailureHandler, } } @@ -63,6 +67,16 @@ func (w *predicateAdmitHandler) Admit(attrs *PodAdmitAttributes) PodAdmitResult pods := attrs.OtherPods nodeInfo := schedulercache.NewNodeInfo(pods...) 
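// Editorial sketch, not part of this patch: callers of NewPredicateAdmitHandler now supply the
// plugin-resource update hook as the third argument, as the kubelet does elsewhere in this patch:
//
//	lifecycle.NewPredicateAdmitHandler(klet.getNodeAnyWay, criticalPodAdmissionHandler, klet.containerManager.UpdatePluginResources)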
nodeInfo.SetNode(node) + // ensure the node has enough plugin resources for that required in pods + if err = w.pluginResourceUpdateFunc(nodeInfo, attrs); err != nil { + message := fmt.Sprintf("Update plugin resources failed due to %v, which is unexpected.", err) + glog.Warningf("Failed to admit pod %v - %s", format.Pod(pod), message) + return PodAdmitResult{ + Admit: false, + Reason: "UnexpectedAdmissionError", + Message: message, + } + } fit, reasons, err := predicates.GeneralPredicates(pod, nil, nodeInfo) if err != nil { message := fmt.Sprintf("GeneralPredicates failed due to %v, which is unexpected.", err) diff --git a/plugin/pkg/scheduler/schedulercache/node_info.go b/plugin/pkg/scheduler/schedulercache/node_info.go index 7cbc4568069..da46247b9be 100644 --- a/plugin/pkg/scheduler/schedulercache/node_info.go +++ b/plugin/pkg/scheduler/schedulercache/node_info.go @@ -255,6 +255,11 @@ func (n *NodeInfo) AllocatableResource() Resource { return *n.allocatableResource } +// SetAllocatableResource sets the allocatableResource information of given node. +func (n *NodeInfo) SetAllocatableResource(allocatableResource *Resource) { + n.allocatableResource = allocatableResource +} + func (n *NodeInfo) Clone() *NodeInfo { clone := &NodeInfo{ node: n.node, From 06308963833f12e087f7db4ced5289e3c39f4159 Mon Sep 17 00:00:00 2001 From: lichuqiang Date: Thu, 2 Nov 2017 09:18:24 +0800 Subject: [PATCH 2/2] update unit test for plugin resources allocation reinforcement --- pkg/kubelet/BUILD | 1 + pkg/kubelet/cm/deviceplugin/BUILD | 2 + .../device_plugin_handler_test.go | 79 +++++++++++- pkg/kubelet/kubelet_test.go | 113 +++++++++++++++++- 4 files changed, 189 insertions(+), 6 deletions(-) diff --git a/pkg/kubelet/BUILD b/pkg/kubelet/BUILD index eb1dce4d8d8..8e9d8ea4e9e 100644 --- a/pkg/kubelet/BUILD +++ b/pkg/kubelet/BUILD @@ -209,6 +209,7 @@ go_test( "//pkg/volume/host_path:go_default_library", "//pkg/volume/testing:go_default_library", "//pkg/volume/util/volumehelper:go_default_library", + "//plugin/pkg/scheduler/schedulercache:go_default_library", "//vendor/github.com/google/cadvisor/info/v1:go_default_library", "//vendor/github.com/google/cadvisor/info/v2:go_default_library", "//vendor/github.com/stretchr/testify/assert:go_default_library", diff --git a/pkg/kubelet/cm/deviceplugin/BUILD b/pkg/kubelet/cm/deviceplugin/BUILD index 679945e1fe0..456325870db 100644 --- a/pkg/kubelet/cm/deviceplugin/BUILD +++ b/pkg/kubelet/cm/deviceplugin/BUILD @@ -57,6 +57,8 @@ go_test( library = ":go_default_library", deps = [ "//pkg/kubelet/apis/deviceplugin/v1alpha:go_default_library", + "//pkg/kubelet/lifecycle:go_default_library", + "//plugin/pkg/scheduler/schedulercache:go_default_library", "//vendor/github.com/stretchr/testify/assert:go_default_library", "//vendor/github.com/stretchr/testify/require:go_default_library", "//vendor/k8s.io/api/core/v1:go_default_library", diff --git a/pkg/kubelet/cm/deviceplugin/device_plugin_handler_test.go b/pkg/kubelet/cm/deviceplugin/device_plugin_handler_test.go index 17c01059593..252968c3806 100644 --- a/pkg/kubelet/cm/deviceplugin/device_plugin_handler_test.go +++ b/pkg/kubelet/cm/deviceplugin/device_plugin_handler_test.go @@ -30,6 +30,8 @@ import ( "k8s.io/apimachinery/pkg/util/sets" "k8s.io/apimachinery/pkg/util/uuid" pluginapi "k8s.io/kubernetes/pkg/kubelet/apis/deviceplugin/v1alpha" + "k8s.io/kubernetes/pkg/kubelet/lifecycle" + "k8s.io/kubernetes/plugin/pkg/scheduler/schedulercache" ) func TestUpdateCapacity(t *testing.T) { @@ -224,13 +226,24 @@ func TestCheckpoint(t *testing.T) 
{ as.True(reflect.DeepEqual(expectedAllocatedDevices, testHandler.allocatedDevices)) } +type activePodsStub struct { + activePods []*v1.Pod +} + +func (a *activePodsStub) getActivePods() []*v1.Pod { + return a.activePods +} + +func (a *activePodsStub) updateActivePods(newPods []*v1.Pod) { + a.activePods = newPods +} + func TestPodContainerDeviceAllocation(t *testing.T) { flag.Set("alsologtostderr", fmt.Sprintf("%t", true)) var logLevel string flag.StringVar(&logLevel, "logLevel", "4", "test") flag.Lookup("v").Value.Set(logLevel) - var activePods []*v1.Pod resourceName1 := "domain1.com/resource1" resourceQuantity1 := *resource.NewQuantity(int64(2), resource.DecimalSI) devID1 := "dev1" @@ -244,6 +257,16 @@ func TestPodContainerDeviceAllocation(t *testing.T) { as := assert.New(t) as.Nil(err) monitorCallback := func(resourceName string, added, updated, deleted []pluginapi.Device) {} + podsStub := activePodsStub{ + activePods: []*v1.Pod{}, + } + cachedNode := &v1.Node{ + Status: v1.NodeStatus{ + Allocatable: v1.ResourceList{}, + }, + } + nodeInfo := &schedulercache.NodeInfo{} + nodeInfo.SetNode(cachedNode) testHandler := &HandlerImpl{ devicePluginManager: m, @@ -251,6 +274,7 @@ func TestPodContainerDeviceAllocation(t *testing.T) { allDevices: make(map[string]sets.String), allocatedDevices: make(map[string]sets.String), podDevices: make(podDevices), + activePods: podsStub.getActivePods, } testHandler.allDevices[resourceName1] = sets.NewString() testHandler.allDevices[resourceName1].Insert(devID1) @@ -288,8 +312,8 @@ func TestPodContainerDeviceAllocation(t *testing.T) { }, } - activePods = append(activePods, pod) - err = testHandler.Allocate(pod, &pod.Spec.Containers[0], activePods) + podsStub.updateActivePods([]*v1.Pod{pod}) + err = testHandler.Allocate(nodeInfo, &lifecycle.PodAdmitAttributes{Pod: pod}) as.Nil(err) runContainerOpts := testHandler.GetDeviceRunContainerOptions(pod, &pod.Spec.Containers[0]) as.Equal(len(runContainerOpts.Devices), 3) @@ -315,7 +339,7 @@ func TestPodContainerDeviceAllocation(t *testing.T) { }, }, } - err = testHandler.Allocate(failPod, &failPod.Spec.Containers[0], activePods) + err = testHandler.Allocate(nodeInfo, &lifecycle.PodAdmitAttributes{Pod: failPod}) as.NotNil(err) runContainerOpts2 := testHandler.GetDeviceRunContainerOptions(failPod, &failPod.Spec.Containers[0]) as.Nil(runContainerOpts2) @@ -338,8 +362,53 @@ func TestPodContainerDeviceAllocation(t *testing.T) { }, }, } - err = testHandler.Allocate(newPod, &newPod.Spec.Containers[0], activePods) + err = testHandler.Allocate(nodeInfo, &lifecycle.PodAdmitAttributes{Pod: newPod}) as.Nil(err) runContainerOpts3 := testHandler.GetDeviceRunContainerOptions(newPod, &newPod.Spec.Containers[0]) as.Equal(1, len(runContainerOpts3.Envs)) } + +func TestSanitizeNodeAllocatable(t *testing.T) { + resourceName1 := "domain1.com/resource1" + devID1 := "dev1" + + resourceName2 := "domain2.com/resource2" + devID2 := "dev2" + + m, err := NewDevicePluginManagerTestStub() + as := assert.New(t) + as.Nil(err) + monitorCallback := func(resourceName string, added, updated, deleted []pluginapi.Device) {} + + testHandler := &HandlerImpl{ + devicePluginManager: m, + devicePluginManagerMonitorCallback: monitorCallback, + allDevices: make(map[string]sets.String), + allocatedDevices: make(map[string]sets.String), + podDevices: make(podDevices), + } + // require one of resource1 and one of resource2 + testHandler.allocatedDevices[resourceName1] = sets.NewString() + testHandler.allocatedDevices[resourceName1].Insert(devID1) + 
testHandler.allocatedDevices[resourceName2] = sets.NewString() + testHandler.allocatedDevices[resourceName2].Insert(devID2) + + cachedNode := &v1.Node{ + Status: v1.NodeStatus{ + Allocatable: v1.ResourceList{ + // has no resource1 and two of resource2 + v1.ResourceName(resourceName2): *resource.NewQuantity(int64(2), resource.DecimalSI), + }, + }, + } + nodeInfo := &schedulercache.NodeInfo{} + nodeInfo.SetNode(cachedNode) + + testHandler.sanitizeNodeAllocatable(nodeInfo) + + allocatableScalarResources := nodeInfo.AllocatableResource().ScalarResources + // allocatable in nodeInfo is less than needed, should update + as.Equal(1, int(allocatableScalarResources[v1.ResourceName(resourceName1)])) + // allocatable in nodeInfo is more than needed, should skip updating + as.Equal(2, int(allocatableScalarResources[v1.ResourceName(resourceName2)])) +} diff --git a/pkg/kubelet/kubelet_test.go b/pkg/kubelet/kubelet_test.go index 70e2dd6eb02..5019121d43a 100644 --- a/pkg/kubelet/kubelet_test.go +++ b/pkg/kubelet/kubelet_test.go @@ -72,6 +72,7 @@ import ( _ "k8s.io/kubernetes/pkg/volume/host_path" volumetest "k8s.io/kubernetes/pkg/volume/testing" "k8s.io/kubernetes/pkg/volume/util/volumehelper" + "k8s.io/kubernetes/plugin/pkg/scheduler/schedulercache" ) func init() { @@ -284,7 +285,7 @@ func newTestKubeletWithImageList( kubelet.evictionManager = evictionManager kubelet.admitHandlers.AddPodAdmitHandler(evictionAdmitHandler) // Add this as cleanup predicate pod admitter - kubelet.admitHandlers.AddPodAdmitHandler(lifecycle.NewPredicateAdmitHandler(kubelet.getNodeAnyWay, lifecycle.NewAdmissionFailureHandlerStub())) + kubelet.admitHandlers.AddPodAdmitHandler(lifecycle.NewPredicateAdmitHandler(kubelet.getNodeAnyWay, lifecycle.NewAdmissionFailureHandlerStub(), kubelet.containerManager.UpdatePluginResources)) plug := &volumetest.FakeVolumePlugin{PluginName: "fake", Host: nil} var prober volume.DynamicPluginProber = nil // TODO (#51147) inject mock @@ -573,6 +574,116 @@ func TestHandleMemExceeded(t *testing.T) { checkPodStatus(t, kl, fittingPod, v1.PodPending) } +// Tests that we handle result of interface UpdatePluginResources correctly +// by setting corresponding status in status map. +func TestHandlePluginResources(t *testing.T) { + testKubelet := newTestKubelet(t, false /* controllerAttachDetachEnabled */) + defer testKubelet.Cleanup() + testKubelet.chainMock() + kl := testKubelet.kubelet + + adjustedResource := v1.ResourceName("domain1.com/adjustedResource") + unadjustedResouce := v1.ResourceName("domain2.com/unadjustedResouce") + failedResource := v1.ResourceName("domain2.com/failedResource") + resourceQuantity1 := *resource.NewQuantity(int64(1), resource.DecimalSI) + resourceQuantity2 := *resource.NewQuantity(int64(2), resource.DecimalSI) + resourceQuantityInvalid := *resource.NewQuantity(int64(-1), resource.DecimalSI) + allowedPodQuantity := *resource.NewQuantity(int64(10), resource.DecimalSI) + nodes := []*v1.Node{ + {ObjectMeta: metav1.ObjectMeta{Name: testKubeletHostname}, + Status: v1.NodeStatus{Capacity: v1.ResourceList{}, Allocatable: v1.ResourceList{ + adjustedResource: resourceQuantity1, + unadjustedResouce: resourceQuantity1, + v1.ResourcePods: allowedPodQuantity, + }}}, + } + kl.nodeInfo = testNodeInfo{nodes: nodes} + + updatePluginResourcesFunc := func(node *schedulercache.NodeInfo, attrs *lifecycle.PodAdmitAttributes) error { + // Maps from resourceName to the value we use to set node.allocatableResource[resourceName]. 
+ // A resource with invalid value (< 0) causes the function to return an error + // to emulate resource Allocation failure. + // Resources not contained in this map will have their node.allocatableResource + // quantity unchanged. + updateResourceMap := map[v1.ResourceName]resource.Quantity{ + adjustedResource: resourceQuantity2, + failedResource: resourceQuantityInvalid, + } + pod := attrs.Pod + allocatableResource := node.AllocatableResource() + newAllocatableResource := allocatableResource.Clone() + for _, container := range pod.Spec.Containers { + for resource := range container.Resources.Requests { + newQuantity, exist := updateResourceMap[resource] + if !exist { + continue + } + if newQuantity.Value() < 0 { + return fmt.Errorf("Allocation failed") + } + newAllocatableResource.ScalarResources[resource] = newQuantity.Value() + } + } + node.SetAllocatableResource(newAllocatableResource) + return nil + } + + // add updatePluginResourcesFunc to admission handler, to test it's behavior. + kl.admitHandlers = lifecycle.PodAdmitHandlers{} + kl.admitHandlers.AddPodAdmitHandler(lifecycle.NewPredicateAdmitHandler(kl.getNodeAnyWay, lifecycle.NewAdmissionFailureHandlerStub(), updatePluginResourcesFunc)) + + // pod requiring adjustedResource can be successfully allocated because updatePluginResourcesFunc + // adjusts node.allocatableResource for this resource to a sufficient value. + fittingPodspec := v1.PodSpec{NodeName: string(kl.nodeName), + Containers: []v1.Container{{Resources: v1.ResourceRequirements{ + Limits: v1.ResourceList{ + adjustedResource: resourceQuantity2, + }, + Requests: v1.ResourceList{ + adjustedResource: resourceQuantity2, + }, + }}}, + } + // pod requiring unadjustedResouce with insufficient quantity will still fail PredicateAdmit. + exceededPodSpec := v1.PodSpec{NodeName: string(kl.nodeName), + Containers: []v1.Container{{Resources: v1.ResourceRequirements{ + Limits: v1.ResourceList{ + unadjustedResouce: resourceQuantity2, + }, + Requests: v1.ResourceList{ + unadjustedResouce: resourceQuantity2, + }, + }}}, + } + // pod requiring failedResource will fail with the resource failed to be allocated. + failedPodSpec := v1.PodSpec{NodeName: string(kl.nodeName), + Containers: []v1.Container{{Resources: v1.ResourceRequirements{ + Limits: v1.ResourceList{ + failedResource: resourceQuantity1, + }, + Requests: v1.ResourceList{ + failedResource: resourceQuantity1, + }, + }}}, + } + pods := []*v1.Pod{ + podWithUIDNameNsSpec("123", "fittingpod", "foo", fittingPodspec), + podWithUIDNameNsSpec("456", "exceededpod", "foo", exceededPodSpec), + podWithUIDNameNsSpec("789", "failedpod", "foo", failedPodSpec), + } + // The latter two pod should be rejected. + fittingPod := pods[0] + exceededPod := pods[1] + failedPod := pods[2] + + kl.HandlePodAdditions(pods) + + // Check pod status stored in the status map. + checkPodStatus(t, kl, fittingPod, v1.PodPending) + checkPodStatus(t, kl, exceededPod, v1.PodFailed) + checkPodStatus(t, kl, failedPod, v1.PodFailed) +} + // TODO(filipg): This test should be removed once StatusSyncer can do garbage collection without external signal. func TestPurgingObsoleteStatusMapEntries(t *testing.T) { testKubelet := newTestKubelet(t, false /* controllerAttachDetachEnabled */)
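// Editorial note, not part of this patch: a worked example of sanitizeNodeAllocatable, taken
// from TestSanitizeNodeAllocatable above. The handler has allocated one "domain1.com/resource1"
// and one "domain2.com/resource2", while the cached node advertises no resource1 and two of
// resource2. After the call, allocatable resource1 is raised to match what is already allocated
// and resource2 is left untouched:
//
//	testHandler.sanitizeNodeAllocatable(nodeInfo)
//	// nodeInfo.AllocatableResource().ScalarResources["domain1.com/resource1"] == 1 (raised)
//	// nodeInfo.AllocatableResource().ScalarResources["domain2.com/resource2"] == 2 (unchanged)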