From 69db36b9583a49d7c32ac8d95998636374dd0a23 Mon Sep 17 00:00:00 2001 From: Artyom Lukianov Date: Mon, 18 Jan 2021 19:59:23 +0200 Subject: [PATCH 1/3] Provide additional methods under the CPUSet - ToSliceInt64 returns sorted slice of cores IDs in int64 format - ToSliceNoSortInt64 returns slice of cores IDs in int64 format Signed-off-by: Artyom Lukianov --- pkg/kubelet/cm/cpuset/cpuset.go | 24 +++++++++++++++++++++++- 1 file changed, 23 insertions(+), 1 deletion(-) diff --git a/pkg/kubelet/cm/cpuset/cpuset.go b/pkg/kubelet/cm/cpuset/cpuset.go index ae2062bd8a8..ad7ea27afa4 100644 --- a/pkg/kubelet/cm/cpuset/cpuset.go +++ b/pkg/kubelet/cm/cpuset/cpuset.go @@ -19,11 +19,12 @@ package cpuset import ( "bytes" "fmt" - "k8s.io/klog/v2" "reflect" "sort" "strconv" "strings" + + "k8s.io/klog/v2" ) // Builder is a mutable builder for CPUSet. Functions that mutate instances @@ -198,6 +199,27 @@ func (s CPUSet) ToSliceNoSort() []int { return result } +// ToSliceInt64 returns an ordered slice of int64 that contains all elements from +// this set +func (s CPUSet) ToSliceInt64() []int64 { + var result []int64 + for cpu := range s.elems { + result = append(result, int64(cpu)) + } + sort.Slice(result, func(i, j int) bool { return result[i] < result[j] }) + return result +} + +// ToSliceNoSortInt64 returns a slice of int64 that contains all elements from +// this set. +func (s CPUSet) ToSliceNoSortInt64() []int64 { + var result []int64 + for cpu := range s.elems { + result = append(result, int64(cpu)) + } + return result +} + // String returns a new string representation of the elements in this CPU set // in canonical linux CPU list format. 
// From 60678a24caa8d339d8955307dc28cfcab5cc2a44 Mon Sep 17 00:00:00 2001 From: Artyom Lukianov Date: Mon, 18 Jan 2021 19:59:35 +0200 Subject: [PATCH 2/3] Update CPU manager GetCPUs method to return CPUSet --- pkg/kubelet/cm/container_manager_linux.go | 2 +- pkg/kubelet/cm/cpumanager/cpu_manager.go | 11 +++-------- pkg/kubelet/cm/cpumanager/fake_cpu_manager.go | 5 +++-- 3 files changed, 7 insertions(+), 11 deletions(-) diff --git a/pkg/kubelet/cm/container_manager_linux.go b/pkg/kubelet/cm/container_manager_linux.go index cc0b4266968..8db1638d179 100644 --- a/pkg/kubelet/cm/container_manager_linux.go +++ b/pkg/kubelet/cm/container_manager_linux.go @@ -1028,7 +1028,7 @@ func (cm *containerManagerImpl) GetDevices(podUID, containerName string) []*podr } func (cm *containerManagerImpl) GetCPUs(podUID, containerName string) []int64 { - return cm.cpuManager.GetCPUs(podUID, containerName) + return cm.cpuManager.GetCPUs(podUID, containerName).ToSliceNoSortInt64() } func (cm *containerManagerImpl) ShouldResetExtendedResourceCapacity() bool { diff --git a/pkg/kubelet/cm/cpumanager/cpu_manager.go b/pkg/kubelet/cm/cpumanager/cpu_manager.go index 44368efc441..b1732fe765e 100644 --- a/pkg/kubelet/cm/cpumanager/cpu_manager.go +++ b/pkg/kubelet/cm/cpumanager/cpu_manager.go @@ -80,7 +80,7 @@ type Manager interface { // GetCPUs implements the podresources.CPUsProvider interface to provide allocated // cpus for the container - GetCPUs(podUID, containerName string) []int64 + GetCPUs(podUID, containerName string) cpuset.CPUSet // GetPodTopologyHints implements the topologymanager.HintProvider Interface // and is consulted to achieve NUMA aware resource alignment per Pod @@ -481,11 +481,6 @@ func (m *manager) updateContainerCPUSet(containerID string, cpus cpuset.CPUSet) }) } -func (m *manager) GetCPUs(podUID, containerName string) []int64 { - cpus := m.state.GetCPUSetOrDefault(string(podUID), containerName) - result := []int64{} - for _, cpu := range cpus.ToSliceNoSort() { - 
result = append(result, int64(cpu)) - } - return result +func (m *manager) GetCPUs(podUID, containerName string) cpuset.CPUSet { + return m.state.GetCPUSetOrDefault(podUID, containerName) } diff --git a/pkg/kubelet/cm/cpumanager/fake_cpu_manager.go b/pkg/kubelet/cm/cpumanager/fake_cpu_manager.go index 437a6af040c..5362fbebb20 100644 --- a/pkg/kubelet/cm/cpumanager/fake_cpu_manager.go +++ b/pkg/kubelet/cm/cpumanager/fake_cpu_manager.go @@ -21,6 +21,7 @@ import ( "k8s.io/klog/v2" "k8s.io/kubernetes/pkg/kubelet/cm/containermap" "k8s.io/kubernetes/pkg/kubelet/cm/cpumanager/state" + "k8s.io/kubernetes/pkg/kubelet/cm/cpuset" "k8s.io/kubernetes/pkg/kubelet/cm/topologymanager" "k8s.io/kubernetes/pkg/kubelet/config" "k8s.io/kubernetes/pkg/kubelet/status" @@ -69,9 +70,9 @@ func (m *fakeManager) State() state.Reader { return m.state } -func (m *fakeManager) GetCPUs(podUID, containerName string) []int64 { +func (m *fakeManager) GetCPUs(podUID, containerName string) cpuset.CPUSet { klog.Infof("[fake cpumanager] GetCPUs(podUID: %s, containerName: %s)", podUID, containerName) - return nil + return cpuset.CPUSet{} } // NewFakeManager creates empty/fake cpu manager From 38dc7509f862f081828e7d9167107b8c6e98ea23 Mon Sep 17 00:00:00 2001 From: Artyom Lukianov Date: Wed, 13 Jan 2021 15:25:19 +0200 Subject: [PATCH 3/3] cpu manager: specify the container CPU set during the creation We can set the container cpuset.cpus during the creation, so there is no need to call update resources after the container creation. An additional side effect of the change is that the runc process that is responsible for creating the container will run with the same CPU affinity, because runc runs on the cpuset provided in the config.json arg. This prevents undesirable interrupts on isolated CPUs. 
Signed-off-by: Artyom Lukianov --- pkg/kubelet/cm/BUILD | 4 +++ pkg/kubelet/cm/cpumanager/cpu_manager.go | 32 ++++------------- pkg/kubelet/cm/cpumanager/cpu_manager_test.go | 26 ++++---------- pkg/kubelet/cm/cpumanager/fake_cpu_manager.go | 3 +- .../cm/fake_internal_container_lifecycle.go | 5 +++ .../cm/internal_container_lifecycle.go | 9 +++-- .../cm/internal_container_lifecycle_linux.go | 35 +++++++++++++++++++ ...nternal_container_lifecycle_unsupported.go | 28 +++++++++++++++ .../internal_container_lifecycle_windows.go | 28 +++++++++++++++ pkg/kubelet/dockershim/helpers_linux.go | 1 + .../kuberuntime/kuberuntime_container.go | 9 +++++ 11 files changed, 128 insertions(+), 52 deletions(-) create mode 100644 pkg/kubelet/cm/internal_container_lifecycle_linux.go create mode 100644 pkg/kubelet/cm/internal_container_lifecycle_unsupported.go create mode 100644 pkg/kubelet/cm/internal_container_lifecycle_windows.go diff --git a/pkg/kubelet/cm/BUILD b/pkg/kubelet/cm/BUILD index 1122bba86f7..44deeaaf887 100644 --- a/pkg/kubelet/cm/BUILD +++ b/pkg/kubelet/cm/BUILD @@ -15,6 +15,9 @@ go_library( "helpers_linux.go", "helpers_unsupported.go", "internal_container_lifecycle.go", + "internal_container_lifecycle_linux.go", + "internal_container_lifecycle_unsupported.go", + "internal_container_lifecycle_windows.go", "node_container_manager_linux.go", "pod_container_manager_linux.go", "pod_container_manager_stub.go", @@ -42,6 +45,7 @@ go_library( "//staging/src/k8s.io/apimachinery/pkg/util/sets:go_default_library", "//staging/src/k8s.io/apiserver/pkg/util/feature:go_default_library", "//staging/src/k8s.io/cri-api/pkg/apis:go_default_library", + "//staging/src/k8s.io/cri-api/pkg/apis/runtime/v1alpha2:go_default_library", "//staging/src/k8s.io/kubelet/pkg/apis/podresources/v1:go_default_library", "//vendor/k8s.io/klog/v2:go_default_library", ] + select({ diff --git a/pkg/kubelet/cm/cpumanager/cpu_manager.go b/pkg/kubelet/cm/cpumanager/cpu_manager.go index b1732fe765e..00f8bb02d1c 
100644 --- a/pkg/kubelet/cm/cpumanager/cpu_manager.go +++ b/pkg/kubelet/cm/cpumanager/cpu_manager.go @@ -60,10 +60,9 @@ type Manager interface { // e.g. at pod admission time. Allocate(pod *v1.Pod, container *v1.Container) error - // AddContainer is called between container create and container start - // so that initial CPU affinity settings can be written through to the - // container runtime before the first process begins to execute. - AddContainer(p *v1.Pod, c *v1.Container, containerID string) error + // AddContainer adds the mapping between container ID to pod UID and the container name + // The mapping used to remove the CPU allocation during the container removal + AddContainer(p *v1.Pod, c *v1.Container, containerID string) // RemoveContainer is called after Kubelet decides to kill or delete a // container. After this call, the CPU manager stops trying to reconcile @@ -237,29 +236,10 @@ func (m *manager) Allocate(p *v1.Pod, c *v1.Container) error { return nil } -func (m *manager) AddContainer(p *v1.Pod, c *v1.Container, containerID string) error { +func (m *manager) AddContainer(pod *v1.Pod, container *v1.Container, containerID string) { m.Lock() - // Get the CPUs assigned to the container during Allocate() - // (or fall back to the default CPUSet if none were assigned). 
- cpus := m.state.GetCPUSetOrDefault(string(p.UID), c.Name) - m.Unlock() - - if !cpus.IsEmpty() { - err := m.updateContainerCPUSet(containerID, cpus) - if err != nil { - klog.Errorf("[cpumanager] AddContainer error: error updating CPUSet for container (pod: %s, container: %s, container id: %s, err: %v)", p.Name, c.Name, containerID, err) - m.Lock() - err := m.policyRemoveContainerByRef(string(p.UID), c.Name) - if err != nil { - klog.Errorf("[cpumanager] AddContainer rollback state error: %v", err) - } - m.Unlock() - } - return err - } - - klog.V(5).Infof("[cpumanager] update container resources is skipped due to cpu set is empty") - return nil + defer m.Unlock() + m.containerMap.Add(string(pod.UID), container.Name, containerID) } func (m *manager) RemoveContainer(containerID string) error { diff --git a/pkg/kubelet/cm/cpumanager/cpu_manager_test.go b/pkg/kubelet/cm/cpumanager/cpu_manager_test.go index 34b170be234..7d7a7fb2b60 100644 --- a/pkg/kubelet/cm/cpumanager/cpu_manager_test.go +++ b/pkg/kubelet/cm/cpumanager/cpu_manager_test.go @@ -252,14 +252,6 @@ func TestCPUManagerAdd(t *testing.T) { expAllocateErr: fmt.Errorf("fake reg error"), expAddContainerErr: nil, }, - { - description: "cpu manager add - container update error", - updateErr: fmt.Errorf("fake update error"), - policy: testPolicy, - expCPUSet: cpuset.NewCPUSet(1, 2, 3, 4), - expAllocateErr: nil, - expAddContainerErr: fmt.Errorf("fake update error"), - }, } for _, testCase := range testCases { @@ -287,7 +279,8 @@ func TestCPUManagerAdd(t *testing.T) { testCase.description, testCase.expAllocateErr, err) } - err = mgr.AddContainer(pod, container, "fakeID") + mgr.AddContainer(pod, container, "fakeID") + _, _, err = mgr.containerMap.GetContainerRef("fakeID") if !reflect.DeepEqual(err, testCase.expAddContainerErr) { t.Errorf("CPU Manager AddContainer() error (%v). 
expected error: %v but got: %v", testCase.description, testCase.expAddContainerErr, err) @@ -520,7 +513,9 @@ func TestCPUManagerAddWithInitContainers(t *testing.T) { t.Errorf("StaticPolicy Allocate() error (%v). unexpected error for container id: %v: %v", testCase.description, containerIDs[i], err) } - err = mgr.AddContainer(testCase.pod, &containers[i], containerIDs[i]) + + mgr.AddContainer(testCase.pod, &containers[i], containerIDs[i]) + _, _, err = mgr.containerMap.GetContainerRef(containerIDs[i]) if err != nil { t.Errorf("StaticPolicy AddContainer() error (%v). unexpected error for container id: %v: %v", testCase.description, containerIDs[i], err) @@ -1018,14 +1013,6 @@ func TestCPUManagerAddWithResvList(t *testing.T) { expAllocateErr: nil, expAddContainerErr: nil, }, - { - description: "cpu manager add - container update error", - updateErr: fmt.Errorf("fake update error"), - policy: testPolicy, - expCPUSet: cpuset.NewCPUSet(0, 1, 2, 3), - expAllocateErr: nil, - expAddContainerErr: fmt.Errorf("fake update error"), - }, } for _, testCase := range testCases { @@ -1053,7 +1040,8 @@ func TestCPUManagerAddWithResvList(t *testing.T) { testCase.description, testCase.expAllocateErr, err) } - err = mgr.AddContainer(pod, container, "fakeID") + mgr.AddContainer(pod, container, "fakeID") + _, _, err = mgr.containerMap.GetContainerRef("fakeID") if !reflect.DeepEqual(err, testCase.expAddContainerErr) { t.Errorf("CPU Manager AddContainer() error (%v). 
expected error: %v but got: %v", testCase.description, testCase.expAddContainerErr, err) diff --git a/pkg/kubelet/cm/cpumanager/fake_cpu_manager.go b/pkg/kubelet/cm/cpumanager/fake_cpu_manager.go index 5362fbebb20..478534b1c1c 100644 --- a/pkg/kubelet/cm/cpumanager/fake_cpu_manager.go +++ b/pkg/kubelet/cm/cpumanager/fake_cpu_manager.go @@ -46,9 +46,8 @@ func (m *fakeManager) Allocate(pod *v1.Pod, container *v1.Container) error { return nil } -func (m *fakeManager) AddContainer(pod *v1.Pod, container *v1.Container, containerID string) error { +func (m *fakeManager) AddContainer(pod *v1.Pod, container *v1.Container, containerID string) { klog.Infof("[fake cpumanager] AddContainer (pod: %s, container: %s, container id: %s)", pod.Name, container.Name, containerID) - return nil } func (m *fakeManager) RemoveContainer(containerID string) error { diff --git a/pkg/kubelet/cm/fake_internal_container_lifecycle.go b/pkg/kubelet/cm/fake_internal_container_lifecycle.go index 4709a904020..153f3377d9a 100644 --- a/pkg/kubelet/cm/fake_internal_container_lifecycle.go +++ b/pkg/kubelet/cm/fake_internal_container_lifecycle.go @@ -18,6 +18,7 @@ package cm import ( "k8s.io/api/core/v1" + runtimeapi "k8s.io/cri-api/pkg/apis/runtime/v1alpha2" ) func NewFakeInternalContainerLifecycle() *fakeInternalContainerLifecycle { @@ -26,6 +27,10 @@ func NewFakeInternalContainerLifecycle() *fakeInternalContainerLifecycle { type fakeInternalContainerLifecycle struct{} +func (f *fakeInternalContainerLifecycle) PreCreateContainer(pod *v1.Pod, container *v1.Container, containerConfig *runtimeapi.ContainerConfig) error { + return nil +} + func (f *fakeInternalContainerLifecycle) PreStartContainer(pod *v1.Pod, container *v1.Container, containerID string) error { return nil } diff --git a/pkg/kubelet/cm/internal_container_lifecycle.go b/pkg/kubelet/cm/internal_container_lifecycle.go index 690718e4e68..0635ea0ed4f 100644 --- a/pkg/kubelet/cm/internal_container_lifecycle.go +++ 
b/pkg/kubelet/cm/internal_container_lifecycle.go @@ -18,14 +18,15 @@ package cm import ( "k8s.io/api/core/v1" - utilfeature "k8s.io/apiserver/pkg/util/feature" + runtimeapi "k8s.io/cri-api/pkg/apis/runtime/v1alpha2" kubefeatures "k8s.io/kubernetes/pkg/features" "k8s.io/kubernetes/pkg/kubelet/cm/cpumanager" "k8s.io/kubernetes/pkg/kubelet/cm/topologymanager" ) type InternalContainerLifecycle interface { + PreCreateContainer(pod *v1.Pod, container *v1.Container, containerConfig *runtimeapi.ContainerConfig) error PreStartContainer(pod *v1.Pod, container *v1.Container, containerID string) error PreStopContainer(containerID string) error PostStopContainer(containerID string) error @@ -39,11 +40,9 @@ type internalContainerLifecycleImpl struct { func (i *internalContainerLifecycleImpl) PreStartContainer(pod *v1.Pod, container *v1.Container, containerID string) error { if i.cpuManager != nil { - err := i.cpuManager.AddContainer(pod, container, containerID) - if err != nil { - return err - } + i.cpuManager.AddContainer(pod, container, containerID) } + if utilfeature.DefaultFeatureGate.Enabled(kubefeatures.TopologyManager) { err := i.topologyManager.AddContainer(pod, containerID) if err != nil { diff --git a/pkg/kubelet/cm/internal_container_lifecycle_linux.go b/pkg/kubelet/cm/internal_container_lifecycle_linux.go new file mode 100644 index 00000000000..dd0a37f086a --- /dev/null +++ b/pkg/kubelet/cm/internal_container_lifecycle_linux.go @@ -0,0 +1,35 @@ +// +build linux + +/* +Copyright 2021 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+See the License for the specific language governing permissions and +limitations under the License. +*/ + +package cm + +import ( + "k8s.io/api/core/v1" + runtimeapi "k8s.io/cri-api/pkg/apis/runtime/v1alpha2" +) + +func (i *internalContainerLifecycleImpl) PreCreateContainer(pod *v1.Pod, container *v1.Container, containerConfig *runtimeapi.ContainerConfig) error { + if i.cpuManager != nil { + allocatedCPUs := i.cpuManager.GetCPUs(string(pod.UID), container.Name) + if !allocatedCPUs.IsEmpty() { + containerConfig.Linux.Resources.CpusetCpus = allocatedCPUs.String() + } + } + + return nil +} diff --git a/pkg/kubelet/cm/internal_container_lifecycle_unsupported.go b/pkg/kubelet/cm/internal_container_lifecycle_unsupported.go new file mode 100644 index 00000000000..676bf9c4c1d --- /dev/null +++ b/pkg/kubelet/cm/internal_container_lifecycle_unsupported.go @@ -0,0 +1,28 @@ +// +build !linux,!windows + +/* +Copyright 2020 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/ + +package cm + +import ( + "k8s.io/api/core/v1" + runtimeapi "k8s.io/cri-api/pkg/apis/runtime/v1alpha2" +) + +func (i *internalContainerLifecycleImpl) PreCreateContainer(pod *v1.Pod, container *v1.Container, containerConfig *runtimeapi.ContainerConfig) error { + return nil +} diff --git a/pkg/kubelet/cm/internal_container_lifecycle_windows.go b/pkg/kubelet/cm/internal_container_lifecycle_windows.go new file mode 100644 index 00000000000..0384d4d2ef4 --- /dev/null +++ b/pkg/kubelet/cm/internal_container_lifecycle_windows.go @@ -0,0 +1,28 @@ +// +build windows + +/* +Copyright 2020 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/ + +package cm + +import ( + "k8s.io/api/core/v1" + runtimeapi "k8s.io/cri-api/pkg/apis/runtime/v1alpha2" +) + +func (i *internalContainerLifecycleImpl) PreCreateContainer(pod *v1.Pod, container *v1.Container, containerConfig *runtimeapi.ContainerConfig) error { + return nil +} diff --git a/pkg/kubelet/dockershim/helpers_linux.go b/pkg/kubelet/dockershim/helpers_linux.go index 9ba71fd5b56..dfc09da575e 100644 --- a/pkg/kubelet/dockershim/helpers_linux.go +++ b/pkg/kubelet/dockershim/helpers_linux.go @@ -118,6 +118,7 @@ func (ds *dockerService) updateCreateConfig( CPUShares: rOpts.CpuShares, CPUQuota: rOpts.CpuQuota, CPUPeriod: rOpts.CpuPeriod, + CpusetCpus: rOpts.CpusetCpus, } createConfig.HostConfig.OomScoreAdj = int(rOpts.OomScoreAdj) } diff --git a/pkg/kubelet/kuberuntime/kuberuntime_container.go b/pkg/kubelet/kuberuntime/kuberuntime_container.go index fd551c3f2e2..cd615c962a4 100644 --- a/pkg/kubelet/kuberuntime/kuberuntime_container.go +++ b/pkg/kubelet/kuberuntime/kuberuntime_container.go @@ -56,6 +56,8 @@ import ( var ( // ErrCreateContainerConfig - failed to create container config ErrCreateContainerConfig = errors.New("CreateContainerConfigError") + // ErrPreCreateHook - failed to execute PreCreateHook + ErrPreCreateHook = errors.New("PreCreateHookError") // ErrCreateContainer - failed to create container ErrCreateContainer = errors.New("CreateContainerError") // ErrPreStartHook - failed to execute PreStartHook @@ -167,6 +169,13 @@ func (m *kubeGenericRuntimeManager) startContainer(podSandboxID string, podSandb return s.Message(), ErrCreateContainerConfig } + err = m.internalLifecycle.PreCreateContainer(pod, container, containerConfig) + if err != nil { + s, _ := grpcstatus.FromError(err) + m.recordContainerEvent(pod, container, "", v1.EventTypeWarning, events.FailedToCreateContainer, "Internal PreCreateContainer hook failed: %v", s.Message()) + return s.Message(), ErrPreCreateHook + } + containerID, err := m.runtimeService.CreateContainer(podSandboxID, 
containerConfig, podSandboxConfig) if err != nil { s, _ := grpcstatus.FromError(err)