Merge pull request #118812 from serathius/storage-metric

Improve apiserver storage size metric
This commit is contained in:
Kubernetes Prow Robot 2023-07-12 10:57:26 -07:00 committed by GitHub
commit 2ec4e14bfa
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 181 additions and 28 deletions

View File

@ -36,6 +36,7 @@ import (
"k8s.io/apiserver/pkg/server/options/encryptionconfig"
encryptionconfigcontroller "k8s.io/apiserver/pkg/server/options/encryptionconfig/controller"
serverstorage "k8s.io/apiserver/pkg/server/storage"
"k8s.io/apiserver/pkg/storage/etcd3/metrics"
"k8s.io/apiserver/pkg/storage/storagebackend"
storagefactory "k8s.io/apiserver/pkg/storage/storagebackend/factory"
storagevalue "k8s.io/apiserver/pkg/storage/value"
@ -238,10 +239,34 @@ func (s *EtcdOptions) ApplyWithStorageFactoryTo(factory serverstorage.StorageFac
return err
}
metrics.SetStorageMonitorGetter(monitorGetter(factory))
c.RESTOptionsGetter = s.CreateRESTOptionsGetter(factory, c.ResourceTransformers)
return nil
}
// monitorGetter returns a function that builds one metrics.Monitor for every
// storage config exposed by factory.
//
// The returned function yields either the full set of monitors and a nil
// error, or a nil slice and the first creation error. On failure every monitor
// created so far is closed before returning, so a partial failure does not
// leak the underlying storage clients. Callers own the returned monitors and
// are responsible for closing them.
func monitorGetter(factory serverstorage.StorageFactory) func() (monitors []metrics.Monitor, err error) {
	return func() ([]metrics.Monitor, error) {
		var created []metrics.Monitor
		for _, cfg := range factory.Configs() {
			m, err := storagefactory.CreateMonitor(cfg)
			if err != nil {
				// Clean up explicitly instead of in a defer that reads a named
				// result: "return nil, err" would overwrite the named slice
				// before the defer ran, leaving nothing to close.
				for _, c := range created {
					// Best-effort cleanup; the creation error is what the
					// caller needs to see.
					_ = c.Close()
				}
				return nil, err
			}
			created = append(created, m)
		}
		return created, nil
	}
}
func (s *EtcdOptions) CreateRESTOptionsGetter(factory serverstorage.StorageFactory, resourceTransformers storagevalue.ResourceTransformers) generic.RESTOptionsGetter {
if resourceTransformers != nil {
factory = &transformerStorageFactory{

View File

@ -17,11 +17,14 @@ limitations under the License.
package metrics
import (
"context"
"fmt"
"sync"
"time"
compbasemetrics "k8s.io/component-base/metrics"
"k8s.io/component-base/metrics/legacyregistry"
"k8s.io/klog/v2"
)
/*
@ -73,13 +76,16 @@ var (
)
dbTotalSize = compbasemetrics.NewGaugeVec(
&compbasemetrics.GaugeOpts{
Subsystem: "apiserver",
Name: "storage_db_total_size_in_bytes",
Help: "Total size of the storage database file physically allocated in bytes.",
StabilityLevel: compbasemetrics.ALPHA,
Subsystem: "apiserver",
Name: "storage_db_total_size_in_bytes",
Help: "Total size of the storage database file physically allocated in bytes.",
StabilityLevel: compbasemetrics.ALPHA,
DeprecatedVersion: "1.28.0",
},
[]string{"endpoint"},
)
storageSizeDescription = compbasemetrics.NewDesc("apiserver_storage_size_bytes", "Size of the storage database file physically allocated in bytes.", []string{"server"}, nil, compbasemetrics.ALPHA, "")
storageMonitor = &monitorCollector{}
etcdEventsReceivedCounts = compbasemetrics.NewCounterVec(
&compbasemetrics.CounterOpts{
Subsystem: "apiserver",
@ -160,6 +166,7 @@ func Register() {
legacyregistry.MustRegister(etcdRequestErrorCounts)
legacyregistry.MustRegister(objectCounts)
legacyregistry.MustRegister(dbTotalSize)
legacyregistry.CustomMustRegister(storageMonitor)
legacyregistry.MustRegister(etcdBookmarkCounts)
legacyregistry.MustRegister(etcdLeaseObjectCounts)
legacyregistry.MustRegister(listStorageCount)
@ -214,10 +221,16 @@ var sinceInSeconds = func(start time.Time) float64 {
}
// UpdateEtcdDbSize sets the apiserver_storage_db_total_size_in_bytes gauge for
// the endpoint label ep to size (in bytes).
//
// Deprecated: Metric apiserver_storage_db_total_size_in_bytes will be replaced
// with apiserver_storage_size_bytes.
func UpdateEtcdDbSize(ep string, size int64) {
dbTotalSize.WithLabelValues(ep).Set(float64(size))
}
// SetStorageMonitorGetter sets the monitor getter used by the package-level
// storage monitor collector to obtain the monitors it queries for storage
// stats.
//
// The getter is stored with a plain assignment and no locking.
// NOTE(review): presumably this is meant to be called once during server
// setup, before metrics are scraped — confirm against callers.
func SetStorageMonitorGetter(getter func() ([]Monitor, error)) {
storageMonitor.monitorGetter = getter
}
// UpdateLeaseObjectCount sets the etcd_lease_object_counts metric.
func UpdateLeaseObjectCount(count int64) {
// Currently we only store one previous lease, since all the events have the same ttl.
@ -232,3 +245,51 @@ func RecordStorageListMetrics(resource string, numFetched, numEvald, numReturned
listStorageNumSelectorEvals.WithLabelValues(resource).Add(float64(numEvald))
listStorageNumReturned.WithLabelValues(resource).Add(float64(numReturned))
}
// Monitor reports metrics about a storage backend.
type Monitor interface {
// Monitor returns the current StorageMetrics, or an error if they could
// not be obtained. ctx bounds the call.
Monitor(ctx context.Context) (StorageMetrics, error)
// Close releases the monitor.
// NOTE(review): presumably the monitor must not be used after Close —
// confirm against implementations.
Close() error
}
// StorageMetrics is the set of values returned by Monitor.Monitor.
type StorageMetrics struct {
// Size is exported as the apiserver_storage_size_bytes gauge value.
Size int64
}
// monitorCollector is a custom collector that emits storage size metrics
// obtained from the monitors returned by monitorGetter.
type monitorCollector struct {
compbasemetrics.BaseStableCollector
// monitorGetter returns the monitors to query on each collection. It is
// nil until SetStorageMonitorGetter is called.
monitorGetter func() ([]Monitor, error)
}
// DescribeWithStability implements compbasemetrics.StableCollector.
// It sends the single descriptor this collector emits, storageSizeDescription.
func (c *monitorCollector) DescribeWithStability(ch chan<- *compbasemetrics.Desc) {
ch <- storageSizeDescription
}
// CollectWithStability implements compbasemetrics.StableCollector.
//
// On every collection it asks monitorGetter for a fresh set of monitors,
// queries each one with a one-second timeout, closes it, and emits one
// apiserver_storage_size_bytes gauge per monitor labelled "etcd-<index>".
// A monitor that fails to report is logged and skipped; the remaining
// monitors are still collected.
func (c *monitorCollector) CollectWithStability(ch chan<- compbasemetrics.Metric) {
	if c.monitorGetter == nil {
		// The collector can be registered before a getter is installed;
		// there is nothing to report in that case.
		return
	}
	monitors, err := c.monitorGetter()
	if err != nil {
		klog.InfoS("Failed to get storage monitors", "err", err)
		return
	}

	for i, m := range monitors {
		server := fmt.Sprintf("etcd-%d", i)

		klog.V(4).InfoS("Start collecting storage metrics", "server", server)
		ctx, cancel := context.WithTimeout(context.Background(), time.Second)
		storageMetrics, err := m.Monitor(ctx)
		cancel()
		// The monitor is created per collection, so release it as soon as it
		// has been queried, regardless of the outcome.
		m.Close()
		if err != nil {
			klog.InfoS("Failed to get storage metrics", "server", server, "err", err)
			continue
		}

		metric, err := compbasemetrics.NewConstMetric(storageSizeDescription, compbasemetrics.GaugeValue, float64(storageMetrics.Size), server)
		if err != nil {
			klog.ErrorS(err, "Failed to create metric", "server", server)
			// Never hand a nil metric to the registry.
			continue
		}
		ch <- metric
	}
}

View File

@ -20,6 +20,7 @@ import (
"context"
"fmt"
"log"
"math/rand"
"net"
"net/url"
"os"
@ -37,6 +38,7 @@ import (
"go.uber.org/zap/zapcore"
"golang.org/x/time/rate"
"google.golang.org/grpc"
"k8s.io/klog/v2"
"k8s.io/apimachinery/pkg/runtime"
utilnet "k8s.io/apimachinery/pkg/util/net"
@ -52,7 +54,6 @@ import (
utilfeature "k8s.io/apiserver/pkg/util/feature"
"k8s.io/component-base/metrics/legacyregistry"
tracing "k8s.io/component-base/tracing"
"k8s.io/klog/v2"
)
const (
@ -153,11 +154,11 @@ func newETCD3Check(c storagebackend.Config, timeout time.Duration, stopCh <-chan
// retry in a loop in the background until we successfully create the client, storing the client or error encountered
lock := sync.RWMutex{}
var prober *etcd3Prober
var prober *etcd3ProberMonitor
clientErr := fmt.Errorf("etcd client connection not yet established")
go wait.PollUntil(time.Second, func() (bool, error) {
newProber, err := newETCD3Prober(c)
newProber, err := newETCD3ProberMonitor(c)
lock.Lock()
defer lock.Unlock()
// Ensure that server is already not shutting down.
@ -221,49 +222,66 @@ func newETCD3Check(c storagebackend.Config, timeout time.Duration, stopCh <-chan
}, nil
}
func newETCD3Prober(c storagebackend.Config) (*etcd3Prober, error) {
func newETCD3ProberMonitor(c storagebackend.Config) (*etcd3ProberMonitor, error) {
client, err := newETCD3Client(c.Transport)
if err != nil {
return nil, err
}
return &etcd3Prober{
client: client,
prefix: c.Prefix,
return &etcd3ProberMonitor{
client: client,
prefix: c.Prefix,
endpoints: c.Transport.ServerList,
}, nil
}
type etcd3Prober struct {
prefix string
type etcd3ProberMonitor struct {
prefix string
endpoints []string
mux sync.RWMutex
client *clientv3.Client
closed bool
}
func (p *etcd3Prober) Close() error {
p.mux.Lock()
defer p.mux.Unlock()
if !p.closed {
p.closed = true
return p.client.Close()
func (t *etcd3ProberMonitor) Close() error {
t.mux.Lock()
defer t.mux.Unlock()
if !t.closed {
t.closed = true
return t.client.Close()
}
return fmt.Errorf("prober was closed")
return fmt.Errorf("closed")
}
func (p *etcd3Prober) Probe(ctx context.Context) error {
p.mux.RLock()
defer p.mux.RUnlock()
if p.closed {
return fmt.Errorf("prober was closed")
func (t *etcd3ProberMonitor) Probe(ctx context.Context) error {
t.mux.RLock()
defer t.mux.RUnlock()
if t.closed {
return fmt.Errorf("closed")
}
// See https://github.com/etcd-io/etcd/blob/c57f8b3af865d1b531b979889c602ba14377420e/etcdctl/ctlv3/command/ep_command.go#L118
_, err := p.client.Get(ctx, path.Join("/", p.prefix, "health"))
_, err := t.client.Get(ctx, path.Join("/", t.prefix, "health"))
if err != nil {
return fmt.Errorf("error getting data from etcd: %w", err)
}
return nil
}
// Monitor queries the status of one randomly chosen configured endpoint and
// returns its database size as StorageMetrics.
//
// It returns an error if the monitor has been closed, if no endpoints are
// configured, or if the status call fails. ctx bounds the status call.
func (t *etcd3ProberMonitor) Monitor(ctx context.Context) (metrics.StorageMetrics, error) {
	t.mux.RLock()
	defer t.mux.RUnlock()
	if t.closed {
		return metrics.StorageMetrics{}, fmt.Errorf("closed")
	}
	// Guard against an empty server list: picking an index modulo zero
	// would panic.
	if len(t.endpoints) == 0 {
		return metrics.StorageMetrics{}, fmt.Errorf("no etcd endpoints configured")
	}
	endpoint := t.endpoints[rand.Intn(len(t.endpoints))]
	status, err := t.client.Status(ctx, endpoint)
	if err != nil {
		return metrics.StorageMetrics{}, err
	}
	return metrics.StorageMetrics{
		Size: status.DbSize,
	}, nil
}
var newETCD3Client = func(c storagebackend.TransportConfig) (*clientv3.Client, error) {
tlsInfo := transport.TLSInfo{
CertFile: c.CertFile,
@ -441,6 +459,7 @@ func newETCD3Storage(c storagebackend.ConfigForResource, newFunc func() runtime.
// startDBSizeMonitorPerEndpoint starts a loop to monitor etcd database size and update the
// corresponding metric etcd_db_total_size_in_bytes for each etcd server endpoint.
// Deprecated: Will be replaced with newETCD3ProberMonitor
func startDBSizeMonitorPerEndpoint(client *clientv3.Client, interval time.Duration) (func(), error) {
if interval == 0 {
return func() {}, nil

View File

@ -22,6 +22,7 @@ import (
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apiserver/pkg/storage"
"k8s.io/apiserver/pkg/storage/etcd3/metrics"
"k8s.io/apiserver/pkg/storage/storagebackend"
)
@ -68,7 +69,18 @@ func CreateProber(c storagebackend.Config) (Prober, error) {
case storagebackend.StorageTypeETCD2:
return nil, fmt.Errorf("%s is no longer a supported storage backend", c.Type)
case storagebackend.StorageTypeUnset, storagebackend.StorageTypeETCD3:
return newETCD3Prober(c)
return newETCD3ProberMonitor(c)
default:
return nil, fmt.Errorf("unknown storage type: %s", c.Type)
}
}
func CreateMonitor(c storagebackend.Config) (metrics.Monitor, error) {
switch c.Type {
case storagebackend.StorageTypeETCD2:
return nil, fmt.Errorf("%s is no longer a supported storage backend", c.Type)
case storagebackend.StorageTypeUnset, storagebackend.StorageTypeETCD3:
return newETCD3ProberMonitor(c)
default:
return nil, fmt.Errorf("unknown storage type: %s", c.Type)
}

View File

@ -3707,6 +3707,7 @@
subsystem: apiserver
help: Total size of the storage database file physically allocated in bytes.
type: Gauge
deprecatedVersion: "1.28.0"
stabilityLevel: ALPHA
labels:
- endpoint
@ -3754,6 +3755,12 @@
stabilityLevel: ALPHA
labels:
- resource
- name: apiserver_storage_size_bytes
help: Size of the storage database file physically allocated in bytes.
type: Custom
stabilityLevel: ALPHA
labels:
- server
- name: transformation_duration_seconds
subsystem: storage
namespace: apiserver

View File

@ -878,7 +878,7 @@ components using an HTTP scrape, and fetch the current metrics data in Prometheu
<td class="metric_description">Total size of the storage database file physically allocated in bytes.</td>
<td class="metric_labels_varying"><div class="metric_label">endpoint</div></td>
<td class="metric_labels_constant"></td>
<td class="metric_deprecated_version"></td></tr>
<td class="metric_deprecated_version">1.28.0</td></tr>
<tr class="metric"><td class="metric_name">apiserver_storage_decode_errors_total</td>
<td class="metric_stability_level" data-stability="alpha">ALPHA</td>
<td class="metric_type" data-type="counter">Counter</td>
@ -928,6 +928,13 @@ components using an HTTP scrape, and fetch the current metrics data in Prometheu
<td class="metric_labels_varying"><div class="metric_label">resource</div></td>
<td class="metric_labels_constant"></td>
<td class="metric_deprecated_version"></td></tr>
<tr class="metric"><td class="metric_name">apiserver_storage_size_bytes</td>
<td class="metric_stability_level" data-stability="alpha">ALPHA</td>
<td class="metric_type" data-type="custom">Custom</td>
<td class="metric_description">Size of the storage database file physically allocated in bytes.</td>
<td class="metric_labels_varying"><div class="metric_label">server</div></td>
<td class="metric_labels_constant"></td>
<td class="metric_deprecated_version"></td></tr>
<tr class="metric"><td class="metric_name">apiserver_storage_transformation_duration_seconds</td>
<td class="metric_stability_level" data-stability="alpha">ALPHA</td>
<td class="metric_type" data-type="histogram">Histogram</td>

View File

@ -76,6 +76,28 @@ func TestAPIServerProcessMetrics(t *testing.T) {
})
}
// TestAPIServerStorageMetrics starts a test API server against the shared
// etcd and verifies that apiserver_storage_size_bytes is exposed with exactly
// one sample whose value is not the -1 sentinel.
func TestAPIServerStorageMetrics(t *testing.T) {
	s := kubeapiservertesting.StartTestServerOrDie(t, nil, nil, framework.SharedEtcd())
	defer s.TearDownFn()

	metrics, err := scrapeMetrics(s)
	if err != nil {
		t.Fatal(err)
	}

	samples, ok := metrics["apiserver_storage_size_bytes"]
	if !ok {
		t.Fatalf("apiserver_storage_size_bytes metric not exposed")
	}
	if len(samples) != 1 {
		t.Fatalf("Unexpected number of samples in apiserver_storage_size_bytes")
	}

	// The message describes what is actually checked; %v formats the
	// numeric sample value whatever its concrete type.
	if samples[0].Value == -1 {
		t.Errorf("Unexpected apiserver_storage_size_bytes value, got: %v", samples[0].Value)
	}
}
func TestAPIServerMetrics(t *testing.T) {
s := kubeapiservertesting.StartTestServerOrDie(t, nil, nil, framework.SharedEtcd())
defer s.TearDownFn()