From 18c8a821e0fe4bf46dffded61486a816d6a0e1da Mon Sep 17 00:00:00 2001
From: Pawel Rapacz
Date: Fri, 25 Sep 2020 15:53:34 +0200
Subject: [PATCH] memory manager: implement GetPodTopologyHints method
The new method returns memory and hugepages topology hints computed for the whole pod rather than for a single container.
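The per-pod hints are derived from the pod's effective memory and hugepages
request: for each resource, getPodRequestedResources takes the maximum
request across init containers (they run serially) and the sum of requests
across app containers (they run concurrently), and uses the larger of the
two. For example, init containers requesting 2Gi and 1Gi of memory together
with two app containers requesting 1Gi each yield an effective request of
max(2Gi, 1Gi+1Gi) = 2Gi. Hints are only produced for pods in the Guaranteed
QoS class; if memory blocks are already recorded in the state for one of the
pod's containers (e.g. after a kubelet restart), the existing allocation is
turned back into hints instead of being recomputed.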
Signed-off-by: Pawel Rapacz
---
.../cm/memorymanager/fake_memory_manager.go | 9 +-
.../cm/memorymanager/memory_manager.go | 13 ++
.../cm/memorymanager/memory_manager_test.go | 67 ++++++-
pkg/kubelet/cm/memorymanager/policy.go | 4 +
pkg/kubelet/cm/memorymanager/policy_none.go | 7 +
pkg/kubelet/cm/memorymanager/policy_static.go | 173 ++++++++++++------
6 files changed, 219 insertions(+), 54 deletions(-)
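Note for reviewers (not part of the applied patch): the sketch below restates
the effective-request rule from getPodRequestedResources with plain integer
quantities instead of v1.ResourceList, so the init/app container aggregation
can be checked in isolation.

package main

import "fmt"

func effectiveRequests(initCtrs, appCtrs []map[string]uint64) map[string]uint64 {
	// Init containers run serially, so per resource only the largest request matters.
	byInit := map[string]uint64{}
	for _, reqs := range initCtrs {
		for name, qty := range reqs {
			if qty > byInit[name] {
				byInit[name] = qty
			}
		}
	}
	// App containers run concurrently, so their requests are summed.
	byApp := map[string]uint64{}
	for _, reqs := range appCtrs {
		for name, qty := range reqs {
			byApp[name] += qty
		}
	}
	// The effective pod request is the larger of the two per resource.
	for name, qty := range byInit {
		if qty > byApp[name] {
			byApp[name] = qty
		}
	}
	return byApp
}

func main() {
	const gi uint64 = 1024 * 1024 * 1024
	initCtrs := []map[string]uint64{{"memory": 2 * gi}, {"memory": gi}}
	appCtrs := []map[string]uint64{{"memory": gi}, {"memory": gi}}
	// Prints map[memory:2147483648], i.e. max(2Gi, 1Gi+1Gi) = 2Gi.
	fmt.Println(effectiveRequests(initCtrs, appCtrs))
}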
diff --git a/pkg/kubelet/cm/memorymanager/fake_memory_manager.go b/pkg/kubelet/cm/memorymanager/fake_memory_manager.go
index 334476f702e..1cbe0171013 100644
--- a/pkg/kubelet/cm/memorymanager/fake_memory_manager.go
+++ b/pkg/kubelet/cm/memorymanager/fake_memory_manager.go
@@ -18,8 +18,8 @@ package memorymanager
import (
v1 "k8s.io/api/core/v1"
- "k8s.io/klog"
- "k8s.io/kubernetes/pkg/kubelet/cm/cpumanager/containermap"
+ "k8s.io/klog/v2"
+ "k8s.io/kubernetes/pkg/kubelet/cm/containermap"
"k8s.io/kubernetes/pkg/kubelet/cm/memorymanager/state"
"k8s.io/kubernetes/pkg/kubelet/cm/topologymanager"
"k8s.io/kubernetes/pkg/kubelet/config"
@@ -60,6 +60,11 @@ func (m *fakeManager) GetTopologyHints(pod *v1.Pod, container *v1.Container) map
return map[string][]topologymanager.TopologyHint{}
}
+func (m *fakeManager) GetPodTopologyHints(pod *v1.Pod) map[string][]topologymanager.TopologyHint {
+ klog.Infof("[fake memorymanager] Get Pod Topology Hints")
+ return map[string][]topologymanager.TopologyHint{}
+}
+
func (m *fakeManager) State() state.Reader {
return m.state
}
diff --git a/pkg/kubelet/cm/memorymanager/memory_manager.go b/pkg/kubelet/cm/memorymanager/memory_manager.go
index 0b711244c47..3f906ac68f9 100644
--- a/pkg/kubelet/cm/memorymanager/memory_manager.go
+++ b/pkg/kubelet/cm/memorymanager/memory_manager.go
@@ -76,6 +76,11 @@ type Manager interface {
// and is consulted to achieve NUMA aware resource alignment among this
// and other resource controllers.
GetTopologyHints(*v1.Pod, *v1.Container) map[string][]topologymanager.TopologyHint
+
+ // GetPodTopologyHints implements the topologymanager.HintProvider Interface
+ // and is consulted to achieve NUMA aware resource alignment among this
+ // and other resource controllers.
+ GetPodTopologyHints(*v1.Pod) map[string][]topologymanager.TopologyHint
}
type manager struct {
@@ -246,6 +251,14 @@ func (m *manager) State() state.Reader {
return m.state
}
+// GetPodTopologyHints returns the topology hints for the topology manager
+func (m *manager) GetPodTopologyHints(pod *v1.Pod) map[string][]topologymanager.TopologyHint {
+ // Garbage collect any stranded resources before providing TopologyHints
+ m.removeStaleState()
+ // Delegate to active policy
+ return m.policy.GetPodTopologyHints(m.state, pod)
+}
+
// GetTopologyHints returns the topology hints for the topology manager
func (m *manager) GetTopologyHints(pod *v1.Pod, container *v1.Container) map[string][]topologymanager.TopologyHint {
// Garbage collect any stranded resources before providing TopologyHints
diff --git a/pkg/kubelet/cm/memorymanager/memory_manager_test.go b/pkg/kubelet/cm/memorymanager/memory_manager_test.go
index ae6ec42e6ad..aed29893d88 100644
--- a/pkg/kubelet/cm/memorymanager/memory_manager_test.go
+++ b/pkg/kubelet/cm/memorymanager/memory_manager_test.go
@@ -11,6 +11,10 @@ import (
info "github.com/google/cadvisor/info/v1"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
+ "k8s.io/apimachinery/pkg/types"
+ runtimeapi "k8s.io/cri-api/pkg/apis/runtime/v1alpha2"
+ "k8s.io/kubernetes/pkg/kubelet/cm/memorymanager/state"
+ "k8s.io/kubernetes/pkg/kubelet/cm/topologymanager"
)
const (
@@ -20,7 +24,67 @@ const (
type nodeResources map[v1.ResourceName]resource.Quantity
-// validateReservedMemory
+type mockPolicy struct {
+ err error
+}
+
+func (p *mockPolicy) Name() string {
+ return string(policyTypeMock)
+}
+
+func (p *mockPolicy) Start(s state.State) error {
+ return p.err
+}
+
+func (p *mockPolicy) Allocate(s state.State, pod *v1.Pod, container *v1.Container) error {
+ return p.err
+}
+
+func (p *mockPolicy) RemoveContainer(s state.State, podUID string, containerName string) error {
+ return p.err
+}
+
+func (p *mockPolicy) GetTopologyHints(s state.State, pod *v1.Pod, container *v1.Container) map[string][]topologymanager.TopologyHint {
+ return nil
+}
+
+func (p *mockPolicy) GetPodTopologyHints(s state.State, pod *v1.Pod) map[string][]topologymanager.TopologyHint {
+ return nil
+}
+
+type mockRuntimeService struct {
+ err error
+}
+
+func (rt mockRuntimeService) UpdateContainerResources(id string, resources *runtimeapi.LinuxContainerResources) error {
+ return rt.err
+}
+
+type mockPodStatusProvider struct {
+ podStatus v1.PodStatus
+ found bool
+}
+
+func (psp mockPodStatusProvider) GetPodStatus(uid types.UID) (v1.PodStatus, bool) {
+ return psp.podStatus, psp.found
+}
+
+func getPod(podUID string, containerName string, requirements *v1.ResourceRequirements) *v1.Pod {
+ return &v1.Pod{
+ ObjectMeta: metav1.ObjectMeta{
+ UID: types.UID(podUID),
+ },
+ Spec: v1.PodSpec{
+ Containers: []v1.Container{
+ {
+ Name: containerName,
+ Resources: *requirements,
+ },
+ },
+ },
+ }
+}
+
func TestValidatePreReservedMemory(t *testing.T) {
const msgNotEqual = "the total amount of memory of type \"%s\" is not equal to the value determined by Node Allocatable feature"
testCases := []struct {
@@ -106,6 +170,7 @@ func TestValidatePreReservedMemory(t *testing.T) {
}
}
+// TestConvertPreReserved covers the conversion of the pre-reserved memory configuration
func TestConvertPreReserved(t *testing.T) {
machineInfo := info.MachineInfo{
Topology: []info.Node{
diff --git a/pkg/kubelet/cm/memorymanager/policy.go b/pkg/kubelet/cm/memorymanager/policy.go
index dea23b335e3..8d84c71f137 100644
--- a/pkg/kubelet/cm/memorymanager/policy.go
+++ b/pkg/kubelet/cm/memorymanager/policy.go
@@ -37,4 +37,8 @@ type Policy interface {
// and is consulted to achieve NUMA aware resource alignment among this
// and other resource controllers.
GetTopologyHints(s state.State, pod *v1.Pod, container *v1.Container) map[string][]topologymanager.TopologyHint
+ // GetPodTopologyHints implements the topologymanager.HintProvider Interface
+ // and is consulted to achieve NUMA aware resource alignment among this
+ // and other resource controllers.
+ GetPodTopologyHints(s state.State, pod *v1.Pod) map[string][]topologymanager.TopologyHint
}
diff --git a/pkg/kubelet/cm/memorymanager/policy_none.go b/pkg/kubelet/cm/memorymanager/policy_none.go
index e91c3ce3439..a8b6b778520 100644
--- a/pkg/kubelet/cm/memorymanager/policy_none.go
+++ b/pkg/kubelet/cm/memorymanager/policy_none.go
@@ -59,3 +59,10 @@ func (p *none) RemoveContainer(s state.State, podUID string, containerName strin
func (p *none) GetTopologyHints(s state.State, pod *v1.Pod, container *v1.Container) map[string][]topologymanager.TopologyHint {
return nil
}
+
+// GetPodTopologyHints implements the topologymanager.HintProvider Interface
+// and is consulted to achieve NUMA aware resource alignment among this
+// and other resource controllers.
+func (p *none) GetPodTopologyHints(s state.State, pod *v1.Pod) map[string][]topologymanager.TopologyHint {
+ return nil
+}
diff --git a/pkg/kubelet/cm/memorymanager/policy_static.go b/pkg/kubelet/cm/memorymanager/policy_static.go
index 5cee6851448..c26b4efb265 100644
--- a/pkg/kubelet/cm/memorymanager/policy_static.go
+++ b/pkg/kubelet/cm/memorymanager/policy_static.go
@@ -147,9 +147,9 @@ func (p *staticPolicy) Allocate(s state.State, pod *v1.Pod, container *v1.Contai
})
// Update nodes memory state
- for _, nodeId := range maskBits {
- machineState[nodeId].NumberOfAssignments++
- machineState[nodeId].Nodes = maskBits
+ for _, nodeID := range maskBits {
+ machineState[nodeID].NumberOfAssignments++
+ machineState[nodeID].Nodes = maskBits
// we need to continue to update all affinity mask nodes
if requestedSize == 0 {
@@ -157,7 +157,7 @@ func (p *staticPolicy) Allocate(s state.State, pod *v1.Pod, container *v1.Contai
}
// update the node memory state
- nodeResourceMemoryState := machineState[nodeId].MemoryMap[resourceName]
+ nodeResourceMemoryState := machineState[nodeID].MemoryMap[resourceName]
if nodeResourceMemoryState.Free <= 0 {
continue
}
@@ -197,12 +197,12 @@ func (p *staticPolicy) RemoveContainer(s state.State, podUID string, containerNa
machineState := s.GetMachineState()
for _, b := range blocks {
releasedSize := b.Size
- for _, nodeId := range b.NUMAAffinity {
- machineState[nodeId].NumberOfAssignments--
+ for _, nodeID := range b.NUMAAffinity {
+ machineState[nodeID].NumberOfAssignments--
// once we do not have any memory allocations on this node, clear node groups
- if machineState[nodeId].NumberOfAssignments == 0 {
- machineState[nodeId].Nodes = []int{nodeId}
+ if machineState[nodeID].NumberOfAssignments == 0 {
+ machineState[nodeID].Nodes = []int{nodeID}
}
// we still need to pass over all NUMA node under the affinity mask to update them
@@ -210,7 +210,7 @@ func (p *staticPolicy) RemoveContainer(s state.State, podUID string, containerNa
continue
}
- nodeResourceMemoryState := machineState[nodeId].MemoryMap[b.Type]
+ nodeResourceMemoryState := machineState[nodeID].MemoryMap[b.Type]
// if the node does not have reserved memory to free, continue to the next node
if nodeResourceMemoryState.Reserved == 0 {
@@ -238,6 +238,110 @@ func (p *staticPolicy) RemoveContainer(s state.State, podUID string, containerNa
return nil
}
+func regenerateHints(pod *v1.Pod, ctn *v1.Container, ctnBlocks []state.Block, reqRsrc map[v1.ResourceName]uint64) map[string][]topologymanager.TopologyHint {
+ hints := map[string][]topologymanager.TopologyHint{}
+ for resourceName := range reqRsrc {
+ hints[string(resourceName)] = []topologymanager.TopologyHint{}
+ }
+
+ if len(ctnBlocks) != len(reqRsrc) {
+ klog.Errorf("[memorymanager] The number of requested resources by the container %s differs from the number of memory blocks", ctn.Name)
+ return nil
+ }
+
+ for _, b := range ctnBlocks {
+ if _, ok := reqRsrc[b.Type]; !ok {
+ klog.Errorf("[memorymanager] Container %s requested resources do not have resource of type %s", ctn.Name, b.Type)
+ return nil
+ }
+
+ if b.Size != reqRsrc[b.Type] {
+ klog.Errorf("[memorymanager] Memory %s already allocated to (pod %v, container %v) with different number than request: requested: %d, allocated: %d", b.Type, pod.UID, ctn.Name, reqRsrc[b.Type], b.Size)
+ return nil
+ }
+
+ containerNUMAAffinity, err := bitmask.NewBitMask(b.NUMAAffinity...)
+ if err != nil {
+ klog.Errorf("[memorymanager] failed to generate NUMA bitmask: %v", err)
+ return nil
+ }
+
+ klog.Infof("[memorymanager] Regenerating TopologyHints, %s was already allocated to (pod %v, container %v)", b.Type, pod.UID, ctn.Name)
+ hints[string(b.Type)] = append(hints[string(b.Type)], topologymanager.TopologyHint{
+ NUMANodeAffinity: containerNUMAAffinity,
+ Preferred: true,
+ })
+ }
+ return hints
+}
+
+func getPodRequestedResources(pod *v1.Pod) (map[v1.ResourceName]uint64, error) {
+ reqRsrcsByInitCtrs := make(map[v1.ResourceName]uint64)
+ reqRsrcsByAppCtrs := make(map[v1.ResourceName]uint64)
+
+ for _, ctr := range pod.Spec.InitContainers {
+ reqRsrcs, err := getRequestedResources(&ctr)
+
+ if err != nil {
+ return nil, err
+ }
+ for rsrcName, qty := range reqRsrcs {
+ if _, ok := reqRsrcsByInitCtrs[rsrcName]; !ok {
+ reqRsrcsByInitCtrs[rsrcName] = uint64(0)
+ }
+
+ if reqRsrcs[rsrcName] > reqRsrcsByInitCtrs[rsrcName] {
+ reqRsrcsByInitCtrs[rsrcName] = qty
+ }
+ }
+ }
+
+ for _, ctr := range pod.Spec.Containers {
+ reqRsrcs, err := getRequestedResources(&ctr)
+
+ if err != nil {
+ return nil, err
+ }
+ for rsrcName, qty := range reqRsrcs {
+ if _, ok := reqRsrcsByAppCtrs[rsrcName]; !ok {
+ reqRsrcsByAppCtrs[rsrcName] = uint64(0)
+ }
+
+ reqRsrcsByAppCtrs[rsrcName] += qty
+ }
+ }
+
+ for rsrcName := range reqRsrcsByAppCtrs {
+ if reqRsrcsByInitCtrs[rsrcName] > reqRsrcsByAppCtrs[rsrcName] {
+ reqRsrcsByAppCtrs[rsrcName] = reqRsrcsByInitCtrs[rsrcName]
+ }
+ }
+ return reqRsrcsByAppCtrs, nil
+}
+
+func (p *staticPolicy) GetPodTopologyHints(s state.State, pod *v1.Pod) map[string][]topologymanager.TopologyHint {
+ if v1qos.GetPodQOS(pod) != v1.PodQOSGuaranteed {
+ return nil
+ }
+
+ reqRsrcs, err := getPodRequestedResources(pod)
+ if err != nil {
+ klog.Error(err.Error())
+ return nil
+ }
+
+ for _, ctn := range append(pod.Spec.InitContainers, pod.Spec.Containers...) {
+ containerBlocks := s.GetMemoryBlocks(string(pod.UID), ctn.Name)
+ // Short circuit to regenerate the same hints if memory has already
+ // been allocated for the container. This might happen after a
+ // kubelet restart, for example.
+ if containerBlocks != nil {
+ return regenerateHints(pod, &ctn, containerBlocks, reqRsrcs)
+ }
+ }
+ return p.calculateHints(s, reqRsrcs)
+}
+
// GetTopologyHints implements the topologymanager.HintProvider Interface
// and is consulted to achieve NUMA aware resource alignment among this
// and other resource controllers.
@@ -252,45 +356,12 @@ func (p *staticPolicy) GetTopologyHints(s state.State, pod *v1.Pod, container *v
return nil
}
- hints := map[string][]topologymanager.TopologyHint{}
- for resourceName := range requestedResources {
- hints[string(resourceName)] = []topologymanager.TopologyHint{}
- }
-
containerBlocks := s.GetMemoryBlocks(string(pod.UID), container.Name)
// Short circuit to regenerate the same hints if there are already
// memory allocated for the container. This might happen after a
// kubelet restart, for example.
if containerBlocks != nil {
- if len(containerBlocks) != len(requestedResources) {
- klog.Errorf("[memorymanager] The number of requested resources by the container %s differs from the number of memory blocks", container.Name)
- return nil
- }
-
- for _, b := range containerBlocks {
- if _, ok := requestedResources[b.Type]; !ok {
- klog.Errorf("[memorymanager] Container %s requested resources do not have resource of type %s", container.Name, b.Type)
- return nil
- }
-
- if b.Size != requestedResources[b.Type] {
- klog.Errorf("[memorymanager] Memory %s already allocated to (pod %v, container %v) with different number than request: requested: %d, allocated: %d", b.Type, pod.UID, container.Name, requestedResources[b.Type], b.Size)
- return nil
- }
-
- containerNUMAAffinity, err := bitmask.NewBitMask(b.NUMAAffinity...)
- if err != nil {
- klog.Errorf("[memorymanager] failed to generate NUMA bitmask: %v", err)
- return nil
- }
-
- klog.Infof("[memorymanager] Regenerating TopologyHints, %s was already allocated to (pod %v, container %v)", b.Type, pod.UID, container.Name)
- hints[string(b.Type)] = append(hints[string(b.Type)], topologymanager.TopologyHint{
- NUMANodeAffinity: containerNUMAAffinity,
- Preferred: true,
- })
- }
- return hints
+ return regenerateHints(pod, container, containerBlocks, requestedResources)
}
return p.calculateHints(s, requestedResources)
@@ -503,10 +574,10 @@ func areMachineStatesEqual(ms1, ms2 state.NodeMap) bool {
return false
}
- for nodeId, nodeState1 := range ms1 {
- nodeState2, ok := ms2[nodeId]
+ for nodeID, nodeState1 := range ms1 {
+ nodeState2, ok := ms2[nodeID]
if !ok {
- klog.Errorf("[memorymanager] node state does not have node ID %d", nodeId)
+ klog.Errorf("[memorymanager] node state does not have node ID %d", nodeID)
return false
}
@@ -533,7 +604,7 @@ func areMachineStatesEqual(ms1, ms2 state.NodeMap) bool {
}
if !reflect.DeepEqual(*memoryState1, *memoryState2) {
- klog.Errorf("[memorymanager] memory states for the NUMA node %d and the resource %s are different: %+v != %+v", nodeId, resourceName, *memoryState1, *memoryState2)
+ klog.Errorf("[memorymanager] memory states for the NUMA node %d and the resource %s are different: %+v != %+v", nodeID, resourceName, *memoryState1, *memoryState2)
return false
}
}
@@ -590,9 +661,9 @@ func (p *staticPolicy) getDefaultMachineState() state.NodeMap {
return defaultMachineState
}
-func (p *staticPolicy) getResourceSystemReserved(nodeId int, resourceName v1.ResourceName) uint64 {
+func (p *staticPolicy) getResourceSystemReserved(nodeID int, resourceName v1.ResourceName) uint64 {
var systemReserved uint64
- if nodeSystemReserved, ok := p.systemReserved[nodeId]; ok {
+ if nodeSystemReserved, ok := p.systemReserved[nodeID]; ok {
if nodeMemorySystemReserved, ok := nodeSystemReserved[resourceName]; ok {
systemReserved = nodeMemorySystemReserved
}
@@ -612,12 +683,12 @@ func (p *staticPolicy) getDefaultHint(s state.State, requestedResources map[v1.R
func isAffinitySatisfyRequest(machineState state.NodeMap, mask bitmask.BitMask, requestedResources map[v1.ResourceName]uint64) bool {
totalFreeSize := map[v1.ResourceName]uint64{}
- for _, nodeId := range mask.GetBits() {
+ for _, nodeID := range mask.GetBits() {
for resourceName := range requestedResources {
if _, ok := totalFreeSize[resourceName]; !ok {
totalFreeSize[resourceName] = 0
}
- totalFreeSize[resourceName] += machineState[nodeId].MemoryMap[resourceName].Free
+ totalFreeSize[resourceName] += machineState[nodeID].MemoryMap[resourceName].Free
}
}