diff --git a/pkg/kubelet/cm/dra/manager.go b/pkg/kubelet/cm/dra/manager.go index e26e2f7b259..a5534536b6b 100644 --- a/pkg/kubelet/cm/dra/manager.go +++ b/pkg/kubelet/cm/dra/manager.go @@ -19,6 +19,7 @@ package dra import ( "context" "fmt" + "strconv" "time" v1 "k8s.io/api/core/v1" @@ -35,6 +36,7 @@ import ( "k8s.io/kubernetes/pkg/kubelet/cm/dra/state" "k8s.io/kubernetes/pkg/kubelet/config" kubecontainer "k8s.io/kubernetes/pkg/kubelet/container" + "k8s.io/kubernetes/pkg/kubelet/metrics" ) // draManagerStateFileName is the file name where dra manager stores its state @@ -150,6 +152,13 @@ func (m *ManagerImpl) reconcileLoop(ctx context.Context) { // for each new resource requirement, process their responses and update the cached // containerResources on success. func (m *ManagerImpl) PrepareResources(ctx context.Context, pod *v1.Pod) error { + startTime := time.Now() + err := m.prepareResources(ctx, pod) + metrics.DRAOperationsDuration.WithLabelValues("PrepareResources", strconv.FormatBool(err == nil)).Observe(time.Since(startTime).Seconds()) + return err +} + +func (m *ManagerImpl) prepareResources(ctx context.Context, pod *v1.Pod) error { logger := klog.FromContext(ctx) batches := make(map[string][]*drapb.Claim) resourceClaims := make(map[types.UID]*resourceapi.ResourceClaim) @@ -369,6 +378,10 @@ func (m *ManagerImpl) GetResources(pod *v1.Pod, container *v1.Container) (*Conta // As such, calls to the underlying NodeUnprepareResource API are skipped for claims that have // already been successfully unprepared. func (m *ManagerImpl) UnprepareResources(ctx context.Context, pod *v1.Pod) error { + var err error = nil + defer func(startTime time.Time) { + metrics.DRAOperationsDuration.WithLabelValues("UnprepareResources", strconv.FormatBool(err != nil)).Observe(time.Since(startTime).Seconds()) + }(time.Now()) var claimNames []string for i := range pod.Spec.ResourceClaims { claimName, _, err := resourceclaim.Name(pod, &pod.Spec.ResourceClaims[i]) @@ -383,7 +396,8 @@ func (m *ManagerImpl) UnprepareResources(ctx context.Context, pod *v1.Pod) error } claimNames = append(claimNames, *claimName) } - return m.unprepareResources(ctx, pod.UID, pod.Namespace, claimNames) + err = m.unprepareResources(ctx, pod.UID, pod.Namespace, claimNames) + return err } func (m *ManagerImpl) unprepareResources(ctx context.Context, podUID types.UID, namespace string, claimNames []string) error { diff --git a/pkg/kubelet/cm/dra/plugin/plugin.go b/pkg/kubelet/cm/dra/plugin/plugin.go index 01a9de105c8..7cddcfe420e 100644 --- a/pkg/kubelet/cm/dra/plugin/plugin.go +++ b/pkg/kubelet/cm/dra/plugin/plugin.go @@ -27,10 +27,12 @@ import ( "google.golang.org/grpc" "google.golang.org/grpc/connectivity" "google.golang.org/grpc/credentials/insecure" + "google.golang.org/grpc/status" utilversion "k8s.io/apimachinery/pkg/util/version" "k8s.io/klog/v2" drapb "k8s.io/kubelet/pkg/apis/dra/v1alpha4" + "k8s.io/kubernetes/pkg/kubelet/metrics" ) // NewDRAPluginClient returns a wrapper around those gRPC methods of a DRA @@ -51,6 +53,7 @@ func NewDRAPluginClient(pluginName string) (*Plugin, error) { } type Plugin struct { + name string backgroundCtx context.Context cancel func(cause error) @@ -85,6 +88,7 @@ func (p *Plugin) getOrCreateGRPCConn() (*grpc.ClientConn, error) { grpc.WithContextDialer(func(ctx context.Context, target string) (net.Conn, error) { return (&net.Dialer{}).DialContext(ctx, network, target) }), + grpc.WithChainUnaryInterceptor(newMetricsInterceptor(p.name)), ) if err != nil { return nil, err @@ -144,3 +148,12 @@ func (p *Plugin) NodeUnprepareResources( logger.V(4).Info("Done calling NodeUnprepareResources rpc", "response", response, "err", err) return response, err } + +func newMetricsInterceptor(pluginName string) grpc.UnaryClientInterceptor { + return func(ctx context.Context, method string, req, reply any, conn *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error { + start := time.Now() + err := invoker(ctx, method, req, reply, conn, opts...) + metrics.DRAGRPCOperationsDuration.WithLabelValues(pluginName, method, status.Code(err).String()).Observe(time.Since(start).Seconds()) + return err + } +} diff --git a/pkg/kubelet/cm/dra/plugin/plugin_test.go b/pkg/kubelet/cm/dra/plugin/plugin_test.go index a1be8bb845e..d5039c3e0a0 100644 --- a/pkg/kubelet/cm/dra/plugin/plugin_test.go +++ b/pkg/kubelet/cm/dra/plugin/plugin_test.go @@ -114,7 +114,9 @@ func TestGRPCConnIsReused(t *testing.T) { wg := sync.WaitGroup{} m := sync.Mutex{} + pluginName := "dummy-plugin" p := &Plugin{ + name: pluginName, backgroundCtx: tCtx, endpoint: addr, clientCallTimeout: defaultClientCallTimeout, @@ -132,15 +134,15 @@ func TestGRPCConnIsReused(t *testing.T) { } // ensure the plugin we are using is registered - draPlugins.add("dummy-plugin", p) - defer draPlugins.delete("dummy-plugin") + draPlugins.add(p) + defer draPlugins.delete(pluginName) // we call `NodePrepareResource` 2 times and check whether a new connection is created or the same is reused for i := 0; i < 2; i++ { wg.Add(1) go func() { defer wg.Done() - client, err := NewDRAPluginClient("dummy-plugin") + client, err := NewDRAPluginClient(pluginName) if err != nil { t.Error(err) return @@ -205,7 +207,7 @@ func TestNewDRAPluginClient(t *testing.T) { { description: "plugin exists", setup: func(name string) tearDown { - draPlugins.add(name, &Plugin{}) + draPlugins.add(&Plugin{name: name}) return func() { draPlugins.delete(name) } @@ -251,7 +253,9 @@ func TestNodeUnprepareResources(t *testing.T) { } defer teardown() + pluginName := "dummy-plugin" p := &Plugin{ + name: pluginName, backgroundCtx: tCtx, endpoint: addr, clientCallTimeout: defaultClientCallTimeout, @@ -268,10 +272,10 @@ func TestNodeUnprepareResources(t *testing.T) { t.Fatal(err) } - draPlugins.add("dummy-plugin", p) - defer draPlugins.delete("dummy-plugin") + draPlugins.add(p) + defer draPlugins.delete(pluginName) - client, err := NewDRAPluginClient("dummy-plugin") + client, err := NewDRAPluginClient(pluginName) if err != nil { t.Fatal(err) } diff --git a/pkg/kubelet/cm/dra/plugin/plugins_store.go b/pkg/kubelet/cm/dra/plugin/plugins_store.go index e172ac2545b..ebd65cc848b 100644 --- a/pkg/kubelet/cm/dra/plugin/plugins_store.go +++ b/pkg/kubelet/cm/dra/plugin/plugins_store.go @@ -42,7 +42,7 @@ func (s *pluginsStore) get(pluginName string) *Plugin { // Set lets you save a DRA Plugin to the list and give it a specific name. // This method is protected by a mutex. -func (s *pluginsStore) add(pluginName string, p *Plugin) (replaced bool) { +func (s *pluginsStore) add(p *Plugin) (replaced bool) { s.Lock() defer s.Unlock() @@ -50,8 +50,8 @@ func (s *pluginsStore) add(pluginName string, p *Plugin) (replaced bool) { s.store = make(map[string]*Plugin) } - _, exists := s.store[pluginName] - s.store[pluginName] = p + _, exists := s.store[p.name] + s.store[p.name] = p return exists } diff --git a/pkg/kubelet/cm/dra/plugin/registration.go b/pkg/kubelet/cm/dra/plugin/registration.go index 99e577a4259..0fb4bcffaed 100644 --- a/pkg/kubelet/cm/dra/plugin/registration.go +++ b/pkg/kubelet/cm/dra/plugin/registration.go @@ -160,6 +160,7 @@ func (h *RegistrationHandler) RegisterPlugin(pluginName string, endpoint string, ctx, cancel := context.WithCancelCause(ctx) pluginInstance := &Plugin{ + name: pluginName, backgroundCtx: ctx, cancel: cancel, conn: nil, @@ -170,7 +171,7 @@ func (h *RegistrationHandler) RegisterPlugin(pluginName string, endpoint string, // Storing endpoint of newly registered DRA Plugin into the map, where plugin name will be the key // all other DRA components will be able to get the actual socket of DRA plugins by its name. - if draPlugins.add(pluginName, pluginInstance) { + if draPlugins.add(pluginInstance) { logger.V(1).Info("Already registered, previous plugin was replaced") } diff --git a/pkg/kubelet/metrics/metrics.go b/pkg/kubelet/metrics/metrics.go index 30a194a3a66..319aedd286d 100644 --- a/pkg/kubelet/metrics/metrics.go +++ b/pkg/kubelet/metrics/metrics.go @@ -32,6 +32,7 @@ import ( const ( FirstNetworkPodStartSLIDurationKey = "first_network_pod_start_sli_duration_seconds" KubeletSubsystem = "kubelet" + DRASubsystem = "dra" NodeNameKey = "node_name" NodeLabelKey = "node" NodeStartupPreKubeletKey = "node_startup_pre_kubelet_duration_seconds" @@ -132,6 +133,10 @@ const ( ContainerAlignedComputeResourcesScopeLabelKey = "scope" ContainerAlignedComputeResourcesBoundaryLabelKey = "boundary" + // Metric keys for DRA operations + DRAOperationsDurationKey = "operations_duration_seconds" + DRAGRPCOperationsDurationKey = "grpc_operations_duration_seconds" + // Values used in metric labels Container = "container" InitContainer = "init_container" @@ -168,6 +173,11 @@ var ( {60 * 1024 * 1024 * 1024, "60GB-100GB"}, {100 * 1024 * 1024 * 1024, "GT100GB"}, } + // DRADurationBuckets is the bucket boundaries for DRA operation duration metrics + // DRAOperationsDuration and DRAGRPCOperationsDuration defined below in this file. + // The buckets max value 40 is based on the 45sec max gRPC timeout value defined + // for the DRA gRPC calls in the pkg/kubelet/cm/dra/plugin/registration.go + DRADurationBuckets = metrics.ExponentialBucketsRange(.1, 40, 15) ) var ( @@ -938,6 +948,30 @@ var ( StabilityLevel: metrics.ALPHA, }, ) + + // DRAOperationsDuration tracks the duration of the DRA PrepareResources and UnprepareResources requests. + DRAOperationsDuration = metrics.NewHistogramVec( + &metrics.HistogramOpts{ + Subsystem: DRASubsystem, + Name: DRAOperationsDurationKey, + Help: "Latency histogram in seconds for the duration of handling all ResourceClaims referenced by a pod when the pod starts or stops. Identified by the name of the operation (PrepareResources or UnprepareResources) and separated by the success of the operation. The number of failed operations is provided through the histogram's overall count.", + Buckets: DRADurationBuckets, + StabilityLevel: metrics.ALPHA, + }, + []string{"operation_name", "is_error"}, + ) + + // DRAGRPCOperationsDuration tracks the duration of the DRA GRPC operations. + DRAGRPCOperationsDuration = metrics.NewHistogramVec( + &metrics.HistogramOpts{ + Subsystem: DRASubsystem, + Name: DRAGRPCOperationsDurationKey, + Help: "Duration in seconds of the DRA gRPC operations", + Buckets: DRADurationBuckets, + StabilityLevel: metrics.ALPHA, + }, + []string{"driver_name", "method_name", "grpc_status_code"}, + ) ) var registerMetrics sync.Once @@ -1030,6 +1064,11 @@ func Register(collectors ...metrics.StableCollector) { legacyregistry.MustRegister(LifecycleHandlerHTTPFallbacks) legacyregistry.MustRegister(LifecycleHandlerSleepTerminated) legacyregistry.MustRegister(CgroupVersion) + + if utilfeature.DefaultFeatureGate.Enabled(features.DynamicResourceAllocation) { + legacyregistry.MustRegister(DRAOperationsDuration) + legacyregistry.MustRegister(DRAGRPCOperationsDuration) + } }) }