diff --git a/pkg/features/kube_features.go b/pkg/features/kube_features.go index dd8f24c2895..a04eb70d15b 100644 --- a/pkg/features/kube_features.go +++ b/pkg/features/kube_features.go @@ -274,6 +274,14 @@ const ( // Allow mounting a subpath of a volume in a container // Do not remove this feature gate even though it's GA VolumeSubpath utilfeature.Feature = "VolumeSubpath" + + // owner: @ravig + // alpha: v1.11 + // + // Include volume count on node to be considered for balanced resource allocation while scheduling. + // A node which has closer cpu,memory utilization and volume count is favoured by scheduler + // while making decisions. + BalanceAttachedNodeVolumes utilfeature.Feature = "BalanceAttachedNodeVolumes" ) func init() { @@ -321,6 +329,7 @@ var defaultKubernetesFeatureGates = map[utilfeature.Feature]utilfeature.FeatureS GCERegionalPersistentDisk: {Default: true, PreRelease: utilfeature.Beta}, RunAsGroup: {Default: false, PreRelease: utilfeature.Alpha}, VolumeSubpath: {Default: true, PreRelease: utilfeature.GA}, + BalanceAttachedNodeVolumes: {Default: false, PreRelease: utilfeature.Alpha}, // inherited features from generic apiserver, relisted here to get a conflict if it is changed // unintentionally on either side: diff --git a/pkg/scheduler/algorithm/predicates/predicates.go b/pkg/scheduler/algorithm/predicates/predicates.go index 23c31e53754..c03b6b412b3 100644 --- a/pkg/scheduler/algorithm/predicates/predicates.go +++ b/pkg/scheduler/algorithm/predicates/predicates.go @@ -23,6 +23,7 @@ import ( "strconv" "sync" + "github.com/golang/glog" "k8s.io/api/core/v1" storagev1 "k8s.io/api/storage/v1" apierrors "k8s.io/apimachinery/pkg/api/errors" @@ -44,8 +45,6 @@ import ( schedutil "k8s.io/kubernetes/pkg/scheduler/util" "k8s.io/kubernetes/pkg/scheduler/volumebinder" volumeutil "k8s.io/kubernetes/pkg/volume/util" - - "github.com/golang/glog" ) const ( @@ -451,7 +450,12 @@ func (c *MaxPDVolumeCountChecker) predicate(pod *v1.Pod, meta algorithm.Predicat // violates MaxEBSVolumeCount or MaxGCEPDVolumeCount return false, []algorithm.PredicateFailureReason{ErrMaxVolumeCountExceeded}, nil } - + if nodeInfo != nil && nodeInfo.TransientInfo != nil && utilfeature.DefaultFeatureGate.Enabled(features.BalanceAttachedNodeVolumes) { + nodeInfo.TransientInfo.TransientLock.Lock() + defer nodeInfo.TransientInfo.TransientLock.Unlock() + nodeInfo.TransientInfo.TransNodeInfo.AllocatableVolumesCount = c.maxVolumes - numExistingVolumes + nodeInfo.TransientInfo.TransNodeInfo.RequestedVolumes = numNewVolumes + } return true, nil, nil } diff --git a/pkg/scheduler/algorithm/priorities/balanced_resource_allocation.go b/pkg/scheduler/algorithm/priorities/balanced_resource_allocation.go index 5f070caac13..3d655bdbbba 100644 --- a/pkg/scheduler/algorithm/priorities/balanced_resource_allocation.go +++ b/pkg/scheduler/algorithm/priorities/balanced_resource_allocation.go @@ -19,6 +19,8 @@ package priorities import ( "math" + utilfeature "k8s.io/apiserver/pkg/util/feature" + "k8s.io/kubernetes/pkg/features" schedulerapi "k8s.io/kubernetes/pkg/scheduler/api" "k8s.io/kubernetes/pkg/scheduler/schedulercache" ) @@ -36,17 +38,31 @@ var ( BalancedResourceAllocationMap = balancedResourcePriority.PriorityMap ) -func balancedResourceScorer(requested, allocable *schedulercache.Resource) int64 { +func balancedResourceScorer(requested, allocable *schedulercache.Resource, includeVolumes bool, requestedVolumes int, allocatableVolumes int) int64 { cpuFraction := fractionOfCapacity(requested.MilliCPU, allocable.MilliCPU) 
memoryFraction := fractionOfCapacity(requested.Memory, allocable.Memory) + // This to find a node which has most balanced CPU, memory and volume usage. + if includeVolumes && utilfeature.DefaultFeatureGate.Enabled(features.BalanceAttachedNodeVolumes) && allocatableVolumes > 0 { + volumeFraction := float64(requestedVolumes) / float64(allocatableVolumes) + if cpuFraction >= 1 || memoryFraction >= 1 || volumeFraction >= 1 { + // if requested >= capacity, the corresponding host should never be preferred. + return 0 + } + // Compute variance for all the three fractions. + mean := (cpuFraction + memoryFraction + volumeFraction) / float64(3) + variance := float64((((cpuFraction - mean) * (cpuFraction - mean)) + ((memoryFraction - mean) * (memoryFraction - mean)) + ((volumeFraction - mean) * (volumeFraction - mean))) / float64(3)) + // Since the variance is between positive fractions, it will be positive fraction. 1-variance lets the + // score to be higher for node which has least variance and multiplying it with 10 provides the scaling + // factor needed. + return int64((1 - variance) * float64(schedulerapi.MaxPriority)) + } if cpuFraction >= 1 || memoryFraction >= 1 { // if requested >= capacity, the corresponding host should never be preferred. return 0 } - // Upper and lower boundary of difference between cpuFraction and memoryFraction are -1 and 1 - // respectively. Multilying the absolute value of the difference by 10 scales the value to + // respectively. Multiplying the absolute value of the difference by 10 scales the value to // 0-10 with 0 representing well balanced allocation and 10 poorly balanced. Subtracting it from // 10 leads to the score which also scales from 0 to 10 while 10 representing well balanced. diff := math.Abs(cpuFraction - memoryFraction) diff --git a/pkg/scheduler/algorithm/priorities/balanced_resource_allocation_test.go b/pkg/scheduler/algorithm/priorities/balanced_resource_allocation_test.go index 9b109d109c3..0725a4d6609 100644 --- a/pkg/scheduler/algorithm/priorities/balanced_resource_allocation_test.go +++ b/pkg/scheduler/algorithm/priorities/balanced_resource_allocation_test.go @@ -17,17 +17,118 @@ limitations under the License. package priorities import ( + "fmt" "reflect" "testing" "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/resource" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + utilfeature "k8s.io/apiserver/pkg/util/feature" + "k8s.io/kubernetes/pkg/features" schedulerapi "k8s.io/kubernetes/pkg/scheduler/api" "k8s.io/kubernetes/pkg/scheduler/schedulercache" ) +// getExistingVolumeCountForNode gets the current number of volumes on node. 
+func getExistingVolumeCountForNode(pods []*v1.Pod, maxVolumes int) int { + volumeCount := 0 + for _, pod := range pods { + volumeCount += len(pod.Spec.Volumes) + } + if maxVolumes-volumeCount > 0 { + return maxVolumes - volumeCount + } + return 0 +} + func TestBalancedResourceAllocation(t *testing.T) { + // Enable volumesOnNodeForBalancing to do balanced resource allocation + utilfeature.DefaultFeatureGate.Set(fmt.Sprintf("%s=true", features.BalanceAttachedNodeVolumes)) + podwithVol1 := v1.PodSpec{ + Containers: []v1.Container{ + { + Resources: v1.ResourceRequirements{ + Requests: v1.ResourceList{ + v1.ResourceCPU: resource.MustParse("1000m"), + v1.ResourceMemory: resource.MustParse("2000"), + }, + }, + }, + { + Resources: v1.ResourceRequirements{ + Requests: v1.ResourceList{ + v1.ResourceCPU: resource.MustParse("2000m"), + v1.ResourceMemory: resource.MustParse("3000"), + }, + }, + }, + }, + Volumes: []v1.Volume{ + { + VolumeSource: v1.VolumeSource{ + AWSElasticBlockStore: &v1.AWSElasticBlockStoreVolumeSource{VolumeID: "ovp"}, + }, + }, + }, + NodeName: "machine4", + } + podwithVol2 := v1.PodSpec{ + Containers: []v1.Container{ + { + Resources: v1.ResourceRequirements{ + Requests: v1.ResourceList{ + v1.ResourceCPU: resource.MustParse("0m"), + v1.ResourceMemory: resource.MustParse("0"), + }, + }, + }, + { + Resources: v1.ResourceRequirements{ + Requests: v1.ResourceList{ + v1.ResourceCPU: resource.MustParse("0m"), + v1.ResourceMemory: resource.MustParse("0"), + }, + }, + }, + }, + Volumes: []v1.Volume{ + { + VolumeSource: v1.VolumeSource{ + AWSElasticBlockStore: &v1.AWSElasticBlockStoreVolumeSource{VolumeID: "ovp1"}, + }, + }, + }, + NodeName: "machine4", + } + podwithVol3 := v1.PodSpec{ + Containers: []v1.Container{ + { + Resources: v1.ResourceRequirements{ + Requests: v1.ResourceList{ + v1.ResourceCPU: resource.MustParse("0m"), + v1.ResourceMemory: resource.MustParse("0"), + }, + }, + }, + { + Resources: v1.ResourceRequirements{ + Requests: v1.ResourceList{ + v1.ResourceCPU: resource.MustParse("0m"), + v1.ResourceMemory: resource.MustParse("0"), + }, + }, + }, + }, + Volumes: []v1.Volume{ + { + VolumeSource: v1.VolumeSource{ + AWSElasticBlockStore: &v1.AWSElasticBlockStoreVolumeSource{VolumeID: "ovp1"}, + }, + }, + }, + NodeName: "machine4", + } labels1 := map[string]string{ "foo": "bar", "baz": "blah", @@ -89,6 +190,27 @@ func TestBalancedResourceAllocation(t *testing.T) { }, }, } + cpuAndMemory3 := v1.PodSpec{ + NodeName: "machine3", + Containers: []v1.Container{ + { + Resources: v1.ResourceRequirements{ + Requests: v1.ResourceList{ + v1.ResourceCPU: resource.MustParse("1000m"), + v1.ResourceMemory: resource.MustParse("2000"), + }, + }, + }, + { + Resources: v1.ResourceRequirements{ + Requests: v1.ResourceList{ + v1.ResourceCPU: resource.MustParse("2000m"), + v1.ResourceMemory: resource.MustParse("3000"), + }, + }, + }, + }, + } tests := []struct { pod *v1.Pod pods []*v1.Pod @@ -249,10 +371,43 @@ func TestBalancedResourceAllocation(t *testing.T) { {Spec: cpuAndMemory}, }, }, + { + /* + Machine4 will be chosen here because it already has a existing volume making the variance + of volume count, CPU usage, memory usage closer. 
+ */ + pod: &v1.Pod{ + Spec: v1.PodSpec{ + Volumes: []v1.Volume{ + { + VolumeSource: v1.VolumeSource{ + AWSElasticBlockStore: &v1.AWSElasticBlockStoreVolumeSource{VolumeID: "ovp2"}, + }, + }, + }, + }, + }, + nodes: []*v1.Node{makeNode("machine3", 3500, 40000), makeNode("machine4", 4000, 10000)}, + expectedList: []schedulerapi.HostPriority{{Host: "machine3", Score: 8}, {Host: "machine4", Score: 9}}, + test: "Include volume count on a node for balanced resource allocation", + pods: []*v1.Pod{ + {Spec: cpuAndMemory3}, + {Spec: podwithVol1}, + {Spec: podwithVol2}, + {Spec: podwithVol3}, + }, + }, } for _, test := range tests { nodeNameToInfo := schedulercache.CreateNodeNameToInfoMap(test.pods, test.nodes) + if len(test.pod.Spec.Volumes) > 0 { + maxVolumes := 5 + for _, info := range nodeNameToInfo { + info.TransientInfo.TransNodeInfo.AllocatableVolumesCount = getExistingVolumeCountForNode(info.Pods(), maxVolumes) + info.TransientInfo.TransNodeInfo.RequestedVolumes = len(test.pod.Spec.Volumes) + } + } list, err := priorityFunction(BalancedResourceAllocationMap, nil, nil)(test.pod, nodeNameToInfo, test.nodes) if err != nil { t.Errorf("unexpected error: %v", err) diff --git a/pkg/scheduler/algorithm/priorities/least_requested.go b/pkg/scheduler/algorithm/priorities/least_requested.go index 33d6e5a6c4e..b6f096b033a 100644 --- a/pkg/scheduler/algorithm/priorities/least_requested.go +++ b/pkg/scheduler/algorithm/priorities/least_requested.go @@ -33,7 +33,7 @@ var ( LeastRequestedPriorityMap = leastResourcePriority.PriorityMap ) -func leastResourceScorer(requested, allocable *schedulercache.Resource) int64 { +func leastResourceScorer(requested, allocable *schedulercache.Resource, includeVolumes bool, requestedVolumes int, allocatableVolumes int) int64 { return (leastRequestedScore(requested.MilliCPU, allocable.MilliCPU) + leastRequestedScore(requested.Memory, allocable.Memory)) / 2 } diff --git a/pkg/scheduler/algorithm/priorities/most_requested.go b/pkg/scheduler/algorithm/priorities/most_requested.go index 6cb6fe323e7..09fd7786d07 100644 --- a/pkg/scheduler/algorithm/priorities/most_requested.go +++ b/pkg/scheduler/algorithm/priorities/most_requested.go @@ -31,7 +31,7 @@ var ( MostRequestedPriorityMap = mostResourcePriority.PriorityMap ) -func mostResourceScorer(requested, allocable *schedulercache.Resource) int64 { +func mostResourceScorer(requested, allocable *schedulercache.Resource, includeVolumes bool, requestedVolumes int, allocatableVolumes int) int64 { return (mostRequestedScore(requested.MilliCPU, allocable.MilliCPU) + mostRequestedScore(requested.Memory, allocable.Memory)) / 2 } diff --git a/pkg/scheduler/algorithm/priorities/resource_allocation.go b/pkg/scheduler/algorithm/priorities/resource_allocation.go index 2d569b1d388..fc01dc11ea5 100644 --- a/pkg/scheduler/algorithm/priorities/resource_allocation.go +++ b/pkg/scheduler/algorithm/priorities/resource_allocation.go @@ -29,7 +29,7 @@ import ( // ResourceAllocationPriority contains information to calculate resource allocation priority. type ResourceAllocationPriority struct { Name string - scorer func(requested, allocable *schedulercache.Resource) int64 + scorer func(requested, allocable *schedulercache.Resource, includeVolumes bool, requestedVolumes int, allocatableVolumes int) int64 } // PriorityMap priorities nodes according to the resource allocations on the node. 
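As a rough hand check of the expected scores in the new test case ({machine3: 8, machine4: 9}), the variance branch can be exercised on its own. The fractions below are derived by hand from the test fixtures, assuming the explicitly zero CPU/memory requests in podwithVol2 and podwithVol3 are not replaced by non-zero defaults and that AllocatableVolumesCount ends up as maxVolumes (5) minus the volumes already counted on the node. This is a standalone sketch, not scheduler code:

package main

import "fmt"

// balancedScore mirrors the variance branch of balancedResourceScorer,
// with 10 standing in for schedulerapi.MaxPriority.
func balancedScore(cpuFraction, memoryFraction, volumeFraction float64) int64 {
	if cpuFraction >= 1 || memoryFraction >= 1 || volumeFraction >= 1 {
		return 0
	}
	mean := (cpuFraction + memoryFraction + volumeFraction) / 3
	variance := ((cpuFraction-mean)*(cpuFraction-mean) +
		(memoryFraction-mean)*(memoryFraction-mean) +
		(volumeFraction-mean)*(volumeFraction-mean)) / 3
	// The variance of three fractions below 1 is always under 2/9, so this
	// branch only ever returns scores of roughly 7 to 10.
	return int64((1 - variance) * 10)
}

func main() {
	// machine3: 3000m of 3500m CPU, 5000 of 40000 memory, 1 of 5 volumes.
	fmt.Println(balancedScore(3000.0/3500, 5000.0/40000, 1.0/5)) // 8
	// machine4: 3000m of 4000m CPU, 5000 of 10000 memory, 1 of 2 volumes.
	fmt.Println(balancedScore(3000.0/4000, 5000.0/10000, 1.0/2)) // 9
}

Both results match the expectedList above, with machine4 favoured because its CPU, memory, and volume fractions sit closest together.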
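The other priorities keep their behaviour: leastResourceScorer and mostResourceScorer accept the widened signature but ignore the volume arguments, and, in the resource_allocation.go hunk that follows, PriorityMap passes real counts whenever nodeInfo.TransientInfo is non-nil. Note that the len(pod.Spec.Volumes) >= 0 guard there is always true, so a non-nil TransientInfo is the effective switch; the feature-gate check itself lives inside balancedResourceScorer. A minimal standalone sketch of the shared-signature pattern, using simplified stand-in types rather than the real schedulercache.Resource:

package main

import "fmt"

// Simplified stand-ins for schedulercache.Resource and the scorer field on
// ResourceAllocationPriority; this is an illustration, not scheduler code.
type resource struct{ MilliCPU, Memory int64 }

type scorer func(requested, allocable *resource, includeVolumes bool, requestedVolumes, allocatableVolumes int) int64

// cpuMemOnly ignores the volume arguments, mirroring leastResourceScorer and
// mostResourceScorer above.
func cpuMemOnly(requested, allocable *resource, _ bool, _, _ int) int64 {
	return (10*(allocable.MilliCPU-requested.MilliCPU)/allocable.MilliCPU +
		10*(allocable.Memory-requested.Memory)/allocable.Memory) / 2
}

func main() {
	var s scorer = cpuMemOnly
	// The volume arguments are supplied but have no effect on this scorer.
	fmt.Println(s(&resource{MilliCPU: 1000, Memory: 1000}, &resource{MilliCPU: 4000, Memory: 2000}, true, 1, 5)) // 6
}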
@@ -54,8 +54,13 @@ func (r *ResourceAllocationPriority) PriorityMap( requested.MilliCPU += nodeInfo.NonZeroRequest().MilliCPU requested.Memory += nodeInfo.NonZeroRequest().Memory - - score := r.scorer(&requested, &allocatable) + var score int64 + // Check if the pod has volumes and this could be added to scorer function for balanced resource allocation. + if len(pod.Spec.Volumes) >= 0 && nodeInfo.TransientInfo != nil { + score = r.scorer(&requested, &allocatable, true, nodeInfo.TransientInfo.TransNodeInfo.RequestedVolumes, nodeInfo.TransientInfo.TransNodeInfo.AllocatableVolumesCount) + } else { + score = r.scorer(&requested, &allocatable, false, 0, 0) + } if glog.V(10) { glog.Infof( diff --git a/pkg/scheduler/schedulercache/cache.go b/pkg/scheduler/schedulercache/cache.go index b84147535de..f5fd80908d9 100644 --- a/pkg/scheduler/schedulercache/cache.go +++ b/pkg/scheduler/schedulercache/cache.go @@ -24,6 +24,8 @@ import ( "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/util/wait" + utilfeature "k8s.io/apiserver/pkg/util/feature" + "k8s.io/kubernetes/pkg/features" "github.com/golang/glog" policy "k8s.io/api/policy/v1beta1" @@ -112,6 +114,10 @@ func (cache *schedulerCache) UpdateNodeNameToInfoMap(nodeNameToInfo map[string]* cache.mu.Lock() defer cache.mu.Unlock() for name, info := range cache.nodes { + if utilfeature.DefaultFeatureGate.Enabled(features.BalanceAttachedNodeVolumes) && info.TransientInfo != nil { + // Transient scheduler info is reset here. + info.TransientInfo.resetTransientSchedulerInfo() + } if current, ok := nodeNameToInfo[name]; !ok || current.generation != info.generation { nodeNameToInfo[name] = info.Clone() } diff --git a/pkg/scheduler/schedulercache/cache_test.go b/pkg/scheduler/schedulercache/cache_test.go index 0fc2be33fb0..a4e337b6cc8 100644 --- a/pkg/scheduler/schedulercache/cache_test.go +++ b/pkg/scheduler/schedulercache/cache_test.go @@ -30,6 +30,8 @@ import ( "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/intstr" + utilfeature "k8s.io/apiserver/pkg/util/feature" + "k8s.io/kubernetes/pkg/features" priorityutil "k8s.io/kubernetes/pkg/scheduler/algorithm/priorities/util" schedutil "k8s.io/kubernetes/pkg/scheduler/util" ) @@ -73,6 +75,8 @@ func (b *hostPortInfoBuilder) build() schedutil.HostPortInfo { // TestAssumePodScheduled tests that after a pod is assumed, its information is aggregated // on node level. 
func TestAssumePodScheduled(t *testing.T) { + // Enable volumesOnNodeForBalancing to do balanced resource allocation + utilfeature.DefaultFeatureGate.Set(fmt.Sprintf("%s=true", features.BalanceAttachedNodeVolumes)) nodeName := "node" testPods := []*v1.Pod{ makeBasePod(t, nodeName, "test", "100m", "500", "", []v1.ContainerPort{{HostIP: "127.0.0.1", HostPort: 80, Protocol: "TCP"}}), @@ -99,6 +103,7 @@ func TestAssumePodScheduled(t *testing.T) { MilliCPU: 100, Memory: 500, }, + TransientInfo: newTransientSchedulerInfo(), allocatableResource: &Resource{}, pods: []*v1.Pod{testPods[0]}, usedPorts: newHostPortInfoBuilder().add("TCP", "127.0.0.1", 80).build(), @@ -114,6 +119,7 @@ func TestAssumePodScheduled(t *testing.T) { MilliCPU: 300, Memory: 1524, }, + TransientInfo: newTransientSchedulerInfo(), allocatableResource: &Resource{}, pods: []*v1.Pod{testPods[1], testPods[2]}, usedPorts: newHostPortInfoBuilder().add("TCP", "127.0.0.1", 80).add("TCP", "127.0.0.1", 8080).build(), @@ -129,6 +135,7 @@ func TestAssumePodScheduled(t *testing.T) { MilliCPU: priorityutil.DefaultMilliCPURequest, Memory: priorityutil.DefaultMemoryRequest, }, + TransientInfo: newTransientSchedulerInfo(), allocatableResource: &Resource{}, pods: []*v1.Pod{testPods[3]}, usedPorts: newHostPortInfoBuilder().add("TCP", "127.0.0.1", 80).build(), @@ -145,6 +152,7 @@ func TestAssumePodScheduled(t *testing.T) { MilliCPU: 100, Memory: 500, }, + TransientInfo: newTransientSchedulerInfo(), allocatableResource: &Resource{}, pods: []*v1.Pod{testPods[4]}, usedPorts: newHostPortInfoBuilder().add("TCP", "127.0.0.1", 80).build(), @@ -161,6 +169,7 @@ func TestAssumePodScheduled(t *testing.T) { MilliCPU: 300, Memory: 1524, }, + TransientInfo: newTransientSchedulerInfo(), allocatableResource: &Resource{}, pods: []*v1.Pod{testPods[4], testPods[5]}, usedPorts: newHostPortInfoBuilder().add("TCP", "127.0.0.1", 80).add("TCP", "127.0.0.1", 8080).build(), @@ -176,6 +185,7 @@ func TestAssumePodScheduled(t *testing.T) { MilliCPU: 100, Memory: 500, }, + TransientInfo: newTransientSchedulerInfo(), allocatableResource: &Resource{}, pods: []*v1.Pod{testPods[6]}, usedPorts: newHostPortInfoBuilder().build(), @@ -219,6 +229,8 @@ func assumeAndFinishBinding(cache *schedulerCache, pod *v1.Pod, assumedTime time // TestExpirePod tests that assumed pods will be removed if expired. // The removal will be reflected in node info. func TestExpirePod(t *testing.T) { + // Enable volumesOnNodeForBalancing to do balanced resource allocation + utilfeature.DefaultFeatureGate.Set(fmt.Sprintf("%s=true", features.BalanceAttachedNodeVolumes)) nodeName := "node" testPods := []*v1.Pod{ makeBasePod(t, nodeName, "test-1", "100m", "500", "", []v1.ContainerPort{{HostIP: "127.0.0.1", HostPort: 80, Protocol: "TCP"}}), @@ -252,6 +264,7 @@ func TestExpirePod(t *testing.T) { MilliCPU: 200, Memory: 1024, }, + TransientInfo: newTransientSchedulerInfo(), allocatableResource: &Resource{}, pods: []*v1.Pod{testPods[1]}, usedPorts: newHostPortInfoBuilder().add("TCP", "127.0.0.1", 8080).build(), @@ -276,6 +289,8 @@ func TestExpirePod(t *testing.T) { // TestAddPodWillConfirm tests that a pod being Add()ed will be confirmed if assumed. // The pod info should still exist after manually expiring unconfirmed pods. 
func TestAddPodWillConfirm(t *testing.T) { + // Enable volumesOnNodeForBalancing to do balanced resource allocation + utilfeature.DefaultFeatureGate.Set(fmt.Sprintf("%s=true", features.BalanceAttachedNodeVolumes)) nodeName := "node" now := time.Now() ttl := 10 * time.Second @@ -301,6 +316,7 @@ func TestAddPodWillConfirm(t *testing.T) { MilliCPU: 100, Memory: 500, }, + TransientInfo: newTransientSchedulerInfo(), allocatableResource: &Resource{}, pods: []*v1.Pod{testPods[0]}, usedPorts: newHostPortInfoBuilder().add("TCP", "127.0.0.1", 80).build(), @@ -396,6 +412,7 @@ func TestAddPodWillReplaceAssumed(t *testing.T) { MilliCPU: 200, Memory: 500, }, + TransientInfo: newTransientSchedulerInfo(), allocatableResource: &Resource{}, pods: []*v1.Pod{updatedPod.DeepCopy()}, usedPorts: newHostPortInfoBuilder().add("TCP", "0.0.0.0", 90).build(), @@ -430,6 +447,8 @@ func TestAddPodWillReplaceAssumed(t *testing.T) { // TestAddPodAfterExpiration tests that a pod being Add()ed will be added back if expired. func TestAddPodAfterExpiration(t *testing.T) { + // Enable volumesOnNodeForBalancing to do balanced resource allocation + utilfeature.DefaultFeatureGate.Set(fmt.Sprintf("%s=true", features.BalanceAttachedNodeVolumes)) nodeName := "node" ttl := 10 * time.Second basePod := makeBasePod(t, nodeName, "test", "100m", "500", "", []v1.ContainerPort{{HostIP: "127.0.0.1", HostPort: 80, Protocol: "TCP"}}) @@ -448,6 +467,7 @@ func TestAddPodAfterExpiration(t *testing.T) { MilliCPU: 100, Memory: 500, }, + TransientInfo: newTransientSchedulerInfo(), allocatableResource: &Resource{}, pods: []*v1.Pod{basePod}, usedPorts: newHostPortInfoBuilder().add("TCP", "127.0.0.1", 80).build(), @@ -477,6 +497,8 @@ func TestAddPodAfterExpiration(t *testing.T) { // TestUpdatePod tests that a pod will be updated if added before. 
func TestUpdatePod(t *testing.T) { + // Enable volumesOnNodeForBalancing to do balanced resource allocation + utilfeature.DefaultFeatureGate.Set(fmt.Sprintf("%s=true", features.BalanceAttachedNodeVolumes)) nodeName := "node" ttl := 10 * time.Second testPods := []*v1.Pod{ @@ -501,6 +523,7 @@ func TestUpdatePod(t *testing.T) { MilliCPU: 200, Memory: 1024, }, + TransientInfo: newTransientSchedulerInfo(), allocatableResource: &Resource{}, pods: []*v1.Pod{testPods[1]}, usedPorts: newHostPortInfoBuilder().add("TCP", "127.0.0.1", 8080).build(), @@ -513,6 +536,7 @@ func TestUpdatePod(t *testing.T) { MilliCPU: 100, Memory: 500, }, + TransientInfo: newTransientSchedulerInfo(), allocatableResource: &Resource{}, pods: []*v1.Pod{testPods[0]}, usedPorts: newHostPortInfoBuilder().add("TCP", "127.0.0.1", 80).build(), @@ -543,6 +567,8 @@ func TestUpdatePod(t *testing.T) { // TestExpireAddUpdatePod test the sequence that a pod is expired, added, then updated func TestExpireAddUpdatePod(t *testing.T) { + // Enable volumesOnNodeForBalancing to do balanced resource allocation + utilfeature.DefaultFeatureGate.Set(fmt.Sprintf("%s=true", features.BalanceAttachedNodeVolumes)) nodeName := "node" ttl := 10 * time.Second testPods := []*v1.Pod{ @@ -568,6 +594,7 @@ func TestExpireAddUpdatePod(t *testing.T) { MilliCPU: 200, Memory: 1024, }, + TransientInfo: newTransientSchedulerInfo(), allocatableResource: &Resource{}, pods: []*v1.Pod{testPods[1]}, usedPorts: newHostPortInfoBuilder().add("TCP", "127.0.0.1", 8080).build(), @@ -580,6 +607,7 @@ func TestExpireAddUpdatePod(t *testing.T) { MilliCPU: 100, Memory: 500, }, + TransientInfo: newTransientSchedulerInfo(), allocatableResource: &Resource{}, pods: []*v1.Pod{testPods[0]}, usedPorts: newHostPortInfoBuilder().add("TCP", "127.0.0.1", 80).build(), @@ -638,6 +666,8 @@ func makePodWithEphemeralStorage(nodeName, ephemeralStorage string) *v1.Pod { } func TestEphemeralStorageResource(t *testing.T) { + // Enable volumesOnNodeForBalancing to do balanced resource allocation + utilfeature.DefaultFeatureGate.Set(fmt.Sprintf("%s=true", features.BalanceAttachedNodeVolumes)) nodeName := "node" podE := makePodWithEphemeralStorage(nodeName, "500") tests := []struct { @@ -654,6 +684,7 @@ func TestEphemeralStorageResource(t *testing.T) { MilliCPU: priorityutil.DefaultMilliCPURequest, Memory: priorityutil.DefaultMemoryRequest, }, + TransientInfo: newTransientSchedulerInfo(), allocatableResource: &Resource{}, pods: []*v1.Pod{podE}, usedPorts: schedutil.HostPortInfo{}, @@ -681,6 +712,8 @@ func TestEphemeralStorageResource(t *testing.T) { // TestRemovePod tests after added pod is removed, its information should also be subtracted. 
func TestRemovePod(t *testing.T) { + // Enable volumesOnNodeForBalancing to do balanced resource allocation + utilfeature.DefaultFeatureGate.Set(fmt.Sprintf("%s=true", features.BalanceAttachedNodeVolumes)) nodeName := "node" basePod := makeBasePod(t, nodeName, "test", "100m", "500", "", []v1.ContainerPort{{HostIP: "127.0.0.1", HostPort: 80, Protocol: "TCP"}}) tests := []struct { @@ -697,6 +730,7 @@ func TestRemovePod(t *testing.T) { MilliCPU: 100, Memory: 500, }, + TransientInfo: newTransientSchedulerInfo(), allocatableResource: &Resource{}, pods: []*v1.Pod{basePod}, usedPorts: newHostPortInfoBuilder().add("TCP", "127.0.0.1", 80).build(), @@ -1002,6 +1036,17 @@ func BenchmarkList1kNodes30kPods(b *testing.B) { } } +func BenchmarkUpdate1kNodes30kPods(b *testing.B) { + // Enable volumesOnNodeForBalancing to do balanced resource allocation + utilfeature.DefaultFeatureGate.Set(fmt.Sprintf("%s=true", features.BalanceAttachedNodeVolumes)) + cache := setupCacheOf1kNodes30kPods(b) + b.ResetTimer() + for n := 0; n < b.N; n++ { + cachedNodes := map[string]*NodeInfo{} + cache.UpdateNodeNameToInfoMap(cachedNodes) + } +} + func BenchmarkExpire100Pods(b *testing.B) { benchmarkExpire(b, 100) } diff --git a/pkg/scheduler/schedulercache/node_info.go b/pkg/scheduler/schedulercache/node_info.go index 3c981982afa..9d09be74285 100644 --- a/pkg/scheduler/schedulercache/node_info.go +++ b/pkg/scheduler/schedulercache/node_info.go @@ -19,6 +19,7 @@ package schedulercache import ( "errors" "fmt" + "sync" "github.com/golang/glog" @@ -49,10 +50,15 @@ type NodeInfo struct { // as int64, to avoid conversions and accessing map. allocatableResource *Resource - // Cached tains of the node for faster lookup. + // Cached taints of the node for faster lookup. taints []v1.Taint taintsErr error + // TransientInfo holds the information pertaining to a scheduling cycle. This will be destructed at the end of + // scheduling cycle. + // TODO: @ravig. Remove this once we have a clear approach for message passing across predicates and priorities. + TransientInfo *transientSchedulerInfo + // Cached conditions of node for faster lookup. memoryPressureCondition v1.ConditionStatus diskPressureCondition v1.ConditionStatus @@ -62,6 +68,48 @@ type NodeInfo struct { generation int64 } +//initializeNodeTransientInfo initializes transient information pertaining to node. +func initializeNodeTransientInfo() nodeTransientInfo { + return nodeTransientInfo{AllocatableVolumesCount: 0, RequestedVolumes: 0} +} + +// nodeTransientInfo contains transient node information while scheduling. +type nodeTransientInfo struct { + // AllocatableVolumesCount contains number of volumes that could be attached to node. + AllocatableVolumesCount int + // Requested number of volumes on a particular node. + RequestedVolumes int +} + +// transientSchedulerInfo is a transient structure which is destructed at the end of each scheduling cycle. +// It consists of items that are valid for a scheduling cycle and is used for message passing across predicates and +// priorities. Some examples which could be used as fields are number of volumes being used on node, current utilization +// on node etc. +// IMPORTANT NOTE: Make sure that each field in this structure is documented along with usage. Expand this structure +// only when absolutely needed as this data structure will be created and destroyed during every scheduling cycle. +type transientSchedulerInfo struct { + TransientLock sync.Mutex + // NodeTransInfo holds the information related to nodeTransientInformation. 
NodeName is the key here. + TransNodeInfo nodeTransientInfo +} + +// newTransientSchedulerInfo returns a new scheduler transient structure with initialized values. +func newTransientSchedulerInfo() *transientSchedulerInfo { + tsi := &transientSchedulerInfo{ + TransNodeInfo: initializeNodeTransientInfo(), + } + return tsi +} + +// resetTransientSchedulerInfo resets the transientSchedulerInfo. +func (transientSchedInfo *transientSchedulerInfo) resetTransientSchedulerInfo() { + transientSchedInfo.TransientLock.Lock() + defer transientSchedInfo.TransientLock.Unlock() + // Reset TransientNodeInfo. + transientSchedInfo.TransNodeInfo.AllocatableVolumesCount = 0 + transientSchedInfo.TransNodeInfo.RequestedVolumes = 0 +} + // Resource is a collection of compute resource. type Resource struct { MilliCPU int64 @@ -167,6 +215,7 @@ func NewNodeInfo(pods ...*v1.Pod) *NodeInfo { requestedResource: &Resource{}, nonzeroRequest: &Resource{}, allocatableResource: &Resource{}, + TransientInfo: newTransientSchedulerInfo(), generation: 0, usedPorts: make(util.HostPortInfo), } @@ -277,6 +326,7 @@ func (n *NodeInfo) Clone() *NodeInfo { nonzeroRequest: n.nonzeroRequest.Clone(), allocatableResource: n.allocatableResource.Clone(), taintsErr: n.taintsErr, + TransientInfo: n.TransientInfo, memoryPressureCondition: n.memoryPressureCondition, diskPressureCondition: n.diskPressureCondition, usedPorts: make(util.HostPortInfo), @@ -443,6 +493,7 @@ func (n *NodeInfo) SetNode(node *v1.Node) error { // We ignore other conditions. } } + n.TransientInfo = newTransientSchedulerInfo() n.generation++ return nil } diff --git a/pkg/scheduler/schedulercache/node_info_test.go b/pkg/scheduler/schedulercache/node_info_test.go index 583b2d08d38..03b3646a8c2 100644 --- a/pkg/scheduler/schedulercache/node_info_test.go +++ b/pkg/scheduler/schedulercache/node_info_test.go @@ -218,6 +218,7 @@ func TestNewNodeInfo(t *testing.T) { AllowedPodNumber: 0, ScalarResources: map[v1.ResourceName]int64(nil), }, + TransientInfo: newTransientSchedulerInfo(), allocatableResource: &Resource{}, generation: 2, usedPorts: util.HostPortInfo{ @@ -300,6 +301,7 @@ func TestNodeInfoClone(t *testing.T) { nodeInfo: &NodeInfo{ requestedResource: &Resource{}, nonzeroRequest: &Resource{}, + TransientInfo: newTransientSchedulerInfo(), allocatableResource: &Resource{}, generation: 2, usedPorts: util.HostPortInfo{ @@ -368,6 +370,7 @@ func TestNodeInfoClone(t *testing.T) { expected: &NodeInfo{ requestedResource: &Resource{}, nonzeroRequest: &Resource{}, + TransientInfo: newTransientSchedulerInfo(), allocatableResource: &Resource{}, generation: 2, usedPorts: util.HostPortInfo{ @@ -526,6 +529,7 @@ func TestNodeInfoAddPod(t *testing.T) { AllowedPodNumber: 0, ScalarResources: map[v1.ResourceName]int64(nil), }, + TransientInfo: newTransientSchedulerInfo(), allocatableResource: &Resource{}, generation: 2, usedPorts: util.HostPortInfo{ @@ -639,6 +643,7 @@ func TestNodeInfoRemovePod(t *testing.T) { AllowedPodNumber: 0, ScalarResources: map[v1.ResourceName]int64(nil), }, + TransientInfo: newTransientSchedulerInfo(), allocatableResource: &Resource{}, generation: 2, usedPorts: util.HostPortInfo{ @@ -756,6 +761,7 @@ func TestNodeInfoRemovePod(t *testing.T) { AllowedPodNumber: 0, ScalarResources: map[v1.ResourceName]int64(nil), }, + TransientInfo: newTransientSchedulerInfo(), allocatableResource: &Resource{}, generation: 3, usedPorts: util.HostPortInfo{
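Taken together, the NodeInfo changes give predicates and priorities a per-scheduling-cycle side channel: the MaxPDVolumeCount predicate stores maxVolumes - numExistingVolumes and numNewVolumes under TransientLock, the balanced-resource priority reads them, and UpdateNodeNameToInfoMap zeroes them again when the gate is on. Note also that Clone copies the TransientInfo pointer, so a cloned NodeInfo shares these counters with the original. A compressed, self-contained sketch of that lifecycle, with simplified stand-in types rather than the real scheduler cache:

package main

import (
	"fmt"
	"sync"
)

type nodeTransientInfo struct {
	AllocatableVolumesCount int
	RequestedVolumes        int
}

type transientSchedulerInfo struct {
	TransientLock sync.Mutex
	TransNodeInfo nodeTransientInfo
}

// recordVolumes is roughly what the MaxPDVolumeCount predicate does once the
// pod fits: it publishes the remaining and requested volume counts.
func recordVolumes(t *transientSchedulerInfo, maxVolumes, existing, requested int) {
	t.TransientLock.Lock()
	defer t.TransientLock.Unlock()
	t.TransNodeInfo.AllocatableVolumesCount = maxVolumes - existing
	t.TransNodeInfo.RequestedVolumes = requested
}

// reset is what resetTransientSchedulerInfo does when the cache refreshes the
// node map for the next scheduling cycle.
func reset(t *transientSchedulerInfo) {
	t.TransientLock.Lock()
	defer t.TransientLock.Unlock()
	t.TransNodeInfo = nodeTransientInfo{}
}

func main() {
	info := &transientSchedulerInfo{}
	recordVolumes(info, 5, 3, 1)            // predicate phase
	fmt.Printf("%+v\n", info.TransNodeInfo) // {AllocatableVolumesCount:2 RequestedVolumes:1}
	reset(info)                             // next cycle
	fmt.Printf("%+v\n", info.TransNodeInfo) // {AllocatableVolumesCount:0 RequestedVolumes:0}
}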
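One caveat on the test changes: utilfeature.DefaultFeatureGate.Set mutates process-global state and its error is ignored, and none of the tests above turn the gate back off, so the setting leaks into later tests in the same package. A hedged sketch of a restoring helper built only from the Set call already used in the diff; the helper name is hypothetical and it would live next to the tests:

import (
	"fmt"
	"testing"

	utilfeature "k8s.io/apiserver/pkg/util/feature"
	"k8s.io/kubernetes/pkg/features"
)

// enableBalanceAttachedNodeVolumes is a hypothetical helper for tests like the
// ones above: it turns the gate on and returns a function that restores the
// registered default (false).
func enableBalanceAttachedNodeVolumes(t *testing.T) func() {
	if err := utilfeature.DefaultFeatureGate.Set(fmt.Sprintf("%s=true", features.BalanceAttachedNodeVolumes)); err != nil {
		t.Fatal(err)
	}
	return func() {
		if err := utilfeature.DefaultFeatureGate.Set(fmt.Sprintf("%s=false", features.BalanceAttachedNodeVolumes)); err != nil {
			t.Fatal(err)
		}
	}
}

Each test would then begin with defer enableBalanceAttachedNodeVolumes(t)() instead of a bare Set call.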