diff --git a/pkg/kubelet/cm/cpumanager/state/BUILD b/pkg/kubelet/cm/cpumanager/state/BUILD index d670c795b42..37850de90c3 100644 --- a/pkg/kubelet/cm/cpumanager/state/BUILD +++ b/pkg/kubelet/cm/cpumanager/state/BUILD @@ -15,6 +15,7 @@ go_library( "//pkg/kubelet/checkpointmanager:go_default_library", "//pkg/kubelet/checkpointmanager/checksum:go_default_library", "//pkg/kubelet/checkpointmanager/errors:go_default_library", + "//pkg/kubelet/cm/cpumanager/containermap:go_default_library", "//pkg/kubelet/cm/cpuset:go_default_library", "//vendor/k8s.io/klog:go_default_library", ], @@ -30,6 +31,7 @@ go_test( embed = [":go_default_library"], deps = [ "//pkg/kubelet/checkpointmanager:go_default_library", + "//pkg/kubelet/cm/cpumanager/containermap:go_default_library", "//pkg/kubelet/cm/cpumanager/state/testing:go_default_library", "//pkg/kubelet/cm/cpuset:go_default_library", "//vendor/github.com/stretchr/testify/require:go_default_library", diff --git a/pkg/kubelet/cm/cpumanager/state/checkpoint.go b/pkg/kubelet/cm/cpumanager/state/checkpoint.go index 40e0fc81ad3..3683e66aedc 100644 --- a/pkg/kubelet/cm/cpumanager/state/checkpoint.go +++ b/pkg/kubelet/cm/cpumanager/state/checkpoint.go @@ -23,38 +23,87 @@ import ( "k8s.io/kubernetes/pkg/kubelet/checkpointmanager/checksum" ) -var _ checkpointmanager.Checkpoint = &CPUManagerCheckpoint{} - // CPUManagerCheckpoint struct is used to store cpu/pod assignments in a checkpoint -type CPUManagerCheckpoint struct { +type CPUManagerCheckpoint = CPUManagerCheckpointV2 + +var _ checkpointmanager.Checkpoint = &CPUManagerCheckpointV1{} +var _ checkpointmanager.Checkpoint = &CPUManagerCheckpointV2{} + +// CPUManagerCheckpointV1 struct is used to store cpu/pod assignments in a checkpoint in v1 format +type CPUManagerCheckpointV1 struct { PolicyName string `json:"policyName"` DefaultCPUSet string `json:"defaultCpuSet"` Entries map[string]string `json:"entries,omitempty"` Checksum checksum.Checksum `json:"checksum"` } +// CPUManagerCheckpointV2 struct is used to store cpu/pod assignments in a checkpoint in v2 format +type CPUManagerCheckpointV2 struct { + PolicyName string `json:"policyName"` + DefaultCPUSet string `json:"defaultCpuSet"` + Entries map[string]map[string]string `json:"entries,omitempty"` + Checksum checksum.Checksum `json:"checksum"` +} + // NewCPUManagerCheckpoint returns an instance of Checkpoint func NewCPUManagerCheckpoint() *CPUManagerCheckpoint { - return &CPUManagerCheckpoint{ + //lint:ignore unexported-type-in-api user-facing error message + return newCPUManagerCheckpointV2() +} + +func newCPUManagerCheckpointV1() *CPUManagerCheckpointV1 { + return &CPUManagerCheckpointV1{ Entries: make(map[string]string), } } -// MarshalCheckpoint returns marshalled checkpoint -func (cp *CPUManagerCheckpoint) MarshalCheckpoint() ([]byte, error) { +func newCPUManagerCheckpointV2() *CPUManagerCheckpointV2 { + return &CPUManagerCheckpointV2{ + Entries: make(map[string]map[string]string), + } +} + +// MarshalCheckpoint returns marshalled checkpoint in v1 format +func (cp *CPUManagerCheckpointV1) MarshalCheckpoint() ([]byte, error) { // make sure checksum wasn't set before so it doesn't affect output checksum cp.Checksum = 0 cp.Checksum = checksum.New(cp) return json.Marshal(*cp) } -// UnmarshalCheckpoint tries to unmarshal passed bytes to checkpoint -func (cp *CPUManagerCheckpoint) UnmarshalCheckpoint(blob []byte) error { +// MarshalCheckpoint returns marshalled checkpoint in v2 format +func (cp *CPUManagerCheckpointV2) MarshalCheckpoint() ([]byte, error) { + // make sure checksum wasn't set before so it doesn't affect output checksum + cp.Checksum = 0 + cp.Checksum = checksum.New(cp) + return json.Marshal(*cp) +} + +// UnmarshalCheckpoint tries to unmarshal passed bytes to checkpoint in v1 format +func (cp *CPUManagerCheckpointV1) UnmarshalCheckpoint(blob []byte) error { return json.Unmarshal(blob, cp) } -// VerifyChecksum verifies that current checksum of checkpoint is valid -func (cp *CPUManagerCheckpoint) VerifyChecksum() error { +// UnmarshalCheckpoint tries to unmarshal passed bytes to checkpoint in v2 format +func (cp *CPUManagerCheckpointV2) UnmarshalCheckpoint(blob []byte) error { + return json.Unmarshal(blob, cp) +} + +// VerifyChecksum verifies that current checksum of checkpoint is valid in v1 format +func (cp *CPUManagerCheckpointV1) VerifyChecksum() error { + if cp.Checksum == 0 { + // accept empty checksum for compatibility with old file backend + return nil + } + ck := cp.Checksum + cp.Checksum = 0 + err := ck.Verify(cp) + cp.Checksum = ck + return err +} + +// VerifyChecksum verifies that current checksum of checkpoint is valid in v2 format +func (cp *CPUManagerCheckpointV2) VerifyChecksum() error { if cp.Checksum == 0 { // accept empty checksum for compatibility with old file backend return nil diff --git a/pkg/kubelet/cm/cpumanager/state/state.go b/pkg/kubelet/cm/cpumanager/state/state.go index be32509279b..a9bd906fcb2 100644 --- a/pkg/kubelet/cm/cpumanager/state/state.go +++ b/pkg/kubelet/cm/cpumanager/state/state.go @@ -21,30 +21,33 @@ import ( ) // ContainerCPUAssignments type used in cpu manager state -type ContainerCPUAssignments map[string]cpuset.CPUSet +type ContainerCPUAssignments map[string]map[string]cpuset.CPUSet // Clone returns a copy of ContainerCPUAssignments func (as ContainerCPUAssignments) Clone() ContainerCPUAssignments { ret := make(ContainerCPUAssignments) - for key, val := range as { - ret[key] = val + for pod := range as { + ret[pod] = make(map[string]cpuset.CPUSet) + for container, cset := range as[pod] { + ret[pod][container] = cset + } } return ret } // Reader interface used to read current cpu/pod assignment state type Reader interface { - GetCPUSet(containerID string) (cpuset.CPUSet, bool) + GetCPUSet(podUID string, containerName string) (cpuset.CPUSet, bool) GetDefaultCPUSet() cpuset.CPUSet - GetCPUSetOrDefault(containerID string) cpuset.CPUSet + GetCPUSetOrDefault(podUID string, containerName string) cpuset.CPUSet GetCPUAssignments() ContainerCPUAssignments } type writer interface { - SetCPUSet(containerID string, cpuset cpuset.CPUSet) + SetCPUSet(podUID string, containerName string, cpuset cpuset.CPUSet) SetDefaultCPUSet(cpuset cpuset.CPUSet) SetCPUAssignments(ContainerCPUAssignments) - Delete(containerID string) + Delete(podUID string, containerName string) ClearState() } diff --git a/pkg/kubelet/cm/cpumanager/state/state_checkpoint.go b/pkg/kubelet/cm/cpumanager/state/state_checkpoint.go index b9fe0f46374..cd8bd994a69 100644 --- a/pkg/kubelet/cm/cpumanager/state/state_checkpoint.go +++ b/pkg/kubelet/cm/cpumanager/state/state_checkpoint.go @@ -24,6 +24,7 @@ import ( "k8s.io/klog" "k8s.io/kubernetes/pkg/kubelet/checkpointmanager" "k8s.io/kubernetes/pkg/kubelet/checkpointmanager/errors" + "k8s.io/kubernetes/pkg/kubelet/cm/cpumanager/containermap" "k8s.io/kubernetes/pkg/kubelet/cm/cpuset" ) @@ -35,10 +36,11 @@ type stateCheckpoint struct { cache State checkpointManager checkpointmanager.CheckpointManager checkpointName string + initialContainers containermap.ContainerMap } // NewCheckpointState creates new State for keeping track of cpu/pod assignment with checkpoint backend -func NewCheckpointState(stateDir, checkpointName, policyName string) (State, error) { +func NewCheckpointState(stateDir, checkpointName, policyName string, initialContainers containermap.ContainerMap) (State, error) { checkpointManager, err := checkpointmanager.NewCheckpointManager(stateDir) if err != nil { return nil, fmt.Errorf("failed to initialize checkpoint manager: %v", err) @@ -48,6 +50,7 @@ func NewCheckpointState(stateDir, checkpointName, policyName string) (State, err policyName: policyName, checkpointManager: checkpointManager, checkpointName: checkpointName, + initialContainers: initialContainers, } if err := stateCheckpoint.restoreState(); err != nil { @@ -60,6 +63,30 @@ func NewCheckpointState(stateDir, checkpointName, policyName string) (State, err return stateCheckpoint, nil } +// migrateV1CheckpointToV2Checkpoint() converts checkpoints from the v1 format to the v2 format +func (sc *stateCheckpoint) migrateV1CheckpointToV2Checkpoint(src *CPUManagerCheckpointV1, dst *CPUManagerCheckpointV2) error { + if src.PolicyName != "" { + dst.PolicyName = src.PolicyName + } + if src.DefaultCPUSet != "" { + dst.DefaultCPUSet = src.DefaultCPUSet + } + for containerID, cset := range src.Entries { + podUID, containerName, err := sc.initialContainers.GetContainerRef(containerID) + if err != nil { + return fmt.Errorf("containerID '%v' not found in initial containers list", containerID) + } + if dst.Entries == nil { + dst.Entries = make(map[string]map[string]string) + } + if _, exists := dst.Entries[podUID]; !exists { + dst.Entries[podUID] = make(map[string]string) + } + dst.Entries[podUID][containerName] = cset + } + return nil +} + // restores state from a checkpoint and creates it if it doesn't exist func (sc *stateCheckpoint) restoreState() error { sc.mux.Lock() @@ -71,28 +98,40 @@ func (sc *stateCheckpoint) restoreState() error { tmpDefaultCPUSet := cpuset.NewCPUSet() tmpContainerCPUSet := cpuset.NewCPUSet() - checkpoint := NewCPUManagerCheckpoint() - if err = sc.checkpointManager.GetCheckpoint(sc.checkpointName, checkpoint); err != nil { - if err == errors.ErrCheckpointNotFound { - sc.storeState() - return nil + checkpointV1 := newCPUManagerCheckpointV1() + checkpointV2 := newCPUManagerCheckpointV2() + + if err = sc.checkpointManager.GetCheckpoint(sc.checkpointName, checkpointV1); err != nil { + checkpointV1 = &CPUManagerCheckpointV1{} // reset it back to 0 + if err = sc.checkpointManager.GetCheckpoint(sc.checkpointName, checkpointV2); err != nil { + if err == errors.ErrCheckpointNotFound { + sc.storeState() + return nil + } + return err } - return err } - if sc.policyName != checkpoint.PolicyName { - return fmt.Errorf("configured policy %q differs from state checkpoint policy %q", sc.policyName, checkpoint.PolicyName) + if err = sc.migrateV1CheckpointToV2Checkpoint(checkpointV1, checkpointV2); err != nil { + return fmt.Errorf("error migrating v1 checkpoint state to v2 checkpoint state: %s", err) } - if tmpDefaultCPUSet, err = cpuset.Parse(checkpoint.DefaultCPUSet); err != nil { - return fmt.Errorf("could not parse default cpu set %q: %v", checkpoint.DefaultCPUSet, err) + if sc.policyName != checkpointV2.PolicyName { + return fmt.Errorf("configured policy %q differs from state checkpoint policy %q", sc.policyName, checkpointV2.PolicyName) } - for containerID, cpuString := range checkpoint.Entries { - if tmpContainerCPUSet, err = cpuset.Parse(cpuString); err != nil { - return fmt.Errorf("could not parse cpuset %q for container id %q: %v", cpuString, containerID, err) + if tmpDefaultCPUSet, err = cpuset.Parse(checkpointV2.DefaultCPUSet); err != nil { + return fmt.Errorf("could not parse default cpu set %q: %v", checkpointV2.DefaultCPUSet, err) + } + + for pod := range checkpointV2.Entries { + tmpAssignments[pod] = make(map[string]cpuset.CPUSet) + for container, cpuString := range checkpointV2.Entries[pod] { + if tmpContainerCPUSet, err = cpuset.Parse(cpuString); err != nil { + return fmt.Errorf("could not parse cpuset %q for container %q in pod %q: %v", cpuString, container, pod, err) + } + tmpAssignments[pod][container] = tmpContainerCPUSet } - tmpAssignments[containerID] = tmpContainerCPUSet } sc.cache.SetDefaultCPUSet(tmpDefaultCPUSet) @@ -110,8 +149,12 @@ func (sc *stateCheckpoint) storeState() { checkpoint.PolicyName = sc.policyName checkpoint.DefaultCPUSet = sc.cache.GetDefaultCPUSet().String() - for containerID, cset := range sc.cache.GetCPUAssignments() { - checkpoint.Entries[containerID] = cset.String() + assignments := sc.cache.GetCPUAssignments() + for pod := range assignments { + checkpoint.Entries[pod] = make(map[string]string) + for container, cset := range assignments[pod] { + checkpoint.Entries[pod][container] = cset.String() + } } err := sc.checkpointManager.CreateCheckpoint(sc.checkpointName, checkpoint) @@ -122,11 +165,11 @@ func (sc *stateCheckpoint) storeState() { } // GetCPUSet returns current CPU set -func (sc *stateCheckpoint) GetCPUSet(containerID string) (cpuset.CPUSet, bool) { +func (sc *stateCheckpoint) GetCPUSet(podUID string, containerName string) (cpuset.CPUSet, bool) { sc.mux.RLock() defer sc.mux.RUnlock() - res, ok := sc.cache.GetCPUSet(containerID) + res, ok := sc.cache.GetCPUSet(podUID, containerName) return res, ok } @@ -139,11 +182,11 @@ func (sc *stateCheckpoint) GetDefaultCPUSet() cpuset.CPUSet { } // GetCPUSetOrDefault returns current CPU set, or default one if it wasn't changed -func (sc *stateCheckpoint) GetCPUSetOrDefault(containerID string) cpuset.CPUSet { +func (sc *stateCheckpoint) GetCPUSetOrDefault(podUID string, containerName string) cpuset.CPUSet { sc.mux.RLock() defer sc.mux.RUnlock() - return sc.cache.GetCPUSetOrDefault(containerID) + return sc.cache.GetCPUSetOrDefault(podUID, containerName) } // GetCPUAssignments returns current CPU to pod assignments @@ -155,10 +198,10 @@ func (sc *stateCheckpoint) GetCPUAssignments() ContainerCPUAssignments { } // SetCPUSet sets CPU set -func (sc *stateCheckpoint) SetCPUSet(containerID string, cset cpuset.CPUSet) { +func (sc *stateCheckpoint) SetCPUSet(podUID string, containerName string, cset cpuset.CPUSet) { sc.mux.Lock() defer sc.mux.Unlock() - sc.cache.SetCPUSet(containerID, cset) + sc.cache.SetCPUSet(podUID, containerName, cset) sc.storeState() } @@ -179,10 +222,10 @@ func (sc *stateCheckpoint) SetCPUAssignments(a ContainerCPUAssignments) { } // Delete deletes assignment for specified pod -func (sc *stateCheckpoint) Delete(containerID string) { +func (sc *stateCheckpoint) Delete(podUID string, containerName string) { sc.mux.Lock() defer sc.mux.Unlock() - sc.cache.Delete(containerID) + sc.cache.Delete(podUID, containerName) sc.storeState() } diff --git a/pkg/kubelet/cm/cpumanager/state/state_checkpoint_test.go b/pkg/kubelet/cm/cpumanager/state/state_checkpoint_test.go index 2471ecdf341..34d40c43174 100644 --- a/pkg/kubelet/cm/cpumanager/state/state_checkpoint_test.go +++ b/pkg/kubelet/cm/cpumanager/state/state_checkpoint_test.go @@ -22,6 +22,7 @@ import ( "testing" "k8s.io/kubernetes/pkg/kubelet/checkpointmanager" + "k8s.io/kubernetes/pkg/kubelet/cm/cpumanager/containermap" testutil "k8s.io/kubernetes/pkg/kubelet/cm/cpumanager/state/testing" "k8s.io/kubernetes/pkg/kubelet/cm/cpuset" ) @@ -35,6 +36,7 @@ func TestCheckpointStateRestore(t *testing.T) { description string checkpointContent string policyName string + initialContainers containermap.ContainerMap expectedError string expectedState *stateMemory }{ @@ -42,6 +44,7 @@ func TestCheckpointStateRestore(t *testing.T) { "Restore non-existing checkpoint", "", "none", + containermap.ContainerMap{}, "", &stateMemory{}, }, @@ -51,9 +54,10 @@ func TestCheckpointStateRestore(t *testing.T) { "policyName": "none", "defaultCPUSet": "4-6", "entries": {}, - "checksum": 2912033808 + "checksum": 2655485041 }`, "none", + containermap.ContainerMap{}, "", &stateMemory{ defaultCPUSet: cpuset.NewCPUSet(4, 5, 6), @@ -65,17 +69,22 @@ func TestCheckpointStateRestore(t *testing.T) { "policyName": "none", "defaultCPUSet": "1-3", "entries": { - "container1": "4-6", - "container2": "1-3" + "pod": { + "container1": "4-6", + "container2": "1-3" + } }, - "checksum": 1535905563 + "checksum": 3415933391 }`, "none", + containermap.ContainerMap{}, "", &stateMemory{ assignments: ContainerCPUAssignments{ - "container1": cpuset.NewCPUSet(4, 5, 6), - "container2": cpuset.NewCPUSet(1, 2, 3), + "pod": map[string]cpuset.CPUSet{ + "container1": cpuset.NewCPUSet(4, 5, 6), + "container2": cpuset.NewCPUSet(1, 2, 3), + }, }, defaultCPUSet: cpuset.NewCPUSet(1, 2, 3), }, @@ -89,6 +98,7 @@ func TestCheckpointStateRestore(t *testing.T) { "checksum": 1337 }`, "none", + containermap.ContainerMap{}, "checkpoint is corrupted", &stateMemory{}, }, @@ -96,6 +106,7 @@ func TestCheckpointStateRestore(t *testing.T) { "Restore checkpoint with invalid JSON", `{`, "none", + containermap.ContainerMap{}, "unexpected end of JSON input", &stateMemory{}, }, @@ -105,9 +116,10 @@ func TestCheckpointStateRestore(t *testing.T) { "policyName": "other", "defaultCPUSet": "1-3", "entries": {}, - "checksum": 4195836012 + "checksum": 698611581 }`, "none", + containermap.ContainerMap{}, `configured policy "none" differs from state checkpoint policy "other"`, &stateMemory{}, }, @@ -117,9 +129,10 @@ func TestCheckpointStateRestore(t *testing.T) { "policyName": "none", "defaultCPUSet": "1.3", "entries": {}, - "checksum": 1025273327 + "checksum": 1966990140 }`, "none", + containermap.ContainerMap{}, `could not parse default cpu set "1.3": strconv.Atoi: parsing "1.3": invalid syntax`, &stateMemory{}, }, @@ -129,15 +142,47 @@ func TestCheckpointStateRestore(t *testing.T) { "policyName": "none", "defaultCPUSet": "1-3", "entries": { - "container1": "4-6", - "container2": "asd" + "pod": { + "container1": "4-6", + "container2": "asd" + } }, - "checksum": 2764213924 + "checksum": 3082925826 }`, "none", - `could not parse cpuset "asd" for container id "container2": strconv.Atoi: parsing "asd": invalid syntax`, + containermap.ContainerMap{}, + `could not parse cpuset "asd" for container "container2" in pod "pod": strconv.Atoi: parsing "asd": invalid syntax`, &stateMemory{}, }, + { + "Restore checkpoint with migration", + `{ + "policyName": "none", + "defaultCPUSet": "1-3", + "entries": { + "containerID1": "4-6", + "containerID2": "1-3" + }, + "checksum": 2832947348 + }`, + "none", + func() containermap.ContainerMap { + cm := containermap.NewContainerMap() + cm.Add("pod", "container1", "containerID1") + cm.Add("pod", "container2", "containerID2") + return cm + }(), + "", + &stateMemory{ + assignments: ContainerCPUAssignments{ + "pod": map[string]cpuset.CPUSet{ + "container1": cpuset.NewCPUSet(4, 5, 6), + "container2": cpuset.NewCPUSet(1, 2, 3), + }, + }, + defaultCPUSet: cpuset.NewCPUSet(1, 2, 3), + }, + }, } // create checkpoint manager for testing @@ -159,7 +204,7 @@ func TestCheckpointStateRestore(t *testing.T) { } } - restoredState, err := NewCheckpointState(testingDir, testingCheckpoint, tc.policyName) + restoredState, err := NewCheckpointState(testingDir, testingCheckpoint, tc.policyName, tc.initialContainers) if err != nil { if strings.TrimSpace(tc.expectedError) != "" { tc.expectedError = "could not restore state from checkpoint: " + tc.expectedError @@ -189,8 +234,10 @@ func TestCheckpointStateStore(t *testing.T) { { "Store assignments", &stateMemory{ - assignments: map[string]cpuset.CPUSet{ - "container1": cpuset.NewCPUSet(1, 5, 8), + assignments: map[string]map[string]cpuset.CPUSet{ + "pod": { + "container1": cpuset.NewCPUSet(1, 5, 8), + }, }, }, }, @@ -206,7 +253,7 @@ func TestCheckpointStateStore(t *testing.T) { // ensure there is no previous checkpoint cpm.RemoveCheckpoint(testingCheckpoint) - cs1, err := NewCheckpointState(testingDir, testingCheckpoint, "none") + cs1, err := NewCheckpointState(testingDir, testingCheckpoint, "none", nil) if err != nil { t.Fatalf("could not create testing checkpointState instance: %v", err) } @@ -216,7 +263,7 @@ func TestCheckpointStateStore(t *testing.T) { cs1.SetCPUAssignments(tc.expectedState.assignments) // restore checkpoint with previously stored values - cs2, err := NewCheckpointState(testingDir, testingCheckpoint, "none") + cs2, err := NewCheckpointState(testingDir, testingCheckpoint, "none", nil) if err != nil { t.Fatalf("could not create testing checkpointState instance: %v", err) } @@ -230,28 +277,34 @@ func TestCheckpointStateHelpers(t *testing.T) { testCases := []struct { description string defaultCPUset cpuset.CPUSet - containers map[string]cpuset.CPUSet + assignments map[string]map[string]cpuset.CPUSet }{ { description: "One container", defaultCPUset: cpuset.NewCPUSet(0, 1, 2, 3, 4, 5, 6, 7, 8), - containers: map[string]cpuset.CPUSet{ - "c1": cpuset.NewCPUSet(0, 1), + assignments: map[string]map[string]cpuset.CPUSet{ + "pod": { + "c1": cpuset.NewCPUSet(0, 1), + }, }, }, { description: "Two containers", defaultCPUset: cpuset.NewCPUSet(0, 1, 2, 3, 4, 5, 6, 7, 8), - containers: map[string]cpuset.CPUSet{ - "c1": cpuset.NewCPUSet(0, 1), - "c2": cpuset.NewCPUSet(2, 3, 4, 5), + assignments: map[string]map[string]cpuset.CPUSet{ + "pod": { + "c1": cpuset.NewCPUSet(0, 1), + "c2": cpuset.NewCPUSet(2, 3, 4, 5), + }, }, }, { description: "Container without assigned cpus", defaultCPUset: cpuset.NewCPUSet(0, 1, 2, 3, 4, 5, 6, 7, 8), - containers: map[string]cpuset.CPUSet{ - "c1": cpuset.NewCPUSet(), + assignments: map[string]map[string]cpuset.CPUSet{ + "pod": { + "c1": cpuset.NewCPUSet(), + }, }, }, } @@ -266,21 +319,23 @@ func TestCheckpointStateHelpers(t *testing.T) { // ensure there is no previous checkpoint cpm.RemoveCheckpoint(testingCheckpoint) - state, err := NewCheckpointState(testingDir, testingCheckpoint, "none") + state, err := NewCheckpointState(testingDir, testingCheckpoint, "none", nil) if err != nil { t.Fatalf("could not create testing checkpointState instance: %v", err) } state.SetDefaultCPUSet(tc.defaultCPUset) - for container, set := range tc.containers { - state.SetCPUSet(container, set) - if cpus, _ := state.GetCPUSet(container); !cpus.Equals(set) { - t.Fatalf("state inconsistent, got %q instead of %q", set, cpus) - } + for pod := range tc.assignments { + for container, set := range tc.assignments[pod] { + state.SetCPUSet(pod, container, set) + if cpus, _ := state.GetCPUSet(pod, container); !cpus.Equals(set) { + t.Fatalf("state inconsistent, got %q instead of %q", set, cpus) + } - state.Delete(container) - if _, ok := state.GetCPUSet(container); ok { - t.Fatal("deleted container still existing in state") + state.Delete(pod, container) + if _, ok := state.GetCPUSet(pod, container); ok { + t.Fatal("deleted container still existing in state") + } } } }) @@ -291,34 +346,38 @@ func TestCheckpointStateClear(t *testing.T) { testCases := []struct { description string defaultCPUset cpuset.CPUSet - containers map[string]cpuset.CPUSet + assignments map[string]map[string]cpuset.CPUSet }{ { "Valid state", cpuset.NewCPUSet(1, 5, 10), - map[string]cpuset.CPUSet{ - "container1": cpuset.NewCPUSet(1, 4), + map[string]map[string]cpuset.CPUSet{ + "pod": { + "container1": cpuset.NewCPUSet(1, 4), + }, }, }, } for _, tc := range testCases { t.Run(tc.description, func(t *testing.T) { - state, err := NewCheckpointState(testingDir, testingCheckpoint, "none") + state, err := NewCheckpointState(testingDir, testingCheckpoint, "none", nil) if err != nil { t.Fatalf("could not create testing checkpointState instance: %v", err) } state.SetDefaultCPUSet(tc.defaultCPUset) - state.SetCPUAssignments(tc.containers) + state.SetCPUAssignments(tc.assignments) state.ClearState() if !cpuset.NewCPUSet().Equals(state.GetDefaultCPUSet()) { t.Fatal("cleared state with non-empty default cpu set") } - for container := range tc.containers { - if _, ok := state.GetCPUSet(container); ok { - t.Fatalf("container %q with non-default cpu set in cleared state", container) + for pod := range tc.assignments { + for container := range tc.assignments[pod] { + if _, ok := state.GetCPUSet(pod, container); ok { + t.Fatalf("container %q in pod %q with non-default cpu set in cleared state", container, pod) + } } } }) diff --git a/pkg/kubelet/cm/cpumanager/state/state_compatibility_test.go b/pkg/kubelet/cm/cpumanager/state/state_compatibility_test.go index e15406f0914..a02bf82e4a5 100644 --- a/pkg/kubelet/cm/cpumanager/state/state_compatibility_test.go +++ b/pkg/kubelet/cm/cpumanager/state/state_compatibility_test.go @@ -30,8 +30,10 @@ const compatibilityTestingCheckpoint = "cpumanager_state_compatibility_test" var state = &stateMemory{ assignments: ContainerCPUAssignments{ - "container1": cpuset.NewCPUSet(4, 5, 6), - "container2": cpuset.NewCPUSet(1, 2, 3), + "pod": map[string]cpuset.CPUSet{ + "container1": cpuset.NewCPUSet(4, 5, 6), + "container2": cpuset.NewCPUSet(1, 2, 3), + }, }, defaultCPUSet: cpuset.NewCPUSet(1, 2, 3), } @@ -44,12 +46,12 @@ func TestFileToCheckpointCompatibility(t *testing.T) { // ensure testing state is removed after testing defer os.Remove(statePath) - fileState := NewFileState(statePath, "none") + fileState := NewFileState(statePath, "none", nil) fileState.SetDefaultCPUSet(state.defaultCPUSet) fileState.SetCPUAssignments(state.assignments) - restoredState, err := NewCheckpointState(testingDir, compatibilityTestingCheckpoint, "none") + restoredState, err := NewCheckpointState(testingDir, compatibilityTestingCheckpoint, "none", nil) if err != nil { t.Fatalf("could not restore file state: %v", err) } @@ -68,13 +70,13 @@ func TestCheckpointToFileCompatibility(t *testing.T) { // ensure testing checkpoint is removed after testing defer cpm.RemoveCheckpoint(compatibilityTestingCheckpoint) - checkpointState, err := NewCheckpointState(testingDir, compatibilityTestingCheckpoint, "none") + checkpointState, err := NewCheckpointState(testingDir, compatibilityTestingCheckpoint, "none", nil) require.NoError(t, err) checkpointState.SetDefaultCPUSet(state.defaultCPUSet) checkpointState.SetCPUAssignments(state.assignments) - restoredState := NewFileState(path.Join(testingDir, compatibilityTestingCheckpoint), "none") + restoredState := NewFileState(path.Join(testingDir, compatibilityTestingCheckpoint), "none", nil) AssertStateEqual(t, restoredState, state) } diff --git a/pkg/kubelet/cm/cpumanager/state/state_file.go b/pkg/kubelet/cm/cpumanager/state/state_file.go index 603467c1c03..f3b8e70470d 100644 --- a/pkg/kubelet/cm/cpumanager/state/state_file.go +++ b/pkg/kubelet/cm/cpumanager/state/state_file.go @@ -24,30 +24,39 @@ import ( "sync" "k8s.io/klog" + "k8s.io/kubernetes/pkg/kubelet/cm/cpumanager/containermap" "k8s.io/kubernetes/pkg/kubelet/cm/cpuset" ) -type stateFileData struct { +type stateFileDataV1 struct { PolicyName string `json:"policyName"` DefaultCPUSet string `json:"defaultCpuSet"` Entries map[string]string `json:"entries,omitempty"` } +type stateFileDataV2 struct { + PolicyName string `json:"policyName"` + DefaultCPUSet string `json:"defaultCpuSet"` + Entries map[string]map[string]string `json:"entries,omitempty"` +} + var _ State = &stateFile{} type stateFile struct { sync.RWMutex - stateFilePath string - policyName string - cache State + stateFilePath string + policyName string + cache State + initialContainers containermap.ContainerMap } // NewFileState creates new State for keeping track of cpu/pod assignment with file backend -func NewFileState(filePath string, policyName string) State { +func NewFileState(filePath string, policyName string, initialContainers containermap.ContainerMap) State { stateFile := &stateFile{ - stateFilePath: filePath, - cache: NewMemoryState(), - policyName: policyName, + stateFilePath: filePath, + cache: NewMemoryState(), + policyName: policyName, + initialContainers: initialContainers, } if err := stateFile.tryRestoreState(); err != nil { @@ -61,6 +70,30 @@ func NewFileState(filePath string, policyName string) State { return stateFile } +// migrateV1StateToV2State() converts state from the v1 format to the v2 format +func (sf *stateFile) migrateV1StateToV2State(src *stateFileDataV1, dst *stateFileDataV2) error { + if src.PolicyName != "" { + dst.PolicyName = src.PolicyName + } + if src.DefaultCPUSet != "" { + dst.DefaultCPUSet = src.DefaultCPUSet + } + for containerID, cset := range src.Entries { + podUID, containerName, err := sf.initialContainers.GetContainerRef(containerID) + if err != nil { + return fmt.Errorf("containerID '%v' not found in initial containers list", containerID) + } + if dst.Entries == nil { + dst.Entries = make(map[string]map[string]string) + } + if _, exists := dst.Entries[podUID]; !exists { + dst.Entries[podUID] = make(map[string]string) + } + dst.Entries[podUID][containerName] = cset + } + return nil +} + // tryRestoreState tries to read state file, upon any error, // err message is logged and state is left clean. un-initialized func (sf *stateFile) tryRestoreState() error { @@ -90,28 +123,40 @@ func (sf *stateFile) tryRestoreState() error { } // File exists; try to read it. - var readState stateFileData + var readStateV1 stateFileDataV1 + var readStateV2 stateFileDataV2 - if err = json.Unmarshal(content, &readState); err != nil { - klog.Errorf("[cpumanager] state file: could not unmarshal, corrupted state file - \"%s\"", sf.stateFilePath) - return err - } - - if sf.policyName != readState.PolicyName { - return fmt.Errorf("policy configured \"%s\" != policy from state file \"%s\"", sf.policyName, readState.PolicyName) - } - - if tmpDefaultCPUSet, err = cpuset.Parse(readState.DefaultCPUSet); err != nil { - klog.Errorf("[cpumanager] state file: could not parse state file - [defaultCpuSet:\"%s\"]", readState.DefaultCPUSet) - return err - } - - for containerID, cpuString := range readState.Entries { - if tmpContainerCPUSet, err = cpuset.Parse(cpuString); err != nil { - klog.Errorf("[cpumanager] state file: could not parse state file - container id: %s, cpuset: \"%s\"", containerID, cpuString) + if err = json.Unmarshal(content, &readStateV1); err != nil { + readStateV1 = stateFileDataV1{} // reset it back to 0 + if err = json.Unmarshal(content, &readStateV2); err != nil { + klog.Errorf("[cpumanager] state file: could not unmarshal, corrupted state file - \"%s\"", sf.stateFilePath) return err } - tmpAssignments[containerID] = tmpContainerCPUSet + } + + if err = sf.migrateV1StateToV2State(&readStateV1, &readStateV2); err != nil { + klog.Errorf("[cpumanager] state file: could not migrate v1 state to v2 state - \"%s\"", sf.stateFilePath) + return err + } + + if sf.policyName != readStateV2.PolicyName { + return fmt.Errorf("policy configured \"%s\" != policy from state file \"%s\"", sf.policyName, readStateV2.PolicyName) + } + + if tmpDefaultCPUSet, err = cpuset.Parse(readStateV2.DefaultCPUSet); err != nil { + klog.Errorf("[cpumanager] state file: could not parse state file - [defaultCpuSet:\"%s\"]", readStateV2.DefaultCPUSet) + return err + } + + for pod := range readStateV2.Entries { + tmpAssignments[pod] = make(map[string]cpuset.CPUSet) + for container, cpuString := range readStateV2.Entries[pod] { + if tmpContainerCPUSet, err = cpuset.Parse(cpuString); err != nil { + klog.Errorf("[cpumanager] state file: could not parse state file - pod: %s, container: %s, cpuset: \"%s\"", pod, container, cpuString) + return err + } + tmpAssignments[pod][container] = tmpContainerCPUSet + } } sf.cache.SetDefaultCPUSet(tmpDefaultCPUSet) @@ -128,14 +173,18 @@ func (sf *stateFile) storeState() { var content []byte var err error - data := stateFileData{ + data := stateFileDataV2{ PolicyName: sf.policyName, DefaultCPUSet: sf.cache.GetDefaultCPUSet().String(), - Entries: map[string]string{}, + Entries: map[string]map[string]string{}, } - for containerID, cset := range sf.cache.GetCPUAssignments() { - data.Entries[containerID] = cset.String() + assignments := sf.cache.GetCPUAssignments() + for pod := range assignments { + data.Entries[pod] = map[string]string{} + for container, cset := range assignments[pod] { + data.Entries[pod][container] = cset.String() + } } if content, err = json.Marshal(data); err != nil { @@ -147,11 +196,11 @@ func (sf *stateFile) storeState() { } } -func (sf *stateFile) GetCPUSet(containerID string) (cpuset.CPUSet, bool) { +func (sf *stateFile) GetCPUSet(podUID string, containerName string) (cpuset.CPUSet, bool) { sf.RLock() defer sf.RUnlock() - res, ok := sf.cache.GetCPUSet(containerID) + res, ok := sf.cache.GetCPUSet(podUID, containerName) return res, ok } @@ -162,11 +211,11 @@ func (sf *stateFile) GetDefaultCPUSet() cpuset.CPUSet { return sf.cache.GetDefaultCPUSet() } -func (sf *stateFile) GetCPUSetOrDefault(containerID string) cpuset.CPUSet { +func (sf *stateFile) GetCPUSetOrDefault(podUID string, containerName string) cpuset.CPUSet { sf.RLock() defer sf.RUnlock() - return sf.cache.GetCPUSetOrDefault(containerID) + return sf.cache.GetCPUSetOrDefault(podUID, containerName) } func (sf *stateFile) GetCPUAssignments() ContainerCPUAssignments { @@ -175,10 +224,10 @@ func (sf *stateFile) GetCPUAssignments() ContainerCPUAssignments { return sf.cache.GetCPUAssignments() } -func (sf *stateFile) SetCPUSet(containerID string, cset cpuset.CPUSet) { +func (sf *stateFile) SetCPUSet(podUID string, containerName string, cset cpuset.CPUSet) { sf.Lock() defer sf.Unlock() - sf.cache.SetCPUSet(containerID, cset) + sf.cache.SetCPUSet(podUID, containerName, cset) sf.storeState() } @@ -196,10 +245,10 @@ func (sf *stateFile) SetCPUAssignments(a ContainerCPUAssignments) { sf.storeState() } -func (sf *stateFile) Delete(containerID string) { +func (sf *stateFile) Delete(podUID string, containerName string) { sf.Lock() defer sf.Unlock() - sf.cache.Delete(containerID) + sf.cache.Delete(podUID, containerName) sf.storeState() } diff --git a/pkg/kubelet/cm/cpumanager/state/state_file_test.go b/pkg/kubelet/cm/cpumanager/state/state_file_test.go index fb83eac5966..a928e9529a9 100644 --- a/pkg/kubelet/cm/cpumanager/state/state_file_test.go +++ b/pkg/kubelet/cm/cpumanager/state/state_file_test.go @@ -27,6 +27,7 @@ import ( "strings" "testing" + "k8s.io/kubernetes/pkg/kubelet/cm/cpumanager/containermap" "k8s.io/kubernetes/pkg/kubelet/cm/cpuset" ) @@ -70,17 +71,19 @@ func stderrCapture(t *testing.T, f func() State) (bytes.Buffer, State) { func TestFileStateTryRestore(t *testing.T) { testCases := []struct { - description string - stateFileContent string - policyName string - expErr string - expPanic bool - expectedState *stateMemory + description string + stateFileContent string + policyName string + initialContainers containermap.ContainerMap + expErr string + expPanic bool + expectedState *stateMemory }{ { "Invalid JSON - one byte file", "\n", "none", + containermap.ContainerMap{}, "[cpumanager] state file: unable to restore state from disk (unexpected end of JSON input)", true, &stateMemory{}, @@ -89,6 +92,7 @@ func TestFileStateTryRestore(t *testing.T) { "Invalid JSON - invalid content", "{", "none", + containermap.ContainerMap{}, "[cpumanager] state file: unable to restore state from disk (unexpected end of JSON input)", true, &stateMemory{}, @@ -97,6 +101,7 @@ func TestFileStateTryRestore(t *testing.T) { "Try restore defaultCPUSet only", `{"policyName": "none", "defaultCpuSet": "4-6"}`, "none", + containermap.ContainerMap{}, "", false, &stateMemory{ @@ -108,6 +113,7 @@ func TestFileStateTryRestore(t *testing.T) { "Try restore defaultCPUSet only - invalid name", `{"policyName": "none", "defaultCpuSet" "4-6"}`, "none", + containermap.ContainerMap{}, `[cpumanager] state file: unable to restore state from disk (invalid character '"' after object key)`, true, &stateMemory{}, @@ -117,17 +123,22 @@ func TestFileStateTryRestore(t *testing.T) { `{ "policyName": "none", "entries": { - "container1": "4-6", - "container2": "1-3" + "pod": { + "container1": "4-6", + "container2": "1-3" + } } }`, "none", + containermap.ContainerMap{}, "", false, &stateMemory{ assignments: ContainerCPUAssignments{ - "container1": cpuset.NewCPUSet(4, 5, 6), - "container2": cpuset.NewCPUSet(1, 2, 3), + "pod": map[string]cpuset.CPUSet{ + "container1": cpuset.NewCPUSet(4, 5, 6), + "container2": cpuset.NewCPUSet(1, 2, 3), + }, }, defaultCPUSet: cpuset.NewCPUSet(), }, @@ -140,6 +151,7 @@ func TestFileStateTryRestore(t *testing.T) { "entries": {} }`, "B", + containermap.ContainerMap{}, `[cpumanager] state file: unable to restore state from disk (policy configured "B" != policy from state file "A")`, true, &stateMemory{}, @@ -148,6 +160,7 @@ func TestFileStateTryRestore(t *testing.T) { "Try restore invalid assignments", `{"entries": }`, "none", + containermap.ContainerMap{}, "[cpumanager] state file: unable to restore state from disk (invalid character '}' looking for beginning of value)", true, &stateMemory{}, @@ -158,17 +171,22 @@ func TestFileStateTryRestore(t *testing.T) { "policyName": "none", "defaultCpuSet": "23-24", "entries": { - "container1": "4-6", - "container2": "1-3" + "pod": { + "container1": "4-6", + "container2": "1-3" + } } }`, "none", + containermap.ContainerMap{}, "", false, &stateMemory{ assignments: ContainerCPUAssignments{ - "container1": cpuset.NewCPUSet(4, 5, 6), - "container2": cpuset.NewCPUSet(1, 2, 3), + "pod": map[string]cpuset.CPUSet{ + "container1": cpuset.NewCPUSet(4, 5, 6), + "container2": cpuset.NewCPUSet(1, 2, 3), + }, }, defaultCPUSet: cpuset.NewCPUSet(23, 24), }, @@ -180,6 +198,7 @@ func TestFileStateTryRestore(t *testing.T) { "defaultCpuSet": "2-sd" }`, "none", + containermap.ContainerMap{}, `[cpumanager] state file: unable to restore state from disk (strconv.Atoi: parsing "sd": invalid syntax)`, true, &stateMemory{}, @@ -190,11 +209,14 @@ func TestFileStateTryRestore(t *testing.T) { "policyName": "none", "defaultCpuSet": "23-24", "entries": { - "container1": "p-6", - "container2": "1-3" + "pod": { + "container1": "p-6", + "container2": "1-3" + } } }`, "none", + containermap.ContainerMap{}, `[cpumanager] state file: unable to restore state from disk (strconv.Atoi: parsing "p": invalid syntax)`, true, &stateMemory{}, @@ -203,6 +225,7 @@ func TestFileStateTryRestore(t *testing.T) { "tryRestoreState creates empty state file", "", "none", + containermap.ContainerMap{}, "", false, &stateMemory{ @@ -210,6 +233,35 @@ func TestFileStateTryRestore(t *testing.T) { defaultCPUSet: cpuset.NewCPUSet(), }, }, + { + "Try restore with migration", + `{ + "policyName": "none", + "defaultCpuSet": "23-24", + "entries": { + "containerID1": "4-6", + "containerID2": "1-3" + } + }`, + "none", + func() containermap.ContainerMap { + cm := containermap.NewContainerMap() + cm.Add("pod", "container1", "containerID1") + cm.Add("pod", "container2", "containerID2") + return cm + }(), + "", + false, + &stateMemory{ + assignments: ContainerCPUAssignments{ + "pod": map[string]cpuset.CPUSet{ + "container1": cpuset.NewCPUSet(4, 5, 6), + "container2": cpuset.NewCPUSet(1, 2, 3), + }, + }, + defaultCPUSet: cpuset.NewCPUSet(23, 24), + }, + }, } for idx, tc := range testCases { @@ -239,7 +291,7 @@ func TestFileStateTryRestore(t *testing.T) { defer os.Remove(sfilePath.Name()) logData, fileState := stderrCapture(t, func() State { - return NewFileState(sfilePath.Name(), tc.policyName) + return NewFileState(sfilePath.Name(), tc.policyName, tc.initialContainers) }) if tc.expErr != "" { @@ -284,7 +336,7 @@ func TestFileStateTryRestorePanic(t *testing.T) { } } }() - NewFileState(sfilePath, "static") + NewFileState(sfilePath, "static", nil) }) } @@ -315,8 +367,10 @@ func TestUpdateStateFile(t *testing.T) { "", &stateMemory{ assignments: ContainerCPUAssignments{ - "container1": cpuset.NewCPUSet(4, 5, 6), - "container2": cpuset.NewCPUSet(1, 2, 3), + "pod": map[string]cpuset.CPUSet{ + "container1": cpuset.NewCPUSet(4, 5, 6), + "container2": cpuset.NewCPUSet(1, 2, 3), + }, }, defaultCPUSet: cpuset.NewCPUSet(), }, @@ -363,7 +417,7 @@ func TestUpdateStateFile(t *testing.T) { return } } - newFileState := NewFileState(sfilePath.Name(), "static") + newFileState := NewFileState(sfilePath.Name(), "static", nil) AssertStateEqual(t, newFileState, tc.expectedState) }) } @@ -373,35 +427,43 @@ func TestHelpersStateFile(t *testing.T) { testCases := []struct { description string defaultCPUset cpuset.CPUSet - containers map[string]cpuset.CPUSet + assignments map[string]map[string]cpuset.CPUSet }{ { description: "one container", defaultCPUset: cpuset.NewCPUSet(0, 1, 2, 3, 4, 5, 6, 7, 8), - containers: map[string]cpuset.CPUSet{ - "c1": cpuset.NewCPUSet(0, 1), + assignments: map[string]map[string]cpuset.CPUSet{ + "pod": { + "c1": cpuset.NewCPUSet(0, 1), + }, }, }, { description: "two containers", defaultCPUset: cpuset.NewCPUSet(0, 1, 2, 3, 4, 5, 6, 7, 8), - containers: map[string]cpuset.CPUSet{ - "c1": cpuset.NewCPUSet(0, 1), - "c2": cpuset.NewCPUSet(2, 3, 4, 5), + assignments: map[string]map[string]cpuset.CPUSet{ + "pod": { + "c1": cpuset.NewCPUSet(0, 1), + "c2": cpuset.NewCPUSet(2, 3, 4, 5), + }, }, }, { description: "container with more cpus than is possible", defaultCPUset: cpuset.NewCPUSet(0, 1, 2, 3, 4, 5, 6, 7, 8), - containers: map[string]cpuset.CPUSet{ - "c1": cpuset.NewCPUSet(0, 10), + assignments: map[string]map[string]cpuset.CPUSet{ + "pod": { + "c1": cpuset.NewCPUSet(0, 10), + }, }, }, { description: "container without assigned cpus", defaultCPUset: cpuset.NewCPUSet(0, 1, 2, 3, 4, 5, 6, 7, 8), - containers: map[string]cpuset.CPUSet{ - "c1": cpuset.NewCPUSet(), + assignments: map[string]map[string]cpuset.CPUSet{ + "pod": { + "c1": cpuset.NewCPUSet(), + }, }, }, } @@ -414,19 +476,21 @@ func TestHelpersStateFile(t *testing.T) { t.Errorf("cannot create temporary test file: %q", err.Error()) } - state := NewFileState(sfFile.Name(), "static") + state := NewFileState(sfFile.Name(), "static", nil) state.SetDefaultCPUSet(tc.defaultCPUset) - for containerName, containerCPUs := range tc.containers { - state.SetCPUSet(containerName, containerCPUs) - if cpus, _ := state.GetCPUSet(containerName); !cpus.Equals(containerCPUs) { - t.Errorf("state is inconsistent. Wants = %q Have = %q", containerCPUs, cpus) - } - state.Delete(containerName) - if cpus := state.GetCPUSetOrDefault(containerName); !cpus.Equals(tc.defaultCPUset) { - t.Error("deleted container still existing in state") - } + for podUID := range tc.assignments { + for containerName, containerCPUs := range tc.assignments[podUID] { + state.SetCPUSet(podUID, containerName, containerCPUs) + if cpus, _ := state.GetCPUSet(podUID, containerName); !cpus.Equals(containerCPUs) { + t.Errorf("state is inconsistent. Wants = %q Have = %q", containerCPUs, cpus) + } + state.Delete(podUID, containerName) + if cpus := state.GetCPUSetOrDefault(podUID, containerName); !cpus.Equals(tc.defaultCPUset) { + t.Error("deleted container still existing in state") + } + } } }) @@ -437,15 +501,17 @@ func TestClearStateStateFile(t *testing.T) { testCases := []struct { description string defaultCPUset cpuset.CPUSet - containers map[string]cpuset.CPUSet + assignments map[string]map[string]cpuset.CPUSet }{ { description: "valid file", defaultCPUset: cpuset.NewCPUSet(0, 1, 2, 3, 4, 5, 6, 7, 8), - containers: map[string]cpuset.CPUSet{ - "c1": cpuset.NewCPUSet(0, 1), - "c2": cpuset.NewCPUSet(2, 3), - "c3": cpuset.NewCPUSet(4, 5), + assignments: map[string]map[string]cpuset.CPUSet{ + "pod": { + "c1": cpuset.NewCPUSet(0, 1), + "c2": cpuset.NewCPUSet(2, 3), + "c3": cpuset.NewCPUSet(4, 5), + }, }, }, } @@ -457,19 +523,23 @@ func TestClearStateStateFile(t *testing.T) { t.Errorf("cannot create temporary test file: %q", err.Error()) } - state := NewFileState(sfFile.Name(), "static") + state := NewFileState(sfFile.Name(), "static", nil) state.SetDefaultCPUSet(testCase.defaultCPUset) - for containerName, containerCPUs := range testCase.containers { - state.SetCPUSet(containerName, containerCPUs) + for podUID := range testCase.assignments { + for containerName, containerCPUs := range testCase.assignments[podUID] { + state.SetCPUSet(podUID, containerName, containerCPUs) + } } state.ClearState() if !cpuset.NewCPUSet().Equals(state.GetDefaultCPUSet()) { t.Error("cleared state shouldn't has got information about available cpuset") } - for containerName := range testCase.containers { - if !cpuset.NewCPUSet().Equals(state.GetCPUSetOrDefault(containerName)) { - t.Error("cleared state shouldn't has got information about containers") + for podUID := range testCase.assignments { + for containerName := range testCase.assignments[podUID] { + if !cpuset.NewCPUSet().Equals(state.GetCPUSetOrDefault(podUID, containerName)) { + t.Error("cleared state shouldn't has got information about containers") + } } } }) diff --git a/pkg/kubelet/cm/cpumanager/state/state_mem.go b/pkg/kubelet/cm/cpumanager/state/state_mem.go index 77c5f4a525c..25ab6889bc8 100644 --- a/pkg/kubelet/cm/cpumanager/state/state_mem.go +++ b/pkg/kubelet/cm/cpumanager/state/state_mem.go @@ -40,11 +40,11 @@ func NewMemoryState() State { } } -func (s *stateMemory) GetCPUSet(containerID string) (cpuset.CPUSet, bool) { +func (s *stateMemory) GetCPUSet(podUID string, containerName string) (cpuset.CPUSet, bool) { s.RLock() defer s.RUnlock() - res, ok := s.assignments[containerID] + res, ok := s.assignments[podUID][containerName] return res.Clone(), ok } @@ -55,8 +55,8 @@ func (s *stateMemory) GetDefaultCPUSet() cpuset.CPUSet { return s.defaultCPUSet.Clone() } -func (s *stateMemory) GetCPUSetOrDefault(containerID string) cpuset.CPUSet { - if res, ok := s.GetCPUSet(containerID); ok { +func (s *stateMemory) GetCPUSetOrDefault(podUID string, containerName string) cpuset.CPUSet { + if res, ok := s.GetCPUSet(podUID, containerName); ok { return res } return s.GetDefaultCPUSet() @@ -68,12 +68,16 @@ func (s *stateMemory) GetCPUAssignments() ContainerCPUAssignments { return s.assignments.Clone() } -func (s *stateMemory) SetCPUSet(containerID string, cset cpuset.CPUSet) { +func (s *stateMemory) SetCPUSet(podUID string, containerName string, cset cpuset.CPUSet) { s.Lock() defer s.Unlock() - s.assignments[containerID] = cset - klog.Infof("[cpumanager] updated desired cpuset (container id: %s, cpuset: \"%s\")", containerID, cset) + if _, ok := s.assignments[podUID]; !ok { + s.assignments[podUID] = make(map[string]cpuset.CPUSet) + } + + s.assignments[podUID][containerName] = cset + klog.Infof("[cpumanager] updated desired cpuset (pod: %s, container: %s, cpuset: \"%s\")", podUID, containerName, cset) } func (s *stateMemory) SetDefaultCPUSet(cset cpuset.CPUSet) { @@ -92,12 +96,15 @@ func (s *stateMemory) SetCPUAssignments(a ContainerCPUAssignments) { klog.Infof("[cpumanager] updated cpuset assignments: \"%v\"", a) } -func (s *stateMemory) Delete(containerID string) { +func (s *stateMemory) Delete(podUID string, containerName string) { s.Lock() defer s.Unlock() - delete(s.assignments, containerID) - klog.V(2).Infof("[cpumanager] deleted cpuset assignment (container id: %s)", containerID) + delete(s.assignments[podUID], containerName) + if len(s.assignments[podUID]) == 0 { + delete(s.assignments, podUID) + } + klog.V(2).Infof("[cpumanager] deleted cpuset assignment (pod: %s, container: %s)", podUID, containerName) } func (s *stateMemory) ClearState() {