kubelet: Migrate pkg/kubelet/oom to contextual logging

This commit is contained in:
Ed Bartosh 2024-10-30 16:56:18 +02:00
parent 95d71c464a
commit f622be0333
9 changed files with 29 additions and 12 deletions

View File

@ -168,6 +168,7 @@ linters-settings: # please keep this alphabetized
contextual k8s.io/kubernetes/pkg/kubelet/clustertrustbundle/.* contextual k8s.io/kubernetes/pkg/kubelet/clustertrustbundle/.*
contextual k8s.io/kubernetes/pkg/kubelet/token/.* contextual k8s.io/kubernetes/pkg/kubelet/token/.*
contextual k8s.io/kubernetes/pkg/kubelet/cadvisor/.* contextual k8s.io/kubernetes/pkg/kubelet/cadvisor/.*
contextual k8s.io/kubernetes/pkg/kubelet/oom/.*
# As long as contextual logging is alpha or beta, all WithName, WithValues, # As long as contextual logging is alpha or beta, all WithName, WithValues,
# NewContext calls have to go through klog. Once it is GA, we can lift # NewContext calls have to go through klog. Once it is GA, we can lift

View File

@ -214,6 +214,7 @@ linters-settings: # please keep this alphabetized
contextual k8s.io/kubernetes/pkg/kubelet/clustertrustbundle/.* contextual k8s.io/kubernetes/pkg/kubelet/clustertrustbundle/.*
contextual k8s.io/kubernetes/pkg/kubelet/token/.* contextual k8s.io/kubernetes/pkg/kubelet/token/.*
contextual k8s.io/kubernetes/pkg/kubelet/cadvisor/.* contextual k8s.io/kubernetes/pkg/kubelet/cadvisor/.*
contextual k8s.io/kubernetes/pkg/kubelet/oom/.*
# As long as contextual logging is alpha or beta, all WithName, WithValues, # As long as contextual logging is alpha or beta, all WithName, WithValues,
# NewContext calls have to go through klog. Once it is GA, we can lift # NewContext calls have to go through klog. Once it is GA, we can lift

View File

@ -216,6 +216,7 @@ linters-settings: # please keep this alphabetized
contextual k8s.io/kubernetes/pkg/kubelet/clustertrustbundle/.* contextual k8s.io/kubernetes/pkg/kubelet/clustertrustbundle/.*
contextual k8s.io/kubernetes/pkg/kubelet/token/.* contextual k8s.io/kubernetes/pkg/kubelet/token/.*
contextual k8s.io/kubernetes/pkg/kubelet/cadvisor/.* contextual k8s.io/kubernetes/pkg/kubelet/cadvisor/.*
contextual k8s.io/kubernetes/pkg/kubelet/oom/.*
# As long as contextual logging is alpha or beta, all WithName, WithValues, # As long as contextual logging is alpha or beta, all WithName, WithValues,
# NewContext calls have to go through klog. Once it is GA, we can lift # NewContext calls have to go through klog. Once it is GA, we can lift

View File

@ -52,6 +52,7 @@ contextual k8s.io/kubernetes/pkg/kubelet/pleg/.*
contextual k8s.io/kubernetes/pkg/kubelet/clustertrustbundle/.* contextual k8s.io/kubernetes/pkg/kubelet/clustertrustbundle/.*
contextual k8s.io/kubernetes/pkg/kubelet/token/.* contextual k8s.io/kubernetes/pkg/kubelet/token/.*
contextual k8s.io/kubernetes/pkg/kubelet/cadvisor/.* contextual k8s.io/kubernetes/pkg/kubelet/cadvisor/.*
contextual k8s.io/kubernetes/pkg/kubelet/oom/.*
# As long as contextual logging is alpha or beta, all WithName, WithValues, # As long as contextual logging is alpha or beta, all WithName, WithValues,
# NewContext calls have to go through klog. Once it is GA, we can lift # NewContext calls have to go through klog. Once it is GA, we can lift

View File

@ -1575,7 +1575,7 @@ func (kl *Kubelet) StartGarbageCollection() {
// initializeModules will initialize internal modules that do not require the container runtime to be up. // initializeModules will initialize internal modules that do not require the container runtime to be up.
// Note that the modules here must not depend on modules that are not initialized here. // Note that the modules here must not depend on modules that are not initialized here.
func (kl *Kubelet) initializeModules() error { func (kl *Kubelet) initializeModules(ctx context.Context) error {
// Prometheus metrics. // Prometheus metrics.
metrics.Register( metrics.Register(
collectors.NewVolumeStatsCollector(kl), collectors.NewVolumeStatsCollector(kl),
@ -1614,7 +1614,7 @@ func (kl *Kubelet) initializeModules() error {
// Start out of memory watcher. // Start out of memory watcher.
if kl.oomWatcher != nil { if kl.oomWatcher != nil {
if err := kl.oomWatcher.Start(kl.nodeRef); err != nil { if err := kl.oomWatcher.Start(ctx, kl.nodeRef); err != nil {
return fmt.Errorf("failed to start OOM watcher: %w", err) return fmt.Errorf("failed to start OOM watcher: %w", err)
} }
} }
@ -1721,7 +1721,7 @@ func (kl *Kubelet) Run(updates <-chan kubetypes.PodUpdate) {
go kl.cloudResourceSyncManager.Run(wait.NeverStop) go kl.cloudResourceSyncManager.Run(wait.NeverStop)
} }
if err := kl.initializeModules(); err != nil { if err := kl.initializeModules(ctx); err != nil {
kl.recorder.Eventf(kl.nodeRef, v1.EventTypeWarning, events.KubeletSetupFailed, err.Error()) kl.recorder.Eventf(kl.nodeRef, v1.EventTypeWarning, events.KubeletSetupFailed, err.Error())
klog.ErrorS(err, "Failed to initialize internal modules") klog.ErrorS(err, "Failed to initialize internal modules")
os.Exit(1) os.Exit(1)

View File

@ -20,6 +20,7 @@ limitations under the License.
package oom package oom
import ( import (
"context"
"fmt" "fmt"
v1 "k8s.io/api/core/v1" v1 "k8s.io/api/core/v1"
@ -71,16 +72,17 @@ const (
) )
// Start watches for system oom's and records an event for every system oom encountered. // Start watches for system oom's and records an event for every system oom encountered.
func (ow *realWatcher) Start(ref *v1.ObjectReference) error { func (ow *realWatcher) Start(ctx context.Context, ref *v1.ObjectReference) error {
outStream := make(chan *oomparser.OomInstance, 10) outStream := make(chan *oomparser.OomInstance, 10)
go ow.oomStreamer.StreamOoms(outStream) go ow.oomStreamer.StreamOoms(outStream)
go func() { go func() {
logger := klog.FromContext(ctx)
defer runtime.HandleCrash() defer runtime.HandleCrash()
for event := range outStream { for event := range outStream {
if event.VictimContainerName == recordEventContainerName { if event.VictimContainerName == recordEventContainerName {
klog.V(1).InfoS("Got sys oom event", "event", event) logger.V(1).Info("Got sys oom event", "event", event)
eventMsg := "System OOM encountered" eventMsg := "System OOM encountered"
if event.ProcessName != "" && event.Pid != 0 { if event.ProcessName != "" && event.Pid != 0 {
eventMsg = fmt.Sprintf("%s, victim process: %s, pid: %d", eventMsg, event.ProcessName, event.Pid) eventMsg = fmt.Sprintf("%s, victim process: %s, pid: %d", eventMsg, event.ProcessName, event.Pid)
@ -88,7 +90,7 @@ func (ow *realWatcher) Start(ref *v1.ObjectReference) error {
ow.recorder.Eventf(ref, v1.EventTypeWarning, systemOOMEvent, eventMsg) ow.recorder.Eventf(ref, v1.EventTypeWarning, systemOOMEvent, eventMsg)
} }
} }
klog.ErrorS(nil, "Unexpectedly stopped receiving OOM notifications") logger.Error(nil, "Unexpectedly stopped receiving OOM notifications")
}() }()
return nil return nil
} }

View File

@ -23,6 +23,7 @@ import (
v1 "k8s.io/api/core/v1" v1 "k8s.io/api/core/v1"
"k8s.io/client-go/tools/record" "k8s.io/client-go/tools/record"
"k8s.io/kubernetes/test/utils/ktesting"
"github.com/google/cadvisor/utils/oomparser" "github.com/google/cadvisor/utils/oomparser"
"github.com/stretchr/testify/assert" "github.com/stretchr/testify/assert"
@ -41,6 +42,7 @@ func (fs *fakeStreamer) StreamOoms(outStream chan<- *oomparser.OomInstance) {
// TestWatcherRecordsEventsForOomEvents ensures that our OomInstances coming // TestWatcherRecordsEventsForOomEvents ensures that our OomInstances coming
// from `StreamOoms` are translated into events in our recorder. // from `StreamOoms` are translated into events in our recorder.
func TestWatcherRecordsEventsForOomEvents(t *testing.T) { func TestWatcherRecordsEventsForOomEvents(t *testing.T) {
tCtx := ktesting.Init(t)
oomInstancesToStream := []*oomparser.OomInstance{ oomInstancesToStream := []*oomparser.OomInstance{
{ {
Pid: 1000, Pid: 1000,
@ -63,7 +65,7 @@ func TestWatcherRecordsEventsForOomEvents(t *testing.T) {
recorder: fakeRecorder, recorder: fakeRecorder,
oomStreamer: fakeStreamer, oomStreamer: fakeStreamer,
} }
assert.NoError(t, oomWatcher.Start(node)) assert.NoError(t, oomWatcher.Start(tCtx, node))
eventsRecorded := getRecordedEvents(fakeRecorder, numExpectedOomEvents) eventsRecorded := getRecordedEvents(fakeRecorder, numExpectedOomEvents)
assert.Len(t, eventsRecorded, numExpectedOomEvents) assert.Len(t, eventsRecorded, numExpectedOomEvents)
@ -92,6 +94,7 @@ func getRecordedEvents(fakeRecorder *record.FakeRecorder, numExpectedOomEvents i
func TestWatcherRecordsEventsForOomEventsCorrectContainerName(t *testing.T) { func TestWatcherRecordsEventsForOomEventsCorrectContainerName(t *testing.T) {
// By "incorrect" container name, we mean a container name for which we // By "incorrect" container name, we mean a container name for which we
// don't want to record an oom event. // don't want to record an oom event.
tCtx := ktesting.Init(t)
numOomEventsWithIncorrectContainerName := 1 numOomEventsWithIncorrectContainerName := 1
oomInstancesToStream := []*oomparser.OomInstance{ oomInstancesToStream := []*oomparser.OomInstance{
{ {
@ -122,7 +125,7 @@ func TestWatcherRecordsEventsForOomEventsCorrectContainerName(t *testing.T) {
recorder: fakeRecorder, recorder: fakeRecorder,
oomStreamer: fakeStreamer, oomStreamer: fakeStreamer,
} }
assert.NoError(t, oomWatcher.Start(node)) assert.NoError(t, oomWatcher.Start(tCtx, node))
eventsRecorded := getRecordedEvents(fakeRecorder, numExpectedOomEvents) eventsRecorded := getRecordedEvents(fakeRecorder, numExpectedOomEvents)
assert.Len(t, eventsRecorded, numExpectedOomEvents) assert.Len(t, eventsRecorded, numExpectedOomEvents)
@ -135,6 +138,8 @@ func TestWatcherRecordsEventsForOomEventsWithAdditionalInfo(t *testing.T) {
eventPid := 1000 eventPid := 1000
processName := "fakeProcess" processName := "fakeProcess"
tCtx := ktesting.Init(t)
oomInstancesToStream := []*oomparser.OomInstance{ oomInstancesToStream := []*oomparser.OomInstance{
{ {
Pid: eventPid, Pid: eventPid,
@ -157,7 +162,7 @@ func TestWatcherRecordsEventsForOomEventsWithAdditionalInfo(t *testing.T) {
recorder: fakeRecorder, recorder: fakeRecorder,
oomStreamer: fakeStreamer, oomStreamer: fakeStreamer,
} }
assert.NoError(t, oomWatcher.Start(node)) assert.NoError(t, oomWatcher.Start(tCtx, node))
eventsRecorded := getRecordedEvents(fakeRecorder, numExpectedOomEvents) eventsRecorded := getRecordedEvents(fakeRecorder, numExpectedOomEvents)

View File

@ -20,6 +20,8 @@ limitations under the License.
package oom package oom
import ( import (
"context"
v1 "k8s.io/api/core/v1" v1 "k8s.io/api/core/v1"
"k8s.io/client-go/tools/record" "k8s.io/client-go/tools/record"
) )
@ -33,6 +35,6 @@ func NewWatcher(_ record.EventRecorder) (Watcher, error) {
return &oomWatcherUnsupported{}, nil return &oomWatcherUnsupported{}, nil
} }
func (ow *oomWatcherUnsupported) Start(_ *v1.ObjectReference) error { func (ow *oomWatcherUnsupported) Start(_ context.Context, _ *v1.ObjectReference) error {
return nil return nil
} }

View File

@ -16,9 +16,13 @@ limitations under the License.
package oom package oom
import v1 "k8s.io/api/core/v1" import (
"context"
v1 "k8s.io/api/core/v1"
)
// Watcher defines the interface of OOM watchers. // Watcher defines the interface of OOM watchers.
type Watcher interface { type Watcher interface {
Start(ref *v1.ObjectReference) error Start(ctx context.Context, ref *v1.ObjectReference) error
} }