Merge pull request #101432 from swatisehgal/smtaware

node: cpumanager: add options to reject non SMT-aligned workload
Kubernetes Prow Robot 2021-07-08 21:04:53 -07:00 committed by GitHub
commit 617064d732
31 changed files with 855 additions and 179 deletions

View File

@ -492,6 +492,7 @@ func AddKubeletConfigFlags(mainfs *pflag.FlagSet, c *kubeletconfig.KubeletConfig
fs.StringVar(&c.CgroupDriver, "cgroup-driver", c.CgroupDriver, "Driver that the kubelet uses to manipulate cgroups on the host. Possible values: 'cgroupfs', 'systemd'")
fs.StringVar(&c.CgroupRoot, "cgroup-root", c.CgroupRoot, "Optional root cgroup to use for pods. This is handled by the container runtime on a best effort basis. Default: '', which means use the container runtime default.")
fs.StringVar(&c.CPUManagerPolicy, "cpu-manager-policy", c.CPUManagerPolicy, "CPU Manager policy to use. Possible values: 'none', 'static'.")
fs.Var(cliflag.NewMapStringStringNoSplit(&c.CPUManagerPolicyOptions), "cpu-manager-policy-options", "A set of key=value CPU Manager policy options to use, to fine tune their behaviour. If not supplied, keep the default behaviour.")
fs.DurationVar(&c.CPUManagerReconcilePeriod.Duration, "cpu-manager-reconcile-period", c.CPUManagerReconcilePeriod.Duration, "<Warning: Alpha feature> CPU Manager reconciliation period. Examples: '10s', or '1m'. If not supplied, defaults to 'NodeStatusUpdateFrequency'")
fs.Var(cliflag.NewMapStringString(&c.QOSReserved), "qos-reserved", "<Warning: Alpha feature> A set of ResourceName=Percentage (e.g. memory=50%) pairs that describe how pod resource requests are reserved at the QoS level. Currently only memory is supported. Requires the QOSReserved feature gate to be enabled.")
fs.StringVar(&c.TopologyManagerPolicy, "topology-manager-policy", c.TopologyManagerPolicy, "Topology Manager policy to use. Possible values: 'none', 'best-effort', 'restricted', 'single-numa-node'.")

View File

@ -732,6 +732,16 @@ func run(ctx context.Context, s *options.KubeletServer, kubeDeps *kubelet.Depend
devicePluginEnabled := utilfeature.DefaultFeatureGate.Enabled(features.DevicePlugins)
var cpuManagerPolicyOptions map[string]string
if utilfeature.DefaultFeatureGate.Enabled(features.CPUManager) {
if utilfeature.DefaultFeatureGate.Enabled(features.CPUManagerPolicyOptions) {
cpuManagerPolicyOptions = s.CPUManagerPolicyOptions
} else if s.CPUManagerPolicyOptions != nil {
return fmt.Errorf("CPU Manager policy options %v require feature gates %q, %q enabled",
s.CPUManagerPolicyOptions, features.CPUManager, features.CPUManagerPolicyOptions)
}
}
kubeDeps.ContainerManager, err = cm.NewContainerManager(
kubeDeps.Mounter,
kubeDeps.CAdvisorInterface,
@ -756,6 +766,7 @@ func run(ctx context.Context, s *options.KubeletServer, kubeDeps *kubelet.Depend
},
QOSReserved: *experimentalQOSReserved,
ExperimentalCPUManagerPolicy: s.CPUManagerPolicy,
ExperimentalCPUManagerPolicyOptions: cpuManagerPolicyOptions,
ExperimentalCPUManagerReconcilePeriod: s.CPUManagerReconcilePeriod.Duration,
ExperimentalMemoryManagerPolicy: s.MemoryManagerPolicy,
ExperimentalMemoryManagerReservedMemory: s.ReservedMemory,

View File

@ -789,6 +789,12 @@ const (
//
// Enables kubelet to support memory QoS with cgroups v2.
MemoryQoS featuregate.Feature = "MemoryQoS"
// owner: @fromanirh
// alpha: v1.22
//
// Allow fine-tuning of cpumanager policies
CPUManagerPolicyOptions featuregate.Feature = "CPUManagerPolicyOptions"
)
func init() {
@ -906,6 +912,7 @@ var defaultKubernetesFeatureGates = map[featuregate.Feature]featuregate.FeatureS
DelegateFSGroupToCSIDriver: {Default: false, PreRelease: featuregate.Alpha},
KubeletInUserNamespace: {Default: false, PreRelease: featuregate.Alpha},
MemoryQoS: {Default: false, PreRelease: featuregate.Alpha},
CPUManagerPolicyOptions: {Default: false, PreRelease: featuregate.Alpha},
// inherited features from generic apiserver, relisted here to get a conflict if it is changed
// unintentionally on either side:

View File

@ -70,6 +70,7 @@ func Funcs(codecs runtimeserializer.CodecFactory) []interface{} {
obj.NodeStatusReportFrequency = metav1.Duration{Duration: time.Minute}
obj.NodeLeaseDurationSeconds = 40
obj.CPUManagerPolicy = "none"
obj.CPUManagerPolicyOptions = make(map[string]string)
obj.CPUManagerReconcilePeriod = obj.NodeStatusUpdateFrequency
obj.NodeStatusMaxImages = 50
obj.TopologyManagerPolicy = kubeletconfig.NoneTopologyManagerPolicy

View File

@ -150,6 +150,7 @@ var (
"CPUCFSQuota",
"CPUCFSQuotaPeriod.Duration",
"CPUManagerPolicy",
"CPUManagerPolicyOptions[*]",
"CPUManagerReconcilePeriod.Duration",
"TopologyManagerPolicy",
"TopologyManagerScope",

View File

@ -221,6 +221,10 @@ type KubeletConfiguration struct {
// CPUManagerPolicy is the name of the policy to use.
// Requires the CPUManager feature gate to be enabled.
CPUManagerPolicy string
// CPUManagerPolicyOptions is a set of key=value pairs which allows setting extra options
// to fine-tune the behaviour of the CPU Manager policies.
// Requires both the "CPUManager" and "CPUManagerPolicyOptions" feature gates to be enabled.
CPUManagerPolicyOptions map[string]string
// CPU Manager reconciliation period.
// Requires the CPUManager feature gate to be enabled.
CPUManagerReconcilePeriod metav1.Duration

View File

@ -294,6 +294,7 @@ func autoConvert_v1beta1_KubeletConfiguration_To_config_KubeletConfiguration(in
}
out.CgroupDriver = in.CgroupDriver
out.CPUManagerPolicy = in.CPUManagerPolicy
out.CPUManagerPolicyOptions = *(*map[string]string)(unsafe.Pointer(&in.CPUManagerPolicyOptions))
out.CPUManagerReconcilePeriod = in.CPUManagerReconcilePeriod
out.MemoryManagerPolicy = in.MemoryManagerPolicy
out.TopologyManagerPolicy = in.TopologyManagerPolicy
@ -464,6 +465,7 @@ func autoConvert_config_KubeletConfiguration_To_v1beta1_KubeletConfiguration(in
}
out.CgroupDriver = in.CgroupDriver
out.CPUManagerPolicy = in.CPUManagerPolicy
out.CPUManagerPolicyOptions = *(*map[string]string)(unsafe.Pointer(&in.CPUManagerPolicyOptions))
out.CPUManagerReconcilePeriod = in.CPUManagerReconcilePeriod
out.MemoryManagerPolicy = in.MemoryManagerPolicy
out.TopologyManagerPolicy = in.TopologyManagerPolicy

View File

@ -201,6 +201,13 @@ func (in *KubeletConfiguration) DeepCopyInto(out *KubeletConfiguration) {
out.NodeStatusReportFrequency = in.NodeStatusReportFrequency
out.ImageMinimumGCAge = in.ImageMinimumGCAge
out.VolumeStatsAggPeriod = in.VolumeStatsAggPeriod
if in.CPUManagerPolicyOptions != nil {
in, out := &in.CPUManagerPolicyOptions, &out.CPUManagerPolicyOptions
*out = make(map[string]string, len(*in))
for key, val := range *in {
(*out)[key] = val
}
}
out.CPUManagerReconcilePeriod = in.CPUManagerReconcilePeriod
if in.QOSReserved != nil {
in, out := &in.QOSReserved, &out.QOSReserved

View File

@ -0,0 +1,62 @@
/*
Copyright 2021 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package admission
import (
"errors"
"fmt"
"k8s.io/kubernetes/pkg/kubelet/lifecycle"
)
const (
ErrorReasonUnexpected = "UnexpectedAdmissionError"
)
type Error interface {
Error() string
Type() string
}
type unexpectedAdmissionError struct{ Err error }
var _ Error = (*unexpectedAdmissionError)(nil)
func (e *unexpectedAdmissionError) Error() string {
return fmt.Sprintf("Allocate failed due to %v, which is unexpected", e.Err)
}
func (e *unexpectedAdmissionError) Type() string {
return ErrorReasonUnexpected
}
func GetPodAdmitResult(err error) lifecycle.PodAdmitResult {
if err == nil {
return lifecycle.PodAdmitResult{Admit: true}
}
var admissionErr Error
if !errors.As(err, &admissionErr) {
admissionErr = &unexpectedAdmissionError{err}
}
return lifecycle.PodAdmitResult{
Message: admissionErr.Error(),
Reason: admissionErr.Type(),
Admit: false,
}
}
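The helper above is the single funnel through which allocation failures become admission results: a nil error admits the pod, an error implementing this package's Error interface is rejected under its own reason, and any other error is wrapped as UnexpectedAdmissionError. A minimal sketch of that flow, assuming it is built inside the kubelet tree; the illustrativeSMTError type and the package name are made up for the example (the type only mirrors the SMTAlignmentError added later in this PR):

package sketch

import (
    "fmt"

    "k8s.io/kubernetes/pkg/kubelet/cm/admission"
    "k8s.io/kubernetes/pkg/kubelet/lifecycle"
)

// illustrativeSMTError satisfies admission.Error by providing both
// Error() and Type(); it stands in for the real SMTAlignmentError.
type illustrativeSMTError struct{ requested, perCore int }

func (e illustrativeSMTError) Error() string {
    return fmt.Sprintf("requested %d cpus, not a multiple of the %d cpus per core", e.requested, e.perCore)
}

func (e illustrativeSMTError) Type() string { return "SMTAlignmentError" }

func admitExamples() []lifecycle.PodAdmitResult {
    return []lifecycle.PodAdmitResult{
        // nil error: Admit == true
        admission.GetPodAdmitResult(nil),
        // typed error: Admit == false, Reason == "SMTAlignmentError"
        admission.GetPodAdmitResult(illustrativeSMTError{requested: 1, perCore: 2}),
        // untyped error: Admit == false, Reason == ErrorReasonUnexpected
        admission.GetPodAdmitResult(fmt.Errorf("some allocation failure")),
    }
}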

View File

@ -0,0 +1,87 @@
/*
Copyright 2021 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package admission
import (
"errors"
"testing"
)
type TestAdmissionError struct {
message string
reason string
}
func (e *TestAdmissionError) Error() string {
return e.message
}
func (e *TestAdmissionError) Type() string {
return e.reason
}
func TestAdmissionErrors(t *testing.T) {
testCases := []struct {
Error error
expectedAdmissionError bool
}{
{
nil,
false,
},
{
errors.New("Not an AdmissionError error"),
false,
},
{
&TestAdmissionError{
"Is an AdmissionError error",
"TestAdmissionError",
},
true,
},
}
for _, tc := range testCases {
h := GetPodAdmitResult(tc.Error)
if tc.Error == nil {
if !h.Admit {
t.Errorf("expected PodAdmitResult.Admit = true")
}
continue
}
if h.Admit {
t.Errorf("expected PodAdmitResult.Admit = false")
}
if tc.expectedAdmissionError {
err, ok := tc.Error.(*TestAdmissionError)
if !ok {
t.Errorf("expected TestAdmissionError")
}
if h.Reason != err.reason {
t.Errorf("expected PodAdmitResult.Reason = %v, got %v", err.reason, h.Reason)
}
continue
}
if h.Reason != ErrorReasonUnexpected {
t.Errorf("expected PodAdmitResult.Reason = %v, got %v", ErrorReasonUnexpected, h.Reason)
}
}
}

View File

@ -134,6 +134,7 @@ type NodeConfig struct {
NodeAllocatableConfig
QOSReserved map[v1.ResourceName]int64
ExperimentalCPUManagerPolicy string
ExperimentalCPUManagerPolicyOptions map[string]string
ExperimentalTopologyManagerScope string
ExperimentalCPUManagerReconcilePeriod time.Duration
ExperimentalMemoryManagerPolicy string

View File

@ -52,6 +52,7 @@ import (
podresourcesapi "k8s.io/kubelet/pkg/apis/podresources/v1"
kubefeatures "k8s.io/kubernetes/pkg/features"
"k8s.io/kubernetes/pkg/kubelet/cadvisor"
"k8s.io/kubernetes/pkg/kubelet/cm/admission"
"k8s.io/kubernetes/pkg/kubelet/cm/containermap"
"k8s.io/kubernetes/pkg/kubelet/cm/cpumanager"
"k8s.io/kubernetes/pkg/kubelet/cm/devicemanager"
@ -332,6 +333,7 @@ func NewContainerManager(mountUtil mount.Interface, cadvisorInterface cadvisor.I
if utilfeature.DefaultFeatureGate.Enabled(kubefeatures.CPUManager) {
cm.cpuManager, err = cpumanager.NewManager(
nodeConfig.ExperimentalCPUManagerPolicy,
nodeConfig.ExperimentalCPUManagerPolicyOptions,
nodeConfig.ExperimentalCPUManagerReconcilePeriod,
machineInfo,
nodeConfig.NodeAllocatableConfig.ReservedSystemCPUs,
@ -757,37 +759,25 @@ func (m *resourceAllocator) Admit(attrs *lifecycle.PodAdmitAttributes) lifecycle
for _, container := range append(pod.Spec.InitContainers, pod.Spec.Containers...) {
err := m.deviceManager.Allocate(pod, &container)
if err != nil {
return lifecycle.PodAdmitResult{
Message: fmt.Sprintf("Allocate failed due to %v, which is unexpected", err),
Reason: "UnexpectedAdmissionError",
Admit: false,
}
return admission.GetPodAdmitResult(err)
}
if m.cpuManager != nil {
err = m.cpuManager.Allocate(pod, &container)
if err != nil {
return lifecycle.PodAdmitResult{
Message: fmt.Sprintf("Allocate failed due to %v, which is unexpected", err),
Reason: "UnexpectedAdmissionError",
Admit: false,
}
return admission.GetPodAdmitResult(err)
}
}
if m.memoryManager != nil {
err = m.memoryManager.Allocate(pod, &container)
if err != nil {
return lifecycle.PodAdmitResult{
Message: fmt.Sprintf("Allocate failed due to %v, which is unexpected", err),
Reason: "UnexpectedAdmissionError",
Admit: false,
}
return admission.GetPodAdmitResult(err)
}
}
}
return lifecycle.PodAdmitResult{Admit: true}
return admission.GetPodAdmitResult(nil)
}
func (cm *containerManagerImpl) SystemCgroupsLimit() v1.ResourceList {

View File

@ -35,6 +35,7 @@ import (
podresourcesapi "k8s.io/kubelet/pkg/apis/podresources/v1"
kubefeatures "k8s.io/kubernetes/pkg/features"
"k8s.io/kubernetes/pkg/kubelet/cadvisor"
"k8s.io/kubernetes/pkg/kubelet/cm/admission"
"k8s.io/kubernetes/pkg/kubelet/cm/cpumanager"
"k8s.io/kubernetes/pkg/kubelet/cm/devicemanager"
"k8s.io/kubernetes/pkg/kubelet/cm/memorymanager"
@ -63,9 +64,7 @@ type containerManagerImpl struct {
type noopWindowsResourceAllocator struct{}
func (ra *noopWindowsResourceAllocator) Admit(attrs *lifecycle.PodAdmitAttributes) lifecycle.PodAdmitResult {
return lifecycle.PodAdmitResult{
Admit: true,
}
return admission.GetPodAdmitResult(nil)
}
func (cm *containerManagerImpl) Start(node *v1.Node,

View File

@ -143,17 +143,20 @@ func (s *sourcesReadyStub) AddSource(source string) {}
func (s *sourcesReadyStub) AllReady() bool { return true }
// NewManager creates new cpu manager based on provided policy
func NewManager(cpuPolicyName string, reconcilePeriod time.Duration, machineInfo *cadvisorapi.MachineInfo, specificCPUs cpuset.CPUSet, nodeAllocatableReservation v1.ResourceList, stateFileDirectory string, affinity topologymanager.Store) (Manager, error) {
func NewManager(cpuPolicyName string, cpuPolicyOptions map[string]string, reconcilePeriod time.Duration, machineInfo *cadvisorapi.MachineInfo, specificCPUs cpuset.CPUSet, nodeAllocatableReservation v1.ResourceList, stateFileDirectory string, affinity topologymanager.Store) (Manager, error) {
var topo *topology.CPUTopology
var policy Policy
var err error
switch policyName(cpuPolicyName) {
case PolicyNone:
policy = NewNonePolicy()
policy, err = NewNonePolicy(cpuPolicyOptions)
if err != nil {
return nil, fmt.Errorf("new none policy error: %w", err)
}
case PolicyStatic:
var err error
topo, err = topology.Discover(machineInfo)
if err != nil {
return nil, err
@ -178,9 +181,9 @@ func NewManager(cpuPolicyName string, reconcilePeriod time.Duration, machineInfo
// exclusively allocated.
reservedCPUsFloat := float64(reservedCPUs.MilliValue()) / 1000
numReservedCPUs := int(math.Ceil(reservedCPUsFloat))
policy, err = NewStaticPolicy(topo, numReservedCPUs, specificCPUs, affinity)
policy, err = NewStaticPolicy(topo, numReservedCPUs, specificCPUs, affinity, cpuPolicyOptions)
if err != nil {
return nil, fmt.Errorf("new static policy error: %v", err)
return nil, fmt.Errorf("new static policy error: %w", err)
}
default:

View File

@ -229,7 +229,8 @@ func TestCPUManagerAdd(t *testing.T) {
},
0,
cpuset.NewCPUSet(),
topologymanager.NewFakeManager())
topologymanager.NewFakeManager(),
nil)
testCases := []struct {
description string
updateErr error
@ -479,7 +480,7 @@ func TestCPUManagerAddWithInitContainers(t *testing.T) {
}
for _, testCase := range testCases {
policy, _ := NewStaticPolicy(testCase.topo, testCase.numReservedCPUs, cpuset.NewCPUSet(), topologymanager.NewFakeManager())
policy, _ := NewStaticPolicy(testCase.topo, testCase.numReservedCPUs, cpuset.NewCPUSet(), topologymanager.NewFakeManager(), nil)
mockState := &mockState{
assignments: testCase.stAssignments,
@ -634,7 +635,7 @@ func TestCPUManagerGenerate(t *testing.T) {
}
defer os.RemoveAll(sDir)
mgr, err := NewManager(testCase.cpuPolicyName, 5*time.Second, machineInfo, cpuset.NewCPUSet(), testCase.nodeAllocatableReservation, sDir, topologymanager.NewFakeManager())
mgr, err := NewManager(testCase.cpuPolicyName, nil, 5*time.Second, machineInfo, cpuset.NewCPUSet(), testCase.nodeAllocatableReservation, sDir, topologymanager.NewFakeManager())
if testCase.expectedError != nil {
if !strings.Contains(err.Error(), testCase.expectedError.Error()) {
t.Errorf("Unexpected error message. Have: %s wants %s", err.Error(), testCase.expectedError.Error())
@ -1004,7 +1005,8 @@ func TestCPUManagerAddWithResvList(t *testing.T) {
},
1,
cpuset.NewCPUSet(0),
topologymanager.NewFakeManager())
topologymanager.NewFakeManager(),
nil)
testCases := []struct {
description string
updateErr error
@ -1061,3 +1063,69 @@ func TestCPUManagerAddWithResvList(t *testing.T) {
}
}
}
func TestCPUManagerHandlePolicyOptions(t *testing.T) {
testCases := []struct {
description string
cpuPolicyName string
cpuPolicyOptions map[string]string
expectedError error
}{
{
description: "options to none policy",
cpuPolicyName: "none",
cpuPolicyOptions: map[string]string{
FullPCPUsOnlyOption: "true",
},
expectedError: fmt.Errorf("received unsupported options"),
},
}
// any correct realistic topology is fine. We pick a simple one.
mockedMachineInfo := cadvisorapi.MachineInfo{
NumCores: 4,
Topology: []cadvisorapi.Node{
{
Cores: []cadvisorapi.Core{
{
Id: 0,
Threads: []int{0},
},
{
Id: 1,
Threads: []int{1},
},
{
Id: 2,
Threads: []int{2},
},
{
Id: 3,
Threads: []int{3},
},
},
},
},
}
for _, testCase := range testCases {
t.Run(testCase.description, func(t *testing.T) {
machineInfo := &mockedMachineInfo
nodeAllocatableReservation := v1.ResourceList{}
sDir, err := ioutil.TempDir("/tmp/", "cpu_manager_test")
if err != nil {
t.Errorf("cannot create state file: %s", err.Error())
}
defer os.RemoveAll(sDir)
_, err = NewManager(testCase.cpuPolicyName, testCase.cpuPolicyOptions, 5*time.Second, machineInfo, cpuset.NewCPUSet(), nodeAllocatableReservation, sDir, topologymanager.NewFakeManager())
if err == nil {
t.Errorf("Expected error, but NewManager succeeded")
}
if !strings.Contains(err.Error(), testCase.expectedError.Error()) {
t.Errorf("Unexpected error message. Have: %s wants %s", err.Error(), testCase.expectedError.Error())
}
})
}
}

View File

@ -38,7 +38,8 @@ func (m *fakeManager) Start(activePods ActivePodsFunc, sourcesReady config.Sourc
func (m *fakeManager) Policy() Policy {
klog.InfoS("Policy()")
return NewNonePolicy()
pol, _ := NewNonePolicy(nil)
return pol
}
func (m *fakeManager) Allocate(pod *v1.Pod, container *v1.Container) error {

View File

@ -17,6 +17,8 @@ limitations under the License.
package cpumanager
import (
"fmt"
"k8s.io/api/core/v1"
"k8s.io/klog/v2"
"k8s.io/kubernetes/pkg/kubelet/cm/cpumanager/state"
@ -32,8 +34,11 @@ var _ Policy = &nonePolicy{}
const PolicyNone policyName = "none"
// NewNonePolicy returns a cpuset manager policy that does nothing
func NewNonePolicy() Policy {
return &nonePolicy{}
func NewNonePolicy(cpuPolicyOptions map[string]string) (Policy, error) {
if len(cpuPolicyOptions) > 0 {
return nil, fmt.Errorf("None policy: received unsupported options=%v", cpuPolicyOptions)
}
return &nonePolicy{}, nil
}
func (p *nonePolicy) Name() string {

View File

@ -86,3 +86,20 @@ func TestNonePolicyGetAllocatableCPUs(t *testing.T) {
t.Errorf("NonePolicy GetAllocatableCPUs() error. expected empty set, returned: %v", cpus)
}
}
func TestNonePolicyOptions(t *testing.T) {
var err error
_, err = NewNonePolicy(nil)
if err != nil {
t.Errorf("NewNonePolicy with nil options failure. expected no error but got: %v", err)
}
opts := map[string]string{
FullPCPUsOnlyOption: "true",
}
_, err = NewNonePolicy(opts)
if err == nil {
t.Errorf("NewNonePolicy with (any) options failure. expected error but got none")
}
}

View File

@ -0,0 +1,56 @@
/*
Copyright 2021 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package cpumanager
import (
"fmt"
"strconv"
)
const (
// FullPCPUsOnlyOption is the name of the CPU Manager policy option
FullPCPUsOnlyOption string = "full-pcpus-only"
)
type StaticPolicyOptions struct {
// flag to enable extra allocation restrictions to prevent
// different containers from ending up on the same physical core.
// We consider "core" and "physical CPU" synonyms here, leaning
// towards the terminology the k8s docs hint at. We acknowledge this is confusing.
//
// looking at https://kubernetes.io/docs/concepts/configuration/manage-resources-containers/,
// any possible naming scheme will lead to ambiguity to some extent.
// We picked "pcpu" because the established docs already hint at vCPU.
FullPhysicalCPUsOnly bool
}
func NewStaticPolicyOptions(policyOptions map[string]string) (StaticPolicyOptions, error) {
opts := StaticPolicyOptions{}
for name, value := range policyOptions {
switch name {
case FullPCPUsOnlyOption:
optValue, err := strconv.ParseBool(value)
if err != nil {
return opts, fmt.Errorf("bad value for option %q: %w", name, err)
}
opts.FullPhysicalCPUsOnly = optValue
default:
return opts, fmt.Errorf("unsupported cpumanager option: %q (%s)", name, value)
}
}
return opts, nil
}
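A quick illustration of the parsing behaviour, as a sketch assumed to live inside the cpumanager package; exampleParsePolicyOptions and the made-up-option key are invented for illustration, while full-pcpus-only is the only option this PR recognizes:

// sketch: how the kubelet-supplied option map becomes typed options
func exampleParsePolicyOptions() error {
    opts, err := NewStaticPolicyOptions(map[string]string{
        FullPCPUsOnlyOption: "true",
    })
    if err != nil {
        return err
    }
    _ = opts.FullPhysicalCPUsOnly // true

    // unknown keys are rejected outright, so a typo in the kubelet
    // configuration surfaces as a startup error instead of being ignored
    if _, err := NewStaticPolicyOptions(map[string]string{"made-up-option": "1"}); err == nil {
        return fmt.Errorf("expected an error for an unsupported option")
    }
    return nil
}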

View File

@ -29,8 +29,29 @@ import (
"k8s.io/kubernetes/pkg/kubelet/cm/topologymanager/bitmask"
)
// PolicyStatic is the name of the static policy
const PolicyStatic policyName = "static"
const (
// PolicyStatic is the name of the static policy.
// Should options be given, these will be ignored and backward-compatible
// behaviour (up to and including 1.21) will be enforced
PolicyStatic policyName = "static"
// ErrorSMTAlignment represents the type of an SMTAlignmentError
ErrorSMTAlignment = "SMTAlignmentError"
)
// SMTAlignmentError represents an error due to SMT alignment
type SMTAlignmentError struct {
RequestedCPUs int
CpusPerCore int
}
func (e SMTAlignmentError) Error() string {
return fmt.Sprintf("SMT Alignment Error: requested %d cpus not multiple cpus per core = %d", e.RequestedCPUs, e.CpusPerCore)
}
func (e SMTAlignmentError) Type() string {
return ErrorSMTAlignment
}
// staticPolicy is a CPU manager policy that does not change CPU
// assignments for exclusively pinned guaranteed containers after the main
@ -79,6 +100,8 @@ type staticPolicy struct {
affinity topologymanager.Store
// set of CPUs to reuse across allocations in a pod
cpusToReuse map[string]cpuset.CPUSet
// options allow to fine-tune the behaviour of the policy
options StaticPolicyOptions
}
// Ensure staticPolicy implements Policy interface
@ -87,7 +110,14 @@ var _ Policy = &staticPolicy{}
// NewStaticPolicy returns a CPU manager policy that does not change CPU
// assignments for exclusively pinned guaranteed containers after the main
// container process starts.
func NewStaticPolicy(topology *topology.CPUTopology, numReservedCPUs int, reservedCPUs cpuset.CPUSet, affinity topologymanager.Store) (Policy, error) {
func NewStaticPolicy(topology *topology.CPUTopology, numReservedCPUs int, reservedCPUs cpuset.CPUSet, affinity topologymanager.Store, cpuPolicyOptions map[string]string) (Policy, error) {
opts, err := NewStaticPolicyOptions(cpuPolicyOptions)
if err != nil {
return nil, err
}
klog.InfoS("Static policy created with configuration", "options", opts)
allCPUs := topology.CPUDetails.CPUs()
var reserved cpuset.CPUSet
if reservedCPUs.Size() > 0 {
@ -113,6 +143,7 @@ func NewStaticPolicy(topology *topology.CPUTopology, numReservedCPUs int, reserv
reserved: reserved,
affinity: affinity,
cpusToReuse: make(map[string]cpuset.CPUSet),
options: opts,
}, nil
}
@ -220,6 +251,21 @@ func (p *staticPolicy) Allocate(s state.State, pod *v1.Pod, container *v1.Contai
klog.InfoS("Static policy: Allocate", "pod", klog.KObj(pod), "containerName", container.Name)
// container belongs in an exclusively allocated pool
if p.options.FullPhysicalCPUsOnly && ((numCPUs % p.topology.CPUsPerCore()) != 0) {
// Since the CPU Manager has been enabled requesting strict SMT alignment, a guaranteed pod can only be admitted
// if the CPU requested is a multiple of the number of virtual cpus per physical core.
// In case the CPU request is not a multiple of the number of virtual cpus per physical core, the Pod will be put
// in the Failed state, with SMTAlignmentError as the reason. Since the allocation happens in terms of physical cores
// and the scheduler is responsible for ensuring that the workload goes to a node that has enough CPUs,
// the pod would be placed on a node where there are enough physical cores available to be allocated.
// Just like the default behaviour of the static policy, takeByTopology will first try to allocate CPUs from the same socket,
// and only in case the request cannot be satisfied on a single socket is the allocation done such that the workload occupies all
// CPUs on a physical core. Allocation of individual threads never has to occur.
return SMTAlignmentError{
RequestedCPUs: numCPUs,
CpusPerCore: p.topology.CPUsPerCore(),
}
}
if cpuset, ok := s.GetCPUSet(string(pod.UID), container.Name); ok {
p.updateCPUsToReuse(pod, container, cpuset)
klog.InfoS("Static policy: container already present in state, skipping", "pod", klog.KObj(pod), "containerName", container.Name)

View File

@ -34,6 +34,7 @@ type staticPolicyTest struct {
topo *topology.CPUTopology
numReservedCPUs int
podUID string
options map[string]string
containerName string
stAssignments state.ContainerCPUAssignments
stDefaultCPUSet cpuset.CPUSet
@ -43,8 +44,27 @@ type staticPolicyTest struct {
expCSet cpuset.CPUSet
}
// this is not a real Clone() - hence Pseudo- - because we don't clone some
// objects which are accessed read-only
func (spt staticPolicyTest) PseudoClone() staticPolicyTest {
return staticPolicyTest{
description: spt.description,
topo: spt.topo, // accessed in read-only
numReservedCPUs: spt.numReservedCPUs,
podUID: spt.podUID,
options: spt.options, // accessed in read-only
containerName: spt.containerName,
stAssignments: spt.stAssignments.Clone(),
stDefaultCPUSet: spt.stDefaultCPUSet.Clone(),
pod: spt.pod, // accessed in read-only
expErr: spt.expErr,
expCPUAlloc: spt.expCPUAlloc,
expCSet: spt.expCSet.Clone(),
}
}
func TestStaticPolicyName(t *testing.T) {
policy, _ := NewStaticPolicy(topoSingleSocketHT, 1, cpuset.NewCPUSet(), topologymanager.NewFakeManager())
policy, _ := NewStaticPolicy(topoSingleSocketHT, 1, cpuset.NewCPUSet(), topologymanager.NewFakeManager(), nil)
policyName := policy.Name()
if policyName != "static" {
@ -120,7 +140,7 @@ func TestStaticPolicyStart(t *testing.T) {
}
for _, testCase := range testCases {
t.Run(testCase.description, func(t *testing.T) {
p, _ := NewStaticPolicy(testCase.topo, testCase.numReservedCPUs, cpuset.NewCPUSet(), topologymanager.NewFakeManager())
p, _ := NewStaticPolicy(testCase.topo, testCase.numReservedCPUs, cpuset.NewCPUSet(), topologymanager.NewFakeManager(), nil)
policy := p.(*staticPolicy)
st := &mockState{
assignments: testCase.stAssignments,
@ -168,7 +188,9 @@ func TestStaticPolicyAdd(t *testing.T) {
largeTopoSock0CPUSet := largeTopoSock0Builder.Result()
largeTopoSock1CPUSet := largeTopoSock1Builder.Result()
testCases := []staticPolicyTest{
// these are the cases which must behave the same regardless of the policy options.
// So we will permute the options to ensure this holds true.
optionsInsensitiveTestCases := []staticPolicyTest{
{
description: "GuPodSingleCore, SingleSocketHT, ExpectError",
topo: topoSingleSocketHT,
@ -180,17 +202,6 @@ func TestStaticPolicyAdd(t *testing.T) {
expCPUAlloc: false,
expCSet: cpuset.NewCPUSet(),
},
{
description: "GuPodSingleCore, SingleSocketHT, ExpectAllocOneCPU",
topo: topoSingleSocketHT,
numReservedCPUs: 1,
stAssignments: state.ContainerCPUAssignments{},
stDefaultCPUSet: cpuset.NewCPUSet(0, 1, 2, 3, 4, 5, 6, 7),
pod: makePod("fakePod", "fakeContainer2", "1000m", "1000m"),
expErr: nil,
expCPUAlloc: true,
expCSet: cpuset.NewCPUSet(4), // expect sibling of partial core
},
{
description: "GuPodMultipleCores, SingleSocketHT, ExpectAllocOneCore",
topo: topoSingleSocketHT,
@ -400,22 +411,6 @@ func TestStaticPolicyAdd(t *testing.T) {
expCPUAlloc: true,
expCSet: largeTopoSock1CPUSet.Union(cpuset.NewCPUSet(10, 34, 22, 47)),
},
{
// Only partial cores are available in the entire system.
// Expect allocation of all the CPUs from the partial cores.
description: "GuPodMultipleCores, topoQuadSocketFourWayHT, ExpectAllocCPUs",
topo: topoQuadSocketFourWayHT,
stAssignments: state.ContainerCPUAssignments{
"fakePod": map[string]cpuset.CPUSet{
"fakeContainer100": largeTopoCPUSet.Difference(cpuset.NewCPUSet(10, 11, 53, 37, 55, 67, 52)),
},
},
stDefaultCPUSet: cpuset.NewCPUSet(10, 11, 53, 67, 52),
pod: makePod("fakePod", "fakeContainer5", "5000m", "5000m"),
expErr: nil,
expCPUAlloc: true,
expCSet: cpuset.NewCPUSet(10, 11, 53, 67, 52),
},
{
// Only 7 CPUs are available.
// Pod requests 76 cores.
@ -435,45 +430,130 @@ func TestStaticPolicyAdd(t *testing.T) {
},
}
for _, testCase := range testCases {
policy, _ := NewStaticPolicy(testCase.topo, testCase.numReservedCPUs, cpuset.NewCPUSet(), topologymanager.NewFakeManager())
// testcases for the default behaviour of the policy.
defaultOptionsTestCases := []staticPolicyTest{
{
description: "GuPodSingleCore, SingleSocketHT, ExpectAllocOneCPU",
topo: topoSingleSocketHT,
numReservedCPUs: 1,
stAssignments: state.ContainerCPUAssignments{},
stDefaultCPUSet: cpuset.NewCPUSet(0, 1, 2, 3, 4, 5, 6, 7),
pod: makePod("fakePod", "fakeContainer2", "1000m", "1000m"),
expErr: nil,
expCPUAlloc: true,
expCSet: cpuset.NewCPUSet(4), // expect sibling of partial core
},
{
// Only partial cores are available in the entire system.
// Expect allocation of all the CPUs from the partial cores.
description: "GuPodMultipleCores, topoQuadSocketFourWayHT, ExpectAllocCPUs",
topo: topoQuadSocketFourWayHT,
stAssignments: state.ContainerCPUAssignments{
"fakePod": map[string]cpuset.CPUSet{
"fakeContainer100": largeTopoCPUSet.Difference(cpuset.NewCPUSet(10, 11, 53, 37, 55, 67, 52)),
},
},
stDefaultCPUSet: cpuset.NewCPUSet(10, 11, 53, 67, 52),
pod: makePod("fakePod", "fakeContainer5", "5000m", "5000m"),
expErr: nil,
expCPUAlloc: true,
expCSet: cpuset.NewCPUSet(10, 11, 53, 67, 52),
},
}
st := &mockState{
assignments: testCase.stAssignments,
defaultCPUSet: testCase.stDefaultCPUSet,
// testcases for the FullPCPUsOnlyOption
smtalignOptionTestCases := []staticPolicyTest{
{
description: "GuPodSingleCore, SingleSocketHT, ExpectAllocOneCPU",
topo: topoSingleSocketHT,
options: map[string]string{
FullPCPUsOnlyOption: "true",
},
numReservedCPUs: 1,
stAssignments: state.ContainerCPUAssignments{},
stDefaultCPUSet: cpuset.NewCPUSet(0, 1, 2, 3, 4, 5, 6, 7),
pod: makePod("fakePod", "fakeContainer2", "1000m", "1000m"),
expErr: SMTAlignmentError{RequestedCPUs: 1, CpusPerCore: 2},
expCPUAlloc: false,
expCSet: cpuset.NewCPUSet(), // reject allocation of sibling of partial core
},
{
// test SMT-level != 2 - which is the default on x86_64
description: "GuPodMultipleCores, topoQuadSocketFourWayHT, ExpectAllocOneCPUs",
topo: topoQuadSocketFourWayHT,
options: map[string]string{
FullPCPUsOnlyOption: "true",
},
numReservedCPUs: 8,
stAssignments: state.ContainerCPUAssignments{},
stDefaultCPUSet: largeTopoCPUSet,
pod: makePod("fakePod", "fakeContainer15", "15000m", "15000m"),
expErr: SMTAlignmentError{RequestedCPUs: 15, CpusPerCore: 4},
expCPUAlloc: false,
expCSet: cpuset.NewCPUSet(),
},
}
for _, testCase := range optionsInsensitiveTestCases {
for _, options := range []map[string]string{
nil,
{
FullPCPUsOnlyOption: "true",
},
} {
tCase := testCase.PseudoClone()
tCase.description = fmt.Sprintf("options=%v %s", options, testCase.description)
tCase.options = options
runStaticPolicyTestCase(t, tCase)
}
}
for _, testCase := range defaultOptionsTestCases {
runStaticPolicyTestCase(t, testCase)
}
for _, testCase := range smtalignOptionTestCases {
runStaticPolicyTestCase(t, testCase)
}
}
func runStaticPolicyTestCase(t *testing.T, testCase staticPolicyTest) {
policy, _ := NewStaticPolicy(testCase.topo, testCase.numReservedCPUs, cpuset.NewCPUSet(), topologymanager.NewFakeManager(), testCase.options)
st := &mockState{
assignments: testCase.stAssignments,
defaultCPUSet: testCase.stDefaultCPUSet,
}
container := &testCase.pod.Spec.Containers[0]
err := policy.Allocate(st, testCase.pod, container)
if !reflect.DeepEqual(err, testCase.expErr) {
t.Errorf("StaticPolicy Allocate() error (%v). expected add error: %q but got: %q",
testCase.description, testCase.expErr, err)
}
if testCase.expCPUAlloc {
cset, found := st.assignments[string(testCase.pod.UID)][container.Name]
if !found {
t.Errorf("StaticPolicy Allocate() error (%v). expected container %v to be present in assignments %v",
testCase.description, container.Name, st.assignments)
}
container := &testCase.pod.Spec.Containers[0]
err := policy.Allocate(st, testCase.pod, container)
if !reflect.DeepEqual(err, testCase.expErr) {
t.Errorf("StaticPolicy Allocate() error (%v). expected add error: %v but got: %v",
testCase.description, testCase.expErr, err)
if !reflect.DeepEqual(cset, testCase.expCSet) {
t.Errorf("StaticPolicy Allocate() error (%v). expected cpuset %v but got %v",
testCase.description, testCase.expCSet, cset)
}
if testCase.expCPUAlloc {
cset, found := st.assignments[string(testCase.pod.UID)][container.Name]
if !found {
t.Errorf("StaticPolicy Allocate() error (%v). expected container %v to be present in assignments %v",
testCase.description, container.Name, st.assignments)
}
if !reflect.DeepEqual(cset, testCase.expCSet) {
t.Errorf("StaticPolicy Allocate() error (%v). expected cpuset %v but got %v",
testCase.description, testCase.expCSet, cset)
}
if !cset.Intersection(st.defaultCPUSet).IsEmpty() {
t.Errorf("StaticPolicy Allocate() error (%v). expected cpuset %v to be disoint from the shared cpuset %v",
testCase.description, cset, st.defaultCPUSet)
}
if !cset.Intersection(st.defaultCPUSet).IsEmpty() {
t.Errorf("StaticPolicy Allocate() error (%v). expected cpuset %v to be disoint from the shared cpuset %v",
testCase.description, cset, st.defaultCPUSet)
}
}
if !testCase.expCPUAlloc {
_, found := st.assignments[string(testCase.pod.UID)][container.Name]
if found {
t.Errorf("StaticPolicy Allocate() error (%v). Did not expect container %v to be present in assignments %v",
testCase.description, container.Name, st.assignments)
}
if !testCase.expCPUAlloc {
_, found := st.assignments[string(testCase.pod.UID)][container.Name]
if found {
t.Errorf("StaticPolicy Allocate() error (%v). Did not expect container %v to be present in assignments %v",
testCase.description, container.Name, st.assignments)
}
}
}
@ -537,7 +617,7 @@ func TestStaticPolicyRemove(t *testing.T) {
}
for _, testCase := range testCases {
policy, _ := NewStaticPolicy(testCase.topo, testCase.numReservedCPUs, cpuset.NewCPUSet(), topologymanager.NewFakeManager())
policy, _ := NewStaticPolicy(testCase.topo, testCase.numReservedCPUs, cpuset.NewCPUSet(), topologymanager.NewFakeManager(), nil)
st := &mockState{
assignments: testCase.stAssignments,
@ -627,7 +707,7 @@ func TestTopologyAwareAllocateCPUs(t *testing.T) {
},
}
for _, tc := range testCases {
p, _ := NewStaticPolicy(tc.topo, 0, cpuset.NewCPUSet(), topologymanager.NewFakeManager())
p, _ := NewStaticPolicy(tc.topo, 0, cpuset.NewCPUSet(), topologymanager.NewFakeManager(), nil)
policy := p.(*staticPolicy)
st := &mockState{
assignments: tc.stAssignments,
@ -701,7 +781,7 @@ func TestStaticPolicyStartWithResvList(t *testing.T) {
}
for _, testCase := range testCases {
t.Run(testCase.description, func(t *testing.T) {
p, err := NewStaticPolicy(testCase.topo, testCase.numReservedCPUs, testCase.reserved, topologymanager.NewFakeManager())
p, err := NewStaticPolicy(testCase.topo, testCase.numReservedCPUs, testCase.reserved, topologymanager.NewFakeManager(), nil)
if !reflect.DeepEqual(err, testCase.expNewErr) {
t.Errorf("StaticPolicy Start() error (%v). expected error: %v but got: %v",
testCase.description, testCase.expNewErr, err)
@ -778,7 +858,7 @@ func TestStaticPolicyAddWithResvList(t *testing.T) {
}
for _, testCase := range testCases {
policy, _ := NewStaticPolicy(testCase.topo, testCase.numReservedCPUs, testCase.reserved, topologymanager.NewFakeManager())
policy, _ := NewStaticPolicy(testCase.topo, testCase.numReservedCPUs, testCase.reserved, topologymanager.NewFakeManager(), nil)
st := &mockState{
assignments: testCase.stAssignments,
@ -819,3 +899,88 @@ func TestStaticPolicyAddWithResvList(t *testing.T) {
}
}
}
type staticPolicyOptionTestCase struct {
description string
policyOptions map[string]string
expectedError bool
expectedValue StaticPolicyOptions
}
func TestStaticPolicyOptions(t *testing.T) {
testCases := []staticPolicyOptionTestCase{
{
description: "nil args",
policyOptions: nil,
expectedError: false,
expectedValue: StaticPolicyOptions{},
},
{
description: "empty args",
policyOptions: map[string]string{},
expectedError: false,
expectedValue: StaticPolicyOptions{},
},
{
description: "bad single arg",
policyOptions: map[string]string{
"badValue1": "",
},
expectedError: true,
},
{
description: "bad multiple arg",
policyOptions: map[string]string{
"badValue1": "",
"badvalue2": "aaaa",
},
expectedError: true,
},
{
description: "good arg",
policyOptions: map[string]string{
FullPCPUsOnlyOption: "true",
},
expectedError: false,
expectedValue: StaticPolicyOptions{
FullPhysicalCPUsOnly: true,
},
},
{
description: "good arg, bad value",
policyOptions: map[string]string{
FullPCPUsOnlyOption: "enabled!",
},
expectedError: true,
},
{
description: "bad arg intermixed",
policyOptions: map[string]string{
FullPCPUsOnlyOption: "1",
"badvalue2": "lorem ipsum",
},
expectedError: true,
},
}
for _, testCase := range testCases {
t.Run(testCase.description, func(t *testing.T) {
opts, err := NewStaticPolicyOptions(testCase.policyOptions)
gotError := (err != nil)
if gotError != testCase.expectedError {
t.Fatalf("error with args %v expected error %v got %v: %v",
testCase.policyOptions, testCase.expectedError, gotError, err)
}
if testCase.expectedError {
return
}
if !reflect.DeepEqual(opts, testCase.expectedValue) {
t.Fatalf("value mismatch with args %v expected value %v got %v",
testCase.policyOptions, testCase.expectedValue, opts)
}
})
}
}

View File

@ -19,6 +19,7 @@ package topologymanager
import (
"k8s.io/api/core/v1"
"k8s.io/klog/v2"
"k8s.io/kubernetes/pkg/kubelet/cm/admission"
"k8s.io/kubernetes/pkg/kubelet/lifecycle"
)
@ -64,7 +65,5 @@ func (m *fakeManager) RemoveContainer(containerID string) error {
func (m *fakeManager) Admit(attrs *lifecycle.PodAdmitAttributes) lifecycle.PodAdmitResult {
klog.InfoS("Topology Admit Handler")
return lifecycle.PodAdmitResult{
Admit: true,
}
return admission.GetPodAdmitResult(nil)
}

View File

@ -17,12 +17,11 @@ limitations under the License.
package topologymanager
import (
"fmt"
"sync"
"k8s.io/api/core/v1"
"k8s.io/klog/v2"
"k8s.io/kubernetes/pkg/kubelet/cm/admission"
"k8s.io/kubernetes/pkg/kubelet/cm/containermap"
"k8s.io/kubernetes/pkg/kubelet/lifecycle"
)
@ -133,10 +132,10 @@ func (s *scope) admitPolicyNone(pod *v1.Pod) lifecycle.PodAdmitResult {
for _, container := range append(pod.Spec.InitContainers, pod.Spec.Containers...) {
err := s.allocateAlignedResources(pod, &container)
if err != nil {
return unexpectedAdmissionError(err)
return admission.GetPodAdmitResult(err)
}
}
return admitPod()
return admission.GetPodAdmitResult(nil)
}
// It would be better to implement this function in topologymanager instead of scope
@ -150,23 +149,3 @@ func (s *scope) allocateAlignedResources(pod *v1.Pod, container *v1.Container) e
}
return nil
}
func topologyAffinityError() lifecycle.PodAdmitResult {
return lifecycle.PodAdmitResult{
Message: "Resources cannot be allocated with Topology locality",
Reason: "TopologyAffinityError",
Admit: false,
}
}
func unexpectedAdmissionError(err error) lifecycle.PodAdmitResult {
return lifecycle.PodAdmitResult{
Message: fmt.Sprintf("Allocate failed due to %v, which is unexpected", err),
Reason: "UnexpectedAdmissionError",
Admit: false,
}
}
func admitPod() lifecycle.PodAdmitResult {
return lifecycle.PodAdmitResult{Admit: true}
}

View File

@ -19,6 +19,7 @@ package topologymanager
import (
"k8s.io/api/core/v1"
"k8s.io/klog/v2"
"k8s.io/kubernetes/pkg/kubelet/cm/admission"
"k8s.io/kubernetes/pkg/kubelet/cm/containermap"
"k8s.io/kubernetes/pkg/kubelet/lifecycle"
)
@ -53,17 +54,17 @@ func (s *containerScope) Admit(pod *v1.Pod) lifecycle.PodAdmitResult {
klog.InfoS("Best TopologyHint", "bestHint", bestHint, "pod", klog.KObj(pod), "containerName", container.Name)
if !admit {
return topologyAffinityError()
return admission.GetPodAdmitResult(&TopologyAffinityError{})
}
klog.InfoS("Topology Affinity", "bestHint", bestHint, "pod", klog.KObj(pod), "containerName", container.Name)
s.setTopologyHints(string(pod.UID), container.Name, bestHint)
err := s.allocateAlignedResources(pod, &container)
if err != nil {
return unexpectedAdmissionError(err)
return admission.GetPodAdmitResult(err)
}
}
return admitPod()
return admission.GetPodAdmitResult(nil)
}
func (s *containerScope) accumulateProvidersHints(pod *v1.Pod, container *v1.Container) []map[string][]TopologyHint {

View File

@ -19,6 +19,7 @@ package topologymanager
import (
"k8s.io/api/core/v1"
"k8s.io/klog/v2"
"k8s.io/kubernetes/pkg/kubelet/cm/admission"
"k8s.io/kubernetes/pkg/kubelet/cm/containermap"
"k8s.io/kubernetes/pkg/kubelet/lifecycle"
)
@ -51,7 +52,7 @@ func (s *podScope) Admit(pod *v1.Pod) lifecycle.PodAdmitResult {
bestHint, admit := s.calculateAffinity(pod)
klog.InfoS("Best TopologyHint", "bestHint", bestHint, "pod", klog.KObj(pod))
if !admit {
return topologyAffinityError()
return admission.GetPodAdmitResult(&TopologyAffinityError{})
}
for _, container := range append(pod.Spec.InitContainers, pod.Spec.Containers...) {
@ -60,10 +61,10 @@ func (s *podScope) Admit(pod *v1.Pod) lifecycle.PodAdmitResult {
err := s.allocateAlignedResources(pod, &container)
if err != nil {
return unexpectedAdmissionError(err)
return admission.GetPodAdmitResult(err)
}
}
return admitPod()
return admission.GetPodAdmitResult(nil)
}
func (s *podScope) accumulateProvidersHints(pod *v1.Pod) []map[string][]TopologyHint {

View File

@ -36,8 +36,21 @@ const (
// present on a machine and the TopologyManager is enabled, an error will
// be returned and the TopologyManager will not be loaded.
maxAllowableNUMANodes = 8
// ErrorTopologyAffinity represents the type for a TopologyAffinityError
ErrorTopologyAffinity = "TopologyAffinityError"
)
// TopologyAffinityError represents a resource alignment error
type TopologyAffinityError struct{}
func (e TopologyAffinityError) Error() string {
return "Resources cannot be allocated with Topology locality"
}
func (e TopologyAffinityError) Type() string {
return ErrorTopologyAffinity
}
// Manager interface provides methods for Kubelet to manage pod topology hints
type Manager interface {
// PodAdmitHandler is implemented by Manager

View File

@ -477,11 +477,17 @@ func TestAdmit(t *testing.T) {
if ctnActual.Admit != tc.expected {
t.Errorf("Error occurred, expected Admit in result to be %v got %v", tc.expected, ctnActual.Admit)
}
if !ctnActual.Admit && ctnActual.Reason != ErrorTopologyAffinity {
t.Errorf("Error occurred, expected Reason in result to be %v got %v", ErrorTopologyAffinity, ctnActual.Reason)
}
// Pod scope Admit
podActual := podScopeManager.Admit(&podAttr)
if podActual.Admit != tc.expected {
t.Errorf("Error occurred, expected Admit in result to be %v got %v", tc.expected, podActual.Admit)
}
if !podActual.Admit && podActual.Reason != ErrorTopologyAffinity {
t.Errorf("Error occurred, expected Reason in result to be %v got %v", ErrorTopologyAffinity, podActual.Reason)
}
}
}

View File

@ -448,6 +448,14 @@ type KubeletConfiguration struct {
// Default: "None"
// +optional
CPUManagerPolicy string `json:"cpuManagerPolicy,omitempty"`
// cpuManagerPolicyOptions is a set of key=value pairs which allows setting extra options
// to fine-tune the behaviour of the CPU Manager policies.
// Requires both the "CPUManager" and "CPUManagerPolicyOptions" feature gates to be enabled.
// Dynamic Kubelet Config (beta): This field should not be updated without a full node
// reboot. It is safest to keep this value the same as the local config.
// Default: nil
// +optional
CPUManagerPolicyOptions map[string]string `json:"cpuManagerPolicyOptions,omitempty"`
// cpuManagerReconcilePeriod is the reconciliation period for the CPU Manager.
// Requires the CPUManager feature gate to be enabled.
// Dynamic Kubelet Config (deprecated): If dynamically updating this field, consider that
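Putting the pieces together, this is roughly how the new field is expected to be used alongside the feature gates introduced earlier in this PR; a minimal sketch against the v1beta1 types, where exampleKubeletConfig, the 5-second reconcile period, and the choice of the static policy are assumptions for a typical setup rather than anything this PR mandates:

package sketch

import (
    "time"

    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    kubeletv1beta1 "k8s.io/kubelet/config/v1beta1"
)

// exampleKubeletConfig sketches a KubeletConfiguration that opts in to
// the SMT-alignment behaviour added by this PR.
func exampleKubeletConfig() kubeletv1beta1.KubeletConfiguration {
    return kubeletv1beta1.KubeletConfiguration{
        FeatureGates: map[string]bool{
            "CPUManager":              true,
            "CPUManagerPolicyOptions": true,
        },
        CPUManagerPolicy: "static",
        CPUManagerPolicyOptions: map[string]string{
            "full-pcpus-only": "true",
        },
        CPUManagerReconcilePeriod: metav1.Duration{Duration: 5 * time.Second},
    }
}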

View File

@ -167,6 +167,13 @@ func (in *KubeletConfiguration) DeepCopyInto(out *KubeletConfiguration) {
*out = new(bool)
**out = **in
}
if in.CPUManagerPolicyOptions != nil {
in, out := &in.CPUManagerPolicyOptions, &out.CPUManagerPolicyOptions
*out = make(map[string]string, len(*in))
for key, val := range *in {
(*out)[key] = val
}
}
out.CPUManagerReconcilePeriod = in.CPUManagerReconcilePeriod
if in.QOSReserved != nil {
in, out := &in.QOSReserved, &out.QOSReserved

View File

@ -17,8 +17,10 @@ limitations under the License.
package e2enode
import (
"context"
"fmt"
"os/exec"
"regexp"
"strconv"
"strings"
"time"
@ -34,6 +36,7 @@ import (
"k8s.io/kubernetes/pkg/kubelet/types"
"k8s.io/kubernetes/test/e2e/framework"
e2enode "k8s.io/kubernetes/test/e2e/framework/node"
e2epod "k8s.io/kubernetes/test/e2e/framework/pod"
e2eskipper "k8s.io/kubernetes/test/e2e/framework/skipper"
"github.com/onsi/ginkgo"
@ -159,6 +162,17 @@ func isMultiNUMA() bool {
return numaNodes > 1
}
func getSMTLevel() int {
cpuID := 0 // this is just the most likely cpu to be present in a random system. No special meaning besides this.
out, err := exec.Command("/bin/sh", "-c", fmt.Sprintf("cat /sys/devices/system/cpu/cpu%d/topology/thread_siblings_list | tr -d \"\n\r\"", cpuID)).Output()
framework.ExpectNoError(err)
// how many thread siblings you have = SMT level
// example: 2-way SMT means 2 sibling threads for each thread
cpus, err := cpuset.Parse(strings.TrimSpace(string(out)))
framework.ExpectNoError(err)
return cpus.Size()
}
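The helper reads a CPU's thread_siblings_list from sysfs; the number of CPUs listed there is exactly the SMT level. A tiny sketch of the parsing step in isolation, reusing the same helpers as the surrounding test code (smtLevelFromSiblingsList is a hypothetical helper and the "0,8" contents are an assumption for a 2-way SMT machine):

// sketch: the parsing half of getSMTLevel, separated from the sysfs read
func smtLevelFromSiblingsList(siblings string) int {
    // e.g. siblings == "0,8" on a 2-way SMT machine -> 2
    cpus, err := cpuset.Parse(strings.TrimSpace(siblings))
    framework.ExpectNoError(err)
    return cpus.Size()
}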
func getCPUSiblingList(cpuRes int64) string {
out, err := exec.Command("/bin/sh", "-c", fmt.Sprintf("cat /sys/devices/system/cpu/cpu%d/topology/thread_siblings_list | tr -d \"\n\r\"", cpuRes)).Output()
framework.ExpectNoError(err)
@ -209,20 +223,16 @@ func disableCPUManagerInKubelet(f *framework.Framework) (oldCfg *kubeletconfig.K
return oldCfg
}
func enableCPUManagerInKubelet(f *framework.Framework, cleanStateFile bool) (oldCfg *kubeletconfig.KubeletConfiguration) {
return configureCPUManagerInKubelet(f, cleanStateFile, cpuset.CPUSet{})
}
func configureCPUManagerInKubelet(f *framework.Framework, cleanStateFile bool, reservedSystemCPUs cpuset.CPUSet) (oldCfg *kubeletconfig.KubeletConfiguration) {
func configureCPUManagerInKubelet(f *framework.Framework, policyName string, cleanStateFile bool, reservedSystemCPUs cpuset.CPUSet, enableOptions bool, options map[string]string) (oldCfg *kubeletconfig.KubeletConfiguration) {
// Enable CPU Manager in Kubelet with static policy.
oldCfg, err := getCurrentKubeletConfig()
framework.ExpectNoError(err)
newCfg := oldCfg.DeepCopy()
if newCfg.FeatureGates == nil {
newCfg.FeatureGates = make(map[string]bool)
} else {
newCfg.FeatureGates["CPUManager"] = true
}
newCfg.FeatureGates["CPUManager"] = true
newCfg.FeatureGates["CPUManagerPolicyOptions"] = enableOptions
// After graduation of the CPU Manager feature to Beta, the CPU Manager
// "none" policy is ON by default. But when we set the CPU Manager policy to
@ -237,11 +247,9 @@ func configureCPUManagerInKubelet(f *framework.Framework, cleanStateFile bool, r
deleteStateFile()
}
// Set the CPU Manager policy to static.
newCfg.CPUManagerPolicy = string(cpumanager.PolicyStatic)
// Set the CPU Manager reconcile period to 1 second.
newCfg.CPUManagerPolicy = policyName
newCfg.CPUManagerReconcilePeriod = metav1.Duration{Duration: 1 * time.Second}
newCfg.CPUManagerPolicyOptions = options
if reservedSystemCPUs.Size() > 0 {
cpus := reservedSystemCPUs.String()
@ -272,43 +280,37 @@ func configureCPUManagerInKubelet(f *framework.Framework, cleanStateFile bool, r
return oldCfg
}
func runGuPodTest(f *framework.Framework) {
var ctnAttrs []ctnAttribute
var cpu1 int
var err error
var cpuList []int
func runGuPodTest(f *framework.Framework, cpuCount int) {
var pod *v1.Pod
var expAllowedCPUsListRegex string
ctnAttrs = []ctnAttribute{
ctnAttrs := []ctnAttribute{
{
ctnName: "gu-container",
cpuRequest: "1000m",
cpuLimit: "1000m",
cpuRequest: fmt.Sprintf("%dm", 1000*cpuCount),
cpuLimit: fmt.Sprintf("%dm", 1000*cpuCount),
},
}
pod = makeCPUManagerPod("gu-pod", ctnAttrs)
pod = f.PodClient().CreateSync(pod)
ginkgo.By("checking if the expected cpuset was assigned")
cpu1 = 1
if isHTEnabled() {
cpuList = cpuset.MustParse(getCPUSiblingList(0)).ToSlice()
cpu1 = cpuList[1]
} else if isMultiNUMA() {
cpuList = cpuset.MustParse(getCoreSiblingList(0)).ToSlice()
if len(cpuList) > 1 {
cpu1 = cpuList[1]
}
// any full CPU is fine - we cannot, nor should we, predict which one, though
for _, cnt := range pod.Spec.Containers {
ginkgo.By(fmt.Sprintf("validating the container %s on Gu pod %s", cnt.Name, pod.Name))
logs, err := e2epod.GetPodLogs(f.ClientSet, f.Namespace.Name, pod.Name, cnt.Name)
framework.ExpectNoError(err, "expected log not found in container [%s] of pod [%s]", cnt.Name, pod.Name)
framework.Logf("got pod logs: %v", logs)
cpus, err := cpuset.Parse(strings.TrimSpace(logs))
framework.ExpectNoError(err, "parsing cpuset from logs for [%s] of pod [%s]", cnt.Name, pod.Name)
framework.ExpectEqual(cpus.Size(), cpuCount, "expected cpu set size == %d, got %q", cpuCount, cpus.String())
}
expAllowedCPUsListRegex = fmt.Sprintf("^%d\n$", cpu1)
err = f.PodClient().MatchContainerOutput(pod.Name, pod.Spec.Containers[0].Name, expAllowedCPUsListRegex)
framework.ExpectNoError(err, "expected log not found in container [%s] of pod [%s]",
pod.Spec.Containers[0].Name, pod.Name)
ginkgo.By("by deleting the pods and waiting for container removal")
deletePods(f, []string{pod.Name})
waitForContainerRemoval(pod.Spec.Containers[0].Name, pod.Name, pod.Namespace)
waitForAllContainerRemoval(pod.Name, pod.Namespace)
}
func runNonGuPodTest(f *framework.Framework, cpuCap int64) {
@ -586,13 +588,13 @@ func runCPUManagerTests(f *framework.Framework) {
}
// Enable CPU Manager in the kubelet.
oldCfg = enableCPUManagerInKubelet(f, true)
oldCfg = configureCPUManagerInKubelet(f, string(cpumanager.PolicyStatic), true, cpuset.CPUSet{}, false, nil)
ginkgo.By("running a non-Gu pod")
runNonGuPodTest(f, cpuCap)
ginkgo.By("running a Gu pod")
runGuPodTest(f)
runGuPodTest(f, 1)
ginkgo.By("running multiple Gu and non-Gu pods")
runMultipleGuNonGuPods(f, cpuCap, cpuAlloc)
@ -653,18 +655,150 @@ func runCPUManagerTests(f *framework.Framework) {
waitForContainerRemoval(pod.Spec.Containers[0].Name, pod.Name, pod.Namespace)
ginkgo.By("enable cpu manager in kubelet without delete state file")
enableCPUManagerInKubelet(f, false)
configureCPUManagerInKubelet(f, string(cpumanager.PolicyStatic), false, cpuset.CPUSet{}, false, nil)
ginkgo.By("wait for the deleted pod to be cleaned up from the state file")
waitForStateFileCleanedUp()
ginkgo.By("the deleted pod has already been deleted from the state file")
})
ginkgo.It("should assign CPUs as expected with enhanced policy based on strict SMT alignment", func() {
fullCPUsOnlyOpt := fmt.Sprintf("option=%s", cpumanager.FullPCPUsOnlyOption)
_, cpuAlloc, _ = getLocalNodeCPUDetails(f)
smtLevel := getSMTLevel()
// strict SMT alignment is trivially verified and granted on non-SMT systems
if smtLevel < 2 {
e2eskipper.Skipf("Skipping CPU Manager %s tests since SMT disabled", fullCPUsOnlyOpt)
}
// our tests want to allocate a full core, so we need at least 2*2=4 virtual cpus
if cpuAlloc < int64(smtLevel*2) {
e2eskipper.Skipf("Skipping CPU Manager %s tests since the CPU capacity < 4", fullCPUsOnlyOpt)
}
framework.Logf("SMT level %d", smtLevel)
cleanStateFile := true
// TODO: we assume the first available CPUID is 0, which is pretty fair, but we should probably
// check what we do have in the node.
cpuPolicyOptions := map[string]string{
cpumanager.FullPCPUsOnlyOption: "true",
}
oldCfg = configureCPUManagerInKubelet(f, string(cpumanager.PolicyStatic), cleanStateFile, cpuset.NewCPUSet(0), true, cpuPolicyOptions)
// the order between negative and positive doesn't really matter
runSMTAlignmentNegativeTests(f)
runSMTAlignmentPositiveTests(f, smtLevel)
})
ginkgo.AfterEach(func() {
setOldKubeletConfig(f, oldCfg)
})
}
func runSMTAlignmentNegativeTests(f *framework.Framework) {
// negative test: try to run a container whose requests aren't a multiple of SMT level, expect a rejection
ctnAttrs := []ctnAttribute{
{
ctnName: "gu-container-neg",
cpuRequest: "1000m",
cpuLimit: "1000m",
},
}
pod := makeCPUManagerPod("gu-pod", ctnAttrs)
// CreateSync would wait for pod to become Ready - which will never happen if production code works as intended!
pod = f.PodClient().Create(pod)
err := e2epod.WaitForPodCondition(f.ClientSet, f.Namespace.Name, pod.Name, "Failed", 30*time.Second, func(pod *v1.Pod) (bool, error) {
if pod.Status.Phase != v1.PodPending {
return true, nil
}
return false, nil
})
framework.ExpectNoError(err)
pod, err = f.PodClient().Get(context.TODO(), pod.Name, metav1.GetOptions{})
framework.ExpectNoError(err)
if pod.Status.Phase != v1.PodFailed {
framework.Failf("pod %s not failed: %v", pod.Name, pod.Status)
}
if !isSMTAlignmentError(pod) {
framework.Failf("pod %s failed for wrong reason: %q", pod.Name, pod.Status.Reason)
}
deletePodSyncByName(f, pod.Name)
// we need to wait for all containers to really be gone so cpumanager reconcile loop will not rewrite the cpu_manager_state.
// this is in turn needed because we will have an unavoidable (in the current framework) race with the
// reconcile loop, which will make our attempt to delete the state file and to restore the old config go haywire
waitForAllContainerRemoval(pod.Name, pod.Namespace)
}
func runSMTAlignmentPositiveTests(f *framework.Framework, smtLevel int) {
// positive test: try to run a container whose requests are a multiple of SMT level, check allocated cores
// 1. are core siblings
// 2. take a full core
// WARNING: this assumes 2-way SMT systems - we don't know how to access other SMT levels.
// this means on more-than-2-way SMT systems this test will prove nothing
ctnAttrs := []ctnAttribute{
{
ctnName: "gu-container-pos",
cpuRequest: "2000m",
cpuLimit: "2000m",
},
}
pod := makeCPUManagerPod("gu-pod", ctnAttrs)
pod = f.PodClient().CreateSync(pod)
for _, cnt := range pod.Spec.Containers {
ginkgo.By(fmt.Sprintf("validating the container %s on Gu pod %s", cnt.Name, pod.Name))
logs, err := e2epod.GetPodLogs(f.ClientSet, f.Namespace.Name, pod.Name, cnt.Name)
framework.ExpectNoError(err, "expected log not found in container [%s] of pod [%s]", cnt.Name, pod.Name)
framework.Logf("got pod logs: %v", logs)
cpus, err := cpuset.Parse(strings.TrimSpace(logs))
framework.ExpectNoError(err, "parsing cpuset from logs for [%s] of pod [%s]", cnt.Name, pod.Name)
validateSMTAlignment(cpus, smtLevel, pod, &cnt)
}
deletePodSyncByName(f, pod.Name)
// we need to wait for all containers to really be gone so cpumanager reconcile loop will not rewrite the cpu_manager_state.
// this is in turn needed because we will have an unavoidable (in the current framework) race with the
// reconcile loop, which will make our attempt to delete the state file and to restore the old config go haywire
waitForAllContainerRemoval(pod.Name, pod.Namespace)
}
func validateSMTAlignment(cpus cpuset.CPUSet, smtLevel int, pod *v1.Pod, cnt *v1.Container) {
framework.Logf("validating cpus: %v", cpus)
if cpus.Size()%smtLevel != 0 {
framework.Failf("pod %q cnt %q received non-smt-multiple cpuset %v (SMT level %d)", pod.Name, cnt.Name, cpus, smtLevel)
}
// now check all the given cpus are thread siblings.
// the easiest way to do so is to rebuild the expected set of siblings from all the cpus we got.
// if the expected set matches the given set, the given set was good.
b := cpuset.NewBuilder()
for _, cpuID := range cpus.ToSliceNoSort() {
threadSiblings, err := cpuset.Parse(strings.TrimSpace(getCPUSiblingList(int64(cpuID))))
framework.ExpectNoError(err, "parsing cpuset from logs for [%s] of pod [%s]", cnt.Name, pod.Name)
b.Add(threadSiblings.ToSliceNoSort()...)
}
siblingsCPUs := b.Result()
framework.Logf("siblings cpus: %v", siblingsCPUs)
if !siblingsCPUs.Equals(cpus) {
framework.Failf("pod %q cnt %q received non-smt-aligned cpuset %v (expected %v)", pod.Name, cnt.Name, cpus, siblingsCPUs)
}
}
func isSMTAlignmentError(pod *v1.Pod) bool {
re := regexp.MustCompile(`SMT.*Alignment.*Error`)
return re.MatchString(pod.Status.Reason)
}
// Serial because the test updates kubelet configuration.
var _ = SIGDescribe("CPU Manager [Serial] [Feature:CPUManager]", func() {
f := framework.NewDefaultFramework("cpu-manager-test")

View File

@ -372,7 +372,7 @@ func runTopologyManagerPolicySuiteTests(f *framework.Framework) {
runNonGuPodTest(f, cpuCap)
ginkgo.By("running a Gu pod")
runGuPodTest(f)
runGuPodTest(f, 1)
ginkgo.By("running multiple Gu and non-Gu pods")
runMultipleGuNonGuPods(f, cpuCap, cpuAlloc)
@ -596,10 +596,7 @@ func teardownSRIOVConfigOrFail(f *framework.Framework, sd *sriovData) {
}
func runTMScopeResourceAlignmentTestSuite(f *framework.Framework, configMap *v1.ConfigMap, reservedSystemCPUs, policy string, numaNodes, coreCount int) {
threadsPerCore := 1
if isHTEnabled() {
threadsPerCore = 2
}
threadsPerCore := getSMTLevel()
sd := setupSRIOVConfigOrFail(f, configMap)
var ctnAttrs, initCtnAttrs []tmCtnAttribute
@ -699,10 +696,7 @@ func runTMScopeResourceAlignmentTestSuite(f *framework.Framework, configMap *v1.
}
func runTopologyManagerNodeAlignmentSuiteTests(f *framework.Framework, sd *sriovData, reservedSystemCPUs, policy string, numaNodes, coreCount int) {
threadsPerCore := 1
if isHTEnabled() {
threadsPerCore = 2
}
threadsPerCore := getSMTLevel()
waitForSRIOVResources(f, sd)