Implement node Allocatable

Add `kube-reserved` and `system-reserved` flags for configuration
reserved resources for usage outside of kubernetes pods. Allocatable is
provided by the Kubelet according to the formula:
```
Allocatable = Capacity - KubeReserved - SystemReserved
```

Also provides a method for estimating a reasonable default for
`KubeReserved`, but the current implementation probably is low and needs
more tuning.
This commit is contained in:
Tim St. Clair 2015-12-11 17:51:39 -08:00
parent 8f401fb0d5
commit e2ffd007f7
7 changed files with 123 additions and 11 deletions

View File

@ -108,6 +108,8 @@ type KubeletServer struct {
TLSCertFile string
TLSPrivateKeyFile string
ReconcileCIDR bool
SystemReserved util.ConfigurationMap
KubeReserved util.ConfigurationMap
// Flags intended for testing
// Is the kubelet containerized?
@ -183,6 +185,8 @@ func NewKubeletServer() *KubeletServer {
SyncFrequency: 1 * time.Minute,
SystemContainer: "",
ReconcileCIDR: true,
SystemReserved: make(util.ConfigurationMap),
KubeReserved: make(util.ConfigurationMap),
KubeAPIQPS: 5.0,
KubeAPIBurst: 10,
ExperimentalFlannelOverlay: experimentalFlannelOverlay,
@ -267,6 +271,8 @@ func (s *KubeletServer) AddFlags(fs *pflag.FlagSet) {
fs.BoolVar(&s.Containerized, "containerized", s.Containerized, "Experimental support for running kubelet in a container. Intended for testing. [default=false]")
fs.Uint64Var(&s.MaxOpenFiles, "max-open-files", s.MaxOpenFiles, "Number of files that can be opened by Kubelet process. [default=1000000]")
fs.BoolVar(&s.ReconcileCIDR, "reconcile-cidr", s.ReconcileCIDR, "Reconcile node CIDR with the CIDR specified by the API server. No-op if register-node or configure-cbr0 is false. [default=true]")
fs.Var(&s.SystemReserved, "system-reserved", "A set of ResourceName=ResourceQuantity (e.g. cpu=200m,memory=150G) pairs that describe resources reserved for non-kubernetes components. Currently only cpu and memory are supported. See http://releases.k8s.io/HEAD/docs/user-guide/compute-resources.html for more detail. [default=none]")
fs.Var(&s.KubeReserved, "kube-reserved", "A set of ResourceName=ResourceQuantity (e.g. cpu=200m,memory=150G) pairs that describe resources reserved for kubernetes system components. Currently only cpu and memory are supported. See http://releases.k8s.io/HEAD/docs/user-guide/compute-resources.html for more detail. [default=none]")
fs.BoolVar(&s.RegisterSchedulable, "register-schedulable", s.RegisterSchedulable, "Register the node as schedulable. No-op if register-node is false. [default=true]")
fs.Float32Var(&s.KubeAPIQPS, "kube-api-qps", s.KubeAPIQPS, "QPS to use while talking with kubernetes apiserver")
fs.IntVar(&s.KubeAPIBurst, "kube-api-burst", s.KubeAPIBurst, "Burst to use while talking with kubernetes apiserver")

View File

@ -35,6 +35,7 @@ import (
"k8s.io/kubernetes/cmd/kubelet/app/options"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/api/resource"
"k8s.io/kubernetes/pkg/capabilities"
"k8s.io/kubernetes/pkg/client/chaosclient"
"k8s.io/kubernetes/pkg/client/record"
@ -171,6 +172,11 @@ func UnsecuredKubeletConfig(s *options.KubeletServer) (*KubeletConfig, error) {
manifestURLHeader.Set(pieces[0], pieces[1])
}
reservation, err := parseReservation(s.KubeReserved, s.SystemReserved)
if err != nil {
return nil, err
}
return &KubeletConfig{
Address: s.Address,
AllowPrivileged: s.AllowPrivileged,
@ -228,6 +234,7 @@ func UnsecuredKubeletConfig(s *options.KubeletServer) (*KubeletConfig, error) {
RegistryBurst: s.RegistryBurst,
RegistryPullQPS: s.RegistryPullQPS,
ResolverConfig: s.ResolverConfig,
Reservation: *reservation,
ResourceContainer: s.ResourceContainer,
RktPath: s.RktPath,
RktStage1Image: s.RktStage1Image,
@ -706,6 +713,7 @@ type KubeletConfig struct {
RegisterSchedulable bool
RegistryBurst int
RegistryPullQPS float64
Reservation kubetypes.Reservation
ResolverConfig string
ResourceContainer string
RktPath string
@ -808,6 +816,7 @@ func CreateAndInitKubelet(kc *KubeletConfig) (k KubeletBootstrap, pc *config.Pod
kc.OutOfDiskTransitionFrequency,
kc.ExperimentalFlannelOverlay,
kc.NodeIP,
kc.Reservation,
)
if err != nil {
@ -820,3 +829,36 @@ func CreateAndInitKubelet(kc *KubeletConfig) (k KubeletBootstrap, pc *config.Pod
return k, pc, nil
}
func parseReservation(kubeReserved, systemReserved util.ConfigurationMap) (*kubetypes.Reservation, error) {
reservation := new(kubetypes.Reservation)
if rl, err := parseResourceList(kubeReserved); err != nil {
return nil, err
} else {
reservation.Kubernetes = rl
}
if rl, err := parseResourceList(systemReserved); err != nil {
return nil, err
} else {
reservation.System = rl
}
return reservation, nil
}
func parseResourceList(m util.ConfigurationMap) (api.ResourceList, error) {
rl := make(api.ResourceList)
for k, v := range m {
switch api.ResourceName(k) {
// Only CPU and memory resources are supported.
case api.ResourceCPU, api.ResourceMemory:
q, err := resource.ParseQuantity(v)
if err != nil {
return nil, err
}
rl[api.ResourceName(k)] = *q
default:
return nil, fmt.Errorf("cannot reserve %q resource", k)
}
}
return rl, nil
}

View File

@ -100,6 +100,7 @@ kubelet
--image-gc-low-threshold=80: The percent of disk usage before which image garbage collection is never run. Lowest disk usage to garbage collect to. Default: 80%
--kube-api-burst=10: Burst to use while talking with kubernetes apiserver
--kube-api-qps=5: QPS to use while talking with kubernetes apiserver
--kube-reserved=: A set of ResourceName=ResourceQuantity (e.g. cpu=200m,memory=150G) pairs that describe resources reserved for kubernetes system components. Currently only cpu and memory are supported. See http://releases.k8s.io/HEAD/docs/user-guide/compute-resources.html for more detail. [default=none]
--kubeconfig="/var/lib/kubelet/kubeconfig": Path to a kubeconfig file, specifying how to authenticate to API server (the master location is set by the api-servers flag).
--log-flush-frequency=5s: Maximum number of seconds between log flushes
--low-diskspace-threshold-mb=256: The absolute free disk space, in MB, to maintain. When disk space falls below this threshold, new pods would be rejected. Default: 256
@ -138,12 +139,13 @@ kubelet
--streaming-connection-idle-timeout=5m0s: Maximum time a streaming connection can be idle before the connection is automatically closed. Example: '5m'
--sync-frequency=1m0s: Max period between synchronizing running containers and config
--system-container="": Optional resource-only container in which to place all non-kernel processes that are not already in a container. Empty for no container. Rolling back the flag requires a reboot. (Default: "").
--system-reserved=: A set of ResourceName=ResourceQuantity (e.g. cpu=200m,memory=150G) pairs that describe resources reserved for non-kubernetes components. Currently only cpu and memory are supported. See http://releases.k8s.io/HEAD/docs/user-guide/compute-resources.html for more detail. [default=none]
--tls-cert-file="": File containing x509 Certificate for HTTPS. (CA cert, if any, concatenated after server cert). If --tls-cert-file and --tls-private-key-file are not provided, a self-signed certificate and key are generated for the public address and saved to the directory passed to --cert-dir.
--tls-private-key-file="": File containing x509 private key matching --tls-cert-file.
--volume-plugin-dir="/usr/libexec/kubernetes/kubelet-plugins/volume/exec/": <Warning: Alpha feature> The full path of the directory in which to search for additional third party volume plugins
```
###### Auto generated by spf13/cobra on 11-Jan-2016
###### Auto generated by spf13/cobra on 12-Jan-2016
<!-- BEGIN MUNGE: GENERATED_ANALYTICS -->

View File

@ -156,6 +156,7 @@ km-path
kube-api-burst
kube-api-qps
kube-master
kube-reserved
kubectl-path
kubelet-address
kubelet-cadvisor-port
@ -331,6 +332,7 @@ streaming-connection-idle-timeout
suicide-timeout
sync-frequency
system-container
system-reserved
target-port
tcp-services
terminated-pod-gc-threshold

View File

@ -196,6 +196,7 @@ func NewMainKubelet(
outOfDiskTransitionFrequency time.Duration,
flannelExperimentalOverlay bool,
nodeIP net.IP,
reservation kubetypes.Reservation,
) (*Kubelet, error) {
if rootDirectory == "" {
return nil, fmt.Errorf("invalid root directory %q", rootDirectory)
@ -313,6 +314,7 @@ func NewMainKubelet(
nodeIP: nodeIP,
clock: util.RealClock{},
outOfDiskTransitionFrequency: outOfDiskTransitionFrequency,
reservation: reservation,
}
if klet.flannelExperimentalOverlay {
glog.Infof("Flannel is in charge of podCIDR and overlay networking.")
@ -659,6 +661,10 @@ type Kubelet struct {
// not-out-of-disk. This prevents a pod that causes out-of-disk condition from repeatedly
// getting rescheduled onto the node.
outOfDiskTransitionFrequency time.Duration
// reservation specifies resources which are reserved for non-pod usage, including kubernetes and
// non-kubernetes system processes.
reservation kubetypes.Reservation
}
// Validate given node IP belongs to the current host
@ -2739,6 +2745,23 @@ func (kl *Kubelet) setNodeStatusMachineInfo(node *api.Node) {
}
node.Status.NodeInfo.BootID = info.BootID
}
// Set Allocatable.
node.Status.Allocatable = make(api.ResourceList)
for k, v := range node.Status.Capacity {
value := *(v.Copy())
if kl.reservation.System != nil {
value.Sub(kl.reservation.System[k])
}
if kl.reservation.Kubernetes != nil {
value.Sub(kl.reservation.Kubernetes[k])
}
if value.Amount != nil && value.Amount.Sign() < 0 {
// Negative Allocatable resources don't make sense.
value.Set(0)
}
node.Status.Allocatable[k] = value
}
}
// Set versioninfo for the node.

View File

@ -69,6 +69,9 @@ func init() {
const testKubeletHostname = "127.0.0.1"
const testReservationCPU = "200m"
const testReservationMemory = "100M"
type fakeHTTP struct {
url string
err error
@ -172,6 +175,12 @@ func newTestKubelet(t *testing.T) *TestKubelet {
kubelet.backOff.Clock = fakeClock
kubelet.podKillingCh = make(chan *kubecontainer.Pod, 20)
kubelet.resyncInterval = 10 * time.Second
kubelet.reservation = kubetypes.Reservation{
Kubernetes: api.ResourceList{
api.ResourceCPU: resource.MustParse(testReservationCPU),
api.ResourceMemory: resource.MustParse(testReservationMemory),
},
}
kubelet.workQueue = queue.NewBasicWorkQueue()
// Relist period does not affect the tests.
kubelet.pleg = pleg.NewGenericPLEG(fakeRuntime, 100, time.Hour)
@ -2521,7 +2530,7 @@ func TestUpdateNewNodeStatus(t *testing.T) {
SystemUUID: "abc",
BootID: "1b3",
NumCores: 2,
MemoryCapacity: 1024,
MemoryCapacity: 10E9, // 10G
}
mockCadvisor := testKubelet.fakeCadvisor
mockCadvisor.On("Start").Return(nil)
@ -2572,7 +2581,12 @@ func TestUpdateNewNodeStatus(t *testing.T) {
},
Capacity: api.ResourceList{
api.ResourceCPU: *resource.NewMilliQuantity(2000, resource.DecimalSI),
api.ResourceMemory: *resource.NewQuantity(1024, resource.BinarySI),
api.ResourceMemory: *resource.NewQuantity(10E9, resource.BinarySI),
api.ResourcePods: *resource.NewQuantity(0, resource.DecimalSI),
},
Allocatable: api.ResourceList{
api.ResourceCPU: *resource.NewMilliQuantity(1800, resource.DecimalSI),
api.ResourceMemory: *resource.NewQuantity(9900E6, resource.BinarySI),
api.ResourcePods: *resource.NewQuantity(0, resource.DecimalSI),
},
Addresses: []api.NodeAddress{
@ -2623,7 +2637,7 @@ func TestUpdateNewNodeStatus(t *testing.T) {
t.Errorf("unexpected node condition order. NodeReady should be last.")
}
if !reflect.DeepEqual(expectedNode, updatedNode) {
if !api.Semantic.DeepEqual(expectedNode, updatedNode) {
t.Errorf("unexpected objects: %s", util.ObjectDiff(expectedNode, updatedNode))
}
}
@ -2878,7 +2892,12 @@ func TestUpdateExistingNodeStatus(t *testing.T) {
},
Capacity: api.ResourceList{
api.ResourceCPU: *resource.NewMilliQuantity(3000, resource.DecimalSI),
api.ResourceMemory: *resource.NewQuantity(2048, resource.BinarySI),
api.ResourceMemory: *resource.NewQuantity(20E9, resource.BinarySI),
api.ResourcePods: *resource.NewQuantity(0, resource.DecimalSI),
},
Allocatable: api.ResourceList{
api.ResourceCPU: *resource.NewMilliQuantity(2800, resource.DecimalSI),
api.ResourceMemory: *resource.NewQuantity(19900E6, resource.BinarySI),
api.ResourcePods: *resource.NewQuantity(0, resource.DecimalSI),
},
},
@ -2891,7 +2910,7 @@ func TestUpdateExistingNodeStatus(t *testing.T) {
SystemUUID: "abc",
BootID: "1b3",
NumCores: 2,
MemoryCapacity: 1024,
MemoryCapacity: 20E9,
}
mockCadvisor.On("MachineInfo").Return(machineInfo, nil)
versionInfo := &cadvisorapi.VersionInfo{
@ -2940,7 +2959,12 @@ func TestUpdateExistingNodeStatus(t *testing.T) {
},
Capacity: api.ResourceList{
api.ResourceCPU: *resource.NewMilliQuantity(2000, resource.DecimalSI),
api.ResourceMemory: *resource.NewQuantity(1024, resource.BinarySI),
api.ResourceMemory: *resource.NewQuantity(20E9, resource.BinarySI),
api.ResourcePods: *resource.NewQuantity(0, resource.DecimalSI),
},
Allocatable: api.ResourceList{
api.ResourceCPU: *resource.NewMilliQuantity(1800, resource.DecimalSI),
api.ResourceMemory: *resource.NewQuantity(19900E6, resource.BinarySI),
api.ResourcePods: *resource.NewQuantity(0, resource.DecimalSI),
},
Addresses: []api.NodeAddress{
@ -2993,7 +3017,7 @@ func TestUpdateExistingNodeStatus(t *testing.T) {
t.Errorf("unexpected node condition order. NodeReady should be last.")
}
if !reflect.DeepEqual(expectedNode, updatedNode) {
if !api.Semantic.DeepEqual(expectedNode, updatedNode) {
t.Errorf("expected \n%v\n, got \n%v", expectedNode, updatedNode)
}
}
@ -3169,7 +3193,7 @@ func TestUpdateNodeStatusWithoutContainerRuntime(t *testing.T) {
SystemUUID: "abc",
BootID: "1b3",
NumCores: 2,
MemoryCapacity: 1024,
MemoryCapacity: 10E9,
}
mockCadvisor.On("MachineInfo").Return(machineInfo, nil)
versionInfo := &cadvisorapi.VersionInfo{
@ -3218,7 +3242,12 @@ func TestUpdateNodeStatusWithoutContainerRuntime(t *testing.T) {
},
Capacity: api.ResourceList{
api.ResourceCPU: *resource.NewMilliQuantity(2000, resource.DecimalSI),
api.ResourceMemory: *resource.NewQuantity(1024, resource.BinarySI),
api.ResourceMemory: *resource.NewQuantity(10E9, resource.BinarySI),
api.ResourcePods: *resource.NewQuantity(0, resource.DecimalSI),
},
Allocatable: api.ResourceList{
api.ResourceCPU: *resource.NewMilliQuantity(1800, resource.DecimalSI),
api.ResourceMemory: *resource.NewQuantity(9900E6, resource.BinarySI),
api.ResourcePods: *resource.NewQuantity(0, resource.DecimalSI),
},
Addresses: []api.NodeAddress{
@ -3270,7 +3299,7 @@ func TestUpdateNodeStatusWithoutContainerRuntime(t *testing.T) {
t.Errorf("unexpected node condition order. NodeReady should be last.")
}
if !reflect.DeepEqual(expectedNode, updatedNode) {
if !api.Semantic.DeepEqual(expectedNode, updatedNode) {
t.Errorf("unexpected objects: %s", util.ObjectDiff(expectedNode, updatedNode))
}
}

View File

@ -67,3 +67,11 @@ func (s SortedContainerStatuses) Swap(i, j int) { s[i], s[j] = s[j], s[i] }
func (s SortedContainerStatuses) Less(i, j int) bool {
return s[i].Name < s[j].Name
}
// Reservation represents reserved resources for non-pod components.
type Reservation struct {
// System represents resources reserved for non-kubernetes components.
System api.ResourceList
// Kubernetes represents resources reserved for kubernetes system components.
Kubernetes api.ResourceList
}