Merge pull request #98358 from tanjing2020/scheduling_queue.go

migrate scheduling_queue.go to structured logging
This commit is contained in:
Kubernetes Prow Robot 2021-02-19 00:20:24 -08:00 committed by GitHub
commit 320aaf4859
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 19 additions and 22 deletions

View File

@ -250,16 +250,16 @@ func (p *PriorityQueue) Add(pod *v1.Pod) error {
defer p.lock.Unlock() defer p.lock.Unlock()
pInfo := p.newQueuedPodInfo(pod) pInfo := p.newQueuedPodInfo(pod)
if err := p.activeQ.Add(pInfo); err != nil { if err := p.activeQ.Add(pInfo); err != nil {
klog.Errorf("Error adding pod %v to the scheduling queue: %v", nsNameForPod(pod), err) klog.ErrorS(err, "Error adding pod to the scheduling queue", "pod", klog.KObj(pod))
return err return err
} }
if p.unschedulableQ.get(pod) != nil { if p.unschedulableQ.get(pod) != nil {
klog.Errorf("Error: pod %v is already in the unschedulable queue.", nsNameForPod(pod)) klog.ErrorS(nil, "Error: pod is already in the unschedulable queue", "pod", klog.KObj(pod))
p.unschedulableQ.delete(pod) p.unschedulableQ.delete(pod)
} }
// Delete pod from backoffQ if it is backing off // Delete pod from backoffQ if it is backing off
if err := p.podBackoffQ.Delete(pInfo); err == nil { if err := p.podBackoffQ.Delete(pInfo); err == nil {
klog.Errorf("Error: pod %v is already in the podBackoff queue.", nsNameForPod(pod)) klog.ErrorS(nil, "Error: pod is already in the podBackoff queue", "pod", klog.KObj(pod))
} }
metrics.SchedulerQueueIncomingPods.WithLabelValues("active", PodAdd).Inc() metrics.SchedulerQueueIncomingPods.WithLabelValues("active", PodAdd).Inc()
p.PodNominator.AddNominatedPod(pod, "") p.PodNominator.AddNominatedPod(pod, "")
@ -268,14 +268,6 @@ func (p *PriorityQueue) Add(pod *v1.Pod) error {
return nil return nil
} }
// nsNameForPod returns a namespacedname for a pod
func nsNameForPod(pod *v1.Pod) ktypes.NamespacedName {
return ktypes.NamespacedName{
Namespace: pod.Namespace,
Name: pod.Name,
}
}
// isPodBackingoff returns true if a pod is still waiting for its backoff timer. // isPodBackingoff returns true if a pod is still waiting for its backoff timer.
// If this returns true, the pod should not be re-tried. // If this returns true, the pod should not be re-tried.
func (p *PriorityQueue) isPodBackingoff(podInfo *framework.QueuedPodInfo) bool { func (p *PriorityQueue) isPodBackingoff(podInfo *framework.QueuedPodInfo) bool {
@ -299,14 +291,14 @@ func (p *PriorityQueue) AddUnschedulableIfNotPresent(pInfo *framework.QueuedPodI
defer p.lock.Unlock() defer p.lock.Unlock()
pod := pInfo.Pod pod := pInfo.Pod
if p.unschedulableQ.get(pod) != nil { if p.unschedulableQ.get(pod) != nil {
return fmt.Errorf("pod: %v is already present in unschedulable queue", nsNameForPod(pod)) return fmt.Errorf("Pod %v is already present in unschedulable queue", klog.KObj(pod))
} }
if _, exists, _ := p.activeQ.Get(pInfo); exists { if _, exists, _ := p.activeQ.Get(pInfo); exists {
return fmt.Errorf("pod: %v is already present in the active queue", nsNameForPod(pod)) return fmt.Errorf("Pod %v is already present in the active queue", klog.KObj(pod))
} }
if _, exists, _ := p.podBackoffQ.Get(pInfo); exists { if _, exists, _ := p.podBackoffQ.Get(pInfo); exists {
return fmt.Errorf("pod %v is already present in the backoff queue", nsNameForPod(pod)) return fmt.Errorf("Pod %v is already present in the backoff queue", klog.KObj(pod))
} }
// Refresh the timestamp since the pod is re-added. // Refresh the timestamp since the pod is re-added.
@ -344,7 +336,7 @@ func (p *PriorityQueue) flushBackoffQCompleted() {
} }
_, err := p.podBackoffQ.Pop() _, err := p.podBackoffQ.Pop()
if err != nil { if err != nil {
klog.Errorf("Unable to pop pod %v from backoff queue despite backoff completion.", nsNameForPod(pod)) klog.ErrorS(err, "Unable to pop pod from backoff queue despite backoff completion", "pod", klog.KObj(pod))
return return
} }
p.activeQ.Add(rawPodInfo) p.activeQ.Add(rawPodInfo)
@ -514,14 +506,14 @@ func (p *PriorityQueue) movePodsToActiveOrBackoffQueue(podInfoList []*framework.
pod := pInfo.Pod pod := pInfo.Pod
if p.isPodBackingoff(pInfo) { if p.isPodBackingoff(pInfo) {
if err := p.podBackoffQ.Add(pInfo); err != nil { if err := p.podBackoffQ.Add(pInfo); err != nil {
klog.Errorf("Error adding pod %v to the backoff queue: %v", pod.Name, err) klog.ErrorS(err, "Error adding pod to the backoff queue", "pod", klog.KObj(pod))
} else { } else {
metrics.SchedulerQueueIncomingPods.WithLabelValues("backoff", event).Inc() metrics.SchedulerQueueIncomingPods.WithLabelValues("backoff", event).Inc()
p.unschedulableQ.delete(pod) p.unschedulableQ.delete(pod)
} }
} else { } else {
if err := p.activeQ.Add(pInfo); err != nil { if err := p.activeQ.Add(pInfo); err != nil {
klog.Errorf("Error adding pod %v to the scheduling queue: %v", pod.Name, err) klog.ErrorS(err, "Error adding pod to the scheduling queue", "pod", klog.KObj(pod))
} else { } else {
metrics.SchedulerQueueIncomingPods.WithLabelValues("active", event).Inc() metrics.SchedulerQueueIncomingPods.WithLabelValues("active", event).Inc()
p.unschedulableQ.delete(pod) p.unschedulableQ.delete(pod)
@ -544,7 +536,7 @@ func (p *PriorityQueue) getUnschedulablePodsWithMatchingAffinityTerm(pod *v1.Pod
namespaces := util.GetNamespacesFromPodAffinityTerm(up, &term) namespaces := util.GetNamespacesFromPodAffinityTerm(up, &term)
selector, err := metav1.LabelSelectorAsSelector(term.LabelSelector) selector, err := metav1.LabelSelectorAsSelector(term.LabelSelector)
if err != nil { if err != nil {
klog.Errorf("Error getting label selectors for pod: %v.", up.Name) klog.ErrorS(err, "Error getting label selectors for pod", "pod", klog.KObj(up))
} }
if util.PodMatchesTermsNamespaceAndSelector(pod, namespaces, selector) { if util.PodMatchesTermsNamespaceAndSelector(pod, namespaces, selector) {
podsToMove = append(podsToMove, pInfo) podsToMove = append(podsToMove, pInfo)
@ -748,7 +740,7 @@ func (npm *nominatedPodMap) add(p *v1.Pod, nodeName string) {
npm.nominatedPodToNode[p.UID] = nnn npm.nominatedPodToNode[p.UID] = nnn
for _, np := range npm.nominatedPods[nnn] { for _, np := range npm.nominatedPods[nnn] {
if np.UID == p.UID { if np.UID == p.UID {
klog.V(4).Infof("Pod %v/%v already exists in the nominated map!", p.Namespace, p.Name) klog.V(4).InfoS("Pod already exists in the nominated map", "pod", klog.KObj(p))
return return
} }
} }
@ -810,10 +802,10 @@ func MakeNextPodFunc(queue SchedulingQueue) func() *framework.QueuedPodInfo {
return func() *framework.QueuedPodInfo { return func() *framework.QueuedPodInfo {
podInfo, err := queue.Pop() podInfo, err := queue.Pop()
if err == nil { if err == nil {
klog.V(4).Infof("About to try and schedule pod %v/%v", podInfo.Pod.Namespace, podInfo.Pod.Name) klog.V(4).InfoS("About to try and schedule pod", "pod", klog.KObj(podInfo.Pod))
return podInfo return podInfo
} }
klog.Errorf("Error while retrieving next pod from scheduling queue: %v", err) klog.ErrorS(err, "Error while retrieving next pod from scheduling queue")
return nil return nil
} }
} }

View File

@ -27,6 +27,7 @@ import (
v1 "k8s.io/api/core/v1" v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/types"
ktypes "k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/clock" "k8s.io/apimachinery/pkg/util/clock"
"k8s.io/component-base/metrics/testutil" "k8s.io/component-base/metrics/testutil"
podutil "k8s.io/kubernetes/pkg/api/v1/pod" podutil "k8s.io/kubernetes/pkg/api/v1/pod"
@ -1514,7 +1515,11 @@ func TestBackOffFlow(t *testing.T) {
UID: "test-uid", UID: "test-uid",
}, },
} }
podID := nsNameForPod(pod)
podID := ktypes.NamespacedName{
Namespace: pod.Namespace,
Name: pod.Name,
}
if err := q.Add(pod); err != nil { if err := q.Add(pod); err != nil {
t.Fatal(err) t.Fatal(err)
} }