From 9ae499ae46fa1c08d1da9a4cd7e7dff01012aa21 Mon Sep 17 00:00:00 2001 From: Artyom Lukianov Date: Thu, 8 Oct 2020 18:47:57 +0300 Subject: [PATCH] memory manager: pass memory manager flags to the container manager Pass memory manager flags to the container manager and call all relevant memory manager methods under the container manager. Signed-off-by: Byonggon Chun --- pkg/kubelet/cm/container_manager.go | 19 ++++---- pkg/kubelet/cm/container_manager_linux.go | 47 ++++++++++++++++++- .../cm/internal_container_lifecycle.go | 9 ++++ .../cm/memorymanager/memory_manager.go | 19 ++++++-- 4 files changed, 81 insertions(+), 13 deletions(-) diff --git a/pkg/kubelet/cm/container_manager.go b/pkg/kubelet/cm/container_manager.go index ea81d6163a7..fcebd79fd75 100644 --- a/pkg/kubelet/cm/container_manager.go +++ b/pkg/kubelet/cm/container_manager.go @@ -19,6 +19,7 @@ package cm import ( "time" + "k8s.io/apimachinery/pkg/api/resource" "k8s.io/apimachinery/pkg/util/sets" // TODO: Migrate kubelet to either use its own internal objects or client library. v1 "k8s.io/api/core/v1" @@ -131,14 +132,16 @@ type NodeConfig struct { KubeletRootDir string ProtectKernelDefaults bool NodeAllocatableConfig - QOSReserved map[v1.ResourceName]int64 - ExperimentalCPUManagerPolicy string - ExperimentalTopologyManagerScope string - ExperimentalCPUManagerReconcilePeriod time.Duration - ExperimentalPodPidsLimit int64 - EnforceCPULimits bool - CPUCFSQuotaPeriod time.Duration - ExperimentalTopologyManagerPolicy string + QOSReserved map[v1.ResourceName]int64 + ExperimentalCPUManagerPolicy string + ExperimentalTopologyManagerScope string + ExperimentalCPUManagerReconcilePeriod time.Duration + ExperimentalMemoryManagerPolicy string + ExperimentalMemoryManagerReservedMemory map[int]map[v1.ResourceName]resource.Quantity + ExperimentalPodPidsLimit int64 + EnforceCPULimits bool + CPUCFSQuotaPeriod time.Duration + ExperimentalTopologyManagerPolicy string } type NodeAllocatableConfig struct { diff --git a/pkg/kubelet/cm/container_manager_linux.go b/pkg/kubelet/cm/container_manager_linux.go index 8db1638d179..a514ce68c43 100644 --- a/pkg/kubelet/cm/container_manager_linux.go +++ b/pkg/kubelet/cm/container_manager_linux.go @@ -53,6 +53,7 @@ import ( "k8s.io/kubernetes/pkg/kubelet/cm/containermap" "k8s.io/kubernetes/pkg/kubelet/cm/cpumanager" "k8s.io/kubernetes/pkg/kubelet/cm/devicemanager" + "k8s.io/kubernetes/pkg/kubelet/cm/memorymanager" "k8s.io/kubernetes/pkg/kubelet/cm/topologymanager" cmutil "k8s.io/kubernetes/pkg/kubelet/cm/util" "k8s.io/kubernetes/pkg/kubelet/config" @@ -138,6 +139,8 @@ type containerManagerImpl struct { deviceManager devicemanager.Manager // Interface for CPU affinity management. cpuManager cpumanager.Manager + // Interface for memory affinity management. + memoryManager memorymanager.Manager // Interface for Topology resource co-ordination topologyManager topologymanager.Manager } @@ -341,6 +344,22 @@ func NewContainerManager(mountUtil mount.Interface, cadvisorInterface cadvisor.I cm.topologyManager.AddHintProvider(cm.cpuManager) } + if utilfeature.DefaultFeatureGate.Enabled(kubefeatures.MemoryManager) { + cm.memoryManager, err = memorymanager.NewManager( + nodeConfig.ExperimentalMemoryManagerPolicy, + machineInfo, + cm.GetNodeAllocatableReservation(), + nodeConfig.ExperimentalMemoryManagerReservedMemory, + nodeConfig.KubeletRootDir, + cm.topologyManager, + ) + if err != nil { + klog.Errorf("failed to initialize memory manager: %v", err) + return nil, err + } + cm.topologyManager.AddHintProvider(cm.memoryManager) + } + return cm, nil } @@ -364,7 +383,7 @@ func (cm *containerManagerImpl) NewPodContainerManager() PodContainerManager { } func (cm *containerManagerImpl) InternalContainerLifecycle() InternalContainerLifecycle { - return &internalContainerLifecycleImpl{cm.cpuManager, cm.topologyManager} + return &internalContainerLifecycleImpl{cm.cpuManager, cm.memoryManager, cm.topologyManager} } // Create a cgroup container manager. @@ -606,6 +625,18 @@ func (cm *containerManagerImpl) Start(node *v1.Node, } } + // Initialize memory manager + if utilfeature.DefaultFeatureGate.Enabled(kubefeatures.MemoryManager) { + containerMap, err := buildContainerMapFromRuntime(runtimeService) + if err != nil { + return fmt.Errorf("failed to build map of initial containers from runtime: %v", err) + } + err = cm.memoryManager.Start(memorymanager.ActivePodsFunc(activePods), sourcesReady, podStatusProvider, runtimeService, containerMap) + if err != nil { + return fmt.Errorf("start memory manager error: %v", err) + } + } + // cache the node Info including resource capacity and // allocatable of the node cm.nodeInfo = node @@ -706,11 +737,12 @@ func (cm *containerManagerImpl) GetAllocateResourcesPodAdmitHandler() lifecycle. // work as we add more and more hint providers that the TopologyManager // needs to call Allocate() on (that may not be directly intstantiated // inside this component). - return &resourceAllocator{cm.cpuManager, cm.deviceManager} + return &resourceAllocator{cm.cpuManager, cm.memoryManager, cm.deviceManager} } type resourceAllocator struct { cpuManager cpumanager.Manager + memoryManager memorymanager.Manager deviceManager devicemanager.Manager } @@ -737,6 +769,17 @@ func (m *resourceAllocator) Admit(attrs *lifecycle.PodAdmitAttributes) lifecycle } } } + + if m.memoryManager != nil { + err = m.memoryManager.Allocate(pod, &container) + if err != nil { + return lifecycle.PodAdmitResult{ + Message: fmt.Sprintf("Allocate failed due to %v, which is unexpected", err), + Reason: "UnexpectedAdmissionError", + Admit: false, + } + } + } } return lifecycle.PodAdmitResult{Admit: true} diff --git a/pkg/kubelet/cm/internal_container_lifecycle.go b/pkg/kubelet/cm/internal_container_lifecycle.go index 0635ea0ed4f..0d3a3357b06 100644 --- a/pkg/kubelet/cm/internal_container_lifecycle.go +++ b/pkg/kubelet/cm/internal_container_lifecycle.go @@ -22,6 +22,7 @@ import ( runtimeapi "k8s.io/cri-api/pkg/apis/runtime/v1alpha2" kubefeatures "k8s.io/kubernetes/pkg/features" "k8s.io/kubernetes/pkg/kubelet/cm/cpumanager" + "k8s.io/kubernetes/pkg/kubelet/cm/memorymanager" "k8s.io/kubernetes/pkg/kubelet/cm/topologymanager" ) @@ -35,6 +36,7 @@ type InternalContainerLifecycle interface { // Implements InternalContainerLifecycle interface. type internalContainerLifecycleImpl struct { cpuManager cpumanager.Manager + memoryManager memorymanager.Manager topologyManager topologymanager.Manager } @@ -43,6 +45,13 @@ func (i *internalContainerLifecycleImpl) PreStartContainer(pod *v1.Pod, containe i.cpuManager.AddContainer(pod, container, containerID) } + if i.memoryManager != nil { + err := i.memoryManager.AddContainer(pod, container, containerID) + if err != nil { + return err + } + } + if utilfeature.DefaultFeatureGate.Enabled(kubefeatures.TopologyManager) { err := i.topologyManager.AddContainer(pod, containerID) if err != nil { diff --git a/pkg/kubelet/cm/memorymanager/memory_manager.go b/pkg/kubelet/cm/memorymanager/memory_manager.go index b242f483e22..b298ca384d9 100644 --- a/pkg/kubelet/cm/memorymanager/memory_manager.go +++ b/pkg/kubelet/cm/memorymanager/memory_manager.go @@ -25,6 +25,7 @@ import ( cadvisorapi "github.com/google/cadvisor/info/v1" v1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/resource" runtimeapi "k8s.io/cri-api/pkg/apis/runtime/v1alpha2" "k8s.io/klog/v2" "k8s.io/kubernetes/pkg/kubelet/cm/containermap" @@ -113,7 +114,7 @@ type manager struct { var _ Manager = &manager{} // NewManager returns new instance of the memory manager -func NewManager(policyName string, machineInfo *cadvisorapi.MachineInfo, nodeAllocatableReservation v1.ResourceList, stateFileDirectory string, affinity topologymanager.Store) (Manager, error) { +func NewManager(policyName string, machineInfo *cadvisorapi.MachineInfo, nodeAllocatableReservation v1.ResourceList, reservedMemory map[int]map[v1.ResourceName]resource.Quantity, stateFileDirectory string, affinity topologymanager.Store) (Manager, error) { var policy Policy switch policyType(policyName) { @@ -122,10 +123,11 @@ func NewManager(policyName string, machineInfo *cadvisorapi.MachineInfo, nodeAll policy = NewPolicyNone() case policyTypeStatic: - reserved, err := getReservedMemory(machineInfo, nodeAllocatableReservation) + reserved, err := getReservedMemory(machineInfo, nodeAllocatableReservation, reservedMemory) if err != nil { return nil, err } + policy, err = NewPolicyStatic(machineInfo, reserved, affinity) if err != nil { return nil, err @@ -307,10 +309,21 @@ func (m *manager) policyRemoveContainerByRef(podUID string, containerName string return err } -func getReservedMemory(machineInfo *cadvisorapi.MachineInfo, nodeAllocatableReservation v1.ResourceList) (systemReservedMemory, error) { +func validateReservedMemory(nodeAllocatableReservation v1.ResourceList, reservedMemory map[int]map[v1.ResourceName]resource.Quantity) error { + // TODO: this will check equality of total reserved memory by node allocatable feature and total pre-reserved memory + + return nil +} + +func getReservedMemory(machineInfo *cadvisorapi.MachineInfo, nodeAllocatableReservation v1.ResourceList, reservedMemory map[int]map[v1.ResourceName]resource.Quantity) (systemReservedMemory, error) { // TODO: we should add new kubelet parameter, and to get reserved memory per NUMA node from it // currently we use kube-reserved + system-reserved + eviction reserve for each NUMA node, that creates memory over-consumption // and no reservation for huge pages + + if err := validateReservedMemory(nodeAllocatableReservation, reservedMemory); err != nil { + return nil, err + } + reserved := systemReservedMemory{} for _, node := range machineInfo.Topology { memory := nodeAllocatableReservation[v1.ResourceMemory]