respond to feedback

Signed-off-by: James Sturtevant <jsturtevant@gmail.com>
This commit is contained in:
James Sturtevant 2024-10-25 08:39:16 -07:00
parent 2c5a8c2618
commit 0c2357710a
No known key found for this signature in database
11 changed files with 316 additions and 172 deletions

View File

@ -130,6 +130,10 @@ func NewContainerManager(mountUtil mount.Interface, cadvisorInterface cadvisor.I
nodeConfig.TopologyManagerPolicy, nodeConfig.TopologyManagerPolicy,
nodeConfig.TopologyManagerScope, nodeConfig.TopologyManagerScope,
nodeConfig.TopologyManagerPolicyOptions) nodeConfig.TopologyManagerPolicyOptions)
if err != nil {
klog.ErrorS(err, "Failed to initialize topology manager")
return nil, err
}
klog.InfoS("Creating cpu manager") klog.InfoS("Creating cpu manager")
cm.cpuManager, err = cpumanager.NewManager( cm.cpuManager, err = cpumanager.NewManager(
@ -152,10 +156,6 @@ func NewContainerManager(mountUtil mount.Interface, cadvisorInterface cadvisor.I
cm.cpuManager = cpumanager.NewFakeManager() cm.cpuManager = cpumanager.NewFakeManager()
} }
if err != nil {
return nil, err
}
klog.InfoS("Creating device plugin manager") klog.InfoS("Creating device plugin manager")
cm.deviceManager, err = devicemanager.NewManagerImpl(nil, cm.topologyManager) cm.deviceManager, err = devicemanager.NewManagerImpl(nil, cm.topologyManager)
if err != nil { if err != nil {

View File

@ -19,11 +19,7 @@ package cpumanager
import ( import (
"context" "context"
"fmt" "fmt"
utilfeature "k8s.io/apiserver/pkg/util/feature"
kubefeatures "k8s.io/kubernetes/pkg/features"
"k8s.io/kubernetes/pkg/kubelet/winstats"
"math" "math"
"runtime"
"sync" "sync"
"time" "time"
@ -532,37 +528,6 @@ func findContainerStatusByName(status *v1.PodStatus, name string) (*v1.Container
return nil, fmt.Errorf("unable to find status for container with name %v in pod status (it may not be running)", name) return nil, fmt.Errorf("unable to find status for container with name %v in pod status (it may not be running)", name)
} }
// updateContainerCPUSet asks the container runtime to apply the given CPU set
// to the container identified by containerID and returns the runtime's error.
//
// When running on Windows with the WindowsCPUAndMemoryAffinity feature gate
// enabled, the CPU list is converted into per-group affinity masks and sent as
// Windows container resources. In every other case the CPU set is sent as a
// Linux cpuset string.
func (m *manager) updateContainerCPUSet(ctx context.Context, containerID string, cpus cpuset.CPUSet) error {
	// TODO: Consider adding a `ResourceConfigForContainer` helper in
	// helpers_linux.go similar to what exists for pods.
	// It would be better to pass the full container resources here instead of
	// this patch-like partial resources.
	useWindowsAffinity := runtime.GOOS == "windows" &&
		utilfeature.DefaultFeatureGate.Enabled(kubefeatures.WindowsCPUAndMemoryAffinity)
	if !useWindowsAffinity {
		linuxResources := &runtimeapi.ContainerResources{
			Linux: &runtimeapi.LinuxContainerResources{
				CpusetCpus: cpus.String(),
			},
		}
		return m.containerRuntime.UpdateContainerResources(ctx, containerID, linuxResources)
	}

	// Translate each group affinity into its CRI representation.
	var groupAffinities []*runtimeapi.WindowsCpuGroupAffinity
	for _, groupAffinity := range winstats.CpusToGroupAffinity(cpus.List()) {
		criAffinity := &runtimeapi.WindowsCpuGroupAffinity{
			CpuGroup: uint32(groupAffinity.Group),
			CpuMask:  uint64(groupAffinity.Mask),
		}
		groupAffinities = append(groupAffinities, criAffinity)
	}
	windowsResources := &runtimeapi.ContainerResources{
		Windows: &runtimeapi.WindowsContainerResources{
			AffinityCpus: groupAffinities,
		},
	}
	return m.containerRuntime.UpdateContainerResources(ctx, containerID, windowsResources)
}
func (m *manager) GetExclusiveCPUs(podUID, containerName string) cpuset.CPUSet { func (m *manager) GetExclusiveCPUs(podUID, containerName string) cpuset.CPUSet {
if result, ok := m.state.GetCPUSet(podUID, containerName); ok { if result, ok := m.state.GetCPUSet(podUID, containerName); ok {
return result return result

View File

@ -0,0 +1,43 @@
//go:build !windows
// +build !windows
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package cpumanager
import (
"context"
runtimeapi "k8s.io/cri-api/pkg/apis/runtime/v1"
"k8s.io/utils/cpuset"
)
// updateContainerCPUSet asks the container runtime to apply the given CPU set
// to the container identified by containerID, passing it as a Linux cpuset
// string. The error returned by the runtime is handed back unchanged.
func (m *manager) updateContainerCPUSet(ctx context.Context, containerID string, cpus cpuset.CPUSet) error {
	// TODO: Consider adding a `ResourceConfigForContainer` helper in
	// helpers_linux.go similar to what exists for pods.
	// It would be better to pass the full container resources here instead of
	// this patch-like partial resources.
	linuxResources := &runtimeapi.LinuxContainerResources{
		CpusetCpus: cpus.String(),
	}
	resources := &runtimeapi.ContainerResources{Linux: linuxResources}
	return m.containerRuntime.UpdateContainerResources(ctx, containerID, resources)
}

View File

@ -19,8 +19,12 @@ package cpumanager
import ( import (
"context" "context"
"fmt" "fmt"
utilfeature "k8s.io/apiserver/pkg/util/feature"
featuregatetesting "k8s.io/component-base/featuregate/testing"
"k8s.io/kubernetes/pkg/features"
"os" "os"
"reflect" "reflect"
"runtime"
"strconv" "strconv"
"strings" "strings"
"testing" "testing"
@ -263,6 +267,10 @@ func makeMultiContainerPodWithOptions(initCPUs, appCPUs []*containerOptions) *v1
} }
func TestCPUManagerAdd(t *testing.T) { func TestCPUManagerAdd(t *testing.T) {
if runtime.GOOS == "windows" {
featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.WindowsCPUAndMemoryAffinity, true)
}
testPolicy, _ := NewStaticPolicy( testPolicy, _ := NewStaticPolicy(
&topology.CPUTopology{ &topology.CPUTopology{
NumCPUs: 4, NumCPUs: 4,
@ -347,6 +355,10 @@ func TestCPUManagerAdd(t *testing.T) {
} }
func TestCPUManagerAddWithInitContainers(t *testing.T) { func TestCPUManagerAddWithInitContainers(t *testing.T) {
if runtime.GOOS == "windows" {
featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.WindowsCPUAndMemoryAffinity, true)
}
testCases := []struct { testCases := []struct {
description string description string
topo *topology.CPUTopology topo *topology.CPUTopology
@ -598,6 +610,10 @@ func TestCPUManagerAddWithInitContainers(t *testing.T) {
} }
func TestCPUManagerGenerate(t *testing.T) { func TestCPUManagerGenerate(t *testing.T) {
if runtime.GOOS == "windows" {
featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.WindowsCPUAndMemoryAffinity, true)
}
testCases := []struct { testCases := []struct {
description string description string
cpuPolicyName string cpuPolicyName string
@ -703,6 +719,10 @@ func TestCPUManagerGenerate(t *testing.T) {
} }
func TestCPUManagerRemove(t *testing.T) { func TestCPUManagerRemove(t *testing.T) {
if runtime.GOOS == "windows" {
featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.WindowsCPUAndMemoryAffinity, true)
}
containerID := "fakeID" containerID := "fakeID"
containerMap := containermap.NewContainerMap() containerMap := containermap.NewContainerMap()
@ -746,6 +766,10 @@ func TestCPUManagerRemove(t *testing.T) {
} }
func TestReconcileState(t *testing.T) { func TestReconcileState(t *testing.T) {
if runtime.GOOS == "windows" {
featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.WindowsCPUAndMemoryAffinity, true)
}
testPolicy, _ := NewStaticPolicy( testPolicy, _ := NewStaticPolicy(
&topology.CPUTopology{ &topology.CPUTopology{
NumCPUs: 8, NumCPUs: 8,
@ -1269,6 +1293,10 @@ func TestReconcileState(t *testing.T) {
// above test cases are without kubelet --reserved-cpus cmd option // above test cases are without kubelet --reserved-cpus cmd option
// the following tests are with --reserved-cpus configured // the following tests are with --reserved-cpus configured
func TestCPUManagerAddWithResvList(t *testing.T) { func TestCPUManagerAddWithResvList(t *testing.T) {
if runtime.GOOS == "windows" {
featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.WindowsCPUAndMemoryAffinity, true)
}
testPolicy, _ := NewStaticPolicy( testPolicy, _ := NewStaticPolicy(
&topology.CPUTopology{ &topology.CPUTopology{
NumCPUs: 4, NumCPUs: 4,
@ -1343,6 +1371,10 @@ func TestCPUManagerAddWithResvList(t *testing.T) {
} }
func TestCPUManagerHandlePolicyOptions(t *testing.T) { func TestCPUManagerHandlePolicyOptions(t *testing.T) {
if runtime.GOOS == "windows" {
featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.WindowsCPUAndMemoryAffinity, true)
}
testCases := []struct { testCases := []struct {
description string description string
cpuPolicyName string cpuPolicyName string
@ -1409,6 +1441,10 @@ func TestCPUManagerHandlePolicyOptions(t *testing.T) {
} }
func TestCPUManagerGetAllocatableCPUs(t *testing.T) { func TestCPUManagerGetAllocatableCPUs(t *testing.T) {
if runtime.GOOS == "windows" {
featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.WindowsCPUAndMemoryAffinity, true)
}
nonePolicy, _ := NewNonePolicy(nil) nonePolicy, _ := NewNonePolicy(nil)
staticPolicy, _ := NewStaticPolicy( staticPolicy, _ := NewStaticPolicy(
&topology.CPUTopology{ &topology.CPUTopology{

View File

@ -0,0 +1,49 @@
//go:build windows
// +build windows
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package cpumanager
import (
"context"
utilfeature "k8s.io/apiserver/pkg/util/feature"
runtimeapi "k8s.io/cri-api/pkg/apis/runtime/v1"
kubefeatures "k8s.io/kubernetes/pkg/features"
"k8s.io/kubernetes/pkg/kubelet/winstats"
"k8s.io/utils/cpuset"
)
// updateContainerCPUSet asks the container runtime to pin the container
// identified by containerID to the given CPUs, expressed as Windows CPU group
// affinities.
//
// It does nothing and returns nil when the WindowsCPUAndMemoryAffinity feature
// gate is disabled; otherwise it returns the error from the runtime call.
func (m *manager) updateContainerCPUSet(ctx context.Context, containerID string, cpus cpuset.CPUSet) error {
	if !utilfeature.DefaultFeatureGate.Enabled(kubefeatures.WindowsCPUAndMemoryAffinity) {
		return nil
	}

	// Translate each group affinity into its CRI representation.
	var criAffinities []*runtimeapi.WindowsCpuGroupAffinity
	for _, groupAffinity := range winstats.CpusToGroupAffinity(cpus.List()) {
		criAffinity := &runtimeapi.WindowsCpuGroupAffinity{
			CpuGroup: uint32(groupAffinity.Group),
			CpuMask:  uint64(groupAffinity.Mask),
		}
		criAffinities = append(criAffinities, criAffinity)
	}

	resources := &runtimeapi.ContainerResources{
		Windows: &runtimeapi.WindowsContainerResources{
			AffinityCpus: criAffinities,
		},
	}
	return m.containerRuntime.UpdateContainerResources(ctx, containerID, resources)
}

View File

@ -30,14 +30,12 @@ import (
func (i *internalContainerLifecycleImpl) PreCreateContainer(pod *v1.Pod, container *v1.Container, containerConfig *runtimeapi.ContainerConfig) error { func (i *internalContainerLifecycleImpl) PreCreateContainer(pod *v1.Pod, container *v1.Container, containerConfig *runtimeapi.ContainerConfig) error {
if i.cpuManager != nil && utilfeature.DefaultFeatureGate.Enabled(kubefeatures.WindowsCPUAndMemoryAffinity) { if i.cpuManager != nil && utilfeature.DefaultFeatureGate.Enabled(kubefeatures.WindowsCPUAndMemoryAffinity) {
klog.Info("PreCreateContainer for Windows")
allocatedCPUs := i.cpuManager.GetCPUAffinity(string(pod.UID), container.Name) allocatedCPUs := i.cpuManager.GetCPUAffinity(string(pod.UID), container.Name)
if !allocatedCPUs.IsEmpty() { if !allocatedCPUs.IsEmpty() {
klog.Infof("Setting CPU affinity for container %q cpus %v", container.Name, allocatedCPUs.String())
var cpuGroupAffinities []*runtimeapi.WindowsCpuGroupAffinity var cpuGroupAffinities []*runtimeapi.WindowsCpuGroupAffinity
affinities := winstats.CpusToGroupAffinity(allocatedCPUs.List()) affinities := winstats.CpusToGroupAffinity(allocatedCPUs.List())
for _, affinity := range affinities { for _, affinity := range affinities {
klog.Infof("Setting CPU affinity for container %q in group %v with mask %v (processors %v)", container.Name, affinity.Group, affinity.Mask, affinity.Processors()) klog.V(4).InfoS("Setting CPU affinity", "container", container.Name, "pod", pod.Name, "group", affinity.Group, "mask", affinity.MaskString(), "processorIds", affinity.Processors())
cpuGroupAffinities = append(cpuGroupAffinities, &runtimeapi.WindowsCpuGroupAffinity{ cpuGroupAffinities = append(cpuGroupAffinities, &runtimeapi.WindowsCpuGroupAffinity{
CpuGroup: uint32(affinity.Group), CpuGroup: uint32(affinity.Group),
CpuMask: uint64(affinity.Mask), CpuMask: uint64(affinity.Mask),

View File

@ -1421,7 +1421,7 @@ func (kl *Kubelet) setupDataDirs() error {
if err := kl.hostutil.MakeRShared(kl.getRootDir()); err != nil { if err := kl.hostutil.MakeRShared(kl.getRootDir()); err != nil {
return fmt.Errorf("error configuring root directory: %v", err) return fmt.Errorf("error configuring root directory: %v", err)
} }
if err := utilfs.MkdirAll(kl.getPodsDir(), 0750); err != nil { if err := os.MkdirAll(kl.getPodsDir(), 0750); err != nil {
return fmt.Errorf("error creating pods directory: %v", err) return fmt.Errorf("error creating pods directory: %v", err)
} }
if err := utilfs.MkdirAll(kl.getPluginsDir(), 0750); err != nil { if err := utilfs.MkdirAll(kl.getPluginsDir(), 0750); err != nil {

View File

@ -1,3 +1,22 @@
//go:build windows
// +build windows
/*
Copyright 2024 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package winstats package winstats
import ( import (
@ -13,41 +32,51 @@ var (
getNumaAvailableMemoryNodeEx = modkernel32.NewProc("GetNumaAvailableMemoryNodeEx") getNumaAvailableMemoryNodeEx = modkernel32.NewProc("GetNumaAvailableMemoryNodeEx")
) )
type RelationType int type relationType int
const ( const (
RelationProcessorCore RelationType = iota relationProcessorCore relationType = iota
RelationNumaNode relationNumaNode
RelationCache relationCache
RelationProcessorPackage relationProcessorPackage
RelationGroup relationGroup
RelationProcessorDie relationProcessorDie
RelationNumaNodeEx relationNumaNodeEx
RelationProcessorModule relationProcessorModule
RelationAll = 0xffff relationAll = 0xffff
) )
type SYSTEM_LOGICAL_PROCESSOR_INFORMATION_EX struct { type systemLogicalProcessorInformationEx struct {
Relationship uint32 Relationship uint32
Size uint32 Size uint32
data interface{} data interface{}
} }
type PROCESSOR_RELATIONSHIP struct { type processorRelationship struct {
Flags byte Flags byte
EfficiencyClass byte EfficiencyClass byte
Reserved [20]byte Reserved [20]byte
GroupCount uint16 GroupCount uint16
GroupMasks interface{} //[]GROUP_AFFINITY // in c++ this is a union of either one or many GROUP_AFFINITY based on GroupCount // groupMasks is an []GroupAffinity. In c++ this is a union of either one or many GroupAffinity based on GroupCount
GroupMasks interface{}
} }
type GROUP_AFFINITY struct { // GroupAffinity represents the processor group affinity of cpus
Mask uintptr // https://learn.microsoft.com/en-us/windows/win32/api/winnt/ns-winnt-group_affinity
type GroupAffinity struct {
Mask uint64
Group uint16 Group uint16
Reserved [3]uint16 Reserved [3]uint16
} }
func (a GROUP_AFFINITY) Processors() []int { // MaskString returns the affinity mask as a string of 0s and 1s
func (a GroupAffinity) MaskString() string {
	// %064b renders the mask in base 2, left-padded with zeros to a width of 64.
	return fmt.Sprintf("%064b", a.Mask)
}
// Processors returns the list of processor IDs that are set in the affinity mask.
// Windows doesn't track processors by ID, but kubelet converts them to a number.
func (a GroupAffinity) Processors() []int {
processors := []int{} processors := []int{}
for i := 0; i < 64; i++ { for i := 0; i < 64; i++ {
if a.Mask&(1<<i) != 0 { if a.Mask&(1<<i) != 0 {
@ -57,64 +86,31 @@ func (a GROUP_AFFINITY) Processors() []int {
return processors return processors
} }
func CpusToGroupAffinity(cpus []int) map[int]*GROUP_AFFINITY { // CpusToGroupAffinity converts a list of CPUs to a map of GroupAffinity split by windows CPU group.
groupAffinities := make(map[int]*GROUP_AFFINITY) // Windows doesn't track processors by ID but kubelet converts them to a number and this function goes in reverse.
func CpusToGroupAffinity(cpus []int) map[int]*GroupAffinity {
groupAffinities := make(map[int]*GroupAffinity)
for _, cpu := range cpus { for _, cpu := range cpus {
group := uint16(cpu / 64) group := uint16(cpu / 64)
groupaffinity, ok := groupAffinities[int(group)] groupAffinity, ok := groupAffinities[int(group)]
if !ok { if !ok {
groupaffinity = &GROUP_AFFINITY{ groupAffinity = &GroupAffinity{
Group: group, Group: group,
} }
groupAffinities[int(group)] = groupaffinity groupAffinities[int(group)] = groupAffinity
} }
mask := uintptr(1 << (cpu % 64)) mask := uint64(1 << (cpu % 64))
groupaffinity.Mask |= mask groupAffinity.Mask |= mask
} }
return groupAffinities return groupAffinities
} }
type NUMA_NODE_RELATIONSHIP struct { type numaNodeRelationship struct {
NodeNumber uint32 NodeNumber uint32
Reserved [18]byte Reserved [18]byte
GroupCount uint16 GroupCount uint16
GroupMasks interface{} //[]GROUP_AFFINITY // in c++ this is a union of either one or many GROUP_AFFINITY based on GroupCount GroupMasks interface{} //[]GroupAffinity in c++ this is a union of either one or many GroupAffinity based on GroupCount
}
type CACHE_RELATIONSHIP struct {
Level byte
Associativity byte
LineSize uint16
CacheSize uint32
Type PROCESSOR_CACHE_TYPE
Reserved [18]byte
GroupCount uint16
GroupMasks interface{} //interface{}[]GROUP_AFFINITY // in c++ this is a union of either one or many GROUP_AFFINITY based on GroupCount
}
type PROCESSOR_CACHE_TYPE int
const (
CacheUnified PROCESSOR_CACHE_TYPE = iota
CacheInstruction
CacheData
CacheTrace
CacheUnknown
)
type GROUP_RELATIONSHIP struct {
MaximumGroupCount uint16
ActiveGroupCount uint16
Reserved [20]byte
GroupInfo interface{} //[]PROCESSOR_GROUP_INFO
}
type PROCESSOR_GROUP_INFO struct {
MaximumProcessorCount byte
ActiveProcessorCount byte
Reserved [38]byte
ActiveProcessorMask uintptr
} }
type processor struct { type processor struct {
@ -123,7 +119,7 @@ type processor struct {
NodeID int NodeID int
} }
func processorInfo(relationShip RelationType) (int, int, []cadvisorapi.Node, error) { func processorInfo(relationShip relationType) (int, int, []cadvisorapi.Node, error) {
// Call once to get the length of data to return // Call once to get the length of data to return
var returnLength uint32 = 0 var returnLength uint32 = 0
r1, _, err := procGetLogicalProcessorInformationEx.Call( r1, _, err := procGetLogicalProcessorInformationEx.Call(
@ -132,7 +128,7 @@ func processorInfo(relationShip RelationType) (int, int, []cadvisorapi.Node, err
uintptr(unsafe.Pointer(&returnLength)), uintptr(unsafe.Pointer(&returnLength)),
) )
if r1 != 0 && err.(syscall.Errno) != syscall.ERROR_INSUFFICIENT_BUFFER { if r1 != 0 && err.(syscall.Errno) != syscall.ERROR_INSUFFICIENT_BUFFER {
return 0, 0, nil, fmt.Errorf("Call to GetLogicalProcessorInformationEx failed: %v", err) return 0, 0, nil, fmt.Errorf("call to GetLogicalProcessorInformationEx failed: %v", err)
} }
// Allocate the buffer with the length it should be // Allocate the buffer with the length it should be
@ -145,7 +141,7 @@ func processorInfo(relationShip RelationType) (int, int, []cadvisorapi.Node, err
uintptr(unsafe.Pointer(&returnLength)), uintptr(unsafe.Pointer(&returnLength)),
) )
if r1 == 0 { if r1 == 0 {
return 0, 0, nil, fmt.Errorf("Call to GetLogicalProcessorInformationEx failed: %v", err) return 0, 0, nil, fmt.Errorf("call to GetLogicalProcessorInformationEx failed: %v", err)
} }
return convertWinApiToCadvisorApi(buffer) return convertWinApiToCadvisorApi(buffer)
@ -156,23 +152,29 @@ func convertWinApiToCadvisorApi(buffer []byte) (int, int, []cadvisorapi.Node, er
numofSockets := 0 numofSockets := 0
numOfcores := 0 numOfcores := 0
nodes := []cadvisorapi.Node{} nodes := []cadvisorapi.Node{}
//iterate over the buffer casting it to the correct type
for offset := 0; offset < len(buffer); { for offset := 0; offset < len(buffer); {
//todo check if there is enough left in buffer to read system_logical_processor_information_ex? // check size in buffer to avoid out of bounds access, we don't know the type or size yet
info := (*SYSTEM_LOGICAL_PROCESSOR_INFORMATION_EX)(unsafe.Pointer(&buffer[offset])) if offset+int(unsafe.Sizeof(systemLogicalProcessorInformationEx{})) > len(buffer) {
switch (RelationType)(info.Relationship) { return 0, 0, nil, fmt.Errorf("remaining buffer too small while reading windows processor relationship")
case RelationProcessorCore, RelationProcessorPackage: }
processorRelationship := (*PROCESSOR_RELATIONSHIP)(unsafe.Pointer(&info.data)) info := (*systemLogicalProcessorInformationEx)(unsafe.Pointer(&buffer[offset]))
groupMasks := make([]GROUP_AFFINITY, processorRelationship.GroupCount) // check one more time now that we know the size of the struct
for i := 0; i < int(processorRelationship.GroupCount); i++ { if offset+int(info.Size) > len(buffer) {
groupMasks[i] = *(*GROUP_AFFINITY)(unsafe.Pointer(uintptr(unsafe.Pointer(&processorRelationship.GroupMasks)) + uintptr(i)*unsafe.Sizeof(GROUP_AFFINITY{}))) return 0, 0, nil, fmt.Errorf("remaining buffer too small while reading windows processor relationship")
}
switch (relationType)(info.Relationship) {
case relationProcessorCore, relationProcessorPackage:
relationship := (*processorRelationship)(unsafe.Pointer(&info.data))
groupMasks := make([]GroupAffinity, relationship.GroupCount)
for i := 0; i < int(relationship.GroupCount); i++ {
groupMasks[i] = *(*GroupAffinity)(unsafe.Pointer(uintptr(unsafe.Pointer(&relationship.GroupMasks)) + uintptr(i)*unsafe.Sizeof(GroupAffinity{})))
} }
if RelationProcessorCore == (RelationType)(info.Relationship) { if relationProcessorCore == (relationType)(info.Relationship) {
numOfcores++ numOfcores++
} }
if RelationProcessorPackage == (RelationType)(info.Relationship) { if relationProcessorPackage == (relationType)(info.Relationship) {
numofSockets++ numofSockets++
} }
@ -184,20 +186,20 @@ func convertWinApiToCadvisorApi(buffer []byte) (int, int, []cadvisorapi.Node, er
p = &processor{} p = &processor{}
logicalProcessors[processorId] = p logicalProcessors[processorId] = p
} }
if RelationProcessorCore == (RelationType)(info.Relationship) { if relationProcessorCore == (relationType)(info.Relationship) {
p.CoreID = numOfcores p.CoreID = numOfcores
} }
if RelationProcessorPackage == (RelationType)(info.Relationship) { if relationProcessorPackage == (relationType)(info.Relationship) {
p.SocketID = numofSockets p.SocketID = numofSockets
} }
} }
} }
case RelationNumaNode, RelationNumaNodeEx: case relationNumaNode, relationNumaNodeEx:
numaNodeRelationship := (*NUMA_NODE_RELATIONSHIP)(unsafe.Pointer(&info.data)) numaNodeRelationship := (*numaNodeRelationship)(unsafe.Pointer(&info.data))
groupMasks := make([]GROUP_AFFINITY, numaNodeRelationship.GroupCount) groupMasks := make([]GroupAffinity, numaNodeRelationship.GroupCount)
for i := 0; i < int(numaNodeRelationship.GroupCount); i++ { for i := 0; i < int(numaNodeRelationship.GroupCount); i++ {
groupMasks[i] = *(*GROUP_AFFINITY)(unsafe.Pointer(uintptr(unsafe.Pointer(&numaNodeRelationship.GroupMasks)) + uintptr(i)*unsafe.Sizeof(GROUP_AFFINITY{}))) groupMasks[i] = *(*GroupAffinity)(unsafe.Pointer(uintptr(unsafe.Pointer(&numaNodeRelationship.GroupMasks)) + uintptr(i)*unsafe.Sizeof(GroupAffinity{})))
} }
nodes = append(nodes, cadvisorapi.Node{Id: int(numaNodeRelationship.NodeNumber)}) nodes = append(nodes, cadvisorapi.Node{Id: int(numaNodeRelationship.NodeNumber)})
@ -213,12 +215,8 @@ func convertWinApiToCadvisorApi(buffer []byte) (int, int, []cadvisorapi.Node, er
} }
} }
case RelationCache:
//cacheRelationship := (*CACHE_RELATIONSHIP)(unsafe.Pointer(&info.data))
// TODO Process cache relationship data
default: default:
klog.V(4).Infof("Not using relationship type: %d", info.Relationship) klog.V(4).Infof("Not using Windows CPU relationship type: %d", info.Relationship)
} }
// Move the offset to the next SYSTEM_LOGICAL_PROCESSOR_INFORMATION_EX struct // Move the offset to the next SYSTEM_LOGICAL_PROCESSOR_INFORMATION_EX struct
@ -226,21 +224,20 @@ func convertWinApiToCadvisorApi(buffer []byte) (int, int, []cadvisorapi.Node, er
} }
for processId, p := range logicalProcessors { for processId, p := range logicalProcessors {
klog.V(4).Infof("Processor (%d): %v", processId, p)
node := nodes[p.NodeID] node := nodes[p.NodeID]
if node.Id != p.NodeID { if node.Id != p.NodeID {
return 0, 0, nil, fmt.Errorf("Node ID mismatch: %d != %d", node.Id, p.NodeID) return 0, 0, nil, fmt.Errorf("node ID mismatch: %d != %d", node.Id, p.NodeID)
} }
availableBytes := uint64(0) availableBytes := uint64(0)
r1, _, err := getNumaAvailableMemoryNodeEx.Call(uintptr(p.NodeID), uintptr(unsafe.Pointer(&availableBytes))) r1, _, err := getNumaAvailableMemoryNodeEx.Call(uintptr(p.NodeID), uintptr(unsafe.Pointer(&availableBytes)))
if r1 == 0 { if r1 == 0 {
return 0, 0, nil, fmt.Errorf("Call to GetNumaAvailableMemoryNodeEx failed: %v", err) return 0, 0, nil, fmt.Errorf("call to GetNumaAvailableMemoryNodeEx failed: %v", err)
} }
node.Memory = availableBytes node.Memory = availableBytes
node.AddThread(processId, p.CoreID) node.AddThread(processId, p.CoreID)
ok, coreIdx := node.FindCore(p.CoreID) ok, coreIdx := node.FindCore(p.CoreID)
if !ok { if !ok {
return 0, 0, nil, fmt.Errorf("Core not found: %d", p.CoreID) return 0, 0, nil, fmt.Errorf("core not found: %d", p.CoreID)
} }
node.Cores[coreIdx].SocketID = p.SocketID node.Cores[coreIdx].SocketID = p.SocketID
nodes[p.NodeID] = node nodes[p.NodeID] = node

View File

@ -1,7 +1,25 @@
//go:build windows
// +build windows
/*
Copyright 2024 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package winstats package winstats
import ( import (
"fmt"
cadvisorapi "github.com/google/cadvisor/info/v1" cadvisorapi "github.com/google/cadvisor/info/v1"
"github.com/stretchr/testify/assert" "github.com/stretchr/testify/assert"
"testing" "testing"
@ -11,7 +29,7 @@ import (
func TestGROUP_AFFINITY_Processors(t *testing.T) { func TestGROUP_AFFINITY_Processors(t *testing.T) {
tests := []struct { tests := []struct {
name string name string
Mask uintptr Mask uint64
Group uint16 Group uint16
want []int want []int
}{ }{
@ -78,7 +96,7 @@ func TestGROUP_AFFINITY_Processors(t *testing.T) {
} }
for _, tt := range tests { for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) { t.Run(tt.name, func(t *testing.T) {
a := GROUP_AFFINITY{ a := GroupAffinity{
Mask: tt.Mask, Mask: tt.Mask,
Group: tt.Group, Group: tt.Group,
} }
@ -100,16 +118,16 @@ func TestCpusToGroupAffinity(t *testing.T) {
tests := []struct { tests := []struct {
name string name string
cpus []int cpus []int
want map[int]*GROUP_AFFINITY want map[int]*GroupAffinity
}{ }{
{ {
name: "empty", name: "empty",
want: map[int]*GROUP_AFFINITY{}, want: map[int]*GroupAffinity{},
}, },
{ {
name: "single cpu group 0", name: "single cpu group 0",
cpus: []int{0}, cpus: []int{0},
want: map[int]*GROUP_AFFINITY{ want: map[int]*GroupAffinity{
0: { 0: {
Mask: 1, Mask: 1,
Group: 0, Group: 0,
@ -119,7 +137,7 @@ func TestCpusToGroupAffinity(t *testing.T) {
{ {
name: "single cpu group 0", name: "single cpu group 0",
cpus: []int{63}, cpus: []int{63},
want: map[int]*GROUP_AFFINITY{ want: map[int]*GroupAffinity{
0: { 0: {
Mask: 1 << 63, Mask: 1 << 63,
Group: 0, Group: 0,
@ -129,7 +147,7 @@ func TestCpusToGroupAffinity(t *testing.T) {
{ {
name: "single cpu group 1", name: "single cpu group 1",
cpus: []int{64}, cpus: []int{64},
want: map[int]*GROUP_AFFINITY{ want: map[int]*GroupAffinity{
1: { 1: {
Mask: 1, Mask: 1,
Group: 1, Group: 1,
@ -139,7 +157,7 @@ func TestCpusToGroupAffinity(t *testing.T) {
{ {
name: "multiple cpus same group", name: "multiple cpus same group",
cpus: []int{0, 1, 2}, cpus: []int{0, 1, 2},
want: map[int]*GROUP_AFFINITY{ want: map[int]*GroupAffinity{
0: { 0: {
Mask: 1 | 2 | 4, // Binary OR to combine the masks Mask: 1 | 2 | 4, // Binary OR to combine the masks
Group: 0, Group: 0,
@ -149,7 +167,7 @@ func TestCpusToGroupAffinity(t *testing.T) {
{ {
name: "multiple cpus different groups", name: "multiple cpus different groups",
cpus: []int{0, 64}, cpus: []int{0, 64},
want: map[int]*GROUP_AFFINITY{ want: map[int]*GroupAffinity{
0: { 0: {
Mask: 1, Mask: 1,
Group: 0, Group: 0,
@ -163,7 +181,7 @@ func TestCpusToGroupAffinity(t *testing.T) {
{ {
name: "multiple cpus different groups", name: "multiple cpus different groups",
cpus: []int{0, 1, 2, 64, 65, 66}, cpus: []int{0, 1, 2, 64, 65, 66},
want: map[int]*GROUP_AFFINITY{ want: map[int]*GroupAffinity{
0: { 0: {
Mask: 1 | 2 | 4, Mask: 1 | 2 | 4,
Group: 0, Group: 0,
@ -177,7 +195,7 @@ func TestCpusToGroupAffinity(t *testing.T) {
{ {
name: "64 cpus group 0", name: "64 cpus group 0",
cpus: makeRange(0, 63), cpus: makeRange(0, 63),
want: map[int]*GROUP_AFFINITY{ want: map[int]*GroupAffinity{
0: { 0: {
Mask: 0xffffffffffffffff, // All 64 bits set Mask: 0xffffffffffffffff, // All 64 bits set
Group: 0, Group: 0,
@ -187,7 +205,7 @@ func TestCpusToGroupAffinity(t *testing.T) {
{ {
name: "64 cpus group 1", name: "64 cpus group 1",
cpus: makeRange(64, 127), cpus: makeRange(64, 127),
want: map[int]*GROUP_AFFINITY{ want: map[int]*GroupAffinity{
1: { 1: {
Mask: 0xffffffffffffffff, // All 64 bits set Mask: 0xffffffffffffffff, // All 64 bits set
Group: 1, Group: 1,
@ -209,7 +227,7 @@ func Test_convertWinApiToCadvisorApi(t *testing.T) {
expectedNumOfCores int expectedNumOfCores int
expectedNumOfSockets int expectedNumOfSockets int
expectedNodes []cadvisorapi.Node expectedNodes []cadvisorapi.Node
wantErr assert.ErrorAssertionFunc wantErr bool
}{ }{
{ {
name: "empty", name: "empty",
@ -217,7 +235,7 @@ func Test_convertWinApiToCadvisorApi(t *testing.T) {
expectedNumOfCores: 0, expectedNumOfCores: 0,
expectedNumOfSockets: 0, expectedNumOfSockets: 0,
expectedNodes: []cadvisorapi.Node{}, expectedNodes: []cadvisorapi.Node{},
wantErr: assert.NoError, wantErr: false,
}, },
{ {
name: "single core", name: "single core",
@ -235,7 +253,7 @@ func Test_convertWinApiToCadvisorApi(t *testing.T) {
}, },
}, },
}, },
wantErr: assert.NoError, wantErr: false,
}, },
{ {
name: "single core, multiple cpus", name: "single core, multiple cpus",
@ -253,7 +271,7 @@ func Test_convertWinApiToCadvisorApi(t *testing.T) {
}, },
}, },
}, },
wantErr: assert.NoError, wantErr: false,
}, },
{ {
name: "single core, multiple groups", name: "single core, multiple groups",
@ -271,13 +289,32 @@ func Test_convertWinApiToCadvisorApi(t *testing.T) {
}, },
}, },
}, },
wantErr: assert.NoError, wantErr: false,
},
{
name: "buffer to small",
buffer: createProcessorRelationships([]int{0, 64})[:48],
expectedNumOfCores: 1,
expectedNumOfSockets: 1,
expectedNodes: []cadvisorapi.Node{
{
Id: 0,
Cores: []cadvisorapi.Core{
{
Id: 1,
Threads: []int{0, 64},
},
},
},
},
wantErr: true,
}, },
} }
for _, tt := range tests { for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) { t.Run(tt.name, func(t *testing.T) {
numOfCores, numOfSockets, nodes, err := convertWinApiToCadvisorApi(tt.buffer) numOfCores, numOfSockets, nodes, err := convertWinApiToCadvisorApi(tt.buffer)
if !tt.wantErr(t, err, fmt.Sprintf("convertWinApiToCadvisorApi(%v)", tt.name)) { if tt.wantErr {
assert.Error(t, err)
return return
} }
assert.Equalf(t, tt.expectedNumOfCores, numOfCores, "num of cores") assert.Equalf(t, tt.expectedNumOfCores, numOfCores, "num of cores")
@ -286,14 +323,26 @@ func Test_convertWinApiToCadvisorApi(t *testing.T) {
assert.Equalf(t, tt.expectedNodes[node].Id, nodes[node].Id, "node id") assert.Equalf(t, tt.expectedNodes[node].Id, nodes[node].Id, "node id")
for core := range nodes[node].Cores { for core := range nodes[node].Cores {
assert.Equalf(t, tt.expectedNodes[node].Cores[core].Id, nodes[node].Cores[core].Id, "core id") assert.Equalf(t, tt.expectedNodes[node].Cores[core].Id, nodes[node].Cores[core].Id, "core id")
assert.Equalf(t, tt.expectedNodes[node].Cores[core].Threads, nodes[node].Cores[core].Threads, "threads") assert.Equalf(t, len(tt.expectedNodes[node].Cores[core].Threads), len(nodes[node].Cores[core].Threads), "num of threads")
for _, thread := range nodes[node].Cores[core].Threads {
assert.Truef(t, containsThread(tt.expectedNodes[node].Cores[core].Threads, thread), "thread %d", thread)
}
} }
} }
}) })
} }
} }
func genbuffer(infos ...SYSTEM_LOGICAL_PROCESSOR_INFORMATION_EX) []byte { func containsThread(threads []int, thread int) bool {
for _, t := range threads {
if t == thread {
return true
}
}
return false
}
func genBuffer(infos ...systemLogicalProcessorInformationEx) []byte {
var buffer []byte var buffer []byte
for _, info := range infos { for _, info := range infos {
buffer = append(buffer, structToBytes(info)...) buffer = append(buffer, structToBytes(info)...)
@ -304,32 +353,32 @@ func genbuffer(infos ...SYSTEM_LOGICAL_PROCESSOR_INFORMATION_EX) []byte {
func createProcessorRelationships(cpus []int) []byte { func createProcessorRelationships(cpus []int) []byte {
groups := CpusToGroupAffinity(cpus) groups := CpusToGroupAffinity(cpus)
grouplen := len(groups) grouplen := len(groups)
groupAffinities := make([]GROUP_AFFINITY, 0, grouplen) groupAffinities := make([]GroupAffinity, 0, grouplen)
for _, group := range groups { for _, group := range groups {
groupAffinities = append(groupAffinities, *group) groupAffinities = append(groupAffinities, *group)
} }
return genbuffer(SYSTEM_LOGICAL_PROCESSOR_INFORMATION_EX{ return genBuffer(systemLogicalProcessorInformationEx{
Relationship: uint32(RelationProcessorCore), Relationship: uint32(relationProcessorCore),
Size: uint32(SYSTEM_LOGICAL_PROCESSOR_INFORMATION_EX_SIZE + PROCESSOR_RELATIONSHIP_SIZE + (GROUP_AFFINITY_SIZE * grouplen)), Size: uint32(SYSTEM_LOGICAL_PROCESSOR_INFORMATION_EX_SIZE + PROCESSOR_RELATIONSHIP_SIZE + (GROUP_AFFINITY_SIZE * grouplen)),
data: PROCESSOR_RELATIONSHIP{ data: processorRelationship{
Flags: 0, Flags: 0,
EfficiencyClass: 0, EfficiencyClass: 0,
Reserved: [20]byte{}, Reserved: [20]byte{},
GroupCount: uint16(grouplen), GroupCount: uint16(grouplen),
GroupMasks: groupAffinities, GroupMasks: groupAffinities,
}, },
}, SYSTEM_LOGICAL_PROCESSOR_INFORMATION_EX{ }, systemLogicalProcessorInformationEx{
Relationship: uint32(RelationNumaNode), Relationship: uint32(relationNumaNode),
Size: uint32(SYSTEM_LOGICAL_PROCESSOR_INFORMATION_EX_SIZE + NUMA_NODE_RELATIONSHIP_SIZE + (GROUP_AFFINITY_SIZE * grouplen)), Size: uint32(SYSTEM_LOGICAL_PROCESSOR_INFORMATION_EX_SIZE + NUMA_NODE_RELATIONSHIP_SIZE + (GROUP_AFFINITY_SIZE * grouplen)),
data: NUMA_NODE_RELATIONSHIP{ data: numaNodeRelationship{
NodeNumber: 0, NodeNumber: 0,
Reserved: [18]byte{}, Reserved: [18]byte{},
GroupCount: uint16(grouplen), GroupCount: uint16(grouplen),
GroupMasks: groupAffinities, GroupMasks: groupAffinities,
}}, SYSTEM_LOGICAL_PROCESSOR_INFORMATION_EX{ }}, systemLogicalProcessorInformationEx{
Relationship: uint32(RelationProcessorPackage), Relationship: uint32(relationProcessorPackage),
Size: uint32(SYSTEM_LOGICAL_PROCESSOR_INFORMATION_EX_SIZE + PROCESSOR_RELATIONSHIP_SIZE + (GROUP_AFFINITY_SIZE * grouplen)), Size: uint32(SYSTEM_LOGICAL_PROCESSOR_INFORMATION_EX_SIZE + PROCESSOR_RELATIONSHIP_SIZE + (GROUP_AFFINITY_SIZE * grouplen)),
data: PROCESSOR_RELATIONSHIP{ data: processorRelationship{
Flags: 0, Flags: 0,
EfficiencyClass: 0, EfficiencyClass: 0,
Reserved: [20]byte{}, Reserved: [20]byte{},
@ -342,29 +391,29 @@ func createProcessorRelationships(cpus []int) []byte {
const SYSTEM_LOGICAL_PROCESSOR_INFORMATION_EX_SIZE = 8 const SYSTEM_LOGICAL_PROCESSOR_INFORMATION_EX_SIZE = 8
const PROCESSOR_RELATIONSHIP_SIZE = 24 const PROCESSOR_RELATIONSHIP_SIZE = 24
const NUMA_NODE_RELATIONSHIP_SIZE = 24 const NUMA_NODE_RELATIONSHIP_SIZE = 24
const GROUP_AFFINITY_SIZE = int(unsafe.Sizeof(GROUP_AFFINITY{})) // this one is known at compile time const GROUP_AFFINITY_SIZE = int(unsafe.Sizeof(GroupAffinity{})) // this one is known at compile time
func structToBytes(info SYSTEM_LOGICAL_PROCESSOR_INFORMATION_EX) []byte { func structToBytes(info systemLogicalProcessorInformationEx) []byte {
var pri []byte = (*(*[SYSTEM_LOGICAL_PROCESSOR_INFORMATION_EX_SIZE]byte)(unsafe.Pointer(&info)))[:SYSTEM_LOGICAL_PROCESSOR_INFORMATION_EX_SIZE] var pri []byte = (*(*[SYSTEM_LOGICAL_PROCESSOR_INFORMATION_EX_SIZE]byte)(unsafe.Pointer(&info)))[:SYSTEM_LOGICAL_PROCESSOR_INFORMATION_EX_SIZE]
switch info.data.(type) { switch info.data.(type) {
case PROCESSOR_RELATIONSHIP: case processorRelationship:
rel := info.data.(PROCESSOR_RELATIONSHIP) rel := info.data.(processorRelationship)
var prBytes []byte = (*(*[PROCESSOR_RELATIONSHIP_SIZE]byte)(unsafe.Pointer(&rel)))[:PROCESSOR_RELATIONSHIP_SIZE] var prBytes []byte = (*(*[PROCESSOR_RELATIONSHIP_SIZE]byte)(unsafe.Pointer(&rel)))[:PROCESSOR_RELATIONSHIP_SIZE]
pri = append(pri, prBytes...) pri = append(pri, prBytes...)
groupAffinities := rel.GroupMasks.([]GROUP_AFFINITY) groupAffinities := rel.GroupMasks.([]GroupAffinity)
for _, groupAffinity := range groupAffinities { for _, groupAffinity := range groupAffinities {
var groupByte []byte = (*(*[GROUP_AFFINITY_SIZE]byte)(unsafe.Pointer(&groupAffinity)))[:] var groupByte []byte = (*(*[GROUP_AFFINITY_SIZE]byte)(unsafe.Pointer(&groupAffinity)))[:]
pri = append(pri, groupByte...) pri = append(pri, groupByte...)
} }
case NUMA_NODE_RELATIONSHIP: case numaNodeRelationship:
numa := info.data.(NUMA_NODE_RELATIONSHIP) numa := info.data.(numaNodeRelationship)
var nameBytes []byte = (*(*[NUMA_NODE_RELATIONSHIP_SIZE]byte)(unsafe.Pointer(&numa)))[:NUMA_NODE_RELATIONSHIP_SIZE] var nameBytes []byte = (*(*[NUMA_NODE_RELATIONSHIP_SIZE]byte)(unsafe.Pointer(&numa)))[:NUMA_NODE_RELATIONSHIP_SIZE]
pri = append(pri, nameBytes...) pri = append(pri, nameBytes...)
groupAffinities := numa.GroupMasks.([]GROUP_AFFINITY) groupAffinities := numa.GroupMasks.([]GroupAffinity)
for _, groupAffinity := range groupAffinities { for _, groupAffinity := range groupAffinities {
var groupByte []byte = (*(*[GROUP_AFFINITY_SIZE]byte)(unsafe.Pointer(&groupAffinity)))[:] var groupByte []byte = (*(*[GROUP_AFFINITY_SIZE]byte)(unsafe.Pointer(&groupAffinity)))[:]

View File

@ -188,7 +188,7 @@ func (p *perfCounterNodeStatsClient) getMachineInfo() (*cadvisorapi.MachineInfo,
} }
if utilfeature.DefaultFeatureGate.Enabled(kubefeatures.WindowsCPUAndMemoryAffinity) { if utilfeature.DefaultFeatureGate.Enabled(kubefeatures.WindowsCPUAndMemoryAffinity) {
numOfPysicalCores, numOfSockets, topology, err := processorInfo(RelationAll) numOfPysicalCores, numOfSockets, topology, err := processorInfo(relationAll)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -201,6 +201,7 @@ func (p *perfCounterNodeStatsClient) getMachineInfo() (*cadvisorapi.MachineInfo,
return mi, nil return mi, nil
} }
// ProcessorCount returns the number of logical processors on the system.
// runtime.NumCPU() will only return the information for a single Processor Group. // runtime.NumCPU() will only return the information for a single Processor Group.
// Since a single group can only hold 64 logical processors, this // Since a single group can only hold 64 logical processors, this
// means when there are more they will be divided into multiple groups. // means when there are more they will be divided into multiple groups.

View File

@ -1396,6 +1396,12 @@
lockToDefault: false lockToDefault: false
preRelease: Beta preRelease: Beta
version: "1.32" version: "1.32"
- name: WindowsCPUAndMemoryAffinity
versionedSpecs:
- default: false
lockToDefault: false
preRelease: Alpha
version: "1.32"
- name: WindowsHostNetwork - name: WindowsHostNetwork
versionedSpecs: versionedSpecs:
- default: true - default: true