From 51199deaa05c0d5a1f3e8e26bd57cc1d7604dab1 Mon Sep 17 00:00:00 2001 From: Sascha Grunert Date: Thu, 7 Mar 2024 11:31:14 +0100 Subject: [PATCH] Decouple `kubelet/cri/remote` package from `kubelet/metrics` Importing the `k8s.io/kubernetes/pkg/kubelet/metrics` package in the remote runtime implementation makes it harder to separate the functionalities at some later point in time. We now decouple both packages by extending the CRI API services to allow a callback on `GetContainerEvents`. This callback can be used to do additional work if the connection got established, because `GetContainerEvents` will go into blocking mode after that. Signed-off-by: Sascha Grunert --- pkg/kubelet/cri/remote/remote_runtime.go | 9 +++++---- pkg/kubelet/kuberuntime/instrumented_services.go | 4 ++-- pkg/kubelet/pleg/evented.go | 4 +++- staging/src/k8s.io/cri-api/pkg/apis/services.go | 2 +- .../cri-api/pkg/apis/testing/fake_runtime_service.go | 2 +- 5 files changed, 12 insertions(+), 9 deletions(-) diff --git a/pkg/kubelet/cri/remote/remote_runtime.go b/pkg/kubelet/cri/remote/remote_runtime.go index 36b3839eb74..c87fbe8f836 100644 --- a/pkg/kubelet/cri/remote/remote_runtime.go +++ b/pkg/kubelet/cri/remote/remote_runtime.go @@ -38,7 +38,6 @@ import ( runtimeapi "k8s.io/cri-api/pkg/apis/runtime/v1" "k8s.io/klog/v2" "k8s.io/kubernetes/pkg/features" - "k8s.io/kubernetes/pkg/kubelet/metrics" "k8s.io/kubernetes/pkg/kubelet/util" "k8s.io/kubernetes/pkg/probe/exec" @@ -807,15 +806,17 @@ func (r *remoteRuntimeService) CheckpointContainer(ctx context.Context, options return nil } -func (r *remoteRuntimeService) GetContainerEvents(containerEventsCh chan *runtimeapi.ContainerEventResponse) error { +func (r *remoteRuntimeService) GetContainerEvents(containerEventsCh chan *runtimeapi.ContainerEventResponse, connectionEstablishedCallback func(runtimeapi.RuntimeService_GetContainerEventsClient)) error { containerEventsStreamingClient, err := r.runtimeClient.GetContainerEvents(context.Background(), &runtimeapi.GetEventsRequest{}) if err != nil { klog.ErrorS(err, "GetContainerEvents failed to get streaming client") return err } - // The connection is successfully established and we have a streaming client ready for use. - metrics.EventedPLEGConn.Inc() + if connectionEstablishedCallback != nil { + // The connection is successfully established and we have a streaming client ready for use. + connectionEstablishedCallback(containerEventsStreamingClient) + } for { resp, err := containerEventsStreamingClient.Recv() diff --git a/pkg/kubelet/kuberuntime/instrumented_services.go b/pkg/kubelet/kuberuntime/instrumented_services.go index 3630d3d2b36..23a9c76081c 100644 --- a/pkg/kubelet/kuberuntime/instrumented_services.go +++ b/pkg/kubelet/kuberuntime/instrumented_services.go @@ -335,11 +335,11 @@ func (in instrumentedRuntimeService) CheckpointContainer(ctx context.Context, op return err } -func (in instrumentedRuntimeService) GetContainerEvents(containerEventsCh chan *runtimeapi.ContainerEventResponse) error { +func (in instrumentedRuntimeService) GetContainerEvents(containerEventsCh chan *runtimeapi.ContainerEventResponse, connectionEstablishedCallback func(runtimeapi.RuntimeService_GetContainerEventsClient)) error { const operation = "get_container_events" defer recordOperation(operation, time.Now()) - err := in.service.GetContainerEvents(containerEventsCh) + err := in.service.GetContainerEvents(containerEventsCh, connectionEstablishedCallback) recordError(operation, err) return err } diff --git a/pkg/kubelet/pleg/evented.go b/pkg/kubelet/pleg/evented.go index ef44ff5c994..1cdaff88d88 100644 --- a/pkg/kubelet/pleg/evented.go +++ b/pkg/kubelet/pleg/evented.go @@ -192,7 +192,9 @@ func (e *EventedPLEG) watchEventsChannel() { } } - err := e.runtimeService.GetContainerEvents(containerEventsResponseCh) + err := e.runtimeService.GetContainerEvents(containerEventsResponseCh, func(runtimeapi.RuntimeService_GetContainerEventsClient) { + metrics.EventedPLEGConn.Inc() + }) if err != nil { metrics.EventedPLEGConnErr.Inc() numAttempts++ diff --git a/staging/src/k8s.io/cri-api/pkg/apis/services.go b/staging/src/k8s.io/cri-api/pkg/apis/services.go index eb073de989b..e6c0bfebc55 100644 --- a/staging/src/k8s.io/cri-api/pkg/apis/services.go +++ b/staging/src/k8s.io/cri-api/pkg/apis/services.go @@ -61,7 +61,7 @@ type ContainerManager interface { // CheckpointContainer checkpoints a container CheckpointContainer(ctx context.Context, options *runtimeapi.CheckpointContainerRequest) error // GetContainerEvents gets container events from the CRI runtime - GetContainerEvents(containerEventsCh chan *runtimeapi.ContainerEventResponse) error + GetContainerEvents(containerEventsCh chan *runtimeapi.ContainerEventResponse, connectionEstablishedCallback func(runtimeapi.RuntimeService_GetContainerEventsClient)) error } // PodSandboxManager contains methods for operating on PodSandboxes. The methods diff --git a/staging/src/k8s.io/cri-api/pkg/apis/testing/fake_runtime_service.go b/staging/src/k8s.io/cri-api/pkg/apis/testing/fake_runtime_service.go index 332aa71d52a..989d281709b 100644 --- a/staging/src/k8s.io/cri-api/pkg/apis/testing/fake_runtime_service.go +++ b/staging/src/k8s.io/cri-api/pkg/apis/testing/fake_runtime_service.go @@ -716,7 +716,7 @@ func (r *FakeRuntimeService) CheckpointContainer(_ context.Context, options *run return nil } -func (f *FakeRuntimeService) GetContainerEvents(containerEventsCh chan *runtimeapi.ContainerEventResponse) error { +func (f *FakeRuntimeService) GetContainerEvents(containerEventsCh chan *runtimeapi.ContainerEventResponse, connectionEstablishedCallback func(runtimeapi.RuntimeService_GetContainerEventsClient)) error { return nil }