scheduler: merge Reserve and Unreserve plugins

Previously, separate interfaces were defined for Reserve and Unreserve
plugins. However, in nearly all cases, a plugin that allocates a
resource using Reserve will likely want to register itself for Unreserve
as well in order to free the allocated resource at the end of a failed
scheduling/binding cycle. Having separate plugins for Reserve and
Unreserve also adds unnecessary config toil. To that end, this patch
aims to merge the two plugins into a single interface called a
ReservePlugin that requires implementing both the Reserve and Unreserve
methods.
This commit is contained in:
Adhityaa Chandrasekar
2020-06-15 21:52:54 +00:00
parent 8adcd7978e
commit ec83143342
23 changed files with 243 additions and 288 deletions

View File

@@ -67,8 +67,11 @@ type PostFilterPlugin struct {
}
type ReservePlugin struct {
numReserveCalled int
failReserve bool
name string
numReserveCalled int
failReserve bool
numUnreserveCalled int
pluginInvokeEventChan chan pluginInvokeEvent
}
type PreScorePlugin struct {
@@ -96,12 +99,6 @@ type PostBindPlugin struct {
pluginInvokeEventChan chan pluginInvokeEvent
}
type UnreservePlugin struct {
name string
numUnreserveCalled int
pluginInvokeEventChan chan pluginInvokeEvent
}
type PermitPlugin struct {
name string
numPermitCalled int
@@ -126,7 +123,6 @@ const (
preScorePluginName = "prescore-plugin"
reservePluginName = "reserve-plugin"
preBindPluginName = "prebind-plugin"
unreservePluginName = "unreserve-plugin"
postBindPluginName = "postbind-plugin"
permitPluginName = "permit-plugin"
)
@@ -142,7 +138,6 @@ var _ framework.PreScorePlugin = &PreScorePlugin{}
var _ framework.PreBindPlugin = &PreBindPlugin{}
var _ framework.BindPlugin = &BindPlugin{}
var _ framework.PostBindPlugin = &PostBindPlugin{}
var _ framework.UnreservePlugin = &UnreservePlugin{}
var _ framework.PermitPlugin = &PermitPlugin{}
// newPlugin returns a plugin factory with specified Plugin.
@@ -247,11 +242,11 @@ func (fp *FilterPlugin) Filter(ctx context.Context, state *framework.CycleState,
// Name returns name of the plugin.
func (rp *ReservePlugin) Name() string {
return reservePluginName
return rp.name
}
// Reserve is a test function that returns an error or nil, depending on the
// value of "failReserve".
// Reserve is a test function that increments an intenral counter and returns
// an error or nil, depending on the value of "failReserve".
func (rp *ReservePlugin) Reserve(ctx context.Context, state *framework.CycleState, pod *v1.Pod, nodeName string) *framework.Status {
rp.numReserveCalled++
if rp.failReserve {
@@ -260,9 +255,20 @@ func (rp *ReservePlugin) Reserve(ctx context.Context, state *framework.CycleStat
return nil
}
// reset used to reset reserve plugin.
// Unreserve is a test function that increments an internal counter and emits
// an event to a channel. While Unreserve implementations should normally be
// idempotent, we relax that requirement here for testing purposes.
func (rp *ReservePlugin) Unreserve(ctx context.Context, state *framework.CycleState, pod *v1.Pod, nodeName string) {
rp.numUnreserveCalled++
if rp.pluginInvokeEventChan != nil {
rp.pluginInvokeEventChan <- pluginInvokeEvent{pluginName: rp.Name(), val: rp.numUnreserveCalled}
}
}
// reset used to reset internal counters.
func (rp *ReservePlugin) reset() {
rp.numReserveCalled = 0
rp.numUnreserveCalled = 0
}
// Name returns name of the plugin.
@@ -411,25 +417,6 @@ func (pp *PostFilterPlugin) PostFilter(ctx context.Context, state *framework.Cyc
return nil, framework.NewStatus(framework.Success, fmt.Sprintf("make room for pod %v to be schedulable", pod.Name))
}
// Name returns name of the plugin.
func (up *UnreservePlugin) Name() string {
return up.name
}
// Unreserve is a test function that returns an error or nil, depending on the
// value of "failUnreserve".
func (up *UnreservePlugin) Unreserve(ctx context.Context, state *framework.CycleState, pod *v1.Pod, nodeName string) {
up.numUnreserveCalled++
if up.pluginInvokeEventChan != nil {
up.pluginInvokeEventChan <- pluginInvokeEvent{pluginName: up.Name(), val: up.numUnreserveCalled}
}
}
// reset used to reset numUnreserveCalled.
func (up *UnreservePlugin) reset() {
up.numUnreserveCalled = 0
}
// Name returns name of the plugin.
func (pp *PermitPlugin) Name() string {
return pp.name
@@ -810,7 +797,7 @@ func TestNormalizeScorePlugin(t *testing.T) {
}
// TestReservePlugin tests invocation of reserve plugins.
func TestReservePlugin(t *testing.T) {
func TestReservePluginReserve(t *testing.T) {
// Create a plugin registry for testing. Register only a reserve plugin.
reservePlugin := &ReservePlugin{}
registry := frameworkruntime.Registry{reservePluginName: newPlugin(reservePlugin)}
@@ -830,7 +817,7 @@ func TestReservePlugin(t *testing.T) {
}
// Create the master and the scheduler with the test plugin set.
testCtx := initTestSchedulerForFrameworkTest(t, testutils.InitTestMaster(t, "reserve-plugin", nil), 2,
testCtx := initTestSchedulerForFrameworkTest(t, testutils.InitTestMaster(t, "reserve-plugin-reserve", nil), 2,
scheduler.WithProfiles(prof),
scheduler.WithFrameworkOutOfTreeRegistry(registry))
defer testutils.CleanupTest(t, testCtx)
@@ -962,26 +949,40 @@ func TestPrebindPlugin(t *testing.T) {
}
}
// TestUnreservePlugin tests invocation of un-reserve plugin
func TestUnreservePlugin(t *testing.T) {
// TODO: register more plugin which would trigger un-reserve plugin
preBindPlugin := &PreBindPlugin{}
unreservePlugin := &UnreservePlugin{name: unreservePluginName}
registry := frameworkruntime.Registry{
unreservePluginName: newPlugin(unreservePlugin),
preBindPluginName: newPlugin(preBindPlugin),
// TestUnreserveReservePlugin tests invocation of the Unreserve operation in
// reserve plugins through failures in execution points such as pre-bind. Also
// tests that the order of invocation of Unreserve operation is executed in the
// reverse order of invocation of the Reserve operation.
func TestReservePluginUnreserve(t *testing.T) {
numReservePlugins := 3
pluginInvokeEventChan := make(chan pluginInvokeEvent, numReservePlugins)
preBindPlugin := &PreBindPlugin{
failPreBind: true,
}
var reservePlugins []*ReservePlugin
for i := 0; i < numReservePlugins; i++ {
reservePlugins = append(reservePlugins, &ReservePlugin{
name: fmt.Sprintf("%s-%d", reservePluginName, i),
pluginInvokeEventChan: pluginInvokeEventChan,
})
}
// Setup initial unreserve and prebind plugin for testing.
registry := frameworkruntime.Registry{
// TODO(#92229): test more failure points that would trigger Unreserve in
// reserve plugins than just one pre-bind plugin.
preBindPluginName: newPlugin(preBindPlugin),
}
for _, pl := range reservePlugins {
registry[pl.Name()] = newPlugin(pl)
}
// Setup initial reserve and prebind plugin for testing.
prof := schedulerconfig.KubeSchedulerProfile{
SchedulerName: v1.DefaultSchedulerName,
Plugins: &schedulerconfig.Plugins{
Unreserve: &schedulerconfig.PluginSet{
Enabled: []schedulerconfig.Plugin{
{
Name: unreservePluginName,
},
},
Reserve: &schedulerconfig.PluginSet{
// filled by looping over reservePlugins
},
PreBind: &schedulerconfig.PluginSet{
Enabled: []schedulerconfig.Plugin{
@@ -992,9 +993,14 @@ func TestUnreservePlugin(t *testing.T) {
},
},
}
for _, pl := range reservePlugins {
prof.Plugins.Reserve.Enabled = append(prof.Plugins.Reserve.Enabled, schedulerconfig.Plugin{
Name: pl.Name(),
})
}
// Create the master and the scheduler with the test plugin set.
testCtx := initTestSchedulerForFrameworkTest(t, testutils.InitTestMaster(t, "unreserve-plugin", nil), 2,
testCtx := initTestSchedulerForFrameworkTest(t, testutils.InitTestMaster(t, "reserve-plugin-unreserve", nil), 2,
scheduler.WithProfiles(prof),
scheduler.WithFrameworkOutOfTreeRegistry(registry))
defer testutils.CleanupTest(t, testCtx)
@@ -1004,11 +1010,11 @@ func TestUnreservePlugin(t *testing.T) {
preBindFail bool
}{
{
name: "fail preBind unreserve plugin",
name: "fail preBind",
preBindFail: true,
},
{
name: "do not fail preBind unreserve plugin",
name: "pass preBind",
preBindFail: false,
},
}
@@ -1025,22 +1031,34 @@ func TestUnreservePlugin(t *testing.T) {
if test.preBindFail {
if err = wait.Poll(10*time.Millisecond, 30*time.Second, podSchedulingError(testCtx.ClientSet, pod.Namespace, pod.Name)); err != nil {
t.Errorf("Expected a scheduling error, but didn't get it. error: %v", err)
t.Errorf("Expected a scheduling error, but didn't get it: %v", err)
}
if unreservePlugin.numUnreserveCalled == 0 || unreservePlugin.numUnreserveCalled != preBindPlugin.numPreBindCalled {
t.Errorf("Expected the unreserve plugin to be called %d times, was called %d times.", preBindPlugin.numPreBindCalled, unreservePlugin.numUnreserveCalled)
for i := numReservePlugins - 1; i >= 0; i-- {
select {
case event := <-pluginInvokeEventChan:
expectedPluginName := reservePlugins[i].Name()
if expectedPluginName != event.pluginName {
t.Errorf("event.pluginName = %s, want %s", event.pluginName, expectedPluginName)
}
case <-time.After(time.Second * 30):
t.Errorf("pluginInvokeEventChan receive timed out")
}
}
} else {
if err = testutils.WaitForPodToSchedule(testCtx.ClientSet, pod); err != nil {
t.Errorf("Expected the pod to be scheduled. error: %v", err)
t.Errorf("Expected the pod to be scheduled, got an error: %v", err)
}
if unreservePlugin.numUnreserveCalled > 0 {
t.Errorf("Didn't expect the unreserve plugin to be called, was called %d times.", unreservePlugin.numUnreserveCalled)
for i, pl := range reservePlugins {
if pl.numUnreserveCalled != 0 {
t.Errorf("reservePlugins[%d].numUnreserveCalled = %d, want 0", i, pl.numUnreserveCalled)
}
}
}
unreservePlugin.reset()
preBindPlugin.reset()
for _, pl := range reservePlugins {
pl.reset()
}
testutils.CleanupPods(testCtx.ClientSet, t, []*v1.Pod{pod})
})
}
@@ -1056,12 +1074,13 @@ func TestBindPlugin(t *testing.T) {
testContext := testutils.InitTestMaster(t, "bind-plugin", nil)
bindPlugin1 := &BindPlugin{PluginName: "bind-plugin-1", client: testContext.ClientSet}
bindPlugin2 := &BindPlugin{PluginName: "bind-plugin-2", client: testContext.ClientSet}
unreservePlugin := &UnreservePlugin{name: "mock-unreserve-plugin"}
reservePlugin := &ReservePlugin{name: "mock-reserve-plugin"}
postBindPlugin := &PostBindPlugin{name: "mock-post-bind-plugin"}
// Create a plugin registry for testing. Register an unreserve, a bind plugin and a postBind plugin.
// Create a plugin registry for testing. Register reserve, bind, and
// postBind plugins.
registry := frameworkruntime.Registry{
unreservePlugin.Name(): func(_ runtime.Object, _ framework.FrameworkHandle) (framework.Plugin, error) {
return unreservePlugin, nil
reservePlugin.Name(): func(_ runtime.Object, _ framework.FrameworkHandle) (framework.Plugin, error) {
return reservePlugin, nil
},
bindPlugin1.Name(): func(_ runtime.Object, _ framework.FrameworkHandle) (framework.Plugin, error) {
return bindPlugin1, nil
@@ -1078,8 +1097,8 @@ func TestBindPlugin(t *testing.T) {
prof := schedulerconfig.KubeSchedulerProfile{
SchedulerName: v1.DefaultSchedulerName,
Plugins: &schedulerconfig.Plugins{
Unreserve: &schedulerconfig.PluginSet{
Enabled: []schedulerconfig.Plugin{{Name: unreservePlugin.Name()}},
Reserve: &schedulerconfig.PluginSet{
Enabled: []schedulerconfig.Plugin{{Name: reservePlugin.Name()}},
},
Bind: &schedulerconfig.PluginSet{
// Put DefaultBinder last.
@@ -1137,7 +1156,7 @@ func TestBindPlugin(t *testing.T) {
{
name: "bind plugin fails to bind the pod",
bindPluginStatuses: []*framework.Status{framework.NewStatus(framework.Error, "failed to bind"), framework.NewStatus(framework.Success, "")},
expectInvokeEvents: []pluginInvokeEvent{{pluginName: bindPlugin1.Name(), val: 1}, {pluginName: unreservePlugin.Name(), val: 1}, {pluginName: bindPlugin1.Name(), val: 2}, {pluginName: unreservePlugin.Name(), val: 2}},
expectInvokeEvents: []pluginInvokeEvent{{pluginName: bindPlugin1.Name(), val: 1}, {pluginName: reservePlugin.Name(), val: 1}, {pluginName: bindPlugin1.Name(), val: 2}, {pluginName: reservePlugin.Name(), val: 2}},
},
}
@@ -1151,7 +1170,7 @@ func TestBindPlugin(t *testing.T) {
bindPlugin1.pluginInvokeEventChan = pluginInvokeEventChan
bindPlugin2.pluginInvokeEventChan = pluginInvokeEventChan
unreservePlugin.pluginInvokeEventChan = pluginInvokeEventChan
reservePlugin.pluginInvokeEventChan = pluginInvokeEventChan
postBindPlugin.pluginInvokeEventChan = pluginInvokeEventChan
// Create a best effort pod.
@@ -1194,8 +1213,8 @@ func TestBindPlugin(t *testing.T) {
}); err != nil {
t.Errorf("Expected the postbind plugin to be called once, was called %d times.", postBindPlugin.numPostBindCalled)
}
if unreservePlugin.numUnreserveCalled != 0 {
t.Errorf("Expected the unreserve plugin not to be called, was called %d times.", unreservePlugin.numUnreserveCalled)
if reservePlugin.numUnreserveCalled != 0 {
t.Errorf("Expected unreserve to not be called, was called %d times.", reservePlugin.numUnreserveCalled)
}
} else {
// bind plugin fails to bind the pod
@@ -1223,7 +1242,7 @@ func TestBindPlugin(t *testing.T) {
postBindPlugin.reset()
bindPlugin1.reset()
bindPlugin2.reset()
unreservePlugin.reset()
reservePlugin.reset()
testutils.CleanupPods(testCtx.ClientSet, t, []*v1.Pod{pod})
})
}

View File

@@ -144,10 +144,9 @@ func TestSchedulerCreationFromConfigMap(t *testing.T) {
{Name: "DefaultPodTopologySpread", Weight: 1},
{Name: "TaintToleration", Weight: 1},
},
"ReservePlugin": {{Name: "VolumeBinding"}},
"UnreservePlugin": {{Name: "VolumeBinding"}},
"PreBindPlugin": {{Name: "VolumeBinding"}},
"BindPlugin": {{Name: "DefaultBinder"}},
"ReservePlugin": {{Name: "VolumeBinding"}},
"PreBindPlugin": {{Name: "VolumeBinding"}},
"BindPlugin": {{Name: "DefaultBinder"}},
},
},
{
@@ -238,10 +237,9 @@ kind: Policy
{Name: "DefaultPodTopologySpread", Weight: 1},
{Name: "TaintToleration", Weight: 1},
},
"ReservePlugin": {{Name: "VolumeBinding"}},
"UnreservePlugin": {{Name: "VolumeBinding"}},
"PreBindPlugin": {{Name: "VolumeBinding"}},
"BindPlugin": {{Name: "DefaultBinder"}},
"ReservePlugin": {{Name: "VolumeBinding"}},
"PreBindPlugin": {{Name: "VolumeBinding"}},
"BindPlugin": {{Name: "DefaultBinder"}},
},
},
{