From 00b88c14b077ac9c239c62d6e5e23d9e1a5e5160 Mon Sep 17 00:00:00 2001
From: Jiaying Zhang
Date: Thu, 7 Feb 2019 11:12:36 -0800
Subject: [PATCH] Checks whether we have cached runtime state before starting a
 container that requests any device plugin resource. If not, re-issue Allocate
 grpc calls. This allows us to handle the edge case that a pod got assigned to
 a node even before it populates its extended resource capacity.

---
 pkg/kubelet/cm/devicemanager/manager.go | 28 ++++++++++++++++++++++++----
 pkg/kubelet/kubelet_node_status.go      |  1 +
 2 files changed, 25 insertions(+), 4 deletions(-)

diff --git a/pkg/kubelet/cm/devicemanager/manager.go b/pkg/kubelet/cm/devicemanager/manager.go
index 31402ff33ba..dbb2720dfe9 100644
--- a/pkg/kubelet/cm/devicemanager/manager.go
+++ b/pkg/kubelet/cm/devicemanager/manager.go
@@ -311,10 +311,7 @@ func (m *ManagerImpl) isVersionCompatibleWithPlugin(versions []string) bool {
 	return false
 }
 
-// Allocate is the call that you can use to allocate a set of devices
-// from the registered device plugins.
-func (m *ManagerImpl) Allocate(node *schedulernodeinfo.NodeInfo, attrs *lifecycle.PodAdmitAttributes) error {
-	pod := attrs.Pod
+func (m *ManagerImpl) allocatePodResources(pod *v1.Pod) error {
 	devicesToReuse := make(map[string]sets.String)
 	for _, container := range pod.Spec.InitContainers {
 		if err := m.allocateContainerResources(pod, &container, devicesToReuse); err != nil {
@@ -328,6 +325,18 @@ func (m *ManagerImpl) Allocate(node *schedulernodeinfo.NodeInfo, attrs *lifecycl
 		}
 		m.podDevices.removeContainerAllocatedResources(string(pod.UID), container.Name, devicesToReuse)
 	}
+	return nil
+}
+
+// Allocate is the call that you can use to allocate a set of devices
+// from the registered device plugins.
+func (m *ManagerImpl) Allocate(node *schedulernodeinfo.NodeInfo, attrs *lifecycle.PodAdmitAttributes) error {
+	pod := attrs.Pod
+	err := m.allocatePodResources(pod)
+	if err != nil {
+		klog.Errorf("Failed to allocate device plugin resource for pod %s: %v", string(pod.UID), err)
+		return err
+	}
 
 	m.mutex.Lock()
 	defer m.mutex.Unlock()
@@ -717,6 +726,7 @@ func (m *ManagerImpl) allocateContainerResources(pod *v1.Pod, container *v1.Cont
 func (m *ManagerImpl) GetDeviceRunContainerOptions(pod *v1.Pod, container *v1.Container) (*DeviceRunContainerOptions, error) {
 	podUID := string(pod.UID)
 	contName := container.Name
+	needsReAllocate := false
 	for k := range container.Resources.Limits {
 		resource := string(k)
 		if !m.isDevicePluginResource(resource) {
@@ -726,6 +736,16 @@ func (m *ManagerImpl) GetDeviceRunContainerOptions(pod *v1.Pod, container *v1.Co
 		if err != nil {
 			return nil, err
 		}
+		// This is a device plugin resource yet we don't have cached
+		// resource state. This is likely due to a race during node
+		// restart. We re-issue allocate request to cover this race.
+		if m.podDevices.containerDevices(podUID, contName, resource) == nil {
+			needsReAllocate = true
+		}
+	}
+	if needsReAllocate {
+		klog.V(2).Infof("needs re-allocate device plugin resources for pod %s", podUID)
+		m.allocatePodResources(pod)
 	}
 	m.mutex.Lock()
 	defer m.mutex.Unlock()
diff --git a/pkg/kubelet/kubelet_node_status.go b/pkg/kubelet/kubelet_node_status.go
index bc86e4d0060..dc17a37a658 100644
--- a/pkg/kubelet/kubelet_node_status.go
+++ b/pkg/kubelet/kubelet_node_status.go
@@ -134,6 +134,7 @@ func (kl *Kubelet) reconcileExtendedResource(initialNode, node *v1.Node) bool {
 	requiresUpdate := false
 	for k := range node.Status.Capacity {
 		if v1helper.IsExtendedResourceName(k) {
+			klog.Infof("Zero out resource %s capacity in existing node.", k)
 			node.Status.Capacity[k] = *resource.NewQuantity(int64(0), resource.DecimalSI)
 			node.Status.Allocatable[k] = *resource.NewQuantity(int64(0), resource.DecimalSI)
 			requiresUpdate = true
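
For readers outside the kubelet codebase, the sketch below illustrates the pattern this patch applies, as plain Go with no Kubernetes imports: cache per-container device allocations keyed by pod/container/resource, and when a container requests a device plugin resource that has no cached entry (for example, because the kubelet restarted and admitted the pod before its extended resource capacity was repopulated), re-run pod-level allocation before building the container's runtime options. All type and function names here (deviceManager, allocatePodResources, getRunOptions) are hypothetical stand-ins, not the real pkg/kubelet/cm/devicemanager API.

// A minimal, self-contained sketch (not the real devicemanager API) of the
// "re-allocate when cached state is missing" pattern from this patch.
package main

import "fmt"

// deviceManager caches, per pod/container/resource, the device IDs that a
// device plugin's Allocate call handed back.
type deviceManager struct {
	cache map[string]map[string]map[string][]string // podUID -> container -> resource -> devices
}

// containerDevices returns the cached device IDs, or nil if nothing is cached
// (e.g. the kubelet restarted and lost its in-memory state).
func (m *deviceManager) containerDevices(podUID, container, resource string) []string {
	return m.cache[podUID][container][resource]
}

// allocatePodResources stands in for re-issuing Allocate gRPC calls to the
// registered device plugins for every container in the pod.
func (m *deviceManager) allocatePodResources(podUID string, containers map[string][]string) error {
	for container, resources := range containers {
		for _, resource := range resources {
			if m.cache[podUID] == nil {
				m.cache[podUID] = map[string]map[string][]string{}
			}
			if m.cache[podUID][container] == nil {
				m.cache[podUID][container] = map[string][]string{}
			}
			m.cache[podUID][container][resource] = []string{"dev-0"} // pretend the plugin granted one device
		}
	}
	return nil
}

// getRunOptions mirrors the shape of GetDeviceRunContainerOptions: if any
// requested device plugin resource has no cached allocation, re-allocate for
// the whole pod before building this container's runtime options.
func (m *deviceManager) getRunOptions(podUID, container string, requested []string, podContainers map[string][]string) error {
	needsReAllocate := false
	for _, resource := range requested {
		if m.containerDevices(podUID, container, resource) == nil {
			// Cached state missing: likely a race around kubelet restart.
			needsReAllocate = true
		}
	}
	if needsReAllocate {
		fmt.Printf("re-allocating device plugin resources for pod %s\n", podUID)
		return m.allocatePodResources(podUID, podContainers)
	}
	return nil
}

func main() {
	m := &deviceManager{cache: map[string]map[string]map[string][]string{}}
	// Simulate a container requesting a device plugin resource while the
	// manager has no cached state, as after a node restart.
	err := m.getRunOptions("pod-1", "ctr-1", []string{"example.com/gpu"},
		map[string][]string{"ctr-1": {"example.com/gpu"}})
	fmt.Println("err:", err, "cached:", m.cache["pod-1"]["ctr-1"])
}

Note that, as in the patch, a single missing entry triggers re-allocation for the whole pod rather than only the affected container, so init containers and sibling containers end up with consistent cached state.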