From 48ca6e53e6ffcb9cdaacb3fd2b1575575e04b129 Mon Sep 17 00:00:00 2001
From: Artyom Lukianov
Date: Tue, 10 Mar 2020 15:36:16 +0200
Subject: [PATCH] memory manager: provide and use the checkpoint manager

The checkpoint manager provides a way to save the memory manager
`MemoryTable` both under the memory and under the state file. Saving
the `MemoryTable` under the state file can be useful when kubelet
restarted and you want to restore memory allocations for running
containers.

Also, it provides a way to monitor memory allocations done by the memory
manager, and in the future, the state file content can be exposed under
the pod metrics.

Signed-off-by: Artyom Lukianov
---
 pkg/kubelet/cm/memorymanager/state/BUILD      |  39 ++++
 .../cm/memorymanager/state/checkpoint.go      |  65 ++++++
 pkg/kubelet/cm/memorymanager/state/state.go   | 130 ++++++++++++
 .../memorymanager/state/state_checkpoint.go   | 187 ++++++++++++++++++
 .../cm/memorymanager/state/state_mem.go       | 123 ++++++++++++
 5 files changed, 544 insertions(+)
 create mode 100644 pkg/kubelet/cm/memorymanager/state/BUILD
 create mode 100644 pkg/kubelet/cm/memorymanager/state/checkpoint.go
 create mode 100644 pkg/kubelet/cm/memorymanager/state/state.go
 create mode 100644 pkg/kubelet/cm/memorymanager/state/state_checkpoint.go
 create mode 100644 pkg/kubelet/cm/memorymanager/state/state_mem.go

diff --git a/pkg/kubelet/cm/memorymanager/state/BUILD b/pkg/kubelet/cm/memorymanager/state/BUILD
new file mode 100644
index 00000000000..438737c9d18
--- /dev/null
+++ b/pkg/kubelet/cm/memorymanager/state/BUILD
@@ -0,0 +1,39 @@
+load("@io_bazel_rules_go//go:def.bzl", "go_library")
+
+# Memory manager state package: the in-memory store and its checkpoint-backed wrapper.
+# srcs and deps mirror the files and imports added by this change.
+go_library(
+    name = "go_default_library",
+    srcs = [
+        "checkpoint.go",
+        "state.go",
+        "state_checkpoint.go",
+        "state_mem.go",
+    ],
+    importpath = "k8s.io/kubernetes/pkg/kubelet/cm/memorymanager/state",
+    visibility = ["//visibility:public"],
+    deps = [
+        "//pkg/kubelet/checkpointmanager:go_default_library",
+        "//pkg/kubelet/checkpointmanager/checksum:go_default_library",
+        "//pkg/kubelet/checkpointmanager/errors:go_default_library",
+        "//pkg/kubelet/cm/containermap:go_default_library",
+        "//staging/src/k8s.io/api/core/v1:go_default_library",
+        "//vendor/k8s.io/klog/v2:go_default_library",
+    ],
+)
+
+# No go_test target yet: this change adds no *_test.go files to the package.
+
+filegroup(
+    name = "package-srcs",
+    srcs = glob(["**"]),
+    tags = ["automanaged"],
+    visibility = ["//visibility:private"],
+)
+
+filegroup(
+    name = "all-srcs",
+    srcs = [":package-srcs"],
+    tags = ["automanaged"],
+    visibility = ["//visibility:public"],
+)
diff --git a/pkg/kubelet/cm/memorymanager/state/checkpoint.go b/pkg/kubelet/cm/memorymanager/state/checkpoint.go
new file mode 100644
index 00000000000..918bbfb6763
--- /dev/null
+++ b/pkg/kubelet/cm/memorymanager/state/checkpoint.go
@@ -0,0 +1,65 @@
+/*
+Copyright 2020 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package state
+
+import (
+	"encoding/json"
+
+	"k8s.io/kubernetes/pkg/kubelet/checkpointmanager"
+	"k8s.io/kubernetes/pkg/kubelet/checkpointmanager/checksum"
+)
+
+var _ checkpointmanager.Checkpoint = (*MemoryManagerCheckpoint)(nil)
+
+// MemoryManagerCheckpoint is the checkpoint payload: policy name, machine state, container assignments and a checksum
+type MemoryManagerCheckpoint struct {
+	PolicyName   string                     `json:"policyName"`
+	MachineState NodeMap                    `json:"machineState"`
+	Entries      ContainerMemoryAssignments `json:"entries,omitempty"`
+	Checksum     checksum.Checksum          `json:"checksum"`
+}
+
+// NewMemoryManagerCheckpoint returns a checkpoint whose MachineState and Entries are empty, non-nil maps
+func NewMemoryManagerCheckpoint() *MemoryManagerCheckpoint {
+	//lint:ignore unexported-type-in-api user-facing error message
+	return &MemoryManagerCheckpoint{
+		MachineState: make(NodeMap),
+		Entries:      make(ContainerMemoryAssignments),
+	}
+}
+
+// MarshalCheckpoint recomputes the Checksum field on the receiver and returns the checkpoint encoded as JSON
+func (cp *MemoryManagerCheckpoint) MarshalCheckpoint() ([]byte, error) {
+	// the checksum is computed with the field zeroed so a previous value cannot leak into the new one
+	cp.Checksum = 0
+	cp.Checksum = checksum.New(cp)
+	return json.Marshal(*cp)
+}
+
+// UnmarshalCheckpoint decodes the JSON blob into the receiver
+func (cp *MemoryManagerCheckpoint) UnmarshalCheckpoint(blob []byte) error {
+	return json.Unmarshal(blob, cp)
+}
+
+// VerifyChecksum checks the stored checksum against the checkpoint content; the Checksum field is left unchanged
+func (cp *MemoryManagerCheckpoint) VerifyChecksum() error {
+	stored := cp.Checksum
+	cp.Checksum = 0
+	err := stored.Verify(cp)
+	cp.Checksum = stored
+	return err
+}
diff --git a/pkg/kubelet/cm/memorymanager/state/state.go b/pkg/kubelet/cm/memorymanager/state/state.go
new file mode 100644
index 00000000000..00a6f2a8757
--- /dev/null
+++ b/pkg/kubelet/cm/memorymanager/state/state.go
@@ -0,0 +1,130 @@
+/*
+Copyright 2020 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package state
+
+import (
+	v1 "k8s.io/api/core/v1"
+)
+
+// MemoryTable holds the memory counters kept for one resource type of one NUMA node (units presumably bytes - confirm)
+type MemoryTable struct {
+	TotalMemSize   uint64 `json:"total"`
+	SystemReserved uint64 `json:"systemReserved"`
+	Allocatable    uint64 `json:"allocatable"`
+	Reserved       uint64 `json:"reserved"`
+	Free           uint64 `json:"free"`
+}
+
+// NodeState contains NUMA node related information
+type NodeState struct {
+	// NumberOfAssignments contains the number of memory assignments made from this node
+	// When the container requires memory and hugepages it will increase number of assignments by two
+	NumberOfAssignments int `json:"numberOfAssignments"`
+	// MemoryMap contains NUMA node memory related information, one MemoryTable per resource name
+	MemoryMap map[v1.ResourceName]*MemoryTable `json:"memoryMap"`
+	// Nodes contains the current NUMA node and all other nodes that are in a group with current NUMA node
+	// This parameter indicates if the current node is used for the multiple NUMA node memory allocation
+	// For example if some container has pinning 0,1,2, NUMA nodes 0,1,2 under the state will have
+	// this parameter equals to [0, 1, 2]
+	Nodes []int `json:"nodes"`
+}
+
+// NodeMap contains memory information for each NUMA node.
+type NodeMap map[int]*NodeState
+
+// Clone returns a deep copy of NodeMap; nil NodeState and nil MemoryTable entries are kept as nil
+func (nm NodeMap) Clone() NodeMap {
+	clone := make(NodeMap)
+	for node, s := range nm {
+		if s == nil {
+			clone[node] = nil
+			continue
+		}
+
+		clone[node] = &NodeState{}
+		clone[node].NumberOfAssignments = s.NumberOfAssignments
+		clone[node].Nodes = append([]int{}, s.Nodes...)
+		if s.MemoryMap == nil {
+			continue
+		}
+
+		clone[node].MemoryMap = map[v1.ResourceName]*MemoryTable{}
+		for memoryType, memoryTable := range s.MemoryMap {
+			if memoryTable != nil {
+				tableCopy := *memoryTable // value copy; MemoryTable has only scalar fields
+				memoryTable = &tableCopy
+			}
+			clone[node].MemoryMap[memoryType] = memoryTable // nil entries stay nil instead of panicking
+		}
+	}
+	return clone
+}
+
+// Block is a data structure used to represent a certain amount of memory
+type Block struct {
+	// NUMAAffinity holds the NUMA affinity as a list of ints (presumably NUMA node IDs - confirm), not a bitmask string
+	NUMAAffinity []int           `json:"numaAffinity"`
+	Type         v1.ResourceName `json:"type"`
+	Size         uint64          `json:"size"`
+}
+
+// ContainerMemoryAssignments stores memory assignments of containers, keyed by pod UID and then container name
+type ContainerMemoryAssignments map[string]map[string][]Block
+
+// Clone returns a deep copy of ContainerMemoryAssignments, including each Block's NUMAAffinity slice
+func (as ContainerMemoryAssignments) Clone() ContainerMemoryAssignments {
+	clone := make(ContainerMemoryAssignments)
+	for pod := range as {
+		clone[pod] = make(map[string][]Block)
+		for container, blocks := range as[pod] {
+			clone[pod][container] = append([]Block{}, blocks...)
+			for i, b := range blocks {
+				clone[pod][container][i].NUMAAffinity = append(b.NUMAAffinity[:0:0], b.NUMAAffinity...) // fresh backing array; nil stays nil
+			}
+		}
+	}
+	return clone
+}
+
+// Reader interface used to read current memory/pod assignment state
+type Reader interface {
+	// GetMachineState returns the NodeMap stored in the State
+	GetMachineState() NodeMap
+	// GetMemoryBlocks returns memory assignments of a container
+	GetMemoryBlocks(podUID string, containerName string) []Block
+	// GetMemoryAssignments returns ContainerMemoryAssignments
+	GetMemoryAssignments() ContainerMemoryAssignments
+}
+
+type writer interface {
+	// SetMachineState stores NodeMap in State
+	SetMachineState(memoryMap NodeMap)
+	// SetMemoryBlocks stores memory assignments of a container
+	SetMemoryBlocks(podUID string, containerName string, blocks []Block)
+	// SetMemoryAssignments sets ContainerMemoryAssignments by using the passed parameter
+	SetMemoryAssignments(assignments ContainerMemoryAssignments)
+	// Delete deletes corresponding Blocks from ContainerMemoryAssignments
+	Delete(podUID string, containerName string)
+	// ClearState clears machineState and ContainerMemoryAssignments
+	ClearState()
+}
+
+// State interface provides methods for tracking and setting memory/pod assignment
+type State interface {
+	Reader
+	writer
+}
diff --git a/pkg/kubelet/cm/memorymanager/state/state_checkpoint.go b/pkg/kubelet/cm/memorymanager/state/state_checkpoint.go
new file mode 100644
index 00000000000..fd3dabd6d54
--- /dev/null
+++ b/pkg/kubelet/cm/memorymanager/state/state_checkpoint.go
@@ -0,0 +1,187 @@
+/*
+Copyright 2020 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package state
+
+import (
+	"fmt"
+	"path"
+	"sync"
+
+	"k8s.io/klog/v2"
+	"k8s.io/kubernetes/pkg/kubelet/checkpointmanager"
+	"k8s.io/kubernetes/pkg/kubelet/checkpointmanager/errors"
+	"k8s.io/kubernetes/pkg/kubelet/cm/containermap"
+)
+
+var _ State = &stateCheckpoint{}
+
+type stateCheckpoint struct {
+	sync.RWMutex                                          // guards cache and checkpoint writes
+	cache             State                               // in-memory state that serves every read
+	policyName        string                              // compared with the checkpoint's policy on restore
+	checkpointManager checkpointmanager.CheckpointManager // reads and writes the checkpoint
+	checkpointName    string                              // name passed to the checkpoint manager
+	initialContainers containermap.ContainerMap           // stored by NewCheckpointState; not read in this file
+}
+
+// NewCheckpointState creates a State backed by a checkpoint in stateDir and restores any existing checkpoint
+func NewCheckpointState(stateDir, checkpointName, policyName string, initialContainers containermap.ContainerMap) (State, error) {
+	checkpointManager, err := checkpointmanager.NewCheckpointManager(stateDir)
+	if err != nil {
+		return nil, fmt.Errorf("failed to initialize checkpoint manager: %w", err)
+	}
+	stateCheckpoint := &stateCheckpoint{
+		cache:             NewMemoryState(),
+		policyName:        policyName,
+		checkpointManager: checkpointManager,
+		checkpointName:    checkpointName,
+		initialContainers: initialContainers,
+	}
+
+	if err := stateCheckpoint.restoreState(); err != nil {
+		//lint:ignore ST1005 user-facing error message
+		return nil, fmt.Errorf("could not restore state from checkpoint: %w, please drain this node and delete the memory manager checkpoint file %q before restarting Kubelet",
+			err, path.Join(stateDir, checkpointName))
+	}
+
+	return stateCheckpoint, nil
+}
+
+// restoreState loads the checkpoint into the cache; when no checkpoint exists it writes one from the current cache
+func (sc *stateCheckpoint) restoreState() error {
+	sc.Lock()
+	defer sc.Unlock()
+	var err error
+
+	checkpoint := NewMemoryManagerCheckpoint()
+	if err = sc.checkpointManager.GetCheckpoint(sc.checkpointName, checkpoint); err != nil {
+		if err == errors.ErrCheckpointNotFound {
+			return sc.storeState()
+		}
+		return err
+	}
+
+	if sc.policyName != checkpoint.PolicyName {
+		return fmt.Errorf("[memorymanager] configured policy %q differs from state checkpoint policy %q", sc.policyName, checkpoint.PolicyName)
+	}
+
+	sc.cache.SetMachineState(checkpoint.MachineState)
+	sc.cache.SetMemoryAssignments(checkpoint.Entries)
+
+	klog.V(2).Info("[memorymanager] state checkpoint: restored state from checkpoint")
+
+	return nil
+}
+
+// storeState saves the cached state to a checkpoint; the caller is responsible for locking
+func (sc *stateCheckpoint) storeState() error {
+	checkpoint := NewMemoryManagerCheckpoint()
+	checkpoint.PolicyName = sc.policyName
+	checkpoint.MachineState = sc.cache.GetMachineState()
+	checkpoint.Entries = sc.cache.GetMemoryAssignments()
+
+	err := sc.checkpointManager.CreateCheckpoint(sc.checkpointName, checkpoint)
+	if err != nil {
+		klog.Errorf("[memorymanager] could not save checkpoint: %v", err)
+		return err
+	}
+	return nil
+}
+
+// GetMachineState returns the NodeMap held by the cache
+func (sc *stateCheckpoint) GetMachineState() NodeMap {
+	sc.RLock()
+	defer sc.RUnlock()
+
+	return sc.cache.GetMachineState()
+}
+
+// GetMemoryBlocks returns memory assignments of a container
+func (sc *stateCheckpoint) GetMemoryBlocks(podUID string, containerName string) []Block {
+	sc.RLock()
+	defer sc.RUnlock()
+
+	return sc.cache.GetMemoryBlocks(podUID, containerName)
+}
+
+// GetMemoryAssignments returns ContainerMemoryAssignments
+func (sc *stateCheckpoint) GetMemoryAssignments() ContainerMemoryAssignments {
+	sc.RLock()
+	defer sc.RUnlock()
+
+	return sc.cache.GetMemoryAssignments()
+}
+
+// SetMachineState stores NodeMap in the cache and saves a checkpoint; a failed save is only logged
+func (sc *stateCheckpoint) SetMachineState(memoryMap NodeMap) {
+	sc.Lock()
+	defer sc.Unlock()
+
+	sc.cache.SetMachineState(memoryMap)
+	err := sc.storeState()
+	if err != nil {
+		klog.Warningf("store state to checkpoint error: %v", err)
+	}
+}
+
+// SetMemoryBlocks stores memory assignments of a container and saves a checkpoint; a failed save is only logged
+func (sc *stateCheckpoint) SetMemoryBlocks(podUID string, containerName string, blocks []Block) {
+	sc.Lock()
+	defer sc.Unlock()
+
+	sc.cache.SetMemoryBlocks(podUID, containerName, blocks)
+	err := sc.storeState()
+	if err != nil {
+		klog.Warningf("store state to checkpoint error: %v", err)
+	}
+}
+
+// SetMemoryAssignments replaces ContainerMemoryAssignments and saves a checkpoint; a failed save is only logged
+func (sc *stateCheckpoint) SetMemoryAssignments(assignments ContainerMemoryAssignments) {
+	sc.Lock()
+	defer sc.Unlock()
+
+	sc.cache.SetMemoryAssignments(assignments)
+	err := sc.storeState()
+	if err != nil {
+		klog.Warningf("store state to checkpoint error: %v", err)
+	}
+}
+
+// Delete deletes corresponding Blocks from ContainerMemoryAssignments and saves a checkpoint; a failed save is only logged
+func (sc *stateCheckpoint) Delete(podUID string, containerName string) {
+	sc.Lock()
+	defer sc.Unlock()
+
+	sc.cache.Delete(podUID, containerName)
+	err := sc.storeState()
+	if err != nil {
+		klog.Warningf("store state to checkpoint error: %v", err)
+	}
+}
+
+// ClearState clears machineState and ContainerMemoryAssignments and saves a checkpoint; a failed save is only logged
+func (sc *stateCheckpoint) ClearState() {
+	sc.Lock()
+	defer sc.Unlock()
+
+	sc.cache.ClearState()
+	err := sc.storeState()
+	if err != nil {
+		klog.Warningf("store state to checkpoint error: %v", err)
+	}
+}
diff --git a/pkg/kubelet/cm/memorymanager/state/state_mem.go b/pkg/kubelet/cm/memorymanager/state/state_mem.go
new file mode 100644
index 00000000000..a84dabcf27a
--- /dev/null
+++ b/pkg/kubelet/cm/memorymanager/state/state_mem.go
@@ -0,0 +1,123 @@
+/*
+Copyright 2020 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package state
+
+import (
+	"sync"
+
+	"k8s.io/klog/v2"
+)
+
+type stateMemory struct {
+	sync.RWMutex
+	assignments  ContainerMemoryAssignments
+	machineState NodeMap
+}
+
+var _ State = (*stateMemory)(nil)
+
+// NewMemoryState creates new State for keeping track of memory/pod assignment, held only in process memory
+func NewMemoryState() State {
+	klog.Infof("[memorymanager] initializing new in-memory state store")
+	return &stateMemory{
+		machineState: make(NodeMap),
+		assignments:  make(ContainerMemoryAssignments),
+	}
+}
+
+// GetMachineState returns a clone of the NodeMap stored in the State
+func (m *stateMemory) GetMachineState() NodeMap {
+	m.RLock()
+	defer m.RUnlock()
+
+	return m.machineState.Clone()
+}
+
+// GetMemoryBlocks returns a copy of the container's memory blocks, or nil when nothing is recorded for it
+func (m *stateMemory) GetMemoryBlocks(podUID string, containerName string) []Block {
+	m.RLock()
+	defer m.RUnlock()
+	blocks, found := m.assignments[podUID][containerName]
+	if !found {
+		return nil
+	}
+	return append([]Block{}, blocks...)
+}
+
+// GetMemoryAssignments returns a clone of ContainerMemoryAssignments
+func (m *stateMemory) GetMemoryAssignments() ContainerMemoryAssignments {
+	m.RLock()
+	defer m.RUnlock()
+
+	return m.assignments.Clone()
+}
+
+// SetMachineState stores a clone of the passed NodeMap in State
+func (m *stateMemory) SetMachineState(nodeMap NodeMap) {
+	m.Lock()
+	defer m.Unlock()
+
+	m.machineState = nodeMap.Clone()
+	klog.Info("[memorymanager] updated machine memory state")
+}
+
+// SetMemoryBlocks stores a copy of the blocks for the container, creating the pod entry on first use
+func (m *stateMemory) SetMemoryBlocks(podUID string, containerName string, blocks []Block) {
+	m.Lock()
+	defer m.Unlock()
+	containers, exists := m.assignments[podUID]
+	if !exists {
+		containers = map[string][]Block{}
+		m.assignments[podUID] = containers
+	}
+	containers[containerName] = append([]Block{}, blocks...)
+	klog.Infof("[memorymanager] updated memory state (pod: %s, container: %s)", podUID, containerName)
+}
+
+// SetMemoryAssignments replaces ContainerMemoryAssignments with a clone of the passed parameter
+func (m *stateMemory) SetMemoryAssignments(assignments ContainerMemoryAssignments) {
+	m.Lock()
+	defer m.Unlock()
+
+	m.assignments = assignments.Clone()
+}
+
+// Delete removes the container's Blocks and drops the pod entry once it has no containers left
+func (m *stateMemory) Delete(podUID string, containerName string) {
+	m.Lock()
+	defer m.Unlock()
+
+	containers, exists := m.assignments[podUID]
+	if !exists {
+		return
+	}
+	delete(containers, containerName)
+	if len(containers) == 0 {
+		delete(m.assignments, podUID)
+	}
+	klog.V(2).Infof("[memorymanager] deleted memory assignment (pod: %s, container: %s)", podUID, containerName)
+}
+
+// ClearState resets machineState and ContainerMemoryAssignments to empty maps
+func (m *stateMemory) ClearState() {
+	m.Lock()
+	defer m.Unlock()
+
+	m.assignments = ContainerMemoryAssignments{}
+	m.machineState = make(NodeMap)
+	klog.V(2).Infof("[memorymanager] cleared state")
+}