diff --git a/cmd/kubelet/app/options/options.go b/cmd/kubelet/app/options/options.go index b5dc81df4af..d5e412a618f 100644 --- a/cmd/kubelet/app/options/options.go +++ b/cmd/kubelet/app/options/options.go @@ -291,7 +291,7 @@ func (c *kubeletConfiguration) addFlags(fs *pflag.FlagSet) { // Node Allocatable Flags fs.Var(&c.SystemReserved, "system-reserved", "A set of ResourceName=ResourceQuantity (e.g. cpu=200m,memory=500Mi) pairs that describe resources reserved for non-kubernetes components. Currently only cpu and memory are supported. See http://kubernetes.io/docs/user-guide/compute-resources for more detail. [default=none]") - fs.Var(&c.KubeReserved, "kube-reserved", "A set of ResourceName=ResourceQuantity (e.g. cpu=200m,memory=500Mi) pairs that describe resources reserved for kubernetes system components. Currently only cpu and memory are supported. See http://kubernetes.io/docs/user-guide/compute-resources for more detail. [default=none]") + fs.Var(&c.KubeReserved, "kube-reserved", "A set of ResourceName=ResourceQuantity (e.g. cpu=200m,memory=500Mi, storage=1Gi) pairs that describe resources reserved for kubernetes system components. Currently cpu, memory and local storage for root file system are supported. See http://kubernetes.io/docs/user-guide/compute-resources for more detail. [default=none]") fs.StringSliceVar(&c.EnforceNodeAllocatable, "enforce-node-allocatable", c.EnforceNodeAllocatable, "A comma separated list of levels of node allocatable enforcement to be enforced by kubelet. Acceptible options are 'pods', 'system-reserved' & 'kube-reserved'. If the latter two options are specified, '--system-reserved-cgroup' & '--kube-reserved-cgroup' must also be set respectively. See https://github.com/kubernetes/community/blob/master/contributors/design-proposals/node-allocatable.md for more details.") fs.StringVar(&c.SystemReservedCgroup, "system-reserved-cgroup", c.SystemReservedCgroup, "Absolute name of the top level cgroup that is used to manage non-kubernetes components for which compute resources were reserved via '--system-reserved' flag. Ex. '/system-reserved'. [default='']") fs.StringVar(&c.KubeReservedCgroup, "kube-reserved-cgroup", c.KubeReservedCgroup, "Absolute name of the top level cgroup that is used to manage kubernetes components for which compute resources were reserved via '--kube-reserved' flag. Ex. '/kube-reserved'. [default='']") diff --git a/cmd/kubelet/app/server.go b/cmd/kubelet/app/server.go index eb44ed968c8..00d06d737ff 100644 --- a/cmd/kubelet/app/server.go +++ b/cmd/kubelet/app/server.go @@ -1025,8 +1025,8 @@ func parseResourceList(m componentconfig.ConfigurationMap) (v1.ResourceList, err rl := make(v1.ResourceList) for k, v := range m { switch v1.ResourceName(k) { - // Only CPU and memory resources are supported. - case v1.ResourceCPU, v1.ResourceMemory: + // CPU, memory and local storage resources are supported. + case v1.ResourceCPU, v1.ResourceMemory, v1.ResourceStorage: q, err := resource.ParseQuantity(v) if err != nil { return nil, err diff --git a/pkg/apis/componentconfig/v1alpha1/types.go b/pkg/apis/componentconfig/v1alpha1/types.go index 2b010a2932b..ff700bbf4dd 100644 --- a/pkg/apis/componentconfig/v1alpha1/types.go +++ b/pkg/apis/componentconfig/v1alpha1/types.go @@ -551,7 +551,7 @@ type KubeletConfiguration struct { SystemReserved map[string]string `json:"systemReserved"` // A set of ResourceName=ResourceQuantity (e.g. cpu=200m,memory=150G) pairs // that describe resources reserved for kubernetes system components. 
-	// Currently only cpu and memory are supported. [default=none]
+	// Currently cpu, memory and local storage for the root file system are supported. [default=none]
 	// See http://kubernetes.io/docs/user-guide/compute-resources for more detail.
 	KubeReserved map[string]string `json:"kubeReserved"`
 
diff --git a/pkg/kubelet/cadvisor/util.go b/pkg/kubelet/cadvisor/util.go
index c2ac721b4eb..4b8a67340c7 100644
--- a/pkg/kubelet/cadvisor/util.go
+++ b/pkg/kubelet/cadvisor/util.go
@@ -18,6 +18,7 @@ package cadvisor
 
 import (
 	cadvisorapi "github.com/google/cadvisor/info/v1"
+	cadvisorapi2 "github.com/google/cadvisor/info/v2"
 	"k8s.io/apimachinery/pkg/api/resource"
 	"k8s.io/kubernetes/pkg/api/v1"
 )
@@ -33,3 +34,21 @@ func CapacityFromMachineInfo(info *cadvisorapi.MachineInfo) v1.ResourceList {
 	}
 	return c
 }
+
+func StorageScratchCapacityFromFsInfo(info cadvisorapi2.FsInfo) v1.ResourceList {
+	c := v1.ResourceList{
+		v1.ResourceStorage: *resource.NewQuantity(
+			int64(info.Capacity),
+			resource.BinarySI),
+	}
+	return c
+}
+
+func StorageOverlayCapacityFromFsInfo(info cadvisorapi2.FsInfo) v1.ResourceList {
+	c := v1.ResourceList{
+		v1.ResourceStorageOverlay: *resource.NewQuantity(
+			int64(info.Capacity),
+			resource.BinarySI),
+	}
+	return c
+}
diff --git a/pkg/kubelet/cm/node_container_manager.go b/pkg/kubelet/cm/node_container_manager.go
index f0ffb12879f..e9391076903 100644
--- a/pkg/kubelet/cm/node_container_manager.go
+++ b/pkg/kubelet/cm/node_container_manager.go
@@ -29,6 +29,7 @@ import (
 	"k8s.io/apimachinery/pkg/types"
 	clientv1 "k8s.io/client-go/pkg/api/v1"
 	"k8s.io/kubernetes/pkg/api/v1"
+	"k8s.io/kubernetes/pkg/kubelet/cadvisor"
 	"k8s.io/kubernetes/pkg/kubelet/events"
 	evictionapi "k8s.io/kubernetes/pkg/kubelet/eviction/api"
 )
@@ -180,9 +181,20 @@ func (cm *containerManagerImpl) getNodeAllocatableAbsolute() v1.ResourceList {
 
 }
 
-// GetNodeAllocatable returns amount of compute resource that have to be reserved on this node from scheduling.
+// GetNodeAllocatableReservation returns the amount of compute or storage resources that have to be reserved on this node from scheduling.
 func (cm *containerManagerImpl) GetNodeAllocatableReservation() v1.ResourceList {
 	evictionReservation := hardEvictionReservation(cm.HardEvictionThresholds, cm.capacity)
+	if _, ok := cm.capacity[v1.ResourceStorage]; !ok {
+		if cm.cadvisorInterface != nil {
+			if rootfs, err := cm.cadvisorInterface.RootFsInfo(); err == nil {
+				for rName, rCap := range cadvisor.StorageScratchCapacityFromFsInfo(rootfs) {
+					cm.capacity[rName] = rCap
+				}
+			} else {
+				glog.Warningf("Error getting rootfs info: %v", err)
+			}
+		}
+	}
 	result := make(v1.ResourceList)
 	for k := range cm.capacity {
 		value := resource.NewQuantity(0, resource.DecimalSI)
diff --git a/pkg/kubelet/eviction/api/types.go b/pkg/kubelet/eviction/api/types.go
index 2928688a120..9e40ac62460 100644
--- a/pkg/kubelet/eviction/api/types.go
+++ b/pkg/kubelet/eviction/api/types.go
@@ -38,6 +38,8 @@ const (
 	SignalImageFsInodesFree Signal = "imagefs.inodesFree"
 	// SignalAllocatableMemoryAvailable is amount of memory available for pod allocation (i.e. allocatable - workingSet (of pods), in bytes.
 	SignalAllocatableMemoryAvailable Signal = "allocatableMemory.available"
+	// SignalAllocatableNodeFsAvailable is the amount of local storage available for pod allocation
+	SignalAllocatableNodeFsAvailable Signal = "allocatableNodeFs.available"
 )
 
 // ThresholdOperator is the operator used to express a Threshold.
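Aside on the two cadvisor helpers added above: they only wrap an FsInfo capacity in a v1.ResourceList keyed by v1.ResourceStorage or v1.ResourceStorageOverlay. The sketch below shows how a caller is expected to combine them; the nodeStorageCapacity wrapper and the sketch package are illustrative only and not part of this patch (the real call sites are GetNodeAllocatableReservation above and setNodeStatusMachineInfo later in the diff).

package sketch // illustrative only, not part of this patch

import (
	cadvisorapiv2 "github.com/google/cadvisor/info/v2"

	"k8s.io/kubernetes/pkg/api/v1"
	"k8s.io/kubernetes/pkg/kubelet/cadvisor"
)

// nodeStorageCapacity is a hypothetical helper: rootfs capacity is reported as
// v1.ResourceStorage ("scratch" space) and, when a dedicated image filesystem
// exists, its capacity is reported as v1.ResourceStorageOverlay.
func nodeStorageCapacity(rootfs, imagefs cadvisorapiv2.FsInfo, dedicatedImageFs bool) v1.ResourceList {
	capacity := v1.ResourceList{}
	for name, quantity := range cadvisor.StorageScratchCapacityFromFsInfo(rootfs) {
		capacity[name] = quantity
	}
	if dedicatedImageFs {
		for name, quantity := range cadvisor.StorageOverlayCapacityFromFsInfo(imagefs) {
			capacity[name] = quantity
		}
	}
	return capacity
}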
diff --git a/pkg/kubelet/eviction/eviction_manager.go b/pkg/kubelet/eviction/eviction_manager.go index 6431a4f23d5..09db40d0075 100644 --- a/pkg/kubelet/eviction/eviction_manager.go +++ b/pkg/kubelet/eviction/eviction_manager.go @@ -82,6 +82,8 @@ type managerImpl struct { lastObservations signalObservations // notifiersInitialized indicates if the threshold notifiers have been initialized (i.e. synchronize() has been called once) notifiersInitialized bool + // dedicatedImageFs indicates if imagefs is on a separate device from the rootfs + dedicatedImageFs *bool } // ensure it implements the required interface @@ -106,6 +108,7 @@ func NewManager( nodeRef: nodeRef, nodeConditionsLastObservedAt: nodeConditionsObservedAt{}, thresholdsFirstObservedAt: thresholdsObservedAt{}, + dedicatedImageFs: nil, } return manager, manager } @@ -211,21 +214,22 @@ func (m *managerImpl) synchronize(diskInfoProvider DiskInfoProvider, podFunc Act } glog.V(3).Infof("eviction manager: synchronize housekeeping") - // build the ranking functions (if not yet known) // TODO: have a function in cadvisor that lets us know if global housekeeping has completed - if len(m.resourceToRankFunc) == 0 || len(m.resourceToNodeReclaimFuncs) == 0 { - // this may error if cadvisor has yet to complete housekeeping, so we will just try again in next pass. - hasDedicatedImageFs, err := diskInfoProvider.HasDedicatedImageFs() - if err != nil { + if m.dedicatedImageFs == nil { + hasImageFs, ok := diskInfoProvider.HasDedicatedImageFs() + if ok != nil { return nil } - m.resourceToRankFunc = buildResourceToRankFunc(hasDedicatedImageFs) - m.resourceToNodeReclaimFuncs = buildResourceToNodeReclaimFuncs(m.imageGC, hasDedicatedImageFs) + m.dedicatedImageFs = &hasImageFs + m.resourceToRankFunc = buildResourceToRankFunc(hasImageFs) + m.resourceToNodeReclaimFuncs = buildResourceToNodeReclaimFuncs(m.imageGC, hasImageFs) + } + activePods := podFunc() // make observations and get a function to derive pod usage stats relative to those observations. - observations, statsFunc, err := makeSignalObservations(m.summaryProvider, nodeProvider) + observations, statsFunc, err := makeSignalObservations(m.summaryProvider, nodeProvider, activePods, *m.dedicatedImageFs) if err != nil { glog.Errorf("eviction manager: unexpected err: %v", err) return nil @@ -336,7 +340,11 @@ func (m *managerImpl) synchronize(diskInfoProvider DiskInfoProvider, podFunc Act } // the only candidates viable for eviction are those pods that had anything running. - activePods := podFunc() + if len(activePods) == 0 { + glog.Errorf("eviction manager: eviction thresholds have been met, but no pods are active to evict") + return nil + } + // rank the running pods for eviction for the specified resource rank(activePods, statsFunc) diff --git a/pkg/kubelet/eviction/helpers.go b/pkg/kubelet/eviction/helpers.go index 518853dd767..583097f37bc 100644 --- a/pkg/kubelet/eviction/helpers.go +++ b/pkg/kubelet/eviction/helpers.go @@ -54,6 +54,8 @@ const ( resourceNodeFs v1.ResourceName = "nodefs" // nodefs inodes, number. internal to this module, used to account for local node root filesystem inodes. resourceNodeFsInodes v1.ResourceName = "nodefsInodes" + // container overlay storage, in bytes. internal to this module, used to account for local disk usage for container overlay. 
+ resourceOverlay v1.ResourceName = "overlay" ) var ( @@ -74,19 +76,25 @@ func init() { signalToNodeCondition[evictionapi.SignalNodeFsAvailable] = v1.NodeDiskPressure signalToNodeCondition[evictionapi.SignalImageFsInodesFree] = v1.NodeDiskPressure signalToNodeCondition[evictionapi.SignalNodeFsInodesFree] = v1.NodeDiskPressure + signalToNodeCondition[evictionapi.SignalAllocatableNodeFsAvailable] = v1.NodeDiskPressure // map signals to resources (and vice-versa) signalToResource = map[evictionapi.Signal]v1.ResourceName{} signalToResource[evictionapi.SignalMemoryAvailable] = v1.ResourceMemory signalToResource[evictionapi.SignalAllocatableMemoryAvailable] = v1.ResourceMemory + signalToResource[evictionapi.SignalAllocatableNodeFsAvailable] = resourceNodeFs signalToResource[evictionapi.SignalImageFsAvailable] = resourceImageFs signalToResource[evictionapi.SignalImageFsInodesFree] = resourceImageFsInodes signalToResource[evictionapi.SignalNodeFsAvailable] = resourceNodeFs signalToResource[evictionapi.SignalNodeFsInodesFree] = resourceNodeFsInodes + resourceToSignal = map[v1.ResourceName]evictionapi.Signal{} for key, value := range signalToResource { resourceToSignal[value] = key } + // Hard-code here to make sure resourceNodeFs maps to evictionapi.SignalNodeFsAvailable + // (TODO) resourceToSignal is a map from resource name to a list of signals + resourceToSignal[resourceNodeFs] = evictionapi.SignalNodeFsAvailable } // validSignal returns true if the signal is supported. @@ -234,6 +242,16 @@ func getAllocatableThreshold(allocatableConfig []string) []evictionapi.Threshold Quantity: resource.NewQuantity(int64(0), resource.BinarySI), }, }, + { + Signal: evictionapi.SignalAllocatableNodeFsAvailable, + Operator: evictionapi.OpLessThan, + Value: evictionapi.ThresholdValue{ + Quantity: resource.NewQuantity(int64(0), resource.BinarySI), + }, + MinReclaim: &evictionapi.ThresholdValue{ + Quantity: resource.NewQuantity(int64(0), resource.BinarySI), + }, + }, } } } @@ -382,10 +400,12 @@ func localVolumeNames(pod *v1.Pod) []string { func podDiskUsage(podStats statsapi.PodStats, pod *v1.Pod, statsToMeasure []fsStatsType) (v1.ResourceList, error) { disk := resource.Quantity{Format: resource.BinarySI} inodes := resource.Quantity{Format: resource.BinarySI} + overlay := resource.Quantity{Format: resource.BinarySI} for _, container := range podStats.Containers { if hasFsStatsType(statsToMeasure, fsStatsRoot) { disk.Add(*diskUsage(container.Rootfs)) inodes.Add(*inodeUsage(container.Rootfs)) + overlay.Add(*diskUsage(container.Rootfs)) } if hasFsStatsType(statsToMeasure, fsStatsLogs) { disk.Add(*diskUsage(container.Logs)) @@ -405,8 +425,9 @@ func podDiskUsage(podStats statsapi.PodStats, pod *v1.Pod, statsToMeasure []fsSt } } return v1.ResourceList{ - resourceDisk: disk, - resourceInodes: inodes, + resourceDisk: disk, + resourceInodes: inodes, + resourceOverlay: overlay, }, nil } @@ -637,7 +658,7 @@ func (a byEvictionPriority) Less(i, j int) bool { } // makeSignalObservations derives observations using the specified summary provider. 
-func makeSignalObservations(summaryProvider stats.SummaryProvider, nodeProvider NodeProvider) (signalObservations, statsFunc, error) {
+func makeSignalObservations(summaryProvider stats.SummaryProvider, nodeProvider NodeProvider, pods []*v1.Pod, withImageFs bool) (signalObservations, statsFunc, error) {
 	summary, err := summaryProvider.Get()
 	if err != nil {
 		return nil, nil, err
 	}
@@ -706,6 +727,37 @@ func makeSignalObservations(summaryProvider stats.SummaryProvider, nodeProvider
 			capacity:  memoryAllocatableCapacity.Copy(),
 		}
 	}
+
+	if storageScratchAllocatableCapacity, ok := node.Status.Allocatable[v1.ResourceStorage]; ok {
+		storageScratchAllocatable := storageScratchAllocatableCapacity.Copy()
+		for _, pod := range pods {
+			podStat, ok := statsFunc(pod)
+			if !ok {
+				continue
+			}
+
+			usage, err := podDiskUsage(podStat, pod, []fsStatsType{fsStatsLogs, fsStatsLocalVolumeSource, fsStatsRoot})
+			if err != nil {
+				glog.Warningf("eviction manager: error getting pod disk usage %v", err)
+				continue
+			}
+			// If there is a separate imagefs set up for the container runtime, the scratch disk usage from nodefs should exclude the overlay usage.
+			if withImageFs {
+				diskUsage := usage[resourceDisk]
+				diskUsageCopy := diskUsage.Copy()
+				diskUsageCopy.Sub(usage[resourceOverlay])
+				storageScratchAllocatable.Sub(*diskUsageCopy)
+			} else {
+				storageScratchAllocatable.Sub(usage[resourceDisk])
+			}
+		}
+		result[evictionapi.SignalAllocatableNodeFsAvailable] = signalObservation{
+			available: storageScratchAllocatable,
+			capacity:  storageScratchAllocatableCapacity.Copy(),
+		}
+	}
+
 	return result, statsFunc, nil
 }
diff --git a/pkg/kubelet/eviction/helpers_test.go b/pkg/kubelet/eviction/helpers_test.go
index 5772a41ab16..fda1d010e1e 100644
--- a/pkg/kubelet/eviction/helpers_test.go
+++ b/pkg/kubelet/eviction/helpers_test.go
@@ -76,6 +76,16 @@ func TestParseThresholdConfig(t *testing.T) {
 					Quantity: quantityMustParse("0"),
 				},
 			},
+			{
+				Signal:   evictionapi.SignalAllocatableNodeFsAvailable,
+				Operator: evictionapi.OpLessThan,
+				Value: evictionapi.ThresholdValue{
+					Quantity: quantityMustParse("0"),
+				},
+				MinReclaim: &evictionapi.ThresholdValue{
+					Quantity: quantityMustParse("0"),
+				},
+			},
 			{
 				Signal:   evictionapi.SignalMemoryAvailable,
 				Operator: evictionapi.OpLessThan,
@@ -777,8 +787,7 @@ func TestMakeSignalObservations(t *testing.T) {
 	if res.CmpInt64(int64(allocatableMemoryCapacity)) != 0 {
 		t.Errorf("Expected Threshold %v to be equal to value %v", res.Value(), allocatableMemoryCapacity)
 	}
-	actualObservations, statsFunc, err := makeSignalObservations(provider, nodeProvider)
-
+	actualObservations, statsFunc, err := makeSignalObservations(provider, nodeProvider, pods, false)
 	if err != nil {
 		t.Errorf("Unexpected err: %v", err)
 	}
diff --git a/pkg/kubelet/kubelet.go b/pkg/kubelet/kubelet.go
index 12143e46d4b..c3a84681185 100644
--- a/pkg/kubelet/kubelet.go
+++ b/pkg/kubelet/kubelet.go
@@ -37,6 +37,7 @@ import (
 	clientgoclientset "k8s.io/client-go/kubernetes"
 
 	cadvisorapi "github.com/google/cadvisor/info/v1"
+	cadvisorapiv2 "github.com/google/cadvisor/info/v2"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/apimachinery/pkg/fields"
 	"k8s.io/apimachinery/pkg/labels"
@@ -927,6 +928,9 @@ type Kubelet struct {
 	// Cached MachineInfo returned by cadvisor.
 	machineInfo *cadvisorapi.MachineInfo
 
+	// Cached RootFsInfo returned by cadvisor.
+	rootfsInfo *cadvisorapiv2.FsInfo
+
 	// Handles certificate rotations.
serverCertificateManager certificate.Manager diff --git a/pkg/kubelet/kubelet_cadvisor.go b/pkg/kubelet/kubelet_cadvisor.go index 04d928129c2..dcb3de9728e 100644 --- a/pkg/kubelet/kubelet_cadvisor.go +++ b/pkg/kubelet/kubelet_cadvisor.go @@ -100,3 +100,15 @@ func (kl *Kubelet) GetCachedMachineInfo() (*cadvisorapi.MachineInfo, error) { } return kl.machineInfo, nil } + +// GetCachedRootFsInfo assumes that the rootfs info can't change without a reboot +func (kl *Kubelet) GetCachedRootFsInfo() (cadvisorapiv2.FsInfo, error) { + if kl.rootfsInfo == nil { + info, err := kl.cadvisor.RootFsInfo() + if err != nil { + return cadvisorapiv2.FsInfo{}, err + } + kl.rootfsInfo = &info + } + return *kl.rootfsInfo, nil +} diff --git a/pkg/kubelet/kubelet_node_status.go b/pkg/kubelet/kubelet_node_status.go index c9504c899a2..eb871220931 100644 --- a/pkg/kubelet/kubelet_node_status.go +++ b/pkg/kubelet/kubelet_node_status.go @@ -553,6 +553,26 @@ func (kl *Kubelet) setNodeStatusMachineInfo(node *v1.Node) { node.Status.NodeInfo.BootID = info.BootID } + rootfs, err := kl.GetCachedRootFsInfo() + if err != nil { + node.Status.Capacity[v1.ResourceStorage] = resource.MustParse("0Gi") + } else { + for rName, rCap := range cadvisor.StorageScratchCapacityFromFsInfo(rootfs) { + node.Status.Capacity[rName] = rCap + } + } + + if hasDedicatedImageFs, _ := kl.HasDedicatedImageFs(); hasDedicatedImageFs { + imagesfs, err := kl.ImagesFsInfo() + if err != nil { + node.Status.Capacity[v1.ResourceStorageOverlay] = resource.MustParse("0Gi") + } else { + for rName, rCap := range cadvisor.StorageOverlayCapacityFromFsInfo(imagesfs) { + node.Status.Capacity[rName] = rCap + } + } + } + // Set Allocatable. if node.Status.Allocatable == nil { node.Status.Allocatable = make(v1.ResourceList) diff --git a/pkg/kubelet/kubelet_node_status_test.go b/pkg/kubelet/kubelet_node_status_test.go index 96161b5de59..74382a57f7f 100644 --- a/pkg/kubelet/kubelet_node_status_test.go +++ b/pkg/kubelet/kubelet_node_status_test.go @@ -208,14 +208,16 @@ func TestUpdateNewNodeStatus(t *testing.T) { KubeProxyVersion: version.Get().String(), }, Capacity: v1.ResourceList{ - v1.ResourceCPU: *resource.NewMilliQuantity(2000, resource.DecimalSI), - v1.ResourceMemory: *resource.NewQuantity(10E9, resource.BinarySI), - v1.ResourcePods: *resource.NewQuantity(0, resource.DecimalSI), + v1.ResourceCPU: *resource.NewMilliQuantity(2000, resource.DecimalSI), + v1.ResourceMemory: *resource.NewQuantity(10E9, resource.BinarySI), + v1.ResourceStorage: *resource.NewQuantity(500*mb, resource.BinarySI), + v1.ResourcePods: *resource.NewQuantity(0, resource.DecimalSI), }, Allocatable: v1.ResourceList{ - v1.ResourceCPU: *resource.NewMilliQuantity(1800, resource.DecimalSI), - v1.ResourceMemory: *resource.NewQuantity(9900E6, resource.BinarySI), - v1.ResourcePods: *resource.NewQuantity(0, resource.DecimalSI), + v1.ResourceCPU: *resource.NewMilliQuantity(1800, resource.DecimalSI), + v1.ResourceMemory: *resource.NewQuantity(9900E6, resource.BinarySI), + v1.ResourceStorage: *resource.NewQuantity(500*mb, resource.BinarySI), + v1.ResourcePods: *resource.NewQuantity(0, resource.DecimalSI), }, Addresses: []v1.NodeAddress{ {Type: v1.NodeInternalIP, Address: "127.0.0.1"}, @@ -361,14 +363,16 @@ func TestUpdateExistingNodeStatus(t *testing.T) { }, }, Capacity: v1.ResourceList{ - v1.ResourceCPU: *resource.NewMilliQuantity(3000, resource.DecimalSI), - v1.ResourceMemory: *resource.NewQuantity(20E9, resource.BinarySI), - v1.ResourcePods: *resource.NewQuantity(0, resource.DecimalSI), + v1.ResourceCPU: 
*resource.NewMilliQuantity(3000, resource.DecimalSI), + v1.ResourceMemory: *resource.NewQuantity(20E9, resource.BinarySI), + v1.ResourceStorage: *resource.NewQuantity(500*mb, resource.BinarySI), + v1.ResourcePods: *resource.NewQuantity(0, resource.DecimalSI), }, Allocatable: v1.ResourceList{ - v1.ResourceCPU: *resource.NewMilliQuantity(2800, resource.DecimalSI), - v1.ResourceMemory: *resource.NewQuantity(19900E6, resource.BinarySI), - v1.ResourcePods: *resource.NewQuantity(0, resource.DecimalSI), + v1.ResourceCPU: *resource.NewMilliQuantity(2800, resource.DecimalSI), + v1.ResourceMemory: *resource.NewQuantity(19900E6, resource.BinarySI), + v1.ResourceStorage: *resource.NewQuantity(500*mb, resource.BinarySI), + v1.ResourcePods: *resource.NewQuantity(0, resource.DecimalSI), }, }, } @@ -444,14 +448,16 @@ func TestUpdateExistingNodeStatus(t *testing.T) { KubeProxyVersion: version.Get().String(), }, Capacity: v1.ResourceList{ - v1.ResourceCPU: *resource.NewMilliQuantity(2000, resource.DecimalSI), - v1.ResourceMemory: *resource.NewQuantity(20E9, resource.BinarySI), - v1.ResourcePods: *resource.NewQuantity(0, resource.DecimalSI), + v1.ResourceCPU: *resource.NewMilliQuantity(2000, resource.DecimalSI), + v1.ResourceMemory: *resource.NewQuantity(20E9, resource.BinarySI), + v1.ResourceStorage: *resource.NewQuantity(500*mb, resource.BinarySI), + v1.ResourcePods: *resource.NewQuantity(0, resource.DecimalSI), }, Allocatable: v1.ResourceList{ - v1.ResourceCPU: *resource.NewMilliQuantity(1800, resource.DecimalSI), - v1.ResourceMemory: *resource.NewQuantity(19900E6, resource.BinarySI), - v1.ResourcePods: *resource.NewQuantity(0, resource.DecimalSI), + v1.ResourceCPU: *resource.NewMilliQuantity(1800, resource.DecimalSI), + v1.ResourceMemory: *resource.NewQuantity(19900E6, resource.BinarySI), + v1.ResourceStorage: *resource.NewQuantity(500*mb, resource.BinarySI), + v1.ResourcePods: *resource.NewQuantity(0, resource.DecimalSI), }, Addresses: []v1.NodeAddress{ {Type: v1.NodeInternalIP, Address: "127.0.0.1"}, @@ -655,8 +661,9 @@ func TestUpdateNodeStatusWithRuntimeStateError(t *testing.T) { kubelet.containerManager = &localCM{ ContainerManager: cm.NewStubContainerManager(), allocatable: v1.ResourceList{ - v1.ResourceCPU: *resource.NewMilliQuantity(200, resource.DecimalSI), - v1.ResourceMemory: *resource.NewQuantity(100E6, resource.BinarySI), + v1.ResourceCPU: *resource.NewMilliQuantity(200, resource.DecimalSI), + v1.ResourceMemory: *resource.NewQuantity(100E6, resource.BinarySI), + v1.ResourceStorage: *resource.NewQuantity(200*mb, resource.BinarySI), }, } @@ -727,14 +734,16 @@ func TestUpdateNodeStatusWithRuntimeStateError(t *testing.T) { KubeProxyVersion: version.Get().String(), }, Capacity: v1.ResourceList{ - v1.ResourceCPU: *resource.NewMilliQuantity(2000, resource.DecimalSI), - v1.ResourceMemory: *resource.NewQuantity(10E9, resource.BinarySI), - v1.ResourcePods: *resource.NewQuantity(0, resource.DecimalSI), + v1.ResourceCPU: *resource.NewMilliQuantity(2000, resource.DecimalSI), + v1.ResourceMemory: *resource.NewQuantity(10E9, resource.BinarySI), + v1.ResourceStorage: *resource.NewQuantity(500*mb, resource.BinarySI), + v1.ResourcePods: *resource.NewQuantity(0, resource.DecimalSI), }, Allocatable: v1.ResourceList{ - v1.ResourceCPU: *resource.NewMilliQuantity(1800, resource.DecimalSI), - v1.ResourceMemory: *resource.NewQuantity(9900E6, resource.BinarySI), - v1.ResourcePods: *resource.NewQuantity(0, resource.DecimalSI), + v1.ResourceCPU: *resource.NewMilliQuantity(1800, resource.DecimalSI), + v1.ResourceMemory: 
*resource.NewQuantity(9900E6, resource.BinarySI), + v1.ResourceStorage: *resource.NewQuantity(300*mb, resource.BinarySI), + v1.ResourcePods: *resource.NewQuantity(0, resource.DecimalSI), }, Addresses: []v1.NodeAddress{ {Type: v1.NodeInternalIP, Address: "127.0.0.1"}, @@ -1141,14 +1150,16 @@ func TestUpdateNewNodeStatusTooLargeReservation(t *testing.T) { Spec: v1.NodeSpec{}, Status: v1.NodeStatus{ Capacity: v1.ResourceList{ - v1.ResourceCPU: *resource.NewMilliQuantity(2000, resource.DecimalSI), - v1.ResourceMemory: *resource.NewQuantity(10E9, resource.BinarySI), - v1.ResourcePods: *resource.NewQuantity(0, resource.DecimalSI), + v1.ResourceCPU: *resource.NewMilliQuantity(2000, resource.DecimalSI), + v1.ResourceMemory: *resource.NewQuantity(10E9, resource.BinarySI), + v1.ResourceStorage: *resource.NewQuantity(500*mb, resource.BinarySI), + v1.ResourcePods: *resource.NewQuantity(0, resource.DecimalSI), }, Allocatable: v1.ResourceList{ - v1.ResourceCPU: *resource.NewMilliQuantity(0, resource.DecimalSI), - v1.ResourceMemory: *resource.NewQuantity(10E9, resource.BinarySI), - v1.ResourcePods: *resource.NewQuantity(0, resource.DecimalSI), + v1.ResourceCPU: *resource.NewMilliQuantity(0, resource.DecimalSI), + v1.ResourceMemory: *resource.NewQuantity(10E9, resource.BinarySI), + v1.ResourceStorage: *resource.NewQuantity(500*mb, resource.BinarySI), + v1.ResourcePods: *resource.NewQuantity(0, resource.DecimalSI), }, }, } diff --git a/plugin/pkg/scheduler/algorithm/predicates/predicates.go b/plugin/pkg/scheduler/algorithm/predicates/predicates.go index ca0af726517..f3c6116b6d7 100644 --- a/plugin/pkg/scheduler/algorithm/predicates/predicates.go +++ b/plugin/pkg/scheduler/algorithm/predicates/predicates.go @@ -510,6 +510,8 @@ func GetResourceRequest(pod *v1.Pod) *schedulercache.Resource { result.MilliCPU += rQuantity.MilliValue() case v1.ResourceNvidiaGPU: result.NvidiaGPU += rQuantity.Value() + case v1.ResourceStorageOverlay: + result.StorageOverlay += rQuantity.Value() default: if v1helper.IsOpaqueIntResourceName(rName) { result.AddOpaque(rName, rQuantity.Value()) @@ -517,6 +519,15 @@ func GetResourceRequest(pod *v1.Pod) *schedulercache.Resource { } } } + // Account for storage requested by emptydir volumes + // If the storage medium is memory, should exclude the size + for _, vol := range pod.Spec.Volumes { + if vol.EmptyDir != nil && vol.EmptyDir.Medium != v1.StorageMediumMemory { + + result.StorageScratch += vol.EmptyDir.SizeLimit.Value() + } + } + // take max_resource(sum_pod, any_init_container) for _, container := range pod.Spec.InitContainers { for rName, rQuantity := range container.Resources.Requests { @@ -533,6 +544,10 @@ func GetResourceRequest(pod *v1.Pod) *schedulercache.Resource { if gpu := rQuantity.Value(); gpu > result.NvidiaGPU { result.NvidiaGPU = gpu } + case v1.ResourceStorageOverlay: + if overlay := rQuantity.Value(); overlay > result.StorageOverlay { + result.StorageOverlay = overlay + } default: if v1helper.IsOpaqueIntResourceName(rName) { value := rQuantity.Value() @@ -583,6 +598,23 @@ func PodFitsResources(pod *v1.Pod, meta interface{}, nodeInfo *schedulercache.No if allocatable.NvidiaGPU < podRequest.NvidiaGPU+nodeInfo.RequestedResource().NvidiaGPU { predicateFails = append(predicateFails, NewInsufficientResourceError(v1.ResourceNvidiaGPU, podRequest.NvidiaGPU, nodeInfo.RequestedResource().NvidiaGPU, allocatable.NvidiaGPU)) } + + scratchSpaceRequest := podRequest.StorageScratch + if allocatable.StorageOverlay == 0 { + scratchSpaceRequest += podRequest.StorageOverlay + 
//scratchSpaceRequest += nodeInfo.RequestedResource().StorageOverlay + nodeScratchRequest := nodeInfo.RequestedResource().StorageOverlay + nodeInfo.RequestedResource().StorageScratch + if allocatable.StorageScratch < scratchSpaceRequest+nodeScratchRequest { + predicateFails = append(predicateFails, NewInsufficientResourceError(v1.ResourceStorageScratch, scratchSpaceRequest, nodeScratchRequest, allocatable.StorageScratch)) + } + + } else if allocatable.StorageScratch < scratchSpaceRequest+nodeInfo.RequestedResource().StorageScratch { + predicateFails = append(predicateFails, NewInsufficientResourceError(v1.ResourceStorageScratch, scratchSpaceRequest, nodeInfo.RequestedResource().StorageScratch, allocatable.StorageScratch)) + } + if allocatable.StorageOverlay > 0 && allocatable.StorageOverlay < podRequest.StorageOverlay+nodeInfo.RequestedResource().StorageOverlay { + predicateFails = append(predicateFails, NewInsufficientResourceError(v1.ResourceStorageOverlay, podRequest.StorageOverlay, nodeInfo.RequestedResource().StorageOverlay, allocatable.StorageOverlay)) + } + for rName, rQuant := range podRequest.OpaqueIntResources { if allocatable.OpaqueIntResources[rName] < rQuant+nodeInfo.RequestedResource().OpaqueIntResources[rName] { predicateFails = append(predicateFails, NewInsufficientResourceError(rName, podRequest.OpaqueIntResources[rName], nodeInfo.RequestedResource().OpaqueIntResources[rName], allocatable.OpaqueIntResources[rName])) diff --git a/plugin/pkg/scheduler/algorithm/predicates/predicates_test.go b/plugin/pkg/scheduler/algorithm/predicates/predicates_test.go index ff61f2d9c9b..c8495052fb3 100644 --- a/plugin/pkg/scheduler/algorithm/predicates/predicates_test.go +++ b/plugin/pkg/scheduler/algorithm/predicates/predicates_test.go @@ -78,7 +78,7 @@ var ( opaqueResourceB = v1helper.OpaqueIntResourceName("BBB") ) -func makeResources(milliCPU, memory, nvidiaGPUs, pods, opaqueA int64) v1.NodeResources { +func makeResources(milliCPU, memory, nvidiaGPUs, pods, opaqueA, storage int64) v1.NodeResources { return v1.NodeResources{ Capacity: v1.ResourceList{ v1.ResourceCPU: *resource.NewMilliQuantity(milliCPU, resource.DecimalSI), @@ -86,17 +86,19 @@ func makeResources(milliCPU, memory, nvidiaGPUs, pods, opaqueA int64) v1.NodeRes v1.ResourcePods: *resource.NewQuantity(pods, resource.DecimalSI), v1.ResourceNvidiaGPU: *resource.NewQuantity(nvidiaGPUs, resource.DecimalSI), opaqueResourceA: *resource.NewQuantity(opaqueA, resource.DecimalSI), + v1.ResourceStorage: *resource.NewQuantity(storage, resource.BinarySI), }, } } -func makeAllocatableResources(milliCPU, memory, nvidiaGPUs, pods, opaqueA int64) v1.ResourceList { +func makeAllocatableResources(milliCPU, memory, nvidiaGPUs, pods, opaqueA, storage int64) v1.ResourceList { return v1.ResourceList{ v1.ResourceCPU: *resource.NewMilliQuantity(milliCPU, resource.DecimalSI), v1.ResourceMemory: *resource.NewQuantity(memory, resource.BinarySI), v1.ResourcePods: *resource.NewQuantity(pods, resource.DecimalSI), v1.ResourceNvidiaGPU: *resource.NewQuantity(nvidiaGPUs, resource.DecimalSI), opaqueResourceA: *resource.NewQuantity(opaqueA, resource.DecimalSI), + v1.ResourceStorage: *resource.NewQuantity(storage, resource.BinarySI), } } @@ -114,6 +116,25 @@ func newResourcePod(usage ...schedulercache.Resource) *v1.Pod { } } +func addStorageLimit(pod *v1.Pod, sizeLimit int64, medium v1.StorageMedium) *v1.Pod { + return &v1.Pod{ + Spec: v1.PodSpec{ + Containers: pod.Spec.Containers, + Volumes: []v1.Volume{ + { + Name: "emptyDirVolumeName", + VolumeSource: 
v1.VolumeSource{ + EmptyDir: &v1.EmptyDirVolumeSource{ + SizeLimit: *resource.NewQuantity(sizeLimit, resource.BinarySI), + Medium: medium, + }, + }, + }, + }, + }, + } +} + func newResourceInitPod(pod *v1.Pod, usage ...schedulercache.Resource) *v1.Pod { pod.Spec.InitContainers = newResourcePod(usage...).Spec.Containers return pod @@ -331,7 +352,7 @@ func TestPodFitsResources(t *testing.T) { } for _, test := range enoughPodsTests { - node := v1.Node{Status: v1.NodeStatus{Capacity: makeResources(10, 20, 0, 32, 5).Capacity, Allocatable: makeAllocatableResources(10, 20, 0, 32, 5)}} + node := v1.Node{Status: v1.NodeStatus{Capacity: makeResources(10, 20, 0, 32, 5, 20).Capacity, Allocatable: makeAllocatableResources(10, 20, 0, 32, 5, 20)}} test.nodeInfo.SetNode(&node) fits, reasons, err := PodFitsResources(test.pod, PredicateMetadata(test.pod, nil), test.nodeInfo) if err != nil { @@ -386,7 +407,7 @@ func TestPodFitsResources(t *testing.T) { }, } for _, test := range notEnoughPodsTests { - node := v1.Node{Status: v1.NodeStatus{Capacity: v1.ResourceList{}, Allocatable: makeAllocatableResources(10, 20, 0, 1, 0)}} + node := v1.Node{Status: v1.NodeStatus{Capacity: v1.ResourceList{}, Allocatable: makeAllocatableResources(10, 20, 0, 1, 0, 0)}} test.nodeInfo.SetNode(&node) fits, reasons, err := PodFitsResources(test.pod, PredicateMetadata(test.pod, nil), test.nodeInfo) if err != nil { @@ -399,6 +420,86 @@ func TestPodFitsResources(t *testing.T) { t.Errorf("%s: expected: %v got %v", test.test, test.fits, fits) } } + + storagePodsTests := []struct { + pod *v1.Pod + emptyDirLimit int64 + storageMedium v1.StorageMedium + nodeInfo *schedulercache.NodeInfo + fits bool + test string + reasons []algorithm.PredicateFailureReason + }{ + { + pod: newResourcePod(schedulercache.Resource{MilliCPU: 1, Memory: 1, StorageOverlay: 1}), + nodeInfo: schedulercache.NewNodeInfo( + newResourcePod(schedulercache.Resource{MilliCPU: 10, Memory: 10, StorageOverlay: 20})), + fits: false, + test: "due to init container scratch disk", + reasons: []algorithm.PredicateFailureReason{ + NewInsufficientResourceError(v1.ResourceCPU, 1, 10, 10), + NewInsufficientResourceError(v1.ResourceStorageScratch, 1, 20, 20), + }, + }, + { + pod: newResourcePod(schedulercache.Resource{MilliCPU: 1, Memory: 1, StorageOverlay: 10}), + nodeInfo: schedulercache.NewNodeInfo( + newResourcePod(schedulercache.Resource{MilliCPU: 2, Memory: 10})), + fits: true, + test: "pod fit", + }, + { + pod: newResourcePod(schedulercache.Resource{MilliCPU: 1, Memory: 1, StorageOverlay: 18}), + nodeInfo: schedulercache.NewNodeInfo( + newResourcePod(schedulercache.Resource{MilliCPU: 2, Memory: 2, StorageOverlay: 5})), + fits: false, + test: "request exceeds allocatable", + reasons: []algorithm.PredicateFailureReason{ + NewInsufficientResourceError(v1.ResourceStorageScratch, 18, 5, 20), + }, + }, + { + pod: newResourcePod(schedulercache.Resource{MilliCPU: 1, Memory: 1, StorageOverlay: 10}), + emptyDirLimit: 15, + storageMedium: v1.StorageMediumDefault, + nodeInfo: schedulercache.NewNodeInfo( + newResourcePod(schedulercache.Resource{MilliCPU: 2, Memory: 2, StorageOverlay: 5})), + fits: false, + test: "storage scratchrequest exceeds allocatable", + reasons: []algorithm.PredicateFailureReason{ + NewInsufficientResourceError(v1.ResourceStorageScratch, 25, 5, 20), + }, + }, + { + pod: newResourcePod(schedulercache.Resource{MilliCPU: 1, Memory: 1, StorageOverlay: 10}), + emptyDirLimit: 15, + storageMedium: v1.StorageMediumMemory, + nodeInfo: schedulercache.NewNodeInfo( + 
newResourcePod(schedulercache.Resource{MilliCPU: 2, Memory: 2, StorageOverlay: 5})), + fits: true, + test: "storage scratchrequest exceeds allocatable", + reasons: []algorithm.PredicateFailureReason{ + NewInsufficientResourceError(v1.ResourceStorageScratch, 25, 5, 20), + }, + }, + } + + for _, test := range storagePodsTests { + node := v1.Node{Status: v1.NodeStatus{Capacity: makeResources(10, 20, 0, 32, 5, 20).Capacity, Allocatable: makeAllocatableResources(10, 20, 0, 32, 5, 20)}} + test.nodeInfo.SetNode(&node) + pod := addStorageLimit(test.pod, test.emptyDirLimit, test.storageMedium) + fits, reasons, err := PodFitsResources(pod, PredicateMetadata(pod, nil), test.nodeInfo) + if err != nil { + t.Errorf("%s: unexpected error: %v", test.test, err) + } + if !fits && !reflect.DeepEqual(reasons, test.reasons) { + t.Errorf("%s: unexpected failure reasons: %v, want: %v", test.test, reasons, test.reasons) + } + if fits != test.fits { + t.Errorf("%s: expected: %v got %v", test.test, test.fits, fits) + } + } + } func TestPodFitsHost(t *testing.T) { @@ -1845,7 +1946,7 @@ func TestRunGeneralPredicates(t *testing.T) { newResourcePod(schedulercache.Resource{MilliCPU: 9, Memory: 19})), node: &v1.Node{ ObjectMeta: metav1.ObjectMeta{Name: "machine1"}, - Status: v1.NodeStatus{Capacity: makeResources(10, 20, 0, 32, 0).Capacity, Allocatable: makeAllocatableResources(10, 20, 0, 32, 0)}, + Status: v1.NodeStatus{Capacity: makeResources(10, 20, 0, 32, 0, 0).Capacity, Allocatable: makeAllocatableResources(10, 20, 0, 32, 0, 0)}, }, fits: true, wErr: nil, @@ -1857,7 +1958,7 @@ func TestRunGeneralPredicates(t *testing.T) { newResourcePod(schedulercache.Resource{MilliCPU: 5, Memory: 19})), node: &v1.Node{ ObjectMeta: metav1.ObjectMeta{Name: "machine1"}, - Status: v1.NodeStatus{Capacity: makeResources(10, 20, 0, 32, 0).Capacity, Allocatable: makeAllocatableResources(10, 20, 0, 32, 0)}, + Status: v1.NodeStatus{Capacity: makeResources(10, 20, 0, 32, 0, 0).Capacity, Allocatable: makeAllocatableResources(10, 20, 0, 32, 0, 0)}, }, fits: false, wErr: nil, @@ -1871,7 +1972,7 @@ func TestRunGeneralPredicates(t *testing.T) { pod: &v1.Pod{}, nodeInfo: schedulercache.NewNodeInfo( newResourcePod(schedulercache.Resource{MilliCPU: 9, Memory: 19})), - node: &v1.Node{Status: v1.NodeStatus{Capacity: makeResources(10, 20, 1, 32, 0).Capacity, Allocatable: makeAllocatableResources(10, 20, 1, 32, 0)}}, + node: &v1.Node{Status: v1.NodeStatus{Capacity: makeResources(10, 20, 1, 32, 0, 0).Capacity, Allocatable: makeAllocatableResources(10, 20, 1, 32, 0, 0)}}, fits: true, wErr: nil, test: "no resources/port/host requested always fits on GPU machine", @@ -1880,7 +1981,7 @@ func TestRunGeneralPredicates(t *testing.T) { pod: newResourcePod(schedulercache.Resource{MilliCPU: 3, Memory: 1, NvidiaGPU: 1}), nodeInfo: schedulercache.NewNodeInfo( newResourcePod(schedulercache.Resource{MilliCPU: 5, Memory: 10, NvidiaGPU: 1})), - node: &v1.Node{Status: v1.NodeStatus{Capacity: makeResources(10, 20, 1, 32, 0).Capacity, Allocatable: makeAllocatableResources(10, 20, 1, 32, 0)}}, + node: &v1.Node{Status: v1.NodeStatus{Capacity: makeResources(10, 20, 1, 32, 0, 0).Capacity, Allocatable: makeAllocatableResources(10, 20, 1, 32, 0, 0)}}, fits: false, wErr: nil, reasons: []algorithm.PredicateFailureReason{NewInsufficientResourceError(v1.ResourceNvidiaGPU, 1, 1, 1)}, @@ -1890,7 +1991,7 @@ func TestRunGeneralPredicates(t *testing.T) { pod: newResourcePod(schedulercache.Resource{MilliCPU: 3, Memory: 1, NvidiaGPU: 1}), nodeInfo: schedulercache.NewNodeInfo( 
newResourcePod(schedulercache.Resource{MilliCPU: 5, Memory: 10, NvidiaGPU: 0})), - node: &v1.Node{Status: v1.NodeStatus{Capacity: makeResources(10, 20, 1, 32, 0).Capacity, Allocatable: makeAllocatableResources(10, 20, 1, 32, 0)}}, + node: &v1.Node{Status: v1.NodeStatus{Capacity: makeResources(10, 20, 1, 32, 0, 0).Capacity, Allocatable: makeAllocatableResources(10, 20, 1, 32, 0, 0)}}, fits: true, wErr: nil, test: "enough GPU resource", @@ -1904,7 +2005,7 @@ func TestRunGeneralPredicates(t *testing.T) { nodeInfo: schedulercache.NewNodeInfo(), node: &v1.Node{ ObjectMeta: metav1.ObjectMeta{Name: "machine1"}, - Status: v1.NodeStatus{Capacity: makeResources(10, 20, 0, 32, 0).Capacity, Allocatable: makeAllocatableResources(10, 20, 0, 32, 0)}, + Status: v1.NodeStatus{Capacity: makeResources(10, 20, 0, 32, 0, 0).Capacity, Allocatable: makeAllocatableResources(10, 20, 0, 32, 0, 0)}, }, fits: false, wErr: nil, @@ -1916,7 +2017,7 @@ func TestRunGeneralPredicates(t *testing.T) { nodeInfo: schedulercache.NewNodeInfo(newPodWithPort(123)), node: &v1.Node{ ObjectMeta: metav1.ObjectMeta{Name: "machine1"}, - Status: v1.NodeStatus{Capacity: makeResources(10, 20, 0, 32, 0).Capacity, Allocatable: makeAllocatableResources(10, 20, 0, 32, 0)}, + Status: v1.NodeStatus{Capacity: makeResources(10, 20, 0, 32, 0, 0).Capacity, Allocatable: makeAllocatableResources(10, 20, 0, 32, 0, 0)}, }, fits: false, wErr: nil, @@ -3251,7 +3352,7 @@ func TestPodSchedulesOnNodeWithMemoryPressureCondition(t *testing.T) { ImagePullPolicy: "Always", // at least one requirement -> burstable pod Resources: v1.ResourceRequirements{ - Requests: makeAllocatableResources(100, 100, 100, 100, 0), + Requests: makeAllocatableResources(100, 100, 100, 100, 0, 0), }, }, }, diff --git a/plugin/pkg/scheduler/schedulercache/node_info.go b/plugin/pkg/scheduler/schedulercache/node_info.go index f4262aaca17..4717b1d8e8c 100644 --- a/plugin/pkg/scheduler/schedulercache/node_info.go +++ b/plugin/pkg/scheduler/schedulercache/node_info.go @@ -69,14 +69,17 @@ type Resource struct { MilliCPU int64 Memory int64 NvidiaGPU int64 + StorageScratch int64 + StorageOverlay int64 OpaqueIntResources map[v1.ResourceName]int64 } func (r *Resource) ResourceList() v1.ResourceList { result := v1.ResourceList{ - v1.ResourceCPU: *resource.NewMilliQuantity(r.MilliCPU, resource.DecimalSI), - v1.ResourceMemory: *resource.NewQuantity(r.Memory, resource.BinarySI), - v1.ResourceNvidiaGPU: *resource.NewQuantity(r.NvidiaGPU, resource.DecimalSI), + v1.ResourceCPU: *resource.NewMilliQuantity(r.MilliCPU, resource.DecimalSI), + v1.ResourceMemory: *resource.NewQuantity(r.Memory, resource.BinarySI), + v1.ResourceNvidiaGPU: *resource.NewQuantity(r.NvidiaGPU, resource.DecimalSI), + v1.ResourceStorageOverlay: *resource.NewQuantity(r.StorageOverlay, resource.BinarySI), } for rName, rQuant := range r.OpaqueIntResources { result[rName] = *resource.NewQuantity(rQuant, resource.DecimalSI) @@ -86,9 +89,11 @@ func (r *Resource) ResourceList() v1.ResourceList { func (r *Resource) Clone() *Resource { res := &Resource{ - MilliCPU: r.MilliCPU, - Memory: r.Memory, - NvidiaGPU: r.NvidiaGPU, + MilliCPU: r.MilliCPU, + Memory: r.Memory, + NvidiaGPU: r.NvidiaGPU, + StorageOverlay: r.StorageOverlay, + StorageScratch: r.StorageScratch, } if r.OpaqueIntResources != nil { res.OpaqueIntResources = make(map[v1.ResourceName]int64) @@ -262,6 +267,8 @@ func (n *NodeInfo) addPod(pod *v1.Pod) { n.requestedResource.MilliCPU += res.MilliCPU n.requestedResource.Memory += res.Memory n.requestedResource.NvidiaGPU += 
res.NvidiaGPU + n.requestedResource.StorageOverlay += res.StorageOverlay + n.requestedResource.StorageScratch += res.StorageScratch if n.requestedResource.OpaqueIntResources == nil && len(res.OpaqueIntResources) > 0 { n.requestedResource.OpaqueIntResources = map[v1.ResourceName]int64{} } @@ -347,6 +354,8 @@ func calculateResource(pod *v1.Pod) (res Resource, non0_cpu int64, non0_mem int6 res.Memory += rQuant.Value() case v1.ResourceNvidiaGPU: res.NvidiaGPU += rQuant.Value() + case v1.ResourceStorageOverlay: + res.StorageOverlay += rQuant.Value() default: if v1helper.IsOpaqueIntResourceName(rName) { res.AddOpaque(rName, rQuant.Value()) @@ -359,6 +368,15 @@ func calculateResource(pod *v1.Pod) (res Resource, non0_cpu int64, non0_mem int6 non0_mem += non0_mem_req // No non-zero resources for GPUs or opaque resources. } + + // Account for storage requested by emptydir volumes + // If the storage medium is memory, should exclude the size + for _, vol := range pod.Spec.Volumes { + if vol.EmptyDir != nil && vol.EmptyDir.Medium != v1.StorageMediumMemory { + res.StorageScratch += vol.EmptyDir.SizeLimit.Value() + } + } + return } @@ -389,6 +407,10 @@ func (n *NodeInfo) SetNode(node *v1.Node) error { n.allocatableResource.NvidiaGPU = rQuant.Value() case v1.ResourcePods: n.allowedPodNumber = int(rQuant.Value()) + case v1.ResourceStorage: + n.allocatableResource.StorageScratch = rQuant.Value() + case v1.ResourceStorageOverlay: + n.allocatableResource.StorageOverlay = rQuant.Value() default: if v1helper.IsOpaqueIntResourceName(rName) { n.allocatableResource.SetOpaque(rName, rQuant.Value()) diff --git a/test/e2e_node/BUILD b/test/e2e_node/BUILD index 100580d54fe..b8956ab1381 100644 --- a/test/e2e_node/BUILD +++ b/test/e2e_node/BUILD @@ -76,6 +76,7 @@ go_test( "inode_eviction_test.go", "kubelet_test.go", "lifecycle_hook_test.go", + "local_storage_allocatable_eviction_test.go", "log_path_test.go", "memory_eviction_test.go", "mirror_pod_test.go", diff --git a/test/e2e_node/local_storage_allocatable_eviction_test.go b/test/e2e_node/local_storage_allocatable_eviction_test.go new file mode 100644 index 00000000000..dc5a952da53 --- /dev/null +++ b/test/e2e_node/local_storage_allocatable_eviction_test.go @@ -0,0 +1,322 @@ +/* +Copyright 2017 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package e2e_node + +import ( + "fmt" + "time" + + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/kubernetes/pkg/api/v1" + nodeutil "k8s.io/kubernetes/pkg/api/v1/node" + "k8s.io/kubernetes/pkg/apis/componentconfig" + "k8s.io/kubernetes/test/e2e/framework" + + . "github.com/onsi/ginkgo" + . 
"github.com/onsi/gomega" +) + +// Eviction Policy is described here: +// https://github.com/kubernetes/kubernetes/blob/master/docs/proposals/kubelet-eviction.md + +var _ = framework.KubeDescribe("LocalStorageAllocatableEviction [Slow] [Serial] [Disruptive] [Flaky]", func() { + f := framework.NewDefaultFramework("localstorageallocatable-eviction-test") + evictionTestTimeout := 15 * time.Minute + testCondition := "Evict pod due to local storage allocatable violation" + conditionType := v1.NodeDiskPressure + var podTestSpecs []podTestSpec + //podTestSpecsS := make([]podTestSpec, 5) + var diskReserve uint64 + Context(fmt.Sprintf("when we run containers that should cause %s", testCondition), func() { + + BeforeEach(func() { + diskAvail, err := getDiskUsage() + if err != nil { + framework.ExpectNoError(err) + } + + diskReserve = uint64(0.8 * diskAvail / 1000000) // Reserve 0.8 * disk Capacity for kube-reserved scratch storage + maxDisk := 10000000 // Set dd command to read and write up to 10MB at a time + count := uint64(0.8 * diskAvail / float64(maxDisk)) + command := fmt.Sprintf("dd if=/dev/urandom of=dummy bs=%d count=%d; sleep 0.5; while true; do sleep 5; done", maxDisk, count) + + podTestSpecs = []podTestSpec{ + { + evictionPriority: 1, // This pod should be evicted before the innocent pod + pod: v1.Pod{ + ObjectMeta: metav1.ObjectMeta{Name: "container-disk-hog-pod"}, + Spec: v1.PodSpec{ + RestartPolicy: v1.RestartPolicyNever, + Containers: []v1.Container{ + { + Image: "gcr.io/google_containers/busybox:1.24", + Name: "container-disk-hog-pod", + Command: []string{"sh", "-c", command}, + }, + }, + }, + }, + }, + + { + evictionPriority: 0, // This pod should never be evicted + pod: v1.Pod{ + ObjectMeta: metav1.ObjectMeta{Name: "idle-pod"}, + Spec: v1.PodSpec{ + RestartPolicy: v1.RestartPolicyNever, + Containers: []v1.Container{ + { + Image: "gcr.io/google_containers/busybox:1.24", + Name: "idle-pod", + Command: []string{"sh", "-c", + fmt.Sprintf("while true; do sleep 5; done")}, + }, + }, + }, + }, + }, + } + }) + + // Set up --kube-reserved for scratch storage + tempSetCurrentKubeletConfig(f, func(initialConfig *componentconfig.KubeletConfiguration) { + framework.Logf("Set up --kube-reserved for local storage reserved %dMi", diskReserve) + initialConfig.KubeReserved = componentconfig.ConfigurationMap(map[string]string{"storage": fmt.Sprintf("%dMi", diskReserve)}) + + }) + + // Place the remainder of the test within a context so that the kubelet config is set before and after the test. 
+ Context("With kubeconfig updated", func() { + runLocalStorageEvictionTest(f, conditionType, testCondition, &podTestSpecs, evictionTestTimeout, hasDiskPressure) + }) + + }) + +}) + +// Returns TRUE if the node has disk pressure, FALSE otherwise +func hasDiskPressure(f *framework.Framework, conditionType v1.NodeConditionType, testCondition string) (bool, error) { + localNodeStatus := getLocalNode(f).Status + _, pressure := nodeutil.GetNodeCondition(&localNodeStatus, conditionType) + Expect(pressure).NotTo(BeNil()) + hasPressure := pressure.Status == v1.ConditionTrue + By(fmt.Sprintf("checking if pod has %s: %v", testCondition, hasPressure)) + + // Additional Logging relating to disk + summary, err := getNodeSummary() + if err != nil { + return false, err + } + if summary.Node.Runtime != nil && summary.Node.Runtime.ImageFs != nil && summary.Node.Runtime.ImageFs.UsedBytes != nil { + framework.Logf("imageFsInfo.UsedBytes: %d", *summary.Node.Runtime.ImageFs.UsedBytes) + } + if summary.Node.Fs != nil && summary.Node.Fs.UsedBytes != nil { + framework.Logf("rootFsInfo.UsedBytes: %d", *summary.Node.Fs.UsedBytes) + } + for _, pod := range summary.Pods { + framework.Logf("Pod: %s", pod.PodRef.Name) + for _, container := range pod.Containers { + if container.Rootfs != nil && container.Rootfs.UsedBytes != nil { + framework.Logf("--- summary Container: %s UsedBytes: %d", container.Name, *container.Rootfs.UsedBytes) + } + } + for _, volume := range pod.VolumeStats { + if volume.FsStats.UsedBytes != nil { + framework.Logf("--- summary Volume: %s UsedBytes: %d", volume.Name, *volume.FsStats.UsedBytes) + } + } + } + return hasPressure, nil +} + +// Pass podTestSpecsP as references so that it could be set up in the first BeforeEach clause +func runLocalStorageEvictionTest(f *framework.Framework, conditionType v1.NodeConditionType, testCondition string, podTestSpecsP *[]podTestSpec, evictionTestTimeout time.Duration, + hasPressureCondition func(*framework.Framework, v1.NodeConditionType, string) (bool, error)) { + BeforeEach(func() { + + By("seting up pods to be used by tests") + for _, spec := range *podTestSpecsP { + By(fmt.Sprintf("creating pod with container: %s", spec.pod.Name)) + f.PodClient().CreateSync(&spec.pod) + } + }) + + It(fmt.Sprintf("should eventually see %s, and then evict all of the correct pods", testCondition), func() { + Expect(podTestSpecsP).NotTo(BeNil()) + podTestSpecs := *podTestSpecsP + + Eventually(func() error { + hasPressure, err := hasPressureCondition(f, conditionType, testCondition) + if err != nil { + return err + } + if hasPressure { + return nil + } + return fmt.Errorf("Condition: %s not encountered", testCondition) + }, evictionTestTimeout, evictionPollInterval).Should(BeNil()) + + Eventually(func() error { + // Gather current information + updatedPodList, err := f.ClientSet.Core().Pods(f.Namespace.Name).List(metav1.ListOptions{}) + updatedPods := updatedPodList.Items + for _, p := range updatedPods { + framework.Logf("fetching pod %s; phase= %v", p.Name, p.Status.Phase) + } + _, err = hasPressureCondition(f, conditionType, testCondition) + if err != nil { + return err + } + + By("checking eviction ordering and ensuring important pods dont fail") + done := true + for _, priorityPodSpec := range podTestSpecs { + var priorityPod v1.Pod + for _, p := range updatedPods { + if p.Name == priorityPodSpec.pod.Name { + priorityPod = p + } + } + Expect(priorityPod).NotTo(BeNil()) + + // Check eviction ordering. 
+ // Note: it is alright for a priority 1 and priority 2 pod (for example) to fail in the same round + for _, lowPriorityPodSpec := range podTestSpecs { + var lowPriorityPod v1.Pod + for _, p := range updatedPods { + if p.Name == lowPriorityPodSpec.pod.Name { + lowPriorityPod = p + } + } + Expect(lowPriorityPod).NotTo(BeNil()) + if priorityPodSpec.evictionPriority < lowPriorityPodSpec.evictionPriority && lowPriorityPod.Status.Phase == v1.PodRunning { + Expect(priorityPod.Status.Phase).NotTo(Equal(v1.PodFailed), + fmt.Sprintf("%s pod failed before %s pod", priorityPodSpec.pod.Name, lowPriorityPodSpec.pod.Name)) + } + } + + // EvictionPriority 0 pods should not fail + if priorityPodSpec.evictionPriority == 0 { + Expect(priorityPod.Status.Phase).NotTo(Equal(v1.PodFailed), + fmt.Sprintf("%s pod failed (and shouldn't have failed)", priorityPod.Name)) + } + + // If a pod that is not evictionPriority 0 has not been evicted, we are not done + if priorityPodSpec.evictionPriority != 0 && priorityPod.Status.Phase != v1.PodFailed { + done = false + } + } + if done { + return nil + } + return fmt.Errorf("pods that caused %s have not been evicted.", testCondition) + }, evictionTestTimeout, evictionPollInterval).Should(BeNil()) + + // We observe pressure from the API server. The eviction manager observes pressure from the kubelet internal stats. + // This means the eviction manager will observe pressure before we will, creating a delay between when the eviction manager + // evicts a pod, and when we observe the pressure by querrying the API server. Add a delay here to account for this delay + By("making sure pressure from test has surfaced before continuing") + time.Sleep(pressureDelay) + + By("making sure conditions eventually return to normal") + Eventually(func() error { + hasPressure, err := hasPressureCondition(f, conditionType, testCondition) + if err != nil { + return err + } + if hasPressure { + return fmt.Errorf("Conditions havent returned to normal, we still have %s", testCondition) + } + return nil + }, evictionTestTimeout, evictionPollInterval).Should(BeNil()) + + By("making sure conditions do not return, and that pods that shouldnt fail dont fail") + Consistently(func() error { + hasPressure, err := hasPressureCondition(f, conditionType, testCondition) + if err != nil { + // Race conditions sometimes occur when checking pressure condition due to #38710 (Docker bug) + // Do not fail the test when this occurs, since this is expected to happen occasionally. + framework.Logf("Failed to check pressure condition. 
Error: %v", err) + return nil + } + if hasPressure { + return fmt.Errorf("%s dissappeared and then reappeared", testCondition) + } + // Gather current information + updatedPodList, _ := f.ClientSet.Core().Pods(f.Namespace.Name).List(metav1.ListOptions{}) + for _, priorityPodSpec := range podTestSpecs { + // EvictionPriority 0 pods should not fail + if priorityPodSpec.evictionPriority == 0 { + for _, p := range updatedPodList.Items { + if p.Name == priorityPodSpec.pod.Name && p.Status.Phase == v1.PodFailed { + return fmt.Errorf("%s pod failed (delayed) and shouldn't have failed", p.Name) + } + } + } + } + return nil + }, postTestConditionMonitoringPeriod, evictionPollInterval).Should(BeNil()) + + By("making sure we can start a new pod after the test") + podName := "test-admit-pod" + f.PodClient().CreateSync(&v1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: podName, + }, + Spec: v1.PodSpec{ + RestartPolicy: v1.RestartPolicyNever, + Containers: []v1.Container{ + { + Image: framework.GetPauseImageNameForHostArch(), + Name: podName, + }, + }, + }, + }) + }) + + AfterEach(func() { + By("deleting pods") + for _, spec := range *podTestSpecsP { + By(fmt.Sprintf("deleting pod: %s", spec.pod.Name)) + f.PodClient().DeleteSync(spec.pod.Name, &metav1.DeleteOptions{}, framework.DefaultPodDeletionTimeout) + } + + if CurrentGinkgoTestDescription().Failed { + if framework.TestContext.DumpLogsOnFailure { + logPodEvents(f) + logNodeEvents(f) + } + By("sleeping to allow for cleanup of test") + time.Sleep(postTestConditionMonitoringPeriod) + } + }) +} + +func getDiskUsage() (float64, error) { + summary, err := getNodeSummary() + if err != nil { + return 0, err + } + + if nodeFs := summary.Node.Fs; nodeFs != nil { + return float64(*nodeFs.AvailableBytes), nil + } + + return 0, fmt.Errorf("fail to get nodefs available bytes") + +}
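Taken together, the kubelet side of this series lets operators reserve scratch space (for example --kube-reserved=cpu=200m,memory=500Mi,storage=1Gi, which the e2e test above sets programmatically through KubeReserved), and the scheduler side counts non-memory-medium emptyDir SizeLimits toward a pod's scratch-storage request. The sketch below illustrates that emptyDir accounting; scratchRequest, examplePod, and the sketch package are illustrative only and not part of this patch (the real logic lives in GetResourceRequest and calculateResource above).

package sketch // illustrative only, not part of this patch

import (
	"k8s.io/apimachinery/pkg/api/resource"
	"k8s.io/kubernetes/pkg/api/v1"
)

// scratchRequest mirrors the emptyDir accounting added in this diff: every
// emptyDir volume whose medium is not Memory contributes its SizeLimit to the
// pod's scratch-storage request; memory-backed (tmpfs) emptyDirs are ignored.
func scratchRequest(pod *v1.Pod) int64 {
	var total int64
	for _, vol := range pod.Spec.Volumes {
		if vol.EmptyDir != nil && vol.EmptyDir.Medium != v1.StorageMediumMemory {
			total += vol.EmptyDir.SizeLimit.Value()
		}
	}
	return total
}

// examplePod requests 15 bytes of scratch space: the disk-backed emptyDir
// counts, the memory-medium emptyDir does not (matching the last two cases in
// TestPodFitsResources above).
func examplePod() *v1.Pod {
	return &v1.Pod{
		Spec: v1.PodSpec{
			Volumes: []v1.Volume{
				{
					Name: "scratch-dir",
					VolumeSource: v1.VolumeSource{
						EmptyDir: &v1.EmptyDirVolumeSource{
							Medium:    v1.StorageMediumDefault,
							SizeLimit: *resource.NewQuantity(15, resource.BinarySI),
						},
					},
				},
				{
					Name: "tmpfs-dir",
					VolumeSource: v1.VolumeSource{
						EmptyDir: &v1.EmptyDirVolumeSource{
							Medium:    v1.StorageMediumMemory,
							SizeLimit: *resource.NewQuantity(100, resource.BinarySI),
						},
					},
				},
			},
		},
	}
}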