mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-07-22 19:31:44 +00:00
Merge pull request #90319 from ahg-g/ahg-info2
Split scheduler's PodInfo into two types, PodInfo and PodQueueInfo
This commit is contained in:
commit
eb50f4dd09
@ -456,8 +456,8 @@ func NewPodInformer(client clientset.Interface, resyncPeriod time.Duration) core
|
|||||||
}
|
}
|
||||||
|
|
||||||
// MakeDefaultErrorFunc construct a function to handle pod scheduler error
|
// MakeDefaultErrorFunc construct a function to handle pod scheduler error
|
||||||
func MakeDefaultErrorFunc(client clientset.Interface, podQueue internalqueue.SchedulingQueue, schedulerCache internalcache.Cache) func(*framework.PodInfo, error) {
|
func MakeDefaultErrorFunc(client clientset.Interface, podQueue internalqueue.SchedulingQueue, schedulerCache internalcache.Cache) func(*framework.QueuedPodInfo, error) {
|
||||||
return func(podInfo *framework.PodInfo, err error) {
|
return func(podInfo *framework.QueuedPodInfo, err error) {
|
||||||
pod := podInfo.Pod
|
pod := podInfo.Pod
|
||||||
if err == core.ErrNoNodesAvailable {
|
if err == core.ErrNoNodesAvailable {
|
||||||
klog.V(2).Infof("Unable to schedule %v/%v: no nodes are registered to the cluster; waiting", pod.Namespace, pod.Name)
|
klog.V(2).Infof("Unable to schedule %v/%v: no nodes are registered to the cluster; waiting", pod.Namespace, pod.Name)
|
||||||
|
@ -328,7 +328,7 @@ func TestDefaultErrorFunc(t *testing.T) {
|
|||||||
&v1.Node{ObjectMeta: metav1.ObjectMeta{Name: "bar"}},
|
&v1.Node{ObjectMeta: metav1.ObjectMeta{Name: "bar"}},
|
||||||
&v1.Node{ObjectMeta: metav1.ObjectMeta{Name: "foo"}}
|
&v1.Node{ObjectMeta: metav1.ObjectMeta{Name: "foo"}}
|
||||||
|
|
||||||
testPodInfo := &framework.PodInfo{Pod: testPod}
|
testPodInfo := &framework.QueuedPodInfo{Pod: testPod}
|
||||||
client := fake.NewSimpleClientset(&v1.PodList{Items: []v1.Pod{*testPod}}, &v1.NodeList{Items: []v1.Node{*nodeBar}})
|
client := fake.NewSimpleClientset(&v1.PodList{Items: []v1.Pod{*testPod}}, &v1.NodeList{Items: []v1.Node{*nodeBar}})
|
||||||
stopCh := make(chan struct{})
|
stopCh := make(chan struct{})
|
||||||
defer close(stopCh)
|
defer close(stopCh)
|
||||||
|
@ -37,8 +37,8 @@ func (pl *PrioritySort) Name() string {
|
|||||||
|
|
||||||
// Less is the function used by the activeQ heap algorithm to sort pods.
|
// Less is the function used by the activeQ heap algorithm to sort pods.
|
||||||
// It sorts pods based on their priority. When priorities are equal, it uses
|
// It sorts pods based on their priority. When priorities are equal, it uses
|
||||||
// PodInfo.timestamp.
|
// PodQueueInfo.timestamp.
|
||||||
func (pl *PrioritySort) Less(pInfo1, pInfo2 *framework.PodInfo) bool {
|
func (pl *PrioritySort) Less(pInfo1, pInfo2 *framework.QueuedPodInfo) bool {
|
||||||
p1 := pod.GetPodPriority(pInfo1.Pod)
|
p1 := pod.GetPodPriority(pInfo1.Pod)
|
||||||
p2 := pod.GetPodPriority(pInfo2.Pod)
|
p2 := pod.GetPodPriority(pInfo2.Pod)
|
||||||
return (p1 > p2) || (p1 == p2 && pInfo1.Timestamp.Before(pInfo2.Timestamp))
|
return (p1 > p2) || (p1 == p2 && pInfo1.Timestamp.Before(pInfo2.Timestamp))
|
||||||
|
@ -31,20 +31,20 @@ func TestLess(t *testing.T) {
|
|||||||
t2 := t1.Add(time.Second)
|
t2 := t1.Add(time.Second)
|
||||||
for _, tt := range []struct {
|
for _, tt := range []struct {
|
||||||
name string
|
name string
|
||||||
p1 *framework.PodInfo
|
p1 *framework.QueuedPodInfo
|
||||||
p2 *framework.PodInfo
|
p2 *framework.QueuedPodInfo
|
||||||
expected bool
|
expected bool
|
||||||
}{
|
}{
|
||||||
{
|
{
|
||||||
name: "p1.priority less than p2.priority",
|
name: "p1.priority less than p2.priority",
|
||||||
p1: &framework.PodInfo{
|
p1: &framework.QueuedPodInfo{
|
||||||
Pod: &v1.Pod{
|
Pod: &v1.Pod{
|
||||||
Spec: v1.PodSpec{
|
Spec: v1.PodSpec{
|
||||||
Priority: &lowPriority,
|
Priority: &lowPriority,
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
p2: &framework.PodInfo{
|
p2: &framework.QueuedPodInfo{
|
||||||
Pod: &v1.Pod{
|
Pod: &v1.Pod{
|
||||||
Spec: v1.PodSpec{
|
Spec: v1.PodSpec{
|
||||||
Priority: &highPriority,
|
Priority: &highPriority,
|
||||||
@ -55,14 +55,14 @@ func TestLess(t *testing.T) {
|
|||||||
},
|
},
|
||||||
{
|
{
|
||||||
name: "p1.priority greater than p2.priority",
|
name: "p1.priority greater than p2.priority",
|
||||||
p1: &framework.PodInfo{
|
p1: &framework.QueuedPodInfo{
|
||||||
Pod: &v1.Pod{
|
Pod: &v1.Pod{
|
||||||
Spec: v1.PodSpec{
|
Spec: v1.PodSpec{
|
||||||
Priority: &highPriority,
|
Priority: &highPriority,
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
p2: &framework.PodInfo{
|
p2: &framework.QueuedPodInfo{
|
||||||
Pod: &v1.Pod{
|
Pod: &v1.Pod{
|
||||||
Spec: v1.PodSpec{
|
Spec: v1.PodSpec{
|
||||||
Priority: &lowPriority,
|
Priority: &lowPriority,
|
||||||
@ -73,7 +73,7 @@ func TestLess(t *testing.T) {
|
|||||||
},
|
},
|
||||||
{
|
{
|
||||||
name: "equal priority. p1 is added to schedulingQ earlier than p2",
|
name: "equal priority. p1 is added to schedulingQ earlier than p2",
|
||||||
p1: &framework.PodInfo{
|
p1: &framework.QueuedPodInfo{
|
||||||
Pod: &v1.Pod{
|
Pod: &v1.Pod{
|
||||||
Spec: v1.PodSpec{
|
Spec: v1.PodSpec{
|
||||||
Priority: &highPriority,
|
Priority: &highPriority,
|
||||||
@ -81,7 +81,7 @@ func TestLess(t *testing.T) {
|
|||||||
},
|
},
|
||||||
Timestamp: t1,
|
Timestamp: t1,
|
||||||
},
|
},
|
||||||
p2: &framework.PodInfo{
|
p2: &framework.QueuedPodInfo{
|
||||||
Pod: &v1.Pod{
|
Pod: &v1.Pod{
|
||||||
Spec: v1.PodSpec{
|
Spec: v1.PodSpec{
|
||||||
Priority: &highPriority,
|
Priority: &highPriority,
|
||||||
@ -93,7 +93,7 @@ func TestLess(t *testing.T) {
|
|||||||
},
|
},
|
||||||
{
|
{
|
||||||
name: "equal priority. p2 is added to schedulingQ earlier than p1",
|
name: "equal priority. p2 is added to schedulingQ earlier than p1",
|
||||||
p1: &framework.PodInfo{
|
p1: &framework.QueuedPodInfo{
|
||||||
Pod: &v1.Pod{
|
Pod: &v1.Pod{
|
||||||
Spec: v1.PodSpec{
|
Spec: v1.PodSpec{
|
||||||
Priority: &highPriority,
|
Priority: &highPriority,
|
||||||
@ -101,7 +101,7 @@ func TestLess(t *testing.T) {
|
|||||||
},
|
},
|
||||||
Timestamp: t2,
|
Timestamp: t2,
|
||||||
},
|
},
|
||||||
p2: &framework.PodInfo{
|
p2: &framework.QueuedPodInfo{
|
||||||
Pod: &v1.Pod{
|
Pod: &v1.Pod{
|
||||||
Spec: v1.PodSpec{
|
Spec: v1.PodSpec{
|
||||||
Priority: &highPriority,
|
Priority: &highPriority,
|
||||||
|
@ -295,7 +295,7 @@ func (f *framework) QueueSortFunc() LessFunc {
|
|||||||
if f == nil {
|
if f == nil {
|
||||||
// If framework is nil, simply keep their order unchanged.
|
// If framework is nil, simply keep their order unchanged.
|
||||||
// NOTE: this is primarily for tests.
|
// NOTE: this is primarily for tests.
|
||||||
return func(_, _ *PodInfo) bool { return false }
|
return func(_, _ *QueuedPodInfo) bool { return false }
|
||||||
}
|
}
|
||||||
|
|
||||||
if len(f.queueSortPlugins) == 0 {
|
if len(f.queueSortPlugins) == 0 {
|
||||||
|
@ -288,7 +288,7 @@ func (pl *TestQueueSortPlugin) Name() string {
|
|||||||
return queueSortPlugin
|
return queueSortPlugin
|
||||||
}
|
}
|
||||||
|
|
||||||
func (pl *TestQueueSortPlugin) Less(_, _ *PodInfo) bool {
|
func (pl *TestQueueSortPlugin) Less(_, _ *QueuedPodInfo) bool {
|
||||||
return false
|
return false
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -211,7 +211,7 @@ type Plugin interface {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// LessFunc is the function to sort pod info
|
// LessFunc is the function to sort pod info
|
||||||
type LessFunc func(podInfo1, podInfo2 *PodInfo) bool
|
type LessFunc func(podInfo1, podInfo2 *QueuedPodInfo) bool
|
||||||
|
|
||||||
// QueueSortPlugin is an interface that must be implemented by "QueueSort" plugins.
|
// QueueSortPlugin is an interface that must be implemented by "QueueSort" plugins.
|
||||||
// These plugins are used to sort pods in the scheduling queue. Only one queue sort
|
// These plugins are used to sort pods in the scheduling queue. Only one queue sort
|
||||||
@ -219,7 +219,7 @@ type LessFunc func(podInfo1, podInfo2 *PodInfo) bool
|
|||||||
type QueueSortPlugin interface {
|
type QueueSortPlugin interface {
|
||||||
Plugin
|
Plugin
|
||||||
// Less are used to sort pods in the scheduling queue.
|
// Less are used to sort pods in the scheduling queue.
|
||||||
Less(*PodInfo, *PodInfo) bool
|
Less(*QueuedPodInfo, *QueuedPodInfo) bool
|
||||||
}
|
}
|
||||||
|
|
||||||
// PreFilterExtensions is an interface that is included in plugins that allow specifying
|
// PreFilterExtensions is an interface that is included in plugins that allow specifying
|
||||||
|
@ -34,9 +34,10 @@ import (
|
|||||||
|
|
||||||
var generation int64
|
var generation int64
|
||||||
|
|
||||||
// PodInfo is a wrapper to a Pod with additional information for purposes such as tracking
|
// QueuedPodInfo is a Pod wrapper with additional information related to
|
||||||
// the timestamp when it's added to the queue or recording per-pod metrics.
|
// the pod's status in the scheduling queue, such as the timestamp when
|
||||||
type PodInfo struct {
|
// it's added to the queue.
|
||||||
|
type QueuedPodInfo struct {
|
||||||
Pod *v1.Pod
|
Pod *v1.Pod
|
||||||
// The time pod added to the scheduling queue.
|
// The time pod added to the scheduling queue.
|
||||||
Timestamp time.Time
|
Timestamp time.Time
|
||||||
@ -50,16 +51,23 @@ type PodInfo struct {
|
|||||||
InitialAttemptTimestamp time.Time
|
InitialAttemptTimestamp time.Time
|
||||||
}
|
}
|
||||||
|
|
||||||
// DeepCopy returns a deep copy of the PodInfo object.
|
// DeepCopy returns a deep copy of the QueuedPodInfo object.
|
||||||
func (podInfo *PodInfo) DeepCopy() *PodInfo {
|
func (pqi *QueuedPodInfo) DeepCopy() *QueuedPodInfo {
|
||||||
return &PodInfo{
|
return &QueuedPodInfo{
|
||||||
Pod: podInfo.Pod.DeepCopy(),
|
Pod: pqi.Pod.DeepCopy(),
|
||||||
Timestamp: podInfo.Timestamp,
|
Timestamp: pqi.Timestamp,
|
||||||
Attempts: podInfo.Attempts,
|
Attempts: pqi.Attempts,
|
||||||
InitialAttemptTimestamp: podInfo.InitialAttemptTimestamp,
|
InitialAttemptTimestamp: pqi.InitialAttemptTimestamp,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// PodInfo is a wrapper to a Pod with additional pre-computed information to
|
||||||
|
// accelerate processing. This information is typically immutable (e.g., pre-processed
|
||||||
|
// inter-pod affinity selectors).
|
||||||
|
type PodInfo struct {
|
||||||
|
Pod *v1.Pod
|
||||||
|
}
|
||||||
|
|
||||||
// NewPodInfo return a new PodInfo
|
// NewPodInfo return a new PodInfo
|
||||||
func NewPodInfo(pod *v1.Pod) *PodInfo {
|
func NewPodInfo(pod *v1.Pod) *PodInfo {
|
||||||
return &PodInfo{
|
return &PodInfo{
|
||||||
@ -359,7 +367,6 @@ func (n *NodeInfo) String() string {
|
|||||||
|
|
||||||
// AddPod adds pod information to this NodeInfo.
|
// AddPod adds pod information to this NodeInfo.
|
||||||
func (n *NodeInfo) AddPod(pod *v1.Pod) {
|
func (n *NodeInfo) AddPod(pod *v1.Pod) {
|
||||||
// TODO(#89528): AddPod should accept a PodInfo as an input argument.
|
|
||||||
podInfo := NewPodInfo(pod)
|
podInfo := NewPodInfo(pod)
|
||||||
res, non0CPU, non0Mem := calculateResource(pod)
|
res, non0CPU, non0Mem := calculateResource(pod)
|
||||||
n.Requested.MilliCPU += res.MilliCPU
|
n.Requested.MilliCPU += res.MilliCPU
|
||||||
|
@ -69,14 +69,14 @@ type SchedulingQueue interface {
|
|||||||
// AddUnschedulableIfNotPresent adds an unschedulable pod back to scheduling queue.
|
// AddUnschedulableIfNotPresent adds an unschedulable pod back to scheduling queue.
|
||||||
// The podSchedulingCycle represents the current scheduling cycle number which can be
|
// The podSchedulingCycle represents the current scheduling cycle number which can be
|
||||||
// returned by calling SchedulingCycle().
|
// returned by calling SchedulingCycle().
|
||||||
AddUnschedulableIfNotPresent(pod *framework.PodInfo, podSchedulingCycle int64) error
|
AddUnschedulableIfNotPresent(pod *framework.QueuedPodInfo, podSchedulingCycle int64) error
|
||||||
// SchedulingCycle returns the current number of scheduling cycle which is
|
// SchedulingCycle returns the current number of scheduling cycle which is
|
||||||
// cached by scheduling queue. Normally, incrementing this number whenever
|
// cached by scheduling queue. Normally, incrementing this number whenever
|
||||||
// a pod is popped (e.g. called Pop()) is enough.
|
// a pod is popped (e.g. called Pop()) is enough.
|
||||||
SchedulingCycle() int64
|
SchedulingCycle() int64
|
||||||
// Pop removes the head of the queue and returns it. It blocks if the
|
// Pop removes the head of the queue and returns it. It blocks if the
|
||||||
// queue is empty and waits until a new item is added to the queue.
|
// queue is empty and waits until a new item is added to the queue.
|
||||||
Pop() (*framework.PodInfo, error)
|
Pop() (*framework.QueuedPodInfo, error)
|
||||||
Update(oldPod, newPod *v1.Pod) error
|
Update(oldPod, newPod *v1.Pod) error
|
||||||
Delete(pod *v1.Pod) error
|
Delete(pod *v1.Pod) error
|
||||||
MoveAllToActiveOrBackoffQueue(event string)
|
MoveAllToActiveOrBackoffQueue(event string)
|
||||||
@ -191,9 +191,9 @@ var defaultPriorityQueueOptions = priorityQueueOptions{
|
|||||||
// Making sure that PriorityQueue implements SchedulingQueue.
|
// Making sure that PriorityQueue implements SchedulingQueue.
|
||||||
var _ SchedulingQueue = &PriorityQueue{}
|
var _ SchedulingQueue = &PriorityQueue{}
|
||||||
|
|
||||||
// newPodInfoNoTimestamp builds a PodInfo object without timestamp.
|
// newQueuedPodInfoNoTimestamp builds a QueuedPodInfo object without timestamp.
|
||||||
func newPodInfoNoTimestamp(pod *v1.Pod) *framework.PodInfo {
|
func newQueuedPodInfoNoTimestamp(pod *v1.Pod) *framework.QueuedPodInfo {
|
||||||
return &framework.PodInfo{
|
return &framework.QueuedPodInfo{
|
||||||
Pod: pod,
|
Pod: pod,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -209,8 +209,8 @@ func NewPriorityQueue(
|
|||||||
}
|
}
|
||||||
|
|
||||||
comp := func(podInfo1, podInfo2 interface{}) bool {
|
comp := func(podInfo1, podInfo2 interface{}) bool {
|
||||||
pInfo1 := podInfo1.(*framework.PodInfo)
|
pInfo1 := podInfo1.(*framework.QueuedPodInfo)
|
||||||
pInfo2 := podInfo2.(*framework.PodInfo)
|
pInfo2 := podInfo2.(*framework.QueuedPodInfo)
|
||||||
return lessFn(pInfo1, pInfo2)
|
return lessFn(pInfo1, pInfo2)
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -241,7 +241,7 @@ func (p *PriorityQueue) Run() {
|
|||||||
func (p *PriorityQueue) Add(pod *v1.Pod) error {
|
func (p *PriorityQueue) Add(pod *v1.Pod) error {
|
||||||
p.lock.Lock()
|
p.lock.Lock()
|
||||||
defer p.lock.Unlock()
|
defer p.lock.Unlock()
|
||||||
pInfo := p.newPodInfo(pod)
|
pInfo := p.newQueuedPodInfo(pod)
|
||||||
if err := p.activeQ.Add(pInfo); err != nil {
|
if err := p.activeQ.Add(pInfo); err != nil {
|
||||||
klog.Errorf("Error adding pod %v to the scheduling queue: %v", nsNameForPod(pod), err)
|
klog.Errorf("Error adding pod %v to the scheduling queue: %v", nsNameForPod(pod), err)
|
||||||
return err
|
return err
|
||||||
@ -271,7 +271,7 @@ func nsNameForPod(pod *v1.Pod) ktypes.NamespacedName {
|
|||||||
|
|
||||||
// isPodBackingoff returns true if a pod is still waiting for its backoff timer.
|
// isPodBackingoff returns true if a pod is still waiting for its backoff timer.
|
||||||
// If this returns true, the pod should not be re-tried.
|
// If this returns true, the pod should not be re-tried.
|
||||||
func (p *PriorityQueue) isPodBackingoff(podInfo *framework.PodInfo) bool {
|
func (p *PriorityQueue) isPodBackingoff(podInfo *framework.QueuedPodInfo) bool {
|
||||||
boTime := p.getBackoffTime(podInfo)
|
boTime := p.getBackoffTime(podInfo)
|
||||||
return boTime.After(p.clock.Now())
|
return boTime.After(p.clock.Now())
|
||||||
}
|
}
|
||||||
@ -287,7 +287,7 @@ func (p *PriorityQueue) SchedulingCycle() int64 {
|
|||||||
// the queue, unless it is already in the queue. Normally, PriorityQueue puts
|
// the queue, unless it is already in the queue. Normally, PriorityQueue puts
|
||||||
// unschedulable pods in `unschedulableQ`. But if there has been a recent move
|
// unschedulable pods in `unschedulableQ`. But if there has been a recent move
|
||||||
// request, then the pod is put in `podBackoffQ`.
|
// request, then the pod is put in `podBackoffQ`.
|
||||||
func (p *PriorityQueue) AddUnschedulableIfNotPresent(pInfo *framework.PodInfo, podSchedulingCycle int64) error {
|
func (p *PriorityQueue) AddUnschedulableIfNotPresent(pInfo *framework.QueuedPodInfo, podSchedulingCycle int64) error {
|
||||||
p.lock.Lock()
|
p.lock.Lock()
|
||||||
defer p.lock.Unlock()
|
defer p.lock.Unlock()
|
||||||
pod := pInfo.Pod
|
pod := pInfo.Pod
|
||||||
@ -330,8 +330,8 @@ func (p *PriorityQueue) flushBackoffQCompleted() {
|
|||||||
if rawPodInfo == nil {
|
if rawPodInfo == nil {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
pod := rawPodInfo.(*framework.PodInfo).Pod
|
pod := rawPodInfo.(*framework.QueuedPodInfo).Pod
|
||||||
boTime := p.getBackoffTime(rawPodInfo.(*framework.PodInfo))
|
boTime := p.getBackoffTime(rawPodInfo.(*framework.QueuedPodInfo))
|
||||||
if boTime.After(p.clock.Now()) {
|
if boTime.After(p.clock.Now()) {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
@ -352,7 +352,7 @@ func (p *PriorityQueue) flushUnschedulableQLeftover() {
|
|||||||
p.lock.Lock()
|
p.lock.Lock()
|
||||||
defer p.lock.Unlock()
|
defer p.lock.Unlock()
|
||||||
|
|
||||||
var podsToMove []*framework.PodInfo
|
var podsToMove []*framework.QueuedPodInfo
|
||||||
currentTime := p.clock.Now()
|
currentTime := p.clock.Now()
|
||||||
for _, pInfo := range p.unschedulableQ.podInfoMap {
|
for _, pInfo := range p.unschedulableQ.podInfoMap {
|
||||||
lastScheduleTime := pInfo.Timestamp
|
lastScheduleTime := pInfo.Timestamp
|
||||||
@ -369,7 +369,7 @@ func (p *PriorityQueue) flushUnschedulableQLeftover() {
|
|||||||
// Pop removes the head of the active queue and returns it. It blocks if the
|
// Pop removes the head of the active queue and returns it. It blocks if the
|
||||||
// activeQ is empty and waits until a new item is added to the queue. It
|
// activeQ is empty and waits until a new item is added to the queue. It
|
||||||
// increments scheduling cycle when a pod is popped.
|
// increments scheduling cycle when a pod is popped.
|
||||||
func (p *PriorityQueue) Pop() (*framework.PodInfo, error) {
|
func (p *PriorityQueue) Pop() (*framework.QueuedPodInfo, error) {
|
||||||
p.lock.Lock()
|
p.lock.Lock()
|
||||||
defer p.lock.Unlock()
|
defer p.lock.Unlock()
|
||||||
for p.activeQ.Len() == 0 {
|
for p.activeQ.Len() == 0 {
|
||||||
@ -385,7 +385,7 @@ func (p *PriorityQueue) Pop() (*framework.PodInfo, error) {
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
pInfo := obj.(*framework.PodInfo)
|
pInfo := obj.(*framework.QueuedPodInfo)
|
||||||
pInfo.Attempts++
|
pInfo.Attempts++
|
||||||
p.schedulingCycle++
|
p.schedulingCycle++
|
||||||
return pInfo, err
|
return pInfo, err
|
||||||
@ -413,7 +413,7 @@ func (p *PriorityQueue) Update(oldPod, newPod *v1.Pod) error {
|
|||||||
defer p.lock.Unlock()
|
defer p.lock.Unlock()
|
||||||
|
|
||||||
if oldPod != nil {
|
if oldPod != nil {
|
||||||
oldPodInfo := newPodInfoNoTimestamp(oldPod)
|
oldPodInfo := newQueuedPodInfoNoTimestamp(oldPod)
|
||||||
// If the pod is already in the active queue, just update it there.
|
// If the pod is already in the active queue, just update it there.
|
||||||
if oldPodInfo, exists, _ := p.activeQ.Get(oldPodInfo); exists {
|
if oldPodInfo, exists, _ := p.activeQ.Get(oldPodInfo); exists {
|
||||||
p.nominatedPods.update(oldPod, newPod)
|
p.nominatedPods.update(oldPod, newPod)
|
||||||
@ -449,7 +449,7 @@ func (p *PriorityQueue) Update(oldPod, newPod *v1.Pod) error {
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
// If pod is not in any of the queues, we put it in the active queue.
|
// If pod is not in any of the queues, we put it in the active queue.
|
||||||
err := p.activeQ.Add(p.newPodInfo(newPod))
|
err := p.activeQ.Add(p.newQueuedPodInfo(newPod))
|
||||||
if err == nil {
|
if err == nil {
|
||||||
p.nominatedPods.add(newPod, "")
|
p.nominatedPods.add(newPod, "")
|
||||||
p.cond.Broadcast()
|
p.cond.Broadcast()
|
||||||
@ -463,9 +463,9 @@ func (p *PriorityQueue) Delete(pod *v1.Pod) error {
|
|||||||
p.lock.Lock()
|
p.lock.Lock()
|
||||||
defer p.lock.Unlock()
|
defer p.lock.Unlock()
|
||||||
p.nominatedPods.delete(pod)
|
p.nominatedPods.delete(pod)
|
||||||
err := p.activeQ.Delete(newPodInfoNoTimestamp(pod))
|
err := p.activeQ.Delete(newQueuedPodInfoNoTimestamp(pod))
|
||||||
if err != nil { // The item was probably not found in the activeQ.
|
if err != nil { // The item was probably not found in the activeQ.
|
||||||
p.podBackoffQ.Delete(newPodInfoNoTimestamp(pod))
|
p.podBackoffQ.Delete(newQueuedPodInfoNoTimestamp(pod))
|
||||||
p.unschedulableQ.delete(pod)
|
p.unschedulableQ.delete(pod)
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
@ -494,7 +494,7 @@ func (p *PriorityQueue) AssignedPodUpdated(pod *v1.Pod) {
|
|||||||
func (p *PriorityQueue) MoveAllToActiveOrBackoffQueue(event string) {
|
func (p *PriorityQueue) MoveAllToActiveOrBackoffQueue(event string) {
|
||||||
p.lock.Lock()
|
p.lock.Lock()
|
||||||
defer p.lock.Unlock()
|
defer p.lock.Unlock()
|
||||||
unschedulablePods := make([]*framework.PodInfo, 0, len(p.unschedulableQ.podInfoMap))
|
unschedulablePods := make([]*framework.QueuedPodInfo, 0, len(p.unschedulableQ.podInfoMap))
|
||||||
for _, pInfo := range p.unschedulableQ.podInfoMap {
|
for _, pInfo := range p.unschedulableQ.podInfoMap {
|
||||||
unschedulablePods = append(unschedulablePods, pInfo)
|
unschedulablePods = append(unschedulablePods, pInfo)
|
||||||
}
|
}
|
||||||
@ -504,7 +504,7 @@ func (p *PriorityQueue) MoveAllToActiveOrBackoffQueue(event string) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// NOTE: this function assumes lock has been acquired in caller
|
// NOTE: this function assumes lock has been acquired in caller
|
||||||
func (p *PriorityQueue) movePodsToActiveOrBackoffQueue(podInfoList []*framework.PodInfo, event string) {
|
func (p *PriorityQueue) movePodsToActiveOrBackoffQueue(podInfoList []*framework.QueuedPodInfo, event string) {
|
||||||
for _, pInfo := range podInfoList {
|
for _, pInfo := range podInfoList {
|
||||||
pod := pInfo.Pod
|
pod := pInfo.Pod
|
||||||
if p.isPodBackingoff(pInfo) {
|
if p.isPodBackingoff(pInfo) {
|
||||||
@ -530,8 +530,8 @@ func (p *PriorityQueue) movePodsToActiveOrBackoffQueue(podInfoList []*framework.
|
|||||||
// getUnschedulablePodsWithMatchingAffinityTerm returns unschedulable pods which have
|
// getUnschedulablePodsWithMatchingAffinityTerm returns unschedulable pods which have
|
||||||
// any affinity term that matches "pod".
|
// any affinity term that matches "pod".
|
||||||
// NOTE: this function assumes lock has been acquired in caller.
|
// NOTE: this function assumes lock has been acquired in caller.
|
||||||
func (p *PriorityQueue) getUnschedulablePodsWithMatchingAffinityTerm(pod *v1.Pod) []*framework.PodInfo {
|
func (p *PriorityQueue) getUnschedulablePodsWithMatchingAffinityTerm(pod *v1.Pod) []*framework.QueuedPodInfo {
|
||||||
var podsToMove []*framework.PodInfo
|
var podsToMove []*framework.QueuedPodInfo
|
||||||
for _, pInfo := range p.unschedulableQ.podInfoMap {
|
for _, pInfo := range p.unschedulableQ.podInfoMap {
|
||||||
up := pInfo.Pod
|
up := pInfo.Pod
|
||||||
affinity := up.Spec.Affinity
|
affinity := up.Spec.Affinity
|
||||||
@ -569,10 +569,10 @@ func (p *PriorityQueue) PendingPods() []*v1.Pod {
|
|||||||
defer p.lock.RUnlock()
|
defer p.lock.RUnlock()
|
||||||
result := []*v1.Pod{}
|
result := []*v1.Pod{}
|
||||||
for _, pInfo := range p.activeQ.List() {
|
for _, pInfo := range p.activeQ.List() {
|
||||||
result = append(result, pInfo.(*framework.PodInfo).Pod)
|
result = append(result, pInfo.(*framework.QueuedPodInfo).Pod)
|
||||||
}
|
}
|
||||||
for _, pInfo := range p.podBackoffQ.List() {
|
for _, pInfo := range p.podBackoffQ.List() {
|
||||||
result = append(result, pInfo.(*framework.PodInfo).Pod)
|
result = append(result, pInfo.(*framework.QueuedPodInfo).Pod)
|
||||||
}
|
}
|
||||||
for _, pInfo := range p.unschedulableQ.podInfoMap {
|
for _, pInfo := range p.unschedulableQ.podInfoMap {
|
||||||
result = append(result, pInfo.Pod)
|
result = append(result, pInfo.Pod)
|
||||||
@ -607,8 +607,8 @@ func (p *PriorityQueue) UpdateNominatedPodForNode(pod *v1.Pod, nodeName string)
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (p *PriorityQueue) podsCompareBackoffCompleted(podInfo1, podInfo2 interface{}) bool {
|
func (p *PriorityQueue) podsCompareBackoffCompleted(podInfo1, podInfo2 interface{}) bool {
|
||||||
pInfo1 := podInfo1.(*framework.PodInfo)
|
pInfo1 := podInfo1.(*framework.QueuedPodInfo)
|
||||||
pInfo2 := podInfo2.(*framework.PodInfo)
|
pInfo2 := podInfo2.(*framework.QueuedPodInfo)
|
||||||
bo1 := p.getBackoffTime(pInfo1)
|
bo1 := p.getBackoffTime(pInfo1)
|
||||||
bo2 := p.getBackoffTime(pInfo2)
|
bo2 := p.getBackoffTime(pInfo2)
|
||||||
return bo1.Before(bo2)
|
return bo1.Before(bo2)
|
||||||
@ -621,10 +621,10 @@ func (p *PriorityQueue) NumUnschedulablePods() int {
|
|||||||
return len(p.unschedulableQ.podInfoMap)
|
return len(p.unschedulableQ.podInfoMap)
|
||||||
}
|
}
|
||||||
|
|
||||||
// newPodInfo builds a PodInfo object.
|
// newQueuedPodInfo builds a QueuedPodInfo object.
|
||||||
func (p *PriorityQueue) newPodInfo(pod *v1.Pod) *framework.PodInfo {
|
func (p *PriorityQueue) newQueuedPodInfo(pod *v1.Pod) *framework.QueuedPodInfo {
|
||||||
now := p.clock.Now()
|
now := p.clock.Now()
|
||||||
return &framework.PodInfo{
|
return &framework.QueuedPodInfo{
|
||||||
Pod: pod,
|
Pod: pod,
|
||||||
Timestamp: now,
|
Timestamp: now,
|
||||||
InitialAttemptTimestamp: now,
|
InitialAttemptTimestamp: now,
|
||||||
@ -632,7 +632,7 @@ func (p *PriorityQueue) newPodInfo(pod *v1.Pod) *framework.PodInfo {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// getBackoffTime returns the time that podInfo completes backoff
|
// getBackoffTime returns the time that podInfo completes backoff
|
||||||
func (p *PriorityQueue) getBackoffTime(podInfo *framework.PodInfo) time.Time {
|
func (p *PriorityQueue) getBackoffTime(podInfo *framework.QueuedPodInfo) time.Time {
|
||||||
duration := p.calculateBackoffDuration(podInfo)
|
duration := p.calculateBackoffDuration(podInfo)
|
||||||
backoffTime := podInfo.Timestamp.Add(duration)
|
backoffTime := podInfo.Timestamp.Add(duration)
|
||||||
return backoffTime
|
return backoffTime
|
||||||
@ -640,7 +640,7 @@ func (p *PriorityQueue) getBackoffTime(podInfo *framework.PodInfo) time.Time {
|
|||||||
|
|
||||||
// calculateBackoffDuration is a helper function for calculating the backoffDuration
|
// calculateBackoffDuration is a helper function for calculating the backoffDuration
|
||||||
// based on the number of attempts the pod has made.
|
// based on the number of attempts the pod has made.
|
||||||
func (p *PriorityQueue) calculateBackoffDuration(podInfo *framework.PodInfo) time.Duration {
|
func (p *PriorityQueue) calculateBackoffDuration(podInfo *framework.QueuedPodInfo) time.Duration {
|
||||||
duration := p.podInitialBackoffDuration
|
duration := p.podInitialBackoffDuration
|
||||||
for i := 1; i < podInfo.Attempts; i++ {
|
for i := 1; i < podInfo.Attempts; i++ {
|
||||||
duration = duration * 2
|
duration = duration * 2
|
||||||
@ -651,8 +651,8 @@ func (p *PriorityQueue) calculateBackoffDuration(podInfo *framework.PodInfo) tim
|
|||||||
return duration
|
return duration
|
||||||
}
|
}
|
||||||
|
|
||||||
func updatePod(oldPodInfo interface{}, newPod *v1.Pod) *framework.PodInfo {
|
func updatePod(oldPodInfo interface{}, newPod *v1.Pod) *framework.QueuedPodInfo {
|
||||||
pInfo := oldPodInfo.(*framework.PodInfo)
|
pInfo := oldPodInfo.(*framework.QueuedPodInfo)
|
||||||
pInfo.Pod = newPod
|
pInfo.Pod = newPod
|
||||||
return pInfo
|
return pInfo
|
||||||
}
|
}
|
||||||
@ -660,8 +660,8 @@ func updatePod(oldPodInfo interface{}, newPod *v1.Pod) *framework.PodInfo {
|
|||||||
// UnschedulablePodsMap holds pods that cannot be scheduled. This data structure
|
// UnschedulablePodsMap holds pods that cannot be scheduled. This data structure
|
||||||
// is used to implement unschedulableQ.
|
// is used to implement unschedulableQ.
|
||||||
type UnschedulablePodsMap struct {
|
type UnschedulablePodsMap struct {
|
||||||
// podInfoMap is a map key by a pod's full-name and the value is a pointer to the PodInfo.
|
// podInfoMap is a map key by a pod's full-name and the value is a pointer to the QueuedPodInfo.
|
||||||
podInfoMap map[string]*framework.PodInfo
|
podInfoMap map[string]*framework.QueuedPodInfo
|
||||||
keyFunc func(*v1.Pod) string
|
keyFunc func(*v1.Pod) string
|
||||||
// metricRecorder updates the counter when elements of an unschedulablePodsMap
|
// metricRecorder updates the counter when elements of an unschedulablePodsMap
|
||||||
// get added or removed, and it does nothing if it's nil
|
// get added or removed, and it does nothing if it's nil
|
||||||
@ -669,7 +669,7 @@ type UnschedulablePodsMap struct {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Add adds a pod to the unschedulable podInfoMap.
|
// Add adds a pod to the unschedulable podInfoMap.
|
||||||
func (u *UnschedulablePodsMap) addOrUpdate(pInfo *framework.PodInfo) {
|
func (u *UnschedulablePodsMap) addOrUpdate(pInfo *framework.QueuedPodInfo) {
|
||||||
podID := u.keyFunc(pInfo.Pod)
|
podID := u.keyFunc(pInfo.Pod)
|
||||||
if _, exists := u.podInfoMap[podID]; !exists && u.metricRecorder != nil {
|
if _, exists := u.podInfoMap[podID]; !exists && u.metricRecorder != nil {
|
||||||
u.metricRecorder.Inc()
|
u.metricRecorder.Inc()
|
||||||
@ -686,9 +686,9 @@ func (u *UnschedulablePodsMap) delete(pod *v1.Pod) {
|
|||||||
delete(u.podInfoMap, podID)
|
delete(u.podInfoMap, podID)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Get returns the PodInfo if a pod with the same key as the key of the given "pod"
|
// Get returns the QueuedPodInfo if a pod with the same key as the key of the given "pod"
|
||||||
// is found in the map. It returns nil otherwise.
|
// is found in the map. It returns nil otherwise.
|
||||||
func (u *UnschedulablePodsMap) get(pod *v1.Pod) *framework.PodInfo {
|
func (u *UnschedulablePodsMap) get(pod *v1.Pod) *framework.QueuedPodInfo {
|
||||||
podKey := u.keyFunc(pod)
|
podKey := u.keyFunc(pod)
|
||||||
if pInfo, exists := u.podInfoMap[podKey]; exists {
|
if pInfo, exists := u.podInfoMap[podKey]; exists {
|
||||||
return pInfo
|
return pInfo
|
||||||
@ -698,7 +698,7 @@ func (u *UnschedulablePodsMap) get(pod *v1.Pod) *framework.PodInfo {
|
|||||||
|
|
||||||
// Clear removes all the entries from the unschedulable podInfoMap.
|
// Clear removes all the entries from the unschedulable podInfoMap.
|
||||||
func (u *UnschedulablePodsMap) clear() {
|
func (u *UnschedulablePodsMap) clear() {
|
||||||
u.podInfoMap = make(map[string]*framework.PodInfo)
|
u.podInfoMap = make(map[string]*framework.QueuedPodInfo)
|
||||||
if u.metricRecorder != nil {
|
if u.metricRecorder != nil {
|
||||||
u.metricRecorder.Clear()
|
u.metricRecorder.Clear()
|
||||||
}
|
}
|
||||||
@ -707,7 +707,7 @@ func (u *UnschedulablePodsMap) clear() {
|
|||||||
// newUnschedulablePodsMap initializes a new object of UnschedulablePodsMap.
|
// newUnschedulablePodsMap initializes a new object of UnschedulablePodsMap.
|
||||||
func newUnschedulablePodsMap(metricRecorder metrics.MetricRecorder) *UnschedulablePodsMap {
|
func newUnschedulablePodsMap(metricRecorder metrics.MetricRecorder) *UnschedulablePodsMap {
|
||||||
return &UnschedulablePodsMap{
|
return &UnschedulablePodsMap{
|
||||||
podInfoMap: make(map[string]*framework.PodInfo),
|
podInfoMap: make(map[string]*framework.QueuedPodInfo),
|
||||||
keyFunc: util.GetPodFullName,
|
keyFunc: util.GetPodFullName,
|
||||||
metricRecorder: metricRecorder,
|
metricRecorder: metricRecorder,
|
||||||
}
|
}
|
||||||
@ -803,8 +803,8 @@ func newNominatedPodMap() *nominatedPodMap {
|
|||||||
|
|
||||||
// MakeNextPodFunc returns a function to retrieve the next pod from a given
|
// MakeNextPodFunc returns a function to retrieve the next pod from a given
|
||||||
// scheduling queue
|
// scheduling queue
|
||||||
func MakeNextPodFunc(queue SchedulingQueue) func() *framework.PodInfo {
|
func MakeNextPodFunc(queue SchedulingQueue) func() *framework.QueuedPodInfo {
|
||||||
return func() *framework.PodInfo {
|
return func() *framework.QueuedPodInfo {
|
||||||
podInfo, err := queue.Pop()
|
podInfo, err := queue.Pop()
|
||||||
if err == nil {
|
if err == nil {
|
||||||
klog.V(4).Infof("About to try and schedule pod %v/%v", podInfo.Pod.Namespace, podInfo.Pod.Name)
|
klog.V(4).Infof("About to try and schedule pod %v/%v", podInfo.Pod.Namespace, podInfo.Pod.Name)
|
||||||
@ -816,5 +816,5 @@ func MakeNextPodFunc(queue SchedulingQueue) func() *framework.PodInfo {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func podInfoKeyFunc(obj interface{}) (string, error) {
|
func podInfoKeyFunc(obj interface{}) (string, error) {
|
||||||
return cache.MetaNamespaceKeyFunc(obj.(*framework.PodInfo).Pod)
|
return cache.MetaNamespaceKeyFunc(obj.(*framework.QueuedPodInfo).Pod)
|
||||||
}
|
}
|
||||||
|
@ -177,8 +177,8 @@ func TestPriorityQueue_AddWithReversePriorityLessFunc(t *testing.T) {
|
|||||||
func TestPriorityQueue_AddUnschedulableIfNotPresent(t *testing.T) {
|
func TestPriorityQueue_AddUnschedulableIfNotPresent(t *testing.T) {
|
||||||
q := createAndRunPriorityQueue(newDefaultQueueSort())
|
q := createAndRunPriorityQueue(newDefaultQueueSort())
|
||||||
q.Add(&highPriNominatedPod)
|
q.Add(&highPriNominatedPod)
|
||||||
q.AddUnschedulableIfNotPresent(newPodInfoNoTimestamp(&highPriNominatedPod), q.SchedulingCycle()) // Must not add anything.
|
q.AddUnschedulableIfNotPresent(newQueuedPodInfoNoTimestamp(&highPriNominatedPod), q.SchedulingCycle()) // Must not add anything.
|
||||||
q.AddUnschedulableIfNotPresent(newPodInfoNoTimestamp(&unschedulablePod), q.SchedulingCycle())
|
q.AddUnschedulableIfNotPresent(newQueuedPodInfoNoTimestamp(&unschedulablePod), q.SchedulingCycle())
|
||||||
expectedNominatedPods := &nominatedPodMap{
|
expectedNominatedPods := &nominatedPodMap{
|
||||||
nominatedPodToNode: map[types.UID]string{
|
nominatedPodToNode: map[types.UID]string{
|
||||||
unschedulablePod.UID: "node1",
|
unschedulablePod.UID: "node1",
|
||||||
@ -257,7 +257,7 @@ func TestPriorityQueue_AddUnschedulableIfNotPresent_Backoff(t *testing.T) {
|
|||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
if err := q.AddUnschedulableIfNotPresent(newPodInfoNoTimestamp(unschedulablePod), oldCycle); err != nil {
|
if err := q.AddUnschedulableIfNotPresent(newQueuedPodInfoNoTimestamp(unschedulablePod), oldCycle); err != nil {
|
||||||
t.Errorf("Failed to call AddUnschedulableIfNotPresent(%v): %v", unschedulablePod.Name, err)
|
t.Errorf("Failed to call AddUnschedulableIfNotPresent(%v): %v", unschedulablePod.Name, err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -266,7 +266,7 @@ func TestPriorityQueue_AddUnschedulableIfNotPresent_Backoff(t *testing.T) {
|
|||||||
// Since there was a move request at the same cycle as "oldCycle", these pods
|
// Since there was a move request at the same cycle as "oldCycle", these pods
|
||||||
// should be in the backoff queue.
|
// should be in the backoff queue.
|
||||||
for i := 1; i < totalNum; i++ {
|
for i := 1; i < totalNum; i++ {
|
||||||
if _, exists, _ := q.podBackoffQ.Get(newPodInfoNoTimestamp(&expectedPods[i])); !exists {
|
if _, exists, _ := q.podBackoffQ.Get(newQueuedPodInfoNoTimestamp(&expectedPods[i])); !exists {
|
||||||
t.Errorf("Expected %v to be added to podBackoffQ.", expectedPods[i].Name)
|
t.Errorf("Expected %v to be added to podBackoffQ.", expectedPods[i].Name)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -294,7 +294,7 @@ func TestPriorityQueue_Update(t *testing.T) {
|
|||||||
q := createAndRunPriorityQueue(newDefaultQueueSort())
|
q := createAndRunPriorityQueue(newDefaultQueueSort())
|
||||||
q.Update(nil, &highPriorityPod)
|
q.Update(nil, &highPriorityPod)
|
||||||
q.lock.RLock()
|
q.lock.RLock()
|
||||||
if _, exists, _ := q.activeQ.Get(newPodInfoNoTimestamp(&highPriorityPod)); !exists {
|
if _, exists, _ := q.activeQ.Get(newQueuedPodInfoNoTimestamp(&highPriorityPod)); !exists {
|
||||||
t.Errorf("Expected %v to be added to activeQ.", highPriorityPod.Name)
|
t.Errorf("Expected %v to be added to activeQ.", highPriorityPod.Name)
|
||||||
}
|
}
|
||||||
q.lock.RUnlock()
|
q.lock.RUnlock()
|
||||||
@ -315,7 +315,7 @@ func TestPriorityQueue_Update(t *testing.T) {
|
|||||||
// add the pod to activeQ.
|
// add the pod to activeQ.
|
||||||
q.Update(&unschedulablePod, &unschedulablePod)
|
q.Update(&unschedulablePod, &unschedulablePod)
|
||||||
q.lock.RLock()
|
q.lock.RLock()
|
||||||
if _, exists, _ := q.activeQ.Get(newPodInfoNoTimestamp(&unschedulablePod)); !exists {
|
if _, exists, _ := q.activeQ.Get(newQueuedPodInfoNoTimestamp(&unschedulablePod)); !exists {
|
||||||
t.Errorf("Expected %v to be added to activeQ.", unschedulablePod.Name)
|
t.Errorf("Expected %v to be added to activeQ.", unschedulablePod.Name)
|
||||||
}
|
}
|
||||||
q.lock.RUnlock()
|
q.lock.RUnlock()
|
||||||
@ -325,7 +325,7 @@ func TestPriorityQueue_Update(t *testing.T) {
|
|||||||
t.Error("Expected unschedulableQ to be empty.")
|
t.Error("Expected unschedulableQ to be empty.")
|
||||||
}
|
}
|
||||||
q.lock.RLock()
|
q.lock.RLock()
|
||||||
if _, exists, _ := q.activeQ.Get(newPodInfoNoTimestamp(&unschedulablePod)); !exists {
|
if _, exists, _ := q.activeQ.Get(newQueuedPodInfoNoTimestamp(&unschedulablePod)); !exists {
|
||||||
t.Errorf("Expected: %v to be added to activeQ.", unschedulablePod.Name)
|
t.Errorf("Expected: %v to be added to activeQ.", unschedulablePod.Name)
|
||||||
}
|
}
|
||||||
q.lock.RUnlock()
|
q.lock.RUnlock()
|
||||||
@ -334,7 +334,7 @@ func TestPriorityQueue_Update(t *testing.T) {
|
|||||||
}
|
}
|
||||||
// Updating a pod that is in unschedulableQ in a way that it may
|
// Updating a pod that is in unschedulableQ in a way that it may
|
||||||
// become schedulable should add the pod to the activeQ.
|
// become schedulable should add the pod to the activeQ.
|
||||||
q.AddUnschedulableIfNotPresent(q.newPodInfo(&medPriorityPod), q.SchedulingCycle())
|
q.AddUnschedulableIfNotPresent(q.newQueuedPodInfo(&medPriorityPod), q.SchedulingCycle())
|
||||||
if len(q.unschedulableQ.podInfoMap) != 1 {
|
if len(q.unschedulableQ.podInfoMap) != 1 {
|
||||||
t.Error("Expected unschedulableQ to be 1.")
|
t.Error("Expected unschedulableQ to be 1.")
|
||||||
}
|
}
|
||||||
@ -354,10 +354,10 @@ func TestPriorityQueue_Delete(t *testing.T) {
|
|||||||
t.Errorf("delete failed: %v", err)
|
t.Errorf("delete failed: %v", err)
|
||||||
}
|
}
|
||||||
q.lock.RLock()
|
q.lock.RLock()
|
||||||
if _, exists, _ := q.activeQ.Get(newPodInfoNoTimestamp(&unschedulablePod)); !exists {
|
if _, exists, _ := q.activeQ.Get(newQueuedPodInfoNoTimestamp(&unschedulablePod)); !exists {
|
||||||
t.Errorf("Expected %v to be in activeQ.", unschedulablePod.Name)
|
t.Errorf("Expected %v to be in activeQ.", unschedulablePod.Name)
|
||||||
}
|
}
|
||||||
if _, exists, _ := q.activeQ.Get(newPodInfoNoTimestamp(&highPriNominatedPod)); exists {
|
if _, exists, _ := q.activeQ.Get(newQueuedPodInfoNoTimestamp(&highPriNominatedPod)); exists {
|
||||||
t.Errorf("Didn't expect %v to be in activeQ.", highPriorityPod.Name)
|
t.Errorf("Didn't expect %v to be in activeQ.", highPriorityPod.Name)
|
||||||
}
|
}
|
||||||
q.lock.RUnlock()
|
q.lock.RUnlock()
|
||||||
@ -375,8 +375,8 @@ func TestPriorityQueue_Delete(t *testing.T) {
|
|||||||
func TestPriorityQueue_MoveAllToActiveOrBackoffQueue(t *testing.T) {
|
func TestPriorityQueue_MoveAllToActiveOrBackoffQueue(t *testing.T) {
|
||||||
q := createAndRunPriorityQueue(newDefaultQueueSort())
|
q := createAndRunPriorityQueue(newDefaultQueueSort())
|
||||||
q.Add(&medPriorityPod)
|
q.Add(&medPriorityPod)
|
||||||
q.AddUnschedulableIfNotPresent(q.newPodInfo(&unschedulablePod), q.SchedulingCycle())
|
q.AddUnschedulableIfNotPresent(q.newQueuedPodInfo(&unschedulablePod), q.SchedulingCycle())
|
||||||
q.AddUnschedulableIfNotPresent(q.newPodInfo(&highPriorityPod), q.SchedulingCycle())
|
q.AddUnschedulableIfNotPresent(q.newQueuedPodInfo(&highPriorityPod), q.SchedulingCycle())
|
||||||
q.MoveAllToActiveOrBackoffQueue("test")
|
q.MoveAllToActiveOrBackoffQueue("test")
|
||||||
q.lock.RLock()
|
q.lock.RLock()
|
||||||
defer q.lock.RUnlock()
|
defer q.lock.RUnlock()
|
||||||
@ -428,8 +428,8 @@ func TestPriorityQueue_AssignedPodAdded(t *testing.T) {
|
|||||||
q := createAndRunPriorityQueue(newDefaultQueueSort(), WithClock(c))
|
q := createAndRunPriorityQueue(newDefaultQueueSort(), WithClock(c))
|
||||||
q.Add(&medPriorityPod)
|
q.Add(&medPriorityPod)
|
||||||
// Add a couple of pods to the unschedulableQ.
|
// Add a couple of pods to the unschedulableQ.
|
||||||
q.AddUnschedulableIfNotPresent(q.newPodInfo(&unschedulablePod), q.SchedulingCycle())
|
q.AddUnschedulableIfNotPresent(q.newQueuedPodInfo(&unschedulablePod), q.SchedulingCycle())
|
||||||
q.AddUnschedulableIfNotPresent(q.newPodInfo(affinityPod), q.SchedulingCycle())
|
q.AddUnschedulableIfNotPresent(q.newQueuedPodInfo(affinityPod), q.SchedulingCycle())
|
||||||
|
|
||||||
// Move clock to make the unschedulable pods complete backoff.
|
// Move clock to make the unschedulable pods complete backoff.
|
||||||
c.Step(DefaultPodInitialBackoffDuration + time.Second)
|
c.Step(DefaultPodInitialBackoffDuration + time.Second)
|
||||||
@ -440,7 +440,7 @@ func TestPriorityQueue_AssignedPodAdded(t *testing.T) {
|
|||||||
t.Error("affinityPod is still in the unschedulableQ.")
|
t.Error("affinityPod is still in the unschedulableQ.")
|
||||||
}
|
}
|
||||||
q.lock.RLock()
|
q.lock.RLock()
|
||||||
if _, exists, _ := q.activeQ.Get(newPodInfoNoTimestamp(affinityPod)); !exists {
|
if _, exists, _ := q.activeQ.Get(newQueuedPodInfoNoTimestamp(affinityPod)); !exists {
|
||||||
t.Error("affinityPod is not moved to activeQ.")
|
t.Error("affinityPod is not moved to activeQ.")
|
||||||
}
|
}
|
||||||
q.lock.RUnlock()
|
q.lock.RUnlock()
|
||||||
@ -478,8 +478,8 @@ func TestPriorityQueue_PendingPods(t *testing.T) {
|
|||||||
|
|
||||||
q := createAndRunPriorityQueue(newDefaultQueueSort())
|
q := createAndRunPriorityQueue(newDefaultQueueSort())
|
||||||
q.Add(&medPriorityPod)
|
q.Add(&medPriorityPod)
|
||||||
q.AddUnschedulableIfNotPresent(q.newPodInfo(&unschedulablePod), q.SchedulingCycle())
|
q.AddUnschedulableIfNotPresent(q.newQueuedPodInfo(&unschedulablePod), q.SchedulingCycle())
|
||||||
q.AddUnschedulableIfNotPresent(q.newPodInfo(&highPriorityPod), q.SchedulingCycle())
|
q.AddUnschedulableIfNotPresent(q.newQueuedPodInfo(&highPriorityPod), q.SchedulingCycle())
|
||||||
|
|
||||||
expectedSet := makeSet([]*v1.Pod{&medPriorityPod, &unschedulablePod, &highPriorityPod})
|
expectedSet := makeSet([]*v1.Pod{&medPriorityPod, &unschedulablePod, &highPriorityPod})
|
||||||
if !reflect.DeepEqual(expectedSet, makeSet(q.PendingPods())) {
|
if !reflect.DeepEqual(expectedSet, makeSet(q.PendingPods())) {
|
||||||
@ -630,30 +630,30 @@ func TestUnschedulablePodsMap(t *testing.T) {
|
|||||||
tests := []struct {
|
tests := []struct {
|
||||||
name string
|
name string
|
||||||
podsToAdd []*v1.Pod
|
podsToAdd []*v1.Pod
|
||||||
expectedMapAfterAdd map[string]*framework.PodInfo
|
expectedMapAfterAdd map[string]*framework.QueuedPodInfo
|
||||||
podsToUpdate []*v1.Pod
|
podsToUpdate []*v1.Pod
|
||||||
expectedMapAfterUpdate map[string]*framework.PodInfo
|
expectedMapAfterUpdate map[string]*framework.QueuedPodInfo
|
||||||
podsToDelete []*v1.Pod
|
podsToDelete []*v1.Pod
|
||||||
expectedMapAfterDelete map[string]*framework.PodInfo
|
expectedMapAfterDelete map[string]*framework.QueuedPodInfo
|
||||||
}{
|
}{
|
||||||
{
|
{
|
||||||
name: "create, update, delete subset of pods",
|
name: "create, update, delete subset of pods",
|
||||||
podsToAdd: []*v1.Pod{pods[0], pods[1], pods[2], pods[3]},
|
podsToAdd: []*v1.Pod{pods[0], pods[1], pods[2], pods[3]},
|
||||||
expectedMapAfterAdd: map[string]*framework.PodInfo{
|
expectedMapAfterAdd: map[string]*framework.QueuedPodInfo{
|
||||||
util.GetPodFullName(pods[0]): {Pod: pods[0]},
|
util.GetPodFullName(pods[0]): {Pod: pods[0]},
|
||||||
util.GetPodFullName(pods[1]): {Pod: pods[1]},
|
util.GetPodFullName(pods[1]): {Pod: pods[1]},
|
||||||
util.GetPodFullName(pods[2]): {Pod: pods[2]},
|
util.GetPodFullName(pods[2]): {Pod: pods[2]},
|
||||||
util.GetPodFullName(pods[3]): {Pod: pods[3]},
|
util.GetPodFullName(pods[3]): {Pod: pods[3]},
|
||||||
},
|
},
|
||||||
podsToUpdate: []*v1.Pod{updatedPods[0]},
|
podsToUpdate: []*v1.Pod{updatedPods[0]},
|
||||||
expectedMapAfterUpdate: map[string]*framework.PodInfo{
|
expectedMapAfterUpdate: map[string]*framework.QueuedPodInfo{
|
||||||
util.GetPodFullName(pods[0]): {Pod: updatedPods[0]},
|
util.GetPodFullName(pods[0]): {Pod: updatedPods[0]},
|
||||||
util.GetPodFullName(pods[1]): {Pod: pods[1]},
|
util.GetPodFullName(pods[1]): {Pod: pods[1]},
|
||||||
util.GetPodFullName(pods[2]): {Pod: pods[2]},
|
util.GetPodFullName(pods[2]): {Pod: pods[2]},
|
||||||
util.GetPodFullName(pods[3]): {Pod: pods[3]},
|
util.GetPodFullName(pods[3]): {Pod: pods[3]},
|
||||||
},
|
},
|
||||||
podsToDelete: []*v1.Pod{pods[0], pods[1]},
|
podsToDelete: []*v1.Pod{pods[0], pods[1]},
|
||||||
expectedMapAfterDelete: map[string]*framework.PodInfo{
|
expectedMapAfterDelete: map[string]*framework.QueuedPodInfo{
|
||||||
util.GetPodFullName(pods[2]): {Pod: pods[2]},
|
util.GetPodFullName(pods[2]): {Pod: pods[2]},
|
||||||
util.GetPodFullName(pods[3]): {Pod: pods[3]},
|
util.GetPodFullName(pods[3]): {Pod: pods[3]},
|
||||||
},
|
},
|
||||||
@ -661,32 +661,32 @@ func TestUnschedulablePodsMap(t *testing.T) {
|
|||||||
{
|
{
|
||||||
name: "create, update, delete all",
|
name: "create, update, delete all",
|
||||||
podsToAdd: []*v1.Pod{pods[0], pods[3]},
|
podsToAdd: []*v1.Pod{pods[0], pods[3]},
|
||||||
expectedMapAfterAdd: map[string]*framework.PodInfo{
|
expectedMapAfterAdd: map[string]*framework.QueuedPodInfo{
|
||||||
util.GetPodFullName(pods[0]): {Pod: pods[0]},
|
util.GetPodFullName(pods[0]): {Pod: pods[0]},
|
||||||
util.GetPodFullName(pods[3]): {Pod: pods[3]},
|
util.GetPodFullName(pods[3]): {Pod: pods[3]},
|
||||||
},
|
},
|
||||||
podsToUpdate: []*v1.Pod{updatedPods[3]},
|
podsToUpdate: []*v1.Pod{updatedPods[3]},
|
||||||
expectedMapAfterUpdate: map[string]*framework.PodInfo{
|
expectedMapAfterUpdate: map[string]*framework.QueuedPodInfo{
|
||||||
util.GetPodFullName(pods[0]): {Pod: pods[0]},
|
util.GetPodFullName(pods[0]): {Pod: pods[0]},
|
||||||
util.GetPodFullName(pods[3]): {Pod: updatedPods[3]},
|
util.GetPodFullName(pods[3]): {Pod: updatedPods[3]},
|
||||||
},
|
},
|
||||||
podsToDelete: []*v1.Pod{pods[0], pods[3]},
|
podsToDelete: []*v1.Pod{pods[0], pods[3]},
|
||||||
expectedMapAfterDelete: map[string]*framework.PodInfo{},
|
expectedMapAfterDelete: map[string]*framework.QueuedPodInfo{},
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
name: "delete non-existing and existing pods",
|
name: "delete non-existing and existing pods",
|
||||||
podsToAdd: []*v1.Pod{pods[1], pods[2]},
|
podsToAdd: []*v1.Pod{pods[1], pods[2]},
|
||||||
expectedMapAfterAdd: map[string]*framework.PodInfo{
|
expectedMapAfterAdd: map[string]*framework.QueuedPodInfo{
|
||||||
util.GetPodFullName(pods[1]): {Pod: pods[1]},
|
util.GetPodFullName(pods[1]): {Pod: pods[1]},
|
||||||
util.GetPodFullName(pods[2]): {Pod: pods[2]},
|
util.GetPodFullName(pods[2]): {Pod: pods[2]},
|
||||||
},
|
},
|
||||||
podsToUpdate: []*v1.Pod{updatedPods[1]},
|
podsToUpdate: []*v1.Pod{updatedPods[1]},
|
||||||
expectedMapAfterUpdate: map[string]*framework.PodInfo{
|
expectedMapAfterUpdate: map[string]*framework.QueuedPodInfo{
|
||||||
util.GetPodFullName(pods[1]): {Pod: updatedPods[1]},
|
util.GetPodFullName(pods[1]): {Pod: updatedPods[1]},
|
||||||
util.GetPodFullName(pods[2]): {Pod: pods[2]},
|
util.GetPodFullName(pods[2]): {Pod: pods[2]},
|
||||||
},
|
},
|
||||||
podsToDelete: []*v1.Pod{pods[2], pods[3]},
|
podsToDelete: []*v1.Pod{pods[2], pods[3]},
|
||||||
expectedMapAfterDelete: map[string]*framework.PodInfo{
|
expectedMapAfterDelete: map[string]*framework.QueuedPodInfo{
|
||||||
util.GetPodFullName(pods[1]): {Pod: updatedPods[1]},
|
util.GetPodFullName(pods[1]): {Pod: updatedPods[1]},
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
@ -696,7 +696,7 @@ func TestUnschedulablePodsMap(t *testing.T) {
|
|||||||
t.Run(test.name, func(t *testing.T) {
|
t.Run(test.name, func(t *testing.T) {
|
||||||
upm := newUnschedulablePodsMap(nil)
|
upm := newUnschedulablePodsMap(nil)
|
||||||
for _, p := range test.podsToAdd {
|
for _, p := range test.podsToAdd {
|
||||||
upm.addOrUpdate(newPodInfoNoTimestamp(p))
|
upm.addOrUpdate(newQueuedPodInfoNoTimestamp(p))
|
||||||
}
|
}
|
||||||
if !reflect.DeepEqual(upm.podInfoMap, test.expectedMapAfterAdd) {
|
if !reflect.DeepEqual(upm.podInfoMap, test.expectedMapAfterAdd) {
|
||||||
t.Errorf("Unexpected map after adding pods. Expected: %v, got: %v",
|
t.Errorf("Unexpected map after adding pods. Expected: %v, got: %v",
|
||||||
@ -705,7 +705,7 @@ func TestUnschedulablePodsMap(t *testing.T) {
|
|||||||
|
|
||||||
if len(test.podsToUpdate) > 0 {
|
if len(test.podsToUpdate) > 0 {
|
||||||
for _, p := range test.podsToUpdate {
|
for _, p := range test.podsToUpdate {
|
||||||
upm.addOrUpdate(newPodInfoNoTimestamp(p))
|
upm.addOrUpdate(newQueuedPodInfoNoTimestamp(p))
|
||||||
}
|
}
|
||||||
if !reflect.DeepEqual(upm.podInfoMap, test.expectedMapAfterUpdate) {
|
if !reflect.DeepEqual(upm.podInfoMap, test.expectedMapAfterUpdate) {
|
||||||
t.Errorf("Unexpected map after updating pods. Expected: %v, got: %v",
|
t.Errorf("Unexpected map after updating pods. Expected: %v, got: %v",
|
||||||
@ -847,7 +847,7 @@ func TestPodFailedSchedulingMultipleTimesDoesNotBlockNewerPod(t *testing.T) {
|
|||||||
})
|
})
|
||||||
|
|
||||||
// Put in the unschedulable queue
|
// Put in the unschedulable queue
|
||||||
q.AddUnschedulableIfNotPresent(newPodInfoNoTimestamp(&unschedulablePod), q.SchedulingCycle())
|
q.AddUnschedulableIfNotPresent(newQueuedPodInfoNoTimestamp(&unschedulablePod), q.SchedulingCycle())
|
||||||
// Move clock to make the unschedulable pods complete backoff.
|
// Move clock to make the unschedulable pods complete backoff.
|
||||||
c.Step(DefaultPodInitialBackoffDuration + time.Second)
|
c.Step(DefaultPodInitialBackoffDuration + time.Second)
|
||||||
// Move all unschedulable pods to the active queue.
|
// Move all unschedulable pods to the active queue.
|
||||||
@ -890,7 +890,7 @@ func TestPodFailedSchedulingMultipleTimesDoesNotBlockNewerPod(t *testing.T) {
|
|||||||
})
|
})
|
||||||
|
|
||||||
// And then, put unschedulable pod to the unschedulable queue
|
// And then, put unschedulable pod to the unschedulable queue
|
||||||
q.AddUnschedulableIfNotPresent(newPodInfoNoTimestamp(&unschedulablePod), q.SchedulingCycle())
|
q.AddUnschedulableIfNotPresent(newQueuedPodInfoNoTimestamp(&unschedulablePod), q.SchedulingCycle())
|
||||||
// Move clock to make the unschedulable pods complete backoff.
|
// Move clock to make the unschedulable pods complete backoff.
|
||||||
c.Step(DefaultPodInitialBackoffDuration + time.Second)
|
c.Step(DefaultPodInitialBackoffDuration + time.Second)
|
||||||
// Move all unschedulable pods to the active queue.
|
// Move all unschedulable pods to the active queue.
|
||||||
@ -1018,8 +1018,8 @@ func TestHighPriorityFlushUnschedulableQLeftover(t *testing.T) {
|
|||||||
Message: "fake scheduling failure",
|
Message: "fake scheduling failure",
|
||||||
})
|
})
|
||||||
|
|
||||||
q.AddUnschedulableIfNotPresent(q.newPodInfo(&highPod), q.SchedulingCycle())
|
q.AddUnschedulableIfNotPresent(q.newQueuedPodInfo(&highPod), q.SchedulingCycle())
|
||||||
q.AddUnschedulableIfNotPresent(q.newPodInfo(&midPod), q.SchedulingCycle())
|
q.AddUnschedulableIfNotPresent(q.newQueuedPodInfo(&midPod), q.SchedulingCycle())
|
||||||
c.Step(unschedulableQTimeInterval + time.Second)
|
c.Step(unschedulableQTimeInterval + time.Second)
|
||||||
|
|
||||||
if p, err := q.Pop(); err != nil || p.Pod != &highPod {
|
if p, err := q.Pop(); err != nil || p.Pod != &highPod {
|
||||||
@ -1030,29 +1030,29 @@ func TestHighPriorityFlushUnschedulableQLeftover(t *testing.T) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
type operation func(queue *PriorityQueue, pInfo *framework.PodInfo)
|
type operation func(queue *PriorityQueue, pInfo *framework.QueuedPodInfo)
|
||||||
|
|
||||||
var (
|
var (
|
||||||
add = func(queue *PriorityQueue, pInfo *framework.PodInfo) {
|
add = func(queue *PriorityQueue, pInfo *framework.QueuedPodInfo) {
|
||||||
queue.Add(pInfo.Pod)
|
queue.Add(pInfo.Pod)
|
||||||
}
|
}
|
||||||
addUnschedulablePodBackToUnschedulableQ = func(queue *PriorityQueue, pInfo *framework.PodInfo) {
|
addUnschedulablePodBackToUnschedulableQ = func(queue *PriorityQueue, pInfo *framework.QueuedPodInfo) {
|
||||||
queue.AddUnschedulableIfNotPresent(pInfo, 0)
|
queue.AddUnschedulableIfNotPresent(pInfo, 0)
|
||||||
}
|
}
|
||||||
addUnschedulablePodBackToBackoffQ = func(queue *PriorityQueue, pInfo *framework.PodInfo) {
|
addUnschedulablePodBackToBackoffQ = func(queue *PriorityQueue, pInfo *framework.QueuedPodInfo) {
|
||||||
queue.AddUnschedulableIfNotPresent(pInfo, -1)
|
queue.AddUnschedulableIfNotPresent(pInfo, -1)
|
||||||
}
|
}
|
||||||
addPodActiveQ = func(queue *PriorityQueue, pInfo *framework.PodInfo) {
|
addPodActiveQ = func(queue *PriorityQueue, pInfo *framework.QueuedPodInfo) {
|
||||||
queue.lock.Lock()
|
queue.lock.Lock()
|
||||||
queue.activeQ.Add(pInfo)
|
queue.activeQ.Add(pInfo)
|
||||||
queue.lock.Unlock()
|
queue.lock.Unlock()
|
||||||
}
|
}
|
||||||
updatePodActiveQ = func(queue *PriorityQueue, pInfo *framework.PodInfo) {
|
updatePodActiveQ = func(queue *PriorityQueue, pInfo *framework.QueuedPodInfo) {
|
||||||
queue.lock.Lock()
|
queue.lock.Lock()
|
||||||
queue.activeQ.Update(pInfo)
|
queue.activeQ.Update(pInfo)
|
||||||
queue.lock.Unlock()
|
queue.lock.Unlock()
|
||||||
}
|
}
|
||||||
addPodUnschedulableQ = func(queue *PriorityQueue, pInfo *framework.PodInfo) {
|
addPodUnschedulableQ = func(queue *PriorityQueue, pInfo *framework.QueuedPodInfo) {
|
||||||
queue.lock.Lock()
|
queue.lock.Lock()
|
||||||
// Update pod condition to unschedulable.
|
// Update pod condition to unschedulable.
|
||||||
podutil.UpdatePodCondition(&pInfo.Pod.Status, &v1.PodCondition{
|
podutil.UpdatePodCondition(&pInfo.Pod.Status, &v1.PodCondition{
|
||||||
@ -1064,24 +1064,24 @@ var (
|
|||||||
queue.unschedulableQ.addOrUpdate(pInfo)
|
queue.unschedulableQ.addOrUpdate(pInfo)
|
||||||
queue.lock.Unlock()
|
queue.lock.Unlock()
|
||||||
}
|
}
|
||||||
addPodBackoffQ = func(queue *PriorityQueue, pInfo *framework.PodInfo) {
|
addPodBackoffQ = func(queue *PriorityQueue, pInfo *framework.QueuedPodInfo) {
|
||||||
queue.lock.Lock()
|
queue.lock.Lock()
|
||||||
queue.podBackoffQ.Add(pInfo)
|
queue.podBackoffQ.Add(pInfo)
|
||||||
queue.lock.Unlock()
|
queue.lock.Unlock()
|
||||||
}
|
}
|
||||||
moveAllToActiveOrBackoffQ = func(queue *PriorityQueue, _ *framework.PodInfo) {
|
moveAllToActiveOrBackoffQ = func(queue *PriorityQueue, _ *framework.QueuedPodInfo) {
|
||||||
queue.MoveAllToActiveOrBackoffQueue("test")
|
queue.MoveAllToActiveOrBackoffQueue("test")
|
||||||
}
|
}
|
||||||
flushBackoffQ = func(queue *PriorityQueue, _ *framework.PodInfo) {
|
flushBackoffQ = func(queue *PriorityQueue, _ *framework.QueuedPodInfo) {
|
||||||
queue.clock.(*clock.FakeClock).Step(2 * time.Second)
|
queue.clock.(*clock.FakeClock).Step(2 * time.Second)
|
||||||
queue.flushBackoffQCompleted()
|
queue.flushBackoffQCompleted()
|
||||||
}
|
}
|
||||||
moveClockForward = func(queue *PriorityQueue, _ *framework.PodInfo) {
|
moveClockForward = func(queue *PriorityQueue, _ *framework.QueuedPodInfo) {
|
||||||
queue.clock.(*clock.FakeClock).Step(2 * time.Second)
|
queue.clock.(*clock.FakeClock).Step(2 * time.Second)
|
||||||
}
|
}
|
||||||
)
|
)
|
||||||
|
|
||||||
// TestPodTimestamp tests the operations related to PodInfo.
|
// TestPodTimestamp tests the operations related to QueuedPodInfo.
|
||||||
func TestPodTimestamp(t *testing.T) {
|
func TestPodTimestamp(t *testing.T) {
|
||||||
pod1 := &v1.Pod{
|
pod1 := &v1.Pod{
|
||||||
ObjectMeta: metav1.ObjectMeta{
|
ObjectMeta: metav1.ObjectMeta{
|
||||||
@ -1106,11 +1106,11 @@ func TestPodTimestamp(t *testing.T) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
var timestamp = time.Now()
|
var timestamp = time.Now()
|
||||||
pInfo1 := &framework.PodInfo{
|
pInfo1 := &framework.QueuedPodInfo{
|
||||||
Pod: pod1,
|
Pod: pod1,
|
||||||
Timestamp: timestamp,
|
Timestamp: timestamp,
|
||||||
}
|
}
|
||||||
pInfo2 := &framework.PodInfo{
|
pInfo2 := &framework.QueuedPodInfo{
|
||||||
Pod: pod2,
|
Pod: pod2,
|
||||||
Timestamp: timestamp.Add(time.Second),
|
Timestamp: timestamp.Add(time.Second),
|
||||||
}
|
}
|
||||||
@ -1118,8 +1118,8 @@ func TestPodTimestamp(t *testing.T) {
|
|||||||
tests := []struct {
|
tests := []struct {
|
||||||
name string
|
name string
|
||||||
operations []operation
|
operations []operation
|
||||||
operands []*framework.PodInfo
|
operands []*framework.QueuedPodInfo
|
||||||
expected []*framework.PodInfo
|
expected []*framework.QueuedPodInfo
|
||||||
}{
|
}{
|
||||||
{
|
{
|
||||||
name: "add two pod to activeQ and sort them by the timestamp",
|
name: "add two pod to activeQ and sort them by the timestamp",
|
||||||
@ -1127,8 +1127,8 @@ func TestPodTimestamp(t *testing.T) {
|
|||||||
addPodActiveQ,
|
addPodActiveQ,
|
||||||
addPodActiveQ,
|
addPodActiveQ,
|
||||||
},
|
},
|
||||||
operands: []*framework.PodInfo{pInfo2, pInfo1},
|
operands: []*framework.QueuedPodInfo{pInfo2, pInfo1},
|
||||||
expected: []*framework.PodInfo{pInfo1, pInfo2},
|
expected: []*framework.QueuedPodInfo{pInfo1, pInfo2},
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
name: "update two pod to activeQ and sort them by the timestamp",
|
name: "update two pod to activeQ and sort them by the timestamp",
|
||||||
@ -1136,8 +1136,8 @@ func TestPodTimestamp(t *testing.T) {
|
|||||||
updatePodActiveQ,
|
updatePodActiveQ,
|
||||||
updatePodActiveQ,
|
updatePodActiveQ,
|
||||||
},
|
},
|
||||||
operands: []*framework.PodInfo{pInfo2, pInfo1},
|
operands: []*framework.QueuedPodInfo{pInfo2, pInfo1},
|
||||||
expected: []*framework.PodInfo{pInfo1, pInfo2},
|
expected: []*framework.QueuedPodInfo{pInfo1, pInfo2},
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
name: "add two pod to unschedulableQ then move them to activeQ and sort them by the timestamp",
|
name: "add two pod to unschedulableQ then move them to activeQ and sort them by the timestamp",
|
||||||
@ -1147,8 +1147,8 @@ func TestPodTimestamp(t *testing.T) {
|
|||||||
moveClockForward,
|
moveClockForward,
|
||||||
moveAllToActiveOrBackoffQ,
|
moveAllToActiveOrBackoffQ,
|
||||||
},
|
},
|
||||||
operands: []*framework.PodInfo{pInfo2, pInfo1, nil, nil},
|
operands: []*framework.QueuedPodInfo{pInfo2, pInfo1, nil, nil},
|
||||||
expected: []*framework.PodInfo{pInfo1, pInfo2},
|
expected: []*framework.QueuedPodInfo{pInfo1, pInfo2},
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
name: "add one pod to BackoffQ and move it to activeQ",
|
name: "add one pod to BackoffQ and move it to activeQ",
|
||||||
@ -1158,15 +1158,15 @@ func TestPodTimestamp(t *testing.T) {
|
|||||||
flushBackoffQ,
|
flushBackoffQ,
|
||||||
moveAllToActiveOrBackoffQ,
|
moveAllToActiveOrBackoffQ,
|
||||||
},
|
},
|
||||||
operands: []*framework.PodInfo{pInfo2, pInfo1, nil, nil},
|
operands: []*framework.QueuedPodInfo{pInfo2, pInfo1, nil, nil},
|
||||||
expected: []*framework.PodInfo{pInfo1, pInfo2},
|
expected: []*framework.QueuedPodInfo{pInfo1, pInfo2},
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
for _, test := range tests {
|
for _, test := range tests {
|
||||||
t.Run(test.name, func(t *testing.T) {
|
t.Run(test.name, func(t *testing.T) {
|
||||||
queue := createAndRunPriorityQueue(newDefaultQueueSort(), WithClock(clock.NewFakeClock(timestamp)))
|
queue := createAndRunPriorityQueue(newDefaultQueueSort(), WithClock(clock.NewFakeClock(timestamp)))
|
||||||
var podInfoList []*framework.PodInfo
|
var podInfoList []*framework.QueuedPodInfo
|
||||||
|
|
||||||
for i, op := range test.operations {
|
for i, op := range test.operations {
|
||||||
op(queue, test.operands[i])
|
op(queue, test.operands[i])
|
||||||
@ -1177,13 +1177,13 @@ func TestPodTimestamp(t *testing.T) {
|
|||||||
if pInfo, err := queue.activeQ.Pop(); err != nil {
|
if pInfo, err := queue.activeQ.Pop(); err != nil {
|
||||||
t.Errorf("Error while popping the head of the queue: %v", err)
|
t.Errorf("Error while popping the head of the queue: %v", err)
|
||||||
} else {
|
} else {
|
||||||
podInfoList = append(podInfoList, pInfo.(*framework.PodInfo))
|
podInfoList = append(podInfoList, pInfo.(*framework.QueuedPodInfo))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
queue.lock.Unlock()
|
queue.lock.Unlock()
|
||||||
|
|
||||||
if !reflect.DeepEqual(test.expected, podInfoList) {
|
if !reflect.DeepEqual(test.expected, podInfoList) {
|
||||||
t.Errorf("Unexpected PodInfo list. Expected: %v, got: %v",
|
t.Errorf("Unexpected QueuedPodInfo list. Expected: %v, got: %v",
|
||||||
test.expected, podInfoList)
|
test.expected, podInfoList)
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
@ -1195,14 +1195,14 @@ func TestPendingPodsMetric(t *testing.T) {
|
|||||||
timestamp := time.Now()
|
timestamp := time.Now()
|
||||||
metrics.Register()
|
metrics.Register()
|
||||||
total := 50
|
total := 50
|
||||||
pInfos := makePodInfos(total, timestamp)
|
pInfos := makeQueuedPodInfos(total, timestamp)
|
||||||
totalWithDelay := 20
|
totalWithDelay := 20
|
||||||
pInfosWithDelay := makePodInfos(totalWithDelay, timestamp.Add(2*time.Second))
|
pInfosWithDelay := makeQueuedPodInfos(totalWithDelay, timestamp.Add(2*time.Second))
|
||||||
|
|
||||||
tests := []struct {
|
tests := []struct {
|
||||||
name string
|
name string
|
||||||
operations []operation
|
operations []operation
|
||||||
operands [][]*framework.PodInfo
|
operands [][]*framework.QueuedPodInfo
|
||||||
metricsName string
|
metricsName string
|
||||||
wants string
|
wants string
|
||||||
}{
|
}{
|
||||||
@ -1212,7 +1212,7 @@ func TestPendingPodsMetric(t *testing.T) {
|
|||||||
addPodActiveQ,
|
addPodActiveQ,
|
||||||
addPodUnschedulableQ,
|
addPodUnschedulableQ,
|
||||||
},
|
},
|
||||||
operands: [][]*framework.PodInfo{
|
operands: [][]*framework.QueuedPodInfo{
|
||||||
pInfos[:30],
|
pInfos[:30],
|
||||||
pInfos[30:],
|
pInfos[30:],
|
||||||
},
|
},
|
||||||
@ -1232,7 +1232,7 @@ scheduler_pending_pods{queue="unschedulable"} 20
|
|||||||
addPodBackoffQ,
|
addPodBackoffQ,
|
||||||
addPodUnschedulableQ,
|
addPodUnschedulableQ,
|
||||||
},
|
},
|
||||||
operands: [][]*framework.PodInfo{
|
operands: [][]*framework.QueuedPodInfo{
|
||||||
pInfos[:15],
|
pInfos[:15],
|
||||||
pInfos[15:40],
|
pInfos[15:40],
|
||||||
pInfos[40:],
|
pInfos[40:],
|
||||||
@ -1253,7 +1253,7 @@ scheduler_pending_pods{queue="unschedulable"} 10
|
|||||||
moveClockForward,
|
moveClockForward,
|
||||||
moveAllToActiveOrBackoffQ,
|
moveAllToActiveOrBackoffQ,
|
||||||
},
|
},
|
||||||
operands: [][]*framework.PodInfo{
|
operands: [][]*framework.QueuedPodInfo{
|
||||||
pInfos[:total],
|
pInfos[:total],
|
||||||
{nil},
|
{nil},
|
||||||
{nil},
|
{nil},
|
||||||
@ -1275,7 +1275,7 @@ scheduler_pending_pods{queue="unschedulable"} 0
|
|||||||
addPodUnschedulableQ,
|
addPodUnschedulableQ,
|
||||||
moveAllToActiveOrBackoffQ,
|
moveAllToActiveOrBackoffQ,
|
||||||
},
|
},
|
||||||
operands: [][]*framework.PodInfo{
|
operands: [][]*framework.QueuedPodInfo{
|
||||||
pInfos[20:total],
|
pInfos[20:total],
|
||||||
{nil},
|
{nil},
|
||||||
pInfosWithDelay[:20],
|
pInfosWithDelay[:20],
|
||||||
@ -1298,7 +1298,7 @@ scheduler_pending_pods{queue="unschedulable"} 0
|
|||||||
moveAllToActiveOrBackoffQ,
|
moveAllToActiveOrBackoffQ,
|
||||||
flushBackoffQ,
|
flushBackoffQ,
|
||||||
},
|
},
|
||||||
operands: [][]*framework.PodInfo{
|
operands: [][]*framework.QueuedPodInfo{
|
||||||
pInfos[:40],
|
pInfos[:40],
|
||||||
pInfos[40:],
|
pInfos[40:],
|
||||||
{nil},
|
{nil},
|
||||||
@ -1408,9 +1408,9 @@ func TestPerPodSchedulingMetrics(t *testing.T) {
|
|||||||
func TestIncomingPodsMetrics(t *testing.T) {
|
func TestIncomingPodsMetrics(t *testing.T) {
|
||||||
timestamp := time.Now()
|
timestamp := time.Now()
|
||||||
metrics.Register()
|
metrics.Register()
|
||||||
var pInfos = make([]*framework.PodInfo, 0, 3)
|
var pInfos = make([]*framework.QueuedPodInfo, 0, 3)
|
||||||
for i := 1; i <= 3; i++ {
|
for i := 1; i <= 3; i++ {
|
||||||
p := &framework.PodInfo{
|
p := &framework.QueuedPodInfo{
|
||||||
Pod: &v1.Pod{
|
Pod: &v1.Pod{
|
||||||
ObjectMeta: metav1.ObjectMeta{
|
ObjectMeta: metav1.ObjectMeta{
|
||||||
Name: fmt.Sprintf("test-pod-%d", i),
|
Name: fmt.Sprintf("test-pod-%d", i),
|
||||||
@ -1499,7 +1499,7 @@ func TestIncomingPodsMetrics(t *testing.T) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func checkPerPodSchedulingMetrics(name string, t *testing.T, pInfo *framework.PodInfo, wantAttemtps int, wantInitialAttemptTs time.Time) {
|
func checkPerPodSchedulingMetrics(name string, t *testing.T, pInfo *framework.QueuedPodInfo, wantAttemtps int, wantInitialAttemptTs time.Time) {
|
||||||
if pInfo.Attempts != wantAttemtps {
|
if pInfo.Attempts != wantAttemtps {
|
||||||
t.Errorf("[%s] Pod schedule attempt unexpected, got %v, want %v", name, pInfo.Attempts, wantAttemtps)
|
t.Errorf("[%s] Pod schedule attempt unexpected, got %v, want %v", name, pInfo.Attempts, wantAttemtps)
|
||||||
}
|
}
|
||||||
@ -1592,10 +1592,10 @@ func TestBackOffFlow(t *testing.T) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func makePodInfos(num int, timestamp time.Time) []*framework.PodInfo {
|
func makeQueuedPodInfos(num int, timestamp time.Time) []*framework.QueuedPodInfo {
|
||||||
var pInfos = make([]*framework.PodInfo, 0, num)
|
var pInfos = make([]*framework.QueuedPodInfo, 0, num)
|
||||||
for i := 1; i <= num; i++ {
|
for i := 1; i <= num; i++ {
|
||||||
p := &framework.PodInfo{
|
p := &framework.QueuedPodInfo{
|
||||||
Pod: &v1.Pod{
|
Pod: &v1.Pod{
|
||||||
ObjectMeta: metav1.ObjectMeta{
|
ObjectMeta: metav1.ObjectMeta{
|
||||||
Name: fmt.Sprintf("test-pod-%d", i),
|
Name: fmt.Sprintf("test-pod-%d", i),
|
||||||
|
@ -296,7 +296,7 @@ func (p *fakePlugin) Name() string {
|
|||||||
return ""
|
return ""
|
||||||
}
|
}
|
||||||
|
|
||||||
func (p *fakePlugin) Less(*framework.PodInfo, *framework.PodInfo) bool {
|
func (p *fakePlugin) Less(*framework.QueuedPodInfo, *framework.QueuedPodInfo) bool {
|
||||||
return false
|
return false
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -94,11 +94,11 @@ type Scheduler struct {
|
|||||||
// is available. We don't use a channel for this, because scheduling
|
// is available. We don't use a channel for this, because scheduling
|
||||||
// a pod may take some amount of time and we don't want pods to get
|
// a pod may take some amount of time and we don't want pods to get
|
||||||
// stale while they sit in a channel.
|
// stale while they sit in a channel.
|
||||||
NextPod func() *framework.PodInfo
|
NextPod func() *framework.QueuedPodInfo
|
||||||
|
|
||||||
// Error is called if there is an error. It is passed the pod in
|
// Error is called if there is an error. It is passed the pod in
|
||||||
// question, and the error
|
// question, and the error
|
||||||
Error func(*framework.PodInfo, error)
|
Error func(*framework.QueuedPodInfo, error)
|
||||||
|
|
||||||
// Close this to shut down the scheduler.
|
// Close this to shut down the scheduler.
|
||||||
StopEverything <-chan struct{}
|
StopEverything <-chan struct{}
|
||||||
@ -384,7 +384,7 @@ func (sched *Scheduler) Run(ctx context.Context) {
|
|||||||
// recordFailedSchedulingEvent records an event for the pod that indicates the
|
// recordFailedSchedulingEvent records an event for the pod that indicates the
|
||||||
// pod has failed to schedule.
|
// pod has failed to schedule.
|
||||||
// NOTE: This function modifies "pod". "pod" should be copied before being passed.
|
// NOTE: This function modifies "pod". "pod" should be copied before being passed.
|
||||||
func (sched *Scheduler) recordSchedulingFailure(prof *profile.Profile, podInfo *framework.PodInfo, err error, reason string, message string) {
|
func (sched *Scheduler) recordSchedulingFailure(prof *profile.Profile, podInfo *framework.QueuedPodInfo, err error, reason string, message string) {
|
||||||
sched.Error(podInfo, err)
|
sched.Error(podInfo, err)
|
||||||
pod := podInfo.Pod
|
pod := podInfo.Pod
|
||||||
prof.Recorder.Eventf(pod, nil, v1.EventTypeWarning, "FailedScheduling", "Scheduling", message)
|
prof.Recorder.Eventf(pod, nil, v1.EventTypeWarning, "FailedScheduling", "Scheduling", message)
|
||||||
|
@ -335,12 +335,12 @@ func TestSchedulerScheduleOne(t *testing.T) {
|
|||||||
SchedulerCache: sCache,
|
SchedulerCache: sCache,
|
||||||
Algorithm: item.algo,
|
Algorithm: item.algo,
|
||||||
podConditionUpdater: fakePodConditionUpdater{},
|
podConditionUpdater: fakePodConditionUpdater{},
|
||||||
Error: func(p *framework.PodInfo, err error) {
|
Error: func(p *framework.QueuedPodInfo, err error) {
|
||||||
gotPod = p.Pod
|
gotPod = p.Pod
|
||||||
gotError = err
|
gotError = err
|
||||||
},
|
},
|
||||||
NextPod: func() *framework.PodInfo {
|
NextPod: func() *framework.QueuedPodInfo {
|
||||||
return &framework.PodInfo{Pod: item.sendPod}
|
return &framework.QueuedPodInfo{Pod: item.sendPod}
|
||||||
},
|
},
|
||||||
Profiles: profile.Map{
|
Profiles: profile.Map{
|
||||||
testSchedulerName: &profile.Profile{
|
testSchedulerName: &profile.Profile{
|
||||||
@ -827,10 +827,10 @@ func setupTestScheduler(queuedPodStore *clientcache.FIFO, scache internalcache.C
|
|||||||
sched := &Scheduler{
|
sched := &Scheduler{
|
||||||
SchedulerCache: scache,
|
SchedulerCache: scache,
|
||||||
Algorithm: algo,
|
Algorithm: algo,
|
||||||
NextPod: func() *framework.PodInfo {
|
NextPod: func() *framework.QueuedPodInfo {
|
||||||
return &framework.PodInfo{Pod: clientcache.Pop(queuedPodStore).(*v1.Pod)}
|
return &framework.QueuedPodInfo{Pod: clientcache.Pop(queuedPodStore).(*v1.Pod)}
|
||||||
},
|
},
|
||||||
Error: func(p *framework.PodInfo, err error) {
|
Error: func(p *framework.QueuedPodInfo, err error) {
|
||||||
errChan <- err
|
errChan <- err
|
||||||
},
|
},
|
||||||
Profiles: profiles,
|
Profiles: profiles,
|
||||||
|
Loading…
Reference in New Issue
Block a user