diff --git a/pkg/kubelet/metrics/metrics.go b/pkg/kubelet/metrics/metrics.go index dcf1e9dbacc..c1b38795338 100644 --- a/pkg/kubelet/metrics/metrics.go +++ b/pkg/kubelet/metrics/metrics.go @@ -39,6 +39,7 @@ const ( CgroupManagerOperationsKey = "cgroup_manager_latency_microseconds" PodWorkerStartLatencyKey = "pod_worker_start_latency_microseconds" PLEGRelistLatencyKey = "pleg_relist_latency_microseconds" + PLEGDiscardEventsKey = "pleg_discard_events" PLEGRelistIntervalKey = "pleg_relist_interval_microseconds" EvictionStatsAgeKey = "eviction_stats_age_microseconds" VolumeStatsCapacityBytesKey = "volume_stats_capacity_bytes" @@ -124,6 +125,14 @@ var ( Help: "Latency in microseconds for relisting pods in PLEG.", }, ) + PLEGDiscardEvents = prometheus.NewCounterVec( + prometheus.CounterOpts{ + Subsystem: KubeletSubsystem, + Name: PLEGDiscardEventsKey, + Help: "The number of discard events in PLEG.", + }, + []string{}, + ) PLEGRelistInterval = prometheus.NewSummary( prometheus.SummaryOpts{ Subsystem: KubeletSubsystem, @@ -246,6 +255,7 @@ func Register(containerCache kubecontainer.RuntimeCache, collectors ...prometheu prometheus.MustRegister(ContainersPerPodCount) prometheus.MustRegister(newPodAndContainerCollector(containerCache)) prometheus.MustRegister(PLEGRelistLatency) + prometheus.MustRegister(PLEGDiscardEvents) prometheus.MustRegister(PLEGRelistInterval) prometheus.MustRegister(RuntimeOperations) prometheus.MustRegister(RuntimeOperationsLatency) diff --git a/pkg/kubelet/pleg/generic.go b/pkg/kubelet/pleg/generic.go index 4ae4bf0ad8b..bce9c7ea20e 100644 --- a/pkg/kubelet/pleg/generic.go +++ b/pkg/kubelet/pleg/generic.go @@ -265,7 +265,12 @@ func (g *GenericPLEG) relist() { if events[i].Type == ContainerChanged { continue } - g.eventChannel <- events[i] + select { + case g.eventChannel <- events[i]: + default: + metrics.PLEGDiscardEvents.WithLabelValues().Inc() + klog.Error("event channel is full, discard this relist() cycle event") + } } } diff --git a/pkg/kubelet/pleg/generic_test.go b/pkg/kubelet/pleg/generic_test.go index 2e972ed4521..860ff2934b8 100644 --- a/pkg/kubelet/pleg/generic_test.go +++ b/pkg/kubelet/pleg/generic_test.go @@ -34,6 +34,8 @@ import ( const ( testContainerRuntimeType = "fooRuntime" + // largeChannelCap is a large enough capacity to hold all events in a single test. + largeChannelCap = 100 ) type TestGenericPLEG struct { @@ -43,6 +45,10 @@ type TestGenericPLEG struct { } func newTestGenericPLEG() *TestGenericPLEG { + return newTestGenericPLEGWithChannelSize(largeChannelCap) +} + +func newTestGenericPLEGWithChannelSize(eventChannelCap int) *TestGenericPLEG { fakeRuntime := &containertest.FakeRuntime{} clock := clock.NewFakeClock(time.Time{}) // The channel capacity should be large enough to hold all events in a @@ -50,7 +56,7 @@ func newTestGenericPLEG() *TestGenericPLEG { pleg := &GenericPLEG{ relistPeriod: time.Hour, runtime: fakeRuntime, - eventChannel: make(chan *PodLifecycleEvent, 100), + eventChannel: make(chan *PodLifecycleEvent, eventChannelCap), podRecords: make(podRecords), clock: clock, } @@ -157,6 +163,65 @@ func TestRelisting(t *testing.T) { verifyEvents(t, expected, actual) } +// TestEventChannelFull test when channel is full, the events will be discard. +func TestEventChannelFull(t *testing.T) { + testPleg := newTestGenericPLEGWithChannelSize(4) + pleg, runtime := testPleg.pleg, testPleg.runtime + ch := pleg.Watch() + // The first relist should send a PodSync event to each pod. + runtime.AllPodList = []*containertest.FakePod{ + {Pod: &kubecontainer.Pod{ + ID: "1234", + Containers: []*kubecontainer.Container{ + createTestContainer("c1", kubecontainer.ContainerStateExited), + createTestContainer("c2", kubecontainer.ContainerStateRunning), + createTestContainer("c3", kubecontainer.ContainerStateUnknown), + }, + }}, + {Pod: &kubecontainer.Pod{ + ID: "4567", + Containers: []*kubecontainer.Container{ + createTestContainer("c1", kubecontainer.ContainerStateExited), + }, + }}, + } + pleg.relist() + // Report every running/exited container if we see them for the first time. + expected := []*PodLifecycleEvent{ + {ID: "1234", Type: ContainerStarted, Data: "c2"}, + {ID: "4567", Type: ContainerDied, Data: "c1"}, + {ID: "1234", Type: ContainerDied, Data: "c1"}, + } + actual := getEventsFromChannel(ch) + verifyEvents(t, expected, actual) + + runtime.AllPodList = []*containertest.FakePod{ + {Pod: &kubecontainer.Pod{ + ID: "1234", + Containers: []*kubecontainer.Container{ + createTestContainer("c2", kubecontainer.ContainerStateExited), + createTestContainer("c3", kubecontainer.ContainerStateRunning), + }, + }}, + {Pod: &kubecontainer.Pod{ + ID: "4567", + Containers: []*kubecontainer.Container{ + createTestContainer("c4", kubecontainer.ContainerStateRunning), + }, + }}, + } + pleg.relist() + // event channel is full, discard events + expected = []*PodLifecycleEvent{ + {ID: "1234", Type: ContainerRemoved, Data: "c1"}, + {ID: "1234", Type: ContainerDied, Data: "c2"}, + {ID: "1234", Type: ContainerStarted, Data: "c3"}, + {ID: "4567", Type: ContainerRemoved, Data: "c1"}, + } + actual = getEventsFromChannel(ch) + verifyEvents(t, expected, actual) +} + func TestDetectingContainerDeaths(t *testing.T) { // Vary the number of relists after the container started and before the // container died to account for the changes in pleg's internal states.