mirror of https://github.com/k3s-io/kubernetes.git (synced 2025-08-03 09:22:44 +00:00)

Merging handler into manager API

commit b16bfc768d (parent db4134d03f)
@@ -128,7 +128,7 @@ type containerManagerImpl struct {
 	// Interface for QoS cgroup management
 	qosContainerManager QOSContainerManager
 	// Interface for exporting and allocating devices reported by device plugins.
-	devicePluginHandler deviceplugin.Handler
+	devicePluginManager deviceplugin.Manager
 	// Interface for CPU affinity management.
 	cpuManager cpumanager.Manager
 }
@@ -274,11 +274,11 @@ func NewContainerManager(mountUtil mount.Interface, cadvisorInterface cadvisor.I
 		}
 	}

-	glog.Infof("Creating device plugin handler: %t", devicePluginEnabled)
+	glog.Infof("Creating device plugin manager: %t", devicePluginEnabled)
 	if devicePluginEnabled {
-		cm.devicePluginHandler, err = deviceplugin.NewHandlerImpl(updateDeviceCapacityFunc)
+		cm.devicePluginManager, err = deviceplugin.NewManagerImpl(updateDeviceCapacityFunc)
 	} else {
-		cm.devicePluginHandler, err = deviceplugin.NewHandlerStub()
+		cm.devicePluginManager, err = deviceplugin.NewManagerStub()
 	}
 	if err != nil {
 		return nil, err
@@ -597,7 +597,7 @@ func (cm *containerManagerImpl) Start(node *v1.Node,
 	}, time.Second, stopChan)

 	// Starts device plugin manager.
-	if err := cm.devicePluginHandler.Start(deviceplugin.ActivePodsFunc(activePods)); err != nil {
+	if err := cm.devicePluginManager.Start(deviceplugin.ActivePodsFunc(activePods)); err != nil {
 		return err
 	}
 	return nil
@@ -622,7 +622,7 @@ func (cm *containerManagerImpl) GetResources(pod *v1.Pod, container *v1.Containe
 	opts := &kubecontainer.RunContainerOptions{}
 	// Allocate should already be called during predicateAdmitHandler.Admit(),
 	// just try to fetch device runtime information from cached state here
-	devOpts := cm.devicePluginHandler.GetDeviceRunContainerOptions(pod, container)
+	devOpts := cm.devicePluginManager.GetDeviceRunContainerOptions(pod, container)
 	if devOpts == nil {
 		return opts, nil
 	}
@@ -633,7 +633,7 @@ func (cm *containerManagerImpl) GetResources(pod *v1.Pod, container *v1.Containe
 	}

 func (cm *containerManagerImpl) UpdatePluginResources(node *schedulercache.NodeInfo, attrs *lifecycle.PodAdmitAttributes) error {
-	return cm.devicePluginHandler.Allocate(node, attrs)
+	return cm.devicePluginManager.Allocate(node, attrs)
 }

 func (cm *containerManagerImpl) SystemCgroupsLimit() v1.ResourceList {
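The hunks above wire the kubelet's container manager directly to the merged deviceplugin.Manager API instead of going through the old Handler indirection. The sketch below is illustrative only and mirrors the call sites visible in this diff; updateCapacity and activePods stand in for closures the kubelet supplies, and the deviceplugin import path is an assumption based on the package layout implied by these hunks.

package cmexample // hypothetical package, for illustration only

import (
    "k8s.io/api/core/v1"
    "k8s.io/kubernetes/pkg/kubelet/cm/deviceplugin" // assumed path
)

// wireDevicePlugins shows the lifecycle implied by the hunks above:
// construct the manager with a capacity-update callback, then start it
// with a way to list active pods. Allocate and GetDeviceRunContainerOptions
// are later called at pod admission and container start, respectively.
func wireDevicePlugins(enabled bool, updateCapacity func(v1.ResourceList), activePods deviceplugin.ActivePodsFunc) (deviceplugin.Manager, error) {
    var (
        mgr deviceplugin.Manager
        err error
    )
    if enabled {
        mgr, err = deviceplugin.NewManagerImpl(updateCapacity)
    } else {
        mgr, err = deviceplugin.NewManagerStub()
    }
    if err != nil {
        return nil, err
    }
    if err := mgr.Start(activePods); err != nil {
        return nil, err
    }
    return mgr, nil
}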
@@ -9,11 +9,10 @@ load(
 go_library(
     name = "go_default_library",
     srcs = [
-        "device_plugin_handler.go",
-        "device_plugin_handler_stub.go",
         "device_plugin_stub.go",
         "endpoint.go",
         "manager.go",
+        "manager_stub.go",
         "pod_devices.go",
         "types.go",
     ],
@@ -49,7 +48,6 @@ filegroup(
 go_test(
     name = "go_default_test",
     srcs = [
-        "device_plugin_handler_test.go",
         "endpoint_test.go",
         "manager_test.go",
     ],
@@ -1,369 +0,0 @@
(entire file removed by this commit; its former contents follow)

/*
Copyright 2017 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package deviceplugin

import (
    "encoding/json"
    "fmt"
    "io/ioutil"
    "os"
    "sync"

    "github.com/golang/glog"

    "k8s.io/api/core/v1"
    "k8s.io/apimachinery/pkg/api/resource"
    "k8s.io/apimachinery/pkg/util/sets"
    pluginapi "k8s.io/kubernetes/pkg/kubelet/apis/deviceplugin/v1alpha"
    "k8s.io/kubernetes/pkg/kubelet/lifecycle"
    "k8s.io/kubernetes/plugin/pkg/scheduler/schedulercache"
)

// ActivePodsFunc is a function that returns a list of pods to reconcile.
type ActivePodsFunc func() []*v1.Pod

// Handler defines the functions used to manage and access device plugin resources.
type Handler interface {
    // Start starts device plugin registration service.
    Start(activePods ActivePodsFunc) error
    // Devices returns all of registered devices keyed by resourceName.
    Devices() map[string][]pluginapi.Device
    // Allocate scans through containers in the pod spec
    // If it finds the container requires device plugin resource, it:
    // 1. Checks whether it already has this information in its cached state.
    // 2. If not, it calls Allocate and populate its cached state afterwards.
    // 3. If there is no cached state and Allocate fails, it returns an error.
    // 4. Otherwise, it updates allocatableResource in nodeInfo if necessary,
    // to make sure it is at least equal to the pod's requested capacity for
    // any registered device plugin resource
    Allocate(node *schedulercache.NodeInfo, attrs *lifecycle.PodAdmitAttributes) error
    // GetDeviceRunContainerOptions checks whether we have cached containerDevices
    // for the passed-in <pod, container> and returns its DeviceRunContainerOptions
    // for the found one. An empty struct is returned in case no cached state is found.
    GetDeviceRunContainerOptions(pod *v1.Pod, container *v1.Container) *DeviceRunContainerOptions
}

// HandlerImpl implements the actual functionality to manage device plugin resources.
type HandlerImpl struct {
    // TODO: consider to change this to RWMutex.
    sync.Mutex
    // devicePluginManager is an implementation of deviceplugin.Manager interface.
    devicePluginManager Manager
    // activePods is a method for listing active pods on the node
    // so the amount of pluginResources requested by existing pods
    // could be counted when updating allocated devices
    activePods ActivePodsFunc
    // devicePluginManagerMonitorCallback is used for updating devices' states in one time call.
    // e.g. a new device is advertised, two old devices are deleted and a running device fails.
    devicePluginManagerMonitorCallback MonitorCallback
    // allDevices contains all of registered resourceNames and their exported device IDs.
    allDevices map[string]sets.String
    // allocatedDevices contains allocated deviceIds, keyed by resourceName.
    allocatedDevices map[string]sets.String
    // podDevices contains pod to allocated device mapping.
    podDevices podDevices
}

// NewHandlerImpl creates a HandlerImpl to manage device plugin resources.
// updateCapacityFunc is called to update ContainerManager capacity when
// device capacity changes.
func NewHandlerImpl(updateCapacityFunc func(v1.ResourceList)) (*HandlerImpl, error) {
    glog.V(2).Infof("Creating Device Plugin Handler")
    handler := &HandlerImpl{
        allDevices:       make(map[string]sets.String),
        allocatedDevices: make(map[string]sets.String),
        podDevices:       make(podDevices),
    }

    deviceManagerMonitorCallback := func(resourceName string, added, updated, deleted []pluginapi.Device) {
        var capacity = v1.ResourceList{}
        kept := append(updated, added...)

        handler.Lock()
        defer handler.Unlock()

        if _, ok := handler.allDevices[resourceName]; !ok {
            handler.allDevices[resourceName] = sets.NewString()
        }
        // For now, Handler only keeps track of healthy devices.
        // We can revisit this later when the need comes to track unhealthy devices here.
        for _, dev := range kept {
            if dev.Health == pluginapi.Healthy {
                handler.allDevices[resourceName].Insert(dev.ID)
            } else {
                handler.allDevices[resourceName].Delete(dev.ID)
            }
        }
        for _, dev := range deleted {
            handler.allDevices[resourceName].Delete(dev.ID)
        }
        capacity[v1.ResourceName(resourceName)] = *resource.NewQuantity(int64(handler.allDevices[resourceName].Len()), resource.DecimalSI)
        updateCapacityFunc(capacity)
    }

    mgr, err := NewManagerImpl(pluginapi.KubeletSocket, deviceManagerMonitorCallback)
    if err != nil {
        return nil, fmt.Errorf("Failed to initialize device plugin manager: %+v", err)
    }

    handler.devicePluginManager = mgr
    handler.devicePluginManagerMonitorCallback = deviceManagerMonitorCallback

    return handler, nil
}

// Start initializes podDevices and allocatedDevices information from checkpoint-ed state
// and starts device plugin registration service.
func (h *HandlerImpl) Start(activePods ActivePodsFunc) error {
    h.activePods = activePods

    // Loads in allocatedDevices information from disk.
    err := h.readCheckpoint()
    if err != nil {
        glog.Warningf("Continue after failing to read checkpoint file. Device allocation info may NOT be up-to-date. Err: %v", err)
    }

    return h.devicePluginManager.Start()
}

// Devices returns all of registered devices keyed by resourceName.
func (h *HandlerImpl) Devices() map[string][]pluginapi.Device {
    return h.devicePluginManager.Devices()
}

// Returns list of device Ids we need to allocate with Allocate rpc call.
// Returns empty list in case we don't need to issue the Allocate rpc call.
func (h *HandlerImpl) devicesToAllocate(podUID, contName, resource string, required int) (sets.String, error) {
    h.Lock()
    defer h.Unlock()
    needed := required
    // Gets list of devices that have already been allocated.
    // This can happen if a container restarts for example.
    devices := h.podDevices.containerDevices(podUID, contName, resource)
    if devices != nil {
        glog.V(3).Infof("Found pre-allocated devices for resource %s container %q in Pod %q: %v", resource, contName, podUID, devices.List())
        needed = needed - devices.Len()
        // A pod's resource is not expected to change once admitted by the API server,
        // so just fail loudly here. We can revisit this part if this no longer holds.
        if needed != 0 {
            return nil, fmt.Errorf("pod %v container %v changed request for resource %v from %v to %v", podUID, contName, resource, devices.Len(), required)
        }
    }
    if needed == 0 {
        // No change, no work.
        return nil, nil
    }
    devices = sets.NewString()
    // Needs to allocate additional devices.
    if h.allocatedDevices[resource] == nil {
        h.allocatedDevices[resource] = sets.NewString()
    }
    // Gets Devices in use.
    devicesInUse := h.allocatedDevices[resource]
    // Gets a list of available devices.
    available := h.allDevices[resource].Difference(devicesInUse)
    if int(available.Len()) < needed {
        return nil, fmt.Errorf("requested number of devices unavailable for %s. Requested: %d, Available: %d", resource, needed, available.Len())
    }
    allocated := available.UnsortedList()[:needed]
    // Updates h.allocatedDevices with allocated devices to prevent them
    // from being allocated to other pods/containers, given that we are
    // not holding lock during the rpc call.
    for _, device := range allocated {
        h.allocatedDevices[resource].Insert(device)
        devices.Insert(device)
    }
    return devices, nil
}

// allocateContainerResources attempts to allocate all of required device
// plugin resources for the input container, issues an Allocate rpc request
// for each new device resource requirement, processes their AllocateResponses,
// and updates the cached containerDevices on success.
func (h *HandlerImpl) allocateContainerResources(pod *v1.Pod, container *v1.Container) error {
    podUID := string(pod.UID)
    contName := container.Name
    allocatedDevicesUpdated := false
    for k, v := range container.Resources.Limits {
        resource := string(k)
        needed := int(v.Value())
        glog.V(3).Infof("needs %d %s", needed, resource)
        if _, registeredResource := h.allDevices[resource]; !registeredResource {
            continue
        }
        // Updates allocatedDevices to garbage collect any stranded resources
        // before doing the device plugin allocation.
        if !allocatedDevicesUpdated {
            h.updateAllocatedDevices(h.activePods())
            allocatedDevicesUpdated = true
        }
        allocDevices, err := h.devicesToAllocate(podUID, contName, resource, needed)
        if err != nil {
            return err
        }
        if allocDevices == nil || len(allocDevices) <= 0 {
            continue
        }
        // devicePluginManager.Allocate involves RPC calls to device plugin, which
        // could be heavy-weight. Therefore we want to perform this operation outside
        // mutex lock. Note if Allcate call fails, we may leave container resources
        // partially allocated for the failed container. We rely on updateAllocatedDevices()
        // to garbage collect these resources later. Another side effect is that if
        // we have X resource A and Y resource B in total, and two containers, container1
        // and container2 both require X resource A and Y resource B. Both allocation
        // requests may fail if we serve them in mixed order.
        // TODO: may revisit this part later if we see inefficient resource allocation
        // in real use as the result of this. Should also consider to parallize device
        // plugin Allocate grpc calls if it becomes common that a container may require
        // resources from multiple device plugins.
        resp, err := h.devicePluginManager.Allocate(resource, allocDevices.UnsortedList())
        if err != nil {
            // In case of allocation failure, we want to restore h.allocatedDevices
            // to the actual allocated state from h.podDevices.
            h.Lock()
            h.allocatedDevices = h.podDevices.devices()
            h.Unlock()
            return err
        }

        // Update internal cached podDevices state.
        h.Lock()
        h.podDevices.insert(podUID, contName, resource, allocDevices, resp)
        h.Unlock()
    }

    // Checkpoints device to container allocation information.
    return h.writeCheckpoint()
}

// Allocate attempts to allocate all of required device plugin resources,
// and update Allocatable resources in nodeInfo if necessary
func (h *HandlerImpl) Allocate(node *schedulercache.NodeInfo, attrs *lifecycle.PodAdmitAttributes) error {
    pod := attrs.Pod
    // TODO: Reuse devices between init containers and regular containers.
    for _, container := range pod.Spec.InitContainers {
        if err := h.allocateContainerResources(pod, &container); err != nil {
            return err
        }
    }
    for _, container := range pod.Spec.Containers {
        if err := h.allocateContainerResources(pod, &container); err != nil {
            return err
        }
    }

    // quick return if no pluginResources requested
    if _, podRequireDevicePluginResource := h.podDevices[string(pod.UID)]; !podRequireDevicePluginResource {
        return nil
    }

    h.sanitizeNodeAllocatable(node)

    return nil
}

// sanitizeNodeAllocatable scans through allocatedDevices in DevicePluginHandler
// and if necessary, updates allocatableResource in nodeInfo to at least equal to
// the allocated capacity. This allows pods that have already been scheduled on
// the node to pass GeneralPredicates admission checking even upon device plugin failure.
func (h *HandlerImpl) sanitizeNodeAllocatable(node *schedulercache.NodeInfo) {
    var newAllocatableResource *schedulercache.Resource
    allocatableResource := node.AllocatableResource()
    if allocatableResource.ScalarResources == nil {
        allocatableResource.ScalarResources = make(map[v1.ResourceName]int64)
    }
    for resource, devices := range h.allocatedDevices {
        needed := devices.Len()
        quant, ok := allocatableResource.ScalarResources[v1.ResourceName(resource)]
        if ok && int(quant) >= needed {
            continue
        }
        // Needs to update nodeInfo.AllocatableResource to make sure
        // NodeInfo.allocatableResource at least equal to the capacity already allocated.
        if newAllocatableResource == nil {
            newAllocatableResource = allocatableResource.Clone()
        }
        newAllocatableResource.ScalarResources[v1.ResourceName(resource)] = int64(needed)
    }
    if newAllocatableResource != nil {
        node.SetAllocatableResource(newAllocatableResource)
    }
}

// GetDeviceRunContainerOptions checks whether we have cached containerDevices
// for the passed-in <pod, container> and returns its DeviceRunContainerOptions
// for the found one. An empty struct is returned in case no cached state is found.
func (h *HandlerImpl) GetDeviceRunContainerOptions(pod *v1.Pod, container *v1.Container) *DeviceRunContainerOptions {
    h.Lock()
    defer h.Unlock()
    return h.podDevices.deviceRunContainerOptions(string(pod.UID), container.Name)
}

// updateAllocatedDevices gets a list of active pods and then frees any Devices that are bound to
// terminated pods. Returns error on failure.
func (h *HandlerImpl) updateAllocatedDevices(activePods []*v1.Pod) {
    h.Lock()
    defer h.Unlock()
    activePodUids := sets.NewString()
    for _, pod := range activePods {
        activePodUids.Insert(string(pod.UID))
    }
    allocatedPodUids := h.podDevices.pods()
    podsToBeRemoved := allocatedPodUids.Difference(activePodUids)
    if len(podsToBeRemoved) <= 0 {
        return
    }
    glog.V(5).Infof("pods to be removed: %v", podsToBeRemoved.List())
    h.podDevices.delete(podsToBeRemoved.List())
    // Regenerated allocatedDevices after we update pod allocation information.
    h.allocatedDevices = h.podDevices.devices()
}

// Checkpoints device to container allocation information to disk.
func (h *HandlerImpl) writeCheckpoint() error {
    h.Lock()
    data := h.podDevices.toCheckpointData()
    h.Unlock()

    dataJSON, err := json.Marshal(data)
    if err != nil {
        return err
    }
    filepath := h.devicePluginManager.CheckpointFile()
    return ioutil.WriteFile(filepath, dataJSON, 0644)
}

// Reads device to container allocation information from disk, and populates
// h.allocatedDevices accordingly.
func (h *HandlerImpl) readCheckpoint() error {
    filepath := h.devicePluginManager.CheckpointFile()
    content, err := ioutil.ReadFile(filepath)
    if err != nil && !os.IsNotExist(err) {
        return fmt.Errorf("failed to read checkpoint file %q: %v", filepath, err)
    }
    glog.V(2).Infof("Read checkpoint file %s\n", filepath)
    var data checkpointData
    if err := json.Unmarshal(content, &data); err != nil {
        return fmt.Errorf("failed to unmarshal checkpoint data: %v", err)
    }

    h.Lock()
    defer h.Unlock()
    h.podDevices.fromCheckpointData(data)
    h.allocatedDevices = h.podDevices.devices()
    return nil
}
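The devicesToAllocate path in the removed file above (and carried over into the new manager) is set arithmetic over device IDs: take the registered devices for the resource, subtract the ones already allocated, then claim the first `needed` of the remainder and record them before the Allocate RPC goes out. The following is a standalone sketch of that accounting, using plain maps instead of k8s.io/apimachinery's sets.String; names are illustrative, not part of the commit.

package main

import "fmt"

// pickDevices claims `needed` device IDs that are registered but not yet
// allocated, mirroring the set-difference accounting in devicesToAllocate.
func pickDevices(all, allocated map[string]bool, needed int) ([]string, error) {
    var free []string
    for id := range all {
        if !allocated[id] {
            free = append(free, id)
        }
    }
    if len(free) < needed {
        return nil, fmt.Errorf("requested %d devices, only %d available", needed, len(free))
    }
    picked := free[:needed]
    for _, id := range picked {
        allocated[id] = true // reserve before the (slow) Allocate RPC is issued
    }
    return picked, nil
}

func main() {
    all := map[string]bool{"dev1": true, "dev2": true, "dev3": true}
    allocated := map[string]bool{"dev1": true}
    got, err := pickDevices(all, allocated, 2)
    fmt.Println(got, err) // two of dev2/dev3, nil error
}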
@@ -1,414 +0,0 @@
(entire file removed by this commit; its former contents follow)

/*
Copyright 2017 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package deviceplugin

import (
    "flag"
    "fmt"
    "reflect"
    "testing"

    "github.com/stretchr/testify/assert"

    "k8s.io/api/core/v1"
    "k8s.io/apimachinery/pkg/api/resource"
    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    "k8s.io/apimachinery/pkg/util/sets"
    "k8s.io/apimachinery/pkg/util/uuid"
    pluginapi "k8s.io/kubernetes/pkg/kubelet/apis/deviceplugin/v1alpha"
    "k8s.io/kubernetes/pkg/kubelet/lifecycle"
    "k8s.io/kubernetes/plugin/pkg/scheduler/schedulercache"
)

func TestUpdateCapacity(t *testing.T) {
    var expected = v1.ResourceList{}
    as := assert.New(t)
    verifyCapacityFunc := func(updates v1.ResourceList) {
        as.Equal(expected, updates)
    }
    testHandler, err := NewHandlerImpl(verifyCapacityFunc)
    as.NotNil(testHandler)
    as.Nil(err)

    devs := []pluginapi.Device{
        {ID: "Device1", Health: pluginapi.Healthy},
        {ID: "Device2", Health: pluginapi.Healthy},
        {ID: "Device3", Health: pluginapi.Unhealthy},
    }

    resourceName := "resource1"
    // Adds three devices for resource1, two healthy and one unhealthy.
    // Expects capacity for resource1 to be 2.
    expected[v1.ResourceName(resourceName)] = *resource.NewQuantity(int64(2), resource.DecimalSI)
    testHandler.devicePluginManagerMonitorCallback(resourceName, devs, []pluginapi.Device{}, []pluginapi.Device{})
    // Deletes an unhealthy device should NOT change capacity.
    testHandler.devicePluginManagerMonitorCallback(resourceName, []pluginapi.Device{}, []pluginapi.Device{}, []pluginapi.Device{devs[2]})
    // Updates a healthy device to unhealthy should reduce capacity by 1.
    expected[v1.ResourceName(resourceName)] = *resource.NewQuantity(int64(1), resource.DecimalSI)
    // Deletes a healthy device should reduce capacity by 1.
    expected[v1.ResourceName(resourceName)] = *resource.NewQuantity(int64(0), resource.DecimalSI)
    // Tests adding another resource.
    delete(expected, v1.ResourceName(resourceName))
    resourceName2 := "resource2"
    expected[v1.ResourceName(resourceName2)] = *resource.NewQuantity(int64(2), resource.DecimalSI)
    testHandler.devicePluginManagerMonitorCallback(resourceName2, devs, []pluginapi.Device{}, []pluginapi.Device{})
}

type stringPairType struct {
    value1 string
    value2 string
}

// DevicePluginManager stub to test device Allocation behavior.
type DevicePluginManagerTestStub struct {
    // All data structs are keyed by resourceName+DevId
    devRuntimeDevices map[string][]stringPairType
    devRuntimeMounts  map[string][]stringPairType
    devRuntimeEnvs    map[string][]stringPairType
}

func NewDevicePluginManagerTestStub() (*DevicePluginManagerTestStub, error) {
    return &DevicePluginManagerTestStub{
        devRuntimeDevices: make(map[string][]stringPairType),
        devRuntimeMounts:  make(map[string][]stringPairType),
        devRuntimeEnvs:    make(map[string][]stringPairType),
    }, nil
}

func (m *DevicePluginManagerTestStub) Start() error {
    return nil
}

func (m *DevicePluginManagerTestStub) Devices() map[string][]pluginapi.Device {
    return make(map[string][]pluginapi.Device)
}

func (m *DevicePluginManagerTestStub) Allocate(resourceName string, devIds []string) (*pluginapi.AllocateResponse, error) {
    resp := new(pluginapi.AllocateResponse)
    resp.Envs = make(map[string]string)
    for _, id := range devIds {
        key := resourceName + id
        fmt.Printf("Alloc device %v for resource %v\n", id, resourceName)
        for _, dev := range m.devRuntimeDevices[key] {
            fmt.Printf("Add dev %v %v\n", dev.value1, dev.value2)
            resp.Devices = append(resp.Devices, &pluginapi.DeviceSpec{
                ContainerPath: dev.value1,
                HostPath:      dev.value2,
                Permissions:   "mrw",
            })
        }
        for _, mount := range m.devRuntimeMounts[key] {
            fmt.Printf("Add mount %v %v\n", mount.value1, mount.value2)
            resp.Mounts = append(resp.Mounts, &pluginapi.Mount{
                ContainerPath: mount.value1,
                HostPath:      mount.value2,
                ReadOnly:      true,
            })
        }
        for _, env := range m.devRuntimeEnvs[key] {
            fmt.Printf("Add env %v %v\n", env.value1, env.value2)
            resp.Envs[env.value1] = env.value2
        }
    }
    return resp, nil
}

func (m *DevicePluginManagerTestStub) Stop() error {
    return nil
}

func (m *DevicePluginManagerTestStub) CheckpointFile() string {
    return "/tmp/device-plugin-checkpoint"
}

func constructDevices(devices []string) sets.String {
    ret := sets.NewString()
    for _, dev := range devices {
        ret.Insert(dev)
    }
    return ret
}

func constructAllocResp(devices, mounts, envs map[string]string) *pluginapi.AllocateResponse {
    resp := &pluginapi.AllocateResponse{}
    for k, v := range devices {
        resp.Devices = append(resp.Devices, &pluginapi.DeviceSpec{
            HostPath:      k,
            ContainerPath: v,
            Permissions:   "mrw",
        })
    }
    for k, v := range mounts {
        resp.Mounts = append(resp.Mounts, &pluginapi.Mount{
            ContainerPath: k,
            HostPath:      v,
            ReadOnly:      true,
        })
    }
    resp.Envs = make(map[string]string)
    for k, v := range envs {
        resp.Envs[k] = v
    }
    return resp
}

func TestCheckpoint(t *testing.T) {
    resourceName1 := "domain1.com/resource1"
    resourceName2 := "domain2.com/resource2"

    m, err := NewDevicePluginManagerTestStub()
    as := assert.New(t)
    as.Nil(err)

    testHandler := &HandlerImpl{
        devicePluginManager: m,
        allDevices:          make(map[string]sets.String),
        allocatedDevices:    make(map[string]sets.String),
        podDevices:          make(podDevices),
    }

    testHandler.podDevices.insert("pod1", "con1", resourceName1,
        constructDevices([]string{"dev1", "dev2"}),
        constructAllocResp(map[string]string{"/dev/r1dev1": "/dev/r1dev1", "/dev/r1dev2": "/dev/r1dev2"},
            map[string]string{"/home/r1lib1": "/usr/r1lib1"}, map[string]string{}))
    testHandler.podDevices.insert("pod1", "con1", resourceName2,
        constructDevices([]string{"dev1", "dev2"}),
        constructAllocResp(map[string]string{"/dev/r2dev1": "/dev/r2dev1", "/dev/r2dev2": "/dev/r2dev2"},
            map[string]string{"/home/r2lib1": "/usr/r2lib1"},
            map[string]string{"r2devices": "dev1 dev2"}))
    testHandler.podDevices.insert("pod1", "con2", resourceName1,
        constructDevices([]string{"dev3"}),
        constructAllocResp(map[string]string{"/dev/r1dev3": "/dev/r1dev3"},
            map[string]string{"/home/r1lib1": "/usr/r1lib1"}, map[string]string{}))
    testHandler.podDevices.insert("pod2", "con1", resourceName1,
        constructDevices([]string{"dev4"}),
        constructAllocResp(map[string]string{"/dev/r1dev4": "/dev/r1dev4"},
            map[string]string{"/home/r1lib1": "/usr/r1lib1"}, map[string]string{}))

    expectedPodDevices := testHandler.podDevices
    expectedAllocatedDevices := testHandler.podDevices.devices()

    err = testHandler.writeCheckpoint()
    as.Nil(err)
    testHandler.podDevices = make(podDevices)
    err = testHandler.readCheckpoint()
    as.Nil(err)

    as.Equal(len(expectedPodDevices), len(testHandler.podDevices))
    for podUID, containerDevices := range expectedPodDevices {
        for conName, resources := range containerDevices {
            for resource := range resources {
                as.True(reflect.DeepEqual(
                    expectedPodDevices.containerDevices(podUID, conName, resource),
                    testHandler.podDevices.containerDevices(podUID, conName, resource)))
                opts1 := expectedPodDevices.deviceRunContainerOptions(podUID, conName)
                opts2 := testHandler.podDevices.deviceRunContainerOptions(podUID, conName)
                as.Equal(len(opts1.Envs), len(opts2.Envs))
                as.Equal(len(opts1.Mounts), len(opts2.Mounts))
                as.Equal(len(opts1.Devices), len(opts2.Devices))
            }
        }
    }
    as.True(reflect.DeepEqual(expectedAllocatedDevices, testHandler.allocatedDevices))
}

type activePodsStub struct {
    activePods []*v1.Pod
}

func (a *activePodsStub) getActivePods() []*v1.Pod {
    return a.activePods
}

func (a *activePodsStub) updateActivePods(newPods []*v1.Pod) {
    a.activePods = newPods
}

func TestPodContainerDeviceAllocation(t *testing.T) {
    flag.Set("alsologtostderr", fmt.Sprintf("%t", true))
    var logLevel string
    flag.StringVar(&logLevel, "logLevel", "4", "test")
    flag.Lookup("v").Value.Set(logLevel)

    resourceName1 := "domain1.com/resource1"
    resourceQuantity1 := *resource.NewQuantity(int64(2), resource.DecimalSI)
    devID1 := "dev1"
    devID2 := "dev2"
    resourceName2 := "domain2.com/resource2"
    resourceQuantity2 := *resource.NewQuantity(int64(1), resource.DecimalSI)
    devID3 := "dev3"
    devID4 := "dev4"

    m, err := NewDevicePluginManagerTestStub()
    as := assert.New(t)
    as.Nil(err)
    monitorCallback := func(resourceName string, added, updated, deleted []pluginapi.Device) {}
    podsStub := activePodsStub{
        activePods: []*v1.Pod{},
    }
    cachedNode := &v1.Node{
        Status: v1.NodeStatus{
            Allocatable: v1.ResourceList{},
        },
    }
    nodeInfo := &schedulercache.NodeInfo{}
    nodeInfo.SetNode(cachedNode)

    testHandler := &HandlerImpl{
        devicePluginManager:                m,
        devicePluginManagerMonitorCallback: monitorCallback,
        allDevices:                         make(map[string]sets.String),
        allocatedDevices:                   make(map[string]sets.String),
        podDevices:                         make(podDevices),
        activePods:                         podsStub.getActivePods,
    }
    testHandler.allDevices[resourceName1] = sets.NewString()
    testHandler.allDevices[resourceName1].Insert(devID1)
    testHandler.allDevices[resourceName1].Insert(devID2)
    testHandler.allDevices[resourceName2] = sets.NewString()
    testHandler.allDevices[resourceName2].Insert(devID3)
    testHandler.allDevices[resourceName2].Insert(devID4)

    m.devRuntimeDevices[resourceName1+devID1] = append(m.devRuntimeDevices[resourceName1+devID1], stringPairType{"/dev/aaa", "/dev/aaa"})
    m.devRuntimeDevices[resourceName1+devID1] = append(m.devRuntimeDevices[resourceName1+devID1], stringPairType{"/dev/bbb", "/dev/bbb"})
    m.devRuntimeDevices[resourceName1+devID2] = append(m.devRuntimeDevices[resourceName1+devID2], stringPairType{"/dev/ccc", "/dev/ccc"})
    m.devRuntimeMounts[resourceName1+devID1] = append(m.devRuntimeMounts[resourceName1+devID1], stringPairType{"/container_dir1/file1", "host_dir1/file1"})
    m.devRuntimeMounts[resourceName1+devID2] = append(m.devRuntimeMounts[resourceName1+devID2], stringPairType{"/container_dir1/file2", "host_dir1/file2"})
    m.devRuntimeEnvs[resourceName1+devID2] = append(m.devRuntimeEnvs[resourceName1+devID2], stringPairType{"key1", "val1"})
    m.devRuntimeEnvs[resourceName2+devID3] = append(m.devRuntimeEnvs[resourceName2+devID3], stringPairType{"key2", "val2"})
    m.devRuntimeEnvs[resourceName2+devID4] = append(m.devRuntimeEnvs[resourceName2+devID4], stringPairType{"key2", "val3"})

    pod := &v1.Pod{
        ObjectMeta: metav1.ObjectMeta{
            UID: uuid.NewUUID(),
        },
        Spec: v1.PodSpec{
            Containers: []v1.Container{
                {
                    Name: string(uuid.NewUUID()),
                    Resources: v1.ResourceRequirements{
                        Limits: v1.ResourceList{
                            v1.ResourceName(resourceName1): resourceQuantity1,
                            v1.ResourceName("cpu"):         resourceQuantity1,
                            v1.ResourceName(resourceName2): resourceQuantity2,
                        },
                    },
                },
            },
        },
    }

    podsStub.updateActivePods([]*v1.Pod{pod})
    err = testHandler.Allocate(nodeInfo, &lifecycle.PodAdmitAttributes{Pod: pod})
    as.Nil(err)
    runContainerOpts := testHandler.GetDeviceRunContainerOptions(pod, &pod.Spec.Containers[0])
    as.Equal(len(runContainerOpts.Devices), 3)
    as.Equal(len(runContainerOpts.Mounts), 2)
    as.Equal(len(runContainerOpts.Envs), 2)

    // Requesting to create a pod without enough resources should fail.
    as.Equal(2, testHandler.allocatedDevices[resourceName1].Len())
    failPod := &v1.Pod{
        ObjectMeta: metav1.ObjectMeta{
            UID: uuid.NewUUID(),
        },
        Spec: v1.PodSpec{
            Containers: []v1.Container{
                {
                    Name: string(uuid.NewUUID()),
                    Resources: v1.ResourceRequirements{
                        Limits: v1.ResourceList{
                            v1.ResourceName(resourceName1): resourceQuantity2,
                        },
                    },
                },
            },
        },
    }
    err = testHandler.Allocate(nodeInfo, &lifecycle.PodAdmitAttributes{Pod: failPod})
    as.NotNil(err)
    runContainerOpts2 := testHandler.GetDeviceRunContainerOptions(failPod, &failPod.Spec.Containers[0])
    as.Nil(runContainerOpts2)

    // Requesting to create a new pod with a single resourceName2 should succeed.
    newPod := &v1.Pod{
        ObjectMeta: metav1.ObjectMeta{
            UID: uuid.NewUUID(),
        },
        Spec: v1.PodSpec{
            Containers: []v1.Container{
                {
                    Name: string(uuid.NewUUID()),
                    Resources: v1.ResourceRequirements{
                        Limits: v1.ResourceList{
                            v1.ResourceName(resourceName2): resourceQuantity2,
                        },
                    },
                },
            },
        },
    }
    err = testHandler.Allocate(nodeInfo, &lifecycle.PodAdmitAttributes{Pod: newPod})
    as.Nil(err)
    runContainerOpts3 := testHandler.GetDeviceRunContainerOptions(newPod, &newPod.Spec.Containers[0])
    as.Equal(1, len(runContainerOpts3.Envs))
}

func TestSanitizeNodeAllocatable(t *testing.T) {
    resourceName1 := "domain1.com/resource1"
    devID1 := "dev1"

    resourceName2 := "domain2.com/resource2"
    devID2 := "dev2"

    m, err := NewDevicePluginManagerTestStub()
    as := assert.New(t)
    as.Nil(err)
    monitorCallback := func(resourceName string, added, updated, deleted []pluginapi.Device) {}

    testHandler := &HandlerImpl{
        devicePluginManager:                m,
        devicePluginManagerMonitorCallback: monitorCallback,
        allDevices:                         make(map[string]sets.String),
        allocatedDevices:                   make(map[string]sets.String),
        podDevices:                         make(podDevices),
    }
    // require one of resource1 and one of resource2
    testHandler.allocatedDevices[resourceName1] = sets.NewString()
    testHandler.allocatedDevices[resourceName1].Insert(devID1)
    testHandler.allocatedDevices[resourceName2] = sets.NewString()
    testHandler.allocatedDevices[resourceName2].Insert(devID2)

    cachedNode := &v1.Node{
        Status: v1.NodeStatus{
            Allocatable: v1.ResourceList{
                // has no resource1 and two of resource2
                v1.ResourceName(resourceName2): *resource.NewQuantity(int64(2), resource.DecimalSI),
            },
        },
    }
    nodeInfo := &schedulercache.NodeInfo{}
    nodeInfo.SetNode(cachedNode)

    testHandler.sanitizeNodeAllocatable(nodeInfo)

    allocatableScalarResources := nodeInfo.AllocatableResource().ScalarResources
    // allocatable in nodeInfo is less than needed, should update
    as.Equal(1, int(allocatableScalarResources[v1.ResourceName(resourceName1)]))
    // allocatable in nodeInfo is more than needed, should skip updating
    as.Equal(2, int(allocatableScalarResources[v1.ResourceName(resourceName2)]))
}
@@ -32,7 +32,15 @@ import (
 // endpoint maps to a single registered device plugin. It is responsible
 // for managing gRPC communications with the device plugin and caching
 // device states reported by the device plugin.
-type endpoint struct {
+type endpoint interface {
+	run()
+	stop()
+	allocate(devs []string) (*pluginapi.AllocateResponse, error)
+	getDevices() []pluginapi.Device
+	callback(resourceName string, added, updated, deleted []pluginapi.Device)
+}
+
+type endpointImpl struct {
 	client     pluginapi.DevicePluginClient
 	clientConn *grpc.ClientConn

@@ -42,18 +50,18 @@ type endpoint struct {
 	devices map[string]pluginapi.Device
 	mutex   sync.Mutex

-	callback MonitorCallback
+	cb monitorCallback
 }

 // newEndpoint creates a new endpoint for the given resourceName.
-func newEndpoint(socketPath, resourceName string, devices map[string]pluginapi.Device, callback MonitorCallback) (*endpoint, error) {
+func newEndpointImpl(socketPath, resourceName string, devices map[string]pluginapi.Device, callback monitorCallback) (*endpointImpl, error) {
 	client, c, err := dial(socketPath)
 	if err != nil {
 		glog.Errorf("Can't create new endpoint with path %s err %v", socketPath, err)
 		return nil, err
 	}

-	return &endpoint{
+	return &endpointImpl{
 		client:     client,
 		clientConn: c,

@@ -61,11 +69,15 @@ func newEndpoint(socketPath, resourceName string, devices map[string]pluginapi.D
 		resourceName: resourceName,

 		devices: devices,
-		callback: callback,
+		cb: callback,
 	}, nil
 }

-func (e *endpoint) getDevices() []pluginapi.Device {
+func (e *endpointImpl) callback(resourceName string, added, updated, deleted []pluginapi.Device) {
+	e.cb(resourceName, added, updated, deleted)
+}
+
+func (e *endpointImpl) getDevices() []pluginapi.Device {
 	e.mutex.Lock()
 	defer e.mutex.Unlock()
 	var devs []pluginapi.Device
@@ -81,11 +93,9 @@ func (e *endpoint) getDevices() []pluginapi.Device {
 // blocks on receiving ListAndWatch gRPC stream updates. Each ListAndWatch
 // stream update contains a new list of device states. listAndWatch compares the new
 // device states with its cached states to get list of new, updated, and deleted devices.
-// It then issues a callback to pass this information to the device_plugin_handler which
+// It then issues a callback to pass this information to the device manager which
 // will adjust the resource available information accordingly.
-func (e *endpoint) run() {
-	glog.V(3).Infof("Starting ListAndWatch")
-
+func (e *endpointImpl) run() {
 	stream, err := e.client.ListAndWatch(context.Background(), &pluginapi.Empty{})
 	if err != nil {
 		glog.Errorf(errListAndWatch, e.resourceName, err)
@@ -162,13 +172,13 @@ func (e *endpoint) run() {
 }

 // allocate issues Allocate gRPC call to the device plugin.
-func (e *endpoint) allocate(devs []string) (*pluginapi.AllocateResponse, error) {
+func (e *endpointImpl) allocate(devs []string) (*pluginapi.AllocateResponse, error) {
 	return e.client.Allocate(context.Background(), &pluginapi.AllocateRequest{
 		DevicesIDs: devs,
 	})
 }

-func (e *endpoint) stop() {
+func (e *endpointImpl) stop() {
 	e.clientConn.Close()
 }

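Because endpoint is now an unexported interface (run, stop, allocate, getDevices, callback), the manager can be exercised against a fake that lives in the same package. The sketch below is not part of this commit; it only shows a type that would satisfy the method set introduced in the hunk above, with names chosen for illustration.

// fakeEndpoint is an illustrative stand-in that satisfies the endpoint
// interface introduced above; it would live inside package deviceplugin.
type fakeEndpoint struct {
    devs []pluginapi.Device
    cb   monitorCallback
}

func (f *fakeEndpoint) run()  {}
func (f *fakeEndpoint) stop() {}

func (f *fakeEndpoint) allocate(devs []string) (*pluginapi.AllocateResponse, error) {
    // Return an empty response; a real fake could echo the requested IDs.
    return &pluginapi.AllocateResponse{}, nil
}

func (f *fakeEndpoint) getDevices() []pluginapi.Device {
    return f.devs
}

func (f *fakeEndpoint) callback(resourceName string, added, updated, deleted []pluginapi.Device) {
    if f.cb != nil {
        f.cb(resourceName, added, updated, deleted)
    }
}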
@@ -87,7 +87,7 @@ func TestRun(t *testing.T) {
 }

 func TestGetDevices(t *testing.T) {
-	e := endpoint{
+	e := endpointImpl{
 		devices: map[string]pluginapi.Device{
 			"ADeviceId": {ID: "ADeviceId", Health: pluginapi.Healthy},
 		},
@@ -96,19 +96,19 @@ func TestGetDevices(t *testing.T) {
 	require.Len(t, devs, 1)
 }

-func esetup(t *testing.T, devs []*pluginapi.Device, socket, resourceName string, callback MonitorCallback) (*Stub, *endpoint) {
+func esetup(t *testing.T, devs []*pluginapi.Device, socket, resourceName string, callback monitorCallback) (*Stub, *endpointImpl) {
 	p := NewDevicePluginStub(devs, socket)

 	err := p.Start()
 	require.NoError(t, err)

-	e, err := newEndpoint(socket, "mock", make(map[string]pluginapi.Device), func(n string, a, u, r []pluginapi.Device) {})
+	e, err := newEndpointImpl(socket, "mock", make(map[string]pluginapi.Device), func(n string, a, u, r []pluginapi.Device) {})
 	require.NoError(t, err)

 	return p, e
 }

-func ecleanup(t *testing.T, p *Stub, e *endpoint) {
+func ecleanup(t *testing.T, p *Stub, e *endpointImpl) {
 	p.Stop()
 	e.stop()
 }
@ -17,7 +17,9 @@ limitations under the License.
|
|||||||
package deviceplugin
|
package deviceplugin
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"encoding/json"
|
||||||
"fmt"
|
"fmt"
|
||||||
|
"io/ioutil"
|
||||||
"net"
|
"net"
|
||||||
"os"
|
"os"
|
||||||
"path/filepath"
|
"path/filepath"
|
||||||
@ -28,27 +30,58 @@ import (
|
|||||||
"google.golang.org/grpc"
|
"google.golang.org/grpc"
|
||||||
|
|
||||||
"k8s.io/api/core/v1"
|
"k8s.io/api/core/v1"
|
||||||
|
"k8s.io/apimachinery/pkg/api/resource"
|
||||||
|
"k8s.io/apimachinery/pkg/util/sets"
|
||||||
v1helper "k8s.io/kubernetes/pkg/apis/core/v1/helper"
|
v1helper "k8s.io/kubernetes/pkg/apis/core/v1/helper"
|
||||||
pluginapi "k8s.io/kubernetes/pkg/kubelet/apis/deviceplugin/v1alpha"
|
pluginapi "k8s.io/kubernetes/pkg/kubelet/apis/deviceplugin/v1alpha"
|
||||||
|
"k8s.io/kubernetes/pkg/kubelet/lifecycle"
|
||||||
|
"k8s.io/kubernetes/plugin/pkg/scheduler/schedulercache"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
// ActivePodsFunc is a function that returns a list of pods to reconcile.
|
||||||
|
type ActivePodsFunc func() []*v1.Pod
|
||||||
|
|
||||||
|
// monitorCallback is the function called when a device's health state changes,
|
||||||
|
// or new devices are reported, or old devices are deleted.
|
||||||
|
// Updated contains the most recent state of the Device.
|
||||||
|
type monitorCallback func(resourceName string, added, updated, deleted []pluginapi.Device)
|
||||||
|
|
||||||
// ManagerImpl is the structure in charge of managing Device Plugins.
|
// ManagerImpl is the structure in charge of managing Device Plugins.
|
||||||
type ManagerImpl struct {
|
type ManagerImpl struct {
|
||||||
socketname string
|
socketname string
|
||||||
socketdir string
|
socketdir string
|
||||||
|
|
||||||
endpoints map[string]*endpoint // Key is ResourceName
|
endpoints map[string]endpoint // Key is ResourceName
|
||||||
mutex sync.Mutex
|
mutex sync.Mutex
|
||||||
|
|
||||||
callback MonitorCallback
|
|
||||||
|
|
||||||
server *grpc.Server
|
server *grpc.Server
|
||||||
|
|
||||||
|
// activePods is a method for listing active pods on the node
|
||||||
|
// so the amount of pluginResources requested by existing pods
|
||||||
|
// could be counted when updating allocated devices
|
||||||
|
activePods ActivePodsFunc
|
||||||
|
|
||||||
|
// callback is used for updating devices' states in one time call.
|
||||||
|
// e.g. a new device is advertised, two old devices are deleted and a running device fails.
|
||||||
|
callback monitorCallback
|
||||||
|
|
||||||
|
// allDevices contains all of registered resourceNames and their exported device IDs.
|
||||||
|
allDevices map[string]sets.String
|
||||||
|
|
||||||
|
// allocatedDevices contains allocated deviceIds, keyed by resourceName.
|
||||||
|
allocatedDevices map[string]sets.String
|
||||||
|
|
||||||
|
// podDevices contains pod to allocated device mapping.
|
||||||
|
podDevices podDevices
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewManagerImpl creates a new manager on the socket `socketPath`.
|
// NewManagerImpl creates a new manager. updateCapacityFunc is called to
|
||||||
// f is the callback that is called when a device becomes unhealthy.
|
// update ContainerManager capacity when device capacity changes.
|
||||||
// socketPath is present for testing purposes in production this is pluginapi.KubeletSocket
|
func NewManagerImpl(updateCapacityFunc func(v1.ResourceList)) (*ManagerImpl, error) {
|
||||||
func NewManagerImpl(socketPath string, f MonitorCallback) (*ManagerImpl, error) {
|
return newManagerImpl(updateCapacityFunc, pluginapi.KubeletSocket)
|
||||||
|
}
|
||||||
|
|
||||||
|
func newManagerImpl(updateCapacityFunc func(v1.ResourceList), socketPath string) (*ManagerImpl, error) {
|
||||||
glog.V(2).Infof("Creating Device Plugin manager at %s", socketPath)
|
glog.V(2).Infof("Creating Device Plugin manager at %s", socketPath)
|
||||||
|
|
||||||
if socketPath == "" || !filepath.IsAbs(socketPath) {
|
if socketPath == "" || !filepath.IsAbs(socketPath) {
|
||||||
@ -56,13 +89,42 @@ func NewManagerImpl(socketPath string, f MonitorCallback) (*ManagerImpl, error)
|
|||||||
}
|
}
|
||||||
|
|
||||||
dir, file := filepath.Split(socketPath)
|
dir, file := filepath.Split(socketPath)
|
||||||
return &ManagerImpl{
|
manager := &ManagerImpl{
|
||||||
endpoints: make(map[string]*endpoint),
|
endpoints: make(map[string]endpoint),
|
||||||
|
|
||||||
socketname: file,
|
socketname: file,
|
||||||
socketdir: dir,
|
socketdir: dir,
|
||||||
callback: f,
|
allDevices: make(map[string]sets.String),
|
||||||
}, nil
|
allocatedDevices: make(map[string]sets.String),
|
||||||
|
podDevices: make(podDevices),
|
||||||
|
}
|
||||||
|
|
||||||
|
manager.callback = func(resourceName string, added, updated, deleted []pluginapi.Device) {
|
||||||
|
var capacity = v1.ResourceList{}
|
||||||
|
kept := append(updated, added...)
|
||||||
|
|
||||||
|
manager.mutex.Lock()
|
||||||
|
defer manager.mutex.Unlock()
|
||||||
|
|
||||||
|
if _, ok := manager.allDevices[resourceName]; !ok {
|
||||||
|
manager.allDevices[resourceName] = sets.NewString()
|
||||||
|
}
|
||||||
|
// For now, Manager only keeps track of healthy devices.
|
||||||
|
// We can revisit this later when the need comes to track unhealthy devices here.
|
||||||
|
for _, dev := range kept {
|
||||||
|
if dev.Health == pluginapi.Healthy {
|
||||||
|
manager.allDevices[resourceName].Insert(dev.ID)
|
||||||
|
} else {
|
||||||
|
manager.allDevices[resourceName].Delete(dev.ID)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
for _, dev := range deleted {
|
||||||
|
manager.allDevices[resourceName].Delete(dev.ID)
|
||||||
|
}
|
||||||
|
capacity[v1.ResourceName(resourceName)] = *resource.NewQuantity(int64(manager.allDevices[resourceName].Len()), resource.DecimalSI)
|
||||||
|
updateCapacityFunc(capacity)
|
||||||
|
}
|
||||||
|
|
||||||
|
return manager, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (m *ManagerImpl) removeContents(dir string) error {
|
func (m *ManagerImpl) removeContents(dir string) error {
|
||||||
@ -77,7 +139,7 @@ func (m *ManagerImpl) removeContents(dir string) error {
|
|||||||
}
|
}
|
||||||
for _, name := range names {
|
for _, name := range names {
|
||||||
filePath := filepath.Join(dir, name)
|
filePath := filepath.Join(dir, name)
|
||||||
if filePath == m.CheckpointFile() {
|
if filePath == m.checkpointFile() {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
stat, err := os.Stat(filePath)
|
stat, err := os.Stat(filePath)
|
||||||
@ -101,15 +163,25 @@ const (
|
|||||||
kubeletDevicePluginCheckpoint = "kubelet_internal_checkpoint"
|
kubeletDevicePluginCheckpoint = "kubelet_internal_checkpoint"
|
||||||
)
|
)
|
||||||
|
|
||||||
// CheckpointFile returns device plugin checkpoint file path.
|
// checkpointFile returns device plugin checkpoint file path.
|
||||||
func (m *ManagerImpl) CheckpointFile() string {
|
func (m *ManagerImpl) checkpointFile() string {
|
||||||
return filepath.Join(m.socketdir, kubeletDevicePluginCheckpoint)
|
return filepath.Join(m.socketdir, kubeletDevicePluginCheckpoint)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Start starts the Device Plugin Manager
|
// Start starts the Device Plugin Manager amd start initialization of
|
||||||
func (m *ManagerImpl) Start() error {
|
// podDevices and allocatedDevices information from checkpoint-ed state and
|
||||||
|
// starts device plugin registration service.
|
||||||
|
func (m *ManagerImpl) Start(activePods ActivePodsFunc) error {
|
||||||
glog.V(2).Infof("Starting Device Plugin manager")
|
glog.V(2).Infof("Starting Device Plugin manager")
|
||||||
|
|
||||||
|
m.activePods = activePods
|
||||||
|
|
||||||
|
// Loads in allocatedDevices information from disk.
|
||||||
|
err := m.readCheckpoint()
|
||||||
|
if err != nil {
|
||||||
|
glog.Warningf("Continue after failing to read checkpoint file. Device allocation info may NOT be up-to-date. Err: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
socketPath := filepath.Join(m.socketdir, m.socketname)
|
socketPath := filepath.Join(m.socketdir, m.socketname)
|
||||||
os.MkdirAll(m.socketdir, 0755)
|
os.MkdirAll(m.socketdir, 0755)
|
||||||
|
|
||||||
@ -130,6 +202,8 @@ func (m *ManagerImpl) Start() error {
|
|||||||
pluginapi.RegisterRegistrationServer(m.server, m)
|
pluginapi.RegisterRegistrationServer(m.server, m)
|
||||||
go m.server.Serve(s)
|
go m.server.Serve(s)
|
||||||
|
|
||||||
|
glog.V(2).Infof("Serving device plugin registration server on %q", socketPath)
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -150,22 +224,27 @@ func (m *ManagerImpl) Devices() map[string][]pluginapi.Device {
 
 // Allocate is the call that you can use to allocate a set of devices
 // from the registered device plugins.
-func (m *ManagerImpl) Allocate(resourceName string, devs []string) (*pluginapi.AllocateResponse, error) {
-    if len(devs) == 0 {
-        return nil, nil
+func (m *ManagerImpl) Allocate(node *schedulercache.NodeInfo, attrs *lifecycle.PodAdmitAttributes) error {
+    pod := attrs.Pod
+    // TODO: Reuse devices between init containers and regular containers.
+    for _, container := range pod.Spec.InitContainers {
+        if err := m.allocateContainerResources(pod, &container); err != nil {
+            return err
+        }
+    }
+    for _, container := range pod.Spec.Containers {
+        if err := m.allocateContainerResources(pod, &container); err != nil {
+            return err
+        }
     }
 
-    glog.V(3).Infof("Recieved allocation request for devices %v for device plugin %s",
-        devs, resourceName)
-    m.mutex.Lock()
-    e, ok := m.endpoints[resourceName]
-    m.mutex.Unlock()
-    if !ok {
-        return nil, fmt.Errorf("Unknown Device Plugin %s", resourceName)
+    // quick return if no pluginResources requested
+    if _, podRequireDevicePluginResource := m.podDevices[string(pod.UID)]; !podRequireDevicePluginResource {
+        return nil
     }
 
-    return e.allocate(devs)
+    m.sanitizeNodeAllocatable(node)
+    return nil
 }
 
 // Register registers a device plugin.
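As a rough illustration of the pod-level flow the rewritten Allocate follows, the sketch below walks init containers first and then regular containers, failing the whole admission on the first per-container error. The names podSpec, container, and allocateContainer are hypothetical simplifications of the kubelet types used above.

package main

import (
	"errors"
	"fmt"
)

type container struct{ Name string }

type podSpec struct {
	InitContainers []container
	Containers     []container
}

// allocateContainer is a placeholder for ManagerImpl.allocateContainerResources.
func allocateContainer(c container) error {
	if c.Name == "bad" {
		return errors.New("no devices left")
	}
	return nil
}

// allocatePod mirrors the structure of Allocate: init containers first, then app containers.
func allocatePod(spec podSpec) error {
	for _, c := range spec.InitContainers {
		if err := allocateContainer(c); err != nil {
			return err
		}
	}
	for _, c := range spec.Containers {
		if err := allocateContainer(c); err != nil {
			return err
		}
	}
	return nil
}

func main() {
	fmt.Println(allocatePod(podSpec{Containers: []container{{Name: "app"}}})) // <nil>
	fmt.Println(allocatePod(podSpec{Containers: []container{{Name: "bad"}}})) // no devices left
}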
@@ -211,12 +290,16 @@ func (m *ManagerImpl) addEndpoint(r *pluginapi.RegisterRequest) {
     if ok && old != nil {
         // Pass devices of previous endpoint into re-registered one,
         // to avoid potential orphaned devices upon re-registration
-        existingDevs = old.devices
+        devices := make(map[string]pluginapi.Device)
+        for _, device := range old.getDevices() {
+            devices[device.ID] = device
+        }
+        existingDevs = devices
     }
     m.mutex.Unlock()
 
     socketPath := filepath.Join(m.socketdir, r.Endpoint)
-    e, err := newEndpoint(socketPath, r.ResourceName, existingDevs, m.callback)
+    e, err := newEndpointImpl(socketPath, r.ResourceName, existingDevs, m.callback)
     if err != nil {
         glog.Errorf("Failed to dial device plugin with request %v: %v", r, err)
         return
@@ -259,3 +342,212 @@ func (m *ManagerImpl) addEndpoint(r *pluginapi.RegisterRequest) {
         m.mutex.Unlock()
     }()
 }
+
+// Checkpoints device to container allocation information to disk.
+func (m *ManagerImpl) writeCheckpoint() error {
+    m.mutex.Lock()
+    data := m.podDevices.toCheckpointData()
+    m.mutex.Unlock()
+
+    dataJSON, err := json.Marshal(data)
+    if err != nil {
+        return err
+    }
+    filepath := m.checkpointFile()
+    return ioutil.WriteFile(filepath, dataJSON, 0644)
+}
+
+// Reads device to container allocation information from disk, and populates
+// m.allocatedDevices accordingly.
+func (m *ManagerImpl) readCheckpoint() error {
+    filepath := m.checkpointFile()
+    content, err := ioutil.ReadFile(filepath)
+    if err != nil && !os.IsNotExist(err) {
+        return fmt.Errorf("failed to read checkpoint file %q: %v", filepath, err)
+    }
+    glog.V(2).Infof("Read checkpoint file %s\n", filepath)
+    var data checkpointData
+    if err := json.Unmarshal(content, &data); err != nil {
+        return fmt.Errorf("failed to unmarshal checkpoint data: %v", err)
+    }
+
+    m.mutex.Lock()
+    defer m.mutex.Unlock()
+    m.podDevices.fromCheckpointData(data)
+    m.allocatedDevices = m.podDevices.devices()
+    return nil
+}
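The checkpoint pair above is essentially marshal-to-JSON, write file, read file, unmarshal. Here is a self-contained sketch of that round trip, assuming a hypothetical checkpointEntry shape rather than the kubelet's real checkpointData:

package main

import (
	"encoding/json"
	"fmt"
	"io/ioutil"
	"os"
	"path/filepath"
)

// checkpointEntry is a hypothetical, simplified record of one allocation.
type checkpointEntry struct {
	PodUID        string   `json:"podUID"`
	ContainerName string   `json:"containerName"`
	ResourceName  string   `json:"resourceName"`
	DeviceIDs     []string `json:"deviceIDs"`
}

func writeCheckpoint(path string, entries []checkpointEntry) error {
	data, err := json.Marshal(entries)
	if err != nil {
		return err
	}
	return ioutil.WriteFile(path, data, 0644)
}

func readCheckpoint(path string) ([]checkpointEntry, error) {
	content, err := ioutil.ReadFile(path)
	if err != nil && !os.IsNotExist(err) {
		return nil, err
	}
	var entries []checkpointEntry
	if len(content) > 0 {
		if err := json.Unmarshal(content, &entries); err != nil {
			return nil, err
		}
	}
	return entries, nil
}

func main() {
	path := filepath.Join(os.TempDir(), "device_checkpoint_example")
	defer os.Remove(path)

	in := []checkpointEntry{{PodUID: "pod1", ContainerName: "con1", ResourceName: "example.com/gpu", DeviceIDs: []string{"dev1"}}}
	if err := writeCheckpoint(path, in); err != nil {
		panic(err)
	}
	out, err := readCheckpoint(path)
	fmt.Println(out, err) // [{pod1 con1 example.com/gpu [dev1]}] <nil>
}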
+
+// updateAllocatedDevices gets a list of active pods and then frees any Devices that are bound to
+// terminated pods. Returns error on failure.
+func (m *ManagerImpl) updateAllocatedDevices(activePods []*v1.Pod) {
+    m.mutex.Lock()
+    defer m.mutex.Unlock()
+    activePodUids := sets.NewString()
+    for _, pod := range activePods {
+        activePodUids.Insert(string(pod.UID))
+    }
+    allocatedPodUids := m.podDevices.pods()
+    podsToBeRemoved := allocatedPodUids.Difference(activePodUids)
+    if len(podsToBeRemoved) <= 0 {
+        return
+    }
+    glog.V(5).Infof("pods to be removed: %v", podsToBeRemoved.List())
+    m.podDevices.delete(podsToBeRemoved.List())
+    // Regenerated allocatedDevices after we update pod allocation information.
+    m.allocatedDevices = m.podDevices.devices()
+}
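The garbage-collection step above boils down to a set difference: allocated pod UIDs minus currently active pod UIDs gives the pods whose devices can be reclaimed. A minimal sketch of that calculation, with plain map-based sets standing in for sets.String:

package main

import "fmt"

func main() {
	// Pod UIDs that still hold device allocations (the checkpointed podDevices view).
	allocated := map[string]bool{"pod-a": true, "pod-b": true, "pod-c": true}
	// Pod UIDs that are still active on the node.
	active := map[string]bool{"pod-a": true}

	// toRemove = allocated - active; their devices become reusable.
	var toRemove []string
	for uid := range allocated {
		if !active[uid] {
			toRemove = append(toRemove, uid)
		}
	}
	for _, uid := range toRemove {
		delete(allocated, uid)
	}
	fmt.Println(len(toRemove), "pods reclaimed;", len(allocated), "still allocated") // 2 pods reclaimed; 1 still allocated
}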
+
+// Returns list of device Ids we need to allocate with Allocate rpc call.
+// Returns empty list in case we don't need to issue the Allocate rpc call.
+func (m *ManagerImpl) devicesToAllocate(podUID, contName, resource string, required int) (sets.String, error) {
+    m.mutex.Lock()
+    defer m.mutex.Unlock()
+    needed := required
+    // Gets list of devices that have already been allocated.
+    // This can happen if a container restarts for example.
+    devices := m.podDevices.containerDevices(podUID, contName, resource)
+    if devices != nil {
+        glog.V(3).Infof("Found pre-allocated devices for resource %s container %q in Pod %q: %v", resource, contName, podUID, devices.List())
+        needed = needed - devices.Len()
+        // A pod's resource is not expected to change once admitted by the API server,
+        // so just fail loudly here. We can revisit this part if this no longer holds.
+        if needed != 0 {
+            return nil, fmt.Errorf("pod %v container %v changed request for resource %v from %v to %v", podUID, contName, resource, devices.Len(), required)
+        }
+    }
+    if needed == 0 {
+        // No change, no work.
+        return nil, nil
+    }
+    devices = sets.NewString()
+    // Needs to allocate additional devices.
+    if m.allocatedDevices[resource] == nil {
+        m.allocatedDevices[resource] = sets.NewString()
+    }
+    // Gets Devices in use.
+    devicesInUse := m.allocatedDevices[resource]
+    // Gets a list of available devices.
+    available := m.allDevices[resource].Difference(devicesInUse)
+    if int(available.Len()) < needed {
+        return nil, fmt.Errorf("requested number of devices unavailable for %s. Requested: %d, Available: %d", resource, needed, available.Len())
+    }
+    allocated := available.UnsortedList()[:needed]
+    // Updates m.allocatedDevices with allocated devices to prevent them
+    // from being allocated to other pods/containers, given that we are
+    // not holding lock during the rpc call.
+    for _, device := range allocated {
+        m.allocatedDevices[resource].Insert(device)
+        devices.Insert(device)
+    }
+    return devices, nil
+}
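The selection logic in devicesToAllocate reduces to: reuse what the container already holds, otherwise pick the required count from available = registered minus in-use, and mark the picks as in-use before the RPC is issued. A standalone sketch with simplified map sets (the real code uses sets.String under the manager mutex); pickDevices is a hypothetical name:

package main

import "fmt"

// pickDevices is a hypothetical, simplified devicesToAllocate: it returns `needed`
// device IDs that are registered for the resource but not yet allocated.
func pickDevices(all, inUse map[string]bool, needed int) ([]string, error) {
	var available []string
	for id := range all {
		if !inUse[id] {
			available = append(available, id)
		}
	}
	if len(available) < needed {
		return nil, fmt.Errorf("requested %d devices, only %d available", needed, len(available))
	}
	picked := available[:needed]
	// Reserve immediately so concurrent allocations cannot hand out the same device.
	for _, id := range picked {
		inUse[id] = true
	}
	return picked, nil
}

func main() {
	all := map[string]bool{"dev1": true, "dev2": true, "dev3": true}
	inUse := map[string]bool{"dev1": true}

	got, err := pickDevices(all, inUse, 2)
	fmt.Println(got, err) // two of dev2/dev3, <nil>

	_, err = pickDevices(all, inUse, 1)
	fmt.Println(err) // requested 1 devices, only 0 available
}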
+
+// allocateContainerResources attempts to allocate all of required device
+// plugin resources for the input container, issues an Allocate rpc request
+// for each new device resource requirement, processes their AllocateResponses,
+// and updates the cached containerDevices on success.
+func (m *ManagerImpl) allocateContainerResources(pod *v1.Pod, container *v1.Container) error {
+    podUID := string(pod.UID)
+    contName := container.Name
+    allocatedDevicesUpdated := false
+    for k, v := range container.Resources.Limits {
+        resource := string(k)
+        needed := int(v.Value())
+        glog.V(3).Infof("needs %d %s", needed, resource)
+        if _, registeredResource := m.allDevices[resource]; !registeredResource {
+            continue
+        }
+        // Updates allocatedDevices to garbage collect any stranded resources
+        // before doing the device plugin allocation.
+        if !allocatedDevicesUpdated {
+            m.updateAllocatedDevices(m.activePods())
+            allocatedDevicesUpdated = true
+        }
+        allocDevices, err := m.devicesToAllocate(podUID, contName, resource, needed)
+        if err != nil {
+            return err
+        }
+        if allocDevices == nil || len(allocDevices) <= 0 {
+            continue
+        }
+        // devicePluginManager.Allocate involves RPC calls to device plugin, which
+        // could be heavy-weight. Therefore we want to perform this operation outside
+        // mutex lock. Note if Allocate call fails, we may leave container resources
+        // partially allocated for the failed container. We rely on updateAllocatedDevices()
+        // to garbage collect these resources later. Another side effect is that if
+        // we have X resource A and Y resource B in total, and two containers, container1
+        // and container2 both require X resource A and Y resource B. Both allocation
+        // requests may fail if we serve them in mixed order.
+        // TODO: may revisit this part later if we see inefficient resource allocation
+        // in real use as the result of this. Should also consider to parallize device
+        // plugin Allocate grpc calls if it becomes common that a container may require
+        // resources from multiple device plugins.
+        m.mutex.Lock()
+        e, ok := m.endpoints[resource]
+        m.mutex.Unlock()
+        if !ok {
+            m.mutex.Lock()
+            m.allocatedDevices = m.podDevices.devices()
+            m.mutex.Unlock()
+            return fmt.Errorf("Unknown Device Plugin %s", resource)
+        }
+
+        devs := allocDevices.UnsortedList()
+        glog.V(3).Infof("Making allocation request for devices %v for device plugin %s", devs, resource)
+        resp, err := e.allocate(devs)
+        if err != nil {
+            // In case of allocation failure, we want to restore m.allocatedDevices
+            // to the actual allocated state from m.podDevices.
+            m.mutex.Lock()
+            m.allocatedDevices = m.podDevices.devices()
+            m.mutex.Unlock()
+            return err
+        }
+
+        // Update internal cached podDevices state.
+        m.mutex.Lock()
+        m.podDevices.insert(podUID, contName, resource, allocDevices, resp)
+        m.mutex.Unlock()
+    }
+
+    // Checkpoints device to container allocation information.
+    return m.writeCheckpoint()
+}
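The long comment block above describes a common pattern: reserve under the lock, perform the slow RPC outside the lock, and roll the reservation back from the source of truth if the RPC fails. A compact, hypothetical sketch of that pattern, where the confirmed map plays the role of podDevices and reserved plays the role of allocatedDevices:

package main

import (
	"errors"
	"fmt"
	"sync"
)

type allocator struct {
	mu        sync.Mutex
	reserved  map[string]bool // optimistic reservations (like m.allocatedDevices)
	confirmed map[string]bool // durable state (like m.podDevices)
}

// allocate reserves dev under the lock, calls the plugin RPC outside the lock,
// and restores reserved from confirmed if the RPC fails.
func (a *allocator) allocate(dev string, rpc func(string) error) error {
	a.mu.Lock()
	a.reserved[dev] = true
	a.mu.Unlock()

	if err := rpc(dev); err != nil {
		a.mu.Lock()
		a.reserved = map[string]bool{}
		for d := range a.confirmed {
			a.reserved[d] = true
		}
		a.mu.Unlock()
		return err
	}

	a.mu.Lock()
	a.confirmed[dev] = true
	a.mu.Unlock()
	return nil
}

func main() {
	a := &allocator{reserved: map[string]bool{}, confirmed: map[string]bool{}}
	fmt.Println(a.allocate("dev1", func(string) error { return nil }))                    // <nil>
	fmt.Println(a.allocate("dev2", func(string) error { return errors.New("rpc down") })) // rpc down
	fmt.Println(len(a.reserved), len(a.confirmed))                                        // 1 1
}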
+
+// GetDeviceRunContainerOptions checks whether we have cached containerDevices
+// for the passed-in <pod, container> and returns its DeviceRunContainerOptions
+// for the found one. An empty struct is returned in case no cached state is found.
+func (m *ManagerImpl) GetDeviceRunContainerOptions(pod *v1.Pod, container *v1.Container) *DeviceRunContainerOptions {
+    m.mutex.Lock()
+    defer m.mutex.Unlock()
+    return m.podDevices.deviceRunContainerOptions(string(pod.UID), container.Name)
+}
+
+// sanitizeNodeAllocatable scans through allocatedDevices in the device manager
+// and if necessary, updates allocatableResource in nodeInfo to at least equal to
+// the allocated capacity. This allows pods that have already been scheduled on
+// the node to pass GeneralPredicates admission checking even upon device plugin failure.
+func (m *ManagerImpl) sanitizeNodeAllocatable(node *schedulercache.NodeInfo) {
+    var newAllocatableResource *schedulercache.Resource
+    allocatableResource := node.AllocatableResource()
+    if allocatableResource.ScalarResources == nil {
+        allocatableResource.ScalarResources = make(map[v1.ResourceName]int64)
+    }
+    for resource, devices := range m.allocatedDevices {
+        needed := devices.Len()
+        quant, ok := allocatableResource.ScalarResources[v1.ResourceName(resource)]
+        if ok && int(quant) >= needed {
+            continue
+        }
+        // Needs to update nodeInfo.AllocatableResource to make sure
+        // NodeInfo.allocatableResource at least equal to the capacity already allocated.
+        if newAllocatableResource == nil {
+            newAllocatableResource = allocatableResource.Clone()
+        }
+        newAllocatableResource.ScalarResources[v1.ResourceName(resource)] = int64(needed)
+    }
+    if newAllocatableResource != nil {
+        node.SetAllocatableResource(newAllocatableResource)
+    }
+}
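sanitizeNodeAllocatable is a simple "raise allocatable to at least the allocated amount" pass over scalar resources. A sketch under that assumption, with plain maps standing in for the scheduler cache types:

package main

import "fmt"

// sanitize bumps allocatable[res] up to allocated[res] where it falls short,
// leaving larger values untouched.
func sanitize(allocatable map[string]int64, allocated map[string]int) {
	for res, count := range allocated {
		if allocatable[res] < int64(count) {
			allocatable[res] = int64(count)
		}
	}
}

func main() {
	allocatable := map[string]int64{"domain2.com/resource2": 2}
	allocated := map[string]int{"domain1.com/resource1": 1, "domain2.com/resource2": 1}

	sanitize(allocatable, allocated)
	fmt.Println(allocatable) // map[domain1.com/resource1:1 domain2.com/resource2:2]
}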
@@ -23,30 +23,35 @@ import (
     "k8s.io/kubernetes/plugin/pkg/scheduler/schedulercache"
 )
 
-// HandlerStub provides a simple stub implementation for Handler.
-type HandlerStub struct{}
+// ManagerStub provides a simple stub implementation for the Device Manager.
+type ManagerStub struct{}
 
-// NewHandlerStub creates a HandlerStub.
-func NewHandlerStub() (*HandlerStub, error) {
-    return &HandlerStub{}, nil
+// NewManagerStub creates a ManagerStub.
+func NewManagerStub() (*ManagerStub, error) {
+    return &ManagerStub{}, nil
 }
 
 // Start simply returns nil.
-func (h *HandlerStub) Start(activePods ActivePodsFunc) error {
+func (h *ManagerStub) Start(activePods ActivePodsFunc) error {
+    return nil
+}
+
+// Stop simply returns nil.
+func (h *ManagerStub) Stop() error {
     return nil
 }
 
 // Devices returns an empty map.
-func (h *HandlerStub) Devices() map[string][]pluginapi.Device {
+func (h *ManagerStub) Devices() map[string][]pluginapi.Device {
     return make(map[string][]pluginapi.Device)
 }
 
 // Allocate simply returns nil.
-func (h *HandlerStub) Allocate(node *schedulercache.NodeInfo, attrs *lifecycle.PodAdmitAttributes) error {
+func (h *ManagerStub) Allocate(node *schedulercache.NodeInfo, attrs *lifecycle.PodAdmitAttributes) error {
     return nil
 }
 
 // GetDeviceRunContainerOptions simply returns nil.
-func (h *HandlerStub) GetDeviceRunContainerOptions(pod *v1.Pod, container *v1.Container) *DeviceRunContainerOptions {
+func (h *ManagerStub) GetDeviceRunContainerOptions(pod *v1.Pod, container *v1.Container) *DeviceRunContainerOptions {
     return nil
 }
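The stub exists so callers can hold a single Manager value whether or not the device plugin feature is enabled. A minimal, hypothetical illustration of that pattern with a compile-time assertion; the toy manager interface below is an assumption, not the real one defined in types.go:

package main

import "fmt"

// manager is a toy stand-in for the deviceplugin.Manager interface.
type manager interface {
	Start() error
	Devices() map[string][]string
}

// managerStub mirrors ManagerStub: every method is a no-op.
type managerStub struct{}

func (managerStub) Start() error                 { return nil }
func (managerStub) Devices() map[string][]string { return map[string][]string{} }

// Compile-time check that the stub satisfies the interface.
var _ manager = managerStub{}

func main() {
	var m manager = managerStub{} // selected when the feature is disabled
	fmt.Println(m.Start(), len(m.Devices()))
}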
@@ -17,13 +17,23 @@ limitations under the License.
 package deviceplugin
 
 import (
+    "flag"
+    "fmt"
+    "reflect"
     "sync/atomic"
     "testing"
     "time"
 
+    "github.com/stretchr/testify/assert"
     "github.com/stretchr/testify/require"
+    "k8s.io/api/core/v1"
+    "k8s.io/apimachinery/pkg/api/resource"
+    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+    "k8s.io/apimachinery/pkg/util/sets"
+    "k8s.io/apimachinery/pkg/util/uuid"
     pluginapi "k8s.io/kubernetes/pkg/kubelet/apis/deviceplugin/v1alpha"
+    "k8s.io/kubernetes/pkg/kubelet/lifecycle"
+    "k8s.io/kubernetes/plugin/pkg/scheduler/schedulercache"
 )
 
 const (
@@ -33,10 +43,8 @@ const (
 )
 
 func TestNewManagerImpl(t *testing.T) {
-    _, err := NewManagerImpl("", func(n string, a, u, r []pluginapi.Device) {})
-    require.Error(t, err)
-
-    _, err = NewManagerImpl(socketName, func(n string, a, u, r []pluginapi.Device) {})
+    verifyCapacityFunc := func(updates v1.ResourceList) {}
+    _, err := newManagerImpl(verifyCapacityFunc, socketName)
     require.NoError(t, err)
 }
 
@@ -72,6 +80,7 @@ func TestDevicePluginReRegistration(t *testing.T) {
     m, p1 := setup(t, devs, callback)
     p1.Register(socketName, testResourceName)
     // Wait for the first callback to be issued.
+
     <-callbackChan
     // Wait till the endpoint is added to the manager.
     for i := 0; i < 20; i++ {
@@ -113,10 +122,17 @@ func TestDevicePluginReRegistration(t *testing.T) {
 
 }
 
-func setup(t *testing.T, devs []*pluginapi.Device, callback MonitorCallback) (Manager, *Stub) {
-    m, err := NewManagerImpl(socketName, callback)
+func setup(t *testing.T, devs []*pluginapi.Device, callback monitorCallback) (Manager, *Stub) {
+    updateCapacity := func(v1.ResourceList) {}
+    m, err := newManagerImpl(updateCapacity, socketName)
     require.NoError(t, err)
-    err = m.Start()
+    m.callback = callback
+
+    activePods := func() []*v1.Pod {
+        return []*v1.Pod{}
+    }
+    err = m.Start(activePods)
     require.NoError(t, err)
 
     p := NewDevicePluginStub(devs, pluginSocketName)
@@ -130,3 +146,387 @@ func cleanup(t *testing.T, m Manager, p *Stub) {
     p.Stop()
     m.Stop()
 }
+
+func TestUpdateCapacity(t *testing.T) {
+    var expected = v1.ResourceList{}
+    as := assert.New(t)
+    verifyCapacityFunc := func(updates v1.ResourceList) {
+        as.Equal(expected, updates)
+    }
+    testManager, err := newManagerImpl(verifyCapacityFunc, socketName)
+    as.NotNil(testManager)
+    as.Nil(err)
+
+    devs := []pluginapi.Device{
+        {ID: "Device1", Health: pluginapi.Healthy},
+        {ID: "Device2", Health: pluginapi.Healthy},
+        {ID: "Device3", Health: pluginapi.Unhealthy},
+    }
+
+    resourceName := "resource1"
+    // Adds three devices for resource1, two healthy and one unhealthy.
+    // Expects capacity for resource1 to be 2.
+    expected[v1.ResourceName(resourceName)] = *resource.NewQuantity(int64(2), resource.DecimalSI)
+    testManager.callback(resourceName, devs, []pluginapi.Device{}, []pluginapi.Device{})
+    // Deletes an unhealthy device should NOT change capacity.
+    testManager.callback(resourceName, []pluginapi.Device{}, []pluginapi.Device{}, []pluginapi.Device{devs[2]})
+    // Updates a healthy device to unhealthy should reduce capacity by 1.
+    expected[v1.ResourceName(resourceName)] = *resource.NewQuantity(int64(1), resource.DecimalSI)
+    // Deletes a healthy device should reduce capacity by 1.
+    expected[v1.ResourceName(resourceName)] = *resource.NewQuantity(int64(0), resource.DecimalSI)
+    // Tests adding another resource.
+    delete(expected, v1.ResourceName(resourceName))
+    resourceName2 := "resource2"
+    expected[v1.ResourceName(resourceName2)] = *resource.NewQuantity(int64(2), resource.DecimalSI)
+    testManager.callback(resourceName2, devs, []pluginapi.Device{}, []pluginapi.Device{})
+}
+
+type stringPairType struct {
+    value1 string
+    value2 string
+}
+
+func constructDevices(devices []string) sets.String {
+    ret := sets.NewString()
+    for _, dev := range devices {
+        ret.Insert(dev)
+    }
+    return ret
+}
+
+func constructAllocResp(devices, mounts, envs map[string]string) *pluginapi.AllocateResponse {
+    resp := &pluginapi.AllocateResponse{}
+    for k, v := range devices {
+        resp.Devices = append(resp.Devices, &pluginapi.DeviceSpec{
+            HostPath:      k,
+            ContainerPath: v,
+            Permissions:   "mrw",
+        })
+    }
+    for k, v := range mounts {
+        resp.Mounts = append(resp.Mounts, &pluginapi.Mount{
+            ContainerPath: k,
+            HostPath:      v,
+            ReadOnly:      true,
+        })
+    }
+    resp.Envs = make(map[string]string)
+    for k, v := range envs {
+        resp.Envs[k] = v
+    }
+    return resp
+}
+
+func TestCheckpoint(t *testing.T) {
+    resourceName1 := "domain1.com/resource1"
+    resourceName2 := "domain2.com/resource2"
+
+    testManager := &ManagerImpl{
+        allDevices:       make(map[string]sets.String),
+        allocatedDevices: make(map[string]sets.String),
+        podDevices:       make(podDevices),
+    }
+
+    testManager.podDevices.insert("pod1", "con1", resourceName1,
+        constructDevices([]string{"dev1", "dev2"}),
+        constructAllocResp(map[string]string{"/dev/r1dev1": "/dev/r1dev1", "/dev/r1dev2": "/dev/r1dev2"},
+            map[string]string{"/home/r1lib1": "/usr/r1lib1"}, map[string]string{}))
+    testManager.podDevices.insert("pod1", "con1", resourceName2,
+        constructDevices([]string{"dev1", "dev2"}),
+        constructAllocResp(map[string]string{"/dev/r2dev1": "/dev/r2dev1", "/dev/r2dev2": "/dev/r2dev2"},
+            map[string]string{"/home/r2lib1": "/usr/r2lib1"},
+            map[string]string{"r2devices": "dev1 dev2"}))
+    testManager.podDevices.insert("pod1", "con2", resourceName1,
+        constructDevices([]string{"dev3"}),
+        constructAllocResp(map[string]string{"/dev/r1dev3": "/dev/r1dev3"},
+            map[string]string{"/home/r1lib1": "/usr/r1lib1"}, map[string]string{}))
+    testManager.podDevices.insert("pod2", "con1", resourceName1,
+        constructDevices([]string{"dev4"}),
+        constructAllocResp(map[string]string{"/dev/r1dev4": "/dev/r1dev4"},
+            map[string]string{"/home/r1lib1": "/usr/r1lib1"}, map[string]string{}))
+
+    expectedPodDevices := testManager.podDevices
+    expectedAllocatedDevices := testManager.podDevices.devices()
+
+    err := testManager.writeCheckpoint()
+    as := assert.New(t)
+
+    as.Nil(err)
+    testManager.podDevices = make(podDevices)
+    err = testManager.readCheckpoint()
+    as.Nil(err)
+
+    as.Equal(len(expectedPodDevices), len(testManager.podDevices))
+    for podUID, containerDevices := range expectedPodDevices {
+        for conName, resources := range containerDevices {
+            for resource := range resources {
+                as.True(reflect.DeepEqual(
+                    expectedPodDevices.containerDevices(podUID, conName, resource),
+                    testManager.podDevices.containerDevices(podUID, conName, resource)))
+                opts1 := expectedPodDevices.deviceRunContainerOptions(podUID, conName)
+                opts2 := testManager.podDevices.deviceRunContainerOptions(podUID, conName)
+                as.Equal(len(opts1.Envs), len(opts2.Envs))
+                as.Equal(len(opts1.Mounts), len(opts2.Mounts))
+                as.Equal(len(opts1.Devices), len(opts2.Devices))
+            }
+        }
+    }
+    as.True(reflect.DeepEqual(expectedAllocatedDevices, testManager.allocatedDevices))
+}
+
+type activePodsStub struct {
+    activePods []*v1.Pod
+}
+
+func (a *activePodsStub) getActivePods() []*v1.Pod {
+    return a.activePods
+}
+
+func (a *activePodsStub) updateActivePods(newPods []*v1.Pod) {
+    a.activePods = newPods
+}
+
+type MockEndpoint struct {
+    allocateFunc func(devs []string) (*pluginapi.AllocateResponse, error)
+}
+
+func (m *MockEndpoint) stop() {}
+func (m *MockEndpoint) run()  {}
+
+func (m *MockEndpoint) getDevices() []pluginapi.Device {
+    return []pluginapi.Device{}
+}
+
+func (m *MockEndpoint) callback(resourceName string, added, updated, deleted []pluginapi.Device) {}
+
+func (m *MockEndpoint) allocate(devs []string) (*pluginapi.AllocateResponse, error) {
+    if m.allocateFunc != nil {
+        return m.allocateFunc(devs)
+    }
+    return nil, nil
+}
+
+func TestPodContainerDeviceAllocation(t *testing.T) {
+    flag.Set("alsologtostderr", fmt.Sprintf("%t", true))
+    var logLevel string
+    flag.StringVar(&logLevel, "logLevel", "4", "test")
+    flag.Lookup("v").Value.Set(logLevel)
+
+    resourceName1 := "domain1.com/resource1"
+    resourceQuantity1 := *resource.NewQuantity(int64(2), resource.DecimalSI)
+    devID1 := "dev1"
+    devID2 := "dev2"
+    resourceName2 := "domain2.com/resource2"
+    resourceQuantity2 := *resource.NewQuantity(int64(1), resource.DecimalSI)
+    devID3 := "dev3"
+    devID4 := "dev4"
+
+    as := require.New(t)
+    monitorCallback := func(resourceName string, added, updated, deleted []pluginapi.Device) {}
+    podsStub := activePodsStub{
+        activePods: []*v1.Pod{},
+    }
+    cachedNode := &v1.Node{
+        Status: v1.NodeStatus{
+            Allocatable: v1.ResourceList{},
+        },
+    }
+    nodeInfo := &schedulercache.NodeInfo{}
+    nodeInfo.SetNode(cachedNode)
+
+    testManager := &ManagerImpl{
+        callback:         monitorCallback,
+        allDevices:       make(map[string]sets.String),
+        allocatedDevices: make(map[string]sets.String),
+        endpoints:        make(map[string]endpoint),
+        podDevices:       make(podDevices),
+        activePods:       podsStub.getActivePods,
+    }
+
+    testManager.allDevices[resourceName1] = sets.NewString()
+    testManager.allDevices[resourceName1].Insert(devID1)
+    testManager.allDevices[resourceName1].Insert(devID2)
+    testManager.allDevices[resourceName2] = sets.NewString()
+    testManager.allDevices[resourceName2].Insert(devID3)
+    testManager.allDevices[resourceName2].Insert(devID4)
+
+    testManager.endpoints[resourceName1] = &MockEndpoint{
+        allocateFunc: func(devs []string) (*pluginapi.AllocateResponse, error) {
+            resp := new(pluginapi.AllocateResponse)
+            resp.Envs = make(map[string]string)
+            for _, dev := range devs {
+                switch dev {
+                case "dev1":
+                    resp.Devices = append(resp.Devices, &pluginapi.DeviceSpec{
+                        ContainerPath: "/dev/aaa",
+                        HostPath:      "/dev/aaa",
+                        Permissions:   "mrw",
+                    })
+
+                    resp.Devices = append(resp.Devices, &pluginapi.DeviceSpec{
+                        ContainerPath: "/dev/bbb",
+                        HostPath:      "/dev/bbb",
+                        Permissions:   "mrw",
+                    })
+
+                    resp.Mounts = append(resp.Mounts, &pluginapi.Mount{
+                        ContainerPath: "/container_dir1/file1",
+                        HostPath:      "host_dir1/file1",
+                        ReadOnly:      true,
+                    })
+
+                case "dev2":
+                    resp.Devices = append(resp.Devices, &pluginapi.DeviceSpec{
+                        ContainerPath: "/dev/ccc",
+                        HostPath:      "/dev/ccc",
+                        Permissions:   "mrw",
+                    })
+
+                    resp.Mounts = append(resp.Mounts, &pluginapi.Mount{
+                        ContainerPath: "/container_dir1/file2",
+                        HostPath:      "host_dir1/file2",
+                        ReadOnly:      true,
+                    })
+
+                    resp.Envs["key1"] = "val1"
+                }
+            }
+            return resp, nil
+        },
+    }
+
+    testManager.endpoints[resourceName2] = &MockEndpoint{
+        allocateFunc: func(devs []string) (*pluginapi.AllocateResponse, error) {
+            resp := new(pluginapi.AllocateResponse)
+            resp.Envs = make(map[string]string)
+            for _, dev := range devs {
+                switch dev {
+                case "dev3":
+                    resp.Envs["key2"] = "val2"
+
+                case "dev4":
+                    resp.Envs["key2"] = "val3"
+                }
+            }
+            return resp, nil
+        },
+    }
+
+    pod := &v1.Pod{
+        ObjectMeta: metav1.ObjectMeta{
+            UID: uuid.NewUUID(),
+        },
+        Spec: v1.PodSpec{
+            Containers: []v1.Container{
+                {
+                    Name: string(uuid.NewUUID()),
+                    Resources: v1.ResourceRequirements{
+                        Limits: v1.ResourceList{
+                            v1.ResourceName(resourceName1): resourceQuantity1,
+                            v1.ResourceName("cpu"):         resourceQuantity1,
+                            v1.ResourceName(resourceName2): resourceQuantity2,
+                        },
+                    },
+                },
+            },
+        },
+    }
+
+    podsStub.updateActivePods([]*v1.Pod{pod})
+    err := testManager.Allocate(nodeInfo, &lifecycle.PodAdmitAttributes{Pod: pod})
+    as.Nil(err)
+    runContainerOpts := testManager.GetDeviceRunContainerOptions(pod, &pod.Spec.Containers[0])
+    as.NotNil(runContainerOpts)
+    as.Equal(len(runContainerOpts.Devices), 3)
+    as.Equal(len(runContainerOpts.Mounts), 2)
+    as.Equal(len(runContainerOpts.Envs), 2)
+
+    // Requesting to create a pod without enough resources should fail.
+    as.Equal(2, testManager.allocatedDevices[resourceName1].Len())
+    failPod := &v1.Pod{
+        ObjectMeta: metav1.ObjectMeta{
+            UID: uuid.NewUUID(),
+        },
+        Spec: v1.PodSpec{
+            Containers: []v1.Container{
+                {
+                    Name: string(uuid.NewUUID()),
+                    Resources: v1.ResourceRequirements{
+                        Limits: v1.ResourceList{
+                            v1.ResourceName(resourceName1): resourceQuantity2,
+                        },
+                    },
+                },
+            },
+        },
+    }
+    err = testManager.Allocate(nodeInfo, &lifecycle.PodAdmitAttributes{Pod: failPod})
+    as.NotNil(err)
+    runContainerOpts2 := testManager.GetDeviceRunContainerOptions(failPod, &failPod.Spec.Containers[0])
+    as.Nil(runContainerOpts2)
+
+    // Requesting to create a new pod with a single resourceName2 should succeed.
+    newPod := &v1.Pod{
+        ObjectMeta: metav1.ObjectMeta{
+            UID: uuid.NewUUID(),
+        },
+        Spec: v1.PodSpec{
+            Containers: []v1.Container{
+                {
+                    Name: string(uuid.NewUUID()),
+                    Resources: v1.ResourceRequirements{
+                        Limits: v1.ResourceList{
+                            v1.ResourceName(resourceName2): resourceQuantity2,
+                        },
+                    },
+                },
+            },
+        },
+    }
+    err = testManager.Allocate(nodeInfo, &lifecycle.PodAdmitAttributes{Pod: newPod})
+    as.Nil(err)
+    runContainerOpts3 := testManager.GetDeviceRunContainerOptions(newPod, &newPod.Spec.Containers[0])
+    as.Equal(1, len(runContainerOpts3.Envs))
+}
+
+func TestSanitizeNodeAllocatable(t *testing.T) {
+    resourceName1 := "domain1.com/resource1"
+    devID1 := "dev1"
+
+    resourceName2 := "domain2.com/resource2"
+    devID2 := "dev2"
+
+    as := assert.New(t)
+    monitorCallback := func(resourceName string, added, updated, deleted []pluginapi.Device) {}
+
+    testManager := &ManagerImpl{
+        callback:         monitorCallback,
+        allDevices:       make(map[string]sets.String),
+        allocatedDevices: make(map[string]sets.String),
+        podDevices:       make(podDevices),
+    }
+    // require one of resource1 and one of resource2
+    testManager.allocatedDevices[resourceName1] = sets.NewString()
+    testManager.allocatedDevices[resourceName1].Insert(devID1)
+    testManager.allocatedDevices[resourceName2] = sets.NewString()
+    testManager.allocatedDevices[resourceName2].Insert(devID2)
+
+    cachedNode := &v1.Node{
+        Status: v1.NodeStatus{
+            Allocatable: v1.ResourceList{
+                // has no resource1 and two of resource2
+                v1.ResourceName(resourceName2): *resource.NewQuantity(int64(2), resource.DecimalSI),
+            },
+        },
+    }
+    nodeInfo := &schedulercache.NodeInfo{}
+    nodeInfo.SetNode(cachedNode)
+
+    testManager.sanitizeNodeAllocatable(nodeInfo)
+
+    allocatableScalarResources := nodeInfo.AllocatableResource().ScalarResources
+    // allocatable in nodeInfo is less than needed, should update
+    as.Equal(1, int(allocatableScalarResources[v1.ResourceName(resourceName1)]))
+    // allocatable in nodeInfo is more than needed, should skip updating
+    as.Equal(2, int(allocatableScalarResources[v1.ResourceName(resourceName2)]))
+}
@@ -116,6 +116,11 @@ func (pdev podDevices) toCheckpointData() checkpointData {
         for conName, resources := range containerDevices {
             for resource, devices := range resources {
                 devIds := devices.deviceIds.UnsortedList()
+                if devices.allocResp == nil {
+                    glog.Errorf("Can't marshal allocResp for %v %v %v: allocation response is missing", podUID, conName, resource)
+                    continue
+                }
+
                 allocResp, err := devices.allocResp.Marshal()
                 if err != nil {
                     glog.Errorf("Can't marshal allocResp for %v %v %v: %v", podUID, conName, resource, err)
@@ -17,34 +17,40 @@ limitations under the License.
 package deviceplugin
 
 import (
+    "k8s.io/api/core/v1"
     pluginapi "k8s.io/kubernetes/pkg/kubelet/apis/deviceplugin/v1alpha"
     kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
+    "k8s.io/kubernetes/pkg/kubelet/lifecycle"
+    "k8s.io/kubernetes/plugin/pkg/scheduler/schedulercache"
 )
 
-// MonitorCallback is the function called when a device's health state changes,
-// or new devices are reported, or old devices are deleted.
-// Updated contains the most recent state of the Device.
-type MonitorCallback func(resourceName string, added, updated, deleted []pluginapi.Device)
-
 // Manager manages all the Device Plugins running on a node.
 type Manager interface {
-    // Start starts the gRPC Registration service.
-    Start() error
+    // Start starts device plugin registration service.
+    Start(activePods ActivePodsFunc) error
 
     // Devices is the map of devices that have registered themselves
     // against the manager.
     // The map key is the ResourceName of the device plugins.
    Devices() map[string][]pluginapi.Device
 
-    // Allocate takes resourceName and list of device Ids, and calls the
-    // gRPC Allocate on the device plugin matching the resourceName.
-    Allocate(string, []string) (*pluginapi.AllocateResponse, error)
+    // Allocate configures and assigns devices to pods. The pods are provided
+    // through the pod admission attributes in the attrs argument. From the
+    // requested device resources, Allocate will communicate with the owning
+    // device plugin to allow setup procedures to take place, and for the
+    // device plugin to provide runtime settings to use the device (environment
+    // variables, mount points and device files). The node object is provided
+    // for the device manager to update the node capacity to reflect the
+    // currently available devices.
+    Allocate(node *schedulercache.NodeInfo, attrs *lifecycle.PodAdmitAttributes) error
 
     // Stop stops the manager.
     Stop() error
 
-    // Returns checkpoint file path.
-    CheckpointFile() string
+    // GetDeviceRunContainerOptions checks whether we have cached containerDevices
+    // for the passed-in <pod, container> and returns its DeviceRunContainerOptions
+    // for the found one. An empty struct is returned in case no cached state is found.
+    GetDeviceRunContainerOptions(pod *v1.Pod, container *v1.Container) *DeviceRunContainerOptions
 }
 
 // DeviceRunContainerOptions contains the combined container runtime settings to consume its allocated devices.
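Start now takes an ActivePodsFunc, so the manager can ask its caller for the current pod list lazily instead of holding a stale snapshot. A small, hypothetical sketch of that dependency-injection style; activePodsFunc and miniManager are toy stand-ins, not the kubelet types:

package main

import "fmt"

// activePodsFunc is a toy stand-in for deviceplugin.ActivePodsFunc.
type activePodsFunc func() []string

// miniManager stores the callback at Start time and consults it later,
// so it always sees the caller's current pod list.
type miniManager struct {
	activePods activePodsFunc
}

func (m *miniManager) start(f activePodsFunc) { m.activePods = f }

func (m *miniManager) podCount() int { return len(m.activePods()) }

func main() {
	pods := []string{"pod-a", "pod-b"}
	m := &miniManager{}
	m.start(func() []string { return pods }) // closure sees the live slice

	fmt.Println(m.podCount()) // 2
	pods = append(pods, "pod-c")
	fmt.Println(m.podCount()) // 3
}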