diff --git a/pkg/features/kube_features.go b/pkg/features/kube_features.go index 439d1101950..0ff6ff7ae00 100644 --- a/pkg/features/kube_features.go +++ b/pkg/features/kube_features.go @@ -259,6 +259,7 @@ const ( // owner: @harche // kep: http://kep.k8s.io/3386 // alpha: v1.25 + // beta: v1.27 // // Allows using event-driven PLEG (pod lifecycle event generator) through kubelet // which avoids frequent relisting of containers which helps optimize performance. @@ -937,7 +938,7 @@ var defaultKubernetesFeatureGates = map[featuregate.Feature]featuregate.FeatureS DynamicResourceAllocation: {Default: false, PreRelease: featuregate.Alpha}, - EventedPLEG: {Default: false, PreRelease: featuregate.Alpha}, + EventedPLEG: {Default: false, PreRelease: featuregate.Beta}, // off by default, requires CRI Runtime support ExecProbeTimeout: {Default: true, PreRelease: featuregate.GA}, // lock to default and remove after v1.22 based on KEP #1972 update diff --git a/pkg/kubelet/cri/remote/remote_runtime.go b/pkg/kubelet/cri/remote/remote_runtime.go index 7a886bda395..18e6bf7275f 100644 --- a/pkg/kubelet/cri/remote/remote_runtime.go +++ b/pkg/kubelet/cri/remote/remote_runtime.go @@ -37,8 +37,10 @@ import ( runtimeapi "k8s.io/cri-api/pkg/apis/runtime/v1" "k8s.io/klog/v2" "k8s.io/kubernetes/pkg/features" + "k8s.io/kubernetes/pkg/kubelet/metrics" "k8s.io/kubernetes/pkg/kubelet/util" "k8s.io/kubernetes/pkg/probe/exec" + utilexec "k8s.io/utils/exec" ) @@ -797,6 +799,9 @@ func (r *remoteRuntimeService) GetContainerEvents(containerEventsCh chan *runtim return err } + // The connection is successfully established and we have a streaming client ready for use. + metrics.EventedPLEGConn.Inc() + for { resp, err := containerEventsStreamingClient.Recv() if err == io.EOF { diff --git a/pkg/kubelet/metrics/metrics.go b/pkg/kubelet/metrics/metrics.go index a3eca5a37e5..7eeabcaf6e6 100644 --- a/pkg/kubelet/metrics/metrics.go +++ b/pkg/kubelet/metrics/metrics.go @@ -43,6 +43,9 @@ const ( PLEGDiscardEventsKey = "pleg_discard_events" PLEGRelistIntervalKey = "pleg_relist_interval_seconds" PLEGLastSeenKey = "pleg_last_seen_seconds" + EventedPLEGConnErrKey = "evented_pleg_connection_error_count" + EventedPLEGConnKey = "evented_pleg_connection_success_count" + EventedPLEGConnLatencyKey = "evented_pleg_connection_latency_seconds" EvictionsKey = "evictions" EvictionStatsAgeKey = "eviction_stats_age_seconds" PreemptionsKey = "preemptions" @@ -250,6 +253,41 @@ var ( StabilityLevel: metrics.ALPHA, }, ) + + // EventedPLEGConnErr is a Counter that tracks the number of errors encountered during + // the establishment of streaming connection with the CRI runtime. + EventedPLEGConnErr = metrics.NewCounter( + &metrics.CounterOpts{ + Subsystem: KubeletSubsystem, + Name: EventedPLEGConnErrKey, + Help: "The number of errors encountered during the establishment of streaming connection with the CRI runtime.", + StabilityLevel: metrics.ALPHA, + }, + ) + + // EventedPLEGConn is a Counter that tracks the number of times a streaming client + // was obtained to receive CRI Events. + EventedPLEGConn = metrics.NewCounter( + &metrics.CounterOpts{ + Subsystem: KubeletSubsystem, + Name: EventedPLEGConnKey, + Help: "The number of times a streaming client was obtained to receive CRI Events.", + StabilityLevel: metrics.ALPHA, + }, + ) + + // EventedPLEGConnLatency is a Histogram that tracks the latency of streaming connection + // with the CRI runtime, measured in seconds. + EventedPLEGConnLatency = metrics.NewHistogram( + &metrics.HistogramOpts{ + Subsystem: KubeletSubsystem, + Name: EventedPLEGConnLatencyKey, + Help: "The latency of streaming connection with the CRI runtime, measured in seconds.", + Buckets: metrics.DefBuckets, + StabilityLevel: metrics.ALPHA, + }, + ) + // RuntimeOperations is a Counter that tracks the cumulative number of remote runtime operations. // Broken down by operation type. RuntimeOperations = metrics.NewCounterVec( @@ -692,6 +730,9 @@ func Register(collectors ...metrics.StableCollector) { legacyregistry.MustRegister(PLEGDiscardEvents) legacyregistry.MustRegister(PLEGRelistInterval) legacyregistry.MustRegister(PLEGLastSeen) + legacyregistry.MustRegister(EventedPLEGConnErr) + legacyregistry.MustRegister(EventedPLEGConn) + legacyregistry.MustRegister(EventedPLEGConnLatency) legacyregistry.MustRegister(RuntimeOperations) legacyregistry.MustRegister(RuntimeOperationsDuration) legacyregistry.MustRegister(RuntimeOperationsErrors) diff --git a/pkg/kubelet/pleg/evented.go b/pkg/kubelet/pleg/evented.go index 46ba5c1525c..1dd176489a5 100644 --- a/pkg/kubelet/pleg/evented.go +++ b/pkg/kubelet/pleg/evented.go @@ -190,6 +190,7 @@ func (e *EventedPLEG) watchEventsChannel() { err := e.runtimeService.GetContainerEvents(containerEventsResponseCh) if err != nil { + metrics.EventedPLEGConnErr.Inc() numAttempts++ e.Relist() // Force a relist to get the latest container and pods running metric. klog.V(4).InfoS("Evented PLEG: Failed to get container events, retrying: ", "err", err) @@ -245,6 +246,7 @@ func (e *EventedPLEG) processCRIEvents(containerEventsResponseCh chan *runtimeap e.updateRunningPodMetric(status) e.updateRunningContainerMetric(status) + e.updateLatencyMetric(event) if event.ContainerEventType == runtimeapi.ContainerEventType_CONTAINER_DELETED_EVENT { for _, sandbox := range status.SandboxStatuses { @@ -410,6 +412,11 @@ func (e *EventedPLEG) updateRunningContainerMetric(podStatus *kubecontainer.PodS } } +func (e *EventedPLEG) updateLatencyMetric(event *runtimeapi.ContainerEventResponse) { + duration := time.Duration(time.Now().UnixNano()-event.CreatedAt) * time.Nanosecond + metrics.EventedPLEGConnLatency.Observe(duration.Seconds()) +} + func (e *EventedPLEG) UpdateCache(pod *kubecontainer.Pod, pid types.UID) (error, bool) { return fmt.Errorf("not implemented"), false }