mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-08-04 09:49:50 +00:00
Improve apiserver storage size metric to allow it's graduation
Change name to make it compliant with prometheus guidelines. Calculate it on demand instead of periodic to comply with prometheus standards. Replace "endpoint" with "server" label to make it semantically consistent with storage factory
This commit is contained in:
parent
c2fb8057b2
commit
7a63997c8a
@ -36,6 +36,7 @@ import (
|
||||
"k8s.io/apiserver/pkg/server/options/encryptionconfig"
|
||||
encryptionconfigcontroller "k8s.io/apiserver/pkg/server/options/encryptionconfig/controller"
|
||||
serverstorage "k8s.io/apiserver/pkg/server/storage"
|
||||
"k8s.io/apiserver/pkg/storage/etcd3/metrics"
|
||||
"k8s.io/apiserver/pkg/storage/storagebackend"
|
||||
storagefactory "k8s.io/apiserver/pkg/storage/storagebackend/factory"
|
||||
storagevalue "k8s.io/apiserver/pkg/storage/value"
|
||||
@ -238,10 +239,34 @@ func (s *EtcdOptions) ApplyWithStorageFactoryTo(factory serverstorage.StorageFac
|
||||
return err
|
||||
}
|
||||
|
||||
metrics.SetStorageMonitorGetter(monitorGetter(factory))
|
||||
|
||||
c.RESTOptionsGetter = s.CreateRESTOptionsGetter(factory, c.ResourceTransformers)
|
||||
return nil
|
||||
}
|
||||
|
||||
func monitorGetter(factory serverstorage.StorageFactory) func() (monitors []metrics.Monitor, err error) {
|
||||
return func() (monitors []metrics.Monitor, err error) {
|
||||
defer func() {
|
||||
if err != nil {
|
||||
for _, m := range monitors {
|
||||
m.Close()
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
||||
var m metrics.Monitor
|
||||
for _, cfg := range factory.Configs() {
|
||||
m, err = storagefactory.CreateMonitor(cfg)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
monitors = append(monitors, m)
|
||||
}
|
||||
return monitors, nil
|
||||
}
|
||||
}
|
||||
|
||||
func (s *EtcdOptions) CreateRESTOptionsGetter(factory serverstorage.StorageFactory, resourceTransformers storagevalue.ResourceTransformers) generic.RESTOptionsGetter {
|
||||
if resourceTransformers != nil {
|
||||
factory = &transformerStorageFactory{
|
||||
|
@ -17,11 +17,14 @@ limitations under the License.
|
||||
package metrics
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
compbasemetrics "k8s.io/component-base/metrics"
|
||||
"k8s.io/component-base/metrics/legacyregistry"
|
||||
"k8s.io/klog/v2"
|
||||
)
|
||||
|
||||
/*
|
||||
@ -73,13 +76,16 @@ var (
|
||||
)
|
||||
dbTotalSize = compbasemetrics.NewGaugeVec(
|
||||
&compbasemetrics.GaugeOpts{
|
||||
Subsystem: "apiserver",
|
||||
Name: "storage_db_total_size_in_bytes",
|
||||
Help: "Total size of the storage database file physically allocated in bytes.",
|
||||
StabilityLevel: compbasemetrics.ALPHA,
|
||||
Subsystem: "apiserver",
|
||||
Name: "storage_db_total_size_in_bytes",
|
||||
Help: "Total size of the storage database file physically allocated in bytes.",
|
||||
StabilityLevel: compbasemetrics.ALPHA,
|
||||
DeprecatedVersion: "1.28.0",
|
||||
},
|
||||
[]string{"endpoint"},
|
||||
)
|
||||
storageSizeDescription = compbasemetrics.NewDesc("apiserver_storage_size_bytes", "Size of the storage database file physically allocated in bytes.", []string{"server"}, nil, compbasemetrics.ALPHA, "")
|
||||
storageMonitor = &monitorCollector{}
|
||||
etcdEventsReceivedCounts = compbasemetrics.NewCounterVec(
|
||||
&compbasemetrics.CounterOpts{
|
||||
Subsystem: "apiserver",
|
||||
@ -160,6 +166,7 @@ func Register() {
|
||||
legacyregistry.MustRegister(etcdRequestErrorCounts)
|
||||
legacyregistry.MustRegister(objectCounts)
|
||||
legacyregistry.MustRegister(dbTotalSize)
|
||||
legacyregistry.CustomMustRegister(storageMonitor)
|
||||
legacyregistry.MustRegister(etcdBookmarkCounts)
|
||||
legacyregistry.MustRegister(etcdLeaseObjectCounts)
|
||||
legacyregistry.MustRegister(listStorageCount)
|
||||
@ -214,10 +221,16 @@ var sinceInSeconds = func(start time.Time) float64 {
|
||||
}
|
||||
|
||||
// UpdateEtcdDbSize sets the etcd_db_total_size_in_bytes metric.
|
||||
// Deprecated: Metric etcd_db_total_size_in_bytes will be replaced with apiserver_storage_size_bytes
|
||||
func UpdateEtcdDbSize(ep string, size int64) {
|
||||
dbTotalSize.WithLabelValues(ep).Set(float64(size))
|
||||
}
|
||||
|
||||
// SetStorageMonitorGetter sets monitor getter to allow monitoring etcd stats.
|
||||
func SetStorageMonitorGetter(getter func() ([]Monitor, error)) {
|
||||
storageMonitor.monitorGetter = getter
|
||||
}
|
||||
|
||||
// UpdateLeaseObjectCount sets the etcd_lease_object_counts metric.
|
||||
func UpdateLeaseObjectCount(count int64) {
|
||||
// Currently we only store one previous lease, since all the events have the same ttl.
|
||||
@ -232,3 +245,51 @@ func RecordStorageListMetrics(resource string, numFetched, numEvald, numReturned
|
||||
listStorageNumSelectorEvals.WithLabelValues(resource).Add(float64(numEvald))
|
||||
listStorageNumReturned.WithLabelValues(resource).Add(float64(numReturned))
|
||||
}
|
||||
|
||||
type Monitor interface {
|
||||
Monitor(ctx context.Context) (StorageMetrics, error)
|
||||
Close() error
|
||||
}
|
||||
|
||||
type StorageMetrics struct {
|
||||
Size int64
|
||||
}
|
||||
|
||||
type monitorCollector struct {
|
||||
compbasemetrics.BaseStableCollector
|
||||
|
||||
monitorGetter func() ([]Monitor, error)
|
||||
}
|
||||
|
||||
// DescribeWithStability implements compbasemetrics.StableColletor
|
||||
func (c *monitorCollector) DescribeWithStability(ch chan<- *compbasemetrics.Desc) {
|
||||
ch <- storageSizeDescription
|
||||
}
|
||||
|
||||
// CollectWithStability implements compbasemetrics.StableColletor
|
||||
func (c *monitorCollector) CollectWithStability(ch chan<- compbasemetrics.Metric) {
|
||||
monitors, err := c.monitorGetter()
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
for i, m := range monitors {
|
||||
server := fmt.Sprintf("etcd-%d", i)
|
||||
|
||||
klog.V(4).InfoS("Start collecting storage metrics", "server", server)
|
||||
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
|
||||
metrics, err := m.Monitor(ctx)
|
||||
cancel()
|
||||
m.Close()
|
||||
if err != nil {
|
||||
klog.InfoS("Failed to get storage metrics", "server", server, "err", err)
|
||||
continue
|
||||
}
|
||||
|
||||
metric, err := compbasemetrics.NewConstMetric(storageSizeDescription, compbasemetrics.GaugeValue, float64(metrics.Size), server)
|
||||
if err != nil {
|
||||
klog.ErrorS(err, "Failed to create metric", "server", server)
|
||||
}
|
||||
ch <- metric
|
||||
}
|
||||
}
|
||||
|
@ -20,6 +20,7 @@ import (
|
||||
"context"
|
||||
"fmt"
|
||||
"log"
|
||||
"math/rand"
|
||||
"net"
|
||||
"net/url"
|
||||
"os"
|
||||
@ -37,6 +38,7 @@ import (
|
||||
"go.uber.org/zap/zapcore"
|
||||
"golang.org/x/time/rate"
|
||||
"google.golang.org/grpc"
|
||||
"k8s.io/klog/v2"
|
||||
|
||||
"k8s.io/apimachinery/pkg/runtime"
|
||||
utilnet "k8s.io/apimachinery/pkg/util/net"
|
||||
@ -52,7 +54,6 @@ import (
|
||||
utilfeature "k8s.io/apiserver/pkg/util/feature"
|
||||
"k8s.io/component-base/metrics/legacyregistry"
|
||||
tracing "k8s.io/component-base/tracing"
|
||||
"k8s.io/klog/v2"
|
||||
)
|
||||
|
||||
const (
|
||||
@ -153,11 +154,11 @@ func newETCD3Check(c storagebackend.Config, timeout time.Duration, stopCh <-chan
|
||||
// retry in a loop in the background until we successfully create the client, storing the client or error encountered
|
||||
|
||||
lock := sync.RWMutex{}
|
||||
var prober *etcd3Prober
|
||||
var prober *etcd3ProberMonitor
|
||||
clientErr := fmt.Errorf("etcd client connection not yet established")
|
||||
|
||||
go wait.PollUntil(time.Second, func() (bool, error) {
|
||||
newProber, err := newETCD3Prober(c)
|
||||
newProber, err := newETCD3ProberMonitor(c)
|
||||
lock.Lock()
|
||||
defer lock.Unlock()
|
||||
// Ensure that server is already not shutting down.
|
||||
@ -221,49 +222,66 @@ func newETCD3Check(c storagebackend.Config, timeout time.Duration, stopCh <-chan
|
||||
}, nil
|
||||
}
|
||||
|
||||
func newETCD3Prober(c storagebackend.Config) (*etcd3Prober, error) {
|
||||
func newETCD3ProberMonitor(c storagebackend.Config) (*etcd3ProberMonitor, error) {
|
||||
client, err := newETCD3Client(c.Transport)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return &etcd3Prober{
|
||||
client: client,
|
||||
prefix: c.Prefix,
|
||||
return &etcd3ProberMonitor{
|
||||
client: client,
|
||||
prefix: c.Prefix,
|
||||
endpoints: c.Transport.ServerList,
|
||||
}, nil
|
||||
}
|
||||
|
||||
type etcd3Prober struct {
|
||||
prefix string
|
||||
type etcd3ProberMonitor struct {
|
||||
prefix string
|
||||
endpoints []string
|
||||
|
||||
mux sync.RWMutex
|
||||
client *clientv3.Client
|
||||
closed bool
|
||||
}
|
||||
|
||||
func (p *etcd3Prober) Close() error {
|
||||
p.mux.Lock()
|
||||
defer p.mux.Unlock()
|
||||
if !p.closed {
|
||||
p.closed = true
|
||||
return p.client.Close()
|
||||
func (t *etcd3ProberMonitor) Close() error {
|
||||
t.mux.Lock()
|
||||
defer t.mux.Unlock()
|
||||
if !t.closed {
|
||||
t.closed = true
|
||||
return t.client.Close()
|
||||
}
|
||||
return fmt.Errorf("prober was closed")
|
||||
return fmt.Errorf("closed")
|
||||
}
|
||||
|
||||
func (p *etcd3Prober) Probe(ctx context.Context) error {
|
||||
p.mux.RLock()
|
||||
defer p.mux.RUnlock()
|
||||
if p.closed {
|
||||
return fmt.Errorf("prober was closed")
|
||||
func (t *etcd3ProberMonitor) Probe(ctx context.Context) error {
|
||||
t.mux.RLock()
|
||||
defer t.mux.RUnlock()
|
||||
if t.closed {
|
||||
return fmt.Errorf("closed")
|
||||
}
|
||||
// See https://github.com/etcd-io/etcd/blob/c57f8b3af865d1b531b979889c602ba14377420e/etcdctl/ctlv3/command/ep_command.go#L118
|
||||
_, err := p.client.Get(ctx, path.Join("/", p.prefix, "health"))
|
||||
_, err := t.client.Get(ctx, path.Join("/", t.prefix, "health"))
|
||||
if err != nil {
|
||||
return fmt.Errorf("error getting data from etcd: %w", err)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (t *etcd3ProberMonitor) Monitor(ctx context.Context) (metrics.StorageMetrics, error) {
|
||||
t.mux.RLock()
|
||||
defer t.mux.RUnlock()
|
||||
if t.closed {
|
||||
return metrics.StorageMetrics{}, fmt.Errorf("closed")
|
||||
}
|
||||
status, err := t.client.Status(ctx, t.endpoints[rand.Int()%len(t.endpoints)])
|
||||
if err != nil {
|
||||
return metrics.StorageMetrics{}, err
|
||||
}
|
||||
return metrics.StorageMetrics{
|
||||
Size: status.DbSize,
|
||||
}, nil
|
||||
}
|
||||
|
||||
var newETCD3Client = func(c storagebackend.TransportConfig) (*clientv3.Client, error) {
|
||||
tlsInfo := transport.TLSInfo{
|
||||
CertFile: c.CertFile,
|
||||
@ -441,6 +459,7 @@ func newETCD3Storage(c storagebackend.ConfigForResource, newFunc func() runtime.
|
||||
|
||||
// startDBSizeMonitorPerEndpoint starts a loop to monitor etcd database size and update the
|
||||
// corresponding metric etcd_db_total_size_in_bytes for each etcd server endpoint.
|
||||
// Deprecated: Will be replaced with newETCD3ProberMonitor
|
||||
func startDBSizeMonitorPerEndpoint(client *clientv3.Client, interval time.Duration) (func(), error) {
|
||||
if interval == 0 {
|
||||
return func() {}, nil
|
||||
|
@ -22,6 +22,7 @@ import (
|
||||
|
||||
"k8s.io/apimachinery/pkg/runtime"
|
||||
"k8s.io/apiserver/pkg/storage"
|
||||
"k8s.io/apiserver/pkg/storage/etcd3/metrics"
|
||||
"k8s.io/apiserver/pkg/storage/storagebackend"
|
||||
)
|
||||
|
||||
@ -68,7 +69,18 @@ func CreateProber(c storagebackend.Config) (Prober, error) {
|
||||
case storagebackend.StorageTypeETCD2:
|
||||
return nil, fmt.Errorf("%s is no longer a supported storage backend", c.Type)
|
||||
case storagebackend.StorageTypeUnset, storagebackend.StorageTypeETCD3:
|
||||
return newETCD3Prober(c)
|
||||
return newETCD3ProberMonitor(c)
|
||||
default:
|
||||
return nil, fmt.Errorf("unknown storage type: %s", c.Type)
|
||||
}
|
||||
}
|
||||
|
||||
func CreateMonitor(c storagebackend.Config) (metrics.Monitor, error) {
|
||||
switch c.Type {
|
||||
case storagebackend.StorageTypeETCD2:
|
||||
return nil, fmt.Errorf("%s is no longer a supported storage backend", c.Type)
|
||||
case storagebackend.StorageTypeUnset, storagebackend.StorageTypeETCD3:
|
||||
return newETCD3ProberMonitor(c)
|
||||
default:
|
||||
return nil, fmt.Errorf("unknown storage type: %s", c.Type)
|
||||
}
|
||||
|
@ -3703,6 +3703,7 @@
|
||||
subsystem: apiserver
|
||||
help: Total size of the storage database file physically allocated in bytes.
|
||||
type: Gauge
|
||||
deprecatedVersion: "1.28.0"
|
||||
stabilityLevel: ALPHA
|
||||
labels:
|
||||
- endpoint
|
||||
@ -3750,6 +3751,12 @@
|
||||
stabilityLevel: ALPHA
|
||||
labels:
|
||||
- resource
|
||||
- name: apiserver_storage_size_bytes
|
||||
help: Size of the storage database file physically allocated in bytes.
|
||||
type: Custom
|
||||
stabilityLevel: ALPHA
|
||||
labels:
|
||||
- server
|
||||
- name: transformation_duration_seconds
|
||||
subsystem: storage
|
||||
namespace: apiserver
|
||||
|
@ -878,7 +878,7 @@ components using an HTTP scrape, and fetch the current metrics data in Prometheu
|
||||
<td class="metric_description">Total size of the storage database file physically allocated in bytes.</td>
|
||||
<td class="metric_labels_varying"><div class="metric_label">endpoint</div></td>
|
||||
<td class="metric_labels_constant"></td>
|
||||
<td class="metric_deprecated_version"></td></tr>
|
||||
<td class="metric_deprecated_version">1.28.0</td></tr>
|
||||
<tr class="metric"><td class="metric_name">apiserver_storage_decode_errors_total</td>
|
||||
<td class="metric_stability_level" data-stability="alpha">ALPHA</td>
|
||||
<td class="metric_type" data-type="counter">Counter</td>
|
||||
@ -928,6 +928,13 @@ components using an HTTP scrape, and fetch the current metrics data in Prometheu
|
||||
<td class="metric_labels_varying"><div class="metric_label">resource</div></td>
|
||||
<td class="metric_labels_constant"></td>
|
||||
<td class="metric_deprecated_version"></td></tr>
|
||||
<tr class="metric"><td class="metric_name">apiserver_storage_size_bytes</td>
|
||||
<td class="metric_stability_level" data-stability="alpha">ALPHA</td>
|
||||
<td class="metric_type" data-type="custom">Custom</td>
|
||||
<td class="metric_description">Size of the storage database file physically allocated in bytes.</td>
|
||||
<td class="metric_labels_varying"><div class="metric_label">server</div></td>
|
||||
<td class="metric_labels_constant"></td>
|
||||
<td class="metric_deprecated_version"></td></tr>
|
||||
<tr class="metric"><td class="metric_name">apiserver_storage_transformation_duration_seconds</td>
|
||||
<td class="metric_stability_level" data-stability="alpha">ALPHA</td>
|
||||
<td class="metric_type" data-type="histogram">Histogram</td>
|
||||
|
@ -76,6 +76,28 @@ func TestAPIServerProcessMetrics(t *testing.T) {
|
||||
})
|
||||
}
|
||||
|
||||
func TestAPIServerStorageMetrics(t *testing.T) {
|
||||
s := kubeapiservertesting.StartTestServerOrDie(t, nil, nil, framework.SharedEtcd())
|
||||
defer s.TearDownFn()
|
||||
|
||||
metrics, err := scrapeMetrics(s)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
samples, ok := metrics["apiserver_storage_size_bytes"]
|
||||
if !ok {
|
||||
t.Fatalf("apiserver_storage_size_bytes metric not exposed")
|
||||
}
|
||||
if len(samples) != 1 {
|
||||
t.Fatalf("Unexpected number of samples in apiserver_storage_size_bytes")
|
||||
}
|
||||
|
||||
if samples[0].Value == -1 {
|
||||
t.Errorf("Unexpected non-zero apiserver_storage_size_bytes, got: %s", samples[0].Value)
|
||||
}
|
||||
}
|
||||
|
||||
func TestAPIServerMetrics(t *testing.T) {
|
||||
s := kubeapiservertesting.StartTestServerOrDie(t, nil, nil, framework.SharedEtcd())
|
||||
defer s.TearDownFn()
|
||||
|
Loading…
Reference in New Issue
Block a user