From b52afc350ff93cf2b26831ef4b65e2610600d782 Mon Sep 17 00:00:00 2001 From: changyaowei <156511344@qq.com> Date: Wed, 30 Jan 2019 20:43:54 +0800 Subject: [PATCH 1/3] when pleg channel is full, discard events and record how many events are discarded --- pkg/kubelet/metrics/metrics.go | 10 +++ pkg/kubelet/pleg/generic.go | 7 +- pkg/kubelet/pleg/generic_test.go | 107 +++++++++++++++++++++++++++++-- 3 files changed, 116 insertions(+), 8 deletions(-) diff --git a/pkg/kubelet/metrics/metrics.go b/pkg/kubelet/metrics/metrics.go index ec1a7166c30..b3c216138cc 100644 --- a/pkg/kubelet/metrics/metrics.go +++ b/pkg/kubelet/metrics/metrics.go @@ -36,6 +36,7 @@ const ( CgroupManagerOperationsKey = "cgroup_manager_latency_microseconds" PodWorkerStartLatencyKey = "pod_worker_start_latency_microseconds" PLEGRelistLatencyKey = "pleg_relist_latency_microseconds" + PLEGDiscardEventsKey = "pleg_discard_events" PLEGRelistIntervalKey = "pleg_relist_interval_microseconds" EvictionStatsAgeKey = "eviction_stats_age_microseconds" VolumeStatsCapacityBytesKey = "volume_stats_capacity_bytes" @@ -109,6 +110,14 @@ var ( Help: "Latency in microseconds for relisting pods in PLEG.", }, ) + PLEGDiscardEvents = prometheus.NewCounterVec( + prometheus.CounterOpts{ + Subsystem: KubeletSubsystem, + Name: PLEGDiscardEventsKey, + Help: "The number of discard events in PLEG.", + }, + []string{}, + ) PLEGRelistInterval = prometheus.NewSummary( prometheus.SummaryOpts{ Subsystem: KubeletSubsystem, @@ -214,6 +223,7 @@ func Register(containerCache kubecontainer.RuntimeCache, collectors ...prometheu prometheus.MustRegister(ContainersPerPodCount) prometheus.MustRegister(newPodAndContainerCollector(containerCache)) prometheus.MustRegister(PLEGRelistLatency) + prometheus.MustRegister(PLEGDiscardEvents) prometheus.MustRegister(PLEGRelistInterval) prometheus.MustRegister(RuntimeOperations) prometheus.MustRegister(RuntimeOperationsLatency) diff --git a/pkg/kubelet/pleg/generic.go b/pkg/kubelet/pleg/generic.go index
f69e0d49552..77a2e5d3799 100644 --- a/pkg/kubelet/pleg/generic.go +++ b/pkg/kubelet/pleg/generic.go @@ -265,7 +265,12 @@ func (g *GenericPLEG) relist() { if events[i].Type == ContainerChanged { continue } - g.eventChannel <- events[i] + select { + case g.eventChannel <- events[i]: + default: + metrics.PLEGDiscardEvents.WithLabelValues().Inc() + klog.Error("event channel is full, discard this relist() cycle event") + } } } diff --git a/pkg/kubelet/pleg/generic_test.go b/pkg/kubelet/pleg/generic_test.go index 2e972ed4521..d07ec37d737 100644 --- a/pkg/kubelet/pleg/generic_test.go +++ b/pkg/kubelet/pleg/generic_test.go @@ -30,10 +30,16 @@ import ( "k8s.io/apimachinery/pkg/util/diff" kubecontainer "k8s.io/kubernetes/pkg/kubelet/container" containertest "k8s.io/kubernetes/pkg/kubelet/container/testing" + "github.com/prometheus/client_golang/prometheus" + "net" + "net/http" + "k8s.io/kubernetes/pkg/kubelet/metrics" ) const ( testContainerRuntimeType = "fooRuntime" + // largeChannelCap is a large enough capacity to hold all events in a single test. 
+ largeChannelCap = 100 ) type TestGenericPLEG struct { @@ -42,7 +48,11 @@ type TestGenericPLEG struct { clock *clock.FakeClock } -func newTestGenericPLEG() *TestGenericPLEG { +func newTestGenericPLEGWithLargeChannel() *TestGenericPLEG { + return newTestGenericPLEG(largeChannelCap) +} + +func newTestGenericPLEG(eventChannelCap int) *TestGenericPLEG { fakeRuntime := &containertest.FakeRuntime{} clock := clock.NewFakeClock(time.Time{}) // The channel capacity should be large enough to hold all events in a @@ -50,7 +60,7 @@ func newTestGenericPLEG() *TestGenericPLEG { pleg := &GenericPLEG{ relistPeriod: time.Hour, runtime: fakeRuntime, - eventChannel: make(chan *PodLifecycleEvent, 100), + eventChannel: make(chan *PodLifecycleEvent, eventChannelCap), podRecords: make(podRecords), clock: clock, } @@ -93,7 +103,7 @@ func verifyEvents(t *testing.T, expected, actual []*PodLifecycleEvent) { } func TestRelisting(t *testing.T) { - testPleg := newTestGenericPLEG() + testPleg := newTestGenericPLEGWithLargeChannel() pleg, runtime := testPleg.pleg, testPleg.runtime ch := pleg.Watch() // The first relist should send a PodSync event to each pod. @@ -157,6 +167,89 @@ func TestRelisting(t *testing.T) { verifyEvents(t, expected, actual) } +// TestEventChannelFull test when channel is full, the events will be discard. +func TestEventChannelFull(t *testing.T) { + prometheus.MustRegister(metrics.PLEGDiscardEvents) + + temporalServer := "127.0.0.1:1234" + l, err := net.Listen("tcp", temporalServer) + assert.NoError(t, err) + defer l.Close() + + prometheusUrl := "http://" + temporalServer + "/metrics" + mux := http.NewServeMux() + mux.Handle("/metrics", prometheus.Handler()) + server := &http.Server{ + Addr: temporalServer, + Handler: mux, + } + go func() { + server.Serve(l) + }() + + testPleg := newTestGenericPLEG(4) + pleg, runtime := testPleg.pleg, testPleg.runtime + ch := pleg.Watch() + // The first relist should send a PodSync event to each pod. 
+ runtime.AllPodList = []*containertest.FakePod{ + {Pod: &kubecontainer.Pod{ + ID: "1234", + Containers: []*kubecontainer.Container{ + createTestContainer("c1", kubecontainer.ContainerStateExited), + createTestContainer("c2", kubecontainer.ContainerStateRunning), + createTestContainer("c3", kubecontainer.ContainerStateUnknown), + }, + }}, + {Pod: &kubecontainer.Pod{ + ID: "4567", + Containers: []*kubecontainer.Container{ + createTestContainer("c1", kubecontainer.ContainerStateExited), + }, + }}, + } + pleg.relist() + // Report every running/exited container if we see them for the first time. + expected := []*PodLifecycleEvent{ + {ID: "1234", Type: ContainerStarted, Data: "c2"}, + {ID: "4567", Type: ContainerDied, Data: "c1"}, + {ID: "1234", Type: ContainerDied, Data: "c1"}, + } + actual := getEventsFromChannel(ch) + verifyEvents(t, expected, actual) + + runtime.AllPodList = []*containertest.FakePod{ + {Pod: &kubecontainer.Pod{ + ID: "1234", + Containers: []*kubecontainer.Container{ + createTestContainer("c2", kubecontainer.ContainerStateExited), + createTestContainer("c3", kubecontainer.ContainerStateRunning), + }, + }}, + {Pod: &kubecontainer.Pod{ + ID: "4567", + Containers: []*kubecontainer.Container{ + createTestContainer("c4", kubecontainer.ContainerStateRunning), + }, + }}, + } + pleg.relist() + // event channel is full, discard events + expected = []*PodLifecycleEvent{ + {ID: "1234", Type: ContainerRemoved, Data: "c1"}, + {ID: "1234", Type: ContainerDied, Data: "c2"}, + {ID: "1234", Type: ContainerStarted, Data: "c3"}, + {ID: "4567", Type: ContainerRemoved, Data: "c1"}, + } + actual = getEventsFromChannel(ch) + verifyEvents(t, expected, actual) + + plegEventsDiscardCounterExpected := "pleg_discard_events 1" + + assert.HTTPBodyContains(t, http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + mux.ServeHTTP(w, r) + }), "GET", prometheusUrl, nil, plegEventsDiscardCounterExpected) +} + func TestDetectingContainerDeaths(t *testing.T) { // Vary the 
number of relists after the container started and before the // container died to account for the changes in pleg's internal states. @@ -168,7 +261,7 @@ func TestDetectingContainerDeaths(t *testing.T) { } func testReportMissingContainers(t *testing.T, numRelists int) { - testPleg := newTestGenericPLEG() + testPleg := newTestGenericPLEGWithLargeChannel() pleg, runtime := testPleg.pleg, testPleg.runtime ch := pleg.Watch() runtime.AllPodList = []*containertest.FakePod{ @@ -209,7 +302,7 @@ func testReportMissingContainers(t *testing.T, numRelists int) { } func testReportMissingPods(t *testing.T, numRelists int) { - testPleg := newTestGenericPLEG() + testPleg := newTestGenericPLEGWithLargeChannel() pleg, runtime := testPleg.pleg, testPleg.runtime ch := pleg.Watch() runtime.AllPodList = []*containertest.FakePod{ @@ -345,7 +438,7 @@ func TestRemoveCacheEntry(t *testing.T) { } func TestHealthy(t *testing.T) { - testPleg := newTestGenericPLEG() + testPleg := newTestGenericPLEGWithLargeChannel() // pleg should initially be unhealthy pleg, _, clock := testPleg.pleg, testPleg.runtime, testPleg.clock @@ -441,7 +534,7 @@ func TestRelistWithReinspection(t *testing.T) { // Test detecting sandbox state changes. func TestRelistingWithSandboxes(t *testing.T) { - testPleg := newTestGenericPLEG() + testPleg := newTestGenericPLEGWithLargeChannel() pleg, runtime := testPleg.pleg, testPleg.runtime ch := pleg.Watch() // The first relist should send a PodSync event to each pod. 
From c70ee4272b10262fb2292b093828211e3e21cfe6 Mon Sep 17 00:00:00 2001 From: changyaowei <156511344@qq.com> Date: Thu, 31 Jan 2019 12:18:02 +0800 Subject: [PATCH 2/3] delete prometheus in unit testing --- pkg/kubelet/pleg/generic_test.go | 28 ---------------------------- 1 file changed, 28 deletions(-) diff --git a/pkg/kubelet/pleg/generic_test.go b/pkg/kubelet/pleg/generic_test.go index d07ec37d737..b02314b76cd 100644 --- a/pkg/kubelet/pleg/generic_test.go +++ b/pkg/kubelet/pleg/generic_test.go @@ -30,10 +30,6 @@ import ( "k8s.io/apimachinery/pkg/util/diff" kubecontainer "k8s.io/kubernetes/pkg/kubelet/container" containertest "k8s.io/kubernetes/pkg/kubelet/container/testing" - "github.com/prometheus/client_golang/prometheus" - "net" - "net/http" - "k8s.io/kubernetes/pkg/kubelet/metrics" ) const ( @@ -169,24 +165,6 @@ func TestRelisting(t *testing.T) { // TestEventChannelFull test when channel is full, the events will be discard. func TestEventChannelFull(t *testing.T) { - prometheus.MustRegister(metrics.PLEGDiscardEvents) - - temporalServer := "127.0.0.1:1234" - l, err := net.Listen("tcp", temporalServer) - assert.NoError(t, err) - defer l.Close() - - prometheusUrl := "http://" + temporalServer + "/metrics" - mux := http.NewServeMux() - mux.Handle("/metrics", prometheus.Handler()) - server := &http.Server{ - Addr: temporalServer, - Handler: mux, - } - go func() { - server.Serve(l) - }() - testPleg := newTestGenericPLEG(4) pleg, runtime := testPleg.pleg, testPleg.runtime ch := pleg.Watch() @@ -242,12 +220,6 @@ func TestEventChannelFull(t *testing.T) { } actual = getEventsFromChannel(ch) verifyEvents(t, expected, actual) - - plegEventsDiscardCounterExpected := "pleg_discard_events 1" - - assert.HTTPBodyContains(t, http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - mux.ServeHTTP(w, r) - }), "GET", prometheusUrl, nil, plegEventsDiscardCounterExpected) } func TestDetectingContainerDeaths(t *testing.T) { From 19f73899fc829734b2c78782ad5a654e07ed3a4a Mon 
Sep 17 00:00:00 2001 From: changyaowei <156511344@qq.com> Date: Wed, 13 Feb 2019 16:27:15 +0800 Subject: [PATCH 3/3] modify test case --- pkg/kubelet/pleg/generic_test.go | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) diff --git a/pkg/kubelet/pleg/generic_test.go b/pkg/kubelet/pleg/generic_test.go index b02314b76cd..860ff2934b8 100644 --- a/pkg/kubelet/pleg/generic_test.go +++ b/pkg/kubelet/pleg/generic_test.go @@ -44,11 +44,11 @@ type TestGenericPLEG struct { clock *clock.FakeClock } -func newTestGenericPLEGWithLargeChannel() *TestGenericPLEG { - return newTestGenericPLEG(largeChannelCap) +func newTestGenericPLEG() *TestGenericPLEG { + return newTestGenericPLEGWithChannelSize(largeChannelCap) } -func newTestGenericPLEG(eventChannelCap int) *TestGenericPLEG { +func newTestGenericPLEGWithChannelSize(eventChannelCap int) *TestGenericPLEG { fakeRuntime := &containertest.FakeRuntime{} clock := clock.NewFakeClock(time.Time{}) // The channel capacity should be large enough to hold all events in a @@ -99,7 +99,7 @@ func verifyEvents(t *testing.T, expected, actual []*PodLifecycleEvent) { } func TestRelisting(t *testing.T) { - testPleg := newTestGenericPLEGWithLargeChannel() + testPleg := newTestGenericPLEG() pleg, runtime := testPleg.pleg, testPleg.runtime ch := pleg.Watch() // The first relist should send a PodSync event to each pod. @@ -165,7 +165,7 @@ func TestRelisting(t *testing.T) { // TestEventChannelFull test when channel is full, the events will be discard. func TestEventChannelFull(t *testing.T) { - testPleg := newTestGenericPLEG(4) + testPleg := newTestGenericPLEGWithChannelSize(4) pleg, runtime := testPleg.pleg, testPleg.runtime ch := pleg.Watch() // The first relist should send a PodSync event to each pod. 
@@ -233,7 +233,7 @@ func TestDetectingContainerDeaths(t *testing.T) { } func testReportMissingContainers(t *testing.T, numRelists int) { - testPleg := newTestGenericPLEGWithLargeChannel() + testPleg := newTestGenericPLEG() pleg, runtime := testPleg.pleg, testPleg.runtime ch := pleg.Watch() runtime.AllPodList = []*containertest.FakePod{ @@ -274,7 +274,7 @@ func testReportMissingContainers(t *testing.T, numRelists int) { } func testReportMissingPods(t *testing.T, numRelists int) { - testPleg := newTestGenericPLEGWithLargeChannel() + testPleg := newTestGenericPLEG() pleg, runtime := testPleg.pleg, testPleg.runtime ch := pleg.Watch() runtime.AllPodList = []*containertest.FakePod{ @@ -410,7 +410,7 @@ func TestRemoveCacheEntry(t *testing.T) { } func TestHealthy(t *testing.T) { - testPleg := newTestGenericPLEGWithLargeChannel() + testPleg := newTestGenericPLEG() // pleg should initially be unhealthy pleg, _, clock := testPleg.pleg, testPleg.runtime, testPleg.clock @@ -506,7 +506,7 @@ func TestRelistWithReinspection(t *testing.T) { // Test detecting sandbox state changes. func TestRelistingWithSandboxes(t *testing.T) { - testPleg := newTestGenericPLEGWithLargeChannel() + testPleg := newTestGenericPLEG() pleg, runtime := testPleg.pleg, testPleg.runtime ch := pleg.Watch() // The first relist should send a PodSync event to each pod.