From e2ffd007f7794f06b9a2f39d62280bd974d747c6 Mon Sep 17 00:00:00 2001 From: "Tim St. Clair" Date: Fri, 11 Dec 2015 17:51:39 -0800 Subject: [PATCH] Implement node Allocatable Add `kube-reserved` and `system-reserved` flags for configuring resources reserved for usage outside of kubernetes pods. Allocatable is provided by the Kubelet according to the formula: ``` Allocatable = Capacity - KubeReserved - SystemReserved ``` Also provides a method for estimating a reasonable default for `KubeReserved`, but the current implementation's estimate is probably low and needs more tuning. --- cmd/kubelet/app/options/options.go | 6 ++++ cmd/kubelet/app/server.go | 42 +++++++++++++++++++++++++ docs/admin/kubelet.md | 4 ++- hack/verify-flags/known-flags.txt | 2 ++ pkg/kubelet/kubelet.go | 23 ++++++++++++++ pkg/kubelet/kubelet_test.go | 49 ++++++++++++++++++++++++------ pkg/kubelet/types/types.go | 8 +++++ 7 files changed, 123 insertions(+), 11 deletions(-) diff --git a/cmd/kubelet/app/options/options.go b/cmd/kubelet/app/options/options.go index 7ecffc79cc9..e1fe726cf35 100644 --- a/cmd/kubelet/app/options/options.go +++ b/cmd/kubelet/app/options/options.go @@ -108,6 +108,8 @@ type KubeletServer struct { TLSCertFile string TLSPrivateKeyFile string ReconcileCIDR bool + SystemReserved util.ConfigurationMap + KubeReserved util.ConfigurationMap // Flags intended for testing // Is the kubelet containerized? @@ -183,6 +185,8 @@ func NewKubeletServer() *KubeletServer { SyncFrequency: 1 * time.Minute, SystemContainer: "", ReconcileCIDR: true, + SystemReserved: make(util.ConfigurationMap), + KubeReserved: make(util.ConfigurationMap), KubeAPIQPS: 5.0, KubeAPIBurst: 10, ExperimentalFlannelOverlay: experimentalFlannelOverlay, @@ -267,6 +271,8 @@ func (s *KubeletServer) AddFlags(fs *pflag.FlagSet) { fs.BoolVar(&s.Containerized, "containerized", s.Containerized, "Experimental support for running kubelet in a container. Intended for testing. 
[default=false]") fs.Uint64Var(&s.MaxOpenFiles, "max-open-files", s.MaxOpenFiles, "Number of files that can be opened by Kubelet process. [default=1000000]") fs.BoolVar(&s.ReconcileCIDR, "reconcile-cidr", s.ReconcileCIDR, "Reconcile node CIDR with the CIDR specified by the API server. No-op if register-node or configure-cbr0 is false. [default=true]") + fs.Var(&s.SystemReserved, "system-reserved", "A set of ResourceName=ResourceQuantity (e.g. cpu=200m,memory=150G) pairs that describe resources reserved for non-kubernetes components. Currently only cpu and memory are supported. See http://releases.k8s.io/HEAD/docs/user-guide/compute-resources.html for more detail. [default=none]") + fs.Var(&s.KubeReserved, "kube-reserved", "A set of ResourceName=ResourceQuantity (e.g. cpu=200m,memory=150G) pairs that describe resources reserved for kubernetes system components. Currently only cpu and memory are supported. See http://releases.k8s.io/HEAD/docs/user-guide/compute-resources.html for more detail. [default=none]") fs.BoolVar(&s.RegisterSchedulable, "register-schedulable", s.RegisterSchedulable, "Register the node as schedulable. No-op if register-node is false. 
[default=true]") fs.Float32Var(&s.KubeAPIQPS, "kube-api-qps", s.KubeAPIQPS, "QPS to use while talking with kubernetes apiserver") fs.IntVar(&s.KubeAPIBurst, "kube-api-burst", s.KubeAPIBurst, "Burst to use while talking with kubernetes apiserver") diff --git a/cmd/kubelet/app/server.go b/cmd/kubelet/app/server.go index 6a49bce62bc..222708ac1b6 100644 --- a/cmd/kubelet/app/server.go +++ b/cmd/kubelet/app/server.go @@ -35,6 +35,7 @@ import ( "k8s.io/kubernetes/cmd/kubelet/app/options" "k8s.io/kubernetes/pkg/api" + "k8s.io/kubernetes/pkg/api/resource" "k8s.io/kubernetes/pkg/capabilities" "k8s.io/kubernetes/pkg/client/chaosclient" "k8s.io/kubernetes/pkg/client/record" @@ -171,6 +172,11 @@ func UnsecuredKubeletConfig(s *options.KubeletServer) (*KubeletConfig, error) { manifestURLHeader.Set(pieces[0], pieces[1]) } + reservation, err := parseReservation(s.KubeReserved, s.SystemReserved) + if err != nil { + return nil, err + } + return &KubeletConfig{ Address: s.Address, AllowPrivileged: s.AllowPrivileged, @@ -228,6 +234,7 @@ func UnsecuredKubeletConfig(s *options.KubeletServer) (*KubeletConfig, error) { RegistryBurst: s.RegistryBurst, RegistryPullQPS: s.RegistryPullQPS, ResolverConfig: s.ResolverConfig, + Reservation: *reservation, ResourceContainer: s.ResourceContainer, RktPath: s.RktPath, RktStage1Image: s.RktStage1Image, @@ -706,6 +713,7 @@ type KubeletConfig struct { RegisterSchedulable bool RegistryBurst int RegistryPullQPS float64 + Reservation kubetypes.Reservation ResolverConfig string ResourceContainer string RktPath string @@ -808,6 +816,7 @@ func CreateAndInitKubelet(kc *KubeletConfig) (k KubeletBootstrap, pc *config.Pod kc.OutOfDiskTransitionFrequency, kc.ExperimentalFlannelOverlay, kc.NodeIP, + kc.Reservation, ) if err != nil { @@ -820,3 +829,36 @@ func CreateAndInitKubelet(kc *KubeletConfig) (k KubeletBootstrap, pc *config.Pod return k, pc, nil } + +func parseReservation(kubeReserved, systemReserved util.ConfigurationMap) (*kubetypes.Reservation, error) { + 
reservation := new(kubetypes.Reservation) + if rl, err := parseResourceList(kubeReserved); err != nil { + return nil, err + } else { + reservation.Kubernetes = rl + } + if rl, err := parseResourceList(systemReserved); err != nil { + return nil, err + } else { + reservation.System = rl + } + return reservation, nil +} + +func parseResourceList(m util.ConfigurationMap) (api.ResourceList, error) { + rl := make(api.ResourceList) + for k, v := range m { + switch api.ResourceName(k) { + // Only CPU and memory resources are supported. + case api.ResourceCPU, api.ResourceMemory: + q, err := resource.ParseQuantity(v) + if err != nil { + return nil, err + } + rl[api.ResourceName(k)] = *q + default: + return nil, fmt.Errorf("cannot reserve %q resource", k) + } + } + return rl, nil +} diff --git a/docs/admin/kubelet.md b/docs/admin/kubelet.md index a124abed684..f7138091ae2 100644 --- a/docs/admin/kubelet.md +++ b/docs/admin/kubelet.md @@ -100,6 +100,7 @@ kubelet --image-gc-low-threshold=80: The percent of disk usage before which image garbage collection is never run. Lowest disk usage to garbage collect to. Default: 80% --kube-api-burst=10: Burst to use while talking with kubernetes apiserver --kube-api-qps=5: QPS to use while talking with kubernetes apiserver + --kube-reserved=: A set of ResourceName=ResourceQuantity (e.g. cpu=200m,memory=150G) pairs that describe resources reserved for kubernetes system components. Currently only cpu and memory are supported. See http://releases.k8s.io/HEAD/docs/user-guide/compute-resources.html for more detail. [default=none] --kubeconfig="/var/lib/kubelet/kubeconfig": Path to a kubeconfig file, specifying how to authenticate to API server (the master location is set by the api-servers flag). --log-flush-frequency=5s: Maximum number of seconds between log flushes --low-diskspace-threshold-mb=256: The absolute free disk space, in MB, to maintain. When disk space falls below this threshold, new pods would be rejected. 
Default: 256 @@ -138,12 +139,13 @@ kubelet --streaming-connection-idle-timeout=5m0s: Maximum time a streaming connection can be idle before the connection is automatically closed. Example: '5m' --sync-frequency=1m0s: Max period between synchronizing running containers and config --system-container="": Optional resource-only container in which to place all non-kernel processes that are not already in a container. Empty for no container. Rolling back the flag requires a reboot. (Default: ""). + --system-reserved=: A set of ResourceName=ResourceQuantity (e.g. cpu=200m,memory=150G) pairs that describe resources reserved for non-kubernetes components. Currently only cpu and memory are supported. See http://releases.k8s.io/HEAD/docs/user-guide/compute-resources.html for more detail. [default=none] --tls-cert-file="": File containing x509 Certificate for HTTPS. (CA cert, if any, concatenated after server cert). If --tls-cert-file and --tls-private-key-file are not provided, a self-signed certificate and key are generated for the public address and saved to the directory passed to --cert-dir. --tls-private-key-file="": File containing x509 private key matching --tls-cert-file. 
--volume-plugin-dir="/usr/libexec/kubernetes/kubelet-plugins/volume/exec/": The full path of the directory in which to search for additional third party volume plugins ``` -###### Auto generated by spf13/cobra on 11-Jan-2016 +###### Auto generated by spf13/cobra on 12-Jan-2016 diff --git a/hack/verify-flags/known-flags.txt b/hack/verify-flags/known-flags.txt index e79f74d1a6d..82d7f081aab 100644 --- a/hack/verify-flags/known-flags.txt +++ b/hack/verify-flags/known-flags.txt @@ -156,6 +156,7 @@ km-path kube-api-burst kube-api-qps kube-master +kube-reserved kubectl-path kubelet-address kubelet-cadvisor-port @@ -331,6 +332,7 @@ streaming-connection-idle-timeout suicide-timeout sync-frequency system-container +system-reserved target-port tcp-services terminated-pod-gc-threshold diff --git a/pkg/kubelet/kubelet.go b/pkg/kubelet/kubelet.go index 8dce92e8331..713748a91ad 100644 --- a/pkg/kubelet/kubelet.go +++ b/pkg/kubelet/kubelet.go @@ -196,6 +196,7 @@ func NewMainKubelet( outOfDiskTransitionFrequency time.Duration, flannelExperimentalOverlay bool, nodeIP net.IP, + reservation kubetypes.Reservation, ) (*Kubelet, error) { if rootDirectory == "" { return nil, fmt.Errorf("invalid root directory %q", rootDirectory) @@ -313,6 +314,7 @@ func NewMainKubelet( nodeIP: nodeIP, clock: util.RealClock{}, outOfDiskTransitionFrequency: outOfDiskTransitionFrequency, + reservation: reservation, } if klet.flannelExperimentalOverlay { glog.Infof("Flannel is in charge of podCIDR and overlay networking.") @@ -659,6 +661,10 @@ type Kubelet struct { // not-out-of-disk. This prevents a pod that causes out-of-disk condition from repeatedly // getting rescheduled onto the node. outOfDiskTransitionFrequency time.Duration + + // reservation specifies resources which are reserved for non-pod usage, including kubernetes and + // non-kubernetes system processes. 
+ reservation kubetypes.Reservation } // Validate given node IP belongs to the current host @@ -2739,6 +2745,23 @@ func (kl *Kubelet) setNodeStatusMachineInfo(node *api.Node) { } node.Status.NodeInfo.BootID = info.BootID } + + // Set Allocatable. + node.Status.Allocatable = make(api.ResourceList) + for k, v := range node.Status.Capacity { + value := *(v.Copy()) + if kl.reservation.System != nil { + value.Sub(kl.reservation.System[k]) + } + if kl.reservation.Kubernetes != nil { + value.Sub(kl.reservation.Kubernetes[k]) + } + if value.Amount != nil && value.Amount.Sign() < 0 { + // Negative Allocatable resources don't make sense. + value.Set(0) + } + node.Status.Allocatable[k] = value + } } // Set versioninfo for the node. diff --git a/pkg/kubelet/kubelet_test.go b/pkg/kubelet/kubelet_test.go index 34431ed4ee1..4698371ebc1 100644 --- a/pkg/kubelet/kubelet_test.go +++ b/pkg/kubelet/kubelet_test.go @@ -69,6 +69,9 @@ func init() { const testKubeletHostname = "127.0.0.1" +const testReservationCPU = "200m" +const testReservationMemory = "100M" + type fakeHTTP struct { url string err error @@ -172,6 +175,12 @@ func newTestKubelet(t *testing.T) *TestKubelet { kubelet.backOff.Clock = fakeClock kubelet.podKillingCh = make(chan *kubecontainer.Pod, 20) kubelet.resyncInterval = 10 * time.Second + kubelet.reservation = kubetypes.Reservation{ + Kubernetes: api.ResourceList{ + api.ResourceCPU: resource.MustParse(testReservationCPU), + api.ResourceMemory: resource.MustParse(testReservationMemory), + }, + } kubelet.workQueue = queue.NewBasicWorkQueue() // Relist period does not affect the tests. 
kubelet.pleg = pleg.NewGenericPLEG(fakeRuntime, 100, time.Hour) @@ -2521,7 +2530,7 @@ func TestUpdateNewNodeStatus(t *testing.T) { SystemUUID: "abc", BootID: "1b3", NumCores: 2, - MemoryCapacity: 1024, + MemoryCapacity: 10E9, // 10G } mockCadvisor := testKubelet.fakeCadvisor mockCadvisor.On("Start").Return(nil) @@ -2572,7 +2581,12 @@ func TestUpdateNewNodeStatus(t *testing.T) { }, Capacity: api.ResourceList{ api.ResourceCPU: *resource.NewMilliQuantity(2000, resource.DecimalSI), - api.ResourceMemory: *resource.NewQuantity(1024, resource.BinarySI), + api.ResourceMemory: *resource.NewQuantity(10E9, resource.BinarySI), + api.ResourcePods: *resource.NewQuantity(0, resource.DecimalSI), + }, + Allocatable: api.ResourceList{ + api.ResourceCPU: *resource.NewMilliQuantity(1800, resource.DecimalSI), + api.ResourceMemory: *resource.NewQuantity(9900E6, resource.BinarySI), api.ResourcePods: *resource.NewQuantity(0, resource.DecimalSI), }, Addresses: []api.NodeAddress{ @@ -2623,7 +2637,7 @@ func TestUpdateNewNodeStatus(t *testing.T) { t.Errorf("unexpected node condition order. 
NodeReady should be last.") } - if !reflect.DeepEqual(expectedNode, updatedNode) { + if !api.Semantic.DeepEqual(expectedNode, updatedNode) { t.Errorf("unexpected objects: %s", util.ObjectDiff(expectedNode, updatedNode)) } } @@ -2878,7 +2892,12 @@ func TestUpdateExistingNodeStatus(t *testing.T) { }, Capacity: api.ResourceList{ api.ResourceCPU: *resource.NewMilliQuantity(3000, resource.DecimalSI), - api.ResourceMemory: *resource.NewQuantity(2048, resource.BinarySI), + api.ResourceMemory: *resource.NewQuantity(20E9, resource.BinarySI), + api.ResourcePods: *resource.NewQuantity(0, resource.DecimalSI), + }, + Allocatable: api.ResourceList{ + api.ResourceCPU: *resource.NewMilliQuantity(2800, resource.DecimalSI), + api.ResourceMemory: *resource.NewQuantity(19900E6, resource.BinarySI), api.ResourcePods: *resource.NewQuantity(0, resource.DecimalSI), }, }, @@ -2891,7 +2910,7 @@ func TestUpdateExistingNodeStatus(t *testing.T) { SystemUUID: "abc", BootID: "1b3", NumCores: 2, - MemoryCapacity: 1024, + MemoryCapacity: 20E9, } mockCadvisor.On("MachineInfo").Return(machineInfo, nil) versionInfo := &cadvisorapi.VersionInfo{ @@ -2940,7 +2959,12 @@ func TestUpdateExistingNodeStatus(t *testing.T) { }, Capacity: api.ResourceList{ api.ResourceCPU: *resource.NewMilliQuantity(2000, resource.DecimalSI), - api.ResourceMemory: *resource.NewQuantity(1024, resource.BinarySI), + api.ResourceMemory: *resource.NewQuantity(20E9, resource.BinarySI), + api.ResourcePods: *resource.NewQuantity(0, resource.DecimalSI), + }, + Allocatable: api.ResourceList{ + api.ResourceCPU: *resource.NewMilliQuantity(1800, resource.DecimalSI), + api.ResourceMemory: *resource.NewQuantity(19900E6, resource.BinarySI), api.ResourcePods: *resource.NewQuantity(0, resource.DecimalSI), }, Addresses: []api.NodeAddress{ @@ -2993,7 +3017,7 @@ func TestUpdateExistingNodeStatus(t *testing.T) { t.Errorf("unexpected node condition order. 
NodeReady should be last.") } - if !reflect.DeepEqual(expectedNode, updatedNode) { + if !api.Semantic.DeepEqual(expectedNode, updatedNode) { t.Errorf("expected \n%v\n, got \n%v", expectedNode, updatedNode) } } @@ -3169,7 +3193,7 @@ func TestUpdateNodeStatusWithoutContainerRuntime(t *testing.T) { SystemUUID: "abc", BootID: "1b3", NumCores: 2, - MemoryCapacity: 1024, + MemoryCapacity: 10E9, } mockCadvisor.On("MachineInfo").Return(machineInfo, nil) versionInfo := &cadvisorapi.VersionInfo{ @@ -3218,7 +3242,12 @@ func TestUpdateNodeStatusWithoutContainerRuntime(t *testing.T) { }, Capacity: api.ResourceList{ api.ResourceCPU: *resource.NewMilliQuantity(2000, resource.DecimalSI), - api.ResourceMemory: *resource.NewQuantity(1024, resource.BinarySI), + api.ResourceMemory: *resource.NewQuantity(10E9, resource.BinarySI), + api.ResourcePods: *resource.NewQuantity(0, resource.DecimalSI), + }, + Allocatable: api.ResourceList{ + api.ResourceCPU: *resource.NewMilliQuantity(1800, resource.DecimalSI), + api.ResourceMemory: *resource.NewQuantity(9900E6, resource.BinarySI), api.ResourcePods: *resource.NewQuantity(0, resource.DecimalSI), }, Addresses: []api.NodeAddress{ @@ -3270,7 +3299,7 @@ func TestUpdateNodeStatusWithoutContainerRuntime(t *testing.T) { t.Errorf("unexpected node condition order. NodeReady should be last.") } - if !reflect.DeepEqual(expectedNode, updatedNode) { + if !api.Semantic.DeepEqual(expectedNode, updatedNode) { t.Errorf("unexpected objects: %s", util.ObjectDiff(expectedNode, updatedNode)) } } diff --git a/pkg/kubelet/types/types.go b/pkg/kubelet/types/types.go index 0e7ca3bf2dd..7776ee9e366 100644 --- a/pkg/kubelet/types/types.go +++ b/pkg/kubelet/types/types.go @@ -67,3 +67,11 @@ func (s SortedContainerStatuses) Swap(i, j int) { s[i], s[j] = s[j], s[i] } func (s SortedContainerStatuses) Less(i, j int) bool { return s[i].Name < s[j].Name } + +// Reservation represents reserved resources for non-pod components. 
+type Reservation struct { + // System holds the resources reserved for non-kubernetes components; it is populated from the system-reserved flag. + System api.ResourceList + // Kubernetes holds the resources reserved for kubernetes system components; it is populated from the kube-reserved flag. + Kubernetes api.ResourceList +}