From a21f3f0a04777c994aceba92129171c3ec32e377 Mon Sep 17 00:00:00 2001 From: Ed Bartosh Date: Thu, 5 Sep 2024 15:18:32 +0300 Subject: [PATCH 1/5] kubelet: add DRAOperationsDuration metric --- pkg/kubelet/cm/dra/manager.go | 16 +++++++++++++++- pkg/kubelet/metrics/metrics.go | 20 ++++++++++++++++++++ 2 files changed, 35 insertions(+), 1 deletion(-) diff --git a/pkg/kubelet/cm/dra/manager.go b/pkg/kubelet/cm/dra/manager.go index e26e2f7b259..a5534536b6b 100644 --- a/pkg/kubelet/cm/dra/manager.go +++ b/pkg/kubelet/cm/dra/manager.go @@ -19,6 +19,7 @@ package dra import ( "context" "fmt" + "strconv" "time" v1 "k8s.io/api/core/v1" @@ -35,6 +36,7 @@ import ( "k8s.io/kubernetes/pkg/kubelet/cm/dra/state" "k8s.io/kubernetes/pkg/kubelet/config" kubecontainer "k8s.io/kubernetes/pkg/kubelet/container" + "k8s.io/kubernetes/pkg/kubelet/metrics" ) // draManagerStateFileName is the file name where dra manager stores its state @@ -150,6 +152,13 @@ func (m *ManagerImpl) reconcileLoop(ctx context.Context) { // for each new resource requirement, process their responses and update the cached // containerResources on success. func (m *ManagerImpl) PrepareResources(ctx context.Context, pod *v1.Pod) error { + startTime := time.Now() + err := m.prepareResources(ctx, pod) + metrics.DRAOperationsDuration.WithLabelValues("PrepareResources", strconv.FormatBool(err == nil)).Observe(time.Since(startTime).Seconds()) + return err +} + +func (m *ManagerImpl) prepareResources(ctx context.Context, pod *v1.Pod) error { logger := klog.FromContext(ctx) batches := make(map[string][]*drapb.Claim) resourceClaims := make(map[types.UID]*resourceapi.ResourceClaim) @@ -369,6 +378,10 @@ func (m *ManagerImpl) GetResources(pod *v1.Pod, container *v1.Container) (*Conta // As such, calls to the underlying NodeUnprepareResource API are skipped for claims that have // already been successfully unprepared. func (m *ManagerImpl) UnprepareResources(ctx context.Context, pod *v1.Pod) error { + var err error = nil + defer func(startTime time.Time) { + metrics.DRAOperationsDuration.WithLabelValues("UnprepareResources", strconv.FormatBool(err != nil)).Observe(time.Since(startTime).Seconds()) + }(time.Now()) var claimNames []string for i := range pod.Spec.ResourceClaims { claimName, _, err := resourceclaim.Name(pod, &pod.Spec.ResourceClaims[i]) @@ -383,7 +396,8 @@ func (m *ManagerImpl) UnprepareResources(ctx context.Context, pod *v1.Pod) error } claimNames = append(claimNames, *claimName) } - return m.unprepareResources(ctx, pod.UID, pod.Namespace, claimNames) + err = m.unprepareResources(ctx, pod.UID, pod.Namespace, claimNames) + return err } func (m *ManagerImpl) unprepareResources(ctx context.Context, podUID types.UID, namespace string, claimNames []string) error { diff --git a/pkg/kubelet/metrics/metrics.go b/pkg/kubelet/metrics/metrics.go index 30a194a3a66..97f79dd052c 100644 --- a/pkg/kubelet/metrics/metrics.go +++ b/pkg/kubelet/metrics/metrics.go @@ -32,6 +32,7 @@ import ( const ( FirstNetworkPodStartSLIDurationKey = "first_network_pod_start_sli_duration_seconds" KubeletSubsystem = "kubelet" + DRASubsystem = "dra" NodeNameKey = "node_name" NodeLabelKey = "node" NodeStartupPreKubeletKey = "node_startup_pre_kubelet_duration_seconds" @@ -132,6 +133,9 @@ const ( ContainerAlignedComputeResourcesScopeLabelKey = "scope" ContainerAlignedComputeResourcesBoundaryLabelKey = "boundary" + // Metric keys for DRA operations + DRAOperationsDurationKey = "operations_duration_seconds" + // Values used in metric labels Container = "container" InitContainer = "init_container" @@ -938,6 +942,18 @@ var ( StabilityLevel: metrics.ALPHA, }, ) + + // DRAOperationsDuration tracks the duration of the DRA PrepareResources and UnprepareResources requests. + DRAOperationsDuration = metrics.NewHistogramVec( + &metrics.HistogramOpts{ + Subsystem: DRASubsystem, + Name: DRAOperationsDurationKey, + Help: "Latency histogram in seconds for the duration of handling all ResourceClaims referenced by a pod when the pod starts or stops. Identified by the name of the operation (PrepareResources or UnprepareResources) and separated by the success of the operation. The number of failed operations is provided through the histogram's overall count.", + Buckets: metrics.DefBuckets, + StabilityLevel: metrics.ALPHA, + }, + []string{"operation_name", "is_error"}, + ) ) var registerMetrics sync.Once @@ -1030,6 +1046,10 @@ func Register(collectors ...metrics.StableCollector) { legacyregistry.MustRegister(LifecycleHandlerHTTPFallbacks) legacyregistry.MustRegister(LifecycleHandlerSleepTerminated) legacyregistry.MustRegister(CgroupVersion) + + if utilfeature.DefaultFeatureGate.Enabled(features.DynamicResourceAllocation) { + legacyregistry.MustRegister(DRAOperationsDuration) + } }) } From 2048b7b8fd45bc46efd53a29ef0c6bb91e0f31c5 Mon Sep 17 00:00:00 2001 From: Ed Bartosh Date: Thu, 10 Oct 2024 00:32:05 +0300 Subject: [PATCH 2/5] kubelet: add DRAGRPCOperationsDuration metric --- pkg/kubelet/metrics/metrics.go | 16 +++++++++++++++- 1 file changed, 15 insertions(+), 1 deletion(-) diff --git a/pkg/kubelet/metrics/metrics.go b/pkg/kubelet/metrics/metrics.go index 97f79dd052c..be0a4cbfa53 100644 --- a/pkg/kubelet/metrics/metrics.go +++ b/pkg/kubelet/metrics/metrics.go @@ -134,7 +134,8 @@ const ( ContainerAlignedComputeResourcesBoundaryLabelKey = "boundary" // Metric keys for DRA operations - DRAOperationsDurationKey = "operations_duration_seconds" + DRAOperationsDurationKey = "operations_duration_seconds" + DRAGRPCOperationsDurationKey = "grpc_operations_duration_seconds" // Values used in metric labels Container = "container" @@ -954,6 +955,18 @@ var ( }, []string{"operation_name", "is_error"}, ) + + // DRAGRPCOperationsDuration tracks the duration of the DRA GRPC operations. + DRAGRPCOperationsDuration = metrics.NewHistogramVec( + &metrics.HistogramOpts{ + Subsystem: DRASubsystem, + Name: DRAGRPCOperationsDurationKey, + Help: "Duration in seconds of the DRA gRPC operations", + Buckets: metrics.DefBuckets, + StabilityLevel: metrics.ALPHA, + }, + []string{"driver_name", "method_name", "grpc_status_code"}, + ) ) var registerMetrics sync.Once @@ -1049,6 +1062,7 @@ func Register(collectors ...metrics.StableCollector) { if utilfeature.DefaultFeatureGate.Enabled(features.DynamicResourceAllocation) { legacyregistry.MustRegister(DRAOperationsDuration) + legacyregistry.MustRegister(DRAGRPCOperationsDuration) } }) } From 3a67bc0defc1f0dc69ff7328b8714435199abbcd Mon Sep 17 00:00:00 2001 From: Ed Bartosh Date: Thu, 10 Oct 2024 00:33:35 +0300 Subject: [PATCH 3/5] kubelet: add DRA plugin name to the Plugin struct --- pkg/kubelet/cm/dra/plugin/plugin.go | 1 + pkg/kubelet/cm/dra/plugin/plugin_test.go | 18 +++++++++++------- pkg/kubelet/cm/dra/plugin/plugins_store.go | 6 +++--- pkg/kubelet/cm/dra/plugin/registration.go | 3 ++- 4 files changed, 17 insertions(+), 11 deletions(-) diff --git a/pkg/kubelet/cm/dra/plugin/plugin.go b/pkg/kubelet/cm/dra/plugin/plugin.go index 01a9de105c8..708a3774c9e 100644 --- a/pkg/kubelet/cm/dra/plugin/plugin.go +++ b/pkg/kubelet/cm/dra/plugin/plugin.go @@ -51,6 +51,7 @@ func NewDRAPluginClient(pluginName string) (*Plugin, error) { } type Plugin struct { + name string backgroundCtx context.Context cancel func(cause error) diff --git a/pkg/kubelet/cm/dra/plugin/plugin_test.go b/pkg/kubelet/cm/dra/plugin/plugin_test.go index a1be8bb845e..d5039c3e0a0 100644 --- a/pkg/kubelet/cm/dra/plugin/plugin_test.go +++ b/pkg/kubelet/cm/dra/plugin/plugin_test.go @@ -114,7 +114,9 @@ func TestGRPCConnIsReused(t *testing.T) { wg := sync.WaitGroup{} m := sync.Mutex{} + pluginName := "dummy-plugin" p := &Plugin{ + name: pluginName, backgroundCtx: tCtx, endpoint: addr, clientCallTimeout: defaultClientCallTimeout, @@ -132,15 +134,15 @@ func TestGRPCConnIsReused(t *testing.T) { } // ensure the plugin we are using is registered - draPlugins.add("dummy-plugin", p) - defer draPlugins.delete("dummy-plugin") + draPlugins.add(p) + defer draPlugins.delete(pluginName) // we call `NodePrepareResource` 2 times and check whether a new connection is created or the same is reused for i := 0; i < 2; i++ { wg.Add(1) go func() { defer wg.Done() - client, err := NewDRAPluginClient("dummy-plugin") + client, err := NewDRAPluginClient(pluginName) if err != nil { t.Error(err) return @@ -205,7 +207,7 @@ func TestNewDRAPluginClient(t *testing.T) { { description: "plugin exists", setup: func(name string) tearDown { - draPlugins.add(name, &Plugin{}) + draPlugins.add(&Plugin{name: name}) return func() { draPlugins.delete(name) } @@ -251,7 +253,9 @@ func TestNodeUnprepareResources(t *testing.T) { } defer teardown() + pluginName := "dummy-plugin" p := &Plugin{ + name: pluginName, backgroundCtx: tCtx, endpoint: addr, clientCallTimeout: defaultClientCallTimeout, @@ -268,10 +272,10 @@ func TestNodeUnprepareResources(t *testing.T) { t.Fatal(err) } - draPlugins.add("dummy-plugin", p) - defer draPlugins.delete("dummy-plugin") + draPlugins.add(p) + defer draPlugins.delete(pluginName) - client, err := NewDRAPluginClient("dummy-plugin") + client, err := NewDRAPluginClient(pluginName) if err != nil { t.Fatal(err) } diff --git a/pkg/kubelet/cm/dra/plugin/plugins_store.go b/pkg/kubelet/cm/dra/plugin/plugins_store.go index e172ac2545b..ebd65cc848b 100644 --- a/pkg/kubelet/cm/dra/plugin/plugins_store.go +++ b/pkg/kubelet/cm/dra/plugin/plugins_store.go @@ -42,7 +42,7 @@ func (s *pluginsStore) get(pluginName string) *Plugin { // Set lets you save a DRA Plugin to the list and give it a specific name. // This method is protected by a mutex. -func (s *pluginsStore) add(pluginName string, p *Plugin) (replaced bool) { +func (s *pluginsStore) add(p *Plugin) (replaced bool) { s.Lock() defer s.Unlock() @@ -50,8 +50,8 @@ func (s *pluginsStore) add(pluginName string, p *Plugin) (replaced bool) { s.store = make(map[string]*Plugin) } - _, exists := s.store[pluginName] - s.store[pluginName] = p + _, exists := s.store[p.name] + s.store[p.name] = p return exists } diff --git a/pkg/kubelet/cm/dra/plugin/registration.go b/pkg/kubelet/cm/dra/plugin/registration.go index 99e577a4259..0fb4bcffaed 100644 --- a/pkg/kubelet/cm/dra/plugin/registration.go +++ b/pkg/kubelet/cm/dra/plugin/registration.go @@ -160,6 +160,7 @@ func (h *RegistrationHandler) RegisterPlugin(pluginName string, endpoint string, ctx, cancel := context.WithCancelCause(ctx) pluginInstance := &Plugin{ + name: pluginName, backgroundCtx: ctx, cancel: cancel, conn: nil, @@ -170,7 +171,7 @@ func (h *RegistrationHandler) RegisterPlugin(pluginName string, endpoint string, // Storing endpoint of newly registered DRA Plugin into the map, where plugin name will be the key // all other DRA components will be able to get the actual socket of DRA plugins by its name. - if draPlugins.add(pluginName, pluginInstance) { + if draPlugins.add(pluginInstance) { logger.V(1).Info("Already registered, previous plugin was replaced") } From 9a044cd46a3e823f22df1e34c808825bb42360fd Mon Sep 17 00:00:00 2001 From: Ed Bartosh Date: Thu, 10 Oct 2024 00:36:11 +0300 Subject: [PATCH 4/5] kubelet: intercept DRA GRPC to record metrics --- pkg/kubelet/cm/dra/plugin/plugin.go | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/pkg/kubelet/cm/dra/plugin/plugin.go b/pkg/kubelet/cm/dra/plugin/plugin.go index 708a3774c9e..7cddcfe420e 100644 --- a/pkg/kubelet/cm/dra/plugin/plugin.go +++ b/pkg/kubelet/cm/dra/plugin/plugin.go @@ -27,10 +27,12 @@ import ( "google.golang.org/grpc" "google.golang.org/grpc/connectivity" "google.golang.org/grpc/credentials/insecure" + "google.golang.org/grpc/status" utilversion "k8s.io/apimachinery/pkg/util/version" "k8s.io/klog/v2" drapb "k8s.io/kubelet/pkg/apis/dra/v1alpha4" + "k8s.io/kubernetes/pkg/kubelet/metrics" ) // NewDRAPluginClient returns a wrapper around those gRPC methods of a DRA @@ -86,6 +88,7 @@ func (p *Plugin) getOrCreateGRPCConn() (*grpc.ClientConn, error) { grpc.WithContextDialer(func(ctx context.Context, target string) (net.Conn, error) { return (&net.Dialer{}).DialContext(ctx, network, target) }), + grpc.WithChainUnaryInterceptor(newMetricsInterceptor(p.name)), ) if err != nil { return nil, err @@ -145,3 +148,12 @@ func (p *Plugin) NodeUnprepareResources( logger.V(4).Info("Done calling NodeUnprepareResources rpc", "response", response, "err", err) return response, err } + +func newMetricsInterceptor(pluginName string) grpc.UnaryClientInterceptor { + return func(ctx context.Context, method string, req, reply any, conn *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error { + start := time.Now() + err := invoker(ctx, method, req, reply, conn, opts...) + metrics.DRAGRPCOperationsDuration.WithLabelValues(pluginName, method, status.Code(err).String()).Observe(time.Since(start).Seconds()) + return err + } +} From c1cd8495a5b4521d226c531299d0a345d6cc8af0 Mon Sep 17 00:00:00 2001 From: Ed Bartosh Date: Mon, 28 Oct 2024 13:07:14 +0200 Subject: [PATCH 5/5] kubelet: define custom buckets for DRA metrics --- pkg/kubelet/metrics/metrics.go | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/pkg/kubelet/metrics/metrics.go b/pkg/kubelet/metrics/metrics.go index be0a4cbfa53..319aedd286d 100644 --- a/pkg/kubelet/metrics/metrics.go +++ b/pkg/kubelet/metrics/metrics.go @@ -173,6 +173,11 @@ var ( {60 * 1024 * 1024 * 1024, "60GB-100GB"}, {100 * 1024 * 1024 * 1024, "GT100GB"}, } + // DRADurationBuckets is the bucket boundaries for DRA operation duration metrics + // DRAOperationsDuration and DRAGRPCOperationsDuration defined below in this file. + // The buckets max value 40 is based on the 45sec max gRPC timeout value defined + // for the DRA gRPC calls in the pkg/kubelet/cm/dra/plugin/registration.go + DRADurationBuckets = metrics.ExponentialBucketsRange(.1, 40, 15) ) var ( @@ -950,7 +955,7 @@ var ( Subsystem: DRASubsystem, Name: DRAOperationsDurationKey, Help: "Latency histogram in seconds for the duration of handling all ResourceClaims referenced by a pod when the pod starts or stops. Identified by the name of the operation (PrepareResources or UnprepareResources) and separated by the success of the operation. The number of failed operations is provided through the histogram's overall count.", - Buckets: metrics.DefBuckets, + Buckets: DRADurationBuckets, StabilityLevel: metrics.ALPHA, }, []string{"operation_name", "is_error"}, @@ -962,7 +967,7 @@ var ( Subsystem: DRASubsystem, Name: DRAGRPCOperationsDurationKey, Help: "Duration in seconds of the DRA gRPC operations", - Buckets: metrics.DefBuckets, + Buckets: DRADurationBuckets, StabilityLevel: metrics.ALPHA, }, []string{"driver_name", "method_name", "grpc_status_code"},