diff --git a/pkg/scheduler/framework/cycle_state.go b/pkg/scheduler/framework/cycle_state.go index 7f4a1351258..e227c7da865 100644 --- a/pkg/scheduler/framework/cycle_state.go +++ b/pkg/scheduler/framework/cycle_state.go @@ -19,6 +19,8 @@ package framework import ( "errors" "sync" + + "k8s.io/apimachinery/pkg/util/sets" ) var ( @@ -48,6 +50,8 @@ type CycleState struct { storage sync.Map // if recordPluginMetrics is true, PluginExecutionDuration will be recorded for this cycle. recordPluginMetrics bool + // SkipFilterPlugins are plugins that will be skipped in the Filter extension point. + SkipFilterPlugins sets.Set[string] } // NewCycleState initializes a new CycleState and returns its pointer. @@ -83,6 +87,7 @@ func (c *CycleState) Clone() *CycleState { return true }) copy.recordPluginMetrics = c.recordPluginMetrics + copy.SkipFilterPlugins = c.SkipFilterPlugins return copy } diff --git a/pkg/scheduler/framework/interface.go b/pkg/scheduler/framework/interface.go index 3d988a7df5a..0ce06e7ab4e 100644 --- a/pkg/scheduler/framework/interface.go +++ b/pkg/scheduler/framework/interface.go @@ -90,7 +90,9 @@ const ( UnschedulableAndUnresolvable // Wait is used when a Permit plugin finds a pod scheduling should wait. Wait - // Skip is used when a Bind plugin chooses to skip binding. + // Skip is used in the following scenarios: + // - when a Bind plugin chooses to skip binding. + // - when a PreFilter plugin returns Skip so that coupled Filter plugin/PreFilterExtensions() will be skipped. Skip ) @@ -383,6 +385,8 @@ type PreFilterPlugin interface { // plugins must return success or the pod will be rejected. PreFilter could optionally // return a PreFilterResult to influence which nodes to evaluate downstream. This is useful // for cases where it is possible to determine the subset of nodes to process in O(1) time. + // When it returns Skip status, returned PreFilterResult and other fields in status are just ignored, + // and coupled Filter plugin/PreFilterExtensions() will be skipped in this scheduling cycle. PreFilter(ctx context.Context, state *CycleState, p *v1.Pod) (*PreFilterResult, *Status) // PreFilterExtensions returns a PreFilterExtensions interface if the plugin implements one, // or nil if it does not. A Pre-filter plugin can provide extensions to incrementally diff --git a/pkg/scheduler/framework/plugins/nodeaffinity/node_affinity.go b/pkg/scheduler/framework/plugins/nodeaffinity/node_affinity.go index 41e0a27d708..841da141b58 100644 --- a/pkg/scheduler/framework/plugins/nodeaffinity/node_affinity.go +++ b/pkg/scheduler/framework/plugins/nodeaffinity/node_affinity.go @@ -20,7 +20,7 @@ import ( "context" "fmt" - "k8s.io/api/core/v1" + v1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/util/sets" @@ -89,13 +89,19 @@ func (pl *NodeAffinity) EventsToRegister() []framework.ClusterEvent { // PreFilter builds and writes cycle state used by Filter. func (pl *NodeAffinity) PreFilter(ctx context.Context, cycleState *framework.CycleState, pod *v1.Pod) (*framework.PreFilterResult, *framework.Status) { + affinity := pod.Spec.Affinity + noNodeAffinity := (affinity == nil || + affinity.NodeAffinity == nil || + affinity.NodeAffinity.RequiredDuringSchedulingIgnoredDuringExecution == nil) + if noNodeAffinity && pl.addedNodeSelector == nil && pod.Spec.NodeSelector == nil { + // NodeAffinity Filter has nothing to do with the Pod. + return nil, framework.NewStatus(framework.Skip) + } + state := &preFilterState{requiredNodeSelectorAndAffinity: nodeaffinity.GetRequiredNodeAffinity(pod)} cycleState.Write(preFilterStateKey, state) - affinity := pod.Spec.Affinity - if affinity == nil || - affinity.NodeAffinity == nil || - affinity.NodeAffinity.RequiredDuringSchedulingIgnoredDuringExecution == nil || - len(affinity.NodeAffinity.RequiredDuringSchedulingIgnoredDuringExecution.NodeSelectorTerms) == 0 { + + if noNodeAffinity || len(affinity.NodeAffinity.RequiredDuringSchedulingIgnoredDuringExecution.NodeSelectorTerms) == 0 { return nil, nil } diff --git a/pkg/scheduler/framework/plugins/nodeaffinity/node_affinity_test.go b/pkg/scheduler/framework/plugins/nodeaffinity/node_affinity_test.go index 651f76baa06..e372779ae85 100644 --- a/pkg/scheduler/framework/plugins/nodeaffinity/node_affinity_test.go +++ b/pkg/scheduler/framework/plugins/nodeaffinity/node_affinity_test.go @@ -44,10 +44,6 @@ func TestNodeAffinity(t *testing.T) { args config.NodeAffinityArgs disablePreFilter bool }{ - { - name: "no selector", - pod: &v1.Pod{}, - }, { name: "missing labels", pod: st.MakePod().NodeSelector(map[string]string{ @@ -285,6 +281,7 @@ func TestNodeAffinity(t *testing.T) { labels: map[string]string{ "foo": "bar", }, + wantPreFilterStatus: framework.NewStatus(framework.Skip), }, { name: "Pod with Affinity but nil NodeSelector will schedule onto a node", @@ -300,6 +297,7 @@ func TestNodeAffinity(t *testing.T) { labels: map[string]string{ "foo": "bar", }, + wantPreFilterStatus: framework.NewStatus(framework.Skip), }, { name: "Pod with multiple matchExpressions ANDed that matches the existing node", diff --git a/pkg/scheduler/framework/runtime/framework.go b/pkg/scheduler/framework/runtime/framework.go index 3ca801de891..702cbf786f5 100644 --- a/pkg/scheduler/framework/runtime/framework.go +++ b/pkg/scheduler/framework/runtime/framework.go @@ -599,8 +599,10 @@ func (f *frameworkImpl) QueueSortFunc() framework.LessFunc { // RunPreFilterPlugins runs the set of configured PreFilter plugins. It returns // *Status and its code is set to non-success if any of the plugins returns -// anything but Success. If a non-success status is returned, then the scheduling -// cycle is aborted. +// anything but Success/Skip. +// When it returns Skip status, returned PreFilterResult and other fields in status are just ignored, +// and coupled Filter plugin/PreFilterExtensions() will be skipped in this scheduling cycle. +// If a non-success status is returned, then the scheduling cycle is aborted. func (f *frameworkImpl) RunPreFilterPlugins(ctx context.Context, state *framework.CycleState, pod *v1.Pod) (_ *framework.PreFilterResult, status *framework.Status) { startTime := time.Now() defer func() { @@ -608,8 +610,13 @@ func (f *frameworkImpl) RunPreFilterPlugins(ctx context.Context, state *framewor }() var result *framework.PreFilterResult var pluginsWithNodes []string + skipPlugins := sets.New[string]() for _, pl := range f.preFilterPlugins { r, s := f.runPreFilterPlugin(ctx, pl, state, pod) + if s.IsSkip() { + skipPlugins.Insert(pl.Name()) + continue + } if !s.IsSuccess() { s.SetFailedPlugin(pl.Name()) if s.IsUnschedulable() { @@ -628,8 +635,8 @@ func (f *frameworkImpl) RunPreFilterPlugins(ctx context.Context, state *framewor } return nil, framework.NewStatus(framework.Unschedulable, msg) } - } + state.SkipFilterPlugins = skipPlugins return result, nil } @@ -654,7 +661,7 @@ func (f *frameworkImpl) RunPreFilterExtensionAddPod( nodeInfo *framework.NodeInfo, ) (status *framework.Status) { for _, pl := range f.preFilterPlugins { - if pl.PreFilterExtensions() == nil { + if pl.PreFilterExtensions() == nil || state.SkipFilterPlugins.Has(pl.Name()) { continue } status = f.runPreFilterExtensionAddPod(ctx, pl, state, podToSchedule, podInfoToAdd, nodeInfo) @@ -689,7 +696,7 @@ func (f *frameworkImpl) RunPreFilterExtensionRemovePod( nodeInfo *framework.NodeInfo, ) (status *framework.Status) { for _, pl := range f.preFilterPlugins { - if pl.PreFilterExtensions() == nil { + if pl.PreFilterExtensions() == nil || state.SkipFilterPlugins.Has(pl.Name()) { continue } status = f.runPreFilterExtensionRemovePod(ctx, pl, state, podToSchedule, podInfoToRemove, nodeInfo) @@ -726,6 +733,9 @@ func (f *frameworkImpl) RunFilterPlugins( var status *framework.Status for _, pl := range f.filterPlugins { + if state.SkipFilterPlugins.Has(pl.Name()) { + continue + } status = f.runFilterPlugin(ctx, pl, state, pod, nodeInfo) if !status.IsSuccess() { if !status.IsUnschedulable() { diff --git a/pkg/scheduler/framework/runtime/framework_test.go b/pkg/scheduler/framework/runtime/framework_test.go index 8d5b9103e46..b8a26b97e06 100644 --- a/pkg/scheduler/framework/runtime/framework_test.go +++ b/pkg/scheduler/framework/runtime/framework_test.go @@ -1402,9 +1402,11 @@ func TestPreFilterPlugins(t *testing.T) { if err != nil { t.Fatalf("Failed to create framework for testing: %v", err) } - f.RunPreFilterPlugins(ctx, nil, nil) - f.RunPreFilterExtensionAddPod(ctx, nil, nil, nil, nil) - f.RunPreFilterExtensionRemovePod(ctx, nil, nil, nil, nil) + state := framework.NewCycleState() + + f.RunPreFilterPlugins(ctx, state, nil) + f.RunPreFilterExtensionAddPod(ctx, state, nil, nil, nil) + f.RunPreFilterExtensionRemovePod(ctx, state, nil, nil, nil) if preFilter1.PreFilterCalled != 1 { t.Errorf("preFilter1 called %v, expected: 1", preFilter1.PreFilterCalled) @@ -1421,39 +1423,309 @@ func TestPreFilterPlugins(t *testing.T) { }) } -func TestRunPreFilterPluginsStatus(t *testing.T) { - preFilter := &TestPlugin{ - name: preFilterPluginName, - inj: injectedResult{PreFilterStatus: int(framework.Error)}, +func TestRunPreFilterPlugins(t *testing.T) { + tests := []struct { + name string + plugins []*TestPlugin + wantPreFilterResult *framework.PreFilterResult + wantSkippedPlugins sets.Set[string] + wantStatusCode framework.Code + }{ + { + name: "all PreFilter returned success", + plugins: []*TestPlugin{ + { + name: "success1", + }, + { + name: "success2", + }, + }, + wantPreFilterResult: nil, + wantStatusCode: framework.Success, + }, + { + name: "one PreFilter plugin returned success, but another PreFilter plugin returned non-success", + plugins: []*TestPlugin{ + { + name: "success", + }, + { + name: "error", + inj: injectedResult{PreFilterStatus: int(framework.Error)}, + }, + }, + wantPreFilterResult: nil, + wantStatusCode: framework.Error, + }, + { + name: "one PreFilter plugin returned skip, but another PreFilter plugin returned non-success", + plugins: []*TestPlugin{ + { + name: "skip", + inj: injectedResult{PreFilterStatus: int(framework.Skip)}, + }, + { + name: "error", + inj: injectedResult{PreFilterStatus: int(framework.Error)}, + }, + }, + wantPreFilterResult: nil, + wantStatusCode: framework.Error, + }, + { + name: "all PreFilter plugins returned skip", + plugins: []*TestPlugin{ + { + name: "skip1", + inj: injectedResult{PreFilterStatus: int(framework.Skip)}, + }, + { + name: "skip2", + inj: injectedResult{PreFilterStatus: int(framework.Skip)}, + }, + { + name: "skip3", + inj: injectedResult{PreFilterStatus: int(framework.Skip)}, + }, + }, + wantPreFilterResult: nil, + wantSkippedPlugins: sets.New("skip1", "skip2", "skip3"), + wantStatusCode: framework.Success, + }, + { + name: "some PreFilter plugins returned skip", + plugins: []*TestPlugin{ + { + name: "skip1", + inj: injectedResult{PreFilterStatus: int(framework.Skip)}, + }, + { + name: "success1", + }, + { + name: "skip2", + inj: injectedResult{PreFilterStatus: int(framework.Skip)}, + }, + { + name: "success2", + }, + }, + wantPreFilterResult: nil, + wantSkippedPlugins: sets.New("skip1", "skip2"), + wantStatusCode: framework.Success, + }, } - r := make(Registry) - r.Register(preFilterPluginName, - func(_ runtime.Object, fh framework.Handle) (framework.Plugin, error) { - return preFilter, nil + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + r := make(Registry) + enabled := make([]config.Plugin, len(tt.plugins)) + for i, p := range tt.plugins { + p := p + enabled[i].Name = p.name + r.Register(p.name, func(_ runtime.Object, fh framework.Handle) (framework.Plugin, error) { + return p, nil + }) + } + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + f, err := newFrameworkWithQueueSortAndBind( + r, + config.KubeSchedulerProfile{Plugins: &config.Plugins{PreFilter: config.PluginSet{Enabled: enabled}}}, + ctx.Done(), + ) + if err != nil { + t.Fatalf("Failed to create framework for testing: %v", err) + } + + state := framework.NewCycleState() + result, status := f.RunPreFilterPlugins(ctx, state, nil) + if d := cmp.Diff(result, tt.wantPreFilterResult); d != "" { + t.Errorf("wrong status. got: %v, want: %v, diff: %s", result, tt.wantPreFilterResult, d) + } + if status.Code() != tt.wantStatusCode { + t.Errorf("wrong status code. got: %v, want: %v", status, tt.wantStatusCode) + } + skipped := state.SkipFilterPlugins + if d := cmp.Diff(skipped, tt.wantSkippedPlugins); d != "" { + t.Errorf("wrong skip filter plugins. got: %v, want: %v, diff: %s", skipped, tt.wantSkippedPlugins, d) + } }) - - plugins := &config.Plugins{PreFilter: config.PluginSet{Enabled: []config.Plugin{{Name: preFilterPluginName}}}} - - profile := config.KubeSchedulerProfile{Plugins: plugins} - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - - f, err := newFrameworkWithQueueSortAndBind(r, profile, ctx.Done()) - if err != nil { - t.Fatalf("Failed to create framework for testing: %v", err) } - _, status := f.RunPreFilterPlugins(ctx, nil, nil) - wantStatus := framework.AsStatus(fmt.Errorf("running PreFilter plugin %q: %w", preFilter.Name(), errInjectedStatus)).WithFailedPlugin(preFilter.Name()) - if !reflect.DeepEqual(status, wantStatus) { - t.Errorf("wrong status. got: %v, want:%v", status, wantStatus) +} + +func TestRunPreFilterExtensionRemovePod(t *testing.T) { + tests := []struct { + name string + plugins []*TestPlugin + skippedPluginNames sets.Set[string] + wantStatusCode framework.Code + }{ + { + name: "no plugins are skipped and all RemovePod() returned success", + plugins: []*TestPlugin{ + { + name: "success1", + }, + { + name: "success2", + }, + }, + wantStatusCode: framework.Success, + }, + { + name: "one RemovePod() returned error", + plugins: []*TestPlugin{ + { + name: "success1", + }, + { + name: "error1", + inj: injectedResult{PreFilterRemovePodStatus: int(framework.Error)}, + }, + }, + wantStatusCode: framework.Error, + }, + { + name: "one RemovePod() is skipped", + plugins: []*TestPlugin{ + { + name: "success1", + }, + { + name: "skipped", + // To confirm it's skipped, return error so that this test case will fail when it isn't skipped. + inj: injectedResult{PreFilterRemovePodStatus: int(framework.Error)}, + }, + }, + skippedPluginNames: sets.New("skipped"), + wantStatusCode: framework.Success, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + r := make(Registry) + enabled := make([]config.Plugin, len(tt.plugins)) + for i, p := range tt.plugins { + p := p + enabled[i].Name = p.name + r.Register(p.name, func(_ runtime.Object, fh framework.Handle) (framework.Plugin, error) { + return p, nil + }) + } + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + f, err := newFrameworkWithQueueSortAndBind( + r, + config.KubeSchedulerProfile{Plugins: &config.Plugins{PreFilter: config.PluginSet{Enabled: enabled}}}, + ctx.Done(), + ) + if err != nil { + t.Fatalf("Failed to create framework for testing: %v", err) + } + + state := framework.NewCycleState() + state.SkipFilterPlugins = tt.skippedPluginNames + status := f.RunPreFilterExtensionRemovePod(ctx, state, nil, nil, nil) + if status.Code() != tt.wantStatusCode { + t.Errorf("wrong status code. got: %v, want: %v", status, tt.wantStatusCode) + } + }) + } +} + +func TestRunPreFilterExtensionAddPod(t *testing.T) { + tests := []struct { + name string + plugins []*TestPlugin + skippedPluginNames sets.Set[string] + wantStatusCode framework.Code + }{ + { + name: "no plugins are skipped and all AddPod() returned success", + plugins: []*TestPlugin{ + { + name: "success1", + }, + { + name: "success2", + }, + }, + wantStatusCode: framework.Success, + }, + { + name: "one AddPod() returned error", + plugins: []*TestPlugin{ + { + name: "success1", + }, + { + name: "error1", + inj: injectedResult{PreFilterAddPodStatus: int(framework.Error)}, + }, + }, + wantStatusCode: framework.Error, + }, + { + name: "one AddPod() is skipped", + plugins: []*TestPlugin{ + { + name: "success1", + }, + { + name: "skipped", + // To confirm it's skipped, return error so that this test case will fail when it isn't skipped. + inj: injectedResult{PreFilterAddPodStatus: int(framework.Error)}, + }, + }, + skippedPluginNames: sets.New("skipped"), + wantStatusCode: framework.Success, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + r := make(Registry) + enabled := make([]config.Plugin, len(tt.plugins)) + for i, p := range tt.plugins { + p := p + enabled[i].Name = p.name + r.Register(p.name, func(_ runtime.Object, fh framework.Handle) (framework.Plugin, error) { + return p, nil + }) + } + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + f, err := newFrameworkWithQueueSortAndBind( + r, + config.KubeSchedulerProfile{Plugins: &config.Plugins{PreFilter: config.PluginSet{Enabled: enabled}}}, + ctx.Done(), + ) + if err != nil { + t.Fatalf("Failed to create framework for testing: %v", err) + } + + state := framework.NewCycleState() + state.SkipFilterPlugins = tt.skippedPluginNames + status := f.RunPreFilterExtensionAddPod(ctx, state, nil, nil, nil) + if status.Code() != tt.wantStatusCode { + t.Errorf("wrong status code. got: %v, want: %v", status, tt.wantStatusCode) + } + }) } } func TestFilterPlugins(t *testing.T) { tests := []struct { - name string - plugins []*TestPlugin - wantStatus *framework.Status + name string + plugins []*TestPlugin + skippedPlugins sets.Set[string] + wantStatus *framework.Status }{ { name: "SuccessFilter", @@ -1553,6 +1825,22 @@ func TestFilterPlugins(t *testing.T) { }, wantStatus: nil, }, + { + name: "SuccessAndSkipFilters", + plugins: []*TestPlugin{ + { + name: "TestPlugin1", + inj: injectedResult{FilterStatus: int(framework.Success)}, + }, + + { + name: "TestPlugin2", + inj: injectedResult{FilterStatus: int(framework.Error)}, // To make sure this plugins isn't called, set error as an injected result. + }, + }, + wantStatus: nil, + skippedPlugins: sets.New("TestPlugin2"), + }, { name: "ErrorAndSuccessFilters", plugins: []*TestPlugin{ @@ -1624,7 +1912,9 @@ func TestFilterPlugins(t *testing.T) { if err != nil { t.Fatalf("fail to create framework: %s", err) } - gotStatus := f.RunFilterPlugins(ctx, nil, pod, nil) + state := framework.NewCycleState() + state.SkipFilterPlugins = tt.skippedPlugins + gotStatus := f.RunFilterPlugins(ctx, state, pod, nil) if diff := cmp.Diff(gotStatus, tt.wantStatus, cmpOpts...); diff != "" { t.Errorf("Unexpected status: (-got, +want):\n%s", diff) } @@ -1854,7 +2144,7 @@ func TestFilterPluginsWithNominatedPods(t *testing.T) { t.Fatalf("fail to create framework: %s", err) } tt.nodeInfo.SetNode(tt.node) - gotStatus := f.RunFilterPluginsWithNominatedPods(ctx, nil, tt.pod, tt.nodeInfo) + gotStatus := f.RunFilterPluginsWithNominatedPods(ctx, framework.NewCycleState(), tt.pod, tt.nodeInfo) if diff := cmp.Diff(gotStatus, tt.wantStatus, cmpOpts...); diff != "" { t.Errorf("Unexpected status: (-got, +want):\n%s", diff) } diff --git a/pkg/scheduler/schedule_one_test.go b/pkg/scheduler/schedule_one_test.go index ff6caefc23b..f59757a0569 100644 --- a/pkg/scheduler/schedule_one_test.go +++ b/pkg/scheduler/schedule_one_test.go @@ -1980,6 +1980,42 @@ func TestSchedulerSchedulePod(t *testing.T) { }, }, }, + { + name: "test prefilter plugin returning skip", + registerPlugins: []st.RegisterPluginFunc{ + st.RegisterQueueSortPlugin(queuesort.Name, queuesort.New), + st.RegisterPreFilterPlugin( + "FakePreFilter1", + st.NewFakePreFilterPlugin("FakeFilter1", nil, nil), + ), + st.RegisterFilterPlugin( + "FakeFilter1", + st.NewFakeFilterPlugin(map[string]framework.Code{ + "node1": framework.Unschedulable, + }), + ), + st.RegisterPluginAsExtensions("FakeFilter2", func(configuration runtime.Object, f framework.Handle) (framework.Plugin, error) { + return st.FakePreFilterAndFilterPlugin{ + FakePreFilterPlugin: &st.FakePreFilterPlugin{ + Result: nil, + Status: framework.NewStatus(framework.Skip), + }, + FakeFilterPlugin: &st.FakeFilterPlugin{ + // This Filter plugin shouldn't be executed in the Filter extension point due to skip. + // To confirm that, return the status code Error to all Nodes. + FailedNodeReturnCodeMap: map[string]framework.Code{ + "node1": framework.Error, "node2": framework.Error, "node3": framework.Error, + }, + }, + }, nil + }, "PreFilter", "Filter"), + st.RegisterBindPlugin(defaultbinder.Name, defaultbinder.New), + }, + nodes: []string{"node1", "node2", "node3"}, + pod: st.MakePod().Name("test-prefilter").UID("test-prefilter").Obj(), + wantNodes: sets.NewString("node2", "node3"), + wantEvaluatedNodes: pointer.Int32(3), + }, } for _, test := range tests { t.Run(test.name, func(t *testing.T) { diff --git a/pkg/scheduler/testing/fake_plugins.go b/pkg/scheduler/testing/fake_plugins.go index a81c1bfe788..0367f965e47 100644 --- a/pkg/scheduler/testing/fake_plugins.go +++ b/pkg/scheduler/testing/fake_plugins.go @@ -22,7 +22,7 @@ import ( "sync/atomic" "time" - "k8s.io/api/core/v1" + v1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/runtime" "k8s.io/kubernetes/pkg/scheduler/framework" frameworkruntime "k8s.io/kubernetes/pkg/scheduler/framework/runtime" @@ -67,6 +67,16 @@ func NewTrueFilterPlugin(_ runtime.Object, _ framework.Handle) (framework.Plugin return &TrueFilterPlugin{}, nil } +type FakePreFilterAndFilterPlugin struct { + *FakePreFilterPlugin + *FakeFilterPlugin +} + +// Name returns name of the plugin. +func (pl FakePreFilterAndFilterPlugin) Name() string { + return "FakePreFilterAndFilterPlugin" +} + // FakeFilterPlugin is a test filter plugin to record how many times its Filter() function have // been called, and it returns different 'Code' depending on its internal 'failedNodeReturnCodeMap'. type FakeFilterPlugin struct {