From 7a63997c8a1a9ba14f2bdc478fdf33cf88f48d80 Mon Sep 17 00:00:00 2001 From: Marek Siarkowicz Date: Thu, 22 Jun 2023 11:56:09 +0200 Subject: [PATCH] Improve apiserver storage size metric to allow it's graduation Change name to make it compliant with prometheus guidelines. Calculate it on demand instead of periodic to comply with prometheus standards. Replace "endpoint" with "server" label to make it semantically consistent with storage factory --- .../apiserver/pkg/server/options/etcd.go | 25 +++++++ .../pkg/storage/etcd3/metrics/metrics.go | 69 +++++++++++++++++-- .../storage/storagebackend/factory/etcd3.go | 63 +++++++++++------ .../storage/storagebackend/factory/factory.go | 14 +++- .../documentation/documentation-list.yaml | 7 ++ .../documentation/documentation.md | 9 ++- test/integration/metrics/metrics_test.go | 22 ++++++ 7 files changed, 181 insertions(+), 28 deletions(-) diff --git a/staging/src/k8s.io/apiserver/pkg/server/options/etcd.go b/staging/src/k8s.io/apiserver/pkg/server/options/etcd.go index 5ad5e0277cc..57e9c1a9f13 100644 --- a/staging/src/k8s.io/apiserver/pkg/server/options/etcd.go +++ b/staging/src/k8s.io/apiserver/pkg/server/options/etcd.go @@ -36,6 +36,7 @@ import ( "k8s.io/apiserver/pkg/server/options/encryptionconfig" encryptionconfigcontroller "k8s.io/apiserver/pkg/server/options/encryptionconfig/controller" serverstorage "k8s.io/apiserver/pkg/server/storage" + "k8s.io/apiserver/pkg/storage/etcd3/metrics" "k8s.io/apiserver/pkg/storage/storagebackend" storagefactory "k8s.io/apiserver/pkg/storage/storagebackend/factory" storagevalue "k8s.io/apiserver/pkg/storage/value" @@ -238,10 +239,34 @@ func (s *EtcdOptions) ApplyWithStorageFactoryTo(factory serverstorage.StorageFac return err } + metrics.SetStorageMonitorGetter(monitorGetter(factory)) + c.RESTOptionsGetter = s.CreateRESTOptionsGetter(factory, c.ResourceTransformers) return nil } +func monitorGetter(factory serverstorage.StorageFactory) func() (monitors []metrics.Monitor, err error) { + return func() (monitors []metrics.Monitor, err error) { + defer func() { + if err != nil { + for _, m := range monitors { + m.Close() + } + } + }() + + var m metrics.Monitor + for _, cfg := range factory.Configs() { + m, err = storagefactory.CreateMonitor(cfg) + if err != nil { + return nil, err + } + monitors = append(monitors, m) + } + return monitors, nil + } +} + func (s *EtcdOptions) CreateRESTOptionsGetter(factory serverstorage.StorageFactory, resourceTransformers storagevalue.ResourceTransformers) generic.RESTOptionsGetter { if resourceTransformers != nil { factory = &transformerStorageFactory{ diff --git a/staging/src/k8s.io/apiserver/pkg/storage/etcd3/metrics/metrics.go b/staging/src/k8s.io/apiserver/pkg/storage/etcd3/metrics/metrics.go index 78c9a598388..a8eda9d2206 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/etcd3/metrics/metrics.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/etcd3/metrics/metrics.go @@ -17,11 +17,14 @@ limitations under the License. package metrics import ( + "context" + "fmt" "sync" "time" compbasemetrics "k8s.io/component-base/metrics" "k8s.io/component-base/metrics/legacyregistry" + "k8s.io/klog/v2" ) /* @@ -73,13 +76,16 @@ var ( ) dbTotalSize = compbasemetrics.NewGaugeVec( &compbasemetrics.GaugeOpts{ - Subsystem: "apiserver", - Name: "storage_db_total_size_in_bytes", - Help: "Total size of the storage database file physically allocated in bytes.", - StabilityLevel: compbasemetrics.ALPHA, + Subsystem: "apiserver", + Name: "storage_db_total_size_in_bytes", + Help: "Total size of the storage database file physically allocated in bytes.", + StabilityLevel: compbasemetrics.ALPHA, + DeprecatedVersion: "1.28.0", }, []string{"endpoint"}, ) + storageSizeDescription = compbasemetrics.NewDesc("apiserver_storage_size_bytes", "Size of the storage database file physically allocated in bytes.", []string{"server"}, nil, compbasemetrics.ALPHA, "") + storageMonitor = &monitorCollector{} etcdEventsReceivedCounts = compbasemetrics.NewCounterVec( &compbasemetrics.CounterOpts{ Subsystem: "apiserver", @@ -160,6 +166,7 @@ func Register() { legacyregistry.MustRegister(etcdRequestErrorCounts) legacyregistry.MustRegister(objectCounts) legacyregistry.MustRegister(dbTotalSize) + legacyregistry.CustomMustRegister(storageMonitor) legacyregistry.MustRegister(etcdBookmarkCounts) legacyregistry.MustRegister(etcdLeaseObjectCounts) legacyregistry.MustRegister(listStorageCount) @@ -214,10 +221,16 @@ var sinceInSeconds = func(start time.Time) float64 { } // UpdateEtcdDbSize sets the etcd_db_total_size_in_bytes metric. +// Deprecated: Metric etcd_db_total_size_in_bytes will be replaced with apiserver_storage_size_bytes func UpdateEtcdDbSize(ep string, size int64) { dbTotalSize.WithLabelValues(ep).Set(float64(size)) } +// SetStorageMonitorGetter sets monitor getter to allow monitoring etcd stats. +func SetStorageMonitorGetter(getter func() ([]Monitor, error)) { + storageMonitor.monitorGetter = getter +} + // UpdateLeaseObjectCount sets the etcd_lease_object_counts metric. func UpdateLeaseObjectCount(count int64) { // Currently we only store one previous lease, since all the events have the same ttl. @@ -232,3 +245,51 @@ func RecordStorageListMetrics(resource string, numFetched, numEvald, numReturned listStorageNumSelectorEvals.WithLabelValues(resource).Add(float64(numEvald)) listStorageNumReturned.WithLabelValues(resource).Add(float64(numReturned)) } + +type Monitor interface { + Monitor(ctx context.Context) (StorageMetrics, error) + Close() error +} + +type StorageMetrics struct { + Size int64 +} + +type monitorCollector struct { + compbasemetrics.BaseStableCollector + + monitorGetter func() ([]Monitor, error) +} + +// DescribeWithStability implements compbasemetrics.StableColletor +func (c *monitorCollector) DescribeWithStability(ch chan<- *compbasemetrics.Desc) { + ch <- storageSizeDescription +} + +// CollectWithStability implements compbasemetrics.StableColletor +func (c *monitorCollector) CollectWithStability(ch chan<- compbasemetrics.Metric) { + monitors, err := c.monitorGetter() + if err != nil { + return + } + + for i, m := range monitors { + server := fmt.Sprintf("etcd-%d", i) + + klog.V(4).InfoS("Start collecting storage metrics", "server", server) + ctx, cancel := context.WithTimeout(context.Background(), time.Second) + metrics, err := m.Monitor(ctx) + cancel() + m.Close() + if err != nil { + klog.InfoS("Failed to get storage metrics", "server", server, "err", err) + continue + } + + metric, err := compbasemetrics.NewConstMetric(storageSizeDescription, compbasemetrics.GaugeValue, float64(metrics.Size), server) + if err != nil { + klog.ErrorS(err, "Failed to create metric", "server", server) + } + ch <- metric + } +} diff --git a/staging/src/k8s.io/apiserver/pkg/storage/storagebackend/factory/etcd3.go b/staging/src/k8s.io/apiserver/pkg/storage/storagebackend/factory/etcd3.go index 64bcabadb97..5736abf63c4 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/storagebackend/factory/etcd3.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/storagebackend/factory/etcd3.go @@ -20,6 +20,7 @@ import ( "context" "fmt" "log" + "math/rand" "net" "net/url" "os" @@ -37,6 +38,7 @@ import ( "go.uber.org/zap/zapcore" "golang.org/x/time/rate" "google.golang.org/grpc" + "k8s.io/klog/v2" "k8s.io/apimachinery/pkg/runtime" utilnet "k8s.io/apimachinery/pkg/util/net" @@ -52,7 +54,6 @@ import ( utilfeature "k8s.io/apiserver/pkg/util/feature" "k8s.io/component-base/metrics/legacyregistry" tracing "k8s.io/component-base/tracing" - "k8s.io/klog/v2" ) const ( @@ -153,11 +154,11 @@ func newETCD3Check(c storagebackend.Config, timeout time.Duration, stopCh <-chan // retry in a loop in the background until we successfully create the client, storing the client or error encountered lock := sync.RWMutex{} - var prober *etcd3Prober + var prober *etcd3ProberMonitor clientErr := fmt.Errorf("etcd client connection not yet established") go wait.PollUntil(time.Second, func() (bool, error) { - newProber, err := newETCD3Prober(c) + newProber, err := newETCD3ProberMonitor(c) lock.Lock() defer lock.Unlock() // Ensure that server is already not shutting down. @@ -221,49 +222,66 @@ func newETCD3Check(c storagebackend.Config, timeout time.Duration, stopCh <-chan }, nil } -func newETCD3Prober(c storagebackend.Config) (*etcd3Prober, error) { +func newETCD3ProberMonitor(c storagebackend.Config) (*etcd3ProberMonitor, error) { client, err := newETCD3Client(c.Transport) if err != nil { return nil, err } - return &etcd3Prober{ - client: client, - prefix: c.Prefix, + return &etcd3ProberMonitor{ + client: client, + prefix: c.Prefix, + endpoints: c.Transport.ServerList, }, nil } -type etcd3Prober struct { - prefix string +type etcd3ProberMonitor struct { + prefix string + endpoints []string mux sync.RWMutex client *clientv3.Client closed bool } -func (p *etcd3Prober) Close() error { - p.mux.Lock() - defer p.mux.Unlock() - if !p.closed { - p.closed = true - return p.client.Close() +func (t *etcd3ProberMonitor) Close() error { + t.mux.Lock() + defer t.mux.Unlock() + if !t.closed { + t.closed = true + return t.client.Close() } - return fmt.Errorf("prober was closed") + return fmt.Errorf("closed") } -func (p *etcd3Prober) Probe(ctx context.Context) error { - p.mux.RLock() - defer p.mux.RUnlock() - if p.closed { - return fmt.Errorf("prober was closed") +func (t *etcd3ProberMonitor) Probe(ctx context.Context) error { + t.mux.RLock() + defer t.mux.RUnlock() + if t.closed { + return fmt.Errorf("closed") } // See https://github.com/etcd-io/etcd/blob/c57f8b3af865d1b531b979889c602ba14377420e/etcdctl/ctlv3/command/ep_command.go#L118 - _, err := p.client.Get(ctx, path.Join("/", p.prefix, "health")) + _, err := t.client.Get(ctx, path.Join("/", t.prefix, "health")) if err != nil { return fmt.Errorf("error getting data from etcd: %w", err) } return nil } +func (t *etcd3ProberMonitor) Monitor(ctx context.Context) (metrics.StorageMetrics, error) { + t.mux.RLock() + defer t.mux.RUnlock() + if t.closed { + return metrics.StorageMetrics{}, fmt.Errorf("closed") + } + status, err := t.client.Status(ctx, t.endpoints[rand.Int()%len(t.endpoints)]) + if err != nil { + return metrics.StorageMetrics{}, err + } + return metrics.StorageMetrics{ + Size: status.DbSize, + }, nil +} + var newETCD3Client = func(c storagebackend.TransportConfig) (*clientv3.Client, error) { tlsInfo := transport.TLSInfo{ CertFile: c.CertFile, @@ -441,6 +459,7 @@ func newETCD3Storage(c storagebackend.ConfigForResource, newFunc func() runtime. // startDBSizeMonitorPerEndpoint starts a loop to monitor etcd database size and update the // corresponding metric etcd_db_total_size_in_bytes for each etcd server endpoint. +// Deprecated: Will be replaced with newETCD3ProberMonitor func startDBSizeMonitorPerEndpoint(client *clientv3.Client, interval time.Duration) (func(), error) { if interval == 0 { return func() {}, nil diff --git a/staging/src/k8s.io/apiserver/pkg/storage/storagebackend/factory/factory.go b/staging/src/k8s.io/apiserver/pkg/storage/storagebackend/factory/factory.go index c8cdd19b97a..1a60c92902c 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/storagebackend/factory/factory.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/storagebackend/factory/factory.go @@ -22,6 +22,7 @@ import ( "k8s.io/apimachinery/pkg/runtime" "k8s.io/apiserver/pkg/storage" + "k8s.io/apiserver/pkg/storage/etcd3/metrics" "k8s.io/apiserver/pkg/storage/storagebackend" ) @@ -68,7 +69,18 @@ func CreateProber(c storagebackend.Config) (Prober, error) { case storagebackend.StorageTypeETCD2: return nil, fmt.Errorf("%s is no longer a supported storage backend", c.Type) case storagebackend.StorageTypeUnset, storagebackend.StorageTypeETCD3: - return newETCD3Prober(c) + return newETCD3ProberMonitor(c) + default: + return nil, fmt.Errorf("unknown storage type: %s", c.Type) + } +} + +func CreateMonitor(c storagebackend.Config) (metrics.Monitor, error) { + switch c.Type { + case storagebackend.StorageTypeETCD2: + return nil, fmt.Errorf("%s is no longer a supported storage backend", c.Type) + case storagebackend.StorageTypeUnset, storagebackend.StorageTypeETCD3: + return newETCD3ProberMonitor(c) default: return nil, fmt.Errorf("unknown storage type: %s", c.Type) } diff --git a/test/instrumentation/documentation/documentation-list.yaml b/test/instrumentation/documentation/documentation-list.yaml index f29e16ef063..b0ddf1fbcbc 100644 --- a/test/instrumentation/documentation/documentation-list.yaml +++ b/test/instrumentation/documentation/documentation-list.yaml @@ -3703,6 +3703,7 @@ subsystem: apiserver help: Total size of the storage database file physically allocated in bytes. type: Gauge + deprecatedVersion: "1.28.0" stabilityLevel: ALPHA labels: - endpoint @@ -3750,6 +3751,12 @@ stabilityLevel: ALPHA labels: - resource +- name: apiserver_storage_size_bytes + help: Size of the storage database file physically allocated in bytes. + type: Custom + stabilityLevel: ALPHA + labels: + - server - name: transformation_duration_seconds subsystem: storage namespace: apiserver diff --git a/test/instrumentation/documentation/documentation.md b/test/instrumentation/documentation/documentation.md index adcbac0c55e..f4bd98cdaf5 100644 --- a/test/instrumentation/documentation/documentation.md +++ b/test/instrumentation/documentation/documentation.md @@ -878,7 +878,7 @@ components using an HTTP scrape, and fetch the current metrics data in Prometheu Total size of the storage database file physically allocated in bytes.
endpoint
- +1.28.0 apiserver_storage_decode_errors_total ALPHA Counter @@ -928,6 +928,13 @@ components using an HTTP scrape, and fetch the current metrics data in Prometheu
resource
+apiserver_storage_size_bytes +ALPHA +Custom +Size of the storage database file physically allocated in bytes. +
server
+ + apiserver_storage_transformation_duration_seconds ALPHA Histogram diff --git a/test/integration/metrics/metrics_test.go b/test/integration/metrics/metrics_test.go index d91147f203a..de0087e26f6 100644 --- a/test/integration/metrics/metrics_test.go +++ b/test/integration/metrics/metrics_test.go @@ -76,6 +76,28 @@ func TestAPIServerProcessMetrics(t *testing.T) { }) } +func TestAPIServerStorageMetrics(t *testing.T) { + s := kubeapiservertesting.StartTestServerOrDie(t, nil, nil, framework.SharedEtcd()) + defer s.TearDownFn() + + metrics, err := scrapeMetrics(s) + if err != nil { + t.Fatal(err) + } + + samples, ok := metrics["apiserver_storage_size_bytes"] + if !ok { + t.Fatalf("apiserver_storage_size_bytes metric not exposed") + } + if len(samples) != 1 { + t.Fatalf("Unexpected number of samples in apiserver_storage_size_bytes") + } + + if samples[0].Value == -1 { + t.Errorf("Unexpected non-zero apiserver_storage_size_bytes, got: %s", samples[0].Value) + } +} + func TestAPIServerMetrics(t *testing.T) { s := kubeapiservertesting.StartTestServerOrDie(t, nil, nil, framework.SharedEtcd()) defer s.TearDownFn()