From 69f8053850443217ba0f0d470c816b0b728f2b9d Mon Sep 17 00:00:00 2001 From: Kevin Klues Date: Tue, 5 Nov 2019 20:25:12 +0000 Subject: [PATCH] Update top-level CPUManager to adhere to new state semantics For now, we just pass 'nil' as the set of 'initialContainers' for migrating from old state semantics to new ones. In a subsequent commit, we will pull this information from higher layers so that we can pass it down at this stage properly. --- pkg/kubelet/cm/cpumanager/cpu_manager.go | 69 +++++----- pkg/kubelet/cm/cpumanager/cpu_manager_test.go | 123 +++++++++++------- .../cm/cpumanager/topology_hints_test.go | 20 ++- test/e2e_node/cpu_manager_test.go | 2 +- 4 files changed, 120 insertions(+), 94 deletions(-) diff --git a/pkg/kubelet/cm/cpumanager/cpu_manager.go b/pkg/kubelet/cm/cpumanager/cpu_manager.go index ee527a912ea..16ece4d8f64 100644 --- a/pkg/kubelet/cm/cpumanager/cpu_manager.go +++ b/pkg/kubelet/cm/cpumanager/cpu_manager.go @@ -158,7 +158,7 @@ func NewManager(cpuPolicyName string, reconcilePeriod time.Duration, machineInfo return nil, fmt.Errorf("unknown policy: \"%s\"", cpuPolicyName) } - stateImpl, err := state.NewCheckpointState(stateFileDirectory, cpuManagerStateFileName, policy.Name()) + stateImpl, err := state.NewCheckpointState(stateFileDirectory, cpuManagerStateFileName, policy.Name(), nil) if err != nil { return nil, fmt.Errorf("could not initialize checkpoint manager: %v", err) } @@ -209,7 +209,7 @@ func (m *manager) AddContainer(p *v1.Pod, c *v1.Container, containerID string) e m.Unlock() return err } - cpus := m.state.GetCPUSetOrDefault(containerID) + cpus := m.state.GetCPUSetOrDefault(string(p.UID), c.Name) m.Unlock() if !cpus.IsEmpty() { @@ -238,11 +238,12 @@ func (m *manager) RemoveContainer(containerID string) error { klog.Errorf("[cpumanager] RemoveContainer error: %v", err) return err } + return nil } func (m *manager) policyAddContainer(p *v1.Pod, c *v1.Container, containerID string) error { - err := m.policy.AddContainer(m.state, p, c, containerID) + err := m.policy.AddContainer(m.state, p, c) if err == nil { m.containerMap.Add(string(p.UID), c.Name, containerID) } @@ -250,7 +251,12 @@ func (m *manager) policyAddContainer(p *v1.Pod, c *v1.Container, containerID str } func (m *manager) policyRemoveContainerByID(containerID string) error { - err := m.policy.RemoveContainer(m.state, containerID) + podUID, containerName, err := m.containerMap.GetContainerRef(containerID) + if err != nil { + return nil + } + + err = m.policy.RemoveContainer(m.state, podUID, containerName) if err == nil { m.containerMap.RemoveByContainerID(containerID) } @@ -259,14 +265,9 @@ } func (m *manager) policyRemoveContainerByRef(podUID string, containerName string) error { - containerID, err := m.containerMap.GetContainerID(podUID, containerName) - if err != nil { - return nil - } - - err = m.policy.RemoveContainer(m.state, containerID) + err := m.policy.RemoveContainer(m.state, podUID, containerName) if err == nil { - m.containerMap.RemoveByContainerID(containerID) + m.containerMap.RemoveByContainerRef(podUID, containerName) } return err @@ -304,43 +305,35 @@ func (m *manager) removeStaleState() { m.Lock() defer m.Unlock() - // We remove stale state very conservatively, only removing *any* state - // once we know for sure that we wont be accidentally removing state that - // is still valid. Since this function is called periodically, we will just - // try again next time this function is called.
+ // Get the list of active pods. activePods := m.activePods() if len(activePods) == 0 { // If there are no active pods, skip the removal of stale state. + // Since this function is called periodically, we will just try again + // next time this function is called. return } - // Build a list of containerIDs for all containers in all active Pods. - activeContainers := make(map[string]struct{}) + // Build a list of (podUID, containerName) pairs for all containers in all active Pods. + activeContainers := make(map[string]map[string]struct{}) for _, pod := range activePods { - pstatus, ok := m.podStatusProvider.GetPodStatus(pod.UID) - if !ok { - // If even one pod does not have it's status set, skip state removal. - return - } + activeContainers[string(pod.UID)] = make(map[string]struct{}) for _, container := range append(pod.Spec.InitContainers, pod.Spec.Containers...) { - containerID, err := findContainerIDByName(&pstatus, container.Name) - if err != nil { - // If even one container does not have it's containerID set, skip state removal. - return - } - activeContainers[containerID] = struct{}{} + activeContainers[string(pod.UID)][container.Name] = struct{}{} } } // Loop through the CPUManager state. Remove any state for containers not - // in the `activeContainers` list built above. The shortcircuits in place - // above ensure that no erroneous state will ever be removed. - for containerID := range m.state.GetCPUAssignments() { - if _, ok := activeContainers[containerID]; !ok { - klog.Errorf("[cpumanager] removeStaleState: removing container: %s)", containerID) - err := m.policy.RemoveContainer(m.state, containerID) - if err != nil { - klog.Errorf("[cpumanager] removeStaleState: failed to remove container %s, error: %v)", containerID, err) + // in the `activeContainers` list built above. + assignments := m.state.GetCPUAssignments() + for podUID := range assignments { + for containerName := range assignments[podUID] { + if _, ok := activeContainers[podUID][containerName]; !ok { + klog.Errorf("[cpumanager] removeStaleState: removing (pod %s, container: %s)", podUID, containerName) + err := m.policyRemoveContainerByRef(podUID, containerName) + if err != nil { + klog.Errorf("[cpumanager] removeStaleState: failed to remove (pod %s, container %s), error: %v)", podUID, containerName, err) + } } } } @@ -373,7 +366,7 @@ func (m *manager) reconcileState() (success []reconciledContainer, failure []rec // - policy does not want to track the container // - kubelet has just been restarted - and there is no previous state file // - container has been removed from state by RemoveContainer call (DeletionTimestamp is set) - if _, ok := m.state.GetCPUSet(containerID); !ok { + if _, ok := m.state.GetCPUSet(string(pod.UID), container.Name); !ok { if status.Phase == v1.PodRunning && pod.DeletionTimestamp == nil { klog.V(4).Infof("[cpumanager] reconcileState: container is not present in state - trying to add (pod: %s, container: %s, container id: %s)", pod.Name, container.Name, containerID) err := m.AddContainer(pod, &container, containerID) @@ -389,7 +382,7 @@ func (m *manager) reconcileState() (success []reconciledContainer, failure []rec } } - cset := m.state.GetCPUSetOrDefault(containerID) + cset := m.state.GetCPUSetOrDefault(string(pod.UID), container.Name) if cset.IsEmpty() { // NOTE: This should not happen outside of tests. 
klog.Infof("[cpumanager] reconcileState: skipping container; assigned cpuset is empty (pod: %s, container: %s)", pod.Name, container.Name) diff --git a/pkg/kubelet/cm/cpumanager/cpu_manager_test.go b/pkg/kubelet/cm/cpumanager/cpu_manager_test.go index d51fc805b74..c1c494583dc 100644 --- a/pkg/kubelet/cm/cpumanager/cpu_manager_test.go +++ b/pkg/kubelet/cm/cpumanager/cpu_manager_test.go @@ -45,8 +45,8 @@ type mockState struct { defaultCPUSet cpuset.CPUSet } -func (s *mockState) GetCPUSet(containerID string) (cpuset.CPUSet, bool) { - res, ok := s.assignments[containerID] +func (s *mockState) GetCPUSet(podUID string, containerName string) (cpuset.CPUSet, bool) { + res, ok := s.assignments[podUID][containerName] return res.Clone(), ok } @@ -54,23 +54,29 @@ func (s *mockState) GetDefaultCPUSet() cpuset.CPUSet { return s.defaultCPUSet.Clone() } -func (s *mockState) GetCPUSetOrDefault(containerID string) cpuset.CPUSet { - if res, ok := s.GetCPUSet(containerID); ok { +func (s *mockState) GetCPUSetOrDefault(podUID string, containerName string) cpuset.CPUSet { + if res, ok := s.GetCPUSet(podUID, containerName); ok { return res } return s.GetDefaultCPUSet() } -func (s *mockState) SetCPUSet(containerID string, cset cpuset.CPUSet) { - s.assignments[containerID] = cset +func (s *mockState) SetCPUSet(podUID string, containerName string, cset cpuset.CPUSet) { + if _, exists := s.assignments[podUID]; !exists { + s.assignments[podUID] = make(map[string]cpuset.CPUSet) + } + s.assignments[podUID][containerName] = cset } func (s *mockState) SetDefaultCPUSet(cset cpuset.CPUSet) { s.defaultCPUSet = cset } -func (s *mockState) Delete(containerID string) { - delete(s.assignments, containerID) +func (s *mockState) Delete(podUID string, containerName string) { + delete(s.assignments[podUID], containerName) + if len(s.assignments[podUID]) == 0 { + delete(s.assignments, podUID) + } } func (s *mockState) ClearState() { @@ -97,11 +103,11 @@ func (p *mockPolicy) Name() string { func (p *mockPolicy) Start(s state.State) { } -func (p *mockPolicy) AddContainer(s state.State, pod *v1.Pod, container *v1.Container, containerID string) error { +func (p *mockPolicy) AddContainer(s state.State, pod *v1.Pod, container *v1.Container) error { return p.err } -func (p *mockPolicy) RemoveContainer(s state.State, containerID string) error { +func (p *mockPolicy) RemoveContainer(s state.State, podUID string, containerName string) error { return p.err } @@ -154,6 +160,10 @@ func makePod(podUID, containerName, cpuRequest, cpuLimit string) *v1.Pod { func makeMultiContainerPod(initCPUs, appCPUs []struct{ request, limit string }) *v1.Pod { pod := &v1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: "pod", + UID: "podUID", + }, Spec: v1.PodSpec{ InitContainers: []v1.Container{}, Containers: []v1.Container{}, @@ -489,15 +499,15 @@ func TestCPUManagerAddWithInitContainers(t *testing.T) { testCase.description, containerIDs[i], err) } - cset, found := state.assignments[containerIDs[i]] + cset, found := state.assignments[string(testCase.pod.UID)][containers[i].Name] if !expCSets[i].IsEmpty() && !found { - t.Errorf("StaticPolicy AddContainer() error (%v). expected container id %v to be present in assignments %v", - testCase.description, containerIDs[i], state.assignments) + t.Errorf("StaticPolicy AddContainer() error (%v). expected container %v to be present in assignments %v", + testCase.description, containers[i].Name, state.assignments) } if found && !cset.Equals(expCSets[i]) { t.Errorf("StaticPolicy AddContainer() error (%v). 
expected cpuset %v for container %v but got %v", - testCase.description, expCSets[i], containerIDs[i], cset) + testCase.description, expCSets[i], containers[i].Name, cset) } } } @@ -616,6 +626,9 @@ func TestCPUManagerGenerate(t *testing.T) { } func TestCPUManagerRemove(t *testing.T) { + containerID := "fakeID" + containerMap := containermap.NewContainerMap() + mgr := &manager{ policy: &mockPolicy{ err: nil, @@ -625,12 +638,13 @@ func TestCPUManagerRemove(t *testing.T) { defaultCPUSet: cpuset.NewCPUSet(), }, containerRuntime: mockRuntimeService{}, - containerMap: containermap.NewContainerMap(), + containerMap: containerMap, activePods: func() []*v1.Pod { return nil }, podStatusProvider: mockPodStatusProvider{}, } - err := mgr.RemoveContainer("fakeID") + containerMap.Add("", "", containerID) + err := mgr.RemoveContainer(containerID) if err != nil { t.Errorf("CPU Manager RemoveContainer() error. expected error to be nil but got: %v", err) } @@ -641,12 +655,13 @@ func TestCPUManagerRemove(t *testing.T) { }, state: state.NewMemoryState(), containerRuntime: mockRuntimeService{}, - containerMap: containermap.NewContainerMap(), + containerMap: containerMap, activePods: func() []*v1.Pod { return nil }, podStatusProvider: mockPodStatusProvider{}, } - err = mgr.RemoveContainer("fakeID") + containerMap.Add("", "", containerID) + err = mgr.RemoveContainer(containerID) if !reflect.DeepEqual(err, fmt.Errorf("fake error")) { t.Errorf("CPU Manager RemoveContainer() error. expected error: fake error but got: %v", err) } @@ -670,12 +685,12 @@ func TestReconcileState(t *testing.T) { { ObjectMeta: metav1.ObjectMeta{ Name: "fakePodName", - UID: "fakeUID", + UID: "fakePodUID", }, Spec: v1.PodSpec{ Containers: []v1.Container{ { - Name: "fakeName", + Name: "fakeContainerName", }, }, }, @@ -684,18 +699,20 @@ func TestReconcileState(t *testing.T) { pspPS: v1.PodStatus{ ContainerStatuses: []v1.ContainerStatus{ { - Name: "fakeName", - ContainerID: "docker://fakeID", + Name: "fakeContainerName", + ContainerID: "docker://fakeContainerID", }, }, }, pspFound: true, stAssignments: state.ContainerCPUAssignments{ - "fakeID": cpuset.NewCPUSet(1, 2), + "fakePodUID": map[string]cpuset.CPUSet{ + "fakeContainerName": cpuset.NewCPUSet(1, 2), + }, }, stDefaultCPUSet: cpuset.NewCPUSet(3, 4, 5, 6, 7), updateErr: nil, - expectSucceededContainerName: "fakeName", + expectSucceededContainerName: "fakeContainerName", expectFailedContainerName: "", }, { @@ -704,12 +721,12 @@ func TestReconcileState(t *testing.T) { { ObjectMeta: metav1.ObjectMeta{ Name: "fakePodName", - UID: "fakeUID", + UID: "fakePodUID", }, Spec: v1.PodSpec{ InitContainers: []v1.Container{ { - Name: "fakeName", + Name: "fakeContainerName", }, }, }, @@ -718,18 +735,20 @@ func TestReconcileState(t *testing.T) { pspPS: v1.PodStatus{ InitContainerStatuses: []v1.ContainerStatus{ { - Name: "fakeName", - ContainerID: "docker://fakeID", + Name: "fakeContainerName", + ContainerID: "docker://fakeContainerID", }, }, }, pspFound: true, stAssignments: state.ContainerCPUAssignments{ - "fakeID": cpuset.NewCPUSet(1, 2), + "fakePodUID": map[string]cpuset.CPUSet{ + "fakeContainerName": cpuset.NewCPUSet(1, 2), + }, }, stDefaultCPUSet: cpuset.NewCPUSet(3, 4, 5, 6, 7), updateErr: nil, - expectSucceededContainerName: "fakeName", + expectSucceededContainerName: "fakeContainerName", expectFailedContainerName: "", }, { @@ -738,12 +757,12 @@ func TestReconcileState(t *testing.T) { { ObjectMeta: metav1.ObjectMeta{ Name: "fakePodName", - UID: "fakeUID", + UID: "fakePodUID", }, Spec: v1.PodSpec{ 
Containers: []v1.Container{ { - Name: "fakeName", + Name: "fakeContainerName", }, }, }, @@ -755,7 +774,7 @@ func TestReconcileState(t *testing.T) { stDefaultCPUSet: cpuset.NewCPUSet(), updateErr: nil, expectSucceededContainerName: "", - expectFailedContainerName: "fakeName", + expectFailedContainerName: "fakeContainerName", }, { description: "cpu manager reconclie - container id not found", @@ -763,12 +782,12 @@ func TestReconcileState(t *testing.T) { { ObjectMeta: metav1.ObjectMeta{ Name: "fakePodName", - UID: "fakeUID", + UID: "fakePodUID", }, Spec: v1.PodSpec{ Containers: []v1.Container{ { - Name: "fakeName", + Name: "fakeContainerName", }, }, }, @@ -777,8 +796,8 @@ func TestReconcileState(t *testing.T) { pspPS: v1.PodStatus{ ContainerStatuses: []v1.ContainerStatus{ { - Name: "fakeName1", - ContainerID: "docker://fakeID", + Name: "fakeContainerName1", + ContainerID: "docker://fakeContainerID", }, }, }, @@ -787,7 +806,7 @@ func TestReconcileState(t *testing.T) { stDefaultCPUSet: cpuset.NewCPUSet(), updateErr: nil, expectSucceededContainerName: "", - expectFailedContainerName: "fakeName", + expectFailedContainerName: "fakeContainerName", }, { description: "cpu manager reconclie - cpuset is empty", @@ -795,12 +814,12 @@ func TestReconcileState(t *testing.T) { { ObjectMeta: metav1.ObjectMeta{ Name: "fakePodName", - UID: "fakeUID", + UID: "fakePodUID", }, Spec: v1.PodSpec{ Containers: []v1.Container{ { - Name: "fakeName", + Name: "fakeContainerName", }, }, }, @@ -809,19 +828,21 @@ func TestReconcileState(t *testing.T) { pspPS: v1.PodStatus{ ContainerStatuses: []v1.ContainerStatus{ { - Name: "fakeName", - ContainerID: "docker://fakeID", + Name: "fakeContainerName", + ContainerID: "docker://fakeContainerID", }, }, }, pspFound: true, stAssignments: state.ContainerCPUAssignments{ - "fakeID": cpuset.NewCPUSet(), + "fakePodUID": map[string]cpuset.CPUSet{ + "fakeContainerName": cpuset.NewCPUSet(), + }, }, stDefaultCPUSet: cpuset.NewCPUSet(1, 2, 3, 4, 5, 6, 7), updateErr: nil, expectSucceededContainerName: "", - expectFailedContainerName: "fakeName", + expectFailedContainerName: "fakeContainerName", }, { description: "cpu manager reconclie - container update error", @@ -829,12 +850,12 @@ func TestReconcileState(t *testing.T) { { ObjectMeta: metav1.ObjectMeta{ Name: "fakePodName", - UID: "fakeUID", + UID: "fakePodUID", }, Spec: v1.PodSpec{ Containers: []v1.Container{ { - Name: "fakeName", + Name: "fakeContainerName", }, }, }, @@ -843,19 +864,21 @@ func TestReconcileState(t *testing.T) { pspPS: v1.PodStatus{ ContainerStatuses: []v1.ContainerStatus{ { - Name: "fakeName", - ContainerID: "docker://fakeID", + Name: "fakeContainerName", + ContainerID: "docker://fakeContainerID", }, }, }, pspFound: true, stAssignments: state.ContainerCPUAssignments{ - "fakeID": cpuset.NewCPUSet(1, 2), + "fakePodUID": map[string]cpuset.CPUSet{ + "fakeContainerName": cpuset.NewCPUSet(1, 2), + }, }, stDefaultCPUSet: cpuset.NewCPUSet(3, 4, 5, 6, 7), updateErr: fmt.Errorf("fake container update error"), expectSucceededContainerName: "", - expectFailedContainerName: "fakeName", + expectFailedContainerName: "fakeContainerName", }, } diff --git a/pkg/kubelet/cm/cpumanager/topology_hints_test.go b/pkg/kubelet/cm/cpumanager/topology_hints_test.go index 1e80e68472f..62d91e9cf98 100644 --- a/pkg/kubelet/cm/cpumanager/topology_hints_test.go +++ b/pkg/kubelet/cm/cpumanager/topology_hints_test.go @@ -156,7 +156,9 @@ func TestGetTopologyHints(t *testing.T) { pod: *testPod1, container: *testContainer1, assignments: 
state.ContainerCPUAssignments{ - "": cpuset.NewCPUSet(0, 6), + string(testPod1.UID): map[string]cpuset.CPUSet{ + testContainer1.Name: cpuset.NewCPUSet(0, 6), + }, }, defaultCPUSet: cpuset.NewCPUSet(), expectedHints: []topologymanager.TopologyHint{ @@ -175,7 +177,9 @@ func TestGetTopologyHints(t *testing.T) { pod: *testPod1, container: *testContainer1, assignments: state.ContainerCPUAssignments{ - "": cpuset.NewCPUSet(3, 9), + string(testPod1.UID): map[string]cpuset.CPUSet{ + testContainer1.Name: cpuset.NewCPUSet(3, 9), + }, }, defaultCPUSet: cpuset.NewCPUSet(), expectedHints: []topologymanager.TopologyHint{ @@ -194,7 +198,9 @@ func TestGetTopologyHints(t *testing.T) { pod: *testPod4, container: *testContainer4, assignments: state.ContainerCPUAssignments{ - "": cpuset.NewCPUSet(0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10), + string(testPod4.UID): map[string]cpuset.CPUSet{ + testContainer4.Name: cpuset.NewCPUSet(0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10), + }, }, defaultCPUSet: cpuset.NewCPUSet(), expectedHints: []topologymanager.TopologyHint{ @@ -209,7 +215,9 @@ func TestGetTopologyHints(t *testing.T) { pod: *testPod1, container: *testContainer1, assignments: state.ContainerCPUAssignments{ - "": cpuset.NewCPUSet(0, 6, 3, 9), + string(testPod1.UID): map[string]cpuset.CPUSet{ + testContainer1.Name: cpuset.NewCPUSet(0, 6, 3, 9), + }, }, defaultCPUSet: cpuset.NewCPUSet(), expectedHints: []topologymanager.TopologyHint{}, @@ -219,7 +227,9 @@ func TestGetTopologyHints(t *testing.T) { pod: *testPod4, container: *testContainer4, assignments: state.ContainerCPUAssignments{ - "": cpuset.NewCPUSet(0, 6, 3, 9), + string(testPod4.UID): map[string]cpuset.CPUSet{ + testContainer4.Name: cpuset.NewCPUSet(0, 6, 3, 9), + }, }, defaultCPUSet: cpuset.NewCPUSet(), expectedHints: []topologymanager.TopologyHint{}, diff --git a/test/e2e_node/cpu_manager_test.go b/test/e2e_node/cpu_manager_test.go index 203d983d274..e5dcd64782f 100644 --- a/test/e2e_node/cpu_manager_test.go +++ b/test/e2e_node/cpu_manager_test.go @@ -124,7 +124,7 @@ func waitForContainerRemoval(containerName, podName, podNS string) { func waitForStateFileCleanedUp() { gomega.Eventually(func() bool { - restoredState, err := cpumanagerstate.NewCheckpointState("/var/lib/kubelet", "cpu_manager_state", "static") + restoredState, err := cpumanagerstate.NewCheckpointState("/var/lib/kubelet", "cpu_manager_state", "static", nil) framework.ExpectNoError(err, "failed to create testing cpumanager state instance") assignments := restoredState.GetCPUAssignments() if len(assignments) == 0 {