Merge pull request #92967 from cezaryzukowski/tm-scope

Topology Manager Scope (container | pod) Feature
Commit 75463bda1d by Kubernetes Prow Robot, 2020-11-12 06:32:51 -08:00, committed via GitHub
42 changed files with 2912 additions and 929 deletions

View File

@ -537,6 +537,7 @@ func AddKubeletConfigFlags(mainfs *pflag.FlagSet, c *kubeletconfig.KubeletConfig
fs.Int32Var(&c.PodsPerCore, "pods-per-core", c.PodsPerCore, "Number of Pods per core that can run on this Kubelet. The total number of Pods on this Kubelet cannot exceed max-pods, so max-pods will be used if this calculation results in a larger number of Pods allowed on the Kubelet. A value of 0 disables this limit.")
fs.BoolVar(&c.ProtectKernelDefaults, "protect-kernel-defaults", c.ProtectKernelDefaults, "Default kubelet behaviour for kernel tuning. If set, kubelet errors if any of kernel tunables is different than kubelet defaults.")
fs.StringVar(&c.ReservedSystemCPUs, "reserved-cpus", c.ReservedSystemCPUs, "A comma-separated list of CPUs or CPU ranges that are reserved for system and kubernetes usage. This specific list will supersede cpu counts in --system-reserved and --kube-reserved.")
fs.StringVar(&c.TopologyManagerScope, "topology-manager-scope", c.TopologyManagerScope, "Scope to which topology hints are applied. Topology Manager collects hints from Hint Providers and applies them to the defined scope to ensure pod admission. Possible values: 'container' (default), 'pod'.")
// Node Allocatable Flags
fs.Var(cliflag.NewMapStringString(&c.SystemReserved), "system-reserved", "A set of ResourceName=ResourceQuantity (e.g. cpu=200m,memory=500Mi,ephemeral-storage=1Gi) pairs that describe resources reserved for non-kubernetes components. Currently only cpu and memory are supported. See http://kubernetes.io/docs/user-guide/compute-resources for more detail. [default=none]")
fs.Var(cliflag.NewMapStringString(&c.KubeReserved), "kube-reserved", "A set of ResourceName=ResourceQuantity (e.g. cpu=200m,memory=500Mi,ephemeral-storage=1Gi) pairs that describe resources reserved for kubernetes system components. Currently cpu, memory and local ephemeral storage for root file system are supported. See http://kubernetes.io/docs/user-guide/compute-resources for more detail. [default=none]")

View File

@ -738,6 +738,7 @@ func run(ctx context.Context, s *options.KubeletServer, kubeDeps *kubelet.Depend
EnforceCPULimits: s.CPUCFSQuota,
CPUCFSQuotaPeriod: s.CPUCFSQuotaPeriod.Duration,
ExperimentalTopologyManagerPolicy: s.TopologyManagerPolicy,
ExperimentalTopologyManagerScope: s.TopologyManagerScope,
},
s.FailSwapOn,
devicePluginEnabled,

View File

@ -70,6 +70,7 @@ func Funcs(codecs runtimeserializer.CodecFactory) []interface{} {
obj.CPUManagerReconcilePeriod = obj.NodeStatusUpdateFrequency
obj.NodeStatusMaxImages = 50
obj.TopologyManagerPolicy = kubeletconfig.NoneTopologyManagerPolicy
obj.TopologyManagerScope = kubeletconfig.ContainerTopologyManagerScope
obj.QOSReserved = map[string]string{
"memory": "50%",
}

View File

@ -152,6 +152,7 @@ var (
"CPUManagerPolicy",
"CPUManagerReconcilePeriod.Duration",
"TopologyManagerPolicy",
"TopologyManagerScope",
"QOSReserved[*]",
"CgroupDriver",
"CgroupRoot",

View File

@ -70,5 +70,6 @@ serializeImagePulls: true
streamingConnectionIdleTimeout: 4h0m0s
syncFrequency: 1m0s
topologyManagerPolicy: none
topologyManagerScope: container
volumePluginDir: /usr/libexec/kubernetes/kubelet-plugins/volume/exec/
volumeStatsAggPeriod: 1m0s

View File

@ -70,5 +70,6 @@ serializeImagePulls: true
streamingConnectionIdleTimeout: 4h0m0s
syncFrequency: 1m0s
topologyManagerPolicy: none
topologyManagerScope: container
volumePluginDir: /usr/libexec/kubernetes/kubelet-plugins/volume/exec/
volumeStatsAggPeriod: 1m0s

View File

@ -61,12 +61,18 @@ const (
// BestEffortTopologyManagerPolicy is a mode in which kubelet will favour
// pods with NUMA alignment of CPU and device resources.
BestEffortTopologyManagerPolicy = "best-effort"
// NoneTopologyManager Policy is a mode in which kubelet has no knowledge
// NoneTopologyManagerPolicy is a mode in which kubelet has no knowledge
// of NUMA alignment of a pod's CPU and device resources.
NoneTopologyManagerPolicy = "none"
// SingleNumaNodeTopologyManager Policy iis a mode in which kubelet only allows
// SingleNumaNodeTopologyManagerPolicy is a mode in which kubelet only allows
// pods with a single NUMA alignment of CPU and device resources.
SingleNumaNodeTopologyManager = "single-numa-node"
SingleNumaNodeTopologyManagerPolicy = "single-numa-node"
// ContainerTopologyManagerScope represents that
// topology policy is applied on a per-container basis.
ContainerTopologyManagerScope = "container"
// PodTopologyManagerScope represents that
// topology policy is applied on a per-pod basis.
PodTopologyManagerScope = "pod"
)
// +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object
@ -221,6 +227,12 @@ type KubeletConfiguration struct {
// TopologyManagerPolicy is the name of the policy to use.
// Policies other than "none" require the TopologyManager feature gate to be enabled.
TopologyManagerPolicy string
// TopologyManagerScope represents the scope of topology hint generation
// that topology manager requests and hint providers generate.
// "pod" scope requires the TopologyManager feature gate to be enabled.
// Default: "container"
// +optional
TopologyManagerScope string
// Map of QoS resource reservation percentages (memory only for now).
// Requires the QOSReserved feature gate to be enabled.
QOSReserved map[string]string
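
An illustrative, non-authoritative sketch of how the new field combines with the existing ones in a KubeletConfiguration object; it assumes the internal config package import path and only builds inside the kubernetes tree. The "pod" value requires the TopologyManager feature gate, per the field documentation above.

package main

import (
	kubeletconfig "k8s.io/kubernetes/pkg/kubelet/apis/config"
)

func main() {
	// Sketch only, not part of this diff: a configuration that pairs a
	// non-none policy with the new pod-level scope.
	cfg := kubeletconfig.KubeletConfiguration{
		TopologyManagerPolicy: kubeletconfig.SingleNumaNodeTopologyManagerPolicy,
		TopologyManagerScope:  kubeletconfig.PodTopologyManagerScope,
		FeatureGates:          map[string]bool{"TopologyManager": true},
	}
	_ = cfg
}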

View File

@ -157,6 +157,9 @@ func SetDefaults_KubeletConfiguration(obj *kubeletconfigv1beta1.KubeletConfigura
if obj.TopologyManagerPolicy == "" {
obj.TopologyManagerPolicy = kubeletconfigv1beta1.NoneTopologyManagerPolicy
}
if obj.TopologyManagerScope == "" {
obj.TopologyManagerScope = kubeletconfigv1beta1.ContainerTopologyManagerScope
}
if obj.RuntimeRequestTimeout == zeroDuration {
obj.RuntimeRequestTimeout = metav1.Duration{Duration: 2 * time.Minute}
}

View File

@ -275,6 +275,7 @@ func autoConvert_v1beta1_KubeletConfiguration_To_config_KubeletConfiguration(in
out.CPUManagerPolicy = in.CPUManagerPolicy
out.CPUManagerReconcilePeriod = in.CPUManagerReconcilePeriod
out.TopologyManagerPolicy = in.TopologyManagerPolicy
out.TopologyManagerScope = in.TopologyManagerScope
out.QOSReserved = *(*map[string]string)(unsafe.Pointer(&in.QOSReserved))
out.RuntimeRequestTimeout = in.RuntimeRequestTimeout
out.HairpinMode = in.HairpinMode
@ -427,6 +428,7 @@ func autoConvert_config_KubeletConfiguration_To_v1beta1_KubeletConfiguration(in
out.CPUManagerPolicy = in.CPUManagerPolicy
out.CPUManagerReconcilePeriod = in.CPUManagerReconcilePeriod
out.TopologyManagerPolicy = in.TopologyManagerPolicy
out.TopologyManagerScope = in.TopologyManagerScope
out.QOSReserved = *(*map[string]string)(unsafe.Pointer(&in.QOSReserved))
out.RuntimeRequestTimeout = in.RuntimeRequestTimeout
out.HairpinMode = in.HairpinMode

View File

@ -123,8 +123,23 @@ func ValidateKubeletConfiguration(kc *kubeletconfig.KubeletConfiguration) error
allErrors = append(allErrors, fmt.Errorf("invalid configuration: serverTLSBootstrap %v requires feature gate RotateKubeletServerCertificate", kc.ServerTLSBootstrap))
}
if kc.TopologyManagerPolicy != kubeletconfig.NoneTopologyManagerPolicy && !localFeatureGate.Enabled(features.TopologyManager) {
allErrors = append(allErrors, fmt.Errorf("invalid configuration: topologyManager %v requires feature gate TopologyManager", kc.TopologyManagerPolicy))
allErrors = append(allErrors, fmt.Errorf("invalid configuration: topologyManagerPolicy %v requires feature gate TopologyManager", kc.TopologyManagerPolicy))
}
switch kc.TopologyManagerPolicy {
case kubeletconfig.NoneTopologyManagerPolicy:
case kubeletconfig.BestEffortTopologyManagerPolicy:
case kubeletconfig.RestrictedTopologyManagerPolicy:
case kubeletconfig.SingleNumaNodeTopologyManagerPolicy:
default:
allErrors = append(allErrors, fmt.Errorf("invalid configuration: topologyManagerPolicy non-allowable value: %v", kc.TopologyManagerPolicy))
}
if kc.TopologyManagerScope != kubeletconfig.ContainerTopologyManagerScope && !localFeatureGate.Enabled(features.TopologyManager) {
allErrors = append(allErrors, fmt.Errorf("invalid configuration: topologyManagerScope %v requires feature gate TopologyManager", kc.TopologyManagerScope))
}
if kc.TopologyManagerScope != kubeletconfig.ContainerTopologyManagerScope && kc.TopologyManagerScope != kubeletconfig.PodTopologyManagerScope {
allErrors = append(allErrors, fmt.Errorf("invalid configuration: topologyManagerScope non-allowable value: %v", kc.TopologyManagerScope))
}
for _, val := range kc.EnforceNodeAllocatable {
switch val {
case kubetypes.NodeAllocatableEnforcementKey:

View File

@ -53,6 +53,8 @@ func TestValidateKubeletConfiguration(t *testing.T) {
HairpinMode: kubeletconfig.PromiscuousBridge,
NodeLeaseDurationSeconds: 1,
CPUCFSQuotaPeriod: metav1.Duration{Duration: 25 * time.Millisecond},
TopologyManagerScope: kubeletconfig.PodTopologyManagerScope,
TopologyManagerPolicy: kubeletconfig.SingleNumaNodeTopologyManagerPolicy,
FeatureGates: map[string]bool{
"CustomCPUCFSQuotaPeriod": true,
},
@ -89,6 +91,8 @@ func TestValidateKubeletConfiguration(t *testing.T) {
NodeLeaseDurationSeconds: 1,
CPUCFSQuotaPeriod: metav1.Duration{Duration: 50 * time.Millisecond},
ReservedSystemCPUs: "0-3",
TopologyManagerScope: kubeletconfig.ContainerTopologyManagerScope,
TopologyManagerPolicy: kubeletconfig.NoneTopologyManagerPolicy,
FeatureGates: map[string]bool{
"CustomCPUCFSQuotaPeriod": true,
},
@ -123,7 +127,7 @@ func TestValidateKubeletConfiguration(t *testing.T) {
NodeLeaseDurationSeconds: -1,
CPUCFSQuotaPeriod: metav1.Duration{Duration: 100 * time.Millisecond},
}
const numErrsErrorCase1 = 25
const numErrsErrorCase1 = 27
if allErrors := ValidateKubeletConfiguration(errorCase1); len(allErrors.(utilerrors.Aggregate).Errors()) != numErrsErrorCase1 {
t.Errorf("expect %d errors, got %v", numErrsErrorCase1, len(allErrors.(utilerrors.Aggregate).Errors()))
}
@ -156,11 +160,13 @@ func TestValidateKubeletConfiguration(t *testing.T) {
NodeLeaseDurationSeconds: 1,
CPUCFSQuotaPeriod: metav1.Duration{Duration: 50 * time.Millisecond},
ReservedSystemCPUs: "0-3",
TopologyManagerScope: "invalid",
TopologyManagerPolicy: "invalid",
FeatureGates: map[string]bool{
"CustomCPUCFSQuotaPeriod": true,
},
}
const numErrsErrorCase2 = 1
const numErrsErrorCase2 = 3
if allErrors := ValidateKubeletConfiguration(errorCase2); len(allErrors.(utilerrors.Aggregate).Errors()) != numErrsErrorCase2 {
t.Errorf("expect %d errors, got %v", numErrsErrorCase2, len(allErrors.(utilerrors.Aggregate).Errors()))
}

View File

@ -133,6 +133,7 @@ type NodeConfig struct {
NodeAllocatableConfig
QOSReserved map[v1.ResourceName]int64
ExperimentalCPUManagerPolicy string
ExperimentalTopologyManagerScope string
ExperimentalCPUManagerReconcilePeriod time.Duration
ExperimentalPodPidsLimit int64
EnforceCPULimits bool

View File

@ -300,13 +300,14 @@ func NewContainerManager(mountUtil mount.Interface, cadvisorInterface cadvisor.I
cm.topologyManager, err = topologymanager.NewManager(
machineInfo.Topology,
nodeConfig.ExperimentalTopologyManagerPolicy,
nodeConfig.ExperimentalTopologyManagerScope,
)
if err != nil {
return nil, err
}
klog.Infof("[topologymanager] Initializing Topology Manager with %s policy", nodeConfig.ExperimentalTopologyManagerPolicy)
klog.Infof("[topologymanager] Initializing Topology Manager with %s policy and %s-level scope", nodeConfig.ExperimentalTopologyManagerPolicy, nodeConfig.ExperimentalTopologyManagerScope)
} else {
cm.topologyManager = topologymanager.NewFakeManager()
}
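
The topology_manager.go change itself is suppressed further down in this diff, so the following is only a hedged sketch, not the actual implementation, of how a constructor could map the scope name passed here onto the container- and pod-level scopes added by this PR. scopeFromName is a hypothetical helper; the constants and constructors it uses are defined in the new scope files below.

// Hypothetical helper, shown for illustration only; assumes it lives in the
// topologymanager package alongside the new scope.go (and that fmt is imported).
func scopeFromName(policy Policy, topologyScopeName string) (Scope, error) {
	switch topologyScopeName {
	case containerTopologyScope:
		return NewContainerScope(policy), nil
	case podTopologyScope:
		return NewPodScope(policy), nil
	default:
		return nil, fmt.Errorf("unknown topology manager scope: %q", topologyScopeName)
	}
}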

View File

@ -23,6 +23,7 @@ go_library(
"//pkg/kubelet/config:go_default_library",
"//pkg/kubelet/container:go_default_library",
"//pkg/kubelet/status:go_default_library",
"//pkg/kubelet/util/format:go_default_library",
"//staging/src/k8s.io/api/core/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library",
"//staging/src/k8s.io/cri-api/pkg/apis/runtime/v1alpha2:go_default_library",

View File

@ -81,6 +81,11 @@ type Manager interface {
// GetCPUs implements the podresources.CPUsProvider interface to provide allocated
// cpus for the container
GetCPUs(podUID, containerName string) []int64
// GetPodTopologyHints implements the topologymanager.HintProvider Interface
// and is consulted to achieve NUMA aware resource alignment per Pod
// among this and other resource controllers.
GetPodTopologyHints(pod *v1.Pod) map[string][]topologymanager.TopologyHint
}
type manager struct {
@ -304,6 +309,13 @@ func (m *manager) GetTopologyHints(pod *v1.Pod, container *v1.Container) map[str
return m.policy.GetTopologyHints(m.state, pod, container)
}
func (m *manager) GetPodTopologyHints(pod *v1.Pod) map[string][]topologymanager.TopologyHint {
// Garbage collect any stranded resources before providing TopologyHints
m.removeStaleState()
// Delegate to active policy
return m.policy.GetPodTopologyHints(m.state, pod)
}
type reconciledContainer struct {
podName string
containerName string

View File

@ -116,6 +116,10 @@ func (p *mockPolicy) GetTopologyHints(s state.State, pod *v1.Pod, container *v1.
return nil
}
func (p *mockPolicy) GetPodTopologyHints(s state.State, pod *v1.Pod) map[string][]topologymanager.TopologyHint {
return nil
}
type mockRuntimeService struct {
err error
}

View File

@ -56,7 +56,12 @@ func (m *fakeManager) RemoveContainer(containerID string) error {
}
func (m *fakeManager) GetTopologyHints(pod *v1.Pod, container *v1.Container) map[string][]topologymanager.TopologyHint {
klog.Infof("[fake cpumanager] Get Topology Hints")
klog.Infof("[fake cpumanager] Get Container Topology Hints")
return map[string][]topologymanager.TopologyHint{}
}
func (m *fakeManager) GetPodTopologyHints(pod *v1.Pod) map[string][]topologymanager.TopologyHint {
klog.Infof("[fake cpumanager] Get Pod Topology Hints")
return map[string][]topologymanager.TopologyHint{}
}

View File

@ -34,4 +34,8 @@ type Policy interface {
// and is consulted to achieve NUMA aware resource alignment among this
// and other resource controllers.
GetTopologyHints(s state.State, pod *v1.Pod, container *v1.Container) map[string][]topologymanager.TopologyHint
// GetPodTopologyHints implements the topologymanager.HintProvider Interface
// and is consulted to achieve NUMA aware resource alignment per Pod
// among this and other resource controllers.
GetPodTopologyHints(s state.State, pod *v1.Pod) map[string][]topologymanager.TopologyHint
}

View File

@ -55,3 +55,7 @@ func (p *nonePolicy) RemoveContainer(s state.State, podUID string, containerName
func (p *nonePolicy) GetTopologyHints(s state.State, pod *v1.Pod, container *v1.Container) map[string][]topologymanager.TopologyHint {
return nil
}
func (p *nonePolicy) GetPodTopologyHints(s state.State, pod *v1.Pod) map[string][]topologymanager.TopologyHint {
return nil
}

View File

@ -27,6 +27,7 @@ import (
"k8s.io/kubernetes/pkg/kubelet/cm/cpuset"
"k8s.io/kubernetes/pkg/kubelet/cm/topologymanager"
"k8s.io/kubernetes/pkg/kubelet/cm/topologymanager/bitmask"
"k8s.io/kubernetes/pkg/kubelet/util/format"
)
// PolicyStatic is the name of the static policy
@ -217,23 +218,23 @@ func (p *staticPolicy) updateCPUsToReuse(pod *v1.Pod, container *v1.Container, c
func (p *staticPolicy) Allocate(s state.State, pod *v1.Pod, container *v1.Container) error {
if numCPUs := p.guaranteedCPUs(pod, container); numCPUs != 0 {
klog.Infof("[cpumanager] static policy: Allocate (pod: %s, container: %s)", pod.Name, container.Name)
klog.Infof("[cpumanager] static policy: Allocate (pod: %s, container: %s)", format.Pod(pod), container.Name)
// container belongs in an exclusively allocated pool
if cpuset, ok := s.GetCPUSet(string(pod.UID), container.Name); ok {
p.updateCPUsToReuse(pod, container, cpuset)
klog.Infof("[cpumanager] static policy: container already present in state, skipping (pod: %s, container: %s)", pod.Name, container.Name)
klog.Infof("[cpumanager] static policy: container already present in state, skipping (pod: %s, container: %s)", format.Pod(pod), container.Name)
return nil
}
// Call Topology Manager to get the aligned socket affinity across all hint providers.
hint := p.affinity.GetAffinity(string(pod.UID), container.Name)
klog.Infof("[cpumanager] Pod %v, Container %v Topology Affinity is: %v", pod.UID, container.Name, hint)
klog.Infof("[cpumanager] Pod %v, Container %v Topology Affinity is: %v", format.Pod(pod), container.Name, hint)
// Allocate CPUs according to the NUMA affinity contained in the hint.
cpuset, err := p.allocateCPUs(s, numCPUs, hint.NUMANodeAffinity, p.cpusToReuse[string(pod.UID)])
if err != nil {
klog.Errorf("[cpumanager] unable to allocate %d CPUs (pod: %s, container: %s, error: %v)", numCPUs, pod.Name, container.Name, err)
klog.Errorf("[cpumanager] unable to allocate %d CPUs (pod: %s, container: %s, error: %v)", numCPUs, format.Pod(pod), container.Name, err)
return err
}
s.SetCPUSet(string(pod.UID), container.Name, cpuset)
@ -308,20 +309,41 @@ func (p *staticPolicy) guaranteedCPUs(pod *v1.Pod, container *v1.Container) int
return int(cpuQuantity.Value())
}
func (p *staticPolicy) GetTopologyHints(s state.State, pod *v1.Pod, container *v1.Container) map[string][]topologymanager.TopologyHint {
// If there are no CPU resources requested for this container, we do not
// generate any topology hints.
if _, ok := container.Resources.Requests[v1.ResourceCPU]; !ok {
return nil
func (p *staticPolicy) podGuaranteedCPUs(pod *v1.Pod) int {
// The maximum of requested CPUs by init containers.
requestedByInitContainers := 0
for _, container := range pod.Spec.InitContainers {
if _, ok := container.Resources.Requests[v1.ResourceCPU]; !ok {
continue
}
requestedCPU := p.guaranteedCPUs(pod, &container)
if requestedCPU > requestedByInitContainers {
requestedByInitContainers = requestedCPU
}
}
// The sum of requested CPUs by app containers.
requestedByAppContainers := 0
for _, container := range pod.Spec.Containers {
if _, ok := container.Resources.Requests[v1.ResourceCPU]; !ok {
continue
}
requestedByAppContainers += p.guaranteedCPUs(pod, &container)
}
if requestedByInitContainers > requestedByAppContainers {
return requestedByInitContainers
}
return requestedByAppContainers
}
func (p *staticPolicy) GetTopologyHints(s state.State, pod *v1.Pod, container *v1.Container) map[string][]topologymanager.TopologyHint {
// Get a count of how many guaranteed CPUs have been requested.
requested := p.guaranteedCPUs(pod, container)
// If there are no guaranteed CPUs being requested, we do not generate
// any topology hints. This can happen, for example, because init
// containers don't have to have guaranteed CPUs in order for the pod
// to still be in the Guaranteed QOS tier.
// Number of required CPUs is not an integer or a container is not part of the Guaranteed QoS class.
// It will be treated by the TopologyManager as having no preference and cause it to ignore this
// resource when considering pod alignment.
// In terms of hints, this is equal to: TopologyHints[NUMANodeAffinity: nil, Preferred: true].
if requested == 0 {
return nil
}
@ -331,12 +353,15 @@ func (p *staticPolicy) GetTopologyHints(s state.State, pod *v1.Pod, container *v
// kubelet restart, for example.
if allocated, exists := s.GetCPUSet(string(pod.UID), container.Name); exists {
if allocated.Size() != requested {
klog.Errorf("[cpumanager] CPUs already allocated to (pod %v, container %v) with different number than request: requested: %d, allocated: %d", string(pod.UID), container.Name, requested, allocated.Size())
klog.Errorf("[cpumanager] CPUs already allocated to (pod %v, container %v) with different number than request: requested: %d, allocated: %d", format.Pod(pod), container.Name, requested, allocated.Size())
// An empty list of hints will be treated as a preference that cannot be satisfied.
// In definition of hints this is equal to: TopologyHint[NUMANodeAffinity: nil, Preferred: false].
// For all but the best-effort policy, the Topology Manager will throw a pod-admission error.
return map[string][]topologymanager.TopologyHint{
string(v1.ResourceCPU): {},
}
}
klog.Infof("[cpumanager] Regenerating TopologyHints for CPUs already allocated to (pod %v, container %v)", string(pod.UID), container.Name)
klog.Infof("[cpumanager] Regenerating TopologyHints for CPUs already allocated to (pod %v, container %v)", format.Pod(pod), container.Name)
return map[string][]topologymanager.TopologyHint{
string(v1.ResourceCPU): p.generateCPUTopologyHints(allocated, cpuset.CPUSet{}, requested),
}
@ -344,11 +369,72 @@ func (p *staticPolicy) GetTopologyHints(s state.State, pod *v1.Pod, container *v
// Get a list of available CPUs.
available := p.assignableCPUs(s)
// Get a list of reusable CPUs (e.g. CPUs reused from initContainers).
// It should be an empty CPUSet for a newly created pod.
reusable := p.cpusToReuse[string(pod.UID)]
// Generate hints.
cpuHints := p.generateCPUTopologyHints(available, reusable, requested)
klog.Infof("[cpumanager] TopologyHints generated for pod '%v', container '%v': %v", pod.Name, container.Name, cpuHints)
klog.Infof("[cpumanager] TopologyHints generated for pod '%v', container '%v': %v", format.Pod(pod), container.Name, cpuHints)
return map[string][]topologymanager.TopologyHint{
string(v1.ResourceCPU): cpuHints,
}
}
func (p *staticPolicy) GetPodTopologyHints(s state.State, pod *v1.Pod) map[string][]topologymanager.TopologyHint {
// Get a count of how many guaranteed CPUs have been requested by Pod.
requested := p.podGuaranteedCPUs(pod)
// Number of required CPUs is not an integer or a pod is not part of the Guaranteed QoS class.
// It will be treated by the TopologyManager as having no preference and cause it to ignore this
// resource when considering pod alignment.
// In terms of hints, this is equal to: TopologyHints[NUMANodeAffinity: nil, Preferred: true].
if requested == 0 {
return nil
}
assignedCPUs := cpuset.NewCPUSet()
for _, container := range append(pod.Spec.InitContainers, pod.Spec.Containers...) {
requestedByContainer := p.guaranteedCPUs(pod, &container)
// Short circuit to regenerate the same hints if there are already
// guaranteed CPUs allocated to the Container. This might happen after a
// kubelet restart, for example.
if allocated, exists := s.GetCPUSet(string(pod.UID), container.Name); exists {
if allocated.Size() != requestedByContainer {
klog.Errorf("[cpumanager] CPUs already allocated to (pod %v, container %v) with different number than request: requested: %d, allocated: %d", format.Pod(pod), container.Name, requestedByContainer, allocated.Size())
// An empty list of hints will be treated as a preference that cannot be satisfied.
// In definition of hints this is equal to: TopologyHint[NUMANodeAffinity: nil, Preferred: false].
// For all but the best-effort policy, the Topology Manager will throw a pod-admission error.
return map[string][]topologymanager.TopologyHint{
string(v1.ResourceCPU): {},
}
}
// A set of CPUs already assigned to containers in this pod
assignedCPUs = assignedCPUs.Union(allocated)
}
}
if assignedCPUs.Size() == requested {
klog.Infof("[cpumanager] Regenerating TopologyHints for CPUs already allocated to pod %v", format.Pod(pod))
return map[string][]topologymanager.TopologyHint{
string(v1.ResourceCPU): p.generateCPUTopologyHints(assignedCPUs, cpuset.CPUSet{}, requested),
}
}
// Get a list of available CPUs.
available := p.assignableCPUs(s)
// Get a list of reusable CPUs (e.g. CPUs reused from initContainers).
// It should be an empty CPUSet for a newly created pod.
reusable := p.cpusToReuse[string(pod.UID)]
// Ensure any CPUs already assigned to containers in this pod are included as part of the hint generation.
reusable = reusable.Union(assignedCPUs)
// Generate hints.
cpuHints := p.generateCPUTopologyHints(available, reusable, requested)
klog.Infof("[cpumanager] TopologyHints generated for pod '%v' : %v", format.Pod(pod), cpuHints)
return map[string][]topologymanager.TopologyHint{
string(v1.ResourceCPU): cpuHints,
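
podGuaranteedCPUs above takes the larger of the biggest single init-container request and the sum of the app-container requests: init containers run one at a time, while app containers run concurrently. A self-contained sketch of just that arithmetic, with hypothetical numbers:

package main

import "fmt"

// podGuaranteedCPUsSketch mirrors the rule used above: only the largest
// init-container request matters, while app-container requests are summed.
func podGuaranteedCPUsSketch(initRequests, appRequests []int) int {
	maxInit := 0
	for _, r := range initRequests {
		if r > maxInit {
			maxInit = r
		}
	}
	sumApp := 0
	for _, r := range appRequests {
		sumApp += r
	}
	if maxInit > sumApp {
		return maxInit
	}
	return sumApp
}

func main() {
	// Init containers request 5 and 2 CPUs; app containers request 2 and 4.
	// max(5, 2) = 5 and 2 + 4 = 6, so the pod-level guaranteed request is 6.
	fmt.Println(podGuaranteedCPUsSketch([]int{5, 2}, []int{2, 4})) // 6
}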

View File

@ -31,21 +31,17 @@ import (
"k8s.io/kubernetes/pkg/kubelet/cm/topologymanager/bitmask"
)
func TestGetTopologyHints(t *testing.T) {
testPod1 := makePod("fakePod", "fakeContainer", "2", "2")
testContainer1 := &testPod1.Spec.Containers[0]
testPod2 := makePod("fakePod", "fakeContainer", "5", "5")
testContainer2 := &testPod2.Spec.Containers[0]
testPod3 := makePod("fakePod", "fakeContainer", "7", "7")
testContainer3 := &testPod3.Spec.Containers[0]
testPod4 := makePod("fakePod", "fakeContainer", "11", "11")
testContainer4 := &testPod4.Spec.Containers[0]
type testCase struct {
name string
pod v1.Pod
container v1.Container
assignments state.ContainerCPUAssignments
defaultCPUSet cpuset.CPUSet
expectedHints []topologymanager.TopologyHint
}
firstSocketMask, _ := bitmask.NewBitMask(0)
secondSocketMask, _ := bitmask.NewBitMask(1)
crossSocketMask, _ := bitmask.NewBitMask(0, 1)
machineInfo := cadvisorapi.MachineInfo{
func returnMachineInfo() cadvisorapi.MachineInfo {
return cadvisorapi.MachineInfo{
NumCores: 12,
Topology: []cadvisorapi.Node{
{Id: 0,
@ -64,15 +60,209 @@ func TestGetTopologyHints(t *testing.T) {
},
},
}
}
func TestPodGuaranteedCPUs(t *testing.T) {
CPUs := [][]struct {
request string
limit string
}{
{
{request: "0", limit: "0"},
},
{
{request: "2", limit: "2"},
},
{
{request: "5", limit: "5"},
},
{
{request: "2", limit: "2"},
{request: "4", limit: "4"},
},
}
// tc for a non-Guaranteed Pod
testPod1 := makeMultiContainerPod(CPUs[0], CPUs[0])
testPod2 := makeMultiContainerPod(CPUs[0], CPUs[1])
testPod3 := makeMultiContainerPod(CPUs[1], CPUs[0])
// tc for guaranteed Pod
testPod4 := makeMultiContainerPod(CPUs[1], CPUs[1])
testPod5 := makeMultiContainerPod(CPUs[2], CPUs[2])
// tc for comparing init containers and user containers
testPod6 := makeMultiContainerPod(CPUs[1], CPUs[2])
testPod7 := makeMultiContainerPod(CPUs[2], CPUs[1])
// tc for multi containers
testPod8 := makeMultiContainerPod(CPUs[3], CPUs[3])
p := staticPolicy{}
tcases := []struct {
name string
pod v1.Pod
container v1.Container
assignments state.ContainerCPUAssignments
defaultCPUSet cpuset.CPUSet
expectedHints []topologymanager.TopologyHint
name string
pod *v1.Pod
expectedCPU int
}{
{
name: "TestCase01: if requestedCPU == 0, Pod is not Guaranteed Qos",
pod: testPod1,
expectedCPU: 0,
},
{
name: "TestCase02: if requestedCPU == 0, Pod is not Guaranteed Qos",
pod: testPod2,
expectedCPU: 0,
},
{
name: "TestCase03: if requestedCPU == 0, Pod is not Guaranteed Qos",
pod: testPod3,
expectedCPU: 0,
},
{
name: "TestCase04: Guaranteed Pod requests 2 CPUs",
pod: testPod4,
expectedCPU: 2,
},
{
name: "TestCase05: Guaranteed Pod requests 5 CPUs",
pod: testPod5,
expectedCPU: 5,
},
{
name: "TestCase06: The number of CPUs requested by app is bigger than the number of CPUs requested by init",
pod: testPod6,
expectedCPU: 5,
},
{
name: "TestCase07: The number of CPUs requested by init is bigger than the number of CPUs requested by app",
pod: testPod7,
expectedCPU: 5,
},
{
name: "TestCase08: Sum of CPUs requested by multiple containers",
pod: testPod8,
expectedCPU: 6,
},
}
for _, tc := range tcases {
requestedCPU := p.podGuaranteedCPUs(tc.pod)
if requestedCPU != tc.expectedCPU {
t.Errorf("Expected in result to be %v , got %v", tc.expectedCPU, requestedCPU)
}
}
}
func TestGetTopologyHints(t *testing.T) {
machineInfo := returnMachineInfo()
tcases := returnTestCases()
for _, tc := range tcases {
topology, _ := topology.Discover(&machineInfo)
var activePods []*v1.Pod
for p := range tc.assignments {
pod := v1.Pod{}
pod.UID = types.UID(p)
for c := range tc.assignments[p] {
container := v1.Container{}
container.Name = c
pod.Spec.Containers = append(pod.Spec.Containers, container)
}
activePods = append(activePods, &pod)
}
m := manager{
policy: &staticPolicy{
topology: topology,
},
state: &mockState{
assignments: tc.assignments,
defaultCPUSet: tc.defaultCPUSet,
},
topology: topology,
activePods: func() []*v1.Pod { return activePods },
podStatusProvider: mockPodStatusProvider{},
sourcesReady: &sourcesReadyStub{},
}
hints := m.GetTopologyHints(&tc.pod, &tc.container)[string(v1.ResourceCPU)]
if len(tc.expectedHints) == 0 && len(hints) == 0 {
continue
}
sort.SliceStable(hints, func(i, j int) bool {
return hints[i].LessThan(hints[j])
})
sort.SliceStable(tc.expectedHints, func(i, j int) bool {
return tc.expectedHints[i].LessThan(tc.expectedHints[j])
})
if !reflect.DeepEqual(tc.expectedHints, hints) {
t.Errorf("Expected in result to be %v , got %v", tc.expectedHints, hints)
}
}
}
func TestGetPodTopologyHints(t *testing.T) {
machineInfo := returnMachineInfo()
for _, tc := range returnTestCases() {
topology, _ := topology.Discover(&machineInfo)
var activePods []*v1.Pod
for p := range tc.assignments {
pod := v1.Pod{}
pod.UID = types.UID(p)
for c := range tc.assignments[p] {
container := v1.Container{}
container.Name = c
pod.Spec.Containers = append(pod.Spec.Containers, container)
}
activePods = append(activePods, &pod)
}
m := manager{
policy: &staticPolicy{
topology: topology,
},
state: &mockState{
assignments: tc.assignments,
defaultCPUSet: tc.defaultCPUSet,
},
topology: topology,
activePods: func() []*v1.Pod { return activePods },
podStatusProvider: mockPodStatusProvider{},
sourcesReady: &sourcesReadyStub{},
}
podHints := m.GetPodTopologyHints(&tc.pod)[string(v1.ResourceCPU)]
if len(tc.expectedHints) == 0 && len(podHints) == 0 {
continue
}
sort.SliceStable(podHints, func(i, j int) bool {
return podHints[i].LessThan(podHints[j])
})
sort.SliceStable(tc.expectedHints, func(i, j int) bool {
return tc.expectedHints[i].LessThan(tc.expectedHints[j])
})
if !reflect.DeepEqual(tc.expectedHints, podHints) {
t.Errorf("Expected in result to be %v , got %v", tc.expectedHints, podHints)
}
}
}
func returnTestCases() []testCase {
testPod1 := makePod("fakePod", "fakeContainer", "2", "2")
testContainer1 := &testPod1.Spec.Containers[0]
testPod2 := makePod("fakePod", "fakeContainer", "5", "5")
testContainer2 := &testPod2.Spec.Containers[0]
testPod3 := makePod("fakePod", "fakeContainer", "7", "7")
testContainer3 := &testPod3.Spec.Containers[0]
testPod4 := makePod("fakePod", "fakeContainer", "11", "11")
testContainer4 := &testPod4.Spec.Containers[0]
firstSocketMask, _ := bitmask.NewBitMask(0)
secondSocketMask, _ := bitmask.NewBitMask(1)
crossSocketMask, _ := bitmask.NewBitMask(0, 1)
return []testCase{
{
name: "Request 2 CPUs, 4 available on NUMA 0, 6 available on NUMA 1",
pod: *testPod1,
@ -231,47 +421,4 @@ func TestGetTopologyHints(t *testing.T) {
expectedHints: []topologymanager.TopologyHint{},
},
}
for _, tc := range tcases {
topology, _ := topology.Discover(&machineInfo)
var activePods []*v1.Pod
for p := range tc.assignments {
pod := v1.Pod{}
pod.UID = types.UID(p)
for c := range tc.assignments[p] {
container := v1.Container{}
container.Name = c
pod.Spec.Containers = append(pod.Spec.Containers, container)
}
activePods = append(activePods, &pod)
}
m := manager{
policy: &staticPolicy{
topology: topology,
},
state: &mockState{
assignments: tc.assignments,
defaultCPUSet: tc.defaultCPUSet,
},
topology: topology,
activePods: func() []*v1.Pod { return activePods },
podStatusProvider: mockPodStatusProvider{},
sourcesReady: &sourcesReadyStub{},
}
hints := m.GetTopologyHints(&tc.pod, &tc.container)[string(v1.ResourceCPU)]
if len(tc.expectedHints) == 0 && len(hints) == 0 {
continue
}
sort.SliceStable(hints, func(i, j int) bool {
return hints[i].LessThan(hints[j])
})
sort.SliceStable(tc.expectedHints, func(i, j int) bool {
return tc.expectedHints[i].LessThan(tc.expectedHints[j])
})
if !reflect.DeepEqual(tc.expectedHints, hints) {
t.Errorf("Expected in result to be %v , got %v", tc.expectedHints, hints)
}
}
}

View File

@ -26,6 +26,7 @@ go_library(
"//pkg/kubelet/lifecycle:go_default_library",
"//pkg/kubelet/metrics:go_default_library",
"//pkg/kubelet/pluginmanager/cache:go_default_library",
"//pkg/kubelet/util/format:go_default_library",
"//pkg/scheduler/framework:go_default_library",
"//pkg/util/selinux:go_default_library",
"//staging/src/k8s.io/api/core/v1:go_default_library",

View File

@ -47,6 +47,7 @@ import (
"k8s.io/kubernetes/pkg/kubelet/lifecycle"
"k8s.io/kubernetes/pkg/kubelet/metrics"
"k8s.io/kubernetes/pkg/kubelet/pluginmanager/cache"
"k8s.io/kubernetes/pkg/kubelet/util/format"
schedulerframework "k8s.io/kubernetes/pkg/scheduler/framework"
"k8s.io/kubernetes/pkg/util/selinux"
)
@ -641,19 +642,19 @@ func (m *ManagerImpl) devicesToAllocate(podUID, contName, resource string, requi
// This can happen if a container restarts for example.
devices := m.podDevices.containerDevices(podUID, contName, resource)
if devices != nil {
klog.V(3).Infof("Found pre-allocated devices for resource %s container %q in Pod %q: %v", resource, contName, podUID, devices.List())
klog.V(3).Infof("Found pre-allocated devices for resource %s container %q in Pod %q: %v", resource, contName, string(podUID), devices.List())
needed = needed - devices.Len()
// A pod's resource is not expected to change once admitted by the API server,
// so just fail loudly here. We can revisit this part if this no longer holds.
if needed != 0 {
return nil, fmt.Errorf("pod %q container %q changed request for resource %q from %d to %d", podUID, contName, resource, devices.Len(), required)
return nil, fmt.Errorf("pod %q container %q changed request for resource %q from %d to %d", string(podUID), contName, resource, devices.Len(), required)
}
}
if needed == 0 {
// No change, no work.
return nil, nil
}
klog.V(3).Infof("Needs to allocate %d %q for pod %q container %q", needed, resource, podUID, contName)
klog.V(3).Infof("Needs to allocate %d %q for pod %q container %q", needed, resource, string(podUID), contName)
// Check if resource registered with devicemanager
if _, ok := m.healthyDevices[resource]; !ok {
return nil, fmt.Errorf("can't allocate unregistered device %s", resource)
@ -944,7 +945,7 @@ func (m *ManagerImpl) GetDeviceRunContainerOptions(pod *v1.Pod, container *v1.Co
}
}
if needsReAllocate {
klog.V(2).Infof("needs re-allocate device plugin resources for pod %s, container %s", podUID, container.Name)
klog.V(2).Infof("needs re-allocate device plugin resources for pod %s, container %s", format.Pod(pod), container.Name)
if err := m.Allocate(pod, container); err != nil {
return nil, err
}
@ -971,12 +972,12 @@ func (m *ManagerImpl) callPreStartContainerIfNeeded(podUID, contName, resource s
devices := m.podDevices.containerDevices(podUID, contName, resource)
if devices == nil {
m.mutex.Unlock()
return fmt.Errorf("no devices found allocated in local cache for pod %s, container %s, resource %s", podUID, contName, resource)
return fmt.Errorf("no devices found allocated in local cache for pod %s, container %s, resource %s", string(podUID), contName, resource)
}
m.mutex.Unlock()
devs := devices.UnsortedList()
klog.V(4).Infof("Issuing an PreStartContainer call for container, %s, of pod %s", contName, podUID)
klog.V(4).Infof("Issuing an PreStartContainer call for container, %s, of pod %s", contName, string(podUID))
_, err := eI.e.preStartContainer(devs)
if err != nil {
return fmt.Errorf("device plugin PreStartContainer rpc failed with err: %v", err)
@ -999,7 +1000,7 @@ func (m *ManagerImpl) callGetPreferredAllocationIfAvailable(podUID, contName, re
}
m.mutex.Unlock()
klog.V(4).Infof("Issuing a GetPreferredAllocation call for container, %s, of pod %s", contName, podUID)
klog.V(4).Infof("Issuing a GetPreferredAllocation call for container, %s, of pod %s", contName, string(podUID))
resp, err := eI.e.getPreferredAllocation(available.UnsortedList(), mustInclude.UnsortedList(), size)
m.mutex.Lock()
if err != nil {

View File

@ -74,6 +74,11 @@ func (h *ManagerStub) GetTopologyHints(pod *v1.Pod, container *v1.Container) map
return map[string][]topologymanager.TopologyHint{}
}
// GetPodTopologyHints returns an empty TopologyHint map
func (h *ManagerStub) GetPodTopologyHints(pod *v1.Pod) map[string][]topologymanager.TopologyHint {
return map[string][]topologymanager.TopologyHint{}
}
// GetDevices returns nil
func (h *ManagerStub) GetDevices(_, _ string) []*podresourcesapi.ContainerDevices {
return nil

View File

@ -93,6 +93,19 @@ func (pdev *podDevices) delete(pods []string) {
}
}
// Returns list of device Ids allocated to the given pod for the given resource.
// Returns nil if we don't have cached state for the given <podUID, resource>.
func (pdev *podDevices) podDevices(podUID, resource string) sets.String {
pdev.RLock()
defer pdev.RUnlock()
ret := sets.NewString()
for contName := range pdev.devs[podUID] {
ret = ret.Union(pdev.containerDevices(podUID, contName, resource))
}
return ret
}
// Returns list of device Ids allocated to the given container for the given resource.
// Returns nil if we don't have cached state for the given <podUID, contName, resource>.
func (pdev *podDevices) containerDevices(podUID, contName, resource string) sets.String {

View File

@ -23,6 +23,7 @@ import (
pluginapi "k8s.io/kubelet/pkg/apis/deviceplugin/v1beta1"
"k8s.io/kubernetes/pkg/kubelet/cm/topologymanager"
"k8s.io/kubernetes/pkg/kubelet/cm/topologymanager/bitmask"
"k8s.io/kubernetes/pkg/kubelet/util/format"
)
// GetTopologyHints implements the TopologyManager HintProvider Interface which
@ -53,11 +54,11 @@ func (m *ManagerImpl) GetTopologyHints(pod *v1.Pod, container *v1.Container) map
allocated := m.podDevices.containerDevices(string(pod.UID), container.Name, resource)
if allocated.Len() > 0 {
if allocated.Len() != requested {
klog.Errorf("[devicemanager] Resource '%v' already allocated to (pod %v, container %v) with different number than request: requested: %d, allocated: %d", resource, string(pod.UID), container.Name, requested, allocated.Len())
klog.Errorf("[devicemanager] Resource '%v' already allocated to (pod %v, container %v) with different number than request: requested: %d, allocated: %d", resource, format.Pod(pod), container.Name, requested, allocated.Len())
deviceHints[resource] = []topologymanager.TopologyHint{}
continue
}
klog.Infof("[devicemanager] Regenerating TopologyHints for resource '%v' already allocated to (pod %v, container %v)", resource, string(pod.UID), container.Name)
klog.Infof("[devicemanager] Regenerating TopologyHints for resource '%v' already allocated to (pod %v, container %v)", resource, format.Pod(pod), container.Name)
deviceHints[resource] = m.generateDeviceTopologyHints(resource, allocated, sets.String{}, requested)
continue
}
@ -80,6 +81,54 @@ func (m *ManagerImpl) GetTopologyHints(pod *v1.Pod, container *v1.Container) map
return deviceHints
}
// GetPodTopologyHints implements the topologymanager.HintProvider Interface which
// ensures the Device Manager is consulted when Topology Aware Hints for Pod are created.
func (m *ManagerImpl) GetPodTopologyHints(pod *v1.Pod) map[string][]topologymanager.TopologyHint {
// Garbage collect any stranded device resources before providing TopologyHints
m.UpdateAllocatedDevices()
deviceHints := make(map[string][]topologymanager.TopologyHint)
accumulatedResourceRequests := m.getPodDeviceRequest(pod)
for resource, requested := range accumulatedResourceRequests {
// Only consider devices that actually contain topology information.
if aligned := m.deviceHasTopologyAlignment(resource); !aligned {
klog.Infof("[devicemanager] Resource '%v' does not have a topology preference", resource)
deviceHints[resource] = nil
continue
}
// Short circuit to regenerate the same hints if there are already
// devices allocated to the Pod. This might happen after a
// kubelet restart, for example.
allocated := m.podDevices.podDevices(string(pod.UID), resource)
if allocated.Len() > 0 {
if allocated.Len() != requested {
klog.Errorf("[devicemanager] Resource '%v' already allocated to (pod %v) with different number than request: requested: %d, allocated: %d", resource, format.Pod(pod), requested, allocated.Len())
deviceHints[resource] = []topologymanager.TopologyHint{}
continue
}
klog.Infof("[devicemanager] Regenerating TopologyHints for resource '%v' already allocated to (pod %v)", resource, format.Pod(pod))
deviceHints[resource] = m.generateDeviceTopologyHints(resource, allocated, sets.String{}, requested)
continue
}
// Get the list of available devices, for which TopologyHints should be generated.
available := m.getAvailableDevices(resource)
if available.Len() < requested {
klog.Errorf("[devicemanager] Unable to generate topology hints: requested number of devices unavailable for '%s': requested: %d, available: %d", resource, requested, available.Len())
deviceHints[resource] = []topologymanager.TopologyHint{}
continue
}
// Generate TopologyHints for this resource given the current
// request size and the list of available devices.
deviceHints[resource] = m.generateDeviceTopologyHints(resource, available, sets.String{}, requested)
}
return deviceHints
}
func (m *ManagerImpl) deviceHasTopologyAlignment(resource string) bool {
// If any device has Topology set, we assume they care about alignment.
for device := range m.allDevices[resource] {
@ -172,3 +221,72 @@ func (m *ManagerImpl) getNUMANodeIds(topology *pluginapi.TopologyInfo) []int {
}
return ids
}
func (m *ManagerImpl) getPodDeviceRequest(pod *v1.Pod) map[string]int {
podResources := sets.NewString()
// Find the max request of a given resource across all init containers
initContainerRequests := make(map[string]int)
for _, container := range pod.Spec.InitContainers {
for resourceObj, requestedObj := range container.Resources.Limits {
resource := string(resourceObj)
requested := int(requestedObj.Value())
if !m.isDevicePluginResource(resource) {
continue
}
podResources.Insert(resource)
if _, exists := initContainerRequests[resource]; !exists {
initContainerRequests[resource] = requested
continue
}
if requested > initContainerRequests[resource] {
initContainerRequests[resource] = requested
}
}
}
// Compute the sum of requests across all app containers for a given resource
appContainerRequests := make(map[string]int)
for _, container := range pod.Spec.Containers {
for resourceObj, requestedObj := range container.Resources.Limits {
resource := string(resourceObj)
requested := int(requestedObj.Value())
if !m.isDevicePluginResource(resource) {
continue
}
podResources.Insert(resource)
appContainerRequests[resource] += requested
}
}
// Calculate podRequests as the max of init and app container requests for a given resource
podRequests := make(map[string]int)
for resource := range podResources {
_, initExists := initContainerRequests[resource]
_, appExists := appContainerRequests[resource]
if initExists && !appExists {
podRequests[resource] = initContainerRequests[resource]
continue
}
if !initExists && appExists {
podRequests[resource] = appContainerRequests[resource]
continue
}
if initContainerRequests[resource] > appContainerRequests[resource] {
podRequests[resource] = initContainerRequests[resource]
continue
}
podRequests[resource] = appContainerRequests[resource]
}
return podRequests
}
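
getPodDeviceRequest applies the same max-of-init versus sum-of-app rule per device-plugin resource, reading each container's Limits (for extended resources, requests must equal limits). A hedged sketch of the kind of pod it would evaluate, using a made-up resource name:

package main

import (
	v1 "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/api/resource"
)

func main() {
	// Sketch only: "vendor.example/gpu" is a hypothetical device-plugin resource.
	// Assuming it is registered with the device manager, getPodDeviceRequest
	// would see max(init) = 3 and sum(app) = 2 + 2 = 4, and record 4 for it.
	gpu := v1.ResourceName("vendor.example/gpu")
	pod := &v1.Pod{
		Spec: v1.PodSpec{
			InitContainers: []v1.Container{
				{Resources: v1.ResourceRequirements{Limits: v1.ResourceList{gpu: resource.MustParse("3")}}},
			},
			Containers: []v1.Container{
				{Resources: v1.ResourceRequirements{Limits: v1.ResourceList{gpu: resource.MustParse("2")}}},
				{Resources: v1.ResourceRequirements{Limits: v1.ResourceList{gpu: resource.MustParse("2")}}},
			},
		},
	}
	_ = pod
}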

File diff suppressed because it is too large

View File

@ -71,6 +71,10 @@ type Manager interface {
// and is consulted to make Topology aware resource alignments
GetTopologyHints(pod *v1.Pod, container *v1.Container) map[string][]topologymanager.TopologyHint
// TopologyManager HintProvider provider indicates the Device Manager implements the Topology Manager Interface
// and is consulted to make Topology aware resource alignments per Pod
GetPodTopologyHints(pod *v1.Pod) map[string][]topologymanager.TopologyHint
// UpdateAllocatedDevices frees any Devices that are bound to terminated pods.
UpdateAllocatedDevices()
}

View File

@ -9,6 +9,9 @@ go_library(
"policy_none.go",
"policy_restricted.go",
"policy_single_numa_node.go",
"scope.go",
"scope_container.go",
"scope_pod.go",
"topology_manager.go",
],
importpath = "k8s.io/kubernetes/pkg/kubelet/cm/topologymanager",
@ -16,6 +19,7 @@ go_library(
deps = [
"//pkg/kubelet/cm/topologymanager/bitmask:go_default_library",
"//pkg/kubelet/lifecycle:go_default_library",
"//pkg/kubelet/util/format:go_default_library",
"//staging/src/k8s.io/api/core/v1:go_default_library",
"//vendor/github.com/google/cadvisor/info/v1:go_default_library",
"//vendor/k8s.io/klog/v2:go_default_library",
@ -48,6 +52,9 @@ go_test(
"policy_restricted_test.go",
"policy_single_numa_node_test.go",
"policy_test.go",
"scope_container_test.go",
"scope_pod_test.go",
"scope_test.go",
"topology_manager_test.go",
],
embed = [":go_default_library"],

View File

@ -20,6 +20,7 @@ import (
"k8s.io/api/core/v1"
"k8s.io/klog/v2"
"k8s.io/kubernetes/pkg/kubelet/lifecycle"
"k8s.io/kubernetes/pkg/kubelet/util/format"
)
type fakeManager struct{}
@ -31,7 +32,7 @@ func NewFakeManager() Manager {
}
func (m *fakeManager) GetAffinity(podUID string, containerName string) TopologyHint {
klog.Infof("[fake topologymanager] GetAffinity podUID: %v container name: %v", podUID, containerName)
klog.Infof("[fake topologymanager] GetAffinity pod: %v container name: %v", podUID, containerName)
return TopologyHint{}
}
@ -40,7 +41,7 @@ func (m *fakeManager) AddHintProvider(h HintProvider) {
}
func (m *fakeManager) AddContainer(pod *v1.Pod, containerID string) error {
klog.Infof("[fake topologymanager] AddContainer pod: %v container id: %v", pod, containerID)
klog.Infof("[fake topologymanager] AddContainer pod: %v container id: %v", format.Pod(pod), containerID)
return nil
}

View File

@ -75,8 +75,6 @@ func TestFakeAddContainer(t *testing.T) {
},
}
fm := fakeManager{}
mngr := manager{}
mngr.podMap = make(map[string]string)
for _, tc := range testCases {
pod := v1.Pod{}
pod.UID = tc.podUID
@ -107,8 +105,6 @@ func TestFakeRemoveContainer(t *testing.T) {
},
}
fm := fakeManager{}
mngr := manager{}
mngr.podMap = make(map[string]string)
for _, tc := range testCases {
err := fm.RemoveContainer(tc.containerID)
if err != nil {
@ -147,8 +143,6 @@ func TestFakeAdmit(t *testing.T) {
}
fm := fakeManager{}
for _, tc := range tcases {
mngr := manager{}
mngr.podTopologyHints = make(map[string]map[string]TopologyHint)
podAttr := lifecycle.PodAdmitAttributes{}
pod := v1.Pod{}
pod.Status.QOSClass = tc.qosClass

View File

@ -0,0 +1,148 @@
/*
Copyright 2020 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package topologymanager
import (
"fmt"
"sync"
"k8s.io/api/core/v1"
"k8s.io/klog/v2"
"k8s.io/kubernetes/pkg/kubelet/lifecycle"
)
const (
// containerTopologyScope specifies the TopologyManagerScope per container.
containerTopologyScope = "container"
// podTopologyScope specifies the TopologyManagerScope per pod.
podTopologyScope = "pod"
)
type podTopologyHints map[string]map[string]TopologyHint
// Scope interface for Topology Manager
type Scope interface {
Name() string
Admit(pod *v1.Pod) lifecycle.PodAdmitResult
// AddHintProvider adds a hint provider to the manager to indicate the hint provider
// wants to be consulted when making topology hints
AddHintProvider(h HintProvider)
// AddContainer adds pod to Manager for tracking
AddContainer(pod *v1.Pod, containerID string) error
// RemoveContainer removes pod from Manager tracking
RemoveContainer(containerID string) error
// Store is the interface for storing pod topology hints
Store
}
type scope struct {
mutex sync.Mutex
name string
// Mapping of a Pod's Containers to their TopologyHints,
// indexed by PodUID and then by container name
podTopologyHints podTopologyHints
// The list of components registered with the Manager
hintProviders []HintProvider
// Topology Manager Policy
policy Policy
// Mapping of PodUID to ContainerID for Adding/Removing Pods from PodTopologyHints mapping
podMap map[string]string
}
func (s *scope) Name() string {
return s.name
}
func (s *scope) GetAffinity(podUID string, containerName string) TopologyHint {
return s.podTopologyHints[podUID][containerName]
}
func (s *scope) AddHintProvider(h HintProvider) {
s.hintProviders = append(s.hintProviders, h)
}
// It would be better to implement this function in topologymanager instead of scope,
// but topologymanager does not track the mapping anymore
func (s *scope) AddContainer(pod *v1.Pod, containerID string) error {
s.mutex.Lock()
defer s.mutex.Unlock()
s.podMap[containerID] = string(pod.UID)
return nil
}
// It would be better to implement this function in topologymanager instead of scope,
// but topologymanager does not track the mapping anymore
func (s *scope) RemoveContainer(containerID string) error {
s.mutex.Lock()
defer s.mutex.Unlock()
klog.Infof("[topologymanager] RemoveContainer - Container ID: %v", containerID)
podUIDString := s.podMap[containerID]
delete(s.podMap, containerID)
if _, exists := s.podTopologyHints[podUIDString]; exists {
delete(s.podTopologyHints[podUIDString], containerID)
if len(s.podTopologyHints[podUIDString]) == 0 {
delete(s.podTopologyHints, podUIDString)
}
}
return nil
}
func (s *scope) admitPolicyNone(pod *v1.Pod) lifecycle.PodAdmitResult {
for _, container := range append(pod.Spec.InitContainers, pod.Spec.Containers...) {
err := s.allocateAlignedResources(pod, &container)
if err != nil {
return unexpectedAdmissionError(err)
}
}
return admitPod()
}
// It would be better to implement this function in topologymanager instead of scope,
// but topologymanager does not track providers anymore
func (s *scope) allocateAlignedResources(pod *v1.Pod, container *v1.Container) error {
for _, provider := range s.hintProviders {
err := provider.Allocate(pod, container)
if err != nil {
return err
}
}
return nil
}
func topologyAffinityError() lifecycle.PodAdmitResult {
return lifecycle.PodAdmitResult{
Message: "Resources cannot be allocated with Topology locality",
Reason: "TopologyAffinityError",
Admit: false,
}
}
func unexpectedAdmissionError(err error) lifecycle.PodAdmitResult {
return lifecycle.PodAdmitResult{
Message: fmt.Sprintf("Allocate failed due to %v, which is unexpected", err),
Reason: "UnexpectedAdmissionError",
Admit: false,
}
}
func admitPod() lifecycle.PodAdmitResult {
return lifecycle.PodAdmitResult{Admit: true}
}
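
A scope consults hint providers through the HintProvider interface, which after this change carries both per-container and per-pod hint methods plus Allocate. A minimal sketch of a provider with no topology preference, assuming it sits in the topologymanager package:

// Sketch only, not part of this diff: a no-op HintProvider. Nil hint maps are
// treated as "no preference", and Allocate performs no exclusive assignment.
type noopHintProvider struct{}

func (noopHintProvider) GetTopologyHints(pod *v1.Pod, container *v1.Container) map[string][]TopologyHint {
	return nil
}

func (noopHintProvider) GetPodTopologyHints(pod *v1.Pod) map[string][]TopologyHint {
	return nil
}

func (noopHintProvider) Allocate(pod *v1.Pod, container *v1.Container) error {
	return nil
}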

View File

@ -0,0 +1,90 @@
/*
Copyright 2020 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package topologymanager
import (
"k8s.io/api/core/v1"
"k8s.io/klog/v2"
"k8s.io/kubernetes/pkg/kubelet/lifecycle"
"k8s.io/kubernetes/pkg/kubelet/util/format"
)
type containerScope struct {
scope
}
// Ensure containerScope implements Scope interface
var _ Scope = &containerScope{}
// NewContainerScope returns a container scope.
func NewContainerScope(policy Policy) Scope {
return &containerScope{
scope{
name: containerTopologyScope,
podTopologyHints: podTopologyHints{},
policy: policy,
podMap: make(map[string]string),
},
}
}
func (s *containerScope) Admit(pod *v1.Pod) lifecycle.PodAdmitResult {
// Exception - Policy : none
if s.policy.Name() == PolicyNone {
return s.admitPolicyNone(pod)
}
for _, container := range append(pod.Spec.InitContainers, pod.Spec.Containers...) {
bestHint, admit := s.calculateAffinity(pod, &container)
klog.Infof("[topologymanager] Best TopologyHint for (pod: %v container: %v): %v", format.Pod(pod), container.Name, bestHint)
if !admit {
return topologyAffinityError()
}
if (s.podTopologyHints)[string(pod.UID)] == nil {
(s.podTopologyHints)[string(pod.UID)] = make(map[string]TopologyHint)
}
klog.Infof("[topologymanager] Topology Affinity for (pod: %v container: %v): %v", format.Pod(pod), container.Name, bestHint)
(s.podTopologyHints)[string(pod.UID)][container.Name] = bestHint
err := s.allocateAlignedResources(pod, &container)
if err != nil {
return unexpectedAdmissionError(err)
}
}
return admitPod()
}
func (s *containerScope) accumulateProvidersHints(pod *v1.Pod, container *v1.Container) []map[string][]TopologyHint {
var providersHints []map[string][]TopologyHint
for _, provider := range s.hintProviders {
// Get the TopologyHints for a Container from a provider.
hints := provider.GetTopologyHints(pod, container)
providersHints = append(providersHints, hints)
klog.Infof("[topologymanager] TopologyHints for pod '%v', container '%v': %v", format.Pod(pod), container.Name, hints)
}
return providersHints
}
func (s *containerScope) calculateAffinity(pod *v1.Pod, container *v1.Container) (TopologyHint, bool) {
providersHints := s.accumulateProvidersHints(pod, container)
bestHint, admit := s.policy.Merge(providersHints)
klog.Infof("[topologymanager] ContainerTopologyHint: %v", bestHint)
return bestHint, admit
}
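
A hedged end-to-end usage sketch of the container scope: construct it with a policy, register providers, then Admit merges hints per container and triggers allocation. The example function and the noopHintProvider from the earlier sketch are illustrations, not part of this diff; NewNonePolicy is the existing constructor for the none policy.

// Sketch only; assumes the topologymanager package. With the none policy,
// Admit skips hint merging and simply asks each provider to allocate.
func exampleContainerScopeAdmit() lifecycle.PodAdmitResult {
	s := NewContainerScope(NewNonePolicy())
	s.AddHintProvider(noopHintProvider{})
	pod := &v1.Pod{}
	pod.Spec.Containers = []v1.Container{{Name: "app"}}
	return s.Admit(pod) // Admit: true
}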

View File

@ -0,0 +1,268 @@
/*
Copyright 2020 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package topologymanager
import (
"reflect"
"testing"
v1 "k8s.io/api/core/v1"
)
func TestContainerCalculateAffinity(t *testing.T) {
tcases := []struct {
name string
hp []HintProvider
expected []map[string][]TopologyHint
}{
{
name: "No hint providers",
hp: []HintProvider{},
expected: ([]map[string][]TopologyHint)(nil),
},
{
name: "HintProvider returns empty non-nil map[string][]TopologyHint",
hp: []HintProvider{
&mockHintProvider{
map[string][]TopologyHint{},
},
},
expected: []map[string][]TopologyHint{
{},
},
},
{
name: "HintProvider returns nil map[string][]TopologyHint from provider",
hp: []HintProvider{
&mockHintProvider{
map[string][]TopologyHint{
"resource": nil,
},
},
},
expected: []map[string][]TopologyHint{
{
"resource": nil,
},
},
},
{
name: "Assorted HintProviders",
hp: []HintProvider{
&mockHintProvider{
map[string][]TopologyHint{
"resource-1/A": {
{NUMANodeAffinity: NewTestBitMask(0), Preferred: true},
{NUMANodeAffinity: NewTestBitMask(0, 1), Preferred: false},
},
"resource-1/B": {
{NUMANodeAffinity: NewTestBitMask(1), Preferred: true},
{NUMANodeAffinity: NewTestBitMask(1, 2), Preferred: false},
},
},
},
&mockHintProvider{
map[string][]TopologyHint{
"resource-2/A": {
{NUMANodeAffinity: NewTestBitMask(2), Preferred: true},
{NUMANodeAffinity: NewTestBitMask(3, 4), Preferred: false},
},
"resource-2/B": {
{NUMANodeAffinity: NewTestBitMask(2), Preferred: true},
{NUMANodeAffinity: NewTestBitMask(3, 4), Preferred: false},
},
},
},
&mockHintProvider{
map[string][]TopologyHint{
"resource-3": nil,
},
},
},
expected: []map[string][]TopologyHint{
{
"resource-1/A": {
{NUMANodeAffinity: NewTestBitMask(0), Preferred: true},
{NUMANodeAffinity: NewTestBitMask(0, 1), Preferred: false},
},
"resource-1/B": {
{NUMANodeAffinity: NewTestBitMask(1), Preferred: true},
{NUMANodeAffinity: NewTestBitMask(1, 2), Preferred: false},
},
},
{
"resource-2/A": {
{NUMANodeAffinity: NewTestBitMask(2), Preferred: true},
{NUMANodeAffinity: NewTestBitMask(3, 4), Preferred: false},
},
"resource-2/B": {
{NUMANodeAffinity: NewTestBitMask(2), Preferred: true},
{NUMANodeAffinity: NewTestBitMask(3, 4), Preferred: false},
},
},
{
"resource-3": nil,
},
},
},
}
for _, tc := range tcases {
ctnScope := &containerScope{
scope{
hintProviders: tc.hp,
policy: &mockPolicy{},
name: podTopologyScope,
},
}
ctnScope.calculateAffinity(&v1.Pod{}, &v1.Container{})
actual := ctnScope.policy.(*mockPolicy).ph
if !reflect.DeepEqual(tc.expected, actual) {
t.Errorf("Test Case: %s", tc.name)
t.Errorf("Expected result to be %v, got %v", tc.expected, actual)
}
}
}
func TestContainerAccumulateProvidersHints(t *testing.T) {
tcases := []struct {
name string
hp []HintProvider
expected []map[string][]TopologyHint
}{
{
name: "TopologyHint not set",
hp: []HintProvider{},
expected: nil,
},
{
name: "HintProvider returns empty non-nil map[string][]TopologyHint",
hp: []HintProvider{
&mockHintProvider{
map[string][]TopologyHint{},
},
},
expected: []map[string][]TopologyHint{
{},
},
},
{
name: "HintProvider returns nil map[string][]TopologyHint from provider",
hp: []HintProvider{
&mockHintProvider{
map[string][]TopologyHint{
"resource": nil,
},
},
},
expected: []map[string][]TopologyHint{
{
"resource": nil,
},
},
},
{
name: "2 HintProviders with 1 resource returns hints",
hp: []HintProvider{
&mockHintProvider{
map[string][]TopologyHint{
"resource1": {TopologyHint{}},
},
},
&mockHintProvider{
map[string][]TopologyHint{
"resource2": {TopologyHint{}},
},
},
},
expected: []map[string][]TopologyHint{
{
"resource1": {TopologyHint{}},
},
{
"resource2": {TopologyHint{}},
},
},
},
{
name: "2 HintProviders 1 with 1 resource 1 with nil hints",
hp: []HintProvider{
&mockHintProvider{
map[string][]TopologyHint{
"resource1": {TopologyHint{}},
},
},
&mockHintProvider{nil},
},
expected: []map[string][]TopologyHint{
{
"resource1": {TopologyHint{}},
},
nil,
},
},
{
name: "2 HintProviders 1 with 1 resource 1 empty hints",
hp: []HintProvider{
&mockHintProvider{
map[string][]TopologyHint{
"resource1": {TopologyHint{}},
},
},
&mockHintProvider{
map[string][]TopologyHint{},
},
},
expected: []map[string][]TopologyHint{
{
"resource1": {TopologyHint{}},
},
{},
},
},
{
name: "HintProvider with 2 resources returns hints",
hp: []HintProvider{
&mockHintProvider{
map[string][]TopologyHint{
"resource1": {TopologyHint{}},
"resource2": {TopologyHint{}},
},
},
},
expected: []map[string][]TopologyHint{
{
"resource1": {TopologyHint{}},
"resource2": {TopologyHint{}},
},
},
},
}
for _, tc := range tcases {
ctnScope := containerScope{
scope{
hintProviders: tc.hp,
},
}
actual := ctnScope.accumulateProvidersHints(&v1.Pod{}, &v1.Container{})
if !reflect.DeepEqual(actual, tc.expected) {
t.Errorf("Test Case %s: Expected NUMANodeAffinity in result to be %v, got %v", tc.name, tc.expected, actual)
}
}
}

View File

@ -0,0 +1,91 @@
/*
Copyright 2020 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package topologymanager
import (
"k8s.io/api/core/v1"
"k8s.io/klog/v2"
"k8s.io/kubernetes/pkg/kubelet/lifecycle"
"k8s.io/kubernetes/pkg/kubelet/util/format"
)
type podScope struct {
scope
}
// Ensure podScope implements Scope interface
var _ Scope = &podScope{}
// NewPodScope returns a pod scope.
func NewPodScope(policy Policy) Scope {
return &podScope{
scope{
name: podTopologyScope,
podTopologyHints: podTopologyHints{},
policy: policy,
podMap: make(map[string]string),
},
}
}
func (s *podScope) Admit(pod *v1.Pod) lifecycle.PodAdmitResult {
// Exception - Policy : none
if s.policy.Name() == PolicyNone {
return s.admitPolicyNone(pod)
}
bestHint, admit := s.calculateAffinity(pod)
klog.Infof("[topologymanager] Best TopologyHint for (pod: %v): %v", format.Pod(pod), bestHint)
if !admit {
return topologyAffinityError()
}
for _, container := range append(pod.Spec.InitContainers, pod.Spec.Containers...) {
klog.Infof("[topologymanager] Topology Affinity for (pod: %v container: %v): %v", format.Pod(pod), container.Name, bestHint)
if s.podTopologyHints[string(pod.UID)] == nil {
s.podTopologyHints[string(pod.UID)] = make(map[string]TopologyHint)
}
s.podTopologyHints[string(pod.UID)][container.Name] = bestHint
err := s.allocateAlignedResources(pod, &container)
if err != nil {
return unexpectedAdmissionError(err)
}
}
return admitPod()
}
func (s *podScope) accumulateProvidersHints(pod *v1.Pod) []map[string][]TopologyHint {
var providersHints []map[string][]TopologyHint
for _, provider := range s.hintProviders {
// Get the TopologyHints for a Pod from a provider.
hints := provider.GetPodTopologyHints(pod)
providersHints = append(providersHints, hints)
klog.Infof("[topologymanager] TopologyHints for pod '%v': %v", format.Pod(pod), hints)
}
return providersHints
}
func (s *podScope) calculateAffinity(pod *v1.Pod) (TopologyHint, bool) {
providersHints := s.accumulateProvidersHints(pod)
bestHint, admit := s.policy.Merge(providersHints)
klog.Infof("[topologymanager] PodTopologyHint: %v", bestHint)
return bestHint, admit
}
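
The pod scope gathers pod-level hints from every provider, merges them through the policy once, and applies that single best hint to each container at admission. A minimal sketch of exercising it directly, as a hypothetical test in the same package (not part of this change), assuming the none policy admits unconditionally when no hint providers are registered:

package topologymanager

import (
	"testing"

	v1 "k8s.io/api/core/v1"
)

// Hypothetical illustration: pod-scope admission with the none policy and no
// hint providers registered is expected to admit an empty pod.
func TestPodScopeAdmitSketch(t *testing.T) {
	s := NewPodScope(NewNonePolicy())
	if res := s.Admit(&v1.Pod{}); !res.Admit {
		t.Errorf("expected the none policy to admit the pod, got %+v", res)
	}
}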

View File

@ -0,0 +1,268 @@
/*
Copyright 2020 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package topologymanager
import (
"reflect"
"testing"
v1 "k8s.io/api/core/v1"
)
func TestPodCalculateAffinity(t *testing.T) {
tcases := []struct {
name string
hp []HintProvider
expected []map[string][]TopologyHint
}{
{
name: "No hint providers",
hp: []HintProvider{},
expected: ([]map[string][]TopologyHint)(nil),
},
{
name: "HintProvider returns empty non-nil map[string][]TopologyHint",
hp: []HintProvider{
&mockHintProvider{
map[string][]TopologyHint{},
},
},
expected: []map[string][]TopologyHint{
{},
},
},
{
name: "HintProvider returns nil map[string][]TopologyHint from provider",
hp: []HintProvider{
&mockHintProvider{
map[string][]TopologyHint{
"resource": nil,
},
},
},
expected: []map[string][]TopologyHint{
{
"resource": nil,
},
},
},
{
name: "Assorted HintProviders",
hp: []HintProvider{
&mockHintProvider{
map[string][]TopologyHint{
"resource-1/A": {
{NUMANodeAffinity: NewTestBitMask(0), Preferred: true},
{NUMANodeAffinity: NewTestBitMask(0, 1), Preferred: false},
},
"resource-1/B": {
{NUMANodeAffinity: NewTestBitMask(1), Preferred: true},
{NUMANodeAffinity: NewTestBitMask(1, 2), Preferred: false},
},
},
},
&mockHintProvider{
map[string][]TopologyHint{
"resource-2/A": {
{NUMANodeAffinity: NewTestBitMask(2), Preferred: true},
{NUMANodeAffinity: NewTestBitMask(3, 4), Preferred: false},
},
"resource-2/B": {
{NUMANodeAffinity: NewTestBitMask(2), Preferred: true},
{NUMANodeAffinity: NewTestBitMask(3, 4), Preferred: false},
},
},
},
&mockHintProvider{
map[string][]TopologyHint{
"resource-3": nil,
},
},
},
expected: []map[string][]TopologyHint{
{
"resource-1/A": {
{NUMANodeAffinity: NewTestBitMask(0), Preferred: true},
{NUMANodeAffinity: NewTestBitMask(0, 1), Preferred: false},
},
"resource-1/B": {
{NUMANodeAffinity: NewTestBitMask(1), Preferred: true},
{NUMANodeAffinity: NewTestBitMask(1, 2), Preferred: false},
},
},
{
"resource-2/A": {
{NUMANodeAffinity: NewTestBitMask(2), Preferred: true},
{NUMANodeAffinity: NewTestBitMask(3, 4), Preferred: false},
},
"resource-2/B": {
{NUMANodeAffinity: NewTestBitMask(2), Preferred: true},
{NUMANodeAffinity: NewTestBitMask(3, 4), Preferred: false},
},
},
{
"resource-3": nil,
},
},
},
}
for _, tc := range tcases {
podScope := &podScope{
scope{
hintProviders: tc.hp,
policy: &mockPolicy{},
name: podTopologyScope,
},
}
podScope.calculateAffinity(&v1.Pod{})
actual := podScope.policy.(*mockPolicy).ph
if !reflect.DeepEqual(tc.expected, actual) {
t.Errorf("Test Case: %s", tc.name)
t.Errorf("Expected result to be %v, got %v", tc.expected, actual)
}
}
}
func TestPodAccumulateProvidersHints(t *testing.T) {
tcases := []struct {
name string
hp []HintProvider
expected []map[string][]TopologyHint
}{
{
name: "TopologyHint not set",
hp: []HintProvider{},
expected: nil,
},
{
name: "HintProvider returns empty non-nil map[string][]TopologyHint",
hp: []HintProvider{
&mockHintProvider{
map[string][]TopologyHint{},
},
},
expected: []map[string][]TopologyHint{
{},
},
},
{
name: "HintProvider returns nil map[string][]TopologyHint from provider",
hp: []HintProvider{
&mockHintProvider{
map[string][]TopologyHint{
"resource": nil,
},
},
},
expected: []map[string][]TopologyHint{
{
"resource": nil,
},
},
},
{
name: "2 HintProviders with 1 resource returns hints",
hp: []HintProvider{
&mockHintProvider{
map[string][]TopologyHint{
"resource1": {TopologyHint{}},
},
},
&mockHintProvider{
map[string][]TopologyHint{
"resource2": {TopologyHint{}},
},
},
},
expected: []map[string][]TopologyHint{
{
"resource1": {TopologyHint{}},
},
{
"resource2": {TopologyHint{}},
},
},
},
{
name: "2 HintProviders 1 with 1 resource 1 with nil hints",
hp: []HintProvider{
&mockHintProvider{
map[string][]TopologyHint{
"resource1": {TopologyHint{}},
},
},
&mockHintProvider{nil},
},
expected: []map[string][]TopologyHint{
{
"resource1": {TopologyHint{}},
},
nil,
},
},
{
name: "2 HintProviders 1 with 1 resource 1 empty hints",
hp: []HintProvider{
&mockHintProvider{
map[string][]TopologyHint{
"resource1": {TopologyHint{}},
},
},
&mockHintProvider{
map[string][]TopologyHint{},
},
},
expected: []map[string][]TopologyHint{
{
"resource1": {TopologyHint{}},
},
{},
},
},
{
name: "HintProvider with 2 resources returns hints",
hp: []HintProvider{
&mockHintProvider{
map[string][]TopologyHint{
"resource1": {TopologyHint{}},
"resource2": {TopologyHint{}},
},
},
},
expected: []map[string][]TopologyHint{
{
"resource1": {TopologyHint{}},
"resource2": {TopologyHint{}},
},
},
},
}
for _, tc := range tcases {
pScope := podScope{
scope{
hintProviders: tc.hp,
},
}
actual := pScope.accumulateProvidersHints(&v1.Pod{})
if !reflect.DeepEqual(actual, tc.expected) {
t.Errorf("Test Case %s: Expected NUMANodeAffinity in result to be %v, got %v", tc.name, tc.expected, actual)
}
}
}

View File

@ -0,0 +1,118 @@
/*
Copyright 2020 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package topologymanager
import (
"k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/types"
"reflect"
"testing"
)
func TestGetAffinity(t *testing.T) {
tcases := []struct {
name string
containerName string
podUID string
expected TopologyHint
}{
{
name: "case1",
containerName: "nginx",
podUID: "0aafa4c4-38e8-11e9-bcb1-a4bf01040474",
expected: TopologyHint{},
},
}
for _, tc := range tcases {
scope := scope{}
actual := scope.GetAffinity(tc.podUID, tc.containerName)
if !reflect.DeepEqual(actual, tc.expected) {
t.Errorf("Expected Affinity in result to be %v, got %v", tc.expected, actual)
}
}
}
func TestAddContainer(t *testing.T) {
testCases := []struct {
name string
containerID string
podUID types.UID
}{
{
name: "Case1",
containerID: "nginx",
podUID: "0aafa4c4-38e8-11e9-bcb1-a4bf01040474",
},
{
name: "Case2",
containerID: "Busy_Box",
podUID: "b3ee37fc-39a5-11e9-bcb1-a4bf01040474",
},
}
scope := scope{}
scope.podMap = make(map[string]string)
for _, tc := range testCases {
pod := v1.Pod{}
pod.UID = tc.podUID
err := scope.AddContainer(&pod, tc.containerID)
if err != nil {
t.Errorf("Expected error to be nil but got: %v", err)
}
if val, ok := scope.podMap[tc.containerID]; ok {
if val != string(pod.UID) {
t.Errorf("Expected podMap entry for container %q to be %q, got %q", tc.containerID, string(pod.UID), val)
}
} else {
t.Errorf("Error occurred, Pod not added to podMap")
}
}
}
func TestRemoveContainer(t *testing.T) {
testCases := []struct {
name string
containerID string
podUID types.UID
}{
{
name: "Case1",
containerID: "nginx",
podUID: "0aafa4c4-38e8-11e9-bcb1-a4bf01040474",
},
{
name: "Case2",
containerID: "Busy_Box",
podUID: "b3ee37fc-39a5-11e9-bcb1-a4bf01040474",
},
}
var len1, len2 int
scope := scope{}
scope.podMap = make(map[string]string)
for _, tc := range testCases {
scope.podMap[tc.containerID] = string(tc.podUID)
len1 = len(scope.podMap)
err := scope.RemoveContainer(tc.containerID)
len2 = len(scope.podMap)
if err != nil {
t.Errorf("Expected error to be nil but got: %v", err)
}
if len1-len2 != 1 {
t.Errorf("Remove Pod resulted in error")
}
}
}

View File

@ -18,10 +18,9 @@ package topologymanager
import (
"fmt"
"sync"
cadvisorapi "github.com/google/cadvisor/info/v1"
v1 "k8s.io/api/core/v1"
"k8s.io/api/core/v1"
"k8s.io/klog/v2"
"k8s.io/kubernetes/pkg/kubelet/cm/topologymanager/bitmask"
"k8s.io/kubernetes/pkg/kubelet/lifecycle"
@ -55,16 +54,8 @@ type Manager interface {
}
type manager struct {
mutex sync.Mutex
//The list of components registered with the Manager
hintProviders []HintProvider
//Mapping of a Pods mapping of Containers and their TopologyHints
//Indexed by PodUID to ContainerName
podTopologyHints map[string]map[string]TopologyHint
//Mapping of PodUID to ContainerID for Adding/Removing Pods from PodTopologyHints mapping
podMap map[string]string
//Topology Manager Policy
policy Policy
//Topology Manager Scope
scope Scope
}
// HintProvider is an interface for components that want to collaborate to
@ -79,6 +70,9 @@ type HintProvider interface {
// a consensus "best" hint. The hint providers may subsequently query the
// topology manager to influence actual resource assignment.
GetTopologyHints(pod *v1.Pod, container *v1.Container) map[string][]TopologyHint
// GetPodTopologyHints returns a map of resource names to a list of possible
// concrete resource allocations per Pod in terms of NUMA locality hints.
GetPodTopologyHints(pod *v1.Pod) map[string][]TopologyHint
// Allocate triggers resource allocation to occur on the HintProvider after
// all hints have been gathered and the aggregated Hint is available via a
// call to Store.GetAffinity().
@ -121,9 +115,9 @@ func (th *TopologyHint) LessThan(other TopologyHint) bool {
var _ Manager = &manager{}
// NewManager creates a new TopologyManager based on provided policy
func NewManager(topology []cadvisorapi.Node, topologyPolicyName string) (Manager, error) {
klog.Infof("[topologymanager] Creating topology manager with %s policy", topologyPolicyName)
// NewManager creates a new TopologyManager based on provided policy and scope
func NewManager(topology []cadvisorapi.Node, topologyPolicyName string, topologyScopeName string) (Manager, error) {
klog.Infof("[topologymanager] Creating topology manager with %s policy per %s scope", topologyPolicyName, topologyScopeName)
var numaNodes []int
for _, node := range topology {
@ -153,123 +147,45 @@ func NewManager(topology []cadvisorapi.Node, topologyPolicyName string) (Manager
return nil, fmt.Errorf("unknown policy: \"%s\"", topologyPolicyName)
}
var hp []HintProvider
pth := make(map[string]map[string]TopologyHint)
pm := make(map[string]string)
var scope Scope
switch topologyScopeName {
case containerTopologyScope:
scope = NewContainerScope(policy)
case podTopologyScope:
scope = NewPodScope(policy)
default:
return nil, fmt.Errorf("unknown scope: \"%s\"", topologyScopeName)
}
manager := &manager{
hintProviders: hp,
podTopologyHints: pth,
podMap: pm,
policy: policy,
scope: scope,
}
return manager, nil
}
func (m *manager) GetAffinity(podUID string, containerName string) TopologyHint {
return m.podTopologyHints[podUID][containerName]
}
func (m *manager) accumulateProvidersHints(pod *v1.Pod, container *v1.Container) (providersHints []map[string][]TopologyHint) {
// Loop through all hint providers and save an accumulated list of the
// hints returned by each hint provider.
for _, provider := range m.hintProviders {
// Get the TopologyHints from a provider.
hints := provider.GetTopologyHints(pod, container)
providersHints = append(providersHints, hints)
klog.Infof("[topologymanager] TopologyHints for pod '%v', container '%v': %v", pod.Name, container.Name, hints)
}
return providersHints
}
func (m *manager) allocateAlignedResources(pod *v1.Pod, container *v1.Container) error {
for _, provider := range m.hintProviders {
err := provider.Allocate(pod, container)
if err != nil {
return err
}
}
return nil
}
// Collect Hints from hint providers and pass to policy to retrieve the best one.
func (m *manager) calculateAffinity(pod *v1.Pod, container *v1.Container) (TopologyHint, bool) {
providersHints := m.accumulateProvidersHints(pod, container)
bestHint, admit := m.policy.Merge(providersHints)
klog.Infof("[topologymanager] ContainerTopologyHint: %v", bestHint)
return bestHint, admit
return m.scope.GetAffinity(podUID, containerName)
}
func (m *manager) AddHintProvider(h HintProvider) {
m.hintProviders = append(m.hintProviders, h)
m.scope.AddHintProvider(h)
}
func (m *manager) AddContainer(pod *v1.Pod, containerID string) error {
m.mutex.Lock()
defer m.mutex.Unlock()
m.podMap[containerID] = string(pod.UID)
return nil
return m.scope.AddContainer(pod, containerID)
}
func (m *manager) RemoveContainer(containerID string) error {
m.mutex.Lock()
defer m.mutex.Unlock()
klog.Infof("[topologymanager] RemoveContainer - Container ID: %v", containerID)
podUIDString := m.podMap[containerID]
delete(m.podMap, containerID)
if _, exists := m.podTopologyHints[podUIDString]; exists {
delete(m.podTopologyHints[podUIDString], containerID)
if len(m.podTopologyHints[podUIDString]) == 0 {
delete(m.podTopologyHints, podUIDString)
}
}
return nil
return m.scope.RemoveContainer(containerID)
}
func (m *manager) Admit(attrs *lifecycle.PodAdmitAttributes) lifecycle.PodAdmitResult {
klog.Infof("[topologymanager] Topology Admit Handler")
pod := attrs.Pod
for _, container := range append(pod.Spec.InitContainers, pod.Spec.Containers...) {
if m.policy.Name() == PolicyNone {
err := m.allocateAlignedResources(pod, &container)
if err != nil {
return lifecycle.PodAdmitResult{
Message: fmt.Sprintf("Allocate failed due to %v, which is unexpected", err),
Reason: "UnexpectedAdmissionError",
Admit: false,
}
}
continue
}
result, admit := m.calculateAffinity(pod, &container)
if !admit {
return lifecycle.PodAdmitResult{
Message: "Resources cannot be allocated with Topology locality",
Reason: "TopologyAffinityError",
Admit: false,
}
}
klog.Infof("[topologymanager] Topology Affinity for (pod: %v container: %v): %v", pod.UID, container.Name, result)
if m.podTopologyHints[string(pod.UID)] == nil {
m.podTopologyHints[string(pod.UID)] = make(map[string]TopologyHint)
}
m.podTopologyHints[string(pod.UID)][container.Name] = result
err := m.allocateAlignedResources(pod, &container)
if err != nil {
return lifecycle.PodAdmitResult{
Message: fmt.Sprintf("Allocate failed due to %v, which is unexpected", err),
Reason: "UnexpectedAdmissionError",
Admit: false,
}
}
}
return lifecycle.PodAdmitResult{Admit: true}
return m.scope.Admit(pod)
}
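
With this change the manager delegates GetAffinity, AddHintProvider, AddContainer, RemoveContainer, and Admit to the configured scope, and hint providers expose a pod-level callback in addition to the per-container one. A minimal sketch of a provider satisfying the extended HintProvider interface (fixedHintProvider is a hypothetical name; the mockHintProvider in the tests below plays the same role):

package topologymanager

import v1 "k8s.io/api/core/v1"

// fixedHintProvider is a hypothetical provider returning the same canned hints
// regardless of which pod or container it is asked about.
type fixedHintProvider struct {
	hints map[string][]TopologyHint
}

// GetTopologyHints is consulted by the container scope, once per container.
func (p fixedHintProvider) GetTopologyHints(pod *v1.Pod, container *v1.Container) map[string][]TopologyHint {
	return p.hints
}

// GetPodTopologyHints is the new pod-level callback consulted by the pod scope, once per pod.
func (p fixedHintProvider) GetPodTopologyHints(pod *v1.Pod) map[string][]TopologyHint {
	return p.hints
}

// Allocate runs after the merged hint is available via Store.GetAffinity().
func (p fixedHintProvider) Allocate(pod *v1.Pod, container *v1.Container) error {
	return nil
}

var _ HintProvider = fixedHintProvider{}

Registering such a provider on a manager built with NewManager(nil, "best-effort", "pod") would route its pod-level hints through the pod scope, while "container" preserves the previous per-container flow.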

View File

@ -18,12 +18,10 @@ package topologymanager
import (
"fmt"
"reflect"
"strings"
"testing"
"k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/kubernetes/pkg/kubelet/cm/topologymanager/bitmask"
"k8s.io/kubernetes/pkg/kubelet/lifecycle"
)
@ -40,6 +38,11 @@ func TestNewManager(t *testing.T) {
expectedPolicy string
expectedError error
}{
{
description: "Policy is set to none",
policyName: "none",
expectedPolicy: "none",
},
{
description: "Policy is set to best-effort",
policyName: "best-effort",
@ -50,6 +53,11 @@ func TestNewManager(t *testing.T) {
policyName: "restricted",
expectedPolicy: "restricted",
},
{
description: "Policy is set to single-numa-node",
policyName: "single-numa-node",
expectedPolicy: "single-numa-node",
},
{
description: "Policy is set to unknown",
policyName: "unknown",
@ -58,7 +66,7 @@ func TestNewManager(t *testing.T) {
}
for _, tc := range tcases {
mngr, err := NewManager(nil, tc.policyName)
mngr, err := NewManager(nil, tc.policyName, "container")
if tc.expectedError != nil {
if !strings.Contains(err.Error(), tc.expectedError.Error()) {
@ -66,8 +74,49 @@ func TestNewManager(t *testing.T) {
}
} else {
rawMgr := mngr.(*manager)
if rawMgr.policy.Name() != tc.expectedPolicy {
t.Errorf("Unexpected policy name. Have: %q wants %q", rawMgr.policy.Name(), tc.expectedPolicy)
rawScope := rawMgr.scope.(*containerScope)
if rawScope.policy.Name() != tc.expectedPolicy {
t.Errorf("Unexpected policy name. Have: %q wants %q", rawScope.policy.Name(), tc.expectedPolicy)
}
}
}
}
func TestManagerScope(t *testing.T) {
tcases := []struct {
description string
scopeName string
expectedScope string
expectedError error
}{
{
description: "Topology Manager Scope is set to container",
scopeName: "container",
expectedScope: "container",
},
{
description: "Topology Manager Scope is set to pod",
scopeName: "pod",
expectedScope: "pod",
},
{
description: "Topology Manager Scope is set to unknown",
scopeName: "unknown",
expectedError: fmt.Errorf("unknown scope: \"unknown\""),
},
}
for _, tc := range tcases {
mngr, err := NewManager(nil, "best-effort", tc.scopeName)
if tc.expectedError != nil {
if !strings.Contains(err.Error(), tc.expectedError.Error()) {
t.Errorf("Unexpected error message. Have: %s wants %s", err.Error(), tc.expectedError.Error())
}
} else {
rawMgr := mngr.(*manager)
if rawMgr.scope.Name() != tc.expectedScope {
t.Errorf("Unexpected scope name. Have: %q wants %q", rawMgr.scope, tc.expectedScope)
}
}
}
@ -84,160 +133,15 @@ func (m *mockHintProvider) GetTopologyHints(pod *v1.Pod, container *v1.Container
return m.th
}
func (m *mockHintProvider) GetPodTopologyHints(pod *v1.Pod) map[string][]TopologyHint {
return m.th
}
func (m *mockHintProvider) Allocate(pod *v1.Pod, container *v1.Container) error {
//return allocateError
return nil
}
func TestGetAffinity(t *testing.T) {
tcases := []struct {
name string
containerName string
podUID string
expected TopologyHint
}{
{
name: "case1",
containerName: "nginx",
podUID: "0aafa4c4-38e8-11e9-bcb1-a4bf01040474",
expected: TopologyHint{},
},
}
for _, tc := range tcases {
mngr := manager{}
actual := mngr.GetAffinity(tc.podUID, tc.containerName)
if !reflect.DeepEqual(actual, tc.expected) {
t.Errorf("Expected Affinity in result to be %v, got %v", tc.expected, actual)
}
}
}
func TestAccumulateProvidersHints(t *testing.T) {
tcases := []struct {
name string
hp []HintProvider
expected []map[string][]TopologyHint
}{
{
name: "TopologyHint not set",
hp: []HintProvider{},
expected: nil,
},
{
name: "HintProvider returns empty non-nil map[string][]TopologyHint",
hp: []HintProvider{
&mockHintProvider{
map[string][]TopologyHint{},
},
},
expected: []map[string][]TopologyHint{
{},
},
},
{
name: "HintProvider returns - nil map[string][]TopologyHint from provider",
hp: []HintProvider{
&mockHintProvider{
map[string][]TopologyHint{
"resource": nil,
},
},
},
expected: []map[string][]TopologyHint{
{
"resource": nil,
},
},
},
{
name: "2 HintProviders with 1 resource returns hints",
hp: []HintProvider{
&mockHintProvider{
map[string][]TopologyHint{
"resource1": {TopologyHint{}},
},
},
&mockHintProvider{
map[string][]TopologyHint{
"resource2": {TopologyHint{}},
},
},
},
expected: []map[string][]TopologyHint{
{
"resource1": {TopologyHint{}},
},
{
"resource2": {TopologyHint{}},
},
},
},
{
name: "2 HintProviders 1 with 1 resource 1 with nil hints",
hp: []HintProvider{
&mockHintProvider{
map[string][]TopologyHint{
"resource1": {TopologyHint{}},
},
},
&mockHintProvider{nil},
},
expected: []map[string][]TopologyHint{
{
"resource1": {TopologyHint{}},
},
nil,
},
},
{
name: "2 HintProviders 1 with 1 resource 1 empty hints",
hp: []HintProvider{
&mockHintProvider{
map[string][]TopologyHint{
"resource1": {TopologyHint{}},
},
},
&mockHintProvider{
map[string][]TopologyHint{},
},
},
expected: []map[string][]TopologyHint{
{
"resource1": {TopologyHint{}},
},
{},
},
},
{
name: "HintProvider with 2 resources returns hints",
hp: []HintProvider{
&mockHintProvider{
map[string][]TopologyHint{
"resource1": {TopologyHint{}},
"resource2": {TopologyHint{}},
},
},
},
expected: []map[string][]TopologyHint{
{
"resource1": {TopologyHint{}},
"resource2": {TopologyHint{}},
},
},
},
}
for _, tc := range tcases {
mngr := manager{
hintProviders: tc.hp,
}
actual := mngr.accumulateProvidersHints(&v1.Pod{}, &v1.Container{})
if !reflect.DeepEqual(actual, tc.expected) {
t.Errorf("Test Case %s: Expected NUMANodeAffinity in result to be %v, got %v", tc.name, tc.expected, actual)
}
}
}
type mockPolicy struct {
nonePolicy
ph []map[string][]TopologyHint
@ -248,189 +152,7 @@ func (p *mockPolicy) Merge(providersHints []map[string][]TopologyHint) (Topology
return TopologyHint{}, true
}
func TestCalculateAffinity(t *testing.T) {
tcases := []struct {
name string
hp []HintProvider
expected []map[string][]TopologyHint
}{
{
name: "No hint providers",
hp: []HintProvider{},
expected: ([]map[string][]TopologyHint)(nil),
},
{
name: "HintProvider returns empty non-nil map[string][]TopologyHint",
hp: []HintProvider{
&mockHintProvider{
map[string][]TopologyHint{},
},
},
expected: []map[string][]TopologyHint{
{},
},
},
{
name: "HintProvider returns -nil map[string][]TopologyHint from provider",
hp: []HintProvider{
&mockHintProvider{
map[string][]TopologyHint{
"resource": nil,
},
},
},
expected: []map[string][]TopologyHint{
{
"resource": nil,
},
},
},
{
name: "Assorted HintProviders",
hp: []HintProvider{
&mockHintProvider{
map[string][]TopologyHint{
"resource-1/A": {
{NUMANodeAffinity: NewTestBitMask(0), Preferred: true},
{NUMANodeAffinity: NewTestBitMask(0, 1), Preferred: false},
},
"resource-1/B": {
{NUMANodeAffinity: NewTestBitMask(1), Preferred: true},
{NUMANodeAffinity: NewTestBitMask(1, 2), Preferred: false},
},
},
},
&mockHintProvider{
map[string][]TopologyHint{
"resource-2/A": {
{NUMANodeAffinity: NewTestBitMask(2), Preferred: true},
{NUMANodeAffinity: NewTestBitMask(3, 4), Preferred: false},
},
"resource-2/B": {
{NUMANodeAffinity: NewTestBitMask(2), Preferred: true},
{NUMANodeAffinity: NewTestBitMask(3, 4), Preferred: false},
},
},
},
&mockHintProvider{
map[string][]TopologyHint{
"resource-3": nil,
},
},
},
expected: []map[string][]TopologyHint{
{
"resource-1/A": {
{NUMANodeAffinity: NewTestBitMask(0), Preferred: true},
{NUMANodeAffinity: NewTestBitMask(0, 1), Preferred: false},
},
"resource-1/B": {
{NUMANodeAffinity: NewTestBitMask(1), Preferred: true},
{NUMANodeAffinity: NewTestBitMask(1, 2), Preferred: false},
},
},
{
"resource-2/A": {
{NUMANodeAffinity: NewTestBitMask(2), Preferred: true},
{NUMANodeAffinity: NewTestBitMask(3, 4), Preferred: false},
},
"resource-2/B": {
{NUMANodeAffinity: NewTestBitMask(2), Preferred: true},
{NUMANodeAffinity: NewTestBitMask(3, 4), Preferred: false},
},
},
{
"resource-3": nil,
},
},
},
}
for _, tc := range tcases {
mngr := manager{}
mngr.policy = &mockPolicy{}
mngr.hintProviders = tc.hp
mngr.calculateAffinity(&v1.Pod{}, &v1.Container{})
actual := mngr.policy.(*mockPolicy).ph
if !reflect.DeepEqual(tc.expected, actual) {
t.Errorf("Test Case: %s", tc.name)
t.Errorf("Expected result to be %v, got %v", tc.expected, actual)
}
}
}
func TestAddContainer(t *testing.T) {
testCases := []struct {
name string
containerID string
podUID types.UID
}{
{
name: "Case1",
containerID: "nginx",
podUID: "0aafa4c4-38e8-11e9-bcb1-a4bf01040474",
},
{
name: "Case2",
containerID: "Busy_Box",
podUID: "b3ee37fc-39a5-11e9-bcb1-a4bf01040474",
},
}
mngr := manager{}
mngr.podMap = make(map[string]string)
for _, tc := range testCases {
pod := v1.Pod{}
pod.UID = tc.podUID
err := mngr.AddContainer(&pod, tc.containerID)
if err != nil {
t.Errorf("Expected error to be nil but got: %v", err)
}
if val, ok := mngr.podMap[tc.containerID]; ok {
if reflect.DeepEqual(val, pod.UID) {
t.Errorf("Error occurred")
}
} else {
t.Errorf("Error occurred, Pod not added to podMap")
}
}
}
func TestRemoveContainer(t *testing.T) {
testCases := []struct {
name string
containerID string
podUID types.UID
}{
{
name: "Case1",
containerID: "nginx",
podUID: "0aafa4c4-38e8-11e9-bcb1-a4bf01040474",
},
{
name: "Case2",
containerID: "Busy_Box",
podUID: "b3ee37fc-39a5-11e9-bcb1-a4bf01040474",
},
}
var len1, len2 int
mngr := manager{}
mngr.podMap = make(map[string]string)
for _, tc := range testCases {
mngr.podMap[tc.containerID] = string(tc.podUID)
len1 = len(mngr.podMap)
err := mngr.RemoveContainer(tc.containerID)
len2 = len(mngr.podMap)
if err != nil {
t.Errorf("Expected error to be nil but got: %v", err)
}
if len1-len2 != 1 {
t.Errorf("Remove Pod resulted in error")
}
}
}
func TestAddHintProvider(t *testing.T) {
var len1 int
tcases := []struct {
name string
hp []HintProvider
@ -439,18 +161,20 @@ func TestAddHintProvider(t *testing.T) {
name: "Add HintProvider",
hp: []HintProvider{
&mockHintProvider{},
&mockHintProvider{},
&mockHintProvider{},
},
},
}
mngr := manager{}
mngr.scope = NewContainerScope(NewNonePolicy())
for _, tc := range tcases {
mngr.hintProviders = []HintProvider{}
len1 = len(mngr.hintProviders)
mngr.AddHintProvider(tc.hp[0])
}
len2 := len(mngr.hintProviders)
if len2-len1 != 1 {
t.Errorf("error")
for _, hp := range tc.hp {
mngr.AddHintProvider(hp)
}
if len(tc.hp) != len(mngr.scope.(*containerScope).hintProviders) {
t.Errorf("error")
}
}
}
@ -723,11 +447,13 @@ func TestAdmit(t *testing.T) {
},
}
for _, tc := range tcases {
man := manager{
policy: tc.policy,
podTopologyHints: make(map[string]map[string]TopologyHint),
hintProviders: tc.hp,
}
ctnScopeManager := manager{}
ctnScopeManager.scope = NewContainerScope(tc.policy)
ctnScopeManager.scope.(*containerScope).hintProviders = tc.hp
podScopeManager := manager{}
podScopeManager.scope = NewPodScope(tc.policy)
podScopeManager.scope.(*podScope).hintProviders = tc.hp
pod := &v1.Pod{
Spec: v1.PodSpec{
@ -746,9 +472,16 @@ func TestAdmit(t *testing.T) {
Pod: pod,
}
actual := man.Admit(&podAttr)
if actual.Admit != tc.expected {
t.Errorf("Error occurred, expected Admit in result to be %v got %v", tc.expected, actual.Admit)
// Container scope Admit
ctnActual := ctnScopeManager.Admit(&podAttr)
if ctnActual.Admit != tc.expected {
t.Errorf("Error occurred, expected Admit in result to be %v got %v", tc.expected, ctnActual.Admit)
}
// Pod scope Admit
podActual := podScopeManager.Admit(&podAttr)
if podActual.Admit != tc.expected {
t.Errorf("Error occurred, expected Admit in result to be %v got %v", tc.expected, podActual.Admit)
}
}
}

View File

@ -61,12 +61,18 @@ const (
// BestEffortTopologyManagerPolicy is a mode in which kubelet will favour
// pods with NUMA alignment of CPU and device resources.
BestEffortTopologyManagerPolicy = "best-effort"
// NoneTopologyManager Policy is a mode in which kubelet has no knowledge
// NoneTopologyManagerPolicy is a mode in which kubelet has no knowledge
// of NUMA alignment of a pod's CPU and device resources.
NoneTopologyManagerPolicy = "none"
// SingleNumaNodeTopologyManager Policy iis a mode in which kubelet only allows
// SingleNumaNodeTopologyManagerPolicy is a mode in which kubelet only allows
// pods with a single NUMA alignment of CPU and device resources.
SingleNumaNodeTopologyManager = "single-numa-node"
SingleNumaNodeTopologyManagerPolicy = "single-numa-node"
// ContainerTopologyManagerScope represents that
// topology policy is applied on a per-container basis.
ContainerTopologyManagerScope = "container"
// PodTopologyManagerScope represents that
// topology policy is applied on a per-pod basis.
PodTopologyManagerScope = "pod"
)
// +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object
@ -434,6 +440,12 @@ type KubeletConfiguration struct {
// Default: "none"
// +optional
TopologyManagerPolicy string `json:"topologyManagerPolicy,omitempty"`
// TopologyManagerScope represents the scope of topology hint generation
// that topology manager requests and hint providers generate.
// "pod" scope requires the TopologyManager feature gate to be enabled.
// Default: "container"
// +optional
TopologyManagerScope string `json:"topologyManagerScope,omitempty"`
// qosReserved is a set of resource name to percentage pairs that specify
// the minimum percentage of a resource reserved for exclusive use by the
// guaranteed QoS tier.
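
For illustration, a minimal sketch of selecting the pod scope on the versioned KubeletConfiguration type; the import path and package name are assumed here, and in practice the field is usually set through the kubelet configuration file, defaulting to "container":

package main

import (
	"fmt"

	kubeletconfigv1beta1 "k8s.io/kubelet/config/v1beta1"
)

func main() {
	// topologyManagerScope sits alongside topologyManagerPolicy; "pod" opts
	// into pod-level alignment, "container" keeps the previous behaviour.
	cfg := kubeletconfigv1beta1.KubeletConfiguration{
		TopologyManagerPolicy: "single-numa-node",
		TopologyManagerScope:  "pod",
	}
	fmt.Printf("policy=%s scope=%s\n", cfg.TopologyManagerPolicy, cfg.TopologyManagerScope)
}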

View File

@ -181,6 +181,7 @@ type testEnvInfo struct {
numaNodes int
sriovResourceName string
policy string
scope string
}
func containerWantsDevices(cnt *v1.Container, envInfo *testEnvInfo) bool {

View File

@ -20,13 +20,14 @@ import (
"context"
"fmt"
"io/ioutil"
testutils "k8s.io/kubernetes/test/utils"
"os/exec"
"regexp"
"strconv"
"strings"
"time"
testutils "k8s.io/kubernetes/test/utils"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/runtime"
@ -48,7 +49,10 @@ import (
)
const (
numalignCmd = `export CPULIST_ALLOWED=$( awk -F":\t*" '/Cpus_allowed_list/ { print $2 }' /proc/self/status); env; sleep 1d`
numaAlignmentCommand = `export CPULIST_ALLOWED=$( awk -F":\t*" '/Cpus_allowed_list/ { print $2 }' /proc/self/status); env;`
numaAlignmentSleepCommand = numaAlignmentCommand + `sleep 1d;`
podScopeTopology = "pod"
containerScopeTopology = "container"
minNumaNodes = 2
minCoreCount = 4
@ -95,9 +99,8 @@ func detectSRIOVDevices() int {
return devCount
}
func makeTopologyManagerTestPod(podName, podCmd string, tmCtnAttributes []tmCtnAttribute) *v1.Pod {
var containers []v1.Container
for _, ctnAttr := range tmCtnAttributes {
func makeContainers(ctnCmd string, ctnAttributes []tmCtnAttribute) (ctns []v1.Container) {
for _, ctnAttr := range ctnAttributes {
ctn := v1.Container{
Name: ctnAttr.ctnName,
Image: busyboxImage,
@ -111,22 +114,32 @@ func makeTopologyManagerTestPod(podName, podCmd string, tmCtnAttributes []tmCtnA
v1.ResourceName(v1.ResourceMemory): resource.MustParse("100Mi"),
},
},
Command: []string{"sh", "-c", podCmd},
Command: []string{"sh", "-c", ctnCmd},
}
if ctnAttr.deviceName != "" {
ctn.Resources.Requests[v1.ResourceName(ctnAttr.deviceName)] = resource.MustParse(ctnAttr.deviceRequest)
ctn.Resources.Limits[v1.ResourceName(ctnAttr.deviceName)] = resource.MustParse(ctnAttr.deviceLimit)
}
containers = append(containers, ctn)
ctns = append(ctns, ctn)
}
return
}
func makeTopologyManagerTestPod(podName string, tmCtnAttributes, tmInitCtnAttributes []tmCtnAttribute) *v1.Pod {
var containers, initContainers []v1.Container
if len(tmInitCtnAttributes) > 0 {
initContainers = makeContainers(numaAlignmentCommand, tmInitCtnAttributes)
}
containers = makeContainers(numaAlignmentSleepCommand, tmCtnAttributes)
return &v1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: podName,
},
Spec: v1.PodSpec{
RestartPolicy: v1.RestartPolicyNever,
Containers: containers,
RestartPolicy: v1.RestartPolicyNever,
InitContainers: initContainers,
Containers: containers,
},
}
}
@ -190,7 +203,7 @@ func findNUMANodeWithoutSRIOVDevices(configMap *v1.ConfigMap, numaNodes int) (in
return findNUMANodeWithoutSRIOVDevicesFromSysfs(numaNodes)
}
func configureTopologyManagerInKubelet(f *framework.Framework, oldCfg *kubeletconfig.KubeletConfiguration, policy string, configMap *v1.ConfigMap, numaNodes int) string {
func configureTopologyManagerInKubelet(f *framework.Framework, oldCfg *kubeletconfig.KubeletConfiguration, policy, scope string, configMap *v1.ConfigMap, numaNodes int) string {
// Configure Topology Manager in Kubelet with policy.
newCfg := oldCfg.DeepCopy()
if newCfg.FeatureGates == nil {
@ -205,6 +218,8 @@ func configureTopologyManagerInKubelet(f *framework.Framework, oldCfg *kubeletco
// Set the Topology Manager policy
newCfg.TopologyManagerPolicy = policy
newCfg.TopologyManagerScope = scope
// Set the CPU Manager policy to static.
newCfg.CPUManagerPolicy = string(cpumanager.PolicyStatic)
@ -313,6 +328,36 @@ func validatePodAlignment(f *framework.Framework, pod *v1.Pod, envInfo *testEnvI
}
}
// validatePodAlignmentWithPodScope validates that all of the pod's CPUs are affined to the same NUMA node.
func validatePodAlignmentWithPodScope(f *framework.Framework, pod *v1.Pod, envInfo *testEnvInfo) error {
// Mapping between CPU IDs and NUMA node IDs.
podsNUMA := make(map[int]int)
ginkgo.By(fmt.Sprintf("validate pod scope alignment for %s pod", pod.Name))
for _, cnt := range pod.Spec.Containers {
logs, err := e2epod.GetPodLogs(f.ClientSet, f.Namespace.Name, pod.Name, cnt.Name)
framework.ExpectNoError(err, "NUMA alignment failed for container [%s] of pod [%s]", cnt.Name, pod.Name)
envMap, err := makeEnvMap(logs)
framework.ExpectNoError(err, "NUMA alignment failed for container [%s] of pod [%s]", cnt.Name, pod.Name)
cpuToNUMA, err := getCPUToNUMANodeMapFromEnv(f, pod, &cnt, envMap, envInfo.numaNodes)
framework.ExpectNoError(err, "NUMA alignment failed for container [%s] of pod [%s]", cnt.Name, pod.Name)
for cpuID, numaID := range cpuToNUMA {
podsNUMA[cpuID] = numaID
}
}
numaRes := numaPodResources{
CPUToNUMANode: podsNUMA,
}
aligned := numaRes.CheckAlignment()
if !aligned {
return fmt.Errorf("resources were assigned from different NUMA nodes")
}
framework.Logf("NUMA locality confirmed: all pod's CPUs aligned to the same NUMA node")
return nil
}
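
The helper above flattens the pod into a CPU-ID to NUMA-node map and relies on numaPodResources.CheckAlignment for the verdict. A minimal sketch of the behaviour it depends on, as a hypothetical test in the same e2e node package (package name assumed):

package e2enode

import "testing"

// Hypothetical illustration: CheckAlignment should accept CPUs that all map to
// one NUMA node and reject an assignment that spans two nodes.
func TestPodScopeAlignmentSketch(t *testing.T) {
	aligned := numaPodResources{CPUToNUMANode: map[int]int{2: 0, 3: 0, 10: 0}}
	if !aligned.CheckAlignment() {
		t.Errorf("expected CPUs pinned to a single NUMA node to be reported as aligned")
	}
	split := numaPodResources{CPUToNUMANode: map[int]int{2: 0, 3: 1}}
	if split.CheckAlignment() {
		t.Errorf("expected CPUs spread across NUMA nodes to be reported as misaligned")
	}
}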
func runTopologyManagerPolicySuiteTests(f *framework.Framework) {
var cpuCap, cpuAlloc int64
@ -359,13 +404,13 @@ func waitForAllContainerRemoval(podName, podNS string) {
}, 2*time.Minute, 1*time.Second).Should(gomega.BeTrue())
}
func runTopologyManagerPositiveTest(f *framework.Framework, numPods int, ctnAttrs []tmCtnAttribute, envInfo *testEnvInfo) {
func runTopologyManagerPositiveTest(f *framework.Framework, numPods int, ctnAttrs, initCtnAttrs []tmCtnAttribute, envInfo *testEnvInfo) {
var pods []*v1.Pod
for podID := 0; podID < numPods; podID++ {
podName := fmt.Sprintf("gu-pod-%d", podID)
framework.Logf("creating pod %s attrs %v", podName, ctnAttrs)
pod := makeTopologyManagerTestPod(podName, numalignCmd, ctnAttrs)
pod := makeTopologyManagerTestPod(podName, ctnAttrs, initCtnAttrs)
pod = f.PodClient().CreateSync(pod)
framework.Logf("created pod %s", podName)
pods = append(pods, pod)
@ -377,6 +422,12 @@ func runTopologyManagerPositiveTest(f *framework.Framework, numPods int, ctnAttr
for podID := 0; podID < numPods; podID++ {
validatePodAlignment(f, pods[podID], envInfo)
}
if envInfo.scope == podScopeTopology {
for podID := 0; podID < numPods; podID++ {
err := validatePodAlignmentWithPodScope(f, pods[podID], envInfo)
framework.ExpectNoError(err)
}
}
}
for podID := 0; podID < numPods; podID++ {
@ -388,10 +439,10 @@ func runTopologyManagerPositiveTest(f *framework.Framework, numPods int, ctnAttr
}
}
func runTopologyManagerNegativeTest(f *framework.Framework, numPods int, ctnAttrs []tmCtnAttribute, envInfo *testEnvInfo) {
func runTopologyManagerNegativeTest(f *framework.Framework, ctnAttrs, initCtnAttrs []tmCtnAttribute, envInfo *testEnvInfo) {
podName := "gu-pod"
framework.Logf("creating pod %s attrs %v", podName, ctnAttrs)
pod := makeTopologyManagerTestPod(podName, numalignCmd, ctnAttrs)
pod := makeTopologyManagerTestPod(podName, ctnAttrs, initCtnAttrs)
pod = f.PodClient().Create(pod)
err := e2epod.WaitForPodCondition(f.ClientSet, f.Namespace.Name, pod.Name, "Failed", 30*time.Second, func(pod *v1.Pod) (bool, error) {
@ -520,7 +571,108 @@ func teardownSRIOVConfigOrFail(f *framework.Framework, sd *sriovData) {
framework.ExpectNoError(err)
}
func runTopologyManagerNodeAlignmentSuiteTests(f *framework.Framework, configMap *v1.ConfigMap, reservedSystemCPUs string, numaNodes, coreCount int, policy string) {
func runTMScopeResourceAlignmentTestSuite(f *framework.Framework, configMap *v1.ConfigMap, reservedSystemCPUs, policy string, numaNodes, coreCount int) {
threadsPerCore := 1
if isHTEnabled() {
threadsPerCore = 2
}
sd := setupSRIOVConfigOrFail(f, configMap)
var ctnAttrs, initCtnAttrs []tmCtnAttribute
envInfo := &testEnvInfo{
numaNodes: numaNodes,
sriovResourceName: sd.resourceName,
policy: policy,
scope: podScopeTopology,
}
ginkgo.By(fmt.Sprintf("Admit two guaranteed pods. Both consist of 2 containers, each container with 1 CPU core. Use 1 %s device.", sd.resourceName))
ctnAttrs = []tmCtnAttribute{
{
ctnName: "ps-container-0",
cpuRequest: "1000m",
cpuLimit: "1000m",
deviceName: sd.resourceName,
deviceRequest: "1",
deviceLimit: "1",
},
{
ctnName: "ps-container-1",
cpuRequest: "1000m",
cpuLimit: "1000m",
deviceName: sd.resourceName,
deviceRequest: "1",
deviceLimit: "1",
},
}
runTopologyManagerPositiveTest(f, 2, ctnAttrs, initCtnAttrs, envInfo)
numCores := threadsPerCore * coreCount
coresReq := fmt.Sprintf("%dm", numCores*1000)
ginkgo.By(fmt.Sprintf("Admit a guaranteed pod requesting %d CPU cores, i.e., more than can be provided at every single NUMA node. Therefore, the request should be rejected.", numCores+1))
ctnAttrs = []tmCtnAttribute{
{
ctnName: "gu-container-1",
cpuRequest: coresReq,
cpuLimit: coresReq,
deviceRequest: "1",
deviceLimit: "1",
},
{
ctnName: "gu-container-2",
cpuRequest: "1000m",
cpuLimit: "1000m",
deviceRequest: "1",
deviceLimit: "1",
},
}
runTopologyManagerNegativeTest(f, ctnAttrs, initCtnAttrs, envInfo)
// With pod scope, the Topology Manager derives a pod's effective CPU request from two values:
// the maximum CPU request among its init containers and the sum of its app containers' CPU
// requests, and it uses the higher of the two. Each pod below is therefore accounted for the
// CPUs requested by its most demanding init container. Since that demand is slightly more than
// half of the CPUs available on a single NUMA node, the two pods must land on distinct NUMA nodes.
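// For example, with numCores = 8 CPUs per NUMA node (an assumed figure for illustration), the init
// containers below request max(5, 1) = 5 CPUs and the app containers request 1 + 1 = 2 CPUs, so each
// pod is accounted for max(5, 2) = 5 CPUs; two such pods need 10 CPUs and cannot share one NUMA node.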
coresReq = fmt.Sprintf("%dm", (numCores/2+1)*1000)
ginkgo.By(fmt.Sprintf("Admit two guaranteed pods, each pod requests %d cores - the pods should be placed on different NUMA nodes", numCores/2+1))
initCtnAttrs = []tmCtnAttribute{
{
ctnName: "init-container-1",
cpuRequest: coresReq,
cpuLimit: coresReq,
deviceRequest: "1",
deviceLimit: "1",
},
{
ctnName: "init-container-2",
cpuRequest: "1000m",
cpuLimit: "1000m",
deviceRequest: "1",
deviceLimit: "1",
},
}
ctnAttrs = []tmCtnAttribute{
{
ctnName: "gu-container-0",
cpuRequest: "1000m",
cpuLimit: "1000m",
deviceRequest: "1",
deviceLimit: "1",
},
{
ctnName: "gu-container-1",
cpuRequest: "1000m",
cpuLimit: "1000m",
deviceRequest: "1",
deviceLimit: "1",
},
}
runTopologyManagerPositiveTest(f, 2, ctnAttrs, initCtnAttrs, envInfo)
teardownSRIOVConfigOrFail(f, sd)
}
func runTopologyManagerNodeAlignmentSuiteTests(f *framework.Framework, configMap *v1.ConfigMap, reservedSystemCPUs, policy string, numaNodes, coreCount int) {
threadsPerCore := 1
if isHTEnabled() {
threadsPerCore = 2
@ -536,7 +688,7 @@ func runTopologyManagerNodeAlignmentSuiteTests(f *framework.Framework, configMap
}
// could have been a loop, we unroll it to explain the testcases
var ctnAttrs []tmCtnAttribute
var ctnAttrs, initCtnAttrs []tmCtnAttribute
// simplest case
ginkgo.By(fmt.Sprintf("Successfully admit one guaranteed pod with 1 core, 1 %s device", sd.resourceName))
@ -550,7 +702,7 @@ func runTopologyManagerNodeAlignmentSuiteTests(f *framework.Framework, configMap
deviceLimit: "1",
},
}
runTopologyManagerPositiveTest(f, 1, ctnAttrs, envInfo)
runTopologyManagerPositiveTest(f, 1, ctnAttrs, initCtnAttrs, envInfo)
ginkgo.By(fmt.Sprintf("Successfully admit one guaranteed pod with 2 cores, 1 %s device", sd.resourceName))
ctnAttrs = []tmCtnAttribute{
@ -563,7 +715,7 @@ func runTopologyManagerNodeAlignmentSuiteTests(f *framework.Framework, configMap
deviceLimit: "1",
},
}
runTopologyManagerPositiveTest(f, 1, ctnAttrs, envInfo)
runTopologyManagerPositiveTest(f, 1, ctnAttrs, initCtnAttrs, envInfo)
if reservedSystemCPUs != "" {
// to avoid false negatives, we have put reserved CPUs in such a way there is at least a NUMA node
@ -581,7 +733,7 @@ func runTopologyManagerNodeAlignmentSuiteTests(f *framework.Framework, configMap
deviceLimit: "1",
},
}
runTopologyManagerPositiveTest(f, 1, ctnAttrs, envInfo)
runTopologyManagerPositiveTest(f, 1, ctnAttrs, initCtnAttrs, envInfo)
}
if sd.resourceAmount > 1 {
@ -598,7 +750,7 @@ func runTopologyManagerNodeAlignmentSuiteTests(f *framework.Framework, configMap
deviceLimit: "1",
},
}
runTopologyManagerPositiveTest(f, 2, ctnAttrs, envInfo)
runTopologyManagerPositiveTest(f, 2, ctnAttrs, initCtnAttrs, envInfo)
ginkgo.By(fmt.Sprintf("Successfully admit two guaranteed pods, each with 2 cores, 1 %s device", sd.resourceName))
ctnAttrs = []tmCtnAttribute{
@ -611,14 +763,14 @@ func runTopologyManagerNodeAlignmentSuiteTests(f *framework.Framework, configMap
deviceLimit: "1",
},
}
runTopologyManagerPositiveTest(f, 2, ctnAttrs, envInfo)
runTopologyManagerPositiveTest(f, 2, ctnAttrs, initCtnAttrs, envInfo)
// testing more complex conditions require knowledge about the system cpu+bus topology
}
// multi-container tests
if sd.resourceAmount >= 4 {
ginkgo.By(fmt.Sprintf("Successfully admit one guaranteed pods, each with two containers, each with 2 cores, 1 %s device", sd.resourceName))
ginkgo.By(fmt.Sprintf("Successfully admit a guaranteed pod requesting for two containers, each with 2 cores, 1 %s device", sd.resourceName))
ctnAttrs = []tmCtnAttribute{
{
ctnName: "gu-container-0",
@ -637,7 +789,7 @@ func runTopologyManagerNodeAlignmentSuiteTests(f *framework.Framework, configMap
deviceLimit: "1",
},
}
runTopologyManagerPositiveTest(f, 1, ctnAttrs, envInfo)
runTopologyManagerPositiveTest(f, 1, ctnAttrs, initCtnAttrs, envInfo)
ginkgo.By(fmt.Sprintf("Successfully admit two guaranteed pods, each with two containers, each with 1 core, 1 %s device", sd.resourceName))
ctnAttrs = []tmCtnAttribute{
@ -658,7 +810,7 @@ func runTopologyManagerNodeAlignmentSuiteTests(f *framework.Framework, configMap
deviceLimit: "1",
},
}
runTopologyManagerPositiveTest(f, 2, ctnAttrs, envInfo)
runTopologyManagerPositiveTest(f, 2, ctnAttrs, initCtnAttrs, envInfo)
ginkgo.By(fmt.Sprintf("Successfully admit two guaranteed pods, each with two containers, both with with 2 cores, one with 1 %s device", sd.resourceName))
ctnAttrs = []tmCtnAttribute{
@ -676,7 +828,7 @@ func runTopologyManagerNodeAlignmentSuiteTests(f *framework.Framework, configMap
cpuLimit: "2000m",
},
}
runTopologyManagerPositiveTest(f, 2, ctnAttrs, envInfo)
runTopologyManagerPositiveTest(f, 2, ctnAttrs, initCtnAttrs, envInfo)
}
// this is the only policy that can guarantee reliable rejects
@ -695,7 +847,7 @@ func runTopologyManagerNodeAlignmentSuiteTests(f *framework.Framework, configMap
deviceLimit: "1",
},
}
runTopologyManagerNegativeTest(f, 1, ctnAttrs, envInfo)
runTopologyManagerNegativeTest(f, ctnAttrs, initCtnAttrs, envInfo)
}
}
@ -710,12 +862,13 @@ func runTopologyManagerTests(f *framework.Framework) {
var policies = []string{topologymanager.PolicySingleNumaNode, topologymanager.PolicyRestricted,
topologymanager.PolicyBestEffort, topologymanager.PolicyNone}
scope := containerScopeTopology
for _, policy := range policies {
// Configure Topology Manager
ginkgo.By(fmt.Sprintf("by configuring Topology Manager policy to %s", policy))
framework.Logf("Configuring topology Manager policy to %s", policy)
configureTopologyManagerInKubelet(f, oldCfg, policy, nil, 0)
configureTopologyManagerInKubelet(f, oldCfg, policy, scope, nil, 0)
// Run the tests
runTopologyManagerPolicySuiteTests(f)
}
@ -751,14 +904,15 @@ func runTopologyManagerTests(f *framework.Framework) {
var policies = []string{topologymanager.PolicySingleNumaNode, topologymanager.PolicyRestricted,
topologymanager.PolicyBestEffort, topologymanager.PolicyNone}
scope := containerScopeTopology
for _, policy := range policies {
// Configure Topology Manager
ginkgo.By(fmt.Sprintf("by configuring Topology Manager policy to %s", policy))
framework.Logf("Configuring topology Manager policy to %s", policy)
reservedSystemCPUs := configureTopologyManagerInKubelet(f, oldCfg, policy, configMap, numaNodes)
reservedSystemCPUs := configureTopologyManagerInKubelet(f, oldCfg, policy, scope, configMap, numaNodes)
runTopologyManagerNodeAlignmentSuiteTests(f, configMap, reservedSystemCPUs, numaNodes, coreCount, policy)
runTopologyManagerNodeAlignmentSuiteTests(f, configMap, reservedSystemCPUs, policy, numaNodes, coreCount)
}
// restore kubelet config
@ -767,6 +921,37 @@ func runTopologyManagerTests(f *framework.Framework) {
// Delete state file to allow repeated runs
deleteStateFile()
})
ginkgo.It("run the Topology Manager pod scope alignment test suite", func() {
sriovdevCount := detectSRIOVDevices()
numaNodes := detectNUMANodes()
coreCount := detectCoresPerSocket()
if numaNodes < minNumaNodes {
e2eskipper.Skipf("this test is intended to be run on a multi-node NUMA system")
}
if coreCount < minCoreCount {
e2eskipper.Skipf("this test is intended to be run on a system with at least %d cores per socket", minCoreCount)
}
if sriovdevCount == 0 {
e2eskipper.Skipf("this test is intended to be run on a system with at least one SR-IOV VF enabled")
}
configMap := getSRIOVDevicePluginConfigMap(framework.TestContext.SriovdpConfigMapFile)
oldCfg, err := getCurrentKubeletConfig()
framework.ExpectNoError(err)
policy := topologymanager.PolicySingleNumaNode
scope := podScopeTopology
reservedSystemCPUs := configureTopologyManagerInKubelet(f, oldCfg, policy, scope, configMap, numaNodes)
runTMScopeResourceAlignmentTestSuite(f, configMap, reservedSystemCPUs, policy, numaNodes, coreCount)
setOldKubeletConfig(f, oldCfg)
deleteStateFile()
})
}
// Serial because the test updates kubelet configuration.