using structured logging in scheduler framework runtime

This commit is contained in:
SataQiu 2020-09-24 21:10:42 +08:00
parent 51184187b9
commit 47c58c3785
4 changed files with 65 additions and 67 deletions

View File

@ -18,6 +18,7 @@ package core
import (
"context"
"errors"
"fmt"
"math"
"strconv"
@ -512,7 +513,7 @@ func TestGenericScheduler(t *testing.T) {
nodes: []string{"2", "1"},
pod: &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "2"}},
name: "test error with priority map",
wErr: fmt.Errorf("error while running score plugin for pod \"2\": %+v", errPrioritize),
wErr: fmt.Errorf("running Score plugins: %w", fmt.Errorf(`plugin "FalseMap" failed with: %w`, errPrioritize)),
},
{
name: "test podtopologyspread plugin - 2 nodes with maxskew=1",
@ -721,7 +722,7 @@ func TestGenericScheduler(t *testing.T) {
nodes: []string{"1", "2"},
pod: &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "test-prefilter", UID: types.UID("test-prefilter")}},
expectedHosts: nil,
wErr: fmt.Errorf(`prefilter plugin "FakePreFilter" failed for pod "test-prefilter": injected error status`),
wErr: fmt.Errorf(`running PreFilter plugin "FakePreFilter": %w`, errors.New("injected error status")),
},
}
for _, test := range tests {

View File

@ -273,7 +273,7 @@ func NewFramework(r Registry, plugins *config.Plugins, args []config.PluginConfi
}
p, err := factory(args, f)
if err != nil {
return nil, fmt.Errorf("error initializing plugin %q: %v", name, err)
return nil, fmt.Errorf("initializing plugin %q: %w", name, err)
}
pluginsMap[name] = p
@ -397,9 +397,9 @@ func (f *frameworkImpl) RunPreFilterPlugins(ctx context.Context, state *framewor
if status.IsUnschedulable() {
return status
}
msg := fmt.Sprintf("prefilter plugin %q failed for pod %q: %v", pl.Name(), pod.Name, status.Message())
klog.Error(msg)
return framework.NewStatus(framework.Error, msg)
err := status.AsError()
klog.ErrorS(err, "Failed running PreFilter plugin", "plugin", pl.Name(), "pod", klog.KObj(pod))
return framework.AsStatus(fmt.Errorf("running PreFilter plugin %q: %w", pl.Name(), err))
}
}
@ -432,10 +432,9 @@ func (f *frameworkImpl) RunPreFilterExtensionAddPod(
}
status = f.runPreFilterExtensionAddPod(ctx, pl, state, podToSchedule, podToAdd, nodeInfo)
if !status.IsSuccess() {
msg := fmt.Sprintf("error while running AddPod for plugin %q while scheduling pod %q: %v",
pl.Name(), podToSchedule.Name, status.Message())
klog.Error(msg)
return framework.NewStatus(framework.Error, msg)
err := status.AsError()
klog.ErrorS(err, "Failed running AddPod on PreFilter plugin", "plugin", pl.Name(), "pod", klog.KObj(podToSchedule))
return framework.AsStatus(fmt.Errorf("running AddPod on PreFilter plugin %q: %w", pl.Name(), err))
}
}
@ -468,10 +467,9 @@ func (f *frameworkImpl) RunPreFilterExtensionRemovePod(
}
status = f.runPreFilterExtensionRemovePod(ctx, pl, state, podToSchedule, podToRemove, nodeInfo)
if !status.IsSuccess() {
msg := fmt.Sprintf("error while running RemovePod for plugin %q while scheduling pod %q: %v",
pl.Name(), podToSchedule.Name, status.Message())
klog.Error(msg)
return framework.NewStatus(framework.Error, msg)
err := status.AsError()
klog.ErrorS(err, "Failed running RemovePod on PreFilter plugin", "plugin", pl.Name(), "pod", klog.KObj(podToSchedule))
return framework.AsStatus(fmt.Errorf("running RemovePod on PreFilter plugin %q: %w", pl.Name(), err))
}
}
@ -577,9 +575,9 @@ func (f *frameworkImpl) RunPreScorePlugins(
for _, pl := range f.preScorePlugins {
status = f.runPreScorePlugin(ctx, pl, state, pod, nodes)
if !status.IsSuccess() {
msg := fmt.Sprintf("error while running %q prescore plugin for pod %q: %v", pl.Name(), pod.Name, status.Message())
klog.Error(msg)
return framework.NewStatus(framework.Error, msg)
err := status.AsError()
klog.ErrorS(err, "Failed running PreScore plugin", "plugin", pl.Name(), "pod", klog.KObj(pod))
return framework.AsStatus(fmt.Errorf("running PreScore plugin %q: %w", pl.Name(), err))
}
}
@ -618,7 +616,8 @@ func (f *frameworkImpl) RunScorePlugins(ctx context.Context, state *framework.Cy
nodeName := nodes[index].Name
s, status := f.runScorePlugin(ctx, pl, state, pod, nodeName)
if !status.IsSuccess() {
errCh.SendErrorWithCancel(fmt.Errorf(status.Message()), cancel)
err := fmt.Errorf("plugin %q failed with: %w", pl.Name(), status.AsError())
errCh.SendErrorWithCancel(err, cancel)
return
}
pluginToNodeScores[pl.Name()][index] = framework.NodeScore{
@ -628,9 +627,8 @@ func (f *frameworkImpl) RunScorePlugins(ctx context.Context, state *framework.Cy
}
})
if err := errCh.ReceiveError(); err != nil {
msg := fmt.Sprintf("error while running score plugin for pod %q: %v", pod.Name, err)
klog.Error(msg)
return nil, framework.NewStatus(framework.Error, msg)
klog.ErrorS(err, "Failed running Score plugins", "pod", klog.KObj(pod))
return nil, framework.AsStatus(fmt.Errorf("running Score plugins: %w", err))
}
// Run NormalizeScore method for each ScorePlugin in parallel.
@ -642,15 +640,14 @@ func (f *frameworkImpl) RunScorePlugins(ctx context.Context, state *framework.Cy
}
status := f.runScoreExtension(ctx, pl, state, pod, nodeScoreList)
if !status.IsSuccess() {
err := fmt.Errorf("normalize score plugin %q failed with error %v", pl.Name(), status.Message())
err := fmt.Errorf("plugin %q failed with: %w", pl.Name(), status.AsError())
errCh.SendErrorWithCancel(err, cancel)
return
}
})
if err := errCh.ReceiveError(); err != nil {
msg := fmt.Sprintf("error while running normalize score plugin for pod %q: %v", pod.Name, err)
klog.Error(msg)
return nil, framework.NewStatus(framework.Error, msg)
klog.ErrorS(err, "Failed running Normalize on Score plugins", "pod", klog.KObj(pod))
return nil, framework.AsStatus(fmt.Errorf("running Normalize on Score plugins: %w", err))
}
// Apply score defaultWeights for each ScorePlugin in parallel.
@ -663,7 +660,7 @@ func (f *frameworkImpl) RunScorePlugins(ctx context.Context, state *framework.Cy
for i, nodeScore := range nodeScoreList {
// return error if score plugin returns invalid score.
if nodeScore.Score > framework.MaxNodeScore || nodeScore.Score < framework.MinNodeScore {
err := fmt.Errorf("score plugin %q returns an invalid score %v, it should in the range of [%v, %v] after normalizing", pl.Name(), nodeScore.Score, framework.MinNodeScore, framework.MaxNodeScore)
err := fmt.Errorf("plugin %q returns an invalid score %v, it should in the range of [%v, %v] after normalizing", pl.Name(), nodeScore.Score, framework.MinNodeScore, framework.MaxNodeScore)
errCh.SendErrorWithCancel(err, cancel)
return
}
@ -671,9 +668,8 @@ func (f *frameworkImpl) RunScorePlugins(ctx context.Context, state *framework.Cy
}
})
if err := errCh.ReceiveError(); err != nil {
msg := fmt.Sprintf("error while applying score defaultWeights for pod %q: %v", pod.Name, err)
klog.Error(msg)
return nil, framework.NewStatus(framework.Error, msg)
klog.ErrorS(err, "Failed applying score defaultWeights on Score plugins", "pod", klog.KObj(pod))
return nil, framework.AsStatus(fmt.Errorf("applying score defaultWeights on Score plugins: %w", err))
}
return pluginToNodeScores, nil
@ -710,9 +706,9 @@ func (f *frameworkImpl) RunPreBindPlugins(ctx context.Context, state *framework.
for _, pl := range f.preBindPlugins {
status = f.runPreBindPlugin(ctx, pl, state, pod, nodeName)
if !status.IsSuccess() {
err := fmt.Errorf("error while running %q prebind plugin for pod %q: %w", pl.Name(), pod.Name, status.AsError())
klog.Error(err)
return framework.AsStatus(err)
err := status.AsError()
klog.ErrorS(err, "Failed running PreBind plugin", "plugin", pl.Name(), "pod", klog.KObj(pod))
return framework.AsStatus(fmt.Errorf("running PreBind plugin %q: %w", pl.Name(), err))
}
}
return nil
@ -743,9 +739,9 @@ func (f *frameworkImpl) RunBindPlugins(ctx context.Context, state *framework.Cyc
continue
}
if !status.IsSuccess() {
err := fmt.Errorf("plugin %q failed to bind pod \"%v/%v\": %w", bp.Name(), pod.Namespace, pod.Name, status.AsError())
klog.Error(err)
return framework.AsStatus(err)
err := status.AsError()
klog.ErrorS(err, "Failed running Bind plugin", "plugin", bp.Name(), "pod", klog.KObj(pod))
return framework.AsStatus(fmt.Errorf("running Bind plugin %q: %w", bp.Name(), err))
}
return status
}
@ -796,9 +792,9 @@ func (f *frameworkImpl) RunReservePluginsReserve(ctx context.Context, state *fra
for _, pl := range f.reservePlugins {
status = f.runReservePluginReserve(ctx, pl, state, pod, nodeName)
if !status.IsSuccess() {
msg := fmt.Sprintf("error while running Reserve in %q reserve plugin for pod %q: %v", pl.Name(), pod.Name, status.Message())
klog.Error(msg)
return framework.NewStatus(framework.Error, msg)
err := status.AsError()
klog.ErrorS(err, "Failed running Reserve plugin", "plugin", pl.Name(), "pod", klog.KObj(pod))
return framework.AsStatus(fmt.Errorf("running Reserve plugin %q: %w", pl.Name(), err))
}
}
return nil
@ -867,9 +863,9 @@ func (f *frameworkImpl) RunPermitPlugins(ctx context.Context, state *framework.C
pluginsWaitTime[pl.Name()] = timeout
statusCode = framework.Wait
} else {
msg := fmt.Sprintf("error while running %q permit plugin for pod %q: %v", pl.Name(), pod.Name, status.Message())
klog.Error(msg)
return framework.NewStatus(framework.Error, msg)
err := status.AsError()
klog.ErrorS(err, "Failed running Permit plugin", "plugin", pl.Name(), "pod", klog.KObj(pod))
return framework.AsStatus(fmt.Errorf("running Permit plugin %q: %w", pl.Name(), err))
}
}
}
@ -912,9 +908,9 @@ func (f *frameworkImpl) WaitOnPermit(ctx context.Context, pod *v1.Pod) (status *
klog.V(4).Infof(msg)
return framework.NewStatus(s.Code(), msg)
}
msg := fmt.Sprintf("error received while waiting on permit for pod %q: %v", pod.Name, s.Message())
klog.Error(msg)
return framework.NewStatus(framework.Error, msg)
err := s.AsError()
klog.ErrorS(err, "Failed waiting on permit for pod", "pod", klog.KObj(pod))
return framework.AsStatus(fmt.Errorf("waiting on permit for pod: %w", err))
}
return nil
}

View File

@ -346,6 +346,8 @@ var nodes = []*v1.Node{
{ObjectMeta: metav1.ObjectMeta{Name: "node2"}},
}
var errInjectedStatus = errors.New("injected status")
func newFrameworkWithQueueSortAndBind(r Registry, pl *config.Plugins, plc []config.PluginConfig, opts ...Option) (v1alpha1.Framework, error) {
if _, ok := r[queueSortPlugin]; !ok {
r[queueSortPlugin] = newQueueSortPlugin
@ -1139,7 +1141,6 @@ func TestPostFilterPlugins(t *testing.T) {
}
func TestPreBindPlugins(t *testing.T) {
injectedStatusErr := errors.New("injected status")
tests := []struct {
name string
plugins []*TestPlugin
@ -1168,7 +1169,7 @@ func TestPreBindPlugins(t *testing.T) {
inj: injectedResult{PreBindStatus: int(v1alpha1.Unschedulable)},
},
},
wantStatus: v1alpha1.AsStatus(fmt.Errorf(`error while running "TestPlugin" prebind plugin for pod "": %w`, injectedStatusErr)),
wantStatus: v1alpha1.AsStatus(fmt.Errorf(`running PreBind plugin "TestPlugin": %w`, errInjectedStatus)),
},
{
name: "ErrorPreBindPlugin",
@ -1178,7 +1179,7 @@ func TestPreBindPlugins(t *testing.T) {
inj: injectedResult{PreBindStatus: int(v1alpha1.Error)},
},
},
wantStatus: v1alpha1.AsStatus(fmt.Errorf(`error while running "TestPlugin" prebind plugin for pod "": %w`, injectedStatusErr)),
wantStatus: v1alpha1.AsStatus(fmt.Errorf(`running PreBind plugin "TestPlugin": %w`, errInjectedStatus)),
},
{
name: "UnschedulablePreBindPlugin",
@ -1188,7 +1189,7 @@ func TestPreBindPlugins(t *testing.T) {
inj: injectedResult{PreBindStatus: int(v1alpha1.UnschedulableAndUnresolvable)},
},
},
wantStatus: v1alpha1.AsStatus(fmt.Errorf(`error while running "TestPlugin" prebind plugin for pod "": %w`, injectedStatusErr)),
wantStatus: v1alpha1.AsStatus(fmt.Errorf(`running PreBind plugin "TestPlugin": %w`, errInjectedStatus)),
},
{
name: "SuccessErrorPreBindPlugins",
@ -1202,7 +1203,7 @@ func TestPreBindPlugins(t *testing.T) {
inj: injectedResult{PreBindStatus: int(v1alpha1.Error)},
},
},
wantStatus: v1alpha1.AsStatus(fmt.Errorf(`error while running "TestPlugin 1" prebind plugin for pod "": %w`, injectedStatusErr)),
wantStatus: v1alpha1.AsStatus(fmt.Errorf(`running PreBind plugin "TestPlugin 1": %w`, errInjectedStatus)),
},
{
name: "ErrorSuccessPreBindPlugin",
@ -1216,7 +1217,7 @@ func TestPreBindPlugins(t *testing.T) {
inj: injectedResult{PreBindStatus: int(v1alpha1.Success)},
},
},
wantStatus: v1alpha1.AsStatus(fmt.Errorf(`error while running "TestPlugin" prebind plugin for pod "": %w`, injectedStatusErr)),
wantStatus: v1alpha1.AsStatus(fmt.Errorf(`running PreBind plugin "TestPlugin": %w`, errInjectedStatus)),
},
{
name: "SuccessSuccessPreBindPlugin",
@ -1244,7 +1245,7 @@ func TestPreBindPlugins(t *testing.T) {
inj: injectedResult{PreBindStatus: int(v1alpha1.Error)},
},
},
wantStatus: v1alpha1.AsStatus(fmt.Errorf(`error while running "TestPlugin" prebind plugin for pod "": %w`, injectedStatusErr)),
wantStatus: v1alpha1.AsStatus(fmt.Errorf(`running PreBind plugin "TestPlugin": %w`, errInjectedStatus)),
},
{
name: "UnschedulableAndSuccessPreBindPlugin",
@ -1258,7 +1259,7 @@ func TestPreBindPlugins(t *testing.T) {
inj: injectedResult{PreBindStatus: int(v1alpha1.Success)},
},
},
wantStatus: v1alpha1.AsStatus(fmt.Errorf(`error while running "TestPlugin" prebind plugin for pod "": %w`, injectedStatusErr)),
wantStatus: v1alpha1.AsStatus(fmt.Errorf(`running PreBind plugin "TestPlugin": %w`, errInjectedStatus)),
},
}
@ -1324,7 +1325,7 @@ func TestReservePlugins(t *testing.T) {
inj: injectedResult{ReserveStatus: int(v1alpha1.Unschedulable)},
},
},
wantStatus: v1alpha1.NewStatus(v1alpha1.Error, `error while running Reserve in "TestPlugin" reserve plugin for pod "": injected status`),
wantStatus: v1alpha1.AsStatus(fmt.Errorf(`running Reserve plugin "TestPlugin": %w`, errInjectedStatus)),
},
{
name: "ErrorReservePlugin",
@ -1334,7 +1335,7 @@ func TestReservePlugins(t *testing.T) {
inj: injectedResult{ReserveStatus: int(v1alpha1.Error)},
},
},
wantStatus: v1alpha1.NewStatus(v1alpha1.Error, `error while running Reserve in "TestPlugin" reserve plugin for pod "": injected status`),
wantStatus: v1alpha1.AsStatus(fmt.Errorf(`running Reserve plugin "TestPlugin": %w`, errInjectedStatus)),
},
{
name: "UnschedulableReservePlugin",
@ -1344,7 +1345,7 @@ func TestReservePlugins(t *testing.T) {
inj: injectedResult{ReserveStatus: int(v1alpha1.UnschedulableAndUnresolvable)},
},
},
wantStatus: v1alpha1.NewStatus(v1alpha1.Error, `error while running Reserve in "TestPlugin" reserve plugin for pod "": injected status`),
wantStatus: v1alpha1.AsStatus(fmt.Errorf(`running Reserve plugin "TestPlugin": %w`, errInjectedStatus)),
},
{
name: "SuccessSuccessReservePlugins",
@ -1372,7 +1373,7 @@ func TestReservePlugins(t *testing.T) {
inj: injectedResult{ReserveStatus: int(v1alpha1.Error)},
},
},
wantStatus: v1alpha1.NewStatus(v1alpha1.Error, `error while running Reserve in "TestPlugin" reserve plugin for pod "": injected status`),
wantStatus: v1alpha1.AsStatus(fmt.Errorf(`running Reserve plugin "TestPlugin": %w`, errInjectedStatus)),
},
{
name: "SuccessErrorReservePlugins",
@ -1386,7 +1387,7 @@ func TestReservePlugins(t *testing.T) {
inj: injectedResult{ReserveStatus: int(v1alpha1.Error)},
},
},
wantStatus: v1alpha1.NewStatus(v1alpha1.Error, `error while running Reserve in "TestPlugin 1" reserve plugin for pod "": injected status`),
wantStatus: v1alpha1.AsStatus(fmt.Errorf(`running Reserve plugin "TestPlugin 1": %w`, errInjectedStatus)),
},
{
name: "ErrorSuccessReservePlugin",
@ -1400,7 +1401,7 @@ func TestReservePlugins(t *testing.T) {
inj: injectedResult{ReserveStatus: int(v1alpha1.Success)},
},
},
wantStatus: v1alpha1.NewStatus(v1alpha1.Error, `error while running Reserve in "TestPlugin" reserve plugin for pod "": injected status`),
wantStatus: v1alpha1.AsStatus(fmt.Errorf(`running Reserve plugin "TestPlugin": %w`, errInjectedStatus)),
},
{
name: "UnschedulableAndSuccessReservePlugin",
@ -1414,7 +1415,7 @@ func TestReservePlugins(t *testing.T) {
inj: injectedResult{ReserveStatus: int(v1alpha1.Success)},
},
},
wantStatus: v1alpha1.NewStatus(v1alpha1.Error, `error while running Reserve in "TestPlugin" reserve plugin for pod "": injected status`),
wantStatus: v1alpha1.AsStatus(fmt.Errorf(`running Reserve plugin "TestPlugin": %w`, errInjectedStatus)),
},
}
@ -1490,7 +1491,7 @@ func TestPermitPlugins(t *testing.T) {
inj: injectedResult{PermitStatus: int(v1alpha1.Error)},
},
},
want: v1alpha1.NewStatus(v1alpha1.Error, `error while running "TestPlugin" permit plugin for pod "": injected status`),
want: v1alpha1.AsStatus(fmt.Errorf(`running Permit plugin "TestPlugin": %w`, errInjectedStatus)),
},
{
name: "UnschedulableAndUnresolvablePermitPlugin",
@ -1538,7 +1539,7 @@ func TestPermitPlugins(t *testing.T) {
inj: injectedResult{PermitStatus: int(v1alpha1.Error)},
},
},
want: v1alpha1.NewStatus(v1alpha1.Error, `error while running "TestPlugin" permit plugin for pod "": injected status`),
want: v1alpha1.AsStatus(fmt.Errorf(`running Permit plugin "TestPlugin": %w`, errInjectedStatus)),
},
}

View File

@ -235,7 +235,7 @@ func TestSchedulerScheduleOne(t *testing.T) {
expectErrorPod: podWithID("foo", testNode.Name),
expectForgetPod: podWithID("foo", testNode.Name),
expectAssumedPod: podWithID("foo", testNode.Name),
expectError: errors.New(`error while running Reserve in "FakeReserve" reserve plugin for pod "foo": reserve error`),
expectError: fmt.Errorf(`running Reserve plugin "FakeReserve": %w`, errors.New("reserve error")),
eventReason: "FailedScheduling",
},
{
@ -248,7 +248,7 @@ func TestSchedulerScheduleOne(t *testing.T) {
expectErrorPod: podWithID("foo", testNode.Name),
expectForgetPod: podWithID("foo", testNode.Name),
expectAssumedPod: podWithID("foo", testNode.Name),
expectError: errors.New(`error while running "FakePermit" permit plugin for pod "foo": permit error`),
expectError: fmt.Errorf(`running Permit plugin "FakePermit": %w`, errors.New("permit error")),
eventReason: "FailedScheduling",
},
{
@ -261,7 +261,7 @@ func TestSchedulerScheduleOne(t *testing.T) {
expectErrorPod: podWithID("foo", testNode.Name),
expectForgetPod: podWithID("foo", testNode.Name),
expectAssumedPod: podWithID("foo", testNode.Name),
expectError: fmt.Errorf(`error while running "FakePreBind" prebind plugin for pod "foo": %w`, preBindErr),
expectError: fmt.Errorf(`running PreBind plugin "FakePreBind": %w`, preBindErr),
eventReason: "FailedScheduling",
},
{
@ -287,7 +287,7 @@ func TestSchedulerScheduleOne(t *testing.T) {
expectBind: &v1.Binding{ObjectMeta: metav1.ObjectMeta{Name: "foo", UID: types.UID("foo")}, Target: v1.ObjectReference{Kind: "Node", Name: testNode.Name}},
expectAssumedPod: podWithID("foo", testNode.Name),
injectBindError: errB,
expectError: errors.New("Binding rejected: plugin \"DefaultBinder\" failed to bind pod \"/foo\": binder"),
expectError: errors.New(`Binding rejected: running Bind plugin "DefaultBinder": binder`),
expectErrorPod: podWithID("foo", testNode.Name),
expectForgetPod: podWithID("foo", testNode.Name),
eventReason: "FailedScheduling",
@ -964,7 +964,7 @@ func TestSchedulerWithVolumeBinding(t *testing.T) {
},
expectAssumeCalled: true,
eventReason: "FailedScheduling",
expectError: fmt.Errorf("error while running Reserve in %q reserve plugin for pod %q: %v", volumebinding.Name, "foo", assumeErr),
expectError: fmt.Errorf("running Reserve plugin %q: %w", volumebinding.Name, assumeErr),
},
{
name: "bind error",
@ -974,7 +974,7 @@ func TestSchedulerWithVolumeBinding(t *testing.T) {
expectAssumeCalled: true,
expectBindCalled: true,
eventReason: "FailedScheduling",
expectError: fmt.Errorf("error while running %q prebind plugin for pod %q: %v", volumebinding.Name, "foo", bindErr),
expectError: fmt.Errorf("running PreBind plugin %q: %w", volumebinding.Name, bindErr),
},
}