fix: run activate() only when fail
Commit: e601eb7c5a (parent: 4a084d54d2)

@@ -492,10 +492,11 @@ func (ev *Evaluator) prepareCandidateAsync(c Candidate, pod *v1.Pod, pluginName
 		defer metrics.PreemptionGoroutinesDuration.WithLabelValues(result).Observe(metrics.SinceInSeconds(startTime))
 		defer metrics.PreemptionGoroutinesExecutionTotal.WithLabelValues(result).Inc()
 		defer func() {
-			ev.mu.Lock()
-			ev.preempting.Delete(pod.UID)
-			ev.mu.Unlock()
-			ev.Handler.Activate(logger, map[string]*v1.Pod{pod.Name: pod})
+			if result == metrics.GoroutineResultError {
+				// When API call isn't successful, the Pod may get stuck in the unschedulable pod pool in the worst case.
+				// So, we should move the Pod to the activeQ.
+				ev.Handler.Activate(logger, map[string]*v1.Pod{pod.Name: pod})
+			}
 		}()
 		defer cancel()
 		logger.V(2).Info("Start the preemption asynchronously", "preemptor", klog.KObj(pod), "node", c.Name(), "numVictims", len(c.Victims().Pods))
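
The new version activates the preemptor only on failure, and it does the check from a deferred closure so that it sees the final value of `result`. In Go, arguments to a directly deferred call are evaluated at the `defer` statement, while a deferred closure reads surrounding variables only when it actually runs. A minimal, self-contained sketch of that distinction (illustrative names only, not scheduler code):

```go
package main

import "fmt"

func run() {
	result := "success"

	// Arguments to a directly deferred call are evaluated right here, so this
	// always prints "success" no matter what result becomes later.
	defer fmt.Println("direct defer sees:", result)

	// A deferred closure reads result only when it executes, after the rest of
	// the function has run, so it observes the final value ("error").
	defer func() {
		if result == "error" {
			fmt.Println("closure defer reacts to the failure")
		}
	}()

	result = "error"
}

func main() {
	run()
	// Prints:
	//   closure defer reacts to the failure
	//   direct defer sees: success
}
```

Because defers run last-in, first-out, the closure fires first and sees `result == "error"`, which is the property the hunk above relies on.
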
@@ -512,15 +513,34 @@ func (ev *Evaluator) prepareCandidateAsync(c Candidate, pod *v1.Pod, pluginName
 		}
 
 		if len(c.Victims().Pods) == 0 {
+			ev.mu.Lock()
+			delete(ev.preempting, pod.UID)
+			ev.mu.Unlock()
+
 			return
 		}
 
-		ev.Handler.Parallelizer().Until(ctx, len(c.Victims().Pods), preemptPod, ev.PluginName)
+		// We can evict all victims in parallel, but the last one.
+		// We have to remove the pod from the preempting map before the last one is evicted
+		// because, otherwise, the pod removal might be notified to the scheduling queue before
+		// we remove this pod from the preempting map,
+		// and the pod could end up stucking at the unschedulable pod pool
+		// by all the pod removal events being ignored.
+		ev.Handler.Parallelizer().Until(ctx, len(c.Victims().Pods)-1, preemptPod, ev.PluginName)
 		if err := errCh.ReceiveError(); err != nil {
 			logger.Error(err, "Error occurred during async preemption")
 			result = metrics.GoroutineResultError
 		}
+
+		ev.mu.Lock()
+		delete(ev.preempting, pod.UID)
+		ev.mu.Unlock()
+
+		if err := ev.PreemptPod(ctx, c, pod, c.Victims().Pods[len(c.Victims().Pods)-1], pluginName); err != nil {
+			logger.Error(err, "Error occurred during async preemption")
+			result = metrics.GoroutineResultError
+		}
+
 		logger.V(2).Info("Async Preemption finished completely", "preemptor", klog.KObj(pod), "node", c.Name(), "result", result)
 	}()
 }
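
The second part of the change splits victim eviction: all victims except the last are evicted in parallel, the preemptor is removed from the `preempting` map, and only then is the last victim evicted. Otherwise the deletion event for the final victim could reach the scheduling queue while the preemptor is still marked as preempting, be ignored, and leave the preemptor stuck in the unschedulable pod pool. A simplified sketch of that ordering, using hypothetical `inFlight`/`evict` names rather than the scheduler's real types:

```go
package main

import (
	"fmt"
	"sync"
)

type preemptor struct {
	mu       sync.Mutex
	inFlight map[string]bool // preemptors whose victim-deletion events should be ignored
}

// evictAll assumes a non-empty victims slice; the real code returns early
// when there are no victims.
func (p *preemptor) evictAll(preemptorUID string, victims []string, evict func(string)) {
	// Evict all victims in parallel, except the last one.
	var wg sync.WaitGroup
	for _, v := range victims[:len(victims)-1] {
		wg.Add(1)
		go func(v string) {
			defer wg.Done()
			evict(v)
		}(v)
	}
	wg.Wait()

	// Drop the preemptor from the in-flight set BEFORE the final eviction, so the
	// deletion event produced by that eviction is not filtered out and can
	// requeue the preemptor.
	p.mu.Lock()
	delete(p.inFlight, preemptorUID)
	p.mu.Unlock()

	evict(victims[len(victims)-1])
}

func main() {
	p := &preemptor{inFlight: map[string]bool{"preemptor": true}}
	p.evictAll("preemptor", []string{"victim1", "victim2", "victim3"}, func(v string) {
		fmt.Println("evicted", v)
	})
}
```
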
@@ -453,6 +453,7 @@ func TestPrepareCandidate(t *testing.T) {
 		expectedStatus *framework.Status
 		// Only compared when async preemption is enabled.
 		expectedPreemptingMap sets.Set[types.UID]
+		expectedActivatedPods map[string]*v1.Pod
 	}{
 		{
 			name: "no victims",
@@ -543,6 +544,7 @@ func TestPrepareCandidate(t *testing.T) {
 			nodeNames:             []string{node1Name},
 			expectedStatus:        framework.AsStatus(errors.New("delete pod failed")),
 			expectedPreemptingMap: sets.New(types.UID("preemptor")),
+			expectedActivatedPods: map[string]*v1.Pod{preemptor.Name: preemptor},
 		},
 		{
 			name: "one victim, not-found victim error is ignored when deleting",
@@ -579,6 +581,7 @@ func TestPrepareCandidate(t *testing.T) {
 			nodeNames:             []string{node1Name},
 			expectedStatus:        framework.AsStatus(errors.New("patch pod status failed")),
 			expectedPreemptingMap: sets.New(types.UID("preemptor")),
+			expectedActivatedPods: map[string]*v1.Pod{preemptor.Name: preemptor},
 		},
 		{
 			name: "two victims without condition, one passes successfully and the second fails",
@@ -601,6 +604,7 @@ func TestPrepareCandidate(t *testing.T) {
 			expectedDeletedPods:   []string{"victim2"},
 			expectedStatus:        framework.AsStatus(errors.New("patch pod status failed")),
 			expectedPreemptingMap: sets.New(types.UID("preemptor")),
+			expectedActivatedPods: map[string]*v1.Pod{preemptor.Name: preemptor},
 		},
 	}
 
@@ -730,9 +734,12 @@ func TestPrepareCandidate(t *testing.T) {
 			}
 
 			if asyncPreemptionEnabled {
-				// Make sure the preemptor is activated regardless of the preemption result.
-				if !reflect.DeepEqual(map[string]*v1.Pod{tt.preemptor.Name: tt.preemptor}, fakeActivator.activatedPods) {
-					lastErrMsg = fmt.Sprintf("expected activated pods %v, got %v", map[string]*v1.Pod{tt.preemptor.Name: tt.preemptor}, fakeActivator.activatedPods)
+				if tt.expectedActivatedPods != nil && !reflect.DeepEqual(tt.expectedActivatedPods, fakeActivator.activatedPods) {
+					lastErrMsg = fmt.Sprintf("expected activated pods %v, got %v", tt.expectedActivatedPods, fakeActivator.activatedPods)
+					return false, nil
+				}
+				if tt.expectedActivatedPods == nil && len(fakeActivator.activatedPods) != 0 {
+					lastErrMsg = fmt.Sprintf("expected no activated pods, got %v", fakeActivator.activatedPods)
 					return false, nil
 				}
 			}
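
The test no longer assumes the preemptor is always activated; it compares against the new `expectedActivatedPods` field, which is nil for cases where preemption succeeds and no activation should happen. The assertions read the recording fake behind `fakeActivator.activatedPods`. A reduced sketch of that recording-fake pattern (the real fake in the scheduler tests uses `*v1.Pod` and a different `Activate` signature; the `Pod` type and method below are simplified assumptions for illustration):

```go
package main

import (
	"fmt"
	"sync"
)

// Pod stands in for *v1.Pod; only the name matters for this sketch.
type Pod struct{ Name string }

// fakeActivator records every pod the code under test asks to activate, so a
// test can later compare activatedPods against its expectedActivatedPods value.
type fakeActivator struct {
	mu            sync.Mutex
	activatedPods map[string]*Pod
}

func (f *fakeActivator) Activate(pods map[string]*Pod) {
	f.mu.Lock()
	defer f.mu.Unlock()
	for name, p := range pods {
		f.activatedPods[name] = p
	}
}

func main() {
	f := &fakeActivator{activatedPods: map[string]*Pod{}}

	// After this change, the code under test calls Activate only when preemption fails.
	f.Activate(map[string]*Pod{"preemptor": {Name: "preemptor"}})

	fmt.Println(f.activatedPods["preemptor"].Name) // preemptor
}
```
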