Migrate scheduler.go, factory.go to structured logging

This commit is contained in:
Alex Dudko 2020-12-24 13:19:38 -08:00
parent 88a05df5ff
commit b11e4f2484
2 changed files with 27 additions and 27 deletions

View File

@ -92,7 +92,7 @@ func (c *Configurator) create() (*Scheduler, error) {
if len(c.extenders) != 0 {
var ignorableExtenders []framework.Extender
for ii := range c.extenders {
klog.V(2).Infof("Creating extender with config %+v", c.extenders[ii])
klog.V(2).InfoS("Creating extender", "extender", c.extenders[ii])
extender, err := core.NewHTTPExtender(&c.extenders[ii])
if err != nil {
return nil, err
@ -184,7 +184,7 @@ func (c *Configurator) create() (*Scheduler, error) {
// createFromProvider creates a scheduler from the name of a registered algorithm provider.
func (c *Configurator) createFromProvider(providerName string) (*Scheduler, error) {
klog.V(2).Infof("Creating scheduler from algorithm provider '%v'", providerName)
klog.V(2).InfoS("Creating scheduler from algorithm provider", "algorithmProvider", providerName)
r := algorithmprovider.NewRegistry()
defaultPlugins, exist := r[providerName]
if !exist {
@ -207,7 +207,7 @@ func (c *Configurator) createFromConfig(policy schedulerapi.Policy) (*Scheduler,
lr := frameworkplugins.NewLegacyRegistry()
args := &frameworkplugins.ConfigProducerArgs{}
klog.V(2).Infof("Creating scheduler from configuration: %v", policy)
klog.V(2).InfoS("Creating scheduler from configuration", "policy", policy)
// validate the policy configuration
if err := validation.ValidatePolicy(policy); err != nil {
@ -216,26 +216,26 @@ func (c *Configurator) createFromConfig(policy schedulerapi.Policy) (*Scheduler,
predicateKeys := sets.NewString()
if policy.Predicates == nil {
klog.V(2).Infof("Using predicates from algorithm provider '%v'", schedulerapi.SchedulerDefaultProviderName)
klog.V(2).InfoS("Using predicates from algorithm provider", "algorithmProvider", schedulerapi.SchedulerDefaultProviderName)
predicateKeys = lr.DefaultPredicates
} else {
for _, predicate := range policy.Predicates {
klog.V(2).Infof("Registering predicate: %s", predicate.Name)
klog.V(2).InfoS("Registering predicate", "predicate", predicate.Name)
predicateKeys.Insert(lr.ProcessPredicatePolicy(predicate, args))
}
}
priorityKeys := make(map[string]int64)
if policy.Priorities == nil {
klog.V(2).Infof("Using default priorities")
klog.V(2).InfoS("Using default priorities")
priorityKeys = lr.DefaultPriorities
} else {
for _, priority := range policy.Priorities {
if priority.Name == frameworkplugins.EqualPriority {
klog.V(2).Infof("Skip registering priority: %s", priority.Name)
klog.V(2).InfoS("Skip registering priority", "priority", priority.Name)
continue
}
klog.V(2).Infof("Registering priority: %s", priority.Name)
klog.V(2).InfoS("Registering priority", "priority", priority.Name)
priorityKeys[lr.ProcessPriorityPolicy(priority, args)] = priority.Weight
}
}
@ -254,7 +254,7 @@ func (c *Configurator) createFromConfig(policy schedulerapi.Policy) (*Scheduler,
c.alwaysCheckAllPredicates = policy.AlwaysCheckAllPredicates
}
klog.V(2).Infof("Creating scheduler with fit predicates '%v' and priority functions '%v'", predicateKeys, priorityKeys)
klog.V(2).InfoS("Creating scheduler", "predicates", predicateKeys, "priorities", priorityKeys)
// Combine all framework configurations. If this results in any duplication, framework
// instantiation should fail.
@ -320,7 +320,7 @@ func MakeDefaultErrorFunc(client clientset.Interface, podLister corelisters.PodL
} else if _, ok := err.(*core.FitError); ok {
klog.V(2).InfoS("Unable to schedule pod; no fit; waiting", "pod", klog.KObj(pod), "err", err)
} else if apierrors.IsNotFound(err) {
klog.V(2).Infof("Unable to schedule %v/%v: possibly due to node not found: %v; waiting", pod.Namespace, pod.Name, err)
klog.V(2).InfoS("Unable to schedule pod, possibly due to node not found; waiting", "pod", klog.KObj(pod), "err", err)
if errStatus, ok := err.(apierrors.APIStatus); ok && errStatus.Status().Details.Kind == "node" {
nodeName := errStatus.Status().Details.Name
// when node is not found, We do not remove the node right away. Trying again to get
@ -329,7 +329,7 @@ func MakeDefaultErrorFunc(client clientset.Interface, podLister corelisters.PodL
if err != nil && apierrors.IsNotFound(err) {
node := v1.Node{ObjectMeta: metav1.ObjectMeta{Name: nodeName}}
if err := schedulerCache.RemoveNode(&node); err != nil {
klog.V(4).Infof("Node %q is not found; failed to remove it from the cache.", node.Name)
klog.V(4).InfoS("Node is not found; failed to remove it from the cache", "node", node.Name)
}
}
}
@ -340,21 +340,21 @@ func MakeDefaultErrorFunc(client clientset.Interface, podLister corelisters.PodL
// Check if the Pod exists in informer cache.
cachedPod, err := podLister.Pods(pod.Namespace).Get(pod.Name)
if err != nil {
klog.Warningf("Pod %v/%v doesn't exist in informer cache: %v", pod.Namespace, pod.Name, err)
klog.InfoS("Pod doesn't exist in informer cache", "pod", klog.KObj(pod), "err", err)
return
}
// In the case of extender, the pod may have been bound successfully, but timed out returning its response to the scheduler.
// It could result in the live version to carry .spec.nodeName, and that's inconsistent with the internal-queued version.
if len(cachedPod.Spec.NodeName) != 0 {
klog.Warningf("Pod %v/%v has been assigned with %v. Abort adding it back to queue.", pod.Namespace, pod.Name, cachedPod.Spec.NodeName)
klog.InfoS("Pod has been assigned to node. Abort adding it back to queue.", "pod", klog.KObj(pod), "node", cachedPod.Spec.NodeName)
return
}
// As <cachedPod> is from SharedInformer, we need to do a DeepCopy() here.
podInfo.Pod = cachedPod.DeepCopy()
if err := podQueue.AddUnschedulableIfNotPresent(podInfo, podQueue.SchedulingCycle()); err != nil {
klog.Error(err)
klog.ErrorS(err, "Error occurred")
}
}
}

View File

@ -335,7 +335,7 @@ func (sched *Scheduler) recordSchedulingFailure(fwk framework.Framework, podInfo
Reason: reason,
Message: err.Error(),
}, nominatedNode); err != nil {
klog.Errorf("Error updating pod %s/%s: %v", pod.Namespace, pod.Name, err)
klog.ErrorS(err, "Error updating pod", "pod", klog.KObj(pod))
}
}
@ -364,7 +364,7 @@ func (sched *Scheduler) assume(assumed *v1.Pod, host string) error {
assumed.Spec.NodeName = host
if err := sched.SchedulerCache.AssumePod(assumed); err != nil {
klog.Errorf("scheduler cache AssumePod failed: %v", err)
klog.ErrorS(err, "scheduler cache AssumePod failed")
return err
}
// if "assumed" is a nominated pod, we should remove it from internal cache
@ -413,10 +413,10 @@ func (sched *Scheduler) extendersBinding(pod *v1.Pod, node string) (bool, error)
func (sched *Scheduler) finishBinding(fwk framework.Framework, assumed *v1.Pod, targetNode string, err error) {
if finErr := sched.SchedulerCache.FinishBinding(assumed); finErr != nil {
klog.Errorf("scheduler cache FinishBinding failed: %v", finErr)
klog.ErrorS(finErr, "scheduler cache FinishBinding failed")
}
if err != nil {
klog.V(1).Infof("Failed to bind pod: %v/%v", assumed.Namespace, assumed.Name)
klog.V(1).InfoS("Failed to bind pod", "pod", klog.KObj(assumed))
return
}
@ -435,7 +435,7 @@ func (sched *Scheduler) scheduleOne(ctx context.Context) {
if err != nil {
// This shouldn't happen, because we only accept for scheduling the pods
// which specify a scheduler name that matches one of the profiles.
klog.Error(err)
klog.ErrorS(err, "Error occurred")
return
}
if sched.skipPodSchedule(fwk, pod) {
@ -459,14 +459,14 @@ func (sched *Scheduler) scheduleOne(ctx context.Context) {
nominatedNode := ""
if fitError, ok := err.(*core.FitError); ok {
if !fwk.HasPostFilterPlugins() {
klog.V(3).Infof("No PostFilter plugins are registered, so no preemption will be performed.")
klog.V(3).InfoS("No PostFilter plugins are registered, so no preemption will be performed")
} else {
// Run PostFilter plugins to try to make the pod schedulable in a future scheduling cycle.
result, status := fwk.RunPostFilterPlugins(ctx, state, pod, fitError.FilteredNodesStatuses)
if status.Code() == framework.Error {
klog.Errorf("Status after running PostFilter plugins for pod %v/%v: %v", pod.Namespace, pod.Name, status)
klog.ErrorS(nil, "Status after running PostFilter plugins for pod", klog.KObj(pod), "status", status)
} else {
klog.V(5).Infof("Status after running PostFilter plugins for pod %v/%v: %v", pod.Namespace, pod.Name, status)
klog.V(5).InfoS("Status after running PostFilter plugins for pod", "pod", klog.KObj(pod), "status", status)
}
if status.IsSuccess() && result != nil {
nominatedNode = result.NominatedNodeName
@ -510,7 +510,7 @@ func (sched *Scheduler) scheduleOne(ctx context.Context) {
// trigger un-reserve to clean up state associated with the reserved Pod
fwk.RunReservePluginsUnreserve(schedulingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost)
if forgetErr := sched.SchedulerCache.ForgetPod(assumedPod); forgetErr != nil {
klog.Errorf("scheduler cache ForgetPod failed: %v", forgetErr)
klog.ErrorS(forgetErr, "scheduler cache ForgetPod failed")
}
sched.recordSchedulingFailure(fwk, assumedPodInfo, sts.AsError(), SchedulerError, "")
return
@ -530,7 +530,7 @@ func (sched *Scheduler) scheduleOne(ctx context.Context) {
// One of the plugins returned status different than success or wait.
fwk.RunReservePluginsUnreserve(schedulingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost)
if forgetErr := sched.SchedulerCache.ForgetPod(assumedPod); forgetErr != nil {
klog.Errorf("scheduler cache ForgetPod failed: %v", forgetErr)
klog.ErrorS(forgetErr, "scheduler cache ForgetPod failed")
}
sched.recordSchedulingFailure(fwk, assumedPodInfo, runPermitStatus.AsError(), reason, "")
return
@ -556,7 +556,7 @@ func (sched *Scheduler) scheduleOne(ctx context.Context) {
// trigger un-reserve plugins to clean up state associated with the reserved Pod
fwk.RunReservePluginsUnreserve(bindingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost)
if forgetErr := sched.SchedulerCache.ForgetPod(assumedPod); forgetErr != nil {
klog.Errorf("scheduler cache ForgetPod failed: %v", forgetErr)
klog.ErrorS(forgetErr, "scheduler cache ForgetPod failed")
}
sched.recordSchedulingFailure(fwk, assumedPodInfo, waitOnPermitStatus.AsError(), reason, "")
return
@ -569,7 +569,7 @@ func (sched *Scheduler) scheduleOne(ctx context.Context) {
// trigger un-reserve plugins to clean up state associated with the reserved Pod
fwk.RunReservePluginsUnreserve(bindingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost)
if forgetErr := sched.SchedulerCache.ForgetPod(assumedPod); forgetErr != nil {
klog.Errorf("scheduler cache ForgetPod failed: %v", forgetErr)
klog.ErrorS(forgetErr, "scheduler cache ForgetPod failed")
}
sched.recordSchedulingFailure(fwk, assumedPodInfo, preBindStatus.AsError(), SchedulerError, "")
return
@ -581,7 +581,7 @@ func (sched *Scheduler) scheduleOne(ctx context.Context) {
// trigger un-reserve plugins to clean up state associated with the reserved Pod
fwk.RunReservePluginsUnreserve(bindingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost)
if err := sched.SchedulerCache.ForgetPod(assumedPod); err != nil {
klog.Errorf("scheduler cache ForgetPod failed: %v", err)
klog.ErrorS(err, "scheduler cache ForgetPod failed")
}
sched.recordSchedulingFailure(fwk, assumedPodInfo, fmt.Errorf("binding rejected: %w", err), SchedulerError, "")
} else {