Update top-level CPUManager to adhere to new state semantics

For now, we just pass 'nil' as the set of 'initialContainers' used when
migrating from the old state semantics to the new ones. In a subsequent
commit we will pull this information from higher layers so that we can
pass it down properly at this stage.
Kevin Klues 2019-11-05 20:25:12 +00:00
parent 185e790f71
commit 69f8053850
4 changed files with 120 additions and 94 deletions
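For orientation before the diffs: the old state keyed CPU assignments by an opaque container ID, while the new state keys them by (podUID, containerName). Migrating an old checkpoint into the new shape therefore needs a containerID -> (podUID, containerName) mapping, which is the 'initialContainers' set the commit message refers to (passed as nil for now). The sketch below is inferred from the state.ContainerCPUAssignments literals in the test diffs; the type names are illustrative stand-ins, not the upstream definitions, and cpuset.CPUSet is stubbed as a plain string.

package main

import "fmt"

// Illustrative stand-ins only; cpuset.CPUSet is stubbed as a string.
type oldCPUAssignments map[string]string            // containerID -> cpuset
type newCPUAssignments map[string]map[string]string // podUID -> containerName -> cpuset

func main() {
	// Old semantics: one flat map keyed by the runtime container ID.
	before := oldCPUAssignments{"fakeContainerID": "1-2"}

	// New semantics: nested map keyed by pod UID, then container name,
	// matching literals like state.ContainerCPUAssignments{"fakePodUID": ...}.
	after := newCPUAssignments{"fakePodUID": {"fakeContainerName": "1-2"}}

	fmt.Println(before, after)
}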


@@ -158,7 +158,7 @@ func NewManager(cpuPolicyName string, reconcilePeriod time.Duration, machineInfo
return nil, fmt.Errorf("unknown policy: \"%s\"", cpuPolicyName)
}
stateImpl, err := state.NewCheckpointState(stateFileDirectory, cpuManagerStateFileName, policy.Name())
stateImpl, err := state.NewCheckpointState(stateFileDirectory, cpuManagerStateFileName, policy.Name(), nil)
if err != nil {
return nil, fmt.Errorf("could not initialize checkpoint manager: %v", err)
}
@@ -209,7 +209,7 @@ func (m *manager) AddContainer(p *v1.Pod, c *v1.Container, containerID string) e
m.Unlock()
return err
}
cpus := m.state.GetCPUSetOrDefault(containerID)
cpus := m.state.GetCPUSetOrDefault(string(p.UID), c.Name)
m.Unlock()
if !cpus.IsEmpty() {
@@ -238,11 +238,12 @@ func (m *manager) RemoveContainer(containerID string) error {
klog.Errorf("[cpumanager] RemoveContainer error: %v", err)
return err
}
return nil
}
func (m *manager) policyAddContainer(p *v1.Pod, c *v1.Container, containerID string) error {
err := m.policy.AddContainer(m.state, p, c, containerID)
err := m.policy.AddContainer(m.state, p, c)
if err == nil {
m.containerMap.Add(string(p.UID), c.Name, containerID)
}
@@ -250,7 +251,12 @@ func (m *manager) policyAddContainer(p *v1.Pod, c *v1.Container, containerID str
}
func (m *manager) policyRemoveContainerByID(containerID string) error {
err := m.policy.RemoveContainer(m.state, containerID)
podUID, containerName, err := m.containerMap.GetContainerRef(containerID)
if err != nil {
return nil
}
err = m.policy.RemoveContainer(m.state, podUID, containerName)
if err == nil {
m.containerMap.RemoveByContainerID(containerID)
}
@@ -259,14 +265,9 @@ func (m *manager) policyRemoveContainerByID(containerID string) error {
}
func (m *manager) policyRemoveContainerByRef(podUID string, containerName string) error {
containerID, err := m.containerMap.GetContainerID(podUID, containerName)
if err != nil {
return nil
}
err = m.policy.RemoveContainer(m.state, containerID)
err := m.policy.RemoveContainer(m.state, podUID, containerName)
if err == nil {
m.containerMap.RemoveByContainerID(containerID)
m.containerMap.RemoveByContainerRef(podUID, containerName)
}
return err
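Both removal paths now converge on policy.RemoveContainer(state, podUID, containerName); the by-ID path first resolves the (podUID, containerName) ref through containerMap, and deliberately swallows a failed lookup (an unknown ID is treated as already removed) — which is also why TestCPUManagerRemove below must seed the map before calling RemoveContainer. A minimal, runnable sketch of that resolution; the containerMap shape here is assumed from the methods used in the diff, not copied from the real containermap package.

package main

import "fmt"

// Assumed stand-in for the real ContainerMap; only the methods referenced
// in the diff above are sketched.
type containerMap map[string][2]string // containerID -> {podUID, containerName}

func (m containerMap) Add(podUID, containerName, containerID string) {
	m[containerID] = [2]string{podUID, containerName}
}

func (m containerMap) GetContainerRef(containerID string) (string, string, error) {
	ref, ok := m[containerID]
	if !ok {
		return "", "", fmt.Errorf("container id %s not found", containerID)
	}
	return ref[0], ref[1], nil
}

func main() {
	cm := containerMap{}
	cm.Add("fakePodUID", "fakeContainerName", "fakeContainerID")

	// Known ID: resolve to (podUID, containerName), then remove by ref.
	if podUID, containerName, err := cm.GetContainerRef("fakeContainerID"); err == nil {
		fmt.Println("remove by ref:", podUID, containerName)
	}

	// Unknown ID: the lookup fails, and policyRemoveContainerByID returns
	// nil in that case -- an unknown container is already gone.
	if _, _, err := cm.GetContainerRef("unknownID"); err != nil {
		fmt.Println("no-op:", err)
	}
}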
@@ -304,43 +305,35 @@ func (m *manager) removeStaleState() {
m.Lock()
defer m.Unlock()
// We remove stale state very conservatively, only removing *any* state
// once we know for sure that we won't be accidentally removing state that
// is still valid. Since this function is called periodically, we will just
// try again next time this function is called.
// Get the list of active pods.
activePods := m.activePods()
if len(activePods) == 0 {
// If there are no active pods, skip the removal of stale state.
// Since this function is called periodically, we will just try again
// next time this function is called.
return
}
// Build a list of containerIDs for all containers in all active Pods.
activeContainers := make(map[string]struct{})
// Build a list of (podUID, containerName) pairs for all containers in all active Pods.
activeContainers := make(map[string]map[string]struct{})
for _, pod := range activePods {
pstatus, ok := m.podStatusProvider.GetPodStatus(pod.UID)
if !ok {
// If even one pod does not have its status set, skip state removal.
return
}
activeContainers[string(pod.UID)] = make(map[string]struct{})
for _, container := range append(pod.Spec.InitContainers, pod.Spec.Containers...) {
containerID, err := findContainerIDByName(&pstatus, container.Name)
if err != nil {
// If even one container does not have its containerID set, skip state removal.
return
}
activeContainers[containerID] = struct{}{}
activeContainers[string(pod.UID)][container.Name] = struct{}{}
}
}
// Loop through the CPUManager state. Remove any state for containers not
// in the `activeContainers` list built above. The short-circuits in place
// above ensure that no erroneous state will ever be removed.
for containerID := range m.state.GetCPUAssignments() {
if _, ok := activeContainers[containerID]; !ok {
klog.Errorf("[cpumanager] removeStaleState: removing container: %s)", containerID)
err := m.policy.RemoveContainer(m.state, containerID)
if err != nil {
klog.Errorf("[cpumanager] removeStaleState: failed to remove container %s, error: %v)", containerID, err)
// in the `activeContainers` list built above.
assignments := m.state.GetCPUAssignments()
for podUID := range assignments {
for containerName := range assignments[podUID] {
if _, ok := activeContainers[podUID][containerName]; !ok {
klog.Errorf("[cpumanager] removeStaleState: removing (pod %s, container: %s)", podUID, containerName)
err := m.policyRemoveContainerByRef(podUID, containerName)
if err != nil {
klog.Errorf("[cpumanager] removeStaleState: failed to remove (pod %s, container %s), error: %v)", podUID, containerName, err)
}
}
}
}
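One subtlety in the rewritten loop above: the check activeContainers[podUID][containerName] is safe even for pods that are absent from the outer map, because indexing a nil map in Go yields the zero value rather than panicking. A standalone demonstration:

package main

import "fmt"

func main() {
	activeContainers := map[string]map[string]struct{}{
		"podA": {"ctr1": {}},
	}

	// Present pod and container.
	_, ok := activeContainers["podA"]["ctr1"]
	fmt.Println(ok) // true

	// Absent pod: activeContainers["podB"] is a nil map, and indexing a
	// nil map just yields the zero value -- no panic, ok == false.
	_, ok = activeContainers["podB"]["ctr1"]
	fmt.Println(ok) // false
}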
@@ -373,7 +366,7 @@ func (m *manager) reconcileState() (success []reconciledContainer, failure []rec
// - policy does not want to track the container
// - kubelet has just been restarted - and there is no previous state file
// - container has been removed from state by RemoveContainer call (DeletionTimestamp is set)
if _, ok := m.state.GetCPUSet(containerID); !ok {
if _, ok := m.state.GetCPUSet(string(pod.UID), container.Name); !ok {
if status.Phase == v1.PodRunning && pod.DeletionTimestamp == nil {
klog.V(4).Infof("[cpumanager] reconcileState: container is not present in state - trying to add (pod: %s, container: %s, container id: %s)", pod.Name, container.Name, containerID)
err := m.AddContainer(pod, &container, containerID)
@@ -389,7 +382,7 @@ func (m *manager) reconcileState() (success []reconciledContainer, failure []rec
}
}
cset := m.state.GetCPUSetOrDefault(containerID)
cset := m.state.GetCPUSetOrDefault(string(pod.UID), container.Name)
if cset.IsEmpty() {
// NOTE: This should not happen outside of tests.
klog.Infof("[cpumanager] reconcileState: skipping container; assigned cpuset is empty (pod: %s, container: %s)", pod.Name, container.Name)


@@ -45,8 +45,8 @@ type mockState struct {
defaultCPUSet cpuset.CPUSet
}
func (s *mockState) GetCPUSet(containerID string) (cpuset.CPUSet, bool) {
res, ok := s.assignments[containerID]
func (s *mockState) GetCPUSet(podUID string, containerName string) (cpuset.CPUSet, bool) {
res, ok := s.assignments[podUID][containerName]
return res.Clone(), ok
}
@@ -54,23 +54,29 @@ func (s *mockState) GetDefaultCPUSet() cpuset.CPUSet {
return s.defaultCPUSet.Clone()
}
func (s *mockState) GetCPUSetOrDefault(containerID string) cpuset.CPUSet {
if res, ok := s.GetCPUSet(containerID); ok {
func (s *mockState) GetCPUSetOrDefault(podUID string, containerName string) cpuset.CPUSet {
if res, ok := s.GetCPUSet(podUID, containerName); ok {
return res
}
return s.GetDefaultCPUSet()
}
func (s *mockState) SetCPUSet(containerID string, cset cpuset.CPUSet) {
s.assignments[containerID] = cset
func (s *mockState) SetCPUSet(podUID string, containerName string, cset cpuset.CPUSet) {
if _, exists := s.assignments[podUID]; !exists {
s.assignments[podUID] = make(map[string]cpuset.CPUSet)
}
s.assignments[podUID][containerName] = cset
}
func (s *mockState) SetDefaultCPUSet(cset cpuset.CPUSet) {
s.defaultCPUSet = cset
}
func (s *mockState) Delete(containerID string) {
delete(s.assignments, containerID)
func (s *mockState) Delete(podUID string, containerName string) {
delete(s.assignments[podUID], containerName)
if len(s.assignments[podUID]) == 0 {
delete(s.assignments, podUID)
}
}
func (s *mockState) ClearState() {
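The mock's SetCPUSet and Delete show the bookkeeping the nested map needs: lazily create the inner map on first write for a pod, and prune the outer entry once a pod's last container is deleted, so stale pod UIDs do not accumulate. A standalone demo of the same pattern, with illustrative names (cpusets again stubbed as strings):

package main

import "fmt"

func main() {
	assignments := map[string]map[string]string{} // podUID -> containerName -> cpuset

	set := func(podUID, containerName, cpus string) {
		if _, exists := assignments[podUID]; !exists {
			assignments[podUID] = make(map[string]string)
		}
		assignments[podUID][containerName] = cpus
	}

	del := func(podUID, containerName string) {
		delete(assignments[podUID], containerName) // no-op on a nil inner map
		if len(assignments[podUID]) == 0 {
			delete(assignments, podUID) // prune the now-empty pod entry
		}
	}

	set("podA", "ctr1", "1-2")
	del("podA", "ctr1")
	fmt.Println(len(assignments)) // 0: podA pruned with its last container
}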
@@ -97,11 +103,11 @@ func (p *mockPolicy) Name() string {
func (p *mockPolicy) Start(s state.State) {
}
func (p *mockPolicy) AddContainer(s state.State, pod *v1.Pod, container *v1.Container, containerID string) error {
func (p *mockPolicy) AddContainer(s state.State, pod *v1.Pod, container *v1.Container) error {
return p.err
}
func (p *mockPolicy) RemoveContainer(s state.State, containerID string) error {
func (p *mockPolicy) RemoveContainer(s state.State, podUID string, containerName string) error {
return p.err
}
@@ -154,6 +160,10 @@ func makePod(podUID, containerName, cpuRequest, cpuLimit string) *v1.Pod {
func makeMultiContainerPod(initCPUs, appCPUs []struct{ request, limit string }) *v1.Pod {
pod := &v1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: "pod",
UID: "podUID",
},
Spec: v1.PodSpec{
InitContainers: []v1.Container{},
Containers: []v1.Container{},
@@ -489,15 +499,15 @@ func TestCPUManagerAddWithInitContainers(t *testing.T) {
testCase.description, containerIDs[i], err)
}
cset, found := state.assignments[containerIDs[i]]
cset, found := state.assignments[string(testCase.pod.UID)][containers[i].Name]
if !expCSets[i].IsEmpty() && !found {
t.Errorf("StaticPolicy AddContainer() error (%v). expected container id %v to be present in assignments %v",
testCase.description, containerIDs[i], state.assignments)
t.Errorf("StaticPolicy AddContainer() error (%v). expected container %v to be present in assignments %v",
testCase.description, containers[i].Name, state.assignments)
}
if found && !cset.Equals(expCSets[i]) {
t.Errorf("StaticPolicy AddContainer() error (%v). expected cpuset %v for container %v but got %v",
testCase.description, expCSets[i], containerIDs[i], cset)
testCase.description, expCSets[i], containers[i].Name, cset)
}
}
}
@@ -616,6 +626,9 @@ func TestCPUManagerGenerate(t *testing.T) {
}
func TestCPUManagerRemove(t *testing.T) {
containerID := "fakeID"
containerMap := containermap.NewContainerMap()
mgr := &manager{
policy: &mockPolicy{
err: nil,
@@ -625,12 +638,13 @@ func TestCPUManagerRemove(t *testing.T) {
defaultCPUSet: cpuset.NewCPUSet(),
},
containerRuntime: mockRuntimeService{},
containerMap: containermap.NewContainerMap(),
containerMap: containerMap,
activePods: func() []*v1.Pod { return nil },
podStatusProvider: mockPodStatusProvider{},
}
err := mgr.RemoveContainer("fakeID")
containerMap.Add("", "", containerID)
err := mgr.RemoveContainer(containerID)
if err != nil {
t.Errorf("CPU Manager RemoveContainer() error. expected error to be nil but got: %v", err)
}
@@ -641,12 +655,13 @@ func TestCPUManagerRemove(t *testing.T) {
},
state: state.NewMemoryState(),
containerRuntime: mockRuntimeService{},
containerMap: containermap.NewContainerMap(),
containerMap: containerMap,
activePods: func() []*v1.Pod { return nil },
podStatusProvider: mockPodStatusProvider{},
}
err = mgr.RemoveContainer("fakeID")
containerMap.Add("", "", containerID)
err = mgr.RemoveContainer(containerID)
if !reflect.DeepEqual(err, fmt.Errorf("fake error")) {
t.Errorf("CPU Manager RemoveContainer() error. expected error: fake error but got: %v", err)
}
@@ -670,12 +685,12 @@ func TestReconcileState(t *testing.T) {
{
ObjectMeta: metav1.ObjectMeta{
Name: "fakePodName",
UID: "fakeUID",
UID: "fakePodUID",
},
Spec: v1.PodSpec{
Containers: []v1.Container{
{
Name: "fakeName",
Name: "fakeContainerName",
},
},
},
@@ -684,18 +699,20 @@ func TestReconcileState(t *testing.T) {
pspPS: v1.PodStatus{
ContainerStatuses: []v1.ContainerStatus{
{
Name: "fakeName",
ContainerID: "docker://fakeID",
Name: "fakeContainerName",
ContainerID: "docker://fakeContainerID",
},
},
},
pspFound: true,
stAssignments: state.ContainerCPUAssignments{
"fakeID": cpuset.NewCPUSet(1, 2),
"fakePodUID": map[string]cpuset.CPUSet{
"fakeContainerName": cpuset.NewCPUSet(1, 2),
},
},
stDefaultCPUSet: cpuset.NewCPUSet(3, 4, 5, 6, 7),
updateErr: nil,
expectSucceededContainerName: "fakeName",
expectSucceededContainerName: "fakeContainerName",
expectFailedContainerName: "",
},
{
@@ -704,12 +721,12 @@ func TestReconcileState(t *testing.T) {
{
ObjectMeta: metav1.ObjectMeta{
Name: "fakePodName",
UID: "fakeUID",
UID: "fakePodUID",
},
Spec: v1.PodSpec{
InitContainers: []v1.Container{
{
Name: "fakeName",
Name: "fakeContainerName",
},
},
},
@@ -718,18 +735,20 @@ func TestReconcileState(t *testing.T) {
pspPS: v1.PodStatus{
InitContainerStatuses: []v1.ContainerStatus{
{
Name: "fakeName",
ContainerID: "docker://fakeID",
Name: "fakeContainerName",
ContainerID: "docker://fakeContainerID",
},
},
},
pspFound: true,
stAssignments: state.ContainerCPUAssignments{
"fakeID": cpuset.NewCPUSet(1, 2),
"fakePodUID": map[string]cpuset.CPUSet{
"fakeContainerName": cpuset.NewCPUSet(1, 2),
},
},
stDefaultCPUSet: cpuset.NewCPUSet(3, 4, 5, 6, 7),
updateErr: nil,
expectSucceededContainerName: "fakeName",
expectSucceededContainerName: "fakeContainerName",
expectFailedContainerName: "",
},
{
@@ -738,12 +757,12 @@ func TestReconcileState(t *testing.T) {
{
ObjectMeta: metav1.ObjectMeta{
Name: "fakePodName",
UID: "fakeUID",
UID: "fakePodUID",
},
Spec: v1.PodSpec{
Containers: []v1.Container{
{
Name: "fakeName",
Name: "fakeContainerName",
},
},
},
@@ -755,7 +774,7 @@ func TestReconcileState(t *testing.T) {
stDefaultCPUSet: cpuset.NewCPUSet(),
updateErr: nil,
expectSucceededContainerName: "",
expectFailedContainerName: "fakeName",
expectFailedContainerName: "fakeContainerName",
},
{
description: "cpu manager reconclie - container id not found",
@@ -763,12 +782,12 @@ func TestReconcileState(t *testing.T) {
{
ObjectMeta: metav1.ObjectMeta{
Name: "fakePodName",
UID: "fakeUID",
UID: "fakePodUID",
},
Spec: v1.PodSpec{
Containers: []v1.Container{
{
Name: "fakeName",
Name: "fakeContainerName",
},
},
},
@@ -777,8 +796,8 @@ func TestReconcileState(t *testing.T) {
pspPS: v1.PodStatus{
ContainerStatuses: []v1.ContainerStatus{
{
Name: "fakeName1",
ContainerID: "docker://fakeID",
Name: "fakeContainerName1",
ContainerID: "docker://fakeContainerID",
},
},
},
@ -787,7 +806,7 @@ func TestReconcileState(t *testing.T) {
stDefaultCPUSet: cpuset.NewCPUSet(),
updateErr: nil,
expectSucceededContainerName: "",
expectFailedContainerName: "fakeName",
expectFailedContainerName: "fakeContainerName",
},
{
description: "cpu manager reconclie - cpuset is empty",
@@ -795,12 +814,12 @@ func TestReconcileState(t *testing.T) {
{
ObjectMeta: metav1.ObjectMeta{
Name: "fakePodName",
UID: "fakeUID",
UID: "fakePodUID",
},
Spec: v1.PodSpec{
Containers: []v1.Container{
{
Name: "fakeName",
Name: "fakeContainerName",
},
},
},
@@ -809,19 +828,21 @@ func TestReconcileState(t *testing.T) {
pspPS: v1.PodStatus{
ContainerStatuses: []v1.ContainerStatus{
{
Name: "fakeName",
ContainerID: "docker://fakeID",
Name: "fakeContainerName",
ContainerID: "docker://fakeContainerID",
},
},
},
pspFound: true,
stAssignments: state.ContainerCPUAssignments{
"fakeID": cpuset.NewCPUSet(),
"fakePodUID": map[string]cpuset.CPUSet{
"fakeContainerName": cpuset.NewCPUSet(),
},
},
stDefaultCPUSet: cpuset.NewCPUSet(1, 2, 3, 4, 5, 6, 7),
updateErr: nil,
expectSucceededContainerName: "",
expectFailedContainerName: "fakeName",
expectFailedContainerName: "fakeContainerName",
},
{
description: "cpu manager reconclie - container update error",
@@ -829,12 +850,12 @@ func TestReconcileState(t *testing.T) {
{
ObjectMeta: metav1.ObjectMeta{
Name: "fakePodName",
UID: "fakeUID",
UID: "fakePodUID",
},
Spec: v1.PodSpec{
Containers: []v1.Container{
{
Name: "fakeName",
Name: "fakeContainerName",
},
},
},
@@ -843,19 +864,21 @@ func TestReconcileState(t *testing.T) {
pspPS: v1.PodStatus{
ContainerStatuses: []v1.ContainerStatus{
{
Name: "fakeName",
ContainerID: "docker://fakeID",
Name: "fakeContainerName",
ContainerID: "docker://fakeContainerID",
},
},
},
pspFound: true,
stAssignments: state.ContainerCPUAssignments{
"fakeID": cpuset.NewCPUSet(1, 2),
"fakePodUID": map[string]cpuset.CPUSet{
"fakeContainerName": cpuset.NewCPUSet(1, 2),
},
},
stDefaultCPUSet: cpuset.NewCPUSet(3, 4, 5, 6, 7),
updateErr: fmt.Errorf("fake container update error"),
expectSucceededContainerName: "",
expectFailedContainerName: "fakeName",
expectFailedContainerName: "fakeContainerName",
},
}


@@ -156,7 +156,9 @@ func TestGetTopologyHints(t *testing.T) {
pod: *testPod1,
container: *testContainer1,
assignments: state.ContainerCPUAssignments{
"": cpuset.NewCPUSet(0, 6),
string(testPod1.UID): map[string]cpuset.CPUSet{
testContainer1.Name: cpuset.NewCPUSet(0, 6),
},
},
defaultCPUSet: cpuset.NewCPUSet(),
expectedHints: []topologymanager.TopologyHint{
@@ -175,7 +177,9 @@ func TestGetTopologyHints(t *testing.T) {
pod: *testPod1,
container: *testContainer1,
assignments: state.ContainerCPUAssignments{
"": cpuset.NewCPUSet(3, 9),
string(testPod1.UID): map[string]cpuset.CPUSet{
testContainer1.Name: cpuset.NewCPUSet(3, 9),
},
},
defaultCPUSet: cpuset.NewCPUSet(),
expectedHints: []topologymanager.TopologyHint{
@@ -194,7 +198,9 @@ func TestGetTopologyHints(t *testing.T) {
pod: *testPod4,
container: *testContainer4,
assignments: state.ContainerCPUAssignments{
"": cpuset.NewCPUSet(0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10),
string(testPod4.UID): map[string]cpuset.CPUSet{
testContainer4.Name: cpuset.NewCPUSet(0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10),
},
},
defaultCPUSet: cpuset.NewCPUSet(),
expectedHints: []topologymanager.TopologyHint{
@@ -209,7 +215,9 @@ func TestGetTopologyHints(t *testing.T) {
pod: *testPod1,
container: *testContainer1,
assignments: state.ContainerCPUAssignments{
"": cpuset.NewCPUSet(0, 6, 3, 9),
string(testPod1.UID): map[string]cpuset.CPUSet{
testContainer1.Name: cpuset.NewCPUSet(0, 6, 3, 9),
},
},
defaultCPUSet: cpuset.NewCPUSet(),
expectedHints: []topologymanager.TopologyHint{},
@@ -219,7 +227,9 @@ func TestGetTopologyHints(t *testing.T) {
pod: *testPod4,
container: *testContainer4,
assignments: state.ContainerCPUAssignments{
"": cpuset.NewCPUSet(0, 6, 3, 9),
string(testPod4.UID): map[string]cpuset.CPUSet{
testContainer4.Name: cpuset.NewCPUSet(0, 6, 3, 9),
},
},
defaultCPUSet: cpuset.NewCPUSet(),
expectedHints: []topologymanager.TopologyHint{},


@@ -124,7 +124,7 @@ func waitForContainerRemoval(containerName, podName, podNS string) {
func waitForStateFileCleanedUp() {
gomega.Eventually(func() bool {
restoredState, err := cpumanagerstate.NewCheckpointState("/var/lib/kubelet", "cpu_manager_state", "static")
restoredState, err := cpumanagerstate.NewCheckpointState("/var/lib/kubelet", "cpu_manager_state", "static", nil)
framework.ExpectNoError(err, "failed to create testing cpumanager state instance")
assignments := restoredState.GetCPUAssignments()
if len(assignments) == 0 {