diff --git a/pkg/kubelet/cm/cpumanager/BUILD b/pkg/kubelet/cm/cpumanager/BUILD
index e44b07bc8a2..9982e73cbb9 100644
--- a/pkg/kubelet/cm/cpumanager/BUILD
+++ b/pkg/kubelet/cm/cpumanager/BUILD
@@ -3,6 +3,7 @@ load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")
 go_library(
     name = "go_default_library",
     srcs = [
+        "container_map.go",
         "cpu_assignment.go",
         "cpu_manager.go",
         "fake_cpu_manager.go",
@@ -30,6 +31,7 @@ go_library(
 go_test(
     name = "go_default_test",
     srcs = [
+        "container_map_test.go",
         "cpu_assignment_test.go",
         "cpu_manager_test.go",
         "policy_none_test.go",
diff --git a/pkg/kubelet/cm/cpumanager/container_map.go b/pkg/kubelet/cm/cpumanager/container_map.go
new file mode 100644
index 00000000000..2da2c931b60
--- /dev/null
+++ b/pkg/kubelet/cm/cpumanager/container_map.go
@@ -0,0 +1,68 @@
+/*
+Copyright 2019 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package cpumanager
+
+import (
+    "fmt"
+
+    "k8s.io/api/core/v1"
+)
+
+// containerMap maps (podUID, containerName) -> containerID
+type containerMap map[string]map[string]string
+
+func newContainerMap() containerMap {
+    return make(containerMap)
+}
+
+func (cm containerMap) Add(p *v1.Pod, c *v1.Container, containerID string) {
+    podUID := string(p.UID)
+    if _, exists := cm[podUID]; !exists {
+        cm[podUID] = make(map[string]string)
+    }
+    cm[podUID][c.Name] = containerID
+}
+
+func (cm containerMap) Remove(containerID string) {
+    found := false
+    for podUID := range cm {
+        for containerName := range cm[podUID] {
+            if containerID == cm[podUID][containerName] {
+                delete(cm[podUID], containerName)
+                found = true
+                break
+            }
+        }
+        if len(cm[podUID]) == 0 {
+            delete(cm, podUID)
+        }
+        if found {
+            break
+        }
+    }
+}
+
+func (cm containerMap) Get(p *v1.Pod, c *v1.Container) (string, error) {
+    podUID := string(p.UID)
+    if _, exists := cm[podUID]; !exists {
+        return "", fmt.Errorf("pod %s not in containerMap", podUID)
+    }
+    if _, exists := cm[podUID][c.Name]; !exists {
+        return "", fmt.Errorf("container %s not in containerMap for pod %s", c.Name, podUID)
+    }
+    return cm[podUID][c.Name], nil
+}
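Aside (not part of the diff): a minimal sketch of how the new containerMap type is meant to be used. The pod UID, container name, and container ID below are made up; the calls are exactly the three methods defined above.

    pod := &v1.Pod{}
    pod.UID = types.UID("fake-pod-uid") // types = k8s.io/apimachinery/pkg/types
    container := &v1.Container{Name: "app"}

    cm := newContainerMap()
    cm.Add(pod, container, "fake-container-id")

    id, err := cm.Get(pod, container) // "fake-container-id", nil
    cm.Remove(id)                     // deletes the entry and, once empty, the pod key
    _, err = cm.Get(pod, container)   // now returns an error

Note that Remove is keyed on the containerID alone, which is why it has to scan the map, while Add and Get are keyed on (pod, container).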
diff --git a/pkg/kubelet/cm/cpumanager/container_map_test.go b/pkg/kubelet/cm/cpumanager/container_map_test.go
new file mode 100644
index 00000000000..6e0f43fb652
--- /dev/null
+++ b/pkg/kubelet/cm/cpumanager/container_map_test.go
@@ -0,0 +1,76 @@
+/*
+Copyright 2019 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package cpumanager
+
+import (
+    "testing"
+
+    "k8s.io/api/core/v1"
+    apimachinery "k8s.io/apimachinery/pkg/types"
+)
+
+func TestContainerMap(t *testing.T) {
+    testCases := []struct {
+        podUID         string
+        containerNames []string
+        containerIDs   []string
+    }{
+        {
+            "fakePodUID",
+            []string{"fakeContainerName-1", "fakeContainerName-2"},
+            []string{"fakeContainerID-1", "fakeContainerID-2"},
+        },
+    }
+
+    for _, tc := range testCases {
+        pod := v1.Pod{}
+        pod.UID = apimachinery.UID(tc.podUID)
+
+        // Build a new containerMap from the testCases, checking proper
+        // addition and retrieval along the way.
+        cm := newContainerMap()
+        for i := range tc.containerNames {
+            container := v1.Container{Name: tc.containerNames[i]}
+
+            cm.Add(&pod, &container, tc.containerIDs[i])
+            containerID, err := cm.Get(&pod, &container)
+            if err != nil {
+                t.Errorf("error adding and retrieving container: %v", err)
+            }
+            if containerID != tc.containerIDs[i] {
+                t.Errorf("mismatched containerIDs %v, %v", containerID, tc.containerIDs[i])
+            }
+        }
+
+        // Remove all entries from the containerMap, checking proper removal of
+        // each along the way.
+        for i := range tc.containerNames {
+            container := v1.Container{Name: tc.containerNames[i]}
+            cm.Remove(tc.containerIDs[i])
+            containerID, err := cm.Get(&pod, &container)
+            if err == nil {
+                t.Errorf("unexpected retrieval of containerID after removal: %v", containerID)
+            }
+        }
+
+        // Verify the containerMap is now empty.
+        if len(cm) != 0 {
+            t.Errorf("unexpected entries still in containerMap: %v", cm)
+        }
+    }
+}
diff --git a/pkg/kubelet/cm/cpumanager/cpu_manager_test.go b/pkg/kubelet/cm/cpumanager/cpu_manager_test.go
index 1bb68dcb772..ea4144567cb 100644
--- a/pkg/kubelet/cm/cpumanager/cpu_manager_test.go
+++ b/pkg/kubelet/cm/cpumanager/cpu_manager_test.go
@@ -140,6 +140,49 @@ func makePod(cpuRequest, cpuLimit string) *v1.Pod {
     }
 }
 
+func makeMultiContainerPod(initCPUs, appCPUs []struct{ request, limit string }) *v1.Pod {
+    pod := &v1.Pod{
+        Spec: v1.PodSpec{
+            InitContainers: []v1.Container{},
+            Containers:     []v1.Container{},
+        },
+    }
+
+    for i, cpu := range initCPUs {
+        pod.Spec.InitContainers = append(pod.Spec.InitContainers, v1.Container{
+            Name: fmt.Sprintf("initContainer-%d", i),
+            Resources: v1.ResourceRequirements{
+                Requests: v1.ResourceList{
+                    v1.ResourceName(v1.ResourceCPU):    resource.MustParse(cpu.request),
+                    v1.ResourceName(v1.ResourceMemory): resource.MustParse("1G"),
+                },
+                Limits: v1.ResourceList{
+                    v1.ResourceName(v1.ResourceCPU):    resource.MustParse(cpu.limit),
+                    v1.ResourceName(v1.ResourceMemory): resource.MustParse("1G"),
+                },
+            },
+        })
+    }
+
+    for i, cpu := range appCPUs {
+        pod.Spec.Containers = append(pod.Spec.Containers, v1.Container{
+            Name: fmt.Sprintf("appContainer-%d", i),
+            Resources: v1.ResourceRequirements{
+                Requests: v1.ResourceList{
+                    v1.ResourceName(v1.ResourceCPU):    resource.MustParse(cpu.request),
+                    v1.ResourceName(v1.ResourceMemory): resource.MustParse("1G"),
+                },
+                Limits: v1.ResourceList{
+                    v1.ResourceName(v1.ResourceCPU):    resource.MustParse(cpu.limit),
+                    v1.ResourceName(v1.ResourceMemory): resource.MustParse("1G"),
+                },
+            },
+        })
+    }
+
+    return pod
+}
+
 func TestCPUManagerAdd(t *testing.T) {
     testPolicy := NewStaticPolicy(
         &topology.CPUTopology{
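Aside (not part of the diff): a hypothetical call to the new helper, mirroring the test cases added below — one guaranteed init container with 2 CPUs and one guaranteed app container with 4 CPUs. (The original used string(i) for the container names, which converts the index to a rune rather than its decimal form; fmt.Sprintf above is the intended behavior, assuming fmt is already imported by this test file.)

    pod := makeMultiContainerPod(
        []struct{ request, limit string }{{"2000m", "2000m"}},
        []struct{ request, limit string }{{"4000m", "4000m"}})
    // pod.Spec.InitContainers[0].Name == "initContainer-0"
    // pod.Spec.Containers[0].Name == "appContainer-0"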
diff --git a/pkg/kubelet/cm/cpumanager/policy_static.go b/pkg/kubelet/cm/cpumanager/policy_static.go
index 38dce8b08e3..c374cb42e55 100644
--- a/pkg/kubelet/cm/cpumanager/policy_static.go
+++ b/pkg/kubelet/cm/cpumanager/policy_static.go
@@ -73,6 +73,10 @@ type staticPolicy struct {
     topology *topology.CPUTopology
     // set of CPUs that is not available for exclusive assignment
     reserved cpuset.CPUSet
+    // containerMap provides a mapping from
+    // (pod, container) -> containerID
+    // for all containers in a pod
+    containerMap containerMap
 }
 
 // Ensure staticPolicy implements Policy interface
@@ -97,8 +101,9 @@ func NewStaticPolicy(topology *topology.CPUTopology, numReservedCPUs int) Policy
     klog.Infof("[cpumanager] reserved %d CPUs (\"%s\") not available for exclusive assignment", reserved.Size(), reserved)
 
     return &staticPolicy{
-        topology: topology,
-        reserved: reserved,
+        topology:     topology,
+        reserved:     reserved,
+        containerMap: newContainerMap(),
     }
 }
 
@@ -172,7 +177,15 @@ func (p *staticPolicy) assignableCPUs(s state.State) cpuset.CPUSet {
     return s.GetDefaultCPUSet().Difference(p.reserved)
 }
 
-func (p *staticPolicy) AddContainer(s state.State, pod *v1.Pod, container *v1.Container, containerID string) error {
+func (p *staticPolicy) AddContainer(s state.State, pod *v1.Pod, container *v1.Container, containerID string) (rerr error) {
+    // So long as this function does not return an error,
+    // add (pod, container, containerID) to the containerMap.
+    defer func() {
+        if rerr == nil {
+            p.containerMap.Add(pod, container, containerID)
+        }
+    }()
+
     if numCPUs := guaranteedCPUs(pod, container); numCPUs != 0 {
         klog.Infof("[cpumanager] static policy: AddContainer (pod: %s, container: %s, container id: %s)", pod.Name, container.Name, containerID)
         // container belongs in an exclusively allocated pool
@@ -182,6 +195,22 @@ func (p *staticPolicy) AddContainer(s state.State, pod *v1.Pod, container *v1.Co
             return nil
         }
 
+        // Proactively remove CPUs from init containers that have already run.
+        // They are guaranteed to have run to completion before any other
+        // container is run.
+        for _, initContainer := range pod.Spec.InitContainers {
+            if container.Name != initContainer.Name {
+                initContainerID, err := p.containerMap.Get(pod, &initContainer)
+                if err != nil {
+                    continue
+                }
+                err = p.RemoveContainer(s, initContainerID)
+                if err != nil {
+                    klog.Warningf("[cpumanager] unable to remove init container (container id: %s, error: %v)", initContainerID, err)
+                }
+            }
+        }
+
         cpuset, err := p.allocateCPUs(s, numCPUs)
         if err != nil {
             klog.Errorf("[cpumanager] unable to allocate %d CPUs (container id: %s, error: %v)", numCPUs, containerID, err)
@@ -193,7 +222,15 @@ func (p *staticPolicy) AddContainer(s state.State, pod *v1.Pod, container *v1.Co
     return nil
 }
 
-func (p *staticPolicy) RemoveContainer(s state.State, containerID string) error {
+func (p *staticPolicy) RemoveContainer(s state.State, containerID string) (rerr error) {
+    // So long as this function does not return an error,
+    // remove containerID from the containerMap.
+    defer func() {
+        if rerr == nil {
+            p.containerMap.Remove(containerID)
+        }
+    }()
+
     klog.Infof("[cpumanager] static policy: RemoveContainer (container id: %s)", containerID)
     if toRelease, ok := s.GetCPUSet(containerID); ok {
         s.Delete(containerID)
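Aside (not part of the diff): a sketch of the call sequence the new AddContainer logic enables, using the container IDs from the tests below. The exact CPU numbers are illustrative, assuming an 8-CPU single-socket hyper-threaded topology with no reserved CPUs.

    // Init container allocation: gets exclusive CPUs, e.g. {0, 4} for a 2-CPU request.
    _ = policy.AddContainer(st, pod, &pod.Spec.InitContainers[0], "initFakeID")

    // App container allocation: the loop added above finds "initFakeID" in
    // containerMap, releases its CPUs via RemoveContainer, and only then
    // allocates, so the app container may be handed {0, 4} again.
    _ = policy.AddContainer(st, pod, &pod.Spec.Containers[0], "appFakeID")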
diff --git a/pkg/kubelet/cm/cpumanager/policy_static_test.go b/pkg/kubelet/cm/cpumanager/policy_static_test.go
index d4d84d0da19..9103f9c0c85 100644
--- a/pkg/kubelet/cm/cpumanager/policy_static_test.go
+++ b/pkg/kubelet/cm/cpumanager/policy_static_test.go
@@ -41,6 +41,22 @@ type staticPolicyTest struct {
     expPanic        bool
 }
 
+type staticPolicyMultiContainerTest struct {
+    description      string
+    topo             *topology.CPUTopology
+    numReservedCPUs  int
+    initContainerIDs []string
+    containerIDs     []string
+    stAssignments    state.ContainerCPUAssignments
+    stDefaultCPUSet  cpuset.CPUSet
+    pod              *v1.Pod
+    expErr           error
+    expCPUAlloc      bool
+    expInitCSets     []cpuset.CPUSet
+    expCSets         []cpuset.CPUSet
+    expPanic         bool
+}
+
 func TestStaticPolicyName(t *testing.T) {
     policy := NewStaticPolicy(topoSingleSocketHT, 1)
 
@@ -445,6 +461,217 @@ func TestStaticPolicyAdd(t *testing.T) {
     }
 }
 
+func TestStaticPolicyAddWithInitContainers(t *testing.T) {
+    testCases := []staticPolicyMultiContainerTest{
+        {
+            description:      "No Guaranteed Init CPUs",
+            topo:             topoSingleSocketHT,
+            numReservedCPUs:  0,
+            stAssignments:    state.ContainerCPUAssignments{},
+            stDefaultCPUSet:  cpuset.NewCPUSet(0, 1, 2, 3, 4, 5, 6, 7),
+            initContainerIDs: []string{"initFakeID"},
+            containerIDs:     []string{"appFakeID"},
+            pod: makeMultiContainerPod(
+                []struct{ request, limit string }{{"100m", "100m"}},
+                []struct{ request, limit string }{{"4000m", "4000m"}}),
+            expInitCSets: []cpuset.CPUSet{
+                cpuset.NewCPUSet()},
+            expCSets: []cpuset.CPUSet{
+                cpuset.NewCPUSet(0, 4, 1, 5)},
+        },
+        {
+            description:      "Equal Number of Guaranteed CPUs",
+            topo:             topoSingleSocketHT,
+            numReservedCPUs:  0,
+            stAssignments:    state.ContainerCPUAssignments{},
+            stDefaultCPUSet:  cpuset.NewCPUSet(0, 1, 2, 3, 4, 5, 6, 7),
+            initContainerIDs: []string{"initFakeID"},
+            containerIDs:     []string{"appFakeID"},
+            pod: makeMultiContainerPod(
+                []struct{ request, limit string }{{"4000m", "4000m"}},
+                []struct{ request, limit string }{{"4000m", "4000m"}}),
+            expInitCSets: []cpuset.CPUSet{
+                cpuset.NewCPUSet(0, 4, 1, 5)},
+            expCSets: []cpuset.CPUSet{
+                cpuset.NewCPUSet(0, 4, 1, 5)},
+        },
+        {
+            description:      "More Init Container Guaranteed CPUs",
+            topo:             topoSingleSocketHT,
+            numReservedCPUs:  0,
+            stAssignments:    state.ContainerCPUAssignments{},
+            stDefaultCPUSet:  cpuset.NewCPUSet(0, 1, 2, 3, 4, 5, 6, 7),
+            initContainerIDs: []string{"initFakeID"},
+            containerIDs:     []string{"appFakeID"},
+            pod: makeMultiContainerPod(
+                []struct{ request, limit string }{{"6000m", "6000m"}},
+                []struct{ request, limit string }{{"4000m", "4000m"}}),
+            expInitCSets: []cpuset.CPUSet{
+                cpuset.NewCPUSet(0, 4, 1, 5, 2, 6)},
+            expCSets: []cpuset.CPUSet{
+                cpuset.NewCPUSet(0, 4, 1, 5)},
+        },
+        {
+            description:      "Less Init Container Guaranteed CPUs",
+            topo:             topoSingleSocketHT,
+            numReservedCPUs:  0,
+            stAssignments:    state.ContainerCPUAssignments{},
+            stDefaultCPUSet:  cpuset.NewCPUSet(0, 1, 2, 3, 4, 5, 6, 7),
+            initContainerIDs: []string{"initFakeID"},
+            containerIDs:     []string{"appFakeID"},
+            pod: makeMultiContainerPod(
+                []struct{ request, limit string }{{"2000m", "2000m"}},
+                []struct{ request, limit string }{{"4000m", "4000m"}}),
+            expInitCSets: []cpuset.CPUSet{
+                cpuset.NewCPUSet(0, 4)},
+            expCSets: []cpuset.CPUSet{
+                cpuset.NewCPUSet(0, 4, 1, 5)},
+        },
CPUs", + topo: topoSingleSocketHT, + numReservedCPUs: 0, + stAssignments: state.ContainerCPUAssignments{}, + stDefaultCPUSet: cpuset.NewCPUSet(0, 1, 2, 3, 4, 5, 6, 7), + initContainerIDs: []string{"initFakeID-1", "initFakeID-2"}, + containerIDs: []string{"appFakeID"}, + pod: makeMultiContainerPod( + []struct{ request, limit string }{ + {"2000m", "2000m"}, + {"2000m", "2000m"}}, + []struct{ request, limit string }{ + {"2000m", "2000m"}}), + expInitCSets: []cpuset.CPUSet{ + cpuset.NewCPUSet(0, 4), + cpuset.NewCPUSet(0, 4)}, + expCSets: []cpuset.CPUSet{ + cpuset.NewCPUSet(0, 4)}, + }, + { + description: "Multi Init Container Less CPUs", + topo: topoSingleSocketHT, + numReservedCPUs: 0, + stAssignments: state.ContainerCPUAssignments{}, + stDefaultCPUSet: cpuset.NewCPUSet(0, 1, 2, 3, 4, 5, 6, 7), + initContainerIDs: []string{"initFakeID-1", "initFakeID-2"}, + containerIDs: []string{"appFakeID"}, + pod: makeMultiContainerPod( + []struct{ request, limit string }{ + {"4000m", "4000m"}, + {"4000m", "4000m"}}, + []struct{ request, limit string }{ + {"2000m", "2000m"}}), + expInitCSets: []cpuset.CPUSet{ + cpuset.NewCPUSet(0, 4, 1, 5), + cpuset.NewCPUSet(0, 4, 1, 5)}, + expCSets: []cpuset.CPUSet{ + cpuset.NewCPUSet(0, 4)}, + }, + { + description: "Multi Init Container More CPUs", + topo: topoSingleSocketHT, + numReservedCPUs: 0, + stAssignments: state.ContainerCPUAssignments{}, + stDefaultCPUSet: cpuset.NewCPUSet(0, 1, 2, 3, 4, 5, 6, 7), + initContainerIDs: []string{"initFakeID-1", "initFakeID-2"}, + containerIDs: []string{"appFakeID"}, + pod: makeMultiContainerPod( + []struct{ request, limit string }{ + {"2000m", "2000m"}, + {"2000m", "2000m"}}, + []struct{ request, limit string }{ + {"4000m", "4000m"}}), + expInitCSets: []cpuset.CPUSet{ + cpuset.NewCPUSet(0, 4), + cpuset.NewCPUSet(0, 4)}, + expCSets: []cpuset.CPUSet{ + cpuset.NewCPUSet(0, 4, 1, 5)}, + }, + { + description: "Multi Init Container Increasing CPUs", + topo: topoSingleSocketHT, + numReservedCPUs: 0, + stAssignments: state.ContainerCPUAssignments{}, + stDefaultCPUSet: cpuset.NewCPUSet(0, 1, 2, 3, 4, 5, 6, 7), + initContainerIDs: []string{"initFakeID-1", "initFakeID-2"}, + containerIDs: []string{"appFakeID"}, + pod: makeMultiContainerPod( + []struct{ request, limit string }{ + {"2000m", "2000m"}, + {"4000m", "4000m"}}, + []struct{ request, limit string }{ + {"6000m", "6000m"}}), + expInitCSets: []cpuset.CPUSet{ + cpuset.NewCPUSet(0, 4), + cpuset.NewCPUSet(0, 4, 1, 5)}, + expCSets: []cpuset.CPUSet{ + cpuset.NewCPUSet(0, 4, 1, 5, 2, 6)}, + }, + { + description: "Multi Init, Multi App Container Split CPUs", + topo: topoSingleSocketHT, + numReservedCPUs: 0, + stAssignments: state.ContainerCPUAssignments{}, + stDefaultCPUSet: cpuset.NewCPUSet(0, 1, 2, 3, 4, 5, 6, 7), + initContainerIDs: []string{"initFakeID-1", "initFakeID-2"}, + containerIDs: []string{"appFakeID-1", "appFakeID-2"}, + pod: makeMultiContainerPod( + []struct{ request, limit string }{ + {"2000m", "2000m"}, + {"4000m", "4000m"}}, + []struct{ request, limit string }{ + {"2000m", "2000m"}, + {"2000m", "2000m"}}), + expInitCSets: []cpuset.CPUSet{ + cpuset.NewCPUSet(0, 4), + cpuset.NewCPUSet(0, 4, 1, 5)}, + expCSets: []cpuset.CPUSet{ + cpuset.NewCPUSet(0, 4), + cpuset.NewCPUSet(1, 5)}, + }, + } + + for _, testCase := range testCases { + policy := NewStaticPolicy(testCase.topo, testCase.numReservedCPUs) + + st := &mockState{ + assignments: testCase.stAssignments, + defaultCPUSet: testCase.stDefaultCPUSet, + } + + containers := append( + testCase.pod.Spec.InitContainers, + 
+        containers := append(
+            testCase.pod.Spec.InitContainers,
+            testCase.pod.Spec.Containers...)
+
+        containerIDs := append(
+            testCase.initContainerIDs,
+            testCase.containerIDs...)
+
+        expCSets := append(
+            testCase.expInitCSets,
+            testCase.expCSets...)
+
+        for i := range containers {
+            err := policy.AddContainer(st, testCase.pod, &containers[i], containerIDs[i])
+            if err != nil {
+                t.Errorf("StaticPolicy AddContainer() error (%v). unexpected error for container id: %v: %v",
+                    testCase.description, containerIDs[i], err)
+            }
+
+            cset, found := st.assignments[containerIDs[i]]
+            if !expCSets[i].IsEmpty() && !found {
+                t.Errorf("StaticPolicy AddContainer() error (%v). expected container id %v to be present in assignments %v",
+                    testCase.description, containerIDs[i], st.assignments)
+            }
+
+            if found && !cset.Equals(expCSets[i]) {
+                t.Errorf("StaticPolicy AddContainer() error (%v). expected cpuset %v for container %v but got %v",
+                    testCase.description, expCSets[i], containerIDs[i], cset)
+            }
+        }
+    }
+}
+
 func TestStaticPolicyRemove(t *testing.T) {
     testCases := []staticPolicyTest{
         {