when pleg channel is full, discard events and record how many events are discarded
This commit is contained in: parent af31a202ea, commit b52afc350f
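
The core of the change is a non-blocking send on the PLEG event channel plus a counter for the drops. Below is a minimal, standalone sketch of that pattern, not the kubelet code itself; the names sendOrDrop and discardedEvents are illustrative only and do not appear in the commit.

package main

import (
	"fmt"

	"github.com/prometheus/client_golang/prometheus"
)

// discardedEvents plays the role of PLEGDiscardEvents: it only ever counts drops.
var discardedEvents = prometheus.NewCounter(prometheus.CounterOpts{
	Subsystem: "kubelet",
	Name:      "pleg_discard_events",
	Help:      "The number of discard events in PLEG.",
})

// sendOrDrop delivers an event if the buffered channel has room; otherwise it
// drops the event and increments the counter instead of blocking the caller.
func sendOrDrop(ch chan<- string, event string) {
	select {
	case ch <- event:
	default:
		discardedEvents.Inc()
		fmt.Println("event channel is full, discarding:", event)
	}
}

func main() {
	prometheus.MustRegister(discardedEvents)
	ch := make(chan string, 1)
	sendOrDrop(ch, "ContainerStarted") // fits in the buffer
	sendOrDrop(ch, "ContainerDied")    // buffer is full, counted as a drop
}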
@@ -36,6 +36,7 @@ const (
 	CgroupManagerOperationsKey  = "cgroup_manager_latency_microseconds"
 	PodWorkerStartLatencyKey    = "pod_worker_start_latency_microseconds"
 	PLEGRelistLatencyKey        = "pleg_relist_latency_microseconds"
+	PLEGDiscardEventsKey        = "pleg_discard_events"
 	PLEGRelistIntervalKey       = "pleg_relist_interval_microseconds"
 	EvictionStatsAgeKey         = "eviction_stats_age_microseconds"
 	VolumeStatsCapacityBytesKey = "volume_stats_capacity_bytes"
@@ -109,6 +110,14 @@ var (
 			Help:      "Latency in microseconds for relisting pods in PLEG.",
 		},
 	)
+	PLEGDiscardEvents = prometheus.NewCounterVec(
+		prometheus.CounterOpts{
+			Subsystem: KubeletSubsystem,
+			Name:      PLEGDiscardEventsKey,
+			Help:      "The number of discard events in PLEG.",
+		},
+		[]string{},
+	)
 	PLEGRelistInterval = prometheus.NewSummary(
 		prometheus.SummaryOpts{
 			Subsystem: KubeletSubsystem,
@@ -214,6 +223,7 @@ func Register(containerCache kubecontainer.RuntimeCache, collectors ...prometheu
 	prometheus.MustRegister(ContainersPerPodCount)
 	prometheus.MustRegister(newPodAndContainerCollector(containerCache))
 	prometheus.MustRegister(PLEGRelistLatency)
+	prometheus.MustRegister(PLEGDiscardEvents)
 	prometheus.MustRegister(PLEGRelistInterval)
 	prometheus.MustRegister(RuntimeOperations)
 	prometheus.MustRegister(RuntimeOperationsLatency)
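Once registered, the new counter is exported from the kubelet's /metrics endpoint under the kubelet subsystem. Assuming the standard Prometheus exposition format, and noting that a CounterVec with no label names only exposes a series after the first WithLabelValues() call, a scrape taken after one discarded event would contain lines roughly like:

# HELP kubelet_pleg_discard_events The number of discard events in PLEG.
# TYPE kubelet_pleg_discard_events counter
kubelet_pleg_discard_events 1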
@@ -265,7 +265,12 @@ func (g *GenericPLEG) relist() {
 			if events[i].Type == ContainerChanged {
 				continue
 			}
-			g.eventChannel <- events[i]
+			select {
+			case g.eventChannel <- events[i]:
+			default:
+				metrics.PLEGDiscardEvents.WithLabelValues().Inc()
+				klog.Error("event channel is full, discard this relist() cycle event")
+			}
 		}
 	}
 
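The design choice here is to keep relist() non-blocking: if the consumer of the event channel falls behind and the buffer fills up, the event is dropped rather than stalling the relist loop, and the drop is surfaced through both the new counter and an error log. The trade-off is that a slow consumer can now miss lifecycle events, which is exactly what the metric is meant to make visible.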
@@ -30,10 +30,16 @@ import (
 	"k8s.io/apimachinery/pkg/util/diff"
 	kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
 	containertest "k8s.io/kubernetes/pkg/kubelet/container/testing"
+	"github.com/prometheus/client_golang/prometheus"
+	"net"
+	"net/http"
+	"k8s.io/kubernetes/pkg/kubelet/metrics"
 )
 
 const (
 	testContainerRuntimeType = "fooRuntime"
+	// largeChannelCap is a large enough capacity to hold all events in a single test.
+	largeChannelCap = 100
 )
 
 type TestGenericPLEG struct {
@@ -42,7 +48,11 @@ type TestGenericPLEG struct {
 	clock   *clock.FakeClock
 }
 
-func newTestGenericPLEG() *TestGenericPLEG {
+func newTestGenericPLEGWithLargeChannel() *TestGenericPLEG {
+	return newTestGenericPLEG(largeChannelCap)
+}
+
+func newTestGenericPLEG(eventChannelCap int) *TestGenericPLEG {
 	fakeRuntime := &containertest.FakeRuntime{}
 	clock := clock.NewFakeClock(time.Time{})
 	// The channel capacity should be large enough to hold all events in a
@@ -50,7 +60,7 @@ func newTestGenericPLEG() *TestGenericPLEG {
 	pleg := &GenericPLEG{
 		relistPeriod: time.Hour,
 		runtime:      fakeRuntime,
-		eventChannel: make(chan *PodLifecycleEvent, 100),
+		eventChannel: make(chan *PodLifecycleEvent, eventChannelCap),
 		podRecords:   make(podRecords),
 		clock:        clock,
 	}
@@ -93,7 +103,7 @@ func verifyEvents(t *testing.T, expected, actual []*PodLifecycleEvent) {
 }
 
 func TestRelisting(t *testing.T) {
-	testPleg := newTestGenericPLEG()
+	testPleg := newTestGenericPLEGWithLargeChannel()
 	pleg, runtime := testPleg.pleg, testPleg.runtime
 	ch := pleg.Watch()
 	// The first relist should send a PodSync event to each pod.
@@ -157,6 +167,89 @@ func TestRelisting(t *testing.T) {
 	verifyEvents(t, expected, actual)
 }
 
+// TestEventChannelFull tests that when the channel is full, events are discarded.
+func TestEventChannelFull(t *testing.T) {
+	prometheus.MustRegister(metrics.PLEGDiscardEvents)
+
+	temporalServer := "127.0.0.1:1234"
+	l, err := net.Listen("tcp", temporalServer)
+	assert.NoError(t, err)
+	defer l.Close()
+
+	prometheusUrl := "http://" + temporalServer + "/metrics"
+	mux := http.NewServeMux()
+	mux.Handle("/metrics", prometheus.Handler())
+	server := &http.Server{
+		Addr:    temporalServer,
+		Handler: mux,
+	}
+	go func() {
+		server.Serve(l)
+	}()
+
+	testPleg := newTestGenericPLEG(4)
+	pleg, runtime := testPleg.pleg, testPleg.runtime
+	ch := pleg.Watch()
+	// The first relist should send a PodSync event to each pod.
+	runtime.AllPodList = []*containertest.FakePod{
+		{Pod: &kubecontainer.Pod{
+			ID: "1234",
+			Containers: []*kubecontainer.Container{
+				createTestContainer("c1", kubecontainer.ContainerStateExited),
+				createTestContainer("c2", kubecontainer.ContainerStateRunning),
+				createTestContainer("c3", kubecontainer.ContainerStateUnknown),
+			},
+		}},
+		{Pod: &kubecontainer.Pod{
+			ID: "4567",
+			Containers: []*kubecontainer.Container{
+				createTestContainer("c1", kubecontainer.ContainerStateExited),
+			},
+		}},
+	}
+	pleg.relist()
+	// Report every running/exited container if we see them for the first time.
+	expected := []*PodLifecycleEvent{
+		{ID: "1234", Type: ContainerStarted, Data: "c2"},
+		{ID: "4567", Type: ContainerDied, Data: "c1"},
+		{ID: "1234", Type: ContainerDied, Data: "c1"},
+	}
+	actual := getEventsFromChannel(ch)
+	verifyEvents(t, expected, actual)
+
+	runtime.AllPodList = []*containertest.FakePod{
+		{Pod: &kubecontainer.Pod{
+			ID: "1234",
+			Containers: []*kubecontainer.Container{
+				createTestContainer("c2", kubecontainer.ContainerStateExited),
+				createTestContainer("c3", kubecontainer.ContainerStateRunning),
+			},
+		}},
+		{Pod: &kubecontainer.Pod{
+			ID: "4567",
+			Containers: []*kubecontainer.Container{
+				createTestContainer("c4", kubecontainer.ContainerStateRunning),
+			},
+		}},
+	}
+	pleg.relist()
+	// The event channel is full, so events from this relist are discarded.
+	expected = []*PodLifecycleEvent{
+		{ID: "1234", Type: ContainerRemoved, Data: "c1"},
+		{ID: "1234", Type: ContainerDied, Data: "c2"},
+		{ID: "1234", Type: ContainerStarted, Data: "c3"},
+		{ID: "4567", Type: ContainerRemoved, Data: "c1"},
+	}
+	actual = getEventsFromChannel(ch)
+	verifyEvents(t, expected, actual)
+
+	plegEventsDiscardCounterExpected := "pleg_discard_events 1"
+
+	assert.HTTPBodyContains(t, http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
+		mux.ServeHTTP(w, r)
+	}), "GET", prometheusUrl, nil, plegEventsDiscardCounterExpected)
+}
+
 func TestDetectingContainerDeaths(t *testing.T) {
 	// Vary the number of relists after the container started and before the
 	// container died to account for the changes in pleg's internal states.
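The test exercises the overflow path end to end: it builds a PLEG with a channel capacity of 4, drives a second relist that produces five lifecycle events so that one of them cannot be buffered, and then scrapes a throwaway HTTP server (wired up with the older prometheus.Handler() helper, since superseded by promhttp) to assert that the exported counter reads pleg_discard_events 1.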
@@ -168,7 +261,7 @@ func TestDetectingContainerDeaths(t *testing.T) {
 }
 
 func testReportMissingContainers(t *testing.T, numRelists int) {
-	testPleg := newTestGenericPLEG()
+	testPleg := newTestGenericPLEGWithLargeChannel()
 	pleg, runtime := testPleg.pleg, testPleg.runtime
 	ch := pleg.Watch()
 	runtime.AllPodList = []*containertest.FakePod{
@@ -209,7 +302,7 @@ func testReportMissingContainers(t *testing.T, numRelists int) {
 }
 
 func testReportMissingPods(t *testing.T, numRelists int) {
-	testPleg := newTestGenericPLEG()
+	testPleg := newTestGenericPLEGWithLargeChannel()
 	pleg, runtime := testPleg.pleg, testPleg.runtime
 	ch := pleg.Watch()
 	runtime.AllPodList = []*containertest.FakePod{
@@ -345,7 +438,7 @@ func TestRemoveCacheEntry(t *testing.T) {
 }
 
 func TestHealthy(t *testing.T) {
-	testPleg := newTestGenericPLEG()
+	testPleg := newTestGenericPLEGWithLargeChannel()
 
 	// pleg should initially be unhealthy
 	pleg, _, clock := testPleg.pleg, testPleg.runtime, testPleg.clock
@@ -441,7 +534,7 @@ func TestRelistWithReinspection(t *testing.T) {
 
 // Test detecting sandbox state changes.
 func TestRelistingWithSandboxes(t *testing.T) {
-	testPleg := newTestGenericPLEG()
+	testPleg := newTestGenericPLEGWithLargeChannel()
 	pleg, runtime := testPleg.pleg, testPleg.runtime
 	ch := pleg.Watch()
 	// The first relist should send a PodSync event to each pod.