diff --git a/pkg/kubelet/cri/remote/remote_runtime.go b/pkg/kubelet/cri/remote/remote_runtime.go index 36b3839eb74..c87fbe8f836 100644 --- a/pkg/kubelet/cri/remote/remote_runtime.go +++ b/pkg/kubelet/cri/remote/remote_runtime.go @@ -38,7 +38,6 @@ import ( runtimeapi "k8s.io/cri-api/pkg/apis/runtime/v1" "k8s.io/klog/v2" "k8s.io/kubernetes/pkg/features" - "k8s.io/kubernetes/pkg/kubelet/metrics" "k8s.io/kubernetes/pkg/kubelet/util" "k8s.io/kubernetes/pkg/probe/exec" @@ -807,15 +806,17 @@ func (r *remoteRuntimeService) CheckpointContainer(ctx context.Context, options return nil } -func (r *remoteRuntimeService) GetContainerEvents(containerEventsCh chan *runtimeapi.ContainerEventResponse) error { +func (r *remoteRuntimeService) GetContainerEvents(containerEventsCh chan *runtimeapi.ContainerEventResponse, connectionEstablishedCallback func(runtimeapi.RuntimeService_GetContainerEventsClient)) error { containerEventsStreamingClient, err := r.runtimeClient.GetContainerEvents(context.Background(), &runtimeapi.GetEventsRequest{}) if err != nil { klog.ErrorS(err, "GetContainerEvents failed to get streaming client") return err } - // The connection is successfully established and we have a streaming client ready for use. - metrics.EventedPLEGConn.Inc() + if connectionEstablishedCallback != nil { + // The connection is successfully established and we have a streaming client ready for use. + connectionEstablishedCallback(containerEventsStreamingClient) + } for { resp, err := containerEventsStreamingClient.Recv() diff --git a/pkg/kubelet/kuberuntime/instrumented_services.go b/pkg/kubelet/kuberuntime/instrumented_services.go index 3630d3d2b36..23a9c76081c 100644 --- a/pkg/kubelet/kuberuntime/instrumented_services.go +++ b/pkg/kubelet/kuberuntime/instrumented_services.go @@ -335,11 +335,11 @@ func (in instrumentedRuntimeService) CheckpointContainer(ctx context.Context, op return err } -func (in instrumentedRuntimeService) GetContainerEvents(containerEventsCh chan *runtimeapi.ContainerEventResponse) error { +func (in instrumentedRuntimeService) GetContainerEvents(containerEventsCh chan *runtimeapi.ContainerEventResponse, connectionEstablishedCallback func(runtimeapi.RuntimeService_GetContainerEventsClient)) error { const operation = "get_container_events" defer recordOperation(operation, time.Now()) - err := in.service.GetContainerEvents(containerEventsCh) + err := in.service.GetContainerEvents(containerEventsCh, connectionEstablishedCallback) recordError(operation, err) return err } diff --git a/pkg/kubelet/pleg/evented.go b/pkg/kubelet/pleg/evented.go index ef44ff5c994..1cdaff88d88 100644 --- a/pkg/kubelet/pleg/evented.go +++ b/pkg/kubelet/pleg/evented.go @@ -192,7 +192,9 @@ func (e *EventedPLEG) watchEventsChannel() { } } - err := e.runtimeService.GetContainerEvents(containerEventsResponseCh) + err := e.runtimeService.GetContainerEvents(containerEventsResponseCh, func(runtimeapi.RuntimeService_GetContainerEventsClient) { + metrics.EventedPLEGConn.Inc() + }) if err != nil { metrics.EventedPLEGConnErr.Inc() numAttempts++ diff --git a/staging/src/k8s.io/cri-api/pkg/apis/services.go b/staging/src/k8s.io/cri-api/pkg/apis/services.go index eb073de989b..e6c0bfebc55 100644 --- a/staging/src/k8s.io/cri-api/pkg/apis/services.go +++ b/staging/src/k8s.io/cri-api/pkg/apis/services.go @@ -61,7 +61,7 @@ type ContainerManager interface { // CheckpointContainer checkpoints a container CheckpointContainer(ctx context.Context, options *runtimeapi.CheckpointContainerRequest) error // GetContainerEvents gets container events from the CRI runtime - GetContainerEvents(containerEventsCh chan *runtimeapi.ContainerEventResponse) error + GetContainerEvents(containerEventsCh chan *runtimeapi.ContainerEventResponse, connectionEstablishedCallback func(runtimeapi.RuntimeService_GetContainerEventsClient)) error } // PodSandboxManager contains methods for operating on PodSandboxes. The methods diff --git a/staging/src/k8s.io/cri-api/pkg/apis/testing/fake_runtime_service.go b/staging/src/k8s.io/cri-api/pkg/apis/testing/fake_runtime_service.go index 332aa71d52a..989d281709b 100644 --- a/staging/src/k8s.io/cri-api/pkg/apis/testing/fake_runtime_service.go +++ b/staging/src/k8s.io/cri-api/pkg/apis/testing/fake_runtime_service.go @@ -716,7 +716,7 @@ func (r *FakeRuntimeService) CheckpointContainer(_ context.Context, options *run return nil } -func (f *FakeRuntimeService) GetContainerEvents(containerEventsCh chan *runtimeapi.ContainerEventResponse) error { +func (f *FakeRuntimeService) GetContainerEvents(containerEventsCh chan *runtimeapi.ContainerEventResponse, connectionEstablishedCallback func(runtimeapi.RuntimeService_GetContainerEventsClient)) error { return nil }