From 3991ed5d2f6e93df0bf4aae284afc5aeccde0ed0 Mon Sep 17 00:00:00 2001 From: Klaudiusz Dembler Date: Thu, 8 Feb 2018 17:43:56 +0100 Subject: [PATCH] Add tests --- pkg/kubelet/cm/cpumanager/state/BUILD | 22 +- .../cm/cpumanager/state/state_checkpoint.go | 38 +- .../cpumanager/state/state_checkpoint_test.go | 325 ++++++++++++++++++ .../cm/cpumanager/state/state_file_test.go | 8 +- pkg/kubelet/cm/cpumanager/state/testing/BUILD | 23 ++ .../cm/cpumanager/state/testing/util.go | 41 +++ 6 files changed, 432 insertions(+), 25 deletions(-) create mode 100644 pkg/kubelet/cm/cpumanager/state/state_checkpoint_test.go create mode 100644 pkg/kubelet/cm/cpumanager/state/testing/BUILD create mode 100644 pkg/kubelet/cm/cpumanager/state/testing/util.go diff --git a/pkg/kubelet/cm/cpumanager/state/BUILD b/pkg/kubelet/cm/cpumanager/state/BUILD index bcaf0045a08..e3d2e77a966 100644 --- a/pkg/kubelet/cm/cpumanager/state/BUILD +++ b/pkg/kubelet/cm/cpumanager/state/BUILD @@ -3,23 +3,36 @@ load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test") go_library( name = "go_default_library", srcs = [ + "checkpoint.go", "state.go", + "state_checkpoint.go", "state_file.go", "state_mem.go", ], importpath = "k8s.io/kubernetes/pkg/kubelet/cm/cpumanager/state", visibility = ["//visibility:public"], deps = [ + "//pkg/kubelet/checkpointmanager:go_default_library", + "//pkg/kubelet/checkpointmanager/errors:go_default_library", "//pkg/kubelet/cm/cpuset:go_default_library", + "//pkg/kubelet/util/store:go_default_library", + "//pkg/util/hash:go_default_library", "//vendor/github.com/golang/glog:go_default_library", ], ) go_test( name = "go_default_test", - srcs = ["state_file_test.go"], + srcs = [ + "state_checkpoint_test.go", + "state_file_test.go", + ], embed = [":go_default_library"], - deps = ["//pkg/kubelet/cm/cpuset:go_default_library"], + deps = [ + "//pkg/kubelet/checkpointmanager:go_default_library", + "//pkg/kubelet/cm/cpumanager/state/testing:go_default_library", + "//pkg/kubelet/cm/cpuset:go_default_library", + ], ) filegroup( @@ -31,7 +44,10 @@ filegroup( filegroup( name = "all-srcs", - srcs = [":package-srcs"], + srcs = [ + ":package-srcs", + "//pkg/kubelet/cm/cpumanager/state/testing:all-srcs", + ], tags = ["automanaged"], visibility = ["//visibility:public"], ) diff --git a/pkg/kubelet/cm/cpumanager/state/state_checkpoint.go b/pkg/kubelet/cm/cpumanager/state/state_checkpoint.go index 0656fa9954f..ee0ead3eadb 100644 --- a/pkg/kubelet/cm/cpumanager/state/state_checkpoint.go +++ b/pkg/kubelet/cm/cpumanager/state/state_checkpoint.go @@ -18,21 +18,22 @@ package state import ( "fmt" - "github.com/golang/glog" - "k8s.io/kubernetes/pkg/kubelet/checkpointmanager" - "k8s.io/kubernetes/pkg/kubelet/checkpointmanager/errors" - "k8s.io/kubernetes/pkg/kubelet/cm/cpuset" "path" "sync" + + "github.com/golang/glog" + "k8s.io/kubernetes/pkg/kubelet/checkpointmanager" + "k8s.io/kubernetes/pkg/kubelet/cm/cpuset" + utilstore "k8s.io/kubernetes/pkg/kubelet/util/store" ) -// cpuManagerCheckpointName is the name of checkpoint file -const cpuManagerCheckpointName = "cpu_manager_state" +// CPUManagerCheckpointName is the name of checkpoint file +const CPUManagerCheckpointName = "cpu_manager_state" var _ State = &stateCheckpoint{} type stateCheckpoint struct { - mux sync.RWMutex + mux sync.RWMutex policyName string cache State checkpointManager checkpointmanager.CheckpointManager @@ -50,17 +51,17 @@ func NewCheckpointState(stateDir string, policyName string) (State, error) { checkpointManager: checkpointManager, } - if err := stateCheckpoint.tryRestoreState(); err != nil { - return nil, fmt.Errorf("unable to restore state from checkpoint: %v\n"+ + if err := stateCheckpoint.restoreState(); err != nil { + return nil, fmt.Errorf("could not restore state from checkpoint: %v\n"+ "Please drain this node and delete the CPU manager checkpoint file %q before restarting Kubelet.", - err, path.Join(stateDir, cpuManagerCheckpointName)) + err, path.Join(stateDir, CPUManagerCheckpointName)) } return stateCheckpoint, nil } -// tryRestoreState tries to read checkpoint and creates it if it doesn't exist -func (sc *stateCheckpoint) tryRestoreState() error { +// restores state from a checkpoint and creates it if it doesn't exist +func (sc *stateCheckpoint) restoreState() error { sc.mux.Lock() defer sc.mux.Unlock() var err error @@ -71,8 +72,9 @@ func (sc *stateCheckpoint) tryRestoreState() error { tmpContainerCPUSet := cpuset.NewCPUSet() checkpoint := NewCPUManagerCheckpoint() - if err = sc.checkpointManager.GetCheckpoint(cpuManagerCheckpointName, checkpoint); err != nil { - if err == errors.ErrCheckpointNotFound { + if err = sc.checkpointManager.GetCheckpoint(CPUManagerCheckpointName, checkpoint); err != nil { + // TODO: change to errors.ErrCheckpointNotFound may be required after issue in checkpointing PR is resolved + if err == utilstore.ErrKeyNotFound { sc.storeState() return nil } @@ -80,16 +82,16 @@ func (sc *stateCheckpoint) tryRestoreState() error { } if sc.policyName != checkpoint.PolicyName { - return fmt.Errorf("policy configured %q != policy from state checkpoint %q", sc.policyName, checkpoint.PolicyName) + return fmt.Errorf("configured policy %q differs from state checkpoint policy %q", sc.policyName, checkpoint.PolicyName) } if tmpDefaultCPUSet, err = cpuset.Parse(checkpoint.DefaultCPUSet); err != nil { - return fmt.Errorf("could not parse state checkpoint - [defaultCpuSet: %q]", checkpoint.DefaultCPUSet) + return fmt.Errorf("could not parse default cpu set %q: %v", checkpoint.DefaultCPUSet, err) } for containerID, cpuString := range checkpoint.Entries { if tmpContainerCPUSet, err = cpuset.Parse(cpuString); err != nil { - return fmt.Errorf("could not parse state checkpoint - container id: %q, cpuset: %q", containerID, cpuString) + return fmt.Errorf("could not parse cpuset %q for container id %q: %v", cpuString, containerID, err) } tmpAssignments[containerID] = tmpContainerCPUSet } @@ -113,7 +115,7 @@ func (sc *stateCheckpoint) storeState() { checkpoint.Entries[containerID] = cset.String() } - err := sc.checkpointManager.CreateCheckpoint(cpuManagerCheckpointName, checkpoint) + err := sc.checkpointManager.CreateCheckpoint(CPUManagerCheckpointName, checkpoint) if err != nil { panic("[cpumanager] could not save checkpoint: " + err.Error()) diff --git a/pkg/kubelet/cm/cpumanager/state/state_checkpoint_test.go b/pkg/kubelet/cm/cpumanager/state/state_checkpoint_test.go new file mode 100644 index 00000000000..01cb7ed6afb --- /dev/null +++ b/pkg/kubelet/cm/cpumanager/state/state_checkpoint_test.go @@ -0,0 +1,325 @@ +/* +Copyright 2018 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package state + +import ( + "strings" + "testing" + + "k8s.io/kubernetes/pkg/kubelet/checkpointmanager" + testutil "k8s.io/kubernetes/pkg/kubelet/cm/cpumanager/state/testing" + "k8s.io/kubernetes/pkg/kubelet/cm/cpuset" +) + +const testingDir = "/tmp" + +func TestCheckpointStateRestore(t *testing.T) { + testCases := []struct { + description string + checkpointContent string + policyName string + expectedError string + expectedState *stateMemory + }{ + { + "Restore non-existing checkpoint", + "", + "none", + "", + &stateMemory{}, + }, + { + "Restore default cpu set", + `{ + "PolicyName": "none", + "DefaultCPUSet": "4-6", + "Entries": {}, + "Checksum": 861251554 + }`, + "none", + "", + &stateMemory{ + defaultCPUSet: cpuset.NewCPUSet(4, 5, 6), + }, + }, + { + "Restore valid checkpoint", + `{ + "PolicyName": "none", + "DefaultCPUSet": "1-3", + "Entries": { + "container1": "4-6", + "container2": "1-3" + }, + "Checksum": 2604807655 + }`, + "none", + "", + &stateMemory{ + assignments: ContainerCPUAssignments{ + "container1": cpuset.NewCPUSet(4, 5, 6), + "container2": cpuset.NewCPUSet(1, 2, 3), + }, + defaultCPUSet: cpuset.NewCPUSet(1, 2, 3), + }, + }, + { + "Restore checkpoint with invalid checksum", + `{ + "PolicyName": "none", + "DefaultCPUSet": "4-6", + "Entries": {}, + "Checksum": 1337 + }`, + "none", + "checkpoint is corrupted", + &stateMemory{}, + }, + { + "Restore checkpoint with invalid JSON", + `{`, + "none", + "unexpected end of JSON input", + &stateMemory{}, + }, + { + "Restore checkpoint with invalid policy name", + `{ + "PolicyName": "other", + "DefaultCPUSet": "1-3", + "Entries": {}, + "Checksum": 4266067046 + }`, + "none", + `configured policy "none" differs from state checkpoint policy "other"`, + &stateMemory{}, + }, + { + "Restore checkpoint with unparsable default cpu set", + `{ + "PolicyName": "none", + "DefaultCPUSet": "1.3", + "Entries": {}, + "Checksum": 4073769779 + }`, + "none", + `could not parse default cpu set "1.3": strconv.Atoi: parsing "1.3": invalid syntax`, + &stateMemory{}, + }, + { + "Restore checkpoint with unparsable assignment entry", + `{ + "PolicyName": "none", + "DefaultCPUSet": "1-3", + "Entries": { + "container1": "4-6", + "container2": "asd" + }, + "Checksum": 3835486974 + }`, + "none", + `could not parse cpuset "asd" for container id "container2": strconv.Atoi: parsing "asd": invalid syntax`, + &stateMemory{}, + }, + } + + // create checkpoint manager for testing + cpm, err := checkpointmanager.NewCheckpointManager(testingDir) + if err != nil { + t.Fatalf("could not create testing checkpoint manager: %v", err) + } + + for _, tc := range testCases { + t.Run(tc.description, func(t *testing.T) { + // ensure there is no previous checkpoint + cpm.RemoveCheckpoint(CPUManagerCheckpointName) + + // prepare checkpoint for testing + var checkpoint *testutil.MockCheckpoint = nil + if strings.TrimSpace(tc.checkpointContent) != "" { + checkpoint = &testutil.MockCheckpoint{Content: tc.checkpointContent} + } + if checkpoint != nil { + if err := cpm.CreateCheckpoint(CPUManagerCheckpointName, checkpoint); err != nil { + t.Fatalf("could not create testing checkpoint: %v", err) + } + } + + restoredState, err := NewCheckpointState(testingDir, tc.policyName) + if err != nil { + if strings.TrimSpace(tc.expectedError) != "" { + tc.expectedError = "could not restore state from checkpoint: " + tc.expectedError + if strings.HasPrefix(err.Error(), tc.expectedError) { + t.Logf("got expected error: %v", err) + return + } + } + t.Fatalf("unexpected error while creatng checkpointState: %v", err) + } + + // compare state after restoration with the one expected + AssertStateEqual(t, restoredState, tc.expectedState) + }) + } +} + +func TestCheckpointStateStore(t *testing.T) { + testCases := []struct { + description string + expectedState *stateMemory + }{ + { + "Store default cpu set", + &stateMemory{defaultCPUSet: cpuset.NewCPUSet(1, 2, 3)}, + }, + { + "Store assignments", + &stateMemory{ + assignments: map[string]cpuset.CPUSet{ + "container1": cpuset.NewCPUSet(1, 5, 8), + }, + }, + }, + } + + cpm, err := checkpointmanager.NewCheckpointManager(testingDir) + if err != nil { + t.Fatalf("could not create testing checkpoint manager: %v", err) + } + + for _, tc := range testCases { + t.Run(tc.description, func(t *testing.T) { + // ensure there is no previous checkpoint + cpm.RemoveCheckpoint(CPUManagerCheckpointName) + + cs1, err := NewCheckpointState(testingDir, "none") + if err != nil { + t.Fatalf("could not create testing checkpointState instance: %v", err) + } + + cs1.SetDefaultCPUSet(tc.expectedState.defaultCPUSet) + cs1.SetCPUAssignments(tc.expectedState.assignments) + + cs2, err := NewCheckpointState(testingDir, "none") + if err != nil { + t.Fatalf("could not create testing checkpointState instance: %v", err) + } + + AssertStateEqual(t, cs2, tc.expectedState) + }) + } +} + +func TestCheckpointStateHelpers(t *testing.T) { + testCases := []struct { + description string + defaultCPUset cpuset.CPUSet + containers map[string]cpuset.CPUSet + }{ + { + description: "One container", + defaultCPUset: cpuset.NewCPUSet(0, 1, 2, 3, 4, 5, 6, 7, 8), + containers: map[string]cpuset.CPUSet{ + "c1": cpuset.NewCPUSet(0, 1), + }, + }, + { + description: "Two containers", + defaultCPUset: cpuset.NewCPUSet(0, 1, 2, 3, 4, 5, 6, 7, 8), + containers: map[string]cpuset.CPUSet{ + "c1": cpuset.NewCPUSet(0, 1), + "c2": cpuset.NewCPUSet(2, 3, 4, 5), + }, + }, + { + description: "Container without assigned cpus", + defaultCPUset: cpuset.NewCPUSet(0, 1, 2, 3, 4, 5, 6, 7, 8), + containers: map[string]cpuset.CPUSet{ + "c1": cpuset.NewCPUSet(), + }, + }, + } + + cpm, err := checkpointmanager.NewCheckpointManager(testingDir) + if err != nil { + t.Fatalf("could not create testing checkpoint manager: %v", err) + } + + for _, tc := range testCases { + t.Run(tc.description, func(t *testing.T) { + // ensure there is no previous checkpoint + cpm.RemoveCheckpoint(CPUManagerCheckpointName) + + state, err := NewCheckpointState(testingDir, "none") + if err != nil { + t.Fatalf("could not create testing checkpointState instance: %v", err) + } + state.SetDefaultCPUSet(tc.defaultCPUset) + + for container, set := range tc.containers { + state.SetCPUSet(container, set) + if cpus, _ := state.GetCPUSet(container); !cpus.Equals(set) { + t.Fatalf("state inconsistent, got %q instead of %q", set, cpus) + } + + state.Delete(container) + if cpus := state.GetCPUSetOrDefault(container); !cpus.Equals(tc.defaultCPUset) { + t.Fatal("deleted container still existing in state") + } + } + + }) + } +} + +func TestCheckpointStateClear(t *testing.T) { + testCases := []struct { + description string + defaultCPUset cpuset.CPUSet + containers map[string]cpuset.CPUSet + }{ + { + "Valid state", + cpuset.NewCPUSet(1, 5, 10), + map[string]cpuset.CPUSet{ + "container1": cpuset.NewCPUSet(1, 4), + }, + }, + } + + for _, tc := range testCases { + t.Run(tc.description, func(t *testing.T) { + state, err := NewCheckpointState(testingDir, "none") + if err != nil { + t.Fatalf("could not create testing checkpointState instance: %v", err) + } + + state.SetDefaultCPUSet(tc.defaultCPUset) + state.SetCPUAssignments(tc.containers) + + state.ClearState() + if !cpuset.NewCPUSet().Equals(state.GetDefaultCPUSet()) { + t.Fatal("cleared state with non-empty default cpu set") + } + for container := range tc.containers { + if !cpuset.NewCPUSet().Equals(state.GetCPUSetOrDefault(container)) { + t.Fatalf("container %q with non-default cpu set in cleared state", container) + } + } + }) + } +} diff --git a/pkg/kubelet/cm/cpumanager/state/state_file_test.go b/pkg/kubelet/cm/cpumanager/state/state_file_test.go index edccf87998c..232c47f3875 100644 --- a/pkg/kubelet/cm/cpumanager/state/state_file_test.go +++ b/pkg/kubelet/cm/cpumanager/state/state_file_test.go @@ -34,7 +34,8 @@ func writeToStateFile(statefile string, content string) { ioutil.WriteFile(statefile, []byte(content), 0644) } -func stateEqual(t *testing.T, sf State, sm State) { +// AssertStateEqual marks provided test as failed if provided states differ +func AssertStateEqual(t *testing.T, sf State, sm State) { cpusetSf := sf.GetDefaultCPUSet() cpusetSm := sm.GetDefaultCPUSet() if !cpusetSf.Equals(cpusetSm) { @@ -253,7 +254,7 @@ func TestFileStateTryRestore(t *testing.T) { } } - stateEqual(t, fileState, tc.expectedState) + AssertStateEqual(t, fileState, tc.expectedState) }) } } @@ -363,7 +364,7 @@ func TestUpdateStateFile(t *testing.T) { } } newFileState := NewFileState(sfilePath.Name(), "static") - stateEqual(t, newFileState, tc.expectedState) + AssertStateEqual(t, newFileState, tc.expectedState) }) } } @@ -471,7 +472,6 @@ func TestClearStateStateFile(t *testing.T) { t.Error("cleared state shoudn't has got information about containers") } } - }) } } diff --git a/pkg/kubelet/cm/cpumanager/state/testing/BUILD b/pkg/kubelet/cm/cpumanager/state/testing/BUILD new file mode 100644 index 00000000000..41c6d772329 --- /dev/null +++ b/pkg/kubelet/cm/cpumanager/state/testing/BUILD @@ -0,0 +1,23 @@ +load("@io_bazel_rules_go//go:def.bzl", "go_library") + +go_library( + name = "go_default_library", + srcs = ["util.go"], + importpath = "k8s.io/kubernetes/pkg/kubelet/cm/cpumanager/state/testing", + visibility = ["//visibility:public"], + deps = ["//pkg/kubelet/checkpointmanager:go_default_library"], +) + +filegroup( + name = "package-srcs", + srcs = glob(["**"]), + tags = ["automanaged"], + visibility = ["//visibility:private"], +) + +filegroup( + name = "all-srcs", + srcs = [":package-srcs"], + tags = ["automanaged"], + visibility = ["//visibility:public"], +) diff --git a/pkg/kubelet/cm/cpumanager/state/testing/util.go b/pkg/kubelet/cm/cpumanager/state/testing/util.go new file mode 100644 index 00000000000..6222c161cb1 --- /dev/null +++ b/pkg/kubelet/cm/cpumanager/state/testing/util.go @@ -0,0 +1,41 @@ +/* +Copyright 2018 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package testing + +import "k8s.io/kubernetes/pkg/kubelet/checkpointmanager" + +var _ checkpointmanager.Checkpoint = &MockCheckpoint{} + +// MockCheckpoint struct is used for mocking checkpoint values in testing +type MockCheckpoint struct { + Content string +} + +// MarshalCheckpoint returns fake content +func (mc *MockCheckpoint) MarshalCheckpoint() ([]byte, error) { + return []byte(mc.Content), nil +} + +func (mc *MockCheckpoint) UnmarshalCheckpoint(blob []byte) error { + return nil +} + +func (mc *MockCheckpoint) GetChecksum() uint64 { + return 0 +} + +func (mc *MockCheckpoint) UpdateChecksum() {}