Remove the BalanceAttachedNodeVolumes feature gate

kubernetes#60525 introduced the BalanceAttachedNodeVolumes feature gate,
which adds a node's attached-volume count to the signals used when
prioritizing nodes. The gate was added for the Red Hat OpenShift Online
environment, which is no longer in use, so this change removes it to keep
the scheduler code base easier to maintain, as discussed in
kubernetes#101489 (comment)
ravisantoshgudimetla 2021-06-22 11:16:32 -04:00
parent 6dd9deea3d
commit b6c75bee15
14 changed files with 29 additions and 321 deletions
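
For context, the scoring path this commit deletes (the includeVolumes branch of balancedResourceScorer, visible in the diff below) favored nodes whose CPU, memory, and volume utilization fractions had the lowest variance. The standalone sketch below reproduces that arithmetic outside the scheduler; the function name, the hard-coded maxNodeScore constant, and the sample inputs are illustrative assumptions, not part of the scheduler's API.

package main

import (
    "fmt"
    "math"
)

// maxNodeScore stands in for framework.MaxNodeScore (100) so the sketch is self-contained.
const maxNodeScore = 100

// balancedScoreWithVolumes mirrors the arithmetic of the deleted includeVolumes branch:
// score a node by the variance of its CPU, memory, and volume utilization fractions,
// where lower variance (better balanced usage) yields a higher score.
func balancedScoreWithVolumes(cpuFraction, memoryFraction float64, requestedVolumes, allocatableVolumes int) int64 {
    volumeFraction := float64(requestedVolumes) / float64(allocatableVolumes)
    if volumeFraction >= 1 {
        // If requested volumes reach or exceed capacity, never prefer this node.
        return 0
    }
    mean := (cpuFraction + memoryFraction + volumeFraction) / 3
    variance := (math.Pow(cpuFraction-mean, 2) +
        math.Pow(memoryFraction-mean, 2) +
        math.Pow(volumeFraction-mean, 2)) / 3
    // The variance of fractions in [0, 1] stays below 1, so 1-variance scales cleanly to 0..maxNodeScore.
    return int64((1 - variance) * maxNodeScore)
}

func main() {
    fmt.Println(balancedScoreWithVolumes(0.5, 0.5, 1, 2))  // perfectly balanced usage: 100
    fmt.Println(balancedScoreWithVolumes(0.9, 0.1, 1, 10)) // skewed usage: 85
}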

View File

@@ -187,14 +187,6 @@ const (
     // Do not remove this feature gate even though it's GA
     VolumeSubpath featuregate.Feature = "VolumeSubpath"
-    // owner: @ravig
-    // alpha: v1.11
-    //
-    // Include volume count on node to be considered for balanced resource allocation while scheduling.
-    // A node which has closer cpu,memory utilization and volume count is favoured by scheduler
-    // while making decisions.
-    BalanceAttachedNodeVolumes featuregate.Feature = "BalanceAttachedNodeVolumes"
     // owner: @pohly
     // alpha: v1.14
     // beta: v1.16

@@ -774,7 +766,6 @@ var defaultKubernetesFeatureGates = map[featuregate.Feature]featuregate.FeatureS
     InTreePluginOpenStackUnregister: {Default: false, PreRelease: featuregate.Alpha},
     VolumeSubpath: {Default: true, PreRelease: featuregate.GA},
     ConfigurableFSGroupPolicy: {Default: true, PreRelease: featuregate.Beta},
-    BalanceAttachedNodeVolumes: {Default: false, PreRelease: featuregate.Alpha},
     CSIInlineVolume: {Default: true, PreRelease: featuregate.Beta},
     CSIStorageCapacity: {Default: true, PreRelease: featuregate.Beta},
     CSIServiceAccountToken: {Default: true, PreRelease: featuregate.Beta},

View File

@@ -23,5 +23,4 @@ type Features struct {
     EnablePodAffinityNamespaceSelector bool
     EnablePodDisruptionBudget bool
     EnablePodOverhead bool
-    EnableBalanceAttachedNodeVolumes bool
 }

View File

@@ -75,14 +75,12 @@ func NewBalancedAllocation(_ runtime.Object, h framework.Handle, fts feature.Fea
             scorer: balancedResourceScorer,
             resourceToWeightMap: defaultRequestedRatioResources,
             enablePodOverhead: fts.EnablePodOverhead,
-            enableBalanceAttachedNodeVolumes: fts.EnableBalanceAttachedNodeVolumes,
         },
     }, nil
 }

 // todo: use resource weights in the scorer function
-func balancedResourceScorer(requested, allocable resourceToValueMap, includeVolumes bool, requestedVolumes int, allocatableVolumes int) int64 {
-    // This to find a node which has most balanced CPU, memory and volume usage.
+func balancedResourceScorer(requested, allocable resourceToValueMap) int64 {
     cpuFraction := fractionOfCapacity(requested[v1.ResourceCPU], allocable[v1.ResourceCPU])
     memoryFraction := fractionOfCapacity(requested[v1.ResourceMemory], allocable[v1.ResourceMemory])
     // fractions might be greater than 1 because pods with no requests get minimum

@@ -94,22 +92,6 @@ func balancedResourceScorer(requested, allocable resourceToValueMap, includeVolu
         memoryFraction = 1
     }
-    // includeVolumes is only true when BalanceAttachedNodeVolumes feature gate is enabled (see resource_allocation.go#score())
-    if includeVolumes && allocatableVolumes > 0 {
-        volumeFraction := float64(requestedVolumes) / float64(allocatableVolumes)
-        if volumeFraction >= 1 {
-            // if requested >= capacity, the corresponding host should never be preferred.
-            return 0
-        }
-        // Compute variance for all the three fractions.
-        mean := (cpuFraction + memoryFraction + volumeFraction) / float64(3)
-        variance := float64((((cpuFraction - mean) * (cpuFraction - mean)) + ((memoryFraction - mean) * (memoryFraction - mean)) + ((volumeFraction - mean) * (volumeFraction - mean))) / float64(3))
-        // Since the variance is between positive fractions, it will be positive fraction. 1-variance lets the
-        // score to be higher for node which has least variance and multiplying it with `MaxNodeScore` provides the scaling
-        // factor needed.
-        return int64((1 - variance) * float64(framework.MaxNodeScore))
-    }
     // Upper and lower boundary of difference between cpuFraction and memoryFraction are -1 and 1
     // respectively. Multiplying the absolute value of the difference by `MaxNodeScore` scales the value to
     // 0-MaxNodeScore with 0 representing well balanced allocation and `MaxNodeScore` poorly balanced. Subtracting it from

View File

@@ -24,114 +24,13 @@ import (
     v1 "k8s.io/api/core/v1"
     "k8s.io/apimachinery/pkg/api/resource"
     metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
-    utilfeature "k8s.io/apiserver/pkg/util/feature"
-    featuregatetesting "k8s.io/component-base/featuregate/testing"
-    "k8s.io/kubernetes/pkg/features"
     "k8s.io/kubernetes/pkg/scheduler/framework"
     "k8s.io/kubernetes/pkg/scheduler/framework/plugins/feature"
     "k8s.io/kubernetes/pkg/scheduler/framework/runtime"
     "k8s.io/kubernetes/pkg/scheduler/internal/cache"
 )

-// getExistingVolumeCountForNode gets the current number of volumes on node.
-func getExistingVolumeCountForNode(podInfos []*framework.PodInfo, maxVolumes int) int {
-    volumeCount := 0
-    for _, p := range podInfos {
-        volumeCount += len(p.Pod.Spec.Volumes)
-    }
-    if maxVolumes-volumeCount > 0 {
-        return maxVolumes - volumeCount
-    }
-    return 0
-}
-
 func TestNodeResourcesBalancedAllocation(t *testing.T) {
-    // Enable volumesOnNodeForBalancing to do balanced node resource allocation
-    defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.BalanceAttachedNodeVolumes, true)()
-    podwithVol1 := v1.PodSpec{
-        Containers: []v1.Container{
-            {
-                Resources: v1.ResourceRequirements{
-                    Requests: v1.ResourceList{
-                        v1.ResourceCPU: resource.MustParse("1000m"),
-                        v1.ResourceMemory: resource.MustParse("2000"),
-                    },
-                },
-            },
-            {
-                Resources: v1.ResourceRequirements{
-                    Requests: v1.ResourceList{
-                        v1.ResourceCPU: resource.MustParse("2000m"),
-                        v1.ResourceMemory: resource.MustParse("3000"),
-                    },
-                },
-            },
-        },
-        Volumes: []v1.Volume{
-            {
-                VolumeSource: v1.VolumeSource{
-                    AWSElasticBlockStore: &v1.AWSElasticBlockStoreVolumeSource{VolumeID: "ovp"},
-                },
-            },
-        },
-        NodeName: "machine4",
-    }
-    podwithVol2 := v1.PodSpec{
-        Containers: []v1.Container{
-            {
-                Resources: v1.ResourceRequirements{
-                    Requests: v1.ResourceList{
-                        v1.ResourceCPU: resource.MustParse("0m"),
-                        v1.ResourceMemory: resource.MustParse("0"),
-                    },
-                },
-            },
-            {
-                Resources: v1.ResourceRequirements{
-                    Requests: v1.ResourceList{
-                        v1.ResourceCPU: resource.MustParse("0m"),
-                        v1.ResourceMemory: resource.MustParse("0"),
-                    },
-                },
-            },
-        },
-        Volumes: []v1.Volume{
-            {
-                VolumeSource: v1.VolumeSource{
-                    AWSElasticBlockStore: &v1.AWSElasticBlockStoreVolumeSource{VolumeID: "ovp1"},
-                },
-            },
-        },
-        NodeName: "machine4",
-    }
-    podwithVol3 := v1.PodSpec{
-        Containers: []v1.Container{
-            {
-                Resources: v1.ResourceRequirements{
-                    Requests: v1.ResourceList{
-                        v1.ResourceCPU: resource.MustParse("0m"),
-                        v1.ResourceMemory: resource.MustParse("0"),
-                    },
-                },
-            },
-            {
-                Resources: v1.ResourceRequirements{
-                    Requests: v1.ResourceList{
-                        v1.ResourceCPU: resource.MustParse("0m"),
-                        v1.ResourceMemory: resource.MustParse("0"),
-                    },
-                },
-            },
-        },
-        Volumes: []v1.Volume{
-            {
-                VolumeSource: v1.VolumeSource{
-                    AWSElasticBlockStore: &v1.AWSElasticBlockStoreVolumeSource{VolumeID: "ovp1"},
-                },
-            },
-        },
-        NodeName: "machine4",
-    }
     labels1 := map[string]string{
         "foo": "bar",
         "baz": "blah",

@@ -193,27 +92,6 @@ func TestNodeResourcesBalancedAllocation(t *testing.T) {
             },
         },
     }
-    cpuAndMemory3 := v1.PodSpec{
-        NodeName: "machine3",
-        Containers: []v1.Container{
-            {
-                Resources: v1.ResourceRequirements{
-                    Requests: v1.ResourceList{
-                        v1.ResourceCPU: resource.MustParse("1000m"),
-                        v1.ResourceMemory: resource.MustParse("2000"),
-                    },
-                },
-            },
-            {
-                Resources: v1.ResourceRequirements{
-                    Requests: v1.ResourceList{
-                        v1.ResourceCPU: resource.MustParse("2000m"),
-                        v1.ResourceMemory: resource.MustParse("3000"),
-                    },
-                },
-            },
-        },
-    }
     nonZeroContainer := v1.PodSpec{
         Containers: []v1.Container{{}},
     }

@@ -368,45 +246,13 @@ func TestNodeResourcesBalancedAllocation(t *testing.T) {
                 {Spec: cpuAndMemory},
             },
         },
-        {
-            // Machine4 will be chosen here because it already has a existing volume making the variance
-            // of volume count, CPU usage, memory usage closer.
-            pod: &v1.Pod{
-                Spec: v1.PodSpec{
-                    Volumes: []v1.Volume{
-                        {
-                            VolumeSource: v1.VolumeSource{
-                                AWSElasticBlockStore: &v1.AWSElasticBlockStoreVolumeSource{VolumeID: "ovp2"},
-                            },
-                        },
-                    },
-                },
-            },
-            nodes: []*v1.Node{makeNode("machine3", 3500, 40000), makeNode("machine4", 4000, 10000)},
-            expectedList: []framework.NodeScore{{Name: "machine3", Score: 89}, {Name: "machine4", Score: 98}},
-            name: "Include volume count on a node for balanced resource allocation",
-            pods: []*v1.Pod{
-                {Spec: cpuAndMemory3},
-                {Spec: podwithVol1},
-                {Spec: podwithVol2},
-                {Spec: podwithVol3},
-            },
-        },
     }

     for _, test := range tests {
         t.Run(test.name, func(t *testing.T) {
             snapshot := cache.NewSnapshot(test.pods, test.nodes)
-            if len(test.pod.Spec.Volumes) > 0 {
-                maxVolumes := 5
-                nodeInfoList, _ := snapshot.NodeInfos().List()
-                for _, info := range nodeInfoList {
-                    info.TransientInfo.TransNodeInfo.AllocatableVolumesCount = getExistingVolumeCountForNode(info.Pods, maxVolumes)
-                    info.TransientInfo.TransNodeInfo.RequestedVolumes = len(test.pod.Spec.Volumes)
-                }
-            }
             fh, _ := runtime.NewFramework(nil, nil, runtime.WithSnapshotSharedLister(snapshot))
-            p, _ := NewBalancedAllocation(nil, fh, feature.Features{EnablePodOverhead: true, EnableBalanceAttachedNodeVolumes: true})
+            p, _ := NewBalancedAllocation(nil, fh, feature.Features{EnablePodOverhead: true})
             for i := range test.nodes {
                 hostResult, err := p.(framework.ScorePlugin).Score(context.Background(), nil, test.pod, test.nodes[i].Name)

View File

@@ -92,8 +92,8 @@ func NewLeastAllocated(laArgs runtime.Object, h framework.Handle, fts feature.Fe
     }, nil
 }

-func leastResourceScorer(resToWeightMap resourceToWeightMap) func(resourceToValueMap, resourceToValueMap, bool, int, int) int64 {
-    return func(requested, allocable resourceToValueMap, includeVolumes bool, requestedVolumes int, allocatableVolumes int) int64 {
+func leastResourceScorer(resToWeightMap resourceToWeightMap) func(resourceToValueMap, resourceToValueMap) int64 {
+    return func(requested, allocable resourceToValueMap) int64 {
         var nodeScore, weightSum int64
         for resource, weight := range resToWeightMap {
             resourceScore := leastRequestedScore(requested[resource], allocable[resource])

View File

@@ -90,8 +90,8 @@ func NewMostAllocated(maArgs runtime.Object, h framework.Handle, fts feature.Fea
     }, nil
 }

-func mostResourceScorer(resToWeightMap resourceToWeightMap) func(requested, allocable resourceToValueMap, includeVolumes bool, requestedVolumes int, allocatableVolumes int) int64 {
-    return func(requested, allocable resourceToValueMap, includeVolumes bool, requestedVolumes int, allocatableVolumes int) int64 {
+func mostResourceScorer(resToWeightMap resourceToWeightMap) func(requested, allocable resourceToValueMap) int64 {
+    return func(requested, allocable resourceToValueMap) int64 {
         var nodeScore, weightSum int64
         for resource, weight := range resToWeightMap {
             resourceScore := mostRequestedScore(requested[resource], allocable[resource])

View File

@@ -114,7 +114,7 @@ func (pl *RequestedToCapacityRatio) ScoreExtensions() framework.ScoreExtensions
     return nil
 }

-func buildRequestedToCapacityRatioScorerFunction(scoringFunctionShape helper.FunctionShape, resourceToWeightMap resourceToWeightMap) func(resourceToValueMap, resourceToValueMap, bool, int, int) int64 {
+func buildRequestedToCapacityRatioScorerFunction(scoringFunctionShape helper.FunctionShape, resourceToWeightMap resourceToWeightMap) func(resourceToValueMap, resourceToValueMap) int64 {
     rawScoringFunction := helper.BuildBrokenLinearFunction(scoringFunctionShape)
     resourceScoringFunction := func(requested, capacity int64) int64 {
         if capacity == 0 || requested > capacity {

@@ -123,7 +123,7 @@ func buildRequestedToCapacityRatioScorerFunction(scoringFunctionShape helper.Fun
         return rawScoringFunction(maxUtilization - (capacity-requested)*maxUtilization/capacity)
     }

-    return func(requested, allocable resourceToValueMap, includeVolumes bool, requestedVolumes int, allocatableVolumes int) int64 {
+    return func(requested, allocable resourceToValueMap) int64 {
         var nodeScore, weightSum int64
         for resource, weight := range resourceToWeightMap {
             resourceScore := resourceScoringFunction(requested[resource], allocable[resource])

View File

@@ -32,11 +32,10 @@ var defaultRequestedRatioResources = resourceToWeightMap{v1.ResourceMemory: 1, v
 // resourceAllocationScorer contains information to calculate resource allocation score.
 type resourceAllocationScorer struct {
     Name string
-    scorer func(requested, allocable resourceToValueMap, includeVolumes bool, requestedVolumes int, allocatableVolumes int) int64
+    scorer func(requested, allocable resourceToValueMap) int64
     resourceToWeightMap resourceToWeightMap
     enablePodOverhead bool
-    enableBalanceAttachedNodeVolumes bool
 }

 // resourceToValueMap contains resource name and score.

@@ -60,29 +59,14 @@ func (r *resourceAllocationScorer) score(
     }
     var score int64
-    // Check if the pod has volumes and this could be added to scorer function for balanced resource allocation.
-    if len(pod.Spec.Volumes) > 0 && r.enableBalanceAttachedNodeVolumes && nodeInfo.TransientInfo != nil {
-        score = r.scorer(requested, allocatable, true, nodeInfo.TransientInfo.TransNodeInfo.RequestedVolumes, nodeInfo.TransientInfo.TransNodeInfo.AllocatableVolumesCount)
-    } else {
-        score = r.scorer(requested, allocatable, false, 0, 0)
-    }
+    score = r.scorer(requested, allocatable)

     if klog.V(10).Enabled() {
-        if len(pod.Spec.Volumes) > 0 && r.enableBalanceAttachedNodeVolumes && nodeInfo.TransientInfo != nil {
-            klog.Infof(
-                "%v -> %v: %v, map of allocatable resources %v, map of requested resources %v , allocatable volumes %d, requested volumes %d, score %d",
-                pod.Name, node.Name, r.Name,
-                allocatable, requested, nodeInfo.TransientInfo.TransNodeInfo.AllocatableVolumesCount,
-                nodeInfo.TransientInfo.TransNodeInfo.RequestedVolumes,
-                score,
-            )
-        } else {
         klog.Infof(
             "%v -> %v: %v, map of allocatable resources %v, map of requested resources %v ,score %d,",
             pod.Name, node.Name, r.Name,
             allocatable, requested, score,
         )
-        }
     }

     return score, nil

View File

@@ -29,13 +29,11 @@ import (
     "k8s.io/apimachinery/pkg/runtime"
     "k8s.io/apimachinery/pkg/util/rand"
     "k8s.io/apimachinery/pkg/util/sets"
-    utilfeature "k8s.io/apiserver/pkg/util/feature"
     "k8s.io/client-go/informers"
     corelisters "k8s.io/client-go/listers/core/v1"
     storagelisters "k8s.io/client-go/listers/storage/v1"
     csilibplugins "k8s.io/csi-translation-lib/plugins"
     "k8s.io/klog/v2"
-    "k8s.io/kubernetes/pkg/features"
     "k8s.io/kubernetes/pkg/scheduler/framework"
     "k8s.io/kubernetes/pkg/scheduler/framework/plugins/names"
     volumeutil "k8s.io/kubernetes/pkg/volume/util"

@@ -269,12 +267,6 @@ func (pl *nonCSILimits) Filter(ctx context.Context, _ *framework.CycleState, pod
     if numExistingVolumes+numNewVolumes > maxAttachLimit {
         return framework.NewStatus(framework.Unschedulable, ErrReasonMaxVolumeCountExceeded)
     }
-    if nodeInfo != nil && nodeInfo.TransientInfo != nil && utilfeature.DefaultFeatureGate.Enabled(features.BalanceAttachedNodeVolumes) {
-        nodeInfo.TransientInfo.TransientLock.Lock()
-        defer nodeInfo.TransientInfo.TransientLock.Unlock()
-        nodeInfo.TransientInfo.TransNodeInfo.AllocatableVolumesCount = maxAttachLimit - numExistingVolumes
-        nodeInfo.TransientInfo.TransNodeInfo.RequestedVolumes = numNewVolumes
-    }
     return nil
 }

View File

@@ -53,7 +53,6 @@ func NewInTreeRegistry() runtime.Registry {
     EnablePodAffinityNamespaceSelector: feature.DefaultFeatureGate.Enabled(features.PodAffinityNamespaceSelector),
     EnablePodDisruptionBudget: feature.DefaultFeatureGate.Enabled(features.PodDisruptionBudget),
     EnablePodOverhead: feature.DefaultFeatureGate.Enabled(features.PodOverhead),
-    EnableBalanceAttachedNodeVolumes: feature.DefaultFeatureGate.Enabled(features.BalanceAttachedNodeVolumes),
 }

 return runtime.Registry{

View File

@@ -21,7 +21,6 @@ import (
     "fmt"
     "sort"
     "strings"
-    "sync"
     "sync/atomic"
     "time"

@@ -388,21 +387,11 @@ type NodeInfo struct {
     // state information.
     ImageStates map[string]*ImageStateSummary
-    // TransientInfo holds the information pertaining to a scheduling cycle. This will be destructed at the end of
-    // scheduling cycle.
-    // TODO: @ravig. Remove this once we have a clear approach for message passing across predicates and priorities.
-    TransientInfo *TransientSchedulerInfo
     // Whenever NodeInfo changes, generation is bumped.
     // This is used to avoid cloning it if the object didn't change.
     Generation int64
 }

-//initializeNodeTransientInfo initializes transient information pertaining to node.
-func initializeNodeTransientInfo() nodeTransientInfo {
-    return nodeTransientInfo{AllocatableVolumesCount: 0, RequestedVolumes: 0}
-}
-
 // nextGeneration: Let's make sure history never forgets the name...
 // Increments the generation number monotonically ensuring that generation numbers never collide.
 // Collision of the generation numbers would be particularly problematic if a node was deleted and

@@ -411,43 +400,6 @@ func nextGeneration() int64 {
     return atomic.AddInt64(&generation, 1)
 }

-// nodeTransientInfo contains transient node information while scheduling.
-type nodeTransientInfo struct {
-    // AllocatableVolumesCount contains number of volumes that could be attached to node.
-    AllocatableVolumesCount int
-    // Requested number of volumes on a particular node.
-    RequestedVolumes int
-}
-
-// TransientSchedulerInfo is a transient structure which is destructed at the end of each scheduling cycle.
-// It consists of items that are valid for a scheduling cycle and is used for message passing across predicates and
-// priorities. Some examples which could be used as fields are number of volumes being used on node, current utilization
-// on node etc.
-// IMPORTANT NOTE: Make sure that each field in this structure is documented along with usage. Expand this structure
-// only when absolutely needed as this data structure will be created and destroyed during every scheduling cycle.
-type TransientSchedulerInfo struct {
-    TransientLock sync.Mutex
-    // NodeTransInfo holds the information related to nodeTransientInformation. NodeName is the key here.
-    TransNodeInfo nodeTransientInfo
-}
-
-// NewTransientSchedulerInfo returns a new scheduler transient structure with initialized values.
-func NewTransientSchedulerInfo() *TransientSchedulerInfo {
-    tsi := &TransientSchedulerInfo{
-        TransNodeInfo: initializeNodeTransientInfo(),
-    }
-    return tsi
-}
-
-// ResetTransientSchedulerInfo resets the TransientSchedulerInfo.
-func (transientSchedInfo *TransientSchedulerInfo) ResetTransientSchedulerInfo() {
-    transientSchedInfo.TransientLock.Lock()
-    defer transientSchedInfo.TransientLock.Unlock()
-    // Reset TransientNodeInfo.
-    transientSchedInfo.TransNodeInfo.AllocatableVolumesCount = 0
-    transientSchedInfo.TransNodeInfo.RequestedVolumes = 0
-}
-
 // Resource is a collection of compute resource.
 type Resource struct {
     MilliCPU int64

@@ -557,7 +509,6 @@ func NewNodeInfo(pods ...*v1.Pod) *NodeInfo {
     Requested: &Resource{},
     NonZeroRequested: &Resource{},
     Allocatable: &Resource{},
-    TransientInfo: NewTransientSchedulerInfo(),
     Generation: nextGeneration(),
     UsedPorts: make(HostPortInfo),
     ImageStates: make(map[string]*ImageStateSummary),

@@ -583,7 +534,6 @@ func (n *NodeInfo) Clone() *NodeInfo {
     Requested: n.Requested.Clone(),
     NonZeroRequested: n.NonZeroRequested.Clone(),
     Allocatable: n.Allocatable.Clone(),
-    TransientInfo: n.TransientInfo,
     UsedPorts: make(HostPortInfo),
     ImageStates: n.ImageStates,
     Generation: n.Generation,

@@ -803,7 +753,6 @@ func (n *NodeInfo) updateUsedPorts(pod *v1.Pod, add bool) {
 func (n *NodeInfo) SetNode(node *v1.Node) {
     n.node = node
     n.Allocatable = NewResource(node.Status.Allocatable)
-    n.TransientInfo = NewTransientSchedulerInfo()
     n.Generation = nextGeneration()
 }

View File

@@ -266,7 +266,6 @@ func TestNewNodeInfo(t *testing.T) {
         AllowedPodNumber: 0,
         ScalarResources: map[v1.ResourceName]int64(nil),
     },
-    TransientInfo: NewTransientSchedulerInfo(),
     Allocatable: &Resource{},
     Generation: 2,
     UsedPorts: HostPortInfo{

@@ -359,7 +358,6 @@ func TestNodeInfoClone(t *testing.T) {
     nodeInfo: &NodeInfo{
         Requested: &Resource{},
         NonZeroRequested: &Resource{},
-        TransientInfo: NewTransientSchedulerInfo(),
         Allocatable: &Resource{},
         Generation: 2,
         UsedPorts: HostPortInfo{

@@ -433,7 +431,6 @@ func TestNodeInfoClone(t *testing.T) {
     expected: &NodeInfo{
         Requested: &Resource{},
         NonZeroRequested: &Resource{},
-        TransientInfo: NewTransientSchedulerInfo(),
         Allocatable: &Resource{},
         Generation: 2,
         UsedPorts: HostPortInfo{

@@ -644,7 +641,6 @@ func TestNodeInfoAddPod(t *testing.T) {
         AllowedPodNumber: 0,
         ScalarResources: map[v1.ResourceName]int64(nil),
     },
-    TransientInfo: NewTransientSchedulerInfo(),
     Allocatable: &Resource{},
     Generation: 2,
     UsedPorts: HostPortInfo{

@@ -824,7 +820,6 @@ func TestNodeInfoRemovePod(t *testing.T) {
         AllowedPodNumber: 0,
         ScalarResources: map[v1.ResourceName]int64(nil),
     },
-    TransientInfo: NewTransientSchedulerInfo(),
     Allocatable: &Resource{},
     Generation: 2,
     UsedPorts: HostPortInfo{

@@ -957,7 +952,6 @@ func TestNodeInfoRemovePod(t *testing.T) {
         AllowedPodNumber: 0,
         ScalarResources: map[v1.ResourceName]int64(nil),
     },
-    TransientInfo: NewTransientSchedulerInfo(),
     Allocatable: &Resource{},
     Generation: 3,
     UsedPorts: HostPortInfo{

View File

@@ -24,9 +24,7 @@ import (
     v1 "k8s.io/api/core/v1"
     "k8s.io/apimachinery/pkg/util/sets"
     "k8s.io/apimachinery/pkg/util/wait"
-    utilfeature "k8s.io/apiserver/pkg/util/feature"
     "k8s.io/klog/v2"
-    "k8s.io/kubernetes/pkg/features"
     "k8s.io/kubernetes/pkg/scheduler/framework"
     "k8s.io/kubernetes/pkg/scheduler/metrics"
 )

@@ -198,7 +196,6 @@ func (cache *schedulerCache) Dump() *Dump {
 func (cache *schedulerCache) UpdateSnapshot(nodeSnapshot *Snapshot) error {
     cache.mu.Lock()
     defer cache.mu.Unlock()
-    balancedVolumesEnabled := utilfeature.DefaultFeatureGate.Enabled(features.BalanceAttachedNodeVolumes)

     // Get the last generation of the snapshot.
     snapshotGeneration := nodeSnapshot.generation

@@ -222,10 +219,6 @@ func (cache *schedulerCache) UpdateSnapshot(nodeSnapshot *Snapshot) error {
         // all the nodes are updated before the existing snapshot. We are done.
         break
     }
-    if balancedVolumesEnabled && node.info.TransientInfo != nil {
-        // Transient scheduler info is reset here.
-        node.info.TransientInfo.ResetTransientSchedulerInfo()
-    }
     if np := node.info.Node(); np != nil {
         existing, ok := nodeSnapshot.nodeInfoMap[np.Name]
         if !ok {

View File

@@ -28,9 +28,6 @@ import (
     "k8s.io/apimachinery/pkg/api/resource"
     metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
     "k8s.io/apimachinery/pkg/types"
-    utilfeature "k8s.io/apiserver/pkg/util/feature"
-    featuregatetesting "k8s.io/component-base/featuregate/testing"
-    "k8s.io/kubernetes/pkg/features"
     "k8s.io/kubernetes/pkg/scheduler/framework"
     schedutil "k8s.io/kubernetes/pkg/scheduler/util"
 )

@@ -95,8 +92,6 @@ func newNodeInfo(requestedResource *framework.Resource,
 // TestAssumePodScheduled tests that after a pod is assumed, its information is aggregated
 // on node level.
 func TestAssumePodScheduled(t *testing.T) {
-    // Enable volumesOnNodeForBalancing to do balanced resource allocation
-    defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.BalanceAttachedNodeVolumes, true)()
     nodeName := "node"
     testPods := []*v1.Pod{
         makeBasePod(t, nodeName, "test", "100m", "500", "", []v1.ContainerPort{{HostIP: "127.0.0.1", HostPort: 80, Protocol: "TCP"}}),

@@ -248,8 +243,6 @@ func assumeAndFinishBinding(cache *schedulerCache, pod *v1.Pod, assumedTime time
 // TestExpirePod tests that assumed pods will be removed if expired.
 // The removal will be reflected in node info.
 func TestExpirePod(t *testing.T) {
-    // Enable volumesOnNodeForBalancing to do balanced resource allocation
-    defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.BalanceAttachedNodeVolumes, true)()
     nodeName := "node"
     testPods := []*v1.Pod{
         makeBasePod(t, nodeName, "test-1", "100m", "500", "", []v1.ContainerPort{{HostIP: "127.0.0.1", HostPort: 80, Protocol: "TCP"}}),

@@ -321,8 +314,6 @@ func TestExpirePod(t *testing.T) {
 // TestAddPodWillConfirm tests that a pod being Add()ed will be confirmed if assumed.
 // The pod info should still exist after manually expiring unconfirmed pods.
 func TestAddPodWillConfirm(t *testing.T) {
-    // Enable volumesOnNodeForBalancing to do balanced resource allocation
-    defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.BalanceAttachedNodeVolumes, true)()
     nodeName := "node"
     now := time.Now()
     ttl := 10 * time.Second

@@ -492,8 +483,6 @@ func TestAddPodWillReplaceAssumed(t *testing.T) {
 // TestAddPodAfterExpiration tests that a pod being Add()ed will be added back if expired.
 func TestAddPodAfterExpiration(t *testing.T) {
-    // Enable volumesOnNodeForBalancing to do balanced resource allocation
-    defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.BalanceAttachedNodeVolumes, true)()
     nodeName := "node"
     ttl := 10 * time.Second
     basePod := makeBasePod(t, nodeName, "test", "100m", "500", "", []v1.ContainerPort{{HostIP: "127.0.0.1", HostPort: 80, Protocol: "TCP"}})

@@ -544,8 +533,6 @@ func TestAddPodAfterExpiration(t *testing.T) {
 // TestUpdatePod tests that a pod will be updated if added before.
 func TestUpdatePod(t *testing.T) {
-    // Enable volumesOnNodeForBalancing to do balanced resource allocation
-    defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.BalanceAttachedNodeVolumes, true)()
     nodeName := "node"
     ttl := 10 * time.Second
     testPods := []*v1.Pod{

@@ -676,8 +663,6 @@ func TestUpdatePodAndGet(t *testing.T) {
 // TestExpireAddUpdatePod test the sequence that a pod is expired, added, then updated
 func TestExpireAddUpdatePod(t *testing.T) {
-    // Enable volumesOnNodeForBalancing to do balanced resource allocation
-    defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.BalanceAttachedNodeVolumes, true)()
     nodeName := "node"
     ttl := 10 * time.Second
     testPods := []*v1.Pod{

@@ -777,8 +762,6 @@ func makePodWithEphemeralStorage(nodeName, ephemeralStorage string) *v1.Pod {
 }

 func TestEphemeralStorageResource(t *testing.T) {
-    // Enable volumesOnNodeForBalancing to do balanced resource allocation
-    defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.BalanceAttachedNodeVolumes, true)()
     nodeName := "node"
     podE := makePodWithEphemeralStorage(nodeName, "500")
     tests := []struct {

@@ -824,8 +807,6 @@ func TestEphemeralStorageResource(t *testing.T) {
 // TestRemovePod tests after added pod is removed, its information should also be subtracted.
 func TestRemovePod(t *testing.T) {
-    // Enable volumesOnNodeForBalancing to do balanced resource allocation
-    defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.BalanceAttachedNodeVolumes, true)()
     basePod := makeBasePod(t, "node-1", "test", "100m", "500", "", []v1.ContainerPort{{HostIP: "127.0.0.1", HostPort: 80, Protocol: "TCP"}})
     tests := []struct {
         nodes []*v1.Node

@@ -1699,8 +1680,6 @@ func TestSchedulerCache_updateNodeInfoSnapshotList(t *testing.T) {
 }

 func BenchmarkUpdate1kNodes30kPods(b *testing.B) {
-    // Enable volumesOnNodeForBalancing to do balanced resource allocation
-    defer featuregatetesting.SetFeatureGateDuringTest(nil, utilfeature.DefaultFeatureGate, features.BalanceAttachedNodeVolumes, true)()
     cache := setupCacheOf1kNodes30kPods(b)
     b.ResetTimer()
     for n := 0; n < b.N; n++ {