mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-07-22 19:31:44 +00:00
Merge pull request #98019 from cynepco3hahue/specify_resource_during_container_creation_cpu_manager
cpu manager: specify the container CPU set during the creation
This commit is contained in:
commit
6fae18523d
@ -15,6 +15,9 @@ go_library(
|
||||
"helpers_linux.go",
|
||||
"helpers_unsupported.go",
|
||||
"internal_container_lifecycle.go",
|
||||
"internal_container_lifecycle_linux.go",
|
||||
"internal_container_lifecycle_unsupported.go",
|
||||
"internal_container_lifecycle_windows.go",
|
||||
"node_container_manager_linux.go",
|
||||
"pod_container_manager_linux.go",
|
||||
"pod_container_manager_stub.go",
|
||||
@ -42,6 +45,7 @@ go_library(
|
||||
"//staging/src/k8s.io/apimachinery/pkg/util/sets:go_default_library",
|
||||
"//staging/src/k8s.io/apiserver/pkg/util/feature:go_default_library",
|
||||
"//staging/src/k8s.io/cri-api/pkg/apis:go_default_library",
|
||||
"//staging/src/k8s.io/cri-api/pkg/apis/runtime/v1alpha2:go_default_library",
|
||||
"//staging/src/k8s.io/kubelet/pkg/apis/podresources/v1:go_default_library",
|
||||
"//vendor/k8s.io/klog/v2:go_default_library",
|
||||
] + select({
|
||||
|
@ -1028,7 +1028,7 @@ func (cm *containerManagerImpl) GetDevices(podUID, containerName string) []*podr
|
||||
}
|
||||
|
||||
func (cm *containerManagerImpl) GetCPUs(podUID, containerName string) []int64 {
|
||||
return cm.cpuManager.GetCPUs(podUID, containerName)
|
||||
return cm.cpuManager.GetCPUs(podUID, containerName).ToSliceNoSortInt64()
|
||||
}
|
||||
|
||||
func (cm *containerManagerImpl) ShouldResetExtendedResourceCapacity() bool {
|
||||
|
@ -60,10 +60,9 @@ type Manager interface {
|
||||
// e.g. at pod admission time.
|
||||
Allocate(pod *v1.Pod, container *v1.Container) error
|
||||
|
||||
// AddContainer is called between container create and container start
|
||||
// so that initial CPU affinity settings can be written through to the
|
||||
// container runtime before the first process begins to execute.
|
||||
AddContainer(p *v1.Pod, c *v1.Container, containerID string) error
|
||||
// AddContainer adds the mapping between container ID to pod UID and the container name
|
||||
// The mapping used to remove the CPU allocation during the container removal
|
||||
AddContainer(p *v1.Pod, c *v1.Container, containerID string)
|
||||
|
||||
// RemoveContainer is called after Kubelet decides to kill or delete a
|
||||
// container. After this call, the CPU manager stops trying to reconcile
|
||||
@ -80,7 +79,7 @@ type Manager interface {
|
||||
|
||||
// GetCPUs implements the podresources.CPUsProvider interface to provide allocated
|
||||
// cpus for the container
|
||||
GetCPUs(podUID, containerName string) []int64
|
||||
GetCPUs(podUID, containerName string) cpuset.CPUSet
|
||||
|
||||
// GetPodTopologyHints implements the topologymanager.HintProvider Interface
|
||||
// and is consulted to achieve NUMA aware resource alignment per Pod
|
||||
@ -237,29 +236,10 @@ func (m *manager) Allocate(p *v1.Pod, c *v1.Container) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (m *manager) AddContainer(p *v1.Pod, c *v1.Container, containerID string) error {
|
||||
func (m *manager) AddContainer(pod *v1.Pod, container *v1.Container, containerID string) {
|
||||
m.Lock()
|
||||
// Get the CPUs assigned to the container during Allocate()
|
||||
// (or fall back to the default CPUSet if none were assigned).
|
||||
cpus := m.state.GetCPUSetOrDefault(string(p.UID), c.Name)
|
||||
m.Unlock()
|
||||
|
||||
if !cpus.IsEmpty() {
|
||||
err := m.updateContainerCPUSet(containerID, cpus)
|
||||
if err != nil {
|
||||
klog.Errorf("[cpumanager] AddContainer error: error updating CPUSet for container (pod: %s, container: %s, container id: %s, err: %v)", p.Name, c.Name, containerID, err)
|
||||
m.Lock()
|
||||
err := m.policyRemoveContainerByRef(string(p.UID), c.Name)
|
||||
if err != nil {
|
||||
klog.Errorf("[cpumanager] AddContainer rollback state error: %v", err)
|
||||
}
|
||||
m.Unlock()
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
klog.V(5).Infof("[cpumanager] update container resources is skipped due to cpu set is empty")
|
||||
return nil
|
||||
defer m.Unlock()
|
||||
m.containerMap.Add(string(pod.UID), container.Name, containerID)
|
||||
}
|
||||
|
||||
func (m *manager) RemoveContainer(containerID string) error {
|
||||
@ -481,11 +461,6 @@ func (m *manager) updateContainerCPUSet(containerID string, cpus cpuset.CPUSet)
|
||||
})
|
||||
}
|
||||
|
||||
func (m *manager) GetCPUs(podUID, containerName string) []int64 {
|
||||
cpus := m.state.GetCPUSetOrDefault(string(podUID), containerName)
|
||||
result := []int64{}
|
||||
for _, cpu := range cpus.ToSliceNoSort() {
|
||||
result = append(result, int64(cpu))
|
||||
}
|
||||
return result
|
||||
func (m *manager) GetCPUs(podUID, containerName string) cpuset.CPUSet {
|
||||
return m.state.GetCPUSetOrDefault(podUID, containerName)
|
||||
}
|
||||
|
@ -252,14 +252,6 @@ func TestCPUManagerAdd(t *testing.T) {
|
||||
expAllocateErr: fmt.Errorf("fake reg error"),
|
||||
expAddContainerErr: nil,
|
||||
},
|
||||
{
|
||||
description: "cpu manager add - container update error",
|
||||
updateErr: fmt.Errorf("fake update error"),
|
||||
policy: testPolicy,
|
||||
expCPUSet: cpuset.NewCPUSet(1, 2, 3, 4),
|
||||
expAllocateErr: nil,
|
||||
expAddContainerErr: fmt.Errorf("fake update error"),
|
||||
},
|
||||
}
|
||||
|
||||
for _, testCase := range testCases {
|
||||
@ -287,7 +279,8 @@ func TestCPUManagerAdd(t *testing.T) {
|
||||
testCase.description, testCase.expAllocateErr, err)
|
||||
}
|
||||
|
||||
err = mgr.AddContainer(pod, container, "fakeID")
|
||||
mgr.AddContainer(pod, container, "fakeID")
|
||||
_, _, err = mgr.containerMap.GetContainerRef("fakeID")
|
||||
if !reflect.DeepEqual(err, testCase.expAddContainerErr) {
|
||||
t.Errorf("CPU Manager AddContainer() error (%v). expected error: %v but got: %v",
|
||||
testCase.description, testCase.expAddContainerErr, err)
|
||||
@ -520,7 +513,9 @@ func TestCPUManagerAddWithInitContainers(t *testing.T) {
|
||||
t.Errorf("StaticPolicy Allocate() error (%v). unexpected error for container id: %v: %v",
|
||||
testCase.description, containerIDs[i], err)
|
||||
}
|
||||
err = mgr.AddContainer(testCase.pod, &containers[i], containerIDs[i])
|
||||
|
||||
mgr.AddContainer(testCase.pod, &containers[i], containerIDs[i])
|
||||
_, _, err = mgr.containerMap.GetContainerRef(containerIDs[i])
|
||||
if err != nil {
|
||||
t.Errorf("StaticPolicy AddContainer() error (%v). unexpected error for container id: %v: %v",
|
||||
testCase.description, containerIDs[i], err)
|
||||
@ -1018,14 +1013,6 @@ func TestCPUManagerAddWithResvList(t *testing.T) {
|
||||
expAllocateErr: nil,
|
||||
expAddContainerErr: nil,
|
||||
},
|
||||
{
|
||||
description: "cpu manager add - container update error",
|
||||
updateErr: fmt.Errorf("fake update error"),
|
||||
policy: testPolicy,
|
||||
expCPUSet: cpuset.NewCPUSet(0, 1, 2, 3),
|
||||
expAllocateErr: nil,
|
||||
expAddContainerErr: fmt.Errorf("fake update error"),
|
||||
},
|
||||
}
|
||||
|
||||
for _, testCase := range testCases {
|
||||
@ -1053,7 +1040,8 @@ func TestCPUManagerAddWithResvList(t *testing.T) {
|
||||
testCase.description, testCase.expAllocateErr, err)
|
||||
}
|
||||
|
||||
err = mgr.AddContainer(pod, container, "fakeID")
|
||||
mgr.AddContainer(pod, container, "fakeID")
|
||||
_, _, err = mgr.containerMap.GetContainerRef("fakeID")
|
||||
if !reflect.DeepEqual(err, testCase.expAddContainerErr) {
|
||||
t.Errorf("CPU Manager AddContainer() error (%v). expected error: %v but got: %v",
|
||||
testCase.description, testCase.expAddContainerErr, err)
|
||||
|
@ -21,6 +21,7 @@ import (
|
||||
"k8s.io/klog/v2"
|
||||
"k8s.io/kubernetes/pkg/kubelet/cm/containermap"
|
||||
"k8s.io/kubernetes/pkg/kubelet/cm/cpumanager/state"
|
||||
"k8s.io/kubernetes/pkg/kubelet/cm/cpuset"
|
||||
"k8s.io/kubernetes/pkg/kubelet/cm/topologymanager"
|
||||
"k8s.io/kubernetes/pkg/kubelet/config"
|
||||
"k8s.io/kubernetes/pkg/kubelet/status"
|
||||
@ -45,9 +46,8 @@ func (m *fakeManager) Allocate(pod *v1.Pod, container *v1.Container) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (m *fakeManager) AddContainer(pod *v1.Pod, container *v1.Container, containerID string) error {
|
||||
func (m *fakeManager) AddContainer(pod *v1.Pod, container *v1.Container, containerID string) {
|
||||
klog.Infof("[fake cpumanager] AddContainer (pod: %s, container: %s, container id: %s)", pod.Name, container.Name, containerID)
|
||||
return nil
|
||||
}
|
||||
|
||||
func (m *fakeManager) RemoveContainer(containerID string) error {
|
||||
@ -69,9 +69,9 @@ func (m *fakeManager) State() state.Reader {
|
||||
return m.state
|
||||
}
|
||||
|
||||
func (m *fakeManager) GetCPUs(podUID, containerName string) []int64 {
|
||||
func (m *fakeManager) GetCPUs(podUID, containerName string) cpuset.CPUSet {
|
||||
klog.Infof("[fake cpumanager] GetCPUs(podUID: %s, containerName: %s)", podUID, containerName)
|
||||
return nil
|
||||
return cpuset.CPUSet{}
|
||||
}
|
||||
|
||||
// NewFakeManager creates empty/fake cpu manager
|
||||
|
@ -19,11 +19,12 @@ package cpuset
|
||||
import (
|
||||
"bytes"
|
||||
"fmt"
|
||||
"k8s.io/klog/v2"
|
||||
"reflect"
|
||||
"sort"
|
||||
"strconv"
|
||||
"strings"
|
||||
|
||||
"k8s.io/klog/v2"
|
||||
)
|
||||
|
||||
// Builder is a mutable builder for CPUSet. Functions that mutate instances
|
||||
@ -198,6 +199,27 @@ func (s CPUSet) ToSliceNoSort() []int {
|
||||
return result
|
||||
}
|
||||
|
||||
// ToSliceInt64 returns an ordered slice of int64 that contains all elements from
|
||||
// this set
|
||||
func (s CPUSet) ToSliceInt64() []int64 {
|
||||
var result []int64
|
||||
for cpu := range s.elems {
|
||||
result = append(result, int64(cpu))
|
||||
}
|
||||
sort.Slice(result, func(i, j int) bool { return result[i] < result[j] })
|
||||
return result
|
||||
}
|
||||
|
||||
// ToSliceNoSortInt64 returns a slice of int64 that contains all elements from
|
||||
// this set.
|
||||
func (s CPUSet) ToSliceNoSortInt64() []int64 {
|
||||
var result []int64
|
||||
for cpu := range s.elems {
|
||||
result = append(result, int64(cpu))
|
||||
}
|
||||
return result
|
||||
}
|
||||
|
||||
// String returns a new string representation of the elements in this CPU set
|
||||
// in canonical linux CPU list format.
|
||||
//
|
||||
|
@ -18,6 +18,7 @@ package cm
|
||||
|
||||
import (
|
||||
"k8s.io/api/core/v1"
|
||||
runtimeapi "k8s.io/cri-api/pkg/apis/runtime/v1alpha2"
|
||||
)
|
||||
|
||||
func NewFakeInternalContainerLifecycle() *fakeInternalContainerLifecycle {
|
||||
@ -26,6 +27,10 @@ func NewFakeInternalContainerLifecycle() *fakeInternalContainerLifecycle {
|
||||
|
||||
type fakeInternalContainerLifecycle struct{}
|
||||
|
||||
func (f *fakeInternalContainerLifecycle) PreCreateContainer(pod *v1.Pod, container *v1.Container, containerConfig *runtimeapi.ContainerConfig) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (f *fakeInternalContainerLifecycle) PreStartContainer(pod *v1.Pod, container *v1.Container, containerID string) error {
|
||||
return nil
|
||||
}
|
||||
|
@ -18,14 +18,15 @@ package cm
|
||||
|
||||
import (
|
||||
"k8s.io/api/core/v1"
|
||||
|
||||
utilfeature "k8s.io/apiserver/pkg/util/feature"
|
||||
runtimeapi "k8s.io/cri-api/pkg/apis/runtime/v1alpha2"
|
||||
kubefeatures "k8s.io/kubernetes/pkg/features"
|
||||
"k8s.io/kubernetes/pkg/kubelet/cm/cpumanager"
|
||||
"k8s.io/kubernetes/pkg/kubelet/cm/topologymanager"
|
||||
)
|
||||
|
||||
type InternalContainerLifecycle interface {
|
||||
PreCreateContainer(pod *v1.Pod, container *v1.Container, containerConfig *runtimeapi.ContainerConfig) error
|
||||
PreStartContainer(pod *v1.Pod, container *v1.Container, containerID string) error
|
||||
PreStopContainer(containerID string) error
|
||||
PostStopContainer(containerID string) error
|
||||
@ -39,11 +40,9 @@ type internalContainerLifecycleImpl struct {
|
||||
|
||||
func (i *internalContainerLifecycleImpl) PreStartContainer(pod *v1.Pod, container *v1.Container, containerID string) error {
|
||||
if i.cpuManager != nil {
|
||||
err := i.cpuManager.AddContainer(pod, container, containerID)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
i.cpuManager.AddContainer(pod, container, containerID)
|
||||
}
|
||||
|
||||
if utilfeature.DefaultFeatureGate.Enabled(kubefeatures.TopologyManager) {
|
||||
err := i.topologyManager.AddContainer(pod, containerID)
|
||||
if err != nil {
|
||||
|
35
pkg/kubelet/cm/internal_container_lifecycle_linux.go
Normal file
35
pkg/kubelet/cm/internal_container_lifecycle_linux.go
Normal file
@ -0,0 +1,35 @@
|
||||
// +build linux
|
||||
|
||||
/*
|
||||
Copyright 2021 The Kubernetes Authors.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package cm
|
||||
|
||||
import (
|
||||
"k8s.io/api/core/v1"
|
||||
runtimeapi "k8s.io/cri-api/pkg/apis/runtime/v1alpha2"
|
||||
)
|
||||
|
||||
func (i *internalContainerLifecycleImpl) PreCreateContainer(pod *v1.Pod, container *v1.Container, containerConfig *runtimeapi.ContainerConfig) error {
|
||||
if i.cpuManager != nil {
|
||||
allocatedCPUs := i.cpuManager.GetCPUs(string(pod.UID), container.Name)
|
||||
if !allocatedCPUs.IsEmpty() {
|
||||
containerConfig.Linux.Resources.CpusetCpus = allocatedCPUs.String()
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
28
pkg/kubelet/cm/internal_container_lifecycle_unsupported.go
Normal file
28
pkg/kubelet/cm/internal_container_lifecycle_unsupported.go
Normal file
@ -0,0 +1,28 @@
|
||||
// +build !linux,!windows
|
||||
|
||||
/*
|
||||
Copyright 2020 The Kubernetes Authors.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package cm
|
||||
|
||||
import (
|
||||
"k8s.io/api/core/v1"
|
||||
runtimeapi "k8s.io/cri-api/pkg/apis/runtime/v1alpha2"
|
||||
)
|
||||
|
||||
func (i *internalContainerLifecycleImpl) PreCreateContainer(pod *v1.Pod, container *v1.Container, containerConfig *runtimeapi.ContainerConfig) error {
|
||||
return nil
|
||||
}
|
28
pkg/kubelet/cm/internal_container_lifecycle_windows.go
Normal file
28
pkg/kubelet/cm/internal_container_lifecycle_windows.go
Normal file
@ -0,0 +1,28 @@
|
||||
// +build windows
|
||||
|
||||
/*
|
||||
Copyright 2020 The Kubernetes Authors.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package cm
|
||||
|
||||
import (
|
||||
"k8s.io/api/core/v1"
|
||||
runtimeapi "k8s.io/cri-api/pkg/apis/runtime/v1alpha2"
|
||||
)
|
||||
|
||||
func (i *internalContainerLifecycleImpl) PreCreateContainer(pod *v1.Pod, container *v1.Container, containerConfig *runtimeapi.ContainerConfig) error {
|
||||
return nil
|
||||
}
|
@ -118,6 +118,7 @@ func (ds *dockerService) updateCreateConfig(
|
||||
CPUShares: rOpts.CpuShares,
|
||||
CPUQuota: rOpts.CpuQuota,
|
||||
CPUPeriod: rOpts.CpuPeriod,
|
||||
CpusetCpus: rOpts.CpusetCpus,
|
||||
}
|
||||
createConfig.HostConfig.OomScoreAdj = int(rOpts.OomScoreAdj)
|
||||
}
|
||||
|
@ -56,6 +56,8 @@ import (
|
||||
var (
|
||||
// ErrCreateContainerConfig - failed to create container config
|
||||
ErrCreateContainerConfig = errors.New("CreateContainerConfigError")
|
||||
// ErrPreCreateHook - failed to execute PreCreateHook
|
||||
ErrPreCreateHook = errors.New("PreCreateHookError")
|
||||
// ErrCreateContainer - failed to create container
|
||||
ErrCreateContainer = errors.New("CreateContainerError")
|
||||
// ErrPreStartHook - failed to execute PreStartHook
|
||||
@ -167,6 +169,13 @@ func (m *kubeGenericRuntimeManager) startContainer(podSandboxID string, podSandb
|
||||
return s.Message(), ErrCreateContainerConfig
|
||||
}
|
||||
|
||||
err = m.internalLifecycle.PreCreateContainer(pod, container, containerConfig)
|
||||
if err != nil {
|
||||
s, _ := grpcstatus.FromError(err)
|
||||
m.recordContainerEvent(pod, container, "", v1.EventTypeWarning, events.FailedToCreateContainer, "Internal PreCreateContainer hook failed: %v", s.Message())
|
||||
return s.Message(), ErrPreCreateHook
|
||||
}
|
||||
|
||||
containerID, err := m.runtimeService.CreateContainer(podSandboxID, containerConfig, podSandboxConfig)
|
||||
if err != nil {
|
||||
s, _ := grpcstatus.FromError(err)
|
||||
|
Loading…
Reference in New Issue
Block a user