diff --git a/hack/golangci-hints.yaml b/hack/golangci-hints.yaml
index 28e54bd32fd..58b382f0b2c 100644
--- a/hack/golangci-hints.yaml
+++ b/hack/golangci-hints.yaml
@@ -152,6 +152,7 @@ linters-settings: # please keep this alphabetized
           contextual k8s.io/kubernetes/pkg/scheduler/.*
           contextual k8s.io/kubernetes/test/e2e/dra/.*
           contextual k8s.io/kubernetes/pkg/kubelet/cm/dra/.*
+          contextual k8s.io/kubernetes/pkg/kubelet/pleg/.*

           # As long as contextual logging is alpha or beta, all WithName, WithValues,
           # NewContext calls have to go through klog. Once it is GA, we can lift
diff --git a/hack/golangci-strict.yaml b/hack/golangci-strict.yaml
index 751a689bb39..aadba2975ce 100644
--- a/hack/golangci-strict.yaml
+++ b/hack/golangci-strict.yaml
@@ -198,6 +198,7 @@ linters-settings: # please keep this alphabetized
           contextual k8s.io/kubernetes/pkg/scheduler/.*
           contextual k8s.io/kubernetes/test/e2e/dra/.*
           contextual k8s.io/kubernetes/pkg/kubelet/cm/dra/.*
+          contextual k8s.io/kubernetes/pkg/kubelet/pleg/.*

           # As long as contextual logging is alpha or beta, all WithName, WithValues,
           # NewContext calls have to go through klog. Once it is GA, we can lift
diff --git a/hack/golangci.yaml b/hack/golangci.yaml
index b9225b2aee5..cfa010446e2 100644
--- a/hack/golangci.yaml
+++ b/hack/golangci.yaml
@@ -201,6 +201,7 @@ linters-settings: # please keep this alphabetized
          contextual k8s.io/kubernetes/pkg/scheduler/.*
          contextual k8s.io/kubernetes/test/e2e/dra/.*
          contextual k8s.io/kubernetes/pkg/kubelet/cm/dra/.*
+         contextual k8s.io/kubernetes/pkg/kubelet/pleg/.*

          # As long as contextual logging is alpha or beta, all WithName, WithValues,
          # NewContext calls have to go through klog. Once it is GA, we can lift
diff --git a/hack/logcheck.conf b/hack/logcheck.conf
index 1ba8edd315f..01707811a3b 100644
--- a/hack/logcheck.conf
+++ b/hack/logcheck.conf
@@ -48,6 +48,7 @@ contextual k8s.io/kubernetes/pkg/controller/.*
 contextual k8s.io/kubernetes/pkg/scheduler/.*
 contextual k8s.io/kubernetes/test/e2e/dra/.*
 contextual k8s.io/kubernetes/pkg/kubelet/cm/dra/.*
+contextual k8s.io/kubernetes/pkg/kubelet/pleg/.*

 # As long as contextual logging is alpha or beta, all WithName, WithValues,
 # NewContext calls have to go through klog. Once it is GA, we can lift
diff --git a/pkg/kubelet/kubelet.go b/pkg/kubelet/kubelet.go
index 5d848de88fb..1094b410d8b 100644
--- a/pkg/kubelet/kubelet.go
+++ b/pkg/kubelet/kubelet.go
@@ -738,7 +738,7 @@ func NewMainKubelet(kubeCfg *kubeletconfiginternal.KubeletConfiguration,
 			RelistPeriod:    eventedPlegRelistPeriod,
 			RelistThreshold: eventedPlegRelistThreshold,
 		}
-		klet.pleg = pleg.NewGenericPLEG(klet.containerRuntime, eventChannel, genericRelistDuration, klet.podCache, clock.RealClock{})
+		klet.pleg = pleg.NewGenericPLEG(logger, klet.containerRuntime, eventChannel, genericRelistDuration, klet.podCache, clock.RealClock{})
 		// In case Evented PLEG has to fall back on Generic PLEG due to an error,
 		// Evented PLEG should be able to reset the Generic PLEG relisting duration
 		// to the default value.
@@ -746,7 +746,7 @@ func NewMainKubelet(kubeCfg *kubeletconfiginternal.KubeletConfiguration,
 			RelistPeriod:    genericPlegRelistPeriod,
 			RelistThreshold: genericPlegRelistThreshold,
 		}
-		klet.eventedPleg, err = pleg.NewEventedPLEG(klet.containerRuntime, klet.runtimeService, eventChannel,
+		klet.eventedPleg, err = pleg.NewEventedPLEG(logger, klet.containerRuntime, klet.runtimeService, eventChannel,
 			klet.podCache, klet.pleg, eventedPlegMaxStreamRetries, eventedRelistDuration, clock.RealClock{})
 		if err != nil {
 			return nil, err
@@ -756,7 +756,7 @@ func NewMainKubelet(kubeCfg *kubeletconfiginternal.KubeletConfiguration,
 			RelistPeriod:    genericPlegRelistPeriod,
 			RelistThreshold: genericPlegRelistThreshold,
 		}
-		klet.pleg = pleg.NewGenericPLEG(klet.containerRuntime, eventChannel, genericRelistDuration, klet.podCache, clock.RealClock{})
+		klet.pleg = pleg.NewGenericPLEG(logger, klet.containerRuntime, eventChannel, genericRelistDuration, klet.podCache, clock.RealClock{})
 	}

 	klet.runtimeState = newRuntimeState(maxWaitForContainerRuntime)
diff --git a/pkg/kubelet/kubelet_test.go b/pkg/kubelet/kubelet_test.go
index 1ad7b14fe87..0c4ae059ad0 100644
--- a/pkg/kubelet/kubelet_test.go
+++ b/pkg/kubelet/kubelet_test.go
@@ -334,7 +334,7 @@ func newTestKubeletWithImageList(
 	kubelet.resyncInterval = 10 * time.Second
 	kubelet.workQueue = queue.NewBasicWorkQueue(fakeClock)
 	// Relist period does not affect the tests.
-	kubelet.pleg = pleg.NewGenericPLEG(fakeRuntime, make(chan *pleg.PodLifecycleEvent, 100), &pleg.RelistDuration{RelistPeriod: time.Hour, RelistThreshold: genericPlegRelistThreshold}, kubelet.podCache, clock.RealClock{})
+	kubelet.pleg = pleg.NewGenericPLEG(logger, fakeRuntime, make(chan *pleg.PodLifecycleEvent, 100), &pleg.RelistDuration{RelistPeriod: time.Hour, RelistThreshold: genericPlegRelistThreshold}, kubelet.podCache, clock.RealClock{})
 	kubelet.clock = fakeClock

 	nodeRef := &v1.ObjectReference{
diff --git a/pkg/kubelet/pleg/evented.go b/pkg/kubelet/pleg/evented.go
index f3d0cf7d22d..f20f0a28007 100644
--- a/pkg/kubelet/pleg/evented.go
+++ b/pkg/kubelet/pleg/evented.go
@@ -83,10 +83,12 @@ type EventedPLEG struct {
 	stopCacheUpdateCh chan struct{}
 	// Locks the start/stop operation of the Evented PLEG.
 	runningMu sync.Mutex
+	// logger is used for contextual logging
+	logger klog.Logger
 }

 // NewEventedPLEG instantiates a new EventedPLEG object and return it.
-func NewEventedPLEG(runtime kubecontainer.Runtime, runtimeService internalapi.RuntimeService, eventChannel chan *PodLifecycleEvent,
+func NewEventedPLEG(logger klog.Logger, runtime kubecontainer.Runtime, runtimeService internalapi.RuntimeService, eventChannel chan *PodLifecycleEvent,
 	cache kubecontainer.Cache, genericPleg PodLifecycleEventGenerator, eventedPlegMaxStreamRetries int,
 	relistDuration *RelistDuration, clock clock.Clock) (PodLifecycleEventGenerator, error) {
 	handler, ok := genericPleg.(podLifecycleEventGeneratorHandler)
@@ -102,6 +104,7 @@ func NewEventedPLEG(runtime kubecontainer.Runtime, runtimeService internalapi.Ru
 		eventedPlegMaxStreamRetries: eventedPlegMaxStreamRetries,
 		relistDuration:              relistDuration,
 		clock:                       clock,
+		logger:                      logger,
 	}, nil
 }

@@ -184,7 +187,7 @@ func (e *EventedPLEG) watchEventsChannel() {
 			if numAttempts >= e.eventedPlegMaxStreamRetries {
 				if isEventedPLEGInUse() {
 					// Fall back to Generic PLEG relisting since Evented PLEG is not working.
-					klog.V(4).InfoS("Fall back to Generic PLEG relisting since Evented PLEG is not working")
+					e.logger.V(4).Info("Fall back to Generic PLEG relisting since Evented PLEG is not working")
 					e.Stop()
 					e.genericPleg.Stop()       // Stop the existing Generic PLEG which runs with longer relisting period when Evented PLEG is in use.
 					e.Update(e.relistDuration) // Update the relisting period to the default value for the Generic PLEG.
@@ -200,7 +203,7 @@ func (e *EventedPLEG) watchEventsChannel() {
 				metrics.EventedPLEGConnErr.Inc()
 				numAttempts++
 				e.Relist() // Force a relist to get the latest container and pods running metric.
-				klog.V(4).InfoS("Evented PLEG: Failed to get container events, retrying: ", "err", err)
+				e.logger.V(4).Info("Evented PLEG: Failed to get container events, retrying: ", "err", err)
 			}
 		}
 	}()
@@ -221,7 +224,7 @@ func (e *EventedPLEG) processCRIEvents(containerEventsResponseCh chan *runtimeap
 		// b) in worst case, a relist will eventually sync the pod status.
 		// TODO(#114371): Figure out a way to handle this case instead of ignoring.
 		if event.PodSandboxStatus == nil || event.PodSandboxStatus.Metadata == nil {
-			klog.ErrorS(nil, "Evented PLEG: received ContainerEventResponse with nil PodSandboxStatus or PodSandboxStatus.Metadata", "containerEventResponse", event)
+			e.logger.Error(nil, "Evented PLEG: received ContainerEventResponse with nil PodSandboxStatus or PodSandboxStatus.Metadata", "containerEventResponse", event)
 			continue
 		}

@@ -234,15 +237,15 @@ func (e *EventedPLEG) processCRIEvents(containerEventsResponseCh chan *runtimeap
 			// if branch is okay, we just use it to determine whether the
 			// additional "podStatus" key and its value should be added.
 			if klog.V(6).Enabled() {
-				klog.ErrorS(err, "Evented PLEG: error generating pod status from the received event", "podUID", podID, "podStatus", status)
+				e.logger.Error(err, "Evented PLEG: error generating pod status from the received event", "podUID", podID, "podStatus", status)
 			} else {
-				klog.ErrorS(err, "Evented PLEG: error generating pod status from the received event", "podUID", podID)
+				e.logger.Error(err, "Evented PLEG: error generating pod status from the received event", "podUID", podID)
 			}
 		} else {
-			if klogV := klog.V(6); klogV.Enabled() {
-				klogV.InfoS("Evented PLEG: Generated pod status from the received event", "podUID", podID, "podStatus", status)
+			if klogV := e.logger.V(6); klogV.Enabled() {
+				klogV.Info("Evented PLEG: Generated pod status from the received event", "podUID", podID, "podStatus", status)
 			} else {
-				klog.V(4).InfoS("Evented PLEG: Generated pod status from the received event", "podUID", podID)
+				e.logger.V(4).Info("Evented PLEG: Generated pod status from the received event", "podUID", podID)
 			}
 			// Preserve the pod IP across cache updates if the new IP is empty.
 			// When a pod is torn down, kubelet may race with PLEG and retrieve
@@ -282,23 +285,23 @@ func (e *EventedPLEG) processCRIEvent(event *runtimeapi.ContainerEventResponse)
 	switch event.ContainerEventType {
 	case runtimeapi.ContainerEventType_CONTAINER_STOPPED_EVENT:
 		e.sendPodLifecycleEvent(&PodLifecycleEvent{ID: types.UID(event.PodSandboxStatus.Metadata.Uid), Type: ContainerDied, Data: event.ContainerId})
-		klog.V(4).InfoS("Received Container Stopped Event", "event", event.String())
+		e.logger.V(4).Info("Received Container Stopped Event", "event", event.String())
 	case runtimeapi.ContainerEventType_CONTAINER_CREATED_EVENT:
 		// We only need to update the pod status on container create.
 		// But we don't have to generate any PodLifeCycleEvent. Container creation related
 		// PodLifeCycleEvent is ignored by the existing Generic PLEG as well.
 		// https://github.com/kubernetes/kubernetes/blob/24753aa8a4df8d10bfd6330e0f29186000c018be/pkg/kubelet/pleg/generic.go#L88 and
 		// https://github.com/kubernetes/kubernetes/blob/24753aa8a4df8d10bfd6330e0f29186000c018be/pkg/kubelet/pleg/generic.go#L273
-		klog.V(4).InfoS("Received Container Created Event", "event", event.String())
+		e.logger.V(4).Info("Received Container Created Event", "event", event.String())
 	case runtimeapi.ContainerEventType_CONTAINER_STARTED_EVENT:
 		e.sendPodLifecycleEvent(&PodLifecycleEvent{ID: types.UID(event.PodSandboxStatus.Metadata.Uid), Type: ContainerStarted, Data: event.ContainerId})
-		klog.V(4).InfoS("Received Container Started Event", "event", event.String())
+		e.logger.V(4).Info("Received Container Started Event", "event", event.String())
 	case runtimeapi.ContainerEventType_CONTAINER_DELETED_EVENT:
 		// In case the pod is deleted it is safe to generate both ContainerDied and ContainerRemoved events, just like in the case of
 		// Generic PLEG. https://github.com/kubernetes/kubernetes/blob/24753aa8a4df8d10bfd6330e0f29186000c018be/pkg/kubelet/pleg/generic.go#L169
 		e.sendPodLifecycleEvent(&PodLifecycleEvent{ID: types.UID(event.PodSandboxStatus.Metadata.Uid), Type: ContainerDied, Data: event.ContainerId})
 		e.sendPodLifecycleEvent(&PodLifecycleEvent{ID: types.UID(event.PodSandboxStatus.Metadata.Uid), Type: ContainerRemoved, Data: event.ContainerId})
-		klog.V(4).InfoS("Received Container Deleted Event", "event", event)
+		e.logger.V(4).Info("Received Container Deleted Event", "event", event)
 	}
 }

@@ -330,7 +333,7 @@ func (e *EventedPLEG) sendPodLifecycleEvent(event *PodLifecycleEvent) {
 	default:
 		// record how many events were discarded due to channel out of capacity
 		metrics.PLEGDiscardEvents.Inc()
-		klog.ErrorS(nil, "Evented PLEG: Event channel is full, discarded pod lifecycle event")
+		e.logger.Error(nil, "Evented PLEG: Event channel is full, discarded pod lifecycle event")
 	}
 }

@@ -356,7 +359,7 @@ func getPodSandboxState(podStatus *kubecontainer.PodStatus) kubecontainer.State
 func (e *EventedPLEG) updateRunningPodMetric(podStatus *kubecontainer.PodStatus) {
 	cachedPodStatus, err := e.cache.Get(podStatus.ID)
 	if err != nil {
-		klog.ErrorS(err, "Evented PLEG: Get cache", "podID", podStatus.ID)
+		e.logger.Error(err, "Evented PLEG: Get cache", "podID", podStatus.ID)
 	}
 	// cache miss condition: The pod status object will have empty state if missed in cache
 	if len(cachedPodStatus.SandboxStatuses) < 1 {
@@ -387,7 +390,7 @@ func getContainerStateCount(podStatus *kubecontainer.PodStatus) map[kubecontaine
 func (e *EventedPLEG) updateRunningContainerMetric(podStatus *kubecontainer.PodStatus) {
 	cachedPodStatus, err := e.cache.Get(podStatus.ID)
 	if err != nil {
-		klog.ErrorS(err, "Evented PLEG: Get cache", "podID", podStatus.ID)
+		e.logger.Error(err, "Evented PLEG: Get cache", "podID", podStatus.ID)
 	}

 	// cache miss condition: The pod status object will have empty state if missed in cache
diff --git a/pkg/kubelet/pleg/generic.go b/pkg/kubelet/pleg/generic.go
index a020101920c..a8082721496 100644
--- a/pkg/kubelet/pleg/generic.go
+++ b/pkg/kubelet/pleg/generic.go
@@ -78,6 +78,8 @@ type GenericPLEG struct {
 	relistDuration *RelistDuration
 	// Mutex to serialize updateCache called by relist vs UpdateCache interface
 	podCacheMutex sync.Mutex
+	// logger is used for contextual logging
+	logger klog.Logger
 }

 // plegContainerState has a one-to-one mapping to the
@@ -116,10 +118,11 @@ type podRecord struct {
 type podRecords map[types.UID]*podRecord

 // NewGenericPLEG instantiates a new GenericPLEG object and return it.
-func NewGenericPLEG(runtime kubecontainer.Runtime, eventChannel chan *PodLifecycleEvent,
+func NewGenericPLEG(logger klog.Logger, runtime kubecontainer.Runtime, eventChannel chan *PodLifecycleEvent,
 	relistDuration *RelistDuration, cache kubecontainer.Cache,
 	clock clock.Clock) PodLifecycleEventGenerator {
 	return &GenericPLEG{
+		logger:         logger,
 		relistDuration: relistDuration,
 		runtime:        runtime,
 		eventChannel:   eventChannel,
@@ -176,12 +179,12 @@ func (g *GenericPLEG) Healthy() (bool, error) {
 	return true, nil
 }

-func generateEvents(podID types.UID, cid string, oldState, newState plegContainerState) []*PodLifecycleEvent {
+func generateEvents(logger klog.Logger, podID types.UID, cid string, oldState, newState plegContainerState) []*PodLifecycleEvent {
 	if newState == oldState {
 		return nil
 	}

-	klog.V(4).InfoS("GenericPLEG", "podUID", podID, "containerID", cid, "oldState", oldState, "newState", newState)
+	logger.V(4).Info("GenericPLEG", "podUID", podID, "containerID", cid, "oldState", oldState, "newState", newState)
 	switch newState {
 	case plegContainerRunning:
 		return []*PodLifecycleEvent{{ID: podID, Type: ContainerStarted, Data: cid}}
@@ -221,7 +224,8 @@ func (g *GenericPLEG) Relist() {
 	defer g.relistLock.Unlock()

 	ctx := context.Background()
-	klog.V(5).InfoS("GenericPLEG: Relisting")
+
+	g.logger.V(5).Info("GenericPLEG: Relisting")

 	if lastRelistTime := g.getRelistTime(); !lastRelistTime.IsZero() {
 		metrics.PLEGRelistInterval.Observe(metrics.SinceInSeconds(lastRelistTime))
@@ -235,7 +239,7 @@ func (g *GenericPLEG) Relist() {
 	// Get all the pods.
 	podList, err := g.runtime.GetPods(ctx, true)
 	if err != nil {
-		klog.ErrorS(err, "GenericPLEG: Unable to retrieve pods")
+		g.logger.Error(err, "GenericPLEG: Unable to retrieve pods")
 		return
 	}

@@ -254,7 +258,7 @@ func (g *GenericPLEG) Relist() {
 		// Get all containers in the old and the new pod.
 		allContainers := getContainersFromPods(oldPod, pod)
 		for _, container := range allContainers {
-			events := computeEvents(oldPod, pod, &container.ID)
+			events := computeEvents(g.logger, oldPod, pod, &container.ID)
 			for _, e := range events {
 				updateEvents(eventsByPodID, e)
 			}
@@ -282,7 +286,7 @@ func (g *GenericPLEG) Relist() {
 			// parallelize if needed.
 			if err, updated := g.updateCache(ctx, pod, pid); err != nil {
 				// Rely on updateCache calling GetPodStatus to log the actual error.
-				klog.V(4).ErrorS(err, "PLEG: Ignoring events for pod", "pod", klog.KRef(pod.Namespace, pod.Name))
+				g.logger.V(4).Error(err, "PLEG: Ignoring events for pod", "pod", klog.KRef(pod.Namespace, pod.Name))

 				// make sure we try to reinspect the pod during the next relisting
 				needsReinspection[pid] = pod
@@ -315,7 +319,7 @@ func (g *GenericPLEG) Relist() {
 			case g.eventChannel <- events[i]:
 			default:
 				metrics.PLEGDiscardEvents.Inc()
-				klog.ErrorS(nil, "Event channel is full, discard this relist() cycle event")
+				g.logger.Error(nil, "Event channel is full, discard this relist() cycle event")
 			}
 			// Log exit code of containers when they finished in a particular event
 			if events[i].Type == ContainerDied {
@@ -331,7 +335,7 @@ func (g *GenericPLEG) Relist() {
 				}
 				if containerID, ok := events[i].Data.(string); ok {
 					if exitCode, ok := containerExitCode[containerID]; ok && pod != nil {
-						klog.V(2).InfoS("Generic (PLEG): container finished", "podID", pod.ID, "containerID", containerID, "exitCode", exitCode)
+						g.logger.V(2).Info("Generic (PLEG): container finished", "podID", pod.ID, "containerID", containerID, "exitCode", exitCode)
 					}
 				}
 			}
@@ -341,11 +345,11 @@ func (g *GenericPLEG) Relist() {
 	if g.cacheEnabled() {
 		// reinspect any pods that failed inspection during the previous relist
 		if len(g.podsToReinspect) > 0 {
-			klog.V(5).InfoS("GenericPLEG: Reinspecting pods that previously failed inspection")
+			g.logger.V(5).Info("GenericPLEG: Reinspecting pods that previously failed inspection")
 			for pid, pod := range g.podsToReinspect {
 				if err, _ := g.updateCache(ctx, pod, pid); err != nil {
 					// Rely on updateCache calling GetPodStatus to log the actual error.
-					klog.V(5).ErrorS(err, "PLEG: pod failed reinspection", "pod", klog.KRef(pod.Namespace, pod.Name))
+					g.logger.V(5).Error(err, "PLEG: pod failed reinspection", "pod", klog.KRef(pod.Namespace, pod.Name))
 					needsReinspection[pid] = pod
 				}
 			}
@@ -386,7 +390,7 @@ func getContainersFromPods(pods ...*kubecontainer.Pod) []*kubecontainer.Containe
 	return containers
 }

-func computeEvents(oldPod, newPod *kubecontainer.Pod, cid *kubecontainer.ContainerID) []*PodLifecycleEvent {
+func computeEvents(logger klog.Logger, oldPod, newPod *kubecontainer.Pod, cid *kubecontainer.ContainerID) []*PodLifecycleEvent {
 	var pid types.UID
 	if oldPod != nil {
 		pid = oldPod.ID
@@ -395,7 +399,7 @@ func computeEvents(oldPod, newPod *kubecontainer.Pod, cid *kubecontainer.Contain
 	}
 	oldState := getContainerState(oldPod, cid)
 	newState := getContainerState(newPod, cid)
-	return generateEvents(pid, cid.ID, oldState, newState)
+	return generateEvents(logger, pid, cid.ID, oldState, newState)
 }

 func (g *GenericPLEG) cacheEnabled() bool {
@@ -433,7 +437,7 @@ func (g *GenericPLEG) updateCache(ctx context.Context, pod *kubecontainer.Pod, p
 	if pod == nil {
 		// The pod is missing in the current relist. This means that
 		// the pod has no visible (active or inactive) containers.
-		klog.V(4).InfoS("PLEG: Delete status for pod", "podUID", string(pid))
+		g.logger.V(4).Info("PLEG: Delete status for pod", "podUID", string(pid))
 		g.cache.Delete(pid)
 		return nil, true
 	}
@@ -448,15 +452,15 @@ func (g *GenericPLEG) updateCache(ctx context.Context, pod *kubecontainer.Pod, p
 		// if branch is okay, we just use it to determine whether the
 		// additional "podStatus" key and its value should be added.
 		if klog.V(6).Enabled() {
-			klog.ErrorS(err, "PLEG: Write status", "pod", klog.KRef(pod.Namespace, pod.Name), "podStatus", status)
+			g.logger.Error(err, "PLEG: Write status", "pod", klog.KRef(pod.Namespace, pod.Name), "podStatus", status)
 		} else {
-			klog.ErrorS(err, "PLEG: Write status", "pod", klog.KRef(pod.Namespace, pod.Name))
+			g.logger.Error(err, "PLEG: Write status", "pod", klog.KRef(pod.Namespace, pod.Name))
 		}
 	} else {
-		if klogV := klog.V(6); klogV.Enabled() {
-			klogV.InfoS("PLEG: Write status", "pod", klog.KRef(pod.Namespace, pod.Name), "podStatus", status)
+		if klogV := g.logger.V(6); klogV.Enabled() {
+			klogV.Info("PLEG: Write status", "pod", klog.KRef(pod.Namespace, pod.Name), "podStatus", status)
 		} else {
-			klog.V(4).InfoS("PLEG: Write status", "pod", klog.KRef(pod.Namespace, pod.Name))
+			g.logger.V(4).Info("PLEG: Write status", "pod", klog.KRef(pod.Namespace, pod.Name))
 		}
 		// Preserve the pod IP across cache updates if the new IP is empty.
 		// When a pod is torn down, kubelet may race with PLEG and retrieve
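
Note (not part of the diff above): a minimal sketch of how a caller might supply the klog.Logger that NewGenericPLEG and NewEventedPLEG now take as their first argument. The helper name newPLEGLogger and the test body are illustrative assumptions, not code from this change; klog.Background(), klog.TODO(), and ktesting.NewTestContext are the standard contextual-logging entry points.

```go
package pleg_test

import (
	"testing"

	"k8s.io/klog/v2"
	"k8s.io/klog/v2/ktesting"
)

// newPLEGLogger is a hypothetical helper showing where a production caller
// (for example, NewMainKubelet) could obtain the logger it now passes to
// pleg.NewGenericPLEG / pleg.NewEventedPLEG.
func newPLEGLogger() klog.Logger {
	// klog.TODO() is the usual placeholder for call sites that have not yet
	// been migrated to contextual logging; klog.Background() is the global
	// fallback logger.
	return klog.Background()
}

// TestPLEGLogger sketches the test-side wiring: ktesting routes output
// through t.Log, so per-test log capture works for the migrated pleg package.
func TestPLEGLogger(t *testing.T) {
	logger, _ := ktesting.NewTestContext(t)
	// e.g. kubelet.pleg = pleg.NewGenericPLEG(logger, fakeRuntime, eventCh,
	//          &pleg.RelistDuration{...}, kubelet.podCache, clock.RealClock{})
	logger.V(4).Info("GenericPLEG", "podUID", "example", "oldState", "unknown", "newState", "running")
}
```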