From 4d25c25eb0d4aebb2faddf1d8998e13e60fd7c1b Mon Sep 17 00:00:00 2001
From: James Sturtevant
Date: Thu, 30 May 2024 14:19:08 -0700
Subject: [PATCH] Support CPU and Memory manager on Windows

Signed-off-by: James Sturtevant
---
 pkg/kubelet/cm/container_manager.go           |   8 +
 pkg/kubelet/cm/container_manager_linux.go     |   8 -
 pkg/kubelet/cm/container_manager_windows.go   |  95 ++++-
 pkg/kubelet/cm/cpumanager/cpu_manager.go      |  21 +
 pkg/kubelet/cm/cpumanager/policy_static.go    |   5 +-
 .../internal_container_lifecycle_windows.go   |  33 ++
 pkg/kubelet/kubelet.go                        |   2 +-
 pkg/kubelet/winstats/cpu_topology.go          | 250 ++++++++++++
 pkg/kubelet/winstats/cpu_topology_test.go     | 376 ++++++++++++++++++
 .../winstats/perfcounter_nodestats_windows.go |  18 +-
 10 files changed, 787 insertions(+), 29 deletions(-)
 create mode 100644 pkg/kubelet/winstats/cpu_topology.go
 create mode 100644 pkg/kubelet/winstats/cpu_topology_test.go

diff --git a/pkg/kubelet/cm/container_manager.go b/pkg/kubelet/cm/container_manager.go
index 70edb484109..1743518d82f 100644
--- a/pkg/kubelet/cm/container_manager.go
+++ b/pkg/kubelet/cm/container_manager.go
@@ -196,6 +196,14 @@ type Status struct {
 	SoftRequirements error
 }
 
+func int64Slice(in []int) []int64 {
+	out := make([]int64, len(in))
+	for i := range in {
+		out[i] = int64(in[i])
+	}
+	return out
+}
+
 // parsePercentage parses the percentage string to numeric value.
 func parsePercentage(v string) (int64, error) {
 	if !strings.HasSuffix(v, "%") {
diff --git a/pkg/kubelet/cm/container_manager_linux.go b/pkg/kubelet/cm/container_manager_linux.go
index 02f56d82418..125493b90d4 100644
--- a/pkg/kubelet/cm/container_manager_linux.go
+++ b/pkg/kubelet/cm/container_manager_linux.go
@@ -912,14 +912,6 @@ func (cm *containerManagerImpl) GetAllocatableDevices() []*podresourcesapi.Conta
 	return containerDevicesFromResourceDeviceInstances(cm.deviceManager.GetAllocatableDevices())
 }
 
-func int64Slice(in []int) []int64 {
-	out := make([]int64, len(in))
-	for i := range in {
-		out[i] = int64(in[i])
-	}
-	return out
-}
-
 func (cm *containerManagerImpl) GetCPUs(podUID, containerName string) []int64 {
 	if cm.cpuManager != nil {
 		return int64Slice(cm.cpuManager.GetExclusiveCPUs(podUID, containerName).UnsortedList())
diff --git a/pkg/kubelet/cm/container_manager_windows.go b/pkg/kubelet/cm/container_manager_windows.go
index 232d121223a..8019cc64537 100644
--- a/pkg/kubelet/cm/container_manager_windows.go
+++ b/pkg/kubelet/cm/container_manager_windows.go
@@ -25,6 +25,9 @@ package cm
 import (
 	"context"
 	"fmt"
+	utilfeature "k8s.io/apiserver/pkg/util/feature"
+	kubefeatures "k8s.io/kubernetes/pkg/features"
+	"sync"
 
 	"k8s.io/klog/v2"
 	"k8s.io/mount-utils"
@@ -37,7 +40,6 @@ import (
 	internalapi "k8s.io/cri-api/pkg/apis"
 	podresourcesapi "k8s.io/kubelet/pkg/apis/podresources/v1"
 	"k8s.io/kubernetes/pkg/kubelet/cadvisor"
-	"k8s.io/kubernetes/pkg/kubelet/cm/admission"
 	"k8s.io/kubernetes/pkg/kubelet/cm/cpumanager"
 	"k8s.io/kubernetes/pkg/kubelet/cm/devicemanager"
 	"k8s.io/kubernetes/pkg/kubelet/cm/memorymanager"
@@ -62,12 +64,10 @@ type containerManagerImpl struct {
 	deviceManager devicemanager.Manager
 	// Interface for Topology resource co-ordination
 	topologyManager topologymanager.Manager
-}
-
-type noopWindowsResourceAllocator struct{}
-
-func (ra *noopWindowsResourceAllocator) Admit(attrs *lifecycle.PodAdmitAttributes) lifecycle.PodAdmitResult {
-	return admission.GetPodAdmitResult(nil)
+	cpuManager    cpumanager.Manager
+	memoryManager memorymanager.Manager
+	nodeInfo      *v1.Node
+	sync.RWMutex
 }
 
 func (cm *containerManagerImpl) Start(ctx context.Context, node *v1.Node,
@@ -78,6 +78,8 @@ func (cm *containerManagerImpl) Start(ctx context.Context, node *v1.Node,
 	localStorageCapacityIsolation bool) error {
 	klog.V(2).InfoS("Starting Windows container manager")
 
+	cm.nodeInfo = node
+
 	if localStorageCapacityIsolation {
 		rootfs, err := cm.cadvisorInterface.RootFsInfo()
 		if err != nil {
@@ -90,6 +92,21 @@ func (cm *containerManagerImpl) Start(ctx context.Context, node *v1.Node,
 
 	containerMap, containerRunningSet := buildContainerMapAndRunningSetFromRuntime(ctx, runtimeService)
 
+	// Initialize CPU manager
+	err := cm.cpuManager.Start(cpumanager.ActivePodsFunc(activePods), sourcesReady, podStatusProvider, runtimeService, containerMap)
+	if err != nil {
+		return fmt.Errorf("start cpu manager error: %v", err)
+	}
+
+	// Initialize memory manager
+	if utilfeature.DefaultFeatureGate.Enabled(kubefeatures.MemoryManager) {
+		containerMap, _ := buildContainerMapAndRunningSetFromRuntime(ctx, runtimeService)
+		err := cm.memoryManager.Start(memorymanager.ActivePodsFunc(activePods), sourcesReady, podStatusProvider, runtimeService, containerMap)
+		if err != nil {
+			return fmt.Errorf("start memory manager error: %v", err)
+		}
+	}
+
 	// Starts device manager.
 	if err := cm.deviceManager.Start(devicemanager.ActivePodsFunc(activePods), sourcesReady, containerMap, containerRunningSet); err != nil {
 		return err
@@ -115,7 +132,15 @@ func NewContainerManager(mountUtil mount.Interface, cadvisorInterface cadvisor.I
 		cadvisorInterface: cadvisorInterface,
 	}
 
-	cm.topologyManager = topologymanager.NewFakeManager()
+	klog.InfoS("Creating topology manager")
+	cm.topologyManager, err = topologymanager.NewManager(machineInfo.Topology,
+		nodeConfig.TopologyManagerPolicy,
+		nodeConfig.TopologyManagerScope,
+		nodeConfig.TopologyManagerPolicyOptions)
+
+	if err != nil {
+		return nil, err
+	}
 
 	klog.InfoS("Creating device plugin manager")
 	cm.deviceManager, err = devicemanager.NewManagerImpl(nil, cm.topologyManager)
@@ -124,6 +149,40 @@ func NewContainerManager(mountUtil mount.Interface, cadvisorInterface cadvisor.I
 	}
 	cm.topologyManager.AddHintProvider(cm.deviceManager)
 
+	klog.InfoS("Creating cpu manager")
+	cm.cpuManager, err = cpumanager.NewManager(
+		nodeConfig.CPUManagerPolicy,
+		nodeConfig.CPUManagerPolicyOptions,
+		nodeConfig.CPUManagerReconcilePeriod,
+		machineInfo,
+		nodeConfig.NodeAllocatableConfig.ReservedSystemCPUs,
+		cm.GetNodeAllocatableReservation(),
+		nodeConfig.KubeletRootDir,
+		cm.topologyManager,
+	)
+	if err != nil {
+		klog.ErrorS(err, "Failed to initialize cpu manager")
+		return nil, err
+	}
+	cm.topologyManager.AddHintProvider(cm.cpuManager)
+
+	if utilfeature.DefaultFeatureGate.Enabled(kubefeatures.MemoryManager) {
+		klog.InfoS("Creating memory manager")
+		cm.memoryManager, err = memorymanager.NewManager(
+			nodeConfig.ExperimentalMemoryManagerPolicy,
+			machineInfo,
+			cm.GetNodeAllocatableReservation(),
+			nodeConfig.ExperimentalMemoryManagerReservedMemory,
+			nodeConfig.KubeletRootDir,
+			cm.topologyManager,
+		)
+		if err != nil {
+			klog.ErrorS(err, "Failed to initialize memory manager")
+			return nil, err
+		}
+		cm.topologyManager.AddHintProvider(cm.memoryManager)
+	}
+
 	return cm, nil
 }
 
@@ -132,7 +191,9 @@ func (cm *containerManagerImpl) SystemCgroupsLimit() v1.ResourceList {
 }
 
 func (cm *containerManagerImpl) GetNodeConfig() NodeConfig {
-	return NodeConfig{}
+	cm.RLock()
+	defer cm.RUnlock()
+	return cm.nodeConfig
 }
 
 func (cm *containerManagerImpl) GetMountedSubsystems() *CgroupSubsystems {
@@ -223,7 +284,7 @@ func (cm *containerManagerImpl) UpdatePluginResources(node *schedulerframework.N
 }
 
 func (cm *containerManagerImpl) InternalContainerLifecycle() InternalContainerLifecycle {
-	return &internalContainerLifecycleImpl{cpumanager.NewFakeManager(), memorymanager.NewFakeManager(), topologymanager.NewFakeManager()}
+	return &internalContainerLifecycleImpl{cm.cpuManager, cm.memoryManager, cm.topologyManager}
 }
 
 func (cm *containerManagerImpl) GetPodCgroupRoot() string {
@@ -243,19 +304,25 @@ func (cm *containerManagerImpl) ShouldResetExtendedResourceCapacity() bool {
 }
 
 func (cm *containerManagerImpl) GetAllocateResourcesPodAdmitHandler() lifecycle.PodAdmitHandler {
-	return &noopWindowsResourceAllocator{}
+	return cm.topologyManager
 }
 
 func (cm *containerManagerImpl) UpdateAllocatedDevices() {
 	return
 }
 
-func (cm *containerManagerImpl) GetCPUs(_, _ string) []int64 {
-	return nil
+func (cm *containerManagerImpl) GetCPUs(podUID, containerName string) []int64 {
+	if cm.cpuManager != nil {
+		return int64Slice(cm.cpuManager.GetExclusiveCPUs(podUID, containerName).UnsortedList())
+	}
+	return []int64{}
 }
 
 func (cm *containerManagerImpl) GetAllocatableCPUs() []int64 {
-	return nil
+	if cm.cpuManager != nil {
+		return int64Slice(cm.cpuManager.GetAllocatableCPUs().UnsortedList())
+	}
+	return []int64{}
 }
 
 func (cm *containerManagerImpl) GetMemory(_, _ string) []*podresourcesapi.ContainerMemory {
diff --git a/pkg/kubelet/cm/cpumanager/cpu_manager.go b/pkg/kubelet/cm/cpumanager/cpu_manager.go
index b17af7ff8fa..38c8ee95e19 100644
--- a/pkg/kubelet/cm/cpumanager/cpu_manager.go
+++ b/pkg/kubelet/cm/cpumanager/cpu_manager.go
@@ -19,7 +19,9 @@ package cpumanager
 import (
 	"context"
 	"fmt"
+	"k8s.io/kubernetes/pkg/kubelet/winstats"
 	"math"
+	"runtime"
 	"sync"
 	"time"
 
@@ -265,6 +267,7 @@ func (m *manager) Allocate(p *v1.Pod, c *v1.Container) error {
 	defer m.Unlock()
 
 	// Call down into the policy to assign this container CPUs if required.
+	klog.V(5).InfoS("Allocating CPUs for container", "pod", klog.KObj(p), "containerName", c.Name)
 	err := m.policy.Allocate(m.state, p, c)
 	if err != nil {
 		klog.ErrorS(err, "Allocate error")
@@ -533,6 +536,24 @@ func (m *manager) updateContainerCPUSet(ctx context.Context, containerID string,
 	// helpers_linux.go similar to what exists for pods.
 	// It would be better to pass the full container resources here instead of
 	// this patch-like partial resources.
+
+	if runtime.GOOS == "windows" {
+		klog.V(4).InfoS("Updating CPU group affinity on Windows", "containerID", containerID, "cpus", cpus.String())
+
+		affinities := winstats.CpusToGroupAffinity(cpus.List())
+		var cpuGroupAffinities []*runtimeapi.WindowsCpuGroupAffinity
+		for _, affinity := range affinities {
+			cpuGroupAffinities = append(cpuGroupAffinities, &runtimeapi.WindowsCpuGroupAffinity{
+				CpuGroup: uint32(affinity.Group),
+				CpuMask:  uint64(affinity.Mask),
+			})
+		}
+		return m.containerRuntime.UpdateContainerResources(ctx, containerID, &runtimeapi.ContainerResources{
+			Windows: &runtimeapi.WindowsContainerResources{
+				AffinityCpus: cpuGroupAffinities,
+			},
+		})
+	}
 	return m.containerRuntime.UpdateContainerResources(
 		ctx,
 		containerID,
diff --git a/pkg/kubelet/cm/cpumanager/policy_static.go b/pkg/kubelet/cm/cpumanager/policy_static.go
index db8770f96f6..64d706554a0 100644
--- a/pkg/kubelet/cm/cpumanager/policy_static.go
+++ b/pkg/kubelet/cm/cpumanager/policy_static.go
@@ -300,8 +300,10 @@ func (p *staticPolicy) updateCPUsToReuse(pod *v1.Pod, container *v1.Container, c
 }
 
 func (p *staticPolicy) Allocate(s state.State, pod *v1.Pod, container *v1.Container) (rerr error) {
+	klog.V(5).InfoS("Static policy: Allocate", "pod", klog.KObj(pod), "containerName", container.Name)
 	numCPUs := p.guaranteedCPUs(pod, container)
 	if numCPUs == 0 {
+		klog.V(5).InfoS("Container will use the shared CPU pool", "pod", klog.KObj(pod), "containerName", container.Name)
 		// container belongs in the shared pool (nothing to do; use default cpuset)
 		return nil
 	}
@@ -402,7 +404,7 @@ func (p *staticPolicy) RemoveContainer(s state.State, podUID string, containerName
 }
 
 func (p *staticPolicy) allocateCPUs(s state.State, numCPUs int, numaAffinity bitmask.BitMask, reusableCPUs cpuset.CPUSet) (cpuset.CPUSet, error) {
-	klog.InfoS("AllocateCPUs", "numCPUs", numCPUs, "socket", numaAffinity)
+	klog.InfoS("CPUSet", "numCPUs", numCPUs, "socket", numaAffinity)
 
 	allocatableCPUs := p.GetAvailableCPUs(s).Union(reusableCPUs)
 
@@ -440,6 +442,7 @@ func (p *staticPolicy) allocateCPUs(s state.State, numCPUs int, numaAffinity bit
 
 func (p *staticPolicy) guaranteedCPUs(pod *v1.Pod, container *v1.Container) int {
 	if v1qos.GetPodQOS(pod) != v1.PodQOSGuaranteed {
+		klog.V(5).InfoS("Exclusive CPU allocation skipped, pod QoS is not guaranteed", "pod", pod.UID, "containerName", container.Name)
 		return 0
 	}
 	cpuQuantity := container.Resources.Requests[v1.ResourceCPU]
diff --git a/pkg/kubelet/cm/internal_container_lifecycle_windows.go b/pkg/kubelet/cm/internal_container_lifecycle_windows.go
index 1471770b0d9..7a47aec0057 100644
--- a/pkg/kubelet/cm/internal_container_lifecycle_windows.go
+++ b/pkg/kubelet/cm/internal_container_lifecycle_windows.go
@@ -21,9 +21,42 @@ package cm
 
 import (
 	"k8s.io/api/core/v1"
+	"k8s.io/apimachinery/pkg/util/sets"
 	runtimeapi "k8s.io/cri-api/pkg/apis/runtime/v1"
+	"k8s.io/klog/v2"
+	"k8s.io/kubernetes/pkg/kubelet/winstats"
 )
 
 func (i *internalContainerLifecycleImpl) PreCreateContainer(pod *v1.Pod, container *v1.Container, containerConfig *runtimeapi.ContainerConfig) error {
+	klog.V(4).InfoS("PreCreateContainer on Windows", "pod", klog.KObj(pod), "containerName", container.Name)
+	if i.cpuManager != nil {
+		allocatedCPUs := i.cpuManager.GetCPUAffinity(string(pod.UID), container.Name)
+		if !allocatedCPUs.IsEmpty() {
+			klog.V(4).InfoS("Setting CPU affinity", "containerName", container.Name, "cpus", allocatedCPUs.String())
+			var cpuGroupAffinities []*runtimeapi.WindowsCpuGroupAffinity
+			affinities := winstats.CpusToGroupAffinity(allocatedCPUs.List())
+			for _, affinity := range affinities {
+				klog.V(4).InfoS("Adding CPU group affinity", "containerName", container.Name, "group", affinity.Group, "mask", affinity.Mask, "processors", affinity.Processors())
+				cpuGroupAffinities = append(cpuGroupAffinities, &runtimeapi.WindowsCpuGroupAffinity{
+					CpuGroup: uint32(affinity.Group),
+					CpuMask:  uint64(affinity.Mask),
+				})
+			}
+
+			containerConfig.Windows.Resources.AffinityCpus = cpuGroupAffinities
+		}
+	}
+
+	if i.memoryManager != nil {
+		numaNodes := i.memoryManager.GetMemoryNUMANodes(pod, container)
+		if numaNodes.Len() > 0 {
+			var affinity []int64
+			for _, numaNode := range sets.List(numaNodes) {
+				affinity = append(affinity, int64(numaNode))
+			}
+			klog.V(4).InfoS("Setting preferred memory NUMA nodes", "containerName", container.Name, "numaNodes", affinity)
+			containerConfig.Windows.Resources.AffinityPrefferedNumaNodes = affinity
+		}
+	}
 	return nil
 }
diff --git a/pkg/kubelet/kubelet.go b/pkg/kubelet/kubelet.go
index c37d8e88ec7..12561b737bb 100644
--- a/pkg/kubelet/kubelet.go
+++ b/pkg/kubelet/kubelet.go
@@ -1421,7 +1421,7 @@ func (kl *Kubelet) setupDataDirs() error {
 	if err := kl.hostutil.MakeRShared(kl.getRootDir()); err != nil {
 		return fmt.Errorf("error configuring root directory: %v", err)
 	}
-	if err := os.MkdirAll(kl.getPodsDir(), 0750); err != nil {
+	if err := utilfs.MkdirAll(kl.getPodsDir(), 0750); err != nil {
 		return fmt.Errorf("error creating pods directory: %v", err)
 	}
 	if err := utilfs.MkdirAll(kl.getPluginsDir(), 0750); err != nil {
diff --git a/pkg/kubelet/winstats/cpu_topology.go b/pkg/kubelet/winstats/cpu_topology.go
new file mode 100644
index 00000000000..ce06895e401
--- /dev/null
+++ b/pkg/kubelet/winstats/cpu_topology.go
@@ -0,0 +1,250 @@
+package winstats
+
+import (
+	"fmt"
+	cadvisorapi "github.com/google/cadvisor/info/v1"
+	"k8s.io/klog/v2"
+	"syscall"
+	"unsafe"
+)
+
+var (
+	procGetLogicalProcessorInformationEx = modkernel32.NewProc("GetLogicalProcessorInformationEx")
+	getNumaAvailableMemoryNodeEx         = modkernel32.NewProc("GetNumaAvailableMemoryNodeEx")
+)
+
+type RelationType int
+
+const (
+	RelationProcessorCore RelationType = iota
+	RelationNumaNode
+	RelationCache
+	RelationProcessorPackage
+	RelationGroup
+	RelationProcessorDie
+	RelationNumaNodeEx
+	RelationProcessorModule
+	RelationAll = 0xffff
+)
+
+type SYSTEM_LOGICAL_PROCESSOR_INFORMATION_EX struct {
+	Relationship uint32
+	Size         uint32
+	data         interface{}
+}
+
+type PROCESSOR_RELATIONSHIP struct {
+	Flags           byte
+	EfficiencyClass byte
+	Reserved        [20]byte
+	GroupCount      uint16
+	GroupMasks      interface{} //[]GROUP_AFFINITY // in c++ this is a union of either one or many GROUP_AFFINITY based on GroupCount
+}
+
+type GROUP_AFFINITY struct {
+	Mask     uintptr
+	Group    uint16
+	Reserved [3]uint16
+}
+
+func (a GROUP_AFFINITY) Processors() []int {
+	processors := []int{}
+	for i := 0; i < 64; i++ {
+		if a.Mask&(1<
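
For readers following the affinity plumbing in this patch, the standalone sketch below (not part of the patch) walks through the group/mask arithmetic that the WindowsCpuGroupAffinity values rely on. It assumes the conventional Windows layout of at most 64 logical processors per processor group; the groupAffinity type and the cpusToGroupAffinity and processors helpers are hypothetical stand-ins illustrating what winstats.CpusToGroupAffinity and GROUP_AFFINITY.Processors in pkg/kubelet/winstats/cpu_topology.go are expected to do, not the authoritative implementations.

// Illustrative sketch only: group/mask arithmetic for Windows processor groups,
// assuming up to 64 logical processors per group.
package main

import (
	"fmt"
	"sort"
)

// groupAffinity mirrors the shape of GROUP_AFFINITY / the CRI
// WindowsCpuGroupAffinity message: one processor group plus a 64-bit mask.
type groupAffinity struct {
	group uint16
	mask  uint64
}

// processors expands a group's mask back into absolute logical CPU IDs
// (bit i of group g corresponds to CPU g*64+i).
func (a groupAffinity) processors() []int {
	cpus := []int{}
	for i := 0; i < 64; i++ {
		if a.mask&(1<<uint(i)) != 0 {
			cpus = append(cpus, int(a.group)*64+i)
		}
	}
	return cpus
}

// cpusToGroupAffinity buckets logical CPU IDs into per-group masks
// (group = cpu / 64, bit = cpu % 64), the inverse of processors above.
func cpusToGroupAffinity(cpus []int) []groupAffinity {
	masks := map[uint16]uint64{}
	for _, cpu := range cpus {
		masks[uint16(cpu/64)] |= 1 << uint(cpu%64)
	}
	groups := make([]uint16, 0, len(masks))
	for g := range masks {
		groups = append(groups, g)
	}
	sort.Slice(groups, func(i, j int) bool { return groups[i] < groups[j] })
	affinities := make([]groupAffinity, 0, len(groups))
	for _, g := range groups {
		affinities = append(affinities, groupAffinity{group: g, mask: masks[g]})
	}
	return affinities
}

func main() {
	// CPUs 0-3 land in group 0 (mask 0xf); CPU 65 lands in group 1, bit 1.
	for _, a := range cpusToGroupAffinity([]int{0, 1, 2, 3, 65}) {
		fmt.Printf("group %d mask %#x processors %v\n", a.group, a.mask, a.processors())
	}
}

Running this prints "group 0 mask 0xf processors [0 1 2 3]" and "group 1 mask 0x2 processors [65]", which is the same group-plus-mask shape the kubelet hands to the CRI runtime through WindowsContainerResources.AffinityCpus in the hunks above.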