diff --git a/pkg/kubelet/cm/cpumanager/cpu_manager_test.go b/pkg/kubelet/cm/cpumanager/cpu_manager_test.go
index 65bb88bf0f0..f6563bbca23 100644
--- a/pkg/kubelet/cm/cpumanager/cpu_manager_test.go
+++ b/pkg/kubelet/cm/cpumanager/cpu_manager_test.go
@@ -214,6 +214,54 @@ func makeMultiContainerPod(initCPUs, appCPUs []struct{ request, limit string })
 	return pod
 }
 
+func makeMultiContainerPodWithOptions(initCPUs, appCPUs []*containerOptions) *v1.Pod {
+	pod := &v1.Pod{
+		ObjectMeta: metav1.ObjectMeta{
+			Name: "pod",
+			UID:  "podUID",
+		},
+		Spec: v1.PodSpec{
+			InitContainers: []v1.Container{},
+			Containers:     []v1.Container{},
+		},
+	}
+
+	for i, cpu := range initCPUs {
+		pod.Spec.InitContainers = append(pod.Spec.InitContainers, v1.Container{
+			Name: "initContainer-" + strconv.Itoa(i),
+			Resources: v1.ResourceRequirements{
+				Requests: v1.ResourceList{
+					v1.ResourceName(v1.ResourceCPU):    resource.MustParse(cpu.request),
+					v1.ResourceName(v1.ResourceMemory): resource.MustParse("1G"),
+				},
+				Limits: v1.ResourceList{
+					v1.ResourceName(v1.ResourceCPU):    resource.MustParse(cpu.limit),
+					v1.ResourceName(v1.ResourceMemory): resource.MustParse("1G"),
+				},
+			},
+			RestartPolicy: &cpu.restartPolicy,
+		})
+	}
+
+	for i, cpu := range appCPUs {
+		pod.Spec.Containers = append(pod.Spec.Containers, v1.Container{
+			Name: "appContainer-" + strconv.Itoa(i),
+			Resources: v1.ResourceRequirements{
+				Requests: v1.ResourceList{
+					v1.ResourceName(v1.ResourceCPU):    resource.MustParse(cpu.request),
+					v1.ResourceName(v1.ResourceMemory): resource.MustParse("1G"),
+				},
+				Limits: v1.ResourceList{
+					v1.ResourceName(v1.ResourceCPU):    resource.MustParse(cpu.limit),
+					v1.ResourceName(v1.ResourceMemory): resource.MustParse("1G"),
+				},
+			},
+		})
+	}
+
+	return pod
+}
+
 func TestCPUManagerAdd(t *testing.T) {
 	testPolicy, _ := NewStaticPolicy(
 		&topology.CPUTopology{
diff --git a/pkg/kubelet/cm/cpumanager/policy_static.go b/pkg/kubelet/cm/cpumanager/policy_static.go
index 0f72e64dbc8..d22a6a64d5e 100644
--- a/pkg/kubelet/cm/cpumanager/policy_static.go
+++ b/pkg/kubelet/cm/cpumanager/policy_static.go
@@ -30,6 +30,7 @@ import (
 	"k8s.io/kubernetes/pkg/kubelet/cm/topologymanager"
 	"k8s.io/kubernetes/pkg/kubelet/cm/topologymanager/bitmask"
 	"k8s.io/kubernetes/pkg/kubelet/metrics"
+	"k8s.io/kubernetes/pkg/kubelet/types"
 	"k8s.io/utils/cpuset"
 )
 
@@ -277,6 +278,12 @@ func (p *staticPolicy) updateCPUsToReuse(pod *v1.Pod, container *v1.Container, c
 	// If so, add its cpuset to the cpuset of reusable CPUs for any new allocations.
 	for _, initContainer := range pod.Spec.InitContainers {
 		if container.Name == initContainer.Name {
+			if types.IsRestartableInitContainer(&initContainer) {
+				// If the container is a restartable init container, we should not
+				// reuse its cpuset, as a restartable init container can run with
+				// regular containers.
+				break
+			}
 			p.cpusToReuse[string(pod.UID)] = p.cpusToReuse[string(pod.UID)].Union(cset)
 			return
 		}
@@ -444,15 +451,21 @@ func (p *staticPolicy) guaranteedCPUs(pod *v1.Pod, container *v1.Container) int
 func (p *staticPolicy) podGuaranteedCPUs(pod *v1.Pod) int {
 	// The maximum of requested CPUs by init containers.
 	requestedByInitContainers := 0
+	requestedByRestartableInitContainers := 0
 	for _, container := range pod.Spec.InitContainers {
 		if _, ok := container.Resources.Requests[v1.ResourceCPU]; !ok {
 			continue
 		}
 		requestedCPU := p.guaranteedCPUs(pod, &container)
-		if requestedCPU > requestedByInitContainers {
-			requestedByInitContainers = requestedCPU
+		// See https://github.com/kubernetes/enhancements/tree/master/keps/sig-node/753-sidecar-containers#resources-calculation-for-scheduling-and-pod-admission
+		// for the detail.
+		if types.IsRestartableInitContainer(&container) {
+			requestedByRestartableInitContainers += requestedCPU
+		} else if requestedByRestartableInitContainers+requestedCPU > requestedByInitContainers {
+			requestedByInitContainers = requestedByRestartableInitContainers + requestedCPU
 		}
 	}
+
 	// The sum of requested CPUs by app containers.
 	requestedByAppContainers := 0
 	for _, container := range pod.Spec.Containers {
@@ -462,10 +475,11 @@ func (p *staticPolicy) podGuaranteedCPUs(pod *v1.Pod) int {
 		requestedByAppContainers += p.guaranteedCPUs(pod, &container)
 	}
 
-	if requestedByInitContainers > requestedByAppContainers {
+	requestedByLongRunningContainers := requestedByAppContainers + requestedByRestartableInitContainers
+	if requestedByInitContainers > requestedByLongRunningContainers {
 		return requestedByInitContainers
 	}
-	return requestedByAppContainers
+	return requestedByLongRunningContainers
 }
 
 func (p *staticPolicy) takeByTopology(availableCPUs cpuset.CPUSet, numCPUs int) (cpuset.CPUSet, error) {
diff --git a/pkg/kubelet/cm/cpumanager/policy_static_test.go b/pkg/kubelet/cm/cpumanager/policy_static_test.go
index b0f14ae04e7..8cc3fc2be74 100644
--- a/pkg/kubelet/cm/cpumanager/policy_static_test.go
+++ b/pkg/kubelet/cm/cpumanager/policy_static_test.go
@@ -716,6 +716,51 @@ func TestStaticPolicyReuseCPUs(t *testing.T) {
 	}
 }
 
+func TestStaticPolicyDoNotReuseCPUs(t *testing.T) {
+	testCases := []struct {
+		staticPolicyTest
+		expCSetAfterAlloc cpuset.CPUSet
+	}{
+		{
+			staticPolicyTest: staticPolicyTest{
+				description: "SingleSocketHT, Don't reuse CPUs of a restartable init container",
+				topo:        topoSingleSocketHT,
+				pod: makeMultiContainerPodWithOptions(
+					[]*containerOptions{
+						{request: "4000m", limit: "4000m", restartPolicy: v1.ContainerRestartPolicyAlways}}, // 0, 1, 4, 5
+					[]*containerOptions{
+						{request: "2000m", limit: "2000m"}}), // 2, 6
+				stAssignments:   state.ContainerCPUAssignments{},
+				stDefaultCPUSet: cpuset.New(0, 1, 2, 3, 4, 5, 6, 7),
+			},
+			expCSetAfterAlloc: cpuset.New(3, 7),
+		},
+	}
+
+	for _, testCase := range testCases {
+		policy, _ := NewStaticPolicy(testCase.topo, testCase.numReservedCPUs, cpuset.New(), topologymanager.NewFakeManager(), nil)
+
+		st := &mockState{
+			assignments:   testCase.stAssignments,
+			defaultCPUSet: testCase.stDefaultCPUSet,
+		}
+		pod := testCase.pod
+
+		// allocate
+		for _, container := range append(pod.Spec.InitContainers, pod.Spec.Containers...) {
+			err := policy.Allocate(st, pod, &container)
+			if err != nil {
+				t.Errorf("StaticPolicy Allocate() error (%v). expected no error but got %v",
+					testCase.description, err)
+			}
+		}
+		if !reflect.DeepEqual(st.defaultCPUSet, testCase.expCSetAfterAlloc) {
+			t.Errorf("StaticPolicy Allocate() error (%v). expected default cpuset %v but got %v",
+				testCase.description, testCase.expCSetAfterAlloc, st.defaultCPUSet)
+		}
+	}
+}
+
 func TestStaticPolicyRemove(t *testing.T) {
 	testCases := []staticPolicyTest{
 		{
diff --git a/pkg/kubelet/cm/cpumanager/topology_hints_test.go b/pkg/kubelet/cm/cpumanager/topology_hints_test.go
index 13455e53bd2..53738b613c2 100644
--- a/pkg/kubelet/cm/cpumanager/topology_hints_test.go
+++ b/pkg/kubelet/cm/cpumanager/topology_hints_test.go
@@ -65,11 +65,14 @@ func returnMachineInfo() cadvisorapi.MachineInfo {
 	}
 }
 
+type containerOptions struct {
+	request       string
+	limit         string
+	restartPolicy v1.ContainerRestartPolicy
+}
+
 func TestPodGuaranteedCPUs(t *testing.T) {
-	CPUs := [][]struct {
-		request string
-		limit   string
-	}{
+	options := [][]*containerOptions{
 		{
 			{request: "0", limit: "0"},
 		},
@@ -85,17 +88,46 @@ func TestPodGuaranteedCPUs(t *testing.T) {
 		},
 	}
 	// tc for not guaranteed Pod
-	testPod1 := makeMultiContainerPod(CPUs[0], CPUs[0])
-	testPod2 := makeMultiContainerPod(CPUs[0], CPUs[1])
-	testPod3 := makeMultiContainerPod(CPUs[1], CPUs[0])
+	testPod1 := makeMultiContainerPodWithOptions(options[0], options[0])
+	testPod2 := makeMultiContainerPodWithOptions(options[0], options[1])
+	testPod3 := makeMultiContainerPodWithOptions(options[1], options[0])
 	// tc for guaranteed Pod
-	testPod4 := makeMultiContainerPod(CPUs[1], CPUs[1])
-	testPod5 := makeMultiContainerPod(CPUs[2], CPUs[2])
+	testPod4 := makeMultiContainerPodWithOptions(options[1], options[1])
+	testPod5 := makeMultiContainerPodWithOptions(options[2], options[2])
 	// tc for comparing init containers and user containers
-	testPod6 := makeMultiContainerPod(CPUs[1], CPUs[2])
-	testPod7 := makeMultiContainerPod(CPUs[2], CPUs[1])
+	testPod6 := makeMultiContainerPodWithOptions(options[1], options[2])
+	testPod7 := makeMultiContainerPodWithOptions(options[2], options[1])
 	// tc for multi containers
-	testPod8 := makeMultiContainerPod(CPUs[3], CPUs[3])
+	testPod8 := makeMultiContainerPodWithOptions(options[3], options[3])
+	// tc for restartable init containers
+	testPod9 := makeMultiContainerPodWithOptions([]*containerOptions{
+		{request: "1", limit: "1", restartPolicy: v1.ContainerRestartPolicyAlways},
+	}, []*containerOptions{
+		{request: "1", limit: "1"},
+	})
+	testPod10 := makeMultiContainerPodWithOptions([]*containerOptions{
+		{request: "5", limit: "5"},
+		{request: "1", limit: "1", restartPolicy: v1.ContainerRestartPolicyAlways},
+		{request: "2", limit: "2", restartPolicy: v1.ContainerRestartPolicyAlways},
+		{request: "3", limit: "3", restartPolicy: v1.ContainerRestartPolicyAlways},
+	}, []*containerOptions{
+		{request: "1", limit: "1"},
+	})
+	testPod11 := makeMultiContainerPodWithOptions([]*containerOptions{
+		{request: "5", limit: "5"},
+		{request: "1", limit: "1", restartPolicy: v1.ContainerRestartPolicyAlways},
+		{request: "2", limit: "2", restartPolicy: v1.ContainerRestartPolicyAlways},
+		{request: "5", limit: "5"},
+		{request: "3", limit: "3", restartPolicy: v1.ContainerRestartPolicyAlways},
+	}, []*containerOptions{
+		{request: "1", limit: "1"},
+	})
+	testPod12 := makeMultiContainerPodWithOptions([]*containerOptions{
+		{request: "10", limit: "10", restartPolicy: v1.ContainerRestartPolicyAlways},
+		{request: "200", limit: "200"},
+	}, []*containerOptions{
+		{request: "100", limit: "100"},
+	})
 
 	p := staticPolicy{}
 
@@ -144,13 +176,35 @@ func TestPodGuaranteedCPUs(t *testing.T) {
 			pod:         testPod8,
 			expectedCPU: 6,
 		},
+		{
+			name:        "TestCase09: restartable init container + regular container",
+			pod:         testPod9,
+			expectedCPU: 2,
+		},
+		{
+			name:        "TestCase10: multiple restartable init containers",
+			pod:         testPod10,
+			expectedCPU: 7,
+		},
+		{
+			name:        "TestCase11: multiple restartable and regular init containers",
+			pod:         testPod11,
+			expectedCPU: 8,
+		},
+		{
+			name:        "TestCase12: restartable init, regular init and regular container",
+			pod:         testPod12,
+			expectedCPU: 210,
+		},
 	}
 	for _, tc := range tcases {
-		requestedCPU := p.podGuaranteedCPUs(tc.pod)
+		t.Run(tc.name, func(t *testing.T) {
+			requestedCPU := p.podGuaranteedCPUs(tc.pod)
 
-		if requestedCPU != tc.expectedCPU {
-			t.Errorf("Expected in result to be %v , got %v", tc.expectedCPU, requestedCPU)
-		}
+			if requestedCPU != tc.expectedCPU {
+				t.Errorf("Expected in result to be %v , got %v", tc.expectedCPU, requestedCPU)
+			}
+		})
 	}
 }
 
diff --git a/test/e2e_node/cpu_manager_test.go b/test/e2e_node/cpu_manager_test.go
index 80515f47186..80ffa35d265 100644
--- a/test/e2e_node/cpu_manager_test.go
+++ b/test/e2e_node/cpu_manager_test.go
@@ -44,9 +44,10 @@ import (
 
 // Helper for makeCPUManagerPod().
 type ctnAttribute struct {
-	ctnName    string
-	cpuRequest string
-	cpuLimit   string
+	ctnName       string
+	cpuRequest    string
+	cpuLimit      string
+	restartPolicy *v1.ContainerRestartPolicy
 }
 
 // makeCPUMangerPod returns a pod with the provided ctnAttributes.
@@ -83,6 +84,63 @@ func makeCPUManagerPod(podName string, ctnAttributes []ctnAttribute) *v1.Pod {
 	}
 }
 
+// makeCPUManagerInitContainersPod returns a pod with init containers with the
+// provided ctnAttributes.
+func makeCPUManagerInitContainersPod(podName string, ctnAttributes []ctnAttribute) *v1.Pod {
+	var containers []v1.Container
+	cpusetCmd := "grep Cpus_allowed_list /proc/self/status | cut -f2"
+	cpusetAndSleepCmd := "grep Cpus_allowed_list /proc/self/status | cut -f2 && sleep 1d"
+	for _, ctnAttr := range ctnAttributes {
+		ctn := v1.Container{
+			Name:  ctnAttr.ctnName,
+			Image: busyboxImage,
+			Resources: v1.ResourceRequirements{
+				Requests: v1.ResourceList{
+					v1.ResourceCPU:    resource.MustParse(ctnAttr.cpuRequest),
+					v1.ResourceMemory: resource.MustParse("100Mi"),
+				},
+				Limits: v1.ResourceList{
+					v1.ResourceCPU:    resource.MustParse(ctnAttr.cpuLimit),
+					v1.ResourceMemory: resource.MustParse("100Mi"),
+				},
+			},
+			Command:       []string{"sh", "-c", cpusetCmd},
+			RestartPolicy: ctnAttr.restartPolicy,
+		}
+		if ctnAttr.restartPolicy != nil && *ctnAttr.restartPolicy == v1.ContainerRestartPolicyAlways {
+			ctn.Command = []string{"sh", "-c", cpusetAndSleepCmd}
+		}
+		containers = append(containers, ctn)
+	}
+
+	return &v1.Pod{
+		ObjectMeta: metav1.ObjectMeta{
+			Name: podName,
+		},
+		Spec: v1.PodSpec{
+			RestartPolicy:  v1.RestartPolicyNever,
+			InitContainers: containers,
+			Containers: []v1.Container{
+				{
+					Name:  "regular",
+					Image: busyboxImage,
+					Resources: v1.ResourceRequirements{
+						Requests: v1.ResourceList{
+							v1.ResourceCPU:    resource.MustParse("1000m"),
+							v1.ResourceMemory: resource.MustParse("100Mi"),
+						},
+						Limits: v1.ResourceList{
+							v1.ResourceCPU:    resource.MustParse("1000m"),
+							v1.ResourceMemory: resource.MustParse("100Mi"),
+						},
+					},
+					Command: []string{"sh", "-c", cpusetAndSleepCmd},
+				},
+			},
+		},
+	}
+}
+
 func deletePodSyncByName(ctx context.Context, f *framework.Framework, podName string) {
 	gp := int64(0)
 	delOpts := metav1.DeleteOptions{
@@ -645,6 +703,76 @@ func runCPUManagerTests(f *framework.Framework) {
 		runSMTAlignmentPositiveTests(ctx, f, smtLevel)
 	})
 
+	ginkgo.It("should not reuse CPUs of restartable init containers [NodeAlphaFeature:SidecarContainers]", func(ctx context.Context) {
+		cpuCap, cpuAlloc, _ = getLocalNodeCPUDetails(ctx, f)
+
+		// Skip rest of the tests if CPU capacity < 3.
+		if cpuCap < 3 {
+			e2eskipper.Skipf("Skipping rest of the CPU Manager tests since CPU capacity < 3, got %d", cpuCap)
+		}
+
+		// Enable CPU Manager in the kubelet.
+		newCfg := configureCPUManagerInKubelet(oldCfg, &cpuManagerKubeletArguments{
+			policyName:         string(cpumanager.PolicyStatic),
+			reservedSystemCPUs: cpuset.CPUSet{},
+		})
+		updateKubeletConfig(ctx, f, newCfg, true)
+
+		ginkgo.By("running a Gu pod with a regular init container and a restartable init container")
+		ctrAttrs := []ctnAttribute{
+			{
+				ctnName:    "gu-init-container1",
+				cpuRequest: "1000m",
+				cpuLimit:   "1000m",
+			},
+			{
+				ctnName:       "gu-restartable-init-container2",
+				cpuRequest:    "1000m",
+				cpuLimit:      "1000m",
+				restartPolicy: &containerRestartPolicyAlways,
+			},
+		}
+		pod := makeCPUManagerInitContainersPod("gu-pod", ctrAttrs)
+		pod = e2epod.NewPodClient(f).CreateSync(ctx, pod)
+
+		ginkgo.By("checking if the expected cpuset was assigned")
+		logs, err := e2epod.GetPodLogs(ctx, f.ClientSet, f.Namespace.Name, pod.Name, pod.Spec.InitContainers[0].Name)
+		framework.ExpectNoError(err, "expected log not found in init container [%s] of pod [%s]", pod.Spec.InitContainers[0].Name, pod.Name)
+
+		framework.Logf("got pod logs: %v", logs)
+		reusableCPUs, err := cpuset.Parse(strings.TrimSpace(logs))
+		framework.ExpectNoError(err, "parsing cpuset from logs for [%s] of pod [%s]", pod.Spec.InitContainers[0].Name, pod.Name)
+
+		gomega.Expect(reusableCPUs.Size()).To(gomega.Equal(1), "expected cpu set size == 1, got %q", reusableCPUs.String())
+
+		logs, err = e2epod.GetPodLogs(ctx, f.ClientSet, f.Namespace.Name, pod.Name, pod.Spec.InitContainers[1].Name)
+		framework.ExpectNoError(err, "expected log not found in init container [%s] of pod [%s]", pod.Spec.InitContainers[1].Name, pod.Name)
+
+		framework.Logf("got pod logs: %v", logs)
+		nonReusableCPUs, err := cpuset.Parse(strings.TrimSpace(logs))
+		framework.ExpectNoError(err, "parsing cpuset from logs for [%s] of pod [%s]", pod.Spec.InitContainers[1].Name, pod.Name)
+
+		gomega.Expect(nonReusableCPUs.Size()).To(gomega.Equal(1), "expected cpu set size == 1, got %q", nonReusableCPUs.String())
+
+		logs, err = e2epod.GetPodLogs(ctx, f.ClientSet, f.Namespace.Name, pod.Name, pod.Spec.Containers[0].Name)
+		framework.ExpectNoError(err, "expected log not found in container [%s] of pod [%s]", pod.Spec.Containers[0].Name, pod.Name)
+
+		framework.Logf("got pod logs: %v", logs)
+		cpus, err := cpuset.Parse(strings.TrimSpace(logs))
+		framework.ExpectNoError(err, "parsing cpuset from logs for [%s] of pod [%s]", pod.Spec.Containers[0].Name, pod.Name)
+
+		gomega.Expect(cpus.Size()).To(gomega.Equal(1), "expected cpu set size == 1, got %q", cpus.String())
+
+		gomega.Expect(reusableCPUs.Equals(nonReusableCPUs)).To(gomega.BeTrue(), "expected reusable cpuset [%s] to be equal to non-reusable cpuset [%s]", reusableCPUs.String(), nonReusableCPUs.String())
+		gomega.Expect(nonReusableCPUs.Intersection(cpus).IsEmpty()).To(gomega.BeTrue(), "expected non-reusable cpuset [%s] to be disjoint from cpuset [%s]", nonReusableCPUs.String(), cpus.String())
+
+		ginkgo.By("by deleting the pods and waiting for container removal")
+		deletePods(ctx, f, []string{pod.Name})
+		waitForContainerRemoval(ctx, pod.Spec.InitContainers[0].Name, pod.Name, pod.Namespace)
+		waitForContainerRemoval(ctx, pod.Spec.InitContainers[1].Name, pod.Name, pod.Namespace)
+		waitForContainerRemoval(ctx, pod.Spec.Containers[0].Name, pod.Name, pod.Namespace)
+	})
+
 	ginkgo.AfterEach(func(ctx context.Context) {
 		updateKubeletConfig(ctx, f, oldCfg, true)
 	})
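
A minimal standalone sketch (not part of the patch; the ctr type and helper names below are illustrative only) of the pod-level accounting that the new podGuaranteedCPUs logic implements, following the KEP-753 formula referenced in the diff: restartable init containers keep running alongside the app containers, so their requests are summed into the long-running total, while each regular init container needs its own request on top of the restartable init containers declared before it.

package main

import "fmt"

// ctr is an illustrative stand-in for a container's guaranteed CPU count and
// whether it is a restartable (RestartPolicy: Always) init container.
type ctr struct {
	cpus        int
	restartable bool
}

// podGuaranteedCPUs mirrors the accounting in the patched
// staticPolicy.podGuaranteedCPUs: the result is the larger of
//   - the peak demand during initialization (a regular init container plus all
//     restartable init containers declared before it), and
//   - the long-running demand (app containers plus all restartable init containers).
func podGuaranteedCPUs(inits, apps []ctr) int {
	requestedByInit, requestedByRestartable := 0, 0
	for _, c := range inits {
		if c.restartable {
			requestedByRestartable += c.cpus
		} else if requestedByRestartable+c.cpus > requestedByInit {
			requestedByInit = requestedByRestartable + c.cpus
		}
	}
	requestedByApp := 0
	for _, c := range apps {
		requestedByApp += c.cpus
	}
	if requestedByInit > requestedByApp+requestedByRestartable {
		return requestedByInit
	}
	return requestedByApp + requestedByRestartable
}

func main() {
	// Same shape as testPod11 above: the second regular init container ("5")
	// runs after restartable inits 1+2, so initialization peaks at 3+5 = 8,
	// while the long-running total is app 1 + restartable 1+2+3 = 7.
	inits := []ctr{{5, false}, {1, true}, {2, true}, {5, false}, {3, true}}
	apps := []ctr{{1, false}}
	fmt.Println(podGuaranteedCPUs(inits, apps)) // prints 8, matching expectedCPU for testPod11
}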