From 9da0912a333ac2b43b36b18199be56dfc26f221e Mon Sep 17 00:00:00 2001 From: Byonggon Chun Date: Fri, 2 Oct 2020 11:53:05 +0200 Subject: [PATCH] Implement devicemanager.GetPodLevelTopologyHints() function * Add podDevices() func * Add getPodDeviceRequest() func Signed-off-by: Krzysztof Wiatrzyk --- pkg/kubelet/cm/devicemanager/pod_devices.go | 13 ++ .../cm/devicemanager/topology_hints.go | 115 +++++++++++++++++- 2 files changed, 126 insertions(+), 2 deletions(-) diff --git a/pkg/kubelet/cm/devicemanager/pod_devices.go b/pkg/kubelet/cm/devicemanager/pod_devices.go index 6f640666211..8e20eb7bb7b 100644 --- a/pkg/kubelet/cm/devicemanager/pod_devices.go +++ b/pkg/kubelet/cm/devicemanager/pod_devices.go @@ -93,6 +93,19 @@ func (pdev *podDevices) delete(pods []string) { } } +// Returns list of device Ids allocated to the given pod for the given resource. +// Returns nil if we don't have cached state for the given . +func (pdev *podDevices) podDevices(podUID, resource string) sets.String { + pdev.RLock() + defer pdev.RUnlock() + + ret := sets.NewString() + for contName := range pdev.devs[podUID] { + ret = ret.Union(pdev.containerDevices(podUID, contName, resource)) + } + return ret +} + // Returns list of device Ids allocated to the given container for the given resource. // Returns nil if we don't have cached state for the given . func (pdev *podDevices) containerDevices(podUID, contName, resource string) sets.String { diff --git a/pkg/kubelet/cm/devicemanager/topology_hints.go b/pkg/kubelet/cm/devicemanager/topology_hints.go index 105e97e8a7a..855f4763468 100644 --- a/pkg/kubelet/cm/devicemanager/topology_hints.go +++ b/pkg/kubelet/cm/devicemanager/topology_hints.go @@ -81,10 +81,52 @@ func (m *ManagerImpl) GetTopologyHints(pod *v1.Pod, container *v1.Container) map return deviceHints } -// GetPodTopologyHints implements the TopologyManager HintProvider Interface which +// GetPodTopologyHints implements the topologymanager.HintProvider Interface which // ensures the Device Manager is consulted when Topology Aware Hints for Pod are created. func (m *ManagerImpl) GetPodTopologyHints(pod *v1.Pod) map[string][]topologymanager.TopologyHint { - return nil + // Garbage collect any stranded device resources before providing TopologyHints + m.UpdateAllocatedDevices() + + deviceHints := make(map[string][]topologymanager.TopologyHint) + accumulatedResourceRequests := m.getPodDeviceRequest(pod) + + for resource, requested := range accumulatedResourceRequests { + // Only consider devices that actually contain topology information. + if aligned := m.deviceHasTopologyAlignment(resource); !aligned { + klog.Infof("[devicemanager] Resource '%v' does not have a topology preference", resource) + deviceHints[resource] = nil + continue + } + + // Short circuit to regenerate the same hints if there are already + // devices allocated to the Pod. This might happen after a + // kubelet restart, for example. + allocated := m.podDevices.podDevices(string(pod.UID), resource) + if allocated.Len() > 0 { + if allocated.Len() != requested { + klog.Errorf("[devicemanager] Resource '%v' already allocated to (pod %v) with different number than request: requested: %d, allocated: %d", resource, format.Pod(pod), requested, allocated.Len()) + deviceHints[resource] = []topologymanager.TopologyHint{} + continue + } + klog.Infof("[devicemanager] Regenerating TopologyHints for resource '%v' already allocated to (pod %v)", resource, format.Pod(pod)) + deviceHints[resource] = m.generateDeviceTopologyHints(resource, allocated, sets.String{}, requested) + continue + } + + // Get the list of available devices, for which TopologyHints should be generated. + available := m.getAvailableDevices(resource) + if available.Len() < requested { + klog.Errorf("[devicemanager] Unable to generate topology hints: requested number of devices unavailable for '%s': requested: %d, available: %d", resource, requested, available.Len()) + deviceHints[resource] = []topologymanager.TopologyHint{} + continue + } + + // Generate TopologyHints for this resource given the current + // request size and the list of available devices. + deviceHints[resource] = m.generateDeviceTopologyHints(resource, available, sets.String{}, requested) + } + + return deviceHints } func (m *ManagerImpl) deviceHasTopologyAlignment(resource string) bool { @@ -179,3 +221,72 @@ func (m *ManagerImpl) getNUMANodeIds(topology *pluginapi.TopologyInfo) []int { } return ids } + +func (m *ManagerImpl) getPodDeviceRequest(pod *v1.Pod) map[string]int { + podResources := sets.NewString() + + // Find the max request of a given resource across all init containers + initContainerRequests := make(map[string]int) + for _, container := range pod.Spec.InitContainers { + for resourceObj, requestedObj := range container.Resources.Limits { + resource := string(resourceObj) + requested := int(requestedObj.Value()) + + if !m.isDevicePluginResource(resource) { + continue + } + + podResources.Insert(resource) + + if _, exists := initContainerRequests[resource]; !exists { + initContainerRequests[resource] = requested + continue + } + if requested > initContainerRequests[resource] { + initContainerRequests[resource] = requested + + } + } + } + + // Compute the sum of requests across all app containers for a given resource + appContainerRequests := make(map[string]int) + for _, container := range pod.Spec.Containers { + for resourceObj, requestedObj := range container.Resources.Limits { + resource := string(resourceObj) + requested := int(requestedObj.Value()) + + if !m.isDevicePluginResource(resource) { + continue + } + podResources.Insert(resource) + appContainerRequests[resource] += requested + } + } + + // Calculate podRequests as the max of init and app container requests for a given resource + podRequests := make(map[string]int) + for resource := range podResources { + _, initExists := initContainerRequests[resource] + _, appExists := appContainerRequests[resource] + + if initExists && !appExists { + podRequests[resource] = initContainerRequests[resource] + continue + } + + if !initExists && appExists { + podRequests[resource] = appContainerRequests[resource] + continue + } + + if initContainerRequests[resource] > appContainerRequests[resource] { + podRequests[resource] = initContainerRequests[resource] + continue + } + + podRequests[resource] = appContainerRequests[resource] + } + + return podRequests +}