Merge pull request #83592 from jianzzha/opt-reserved-cpus

added --reserved-cpus kubelet command option
Kubernetes Prow Robot 2019-11-06 22:14:42 -08:00 committed by GitHub
commit 73b2c82b28
18 changed files with 409 additions and 28 deletions


@ -64,6 +64,7 @@
"k8s.io/kubernetes/pkg/features",
"k8s.io/kubernetes/pkg/fieldpath",
"k8s.io/kubernetes/pkg/kubelet/apis",
"k8s.io/kubernetes/pkg/kubelet/cm/cpuset",
"k8s.io/kubernetes/pkg/kubelet/qos",
"k8s.io/kubernetes/pkg/kubelet/types",
"k8s.io/kubernetes/pkg/master/ports",


@ -57,6 +57,7 @@ go_library(
"//pkg/kubelet/certificate:go_default_library",
"//pkg/kubelet/certificate/bootstrap:go_default_library",
"//pkg/kubelet/cm:go_default_library",
"//pkg/kubelet/cm/cpuset:go_default_library",
"//pkg/kubelet/config:go_default_library",
"//pkg/kubelet/container:go_default_library",
"//pkg/kubelet/dockershim:go_default_library",


@ -553,7 +553,7 @@ func AddKubeletConfigFlags(mainfs *pflag.FlagSet, c *kubeletconfig.KubeletConfig
fs.Var(cliflag.NewMapStringString(&c.EvictionMinimumReclaim), "eviction-minimum-reclaim", "A set of minimum reclaims (e.g. imagefs.available=2Gi) that describes the minimum amount of resource the kubelet will reclaim when performing a pod eviction if that resource is under pressure.")
fs.Int32Var(&c.PodsPerCore, "pods-per-core", c.PodsPerCore, "Number of Pods per core that can run on this Kubelet. The total number of Pods on this Kubelet cannot exceed max-pods, so max-pods will be used if this calculation results in a larger number of Pods allowed on the Kubelet. A value of 0 disables this limit.")
fs.BoolVar(&c.ProtectKernelDefaults, "protect-kernel-defaults", c.ProtectKernelDefaults, "Default kubelet behaviour for kernel tuning. If set, kubelet errors if any of kernel tunables is different than kubelet defaults.")
fs.StringVar(&c.ReservedSystemCPUs, "reserved-cpus", c.ReservedSystemCPUs, "A comma-separated list of CPUs or CPU ranges that are reserved for system and kubernetes usage. This specific list will supersede cpu counts in --system-reserved and --kube-reserved.")
// Node Allocatable Flags
fs.Var(cliflag.NewMapStringString(&c.SystemReserved), "system-reserved", "A set of ResourceName=ResourceQuantity (e.g. cpu=200m,memory=500Mi,ephemeral-storage=1Gi) pairs that describe resources reserved for non-kubernetes components. Currently only cpu and memory are supported. See http://kubernetes.io/docs/user-guide/compute-resources for more detail. [default=none]")
fs.Var(cliflag.NewMapStringString(&c.KubeReserved), "kube-reserved", "A set of ResourceName=ResourceQuantity (e.g. cpu=200m,memory=500Mi,ephemeral-storage=1Gi) pairs that describe resources reserved for kubernetes system components. Currently cpu, memory and local ephemeral storage for root file system are supported. See http://kubernetes.io/docs/user-guide/compute-resources for more detail. [default=none]")
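As a point of reference for the --reserved-cpus flag registered above, the following is a minimal, self-contained sketch (not part of this change) of how such a value is parsed with the cpuset package; the list "0-3,8" is an illustrative value.

package main

import (
	"fmt"

	"k8s.io/kubernetes/pkg/kubelet/cm/cpuset"
)

func main() {
	// "0-3,8" is an illustrative value: comma-separated CPUs and CPU ranges.
	reserved, err := cpuset.Parse("0-3,8")
	if err != nil {
		panic(err)
	}
	fmt.Println(reserved.ToSlice(), reserved.Size()) // [0 1 2 3 8] 5
}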


@ -77,6 +77,7 @@ import (
kubeletcertificate "k8s.io/kubernetes/pkg/kubelet/certificate"
"k8s.io/kubernetes/pkg/kubelet/certificate/bootstrap"
"k8s.io/kubernetes/pkg/kubelet/cm"
"k8s.io/kubernetes/pkg/kubelet/cm/cpuset"
"k8s.io/kubernetes/pkg/kubelet/config"
kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
"k8s.io/kubernetes/pkg/kubelet/dockershim"
@ -636,6 +637,48 @@ func run(s *options.KubeletServer, kubeDeps *kubelet.Dependencies, stopCh <-chan
klog.Info("--cgroups-per-qos enabled, but --cgroup-root was not specified. defaulting to /")
s.CgroupRoot = "/"
}
var reservedSystemCPUs cpuset.CPUSet
var errParse error
if s.ReservedSystemCPUs != "" {
reservedSystemCPUs, errParse = cpuset.Parse(s.ReservedSystemCPUs)
if errParse != nil {
// an invalid cpu list was provided; return the parse error so it won't overwrite kubeReserved/systemReserved
klog.Infof("Invalid ReservedSystemCPUs \"%s\"", s.ReservedSystemCPUs)
return errParse
}
// is it safe to use cAdvisor here?
machineInfo, err := kubeDeps.CAdvisorInterface.MachineInfo()
if err != nil {
// if cAdvisor can't be used here, fall back to the non-explicit cpu list behavior
klog.Warning("Failed to get MachineInfo, set reservedSystemCPUs to empty")
reservedSystemCPUs = cpuset.NewCPUSet()
} else {
reservedList := reservedSystemCPUs.ToSlice()
first := reservedList[0]
last := reservedList[len(reservedList)-1]
if first < 0 || last >= machineInfo.NumCores {
// the specified cpuset is outside of the range of what the machine has
klog.Infof("Invalid cpuset specified by --reserved-cpus")
return fmt.Errorf("Invalid cpuset %q specified by --reserved-cpus", s.ReservedSystemCPUs)
}
}
} else {
reservedSystemCPUs = cpuset.NewCPUSet()
}
if reservedSystemCPUs.Size() > 0 {
// command option validation has already verified that neither --system-reserved-cgroup nor --kube-reserved-cgroup is specified, so overwriting is ok
klog.Infof("Option --reserved-cpus is specified; it will overwrite the cpu setting in KubeReserved=\"%v\" and SystemReserved=\"%v\".", s.KubeReserved, s.SystemReserved)
if s.KubeReserved != nil {
delete(s.KubeReserved, "cpu")
}
if s.SystemReserved == nil {
s.SystemReserved = make(map[string]string)
}
s.SystemReserved["cpu"] = strconv.Itoa(reservedSystemCPUs.Size())
klog.Infof("After cpu setting is overwritten, KubeReserved=\"%v\", SystemReserved=\"%v\"", s.KubeReserved, s.SystemReserved)
}
kubeReserved, err := parseResourceList(s.KubeReserved)
if err != nil {
return err
@ -678,6 +721,7 @@ func run(s *options.KubeletServer, kubeDeps *kubelet.Dependencies, stopCh <-chan
EnforceNodeAllocatable: sets.NewString(s.EnforceNodeAllocatable...),
KubeReserved: kubeReserved,
SystemReserved: systemReserved,
ReservedSystemCPUs: reservedSystemCPUs,
HardEvictionThresholds: hardEvictionThresholds,
},
QOSReserved: *experimentalQOSReserved,
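To make the overwrite step above concrete, here is a minimal sketch (outside the kubelet, with illustrative reservation values) of how an explicit reserved set replaces the cpu entries in kube-reserved and system-reserved.

package main

import (
	"fmt"
	"strconv"

	"k8s.io/kubernetes/pkg/kubelet/cm/cpuset"
)

func main() {
	// illustrative starting values for the two reservations
	kubeReserved := map[string]string{"cpu": "500m", "memory": "1Gi"}
	systemReserved := map[string]string{"cpu": "1", "memory": "500Mi"}

	reserved, _ := cpuset.Parse("0-3") // e.g. --reserved-cpus=0-3
	if reserved.Size() > 0 {
		delete(kubeReserved, "cpu")                           // drop the dynamic cpu count
		systemReserved["cpu"] = strconv.Itoa(reserved.Size()) // 4 whole CPUs
	}
	fmt.Println(kubeReserved, systemReserved) // map[memory:1Gi] map[cpu:4 memory:500Mi]
}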


@ -212,6 +212,7 @@ var (
"ReadOnlyPort",
"RegistryBurst",
"RegistryPullQPS",
"ReservedSystemCPUs",
"RuntimeRequestTimeout.Duration",
"SerializeImagePulls",
"StreamingConnectionIdleTimeout.Duration",


@ -331,6 +331,10 @@ type KubeletConfiguration struct {
// This flag accepts a list of options. Acceptable options are `pods`, `system-reserved` & `kube-reserved`.
// Refer to [Node Allocatable](https://git.k8s.io/community/contributors/design-proposals/node/node-allocatable.md) doc for more information.
EnforceNodeAllocatable []string
// This option specifies the cpu list reserved for the host-level system threads and kubernetes-related threads.
// It provides a "static" CPU list rather than the "dynamic" list computed from system-reserved and kube-reserved.
// This option overwrites the CPUs provided by system-reserved and kube-reserved.
ReservedSystemCPUs string
}
type KubeletAuthorizationMode string


@ -326,6 +326,7 @@ func autoConvert_v1beta1_KubeletConfiguration_To_config_KubeletConfiguration(in
out.ConfigMapAndSecretChangeDetectionStrategy = config.ResourceChangeDetectionStrategy(in.ConfigMapAndSecretChangeDetectionStrategy)
out.SystemReserved = *(*map[string]string)(unsafe.Pointer(&in.SystemReserved))
out.KubeReserved = *(*map[string]string)(unsafe.Pointer(&in.KubeReserved))
out.ReservedSystemCPUs = in.ReservedSystemCPUs
out.SystemReservedCgroup = in.SystemReservedCgroup
out.KubeReservedCgroup = in.KubeReservedCgroup
out.EnforceNodeAllocatable = *(*[]string)(unsafe.Pointer(&in.EnforceNodeAllocatable))
@ -462,6 +463,7 @@ func autoConvert_config_KubeletConfiguration_To_v1beta1_KubeletConfiguration(in
out.SystemReservedCgroup = in.SystemReservedCgroup
out.KubeReservedCgroup = in.KubeReservedCgroup
out.EnforceNodeAllocatable = *(*[]string)(unsafe.Pointer(&in.EnforceNodeAllocatable))
out.ReservedSystemCPUs = in.ReservedSystemCPUs
return nil
}


@ -17,6 +17,7 @@ go_library(
deps = [
"//pkg/features:go_default_library",
"//pkg/kubelet/apis/config:go_default_library",
"//pkg/kubelet/cm/cpuset:go_default_library",
"//pkg/kubelet/types:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/errors:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/validation:go_default_library",


@ -25,6 +25,7 @@ import (
utilfeature "k8s.io/apiserver/pkg/util/feature"
"k8s.io/kubernetes/pkg/features"
kubeletconfig "k8s.io/kubernetes/pkg/kubelet/apis/config"
"k8s.io/kubernetes/pkg/kubelet/cm/cpuset"
kubetypes "k8s.io/kubernetes/pkg/kubelet/types"
)
@ -142,6 +143,15 @@ func ValidateKubeletConfiguration(kc *kubeletconfig.KubeletConfiguration) error
allErrors = append(allErrors, fmt.Errorf("invalid configuration: option %q specified for HairpinMode (--hairpin-mode). Valid options are %q, %q or %q",
kc.HairpinMode, kubeletconfig.HairpinNone, kubeletconfig.HairpinVeth, kubeletconfig.PromiscuousBridge))
}
if kc.ReservedSystemCPUs != "" {
// --reserved-cpus does not support --system-reserved-cgroup or --kube-reserved-cgroup
if kc.SystemReservedCgroup != "" || kc.KubeReservedCgroup != "" {
allErrors = append(allErrors, fmt.Errorf("can't use --reserved-cpus with --system-reserved-cgroup or --kube-reserved-cgroup"))
}
if _, err := cpuset.Parse(kc.ReservedSystemCPUs); err != nil {
allErrors = append(allErrors, fmt.Errorf("unable to parse --reserved-cpus, error: %v", err))
}
}
if err := validateKubeletOSConfiguration(kc); err != nil {
allErrors = append(allErrors, err)
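For the parse check added above, a small sketch; "1-3,x" is an intentionally malformed list chosen for illustration, not taken from the change.

package main

import (
	"fmt"

	"k8s.io/kubernetes/pkg/kubelet/cm/cpuset"
)

func main() {
	// A malformed list fails cpuset.Parse and would be reported by validation.
	if _, err := cpuset.Parse("1-3,x"); err != nil {
		fmt.Println("rejected:", err)
	}
	// Independently, validation also rejects combining --reserved-cpus with
	// --system-reserved-cgroup or --kube-reserved-cgroup.
}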


@ -26,7 +26,7 @@ import (
)
func TestValidateKubeletConfiguration(t *testing.T) {
successCase := &kubeletconfig.KubeletConfiguration{
successCase1 := &kubeletconfig.KubeletConfiguration{
CgroupsPerQOS: true,
EnforceNodeAllocatable: []string{"pods", "system-reserved", "kube-reserved"},
SystemReservedCgroup: "/system.slice",
@ -55,11 +55,45 @@ func TestValidateKubeletConfiguration(t *testing.T) {
CPUCFSQuotaPeriod: metav1.Duration{Duration: 100 * time.Millisecond},
TopologyManagerPolicy: "none",
}
if allErrors := ValidateKubeletConfiguration(successCase); allErrors != nil {
if allErrors := ValidateKubeletConfiguration(successCase1); allErrors != nil {
t.Errorf("expect no errors, got %v", allErrors)
}
errorCase := &kubeletconfig.KubeletConfiguration{
successCase2 := &kubeletconfig.KubeletConfiguration{
CgroupsPerQOS: true,
EnforceNodeAllocatable: []string{"pods"},
SystemReservedCgroup: "",
KubeReservedCgroup: "",
SystemCgroups: "",
CgroupRoot: "",
EventBurst: 10,
EventRecordQPS: 5,
HealthzPort: 10248,
ImageGCHighThresholdPercent: 85,
ImageGCLowThresholdPercent: 80,
IPTablesDropBit: 15,
IPTablesMasqueradeBit: 14,
KubeAPIBurst: 10,
KubeAPIQPS: 5,
MaxOpenFiles: 1000000,
MaxPods: 110,
OOMScoreAdj: -999,
PodsPerCore: 100,
Port: 65535,
ReadOnlyPort: 0,
RegistryBurst: 10,
RegistryPullQPS: 5,
HairpinMode: kubeletconfig.PromiscuousBridge,
NodeLeaseDurationSeconds: 1,
CPUCFSQuotaPeriod: metav1.Duration{Duration: 100 * time.Millisecond},
TopologyManagerPolicy: "none",
ReservedSystemCPUs: "0-3",
}
if allErrors := ValidateKubeletConfiguration(successCase2); allErrors != nil {
t.Errorf("expect no errors, got %v", allErrors)
}
errorCase1 := &kubeletconfig.KubeletConfiguration{
CgroupsPerQOS: false,
EnforceNodeAllocatable: []string{"pods", "system-reserved", "kube-reserved", "illegal-key"},
SystemCgroups: "/",
@ -86,8 +120,43 @@ func TestValidateKubeletConfiguration(t *testing.T) {
CPUCFSQuotaPeriod: metav1.Duration{Duration: 0},
TopologyManagerPolicy: "",
}
const numErrs = 26
if allErrors := ValidateKubeletConfiguration(errorCase); len(allErrors.(utilerrors.Aggregate).Errors()) != numErrs {
t.Errorf("expect %d errors, got %v", numErrs, len(allErrors.(utilerrors.Aggregate).Errors()))
const numErrsErrorCase1 = 26
if allErrors := ValidateKubeletConfiguration(errorCase1); len(allErrors.(utilerrors.Aggregate).Errors()) != numErrsErrorCase1 {
t.Errorf("expect %d errors, got %v", numErrsErrorCase1, len(allErrors.(utilerrors.Aggregate).Errors()))
}
errorCase2 := &kubeletconfig.KubeletConfiguration{
CgroupsPerQOS: true,
EnforceNodeAllocatable: []string{"pods", "system-reserved", "kube-reserved"},
SystemReservedCgroup: "/system.slice",
KubeReservedCgroup: "/kubelet.service",
SystemCgroups: "",
CgroupRoot: "",
EventBurst: 10,
EventRecordQPS: 5,
HealthzPort: 10248,
ImageGCHighThresholdPercent: 85,
ImageGCLowThresholdPercent: 80,
IPTablesDropBit: 15,
IPTablesMasqueradeBit: 14,
KubeAPIBurst: 10,
KubeAPIQPS: 5,
MaxOpenFiles: 1000000,
MaxPods: 110,
OOMScoreAdj: -999,
PodsPerCore: 100,
Port: 65535,
ReadOnlyPort: 0,
RegistryBurst: 10,
RegistryPullQPS: 5,
HairpinMode: kubeletconfig.PromiscuousBridge,
NodeLeaseDurationSeconds: 1,
CPUCFSQuotaPeriod: metav1.Duration{Duration: 100 * time.Millisecond},
TopologyManagerPolicy: "none",
ReservedSystemCPUs: "0-3",
}
const numErrsErrorCase2 = 1
if allErrors := ValidateKubeletConfiguration(errorCase2); len(allErrors.(utilerrors.Aggregate).Errors()) != numErrsErrorCase2 {
t.Errorf("expect %d errors, got %v", numErrsErrorCase2, len(allErrors.(utilerrors.Aggregate).Errors()))
}
}


@ -28,6 +28,7 @@ go_library(
"//pkg/features:go_default_library",
"//pkg/kubelet/apis/podresources/v1alpha1:go_default_library",
"//pkg/kubelet/cm/cpumanager:go_default_library",
"//pkg/kubelet/cm/cpuset:go_default_library",
"//pkg/kubelet/cm/topologymanager:go_default_library",
"//pkg/kubelet/config:go_default_library",
"//pkg/kubelet/container:go_default_library",


@ -24,6 +24,7 @@ import (
v1 "k8s.io/api/core/v1"
internalapi "k8s.io/cri-api/pkg/apis"
podresourcesapi "k8s.io/kubernetes/pkg/kubelet/apis/podresources/v1alpha1"
"k8s.io/kubernetes/pkg/kubelet/cm/cpuset"
"k8s.io/kubernetes/pkg/kubelet/cm/topologymanager"
"k8s.io/kubernetes/pkg/kubelet/config"
kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
@ -137,6 +138,7 @@ type NodeConfig struct {
type NodeAllocatableConfig struct {
KubeReservedCgroupName string
SystemReservedCgroupName string
ReservedSystemCPUs cpuset.CPUSet
EnforceNodeAllocatable sets.String
KubeReserved v1.ResourceList
SystemReserved v1.ResourceList


@ -320,6 +320,7 @@ func NewContainerManager(mountUtil mount.Interface, cadvisorInterface cadvisor.I
nodeConfig.ExperimentalCPUManagerReconcilePeriod,
machineInfo,
numaNodeInfo,
nodeConfig.NodeAllocatableConfig.ReservedSystemCPUs,
cm.GetNodeAllocatableReservation(),
nodeConfig.KubeletRootDir,
cm.topologyManager,


@ -23,7 +23,7 @@ import (
"time"
cadvisorapi "github.com/google/cadvisor/info/v1"
"k8s.io/api/core/v1"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/klog"
@ -113,7 +113,7 @@ func (s *sourcesReadyStub) AddSource(source string) {}
func (s *sourcesReadyStub) AllReady() bool { return true }
// NewManager creates new cpu manager based on provided policy
func NewManager(cpuPolicyName string, reconcilePeriod time.Duration, machineInfo *cadvisorapi.MachineInfo, numaNodeInfo topology.NUMANodeInfo, nodeAllocatableReservation v1.ResourceList, stateFileDirectory string, affinity topologymanager.Store) (Manager, error) {
func NewManager(cpuPolicyName string, reconcilePeriod time.Duration, machineInfo *cadvisorapi.MachineInfo, numaNodeInfo topology.NUMANodeInfo, specificCPUs cpuset.CPUSet, nodeAllocatableReservation v1.ResourceList, stateFileDirectory string, affinity topologymanager.Store) (Manager, error) {
var topo *topology.CPUTopology
var policy Policy
@ -147,7 +147,7 @@ func NewManager(cpuPolicyName string, reconcilePeriod time.Duration, machineInfo
// exclusively allocated.
reservedCPUsFloat := float64(reservedCPUs.MilliValue()) / 1000
numReservedCPUs := int(math.Ceil(reservedCPUsFloat))
policy = NewStaticPolicy(topo, numReservedCPUs, affinity)
policy = NewStaticPolicy(topo, numReservedCPUs, specificCPUs, affinity)
default:
return nil, fmt.Errorf("unknown policy: \"%s\"", cpuPolicyName)
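The reservation count that feeds the static policy comes from rounding the reserved cpu quantity up to whole CPUs; a small sketch with an illustrative quantity of "4" (what --reserved-cpus=0-3 would yield via system-reserved):

package main

import (
	"fmt"
	"math"

	"k8s.io/apimachinery/pkg/api/resource"
)

func main() {
	// "4" stands in for the cpu reservation derived from --reserved-cpus=0-3.
	reservedCPUs := resource.MustParse("4")
	numReservedCPUs := int(math.Ceil(float64(reservedCPUs.MilliValue()) / 1000))
	fmt.Println(numReservedCPUs) // 4; must match the size of the explicit reserved set
}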


@ -27,7 +27,7 @@ import (
"os"
cadvisorapi "github.com/google/cadvisor/info/v1"
"k8s.io/api/core/v1"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
@ -200,7 +200,10 @@ func TestCPUManagerAdd(t *testing.T) {
2: {CoreID: 2, SocketID: 0},
3: {CoreID: 3, SocketID: 0},
},
}, 0, topologymanager.NewFakeManager())
},
0,
cpuset.NewCPUSet(),
topologymanager.NewFakeManager())
testCases := []struct {
description string
updateErr error
@ -347,7 +350,7 @@ func TestCPUManagerGenerate(t *testing.T) {
}
defer os.RemoveAll(sDir)
mgr, err := NewManager(testCase.cpuPolicyName, 5*time.Second, machineInfo, nil, testCase.nodeAllocatableReservation, sDir, topologymanager.NewFakeManager())
mgr, err := NewManager(testCase.cpuPolicyName, 5*time.Second, machineInfo, nil, cpuset.NewCPUSet(), testCase.nodeAllocatableReservation, sDir, topologymanager.NewFakeManager())
if testCase.expectedError != nil {
if !strings.Contains(err.Error(), testCase.expectedError.Error()) {
t.Errorf("Unexpected error message. Have: %s wants %s", err.Error(), testCase.expectedError.Error())
@ -669,3 +672,72 @@ func TestReconcileState(t *testing.T) {
}
}
}
// the test cases above run without the kubelet --reserved-cpus option
// the tests below run with --reserved-cpus configured
func TestCPUManagerAddWithResvList(t *testing.T) {
testPolicy := NewStaticPolicy(
&topology.CPUTopology{
NumCPUs: 4,
NumSockets: 1,
NumCores: 4,
CPUDetails: map[int]topology.CPUInfo{
0: {CoreID: 0, SocketID: 0},
1: {CoreID: 1, SocketID: 0},
2: {CoreID: 2, SocketID: 0},
3: {CoreID: 3, SocketID: 0},
},
},
1,
cpuset.NewCPUSet(0),
topologymanager.NewFakeManager())
testCases := []struct {
description string
updateErr error
policy Policy
expCPUSet cpuset.CPUSet
expErr error
}{
{
description: "cpu manager add - no error",
updateErr: nil,
policy: testPolicy,
expCPUSet: cpuset.NewCPUSet(0, 3),
expErr: nil,
},
{
description: "cpu manager add - container update error",
updateErr: fmt.Errorf("fake update error"),
policy: testPolicy,
expCPUSet: cpuset.NewCPUSet(0, 1, 2, 3),
expErr: fmt.Errorf("fake update error"),
},
}
for _, testCase := range testCases {
mgr := &manager{
policy: testCase.policy,
state: &mockState{
assignments: state.ContainerCPUAssignments{},
defaultCPUSet: cpuset.NewCPUSet(0, 1, 2, 3),
},
containerRuntime: mockRuntimeService{
err: testCase.updateErr,
},
activePods: func() []*v1.Pod { return nil },
podStatusProvider: mockPodStatusProvider{},
}
pod := makePod("2", "2")
container := &pod.Spec.Containers[0]
err := mgr.AddContainer(pod, container, "fakeID")
if !reflect.DeepEqual(err, testCase.expErr) {
t.Errorf("CPU Manager AddContainer() error (%v). expected error: %v but got: %v",
testCase.description, testCase.expErr, err)
}
if !testCase.expCPUSet.Equals(mgr.state.GetDefaultCPUSet()) {
t.Errorf("CPU Manager AddContainer() error (%v). expected cpuset: %v but got: %v",
testCase.description, testCase.expCPUSet, mgr.state.GetDefaultCPUSet())
}
}
}


@ -19,7 +19,7 @@ package cpumanager
import (
"fmt"
"k8s.io/api/core/v1"
v1 "k8s.io/api/core/v1"
"k8s.io/klog"
v1qos "k8s.io/kubernetes/pkg/apis/core/v1/helper/qos"
"k8s.io/kubernetes/pkg/kubelet/cm/cpumanager/state"
@ -89,14 +89,19 @@ var _ Policy = &staticPolicy{}
// NewStaticPolicy returns a CPU manager policy that does not change CPU
// assignments for exclusively pinned guaranteed containers after the main
// container process starts.
func NewStaticPolicy(topology *topology.CPUTopology, numReservedCPUs int, affinity topologymanager.Store) Policy {
func NewStaticPolicy(topology *topology.CPUTopology, numReservedCPUs int, reservedCPUs cpuset.CPUSet, affinity topologymanager.Store) Policy {
allCPUs := topology.CPUDetails.CPUs()
// takeByTopology allocates CPUs associated with low-numbered cores from
// allCPUs.
//
// For example: Given a system with 8 CPUs available and HT enabled,
// if numReservedCPUs=2, then reserved={0,4}
reserved, _ := takeByTopology(topology, allCPUs, numReservedCPUs)
var reserved cpuset.CPUSet
if reservedCPUs.Size() > 0 {
reserved = reservedCPUs
} else {
// takeByTopology allocates CPUs associated with low-numbered cores from
// allCPUs.
//
// For example: Given a system with 8 CPUs available and HT enabled,
// if numReservedCPUs=2, then reserved={0,4}
reserved, _ = takeByTopology(topology, allCPUs, numReservedCPUs)
}
if reserved.Size() != numReservedCPUs {
panic(fmt.Sprintf("[cpumanager] unable to reserve the required amount of CPUs (size of %s did not equal %d)", reserved, numReservedCPUs))
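To illustrate the selection difference above: on an 8-CPU hyper-threaded system the topology-based path would reserve a set like {0,4}, while an explicit --reserved-cpus list is used verbatim. A minimal sketch with illustrative values:

package main

import (
	"fmt"

	"k8s.io/kubernetes/pkg/kubelet/cm/cpuset"
)

func main() {
	numReservedCPUs := 2
	topologyPick := cpuset.NewCPUSet(0, 4) // what takeByTopology might choose
	explicit := cpuset.NewCPUSet(0, 1)     // from --reserved-cpus=0,1

	reserved := topologyPick
	if explicit.Size() > 0 {
		reserved = explicit // the explicit list wins
	}
	// the policy panics when the reserved set size disagrees with numReservedCPUs
	fmt.Println(reserved, reserved.Size() == numReservedCPUs) // 0-1 true
}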


@ -21,7 +21,7 @@ import (
"reflect"
"testing"
"k8s.io/api/core/v1"
v1 "k8s.io/api/core/v1"
"k8s.io/kubernetes/pkg/kubelet/cm/cpumanager/state"
"k8s.io/kubernetes/pkg/kubelet/cm/cpumanager/topology"
"k8s.io/kubernetes/pkg/kubelet/cm/cpuset"
@ -57,7 +57,7 @@ type staticPolicyMultiContainerTest struct {
}
func TestStaticPolicyName(t *testing.T) {
policy := NewStaticPolicy(topoSingleSocketHT, 1, topologymanager.NewFakeManager())
policy := NewStaticPolicy(topoSingleSocketHT, 1, cpuset.NewCPUSet(), topologymanager.NewFakeManager())
policyName := policy.Name()
if policyName != "static" {
@ -134,7 +134,7 @@ func TestStaticPolicyStart(t *testing.T) {
t.Error("expected panic doesn't occurred")
}
}()
policy := NewStaticPolicy(testCase.topo, testCase.numReservedCPUs, topologymanager.NewFakeManager()).(*staticPolicy)
policy := NewStaticPolicy(testCase.topo, testCase.numReservedCPUs, cpuset.NewCPUSet(), topologymanager.NewFakeManager()).(*staticPolicy)
st := &mockState{
assignments: testCase.stAssignments,
defaultCPUSet: testCase.stDefaultCPUSet,
@ -432,7 +432,7 @@ func TestStaticPolicyAdd(t *testing.T) {
}
for _, testCase := range testCases {
policy := NewStaticPolicy(testCase.topo, testCase.numReservedCPUs, topologymanager.NewFakeManager())
policy := NewStaticPolicy(testCase.topo, testCase.numReservedCPUs, cpuset.NewCPUSet(), topologymanager.NewFakeManager())
st := &mockState{
assignments: testCase.stAssignments,
@ -645,7 +645,7 @@ func TestStaticPolicyAddWithInitContainers(t *testing.T) {
}
for _, testCase := range testCases {
policy := NewStaticPolicy(testCase.topo, testCase.numReservedCPUs, topologymanager.NewFakeManager())
policy := NewStaticPolicy(testCase.topo, testCase.numReservedCPUs, cpuset.NewCPUSet(), topologymanager.NewFakeManager())
st := &mockState{
assignments: testCase.stAssignments,
@ -732,7 +732,7 @@ func TestStaticPolicyRemove(t *testing.T) {
}
for _, testCase := range testCases {
policy := NewStaticPolicy(testCase.topo, testCase.numReservedCPUs, topologymanager.NewFakeManager())
policy := NewStaticPolicy(testCase.topo, testCase.numReservedCPUs, cpuset.NewCPUSet(), topologymanager.NewFakeManager())
st := &mockState{
assignments: testCase.stAssignments,
@ -822,7 +822,7 @@ func TestTopologyAwareAllocateCPUs(t *testing.T) {
},
}
for _, tc := range testCases {
policy := NewStaticPolicy(tc.topo, 0, topologymanager.NewFakeManager()).(*staticPolicy)
policy := NewStaticPolicy(tc.topo, 0, cpuset.NewCPUSet(), topologymanager.NewFakeManager()).(*staticPolicy)
st := &mockState{
assignments: tc.stAssignments,
defaultCPUSet: tc.stDefaultCPUSet,
@ -842,3 +842,166 @@ func TestTopologyAwareAllocateCPUs(t *testing.T) {
}
}
}
// the test cases above run without the kubelet --reserved-cpus option
// the tests below run with --reserved-cpus configured
type staticPolicyTestWithResvList struct {
description string
topo *topology.CPUTopology
numReservedCPUs int
reserved cpuset.CPUSet
containerID string
stAssignments state.ContainerCPUAssignments
stDefaultCPUSet cpuset.CPUSet
pod *v1.Pod
expErr error
expCPUAlloc bool
expCSet cpuset.CPUSet
expPanic bool
}
func TestStaticPolicyStartWithResvList(t *testing.T) {
testCases := []staticPolicyTestWithResvList{
{
description: "empty cpuset",
topo: topoDualSocketHT,
numReservedCPUs: 2,
reserved: cpuset.NewCPUSet(0, 1),
stAssignments: state.ContainerCPUAssignments{},
stDefaultCPUSet: cpuset.NewCPUSet(),
expCSet: cpuset.NewCPUSet(0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11),
},
{
description: "reserved cores 0 & 1 are not present in available cpuset",
topo: topoDualSocketHT,
numReservedCPUs: 2,
reserved: cpuset.NewCPUSet(0, 1),
stAssignments: state.ContainerCPUAssignments{},
stDefaultCPUSet: cpuset.NewCPUSet(2, 3, 4, 5),
expPanic: true,
},
{
description: "inconsistency between numReservedCPUs and reserved",
topo: topoDualSocketHT,
numReservedCPUs: 1,
reserved: cpuset.NewCPUSet(0, 1),
stAssignments: state.ContainerCPUAssignments{},
stDefaultCPUSet: cpuset.NewCPUSet(0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11),
expPanic: true,
},
}
for _, testCase := range testCases {
t.Run(testCase.description, func(t *testing.T) {
defer func() {
if err := recover(); err != nil {
if !testCase.expPanic {
t.Errorf("unexpected panic occurred: %q", err)
}
} else if testCase.expPanic {
t.Error("expected panic doesn't occurred")
}
}()
policy := NewStaticPolicy(testCase.topo, testCase.numReservedCPUs, testCase.reserved, topologymanager.NewFakeManager()).(*staticPolicy)
st := &mockState{
assignments: testCase.stAssignments,
defaultCPUSet: testCase.stDefaultCPUSet,
}
policy.Start(st)
if !st.GetDefaultCPUSet().Equals(testCase.expCSet) {
t.Errorf("State CPUSet is different than expected. Have %q wants: %q", st.GetDefaultCPUSet(),
testCase.expCSet)
}
})
}
}
func TestStaticPolicyAddWithResvList(t *testing.T) {
testCases := []staticPolicyTestWithResvList{
{
description: "GuPodSingleCore, SingleSocketHT, ExpectError",
topo: topoSingleSocketHT,
numReservedCPUs: 1,
reserved: cpuset.NewCPUSet(0),
containerID: "fakeID2",
stAssignments: state.ContainerCPUAssignments{},
stDefaultCPUSet: cpuset.NewCPUSet(0, 1, 2, 3, 4, 5, 6, 7),
pod: makePod("8000m", "8000m"),
expErr: fmt.Errorf("not enough cpus available to satisfy request"),
expCPUAlloc: false,
expCSet: cpuset.NewCPUSet(),
},
{
description: "GuPodSingleCore, SingleSocketHT, ExpectAllocOneCPU",
topo: topoSingleSocketHT,
numReservedCPUs: 2,
reserved: cpuset.NewCPUSet(0, 1),
containerID: "fakeID2",
stAssignments: state.ContainerCPUAssignments{},
stDefaultCPUSet: cpuset.NewCPUSet(0, 1, 2, 3, 4, 5, 6, 7),
pod: makePod("1000m", "1000m"),
expErr: nil,
expCPUAlloc: true,
expCSet: cpuset.NewCPUSet(4), // expect sibling of partial core
},
{
description: "GuPodMultipleCores, SingleSocketHT, ExpectAllocOneCore",
topo: topoSingleSocketHT,
numReservedCPUs: 2,
reserved: cpuset.NewCPUSet(0, 1),
containerID: "fakeID3",
stAssignments: state.ContainerCPUAssignments{
"fakeID100": cpuset.NewCPUSet(2, 3, 6, 7),
},
stDefaultCPUSet: cpuset.NewCPUSet(0, 1, 4, 5),
pod: makePod("2000m", "2000m"),
expErr: nil,
expCPUAlloc: true,
expCSet: cpuset.NewCPUSet(4, 5),
},
}
for _, testCase := range testCases {
policy := NewStaticPolicy(testCase.topo, testCase.numReservedCPUs, testCase.reserved, topologymanager.NewFakeManager())
st := &mockState{
assignments: testCase.stAssignments,
defaultCPUSet: testCase.stDefaultCPUSet,
}
container := &testCase.pod.Spec.Containers[0]
err := policy.AddContainer(st, testCase.pod, container, testCase.containerID)
if !reflect.DeepEqual(err, testCase.expErr) {
t.Errorf("StaticPolicy AddContainer() error (%v). expected add error: %v but got: %v",
testCase.description, testCase.expErr, err)
}
if testCase.expCPUAlloc {
cset, found := st.assignments[testCase.containerID]
if !found {
t.Errorf("StaticPolicy AddContainer() error (%v). expected container id %v to be present in assignments %v",
testCase.description, testCase.containerID, st.assignments)
}
if !reflect.DeepEqual(cset, testCase.expCSet) {
t.Errorf("StaticPolicy AddContainer() error (%v). expected cpuset %v but got %v",
testCase.description, testCase.expCSet, cset)
}
if !cset.Intersection(st.defaultCPUSet).IsEmpty() {
t.Errorf("StaticPolicy AddContainer() error (%v). expected cpuset %v to be disoint from the shared cpuset %v",
testCase.description, cset, st.defaultCPUSet)
}
}
if !testCase.expCPUAlloc {
_, found := st.assignments[testCase.containerID]
if found {
t.Errorf("StaticPolicy AddContainer() error (%v). Did not expect container id %v to be present in assignments %v",
testCase.description, testCase.containerID, st.assignments)
}
}
}
}


@ -710,6 +710,10 @@ type KubeletConfiguration struct {
// Default: nil
// +optional
KubeReserved map[string]string `json:"kubeReserved,omitempty"`
// This ReservedSystemCPUs option specifies the cpu list reserved for the host-level system threads and kubernetes-related threads.
// It provides a "static" CPU list rather than the "dynamic" list computed from system-reserved and kube-reserved.
// This option overwrites the CPUs provided by system-reserved and kube-reserved.
ReservedSystemCPUs string `json:"reservedSystemCPUs,omitempty"`
// This flag helps kubelet identify absolute name of top level cgroup used to enforce `SystemReserved` compute resource reservation for OS system daemons.
// Refer to [Node Allocatable](https://git.k8s.io/community/contributors/design-proposals/node/node-allocatable.md) doc for more information.
// Dynamic Kubelet Config (beta): This field should not be updated without a full node
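For completeness, a hedged sketch of how the new field surfaces in a v1beta1 kubelet configuration; the import path and the surrounding values are assumptions for illustration, not part of the change.

package main

import (
	"encoding/json"
	"fmt"

	kubeletconfigv1beta1 "k8s.io/kubelet/config/v1beta1" // assumed import path for the v1beta1 types
)

func main() {
	cfg := kubeletconfigv1beta1.KubeletConfiguration{
		ReservedSystemCPUs: "0-3", // serialized as reservedSystemCPUs
	}
	out, _ := json.Marshal(cfg)
	fmt.Println(string(out)) // output contains "reservedSystemCPUs":"0-3"
}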