diff --git a/cmd/kubelet/app/options/options.go b/cmd/kubelet/app/options/options.go index 5f5fa99a94e..3d684e6e8e5 100644 --- a/cmd/kubelet/app/options/options.go +++ b/cmd/kubelet/app/options/options.go @@ -492,6 +492,7 @@ func AddKubeletConfigFlags(mainfs *pflag.FlagSet, c *kubeletconfig.KubeletConfig fs.StringVar(&c.CgroupDriver, "cgroup-driver", c.CgroupDriver, "Driver that the kubelet uses to manipulate cgroups on the host. Possible values: 'cgroupfs', 'systemd'") fs.StringVar(&c.CgroupRoot, "cgroup-root", c.CgroupRoot, "Optional root cgroup to use for pods. This is handled by the container runtime on a best effort basis. Default: '', which means use the container runtime default.") fs.StringVar(&c.CPUManagerPolicy, "cpu-manager-policy", c.CPUManagerPolicy, "CPU Manager policy to use. Possible values: 'none', 'static'.") + fs.Var(cliflag.NewMapStringStringNoSplit(&c.CPUManagerPolicyOptions), "cpu-manager-policy-options", "A set of key=value CPU Manager policy options to use, to fine-tune their behaviour. If not supplied, keep the default behaviour.") fs.DurationVar(&c.CPUManagerReconcilePeriod.Duration, "cpu-manager-reconcile-period", c.CPUManagerReconcilePeriod.Duration, " CPU Manager reconciliation period. Examples: '10s', or '1m'. If not supplied, defaults to 'NodeStatusUpdateFrequency'") fs.Var(cliflag.NewMapStringString(&c.QOSReserved), "qos-reserved", " A set of ResourceName=Percentage (e.g. memory=50%) pairs that describe how pod resource requests are reserved at the QoS level. Currently only memory is supported. Requires the QOSReserved feature gate to be enabled.") fs.StringVar(&c.TopologyManagerPolicy, "topology-manager-policy", c.TopologyManagerPolicy, "Topology Manager policy to use. Possible values: 'none', 'best-effort', 'restricted', 'single-numa-node'.") diff --git a/cmd/kubelet/app/server.go b/cmd/kubelet/app/server.go index 64e39e3322d..34bac9c53a9 100644 --- a/cmd/kubelet/app/server.go +++ b/cmd/kubelet/app/server.go @@ -732,6 +732,16 @@ func run(ctx context.Context, s *options.KubeletServer, kubeDeps *kubelet.Depend devicePluginEnabled := utilfeature.DefaultFeatureGate.Enabled(features.DevicePlugins) + var cpuManagerPolicyOptions map[string]string + if utilfeature.DefaultFeatureGate.Enabled(features.CPUManager) { + if utilfeature.DefaultFeatureGate.Enabled(features.CPUManagerPolicyOptions) { + cpuManagerPolicyOptions = s.CPUManagerPolicyOptions + } else if s.CPUManagerPolicyOptions != nil { + return fmt.Errorf("CPU Manager policy options %v require feature gates %q, %q enabled", + s.CPUManagerPolicyOptions, features.CPUManager, features.CPUManagerPolicyOptions) + } + } + kubeDeps.ContainerManager, err = cm.NewContainerManager( kubeDeps.Mounter, kubeDeps.CAdvisorInterface, @@ -756,6 +766,7 @@ func run(ctx context.Context, s *options.KubeletServer, kubeDeps *kubelet.Depend }, QOSReserved: *experimentalQOSReserved, ExperimentalCPUManagerPolicy: s.CPUManagerPolicy, + ExperimentalCPUManagerPolicyOptions: cpuManagerPolicyOptions, ExperimentalCPUManagerReconcilePeriod: s.CPUManagerReconcilePeriod.Duration, ExperimentalMemoryManagerPolicy: s.MemoryManagerPolicy, ExperimentalMemoryManagerReservedMemory: s.ReservedMemory, diff --git a/pkg/features/kube_features.go b/pkg/features/kube_features.go index 841686d615a..7bc5d0de2ac 100644 --- a/pkg/features/kube_features.go +++ b/pkg/features/kube_features.go @@ -789,6 +789,12 @@ const ( // // Enables kubelet to support memory QoS with cgroups v2.
MemoryQoS featuregate.Feature = "MemoryQoS" + + // owner: @fromanirh + // alpha: v1.22 + // + // Allow fine-tuning of cpumanager policies + CPUManagerPolicyOptions featuregate.Feature = "CPUManagerPolicyOptions" ) func init() { @@ -906,6 +912,7 @@ var defaultKubernetesFeatureGates = map[featuregate.Feature]featuregate.FeatureS DelegateFSGroupToCSIDriver: {Default: false, PreRelease: featuregate.Alpha}, KubeletInUserNamespace: {Default: false, PreRelease: featuregate.Alpha}, MemoryQoS: {Default: false, PreRelease: featuregate.Alpha}, + CPUManagerPolicyOptions: {Default: false, PreRelease: featuregate.Alpha}, // inherited features from generic apiserver, relisted here to get a conflict if it is changed // unintentionally on either side: diff --git a/pkg/kubelet/apis/config/fuzzer/fuzzer.go b/pkg/kubelet/apis/config/fuzzer/fuzzer.go index 39da70b8305..5d7ebb9cd58 100644 --- a/pkg/kubelet/apis/config/fuzzer/fuzzer.go +++ b/pkg/kubelet/apis/config/fuzzer/fuzzer.go @@ -70,6 +70,7 @@ func Funcs(codecs runtimeserializer.CodecFactory) []interface{} { obj.NodeStatusReportFrequency = metav1.Duration{Duration: time.Minute} obj.NodeLeaseDurationSeconds = 40 obj.CPUManagerPolicy = "none" + obj.CPUManagerPolicyOptions = make(map[string]string) obj.CPUManagerReconcilePeriod = obj.NodeStatusUpdateFrequency obj.NodeStatusMaxImages = 50 obj.TopologyManagerPolicy = kubeletconfig.NoneTopologyManagerPolicy diff --git a/pkg/kubelet/apis/config/helpers_test.go b/pkg/kubelet/apis/config/helpers_test.go index 442a080ad97..b30613da572 100644 --- a/pkg/kubelet/apis/config/helpers_test.go +++ b/pkg/kubelet/apis/config/helpers_test.go @@ -150,6 +150,7 @@ var ( "CPUCFSQuota", "CPUCFSQuotaPeriod.Duration", "CPUManagerPolicy", + "CPUManagerPolicyOptions[*]", "CPUManagerReconcilePeriod.Duration", "TopologyManagerPolicy", "TopologyManagerScope", diff --git a/pkg/kubelet/apis/config/types.go b/pkg/kubelet/apis/config/types.go index fce1f3ad367..85b84b50638 100644 --- a/pkg/kubelet/apis/config/types.go +++ b/pkg/kubelet/apis/config/types.go @@ -221,6 +221,10 @@ type KubeletConfiguration struct { // CPUManagerPolicy is the name of the policy to use. // Requires the CPUManager feature gate to be enabled. CPUManagerPolicy string + // CPUManagerPolicyOptions is a set of key=value pairs which allow setting extra options + // to fine-tune the behaviour of the cpu manager policies. + // Requires both the "CPUManager" and "CPUManagerPolicyOptions" feature gates to be enabled. + CPUManagerPolicyOptions map[string]string // CPU Manager reconciliation period. // Requires the CPUManager feature gate to be enabled.
CPUManagerReconcilePeriod metav1.Duration diff --git a/pkg/kubelet/apis/config/v1beta1/zz_generated.conversion.go b/pkg/kubelet/apis/config/v1beta1/zz_generated.conversion.go index f7a8c4b5d43..141b901a802 100644 --- a/pkg/kubelet/apis/config/v1beta1/zz_generated.conversion.go +++ b/pkg/kubelet/apis/config/v1beta1/zz_generated.conversion.go @@ -294,6 +294,7 @@ func autoConvert_v1beta1_KubeletConfiguration_To_config_KubeletConfiguration(in } out.CgroupDriver = in.CgroupDriver out.CPUManagerPolicy = in.CPUManagerPolicy + out.CPUManagerPolicyOptions = *(*map[string]string)(unsafe.Pointer(&in.CPUManagerPolicyOptions)) out.CPUManagerReconcilePeriod = in.CPUManagerReconcilePeriod out.MemoryManagerPolicy = in.MemoryManagerPolicy out.TopologyManagerPolicy = in.TopologyManagerPolicy @@ -464,6 +465,7 @@ func autoConvert_config_KubeletConfiguration_To_v1beta1_KubeletConfiguration(in } out.CgroupDriver = in.CgroupDriver out.CPUManagerPolicy = in.CPUManagerPolicy + out.CPUManagerPolicyOptions = *(*map[string]string)(unsafe.Pointer(&in.CPUManagerPolicyOptions)) out.CPUManagerReconcilePeriod = in.CPUManagerReconcilePeriod out.MemoryManagerPolicy = in.MemoryManagerPolicy out.TopologyManagerPolicy = in.TopologyManagerPolicy diff --git a/pkg/kubelet/apis/config/zz_generated.deepcopy.go b/pkg/kubelet/apis/config/zz_generated.deepcopy.go index 2c1b67744ed..67dba19b020 100644 --- a/pkg/kubelet/apis/config/zz_generated.deepcopy.go +++ b/pkg/kubelet/apis/config/zz_generated.deepcopy.go @@ -201,6 +201,13 @@ func (in *KubeletConfiguration) DeepCopyInto(out *KubeletConfiguration) { out.NodeStatusReportFrequency = in.NodeStatusReportFrequency out.ImageMinimumGCAge = in.ImageMinimumGCAge out.VolumeStatsAggPeriod = in.VolumeStatsAggPeriod + if in.CPUManagerPolicyOptions != nil { + in, out := &in.CPUManagerPolicyOptions, &out.CPUManagerPolicyOptions + *out = make(map[string]string, len(*in)) + for key, val := range *in { + (*out)[key] = val + } + } out.CPUManagerReconcilePeriod = in.CPUManagerReconcilePeriod if in.QOSReserved != nil { in, out := &in.QOSReserved, &out.QOSReserved diff --git a/pkg/kubelet/cm/admission/errors.go b/pkg/kubelet/cm/admission/errors.go new file mode 100644 index 00000000000..5e549220394 --- /dev/null +++ b/pkg/kubelet/cm/admission/errors.go @@ -0,0 +1,62 @@ +/* +Copyright 2021 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/ + +package admission + +import ( + "errors" + "fmt" + + "k8s.io/kubernetes/pkg/kubelet/lifecycle" +) + +const ( + ErrorReasonUnexpected = "UnexpectedAdmissionError" +) + +type Error interface { + Error() string + Type() string +} + +type unexpectedAdmissionError struct{ Err error } + +var _ Error = (*unexpectedAdmissionError)(nil) + +func (e *unexpectedAdmissionError) Error() string { + return fmt.Sprintf("Allocate failed due to %v, which is unexpected", e.Err) +} + +func (e *unexpectedAdmissionError) Type() string { + return ErrorReasonUnexpected +} + +func GetPodAdmitResult(err error) lifecycle.PodAdmitResult { + if err == nil { + return lifecycle.PodAdmitResult{Admit: true} + } + + var admissionErr Error + if !errors.As(err, &admissionErr) { + admissionErr = &unexpectedAdmissionError{err} + } + + return lifecycle.PodAdmitResult{ + Message: admissionErr.Error(), + Reason: admissionErr.Type(), + Admit: false, + } +} diff --git a/pkg/kubelet/cm/admission/errors_test.go b/pkg/kubelet/cm/admission/errors_test.go new file mode 100644 index 00000000000..d18eb4a8ed4 --- /dev/null +++ b/pkg/kubelet/cm/admission/errors_test.go @@ -0,0 +1,87 @@ +/* +Copyright 2021 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package admission + +import ( + "errors" + "testing" +) + +type TestAdmissionError struct { + message string + reason string +} + +func (e *TestAdmissionError) Error() string { + return e.message +} + +func (e *TestAdmissionError) Type() string { + return e.reason +} + +func TestAdmissionErrors(t *testing.T) { + testCases := []struct { + Error error + expectedAdmissionError bool + }{ + { + nil, + false, + }, + { + errors.New("Not an AdmissionError error"), + false, + }, + { + &TestAdmissionError{ + "Is an AdmissionError error", + "TestAdmissionError", + }, + true, + }, + } + + for _, tc := range testCases { + h := GetPodAdmitResult(tc.Error) + if tc.Error == nil { + if !h.Admit { + t.Errorf("expected PodAdmitResult.Admit = true") + } + continue + } + + if h.Admit { + t.Errorf("expected PodAdmitResult.Admit = false") + } + + if tc.expectedAdmissionError { + err, ok := tc.Error.(*TestAdmissionError) + if !ok { + t.Errorf("expected TestAdmissionError") + } + if h.Reason != err.reason { + t.Errorf("expected PodAdmitResult.Reason = %v, got %v", err.reason, h.Reason) + } + continue + } + + if h.Reason != ErrorReasonUnexpected { + t.Errorf("expected PodAdmitResult.Reason = %v, got %v", ErrorReasonUnexpected, h.Reason) + } + } +} diff --git a/pkg/kubelet/cm/container_manager.go b/pkg/kubelet/cm/container_manager.go index 73e9b88c03d..5dbe856094c 100644 --- a/pkg/kubelet/cm/container_manager.go +++ b/pkg/kubelet/cm/container_manager.go @@ -134,6 +134,7 @@ type NodeConfig struct { NodeAllocatableConfig QOSReserved map[v1.ResourceName]int64 ExperimentalCPUManagerPolicy string + ExperimentalCPUManagerPolicyOptions map[string]string ExperimentalTopologyManagerScope string ExperimentalCPUManagerReconcilePeriod time.Duration ExperimentalMemoryManagerPolicy string diff --git 
a/pkg/kubelet/cm/container_manager_linux.go b/pkg/kubelet/cm/container_manager_linux.go index b3a4ce786fc..31a728b7291 100644 --- a/pkg/kubelet/cm/container_manager_linux.go +++ b/pkg/kubelet/cm/container_manager_linux.go @@ -52,6 +52,7 @@ import ( podresourcesapi "k8s.io/kubelet/pkg/apis/podresources/v1" kubefeatures "k8s.io/kubernetes/pkg/features" "k8s.io/kubernetes/pkg/kubelet/cadvisor" + "k8s.io/kubernetes/pkg/kubelet/cm/admission" "k8s.io/kubernetes/pkg/kubelet/cm/containermap" "k8s.io/kubernetes/pkg/kubelet/cm/cpumanager" "k8s.io/kubernetes/pkg/kubelet/cm/devicemanager" @@ -332,6 +333,7 @@ func NewContainerManager(mountUtil mount.Interface, cadvisorInterface cadvisor.I if utilfeature.DefaultFeatureGate.Enabled(kubefeatures.CPUManager) { cm.cpuManager, err = cpumanager.NewManager( nodeConfig.ExperimentalCPUManagerPolicy, + nodeConfig.ExperimentalCPUManagerPolicyOptions, nodeConfig.ExperimentalCPUManagerReconcilePeriod, machineInfo, nodeConfig.NodeAllocatableConfig.ReservedSystemCPUs, @@ -757,37 +759,25 @@ func (m *resourceAllocator) Admit(attrs *lifecycle.PodAdmitAttributes) lifecycle for _, container := range append(pod.Spec.InitContainers, pod.Spec.Containers...) { err := m.deviceManager.Allocate(pod, &container) if err != nil { - return lifecycle.PodAdmitResult{ - Message: fmt.Sprintf("Allocate failed due to %v, which is unexpected", err), - Reason: "UnexpectedAdmissionError", - Admit: false, - } + return admission.GetPodAdmitResult(err) } if m.cpuManager != nil { err = m.cpuManager.Allocate(pod, &container) if err != nil { - return lifecycle.PodAdmitResult{ - Message: fmt.Sprintf("Allocate failed due to %v, which is unexpected", err), - Reason: "UnexpectedAdmissionError", - Admit: false, - } + return admission.GetPodAdmitResult(err) } } if m.memoryManager != nil { err = m.memoryManager.Allocate(pod, &container) if err != nil { - return lifecycle.PodAdmitResult{ - Message: fmt.Sprintf("Allocate failed due to %v, which is unexpected", err), - Reason: "UnexpectedAdmissionError", - Admit: false, - } + return admission.GetPodAdmitResult(err) } } } - return lifecycle.PodAdmitResult{Admit: true} + return admission.GetPodAdmitResult(nil) } func (cm *containerManagerImpl) SystemCgroupsLimit() v1.ResourceList { diff --git a/pkg/kubelet/cm/container_manager_windows.go b/pkg/kubelet/cm/container_manager_windows.go index 1bc246c94b1..17fa16cdd9f 100644 --- a/pkg/kubelet/cm/container_manager_windows.go +++ b/pkg/kubelet/cm/container_manager_windows.go @@ -35,6 +35,7 @@ import ( podresourcesapi "k8s.io/kubelet/pkg/apis/podresources/v1" kubefeatures "k8s.io/kubernetes/pkg/features" "k8s.io/kubernetes/pkg/kubelet/cadvisor" + "k8s.io/kubernetes/pkg/kubelet/cm/admission" "k8s.io/kubernetes/pkg/kubelet/cm/cpumanager" "k8s.io/kubernetes/pkg/kubelet/cm/devicemanager" "k8s.io/kubernetes/pkg/kubelet/cm/memorymanager" @@ -63,9 +64,7 @@ type containerManagerImpl struct { type noopWindowsResourceAllocator struct{} func (ra *noopWindowsResourceAllocator) Admit(attrs *lifecycle.PodAdmitAttributes) lifecycle.PodAdmitResult { - return lifecycle.PodAdmitResult{ - Admit: true, - } + return admission.GetPodAdmitResult(nil) } func (cm *containerManagerImpl) Start(node *v1.Node, diff --git a/pkg/kubelet/cm/cpumanager/cpu_manager.go b/pkg/kubelet/cm/cpumanager/cpu_manager.go index 5fd0ec9275f..ec793cb5bb1 100644 --- a/pkg/kubelet/cm/cpumanager/cpu_manager.go +++ b/pkg/kubelet/cm/cpumanager/cpu_manager.go @@ -143,17 +143,20 @@ func (s *sourcesReadyStub) AddSource(source string) {} func (s *sourcesReadyStub) 
AllReady() bool { return true } // NewManager creates new cpu manager based on provided policy -func NewManager(cpuPolicyName string, reconcilePeriod time.Duration, machineInfo *cadvisorapi.MachineInfo, specificCPUs cpuset.CPUSet, nodeAllocatableReservation v1.ResourceList, stateFileDirectory string, affinity topologymanager.Store) (Manager, error) { +func NewManager(cpuPolicyName string, cpuPolicyOptions map[string]string, reconcilePeriod time.Duration, machineInfo *cadvisorapi.MachineInfo, specificCPUs cpuset.CPUSet, nodeAllocatableReservation v1.ResourceList, stateFileDirectory string, affinity topologymanager.Store) (Manager, error) { var topo *topology.CPUTopology var policy Policy + var err error switch policyName(cpuPolicyName) { case PolicyNone: - policy = NewNonePolicy() + policy, err = NewNonePolicy(cpuPolicyOptions) + if err != nil { + return nil, fmt.Errorf("new none policy error: %w", err) + } case PolicyStatic: - var err error topo, err = topology.Discover(machineInfo) if err != nil { return nil, err @@ -178,9 +181,9 @@ func NewManager(cpuPolicyName string, reconcilePeriod time.Duration, machineInfo // exclusively allocated. reservedCPUsFloat := float64(reservedCPUs.MilliValue()) / 1000 numReservedCPUs := int(math.Ceil(reservedCPUsFloat)) - policy, err = NewStaticPolicy(topo, numReservedCPUs, specificCPUs, affinity) + policy, err = NewStaticPolicy(topo, numReservedCPUs, specificCPUs, affinity, cpuPolicyOptions) if err != nil { - return nil, fmt.Errorf("new static policy error: %v", err) + return nil, fmt.Errorf("new static policy error: %w", err) } default: diff --git a/pkg/kubelet/cm/cpumanager/cpu_manager_test.go b/pkg/kubelet/cm/cpumanager/cpu_manager_test.go index 58ff1e56e44..dc818cb0f58 100644 --- a/pkg/kubelet/cm/cpumanager/cpu_manager_test.go +++ b/pkg/kubelet/cm/cpumanager/cpu_manager_test.go @@ -229,7 +229,8 @@ func TestCPUManagerAdd(t *testing.T) { }, 0, cpuset.NewCPUSet(), - topologymanager.NewFakeManager()) + topologymanager.NewFakeManager(), + nil) testCases := []struct { description string updateErr error @@ -479,7 +480,7 @@ func TestCPUManagerAddWithInitContainers(t *testing.T) { } for _, testCase := range testCases { - policy, _ := NewStaticPolicy(testCase.topo, testCase.numReservedCPUs, cpuset.NewCPUSet(), topologymanager.NewFakeManager()) + policy, _ := NewStaticPolicy(testCase.topo, testCase.numReservedCPUs, cpuset.NewCPUSet(), topologymanager.NewFakeManager(), nil) mockState := &mockState{ assignments: testCase.stAssignments, @@ -634,7 +635,7 @@ func TestCPUManagerGenerate(t *testing.T) { } defer os.RemoveAll(sDir) - mgr, err := NewManager(testCase.cpuPolicyName, 5*time.Second, machineInfo, cpuset.NewCPUSet(), testCase.nodeAllocatableReservation, sDir, topologymanager.NewFakeManager()) + mgr, err := NewManager(testCase.cpuPolicyName, nil, 5*time.Second, machineInfo, cpuset.NewCPUSet(), testCase.nodeAllocatableReservation, sDir, topologymanager.NewFakeManager()) if testCase.expectedError != nil { if !strings.Contains(err.Error(), testCase.expectedError.Error()) { t.Errorf("Unexpected error message. 
Have: %s wants %s", err.Error(), testCase.expectedError.Error()) @@ -1004,7 +1005,8 @@ func TestCPUManagerAddWithResvList(t *testing.T) { }, 1, cpuset.NewCPUSet(0), - topologymanager.NewFakeManager()) + topologymanager.NewFakeManager(), + nil) testCases := []struct { description string updateErr error @@ -1061,3 +1063,69 @@ } } } + +func TestCPUManagerHandlePolicyOptions(t *testing.T) { + testCases := []struct { + description string + cpuPolicyName string + cpuPolicyOptions map[string]string + expectedError error + }{ + { + description: "options to none policy", + cpuPolicyName: "none", + cpuPolicyOptions: map[string]string{ + FullPCPUsOnlyOption: "true", + }, + expectedError: fmt.Errorf("received unsupported options"), + }, + } + + // any correct realistic topology is fine. We pick a simple one. + mockedMachineInfo := cadvisorapi.MachineInfo{ + NumCores: 4, + Topology: []cadvisorapi.Node{ + { + Cores: []cadvisorapi.Core{ + { + Id: 0, + Threads: []int{0}, + }, + { + Id: 1, + Threads: []int{1}, + }, + { + Id: 2, + Threads: []int{2}, + }, + { + Id: 3, + Threads: []int{3}, + }, + }, + }, + }, + } + + for _, testCase := range testCases { + t.Run(testCase.description, func(t *testing.T) { + machineInfo := &mockedMachineInfo + nodeAllocatableReservation := v1.ResourceList{} + sDir, err := ioutil.TempDir("/tmp/", "cpu_manager_test") + if err != nil { + t.Fatalf("cannot create state file: %s", err.Error()) + } + defer os.RemoveAll(sDir) + + _, err = NewManager(testCase.cpuPolicyName, testCase.cpuPolicyOptions, 5*time.Second, machineInfo, cpuset.NewCPUSet(), nodeAllocatableReservation, sDir, topologymanager.NewFakeManager()) + if err == nil { + t.Fatalf("Expected error, but NewManager succeeded") + } + if !strings.Contains(err.Error(), testCase.expectedError.Error()) { + t.Errorf("Unexpected error message. Have: %s wants %s", err.Error(), testCase.expectedError.Error()) + } + }) + + } +} diff --git a/pkg/kubelet/cm/cpumanager/fake_cpu_manager.go b/pkg/kubelet/cm/cpumanager/fake_cpu_manager.go index 2c38b52b374..28578e6415d 100644 --- a/pkg/kubelet/cm/cpumanager/fake_cpu_manager.go +++ b/pkg/kubelet/cm/cpumanager/fake_cpu_manager.go @@ -38,7 +38,8 @@ func (m *fakeManager) Start(activePods ActivePodsFunc, sourcesReady config.Sourc func (m *fakeManager) Policy() Policy { klog.InfoS("Policy()") - return NewNonePolicy() + pol, _ := NewNonePolicy(nil) + return pol } func (m *fakeManager) Allocate(pod *v1.Pod, container *v1.Container) error { diff --git a/pkg/kubelet/cm/cpumanager/policy_none.go b/pkg/kubelet/cm/cpumanager/policy_none.go index 345d4c14d6d..1e35f6a094e 100644 --- a/pkg/kubelet/cm/cpumanager/policy_none.go +++ b/pkg/kubelet/cm/cpumanager/policy_none.go @@ -17,6 +17,8 @@ limitations under the License.
package cpumanager import ( + "fmt" + "k8s.io/api/core/v1" "k8s.io/klog/v2" "k8s.io/kubernetes/pkg/kubelet/cm/cpumanager/state" @@ -32,8 +34,11 @@ var _ Policy = &nonePolicy{} const PolicyNone policyName = "none" // NewNonePolicy returns a cpuset manager policy that does nothing -func NewNonePolicy() Policy { - return &nonePolicy{} +func NewNonePolicy(cpuPolicyOptions map[string]string) (Policy, error) { + if len(cpuPolicyOptions) > 0 { + return nil, fmt.Errorf("None policy: received unsupported options=%v", cpuPolicyOptions) + } + return &nonePolicy{}, nil } func (p *nonePolicy) Name() string { diff --git a/pkg/kubelet/cm/cpumanager/policy_none_test.go b/pkg/kubelet/cm/cpumanager/policy_none_test.go index 97127971096..1dcd00bd3e9 100644 --- a/pkg/kubelet/cm/cpumanager/policy_none_test.go +++ b/pkg/kubelet/cm/cpumanager/policy_none_test.go @@ -86,3 +86,20 @@ func TestNonePolicyGetAllocatableCPUs(t *testing.T) { t.Errorf("NonePolicy GetAllocatableCPUs() error. expected empty set, returned: %v", cpus) } } + +func TestNonePolicyOptions(t *testing.T) { + var err error + + _, err = NewNonePolicy(nil) + if err != nil { + t.Errorf("NewNonePolicy with nil options failure. expected no error but got: %v", err) + } + + opts := map[string]string{ + FullPCPUsOnlyOption: "true", + } + _, err = NewNonePolicy(opts) + if err == nil { + t.Errorf("NewNonePolicy with (any) options failure. expected error but got none") + } +} diff --git a/pkg/kubelet/cm/cpumanager/policy_options.go b/pkg/kubelet/cm/cpumanager/policy_options.go new file mode 100644 index 00000000000..cf92a2e11c4 --- /dev/null +++ b/pkg/kubelet/cm/cpumanager/policy_options.go @@ -0,0 +1,56 @@ +/* +Copyright 2021 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package cpumanager + +import ( + "fmt" + "strconv" +) + +const ( + // FullPCPUsOnlyOption is the name of the CPU Manager policy option + FullPCPUsOnlyOption string = "full-pcpus-only" +) + +type StaticPolicyOptions struct { + // flag to enable extra allocation restrictions to avoid + // different containers possibly ending up on the same core. + // we consider "core" and "physical CPU" synonyms here, leaning + // towards the terminology the k8s docs hint at. We acknowledge this is confusing. + // + // looking at https://kubernetes.io/docs/concepts/configuration/manage-resources-containers/, + // any possible naming scheme will lead to ambiguity to some extent. + // We picked "pcpu" because the established docs already hint at vCPU.
+ FullPhysicalCPUsOnly bool } + +func NewStaticPolicyOptions(policyOptions map[string]string) (StaticPolicyOptions, error) { + opts := StaticPolicyOptions{} + for name, value := range policyOptions { + switch name { + case FullPCPUsOnlyOption: + optValue, err := strconv.ParseBool(value) + if err != nil { + return opts, fmt.Errorf("bad value for option %q: %w", name, err) + } + opts.FullPhysicalCPUsOnly = optValue + default: + return opts, fmt.Errorf("unsupported cpumanager option: %q (%s)", name, value) + } + } + return opts, nil +} diff --git a/pkg/kubelet/cm/cpumanager/policy_static.go b/pkg/kubelet/cm/cpumanager/policy_static.go index ec25a15a3c2..f5d275d8ea8 100644 --- a/pkg/kubelet/cm/cpumanager/policy_static.go +++ b/pkg/kubelet/cm/cpumanager/policy_static.go @@ -29,8 +29,29 @@ import ( "k8s.io/kubernetes/pkg/kubelet/cm/topologymanager/bitmask" ) -// PolicyStatic is the name of the static policy -const PolicyStatic policyName = "static" +const ( + + // PolicyStatic is the name of the static policy. + // Should options be given, these will be ignored and the backward-compatible + // (up to and including 1.21) behaviour will be enforced + PolicyStatic policyName = "static" + // ErrorSMTAlignment represents the type of an SMTAlignmentError + ErrorSMTAlignment = "SMTAlignmentError" +) + +// SMTAlignmentError represents an error due to SMT alignment +type SMTAlignmentError struct { + RequestedCPUs int + CpusPerCore int +} + +func (e SMTAlignmentError) Error() string { + return fmt.Sprintf("SMT Alignment Error: requested %d cpus, not a multiple of cpus per core = %d", e.RequestedCPUs, e.CpusPerCore) +} + +func (e SMTAlignmentError) Type() string { + return ErrorSMTAlignment +} // staticPolicy is a CPU manager policy that does not change CPU // assignments for exclusively pinned guaranteed containers after the main @@ -79,6 +100,8 @@ type staticPolicy struct { affinity topologymanager.Store // set of CPUs to reuse across allocations in a pod cpusToReuse map[string]cpuset.CPUSet + // options allow fine-tuning the behaviour of the policy + options StaticPolicyOptions } // Ensure staticPolicy implements Policy interface @@ -87,7 +110,14 @@ var _ Policy = &staticPolicy{} // NewStaticPolicy returns a CPU manager policy that does not change CPU // assignments for exclusively pinned guaranteed containers after the main // container process starts.
-func NewStaticPolicy(topology *topology.CPUTopology, numReservedCPUs int, reservedCPUs cpuset.CPUSet, affinity topologymanager.Store) (Policy, error) { +func NewStaticPolicy(topology *topology.CPUTopology, numReservedCPUs int, reservedCPUs cpuset.CPUSet, affinity topologymanager.Store, cpuPolicyOptions map[string]string) (Policy, error) { + opts, err := NewStaticPolicyOptions(cpuPolicyOptions) + if err != nil { + return nil, err + } + + klog.InfoS("Static policy created with configuration", "options", opts) + allCPUs := topology.CPUDetails.CPUs() var reserved cpuset.CPUSet if reservedCPUs.Size() > 0 { @@ -113,6 +143,7 @@ func NewStaticPolicy(topology *topology.CPUTopology, numReservedCPUs int, reserv reserved: reserved, affinity: affinity, cpusToReuse: make(map[string]cpuset.CPUSet), + options: opts, }, nil } @@ -220,6 +251,21 @@ func (p *staticPolicy) Allocate(s state.State, pod *v1.Pod, container *v1.Contai klog.InfoS("Static policy: Allocate", "pod", klog.KObj(pod), "containerName", container.Name) // container belongs in an exclusively allocated pool + if p.options.FullPhysicalCPUsOnly && ((numCPUs % p.topology.CPUsPerCore()) != 0) { + // Since CPU Manager has been enabled requesting strict SMT alignment, it means a guaranteed pod can only be admitted + // if the CPU requested is a multiple of the number of virtual cpus per physical core. + // In case the CPU request is not a multiple of the number of virtual cpus per physical core, the Pod will be put + // in Failed state, with SMTAlignmentError as reason. Since the allocation happens in terms of physical cores + // and the scheduler is responsible for ensuring that the workload goes to a node that has enough CPUs, + // the pod would be placed on a node where there are enough physical cores available to be allocated. + // Just like the behaviour in case of static policy, takeByTopology will try to first allocate CPUs from the same socket + // and only in case the request cannot be satisfied on a single socket, CPU allocation is done for a workload to occupy all + // CPUs on a physical core. Allocation of individual threads would never have to occur.
+ return SMTAlignmentError{ + RequestedCPUs: numCPUs, + CpusPerCore: p.topology.CPUsPerCore(), + } + } if cpuset, ok := s.GetCPUSet(string(pod.UID), container.Name); ok { p.updateCPUsToReuse(pod, container, cpuset) klog.InfoS("Static policy: container already present in state, skipping", "pod", klog.KObj(pod), "containerName", container.Name) diff --git a/pkg/kubelet/cm/cpumanager/policy_static_test.go b/pkg/kubelet/cm/cpumanager/policy_static_test.go index c54997787b4..d2b641fe3a0 100644 --- a/pkg/kubelet/cm/cpumanager/policy_static_test.go +++ b/pkg/kubelet/cm/cpumanager/policy_static_test.go @@ -34,6 +34,7 @@ type staticPolicyTest struct { topo *topology.CPUTopology numReservedCPUs int podUID string + options map[string]string containerName string stAssignments state.ContainerCPUAssignments stDefaultCPUSet cpuset.CPUSet @@ -43,8 +44,27 @@ expCSet cpuset.CPUSet } +// this is not a real Clone() - hence Pseudo- - because we don't clone some +// objects which are accessed read-only +func (spt staticPolicyTest) PseudoClone() staticPolicyTest { + return staticPolicyTest{ + description: spt.description, + topo: spt.topo, // accessed in read-only + numReservedCPUs: spt.numReservedCPUs, + podUID: spt.podUID, + options: spt.options, // accessed in read-only + containerName: spt.containerName, + stAssignments: spt.stAssignments.Clone(), + stDefaultCPUSet: spt.stDefaultCPUSet.Clone(), + pod: spt.pod, // accessed in read-only + expErr: spt.expErr, + expCPUAlloc: spt.expCPUAlloc, + expCSet: spt.expCSet.Clone(), + } +} + func TestStaticPolicyName(t *testing.T) { - policy, _ := NewStaticPolicy(topoSingleSocketHT, 1, cpuset.NewCPUSet(), topologymanager.NewFakeManager()) + policy, _ := NewStaticPolicy(topoSingleSocketHT, 1, cpuset.NewCPUSet(), topologymanager.NewFakeManager(), nil) policyName := policy.Name() if policyName != "static" { @@ -120,7 +140,7 @@ func TestStaticPolicyStart(t *testing.T) { } for _, testCase := range testCases { t.Run(testCase.description, func(t *testing.T) { - p, _ := NewStaticPolicy(testCase.topo, testCase.numReservedCPUs, cpuset.NewCPUSet(), topologymanager.NewFakeManager()) + p, _ := NewStaticPolicy(testCase.topo, testCase.numReservedCPUs, cpuset.NewCPUSet(), topologymanager.NewFakeManager(), nil) policy := p.(*staticPolicy) st := &mockState{ assignments: testCase.stAssignments, @@ -168,7 +188,9 @@ func TestStaticPolicyAdd(t *testing.T) { largeTopoSock0CPUSet := largeTopoSock0Builder.Result() largeTopoSock1CPUSet := largeTopoSock1Builder.Result() - testCases := []staticPolicyTest{ + // these are the cases which must behave the same regardless of the policy options. + // So we will permute the options to ensure this holds true.
+ optionsInsensitiveTestCases := []staticPolicyTest{ { description: "GuPodSingleCore, SingleSocketHT, ExpectError", topo: topoSingleSocketHT, @@ -180,17 +202,6 @@ func TestStaticPolicyAdd(t *testing.T) { expCPUAlloc: false, expCSet: cpuset.NewCPUSet(), }, - { - description: "GuPodSingleCore, SingleSocketHT, ExpectAllocOneCPU", - topo: topoSingleSocketHT, - numReservedCPUs: 1, - stAssignments: state.ContainerCPUAssignments{}, - stDefaultCPUSet: cpuset.NewCPUSet(0, 1, 2, 3, 4, 5, 6, 7), - pod: makePod("fakePod", "fakeContainer2", "1000m", "1000m"), - expErr: nil, - expCPUAlloc: true, - expCSet: cpuset.NewCPUSet(4), // expect sibling of partial core - }, { description: "GuPodMultipleCores, SingleSocketHT, ExpectAllocOneCore", topo: topoSingleSocketHT, @@ -400,22 +411,6 @@ func TestStaticPolicyAdd(t *testing.T) { expCPUAlloc: true, expCSet: largeTopoSock1CPUSet.Union(cpuset.NewCPUSet(10, 34, 22, 47)), }, - { - // Only partial cores are available in the entire system. - // Expect allocation of all the CPUs from the partial cores. - description: "GuPodMultipleCores, topoQuadSocketFourWayHT, ExpectAllocCPUs", - topo: topoQuadSocketFourWayHT, - stAssignments: state.ContainerCPUAssignments{ - "fakePod": map[string]cpuset.CPUSet{ - "fakeContainer100": largeTopoCPUSet.Difference(cpuset.NewCPUSet(10, 11, 53, 37, 55, 67, 52)), - }, - }, - stDefaultCPUSet: cpuset.NewCPUSet(10, 11, 53, 67, 52), - pod: makePod("fakePod", "fakeContainer5", "5000m", "5000m"), - expErr: nil, - expCPUAlloc: true, - expCSet: cpuset.NewCPUSet(10, 11, 53, 67, 52), - }, { // Only 7 CPUs are available. // Pod requests 76 cores. @@ -435,45 +430,130 @@ func TestStaticPolicyAdd(t *testing.T) { }, } - for _, testCase := range testCases { - policy, _ := NewStaticPolicy(testCase.topo, testCase.numReservedCPUs, cpuset.NewCPUSet(), topologymanager.NewFakeManager()) + // testcases for the default behaviour of the policy. + defaultOptionsTestCases := []staticPolicyTest{ + { + description: "GuPodSingleCore, SingleSocketHT, ExpectAllocOneCPU", + topo: topoSingleSocketHT, + numReservedCPUs: 1, + stAssignments: state.ContainerCPUAssignments{}, + stDefaultCPUSet: cpuset.NewCPUSet(0, 1, 2, 3, 4, 5, 6, 7), + pod: makePod("fakePod", "fakeContainer2", "1000m", "1000m"), + expErr: nil, + expCPUAlloc: true, + expCSet: cpuset.NewCPUSet(4), // expect sibling of partial core + }, + { + // Only partial cores are available in the entire system. + // Expect allocation of all the CPUs from the partial cores. 
+ description: "GuPodMultipleCores, topoQuadSocketFourWayHT, ExpectAllocCPUs", + topo: topoQuadSocketFourWayHT, + stAssignments: state.ContainerCPUAssignments{ + "fakePod": map[string]cpuset.CPUSet{ + "fakeContainer100": largeTopoCPUSet.Difference(cpuset.NewCPUSet(10, 11, 53, 37, 55, 67, 52)), + }, + }, + stDefaultCPUSet: cpuset.NewCPUSet(10, 11, 53, 67, 52), + pod: makePod("fakePod", "fakeContainer5", "5000m", "5000m"), + expErr: nil, + expCPUAlloc: true, + expCSet: cpuset.NewCPUSet(10, 11, 53, 67, 52), + }, + } - st := &mockState{ - assignments: testCase.stAssignments, - defaultCPUSet: testCase.stDefaultCPUSet, + // testcases for the FullPCPUsOnlyOption + smtalignOptionTestCases := []staticPolicyTest{ + { + description: "GuPodSingleCore, SingleSocketHT, ExpectAllocOneCPU", + topo: topoSingleSocketHT, + options: map[string]string{ + FullPCPUsOnlyOption: "true", + }, + numReservedCPUs: 1, + stAssignments: state.ContainerCPUAssignments{}, + stDefaultCPUSet: cpuset.NewCPUSet(0, 1, 2, 3, 4, 5, 6, 7), + pod: makePod("fakePod", "fakeContainer2", "1000m", "1000m"), + expErr: SMTAlignmentError{RequestedCPUs: 1, CpusPerCore: 2}, + expCPUAlloc: false, + expCSet: cpuset.NewCPUSet(), // reject allocation of sibling of partial core + }, + { + // test SMT-level != 2 - which is the default on x86_64 + description: "GuPodMultipleCores, topoQuadSocketFourWayHT, ExpectAllocOneCPUs", + topo: topoQuadSocketFourWayHT, + options: map[string]string{ + FullPCPUsOnlyOption: "true", + }, + numReservedCPUs: 8, + stAssignments: state.ContainerCPUAssignments{}, + stDefaultCPUSet: largeTopoCPUSet, + pod: makePod("fakePod", "fakeContainer15", "15000m", "15000m"), + expErr: SMTAlignmentError{RequestedCPUs: 15, CpusPerCore: 4}, + expCPUAlloc: false, + expCSet: cpuset.NewCPUSet(), + }, + } + + for _, testCase := range optionsInsensitiveTestCases { + for _, options := range []map[string]string{ + nil, + { + FullPCPUsOnlyOption: "true", + }, + } { + tCase := testCase.PseudoClone() + tCase.description = fmt.Sprintf("options=%v %s", options, testCase.description) + tCase.options = options + runStaticPolicyTestCase(t, tCase) + } + } + + for _, testCase := range defaultOptionsTestCases { + runStaticPolicyTestCase(t, testCase) + } + for _, testCase := range smtalignOptionTestCases { + runStaticPolicyTestCase(t, testCase) + } +} + +func runStaticPolicyTestCase(t *testing.T, testCase staticPolicyTest) { + policy, _ := NewStaticPolicy(testCase.topo, testCase.numReservedCPUs, cpuset.NewCPUSet(), topologymanager.NewFakeManager(), testCase.options) + + st := &mockState{ + assignments: testCase.stAssignments, + defaultCPUSet: testCase.stDefaultCPUSet, + } + + container := &testCase.pod.Spec.Containers[0] + err := policy.Allocate(st, testCase.pod, container) + if !reflect.DeepEqual(err, testCase.expErr) { + t.Errorf("StaticPolicy Allocate() error (%v). expected add error: %q but got: %q", + testCase.description, testCase.expErr, err) + } + + if testCase.expCPUAlloc { + cset, found := st.assignments[string(testCase.pod.UID)][container.Name] + if !found { + t.Errorf("StaticPolicy Allocate() error (%v). expected container %v to be present in assignments %v", + testCase.description, container.Name, st.assignments) } - container := &testCase.pod.Spec.Containers[0] - err := policy.Allocate(st, testCase.pod, container) - if !reflect.DeepEqual(err, testCase.expErr) { - t.Errorf("StaticPolicy Allocate() error (%v). 
expected add error: %v but got: %v", - testCase.description, testCase.expErr, err) + if !reflect.DeepEqual(cset, testCase.expCSet) { + t.Errorf("StaticPolicy Allocate() error (%v). expected cpuset %v but got %v", + testCase.description, testCase.expCSet, cset) } - if testCase.expCPUAlloc { - cset, found := st.assignments[string(testCase.pod.UID)][container.Name] - if !found { - t.Errorf("StaticPolicy Allocate() error (%v). expected container %v to be present in assignments %v", - testCase.description, container.Name, st.assignments) - } - - if !reflect.DeepEqual(cset, testCase.expCSet) { - t.Errorf("StaticPolicy Allocate() error (%v). expected cpuset %v but got %v", - testCase.description, testCase.expCSet, cset) - } - - if !cset.Intersection(st.defaultCPUSet).IsEmpty() { - t.Errorf("StaticPolicy Allocate() error (%v). expected cpuset %v to be disoint from the shared cpuset %v", - testCase.description, cset, st.defaultCPUSet) - } + if !cset.Intersection(st.defaultCPUSet).IsEmpty() { + t.Errorf("StaticPolicy Allocate() error (%v). expected cpuset %v to be disjoint from the shared cpuset %v", + testCase.description, cset, st.defaultCPUSet) } } - if !testCase.expCPUAlloc { - _, found := st.assignments[string(testCase.pod.UID)][container.Name] - if found { - t.Errorf("StaticPolicy Allocate() error (%v). Did not expect container %v to be present in assignments %v", - testCase.description, container.Name, st.assignments) - } + if !testCase.expCPUAlloc { + _, found := st.assignments[string(testCase.pod.UID)][container.Name] + if found { + t.Errorf("StaticPolicy Allocate() error (%v). Did not expect container %v to be present in assignments %v", + testCase.description, container.Name, st.assignments) } } } @@ -537,7 +617,7 @@ func TestStaticPolicyRemove(t *testing.T) { } for _, testCase := range testCases { - policy, _ := NewStaticPolicy(testCase.topo, testCase.numReservedCPUs, cpuset.NewCPUSet(), topologymanager.NewFakeManager()) + policy, _ := NewStaticPolicy(testCase.topo, testCase.numReservedCPUs, cpuset.NewCPUSet(), topologymanager.NewFakeManager(), nil) st := &mockState{ assignments: testCase.stAssignments, @@ -627,7 +707,7 @@ func TestTopologyAwareAllocateCPUs(t *testing.T) { }, } for _, tc := range testCases { - p, _ := NewStaticPolicy(tc.topo, 0, cpuset.NewCPUSet(), topologymanager.NewFakeManager()) + p, _ := NewStaticPolicy(tc.topo, 0, cpuset.NewCPUSet(), topologymanager.NewFakeManager(), nil) policy := p.(*staticPolicy) st := &mockState{ assignments: tc.stAssignments, @@ -701,7 +781,7 @@ func TestStaticPolicyStartWithResvList(t *testing.T) { } for _, testCase := range testCases { t.Run(testCase.description, func(t *testing.T) { - p, err := NewStaticPolicy(testCase.topo, testCase.numReservedCPUs, testCase.reserved, topologymanager.NewFakeManager()) + p, err := NewStaticPolicy(testCase.topo, testCase.numReservedCPUs, testCase.reserved, topologymanager.NewFakeManager(), nil) if !reflect.DeepEqual(err, testCase.expNewErr) { t.Errorf("StaticPolicy Start() error (%v).
expected error: %v but got: %v", testCase.description, testCase.expNewErr, err) @@ -778,7 +858,7 @@ func TestStaticPolicyAddWithResvList(t *testing.T) { } for _, testCase := range testCases { - policy, _ := NewStaticPolicy(testCase.topo, testCase.numReservedCPUs, testCase.reserved, topologymanager.NewFakeManager()) + policy, _ := NewStaticPolicy(testCase.topo, testCase.numReservedCPUs, testCase.reserved, topologymanager.NewFakeManager(), nil) st := &mockState{ assignments: testCase.stAssignments, @@ -819,3 +899,88 @@ func TestStaticPolicyAddWithResvList(t *testing.T) { } } } + +type staticPolicyOptionTestCase struct { + description string + policyOptions map[string]string + expectedError bool + expectedValue StaticPolicyOptions +} + +func TestStaticPolicyOptions(t *testing.T) { + testCases := []staticPolicyOptionTestCase{ + { + description: "nil args", + policyOptions: nil, + expectedError: false, + expectedValue: StaticPolicyOptions{}, + }, + { + description: "empty args", + policyOptions: map[string]string{}, + expectedError: false, + expectedValue: StaticPolicyOptions{}, + }, + { + description: "bad single arg", + policyOptions: map[string]string{ + "badValue1": "", + }, + expectedError: true, + }, + { + description: "bad multiple arg", + policyOptions: map[string]string{ + "badValue1": "", + "badvalue2": "aaaa", + }, + expectedError: true, + }, + { + description: "good arg", + policyOptions: map[string]string{ + FullPCPUsOnlyOption: "true", + }, + expectedError: false, + expectedValue: StaticPolicyOptions{ + FullPhysicalCPUsOnly: true, + }, + }, + { + description: "good arg, bad value", + policyOptions: map[string]string{ + FullPCPUsOnlyOption: "enabled!", + }, + expectedError: true, + }, + + { + description: "bad arg intermixed", + policyOptions: map[string]string{ + FullPCPUsOnlyOption: "1", + "badvalue2": "lorem ipsum", + }, + expectedError: true, + }, + } + + for _, testCase := range testCases { + t.Run(testCase.description, func(t *testing.T) { + opts, err := NewStaticPolicyOptions(testCase.policyOptions) + gotError := (err != nil) + if gotError != testCase.expectedError { + t.Fatalf("error with args %v expected error %v got %v: %v", + testCase.policyOptions, testCase.expectedError, gotError, err) + } + + if testCase.expectedError { + return + } + + if !reflect.DeepEqual(opts, testCase.expectedValue) { + t.Fatalf("value mismatch with args %v expected value %v got %v", + testCase.policyOptions, testCase.expectedValue, opts) + } + }) + } +} diff --git a/pkg/kubelet/cm/topologymanager/fake_topology_manager.go b/pkg/kubelet/cm/topologymanager/fake_topology_manager.go index 407691e98f0..8a60aa23347 100644 --- a/pkg/kubelet/cm/topologymanager/fake_topology_manager.go +++ b/pkg/kubelet/cm/topologymanager/fake_topology_manager.go @@ -19,6 +19,7 @@ package topologymanager import ( "k8s.io/api/core/v1" "k8s.io/klog/v2" + "k8s.io/kubernetes/pkg/kubelet/cm/admission" "k8s.io/kubernetes/pkg/kubelet/lifecycle" ) @@ -64,7 +65,5 @@ func (m *fakeManager) RemoveContainer(containerID string) error { func (m *fakeManager) Admit(attrs *lifecycle.PodAdmitAttributes) lifecycle.PodAdmitResult { klog.InfoS("Topology Admit Handler") - return lifecycle.PodAdmitResult{ - Admit: true, - } + return admission.GetPodAdmitResult(nil) } diff --git a/pkg/kubelet/cm/topologymanager/scope.go b/pkg/kubelet/cm/topologymanager/scope.go index c5c6f36be97..912aba3fde0 100644 --- a/pkg/kubelet/cm/topologymanager/scope.go +++ b/pkg/kubelet/cm/topologymanager/scope.go @@ -17,12 +17,11 @@ limitations under the License. 
package topologymanager import ( - "fmt" - "sync" "k8s.io/api/core/v1" "k8s.io/klog/v2" + "k8s.io/kubernetes/pkg/kubelet/cm/admission" "k8s.io/kubernetes/pkg/kubelet/cm/containermap" "k8s.io/kubernetes/pkg/kubelet/lifecycle" ) @@ -133,10 +132,10 @@ func (s *scope) admitPolicyNone(pod *v1.Pod) lifecycle.PodAdmitResult { for _, container := range append(pod.Spec.InitContainers, pod.Spec.Containers...) { err := s.allocateAlignedResources(pod, &container) if err != nil { - return unexpectedAdmissionError(err) + return admission.GetPodAdmitResult(err) } } - return admitPod() + return admission.GetPodAdmitResult(nil) } // It would be better to implement this function in topologymanager instead of scope @@ -150,23 +149,3 @@ func (s *scope) allocateAlignedResources(pod *v1.Pod, container *v1.Container) e } return nil } - -func topologyAffinityError() lifecycle.PodAdmitResult { - return lifecycle.PodAdmitResult{ - Message: "Resources cannot be allocated with Topology locality", - Reason: "TopologyAffinityError", - Admit: false, - } -} - -func unexpectedAdmissionError(err error) lifecycle.PodAdmitResult { - return lifecycle.PodAdmitResult{ - Message: fmt.Sprintf("Allocate failed due to %v, which is unexpected", err), - Reason: "UnexpectedAdmissionError", - Admit: false, - } -} - -func admitPod() lifecycle.PodAdmitResult { - return lifecycle.PodAdmitResult{Admit: true} -} diff --git a/pkg/kubelet/cm/topologymanager/scope_container.go b/pkg/kubelet/cm/topologymanager/scope_container.go index de45209625a..1e4e2f58fc0 100644 --- a/pkg/kubelet/cm/topologymanager/scope_container.go +++ b/pkg/kubelet/cm/topologymanager/scope_container.go @@ -19,6 +19,7 @@ package topologymanager import ( "k8s.io/api/core/v1" "k8s.io/klog/v2" + "k8s.io/kubernetes/pkg/kubelet/cm/admission" "k8s.io/kubernetes/pkg/kubelet/cm/containermap" "k8s.io/kubernetes/pkg/kubelet/lifecycle" ) @@ -53,17 +54,17 @@ func (s *containerScope) Admit(pod *v1.Pod) lifecycle.PodAdmitResult { klog.InfoS("Best TopologyHint", "bestHint", bestHint, "pod", klog.KObj(pod), "containerName", container.Name) if !admit { - return topologyAffinityError() + return admission.GetPodAdmitResult(&TopologyAffinityError{}) } klog.InfoS("Topology Affinity", "bestHint", bestHint, "pod", klog.KObj(pod), "containerName", container.Name) s.setTopologyHints(string(pod.UID), container.Name, bestHint) err := s.allocateAlignedResources(pod, &container) if err != nil { - return unexpectedAdmissionError(err) + return admission.GetPodAdmitResult(err) } } - return admitPod() + return admission.GetPodAdmitResult(nil) } func (s *containerScope) accumulateProvidersHints(pod *v1.Pod, container *v1.Container) []map[string][]TopologyHint { diff --git a/pkg/kubelet/cm/topologymanager/scope_pod.go b/pkg/kubelet/cm/topologymanager/scope_pod.go index 9ccc6414dd9..b77682597b8 100644 --- a/pkg/kubelet/cm/topologymanager/scope_pod.go +++ b/pkg/kubelet/cm/topologymanager/scope_pod.go @@ -19,6 +19,7 @@ package topologymanager import ( "k8s.io/api/core/v1" "k8s.io/klog/v2" + "k8s.io/kubernetes/pkg/kubelet/cm/admission" "k8s.io/kubernetes/pkg/kubelet/cm/containermap" "k8s.io/kubernetes/pkg/kubelet/lifecycle" ) @@ -51,7 +52,7 @@ func (s *podScope) Admit(pod *v1.Pod) lifecycle.PodAdmitResult { bestHint, admit := s.calculateAffinity(pod) klog.InfoS("Best TopologyHint", "bestHint", bestHint, "pod", klog.KObj(pod)) if !admit { - return topologyAffinityError() + return admission.GetPodAdmitResult(&TopologyAffinityError{}) } for _, container := range append(pod.Spec.InitContainers, 
pod.Spec.Containers...) { @@ -60,10 +61,10 @@ func (s *podScope) Admit(pod *v1.Pod) lifecycle.PodAdmitResult { err := s.allocateAlignedResources(pod, &container) if err != nil { - return unexpectedAdmissionError(err) + return admission.GetPodAdmitResult(err) } } - return admitPod() + return admission.GetPodAdmitResult(nil) } func (s *podScope) accumulateProvidersHints(pod *v1.Pod) []map[string][]TopologyHint { diff --git a/pkg/kubelet/cm/topologymanager/topology_manager.go b/pkg/kubelet/cm/topologymanager/topology_manager.go index 4f327e6efc0..7cd67d1aa60 100644 --- a/pkg/kubelet/cm/topologymanager/topology_manager.go +++ b/pkg/kubelet/cm/topologymanager/topology_manager.go @@ -36,8 +36,21 @@ const ( // present on a machine and the TopologyManager is enabled, an error will // be returned and the TopologyManager will not be loaded. maxAllowableNUMANodes = 8 + // ErrorTopologyAffinity represents the type for a TopologyAffinityError + ErrorTopologyAffinity = "TopologyAffinityError" ) +// TopologyAffinityError represents a resource alignment error +type TopologyAffinityError struct{} + +func (e TopologyAffinityError) Error() string { + return "Resources cannot be allocated with Topology locality" +} + +func (e TopologyAffinityError) Type() string { + return ErrorTopologyAffinity +} + // Manager interface provides methods for Kubelet to manage pod topology hints type Manager interface { // PodAdmitHandler is implemented by Manager diff --git a/pkg/kubelet/cm/topologymanager/topology_manager_test.go b/pkg/kubelet/cm/topologymanager/topology_manager_test.go index 6c84411c397..c0cc0980c65 100644 --- a/pkg/kubelet/cm/topologymanager/topology_manager_test.go +++ b/pkg/kubelet/cm/topologymanager/topology_manager_test.go @@ -477,11 +477,17 @@ func TestAdmit(t *testing.T) { if ctnActual.Admit != tc.expected { t.Errorf("Error occurred, expected Admit in result to be %v got %v", tc.expected, ctnActual.Admit) } + if !ctnActual.Admit && ctnActual.Reason != ErrorTopologyAffinity { + t.Errorf("Error occurred, expected Reason in result to be %v got %v", ErrorTopologyAffinity, ctnActual.Reason) + } // Pod scope Admit podActual := podScopeManager.Admit(&podAttr) if podActual.Admit != tc.expected { t.Errorf("Error occurred, expected Admit in result to be %v got %v", tc.expected, podActual.Admit) } + if !podActual.Admit && podActual.Reason != ErrorTopologyAffinity { + t.Errorf("Error occurred, expected Reason in result to be %v got %v", ErrorTopologyAffinity, podActual.Reason) + } } } diff --git a/staging/src/k8s.io/kubelet/config/v1beta1/types.go b/staging/src/k8s.io/kubelet/config/v1beta1/types.go index b96ae1c235c..90179c5cdf3 100644 --- a/staging/src/k8s.io/kubelet/config/v1beta1/types.go +++ b/staging/src/k8s.io/kubelet/config/v1beta1/types.go @@ -448,6 +448,14 @@ type KubeletConfiguration struct { // Default: "None" // +optional CPUManagerPolicy string `json:"cpuManagerPolicy,omitempty"` + // cpuManagerPolicyOptions is a set of key=value pairs which allow setting extra options + // to fine-tune the behaviour of the cpu manager policies. + // Requires both the "CPUManager" and "CPUManagerPolicyOptions" feature gates to be enabled. + // Dynamic Kubelet Config (beta): This field should not be updated without a full node + // reboot. It is safest to keep this value the same as the local config. + // Default: nil + // +optional + CPUManagerPolicyOptions map[string]string `json:"cpuManagerPolicyOptions,omitempty"` // cpuManagerReconcilePeriod is the reconciliation period for the CPU Manager.
// Requires the CPUManager feature gate to be enabled. // Dynamic Kubelet Config (deprecated): If dynamically updating this field, consider that diff --git a/staging/src/k8s.io/kubelet/config/v1beta1/zz_generated.deepcopy.go b/staging/src/k8s.io/kubelet/config/v1beta1/zz_generated.deepcopy.go index aba0b74227e..027ac220164 100644 --- a/staging/src/k8s.io/kubelet/config/v1beta1/zz_generated.deepcopy.go +++ b/staging/src/k8s.io/kubelet/config/v1beta1/zz_generated.deepcopy.go @@ -167,6 +167,13 @@ func (in *KubeletConfiguration) DeepCopyInto(out *KubeletConfiguration) { *out = new(bool) **out = **in } + if in.CPUManagerPolicyOptions != nil { + in, out := &in.CPUManagerPolicyOptions, &out.CPUManagerPolicyOptions + *out = make(map[string]string, len(*in)) + for key, val := range *in { + (*out)[key] = val + } + } out.CPUManagerReconcilePeriod = in.CPUManagerReconcilePeriod if in.QOSReserved != nil { in, out := &in.QOSReserved, &out.QOSReserved diff --git a/test/e2e_node/cpu_manager_test.go b/test/e2e_node/cpu_manager_test.go index fb38837b575..2ca84bcd90a 100644 --- a/test/e2e_node/cpu_manager_test.go +++ b/test/e2e_node/cpu_manager_test.go @@ -17,8 +17,10 @@ limitations under the License. package e2enode import ( + "context" "fmt" "os/exec" + "regexp" "strconv" "strings" "time" @@ -34,6 +36,7 @@ import ( "k8s.io/kubernetes/pkg/kubelet/types" "k8s.io/kubernetes/test/e2e/framework" e2enode "k8s.io/kubernetes/test/e2e/framework/node" + e2epod "k8s.io/kubernetes/test/e2e/framework/pod" e2eskipper "k8s.io/kubernetes/test/e2e/framework/skipper" "github.com/onsi/ginkgo" @@ -159,6 +162,17 @@ func isMultiNUMA() bool { return numaNodes > 1 } +func getSMTLevel() int { + cpuID := 0 // this is just the most likely cpu to be present in a random system. No special meaning besides this. + out, err := exec.Command("/bin/sh", "-c", fmt.Sprintf("cat /sys/devices/system/cpu/cpu%d/topology/thread_siblings_list | tr -d \"\n\r\"", cpuID)).Output() + framework.ExpectNoError(err) + // how many thread siblings you have = SMT level + // example: 2-way SMT means 2 thread siblings for each thread + cpus, err := cpuset.Parse(strings.TrimSpace(string(out))) + framework.ExpectNoError(err) + return cpus.Size() +} + func getCPUSiblingList(cpuRes int64) string { out, err := exec.Command("/bin/sh", "-c", fmt.Sprintf("cat /sys/devices/system/cpu/cpu%d/topology/thread_siblings_list | tr -d \"\n\r\"", cpuRes)).Output() framework.ExpectNoError(err) @@ -209,20 +223,16 @@ func disableCPUManagerInKubelet(f *framework.Framework) (oldCfg *kubeletconfig.K return oldCfg } -func enableCPUManagerInKubelet(f *framework.Framework, cleanStateFile bool) (oldCfg *kubeletconfig.KubeletConfiguration) { - return configureCPUManagerInKubelet(f, cleanStateFile, cpuset.CPUSet{}) -} - -func configureCPUManagerInKubelet(f *framework.Framework, cleanStateFile bool, reservedSystemCPUs cpuset.CPUSet) (oldCfg *kubeletconfig.KubeletConfiguration) { +func configureCPUManagerInKubelet(f *framework.Framework, policyName string, cleanStateFile bool, reservedSystemCPUs cpuset.CPUSet, enableOptions bool, options map[string]string) (oldCfg *kubeletconfig.KubeletConfiguration) { // Enable CPU Manager in Kubelet with static policy.
oldCfg, err := getCurrentKubeletConfig() framework.ExpectNoError(err) newCfg := oldCfg.DeepCopy() if newCfg.FeatureGates == nil { newCfg.FeatureGates = make(map[string]bool) - } else { - newCfg.FeatureGates["CPUManager"] = true } + newCfg.FeatureGates["CPUManager"] = true + newCfg.FeatureGates["CPUManagerPolicyOptions"] = enableOptions // After graduation of the CPU Manager feature to Beta, the CPU Manager // "none" policy is ON by default. But when we set the CPU Manager policy to @@ -237,11 +247,9 @@ func configureCPUManagerInKubelet(f *framework.Framework, cleanStateFile bool, r deleteStateFile() } - // Set the CPU Manager policy to static. - newCfg.CPUManagerPolicy = string(cpumanager.PolicyStatic) - - // Set the CPU Manager reconcile period to 1 second. + newCfg.CPUManagerPolicy = policyName newCfg.CPUManagerReconcilePeriod = metav1.Duration{Duration: 1 * time.Second} + newCfg.CPUManagerPolicyOptions = options if reservedSystemCPUs.Size() > 0 { cpus := reservedSystemCPUs.String() @@ -272,43 +280,37 @@ func configureCPUManagerInKubelet(f *framework.Framework, cleanStateFile bool, r return oldCfg } -func runGuPodTest(f *framework.Framework) { - var ctnAttrs []ctnAttribute - var cpu1 int - var err error - var cpuList []int +func runGuPodTest(f *framework.Framework, cpuCount int) { var pod *v1.Pod - var expAllowedCPUsListRegex string - ctnAttrs = []ctnAttribute{ + ctnAttrs := []ctnAttribute{ { ctnName: "gu-container", - cpuRequest: "1000m", - cpuLimit: "1000m", + cpuRequest: fmt.Sprintf("%dm", 1000*cpuCount), + cpuLimit: fmt.Sprintf("%dm", 1000*cpuCount), }, } pod = makeCPUManagerPod("gu-pod", ctnAttrs) pod = f.PodClient().CreateSync(pod) ginkgo.By("checking if the expected cpuset was assigned") - cpu1 = 1 - if isHTEnabled() { - cpuList = cpuset.MustParse(getCPUSiblingList(0)).ToSlice() - cpu1 = cpuList[1] - } else if isMultiNUMA() { - cpuList = cpuset.MustParse(getCoreSiblingList(0)).ToSlice() - if len(cpuList) > 1 { - cpu1 = cpuList[1] - } + // any full CPU is fine - we cannot, nor should we, predict which one, though + for _, cnt := range pod.Spec.Containers { + ginkgo.By(fmt.Sprintf("validating the container %s on Gu pod %s", cnt.Name, pod.Name)) + + logs, err := e2epod.GetPodLogs(f.ClientSet, f.Namespace.Name, pod.Name, cnt.Name) + framework.ExpectNoError(err, "expected log not found in container [%s] of pod [%s]", cnt.Name, pod.Name) + + framework.Logf("got pod logs: %v", logs) + cpus, err := cpuset.Parse(strings.TrimSpace(logs)) + framework.ExpectNoError(err, "parsing cpuset from logs for [%s] of pod [%s]", cnt.Name, pod.Name) + + framework.ExpectEqual(cpus.Size(), cpuCount, "expected cpu set size == %d, got %q", cpuCount, cpus.String()) } - expAllowedCPUsListRegex = fmt.Sprintf("^%d\n$", cpu1) - err = f.PodClient().MatchContainerOutput(pod.Name, pod.Spec.Containers[0].Name, expAllowedCPUsListRegex) - framework.ExpectNoError(err, "expected log not found in container [%s] of pod [%s]", - pod.Spec.Containers[0].Name, pod.Name) ginkgo.By("by deleting the pods and waiting for container removal") deletePods(f, []string{pod.Name}) - waitForContainerRemoval(pod.Spec.Containers[0].Name, pod.Name, pod.Namespace) + waitForAllContainerRemoval(pod.Name, pod.Namespace) } func runNonGuPodTest(f *framework.Framework, cpuCap int64) { @@ -586,13 +588,13 @@ func runCPUManagerTests(f *framework.Framework) { } // Enable CPU Manager in the kubelet.
- oldCfg = enableCPUManagerInKubelet(f, true) + oldCfg = configureCPUManagerInKubelet(f, string(cpumanager.PolicyStatic), true, cpuset.CPUSet{}, false, nil) ginkgo.By("running a non-Gu pod") runNonGuPodTest(f, cpuCap) ginkgo.By("running a Gu pod") - runGuPodTest(f) + runGuPodTest(f, 1) ginkgo.By("running multiple Gu and non-Gu pods") runMultipleGuNonGuPods(f, cpuCap, cpuAlloc) @@ -653,18 +655,150 @@ func runCPUManagerTests(f *framework.Framework) { waitForContainerRemoval(pod.Spec.Containers[0].Name, pod.Name, pod.Namespace) ginkgo.By("enable cpu manager in kubelet without delete state file") - enableCPUManagerInKubelet(f, false) + configureCPUManagerInKubelet(f, string(cpumanager.PolicyStatic), false, cpuset.CPUSet{}, false, nil) ginkgo.By("wait for the deleted pod to be cleaned up from the state file") waitForStateFileCleanedUp() ginkgo.By("the deleted pod has already been deleted from the state file") }) + ginkgo.It("should assign CPUs as expected with enhanced policy based on strict SMT alignment", func() { + fullCPUsOnlyOpt := fmt.Sprintf("option=%s", cpumanager.FullPCPUsOnlyOption) + _, cpuAlloc, _ = getLocalNodeCPUDetails(f) + smtLevel := getSMTLevel() + + // strict SMT alignment is trivially verified and granted on non-SMT systems + if smtLevel < 2 { + e2eskipper.Skipf("Skipping CPU Manager %s tests since SMT disabled", fullCPUsOnlyOpt) + } + + // our tests want to allocate a full core, so we need at least 2*2=4 virtual cpus + if cpuAlloc < int64(smtLevel*2) { + e2eskipper.Skipf("Skipping CPU Manager %s tests since the CPU capacity < 4", fullCPUsOnlyOpt) + } + + framework.Logf("SMT level %d", smtLevel) + + cleanStateFile := true + // TODO: we assume the first available CPUID is 0, which is a fair assumption, but we should probably + // check what the node actually has. + cpuPolicyOptions := map[string]string{ + cpumanager.FullPCPUsOnlyOption: "true", + } + oldCfg = configureCPUManagerInKubelet(f, string(cpumanager.PolicyStatic), cleanStateFile, cpuset.NewCPUSet(0), true, cpuPolicyOptions) + + // the order between negative and positive doesn't really matter + runSMTAlignmentNegativeTests(f) + runSMTAlignmentPositiveTests(f, smtLevel) + }) + ginkgo.AfterEach(func() { setOldKubeletConfig(f, oldCfg) }) } +func runSMTAlignmentNegativeTests(f *framework.Framework) { + // negative test: try to run a container whose requests aren't a multiple of SMT level, expect a rejection + ctnAttrs := []ctnAttribute{ + { + ctnName: "gu-container-neg", + cpuRequest: "1000m", + cpuLimit: "1000m", + }, + } + pod := makeCPUManagerPod("gu-pod", ctnAttrs) + // CreateSync would wait for pod to become Ready - which will never happen if production code works as intended! + pod = f.PodClient().Create(pod) + + err := e2epod.WaitForPodCondition(f.ClientSet, f.Namespace.Name, pod.Name, "Failed", 30*time.Second, func(pod *v1.Pod) (bool, error) { + if pod.Status.Phase != v1.PodPending { + return true, nil + } + return false, nil + }) + framework.ExpectNoError(err) + pod, err = f.PodClient().Get(context.TODO(), pod.Name, metav1.GetOptions{}) + framework.ExpectNoError(err) + + if pod.Status.Phase != v1.PodFailed { + framework.Failf("pod %s not failed: %v", pod.Name, pod.Status) + } + if !isSMTAlignmentError(pod) { + framework.Failf("pod %s failed for wrong reason: %q", pod.Name, pod.Status.Reason) + } + + deletePodSyncByName(f, pod.Name) + // we need to wait for all containers to really be gone so the cpumanager reconcile loop will not rewrite the cpu_manager_state.
+ // this is in turn needed because we will have an unavoidable (in the current framework) race with the + // reconcile loop which will make our attempt to delete the state file and to restore the old config go haywire + waitForAllContainerRemoval(pod.Name, pod.Namespace) +} + +func runSMTAlignmentPositiveTests(f *framework.Framework, smtLevel int) { + // positive test: try to run a container whose requests are a multiple of SMT level, check allocated cores + // 1. are core siblings + // 2. take a full core + // WARNING: this assumes 2-way SMT systems - we don't know how to access other SMT levels. + // this means on more-than-2-way SMT systems this test will prove nothing + ctnAttrs := []ctnAttribute{ + { + ctnName: "gu-container-pos", + cpuRequest: "2000m", + cpuLimit: "2000m", + }, + } + pod := makeCPUManagerPod("gu-pod", ctnAttrs) + pod = f.PodClient().CreateSync(pod) + + for _, cnt := range pod.Spec.Containers { + ginkgo.By(fmt.Sprintf("validating the container %s on Gu pod %s", cnt.Name, pod.Name)) + + logs, err := e2epod.GetPodLogs(f.ClientSet, f.Namespace.Name, pod.Name, cnt.Name) + framework.ExpectNoError(err, "expected log not found in container [%s] of pod [%s]", cnt.Name, pod.Name) + + framework.Logf("got pod logs: %v", logs) + cpus, err := cpuset.Parse(strings.TrimSpace(logs)) + framework.ExpectNoError(err, "parsing cpuset from logs for [%s] of pod [%s]", cnt.Name, pod.Name) + + validateSMTAlignment(cpus, smtLevel, pod, &cnt) + } + + deletePodSyncByName(f, pod.Name) + // we need to wait for all containers to really be gone so the cpumanager reconcile loop will not rewrite the cpu_manager_state. + // this is in turn needed because we will have an unavoidable (in the current framework) race with the + // reconcile loop which will make our attempt to delete the state file and to restore the old config go haywire + waitForAllContainerRemoval(pod.Name, pod.Namespace) +} + +func validateSMTAlignment(cpus cpuset.CPUSet, smtLevel int, pod *v1.Pod, cnt *v1.Container) { + framework.Logf("validating cpus: %v", cpus) + + if cpus.Size()%smtLevel != 0 { + framework.Failf("pod %q cnt %q received non-smt-multiple cpuset %v (SMT level %d)", pod.Name, cnt.Name, cpus, smtLevel) + } + + // now check all the given cpus are thread siblings. + // to do so, the easiest way is to rebuild the expected set of siblings from all the cpus we got. + // if the expected set matches the given set, the given set was good. + b := cpuset.NewBuilder() + for _, cpuID := range cpus.ToSliceNoSort() { + threadSiblings, err := cpuset.Parse(strings.TrimSpace(getCPUSiblingList(int64(cpuID)))) + framework.ExpectNoError(err, "parsing the thread sibling list of cpu %d for [%s] of pod [%s]", cpuID, cnt.Name, pod.Name) + b.Add(threadSiblings.ToSliceNoSort()...) + } + siblingsCPUs := b.Result() + + framework.Logf("siblings cpus: %v", siblingsCPUs) + if !siblingsCPUs.Equals(cpus) { + framework.Failf("pod %q cnt %q received non-smt-aligned cpuset %v (expected %v)", pod.Name, cnt.Name, cpus, siblingsCPUs) + } +} + +func isSMTAlignmentError(pod *v1.Pod) bool { + re := regexp.MustCompile(`SMT.*Alignment.*Error`) + return re.MatchString(pod.Status.Reason) +} +
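The heart of validateSMTAlignment above is a sibling-closure check: rebuild the full set of thread siblings of every allocated CPU and require that it equals the allocation exactly. Below is a minimal standalone sketch of that check, not part of the patch: it injects a hypothetical topology as a plain map instead of reading /sys/devices/system/cpu/cpu*/topology/thread_siblings_list, and names such as isSMTAligned and siblingsOf are illustrative only.

// smtcheck.go - standalone illustration of the sibling-closure check
package main

import (
	"fmt"
	"sort"
)

// isSMTAligned reports whether an allocated cpuset is a union of whole
// physical cores: take the sibling closure of every allocated CPU and
// require it to equal the allocation exactly.
func isSMTAligned(allocated []int, siblingsOf map[int][]int) bool {
	want := map[int]bool{}
	for _, cpu := range allocated {
		want[cpu] = true
	}
	closure := map[int]bool{}
	for _, cpu := range allocated {
		// siblingsOf maps each logical CPU to all hyperthreads of its
		// physical core, itself included.
		for _, sib := range siblingsOf[cpu] {
			closure[sib] = true
		}
	}
	if len(closure) != len(want) {
		return false
	}
	for cpu := range closure {
		if !want[cpu] {
			return false
		}
	}
	return true
}

func main() {
	// hypothetical 2-way SMT box: cores are {0,4}, {1,5}, {2,6}, {3,7}
	topo := map[int][]int{
		0: {0, 4}, 4: {0, 4},
		1: {1, 5}, 5: {1, 5},
		2: {2, 6}, 6: {2, 6},
		3: {3, 7}, 7: {3, 7},
	}
	for _, alloc := range [][]int{{1, 5}, {1, 2}} {
		sort.Ints(alloc)
		// {1,5} is a whole core -> aligned; {1,2} spans two half cores -> not
		fmt.Printf("cpuset %v aligned: %v\n", alloc, isSMTAligned(alloc, topo))
	}
}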
// Serial because the test updates kubelet configuration.
var _ = SIGDescribe("CPU Manager [Serial] [Feature:CPUManager]", func() { f := framework.NewDefaultFramework("cpu-manager-test") diff --git a/test/e2e_node/topology_manager_test.go b/test/e2e_node/topology_manager_test.go index 27ea6a0f4e9..9ac3afe1a0e 100644 --- a/test/e2e_node/topology_manager_test.go +++ b/test/e2e_node/topology_manager_test.go @@ -372,7 +372,7 @@ func runTopologyManagerPolicySuiteTests(f *framework.Framework) { runNonGuPodTest(f, cpuCap) ginkgo.By("running a Gu pod") - runGuPodTest(f) + runGuPodTest(f, 1) ginkgo.By("running multiple Gu and non-Gu pods") runMultipleGuNonGuPods(f, cpuCap, cpuAlloc) @@ -596,10 +596,7 @@ func teardownSRIOVConfigOrFail(f *framework.Framework, sd *sriovData) { } func runTMScopeResourceAlignmentTestSuite(f *framework.Framework, configMap *v1.ConfigMap, reservedSystemCPUs, policy string, numaNodes, coreCount int) { - threadsPerCore := 1 - if isHTEnabled() { - threadsPerCore = 2 - } + threadsPerCore := getSMTLevel() sd := setupSRIOVConfigOrFail(f, configMap) var ctnAttrs, initCtnAttrs []tmCtnAttribute @@ -699,10 +696,7 @@ func runTMScopeResourceAlignmentTestSuite(f *framework.Framework, configMap *v1. } func runTopologyManagerNodeAlignmentSuiteTests(f *framework.Framework, sd *sriovData, reservedSystemCPUs, policy string, numaNodes, coreCount int) { - threadsPerCore := 1 - if isHTEnabled() { - threadsPerCore = 2 - } + threadsPerCore := getSMTLevel() waitForSRIOVResources(f, sd)
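For context on what the SMT alignment tests above exercise: with the full-pcpus-only policy option enabled, the static policy admits a guaranteed container only if its exclusive CPU request is a multiple of the SMT level; otherwise the pod is failed with an SMT alignment error, which the tests detect through isSMTAlignmentError's SMT.*Alignment.*Error pattern. The sketch below captures that admission rule; it is illustrative, not the kubelet implementation, and the exact error text kubelet produces may differ.

// admission.go - standalone sketch of the full-pcpus-only admission rule
package main

import "fmt"

// checkFullPCPUs mimics the rejection rule: a request that is not a whole
// number of physical cores cannot be satisfied without sharing a core's
// thread siblings with another container.
func checkFullPCPUs(requestedCPUs, smtLevel int) error {
	if requestedCPUs%smtLevel != 0 {
		return fmt.Errorf("SMT Alignment Error: requested %d cpus not a multiple of cpus per core (%d)", requestedCPUs, smtLevel)
	}
	return nil
}

func main() {
	const smtLevel = 2 // 2-way SMT, as the positive test assumes
	// mirrors the 1000m (rejected) and 2000m (admitted) pods above
	for _, req := range []int{1, 2} {
		if err := checkFullPCPUs(req, smtLevel); err != nil {
			fmt.Printf("request %d cpu(s): rejected (%v)\n", req, err)
		} else {
			fmt.Printf("request %d cpu(s): admitted\n", req)
		}
	}
}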