Merge pull request #60525 from ravisantoshgudimetla/scheduler-pvc

Automatic merge from submit-queue (batch tested with PRs 54997, 61869, 61816, 61909, 60525). If you want to cherry-pick this change to another branch, please follow the instructions [here](https://github.com/kubernetes/community/blob/master/contributors/devel/cherry-picks.md).

Balanced resource allocation priority to include volume count on nodes.

Scheduler balanced resource allocation priority to include volume count on nodes.

/cc @aveshagarwal @abhgupta



**What this PR does / why we need it**:

**Which issue(s) this PR fixes** *(optional, in `fixes #<issue number>(, fixes #<issue_number>, ...)` format, will close the issue(s) when PR gets merged)*:
Fixes #58232


**Release note**:

```release-note
Balanced resource allocation priority in scheduler to include volume count on node 
```
Commit 9847c8ee0a, authored by Kubernetes Submit Queue on 2018-03-30 20:13:15 -07:00, committed by GitHub.
13 changed files with 317 additions and 12 deletions


@ -274,6 +274,14 @@ const (
// Allow mounting a subpath of a volume in a container
// Do not remove this feature gate even though it's GA
VolumeSubpath utilfeature.Feature = "VolumeSubpath"
// owner: @ravig
// alpha: v1.11
//
// Include volume count on node to be considered for balanced resource allocation while scheduling.
// A node that has closer CPU, memory utilization and volume count is favored by the scheduler
// while making decisions.
BalanceAttachedNodeVolumes utilfeature.Feature = "BalanceAttachedNodeVolumes"
)
func init() {
@ -321,6 +329,7 @@ var defaultKubernetesFeatureGates = map[utilfeature.Feature]utilfeature.FeatureS
GCERegionalPersistentDisk: {Default: true, PreRelease: utilfeature.Beta},
RunAsGroup: {Default: false, PreRelease: utilfeature.Alpha},
VolumeSubpath: {Default: true, PreRelease: utilfeature.GA},
BalanceAttachedNodeVolumes: {Default: false, PreRelease: utilfeature.Alpha},
// inherited features from generic apiserver, relisted here to get a conflict if it is changed
// unintentionally on either side:
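
The gate is alpha in v1.11 and off by default, so nothing in this PR takes effect unless it is switched on (for a cluster, via the kube-scheduler `--feature-gates=BalanceAttachedNodeVolumes=true` flag). A minimal sketch of flipping and checking the gate programmatically, mirroring what the test files later in this diff do:

```go
package main

import (
	"fmt"

	utilfeature "k8s.io/apiserver/pkg/util/feature"
	"k8s.io/kubernetes/pkg/features"
)

func main() {
	// Turn the alpha gate on, the same way the tests in this PR do.
	if err := utilfeature.DefaultFeatureGate.Set(
		fmt.Sprintf("%s=true", features.BalanceAttachedNodeVolumes)); err != nil {
		panic(err)
	}
	// The predicate, priority, and cache changes below all guard on this check.
	fmt.Println(utilfeature.DefaultFeatureGate.Enabled(features.BalanceAttachedNodeVolumes)) // true
}
```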


@ -23,6 +23,7 @@ import (
"strconv"
"sync"
"github.com/golang/glog"
"k8s.io/api/core/v1"
storagev1 "k8s.io/api/storage/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
@ -44,8 +45,6 @@ import (
schedutil "k8s.io/kubernetes/pkg/scheduler/util"
"k8s.io/kubernetes/pkg/scheduler/volumebinder"
volumeutil "k8s.io/kubernetes/pkg/volume/util"
"github.com/golang/glog"
)
const (
@ -451,7 +450,12 @@ func (c *MaxPDVolumeCountChecker) predicate(pod *v1.Pod, meta algorithm.Predicat
// violates MaxEBSVolumeCount or MaxGCEPDVolumeCount
return false, []algorithm.PredicateFailureReason{ErrMaxVolumeCountExceeded}, nil
}
if nodeInfo != nil && nodeInfo.TransientInfo != nil && utilfeature.DefaultFeatureGate.Enabled(features.BalanceAttachedNodeVolumes) {
nodeInfo.TransientInfo.TransientLock.Lock()
defer nodeInfo.TransientInfo.TransientLock.Unlock()
nodeInfo.TransientInfo.TransNodeInfo.AllocatableVolumesCount = c.maxVolumes - numExistingVolumes
nodeInfo.TransientInfo.TransNodeInfo.RequestedVolumes = numNewVolumes
}
return true, nil, nil
}
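
With the gate on, the predicate above now doubles as the producer of per-cycle volume data: it stores how many more volumes the node can take and how many the incoming pod asks for. A tiny illustration with made-up numbers (nothing here comes from the diff) of what ends up in TransientInfo and how the balanced priority reads it back:

```go
package main

import "fmt"

func main() {
	// Made-up numbers: a node with a 39-volume limit, 35 volumes already in use,
	// and a pod that wants to attach 2 more.
	maxVolumes, numExistingVolumes, numNewVolumes := 39, 35, 2
	allocatableVolumesCount := maxVolumes - numExistingVolumes // stored as AllocatableVolumesCount = 4
	requestedVolumes := numNewVolumes                          // stored as RequestedVolumes = 2

	// The balanced-resource priority later turns these into a volume utilization fraction.
	fmt.Println(float64(requestedVolumes) / float64(allocatableVolumesCount)) // 0.5
}
```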


@ -28,6 +28,7 @@ go_library(
importpath = "k8s.io/kubernetes/pkg/scheduler/algorithm/priorities",
deps = [
"//pkg/apis/core/v1/helper:go_default_library",
"//pkg/features:go_default_library",
"//pkg/scheduler/algorithm:go_default_library",
"//pkg/scheduler/algorithm/predicates:go_default_library",
"//pkg/scheduler/algorithm/priorities/util:go_default_library",
@ -40,6 +41,7 @@ go_library(
"//vendor/k8s.io/apimachinery/pkg/api/resource:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/labels:go_default_library",
"//vendor/k8s.io/apiserver/pkg/util/feature:go_default_library",
"//vendor/k8s.io/client-go/util/workqueue:go_default_library",
],
)
@ -62,6 +64,7 @@ go_test(
],
embed = [":go_default_library"],
deps = [
"//pkg/features:go_default_library",
"//pkg/kubelet/apis:go_default_library",
"//pkg/scheduler/algorithm/priorities/util:go_default_library",
"//pkg/scheduler/api:go_default_library",
@ -72,6 +75,7 @@ go_test(
"//vendor/k8s.io/api/extensions/v1beta1:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/api/resource:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
"//vendor/k8s.io/apiserver/pkg/util/feature:go_default_library",
],
)


@ -19,6 +19,8 @@ package priorities
import (
"math"
utilfeature "k8s.io/apiserver/pkg/util/feature"
"k8s.io/kubernetes/pkg/features"
schedulerapi "k8s.io/kubernetes/pkg/scheduler/api"
"k8s.io/kubernetes/pkg/scheduler/schedulercache"
)
@ -36,17 +38,31 @@ var (
BalancedResourceAllocationMap = balancedResourcePriority.PriorityMap
)
func balancedResourceScorer(requested, allocable *schedulercache.Resource) int64 {
func balancedResourceScorer(requested, allocable *schedulercache.Resource, includeVolumes bool, requestedVolumes int, allocatableVolumes int) int64 {
cpuFraction := fractionOfCapacity(requested.MilliCPU, allocable.MilliCPU)
memoryFraction := fractionOfCapacity(requested.Memory, allocable.Memory)
// This is to find the node with the most balanced CPU, memory and volume usage.
if includeVolumes && utilfeature.DefaultFeatureGate.Enabled(features.BalanceAttachedNodeVolumes) && allocatableVolumes > 0 {
volumeFraction := float64(requestedVolumes) / float64(allocatableVolumes)
if cpuFraction >= 1 || memoryFraction >= 1 || volumeFraction >= 1 {
// if requested >= capacity, the corresponding host should never be preferred.
return 0
}
// Compute variance for all the three fractions.
mean := (cpuFraction + memoryFraction + volumeFraction) / float64(3)
variance := float64((((cpuFraction - mean) * (cpuFraction - mean)) + ((memoryFraction - mean) * (memoryFraction - mean)) + ((volumeFraction - mean) * (volumeFraction - mean))) / float64(3))
// Since the variance is computed over positive fractions, it is itself a positive fraction. 1 - variance lets the
// score be higher for the node with the least variance, and multiplying it by MaxPriority (10) provides the
// scaling factor needed.
return int64((1 - variance) * float64(schedulerapi.MaxPriority))
}
if cpuFraction >= 1 || memoryFraction >= 1 {
// if requested >= capacity, the corresponding host should never be preferred.
return 0
}
// Upper and lower boundary of difference between cpuFraction and memoryFraction are -1 and 1
// respectively. Multilying the absolute value of the difference by 10 scales the value to
// respectively. Multiplying the absolute value of the difference by 10 scales the value to
// 0-10 with 0 representing well balanced allocation and 10 poorly balanced. Subtracting it from
// 10 leads to the score which also scales from 0 to 10 while 10 representing well balanced.
diff := math.Abs(cpuFraction - memoryFraction)
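
To make the difference concrete, here is a worked example (illustrative fractions, not taken from the tests) showing both branches of the scorer above for the same node: the pre-existing CPU/memory difference path and the new three-way variance path.

```go
package main

import (
	"fmt"
	"math"
)

func main() {
	// Illustrative utilization fractions: CPU 60%, memory 20%, 1 of 4 attachable volumes.
	cpuFraction, memoryFraction, volumeFraction := 0.6, 0.2, 0.25
	const maxPriority = 10.0

	// Gate off (or no volume data): score from the CPU/memory difference only.
	diff := math.Abs(cpuFraction - memoryFraction)
	fmt.Println(int64((1 - diff) * maxPriority)) // 6

	// Gate on with volume data: score from the variance of all three fractions,
	// so a node whose volume usage tracks its CPU/memory usage is preferred.
	mean := (cpuFraction + memoryFraction + volumeFraction) / 3
	variance := ((cpuFraction-mean)*(cpuFraction-mean) +
		(memoryFraction-mean)*(memoryFraction-mean) +
		(volumeFraction-mean)*(volumeFraction-mean)) / 3
	fmt.Println(int64((1 - variance) * maxPriority)) // 9
}
```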


@ -17,17 +17,118 @@ limitations under the License.
package priorities
import (
"fmt"
"reflect"
"testing"
"k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
utilfeature "k8s.io/apiserver/pkg/util/feature"
"k8s.io/kubernetes/pkg/features"
schedulerapi "k8s.io/kubernetes/pkg/scheduler/api"
"k8s.io/kubernetes/pkg/scheduler/schedulercache"
)
// getExistingVolumeCountForNode gets the number of volumes that can still be attached to the node
// (maxVolumes minus the volumes already used by the given pods, floored at zero).
func getExistingVolumeCountForNode(pods []*v1.Pod, maxVolumes int) int {
volumeCount := 0
for _, pod := range pods {
volumeCount += len(pod.Spec.Volumes)
}
if maxVolumes-volumeCount > 0 {
return maxVolumes - volumeCount
}
return 0
}
func TestBalancedResourceAllocation(t *testing.T) {
// Enable volumesOnNodeForBalancing to do balanced resource allocation
utilfeature.DefaultFeatureGate.Set(fmt.Sprintf("%s=true", features.BalanceAttachedNodeVolumes))
podwithVol1 := v1.PodSpec{
Containers: []v1.Container{
{
Resources: v1.ResourceRequirements{
Requests: v1.ResourceList{
v1.ResourceCPU: resource.MustParse("1000m"),
v1.ResourceMemory: resource.MustParse("2000"),
},
},
},
{
Resources: v1.ResourceRequirements{
Requests: v1.ResourceList{
v1.ResourceCPU: resource.MustParse("2000m"),
v1.ResourceMemory: resource.MustParse("3000"),
},
},
},
},
Volumes: []v1.Volume{
{
VolumeSource: v1.VolumeSource{
AWSElasticBlockStore: &v1.AWSElasticBlockStoreVolumeSource{VolumeID: "ovp"},
},
},
},
NodeName: "machine4",
}
podwithVol2 := v1.PodSpec{
Containers: []v1.Container{
{
Resources: v1.ResourceRequirements{
Requests: v1.ResourceList{
v1.ResourceCPU: resource.MustParse("0m"),
v1.ResourceMemory: resource.MustParse("0"),
},
},
},
{
Resources: v1.ResourceRequirements{
Requests: v1.ResourceList{
v1.ResourceCPU: resource.MustParse("0m"),
v1.ResourceMemory: resource.MustParse("0"),
},
},
},
},
Volumes: []v1.Volume{
{
VolumeSource: v1.VolumeSource{
AWSElasticBlockStore: &v1.AWSElasticBlockStoreVolumeSource{VolumeID: "ovp1"},
},
},
},
NodeName: "machine4",
}
podwithVol3 := v1.PodSpec{
Containers: []v1.Container{
{
Resources: v1.ResourceRequirements{
Requests: v1.ResourceList{
v1.ResourceCPU: resource.MustParse("0m"),
v1.ResourceMemory: resource.MustParse("0"),
},
},
},
{
Resources: v1.ResourceRequirements{
Requests: v1.ResourceList{
v1.ResourceCPU: resource.MustParse("0m"),
v1.ResourceMemory: resource.MustParse("0"),
},
},
},
},
Volumes: []v1.Volume{
{
VolumeSource: v1.VolumeSource{
AWSElasticBlockStore: &v1.AWSElasticBlockStoreVolumeSource{VolumeID: "ovp1"},
},
},
},
NodeName: "machine4",
}
labels1 := map[string]string{
"foo": "bar",
"baz": "blah",
@ -89,6 +190,27 @@ func TestBalancedResourceAllocation(t *testing.T) {
},
},
}
cpuAndMemory3 := v1.PodSpec{
NodeName: "machine3",
Containers: []v1.Container{
{
Resources: v1.ResourceRequirements{
Requests: v1.ResourceList{
v1.ResourceCPU: resource.MustParse("1000m"),
v1.ResourceMemory: resource.MustParse("2000"),
},
},
},
{
Resources: v1.ResourceRequirements{
Requests: v1.ResourceList{
v1.ResourceCPU: resource.MustParse("2000m"),
v1.ResourceMemory: resource.MustParse("3000"),
},
},
},
},
}
tests := []struct {
pod *v1.Pod
pods []*v1.Pod
@ -249,10 +371,43 @@ func TestBalancedResourceAllocation(t *testing.T) {
{Spec: cpuAndMemory},
},
},
{
/*
Machine4 will be chosen here because it already has an existing volume, which brings the variance
of volume count, CPU usage, and memory usage closer.
*/
pod: &v1.Pod{
Spec: v1.PodSpec{
Volumes: []v1.Volume{
{
VolumeSource: v1.VolumeSource{
AWSElasticBlockStore: &v1.AWSElasticBlockStoreVolumeSource{VolumeID: "ovp2"},
},
},
},
},
},
nodes: []*v1.Node{makeNode("machine3", 3500, 40000), makeNode("machine4", 4000, 10000)},
expectedList: []schedulerapi.HostPriority{{Host: "machine3", Score: 8}, {Host: "machine4", Score: 9}},
test: "Include volume count on a node for balanced resource allocation",
pods: []*v1.Pod{
{Spec: cpuAndMemory3},
{Spec: podwithVol1},
{Spec: podwithVol2},
{Spec: podwithVol3},
},
},
}
for _, test := range tests {
nodeNameToInfo := schedulercache.CreateNodeNameToInfoMap(test.pods, test.nodes)
if len(test.pod.Spec.Volumes) > 0 {
maxVolumes := 5
for _, info := range nodeNameToInfo {
info.TransientInfo.TransNodeInfo.AllocatableVolumesCount = getExistingVolumeCountForNode(info.Pods(), maxVolumes)
info.TransientInfo.TransNodeInfo.RequestedVolumes = len(test.pod.Spec.Volumes)
}
}
list, err := priorityFunction(BalancedResourceAllocationMap, nil, nil)(test.pod, nodeNameToInfo, test.nodes)
if err != nil {
t.Errorf("unexpected error: %v", err)


@ -33,7 +33,7 @@ var (
LeastRequestedPriorityMap = leastResourcePriority.PriorityMap
)
func leastResourceScorer(requested, allocable *schedulercache.Resource) int64 {
func leastResourceScorer(requested, allocable *schedulercache.Resource, includeVolumes bool, requestedVolumes int, allocatableVolumes int) int64 {
return (leastRequestedScore(requested.MilliCPU, allocable.MilliCPU) +
leastRequestedScore(requested.Memory, allocable.Memory)) / 2
}


@ -31,7 +31,7 @@ var (
MostRequestedPriorityMap = mostResourcePriority.PriorityMap
)
func mostResourceScorer(requested, allocable *schedulercache.Resource) int64 {
func mostResourceScorer(requested, allocable *schedulercache.Resource, includeVolumes bool, requestedVolumes int, allocatableVolumes int) int64 {
return (mostRequestedScore(requested.MilliCPU, allocable.MilliCPU) +
mostRequestedScore(requested.Memory, allocable.Memory)) / 2
}
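
Both the least-requested and most-requested scorers above pick up the widened signature but simply ignore the three new volume arguments; only the balanced scorer consumes them. A standalone sketch of that shape (local stand-in types for illustration, not the scheduler's own):

```go
package main

import "fmt"

// resource is a local stand-in for schedulercache.Resource, just for this sketch.
type resource struct{ MilliCPU, Memory int64 }

// scorer mirrors the shared signature introduced in this PR.
type scorer func(requested, allocable *resource, includeVolumes bool, requestedVolumes, allocatableVolumes int) int64

// cpuOnlyScorer ignores the volume arguments, like leastResourceScorer and
// mostResourceScorer do in this diff.
func cpuOnlyScorer(requested, allocable *resource, _ bool, _, _ int) int64 {
	if allocable.MilliCPU == 0 {
		return 0
	}
	return (allocable.MilliCPU - requested.MilliCPU) * 10 / allocable.MilliCPU
}

func main() {
	var s scorer = cpuOnlyScorer
	fmt.Println(s(&resource{MilliCPU: 1000}, &resource{MilliCPU: 4000}, false, 0, 0)) // 7
}
```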


@ -29,7 +29,7 @@ import (
// ResourceAllocationPriority contains information to calculate resource allocation priority.
type ResourceAllocationPriority struct {
Name string
scorer func(requested, allocable *schedulercache.Resource) int64
scorer func(requested, allocable *schedulercache.Resource, includeVolumes bool, requestedVolumes int, allocatableVolumes int) int64
}
// PriorityMap prioritizes nodes according to the resource allocations on the node.
@ -54,8 +54,13 @@ func (r *ResourceAllocationPriority) PriorityMap(
requested.MilliCPU += nodeInfo.NonZeroRequest().MilliCPU
requested.Memory += nodeInfo.NonZeroRequest().Memory
score := r.scorer(&requested, &allocatable)
var score int64
// When transient node info is available, pass the pod's requested and the node's allocatable volume counts to the scorer for balanced resource allocation.
if len(pod.Spec.Volumes) >= 0 && nodeInfo.TransientInfo != nil {
score = r.scorer(&requested, &allocatable, true, nodeInfo.TransientInfo.TransNodeInfo.RequestedVolumes, nodeInfo.TransientInfo.TransNodeInfo.AllocatableVolumesCount)
} else {
score = r.scorer(&requested, &allocatable, false, 0, 0)
}
if glog.V(10) {
glog.Infof(


@ -12,6 +12,7 @@ go_library(
visibility = ["//visibility:public"],
deps = [
"//pkg/apis/core/v1/helper:go_default_library",
"//pkg/features:go_default_library",
"//pkg/scheduler/algorithm/priorities/util:go_default_library",
"//pkg/scheduler/util:go_default_library",
"//vendor/github.com/golang/glog:go_default_library",
@ -20,6 +21,7 @@ go_library(
"//vendor/k8s.io/apimachinery/pkg/api/resource:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/labels:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/util/wait:go_default_library",
"//vendor/k8s.io/apiserver/pkg/util/feature:go_default_library",
],
)
@ -31,6 +33,7 @@ go_test(
],
embed = [":go_default_library"],
deps = [
"//pkg/features:go_default_library",
"//pkg/scheduler/algorithm/priorities/util:go_default_library",
"//pkg/scheduler/util:go_default_library",
"//vendor/k8s.io/api/core/v1:go_default_library",
@ -40,6 +43,7 @@ go_test(
"//vendor/k8s.io/apimachinery/pkg/labels:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/types:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/util/intstr:go_default_library",
"//vendor/k8s.io/apiserver/pkg/util/feature:go_default_library",
],
)


@ -24,6 +24,8 @@ import (
"k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/util/wait"
utilfeature "k8s.io/apiserver/pkg/util/feature"
"k8s.io/kubernetes/pkg/features"
"github.com/golang/glog"
policy "k8s.io/api/policy/v1beta1"
@ -112,6 +114,10 @@ func (cache *schedulerCache) UpdateNodeNameToInfoMap(nodeNameToInfo map[string]*
cache.mu.Lock()
defer cache.mu.Unlock()
for name, info := range cache.nodes {
if utilfeature.DefaultFeatureGate.Enabled(features.BalanceAttachedNodeVolumes) && info.TransientInfo != nil {
// Transient scheduler info is reset here.
info.TransientInfo.resetTransientSchedulerInfo()
}
if current, ok := nodeNameToInfo[name]; !ok || current.generation != info.generation {
nodeNameToInfo[name] = info.Clone()
}
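
UpdateNodeNameToInfoMap runs once per scheduling cycle, so clearing TransientInfo here guarantees that volume counts recorded for the previous pod never leak into the next pod's scoring. A sketch of the call-site wiring (the package, helper name, and error handling below are hypothetical; the benchmark added further down in this diff exercises the same call):

```go
package schedulercachedemo // hypothetical package, for illustration only

import (
	"github.com/golang/glog"

	"k8s.io/kubernetes/pkg/scheduler/schedulercache"
)

// refreshNodeInfos is a hypothetical helper showing the once-per-cycle call that
// resets each node's transient volume counters (when the gate is on) before
// predicates repopulate them.
func refreshNodeInfos(cache schedulercache.Cache) map[string]*schedulercache.NodeInfo {
	cachedNodes := map[string]*schedulercache.NodeInfo{}
	if err := cache.UpdateNodeNameToInfoMap(cachedNodes); err != nil {
		glog.Errorf("failed to update node info map: %v", err)
	}
	return cachedNodes
}
```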


@ -30,6 +30,8 @@ import (
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/intstr"
utilfeature "k8s.io/apiserver/pkg/util/feature"
"k8s.io/kubernetes/pkg/features"
priorityutil "k8s.io/kubernetes/pkg/scheduler/algorithm/priorities/util"
schedutil "k8s.io/kubernetes/pkg/scheduler/util"
)
@ -73,6 +75,8 @@ func (b *hostPortInfoBuilder) build() schedutil.HostPortInfo {
// TestAssumePodScheduled tests that after a pod is assumed, its information is aggregated
// on node level.
func TestAssumePodScheduled(t *testing.T) {
// Enable volumesOnNodeForBalancing to do balanced resource allocation
utilfeature.DefaultFeatureGate.Set(fmt.Sprintf("%s=true", features.BalanceAttachedNodeVolumes))
nodeName := "node"
testPods := []*v1.Pod{
makeBasePod(t, nodeName, "test", "100m", "500", "", []v1.ContainerPort{{HostIP: "127.0.0.1", HostPort: 80, Protocol: "TCP"}}),
@ -99,6 +103,7 @@ func TestAssumePodScheduled(t *testing.T) {
MilliCPU: 100,
Memory: 500,
},
TransientInfo: newTransientSchedulerInfo(),
allocatableResource: &Resource{},
pods: []*v1.Pod{testPods[0]},
usedPorts: newHostPortInfoBuilder().add("TCP", "127.0.0.1", 80).build(),
@ -114,6 +119,7 @@ func TestAssumePodScheduled(t *testing.T) {
MilliCPU: 300,
Memory: 1524,
},
TransientInfo: newTransientSchedulerInfo(),
allocatableResource: &Resource{},
pods: []*v1.Pod{testPods[1], testPods[2]},
usedPorts: newHostPortInfoBuilder().add("TCP", "127.0.0.1", 80).add("TCP", "127.0.0.1", 8080).build(),
@ -129,6 +135,7 @@ func TestAssumePodScheduled(t *testing.T) {
MilliCPU: priorityutil.DefaultMilliCPURequest,
Memory: priorityutil.DefaultMemoryRequest,
},
TransientInfo: newTransientSchedulerInfo(),
allocatableResource: &Resource{},
pods: []*v1.Pod{testPods[3]},
usedPorts: newHostPortInfoBuilder().add("TCP", "127.0.0.1", 80).build(),
@ -145,6 +152,7 @@ func TestAssumePodScheduled(t *testing.T) {
MilliCPU: 100,
Memory: 500,
},
TransientInfo: newTransientSchedulerInfo(),
allocatableResource: &Resource{},
pods: []*v1.Pod{testPods[4]},
usedPorts: newHostPortInfoBuilder().add("TCP", "127.0.0.1", 80).build(),
@ -161,6 +169,7 @@ func TestAssumePodScheduled(t *testing.T) {
MilliCPU: 300,
Memory: 1524,
},
TransientInfo: newTransientSchedulerInfo(),
allocatableResource: &Resource{},
pods: []*v1.Pod{testPods[4], testPods[5]},
usedPorts: newHostPortInfoBuilder().add("TCP", "127.0.0.1", 80).add("TCP", "127.0.0.1", 8080).build(),
@ -176,6 +185,7 @@ func TestAssumePodScheduled(t *testing.T) {
MilliCPU: 100,
Memory: 500,
},
TransientInfo: newTransientSchedulerInfo(),
allocatableResource: &Resource{},
pods: []*v1.Pod{testPods[6]},
usedPorts: newHostPortInfoBuilder().build(),
@ -219,6 +229,8 @@ func assumeAndFinishBinding(cache *schedulerCache, pod *v1.Pod, assumedTime time
// TestExpirePod tests that assumed pods will be removed if expired.
// The removal will be reflected in node info.
func TestExpirePod(t *testing.T) {
// Enable volumesOnNodeForBalancing to do balanced resource allocation
utilfeature.DefaultFeatureGate.Set(fmt.Sprintf("%s=true", features.BalanceAttachedNodeVolumes))
nodeName := "node"
testPods := []*v1.Pod{
makeBasePod(t, nodeName, "test-1", "100m", "500", "", []v1.ContainerPort{{HostIP: "127.0.0.1", HostPort: 80, Protocol: "TCP"}}),
@ -252,6 +264,7 @@ func TestExpirePod(t *testing.T) {
MilliCPU: 200,
Memory: 1024,
},
TransientInfo: newTransientSchedulerInfo(),
allocatableResource: &Resource{},
pods: []*v1.Pod{testPods[1]},
usedPorts: newHostPortInfoBuilder().add("TCP", "127.0.0.1", 8080).build(),
@ -276,6 +289,8 @@ func TestExpirePod(t *testing.T) {
// TestAddPodWillConfirm tests that a pod being Add()ed will be confirmed if assumed.
// The pod info should still exist after manually expiring unconfirmed pods.
func TestAddPodWillConfirm(t *testing.T) {
// Enable volumesOnNodeForBalancing to do balanced resource allocation
utilfeature.DefaultFeatureGate.Set(fmt.Sprintf("%s=true", features.BalanceAttachedNodeVolumes))
nodeName := "node"
now := time.Now()
ttl := 10 * time.Second
@ -301,6 +316,7 @@ func TestAddPodWillConfirm(t *testing.T) {
MilliCPU: 100,
Memory: 500,
},
TransientInfo: newTransientSchedulerInfo(),
allocatableResource: &Resource{},
pods: []*v1.Pod{testPods[0]},
usedPorts: newHostPortInfoBuilder().add("TCP", "127.0.0.1", 80).build(),
@ -396,6 +412,7 @@ func TestAddPodWillReplaceAssumed(t *testing.T) {
MilliCPU: 200,
Memory: 500,
},
TransientInfo: newTransientSchedulerInfo(),
allocatableResource: &Resource{},
pods: []*v1.Pod{updatedPod.DeepCopy()},
usedPorts: newHostPortInfoBuilder().add("TCP", "0.0.0.0", 90).build(),
@ -430,6 +447,8 @@ func TestAddPodWillReplaceAssumed(t *testing.T) {
// TestAddPodAfterExpiration tests that a pod being Add()ed will be added back if expired.
func TestAddPodAfterExpiration(t *testing.T) {
// Enable volumesOnNodeForBalancing to do balanced resource allocation
utilfeature.DefaultFeatureGate.Set(fmt.Sprintf("%s=true", features.BalanceAttachedNodeVolumes))
nodeName := "node"
ttl := 10 * time.Second
basePod := makeBasePod(t, nodeName, "test", "100m", "500", "", []v1.ContainerPort{{HostIP: "127.0.0.1", HostPort: 80, Protocol: "TCP"}})
@ -448,6 +467,7 @@ func TestAddPodAfterExpiration(t *testing.T) {
MilliCPU: 100,
Memory: 500,
},
TransientInfo: newTransientSchedulerInfo(),
allocatableResource: &Resource{},
pods: []*v1.Pod{basePod},
usedPorts: newHostPortInfoBuilder().add("TCP", "127.0.0.1", 80).build(),
@ -477,6 +497,8 @@ func TestAddPodAfterExpiration(t *testing.T) {
// TestUpdatePod tests that a pod will be updated if added before.
func TestUpdatePod(t *testing.T) {
// Enable volumesOnNodeForBalancing to do balanced resource allocation
utilfeature.DefaultFeatureGate.Set(fmt.Sprintf("%s=true", features.BalanceAttachedNodeVolumes))
nodeName := "node"
ttl := 10 * time.Second
testPods := []*v1.Pod{
@ -501,6 +523,7 @@ func TestUpdatePod(t *testing.T) {
MilliCPU: 200,
Memory: 1024,
},
TransientInfo: newTransientSchedulerInfo(),
allocatableResource: &Resource{},
pods: []*v1.Pod{testPods[1]},
usedPorts: newHostPortInfoBuilder().add("TCP", "127.0.0.1", 8080).build(),
@ -513,6 +536,7 @@ func TestUpdatePod(t *testing.T) {
MilliCPU: 100,
Memory: 500,
},
TransientInfo: newTransientSchedulerInfo(),
allocatableResource: &Resource{},
pods: []*v1.Pod{testPods[0]},
usedPorts: newHostPortInfoBuilder().add("TCP", "127.0.0.1", 80).build(),
@ -543,6 +567,8 @@ func TestUpdatePod(t *testing.T) {
// TestExpireAddUpdatePod test the sequence that a pod is expired, added, then updated
func TestExpireAddUpdatePod(t *testing.T) {
// Enable volumesOnNodeForBalancing to do balanced resource allocation
utilfeature.DefaultFeatureGate.Set(fmt.Sprintf("%s=true", features.BalanceAttachedNodeVolumes))
nodeName := "node"
ttl := 10 * time.Second
testPods := []*v1.Pod{
@ -568,6 +594,7 @@ func TestExpireAddUpdatePod(t *testing.T) {
MilliCPU: 200,
Memory: 1024,
},
TransientInfo: newTransientSchedulerInfo(),
allocatableResource: &Resource{},
pods: []*v1.Pod{testPods[1]},
usedPorts: newHostPortInfoBuilder().add("TCP", "127.0.0.1", 8080).build(),
@ -580,6 +607,7 @@ func TestExpireAddUpdatePod(t *testing.T) {
MilliCPU: 100,
Memory: 500,
},
TransientInfo: newTransientSchedulerInfo(),
allocatableResource: &Resource{},
pods: []*v1.Pod{testPods[0]},
usedPorts: newHostPortInfoBuilder().add("TCP", "127.0.0.1", 80).build(),
@ -638,6 +666,8 @@ func makePodWithEphemeralStorage(nodeName, ephemeralStorage string) *v1.Pod {
}
func TestEphemeralStorageResource(t *testing.T) {
// Enable volumesOnNodeForBalancing to do balanced resource allocation
utilfeature.DefaultFeatureGate.Set(fmt.Sprintf("%s=true", features.BalanceAttachedNodeVolumes))
nodeName := "node"
podE := makePodWithEphemeralStorage(nodeName, "500")
tests := []struct {
@ -654,6 +684,7 @@ func TestEphemeralStorageResource(t *testing.T) {
MilliCPU: priorityutil.DefaultMilliCPURequest,
Memory: priorityutil.DefaultMemoryRequest,
},
TransientInfo: newTransientSchedulerInfo(),
allocatableResource: &Resource{},
pods: []*v1.Pod{podE},
usedPorts: schedutil.HostPortInfo{},
@ -681,6 +712,8 @@ func TestEphemeralStorageResource(t *testing.T) {
// TestRemovePod tests after added pod is removed, its information should also be subtracted.
func TestRemovePod(t *testing.T) {
// Enable volumesOnNodeForBalancing to do balanced resource allocation
utilfeature.DefaultFeatureGate.Set(fmt.Sprintf("%s=true", features.BalanceAttachedNodeVolumes))
nodeName := "node"
basePod := makeBasePod(t, nodeName, "test", "100m", "500", "", []v1.ContainerPort{{HostIP: "127.0.0.1", HostPort: 80, Protocol: "TCP"}})
tests := []struct {
@ -697,6 +730,7 @@ func TestRemovePod(t *testing.T) {
MilliCPU: 100,
Memory: 500,
},
TransientInfo: newTransientSchedulerInfo(),
allocatableResource: &Resource{},
pods: []*v1.Pod{basePod},
usedPorts: newHostPortInfoBuilder().add("TCP", "127.0.0.1", 80).build(),
@ -1002,6 +1036,17 @@ func BenchmarkList1kNodes30kPods(b *testing.B) {
}
}
func BenchmarkUpdate1kNodes30kPods(b *testing.B) {
// Enable volumesOnNodeForBalancing to do balanced resource allocation
utilfeature.DefaultFeatureGate.Set(fmt.Sprintf("%s=true", features.BalanceAttachedNodeVolumes))
cache := setupCacheOf1kNodes30kPods(b)
b.ResetTimer()
for n := 0; n < b.N; n++ {
cachedNodes := map[string]*NodeInfo{}
cache.UpdateNodeNameToInfoMap(cachedNodes)
}
}
func BenchmarkExpire100Pods(b *testing.B) {
benchmarkExpire(b, 100)
}


@ -19,6 +19,7 @@ package schedulercache
import (
"errors"
"fmt"
"sync"
"github.com/golang/glog"
@ -49,10 +50,15 @@ type NodeInfo struct {
// as int64, to avoid conversions and accessing map.
allocatableResource *Resource
// Cached tains of the node for faster lookup.
// Cached taints of the node for faster lookup.
taints []v1.Taint
taintsErr error
// TransientInfo holds the information pertaining to a scheduling cycle. This will be destructed at the end of
// scheduling cycle.
// TODO: @ravig. Remove this once we have a clear approach for message passing across predicates and priorities.
TransientInfo *transientSchedulerInfo
// Cached conditions of node for faster lookup.
memoryPressureCondition v1.ConditionStatus
diskPressureCondition v1.ConditionStatus
@ -62,6 +68,48 @@ type NodeInfo struct {
generation int64
}
//initializeNodeTransientInfo initializes transient information pertaining to node.
func initializeNodeTransientInfo() nodeTransientInfo {
return nodeTransientInfo{AllocatableVolumesCount: 0, RequestedVolumes: 0}
}
// nodeTransientInfo contains transient node information while scheduling.
type nodeTransientInfo struct {
// AllocatableVolumesCount contains number of volumes that could be attached to node.
AllocatableVolumesCount int
// Requested number of volumes on a particular node.
RequestedVolumes int
}
// transientSchedulerInfo is a transient structure which is destructed at the end of each scheduling cycle.
// It consists of items that are valid for a scheduling cycle and is used for message passing across predicates and
// priorities. Some examples which could be used as fields are number of volumes being used on node, current utilization
// on node etc.
// IMPORTANT NOTE: Make sure that each field in this structure is documented along with usage. Expand this structure
// only when absolutely needed as this data structure will be created and destroyed during every scheduling cycle.
type transientSchedulerInfo struct {
TransientLock sync.Mutex
// TransNodeInfo holds the transient node information for the current scheduling cycle.
TransNodeInfo nodeTransientInfo
}
// newTransientSchedulerInfo returns a new scheduler transient structure with initialized values.
func newTransientSchedulerInfo() *transientSchedulerInfo {
tsi := &transientSchedulerInfo{
TransNodeInfo: initializeNodeTransientInfo(),
}
return tsi
}
// resetTransientSchedulerInfo resets the transientSchedulerInfo.
func (transientSchedInfo *transientSchedulerInfo) resetTransientSchedulerInfo() {
transientSchedInfo.TransientLock.Lock()
defer transientSchedInfo.TransientLock.Unlock()
// Reset TransientNodeInfo.
transientSchedInfo.TransNodeInfo.AllocatableVolumesCount = 0
transientSchedInfo.TransNodeInfo.RequestedVolumes = 0
}
// Resource is a collection of compute resources.
type Resource struct {
MilliCPU int64
@ -167,6 +215,7 @@ func NewNodeInfo(pods ...*v1.Pod) *NodeInfo {
requestedResource: &Resource{},
nonzeroRequest: &Resource{},
allocatableResource: &Resource{},
TransientInfo: newTransientSchedulerInfo(),
generation: 0,
usedPorts: make(util.HostPortInfo),
}
@ -277,6 +326,7 @@ func (n *NodeInfo) Clone() *NodeInfo {
nonzeroRequest: n.nonzeroRequest.Clone(),
allocatableResource: n.allocatableResource.Clone(),
taintsErr: n.taintsErr,
TransientInfo: n.TransientInfo,
memoryPressureCondition: n.memoryPressureCondition,
diskPressureCondition: n.diskPressureCondition,
usedPorts: make(util.HostPortInfo),
@ -443,6 +493,7 @@ func (n *NodeInfo) SetNode(node *v1.Node) error {
// We ignore other conditions.
}
}
n.TransientInfo = newTransientSchedulerInfo()
n.generation++
return nil
}
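
Taken together, the new types above give predicates and priorities a per-cycle scratch pad on each NodeInfo: NewNodeInfo and SetNode attach a fresh transientSchedulerInfo, Clone shares the same pointer (which is why writes go through TransientLock), and resetTransientSchedulerInfo wipes it before the next cycle. A self-contained sketch of that lifecycle using local stand-ins for the unexported types:

```go
package main

import (
	"fmt"
	"sync"
)

// Local stand-ins for the unexported types added in this diff; field names match the PR.
type nodeTransientInfo struct {
	AllocatableVolumesCount int
	RequestedVolumes        int
}

type transientSchedulerInfo struct {
	TransientLock sync.Mutex
	TransNodeInfo nodeTransientInfo
}

func (t *transientSchedulerInfo) reset() {
	t.TransientLock.Lock()
	defer t.TransientLock.Unlock()
	t.TransNodeInfo = nodeTransientInfo{}
}

func main() {
	info := &transientSchedulerInfo{}

	// Cycle 1: a predicate records volume counts under the lock...
	info.TransientLock.Lock()
	info.TransNodeInfo = nodeTransientInfo{AllocatableVolumesCount: 4, RequestedVolumes: 2}
	info.TransientLock.Unlock()
	fmt.Println(info.TransNodeInfo) // {4 2}

	// ...and a clone of the NodeInfo shares this same pointer, so the priority
	// in the same cycle sees the values the predicate wrote.

	// Cycle 2: the scheduler cache resets the scratch pad before reuse.
	info.reset()
	fmt.Println(info.TransNodeInfo) // {0 0}
}
```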


@ -218,6 +218,7 @@ func TestNewNodeInfo(t *testing.T) {
AllowedPodNumber: 0,
ScalarResources: map[v1.ResourceName]int64(nil),
},
TransientInfo: newTransientSchedulerInfo(),
allocatableResource: &Resource{},
generation: 2,
usedPorts: util.HostPortInfo{
@ -300,6 +301,7 @@ func TestNodeInfoClone(t *testing.T) {
nodeInfo: &NodeInfo{
requestedResource: &Resource{},
nonzeroRequest: &Resource{},
TransientInfo: newTransientSchedulerInfo(),
allocatableResource: &Resource{},
generation: 2,
usedPorts: util.HostPortInfo{
@ -368,6 +370,7 @@ func TestNodeInfoClone(t *testing.T) {
expected: &NodeInfo{
requestedResource: &Resource{},
nonzeroRequest: &Resource{},
TransientInfo: newTransientSchedulerInfo(),
allocatableResource: &Resource{},
generation: 2,
usedPorts: util.HostPortInfo{
@ -526,6 +529,7 @@ func TestNodeInfoAddPod(t *testing.T) {
AllowedPodNumber: 0,
ScalarResources: map[v1.ResourceName]int64(nil),
},
TransientInfo: newTransientSchedulerInfo(),
allocatableResource: &Resource{},
generation: 2,
usedPorts: util.HostPortInfo{
@ -639,6 +643,7 @@ func TestNodeInfoRemovePod(t *testing.T) {
AllowedPodNumber: 0,
ScalarResources: map[v1.ResourceName]int64(nil),
},
TransientInfo: newTransientSchedulerInfo(),
allocatableResource: &Resource{},
generation: 2,
usedPorts: util.HostPortInfo{
@ -756,6 +761,7 @@ func TestNodeInfoRemovePod(t *testing.T) {
AllowedPodNumber: 0,
ScalarResources: map[v1.ResourceName]int64(nil),
},
TransientInfo: newTransientSchedulerInfo(),
allocatableResource: &Resource{},
generation: 3,
usedPorts: util.HostPortInfo{