diff --git a/pkg/kubelet/cm/cpumanager/cpu_manager_test.go b/pkg/kubelet/cm/cpumanager/cpu_manager_test.go index a763bbfaf36..e4df638c3db 100644 --- a/pkg/kubelet/cm/cpumanager/cpu_manager_test.go +++ b/pkg/kubelet/cm/cpumanager/cpu_manager_test.go @@ -32,7 +32,7 @@ import ( ) type mockState struct { - assignments map[string]cpuset.CPUSet + assignments state.ContainerCPUAssignments defaultCPUSet cpuset.CPUSet } @@ -64,6 +64,19 @@ func (s *mockState) Delete(containerID string) { delete(s.assignments, containerID) } +func (s *mockState) ClearState() { + s.defaultCPUSet = cpuset.CPUSet{} + s.assignments = make(state.ContainerCPUAssignments) +} + +func (s *mockState) SetCPUAssignments(a state.ContainerCPUAssignments) { + s.assignments = a.Clone() +} + +func (s *mockState) GetCPUAssignments() state.ContainerCPUAssignments { + return s.assignments.Clone() +} + type mockPolicy struct { err error } @@ -190,7 +203,7 @@ func TestCPUManagerAdd(t *testing.T) { err: testCase.regErr, }, state: &mockState{ - assignments: map[string]cpuset.CPUSet{}, + assignments: state.ContainerCPUAssignments{}, defaultCPUSet: cpuset.NewCPUSet(), }, containerRuntime: mockRuntimeService{ @@ -216,7 +229,7 @@ func TestCPUManagerRemove(t *testing.T) { err: nil, }, state: &mockState{ - assignments: map[string]cpuset.CPUSet{}, + assignments: state.ContainerCPUAssignments{}, defaultCPUSet: cpuset.NewCPUSet(), }, containerRuntime: mockRuntimeService{}, @@ -251,7 +264,7 @@ func TestReconcileState(t *testing.T) { activePods []*v1.Pod pspPS v1.PodStatus pspFound bool - stAssignments map[string]cpuset.CPUSet + stAssignments state.ContainerCPUAssignments stDefaultCPUSet cpuset.CPUSet updateErr error expectFailedContainerName string @@ -282,7 +295,7 @@ func TestReconcileState(t *testing.T) { }, }, pspFound: true, - stAssignments: map[string]cpuset.CPUSet{ + stAssignments: state.ContainerCPUAssignments{ "fakeID": cpuset.NewCPUSet(1, 2), }, stDefaultCPUSet: cpuset.NewCPUSet(3, 4, 5, 6, 7), @@ -308,7 +321,7 @@ func TestReconcileState(t *testing.T) { }, pspPS: v1.PodStatus{}, pspFound: false, - stAssignments: map[string]cpuset.CPUSet{}, + stAssignments: state.ContainerCPUAssignments{}, stDefaultCPUSet: cpuset.NewCPUSet(), updateErr: nil, expectFailedContainerName: "fakeName", @@ -339,7 +352,7 @@ func TestReconcileState(t *testing.T) { }, }, pspFound: true, - stAssignments: map[string]cpuset.CPUSet{}, + stAssignments: state.ContainerCPUAssignments{}, stDefaultCPUSet: cpuset.NewCPUSet(), updateErr: nil, expectFailedContainerName: "fakeName", @@ -370,7 +383,7 @@ func TestReconcileState(t *testing.T) { }, }, pspFound: true, - stAssignments: map[string]cpuset.CPUSet{ + stAssignments: state.ContainerCPUAssignments{ "fakeID": cpuset.NewCPUSet(), }, stDefaultCPUSet: cpuset.NewCPUSet(1, 2, 3, 4, 5, 6, 7), @@ -403,7 +416,7 @@ func TestReconcileState(t *testing.T) { }, }, pspFound: true, - stAssignments: map[string]cpuset.CPUSet{ + stAssignments: state.ContainerCPUAssignments{ "fakeID": cpuset.NewCPUSet(1, 2), }, stDefaultCPUSet: cpuset.NewCPUSet(3, 4, 5, 6, 7), diff --git a/pkg/kubelet/cm/cpumanager/policy_none_test.go b/pkg/kubelet/cm/cpumanager/policy_none_test.go index e20f9bdbbc1..99e61fdbbe2 100644 --- a/pkg/kubelet/cm/cpumanager/policy_none_test.go +++ b/pkg/kubelet/cm/cpumanager/policy_none_test.go @@ -19,6 +19,7 @@ package cpumanager import ( "testing" + "k8s.io/kubernetes/pkg/kubelet/cm/cpumanager/state" "k8s.io/kubernetes/pkg/kubelet/cm/cpuset" ) @@ -36,7 +37,7 @@ func TestNonePolicyAdd(t *testing.T) { policy := &nonePolicy{} st := &mockState{ - assignments: map[string]cpuset.CPUSet{}, + assignments: state.ContainerCPUAssignments{}, defaultCPUSet: cpuset.NewCPUSet(1, 2, 3, 4, 5, 6, 7), } @@ -53,7 +54,7 @@ func TestNonePolicyRemove(t *testing.T) { policy := &nonePolicy{} st := &mockState{ - assignments: map[string]cpuset.CPUSet{}, + assignments: state.ContainerCPUAssignments{}, defaultCPUSet: cpuset.NewCPUSet(1, 2, 3, 4, 5, 6, 7), } diff --git a/pkg/kubelet/cm/cpumanager/policy_static_test.go b/pkg/kubelet/cm/cpumanager/policy_static_test.go index bf6b53f326d..e6626f44675 100644 --- a/pkg/kubelet/cm/cpumanager/policy_static_test.go +++ b/pkg/kubelet/cm/cpumanager/policy_static_test.go @@ -22,6 +22,7 @@ import ( "testing" "k8s.io/api/core/v1" + "k8s.io/kubernetes/pkg/kubelet/cm/cpumanager/state" "k8s.io/kubernetes/pkg/kubelet/cm/cpumanager/topology" "k8s.io/kubernetes/pkg/kubelet/cm/cpuset" ) @@ -31,7 +32,7 @@ type staticPolicyTest struct { topo *topology.CPUTopology numReservedCPUs int containerID string - stAssignments map[string]cpuset.CPUSet + stAssignments state.ContainerCPUAssignments stDefaultCPUSet cpuset.CPUSet pod *v1.Pod expErr error @@ -53,7 +54,7 @@ func TestStaticPolicyStart(t *testing.T) { policy := NewStaticPolicy(topoSingleSocketHT, 1).(*staticPolicy) st := &mockState{ - assignments: map[string]cpuset.CPUSet{}, + assignments: state.ContainerCPUAssignments{}, defaultCPUSet: cpuset.NewCPUSet(), } @@ -88,7 +89,7 @@ func TestStaticPolicyAdd(t *testing.T) { topo: topoSingleSocketHT, numReservedCPUs: 1, containerID: "fakeID2", - stAssignments: map[string]cpuset.CPUSet{}, + stAssignments: state.ContainerCPUAssignments{}, stDefaultCPUSet: cpuset.NewCPUSet(0, 1, 2, 3, 4, 5, 6, 7), pod: makePod("8000m", "8000m"), expErr: fmt.Errorf("not enough cpus available to satisfy request"), @@ -100,7 +101,7 @@ func TestStaticPolicyAdd(t *testing.T) { topo: topoSingleSocketHT, numReservedCPUs: 1, containerID: "fakeID2", - stAssignments: map[string]cpuset.CPUSet{}, + stAssignments: state.ContainerCPUAssignments{}, stDefaultCPUSet: cpuset.NewCPUSet(0, 1, 2, 3, 4, 5, 6, 7), pod: makePod("1000m", "1000m"), expErr: nil, @@ -112,7 +113,7 @@ func TestStaticPolicyAdd(t *testing.T) { topo: topoSingleSocketHT, numReservedCPUs: 1, containerID: "fakeID3", - stAssignments: map[string]cpuset.CPUSet{ + stAssignments: state.ContainerCPUAssignments{ "fakeID100": cpuset.NewCPUSet(2, 3, 6, 7), }, stDefaultCPUSet: cpuset.NewCPUSet(0, 1, 4, 5), @@ -126,7 +127,7 @@ func TestStaticPolicyAdd(t *testing.T) { topo: topoDualSocketHT, numReservedCPUs: 1, containerID: "fakeID3", - stAssignments: map[string]cpuset.CPUSet{ + stAssignments: state.ContainerCPUAssignments{ "fakeID100": cpuset.NewCPUSet(2), }, stDefaultCPUSet: cpuset.NewCPUSet(0, 1, 3, 4, 5, 6, 7, 8, 9, 10, 11), @@ -140,7 +141,7 @@ func TestStaticPolicyAdd(t *testing.T) { topo: topoDualSocketHT, numReservedCPUs: 1, containerID: "fakeID3", - stAssignments: map[string]cpuset.CPUSet{ + stAssignments: state.ContainerCPUAssignments{ "fakeID100": cpuset.NewCPUSet(1, 5), }, stDefaultCPUSet: cpuset.NewCPUSet(0, 2, 3, 4, 6, 7, 8, 9, 10, 11), @@ -154,7 +155,7 @@ func TestStaticPolicyAdd(t *testing.T) { topo: topoDualSocketNoHT, numReservedCPUs: 1, containerID: "fakeID1", - stAssignments: map[string]cpuset.CPUSet{ + stAssignments: state.ContainerCPUAssignments{ "fakeID100": cpuset.NewCPUSet(), }, stDefaultCPUSet: cpuset.NewCPUSet(0, 1, 3, 4, 5, 6, 7), @@ -168,7 +169,7 @@ func TestStaticPolicyAdd(t *testing.T) { topo: topoDualSocketNoHT, numReservedCPUs: 1, containerID: "fakeID1", - stAssignments: map[string]cpuset.CPUSet{ + stAssignments: state.ContainerCPUAssignments{ "fakeID100": cpuset.NewCPUSet(4, 5), }, stDefaultCPUSet: cpuset.NewCPUSet(0, 1, 3, 6, 7), @@ -182,7 +183,7 @@ func TestStaticPolicyAdd(t *testing.T) { topo: topoDualSocketHT, numReservedCPUs: 1, containerID: "fakeID3", - stAssignments: map[string]cpuset.CPUSet{ + stAssignments: state.ContainerCPUAssignments{ "fakeID100": cpuset.NewCPUSet(2), }, stDefaultCPUSet: cpuset.NewCPUSet(0, 1, 3, 4, 5, 6, 7, 8, 9, 10, 11), @@ -196,7 +197,7 @@ func TestStaticPolicyAdd(t *testing.T) { topo: topoSingleSocketHT, numReservedCPUs: 1, containerID: "fakeID1", - stAssignments: map[string]cpuset.CPUSet{}, + stAssignments: state.ContainerCPUAssignments{}, stDefaultCPUSet: cpuset.NewCPUSet(0, 1, 2, 3, 4, 5, 6, 7), pod: makePod("1000m", "2000m"), expErr: nil, @@ -208,7 +209,7 @@ func TestStaticPolicyAdd(t *testing.T) { topo: topoSingleSocketHT, numReservedCPUs: 1, containerID: "fakeID4", - stAssignments: map[string]cpuset.CPUSet{}, + stAssignments: state.ContainerCPUAssignments{}, stDefaultCPUSet: cpuset.NewCPUSet(0, 1, 2, 3, 4, 5, 6, 7), pod: makePod("977m", "977m"), expErr: nil, @@ -220,7 +221,7 @@ func TestStaticPolicyAdd(t *testing.T) { topo: topoSingleSocketHT, numReservedCPUs: 1, containerID: "fakeID5", - stAssignments: map[string]cpuset.CPUSet{ + stAssignments: state.ContainerCPUAssignments{ "fakeID100": cpuset.NewCPUSet(1, 2, 3, 4, 5, 6), }, stDefaultCPUSet: cpuset.NewCPUSet(0, 7), @@ -234,7 +235,7 @@ func TestStaticPolicyAdd(t *testing.T) { topo: topoDualSocketHT, numReservedCPUs: 1, containerID: "fakeID5", - stAssignments: map[string]cpuset.CPUSet{ + stAssignments: state.ContainerCPUAssignments{ "fakeID100": cpuset.NewCPUSet(1, 2, 3), }, stDefaultCPUSet: cpuset.NewCPUSet(0, 4, 5, 6, 7, 8, 9, 10, 11), @@ -250,7 +251,7 @@ func TestStaticPolicyAdd(t *testing.T) { description: "GuPodMultipleCores, topoQuadSocketFourWayHT, ExpectAllocSock0", topo: topoQuadSocketFourWayHT, containerID: "fakeID5", - stAssignments: map[string]cpuset.CPUSet{ + stAssignments: state.ContainerCPUAssignments{ "fakeID100": cpuset.NewCPUSet(3, 11, 4, 5, 6, 7), }, stDefaultCPUSet: largeTopoCPUSet.Difference(cpuset.NewCPUSet(3, 11, 4, 5, 6, 7)), @@ -265,7 +266,7 @@ func TestStaticPolicyAdd(t *testing.T) { description: "GuPodMultipleCores, topoQuadSocketFourWayHT, ExpectAllocAllFullCoresFromThreeSockets", topo: topoQuadSocketFourWayHT, containerID: "fakeID5", - stAssignments: map[string]cpuset.CPUSet{ + stAssignments: state.ContainerCPUAssignments{ "fakeID100": largeTopoCPUSet.Difference(cpuset.NewCPUSet(1, 25, 13, 38, 2, 9, 11, 35, 23, 48, 12, 51, 53, 173, 113, 233, 54, 61)), }, @@ -281,7 +282,7 @@ func TestStaticPolicyAdd(t *testing.T) { description: "GuPodMultipleCores, topoQuadSocketFourWayHT, ExpectAllocAllSock1+FullCore", topo: topoQuadSocketFourWayHT, containerID: "fakeID5", - stAssignments: map[string]cpuset.CPUSet{ + stAssignments: state.ContainerCPUAssignments{ "fakeID100": largeTopoCPUSet.Difference(largeTopoSock1CPUSet.Union(cpuset.NewCPUSet(10, 34, 22, 47, 53, 173, 61, 181, 108, 228, 115, 235))), }, @@ -298,7 +299,7 @@ func TestStaticPolicyAdd(t *testing.T) { description: "GuPodMultipleCores, topoQuadSocketFourWayHT, ExpectAllocCPUs", topo: topoQuadSocketFourWayHT, containerID: "fakeID5", - stAssignments: map[string]cpuset.CPUSet{ + stAssignments: state.ContainerCPUAssignments{ "fakeID100": largeTopoCPUSet.Difference(cpuset.NewCPUSet(10, 11, 53, 37, 55, 67, 52)), }, stDefaultCPUSet: cpuset.NewCPUSet(10, 11, 53, 67, 52), @@ -314,7 +315,7 @@ func TestStaticPolicyAdd(t *testing.T) { description: "GuPodMultipleCores, topoQuadSocketFourWayHT, NoAlloc", topo: topoQuadSocketFourWayHT, containerID: "fakeID5", - stAssignments: map[string]cpuset.CPUSet{ + stAssignments: state.ContainerCPUAssignments{ "fakeID100": largeTopoCPUSet.Difference(cpuset.NewCPUSet(10, 11, 53, 37, 55, 67, 52)), }, stDefaultCPUSet: cpuset.NewCPUSet(10, 11, 53, 37, 55, 67, 52), @@ -374,7 +375,7 @@ func TestStaticPolicyRemove(t *testing.T) { description: "SingleSocketHT, DeAllocOneContainer", topo: topoSingleSocketHT, containerID: "fakeID1", - stAssignments: map[string]cpuset.CPUSet{ + stAssignments: state.ContainerCPUAssignments{ "fakeID1": cpuset.NewCPUSet(1, 2, 3), }, stDefaultCPUSet: cpuset.NewCPUSet(4, 5, 6, 7), @@ -384,7 +385,7 @@ func TestStaticPolicyRemove(t *testing.T) { description: "SingleSocketHT, DeAllocOneContainer, BeginEmpty", topo: topoSingleSocketHT, containerID: "fakeID1", - stAssignments: map[string]cpuset.CPUSet{ + stAssignments: state.ContainerCPUAssignments{ "fakeID1": cpuset.NewCPUSet(1, 2, 3), "fakeID2": cpuset.NewCPUSet(4, 5, 6, 7), }, @@ -395,7 +396,7 @@ func TestStaticPolicyRemove(t *testing.T) { description: "SingleSocketHT, DeAllocTwoContainer", topo: topoSingleSocketHT, containerID: "fakeID1", - stAssignments: map[string]cpuset.CPUSet{ + stAssignments: state.ContainerCPUAssignments{ "fakeID1": cpuset.NewCPUSet(1, 3, 5), "fakeID2": cpuset.NewCPUSet(2, 4), }, @@ -406,7 +407,7 @@ func TestStaticPolicyRemove(t *testing.T) { description: "SingleSocketHT, NoDeAlloc", topo: topoSingleSocketHT, containerID: "fakeID2", - stAssignments: map[string]cpuset.CPUSet{ + stAssignments: state.ContainerCPUAssignments{ "fakeID1": cpuset.NewCPUSet(1, 3, 5), }, stDefaultCPUSet: cpuset.NewCPUSet(2, 4, 6, 7), diff --git a/pkg/kubelet/cm/cpumanager/state/BUILD b/pkg/kubelet/cm/cpumanager/state/BUILD index 6393bdec4d0..8af631c9ad6 100644 --- a/pkg/kubelet/cm/cpumanager/state/BUILD +++ b/pkg/kubelet/cm/cpumanager/state/BUILD @@ -1,9 +1,10 @@ -load("@io_bazel_rules_go//go:def.bzl", "go_library") +load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test") go_library( name = "go_default_library", srcs = [ "state.go", + "state_file.go", "state_mem.go", ], importpath = "k8s.io/kubernetes/pkg/kubelet/cm/cpumanager/state", @@ -14,6 +15,14 @@ go_library( ], ) +go_test( + name = "go_default_test", + srcs = ["state_file_test.go"], + importpath = "k8s.io/kubernetes/pkg/kubelet/cm/cpumanager/state", + library = ":go_default_library", + deps = ["//pkg/kubelet/cm/cpuset:go_default_library"], +) + filegroup( name = "package-srcs", srcs = glob(["**"]), diff --git a/pkg/kubelet/cm/cpumanager/state/state.go b/pkg/kubelet/cm/cpumanager/state/state.go index 98f7f7dc240..0550b644d57 100644 --- a/pkg/kubelet/cm/cpumanager/state/state.go +++ b/pkg/kubelet/cm/cpumanager/state/state.go @@ -20,17 +20,32 @@ import ( "k8s.io/kubernetes/pkg/kubelet/cm/cpuset" ) +// ContainerCPUAssignments type used in cpu manger state +type ContainerCPUAssignments map[string]cpuset.CPUSet + +// Clone returns a copy of ContainerCPUAssignments +func (as ContainerCPUAssignments) Clone() ContainerCPUAssignments { + ret := make(ContainerCPUAssignments) + for key, val := range as { + ret[key] = val + } + return ret +} + // Reader interface used to read current cpu/pod assignment state type Reader interface { GetCPUSet(containerID string) (cpuset.CPUSet, bool) GetDefaultCPUSet() cpuset.CPUSet GetCPUSetOrDefault(containerID string) cpuset.CPUSet + GetCPUAssignments() ContainerCPUAssignments } type writer interface { SetCPUSet(containerID string, cpuset cpuset.CPUSet) SetDefaultCPUSet(cpuset cpuset.CPUSet) + SetCPUAssignments(ContainerCPUAssignments) Delete(containerID string) + ClearState() } // State interface provides methods for tracking and setting cpu/pod assignment diff --git a/pkg/kubelet/cm/cpumanager/state/state_file.go b/pkg/kubelet/cm/cpumanager/state/state_file.go new file mode 100644 index 00000000000..b800613419f --- /dev/null +++ b/pkg/kubelet/cm/cpumanager/state/state_file.go @@ -0,0 +1,195 @@ +/* +Copyright 2017 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package state + +import ( + "encoding/json" + "github.com/golang/glog" + "io/ioutil" + "k8s.io/kubernetes/pkg/kubelet/cm/cpuset" + "os" + "sync" +) + +type stateFileData struct { + DefaultCPUSet string `json:"defaultCpuSet"` + Entries map[string]string `json:"entries,omitempty"` +} + +var _ State = &stateFile{} + +type stateFile struct { + sync.RWMutex + stateFilePath string + cache State +} + +// NewFileState creates new State for keeping track of cpu/pod assignment with file backend +func NewFileState(filePath string) State { + stateFile := &stateFile{ + stateFilePath: filePath, + cache: NewMemoryState(), + } + + if err := stateFile.tryRestoreState(); err != nil { + // could not restore state, init new state file + glog.Infof("[cpumanager] state file: initializing empty state file") + stateFile.cache.ClearState() + stateFile.storeState() + } + + return stateFile +} + +// tryRestoreState tries to read state file, upon any error, +// err message is logged and state is left clean. un-initialized +func (sf *stateFile) tryRestoreState() error { + sf.Lock() + defer sf.Unlock() + var err error + + // used when all parsing is ok + tmpAssignments := make(ContainerCPUAssignments) + tmpDefaultCPUSet := cpuset.NewCPUSet() + tmpContainerCPUSet := cpuset.NewCPUSet() + + var content []byte + + if content, err = ioutil.ReadFile(sf.stateFilePath); os.IsNotExist(err) { + // Create file + if _, err = os.Create(sf.stateFilePath); err != nil { + glog.Errorf("[cpumanager] state file: unable to create state file \"%s\":%s", sf.stateFilePath, err.Error()) + panic("[cpumanager] state file not created") + } + glog.Infof("[cpumanager] state file: created empty state file \"%s\"", sf.stateFilePath) + } else { + // File exists - try to read + var readState stateFileData + + if err = json.Unmarshal(content, &readState); err != nil { + glog.Warningf("[cpumanager] state file: could not unmarshal, corrupted state file - \"%s\"", sf.stateFilePath) + return err + } + + if tmpDefaultCPUSet, err = cpuset.Parse(readState.DefaultCPUSet); err != nil { + glog.Warningf("[cpumanager] state file: could not parse state file - [defaultCpuSet:\"%s\"]", readState.DefaultCPUSet) + return err + } + + for containerID, cpuString := range readState.Entries { + if tmpContainerCPUSet, err = cpuset.Parse(cpuString); err != nil { + glog.Warningf("[cpumanager] state file: could not parse state file - container id: %s, cpuset: \"%s\"", containerID, cpuString) + return err + } + tmpAssignments[containerID] = tmpContainerCPUSet + } + + sf.cache.SetDefaultCPUSet(tmpDefaultCPUSet) + sf.cache.SetCPUAssignments(tmpAssignments) + + glog.V(2).Infof("[cpumanager] state file: restored state from state file \"%s\"", sf.stateFilePath) + glog.V(2).Infof("[cpumanager] state file: defaultCPUSet: %s", tmpDefaultCPUSet.String()) + } + return nil +} + +// saves state to a file, caller is responsible for locking +func (sf *stateFile) storeState() { + var content []byte + var err error + + data := stateFileData{ + DefaultCPUSet: sf.cache.GetDefaultCPUSet().String(), + Entries: map[string]string{}, + } + + for containerID, cset := range sf.cache.GetCPUAssignments() { + data.Entries[containerID] = cset.String() + } + + if content, err = json.Marshal(data); err != nil { + panic("[cpumanager] state file: could not serialize state to json") + } + + if err = ioutil.WriteFile(sf.stateFilePath, content, 0644); err != nil { + panic("[cpumanager] state file not written") + } + return +} + +func (sf *stateFile) GetCPUSet(containerID string) (cpuset.CPUSet, bool) { + sf.RLock() + defer sf.RUnlock() + + res, ok := sf.cache.GetCPUSet(containerID) + return res, ok +} + +func (sf *stateFile) GetDefaultCPUSet() cpuset.CPUSet { + sf.RLock() + defer sf.RUnlock() + + return sf.cache.GetDefaultCPUSet() +} + +func (sf *stateFile) GetCPUSetOrDefault(containerID string) cpuset.CPUSet { + sf.RLock() + defer sf.RUnlock() + + return sf.cache.GetCPUSetOrDefault(containerID) +} + +func (sf *stateFile) GetCPUAssignments() ContainerCPUAssignments { + sf.RLock() + defer sf.RUnlock() + return sf.cache.GetCPUAssignments() +} + +func (sf *stateFile) SetCPUSet(containerID string, cset cpuset.CPUSet) { + sf.Lock() + defer sf.Unlock() + sf.cache.SetCPUSet(containerID, cset) + sf.storeState() +} + +func (sf *stateFile) SetDefaultCPUSet(cset cpuset.CPUSet) { + sf.Lock() + defer sf.Unlock() + sf.cache.SetDefaultCPUSet(cset) + sf.storeState() +} + +func (sf *stateFile) SetCPUAssignments(a ContainerCPUAssignments) { + sf.Lock() + defer sf.Unlock() + sf.cache.SetCPUAssignments(a) + sf.storeState() +} + +func (sf *stateFile) Delete(containerID string) { + sf.Lock() + defer sf.Unlock() + sf.cache.Delete(containerID) + sf.storeState() +} + +func (sf *stateFile) ClearState() { + sf.Lock() + defer sf.Unlock() + sf.cache.ClearState() + sf.storeState() +} diff --git a/pkg/kubelet/cm/cpumanager/state/state_file_test.go b/pkg/kubelet/cm/cpumanager/state/state_file_test.go new file mode 100644 index 00000000000..967c581af03 --- /dev/null +++ b/pkg/kubelet/cm/cpumanager/state/state_file_test.go @@ -0,0 +1,299 @@ +/* +Copyright 2017 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package state + +import ( + "io/ioutil" + "path" + "reflect" + "testing" + + "fmt" + + "k8s.io/kubernetes/pkg/kubelet/cm/cpuset" + + "os" +) + +func writeToStateFile(statefile string, content string) { + ioutil.WriteFile(statefile, []byte(content), 0644) +} + +func TestFileStateTryRestore(t *testing.T) { + testCases := []struct { + description string + stateFileContent string + expErr string + expectedState stateMemory + }{ + { + "Invalid JSON - empty file", + "\n", + "unexpected end of JSON input", + stateMemory{ + assignments: ContainerCPUAssignments{}, + defaultCPUSet: cpuset.NewCPUSet(), + }, + }, + { + "Invalid JSON - invalid content", + "{", + "unexpected end of JSON input", + stateMemory{ + assignments: ContainerCPUAssignments{}, + defaultCPUSet: cpuset.NewCPUSet(), + }, + }, + { + "Try restore defaultCPUSet only", + "{ \"defaultCpuSet\": \"4-6\"}", + "", + stateMemory{ + assignments: ContainerCPUAssignments{}, + defaultCPUSet: cpuset.NewCPUSet(4, 5, 6), + }, + }, + { + "Try restore defaultCPUSet only - invalid name", + "{ \"defCPUSet\": \"4-6\"}", + "", + stateMemory{ + assignments: ContainerCPUAssignments{}, + defaultCPUSet: cpuset.NewCPUSet(), + }, + }, + { + "Try restore assignments only", + "{" + + "\"reservedList\": { " + + "\"container1\": \"4-6\"," + + "\"container2\": \"1-3\"" + + "} }", + "", + stateMemory{ + assignments: ContainerCPUAssignments{ + "container1": cpuset.NewCPUSet(4, 5, 6), + "container2": cpuset.NewCPUSet(1, 2, 3), + }, + defaultCPUSet: cpuset.NewCPUSet(), + }, + }, + { + "Try restore invalid assignments", + "{ \"reservedList\": }", + "invalid character '}' looking for beginning of value", + stateMemory{ + assignments: ContainerCPUAssignments{}, + defaultCPUSet: cpuset.NewCPUSet(), + }, + }, + { + "Try restore valid file", + "{ " + + "\"defaultCpuSet\": \"23-24\", " + + "\"reservedList\": { " + + "\"container1\": \"4-6\", " + + "\"container2\": \"1-3\"" + + " } }", + "", + stateMemory{ + assignments: ContainerCPUAssignments{ + "container1": cpuset.NewCPUSet(4, 5, 6), + "container2": cpuset.NewCPUSet(1, 2, 3), + }, + defaultCPUSet: cpuset.NewCPUSet(23, 24), + }, + }, + { + "Try restore un-parsable defaultCPUSet ", + "{ \"defaultCpuSet\": \"2-sd\" }", + "strconv.Atoi: parsing \"sd\": invalid syntax", + stateMemory{ + assignments: ContainerCPUAssignments{}, + defaultCPUSet: cpuset.NewCPUSet(), + }, + }, + { + "Try restore un-parsable assignments", + "{ " + + "\"defaultCpuSet\": \"23-24\", " + + "\"reservedList\": { " + + "\"container1\": \"p-6\", " + + "\"container2\": \"1-3\"" + + " } }", + "strconv.Atoi: parsing \"p\": invalid syntax", + stateMemory{ + assignments: ContainerCPUAssignments{}, + defaultCPUSet: cpuset.NewCPUSet(), + }, + }, + { + "TryRestoreState creates empty state file", + "", + "", + stateMemory{ + assignments: ContainerCPUAssignments{}, + defaultCPUSet: cpuset.NewCPUSet(), + }, + }, + } + + for idx, tc := range testCases { + t.Run(tc.description, func(t *testing.T) { + + sfilePath := path.Join("/tmp", fmt.Sprintf("cpumanager_state_file_test_%d", idx)) + // Don't create state file, let TryRestoreState figure out that is should create + if tc.stateFileContent != "" { + writeToStateFile(sfilePath, tc.stateFileContent) + } + + // Always remove file - regardless of who created + defer os.Remove(sfilePath) + + fileState := NewFileState(sfilePath) + err := fileState.TryRestoreState() + + if tc.expErr != "" { + if err != nil { + if err.Error() != tc.expErr { + t.Errorf("TryRestoreState() error = %v, wantErr %v", err, tc.expErr) + return + } + } else { + t.Errorf("TryRestoreState() error = nil, wantErr %v", tc.expErr) + return + } + } else { + if err != nil { + t.Errorf("TryRestoreState() error = %v, wantErr nil", err) + return + } + } + + if !reflect.DeepEqual(fileState.State, &tc.expectedState) { + t.Errorf("TryRestoreState() = %v, want %v", fileState.State, tc.expectedState) + } + }) + } +} + +func TestFileStateTryRestorePanic(t *testing.T) { + + testCase := struct { + description string + wantPanic bool + panicMessage string + }{ + "Panic creating file", + true, + "[cpumanager] state file not created", + } + + t.Run(testCase.description, func(t *testing.T) { + + sfilePath := path.Join("/invalid_path/to_some_dir", "cpumanager_state_file_test") + fileState := NewFileState(sfilePath) + + defer func() { + if err := recover(); err != nil { + if testCase.wantPanic { + if testCase.panicMessage == err { + t.Logf("TryRestoreState() got expected panic = %v", err) + return + } + t.Errorf("TryRestoreState() unexpected panic = %v, wantErr %v", err, testCase.panicMessage) + } + } + }() + fileState.TryRestoreState() + }) +} + +func TestUpdateStateFile(t *testing.T) { + + testCases := []struct { + description string + expErr string + expectedState stateMemory + }{ + { + "Save empty state", + "", + stateMemory{ + assignments: ContainerCPUAssignments{}, + defaultCPUSet: cpuset.NewCPUSet(), + }, + }, + { + "Save defaultCPUSet only", + "", + stateMemory{ + assignments: ContainerCPUAssignments{}, + defaultCPUSet: cpuset.NewCPUSet(1, 6), + }, + }, + { + "Save assignments only", + "", + stateMemory{ + assignments: ContainerCPUAssignments{ + "container1": cpuset.NewCPUSet(4, 5, 6), + "container2": cpuset.NewCPUSet(1, 2, 3), + }, + defaultCPUSet: cpuset.NewCPUSet(), + }, + }, + } + + for idx, tc := range testCases { + t.Run(tc.description, func(t *testing.T) { + + sfilePath := path.Join("/tmp", fmt.Sprintf("cpumanager_state_file_test_%d", idx)) + + fileState := NewFileState(sfilePath) + + fileState.SetDefaultCPUSet(tc.expectedState.defaultCPUSet) + fileState.SetCPUAssignments(tc.expectedState.assignments) + + err := fileState.UpdateStateFile() + defer os.Remove(sfilePath) + + if tc.expErr != "" { + if err != nil { + if err.Error() != tc.expErr { + t.Errorf("UpdateStateFile() error = %v, wantErr %v", err, tc.expErr) + return + } + } else { + t.Errorf("UpdateStateFile() error = nil, wantErr %v", tc.expErr) + return + } + } else { + if err != nil { + t.Errorf("UpdateStateFile() error = %v, wantErr nil", err) + return + } + } + + fileState.ClearState() + err = fileState.TryRestoreState() + if !reflect.DeepEqual(fileState.State, &tc.expectedState) { + t.Errorf("TryRestoreState() = %v, want %v", fileState.State, tc.expectedState) + } + }) + } +} diff --git a/pkg/kubelet/cm/cpumanager/state/state_mem.go b/pkg/kubelet/cm/cpumanager/state/state_mem.go index 751b1726aae..797cdb15b2b 100644 --- a/pkg/kubelet/cm/cpumanager/state/state_mem.go +++ b/pkg/kubelet/cm/cpumanager/state/state_mem.go @@ -25,7 +25,7 @@ import ( type stateMemory struct { sync.RWMutex - assignments map[string]cpuset.CPUSet + assignments ContainerCPUAssignments defaultCPUSet cpuset.CPUSet } @@ -35,7 +35,7 @@ var _ State = &stateMemory{} func NewMemoryState() State { glog.Infof("[cpumanager] initializing new in-memory state store") return &stateMemory{ - assignments: map[string]cpuset.CPUSet{}, + assignments: ContainerCPUAssignments{}, defaultCPUSet: cpuset.NewCPUSet(), } } @@ -65,6 +65,12 @@ func (s *stateMemory) GetCPUSetOrDefault(containerID string) cpuset.CPUSet { return s.GetDefaultCPUSet() } +func (s *stateMemory) GetCPUAssignments() ContainerCPUAssignments { + s.RLock() + defer s.RUnlock() + return s.assignments.Clone() +} + func (s *stateMemory) SetCPUSet(containerID string, cset cpuset.CPUSet) { s.Lock() defer s.Unlock() @@ -81,6 +87,14 @@ func (s *stateMemory) SetDefaultCPUSet(cset cpuset.CPUSet) { glog.Infof("[cpumanager] updated default cpuset: \"%s\"", cset) } +func (s *stateMemory) SetCPUAssignments(a ContainerCPUAssignments) { + s.Lock() + defer s.Unlock() + + s.assignments = a.Clone() + glog.Infof("[cpumanager] updated cpuset assignments: \"%v\"", a) +} + func (s *stateMemory) Delete(containerID string) { s.Lock() defer s.Unlock() @@ -88,3 +102,12 @@ func (s *stateMemory) Delete(containerID string) { delete(s.assignments, containerID) glog.V(2).Infof("[cpumanager] deleted cpuset assignment (container id: %s)", containerID) } + +func (s *stateMemory) ClearState() { + s.Lock() + defer s.Unlock() + + s.defaultCPUSet = cpuset.CPUSet{} + s.assignments = make(ContainerCPUAssignments) + glog.V(2).Infof("[cpumanager] cleared state") +}