Improve apiserver storage size metric to allow it's graduation

Change name to make it compliant with prometheus guidelines.
Calculate it on demand instead of periodic to comply with prometheus standards.
Replace "endpoint" with "server" label to make it semantically consistent with storage factory
This commit is contained in:
Marek Siarkowicz 2023-06-22 11:56:09 +02:00
parent c2fb8057b2
commit 7a63997c8a
7 changed files with 181 additions and 28 deletions

View File

@ -36,6 +36,7 @@ import (
"k8s.io/apiserver/pkg/server/options/encryptionconfig"
encryptionconfigcontroller "k8s.io/apiserver/pkg/server/options/encryptionconfig/controller"
serverstorage "k8s.io/apiserver/pkg/server/storage"
"k8s.io/apiserver/pkg/storage/etcd3/metrics"
"k8s.io/apiserver/pkg/storage/storagebackend"
storagefactory "k8s.io/apiserver/pkg/storage/storagebackend/factory"
storagevalue "k8s.io/apiserver/pkg/storage/value"
@ -238,10 +239,34 @@ func (s *EtcdOptions) ApplyWithStorageFactoryTo(factory serverstorage.StorageFac
return err
}
metrics.SetStorageMonitorGetter(monitorGetter(factory))
c.RESTOptionsGetter = s.CreateRESTOptionsGetter(factory, c.ResourceTransformers)
return nil
}
func monitorGetter(factory serverstorage.StorageFactory) func() (monitors []metrics.Monitor, err error) {
return func() (monitors []metrics.Monitor, err error) {
defer func() {
if err != nil {
for _, m := range monitors {
m.Close()
}
}
}()
var m metrics.Monitor
for _, cfg := range factory.Configs() {
m, err = storagefactory.CreateMonitor(cfg)
if err != nil {
return nil, err
}
monitors = append(monitors, m)
}
return monitors, nil
}
}
func (s *EtcdOptions) CreateRESTOptionsGetter(factory serverstorage.StorageFactory, resourceTransformers storagevalue.ResourceTransformers) generic.RESTOptionsGetter {
if resourceTransformers != nil {
factory = &transformerStorageFactory{

View File

@ -17,11 +17,14 @@ limitations under the License.
package metrics
import (
"context"
"fmt"
"sync"
"time"
compbasemetrics "k8s.io/component-base/metrics"
"k8s.io/component-base/metrics/legacyregistry"
"k8s.io/klog/v2"
)
/*
@ -73,13 +76,16 @@ var (
)
dbTotalSize = compbasemetrics.NewGaugeVec(
&compbasemetrics.GaugeOpts{
Subsystem: "apiserver",
Name: "storage_db_total_size_in_bytes",
Help: "Total size of the storage database file physically allocated in bytes.",
StabilityLevel: compbasemetrics.ALPHA,
Subsystem: "apiserver",
Name: "storage_db_total_size_in_bytes",
Help: "Total size of the storage database file physically allocated in bytes.",
StabilityLevel: compbasemetrics.ALPHA,
DeprecatedVersion: "1.28.0",
},
[]string{"endpoint"},
)
storageSizeDescription = compbasemetrics.NewDesc("apiserver_storage_size_bytes", "Size of the storage database file physically allocated in bytes.", []string{"server"}, nil, compbasemetrics.ALPHA, "")
storageMonitor = &monitorCollector{}
etcdEventsReceivedCounts = compbasemetrics.NewCounterVec(
&compbasemetrics.CounterOpts{
Subsystem: "apiserver",
@ -160,6 +166,7 @@ func Register() {
legacyregistry.MustRegister(etcdRequestErrorCounts)
legacyregistry.MustRegister(objectCounts)
legacyregistry.MustRegister(dbTotalSize)
legacyregistry.CustomMustRegister(storageMonitor)
legacyregistry.MustRegister(etcdBookmarkCounts)
legacyregistry.MustRegister(etcdLeaseObjectCounts)
legacyregistry.MustRegister(listStorageCount)
@ -214,10 +221,16 @@ var sinceInSeconds = func(start time.Time) float64 {
}
// UpdateEtcdDbSize sets the etcd_db_total_size_in_bytes metric.
// Deprecated: Metric etcd_db_total_size_in_bytes will be replaced with apiserver_storage_size_bytes
func UpdateEtcdDbSize(ep string, size int64) {
dbTotalSize.WithLabelValues(ep).Set(float64(size))
}
// SetStorageMonitorGetter sets monitor getter to allow monitoring etcd stats.
func SetStorageMonitorGetter(getter func() ([]Monitor, error)) {
storageMonitor.monitorGetter = getter
}
// UpdateLeaseObjectCount sets the etcd_lease_object_counts metric.
func UpdateLeaseObjectCount(count int64) {
// Currently we only store one previous lease, since all the events have the same ttl.
@ -232,3 +245,51 @@ func RecordStorageListMetrics(resource string, numFetched, numEvald, numReturned
listStorageNumSelectorEvals.WithLabelValues(resource).Add(float64(numEvald))
listStorageNumReturned.WithLabelValues(resource).Add(float64(numReturned))
}
type Monitor interface {
Monitor(ctx context.Context) (StorageMetrics, error)
Close() error
}
type StorageMetrics struct {
Size int64
}
type monitorCollector struct {
compbasemetrics.BaseStableCollector
monitorGetter func() ([]Monitor, error)
}
// DescribeWithStability implements compbasemetrics.StableColletor
func (c *monitorCollector) DescribeWithStability(ch chan<- *compbasemetrics.Desc) {
ch <- storageSizeDescription
}
// CollectWithStability implements compbasemetrics.StableColletor
func (c *monitorCollector) CollectWithStability(ch chan<- compbasemetrics.Metric) {
monitors, err := c.monitorGetter()
if err != nil {
return
}
for i, m := range monitors {
server := fmt.Sprintf("etcd-%d", i)
klog.V(4).InfoS("Start collecting storage metrics", "server", server)
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
metrics, err := m.Monitor(ctx)
cancel()
m.Close()
if err != nil {
klog.InfoS("Failed to get storage metrics", "server", server, "err", err)
continue
}
metric, err := compbasemetrics.NewConstMetric(storageSizeDescription, compbasemetrics.GaugeValue, float64(metrics.Size), server)
if err != nil {
klog.ErrorS(err, "Failed to create metric", "server", server)
}
ch <- metric
}
}

View File

@ -20,6 +20,7 @@ import (
"context"
"fmt"
"log"
"math/rand"
"net"
"net/url"
"os"
@ -37,6 +38,7 @@ import (
"go.uber.org/zap/zapcore"
"golang.org/x/time/rate"
"google.golang.org/grpc"
"k8s.io/klog/v2"
"k8s.io/apimachinery/pkg/runtime"
utilnet "k8s.io/apimachinery/pkg/util/net"
@ -52,7 +54,6 @@ import (
utilfeature "k8s.io/apiserver/pkg/util/feature"
"k8s.io/component-base/metrics/legacyregistry"
tracing "k8s.io/component-base/tracing"
"k8s.io/klog/v2"
)
const (
@ -153,11 +154,11 @@ func newETCD3Check(c storagebackend.Config, timeout time.Duration, stopCh <-chan
// retry in a loop in the background until we successfully create the client, storing the client or error encountered
lock := sync.RWMutex{}
var prober *etcd3Prober
var prober *etcd3ProberMonitor
clientErr := fmt.Errorf("etcd client connection not yet established")
go wait.PollUntil(time.Second, func() (bool, error) {
newProber, err := newETCD3Prober(c)
newProber, err := newETCD3ProberMonitor(c)
lock.Lock()
defer lock.Unlock()
// Ensure that server is already not shutting down.
@ -221,49 +222,66 @@ func newETCD3Check(c storagebackend.Config, timeout time.Duration, stopCh <-chan
}, nil
}
func newETCD3Prober(c storagebackend.Config) (*etcd3Prober, error) {
func newETCD3ProberMonitor(c storagebackend.Config) (*etcd3ProberMonitor, error) {
client, err := newETCD3Client(c.Transport)
if err != nil {
return nil, err
}
return &etcd3Prober{
client: client,
prefix: c.Prefix,
return &etcd3ProberMonitor{
client: client,
prefix: c.Prefix,
endpoints: c.Transport.ServerList,
}, nil
}
type etcd3Prober struct {
prefix string
type etcd3ProberMonitor struct {
prefix string
endpoints []string
mux sync.RWMutex
client *clientv3.Client
closed bool
}
func (p *etcd3Prober) Close() error {
p.mux.Lock()
defer p.mux.Unlock()
if !p.closed {
p.closed = true
return p.client.Close()
func (t *etcd3ProberMonitor) Close() error {
t.mux.Lock()
defer t.mux.Unlock()
if !t.closed {
t.closed = true
return t.client.Close()
}
return fmt.Errorf("prober was closed")
return fmt.Errorf("closed")
}
func (p *etcd3Prober) Probe(ctx context.Context) error {
p.mux.RLock()
defer p.mux.RUnlock()
if p.closed {
return fmt.Errorf("prober was closed")
func (t *etcd3ProberMonitor) Probe(ctx context.Context) error {
t.mux.RLock()
defer t.mux.RUnlock()
if t.closed {
return fmt.Errorf("closed")
}
// See https://github.com/etcd-io/etcd/blob/c57f8b3af865d1b531b979889c602ba14377420e/etcdctl/ctlv3/command/ep_command.go#L118
_, err := p.client.Get(ctx, path.Join("/", p.prefix, "health"))
_, err := t.client.Get(ctx, path.Join("/", t.prefix, "health"))
if err != nil {
return fmt.Errorf("error getting data from etcd: %w", err)
}
return nil
}
func (t *etcd3ProberMonitor) Monitor(ctx context.Context) (metrics.StorageMetrics, error) {
t.mux.RLock()
defer t.mux.RUnlock()
if t.closed {
return metrics.StorageMetrics{}, fmt.Errorf("closed")
}
status, err := t.client.Status(ctx, t.endpoints[rand.Int()%len(t.endpoints)])
if err != nil {
return metrics.StorageMetrics{}, err
}
return metrics.StorageMetrics{
Size: status.DbSize,
}, nil
}
var newETCD3Client = func(c storagebackend.TransportConfig) (*clientv3.Client, error) {
tlsInfo := transport.TLSInfo{
CertFile: c.CertFile,
@ -441,6 +459,7 @@ func newETCD3Storage(c storagebackend.ConfigForResource, newFunc func() runtime.
// startDBSizeMonitorPerEndpoint starts a loop to monitor etcd database size and update the
// corresponding metric etcd_db_total_size_in_bytes for each etcd server endpoint.
// Deprecated: Will be replaced with newETCD3ProberMonitor
func startDBSizeMonitorPerEndpoint(client *clientv3.Client, interval time.Duration) (func(), error) {
if interval == 0 {
return func() {}, nil

View File

@ -22,6 +22,7 @@ import (
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apiserver/pkg/storage"
"k8s.io/apiserver/pkg/storage/etcd3/metrics"
"k8s.io/apiserver/pkg/storage/storagebackend"
)
@ -68,7 +69,18 @@ func CreateProber(c storagebackend.Config) (Prober, error) {
case storagebackend.StorageTypeETCD2:
return nil, fmt.Errorf("%s is no longer a supported storage backend", c.Type)
case storagebackend.StorageTypeUnset, storagebackend.StorageTypeETCD3:
return newETCD3Prober(c)
return newETCD3ProberMonitor(c)
default:
return nil, fmt.Errorf("unknown storage type: %s", c.Type)
}
}
func CreateMonitor(c storagebackend.Config) (metrics.Monitor, error) {
switch c.Type {
case storagebackend.StorageTypeETCD2:
return nil, fmt.Errorf("%s is no longer a supported storage backend", c.Type)
case storagebackend.StorageTypeUnset, storagebackend.StorageTypeETCD3:
return newETCD3ProberMonitor(c)
default:
return nil, fmt.Errorf("unknown storage type: %s", c.Type)
}

View File

@ -3703,6 +3703,7 @@
subsystem: apiserver
help: Total size of the storage database file physically allocated in bytes.
type: Gauge
deprecatedVersion: "1.28.0"
stabilityLevel: ALPHA
labels:
- endpoint
@ -3750,6 +3751,12 @@
stabilityLevel: ALPHA
labels:
- resource
- name: apiserver_storage_size_bytes
help: Size of the storage database file physically allocated in bytes.
type: Custom
stabilityLevel: ALPHA
labels:
- server
- name: transformation_duration_seconds
subsystem: storage
namespace: apiserver

View File

@ -878,7 +878,7 @@ components using an HTTP scrape, and fetch the current metrics data in Prometheu
<td class="metric_description">Total size of the storage database file physically allocated in bytes.</td>
<td class="metric_labels_varying"><div class="metric_label">endpoint</div></td>
<td class="metric_labels_constant"></td>
<td class="metric_deprecated_version"></td></tr>
<td class="metric_deprecated_version">1.28.0</td></tr>
<tr class="metric"><td class="metric_name">apiserver_storage_decode_errors_total</td>
<td class="metric_stability_level" data-stability="alpha">ALPHA</td>
<td class="metric_type" data-type="counter">Counter</td>
@ -928,6 +928,13 @@ components using an HTTP scrape, and fetch the current metrics data in Prometheu
<td class="metric_labels_varying"><div class="metric_label">resource</div></td>
<td class="metric_labels_constant"></td>
<td class="metric_deprecated_version"></td></tr>
<tr class="metric"><td class="metric_name">apiserver_storage_size_bytes</td>
<td class="metric_stability_level" data-stability="alpha">ALPHA</td>
<td class="metric_type" data-type="custom">Custom</td>
<td class="metric_description">Size of the storage database file physically allocated in bytes.</td>
<td class="metric_labels_varying"><div class="metric_label">server</div></td>
<td class="metric_labels_constant"></td>
<td class="metric_deprecated_version"></td></tr>
<tr class="metric"><td class="metric_name">apiserver_storage_transformation_duration_seconds</td>
<td class="metric_stability_level" data-stability="alpha">ALPHA</td>
<td class="metric_type" data-type="histogram">Histogram</td>

View File

@ -76,6 +76,28 @@ func TestAPIServerProcessMetrics(t *testing.T) {
})
}
func TestAPIServerStorageMetrics(t *testing.T) {
s := kubeapiservertesting.StartTestServerOrDie(t, nil, nil, framework.SharedEtcd())
defer s.TearDownFn()
metrics, err := scrapeMetrics(s)
if err != nil {
t.Fatal(err)
}
samples, ok := metrics["apiserver_storage_size_bytes"]
if !ok {
t.Fatalf("apiserver_storage_size_bytes metric not exposed")
}
if len(samples) != 1 {
t.Fatalf("Unexpected number of samples in apiserver_storage_size_bytes")
}
if samples[0].Value == -1 {
t.Errorf("Unexpected non-zero apiserver_storage_size_bytes, got: %s", samples[0].Value)
}
}
func TestAPIServerMetrics(t *testing.T) {
s := kubeapiservertesting.StartTestServerOrDie(t, nil, nil, framework.SharedEtcd())
defer s.TearDownFn()