Change CPUManager state to key off of podUID and containerName

Previously, the state was keyed off of containerID intead of podUID and
containerName. Unfortunately, this is no longer possible as we move to a
to model where we we allocate CPUs to containers at pod adit time rather
than container start time.

This patch is the first step towards full migration to the new
semantics. Only the unit tests in cpumanager/state are passing. In
subsequent commits we will update the CPUManager itself to use these new
semantics.

This patch also includes code to do migration from the old checkpoint format
to the new one, assuming the existence of a ContainerMap with the proper
mapping of (containerID)->(podUID, containerName). A subsequent commit
will update code in higher layers to make sure that this ContainerMap is
made available to this state logic.
This commit is contained in:
Kevin Klues 2019-10-25 16:25:29 +02:00
parent 9191a949ae
commit 7c760fea38
9 changed files with 475 additions and 191 deletions

View File

@ -15,6 +15,7 @@ go_library(
"//pkg/kubelet/checkpointmanager:go_default_library", "//pkg/kubelet/checkpointmanager:go_default_library",
"//pkg/kubelet/checkpointmanager/checksum:go_default_library", "//pkg/kubelet/checkpointmanager/checksum:go_default_library",
"//pkg/kubelet/checkpointmanager/errors:go_default_library", "//pkg/kubelet/checkpointmanager/errors:go_default_library",
"//pkg/kubelet/cm/cpumanager/containermap:go_default_library",
"//pkg/kubelet/cm/cpuset:go_default_library", "//pkg/kubelet/cm/cpuset:go_default_library",
"//vendor/k8s.io/klog:go_default_library", "//vendor/k8s.io/klog:go_default_library",
], ],
@ -30,6 +31,7 @@ go_test(
embed = [":go_default_library"], embed = [":go_default_library"],
deps = [ deps = [
"//pkg/kubelet/checkpointmanager:go_default_library", "//pkg/kubelet/checkpointmanager:go_default_library",
"//pkg/kubelet/cm/cpumanager/containermap:go_default_library",
"//pkg/kubelet/cm/cpumanager/state/testing:go_default_library", "//pkg/kubelet/cm/cpumanager/state/testing:go_default_library",
"//pkg/kubelet/cm/cpuset:go_default_library", "//pkg/kubelet/cm/cpuset:go_default_library",
"//vendor/github.com/stretchr/testify/require:go_default_library", "//vendor/github.com/stretchr/testify/require:go_default_library",

View File

@ -23,38 +23,87 @@ import (
"k8s.io/kubernetes/pkg/kubelet/checkpointmanager/checksum" "k8s.io/kubernetes/pkg/kubelet/checkpointmanager/checksum"
) )
var _ checkpointmanager.Checkpoint = &CPUManagerCheckpoint{}
// CPUManagerCheckpoint struct is used to store cpu/pod assignments in a checkpoint // CPUManagerCheckpoint struct is used to store cpu/pod assignments in a checkpoint
type CPUManagerCheckpoint struct { type CPUManagerCheckpoint = CPUManagerCheckpointV2
var _ checkpointmanager.Checkpoint = &CPUManagerCheckpointV1{}
var _ checkpointmanager.Checkpoint = &CPUManagerCheckpointV2{}
// CPUManagerCheckpointV1 struct is used to store cpu/pod assignments in a checkpoint in v1 format
type CPUManagerCheckpointV1 struct {
PolicyName string `json:"policyName"` PolicyName string `json:"policyName"`
DefaultCPUSet string `json:"defaultCpuSet"` DefaultCPUSet string `json:"defaultCpuSet"`
Entries map[string]string `json:"entries,omitempty"` Entries map[string]string `json:"entries,omitempty"`
Checksum checksum.Checksum `json:"checksum"` Checksum checksum.Checksum `json:"checksum"`
} }
// CPUManagerCheckpointV2 struct is used to store cpu/pod assignments in a checkpoint in v2 format
type CPUManagerCheckpointV2 struct {
PolicyName string `json:"policyName"`
DefaultCPUSet string `json:"defaultCpuSet"`
Entries map[string]map[string]string `json:"entries,omitempty"`
Checksum checksum.Checksum `json:"checksum"`
}
// NewCPUManagerCheckpoint returns an instance of Checkpoint // NewCPUManagerCheckpoint returns an instance of Checkpoint
func NewCPUManagerCheckpoint() *CPUManagerCheckpoint { func NewCPUManagerCheckpoint() *CPUManagerCheckpoint {
return &CPUManagerCheckpoint{ //lint:ignore unexported-type-in-api user-facing error message
return newCPUManagerCheckpointV2()
}
func newCPUManagerCheckpointV1() *CPUManagerCheckpointV1 {
return &CPUManagerCheckpointV1{
Entries: make(map[string]string), Entries: make(map[string]string),
} }
} }
// MarshalCheckpoint returns marshalled checkpoint func newCPUManagerCheckpointV2() *CPUManagerCheckpointV2 {
func (cp *CPUManagerCheckpoint) MarshalCheckpoint() ([]byte, error) { return &CPUManagerCheckpointV2{
Entries: make(map[string]map[string]string),
}
}
// MarshalCheckpoint returns marshalled checkpoint in v1 format
func (cp *CPUManagerCheckpointV1) MarshalCheckpoint() ([]byte, error) {
// make sure checksum wasn't set before so it doesn't affect output checksum // make sure checksum wasn't set before so it doesn't affect output checksum
cp.Checksum = 0 cp.Checksum = 0
cp.Checksum = checksum.New(cp) cp.Checksum = checksum.New(cp)
return json.Marshal(*cp) return json.Marshal(*cp)
} }
// UnmarshalCheckpoint tries to unmarshal passed bytes to checkpoint // MarshalCheckpoint returns marshalled checkpoint in v2 format
func (cp *CPUManagerCheckpoint) UnmarshalCheckpoint(blob []byte) error { func (cp *CPUManagerCheckpointV2) MarshalCheckpoint() ([]byte, error) {
// make sure checksum wasn't set before so it doesn't affect output checksum
cp.Checksum = 0
cp.Checksum = checksum.New(cp)
return json.Marshal(*cp)
}
// UnmarshalCheckpoint tries to unmarshal passed bytes to checkpoint in v1 format
func (cp *CPUManagerCheckpointV1) UnmarshalCheckpoint(blob []byte) error {
return json.Unmarshal(blob, cp) return json.Unmarshal(blob, cp)
} }
// VerifyChecksum verifies that current checksum of checkpoint is valid // UnmarshalCheckpoint tries to unmarshal passed bytes to checkpoint in v2 format
func (cp *CPUManagerCheckpoint) VerifyChecksum() error { func (cp *CPUManagerCheckpointV2) UnmarshalCheckpoint(blob []byte) error {
return json.Unmarshal(blob, cp)
}
// VerifyChecksum verifies that current checksum of checkpoint is valid in v1 format
func (cp *CPUManagerCheckpointV1) VerifyChecksum() error {
if cp.Checksum == 0 {
// accept empty checksum for compatibility with old file backend
return nil
}
ck := cp.Checksum
cp.Checksum = 0
err := ck.Verify(cp)
cp.Checksum = ck
return err
}
// VerifyChecksum verifies that current checksum of checkpoint is valid in v2 format
func (cp *CPUManagerCheckpointV2) VerifyChecksum() error {
if cp.Checksum == 0 { if cp.Checksum == 0 {
// accept empty checksum for compatibility with old file backend // accept empty checksum for compatibility with old file backend
return nil return nil

View File

@ -21,30 +21,33 @@ import (
) )
// ContainerCPUAssignments type used in cpu manager state // ContainerCPUAssignments type used in cpu manager state
type ContainerCPUAssignments map[string]cpuset.CPUSet type ContainerCPUAssignments map[string]map[string]cpuset.CPUSet
// Clone returns a copy of ContainerCPUAssignments // Clone returns a copy of ContainerCPUAssignments
func (as ContainerCPUAssignments) Clone() ContainerCPUAssignments { func (as ContainerCPUAssignments) Clone() ContainerCPUAssignments {
ret := make(ContainerCPUAssignments) ret := make(ContainerCPUAssignments)
for key, val := range as { for pod := range as {
ret[key] = val ret[pod] = make(map[string]cpuset.CPUSet)
for container, cset := range as[pod] {
ret[pod][container] = cset
}
} }
return ret return ret
} }
// Reader interface used to read current cpu/pod assignment state // Reader interface used to read current cpu/pod assignment state
type Reader interface { type Reader interface {
GetCPUSet(containerID string) (cpuset.CPUSet, bool) GetCPUSet(podUID string, containerName string) (cpuset.CPUSet, bool)
GetDefaultCPUSet() cpuset.CPUSet GetDefaultCPUSet() cpuset.CPUSet
GetCPUSetOrDefault(containerID string) cpuset.CPUSet GetCPUSetOrDefault(podUID string, containerName string) cpuset.CPUSet
GetCPUAssignments() ContainerCPUAssignments GetCPUAssignments() ContainerCPUAssignments
} }
type writer interface { type writer interface {
SetCPUSet(containerID string, cpuset cpuset.CPUSet) SetCPUSet(podUID string, containerName string, cpuset cpuset.CPUSet)
SetDefaultCPUSet(cpuset cpuset.CPUSet) SetDefaultCPUSet(cpuset cpuset.CPUSet)
SetCPUAssignments(ContainerCPUAssignments) SetCPUAssignments(ContainerCPUAssignments)
Delete(containerID string) Delete(podUID string, containerName string)
ClearState() ClearState()
} }

View File

@ -24,6 +24,7 @@ import (
"k8s.io/klog" "k8s.io/klog"
"k8s.io/kubernetes/pkg/kubelet/checkpointmanager" "k8s.io/kubernetes/pkg/kubelet/checkpointmanager"
"k8s.io/kubernetes/pkg/kubelet/checkpointmanager/errors" "k8s.io/kubernetes/pkg/kubelet/checkpointmanager/errors"
"k8s.io/kubernetes/pkg/kubelet/cm/cpumanager/containermap"
"k8s.io/kubernetes/pkg/kubelet/cm/cpuset" "k8s.io/kubernetes/pkg/kubelet/cm/cpuset"
) )
@ -35,10 +36,11 @@ type stateCheckpoint struct {
cache State cache State
checkpointManager checkpointmanager.CheckpointManager checkpointManager checkpointmanager.CheckpointManager
checkpointName string checkpointName string
initialContainers containermap.ContainerMap
} }
// NewCheckpointState creates new State for keeping track of cpu/pod assignment with checkpoint backend // NewCheckpointState creates new State for keeping track of cpu/pod assignment with checkpoint backend
func NewCheckpointState(stateDir, checkpointName, policyName string) (State, error) { func NewCheckpointState(stateDir, checkpointName, policyName string, initialContainers containermap.ContainerMap) (State, error) {
checkpointManager, err := checkpointmanager.NewCheckpointManager(stateDir) checkpointManager, err := checkpointmanager.NewCheckpointManager(stateDir)
if err != nil { if err != nil {
return nil, fmt.Errorf("failed to initialize checkpoint manager: %v", err) return nil, fmt.Errorf("failed to initialize checkpoint manager: %v", err)
@ -48,6 +50,7 @@ func NewCheckpointState(stateDir, checkpointName, policyName string) (State, err
policyName: policyName, policyName: policyName,
checkpointManager: checkpointManager, checkpointManager: checkpointManager,
checkpointName: checkpointName, checkpointName: checkpointName,
initialContainers: initialContainers,
} }
if err := stateCheckpoint.restoreState(); err != nil { if err := stateCheckpoint.restoreState(); err != nil {
@ -60,6 +63,30 @@ func NewCheckpointState(stateDir, checkpointName, policyName string) (State, err
return stateCheckpoint, nil return stateCheckpoint, nil
} }
// migrateV1CheckpointToV2Checkpoint() converts checkpoints from the v1 format to the v2 format
func (sc *stateCheckpoint) migrateV1CheckpointToV2Checkpoint(src *CPUManagerCheckpointV1, dst *CPUManagerCheckpointV2) error {
if src.PolicyName != "" {
dst.PolicyName = src.PolicyName
}
if src.DefaultCPUSet != "" {
dst.DefaultCPUSet = src.DefaultCPUSet
}
for containerID, cset := range src.Entries {
podUID, containerName, err := sc.initialContainers.GetContainerRef(containerID)
if err != nil {
return fmt.Errorf("containerID '%v' not found in initial containers list", containerID)
}
if dst.Entries == nil {
dst.Entries = make(map[string]map[string]string)
}
if _, exists := dst.Entries[podUID]; !exists {
dst.Entries[podUID] = make(map[string]string)
}
dst.Entries[podUID][containerName] = cset
}
return nil
}
// restores state from a checkpoint and creates it if it doesn't exist // restores state from a checkpoint and creates it if it doesn't exist
func (sc *stateCheckpoint) restoreState() error { func (sc *stateCheckpoint) restoreState() error {
sc.mux.Lock() sc.mux.Lock()
@ -71,28 +98,40 @@ func (sc *stateCheckpoint) restoreState() error {
tmpDefaultCPUSet := cpuset.NewCPUSet() tmpDefaultCPUSet := cpuset.NewCPUSet()
tmpContainerCPUSet := cpuset.NewCPUSet() tmpContainerCPUSet := cpuset.NewCPUSet()
checkpoint := NewCPUManagerCheckpoint() checkpointV1 := newCPUManagerCheckpointV1()
if err = sc.checkpointManager.GetCheckpoint(sc.checkpointName, checkpoint); err != nil { checkpointV2 := newCPUManagerCheckpointV2()
if err = sc.checkpointManager.GetCheckpoint(sc.checkpointName, checkpointV1); err != nil {
checkpointV1 = &CPUManagerCheckpointV1{} // reset it back to 0
if err = sc.checkpointManager.GetCheckpoint(sc.checkpointName, checkpointV2); err != nil {
if err == errors.ErrCheckpointNotFound { if err == errors.ErrCheckpointNotFound {
sc.storeState() sc.storeState()
return nil return nil
} }
return err return err
} }
if sc.policyName != checkpoint.PolicyName {
return fmt.Errorf("configured policy %q differs from state checkpoint policy %q", sc.policyName, checkpoint.PolicyName)
} }
if tmpDefaultCPUSet, err = cpuset.Parse(checkpoint.DefaultCPUSet); err != nil { if err = sc.migrateV1CheckpointToV2Checkpoint(checkpointV1, checkpointV2); err != nil {
return fmt.Errorf("could not parse default cpu set %q: %v", checkpoint.DefaultCPUSet, err) return fmt.Errorf("error migrating v1 checkpoint state to v2 checkpoint state: %s", err)
} }
for containerID, cpuString := range checkpoint.Entries { if sc.policyName != checkpointV2.PolicyName {
return fmt.Errorf("configured policy %q differs from state checkpoint policy %q", sc.policyName, checkpointV2.PolicyName)
}
if tmpDefaultCPUSet, err = cpuset.Parse(checkpointV2.DefaultCPUSet); err != nil {
return fmt.Errorf("could not parse default cpu set %q: %v", checkpointV2.DefaultCPUSet, err)
}
for pod := range checkpointV2.Entries {
tmpAssignments[pod] = make(map[string]cpuset.CPUSet)
for container, cpuString := range checkpointV2.Entries[pod] {
if tmpContainerCPUSet, err = cpuset.Parse(cpuString); err != nil { if tmpContainerCPUSet, err = cpuset.Parse(cpuString); err != nil {
return fmt.Errorf("could not parse cpuset %q for container id %q: %v", cpuString, containerID, err) return fmt.Errorf("could not parse cpuset %q for container %q in pod %q: %v", cpuString, container, pod, err)
}
tmpAssignments[pod][container] = tmpContainerCPUSet
} }
tmpAssignments[containerID] = tmpContainerCPUSet
} }
sc.cache.SetDefaultCPUSet(tmpDefaultCPUSet) sc.cache.SetDefaultCPUSet(tmpDefaultCPUSet)
@ -110,8 +149,12 @@ func (sc *stateCheckpoint) storeState() {
checkpoint.PolicyName = sc.policyName checkpoint.PolicyName = sc.policyName
checkpoint.DefaultCPUSet = sc.cache.GetDefaultCPUSet().String() checkpoint.DefaultCPUSet = sc.cache.GetDefaultCPUSet().String()
for containerID, cset := range sc.cache.GetCPUAssignments() { assignments := sc.cache.GetCPUAssignments()
checkpoint.Entries[containerID] = cset.String() for pod := range assignments {
checkpoint.Entries[pod] = make(map[string]string)
for container, cset := range assignments[pod] {
checkpoint.Entries[pod][container] = cset.String()
}
} }
err := sc.checkpointManager.CreateCheckpoint(sc.checkpointName, checkpoint) err := sc.checkpointManager.CreateCheckpoint(sc.checkpointName, checkpoint)
@ -122,11 +165,11 @@ func (sc *stateCheckpoint) storeState() {
} }
// GetCPUSet returns current CPU set // GetCPUSet returns current CPU set
func (sc *stateCheckpoint) GetCPUSet(containerID string) (cpuset.CPUSet, bool) { func (sc *stateCheckpoint) GetCPUSet(podUID string, containerName string) (cpuset.CPUSet, bool) {
sc.mux.RLock() sc.mux.RLock()
defer sc.mux.RUnlock() defer sc.mux.RUnlock()
res, ok := sc.cache.GetCPUSet(containerID) res, ok := sc.cache.GetCPUSet(podUID, containerName)
return res, ok return res, ok
} }
@ -139,11 +182,11 @@ func (sc *stateCheckpoint) GetDefaultCPUSet() cpuset.CPUSet {
} }
// GetCPUSetOrDefault returns current CPU set, or default one if it wasn't changed // GetCPUSetOrDefault returns current CPU set, or default one if it wasn't changed
func (sc *stateCheckpoint) GetCPUSetOrDefault(containerID string) cpuset.CPUSet { func (sc *stateCheckpoint) GetCPUSetOrDefault(podUID string, containerName string) cpuset.CPUSet {
sc.mux.RLock() sc.mux.RLock()
defer sc.mux.RUnlock() defer sc.mux.RUnlock()
return sc.cache.GetCPUSetOrDefault(containerID) return sc.cache.GetCPUSetOrDefault(podUID, containerName)
} }
// GetCPUAssignments returns current CPU to pod assignments // GetCPUAssignments returns current CPU to pod assignments
@ -155,10 +198,10 @@ func (sc *stateCheckpoint) GetCPUAssignments() ContainerCPUAssignments {
} }
// SetCPUSet sets CPU set // SetCPUSet sets CPU set
func (sc *stateCheckpoint) SetCPUSet(containerID string, cset cpuset.CPUSet) { func (sc *stateCheckpoint) SetCPUSet(podUID string, containerName string, cset cpuset.CPUSet) {
sc.mux.Lock() sc.mux.Lock()
defer sc.mux.Unlock() defer sc.mux.Unlock()
sc.cache.SetCPUSet(containerID, cset) sc.cache.SetCPUSet(podUID, containerName, cset)
sc.storeState() sc.storeState()
} }
@ -179,10 +222,10 @@ func (sc *stateCheckpoint) SetCPUAssignments(a ContainerCPUAssignments) {
} }
// Delete deletes assignment for specified pod // Delete deletes assignment for specified pod
func (sc *stateCheckpoint) Delete(containerID string) { func (sc *stateCheckpoint) Delete(podUID string, containerName string) {
sc.mux.Lock() sc.mux.Lock()
defer sc.mux.Unlock() defer sc.mux.Unlock()
sc.cache.Delete(containerID) sc.cache.Delete(podUID, containerName)
sc.storeState() sc.storeState()
} }

View File

@ -22,6 +22,7 @@ import (
"testing" "testing"
"k8s.io/kubernetes/pkg/kubelet/checkpointmanager" "k8s.io/kubernetes/pkg/kubelet/checkpointmanager"
"k8s.io/kubernetes/pkg/kubelet/cm/cpumanager/containermap"
testutil "k8s.io/kubernetes/pkg/kubelet/cm/cpumanager/state/testing" testutil "k8s.io/kubernetes/pkg/kubelet/cm/cpumanager/state/testing"
"k8s.io/kubernetes/pkg/kubelet/cm/cpuset" "k8s.io/kubernetes/pkg/kubelet/cm/cpuset"
) )
@ -35,6 +36,7 @@ func TestCheckpointStateRestore(t *testing.T) {
description string description string
checkpointContent string checkpointContent string
policyName string policyName string
initialContainers containermap.ContainerMap
expectedError string expectedError string
expectedState *stateMemory expectedState *stateMemory
}{ }{
@ -42,6 +44,7 @@ func TestCheckpointStateRestore(t *testing.T) {
"Restore non-existing checkpoint", "Restore non-existing checkpoint",
"", "",
"none", "none",
containermap.ContainerMap{},
"", "",
&stateMemory{}, &stateMemory{},
}, },
@ -51,9 +54,10 @@ func TestCheckpointStateRestore(t *testing.T) {
"policyName": "none", "policyName": "none",
"defaultCPUSet": "4-6", "defaultCPUSet": "4-6",
"entries": {}, "entries": {},
"checksum": 2912033808 "checksum": 2655485041
}`, }`,
"none", "none",
containermap.ContainerMap{},
"", "",
&stateMemory{ &stateMemory{
defaultCPUSet: cpuset.NewCPUSet(4, 5, 6), defaultCPUSet: cpuset.NewCPUSet(4, 5, 6),
@ -65,18 +69,23 @@ func TestCheckpointStateRestore(t *testing.T) {
"policyName": "none", "policyName": "none",
"defaultCPUSet": "1-3", "defaultCPUSet": "1-3",
"entries": { "entries": {
"pod": {
"container1": "4-6", "container1": "4-6",
"container2": "1-3" "container2": "1-3"
}
}, },
"checksum": 1535905563 "checksum": 3415933391
}`, }`,
"none", "none",
containermap.ContainerMap{},
"", "",
&stateMemory{ &stateMemory{
assignments: ContainerCPUAssignments{ assignments: ContainerCPUAssignments{
"pod": map[string]cpuset.CPUSet{
"container1": cpuset.NewCPUSet(4, 5, 6), "container1": cpuset.NewCPUSet(4, 5, 6),
"container2": cpuset.NewCPUSet(1, 2, 3), "container2": cpuset.NewCPUSet(1, 2, 3),
}, },
},
defaultCPUSet: cpuset.NewCPUSet(1, 2, 3), defaultCPUSet: cpuset.NewCPUSet(1, 2, 3),
}, },
}, },
@ -89,6 +98,7 @@ func TestCheckpointStateRestore(t *testing.T) {
"checksum": 1337 "checksum": 1337
}`, }`,
"none", "none",
containermap.ContainerMap{},
"checkpoint is corrupted", "checkpoint is corrupted",
&stateMemory{}, &stateMemory{},
}, },
@ -96,6 +106,7 @@ func TestCheckpointStateRestore(t *testing.T) {
"Restore checkpoint with invalid JSON", "Restore checkpoint with invalid JSON",
`{`, `{`,
"none", "none",
containermap.ContainerMap{},
"unexpected end of JSON input", "unexpected end of JSON input",
&stateMemory{}, &stateMemory{},
}, },
@ -105,9 +116,10 @@ func TestCheckpointStateRestore(t *testing.T) {
"policyName": "other", "policyName": "other",
"defaultCPUSet": "1-3", "defaultCPUSet": "1-3",
"entries": {}, "entries": {},
"checksum": 4195836012 "checksum": 698611581
}`, }`,
"none", "none",
containermap.ContainerMap{},
`configured policy "none" differs from state checkpoint policy "other"`, `configured policy "none" differs from state checkpoint policy "other"`,
&stateMemory{}, &stateMemory{},
}, },
@ -117,9 +129,10 @@ func TestCheckpointStateRestore(t *testing.T) {
"policyName": "none", "policyName": "none",
"defaultCPUSet": "1.3", "defaultCPUSet": "1.3",
"entries": {}, "entries": {},
"checksum": 1025273327 "checksum": 1966990140
}`, }`,
"none", "none",
containermap.ContainerMap{},
`could not parse default cpu set "1.3": strconv.Atoi: parsing "1.3": invalid syntax`, `could not parse default cpu set "1.3": strconv.Atoi: parsing "1.3": invalid syntax`,
&stateMemory{}, &stateMemory{},
}, },
@ -129,15 +142,47 @@ func TestCheckpointStateRestore(t *testing.T) {
"policyName": "none", "policyName": "none",
"defaultCPUSet": "1-3", "defaultCPUSet": "1-3",
"entries": { "entries": {
"pod": {
"container1": "4-6", "container1": "4-6",
"container2": "asd" "container2": "asd"
}
}, },
"checksum": 2764213924 "checksum": 3082925826
}`, }`,
"none", "none",
`could not parse cpuset "asd" for container id "container2": strconv.Atoi: parsing "asd": invalid syntax`, containermap.ContainerMap{},
`could not parse cpuset "asd" for container "container2" in pod "pod": strconv.Atoi: parsing "asd": invalid syntax`,
&stateMemory{}, &stateMemory{},
}, },
{
"Restore checkpoint with migration",
`{
"policyName": "none",
"defaultCPUSet": "1-3",
"entries": {
"containerID1": "4-6",
"containerID2": "1-3"
},
"checksum": 2832947348
}`,
"none",
func() containermap.ContainerMap {
cm := containermap.NewContainerMap()
cm.Add("pod", "container1", "containerID1")
cm.Add("pod", "container2", "containerID2")
return cm
}(),
"",
&stateMemory{
assignments: ContainerCPUAssignments{
"pod": map[string]cpuset.CPUSet{
"container1": cpuset.NewCPUSet(4, 5, 6),
"container2": cpuset.NewCPUSet(1, 2, 3),
},
},
defaultCPUSet: cpuset.NewCPUSet(1, 2, 3),
},
},
} }
// create checkpoint manager for testing // create checkpoint manager for testing
@ -159,7 +204,7 @@ func TestCheckpointStateRestore(t *testing.T) {
} }
} }
restoredState, err := NewCheckpointState(testingDir, testingCheckpoint, tc.policyName) restoredState, err := NewCheckpointState(testingDir, testingCheckpoint, tc.policyName, tc.initialContainers)
if err != nil { if err != nil {
if strings.TrimSpace(tc.expectedError) != "" { if strings.TrimSpace(tc.expectedError) != "" {
tc.expectedError = "could not restore state from checkpoint: " + tc.expectedError tc.expectedError = "could not restore state from checkpoint: " + tc.expectedError
@ -189,11 +234,13 @@ func TestCheckpointStateStore(t *testing.T) {
{ {
"Store assignments", "Store assignments",
&stateMemory{ &stateMemory{
assignments: map[string]cpuset.CPUSet{ assignments: map[string]map[string]cpuset.CPUSet{
"pod": {
"container1": cpuset.NewCPUSet(1, 5, 8), "container1": cpuset.NewCPUSet(1, 5, 8),
}, },
}, },
}, },
},
} }
cpm, err := checkpointmanager.NewCheckpointManager(testingDir) cpm, err := checkpointmanager.NewCheckpointManager(testingDir)
@ -206,7 +253,7 @@ func TestCheckpointStateStore(t *testing.T) {
// ensure there is no previous checkpoint // ensure there is no previous checkpoint
cpm.RemoveCheckpoint(testingCheckpoint) cpm.RemoveCheckpoint(testingCheckpoint)
cs1, err := NewCheckpointState(testingDir, testingCheckpoint, "none") cs1, err := NewCheckpointState(testingDir, testingCheckpoint, "none", nil)
if err != nil { if err != nil {
t.Fatalf("could not create testing checkpointState instance: %v", err) t.Fatalf("could not create testing checkpointState instance: %v", err)
} }
@ -216,7 +263,7 @@ func TestCheckpointStateStore(t *testing.T) {
cs1.SetCPUAssignments(tc.expectedState.assignments) cs1.SetCPUAssignments(tc.expectedState.assignments)
// restore checkpoint with previously stored values // restore checkpoint with previously stored values
cs2, err := NewCheckpointState(testingDir, testingCheckpoint, "none") cs2, err := NewCheckpointState(testingDir, testingCheckpoint, "none", nil)
if err != nil { if err != nil {
t.Fatalf("could not create testing checkpointState instance: %v", err) t.Fatalf("could not create testing checkpointState instance: %v", err)
} }
@ -230,30 +277,36 @@ func TestCheckpointStateHelpers(t *testing.T) {
testCases := []struct { testCases := []struct {
description string description string
defaultCPUset cpuset.CPUSet defaultCPUset cpuset.CPUSet
containers map[string]cpuset.CPUSet assignments map[string]map[string]cpuset.CPUSet
}{ }{
{ {
description: "One container", description: "One container",
defaultCPUset: cpuset.NewCPUSet(0, 1, 2, 3, 4, 5, 6, 7, 8), defaultCPUset: cpuset.NewCPUSet(0, 1, 2, 3, 4, 5, 6, 7, 8),
containers: map[string]cpuset.CPUSet{ assignments: map[string]map[string]cpuset.CPUSet{
"pod": {
"c1": cpuset.NewCPUSet(0, 1), "c1": cpuset.NewCPUSet(0, 1),
}, },
}, },
},
{ {
description: "Two containers", description: "Two containers",
defaultCPUset: cpuset.NewCPUSet(0, 1, 2, 3, 4, 5, 6, 7, 8), defaultCPUset: cpuset.NewCPUSet(0, 1, 2, 3, 4, 5, 6, 7, 8),
containers: map[string]cpuset.CPUSet{ assignments: map[string]map[string]cpuset.CPUSet{
"pod": {
"c1": cpuset.NewCPUSet(0, 1), "c1": cpuset.NewCPUSet(0, 1),
"c2": cpuset.NewCPUSet(2, 3, 4, 5), "c2": cpuset.NewCPUSet(2, 3, 4, 5),
}, },
}, },
},
{ {
description: "Container without assigned cpus", description: "Container without assigned cpus",
defaultCPUset: cpuset.NewCPUSet(0, 1, 2, 3, 4, 5, 6, 7, 8), defaultCPUset: cpuset.NewCPUSet(0, 1, 2, 3, 4, 5, 6, 7, 8),
containers: map[string]cpuset.CPUSet{ assignments: map[string]map[string]cpuset.CPUSet{
"pod": {
"c1": cpuset.NewCPUSet(), "c1": cpuset.NewCPUSet(),
}, },
}, },
},
} }
cpm, err := checkpointmanager.NewCheckpointManager(testingDir) cpm, err := checkpointmanager.NewCheckpointManager(testingDir)
@ -266,23 +319,25 @@ func TestCheckpointStateHelpers(t *testing.T) {
// ensure there is no previous checkpoint // ensure there is no previous checkpoint
cpm.RemoveCheckpoint(testingCheckpoint) cpm.RemoveCheckpoint(testingCheckpoint)
state, err := NewCheckpointState(testingDir, testingCheckpoint, "none") state, err := NewCheckpointState(testingDir, testingCheckpoint, "none", nil)
if err != nil { if err != nil {
t.Fatalf("could not create testing checkpointState instance: %v", err) t.Fatalf("could not create testing checkpointState instance: %v", err)
} }
state.SetDefaultCPUSet(tc.defaultCPUset) state.SetDefaultCPUSet(tc.defaultCPUset)
for container, set := range tc.containers { for pod := range tc.assignments {
state.SetCPUSet(container, set) for container, set := range tc.assignments[pod] {
if cpus, _ := state.GetCPUSet(container); !cpus.Equals(set) { state.SetCPUSet(pod, container, set)
if cpus, _ := state.GetCPUSet(pod, container); !cpus.Equals(set) {
t.Fatalf("state inconsistent, got %q instead of %q", set, cpus) t.Fatalf("state inconsistent, got %q instead of %q", set, cpus)
} }
state.Delete(container) state.Delete(pod, container)
if _, ok := state.GetCPUSet(container); ok { if _, ok := state.GetCPUSet(pod, container); ok {
t.Fatal("deleted container still existing in state") t.Fatal("deleted container still existing in state")
} }
} }
}
}) })
} }
} }
@ -291,34 +346,38 @@ func TestCheckpointStateClear(t *testing.T) {
testCases := []struct { testCases := []struct {
description string description string
defaultCPUset cpuset.CPUSet defaultCPUset cpuset.CPUSet
containers map[string]cpuset.CPUSet assignments map[string]map[string]cpuset.CPUSet
}{ }{
{ {
"Valid state", "Valid state",
cpuset.NewCPUSet(1, 5, 10), cpuset.NewCPUSet(1, 5, 10),
map[string]cpuset.CPUSet{ map[string]map[string]cpuset.CPUSet{
"pod": {
"container1": cpuset.NewCPUSet(1, 4), "container1": cpuset.NewCPUSet(1, 4),
}, },
}, },
},
} }
for _, tc := range testCases { for _, tc := range testCases {
t.Run(tc.description, func(t *testing.T) { t.Run(tc.description, func(t *testing.T) {
state, err := NewCheckpointState(testingDir, testingCheckpoint, "none") state, err := NewCheckpointState(testingDir, testingCheckpoint, "none", nil)
if err != nil { if err != nil {
t.Fatalf("could not create testing checkpointState instance: %v", err) t.Fatalf("could not create testing checkpointState instance: %v", err)
} }
state.SetDefaultCPUSet(tc.defaultCPUset) state.SetDefaultCPUSet(tc.defaultCPUset)
state.SetCPUAssignments(tc.containers) state.SetCPUAssignments(tc.assignments)
state.ClearState() state.ClearState()
if !cpuset.NewCPUSet().Equals(state.GetDefaultCPUSet()) { if !cpuset.NewCPUSet().Equals(state.GetDefaultCPUSet()) {
t.Fatal("cleared state with non-empty default cpu set") t.Fatal("cleared state with non-empty default cpu set")
} }
for container := range tc.containers { for pod := range tc.assignments {
if _, ok := state.GetCPUSet(container); ok { for container := range tc.assignments[pod] {
t.Fatalf("container %q with non-default cpu set in cleared state", container) if _, ok := state.GetCPUSet(pod, container); ok {
t.Fatalf("container %q in pod %q with non-default cpu set in cleared state", container, pod)
}
} }
} }
}) })

View File

@ -30,9 +30,11 @@ const compatibilityTestingCheckpoint = "cpumanager_state_compatibility_test"
var state = &stateMemory{ var state = &stateMemory{
assignments: ContainerCPUAssignments{ assignments: ContainerCPUAssignments{
"pod": map[string]cpuset.CPUSet{
"container1": cpuset.NewCPUSet(4, 5, 6), "container1": cpuset.NewCPUSet(4, 5, 6),
"container2": cpuset.NewCPUSet(1, 2, 3), "container2": cpuset.NewCPUSet(1, 2, 3),
}, },
},
defaultCPUSet: cpuset.NewCPUSet(1, 2, 3), defaultCPUSet: cpuset.NewCPUSet(1, 2, 3),
} }
@ -44,12 +46,12 @@ func TestFileToCheckpointCompatibility(t *testing.T) {
// ensure testing state is removed after testing // ensure testing state is removed after testing
defer os.Remove(statePath) defer os.Remove(statePath)
fileState := NewFileState(statePath, "none") fileState := NewFileState(statePath, "none", nil)
fileState.SetDefaultCPUSet(state.defaultCPUSet) fileState.SetDefaultCPUSet(state.defaultCPUSet)
fileState.SetCPUAssignments(state.assignments) fileState.SetCPUAssignments(state.assignments)
restoredState, err := NewCheckpointState(testingDir, compatibilityTestingCheckpoint, "none") restoredState, err := NewCheckpointState(testingDir, compatibilityTestingCheckpoint, "none", nil)
if err != nil { if err != nil {
t.Fatalf("could not restore file state: %v", err) t.Fatalf("could not restore file state: %v", err)
} }
@ -68,13 +70,13 @@ func TestCheckpointToFileCompatibility(t *testing.T) {
// ensure testing checkpoint is removed after testing // ensure testing checkpoint is removed after testing
defer cpm.RemoveCheckpoint(compatibilityTestingCheckpoint) defer cpm.RemoveCheckpoint(compatibilityTestingCheckpoint)
checkpointState, err := NewCheckpointState(testingDir, compatibilityTestingCheckpoint, "none") checkpointState, err := NewCheckpointState(testingDir, compatibilityTestingCheckpoint, "none", nil)
require.NoError(t, err) require.NoError(t, err)
checkpointState.SetDefaultCPUSet(state.defaultCPUSet) checkpointState.SetDefaultCPUSet(state.defaultCPUSet)
checkpointState.SetCPUAssignments(state.assignments) checkpointState.SetCPUAssignments(state.assignments)
restoredState := NewFileState(path.Join(testingDir, compatibilityTestingCheckpoint), "none") restoredState := NewFileState(path.Join(testingDir, compatibilityTestingCheckpoint), "none", nil)
AssertStateEqual(t, restoredState, state) AssertStateEqual(t, restoredState, state)
} }

View File

@ -24,15 +24,22 @@ import (
"sync" "sync"
"k8s.io/klog" "k8s.io/klog"
"k8s.io/kubernetes/pkg/kubelet/cm/cpumanager/containermap"
"k8s.io/kubernetes/pkg/kubelet/cm/cpuset" "k8s.io/kubernetes/pkg/kubelet/cm/cpuset"
) )
type stateFileData struct { type stateFileDataV1 struct {
PolicyName string `json:"policyName"` PolicyName string `json:"policyName"`
DefaultCPUSet string `json:"defaultCpuSet"` DefaultCPUSet string `json:"defaultCpuSet"`
Entries map[string]string `json:"entries,omitempty"` Entries map[string]string `json:"entries,omitempty"`
} }
type stateFileDataV2 struct {
PolicyName string `json:"policyName"`
DefaultCPUSet string `json:"defaultCpuSet"`
Entries map[string]map[string]string `json:"entries,omitempty"`
}
var _ State = &stateFile{} var _ State = &stateFile{}
type stateFile struct { type stateFile struct {
@ -40,14 +47,16 @@ type stateFile struct {
stateFilePath string stateFilePath string
policyName string policyName string
cache State cache State
initialContainers containermap.ContainerMap
} }
// NewFileState creates new State for keeping track of cpu/pod assignment with file backend // NewFileState creates new State for keeping track of cpu/pod assignment with file backend
func NewFileState(filePath string, policyName string) State { func NewFileState(filePath string, policyName string, initialContainers containermap.ContainerMap) State {
stateFile := &stateFile{ stateFile := &stateFile{
stateFilePath: filePath, stateFilePath: filePath,
cache: NewMemoryState(), cache: NewMemoryState(),
policyName: policyName, policyName: policyName,
initialContainers: initialContainers,
} }
if err := stateFile.tryRestoreState(); err != nil { if err := stateFile.tryRestoreState(); err != nil {
@ -61,6 +70,30 @@ func NewFileState(filePath string, policyName string) State {
return stateFile return stateFile
} }
// migrateV1StateToV2State() converts state from the v1 format to the v2 format
func (sf *stateFile) migrateV1StateToV2State(src *stateFileDataV1, dst *stateFileDataV2) error {
if src.PolicyName != "" {
dst.PolicyName = src.PolicyName
}
if src.DefaultCPUSet != "" {
dst.DefaultCPUSet = src.DefaultCPUSet
}
for containerID, cset := range src.Entries {
podUID, containerName, err := sf.initialContainers.GetContainerRef(containerID)
if err != nil {
return fmt.Errorf("containerID '%v' not found in initial containers list", containerID)
}
if dst.Entries == nil {
dst.Entries = make(map[string]map[string]string)
}
if _, exists := dst.Entries[podUID]; !exists {
dst.Entries[podUID] = make(map[string]string)
}
dst.Entries[podUID][containerName] = cset
}
return nil
}
// tryRestoreState tries to read state file, upon any error, // tryRestoreState tries to read state file, upon any error,
// err message is logged and state is left clean. un-initialized // err message is logged and state is left clean. un-initialized
func (sf *stateFile) tryRestoreState() error { func (sf *stateFile) tryRestoreState() error {
@ -90,28 +123,40 @@ func (sf *stateFile) tryRestoreState() error {
} }
// File exists; try to read it. // File exists; try to read it.
var readState stateFileData var readStateV1 stateFileDataV1
var readStateV2 stateFileDataV2
if err = json.Unmarshal(content, &readState); err != nil { if err = json.Unmarshal(content, &readStateV1); err != nil {
readStateV1 = stateFileDataV1{} // reset it back to 0
if err = json.Unmarshal(content, &readStateV2); err != nil {
klog.Errorf("[cpumanager] state file: could not unmarshal, corrupted state file - \"%s\"", sf.stateFilePath) klog.Errorf("[cpumanager] state file: could not unmarshal, corrupted state file - \"%s\"", sf.stateFilePath)
return err return err
} }
if sf.policyName != readState.PolicyName {
return fmt.Errorf("policy configured \"%s\" != policy from state file \"%s\"", sf.policyName, readState.PolicyName)
} }
if tmpDefaultCPUSet, err = cpuset.Parse(readState.DefaultCPUSet); err != nil { if err = sf.migrateV1StateToV2State(&readStateV1, &readStateV2); err != nil {
klog.Errorf("[cpumanager] state file: could not parse state file - [defaultCpuSet:\"%s\"]", readState.DefaultCPUSet) klog.Errorf("[cpumanager] state file: could not migrate v1 state to v2 state - \"%s\"", sf.stateFilePath)
return err return err
} }
for containerID, cpuString := range readState.Entries { if sf.policyName != readStateV2.PolicyName {
return fmt.Errorf("policy configured \"%s\" != policy from state file \"%s\"", sf.policyName, readStateV2.PolicyName)
}
if tmpDefaultCPUSet, err = cpuset.Parse(readStateV2.DefaultCPUSet); err != nil {
klog.Errorf("[cpumanager] state file: could not parse state file - [defaultCpuSet:\"%s\"]", readStateV2.DefaultCPUSet)
return err
}
for pod := range readStateV2.Entries {
tmpAssignments[pod] = make(map[string]cpuset.CPUSet)
for container, cpuString := range readStateV2.Entries[pod] {
if tmpContainerCPUSet, err = cpuset.Parse(cpuString); err != nil { if tmpContainerCPUSet, err = cpuset.Parse(cpuString); err != nil {
klog.Errorf("[cpumanager] state file: could not parse state file - container id: %s, cpuset: \"%s\"", containerID, cpuString) klog.Errorf("[cpumanager] state file: could not parse state file - pod: %s, container: %s, cpuset: \"%s\"", pod, container, cpuString)
return err return err
} }
tmpAssignments[containerID] = tmpContainerCPUSet tmpAssignments[pod][container] = tmpContainerCPUSet
}
} }
sf.cache.SetDefaultCPUSet(tmpDefaultCPUSet) sf.cache.SetDefaultCPUSet(tmpDefaultCPUSet)
@ -128,14 +173,18 @@ func (sf *stateFile) storeState() {
var content []byte var content []byte
var err error var err error
data := stateFileData{ data := stateFileDataV2{
PolicyName: sf.policyName, PolicyName: sf.policyName,
DefaultCPUSet: sf.cache.GetDefaultCPUSet().String(), DefaultCPUSet: sf.cache.GetDefaultCPUSet().String(),
Entries: map[string]string{}, Entries: map[string]map[string]string{},
} }
for containerID, cset := range sf.cache.GetCPUAssignments() { assignments := sf.cache.GetCPUAssignments()
data.Entries[containerID] = cset.String() for pod := range assignments {
data.Entries[pod] = map[string]string{}
for container, cset := range assignments[pod] {
data.Entries[pod][container] = cset.String()
}
} }
if content, err = json.Marshal(data); err != nil { if content, err = json.Marshal(data); err != nil {
@ -147,11 +196,11 @@ func (sf *stateFile) storeState() {
} }
} }
func (sf *stateFile) GetCPUSet(containerID string) (cpuset.CPUSet, bool) { func (sf *stateFile) GetCPUSet(podUID string, containerName string) (cpuset.CPUSet, bool) {
sf.RLock() sf.RLock()
defer sf.RUnlock() defer sf.RUnlock()
res, ok := sf.cache.GetCPUSet(containerID) res, ok := sf.cache.GetCPUSet(podUID, containerName)
return res, ok return res, ok
} }
@ -162,11 +211,11 @@ func (sf *stateFile) GetDefaultCPUSet() cpuset.CPUSet {
return sf.cache.GetDefaultCPUSet() return sf.cache.GetDefaultCPUSet()
} }
func (sf *stateFile) GetCPUSetOrDefault(containerID string) cpuset.CPUSet { func (sf *stateFile) GetCPUSetOrDefault(podUID string, containerName string) cpuset.CPUSet {
sf.RLock() sf.RLock()
defer sf.RUnlock() defer sf.RUnlock()
return sf.cache.GetCPUSetOrDefault(containerID) return sf.cache.GetCPUSetOrDefault(podUID, containerName)
} }
func (sf *stateFile) GetCPUAssignments() ContainerCPUAssignments { func (sf *stateFile) GetCPUAssignments() ContainerCPUAssignments {
@ -175,10 +224,10 @@ func (sf *stateFile) GetCPUAssignments() ContainerCPUAssignments {
return sf.cache.GetCPUAssignments() return sf.cache.GetCPUAssignments()
} }
func (sf *stateFile) SetCPUSet(containerID string, cset cpuset.CPUSet) { func (sf *stateFile) SetCPUSet(podUID string, containerName string, cset cpuset.CPUSet) {
sf.Lock() sf.Lock()
defer sf.Unlock() defer sf.Unlock()
sf.cache.SetCPUSet(containerID, cset) sf.cache.SetCPUSet(podUID, containerName, cset)
sf.storeState() sf.storeState()
} }
@ -196,10 +245,10 @@ func (sf *stateFile) SetCPUAssignments(a ContainerCPUAssignments) {
sf.storeState() sf.storeState()
} }
func (sf *stateFile) Delete(containerID string) { func (sf *stateFile) Delete(podUID string, containerName string) {
sf.Lock() sf.Lock()
defer sf.Unlock() defer sf.Unlock()
sf.cache.Delete(containerID) sf.cache.Delete(podUID, containerName)
sf.storeState() sf.storeState()
} }

View File

@ -27,6 +27,7 @@ import (
"strings" "strings"
"testing" "testing"
"k8s.io/kubernetes/pkg/kubelet/cm/cpumanager/containermap"
"k8s.io/kubernetes/pkg/kubelet/cm/cpuset" "k8s.io/kubernetes/pkg/kubelet/cm/cpuset"
) )
@ -73,6 +74,7 @@ func TestFileStateTryRestore(t *testing.T) {
description string description string
stateFileContent string stateFileContent string
policyName string policyName string
initialContainers containermap.ContainerMap
expErr string expErr string
expPanic bool expPanic bool
expectedState *stateMemory expectedState *stateMemory
@ -81,6 +83,7 @@ func TestFileStateTryRestore(t *testing.T) {
"Invalid JSON - one byte file", "Invalid JSON - one byte file",
"\n", "\n",
"none", "none",
containermap.ContainerMap{},
"[cpumanager] state file: unable to restore state from disk (unexpected end of JSON input)", "[cpumanager] state file: unable to restore state from disk (unexpected end of JSON input)",
true, true,
&stateMemory{}, &stateMemory{},
@ -89,6 +92,7 @@ func TestFileStateTryRestore(t *testing.T) {
"Invalid JSON - invalid content", "Invalid JSON - invalid content",
"{", "{",
"none", "none",
containermap.ContainerMap{},
"[cpumanager] state file: unable to restore state from disk (unexpected end of JSON input)", "[cpumanager] state file: unable to restore state from disk (unexpected end of JSON input)",
true, true,
&stateMemory{}, &stateMemory{},
@ -97,6 +101,7 @@ func TestFileStateTryRestore(t *testing.T) {
"Try restore defaultCPUSet only", "Try restore defaultCPUSet only",
`{"policyName": "none", "defaultCpuSet": "4-6"}`, `{"policyName": "none", "defaultCpuSet": "4-6"}`,
"none", "none",
containermap.ContainerMap{},
"", "",
false, false,
&stateMemory{ &stateMemory{
@ -108,6 +113,7 @@ func TestFileStateTryRestore(t *testing.T) {
"Try restore defaultCPUSet only - invalid name", "Try restore defaultCPUSet only - invalid name",
`{"policyName": "none", "defaultCpuSet" "4-6"}`, `{"policyName": "none", "defaultCpuSet" "4-6"}`,
"none", "none",
containermap.ContainerMap{},
`[cpumanager] state file: unable to restore state from disk (invalid character '"' after object key)`, `[cpumanager] state file: unable to restore state from disk (invalid character '"' after object key)`,
true, true,
&stateMemory{}, &stateMemory{},
@ -117,18 +123,23 @@ func TestFileStateTryRestore(t *testing.T) {
`{ `{
"policyName": "none", "policyName": "none",
"entries": { "entries": {
"pod": {
"container1": "4-6", "container1": "4-6",
"container2": "1-3" "container2": "1-3"
} }
}
}`, }`,
"none", "none",
containermap.ContainerMap{},
"", "",
false, false,
&stateMemory{ &stateMemory{
assignments: ContainerCPUAssignments{ assignments: ContainerCPUAssignments{
"pod": map[string]cpuset.CPUSet{
"container1": cpuset.NewCPUSet(4, 5, 6), "container1": cpuset.NewCPUSet(4, 5, 6),
"container2": cpuset.NewCPUSet(1, 2, 3), "container2": cpuset.NewCPUSet(1, 2, 3),
}, },
},
defaultCPUSet: cpuset.NewCPUSet(), defaultCPUSet: cpuset.NewCPUSet(),
}, },
}, },
@ -140,6 +151,7 @@ func TestFileStateTryRestore(t *testing.T) {
"entries": {} "entries": {}
}`, }`,
"B", "B",
containermap.ContainerMap{},
`[cpumanager] state file: unable to restore state from disk (policy configured "B" != policy from state file "A")`, `[cpumanager] state file: unable to restore state from disk (policy configured "B" != policy from state file "A")`,
true, true,
&stateMemory{}, &stateMemory{},
@ -148,6 +160,7 @@ func TestFileStateTryRestore(t *testing.T) {
"Try restore invalid assignments", "Try restore invalid assignments",
`{"entries": }`, `{"entries": }`,
"none", "none",
containermap.ContainerMap{},
"[cpumanager] state file: unable to restore state from disk (invalid character '}' looking for beginning of value)", "[cpumanager] state file: unable to restore state from disk (invalid character '}' looking for beginning of value)",
true, true,
&stateMemory{}, &stateMemory{},
@ -158,18 +171,23 @@ func TestFileStateTryRestore(t *testing.T) {
"policyName": "none", "policyName": "none",
"defaultCpuSet": "23-24", "defaultCpuSet": "23-24",
"entries": { "entries": {
"pod": {
"container1": "4-6", "container1": "4-6",
"container2": "1-3" "container2": "1-3"
} }
}
}`, }`,
"none", "none",
containermap.ContainerMap{},
"", "",
false, false,
&stateMemory{ &stateMemory{
assignments: ContainerCPUAssignments{ assignments: ContainerCPUAssignments{
"pod": map[string]cpuset.CPUSet{
"container1": cpuset.NewCPUSet(4, 5, 6), "container1": cpuset.NewCPUSet(4, 5, 6),
"container2": cpuset.NewCPUSet(1, 2, 3), "container2": cpuset.NewCPUSet(1, 2, 3),
}, },
},
defaultCPUSet: cpuset.NewCPUSet(23, 24), defaultCPUSet: cpuset.NewCPUSet(23, 24),
}, },
}, },
@ -180,6 +198,7 @@ func TestFileStateTryRestore(t *testing.T) {
"defaultCpuSet": "2-sd" "defaultCpuSet": "2-sd"
}`, }`,
"none", "none",
containermap.ContainerMap{},
`[cpumanager] state file: unable to restore state from disk (strconv.Atoi: parsing "sd": invalid syntax)`, `[cpumanager] state file: unable to restore state from disk (strconv.Atoi: parsing "sd": invalid syntax)`,
true, true,
&stateMemory{}, &stateMemory{},
@ -190,11 +209,14 @@ func TestFileStateTryRestore(t *testing.T) {
"policyName": "none", "policyName": "none",
"defaultCpuSet": "23-24", "defaultCpuSet": "23-24",
"entries": { "entries": {
"pod": {
"container1": "p-6", "container1": "p-6",
"container2": "1-3" "container2": "1-3"
} }
}
}`, }`,
"none", "none",
containermap.ContainerMap{},
`[cpumanager] state file: unable to restore state from disk (strconv.Atoi: parsing "p": invalid syntax)`, `[cpumanager] state file: unable to restore state from disk (strconv.Atoi: parsing "p": invalid syntax)`,
true, true,
&stateMemory{}, &stateMemory{},
@ -203,6 +225,7 @@ func TestFileStateTryRestore(t *testing.T) {
"tryRestoreState creates empty state file", "tryRestoreState creates empty state file",
"", "",
"none", "none",
containermap.ContainerMap{},
"", "",
false, false,
&stateMemory{ &stateMemory{
@ -210,6 +233,35 @@ func TestFileStateTryRestore(t *testing.T) {
defaultCPUSet: cpuset.NewCPUSet(), defaultCPUSet: cpuset.NewCPUSet(),
}, },
}, },
{
"Try restore with migration",
`{
"policyName": "none",
"defaultCpuSet": "23-24",
"entries": {
"containerID1": "4-6",
"containerID2": "1-3"
}
}`,
"none",
func() containermap.ContainerMap {
cm := containermap.NewContainerMap()
cm.Add("pod", "container1", "containerID1")
cm.Add("pod", "container2", "containerID2")
return cm
}(),
"",
false,
&stateMemory{
assignments: ContainerCPUAssignments{
"pod": map[string]cpuset.CPUSet{
"container1": cpuset.NewCPUSet(4, 5, 6),
"container2": cpuset.NewCPUSet(1, 2, 3),
},
},
defaultCPUSet: cpuset.NewCPUSet(23, 24),
},
},
} }
for idx, tc := range testCases { for idx, tc := range testCases {
@ -239,7 +291,7 @@ func TestFileStateTryRestore(t *testing.T) {
defer os.Remove(sfilePath.Name()) defer os.Remove(sfilePath.Name())
logData, fileState := stderrCapture(t, func() State { logData, fileState := stderrCapture(t, func() State {
return NewFileState(sfilePath.Name(), tc.policyName) return NewFileState(sfilePath.Name(), tc.policyName, tc.initialContainers)
}) })
if tc.expErr != "" { if tc.expErr != "" {
@ -284,7 +336,7 @@ func TestFileStateTryRestorePanic(t *testing.T) {
} }
} }
}() }()
NewFileState(sfilePath, "static") NewFileState(sfilePath, "static", nil)
}) })
} }
@ -315,9 +367,11 @@ func TestUpdateStateFile(t *testing.T) {
"", "",
&stateMemory{ &stateMemory{
assignments: ContainerCPUAssignments{ assignments: ContainerCPUAssignments{
"pod": map[string]cpuset.CPUSet{
"container1": cpuset.NewCPUSet(4, 5, 6), "container1": cpuset.NewCPUSet(4, 5, 6),
"container2": cpuset.NewCPUSet(1, 2, 3), "container2": cpuset.NewCPUSet(1, 2, 3),
}, },
},
defaultCPUSet: cpuset.NewCPUSet(), defaultCPUSet: cpuset.NewCPUSet(),
}, },
}, },
@ -363,7 +417,7 @@ func TestUpdateStateFile(t *testing.T) {
return return
} }
} }
newFileState := NewFileState(sfilePath.Name(), "static") newFileState := NewFileState(sfilePath.Name(), "static", nil)
AssertStateEqual(t, newFileState, tc.expectedState) AssertStateEqual(t, newFileState, tc.expectedState)
}) })
} }
@ -373,37 +427,45 @@ func TestHelpersStateFile(t *testing.T) {
testCases := []struct { testCases := []struct {
description string description string
defaultCPUset cpuset.CPUSet defaultCPUset cpuset.CPUSet
containers map[string]cpuset.CPUSet assignments map[string]map[string]cpuset.CPUSet
}{ }{
{ {
description: "one container", description: "one container",
defaultCPUset: cpuset.NewCPUSet(0, 1, 2, 3, 4, 5, 6, 7, 8), defaultCPUset: cpuset.NewCPUSet(0, 1, 2, 3, 4, 5, 6, 7, 8),
containers: map[string]cpuset.CPUSet{ assignments: map[string]map[string]cpuset.CPUSet{
"pod": {
"c1": cpuset.NewCPUSet(0, 1), "c1": cpuset.NewCPUSet(0, 1),
}, },
}, },
},
{ {
description: "two containers", description: "two containers",
defaultCPUset: cpuset.NewCPUSet(0, 1, 2, 3, 4, 5, 6, 7, 8), defaultCPUset: cpuset.NewCPUSet(0, 1, 2, 3, 4, 5, 6, 7, 8),
containers: map[string]cpuset.CPUSet{ assignments: map[string]map[string]cpuset.CPUSet{
"pod": {
"c1": cpuset.NewCPUSet(0, 1), "c1": cpuset.NewCPUSet(0, 1),
"c2": cpuset.NewCPUSet(2, 3, 4, 5), "c2": cpuset.NewCPUSet(2, 3, 4, 5),
}, },
}, },
},
{ {
description: "container with more cpus than is possible", description: "container with more cpus than is possible",
defaultCPUset: cpuset.NewCPUSet(0, 1, 2, 3, 4, 5, 6, 7, 8), defaultCPUset: cpuset.NewCPUSet(0, 1, 2, 3, 4, 5, 6, 7, 8),
containers: map[string]cpuset.CPUSet{ assignments: map[string]map[string]cpuset.CPUSet{
"pod": {
"c1": cpuset.NewCPUSet(0, 10), "c1": cpuset.NewCPUSet(0, 10),
}, },
}, },
},
{ {
description: "container without assigned cpus", description: "container without assigned cpus",
defaultCPUset: cpuset.NewCPUSet(0, 1, 2, 3, 4, 5, 6, 7, 8), defaultCPUset: cpuset.NewCPUSet(0, 1, 2, 3, 4, 5, 6, 7, 8),
containers: map[string]cpuset.CPUSet{ assignments: map[string]map[string]cpuset.CPUSet{
"pod": {
"c1": cpuset.NewCPUSet(), "c1": cpuset.NewCPUSet(),
}, },
}, },
},
} }
for _, tc := range testCases { for _, tc := range testCases {
@ -414,20 +476,22 @@ func TestHelpersStateFile(t *testing.T) {
t.Errorf("cannot create temporary test file: %q", err.Error()) t.Errorf("cannot create temporary test file: %q", err.Error())
} }
state := NewFileState(sfFile.Name(), "static") state := NewFileState(sfFile.Name(), "static", nil)
state.SetDefaultCPUSet(tc.defaultCPUset) state.SetDefaultCPUSet(tc.defaultCPUset)
for containerName, containerCPUs := range tc.containers { for podUID := range tc.assignments {
state.SetCPUSet(containerName, containerCPUs) for containerName, containerCPUs := range tc.assignments[podUID] {
if cpus, _ := state.GetCPUSet(containerName); !cpus.Equals(containerCPUs) { state.SetCPUSet(podUID, containerName, containerCPUs)
if cpus, _ := state.GetCPUSet(podUID, containerName); !cpus.Equals(containerCPUs) {
t.Errorf("state is inconsistent. Wants = %q Have = %q", containerCPUs, cpus) t.Errorf("state is inconsistent. Wants = %q Have = %q", containerCPUs, cpus)
} }
state.Delete(containerName) state.Delete(podUID, containerName)
if cpus := state.GetCPUSetOrDefault(containerName); !cpus.Equals(tc.defaultCPUset) { if cpus := state.GetCPUSetOrDefault(podUID, containerName); !cpus.Equals(tc.defaultCPUset) {
t.Error("deleted container still existing in state") t.Error("deleted container still existing in state")
} }
} }
}
}) })
} }
@ -437,17 +501,19 @@ func TestClearStateStateFile(t *testing.T) {
testCases := []struct { testCases := []struct {
description string description string
defaultCPUset cpuset.CPUSet defaultCPUset cpuset.CPUSet
containers map[string]cpuset.CPUSet assignments map[string]map[string]cpuset.CPUSet
}{ }{
{ {
description: "valid file", description: "valid file",
defaultCPUset: cpuset.NewCPUSet(0, 1, 2, 3, 4, 5, 6, 7, 8), defaultCPUset: cpuset.NewCPUSet(0, 1, 2, 3, 4, 5, 6, 7, 8),
containers: map[string]cpuset.CPUSet{ assignments: map[string]map[string]cpuset.CPUSet{
"pod": {
"c1": cpuset.NewCPUSet(0, 1), "c1": cpuset.NewCPUSet(0, 1),
"c2": cpuset.NewCPUSet(2, 3), "c2": cpuset.NewCPUSet(2, 3),
"c3": cpuset.NewCPUSet(4, 5), "c3": cpuset.NewCPUSet(4, 5),
}, },
}, },
},
} }
for _, testCase := range testCases { for _, testCase := range testCases {
t.Run(testCase.description, func(t *testing.T) { t.Run(testCase.description, func(t *testing.T) {
@ -457,21 +523,25 @@ func TestClearStateStateFile(t *testing.T) {
t.Errorf("cannot create temporary test file: %q", err.Error()) t.Errorf("cannot create temporary test file: %q", err.Error())
} }
state := NewFileState(sfFile.Name(), "static") state := NewFileState(sfFile.Name(), "static", nil)
state.SetDefaultCPUSet(testCase.defaultCPUset) state.SetDefaultCPUSet(testCase.defaultCPUset)
for containerName, containerCPUs := range testCase.containers { for podUID := range testCase.assignments {
state.SetCPUSet(containerName, containerCPUs) for containerName, containerCPUs := range testCase.assignments[podUID] {
state.SetCPUSet(podUID, containerName, containerCPUs)
}
} }
state.ClearState() state.ClearState()
if !cpuset.NewCPUSet().Equals(state.GetDefaultCPUSet()) { if !cpuset.NewCPUSet().Equals(state.GetDefaultCPUSet()) {
t.Error("cleared state shouldn't has got information about available cpuset") t.Error("cleared state shouldn't has got information about available cpuset")
} }
for containerName := range testCase.containers { for podUID := range testCase.assignments {
if !cpuset.NewCPUSet().Equals(state.GetCPUSetOrDefault(containerName)) { for containerName := range testCase.assignments[podUID] {
if !cpuset.NewCPUSet().Equals(state.GetCPUSetOrDefault(podUID, containerName)) {
t.Error("cleared state shouldn't has got information about containers") t.Error("cleared state shouldn't has got information about containers")
} }
} }
}
}) })
} }
} }

View File

@ -40,11 +40,11 @@ func NewMemoryState() State {
} }
} }
func (s *stateMemory) GetCPUSet(containerID string) (cpuset.CPUSet, bool) { func (s *stateMemory) GetCPUSet(podUID string, containerName string) (cpuset.CPUSet, bool) {
s.RLock() s.RLock()
defer s.RUnlock() defer s.RUnlock()
res, ok := s.assignments[containerID] res, ok := s.assignments[podUID][containerName]
return res.Clone(), ok return res.Clone(), ok
} }
@ -55,8 +55,8 @@ func (s *stateMemory) GetDefaultCPUSet() cpuset.CPUSet {
return s.defaultCPUSet.Clone() return s.defaultCPUSet.Clone()
} }
func (s *stateMemory) GetCPUSetOrDefault(containerID string) cpuset.CPUSet { func (s *stateMemory) GetCPUSetOrDefault(podUID string, containerName string) cpuset.CPUSet {
if res, ok := s.GetCPUSet(containerID); ok { if res, ok := s.GetCPUSet(podUID, containerName); ok {
return res return res
} }
return s.GetDefaultCPUSet() return s.GetDefaultCPUSet()
@ -68,12 +68,16 @@ func (s *stateMemory) GetCPUAssignments() ContainerCPUAssignments {
return s.assignments.Clone() return s.assignments.Clone()
} }
func (s *stateMemory) SetCPUSet(containerID string, cset cpuset.CPUSet) { func (s *stateMemory) SetCPUSet(podUID string, containerName string, cset cpuset.CPUSet) {
s.Lock() s.Lock()
defer s.Unlock() defer s.Unlock()
s.assignments[containerID] = cset if _, ok := s.assignments[podUID]; !ok {
klog.Infof("[cpumanager] updated desired cpuset (container id: %s, cpuset: \"%s\")", containerID, cset) s.assignments[podUID] = make(map[string]cpuset.CPUSet)
}
s.assignments[podUID][containerName] = cset
klog.Infof("[cpumanager] updated desired cpuset (pod: %s, container: %s, cpuset: \"%s\")", podUID, containerName, cset)
} }
func (s *stateMemory) SetDefaultCPUSet(cset cpuset.CPUSet) { func (s *stateMemory) SetDefaultCPUSet(cset cpuset.CPUSet) {
@ -92,12 +96,15 @@ func (s *stateMemory) SetCPUAssignments(a ContainerCPUAssignments) {
klog.Infof("[cpumanager] updated cpuset assignments: \"%v\"", a) klog.Infof("[cpumanager] updated cpuset assignments: \"%v\"", a)
} }
func (s *stateMemory) Delete(containerID string) { func (s *stateMemory) Delete(podUID string, containerName string) {
s.Lock() s.Lock()
defer s.Unlock() defer s.Unlock()
delete(s.assignments, containerID) delete(s.assignments[podUID], containerName)
klog.V(2).Infof("[cpumanager] deleted cpuset assignment (container id: %s)", containerID) if len(s.assignments[podUID]) == 0 {
delete(s.assignments, podUID)
}
klog.V(2).Infof("[cpumanager] deleted cpuset assignment (pod: %s, container: %s)", podUID, containerName)
} }
func (s *stateMemory) ClearState() { func (s *stateMemory) ClearState() {