diff --git a/pkg/kubelet/cm/BUILD b/pkg/kubelet/cm/BUILD index 1122bba86f7..44deeaaf887 100644 --- a/pkg/kubelet/cm/BUILD +++ b/pkg/kubelet/cm/BUILD @@ -15,6 +15,9 @@ go_library( "helpers_linux.go", "helpers_unsupported.go", "internal_container_lifecycle.go", + "internal_container_lifecycle_linux.go", + "internal_container_lifecycle_unsupported.go", + "internal_container_lifecycle_windows.go", "node_container_manager_linux.go", "pod_container_manager_linux.go", "pod_container_manager_stub.go", @@ -42,6 +45,7 @@ go_library( "//staging/src/k8s.io/apimachinery/pkg/util/sets:go_default_library", "//staging/src/k8s.io/apiserver/pkg/util/feature:go_default_library", "//staging/src/k8s.io/cri-api/pkg/apis:go_default_library", + "//staging/src/k8s.io/cri-api/pkg/apis/runtime/v1alpha2:go_default_library", "//staging/src/k8s.io/kubelet/pkg/apis/podresources/v1:go_default_library", "//vendor/k8s.io/klog/v2:go_default_library", ] + select({ diff --git a/pkg/kubelet/cm/container_manager_linux.go b/pkg/kubelet/cm/container_manager_linux.go index cc0b4266968..8db1638d179 100644 --- a/pkg/kubelet/cm/container_manager_linux.go +++ b/pkg/kubelet/cm/container_manager_linux.go @@ -1028,7 +1028,7 @@ func (cm *containerManagerImpl) GetDevices(podUID, containerName string) []*podr } func (cm *containerManagerImpl) GetCPUs(podUID, containerName string) []int64 { - return cm.cpuManager.GetCPUs(podUID, containerName) + return cm.cpuManager.GetCPUs(podUID, containerName).ToSliceNoSortInt64() } func (cm *containerManagerImpl) ShouldResetExtendedResourceCapacity() bool { diff --git a/pkg/kubelet/cm/cpumanager/cpu_manager.go b/pkg/kubelet/cm/cpumanager/cpu_manager.go index 44368efc441..00f8bb02d1c 100644 --- a/pkg/kubelet/cm/cpumanager/cpu_manager.go +++ b/pkg/kubelet/cm/cpumanager/cpu_manager.go @@ -60,10 +60,9 @@ type Manager interface { // e.g. at pod admission time. Allocate(pod *v1.Pod, container *v1.Container) error - // AddContainer is called between container create and container start - // so that initial CPU affinity settings can be written through to the - // container runtime before the first process begins to execute. - AddContainer(p *v1.Pod, c *v1.Container, containerID string) error + // AddContainer adds the mapping between container ID to pod UID and the container name + // The mapping used to remove the CPU allocation during the container removal + AddContainer(p *v1.Pod, c *v1.Container, containerID string) // RemoveContainer is called after Kubelet decides to kill or delete a // container. After this call, the CPU manager stops trying to reconcile @@ -80,7 +79,7 @@ type Manager interface { // GetCPUs implements the podresources.CPUsProvider interface to provide allocated // cpus for the container - GetCPUs(podUID, containerName string) []int64 + GetCPUs(podUID, containerName string) cpuset.CPUSet // GetPodTopologyHints implements the topologymanager.HintProvider Interface // and is consulted to achieve NUMA aware resource alignment per Pod @@ -237,29 +236,10 @@ func (m *manager) Allocate(p *v1.Pod, c *v1.Container) error { return nil } -func (m *manager) AddContainer(p *v1.Pod, c *v1.Container, containerID string) error { +func (m *manager) AddContainer(pod *v1.Pod, container *v1.Container, containerID string) { m.Lock() - // Get the CPUs assigned to the container during Allocate() - // (or fall back to the default CPUSet if none were assigned). - cpus := m.state.GetCPUSetOrDefault(string(p.UID), c.Name) - m.Unlock() - - if !cpus.IsEmpty() { - err := m.updateContainerCPUSet(containerID, cpus) - if err != nil { - klog.Errorf("[cpumanager] AddContainer error: error updating CPUSet for container (pod: %s, container: %s, container id: %s, err: %v)", p.Name, c.Name, containerID, err) - m.Lock() - err := m.policyRemoveContainerByRef(string(p.UID), c.Name) - if err != nil { - klog.Errorf("[cpumanager] AddContainer rollback state error: %v", err) - } - m.Unlock() - } - return err - } - - klog.V(5).Infof("[cpumanager] update container resources is skipped due to cpu set is empty") - return nil + defer m.Unlock() + m.containerMap.Add(string(pod.UID), container.Name, containerID) } func (m *manager) RemoveContainer(containerID string) error { @@ -481,11 +461,6 @@ func (m *manager) updateContainerCPUSet(containerID string, cpus cpuset.CPUSet) }) } -func (m *manager) GetCPUs(podUID, containerName string) []int64 { - cpus := m.state.GetCPUSetOrDefault(string(podUID), containerName) - result := []int64{} - for _, cpu := range cpus.ToSliceNoSort() { - result = append(result, int64(cpu)) - } - return result +func (m *manager) GetCPUs(podUID, containerName string) cpuset.CPUSet { + return m.state.GetCPUSetOrDefault(podUID, containerName) } diff --git a/pkg/kubelet/cm/cpumanager/cpu_manager_test.go b/pkg/kubelet/cm/cpumanager/cpu_manager_test.go index 34b170be234..7d7a7fb2b60 100644 --- a/pkg/kubelet/cm/cpumanager/cpu_manager_test.go +++ b/pkg/kubelet/cm/cpumanager/cpu_manager_test.go @@ -252,14 +252,6 @@ func TestCPUManagerAdd(t *testing.T) { expAllocateErr: fmt.Errorf("fake reg error"), expAddContainerErr: nil, }, - { - description: "cpu manager add - container update error", - updateErr: fmt.Errorf("fake update error"), - policy: testPolicy, - expCPUSet: cpuset.NewCPUSet(1, 2, 3, 4), - expAllocateErr: nil, - expAddContainerErr: fmt.Errorf("fake update error"), - }, } for _, testCase := range testCases { @@ -287,7 +279,8 @@ func TestCPUManagerAdd(t *testing.T) { testCase.description, testCase.expAllocateErr, err) } - err = mgr.AddContainer(pod, container, "fakeID") + mgr.AddContainer(pod, container, "fakeID") + _, _, err = mgr.containerMap.GetContainerRef("fakeID") if !reflect.DeepEqual(err, testCase.expAddContainerErr) { t.Errorf("CPU Manager AddContainer() error (%v). expected error: %v but got: %v", testCase.description, testCase.expAddContainerErr, err) @@ -520,7 +513,9 @@ func TestCPUManagerAddWithInitContainers(t *testing.T) { t.Errorf("StaticPolicy Allocate() error (%v). unexpected error for container id: %v: %v", testCase.description, containerIDs[i], err) } - err = mgr.AddContainer(testCase.pod, &containers[i], containerIDs[i]) + + mgr.AddContainer(testCase.pod, &containers[i], containerIDs[i]) + _, _, err = mgr.containerMap.GetContainerRef(containerIDs[i]) if err != nil { t.Errorf("StaticPolicy AddContainer() error (%v). unexpected error for container id: %v: %v", testCase.description, containerIDs[i], err) @@ -1018,14 +1013,6 @@ func TestCPUManagerAddWithResvList(t *testing.T) { expAllocateErr: nil, expAddContainerErr: nil, }, - { - description: "cpu manager add - container update error", - updateErr: fmt.Errorf("fake update error"), - policy: testPolicy, - expCPUSet: cpuset.NewCPUSet(0, 1, 2, 3), - expAllocateErr: nil, - expAddContainerErr: fmt.Errorf("fake update error"), - }, } for _, testCase := range testCases { @@ -1053,7 +1040,8 @@ func TestCPUManagerAddWithResvList(t *testing.T) { testCase.description, testCase.expAllocateErr, err) } - err = mgr.AddContainer(pod, container, "fakeID") + mgr.AddContainer(pod, container, "fakeID") + _, _, err = mgr.containerMap.GetContainerRef("fakeID") if !reflect.DeepEqual(err, testCase.expAddContainerErr) { t.Errorf("CPU Manager AddContainer() error (%v). expected error: %v but got: %v", testCase.description, testCase.expAddContainerErr, err) diff --git a/pkg/kubelet/cm/cpumanager/fake_cpu_manager.go b/pkg/kubelet/cm/cpumanager/fake_cpu_manager.go index 437a6af040c..478534b1c1c 100644 --- a/pkg/kubelet/cm/cpumanager/fake_cpu_manager.go +++ b/pkg/kubelet/cm/cpumanager/fake_cpu_manager.go @@ -21,6 +21,7 @@ import ( "k8s.io/klog/v2" "k8s.io/kubernetes/pkg/kubelet/cm/containermap" "k8s.io/kubernetes/pkg/kubelet/cm/cpumanager/state" + "k8s.io/kubernetes/pkg/kubelet/cm/cpuset" "k8s.io/kubernetes/pkg/kubelet/cm/topologymanager" "k8s.io/kubernetes/pkg/kubelet/config" "k8s.io/kubernetes/pkg/kubelet/status" @@ -45,9 +46,8 @@ func (m *fakeManager) Allocate(pod *v1.Pod, container *v1.Container) error { return nil } -func (m *fakeManager) AddContainer(pod *v1.Pod, container *v1.Container, containerID string) error { +func (m *fakeManager) AddContainer(pod *v1.Pod, container *v1.Container, containerID string) { klog.Infof("[fake cpumanager] AddContainer (pod: %s, container: %s, container id: %s)", pod.Name, container.Name, containerID) - return nil } func (m *fakeManager) RemoveContainer(containerID string) error { @@ -69,9 +69,9 @@ func (m *fakeManager) State() state.Reader { return m.state } -func (m *fakeManager) GetCPUs(podUID, containerName string) []int64 { +func (m *fakeManager) GetCPUs(podUID, containerName string) cpuset.CPUSet { klog.Infof("[fake cpumanager] GetCPUs(podUID: %s, containerName: %s)", podUID, containerName) - return nil + return cpuset.CPUSet{} } // NewFakeManager creates empty/fake cpu manager diff --git a/pkg/kubelet/cm/cpuset/cpuset.go b/pkg/kubelet/cm/cpuset/cpuset.go index ae2062bd8a8..ad7ea27afa4 100644 --- a/pkg/kubelet/cm/cpuset/cpuset.go +++ b/pkg/kubelet/cm/cpuset/cpuset.go @@ -19,11 +19,12 @@ package cpuset import ( "bytes" "fmt" - "k8s.io/klog/v2" "reflect" "sort" "strconv" "strings" + + "k8s.io/klog/v2" ) // Builder is a mutable builder for CPUSet. Functions that mutate instances @@ -198,6 +199,27 @@ func (s CPUSet) ToSliceNoSort() []int { return result } +// ToSliceInt64 returns an ordered slice of int64 that contains all elements from +// this set +func (s CPUSet) ToSliceInt64() []int64 { + var result []int64 + for cpu := range s.elems { + result = append(result, int64(cpu)) + } + sort.Slice(result, func(i, j int) bool { return result[i] < result[j] }) + return result +} + +// ToSliceNoSortInt64 returns a slice of int64 that contains all elements from +// this set. +func (s CPUSet) ToSliceNoSortInt64() []int64 { + var result []int64 + for cpu := range s.elems { + result = append(result, int64(cpu)) + } + return result +} + // String returns a new string representation of the elements in this CPU set // in canonical linux CPU list format. // diff --git a/pkg/kubelet/cm/fake_internal_container_lifecycle.go b/pkg/kubelet/cm/fake_internal_container_lifecycle.go index 4709a904020..153f3377d9a 100644 --- a/pkg/kubelet/cm/fake_internal_container_lifecycle.go +++ b/pkg/kubelet/cm/fake_internal_container_lifecycle.go @@ -18,6 +18,7 @@ package cm import ( "k8s.io/api/core/v1" + runtimeapi "k8s.io/cri-api/pkg/apis/runtime/v1alpha2" ) func NewFakeInternalContainerLifecycle() *fakeInternalContainerLifecycle { @@ -26,6 +27,10 @@ func NewFakeInternalContainerLifecycle() *fakeInternalContainerLifecycle { type fakeInternalContainerLifecycle struct{} +func (f *fakeInternalContainerLifecycle) PreCreateContainer(pod *v1.Pod, container *v1.Container, containerConfig *runtimeapi.ContainerConfig) error { + return nil +} + func (f *fakeInternalContainerLifecycle) PreStartContainer(pod *v1.Pod, container *v1.Container, containerID string) error { return nil } diff --git a/pkg/kubelet/cm/internal_container_lifecycle.go b/pkg/kubelet/cm/internal_container_lifecycle.go index 690718e4e68..0635ea0ed4f 100644 --- a/pkg/kubelet/cm/internal_container_lifecycle.go +++ b/pkg/kubelet/cm/internal_container_lifecycle.go @@ -18,14 +18,15 @@ package cm import ( "k8s.io/api/core/v1" - utilfeature "k8s.io/apiserver/pkg/util/feature" + runtimeapi "k8s.io/cri-api/pkg/apis/runtime/v1alpha2" kubefeatures "k8s.io/kubernetes/pkg/features" "k8s.io/kubernetes/pkg/kubelet/cm/cpumanager" "k8s.io/kubernetes/pkg/kubelet/cm/topologymanager" ) type InternalContainerLifecycle interface { + PreCreateContainer(pod *v1.Pod, container *v1.Container, containerConfig *runtimeapi.ContainerConfig) error PreStartContainer(pod *v1.Pod, container *v1.Container, containerID string) error PreStopContainer(containerID string) error PostStopContainer(containerID string) error @@ -39,11 +40,9 @@ type internalContainerLifecycleImpl struct { func (i *internalContainerLifecycleImpl) PreStartContainer(pod *v1.Pod, container *v1.Container, containerID string) error { if i.cpuManager != nil { - err := i.cpuManager.AddContainer(pod, container, containerID) - if err != nil { - return err - } + i.cpuManager.AddContainer(pod, container, containerID) } + if utilfeature.DefaultFeatureGate.Enabled(kubefeatures.TopologyManager) { err := i.topologyManager.AddContainer(pod, containerID) if err != nil { diff --git a/pkg/kubelet/cm/internal_container_lifecycle_linux.go b/pkg/kubelet/cm/internal_container_lifecycle_linux.go new file mode 100644 index 00000000000..dd0a37f086a --- /dev/null +++ b/pkg/kubelet/cm/internal_container_lifecycle_linux.go @@ -0,0 +1,35 @@ +// +build linux + +/* +Copyright 2021 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package cm + +import ( + "k8s.io/api/core/v1" + runtimeapi "k8s.io/cri-api/pkg/apis/runtime/v1alpha2" +) + +func (i *internalContainerLifecycleImpl) PreCreateContainer(pod *v1.Pod, container *v1.Container, containerConfig *runtimeapi.ContainerConfig) error { + if i.cpuManager != nil { + allocatedCPUs := i.cpuManager.GetCPUs(string(pod.UID), container.Name) + if !allocatedCPUs.IsEmpty() { + containerConfig.Linux.Resources.CpusetCpus = allocatedCPUs.String() + } + } + + return nil +} diff --git a/pkg/kubelet/cm/internal_container_lifecycle_unsupported.go b/pkg/kubelet/cm/internal_container_lifecycle_unsupported.go new file mode 100644 index 00000000000..676bf9c4c1d --- /dev/null +++ b/pkg/kubelet/cm/internal_container_lifecycle_unsupported.go @@ -0,0 +1,28 @@ +// +build !linux,!windows + +/* +Copyright 2020 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package cm + +import ( + "k8s.io/api/core/v1" + runtimeapi "k8s.io/cri-api/pkg/apis/runtime/v1alpha2" +) + +func (i *internalContainerLifecycleImpl) PreCreateContainer(pod *v1.Pod, container *v1.Container, containerConfig *runtimeapi.ContainerConfig) error { + return nil +} diff --git a/pkg/kubelet/cm/internal_container_lifecycle_windows.go b/pkg/kubelet/cm/internal_container_lifecycle_windows.go new file mode 100644 index 00000000000..0384d4d2ef4 --- /dev/null +++ b/pkg/kubelet/cm/internal_container_lifecycle_windows.go @@ -0,0 +1,28 @@ +// +build windows + +/* +Copyright 2020 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package cm + +import ( + "k8s.io/api/core/v1" + runtimeapi "k8s.io/cri-api/pkg/apis/runtime/v1alpha2" +) + +func (i *internalContainerLifecycleImpl) PreCreateContainer(pod *v1.Pod, container *v1.Container, containerConfig *runtimeapi.ContainerConfig) error { + return nil +} diff --git a/pkg/kubelet/dockershim/helpers_linux.go b/pkg/kubelet/dockershim/helpers_linux.go index 9ba71fd5b56..dfc09da575e 100644 --- a/pkg/kubelet/dockershim/helpers_linux.go +++ b/pkg/kubelet/dockershim/helpers_linux.go @@ -118,6 +118,7 @@ func (ds *dockerService) updateCreateConfig( CPUShares: rOpts.CpuShares, CPUQuota: rOpts.CpuQuota, CPUPeriod: rOpts.CpuPeriod, + CpusetCpus: rOpts.CpusetCpus, } createConfig.HostConfig.OomScoreAdj = int(rOpts.OomScoreAdj) } diff --git a/pkg/kubelet/kuberuntime/kuberuntime_container.go b/pkg/kubelet/kuberuntime/kuberuntime_container.go index fd551c3f2e2..cd615c962a4 100644 --- a/pkg/kubelet/kuberuntime/kuberuntime_container.go +++ b/pkg/kubelet/kuberuntime/kuberuntime_container.go @@ -56,6 +56,8 @@ import ( var ( // ErrCreateContainerConfig - failed to create container config ErrCreateContainerConfig = errors.New("CreateContainerConfigError") + // ErrPreCreateHook - failed to execute PreCreateHook + ErrPreCreateHook = errors.New("PreCreateHookError") // ErrCreateContainer - failed to create container ErrCreateContainer = errors.New("CreateContainerError") // ErrPreStartHook - failed to execute PreStartHook @@ -167,6 +169,13 @@ func (m *kubeGenericRuntimeManager) startContainer(podSandboxID string, podSandb return s.Message(), ErrCreateContainerConfig } + err = m.internalLifecycle.PreCreateContainer(pod, container, containerConfig) + if err != nil { + s, _ := grpcstatus.FromError(err) + m.recordContainerEvent(pod, container, "", v1.EventTypeWarning, events.FailedToCreateContainer, "Internal PreCreateContainer hook failed: %v", s.Message()) + return s.Message(), ErrPreCreateHook + } + containerID, err := m.runtimeService.CreateContainer(podSandboxID, containerConfig, podSandboxConfig) if err != nil { s, _ := grpcstatus.FromError(err)