memory manager: re-use the memory allocated for init containers
During the allocation phase, the calls to `Allocate` and `GetTopologyHints` now take the init containers' reusable memory into account: that memory is re-used for the following containers and the container memory blocks are updated accordingly. For example, for a pod with two init containers that requested 1Gi and 2Gi, and an app container that requested 4Gi, we can re-use 2Gi of memory.

Signed-off-by: Artyom Lukianov <alukiano@redhat.com>
This commit is contained in: parent 5e3bed6399, commit b965502c49
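To make the arithmetic from the commit message concrete, here is a minimal, self-contained Go sketch of the re-use idea (the variable names and hard-coded sizes are hypothetical, not part of this change): init containers run one after another, so the pool of reusable memory only has to grow to the largest init request, and the app container draws from that pool before any fresh memory is allocated.

```go
package main

import "fmt"

func main() {
	const gi uint64 = 1024 * 1024 * 1024

	initRequests := []uint64{1 * gi, 2 * gi} // two init containers: 1Gi and 2Gi
	appRequest := 4 * gi                     // app container: 4Gi

	// Init containers run sequentially, so memory handed to a finished init
	// container can be handed to the next one. The reusable pool only has to
	// grow to the largest init request seen so far.
	var reusable, allocated uint64
	for _, req := range initRequests {
		if req > reusable {
			allocated += req - reusable // grow the pool for this init container
			reusable = req
		}
	}

	// The app container consumes the reusable pool first; only the remainder
	// is newly allocated.
	newForApp := appRequest
	if reusable >= newForApp {
		newForApp = 0
	} else {
		newForApp -= reusable
	}
	allocated += newForApp

	fmt.Printf("re-used: %dGi, total allocated to the pod: %dGi\n", reusable/gi, allocated/gi)
	// Output: re-used: 2Gi, total allocated to the pod: 4Gi
}
```

Without the re-use, the pod would have pinned 1Gi + 2Gi + 4Gi = 7Gi; with it, only 4Gi, matching the 2Gi of re-used memory mentioned in the commit message.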
@@ -36,6 +36,7 @@ import (
 const policyTypeStatic policyType = "Static"
 
 type systemReservedMemory map[int]map[v1.ResourceName]uint64
+type reusableMemory map[string]map[string]map[v1.ResourceName]uint64
 
 // staticPolicy is implementation of the policy interface for the static policy
 type staticPolicy struct {
@@ -45,6 +46,8 @@ type staticPolicy struct {
 	systemReserved systemReservedMemory
 	// topology manager reference to get container Topology affinity
 	affinity topologymanager.Store
+	// initContainersReusableMemory contains the memory allocated for init containers that can be reused
+	initContainersReusableMemory reusableMemory
 }
 
 var _ Policy = &staticPolicy{}
@@ -65,9 +68,10 @@ func NewPolicyStatic(machineInfo *cadvisorapi.MachineInfo, reserved systemReserv
 	}
 
 	return &staticPolicy{
-		machineInfo:    machineInfo,
-		systemReserved: reserved,
-		affinity:       affinity,
+		machineInfo:                  machineInfo,
+		systemReserved:               reserved,
+		affinity:                     affinity,
+		initContainersReusableMemory: reusableMemory{},
 	}, nil
 }
 
@@ -90,14 +94,17 @@ func (p *staticPolicy) Allocate(s state.State, pod *v1.Pod, container *v1.Contai
 		return nil
 	}
 
+	podUID := string(pod.UID)
 	klog.InfoS("Allocate", "pod", klog.KObj(pod), "containerName", container.Name)
-	if blocks := s.GetMemoryBlocks(string(pod.UID), container.Name); blocks != nil {
+	if blocks := s.GetMemoryBlocks(podUID, container.Name); blocks != nil {
+		p.updatePodReusableMemory(pod, container, blocks)
+
 		klog.InfoS("Container already present in state, skipping", "pod", klog.KObj(pod), "containerName", container.Name)
 		return nil
 	}
 
 	// Call Topology Manager to get the aligned affinity across all hint providers.
-	hint := p.affinity.GetAffinity(string(pod.UID), container.Name)
+	hint := p.affinity.GetAffinity(podUID, container.Name)
 	klog.InfoS("Got topology affinity", "pod", klog.KObj(pod), "podUID", pod.UID, "containerName", container.Name, "hint", hint)
 
 	requestedResources, err := getRequestedResources(container)
@@ -105,11 +112,12 @@ func (p *staticPolicy) Allocate(s state.State, pod *v1.Pod, container *v1.Contai
 		return err
 	}
 
+	machineState := s.GetMachineState()
 	bestHint := &hint
 	// topology manager returned the hint with NUMA affinity nil
 	// we should use the default NUMA affinity calculated the same way as for the topology manager
 	if hint.NUMANodeAffinity == nil {
-		defaultHint, err := p.getDefaultHint(s, requestedResources)
+		defaultHint, err := p.getDefaultHint(machineState, pod, requestedResources)
 		if err != nil {
 			return err
 		}
@@ -120,12 +128,10 @@ func (p *staticPolicy) Allocate(s state.State, pod *v1.Pod, container *v1.Contai
 		bestHint = defaultHint
 	}
 
-	machineState := s.GetMachineState()
-
 	// topology manager returns the hint that does not satisfy completely the container request
 	// we should extend this hint to the one who will satisfy the request and include the current hint
 	if !isAffinitySatisfyRequest(machineState, bestHint.NUMANodeAffinity, requestedResources) {
-		extendedHint, err := p.extendTopologyManagerHint(s, requestedResources, bestHint.NUMANodeAffinity)
+		extendedHint, err := p.extendTopologyManagerHint(machineState, pod, requestedResources, bestHint.NUMANodeAffinity)
 		if err != nil {
 			return err
 		}
@@ -146,43 +152,78 @@ func (p *staticPolicy) Allocate(s state.State, pod *v1.Pod, container *v1.Contai
 			Type:         resourceName,
 		})
 
-		// Update nodes memory state
-		for _, nodeID := range maskBits {
-			machineState[nodeID].NumberOfAssignments++
-			machineState[nodeID].Cells = maskBits
-
-			// we need to continue to update all affinity mask nodes
-			if requestedSize == 0 {
-				continue
-			}
-
-			// update the node memory state
-			nodeResourceMemoryState := machineState[nodeID].MemoryMap[resourceName]
-			if nodeResourceMemoryState.Free <= 0 {
-				continue
-			}
-
-			// the node has enough memory to satisfy the request
-			if nodeResourceMemoryState.Free >= requestedSize {
-				nodeResourceMemoryState.Reserved += requestedSize
-				nodeResourceMemoryState.Free -= requestedSize
-				requestedSize = 0
-				continue
-			}
-
-			// the node does not have enough memory, use the node remaining memory and move to the next node
-			requestedSize -= nodeResourceMemoryState.Free
-			nodeResourceMemoryState.Reserved += nodeResourceMemoryState.Free
-			nodeResourceMemoryState.Free = 0
+		podReusableMemory := p.getPodReusableMemory(pod, bestHint.NUMANodeAffinity, resourceName)
+		if podReusableMemory >= requestedSize {
+			requestedSize = 0
+		} else {
+			requestedSize -= podReusableMemory
 		}
+
+		// Update nodes memory state
+		p.updateMachineState(machineState, maskBits, resourceName, requestedSize)
 	}
 
+	p.updatePodReusableMemory(pod, container, containerBlocks)
+
 	s.SetMachineState(machineState)
-	s.SetMemoryBlocks(string(pod.UID), container.Name, containerBlocks)
+	s.SetMemoryBlocks(podUID, container.Name, containerBlocks)
+
+	// update init containers memory blocks to reflect the fact that we re-used init containers memory
+	// it possible that the size of the init container memory block will have 0 value, when all memory
+	// allocated for it was re-used
+	// we only do this so that the sum(memory_for_all_containers) == total amount of allocated memory to the pod, even
+	// though the fine state here doesn't accurately reflect what was (in reality) allocated to each container
+	// TODO: we should refactor our state structs to reflect the amount of the re-used memory
+	p.updateInitContainersMemoryBlocks(s, pod, container, containerBlocks)
 
 	return nil
 }
 
+func (p *staticPolicy) updateMachineState(machineState state.NUMANodeMap, numaAffinity []int, resourceName v1.ResourceName, requestedSize uint64) {
+	for _, nodeID := range numaAffinity {
+		machineState[nodeID].NumberOfAssignments++
+		machineState[nodeID].Cells = numaAffinity
+
+		// we need to continue to update all affinity mask nodes
+		if requestedSize == 0 {
+			continue
+		}
+
+		// update the node memory state
+		nodeResourceMemoryState := machineState[nodeID].MemoryMap[resourceName]
+		if nodeResourceMemoryState.Free <= 0 {
+			continue
+		}
+
+		// the node has enough memory to satisfy the request
+		if nodeResourceMemoryState.Free >= requestedSize {
+			nodeResourceMemoryState.Reserved += requestedSize
+			nodeResourceMemoryState.Free -= requestedSize
+			requestedSize = 0
+			continue
+		}
+
+		// the node does not have enough memory, use the node remaining memory and move to the next node
+		requestedSize -= nodeResourceMemoryState.Free
+		nodeResourceMemoryState.Reserved += nodeResourceMemoryState.Free
+		nodeResourceMemoryState.Free = 0
+	}
+}
+
+func (p *staticPolicy) getPodReusableMemory(pod *v1.Pod, numaAffinity bitmask.BitMask, resourceName v1.ResourceName) uint64 {
+	podReusableMemory, ok := p.initContainersReusableMemory[string(pod.UID)]
+	if !ok {
+		return 0
+	}
+
+	numaReusableMemory, ok := podReusableMemory[numaAffinity.String()]
+	if !ok {
+		return 0
+	}
+
+	return numaReusableMemory[resourceName]
+}
+
 // RemoveContainer call is idempotent
 func (p *staticPolicy) RemoveContainer(s state.State, podUID string, containerName string) error {
 	klog.InfoS("RemoveContainer", "podUID", podUID, "containerName", containerName)
@@ -339,7 +380,9 @@ func (p *staticPolicy) GetPodTopologyHints(s state.State, pod *v1.Pod) map[strin
 			return regenerateHints(pod, &ctn, containerBlocks, reqRsrcs)
 		}
 	}
-	return p.calculateHints(s, reqRsrcs)
+
+	// the pod topology hints calculated only once for all containers, so no need to pass re-usable state
+	return p.calculateHints(s.GetMachineState(), pod, reqRsrcs)
 }
 
 // GetTopologyHints implements the topologymanager.HintProvider Interface
@@ -364,7 +407,7 @@ func (p *staticPolicy) GetTopologyHints(s state.State, pod *v1.Pod, container *v
 		return regenerateHints(pod, container, containerBlocks, requestedResources)
 	}
 
-	return p.calculateHints(s, requestedResources)
+	return p.calculateHints(s.GetMachineState(), pod, requestedResources)
 }
 
 func getRequestedResources(container *v1.Container) (map[v1.ResourceName]uint64, error) {
@@ -382,8 +425,7 @@ func getRequestedResources(container *v1.Container) (map[v1.ResourceName]uint64,
 	return requestedResources, nil
 }
 
-func (p *staticPolicy) calculateHints(s state.State, requestedResources map[v1.ResourceName]uint64) map[string][]topologymanager.TopologyHint {
-	machineState := s.GetMachineState()
+func (p *staticPolicy) calculateHints(machineState state.NUMANodeMap, pod *v1.Pod, requestedResources map[v1.ResourceName]uint64) map[string][]topologymanager.TopologyHint {
 	var numaNodes []int
 	for n := range machineState {
 		numaNodes = append(numaNodes, n)
@@ -447,7 +489,8 @@ func (p *staticPolicy) calculateHints(s state.State, requestedResources map[v1.R
 
 		// verify that for all memory types the node mask has enough free resources
 		for resourceName, requestedSize := range requestedResources {
-			if totalFreeSize[resourceName] < requestedSize {
+			podReusableMemory := p.getPodReusableMemory(pod, mask, resourceName)
+			if totalFreeSize[resourceName]+podReusableMemory < requestedSize {
 				return
 			}
 		}
@@ -671,8 +714,8 @@ func (p *staticPolicy) getResourceSystemReserved(nodeID int, resourceName v1.Res
 	return systemReserved
 }
 
-func (p *staticPolicy) getDefaultHint(s state.State, requestedResources map[v1.ResourceName]uint64) (*topologymanager.TopologyHint, error) {
-	hints := p.calculateHints(s, requestedResources)
+func (p *staticPolicy) getDefaultHint(machineState state.NUMANodeMap, pod *v1.Pod, requestedResources map[v1.ResourceName]uint64) (*topologymanager.TopologyHint, error) {
+	hints := p.calculateHints(machineState, pod, requestedResources)
 	if len(hints) < 1 {
 		return nil, fmt.Errorf("[memorymanager] failed to get the default NUMA affinity, no NUMA nodes with enough memory is available")
 	}
@@ -706,8 +749,8 @@ func isAffinitySatisfyRequest(machineState state.NUMANodeMap, mask bitmask.BitMa
 // the topology manager uses bitwise AND to merge all topology hints into the best one, so in case of the restricted policy,
 // it possible that we will get the subset of hint that we provided to the topology manager, in this case we want to extend
 // it to the original one
-func (p *staticPolicy) extendTopologyManagerHint(s state.State, requestedResources map[v1.ResourceName]uint64, mask bitmask.BitMask) (*topologymanager.TopologyHint, error) {
-	hints := p.calculateHints(s, requestedResources)
+func (p *staticPolicy) extendTopologyManagerHint(machineState state.NUMANodeMap, pod *v1.Pod, requestedResources map[v1.ResourceName]uint64, mask bitmask.BitMask) (*topologymanager.TopologyHint, error) {
+	hints := p.calculateHints(machineState, pod, requestedResources)
 
 	var filteredHints []topologymanager.TopologyHint
 	// hints for all memory types should be the same, so we will check hints only for regular memory type
@@ -742,7 +785,8 @@ func isHintInGroup(hint []int, group []int) bool {
 		}
 		hintIndex++
 	}
-	return false
+
+	return hintIndex == len(hint)
 }
 
 func findBestHint(hints []topologymanager.TopologyHint) *topologymanager.TopologyHint {
@@ -788,3 +832,121 @@ func (p *staticPolicy) GetAllocatableMemory(s state.State) []state.Block {
 	}
 	return allocatableMemory
 }
+
+func (p *staticPolicy) updatePodReusableMemory(pod *v1.Pod, container *v1.Container, memoryBlocks []state.Block) {
+	podUID := string(pod.UID)
+
+	// If pod entries to m.initContainersReusableMemory other than the current pod exist, delete them.
+	for uid := range p.initContainersReusableMemory {
+		if podUID != uid {
+			delete(p.initContainersReusableMemory, podUID)
+		}
+	}
+
+	if isInitContainer(pod, container) {
+		if _, ok := p.initContainersReusableMemory[podUID]; !ok {
+			p.initContainersReusableMemory[podUID] = map[string]map[v1.ResourceName]uint64{}
+		}
+
+		for _, block := range memoryBlocks {
+			blockBitMask, _ := bitmask.NewBitMask(block.NUMAAffinity...)
+			blockBitMaskString := blockBitMask.String()
+
+			if _, ok := p.initContainersReusableMemory[podUID][blockBitMaskString]; !ok {
+				p.initContainersReusableMemory[podUID][blockBitMaskString] = map[v1.ResourceName]uint64{}
+			}
+
+			if blockReusableMemory := p.initContainersReusableMemory[podUID][blockBitMaskString][block.Type]; block.Size > blockReusableMemory {
+				p.initContainersReusableMemory[podUID][blockBitMaskString][block.Type] = block.Size
+			}
+		}
+
+		return
+	}
+
+	// update re-usable memory once it used by the app container
+	for _, block := range memoryBlocks {
+		blockBitMask, _ := bitmask.NewBitMask(block.NUMAAffinity...)
+		if podReusableMemory := p.getPodReusableMemory(pod, blockBitMask, block.Type); podReusableMemory != 0 {
+			if block.Size >= podReusableMemory {
+				p.initContainersReusableMemory[podUID][blockBitMask.String()][block.Type] = 0
+			} else {
+				p.initContainersReusableMemory[podUID][blockBitMask.String()][block.Type] -= block.Size
+			}
+		}
+	}
+}
+
+func (p *staticPolicy) updateInitContainersMemoryBlocks(s state.State, pod *v1.Pod, container *v1.Container, containerMemoryBlocks []state.Block) {
+	podUID := string(pod.UID)
+
+	for _, containerBlock := range containerMemoryBlocks {
+		blockSize := containerBlock.Size
+		for _, initContainer := range pod.Spec.InitContainers {
+			// we do not want to continue updates once we reach the current container
+			if initContainer.Name == container.Name {
+				break
+			}
+
+			if blockSize == 0 {
+				break
+			}
+
+			initContainerBlocks := s.GetMemoryBlocks(podUID, initContainer.Name)
+			if len(initContainerBlocks) == 0 {
+				continue
+			}
+
+			for i := range initContainerBlocks {
+				initContainerBlock := &initContainerBlocks[i]
+				if initContainerBlock.Size == 0 {
+					continue
+				}
+
+				if initContainerBlock.Type != containerBlock.Type {
+					continue
+				}
+
+				if !isNUMAAffinitiesEqual(initContainerBlock.NUMAAffinity, containerBlock.NUMAAffinity) {
+					continue
+				}
+
+				if initContainerBlock.Size > blockSize {
+					initContainerBlock.Size -= blockSize
+					blockSize = 0
+				} else {
+					blockSize -= initContainerBlock.Size
+					initContainerBlock.Size = 0
+				}
+			}
+
+			s.SetMemoryBlocks(podUID, initContainer.Name, initContainerBlocks)
+		}
+	}
+}
+
+func isInitContainer(pod *v1.Pod, container *v1.Container) bool {
+	for _, initContainer := range pod.Spec.InitContainers {
+		if initContainer.Name == container.Name {
+			return true
+		}
+	}
+
+	return false
+}
+
+func isNUMAAffinitiesEqual(numaAffinity1, numaAffinity2 []int) bool {
+	bitMask1, err := bitmask.NewBitMask(numaAffinity1...)
+	if err != nil {
+		klog.ErrorS(err, "failed to create bit mask", numaAffinity1)
+		return false
+	}
+
+	bitMask2, err := bitmask.NewBitMask(numaAffinity2...)
+	if err != nil {
+		klog.ErrorS(err, "failed to create bit mask", numaAffinity2)
+		return false
+	}
+
+	return bitMask1.IsEqual(bitMask2)
+}
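The comment block in the last Allocate hunk notes that init container blocks are shrunk only so that the per-container sizes still sum to the pod's total allocation. A minimal, hypothetical sketch of that bookkeeping (the `block` type and names below are illustrative stand-ins, not the real `state` package): for the 1Gi/2Gi/4Gi example, both init container blocks end up at zero and the sum stays 4Gi.

```go
package main

import "fmt"

// block is a hypothetical stand-in for the memory manager's state.Block,
// carrying only what the sketch needs.
type block struct {
	container string
	size      uint64
}

const gi uint64 = 1024 * 1024 * 1024

// reuseIntoAppBlock mirrors the idea behind updateInitContainersMemoryBlocks:
// the app container's block is "paid for" out of the init containers'
// recorded blocks, walked in order, so the per-container sizes still sum to
// the memory actually allocated to the pod.
func reuseIntoAppBlock(initBlocks []block, appSize uint64) []block {
	remaining := appSize
	for i := range initBlocks {
		if remaining == 0 {
			break
		}
		if initBlocks[i].size > remaining {
			initBlocks[i].size -= remaining
			remaining = 0
		} else {
			remaining -= initBlocks[i].size
			initBlocks[i].size = 0
		}
	}
	return initBlocks
}

func main() {
	initBlocks := []block{{"init-1", 1 * gi}, {"init-2", 2 * gi}}
	appSize := 4 * gi

	initBlocks = reuseIntoAppBlock(initBlocks, appSize)

	total := appSize
	for _, b := range initBlocks {
		total += b.size
		fmt.Printf("%s: %dGi\n", b.container, b.size/gi)
	}
	fmt.Printf("app: %dGi, sum over all containers: %dGi\n", appSize/gi, total/gi)
	// Output:
	// init-1: 0Gi
	// init-2: 0Gi
	// app: 4Gi, sum over all containers: 4Gi
}
```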