Decouple kubelet/cri/remote package from kubelet/metrics

Importing the `k8s.io/kubernetes/pkg/kubelet/metrics` package in the
remote runtime implementation makes it harder to separate the
functionalities at some later point in time. We now decouple both
packages by extending the CRI API services to allow a callback on
`GetContainerEvents`. This callback can be used to do additional work if
the connection got established, because `GetContainerEvents` will go
into blocking mode after that.

Signed-off-by: Sascha Grunert <sgrunert@redhat.com>
This commit is contained in:
Sascha Grunert 2024-03-07 11:31:14 +01:00
parent 2ec63e0d28
commit 51199deaa0
No known key found for this signature in database
GPG Key ID: 09D97D153EF94D93
5 changed files with 12 additions and 9 deletions

View File

@ -38,7 +38,6 @@ import (
runtimeapi "k8s.io/cri-api/pkg/apis/runtime/v1" runtimeapi "k8s.io/cri-api/pkg/apis/runtime/v1"
"k8s.io/klog/v2" "k8s.io/klog/v2"
"k8s.io/kubernetes/pkg/features" "k8s.io/kubernetes/pkg/features"
"k8s.io/kubernetes/pkg/kubelet/metrics"
"k8s.io/kubernetes/pkg/kubelet/util" "k8s.io/kubernetes/pkg/kubelet/util"
"k8s.io/kubernetes/pkg/probe/exec" "k8s.io/kubernetes/pkg/probe/exec"
@ -807,15 +806,17 @@ func (r *remoteRuntimeService) CheckpointContainer(ctx context.Context, options
return nil return nil
} }
func (r *remoteRuntimeService) GetContainerEvents(containerEventsCh chan *runtimeapi.ContainerEventResponse) error { func (r *remoteRuntimeService) GetContainerEvents(containerEventsCh chan *runtimeapi.ContainerEventResponse, connectionEstablishedCallback func(runtimeapi.RuntimeService_GetContainerEventsClient)) error {
containerEventsStreamingClient, err := r.runtimeClient.GetContainerEvents(context.Background(), &runtimeapi.GetEventsRequest{}) containerEventsStreamingClient, err := r.runtimeClient.GetContainerEvents(context.Background(), &runtimeapi.GetEventsRequest{})
if err != nil { if err != nil {
klog.ErrorS(err, "GetContainerEvents failed to get streaming client") klog.ErrorS(err, "GetContainerEvents failed to get streaming client")
return err return err
} }
if connectionEstablishedCallback != nil {
// The connection is successfully established and we have a streaming client ready for use. // The connection is successfully established and we have a streaming client ready for use.
metrics.EventedPLEGConn.Inc() connectionEstablishedCallback(containerEventsStreamingClient)
}
for { for {
resp, err := containerEventsStreamingClient.Recv() resp, err := containerEventsStreamingClient.Recv()

View File

@ -335,11 +335,11 @@ func (in instrumentedRuntimeService) CheckpointContainer(ctx context.Context, op
return err return err
} }
func (in instrumentedRuntimeService) GetContainerEvents(containerEventsCh chan *runtimeapi.ContainerEventResponse) error { func (in instrumentedRuntimeService) GetContainerEvents(containerEventsCh chan *runtimeapi.ContainerEventResponse, connectionEstablishedCallback func(runtimeapi.RuntimeService_GetContainerEventsClient)) error {
const operation = "get_container_events" const operation = "get_container_events"
defer recordOperation(operation, time.Now()) defer recordOperation(operation, time.Now())
err := in.service.GetContainerEvents(containerEventsCh) err := in.service.GetContainerEvents(containerEventsCh, connectionEstablishedCallback)
recordError(operation, err) recordError(operation, err)
return err return err
} }

View File

@ -192,7 +192,9 @@ func (e *EventedPLEG) watchEventsChannel() {
} }
} }
err := e.runtimeService.GetContainerEvents(containerEventsResponseCh) err := e.runtimeService.GetContainerEvents(containerEventsResponseCh, func(runtimeapi.RuntimeService_GetContainerEventsClient) {
metrics.EventedPLEGConn.Inc()
})
if err != nil { if err != nil {
metrics.EventedPLEGConnErr.Inc() metrics.EventedPLEGConnErr.Inc()
numAttempts++ numAttempts++

View File

@ -61,7 +61,7 @@ type ContainerManager interface {
// CheckpointContainer checkpoints a container // CheckpointContainer checkpoints a container
CheckpointContainer(ctx context.Context, options *runtimeapi.CheckpointContainerRequest) error CheckpointContainer(ctx context.Context, options *runtimeapi.CheckpointContainerRequest) error
// GetContainerEvents gets container events from the CRI runtime // GetContainerEvents gets container events from the CRI runtime
GetContainerEvents(containerEventsCh chan *runtimeapi.ContainerEventResponse) error GetContainerEvents(containerEventsCh chan *runtimeapi.ContainerEventResponse, connectionEstablishedCallback func(runtimeapi.RuntimeService_GetContainerEventsClient)) error
} }
// PodSandboxManager contains methods for operating on PodSandboxes. The methods // PodSandboxManager contains methods for operating on PodSandboxes. The methods

View File

@ -716,7 +716,7 @@ func (r *FakeRuntimeService) CheckpointContainer(_ context.Context, options *run
return nil return nil
} }
func (f *FakeRuntimeService) GetContainerEvents(containerEventsCh chan *runtimeapi.ContainerEventResponse) error { func (f *FakeRuntimeService) GetContainerEvents(containerEventsCh chan *runtimeapi.ContainerEventResponse, connectionEstablishedCallback func(runtimeapi.RuntimeService_GetContainerEventsClient)) error {
return nil return nil
} }