Merge pull request #127146 from bart0sh/PR156-DRA-Kubelet-latency

Kubelet: add DRA latency metrics
This commit is contained in:
Kubernetes Prow Robot 2024-10-29 19:04:55 +00:00 committed by GitHub
commit a12a32cd12
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
6 changed files with 83 additions and 12 deletions

View File

@ -19,6 +19,7 @@ package dra
import (
"context"
"fmt"
"strconv"
"time"
v1 "k8s.io/api/core/v1"
@ -35,6 +36,7 @@ import (
"k8s.io/kubernetes/pkg/kubelet/cm/dra/state"
"k8s.io/kubernetes/pkg/kubelet/config"
kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
"k8s.io/kubernetes/pkg/kubelet/metrics"
)
// draManagerStateFileName is the file name where dra manager stores its state
@ -150,6 +152,13 @@ func (m *ManagerImpl) reconcileLoop(ctx context.Context) {
// for each new resource requirement, process their responses and update the cached
// containerResources on success.
func (m *ManagerImpl) PrepareResources(ctx context.Context, pod *v1.Pod) error {
startTime := time.Now()
err := m.prepareResources(ctx, pod)
metrics.DRAOperationsDuration.WithLabelValues("PrepareResources", strconv.FormatBool(err == nil)).Observe(time.Since(startTime).Seconds())
return err
}
func (m *ManagerImpl) prepareResources(ctx context.Context, pod *v1.Pod) error {
logger := klog.FromContext(ctx)
batches := make(map[string][]*drapb.Claim)
resourceClaims := make(map[types.UID]*resourceapi.ResourceClaim)
@ -369,6 +378,10 @@ func (m *ManagerImpl) GetResources(pod *v1.Pod, container *v1.Container) (*Conta
// As such, calls to the underlying NodeUnprepareResource API are skipped for claims that have
// already been successfully unprepared.
func (m *ManagerImpl) UnprepareResources(ctx context.Context, pod *v1.Pod) error {
var err error = nil
defer func(startTime time.Time) {
metrics.DRAOperationsDuration.WithLabelValues("UnprepareResources", strconv.FormatBool(err != nil)).Observe(time.Since(startTime).Seconds())
}(time.Now())
var claimNames []string
for i := range pod.Spec.ResourceClaims {
claimName, _, err := resourceclaim.Name(pod, &pod.Spec.ResourceClaims[i])
@ -383,7 +396,8 @@ func (m *ManagerImpl) UnprepareResources(ctx context.Context, pod *v1.Pod) error
}
claimNames = append(claimNames, *claimName)
}
return m.unprepareResources(ctx, pod.UID, pod.Namespace, claimNames)
err = m.unprepareResources(ctx, pod.UID, pod.Namespace, claimNames)
return err
}
func (m *ManagerImpl) unprepareResources(ctx context.Context, podUID types.UID, namespace string, claimNames []string) error {

View File

@ -27,10 +27,12 @@ import (
"google.golang.org/grpc"
"google.golang.org/grpc/connectivity"
"google.golang.org/grpc/credentials/insecure"
"google.golang.org/grpc/status"
utilversion "k8s.io/apimachinery/pkg/util/version"
"k8s.io/klog/v2"
drapb "k8s.io/kubelet/pkg/apis/dra/v1alpha4"
"k8s.io/kubernetes/pkg/kubelet/metrics"
)
// NewDRAPluginClient returns a wrapper around those gRPC methods of a DRA
@ -51,6 +53,7 @@ func NewDRAPluginClient(pluginName string) (*Plugin, error) {
}
type Plugin struct {
name string
backgroundCtx context.Context
cancel func(cause error)
@ -85,6 +88,7 @@ func (p *Plugin) getOrCreateGRPCConn() (*grpc.ClientConn, error) {
grpc.WithContextDialer(func(ctx context.Context, target string) (net.Conn, error) {
return (&net.Dialer{}).DialContext(ctx, network, target)
}),
grpc.WithChainUnaryInterceptor(newMetricsInterceptor(p.name)),
)
if err != nil {
return nil, err
@ -144,3 +148,12 @@ func (p *Plugin) NodeUnprepareResources(
logger.V(4).Info("Done calling NodeUnprepareResources rpc", "response", response, "err", err)
return response, err
}
func newMetricsInterceptor(pluginName string) grpc.UnaryClientInterceptor {
return func(ctx context.Context, method string, req, reply any, conn *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
start := time.Now()
err := invoker(ctx, method, req, reply, conn, opts...)
metrics.DRAGRPCOperationsDuration.WithLabelValues(pluginName, method, status.Code(err).String()).Observe(time.Since(start).Seconds())
return err
}
}

View File

@ -114,7 +114,9 @@ func TestGRPCConnIsReused(t *testing.T) {
wg := sync.WaitGroup{}
m := sync.Mutex{}
pluginName := "dummy-plugin"
p := &Plugin{
name: pluginName,
backgroundCtx: tCtx,
endpoint: addr,
clientCallTimeout: defaultClientCallTimeout,
@ -132,15 +134,15 @@ func TestGRPCConnIsReused(t *testing.T) {
}
// ensure the plugin we are using is registered
draPlugins.add("dummy-plugin", p)
defer draPlugins.delete("dummy-plugin")
draPlugins.add(p)
defer draPlugins.delete(pluginName)
// we call `NodePrepareResource` 2 times and check whether a new connection is created or the same is reused
for i := 0; i < 2; i++ {
wg.Add(1)
go func() {
defer wg.Done()
client, err := NewDRAPluginClient("dummy-plugin")
client, err := NewDRAPluginClient(pluginName)
if err != nil {
t.Error(err)
return
@ -205,7 +207,7 @@ func TestNewDRAPluginClient(t *testing.T) {
{
description: "plugin exists",
setup: func(name string) tearDown {
draPlugins.add(name, &Plugin{})
draPlugins.add(&Plugin{name: name})
return func() {
draPlugins.delete(name)
}
@ -251,7 +253,9 @@ func TestNodeUnprepareResources(t *testing.T) {
}
defer teardown()
pluginName := "dummy-plugin"
p := &Plugin{
name: pluginName,
backgroundCtx: tCtx,
endpoint: addr,
clientCallTimeout: defaultClientCallTimeout,
@ -268,10 +272,10 @@ func TestNodeUnprepareResources(t *testing.T) {
t.Fatal(err)
}
draPlugins.add("dummy-plugin", p)
defer draPlugins.delete("dummy-plugin")
draPlugins.add(p)
defer draPlugins.delete(pluginName)
client, err := NewDRAPluginClient("dummy-plugin")
client, err := NewDRAPluginClient(pluginName)
if err != nil {
t.Fatal(err)
}

View File

@ -42,7 +42,7 @@ func (s *pluginsStore) get(pluginName string) *Plugin {
// Set lets you save a DRA Plugin to the list and give it a specific name.
// This method is protected by a mutex.
func (s *pluginsStore) add(pluginName string, p *Plugin) (replaced bool) {
func (s *pluginsStore) add(p *Plugin) (replaced bool) {
s.Lock()
defer s.Unlock()
@ -50,8 +50,8 @@ func (s *pluginsStore) add(pluginName string, p *Plugin) (replaced bool) {
s.store = make(map[string]*Plugin)
}
_, exists := s.store[pluginName]
s.store[pluginName] = p
_, exists := s.store[p.name]
s.store[p.name] = p
return exists
}

View File

@ -160,6 +160,7 @@ func (h *RegistrationHandler) RegisterPlugin(pluginName string, endpoint string,
ctx, cancel := context.WithCancelCause(ctx)
pluginInstance := &Plugin{
name: pluginName,
backgroundCtx: ctx,
cancel: cancel,
conn: nil,
@ -170,7 +171,7 @@ func (h *RegistrationHandler) RegisterPlugin(pluginName string, endpoint string,
// Storing endpoint of newly registered DRA Plugin into the map, where plugin name will be the key
// all other DRA components will be able to get the actual socket of DRA plugins by its name.
if draPlugins.add(pluginName, pluginInstance) {
if draPlugins.add(pluginInstance) {
logger.V(1).Info("Already registered, previous plugin was replaced")
}

View File

@ -32,6 +32,7 @@ import (
const (
FirstNetworkPodStartSLIDurationKey = "first_network_pod_start_sli_duration_seconds"
KubeletSubsystem = "kubelet"
DRASubsystem = "dra"
NodeNameKey = "node_name"
NodeLabelKey = "node"
NodeStartupPreKubeletKey = "node_startup_pre_kubelet_duration_seconds"
@ -132,6 +133,10 @@ const (
ContainerAlignedComputeResourcesScopeLabelKey = "scope"
ContainerAlignedComputeResourcesBoundaryLabelKey = "boundary"
// Metric keys for DRA operations
DRAOperationsDurationKey = "operations_duration_seconds"
DRAGRPCOperationsDurationKey = "grpc_operations_duration_seconds"
// Values used in metric labels
Container = "container"
InitContainer = "init_container"
@ -168,6 +173,11 @@ var (
{60 * 1024 * 1024 * 1024, "60GB-100GB"},
{100 * 1024 * 1024 * 1024, "GT100GB"},
}
// DRADurationBuckets is the bucket boundaries for DRA operation duration metrics
// DRAOperationsDuration and DRAGRPCOperationsDuration defined below in this file.
// The buckets max value 40 is based on the 45sec max gRPC timeout value defined
// for the DRA gRPC calls in the pkg/kubelet/cm/dra/plugin/registration.go
DRADurationBuckets = metrics.ExponentialBucketsRange(.1, 40, 15)
)
var (
@ -938,6 +948,30 @@ var (
StabilityLevel: metrics.ALPHA,
},
)
// DRAOperationsDuration tracks the duration of the DRA PrepareResources and UnprepareResources requests.
DRAOperationsDuration = metrics.NewHistogramVec(
&metrics.HistogramOpts{
Subsystem: DRASubsystem,
Name: DRAOperationsDurationKey,
Help: "Latency histogram in seconds for the duration of handling all ResourceClaims referenced by a pod when the pod starts or stops. Identified by the name of the operation (PrepareResources or UnprepareResources) and separated by the success of the operation. The number of failed operations is provided through the histogram's overall count.",
Buckets: DRADurationBuckets,
StabilityLevel: metrics.ALPHA,
},
[]string{"operation_name", "is_error"},
)
// DRAGRPCOperationsDuration tracks the duration of the DRA GRPC operations.
DRAGRPCOperationsDuration = metrics.NewHistogramVec(
&metrics.HistogramOpts{
Subsystem: DRASubsystem,
Name: DRAGRPCOperationsDurationKey,
Help: "Duration in seconds of the DRA gRPC operations",
Buckets: DRADurationBuckets,
StabilityLevel: metrics.ALPHA,
},
[]string{"driver_name", "method_name", "grpc_status_code"},
)
)
var registerMetrics sync.Once
@ -1030,6 +1064,11 @@ func Register(collectors ...metrics.StableCollector) {
legacyregistry.MustRegister(LifecycleHandlerHTTPFallbacks)
legacyregistry.MustRegister(LifecycleHandlerSleepTerminated)
legacyregistry.MustRegister(CgroupVersion)
if utilfeature.DefaultFeatureGate.Enabled(features.DynamicResourceAllocation) {
legacyregistry.MustRegister(DRAOperationsDuration)
legacyregistry.MustRegister(DRAGRPCOperationsDuration)
}
})
}