memory manager: provide and use the checkpoint manager

The checkpoint manager provides a way to save the memory manager
`MemoryTable` both under the memory and under the state file.

Saving the `MemoryTable` under the state file can be useful when kubelet
restarted and you want to restore memory allocations for running containers.
Also, it provides a way to monitor memory allocations done by the memory manager,
and in the future, the state file content can be exposed under the pod metrics.

Signed-off-by: Artyom Lukianov <alukiano@redhat.com>
This commit is contained in:
Artyom Lukianov 2020-03-10 15:36:16 +02:00
parent 4c75be0604
commit 48ca6e53e6
5 changed files with 562 additions and 0 deletions

View File

@ -0,0 +1,57 @@
load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")
go_library(
name = "go_default_library",
srcs = [
"checkpoint.go",
"state.go",
"state_checkpoint.go",
"state_file.go",
"state_mem.go",
],
importpath = "k8s.io/kubernetes/pkg/kubelet/cm/cpumanager/state",
visibility = ["//visibility:public"],
deps = [
"//pkg/kubelet/checkpointmanager:go_default_library",
"//pkg/kubelet/checkpointmanager/checksum:go_default_library",
"//pkg/kubelet/checkpointmanager/errors:go_default_library",
"//pkg/kubelet/cm/cpumanager/containermap:go_default_library",
"//pkg/kubelet/cm/cpuset:go_default_library",
"//vendor/github.com/davecgh/go-spew/spew:go_default_library",
"//vendor/k8s.io/klog:go_default_library",
],
)
go_test(
name = "go_default_test",
srcs = [
"state_checkpoint_test.go",
"state_compatibility_test.go",
"state_file_test.go",
],
embed = [":go_default_library"],
deps = [
"//pkg/kubelet/checkpointmanager:go_default_library",
"//pkg/kubelet/cm/cpumanager/containermap:go_default_library",
"//pkg/kubelet/cm/cpumanager/state/testing:go_default_library",
"//pkg/kubelet/cm/cpuset:go_default_library",
"//vendor/github.com/stretchr/testify/require:go_default_library",
],
)
filegroup(
name = "package-srcs",
srcs = glob(["**"]),
tags = ["automanaged"],
visibility = ["//visibility:private"],
)
filegroup(
name = "all-srcs",
srcs = [
":package-srcs",
"//pkg/kubelet/cm/cpumanager/state/testing:all-srcs",
],
tags = ["automanaged"],
visibility = ["//visibility:public"],
)

View File

@ -0,0 +1,65 @@
/*
Copyright 2020 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package state
import (
"encoding/json"
"k8s.io/kubernetes/pkg/kubelet/checkpointmanager"
"k8s.io/kubernetes/pkg/kubelet/checkpointmanager/checksum"
)
var _ checkpointmanager.Checkpoint = &MemoryManagerCheckpoint{}
// MemoryManagerCheckpoint struct is used to store memory/pod assignments in a checkpoint
type MemoryManagerCheckpoint struct {
PolicyName string `json:"policyName"`
MachineState NodeMap `json:"machineState"`
Entries ContainerMemoryAssignments `json:"entries,omitempty"`
Checksum checksum.Checksum `json:"checksum"`
}
// NewMemoryManagerCheckpoint returns an instance of Checkpoint
func NewMemoryManagerCheckpoint() *MemoryManagerCheckpoint {
//lint:ignore unexported-type-in-api user-facing error message
return &MemoryManagerCheckpoint{
Entries: ContainerMemoryAssignments{},
MachineState: NodeMap{},
}
}
// MarshalCheckpoint returns marshalled checkpoint
func (mp *MemoryManagerCheckpoint) MarshalCheckpoint() ([]byte, error) {
// make sure checksum wasn't set before so it doesn't affect output checksum
mp.Checksum = 0
mp.Checksum = checksum.New(mp)
return json.Marshal(*mp)
}
// UnmarshalCheckpoint tries to unmarshal passed bytes to checkpoint
func (mp *MemoryManagerCheckpoint) UnmarshalCheckpoint(blob []byte) error {
return json.Unmarshal(blob, mp)
}
// VerifyChecksum verifies that current checksum of checkpoint is valid
func (mp *MemoryManagerCheckpoint) VerifyChecksum() error {
ck := mp.Checksum
mp.Checksum = 0
err := ck.Verify(mp)
mp.Checksum = ck
return err
}

View File

@ -0,0 +1,130 @@
/*
Copyright 2020 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package state
import (
v1 "k8s.io/api/core/v1"
)
// MemoryTable contains memory information
type MemoryTable struct {
TotalMemSize uint64 `json:"total"`
SystemReserved uint64 `json:"systemReserved"`
Allocatable uint64 `json:"allocatable"`
Reserved uint64 `json:"reserved"`
Free uint64 `json:"free"`
}
// NodeState contains NUMA node related information
type NodeState struct {
// NumberOfAssignments contains a number memory assignments from this node
// When the container requires memory and hugepages it will increase number of assignments by two
NumberOfAssignments int `json:"numberOfAssignments"`
// MemoryTable contains NUMA node memory related information
MemoryMap map[v1.ResourceName]*MemoryTable `json:"memoryMap"`
// Nodes contains the current NUMA node and all other nodes that are in a group with current NUMA node
// This parameter indicates if the current node is used for the multiple NUMA node memory allocation
// For example if some container has pinning 0,1,2, NUMA nodes 0,1,2 under the state will have
// this parameter equals to [0, 1, 2]
Nodes []int `json:"nodes"`
}
// NodeMap contains memory information for each NUMA node.
type NodeMap map[int]*NodeState
// Clone returns a copy of NodeMap
func (nm NodeMap) Clone() NodeMap {
clone := make(NodeMap)
for node, s := range nm {
if s == nil {
clone[node] = nil
continue
}
clone[node] = &NodeState{}
clone[node].NumberOfAssignments = s.NumberOfAssignments
clone[node].Nodes = append([]int{}, s.Nodes...)
if s.MemoryMap == nil {
continue
}
clone[node].MemoryMap = map[v1.ResourceName]*MemoryTable{}
for memoryType, memoryTable := range s.MemoryMap {
clone[node].MemoryMap[memoryType] = &MemoryTable{
Allocatable: memoryTable.Allocatable,
Free: memoryTable.Free,
Reserved: memoryTable.Reserved,
SystemReserved: memoryTable.SystemReserved,
TotalMemSize: memoryTable.TotalMemSize,
}
}
}
return clone
}
// Block is a data structure used to represent a certain amount of memory
type Block struct {
// NUMAAffinity contains the string that represents NUMA affinity bitmask
NUMAAffinity []int `json:"numaAffinity"`
Type v1.ResourceName `json:"type"`
Size uint64 `json:"size"`
}
// ContainerMemoryAssignments stores memory assignments of containers
type ContainerMemoryAssignments map[string]map[string][]Block
// Clone returns a copy of ContainerMemoryAssignments
func (as ContainerMemoryAssignments) Clone() ContainerMemoryAssignments {
clone := make(ContainerMemoryAssignments)
for pod := range as {
clone[pod] = make(map[string][]Block)
for container, blocks := range as[pod] {
clone[pod][container] = append([]Block{}, blocks...)
}
}
return clone
}
// Reader interface used to read current memory/pod assignment state
type Reader interface {
// GetMachineState returns Memory Map stored in the State
GetMachineState() NodeMap
// GetMemoryBlocks returns memory assignments of a container
GetMemoryBlocks(podUID string, containerName string) []Block
// GetMemoryAssignments returns ContainerMemoryAssignments
GetMemoryAssignments() ContainerMemoryAssignments
}
type writer interface {
// SetMachineState stores NodeMap in State
SetMachineState(memoryMap NodeMap)
// SetMemoryBlocks stores memory assignments of a container
SetMemoryBlocks(podUID string, containerName string, blocks []Block)
// SetMemoryAssignments sets ContainerMemoryAssignments by using the passed parameter
SetMemoryAssignments(assignments ContainerMemoryAssignments)
// Delete deletes corresponding Blocks from ContainerMemoryAssignments
Delete(podUID string, containerName string)
// ClearState clears machineState and ContainerMemoryAssignments
ClearState()
}
// State interface provides methods for tracking and setting memory/pod assignment
type State interface {
Reader
writer
}

View File

@ -0,0 +1,187 @@
/*
Copyright 2020 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package state
import (
"fmt"
"path"
"sync"
"k8s.io/klog/v2"
"k8s.io/kubernetes/pkg/kubelet/checkpointmanager"
"k8s.io/kubernetes/pkg/kubelet/checkpointmanager/errors"
"k8s.io/kubernetes/pkg/kubelet/cm/containermap"
)
var _ State = &stateCheckpoint{}
type stateCheckpoint struct {
sync.RWMutex
cache State
policyName string
checkpointManager checkpointmanager.CheckpointManager
checkpointName string
initialContainers containermap.ContainerMap
}
// NewCheckpointState creates new State for keeping track of memory/pod assignment with checkpoint backend
func NewCheckpointState(stateDir, checkpointName, policyName string, initialContainers containermap.ContainerMap) (State, error) {
checkpointManager, err := checkpointmanager.NewCheckpointManager(stateDir)
if err != nil {
return nil, fmt.Errorf("failed to initialize checkpoint manager: %v", err)
}
stateCheckpoint := &stateCheckpoint{
cache: NewMemoryState(),
policyName: policyName,
checkpointManager: checkpointManager,
checkpointName: checkpointName,
initialContainers: initialContainers,
}
if err := stateCheckpoint.restoreState(); err != nil {
//lint:ignore ST1005 user-facing error message
return nil, fmt.Errorf("could not restore state from checkpoint: %v, please drain this node and delete the memory manager checkpoint file %q before restarting Kubelet",
err, path.Join(stateDir, checkpointName))
}
return stateCheckpoint, nil
}
// restores state from a checkpoint and creates it if it doesn't exist
func (sc *stateCheckpoint) restoreState() error {
sc.Lock()
defer sc.Unlock()
var err error
checkpoint := NewMemoryManagerCheckpoint()
if err = sc.checkpointManager.GetCheckpoint(sc.checkpointName, checkpoint); err != nil {
if err == errors.ErrCheckpointNotFound {
return sc.storeState()
}
return err
}
if sc.policyName != checkpoint.PolicyName {
return fmt.Errorf("[memorymanager] configured policy %q differs from state checkpoint policy %q", sc.policyName, checkpoint.PolicyName)
}
sc.cache.SetMachineState(checkpoint.MachineState)
sc.cache.SetMemoryAssignments(checkpoint.Entries)
klog.V(2).Info("[memorymanager] state checkpoint: restored state from checkpoint")
return nil
}
// saves state to a checkpoint, caller is responsible for locking
func (sc *stateCheckpoint) storeState() error {
checkpoint := NewMemoryManagerCheckpoint()
checkpoint.PolicyName = sc.policyName
checkpoint.MachineState = sc.cache.GetMachineState()
checkpoint.Entries = sc.cache.GetMemoryAssignments()
err := sc.checkpointManager.CreateCheckpoint(sc.checkpointName, checkpoint)
if err != nil {
klog.Errorf("[memorymanager] could not save checkpoint: %v", err)
return err
}
return nil
}
// GetMemoryState returns Memory Map stored in the State
func (sc *stateCheckpoint) GetMachineState() NodeMap {
sc.RLock()
defer sc.RUnlock()
return sc.cache.GetMachineState()
}
// GetMemoryBlocks returns memory assignments of a container
func (sc *stateCheckpoint) GetMemoryBlocks(podUID string, containerName string) []Block {
sc.RLock()
defer sc.RUnlock()
return sc.cache.GetMemoryBlocks(podUID, containerName)
}
// GetMemoryAssignments returns ContainerMemoryAssignments
func (sc *stateCheckpoint) GetMemoryAssignments() ContainerMemoryAssignments {
sc.RLock()
defer sc.RUnlock()
return sc.cache.GetMemoryAssignments()
}
// SetMachineState stores NodeMap in State
func (sc *stateCheckpoint) SetMachineState(memoryMap NodeMap) {
sc.Lock()
defer sc.Unlock()
sc.cache.SetMachineState(memoryMap)
err := sc.storeState()
if err != nil {
klog.Warningf("store state to checkpoint error: %v", err)
}
}
// SetMemoryBlocks stores memory assignments of container
func (sc *stateCheckpoint) SetMemoryBlocks(podUID string, containerName string, blocks []Block) {
sc.Lock()
defer sc.Unlock()
sc.cache.SetMemoryBlocks(podUID, containerName, blocks)
err := sc.storeState()
if err != nil {
klog.Warningf("store state to checkpoint error: %v", err)
}
}
// SetMemoryAssignments sets ContainerMemoryAssignments by using the passed parameter
func (sc *stateCheckpoint) SetMemoryAssignments(assignments ContainerMemoryAssignments) {
sc.Lock()
defer sc.Unlock()
sc.cache.SetMemoryAssignments(assignments)
err := sc.storeState()
if err != nil {
klog.Warningf("store state to checkpoint error: %v", err)
}
}
// Delete deletes corresponding Blocks from ContainerMemoryAssignments
func (sc *stateCheckpoint) Delete(podUID string, containerName string) {
sc.Lock()
defer sc.Unlock()
sc.cache.Delete(podUID, containerName)
err := sc.storeState()
if err != nil {
klog.Warningf("store state to checkpoint error: %v", err)
}
}
// ClearState clears machineState and ContainerMemoryAssignments
func (sc *stateCheckpoint) ClearState() {
sc.Lock()
defer sc.Unlock()
sc.cache.ClearState()
err := sc.storeState()
if err != nil {
klog.Warningf("store state to checkpoint error: %v", err)
}
}

View File

@ -0,0 +1,123 @@
/*
Copyright 2020 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package state
import (
"sync"
"k8s.io/klog/v2"
)
type stateMemory struct {
sync.RWMutex
assignments ContainerMemoryAssignments
machineState NodeMap
}
var _ State = &stateMemory{}
// NewMemoryState creates new State for keeping track of cpu/pod assignment
func NewMemoryState() State {
klog.Infof("[memorymanager] initializing new in-memory state store")
return &stateMemory{
assignments: ContainerMemoryAssignments{},
machineState: NodeMap{},
}
}
// GetMemoryState returns Memory Map stored in the State
func (s *stateMemory) GetMachineState() NodeMap {
s.RLock()
defer s.RUnlock()
return s.machineState.Clone()
}
// GetMemoryBlocks returns memory assignments of a container
func (s *stateMemory) GetMemoryBlocks(podUID string, containerName string) []Block {
s.RLock()
defer s.RUnlock()
if res, ok := s.assignments[podUID][containerName]; ok {
return append([]Block{}, res...)
}
return nil
}
// GetMemoryAssignments returns ContainerMemoryAssignments
func (s *stateMemory) GetMemoryAssignments() ContainerMemoryAssignments {
s.RLock()
defer s.RUnlock()
return s.assignments.Clone()
}
// SetMachineState stores NodeMap in State
func (s *stateMemory) SetMachineState(nodeMap NodeMap) {
s.Lock()
defer s.Unlock()
s.machineState = nodeMap.Clone()
klog.Info("[memorymanager] updated machine memory state")
}
// SetMemoryBlocks stores memory assignments of container
func (s *stateMemory) SetMemoryBlocks(podUID string, containerName string, blocks []Block) {
s.Lock()
defer s.Unlock()
if _, ok := s.assignments[podUID]; !ok {
s.assignments[podUID] = map[string][]Block{}
}
s.assignments[podUID][containerName] = append([]Block{}, blocks...)
klog.Infof("[memorymanager] updated memory state (pod: %s, container: %s)", podUID, containerName)
}
// SetMemoryAssignments sets ContainerMemoryAssignments by using the passed parameter
func (s *stateMemory) SetMemoryAssignments(assignments ContainerMemoryAssignments) {
s.Lock()
defer s.Unlock()
s.assignments = assignments.Clone()
}
// Delete deletes corresponding Blocks from ContainerMemoryAssignments
func (s *stateMemory) Delete(podUID string, containerName string) {
s.Lock()
defer s.Unlock()
if _, ok := s.assignments[podUID]; !ok {
return
}
delete(s.assignments[podUID], containerName)
if len(s.assignments[podUID]) == 0 {
delete(s.assignments, podUID)
}
klog.V(2).Infof("[memorymanager] deleted memory assignment (pod: %s, container: %s)", podUID, containerName)
}
// ClearState clears machineState and ContainerMemoryAssignments
func (s *stateMemory) ClearState() {
s.Lock()
defer s.Unlock()
s.machineState = NodeMap{}
s.assignments = make(ContainerMemoryAssignments)
klog.V(2).Infof("[memorymanager] cleared state")
}