Add Windows Affinity Featuregate

Signed-off-by: James Sturtevant <jsturtevant@gmail.com>
This commit is contained in:
James Sturtevant 2024-10-21 13:59:35 -07:00
parent 4d25c25eb0
commit 2c5a8c2618
No known key found for this signature in database
7 changed files with 82 additions and 94 deletions

View File

@ -693,6 +693,12 @@ const (
// Allows kube-proxy to run in Overlay mode for Windows
WinOverlay featuregate.Feature = "WinOverlay"
// owner: @jsturtevant
// kep: https://kep.k8s.io/4888
//
// Enables the Windows GMSA feature.
WindowsCPUAndMemoryAffinity featuregate.Feature = "WindowsCPUAndMemoryAffinity"
// owner: @marosset
// kep: https://kep.k8s.io/3503
//

View File

@ -764,6 +764,10 @@ var defaultVersionedKubernetesFeatureGates = map[featuregate.Feature]featuregate
{Version: version.MustParse("1.20"), Default: true, PreRelease: featuregate.Beta},
},
WindowsCPUAndMemoryAffinity: {
{Version: version.MustParse("1.32"), Default: false, PreRelease: featuregate.Alpha},
},
WindowsHostNetwork: {
{Version: version.MustParse("1.26"), Default: true, PreRelease: featuregate.Alpha},
},

View File

@ -27,6 +27,7 @@ import (
"fmt"
utilfeature "k8s.io/apiserver/pkg/util/feature"
kubefeatures "k8s.io/kubernetes/pkg/features"
"k8s.io/kubernetes/pkg/kubelet/cm/memorymanager"
"sync"
"k8s.io/klog/v2"
@ -42,7 +43,6 @@ import (
"k8s.io/kubernetes/pkg/kubelet/cadvisor"
"k8s.io/kubernetes/pkg/kubelet/cm/cpumanager"
"k8s.io/kubernetes/pkg/kubelet/cm/devicemanager"
"k8s.io/kubernetes/pkg/kubelet/cm/memorymanager"
"k8s.io/kubernetes/pkg/kubelet/cm/resourceupdates"
"k8s.io/kubernetes/pkg/kubelet/cm/topologymanager"
"k8s.io/kubernetes/pkg/kubelet/config"
@ -65,7 +65,6 @@ type containerManagerImpl struct {
// Interface for Topology resource co-ordination
topologyManager topologymanager.Manager
cpuManager cpumanager.Manager
memoryManager memorymanager.Manager
nodeInfo *v1.Node
sync.RWMutex
}
@ -93,17 +92,10 @@ func (cm *containerManagerImpl) Start(ctx context.Context, node *v1.Node,
containerMap, containerRunningSet := buildContainerMapAndRunningSetFromRuntime(ctx, runtimeService)
// Initialize CPU manager
err := cm.cpuManager.Start(cpumanager.ActivePodsFunc(activePods), sourcesReady, podStatusProvider, runtimeService, containerMap)
if err != nil {
return fmt.Errorf("start cpu manager error: %v", err)
}
// Initialize memory manager
if utilfeature.DefaultFeatureGate.Enabled(kubefeatures.MemoryManager) {
containerMap, _ := buildContainerMapAndRunningSetFromRuntime(ctx, runtimeService)
err := cm.memoryManager.Start(memorymanager.ActivePodsFunc(activePods), sourcesReady, podStatusProvider, runtimeService, containerMap)
if utilfeature.DefaultFeatureGate.Enabled(kubefeatures.WindowsCPUAndMemoryAffinity) {
err := cm.cpuManager.Start(cpumanager.ActivePodsFunc(activePods), sourcesReady, podStatusProvider, runtimeService, containerMap)
if err != nil {
return fmt.Errorf("start memory manager error: %v", err)
return fmt.Errorf("start cpu manager error: %v", err)
}
}
@ -132,11 +124,33 @@ func NewContainerManager(mountUtil mount.Interface, cadvisorInterface cadvisor.I
cadvisorInterface: cadvisorInterface,
}
klog.InfoS("Creating topology manager")
cm.topologyManager, err = topologymanager.NewManager(machineInfo.Topology,
nodeConfig.TopologyManagerPolicy,
nodeConfig.TopologyManagerScope,
nodeConfig.TopologyManagerPolicyOptions)
if utilfeature.DefaultFeatureGate.Enabled(kubefeatures.WindowsCPUAndMemoryAffinity) {
klog.InfoS("Creating topology manager")
cm.topologyManager, err = topologymanager.NewManager(machineInfo.Topology,
nodeConfig.TopologyManagerPolicy,
nodeConfig.TopologyManagerScope,
nodeConfig.TopologyManagerPolicyOptions)
klog.InfoS("Creating cpu manager")
cm.cpuManager, err = cpumanager.NewManager(
nodeConfig.CPUManagerPolicy,
nodeConfig.CPUManagerPolicyOptions,
nodeConfig.CPUManagerReconcilePeriod,
machineInfo,
nodeConfig.NodeAllocatableConfig.ReservedSystemCPUs,
cm.GetNodeAllocatableReservation(),
nodeConfig.KubeletRootDir,
cm.topologyManager,
)
if err != nil {
klog.ErrorS(err, "Failed to initialize cpu manager")
return nil, err
}
cm.topologyManager.AddHintProvider(cm.cpuManager)
} else {
cm.topologyManager = topologymanager.NewFakeManager()
cm.cpuManager = cpumanager.NewFakeManager()
}
if err != nil {
return nil, err
@ -149,40 +163,6 @@ func NewContainerManager(mountUtil mount.Interface, cadvisorInterface cadvisor.I
}
cm.topologyManager.AddHintProvider(cm.deviceManager)
klog.InfoS("Creating cpu manager")
cm.cpuManager, err = cpumanager.NewManager(
nodeConfig.CPUManagerPolicy,
nodeConfig.CPUManagerPolicyOptions,
nodeConfig.CPUManagerReconcilePeriod,
machineInfo,
nodeConfig.NodeAllocatableConfig.ReservedSystemCPUs,
cm.GetNodeAllocatableReservation(),
nodeConfig.KubeletRootDir,
cm.topologyManager,
)
if err != nil {
klog.ErrorS(err, "Failed to initialize cpu manager")
return nil, err
}
cm.topologyManager.AddHintProvider(cm.cpuManager)
if utilfeature.DefaultFeatureGate.Enabled(kubefeatures.MemoryManager) {
klog.InfoS("Creating memory manager")
cm.memoryManager, err = memorymanager.NewManager(
nodeConfig.ExperimentalMemoryManagerPolicy,
machineInfo,
cm.GetNodeAllocatableReservation(),
nodeConfig.ExperimentalMemoryManagerReservedMemory,
nodeConfig.KubeletRootDir,
cm.topologyManager,
)
if err != nil {
klog.ErrorS(err, "Failed to initialize memory manager")
return nil, err
}
cm.topologyManager.AddHintProvider(cm.memoryManager)
}
return cm, nil
}
@ -284,7 +264,7 @@ func (cm *containerManagerImpl) UpdatePluginResources(node *schedulerframework.N
}
func (cm *containerManagerImpl) InternalContainerLifecycle() InternalContainerLifecycle {
return &internalContainerLifecycleImpl{cm.cpuManager, cm.memoryManager, cm.topologyManager}
return &internalContainerLifecycleImpl{cm.cpuManager, memorymanager.NewFakeManager(), cm.topologyManager}
}
func (cm *containerManagerImpl) GetPodCgroupRoot() string {
@ -312,17 +292,23 @@ func (cm *containerManagerImpl) UpdateAllocatedDevices() {
}
func (cm *containerManagerImpl) GetCPUs(podUID, containerName string) []int64 {
if cm.cpuManager != nil {
return int64Slice(cm.cpuManager.GetExclusiveCPUs(podUID, containerName).UnsortedList())
if utilfeature.DefaultFeatureGate.Enabled(kubefeatures.WindowsCPUAndMemoryAffinity) {
if cm.cpuManager != nil {
return int64Slice(cm.cpuManager.GetExclusiveCPUs(podUID, containerName).UnsortedList())
}
return []int64{}
}
return []int64{}
return nil
}
func (cm *containerManagerImpl) GetAllocatableCPUs() []int64 {
if cm.cpuManager != nil {
return int64Slice(cm.cpuManager.GetAllocatableCPUs().UnsortedList())
if utilfeature.DefaultFeatureGate.Enabled(kubefeatures.WindowsCPUAndMemoryAffinity) {
if cm.cpuManager != nil {
return int64Slice(cm.cpuManager.GetAllocatableCPUs().UnsortedList())
}
return []int64{}
}
return []int64{}
return nil
}
func (cm *containerManagerImpl) GetMemory(_, _ string) []*podresourcesapi.ContainerMemory {

View File

@ -19,6 +19,8 @@ package cpumanager
import (
"context"
"fmt"
utilfeature "k8s.io/apiserver/pkg/util/feature"
kubefeatures "k8s.io/kubernetes/pkg/features"
"k8s.io/kubernetes/pkg/kubelet/winstats"
"math"
"runtime"
@ -267,7 +269,6 @@ func (m *manager) Allocate(p *v1.Pod, c *v1.Container) error {
defer m.Unlock()
// Call down into the policy to assign this container CPUs if required.
klog.InfoS("jjs allocate call")
err := m.policy.Allocate(m.state, p, c)
if err != nil {
klog.ErrorS(err, "Allocate error")
@ -537,9 +538,7 @@ func (m *manager) updateContainerCPUSet(ctx context.Context, containerID string,
// It would be better to pass the full container resources here instead of
// this patch-like partial resources.
if runtime.GOOS == "windows" {
klog.Info("Updating windows CPU affinity")
if runtime.GOOS == "windows" && utilfeature.DefaultFeatureGate.Enabled(kubefeatures.WindowsCPUAndMemoryAffinity) {
affinities := winstats.CpusToGroupAffinity(cpus.List())
var cpuGroupAffinities []*runtimeapi.WindowsCpuGroupAffinity
for _, affinity := range affinities {

View File

@ -300,10 +300,8 @@ func (p *staticPolicy) updateCPUsToReuse(pod *v1.Pod, container *v1.Container, c
}
func (p *staticPolicy) Allocate(s state.State, pod *v1.Pod, container *v1.Container) (rerr error) {
klog.InfoS("jjs Allocate", "pod", klog.KObj(pod), "containerName", container.Name)
numCPUs := p.guaranteedCPUs(pod, container)
if numCPUs == 0 {
klog.InfoS("shared pool", "pod", klog.KObj(pod), "containerName", container.Name)
// container belongs in the shared pool (nothing to do; use default cpuset)
return nil
}
@ -404,7 +402,7 @@ func (p *staticPolicy) RemoveContainer(s state.State, podUID string, containerNa
}
func (p *staticPolicy) allocateCPUs(s state.State, numCPUs int, numaAffinity bitmask.BitMask, reusableCPUs cpuset.CPUSet) (cpuset.CPUSet, error) {
klog.InfoS("CPUSet", "numCPUs", numCPUs, "socket", numaAffinity)
klog.InfoS("AllocateCPUs", "numCPUs", numCPUs, "socket", numaAffinity)
allocatableCPUs := p.GetAvailableCPUs(s).Union(reusableCPUs)
@ -442,7 +440,6 @@ func (p *staticPolicy) allocateCPUs(s state.State, numCPUs int, numaAffinity bit
func (p *staticPolicy) guaranteedCPUs(pod *v1.Pod, container *v1.Container) int {
if v1qos.GetPodQOS(pod) != v1.PodQOSGuaranteed {
klog.InfoS("Not guaranteed", "pod", pod.UID, "containerName", container.Name)
return 0
}
cpuQuantity := container.Resources.Requests[v1.ResourceCPU]

View File

@ -21,15 +21,16 @@ package cm
import (
"k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/util/sets"
utilfeature "k8s.io/apiserver/pkg/util/feature"
runtimeapi "k8s.io/cri-api/pkg/apis/runtime/v1"
"k8s.io/klog/v2"
kubefeatures "k8s.io/kubernetes/pkg/features"
"k8s.io/kubernetes/pkg/kubelet/winstats"
)
func (i *internalContainerLifecycleImpl) PreCreateContainer(pod *v1.Pod, container *v1.Container, containerConfig *runtimeapi.ContainerConfig) error {
klog.Info("PreCreateContainer for Windows")
if i.cpuManager != nil {
if i.cpuManager != nil && utilfeature.DefaultFeatureGate.Enabled(kubefeatures.WindowsCPUAndMemoryAffinity) {
klog.Info("PreCreateContainer for Windows")
allocatedCPUs := i.cpuManager.GetCPUAffinity(string(pod.UID), container.Name)
if !allocatedCPUs.IsEmpty() {
klog.Infof("Setting CPU affinity for container %q cpus %v", container.Name, allocatedCPUs.String())
@ -46,17 +47,5 @@ func (i *internalContainerLifecycleImpl) PreCreateContainer(pod *v1.Pod, contain
containerConfig.Windows.Resources.AffinityCpus = cpuGroupAffinities
}
}
if i.memoryManager != nil {
numaNodes := i.memoryManager.GetMemoryNUMANodes(pod, container)
if numaNodes.Len() > 0 {
var affinity []int64
for _, numaNode := range sets.List(numaNodes) {
affinity = append(affinity, int64(numaNode))
}
klog.Info("Setting memory NUMA nodes for container")
containerConfig.Windows.Resources.AffinityPrefferedNumaNodes = affinity
}
}
return nil
}

View File

@ -20,6 +20,8 @@ limitations under the License.
package winstats
import (
utilfeature "k8s.io/apiserver/pkg/util/feature"
kubefeatures "k8s.io/kubernetes/pkg/features"
"os"
"runtime"
"strconv"
@ -177,21 +179,26 @@ func (p *perfCounterNodeStatsClient) getMachineInfo() (*cadvisorapi.MachineInfo,
return nil, err
}
numOfPysicalCores, numOfSockets, topology, err := processorInfo(RelationAll)
if err != nil {
return nil, err
mi := &cadvisorapi.MachineInfo{
NumCores: ProcessorCount(),
MemoryCapacity: p.nodeInfo.memoryPhysicalCapacityBytes,
MachineID: hostname,
SystemUUID: systemUUID,
BootID: bootId,
}
return &cadvisorapi.MachineInfo{
NumCores: ProcessorCount(),
NumSockets: numOfSockets,
NumPhysicalCores: numOfPysicalCores,
MemoryCapacity: p.nodeInfo.memoryPhysicalCapacityBytes,
MachineID: hostname,
SystemUUID: systemUUID,
BootID: bootId,
Topology: topology,
}, nil
if utilfeature.DefaultFeatureGate.Enabled(kubefeatures.WindowsCPUAndMemoryAffinity) {
numOfPysicalCores, numOfSockets, topology, err := processorInfo(RelationAll)
if err != nil {
return nil, err
}
mi.NumPhysicalCores = numOfPysicalCores
mi.NumSockets = numOfSockets
mi.Topology = topology
}
return mi, nil
}
// runtime.NumCPU() will only return the information for a single Processor Group.