Merge pull request #125296 from jsturtevant/windows-numa-support

Support CPU and Topology manager on Windows
Kubernetes Prow Robot 2024-11-05 00:33:28 +00:00 committed by GitHub
commit 4932adf80d
14 changed files with 930 additions and 38 deletions

View File

@ -715,6 +715,12 @@ const (
// Allows kube-proxy to run in Overlay mode for Windows
WinOverlay featuregate.Feature = "WinOverlay"
// owner: @jsturtevant
// kep: https://kep.k8s.io/4888
//
// Add CPU and Memory Affinity support to Windows nodes with CPUManager, MemoryManager and Topology manager
WindowsCPUAndMemoryAffinity featuregate.Feature = "WindowsCPUAndMemoryAffinity"
// owner: @marosset
// kep: https://kep.k8s.io/3503
//
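A minimal sketch (not part of the diff) of how the new gate is consumed; the same utilfeature/kubefeatures check appears verbatim in the kubelet changes further down:

package kubelet // hypothetical placement, for illustration only

import (
	utilfeature "k8s.io/apiserver/pkg/util/feature"
	kubefeatures "k8s.io/kubernetes/pkg/features"
)

// windowsAffinityEnabled reports whether the alpha Windows CPU/memory affinity
// paths should be taken; at the 1.32 default (false) callers keep the
// pre-existing fake-manager behaviour.
func windowsAffinityEnabled() bool {
	return utilfeature.DefaultFeatureGate.Enabled(kubefeatures.WindowsCPUAndMemoryAffinity)
}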

View File

@ -774,6 +774,10 @@ var defaultVersionedKubernetesFeatureGates = map[featuregate.Feature]featuregate
{Version: version.MustParse("1.20"), Default: true, PreRelease: featuregate.Beta},
},
WindowsCPUAndMemoryAffinity: {
{Version: version.MustParse("1.32"), Default: false, PreRelease: featuregate.Alpha},
},
WindowsHostNetwork: {
{Version: version.MustParse("1.26"), Default: true, PreRelease: featuregate.Alpha},
},

View File

@ -198,6 +198,14 @@ type Status struct {
SoftRequirements error
}
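// int64Slice converts a []int of CPU IDs into the []int64 form returned through the pod resources API (used by GetCPUs and GetAllocatableCPUs).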
func int64Slice(in []int) []int64 {
out := make([]int64, len(in))
for i := range in {
out[i] = int64(in[i])
}
return out
}
// parsePercentage parses the percentage string to numeric value.
func parsePercentage(v string) (int64, error) {
if !strings.HasSuffix(v, "%") {

View File

@ -920,14 +920,6 @@ func (cm *containerManagerImpl) GetAllocatableDevices() []*podresourcesapi.Conta
return containerDevicesFromResourceDeviceInstances(cm.deviceManager.GetAllocatableDevices())
}
func int64Slice(in []int) []int64 {
out := make([]int64, len(in))
for i := range in {
out[i] = int64(in[i])
}
return out
}
func (cm *containerManagerImpl) GetCPUs(podUID, containerName string) []int64 {
if cm.cpuManager != nil {
return int64Slice(cm.cpuManager.GetExclusiveCPUs(podUID, containerName).UnsortedList())

View File

@ -25,6 +25,10 @@ package cm
import (
"context"
"fmt"
utilfeature "k8s.io/apiserver/pkg/util/feature"
kubefeatures "k8s.io/kubernetes/pkg/features"
"k8s.io/kubernetes/pkg/kubelet/cm/memorymanager"
"sync"
"k8s.io/klog/v2"
"k8s.io/mount-utils"
@ -38,10 +42,8 @@ import (
pluginwatcherapi "k8s.io/kubelet/pkg/apis/pluginregistration/v1"
podresourcesapi "k8s.io/kubelet/pkg/apis/podresources/v1"
"k8s.io/kubernetes/pkg/kubelet/cadvisor"
"k8s.io/kubernetes/pkg/kubelet/cm/admission"
"k8s.io/kubernetes/pkg/kubelet/cm/cpumanager"
"k8s.io/kubernetes/pkg/kubelet/cm/devicemanager"
"k8s.io/kubernetes/pkg/kubelet/cm/memorymanager"
"k8s.io/kubernetes/pkg/kubelet/cm/resourceupdates"
"k8s.io/kubernetes/pkg/kubelet/cm/topologymanager"
"k8s.io/kubernetes/pkg/kubelet/config"
@ -63,12 +65,9 @@ type containerManagerImpl struct {
deviceManager devicemanager.Manager
// Interface for Topology resource co-ordination
topologyManager topologymanager.Manager
}
type noopWindowsResourceAllocator struct{}
func (ra *noopWindowsResourceAllocator) Admit(attrs *lifecycle.PodAdmitAttributes) lifecycle.PodAdmitResult {
return admission.GetPodAdmitResult(nil)
cpuManager cpumanager.Manager
nodeInfo *v1.Node
sync.RWMutex
}
func (cm *containerManagerImpl) Start(ctx context.Context, node *v1.Node,
@ -80,6 +79,8 @@ func (cm *containerManagerImpl) Start(ctx context.Context, node *v1.Node,
localStorageCapacityIsolation bool) error {
klog.V(2).InfoS("Starting Windows container manager")
cm.nodeInfo = node
if localStorageCapacityIsolation {
rootfs, err := cm.cadvisorInterface.RootFsInfo()
if err != nil {
@ -92,6 +93,14 @@ func (cm *containerManagerImpl) Start(ctx context.Context, node *v1.Node,
containerMap, containerRunningSet := buildContainerMapAndRunningSetFromRuntime(ctx, runtimeService)
// Initialize CPU manager
if utilfeature.DefaultFeatureGate.Enabled(kubefeatures.WindowsCPUAndMemoryAffinity) {
err := cm.cpuManager.Start(cpumanager.ActivePodsFunc(activePods), sourcesReady, podStatusProvider, runtimeService, containerMap)
if err != nil {
return fmt.Errorf("start cpu manager error: %v", err)
}
}
// Starts device manager.
if err := cm.deviceManager.Start(devicemanager.ActivePodsFunc(activePods), sourcesReady, containerMap, containerRunningSet); err != nil {
return err
@ -117,7 +126,37 @@ func NewContainerManager(mountUtil mount.Interface, cadvisorInterface cadvisor.I
cadvisorInterface: cadvisorInterface,
}
cm.topologyManager = topologymanager.NewFakeManager()
if utilfeature.DefaultFeatureGate.Enabled(kubefeatures.WindowsCPUAndMemoryAffinity) {
klog.InfoS("Creating topology manager")
cm.topologyManager, err = topologymanager.NewManager(machineInfo.Topology,
nodeConfig.TopologyManagerPolicy,
nodeConfig.TopologyManagerScope,
nodeConfig.TopologyManagerPolicyOptions)
if err != nil {
klog.ErrorS(err, "Failed to initialize topology manager")
return nil, err
}
klog.InfoS("Creating cpu manager")
cm.cpuManager, err = cpumanager.NewManager(
nodeConfig.CPUManagerPolicy,
nodeConfig.CPUManagerPolicyOptions,
nodeConfig.CPUManagerReconcilePeriod,
machineInfo,
nodeConfig.NodeAllocatableConfig.ReservedSystemCPUs,
cm.GetNodeAllocatableReservation(),
nodeConfig.KubeletRootDir,
cm.topologyManager,
)
if err != nil {
klog.ErrorS(err, "Failed to initialize cpu manager")
return nil, err
}
cm.topologyManager.AddHintProvider(cm.cpuManager)
} else {
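// Feature gate off (the 1.32 default): preserve the previous behaviour by wiring the fake CPU and topology managers.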
cm.topologyManager = topologymanager.NewFakeManager()
cm.cpuManager = cpumanager.NewFakeManager()
}
klog.InfoS("Creating device plugin manager")
cm.deviceManager, err = devicemanager.NewManagerImpl(nil, cm.topologyManager)
@ -134,7 +173,9 @@ func (cm *containerManagerImpl) SystemCgroupsLimit() v1.ResourceList {
}
func (cm *containerManagerImpl) GetNodeConfig() NodeConfig {
return NodeConfig{}
cm.RLock()
defer cm.RUnlock()
return cm.nodeConfig
}
func (cm *containerManagerImpl) GetMountedSubsystems() *CgroupSubsystems {
@ -226,7 +267,7 @@ func (cm *containerManagerImpl) UpdatePluginResources(node *schedulerframework.N
}
func (cm *containerManagerImpl) InternalContainerLifecycle() InternalContainerLifecycle {
return &internalContainerLifecycleImpl{cpumanager.NewFakeManager(), memorymanager.NewFakeManager(), topologymanager.NewFakeManager()}
return &internalContainerLifecycleImpl{cm.cpuManager, memorymanager.NewFakeManager(), cm.topologyManager}
}
func (cm *containerManagerImpl) GetPodCgroupRoot() string {
@ -246,18 +287,30 @@ func (cm *containerManagerImpl) ShouldResetExtendedResourceCapacity() bool {
}
func (cm *containerManagerImpl) GetAllocateResourcesPodAdmitHandler() lifecycle.PodAdmitHandler {
return &noopWindowsResourceAllocator{}
return cm.topologyManager
}
func (cm *containerManagerImpl) UpdateAllocatedDevices() {
return
}
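// With the gate enabled, GetCPUs and GetAllocatableCPUs below report real CPU manager assignments; with it disabled they keep returning nil, as before this change.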
func (cm *containerManagerImpl) GetCPUs(_, _ string) []int64 {
func (cm *containerManagerImpl) GetCPUs(podUID, containerName string) []int64 {
if utilfeature.DefaultFeatureGate.Enabled(kubefeatures.WindowsCPUAndMemoryAffinity) {
if cm.cpuManager != nil {
return int64Slice(cm.cpuManager.GetExclusiveCPUs(podUID, containerName).UnsortedList())
}
return []int64{}
}
return nil
}
func (cm *containerManagerImpl) GetAllocatableCPUs() []int64 {
if utilfeature.DefaultFeatureGate.Enabled(kubefeatures.WindowsCPUAndMemoryAffinity) {
if cm.cpuManager != nil {
return int64Slice(cm.cpuManager.GetAllocatableCPUs().UnsortedList())
}
return []int64{}
}
return nil
}

View File

@ -512,21 +512,6 @@ func findContainerStatusByName(status *v1.PodStatus, name string) (*v1.Container
return nil, fmt.Errorf("unable to find status for container with name %v in pod status (it may not be running)", name)
}
func (m *manager) updateContainerCPUSet(ctx context.Context, containerID string, cpus cpuset.CPUSet) error {
// TODO: Consider adding a `ResourceConfigForContainer` helper in
// helpers_linux.go similar to what exists for pods.
// It would be better to pass the full container resources here instead of
// this patch-like partial resources.
return m.containerRuntime.UpdateContainerResources(
ctx,
containerID,
&runtimeapi.ContainerResources{
Linux: &runtimeapi.LinuxContainerResources{
CpusetCpus: cpus.String(),
},
})
}
func (m *manager) GetExclusiveCPUs(podUID, containerName string) cpuset.CPUSet {
if result, ok := m.state.GetCPUSet(podUID, containerName); ok {
return result

View File

@ -0,0 +1,43 @@
//go:build !windows
// +build !windows
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package cpumanager
import (
"context"
runtimeapi "k8s.io/cri-api/pkg/apis/runtime/v1"
"k8s.io/utils/cpuset"
)
func (m *manager) updateContainerCPUSet(ctx context.Context, containerID string, cpus cpuset.CPUSet) error {
// TODO: Consider adding a `ResourceConfigForContainer` helper in
// helpers_linux.go similar to what exists for pods.
// It would be better to pass the full container resources here instead of
// this patch-like partial resources.
return m.containerRuntime.UpdateContainerResources(
ctx,
containerID,
&runtimeapi.ContainerResources{
Linux: &runtimeapi.LinuxContainerResources{
CpusetCpus: cpus.String(),
},
})
}

View File

@ -21,11 +21,16 @@ import (
"fmt"
"os"
"reflect"
"runtime"
"strconv"
"strings"
"testing"
"time"
utilfeature "k8s.io/apiserver/pkg/util/feature"
featuregatetesting "k8s.io/component-base/featuregate/testing"
"k8s.io/kubernetes/pkg/features"
cadvisorapi "github.com/google/cadvisor/info/v1"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
@ -263,6 +268,10 @@ func makeMultiContainerPodWithOptions(initCPUs, appCPUs []*containerOptions) *v1
}
func TestCPUManagerAdd(t *testing.T) {
if runtime.GOOS == "windows" {
featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.WindowsCPUAndMemoryAffinity, true)
}
testPolicy, _ := NewStaticPolicy(
&topology.CPUTopology{
NumCPUs: 4,
@ -347,6 +356,10 @@ func TestCPUManagerAdd(t *testing.T) {
}
func TestCPUManagerAddWithInitContainers(t *testing.T) {
if runtime.GOOS == "windows" {
featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.WindowsCPUAndMemoryAffinity, true)
}
testCases := []struct {
description string
topo *topology.CPUTopology
@ -598,6 +611,10 @@ func TestCPUManagerAddWithInitContainers(t *testing.T) {
}
func TestCPUManagerGenerate(t *testing.T) {
if runtime.GOOS == "windows" {
featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.WindowsCPUAndMemoryAffinity, true)
}
testCases := []struct {
description string
cpuPolicyName string
@ -703,6 +720,10 @@ func TestCPUManagerGenerate(t *testing.T) {
}
func TestCPUManagerRemove(t *testing.T) {
if runtime.GOOS == "windows" {
featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.WindowsCPUAndMemoryAffinity, true)
}
containerID := "fakeID"
containerMap := containermap.NewContainerMap()
@ -746,6 +767,10 @@ func TestCPUManagerRemove(t *testing.T) {
}
func TestReconcileState(t *testing.T) {
if runtime.GOOS == "windows" {
featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.WindowsCPUAndMemoryAffinity, true)
}
testPolicy, _ := NewStaticPolicy(
&topology.CPUTopology{
NumCPUs: 8,
@ -1269,6 +1294,10 @@ func TestReconcileState(t *testing.T) {
// above test cases are without kubelet --reserved-cpus cmd option
// the following tests are with --reserved-cpus configured
func TestCPUManagerAddWithResvList(t *testing.T) {
if runtime.GOOS == "windows" {
featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.WindowsCPUAndMemoryAffinity, true)
}
testPolicy, _ := NewStaticPolicy(
&topology.CPUTopology{
NumCPUs: 4,
@ -1343,6 +1372,10 @@ func TestCPUManagerAddWithResvList(t *testing.T) {
}
func TestCPUManagerHandlePolicyOptions(t *testing.T) {
if runtime.GOOS == "windows" {
featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.WindowsCPUAndMemoryAffinity, true)
}
testCases := []struct {
description string
cpuPolicyName string
@ -1409,6 +1442,10 @@ func TestCPUManagerHandlePolicyOptions(t *testing.T) {
}
func TestCPUManagerGetAllocatableCPUs(t *testing.T) {
if runtime.GOOS == "windows" {
featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.WindowsCPUAndMemoryAffinity, true)
}
nonePolicy, _ := NewNonePolicy(nil)
staticPolicy, _ := NewStaticPolicy(
&topology.CPUTopology{

View File

@ -0,0 +1,49 @@
//go:build windows
// +build windows
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package cpumanager
import (
"context"
utilfeature "k8s.io/apiserver/pkg/util/feature"
runtimeapi "k8s.io/cri-api/pkg/apis/runtime/v1"
kubefeatures "k8s.io/kubernetes/pkg/features"
"k8s.io/kubernetes/pkg/kubelet/winstats"
"k8s.io/utils/cpuset"
)
func (m *manager) updateContainerCPUSet(ctx context.Context, containerID string, cpus cpuset.CPUSet) error {
if !utilfeature.DefaultFeatureGate.Enabled(kubefeatures.WindowsCPUAndMemoryAffinity) {
return nil
}
affinities := winstats.CpusToGroupAffinity(cpus.List())
var cpuGroupAffinities []*runtimeapi.WindowsCpuGroupAffinity
for _, affinity := range affinities {
cpuGroupAffinities = append(cpuGroupAffinities, &runtimeapi.WindowsCpuGroupAffinity{
CpuGroup: uint32(affinity.Group),
CpuMask: uint64(affinity.Mask),
})
}
return m.containerRuntime.UpdateContainerResources(ctx, containerID, &runtimeapi.ContainerResources{
Windows: &runtimeapi.WindowsContainerResources{
AffinityCpus: cpuGroupAffinities,
},
})
}
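Illustrative sketch, not part of this commit: what the conversion above yields for a container pinned to CPUs 0, 1 and 65. It assumes it is built inside the kubernetes tree on Windows, where the winstats cpu_topology.go added below compiles.

package main

import (
	"fmt"

	"k8s.io/kubernetes/pkg/kubelet/winstats"
	"k8s.io/utils/cpuset"
)

func main() {
	cpus := cpuset.New(0, 1, 65)
	// CPUs 0 and 1 land in Windows processor group 0 (mask 0b11);
	// CPU 65 is bit 1 of group 1 (65 = 1*64 + 1), so that group's mask is 0b10.
	for group, affinity := range winstats.CpusToGroupAffinity(cpus.List()) {
		fmt.Printf("group=%d mask=%s cpus=%v\n", group, affinity.MaskString(), affinity.Processors())
	}
}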

View File

@ -21,9 +21,29 @@ package cm
import (
"k8s.io/api/core/v1"
utilfeature "k8s.io/apiserver/pkg/util/feature"
runtimeapi "k8s.io/cri-api/pkg/apis/runtime/v1"
"k8s.io/klog/v2"
kubefeatures "k8s.io/kubernetes/pkg/features"
"k8s.io/kubernetes/pkg/kubelet/winstats"
)
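// PreCreateContainer (Windows) copies the CPU manager's exclusive CPU assignment for the container into Windows CPU group affinities on the CRI container config, when the WindowsCPUAndMemoryAffinity gate is enabled.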
func (i *internalContainerLifecycleImpl) PreCreateContainer(pod *v1.Pod, container *v1.Container, containerConfig *runtimeapi.ContainerConfig) error {
if i.cpuManager != nil && utilfeature.DefaultFeatureGate.Enabled(kubefeatures.WindowsCPUAndMemoryAffinity) {
allocatedCPUs := i.cpuManager.GetCPUAffinity(string(pod.UID), container.Name)
if !allocatedCPUs.IsEmpty() {
var cpuGroupAffinities []*runtimeapi.WindowsCpuGroupAffinity
affinities := winstats.CpusToGroupAffinity(allocatedCPUs.List())
for _, affinity := range affinities {
klog.V(4).InfoS("Setting CPU affinity", "container", container.Name, "pod", pod.Name, "group", affinity.Group, "mask", affinity.MaskString(), "processorIds", affinity.Processors())
cpuGroupAffinities = append(cpuGroupAffinities, &runtimeapi.WindowsCpuGroupAffinity{
CpuGroup: uint32(affinity.Group),
CpuMask: uint64(affinity.Mask),
})
}
containerConfig.Windows.Resources.AffinityCpus = cpuGroupAffinities
}
}
return nil
}

View File

@ -0,0 +1,247 @@
//go:build windows
// +build windows
/*
Copyright 2024 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package winstats
import (
"fmt"
cadvisorapi "github.com/google/cadvisor/info/v1"
"k8s.io/klog/v2"
"syscall"
"unsafe"
)
var (
procGetLogicalProcessorInformationEx = modkernel32.NewProc("GetLogicalProcessorInformationEx")
getNumaAvailableMemoryNodeEx = modkernel32.NewProc("GetNumaAvailableMemoryNodeEx")
)
type relationType int
const (
relationProcessorCore relationType = iota
relationNumaNode
relationCache
relationProcessorPackage
relationGroup
relationProcessorDie
relationNumaNodeEx
relationProcessorModule
relationAll = 0xffff
)
type systemLogicalProcessorInformationEx struct {
Relationship uint32
Size uint32
data interface{}
}
type processorRelationship struct {
Flags byte
EfficiencyClass byte
Reserved [20]byte
GroupCount uint16
// GroupMasks holds a []GroupAffinity. In the Windows PROCESSOR_RELATIONSHIP struct this is a variable-length array (union) of one or more GROUP_AFFINITY entries, sized by GroupCount.
GroupMasks interface{}
}
// GroupAffinity represents the processor group affinity of cpus
// https://learn.microsoft.com/en-us/windows/win32/api/winnt/ns-winnt-group_affinity
type GroupAffinity struct {
Mask uint64
Group uint16
Reserved [3]uint16
}
// MaskString returns the affinity mask as a string of 0s and 1s
func (a GroupAffinity) MaskString() string {
return fmt.Sprintf("%064b", a.Mask)
}
// Processors returns the list of processor IDs covered by the affinity mask.
// Windows identifies a logical processor by (group, bit); kubelet flattens that to a single ID as group*64 + bit.
func (a GroupAffinity) Processors() []int {
processors := []int{}
for i := 0; i < 64; i++ {
if a.Mask&(1<<i) != 0 {
processors = append(processors, i+(int(a.Group)*64))
}
}
return processors
}
// CpusToGroupAffinity converts a list of CPU IDs into a map of GroupAffinity keyed by Windows processor group.
// It is the inverse of Processors(): each ID is split into a group (id/64) and a bit in that group's 64-bit mask (id%64).
func CpusToGroupAffinity(cpus []int) map[int]*GroupAffinity {
groupAffinities := make(map[int]*GroupAffinity)
for _, cpu := range cpus {
group := uint16(cpu / 64)
groupAffinity, ok := groupAffinities[int(group)]
if !ok {
groupAffinity = &GroupAffinity{
Group: group,
}
groupAffinities[int(group)] = groupAffinity
}
mask := uint64(1 << (cpu % 64))
groupAffinity.Mask |= mask
}
return groupAffinities
}
type numaNodeRelationship struct {
NodeNumber uint32
Reserved [18]byte
GroupCount uint16
GroupMasks interface{} // []GroupAffinity; in the Windows NUMA_NODE_RELATIONSHIP struct this is a union of one or more GROUP_AFFINITY entries, sized by GroupCount
}
type processor struct {
CoreID int
SocketID int
NodeID int
}
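// processorInfo calls GetLogicalProcessorInformationEx twice, first with a nil buffer to learn the required length and then with an allocated buffer, and converts the returned records into cadvisor-style core, socket and NUMA-node information.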
func processorInfo(relationShip relationType) (int, int, []cadvisorapi.Node, error) {
// Call once to get the length of data to return
var returnLength uint32 = 0
r1, _, err := procGetLogicalProcessorInformationEx.Call(
uintptr(relationShip),
uintptr(0),
uintptr(unsafe.Pointer(&returnLength)),
)
if r1 != 0 || err.(syscall.Errno) != syscall.ERROR_INSUFFICIENT_BUFFER {
return 0, 0, nil, fmt.Errorf("call to GetLogicalProcessorInformationEx failed: %v", err)
}
// Allocate the buffer with the length it should be
buffer := make([]byte, returnLength)
// Call GetLogicalProcessorInformationEx again to get the actual information
r1, _, err = procGetLogicalProcessorInformationEx.Call(
uintptr(relationShip),
uintptr(unsafe.Pointer(&buffer[0])),
uintptr(unsafe.Pointer(&returnLength)),
)
if r1 == 0 {
return 0, 0, nil, fmt.Errorf("call to GetLogicalProcessorInformationEx failed: %v", err)
}
return convertWinApiToCadvisorApi(buffer)
}
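// convertWinApiToCadvisorApi walks the variable-length SYSTEM_LOGICAL_PROCESSOR_INFORMATION_EX records in buffer, tracks core/socket/NUMA-node membership per logical processor, and folds the result into cadvisorapi.Node entries, filling per-node memory via GetNumaAvailableMemoryNodeEx.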
func convertWinApiToCadvisorApi(buffer []byte) (int, int, []cadvisorapi.Node, error) {
logicalProcessors := make(map[int]*processor)
numofSockets := 0
numOfcores := 0
nodes := []cadvisorapi.Node{}
for offset := 0; offset < len(buffer); {
// check size in buffer to avoid out of bounds access, we don't know the type or size yet
if offset+int(unsafe.Sizeof(systemLogicalProcessorInformationEx{})) > len(buffer) {
return 0, 0, nil, fmt.Errorf("remaining buffer too small while reading windows processor relationship")
}
info := (*systemLogicalProcessorInformationEx)(unsafe.Pointer(&buffer[offset]))
// check one more time now that we know the size of the struct
if offset+int(info.Size) > len(buffer) {
return 0, 0, nil, fmt.Errorf("remaining buffer too small while reading windows processor relationship")
}
switch (relationType)(info.Relationship) {
case relationProcessorCore, relationProcessorPackage:
relationship := (*processorRelationship)(unsafe.Pointer(&info.data))
groupMasks := make([]GroupAffinity, relationship.GroupCount)
for i := 0; i < int(relationship.GroupCount); i++ {
groupMasks[i] = *(*GroupAffinity)(unsafe.Pointer(uintptr(unsafe.Pointer(&relationship.GroupMasks)) + uintptr(i)*unsafe.Sizeof(GroupAffinity{})))
}
if relationProcessorCore == (relationType)(info.Relationship) {
numOfcores++
}
if relationProcessorPackage == (relationType)(info.Relationship) {
numofSockets++
}
//iterate over group masks and add each processor to the map
for _, groupMask := range groupMasks {
for _, processorId := range groupMask.Processors() {
p, ok := logicalProcessors[processorId]
if !ok {
p = &processor{}
logicalProcessors[processorId] = p
}
if relationProcessorCore == (relationType)(info.Relationship) {
p.CoreID = numOfcores
}
if relationProcessorPackage == (relationType)(info.Relationship) {
p.SocketID = numofSockets
}
}
}
case relationNumaNode, relationNumaNodeEx:
numaNodeRelationship := (*numaNodeRelationship)(unsafe.Pointer(&info.data))
groupMasks := make([]GroupAffinity, numaNodeRelationship.GroupCount)
for i := 0; i < int(numaNodeRelationship.GroupCount); i++ {
groupMasks[i] = *(*GroupAffinity)(unsafe.Pointer(uintptr(unsafe.Pointer(&numaNodeRelationship.GroupMasks)) + uintptr(i)*unsafe.Sizeof(GroupAffinity{})))
}
nodes = append(nodes, cadvisorapi.Node{Id: int(numaNodeRelationship.NodeNumber)})
for _, groupMask := range groupMasks {
for _, processorId := range groupMask.Processors() {
p, ok := logicalProcessors[processorId]
if !ok {
p = &processor{}
logicalProcessors[processorId] = p
}
p.NodeID = int(numaNodeRelationship.NodeNumber)
}
}
default:
klog.V(4).Infof("Not using Windows CPU relationship type: %d", info.Relationship)
}
// Move the offset to the next SYSTEM_LOGICAL_PROCESSOR_INFORMATION_EX struct
offset += int(info.Size)
}
for processId, p := range logicalProcessors {
node := nodes[p.NodeID]
if node.Id != p.NodeID {
return 0, 0, nil, fmt.Errorf("node ID mismatch: %d != %d", node.Id, p.NodeID)
}
availableBytes := uint64(0)
r1, _, err := getNumaAvailableMemoryNodeEx.Call(uintptr(p.NodeID), uintptr(unsafe.Pointer(&availableBytes)))
if r1 == 0 {
return 0, 0, nil, fmt.Errorf("call to GetNumaAvailableMemoryNodeEx failed: %v", err)
}
node.Memory = availableBytes
node.AddThread(processId, p.CoreID)
ok, coreIdx := node.FindCore(p.CoreID)
if !ok {
return 0, 0, nil, fmt.Errorf("core not found: %d", p.CoreID)
}
node.Cores[coreIdx].SocketID = p.SocketID
nodes[p.NodeID] = node
}
return numOfcores, numofSockets, nodes, nil
}

View File

@ -0,0 +1,425 @@
//go:build windows
// +build windows
/*
Copyright 2024 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package winstats
import (
cadvisorapi "github.com/google/cadvisor/info/v1"
"github.com/stretchr/testify/assert"
"testing"
"unsafe"
)
func TestGROUP_AFFINITY_Processors(t *testing.T) {
tests := []struct {
name string
Mask uint64
Group uint16
want []int
}{
{
name: "empty",
Mask: 0,
Group: 0,
want: []int{},
},
{
name: "empty group 2",
Mask: 0,
Group: 1,
want: []int{},
},
{
name: "cpu 1 Group 0",
Mask: 1,
Group: 0,
want: []int{0},
},
{
name: "cpu 64 Group 0",
Mask: 1 << 63,
Group: 0,
want: []int{63},
},
{
name: "cpu 128 Group 1",
Mask: 1 << 63,
Group: 1,
want: []int{127},
},
{
name: "cpu 128 (Group 1)",
Mask: 1 << 63,
Group: 1,
want: []int{127},
},
{
name: "Mask 1 Group 2",
Mask: 1,
Group: 2,
want: []int{128},
},
{
name: "64 cpus group 0",
Mask: 0xffffffffffffffff,
Group: 0,
want: makeRange(0, 63),
},
{
name: "64 cpus group 1",
Mask: 0xffffffffffffffff,
Group: 1,
want: makeRange(64, 127),
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
a := GroupAffinity{
Mask: tt.Mask,
Group: tt.Group,
}
assert.Equalf(t, tt.want, a.Processors(), "Processors()")
})
}
}
// https://stackoverflow.com/a/39868255/697126
func makeRange(min, max int) []int {
a := make([]int, max-min+1)
for i := range a {
a[i] = min + i
}
return a
}
func TestCpusToGroupAffinity(t *testing.T) {
tests := []struct {
name string
cpus []int
want map[int]*GroupAffinity
}{
{
name: "empty",
want: map[int]*GroupAffinity{},
},
{
name: "single cpu group 0",
cpus: []int{0},
want: map[int]*GroupAffinity{
0: {
Mask: 1,
Group: 0,
},
},
},
{
name: "single cpu group 0",
cpus: []int{63},
want: map[int]*GroupAffinity{
0: {
Mask: 1 << 63,
Group: 0,
},
},
},
{
name: "single cpu group 1",
cpus: []int{64},
want: map[int]*GroupAffinity{
1: {
Mask: 1,
Group: 1,
},
},
},
{
name: "multiple cpus same group",
cpus: []int{0, 1, 2},
want: map[int]*GroupAffinity{
0: {
Mask: 1 | 2 | 4, // Binary OR to combine the masks
Group: 0,
},
},
},
{
name: "multiple cpus different groups",
cpus: []int{0, 64},
want: map[int]*GroupAffinity{
0: {
Mask: 1,
Group: 0,
},
1: {
Mask: 1,
Group: 1,
},
},
},
{
name: "multiple cpus different groups",
cpus: []int{0, 1, 2, 64, 65, 66},
want: map[int]*GroupAffinity{
0: {
Mask: 1 | 2 | 4,
Group: 0,
},
1: {
Mask: 1 | 2 | 4,
Group: 1,
},
},
},
{
name: "64 cpus group 0",
cpus: makeRange(0, 63),
want: map[int]*GroupAffinity{
0: {
Mask: 0xffffffffffffffff, // All 64 bits set
Group: 0,
},
},
},
{
name: "64 cpus group 1",
cpus: makeRange(64, 127),
want: map[int]*GroupAffinity{
1: {
Mask: 0xffffffffffffffff, // All 64 bits set
Group: 1,
},
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
assert.Equalf(t, tt.want, CpusToGroupAffinity(tt.cpus), "CpusToGroupAffinity(%v)", tt.cpus)
})
}
}
func Test_convertWinApiToCadvisorApi(t *testing.T) {
tests := []struct {
name string
buffer []byte
expectedNumOfCores int
expectedNumOfSockets int
expectedNodes []cadvisorapi.Node
wantErr bool
}{
{
name: "empty",
buffer: []byte{},
expectedNumOfCores: 0,
expectedNumOfSockets: 0,
expectedNodes: []cadvisorapi.Node{},
wantErr: false,
},
{
name: "single core",
buffer: createProcessorRelationships([]int{0}),
expectedNumOfCores: 1,
expectedNumOfSockets: 1,
expectedNodes: []cadvisorapi.Node{
{
Id: 0,
Cores: []cadvisorapi.Core{
{
Id: 1,
Threads: []int{0},
},
},
},
},
wantErr: false,
},
{
name: "single core, multiple cpus",
buffer: createProcessorRelationships([]int{0, 1, 2}),
expectedNumOfCores: 1,
expectedNumOfSockets: 1,
expectedNodes: []cadvisorapi.Node{
{
Id: 0,
Cores: []cadvisorapi.Core{
{
Id: 1,
Threads: []int{0, 1, 2},
},
},
},
},
wantErr: false,
},
{
name: "single core, multiple groups",
buffer: createProcessorRelationships([]int{0, 64}),
expectedNumOfCores: 1,
expectedNumOfSockets: 1,
expectedNodes: []cadvisorapi.Node{
{
Id: 0,
Cores: []cadvisorapi.Core{
{
Id: 1,
Threads: []int{0, 64},
},
},
},
},
wantErr: false,
},
{
name: "buffer to small",
buffer: createProcessorRelationships([]int{0, 64})[:48],
expectedNumOfCores: 1,
expectedNumOfSockets: 1,
expectedNodes: []cadvisorapi.Node{
{
Id: 0,
Cores: []cadvisorapi.Core{
{
Id: 1,
Threads: []int{0, 64},
},
},
},
},
wantErr: true,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
numOfCores, numOfSockets, nodes, err := convertWinApiToCadvisorApi(tt.buffer)
if tt.wantErr {
assert.Error(t, err)
return
}
assert.Equalf(t, tt.expectedNumOfCores, numOfCores, "num of cores")
assert.Equalf(t, tt.expectedNumOfSockets, numOfSockets, "num of sockets")
for node := range nodes {
assert.Equalf(t, tt.expectedNodes[node].Id, nodes[node].Id, "node id")
for core := range nodes[node].Cores {
assert.Equalf(t, tt.expectedNodes[node].Cores[core].Id, nodes[node].Cores[core].Id, "core id")
assert.Equalf(t, len(tt.expectedNodes[node].Cores[core].Threads), len(nodes[node].Cores[core].Threads), "num of threads")
for _, thread := range nodes[node].Cores[core].Threads {
assert.Truef(t, containsThread(tt.expectedNodes[node].Cores[core].Threads, thread), "thread %d", thread)
}
}
}
})
}
}
func containsThread(threads []int, thread int) bool {
for _, t := range threads {
if t == thread {
return true
}
}
return false
}
func genBuffer(infos ...systemLogicalProcessorInformationEx) []byte {
var buffer []byte
for _, info := range infos {
buffer = append(buffer, structToBytes(info)...)
}
return buffer
}
func createProcessorRelationships(cpus []int) []byte {
groups := CpusToGroupAffinity(cpus)
grouplen := len(groups)
groupAffinities := make([]GroupAffinity, 0, grouplen)
for _, group := range groups {
groupAffinities = append(groupAffinities, *group)
}
return genBuffer(systemLogicalProcessorInformationEx{
Relationship: uint32(relationProcessorCore),
Size: uint32(SYSTEM_LOGICAL_PROCESSOR_INFORMATION_EX_SIZE + PROCESSOR_RELATIONSHIP_SIZE + (GROUP_AFFINITY_SIZE * grouplen)),
data: processorRelationship{
Flags: 0,
EfficiencyClass: 0,
Reserved: [20]byte{},
GroupCount: uint16(grouplen),
GroupMasks: groupAffinities,
},
}, systemLogicalProcessorInformationEx{
Relationship: uint32(relationNumaNode),
Size: uint32(SYSTEM_LOGICAL_PROCESSOR_INFORMATION_EX_SIZE + NUMA_NODE_RELATIONSHIP_SIZE + (GROUP_AFFINITY_SIZE * grouplen)),
data: numaNodeRelationship{
NodeNumber: 0,
Reserved: [18]byte{},
GroupCount: uint16(grouplen),
GroupMasks: groupAffinities,
}}, systemLogicalProcessorInformationEx{
Relationship: uint32(relationProcessorPackage),
Size: uint32(SYSTEM_LOGICAL_PROCESSOR_INFORMATION_EX_SIZE + PROCESSOR_RELATIONSHIP_SIZE + (GROUP_AFFINITY_SIZE * grouplen)),
data: processorRelationship{
Flags: 0,
EfficiencyClass: 0,
Reserved: [20]byte{},
GroupCount: uint16(grouplen),
GroupMasks: groupAffinities,
},
})
}
const SYSTEM_LOGICAL_PROCESSOR_INFORMATION_EX_SIZE = 8
const PROCESSOR_RELATIONSHIP_SIZE = 24
const NUMA_NODE_RELATIONSHIP_SIZE = 24
const GROUP_AFFINITY_SIZE = int(unsafe.Sizeof(GroupAffinity{})) // this one is known at compile time
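// structToBytes hand-packs a test record: the 8-byte information header, then the fixed-size relationship struct, then each GroupAffinity, mimicking the layout GetLogicalProcessorInformationEx writes into its output buffer.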
func structToBytes(info systemLogicalProcessorInformationEx) []byte {
var pri []byte = (*(*[SYSTEM_LOGICAL_PROCESSOR_INFORMATION_EX_SIZE]byte)(unsafe.Pointer(&info)))[:SYSTEM_LOGICAL_PROCESSOR_INFORMATION_EX_SIZE]
switch info.data.(type) {
case processorRelationship:
rel := info.data.(processorRelationship)
var prBytes []byte = (*(*[PROCESSOR_RELATIONSHIP_SIZE]byte)(unsafe.Pointer(&rel)))[:PROCESSOR_RELATIONSHIP_SIZE]
pri = append(pri, prBytes...)
groupAffinities := rel.GroupMasks.([]GroupAffinity)
for _, groupAffinity := range groupAffinities {
var groupByte []byte = (*(*[GROUP_AFFINITY_SIZE]byte)(unsafe.Pointer(&groupAffinity)))[:]
pri = append(pri, groupByte...)
}
case numaNodeRelationship:
numa := info.data.(numaNodeRelationship)
var nameBytes []byte = (*(*[NUMA_NODE_RELATIONSHIP_SIZE]byte)(unsafe.Pointer(&numa)))[:NUMA_NODE_RELATIONSHIP_SIZE]
pri = append(pri, nameBytes...)
groupAffinities := numa.GroupMasks.([]GroupAffinity)
for _, groupAffinity := range groupAffinities {
var groupByte []byte = (*(*[GROUP_AFFINITY_SIZE]byte)(unsafe.Pointer(&groupAffinity)))[:]
pri = append(pri, groupByte...)
}
}
return pri
}

View File

@ -29,6 +29,9 @@ import (
"time"
"unsafe"
utilfeature "k8s.io/apiserver/pkg/util/feature"
kubefeatures "k8s.io/kubernetes/pkg/features"
cadvisorapi "github.com/google/cadvisor/info/v1"
"github.com/pkg/errors"
"golang.org/x/sys/windows"
@ -177,15 +180,29 @@ func (p *perfCounterNodeStatsClient) getMachineInfo() (*cadvisorapi.MachineInfo,
return nil, err
}
return &cadvisorapi.MachineInfo{
mi := &cadvisorapi.MachineInfo{
NumCores: ProcessorCount(),
MemoryCapacity: p.nodeInfo.memoryPhysicalCapacityBytes,
MachineID: hostname,
SystemUUID: systemUUID,
BootID: bootId,
}, nil
}
if utilfeature.DefaultFeatureGate.Enabled(kubefeatures.WindowsCPUAndMemoryAffinity) {
numOfPhysicalCores, numOfSockets, topology, err := processorInfo(relationAll)
if err != nil {
return nil, err
}
mi.NumPhysicalCores = numOfPhysicalCores
mi.NumSockets = numOfSockets
mi.Topology = topology
}
return mi, nil
}
// ProcessorCount returns the number of logical processors on the system.
// runtime.NumCPU() will only return the information for a single Processor Group.
// Since a single group can only hold 64 logical processors, this
// means when there are more they will be divided into multiple groups.

View File

@ -1438,6 +1438,12 @@
lockToDefault: false
preRelease: Beta
version: "1.32"
- name: WindowsCPUAndMemoryAffinity
versionedSpecs:
- default: false
lockToDefault: false
preRelease: Alpha
version: "1.32"
- name: WindowsHostNetwork
versionedSpecs:
- default: true