diff --git a/pkg/kubelet/cm/cgroup_manager_linux.go b/pkg/kubelet/cm/cgroup_manager_linux.go
index 7e96dbe25ff..9f8fe3f40f1 100644
--- a/pkg/kubelet/cm/cgroup_manager_linux.go
+++ b/pkg/kubelet/cm/cgroup_manager_linux.go
@@ -298,6 +298,9 @@ func (m *cgroupCommon) toResources(resourceConfig *ResourceConfig) *libcontainer
     if resourceConfig.PidsLimit != nil {
         resources.PidsLimit = *resourceConfig.PidsLimit
     }
+    if !resourceConfig.CPUSet.IsEmpty() {
+        resources.CpusetCpus = resourceConfig.CPUSet.String()
+    }

     m.maybeSetHugetlb(resourceConfig, resources)

diff --git a/pkg/kubelet/cm/cpumanager/cpu_manager.go b/pkg/kubelet/cm/cpumanager/cpu_manager.go
index 9f9cd48eb5f..b17af7ff8fa 100644
--- a/pkg/kubelet/cm/cpumanager/cpu_manager.go
+++ b/pkg/kubelet/cm/cpumanager/cpu_manager.go
@@ -93,6 +93,10 @@ type Manager interface {
     // GetCPUAffinity returns cpuset which includes cpus from shared pools
     // as well as exclusively allocated cpus
     GetCPUAffinity(podUID, containerName string) cpuset.CPUSet
+
+    // GetAllCPUs returns all the CPUs known by cpumanager, as reported by the
+    // hardware discovery. Maps to the CPU capacity.
+    GetAllCPUs() cpuset.CPUSet
 }

 type manager struct {
@@ -136,7 +140,11 @@ type manager struct {
     // stateFileDirectory holds the directory where the state file for checkpoints is held.
     stateFileDirectory string

-    // allocatableCPUs is the set of online CPUs as reported by the system
+    // allCPUs is the set of online CPUs as reported by the system
+    allCPUs cpuset.CPUSet
+
+    // allocatableCPUs is the set of online CPUs as reported by the system,
+    // and available for allocation, minus the reserved set
     allocatableCPUs cpuset.CPUSet

     // pendingAdmissionPod contain the pod during the admission phase
@@ -156,6 +164,11 @@ func NewManager(cpuPolicyName string, cpuPolicyOptions map[string]string, reconc
     var policy Policy
     var err error

+    topo, err = topology.Discover(machineInfo)
+    if err != nil {
+        return nil, err
+    }
+
     switch policyName(cpuPolicyName) {

     case PolicyNone:
@@ -165,10 +178,6 @@ func NewManager(cpuPolicyName string, cpuPolicyOptions map[string]string, reconc
         }

     case PolicyStatic:
-        topo, err = topology.Discover(machineInfo)
-        if err != nil {
-            return nil, err
-        }
         klog.InfoS("Detected CPU topology", "topology", topo)

         reservedCPUs, ok := nodeAllocatableReservation[v1.ResourceCPU]
@@ -205,6 +214,7 @@ func NewManager(cpuPolicyName string, cpuPolicyOptions map[string]string, reconc
         topology:                   topo,
         nodeAllocatableReservation: nodeAllocatableReservation,
         stateFileDirectory:         stateFileDirectory,
+        allCPUs:                    topo.CPUDetails.CPUs(),
     }
     manager.sourcesReady = &sourcesReadyStub{}
     return manager, nil
@@ -339,6 +349,10 @@ func (m *manager) GetAllocatableCPUs() cpuset.CPUSet {
     return m.allocatableCPUs.Clone()
 }

+func (m *manager) GetAllCPUs() cpuset.CPUSet {
+    return m.allCPUs.Clone()
+}
+
 type reconciledContainer struct {
     podName       string
     containerName string
diff --git a/pkg/kubelet/cm/cpumanager/cpu_manager_test.go b/pkg/kubelet/cm/cpumanager/cpu_manager_test.go
index f6563bbca23..0630032c511 100644
--- a/pkg/kubelet/cm/cpumanager/cpu_manager_test.go
+++ b/pkg/kubelet/cm/cpumanager/cpu_manager_test.go
@@ -693,15 +693,8 @@ func TestCPUManagerGenerate(t *testing.T) {
                 if rawMgr.policy.Name() != testCase.expectedPolicy {
                     t.Errorf("Unexpected policy name. Have: %q wants %q", rawMgr.policy.Name(), testCase.expectedPolicy)
                 }
-                if rawMgr.policy.Name() == string(PolicyNone) {
-                    if rawMgr.topology != nil {
-                        t.Errorf("Expected topology to be nil for 'none' policy. Have: %q", rawMgr.topology)
-                    }
-                }
-                if rawMgr.policy.Name() != string(PolicyNone) {
-                    if rawMgr.topology == nil {
-                        t.Errorf("Expected topology to be non-nil for policy '%v'. Have: %q", rawMgr.policy.Name(), rawMgr.topology)
-                    }
+                if rawMgr.topology == nil {
+                    t.Errorf("Expected topology to be non-nil for policy '%v'. Have: %q", rawMgr.policy.Name(), rawMgr.topology)
                 }
             }
         })
diff --git a/pkg/kubelet/cm/cpumanager/fake_cpu_manager.go b/pkg/kubelet/cm/cpumanager/fake_cpu_manager.go
index 4a03f3dd23f..8f00ec3784b 100644
--- a/pkg/kubelet/cm/cpumanager/fake_cpu_manager.go
+++ b/pkg/kubelet/cm/cpumanager/fake_cpu_manager.go
@@ -85,6 +85,11 @@ func (m *fakeManager) GetCPUAffinity(podUID, containerName string) cpuset.CPUSet
     return cpuset.CPUSet{}
 }

+func (m *fakeManager) GetAllCPUs() cpuset.CPUSet {
+    klog.InfoS("GetAllCPUs")
+    return cpuset.CPUSet{}
+}
+
 // NewFakeManager creates empty/fake cpu manager
 func NewFakeManager() Manager {
     return &fakeManager{
diff --git a/pkg/kubelet/cm/node_container_manager_linux.go b/pkg/kubelet/cm/node_container_manager_linux.go
index b2ad28290b8..7fa180cefdd 100644
--- a/pkg/kubelet/cm/node_container_manager_linux.go
+++ b/pkg/kubelet/cm/node_container_manager_linux.go
@@ -53,7 +53,7 @@ func (cm *containerManagerImpl) createNodeAllocatableCgroups() error {
     cgroupConfig := &CgroupConfig{
         Name: cm.cgroupRoot,
         // The default limits for cpu shares can be very low which can lead to CPU starvation for pods.
-        ResourceParameters: getCgroupConfig(nodeAllocatable, false),
+        ResourceParameters: cm.getCgroupConfig(nodeAllocatable, false),
     }
     if cm.cgroupManager.Exists(cgroupConfig.Name) {
         return nil
@@ -81,7 +81,7 @@ func (cm *containerManagerImpl) enforceNodeAllocatableCgroups() error {

     cgroupConfig := &CgroupConfig{
         Name:               cm.cgroupRoot,
-        ResourceParameters: getCgroupConfig(nodeAllocatable, false),
+        ResourceParameters: cm.getCgroupConfig(nodeAllocatable, false),
     }

     // Using ObjectReference for events as the node maybe not cached; refer to #42701 for detail.
@@ -110,7 +110,7 @@ func (cm *containerManagerImpl) enforceNodeAllocatableCgroups() error {
     // Now apply kube reserved and system reserved limits if required.
     if nc.EnforceNodeAllocatable.Has(kubetypes.SystemReservedEnforcementKey) {
         klog.V(2).InfoS("Enforcing system reserved on cgroup", "cgroupName", nc.SystemReservedCgroupName, "limits", nc.SystemReserved)
-        if err := enforceExistingCgroup(cm.cgroupManager, cm.cgroupManager.CgroupName(nc.SystemReservedCgroupName), nc.SystemReserved, false); err != nil {
+        if err := cm.enforceExistingCgroup(nc.SystemReservedCgroupName, nc.SystemReserved, false); err != nil {
             message := fmt.Sprintf("Failed to enforce System Reserved Cgroup Limits on %q: %v", nc.SystemReservedCgroupName, err)
             cm.recorder.Event(nodeRef, v1.EventTypeWarning, events.FailedNodeAllocatableEnforcement, message)
             return errors.New(message)
@@ -119,7 +119,7 @@ func (cm *containerManagerImpl) enforceNodeAllocatableCgroups() error {
     }
     if nc.EnforceNodeAllocatable.Has(kubetypes.KubeReservedEnforcementKey) {
         klog.V(2).InfoS("Enforcing kube reserved on cgroup", "cgroupName", nc.KubeReservedCgroupName, "limits", nc.KubeReserved)
-        if err := enforceExistingCgroup(cm.cgroupManager, cm.cgroupManager.CgroupName(nc.KubeReservedCgroupName), nc.KubeReserved, false); err != nil {
+        if err := cm.enforceExistingCgroup(nc.KubeReservedCgroupName, nc.KubeReserved, false); err != nil {
             message := fmt.Sprintf("Failed to enforce Kube Reserved Cgroup Limits on %q: %v", nc.KubeReservedCgroupName, err)
             cm.recorder.Event(nodeRef, v1.EventTypeWarning, events.FailedNodeAllocatableEnforcement, message)
             return errors.New(message)
@@ -129,7 +129,7 @@ func (cm *containerManagerImpl) enforceNodeAllocatableCgroups() error {

     if nc.EnforceNodeAllocatable.Has(kubetypes.SystemReservedCompressibleEnforcementKey) {
         klog.V(2).InfoS("Enforcing system reserved compressible on cgroup", "cgroupName", nc.SystemReservedCgroupName, "limits", nc.SystemReserved)
-        if err := enforceExistingCgroup(cm.cgroupManager, cm.cgroupManager.CgroupName(nc.SystemReservedCgroupName), nc.SystemReserved, true); err != nil {
+        if err := cm.enforceExistingCgroup(nc.SystemReservedCgroupName, nc.SystemReserved, true); err != nil {
             message := fmt.Sprintf("Failed to enforce System Reserved Compressible Cgroup Limits on %q: %v", nc.SystemReservedCgroupName, err)
             cm.recorder.Event(nodeRef, v1.EventTypeWarning, events.FailedNodeAllocatableEnforcement, message)
             return errors.New(message)
@@ -139,7 +139,7 @@ func (cm *containerManagerImpl) enforceNodeAllocatableCgroups() error {

     if nc.EnforceNodeAllocatable.Has(kubetypes.KubeReservedCompressibleEnforcementKey) {
         klog.V(2).InfoS("Enforcing kube reserved compressible on cgroup", "cgroupName", nc.KubeReservedCgroupName, "limits", nc.KubeReserved)
-        if err := enforceExistingCgroup(cm.cgroupManager, cm.cgroupManager.CgroupName(nc.KubeReservedCgroupName), nc.KubeReserved, true); err != nil {
+        if err := cm.enforceExistingCgroup(nc.KubeReservedCgroupName, nc.KubeReserved, true); err != nil {
             message := fmt.Sprintf("Failed to enforce Kube Reserved Compressible Cgroup Limits on %q: %v", nc.KubeReservedCgroupName, err)
             cm.recorder.Event(nodeRef, v1.EventTypeWarning, events.FailedNodeAllocatableEnforcement, message)
             return errors.New(message)
@@ -150,9 +150,9 @@ func (cm *containerManagerImpl) enforceNodeAllocatableCgroups() error {
 }

 // enforceExistingCgroup updates the limits `rl` on existing cgroup `cName` using `cgroupManager` interface.
-func enforceExistingCgroup(cgroupManager CgroupManager, cName CgroupName, rl v1.ResourceList, compressibleResources bool) error {
-    rp := getCgroupConfig(rl, compressibleResources)
-
+func (cm *containerManagerImpl) enforceExistingCgroup(cNameStr string, rl v1.ResourceList, compressibleResources bool) error {
+    cName := cm.cgroupManager.CgroupName(cNameStr)
+    rp := cm.getCgroupConfig(rl, compressibleResources)
     if rp == nil {
         return fmt.Errorf("%q cgroup is not configured properly", cName)
     }
@@ -173,17 +173,40 @@ func enforceExistingCgroup(cgroupManager CgroupManager, cName CgroupName, rl v1.
         ResourceParameters: rp,
     }
     klog.V(4).InfoS("Enforcing limits on cgroup", "cgroupName", cName, "cpuShares", cgroupConfig.ResourceParameters.CPUShares, "memory", cgroupConfig.ResourceParameters.Memory, "pidsLimit", cgroupConfig.ResourceParameters.PidsLimit)
-    if err := cgroupManager.Validate(cgroupConfig.Name); err != nil {
+    if err := cm.cgroupManager.Validate(cgroupConfig.Name); err != nil {
         return err
     }
-    if err := cgroupManager.Update(cgroupConfig); err != nil {
+    if err := cm.cgroupManager.Update(cgroupConfig); err != nil {
         return err
     }
     return nil
 }

 // getCgroupConfig returns a ResourceConfig object that can be used to create or update cgroups via CgroupManager interface.
-func getCgroupConfig(rl v1.ResourceList, compressibleResourcesOnly bool) *ResourceConfig {
+func (cm *containerManagerImpl) getCgroupConfig(rl v1.ResourceList, compressibleResourcesOnly bool) *ResourceConfig {
+    rc := getCgroupConfigInternal(rl, compressibleResourcesOnly)
+    if rc == nil {
+        return nil
+    }
+
+    // In the case of a None policy, cgroupv2 and systemd cgroup manager, we must make sure systemd is aware of the cpuset cgroup.
+    // By default, systemd will not create it, as we've not chosen to delegate it, and we haven't included it in the Apply() request.
+    // However, this causes a bug where kubelet restarts unnecessarily (cpuset cgroup is created in the cgroupfs, but systemd
+    // doesn't know about it and deletes it, and then kubelet doesn't continue because the cgroup isn't configured as expected).
+    // An alternative is to delegate the `cpuset` cgroup to the kubelet, but that would require some plumbing in libcontainer,
+    // and this is sufficient.
+    // Only do so on None policy, as Static policy will do its own updating of the cpuset.
+    // Please see the comment on policy none's GetAllocatableCPUs
+    if cm.cpuManager.GetAllocatableCPUs().IsEmpty() {
+        rc.CPUSet = cm.cpuManager.GetAllCPUs()
+    }
+
+    return rc
+}
+
+// getCgroupConfigInternal are the pieces of getCgroupConfig that don't require the cm object.
+// This is added to unit test without needing to create a full containerManager
+func getCgroupConfigInternal(rl v1.ResourceList, compressibleResourcesOnly bool) *ResourceConfig {
     // TODO(vishh): Set CPU Quota if necessary.
     if rl == nil {
         return nil
@@ -216,7 +239,6 @@ func getCgroupConfig(rl v1.ResourceList, compressibleResourcesOnly bool) *Resour
         }
         rc.HugePageLimit = HugePageLimits(rl)
     }
-
     return &rc
 }
diff --git a/pkg/kubelet/cm/node_container_manager_linux_test.go b/pkg/kubelet/cm/node_container_manager_linux_test.go
index ba060045310..a74e1c7cb23 100644
--- a/pkg/kubelet/cm/node_container_manager_linux_test.go
+++ b/pkg/kubelet/cm/node_container_manager_linux_test.go
@@ -458,7 +458,7 @@ func TestGetCgroupConfig(t *testing.T) {
     for _, tc := range cases {
         t.Run(tc.name, func(t *testing.T) {
-            actual := getCgroupConfig(tc.resourceList, tc.compressibleResources)
+            actual := getCgroupConfigInternal(tc.resourceList, tc.compressibleResources)
             tc.checks(actual, t)
         })
     }
 }
diff --git a/pkg/kubelet/cm/types.go b/pkg/kubelet/cm/types.go
index 319ae06f07e..7a7d9268015 100644
--- a/pkg/kubelet/cm/types.go
+++ b/pkg/kubelet/cm/types.go
@@ -19,12 +19,15 @@ package cm
 import (
     v1 "k8s.io/api/core/v1"
     "k8s.io/apimachinery/pkg/types"
+    "k8s.io/utils/cpuset"
 )

 // ResourceConfig holds information about all the supported cgroup resource parameters.
 type ResourceConfig struct {
     // Memory limit (in bytes).
     Memory *int64
+    // CPU set (number of CPUs the cgroup has access to).
+    CPUSet cpuset.CPUSet
     // CPU shares (relative weight vs. other containers).
     CPUShares *uint64
     // CPU hardcap limit (in usecs). Allowed cpu time in a given period.
diff --git a/test/e2e_node/node_container_manager_test.go b/test/e2e_node/node_container_manager_test.go
index 93c048d2e53..6761e195a3b 100644
--- a/test/e2e_node/node_container_manager_test.go
+++ b/test/e2e_node/node_container_manager_test.go
@@ -76,6 +76,64 @@ var _ = SIGDescribe("Node Container Manager", framework.WithSerial(), func() {
             framework.ExpectNoError(runTest(ctx, f))
         })
     })
+    f.Describe("Validate CGroup management", func() {
+        // Regression test for https://issues.k8s.io/125923
+        // In this issue there's a race involved with systemd which seems to manifest most likely, or perhaps only
+        // (data gathered so far seems inconclusive) on the very first boot of the machine, so restarting the kubelet
+        // seems not sufficient. OTOH, the exact reproducer seems to require a dedicate lane with only this test, or
+        // to reboot the machine before to run this test. Both are practically unrealistic in CI.
+        // The closest approximation is this test in this current form, using a kubelet restart. This at least
+        // acts as non regression testing, so it still brings value.
+        ginkgo.It("should correctly start with cpumanager none policy in use with systemd", func(ctx context.Context) {
+            if !IsCgroup2UnifiedMode() {
+                ginkgo.Skip("this test requires cgroups v2")
+            }
+
+            var err error
+            var oldCfg *kubeletconfig.KubeletConfiguration
+            // Get current kubelet configuration
+            oldCfg, err = getCurrentKubeletConfig(ctx)
+            framework.ExpectNoError(err)
+
+            ginkgo.DeferCleanup(func(ctx context.Context) {
+                if oldCfg != nil {
+                    // Update the Kubelet configuration.
+                    framework.ExpectNoError(e2enodekubelet.WriteKubeletConfigFile(oldCfg))
+
+                    ginkgo.By("Restarting the kubelet")
+                    restartKubelet(true)
+
+                    // wait until the kubelet health check will succeed
+                    gomega.Eventually(ctx, func(ctx context.Context) bool {
+                        return kubeletHealthCheck(kubeletHealthCheckURL)
+                    }).WithTimeout(2 * time.Minute).WithPolling(5 * time.Second).Should(gomega.BeTrueBecause("expected kubelet to be in healthy state"))
+                    ginkgo.By("Started the kubelet")
+                }
+            })
+
+            newCfg := oldCfg.DeepCopy()
+            // Change existing kubelet configuration
+            newCfg.CPUManagerPolicy = "none"
+            newCfg.CgroupDriver = "systemd"
+            newCfg.FailCgroupV1 = true // extra safety. We want to avoid false negatives though, so we added the skip check earlier
+
+            // Update the Kubelet configuration.
+            framework.ExpectNoError(e2enodekubelet.WriteKubeletConfigFile(newCfg))
+
+            ginkgo.By("Restarting the kubelet")
+            restartKubelet(true)
+
+            // wait until the kubelet health check will succeed
+            gomega.Eventually(ctx, func(ctx context.Context) bool {
+                return kubeletHealthCheck(kubeletHealthCheckURL)
+            }).WithTimeout(2 * time.Minute).WithPolling(5 * time.Second).Should(gomega.BeTrueBecause("expected kubelet to be in healthy state"))
+            ginkgo.By("Started the kubelet")
+
+            gomega.Consistently(ctx, func(ctx context.Context) bool {
+                return getNodeReadyStatus(ctx, f) && kubeletHealthCheck(kubeletHealthCheckURL)
+            }).WithTimeout(2 * time.Minute).WithPolling(2 * time.Second).Should(gomega.BeTrueBecause("node keeps reporting ready status"))
+        })
+    })
 })

 func expectFileValToEqual(filePath string, expectedValue, delta int64) error {
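
Not part of the patch above: a minimal, standalone Go sketch of the behavior the new getCgroupConfig wrapper introduces, using the same k8s.io/utils/cpuset package the patch imports. With the "none" CPU manager policy the allocatable set is empty, so the full discovered CPU set is copied into ResourceConfig.CPUSet, and toResources() renders it into libcontainer's CpusetCpus list string. The helper name toCpusetCpus and the example CPU numbers are illustrative only.

package main

import (
	"fmt"

	"k8s.io/utils/cpuset"
)

// toCpusetCpus mirrors the toResources() change: an empty set means
// "leave cpuset.cpus untouched", a non-empty set is rendered as a
// kernel CPU list string.
func toCpusetCpus(cpus cpuset.CPUSet) string {
	if cpus.IsEmpty() {
		return ""
	}
	return cpus.String()
}

func main() {
	allCPUs := cpuset.New(0, 1, 2, 3, 8) // what GetAllCPUs() would report (illustrative values)
	allocatable := cpuset.New()          // empty under the "none" policy

	effective := cpuset.New()
	if allocatable.IsEmpty() {
		// None policy: publish the whole CPU set so systemd keeps the cpuset cgroup around.
		effective = allCPUs
	}

	fmt.Println(toCpusetCpus(effective)) // prints "0-3,8"
}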
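A rough verification sketch for a node running the systemd cgroup driver on cgroup v2: it compares the online CPUs against the effective cpuset of the node-allocatable cgroup. The paths, in particular the "kubepods.slice" name and the /sys/fs/cgroup mount point, are assumptions of this sketch (they depend on kubelet configuration), not something the patch guarantees.

package main

import (
	"fmt"
	"os"
	"strings"

	"k8s.io/utils/cpuset"
)

// mustRead parses a kernel CPU list file (e.g. "0-7") into a CPUSet.
func mustRead(path string) cpuset.CPUSet {
	data, err := os.ReadFile(path)
	if err != nil {
		fmt.Fprintln(os.Stderr, "read:", err)
		os.Exit(1)
	}
	set, err := cpuset.Parse(strings.TrimSpace(string(data)))
	if err != nil {
		fmt.Fprintln(os.Stderr, "parse:", err)
		os.Exit(1)
	}
	return set
}

func main() {
	online := mustRead("/sys/devices/system/cpu/online")
	// Assumed cgroup v2 mount point and node-allocatable slice name.
	kubepods := mustRead("/sys/fs/cgroup/kubepods.slice/cpuset.cpus.effective")

	if kubepods.Equals(online) {
		fmt.Println("OK: kubepods cpuset covers all online CPUs:", kubepods.String())
	} else {
		fmt.Printf("mismatch: online=%s kubepods=%s\n", online.String(), kubepods.String())
	}
}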