Merge pull request #78762 from klueska/upstream-inherit-cpus-from-init-containers

Proactively remove init Containers in CPUManager static policy
Kubernetes Prow Robot 2019-07-30 03:35:18 -07:00 committed by GitHub
commit 320bc21dbe
6 changed files with 457 additions and 4 deletions
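In rough terms: before this change, exclusive CPUs handed to guaranteed init containers stayed assigned after those containers finished. The new containerMap bookkeeping below lets the static policy's AddContainer look up the IDs of a pod's init containers and release their CPUs back to the pool before allocating for an app container. A minimal standalone sketch of that control flow (every type and name here is a hypothetical stand-in, not the real kubelet API):

package main

import "fmt"

// Hypothetical stand-ins for the real policy state: containerID -> CPU
// count, and podUID/containerName -> containerID, as containerMap does.
type fakePolicy struct {
	assignments map[string]int
	containers  map[string]string
}

func (p *fakePolicy) addContainer(podUID, name, id string, numCPUs int, initNames []string) {
	// Init containers are guaranteed to have run to completion before any
	// app container starts, so their exclusive CPUs can be reclaimed here.
	for _, initName := range initNames {
		key := podUID + "/" + initName
		if initID, ok := p.containers[key]; ok && initName != name {
			delete(p.assignments, initID)
			delete(p.containers, key)
		}
	}
	p.assignments[id] = numCPUs
	p.containers[podUID+"/"+name] = id
}

func main() {
	p := &fakePolicy{assignments: map[string]int{}, containers: map[string]string{}}
	p.addContainer("pod-1", "init-1", "cid-init", 4, nil)
	p.addContainer("pod-1", "app-1", "cid-app", 4, []string{"init-1"})
	fmt.Println(p.assignments) // map[cid-app:4]: the init container's CPUs were reused
}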

pkg/kubelet/cm/cpumanager/BUILD

@@ -3,6 +3,7 @@ load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")
go_library(
name = "go_default_library",
srcs = [
"container_map.go",
"cpu_assignment.go",
"cpu_manager.go",
"fake_cpu_manager.go",
@@ -30,6 +31,7 @@ go_library(
go_test(
name = "go_default_test",
srcs = [
"container_map_test.go",
"cpu_assignment_test.go",
"cpu_manager_test.go",
"policy_none_test.go",

pkg/kubelet/cm/cpumanager/container_map.go

@@ -0,0 +1,68 @@
/*
Copyright 2019 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package cpumanager
import (
"fmt"
"k8s.io/api/core/v1"
)
// containerMap maps (podUID, containerName) -> containerID
type containerMap map[string]map[string]string
func newContainerMap() containerMap {
return make(containerMap)
}
func (cm containerMap) Add(p *v1.Pod, c *v1.Container, containerID string) {
podUID := string(p.UID)
if _, exists := cm[podUID]; !exists {
cm[podUID] = make(map[string]string)
}
cm[podUID][c.Name] = containerID
}
func (cm containerMap) Remove(containerID string) {
found := false
for podUID := range cm {
for containerName := range cm[podUID] {
if containerID == cm[podUID][containerName] {
delete(cm[podUID], containerName)
found = true
break
}
}
if len(cm[podUID]) == 0 {
delete(cm, podUID)
}
if found {
break
}
}
}
func (cm containerMap) Get(p *v1.Pod, c *v1.Container) (string, error) {
podUID := string(p.UID)
if _, exists := cm[podUID]; !exists {
return "", fmt.Errorf("pod %s not in containerMap", podUID)
}
if _, exists := cm[podUID][c.Name]; !exists {
return "", fmt.Errorf("container %s not in containerMap for pod %s", c.Name, podUID)
}
return cm[podUID][c.Name], nil
}
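As a quick orientation to the type above: it is a two-level map that prunes itself on removal, so an empty pod entry never lingers. A minimal standalone sketch of the same pattern, using plain strings in place of *v1.Pod and *v1.Container so it runs without the Kubernetes imports:

package main

import "fmt"

// bookkeeping mirrors containerMap above: podUID -> containerName -> containerID.
type bookkeeping map[string]map[string]string

func (b bookkeeping) add(podUID, name, id string) {
	if _, ok := b[podUID]; !ok {
		b[podUID] = make(map[string]string)
	}
	b[podUID][name] = id
}

func (b bookkeeping) remove(id string) {
	for podUID, containers := range b {
		for name, cid := range containers {
			if cid == id {
				delete(containers, name)
				// Prune the pod entry once its last container is gone,
				// as Remove above does.
				if len(containers) == 0 {
					delete(b, podUID)
				}
				return
			}
		}
	}
}

func main() {
	b := bookkeeping{}
	b.add("pod-uid-1", "init-1", "cid-42") // hypothetical IDs
	b.remove("cid-42")
	fmt.Println(len(b)) // 0: the empty pod entry was pruned
}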

pkg/kubelet/cm/cpumanager/container_map_test.go

@@ -0,0 +1,76 @@
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package cpumanager
import (
"testing"
"k8s.io/api/core/v1"
apimachinery "k8s.io/apimachinery/pkg/types"
)
func TestContainerMap(t *testing.T) {
testCases := []struct {
podUID string
containerNames []string
containerIDs []string
}{
{
"fakePodUID",
[]string{"fakeContainerName-1", "fakeContainerName-2"},
[]string{"fakeContainerID-1", "fakeContainerName-2"},
},
}
for _, tc := range testCases {
pod := v1.Pod{}
pod.UID = apimachinery.UID(tc.podUID)
// Build a new containerMap from the testCases, checking proper
// addition and retrieval along the way.
cm := newContainerMap()
for i := range tc.containerNames {
container := v1.Container{Name: tc.containerNames[i]}
cm.Add(&pod, &container, tc.containerIDs[i])
containerID, err := cm.Get(&pod, &container)
if err != nil {
t.Errorf("error adding and retrieving container: %v", err)
}
if containerID != tc.containerIDs[i] {
t.Errorf("mismatched containerIDs %v, %v", containerID, tc.containerIDs[i])
}
}
// Remove all entries from the containerMap, checking proper removal of
// each along the way.
for i := range tc.containerNames {
container := v1.Container{Name: tc.containerNames[i]}
cm.Remove(tc.containerIDs[i])
containerID, err := cm.Get(&pod, &container)
if err == nil {
t.Errorf("unexpected retrieval of containerID after removal: %v", containerID)
}
}
// Verify containerMap now empty.
if len(cm) != 0 {
t.Errorf("unexpected entries still in containerMap: %v", cm)
}
}
}

pkg/kubelet/cm/cpumanager/cpu_manager_test.go

@@ -140,6 +140,49 @@ func makePod(cpuRequest, cpuLimit string) *v1.Pod {
}
}
func makeMultiContainerPod(initCPUs, appCPUs []struct{ request, limit string }) *v1.Pod {
pod := &v1.Pod{
Spec: v1.PodSpec{
InitContainers: []v1.Container{},
Containers: []v1.Container{},
},
}
for i, cpu := range initCPUs {
pod.Spec.InitContainers = append(pod.Spec.InitContainers, v1.Container{
Name: "initContainer-" + string(i),
Resources: v1.ResourceRequirements{
Requests: v1.ResourceList{
v1.ResourceName(v1.ResourceCPU): resource.MustParse(cpu.request),
v1.ResourceName(v1.ResourceMemory): resource.MustParse("1G"),
},
Limits: v1.ResourceList{
v1.ResourceName(v1.ResourceCPU): resource.MustParse(cpu.limit),
v1.ResourceName(v1.ResourceMemory): resource.MustParse("1G"),
},
},
})
}
for i, cpu := range appCPUs {
pod.Spec.Containers = append(pod.Spec.Containers, v1.Container{
Name: "appContainer-" + string(i),
Resources: v1.ResourceRequirements{
Requests: v1.ResourceList{
v1.ResourceName(v1.ResourceCPU): resource.MustParse(cpu.request),
v1.ResourceName(v1.ResourceMemory): resource.MustParse("1G"),
},
Limits: v1.ResourceList{
v1.ResourceName(v1.ResourceCPU): resource.MustParse(cpu.limit),
v1.ResourceName(v1.ResourceMemory): resource.MustParse("1G"),
},
},
})
}
return pod
}
func TestCPUManagerAdd(t *testing.T) {
testPolicy := NewStaticPolicy(
&topology.CPUTopology{

pkg/kubelet/cm/cpumanager/policy_static.go

@@ -73,6 +73,10 @@ type staticPolicy struct {
topology *topology.CPUTopology
// set of CPUs that is not available for exclusive assignment
reserved cpuset.CPUSet
// containerMap provides a mapping from
// (pod, container) -> containerID
// for all containers in a pod
containerMap containerMap
}
// Ensure staticPolicy implements Policy interface
@@ -97,8 +101,9 @@ func NewStaticPolicy(topology *topology.CPUTopology, numReservedCPUs int) Policy
klog.Infof("[cpumanager] reserved %d CPUs (\"%s\") not available for exclusive assignment", reserved.Size(), reserved)
return &staticPolicy{
topology: topology,
reserved: reserved,
topology: topology,
reserved: reserved,
containerMap: newContainerMap(),
}
}
@@ -172,7 +177,15 @@ func (p *staticPolicy) assignableCPUs(s state.State) cpuset.CPUSet {
return s.GetDefaultCPUSet().Difference(p.reserved)
}
func (p *staticPolicy) AddContainer(s state.State, pod *v1.Pod, container *v1.Container, containerID string) error {
func (p *staticPolicy) AddContainer(s state.State, pod *v1.Pod, container *v1.Container, containerID string) (rerr error) {
// So long as this function does not return an error,
// add (pod, container, containerID) to the containerMap.
defer func() {
if rerr == nil {
p.containerMap.Add(pod, container, containerID)
}
}()
if numCPUs := guaranteedCPUs(pod, container); numCPUs != 0 {
klog.Infof("[cpumanager] static policy: AddContainer (pod: %s, container: %s, container id: %s)", pod.Name, container.Name, containerID)
// container belongs in an exclusively allocated pool
@@ -182,6 +195,22 @@ func (p *staticPolicy) AddContainer(s state.State, pod *v1.Pod, container *v1.Co
return nil
}
// Proactively remove CPUs from init containers that have already run.
// They are guaranteed to have run to completion before any other
// container is run.
for _, initContainer := range pod.Spec.InitContainers {
if container.Name != initContainer.Name {
initContainerID, err := p.containerMap.Get(pod, &initContainer)
if err != nil {
continue
}
err = p.RemoveContainer(s, initContainerID)
if err != nil {
klog.Warningf("[cpumanager] unable to remove init container (container id: %s, error: %v)", initContainerID, err)
}
}
}
cpuset, err := p.allocateCPUs(s, numCPUs)
if err != nil {
klog.Errorf("[cpumanager] unable to allocate %d CPUs (container id: %s, error: %v)", numCPUs, containerID, err)
@@ -193,7 +222,15 @@ func (p *staticPolicy) AddContainer(s state.State, pod *v1.Pod, container *v1.Co
return nil
}
func (p *staticPolicy) RemoveContainer(s state.State, containerID string) error {
func (p *staticPolicy) RemoveContainer(s state.State, containerID string) (rerr error) {
// So long as this function does not return an error,
// remove containerID from the containerMap.
defer func() {
if rerr == nil {
p.containerMap.Remove(containerID)
}
}()
klog.Infof("[cpumanager] static policy: RemoveContainer (container id: %s)", containerID)
if toRelease, ok := s.GetCPUSet(containerID); ok {
s.Delete(containerID)

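Both signatures above switch to a named return value (rerr error) so the deferred function can tell whether the call is about to succeed, and only then update the containerMap. A standalone illustration of that Go pattern, with hypothetical names:

package main

import (
	"errors"
	"fmt"
)

// doWork registers itself only when it returns nil, mirroring how
// AddContainer defers containerMap.Add on success above.
func doWork(fail bool) (rerr error) {
	defer func() {
		if rerr == nil {
			fmt.Println("registered") // e.g. p.containerMap.Add(...)
		}
	}()
	if fail {
		return errors.New("allocation failed")
	}
	return nil
}

func main() {
	_ = doWork(false) // prints "registered"
	_ = doWork(true)  // prints nothing; no registration on error
}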
pkg/kubelet/cm/cpumanager/policy_static_test.go

@@ -41,6 +41,22 @@ type staticPolicyTest struct {
expPanic bool
}
type staticPolicyMultiContainerTest struct {
description string
topo *topology.CPUTopology
numReservedCPUs int
initContainerIDs []string
containerIDs []string
stAssignments state.ContainerCPUAssignments
stDefaultCPUSet cpuset.CPUSet
pod *v1.Pod
expErr error
expCPUAlloc bool
expInitCSets []cpuset.CPUSet
expCSets []cpuset.CPUSet
expPanic bool
}
func TestStaticPolicyName(t *testing.T) {
policy := NewStaticPolicy(topoSingleSocketHT, 1)
@@ -445,6 +461,217 @@ func TestStaticPolicyAdd(t *testing.T) {
}
}
func TestStaticPolicyAddWithInitContainers(t *testing.T) {
testCases := []staticPolicyMultiContainerTest{
{
description: "No Guaranteed Init CPUs",
topo: topoSingleSocketHT,
numReservedCPUs: 0,
stAssignments: state.ContainerCPUAssignments{},
stDefaultCPUSet: cpuset.NewCPUSet(0, 1, 2, 3, 4, 5, 6, 7),
initContainerIDs: []string{"initFakeID"},
containerIDs: []string{"appFakeID"},
pod: makeMultiContainerPod(
[]struct{ request, limit string }{{"100m", "100m"}},
[]struct{ request, limit string }{{"4000m", "4000m"}}),
expInitCSets: []cpuset.CPUSet{
cpuset.NewCPUSet()},
expCSets: []cpuset.CPUSet{
cpuset.NewCPUSet(0, 4, 1, 5)},
},
{
description: "Equal Number of Guaranteed CPUs",
topo: topoSingleSocketHT,
numReservedCPUs: 0,
stAssignments: state.ContainerCPUAssignments{},
stDefaultCPUSet: cpuset.NewCPUSet(0, 1, 2, 3, 4, 5, 6, 7),
initContainerIDs: []string{"initFakeID"},
containerIDs: []string{"appFakeID"},
pod: makeMultiContainerPod(
[]struct{ request, limit string }{{"4000m", "4000m"}},
[]struct{ request, limit string }{{"4000m", "4000m"}}),
expInitCSets: []cpuset.CPUSet{
cpuset.NewCPUSet(0, 4, 1, 5)},
expCSets: []cpuset.CPUSet{
cpuset.NewCPUSet(0, 4, 1, 5)},
},
{
description: "More Init Container Guaranteed CPUs",
topo: topoSingleSocketHT,
numReservedCPUs: 0,
stAssignments: state.ContainerCPUAssignments{},
stDefaultCPUSet: cpuset.NewCPUSet(0, 1, 2, 3, 4, 5, 6, 7),
initContainerIDs: []string{"initFakeID"},
containerIDs: []string{"appFakeID"},
pod: makeMultiContainerPod(
[]struct{ request, limit string }{{"6000m", "6000m"}},
[]struct{ request, limit string }{{"4000m", "4000m"}}),
expInitCSets: []cpuset.CPUSet{
cpuset.NewCPUSet(0, 4, 1, 5, 2, 6)},
expCSets: []cpuset.CPUSet{
cpuset.NewCPUSet(0, 4, 1, 5)},
},
{
description: "Less Init Container Guaranteed CPUs",
topo: topoSingleSocketHT,
numReservedCPUs: 0,
stAssignments: state.ContainerCPUAssignments{},
stDefaultCPUSet: cpuset.NewCPUSet(0, 1, 2, 3, 4, 5, 6, 7),
initContainerIDs: []string{"initFakeID"},
containerIDs: []string{"appFakeID"},
pod: makeMultiContainerPod(
[]struct{ request, limit string }{{"2000m", "2000m"}},
[]struct{ request, limit string }{{"4000m", "4000m"}}),
expInitCSets: []cpuset.CPUSet{
cpuset.NewCPUSet(0, 4)},
expCSets: []cpuset.CPUSet{
cpuset.NewCPUSet(0, 4, 1, 5)},
},
{
description: "Multi Init Container Equal CPUs",
topo: topoSingleSocketHT,
numReservedCPUs: 0,
stAssignments: state.ContainerCPUAssignments{},
stDefaultCPUSet: cpuset.NewCPUSet(0, 1, 2, 3, 4, 5, 6, 7),
initContainerIDs: []string{"initFakeID-1", "initFakeID-2"},
containerIDs: []string{"appFakeID"},
pod: makeMultiContainerPod(
[]struct{ request, limit string }{
{"2000m", "2000m"},
{"2000m", "2000m"}},
[]struct{ request, limit string }{
{"2000m", "2000m"}}),
expInitCSets: []cpuset.CPUSet{
cpuset.NewCPUSet(0, 4),
cpuset.NewCPUSet(0, 4)},
expCSets: []cpuset.CPUSet{
cpuset.NewCPUSet(0, 4)},
},
{
description: "Multi Init Container Less CPUs",
topo: topoSingleSocketHT,
numReservedCPUs: 0,
stAssignments: state.ContainerCPUAssignments{},
stDefaultCPUSet: cpuset.NewCPUSet(0, 1, 2, 3, 4, 5, 6, 7),
initContainerIDs: []string{"initFakeID-1", "initFakeID-2"},
containerIDs: []string{"appFakeID"},
pod: makeMultiContainerPod(
[]struct{ request, limit string }{
{"4000m", "4000m"},
{"4000m", "4000m"}},
[]struct{ request, limit string }{
{"2000m", "2000m"}}),
expInitCSets: []cpuset.CPUSet{
cpuset.NewCPUSet(0, 4, 1, 5),
cpuset.NewCPUSet(0, 4, 1, 5)},
expCSets: []cpuset.CPUSet{
cpuset.NewCPUSet(0, 4)},
},
{
description: "Multi Init Container More CPUs",
topo: topoSingleSocketHT,
numReservedCPUs: 0,
stAssignments: state.ContainerCPUAssignments{},
stDefaultCPUSet: cpuset.NewCPUSet(0, 1, 2, 3, 4, 5, 6, 7),
initContainerIDs: []string{"initFakeID-1", "initFakeID-2"},
containerIDs: []string{"appFakeID"},
pod: makeMultiContainerPod(
[]struct{ request, limit string }{
{"2000m", "2000m"},
{"2000m", "2000m"}},
[]struct{ request, limit string }{
{"4000m", "4000m"}}),
expInitCSets: []cpuset.CPUSet{
cpuset.NewCPUSet(0, 4),
cpuset.NewCPUSet(0, 4)},
expCSets: []cpuset.CPUSet{
cpuset.NewCPUSet(0, 4, 1, 5)},
},
{
description: "Multi Init Container Increasing CPUs",
topo: topoSingleSocketHT,
numReservedCPUs: 0,
stAssignments: state.ContainerCPUAssignments{},
stDefaultCPUSet: cpuset.NewCPUSet(0, 1, 2, 3, 4, 5, 6, 7),
initContainerIDs: []string{"initFakeID-1", "initFakeID-2"},
containerIDs: []string{"appFakeID"},
pod: makeMultiContainerPod(
[]struct{ request, limit string }{
{"2000m", "2000m"},
{"4000m", "4000m"}},
[]struct{ request, limit string }{
{"6000m", "6000m"}}),
expInitCSets: []cpuset.CPUSet{
cpuset.NewCPUSet(0, 4),
cpuset.NewCPUSet(0, 4, 1, 5)},
expCSets: []cpuset.CPUSet{
cpuset.NewCPUSet(0, 4, 1, 5, 2, 6)},
},
{
description: "Multi Init, Multi App Container Split CPUs",
topo: topoSingleSocketHT,
numReservedCPUs: 0,
stAssignments: state.ContainerCPUAssignments{},
stDefaultCPUSet: cpuset.NewCPUSet(0, 1, 2, 3, 4, 5, 6, 7),
initContainerIDs: []string{"initFakeID-1", "initFakeID-2"},
containerIDs: []string{"appFakeID-1", "appFakeID-2"},
pod: makeMultiContainerPod(
[]struct{ request, limit string }{
{"2000m", "2000m"},
{"4000m", "4000m"}},
[]struct{ request, limit string }{
{"2000m", "2000m"},
{"2000m", "2000m"}}),
expInitCSets: []cpuset.CPUSet{
cpuset.NewCPUSet(0, 4),
cpuset.NewCPUSet(0, 4, 1, 5)},
expCSets: []cpuset.CPUSet{
cpuset.NewCPUSet(0, 4),
cpuset.NewCPUSet(1, 5)},
},
}
for _, testCase := range testCases {
policy := NewStaticPolicy(testCase.topo, testCase.numReservedCPUs)
st := &mockState{
assignments: testCase.stAssignments,
defaultCPUSet: testCase.stDefaultCPUSet,
}
containers := append(
testCase.pod.Spec.InitContainers,
testCase.pod.Spec.Containers...)
containerIDs := append(
testCase.initContainerIDs,
testCase.containerIDs...)
expCSets := append(
testCase.expInitCSets,
testCase.expCSets...)
for i := range containers {
err := policy.AddContainer(st, testCase.pod, &containers[i], containerIDs[i])
if err != nil {
t.Errorf("StaticPolicy AddContainer() error (%v). unexpected error for container id: %v: %v",
testCase.description, containerIDs[i], err)
}
cset, found := st.assignments[containerIDs[i]]
if !expCSets[i].IsEmpty() && !found {
t.Errorf("StaticPolicy AddContainer() error (%v). expected container id %v to be present in assignments %v",
testCase.description, containerIDs[i], st.assignments)
}
if found && !cset.Equals(expCSets[i]) {
t.Errorf("StaticPolicy AddContainer() error (%v). expected cpuset %v for container %v but got %v",
testCase.description, expCSets[i], containerIDs[i], cset)
}
}
}
}
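For readers deriving the expected cpusets above: topoSingleSocketHT models (as I read these fixtures) one socket with four physical cores whose hyperthread siblings pair up as (0,4), (1,5), (2,6), (3,7), and the static policy prefers whole cores, so a 4000m guaranteed container gets {0,4,1,5}. A toy sketch reproducing that ordering for the whole-core requests used in these cases (the real cpu_assignment logic also handles sockets and leftover single threads):

package main

import "fmt"

// takeByCores hands out CPUs whole-core-first on the assumed topology with
// hyperthread sibling pairs (0,4), (1,5), (2,6), (3,7), matching how the
// expected cpusets in the tests above are ordered.
func takeByCores(numCPUs int) []int {
	siblings := [][2]int{{0, 4}, {1, 5}, {2, 6}, {3, 7}}
	var cpus []int
	for _, pair := range siblings {
		if len(cpus) >= numCPUs {
			break
		}
		cpus = append(cpus, pair[0], pair[1])
	}
	return cpus[:numCPUs]
}

func main() {
	fmt.Println(takeByCores(4)) // [0 4 1 5]: two full cores for 4000m
	fmt.Println(takeByCores(6)) // [0 4 1 5 2 6]: three full cores for 6000m
}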
func TestStaticPolicyRemove(t *testing.T) {
testCases := []staticPolicyTest{
{