From cc76a756e409d08959b8b6e623e317619399f015 Mon Sep 17 00:00:00 2001 From: Swati Sehgal Date: Mon, 26 Apr 2021 20:24:30 +0100 Subject: [PATCH 1/6] smtalign: add cpu-manager-policy-options flag in Kubelet This patch enhances the kubelet configuration to support cpuManagerPolicyOptions. To introduce SMT-awareness in the CPU Manager, we add a new Kubelet flag, `cpu-manager-policy-options`, which lets the user modify the behaviour of the static policy so that the allocation of whole cores is strictly guaranteed. Co-authored-by: Francesco Romani Signed-off-by: Swati Sehgal --- cmd/kubelet/app/options/options.go | 1 + pkg/kubelet/apis/config/fuzzer/fuzzer.go | 1 + pkg/kubelet/apis/config/helpers_test.go | 1 + pkg/kubelet/apis/config/types.go | 4 ++++ staging/src/k8s.io/kubelet/config/v1beta1/types.go | 8 ++++++++ 5 files changed, 15 insertions(+) diff --git a/cmd/kubelet/app/options/options.go b/cmd/kubelet/app/options/options.go index 5f5fa99a94e..3d684e6e8e5 100644 --- a/cmd/kubelet/app/options/options.go +++ b/cmd/kubelet/app/options/options.go @@ -492,6 +492,7 @@ func AddKubeletConfigFlags(mainfs *pflag.FlagSet, c *kubeletconfig.KubeletConfig fs.StringVar(&c.CgroupDriver, "cgroup-driver", c.CgroupDriver, "Driver that the kubelet uses to manipulate cgroups on the host. Possible values: 'cgroupfs', 'systemd'") fs.StringVar(&c.CgroupRoot, "cgroup-root", c.CgroupRoot, "Optional root cgroup to use for pods. This is handled by the container runtime on a best effort basis. Default: '', which means use the container runtime default.") fs.StringVar(&c.CPUManagerPolicy, "cpu-manager-policy", c.CPUManagerPolicy, "CPU Manager policy to use. Possible values: 'none', 'static'.") + fs.Var(cliflag.NewMapStringStringNoSplit(&c.CPUManagerPolicyOptions), "cpu-manager-policy-options", "A set of key=value CPU Manager policy options to use, to fine tune their behaviour. If not supplied, keep the default behaviour.") fs.DurationVar(&c.CPUManagerReconcilePeriod.Duration, "cpu-manager-reconcile-period", c.CPUManagerReconcilePeriod.Duration, " CPU Manager reconciliation period. Examples: '10s', or '1m'. If not supplied, defaults to 'NodeStatusUpdateFrequency'") fs.Var(cliflag.NewMapStringString(&c.QOSReserved), "qos-reserved", " A set of ResourceName=Percentage (e.g. memory=50%) pairs that describe how pod resource requests are reserved at the QoS level. Currently only memory is supported. Requires the QOSReserved feature gate to be enabled.") fs.StringVar(&c.TopologyManagerPolicy, "topology-manager-policy", c.TopologyManagerPolicy, "Topology Manager policy to use. 
Possible values: 'none', 'best-effort', 'restricted', 'single-numa-node'.") diff --git a/pkg/kubelet/apis/config/fuzzer/fuzzer.go b/pkg/kubelet/apis/config/fuzzer/fuzzer.go index 39da70b8305..5d7ebb9cd58 100644 --- a/pkg/kubelet/apis/config/fuzzer/fuzzer.go +++ b/pkg/kubelet/apis/config/fuzzer/fuzzer.go @@ -70,6 +70,7 @@ func Funcs(codecs runtimeserializer.CodecFactory) []interface{} { obj.NodeStatusReportFrequency = metav1.Duration{Duration: time.Minute} obj.NodeLeaseDurationSeconds = 40 obj.CPUManagerPolicy = "none" + obj.CPUManagerPolicyOptions = make(map[string]string) obj.CPUManagerReconcilePeriod = obj.NodeStatusUpdateFrequency obj.NodeStatusMaxImages = 50 obj.TopologyManagerPolicy = kubeletconfig.NoneTopologyManagerPolicy diff --git a/pkg/kubelet/apis/config/helpers_test.go b/pkg/kubelet/apis/config/helpers_test.go index 442a080ad97..b30613da572 100644 --- a/pkg/kubelet/apis/config/helpers_test.go +++ b/pkg/kubelet/apis/config/helpers_test.go @@ -150,6 +150,7 @@ var ( "CPUCFSQuota", "CPUCFSQuotaPeriod.Duration", "CPUManagerPolicy", + "CPUManagerPolicyOptions[*]", "CPUManagerReconcilePeriod.Duration", "TopologyManagerPolicy", "TopologyManagerScope", diff --git a/pkg/kubelet/apis/config/types.go b/pkg/kubelet/apis/config/types.go index fce1f3ad367..85b84b50638 100644 --- a/pkg/kubelet/apis/config/types.go +++ b/pkg/kubelet/apis/config/types.go @@ -221,6 +221,10 @@ type KubeletConfiguration struct { // CPUManagerPolicy is the name of the policy to use. // Requires the CPUManager feature gate to be enabled. CPUManagerPolicy string + // CPUManagerPolicyOptions is a set of key=value pairs which allows setting extra options + // to fine-tune the behaviour of the cpu manager policies. + // Requires both the "CPUManager" and "CPUManagerPolicyOptions" feature gates to be enabled. + CPUManagerPolicyOptions map[string]string // CPU Manager reconciliation period. // Requires the CPUManager feature gate to be enabled. CPUManagerReconcilePeriod metav1.Duration diff --git a/staging/src/k8s.io/kubelet/config/v1beta1/types.go b/staging/src/k8s.io/kubelet/config/v1beta1/types.go index b96ae1c235c..90179c5cdf3 100644 --- a/staging/src/k8s.io/kubelet/config/v1beta1/types.go +++ b/staging/src/k8s.io/kubelet/config/v1beta1/types.go @@ -448,6 +448,14 @@ type KubeletConfiguration struct { // Default: "None" // +optional CPUManagerPolicy string `json:"cpuManagerPolicy,omitempty"` + // cpuManagerPolicyOptions is a set of key=value pairs which allows setting extra options + // to fine-tune the behaviour of the cpu manager policies. + // Requires both the "CPUManager" and "CPUManagerPolicyOptions" feature gates to be enabled. + // Dynamic Kubelet Config (beta): This field should not be updated without a full node + // reboot. It is safest to keep this value the same as the local config. + // Default: nil + // +optional + CPUManagerPolicyOptions map[string]string `json:"cpuManagerPolicyOptions,omitempty"` // cpuManagerReconcilePeriod is the reconciliation period for the CPU Manager. // Requires the CPUManager feature gate to be enabled. // Dynamic Kubelet Config (deprecated): If dynamically updating this field, consider that From 6dccad45b4f8534ea1d0b0ecd8830b72e8e56dd4 Mon Sep 17 00:00:00 2001 From: Francesco Romani Date: Fri, 11 Jun 2021 19:15:53 +0200 Subject: [PATCH 2/6] smtalign: add auto generated code Files generated after running `make generated_files`. 
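For illustration only (not part of the generated changes, and the option key shown is only defined later in this series): with the conversion and deepcopy code in place, the new field can be populated through the versioned configuration API roughly like this:

    package main

    import (
        "fmt"

        "k8s.io/kubelet/config/v1beta1"
    )

    func main() {
        // sketch: a KubeletConfiguration carrying CPU Manager policy options.
        cfg := v1beta1.KubeletConfiguration{
            CPUManagerPolicy: "static",
            CPUManagerPolicyOptions: map[string]string{
                "full-pcpus-only": "true", // key introduced in a later patch of this series
            },
        }
        fmt.Println(cfg.CPUManagerPolicyOptions)
    }
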
Co-authored-by: Swati Sehgal Signed-off-by: Francesco Romani --- pkg/kubelet/apis/config/v1beta1/zz_generated.conversion.go | 2 ++ pkg/kubelet/apis/config/zz_generated.deepcopy.go | 7 +++++++ .../k8s.io/kubelet/config/v1beta1/zz_generated.deepcopy.go | 7 +++++++ 3 files changed, 16 insertions(+) diff --git a/pkg/kubelet/apis/config/v1beta1/zz_generated.conversion.go b/pkg/kubelet/apis/config/v1beta1/zz_generated.conversion.go index f7a8c4b5d43..141b901a802 100644 --- a/pkg/kubelet/apis/config/v1beta1/zz_generated.conversion.go +++ b/pkg/kubelet/apis/config/v1beta1/zz_generated.conversion.go @@ -294,6 +294,7 @@ func autoConvert_v1beta1_KubeletConfiguration_To_config_KubeletConfiguration(in } out.CgroupDriver = in.CgroupDriver out.CPUManagerPolicy = in.CPUManagerPolicy + out.CPUManagerPolicyOptions = *(*map[string]string)(unsafe.Pointer(&in.CPUManagerPolicyOptions)) out.CPUManagerReconcilePeriod = in.CPUManagerReconcilePeriod out.MemoryManagerPolicy = in.MemoryManagerPolicy out.TopologyManagerPolicy = in.TopologyManagerPolicy @@ -464,6 +465,7 @@ func autoConvert_config_KubeletConfiguration_To_v1beta1_KubeletConfiguration(in } out.CgroupDriver = in.CgroupDriver out.CPUManagerPolicy = in.CPUManagerPolicy + out.CPUManagerPolicyOptions = *(*map[string]string)(unsafe.Pointer(&in.CPUManagerPolicyOptions)) out.CPUManagerReconcilePeriod = in.CPUManagerReconcilePeriod out.MemoryManagerPolicy = in.MemoryManagerPolicy out.TopologyManagerPolicy = in.TopologyManagerPolicy diff --git a/pkg/kubelet/apis/config/zz_generated.deepcopy.go b/pkg/kubelet/apis/config/zz_generated.deepcopy.go index 2c1b67744ed..67dba19b020 100644 --- a/pkg/kubelet/apis/config/zz_generated.deepcopy.go +++ b/pkg/kubelet/apis/config/zz_generated.deepcopy.go @@ -201,6 +201,13 @@ func (in *KubeletConfiguration) DeepCopyInto(out *KubeletConfiguration) { out.NodeStatusReportFrequency = in.NodeStatusReportFrequency out.ImageMinimumGCAge = in.ImageMinimumGCAge out.VolumeStatsAggPeriod = in.VolumeStatsAggPeriod + if in.CPUManagerPolicyOptions != nil { + in, out := &in.CPUManagerPolicyOptions, &out.CPUManagerPolicyOptions + *out = make(map[string]string, len(*in)) + for key, val := range *in { + (*out)[key] = val + } + } out.CPUManagerReconcilePeriod = in.CPUManagerReconcilePeriod if in.QOSReserved != nil { in, out := &in.QOSReserved, &out.QOSReserved diff --git a/staging/src/k8s.io/kubelet/config/v1beta1/zz_generated.deepcopy.go b/staging/src/k8s.io/kubelet/config/v1beta1/zz_generated.deepcopy.go index aba0b74227e..027ac220164 100644 --- a/staging/src/k8s.io/kubelet/config/v1beta1/zz_generated.deepcopy.go +++ b/staging/src/k8s.io/kubelet/config/v1beta1/zz_generated.deepcopy.go @@ -167,6 +167,13 @@ func (in *KubeletConfiguration) DeepCopyInto(out *KubeletConfiguration) { *out = new(bool) **out = **in } + if in.CPUManagerPolicyOptions != nil { + in, out := &in.CPUManagerPolicyOptions, &out.CPUManagerPolicyOptions + *out = make(map[string]string, len(*in)) + for key, val := range *in { + (*out)[key] = val + } + } out.CPUManagerReconcilePeriod = in.CPUManagerReconcilePeriod if in.QOSReserved != nil { in, out := &in.QOSReserved, &out.QOSReserved From c5cb263dcf26d014ba3df161caf88d3176b43275 Mon Sep 17 00:00:00 2001 From: Francesco Romani Date: Tue, 4 May 2021 15:37:58 +0200 Subject: [PATCH 3/6] smtalign: propagate policy options to cpumanager The CPUManagerPolicyOptions received from the kubelet config/command line args is propagated to the Container Manager. We defer the consumption of the options to a later patch(set). 
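A minimal sketch (hypothetical caller, not part of the patch) of where the new cpuPolicyOptions argument sits in the updated NewManager signature; every other argument is a placeholder:

    import (
        "time"

        cadvisorapi "github.com/google/cadvisor/info/v1"
        v1 "k8s.io/api/core/v1"
        "k8s.io/kubernetes/pkg/kubelet/cm/cpumanager"
        "k8s.io/kubernetes/pkg/kubelet/cm/cpuset"
        "k8s.io/kubernetes/pkg/kubelet/cm/topologymanager"
    )

    func newCPUManagerSketch(mi *cadvisorapi.MachineInfo, reservation v1.ResourceList, stateDir string) (cpumanager.Manager, error) {
        return cpumanager.NewManager(
            "static",            // cpuPolicyName
            map[string]string{}, // cpuPolicyOptions: plumbed through here, consumed by a later patch
            10*time.Second,      // reconcilePeriod
            mi,
            cpuset.NewCPUSet(), // specificCPUs: no explicitly reserved CPUs
            reservation,        // nodeAllocatableReservation
            stateDir,           // stateFileDirectory
            topologymanager.NewFakeManager(), // affinity store, as used in the unit tests
        )
    }
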
Co-authored-by: Swati Sehgal Signed-off-by: Francesco Romani --- cmd/kubelet/app/server.go | 11 +++++++++++ pkg/features/kube_features.go | 7 +++++++ pkg/kubelet/cm/container_manager.go | 1 + pkg/kubelet/cm/container_manager_linux.go | 1 + pkg/kubelet/cm/cpumanager/cpu_manager.go | 2 +- pkg/kubelet/cm/cpumanager/cpu_manager_test.go | 2 +- 6 files changed, 22 insertions(+), 2 deletions(-) diff --git a/cmd/kubelet/app/server.go b/cmd/kubelet/app/server.go index 64e39e3322d..34bac9c53a9 100644 --- a/cmd/kubelet/app/server.go +++ b/cmd/kubelet/app/server.go @@ -732,6 +732,16 @@ func run(ctx context.Context, s *options.KubeletServer, kubeDeps *kubelet.Depend devicePluginEnabled := utilfeature.DefaultFeatureGate.Enabled(features.DevicePlugins) + var cpuManagerPolicyOptions map[string]string + if utilfeature.DefaultFeatureGate.Enabled(features.CPUManager) { + if utilfeature.DefaultFeatureGate.Enabled(features.CPUManagerPolicyOptions) { + cpuManagerPolicyOptions = s.CPUManagerPolicyOptions + } else if s.CPUManagerPolicyOptions != nil { + return fmt.Errorf("CPU Manager policy options %v require feature gates %q, %q enabled", + s.CPUManagerPolicyOptions, features.CPUManager, features.CPUManagerPolicyOptions) + } + } + kubeDeps.ContainerManager, err = cm.NewContainerManager( kubeDeps.Mounter, kubeDeps.CAdvisorInterface, @@ -756,6 +766,7 @@ func run(ctx context.Context, s *options.KubeletServer, kubeDeps *kubelet.Depend }, QOSReserved: *experimentalQOSReserved, ExperimentalCPUManagerPolicy: s.CPUManagerPolicy, + ExperimentalCPUManagerPolicyOptions: cpuManagerPolicyOptions, ExperimentalCPUManagerReconcilePeriod: s.CPUManagerReconcilePeriod.Duration, ExperimentalMemoryManagerPolicy: s.MemoryManagerPolicy, ExperimentalMemoryManagerReservedMemory: s.ReservedMemory, diff --git a/pkg/features/kube_features.go b/pkg/features/kube_features.go index 841686d615a..7bc5d0de2ac 100644 --- a/pkg/features/kube_features.go +++ b/pkg/features/kube_features.go @@ -789,6 +789,12 @@ const ( // // Enables kubelet to support memory QoS with cgroups v2. 
MemoryQoS featuregate.Feature = "MemoryQoS" + + // owner: @fromanirh + // alpha: v1.22 + // + // Allow fine-tuning of cpumanager policies + CPUManagerPolicyOptions featuregate.Feature = "CPUManagerPolicyOptions" ) func init() { @@ -906,6 +912,7 @@ var defaultKubernetesFeatureGates = map[featuregate.Feature]featuregate.FeatureS DelegateFSGroupToCSIDriver: {Default: false, PreRelease: featuregate.Alpha}, KubeletInUserNamespace: {Default: false, PreRelease: featuregate.Alpha}, MemoryQoS: {Default: false, PreRelease: featuregate.Alpha}, + CPUManagerPolicyOptions: {Default: false, PreRelease: featuregate.Alpha}, // inherited features from generic apiserver, relisted here to get a conflict if it is changed // unintentionally on either side: diff --git a/pkg/kubelet/cm/container_manager.go b/pkg/kubelet/cm/container_manager.go index 73e9b88c03d..5dbe856094c 100644 --- a/pkg/kubelet/cm/container_manager.go +++ b/pkg/kubelet/cm/container_manager.go @@ -134,6 +134,7 @@ type NodeConfig struct { NodeAllocatableConfig QOSReserved map[v1.ResourceName]int64 ExperimentalCPUManagerPolicy string + ExperimentalCPUManagerPolicyOptions map[string]string ExperimentalTopologyManagerScope string ExperimentalCPUManagerReconcilePeriod time.Duration ExperimentalMemoryManagerPolicy string diff --git a/pkg/kubelet/cm/container_manager_linux.go b/pkg/kubelet/cm/container_manager_linux.go index b3a4ce786fc..14b62159acd 100644 --- a/pkg/kubelet/cm/container_manager_linux.go +++ b/pkg/kubelet/cm/container_manager_linux.go @@ -332,6 +332,7 @@ func NewContainerManager(mountUtil mount.Interface, cadvisorInterface cadvisor.I if utilfeature.DefaultFeatureGate.Enabled(kubefeatures.CPUManager) { cm.cpuManager, err = cpumanager.NewManager( nodeConfig.ExperimentalCPUManagerPolicy, + nodeConfig.ExperimentalCPUManagerPolicyOptions, nodeConfig.ExperimentalCPUManagerReconcilePeriod, machineInfo, nodeConfig.NodeAllocatableConfig.ReservedSystemCPUs, diff --git a/pkg/kubelet/cm/cpumanager/cpu_manager.go b/pkg/kubelet/cm/cpumanager/cpu_manager.go index 5fd0ec9275f..b8ebf935562 100644 --- a/pkg/kubelet/cm/cpumanager/cpu_manager.go +++ b/pkg/kubelet/cm/cpumanager/cpu_manager.go @@ -143,7 +143,7 @@ func (s *sourcesReadyStub) AddSource(source string) {} func (s *sourcesReadyStub) AllReady() bool { return true } // NewManager creates new cpu manager based on provided policy -func NewManager(cpuPolicyName string, reconcilePeriod time.Duration, machineInfo *cadvisorapi.MachineInfo, specificCPUs cpuset.CPUSet, nodeAllocatableReservation v1.ResourceList, stateFileDirectory string, affinity topologymanager.Store) (Manager, error) { +func NewManager(cpuPolicyName string, cpuPolicyOptions map[string]string, reconcilePeriod time.Duration, machineInfo *cadvisorapi.MachineInfo, specificCPUs cpuset.CPUSet, nodeAllocatableReservation v1.ResourceList, stateFileDirectory string, affinity topologymanager.Store) (Manager, error) { var topo *topology.CPUTopology var policy Policy diff --git a/pkg/kubelet/cm/cpumanager/cpu_manager_test.go b/pkg/kubelet/cm/cpumanager/cpu_manager_test.go index 58ff1e56e44..08ad4611a65 100644 --- a/pkg/kubelet/cm/cpumanager/cpu_manager_test.go +++ b/pkg/kubelet/cm/cpumanager/cpu_manager_test.go @@ -634,7 +634,7 @@ func TestCPUManagerGenerate(t *testing.T) { } defer os.RemoveAll(sDir) - mgr, err := NewManager(testCase.cpuPolicyName, 5*time.Second, machineInfo, cpuset.NewCPUSet(), testCase.nodeAllocatableReservation, sDir, topologymanager.NewFakeManager()) + mgr, err := NewManager(testCase.cpuPolicyName, nil, 5*time.Second, 
machineInfo, cpuset.NewCPUSet(), testCase.nodeAllocatableReservation, sDir, topologymanager.NewFakeManager()) if testCase.expectedError != nil { if !strings.Contains(err.Error(), testCase.expectedError.Error()) { t.Errorf("Unexpected error message. Have: %s wants %s", err.Error(), testCase.expectedError.Error()) From 6dcec345dfb43bff9e978cb9e7a992e1e34c9507 Mon Sep 17 00:00:00 2001 From: Francesco Romani Date: Wed, 5 May 2021 16:50:28 +0200 Subject: [PATCH 4/6] smtalign: cm: factor out admission response Introduce a new `admission` subpackage to factor out the responsibility of creating `PodAdmitResult` objects. This enables the resource managers to report specific errors from Allocate() and to bubble them up into the relevant fields of the `PodAdmitResult`. To demonstrate the approach we refactor TopologyAffinityError as a proper error. Co-authored-by: Kevin Klues Co-authored-by: Swati Sehgal Signed-off-by: Francesco Romani --- pkg/kubelet/cm/admission/errors.go | 62 +++++++++++ pkg/kubelet/cm/admission/errors_test.go | 87 +++++++++++++++++++ pkg/kubelet/cm/container_manager_linux.go | 21 ++--- pkg/kubelet/cm/container_manager_windows.go | 5 +- .../topologymanager/fake_topology_manager.go | 5 +- pkg/kubelet/cm/topologymanager/scope.go | 27 +----- .../cm/topologymanager/scope_container.go | 7 +- pkg/kubelet/cm/topologymanager/scope_pod.go | 7 +- .../cm/topologymanager/topology_manager.go | 13 +++ .../topologymanager/topology_manager_test.go | 6 ++ 10 files changed, 188 insertions(+), 52 deletions(-) create mode 100644 pkg/kubelet/cm/admission/errors.go create mode 100644 pkg/kubelet/cm/admission/errors_test.go diff --git a/pkg/kubelet/cm/admission/errors.go b/pkg/kubelet/cm/admission/errors.go new file mode 100644 index 00000000000..5e549220394 --- /dev/null +++ b/pkg/kubelet/cm/admission/errors.go @@ -0,0 +1,62 @@ +/* +Copyright 2021 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/ + +package admission + +import ( + "errors" + "fmt" + + "k8s.io/kubernetes/pkg/kubelet/lifecycle" +) + +const ( + ErrorReasonUnexpected = "UnexpectedAdmissionError" +) + +type Error interface { + Error() string + Type() string +} + +type unexpectedAdmissionError struct{ Err error } + +var _ Error = (*unexpectedAdmissionError)(nil) + +func (e *unexpectedAdmissionError) Error() string { + return fmt.Sprintf("Allocate failed due to %v, which is unexpected", e.Err) +} + +func (e *unexpectedAdmissionError) Type() string { + return ErrorReasonUnexpected +} + +func GetPodAdmitResult(err error) lifecycle.PodAdmitResult { + if err == nil { + return lifecycle.PodAdmitResult{Admit: true} + } + + var admissionErr Error + if !errors.As(err, &admissionErr) { + admissionErr = &unexpectedAdmissionError{err} + } + + return lifecycle.PodAdmitResult{ + Message: admissionErr.Error(), + Reason: admissionErr.Type(), + Admit: false, + } +} diff --git a/pkg/kubelet/cm/admission/errors_test.go b/pkg/kubelet/cm/admission/errors_test.go new file mode 100644 index 00000000000..d18eb4a8ed4 --- /dev/null +++ b/pkg/kubelet/cm/admission/errors_test.go @@ -0,0 +1,87 @@ +/* +Copyright 2021 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package admission + +import ( + "errors" + "testing" +) + +type TestAdmissionError struct { + message string + reason string +} + +func (e *TestAdmissionError) Error() string { + return e.message +} + +func (e *TestAdmissionError) Type() string { + return e.reason +} + +func TestAdmissionErrors(t *testing.T) { + testCases := []struct { + Error error + expectedAdmissionError bool + }{ + { + nil, + false, + }, + { + errors.New("Not an AdmissionError error"), + false, + }, + { + &TestAdmissionError{ + "Is an AdmissionError error", + "TestAdmissionError", + }, + true, + }, + } + + for _, tc := range testCases { + h := GetPodAdmitResult(tc.Error) + if tc.Error == nil { + if !h.Admit { + t.Errorf("expected PodAdmitResult.Admit = true") + } + continue + } + + if h.Admit { + t.Errorf("expected PodAdmitResult.Admit = false") + } + + if tc.expectedAdmissionError { + err, ok := tc.Error.(*TestAdmissionError) + if !ok { + t.Errorf("expected TestAdmissionError") + } + if h.Reason != err.reason { + t.Errorf("expected PodAdmitResult.Reason = %v, got %v", err.reason, h.Reason) + } + continue + } + + if h.Reason != ErrorReasonUnexpected { + t.Errorf("expected PodAdmitResult.Reason = %v, got %v", ErrorReasonUnexpected, h.Reason) + } + } +} diff --git a/pkg/kubelet/cm/container_manager_linux.go b/pkg/kubelet/cm/container_manager_linux.go index 14b62159acd..31a728b7291 100644 --- a/pkg/kubelet/cm/container_manager_linux.go +++ b/pkg/kubelet/cm/container_manager_linux.go @@ -52,6 +52,7 @@ import ( podresourcesapi "k8s.io/kubelet/pkg/apis/podresources/v1" kubefeatures "k8s.io/kubernetes/pkg/features" "k8s.io/kubernetes/pkg/kubelet/cadvisor" + "k8s.io/kubernetes/pkg/kubelet/cm/admission" "k8s.io/kubernetes/pkg/kubelet/cm/containermap" "k8s.io/kubernetes/pkg/kubelet/cm/cpumanager" 
"k8s.io/kubernetes/pkg/kubelet/cm/devicemanager" @@ -758,37 +759,25 @@ func (m *resourceAllocator) Admit(attrs *lifecycle.PodAdmitAttributes) lifecycle for _, container := range append(pod.Spec.InitContainers, pod.Spec.Containers...) { err := m.deviceManager.Allocate(pod, &container) if err != nil { - return lifecycle.PodAdmitResult{ - Message: fmt.Sprintf("Allocate failed due to %v, which is unexpected", err), - Reason: "UnexpectedAdmissionError", - Admit: false, - } + return admission.GetPodAdmitResult(err) } if m.cpuManager != nil { err = m.cpuManager.Allocate(pod, &container) if err != nil { - return lifecycle.PodAdmitResult{ - Message: fmt.Sprintf("Allocate failed due to %v, which is unexpected", err), - Reason: "UnexpectedAdmissionError", - Admit: false, - } + return admission.GetPodAdmitResult(err) } } if m.memoryManager != nil { err = m.memoryManager.Allocate(pod, &container) if err != nil { - return lifecycle.PodAdmitResult{ - Message: fmt.Sprintf("Allocate failed due to %v, which is unexpected", err), - Reason: "UnexpectedAdmissionError", - Admit: false, - } + return admission.GetPodAdmitResult(err) } } } - return lifecycle.PodAdmitResult{Admit: true} + return admission.GetPodAdmitResult(nil) } func (cm *containerManagerImpl) SystemCgroupsLimit() v1.ResourceList { diff --git a/pkg/kubelet/cm/container_manager_windows.go b/pkg/kubelet/cm/container_manager_windows.go index 1bc246c94b1..17fa16cdd9f 100644 --- a/pkg/kubelet/cm/container_manager_windows.go +++ b/pkg/kubelet/cm/container_manager_windows.go @@ -35,6 +35,7 @@ import ( podresourcesapi "k8s.io/kubelet/pkg/apis/podresources/v1" kubefeatures "k8s.io/kubernetes/pkg/features" "k8s.io/kubernetes/pkg/kubelet/cadvisor" + "k8s.io/kubernetes/pkg/kubelet/cm/admission" "k8s.io/kubernetes/pkg/kubelet/cm/cpumanager" "k8s.io/kubernetes/pkg/kubelet/cm/devicemanager" "k8s.io/kubernetes/pkg/kubelet/cm/memorymanager" @@ -63,9 +64,7 @@ type containerManagerImpl struct { type noopWindowsResourceAllocator struct{} func (ra *noopWindowsResourceAllocator) Admit(attrs *lifecycle.PodAdmitAttributes) lifecycle.PodAdmitResult { - return lifecycle.PodAdmitResult{ - Admit: true, - } + return admission.GetPodAdmitResult(nil) } func (cm *containerManagerImpl) Start(node *v1.Node, diff --git a/pkg/kubelet/cm/topologymanager/fake_topology_manager.go b/pkg/kubelet/cm/topologymanager/fake_topology_manager.go index 407691e98f0..8a60aa23347 100644 --- a/pkg/kubelet/cm/topologymanager/fake_topology_manager.go +++ b/pkg/kubelet/cm/topologymanager/fake_topology_manager.go @@ -19,6 +19,7 @@ package topologymanager import ( "k8s.io/api/core/v1" "k8s.io/klog/v2" + "k8s.io/kubernetes/pkg/kubelet/cm/admission" "k8s.io/kubernetes/pkg/kubelet/lifecycle" ) @@ -64,7 +65,5 @@ func (m *fakeManager) RemoveContainer(containerID string) error { func (m *fakeManager) Admit(attrs *lifecycle.PodAdmitAttributes) lifecycle.PodAdmitResult { klog.InfoS("Topology Admit Handler") - return lifecycle.PodAdmitResult{ - Admit: true, - } + return admission.GetPodAdmitResult(nil) } diff --git a/pkg/kubelet/cm/topologymanager/scope.go b/pkg/kubelet/cm/topologymanager/scope.go index c5c6f36be97..912aba3fde0 100644 --- a/pkg/kubelet/cm/topologymanager/scope.go +++ b/pkg/kubelet/cm/topologymanager/scope.go @@ -17,12 +17,11 @@ limitations under the License. 
package topologymanager import ( - "fmt" - "sync" "k8s.io/api/core/v1" "k8s.io/klog/v2" + "k8s.io/kubernetes/pkg/kubelet/cm/admission" "k8s.io/kubernetes/pkg/kubelet/cm/containermap" "k8s.io/kubernetes/pkg/kubelet/lifecycle" ) @@ -133,10 +132,10 @@ func (s *scope) admitPolicyNone(pod *v1.Pod) lifecycle.PodAdmitResult { for _, container := range append(pod.Spec.InitContainers, pod.Spec.Containers...) { err := s.allocateAlignedResources(pod, &container) if err != nil { - return unexpectedAdmissionError(err) + return admission.GetPodAdmitResult(err) } } - return admitPod() + return admission.GetPodAdmitResult(nil) } // It would be better to implement this function in topologymanager instead of scope @@ -150,23 +149,3 @@ func (s *scope) allocateAlignedResources(pod *v1.Pod, container *v1.Container) e } return nil } - -func topologyAffinityError() lifecycle.PodAdmitResult { - return lifecycle.PodAdmitResult{ - Message: "Resources cannot be allocated with Topology locality", - Reason: "TopologyAffinityError", - Admit: false, - } -} - -func unexpectedAdmissionError(err error) lifecycle.PodAdmitResult { - return lifecycle.PodAdmitResult{ - Message: fmt.Sprintf("Allocate failed due to %v, which is unexpected", err), - Reason: "UnexpectedAdmissionError", - Admit: false, - } -} - -func admitPod() lifecycle.PodAdmitResult { - return lifecycle.PodAdmitResult{Admit: true} -} diff --git a/pkg/kubelet/cm/topologymanager/scope_container.go b/pkg/kubelet/cm/topologymanager/scope_container.go index de45209625a..1e4e2f58fc0 100644 --- a/pkg/kubelet/cm/topologymanager/scope_container.go +++ b/pkg/kubelet/cm/topologymanager/scope_container.go @@ -19,6 +19,7 @@ package topologymanager import ( "k8s.io/api/core/v1" "k8s.io/klog/v2" + "k8s.io/kubernetes/pkg/kubelet/cm/admission" "k8s.io/kubernetes/pkg/kubelet/cm/containermap" "k8s.io/kubernetes/pkg/kubelet/lifecycle" ) @@ -53,17 +54,17 @@ func (s *containerScope) Admit(pod *v1.Pod) lifecycle.PodAdmitResult { klog.InfoS("Best TopologyHint", "bestHint", bestHint, "pod", klog.KObj(pod), "containerName", container.Name) if !admit { - return topologyAffinityError() + return admission.GetPodAdmitResult(&TopologyAffinityError{}) } klog.InfoS("Topology Affinity", "bestHint", bestHint, "pod", klog.KObj(pod), "containerName", container.Name) s.setTopologyHints(string(pod.UID), container.Name, bestHint) err := s.allocateAlignedResources(pod, &container) if err != nil { - return unexpectedAdmissionError(err) + return admission.GetPodAdmitResult(err) } } - return admitPod() + return admission.GetPodAdmitResult(nil) } func (s *containerScope) accumulateProvidersHints(pod *v1.Pod, container *v1.Container) []map[string][]TopologyHint { diff --git a/pkg/kubelet/cm/topologymanager/scope_pod.go b/pkg/kubelet/cm/topologymanager/scope_pod.go index 9ccc6414dd9..b77682597b8 100644 --- a/pkg/kubelet/cm/topologymanager/scope_pod.go +++ b/pkg/kubelet/cm/topologymanager/scope_pod.go @@ -19,6 +19,7 @@ package topologymanager import ( "k8s.io/api/core/v1" "k8s.io/klog/v2" + "k8s.io/kubernetes/pkg/kubelet/cm/admission" "k8s.io/kubernetes/pkg/kubelet/cm/containermap" "k8s.io/kubernetes/pkg/kubelet/lifecycle" ) @@ -51,7 +52,7 @@ func (s *podScope) Admit(pod *v1.Pod) lifecycle.PodAdmitResult { bestHint, admit := s.calculateAffinity(pod) klog.InfoS("Best TopologyHint", "bestHint", bestHint, "pod", klog.KObj(pod)) if !admit { - return topologyAffinityError() + return admission.GetPodAdmitResult(&TopologyAffinityError{}) } for _, container := range append(pod.Spec.InitContainers, 
pod.Spec.Containers...) { @@ -60,10 +61,10 @@ func (s *podScope) Admit(pod *v1.Pod) lifecycle.PodAdmitResult { err := s.allocateAlignedResources(pod, &container) if err != nil { - return unexpectedAdmissionError(err) + return admission.GetPodAdmitResult(err) } } - return admitPod() + return admission.GetPodAdmitResult(nil) } func (s *podScope) accumulateProvidersHints(pod *v1.Pod) []map[string][]TopologyHint { diff --git a/pkg/kubelet/cm/topologymanager/topology_manager.go b/pkg/kubelet/cm/topologymanager/topology_manager.go index 4f327e6efc0..7cd67d1aa60 100644 --- a/pkg/kubelet/cm/topologymanager/topology_manager.go +++ b/pkg/kubelet/cm/topologymanager/topology_manager.go @@ -36,8 +36,21 @@ const ( // present on a machine and the TopologyManager is enabled, an error will // be returned and the TopologyManager will not be loaded. maxAllowableNUMANodes = 8 + // ErrorTopologyAffinity represents the type of a TopologyAffinityError + ErrorTopologyAffinity = "TopologyAffinityError" ) +// TopologyAffinityError represents a resource alignment error +type TopologyAffinityError struct{} + +func (e TopologyAffinityError) Error() string { + return "Resources cannot be allocated with Topology locality" +} + +func (e TopologyAffinityError) Type() string { + return ErrorTopologyAffinity +} + // Manager interface provides methods for Kubelet to manage pod topology hints type Manager interface { // PodAdmitHandler is implemented by Manager diff --git a/pkg/kubelet/cm/topologymanager/topology_manager_test.go b/pkg/kubelet/cm/topologymanager/topology_manager_test.go index 6c84411c397..c0cc0980c65 100644 --- a/pkg/kubelet/cm/topologymanager/topology_manager_test.go +++ b/pkg/kubelet/cm/topologymanager/topology_manager_test.go @@ -477,11 +477,17 @@ func TestAdmit(t *testing.T) { if ctnActual.Admit != tc.expected { t.Errorf("Error occurred, expected Admit in result to be %v got %v", tc.expected, ctnActual.Admit) } + if !ctnActual.Admit && ctnActual.Reason != ErrorTopologyAffinity { + t.Errorf("Error occurred, expected Reason in result to be %v got %v", ErrorTopologyAffinity, ctnActual.Reason) + } // Pod scope Admit podActual := podScopeManager.Admit(&podAttr) if podActual.Admit != tc.expected { t.Errorf("Error occurred, expected Admit in result to be %v got %v", tc.expected, podActual.Admit) } + if !podActual.Admit && podActual.Reason != ErrorTopologyAffinity { + t.Errorf("Error occurred, expected Reason in result to be %v got %v", ErrorTopologyAffinity, podActual.Reason) + } } } From 23abdab2b77693e3271bfcf9469eeb201e2861b2 Mon Sep 17 00:00:00 2001 From: Francesco Romani Date: Mon, 17 May 2021 10:53:43 +0200 Subject: [PATCH 5/6] smtalign: propagate policy options to policies Consume the cpu manager policy options in the static policy. In the none policy, validate that no options are given, and fail otherwise - passing options to the none policy is almost surely a configuration mistake. Add a new cpumanager.StaticPolicyOptions type to hold the options and translate the user arguments into flags. 
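A behaviour sketch (illustrative only, mirroring the new policy_options.go code): known keys are parsed strictly, unknown keys fail fast:

    package main

    import (
        "fmt"

        "k8s.io/kubernetes/pkg/kubelet/cm/cpumanager"
    )

    func main() {
        // well-formed, known option: accepted.
        opts, err := cpumanager.NewStaticPolicyOptions(map[string]string{
            "full-pcpus-only": "true",
        })
        fmt.Println(opts.FullPhysicalCPUsOnly, err) // true <nil>

        // unknown option: rejected, so configuration typos surface immediately.
        _, err = cpumanager.NewStaticPolicyOptions(map[string]string{"bogus": "1"})
        fmt.Println(err) // unsupported cpumanager option: "bogus" (1)
    }
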
Co-authored-by: Swati Sehgal Signed-off-by: Francesco Romani --- pkg/kubelet/cm/cpumanager/cpu_manager.go | 11 +- pkg/kubelet/cm/cpumanager/cpu_manager_test.go | 74 ++++- pkg/kubelet/cm/cpumanager/fake_cpu_manager.go | 3 +- pkg/kubelet/cm/cpumanager/policy_none.go | 9 +- pkg/kubelet/cm/cpumanager/policy_none_test.go | 17 + pkg/kubelet/cm/cpumanager/policy_options.go | 56 ++++ pkg/kubelet/cm/cpumanager/policy_static.go | 52 ++- .../cm/cpumanager/policy_static_test.go | 297 ++++++++++++++---- 8 files changed, 440 insertions(+), 79 deletions(-) create mode 100644 pkg/kubelet/cm/cpumanager/policy_options.go diff --git a/pkg/kubelet/cm/cpumanager/cpu_manager.go b/pkg/kubelet/cm/cpumanager/cpu_manager.go index b8ebf935562..ec793cb5bb1 100644 --- a/pkg/kubelet/cm/cpumanager/cpu_manager.go +++ b/pkg/kubelet/cm/cpumanager/cpu_manager.go @@ -146,14 +146,17 @@ func (s *sourcesReadyStub) AllReady() bool { return true } func NewManager(cpuPolicyName string, cpuPolicyOptions map[string]string, reconcilePeriod time.Duration, machineInfo *cadvisorapi.MachineInfo, specificCPUs cpuset.CPUSet, nodeAllocatableReservation v1.ResourceList, stateFileDirectory string, affinity topologymanager.Store) (Manager, error) { var topo *topology.CPUTopology var policy Policy + var err error switch policyName(cpuPolicyName) { case PolicyNone: - policy = NewNonePolicy() + policy, err = NewNonePolicy(cpuPolicyOptions) + if err != nil { + return nil, fmt.Errorf("new none policy error: %w", err) + } case PolicyStatic: - var err error topo, err = topology.Discover(machineInfo) if err != nil { return nil, err @@ -178,9 +181,9 @@ func NewManager(cpuPolicyName string, cpuPolicyOptions map[string]string, reconc // exclusively allocated. reservedCPUsFloat := float64(reservedCPUs.MilliValue()) / 1000 numReservedCPUs := int(math.Ceil(reservedCPUsFloat)) - policy, err = NewStaticPolicy(topo, numReservedCPUs, specificCPUs, affinity) + policy, err = NewStaticPolicy(topo, numReservedCPUs, specificCPUs, affinity, cpuPolicyOptions) if err != nil { - return nil, fmt.Errorf("new static policy error: %v", err) + return nil, fmt.Errorf("new static policy error: %w", err) } default: diff --git a/pkg/kubelet/cm/cpumanager/cpu_manager_test.go b/pkg/kubelet/cm/cpumanager/cpu_manager_test.go index 08ad4611a65..dc818cb0f58 100644 --- a/pkg/kubelet/cm/cpumanager/cpu_manager_test.go +++ b/pkg/kubelet/cm/cpumanager/cpu_manager_test.go @@ -229,7 +229,8 @@ func TestCPUManagerAdd(t *testing.T) { }, 0, cpuset.NewCPUSet(), - topologymanager.NewFakeManager()) + topologymanager.NewFakeManager(), + nil) testCases := []struct { description string updateErr error @@ -479,7 +480,7 @@ func TestCPUManagerAddWithInitContainers(t *testing.T) { } for _, testCase := range testCases { - policy, _ := NewStaticPolicy(testCase.topo, testCase.numReservedCPUs, cpuset.NewCPUSet(), topologymanager.NewFakeManager()) + policy, _ := NewStaticPolicy(testCase.topo, testCase.numReservedCPUs, cpuset.NewCPUSet(), topologymanager.NewFakeManager(), nil) mockState := &mockState{ assignments: testCase.stAssignments, @@ -1004,7 +1005,8 @@ func TestCPUManagerAddWithResvList(t *testing.T) { }, 1, cpuset.NewCPUSet(0), - topologymanager.NewFakeManager()) + topologymanager.NewFakeManager(), + nil) testCases := []struct { description string updateErr error @@ -1061,3 +1063,69 @@ func TestCPUManagerAddWithResvList(t *testing.T) { } } } + +func TestCPUManagerHandlePolicyOptions(t *testing.T) { + testCases := []struct { + description string + cpuPolicyName string + cpuPolicyOptions map[string]string 
+ expectedError error + }{ + { + description: "options to none policy", + cpuPolicyName: "none", + cpuPolicyOptions: map[string]string{ + FullPCPUsOnlyOption: "true", + }, + expectedError: fmt.Errorf("received unsupported options"), + }, + } + + // any correct realistic topology is fine. We pick a simple one. + mockedMachineInfo := cadvisorapi.MachineInfo{ + NumCores: 4, + Topology: []cadvisorapi.Node{ + { + Cores: []cadvisorapi.Core{ + { + Id: 0, + Threads: []int{0}, + }, + { + Id: 1, + Threads: []int{1}, + }, + { + Id: 2, + Threads: []int{2}, + }, + { + Id: 3, + Threads: []int{3}, + }, + }, + }, + }, + } + + for _, testCase := range testCases { + t.Run(testCase.description, func(t *testing.T) { + machineInfo := &mockedMachineInfo + nodeAllocatableReservation := v1.ResourceList{} + sDir, err := ioutil.TempDir("/tmp/", "cpu_manager_test") + if err != nil { + t.Errorf("cannot create state file: %s", err.Error()) + } + defer os.RemoveAll(sDir) + + _, err = NewManager(testCase.cpuPolicyName, testCase.cpuPolicyOptions, 5*time.Second, machineInfo, cpuset.NewCPUSet(), nodeAllocatableReservation, sDir, topologymanager.NewFakeManager()) + if err == nil { + t.Errorf("Expected error, but NewManager succeeded") + } + if !strings.Contains(err.Error(), testCase.expectedError.Error()) { + t.Errorf("Unexpected error message. Have: %s wants %s", err.Error(), testCase.expectedError.Error()) + } + }) + + } +} diff --git a/pkg/kubelet/cm/cpumanager/fake_cpu_manager.go b/pkg/kubelet/cm/cpumanager/fake_cpu_manager.go index 2c38b52b374..28578e6415d 100644 --- a/pkg/kubelet/cm/cpumanager/fake_cpu_manager.go +++ b/pkg/kubelet/cm/cpumanager/fake_cpu_manager.go @@ -38,7 +38,8 @@ func (m *fakeManager) Start(activePods ActivePodsFunc, sourcesReady config.Sourc func (m *fakeManager) Policy() Policy { klog.InfoS("Policy()") - return NewNonePolicy() + pol, _ := NewNonePolicy(nil) + return pol } func (m *fakeManager) Allocate(pod *v1.Pod, container *v1.Container) error { diff --git a/pkg/kubelet/cm/cpumanager/policy_none.go b/pkg/kubelet/cm/cpumanager/policy_none.go index 345d4c14d6d..1e35f6a094e 100644 --- a/pkg/kubelet/cm/cpumanager/policy_none.go +++ b/pkg/kubelet/cm/cpumanager/policy_none.go @@ -17,6 +17,8 @@ limitations under the License. package cpumanager import ( + "fmt" + "k8s.io/api/core/v1" "k8s.io/klog/v2" "k8s.io/kubernetes/pkg/kubelet/cm/cpumanager/state" @@ -32,8 +34,11 @@ var _ Policy = &nonePolicy{} const PolicyNone policyName = "none" // NewNonePolicy returns a cpuset manager policy that does nothing -func NewNonePolicy() Policy { - return &nonePolicy{} +func NewNonePolicy(cpuPolicyOptions map[string]string) (Policy, error) { + if len(cpuPolicyOptions) > 0 { + return nil, fmt.Errorf("None policy: received unsupported options=%v", cpuPolicyOptions) + } + return &nonePolicy{}, nil } func (p *nonePolicy) Name() string { diff --git a/pkg/kubelet/cm/cpumanager/policy_none_test.go b/pkg/kubelet/cm/cpumanager/policy_none_test.go index 97127971096..1dcd00bd3e9 100644 --- a/pkg/kubelet/cm/cpumanager/policy_none_test.go +++ b/pkg/kubelet/cm/cpumanager/policy_none_test.go @@ -86,3 +86,20 @@ func TestNonePolicyGetAllocatableCPUs(t *testing.T) { t.Errorf("NonePolicy GetAllocatableCPUs() error. expected empty set, returned: %v", cpus) } } + +func TestNonePolicyOptions(t *testing.T) { + var err error + + _, err = NewNonePolicy(nil) + if err != nil { + t.Errorf("NewNonePolicy with nil options failure. 
expected no error but got: %v", err) + } + + opts := map[string]string{ + FullPCPUsOnlyOption: "true", + } + _, err = NewNonePolicy(opts) + if err == nil { + t.Errorf("NewNonePolicy with (any) options failure. expected error but got none") + } +} diff --git a/pkg/kubelet/cm/cpumanager/policy_options.go b/pkg/kubelet/cm/cpumanager/policy_options.go new file mode 100644 index 00000000000..cf92a2e11c4 --- /dev/null +++ b/pkg/kubelet/cm/cpumanager/policy_options.go @@ -0,0 +1,56 @@ +/* +Copyright 2021 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package cpumanager + +import ( + "fmt" + "strconv" +) + +const ( + // FullPCPUsOnlyOption is the name of the CPU Manager policy option + FullPCPUsOnlyOption string = "full-pcpus-only" +) + +type StaticPolicyOptions struct { + // flag to enable extra allocation restrictions to prevent + // different containers from possibly ending up on the same physical core. + // we consider "core" and "physical CPU" synonyms here, leaning + // towards the terminology used by the k8s hints. We acknowledge this is confusing. + // + // looking at https://kubernetes.io/docs/concepts/configuration/manage-resources-containers/, + // any possible naming scheme will lead to ambiguity to some extent. + // We picked "pcpu" because the established docs already hint at vCPU. + FullPhysicalCPUsOnly bool +} + +func NewStaticPolicyOptions(policyOptions map[string]string) (StaticPolicyOptions, error) { + opts := StaticPolicyOptions{} + for name, value := range policyOptions { + switch name { + case FullPCPUsOnlyOption: + optValue, err := strconv.ParseBool(value) + if err != nil { + return opts, fmt.Errorf("bad value for option %q: %w", name, err) + } + opts.FullPhysicalCPUsOnly = optValue + default: + return opts, fmt.Errorf("unsupported cpumanager option: %q (%s)", name, value) + } + } + return opts, nil +} diff --git a/pkg/kubelet/cm/cpumanager/policy_static.go b/pkg/kubelet/cm/cpumanager/policy_static.go index ec25a15a3c2..f5d275d8ea8 100644 --- a/pkg/kubelet/cm/cpumanager/policy_static.go +++ b/pkg/kubelet/cm/cpumanager/policy_static.go @@ -29,8 +29,29 @@ import ( "k8s.io/kubernetes/pkg/kubelet/cm/topologymanager/bitmask" ) -// PolicyStatic is the name of the static policy -const PolicyStatic policyName = "static" +const ( + + // PolicyStatic is the name of the static policy. 
+ // Should options be given, these will be ignored and backward (up to and including 1.21) + // compatible behaviour will be enforced + PolicyStatic policyName = "static" + // ErrorSMTAlignment represents the type of an SMTAlignmentError + ErrorSMTAlignment = "SMTAlignmentError" +) + +// SMTAlignmentError represents an error due to SMT alignment +type SMTAlignmentError struct { + RequestedCPUs int + CpusPerCore int +} + +func (e SMTAlignmentError) Error() string { + return fmt.Sprintf("SMT Alignment Error: requested %d cpus not multiple cpus per core = %d", e.RequestedCPUs, e.CpusPerCore) +} + +func (e SMTAlignmentError) Type() string { + return ErrorSMTAlignment +} // staticPolicy is a CPU manager policy that does not change CPU // assignments for exclusively pinned guaranteed containers after the main @@ -79,6 +100,8 @@ type staticPolicy struct { affinity topologymanager.Store // set of CPUs to reuse across allocations in a pod cpusToReuse map[string]cpuset.CPUSet + // options allow fine-tuning the behaviour of the policy + options StaticPolicyOptions } // Ensure staticPolicy implements Policy interface @@ -87,7 +110,14 @@ var _ Policy = &staticPolicy{} // NewStaticPolicy returns a CPU manager policy that does not change CPU // assignments for exclusively pinned guaranteed containers after the main // container process starts. -func NewStaticPolicy(topology *topology.CPUTopology, numReservedCPUs int, reservedCPUs cpuset.CPUSet, affinity topologymanager.Store) (Policy, error) { +func NewStaticPolicy(topology *topology.CPUTopology, numReservedCPUs int, reservedCPUs cpuset.CPUSet, affinity topologymanager.Store, cpuPolicyOptions map[string]string) (Policy, error) { + opts, err := NewStaticPolicyOptions(cpuPolicyOptions) + if err != nil { + return nil, err + } + + klog.InfoS("Static policy created with configuration", "options", opts) + allCPUs := topology.CPUDetails.CPUs() var reserved cpuset.CPUSet if reservedCPUs.Size() > 0 { @@ -113,6 +143,7 @@ func NewStaticPolicy(topology *topology.CPUTopology, numReservedCPUs int, reserv reserved: reserved, affinity: affinity, cpusToReuse: make(map[string]cpuset.CPUSet), + options: opts, }, nil } @@ -220,6 +251,21 @@ func (p *staticPolicy) Allocate(s state.State, pod *v1.Pod, container *v1.Contai klog.InfoS("Static policy: Allocate", "pod", klog.KObj(pod), "containerName", container.Name) // container belongs in an exclusively allocated pool + if p.options.FullPhysicalCPUsOnly && ((numCPUs % p.topology.CPUsPerCore()) != 0) { + // Since the CPU Manager has been enabled requesting strict SMT alignment, a guaranteed pod can only be admitted + // if the CPU requested is a multiple of the number of virtual cpus per physical core. + // In case the CPU request is not a multiple of the number of virtual cpus per physical core, the Pod will be put + // in Failed state, with SMTAlignmentError as reason. Since the allocation happens in terms of physical cores + // and the scheduler is responsible for ensuring that the workload goes to a node that has enough CPUs, + // the pod would be placed on a node where there are enough physical cores available to be allocated. + // Just like the behaviour in case of static policy, takeByTopology will try to first allocate CPUs from the same socket + // and only in case the request cannot be satisfied on a single socket, CPU allocation is done for a workload to occupy all + // CPUs on a physical core. Allocation of individual threads would never have to occur. 
+ return SMTAlignmentError{ + RequestedCPUs: numCPUs, + CpusPerCore: p.topology.CPUsPerCore(), + } + } if cpuset, ok := s.GetCPUSet(string(pod.UID), container.Name); ok { p.updateCPUsToReuse(pod, container, cpuset) klog.InfoS("Static policy: container already present in state, skipping", "pod", klog.KObj(pod), "containerName", container.Name) diff --git a/pkg/kubelet/cm/cpumanager/policy_static_test.go b/pkg/kubelet/cm/cpumanager/policy_static_test.go index c54997787b4..d2b641fe3a0 100644 --- a/pkg/kubelet/cm/cpumanager/policy_static_test.go +++ b/pkg/kubelet/cm/cpumanager/policy_static_test.go @@ -34,6 +34,7 @@ type staticPolicyTest struct { topo *topology.CPUTopology numReservedCPUs int podUID string + options map[string]string containerName string stAssignments state.ContainerCPUAssignments stDefaultCPUSet cpuset.CPUSet @@ -43,8 +44,27 @@ expCSet cpuset.CPUSet } +// this is not a real Clone() - hence Pseudo- - because we don't clone some +// objects which are accessed read-only +func (spt staticPolicyTest) PseudoClone() staticPolicyTest { + return staticPolicyTest{ + description: spt.description, + topo: spt.topo, // accessed in read-only + numReservedCPUs: spt.numReservedCPUs, + podUID: spt.podUID, + options: spt.options, // accessed in read-only + containerName: spt.containerName, + stAssignments: spt.stAssignments.Clone(), + stDefaultCPUSet: spt.stDefaultCPUSet.Clone(), + pod: spt.pod, // accessed in read-only + expErr: spt.expErr, + expCPUAlloc: spt.expCPUAlloc, + expCSet: spt.expCSet.Clone(), + } +} + func TestStaticPolicyName(t *testing.T) { - policy, _ := NewStaticPolicy(topoSingleSocketHT, 1, cpuset.NewCPUSet(), topologymanager.NewFakeManager()) + policy, _ := NewStaticPolicy(topoSingleSocketHT, 1, cpuset.NewCPUSet(), topologymanager.NewFakeManager(), nil) policyName := policy.Name() if policyName != "static" { @@ -120,7 +140,7 @@ func TestStaticPolicyStart(t *testing.T) { } for _, testCase := range testCases { t.Run(testCase.description, func(t *testing.T) { - p, _ := NewStaticPolicy(testCase.topo, testCase.numReservedCPUs, cpuset.NewCPUSet(), topologymanager.NewFakeManager()) + p, _ := NewStaticPolicy(testCase.topo, testCase.numReservedCPUs, cpuset.NewCPUSet(), topologymanager.NewFakeManager(), nil) policy := p.(*staticPolicy) st := &mockState{ assignments: testCase.stAssignments, @@ -168,7 +188,9 @@ func TestStaticPolicyAdd(t *testing.T) { largeTopoSock0CPUSet := largeTopoSock0Builder.Result() largeTopoSock1CPUSet := largeTopoSock1Builder.Result() - testCases := []staticPolicyTest{ + // these are the cases which must behave the same regardless of the policy options. + // So we will permute the options to ensure this holds true. 
+ optionsInsensitiveTestCases := []staticPolicyTest{ { description: "GuPodSingleCore, SingleSocketHT, ExpectError", topo: topoSingleSocketHT, @@ -180,17 +202,6 @@ func TestStaticPolicyAdd(t *testing.T) { expCPUAlloc: false, expCSet: cpuset.NewCPUSet(), }, - { - description: "GuPodSingleCore, SingleSocketHT, ExpectAllocOneCPU", - topo: topoSingleSocketHT, - numReservedCPUs: 1, - stAssignments: state.ContainerCPUAssignments{}, - stDefaultCPUSet: cpuset.NewCPUSet(0, 1, 2, 3, 4, 5, 6, 7), - pod: makePod("fakePod", "fakeContainer2", "1000m", "1000m"), - expErr: nil, - expCPUAlloc: true, - expCSet: cpuset.NewCPUSet(4), // expect sibling of partial core - }, { description: "GuPodMultipleCores, SingleSocketHT, ExpectAllocOneCore", topo: topoSingleSocketHT, @@ -400,22 +411,6 @@ func TestStaticPolicyAdd(t *testing.T) { expCPUAlloc: true, expCSet: largeTopoSock1CPUSet.Union(cpuset.NewCPUSet(10, 34, 22, 47)), }, - { - // Only partial cores are available in the entire system. - // Expect allocation of all the CPUs from the partial cores. - description: "GuPodMultipleCores, topoQuadSocketFourWayHT, ExpectAllocCPUs", - topo: topoQuadSocketFourWayHT, - stAssignments: state.ContainerCPUAssignments{ - "fakePod": map[string]cpuset.CPUSet{ - "fakeContainer100": largeTopoCPUSet.Difference(cpuset.NewCPUSet(10, 11, 53, 37, 55, 67, 52)), - }, - }, - stDefaultCPUSet: cpuset.NewCPUSet(10, 11, 53, 67, 52), - pod: makePod("fakePod", "fakeContainer5", "5000m", "5000m"), - expErr: nil, - expCPUAlloc: true, - expCSet: cpuset.NewCPUSet(10, 11, 53, 67, 52), - }, { // Only 7 CPUs are available. // Pod requests 76 cores. @@ -435,45 +430,130 @@ func TestStaticPolicyAdd(t *testing.T) { }, } - for _, testCase := range testCases { - policy, _ := NewStaticPolicy(testCase.topo, testCase.numReservedCPUs, cpuset.NewCPUSet(), topologymanager.NewFakeManager()) + // testcases for the default behaviour of the policy. + defaultOptionsTestCases := []staticPolicyTest{ + { + description: "GuPodSingleCore, SingleSocketHT, ExpectAllocOneCPU", + topo: topoSingleSocketHT, + numReservedCPUs: 1, + stAssignments: state.ContainerCPUAssignments{}, + stDefaultCPUSet: cpuset.NewCPUSet(0, 1, 2, 3, 4, 5, 6, 7), + pod: makePod("fakePod", "fakeContainer2", "1000m", "1000m"), + expErr: nil, + expCPUAlloc: true, + expCSet: cpuset.NewCPUSet(4), // expect sibling of partial core + }, + { + // Only partial cores are available in the entire system. + // Expect allocation of all the CPUs from the partial cores. 
+ description: "GuPodMultipleCores, topoQuadSocketFourWayHT, ExpectAllocCPUs", + topo: topoQuadSocketFourWayHT, + stAssignments: state.ContainerCPUAssignments{ + "fakePod": map[string]cpuset.CPUSet{ + "fakeContainer100": largeTopoCPUSet.Difference(cpuset.NewCPUSet(10, 11, 53, 37, 55, 67, 52)), + }, + }, + stDefaultCPUSet: cpuset.NewCPUSet(10, 11, 53, 67, 52), + pod: makePod("fakePod", "fakeContainer5", "5000m", "5000m"), + expErr: nil, + expCPUAlloc: true, + expCSet: cpuset.NewCPUSet(10, 11, 53, 67, 52), + }, + } - st := &mockState{ - assignments: testCase.stAssignments, - defaultCPUSet: testCase.stDefaultCPUSet, + // testcases for the FullPCPUsOnlyOption + smtalignOptionTestCases := []staticPolicyTest{ + { + description: "GuPodSingleCore, SingleSocketHT, ExpectAllocOneCPU", + topo: topoSingleSocketHT, + options: map[string]string{ + FullPCPUsOnlyOption: "true", + }, + numReservedCPUs: 1, + stAssignments: state.ContainerCPUAssignments{}, + stDefaultCPUSet: cpuset.NewCPUSet(0, 1, 2, 3, 4, 5, 6, 7), + pod: makePod("fakePod", "fakeContainer2", "1000m", "1000m"), + expErr: SMTAlignmentError{RequestedCPUs: 1, CpusPerCore: 2}, + expCPUAlloc: false, + expCSet: cpuset.NewCPUSet(), // reject allocation of sibling of partial core + }, + { + // test SMT-level != 2 - which is the default on x86_64 + description: "GuPodMultipleCores, topoQuadSocketFourWayHT, ExpectAllocOneCPUs", + topo: topoQuadSocketFourWayHT, + options: map[string]string{ + FullPCPUsOnlyOption: "true", + }, + numReservedCPUs: 8, + stAssignments: state.ContainerCPUAssignments{}, + stDefaultCPUSet: largeTopoCPUSet, + pod: makePod("fakePod", "fakeContainer15", "15000m", "15000m"), + expErr: SMTAlignmentError{RequestedCPUs: 15, CpusPerCore: 4}, + expCPUAlloc: false, + expCSet: cpuset.NewCPUSet(), + }, + } + + for _, testCase := range optionsInsensitiveTestCases { + for _, options := range []map[string]string{ + nil, + { + FullPCPUsOnlyOption: "true", + }, + } { + tCase := testCase.PseudoClone() + tCase.description = fmt.Sprintf("options=%v %s", options, testCase.description) + tCase.options = options + runStaticPolicyTestCase(t, tCase) + } + } + + for _, testCase := range defaultOptionsTestCases { + runStaticPolicyTestCase(t, testCase) + } + for _, testCase := range smtalignOptionTestCases { + runStaticPolicyTestCase(t, testCase) + } +} + +func runStaticPolicyTestCase(t *testing.T, testCase staticPolicyTest) { + policy, _ := NewStaticPolicy(testCase.topo, testCase.numReservedCPUs, cpuset.NewCPUSet(), topologymanager.NewFakeManager(), testCase.options) + + st := &mockState{ + assignments: testCase.stAssignments, + defaultCPUSet: testCase.stDefaultCPUSet, + } + + container := &testCase.pod.Spec.Containers[0] + err := policy.Allocate(st, testCase.pod, container) + if !reflect.DeepEqual(err, testCase.expErr) { + t.Errorf("StaticPolicy Allocate() error (%v). expected add error: %q but got: %q", + testCase.description, testCase.expErr, err) + } + + if testCase.expCPUAlloc { + cset, found := st.assignments[string(testCase.pod.UID)][container.Name] + if !found { + t.Errorf("StaticPolicy Allocate() error (%v). expected container %v to be present in assignments %v", + testCase.description, container.Name, st.assignments) } - container := &testCase.pod.Spec.Containers[0] - err := policy.Allocate(st, testCase.pod, container) - if !reflect.DeepEqual(err, testCase.expErr) { - t.Errorf("StaticPolicy Allocate() error (%v). 
expected add error: %v but got: %v", - testCase.description, testCase.expErr, err) + if !reflect.DeepEqual(cset, testCase.expCSet) { + t.Errorf("StaticPolicy Allocate() error (%v). expected cpuset %v but got %v", + testCase.description, testCase.expCSet, cset) } - if testCase.expCPUAlloc { - cset, found := st.assignments[string(testCase.pod.UID)][container.Name] - if !found { - t.Errorf("StaticPolicy Allocate() error (%v). expected container %v to be present in assignments %v", - testCase.description, container.Name, st.assignments) - } - - if !reflect.DeepEqual(cset, testCase.expCSet) { - t.Errorf("StaticPolicy Allocate() error (%v). expected cpuset %v but got %v", - testCase.description, testCase.expCSet, cset) - } - - if !cset.Intersection(st.defaultCPUSet).IsEmpty() { - t.Errorf("StaticPolicy Allocate() error (%v). expected cpuset %v to be disoint from the shared cpuset %v", - testCase.description, cset, st.defaultCPUSet) - } + if !cset.Intersection(st.defaultCPUSet).IsEmpty() { + t.Errorf("StaticPolicy Allocate() error (%v). expected cpuset %v to be disoint from the shared cpuset %v", + testCase.description, cset, st.defaultCPUSet) } + } - if !testCase.expCPUAlloc { - _, found := st.assignments[string(testCase.pod.UID)][container.Name] - if found { - t.Errorf("StaticPolicy Allocate() error (%v). Did not expect container %v to be present in assignments %v", - testCase.description, container.Name, st.assignments) - } + if !testCase.expCPUAlloc { + _, found := st.assignments[string(testCase.pod.UID)][container.Name] + if found { + t.Errorf("StaticPolicy Allocate() error (%v). Did not expect container %v to be present in assignments %v", + testCase.description, container.Name, st.assignments) } } } @@ -537,7 +617,7 @@ func TestStaticPolicyRemove(t *testing.T) { } for _, testCase := range testCases { - policy, _ := NewStaticPolicy(testCase.topo, testCase.numReservedCPUs, cpuset.NewCPUSet(), topologymanager.NewFakeManager()) + policy, _ := NewStaticPolicy(testCase.topo, testCase.numReservedCPUs, cpuset.NewCPUSet(), topologymanager.NewFakeManager(), nil) st := &mockState{ assignments: testCase.stAssignments, @@ -627,7 +707,7 @@ func TestTopologyAwareAllocateCPUs(t *testing.T) { }, } for _, tc := range testCases { - p, _ := NewStaticPolicy(tc.topo, 0, cpuset.NewCPUSet(), topologymanager.NewFakeManager()) + p, _ := NewStaticPolicy(tc.topo, 0, cpuset.NewCPUSet(), topologymanager.NewFakeManager(), nil) policy := p.(*staticPolicy) st := &mockState{ assignments: tc.stAssignments, @@ -701,7 +781,7 @@ func TestStaticPolicyStartWithResvList(t *testing.T) { } for _, testCase := range testCases { t.Run(testCase.description, func(t *testing.T) { - p, err := NewStaticPolicy(testCase.topo, testCase.numReservedCPUs, testCase.reserved, topologymanager.NewFakeManager()) + p, err := NewStaticPolicy(testCase.topo, testCase.numReservedCPUs, testCase.reserved, topologymanager.NewFakeManager(), nil) if !reflect.DeepEqual(err, testCase.expNewErr) { t.Errorf("StaticPolicy Start() error (%v). 
expected error: %v but got: %v", testCase.description, testCase.expNewErr, err) @@ -778,7 +858,7 @@ func TestStaticPolicyAddWithResvList(t *testing.T) { } for _, testCase := range testCases { - policy, _ := NewStaticPolicy(testCase.topo, testCase.numReservedCPUs, testCase.reserved, topologymanager.NewFakeManager()) + policy, _ := NewStaticPolicy(testCase.topo, testCase.numReservedCPUs, testCase.reserved, topologymanager.NewFakeManager(), nil) st := &mockState{ assignments: testCase.stAssignments, @@ -819,3 +899,88 @@ func TestStaticPolicyAddWithResvList(t *testing.T) { } } } + +type staticPolicyOptionTestCase struct { + description string + policyOptions map[string]string + expectedError bool + expectedValue StaticPolicyOptions +} + +func TestStaticPolicyOptions(t *testing.T) { + testCases := []staticPolicyOptionTestCase{ + { + description: "nil args", + policyOptions: nil, + expectedError: false, + expectedValue: StaticPolicyOptions{}, + }, + { + description: "empty args", + policyOptions: map[string]string{}, + expectedError: false, + expectedValue: StaticPolicyOptions{}, + }, + { + description: "bad single arg", + policyOptions: map[string]string{ + "badValue1": "", + }, + expectedError: true, + }, + { + description: "bad multiple arg", + policyOptions: map[string]string{ + "badValue1": "", + "badvalue2": "aaaa", + }, + expectedError: true, + }, + { + description: "good arg", + policyOptions: map[string]string{ + FullPCPUsOnlyOption: "true", + }, + expectedError: false, + expectedValue: StaticPolicyOptions{ + FullPhysicalCPUsOnly: true, + }, + }, + { + description: "good arg, bad value", + policyOptions: map[string]string{ + FullPCPUsOnlyOption: "enabled!", + }, + expectedError: true, + }, + + { + description: "bad arg intermixed", + policyOptions: map[string]string{ + FullPCPUsOnlyOption: "1", + "badvalue2": "lorem ipsum", + }, + expectedError: true, + }, + } + + for _, testCase := range testCases { + t.Run(testCase.description, func(t *testing.T) { + opts, err := NewStaticPolicyOptions(testCase.policyOptions) + gotError := (err != nil) + if gotError != testCase.expectedError { + t.Fatalf("error with args %v expected error %v got %v: %v", + testCase.policyOptions, testCase.expectedError, gotError, err) + } + + if testCase.expectedError { + return + } + + if !reflect.DeepEqual(opts, testCase.expectedValue) { + t.Fatalf("value mismatch with args %v expected value %v got %v", + testCase.policyOptions, testCase.expectedValue, opts) + } + }) + } +} From a2fb8b00392a1bcf5c906dcddbf8d04247ddd85f Mon Sep 17 00:00:00 2001 From: Francesco Romani Date: Mon, 17 May 2021 10:58:19 +0200 Subject: [PATCH 6/6] smtalign: e2e: add tests Add e2e tests to cover the basic flows for the `full-pcpus-only` option: negative flow to ensure rejection with proper error message, and positive flow to verify the actual cpu allocation. Co-authored-by: Swati Sehgal Signed-off-by: Francesco Romani --- test/e2e_node/cpu_manager_test.go | 208 ++++++++++++++++++++----- test/e2e_node/topology_manager_test.go | 12 +- 2 files changed, 174 insertions(+), 46 deletions(-) diff --git a/test/e2e_node/cpu_manager_test.go b/test/e2e_node/cpu_manager_test.go index fb38837b575..2ca84bcd90a 100644 --- a/test/e2e_node/cpu_manager_test.go +++ b/test/e2e_node/cpu_manager_test.go @@ -17,8 +17,10 @@ limitations under the License. 
 package e2enode
 
 import (
+ "context"
 "fmt"
 "os/exec"
+ "regexp"
 "strconv"
 "strings"
 "time"
@@ -34,6 +36,7 @@ import (
 "k8s.io/kubernetes/pkg/kubelet/types"
 "k8s.io/kubernetes/test/e2e/framework"
 e2enode "k8s.io/kubernetes/test/e2e/framework/node"
+ e2epod "k8s.io/kubernetes/test/e2e/framework/pod"
 e2eskipper "k8s.io/kubernetes/test/e2e/framework/skipper"
 
 "github.com/onsi/ginkgo"
@@ -159,6 +162,17 @@ func isMultiNUMA() bool {
 return numaNodes > 1
 }
 
+func getSMTLevel() int {
+ cpuID := 0 // 0 is just the most likely CPU to be present in a random system; it has no special meaning beyond that.
+ out, err := exec.Command("/bin/sh", "-c", fmt.Sprintf("cat /sys/devices/system/cpu/cpu%d/topology/thread_siblings_list | tr -d \"\n\r\"", cpuID)).Output()
+ framework.ExpectNoError(err)
+ // the number of thread siblings a CPU has equals the SMT level
+ // example: 2-way SMT means 2 sibling threads for each physical core
+ cpus, err := cpuset.Parse(strings.TrimSpace(string(out)))
+ framework.ExpectNoError(err)
+ return cpus.Size()
+}
+
 func getCPUSiblingList(cpuRes int64) string {
 out, err := exec.Command("/bin/sh", "-c", fmt.Sprintf("cat /sys/devices/system/cpu/cpu%d/topology/thread_siblings_list | tr -d \"\n\r\"", cpuRes)).Output()
 framework.ExpectNoError(err)
@@ -209,20 +223,16 @@ func disableCPUManagerInKubelet(f *framework.Framework) (oldCfg *kubeletconfig.K
 return oldCfg
 }
 
-func enableCPUManagerInKubelet(f *framework.Framework, cleanStateFile bool) (oldCfg *kubeletconfig.KubeletConfiguration) {
- return configureCPUManagerInKubelet(f, cleanStateFile, cpuset.CPUSet{})
-}
-
-func configureCPUManagerInKubelet(f *framework.Framework, cleanStateFile bool, reservedSystemCPUs cpuset.CPUSet) (oldCfg *kubeletconfig.KubeletConfiguration) {
+func configureCPUManagerInKubelet(f *framework.Framework, policyName string, cleanStateFile bool, reservedSystemCPUs cpuset.CPUSet, enableOptions bool, options map[string]string) (oldCfg *kubeletconfig.KubeletConfiguration) {
 // Enable CPU Manager in Kubelet with static policy.
 oldCfg, err := getCurrentKubeletConfig()
 framework.ExpectNoError(err)
 
 newCfg := oldCfg.DeepCopy()
 if newCfg.FeatureGates == nil {
 newCfg.FeatureGates = make(map[string]bool)
- } else {
- newCfg.FeatureGates["CPUManager"] = true
 }
+ newCfg.FeatureGates["CPUManager"] = true
+ newCfg.FeatureGates["CPUManagerPolicyOptions"] = enableOptions
 
 // After graduation of the CPU Manager feature to Beta, the CPU Manager
 // "none" policy is ON by default. But when we set the CPU Manager policy to
@@ -237,11 +247,9 @@ func configureCPUManagerInKubelet(f *framework.Framework, cleanStateFile bool, r
 deleteStateFile()
 }
 
- // Set the CPU Manager policy to static.
- newCfg.CPUManagerPolicy = string(cpumanager.PolicyStatic)
-
- // Set the CPU Manager reconcile period to 1 second.
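+ // Apply the caller-supplied policy name and options; the 1-second
+ // reconcile period below keeps the test feedback loop fast.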
+ newCfg.CPUManagerPolicy = policyName
 newCfg.CPUManagerReconcilePeriod = metav1.Duration{Duration: 1 * time.Second}
+ newCfg.CPUManagerPolicyOptions = options
 
 if reservedSystemCPUs.Size() > 0 {
 cpus := reservedSystemCPUs.String()
@@ -272,43 +280,37 @@ func configureCPUManagerInKubelet(f *framework.Framework, cleanStateFile bool, r
 return oldCfg
 }
 
-func runGuPodTest(f *framework.Framework) {
- var ctnAttrs []ctnAttribute
- var cpu1 int
- var err error
- var cpuList []int
+func runGuPodTest(f *framework.Framework, cpuCount int) {
 var pod *v1.Pod
- var expAllowedCPUsListRegex string
 
- ctnAttrs = []ctnAttribute{
+ ctnAttrs := []ctnAttribute{
 {
 ctnName: "gu-container",
- cpuRequest: "1000m",
- cpuLimit: "1000m",
+ cpuRequest: fmt.Sprintf("%dm", 1000*cpuCount),
+ cpuLimit: fmt.Sprintf("%dm", 1000*cpuCount),
 },
 }
 pod = makeCPUManagerPod("gu-pod", ctnAttrs)
 pod = f.PodClient().CreateSync(pod)
 
 ginkgo.By("checking if the expected cpuset was assigned")
- cpu1 = 1
- if isHTEnabled() {
- cpuList = cpuset.MustParse(getCPUSiblingList(0)).ToSlice()
- cpu1 = cpuList[1]
- } else if isMultiNUMA() {
- cpuList = cpuset.MustParse(getCoreSiblingList(0)).ToSlice()
- if len(cpuList) > 1 {
- cpu1 = cpuList[1]
- }
+ // any full CPU is fine - we cannot, nor should we, predict which one
+ for _, cnt := range pod.Spec.Containers {
+ ginkgo.By(fmt.Sprintf("validating the container %s on Gu pod %s", cnt.Name, pod.Name))
+
+ logs, err := e2epod.GetPodLogs(f.ClientSet, f.Namespace.Name, pod.Name, cnt.Name)
+ framework.ExpectNoError(err, "expected log not found in container [%s] of pod [%s]", cnt.Name, pod.Name)
+
+ framework.Logf("got pod logs: %v", logs)
+ cpus, err := cpuset.Parse(strings.TrimSpace(logs))
+ framework.ExpectNoError(err, "parsing cpuset from logs for [%s] of pod [%s]", cnt.Name, pod.Name)
+
+ framework.ExpectEqual(cpus.Size(), cpuCount, "expected cpu set size == %d, got %q", cpuCount, cpus.String())
 }
- expAllowedCPUsListRegex = fmt.Sprintf("^%d\n$", cpu1)
- err = f.PodClient().MatchContainerOutput(pod.Name, pod.Spec.Containers[0].Name, expAllowedCPUsListRegex)
- framework.ExpectNoError(err, "expected log not found in container [%s] of pod [%s]",
- pod.Spec.Containers[0].Name, pod.Name)
 
 ginkgo.By("by deleting the pods and waiting for container removal")
 deletePods(f, []string{pod.Name})
- waitForContainerRemoval(pod.Spec.Containers[0].Name, pod.Name, pod.Namespace)
+ waitForAllContainerRemoval(pod.Name, pod.Namespace)
 }
 
 func runNonGuPodTest(f *framework.Framework, cpuCap int64) {
@@ -586,13 +588,13 @@ func runCPUManagerTests(f *framework.Framework) {
 }
 
 // Enable CPU Manager in the kubelet.
- oldCfg = enableCPUManagerInKubelet(f, true)
+ oldCfg = configureCPUManagerInKubelet(f, string(cpumanager.PolicyStatic), true, cpuset.CPUSet{}, false, nil)
 
 ginkgo.By("running a non-Gu pod")
 runNonGuPodTest(f, cpuCap)
 
 ginkgo.By("running a Gu pod")
- runGuPodTest(f)
+ runGuPodTest(f, 1)
 
 ginkgo.By("running multiple Gu and non-Gu pods")
 runMultipleGuNonGuPods(f, cpuCap, cpuAlloc)
@@ -653,18 +655,150 @@ func runCPUManagerTests(f *framework.Framework) {
 waitForContainerRemoval(pod.Spec.Containers[0].Name, pod.Name, pod.Namespace)
 
 ginkgo.By("enable cpu manager in kubelet without delete state file")
- enableCPUManagerInKubelet(f, false)
+ configureCPUManagerInKubelet(f, string(cpumanager.PolicyStatic), false, cpuset.CPUSet{}, false, nil)
 
 ginkgo.By("wait for the deleted pod to be cleaned up from the state file")
 waitForStateFileCleanedUp()
 ginkgo.By("the deleted pod has already been deleted from the state file")
 })
 
+ ginkgo.It("should assign CPUs as expected with enhanced policy based on strict SMT alignment", func() {
+ fullCPUsOnlyOpt := fmt.Sprintf("option=%s", cpumanager.FullPCPUsOnlyOption)
+ _, cpuAlloc, _ = getLocalNodeCPUDetails(f)
+ smtLevel := getSMTLevel()
+
+ // strict SMT alignment is trivially verified and granted on non-SMT systems
+ if smtLevel < 2 {
+ e2eskipper.Skipf("Skipping CPU Manager %s tests since SMT disabled", fullCPUsOnlyOpt)
+ }
+
+ // our tests want to allocate a full core, so we need at least 2*2=4 virtual cpus
+ if cpuAlloc < int64(smtLevel*2) {
+ e2eskipper.Skipf("Skipping CPU Manager %s tests since the CPU capacity < 4", fullCPUsOnlyOpt)
+ }
+
+ framework.Logf("SMT level %d", smtLevel)
+
+ cleanStateFile := true
+ // TODO: we assume the first available CPUID is 0, which is pretty fair, but we should probably
+ // check what we actually have on the node.
+ cpuPolicyOptions := map[string]string{
+ cpumanager.FullPCPUsOnlyOption: "true",
+ }
+ oldCfg = configureCPUManagerInKubelet(f, string(cpumanager.PolicyStatic), cleanStateFile, cpuset.NewCPUSet(0), true, cpuPolicyOptions)
+
+ // the order between negative and positive doesn't really matter
+ runSMTAlignmentNegativeTests(f)
+ runSMTAlignmentPositiveTests(f, smtLevel)
+ })
+
 ginkgo.AfterEach(func() {
 setOldKubeletConfig(f, oldCfg)
 })
 }
 
+func runSMTAlignmentNegativeTests(f *framework.Framework) {
+ // negative test: try to run a container whose requests aren't a multiple of the SMT level, expect a rejection
+ ctnAttrs := []ctnAttribute{
+ {
+ ctnName: "gu-container-neg",
+ cpuRequest: "1000m",
+ cpuLimit: "1000m",
+ },
+ }
+ pod := makeCPUManagerPod("gu-pod", ctnAttrs)
+ // CreateSync would wait for the pod to become Ready - which will never happen if production code works as intended!
+ pod = f.PodClient().Create(pod)
+
+ err := e2epod.WaitForPodCondition(f.ClientSet, f.Namespace.Name, pod.Name, "Failed", 30*time.Second, func(pod *v1.Pod) (bool, error) {
+ if pod.Status.Phase != v1.PodPending {
+ return true, nil
+ }
+ return false, nil
+ })
+ framework.ExpectNoError(err)
+ pod, err = f.PodClient().Get(context.TODO(), pod.Name, metav1.GetOptions{})
+ framework.ExpectNoError(err)
+
+ if pod.Status.Phase != v1.PodFailed {
+ framework.Failf("pod %s not failed: %v", pod.Name, pod.Status)
+ }
+ if !isSMTAlignmentError(pod) {
+ framework.Failf("pod %s failed for wrong reason: %q", pod.Name, pod.Status.Reason)
+ }
+
+ deletePodSyncByName(f, pod.Name)
+ // we need to wait for all containers to really be gone so the cpumanager reconcile loop will not rewrite the cpu_manager_state.
+ // this is in turn needed because we will have an unavoidable (in the current framework) race with the
+ // reconcile loop which will make our attempt to delete the state file and to restore the old config go haywire
+ waitForAllContainerRemoval(pod.Name, pod.Namespace)
+}
+
+func runSMTAlignmentPositiveTests(f *framework.Framework, smtLevel int) {
+ // positive test: try to run a container whose requests are a multiple of the SMT level, and check that the allocated cores
+ // 1. are core siblings
+ // 2. take a full core
+ // WARNING: this assumes 2-way SMT systems - we don't know how to access other SMT levels.
+ // this means that on more-than-2-way SMT systems this test will prove nothing
+ ctnAttrs := []ctnAttribute{
+ {
+ ctnName: "gu-container-pos",
+ cpuRequest: "2000m",
+ cpuLimit: "2000m",
+ },
+ }
+ pod := makeCPUManagerPod("gu-pod", ctnAttrs)
+ pod = f.PodClient().CreateSync(pod)
+
+ for _, cnt := range pod.Spec.Containers {
+ ginkgo.By(fmt.Sprintf("validating the container %s on Gu pod %s", cnt.Name, pod.Name))
+
+ logs, err := e2epod.GetPodLogs(f.ClientSet, f.Namespace.Name, pod.Name, cnt.Name)
+ framework.ExpectNoError(err, "expected log not found in container [%s] of pod [%s]", cnt.Name, pod.Name)
+
+ framework.Logf("got pod logs: %v", logs)
+ cpus, err := cpuset.Parse(strings.TrimSpace(logs))
+ framework.ExpectNoError(err, "parsing cpuset from logs for [%s] of pod [%s]", cnt.Name, pod.Name)
+
+ validateSMTAlignment(cpus, smtLevel, pod, &cnt)
+ }
+
+ deletePodSyncByName(f, pod.Name)
+ // we need to wait for all containers to really be gone so the cpumanager reconcile loop will not rewrite the cpu_manager_state.
+ // this is in turn needed because we will have an unavoidable (in the current framework) race with the
+ // reconcile loop which will make our attempt to delete the state file and to restore the old config go haywire
+ waitForAllContainerRemoval(pod.Name, pod.Namespace)
+}
+
+func validateSMTAlignment(cpus cpuset.CPUSet, smtLevel int, pod *v1.Pod, cnt *v1.Container) {
+ framework.Logf("validating cpus: %v", cpus)
+
+ if cpus.Size()%smtLevel != 0 {
+ framework.Failf("pod %q cnt %q received non-smt-multiple cpuset %v (SMT level %d)", pod.Name, cnt.Name, cpus, smtLevel)
+ }
+
+ // now check that all the given cpus are thread siblings.
+ // the easiest way to do so is to rebuild the expected set of siblings from all the cpus we got.
+ // if the expected set matches the given set, the given set was good.
+ b := cpuset.NewBuilder()
+ for _, cpuID := range cpus.ToSliceNoSort() {
+ threadSiblings, err := cpuset.Parse(strings.TrimSpace(getCPUSiblingList(int64(cpuID))))
+ framework.ExpectNoError(err, "parsing sibling list of CPU %d for [%s] of pod [%s]", cpuID, cnt.Name, pod.Name)
+ b.Add(threadSiblings.ToSliceNoSort()...)
+ }
+ siblingsCPUs := b.Result()
+
+ framework.Logf("siblings cpus: %v", siblingsCPUs)
+ if !siblingsCPUs.Equals(cpus) {
+ framework.Failf("pod %q cnt %q received non-smt-aligned cpuset %v (expected %v)", pod.Name, cnt.Name, cpus, siblingsCPUs)
+ }
+}
+
+func isSMTAlignmentError(pod *v1.Pod) bool {
+ re := regexp.MustCompile(`SMT.*Alignment.*Error`)
+ return re.MatchString(pod.Status.Reason)
+}
+
 // Serial because the test updates kubelet configuration.
var _ = SIGDescribe("CPU Manager [Serial] [Feature:CPUManager]", func() { f := framework.NewDefaultFramework("cpu-manager-test") diff --git a/test/e2e_node/topology_manager_test.go b/test/e2e_node/topology_manager_test.go index 27ea6a0f4e9..9ac3afe1a0e 100644 --- a/test/e2e_node/topology_manager_test.go +++ b/test/e2e_node/topology_manager_test.go @@ -372,7 +372,7 @@ func runTopologyManagerPolicySuiteTests(f *framework.Framework) { runNonGuPodTest(f, cpuCap) ginkgo.By("running a Gu pod") - runGuPodTest(f) + runGuPodTest(f, 1) ginkgo.By("running multiple Gu and non-Gu pods") runMultipleGuNonGuPods(f, cpuCap, cpuAlloc) @@ -596,10 +596,7 @@ func teardownSRIOVConfigOrFail(f *framework.Framework, sd *sriovData) { } func runTMScopeResourceAlignmentTestSuite(f *framework.Framework, configMap *v1.ConfigMap, reservedSystemCPUs, policy string, numaNodes, coreCount int) { - threadsPerCore := 1 - if isHTEnabled() { - threadsPerCore = 2 - } + threadsPerCore := getSMTLevel() sd := setupSRIOVConfigOrFail(f, configMap) var ctnAttrs, initCtnAttrs []tmCtnAttribute @@ -699,10 +696,7 @@ func runTMScopeResourceAlignmentTestSuite(f *framework.Framework, configMap *v1. } func runTopologyManagerNodeAlignmentSuiteTests(f *framework.Framework, sd *sriovData, reservedSystemCPUs, policy string, numaNodes, coreCount int) { - threadsPerCore := 1 - if isHTEnabled() { - threadsPerCore = 2 - } + threadsPerCore := getSMTLevel() waitForSRIOVResources(f, sd)