Include volume count while doing balanced resource allocation
This commit is contained in:
parent 52ed0368f8
commit ba827f82c7
@@ -274,6 +274,14 @@ const (
     // Allow mounting a subpath of a volume in a container
     // Do not remove this feature gate even though it's GA
     VolumeSubpath utilfeature.Feature = "VolumeSubpath"
+
+    // owner: @ravig
+    // alpha: v1.11
+    //
+    // Include volume count on node to be considered for balanced resource allocation while scheduling.
+    // A node which has closer cpu,memory utilization and volume count is favoured by scheduler
+    // while making decisions.
+    BalanceAttachedNodeVolumes utilfeature.Feature = "BalanceAttachedNodeVolumes"
 )

 func init() {
@@ -321,6 +329,7 @@ var defaultKubernetesFeatureGates = map[utilfeature.Feature]utilfeature.FeatureS
     GCERegionalPersistentDisk: {Default: true, PreRelease: utilfeature.Beta},
     RunAsGroup: {Default: false, PreRelease: utilfeature.Alpha},
     VolumeSubpath: {Default: true, PreRelease: utilfeature.GA},
+    BalanceAttachedNodeVolumes: {Default: false, PreRelease: utilfeature.Alpha},

     // inherited features from generic apiserver, relisted here to get a conflict if it is changed
     // unintentionally on either side:
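These two hunks register BalanceAttachedNodeVolumes in the feature-gate registry (kube_features.go in upstream Kubernetes) as an alpha gate that defaults to off, so everything below only takes effect when the gate is explicitly enabled. A minimal sketch of how callers consult the gate, reusing the import paths that appear later in this diff; the helper name and the main function are illustrative and not part of the commit:

package main

import (
    "fmt"

    utilfeature "k8s.io/apiserver/pkg/util/feature"
    "k8s.io/kubernetes/pkg/features"
)

// volumeAwareScoringEnabled is an illustrative wrapper around the same check the
// predicate and priority code in this commit performs before touching volume counts.
func volumeAwareScoringEnabled() bool {
    return utilfeature.DefaultFeatureGate.Enabled(features.BalanceAttachedNodeVolumes)
}

func main() {
    // Alpha gates are off by default; operators opt in with
    // --feature-gates=BalanceAttachedNodeVolumes=true, and the tests in this diff
    // enable it via utilfeature.DefaultFeatureGate.Set.
    fmt.Println(volumeAwareScoringEnabled())
}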
@@ -23,6 +23,7 @@ import (
     "strconv"
     "sync"

+    "github.com/golang/glog"
     "k8s.io/api/core/v1"
     storagev1 "k8s.io/api/storage/v1"
     apierrors "k8s.io/apimachinery/pkg/api/errors"
@@ -44,8 +45,6 @@ import (
     schedutil "k8s.io/kubernetes/pkg/scheduler/util"
     "k8s.io/kubernetes/pkg/scheduler/volumebinder"
     volumeutil "k8s.io/kubernetes/pkg/volume/util"
-
-    "github.com/golang/glog"
 )

 const (
@@ -451,7 +450,12 @@ func (c *MaxPDVolumeCountChecker) predicate(pod *v1.Pod, meta algorithm.Predicat
         // violates MaxEBSVolumeCount or MaxGCEPDVolumeCount
         return false, []algorithm.PredicateFailureReason{ErrMaxVolumeCountExceeded}, nil
     }
+    if nodeInfo != nil && nodeInfo.TransientInfo != nil && utilfeature.DefaultFeatureGate.Enabled(features.BalanceAttachedNodeVolumes) {
+        nodeInfo.TransientInfo.TransientLock.Lock()
+        defer nodeInfo.TransientInfo.TransientLock.Unlock()
+        nodeInfo.TransientInfo.TransNodeInfo.AllocatableVolumesCount = c.maxVolumes - numExistingVolumes
+        nodeInfo.TransientInfo.TransNodeInfo.RequestedVolumes = numNewVolumes
+    }

     return true, nil, nil
 }
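The block added to the max-volume-count predicate records two numbers in the node's transient info for the current scheduling cycle: the remaining volume capacity (c.maxVolumes - numExistingVolumes) and the number of new volumes the pending pod would attach. A small worked example of that arithmetic with made-up numbers (39 is only a plausible per-node attach limit; the actual limit comes from the checker's configuration):

package main

import "fmt"

func main() {
    // Hypothetical figures for one node during a single scheduling cycle.
    maxVolumes := 39         // per-node attach limit the checker was configured with (assumed)
    numExistingVolumes := 30 // unique volumes already counted on the node
    numNewVolumes := 2       // volumes the pending pod would add

    allocatableVolumesCount := maxVolumes - numExistingVolumes // what the predicate records: 9
    requestedVolumes := numNewVolumes                          // what the predicate records: 2

    // The balanced-resource priority later derives a volume fraction from these:
    // requestedVolumes / allocatableVolumesCount = 2/9 ≈ 0.22.
    fmt.Println(allocatableVolumesCount, requestedVolumes)
}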
@@ -19,6 +19,8 @@ package priorities
 import (
     "math"

+    utilfeature "k8s.io/apiserver/pkg/util/feature"
+    "k8s.io/kubernetes/pkg/features"
     schedulerapi "k8s.io/kubernetes/pkg/scheduler/api"
     "k8s.io/kubernetes/pkg/scheduler/schedulercache"
 )
@@ -36,17 +38,31 @@ var (
     BalancedResourceAllocationMap = balancedResourcePriority.PriorityMap
 )

-func balancedResourceScorer(requested, allocable *schedulercache.Resource) int64 {
+func balancedResourceScorer(requested, allocable *schedulercache.Resource, includeVolumes bool, requestedVolumes int, allocatableVolumes int) int64 {
     cpuFraction := fractionOfCapacity(requested.MilliCPU, allocable.MilliCPU)
     memoryFraction := fractionOfCapacity(requested.Memory, allocable.Memory)
+    // This to find a node which has most balanced CPU, memory and volume usage.
+    if includeVolumes && utilfeature.DefaultFeatureGate.Enabled(features.BalanceAttachedNodeVolumes) && allocatableVolumes > 0 {
+        volumeFraction := float64(requestedVolumes) / float64(allocatableVolumes)
+        if cpuFraction >= 1 || memoryFraction >= 1 || volumeFraction >= 1 {
+            // if requested >= capacity, the corresponding host should never be preferred.
+            return 0
+        }
+        // Compute variance for all the three fractions.
+        mean := (cpuFraction + memoryFraction + volumeFraction) / float64(3)
+        variance := float64((((cpuFraction - mean) * (cpuFraction - mean)) + ((memoryFraction - mean) * (memoryFraction - mean)) + ((volumeFraction - mean) * (volumeFraction - mean))) / float64(3))
+        // Since the variance is between positive fractions, it will be positive fraction. 1-variance lets the
+        // score to be higher for node which has least variance and multiplying it with 10 provides the scaling
+        // factor needed.
+        return int64((1 - variance) * float64(schedulerapi.MaxPriority))
+    }

     if cpuFraction >= 1 || memoryFraction >= 1 {
         // if requested >= capacity, the corresponding host should never be preferred.
         return 0
     }

     // Upper and lower boundary of difference between cpuFraction and memoryFraction are -1 and 1
-    // respectively. Multilying the absolute value of the difference by 10 scales the value to
+    // respectively. Multiplying the absolute value of the difference by 10 scales the value to
     // 0-10 with 0 representing well balanced allocation and 10 poorly balanced. Subtracting it from
     // 10 leads to the score which also scales from 0 to 10 while 10 representing well balanced.
     diff := math.Abs(cpuFraction - memoryFraction)
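With the gate on and a positive allocatable-volume count, the scorer now balances three fractions instead of two: it computes their mean and population variance and maps low variance to a high score, scaled by MaxPriority (10, per the existing comments in this file). A standalone sketch of that computation with a worked example; only the formula is taken from the diff, the function and constant names here are illustrative:

package main

import "fmt"

const maxPriority = 10 // schedulerapi.MaxPriority in the scheduler

// balancedScore mirrors the volume-aware branch added to balancedResourceScorer.
func balancedScore(cpuFraction, memoryFraction, volumeFraction float64) int64 {
    if cpuFraction >= 1 || memoryFraction >= 1 || volumeFraction >= 1 {
        // A node already at or over capacity is never preferred.
        return 0
    }
    mean := (cpuFraction + memoryFraction + volumeFraction) / 3
    variance := ((cpuFraction-mean)*(cpuFraction-mean) +
        (memoryFraction-mean)*(memoryFraction-mean) +
        (volumeFraction-mean)*(volumeFraction-mean)) / 3
    // Lower variance means better-balanced usage, so 1-variance scaled by 10
    // yields a score in [0, 10] that favours the most balanced node.
    return int64((1 - variance) * maxPriority)
}

func main() {
    // Example: 50% CPU, 70% memory and 60% of the volume budget requested.
    // mean = 0.6, variance = (0.01 + 0.01 + 0) / 3 ≈ 0.0067, score = int64(9.93) = 9.
    fmt.Println(balancedScore(0.5, 0.7, 0.6))
}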
@@ -17,17 +17,118 @@ limitations under the License.
 package priorities

 import (
+    "fmt"
     "reflect"
     "testing"

     "k8s.io/api/core/v1"
     "k8s.io/apimachinery/pkg/api/resource"
     metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+    utilfeature "k8s.io/apiserver/pkg/util/feature"
+    "k8s.io/kubernetes/pkg/features"
     schedulerapi "k8s.io/kubernetes/pkg/scheduler/api"
     "k8s.io/kubernetes/pkg/scheduler/schedulercache"
 )

+// getExistingVolumeCountForNode gets the current number of volumes on node.
+func getExistingVolumeCountForNode(pods []*v1.Pod, maxVolumes int) int {
+    volumeCount := 0
+    for _, pod := range pods {
+        volumeCount += len(pod.Spec.Volumes)
+    }
+    if maxVolumes-volumeCount > 0 {
+        return maxVolumes - volumeCount
+    }
+    return 0
+}
+
 func TestBalancedResourceAllocation(t *testing.T) {
+    // Enable volumesOnNodeForBalancing to do balanced resource allocation
+    utilfeature.DefaultFeatureGate.Set(fmt.Sprintf("%s=true", features.BalanceAttachedNodeVolumes))
+    podwithVol1 := v1.PodSpec{
+        Containers: []v1.Container{
+            {
+                Resources: v1.ResourceRequirements{
+                    Requests: v1.ResourceList{
+                        v1.ResourceCPU: resource.MustParse("1000m"),
+                        v1.ResourceMemory: resource.MustParse("2000"),
+                    },
+                },
+            },
+            {
+                Resources: v1.ResourceRequirements{
+                    Requests: v1.ResourceList{
+                        v1.ResourceCPU: resource.MustParse("2000m"),
+                        v1.ResourceMemory: resource.MustParse("3000"),
+                    },
+                },
+            },
+        },
+        Volumes: []v1.Volume{
+            {
+                VolumeSource: v1.VolumeSource{
+                    AWSElasticBlockStore: &v1.AWSElasticBlockStoreVolumeSource{VolumeID: "ovp"},
+                },
+            },
+        },
+        NodeName: "machine4",
+    }
+    podwithVol2 := v1.PodSpec{
+        Containers: []v1.Container{
+            {
+                Resources: v1.ResourceRequirements{
+                    Requests: v1.ResourceList{
+                        v1.ResourceCPU: resource.MustParse("0m"),
+                        v1.ResourceMemory: resource.MustParse("0"),
+                    },
+                },
+            },
+            {
+                Resources: v1.ResourceRequirements{
+                    Requests: v1.ResourceList{
+                        v1.ResourceCPU: resource.MustParse("0m"),
+                        v1.ResourceMemory: resource.MustParse("0"),
+                    },
+                },
+            },
+        },
+        Volumes: []v1.Volume{
+            {
+                VolumeSource: v1.VolumeSource{
+                    AWSElasticBlockStore: &v1.AWSElasticBlockStoreVolumeSource{VolumeID: "ovp1"},
+                },
+            },
+        },
+        NodeName: "machine4",
+    }
+    podwithVol3 := v1.PodSpec{
+        Containers: []v1.Container{
+            {
+                Resources: v1.ResourceRequirements{
+                    Requests: v1.ResourceList{
+                        v1.ResourceCPU: resource.MustParse("0m"),
+                        v1.ResourceMemory: resource.MustParse("0"),
+                    },
+                },
+            },
+            {
+                Resources: v1.ResourceRequirements{
+                    Requests: v1.ResourceList{
+                        v1.ResourceCPU: resource.MustParse("0m"),
+                        v1.ResourceMemory: resource.MustParse("0"),
+                    },
+                },
+            },
+        },
+        Volumes: []v1.Volume{
+            {
+                VolumeSource: v1.VolumeSource{
+                    AWSElasticBlockStore: &v1.AWSElasticBlockStoreVolumeSource{VolumeID: "ovp1"},
+                },
+            },
+        },
+        NodeName: "machine4",
+    }
     labels1 := map[string]string{
         "foo": "bar",
         "baz": "blah",
@@ -89,6 +190,27 @@ func TestBalancedResourceAllocation(t *testing.T) {
             },
         },
     }
+    cpuAndMemory3 := v1.PodSpec{
+        NodeName: "machine3",
+        Containers: []v1.Container{
+            {
+                Resources: v1.ResourceRequirements{
+                    Requests: v1.ResourceList{
+                        v1.ResourceCPU: resource.MustParse("1000m"),
+                        v1.ResourceMemory: resource.MustParse("2000"),
+                    },
+                },
+            },
+            {
+                Resources: v1.ResourceRequirements{
+                    Requests: v1.ResourceList{
+                        v1.ResourceCPU: resource.MustParse("2000m"),
+                        v1.ResourceMemory: resource.MustParse("3000"),
+                    },
+                },
+            },
+        },
+    }
     tests := []struct {
         pod *v1.Pod
         pods []*v1.Pod
@@ -249,10 +371,43 @@ func TestBalancedResourceAllocation(t *testing.T) {
                 {Spec: cpuAndMemory},
             },
         },
+        {
+            /*
+                Machine4 will be chosen here because it already has a existing volume making the variance
+                of volume count, CPU usage, memory usage closer.
+            */
+            pod: &v1.Pod{
+                Spec: v1.PodSpec{
+                    Volumes: []v1.Volume{
+                        {
+                            VolumeSource: v1.VolumeSource{
+                                AWSElasticBlockStore: &v1.AWSElasticBlockStoreVolumeSource{VolumeID: "ovp2"},
+                            },
+                        },
+                    },
+                },
+            },
+            nodes: []*v1.Node{makeNode("machine3", 3500, 40000), makeNode("machine4", 4000, 10000)},
+            expectedList: []schedulerapi.HostPriority{{Host: "machine3", Score: 8}, {Host: "machine4", Score: 9}},
+            test: "Include volume count on a node for balanced resource allocation",
+            pods: []*v1.Pod{
+                {Spec: cpuAndMemory3},
+                {Spec: podwithVol1},
+                {Spec: podwithVol2},
+                {Spec: podwithVol3},
+            },
+        },
     }

     for _, test := range tests {
         nodeNameToInfo := schedulercache.CreateNodeNameToInfoMap(test.pods, test.nodes)
+        if len(test.pod.Spec.Volumes) > 0 {
+            maxVolumes := 5
+            for _, info := range nodeNameToInfo {
+                info.TransientInfo.TransNodeInfo.AllocatableVolumesCount = getExistingVolumeCountForNode(info.Pods(), maxVolumes)
+                info.TransientInfo.TransNodeInfo.RequestedVolumes = len(test.pod.Spec.Volumes)
+            }
+        }
         list, err := priorityFunction(BalancedResourceAllocationMap, nil, nil)(test.pod, nodeNameToInfo, test.nodes)
         if err != nil {
             t.Errorf("unexpected error: %v", err)
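The test enables the gate process-wide with DefaultFeatureGate.Set and, as written, never turns it back off. A hedged sketch of a helper that restores the default afterwards; this is not part of the commit, just a common pattern when toggling gates in tests:

package priorities

import (
    "fmt"
    "testing"

    utilfeature "k8s.io/apiserver/pkg/util/feature"
    "k8s.io/kubernetes/pkg/features"
)

// enableBalanceAttachedNodeVolumes is an illustrative helper: it turns the gate on
// and returns a function that flips it back to its alpha default (false).
func enableBalanceAttachedNodeVolumes(t *testing.T) func() {
    t.Helper()
    if err := utilfeature.DefaultFeatureGate.Set(fmt.Sprintf("%s=true", features.BalanceAttachedNodeVolumes)); err != nil {
        t.Fatalf("failed to enable %s: %v", features.BalanceAttachedNodeVolumes, err)
    }
    return func() {
        // Restore the default so later tests are not affected by this one.
        utilfeature.DefaultFeatureGate.Set(fmt.Sprintf("%s=false", features.BalanceAttachedNodeVolumes))
    }
}

// Usage inside a test: defer enableBalanceAttachedNodeVolumes(t)()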
@@ -33,7 +33,7 @@ var (
     LeastRequestedPriorityMap = leastResourcePriority.PriorityMap
 )

-func leastResourceScorer(requested, allocable *schedulercache.Resource) int64 {
+func leastResourceScorer(requested, allocable *schedulercache.Resource, includeVolumes bool, requestedVolumes int, allocatableVolumes int) int64 {
     return (leastRequestedScore(requested.MilliCPU, allocable.MilliCPU) +
         leastRequestedScore(requested.Memory, allocable.Memory)) / 2
 }
@@ -31,7 +31,7 @@ var (
     MostRequestedPriorityMap = mostResourcePriority.PriorityMap
 )

-func mostResourceScorer(requested, allocable *schedulercache.Resource) int64 {
+func mostResourceScorer(requested, allocable *schedulercache.Resource, includeVolumes bool, requestedVolumes int, allocatableVolumes int) int64 {
     return (mostRequestedScore(requested.MilliCPU, allocable.MilliCPU) +
         mostRequestedScore(requested.Memory, allocable.Memory)) / 2
 }
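The least-requested and most-requested scorers change only in signature: both ignore the three new volume arguments, which exist so that all three resource priorities keep matching the single scorer field on ResourceAllocationPriority. Inside the priorities package that shared contract could be made explicit with compile-time assertions; the named type below is illustrative, the diff keeps the signature inline in the struct field:

// Illustrative only, placed inside the same package as the scorers.
type scorerFunc func(requested, allocable *schedulercache.Resource,
    includeVolumes bool, requestedVolumes int, allocatableVolumes int) int64

var (
    _ scorerFunc = leastResourceScorer    // ignores the volume arguments
    _ scorerFunc = mostResourceScorer     // ignores the volume arguments
    _ scorerFunc = balancedResourceScorer // uses them when the feature gate is on
)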
@@ -29,7 +29,7 @@ import (
 // ResourceAllocationPriority contains information to calculate resource allocation priority.
 type ResourceAllocationPriority struct {
     Name string
-    scorer func(requested, allocable *schedulercache.Resource) int64
+    scorer func(requested, allocable *schedulercache.Resource, includeVolumes bool, requestedVolumes int, allocatableVolumes int) int64
 }

 // PriorityMap priorities nodes according to the resource allocations on the node.
@@ -54,8 +54,13 @@ func (r *ResourceAllocationPriority) PriorityMap(

     requested.MilliCPU += nodeInfo.NonZeroRequest().MilliCPU
     requested.Memory += nodeInfo.NonZeroRequest().Memory
-    score := r.scorer(&requested, &allocatable)
+    var score int64
+    // Check if the pod has volumes and this could be added to scorer function for balanced resource allocation.
+    if len(pod.Spec.Volumes) >= 0 && nodeInfo.TransientInfo != nil {
+        score = r.scorer(&requested, &allocatable, true, nodeInfo.TransientInfo.TransNodeInfo.RequestedVolumes, nodeInfo.TransientInfo.TransNodeInfo.AllocatableVolumesCount)
+    } else {
+        score = r.scorer(&requested, &allocatable, false, 0, 0)
+    }

     if glog.V(10) {
         glog.Infof(
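PriorityMap is the single entry point for all three scorers, so the branch above decides per node whether the transient volume numbers get forwarded. Since len(pod.Spec.Volumes) >= 0 is always true for a slice, the branch effectively hinges on TransientInfo being non-nil; the balanced scorer then re-checks the feature gate and allocatableVolumes > 0 before using the values. A condensed, self-contained sketch of that flow with simplified stand-in types (none of these names are the real scheduler API):

package main

import "fmt"

// Simplified stand-ins for the values PriorityMap hands to the scorer.
type transientNodeInfo struct {
    AllocatableVolumesCount int
    RequestedVolumes        int
}

// scoreNode mirrors the branch added in the diff: volume data is forwarded
// only when transient info exists; the scorer decides whether to use it.
func scoreNode(trans *transientNodeInfo, scorer func(includeVolumes bool, requestedVolumes, allocatableVolumes int) int64) int64 {
    if trans != nil { // len(pod.Spec.Volumes) >= 0 is always true, so this is the effective condition
        return scorer(true, trans.RequestedVolumes, trans.AllocatableVolumesCount)
    }
    return scorer(false, 0, 0)
}

func main() {
    scorer := func(includeVolumes bool, requestedVolumes, allocatableVolumes int) int64 {
        if !includeVolumes || allocatableVolumes == 0 {
            return 5 // pretend this is the CPU/memory-only score
        }
        return 9 // pretend this is the volume-aware score
    }
    fmt.Println(scoreNode(&transientNodeInfo{AllocatableVolumesCount: 9, RequestedVolumes: 2}, scorer))
    fmt.Println(scoreNode(nil, scorer))
}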
@@ -24,6 +24,8 @@ import (
     "k8s.io/api/core/v1"
     "k8s.io/apimachinery/pkg/labels"
     "k8s.io/apimachinery/pkg/util/wait"
+    utilfeature "k8s.io/apiserver/pkg/util/feature"
+    "k8s.io/kubernetes/pkg/features"

     "github.com/golang/glog"
     policy "k8s.io/api/policy/v1beta1"
@@ -112,6 +114,10 @@ func (cache *schedulerCache) UpdateNodeNameToInfoMap(nodeNameToInfo map[string]*
     cache.mu.Lock()
     defer cache.mu.Unlock()
     for name, info := range cache.nodes {
+        if utilfeature.DefaultFeatureGate.Enabled(features.BalanceAttachedNodeVolumes) && info.TransientInfo != nil {
+            // Transient scheduler info is reset here.
+            info.TransientInfo.resetTransientSchedulerInfo()
+        }
         if current, ok := nodeNameToInfo[name]; !ok || current.generation != info.generation {
             nodeNameToInfo[name] = info.Clone()
         }
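UpdateNodeNameToInfoMap runs as part of each scheduling pass, so resetting the counters here gives every cycle a clean slate before the predicates repopulate them; the reset only happens when the gate is enabled and the node's TransientInfo is non-nil. A tiny standalone illustration of that guard (the gate is modeled as a plain bool, not the real feature-gate API):

package main

import "fmt"

type transientInfo struct{ AllocatableVolumesCount, RequestedVolumes int }

func (t *transientInfo) reset() { t.AllocatableVolumesCount, t.RequestedVolumes = 0, 0 }

// resetForNewCycle mirrors the guard in the cache update: only touch transient
// data when the feature is enabled and the node actually carries it.
func resetForNewCycle(gateEnabled bool, info *transientInfo) {
    if gateEnabled && info != nil {
        info.reset()
    }
}

func main() {
    info := &transientInfo{AllocatableVolumesCount: 9, RequestedVolumes: 2} // left over from the last cycle
    resetForNewCycle(true, info)
    fmt.Println(*info) // {0 0}: the next cycle starts clean

    resetForNewCycle(false, nil) // gate off or no transient info: nothing to do
}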
@@ -30,6 +30,8 @@ import (
     "k8s.io/apimachinery/pkg/labels"
     "k8s.io/apimachinery/pkg/types"
     "k8s.io/apimachinery/pkg/util/intstr"
+    utilfeature "k8s.io/apiserver/pkg/util/feature"
+    "k8s.io/kubernetes/pkg/features"
     priorityutil "k8s.io/kubernetes/pkg/scheduler/algorithm/priorities/util"
     schedutil "k8s.io/kubernetes/pkg/scheduler/util"
 )
@@ -73,6 +75,8 @@ func (b *hostPortInfoBuilder) build() schedutil.HostPortInfo {
 // TestAssumePodScheduled tests that after a pod is assumed, its information is aggregated
 // on node level.
 func TestAssumePodScheduled(t *testing.T) {
+    // Enable volumesOnNodeForBalancing to do balanced resource allocation
+    utilfeature.DefaultFeatureGate.Set(fmt.Sprintf("%s=true", features.BalanceAttachedNodeVolumes))
     nodeName := "node"
     testPods := []*v1.Pod{
         makeBasePod(t, nodeName, "test", "100m", "500", "", []v1.ContainerPort{{HostIP: "127.0.0.1", HostPort: 80, Protocol: "TCP"}}),
@@ -99,6 +103,7 @@ func TestAssumePodScheduled(t *testing.T) {
                 MilliCPU: 100,
                 Memory: 500,
             },
+            TransientInfo: newTransientSchedulerInfo(),
             allocatableResource: &Resource{},
             pods: []*v1.Pod{testPods[0]},
             usedPorts: newHostPortInfoBuilder().add("TCP", "127.0.0.1", 80).build(),
@@ -114,6 +119,7 @@ func TestAssumePodScheduled(t *testing.T) {
                 MilliCPU: 300,
                 Memory: 1524,
             },
+            TransientInfo: newTransientSchedulerInfo(),
             allocatableResource: &Resource{},
             pods: []*v1.Pod{testPods[1], testPods[2]},
             usedPorts: newHostPortInfoBuilder().add("TCP", "127.0.0.1", 80).add("TCP", "127.0.0.1", 8080).build(),
@@ -129,6 +135,7 @@ func TestAssumePodScheduled(t *testing.T) {
                 MilliCPU: priorityutil.DefaultMilliCPURequest,
                 Memory: priorityutil.DefaultMemoryRequest,
             },
+            TransientInfo: newTransientSchedulerInfo(),
             allocatableResource: &Resource{},
             pods: []*v1.Pod{testPods[3]},
             usedPorts: newHostPortInfoBuilder().add("TCP", "127.0.0.1", 80).build(),
@@ -145,6 +152,7 @@ func TestAssumePodScheduled(t *testing.T) {
                 MilliCPU: 100,
                 Memory: 500,
             },
+            TransientInfo: newTransientSchedulerInfo(),
             allocatableResource: &Resource{},
             pods: []*v1.Pod{testPods[4]},
             usedPorts: newHostPortInfoBuilder().add("TCP", "127.0.0.1", 80).build(),
@@ -161,6 +169,7 @@ func TestAssumePodScheduled(t *testing.T) {
                 MilliCPU: 300,
                 Memory: 1524,
             },
+            TransientInfo: newTransientSchedulerInfo(),
             allocatableResource: &Resource{},
             pods: []*v1.Pod{testPods[4], testPods[5]},
             usedPorts: newHostPortInfoBuilder().add("TCP", "127.0.0.1", 80).add("TCP", "127.0.0.1", 8080).build(),
@@ -176,6 +185,7 @@ func TestAssumePodScheduled(t *testing.T) {
                 MilliCPU: 100,
                 Memory: 500,
             },
+            TransientInfo: newTransientSchedulerInfo(),
             allocatableResource: &Resource{},
             pods: []*v1.Pod{testPods[6]},
             usedPorts: newHostPortInfoBuilder().build(),
@@ -219,6 +229,8 @@ func assumeAndFinishBinding(cache *schedulerCache, pod *v1.Pod, assumedTime time
 // TestExpirePod tests that assumed pods will be removed if expired.
 // The removal will be reflected in node info.
 func TestExpirePod(t *testing.T) {
+    // Enable volumesOnNodeForBalancing to do balanced resource allocation
+    utilfeature.DefaultFeatureGate.Set(fmt.Sprintf("%s=true", features.BalanceAttachedNodeVolumes))
     nodeName := "node"
     testPods := []*v1.Pod{
         makeBasePod(t, nodeName, "test-1", "100m", "500", "", []v1.ContainerPort{{HostIP: "127.0.0.1", HostPort: 80, Protocol: "TCP"}}),
@@ -252,6 +264,7 @@ func TestExpirePod(t *testing.T) {
                 MilliCPU: 200,
                 Memory: 1024,
             },
+            TransientInfo: newTransientSchedulerInfo(),
             allocatableResource: &Resource{},
             pods: []*v1.Pod{testPods[1]},
             usedPorts: newHostPortInfoBuilder().add("TCP", "127.0.0.1", 8080).build(),
@@ -276,6 +289,8 @@ func TestExpirePod(t *testing.T) {
 // TestAddPodWillConfirm tests that a pod being Add()ed will be confirmed if assumed.
 // The pod info should still exist after manually expiring unconfirmed pods.
 func TestAddPodWillConfirm(t *testing.T) {
+    // Enable volumesOnNodeForBalancing to do balanced resource allocation
+    utilfeature.DefaultFeatureGate.Set(fmt.Sprintf("%s=true", features.BalanceAttachedNodeVolumes))
     nodeName := "node"
     now := time.Now()
     ttl := 10 * time.Second
@@ -301,6 +316,7 @@ func TestAddPodWillConfirm(t *testing.T) {
                 MilliCPU: 100,
                 Memory: 500,
             },
+            TransientInfo: newTransientSchedulerInfo(),
             allocatableResource: &Resource{},
             pods: []*v1.Pod{testPods[0]},
             usedPorts: newHostPortInfoBuilder().add("TCP", "127.0.0.1", 80).build(),
@@ -396,6 +412,7 @@ func TestAddPodWillReplaceAssumed(t *testing.T) {
                 MilliCPU: 200,
                 Memory: 500,
             },
+            TransientInfo: newTransientSchedulerInfo(),
             allocatableResource: &Resource{},
             pods: []*v1.Pod{updatedPod.DeepCopy()},
             usedPorts: newHostPortInfoBuilder().add("TCP", "0.0.0.0", 90).build(),
@@ -430,6 +447,8 @@ func TestAddPodWillReplaceAssumed(t *testing.T) {

 // TestAddPodAfterExpiration tests that a pod being Add()ed will be added back if expired.
 func TestAddPodAfterExpiration(t *testing.T) {
+    // Enable volumesOnNodeForBalancing to do balanced resource allocation
+    utilfeature.DefaultFeatureGate.Set(fmt.Sprintf("%s=true", features.BalanceAttachedNodeVolumes))
     nodeName := "node"
     ttl := 10 * time.Second
     basePod := makeBasePod(t, nodeName, "test", "100m", "500", "", []v1.ContainerPort{{HostIP: "127.0.0.1", HostPort: 80, Protocol: "TCP"}})
@@ -448,6 +467,7 @@ func TestAddPodAfterExpiration(t *testing.T) {
                 MilliCPU: 100,
                 Memory: 500,
             },
+            TransientInfo: newTransientSchedulerInfo(),
             allocatableResource: &Resource{},
             pods: []*v1.Pod{basePod},
             usedPorts: newHostPortInfoBuilder().add("TCP", "127.0.0.1", 80).build(),
@@ -477,6 +497,8 @@ func TestAddPodAfterExpiration(t *testing.T) {

 // TestUpdatePod tests that a pod will be updated if added before.
 func TestUpdatePod(t *testing.T) {
+    // Enable volumesOnNodeForBalancing to do balanced resource allocation
+    utilfeature.DefaultFeatureGate.Set(fmt.Sprintf("%s=true", features.BalanceAttachedNodeVolumes))
     nodeName := "node"
     ttl := 10 * time.Second
     testPods := []*v1.Pod{
@@ -501,6 +523,7 @@ func TestUpdatePod(t *testing.T) {
                 MilliCPU: 200,
                 Memory: 1024,
             },
+            TransientInfo: newTransientSchedulerInfo(),
             allocatableResource: &Resource{},
             pods: []*v1.Pod{testPods[1]},
             usedPorts: newHostPortInfoBuilder().add("TCP", "127.0.0.1", 8080).build(),
@@ -513,6 +536,7 @@ func TestUpdatePod(t *testing.T) {
                 MilliCPU: 100,
                 Memory: 500,
             },
+            TransientInfo: newTransientSchedulerInfo(),
             allocatableResource: &Resource{},
             pods: []*v1.Pod{testPods[0]},
             usedPorts: newHostPortInfoBuilder().add("TCP", "127.0.0.1", 80).build(),
@@ -543,6 +567,8 @@ func TestUpdatePod(t *testing.T) {

 // TestExpireAddUpdatePod test the sequence that a pod is expired, added, then updated
 func TestExpireAddUpdatePod(t *testing.T) {
+    // Enable volumesOnNodeForBalancing to do balanced resource allocation
+    utilfeature.DefaultFeatureGate.Set(fmt.Sprintf("%s=true", features.BalanceAttachedNodeVolumes))
     nodeName := "node"
     ttl := 10 * time.Second
     testPods := []*v1.Pod{
@@ -568,6 +594,7 @@ func TestExpireAddUpdatePod(t *testing.T) {
                 MilliCPU: 200,
                 Memory: 1024,
             },
+            TransientInfo: newTransientSchedulerInfo(),
             allocatableResource: &Resource{},
             pods: []*v1.Pod{testPods[1]},
             usedPorts: newHostPortInfoBuilder().add("TCP", "127.0.0.1", 8080).build(),
@@ -580,6 +607,7 @@ func TestExpireAddUpdatePod(t *testing.T) {
                 MilliCPU: 100,
                 Memory: 500,
             },
+            TransientInfo: newTransientSchedulerInfo(),
             allocatableResource: &Resource{},
             pods: []*v1.Pod{testPods[0]},
             usedPorts: newHostPortInfoBuilder().add("TCP", "127.0.0.1", 80).build(),
@@ -638,6 +666,8 @@ func makePodWithEphemeralStorage(nodeName, ephemeralStorage string) *v1.Pod {
 }

 func TestEphemeralStorageResource(t *testing.T) {
+    // Enable volumesOnNodeForBalancing to do balanced resource allocation
+    utilfeature.DefaultFeatureGate.Set(fmt.Sprintf("%s=true", features.BalanceAttachedNodeVolumes))
     nodeName := "node"
     podE := makePodWithEphemeralStorage(nodeName, "500")
     tests := []struct {
@@ -654,6 +684,7 @@ func TestEphemeralStorageResource(t *testing.T) {
                 MilliCPU: priorityutil.DefaultMilliCPURequest,
                 Memory: priorityutil.DefaultMemoryRequest,
             },
+            TransientInfo: newTransientSchedulerInfo(),
             allocatableResource: &Resource{},
             pods: []*v1.Pod{podE},
             usedPorts: schedutil.HostPortInfo{},
@@ -681,6 +712,8 @@ func TestEphemeralStorageResource(t *testing.T) {

 // TestRemovePod tests after added pod is removed, its information should also be subtracted.
 func TestRemovePod(t *testing.T) {
+    // Enable volumesOnNodeForBalancing to do balanced resource allocation
+    utilfeature.DefaultFeatureGate.Set(fmt.Sprintf("%s=true", features.BalanceAttachedNodeVolumes))
     nodeName := "node"
     basePod := makeBasePod(t, nodeName, "test", "100m", "500", "", []v1.ContainerPort{{HostIP: "127.0.0.1", HostPort: 80, Protocol: "TCP"}})
     tests := []struct {
@@ -697,6 +730,7 @@ func TestRemovePod(t *testing.T) {
                 MilliCPU: 100,
                 Memory: 500,
             },
+            TransientInfo: newTransientSchedulerInfo(),
             allocatableResource: &Resource{},
             pods: []*v1.Pod{basePod},
             usedPorts: newHostPortInfoBuilder().add("TCP", "127.0.0.1", 80).build(),
@@ -1002,6 +1036,17 @@ func BenchmarkList1kNodes30kPods(b *testing.B) {
     }
 }

+func BenchmarkUpdate1kNodes30kPods(b *testing.B) {
+    // Enable volumesOnNodeForBalancing to do balanced resource allocation
+    utilfeature.DefaultFeatureGate.Set(fmt.Sprintf("%s=true", features.BalanceAttachedNodeVolumes))
+    cache := setupCacheOf1kNodes30kPods(b)
+    b.ResetTimer()
+    for n := 0; n < b.N; n++ {
+        cachedNodes := map[string]*NodeInfo{}
+        cache.UpdateNodeNameToInfoMap(cachedNodes)
+    }
+}
+
 func BenchmarkExpire100Pods(b *testing.B) {
     benchmarkExpire(b, 100)
 }
@@ -19,6 +19,7 @@ package schedulercache
 import (
     "errors"
     "fmt"
+    "sync"

     "github.com/golang/glog"

@@ -49,10 +50,15 @@ type NodeInfo struct {
     // as int64, to avoid conversions and accessing map.
     allocatableResource *Resource

-    // Cached tains of the node for faster lookup.
+    // Cached taints of the node for faster lookup.
     taints []v1.Taint
     taintsErr error

+    // TransientInfo holds the information pertaining to a scheduling cycle. This will be destructed at the end of
+    // scheduling cycle.
+    // TODO: @ravig. Remove this once we have a clear approach for message passing across predicates and priorities.
+    TransientInfo *transientSchedulerInfo
+
     // Cached conditions of node for faster lookup.
     memoryPressureCondition v1.ConditionStatus
     diskPressureCondition v1.ConditionStatus
@@ -62,6 +68,48 @@ type NodeInfo struct {
     generation int64
 }

+//initializeNodeTransientInfo initializes transient information pertaining to node.
+func initializeNodeTransientInfo() nodeTransientInfo {
+    return nodeTransientInfo{AllocatableVolumesCount: 0, RequestedVolumes: 0}
+}
+
+// nodeTransientInfo contains transient node information while scheduling.
+type nodeTransientInfo struct {
+    // AllocatableVolumesCount contains number of volumes that could be attached to node.
+    AllocatableVolumesCount int
+    // Requested number of volumes on a particular node.
+    RequestedVolumes int
+}
+
+// transientSchedulerInfo is a transient structure which is destructed at the end of each scheduling cycle.
+// It consists of items that are valid for a scheduling cycle and is used for message passing across predicates and
+// priorities. Some examples which could be used as fields are number of volumes being used on node, current utilization
+// on node etc.
+// IMPORTANT NOTE: Make sure that each field in this structure is documented along with usage. Expand this structure
+// only when absolutely needed as this data structure will be created and destroyed during every scheduling cycle.
+type transientSchedulerInfo struct {
+    TransientLock sync.Mutex
+    // NodeTransInfo holds the information related to nodeTransientInformation. NodeName is the key here.
+    TransNodeInfo nodeTransientInfo
+}
+
+// newTransientSchedulerInfo returns a new scheduler transient structure with initialized values.
+func newTransientSchedulerInfo() *transientSchedulerInfo {
+    tsi := &transientSchedulerInfo{
+        TransNodeInfo: initializeNodeTransientInfo(),
+    }
+    return tsi
+}
+
+// resetTransientSchedulerInfo resets the transientSchedulerInfo.
+func (transientSchedInfo *transientSchedulerInfo) resetTransientSchedulerInfo() {
+    transientSchedInfo.TransientLock.Lock()
+    defer transientSchedInfo.TransientLock.Unlock()
+    // Reset TransientNodeInfo.
+    transientSchedInfo.TransNodeInfo.AllocatableVolumesCount = 0
+    transientSchedInfo.TransNodeInfo.RequestedVolumes = 0
+}
+
 // Resource is a collection of compute resource.
 type Resource struct {
     MilliCPU int64
@@ -167,6 +215,7 @@ func NewNodeInfo(pods ...*v1.Pod) *NodeInfo {
         requestedResource: &Resource{},
         nonzeroRequest: &Resource{},
         allocatableResource: &Resource{},
+        TransientInfo: newTransientSchedulerInfo(),
         generation: 0,
         usedPorts: make(util.HostPortInfo),
     }
@@ -277,6 +326,7 @@ func (n *NodeInfo) Clone() *NodeInfo {
         nonzeroRequest: n.nonzeroRequest.Clone(),
         allocatableResource: n.allocatableResource.Clone(),
         taintsErr: n.taintsErr,
+        TransientInfo: n.TransientInfo,
         memoryPressureCondition: n.memoryPressureCondition,
         diskPressureCondition: n.diskPressureCondition,
         usedPorts: make(util.HostPortInfo),
@@ -443,6 +493,7 @@ func (n *NodeInfo) SetNode(node *v1.Node) error {
             // We ignore other conditions.
         }
     }
+    n.TransientInfo = newTransientSchedulerInfo()
     n.generation++
     return nil
 }
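transientSchedulerInfo is the message-passing channel this commit introduces: predicates write the volume counters under TransientLock, priorities read them later in the same cycle, and the cache resets them when the next cycle starts. A condensed, standalone sketch of that write/read/reset pattern (field and method names follow the diff; everything else is illustrative):

package main

import (
    "fmt"
    "sync"
)

type nodeTransientInfo struct {
    AllocatableVolumesCount int
    RequestedVolumes        int
}

type transientSchedulerInfo struct {
    TransientLock sync.Mutex
    TransNodeInfo nodeTransientInfo
}

func (t *transientSchedulerInfo) resetTransientSchedulerInfo() {
    t.TransientLock.Lock()
    defer t.TransientLock.Unlock()
    t.TransNodeInfo.AllocatableVolumesCount = 0
    t.TransNodeInfo.RequestedVolumes = 0
}

func main() {
    info := &transientSchedulerInfo{}

    // Predicate side: record volume numbers for this scheduling cycle under the lock.
    info.TransientLock.Lock()
    info.TransNodeInfo.AllocatableVolumesCount = 9
    info.TransNodeInfo.RequestedVolumes = 2
    info.TransientLock.Unlock()

    // Priority side: read the numbers when computing the balanced score.
    fmt.Println(info.TransNodeInfo.RequestedVolumes, info.TransNodeInfo.AllocatableVolumesCount)

    // Cache side: wipe the values before the next cycle.
    info.resetTransientSchedulerInfo()
    fmt.Println(info.TransNodeInfo)
}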
@@ -218,6 +218,7 @@ func TestNewNodeInfo(t *testing.T) {
             AllowedPodNumber: 0,
             ScalarResources: map[v1.ResourceName]int64(nil),
         },
+        TransientInfo: newTransientSchedulerInfo(),
         allocatableResource: &Resource{},
         generation: 2,
         usedPorts: util.HostPortInfo{
@@ -300,6 +301,7 @@ func TestNodeInfoClone(t *testing.T) {
         nodeInfo: &NodeInfo{
             requestedResource: &Resource{},
             nonzeroRequest: &Resource{},
+            TransientInfo: newTransientSchedulerInfo(),
             allocatableResource: &Resource{},
             generation: 2,
             usedPorts: util.HostPortInfo{
@@ -368,6 +370,7 @@ func TestNodeInfoClone(t *testing.T) {
         expected: &NodeInfo{
             requestedResource: &Resource{},
             nonzeroRequest: &Resource{},
+            TransientInfo: newTransientSchedulerInfo(),
             allocatableResource: &Resource{},
             generation: 2,
             usedPorts: util.HostPortInfo{
@@ -526,6 +529,7 @@ func TestNodeInfoAddPod(t *testing.T) {
             AllowedPodNumber: 0,
             ScalarResources: map[v1.ResourceName]int64(nil),
         },
+        TransientInfo: newTransientSchedulerInfo(),
         allocatableResource: &Resource{},
         generation: 2,
         usedPorts: util.HostPortInfo{
@@ -639,6 +643,7 @@ func TestNodeInfoRemovePod(t *testing.T) {
             AllowedPodNumber: 0,
             ScalarResources: map[v1.ResourceName]int64(nil),
         },
+        TransientInfo: newTransientSchedulerInfo(),
         allocatableResource: &Resource{},
         generation: 2,
         usedPorts: util.HostPortInfo{
@@ -756,6 +761,7 @@ func TestNodeInfoRemovePod(t *testing.T) {
             AllowedPodNumber: 0,
             ScalarResources: map[v1.ResourceName]int64(nil),
         },
+        TransientInfo: newTransientSchedulerInfo(),
         allocatableResource: &Resource{},
         generation: 3,
         usedPorts: util.HostPortInfo{