mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-09-26 04:36:00 +00:00
sched: provide an option for plugin developers to move pods to activeQ
This commit is contained in:
@@ -27,9 +27,11 @@ import (
|
||||
"k8s.io/apimachinery/pkg/api/errors"
|
||||
"k8s.io/apimachinery/pkg/api/resource"
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
"k8s.io/apimachinery/pkg/labels"
|
||||
"k8s.io/apimachinery/pkg/runtime"
|
||||
"k8s.io/apimachinery/pkg/util/wait"
|
||||
clientset "k8s.io/client-go/kubernetes"
|
||||
listersv1 "k8s.io/client-go/listers/core/v1"
|
||||
"k8s.io/kube-scheduler/config/v1beta2"
|
||||
"k8s.io/kubernetes/pkg/scheduler"
|
||||
schedulerconfig "k8s.io/kubernetes/pkg/scheduler/apis/config"
|
||||
@@ -39,6 +41,7 @@ import (
|
||||
frameworkruntime "k8s.io/kubernetes/pkg/scheduler/framework/runtime"
|
||||
st "k8s.io/kubernetes/pkg/scheduler/testing"
|
||||
testutils "k8s.io/kubernetes/test/integration/util"
|
||||
imageutils "k8s.io/kubernetes/test/utils/image"
|
||||
"k8s.io/utils/pointer"
|
||||
)
|
||||
|
||||
@@ -1941,6 +1944,141 @@ func TestPreemptWithPermitPlugin(t *testing.T) {
|
||||
testutils.CleanupPods(testCtx.ClientSet, t, []*v1.Pod{waitingPod, runningPod, preemptorPod})
|
||||
}
|
||||
|
||||
const (
|
||||
jobPluginName = "job plugin"
|
||||
)
|
||||
|
||||
var _ framework.PreFilterPlugin = &JobPlugin{}
|
||||
var _ framework.PostBindPlugin = &PostBindPlugin{}
|
||||
|
||||
type JobPlugin struct {
|
||||
podLister listersv1.PodLister
|
||||
podsActivated bool
|
||||
}
|
||||
|
||||
func (j *JobPlugin) Name() string {
|
||||
return jobPluginName
|
||||
}
|
||||
|
||||
func (j *JobPlugin) PreFilter(_ context.Context, _ *framework.CycleState, p *v1.Pod) *framework.Status {
|
||||
labelSelector := labels.SelectorFromSet(labels.Set{"driver": ""})
|
||||
driverPods, err := j.podLister.Pods(p.Namespace).List(labelSelector)
|
||||
if err != nil {
|
||||
return framework.AsStatus(err)
|
||||
}
|
||||
if len(driverPods) == 0 {
|
||||
return framework.NewStatus(framework.UnschedulableAndUnresolvable, "unable to find driver pod")
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (j *JobPlugin) PreFilterExtensions() framework.PreFilterExtensions {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (j *JobPlugin) PostBind(_ context.Context, state *framework.CycleState, p *v1.Pod, nodeName string) {
|
||||
if _, ok := p.Labels["driver"]; !ok {
|
||||
return
|
||||
}
|
||||
|
||||
// If it's a driver pod, move other executor pods proactively to accelerating the scheduling.
|
||||
labelSelector := labels.SelectorFromSet(labels.Set{"executor": ""})
|
||||
podsToActivate, err := j.podLister.Pods(p.Namespace).List(labelSelector)
|
||||
if err == nil && len(podsToActivate) != 0 {
|
||||
c, err := state.Read(framework.PodsToActivateKey)
|
||||
if err == nil {
|
||||
if s, ok := c.(*framework.PodsToActivate); ok {
|
||||
s.Lock()
|
||||
for _, pod := range podsToActivate {
|
||||
namespacedName := fmt.Sprintf("%v/%v", pod.Namespace, pod.Name)
|
||||
s.Map[namespacedName] = pod
|
||||
}
|
||||
s.Unlock()
|
||||
j.podsActivated = true
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// This test simulates a typical spark job workflow.
|
||||
// - N executor pods are created, but kept pending due to missing the driver pod
|
||||
// - when the driver pod gets created and scheduled, proactively move the executors to activeQ
|
||||
// and thus accelerate the entire job workflow.
|
||||
func TestActivatePods(t *testing.T) {
|
||||
var jobPlugin *JobPlugin
|
||||
// Create a plugin registry for testing. Register a Job plugin.
|
||||
registry := frameworkruntime.Registry{jobPluginName: func(_ runtime.Object, fh framework.Handle) (framework.Plugin, error) {
|
||||
jobPlugin = &JobPlugin{podLister: fh.SharedInformerFactory().Core().V1().Pods().Lister()}
|
||||
return jobPlugin, nil
|
||||
}}
|
||||
|
||||
// Setup initial filter plugin for testing.
|
||||
cfg := configtesting.V1beta2ToInternalWithDefaults(t, v1beta2.KubeSchedulerConfiguration{
|
||||
Profiles: []v1beta2.KubeSchedulerProfile{{
|
||||
SchedulerName: pointer.StringPtr(v1.DefaultSchedulerName),
|
||||
Plugins: &v1beta2.Plugins{
|
||||
PreFilter: v1beta2.PluginSet{
|
||||
Enabled: []v1beta2.Plugin{
|
||||
{Name: jobPluginName},
|
||||
},
|
||||
},
|
||||
PostBind: v1beta2.PluginSet{
|
||||
Enabled: []v1beta2.Plugin{
|
||||
{Name: jobPluginName},
|
||||
},
|
||||
},
|
||||
},
|
||||
}},
|
||||
})
|
||||
|
||||
// Create the API server and the scheduler with the test plugin set.
|
||||
testCtx := initTestSchedulerForFrameworkTest(t, testutils.InitTestAPIServer(t, "job-plugin", nil), 1,
|
||||
scheduler.WithProfiles(cfg.Profiles...),
|
||||
scheduler.WithFrameworkOutOfTreeRegistry(registry))
|
||||
defer testutils.CleanupTest(t, testCtx)
|
||||
|
||||
cs := testCtx.ClientSet
|
||||
ns := testCtx.NS.Name
|
||||
pause := imageutils.GetPauseImageName()
|
||||
|
||||
// Firstly create 2 executor pods.
|
||||
var pods []*v1.Pod
|
||||
for i := 1; i <= 2; i++ {
|
||||
name := fmt.Sprintf("executor-%v", i)
|
||||
executor := st.MakePod().Name(name).Namespace(ns).Label("executor", "").Container(pause).Obj()
|
||||
pods = append(pods, executor)
|
||||
if _, err := cs.CoreV1().Pods(executor.Namespace).Create(context.TODO(), executor, metav1.CreateOptions{}); err != nil {
|
||||
t.Fatalf("Failed to create pod %v: %v", executor.Name, err)
|
||||
}
|
||||
}
|
||||
|
||||
// Wait for the 2 executor pods to be unschedulable.
|
||||
for _, pod := range pods {
|
||||
if err := waitForPodUnschedulable(cs, pod); err != nil {
|
||||
t.Errorf("Failed to wait for Pod %v to be unschedulable: %v", pod.Name, err)
|
||||
}
|
||||
}
|
||||
|
||||
// Create a driver pod.
|
||||
driver := st.MakePod().Name("driver").Namespace(ns).Label("driver", "").Container(pause).Obj()
|
||||
pods = append(pods, driver)
|
||||
if _, err := cs.CoreV1().Pods(driver.Namespace).Create(context.TODO(), driver, metav1.CreateOptions{}); err != nil {
|
||||
t.Fatalf("Failed to create pod %v: %v", driver.Name, err)
|
||||
}
|
||||
|
||||
// Verify all pods to be scheduled.
|
||||
for _, pod := range pods {
|
||||
if err := waitForPodToScheduleWithTimeout(cs, pod, wait.ForeverTestTimeout); err != nil {
|
||||
t.Fatalf("Failed to wait for Pod %v to be schedulable: %v", pod.Name, err)
|
||||
}
|
||||
}
|
||||
|
||||
// Lastly verify the pods activation logic is really called.
|
||||
if jobPlugin.podsActivated == false {
|
||||
t.Errorf("JobPlugin's pods activation logic is not called")
|
||||
}
|
||||
}
|
||||
|
||||
func initTestSchedulerForFrameworkTest(t *testing.T, testCtx *testutils.TestContext, nodeCount int, opts ...scheduler.Option) *testutils.TestContext {
|
||||
testCtx = testutils.InitTestSchedulerWithOptions(t, testCtx, nil, opts...)
|
||||
testutils.SyncInformerFactory(testCtx)
|
||||
|
Reference in New Issue
Block a user