Mirror of https://github.com/k3s-io/kubernetes.git (synced 2025-07-22 11:21:47 +00:00)
memory manager: implement GetPodTopologyHints method

It will return memory and hugepages hints for the whole pod.

Signed-off-by: Pawel Rapacz <p.rapacz@partner.samsung.com>
parent abb94bec51
commit 18c8a821e0
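For context, this change makes the memory manager a pod-scope hint provider: the topology manager can ask it for NUMA placement hints covering all of a pod's memory and hugepages requests at once, not just one container at a time. Below is a minimal, self-contained sketch of that idea using simplified, hypothetical types (PodHintProvider, collectPodHints, and the plain-string pod UID are illustrative only, not the kubelet's actual API).

// Sketch only: simplified stand-ins for the real kubelet types.
package main

import "fmt"

// TopologyHint mirrors the shape the topology manager works with: the NUMA
// nodes an allocation could come from and whether that placement is preferred.
type TopologyHint struct {
    NUMANodes []int
    Preferred bool
}

// PodHintProvider is the pod-scope half of the hint-provider contract that
// GetPodTopologyHints serves in this commit (hypothetical interface name).
type PodHintProvider interface {
    GetPodTopologyHints(podUID string) map[string][]TopologyHint
}

// collectPodHints gathers hints per resource (e.g. "memory", "hugepages-2Mi")
// from every registered provider so a policy can intersect them later.
func collectPodHints(providers []PodHintProvider, podUID string) map[string][]TopologyHint {
    all := map[string][]TopologyHint{}
    for _, p := range providers {
        for resource, hints := range p.GetPodTopologyHints(podUID) {
            all[resource] = append(all[resource], hints...)
        }
    }
    return all
}

// fakeMemoryProvider plays the role of the memory manager in this sketch.
type fakeMemoryProvider struct{}

func (fakeMemoryProvider) GetPodTopologyHints(podUID string) map[string][]TopologyHint {
    return map[string][]TopologyHint{
        "memory":        {{NUMANodes: []int{0}, Preferred: true}},
        "hugepages-2Mi": {{NUMANodes: []int{0, 1}, Preferred: false}},
    }
}

func main() {
    hints := collectPodHints([]PodHintProvider{fakeMemoryProvider{}}, "pod-uid-1")
    fmt.Printf("%+v\n", hints)
}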
@@ -18,8 +18,8 @@ package memorymanager
 
 import (
     v1 "k8s.io/api/core/v1"
-    "k8s.io/klog"
-    "k8s.io/kubernetes/pkg/kubelet/cm/cpumanager/containermap"
+    "k8s.io/klog/v2"
+    "k8s.io/kubernetes/pkg/kubelet/cm/containermap"
     "k8s.io/kubernetes/pkg/kubelet/cm/memorymanager/state"
     "k8s.io/kubernetes/pkg/kubelet/cm/topologymanager"
     "k8s.io/kubernetes/pkg/kubelet/config"
@@ -60,6 +60,11 @@ func (m *fakeManager) GetTopologyHints(pod *v1.Pod, container *v1.Container) map
     return map[string][]topologymanager.TopologyHint{}
 }
 
+func (m *fakeManager) GetPodTopologyHints(pod *v1.Pod) map[string][]topologymanager.TopologyHint {
+    klog.Infof("[fake memorymanager] Get Pod Topology Hints")
+    return map[string][]topologymanager.TopologyHint{}
+}
+
 func (m *fakeManager) State() state.Reader {
     return m.state
 }
@@ -76,6 +76,11 @@ type Manager interface {
     // and is consulted to achieve NUMA aware resource alignment among this
     // and other resource controllers.
     GetTopologyHints(*v1.Pod, *v1.Container) map[string][]topologymanager.TopologyHint
+
+    // GetPodTopologyHints implements the topologymanager.HintProvider Interface
+    // and is consulted to achieve NUMA aware resource alignment among this
+    // and other resource controllers.
+    GetPodTopologyHints(*v1.Pod) map[string][]topologymanager.TopologyHint
 }
 
 type manager struct {
@@ -246,6 +251,14 @@ func (m *manager) State() state.Reader {
     return m.state
 }
 
+// GetPodTopologyHints returns the topology hints for the topology manager
+func (m *manager) GetPodTopologyHints(pod *v1.Pod) map[string][]topologymanager.TopologyHint {
+    // Garbage collect any stranded resources before providing TopologyHints
+    m.removeStaleState()
+    // Delegate to active policy
+    return m.policy.GetPodTopologyHints(m.state, pod)
+}
+
 // GetTopologyHints returns the topology hints for the topology manager
 func (m *manager) GetTopologyHints(pod *v1.Pod, container *v1.Container) map[string][]topologymanager.TopologyHint {
     // Garbage collect any stranded resources before providing TopologyHints
@@ -11,6 +11,10 @@ import (
     info "github.com/google/cadvisor/info/v1"
     v1 "k8s.io/api/core/v1"
     "k8s.io/apimachinery/pkg/api/resource"
+    "k8s.io/apimachinery/pkg/types"
+    runtimeapi "k8s.io/cri-api/pkg/apis/runtime/v1alpha2"
+    "k8s.io/kubernetes/pkg/kubelet/cm/memorymanager/state"
+    "k8s.io/kubernetes/pkg/kubelet/cm/topologymanager"
 )
 
 const (
@@ -20,7 +24,67 @@ const (
 
 type nodeResources map[v1.ResourceName]resource.Quantity
 
-// validateReservedMemory
+type mockPolicy struct {
+    err error
+}
+
+func (p *mockPolicy) Name() string {
+    return string(policyTypeMock)
+}
+
+func (p *mockPolicy) Start(s state.State) error {
+    return p.err
+}
+
+func (p *mockPolicy) Allocate(s state.State, pod *v1.Pod, container *v1.Container) error {
+    return p.err
+}
+
+func (p *mockPolicy) RemoveContainer(s state.State, podUID string, containerName string) error {
+    return p.err
+}
+
+func (p *mockPolicy) GetTopologyHints(s state.State, pod *v1.Pod, container *v1.Container) map[string][]topologymanager.TopologyHint {
+    return nil
+}
+
+func (p *mockPolicy) GetPodTopologyHints(s state.State, pod *v1.Pod) map[string][]topologymanager.TopologyHint {
+    return nil
+}
+
+type mockRuntimeService struct {
+    err error
+}
+
+func (rt mockRuntimeService) UpdateContainerResources(id string, resources *runtimeapi.LinuxContainerResources) error {
+    return rt.err
+}
+
+type mockPodStatusProvider struct {
+    podStatus v1.PodStatus
+    found     bool
+}
+
+func (psp mockPodStatusProvider) GetPodStatus(uid types.UID) (v1.PodStatus, bool) {
+    return psp.podStatus, psp.found
+}
+
+func getPod(podUID string, containerName string, requirements *v1.ResourceRequirements) *v1.Pod {
+    return &v1.Pod{
+        ObjectMeta: metav1.ObjectMeta{
+            UID: types.UID(podUID),
+        },
+        Spec: v1.PodSpec{
+            Containers: []v1.Container{
+                {
+                    Name:      containerName,
+                    Resources: *requirements,
+                },
+            },
+        },
+    }
+}
+
 func TestValidatePreReservedMemory(t *testing.T) {
     const msgNotEqual = "the total amount of memory of type \"%s\" is not equal to the value determined by Node Allocatable feature"
     testCases := []struct {
@@ -106,6 +170,7 @@ func TestValidatePreReservedMemory(t *testing.T) {
     }
 }
 
+// validateReservedMemory
 func TestConvertPreReserved(t *testing.T) {
     machineInfo := info.MachineInfo{
         Topology: []info.Node{
@@ -37,4 +37,8 @@ type Policy interface {
     // and is consulted to achieve NUMA aware resource alignment among this
     // and other resource controllers.
     GetTopologyHints(s state.State, pod *v1.Pod, container *v1.Container) map[string][]topologymanager.TopologyHint
+    // GetPodTopologyHints implements the topologymanager.HintProvider Interface
+    // and is consulted to achieve NUMA aware resource alignment among this
+    // and other resource controllers.
+    GetPodTopologyHints(s state.State, pod *v1.Pod) map[string][]topologymanager.TopologyHint
 }
@@ -59,3 +59,10 @@ func (p *none) RemoveContainer(s state.State, podUID string, containerName strin
 func (p *none) GetTopologyHints(s state.State, pod *v1.Pod, container *v1.Container) map[string][]topologymanager.TopologyHint {
     return nil
 }
+
+// GetPodTopologyHints implements the topologymanager.HintProvider Interface
+// and is consulted to achieve NUMA aware resource alignment among this
+// and other resource controllers.
+func (p *none) GetPodTopologyHints(s state.State, pod *v1.Pod) map[string][]topologymanager.TopologyHint {
+    return nil
+}
@@ -147,9 +147,9 @@ func (p *staticPolicy) Allocate(s state.State, pod *v1.Pod, container *v1.Contai
     })
 
     // Update nodes memory state
-    for _, nodeId := range maskBits {
-        machineState[nodeId].NumberOfAssignments++
-        machineState[nodeId].Nodes = maskBits
+    for _, nodeID := range maskBits {
+        machineState[nodeID].NumberOfAssignments++
+        machineState[nodeID].Nodes = maskBits
 
         // we need to continue to update all affinity mask nodes
         if requestedSize == 0 {
@@ -157,7 +157,7 @@ func (p *staticPolicy) Allocate(s state.State, pod *v1.Pod, container *v1.Contai
         }
 
         // update the node memory state
-        nodeResourceMemoryState := machineState[nodeId].MemoryMap[resourceName]
+        nodeResourceMemoryState := machineState[nodeID].MemoryMap[resourceName]
         if nodeResourceMemoryState.Free <= 0 {
             continue
         }
@@ -197,12 +197,12 @@ func (p *staticPolicy) RemoveContainer(s state.State, podUID string, containerNa
     machineState := s.GetMachineState()
     for _, b := range blocks {
         releasedSize := b.Size
-        for _, nodeId := range b.NUMAAffinity {
-            machineState[nodeId].NumberOfAssignments--
+        for _, nodeID := range b.NUMAAffinity {
+            machineState[nodeID].NumberOfAssignments--
 
             // once we do not have any memory allocations on this node, clear node groups
-            if machineState[nodeId].NumberOfAssignments == 0 {
-                machineState[nodeId].Nodes = []int{nodeId}
+            if machineState[nodeID].NumberOfAssignments == 0 {
+                machineState[nodeID].Nodes = []int{nodeID}
             }
 
             // we still need to pass over all NUMA node under the affinity mask to update them
@@ -210,7 +210,7 @@ func (p *staticPolicy) RemoveContainer(s state.State, podUID string, containerNa
                 continue
             }
 
-            nodeResourceMemoryState := machineState[nodeId].MemoryMap[b.Type]
+            nodeResourceMemoryState := machineState[nodeID].MemoryMap[b.Type]
 
             // if the node does not have reserved memory to free, continue to the next node
             if nodeResourceMemoryState.Reserved == 0 {
@@ -238,6 +238,110 @@ func (p *staticPolicy) RemoveContainer(s state.State, podUID string, containerNa
     return nil
 }
 
+func regenerateHints(pod *v1.Pod, ctn *v1.Container, ctnBlocks []state.Block, reqRsrc map[v1.ResourceName]uint64) map[string][]topologymanager.TopologyHint {
+    hints := map[string][]topologymanager.TopologyHint{}
+    for resourceName := range reqRsrc {
+        hints[string(resourceName)] = []topologymanager.TopologyHint{}
+    }
+
+    if len(ctnBlocks) != len(reqRsrc) {
+        klog.Errorf("[memorymanager] The number of requested resources by the container %s differs from the number of memory blocks", ctn.Name)
+        return nil
+    }
+
+    for _, b := range ctnBlocks {
+        if _, ok := reqRsrc[b.Type]; !ok {
+            klog.Errorf("[memorymanager] Container %s requested resources do not have resource of type %s", ctn.Name, b.Type)
+            return nil
+        }
+
+        if b.Size != reqRsrc[b.Type] {
+            klog.Errorf("[memorymanager] Memory %s already allocated to (pod %v, container %v) with different number than request: requested: %d, allocated: %d", b.Type, pod.UID, ctn.Name, reqRsrc[b.Type], b.Size)
+            return nil
+        }
+
+        containerNUMAAffinity, err := bitmask.NewBitMask(b.NUMAAffinity...)
+        if err != nil {
+            klog.Errorf("[memorymanager] failed to generate NUMA bitmask: %v", err)
+            return nil
+        }
+
+        klog.Infof("[memorymanager] Regenerating TopologyHints, %s was already allocated to (pod %v, container %v)", b.Type, pod.UID, ctn.Name)
+        hints[string(b.Type)] = append(hints[string(b.Type)], topologymanager.TopologyHint{
+            NUMANodeAffinity: containerNUMAAffinity,
+            Preferred:        true,
+        })
+    }
+    return hints
+}
+
+func getPodRequestedResources(pod *v1.Pod) (map[v1.ResourceName]uint64, error) {
+    reqRsrcsByInitCtrs := make(map[v1.ResourceName]uint64)
+    reqRsrcsByAppCtrs := make(map[v1.ResourceName]uint64)
+
+    for _, ctr := range pod.Spec.InitContainers {
+        reqRsrcs, err := getRequestedResources(&ctr)
+
+        if err != nil {
+            return nil, err
+        }
+        for rsrcName, qty := range reqRsrcs {
+            if _, ok := reqRsrcsByInitCtrs[rsrcName]; !ok {
+                reqRsrcsByInitCtrs[rsrcName] = uint64(0)
+            }
+
+            if reqRsrcs[rsrcName] > reqRsrcsByInitCtrs[rsrcName] {
+                reqRsrcsByInitCtrs[rsrcName] = qty
+            }
+        }
+    }
+
+    for _, ctr := range pod.Spec.Containers {
+        reqRsrcs, err := getRequestedResources(&ctr)
+
+        if err != nil {
+            return nil, err
+        }
+        for rsrcName, qty := range reqRsrcs {
+            if _, ok := reqRsrcsByAppCtrs[rsrcName]; !ok {
+                reqRsrcsByAppCtrs[rsrcName] = uint64(0)
+            }
+
+            reqRsrcsByAppCtrs[rsrcName] += qty
+        }
+    }
+
+    for rsrcName := range reqRsrcsByAppCtrs {
+        if reqRsrcsByInitCtrs[rsrcName] > reqRsrcsByAppCtrs[rsrcName] {
+            reqRsrcsByAppCtrs[rsrcName] = reqRsrcsByInitCtrs[rsrcName]
+        }
+    }
+    return reqRsrcsByAppCtrs, nil
+}
+
+func (p *staticPolicy) GetPodTopologyHints(s state.State, pod *v1.Pod) map[string][]topologymanager.TopologyHint {
+    if v1qos.GetPodQOS(pod) != v1.PodQOSGuaranteed {
+        return nil
+    }
+
+    reqRsrcs, err := getPodRequestedResources(pod)
+    if err != nil {
+        klog.Error(err.Error())
+        return nil
+    }
+
+    for _, ctn := range append(pod.Spec.InitContainers, pod.Spec.Containers...) {
+        containerBlocks := s.GetMemoryBlocks(string(pod.UID), ctn.Name)
+        // Short circuit to regenerate the same hints if there are already
+        // memory allocated for the container. This might happen after a
+        // kubelet restart, for example.
+        if containerBlocks != nil {
+            return regenerateHints(pod, &ctn, containerBlocks, reqRsrcs)
+        }
+    }
+    return p.calculateHints(s, reqRsrcs)
+}
+
 // GetTopologyHints implements the topologymanager.HintProvider Interface
 // and is consulted to achieve NUMA aware resource alignment among this
 // and other resource controllers.
@@ -252,45 +356,12 @@ func (p *staticPolicy) GetTopologyHints(s state.State, pod *v1.Pod, container *v
         return nil
     }
 
-    hints := map[string][]topologymanager.TopologyHint{}
-    for resourceName := range requestedResources {
-        hints[string(resourceName)] = []topologymanager.TopologyHint{}
-    }
-
     containerBlocks := s.GetMemoryBlocks(string(pod.UID), container.Name)
     // Short circuit to regenerate the same hints if there are already
    // memory allocated for the container. This might happen after a
     // kubelet restart, for example.
     if containerBlocks != nil {
-        if len(containerBlocks) != len(requestedResources) {
-            klog.Errorf("[memorymanager] The number of requested resources by the container %s differs from the number of memory blocks", container.Name)
-            return nil
-        }
-
-        for _, b := range containerBlocks {
-            if _, ok := requestedResources[b.Type]; !ok {
-                klog.Errorf("[memorymanager] Container %s requested resources do not have resource of type %s", container.Name, b.Type)
-                return nil
-            }
-
-            if b.Size != requestedResources[b.Type] {
-                klog.Errorf("[memorymanager] Memory %s already allocated to (pod %v, container %v) with different number than request: requested: %d, allocated: %d", b.Type, pod.UID, container.Name, requestedResources[b.Type], b.Size)
-                return nil
-            }
-
-            containerNUMAAffinity, err := bitmask.NewBitMask(b.NUMAAffinity...)
-            if err != nil {
-                klog.Errorf("[memorymanager] failed to generate NUMA bitmask: %v", err)
-                return nil
-            }
-
-            klog.Infof("[memorymanager] Regenerating TopologyHints, %s was already allocated to (pod %v, container %v)", b.Type, pod.UID, container.Name)
-            hints[string(b.Type)] = append(hints[string(b.Type)], topologymanager.TopologyHint{
-                NUMANodeAffinity: containerNUMAAffinity,
-                Preferred:        true,
-            })
-        }
-        return hints
+        return regenerateHints(pod, container, containerBlocks, requestedResources)
     }
 
     return p.calculateHints(s, requestedResources)
@@ -503,10 +574,10 @@ func areMachineStatesEqual(ms1, ms2 state.NodeMap) bool {
         return false
     }
 
-    for nodeId, nodeState1 := range ms1 {
-        nodeState2, ok := ms2[nodeId]
+    for nodeID, nodeState1 := range ms1 {
+        nodeState2, ok := ms2[nodeID]
         if !ok {
-            klog.Errorf("[memorymanager] node state does not have node ID %d", nodeId)
+            klog.Errorf("[memorymanager] node state does not have node ID %d", nodeID)
             return false
         }
 
@@ -533,7 +604,7 @@ func areMachineStatesEqual(ms1, ms2 state.NodeMap) bool {
         }
 
         if !reflect.DeepEqual(*memoryState1, *memoryState2) {
-            klog.Errorf("[memorymanager] memory states for the NUMA node %d and the resource %s are different: %+v != %+v", nodeId, resourceName, *memoryState1, *memoryState2)
+            klog.Errorf("[memorymanager] memory states for the NUMA node %d and the resource %s are different: %+v != %+v", nodeID, resourceName, *memoryState1, *memoryState2)
             return false
         }
     }
@@ -590,9 +661,9 @@ func (p *staticPolicy) getDefaultMachineState() state.NodeMap {
     return defaultMachineState
 }
 
-func (p *staticPolicy) getResourceSystemReserved(nodeId int, resourceName v1.ResourceName) uint64 {
+func (p *staticPolicy) getResourceSystemReserved(nodeID int, resourceName v1.ResourceName) uint64 {
     var systemReserved uint64
-    if nodeSystemReserved, ok := p.systemReserved[nodeId]; ok {
+    if nodeSystemReserved, ok := p.systemReserved[nodeID]; ok {
         if nodeMemorySystemReserved, ok := nodeSystemReserved[resourceName]; ok {
             systemReserved = nodeMemorySystemReserved
         }
@@ -612,12 +683,12 @@ func (p *staticPolicy) getDefaultHint(s state.State, requestedResources map[v1.R
 
 func isAffinitySatisfyRequest(machineState state.NodeMap, mask bitmask.BitMask, requestedResources map[v1.ResourceName]uint64) bool {
     totalFreeSize := map[v1.ResourceName]uint64{}
-    for _, nodeId := range mask.GetBits() {
+    for _, nodeID := range mask.GetBits() {
         for resourceName := range requestedResources {
             if _, ok := totalFreeSize[resourceName]; !ok {
                 totalFreeSize[resourceName] = 0
             }
-            totalFreeSize[resourceName] += machineState[nodeId].MemoryMap[resourceName].Free
+            totalFreeSize[resourceName] += machineState[nodeID].MemoryMap[resourceName].Free
         }
     }
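As a reading aid for the getPodRequestedResources logic added above: the pod-level request for each resource is the sum over app containers, raised to the largest single init-container request when that is higher, since init containers run one at a time before the app containers start. Below is a standalone sketch of that rule under those assumptions, using plain strings and byte counts in place of v1.ResourceName and resource quantities (all names here are illustrative, not from the commit).

package main

import "fmt"

// podEffectiveRequests mirrors the aggregation rule of getPodRequestedResources:
// sum each resource over app containers, then raise the total to the largest
// single init-container request for resources the app containers also ask for.
func podEffectiveRequests(initCtrs, appCtrs []map[string]uint64) map[string]uint64 {
    // Largest request per resource across init containers.
    maxInit := map[string]uint64{}
    for _, ctr := range initCtrs {
        for name, qty := range ctr {
            if qty > maxInit[name] {
                maxInit[name] = qty
            }
        }
    }

    // Sum per resource across app containers.
    total := map[string]uint64{}
    for _, ctr := range appCtrs {
        for name, qty := range ctr {
            total[name] += qty
        }
    }

    // Raise each app-container total to the init-container maximum if needed.
    for name := range total {
        if maxInit[name] > total[name] {
            total[name] = maxInit[name]
        }
    }
    return total
}

func main() {
    initCtrs := []map[string]uint64{{"memory": 512 << 20}}                       // one init container requesting 512Mi
    appCtrs := []map[string]uint64{{"memory": 128 << 20}, {"memory": 256 << 20}} // app containers: 128Mi + 256Mi = 384Mi
    // The init container dominates, so hints must be generated for 512Mi of memory.
    fmt.Println(podEffectiveRequests(initCtrs, appCtrs)["memory"]>>20, "Mi")
}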