Merge pull request #113018 from fromanirh/cpumanager-ga-features

node: kubelet: cpumgr: CPU Manager to GA
This commit is contained in:
Kubernetes Prow Robot 2022-11-02 14:41:01 -07:00 committed by GitHub
commit 433787d25b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
12 changed files with 45 additions and 88 deletions

View File

@ -705,13 +705,11 @@ func run(ctx context.Context, s *options.KubeletServer, kubeDeps *kubelet.Depend
}
var cpuManagerPolicyOptions map[string]string
if utilfeature.DefaultFeatureGate.Enabled(features.CPUManager) {
if utilfeature.DefaultFeatureGate.Enabled(features.CPUManagerPolicyOptions) {
cpuManagerPolicyOptions = s.CPUManagerPolicyOptions
} else if s.CPUManagerPolicyOptions != nil {
return fmt.Errorf("CPU Manager policy options %v require feature gates %q, %q enabled",
s.CPUManagerPolicyOptions, features.CPUManager, features.CPUManagerPolicyOptions)
}
if utilfeature.DefaultFeatureGate.Enabled(features.CPUManagerPolicyOptions) {
cpuManagerPolicyOptions = s.CPUManagerPolicyOptions
} else if s.CPUManagerPolicyOptions != nil {
return fmt.Errorf("CPU Manager policy options %v require feature gates %q, %q enabled",
s.CPUManagerPolicyOptions, features.CPUManager, features.CPUManagerPolicyOptions)
}
kubeDeps.ContainerManager, err = cm.NewContainerManager(
@ -737,9 +735,9 @@ func run(ctx context.Context, s *options.KubeletServer, kubeDeps *kubelet.Depend
HardEvictionThresholds: hardEvictionThresholds,
},
QOSReserved: *experimentalQOSReserved,
ExperimentalCPUManagerPolicy: s.CPUManagerPolicy,
ExperimentalCPUManagerPolicyOptions: cpuManagerPolicyOptions,
ExperimentalCPUManagerReconcilePeriod: s.CPUManagerReconcilePeriod.Duration,
CPUManagerPolicy: s.CPUManagerPolicy,
CPUManagerPolicyOptions: cpuManagerPolicyOptions,
CPUManagerReconcilePeriod: s.CPUManagerReconcilePeriod.Duration,
ExperimentalMemoryManagerPolicy: s.MemoryManagerPolicy,
ExperimentalMemoryManagerReservedMemory: s.ReservedMemory,
ExperimentalPodPidsLimit: s.PodPidsLimit,

View File

@ -59,9 +59,10 @@ const (
// Enable nodes to change CPUCFSQuotaPeriod
CPUCFSQuotaPeriod featuregate.Feature = "CustomCPUCFSQuotaPeriod"
// owner: @ConnorDoyle
// owner: @ConnorDoyle, @fromanirh (only for GA graduation)
// alpha: v1.8
// beta: v1.10
// GA: v1.26
//
// Alternative container-level CPU affinity policies.
CPUManager featuregate.Feature = "CPUManager"
@ -833,7 +834,7 @@ var defaultKubernetesFeatureGates = map[featuregate.Feature]featuregate.FeatureS
CPUCFSQuotaPeriod: {Default: false, PreRelease: featuregate.Alpha},
CPUManager: {Default: true, PreRelease: featuregate.Beta},
CPUManager: {Default: true, PreRelease: featuregate.GA, LockToDefault: true}, // GA in 1.26
CPUManagerPolicyAlphaOptions: {Default: false, PreRelease: featuregate.Alpha},

View File

@ -134,10 +134,10 @@ type NodeConfig struct {
ProtectKernelDefaults bool
NodeAllocatableConfig
QOSReserved map[v1.ResourceName]int64
ExperimentalCPUManagerPolicy string
ExperimentalCPUManagerPolicyOptions map[string]string
CPUManagerPolicy string
CPUManagerPolicyOptions map[string]string
ExperimentalTopologyManagerScope string
ExperimentalCPUManagerReconcilePeriod time.Duration
CPUManagerReconcilePeriod time.Duration
ExperimentalMemoryManagerPolicy string
ExperimentalMemoryManagerReservedMemory []kubeletconfig.MemoryReservation
ExperimentalPodPidsLimit int64

View File

@ -306,23 +306,21 @@ func NewContainerManager(mountUtil mount.Interface, cadvisorInterface cadvisor.I
cm.topologyManager.AddHintProvider(cm.deviceManager)
// Initialize CPU manager
if utilfeature.DefaultFeatureGate.Enabled(kubefeatures.CPUManager) {
cm.cpuManager, err = cpumanager.NewManager(
nodeConfig.ExperimentalCPUManagerPolicy,
nodeConfig.ExperimentalCPUManagerPolicyOptions,
nodeConfig.ExperimentalCPUManagerReconcilePeriod,
machineInfo,
nodeConfig.NodeAllocatableConfig.ReservedSystemCPUs,
cm.GetNodeAllocatableReservation(),
nodeConfig.KubeletRootDir,
cm.topologyManager,
)
if err != nil {
klog.ErrorS(err, "Failed to initialize cpu manager")
return nil, err
}
cm.topologyManager.AddHintProvider(cm.cpuManager)
cm.cpuManager, err = cpumanager.NewManager(
nodeConfig.CPUManagerPolicy,
nodeConfig.CPUManagerPolicyOptions,
nodeConfig.CPUManagerReconcilePeriod,
machineInfo,
nodeConfig.NodeAllocatableConfig.ReservedSystemCPUs,
cm.GetNodeAllocatableReservation(),
nodeConfig.KubeletRootDir,
cm.topologyManager,
)
if err != nil {
klog.ErrorS(err, "Failed to initialize cpu manager")
return nil, err
}
cm.topologyManager.AddHintProvider(cm.cpuManager)
if utilfeature.DefaultFeatureGate.Enabled(kubefeatures.MemoryManager) {
cm.memoryManager, err = memorymanager.NewManager(
@ -555,12 +553,10 @@ func (cm *containerManagerImpl) Start(node *v1.Node,
localStorageCapacityIsolation bool) error {
// Initialize CPU manager
if utilfeature.DefaultFeatureGate.Enabled(kubefeatures.CPUManager) {
containerMap := buildContainerMapFromRuntime(runtimeService)
err := cm.cpuManager.Start(cpumanager.ActivePodsFunc(activePods), sourcesReady, podStatusProvider, runtimeService, containerMap)
if err != nil {
return fmt.Errorf("start cpu manager error: %v", err)
}
containerMap := buildContainerMapFromRuntime(runtimeService)
err := cm.cpuManager.Start(cpumanager.ActivePodsFunc(activePods), sourcesReady, podStatusProvider, runtimeService, containerMap)
if err != nil {
return fmt.Errorf("start cpu manager error: %v", err)
}
// Initialize memory manager

View File

@ -27,8 +27,10 @@ import (
"k8s.io/kubernetes/pkg/kubelet/cm/cpuset"
)
// LoopControl controls the behavior of the cpu accumulator loop logic
type LoopControl int
// Possible loop control outcomes
const (
Continue LoopControl = iota
Break

View File

@ -27,6 +27,7 @@ import (
"k8s.io/kubernetes/pkg/kubelet/cm/topologymanager"
)
// Names of the options, as part of the user interface.
const (
FullPCPUsOnlyOption string = "full-pcpus-only"
DistributeCPUsAcrossNUMAOption string = "distribute-cpus-across-numa"
@ -44,6 +45,8 @@ var (
stableOptions = sets.NewString()
)
// CheckPolicyOptionAvailable verifies if the given option can be used depending on the Feature Gate Settings.
// It returns nil on success, or an error describing why the option cannot be used.
func CheckPolicyOptionAvailable(option string) error {
if !alphaOptions.Has(option) && !betaOptions.Has(option) && !stableOptions.Has(option) {
return fmt.Errorf("unknown CPU Manager Policy option: %q", option)
@ -60,6 +63,7 @@ func CheckPolicyOptionAvailable(option string) error {
return nil
}
// StaticPolicyOptions holds the parsed value of the policy options, ready to be consumed internally.
type StaticPolicyOptions struct {
// flag to enable extra allocation restrictions to prevent
// different containers from possibly ending up on the same core.
@ -78,6 +82,7 @@ type StaticPolicyOptions struct {
AlignBySocket bool
}
// NewStaticPolicyOptions creates a StaticPolicyOptions struct from the user configuration.
func NewStaticPolicyOptions(policyOptions map[string]string) (StaticPolicyOptions, error) {
opts := StaticPolicyOptions{}
for name, value := range policyOptions {
@ -113,6 +118,7 @@ func NewStaticPolicyOptions(policyOptions map[string]string) (StaticPolicyOption
return opts, nil
}
// ValidateStaticPolicyOptions ensures that the requested policy options are compatible with the machine on which the CPUManager is running.
func ValidateStaticPolicyOptions(opts StaticPolicyOptions, topology *topology.CPUTopology, topologyManager topologymanager.Store) error {
if opts.AlignBySocket {
// Not compatible with topology manager single-numa-node policy option.

View File

@ -51,6 +51,7 @@ func (e SMTAlignmentError) Error() string {
return fmt.Sprintf("SMT Alignment Error: requested %d cpus not multiple cpus per core = %d", e.RequestedCPUs, e.CpusPerCore)
}
// Type returns the human-readable type of this error, which is always
// ErrorSMTAlignment. It is used in the admission control to populate the
// admission failure reason.
func (e SMTAlignmentError) Type() string {
return ErrorSMTAlignment
}

View File

@ -264,7 +264,7 @@ func TestGetPodTopologyHintsWithPolicyOptions(t *testing.T) {
testPod2 := makePod("fakePod", "fakeContainer", "41", "41")
testContainer2 := &testPod1.Spec.Containers[0]
cpu_set_across_socket, _ := cpuset.Parse("0-28,40-57")
cpuSetAcrossSocket, _ := cpuset.Parse("0-28,40-57")
m0001, _ := bitmask.NewBitMask(0)
m0011, _ := bitmask.NewBitMask(0, 1)
@ -376,7 +376,7 @@ func TestGetPodTopologyHintsWithPolicyOptions(t *testing.T) {
description: "AlignBySocket:true Preferred hints are spread across socket since 2 sockets are required",
pod: *testPod2,
container: *testContainer2,
defaultCPUSet: cpu_set_across_socket,
defaultCPUSet: cpuSetAcrossSocket,
topology: topoDualSocketMultiNumaPerSocketHT,
policyOptions: map[string]string{AlignBySocketOption: "true"},
expectedHints: []topologymanager.TopologyHint{

View File

@ -578,6 +578,8 @@ func Register(collectors ...metrics.StableCollector) {
legacyregistry.MustRegister(StartedHostProcessContainersErrorsTotal)
legacyregistry.MustRegister(RunPodSandboxDuration)
legacyregistry.MustRegister(RunPodSandboxErrors)
legacyregistry.MustRegister(CPUManagerPinningRequestsTotal)
legacyregistry.MustRegister(CPUManagerPinningErrorsTotal)
for _, collector := range collectors {
legacyregistry.CustomMustRegister(collector)
@ -592,11 +594,6 @@ func Register(collectors ...metrics.StableCollector) {
if utilfeature.DefaultFeatureGate.Enabled(features.ConsistentHTTPGetHandlers) {
legacyregistry.MustRegister(LifecycleHandlerHTTPFallbacks)
}
if utilfeature.DefaultFeatureGate.Enabled(features.CPUManager) {
legacyregistry.MustRegister(CPUManagerPinningRequestsTotal)
legacyregistry.MustRegister(CPUManagerPinningErrorsTotal)
}
})
}

View File

@ -78,7 +78,6 @@ var _ = SIGDescribe("CPU Manager Metrics [Serial][Feature:CPUManager]", func() {
newCfg := configureCPUManagerInKubelet(oldCfg,
&cpuManagerKubeletArguments{
policyName: string(cpumanager.PolicyStatic),
enableCPUManager: true,
reservedSystemCPUs: cpuset.NewCPUSet(0),
enableCPUManagerOptions: true,
options: cpuPolicyOptions,

View File

@ -31,7 +31,6 @@ import (
runtimeapi "k8s.io/cri-api/pkg/apis/runtime/v1"
kubeletconfig "k8s.io/kubernetes/pkg/kubelet/apis/config"
"k8s.io/kubernetes/pkg/kubelet/cm/cpumanager"
cpumanagerstate "k8s.io/kubernetes/pkg/kubelet/cm/cpumanager/state"
"k8s.io/kubernetes/pkg/kubelet/cm/cpuset"
"k8s.io/kubernetes/pkg/kubelet/types"
admissionapi "k8s.io/pod-security-admission/api"
@ -130,18 +129,6 @@ func waitForContainerRemoval(containerName, podName, podNS string) {
}, 2*time.Minute, 1*time.Second).Should(gomega.BeTrue())
}
func waitForStateFileCleanedUp() {
gomega.Eventually(func() bool {
restoredState, err := cpumanagerstate.NewCheckpointState("/var/lib/kubelet", "cpu_manager_state", "static", nil)
framework.ExpectNoError(err, "failed to create testing cpumanager state instance")
assignments := restoredState.GetCPUAssignments()
if len(assignments) == 0 {
return true
}
return false
}, 2*time.Minute, 1*time.Second).Should(gomega.BeTrue())
}
func isHTEnabled() bool {
outData, err := exec.Command("/bin/sh", "-c", "lscpu | grep \"Thread(s) per core:\" | cut -d \":\" -f 2").Output()
framework.ExpectNoError(err)
@ -187,7 +174,6 @@ func getCoreSiblingList(cpuRes int64) string {
type cpuManagerKubeletArguments struct {
policyName string
enableCPUManager bool
enableCPUManagerOptions bool
reservedSystemCPUs cpuset.CPUSet
options map[string]string
@ -199,8 +185,6 @@ func configureCPUManagerInKubelet(oldCfg *kubeletconfig.KubeletConfiguration, ku
newCfg.FeatureGates = make(map[string]bool)
}
newCfg.FeatureGates["CPUManager"] = kubeletArguments.enableCPUManager
newCfg.FeatureGates["CPUManagerPolicyOptions"] = kubeletArguments.enableCPUManagerOptions
newCfg.FeatureGates["CPUManagerPolicyBetaOptions"] = kubeletArguments.enableCPUManagerOptions
newCfg.FeatureGates["CPUManagerPolicyAlphaOptions"] = kubeletArguments.enableCPUManagerOptions
@ -550,7 +534,6 @@ func runCPUManagerTests(f *framework.Framework) {
// Enable CPU Manager in the kubelet.
newCfg := configureCPUManagerInKubelet(oldCfg, &cpuManagerKubeletArguments{
policyName: string(cpumanager.PolicyStatic),
enableCPUManager: true,
reservedSystemCPUs: cpuset.CPUSet{},
})
updateKubeletConfig(f, newCfg, true)
@ -611,30 +594,6 @@ func runCPUManagerTests(f *framework.Framework) {
err = e2epod.NewPodClient(f).MatchContainerOutput(pod.Name, pod.Spec.Containers[0].Name, expAllowedCPUsListRegex)
framework.ExpectNoError(err, "expected log not found in container [%s] of pod [%s]",
pod.Spec.Containers[0].Name, pod.Name)
ginkgo.By("disable cpu manager in kubelet")
newCfg = configureCPUManagerInKubelet(oldCfg, &cpuManagerKubeletArguments{
policyName: string(cpumanager.PolicyStatic),
enableCPUManager: false,
reservedSystemCPUs: cpuset.CPUSet{},
})
updateKubeletConfig(f, newCfg, false)
ginkgo.By("by deleting the pod and waiting for container removal")
deletePods(f, []string{pod.Name})
waitForContainerRemoval(pod.Spec.Containers[0].Name, pod.Name, pod.Namespace)
ginkgo.By("enable cpu manager in kubelet without delete state file")
newCfg = configureCPUManagerInKubelet(oldCfg, &cpuManagerKubeletArguments{
policyName: string(cpumanager.PolicyStatic),
enableCPUManager: true,
reservedSystemCPUs: cpuset.CPUSet{},
})
updateKubeletConfig(f, newCfg, false)
ginkgo.By("wait for the deleted pod to be cleaned up from the state file")
waitForStateFileCleanedUp()
ginkgo.By("the deleted pod has already been deleted from the state file")
})
ginkgo.It("should assign CPUs as expected with enhanced policy based on strict SMT alignment", func() {
@ -662,7 +621,6 @@ func runCPUManagerTests(f *framework.Framework) {
newCfg := configureCPUManagerInKubelet(oldCfg,
&cpuManagerKubeletArguments{
policyName: string(cpumanager.PolicyStatic),
enableCPUManager: true,
reservedSystemCPUs: cpuset.NewCPUSet(0),
enableCPUManagerOptions: true,
options: cpuPolicyOptions,

View File

@ -201,7 +201,6 @@ func configureTopologyManagerInKubelet(oldCfg *kubeletconfig.KubeletConfiguratio
newCfg.FeatureGates = make(map[string]bool)
}
newCfg.FeatureGates["CPUManager"] = true
newCfg.FeatureGates["TopologyManager"] = true
// Set the Topology Manager policy