Mirror of https://github.com/k3s-io/kubernetes.git (synced 2025-07-23 11:50:44 +00:00)

Commit a3f24e8459: Merge pull request #102443 from ravisantoshgudimetla/remove-bnv

Remove balanced attached node volumes

The diff below removes the alpha BalanceAttachedNodeVolumes feature gate and everything that consumed it: the gate declaration, the scheduler Features plumbing, the volume-aware scorer signatures, the NodeInfo transient state, and the tests that enabled the gate.
@@ -187,14 +187,6 @@ const (
	// Do not remove this feature gate even though it's GA
	VolumeSubpath featuregate.Feature = "VolumeSubpath"

	// owner: @ravig
	// alpha: v1.11
	//
	// Include volume count on node to be considered for balanced resource allocation while scheduling.
	// A node which has closer cpu,memory utilization and volume count is favoured by scheduler
	// while making decisions.
	BalanceAttachedNodeVolumes featuregate.Feature = "BalanceAttachedNodeVolumes"

	// owner: @pohly
	// alpha: v1.14
	// beta: v1.16
@@ -774,7 +766,6 @@ var defaultKubernetesFeatureGates = map[featuregate.Feature]featuregate.FeatureS
	InTreePluginOpenStackUnregister: {Default: false, PreRelease: featuregate.Alpha},
	VolumeSubpath:                   {Default: true, PreRelease: featuregate.GA},
	ConfigurableFSGroupPolicy:       {Default: true, PreRelease: featuregate.Beta},
	BalanceAttachedNodeVolumes:      {Default: false, PreRelease: featuregate.Alpha},
	CSIInlineVolume:                 {Default: true, PreRelease: featuregate.Beta},
	CSIStorageCapacity:              {Default: true, PreRelease: featuregate.Beta},
	CSIServiceAccountToken:          {Default: true, PreRelease: featuregate.Beta},
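Note on the machinery above: a feature gate is just a typed string constant registered with a default and a maturity level, and call sites branch on Enabled(). A minimal runnable sketch of that pattern using k8s.io/component-base/featuregate; the gate name MyBalancedVolumes is made up for illustration, and only the Default/PreRelease values mirror the map entry being removed.

package main

import (
	"fmt"

	"k8s.io/component-base/featuregate"
)

const MyBalancedVolumes featuregate.Feature = "MyBalancedVolumes"

func main() {
	gate := featuregate.NewFeatureGate()
	// Register the gate with its default and maturity, as the
	// defaultKubernetesFeatureGates map does for real gates.
	if err := gate.Add(map[featuregate.Feature]featuregate.FeatureSpec{
		MyBalancedVolumes: {Default: false, PreRelease: featuregate.Alpha},
	}); err != nil {
		panic(err)
	}
	// Consumers branch on Enabled(), as the scheduler did for
	// BalanceAttachedNodeVolumes before this commit.
	fmt.Println("enabled:", gate.Enabled(MyBalancedVolumes))
}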
@@ -23,5 +23,4 @@ type Features struct {
	EnablePodAffinityNamespaceSelector bool
	EnablePodDisruptionBudget          bool
	EnablePodOverhead                  bool
	EnableBalanceAttachedNodeVolumes   bool
}
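The Features struct exists so plugins never read the global gate directly: gate values are copied once at registry construction (see the registry.go hunk further down) and latched into plugin structs. A hedged sketch of that plumbing; the types here are simplified stand-ins, not the scheduler's real ones.

// featuresSketch mirrors the struct in the hunk above, trimmed to two fields.
type featuresSketch struct {
	enablePodOverhead                bool
	enableBalanceAttachedNodeVolumes bool // the field this commit deletes
}

type balancedAllocationSketch struct {
	enablePodOverhead bool
}

// A NewBalancedAllocation-style factory latches flags at construction time,
// so removing a gate means deleting one field and one assignment.
func newBalancedAllocationSketch(fts featuresSketch) *balancedAllocationSketch {
	return &balancedAllocationSketch{enablePodOverhead: fts.enablePodOverhead}
}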
@@ -71,18 +71,16 @@ func NewBalancedAllocation(_ runtime.Object, h framework.Handle, fts feature.Fea
	return &BalancedAllocation{
		handle: h,
		resourceAllocationScorer: resourceAllocationScorer{
			Name:                             BalancedAllocationName,
			scorer:                           balancedResourceScorer,
			resourceToWeightMap:              defaultRequestedRatioResources,
			enablePodOverhead:                fts.EnablePodOverhead,
			enableBalanceAttachedNodeVolumes: fts.EnableBalanceAttachedNodeVolumes,
			Name:                BalancedAllocationName,
			scorer:              balancedResourceScorer,
			resourceToWeightMap: defaultRequestedRatioResources,
			enablePodOverhead:   fts.EnablePodOverhead,
		},
	}, nil
}

// todo: use resource weights in the scorer function
func balancedResourceScorer(requested, allocable resourceToValueMap, includeVolumes bool, requestedVolumes int, allocatableVolumes int) int64 {
// This to find a node which has most balanced CPU, memory and volume usage.
func balancedResourceScorer(requested, allocable resourceToValueMap) int64 {
	cpuFraction := fractionOfCapacity(requested[v1.ResourceCPU], allocable[v1.ResourceCPU])
	memoryFraction := fractionOfCapacity(requested[v1.ResourceMemory], allocable[v1.ResourceMemory])
	// fractions might be greater than 1 because pods with no requests get minimum
@@ -94,22 +92,6 @@ func balancedResourceScorer(requested, allocable resourceToValueMap, includeVolu
		memoryFraction = 1
	}

	// includeVolumes is only true when BalanceAttachedNodeVolumes feature gate is enabled (see resource_allocation.go#score())
	if includeVolumes && allocatableVolumes > 0 {
		volumeFraction := float64(requestedVolumes) / float64(allocatableVolumes)
		if volumeFraction >= 1 {
			// if requested >= capacity, the corresponding host should never be preferred.
			return 0
		}
		// Compute variance for all the three fractions.
		mean := (cpuFraction + memoryFraction + volumeFraction) / float64(3)
		variance := float64((((cpuFraction - mean) * (cpuFraction - mean)) + ((memoryFraction - mean) * (memoryFraction - mean)) + ((volumeFraction - mean) * (volumeFraction - mean))) / float64(3))
		// Since the variance is between positive fractions, it will be positive fraction. 1-variance lets the
		// score to be higher for node which has least variance and multiplying it with `MaxNodeScore` provides the scaling
		// factor needed.
		return int64((1 - variance) * float64(framework.MaxNodeScore))
	}

	// Upper and lower boundary of difference between cpuFraction and memoryFraction are -1 and 1
	// respectively. Multiplying the absolute value of the difference by `MaxNodeScore` scales the value to
	// 0-MaxNodeScore with 0 representing well balanced allocation and `MaxNodeScore` poorly balanced. Subtracting it from
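The branch being deleted is the core of the feature: score a node by how evenly its CPU, memory, and volume utilization fractions sit around their mean, so the least-variance node wins. A self-contained sketch with framework.MaxNodeScore (100) inlined; illustrative rather than the plugin's exact code.

package main

import "fmt"

const maxNodeScore = 100 // framework.MaxNodeScore

func balancedScore(cpuFraction, memoryFraction, volumeFraction float64) int64 {
	if volumeFraction >= 1 {
		return 0 // a node with no free volume slots is never preferred
	}
	mean := (cpuFraction + memoryFraction + volumeFraction) / 3
	variance := ((cpuFraction-mean)*(cpuFraction-mean) +
		(memoryFraction-mean)*(memoryFraction-mean) +
		(volumeFraction-mean)*(volumeFraction-mean)) / 3
	// Fractions lie in [0, 1], so variance does too; equal fractions give
	// variance 0 and therefore the full maxNodeScore.
	return int64((1 - variance) * maxNodeScore)
}

func main() {
	fmt.Println(balancedScore(0.75, 0.5, 0.5)) // 98, machine4 in the deleted test below
}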
@@ -24,114 +24,13 @@ import (
	v1 "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/api/resource"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	utilfeature "k8s.io/apiserver/pkg/util/feature"
	featuregatetesting "k8s.io/component-base/featuregate/testing"
	"k8s.io/kubernetes/pkg/features"
	"k8s.io/kubernetes/pkg/scheduler/framework"
	"k8s.io/kubernetes/pkg/scheduler/framework/plugins/feature"
	"k8s.io/kubernetes/pkg/scheduler/framework/runtime"
	"k8s.io/kubernetes/pkg/scheduler/internal/cache"
)

// getExistingVolumeCountForNode gets the current number of volumes on node.
func getExistingVolumeCountForNode(podInfos []*framework.PodInfo, maxVolumes int) int {
	volumeCount := 0
	for _, p := range podInfos {
		volumeCount += len(p.Pod.Spec.Volumes)
	}
	if maxVolumes-volumeCount > 0 {
		return maxVolumes - volumeCount
	}
	return 0
}

func TestNodeResourcesBalancedAllocation(t *testing.T) {
	// Enable volumesOnNodeForBalancing to do balanced node resource allocation
	defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.BalanceAttachedNodeVolumes, true)()
	podwithVol1 := v1.PodSpec{
		Containers: []v1.Container{
			{
				Resources: v1.ResourceRequirements{
					Requests: v1.ResourceList{
						v1.ResourceCPU:    resource.MustParse("1000m"),
						v1.ResourceMemory: resource.MustParse("2000"),
					},
				},
			},
			{
				Resources: v1.ResourceRequirements{
					Requests: v1.ResourceList{
						v1.ResourceCPU:    resource.MustParse("2000m"),
						v1.ResourceMemory: resource.MustParse("3000"),
					},
				},
			},
		},
		Volumes: []v1.Volume{
			{
				VolumeSource: v1.VolumeSource{
					AWSElasticBlockStore: &v1.AWSElasticBlockStoreVolumeSource{VolumeID: "ovp"},
				},
			},
		},
		NodeName: "machine4",
	}
	podwithVol2 := v1.PodSpec{
		Containers: []v1.Container{
			{
				Resources: v1.ResourceRequirements{
					Requests: v1.ResourceList{
						v1.ResourceCPU:    resource.MustParse("0m"),
						v1.ResourceMemory: resource.MustParse("0"),
					},
				},
			},
			{
				Resources: v1.ResourceRequirements{
					Requests: v1.ResourceList{
						v1.ResourceCPU:    resource.MustParse("0m"),
						v1.ResourceMemory: resource.MustParse("0"),
					},
				},
			},
		},
		Volumes: []v1.Volume{
			{
				VolumeSource: v1.VolumeSource{
					AWSElasticBlockStore: &v1.AWSElasticBlockStoreVolumeSource{VolumeID: "ovp1"},
				},
			},
		},
		NodeName: "machine4",
	}
	podwithVol3 := v1.PodSpec{
		Containers: []v1.Container{
			{
				Resources: v1.ResourceRequirements{
					Requests: v1.ResourceList{
						v1.ResourceCPU:    resource.MustParse("0m"),
						v1.ResourceMemory: resource.MustParse("0"),
					},
				},
			},
			{
				Resources: v1.ResourceRequirements{
					Requests: v1.ResourceList{
						v1.ResourceCPU:    resource.MustParse("0m"),
						v1.ResourceMemory: resource.MustParse("0"),
					},
				},
			},
		},
		Volumes: []v1.Volume{
			{
				VolumeSource: v1.VolumeSource{
					AWSElasticBlockStore: &v1.AWSElasticBlockStoreVolumeSource{VolumeID: "ovp1"},
				},
			},
		},
		NodeName: "machine4",
	}
	labels1 := map[string]string{
		"foo": "bar",
		"baz": "blah",
@@ -193,27 +92,6 @@ func TestNodeResourcesBalancedAllocation(t *testing.T) {
			},
		},
	}
	cpuAndMemory3 := v1.PodSpec{
		NodeName: "machine3",
		Containers: []v1.Container{
			{
				Resources: v1.ResourceRequirements{
					Requests: v1.ResourceList{
						v1.ResourceCPU:    resource.MustParse("1000m"),
						v1.ResourceMemory: resource.MustParse("2000"),
					},
				},
			},
			{
				Resources: v1.ResourceRequirements{
					Requests: v1.ResourceList{
						v1.ResourceCPU:    resource.MustParse("2000m"),
						v1.ResourceMemory: resource.MustParse("3000"),
					},
				},
			},
		},
	}
	nonZeroContainer := v1.PodSpec{
		Containers: []v1.Container{{}},
	}
@@ -368,45 +246,13 @@ func TestNodeResourcesBalancedAllocation(t *testing.T) {
				{Spec: cpuAndMemory},
			},
		},
		{
			// Machine4 will be chosen here because it already has a existing volume making the variance
			// of volume count, CPU usage, memory usage closer.
			pod: &v1.Pod{
				Spec: v1.PodSpec{
					Volumes: []v1.Volume{
						{
							VolumeSource: v1.VolumeSource{
								AWSElasticBlockStore: &v1.AWSElasticBlockStoreVolumeSource{VolumeID: "ovp2"},
							},
						},
					},
				},
			},
			nodes:        []*v1.Node{makeNode("machine3", 3500, 40000), makeNode("machine4", 4000, 10000)},
			expectedList: []framework.NodeScore{{Name: "machine3", Score: 89}, {Name: "machine4", Score: 98}},
			name:         "Include volume count on a node for balanced resource allocation",
			pods: []*v1.Pod{
				{Spec: cpuAndMemory3},
				{Spec: podwithVol1},
				{Spec: podwithVol2},
				{Spec: podwithVol3},
			},
		},
	}

	for _, test := range tests {
		t.Run(test.name, func(t *testing.T) {
			snapshot := cache.NewSnapshot(test.pods, test.nodes)
			if len(test.pod.Spec.Volumes) > 0 {
				maxVolumes := 5
				nodeInfoList, _ := snapshot.NodeInfos().List()
				for _, info := range nodeInfoList {
					info.TransientInfo.TransNodeInfo.AllocatableVolumesCount = getExistingVolumeCountForNode(info.Pods, maxVolumes)
					info.TransientInfo.TransNodeInfo.RequestedVolumes = len(test.pod.Spec.Volumes)
				}
			}
			fh, _ := runtime.NewFramework(nil, nil, runtime.WithSnapshotSharedLister(snapshot))
			p, _ := NewBalancedAllocation(nil, fh, feature.Features{EnablePodOverhead: true, EnableBalanceAttachedNodeVolumes: true})
			p, _ := NewBalancedAllocation(nil, fh, feature.Features{EnablePodOverhead: true})

			for i := range test.nodes {
				hostResult, err := p.(framework.ScorePlugin).Score(context.Background(), nil, test.pod, test.nodes[i].Name)
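The deleted test's expected scores can be checked by hand with the variance formula above (maxVolumes = 5, and despite its name getExistingVolumeCountForNode returns the free volume slots). machine3 (3500m CPU, 40000 memory, no volumes) carries cpuAndMemory3, giving fractions 3000/3500 = 0.857 for CPU, 5000/40000 = 0.125 for memory, and 1/5 = 0.2 for volumes; the variance is about 0.108, so the score is int64((1 - 0.108) * 100) = 89. machine4 (4000m, 10000, three existing volumes) gives 0.75, 0.5, and 1/2 = 0.5; variance is about 0.014, score 98. The more evenly loaded machine4 wins, which is exactly the behavior the feature bought.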
@@ -92,8 +92,8 @@ func NewLeastAllocated(laArgs runtime.Object, h framework.Handle, fts feature.Fe
	}, nil
}

func leastResourceScorer(resToWeightMap resourceToWeightMap) func(resourceToValueMap, resourceToValueMap, bool, int, int) int64 {
	return func(requested, allocable resourceToValueMap, includeVolumes bool, requestedVolumes int, allocatableVolumes int) int64 {
func leastResourceScorer(resToWeightMap resourceToWeightMap) func(resourceToValueMap, resourceToValueMap) int64 {
	return func(requested, allocable resourceToValueMap) int64 {
		var nodeScore, weightSum int64
		for resource, weight := range resToWeightMap {
			resourceScore := leastRequestedScore(requested[resource], allocable[resource])
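The returned closure folds a per-resource score into a weighted average over resToWeightMap. leastRequestedScore itself is outside this hunk; the following is a hedged sketch of its well-known shape (unused capacity scaled onto the 0-100 node score range), not a verbatim quote of the file.

// leastRequestedScore: the emptier the node, the higher the score.
func leastRequestedScore(requested, capacity int64) int64 {
	const maxNodeScore = 100 // framework.MaxNodeScore
	if capacity == 0 || requested > capacity {
		return 0
	}
	return ((capacity - requested) * maxNodeScore) / capacity
}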
@@ -90,8 +90,8 @@ func NewMostAllocated(maArgs runtime.Object, h framework.Handle, fts feature.Fea
	}, nil
}

func mostResourceScorer(resToWeightMap resourceToWeightMap) func(requested, allocable resourceToValueMap, includeVolumes bool, requestedVolumes int, allocatableVolumes int) int64 {
	return func(requested, allocable resourceToValueMap, includeVolumes bool, requestedVolumes int, allocatableVolumes int) int64 {
func mostResourceScorer(resToWeightMap resourceToWeightMap) func(requested, allocable resourceToValueMap) int64 {
	return func(requested, allocable resourceToValueMap) int64 {
		var nodeScore, weightSum int64
		for resource, weight := range resToWeightMap {
			resourceScore := mostRequestedScore(requested[resource], allocable[resource])
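mostRequestedScore is the mirror image, favoring fuller nodes for bin-packing. Again a hedged sketch of the expected shape rather than the file's exact text.

func mostRequestedScore(requested, capacity int64) int64 {
	const maxNodeScore = 100 // framework.MaxNodeScore
	if capacity == 0 || requested > capacity {
		return 0
	}
	// Used capacity maps linearly onto [0, maxNodeScore].
	return (requested * maxNodeScore) / capacity
}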
@@ -114,7 +114,7 @@ func (pl *RequestedToCapacityRatio) ScoreExtensions() framework.ScoreExtensions
	return nil
}

func buildRequestedToCapacityRatioScorerFunction(scoringFunctionShape helper.FunctionShape, resourceToWeightMap resourceToWeightMap) func(resourceToValueMap, resourceToValueMap, bool, int, int) int64 {
func buildRequestedToCapacityRatioScorerFunction(scoringFunctionShape helper.FunctionShape, resourceToWeightMap resourceToWeightMap) func(resourceToValueMap, resourceToValueMap) int64 {
	rawScoringFunction := helper.BuildBrokenLinearFunction(scoringFunctionShape)
	resourceScoringFunction := func(requested, capacity int64) int64 {
		if capacity == 0 || requested > capacity {
@@ -123,7 +123,7 @@ func buildRequestedToCapacityRatioScorerFunction(scoringFunctionShape helper.Fun

		return rawScoringFunction(maxUtilization - (capacity-requested)*maxUtilization/capacity)
	}
	return func(requested, allocable resourceToValueMap, includeVolumes bool, requestedVolumes int, allocatableVolumes int) int64 {
	return func(requested, allocable resourceToValueMap) int64 {
		var nodeScore, weightSum int64
		for resource, weight := range resourceToWeightMap {
			resourceScore := resourceScoringFunction(requested[resource], allocable[resource])
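BuildBrokenLinearFunction turns a list of (utilization, score) shape points into a piecewise-linear scorer; the hunk feeds it the node's utilization derived from free capacity. A standalone sketch of such a broken-linear function under that assumption; point and brokenLinear are illustrative names, not the helper package's API.

type point struct{ utilization, score int64 }

// brokenLinear interpolates linearly between consecutive shape points and
// clamps outside the first and last point.
func brokenLinear(shape []point) func(int64) int64 {
	return func(u int64) int64 {
		if u <= shape[0].utilization {
			return shape[0].score
		}
		for i := 1; i < len(shape); i++ {
			p, q := shape[i-1], shape[i]
			if u <= q.utilization {
				return p.score + (q.score-p.score)*(u-p.utilization)/(q.utilization-p.utilization)
			}
		}
		return shape[len(shape)-1].score
	}
}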
@@ -32,11 +32,10 @@ var defaultRequestedRatioResources = resourceToWeightMap{v1.ResourceMemory: 1, v
// resourceAllocationScorer contains information to calculate resource allocation score.
type resourceAllocationScorer struct {
	Name   string
	scorer func(requested, allocable resourceToValueMap, includeVolumes bool, requestedVolumes int, allocatableVolumes int) int64
	scorer func(requested, allocable resourceToValueMap) int64
	resourceToWeightMap resourceToWeightMap

	enablePodOverhead                bool
	enableBalanceAttachedNodeVolumes bool
	enablePodOverhead bool
}

// resourceToValueMap contains resource name and score.
@@ -60,29 +59,14 @@ func (r *resourceAllocationScorer) score(
	}
	var score int64

	// Check if the pod has volumes and this could be added to scorer function for balanced resource allocation.
	if len(pod.Spec.Volumes) > 0 && r.enableBalanceAttachedNodeVolumes && nodeInfo.TransientInfo != nil {
		score = r.scorer(requested, allocatable, true, nodeInfo.TransientInfo.TransNodeInfo.RequestedVolumes, nodeInfo.TransientInfo.TransNodeInfo.AllocatableVolumesCount)
	} else {
		score = r.scorer(requested, allocatable, false, 0, 0)
	}
	if klog.V(10).Enabled() {
		if len(pod.Spec.Volumes) > 0 && r.enableBalanceAttachedNodeVolumes && nodeInfo.TransientInfo != nil {
			klog.Infof(
				"%v -> %v: %v, map of allocatable resources %v, map of requested resources %v , allocatable volumes %d, requested volumes %d, score %d",
				pod.Name, node.Name, r.Name,
				allocatable, requested, nodeInfo.TransientInfo.TransNodeInfo.AllocatableVolumesCount,
				nodeInfo.TransientInfo.TransNodeInfo.RequestedVolumes,
				score,
			)
		} else {
			klog.Infof(
				"%v -> %v: %v, map of allocatable resources %v, map of requested resources %v ,score %d,",
				pod.Name, node.Name, r.Name,
				allocatable, requested, score,
			)
	score = r.scorer(requested, allocatable)

		}
	}
	if klog.V(10).Enabled() {
		klog.Infof(
			"%v -> %v: %v, map of allocatable resources %v, map of requested resources %v ,score %d,",
			pod.Name, node.Name, r.Name,
			allocatable, requested, score,
		)
	}

	return score, nil
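One detail worth noting in the block above is the logging idiom: klog.V(10).Enabled() guards the call so the format arguments are only evaluated at very high verbosity. Minimal sketch of the pattern as used here.

package scoring

import "k8s.io/klog/v2"

// logScore mirrors the guarded logging in score(): at normal verbosity the
// branch is skipped and no formatting work happens on the hot path.
func logScore(podName, nodeName, plugin string, score int64) {
	if klog.V(10).Enabled() {
		klog.Infof("%v -> %v: %v, score %d", podName, nodeName, plugin, score)
	}
}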
@@ -29,13 +29,11 @@ import (
	"k8s.io/apimachinery/pkg/runtime"
	"k8s.io/apimachinery/pkg/util/rand"
	"k8s.io/apimachinery/pkg/util/sets"
	utilfeature "k8s.io/apiserver/pkg/util/feature"
	"k8s.io/client-go/informers"
	corelisters "k8s.io/client-go/listers/core/v1"
	storagelisters "k8s.io/client-go/listers/storage/v1"
	csilibplugins "k8s.io/csi-translation-lib/plugins"
	"k8s.io/klog/v2"
	"k8s.io/kubernetes/pkg/features"
	"k8s.io/kubernetes/pkg/scheduler/framework"
	"k8s.io/kubernetes/pkg/scheduler/framework/plugins/names"
	volumeutil "k8s.io/kubernetes/pkg/volume/util"
@@ -269,12 +267,6 @@ func (pl *nonCSILimits) Filter(ctx context.Context, _ *framework.CycleState, pod
	if numExistingVolumes+numNewVolumes > maxAttachLimit {
		return framework.NewStatus(framework.Unschedulable, ErrReasonMaxVolumeCountExceeded)
	}
	if nodeInfo != nil && nodeInfo.TransientInfo != nil && utilfeature.DefaultFeatureGate.Enabled(features.BalanceAttachedNodeVolumes) {
		nodeInfo.TransientInfo.TransientLock.Lock()
		defer nodeInfo.TransientInfo.TransientLock.Unlock()
		nodeInfo.TransientInfo.TransNodeInfo.AllocatableVolumesCount = maxAttachLimit - numExistingVolumes
		nodeInfo.TransientInfo.TransNodeInfo.RequestedVolumes = numNewVolumes
	}
	return nil
}

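This hunk is the producer side of the feature's cross-plugin message passing: the nonCSILimits Filter stashed the node's remaining and requested volume counts in NodeInfo.TransientInfo, and the balanced-allocation Score read them back later in the same cycle (the TODO on the TransientInfo field below calls this out explicitly). With the gate gone, nothing writes or reads that channel, which is why the TransientSchedulerInfo machinery further down can be deleted wholesale.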
@@ -53,7 +53,6 @@ func NewInTreeRegistry() runtime.Registry {
		EnablePodAffinityNamespaceSelector: feature.DefaultFeatureGate.Enabled(features.PodAffinityNamespaceSelector),
		EnablePodDisruptionBudget:          feature.DefaultFeatureGate.Enabled(features.PodDisruptionBudget),
		EnablePodOverhead:                  feature.DefaultFeatureGate.Enabled(features.PodOverhead),
		EnableBalanceAttachedNodeVolumes:   feature.DefaultFeatureGate.Enabled(features.BalanceAttachedNodeVolumes),
	}

	return runtime.Registry{
@@ -21,7 +21,6 @@ import (
	"fmt"
	"sort"
	"strings"
	"sync"
	"sync/atomic"
	"time"

@@ -388,21 +387,11 @@ type NodeInfo struct {
	// state information.
	ImageStates map[string]*ImageStateSummary

	// TransientInfo holds the information pertaining to a scheduling cycle. This will be destructed at the end of
	// scheduling cycle.
	// TODO: @ravig. Remove this once we have a clear approach for message passing across predicates and priorities.
	TransientInfo *TransientSchedulerInfo

	// Whenever NodeInfo changes, generation is bumped.
	// This is used to avoid cloning it if the object didn't change.
	Generation int64
}

//initializeNodeTransientInfo initializes transient information pertaining to node.
func initializeNodeTransientInfo() nodeTransientInfo {
	return nodeTransientInfo{AllocatableVolumesCount: 0, RequestedVolumes: 0}
}

// nextGeneration: Let's make sure history never forgets the name...
// Increments the generation number monotonically ensuring that generation numbers never collide.
// Collision of the generation numbers would be particularly problematic if a node was deleted and
@@ -411,43 +400,6 @@ func nextGeneration() int64 {
	return atomic.AddInt64(&generation, 1)
}

// nodeTransientInfo contains transient node information while scheduling.
type nodeTransientInfo struct {
	// AllocatableVolumesCount contains number of volumes that could be attached to node.
	AllocatableVolumesCount int
	// Requested number of volumes on a particular node.
	RequestedVolumes int
}

// TransientSchedulerInfo is a transient structure which is destructed at the end of each scheduling cycle.
// It consists of items that are valid for a scheduling cycle and is used for message passing across predicates and
// priorities. Some examples which could be used as fields are number of volumes being used on node, current utilization
// on node etc.
// IMPORTANT NOTE: Make sure that each field in this structure is documented along with usage. Expand this structure
// only when absolutely needed as this data structure will be created and destroyed during every scheduling cycle.
type TransientSchedulerInfo struct {
	TransientLock sync.Mutex
	// NodeTransInfo holds the information related to nodeTransientInformation. NodeName is the key here.
	TransNodeInfo nodeTransientInfo
}

// NewTransientSchedulerInfo returns a new scheduler transient structure with initialized values.
func NewTransientSchedulerInfo() *TransientSchedulerInfo {
	tsi := &TransientSchedulerInfo{
		TransNodeInfo: initializeNodeTransientInfo(),
	}
	return tsi
}

// ResetTransientSchedulerInfo resets the TransientSchedulerInfo.
func (transientSchedInfo *TransientSchedulerInfo) ResetTransientSchedulerInfo() {
	transientSchedInfo.TransientLock.Lock()
	defer transientSchedInfo.TransientLock.Unlock()
	// Reset TransientNodeInfo.
	transientSchedInfo.TransNodeInfo.AllocatableVolumesCount = 0
	transientSchedInfo.TransNodeInfo.RequestedVolumes = 0
}

// Resource is a collection of compute resource.
type Resource struct {
	MilliCPU int64
@@ -557,7 +509,6 @@ func NewNodeInfo(pods ...*v1.Pod) *NodeInfo {
		Requested:        &Resource{},
		NonZeroRequested: &Resource{},
		Allocatable:      &Resource{},
		TransientInfo:    NewTransientSchedulerInfo(),
		Generation:       nextGeneration(),
		UsedPorts:        make(HostPortInfo),
		ImageStates:      make(map[string]*ImageStateSummary),
@@ -583,7 +534,6 @@ func (n *NodeInfo) Clone() *NodeInfo {
		Requested:        n.Requested.Clone(),
		NonZeroRequested: n.NonZeroRequested.Clone(),
		Allocatable:      n.Allocatable.Clone(),
		TransientInfo:    n.TransientInfo,
		UsedPorts:        make(HostPortInfo),
		ImageStates:      n.ImageStates,
		Generation:       n.Generation,
@@ -803,7 +753,6 @@ func (n *NodeInfo) updateUsedPorts(pod *v1.Pod, add bool) {
func (n *NodeInfo) SetNode(node *v1.Node) {
	n.node = node
	n.Allocatable = NewResource(node.Status.Allocatable)
	n.TransientInfo = NewTransientSchedulerInfo()
	n.Generation = nextGeneration()
}

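What gets deleted here is a per-cycle, mutex-guarded scratchpad hanging off every NodeInfo. A minimal standalone sketch of the same pattern, with simplified names; the real type and its reset hook are quoted above.

package sketch

import "sync"

// transientInfo holds values one plugin writes and another reads within a
// single scheduling cycle; the mutex matters because Filter runs for many
// nodes in parallel.
type transientInfo struct {
	mu                      sync.Mutex
	allocatableVolumesCount int
	requestedVolumes        int
}

// set is what the nonCSILimits Filter effectively did.
func (t *transientInfo) set(allocatable, requested int) {
	t.mu.Lock()
	defer t.mu.Unlock()
	t.allocatableVolumesCount = allocatable
	t.requestedVolumes = requested
}

// reset mirrors ResetTransientSchedulerInfo, called when the cache snapshot
// is refreshed so stale values never leak into the next cycle.
func (t *transientInfo) reset() {
	t.mu.Lock()
	defer t.mu.Unlock()
	t.allocatableVolumesCount = 0
	t.requestedVolumes = 0
}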
@@ -266,9 +266,8 @@ func TestNewNodeInfo(t *testing.T) {
			AllowedPodNumber: 0,
			ScalarResources:  map[v1.ResourceName]int64(nil),
		},
		TransientInfo: NewTransientSchedulerInfo(),
		Allocatable:   &Resource{},
		Generation:    2,
		Allocatable: &Resource{},
		Generation:  2,
		UsedPorts: HostPortInfo{
			"127.0.0.1": map[ProtocolPort]struct{}{
				{Protocol: "TCP", Port: 80}: {},
@@ -359,7 +358,6 @@ func TestNodeInfoClone(t *testing.T) {
			nodeInfo: &NodeInfo{
				Requested:        &Resource{},
				NonZeroRequested: &Resource{},
				TransientInfo:    NewTransientSchedulerInfo(),
				Allocatable:      &Resource{},
				Generation:       2,
				UsedPorts: HostPortInfo{
@@ -433,7 +431,6 @@ func TestNodeInfoClone(t *testing.T) {
			expected: &NodeInfo{
				Requested:        &Resource{},
				NonZeroRequested: &Resource{},
				TransientInfo:    NewTransientSchedulerInfo(),
				Allocatable:      &Resource{},
				Generation:       2,
				UsedPorts: HostPortInfo{
@@ -644,9 +641,8 @@ func TestNodeInfoAddPod(t *testing.T) {
			AllowedPodNumber: 0,
			ScalarResources:  map[v1.ResourceName]int64(nil),
		},
		TransientInfo: NewTransientSchedulerInfo(),
		Allocatable:   &Resource{},
		Generation:    2,
		Allocatable: &Resource{},
		Generation:  2,
		UsedPorts: HostPortInfo{
			"127.0.0.1": map[ProtocolPort]struct{}{
				{Protocol: "TCP", Port: 80}: {},
@@ -824,9 +820,8 @@ func TestNodeInfoRemovePod(t *testing.T) {
			AllowedPodNumber: 0,
			ScalarResources:  map[v1.ResourceName]int64(nil),
		},
		TransientInfo: NewTransientSchedulerInfo(),
		Allocatable:   &Resource{},
		Generation:    2,
		Allocatable: &Resource{},
		Generation:  2,
		UsedPorts: HostPortInfo{
			"127.0.0.1": map[ProtocolPort]struct{}{
				{Protocol: "TCP", Port: 80}: {},
@@ -957,9 +952,8 @@ func TestNodeInfoRemovePod(t *testing.T) {
			AllowedPodNumber: 0,
			ScalarResources:  map[v1.ResourceName]int64(nil),
		},
		TransientInfo: NewTransientSchedulerInfo(),
		Allocatable:   &Resource{},
		Generation:    3,
		Allocatable: &Resource{},
		Generation:  3,
		UsedPorts: HostPortInfo{
			"127.0.0.1": map[ProtocolPort]struct{}{
				{Protocol: "TCP", Port: 8080}: {},
pkg/scheduler/internal/cache/cache.go (vendored, 7 changed lines)
@@ -24,9 +24,7 @@ import (
	v1 "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/util/sets"
	"k8s.io/apimachinery/pkg/util/wait"
	utilfeature "k8s.io/apiserver/pkg/util/feature"
	"k8s.io/klog/v2"
	"k8s.io/kubernetes/pkg/features"
	"k8s.io/kubernetes/pkg/scheduler/framework"
	"k8s.io/kubernetes/pkg/scheduler/metrics"
)
@@ -198,7 +196,6 @@ func (cache *schedulerCache) Dump() *Dump {
func (cache *schedulerCache) UpdateSnapshot(nodeSnapshot *Snapshot) error {
	cache.mu.Lock()
	defer cache.mu.Unlock()
	balancedVolumesEnabled := utilfeature.DefaultFeatureGate.Enabled(features.BalanceAttachedNodeVolumes)

	// Get the last generation of the snapshot.
	snapshotGeneration := nodeSnapshot.generation
@@ -222,10 +219,6 @@ func (cache *schedulerCache) UpdateSnapshot(nodeSnapshot *Snapshot) error {
			// all the nodes are updated before the existing snapshot. We are done.
			break
		}
		if balancedVolumesEnabled && node.info.TransientInfo != nil {
			// Transient scheduler info is reset here.
			node.info.TransientInfo.ResetTransientSchedulerInfo()
		}
		if np := node.info.Node(); np != nil {
			existing, ok := nodeSnapshot.nodeInfoMap[np.Name]
			if !ok {
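UpdateSnapshot leans on the generation counter quoted earlier: every NodeInfo mutation bumps a process-wide atomic, and the snapshot copies only nodes newer than its own recorded generation, stopping at the first stale entry because the cache keeps nodes most-recently-updated first. A hedged sketch of that scan; the names are simplified stand-ins.

package sketch

import "sync/atomic"

var generation int64

// nextGeneration mirrors the atomic counter in the types.go hunk: monotonic,
// so generation numbers never collide.
func nextGeneration() int64 {
	return atomic.AddInt64(&generation, 1)
}

type nodeInfoSketch struct {
	name       string
	generation int64
}

// syncNewer copies only nodes modified since the snapshot's generation; the
// early break matches the "We are done" comment in the hunk above.
func syncNewer(nodesByRecency []*nodeInfoSketch, snapshotGeneration int64, copyNode func(*nodeInfoSketch)) {
	for _, n := range nodesByRecency {
		if n.generation <= snapshotGeneration {
			break
		}
		copyNode(n)
	}
}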
pkg/scheduler/internal/cache/cache_test.go (vendored, 21 changed lines)
@@ -28,9 +28,6 @@ import (
	"k8s.io/apimachinery/pkg/api/resource"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/types"
	utilfeature "k8s.io/apiserver/pkg/util/feature"
	featuregatetesting "k8s.io/component-base/featuregate/testing"
	"k8s.io/kubernetes/pkg/features"
	"k8s.io/kubernetes/pkg/scheduler/framework"
	schedutil "k8s.io/kubernetes/pkg/scheduler/util"
)
@@ -95,8 +92,6 @@ func newNodeInfo(requestedResource *framework.Resource,
// TestAssumePodScheduled tests that after a pod is assumed, its information is aggregated
// on node level.
func TestAssumePodScheduled(t *testing.T) {
	// Enable volumesOnNodeForBalancing to do balanced resource allocation
	defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.BalanceAttachedNodeVolumes, true)()
	nodeName := "node"
	testPods := []*v1.Pod{
		makeBasePod(t, nodeName, "test", "100m", "500", "", []v1.ContainerPort{{HostIP: "127.0.0.1", HostPort: 80, Protocol: "TCP"}}),
@@ -248,8 +243,6 @@ func assumeAndFinishBinding(cache *schedulerCache, pod *v1.Pod, assumedTime time
// TestExpirePod tests that assumed pods will be removed if expired.
// The removal will be reflected in node info.
func TestExpirePod(t *testing.T) {
	// Enable volumesOnNodeForBalancing to do balanced resource allocation
	defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.BalanceAttachedNodeVolumes, true)()
	nodeName := "node"
	testPods := []*v1.Pod{
		makeBasePod(t, nodeName, "test-1", "100m", "500", "", []v1.ContainerPort{{HostIP: "127.0.0.1", HostPort: 80, Protocol: "TCP"}}),
@@ -321,8 +314,6 @@ func TestExpirePod(t *testing.T) {
// TestAddPodWillConfirm tests that a pod being Add()ed will be confirmed if assumed.
// The pod info should still exist after manually expiring unconfirmed pods.
func TestAddPodWillConfirm(t *testing.T) {
	// Enable volumesOnNodeForBalancing to do balanced resource allocation
	defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.BalanceAttachedNodeVolumes, true)()
	nodeName := "node"
	now := time.Now()
	ttl := 10 * time.Second
@@ -492,8 +483,6 @@ func TestAddPodWillReplaceAssumed(t *testing.T) {

// TestAddPodAfterExpiration tests that a pod being Add()ed will be added back if expired.
func TestAddPodAfterExpiration(t *testing.T) {
	// Enable volumesOnNodeForBalancing to do balanced resource allocation
	defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.BalanceAttachedNodeVolumes, true)()
	nodeName := "node"
	ttl := 10 * time.Second
	basePod := makeBasePod(t, nodeName, "test", "100m", "500", "", []v1.ContainerPort{{HostIP: "127.0.0.1", HostPort: 80, Protocol: "TCP"}})
@@ -544,8 +533,6 @@ func TestAddPodAfterExpiration(t *testing.T) {

// TestUpdatePod tests that a pod will be updated if added before.
func TestUpdatePod(t *testing.T) {
	// Enable volumesOnNodeForBalancing to do balanced resource allocation
	defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.BalanceAttachedNodeVolumes, true)()
	nodeName := "node"
	ttl := 10 * time.Second
	testPods := []*v1.Pod{
@@ -676,8 +663,6 @@ func TestUpdatePodAndGet(t *testing.T) {

// TestExpireAddUpdatePod test the sequence that a pod is expired, added, then updated
func TestExpireAddUpdatePod(t *testing.T) {
	// Enable volumesOnNodeForBalancing to do balanced resource allocation
	defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.BalanceAttachedNodeVolumes, true)()
	nodeName := "node"
	ttl := 10 * time.Second
	testPods := []*v1.Pod{
@@ -777,8 +762,6 @@ func makePodWithEphemeralStorage(nodeName, ephemeralStorage string) *v1.Pod {
}

func TestEphemeralStorageResource(t *testing.T) {
	// Enable volumesOnNodeForBalancing to do balanced resource allocation
	defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.BalanceAttachedNodeVolumes, true)()
	nodeName := "node"
	podE := makePodWithEphemeralStorage(nodeName, "500")
	tests := []struct {
@@ -824,8 +807,6 @@ func TestEphemeralStorageResource(t *testing.T) {

// TestRemovePod tests after added pod is removed, its information should also be subtracted.
func TestRemovePod(t *testing.T) {
	// Enable volumesOnNodeForBalancing to do balanced resource allocation
	defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.BalanceAttachedNodeVolumes, true)()
	basePod := makeBasePod(t, "node-1", "test", "100m", "500", "", []v1.ContainerPort{{HostIP: "127.0.0.1", HostPort: 80, Protocol: "TCP"}})
	tests := []struct {
		nodes []*v1.Node
@@ -1699,8 +1680,6 @@ func TestSchedulerCache_updateNodeInfoSnapshotList(t *testing.T) {
}

func BenchmarkUpdate1kNodes30kPods(b *testing.B) {
	// Enable volumesOnNodeForBalancing to do balanced resource allocation
	defer featuregatetesting.SetFeatureGateDuringTest(nil, utilfeature.DefaultFeatureGate, features.BalanceAttachedNodeVolumes, true)()
	cache := setupCacheOf1kNodes30kPods(b)
	b.ResetTimer()
	for n := 0; n < b.N; n++ {
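Every deleted test line above uses the same enable-then-restore idiom; at this vintage SetFeatureGateDuringTest returned a cleanup function, so deferring its result restored the gate when the test finished. Sketch of the pattern exactly as the diff shows it, wrapped in a hypothetical test.

package cache_test

import (
	"testing"

	utilfeature "k8s.io/apiserver/pkg/util/feature"
	featuregatetesting "k8s.io/component-base/featuregate/testing"
	"k8s.io/kubernetes/pkg/features"
)

func TestWithGateEnabled(t *testing.T) {
	// Flip the gate for this test only; the deferred cleanup restores it.
	defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.BalanceAttachedNodeVolumes, true)()
	// ... exercise code that consults the gate ...
}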