Merge pull request #103383 from Huang-Wei/move-up-pods
sched: provide an option for plugin developers to move pods to activeQ
Commit: 6ed98b60f0
@@ -23,6 +23,7 @@ import (
 	"errors"
 	"math"
 	"strings"
+	"sync"
 	"time"
 
 	"github.com/google/go-cmp/cmp"
@@ -105,6 +106,30 @@ const (
 	MaxTotalScore int64 = math.MaxInt64
 )
 
+// PodsToActivateKey is a reserved state key for stashing pods.
+// If the stashed pods are present in unschedulableQ or backoffQ, they will be
+// activated (i.e., moved to activeQ) in two phases:
+// - end of a scheduling cycle if it succeeds (will be cleared from `PodsToActivate` if activated)
+// - end of a binding cycle if it succeeds
+var PodsToActivateKey StateKey = "kubernetes.io/pods-to-activate"
+
+// PodsToActivate stores pods to be activated.
+type PodsToActivate struct {
+	sync.Mutex
+	// Map is keyed with namespaced pod name, and valued with the pod.
+	Map map[string]*v1.Pod
+}
+
+// Clone just returns the same state.
+func (s *PodsToActivate) Clone() StateData {
+	return s
+}
+
+// NewPodsToActivate instantiates a PodsToActivate object.
+func NewPodsToActivate() *PodsToActivate {
+	return &PodsToActivate{Map: make(map[string]*v1.Pod)}
+}
+
 // Status indicates the result of running a plugin. It consists of a code, a
 // message, (optionally) an error and an plugin name it fails by. When the status
 // code is not `Success`, the reasons should explain why.
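For context, and not part of this commit: from a plugin's point of view, the mechanism boils down to reading the reserved PodsToActivateKey entry out of the CycleState and adding namespaced pod names under the lock; the scheduler then calls SchedulingQueue.Activate at the end of a successful scheduling or binding cycle. Below is a minimal sketch, assuming it runs in an extension point that receives the CycleState (for example PostBind, as the JobPlugin integration test further down does); the package name and the helper markPodsForActivation are hypothetical.

package exampleplugin

import (
	"fmt"

	v1 "k8s.io/api/core/v1"
	"k8s.io/kubernetes/pkg/scheduler/framework"
)

// markPodsForActivation stashes the given pods in the CycleState so that the
// scheduling queue moves them to activeQ once the current cycle succeeds.
func markPodsForActivation(state *framework.CycleState, pods []*v1.Pod) error {
	c, err := state.Read(framework.PodsToActivateKey)
	if err != nil {
		// scheduleOne writes the key before plugins run; an error here means
		// the stash is unavailable (for example, an older scheduler).
		return err
	}
	s, ok := c.(*framework.PodsToActivate)
	if !ok {
		return fmt.Errorf("unexpected state data type %T", c)
	}
	s.Lock()
	defer s.Unlock()
	for _, pod := range pods {
		// Keys are namespaced pod names, as documented on PodsToActivate.Map.
		s.Map[pod.Namespace+"/"+pod.Name] = pod
	}
	return nil
}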
@@ -27,6 +27,9 @@ const (
 	ScheduleAttemptFailure = "ScheduleAttemptFailure"
 	// BackoffComplete is the event when a pod finishes backoff.
 	BackoffComplete = "BackoffComplete"
+	// ForceActivate is the event when a pod is moved from unschedulableQ/backoffQ
+	// to activeQ. Usually it's triggered by plugin implementations.
+	ForceActivate = "ForceActivate"
 )
 
 var (
@@ -77,6 +77,10 @@ type PreEnqueueCheck func(pod *v1.Pod) bool
 type SchedulingQueue interface {
 	framework.PodNominator
 	Add(pod *v1.Pod) error
+	// Activate moves the given pods to activeQ iff they're in unschedulableQ or backoffQ.
+	// The passed-in pods are originally compiled from plugins that want to activate Pods,
+	// by injecting the pods through a reserved CycleState struct (PodsToActivate).
+	Activate(pods map[string]*v1.Pod)
 	// AddUnschedulableIfNotPresent adds an unschedulable pod back to scheduling queue.
 	// The podSchedulingCycle represents the current scheduling cycle number which can be
 	// returned by calling SchedulingCycle().
@@ -301,6 +305,58 @@ func (p *PriorityQueue) Add(pod *v1.Pod) error {
 	return nil
 }
 
+// Activate moves the given pods to activeQ iff they're in unschedulableQ or backoffQ.
+func (p *PriorityQueue) Activate(pods map[string]*v1.Pod) {
+	p.lock.Lock()
+	defer p.lock.Unlock()
+
+	activated := false
+	for _, pod := range pods {
+		if p.activate(pod) {
+			activated = true
+		}
+	}
+
+	if activated {
+		p.cond.Broadcast()
+	}
+}
+
+func (p *PriorityQueue) activate(pod *v1.Pod) bool {
+	// Verify if the pod is present in activeQ.
+	if _, exists, _ := p.activeQ.Get(newQueuedPodInfoForLookup(pod)); exists {
+		// No need to activate if it's already present in activeQ.
+		return false
+	}
+	var pInfo *framework.QueuedPodInfo
+	// Verify if the pod is present in unschedulableQ or backoffQ.
+	if pInfo = p.unschedulableQ.get(pod); pInfo == nil {
+		// If the pod doesn't belong to unschedulableQ or backoffQ, don't activate it.
+		if obj, exists, _ := p.podBackoffQ.Get(newQueuedPodInfoForLookup(pod)); !exists {
+			klog.ErrorS(nil, "To-activate pod does not exist in unschedulableQ or backoffQ", "pod", klog.KObj(pod))
+			return false
+		} else {
+			pInfo = obj.(*framework.QueuedPodInfo)
+		}
+	}
+
+	if pInfo == nil {
+		// Redundant safe check. We shouldn't reach here.
+		klog.ErrorS(nil, "Internal error: cannot obtain pInfo")
+		return false
+	}
+
+	if err := p.activeQ.Add(pInfo); err != nil {
+		klog.ErrorS(err, "Error adding pod to the scheduling queue", "pod", klog.KObj(pod))
+		return false
+	}
+	p.unschedulableQ.delete(pod)
+	p.podBackoffQ.Delete(pInfo)
+	metrics.SchedulerQueueIncomingPods.WithLabelValues("active", ForceActivate).Inc()
+	p.PodNominator.AddNominatedPod(pInfo.PodInfo, "")
+	return true
+}
+
 // isPodBackingoff returns true if a pod is still waiting for its backoff timer.
 // If this returns true, the pod should not be re-tried.
 func (p *PriorityQueue) isPodBackingoff(podInfo *framework.QueuedPodInfo) bool {
@@ -519,6 +519,10 @@ func (sched *Scheduler) scheduleOne(ctx context.Context) {
 	start := time.Now()
 	state := framework.NewCycleState()
 	state.SetRecordPluginMetrics(rand.Intn(100) < pluginMetricsSamplePercent)
+	// Initialize an empty podsToActivate struct, which will be filled up by plugins or stay empty.
+	podsToActivate := framework.NewPodsToActivate()
+	state.Write(framework.PodsToActivateKey, podsToActivate)
+
 	schedulingCycleCtx, cancel := context.WithCancel(ctx)
 	defer cancel()
 	scheduleResult, err := sched.Algorithm.Schedule(schedulingCycleCtx, sched.Extenders, fwk, state, pod)
@@ -607,6 +611,13 @@ func (sched *Scheduler) scheduleOne(ctx context.Context) {
 		return
 	}
 
+	// At the end of a successful scheduling cycle, pop and move up Pods if needed.
+	if len(podsToActivate.Map) != 0 {
+		sched.SchedulingQueue.Activate(podsToActivate.Map)
+		// Clear the entries after activation.
+		podsToActivate.Map = make(map[string]*v1.Pod)
+	}
+
 	// bind the pod to its host asynchronously (we can do this b/c of the assumption step above).
 	go func() {
 		bindingCycleCtx, cancel := context.WithCancel(ctx)
@@ -666,6 +677,13 @@ func (sched *Scheduler) scheduleOne(ctx context.Context) {
 
 			// Run "postbind" plugins.
 			fwk.RunPostBindPlugins(bindingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost)
+
+			// At the end of a successful binding cycle, move up Pods if needed.
+			if len(podsToActivate.Map) != 0 {
+				sched.SchedulingQueue.Activate(podsToActivate.Map)
+				// Unlike the logic in scheduling cycle, we don't bother deleting the entries
+				// as `podsToActivate.Map` is no longer consumed.
+			}
 		}
 	}()
 }
@@ -27,9 +27,11 @@ import (
 	"k8s.io/apimachinery/pkg/api/errors"
 	"k8s.io/apimachinery/pkg/api/resource"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	"k8s.io/apimachinery/pkg/labels"
 	"k8s.io/apimachinery/pkg/runtime"
 	"k8s.io/apimachinery/pkg/util/wait"
 	clientset "k8s.io/client-go/kubernetes"
+	listersv1 "k8s.io/client-go/listers/core/v1"
 	"k8s.io/kube-scheduler/config/v1beta2"
 	"k8s.io/kubernetes/pkg/scheduler"
 	schedulerconfig "k8s.io/kubernetes/pkg/scheduler/apis/config"
@@ -39,6 +41,7 @@ import (
 	frameworkruntime "k8s.io/kubernetes/pkg/scheduler/framework/runtime"
 	st "k8s.io/kubernetes/pkg/scheduler/testing"
 	testutils "k8s.io/kubernetes/test/integration/util"
+	imageutils "k8s.io/kubernetes/test/utils/image"
 	"k8s.io/utils/pointer"
 )
 
@@ -1941,6 +1944,141 @@ func TestPreemptWithPermitPlugin(t *testing.T) {
 	testutils.CleanupPods(testCtx.ClientSet, t, []*v1.Pod{waitingPod, runningPod, preemptorPod})
 }
 
+const (
+	jobPluginName = "job plugin"
+)
+
+var _ framework.PreFilterPlugin = &JobPlugin{}
+var _ framework.PostBindPlugin = &PostBindPlugin{}
+
+type JobPlugin struct {
+	podLister     listersv1.PodLister
+	podsActivated bool
+}
+
+func (j *JobPlugin) Name() string {
+	return jobPluginName
+}
+
+func (j *JobPlugin) PreFilter(_ context.Context, _ *framework.CycleState, p *v1.Pod) *framework.Status {
+	labelSelector := labels.SelectorFromSet(labels.Set{"driver": ""})
+	driverPods, err := j.podLister.Pods(p.Namespace).List(labelSelector)
+	if err != nil {
+		return framework.AsStatus(err)
+	}
+	if len(driverPods) == 0 {
+		return framework.NewStatus(framework.UnschedulableAndUnresolvable, "unable to find driver pod")
+	}
+	return nil
+}
+
+func (j *JobPlugin) PreFilterExtensions() framework.PreFilterExtensions {
+	return nil
+}
+
+func (j *JobPlugin) PostBind(_ context.Context, state *framework.CycleState, p *v1.Pod, nodeName string) {
+	if _, ok := p.Labels["driver"]; !ok {
+		return
+	}
+
+	// If it's a driver pod, move other executor pods proactively to accelerating the scheduling.
+	labelSelector := labels.SelectorFromSet(labels.Set{"executor": ""})
+	podsToActivate, err := j.podLister.Pods(p.Namespace).List(labelSelector)
+	if err == nil && len(podsToActivate) != 0 {
+		c, err := state.Read(framework.PodsToActivateKey)
+		if err == nil {
+			if s, ok := c.(*framework.PodsToActivate); ok {
+				s.Lock()
+				for _, pod := range podsToActivate {
+					namespacedName := fmt.Sprintf("%v/%v", pod.Namespace, pod.Name)
+					s.Map[namespacedName] = pod
+				}
+				s.Unlock()
+				j.podsActivated = true
+			}
+		}
+	}
+}
+
+// This test simulates a typical spark job workflow.
+// - N executor pods are created, but kept pending due to missing the driver pod
+// - when the driver pod gets created and scheduled, proactively move the executors to activeQ
+//   and thus accelerate the entire job workflow.
+func TestActivatePods(t *testing.T) {
+	var jobPlugin *JobPlugin
+	// Create a plugin registry for testing. Register a Job plugin.
+	registry := frameworkruntime.Registry{jobPluginName: func(_ runtime.Object, fh framework.Handle) (framework.Plugin, error) {
+		jobPlugin = &JobPlugin{podLister: fh.SharedInformerFactory().Core().V1().Pods().Lister()}
+		return jobPlugin, nil
+	}}
+
+	// Setup initial filter plugin for testing.
+	cfg := configtesting.V1beta2ToInternalWithDefaults(t, v1beta2.KubeSchedulerConfiguration{
+		Profiles: []v1beta2.KubeSchedulerProfile{{
+			SchedulerName: pointer.StringPtr(v1.DefaultSchedulerName),
+			Plugins: &v1beta2.Plugins{
+				PreFilter: v1beta2.PluginSet{
+					Enabled: []v1beta2.Plugin{
+						{Name: jobPluginName},
+					},
+				},
+				PostBind: v1beta2.PluginSet{
+					Enabled: []v1beta2.Plugin{
+						{Name: jobPluginName},
+					},
+				},
+			},
+		}},
+	})

+	// Create the API server and the scheduler with the test plugin set.
+	testCtx := initTestSchedulerForFrameworkTest(t, testutils.InitTestAPIServer(t, "job-plugin", nil), 1,
+		scheduler.WithProfiles(cfg.Profiles...),
+		scheduler.WithFrameworkOutOfTreeRegistry(registry))
+	defer testutils.CleanupTest(t, testCtx)
+
+	cs := testCtx.ClientSet
+	ns := testCtx.NS.Name
+	pause := imageutils.GetPauseImageName()
+
+	// Firstly create 2 executor pods.
+	var pods []*v1.Pod
+	for i := 1; i <= 2; i++ {
+		name := fmt.Sprintf("executor-%v", i)
+		executor := st.MakePod().Name(name).Namespace(ns).Label("executor", "").Container(pause).Obj()
+		pods = append(pods, executor)
+		if _, err := cs.CoreV1().Pods(executor.Namespace).Create(context.TODO(), executor, metav1.CreateOptions{}); err != nil {
+			t.Fatalf("Failed to create pod %v: %v", executor.Name, err)
+		}
+	}
+
+	// Wait for the 2 executor pods to be unschedulable.
+	for _, pod := range pods {
+		if err := waitForPodUnschedulable(cs, pod); err != nil {
+			t.Errorf("Failed to wait for Pod %v to be unschedulable: %v", pod.Name, err)
+		}
+	}
+
+	// Create a driver pod.
+	driver := st.MakePod().Name("driver").Namespace(ns).Label("driver", "").Container(pause).Obj()
+	pods = append(pods, driver)
+	if _, err := cs.CoreV1().Pods(driver.Namespace).Create(context.TODO(), driver, metav1.CreateOptions{}); err != nil {
+		t.Fatalf("Failed to create pod %v: %v", driver.Name, err)
+	}
+
+	// Verify all pods to be scheduled.
+	for _, pod := range pods {
+		if err := waitForPodToScheduleWithTimeout(cs, pod, wait.ForeverTestTimeout); err != nil {
+			t.Fatalf("Failed to wait for Pod %v to be schedulable: %v", pod.Name, err)
+		}
+	}
+
+	// Lastly verify the pods activation logic is really called.
+	if jobPlugin.podsActivated == false {
+		t.Errorf("JobPlugin's pods activation logic is not called")
+	}
+}
+
 func initTestSchedulerForFrameworkTest(t *testing.T, testCtx *testutils.TestContext, nodeCount int, opts ...scheduler.Option) *testutils.TestContext {
 	testCtx = testutils.InitTestSchedulerWithOptions(t, testCtx, nil, opts...)
 	testutils.SyncInformerFactory(testCtx)
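Usage note, also not part of this commit: out-of-tree authors would typically compile such a plugin into their own scheduler binary and register it by name, so that a KubeSchedulerConfiguration profile can enable it, much as the test registry above does in-process. A rough sketch follows, assuming a JobPlugin type like the one in the test is defined in the same main package; app.NewSchedulerCommand and app.WithPlugin come from k8s.io/kubernetes/cmd/kube-scheduler/app.

package main

import (
	"os"

	"k8s.io/apimachinery/pkg/runtime"
	"k8s.io/kubernetes/cmd/kube-scheduler/app"
	"k8s.io/kubernetes/pkg/scheduler/framework"
)

func main() {
	command := app.NewSchedulerCommand(
		// The name must match the plugin name enabled in the scheduler profile
		// (here the test's "job plugin").
		app.WithPlugin("job plugin", func(_ runtime.Object, fh framework.Handle) (framework.Plugin, error) {
			// JobPlugin is assumed to be defined in this package, as in the
			// integration test above.
			return &JobPlugin{podLister: fh.SharedInformerFactory().Core().V1().Pods().Lister()}, nil
		}),
	)
	if err := command.Execute(); err != nil {
		os.Exit(1)
	}
}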