Merge pull request #119447 from gjkim42/do-not-reuse-cpu-set-of-restartable-init-container

Don't reuse CPU set of a restartable init container
Kubernetes Prow Robot 2023-10-31 19:15:26 +01:00 committed by GitHub
commit bfeb3c2621
5 changed files with 312 additions and 23 deletions


@@ -214,6 +214,54 @@ func makeMultiContainerPod(initCPUs, appCPUs []struct{ request, limit string })
return pod
}
func makeMultiContainerPodWithOptions(initCPUs, appCPUs []*containerOptions) *v1.Pod {
pod := &v1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: "pod",
UID: "podUID",
},
Spec: v1.PodSpec{
InitContainers: []v1.Container{},
Containers: []v1.Container{},
},
}
for i, cpu := range initCPUs {
pod.Spec.InitContainers = append(pod.Spec.InitContainers, v1.Container{
Name: "initContainer-" + strconv.Itoa(i),
Resources: v1.ResourceRequirements{
Requests: v1.ResourceList{
v1.ResourceName(v1.ResourceCPU): resource.MustParse(cpu.request),
v1.ResourceName(v1.ResourceMemory): resource.MustParse("1G"),
},
Limits: v1.ResourceList{
v1.ResourceName(v1.ResourceCPU): resource.MustParse(cpu.limit),
v1.ResourceName(v1.ResourceMemory): resource.MustParse("1G"),
},
},
RestartPolicy: &cpu.restartPolicy,
})
}
for i, cpu := range appCPUs {
pod.Spec.Containers = append(pod.Spec.Containers, v1.Container{
Name: "appContainer-" + strconv.Itoa(i),
Resources: v1.ResourceRequirements{
Requests: v1.ResourceList{
v1.ResourceName(v1.ResourceCPU): resource.MustParse(cpu.request),
v1.ResourceName(v1.ResourceMemory): resource.MustParse("1G"),
},
Limits: v1.ResourceList{
v1.ResourceName(v1.ResourceCPU): resource.MustParse(cpu.limit),
v1.ResourceName(v1.ResourceMemory): resource.MustParse("1G"),
},
},
})
}
return pod
}
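For illustration, a minimal usage sketch of the new helper (the CPU values here are arbitrary; only containerOptions, its restartPolicy field, and v1.ContainerRestartPolicyAlways come from this change):

	// Builds a pod with one restartable (sidecar-style) init container and one app container.
	pod := makeMultiContainerPodWithOptions(
		[]*containerOptions{
			{request: "1000m", limit: "1000m", restartPolicy: v1.ContainerRestartPolicyAlways},
		},
		[]*containerOptions{
			{request: "2000m", limit: "2000m"},
		})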
func TestCPUManagerAdd(t *testing.T) {
testPolicy, _ := NewStaticPolicy(
&topology.CPUTopology{


@@ -30,6 +30,7 @@ import (
"k8s.io/kubernetes/pkg/kubelet/cm/topologymanager"
"k8s.io/kubernetes/pkg/kubelet/cm/topologymanager/bitmask"
"k8s.io/kubernetes/pkg/kubelet/metrics"
"k8s.io/kubernetes/pkg/kubelet/types"
"k8s.io/utils/cpuset"
)
@@ -277,6 +278,12 @@ func (p *staticPolicy) updateCPUsToReuse(pod *v1.Pod, container *v1.Container, c
// If so, add its cpuset to the cpuset of reusable CPUs for any new allocations.
for _, initContainer := range pod.Spec.InitContainers {
if container.Name == initContainer.Name {
if types.IsRestartableInitContainer(&initContainer) {
// If the container is a restartable init container, we should not
// reuse its cpuset, as a restartable init container can run with
// regular containers.
break
}
p.cpusToReuse[string(pod.UID)] = p.cpusToReuse[string(pod.UID)].Union(cset)
return
}
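The types.IsRestartableInitContainer predicate used above checks the container-level restart policy introduced for sidecar containers; a rough sketch of what it evaluates (the real helper lives in k8s.io/kubernetes/pkg/kubelet/types):

	// An init container is "restartable" (a sidecar) when its RestartPolicy is
	// explicitly set to Always; a nil policy means a regular init container.
	func isRestartableInitContainer(c *v1.Container) bool {
		return c.RestartPolicy != nil && *c.RestartPolicy == v1.ContainerRestartPolicyAlways
	}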
@@ -444,15 +451,21 @@ func (p *staticPolicy) guaranteedCPUs(pod *v1.Pod, container *v1.Container) int
func (p *staticPolicy) podGuaranteedCPUs(pod *v1.Pod) int {
// The maximum of requested CPUs by init containers.
requestedByInitContainers := 0
requestedByRestartableInitContainers := 0
for _, container := range pod.Spec.InitContainers {
if _, ok := container.Resources.Requests[v1.ResourceCPU]; !ok {
continue
}
requestedCPU := p.guaranteedCPUs(pod, &container)
// See https://github.com/kubernetes/enhancements/tree/master/keps/sig-node/753-sidecar-containers#resources-calculation-for-scheduling-and-pod-admission
// for the detail.
if types.IsRestartableInitContainer(&container) {
requestedByRestartableInitContainers += requestedCPU
} else if requestedByRestartableInitContainers+requestedCPU > requestedByInitContainers {
requestedByInitContainers = requestedByRestartableInitContainers + requestedCPU
}
}
// The sum of requested CPUs by app containers.
requestedByAppContainers := 0
for _, container := range pod.Spec.Containers {
@@ -462,10 +475,11 @@ func (p *staticPolicy) podGuaranteedCPUs(pod *v1.Pod) int {
requestedByAppContainers += p.guaranteedCPUs(pod, &container)
}
requestedByLongRunningContainers := requestedByAppContainers + requestedByRestartableInitContainers
if requestedByInitContainers > requestedByLongRunningContainers {
return requestedByInitContainers
}
return requestedByLongRunningContainers
}
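As a worked example (testPod10 in the tests below): with init containers requesting 5 CPUs (regular) followed by 1, 2 and 3 CPUs (all restartable) and an app container requesting 1 CPU, the loop yields requestedByInitContainers = 0+5 = 5 and requestedByRestartableInitContainers = 1+2+3 = 6, so requestedByLongRunningContainers = 1+6 = 7 and the pod is granted max(5, 7) = 7 exclusive CPUs.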
func (p *staticPolicy) takeByTopology(availableCPUs cpuset.CPUSet, numCPUs int) (cpuset.CPUSet, error) {


@@ -716,6 +716,51 @@ func TestStaticPolicyReuseCPUs(t *testing.T) {
}
}
func TestStaticPolicyDoNotReuseCPUs(t *testing.T) {
testCases := []struct {
staticPolicyTest
expCSetAfterAlloc cpuset.CPUSet
}{
{
staticPolicyTest: staticPolicyTest{
description: "SingleSocketHT, Don't reuse CPUs of a restartable init container",
topo: topoSingleSocketHT,
pod: makeMultiContainerPodWithOptions(
[]*containerOptions{
{request: "4000m", limit: "4000m", restartPolicy: v1.ContainerRestartPolicyAlways}}, // 0, 1, 4, 5
[]*containerOptions{
{request: "2000m", limit: "2000m"}}), // 2, 6
stAssignments: state.ContainerCPUAssignments{},
stDefaultCPUSet: cpuset.New(0, 1, 2, 3, 4, 5, 6, 7),
},
expCSetAfterAlloc: cpuset.New(3, 7),
},
}
for _, testCase := range testCases {
policy, _ := NewStaticPolicy(testCase.topo, testCase.numReservedCPUs, cpuset.New(), topologymanager.NewFakeManager(), nil)
st := &mockState{
assignments: testCase.stAssignments,
defaultCPUSet: testCase.stDefaultCPUSet,
}
pod := testCase.pod
// allocate
for _, container := range append(pod.Spec.InitContainers, pod.Spec.Containers...) {
err := policy.Allocate(st, pod, &container)
if err != nil {
t.Errorf("StaticPolicy Allocate() error (%v). expected no error but got %v",
testCase.description, err)
}
}
if !reflect.DeepEqual(st.defaultCPUSet, testCase.expCSetAfterAlloc) {
t.Errorf("StaticPolicy Allocate() error (%v). expected default cpuset %v but got %v",
testCase.description, testCase.expCSetAfterAlloc, st.defaultCPUSet)
}
}
}
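In this test the restartable init container takes CPUs 0, 1, 4 and 5 exclusively; since its cpuset is no longer added to the reuse pool, the app container is allocated from the remaining CPUs (2 and 6), leaving 3 and 7 in the default set. With a regular init container, the app container would have reused the init container's CPUs instead.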
func TestStaticPolicyRemove(t *testing.T) {
testCases := []staticPolicyTest{
{


@@ -65,11 +65,14 @@ func returnMachineInfo() cadvisorapi.MachineInfo {
}
}
type containerOptions struct {
request string
limit string
restartPolicy v1.ContainerRestartPolicy
}
func TestPodGuaranteedCPUs(t *testing.T) {
options := [][]*containerOptions{
{
{request: "0", limit: "0"},
},
@@ -85,17 +88,46 @@ func TestPodGuaranteedCPUs(t *testing.T) {
},
}
// tc for not guaranteed Pod
testPod1 := makeMultiContainerPodWithOptions(options[0], options[0])
testPod2 := makeMultiContainerPodWithOptions(options[0], options[1])
testPod3 := makeMultiContainerPodWithOptions(options[1], options[0])
// tc for guaranteed Pod
testPod4 := makeMultiContainerPodWithOptions(options[1], options[1])
testPod5 := makeMultiContainerPodWithOptions(options[2], options[2])
// tc for comparing init containers and user containers
testPod6 := makeMultiContainerPodWithOptions(options[1], options[2])
testPod7 := makeMultiContainerPodWithOptions(options[2], options[1])
// tc for multi containers
testPod8 := makeMultiContainerPodWithOptions(options[3], options[3])
// tc for restartable init containers
testPod9 := makeMultiContainerPodWithOptions([]*containerOptions{
{request: "1", limit: "1", restartPolicy: v1.ContainerRestartPolicyAlways},
}, []*containerOptions{
{request: "1", limit: "1"},
})
testPod10 := makeMultiContainerPodWithOptions([]*containerOptions{
{request: "5", limit: "5"},
{request: "1", limit: "1", restartPolicy: v1.ContainerRestartPolicyAlways},
{request: "2", limit: "2", restartPolicy: v1.ContainerRestartPolicyAlways},
{request: "3", limit: "3", restartPolicy: v1.ContainerRestartPolicyAlways},
}, []*containerOptions{
{request: "1", limit: "1"},
})
testPod11 := makeMultiContainerPodWithOptions([]*containerOptions{
{request: "5", limit: "5"},
{request: "1", limit: "1", restartPolicy: v1.ContainerRestartPolicyAlways},
{request: "2", limit: "2", restartPolicy: v1.ContainerRestartPolicyAlways},
{request: "5", limit: "5"},
{request: "3", limit: "3", restartPolicy: v1.ContainerRestartPolicyAlways},
}, []*containerOptions{
{request: "1", limit: "1"},
})
testPod12 := makeMultiContainerPodWithOptions([]*containerOptions{
{request: "10", limit: "10", restartPolicy: v1.ContainerRestartPolicyAlways},
{request: "200", limit: "200"},
}, []*containerOptions{
{request: "100", limit: "100"},
})
p := staticPolicy{}
@@ -144,13 +176,35 @@ func TestPodGuaranteedCPUs(t *testing.T) {
pod: testPod8,
expectedCPU: 6,
},
{
name: "TestCase09: restartable init container + regular container",
pod: testPod9,
expectedCPU: 2,
},
{
name: "TestCase09: multiple restartable init containers",
pod: testPod10,
expectedCPU: 7,
},
{
name: "TestCase11: multiple restartable and regular init containers",
pod: testPod11,
expectedCPU: 8,
},
{
name: "TestCase12: restartable init, regular init and regular container",
pod: testPod12,
expectedCPU: 210,
},
}
for _, tc := range tcases {
t.Run(tc.name, func(t *testing.T) {
requestedCPU := p.podGuaranteedCPUs(tc.pod)
if requestedCPU != tc.expectedCPU {
t.Errorf("Expected in result to be %v , got %v", tc.expectedCPU, requestedCPU)
}
})
}
}


@@ -44,9 +44,10 @@ import (
// Helper for makeCPUManagerPod().
type ctnAttribute struct {
ctnName string
cpuRequest string
cpuLimit string
restartPolicy *v1.ContainerRestartPolicy
}
// makeCPUManagerPod returns a pod with the provided ctnAttributes.
@@ -83,6 +84,63 @@ func makeCPUManagerPod(podName string, ctnAttributes []ctnAttribute) *v1.Pod {
}
}
// makeCPUManagerInitContainersPod returns a pod whose init containers are built from the
// provided ctnAttributes.
func makeCPUManagerInitContainersPod(podName string, ctnAttributes []ctnAttribute) *v1.Pod {
var containers []v1.Container
cpusetCmd := "grep Cpus_allowed_list /proc/self/status | cut -f2"
cpusetAndSleepCmd := "grep Cpus_allowed_list /proc/self/status | cut -f2 && sleep 1d"
for _, ctnAttr := range ctnAttributes {
ctn := v1.Container{
Name: ctnAttr.ctnName,
Image: busyboxImage,
Resources: v1.ResourceRequirements{
Requests: v1.ResourceList{
v1.ResourceCPU: resource.MustParse(ctnAttr.cpuRequest),
v1.ResourceMemory: resource.MustParse("100Mi"),
},
Limits: v1.ResourceList{
v1.ResourceCPU: resource.MustParse(ctnAttr.cpuLimit),
v1.ResourceMemory: resource.MustParse("100Mi"),
},
},
Command: []string{"sh", "-c", cpusetCmd},
RestartPolicy: ctnAttr.restartPolicy,
}
if ctnAttr.restartPolicy != nil && *ctnAttr.restartPolicy == v1.ContainerRestartPolicyAlways {
ctn.Command = []string{"sh", "-c", cpusetAndSleepCmd}
}
containers = append(containers, ctn)
}
return &v1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: podName,
},
Spec: v1.PodSpec{
RestartPolicy: v1.RestartPolicyNever,
InitContainers: containers,
Containers: []v1.Container{
{
Name: "regular",
Image: busyboxImage,
Resources: v1.ResourceRequirements{
Requests: v1.ResourceList{
v1.ResourceCPU: resource.MustParse("1000m"),
v1.ResourceMemory: resource.MustParse("100Mi"),
},
Limits: v1.ResourceList{
v1.ResourceCPU: resource.MustParse("1000m"),
v1.ResourceMemory: resource.MustParse("100Mi"),
},
},
Command: []string{"sh", "-c", cpusetAndSleepCmd},
},
},
},
}
}
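Containers whose restartPolicy is Always run cpusetAndSleepCmd rather than cpusetCmd: the trailing sleep keeps the restartable init container (and the regular app container) running, so their exclusively assigned CPUs remain allocated while the test reads Cpus_allowed_list from the logs.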
func deletePodSyncByName(ctx context.Context, f *framework.Framework, podName string) {
gp := int64(0)
delOpts := metav1.DeleteOptions{
@@ -645,6 +703,76 @@ func runCPUManagerTests(f *framework.Framework) {
runSMTAlignmentPositiveTests(ctx, f, smtLevel)
})
ginkgo.It("should not reuse CPUs of restartable init containers [NodeAlphaFeature:SidecarContainers]", func(ctx context.Context) {
cpuCap, cpuAlloc, _ = getLocalNodeCPUDetails(ctx, f)
// Skip rest of the tests if CPU capacity < 3.
if cpuCap < 3 {
e2eskipper.Skipf("Skipping rest of the CPU Manager tests since CPU capacity < 3, got %d", cpuCap)
}
// Enable CPU Manager in the kubelet.
newCfg := configureCPUManagerInKubelet(oldCfg, &cpuManagerKubeletArguments{
policyName: string(cpumanager.PolicyStatic),
reservedSystemCPUs: cpuset.CPUSet{},
})
updateKubeletConfig(ctx, f, newCfg, true)
ginkgo.By("running a Gu pod with a regular init container and a restartable init container")
ctrAttrs := []ctnAttribute{
{
ctnName: "gu-init-container1",
cpuRequest: "1000m",
cpuLimit: "1000m",
},
{
ctnName: "gu-restartable-init-container2",
cpuRequest: "1000m",
cpuLimit: "1000m",
restartPolicy: &containerRestartPolicyAlways,
},
}
pod := makeCPUManagerInitContainersPod("gu-pod", ctrAttrs)
pod = e2epod.NewPodClient(f).CreateSync(ctx, pod)
ginkgo.By("checking if the expected cpuset was assigned")
logs, err := e2epod.GetPodLogs(ctx, f.ClientSet, f.Namespace.Name, pod.Name, pod.Spec.InitContainers[0].Name)
framework.ExpectNoError(err, "expected log not found in init container [%s] of pod [%s]", pod.Spec.InitContainers[0].Name, pod.Name)
framework.Logf("got pod logs: %v", logs)
reusableCPUs, err := cpuset.Parse(strings.TrimSpace(logs))
framework.ExpectNoError(err, "parsing cpuset from logs for [%s] of pod [%s]", pod.Spec.InitContainers[0].Name, pod.Name)
gomega.Expect(reusableCPUs.Size()).To(gomega.Equal(1), "expected cpu set size == 1, got %q", reusableCPUs.String())
logs, err = e2epod.GetPodLogs(ctx, f.ClientSet, f.Namespace.Name, pod.Name, pod.Spec.InitContainers[1].Name)
framework.ExpectNoError(err, "expected log not found in init container [%s] of pod [%s]", pod.Spec.InitContainers[1].Name, pod.Name)
framework.Logf("got pod logs: %v", logs)
nonReusableCPUs, err := cpuset.Parse(strings.TrimSpace(logs))
framework.ExpectNoError(err, "parsing cpuset from logs for [%s] of pod [%s]", pod.Spec.InitContainers[1].Name, pod.Name)
gomega.Expect(nonReusableCPUs.Size()).To(gomega.Equal(1), "expected cpu set size == 1, got %q", nonReusableCPUs.String())
logs, err = e2epod.GetPodLogs(ctx, f.ClientSet, f.Namespace.Name, pod.Name, pod.Spec.Containers[0].Name)
framework.ExpectNoError(err, "expected log not found in container [%s] of pod [%s]", pod.Spec.Containers[0].Name, pod.Name)
framework.Logf("got pod logs: %v", logs)
cpus, err := cpuset.Parse(strings.TrimSpace(logs))
framework.ExpectNoError(err, "parsing cpuset from logs for [%s] of pod [%s]", pod.Spec.Containers[0].Name, pod.Name)
gomega.Expect(cpus.Size()).To(gomega.Equal(1), "expected cpu set size == 1, got %q", cpus.String())
gomega.Expect(reusableCPUs.Equals(nonReusableCPUs)).To(gomega.BeTrue(), "expected reusable cpuset [%s] to be equal to non-reusable cpuset [%s]", reusableCPUs.String(), nonReusableCPUs.String())
gomega.Expect(nonReusableCPUs.Intersection(cpus).IsEmpty()).To(gomega.BeTrue(), "expected non-reusable cpuset [%s] to be disjoint from cpuset [%s]", nonReusableCPUs.String(), cpus.String())
ginkgo.By("by deleting the pods and waiting for container removal")
deletePods(ctx, f, []string{pod.Name})
waitForContainerRemoval(ctx, pod.Spec.InitContainers[0].Name, pod.Name, pod.Namespace)
waitForContainerRemoval(ctx, pod.Spec.InitContainers[1].Name, pod.Name, pod.Namespace)
waitForContainerRemoval(ctx, pod.Spec.Containers[0].Name, pod.Name, pod.Namespace)
})
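The assertions above encode the intended behavior: the restartable init container may reuse the CPU released by the regular init container (the two logged cpusets are expected to be equal), while the app container must not reuse the restartable init container's CPU (those cpusets are expected to be disjoint), because the sidecar keeps running alongside it.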
ginkgo.AfterEach(func(ctx context.Context) {
updateKubeletConfig(ctx, f, oldCfg, true)
})