From b16bfc768d83095b3012e11c5f937540847ca3a7 Mon Sep 17 00:00:00 2001 From: "Niklas Q. Nielsen" Date: Tue, 14 Nov 2017 21:06:07 +0000 Subject: [PATCH] Merging handler into manager API --- pkg/kubelet/cm/container_manager_linux.go | 14 +- pkg/kubelet/cm/deviceplugin/BUILD | 4 +- .../cm/deviceplugin/device_plugin_handler.go | 369 ---------------- .../device_plugin_handler_test.go | 414 ----------------- pkg/kubelet/cm/deviceplugin/endpoint.go | 36 +- pkg/kubelet/cm/deviceplugin/endpoint_test.go | 8 +- pkg/kubelet/cm/deviceplugin/manager.go | 356 +++++++++++++-- ...plugin_handler_stub.go => manager_stub.go} | 23 +- pkg/kubelet/cm/deviceplugin/manager_test.go | 416 +++++++++++++++++- pkg/kubelet/cm/deviceplugin/pod_devices.go | 5 + pkg/kubelet/cm/deviceplugin/types.go | 30 +- 11 files changed, 804 insertions(+), 871 deletions(-) delete mode 100644 pkg/kubelet/cm/deviceplugin/device_plugin_handler.go delete mode 100644 pkg/kubelet/cm/deviceplugin/device_plugin_handler_test.go rename pkg/kubelet/cm/deviceplugin/{device_plugin_handler_stub.go => manager_stub.go} (67%) diff --git a/pkg/kubelet/cm/container_manager_linux.go b/pkg/kubelet/cm/container_manager_linux.go index 2d9f0d40897..870e4900e1e 100644 --- a/pkg/kubelet/cm/container_manager_linux.go +++ b/pkg/kubelet/cm/container_manager_linux.go @@ -128,7 +128,7 @@ type containerManagerImpl struct { // Interface for QoS cgroup management qosContainerManager QOSContainerManager // Interface for exporting and allocating devices reported by device plugins. - devicePluginHandler deviceplugin.Handler + devicePluginManager deviceplugin.Manager // Interface for CPU affinity management. cpuManager cpumanager.Manager } @@ -274,11 +274,11 @@ func NewContainerManager(mountUtil mount.Interface, cadvisorInterface cadvisor.I } } - glog.Infof("Creating device plugin handler: %t", devicePluginEnabled) + glog.Infof("Creating device plugin manager: %t", devicePluginEnabled) if devicePluginEnabled { - cm.devicePluginHandler, err = deviceplugin.NewHandlerImpl(updateDeviceCapacityFunc) + cm.devicePluginManager, err = deviceplugin.NewManagerImpl(updateDeviceCapacityFunc) } else { - cm.devicePluginHandler, err = deviceplugin.NewHandlerStub() + cm.devicePluginManager, err = deviceplugin.NewManagerStub() } if err != nil { return nil, err @@ -597,7 +597,7 @@ func (cm *containerManagerImpl) Start(node *v1.Node, }, time.Second, stopChan) // Starts device plugin manager. 
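The updateDeviceCapacityFunc handed to deviceplugin.NewManagerImpl above is not part of this diff; it only has to fold the capacity reported by device plugins into the container manager's capacity list. A minimal sketch of such a callback (assuming a cm.capacity v1.ResourceList guarded by the container manager's lock, both of which live outside this patch):

	updateDeviceCapacityFunc := func(updates v1.ResourceList) {
		cm.Lock()
		defer cm.Unlock()
		for resourceName, quantity := range updates {
			// Each callback carries the current healthy-device count for one or
			// more extended resources; later reports overwrite earlier ones.
			cm.capacity[resourceName] = quantity
		}
	}
	cm.devicePluginManager, err = deviceplugin.NewManagerImpl(updateDeviceCapacityFunc)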
- if err := cm.devicePluginHandler.Start(deviceplugin.ActivePodsFunc(activePods)); err != nil { + if err := cm.devicePluginManager.Start(deviceplugin.ActivePodsFunc(activePods)); err != nil { return err } return nil @@ -622,7 +622,7 @@ func (cm *containerManagerImpl) GetResources(pod *v1.Pod, container *v1.Containe opts := &kubecontainer.RunContainerOptions{} // Allocate should already be called during predicateAdmitHandler.Admit(), // just try to fetch device runtime information from cached state here - devOpts := cm.devicePluginHandler.GetDeviceRunContainerOptions(pod, container) + devOpts := cm.devicePluginManager.GetDeviceRunContainerOptions(pod, container) if devOpts == nil { return opts, nil } @@ -633,7 +633,7 @@ func (cm *containerManagerImpl) GetResources(pod *v1.Pod, container *v1.Containe } func (cm *containerManagerImpl) UpdatePluginResources(node *schedulercache.NodeInfo, attrs *lifecycle.PodAdmitAttributes) error { - return cm.devicePluginHandler.Allocate(node, attrs) + return cm.devicePluginManager.Allocate(node, attrs) } func (cm *containerManagerImpl) SystemCgroupsLimit() v1.ResourceList { diff --git a/pkg/kubelet/cm/deviceplugin/BUILD b/pkg/kubelet/cm/deviceplugin/BUILD index 1d650b6d89c..341b59691a7 100644 --- a/pkg/kubelet/cm/deviceplugin/BUILD +++ b/pkg/kubelet/cm/deviceplugin/BUILD @@ -9,11 +9,10 @@ load( go_library( name = "go_default_library", srcs = [ - "device_plugin_handler.go", - "device_plugin_handler_stub.go", "device_plugin_stub.go", "endpoint.go", "manager.go", + "manager_stub.go", "pod_devices.go", "types.go", ], @@ -49,7 +48,6 @@ filegroup( go_test( name = "go_default_test", srcs = [ - "device_plugin_handler_test.go", "endpoint_test.go", "manager_test.go", ], diff --git a/pkg/kubelet/cm/deviceplugin/device_plugin_handler.go b/pkg/kubelet/cm/deviceplugin/device_plugin_handler.go deleted file mode 100644 index 7303e507d5e..00000000000 --- a/pkg/kubelet/cm/deviceplugin/device_plugin_handler.go +++ /dev/null @@ -1,369 +0,0 @@ -/* -Copyright 2017 The Kubernetes Authors. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package deviceplugin - -import ( - "encoding/json" - "fmt" - "io/ioutil" - "os" - "sync" - - "github.com/golang/glog" - - "k8s.io/api/core/v1" - "k8s.io/apimachinery/pkg/api/resource" - "k8s.io/apimachinery/pkg/util/sets" - pluginapi "k8s.io/kubernetes/pkg/kubelet/apis/deviceplugin/v1alpha" - "k8s.io/kubernetes/pkg/kubelet/lifecycle" - "k8s.io/kubernetes/plugin/pkg/scheduler/schedulercache" -) - -// ActivePodsFunc is a function that returns a list of pods to reconcile. -type ActivePodsFunc func() []*v1.Pod - -// Handler defines the functions used to manage and access device plugin resources. -type Handler interface { - // Start starts device plugin registration service. - Start(activePods ActivePodsFunc) error - // Devices returns all of registered devices keyed by resourceName. - Devices() map[string][]pluginapi.Device - // Allocate scans through containers in the pod spec - // If it finds the container requires device plugin resource, it: - // 1. 
Checks whether it already has this information in its cached state. - // 2. If not, it calls Allocate and populate its cached state afterwards. - // 3. If there is no cached state and Allocate fails, it returns an error. - // 4. Otherwise, it updates allocatableResource in nodeInfo if necessary, - // to make sure it is at least equal to the pod's requested capacity for - // any registered device plugin resource - Allocate(node *schedulercache.NodeInfo, attrs *lifecycle.PodAdmitAttributes) error - // GetDeviceRunContainerOptions checks whether we have cached containerDevices - // for the passed-in and returns its DeviceRunContainerOptions - // for the found one. An empty struct is returned in case no cached state is found. - GetDeviceRunContainerOptions(pod *v1.Pod, container *v1.Container) *DeviceRunContainerOptions -} - -// HandlerImpl implements the actual functionality to manage device plugin resources. -type HandlerImpl struct { - // TODO: consider to change this to RWMutex. - sync.Mutex - // devicePluginManager is an implementation of deviceplugin.Manager interface. - devicePluginManager Manager - // activePods is a method for listing active pods on the node - // so the amount of pluginResources requested by existing pods - // could be counted when updating allocated devices - activePods ActivePodsFunc - // devicePluginManagerMonitorCallback is used for updating devices' states in one time call. - // e.g. a new device is advertised, two old devices are deleted and a running device fails. - devicePluginManagerMonitorCallback MonitorCallback - // allDevices contains all of registered resourceNames and their exported device IDs. - allDevices map[string]sets.String - // allocatedDevices contains allocated deviceIds, keyed by resourceName. - allocatedDevices map[string]sets.String - // podDevices contains pod to allocated device mapping. - podDevices podDevices -} - -// NewHandlerImpl creates a HandlerImpl to manage device plugin resources. -// updateCapacityFunc is called to update ContainerManager capacity when -// device capacity changes. -func NewHandlerImpl(updateCapacityFunc func(v1.ResourceList)) (*HandlerImpl, error) { - glog.V(2).Infof("Creating Device Plugin Handler") - handler := &HandlerImpl{ - allDevices: make(map[string]sets.String), - allocatedDevices: make(map[string]sets.String), - podDevices: make(podDevices), - } - - deviceManagerMonitorCallback := func(resourceName string, added, updated, deleted []pluginapi.Device) { - var capacity = v1.ResourceList{} - kept := append(updated, added...) - - handler.Lock() - defer handler.Unlock() - - if _, ok := handler.allDevices[resourceName]; !ok { - handler.allDevices[resourceName] = sets.NewString() - } - // For now, Handler only keeps track of healthy devices. - // We can revisit this later when the need comes to track unhealthy devices here. 
- for _, dev := range kept { - if dev.Health == pluginapi.Healthy { - handler.allDevices[resourceName].Insert(dev.ID) - } else { - handler.allDevices[resourceName].Delete(dev.ID) - } - } - for _, dev := range deleted { - handler.allDevices[resourceName].Delete(dev.ID) - } - capacity[v1.ResourceName(resourceName)] = *resource.NewQuantity(int64(handler.allDevices[resourceName].Len()), resource.DecimalSI) - updateCapacityFunc(capacity) - } - - mgr, err := NewManagerImpl(pluginapi.KubeletSocket, deviceManagerMonitorCallback) - if err != nil { - return nil, fmt.Errorf("Failed to initialize device plugin manager: %+v", err) - } - - handler.devicePluginManager = mgr - handler.devicePluginManagerMonitorCallback = deviceManagerMonitorCallback - - return handler, nil -} - -// Start initializes podDevices and allocatedDevices information from checkpoint-ed state -// and starts device plugin registration service. -func (h *HandlerImpl) Start(activePods ActivePodsFunc) error { - h.activePods = activePods - - // Loads in allocatedDevices information from disk. - err := h.readCheckpoint() - if err != nil { - glog.Warningf("Continue after failing to read checkpoint file. Device allocation info may NOT be up-to-date. Err: %v", err) - } - - return h.devicePluginManager.Start() -} - -// Devices returns all of registered devices keyed by resourceName. -func (h *HandlerImpl) Devices() map[string][]pluginapi.Device { - return h.devicePluginManager.Devices() -} - -// Returns list of device Ids we need to allocate with Allocate rpc call. -// Returns empty list in case we don't need to issue the Allocate rpc call. -func (h *HandlerImpl) devicesToAllocate(podUID, contName, resource string, required int) (sets.String, error) { - h.Lock() - defer h.Unlock() - needed := required - // Gets list of devices that have already been allocated. - // This can happen if a container restarts for example. - devices := h.podDevices.containerDevices(podUID, contName, resource) - if devices != nil { - glog.V(3).Infof("Found pre-allocated devices for resource %s container %q in Pod %q: %v", resource, contName, podUID, devices.List()) - needed = needed - devices.Len() - // A pod's resource is not expected to change once admitted by the API server, - // so just fail loudly here. We can revisit this part if this no longer holds. - if needed != 0 { - return nil, fmt.Errorf("pod %v container %v changed request for resource %v from %v to %v", podUID, contName, resource, devices.Len(), required) - } - } - if needed == 0 { - // No change, no work. - return nil, nil - } - devices = sets.NewString() - // Needs to allocate additional devices. - if h.allocatedDevices[resource] == nil { - h.allocatedDevices[resource] = sets.NewString() - } - // Gets Devices in use. - devicesInUse := h.allocatedDevices[resource] - // Gets a list of available devices. - available := h.allDevices[resource].Difference(devicesInUse) - if int(available.Len()) < needed { - return nil, fmt.Errorf("requested number of devices unavailable for %s. Requested: %d, Available: %d", resource, needed, available.Len()) - } - allocated := available.UnsortedList()[:needed] - // Updates h.allocatedDevices with allocated devices to prevent them - // from being allocated to other pods/containers, given that we are - // not holding lock during the rpc call. 
- for _, device := range allocated { - h.allocatedDevices[resource].Insert(device) - devices.Insert(device) - } - return devices, nil -} - -// allocateContainerResources attempts to allocate all of required device -// plugin resources for the input container, issues an Allocate rpc request -// for each new device resource requirement, processes their AllocateResponses, -// and updates the cached containerDevices on success. -func (h *HandlerImpl) allocateContainerResources(pod *v1.Pod, container *v1.Container) error { - podUID := string(pod.UID) - contName := container.Name - allocatedDevicesUpdated := false - for k, v := range container.Resources.Limits { - resource := string(k) - needed := int(v.Value()) - glog.V(3).Infof("needs %d %s", needed, resource) - if _, registeredResource := h.allDevices[resource]; !registeredResource { - continue - } - // Updates allocatedDevices to garbage collect any stranded resources - // before doing the device plugin allocation. - if !allocatedDevicesUpdated { - h.updateAllocatedDevices(h.activePods()) - allocatedDevicesUpdated = true - } - allocDevices, err := h.devicesToAllocate(podUID, contName, resource, needed) - if err != nil { - return err - } - if allocDevices == nil || len(allocDevices) <= 0 { - continue - } - // devicePluginManager.Allocate involves RPC calls to device plugin, which - // could be heavy-weight. Therefore we want to perform this operation outside - // mutex lock. Note if Allcate call fails, we may leave container resources - // partially allocated for the failed container. We rely on updateAllocatedDevices() - // to garbage collect these resources later. Another side effect is that if - // we have X resource A and Y resource B in total, and two containers, container1 - // and container2 both require X resource A and Y resource B. Both allocation - // requests may fail if we serve them in mixed order. - // TODO: may revisit this part later if we see inefficient resource allocation - // in real use as the result of this. Should also consider to parallize device - // plugin Allocate grpc calls if it becomes common that a container may require - // resources from multiple device plugins. - resp, err := h.devicePluginManager.Allocate(resource, allocDevices.UnsortedList()) - if err != nil { - // In case of allocation failure, we want to restore h.allocatedDevices - // to the actual allocated state from h.podDevices. - h.Lock() - h.allocatedDevices = h.podDevices.devices() - h.Unlock() - return err - } - - // Update internal cached podDevices state. - h.Lock() - h.podDevices.insert(podUID, contName, resource, allocDevices, resp) - h.Unlock() - } - - // Checkpoints device to container allocation information. - return h.writeCheckpoint() -} - -// Allocate attempts to allocate all of required device plugin resources, -// and update Allocatable resources in nodeInfo if necessary -func (h *HandlerImpl) Allocate(node *schedulercache.NodeInfo, attrs *lifecycle.PodAdmitAttributes) error { - pod := attrs.Pod - // TODO: Reuse devices between init containers and regular containers. 
- for _, container := range pod.Spec.InitContainers { - if err := h.allocateContainerResources(pod, &container); err != nil { - return err - } - } - for _, container := range pod.Spec.Containers { - if err := h.allocateContainerResources(pod, &container); err != nil { - return err - } - } - - // quick return if no pluginResources requested - if _, podRequireDevicePluginResource := h.podDevices[string(pod.UID)]; !podRequireDevicePluginResource { - return nil - } - - h.sanitizeNodeAllocatable(node) - - return nil -} - -// sanitizeNodeAllocatable scans through allocatedDevices in DevicePluginHandler -// and if necessary, updates allocatableResource in nodeInfo to at least equal to -// the allocated capacity. This allows pods that have already been scheduled on -// the node to pass GeneralPredicates admission checking even upon device plugin failure. -func (h *HandlerImpl) sanitizeNodeAllocatable(node *schedulercache.NodeInfo) { - var newAllocatableResource *schedulercache.Resource - allocatableResource := node.AllocatableResource() - if allocatableResource.ScalarResources == nil { - allocatableResource.ScalarResources = make(map[v1.ResourceName]int64) - } - for resource, devices := range h.allocatedDevices { - needed := devices.Len() - quant, ok := allocatableResource.ScalarResources[v1.ResourceName(resource)] - if ok && int(quant) >= needed { - continue - } - // Needs to update nodeInfo.AllocatableResource to make sure - // NodeInfo.allocatableResource at least equal to the capacity already allocated. - if newAllocatableResource == nil { - newAllocatableResource = allocatableResource.Clone() - } - newAllocatableResource.ScalarResources[v1.ResourceName(resource)] = int64(needed) - } - if newAllocatableResource != nil { - node.SetAllocatableResource(newAllocatableResource) - } -} - -// GetDeviceRunContainerOptions checks whether we have cached containerDevices -// for the passed-in and returns its DeviceRunContainerOptions -// for the found one. An empty struct is returned in case no cached state is found. -func (h *HandlerImpl) GetDeviceRunContainerOptions(pod *v1.Pod, container *v1.Container) *DeviceRunContainerOptions { - h.Lock() - defer h.Unlock() - return h.podDevices.deviceRunContainerOptions(string(pod.UID), container.Name) -} - -// updateAllocatedDevices gets a list of active pods and then frees any Devices that are bound to -// terminated pods. Returns error on failure. -func (h *HandlerImpl) updateAllocatedDevices(activePods []*v1.Pod) { - h.Lock() - defer h.Unlock() - activePodUids := sets.NewString() - for _, pod := range activePods { - activePodUids.Insert(string(pod.UID)) - } - allocatedPodUids := h.podDevices.pods() - podsToBeRemoved := allocatedPodUids.Difference(activePodUids) - if len(podsToBeRemoved) <= 0 { - return - } - glog.V(5).Infof("pods to be removed: %v", podsToBeRemoved.List()) - h.podDevices.delete(podsToBeRemoved.List()) - // Regenerated allocatedDevices after we update pod allocation information. - h.allocatedDevices = h.podDevices.devices() -} - -// Checkpoints device to container allocation information to disk. -func (h *HandlerImpl) writeCheckpoint() error { - h.Lock() - data := h.podDevices.toCheckpointData() - h.Unlock() - - dataJSON, err := json.Marshal(data) - if err != nil { - return err - } - filepath := h.devicePluginManager.CheckpointFile() - return ioutil.WriteFile(filepath, dataJSON, 0644) -} - -// Reads device to container allocation information from disk, and populates -// h.allocatedDevices accordingly. 
-func (h *HandlerImpl) readCheckpoint() error { - filepath := h.devicePluginManager.CheckpointFile() - content, err := ioutil.ReadFile(filepath) - if err != nil && !os.IsNotExist(err) { - return fmt.Errorf("failed to read checkpoint file %q: %v", filepath, err) - } - glog.V(2).Infof("Read checkpoint file %s\n", filepath) - var data checkpointData - if err := json.Unmarshal(content, &data); err != nil { - return fmt.Errorf("failed to unmarshal checkpoint data: %v", err) - } - - h.Lock() - defer h.Unlock() - h.podDevices.fromCheckpointData(data) - h.allocatedDevices = h.podDevices.devices() - return nil -} diff --git a/pkg/kubelet/cm/deviceplugin/device_plugin_handler_test.go b/pkg/kubelet/cm/deviceplugin/device_plugin_handler_test.go deleted file mode 100644 index 252968c3806..00000000000 --- a/pkg/kubelet/cm/deviceplugin/device_plugin_handler_test.go +++ /dev/null @@ -1,414 +0,0 @@ -/* -Copyright 2017 The Kubernetes Authors. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package deviceplugin - -import ( - "flag" - "fmt" - "reflect" - "testing" - - "github.com/stretchr/testify/assert" - - "k8s.io/api/core/v1" - "k8s.io/apimachinery/pkg/api/resource" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/util/sets" - "k8s.io/apimachinery/pkg/util/uuid" - pluginapi "k8s.io/kubernetes/pkg/kubelet/apis/deviceplugin/v1alpha" - "k8s.io/kubernetes/pkg/kubelet/lifecycle" - "k8s.io/kubernetes/plugin/pkg/scheduler/schedulercache" -) - -func TestUpdateCapacity(t *testing.T) { - var expected = v1.ResourceList{} - as := assert.New(t) - verifyCapacityFunc := func(updates v1.ResourceList) { - as.Equal(expected, updates) - } - testHandler, err := NewHandlerImpl(verifyCapacityFunc) - as.NotNil(testHandler) - as.Nil(err) - - devs := []pluginapi.Device{ - {ID: "Device1", Health: pluginapi.Healthy}, - {ID: "Device2", Health: pluginapi.Healthy}, - {ID: "Device3", Health: pluginapi.Unhealthy}, - } - - resourceName := "resource1" - // Adds three devices for resource1, two healthy and one unhealthy. - // Expects capacity for resource1 to be 2. - expected[v1.ResourceName(resourceName)] = *resource.NewQuantity(int64(2), resource.DecimalSI) - testHandler.devicePluginManagerMonitorCallback(resourceName, devs, []pluginapi.Device{}, []pluginapi.Device{}) - // Deletes an unhealthy device should NOT change capacity. - testHandler.devicePluginManagerMonitorCallback(resourceName, []pluginapi.Device{}, []pluginapi.Device{}, []pluginapi.Device{devs[2]}) - // Updates a healthy device to unhealthy should reduce capacity by 1. - expected[v1.ResourceName(resourceName)] = *resource.NewQuantity(int64(1), resource.DecimalSI) - // Deletes a healthy device should reduce capacity by 1. - expected[v1.ResourceName(resourceName)] = *resource.NewQuantity(int64(0), resource.DecimalSI) - // Tests adding another resource. 
- delete(expected, v1.ResourceName(resourceName)) - resourceName2 := "resource2" - expected[v1.ResourceName(resourceName2)] = *resource.NewQuantity(int64(2), resource.DecimalSI) - testHandler.devicePluginManagerMonitorCallback(resourceName2, devs, []pluginapi.Device{}, []pluginapi.Device{}) -} - -type stringPairType struct { - value1 string - value2 string -} - -// DevicePluginManager stub to test device Allocation behavior. -type DevicePluginManagerTestStub struct { - // All data structs are keyed by resourceName+DevId - devRuntimeDevices map[string][]stringPairType - devRuntimeMounts map[string][]stringPairType - devRuntimeEnvs map[string][]stringPairType -} - -func NewDevicePluginManagerTestStub() (*DevicePluginManagerTestStub, error) { - return &DevicePluginManagerTestStub{ - devRuntimeDevices: make(map[string][]stringPairType), - devRuntimeMounts: make(map[string][]stringPairType), - devRuntimeEnvs: make(map[string][]stringPairType), - }, nil -} - -func (m *DevicePluginManagerTestStub) Start() error { - return nil -} - -func (m *DevicePluginManagerTestStub) Devices() map[string][]pluginapi.Device { - return make(map[string][]pluginapi.Device) -} - -func (m *DevicePluginManagerTestStub) Allocate(resourceName string, devIds []string) (*pluginapi.AllocateResponse, error) { - resp := new(pluginapi.AllocateResponse) - resp.Envs = make(map[string]string) - for _, id := range devIds { - key := resourceName + id - fmt.Printf("Alloc device %v for resource %v\n", id, resourceName) - for _, dev := range m.devRuntimeDevices[key] { - fmt.Printf("Add dev %v %v\n", dev.value1, dev.value2) - resp.Devices = append(resp.Devices, &pluginapi.DeviceSpec{ - ContainerPath: dev.value1, - HostPath: dev.value2, - Permissions: "mrw", - }) - } - for _, mount := range m.devRuntimeMounts[key] { - fmt.Printf("Add mount %v %v\n", mount.value1, mount.value2) - resp.Mounts = append(resp.Mounts, &pluginapi.Mount{ - ContainerPath: mount.value1, - HostPath: mount.value2, - ReadOnly: true, - }) - } - for _, env := range m.devRuntimeEnvs[key] { - fmt.Printf("Add env %v %v\n", env.value1, env.value2) - resp.Envs[env.value1] = env.value2 - } - } - return resp, nil -} - -func (m *DevicePluginManagerTestStub) Stop() error { - return nil -} - -func (m *DevicePluginManagerTestStub) CheckpointFile() string { - return "/tmp/device-plugin-checkpoint" -} - -func constructDevices(devices []string) sets.String { - ret := sets.NewString() - for _, dev := range devices { - ret.Insert(dev) - } - return ret -} - -func constructAllocResp(devices, mounts, envs map[string]string) *pluginapi.AllocateResponse { - resp := &pluginapi.AllocateResponse{} - for k, v := range devices { - resp.Devices = append(resp.Devices, &pluginapi.DeviceSpec{ - HostPath: k, - ContainerPath: v, - Permissions: "mrw", - }) - } - for k, v := range mounts { - resp.Mounts = append(resp.Mounts, &pluginapi.Mount{ - ContainerPath: k, - HostPath: v, - ReadOnly: true, - }) - } - resp.Envs = make(map[string]string) - for k, v := range envs { - resp.Envs[k] = v - } - return resp -} - -func TestCheckpoint(t *testing.T) { - resourceName1 := "domain1.com/resource1" - resourceName2 := "domain2.com/resource2" - - m, err := NewDevicePluginManagerTestStub() - as := assert.New(t) - as.Nil(err) - - testHandler := &HandlerImpl{ - devicePluginManager: m, - allDevices: make(map[string]sets.String), - allocatedDevices: make(map[string]sets.String), - podDevices: make(podDevices), - } - - testHandler.podDevices.insert("pod1", "con1", resourceName1, - constructDevices([]string{"dev1", 
"dev2"}), - constructAllocResp(map[string]string{"/dev/r1dev1": "/dev/r1dev1", "/dev/r1dev2": "/dev/r1dev2"}, - map[string]string{"/home/r1lib1": "/usr/r1lib1"}, map[string]string{})) - testHandler.podDevices.insert("pod1", "con1", resourceName2, - constructDevices([]string{"dev1", "dev2"}), - constructAllocResp(map[string]string{"/dev/r2dev1": "/dev/r2dev1", "/dev/r2dev2": "/dev/r2dev2"}, - map[string]string{"/home/r2lib1": "/usr/r2lib1"}, - map[string]string{"r2devices": "dev1 dev2"})) - testHandler.podDevices.insert("pod1", "con2", resourceName1, - constructDevices([]string{"dev3"}), - constructAllocResp(map[string]string{"/dev/r1dev3": "/dev/r1dev3"}, - map[string]string{"/home/r1lib1": "/usr/r1lib1"}, map[string]string{})) - testHandler.podDevices.insert("pod2", "con1", resourceName1, - constructDevices([]string{"dev4"}), - constructAllocResp(map[string]string{"/dev/r1dev4": "/dev/r1dev4"}, - map[string]string{"/home/r1lib1": "/usr/r1lib1"}, map[string]string{})) - - expectedPodDevices := testHandler.podDevices - expectedAllocatedDevices := testHandler.podDevices.devices() - - err = testHandler.writeCheckpoint() - as.Nil(err) - testHandler.podDevices = make(podDevices) - err = testHandler.readCheckpoint() - as.Nil(err) - - as.Equal(len(expectedPodDevices), len(testHandler.podDevices)) - for podUID, containerDevices := range expectedPodDevices { - for conName, resources := range containerDevices { - for resource := range resources { - as.True(reflect.DeepEqual( - expectedPodDevices.containerDevices(podUID, conName, resource), - testHandler.podDevices.containerDevices(podUID, conName, resource))) - opts1 := expectedPodDevices.deviceRunContainerOptions(podUID, conName) - opts2 := testHandler.podDevices.deviceRunContainerOptions(podUID, conName) - as.Equal(len(opts1.Envs), len(opts2.Envs)) - as.Equal(len(opts1.Mounts), len(opts2.Mounts)) - as.Equal(len(opts1.Devices), len(opts2.Devices)) - } - } - } - as.True(reflect.DeepEqual(expectedAllocatedDevices, testHandler.allocatedDevices)) -} - -type activePodsStub struct { - activePods []*v1.Pod -} - -func (a *activePodsStub) getActivePods() []*v1.Pod { - return a.activePods -} - -func (a *activePodsStub) updateActivePods(newPods []*v1.Pod) { - a.activePods = newPods -} - -func TestPodContainerDeviceAllocation(t *testing.T) { - flag.Set("alsologtostderr", fmt.Sprintf("%t", true)) - var logLevel string - flag.StringVar(&logLevel, "logLevel", "4", "test") - flag.Lookup("v").Value.Set(logLevel) - - resourceName1 := "domain1.com/resource1" - resourceQuantity1 := *resource.NewQuantity(int64(2), resource.DecimalSI) - devID1 := "dev1" - devID2 := "dev2" - resourceName2 := "domain2.com/resource2" - resourceQuantity2 := *resource.NewQuantity(int64(1), resource.DecimalSI) - devID3 := "dev3" - devID4 := "dev4" - - m, err := NewDevicePluginManagerTestStub() - as := assert.New(t) - as.Nil(err) - monitorCallback := func(resourceName string, added, updated, deleted []pluginapi.Device) {} - podsStub := activePodsStub{ - activePods: []*v1.Pod{}, - } - cachedNode := &v1.Node{ - Status: v1.NodeStatus{ - Allocatable: v1.ResourceList{}, - }, - } - nodeInfo := &schedulercache.NodeInfo{} - nodeInfo.SetNode(cachedNode) - - testHandler := &HandlerImpl{ - devicePluginManager: m, - devicePluginManagerMonitorCallback: monitorCallback, - allDevices: make(map[string]sets.String), - allocatedDevices: make(map[string]sets.String), - podDevices: make(podDevices), - activePods: podsStub.getActivePods, - } - testHandler.allDevices[resourceName1] = sets.NewString() - 
testHandler.allDevices[resourceName1].Insert(devID1) - testHandler.allDevices[resourceName1].Insert(devID2) - testHandler.allDevices[resourceName2] = sets.NewString() - testHandler.allDevices[resourceName2].Insert(devID3) - testHandler.allDevices[resourceName2].Insert(devID4) - - m.devRuntimeDevices[resourceName1+devID1] = append(m.devRuntimeDevices[resourceName1+devID1], stringPairType{"/dev/aaa", "/dev/aaa"}) - m.devRuntimeDevices[resourceName1+devID1] = append(m.devRuntimeDevices[resourceName1+devID1], stringPairType{"/dev/bbb", "/dev/bbb"}) - m.devRuntimeDevices[resourceName1+devID2] = append(m.devRuntimeDevices[resourceName1+devID2], stringPairType{"/dev/ccc", "/dev/ccc"}) - m.devRuntimeMounts[resourceName1+devID1] = append(m.devRuntimeMounts[resourceName1+devID1], stringPairType{"/container_dir1/file1", "host_dir1/file1"}) - m.devRuntimeMounts[resourceName1+devID2] = append(m.devRuntimeMounts[resourceName1+devID2], stringPairType{"/container_dir1/file2", "host_dir1/file2"}) - m.devRuntimeEnvs[resourceName1+devID2] = append(m.devRuntimeEnvs[resourceName1+devID2], stringPairType{"key1", "val1"}) - m.devRuntimeEnvs[resourceName2+devID3] = append(m.devRuntimeEnvs[resourceName2+devID3], stringPairType{"key2", "val2"}) - m.devRuntimeEnvs[resourceName2+devID4] = append(m.devRuntimeEnvs[resourceName2+devID4], stringPairType{"key2", "val3"}) - - pod := &v1.Pod{ - ObjectMeta: metav1.ObjectMeta{ - UID: uuid.NewUUID(), - }, - Spec: v1.PodSpec{ - Containers: []v1.Container{ - { - Name: string(uuid.NewUUID()), - Resources: v1.ResourceRequirements{ - Limits: v1.ResourceList{ - v1.ResourceName(resourceName1): resourceQuantity1, - v1.ResourceName("cpu"): resourceQuantity1, - v1.ResourceName(resourceName2): resourceQuantity2, - }, - }, - }, - }, - }, - } - - podsStub.updateActivePods([]*v1.Pod{pod}) - err = testHandler.Allocate(nodeInfo, &lifecycle.PodAdmitAttributes{Pod: pod}) - as.Nil(err) - runContainerOpts := testHandler.GetDeviceRunContainerOptions(pod, &pod.Spec.Containers[0]) - as.Equal(len(runContainerOpts.Devices), 3) - as.Equal(len(runContainerOpts.Mounts), 2) - as.Equal(len(runContainerOpts.Envs), 2) - - // Requesting to create a pod without enough resources should fail. - as.Equal(2, testHandler.allocatedDevices[resourceName1].Len()) - failPod := &v1.Pod{ - ObjectMeta: metav1.ObjectMeta{ - UID: uuid.NewUUID(), - }, - Spec: v1.PodSpec{ - Containers: []v1.Container{ - { - Name: string(uuid.NewUUID()), - Resources: v1.ResourceRequirements{ - Limits: v1.ResourceList{ - v1.ResourceName(resourceName1): resourceQuantity2, - }, - }, - }, - }, - }, - } - err = testHandler.Allocate(nodeInfo, &lifecycle.PodAdmitAttributes{Pod: failPod}) - as.NotNil(err) - runContainerOpts2 := testHandler.GetDeviceRunContainerOptions(failPod, &failPod.Spec.Containers[0]) - as.Nil(runContainerOpts2) - - // Requesting to create a new pod with a single resourceName2 should succeed. 
- newPod := &v1.Pod{ - ObjectMeta: metav1.ObjectMeta{ - UID: uuid.NewUUID(), - }, - Spec: v1.PodSpec{ - Containers: []v1.Container{ - { - Name: string(uuid.NewUUID()), - Resources: v1.ResourceRequirements{ - Limits: v1.ResourceList{ - v1.ResourceName(resourceName2): resourceQuantity2, - }, - }, - }, - }, - }, - } - err = testHandler.Allocate(nodeInfo, &lifecycle.PodAdmitAttributes{Pod: newPod}) - as.Nil(err) - runContainerOpts3 := testHandler.GetDeviceRunContainerOptions(newPod, &newPod.Spec.Containers[0]) - as.Equal(1, len(runContainerOpts3.Envs)) -} - -func TestSanitizeNodeAllocatable(t *testing.T) { - resourceName1 := "domain1.com/resource1" - devID1 := "dev1" - - resourceName2 := "domain2.com/resource2" - devID2 := "dev2" - - m, err := NewDevicePluginManagerTestStub() - as := assert.New(t) - as.Nil(err) - monitorCallback := func(resourceName string, added, updated, deleted []pluginapi.Device) {} - - testHandler := &HandlerImpl{ - devicePluginManager: m, - devicePluginManagerMonitorCallback: monitorCallback, - allDevices: make(map[string]sets.String), - allocatedDevices: make(map[string]sets.String), - podDevices: make(podDevices), - } - // require one of resource1 and one of resource2 - testHandler.allocatedDevices[resourceName1] = sets.NewString() - testHandler.allocatedDevices[resourceName1].Insert(devID1) - testHandler.allocatedDevices[resourceName2] = sets.NewString() - testHandler.allocatedDevices[resourceName2].Insert(devID2) - - cachedNode := &v1.Node{ - Status: v1.NodeStatus{ - Allocatable: v1.ResourceList{ - // has no resource1 and two of resource2 - v1.ResourceName(resourceName2): *resource.NewQuantity(int64(2), resource.DecimalSI), - }, - }, - } - nodeInfo := &schedulercache.NodeInfo{} - nodeInfo.SetNode(cachedNode) - - testHandler.sanitizeNodeAllocatable(nodeInfo) - - allocatableScalarResources := nodeInfo.AllocatableResource().ScalarResources - // allocatable in nodeInfo is less than needed, should update - as.Equal(1, int(allocatableScalarResources[v1.ResourceName(resourceName1)])) - // allocatable in nodeInfo is more than needed, should skip updating - as.Equal(2, int(allocatableScalarResources[v1.ResourceName(resourceName2)])) -} diff --git a/pkg/kubelet/cm/deviceplugin/endpoint.go b/pkg/kubelet/cm/deviceplugin/endpoint.go index 44898dc023b..29feaf52852 100644 --- a/pkg/kubelet/cm/deviceplugin/endpoint.go +++ b/pkg/kubelet/cm/deviceplugin/endpoint.go @@ -32,7 +32,15 @@ import ( // endpoint maps to a single registered device plugin. It is responsible // for managing gRPC communications with the device plugin and caching // device states reported by the device plugin. -type endpoint struct { +type endpoint interface { + run() + stop() + allocate(devs []string) (*pluginapi.AllocateResponse, error) + getDevices() []pluginapi.Device + callback(resourceName string, added, updated, deleted []pluginapi.Device) +} + +type endpointImpl struct { client pluginapi.DevicePluginClient clientConn *grpc.ClientConn @@ -42,30 +50,34 @@ type endpoint struct { devices map[string]pluginapi.Device mutex sync.Mutex - callback MonitorCallback + cb monitorCallback } // newEndpoint creates a new endpoint for the given resourceName. 
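Turning endpoint into an interface (with endpointImpl as the gRPC-backed implementation) makes the manager unit-testable without a live device plugin. A hypothetical test double, not part of this change, only needs the five methods declared above:

	// fakeEndpoint is a sketch of a test double for the endpoint interface.
	type fakeEndpoint struct {
		devs []pluginapi.Device
	}

	func (f *fakeEndpoint) run()  {}
	func (f *fakeEndpoint) stop() {}

	func (f *fakeEndpoint) getDevices() []pluginapi.Device { return f.devs }

	func (f *fakeEndpoint) callback(resourceName string, added, updated, deleted []pluginapi.Device) {}

	// allocate pretends every requested device ID can be allocated successfully.
	func (f *fakeEndpoint) allocate(devs []string) (*pluginapi.AllocateResponse, error) {
		return &pluginapi.AllocateResponse{}, nil
	}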
-func newEndpoint(socketPath, resourceName string, devices map[string]pluginapi.Device, callback MonitorCallback) (*endpoint, error) { +func newEndpointImpl(socketPath, resourceName string, devices map[string]pluginapi.Device, callback monitorCallback) (*endpointImpl, error) { client, c, err := dial(socketPath) if err != nil { glog.Errorf("Can't create new endpoint with path %s err %v", socketPath, err) return nil, err } - return &endpoint{ + return &endpointImpl{ client: client, clientConn: c, socketPath: socketPath, resourceName: resourceName, - devices: devices, - callback: callback, + devices: devices, + cb: callback, }, nil } -func (e *endpoint) getDevices() []pluginapi.Device { +func (e *endpointImpl) callback(resourceName string, added, updated, deleted []pluginapi.Device) { + e.cb(resourceName, added, updated, deleted) +} + +func (e *endpointImpl) getDevices() []pluginapi.Device { e.mutex.Lock() defer e.mutex.Unlock() var devs []pluginapi.Device @@ -81,11 +93,9 @@ func (e *endpoint) getDevices() []pluginapi.Device { // blocks on receiving ListAndWatch gRPC stream updates. Each ListAndWatch // stream update contains a new list of device states. listAndWatch compares the new // device states with its cached states to get list of new, updated, and deleted devices. -// It then issues a callback to pass this information to the device_plugin_handler which +// It then issues a callback to pass this information to the device manager which // will adjust the resource available information accordingly. -func (e *endpoint) run() { - glog.V(3).Infof("Starting ListAndWatch") - +func (e *endpointImpl) run() { stream, err := e.client.ListAndWatch(context.Background(), &pluginapi.Empty{}) if err != nil { glog.Errorf(errListAndWatch, e.resourceName, err) @@ -162,13 +172,13 @@ func (e *endpoint) run() { } // allocate issues Allocate gRPC call to the device plugin. 
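The comparison that run performs on each ListAndWatch update reduces to a set difference over device IDs against the cached map. A rough sketch of that bookkeeping (illustrative only; the real listAndWatch also checks whether the Health field actually changed before reporting a device as updated):

	// diffDevices splits a ListAndWatch update into added, updated and deleted
	// devices relative to the endpoint's cached state.
	func diffDevices(cached map[string]pluginapi.Device, latest []*pluginapi.Device) (added, updated, deleted []pluginapi.Device) {
		seen := make(map[string]bool, len(latest))
		for _, d := range latest {
			seen[d.ID] = true
			if _, ok := cached[d.ID]; !ok {
				added = append(added, *d)
			} else {
				updated = append(updated, *d)
			}
		}
		for id, d := range cached {
			if !seen[id] {
				deleted = append(deleted, d)
			}
		}
		return added, updated, deleted
	}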
-func (e *endpoint) allocate(devs []string) (*pluginapi.AllocateResponse, error) { +func (e *endpointImpl) allocate(devs []string) (*pluginapi.AllocateResponse, error) { return e.client.Allocate(context.Background(), &pluginapi.AllocateRequest{ DevicesIDs: devs, }) } -func (e *endpoint) stop() { +func (e *endpointImpl) stop() { e.clientConn.Close() } diff --git a/pkg/kubelet/cm/deviceplugin/endpoint_test.go b/pkg/kubelet/cm/deviceplugin/endpoint_test.go index cb27c89f319..226148a6b06 100644 --- a/pkg/kubelet/cm/deviceplugin/endpoint_test.go +++ b/pkg/kubelet/cm/deviceplugin/endpoint_test.go @@ -87,7 +87,7 @@ func TestRun(t *testing.T) { } func TestGetDevices(t *testing.T) { - e := endpoint{ + e := endpointImpl{ devices: map[string]pluginapi.Device{ "ADeviceId": {ID: "ADeviceId", Health: pluginapi.Healthy}, }, @@ -96,19 +96,19 @@ func TestGetDevices(t *testing.T) { require.Len(t, devs, 1) } -func esetup(t *testing.T, devs []*pluginapi.Device, socket, resourceName string, callback MonitorCallback) (*Stub, *endpoint) { +func esetup(t *testing.T, devs []*pluginapi.Device, socket, resourceName string, callback monitorCallback) (*Stub, *endpointImpl) { p := NewDevicePluginStub(devs, socket) err := p.Start() require.NoError(t, err) - e, err := newEndpoint(socket, "mock", make(map[string]pluginapi.Device), func(n string, a, u, r []pluginapi.Device) {}) + e, err := newEndpointImpl(socket, "mock", make(map[string]pluginapi.Device), func(n string, a, u, r []pluginapi.Device) {}) require.NoError(t, err) return p, e } -func ecleanup(t *testing.T, p *Stub, e *endpoint) { +func ecleanup(t *testing.T, p *Stub, e *endpointImpl) { p.Stop() e.stop() } diff --git a/pkg/kubelet/cm/deviceplugin/manager.go b/pkg/kubelet/cm/deviceplugin/manager.go index 2b2c0a333ff..be46973ef74 100644 --- a/pkg/kubelet/cm/deviceplugin/manager.go +++ b/pkg/kubelet/cm/deviceplugin/manager.go @@ -17,7 +17,9 @@ limitations under the License. package deviceplugin import ( + "encoding/json" "fmt" + "io/ioutil" "net" "os" "path/filepath" @@ -28,27 +30,58 @@ import ( "google.golang.org/grpc" "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/resource" + "k8s.io/apimachinery/pkg/util/sets" v1helper "k8s.io/kubernetes/pkg/apis/core/v1/helper" pluginapi "k8s.io/kubernetes/pkg/kubelet/apis/deviceplugin/v1alpha" + "k8s.io/kubernetes/pkg/kubelet/lifecycle" + "k8s.io/kubernetes/plugin/pkg/scheduler/schedulercache" ) +// ActivePodsFunc is a function that returns a list of pods to reconcile. +type ActivePodsFunc func() []*v1.Pod + +// monitorCallback is the function called when a device's health state changes, +// or new devices are reported, or old devices are deleted. +// Updated contains the most recent state of the Device. +type monitorCallback func(resourceName string, added, updated, deleted []pluginapi.Device) + // ManagerImpl is the structure in charge of managing Device Plugins. type ManagerImpl struct { socketname string socketdir string - endpoints map[string]*endpoint // Key is ResourceName + endpoints map[string]endpoint // Key is ResourceName mutex sync.Mutex - callback MonitorCallback - server *grpc.Server + + // activePods is a method for listing active pods on the node + // so the amount of pluginResources requested by existing pods + // could be counted when updating allocated devices + activePods ActivePodsFunc + + // callback is used for updating devices' states in one time call. + // e.g. a new device is advertised, two old devices are deleted and a running device fails. 
+ callback monitorCallback + + // allDevices contains all of registered resourceNames and their exported device IDs. + allDevices map[string]sets.String + + // allocatedDevices contains allocated deviceIds, keyed by resourceName. + allocatedDevices map[string]sets.String + + // podDevices contains pod to allocated device mapping. + podDevices podDevices } -// NewManagerImpl creates a new manager on the socket `socketPath`. -// f is the callback that is called when a device becomes unhealthy. -// socketPath is present for testing purposes in production this is pluginapi.KubeletSocket -func NewManagerImpl(socketPath string, f MonitorCallback) (*ManagerImpl, error) { +// NewManagerImpl creates a new manager. updateCapacityFunc is called to +// update ContainerManager capacity when device capacity changes. +func NewManagerImpl(updateCapacityFunc func(v1.ResourceList)) (*ManagerImpl, error) { + return newManagerImpl(updateCapacityFunc, pluginapi.KubeletSocket) +} + +func newManagerImpl(updateCapacityFunc func(v1.ResourceList), socketPath string) (*ManagerImpl, error) { glog.V(2).Infof("Creating Device Plugin manager at %s", socketPath) if socketPath == "" || !filepath.IsAbs(socketPath) { @@ -56,13 +89,42 @@ func NewManagerImpl(socketPath string, f MonitorCallback) (*ManagerImpl, error) } dir, file := filepath.Split(socketPath) - return &ManagerImpl{ - endpoints: make(map[string]*endpoint), + manager := &ManagerImpl{ + endpoints: make(map[string]endpoint), + socketname: file, + socketdir: dir, + allDevices: make(map[string]sets.String), + allocatedDevices: make(map[string]sets.String), + podDevices: make(podDevices), + } - socketname: file, - socketdir: dir, - callback: f, - }, nil + manager.callback = func(resourceName string, added, updated, deleted []pluginapi.Device) { + var capacity = v1.ResourceList{} + kept := append(updated, added...) + + manager.mutex.Lock() + defer manager.mutex.Unlock() + + if _, ok := manager.allDevices[resourceName]; !ok { + manager.allDevices[resourceName] = sets.NewString() + } + // For now, Manager only keeps track of healthy devices. + // We can revisit this later when the need comes to track unhealthy devices here. + for _, dev := range kept { + if dev.Health == pluginapi.Healthy { + manager.allDevices[resourceName].Insert(dev.ID) + } else { + manager.allDevices[resourceName].Delete(dev.ID) + } + } + for _, dev := range deleted { + manager.allDevices[resourceName].Delete(dev.ID) + } + capacity[v1.ResourceName(resourceName)] = *resource.NewQuantity(int64(manager.allDevices[resourceName].Len()), resource.DecimalSI) + updateCapacityFunc(capacity) + } + + return manager, nil } func (m *ManagerImpl) removeContents(dir string) error { @@ -77,7 +139,7 @@ func (m *ManagerImpl) removeContents(dir string) error { } for _, name := range names { filePath := filepath.Join(dir, name) - if filePath == m.CheckpointFile() { + if filePath == m.checkpointFile() { continue } stat, err := os.Stat(filePath) @@ -101,15 +163,25 @@ const ( kubeletDevicePluginCheckpoint = "kubelet_internal_checkpoint" ) -// CheckpointFile returns device plugin checkpoint file path. -func (m *ManagerImpl) CheckpointFile() string { +// checkpointFile returns device plugin checkpoint file path. 
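To make the callback defined in newManagerImpl concrete: only healthy devices count toward capacity, so a plugin reporting two healthy and one unhealthy device yields a quantity of 2 for its resource. The resource name below is invented; TestUpdateCapacity further down exercises the same behaviour:

	devs := []pluginapi.Device{
		{ID: "Device1", Health: pluginapi.Healthy},
		{ID: "Device2", Health: pluginapi.Healthy},
		{ID: "Device3", Health: pluginapi.Unhealthy},
	}
	// Reporting all three as "added" invokes updateCapacityFunc with
	//   v1.ResourceList{"vendor.example/gpu": *resource.NewQuantity(2, resource.DecimalSI)}
	m.callback("vendor.example/gpu", devs, []pluginapi.Device{}, []pluginapi.Device{})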
+func (m *ManagerImpl) checkpointFile() string {
 	return filepath.Join(m.socketdir, kubeletDevicePluginCheckpoint)
 }
 
-// Start starts the Device Plugin Manager
-func (m *ManagerImpl) Start() error {
+// Start starts the Device Plugin Manager and initializes
+// podDevices and allocatedDevices information from checkpointed state. It
+// then starts the device plugin registration service.
+func (m *ManagerImpl) Start(activePods ActivePodsFunc) error {
 	glog.V(2).Infof("Starting Device Plugin manager")
 
+	m.activePods = activePods
+
+	// Loads in allocatedDevices information from disk.
+	err := m.readCheckpoint()
+	if err != nil {
+		glog.Warningf("Continue after failing to read checkpoint file. Device allocation info may NOT be up-to-date. Err: %v", err)
+	}
+
 	socketPath := filepath.Join(m.socketdir, m.socketname)
 	os.MkdirAll(m.socketdir, 0755)
 
@@ -130,6 +202,8 @@ func (m *ManagerImpl) Start() error {
 	pluginapi.RegisterRegistrationServer(m.server, m)
 	go m.server.Serve(s)
 
+	glog.V(2).Infof("Serving device plugin registration server on %q", socketPath)
+
 	return nil
 }
 
@@ -150,22 +224,27 @@ func (m *ManagerImpl) Devices() map[string][]pluginapi.Device {
 
 // Allocate is the call that you can use to allocate a set of devices
 // from the registered device plugins.
-func (m *ManagerImpl) Allocate(resourceName string, devs []string) (*pluginapi.AllocateResponse, error) {
-
-	if len(devs) == 0 {
-		return nil, nil
+func (m *ManagerImpl) Allocate(node *schedulercache.NodeInfo, attrs *lifecycle.PodAdmitAttributes) error {
+	pod := attrs.Pod
+	// TODO: Reuse devices between init containers and regular containers.
+	for _, container := range pod.Spec.InitContainers {
+		if err := m.allocateContainerResources(pod, &container); err != nil {
+			return err
+		}
+	}
+	for _, container := range pod.Spec.Containers {
+		if err := m.allocateContainerResources(pod, &container); err != nil {
+			return err
+		}
 	}
 
-	glog.V(3).Infof("Recieved allocation request for devices %v for device plugin %s",
-		devs, resourceName)
-	m.mutex.Lock()
-	e, ok := m.endpoints[resourceName]
-	m.mutex.Unlock()
-	if !ok {
-		return nil, fmt.Errorf("Unknown Device Plugin %s", resourceName)
+	// quick return if no pluginResources requested
+	if _, podRequireDevicePluginResource := m.podDevices[string(pod.UID)]; !podRequireDevicePluginResource {
+		return nil
 	}
 
-	return e.allocate(devs)
+	m.sanitizeNodeAllocatable(node)
+	return nil
 }
 
 // Register registers a device plugin.
@@ -211,12 +290,16 @@ func (m *ManagerImpl) addEndpoint(r *pluginapi.RegisterRequest) {
 	if ok && old != nil {
 		// Pass devices of previous endpoint into re-registered one,
 		// to avoid potential orphaned devices upon re-registration
-		existingDevs = old.devices
+		devices := make(map[string]pluginapi.Device)
+		for _, device := range old.getDevices() {
+			devices[device.ID] = device
+		}
+		existingDevs = devices
 	}
 	m.mutex.Unlock()
 
 	socketPath := filepath.Join(m.socketdir, r.Endpoint)
-	e, err := newEndpoint(socketPath, r.ResourceName, existingDevs, m.callback)
+	e, err := newEndpointImpl(socketPath, r.ResourceName, existingDevs, m.callback)
 	if err != nil {
 		glog.Errorf("Failed to dial device plugin with request %v: %v", r, err)
 		return
@@ -259,3 +342,212 @@ func (m *ManagerImpl) addEndpoint(r *pluginapi.RegisterRequest) {
 	m.mutex.Unlock()
 	}()
 }
+
+// Checkpoints device to container allocation information to disk.
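Register above is the kubelet side of the handshake; the device plugin is the gRPC client. A hedged sketch of what a plugin's registration call could look like against the v1alpha API (socket and resource names are invented, pluginapi.Version is assumed to be the API's version constant, and the usual grpc, context, net and time imports are assumed):

	func registerWithKubelet(pluginSocketName, resourceName string) error {
		conn, err := grpc.Dial(pluginapi.KubeletSocket, grpc.WithInsecure(),
			grpc.WithDialer(func(addr string, timeout time.Duration) (net.Conn, error) {
				return net.DialTimeout("unix", addr, timeout)
			}))
		if err != nil {
			return err
		}
		defer conn.Close()

		client := pluginapi.NewRegistrationClient(conn)
		// Endpoint is the plugin's own socket file name inside the device plugin
		// directory; ResourceName is the extended resource it advertises.
		_, err = client.Register(context.Background(), &pluginapi.RegisterRequest{
			Version:      pluginapi.Version,
			Endpoint:     pluginSocketName,
			ResourceName: resourceName,
		})
		return err
	}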
+func (m *ManagerImpl) writeCheckpoint() error { + m.mutex.Lock() + data := m.podDevices.toCheckpointData() + m.mutex.Unlock() + + dataJSON, err := json.Marshal(data) + if err != nil { + return err + } + filepath := m.checkpointFile() + return ioutil.WriteFile(filepath, dataJSON, 0644) +} + +// Reads device to container allocation information from disk, and populates +// m.allocatedDevices accordingly. +func (m *ManagerImpl) readCheckpoint() error { + filepath := m.checkpointFile() + content, err := ioutil.ReadFile(filepath) + if err != nil && !os.IsNotExist(err) { + return fmt.Errorf("failed to read checkpoint file %q: %v", filepath, err) + } + glog.V(2).Infof("Read checkpoint file %s\n", filepath) + var data checkpointData + if err := json.Unmarshal(content, &data); err != nil { + return fmt.Errorf("failed to unmarshal checkpoint data: %v", err) + } + + m.mutex.Lock() + defer m.mutex.Unlock() + m.podDevices.fromCheckpointData(data) + m.allocatedDevices = m.podDevices.devices() + return nil +} + +// updateAllocatedDevices gets a list of active pods and then frees any Devices that are bound to +// terminated pods. Returns error on failure. +func (m *ManagerImpl) updateAllocatedDevices(activePods []*v1.Pod) { + m.mutex.Lock() + defer m.mutex.Unlock() + activePodUids := sets.NewString() + for _, pod := range activePods { + activePodUids.Insert(string(pod.UID)) + } + allocatedPodUids := m.podDevices.pods() + podsToBeRemoved := allocatedPodUids.Difference(activePodUids) + if len(podsToBeRemoved) <= 0 { + return + } + glog.V(5).Infof("pods to be removed: %v", podsToBeRemoved.List()) + m.podDevices.delete(podsToBeRemoved.List()) + // Regenerated allocatedDevices after we update pod allocation information. + m.allocatedDevices = m.podDevices.devices() +} + +// Returns list of device Ids we need to allocate with Allocate rpc call. +// Returns empty list in case we don't need to issue the Allocate rpc call. +func (m *ManagerImpl) devicesToAllocate(podUID, contName, resource string, required int) (sets.String, error) { + m.mutex.Lock() + defer m.mutex.Unlock() + needed := required + // Gets list of devices that have already been allocated. + // This can happen if a container restarts for example. + devices := m.podDevices.containerDevices(podUID, contName, resource) + if devices != nil { + glog.V(3).Infof("Found pre-allocated devices for resource %s container %q in Pod %q: %v", resource, contName, podUID, devices.List()) + needed = needed - devices.Len() + // A pod's resource is not expected to change once admitted by the API server, + // so just fail loudly here. We can revisit this part if this no longer holds. + if needed != 0 { + return nil, fmt.Errorf("pod %v container %v changed request for resource %v from %v to %v", podUID, contName, resource, devices.Len(), required) + } + } + if needed == 0 { + // No change, no work. + return nil, nil + } + devices = sets.NewString() + // Needs to allocate additional devices. + if m.allocatedDevices[resource] == nil { + m.allocatedDevices[resource] = sets.NewString() + } + // Gets Devices in use. + devicesInUse := m.allocatedDevices[resource] + // Gets a list of available devices. + available := m.allDevices[resource].Difference(devicesInUse) + if int(available.Len()) < needed { + return nil, fmt.Errorf("requested number of devices unavailable for %s. 
Requested: %d, Available: %d", resource, needed, available.Len())
+	}
+	allocated := available.UnsortedList()[:needed]
+	// Updates m.allocatedDevices with allocated devices to prevent them
+	// from being allocated to other pods/containers, given that we are
+	// not holding the lock during the rpc call.
+	for _, device := range allocated {
+		m.allocatedDevices[resource].Insert(device)
+		devices.Insert(device)
+	}
+	return devices, nil
+}
+
+// allocateContainerResources attempts to allocate all of the required device
+// plugin resources for the input container, issues an Allocate rpc request
+// for each new device resource requirement, processes their AllocateResponses,
+// and updates the cached containerDevices on success.
+func (m *ManagerImpl) allocateContainerResources(pod *v1.Pod, container *v1.Container) error {
+	podUID := string(pod.UID)
+	contName := container.Name
+	allocatedDevicesUpdated := false
+	for k, v := range container.Resources.Limits {
+		resource := string(k)
+		needed := int(v.Value())
+		glog.V(3).Infof("needs %d %s", needed, resource)
+		if _, registeredResource := m.allDevices[resource]; !registeredResource {
+			continue
+		}
+		// Updates allocatedDevices to garbage collect any stranded resources
+		// before doing the device plugin allocation.
+		if !allocatedDevicesUpdated {
+			m.updateAllocatedDevices(m.activePods())
+			allocatedDevicesUpdated = true
+		}
+		allocDevices, err := m.devicesToAllocate(podUID, contName, resource, needed)
+		if err != nil {
+			return err
+		}
+		if allocDevices == nil || len(allocDevices) <= 0 {
+			continue
+		}
+		// The Allocate rpc call to the device plugin can be heavy-weight, so we
+		// perform it outside the mutex lock. Note that if the Allocate call fails,
+		// we may leave container resources partially allocated for the failed
+		// container. We rely on updateAllocatedDevices() to garbage collect these
+		// resources later. Another side effect is that if we have X of resource A
+		// and Y of resource B in total, and two containers each require X of
+		// resource A and Y of resource B, both allocation requests may fail if we
+		// serve them in mixed order.
+		// TODO: may revisit this part later if we see inefficient resource
+		// allocation in real use as a result of this. We should also consider
+		// parallelizing the device plugin Allocate grpc calls if it becomes common
+		// that a container requires resources from multiple device plugins.
+		m.mutex.Lock()
+		e, ok := m.endpoints[resource]
+		m.mutex.Unlock()
+		if !ok {
+			m.mutex.Lock()
+			m.allocatedDevices = m.podDevices.devices()
+			m.mutex.Unlock()
+			return fmt.Errorf("Unknown Device Plugin %s", resource)
+		}
+
+		devs := allocDevices.UnsortedList()
+		glog.V(3).Infof("Making allocation request for devices %v for device plugin %s", devs, resource)
+		resp, err := e.allocate(devs)
+		if err != nil {
+			// In case of allocation failure, we want to restore m.allocatedDevices
+			// to the actual allocated state from m.podDevices.
+			m.mutex.Lock()
+			m.allocatedDevices = m.podDevices.devices()
+			m.mutex.Unlock()
+			return err
+		}
+
+		// Update internal cached podDevices state.
+		m.mutex.Lock()
+		m.podDevices.insert(podUID, contName, resource, allocDevices, resp)
+		m.mutex.Unlock()
+	}
+
+	// Checkpoints device to container allocation information.
+	return m.writeCheckpoint()
+}
+
+// GetDeviceRunContainerOptions checks whether we have cached containerDevices
+// for the passed-in pod and container, and returns its DeviceRunContainerOptions
+// for the found one.
An empty struct is returned in case no cached state is found. +func (m *ManagerImpl) GetDeviceRunContainerOptions(pod *v1.Pod, container *v1.Container) *DeviceRunContainerOptions { + m.mutex.Lock() + defer m.mutex.Unlock() + return m.podDevices.deviceRunContainerOptions(string(pod.UID), container.Name) +} + +// sanitizeNodeAllocatable scans through allocatedDevices in the device manager +// and if necessary, updates allocatableResource in nodeInfo to at least equal to +// the allocated capacity. This allows pods that have already been scheduled on +// the node to pass GeneralPredicates admission checking even upon device plugin failure. +func (m *ManagerImpl) sanitizeNodeAllocatable(node *schedulercache.NodeInfo) { + var newAllocatableResource *schedulercache.Resource + allocatableResource := node.AllocatableResource() + if allocatableResource.ScalarResources == nil { + allocatableResource.ScalarResources = make(map[v1.ResourceName]int64) + } + for resource, devices := range m.allocatedDevices { + needed := devices.Len() + quant, ok := allocatableResource.ScalarResources[v1.ResourceName(resource)] + if ok && int(quant) >= needed { + continue + } + // Needs to update nodeInfo.AllocatableResource to make sure + // NodeInfo.allocatableResource at least equal to the capacity already allocated. + if newAllocatableResource == nil { + newAllocatableResource = allocatableResource.Clone() + } + newAllocatableResource.ScalarResources[v1.ResourceName(resource)] = int64(needed) + } + if newAllocatableResource != nil { + node.SetAllocatableResource(newAllocatableResource) + } +} diff --git a/pkg/kubelet/cm/deviceplugin/device_plugin_handler_stub.go b/pkg/kubelet/cm/deviceplugin/manager_stub.go similarity index 67% rename from pkg/kubelet/cm/deviceplugin/device_plugin_handler_stub.go rename to pkg/kubelet/cm/deviceplugin/manager_stub.go index eb723090467..450164b2ee8 100644 --- a/pkg/kubelet/cm/deviceplugin/device_plugin_handler_stub.go +++ b/pkg/kubelet/cm/deviceplugin/manager_stub.go @@ -23,30 +23,35 @@ import ( "k8s.io/kubernetes/plugin/pkg/scheduler/schedulercache" ) -// HandlerStub provides a simple stub implementation for Handler. -type HandlerStub struct{} +// ManagerStub provides a simple stub implementation for the Device Manager. +type ManagerStub struct{} -// NewHandlerStub creates a HandlerStub. -func NewHandlerStub() (*HandlerStub, error) { - return &HandlerStub{}, nil +// NewManagerStub creates a ManagerStub. +func NewManagerStub() (*ManagerStub, error) { + return &ManagerStub{}, nil } // Start simply returns nil. -func (h *HandlerStub) Start(activePods ActivePodsFunc) error { +func (h *ManagerStub) Start(activePods ActivePodsFunc) error { + return nil +} + +// Stop simply returns nil. +func (h *ManagerStub) Stop() error { return nil } // Devices returns an empty map. -func (h *HandlerStub) Devices() map[string][]pluginapi.Device { +func (h *ManagerStub) Devices() map[string][]pluginapi.Device { return make(map[string][]pluginapi.Device) } // Allocate simply returns nil. -func (h *HandlerStub) Allocate(node *schedulercache.NodeInfo, attrs *lifecycle.PodAdmitAttributes) error { +func (h *ManagerStub) Allocate(node *schedulercache.NodeInfo, attrs *lifecycle.PodAdmitAttributes) error { return nil } // GetDeviceRunContainerOptions simply returns nil. 
-func (h *HandlerStub) GetDeviceRunContainerOptions(pod *v1.Pod, container *v1.Container) *DeviceRunContainerOptions { +func (h *ManagerStub) GetDeviceRunContainerOptions(pod *v1.Pod, container *v1.Container) *DeviceRunContainerOptions { return nil } diff --git a/pkg/kubelet/cm/deviceplugin/manager_test.go b/pkg/kubelet/cm/deviceplugin/manager_test.go index 0d2178f92c3..20fb4275739 100644 --- a/pkg/kubelet/cm/deviceplugin/manager_test.go +++ b/pkg/kubelet/cm/deviceplugin/manager_test.go @@ -17,13 +17,23 @@ limitations under the License. package deviceplugin import ( + "flag" + "fmt" + "reflect" "sync/atomic" "testing" "time" + "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" - + "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/resource" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/util/sets" + "k8s.io/apimachinery/pkg/util/uuid" pluginapi "k8s.io/kubernetes/pkg/kubelet/apis/deviceplugin/v1alpha" + "k8s.io/kubernetes/pkg/kubelet/lifecycle" + "k8s.io/kubernetes/plugin/pkg/scheduler/schedulercache" ) const ( @@ -33,10 +43,8 @@ const ( ) func TestNewManagerImpl(t *testing.T) { - _, err := NewManagerImpl("", func(n string, a, u, r []pluginapi.Device) {}) - require.Error(t, err) - - _, err = NewManagerImpl(socketName, func(n string, a, u, r []pluginapi.Device) {}) + verifyCapacityFunc := func(updates v1.ResourceList) {} + _, err := newManagerImpl(verifyCapacityFunc, socketName) require.NoError(t, err) } @@ -72,6 +80,7 @@ func TestDevicePluginReRegistration(t *testing.T) { m, p1 := setup(t, devs, callback) p1.Register(socketName, testResourceName) // Wait for the first callback to be issued. + <-callbackChan // Wait till the endpoint is added to the manager. for i := 0; i < 20; i++ { @@ -113,10 +122,17 @@ func TestDevicePluginReRegistration(t *testing.T) { } -func setup(t *testing.T, devs []*pluginapi.Device, callback MonitorCallback) (Manager, *Stub) { - m, err := NewManagerImpl(socketName, callback) +func setup(t *testing.T, devs []*pluginapi.Device, callback monitorCallback) (Manager, *Stub) { + updateCapacity := func(v1.ResourceList) {} + m, err := newManagerImpl(updateCapacity, socketName) require.NoError(t, err) - err = m.Start() + + m.callback = callback + + activePods := func() []*v1.Pod { + return []*v1.Pod{} + } + err = m.Start(activePods) require.NoError(t, err) p := NewDevicePluginStub(devs, pluginSocketName) @@ -130,3 +146,387 @@ func cleanup(t *testing.T, m Manager, p *Stub) { p.Stop() m.Stop() } + +func TestUpdateCapacity(t *testing.T) { + var expected = v1.ResourceList{} + as := assert.New(t) + verifyCapacityFunc := func(updates v1.ResourceList) { + as.Equal(expected, updates) + } + testManager, err := newManagerImpl(verifyCapacityFunc, socketName) + as.NotNil(testManager) + as.Nil(err) + + devs := []pluginapi.Device{ + {ID: "Device1", Health: pluginapi.Healthy}, + {ID: "Device2", Health: pluginapi.Healthy}, + {ID: "Device3", Health: pluginapi.Unhealthy}, + } + + resourceName := "resource1" + // Adds three devices for resource1, two healthy and one unhealthy. + // Expects capacity for resource1 to be 2. + expected[v1.ResourceName(resourceName)] = *resource.NewQuantity(int64(2), resource.DecimalSI) + testManager.callback(resourceName, devs, []pluginapi.Device{}, []pluginapi.Device{}) + // Deletes an unhealthy device should NOT change capacity. + testManager.callback(resourceName, []pluginapi.Device{}, []pluginapi.Device{}, []pluginapi.Device{devs[2]}) + // Updates a healthy device to unhealthy should reduce capacity by 1. 
+ expected[v1.ResourceName(resourceName)] = *resource.NewQuantity(int64(1), resource.DecimalSI) + // Deletes a healthy device should reduce capacity by 1. + expected[v1.ResourceName(resourceName)] = *resource.NewQuantity(int64(0), resource.DecimalSI) + // Tests adding another resource. + delete(expected, v1.ResourceName(resourceName)) + resourceName2 := "resource2" + expected[v1.ResourceName(resourceName2)] = *resource.NewQuantity(int64(2), resource.DecimalSI) + testManager.callback(resourceName2, devs, []pluginapi.Device{}, []pluginapi.Device{}) +} + +type stringPairType struct { + value1 string + value2 string +} + +func constructDevices(devices []string) sets.String { + ret := sets.NewString() + for _, dev := range devices { + ret.Insert(dev) + } + return ret +} + +func constructAllocResp(devices, mounts, envs map[string]string) *pluginapi.AllocateResponse { + resp := &pluginapi.AllocateResponse{} + for k, v := range devices { + resp.Devices = append(resp.Devices, &pluginapi.DeviceSpec{ + HostPath: k, + ContainerPath: v, + Permissions: "mrw", + }) + } + for k, v := range mounts { + resp.Mounts = append(resp.Mounts, &pluginapi.Mount{ + ContainerPath: k, + HostPath: v, + ReadOnly: true, + }) + } + resp.Envs = make(map[string]string) + for k, v := range envs { + resp.Envs[k] = v + } + return resp +} + +func TestCheckpoint(t *testing.T) { + resourceName1 := "domain1.com/resource1" + resourceName2 := "domain2.com/resource2" + + testManager := &ManagerImpl{ + allDevices: make(map[string]sets.String), + allocatedDevices: make(map[string]sets.String), + podDevices: make(podDevices), + } + + testManager.podDevices.insert("pod1", "con1", resourceName1, + constructDevices([]string{"dev1", "dev2"}), + constructAllocResp(map[string]string{"/dev/r1dev1": "/dev/r1dev1", "/dev/r1dev2": "/dev/r1dev2"}, + map[string]string{"/home/r1lib1": "/usr/r1lib1"}, map[string]string{})) + testManager.podDevices.insert("pod1", "con1", resourceName2, + constructDevices([]string{"dev1", "dev2"}), + constructAllocResp(map[string]string{"/dev/r2dev1": "/dev/r2dev1", "/dev/r2dev2": "/dev/r2dev2"}, + map[string]string{"/home/r2lib1": "/usr/r2lib1"}, + map[string]string{"r2devices": "dev1 dev2"})) + testManager.podDevices.insert("pod1", "con2", resourceName1, + constructDevices([]string{"dev3"}), + constructAllocResp(map[string]string{"/dev/r1dev3": "/dev/r1dev3"}, + map[string]string{"/home/r1lib1": "/usr/r1lib1"}, map[string]string{})) + testManager.podDevices.insert("pod2", "con1", resourceName1, + constructDevices([]string{"dev4"}), + constructAllocResp(map[string]string{"/dev/r1dev4": "/dev/r1dev4"}, + map[string]string{"/home/r1lib1": "/usr/r1lib1"}, map[string]string{})) + + expectedPodDevices := testManager.podDevices + expectedAllocatedDevices := testManager.podDevices.devices() + + err := testManager.writeCheckpoint() + as := assert.New(t) + + as.Nil(err) + testManager.podDevices = make(podDevices) + err = testManager.readCheckpoint() + as.Nil(err) + + as.Equal(len(expectedPodDevices), len(testManager.podDevices)) + for podUID, containerDevices := range expectedPodDevices { + for conName, resources := range containerDevices { + for resource := range resources { + as.True(reflect.DeepEqual( + expectedPodDevices.containerDevices(podUID, conName, resource), + testManager.podDevices.containerDevices(podUID, conName, resource))) + opts1 := expectedPodDevices.deviceRunContainerOptions(podUID, conName) + opts2 := testManager.podDevices.deviceRunContainerOptions(podUID, conName) + as.Equal(len(opts1.Envs), 
len(opts2.Envs)) + as.Equal(len(opts1.Mounts), len(opts2.Mounts)) + as.Equal(len(opts1.Devices), len(opts2.Devices)) + } + } + } + as.True(reflect.DeepEqual(expectedAllocatedDevices, testManager.allocatedDevices)) +} + +type activePodsStub struct { + activePods []*v1.Pod +} + +func (a *activePodsStub) getActivePods() []*v1.Pod { + return a.activePods +} + +func (a *activePodsStub) updateActivePods(newPods []*v1.Pod) { + a.activePods = newPods +} + +type MockEndpoint struct { + allocateFunc func(devs []string) (*pluginapi.AllocateResponse, error) +} + +func (m *MockEndpoint) stop() {} +func (m *MockEndpoint) run() {} + +func (m *MockEndpoint) getDevices() []pluginapi.Device { + return []pluginapi.Device{} +} + +func (m *MockEndpoint) callback(resourceName string, added, updated, deleted []pluginapi.Device) {} + +func (m *MockEndpoint) allocate(devs []string) (*pluginapi.AllocateResponse, error) { + if m.allocateFunc != nil { + return m.allocateFunc(devs) + } + return nil, nil +} + +func TestPodContainerDeviceAllocation(t *testing.T) { + flag.Set("alsologtostderr", fmt.Sprintf("%t", true)) + var logLevel string + flag.StringVar(&logLevel, "logLevel", "4", "test") + flag.Lookup("v").Value.Set(logLevel) + + resourceName1 := "domain1.com/resource1" + resourceQuantity1 := *resource.NewQuantity(int64(2), resource.DecimalSI) + devID1 := "dev1" + devID2 := "dev2" + resourceName2 := "domain2.com/resource2" + resourceQuantity2 := *resource.NewQuantity(int64(1), resource.DecimalSI) + devID3 := "dev3" + devID4 := "dev4" + + as := require.New(t) + monitorCallback := func(resourceName string, added, updated, deleted []pluginapi.Device) {} + podsStub := activePodsStub{ + activePods: []*v1.Pod{}, + } + cachedNode := &v1.Node{ + Status: v1.NodeStatus{ + Allocatable: v1.ResourceList{}, + }, + } + nodeInfo := &schedulercache.NodeInfo{} + nodeInfo.SetNode(cachedNode) + + testManager := &ManagerImpl{ + callback: monitorCallback, + allDevices: make(map[string]sets.String), + allocatedDevices: make(map[string]sets.String), + endpoints: make(map[string]endpoint), + podDevices: make(podDevices), + activePods: podsStub.getActivePods, + } + + testManager.allDevices[resourceName1] = sets.NewString() + testManager.allDevices[resourceName1].Insert(devID1) + testManager.allDevices[resourceName1].Insert(devID2) + testManager.allDevices[resourceName2] = sets.NewString() + testManager.allDevices[resourceName2].Insert(devID3) + testManager.allDevices[resourceName2].Insert(devID4) + + testManager.endpoints[resourceName1] = &MockEndpoint{ + allocateFunc: func(devs []string) (*pluginapi.AllocateResponse, error) { + resp := new(pluginapi.AllocateResponse) + resp.Envs = make(map[string]string) + for _, dev := range devs { + switch dev { + case "dev1": + resp.Devices = append(resp.Devices, &pluginapi.DeviceSpec{ + ContainerPath: "/dev/aaa", + HostPath: "/dev/aaa", + Permissions: "mrw", + }) + + resp.Devices = append(resp.Devices, &pluginapi.DeviceSpec{ + ContainerPath: "/dev/bbb", + HostPath: "/dev/bbb", + Permissions: "mrw", + }) + + resp.Mounts = append(resp.Mounts, &pluginapi.Mount{ + ContainerPath: "/container_dir1/file1", + HostPath: "host_dir1/file1", + ReadOnly: true, + }) + + case "dev2": + resp.Devices = append(resp.Devices, &pluginapi.DeviceSpec{ + ContainerPath: "/dev/ccc", + HostPath: "/dev/ccc", + Permissions: "mrw", + }) + + resp.Mounts = append(resp.Mounts, &pluginapi.Mount{ + ContainerPath: "/container_dir1/file2", + HostPath: "host_dir1/file2", + ReadOnly: true, + }) + + resp.Envs["key1"] = "val1" + } + } + return 
resp, nil + }, + } + + testManager.endpoints[resourceName2] = &MockEndpoint{ + allocateFunc: func(devs []string) (*pluginapi.AllocateResponse, error) { + resp := new(pluginapi.AllocateResponse) + resp.Envs = make(map[string]string) + for _, dev := range devs { + switch dev { + case "dev3": + resp.Envs["key2"] = "val2" + + case "dev4": + resp.Envs["key2"] = "val3" + } + } + return resp, nil + }, + } + + pod := &v1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + UID: uuid.NewUUID(), + }, + Spec: v1.PodSpec{ + Containers: []v1.Container{ + { + Name: string(uuid.NewUUID()), + Resources: v1.ResourceRequirements{ + Limits: v1.ResourceList{ + v1.ResourceName(resourceName1): resourceQuantity1, + v1.ResourceName("cpu"): resourceQuantity1, + v1.ResourceName(resourceName2): resourceQuantity2, + }, + }, + }, + }, + }, + } + + podsStub.updateActivePods([]*v1.Pod{pod}) + err := testManager.Allocate(nodeInfo, &lifecycle.PodAdmitAttributes{Pod: pod}) + as.Nil(err) + runContainerOpts := testManager.GetDeviceRunContainerOptions(pod, &pod.Spec.Containers[0]) + as.NotNil(runContainerOpts) + as.Equal(len(runContainerOpts.Devices), 3) + as.Equal(len(runContainerOpts.Mounts), 2) + as.Equal(len(runContainerOpts.Envs), 2) + + // Requesting to create a pod without enough resources should fail. + as.Equal(2, testManager.allocatedDevices[resourceName1].Len()) + failPod := &v1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + UID: uuid.NewUUID(), + }, + Spec: v1.PodSpec{ + Containers: []v1.Container{ + { + Name: string(uuid.NewUUID()), + Resources: v1.ResourceRequirements{ + Limits: v1.ResourceList{ + v1.ResourceName(resourceName1): resourceQuantity2, + }, + }, + }, + }, + }, + } + err = testManager.Allocate(nodeInfo, &lifecycle.PodAdmitAttributes{Pod: failPod}) + as.NotNil(err) + runContainerOpts2 := testManager.GetDeviceRunContainerOptions(failPod, &failPod.Spec.Containers[0]) + as.Nil(runContainerOpts2) + + // Requesting to create a new pod with a single resourceName2 should succeed. 
+ newPod := &v1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + UID: uuid.NewUUID(), + }, + Spec: v1.PodSpec{ + Containers: []v1.Container{ + { + Name: string(uuid.NewUUID()), + Resources: v1.ResourceRequirements{ + Limits: v1.ResourceList{ + v1.ResourceName(resourceName2): resourceQuantity2, + }, + }, + }, + }, + }, + } + err = testManager.Allocate(nodeInfo, &lifecycle.PodAdmitAttributes{Pod: newPod}) + as.Nil(err) + runContainerOpts3 := testManager.GetDeviceRunContainerOptions(newPod, &newPod.Spec.Containers[0]) + as.Equal(1, len(runContainerOpts3.Envs)) +} + +func TestSanitizeNodeAllocatable(t *testing.T) { + resourceName1 := "domain1.com/resource1" + devID1 := "dev1" + + resourceName2 := "domain2.com/resource2" + devID2 := "dev2" + + as := assert.New(t) + monitorCallback := func(resourceName string, added, updated, deleted []pluginapi.Device) {} + + testManager := &ManagerImpl{ + callback: monitorCallback, + allDevices: make(map[string]sets.String), + allocatedDevices: make(map[string]sets.String), + podDevices: make(podDevices), + } + // require one of resource1 and one of resource2 + testManager.allocatedDevices[resourceName1] = sets.NewString() + testManager.allocatedDevices[resourceName1].Insert(devID1) + testManager.allocatedDevices[resourceName2] = sets.NewString() + testManager.allocatedDevices[resourceName2].Insert(devID2) + + cachedNode := &v1.Node{ + Status: v1.NodeStatus{ + Allocatable: v1.ResourceList{ + // has no resource1 and two of resource2 + v1.ResourceName(resourceName2): *resource.NewQuantity(int64(2), resource.DecimalSI), + }, + }, + } + nodeInfo := &schedulercache.NodeInfo{} + nodeInfo.SetNode(cachedNode) + + testManager.sanitizeNodeAllocatable(nodeInfo) + + allocatableScalarResources := nodeInfo.AllocatableResource().ScalarResources + // allocatable in nodeInfo is less than needed, should update + as.Equal(1, int(allocatableScalarResources[v1.ResourceName(resourceName1)])) + // allocatable in nodeInfo is more than needed, should skip updating + as.Equal(2, int(allocatableScalarResources[v1.ResourceName(resourceName2)])) +} diff --git a/pkg/kubelet/cm/deviceplugin/pod_devices.go b/pkg/kubelet/cm/deviceplugin/pod_devices.go index e343321e221..e903dbf1035 100644 --- a/pkg/kubelet/cm/deviceplugin/pod_devices.go +++ b/pkg/kubelet/cm/deviceplugin/pod_devices.go @@ -116,6 +116,11 @@ func (pdev podDevices) toCheckpointData() checkpointData { for conName, resources := range containerDevices { for resource, devices := range resources { devIds := devices.deviceIds.UnsortedList() + if devices.allocResp == nil { + glog.Errorf("Can't marshal allocResp for %v %v %v: allocation response is missing", podUID, conName, resource) + continue + } + allocResp, err := devices.allocResp.Marshal() if err != nil { glog.Errorf("Can't marshal allocResp for %v %v %v: %v", podUID, conName, resource, err) diff --git a/pkg/kubelet/cm/deviceplugin/types.go b/pkg/kubelet/cm/deviceplugin/types.go index 9745393d499..4b1607a71dc 100644 --- a/pkg/kubelet/cm/deviceplugin/types.go +++ b/pkg/kubelet/cm/deviceplugin/types.go @@ -17,34 +17,40 @@ limitations under the License. package deviceplugin import ( + "k8s.io/api/core/v1" pluginapi "k8s.io/kubernetes/pkg/kubelet/apis/deviceplugin/v1alpha" kubecontainer "k8s.io/kubernetes/pkg/kubelet/container" + "k8s.io/kubernetes/pkg/kubelet/lifecycle" + "k8s.io/kubernetes/plugin/pkg/scheduler/schedulercache" ) -// MonitorCallback is the function called when a device's health state changes, -// or new devices are reported, or old devices are deleted. 
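An aside on the test structure above (not part of the patch): MockEndpoint scripts allocation behaviour by storing a plain function in a struct field, so each test decides what the fake plugin returns. The same pattern in isolation, using a simplified stand-in interface rather than the package's real endpoint type (all names here are illustrative):

package main

import (
	"errors"
	"fmt"
)

// allocator is a simplified stand-in for the endpoint abstraction used by the tests.
type allocator interface {
	allocate(devs []string) (map[string]string, error)
}

// funcAllocator adapts a plain function, the way MockEndpoint.allocateFunc
// lets each test choose what an allocation RPC should return.
type funcAllocator struct {
	allocateFunc func(devs []string) (map[string]string, error)
}

func (f *funcAllocator) allocate(devs []string) (map[string]string, error) {
	if f.allocateFunc != nil {
		return f.allocateFunc(devs)
	}
	return nil, nil // no scripted behaviour: fall back to an empty response
}

func main() {
	healthy := &funcAllocator{allocateFunc: func(devs []string) (map[string]string, error) {
		return map[string]string{"key1": "val1"}, nil
	}}
	failing := &funcAllocator{allocateFunc: func(devs []string) (map[string]string, error) {
		return nil, errors.New("requested devices unavailable")
	}}
	for _, a := range []allocator{healthy, failing} {
		envs, err := a.allocate([]string{"dev1"})
		fmt.Println(envs, err)
	}
}

Keeping the mock as a function field avoids writing a separate mock type per scenario; a nil field simply yields an empty response, just as MockEndpoint.allocate does.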
-// Updated contains the most recent state of the Device. -type MonitorCallback func(resourceName string, added, updated, deleted []pluginapi.Device) - // Manager manages all the Device Plugins running on a node. type Manager interface { - // Start starts the gRPC Registration service. - Start() error + // Start starts the device plugin registration service. + Start(activePods ActivePodsFunc) error // Devices is the map of devices that have registered themselves // against the manager. // The map key is the ResourceName of the device plugins. Devices() map[string][]pluginapi.Device - // Allocate takes resourceName and list of device Ids, and calls the - // gRPC Allocate on the device plugin matching the resourceName. - Allocate(string, []string) (*pluginapi.AllocateResponse, error) + // Allocate configures and assigns devices to pods. The pods are provided + // through the pod admission attributes in the attrs argument. From the + // requested device resources, Allocate will communicate with the owning + // device plugin to allow setup procedures to take place, and for the + // device plugin to provide runtime settings to use the device (environment + // variables, mount points and device files). The node object is provided + // for the device manager to update the node capacity to reflect the + // currently available devices. + Allocate(node *schedulercache.NodeInfo, attrs *lifecycle.PodAdmitAttributes) error // Stop stops the manager. Stop() error - // Returns checkpoint file path. - CheckpointFile() string + // GetDeviceRunContainerOptions checks whether we have cached containerDevices + // for the passed-in pod and container and returns its DeviceRunContainerOptions + // for the found one. An empty struct is returned in case no cached state is found. + GetDeviceRunContainerOptions(pod *v1.Pod, container *v1.Container) *DeviceRunContainerOptions } // DeviceRunContainerOptions contains the combined container runtime settings to consume its allocated devices.
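To make the reworked interface above concrete (illustration only, not part of the patch): the kubelet is expected to call Start once with an active-pods source, Allocate at pod admission time, and GetDeviceRunContainerOptions at container start, which only reads what Allocate cached. The sketch below uses simplified placeholder types; the real methods take *schedulercache.NodeInfo, *lifecycle.PodAdmitAttributes, *v1.Pod and *v1.Container.

package main

import (
	"errors"
	"fmt"
)

type pod struct{ name string }

type runOptions struct{ envs map[string]string }

// manager is a simplified local copy of the call surface described above.
type manager interface {
	Start(activePods func() []*pod) error
	Allocate(p *pod) error
	GetDeviceRunContainerOptions(p *pod) *runOptions
	Stop() error
}

// fakeManager stands in for ManagerImpl/ManagerStub purely to show the call order.
type fakeManager struct{ cached map[string]*runOptions }

func (m *fakeManager) Start(activePods func() []*pod) error { return nil }
func (m *fakeManager) Allocate(p *pod) error {
	if p == nil {
		return errors.New("no pod to admit")
	}
	// Pretend the plugin returned runtime settings; cache them for later lookup.
	m.cached[p.name] = &runOptions{envs: map[string]string{"KEY": "VAL"}}
	return nil
}
func (m *fakeManager) GetDeviceRunContainerOptions(p *pod) *runOptions { return m.cached[p.name] }
func (m *fakeManager) Stop() error                                     { return nil }

func main() {
	var m manager = &fakeManager{cached: map[string]*runOptions{}}
	activePods := func() []*pod { return nil }
	_ = m.Start(activePods) // kubelet startup: begin plugin registration
	p := &pod{name: "gpu-pod"}
	if err := m.Allocate(p); err != nil { // pod admission: reserve and prepare devices
		fmt.Println("admission failed:", err)
		return
	}
	opts := m.GetDeviceRunContainerOptions(p) // container start: cache lookup only
	fmt.Println(opts.envs)
	_ = m.Stop()
}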