diff --git a/cmd/kubeadm/.import-restrictions b/cmd/kubeadm/.import-restrictions index b0f05ef9a01..ba7c7e5e0a8 100644 --- a/cmd/kubeadm/.import-restrictions +++ b/cmd/kubeadm/.import-restrictions @@ -64,6 +64,7 @@ "k8s.io/kubernetes/pkg/features", "k8s.io/kubernetes/pkg/fieldpath", "k8s.io/kubernetes/pkg/kubelet/apis", + "k8s.io/kubernetes/pkg/kubelet/cm/cpuset", "k8s.io/kubernetes/pkg/kubelet/qos", "k8s.io/kubernetes/pkg/kubelet/types", "k8s.io/kubernetes/pkg/master/ports", diff --git a/cmd/kubelet/app/BUILD b/cmd/kubelet/app/BUILD index 3963755844f..b52ca819235 100644 --- a/cmd/kubelet/app/BUILD +++ b/cmd/kubelet/app/BUILD @@ -57,6 +57,7 @@ go_library( "//pkg/kubelet/certificate:go_default_library", "//pkg/kubelet/certificate/bootstrap:go_default_library", "//pkg/kubelet/cm:go_default_library", + "//pkg/kubelet/cm/cpuset:go_default_library", "//pkg/kubelet/config:go_default_library", "//pkg/kubelet/container:go_default_library", "//pkg/kubelet/dockershim:go_default_library", diff --git a/cmd/kubelet/app/options/options.go b/cmd/kubelet/app/options/options.go index d5e41298e97..7401c510deb 100644 --- a/cmd/kubelet/app/options/options.go +++ b/cmd/kubelet/app/options/options.go @@ -553,7 +553,7 @@ func AddKubeletConfigFlags(mainfs *pflag.FlagSet, c *kubeletconfig.KubeletConfig fs.Var(cliflag.NewMapStringString(&c.EvictionMinimumReclaim), "eviction-minimum-reclaim", "A set of minimum reclaims (e.g. imagefs.available=2Gi) that describes the minimum amount of resource the kubelet will reclaim when performing a pod eviction if that resource is under pressure.") fs.Int32Var(&c.PodsPerCore, "pods-per-core", c.PodsPerCore, "Number of Pods per core that can run on this Kubelet. The total number of Pods on this Kubelet cannot exceed max-pods, so max-pods will be used if this calculation results in a larger number of Pods allowed on the Kubelet. A value of 0 disables this limit.") fs.BoolVar(&c.ProtectKernelDefaults, "protect-kernel-defaults", c.ProtectKernelDefaults, "Default kubelet behaviour for kernel tuning. If set, kubelet errors if any of kernel tunables is different than kubelet defaults.") - + fs.StringVar(&c.ReservedSystemCPUs, "reserved-cpus", c.ReservedSystemCPUs, "A comma-separated list of CPUs or CPU ranges that are reserved for system and kubernetes usage. This specific list will supersede cpu counts in --system-reserved and --kube-reserved.") // Node Allocatable Flags fs.Var(cliflag.NewMapStringString(&c.SystemReserved), "system-reserved", "A set of ResourceName=ResourceQuantity (e.g. cpu=200m,memory=500Mi,ephemeral-storage=1Gi) pairs that describe resources reserved for non-kubernetes components. Currently only cpu and memory are supported. See http://kubernetes.io/docs/user-guide/compute-resources for more detail. [default=none]") fs.Var(cliflag.NewMapStringString(&c.KubeReserved), "kube-reserved", "A set of ResourceName=ResourceQuantity (e.g. cpu=200m,memory=500Mi,ephemeral-storage=1Gi) pairs that describe resources reserved for kubernetes system components. Currently cpu, memory and local ephemeral storage for root file system are supported. See http://kubernetes.io/docs/user-guide/compute-resources for more detail. 
[default=none]") diff --git a/cmd/kubelet/app/server.go b/cmd/kubelet/app/server.go index 7b3aafcaf91..00a245b9b3e 100644 --- a/cmd/kubelet/app/server.go +++ b/cmd/kubelet/app/server.go @@ -77,6 +77,7 @@ import ( kubeletcertificate "k8s.io/kubernetes/pkg/kubelet/certificate" "k8s.io/kubernetes/pkg/kubelet/certificate/bootstrap" "k8s.io/kubernetes/pkg/kubelet/cm" + "k8s.io/kubernetes/pkg/kubelet/cm/cpuset" "k8s.io/kubernetes/pkg/kubelet/config" kubecontainer "k8s.io/kubernetes/pkg/kubelet/container" "k8s.io/kubernetes/pkg/kubelet/dockershim" @@ -636,6 +637,48 @@ func run(s *options.KubeletServer, kubeDeps *kubelet.Dependencies, stopCh <-chan klog.Info("--cgroups-per-qos enabled, but --cgroup-root was not specified. defaulting to /") s.CgroupRoot = "/" } + + var reservedSystemCPUs cpuset.CPUSet + var errParse error + if s.ReservedSystemCPUs != "" { + reservedSystemCPUs, errParse = cpuset.Parse(s.ReservedSystemCPUs) + if errParse != nil { + // invalid cpu list is provided, set reservedSystemCPUs to empty, so it won't overwrite kubeReserved/systemReserved + klog.Infof("Invalid ReservedSystemCPUs \"%s\"", s.ReservedSystemCPUs) + return errParse + } + // is it safe do use CAdvisor here ?? + machineInfo, err := kubeDeps.CAdvisorInterface.MachineInfo() + if err != nil { + // if can't use CAdvisor here, fall back to non-explicit cpu list behavor + klog.Warning("Failed to get MachineInfo, set reservedSystemCPUs to empty") + reservedSystemCPUs = cpuset.NewCPUSet() + } else { + reservedList := reservedSystemCPUs.ToSlice() + first := reservedList[0] + last := reservedList[len(reservedList)-1] + if first < 0 || last >= machineInfo.NumCores { + // the specified cpuset is outside of the range of what the machine has + klog.Infof("Invalid cpuset specified by --reserved-cpus") + return fmt.Errorf("Invalid cpuset %q specified by --reserved-cpus", s.ReservedSystemCPUs) + } + } + } else { + reservedSystemCPUs = cpuset.NewCPUSet() + } + + if reservedSystemCPUs.Size() > 0 { + // at cmd option valication phase it is tested either --system-reserved-cgroup or --kube-reserved-cgroup is specified, so overwrite should be ok + klog.Infof("Option --reserved-cpus is specified, it will overwrite the cpu setting in KubeReserved=\"%v\", SystemReserved=\"%v\".", s.KubeReserved, s.SystemReserved) + if s.KubeReserved != nil { + delete(s.KubeReserved, "cpu") + } + if s.SystemReserved == nil { + s.SystemReserved = make(map[string]string) + } + s.SystemReserved["cpu"] = strconv.Itoa(reservedSystemCPUs.Size()) + klog.Infof("After cpu setting is overwritten, KubeReserved=\"%v\", SystemReserved=\"%v\"", s.KubeReserved, s.SystemReserved) + } kubeReserved, err := parseResourceList(s.KubeReserved) if err != nil { return err @@ -678,6 +721,7 @@ func run(s *options.KubeletServer, kubeDeps *kubelet.Dependencies, stopCh <-chan EnforceNodeAllocatable: sets.NewString(s.EnforceNodeAllocatable...), KubeReserved: kubeReserved, SystemReserved: systemReserved, + ReservedSystemCPUs: reservedSystemCPUs, HardEvictionThresholds: hardEvictionThresholds, }, QOSReserved: *experimentalQOSReserved, diff --git a/pkg/kubelet/apis/config/helpers_test.go b/pkg/kubelet/apis/config/helpers_test.go index 3032ed5dacd..dae68438a95 100644 --- a/pkg/kubelet/apis/config/helpers_test.go +++ b/pkg/kubelet/apis/config/helpers_test.go @@ -212,6 +212,7 @@ var ( "ReadOnlyPort", "RegistryBurst", "RegistryPullQPS", + "ReservedSystemCPUs", "RuntimeRequestTimeout.Duration", "SerializeImagePulls", "StreamingConnectionIdleTimeout.Duration", diff --git 
a/pkg/kubelet/apis/config/types.go b/pkg/kubelet/apis/config/types.go index 446ad78326c..ddb8a3e0eba 100644 --- a/pkg/kubelet/apis/config/types.go +++ b/pkg/kubelet/apis/config/types.go @@ -331,6 +331,10 @@ type KubeletConfiguration struct { // This flag accepts a list of options. Acceptable options are `pods`, `system-reserved` & `kube-reserved`. // Refer to [Node Allocatable](https://git.k8s.io/community/contributors/design-proposals/node/node-allocatable.md) doc for more information. EnforceNodeAllocatable []string + // This option specifies the cpu list reserved for the host-level system threads and kubernetes-related threads. + // This provides a "static" CPU list rather than the "dynamic" list computed by system-reserved and kube-reserved. + // This option overwrites CPUs provided by system-reserved and kube-reserved. + ReservedSystemCPUs string } type KubeletAuthorizationMode string diff --git a/pkg/kubelet/apis/config/v1beta1/zz_generated.conversion.go b/pkg/kubelet/apis/config/v1beta1/zz_generated.conversion.go index cb761146ea9..c478e06fcf1 100644 --- a/pkg/kubelet/apis/config/v1beta1/zz_generated.conversion.go +++ b/pkg/kubelet/apis/config/v1beta1/zz_generated.conversion.go @@ -326,6 +326,7 @@ func autoConvert_v1beta1_KubeletConfiguration_To_config_KubeletConfiguration(in out.ConfigMapAndSecretChangeDetectionStrategy = config.ResourceChangeDetectionStrategy(in.ConfigMapAndSecretChangeDetectionStrategy) out.SystemReserved = *(*map[string]string)(unsafe.Pointer(&in.SystemReserved)) out.KubeReserved = *(*map[string]string)(unsafe.Pointer(&in.KubeReserved)) + out.ReservedSystemCPUs = in.ReservedSystemCPUs out.SystemReservedCgroup = in.SystemReservedCgroup out.KubeReservedCgroup = in.KubeReservedCgroup out.EnforceNodeAllocatable = *(*[]string)(unsafe.Pointer(&in.EnforceNodeAllocatable)) @@ -462,6 +463,7 @@ func autoConvert_config_KubeletConfiguration_To_v1beta1_KubeletConfiguration(in out.SystemReservedCgroup = in.SystemReservedCgroup out.KubeReservedCgroup = in.KubeReservedCgroup out.EnforceNodeAllocatable = *(*[]string)(unsafe.Pointer(&in.EnforceNodeAllocatable)) + out.ReservedSystemCPUs = in.ReservedSystemCPUs return nil } diff --git a/pkg/kubelet/apis/config/validation/BUILD b/pkg/kubelet/apis/config/validation/BUILD index f7348901d4e..c50698e5aa0 100644 --- a/pkg/kubelet/apis/config/validation/BUILD +++ b/pkg/kubelet/apis/config/validation/BUILD @@ -17,6 +17,7 @@ go_library( deps = [ "//pkg/features:go_default_library", "//pkg/kubelet/apis/config:go_default_library", + "//pkg/kubelet/cm/cpuset:go_default_library", "//pkg/kubelet/types:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/util/errors:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/util/validation:go_default_library", diff --git a/pkg/kubelet/apis/config/validation/validation.go b/pkg/kubelet/apis/config/validation/validation.go index 31c6cedda28..877ad7c7d29 100644 --- a/pkg/kubelet/apis/config/validation/validation.go +++ b/pkg/kubelet/apis/config/validation/validation.go @@ -25,6 +25,7 @@ import ( utilfeature "k8s.io/apiserver/pkg/util/feature" "k8s.io/kubernetes/pkg/features" kubeletconfig "k8s.io/kubernetes/pkg/kubelet/apis/config" + "k8s.io/kubernetes/pkg/kubelet/cm/cpuset" kubetypes "k8s.io/kubernetes/pkg/kubelet/types" ) @@ -142,6 +143,15 @@ func ValidateKubeletConfiguration(kc *kubeletconfig.KubeletConfiguration) error allErrors = append(allErrors, fmt.Errorf("invalid configuration: option %q specified for HairpinMode (--hairpin-mode). 
Valid options are %q, %q or %q", kc.HairpinMode, kubeletconfig.HairpinNone, kubeletconfig.HairpinVeth, kubeletconfig.PromiscuousBridge)) } + if kc.ReservedSystemCPUs != "" { + // --reserved-cpus does not support --system-reserved-cgroup or --kube-reserved-cgroup + if kc.SystemReservedCgroup != "" || kc.KubeReservedCgroup != "" { + allErrors = append(allErrors, fmt.Errorf("can't use --reserved-cpus with --system-reserved-cgroup or --kube-reserved-cgroup")) + } + if _, err := cpuset.Parse(kc.ReservedSystemCPUs); err != nil { + allErrors = append(allErrors, fmt.Errorf("unable to parse --reserved-cpus, error: %v", err)) + } + } if err := validateKubeletOSConfiguration(kc); err != nil { allErrors = append(allErrors, err) diff --git a/pkg/kubelet/apis/config/validation/validation_test.go b/pkg/kubelet/apis/config/validation/validation_test.go index eb48c1e0514..501afa233d5 100644 --- a/pkg/kubelet/apis/config/validation/validation_test.go +++ b/pkg/kubelet/apis/config/validation/validation_test.go @@ -26,7 +26,7 @@ import ( ) func TestValidateKubeletConfiguration(t *testing.T) { - successCase := &kubeletconfig.KubeletConfiguration{ + successCase1 := &kubeletconfig.KubeletConfiguration{ CgroupsPerQOS: true, EnforceNodeAllocatable: []string{"pods", "system-reserved", "kube-reserved"}, SystemReservedCgroup: "/system.slice", @@ -55,11 +55,45 @@ func TestValidateKubeletConfiguration(t *testing.T) { CPUCFSQuotaPeriod: metav1.Duration{Duration: 100 * time.Millisecond}, TopologyManagerPolicy: "none", } - if allErrors := ValidateKubeletConfiguration(successCase); allErrors != nil { + if allErrors := ValidateKubeletConfiguration(successCase1); allErrors != nil { t.Errorf("expect no errors, got %v", allErrors) } - errorCase := &kubeletconfig.KubeletConfiguration{ + successCase2 := &kubeletconfig.KubeletConfiguration{ + CgroupsPerQOS: true, + EnforceNodeAllocatable: []string{"pods"}, + SystemReservedCgroup: "", + KubeReservedCgroup: "", + SystemCgroups: "", + CgroupRoot: "", + EventBurst: 10, + EventRecordQPS: 5, + HealthzPort: 10248, + ImageGCHighThresholdPercent: 85, + ImageGCLowThresholdPercent: 80, + IPTablesDropBit: 15, + IPTablesMasqueradeBit: 14, + KubeAPIBurst: 10, + KubeAPIQPS: 5, + MaxOpenFiles: 1000000, + MaxPods: 110, + OOMScoreAdj: -999, + PodsPerCore: 100, + Port: 65535, + ReadOnlyPort: 0, + RegistryBurst: 10, + RegistryPullQPS: 5, + HairpinMode: kubeletconfig.PromiscuousBridge, + NodeLeaseDurationSeconds: 1, + CPUCFSQuotaPeriod: metav1.Duration{Duration: 100 * time.Millisecond}, + TopologyManagerPolicy: "none", + ReservedSystemCPUs: "0-3", + } + if allErrors := ValidateKubeletConfiguration(successCase2); allErrors != nil { + t.Errorf("expect no errors, got %v", allErrors) + } + + errorCase1 := &kubeletconfig.KubeletConfiguration{ CgroupsPerQOS: false, EnforceNodeAllocatable: []string{"pods", "system-reserved", "kube-reserved", "illegal-key"}, SystemCgroups: "/", @@ -86,8 +120,43 @@ func TestValidateKubeletConfiguration(t *testing.T) { CPUCFSQuotaPeriod: metav1.Duration{Duration: 0}, TopologyManagerPolicy: "", } - const numErrs = 26 - if allErrors := ValidateKubeletConfiguration(errorCase); len(allErrors.(utilerrors.Aggregate).Errors()) != numErrs { - t.Errorf("expect %d errors, got %v", numErrs, len(allErrors.(utilerrors.Aggregate).Errors())) + const numErrsErrorCase1 = 26 + if allErrors := ValidateKubeletConfiguration(errorCase1); len(allErrors.(utilerrors.Aggregate).Errors()) != numErrsErrorCase1 { + t.Errorf("expect %d errors, got %v", numErrsErrorCase1, 
len(allErrors.(utilerrors.Aggregate).Errors())) + } + + errorCase2 := &kubeletconfig.KubeletConfiguration{ + CgroupsPerQOS: true, + EnforceNodeAllocatable: []string{"pods", "system-reserved", "kube-reserved"}, + SystemReservedCgroup: "/system.slice", + KubeReservedCgroup: "/kubelet.service", + SystemCgroups: "", + CgroupRoot: "", + EventBurst: 10, + EventRecordQPS: 5, + HealthzPort: 10248, + ImageGCHighThresholdPercent: 85, + ImageGCLowThresholdPercent: 80, + IPTablesDropBit: 15, + IPTablesMasqueradeBit: 14, + KubeAPIBurst: 10, + KubeAPIQPS: 5, + MaxOpenFiles: 1000000, + MaxPods: 110, + OOMScoreAdj: -999, + PodsPerCore: 100, + Port: 65535, + ReadOnlyPort: 0, + RegistryBurst: 10, + RegistryPullQPS: 5, + HairpinMode: kubeletconfig.PromiscuousBridge, + NodeLeaseDurationSeconds: 1, + CPUCFSQuotaPeriod: metav1.Duration{Duration: 100 * time.Millisecond}, + TopologyManagerPolicy: "none", + ReservedSystemCPUs: "0-3", + } + const numErrsErrorCase2 = 1 + if allErrors := ValidateKubeletConfiguration(errorCase2); len(allErrors.(utilerrors.Aggregate).Errors()) != numErrsErrorCase2 { + t.Errorf("expect %d errors, got %v", numErrsErrorCase2, len(allErrors.(utilerrors.Aggregate).Errors())) } } diff --git a/pkg/kubelet/cm/BUILD b/pkg/kubelet/cm/BUILD index db332fd85de..10270839aa7 100644 --- a/pkg/kubelet/cm/BUILD +++ b/pkg/kubelet/cm/BUILD @@ -28,6 +28,7 @@ go_library( "//pkg/features:go_default_library", "//pkg/kubelet/apis/podresources/v1alpha1:go_default_library", "//pkg/kubelet/cm/cpumanager:go_default_library", + "//pkg/kubelet/cm/cpuset:go_default_library", "//pkg/kubelet/cm/topologymanager:go_default_library", "//pkg/kubelet/config:go_default_library", "//pkg/kubelet/container:go_default_library", diff --git a/pkg/kubelet/cm/container_manager.go b/pkg/kubelet/cm/container_manager.go index 40ed8bd21ec..b91102f6a5c 100644 --- a/pkg/kubelet/cm/container_manager.go +++ b/pkg/kubelet/cm/container_manager.go @@ -24,6 +24,7 @@ import ( v1 "k8s.io/api/core/v1" internalapi "k8s.io/cri-api/pkg/apis" podresourcesapi "k8s.io/kubernetes/pkg/kubelet/apis/podresources/v1alpha1" + "k8s.io/kubernetes/pkg/kubelet/cm/cpuset" "k8s.io/kubernetes/pkg/kubelet/cm/topologymanager" "k8s.io/kubernetes/pkg/kubelet/config" kubecontainer "k8s.io/kubernetes/pkg/kubelet/container" @@ -137,6 +138,7 @@ type NodeConfig struct { type NodeAllocatableConfig struct { KubeReservedCgroupName string SystemReservedCgroupName string + ReservedSystemCPUs cpuset.CPUSet EnforceNodeAllocatable sets.String KubeReserved v1.ResourceList SystemReserved v1.ResourceList diff --git a/pkg/kubelet/cm/container_manager_linux.go b/pkg/kubelet/cm/container_manager_linux.go index c2c67105c7c..8dcdc5f1249 100644 --- a/pkg/kubelet/cm/container_manager_linux.go +++ b/pkg/kubelet/cm/container_manager_linux.go @@ -320,6 +320,7 @@ func NewContainerManager(mountUtil mount.Interface, cadvisorInterface cadvisor.I nodeConfig.ExperimentalCPUManagerReconcilePeriod, machineInfo, numaNodeInfo, + nodeConfig.NodeAllocatableConfig.ReservedSystemCPUs, cm.GetNodeAllocatableReservation(), nodeConfig.KubeletRootDir, cm.topologyManager, diff --git a/pkg/kubelet/cm/cpumanager/cpu_manager.go b/pkg/kubelet/cm/cpumanager/cpu_manager.go index 69d3a0a0519..d93f0db3c24 100644 --- a/pkg/kubelet/cm/cpumanager/cpu_manager.go +++ b/pkg/kubelet/cm/cpumanager/cpu_manager.go @@ -23,7 +23,7 @@ import ( "time" cadvisorapi "github.com/google/cadvisor/info/v1" - "k8s.io/api/core/v1" + v1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/util/wait" "k8s.io/klog" @@ -113,7 +113,7 @@ func (s 
*sourcesReadyStub) AddSource(source string) {} func (s *sourcesReadyStub) AllReady() bool { return true } // NewManager creates new cpu manager based on provided policy -func NewManager(cpuPolicyName string, reconcilePeriod time.Duration, machineInfo *cadvisorapi.MachineInfo, numaNodeInfo topology.NUMANodeInfo, nodeAllocatableReservation v1.ResourceList, stateFileDirectory string, affinity topologymanager.Store) (Manager, error) { +func NewManager(cpuPolicyName string, reconcilePeriod time.Duration, machineInfo *cadvisorapi.MachineInfo, numaNodeInfo topology.NUMANodeInfo, specificCPUs cpuset.CPUSet, nodeAllocatableReservation v1.ResourceList, stateFileDirectory string, affinity topologymanager.Store) (Manager, error) { var topo *topology.CPUTopology var policy Policy @@ -147,7 +147,7 @@ func NewManager(cpuPolicyName string, reconcilePeriod time.Duration, machineInfo // exclusively allocated. reservedCPUsFloat := float64(reservedCPUs.MilliValue()) / 1000 numReservedCPUs := int(math.Ceil(reservedCPUsFloat)) - policy = NewStaticPolicy(topo, numReservedCPUs, affinity) + policy = NewStaticPolicy(topo, numReservedCPUs, specificCPUs, affinity) default: return nil, fmt.Errorf("unknown policy: \"%s\"", cpuPolicyName) diff --git a/pkg/kubelet/cm/cpumanager/cpu_manager_test.go b/pkg/kubelet/cm/cpumanager/cpu_manager_test.go index 6b3f566478a..429e9981a55 100644 --- a/pkg/kubelet/cm/cpumanager/cpu_manager_test.go +++ b/pkg/kubelet/cm/cpumanager/cpu_manager_test.go @@ -27,7 +27,7 @@ import ( "os" cadvisorapi "github.com/google/cadvisor/info/v1" - "k8s.io/api/core/v1" + v1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/resource" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" @@ -200,7 +200,10 @@ func TestCPUManagerAdd(t *testing.T) { 2: {CoreID: 2, SocketID: 0}, 3: {CoreID: 3, SocketID: 0}, }, - }, 0, topologymanager.NewFakeManager()) + }, + 0, + cpuset.NewCPUSet(), + topologymanager.NewFakeManager()) testCases := []struct { description string updateErr error @@ -347,7 +350,7 @@ func TestCPUManagerGenerate(t *testing.T) { } defer os.RemoveAll(sDir) - mgr, err := NewManager(testCase.cpuPolicyName, 5*time.Second, machineInfo, nil, testCase.nodeAllocatableReservation, sDir, topologymanager.NewFakeManager()) + mgr, err := NewManager(testCase.cpuPolicyName, 5*time.Second, machineInfo, nil, cpuset.NewCPUSet(), testCase.nodeAllocatableReservation, sDir, topologymanager.NewFakeManager()) if testCase.expectedError != nil { if !strings.Contains(err.Error(), testCase.expectedError.Error()) { t.Errorf("Unexpected error message. 
Have: %s wants %s", err.Error(), testCase.expectedError.Error()) @@ -669,3 +672,72 @@ func TestReconcileState(t *testing.T) { } } } + +// above test cases are without kubelet --reserved-cpus cmd option +// the following tests are with --reserved-cpus configured +func TestCPUManagerAddWithResvList(t *testing.T) { + testPolicy := NewStaticPolicy( + &topology.CPUTopology{ + NumCPUs: 4, + NumSockets: 1, + NumCores: 4, + CPUDetails: map[int]topology.CPUInfo{ + 0: {CoreID: 0, SocketID: 0}, + 1: {CoreID: 1, SocketID: 0}, + 2: {CoreID: 2, SocketID: 0}, + 3: {CoreID: 3, SocketID: 0}, + }, + }, + 1, + cpuset.NewCPUSet(0), + topologymanager.NewFakeManager()) + testCases := []struct { + description string + updateErr error + policy Policy + expCPUSet cpuset.CPUSet + expErr error + }{ + { + description: "cpu manager add - no error", + updateErr: nil, + policy: testPolicy, + expCPUSet: cpuset.NewCPUSet(0, 3), + expErr: nil, + }, + { + description: "cpu manager add - container update error", + updateErr: fmt.Errorf("fake update error"), + policy: testPolicy, + expCPUSet: cpuset.NewCPUSet(0, 1, 2, 3), + expErr: fmt.Errorf("fake update error"), + }, + } + + for _, testCase := range testCases { + mgr := &manager{ + policy: testCase.policy, + state: &mockState{ + assignments: state.ContainerCPUAssignments{}, + defaultCPUSet: cpuset.NewCPUSet(0, 1, 2, 3), + }, + containerRuntime: mockRuntimeService{ + err: testCase.updateErr, + }, + activePods: func() []*v1.Pod { return nil }, + podStatusProvider: mockPodStatusProvider{}, + } + + pod := makePod("2", "2") + container := &pod.Spec.Containers[0] + err := mgr.AddContainer(pod, container, "fakeID") + if !reflect.DeepEqual(err, testCase.expErr) { + t.Errorf("CPU Manager AddContainer() error (%v). expected error: %v but got: %v", + testCase.description, testCase.expErr, err) + } + if !testCase.expCPUSet.Equals(mgr.state.GetDefaultCPUSet()) { + t.Errorf("CPU Manager AddContainer() error (%v). expected cpuset: %v but got: %v", + testCase.description, testCase.expCPUSet, mgr.state.GetDefaultCPUSet()) + } + } +} diff --git a/pkg/kubelet/cm/cpumanager/policy_static.go b/pkg/kubelet/cm/cpumanager/policy_static.go index 82c81d77a90..dd71c026dfb 100644 --- a/pkg/kubelet/cm/cpumanager/policy_static.go +++ b/pkg/kubelet/cm/cpumanager/policy_static.go @@ -19,7 +19,7 @@ package cpumanager import ( "fmt" - "k8s.io/api/core/v1" + v1 "k8s.io/api/core/v1" "k8s.io/klog" v1qos "k8s.io/kubernetes/pkg/apis/core/v1/helper/qos" "k8s.io/kubernetes/pkg/kubelet/cm/cpumanager/state" @@ -89,14 +89,19 @@ var _ Policy = &staticPolicy{} // NewStaticPolicy returns a CPU manager policy that does not change CPU // assignments for exclusively pinned guaranteed containers after the main // container process starts. -func NewStaticPolicy(topology *topology.CPUTopology, numReservedCPUs int, affinity topologymanager.Store) Policy { +func NewStaticPolicy(topology *topology.CPUTopology, numReservedCPUs int, reservedCPUs cpuset.CPUSet, affinity topologymanager.Store) Policy { allCPUs := topology.CPUDetails.CPUs() - // takeByTopology allocates CPUs associated with low-numbered cores from - // allCPUs. - // - // For example: Given a system with 8 CPUs available and HT enabled, - // if numReservedCPUs=2, then reserved={0,4} - reserved, _ := takeByTopology(topology, allCPUs, numReservedCPUs) + var reserved cpuset.CPUSet + if reservedCPUs.Size() > 0 { + reserved = reservedCPUs + } else { + // takeByTopology allocates CPUs associated with low-numbered cores from + // allCPUs. 
+ // + // For example: Given a system with 8 CPUs available and HT enabled, + // if numReservedCPUs=2, then reserved={0,4} + reserved, _ = takeByTopology(topology, allCPUs, numReservedCPUs) + } if reserved.Size() != numReservedCPUs { panic(fmt.Sprintf("[cpumanager] unable to reserve the required amount of CPUs (size of %s did not equal %d)", reserved, numReservedCPUs)) diff --git a/pkg/kubelet/cm/cpumanager/policy_static_test.go b/pkg/kubelet/cm/cpumanager/policy_static_test.go index acba7c811aa..a2df92f34d7 100644 --- a/pkg/kubelet/cm/cpumanager/policy_static_test.go +++ b/pkg/kubelet/cm/cpumanager/policy_static_test.go @@ -21,7 +21,7 @@ import ( "reflect" "testing" - "k8s.io/api/core/v1" + v1 "k8s.io/api/core/v1" "k8s.io/kubernetes/pkg/kubelet/cm/cpumanager/state" "k8s.io/kubernetes/pkg/kubelet/cm/cpumanager/topology" "k8s.io/kubernetes/pkg/kubelet/cm/cpuset" @@ -57,7 +57,7 @@ type staticPolicyMultiContainerTest struct { } func TestStaticPolicyName(t *testing.T) { - policy := NewStaticPolicy(topoSingleSocketHT, 1, topologymanager.NewFakeManager()) + policy := NewStaticPolicy(topoSingleSocketHT, 1, cpuset.NewCPUSet(), topologymanager.NewFakeManager()) policyName := policy.Name() if policyName != "static" { @@ -134,7 +134,7 @@ func TestStaticPolicyStart(t *testing.T) { t.Error("expected panic doesn't occurred") } }() - policy := NewStaticPolicy(testCase.topo, testCase.numReservedCPUs, topologymanager.NewFakeManager()).(*staticPolicy) + policy := NewStaticPolicy(testCase.topo, testCase.numReservedCPUs, cpuset.NewCPUSet(), topologymanager.NewFakeManager()).(*staticPolicy) st := &mockState{ assignments: testCase.stAssignments, defaultCPUSet: testCase.stDefaultCPUSet, @@ -432,7 +432,7 @@ func TestStaticPolicyAdd(t *testing.T) { } for _, testCase := range testCases { - policy := NewStaticPolicy(testCase.topo, testCase.numReservedCPUs, topologymanager.NewFakeManager()) + policy := NewStaticPolicy(testCase.topo, testCase.numReservedCPUs, cpuset.NewCPUSet(), topologymanager.NewFakeManager()) st := &mockState{ assignments: testCase.stAssignments, @@ -645,7 +645,7 @@ func TestStaticPolicyAddWithInitContainers(t *testing.T) { } for _, testCase := range testCases { - policy := NewStaticPolicy(testCase.topo, testCase.numReservedCPUs, topologymanager.NewFakeManager()) + policy := NewStaticPolicy(testCase.topo, testCase.numReservedCPUs, cpuset.NewCPUSet(), topologymanager.NewFakeManager()) st := &mockState{ assignments: testCase.stAssignments, @@ -732,7 +732,7 @@ func TestStaticPolicyRemove(t *testing.T) { } for _, testCase := range testCases { - policy := NewStaticPolicy(testCase.topo, testCase.numReservedCPUs, topologymanager.NewFakeManager()) + policy := NewStaticPolicy(testCase.topo, testCase.numReservedCPUs, cpuset.NewCPUSet(), topologymanager.NewFakeManager()) st := &mockState{ assignments: testCase.stAssignments, @@ -822,7 +822,7 @@ func TestTopologyAwareAllocateCPUs(t *testing.T) { }, } for _, tc := range testCases { - policy := NewStaticPolicy(tc.topo, 0, topologymanager.NewFakeManager()).(*staticPolicy) + policy := NewStaticPolicy(tc.topo, 0, cpuset.NewCPUSet(), topologymanager.NewFakeManager()).(*staticPolicy) st := &mockState{ assignments: tc.stAssignments, defaultCPUSet: tc.stDefaultCPUSet, @@ -842,3 +842,166 @@ func TestTopologyAwareAllocateCPUs(t *testing.T) { } } } + +// above test cases are without kubelet --reserved-cpus cmd option +// the following tests are with --reserved-cpus configured +type staticPolicyTestWithResvList struct { + description string + topo *topology.CPUTopology + 
numReservedCPUs int + reserved cpuset.CPUSet + containerID string + stAssignments state.ContainerCPUAssignments + stDefaultCPUSet cpuset.CPUSet + pod *v1.Pod + expErr error + expCPUAlloc bool + expCSet cpuset.CPUSet + expPanic bool +} + +func TestStaticPolicyStartWithResvList(t *testing.T) { + testCases := []staticPolicyTestWithResvList{ + { + description: "empty cpuset", + topo: topoDualSocketHT, + numReservedCPUs: 2, + reserved: cpuset.NewCPUSet(0, 1), + stAssignments: state.ContainerCPUAssignments{}, + stDefaultCPUSet: cpuset.NewCPUSet(), + expCSet: cpuset.NewCPUSet(0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11), + }, + { + description: "reserved cores 0 & 1 are not present in available cpuset", + topo: topoDualSocketHT, + numReservedCPUs: 2, + reserved: cpuset.NewCPUSet(0, 1), + stAssignments: state.ContainerCPUAssignments{}, + stDefaultCPUSet: cpuset.NewCPUSet(2, 3, 4, 5), + expPanic: true, + }, + { + description: "inconsistency between numReservedCPUs and reserved", + topo: topoDualSocketHT, + numReservedCPUs: 1, + reserved: cpuset.NewCPUSet(0, 1), + stAssignments: state.ContainerCPUAssignments{}, + stDefaultCPUSet: cpuset.NewCPUSet(0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11), + expPanic: true, + }, + } + for _, testCase := range testCases { + t.Run(testCase.description, func(t *testing.T) { + defer func() { + if err := recover(); err != nil { + if !testCase.expPanic { + t.Errorf("unexpected panic occurred: %q", err) + } + } else if testCase.expPanic { + t.Error("expected panic did not occur") + } + }() + policy := NewStaticPolicy(testCase.topo, testCase.numReservedCPUs, testCase.reserved, topologymanager.NewFakeManager()).(*staticPolicy) + st := &mockState{ + assignments: testCase.stAssignments, + defaultCPUSet: testCase.stDefaultCPUSet, + } + policy.Start(st) + + if !st.GetDefaultCPUSet().Equals(testCase.expCSet) { + t.Errorf("State CPUSet is different than expected. 
Have %q wants: %q", st.GetDefaultCPUSet(), + testCase.expCSet) + } + + }) + } +} + +func TestStaticPolicyAddWithResvList(t *testing.T) { + + testCases := []staticPolicyTestWithResvList{ + { + description: "GuPodSingleCore, SingleSocketHT, ExpectError", + topo: topoSingleSocketHT, + numReservedCPUs: 1, + reserved: cpuset.NewCPUSet(0), + containerID: "fakeID2", + stAssignments: state.ContainerCPUAssignments{}, + stDefaultCPUSet: cpuset.NewCPUSet(0, 1, 2, 3, 4, 5, 6, 7), + pod: makePod("8000m", "8000m"), + expErr: fmt.Errorf("not enough cpus available to satisfy request"), + expCPUAlloc: false, + expCSet: cpuset.NewCPUSet(), + }, + { + description: "GuPodSingleCore, SingleSocketHT, ExpectAllocOneCPU", + topo: topoSingleSocketHT, + numReservedCPUs: 2, + reserved: cpuset.NewCPUSet(0, 1), + containerID: "fakeID2", + stAssignments: state.ContainerCPUAssignments{}, + stDefaultCPUSet: cpuset.NewCPUSet(0, 1, 2, 3, 4, 5, 6, 7), + pod: makePod("1000m", "1000m"), + expErr: nil, + expCPUAlloc: true, + expCSet: cpuset.NewCPUSet(4), // expect sibling of partial core + }, + { + description: "GuPodMultipleCores, SingleSocketHT, ExpectAllocOneCore", + topo: topoSingleSocketHT, + numReservedCPUs: 2, + reserved: cpuset.NewCPUSet(0, 1), + containerID: "fakeID3", + stAssignments: state.ContainerCPUAssignments{ + "fakeID100": cpuset.NewCPUSet(2, 3, 6, 7), + }, + stDefaultCPUSet: cpuset.NewCPUSet(0, 1, 4, 5), + pod: makePod("2000m", "2000m"), + expErr: nil, + expCPUAlloc: true, + expCSet: cpuset.NewCPUSet(4, 5), + }, + } + + for _, testCase := range testCases { + policy := NewStaticPolicy(testCase.topo, testCase.numReservedCPUs, testCase.reserved, topologymanager.NewFakeManager()) + + st := &mockState{ + assignments: testCase.stAssignments, + defaultCPUSet: testCase.stDefaultCPUSet, + } + + container := &testCase.pod.Spec.Containers[0] + err := policy.AddContainer(st, testCase.pod, container, testCase.containerID) + if !reflect.DeepEqual(err, testCase.expErr) { + t.Errorf("StaticPolicy AddContainer() error (%v). expected add error: %v but got: %v", + testCase.description, testCase.expErr, err) + } + + if testCase.expCPUAlloc { + cset, found := st.assignments[testCase.containerID] + if !found { + t.Errorf("StaticPolicy AddContainer() error (%v). expected container id %v to be present in assignments %v", + testCase.description, testCase.containerID, st.assignments) + } + + if !reflect.DeepEqual(cset, testCase.expCSet) { + t.Errorf("StaticPolicy AddContainer() error (%v). expected cpuset %v but got %v", + testCase.description, testCase.expCSet, cset) + } + + if !cset.Intersection(st.defaultCPUSet).IsEmpty() { + t.Errorf("StaticPolicy AddContainer() error (%v). expected cpuset %v to be disoint from the shared cpuset %v", + testCase.description, cset, st.defaultCPUSet) + } + } + + if !testCase.expCPUAlloc { + _, found := st.assignments[testCase.containerID] + if found { + t.Errorf("StaticPolicy AddContainer() error (%v). 
Did not expect container id %v to be present in assignments %v", + testCase.description, testCase.containerID, st.assignments) + } + } + } +} diff --git a/staging/src/k8s.io/kubelet/config/v1beta1/types.go b/staging/src/k8s.io/kubelet/config/v1beta1/types.go index b2aeb87cc7a..1c03d43162f 100644 --- a/staging/src/k8s.io/kubelet/config/v1beta1/types.go +++ b/staging/src/k8s.io/kubelet/config/v1beta1/types.go @@ -710,6 +710,10 @@ type KubeletConfiguration struct { // Default: nil // +optional KubeReserved map[string]string `json:"kubeReserved,omitempty"` + // This ReservedSystemCPUs option specifies the cpu list reserved for the host-level system threads and kubernetes-related threads. + // This provides a "static" CPU list rather than the "dynamic" list computed by system-reserved and kube-reserved. + // This option overwrites CPUs provided by system-reserved and kube-reserved. + ReservedSystemCPUs string `json:"reservedSystemCPUs,omitempty"` // This flag helps kubelet identify absolute name of top level cgroup used to enforce `SystemReserved` compute resource reservation for OS system daemons. // Refer to [Node Allocatable](https://git.k8s.io/community/contributors/design-proposals/node/node-allocatable.md) doc for more information. // Dynamic Kubelet Config (beta): This field should not be updated without a full node