diff --git a/pkg/kubelet/kubelet.go b/pkg/kubelet/kubelet.go index c750d036676..ad96e3d2592 100644 --- a/pkg/kubelet/kubelet.go +++ b/pkg/kubelet/kubelet.go @@ -540,7 +540,10 @@ func NewMainKubelet(kubeCfg *kubeletconfiginternal.KubeletConfiguration, containerRefManager := kubecontainer.NewRefManager() - oomWatcher := oomwatcher.NewWatcher(kubeDeps.Recorder) + oomWatcher, err := oomwatcher.NewWatcher(kubeDeps.Recorder) + if err != nil { + return nil, err + } clusterDNS := make([]net.IP, 0, len(kubeCfg.ClusterDNS)) for _, ipEntry := range kubeCfg.ClusterDNS { diff --git a/pkg/kubelet/oom/BUILD b/pkg/kubelet/oom/BUILD index 0a550c46458..d9e826b155c 100644 --- a/pkg/kubelet/oom/BUILD +++ b/pkg/kubelet/oom/BUILD @@ -66,11 +66,13 @@ go_test( "@io_bazel_rules_go//go/platform:android": [ "//staging/src/k8s.io/api/core/v1:go_default_library", "//staging/src/k8s.io/client-go/tools/record:go_default_library", + "//vendor/github.com/google/cadvisor/utils/oomparser:go_default_library", "//vendor/github.com/stretchr/testify/assert:go_default_library", ], "@io_bazel_rules_go//go/platform:linux": [ "//staging/src/k8s.io/api/core/v1:go_default_library", "//staging/src/k8s.io/client-go/tools/record:go_default_library", + "//vendor/github.com/google/cadvisor/utils/oomparser:go_default_library", "//vendor/github.com/stretchr/testify/assert:go_default_library", ], "//conditions:default": [], diff --git a/pkg/kubelet/oom/oom_watcher_linux.go b/pkg/kubelet/oom/oom_watcher_linux.go index cf015f841ff..11724ad592f 100644 --- a/pkg/kubelet/oom/oom_watcher_linux.go +++ b/pkg/kubelet/oom/oom_watcher_linux.go @@ -29,29 +29,41 @@ import ( "github.com/google/cadvisor/utils/oomparser" ) +type streamer interface { + StreamOoms(chan<- *oomparser.OomInstance) +} + +var _ streamer = &oomparser.OomParser{} + type realWatcher struct { - recorder record.EventRecorder + recorder record.EventRecorder + oomStreamer streamer } var _ Watcher = &realWatcher{} -// NewWatcher creates and initializes a OOMWatcher based on parameters. -func NewWatcher(recorder record.EventRecorder) Watcher { - return &realWatcher{ - recorder: recorder, +// NewWatcher creates and initializes a OOMWatcher backed by Cadvisor as +// the oom streamer. +func NewWatcher(recorder record.EventRecorder) (Watcher, error) { + oomStreamer, err := oomparser.New() + if err != nil { + return nil, err } + + watcher := &realWatcher{ + recorder: recorder, + oomStreamer: oomStreamer, + } + + return watcher, nil } const systemOOMEvent = "SystemOOM" // Start watches for system oom's and records an event for every system oom encountered. func (ow *realWatcher) Start(ref *v1.ObjectReference) error { - oomLog, err := oomparser.New() - if err != nil { - return err - } outStream := make(chan *oomparser.OomInstance, 10) - go oomLog.StreamOoms(outStream) + go ow.oomStreamer.StreamOoms(outStream) go func() { defer runtime.HandleCrash() diff --git a/pkg/kubelet/oom/oom_watcher_linux_test.go b/pkg/kubelet/oom/oom_watcher_linux_test.go index 0a09a2fd842..8411df37269 100644 --- a/pkg/kubelet/oom/oom_watcher_linux_test.go +++ b/pkg/kubelet/oom/oom_watcher_linux_test.go @@ -19,19 +19,26 @@ package oom import ( "testing" - "github.com/stretchr/testify/assert" - v1 "k8s.io/api/core/v1" "k8s.io/client-go/tools/record" + + "github.com/google/cadvisor/utils/oomparser" + "github.com/stretchr/testify/assert" ) // TestBasic verifies that the OOMWatch works without error. func TestBasic(t *testing.T) { fakeRecorder := &record.FakeRecorder{} node := &v1.ObjectReference{} - oomWatcher := NewWatcher(fakeRecorder) - assert.NoError(t, oomWatcher.Start(node)) - // TODO: Improve this test once cadvisor exports events.EventChannel as an interface - // and thereby allow using a mock version of cadvisor. + // TODO: Substitute this `oomStreamer` out for a fake, and then write + // more comprehensive unit tests of the actual behavior. + oomStreamer, err := oomparser.New() + assert.NoError(t, err) + + oomWatcher := &realWatcher{ + recorder: fakeRecorder, + oomStreamer: oomStreamer, + } + assert.NoError(t, oomWatcher.Start(node)) } diff --git a/pkg/kubelet/oom/oom_watcher_unsupported.go b/pkg/kubelet/oom/oom_watcher_unsupported.go index a18b19d7af2..47e97fad57d 100644 --- a/pkg/kubelet/oom/oom_watcher_unsupported.go +++ b/pkg/kubelet/oom/oom_watcher_unsupported.go @@ -28,8 +28,8 @@ type oomWatcherUnsupported struct{} var _ Watcher = new(oomWatcherUnsupported) // NewWatcher creates a fake one here -func NewWatcher(_ record.EventRecorder) Watcher { - return &oomWatcherUnsupported{} +func NewWatcher(_ record.EventRecorder) (Watcher, error) { + return &oomWatcherUnsupported{}, nil } func (ow *oomWatcherUnsupported) Start(_ *v1.ObjectReference) error {