mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-07-22 11:21:47 +00:00
Merge pull request #59214 from kdembler/cpumanager-checkpointing
Automatic merge from submit-queue (batch tested with PRs 59214, 65330). If you want to cherry-pick this change to another branch, please follow the instructions <a href="https://github.com/kubernetes/community/blob/master/contributors/devel/cherry-picks.md">here</a>. Migrate cpumanager to use checkpointing manager **What this PR does / why we need it**: This PR migrates `cpumanager` to use new kubelet level node checkpointing feature (#56040) to decrease code redundancy and improve consistency. **Which issue(s) this PR fixes**: Fixes #58339 **Notes**: At point of submitting PR the most straightforward approach was used - `state_checkpoint` implementation of `State` interface was added. However, with checkpointing implementation there might be no point to keep `State` interface and just use single implementation with checkpoint backend and in case of different backend than filestore needed just supply `cpumanager` with custom `CheckpointManager` implementation. /kind feature /sig node cc @flyingcougar @ConnorDoyle
This commit is contained in:
commit
991a84758f
@ -33,7 +33,6 @@ import (
|
||||
"k8s.io/kubernetes/pkg/kubelet/cm/cpuset"
|
||||
kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
|
||||
"k8s.io/kubernetes/pkg/kubelet/status"
|
||||
"path"
|
||||
)
|
||||
|
||||
// ActivePodsFunc is a function that returns a list of pods to reconcile.
|
||||
@ -45,8 +44,8 @@ type runtimeService interface {
|
||||
|
||||
type policyName string
|
||||
|
||||
// CPUManagerStateFileName is the name file name where cpu manager stores it's state
|
||||
const CPUManagerStateFileName = "cpu_manager_state"
|
||||
// cpuManagerStateFileName is the name file name where cpu manager stores it's state
|
||||
const cpuManagerStateFileName = "cpu_manager_state"
|
||||
|
||||
// Manager interface provides methods for Kubelet to manage pod cpus.
|
||||
type Manager interface {
|
||||
@ -98,7 +97,7 @@ type manager struct {
|
||||
var _ Manager = &manager{}
|
||||
|
||||
// NewManager creates new cpu manager based on provided policy
|
||||
func NewManager(cpuPolicyName string, reconcilePeriod time.Duration, machineInfo *cadvisorapi.MachineInfo, nodeAllocatableReservation v1.ResourceList, stateFileDirecory string) (Manager, error) {
|
||||
func NewManager(cpuPolicyName string, reconcilePeriod time.Duration, machineInfo *cadvisorapi.MachineInfo, nodeAllocatableReservation v1.ResourceList, stateFileDirectory string) (Manager, error) {
|
||||
var policy Policy
|
||||
|
||||
switch policyName(cpuPolicyName) {
|
||||
@ -137,9 +136,10 @@ func NewManager(cpuPolicyName string, reconcilePeriod time.Duration, machineInfo
|
||||
policy = NewNonePolicy()
|
||||
}
|
||||
|
||||
stateImpl := state.NewFileState(
|
||||
path.Join(stateFileDirecory, CPUManagerStateFileName),
|
||||
policy.Name())
|
||||
stateImpl, err := state.NewCheckpointState(stateFileDirectory, cpuManagerStateFileName, policy.Name())
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("could not initialize checkpoint manager: %v", err)
|
||||
}
|
||||
|
||||
manager := &manager{
|
||||
policy: policy,
|
||||
|
@ -3,13 +3,18 @@ load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")
|
||||
go_library(
|
||||
name = "go_default_library",
|
||||
srcs = [
|
||||
"checkpoint.go",
|
||||
"state.go",
|
||||
"state_checkpoint.go",
|
||||
"state_file.go",
|
||||
"state_mem.go",
|
||||
],
|
||||
importpath = "k8s.io/kubernetes/pkg/kubelet/cm/cpumanager/state",
|
||||
visibility = ["//visibility:public"],
|
||||
deps = [
|
||||
"//pkg/kubelet/checkpointmanager:go_default_library",
|
||||
"//pkg/kubelet/checkpointmanager/checksum:go_default_library",
|
||||
"//pkg/kubelet/checkpointmanager/errors:go_default_library",
|
||||
"//pkg/kubelet/cm/cpuset:go_default_library",
|
||||
"//vendor/github.com/golang/glog:go_default_library",
|
||||
],
|
||||
@ -17,9 +22,17 @@ go_library(
|
||||
|
||||
go_test(
|
||||
name = "go_default_test",
|
||||
srcs = ["state_file_test.go"],
|
||||
srcs = [
|
||||
"state_checkpoint_test.go",
|
||||
"state_compatibility_test.go",
|
||||
"state_file_test.go",
|
||||
],
|
||||
embed = [":go_default_library"],
|
||||
deps = ["//pkg/kubelet/cm/cpuset:go_default_library"],
|
||||
deps = [
|
||||
"//pkg/kubelet/checkpointmanager:go_default_library",
|
||||
"//pkg/kubelet/cm/cpumanager/state/testing:go_default_library",
|
||||
"//pkg/kubelet/cm/cpuset:go_default_library",
|
||||
],
|
||||
)
|
||||
|
||||
filegroup(
|
||||
@ -31,7 +44,10 @@ filegroup(
|
||||
|
||||
filegroup(
|
||||
name = "all-srcs",
|
||||
srcs = [":package-srcs"],
|
||||
srcs = [
|
||||
":package-srcs",
|
||||
"//pkg/kubelet/cm/cpumanager/state/testing:all-srcs",
|
||||
],
|
||||
tags = ["automanaged"],
|
||||
visibility = ["//visibility:public"],
|
||||
)
|
||||
|
67
pkg/kubelet/cm/cpumanager/state/checkpoint.go
Normal file
67
pkg/kubelet/cm/cpumanager/state/checkpoint.go
Normal file
@ -0,0 +1,67 @@
|
||||
/*
|
||||
Copyright 2018 The Kubernetes Authors.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package state
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
|
||||
"k8s.io/kubernetes/pkg/kubelet/checkpointmanager"
|
||||
"k8s.io/kubernetes/pkg/kubelet/checkpointmanager/checksum"
|
||||
)
|
||||
|
||||
var _ checkpointmanager.Checkpoint = &CPUManagerCheckpoint{}
|
||||
|
||||
// CPUManagerCheckpoint struct is used to store cpu/pod assignments in a checkpoint
|
||||
type CPUManagerCheckpoint struct {
|
||||
PolicyName string `json:"policyName"`
|
||||
DefaultCPUSet string `json:"defaultCpuSet"`
|
||||
Entries map[string]string `json:"entries,omitempty"`
|
||||
Checksum checksum.Checksum `json:"checksum"`
|
||||
}
|
||||
|
||||
// NewCPUManagerCheckpoint returns an instance of Checkpoint
|
||||
func NewCPUManagerCheckpoint() *CPUManagerCheckpoint {
|
||||
return &CPUManagerCheckpoint{
|
||||
Entries: make(map[string]string),
|
||||
}
|
||||
}
|
||||
|
||||
// MarshalCheckpoint returns marshalled checkpoint
|
||||
func (cp *CPUManagerCheckpoint) MarshalCheckpoint() ([]byte, error) {
|
||||
// make sure checksum wasn't set before so it doesn't affect output checksum
|
||||
cp.Checksum = 0
|
||||
cp.Checksum = checksum.New(cp)
|
||||
return json.Marshal(*cp)
|
||||
}
|
||||
|
||||
// UnmarshalCheckpoint tries to unmarshal passed bytes to checkpoint
|
||||
func (cp *CPUManagerCheckpoint) UnmarshalCheckpoint(blob []byte) error {
|
||||
return json.Unmarshal(blob, cp)
|
||||
}
|
||||
|
||||
// VerifyChecksum verifies that current checksum of checkpoint is valid
|
||||
func (cp *CPUManagerCheckpoint) VerifyChecksum() error {
|
||||
if cp.Checksum == 0 {
|
||||
// accept empty checksum for compatibility with old file backend
|
||||
return nil
|
||||
}
|
||||
ck := cp.Checksum
|
||||
cp.Checksum = 0
|
||||
err := ck.Verify(cp)
|
||||
cp.Checksum = ck
|
||||
return err
|
||||
}
|
194
pkg/kubelet/cm/cpumanager/state/state_checkpoint.go
Normal file
194
pkg/kubelet/cm/cpumanager/state/state_checkpoint.go
Normal file
@ -0,0 +1,194 @@
|
||||
/*
|
||||
Copyright 2018 The Kubernetes Authors.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package state
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"path"
|
||||
"sync"
|
||||
|
||||
"github.com/golang/glog"
|
||||
"k8s.io/kubernetes/pkg/kubelet/checkpointmanager"
|
||||
"k8s.io/kubernetes/pkg/kubelet/checkpointmanager/errors"
|
||||
"k8s.io/kubernetes/pkg/kubelet/cm/cpuset"
|
||||
)
|
||||
|
||||
var _ State = &stateCheckpoint{}
|
||||
|
||||
type stateCheckpoint struct {
|
||||
mux sync.RWMutex
|
||||
policyName string
|
||||
cache State
|
||||
checkpointManager checkpointmanager.CheckpointManager
|
||||
checkpointName string
|
||||
}
|
||||
|
||||
// NewCheckpointState creates new State for keeping track of cpu/pod assignment with checkpoint backend
|
||||
func NewCheckpointState(stateDir, checkpointName, policyName string) (State, error) {
|
||||
checkpointManager, err := checkpointmanager.NewCheckpointManager(stateDir)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to initialize checkpoint manager: %v", err)
|
||||
}
|
||||
stateCheckpoint := &stateCheckpoint{
|
||||
cache: NewMemoryState(),
|
||||
policyName: policyName,
|
||||
checkpointManager: checkpointManager,
|
||||
checkpointName: checkpointName,
|
||||
}
|
||||
|
||||
if err := stateCheckpoint.restoreState(); err != nil {
|
||||
return nil, fmt.Errorf("could not restore state from checkpoint: %v\n"+
|
||||
"Please drain this node and delete the CPU manager checkpoint file %q before restarting Kubelet.",
|
||||
err, path.Join(stateDir, checkpointName))
|
||||
}
|
||||
|
||||
return stateCheckpoint, nil
|
||||
}
|
||||
|
||||
// restores state from a checkpoint and creates it if it doesn't exist
|
||||
func (sc *stateCheckpoint) restoreState() error {
|
||||
sc.mux.Lock()
|
||||
defer sc.mux.Unlock()
|
||||
var err error
|
||||
|
||||
// used when all parsing is ok
|
||||
tmpAssignments := make(ContainerCPUAssignments)
|
||||
tmpDefaultCPUSet := cpuset.NewCPUSet()
|
||||
tmpContainerCPUSet := cpuset.NewCPUSet()
|
||||
|
||||
checkpoint := NewCPUManagerCheckpoint()
|
||||
if err = sc.checkpointManager.GetCheckpoint(sc.checkpointName, checkpoint); err != nil {
|
||||
if err == errors.ErrCheckpointNotFound {
|
||||
sc.storeState()
|
||||
return nil
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
if sc.policyName != checkpoint.PolicyName {
|
||||
return fmt.Errorf("configured policy %q differs from state checkpoint policy %q", sc.policyName, checkpoint.PolicyName)
|
||||
}
|
||||
|
||||
if tmpDefaultCPUSet, err = cpuset.Parse(checkpoint.DefaultCPUSet); err != nil {
|
||||
return fmt.Errorf("could not parse default cpu set %q: %v", checkpoint.DefaultCPUSet, err)
|
||||
}
|
||||
|
||||
for containerID, cpuString := range checkpoint.Entries {
|
||||
if tmpContainerCPUSet, err = cpuset.Parse(cpuString); err != nil {
|
||||
return fmt.Errorf("could not parse cpuset %q for container id %q: %v", cpuString, containerID, err)
|
||||
}
|
||||
tmpAssignments[containerID] = tmpContainerCPUSet
|
||||
}
|
||||
|
||||
sc.cache.SetDefaultCPUSet(tmpDefaultCPUSet)
|
||||
sc.cache.SetCPUAssignments(tmpAssignments)
|
||||
|
||||
glog.V(2).Info("[cpumanager] state checkpoint: restored state from checkpoint")
|
||||
glog.V(2).Infof("[cpumanager] state checkpoint: defaultCPUSet: %s", tmpDefaultCPUSet.String())
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// saves state to a checkpoint, caller is responsible for locking
|
||||
func (sc *stateCheckpoint) storeState() {
|
||||
checkpoint := NewCPUManagerCheckpoint()
|
||||
checkpoint.PolicyName = sc.policyName
|
||||
checkpoint.DefaultCPUSet = sc.cache.GetDefaultCPUSet().String()
|
||||
|
||||
for containerID, cset := range sc.cache.GetCPUAssignments() {
|
||||
checkpoint.Entries[containerID] = cset.String()
|
||||
}
|
||||
|
||||
err := sc.checkpointManager.CreateCheckpoint(sc.checkpointName, checkpoint)
|
||||
|
||||
if err != nil {
|
||||
panic("[cpumanager] could not save checkpoint: " + err.Error())
|
||||
}
|
||||
}
|
||||
|
||||
// GetCPUSet returns current CPU set
|
||||
func (sc *stateCheckpoint) GetCPUSet(containerID string) (cpuset.CPUSet, bool) {
|
||||
sc.mux.RLock()
|
||||
defer sc.mux.RUnlock()
|
||||
|
||||
res, ok := sc.cache.GetCPUSet(containerID)
|
||||
return res, ok
|
||||
}
|
||||
|
||||
// GetDefaultCPUSet returns default CPU set
|
||||
func (sc *stateCheckpoint) GetDefaultCPUSet() cpuset.CPUSet {
|
||||
sc.mux.RLock()
|
||||
defer sc.mux.RUnlock()
|
||||
|
||||
return sc.cache.GetDefaultCPUSet()
|
||||
}
|
||||
|
||||
// GetCPUSetOrDefault returns current CPU set, or default one if it wasn't changed
|
||||
func (sc *stateCheckpoint) GetCPUSetOrDefault(containerID string) cpuset.CPUSet {
|
||||
sc.mux.RLock()
|
||||
defer sc.mux.RUnlock()
|
||||
|
||||
return sc.cache.GetCPUSetOrDefault(containerID)
|
||||
}
|
||||
|
||||
// GetCPUAssignments returns current CPU to pod assignments
|
||||
func (sc *stateCheckpoint) GetCPUAssignments() ContainerCPUAssignments {
|
||||
sc.mux.RLock()
|
||||
defer sc.mux.RUnlock()
|
||||
|
||||
return sc.cache.GetCPUAssignments()
|
||||
}
|
||||
|
||||
// SetCPUSet sets CPU set
|
||||
func (sc *stateCheckpoint) SetCPUSet(containerID string, cset cpuset.CPUSet) {
|
||||
sc.mux.Lock()
|
||||
defer sc.mux.Unlock()
|
||||
sc.cache.SetCPUSet(containerID, cset)
|
||||
sc.storeState()
|
||||
}
|
||||
|
||||
// SetDefaultCPUSet sets default CPU set
|
||||
func (sc *stateCheckpoint) SetDefaultCPUSet(cset cpuset.CPUSet) {
|
||||
sc.mux.Lock()
|
||||
defer sc.mux.Unlock()
|
||||
sc.cache.SetDefaultCPUSet(cset)
|
||||
sc.storeState()
|
||||
}
|
||||
|
||||
// SetCPUAssignments sets CPU to pod assignments
|
||||
func (sc *stateCheckpoint) SetCPUAssignments(a ContainerCPUAssignments) {
|
||||
sc.mux.Lock()
|
||||
defer sc.mux.Unlock()
|
||||
sc.cache.SetCPUAssignments(a)
|
||||
sc.storeState()
|
||||
}
|
||||
|
||||
// Delete deletes assignment for specified pod
|
||||
func (sc *stateCheckpoint) Delete(containerID string) {
|
||||
sc.mux.Lock()
|
||||
defer sc.mux.Unlock()
|
||||
sc.cache.Delete(containerID)
|
||||
sc.storeState()
|
||||
}
|
||||
|
||||
// ClearState clears the state and saves it in a checkpoint
|
||||
func (sc *stateCheckpoint) ClearState() {
|
||||
sc.mux.Lock()
|
||||
defer sc.mux.Unlock()
|
||||
sc.cache.ClearState()
|
||||
sc.storeState()
|
||||
}
|
326
pkg/kubelet/cm/cpumanager/state/state_checkpoint_test.go
Normal file
326
pkg/kubelet/cm/cpumanager/state/state_checkpoint_test.go
Normal file
@ -0,0 +1,326 @@
|
||||
/*
|
||||
Copyright 2018 The Kubernetes Authors.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package state
|
||||
|
||||
import (
|
||||
"os"
|
||||
"strings"
|
||||
"testing"
|
||||
|
||||
"k8s.io/kubernetes/pkg/kubelet/checkpointmanager"
|
||||
testutil "k8s.io/kubernetes/pkg/kubelet/cm/cpumanager/state/testing"
|
||||
"k8s.io/kubernetes/pkg/kubelet/cm/cpuset"
|
||||
)
|
||||
|
||||
const testingCheckpoint = "cpumanager_checkpoint_test"
|
||||
|
||||
var testingDir = os.TempDir()
|
||||
|
||||
func TestCheckpointStateRestore(t *testing.T) {
|
||||
testCases := []struct {
|
||||
description string
|
||||
checkpointContent string
|
||||
policyName string
|
||||
expectedError string
|
||||
expectedState *stateMemory
|
||||
}{
|
||||
{
|
||||
"Restore non-existing checkpoint",
|
||||
"",
|
||||
"none",
|
||||
"",
|
||||
&stateMemory{},
|
||||
},
|
||||
{
|
||||
"Restore default cpu set",
|
||||
`{
|
||||
"policyName": "none",
|
||||
"defaultCPUSet": "4-6",
|
||||
"entries": {},
|
||||
"checksum": 2912033808
|
||||
}`,
|
||||
"none",
|
||||
"",
|
||||
&stateMemory{
|
||||
defaultCPUSet: cpuset.NewCPUSet(4, 5, 6),
|
||||
},
|
||||
},
|
||||
{
|
||||
"Restore valid checkpoint",
|
||||
`{
|
||||
"policyName": "none",
|
||||
"defaultCPUSet": "1-3",
|
||||
"entries": {
|
||||
"container1": "4-6",
|
||||
"container2": "1-3"
|
||||
},
|
||||
"checksum": 1535905563
|
||||
}`,
|
||||
"none",
|
||||
"",
|
||||
&stateMemory{
|
||||
assignments: ContainerCPUAssignments{
|
||||
"container1": cpuset.NewCPUSet(4, 5, 6),
|
||||
"container2": cpuset.NewCPUSet(1, 2, 3),
|
||||
},
|
||||
defaultCPUSet: cpuset.NewCPUSet(1, 2, 3),
|
||||
},
|
||||
},
|
||||
{
|
||||
"Restore checkpoint with invalid checksum",
|
||||
`{
|
||||
"policyName": "none",
|
||||
"defaultCPUSet": "4-6",
|
||||
"entries": {},
|
||||
"checksum": 1337
|
||||
}`,
|
||||
"none",
|
||||
"checkpoint is corrupted",
|
||||
&stateMemory{},
|
||||
},
|
||||
{
|
||||
"Restore checkpoint with invalid JSON",
|
||||
`{`,
|
||||
"none",
|
||||
"unexpected end of JSON input",
|
||||
&stateMemory{},
|
||||
},
|
||||
{
|
||||
"Restore checkpoint with invalid policy name",
|
||||
`{
|
||||
"policyName": "other",
|
||||
"defaultCPUSet": "1-3",
|
||||
"entries": {},
|
||||
"checksum": 4195836012
|
||||
}`,
|
||||
"none",
|
||||
`configured policy "none" differs from state checkpoint policy "other"`,
|
||||
&stateMemory{},
|
||||
},
|
||||
{
|
||||
"Restore checkpoint with unparsable default cpu set",
|
||||
`{
|
||||
"policyName": "none",
|
||||
"defaultCPUSet": "1.3",
|
||||
"entries": {},
|
||||
"checksum": 1025273327
|
||||
}`,
|
||||
"none",
|
||||
`could not parse default cpu set "1.3": strconv.Atoi: parsing "1.3": invalid syntax`,
|
||||
&stateMemory{},
|
||||
},
|
||||
{
|
||||
"Restore checkpoint with unparsable assignment entry",
|
||||
`{
|
||||
"policyName": "none",
|
||||
"defaultCPUSet": "1-3",
|
||||
"entries": {
|
||||
"container1": "4-6",
|
||||
"container2": "asd"
|
||||
},
|
||||
"checksum": 2764213924
|
||||
}`,
|
||||
"none",
|
||||
`could not parse cpuset "asd" for container id "container2": strconv.Atoi: parsing "asd": invalid syntax`,
|
||||
&stateMemory{},
|
||||
},
|
||||
}
|
||||
|
||||
// create checkpoint manager for testing
|
||||
cpm, err := checkpointmanager.NewCheckpointManager(testingDir)
|
||||
if err != nil {
|
||||
t.Fatalf("could not create testing checkpoint manager: %v", err)
|
||||
}
|
||||
|
||||
for _, tc := range testCases {
|
||||
t.Run(tc.description, func(t *testing.T) {
|
||||
// ensure there is no previous checkpoint
|
||||
cpm.RemoveCheckpoint(testingCheckpoint)
|
||||
|
||||
// prepare checkpoint for testing
|
||||
if strings.TrimSpace(tc.checkpointContent) != "" {
|
||||
checkpoint := &testutil.MockCheckpoint{Content: tc.checkpointContent}
|
||||
if err := cpm.CreateCheckpoint(testingCheckpoint, checkpoint); err != nil {
|
||||
t.Fatalf("could not create testing checkpoint: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
restoredState, err := NewCheckpointState(testingDir, testingCheckpoint, tc.policyName)
|
||||
if err != nil {
|
||||
if strings.TrimSpace(tc.expectedError) != "" {
|
||||
tc.expectedError = "could not restore state from checkpoint: " + tc.expectedError
|
||||
if strings.HasPrefix(err.Error(), tc.expectedError) {
|
||||
t.Logf("got expected error: %v", err)
|
||||
return
|
||||
}
|
||||
}
|
||||
t.Fatalf("unexpected error while creatng checkpointState: %v", err)
|
||||
}
|
||||
|
||||
// compare state after restoration with the one expected
|
||||
AssertStateEqual(t, restoredState, tc.expectedState)
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestCheckpointStateStore(t *testing.T) {
|
||||
testCases := []struct {
|
||||
description string
|
||||
expectedState *stateMemory
|
||||
}{
|
||||
{
|
||||
"Store default cpu set",
|
||||
&stateMemory{defaultCPUSet: cpuset.NewCPUSet(1, 2, 3)},
|
||||
},
|
||||
{
|
||||
"Store assignments",
|
||||
&stateMemory{
|
||||
assignments: map[string]cpuset.CPUSet{
|
||||
"container1": cpuset.NewCPUSet(1, 5, 8),
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
cpm, err := checkpointmanager.NewCheckpointManager(testingDir)
|
||||
if err != nil {
|
||||
t.Fatalf("could not create testing checkpoint manager: %v", err)
|
||||
}
|
||||
|
||||
for _, tc := range testCases {
|
||||
t.Run(tc.description, func(t *testing.T) {
|
||||
// ensure there is no previous checkpoint
|
||||
cpm.RemoveCheckpoint(testingCheckpoint)
|
||||
|
||||
cs1, err := NewCheckpointState(testingDir, testingCheckpoint, "none")
|
||||
if err != nil {
|
||||
t.Fatalf("could not create testing checkpointState instance: %v", err)
|
||||
}
|
||||
|
||||
// set values of cs1 instance so they are stored in checkpoint and can be read by cs2
|
||||
cs1.SetDefaultCPUSet(tc.expectedState.defaultCPUSet)
|
||||
cs1.SetCPUAssignments(tc.expectedState.assignments)
|
||||
|
||||
// restore checkpoint with previously stored values
|
||||
cs2, err := NewCheckpointState(testingDir, testingCheckpoint, "none")
|
||||
if err != nil {
|
||||
t.Fatalf("could not create testing checkpointState instance: %v", err)
|
||||
}
|
||||
|
||||
AssertStateEqual(t, cs2, tc.expectedState)
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestCheckpointStateHelpers(t *testing.T) {
|
||||
testCases := []struct {
|
||||
description string
|
||||
defaultCPUset cpuset.CPUSet
|
||||
containers map[string]cpuset.CPUSet
|
||||
}{
|
||||
{
|
||||
description: "One container",
|
||||
defaultCPUset: cpuset.NewCPUSet(0, 1, 2, 3, 4, 5, 6, 7, 8),
|
||||
containers: map[string]cpuset.CPUSet{
|
||||
"c1": cpuset.NewCPUSet(0, 1),
|
||||
},
|
||||
},
|
||||
{
|
||||
description: "Two containers",
|
||||
defaultCPUset: cpuset.NewCPUSet(0, 1, 2, 3, 4, 5, 6, 7, 8),
|
||||
containers: map[string]cpuset.CPUSet{
|
||||
"c1": cpuset.NewCPUSet(0, 1),
|
||||
"c2": cpuset.NewCPUSet(2, 3, 4, 5),
|
||||
},
|
||||
},
|
||||
{
|
||||
description: "Container without assigned cpus",
|
||||
defaultCPUset: cpuset.NewCPUSet(0, 1, 2, 3, 4, 5, 6, 7, 8),
|
||||
containers: map[string]cpuset.CPUSet{
|
||||
"c1": cpuset.NewCPUSet(),
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
cpm, err := checkpointmanager.NewCheckpointManager(testingDir)
|
||||
if err != nil {
|
||||
t.Fatalf("could not create testing checkpoint manager: %v", err)
|
||||
}
|
||||
|
||||
for _, tc := range testCases {
|
||||
t.Run(tc.description, func(t *testing.T) {
|
||||
// ensure there is no previous checkpoint
|
||||
cpm.RemoveCheckpoint(testingCheckpoint)
|
||||
|
||||
state, err := NewCheckpointState(testingDir, testingCheckpoint, "none")
|
||||
if err != nil {
|
||||
t.Fatalf("could not create testing checkpointState instance: %v", err)
|
||||
}
|
||||
state.SetDefaultCPUSet(tc.defaultCPUset)
|
||||
|
||||
for container, set := range tc.containers {
|
||||
state.SetCPUSet(container, set)
|
||||
if cpus, _ := state.GetCPUSet(container); !cpus.Equals(set) {
|
||||
t.Fatalf("state inconsistent, got %q instead of %q", set, cpus)
|
||||
}
|
||||
|
||||
state.Delete(container)
|
||||
if _, ok := state.GetCPUSet(container); ok {
|
||||
t.Fatal("deleted container still existing in state")
|
||||
}
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestCheckpointStateClear(t *testing.T) {
|
||||
testCases := []struct {
|
||||
description string
|
||||
defaultCPUset cpuset.CPUSet
|
||||
containers map[string]cpuset.CPUSet
|
||||
}{
|
||||
{
|
||||
"Valid state",
|
||||
cpuset.NewCPUSet(1, 5, 10),
|
||||
map[string]cpuset.CPUSet{
|
||||
"container1": cpuset.NewCPUSet(1, 4),
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
for _, tc := range testCases {
|
||||
t.Run(tc.description, func(t *testing.T) {
|
||||
state, err := NewCheckpointState(testingDir, testingCheckpoint, "none")
|
||||
if err != nil {
|
||||
t.Fatalf("could not create testing checkpointState instance: %v", err)
|
||||
}
|
||||
|
||||
state.SetDefaultCPUSet(tc.defaultCPUset)
|
||||
state.SetCPUAssignments(tc.containers)
|
||||
|
||||
state.ClearState()
|
||||
if !cpuset.NewCPUSet().Equals(state.GetDefaultCPUSet()) {
|
||||
t.Fatal("cleared state with non-empty default cpu set")
|
||||
}
|
||||
for container := range tc.containers {
|
||||
if _, ok := state.GetCPUSet(container); ok {
|
||||
t.Fatalf("container %q with non-default cpu set in cleared state", container)
|
||||
}
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
78
pkg/kubelet/cm/cpumanager/state/state_compatibility_test.go
Normal file
78
pkg/kubelet/cm/cpumanager/state/state_compatibility_test.go
Normal file
@ -0,0 +1,78 @@
|
||||
/*
|
||||
Copyright 2018 The Kubernetes Authors.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package state
|
||||
|
||||
import (
|
||||
"os"
|
||||
"path"
|
||||
"testing"
|
||||
|
||||
"k8s.io/kubernetes/pkg/kubelet/checkpointmanager"
|
||||
"k8s.io/kubernetes/pkg/kubelet/cm/cpuset"
|
||||
)
|
||||
|
||||
const compatibilityTestingCheckpoint = "cpumanager_state_compatibility_test"
|
||||
|
||||
var state = &stateMemory{
|
||||
assignments: ContainerCPUAssignments{
|
||||
"container1": cpuset.NewCPUSet(4, 5, 6),
|
||||
"container2": cpuset.NewCPUSet(1, 2, 3),
|
||||
},
|
||||
defaultCPUSet: cpuset.NewCPUSet(1, 2, 3),
|
||||
}
|
||||
|
||||
func TestFileToCheckpointCompatibility(t *testing.T) {
|
||||
statePath := path.Join(testingDir, compatibilityTestingCheckpoint)
|
||||
|
||||
// ensure there is no previous state saved at testing path
|
||||
os.Remove(statePath)
|
||||
// ensure testing state is removed after testing
|
||||
defer os.Remove(statePath)
|
||||
|
||||
fileState := NewFileState(statePath, "none")
|
||||
|
||||
fileState.SetDefaultCPUSet(state.defaultCPUSet)
|
||||
fileState.SetCPUAssignments(state.assignments)
|
||||
|
||||
restoredState, err := NewCheckpointState(testingDir, compatibilityTestingCheckpoint, "none")
|
||||
if err != nil {
|
||||
t.Fatalf("could not restore file state: %v", err)
|
||||
}
|
||||
|
||||
AssertStateEqual(t, restoredState, state)
|
||||
}
|
||||
|
||||
func TestCheckpointToFileCompatibility(t *testing.T) {
|
||||
cpm, err := checkpointmanager.NewCheckpointManager(testingDir)
|
||||
if err != nil {
|
||||
t.Fatalf("could not create testing checkpoint manager: %v", err)
|
||||
}
|
||||
|
||||
// ensure there is no previous checkpoint
|
||||
cpm.RemoveCheckpoint(compatibilityTestingCheckpoint)
|
||||
// ensure testing checkpoint is removed after testing
|
||||
defer cpm.RemoveCheckpoint(compatibilityTestingCheckpoint)
|
||||
|
||||
checkpointState, err := NewCheckpointState(testingDir, compatibilityTestingCheckpoint, "none")
|
||||
|
||||
checkpointState.SetDefaultCPUSet(state.defaultCPUSet)
|
||||
checkpointState.SetCPUAssignments(state.assignments)
|
||||
|
||||
restoredState := NewFileState(path.Join(testingDir, compatibilityTestingCheckpoint), "none")
|
||||
|
||||
AssertStateEqual(t, restoredState, state)
|
||||
}
|
@ -34,7 +34,8 @@ func writeToStateFile(statefile string, content string) {
|
||||
ioutil.WriteFile(statefile, []byte(content), 0644)
|
||||
}
|
||||
|
||||
func stateEqual(t *testing.T, sf State, sm State) {
|
||||
// AssertStateEqual marks provided test as failed if provided states differ
|
||||
func AssertStateEqual(t *testing.T, sf State, sm State) {
|
||||
cpusetSf := sf.GetDefaultCPUSet()
|
||||
cpusetSm := sm.GetDefaultCPUSet()
|
||||
if !cpusetSf.Equals(cpusetSm) {
|
||||
@ -253,7 +254,7 @@ func TestFileStateTryRestore(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
stateEqual(t, fileState, tc.expectedState)
|
||||
AssertStateEqual(t, fileState, tc.expectedState)
|
||||
})
|
||||
}
|
||||
}
|
||||
@ -363,7 +364,7 @@ func TestUpdateStateFile(t *testing.T) {
|
||||
}
|
||||
}
|
||||
newFileState := NewFileState(sfilePath.Name(), "static")
|
||||
stateEqual(t, newFileState, tc.expectedState)
|
||||
AssertStateEqual(t, newFileState, tc.expectedState)
|
||||
})
|
||||
}
|
||||
}
|
||||
@ -471,7 +472,6 @@ func TestClearStateStateFile(t *testing.T) {
|
||||
t.Error("cleared state shoudn't has got information about containers")
|
||||
}
|
||||
}
|
||||
|
||||
})
|
||||
}
|
||||
}
|
||||
|
23
pkg/kubelet/cm/cpumanager/state/testing/BUILD
Normal file
23
pkg/kubelet/cm/cpumanager/state/testing/BUILD
Normal file
@ -0,0 +1,23 @@
|
||||
load("@io_bazel_rules_go//go:def.bzl", "go_library")
|
||||
|
||||
go_library(
|
||||
name = "go_default_library",
|
||||
srcs = ["util.go"],
|
||||
importpath = "k8s.io/kubernetes/pkg/kubelet/cm/cpumanager/state/testing",
|
||||
visibility = ["//visibility:public"],
|
||||
deps = ["//pkg/kubelet/checkpointmanager:go_default_library"],
|
||||
)
|
||||
|
||||
filegroup(
|
||||
name = "package-srcs",
|
||||
srcs = glob(["**"]),
|
||||
tags = ["automanaged"],
|
||||
visibility = ["//visibility:private"],
|
||||
)
|
||||
|
||||
filegroup(
|
||||
name = "all-srcs",
|
||||
srcs = [":package-srcs"],
|
||||
tags = ["automanaged"],
|
||||
visibility = ["//visibility:public"],
|
||||
)
|
41
pkg/kubelet/cm/cpumanager/state/testing/util.go
Normal file
41
pkg/kubelet/cm/cpumanager/state/testing/util.go
Normal file
@ -0,0 +1,41 @@
|
||||
/*
|
||||
Copyright 2018 The Kubernetes Authors.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package testing
|
||||
|
||||
import "k8s.io/kubernetes/pkg/kubelet/checkpointmanager"
|
||||
|
||||
var _ checkpointmanager.Checkpoint = &MockCheckpoint{}
|
||||
|
||||
// MockCheckpoint struct is used for mocking checkpoint values in testing
|
||||
type MockCheckpoint struct {
|
||||
Content string
|
||||
}
|
||||
|
||||
// MarshalCheckpoint returns fake content
|
||||
func (mc *MockCheckpoint) MarshalCheckpoint() ([]byte, error) {
|
||||
return []byte(mc.Content), nil
|
||||
}
|
||||
|
||||
// UnmarshalCheckpoint fakes unmarshaling
|
||||
func (mc *MockCheckpoint) UnmarshalCheckpoint(blob []byte) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
// VerifyChecksum fakes verifying checksum
|
||||
func (mc *MockCheckpoint) VerifyChecksum() error {
|
||||
return nil
|
||||
}
|
Loading…
Reference in New Issue
Block a user