mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-08-28 04:53:11 +00:00
Merge pull request #123795 from saschagrunert/metrics-remote-runtime
Decouple `kubelet/cri/remote` package from `kubelet/metrics`
This commit is contained in:
commit
3d49956fde
@ -38,7 +38,6 @@ import (
|
|||||||
runtimeapi "k8s.io/cri-api/pkg/apis/runtime/v1"
|
runtimeapi "k8s.io/cri-api/pkg/apis/runtime/v1"
|
||||||
"k8s.io/klog/v2"
|
"k8s.io/klog/v2"
|
||||||
"k8s.io/kubernetes/pkg/features"
|
"k8s.io/kubernetes/pkg/features"
|
||||||
"k8s.io/kubernetes/pkg/kubelet/metrics"
|
|
||||||
"k8s.io/kubernetes/pkg/kubelet/util"
|
"k8s.io/kubernetes/pkg/kubelet/util"
|
||||||
"k8s.io/kubernetes/pkg/probe/exec"
|
"k8s.io/kubernetes/pkg/probe/exec"
|
||||||
|
|
||||||
@ -808,15 +807,17 @@ func (r *remoteRuntimeService) CheckpointContainer(ctx context.Context, options
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (r *remoteRuntimeService) GetContainerEvents(containerEventsCh chan *runtimeapi.ContainerEventResponse) error {
|
func (r *remoteRuntimeService) GetContainerEvents(containerEventsCh chan *runtimeapi.ContainerEventResponse, connectionEstablishedCallback func(runtimeapi.RuntimeService_GetContainerEventsClient)) error {
|
||||||
containerEventsStreamingClient, err := r.runtimeClient.GetContainerEvents(context.Background(), &runtimeapi.GetEventsRequest{})
|
containerEventsStreamingClient, err := r.runtimeClient.GetContainerEvents(context.Background(), &runtimeapi.GetEventsRequest{})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
klog.ErrorS(err, "GetContainerEvents failed to get streaming client")
|
klog.ErrorS(err, "GetContainerEvents failed to get streaming client")
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if connectionEstablishedCallback != nil {
|
||||||
// The connection is successfully established and we have a streaming client ready for use.
|
// The connection is successfully established and we have a streaming client ready for use.
|
||||||
metrics.EventedPLEGConn.Inc()
|
connectionEstablishedCallback(containerEventsStreamingClient)
|
||||||
|
}
|
||||||
|
|
||||||
for {
|
for {
|
||||||
resp, err := containerEventsStreamingClient.Recv()
|
resp, err := containerEventsStreamingClient.Recv()
|
||||||
|
@ -335,11 +335,11 @@ func (in instrumentedRuntimeService) CheckpointContainer(ctx context.Context, op
|
|||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
func (in instrumentedRuntimeService) GetContainerEvents(containerEventsCh chan *runtimeapi.ContainerEventResponse) error {
|
func (in instrumentedRuntimeService) GetContainerEvents(containerEventsCh chan *runtimeapi.ContainerEventResponse, connectionEstablishedCallback func(runtimeapi.RuntimeService_GetContainerEventsClient)) error {
|
||||||
const operation = "get_container_events"
|
const operation = "get_container_events"
|
||||||
defer recordOperation(operation, time.Now())
|
defer recordOperation(operation, time.Now())
|
||||||
|
|
||||||
err := in.service.GetContainerEvents(containerEventsCh)
|
err := in.service.GetContainerEvents(containerEventsCh, connectionEstablishedCallback)
|
||||||
recordError(operation, err)
|
recordError(operation, err)
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
@ -192,7 +192,9 @@ func (e *EventedPLEG) watchEventsChannel() {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
err := e.runtimeService.GetContainerEvents(containerEventsResponseCh)
|
err := e.runtimeService.GetContainerEvents(containerEventsResponseCh, func(runtimeapi.RuntimeService_GetContainerEventsClient) {
|
||||||
|
metrics.EventedPLEGConn.Inc()
|
||||||
|
})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
metrics.EventedPLEGConnErr.Inc()
|
metrics.EventedPLEGConnErr.Inc()
|
||||||
numAttempts++
|
numAttempts++
|
||||||
|
@ -61,7 +61,7 @@ type ContainerManager interface {
|
|||||||
// CheckpointContainer checkpoints a container
|
// CheckpointContainer checkpoints a container
|
||||||
CheckpointContainer(ctx context.Context, options *runtimeapi.CheckpointContainerRequest) error
|
CheckpointContainer(ctx context.Context, options *runtimeapi.CheckpointContainerRequest) error
|
||||||
// GetContainerEvents gets container events from the CRI runtime
|
// GetContainerEvents gets container events from the CRI runtime
|
||||||
GetContainerEvents(containerEventsCh chan *runtimeapi.ContainerEventResponse) error
|
GetContainerEvents(containerEventsCh chan *runtimeapi.ContainerEventResponse, connectionEstablishedCallback func(runtimeapi.RuntimeService_GetContainerEventsClient)) error
|
||||||
}
|
}
|
||||||
|
|
||||||
// PodSandboxManager contains methods for operating on PodSandboxes. The methods
|
// PodSandboxManager contains methods for operating on PodSandboxes. The methods
|
||||||
|
@ -716,7 +716,7 @@ func (r *FakeRuntimeService) CheckpointContainer(_ context.Context, options *run
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (f *FakeRuntimeService) GetContainerEvents(containerEventsCh chan *runtimeapi.ContainerEventResponse) error {
|
func (f *FakeRuntimeService) GetContainerEvents(containerEventsCh chan *runtimeapi.ContainerEventResponse, connectionEstablishedCallback func(runtimeapi.RuntimeService_GetContainerEventsClient)) error {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Loading…
Reference in New Issue
Block a user