Merge pull request #130498 from swatisehgal/distribute-across-numa-e2e-tests

node: cpumgr: e2e: Tests for `distribute-cpus-across-numa` policy option
Kubernetes Prow Robot 2025-03-19 10:19:06 -07:00 committed by GitHub
commit fba63656c9


@ -23,6 +23,7 @@ import (
"io/fs"
"os"
"os/exec"
"path/filepath"
"regexp"
"strconv"
"strings"
@ -40,12 +41,19 @@ import (
"github.com/onsi/ginkgo/v2"
"github.com/onsi/gomega"
"github.com/onsi/gomega/gcustom"
gomegatypes "github.com/onsi/gomega/types"
"k8s.io/kubernetes/test/e2e/feature"
"k8s.io/kubernetes/test/e2e/framework"
e2epod "k8s.io/kubernetes/test/e2e/framework/pod"
e2eskipper "k8s.io/kubernetes/test/e2e/framework/skipper"
)
const (
minSMTLevel = 2
minCPUCapacity = 2
)
// Helper for makeCPUManagerPod().
type ctnAttribute struct {
ctnName string
@ -877,9 +885,9 @@ func runCPUManagerTests(f *framework.Framework) {
ginkgo.It("should assign CPUs as expected based on the Pod spec", func(ctx context.Context) {
cpuCap, cpuAlloc, _ = getLocalNodeCPUDetails(ctx, f)
// Skip CPU Manager tests altogether if the CPU capacity < 2.
if cpuCap < 2 {
e2eskipper.Skipf("Skipping CPU Manager tests since the CPU capacity < 2")
// Skip CPU Manager tests altogether if the CPU capacity < minCPUCapacity.
if cpuCap < minCPUCapacity {
e2eskipper.Skipf("Skipping CPU Manager tests since the CPU capacity < %d", minCPUCapacity)
}
// Enable CPU Manager in the kubelet.
@ -972,13 +980,14 @@ func runCPUManagerTests(f *framework.Framework) {
smtLevel := getSMTLevel()
// strict SMT alignment is trivially verified and granted on non-SMT systems
if smtLevel < 2 {
if smtLevel < minSMTLevel {
e2eskipper.Skipf("Skipping CPU Manager %s tests since SMT disabled", fullCPUsOnlyOpt)
}
// our tests want to allocate a full core, so we need at last 2*2=4 virtual cpus
if cpuAlloc < int64(smtLevel*2) {
e2eskipper.Skipf("Skipping CPU Manager %s tests since the CPU capacity < 4", fullCPUsOnlyOpt)
// our tests want to allocate a full core, so we need at least 2*2=4 virtual cpus
minCPUCount := int64(smtLevel * minCPUCapacity)
if cpuAlloc < minCPUCount {
e2eskipper.Skipf("Skipping CPU Manager %s tests since the CPU capacity < %d", fullCPUsOnlyOpt, minCPUCount)
}
framework.Logf("SMT level %d", smtLevel)
@ -1153,6 +1162,155 @@ func runCPUManagerTests(f *framework.Framework) {
waitForContainerRemoval(ctx, pod.Spec.Containers[0].Name, pod.Name, pod.Namespace)
})
ginkgo.It("should assign packed CPUs with distribute-cpus-across-numa disabled and pcpu-only policy options enabled", func(ctx context.Context) {
fullCPUsOnlyOpt := fmt.Sprintf("option=%s", cpumanager.FullPCPUsOnlyOption)
_, cpuAlloc, _ = getLocalNodeCPUDetails(ctx, f)
smtLevel := getSMTLevel()
// strict SMT alignment is trivially verified and granted on non-SMT systems
if smtLevel < minSMTLevel {
e2eskipper.Skipf("Skipping CPU Manager %s tests since SMT disabled", fullCPUsOnlyOpt)
}
// our tests want to allocate a full core, so we need at least 2*2=4 virtual cpus
minCPUCount := int64(smtLevel * minCPUCapacity)
if cpuAlloc < minCPUCount {
e2eskipper.Skipf("Skipping CPU Manager %s tests since the CPU capacity < %d", fullCPUsOnlyOpt, minCPUCount)
}
framework.Logf("SMT level %d", smtLevel)
cpuPolicyOptions := map[string]string{
cpumanager.FullPCPUsOnlyOption: "true",
cpumanager.DistributeCPUsAcrossNUMAOption: "false",
}
newCfg := configureCPUManagerInKubelet(oldCfg,
&cpuManagerKubeletArguments{
policyName: string(cpumanager.PolicyStatic),
reservedSystemCPUs: cpuset.New(0),
enableCPUManagerOptions: true,
options: cpuPolicyOptions,
},
)
updateKubeletConfig(ctx, f, newCfg, true)
ctnAttrs := []ctnAttribute{
{
ctnName: "test-gu-container-distribute-cpus-across-numa-disabled",
cpuRequest: "2000m",
cpuLimit: "2000m",
},
}
pod := makeCPUManagerPod("test-pod-distribute-cpus-across-numa-disabled", ctnAttrs)
pod = e2epod.NewPodClient(f).CreateSync(ctx, pod)
for _, cnt := range pod.Spec.Containers {
ginkgo.By(fmt.Sprintf("validating the container %s on Gu pod %s", cnt.Name, pod.Name))
logs, err := e2epod.GetPodLogs(ctx, f.ClientSet, f.Namespace.Name, pod.Name, cnt.Name)
framework.ExpectNoError(err, "expected log not found in container [%s] of pod [%s]", cnt.Name, pod.Name)
framework.Logf("got pod logs: %v", logs)
cpus, err := cpuset.Parse(strings.TrimSpace(logs))
framework.ExpectNoError(err, "parsing cpuset from logs for [%s] of pod [%s]", cnt.Name, pod.Name)
validateSMTAlignment(cpus, smtLevel, pod, &cnt)
gomega.Expect(cpus).To(BePackedCPUs())
}
deletePodSyncByName(ctx, f, pod.Name)
// we need to wait for all containers to really be gone so the cpumanager reconcile loop will not rewrite the cpu_manager_state.
// this is in turn needed because we will have an unavoidable (in the current framework) race with the
// reconcile loop which will make our attempt to delete the state file and to restore the old config go haywire
waitForAllContainerRemoval(ctx, pod.Name, pod.Namespace)
})
ginkgo.It("should assign CPUs distributed across NUMA with distribute-cpus-across-numa and pcpu-only policy options enabled", func(ctx context.Context) {
var cpusNumPerNUMA, coresNumPerNUMA, numaNodeNum, threadsPerCore int
fullCPUsOnlyOpt := fmt.Sprintf("option=%s", cpumanager.FullPCPUsOnlyOption)
_, cpuAlloc, _ = getLocalNodeCPUDetails(ctx, f)
smtLevel := getSMTLevel()
framework.Logf("SMT level %d", smtLevel)
// strict SMT alignment is trivially verified and granted on non-SMT systems
if smtLevel < minSMTLevel {
e2eskipper.Skipf("Skipping CPU Manager %s tests since SMT disabled", fullCPUsOnlyOpt)
}
// our tests want to allocate a full core, so we need at least 2*2=4 virtual cpus
minCPUCount := int64(smtLevel * minCPUCapacity)
if cpuAlloc < minCPUCount {
e2eskipper.Skipf("Skipping CPU Manager %s tests since the CPU capacity < %d", fullCPUsOnlyOpt, minCPUCount)
}
// this test is intended to be run on a multi-node NUMA system with
// at least 4 cores per socket; hostCheck skips the test
// if the above requirements are not satisfied
numaNodeNum, coresNumPerNUMA, threadsPerCore = hostCheck()
cpusNumPerNUMA = coresNumPerNUMA * threadsPerCore
framework.Logf("numaNodes on the system %d", numaNodeNum)
framework.Logf("Cores per NUMA on the system %d", coresNumPerNUMA)
framework.Logf("Threads per Core on the system %d", threadsPerCore)
framework.Logf("CPUs per NUMA on the system %d", cpusNumPerNUMA)
cpuPolicyOptions := map[string]string{
cpumanager.FullPCPUsOnlyOption: "true",
cpumanager.DistributeCPUsAcrossNUMAOption: "true",
}
newCfg := configureCPUManagerInKubelet(oldCfg,
&cpuManagerKubeletArguments{
policyName: string(cpumanager.PolicyStatic),
reservedSystemCPUs: cpuset.New(0),
enableCPUManagerOptions: true,
options: cpuPolicyOptions,
},
)
updateKubeletConfig(ctx, f, newCfg, true)
// 'distribute-cpus-across-numa' policy option ensures that CPU allocations are evenly distributed
// across NUMA nodes in cases where more than one NUMA node is required to satisfy the allocation.
// So, we want to ensure that the CPU Request exceeds the number of CPUs that can fit within a single
// NUMA node. We have to pick cpuRequest such that:
// 1. CPURequest > cpusNumPerNUMA
// 2. Not occupy all the CPUs on the node and leave room for the reserved CPUs
// 3. CPURequest is a multiple of the number of NUMA nodes to allow equal CPU distribution across NUMA nodes
//
// In summary: cpusNumPerNUMA < CPURequest < ((cpusNumPerNUMA * numaNodeNum) - reservedCPUsCount)
// Considering all these constraints we select: CPURequest = (cpusNumPerNUMA - smtLevel) * numaNodeNum
cpuReq := (cpusNumPerNUMA - smtLevel) * numaNodeNum
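// As a worked example with purely hypothetical numbers (the test does not assume them):
// on a 2-socket host with 8 cores per NUMA node and SMT level 2, cpusNumPerNUMA = 16 and
// cpuReq = (16-2)*2 = 28, which satisfies 16 < 28 < (16*2 - 1) = 31 and is a multiple of
// the 2 NUMA nodes, so an even 14/14 split is expected.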
ctnAttrs := []ctnAttribute{
{
ctnName: "test-gu-container-distribute-cpus-across-numa",
cpuRequest: fmt.Sprintf("%d", cpuReq),
cpuLimit: fmt.Sprintf("%d", cpuReq),
},
}
pod := makeCPUManagerPod("test-pod-distribute-cpus-across-numa", ctnAttrs)
pod = e2epod.NewPodClient(f).CreateSync(ctx, pod)
for _, cnt := range pod.Spec.Containers {
ginkgo.By(fmt.Sprintf("validating the container %s on Gu pod %s", cnt.Name, pod.Name))
logs, err := e2epod.GetPodLogs(ctx, f.ClientSet, f.Namespace.Name, pod.Name, cnt.Name)
framework.ExpectNoError(err, "expected log not found in container [%s] of pod [%s]", cnt.Name, pod.Name)
framework.Logf("got pod logs: %v", logs)
cpus, err := cpuset.Parse(strings.TrimSpace(logs))
framework.ExpectNoError(err, "parsing cpuset from logs for [%s] of pod [%s]", cnt.Name, pod.Name)
validateSMTAlignment(cpus, smtLevel, pod, &cnt)
// We expect a perfectly even split, i.e. equal distribution across NUMA nodes, since the CPU request (cpusNumPerNUMA-smtLevel)*numaNodeNum is a multiple of numaNodeNum.
expectedSpread := cpus.Size() / numaNodeNum
gomega.Expect(cpus).To(BeDistributedCPUs(expectedSpread))
}
deletePodSyncByName(ctx, f, pod.Name)
// we need to wait for all containers to really be gone so the cpumanager reconcile loop will not rewrite the cpu_manager_state.
// this is in turn needed because we will have an unavoidable (in the current framework) race with the
// reconcile loop which will make our attempt to delete the state file and to restore the old config go haywire
waitForAllContainerRemoval(ctx, pod.Name, pod.Namespace)
})
ginkgo.AfterEach(func(ctx context.Context) {
updateKubeletConfig(ctx, f, oldCfg, true)
})
@ -1260,6 +1418,78 @@ func isSMTAlignmentError(pod *v1.Pod) bool {
return re.MatchString(pod.Status.Reason)
}
// getNumaNodeCPUs retrieves CPUs for each NUMA node.
func getNumaNodeCPUs() (map[int]cpuset.CPUSet, error) {
numaNodes := make(map[int]cpuset.CPUSet)
nodePaths, err := filepath.Glob("/sys/devices/system/node/node*/cpulist")
if err != nil {
return nil, err
}
for _, nodePath := range nodePaths {
data, err := os.ReadFile(nodePath)
framework.ExpectNoError(err, "Error obtaining CPU information from the node")
cpuSet := strings.TrimSpace(string(data))
cpus, err := cpuset.Parse(cpuSet)
framework.ExpectNoError(err, "Error parsing CPUset")
// Extract node ID from path (e.g., "node0" -> 0)
base := filepath.Base(filepath.Dir(nodePath))
nodeID, err := strconv.Atoi(strings.TrimPrefix(base, "node"))
if err != nil {
continue
}
numaNodes[nodeID] = cpus
}
return numaNodes, nil
}
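// For illustration only (a hypothetical topology; real values depend on the host):
// on a two-node system, /sys/devices/system/node/node0/cpulist might contain "0-3,8-11"
// and node1/cpulist "4-7,12-15", which this helper parses into
// map[0:0-3,8-11 1:4-7,12-15].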
// computeNUMADistribution calculates CPU distribution per NUMA node.
func computeNUMADistribution(allocatedCPUs cpuset.CPUSet) map[int]int {
numaCPUs, err := getNumaNodeCPUs()
framework.ExpectNoError(err, "Error retrieving NUMA nodes")
framework.Logf("NUMA Node CPUs allocation: %v", numaCPUs)
distribution := make(map[int]int)
for node, cpus := range numaCPUs {
distribution[node] = cpus.Intersection(allocatedCPUs).Size()
}
framework.Logf("allocated CPUs %s distribution: %v", allocatedCPUs.String(), distribution)
return distribution
}
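// Continuing the hypothetical topology above: an allocation of "1-3,9-11" would yield the
// distribution map[0:6 1:0] (all CPUs packed on node 0), whereas "2-3,6-7,10-11,14-15" would
// yield map[0:4 1:4] (an even spread); these are the two shapes the matchers below check for.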
// Custom matcher for checking packed CPUs.
func BePackedCPUs() gomegatypes.GomegaMatcher {
return gcustom.MakeMatcher(func(allocatedCPUs cpuset.CPUSet) (bool, error) {
distribution := computeNUMADistribution(allocatedCPUs)
for _, count := range distribution {
// This assumption holds true if there are enough CPUs on a single NUMA node.
// We are intentionally limiting the CPU request to 2 to minimize the number
// of CPUs required to fulfill this case and therefore maximize the chances
// of correctly validating this case.
if count == allocatedCPUs.Size() {
return true, nil
}
}
return false, nil
}).WithMessage("expected CPUs to be packed")
}
// Custom matcher for checking distributed CPUs.
func BeDistributedCPUs(expectedSpread int) gomegatypes.GomegaMatcher {
return gcustom.MakeMatcher(func(allocatedCPUs cpuset.CPUSet) (bool, error) {
distribution := computeNUMADistribution(allocatedCPUs)
for _, count := range distribution {
if count != expectedSpread {
return false, nil
}
}
return true, nil
}).WithTemplate("expected CPUs to be evenly distributed across NUMA nodes\nExpected: {{.Data}}\nGot:\n{{.FormattedActual}}\nDistribution: {{.Data}}\n").WithTemplateData(expectedSpread)
}
// Serial because the test updates kubelet configuration.
var _ = SIGDescribe("CPU Manager", framework.WithSerial(), feature.CPUManager, func() {
f := framework.NewDefaultFramework("cpu-manager-test")