Added Cpu Manager file state

This commit is contained in:
Szymon Scharmach 2017-10-16 11:51:29 +02:00 committed by Szymon Scharmach
parent 1f53329c67
commit 4ee0adc77a
8 changed files with 593 additions and 37 deletions

View File

@ -32,7 +32,7 @@ import (
)
type mockState struct {
assignments map[string]cpuset.CPUSet
assignments state.ContainerCPUAssignments
defaultCPUSet cpuset.CPUSet
}
@ -64,6 +64,19 @@ func (s *mockState) Delete(containerID string) {
delete(s.assignments, containerID)
}
func (s *mockState) ClearState() {
s.defaultCPUSet = cpuset.CPUSet{}
s.assignments = make(state.ContainerCPUAssignments)
}
func (s *mockState) SetCPUAssignments(a state.ContainerCPUAssignments) {
s.assignments = a.Clone()
}
func (s *mockState) GetCPUAssignments() state.ContainerCPUAssignments {
return s.assignments.Clone()
}
type mockPolicy struct {
err error
}
@ -190,7 +203,7 @@ func TestCPUManagerAdd(t *testing.T) {
err: testCase.regErr,
},
state: &mockState{
assignments: map[string]cpuset.CPUSet{},
assignments: state.ContainerCPUAssignments{},
defaultCPUSet: cpuset.NewCPUSet(),
},
containerRuntime: mockRuntimeService{
@ -216,7 +229,7 @@ func TestCPUManagerRemove(t *testing.T) {
err: nil,
},
state: &mockState{
assignments: map[string]cpuset.CPUSet{},
assignments: state.ContainerCPUAssignments{},
defaultCPUSet: cpuset.NewCPUSet(),
},
containerRuntime: mockRuntimeService{},
@ -251,7 +264,7 @@ func TestReconcileState(t *testing.T) {
activePods []*v1.Pod
pspPS v1.PodStatus
pspFound bool
stAssignments map[string]cpuset.CPUSet
stAssignments state.ContainerCPUAssignments
stDefaultCPUSet cpuset.CPUSet
updateErr error
expectFailedContainerName string
@ -282,7 +295,7 @@ func TestReconcileState(t *testing.T) {
},
},
pspFound: true,
stAssignments: map[string]cpuset.CPUSet{
stAssignments: state.ContainerCPUAssignments{
"fakeID": cpuset.NewCPUSet(1, 2),
},
stDefaultCPUSet: cpuset.NewCPUSet(3, 4, 5, 6, 7),
@ -308,7 +321,7 @@ func TestReconcileState(t *testing.T) {
},
pspPS: v1.PodStatus{},
pspFound: false,
stAssignments: map[string]cpuset.CPUSet{},
stAssignments: state.ContainerCPUAssignments{},
stDefaultCPUSet: cpuset.NewCPUSet(),
updateErr: nil,
expectFailedContainerName: "fakeName",
@ -339,7 +352,7 @@ func TestReconcileState(t *testing.T) {
},
},
pspFound: true,
stAssignments: map[string]cpuset.CPUSet{},
stAssignments: state.ContainerCPUAssignments{},
stDefaultCPUSet: cpuset.NewCPUSet(),
updateErr: nil,
expectFailedContainerName: "fakeName",
@ -370,7 +383,7 @@ func TestReconcileState(t *testing.T) {
},
},
pspFound: true,
stAssignments: map[string]cpuset.CPUSet{
stAssignments: state.ContainerCPUAssignments{
"fakeID": cpuset.NewCPUSet(),
},
stDefaultCPUSet: cpuset.NewCPUSet(1, 2, 3, 4, 5, 6, 7),
@ -403,7 +416,7 @@ func TestReconcileState(t *testing.T) {
},
},
pspFound: true,
stAssignments: map[string]cpuset.CPUSet{
stAssignments: state.ContainerCPUAssignments{
"fakeID": cpuset.NewCPUSet(1, 2),
},
stDefaultCPUSet: cpuset.NewCPUSet(3, 4, 5, 6, 7),

View File

@ -19,6 +19,7 @@ package cpumanager
import (
"testing"
"k8s.io/kubernetes/pkg/kubelet/cm/cpumanager/state"
"k8s.io/kubernetes/pkg/kubelet/cm/cpuset"
)
@ -36,7 +37,7 @@ func TestNonePolicyAdd(t *testing.T) {
policy := &nonePolicy{}
st := &mockState{
assignments: map[string]cpuset.CPUSet{},
assignments: state.ContainerCPUAssignments{},
defaultCPUSet: cpuset.NewCPUSet(1, 2, 3, 4, 5, 6, 7),
}
@ -53,7 +54,7 @@ func TestNonePolicyRemove(t *testing.T) {
policy := &nonePolicy{}
st := &mockState{
assignments: map[string]cpuset.CPUSet{},
assignments: state.ContainerCPUAssignments{},
defaultCPUSet: cpuset.NewCPUSet(1, 2, 3, 4, 5, 6, 7),
}

View File

@ -22,6 +22,7 @@ import (
"testing"
"k8s.io/api/core/v1"
"k8s.io/kubernetes/pkg/kubelet/cm/cpumanager/state"
"k8s.io/kubernetes/pkg/kubelet/cm/cpumanager/topology"
"k8s.io/kubernetes/pkg/kubelet/cm/cpuset"
)
@ -31,7 +32,7 @@ type staticPolicyTest struct {
topo *topology.CPUTopology
numReservedCPUs int
containerID string
stAssignments map[string]cpuset.CPUSet
stAssignments state.ContainerCPUAssignments
stDefaultCPUSet cpuset.CPUSet
pod *v1.Pod
expErr error
@ -53,7 +54,7 @@ func TestStaticPolicyStart(t *testing.T) {
policy := NewStaticPolicy(topoSingleSocketHT, 1).(*staticPolicy)
st := &mockState{
assignments: map[string]cpuset.CPUSet{},
assignments: state.ContainerCPUAssignments{},
defaultCPUSet: cpuset.NewCPUSet(),
}
@ -88,7 +89,7 @@ func TestStaticPolicyAdd(t *testing.T) {
topo: topoSingleSocketHT,
numReservedCPUs: 1,
containerID: "fakeID2",
stAssignments: map[string]cpuset.CPUSet{},
stAssignments: state.ContainerCPUAssignments{},
stDefaultCPUSet: cpuset.NewCPUSet(0, 1, 2, 3, 4, 5, 6, 7),
pod: makePod("8000m", "8000m"),
expErr: fmt.Errorf("not enough cpus available to satisfy request"),
@ -100,7 +101,7 @@ func TestStaticPolicyAdd(t *testing.T) {
topo: topoSingleSocketHT,
numReservedCPUs: 1,
containerID: "fakeID2",
stAssignments: map[string]cpuset.CPUSet{},
stAssignments: state.ContainerCPUAssignments{},
stDefaultCPUSet: cpuset.NewCPUSet(0, 1, 2, 3, 4, 5, 6, 7),
pod: makePod("1000m", "1000m"),
expErr: nil,
@ -112,7 +113,7 @@ func TestStaticPolicyAdd(t *testing.T) {
topo: topoSingleSocketHT,
numReservedCPUs: 1,
containerID: "fakeID3",
stAssignments: map[string]cpuset.CPUSet{
stAssignments: state.ContainerCPUAssignments{
"fakeID100": cpuset.NewCPUSet(2, 3, 6, 7),
},
stDefaultCPUSet: cpuset.NewCPUSet(0, 1, 4, 5),
@ -126,7 +127,7 @@ func TestStaticPolicyAdd(t *testing.T) {
topo: topoDualSocketHT,
numReservedCPUs: 1,
containerID: "fakeID3",
stAssignments: map[string]cpuset.CPUSet{
stAssignments: state.ContainerCPUAssignments{
"fakeID100": cpuset.NewCPUSet(2),
},
stDefaultCPUSet: cpuset.NewCPUSet(0, 1, 3, 4, 5, 6, 7, 8, 9, 10, 11),
@ -140,7 +141,7 @@ func TestStaticPolicyAdd(t *testing.T) {
topo: topoDualSocketHT,
numReservedCPUs: 1,
containerID: "fakeID3",
stAssignments: map[string]cpuset.CPUSet{
stAssignments: state.ContainerCPUAssignments{
"fakeID100": cpuset.NewCPUSet(1, 5),
},
stDefaultCPUSet: cpuset.NewCPUSet(0, 2, 3, 4, 6, 7, 8, 9, 10, 11),
@ -154,7 +155,7 @@ func TestStaticPolicyAdd(t *testing.T) {
topo: topoDualSocketNoHT,
numReservedCPUs: 1,
containerID: "fakeID1",
stAssignments: map[string]cpuset.CPUSet{
stAssignments: state.ContainerCPUAssignments{
"fakeID100": cpuset.NewCPUSet(),
},
stDefaultCPUSet: cpuset.NewCPUSet(0, 1, 3, 4, 5, 6, 7),
@ -168,7 +169,7 @@ func TestStaticPolicyAdd(t *testing.T) {
topo: topoDualSocketNoHT,
numReservedCPUs: 1,
containerID: "fakeID1",
stAssignments: map[string]cpuset.CPUSet{
stAssignments: state.ContainerCPUAssignments{
"fakeID100": cpuset.NewCPUSet(4, 5),
},
stDefaultCPUSet: cpuset.NewCPUSet(0, 1, 3, 6, 7),
@ -182,7 +183,7 @@ func TestStaticPolicyAdd(t *testing.T) {
topo: topoDualSocketHT,
numReservedCPUs: 1,
containerID: "fakeID3",
stAssignments: map[string]cpuset.CPUSet{
stAssignments: state.ContainerCPUAssignments{
"fakeID100": cpuset.NewCPUSet(2),
},
stDefaultCPUSet: cpuset.NewCPUSet(0, 1, 3, 4, 5, 6, 7, 8, 9, 10, 11),
@ -196,7 +197,7 @@ func TestStaticPolicyAdd(t *testing.T) {
topo: topoSingleSocketHT,
numReservedCPUs: 1,
containerID: "fakeID1",
stAssignments: map[string]cpuset.CPUSet{},
stAssignments: state.ContainerCPUAssignments{},
stDefaultCPUSet: cpuset.NewCPUSet(0, 1, 2, 3, 4, 5, 6, 7),
pod: makePod("1000m", "2000m"),
expErr: nil,
@ -208,7 +209,7 @@ func TestStaticPolicyAdd(t *testing.T) {
topo: topoSingleSocketHT,
numReservedCPUs: 1,
containerID: "fakeID4",
stAssignments: map[string]cpuset.CPUSet{},
stAssignments: state.ContainerCPUAssignments{},
stDefaultCPUSet: cpuset.NewCPUSet(0, 1, 2, 3, 4, 5, 6, 7),
pod: makePod("977m", "977m"),
expErr: nil,
@ -220,7 +221,7 @@ func TestStaticPolicyAdd(t *testing.T) {
topo: topoSingleSocketHT,
numReservedCPUs: 1,
containerID: "fakeID5",
stAssignments: map[string]cpuset.CPUSet{
stAssignments: state.ContainerCPUAssignments{
"fakeID100": cpuset.NewCPUSet(1, 2, 3, 4, 5, 6),
},
stDefaultCPUSet: cpuset.NewCPUSet(0, 7),
@ -234,7 +235,7 @@ func TestStaticPolicyAdd(t *testing.T) {
topo: topoDualSocketHT,
numReservedCPUs: 1,
containerID: "fakeID5",
stAssignments: map[string]cpuset.CPUSet{
stAssignments: state.ContainerCPUAssignments{
"fakeID100": cpuset.NewCPUSet(1, 2, 3),
},
stDefaultCPUSet: cpuset.NewCPUSet(0, 4, 5, 6, 7, 8, 9, 10, 11),
@ -250,7 +251,7 @@ func TestStaticPolicyAdd(t *testing.T) {
description: "GuPodMultipleCores, topoQuadSocketFourWayHT, ExpectAllocSock0",
topo: topoQuadSocketFourWayHT,
containerID: "fakeID5",
stAssignments: map[string]cpuset.CPUSet{
stAssignments: state.ContainerCPUAssignments{
"fakeID100": cpuset.NewCPUSet(3, 11, 4, 5, 6, 7),
},
stDefaultCPUSet: largeTopoCPUSet.Difference(cpuset.NewCPUSet(3, 11, 4, 5, 6, 7)),
@ -265,7 +266,7 @@ func TestStaticPolicyAdd(t *testing.T) {
description: "GuPodMultipleCores, topoQuadSocketFourWayHT, ExpectAllocAllFullCoresFromThreeSockets",
topo: topoQuadSocketFourWayHT,
containerID: "fakeID5",
stAssignments: map[string]cpuset.CPUSet{
stAssignments: state.ContainerCPUAssignments{
"fakeID100": largeTopoCPUSet.Difference(cpuset.NewCPUSet(1, 25, 13, 38, 2, 9, 11, 35, 23, 48, 12, 51,
53, 173, 113, 233, 54, 61)),
},
@ -281,7 +282,7 @@ func TestStaticPolicyAdd(t *testing.T) {
description: "GuPodMultipleCores, topoQuadSocketFourWayHT, ExpectAllocAllSock1+FullCore",
topo: topoQuadSocketFourWayHT,
containerID: "fakeID5",
stAssignments: map[string]cpuset.CPUSet{
stAssignments: state.ContainerCPUAssignments{
"fakeID100": largeTopoCPUSet.Difference(largeTopoSock1CPUSet.Union(cpuset.NewCPUSet(10, 34, 22, 47, 53,
173, 61, 181, 108, 228, 115, 235))),
},
@ -298,7 +299,7 @@ func TestStaticPolicyAdd(t *testing.T) {
description: "GuPodMultipleCores, topoQuadSocketFourWayHT, ExpectAllocCPUs",
topo: topoQuadSocketFourWayHT,
containerID: "fakeID5",
stAssignments: map[string]cpuset.CPUSet{
stAssignments: state.ContainerCPUAssignments{
"fakeID100": largeTopoCPUSet.Difference(cpuset.NewCPUSet(10, 11, 53, 37, 55, 67, 52)),
},
stDefaultCPUSet: cpuset.NewCPUSet(10, 11, 53, 67, 52),
@ -314,7 +315,7 @@ func TestStaticPolicyAdd(t *testing.T) {
description: "GuPodMultipleCores, topoQuadSocketFourWayHT, NoAlloc",
topo: topoQuadSocketFourWayHT,
containerID: "fakeID5",
stAssignments: map[string]cpuset.CPUSet{
stAssignments: state.ContainerCPUAssignments{
"fakeID100": largeTopoCPUSet.Difference(cpuset.NewCPUSet(10, 11, 53, 37, 55, 67, 52)),
},
stDefaultCPUSet: cpuset.NewCPUSet(10, 11, 53, 37, 55, 67, 52),
@ -374,7 +375,7 @@ func TestStaticPolicyRemove(t *testing.T) {
description: "SingleSocketHT, DeAllocOneContainer",
topo: topoSingleSocketHT,
containerID: "fakeID1",
stAssignments: map[string]cpuset.CPUSet{
stAssignments: state.ContainerCPUAssignments{
"fakeID1": cpuset.NewCPUSet(1, 2, 3),
},
stDefaultCPUSet: cpuset.NewCPUSet(4, 5, 6, 7),
@ -384,7 +385,7 @@ func TestStaticPolicyRemove(t *testing.T) {
description: "SingleSocketHT, DeAllocOneContainer, BeginEmpty",
topo: topoSingleSocketHT,
containerID: "fakeID1",
stAssignments: map[string]cpuset.CPUSet{
stAssignments: state.ContainerCPUAssignments{
"fakeID1": cpuset.NewCPUSet(1, 2, 3),
"fakeID2": cpuset.NewCPUSet(4, 5, 6, 7),
},
@ -395,7 +396,7 @@ func TestStaticPolicyRemove(t *testing.T) {
description: "SingleSocketHT, DeAllocTwoContainer",
topo: topoSingleSocketHT,
containerID: "fakeID1",
stAssignments: map[string]cpuset.CPUSet{
stAssignments: state.ContainerCPUAssignments{
"fakeID1": cpuset.NewCPUSet(1, 3, 5),
"fakeID2": cpuset.NewCPUSet(2, 4),
},
@ -406,7 +407,7 @@ func TestStaticPolicyRemove(t *testing.T) {
description: "SingleSocketHT, NoDeAlloc",
topo: topoSingleSocketHT,
containerID: "fakeID2",
stAssignments: map[string]cpuset.CPUSet{
stAssignments: state.ContainerCPUAssignments{
"fakeID1": cpuset.NewCPUSet(1, 3, 5),
},
stDefaultCPUSet: cpuset.NewCPUSet(2, 4, 6, 7),

View File

@ -1,9 +1,10 @@
load("@io_bazel_rules_go//go:def.bzl", "go_library")
load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")
go_library(
name = "go_default_library",
srcs = [
"state.go",
"state_file.go",
"state_mem.go",
],
importpath = "k8s.io/kubernetes/pkg/kubelet/cm/cpumanager/state",
@ -14,6 +15,14 @@ go_library(
],
)
go_test(
name = "go_default_test",
srcs = ["state_file_test.go"],
importpath = "k8s.io/kubernetes/pkg/kubelet/cm/cpumanager/state",
library = ":go_default_library",
deps = ["//pkg/kubelet/cm/cpuset:go_default_library"],
)
filegroup(
name = "package-srcs",
srcs = glob(["**"]),

View File

@ -20,17 +20,32 @@ import (
"k8s.io/kubernetes/pkg/kubelet/cm/cpuset"
)
// ContainerCPUAssignments type used in cpu manger state
type ContainerCPUAssignments map[string]cpuset.CPUSet
// Clone returns a copy of ContainerCPUAssignments
func (as ContainerCPUAssignments) Clone() ContainerCPUAssignments {
ret := make(ContainerCPUAssignments)
for key, val := range as {
ret[key] = val
}
return ret
}
// Reader interface used to read current cpu/pod assignment state
type Reader interface {
GetCPUSet(containerID string) (cpuset.CPUSet, bool)
GetDefaultCPUSet() cpuset.CPUSet
GetCPUSetOrDefault(containerID string) cpuset.CPUSet
GetCPUAssignments() ContainerCPUAssignments
}
type writer interface {
SetCPUSet(containerID string, cpuset cpuset.CPUSet)
SetDefaultCPUSet(cpuset cpuset.CPUSet)
SetCPUAssignments(ContainerCPUAssignments)
Delete(containerID string)
ClearState()
}
// State interface provides methods for tracking and setting cpu/pod assignment

View File

@ -0,0 +1,195 @@
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package state
import (
"encoding/json"
"github.com/golang/glog"
"io/ioutil"
"k8s.io/kubernetes/pkg/kubelet/cm/cpuset"
"os"
"sync"
)
type stateFileData struct {
DefaultCPUSet string `json:"defaultCpuSet"`
Entries map[string]string `json:"entries,omitempty"`
}
var _ State = &stateFile{}
type stateFile struct {
sync.RWMutex
stateFilePath string
cache State
}
// NewFileState creates new State for keeping track of cpu/pod assignment with file backend
func NewFileState(filePath string) State {
stateFile := &stateFile{
stateFilePath: filePath,
cache: NewMemoryState(),
}
if err := stateFile.tryRestoreState(); err != nil {
// could not restore state, init new state file
glog.Infof("[cpumanager] state file: initializing empty state file")
stateFile.cache.ClearState()
stateFile.storeState()
}
return stateFile
}
// tryRestoreState tries to read state file, upon any error,
// err message is logged and state is left clean. un-initialized
func (sf *stateFile) tryRestoreState() error {
sf.Lock()
defer sf.Unlock()
var err error
// used when all parsing is ok
tmpAssignments := make(ContainerCPUAssignments)
tmpDefaultCPUSet := cpuset.NewCPUSet()
tmpContainerCPUSet := cpuset.NewCPUSet()
var content []byte
if content, err = ioutil.ReadFile(sf.stateFilePath); os.IsNotExist(err) {
// Create file
if _, err = os.Create(sf.stateFilePath); err != nil {
glog.Errorf("[cpumanager] state file: unable to create state file \"%s\":%s", sf.stateFilePath, err.Error())
panic("[cpumanager] state file not created")
}
glog.Infof("[cpumanager] state file: created empty state file \"%s\"", sf.stateFilePath)
} else {
// File exists - try to read
var readState stateFileData
if err = json.Unmarshal(content, &readState); err != nil {
glog.Warningf("[cpumanager] state file: could not unmarshal, corrupted state file - \"%s\"", sf.stateFilePath)
return err
}
if tmpDefaultCPUSet, err = cpuset.Parse(readState.DefaultCPUSet); err != nil {
glog.Warningf("[cpumanager] state file: could not parse state file - [defaultCpuSet:\"%s\"]", readState.DefaultCPUSet)
return err
}
for containerID, cpuString := range readState.Entries {
if tmpContainerCPUSet, err = cpuset.Parse(cpuString); err != nil {
glog.Warningf("[cpumanager] state file: could not parse state file - container id: %s, cpuset: \"%s\"", containerID, cpuString)
return err
}
tmpAssignments[containerID] = tmpContainerCPUSet
}
sf.cache.SetDefaultCPUSet(tmpDefaultCPUSet)
sf.cache.SetCPUAssignments(tmpAssignments)
glog.V(2).Infof("[cpumanager] state file: restored state from state file \"%s\"", sf.stateFilePath)
glog.V(2).Infof("[cpumanager] state file: defaultCPUSet: %s", tmpDefaultCPUSet.String())
}
return nil
}
// saves state to a file, caller is responsible for locking
func (sf *stateFile) storeState() {
var content []byte
var err error
data := stateFileData{
DefaultCPUSet: sf.cache.GetDefaultCPUSet().String(),
Entries: map[string]string{},
}
for containerID, cset := range sf.cache.GetCPUAssignments() {
data.Entries[containerID] = cset.String()
}
if content, err = json.Marshal(data); err != nil {
panic("[cpumanager] state file: could not serialize state to json")
}
if err = ioutil.WriteFile(sf.stateFilePath, content, 0644); err != nil {
panic("[cpumanager] state file not written")
}
return
}
func (sf *stateFile) GetCPUSet(containerID string) (cpuset.CPUSet, bool) {
sf.RLock()
defer sf.RUnlock()
res, ok := sf.cache.GetCPUSet(containerID)
return res, ok
}
func (sf *stateFile) GetDefaultCPUSet() cpuset.CPUSet {
sf.RLock()
defer sf.RUnlock()
return sf.cache.GetDefaultCPUSet()
}
func (sf *stateFile) GetCPUSetOrDefault(containerID string) cpuset.CPUSet {
sf.RLock()
defer sf.RUnlock()
return sf.cache.GetCPUSetOrDefault(containerID)
}
func (sf *stateFile) GetCPUAssignments() ContainerCPUAssignments {
sf.RLock()
defer sf.RUnlock()
return sf.cache.GetCPUAssignments()
}
func (sf *stateFile) SetCPUSet(containerID string, cset cpuset.CPUSet) {
sf.Lock()
defer sf.Unlock()
sf.cache.SetCPUSet(containerID, cset)
sf.storeState()
}
func (sf *stateFile) SetDefaultCPUSet(cset cpuset.CPUSet) {
sf.Lock()
defer sf.Unlock()
sf.cache.SetDefaultCPUSet(cset)
sf.storeState()
}
func (sf *stateFile) SetCPUAssignments(a ContainerCPUAssignments) {
sf.Lock()
defer sf.Unlock()
sf.cache.SetCPUAssignments(a)
sf.storeState()
}
func (sf *stateFile) Delete(containerID string) {
sf.Lock()
defer sf.Unlock()
sf.cache.Delete(containerID)
sf.storeState()
}
func (sf *stateFile) ClearState() {
sf.Lock()
defer sf.Unlock()
sf.cache.ClearState()
sf.storeState()
}

View File

@ -0,0 +1,299 @@
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package state
import (
"io/ioutil"
"path"
"reflect"
"testing"
"fmt"
"k8s.io/kubernetes/pkg/kubelet/cm/cpuset"
"os"
)
func writeToStateFile(statefile string, content string) {
ioutil.WriteFile(statefile, []byte(content), 0644)
}
func TestFileStateTryRestore(t *testing.T) {
testCases := []struct {
description string
stateFileContent string
expErr string
expectedState stateMemory
}{
{
"Invalid JSON - empty file",
"\n",
"unexpected end of JSON input",
stateMemory{
assignments: ContainerCPUAssignments{},
defaultCPUSet: cpuset.NewCPUSet(),
},
},
{
"Invalid JSON - invalid content",
"{",
"unexpected end of JSON input",
stateMemory{
assignments: ContainerCPUAssignments{},
defaultCPUSet: cpuset.NewCPUSet(),
},
},
{
"Try restore defaultCPUSet only",
"{ \"defaultCpuSet\": \"4-6\"}",
"",
stateMemory{
assignments: ContainerCPUAssignments{},
defaultCPUSet: cpuset.NewCPUSet(4, 5, 6),
},
},
{
"Try restore defaultCPUSet only - invalid name",
"{ \"defCPUSet\": \"4-6\"}",
"",
stateMemory{
assignments: ContainerCPUAssignments{},
defaultCPUSet: cpuset.NewCPUSet(),
},
},
{
"Try restore assignments only",
"{" +
"\"reservedList\": { " +
"\"container1\": \"4-6\"," +
"\"container2\": \"1-3\"" +
"} }",
"",
stateMemory{
assignments: ContainerCPUAssignments{
"container1": cpuset.NewCPUSet(4, 5, 6),
"container2": cpuset.NewCPUSet(1, 2, 3),
},
defaultCPUSet: cpuset.NewCPUSet(),
},
},
{
"Try restore invalid assignments",
"{ \"reservedList\": }",
"invalid character '}' looking for beginning of value",
stateMemory{
assignments: ContainerCPUAssignments{},
defaultCPUSet: cpuset.NewCPUSet(),
},
},
{
"Try restore valid file",
"{ " +
"\"defaultCpuSet\": \"23-24\", " +
"\"reservedList\": { " +
"\"container1\": \"4-6\", " +
"\"container2\": \"1-3\"" +
" } }",
"",
stateMemory{
assignments: ContainerCPUAssignments{
"container1": cpuset.NewCPUSet(4, 5, 6),
"container2": cpuset.NewCPUSet(1, 2, 3),
},
defaultCPUSet: cpuset.NewCPUSet(23, 24),
},
},
{
"Try restore un-parsable defaultCPUSet ",
"{ \"defaultCpuSet\": \"2-sd\" }",
"strconv.Atoi: parsing \"sd\": invalid syntax",
stateMemory{
assignments: ContainerCPUAssignments{},
defaultCPUSet: cpuset.NewCPUSet(),
},
},
{
"Try restore un-parsable assignments",
"{ " +
"\"defaultCpuSet\": \"23-24\", " +
"\"reservedList\": { " +
"\"container1\": \"p-6\", " +
"\"container2\": \"1-3\"" +
" } }",
"strconv.Atoi: parsing \"p\": invalid syntax",
stateMemory{
assignments: ContainerCPUAssignments{},
defaultCPUSet: cpuset.NewCPUSet(),
},
},
{
"TryRestoreState creates empty state file",
"",
"",
stateMemory{
assignments: ContainerCPUAssignments{},
defaultCPUSet: cpuset.NewCPUSet(),
},
},
}
for idx, tc := range testCases {
t.Run(tc.description, func(t *testing.T) {
sfilePath := path.Join("/tmp", fmt.Sprintf("cpumanager_state_file_test_%d", idx))
// Don't create state file, let TryRestoreState figure out that is should create
if tc.stateFileContent != "" {
writeToStateFile(sfilePath, tc.stateFileContent)
}
// Always remove file - regardless of who created
defer os.Remove(sfilePath)
fileState := NewFileState(sfilePath)
err := fileState.TryRestoreState()
if tc.expErr != "" {
if err != nil {
if err.Error() != tc.expErr {
t.Errorf("TryRestoreState() error = %v, wantErr %v", err, tc.expErr)
return
}
} else {
t.Errorf("TryRestoreState() error = nil, wantErr %v", tc.expErr)
return
}
} else {
if err != nil {
t.Errorf("TryRestoreState() error = %v, wantErr nil", err)
return
}
}
if !reflect.DeepEqual(fileState.State, &tc.expectedState) {
t.Errorf("TryRestoreState() = %v, want %v", fileState.State, tc.expectedState)
}
})
}
}
func TestFileStateTryRestorePanic(t *testing.T) {
testCase := struct {
description string
wantPanic bool
panicMessage string
}{
"Panic creating file",
true,
"[cpumanager] state file not created",
}
t.Run(testCase.description, func(t *testing.T) {
sfilePath := path.Join("/invalid_path/to_some_dir", "cpumanager_state_file_test")
fileState := NewFileState(sfilePath)
defer func() {
if err := recover(); err != nil {
if testCase.wantPanic {
if testCase.panicMessage == err {
t.Logf("TryRestoreState() got expected panic = %v", err)
return
}
t.Errorf("TryRestoreState() unexpected panic = %v, wantErr %v", err, testCase.panicMessage)
}
}
}()
fileState.TryRestoreState()
})
}
func TestUpdateStateFile(t *testing.T) {
testCases := []struct {
description string
expErr string
expectedState stateMemory
}{
{
"Save empty state",
"",
stateMemory{
assignments: ContainerCPUAssignments{},
defaultCPUSet: cpuset.NewCPUSet(),
},
},
{
"Save defaultCPUSet only",
"",
stateMemory{
assignments: ContainerCPUAssignments{},
defaultCPUSet: cpuset.NewCPUSet(1, 6),
},
},
{
"Save assignments only",
"",
stateMemory{
assignments: ContainerCPUAssignments{
"container1": cpuset.NewCPUSet(4, 5, 6),
"container2": cpuset.NewCPUSet(1, 2, 3),
},
defaultCPUSet: cpuset.NewCPUSet(),
},
},
}
for idx, tc := range testCases {
t.Run(tc.description, func(t *testing.T) {
sfilePath := path.Join("/tmp", fmt.Sprintf("cpumanager_state_file_test_%d", idx))
fileState := NewFileState(sfilePath)
fileState.SetDefaultCPUSet(tc.expectedState.defaultCPUSet)
fileState.SetCPUAssignments(tc.expectedState.assignments)
err := fileState.UpdateStateFile()
defer os.Remove(sfilePath)
if tc.expErr != "" {
if err != nil {
if err.Error() != tc.expErr {
t.Errorf("UpdateStateFile() error = %v, wantErr %v", err, tc.expErr)
return
}
} else {
t.Errorf("UpdateStateFile() error = nil, wantErr %v", tc.expErr)
return
}
} else {
if err != nil {
t.Errorf("UpdateStateFile() error = %v, wantErr nil", err)
return
}
}
fileState.ClearState()
err = fileState.TryRestoreState()
if !reflect.DeepEqual(fileState.State, &tc.expectedState) {
t.Errorf("TryRestoreState() = %v, want %v", fileState.State, tc.expectedState)
}
})
}
}

View File

@ -25,7 +25,7 @@ import (
type stateMemory struct {
sync.RWMutex
assignments map[string]cpuset.CPUSet
assignments ContainerCPUAssignments
defaultCPUSet cpuset.CPUSet
}
@ -35,7 +35,7 @@ var _ State = &stateMemory{}
func NewMemoryState() State {
glog.Infof("[cpumanager] initializing new in-memory state store")
return &stateMemory{
assignments: map[string]cpuset.CPUSet{},
assignments: ContainerCPUAssignments{},
defaultCPUSet: cpuset.NewCPUSet(),
}
}
@ -65,6 +65,12 @@ func (s *stateMemory) GetCPUSetOrDefault(containerID string) cpuset.CPUSet {
return s.GetDefaultCPUSet()
}
func (s *stateMemory) GetCPUAssignments() ContainerCPUAssignments {
s.RLock()
defer s.RUnlock()
return s.assignments.Clone()
}
func (s *stateMemory) SetCPUSet(containerID string, cset cpuset.CPUSet) {
s.Lock()
defer s.Unlock()
@ -81,6 +87,14 @@ func (s *stateMemory) SetDefaultCPUSet(cset cpuset.CPUSet) {
glog.Infof("[cpumanager] updated default cpuset: \"%s\"", cset)
}
func (s *stateMemory) SetCPUAssignments(a ContainerCPUAssignments) {
s.Lock()
defer s.Unlock()
s.assignments = a.Clone()
glog.Infof("[cpumanager] updated cpuset assignments: \"%v\"", a)
}
func (s *stateMemory) Delete(containerID string) {
s.Lock()
defer s.Unlock()
@ -88,3 +102,12 @@ func (s *stateMemory) Delete(containerID string) {
delete(s.assignments, containerID)
glog.V(2).Infof("[cpumanager] deleted cpuset assignment (container id: %s)", containerID)
}
func (s *stateMemory) ClearState() {
s.Lock()
defer s.Unlock()
s.defaultCPUSet = cpuset.CPUSet{}
s.assignments = make(ContainerCPUAssignments)
glog.V(2).Infof("[cpumanager] cleared state")
}