mirror of https://github.com/k3s-io/kubernetes.git (synced 2025-07-22 19:31:44 +00:00)

Scheduler NodeInfo cleanup

This commit is contained in:
parent 0c9245a29f
commit 2c51c13620
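The diff below converts NodeInfo's private fields and accessor methods (Pods(), RequestedResource(), AllocatableResource(), GetGeneration(), ...) into exported fields (Pods, Requested, Allocatable, Generation, ...), and updates every caller accordingly. A minimal caller-side sketch of the migration, assuming only the types shown in this diff (countPendingCPU is a hypothetical helper, not part of the commit):

package example

import (
	framework "k8s.io/kubernetes/pkg/scheduler/framework/v1alpha1"
)

// countPendingCPU illustrates the caller-side change: fields such as Pods,
// Requested and Allocatable are now read directly instead of through the
// removed Pods()/RequestedResource()/AllocatableResource() accessors.
func countPendingCPU(nodeInfo *framework.NodeInfo) (pods int, freeMilliCPU int64) {
	pods = len(nodeInfo.Pods)                                                   // was len(nodeInfo.Pods())
	freeMilliCPU = nodeInfo.Allocatable.MilliCPU - nodeInfo.Requested.MilliCPU  // was AllocatableResource() / RequestedResource()
	return pods, freeMilliCPU
}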
@@ -287,7 +287,7 @@ func (h *HTTPExtender) convertToNodeToVictims(
func (h *HTTPExtender) convertPodUIDToPod(
metaPod *extenderv1.MetaPod,
nodeInfo *framework.NodeInfo) (*v1.Pod, error) {
for _, p := range nodeInfo.Pods() {
for _, p := range nodeInfo.Pods {
if string(p.Pod.UID) == metaPod.UID {
return p.Pod, nil
}

@@ -225,7 +225,7 @@ func (f *FakeExtender) selectVictimsOnNodeByExtender(pod *v1.Pod, node *v1.Node)
// As the first step, remove all the lower priority pods from the node and
// check if the given pod can be scheduled.
podPriority := podutil.GetPodPriority(pod)
for _, p := range nodeInfoCopy.Pods() {
for _, p := range nodeInfoCopy.Pods {
if podutil.GetPodPriority(p.Pod) < podPriority {
potentialVictims = append(potentialVictims, p.Pod)
removePod(p.Pod)

@@ -970,7 +970,7 @@ func (g *genericScheduler) selectVictimsOnNode(
// As the first step, remove all the lower priority pods from the node and
// check if the given pod can be scheduled.
podPriority := podutil.GetPodPriority(pod)
for _, p := range nodeInfo.Pods() {
for _, p := range nodeInfo.Pods {
if podutil.GetPodPriority(p.Pod) < podPriority {
potentialVictims = append(potentialVictims, p.Pod)
if err := removePod(p.Pod); err != nil {

@@ -1062,7 +1062,7 @@ func podEligibleToPreemptOthers(pod *v1.Pod, nodeInfos framework.NodeInfoLister,
if len(nomNodeName) > 0 {
if nodeInfo, _ := nodeInfos.Get(nomNodeName); nodeInfo != nil {
podPriority := podutil.GetPodPriority(pod)
for _, p := range nodeInfo.Pods() {
for _, p := range nodeInfo.Pods {
if p.Pod.DeletionTimestamp != nil && podutil.GetPodPriority(p.Pod) < podPriority {
// There is a terminating pod on the nominated node.
return false

@@ -135,7 +135,7 @@ func (pl *noPodsFilterPlugin) Name() string {

// Filter invoked at the filter extension point.
func (pl *noPodsFilterPlugin) Filter(_ context.Context, _ *framework.CycleState, pod *v1.Pod, nodeInfo *framework.NodeInfo) *framework.Status {
if len(nodeInfo.Pods()) == 0 {
if len(nodeInfo.Pods) == 0 {
return nil
}
return framework.NewStatus(framework.Unschedulable, ErrReasonFake)
@@ -196,11 +196,11 @@ func New(_ *runtime.Unknown, handle framework.FrameworkHandle) (framework.Plugin

// countMatchingPods counts pods based on namespace and matching all selectors
func countMatchingPods(namespace string, selector labels.Selector, nodeInfo *framework.NodeInfo) int {
if len(nodeInfo.Pods()) == 0 || selector.Empty() {
if len(nodeInfo.Pods) == 0 || selector.Empty() {
return 0
}
count := 0
for _, p := range nodeInfo.Pods() {
for _, p := range nodeInfo.Pods {
// Ignore pods being deleted for spreading purposes
// Similar to how it is done for SelectorSpreadPriority
if namespace == p.Pod.Namespace && p.Pod.DeletionTimestamp == nil {

@@ -95,14 +95,11 @@ func calculatePriority(sumScores int64) int64 {
// the final score. Note that the init containers are not considered for it's rare for users to deploy huge init containers.
func sumImageScores(nodeInfo *framework.NodeInfo, containers []v1.Container, totalNumNodes int) int64 {
var sum int64
imageStates := nodeInfo.ImageStates()

for _, container := range containers {
if state, ok := imageStates[normalizedImageName(container.Image)]; ok {
if state, ok := nodeInfo.ImageStates[normalizedImageName(container.Image)]; ok {
sum += scaledImageScore(state, totalNumNodes)
}
}

return sum
}
@@ -228,7 +228,7 @@ func getTPMapMatchingExistingAntiAffinity(pod *v1.Pod, allNodes []*framework.Nod
klog.Error("node not found")
return
}
for _, existingPod := range nodeInfo.PodsWithAffinity() {
for _, existingPod := range nodeInfo.PodsWithAffinity {
existingPodTopologyMaps, err := getMatchingAntiAffinityTopologyPairsOfPod(pod, existingPod.Pod, node)
if err != nil {
errCh.SendErrorWithCancel(err, cancel)

@@ -291,7 +291,7 @@ func getTPMapMatchingIncomingAffinityAntiAffinity(pod *v1.Pod, allNodes []*frame
}
nodeTopologyPairsAffinityPodsMap := make(topologyToMatchedTermCount)
nodeTopologyPairsAntiAffinityPodsMap := make(topologyToMatchedTermCount)
for _, existingPod := range nodeInfo.Pods() {
for _, existingPod := range nodeInfo.Pods {
// Check affinity terms.
nodeTopologyPairsAffinityPodsMap.updateWithAffinityTerms(existingPod.Pod, node, affinityTerms, 1)

@@ -239,10 +239,10 @@ func (pl *InterPodAffinity) PreScore(
}
// Unless the pod being scheduled has affinity terms, we only
// need to process pods with affinity in the node.
podsToProcess := nodeInfo.PodsWithAffinity()
podsToProcess := nodeInfo.PodsWithAffinity
if hasAffinityConstraints || hasAntiAffinityConstraints {
// We need to process all the pods.
podsToProcess = nodeInfo.Pods()
podsToProcess = nodeInfo.Pods
}

topoScore := make(scoreMap)
@@ -118,7 +118,7 @@ func Fits(pod *v1.Pod, nodeInfo *framework.NodeInfo) bool {

func fitsPorts(wantPorts []*v1.ContainerPort, nodeInfo *framework.NodeInfo) bool {
// try to see whether existingPorts and wantPorts will conflict or not
existingPorts := nodeInfo.UsedPorts()
existingPorts := nodeInfo.UsedPorts
for _, cp := range wantPorts {
if existingPorts.CheckConflict(cp.HostIP, string(cp.Protocol), cp.HostPort) {
return false

@@ -384,7 +384,7 @@ func TestNodeResourcesBalancedAllocation(t *testing.T) {
maxVolumes := 5
nodeInfoList, _ := snapshot.NodeInfos().List()
for _, info := range nodeInfoList {
info.TransientInfo.TransNodeInfo.AllocatableVolumesCount = getExistingVolumeCountForNode(info.Pods(), maxVolumes)
info.TransientInfo.TransNodeInfo.AllocatableVolumesCount = getExistingVolumeCountForNode(info.Pods, maxVolumes)
info.TransientInfo.TransNodeInfo.RequestedVolumes = len(test.pod.Spec.Volumes)
}
}

@@ -186,13 +186,13 @@ func Fits(pod *v1.Pod, nodeInfo *framework.NodeInfo, ignoredExtendedResources se
func fitsRequest(podRequest *preFilterState, nodeInfo *framework.NodeInfo, ignoredExtendedResources sets.String) []InsufficientResource {
insufficientResources := make([]InsufficientResource, 0, 4)

allowedPodNumber := nodeInfo.AllowedPodNumber()
if len(nodeInfo.Pods())+1 > allowedPodNumber {
allowedPodNumber := nodeInfo.Allocatable.AllowedPodNumber
if len(nodeInfo.Pods)+1 > allowedPodNumber {
insufficientResources = append(insufficientResources, InsufficientResource{
v1.ResourcePods,
"Too many pods",
1,
int64(len(nodeInfo.Pods())),
int64(len(nodeInfo.Pods)),
int64(allowedPodNumber),
})
}
@@ -208,32 +208,31 @@ func fitsRequest(podRequest *preFilterState, nodeInfo *framework.NodeInfo, ignor
return insufficientResources
}

allocatable := nodeInfo.AllocatableResource()
if allocatable.MilliCPU < podRequest.MilliCPU+nodeInfo.RequestedResource().MilliCPU {
if nodeInfo.Allocatable.MilliCPU < podRequest.MilliCPU+nodeInfo.Requested.MilliCPU {
insufficientResources = append(insufficientResources, InsufficientResource{
v1.ResourceCPU,
"Insufficient cpu",
podRequest.MilliCPU,
nodeInfo.RequestedResource().MilliCPU,
allocatable.MilliCPU,
nodeInfo.Requested.MilliCPU,
nodeInfo.Allocatable.MilliCPU,
})
}
if allocatable.Memory < podRequest.Memory+nodeInfo.RequestedResource().Memory {
if nodeInfo.Allocatable.Memory < podRequest.Memory+nodeInfo.Requested.Memory {
insufficientResources = append(insufficientResources, InsufficientResource{
v1.ResourceMemory,
"Insufficient memory",
podRequest.Memory,
nodeInfo.RequestedResource().Memory,
allocatable.Memory,
nodeInfo.Requested.Memory,
nodeInfo.Allocatable.Memory,
})
}
if allocatable.EphemeralStorage < podRequest.EphemeralStorage+nodeInfo.RequestedResource().EphemeralStorage {
if nodeInfo.Allocatable.EphemeralStorage < podRequest.EphemeralStorage+nodeInfo.Requested.EphemeralStorage {
insufficientResources = append(insufficientResources, InsufficientResource{
v1.ResourceEphemeralStorage,
"Insufficient ephemeral-storage",
podRequest.EphemeralStorage,
nodeInfo.RequestedResource().EphemeralStorage,
allocatable.EphemeralStorage,
nodeInfo.Requested.EphemeralStorage,
nodeInfo.Allocatable.EphemeralStorage,
})
}

@@ -245,13 +244,13 @@ func fitsRequest(podRequest *preFilterState, nodeInfo *framework.NodeInfo, ignor
continue
}
}
if allocatable.ScalarResources[rName] < rQuant+nodeInfo.RequestedResource().ScalarResources[rName] {
if nodeInfo.Allocatable.ScalarResources[rName] < rQuant+nodeInfo.Requested.ScalarResources[rName] {
insufficientResources = append(insufficientResources, InsufficientResource{
rName,
fmt.Sprintf("Insufficient %v", rName),
podRequest.ScalarResources[rName],
nodeInfo.RequestedResource().ScalarResources[rName],
allocatable.ScalarResources[rName],
nodeInfo.Requested.ScalarResources[rName],
nodeInfo.Allocatable.ScalarResources[rName],
})
}
}
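The fit checks above now compare the pod's request directly against the exported Allocatable and Requested fields. A condensed sketch of the same pattern, assuming a hypothetical fitsCPU helper (the real fitsRequest also records InsufficientResource entries for every dimension):

package example

import (
	framework "k8s.io/kubernetes/pkg/scheduler/framework/v1alpha1"
)

// fitsCPU is a hypothetical helper: a pod requesting reqMilliCPU fits only if
// the node's allocatable CPU minus what is already requested leaves enough room.
func fitsCPU(reqMilliCPU int64, nodeInfo *framework.NodeInfo) bool {
	return nodeInfo.Allocatable.MilliCPU-nodeInfo.Requested.MilliCPU >= reqMilliCPU
}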
@@ -90,20 +90,18 @@ func (r *resourceAllocationScorer) score(

// calculateResourceAllocatableRequest returns resources Allocatable and Requested values
func calculateResourceAllocatableRequest(nodeInfo *framework.NodeInfo, pod *v1.Pod, resource v1.ResourceName) (int64, int64) {
allocatable := nodeInfo.AllocatableResource()
requested := nodeInfo.RequestedResource()
podRequest := calculatePodResourceRequest(pod, resource)
switch resource {
case v1.ResourceCPU:
return allocatable.MilliCPU, (nodeInfo.NonZeroRequest().MilliCPU + podRequest)
return nodeInfo.Allocatable.MilliCPU, (nodeInfo.NonZeroRequested.MilliCPU + podRequest)
case v1.ResourceMemory:
return allocatable.Memory, (nodeInfo.NonZeroRequest().Memory + podRequest)
return nodeInfo.Allocatable.Memory, (nodeInfo.NonZeroRequested.Memory + podRequest)

case v1.ResourceEphemeralStorage:
return allocatable.EphemeralStorage, (requested.EphemeralStorage + podRequest)
return nodeInfo.Allocatable.EphemeralStorage, (nodeInfo.Requested.EphemeralStorage + podRequest)
default:
if v1helper.IsScalarResourceName(resource) {
return allocatable.ScalarResources[resource], (requested.ScalarResources[resource] + podRequest)
return nodeInfo.Allocatable.ScalarResources[resource], (nodeInfo.Requested.ScalarResources[resource] + podRequest)
}
}
if klog.V(10) {

@@ -106,14 +106,14 @@ func (rl *ResourceLimits) Score(ctx context.Context, state *framework.CycleState
if err != nil || nodeInfo.Node() == nil {
return 0, framework.NewStatus(framework.Error, fmt.Sprintf("getting node %q from Snapshot: %v, node is nil: %v", nodeName, err, nodeInfo.Node() == nil))
}
allocatableResources := nodeInfo.AllocatableResource()

podLimits, err := getPodResource(state)
if err != nil {
return 0, framework.NewStatus(framework.Error, err.Error())
}

cpuScore := computeScore(podLimits.MilliCPU, allocatableResources.MilliCPU)
memScore := computeScore(podLimits.Memory, allocatableResources.Memory)
cpuScore := computeScore(podLimits.MilliCPU, nodeInfo.Allocatable.MilliCPU)
memScore := computeScore(podLimits.Memory, nodeInfo.Allocatable.Memory)

score := int64(0)
if cpuScore == 1 || memScore == 1 {
@@ -102,7 +102,7 @@ func (pl *CSILimits) Filter(ctx context.Context, _ *framework.CycleState, pod *v
}

attachedVolumes := make(map[string]string)
for _, existingPod := range nodeInfo.Pods() {
for _, existingPod := range nodeInfo.Pods {
if err := pl.filterAttachableVolumes(csiNode, existingPod.Pod.Spec.Volumes, existingPod.Pod.Namespace, attachedVolumes); err != nil {
return framework.NewStatus(framework.Error, err.Error())
}

@@ -286,7 +286,7 @@ func NewCSI(_ *runtime.Unknown, handle framework.FrameworkHandle) (framework.Plu

func getVolumeLimits(nodeInfo *framework.NodeInfo, csiNode *storagev1.CSINode) map[v1.ResourceName]int64 {
// TODO: stop getting values from Node object in v1.18
nodeVolumeLimits := nodeInfo.VolumeLimits()
nodeVolumeLimits := volumeLimits(nodeInfo)
if csiNode != nil {
for i := range csiNode.Spec.Drivers {
d := csiNode.Spec.Drivers[i]

@@ -235,7 +235,7 @@ func (pl *nonCSILimits) Filter(ctx context.Context, _ *framework.CycleState, pod

// count unique volumes
existingVolumes := make(map[string]bool)
for _, existingPod := range nodeInfo.Pods() {
for _, existingPod := range nodeInfo.Pods {
if err := pl.filterVolumes(existingPod.Pod.Spec.Volumes, existingPod.Pod.Namespace, existingVolumes); err != nil {
return framework.NewStatus(framework.Error, err.Error())
}

@@ -251,7 +251,7 @@ func (pl *nonCSILimits) Filter(ctx context.Context, _ *framework.CycleState, pod

numNewVolumes := len(newVolumes)
maxAttachLimit := pl.maxVolumeFunc(node)
volumeLimits := nodeInfo.VolumeLimits()
volumeLimits := volumeLimits(nodeInfo)
if maxAttachLimitFromAllocatable, ok := volumeLimits[pl.volumeLimitKey]; ok {
maxAttachLimit = int(maxAttachLimitFromAllocatable)
}
@@ -19,12 +19,14 @@ package nodevolumelimits
import (
"strings"

v1 "k8s.io/api/core/v1"
"k8s.io/api/core/v1"
storagev1 "k8s.io/api/storage/v1"
"k8s.io/apimachinery/pkg/util/sets"
utilfeature "k8s.io/apiserver/pkg/util/feature"
csilibplugins "k8s.io/csi-translation-lib/plugins"
v1helper "k8s.io/kubernetes/pkg/apis/core/v1/helper"
"k8s.io/kubernetes/pkg/features"
framework "k8s.io/kubernetes/pkg/scheduler/framework/v1alpha1"
)

// isCSIMigrationOn returns a boolean value indicating whether

@@ -79,3 +81,14 @@ func isCSIMigrationOn(csiNode *storagev1.CSINode, pluginName string) bool {

return mpaSet.Has(pluginName)
}

// volumeLimits returns volume limits associated with the node.
func volumeLimits(n *framework.NodeInfo) map[v1.ResourceName]int64 {
volumeLimits := map[v1.ResourceName]int64{}
for k, v := range n.Allocatable.ScalarResources {
if v1helper.IsAttachableVolumeResourceName(k) {
volumeLimits[k] = v
}
}
return volumeLimits
}
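NodeInfo.VolumeLimits() is removed and replaced by the package-local volumeLimits helper added above, which filters Allocatable.ScalarResources down to attachable-volume resource names. A hedged sketch of the same pattern for code outside the plugin package (volumeLimitsOf is a hypothetical name; the in-tree helper is unexported):

package example

import (
	v1 "k8s.io/api/core/v1"
	v1helper "k8s.io/kubernetes/pkg/apis/core/v1/helper"
	framework "k8s.io/kubernetes/pkg/scheduler/framework/v1alpha1"
)

// volumeLimitsOf mirrors the unexported helper: pick the attachable-volume
// entries out of the node's allocatable scalar resources.
func volumeLimitsOf(n *framework.NodeInfo) map[v1.ResourceName]int64 {
	limits := map[v1.ResourceName]int64{}
	for name, value := range n.Allocatable.ScalarResources {
		if v1helper.IsAttachableVolumeResourceName(name) {
			limits[name] = value
		}
	}
	return limits
}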
@@ -255,7 +255,7 @@ func (pl *PodTopologySpread) calPreFilterState(pod *v1.Pod) (*preFilterState, er
if tpCount == nil {
continue
}
count := countPodsMatchSelector(nodeInfo.Pods(), constraint.Selector, pod.Namespace)
count := countPodsMatchSelector(nodeInfo.Pods, constraint.Selector, pod.Namespace)
atomic.AddInt32(tpCount, int32(count))
}
}

@@ -142,7 +142,7 @@ func (pl *PodTopologySpread) PreScore(
if tpCount == nil {
continue
}
count := countPodsMatchSelector(nodeInfo.Pods(), c.Selector, pod.Namespace)
count := countPodsMatchSelector(nodeInfo.Pods, c.Selector, pod.Namespace)
atomic.AddInt64(tpCount, int64(count))
}
}

@@ -178,7 +178,7 @@ func (pl *PodTopologySpread) Score(ctx context.Context, cycleState *framework.Cy
for _, c := range s.Constraints {
if tpVal, ok := node.Labels[c.TopologyKey]; ok {
if c.TopologyKey == v1.LabelHostname {
count := countPodsMatchSelector(nodeInfo.Pods(), c.Selector, pod.Namespace)
count := countPodsMatchSelector(nodeInfo.Pods, c.Selector, pod.Namespace)
score += int64(count)
} else {
pair := topologyPair{key: c.TopologyKey, value: tpVal}

@@ -278,11 +278,11 @@ func (pl *ServiceAffinity) Score(ctx context.Context, state *framework.CycleStat
selector = labels.NewSelector()
}

if len(nodeInfo.Pods()) == 0 || selector.Empty() {
if len(nodeInfo.Pods) == 0 || selector.Empty() {
return 0, nil
}
var score int64
for _, existingPod := range nodeInfo.Pods() {
for _, existingPod := range nodeInfo.Pods {
// Ignore pods being deleted for spreading purposes
// Similar to how it is done for SelectorSpreadPriority
if pod.Namespace == existingPod.Pod.Namespace && existingPod.Pod.DeletionTimestamp == nil {

@@ -119,7 +119,7 @@ func haveOverlap(a1, a2 []string) bool {
// - ISCSI forbids if any two pods share at least same IQN and ISCSI volume is read-only
func (pl *VolumeRestrictions) Filter(ctx context.Context, _ *framework.CycleState, pod *v1.Pod, nodeInfo *framework.NodeInfo) *framework.Status {
for _, v := range pod.Spec.Volumes {
for _, ev := range nodeInfo.Pods() {
for _, ev := range nodeInfo.Pods {
if isVolumeConflict(v, ev.Pod) {
return framework.NewStatus(framework.Unschedulable, ErrReasonDiskConflict)
}
@@ -83,44 +83,40 @@ type NodeInfo struct {
// Overall node information.
node *v1.Node

pods []*PodInfo
podsWithAffinity []*PodInfo
usedPorts HostPortInfo
// Pods running on the node.
Pods []*PodInfo

// The subset of pods with affinity.
PodsWithAffinity []*PodInfo

// Ports allocated on the node.
UsedPorts HostPortInfo

// Total requested resources of all pods on this node. This includes assumed
// pods, which scheduler has sent for binding, but may not be scheduled yet.
requestedResource *Resource
Requested *Resource
// Total requested resources of all pods on this node with a minimum value
// applied to each container's CPU and memory requests. This does not reflect
// the actual resource requests for this node, but is used to avoid scheduling
// many zero-request pods onto one node.
nonzeroRequest *Resource
NonZeroRequested *Resource
// We store allocatedResources (which is Node.Status.Allocatable.*) explicitly
// as int64, to avoid conversions and accessing map.
allocatableResource *Resource
Allocatable *Resource

// Cached taints of the node for faster lookup.
taints []v1.Taint
taintsErr error

// imageStates holds the entry of an image if and only if this image is on the node. The entry can be used for
// ImageStates holds the entry of an image if and only if this image is on the node. The entry can be used for
// checking an image's existence and advanced usage (e.g., image locality scheduling policy) based on the image
// state information.
imageStates map[string]*ImageStateSummary
ImageStates map[string]*ImageStateSummary

// TransientInfo holds the information pertaining to a scheduling cycle. This will be destructed at the end of
// scheduling cycle.
// TODO: @ravig. Remove this once we have a clear approach for message passing across predicates and priorities.
TransientInfo *TransientSchedulerInfo

// Cached conditions of node for faster lookup.
memoryPressureCondition v1.ConditionStatus
diskPressureCondition v1.ConditionStatus
pidPressureCondition v1.ConditionStatus

// Whenever NodeInfo changes, generation is bumped.
// This is used to avoid cloning it if the object didn't change.
generation int64
Generation int64
}

//initializeNodeTransientInfo initializes transient information pertaining to node.
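With the struct above, NodeInfo now exposes Pods, PodsWithAffinity, UsedPorts, Requested, NonZeroRequested, Allocatable, ImageStates and Generation as public fields. A hedged sketch of reading two of them directly, using only calls that appear elsewhere in this diff (hostPortFree and imageOnNode are hypothetical helpers, not part of the commit):

package example

import (
	v1 "k8s.io/api/core/v1"
	framework "k8s.io/kubernetes/pkg/scheduler/framework/v1alpha1"
)

// hostPortFree reports whether a wanted container port conflicts with the
// ports already recorded in the now-exported UsedPorts field.
func hostPortFree(nodeInfo *framework.NodeInfo, port *v1.ContainerPort) bool {
	return !nodeInfo.UsedPorts.CheckConflict(port.HostIP, string(port.Protocol), port.HostPort)
}

// imageOnNode checks the now-exported ImageStates map, keyed by the
// normalized image name, for the presence of an image on the node.
func imageOnNode(nodeInfo *framework.NodeInfo, normalizedImage string) bool {
	_, ok := nodeInfo.ImageStates[normalizedImage]
	return ok
}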
@@ -301,13 +297,13 @@ func (r *Resource) SetMaxResource(rl v1.ResourceList) {
// the returned object.
func NewNodeInfo(pods ...*v1.Pod) *NodeInfo {
ni := &NodeInfo{
requestedResource: &Resource{},
nonzeroRequest: &Resource{},
allocatableResource: &Resource{},
TransientInfo: NewTransientSchedulerInfo(),
generation: nextGeneration(),
usedPorts: make(HostPortInfo),
imageStates: make(map[string]*ImageStateSummary),
Requested: &Resource{},
NonZeroRequested: &Resource{},
Allocatable: &Resource{},
TransientInfo: NewTransientSchedulerInfo(),
Generation: nextGeneration(),
UsedPorts: make(HostPortInfo),
ImageStates: make(map[string]*ImageStateSummary),
}
for _, pod := range pods {
ni.AddPod(pod)
@@ -323,211 +319,98 @@ func (n *NodeInfo) Node() *v1.Node {
return n.node
}

// Pods return all pods scheduled (including assumed to be) on this node.
func (n *NodeInfo) Pods() []*PodInfo {
if n == nil {
return nil
}
return n.pods
}

// UsedPorts returns used ports on this node.
func (n *NodeInfo) UsedPorts() HostPortInfo {
if n == nil {
return nil
}
return n.usedPorts
}

// SetUsedPorts sets the used ports on this node.
func (n *NodeInfo) SetUsedPorts(newUsedPorts HostPortInfo) {
n.usedPorts = newUsedPorts
}

// ImageStates returns the state information of all images.
func (n *NodeInfo) ImageStates() map[string]*ImageStateSummary {
if n == nil {
return nil
}
return n.imageStates
}

// SetImageStates sets the state information of all images.
func (n *NodeInfo) SetImageStates(newImageStates map[string]*ImageStateSummary) {
n.imageStates = newImageStates
}

// PodsWithAffinity return all pods with (anti)affinity constraints on this node.
func (n *NodeInfo) PodsWithAffinity() []*PodInfo {
if n == nil {
return nil
}
return n.podsWithAffinity
}

// AllowedPodNumber returns the number of the allowed pods on this node.
func (n *NodeInfo) AllowedPodNumber() int {
if n == nil || n.allocatableResource == nil {
return 0
}
return n.allocatableResource.AllowedPodNumber
}

// Taints returns the taints list on this node.
// TODO(#89528): Exists only because of kubelet dependency, remove.
func (n *NodeInfo) Taints() ([]v1.Taint, error) {
if n == nil {
if n == nil || n.node.Spec.Taints == nil {
return nil, nil
}
return n.taints, n.taintsErr
}

// SetTaints sets the taints list on this node.
func (n *NodeInfo) SetTaints(newTaints []v1.Taint) {
n.taints = newTaints
}

// RequestedResource returns aggregated resource request of pods on this node.
func (n *NodeInfo) RequestedResource() Resource {
if n == nil {
return emptyResource
}
return *n.requestedResource
}

// SetRequestedResource sets the aggregated resource request of pods on this node.
func (n *NodeInfo) SetRequestedResource(newResource *Resource) {
n.requestedResource = newResource
}

// NonZeroRequest returns aggregated nonzero resource request of pods on this node.
func (n *NodeInfo) NonZeroRequest() Resource {
if n == nil {
return emptyResource
}
return *n.nonzeroRequest
}

// SetNonZeroRequest sets the aggregated nonzero resource request of pods on this node.
func (n *NodeInfo) SetNonZeroRequest(newResource *Resource) {
n.nonzeroRequest = newResource
return n.node.Spec.Taints, nil
}

// AllocatableResource returns allocatable resources on a given node.
// TODO(#89528): Exists only because of kubelet dependency, remove.
func (n *NodeInfo) AllocatableResource() Resource {
if n == nil {
return emptyResource
}
return *n.allocatableResource
return *n.Allocatable
}

// SetAllocatableResource sets the allocatableResource information of given node.
// TODO(#89528): Exists only because of kubelet dependency, remove.
func (n *NodeInfo) SetAllocatableResource(allocatableResource *Resource) {
n.allocatableResource = allocatableResource
n.generation = nextGeneration()
}

// GetGeneration returns the generation on this node.
func (n *NodeInfo) GetGeneration() int64 {
if n == nil {
return 0
}
return n.generation
}

// SetGeneration sets the generation on this node. This is for testing only.
func (n *NodeInfo) SetGeneration(newGeneration int64) {
n.generation = newGeneration
n.Allocatable = allocatableResource
n.Generation = nextGeneration()
}

// Clone returns a copy of this node.
func (n *NodeInfo) Clone() *NodeInfo {
clone := &NodeInfo{
node: n.node,
requestedResource: n.requestedResource.Clone(),
nonzeroRequest: n.nonzeroRequest.Clone(),
allocatableResource: n.allocatableResource.Clone(),
taintsErr: n.taintsErr,
TransientInfo: n.TransientInfo,
memoryPressureCondition: n.memoryPressureCondition,
diskPressureCondition: n.diskPressureCondition,
pidPressureCondition: n.pidPressureCondition,
usedPorts: make(HostPortInfo),
imageStates: n.imageStates,
generation: n.generation,
node: n.node,
Requested: n.Requested.Clone(),
NonZeroRequested: n.NonZeroRequested.Clone(),
Allocatable: n.Allocatable.Clone(),
TransientInfo: n.TransientInfo,
UsedPorts: make(HostPortInfo),
ImageStates: n.ImageStates,
Generation: n.Generation,
}
if len(n.pods) > 0 {
clone.pods = append([]*PodInfo(nil), n.pods...)
if len(n.Pods) > 0 {
clone.Pods = append([]*PodInfo(nil), n.Pods...)
}
if len(n.usedPorts) > 0 {
if len(n.UsedPorts) > 0 {
// HostPortInfo is a map-in-map struct
// make sure it's deep copied
for ip, portMap := range n.usedPorts {
clone.usedPorts[ip] = make(map[ProtocolPort]struct{})
for ip, portMap := range n.UsedPorts {
clone.UsedPorts[ip] = make(map[ProtocolPort]struct{})
for protocolPort, v := range portMap {
clone.usedPorts[ip][protocolPort] = v
clone.UsedPorts[ip][protocolPort] = v
}
}
}
if len(n.podsWithAffinity) > 0 {
clone.podsWithAffinity = append([]*PodInfo(nil), n.podsWithAffinity...)
}
if len(n.taints) > 0 {
clone.taints = append([]v1.Taint(nil), n.taints...)
if len(n.PodsWithAffinity) > 0 {
clone.PodsWithAffinity = append([]*PodInfo(nil), n.PodsWithAffinity...)
}
return clone
}

// VolumeLimits returns volume limits associated with the node
func (n *NodeInfo) VolumeLimits() map[v1.ResourceName]int64 {
volumeLimits := map[v1.ResourceName]int64{}
for k, v := range n.AllocatableResource().ScalarResources {
if v1helper.IsAttachableVolumeResourceName(k) {
volumeLimits[k] = v
}
}
return volumeLimits
}

// String returns representation of human readable format of this NodeInfo.
func (n *NodeInfo) String() string {
podKeys := make([]string, len(n.pods))
for i, p := range n.pods {
podKeys := make([]string, len(n.Pods))
for i, p := range n.Pods {
podKeys[i] = p.Pod.Name
}
return fmt.Sprintf("&NodeInfo{Pods:%v, RequestedResource:%#v, NonZeroRequest: %#v, UsedPort: %#v, AllocatableResource:%#v}",
podKeys, n.requestedResource, n.nonzeroRequest, n.usedPorts, n.allocatableResource)
}

func hasPodAffinityConstraints(pod *v1.Pod) bool {
affinity := pod.Spec.Affinity
return affinity != nil && (affinity.PodAffinity != nil || affinity.PodAntiAffinity != nil)
podKeys, n.Requested, n.NonZeroRequested, n.UsedPorts, n.Allocatable)
}

// AddPod adds pod information to this NodeInfo.
func (n *NodeInfo) AddPod(pod *v1.Pod) {
// TODO(#89528): AddPod should accept a PodInfo as an input argument.
podInfo := NewPodInfo(pod)
res, non0CPU, non0Mem := calculateResource(podInfo.Pod)
n.requestedResource.MilliCPU += res.MilliCPU
n.requestedResource.Memory += res.Memory
n.requestedResource.EphemeralStorage += res.EphemeralStorage
if n.requestedResource.ScalarResources == nil && len(res.ScalarResources) > 0 {
n.requestedResource.ScalarResources = map[v1.ResourceName]int64{}
res, non0CPU, non0Mem := calculateResource(pod)
n.Requested.MilliCPU += res.MilliCPU
n.Requested.Memory += res.Memory
n.Requested.EphemeralStorage += res.EphemeralStorage
if n.Requested.ScalarResources == nil && len(res.ScalarResources) > 0 {
n.Requested.ScalarResources = map[v1.ResourceName]int64{}
}
for rName, rQuant := range res.ScalarResources {
n.requestedResource.ScalarResources[rName] += rQuant
n.Requested.ScalarResources[rName] += rQuant
}
n.nonzeroRequest.MilliCPU += non0CPU
n.nonzeroRequest.Memory += non0Mem
n.pods = append(n.pods, podInfo)
if hasPodAffinityConstraints(podInfo.Pod) {
n.podsWithAffinity = append(n.podsWithAffinity, podInfo)
n.NonZeroRequested.MilliCPU += non0CPU
n.NonZeroRequested.Memory += non0Mem
n.Pods = append(n.Pods, podInfo)
affinity := pod.Spec.Affinity
if affinity != nil && (affinity.PodAffinity != nil || affinity.PodAntiAffinity != nil) {
n.PodsWithAffinity = append(n.PodsWithAffinity, podInfo)
}

// Consume ports when pods added.
n.UpdateUsedPorts(podInfo.Pod, true)
n.updateUsedPorts(podInfo.Pod, true)

n.generation = nextGeneration()
n.Generation = nextGeneration()
}

// RemovePod subtracts pod information from this NodeInfo.
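A hedged sketch of the reworked API in use, limited to the functions and fields shown in this diff (addAndClone is a hypothetical helper): AddPod updates the exported Requested/NonZeroRequested/Pods/UsedPorts fields in place and bumps Generation, and Clone copies the exported fields into a fresh NodeInfo.

package example

import (
	v1 "k8s.io/api/core/v1"
	framework "k8s.io/kubernetes/pkg/scheduler/framework/v1alpha1"
)

// addAndClone adds a pod to a new NodeInfo, reports whether Generation was
// bumped by AddPod, and returns a clone built from the exported fields.
func addAndClone(pod *v1.Pod) (generationBumped bool, clone *framework.NodeInfo) {
	ni := framework.NewNodeInfo()
	before := ni.Generation
	ni.AddPod(pod)
	return ni.Generation > before, ni.Clone()
}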
@@ -537,48 +420,48 @@ func (n *NodeInfo) RemovePod(pod *v1.Pod) error {
return err
}

for i := range n.podsWithAffinity {
k2, err := GetPodKey(n.podsWithAffinity[i].Pod)
for i := range n.PodsWithAffinity {
k2, err := GetPodKey(n.PodsWithAffinity[i].Pod)
if err != nil {
klog.Errorf("Cannot get pod key, err: %v", err)
continue
}
if k1 == k2 {
// delete the element
n.podsWithAffinity[i] = n.podsWithAffinity[len(n.podsWithAffinity)-1]
n.podsWithAffinity = n.podsWithAffinity[:len(n.podsWithAffinity)-1]
n.PodsWithAffinity[i] = n.PodsWithAffinity[len(n.PodsWithAffinity)-1]
n.PodsWithAffinity = n.PodsWithAffinity[:len(n.PodsWithAffinity)-1]
break
}
}
for i := range n.pods {
k2, err := GetPodKey(n.pods[i].Pod)
for i := range n.Pods {
k2, err := GetPodKey(n.Pods[i].Pod)
if err != nil {
klog.Errorf("Cannot get pod key, err: %v", err)
continue
}
if k1 == k2 {
// delete the element
n.pods[i] = n.pods[len(n.pods)-1]
n.pods = n.pods[:len(n.pods)-1]
n.Pods[i] = n.Pods[len(n.Pods)-1]
n.Pods = n.Pods[:len(n.Pods)-1]
// reduce the resource data
res, non0CPU, non0Mem := calculateResource(pod)

n.requestedResource.MilliCPU -= res.MilliCPU
n.requestedResource.Memory -= res.Memory
n.requestedResource.EphemeralStorage -= res.EphemeralStorage
if len(res.ScalarResources) > 0 && n.requestedResource.ScalarResources == nil {
n.requestedResource.ScalarResources = map[v1.ResourceName]int64{}
n.Requested.MilliCPU -= res.MilliCPU
n.Requested.Memory -= res.Memory
n.Requested.EphemeralStorage -= res.EphemeralStorage
if len(res.ScalarResources) > 0 && n.Requested.ScalarResources == nil {
n.Requested.ScalarResources = map[v1.ResourceName]int64{}
}
for rName, rQuant := range res.ScalarResources {
n.requestedResource.ScalarResources[rName] -= rQuant
n.Requested.ScalarResources[rName] -= rQuant
}
n.nonzeroRequest.MilliCPU -= non0CPU
n.nonzeroRequest.Memory -= non0Mem
n.NonZeroRequested.MilliCPU -= non0CPU
n.NonZeroRequested.Memory -= non0Mem

// Release ports when remove Pods.
n.UpdateUsedPorts(pod, false)
n.updateUsedPorts(pod, false)

n.generation = nextGeneration()
n.Generation = nextGeneration()
n.resetSlicesIfEmpty()
return nil
}

@@ -588,11 +471,11 @@ func (n *NodeInfo) RemovePod(pod *v1.Pod) error {

// resets the slices to nil so that we can do DeepEqual in unit tests.
func (n *NodeInfo) resetSlicesIfEmpty() {
if len(n.podsWithAffinity) == 0 {
n.podsWithAffinity = nil
if len(n.PodsWithAffinity) == 0 {
n.PodsWithAffinity = nil
}
if len(n.pods) == 0 {
n.pods = nil
if len(n.Pods) == 0 {
n.Pods = nil
}
}
@@ -623,16 +506,16 @@ func calculateResource(pod *v1.Pod) (res Resource, non0CPU int64, non0Mem int64)
return
}

// UpdateUsedPorts updates the UsedPorts of NodeInfo.
func (n *NodeInfo) UpdateUsedPorts(pod *v1.Pod, add bool) {
// updateUsedPorts updates the UsedPorts of NodeInfo.
func (n *NodeInfo) updateUsedPorts(pod *v1.Pod, add bool) {
for j := range pod.Spec.Containers {
container := &pod.Spec.Containers[j]
for k := range container.Ports {
podPort := &container.Ports[k]
if add {
n.usedPorts.Add(podPort.HostIP, string(podPort.Protocol), podPort.HostPort)
n.UsedPorts.Add(podPort.HostIP, string(podPort.Protocol), podPort.HostPort)
} else {
n.usedPorts.Remove(podPort.HostIP, string(podPort.Protocol), podPort.HostPort)
n.UsedPorts.Remove(podPort.HostIP, string(podPort.Protocol), podPort.HostPort)
}
}
}

@@ -641,25 +524,9 @@ func (n *NodeInfo) UpdateUsedPorts(pod *v1.Pod, add bool) {
// SetNode sets the overall node information.
func (n *NodeInfo) SetNode(node *v1.Node) error {
n.node = node

n.allocatableResource = NewResource(node.Status.Allocatable)

n.taints = node.Spec.Taints
for i := range node.Status.Conditions {
cond := &node.Status.Conditions[i]
switch cond.Type {
case v1.NodeMemoryPressure:
n.memoryPressureCondition = cond.Status
case v1.NodeDiskPressure:
n.diskPressureCondition = cond.Status
case v1.NodePIDPressure:
n.pidPressureCondition = cond.Status
default:
// We ignore other conditions.
}
}
n.Allocatable = NewResource(node.Status.Allocatable)
n.TransientInfo = NewTransientSchedulerInfo()
n.generation = nextGeneration()
n.Generation = nextGeneration()
return nil
}

@@ -686,7 +553,7 @@ func (n *NodeInfo) FilterOutPods(pods []*v1.Pod) []*v1.Pod {
if err != nil {
continue
}
for _, np := range n.Pods() {
for _, np := range n.Pods {
npodkey, _ := GetPodKey(np.Pod)
if npodkey == podKey {
filtered = append(filtered, p)
@@ -285,31 +285,31 @@ func TestNewNodeInfo(t *testing.T) {
}

expected := &NodeInfo{
requestedResource: &Resource{
Requested: &Resource{
MilliCPU: 300,
Memory: 1524,
EphemeralStorage: 0,
AllowedPodNumber: 0,
ScalarResources: map[v1.ResourceName]int64(nil),
},
nonzeroRequest: &Resource{
NonZeroRequested: &Resource{
MilliCPU: 300,
Memory: 1524,
EphemeralStorage: 0,
AllowedPodNumber: 0,
ScalarResources: map[v1.ResourceName]int64(nil),
},
TransientInfo: NewTransientSchedulerInfo(),
allocatableResource: &Resource{},
generation: 2,
usedPorts: HostPortInfo{
TransientInfo: NewTransientSchedulerInfo(),
Allocatable: &Resource{},
Generation: 2,
UsedPorts: HostPortInfo{
"127.0.0.1": map[ProtocolPort]struct{}{
{Protocol: "TCP", Port: 80}: {},
{Protocol: "TCP", Port: 8080}: {},
},
},
imageStates: map[string]*ImageStateSummary{},
pods: []*PodInfo{
ImageStates: map[string]*ImageStateSummary{},
Pods: []*PodInfo{
{
Pod: &v1.Pod{
ObjectMeta: metav1.ObjectMeta{

@@ -373,10 +373,10 @@ func TestNewNodeInfo(t *testing.T) {

gen := generation
ni := NewNodeInfo(pods...)
if ni.generation <= gen {
t.Errorf("generation is not incremented. previous: %v, current: %v", gen, ni.generation)
if ni.Generation <= gen {
t.Errorf("Generation is not incremented. previous: %v, current: %v", gen, ni.Generation)
}
expected.generation = ni.generation
expected.Generation = ni.Generation
if !reflect.DeepEqual(expected, ni) {
t.Errorf("expected: %#v, got: %#v", expected, ni)
}

@@ -390,19 +390,19 @@ func TestNodeInfoClone(t *testing.T) {
}{
{
nodeInfo: &NodeInfo{
requestedResource: &Resource{},
nonzeroRequest: &Resource{},
TransientInfo: NewTransientSchedulerInfo(),
allocatableResource: &Resource{},
generation: 2,
usedPorts: HostPortInfo{
Requested: &Resource{},
NonZeroRequested: &Resource{},
TransientInfo: NewTransientSchedulerInfo(),
Allocatable: &Resource{},
Generation: 2,
UsedPorts: HostPortInfo{
"127.0.0.1": map[ProtocolPort]struct{}{
{Protocol: "TCP", Port: 80}: {},
{Protocol: "TCP", Port: 8080}: {},
},
},
imageStates: map[string]*ImageStateSummary{},
pods: []*PodInfo{
ImageStates: map[string]*ImageStateSummary{},
Pods: []*PodInfo{
{
Pod: &v1.Pod{
ObjectMeta: metav1.ObjectMeta{

@@ -464,19 +464,19 @@ func TestNodeInfoClone(t *testing.T) {
},
},
expected: &NodeInfo{
requestedResource: &Resource{},
nonzeroRequest: &Resource{},
TransientInfo: NewTransientSchedulerInfo(),
allocatableResource: &Resource{},
generation: 2,
usedPorts: HostPortInfo{
Requested: &Resource{},
NonZeroRequested: &Resource{},
TransientInfo: NewTransientSchedulerInfo(),
Allocatable: &Resource{},
Generation: 2,
UsedPorts: HostPortInfo{
"127.0.0.1": map[ProtocolPort]struct{}{
{Protocol: "TCP", Port: 80}: {},
{Protocol: "TCP", Port: 8080}: {},
},
},
imageStates: map[string]*ImageStateSummary{},
pods: []*PodInfo{
ImageStates: map[string]*ImageStateSummary{},
Pods: []*PodInfo{
{
Pod: &v1.Pod{
ObjectMeta: metav1.ObjectMeta{

@@ -543,8 +543,8 @@ func TestNodeInfoClone(t *testing.T) {
for _, test := range tests {
ni := test.nodeInfo.Clone()
// Modify the field to check if the result is a clone of the origin one.
test.nodeInfo.generation += 10
test.nodeInfo.usedPorts.Remove("127.0.0.1", "TCP", 80)
test.nodeInfo.Generation += 10
test.nodeInfo.UsedPorts.Remove("127.0.0.1", "TCP", 80)
if !reflect.DeepEqual(test.expected, ni) {
t.Errorf("expected: %#v, got: %#v", test.expected, ni)
}
@@ -621,31 +621,31 @@ func TestNodeInfoAddPod(t *testing.T) {
Name: "test-node",
},
},
requestedResource: &Resource{
Requested: &Resource{
MilliCPU: 1300,
Memory: 1000,
EphemeralStorage: 0,
AllowedPodNumber: 0,
ScalarResources: map[v1.ResourceName]int64(nil),
},
nonzeroRequest: &Resource{
NonZeroRequested: &Resource{
MilliCPU: 1300,
Memory: 209716200, //200MB + 1000 specified in requests/overhead
EphemeralStorage: 0,
AllowedPodNumber: 0,
ScalarResources: map[v1.ResourceName]int64(nil),
},
TransientInfo: NewTransientSchedulerInfo(),
allocatableResource: &Resource{},
generation: 2,
usedPorts: HostPortInfo{
TransientInfo: NewTransientSchedulerInfo(),
Allocatable: &Resource{},
Generation: 2,
UsedPorts: HostPortInfo{
"127.0.0.1": map[ProtocolPort]struct{}{
{Protocol: "TCP", Port: 80}: {},
{Protocol: "TCP", Port: 8080}: {},
},
},
imageStates: map[string]*ImageStateSummary{},
pods: []*PodInfo{
ImageStates: map[string]*ImageStateSummary{},
Pods: []*PodInfo{
{
Pod: &v1.Pod{
ObjectMeta: metav1.ObjectMeta{

@@ -714,16 +714,16 @@ func TestNodeInfoAddPod(t *testing.T) {
}

ni := fakeNodeInfo()
gen := ni.generation
gen := ni.Generation
for _, pod := range pods {
ni.AddPod(pod)
if ni.generation <= gen {
t.Errorf("generation is not incremented. Prev: %v, current: %v", gen, ni.generation)
if ni.Generation <= gen {
t.Errorf("Generation is not incremented. Prev: %v, current: %v", gen, ni.Generation)
}
gen = ni.generation
gen = ni.Generation
}

expected.generation = ni.generation
expected.Generation = ni.Generation
if !reflect.DeepEqual(expected, ni) {
t.Errorf("expected: %#v, got: %#v", expected, ni)
}

@@ -759,31 +759,31 @@ func TestNodeInfoRemovePod(t *testing.T) {
Name: "test-node",
},
},
requestedResource: &Resource{
Requested: &Resource{
MilliCPU: 1300,
Memory: 2524,
EphemeralStorage: 0,
AllowedPodNumber: 0,
ScalarResources: map[v1.ResourceName]int64(nil),
},
nonzeroRequest: &Resource{
NonZeroRequested: &Resource{
MilliCPU: 1300,
Memory: 2524,
EphemeralStorage: 0,
AllowedPodNumber: 0,
ScalarResources: map[v1.ResourceName]int64(nil),
},
TransientInfo: NewTransientSchedulerInfo(),
allocatableResource: &Resource{},
generation: 2,
usedPorts: HostPortInfo{
TransientInfo: NewTransientSchedulerInfo(),
Allocatable: &Resource{},
Generation: 2,
UsedPorts: HostPortInfo{
"127.0.0.1": map[ProtocolPort]struct{}{
{Protocol: "TCP", Port: 80}: {},
{Protocol: "TCP", Port: 8080}: {},
},
},
imageStates: map[string]*ImageStateSummary{},
pods: []*PodInfo{
ImageStates: map[string]*ImageStateSummary{},
Pods: []*PodInfo{
{
Pod: &v1.Pod{
ObjectMeta: metav1.ObjectMeta{

@@ -892,30 +892,30 @@ func TestNodeInfoRemovePod(t *testing.T) {
Name: "test-node",
},
},
requestedResource: &Resource{
Requested: &Resource{
MilliCPU: 700,
Memory: 1524,
EphemeralStorage: 0,
AllowedPodNumber: 0,
ScalarResources: map[v1.ResourceName]int64(nil),
},
nonzeroRequest: &Resource{
NonZeroRequested: &Resource{
MilliCPU: 700,
Memory: 1524,
EphemeralStorage: 0,
AllowedPodNumber: 0,
ScalarResources: map[v1.ResourceName]int64(nil),
},
TransientInfo: NewTransientSchedulerInfo(),
allocatableResource: &Resource{},
generation: 3,
usedPorts: HostPortInfo{
TransientInfo: NewTransientSchedulerInfo(),
Allocatable: &Resource{},
Generation: 3,
UsedPorts: HostPortInfo{
"127.0.0.1": map[ProtocolPort]struct{}{
{Protocol: "TCP", Port: 8080}: {},
},
},
imageStates: map[string]*ImageStateSummary{},
pods: []*PodInfo{
ImageStates: map[string]*ImageStateSummary{},
Pods: []*PodInfo{
{
Pod: &v1.Pod{
ObjectMeta: metav1.ObjectMeta{

@@ -957,7 +957,7 @@ func TestNodeInfoRemovePod(t *testing.T) {
for _, test := range tests {
ni := fakeNodeInfo(pods...)

gen := ni.generation
gen := ni.Generation
err := ni.RemovePod(test.pod)
if err != nil {
if test.errExpected {

@@ -969,12 +969,12 @@ func TestNodeInfoRemovePod(t *testing.T) {
t.Errorf("expected no error, got: %v", err)
}
} else {
if ni.generation <= gen {
t.Errorf("generation is not incremented. Prev: %v, current: %v", gen, ni.generation)
if ni.Generation <= gen {
t.Errorf("Generation is not incremented. Prev: %v, current: %v", gen, ni.Generation)
}
}

test.expectedNodeInfo.generation = ni.generation
test.expectedNodeInfo.Generation = ni.Generation
if !reflect.DeepEqual(test.expectedNodeInfo, ni) {
t.Errorf("expected: %#v, got: %#v", test.expectedNodeInfo, ni)
}
pkg/scheduler/internal/cache/cache.go (vendored) — 16 changed lines
@@ -218,7 +218,7 @@ func (cache *schedulerCache) UpdateSnapshot(nodeSnapshot *Snapshot) error {
// Start from the head of the NodeInfo doubly linked list and update snapshot
// of NodeInfos updated after the last snapshot.
for node := cache.headNode; node != nil; node = node.next {
if node.info.GetGeneration() <= snapshotGeneration {
if node.info.Generation <= snapshotGeneration {
// all the nodes are updated before the existing snapshot. We are done.
break
}

@@ -237,7 +237,7 @@ func (cache *schedulerCache) UpdateSnapshot(nodeSnapshot *Snapshot) error {
// We track nodes that have pods with affinity, here we check if this node changed its
// status from having pods with affinity to NOT having pods with affinity or the other
// way around.
if (len(existing.PodsWithAffinity()) > 0) != (len(clone.PodsWithAffinity()) > 0) {
if (len(existing.PodsWithAffinity) > 0) != (len(clone.PodsWithAffinity) > 0) {
updateNodesHavePodsWithAffinity = true
}
// We need to preserve the original pointer of the NodeInfo struct since it

@@ -247,7 +247,7 @@ func (cache *schedulerCache) UpdateSnapshot(nodeSnapshot *Snapshot) error {
}
// Update the snapshot generation with the latest NodeInfo generation.
if cache.headNode != nil {
nodeSnapshot.generation = cache.headNode.info.GetGeneration()
nodeSnapshot.generation = cache.headNode.info.Generation
}

if len(nodeSnapshot.nodeInfoMap) > len(cache.nodes) {

@@ -284,7 +284,7 @@ func (cache *schedulerCache) updateNodeInfoSnapshotList(snapshot *Snapshot, upda
nodeName := cache.nodeTree.next()
if n := snapshot.nodeInfoMap[nodeName]; n != nil {
snapshot.nodeInfoList = append(snapshot.nodeInfoList, n)
if len(n.PodsWithAffinity()) > 0 {
if len(n.PodsWithAffinity) > 0 {
snapshot.havePodsWithAffinityNodeInfoList = append(snapshot.havePodsWithAffinityNodeInfoList, n)
}
} else {

@@ -293,7 +293,7 @@ func (cache *schedulerCache) updateNodeInfoSnapshotList(snapshot *Snapshot, upda
}
} else {
for _, n := range snapshot.nodeInfoList {
if len(n.PodsWithAffinity()) > 0 {
if len(n.PodsWithAffinity) > 0 {
snapshot.havePodsWithAffinityNodeInfoList = append(snapshot.havePodsWithAffinityNodeInfoList, n)
}
}

@@ -322,11 +322,11 @@ func (cache *schedulerCache) ListPods(selector labels.Selector) ([]*v1.Pod, erro
// pre-allocating capacity.
maxSize := 0
for _, n := range cache.nodes {
maxSize += len(n.info.Pods())
maxSize += len(n.info.Pods)
}
pods := make([]*v1.Pod, 0, maxSize)
for _, n := range cache.nodes {
for _, p := range n.info.Pods() {
for _, p := range n.info.Pods {
if selector.Matches(labels.Set(p.Pod.Labels)) {
pods = append(pods, p.Pod)
}

@@ -664,7 +664,7 @@ func (cache *schedulerCache) addNodeImageStates(node *v1.Node, nodeInfo *framewo
}
}
}
nodeInfo.SetImageStates(newSum)
nodeInfo.ImageStates = newSum
}

// removeNodeImageStates removes the given node record from image entries having the node
pkg/scheduler/internal/cache/cache_test.go (vendored) — 36 changed lines
@@ -41,10 +41,10 @@ func deepEqualWithoutGeneration(actual *nodeInfoListItem, expected *framework.No
}
// Ignore generation field.
if actual != nil {
actual.info.SetGeneration(0)
actual.info.Generation = 0
}
if expected != nil {
expected.SetGeneration(0)
expected.Generation = 0
}
if actual != nil && !reflect.DeepEqual(actual.info, expected) {
return fmt.Errorf("got node info %s, want %s", actual.info, expected)

@@ -85,11 +85,10 @@ func newNodeInfo(requestedResource *framework.Resource,
imageStates map[string]*framework.ImageStateSummary,
) *framework.NodeInfo {
nodeInfo := framework.NewNodeInfo(pods...)
nodeInfo.SetRequestedResource(requestedResource)
nodeInfo.SetNonZeroRequest(nonzeroRequest)
nodeInfo.SetUsedPorts(usedPorts)
nodeInfo.SetImageStates(imageStates)

nodeInfo.Requested = requestedResource
nodeInfo.NonZeroRequested = nonzeroRequest
nodeInfo.UsedPorts = usedPorts
nodeInfo.ImageStates = imageStates
return nodeInfo
}

@@ -931,13 +930,11 @@ func TestForgetPod(t *testing.T) {
func buildNodeInfo(node *v1.Node, pods []*v1.Pod) *framework.NodeInfo {
expected := framework.NewNodeInfo()
expected.SetNode(node)
expected.SetAllocatableResource(framework.NewResource(node.Status.Allocatable))
expected.SetTaints(node.Spec.Taints)
expected.SetGeneration(expected.GetGeneration() + 1)
expected.Allocatable = framework.NewResource(node.Status.Allocatable)
expected.Generation++
for _, pod := range pods {
expected.AddPod(pod)
}

return expected
}

@@ -1098,7 +1095,7 @@ func TestNodeOperators(t *testing.T) {
}

// Generations are globally unique. We check in our unit tests that they are incremented correctly.
expected.SetGeneration(got.info.GetGeneration())
expected.Generation = got.info.Generation
if !reflect.DeepEqual(got.info, expected) {
t.Errorf("Failed to add node into schedulercache:\n got: %+v \nexpected: %+v", got, expected)
}

@@ -1112,17 +1109,14 @@ func TestNodeOperators(t *testing.T) {
if !found || len(cachedNodes.nodeInfoMap) != 1 {
t.Errorf("failed to dump cached nodes:\n got: %v \nexpected: %v", cachedNodes, cache.nodes)
}
expected.SetGeneration(newNode.GetGeneration())
expected.Generation = newNode.Generation
if !reflect.DeepEqual(newNode, expected) {
t.Errorf("Failed to clone node:\n got: %+v, \n expected: %+v", newNode, expected)
}

// Step 3: update node attribute successfully.
node.Status.Allocatable[v1.ResourceMemory] = mem50m
allocatableResource := expected.AllocatableResource()
newAllocatableResource := &allocatableResource
newAllocatableResource.Memory = mem50m.Value()
expected.SetAllocatableResource(newAllocatableResource)
expected.Allocatable.Memory = mem50m.Value()

if err := cache.UpdateNode(nil, node); err != nil {
t.Error(err)

@@ -1131,10 +1125,10 @@ func TestNodeOperators(t *testing.T) {
if !found {
t.Errorf("Failed to find node %v in schedulertypes after UpdateNode.", node.Name)
}
if got.info.GetGeneration() <= expected.GetGeneration() {
t.Errorf("Generation is not incremented. got: %v, expected: %v", got.info.GetGeneration(), expected.GetGeneration())
if got.info.Generation <= expected.Generation {
t.Errorf("Generation is not incremented. got: %v, expected: %v", got.info.Generation, expected.Generation)
}
expected.SetGeneration(got.info.GetGeneration())
expected.Generation = got.info.Generation

if !reflect.DeepEqual(got.info, expected) {
t.Errorf("Failed to update node in schedulertypes:\n got: %+v \nexpected: %+v", got, expected)

@@ -1520,7 +1514,7 @@ func compareCacheWithNodeInfoSnapshot(cache *schedulerCache, snapshot *Snapshot)
nodeName := cache.nodeTree.next()
if n := snapshot.nodeInfoMap[nodeName]; n != nil {
expectedNodeInfoList = append(expectedNodeInfoList, n)
if len(n.PodsWithAffinity()) > 0 {
if len(n.PodsWithAffinity) > 0 {
expectedHavePodsWithAffinityNodeInfoList = append(expectedHavePodsWithAffinityNodeInfoList, n)
}
} else {

@@ -91,7 +91,7 @@ func (c *CacheComparer) ComparePods(pods, waitingPods []*v1.Pod, nodeinfos map[s

cached := []string{}
for _, nodeinfo := range nodeinfos {
for _, p := range nodeinfo.Pods() {
for _, p := range nodeinfo.Pods {
cached = append(cached, string(p.Pod.UID))
}
}

@@ -64,9 +64,9 @@ func (d *CacheDumper) dumpSchedulingQueue() {
func (d *CacheDumper) printNodeInfo(n *framework.NodeInfo) string {
var nodeData strings.Builder
nodeData.WriteString(fmt.Sprintf("\nNode name: %+v\nRequested Resources: %+v\nAllocatable Resources:%+v\nScheduled Pods(number: %v):\n",
n.Node().Name, n.RequestedResource(), n.AllocatableResource(), len(n.Pods())))
n.Node().Name, n.Requested, n.Allocatable, len(n.Pods)))
// Dumping Pod Info
for _, p := range n.Pods() {
for _, p := range n.Pods {
nodeData.WriteString(printPod(p.Pod))
}
// Dumping nominated pods info on the node
pkg/scheduler/internal/cache/snapshot.go (vendored) — 8 changed lines
@@ -53,7 +53,7 @@ func NewSnapshot(pods []*v1.Pod, nodes []*v1.Node) *Snapshot {
havePodsWithAffinityNodeInfoList := make([]*framework.NodeInfo, 0, len(nodeInfoMap))
for _, v := range nodeInfoMap {
nodeInfoList = append(nodeInfoList, v)
if len(v.PodsWithAffinity()) > 0 {
if len(v.PodsWithAffinity) > 0 {
havePodsWithAffinityNodeInfoList = append(havePodsWithAffinityNodeInfoList, v)
}
}

@@ -86,7 +86,7 @@ func createNodeInfoMap(pods []*v1.Pod, nodes []*v1.Node) map[string]*framework.N
}
nodeInfo := nodeNameToInfo[node.Name]
nodeInfo.SetNode(node)
nodeInfo.SetImageStates(getNodeImageStates(node, imageExistenceMap))
nodeInfo.ImageStates = getNodeImageStates(node, imageExistenceMap)
}
return nodeNameToInfo
}

@@ -153,11 +153,11 @@ func (p podLister) FilteredList(filter framework.PodFilter, selector labels.Sele
// pre-allocating capacity.
maxSize := 0
for _, n := range p {
maxSize += len(n.Pods())
maxSize += len(n.Pods)
}
pods := make([]*v1.Pod, 0, maxSize)
for _, n := range p {
for _, p := range n.Pods() {
for _, p := range n.Pods {
if filter(p.Pod) && selector.Matches(labels.Set(p.Pod.Labels)) {
pods = append(pods, p.Pod)
}