Memory manager support for Windows nodes (#128560)

Mark Rossetti 2024-11-07 15:32:49 -08:00 committed by GitHub
parent 8504758a2e
commit 3c9380c449
6 changed files with 412 additions and 22 deletions

View File

@@ -68,6 +68,7 @@ type containerManagerImpl struct {
// Interface for Topology resource co-ordination
topologyManager topologymanager.Manager
cpuManager cpumanager.Manager
memoryManager memorymanager.Manager
nodeInfo *v1.Node
sync.RWMutex
}
@@ -95,12 +96,17 @@ func (cm *containerManagerImpl) Start(ctx context.Context, node *v1.Node,
containerMap, containerRunningSet := buildContainerMapAndRunningSetFromRuntime(ctx, runtimeService)
// Initialize CPU manager
if utilfeature.DefaultFeatureGate.Enabled(kubefeatures.WindowsCPUAndMemoryAffinity) {
err := cm.cpuManager.Start(cpumanager.ActivePodsFunc(activePods), sourcesReady, podStatusProvider, runtimeService, containerMap.Clone())
if err != nil {
return fmt.Errorf("start cpu manager error: %v", err)
}
// Initialize memory manager
err = cm.memoryManager.Start(memorymanager.ActivePodsFunc(activePods), sourcesReady, podStatusProvider, runtimeService, containerMap.Clone())
if err != nil {
return fmt.Errorf("start memory manager error: %v", err)
}
}
// Starts device manager.
@@ -128,6 +134,10 @@ func NewContainerManager(mountUtil mount.Interface, cadvisorInterface cadvisor.I
cadvisorInterface: cadvisorInterface,
}
cm.topologyManager = topologymanager.NewFakeManager()
cm.cpuManager = cpumanager.NewFakeManager()
cm.memoryManager = memorymanager.NewFakeManager()
if utilfeature.DefaultFeatureGate.Enabled(kubefeatures.WindowsCPUAndMemoryAffinity) {
klog.InfoS("Creating topology manager")
cm.topologyManager, err = topologymanager.NewManager(machineInfo.Topology,
@@ -155,9 +165,21 @@ func NewContainerManager(mountUtil mount.Interface, cadvisorInterface cadvisor.I
return nil, err
}
cm.topologyManager.AddHintProvider(cm.cpuManager)
} else {
cm.topologyManager = topologymanager.NewFakeManager()
cm.cpuManager = cpumanager.NewFakeManager()
klog.InfoS("Creating memory manager")
cm.memoryManager, err = memorymanager.NewManager(
nodeConfig.ExperimentalMemoryManagerPolicy,
machineInfo,
cm.GetNodeAllocatableReservation(),
nodeConfig.ExperimentalMemoryManagerReservedMemory,
nodeConfig.KubeletRootDir,
cm.topologyManager,
)
if err != nil {
klog.ErrorS(err, "Failed to initialize memory manager")
return nil, err
}
cm.topologyManager.AddHintProvider(cm.memoryManager)
}
klog.InfoS("Creating device plugin manager")
@@ -273,7 +295,7 @@ func (cm *containerManagerImpl) UpdatePluginResources(node *schedulerframework.N
}
func (cm *containerManagerImpl) InternalContainerLifecycle() InternalContainerLifecycle {
return &internalContainerLifecycleImpl{cm.cpuManager, memorymanager.NewFakeManager(), cm.topologyManager}
return &internalContainerLifecycleImpl{cm.cpuManager, cm.memoryManager, cm.topologyManager}
}
func (cm *containerManagerImpl) GetPodCgroupRoot() string {

View File

@@ -20,30 +20,122 @@ limitations under the License.
package cm
import (
"k8s.io/api/core/v1"
"fmt"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/util/sets"
utilfeature "k8s.io/apiserver/pkg/util/feature"
runtimeapi "k8s.io/cri-api/pkg/apis/runtime/v1"
"k8s.io/klog/v2"
kubefeatures "k8s.io/kubernetes/pkg/features"
"k8s.io/kubernetes/pkg/kubelet/winstats"
"k8s.io/utils/cpuset"
)
func (i *internalContainerLifecycleImpl) PreCreateContainer(pod *v1.Pod, container *v1.Container, containerConfig *runtimeapi.ContainerConfig) error {
if i.cpuManager != nil && utilfeature.DefaultFeatureGate.Enabled(kubefeatures.WindowsCPUAndMemoryAffinity) {
allocatedCPUs := i.cpuManager.GetCPUAffinity(string(pod.UID), container.Name)
if !allocatedCPUs.IsEmpty() {
var cpuGroupAffinities []*runtimeapi.WindowsCpuGroupAffinity
affinities := winstats.CpusToGroupAffinity(allocatedCPUs.List())
for _, affinity := range affinities {
klog.V(4).InfoS("Setting CPU affinity", "container", container.Name, "pod", pod.Name, "group", affinity.Group, "mask", affinity.MaskString(), "processorIds", affinity.Processors())
cpuGroupAffinities = append(cpuGroupAffinities, &runtimeapi.WindowsCpuGroupAffinity{
CpuGroup: uint32(affinity.Group),
CpuMask: uint64(affinity.Mask),
})
}
if !utilfeature.DefaultFeatureGate.Enabled(kubefeatures.WindowsCPUAndMemoryAffinity) {
return nil
}
containerConfig.Windows.Resources.AffinityCpus = cpuGroupAffinities
klog.V(4).Info("PreCreateContainer for Windows")
// retrieve CPU and NUMA affinity from CPU Manager and Memory Manager (if enabled)
var allocatedCPUs cpuset.CPUSet
if i.cpuManager != nil {
allocatedCPUs = i.cpuManager.GetCPUAffinity(string(pod.UID), container.Name)
}
var numaNodes sets.Set[int]
if i.memoryManager != nil {
numaNodes = i.memoryManager.GetMemoryNUMANodes(pod, container)
}
// Gather all CPUs associated with the selected NUMA nodes
var allNumaNodeCPUs []winstats.GroupAffinity
for _, numaNode := range sets.List(numaNodes) {
affinity, err := winstats.GetCPUsforNUMANode(uint16(numaNode))
if err != nil {
return fmt.Errorf("failed to get CPUs for NUMA node %d: %v", numaNode, err)
}
allNumaNodeCPUs = append(allNumaNodeCPUs, *affinity)
}
var finalCPUSet = computeFinalCpuSet(allocatedCPUs, allNumaNodeCPUs)
klog.V(4).InfoS("Setting CPU affinity", "affinity", finalCPUSet, "container", container.Name, "pod", pod.UID)
// Set CPU group affinities in the container config
if finalCPUSet != nil {
var cpusToGroupAffinities []*runtimeapi.WindowsCpuGroupAffinity
for group, mask := range groupMasks(finalCPUSet) {
cpusToGroupAffinities = append(cpusToGroupAffinities, &runtimeapi.WindowsCpuGroupAffinity{
CpuGroup: uint32(group),
CpuMask: uint64(mask),
})
}
containerConfig.Windows.Resources.AffinityCpus = cpusToGroupAffinities
}
// return nil if no CPUs were selected
return nil
}
// computeFinalCpuSet determines the final set of CPUs to use based on the CPU and memory managers
// and is extracted so that it can be tested
func computeFinalCpuSet(allocatedCPUs cpuset.CPUSet, allNumaNodeCPUs []winstats.GroupAffinity) sets.Set[int] {
if !allocatedCPUs.IsEmpty() && len(allNumaNodeCPUs) > 0 {
// Both CPU and memory managers are enabled
numaNodeAffinityCPUSet := computeCPUSet(allNumaNodeCPUs)
cpuManagerAffinityCPUSet := sets.New[int](allocatedCPUs.List()...)
// Determine which set of CPUs to use using the following logic outlined in the KEP:
// Case 1: CPU manager selects more CPUs than those available in the NUMA nodes selected by the memory manager
// Case 2: CPU manager selects fewer CPUs, and they all fall within the CPUs available in the NUMA nodes selected by the memory manager
// Case 3: CPU manager selects fewer CPUs, but some are outside of the CPUs available in the NUMA nodes selected by the memory manager
if cpuManagerAffinityCPUSet.Len() > numaNodeAffinityCPUSet.Len() {
// Case 1, use CPU manager selected CPUs
return cpuManagerAffinityCPUSet
} else if numaNodeAffinityCPUSet.IsSuperset(cpuManagerAffinityCPUSet) {
// Case 2, use CPU manager selected CPUs
return cpuManagerAffinityCPUSet
} else {
// Case 3, merge CPU manager and memory manager selected CPUs
return cpuManagerAffinityCPUSet.Union(numaNodeAffinityCPUSet)
}
} else if !allocatedCPUs.IsEmpty() {
// Only CPU manager is enabled, use CPU manager selected CPUs
return sets.New[int](allocatedCPUs.List()...)
} else if len(allNumaNodeCPUs) > 0 {
// Only memory manager is enabled, use CPUs associated with selected NUMA nodes
return computeCPUSet(allNumaNodeCPUs)
}
return nil
}
// computeCPUSet converts a list of GroupAffinity to a set of CPU IDs
func computeCPUSet(affinities []winstats.GroupAffinity) sets.Set[int] {
cpuSet := sets.New[int]()
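// Each Windows processor group holds up to 64 logical processors; bit i of a group's
// affinity mask corresponds to the flat CPU ID group*64 + i.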
for _, affinity := range affinities {
for i := 0; i < 64; i++ {
if (affinity.Mask>>i)&1 == 1 {
cpuID := int(affinity.Group)*64 + i
cpuSet.Insert(cpuID)
}
}
}
return cpuSet
}
// groupMasks converts a set of CPU IDs into group and mask representations
func groupMasks(cpuSet sets.Set[int]) map[int]uint64 {
groupMasks := make(map[int]uint64)
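// Inverse of the mapping in computeCPUSet: a flat CPU ID belongs to processor group id/64
// and occupies bit id%64 of that group's mask.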
for cpu := range cpuSet {
group := cpu / 64
mask := uint64(1) << (cpu % 64)
groupMasks[group] |= mask
}
return groupMasks
}
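The processor-group arithmetic that computeCPUSet and groupMasks share can be checked with a few lines of standalone Go; this is an illustration only, not code from the commit:

package main

import "fmt"

// Flat CPU IDs map onto Windows processor groups of up to 64 logical CPUs:
// group = id / 64, bit within that group's affinity mask = id % 64.
func main() {
	for _, cpu := range []int{1, 3, 64, 67} {
		group, bit := cpu/64, cpu%64
		fmt.Printf("cpu %d -> group %d, bit %d, mask 0x%X\n", cpu, group, bit, uint64(1)<<bit)
	}
}

CPU 67, for example, resolves to group 1 with mask bit 3 (0x8), the same placement the tests in the next file expect for the Group 1 mask 0b1001.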

View File

@@ -0,0 +1,160 @@
//go:build windows
// +build windows
/*
Copyright 2024 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package cm
import (
"testing"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/kubernetes/pkg/kubelet/winstats"
"k8s.io/utils/cpuset"
)
func TestComputeCPUSet(t *testing.T) {
affinities := []winstats.GroupAffinity{
{Mask: 0b1010, Group: 0}, // CPUs 1 and 3 in Group 0
{Mask: 0b1001, Group: 1}, // CPUs 0 and 3 in Group 1
}
expected := map[int]struct{}{
1: {}, // Group 0, CPU 1
3: {}, // Group 0, CPU 3
64: {}, // Group 1, CPU 0
67: {}, // Group 1, CPU 3
}
result := computeCPUSet(affinities)
if len(result) != len(expected) {
t.Errorf("expected length %v, but got length %v", len(expected), len(result))
}
for key := range expected {
if _, exists := result[key]; !exists {
t.Errorf("expected key %v to be in result", key)
}
}
}
func TestGroupMasks(t *testing.T) {
tests := []struct {
cpuSet sets.Set[int]
expected map[int]uint64
}{
{
cpuSet: sets.New[int](0, 1, 2, 3, 64, 65, 66, 67),
expected: map[int]uint64{
0: 0b1111,
1: 0b1111,
},
},
{
cpuSet: sets.New[int](0, 2, 64, 66),
expected: map[int]uint64{
0: 0b0101,
1: 0b0101,
},
},
{
cpuSet: sets.New[int](1, 65),
expected: map[int]uint64{
0: 0b0010,
1: 0b0010,
},
},
{
cpuSet: sets.New[int](),
expected: map[int]uint64{},
},
}
for _, test := range tests {
result := groupMasks(test.cpuSet)
if len(result) != len(test.expected) {
t.Errorf("expected length %v, but got length %v", len(test.expected), len(result))
}
for group, mask := range test.expected {
if result[group] != mask {
t.Errorf("expected group %v to have mask %v, but got mask %v", group, mask, result[group])
}
}
}
}
func TestComputeFinalCpuSet(t *testing.T) {
tests := []struct {
name string
allocatedCPUs cpuset.CPUSet
allNumaNodeCPUs []winstats.GroupAffinity
expectedCPUSet sets.Set[int]
}{
{
name: "Both managers enabled, CPU manager selects more CPUs",
allocatedCPUs: cpuset.New(0, 1, 2, 3),
allNumaNodeCPUs: []winstats.GroupAffinity{
{Mask: 0b0011, Group: 0}, // CPUs 0 and 1 in Group 0
},
expectedCPUSet: sets.New[int](0, 1, 2, 3),
},
{
name: "Both managers enabled, CPU manager selects fewer CPUs within NUMA nodes",
allocatedCPUs: cpuset.New(0, 1),
allNumaNodeCPUs: []winstats.GroupAffinity{
{Mask: 0b1111, Group: 0}, // CPUs 0, 1, 2, 3 in Group 0
},
expectedCPUSet: sets.New[int](0, 1),
},
{
name: "Both managers enabled, CPU manager selects fewer CPUs outside NUMA nodes",
allocatedCPUs: cpuset.New(0, 1),
allNumaNodeCPUs: []winstats.GroupAffinity{
{Mask: 0b1100, Group: 0}, // CPUs 2 and 3 in Group 0
},
expectedCPUSet: sets.New[int](0, 1, 2, 3),
},
{
name: "Only CPU manager enabled",
allocatedCPUs: cpuset.New(0, 1),
allNumaNodeCPUs: nil,
expectedCPUSet: sets.New[int](0, 1),
},
{
name: "Only memory manager enabled",
allocatedCPUs: cpuset.New(),
allNumaNodeCPUs: []winstats.GroupAffinity{
{Mask: 0b1100, Group: 0}, // CPUs 2 and 3 in Group 0
},
expectedCPUSet: sets.New[int](2, 3),
},
{
name: "Neither manager enabled",
allocatedCPUs: cpuset.New(),
allNumaNodeCPUs: nil,
expectedCPUSet: nil,
},
}
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
result := computeFinalCpuSet(test.allocatedCPUs, test.allNumaNodeCPUs)
if !result.Equal(test.expectedCPUSet) {
t.Errorf("expected %v, but got %v", test.expectedCPUSet, result)
}
})
}
}

View File

@@ -19,6 +19,7 @@ package memorymanager
import (
"context"
"fmt"
"runtime"
"sync"
cadvisorapi "github.com/google/cadvisor/info/v1"
@@ -140,6 +141,10 @@ func NewManager(policyName string, machineInfo *cadvisorapi.MachineInfo, nodeAll
policy = NewPolicyNone()
case policyTypeStatic:
if runtime.GOOS == "windows" {
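// Strict NUMA pinning is not something the Windows APIs can guarantee, so the Static
// policy is rejected here; the BestEffort policy added later in this commit is the
// Windows alternative.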
return nil, fmt.Errorf("policy %q is not available on Windows", policyTypeStatic)
}
systemReserved, err := getSystemReservedMemory(machineInfo, nodeAllocatableReservation, reservedMemory)
if err != nil {
return nil, err
@@ -150,8 +155,22 @@ return nil, err
return nil, err
}
case policyTypeBestEffort:
if runtime.GOOS == "windows" {
systemReserved, err := getSystemReservedMemory(machineInfo, nodeAllocatableReservation, reservedMemory)
if err != nil {
return nil, err
}
policy, err = NewPolicyBestEffort(machineInfo, systemReserved, affinity)
if err != nil {
return nil, err
}
} else {
return nil, fmt.Errorf("policy %q is not available for platform %q", policyTypeBestEffort, runtime.GOOS)
}
default:
return nil, fmt.Errorf("unknown policy: \"%s\"", policyName)
return nil, fmt.Errorf("unknown policy: %q", policyName)
}
manager := &manager{

View File

@@ -0,0 +1,80 @@
/*
Copyright 2024 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package memorymanager
import (
cadvisorapi "github.com/google/cadvisor/info/v1"
v1 "k8s.io/api/core/v1"
"k8s.io/kubernetes/pkg/kubelet/cm/memorymanager/state"
"k8s.io/kubernetes/pkg/kubelet/cm/topologymanager"
)
// On Windows we want to use the same logic as the StaticPolicy to compute the memory topology hints,
// but unlike Linux-based systems, NUMA nodes cannot be directly assigned or guaranteed via Windows APIs
// (the Windows scheduler will use the NUMA node closest to the assigned CPU, therefore respecting the NUMA
// node assignment only as a best effort). Because of this we don't want users to specify "StaticPolicy" for
// the memory manager policy via the kubelet configuration. Instead we use the "BestEffort" policy, which
// reuses the StaticPolicy logic and thereby reduces code duplication.
const policyTypeBestEffort policyType = "BestEffort"
// bestEffortPolicy is implementation of the policy interface for the BestEffort policy
type bestEffortPolicy struct {
static *staticPolicy
}
var _ Policy = &bestEffortPolicy{}
func NewPolicyBestEffort(machineInfo *cadvisorapi.MachineInfo, reserved systemReservedMemory, affinity topologymanager.Store) (Policy, error) {
p, err := NewPolicyStatic(machineInfo, reserved, affinity)
if err != nil {
return nil, err
}
return &bestEffortPolicy{
static: p.(*staticPolicy),
}, nil
}
func (p *bestEffortPolicy) Name() string {
return string(policyTypeBestEffort)
}
func (p *bestEffortPolicy) Start(s state.State) error {
return p.static.Start(s)
}
func (p *bestEffortPolicy) Allocate(s state.State, pod *v1.Pod, container *v1.Container) (rerr error) {
return p.static.Allocate(s, pod, container)
}
func (p *bestEffortPolicy) RemoveContainer(s state.State, podUID string, containerName string) {
p.static.RemoveContainer(s, podUID, containerName)
}
func (p *bestEffortPolicy) GetPodTopologyHints(s state.State, pod *v1.Pod) map[string][]topologymanager.TopologyHint {
return p.static.GetPodTopologyHints(s, pod)
}
func (p *bestEffortPolicy) GetTopologyHints(s state.State, pod *v1.Pod, container *v1.Container) map[string][]topologymanager.TopologyHint {
return p.static.GetTopologyHints(s, pod, container)
}
func (p *bestEffortPolicy) GetAllocatableMemory(s state.State) []state.Block {
return p.static.GetAllocatableMemory(s)
}
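Putting the pieces together, the Windows container manager hunk earlier in this commit wires the new policy in roughly as follows; this is a condensed restatement with the surrounding kubelet plumbing elided, not additional code from the change:

// Condensed sketch; assumes the WindowsCPUAndMemoryAffinity feature gate is enabled.
memoryManager, err := memorymanager.NewManager(
	nodeConfig.ExperimentalMemoryManagerPolicy,          // e.g. "BestEffort" on Windows
	machineInfo,                                         // *cadvisorapi.MachineInfo
	nodeAllocatableReservation,                          // node allocatable reservation (v1.ResourceList)
	nodeConfig.ExperimentalMemoryManagerReservedMemory,
	nodeConfig.KubeletRootDir,                           // where the manager keeps its state
	topologyManager,
)
if err != nil {
	return nil, err
}
topologyManager.AddHintProvider(memoryManager) // NUMA hints now flow through the topology manager

At container creation, PreCreateContainer (second file above) turns the NUMA nodes chosen by the memory manager into WindowsCpuGroupAffinity entries on the CRI container config, merging them with any CPU manager allocation via computeFinalCpuSet.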

View File

@@ -21,15 +21,17 @@ package winstats
import (
"fmt"
cadvisorapi "github.com/google/cadvisor/info/v1"
"k8s.io/klog/v2"
"syscall"
"unsafe"
cadvisorapi "github.com/google/cadvisor/info/v1"
"k8s.io/klog/v2"
)
var (
procGetLogicalProcessorInformationEx = modkernel32.NewProc("GetLogicalProcessorInformationEx")
getNumaAvailableMemoryNodeEx = modkernel32.NewProc("GetNumaAvailableMemoryNodeEx")
procGetNumaNodeProcessorMaskEx = modkernel32.NewProc("GetNumaNodeProcessorMaskEx")
)
type relationType int
@@ -106,6 +108,21 @@ func CpusToGroupAffinity(cpus []int) map[int]*GroupAffinity {
return groupAffinities
}
// GetCPUsforNUMANode queries the system for the CPUs that are part of the given NUMA node.
func GetCPUsforNUMANode(nodeNumber uint16) (*GroupAffinity, error) {
var affinity GroupAffinity
r1, _, err := procGetNumaNodeProcessorMaskEx.Call(
uintptr(nodeNumber),
uintptr(unsafe.Pointer(&affinity)),
)
if r1 == 0 {
return nil, fmt.Errorf("Error getting CPU mask for NUMA node %d: %v", nodeNumber, err)
}
return &affinity, nil
}
type numaNodeRelationship struct {
NodeNumber uint32
Reserved [18]byte