mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-07-23 03:41:45 +00:00
Merge pull request #77598 from danielqsj/unreserve
Add Un-reserve extension point for the scheduling framework
This commit is contained in:
commit
b9ccdd2824
@ -33,6 +33,7 @@ type framework struct {
|
|||||||
plugins map[string]Plugin // a map of initialized plugins. Plugin name:plugin instance.
|
plugins map[string]Plugin // a map of initialized plugins. Plugin name:plugin instance.
|
||||||
reservePlugins []ReservePlugin
|
reservePlugins []ReservePlugin
|
||||||
prebindPlugins []PrebindPlugin
|
prebindPlugins []PrebindPlugin
|
||||||
|
unreservePlugins []UnreservePlugin
|
||||||
}
|
}
|
||||||
|
|
||||||
var _ = Framework(&framework{})
|
var _ = Framework(&framework{})
|
||||||
@ -64,6 +65,9 @@ func NewFramework(r Registry, _ *runtime.Unknown) (Framework, error) {
|
|||||||
if pp, ok := p.(PrebindPlugin); ok {
|
if pp, ok := p.(PrebindPlugin); ok {
|
||||||
f.prebindPlugins = append(f.prebindPlugins, pp)
|
f.prebindPlugins = append(f.prebindPlugins, pp)
|
||||||
}
|
}
|
||||||
|
if up, ok := p.(UnreservePlugin); ok {
|
||||||
|
f.unreservePlugins = append(f.unreservePlugins, up)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
return f, nil
|
return f, nil
|
||||||
}
|
}
|
||||||
@ -105,6 +109,14 @@ func (f *framework) RunReservePlugins(
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// RunUnreservePlugins runs the set of configured unreserve plugins.
|
||||||
|
func (f *framework) RunUnreservePlugins(
|
||||||
|
pc *PluginContext, pod *v1.Pod, nodeName string) {
|
||||||
|
for _, pl := range f.unreservePlugins {
|
||||||
|
pl.Unreserve(pc, pod, nodeName)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// NodeInfoSnapshot returns the latest NodeInfo snapshot. The snapshot
|
// NodeInfoSnapshot returns the latest NodeInfo snapshot. The snapshot
|
||||||
// is taken at the beginning of a scheduling cycle and remains unchanged until a
|
// is taken at the beginning of a scheduling cycle and remains unchanged until a
|
||||||
// pod finishes "Reserve". There is no guarantee that the information remains
|
// pod finishes "Reserve". There is no guarantee that the information remains
|
||||||
|
@ -113,6 +113,17 @@ type PrebindPlugin interface {
|
|||||||
Prebind(pc *PluginContext, p *v1.Pod, nodeName string) *Status
|
Prebind(pc *PluginContext, p *v1.Pod, nodeName string) *Status
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// UnreservePlugin is an interface for Unreserve plugins. This is an informational
|
||||||
|
// extension point. If a pod was reserved and then rejected in a later phase, then
|
||||||
|
// un-reserve plugins will be notified. Un-reserve plugins should clean up state
|
||||||
|
// associated with the reserved Pod.
|
||||||
|
type UnreservePlugin interface {
|
||||||
|
Plugin
|
||||||
|
// Unreserve is called by the scheduling framework when a reserved pod was
|
||||||
|
// rejected in a later phase.
|
||||||
|
Unreserve(pc *PluginContext, p *v1.Pod, nodeName string)
|
||||||
|
}
|
||||||
|
|
||||||
// Framework manages the set of plugins in use by the scheduling framework.
|
// Framework manages the set of plugins in use by the scheduling framework.
|
||||||
// Configured plugins are called at specified points in a scheduling context.
|
// Configured plugins are called at specified points in a scheduling context.
|
||||||
type Framework interface {
|
type Framework interface {
|
||||||
@ -128,6 +139,9 @@ type Framework interface {
|
|||||||
// plugins returns an error, it does not continue running the remaining ones and
|
// plugins returns an error, it does not continue running the remaining ones and
|
||||||
// returns the error. In such case, pod will not be scheduled.
|
// returns the error. In such case, pod will not be scheduled.
|
||||||
RunReservePlugins(pc *PluginContext, pod *v1.Pod, nodeName string) *Status
|
RunReservePlugins(pc *PluginContext, pod *v1.Pod, nodeName string) *Status
|
||||||
|
|
||||||
|
// RunUnreservePlugins runs the set of configured unreserve plugins.
|
||||||
|
RunUnreservePlugins(pc *PluginContext, pod *v1.Pod, nodeName string)
|
||||||
}
|
}
|
||||||
|
|
||||||
// FrameworkHandle provides data and some tools that plugins can use. It is
|
// FrameworkHandle provides data and some tools that plugins can use. It is
|
||||||
|
@ -515,6 +515,8 @@ func (sched *Scheduler) scheduleOne() {
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
klog.Errorf("error assuming pod: %v", err)
|
klog.Errorf("error assuming pod: %v", err)
|
||||||
metrics.PodScheduleErrors.Inc()
|
metrics.PodScheduleErrors.Inc()
|
||||||
|
// trigger un-reserve plugins to clean up state associated with the reserved Pod
|
||||||
|
fwk.RunUnreservePlugins(pluginContext, assumedPod, scheduleResult.SuggestedHost)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
// bind the pod to its host asynchronously (we can do this b/c of the assumption step above).
|
// bind the pod to its host asynchronously (we can do this b/c of the assumption step above).
|
||||||
@ -525,6 +527,8 @@ func (sched *Scheduler) scheduleOne() {
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
klog.Errorf("error binding volumes: %v", err)
|
klog.Errorf("error binding volumes: %v", err)
|
||||||
metrics.PodScheduleErrors.Inc()
|
metrics.PodScheduleErrors.Inc()
|
||||||
|
// trigger un-reserve plugins to clean up state associated with the reserved Pod
|
||||||
|
fwk.RunUnreservePlugins(pluginContext, assumedPod, scheduleResult.SuggestedHost)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -543,6 +547,8 @@ func (sched *Scheduler) scheduleOne() {
|
|||||||
klog.Errorf("scheduler cache ForgetPod failed: %v", forgetErr)
|
klog.Errorf("scheduler cache ForgetPod failed: %v", forgetErr)
|
||||||
}
|
}
|
||||||
sched.recordSchedulingFailure(assumedPod, prebindStatus.AsError(), reason, prebindStatus.Message())
|
sched.recordSchedulingFailure(assumedPod, prebindStatus.AsError(), reason, prebindStatus.Message())
|
||||||
|
// trigger un-reserve plugins to clean up state associated with the reserved Pod
|
||||||
|
fwk.RunUnreservePlugins(pluginContext, assumedPod, scheduleResult.SuggestedHost)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -558,6 +564,8 @@ func (sched *Scheduler) scheduleOne() {
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
klog.Errorf("error binding pod: %v", err)
|
klog.Errorf("error binding pod: %v", err)
|
||||||
metrics.PodScheduleErrors.Inc()
|
metrics.PodScheduleErrors.Inc()
|
||||||
|
// trigger un-reserve plugins to clean up state associated with the reserved Pod
|
||||||
|
fwk.RunUnreservePlugins(pluginContext, assumedPod, scheduleResult.SuggestedHost)
|
||||||
} else {
|
} else {
|
||||||
klog.V(2).Infof("pod %v/%v is bound successfully on node %v, %d nodes evaluated, %d nodes were found feasible", assumedPod.Namespace, assumedPod.Name, scheduleResult.SuggestedHost, scheduleResult.EvaluatedNodes, scheduleResult.FeasibleNodes)
|
klog.V(2).Infof("pod %v/%v is bound successfully on node %v, %d nodes evaluated, %d nodes were found feasible", assumedPod.Namespace, assumedPod.Name, scheduleResult.SuggestedHost, scheduleResult.EvaluatedNodes, scheduleResult.FeasibleNodes)
|
||||||
metrics.PodScheduleSuccesses.Inc()
|
metrics.PodScheduleSuccesses.Inc()
|
||||||
|
@ -18,11 +18,11 @@ package scheduler
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"fmt"
|
"fmt"
|
||||||
"k8s.io/apimachinery/pkg/runtime"
|
|
||||||
"testing"
|
"testing"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"k8s.io/api/core/v1"
|
"k8s.io/api/core/v1"
|
||||||
|
"k8s.io/apimachinery/pkg/runtime"
|
||||||
"k8s.io/apimachinery/pkg/util/wait"
|
"k8s.io/apimachinery/pkg/util/wait"
|
||||||
framework "k8s.io/kubernetes/pkg/scheduler/framework/v1alpha1"
|
framework "k8s.io/kubernetes/pkg/scheduler/framework/v1alpha1"
|
||||||
)
|
)
|
||||||
@ -30,11 +30,12 @@ import (
|
|||||||
// TesterPlugin is common ancestor for a test plugin that allows injection of
|
// TesterPlugin is common ancestor for a test plugin that allows injection of
|
||||||
// failures and some other test functionalities.
|
// failures and some other test functionalities.
|
||||||
type TesterPlugin struct {
|
type TesterPlugin struct {
|
||||||
numReserveCalled int
|
numReserveCalled int
|
||||||
numPrebindCalled int
|
numPrebindCalled int
|
||||||
failReserve bool
|
numUnreserveCalled int
|
||||||
failPrebind bool
|
failReserve bool
|
||||||
rejectPrebind bool
|
failPrebind bool
|
||||||
|
rejectPrebind bool
|
||||||
}
|
}
|
||||||
|
|
||||||
type ReservePlugin struct {
|
type ReservePlugin struct {
|
||||||
@ -45,19 +46,27 @@ type PrebindPlugin struct {
|
|||||||
TesterPlugin
|
TesterPlugin
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type UnreservePlugin struct {
|
||||||
|
TesterPlugin
|
||||||
|
}
|
||||||
|
|
||||||
const (
|
const (
|
||||||
reservePluginName = "reserve-plugin"
|
reservePluginName = "reserve-plugin"
|
||||||
prebindPluginName = "prebind-plugin"
|
prebindPluginName = "prebind-plugin"
|
||||||
|
unreservePluginName = "unreserve-plugin"
|
||||||
)
|
)
|
||||||
|
|
||||||
var _ = framework.ReservePlugin(&ReservePlugin{})
|
var _ = framework.ReservePlugin(&ReservePlugin{})
|
||||||
var _ = framework.PrebindPlugin(&PrebindPlugin{})
|
var _ = framework.PrebindPlugin(&PrebindPlugin{})
|
||||||
|
var _ = framework.UnreservePlugin(&UnreservePlugin{})
|
||||||
|
|
||||||
// Name returns name of the plugin.
|
// Name returns name of the plugin.
|
||||||
func (rp *ReservePlugin) Name() string {
|
func (rp *ReservePlugin) Name() string {
|
||||||
return reservePluginName
|
return reservePluginName
|
||||||
}
|
}
|
||||||
|
|
||||||
|
var resPlugin = &ReservePlugin{}
|
||||||
|
|
||||||
// Reserve is a test function that returns an error or nil, depending on the
|
// Reserve is a test function that returns an error or nil, depending on the
|
||||||
// value of "failReserve".
|
// value of "failReserve".
|
||||||
func (rp *ReservePlugin) Reserve(pc *framework.PluginContext, pod *v1.Pod, nodeName string) *framework.Status {
|
func (rp *ReservePlugin) Reserve(pc *framework.PluginContext, pod *v1.Pod, nodeName string) *framework.Status {
|
||||||
@ -68,14 +77,13 @@ func (rp *ReservePlugin) Reserve(pc *framework.PluginContext, pod *v1.Pod, nodeN
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
var resPlugin = &ReservePlugin{}
|
|
||||||
var pbdPlugin = &PrebindPlugin{}
|
|
||||||
|
|
||||||
// NewReservePlugin is the factory for reserve plugin.
|
// NewReservePlugin is the factory for reserve plugin.
|
||||||
func NewReservePlugin(_ *runtime.Unknown, _ framework.FrameworkHandle) (framework.Plugin, error) {
|
func NewReservePlugin(_ *runtime.Unknown, _ framework.FrameworkHandle) (framework.Plugin, error) {
|
||||||
return resPlugin, nil
|
return resPlugin, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
var pbdPlugin = &PrebindPlugin{}
|
||||||
|
|
||||||
// Name returns name of the plugin.
|
// Name returns name of the plugin.
|
||||||
func (pp *PrebindPlugin) Name() string {
|
func (pp *PrebindPlugin) Name() string {
|
||||||
return prebindPluginName
|
return prebindPluginName
|
||||||
@ -93,11 +101,39 @@ func (pp *PrebindPlugin) Prebind(pc *framework.PluginContext, pod *v1.Pod, nodeN
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// reset used to reset numPrebindCalled.
|
||||||
|
func (pp *PrebindPlugin) reset() {
|
||||||
|
pp.numPrebindCalled = 0
|
||||||
|
}
|
||||||
|
|
||||||
// NewPrebindPlugin is the factory for prebind plugin.
|
// NewPrebindPlugin is the factory for prebind plugin.
|
||||||
func NewPrebindPlugin(_ *runtime.Unknown, _ framework.FrameworkHandle) (framework.Plugin, error) {
|
func NewPrebindPlugin(_ *runtime.Unknown, _ framework.FrameworkHandle) (framework.Plugin, error) {
|
||||||
return pbdPlugin, nil
|
return pbdPlugin, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
var unresPlugin = &UnreservePlugin{}
|
||||||
|
|
||||||
|
// Name returns name of the plugin.
|
||||||
|
func (up *UnreservePlugin) Name() string {
|
||||||
|
return unreservePluginName
|
||||||
|
}
|
||||||
|
|
||||||
|
// Unreserve is a test function that returns an error or nil, depending on the
|
||||||
|
// value of "failUnreserve".
|
||||||
|
func (up *UnreservePlugin) Unreserve(pc *framework.PluginContext, pod *v1.Pod, nodeName string) {
|
||||||
|
up.numUnreserveCalled++
|
||||||
|
}
|
||||||
|
|
||||||
|
// reset used to reset numUnreserveCalled.
|
||||||
|
func (up *UnreservePlugin) reset() {
|
||||||
|
up.numUnreserveCalled = 0
|
||||||
|
}
|
||||||
|
|
||||||
|
// NewUnreservePlugin is the factory for unreserve plugin.
|
||||||
|
func NewUnreservePlugin(_ *runtime.Unknown, _ framework.FrameworkHandle) (framework.Plugin, error) {
|
||||||
|
return unresPlugin, nil
|
||||||
|
}
|
||||||
|
|
||||||
// TestReservePlugin tests invocation of reserve plugins.
|
// TestReservePlugin tests invocation of reserve plugins.
|
||||||
func TestReservePlugin(t *testing.T) {
|
func TestReservePlugin(t *testing.T) {
|
||||||
// Create a plugin registry for testing. Register only a reserve plugin.
|
// Create a plugin registry for testing. Register only a reserve plugin.
|
||||||
@ -216,3 +252,87 @@ func TestPrebindPlugin(t *testing.T) {
|
|||||||
cleanupPods(cs, t, []*v1.Pod{pod})
|
cleanupPods(cs, t, []*v1.Pod{pod})
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// TestUnreservePlugin tests invocation of un-reserve plugin
|
||||||
|
func TestUnreservePlugin(t *testing.T) {
|
||||||
|
// TODO: register more plugin which would trigger un-reserve plugin
|
||||||
|
registry := framework.Registry{
|
||||||
|
unreservePluginName: NewUnreservePlugin,
|
||||||
|
prebindPluginName: NewPrebindPlugin,
|
||||||
|
}
|
||||||
|
|
||||||
|
// Create the master and the scheduler with the test plugin set.
|
||||||
|
context := initTestSchedulerWithOptions(t,
|
||||||
|
initTestMaster(t, "unreserve-plugin", nil),
|
||||||
|
false, nil, registry, false, time.Second)
|
||||||
|
defer cleanupTest(t, context)
|
||||||
|
|
||||||
|
cs := context.clientSet
|
||||||
|
// Add a few nodes.
|
||||||
|
_, err := createNodes(cs, "test-node", nil, 2)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("Cannot create nodes: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
tests := []struct {
|
||||||
|
prebindFail bool
|
||||||
|
prebindReject bool
|
||||||
|
}{
|
||||||
|
{
|
||||||
|
prebindFail: false,
|
||||||
|
prebindReject: false,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
prebindFail: true,
|
||||||
|
prebindReject: false,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
prebindFail: false,
|
||||||
|
prebindReject: true,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
prebindFail: true,
|
||||||
|
prebindReject: true,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
for i, test := range tests {
|
||||||
|
pbdPlugin.failPrebind = test.prebindFail
|
||||||
|
pbdPlugin.rejectPrebind = test.prebindReject
|
||||||
|
|
||||||
|
// Create a best effort pod.
|
||||||
|
pod, err := createPausePod(cs,
|
||||||
|
initPausePod(cs, &pausePodConfig{Name: "test-pod", Namespace: context.ns.Name}))
|
||||||
|
if err != nil {
|
||||||
|
t.Errorf("Error while creating a test pod: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if test.prebindFail {
|
||||||
|
if err = wait.Poll(10*time.Millisecond, 30*time.Second, podSchedulingError(cs, pod.Namespace, pod.Name)); err != nil {
|
||||||
|
t.Errorf("test #%v: Expected a scheduling error, but didn't get it. error: %v", i, err)
|
||||||
|
}
|
||||||
|
if unresPlugin.numUnreserveCalled == 0 || unresPlugin.numUnreserveCalled != pbdPlugin.numPrebindCalled {
|
||||||
|
t.Errorf("test #%v: Expected the unreserve plugin to be called %d times, was called %d times.", i, pbdPlugin.numPrebindCalled, unresPlugin.numUnreserveCalled)
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
if test.prebindReject {
|
||||||
|
if err = waitForPodUnschedulable(cs, pod); err != nil {
|
||||||
|
t.Errorf("test #%v: Didn't expected the pod to be scheduled. error: %v", i, err)
|
||||||
|
}
|
||||||
|
if unresPlugin.numUnreserveCalled == 0 || unresPlugin.numUnreserveCalled != pbdPlugin.numPrebindCalled {
|
||||||
|
t.Errorf("test #%v: Expected the unreserve plugin to be called %d times, was called %d times.", i, pbdPlugin.numPrebindCalled, unresPlugin.numUnreserveCalled)
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
if err = waitForPodToSchedule(cs, pod); err != nil {
|
||||||
|
t.Errorf("test #%v: Expected the pod to be scheduled. error: %v", i, err)
|
||||||
|
}
|
||||||
|
if unresPlugin.numUnreserveCalled > 0 {
|
||||||
|
t.Errorf("test #%v: Didn't expected the unreserve plugin to be called, was called %d times.", i, unresPlugin.numUnreserveCalled)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
unresPlugin.reset()
|
||||||
|
pbdPlugin.reset()
|
||||||
|
cleanupPods(cs, t, []*v1.Pod{pod})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user