Mirror of https://github.com/k3s-io/kubernetes.git (synced 2025-08-05 10:19:50 +00:00)
podgc controller: convert to contextual logging
commit 1b8ddf6b79
parent 745cfa35bd
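
The change follows the Kubernetes contextual-logging pattern: instead of calling the global klog helpers (klog.Infof, klog.InfoS, klog.ErrorS, ...), each function pulls a logr.Logger out of its context.Context with klog.FromContext and emits structured key/value pairs. The sketch below is a minimal, self-contained illustration of that pattern, not code from this commit; runController, doWork and the logger name are made up for illustration.

package main

import (
	"context"

	"k8s.io/klog/v2"
)

// runController is a hypothetical controller loop using the contextual-logging
// pattern adopted in this commit: it takes the logger from the context once
// and logs structured key/value pairs instead of calling the global klog API.
func runController(ctx context.Context) {
	logger := klog.FromContext(ctx)

	logger.Info("Starting GC controller")
	defer logger.Info("Shutting down GC controller")

	if err := doWork(ctx); err != nil {
		// Errors carry the error object plus key/value pairs.
		logger.Error(err, "Error while doing work", "step", "doWork")
	}
}

// doWork is a placeholder for the controller's actual work.
func doWork(ctx context.Context) error {
	klog.FromContext(ctx).V(4).Info("Doing one round of work")
	return nil
}

func main() {
	// Whoever builds the context decides which logger the controller sees;
	// here we attach klog's global logger under a name chosen for this sketch.
	logger := klog.Background().WithName("pod-garbage-collector")
	runController(klog.NewContext(context.Background(), logger))
}

The payoff is that the caller (kube-controller-manager in production, ktesting in unit tests) controls where the output goes and which names and values are attached, without the controller code referencing any global logging state.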
pkg/controller/podgc/gc_controller.go
@@ -98,11 +98,13 @@ func NewPodGCInternal(ctx context.Context, kubeClient clientset.Interface, podIn
 }
 
 func (gcc *PodGCController) Run(ctx context.Context) {
+	logger := klog.FromContext(ctx)
+
 	defer utilruntime.HandleCrash()
 
-	klog.Infof("Starting GC controller")
+	logger.Info("Starting GC controller")
 	defer gcc.nodeQueue.ShutDown()
-	defer klog.Infof("Shutting down GC controller")
+	defer logger.Info("Shutting down GC controller")
 
 	if !cache.WaitForNamedCacheSync("GC", ctx.Done(), gcc.podListerSynced, gcc.nodeListerSynced) {
 		return
@@ -116,12 +118,12 @@ func (gcc *PodGCController) Run(ctx context.Context) {
 func (gcc *PodGCController) gc(ctx context.Context) {
 	pods, err := gcc.podLister.List(labels.Everything())
 	if err != nil {
-		klog.Errorf("Error while listing all pods: %v", err)
+		klog.FromContext(ctx).Error(err, "Error while listing all pods")
 		return
 	}
 	nodes, err := gcc.nodeLister.List(labels.Everything())
 	if err != nil {
-		klog.Errorf("Error while listing all nodes: %v", err)
+		klog.FromContext(ctx).Error(err, "Error while listing all nodes")
 		return
 	}
 	if gcc.terminatedPodThreshold > 0 {
@@ -147,20 +149,21 @@ func isPodTerminating(pod *v1.Pod) bool {
 }
 
 func (gcc *PodGCController) gcTerminating(ctx context.Context, pods []*v1.Pod) {
-	klog.V(4).Info("GC'ing terminating pods that are on out-of-service nodes")
+	logger := klog.FromContext(ctx)
+	logger.V(4).Info("GC'ing terminating pods that are on out-of-service nodes")
 	terminatingPods := []*v1.Pod{}
 	for _, pod := range pods {
 		if isPodTerminating(pod) {
 			node, err := gcc.nodeLister.Get(pod.Spec.NodeName)
 			if err != nil {
-				klog.Errorf("failed to get node %s : %s", pod.Spec.NodeName, err)
+				logger.Error(err, "Failed to get node", "node", klog.KRef("", pod.Spec.NodeName))
 				continue
 			}
 			// Add this pod to terminatingPods list only if the following conditions are met:
 			// 1. Node is not ready.
 			// 2. Node has `node.kubernetes.io/out-of-service` taint.
 			if !nodeutil.IsNodeReady(node) && taints.TaintKeyExists(node.Spec.Taints, v1.TaintNodeOutOfService) {
-				klog.V(4).Infof("garbage collecting pod %s that is terminating. Phase [%v]", pod.Name, pod.Status.Phase)
+				logger.V(4).Info("Garbage collecting pod that is terminating", "pod", klog.KObj(pod), "phase", pod.Status.Phase)
				terminatingPods = append(terminatingPods, pod)
 			}
 		}
@@ -171,7 +174,7 @@ func (gcc *PodGCController) gcTerminating(ctx context.Context, pods []*v1.Pod) {
 		return
 	}
 
-	klog.V(4).Infof("Garbage collecting %v pods that are terminating on node tainted with node.kubernetes.io/out-of-service", deleteCount)
+	logger.V(4).Info("Garbage collecting pods that are terminating on node tainted with node.kubernetes.io/out-of-service", "numPods", deleteCount)
 	// sort only when necessary
 	sort.Sort(byEvictionAndCreationTimestamp(terminatingPods))
 	var wait sync.WaitGroup
@@ -205,7 +208,8 @@ func (gcc *PodGCController) gcTerminated(ctx context.Context, pods []*v1.Pod) {
 		return
 	}
 
-	klog.InfoS("Garbage collecting pods", "numPods", deleteCount)
+	logger := klog.FromContext(ctx)
+	logger.Info("Garbage collecting pods", "numPods", deleteCount)
 	// sort only when necessary
 	sort.Sort(byEvictionAndCreationTimestamp(terminatedPods))
 	var wait sync.WaitGroup
@@ -224,7 +228,8 @@ func (gcc *PodGCController) gcTerminated(ctx context.Context, pods []*v1.Pod) {
 
 // gcOrphaned deletes pods that are bound to nodes that don't exist.
 func (gcc *PodGCController) gcOrphaned(ctx context.Context, pods []*v1.Pod, nodes []*v1.Node) {
-	klog.V(4).Infof("GC'ing orphaned")
+	logger := klog.FromContext(ctx)
+	logger.V(4).Info("GC'ing orphaned")
 	existingNodeNames := sets.NewString()
 	for _, node := range nodes {
 		existingNodeNames.Insert(node.Name)
@@ -245,7 +250,7 @@ func (gcc *PodGCController) gcOrphaned(ctx context.Context, pods []*v1.Pod, node
 		if !deletedNodesNames.Has(pod.Spec.NodeName) {
 			continue
 		}
-		klog.V(2).InfoS("Found orphaned Pod assigned to the Node, deleting.", "pod", klog.KObj(pod), "node", pod.Spec.NodeName)
+		logger.V(2).Info("Found orphaned Pod assigned to the Node, deleting", "pod", klog.KObj(pod), "node", klog.KRef("", pod.Spec.NodeName))
 		condition := corev1apply.PodCondition().
 			WithType(v1.DisruptionTarget).
 			WithStatus(v1.ConditionTrue).
@@ -255,7 +260,7 @@ func (gcc *PodGCController) gcOrphaned(ctx context.Context, pods []*v1.Pod, node
 		if err := gcc.markFailedAndDeletePodWithCondition(ctx, pod, condition); err != nil {
 			utilruntime.HandleError(err)
 		} else {
-			klog.InfoS("Forced deletion of orphaned Pod succeeded", "pod", klog.KObj(pod))
+			logger.Info("Forced deletion of orphaned Pod succeeded", "pod", klog.KObj(pod))
 		}
 	}
 }
@@ -272,7 +277,7 @@ func (gcc *PodGCController) discoverDeletedNodes(ctx context.Context, existingNo
 		exists, err := gcc.checkIfNodeExists(ctx, nodeName)
 		switch {
 		case err != nil:
-			klog.ErrorS(err, "Error while getting node", "node", klog.KRef("", nodeName))
+			klog.FromContext(ctx).Error(err, "Error while getting node", "node", klog.KRef("", nodeName))
 			// Node will be added back to the queue in the subsequent loop if still needed
 		case !exists:
 			deletedNodesNames.Insert(nodeName)
@@ -293,18 +298,19 @@ func (gcc *PodGCController) checkIfNodeExists(ctx context.Context, name string)
 
 // gcUnscheduledTerminating deletes pods that are terminating and haven't been scheduled to a particular node.
 func (gcc *PodGCController) gcUnscheduledTerminating(ctx context.Context, pods []*v1.Pod) {
-	klog.V(4).Infof("GC'ing unscheduled pods which are terminating.")
+	logger := klog.FromContext(ctx)
+	logger.V(4).Info("GC'ing unscheduled pods which are terminating")
 
 	for _, pod := range pods {
 		if pod.DeletionTimestamp == nil || len(pod.Spec.NodeName) > 0 {
 			continue
 		}
 
-		klog.V(2).InfoS("Found unscheduled terminating Pod not assigned to any Node, deleting.", "pod", klog.KObj(pod))
+		logger.V(2).Info("Found unscheduled terminating Pod not assigned to any Node, deleting", "pod", klog.KObj(pod))
 		if err := gcc.markFailedAndDeletePod(ctx, pod); err != nil {
 			utilruntime.HandleError(err)
 		} else {
-			klog.InfoS("Forced deletion of unscheduled terminating Pod succeeded", "pod", klog.KObj(pod))
+			logger.Info("Forced deletion of unscheduled terminating Pod succeeded", "pod", klog.KObj(pod))
 		}
 	}
 }
@@ -334,7 +340,8 @@ func (gcc *PodGCController) markFailedAndDeletePod(ctx context.Context, pod *v1.
 }
 
 func (gcc *PodGCController) markFailedAndDeletePodWithCondition(ctx context.Context, pod *v1.Pod, condition *corev1apply.PodConditionApplyConfiguration) error {
-	klog.InfoS("PodGC is force deleting Pod", "pod", klog.KRef(pod.Namespace, pod.Name))
+	logger := klog.FromContext(ctx)
+	logger.Info("PodGC is force deleting Pod", "pod", klog.KObj(pod))
 	if utilfeature.DefaultFeatureGate.Enabled(features.PodDisruptionConditions) {
 
 		// Mark the pod as failed - this is especially important in case the pod
pkg/controller/podgc/gc_controller_test.go
@@ -36,6 +36,7 @@ import (
 	"k8s.io/client-go/util/workqueue"
 	featuregatetesting "k8s.io/component-base/featuregate/testing"
 	metricstestutil "k8s.io/component-base/metrics/testutil"
+	"k8s.io/klog/v2/ktesting"
 	"k8s.io/kubernetes/pkg/controller"
 	"k8s.io/kubernetes/pkg/controller/testutil"
 	"k8s.io/kubernetes/pkg/features"
@@ -45,11 +46,11 @@ import (
 
 func alwaysReady() bool { return true }
 
-func NewFromClient(kubeClient clientset.Interface, terminatedPodThreshold int) (*PodGCController, coreinformers.PodInformer, coreinformers.NodeInformer) {
+func NewFromClient(ctx context.Context, kubeClient clientset.Interface, terminatedPodThreshold int) (*PodGCController, coreinformers.PodInformer, coreinformers.NodeInformer) {
 	informerFactory := informers.NewSharedInformerFactory(kubeClient, controller.NoResyncPeriodFunc())
 	podInformer := informerFactory.Core().V1().Pods()
 	nodeInformer := informerFactory.Core().V1().Nodes()
-	controller := NewPodGC(context.TODO(), kubeClient, podInformer, nodeInformer, terminatedPodThreshold)
+	controller := NewPodGC(ctx, kubeClient, podInformer, nodeInformer, terminatedPodThreshold)
 	controller.podListerSynced = alwaysReady
 	return controller, podInformer, nodeInformer
 }
@@ -148,9 +149,9 @@ func TestGCTerminated(t *testing.T) {
 			deletedPodNames: sets.NewString("c"),
 		},
 	}
 
 	for _, test := range testCases {
 		t.Run(test.name, func(t *testing.T) {
+			_, ctx := ktesting.NewTestContext(t)
 			defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.PodDisruptionConditions, test.enablePodDisruptionConditions)()
 			creationTime := time.Unix(0, 0)
 			nodes := []*v1.Node{testutil.NewNode("node")}
@@ -165,12 +166,12 @@ func TestGCTerminated(t *testing.T) {
 				})
 			}
 			client := setupNewSimpleClient(nodes, pods)
-			gcc, podInformer, _ := NewFromClient(client, test.threshold)
+			gcc, podInformer, _ := NewFromClient(ctx, client, test.threshold)
 			for _, pod := range pods {
 				podInformer.Informer().GetStore().Add(pod)
 			}
 
-			gcc.gc(context.TODO())
+			gcc.gc(ctx)
 
 			verifyDeletedAndPatchedPods(t, client, test.deletedPodNames, test.patchedPodNames)
 		})
@@ -346,6 +347,7 @@ func TestGCOrphaned(t *testing.T) {
 
 	for _, test := range testCases {
 		t.Run(test.name, func(t *testing.T) {
+			_, ctx := ktesting.NewTestContext(t)
 			defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.PodDisruptionConditions, test.enablePodDisruptionConditions)()
 			nodes := make([]*v1.Node, 0, len(test.initialClientNodes))
 			for _, node := range test.initialClientNodes {
@@ -356,7 +358,7 @@ func TestGCOrphaned(t *testing.T) {
 				pods = append(pods, pod)
 			}
 			client := setupNewSimpleClient(nodes, pods)
-			gcc, podInformer, nodeInformer := NewFromClient(client, -1)
+			gcc, podInformer, nodeInformer := NewFromClient(ctx, client, -1)
 			for _, node := range test.initialInformerNodes {
 				nodeInformer.Informer().GetStore().Add(node)
 			}
@@ -369,7 +371,7 @@ func TestGCOrphaned(t *testing.T) {
 			gcc.nodeQueue = workqueue.NewDelayingQueueWithCustomClock(fakeClock, "podgc_test_queue")
 
 			// First GC of orphaned pods
-			gcc.gc(context.TODO())
+			gcc.gc(ctx)
 			deletedPodNames := getDeletedPodNames(client)
 
 			if len(deletedPodNames) > 0 {
@@ -456,6 +458,7 @@ func TestGCUnscheduledTerminating(t *testing.T) {
 
 	for _, test := range testCases {
 		t.Run(test.name, func(t *testing.T) {
+			_, ctx := ktesting.NewTestContext(t)
 			defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.PodDisruptionConditions, test.enablePodDisruptionConditions)()
 			creationTime := time.Unix(0, 0)
 
@@ -471,7 +474,7 @@ func TestGCUnscheduledTerminating(t *testing.T) {
 			}
 			nodes := []*v1.Node{}
 			client := setupNewSimpleClient(nodes, pods)
-			gcc, podInformer, _ := NewFromClient(client, -1)
+			gcc, podInformer, _ := NewFromClient(ctx, client, -1)
 
 			for _, pod := range pods {
 				podInformer.Informer().GetStore().Add(pod)
@@ -482,7 +485,7 @@ func TestGCUnscheduledTerminating(t *testing.T) {
 				t.Errorf("Error while listing all Pods: %v", err)
 				return
 			}
-			gcc.gcUnscheduledTerminating(context.TODO(), pods)
+			gcc.gcUnscheduledTerminating(ctx, pods)
 			verifyDeletedAndPatchedPods(t, client, test.deletedPodNames, test.patchedPodNames)
 		})
 	}
@@ -608,6 +611,7 @@ func TestGCTerminating(t *testing.T) {
 	}
 	for _, test := range testCases {
 		t.Run(test.name, func(t *testing.T) {
+			_, ctx := ktesting.NewTestContext(t)
 			defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.PodDisruptionConditions, test.enablePodDisruptionConditions)()
 
 			creationTime := time.Unix(0, 0)
@@ -640,7 +644,7 @@ func TestGCTerminating(t *testing.T) {
 				})
 			}
 			client := setupNewSimpleClient(nodes, pods)
-			gcc, podInformer, nodeInformer := NewFromClient(client, -1)
+			gcc, podInformer, nodeInformer := NewFromClient(ctx, client, -1)
 
 			for _, pod := range pods {
 				podInformer.Informer().GetStore().Add(pod)
@@ -649,7 +653,7 @@ func TestGCTerminating(t *testing.T) {
 				nodeInformer.Informer().GetStore().Add(node)
 			}
 
-			gcc.gc(context.TODO())
+			gcc.gc(ctx)
 			verifyDeletedAndPatchedPods(t, client, test.deletedPodNames, test.patchedPodNames)
 		})
 	}
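
On the test side, each t.Run body now creates its context with ktesting.NewTestContext(t) instead of passing context.TODO() around; the returned context carries a logger bound to the testing.T, so controller log output lands in the test log. The sketch below shows only that pattern; collectGarbage and TestCollectGarbage are hypothetical stand-ins, not the real podgc tests.

package podgc_test

import (
	"context"
	"testing"

	"k8s.io/klog/v2"
	"k8s.io/klog/v2/ktesting"
)

// collectGarbage stands in for a controller method such as gcc.gc(ctx) in the
// real tests; it only demonstrates that the logger travels inside the context.
func collectGarbage(ctx context.Context) {
	klog.FromContext(ctx).Info("Garbage collecting pods", "numPods", 0)
}

func TestCollectGarbage(t *testing.T) {
	// NewTestContext returns a logger bound to t plus a context that already
	// carries that logger, mirroring the `_, ctx := ktesting.NewTestContext(t)`
	// lines added in this commit.
	_, ctx := ktesting.NewTestContext(t)

	// The code under test gets a real context instead of context.TODO(), and
	// anything it logs is attributed to this test case.
	collectGarbage(ctx)
}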