memory manager: implement the manager interface methods

This commit adds implementations of the following Manager interface methods:
- Start
- AddContainer
- Allocate
- RemoveContainer
- State
- GetTopologyHints

Signed-off-by: Artyom Lukianov <alukiano@redhat.com>
Artyom Lukianov 2020-03-19 12:05:18 +02:00
parent d0caec90e2
commit 95f81372e2
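
Taken together, the methods follow the same admission lifecycle as the CPU manager: GetTopologyHints is consulted during pod admission, Allocate reserves memory before the container starts, AddContainer pins the created container to the reserved NUMA nodes, and RemoveContainer frees the reservation. A minimal sketch of how the kubelet side might drive that lifecycle (all inputs such as machineInfo, reservation, and affinityStore are hypothetical stand-ins, and "static" is assumed to match the package's static policy name):

	mgr, err := memorymanager.NewManager("static", machineInfo, reservation, stateDir, affinityStore)
	if err != nil {
		return err
	}
	if err := mgr.Start(activePods, sourcesReady, podStatusProvider, runtime, initialContainers); err != nil {
		return err
	}

	// Pod admission: the topology manager merges these hints before Allocate runs.
	_ = mgr.GetTopologyHints(pod, &pod.Spec.Containers[0])
	if err := mgr.Allocate(pod, &pod.Spec.Containers[0]); err != nil {
		return err // admission of the pod fails
	}

	// Once the runtime has created the container:
	if err := mgr.AddContainer(pod, &pod.Spec.Containers[0], containerID); err != nil {
		return err
	}

	// After the container is killed or deleted:
	_ = mgr.RemoveContainer(containerID)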


@@ -17,13 +17,16 @@ limitations under the License.
package memorymanager

import (
	"fmt"
	"strconv"
	"strings"
	"sync"
	"time"

	cadvisorapi "github.com/google/cadvisor/info/v1"
	v1 "k8s.io/api/core/v1"
	runtimeapi "k8s.io/cri-api/pkg/apis/runtime/v1alpha2"
	"k8s.io/klog/v2"
	"k8s.io/kubernetes/pkg/kubelet/cm/containermap"
	"k8s.io/kubernetes/pkg/kubelet/cm/memorymanager/state"
	"k8s.io/kubernetes/pkg/kubelet/cm/topologymanager"
@@ -34,7 +37,7 @@ import (
// memoryManagerStateFileName is the file name where memory manager stores its state
const memoryManagerStateFileName = "memory_manager_state"

// ActivePodsFunc is a function that returns a list of active pods.
type ActivePodsFunc func() []*v1.Pod

type runtimeService interface {
@@ -61,8 +64,7 @@ type Manager interface {
	Allocate(pod *v1.Pod, container *v1.Container) error

	// RemoveContainer is called after Kubelet decides to kill or delete a
	// container. After this call, any memory allocated to the container is freed.
	RemoveContainer(containerID string) error

	// State returns a read-only interface to the internal memory manager state.
@@ -72,11 +74,6 @@ type Manager interface {
	// and is consulted to achieve NUMA aware resource alignment among this
	// and other resource controllers.
	GetTopologyHints(*v1.Pod, *v1.Container) map[string][]topologymanager.TopologyHint
}

type manager struct {
@@ -116,28 +113,132 @@ type manager struct {
var _ Manager = &manager{}

// NewManager returns a new instance of the memory manager.
func NewManager(policyName string, machineInfo *cadvisorapi.MachineInfo, nodeAllocatableReservation v1.ResourceList, stateFileDirectory string, affinity topologymanager.Store) (Manager, error) {
	var policy Policy

	switch policyType(policyName) {
	case policyTypeNone:
		policy = NewPolicyNone()

	case policyTypeStatic:
		reserved, err := getReservedMemory(machineInfo, nodeAllocatableReservation)
		if err != nil {
			return nil, err
		}

		policy, err = NewPolicyStatic(machineInfo, reserved, affinity)
		if err != nil {
			return nil, err
		}

	default:
		return nil, fmt.Errorf("unknown policy: \"%s\"", policyName)
	}

	manager := &manager{
		policy:                     policy,
		nodeAllocatableReservation: nodeAllocatableReservation,
		stateFileDirectory:         stateFileDirectory,
	}
	manager.sourcesReady = &sourcesReadyStub{}
	return manager, nil
}
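
// For illustration only (hypothetical call, not part of this change): a kubelet
// configured with the static policy and a 1Gi memory reservation could construct
// the manager roughly as
//
//	reservation := v1.ResourceList{v1.ResourceMemory: resource.MustParse("1Gi")}
//	mgr, err := NewManager("static", machineInfo, reservation, "/var/lib/kubelet", affinityStore)
//
// where machineInfo comes from cadvisor, affinityStore from the topology manager,
// resource is k8s.io/apimachinery/pkg/api/resource, and "static" is assumed to be
// the value of policyTypeStatic.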

// Start starts the memory manager under the kubelet and calls the policy Start.
func (m *manager) Start(activePods ActivePodsFunc, sourcesReady config.SourcesReady, podStatusProvider status.PodStatusProvider, containerRuntime runtimeService, initialContainers containermap.ContainerMap) error {
	klog.Infof("[memorymanager] starting with %s policy", m.policy.Name())
	m.sourcesReady = sourcesReady
	m.activePods = activePods
	m.podStatusProvider = podStatusProvider
	m.containerRuntime = containerRuntime
	m.containerMap = initialContainers

	stateImpl, err := state.NewCheckpointState(m.stateFileDirectory, memoryManagerStateFileName, m.policy.Name(), m.containerMap)
	if err != nil {
		klog.Errorf("[memorymanager] could not initialize checkpoint manager: %v, please drain node and remove policy state file", err)
		return err
	}
	m.state = stateImpl

	err = m.policy.Start(m.state)
	if err != nil {
		klog.Errorf("[memorymanager] policy start error: %v", err)
		return err
	}

	return nil
}
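
// For example (an assumption about how state.NewCheckpointState joins its
// arguments, not verified here): with stateFileDirectory set to the kubelet
// root /var/lib/kubelet, the checkpoint above ends up in
// /var/lib/kubelet/memory_manager_state.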

// AddContainer saves the requested memory of the guaranteed pod under the state
// and sets the memory affinity according to the topology manager.
func (m *manager) AddContainer(pod *v1.Pod, container *v1.Container, containerID string) error {
	m.Lock()
	m.containerMap.Add(string(pod.UID), container.Name, containerID)
	m.Unlock()

	// Get NUMA node affinity of blocks assigned to the container during Allocate()
	var nodes []string
	for _, block := range m.state.GetMemoryBlocks(string(pod.UID), container.Name) {
		for _, nodeID := range block.NUMAAffinity {
			nodes = append(nodes, strconv.Itoa(nodeID))
		}
	}

	if len(nodes) < 1 {
		klog.V(5).Infof("[memorymanager] skipping the update of container resources, no memory blocks are assigned to the container")
		return nil
	}

	affinity := strings.Join(nodes, ",")
	klog.Infof("[memorymanager] Set container %q cpuset.mems to %q", containerID, affinity)
	err := m.containerRuntime.UpdateContainerResources(containerID, &runtimeapi.LinuxContainerResources{CpusetMems: affinity})
	if err != nil {
		klog.Errorf("[memorymanager] AddContainer: error updating cpuset.mems for container (pod: %s, container: %s, container id: %s, err: %v)", pod.Name, container.Name, containerID, err)

		m.Lock()
		err = m.policyRemoveContainerByRef(string(pod.UID), container.Name)
		if err != nil {
			klog.Errorf("[memorymanager] AddContainer rollback state error: %v", err)
		}
		m.Unlock()
	}
	return err
}
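
// For illustration (hypothetical values, not part of this change): if Allocate()
// assigned two memory blocks with NUMAAffinity [0] and [1], the loop above
// collects nodes == ["0", "1"] and the runtime pins the container to
// cpuset.mems "0,1".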

// Allocate is called to pre-allocate memory resources during Pod admission.
func (m *manager) Allocate(pod *v1.Pod, container *v1.Container) error {
	// Garbage collect any stranded resources before allocation
	m.removeStaleState()

	m.Lock()
	defer m.Unlock()

	// Call down into the policy to assign this container memory if required.
	if err := m.policy.Allocate(m.state, pod, container); err != nil {
		klog.Errorf("[memorymanager] Allocate error: %v", err)
		return err
	}
	return nil
}
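
// Note: because Allocate runs in the pod admission path, an error returned here
// is expected to fail the pod's admission rather than surface later at container
// start.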

// RemoveContainer removes the container memory allocation from the state.
func (m *manager) RemoveContainer(containerID string) error {
	m.Lock()
	defer m.Unlock()

	// An error here means the container entry no longer exists in the container map.
	podUID, containerName, err := m.containerMap.GetContainerRef(containerID)
	if err != nil {
		klog.Warningf("[memorymanager] Failed to get container %s from container map error: %v", containerID, err)
		return nil
	}

	err = m.policyRemoveContainerByRef(podUID, containerName)
	if err != nil {
		klog.Errorf("[memorymanager] RemoveContainer error: %v", err)
		return err
	}
	return nil
}

// State returns the state of the manager
@@ -147,5 +248,83 @@ func (m *manager) State() state.Reader {
// GetTopologyHints returns the topology hints for the topology manager
func (m *manager) GetTopologyHints(pod *v1.Pod, container *v1.Container) map[string][]topologymanager.TopologyHint {
	// Garbage collect any stranded resources before providing TopologyHints
	m.removeStaleState()
	// Delegate to active policy
	return m.policy.GetTopologyHints(m.state, pod, container)
}

// TODO: consider moving this method to a common manager interface; the only
// difference from the CPU manager is the assignments, which we could pass to
// the method as a parameter.
func (m *manager) removeStaleState() {
	// Only once all sources are ready do we attempt to remove any stale state.
	// This ensures that the call to `m.activePods()` below will succeed with
	// the actual active pods list.
	if !m.sourcesReady.AllReady() {
		return
	}

	// We grab the lock to ensure that no new containers will grab memory blocks
	// while executing the code below. Without this lock, it's possible that we
	// end up removing state that is newly added by an asynchronous call to
	// AddContainer() during the execution of this code.
	m.Lock()
	defer m.Unlock()

	// Get the list of active pods.
	activePods := m.activePods()

	// Build a list of (podUID, containerName) pairs for all containers in all active Pods.
	activeContainers := make(map[string]map[string]struct{})
	for _, pod := range activePods {
		activeContainers[string(pod.UID)] = make(map[string]struct{})
		for _, container := range append(pod.Spec.InitContainers, pod.Spec.Containers...) {
			activeContainers[string(pod.UID)][container.Name] = struct{}{}
		}
	}

	// Loop through the MemoryManager state. Remove any state for containers not
	// in the `activeContainers` list built above.
	assignments := m.state.GetMemoryAssignments()
	for podUID := range assignments {
		for containerName := range assignments[podUID] {
			if _, ok := activeContainers[podUID][containerName]; !ok {
				klog.Infof("[memorymanager] removeStaleState: removing (pod %s, container: %s)", podUID, containerName)
				err := m.policyRemoveContainerByRef(podUID, containerName)
				if err != nil {
					klog.Errorf("[memorymanager] removeStaleState: failed to remove (pod %s, container %s), error: %v", podUID, containerName, err)
				}
			}
		}
	}
}

func (m *manager) policyRemoveContainerByRef(podUID string, containerName string) error {
	err := m.policy.RemoveContainer(m.state, podUID, containerName)
	if err == nil {
		m.containerMap.RemoveByContainerRef(podUID, containerName)
	}
	return err
}

func getReservedMemory(machineInfo *cadvisorapi.MachineInfo, nodeAllocatableReservation v1.ResourceList) (systemReservedMemory, error) {
	// TODO: we should add a new kubelet parameter and take the reserved memory
	// per NUMA node from it. Currently we use kube-reserved + system-reserved +
	// eviction reserve for each NUMA node, which over-reserves memory and leaves
	// no reservation for huge pages.
	reserved := systemReservedMemory{}
	for _, node := range machineInfo.Topology {
		memory := nodeAllocatableReservation[v1.ResourceMemory]
		if memory.IsZero() {
			break
		}

		value, succeeded := memory.AsInt64()
		if !succeeded {
			return nil, fmt.Errorf("failed to represent reserved memory as int64")
		}

		reserved[node.Id] = map[v1.ResourceName]uint64{
			v1.ResourceMemory: uint64(value),
		}
	}
	return reserved, nil
}
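
// For illustration (hypothetical values, not part of this change; assuming
// systemReservedMemory maps a NUMA node id to per-resource amounts, as the
// assignment above implies): on a machine with two NUMA nodes and a total
// reservation of 1Gi, the loop yields the same full reservation on every node,
//
//	systemReservedMemory{
//		0: {v1.ResourceMemory: 1 << 30},
//		1: {v1.ResourceMemory: 1 << 30},
//	}
//
// which is exactly the over-reservation the TODO above calls out.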