mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-07-22 11:21:47 +00:00
Add etcd DB size monitoring in density test
This commit is contained in:
parent
0e31372b2f
commit
203664933d
@ -27,6 +27,7 @@ import (
|
||||
"sort"
|
||||
"strconv"
|
||||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
@ -252,6 +253,7 @@ type EtcdMetrics struct {
|
||||
SnapshotSaveTotalDuration HistogramVec `json:"snapshotSaveTotalDuration"`
|
||||
PeerRoundTripTime HistogramVec `json:"peerRoundTripTime"`
|
||||
WalFsyncDuration HistogramVec `json:"walFsyncDuration"`
|
||||
MaxDatabaseSize float64 `json:"maxDatabaseSize"`
|
||||
}
|
||||
|
||||
func newEtcdMetrics() *EtcdMetrics {
|
||||
@ -275,6 +277,100 @@ func (l *EtcdMetrics) PrintJSON() string {
|
||||
return PrettyPrintJSON(l)
|
||||
}
|
||||
|
||||
type EtcdMetricsCollector struct {
|
||||
stopCh chan struct{}
|
||||
wg *sync.WaitGroup
|
||||
metrics *EtcdMetrics
|
||||
}
|
||||
|
||||
func NewEtcdMetricsCollector() *EtcdMetricsCollector {
|
||||
return &EtcdMetricsCollector{
|
||||
stopCh: make(chan struct{}),
|
||||
wg: &sync.WaitGroup{},
|
||||
metrics: newEtcdMetrics(),
|
||||
}
|
||||
}
|
||||
|
||||
func getEtcdMetrics() ([]*model.Sample, error) {
|
||||
// Etcd is only exposed on localhost level. We are using ssh method
|
||||
if TestContext.Provider == "gke" {
|
||||
Logf("Not grabbing scheduler metrics through master SSH: unsupported for gke")
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
cmd := "curl http://localhost:2379/metrics"
|
||||
sshResult, err := SSH(cmd, GetMasterHost()+":22", TestContext.Provider)
|
||||
if err != nil || sshResult.Code != 0 {
|
||||
return nil, fmt.Errorf("unexpected error (code: %d) in ssh connection to master: %#v", sshResult.Code, err)
|
||||
}
|
||||
data := sshResult.Stdout
|
||||
|
||||
return extractMetricSamples(data)
|
||||
}
|
||||
|
||||
func getEtcdDatabaseSize() (float64, error) {
|
||||
samples, err := getEtcdMetrics()
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
for _, sample := range samples {
|
||||
if sample.Metric[model.MetricNameLabel] == "etcd_debugging_mvcc_db_total_size_in_bytes" {
|
||||
return float64(sample.Value), nil
|
||||
}
|
||||
}
|
||||
return 0, fmt.Errorf("Couldn't find etcd database size metric")
|
||||
}
|
||||
|
||||
// StartCollecting starts to collect etcd db size metric periodically
|
||||
// and updates MaxDatabaseSize accordingly.
|
||||
func (mc *EtcdMetricsCollector) StartCollecting(interval time.Duration) {
|
||||
mc.wg.Add(1)
|
||||
go func() {
|
||||
defer mc.wg.Done()
|
||||
for {
|
||||
select {
|
||||
case <-time.After(interval):
|
||||
dbSize, err := getEtcdDatabaseSize()
|
||||
if err != nil {
|
||||
Logf("Failed to collect etcd database size")
|
||||
continue
|
||||
}
|
||||
mc.metrics.MaxDatabaseSize = math.Max(mc.metrics.MaxDatabaseSize, dbSize)
|
||||
case <-mc.stopCh:
|
||||
return
|
||||
}
|
||||
}
|
||||
}()
|
||||
}
|
||||
|
||||
func (mc *EtcdMetricsCollector) StopAndSummarize() error {
|
||||
close(mc.stopCh)
|
||||
mc.wg.Wait()
|
||||
|
||||
// Do some one-off collection of metrics.
|
||||
samples, err := getEtcdMetrics()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
for _, sample := range samples {
|
||||
switch sample.Metric[model.MetricNameLabel] {
|
||||
case "etcd_disk_backend_commit_duration_seconds_bucket":
|
||||
convertSampleToBucket(sample, &mc.metrics.BackendCommitDuration)
|
||||
case "etcd_debugging_snap_save_total_duration_seconds_bucket":
|
||||
convertSampleToBucket(sample, &mc.metrics.SnapshotSaveTotalDuration)
|
||||
case "etcd_disk_wal_fsync_duration_seconds_bucket":
|
||||
convertSampleToBucket(sample, &mc.metrics.WalFsyncDuration)
|
||||
case "etcd_network_peer_round_trip_time_seconds_bucket":
|
||||
convertSampleToBucket(sample, &mc.metrics.PeerRoundTripTime)
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (mc *EtcdMetricsCollector) GetMetrics() *EtcdMetrics {
|
||||
return mc.metrics
|
||||
}
|
||||
|
||||
type SaturationTime struct {
|
||||
TimeToSaturate time.Duration `json:"timeToSaturate"`
|
||||
NumberOfNodes int `json:"numberOfNodes"`
|
||||
@ -619,42 +715,6 @@ func convertSampleToBucket(sample *model.Sample, h *HistogramVec) {
|
||||
hist.Buckets[string(sample.Metric["le"])] = int(sample.Value)
|
||||
}
|
||||
|
||||
// VerifyEtcdMetrics verifies etcd metrics by logging them
|
||||
func VerifyEtcdMetrics(c clientset.Interface) (*EtcdMetrics, error) {
|
||||
// Etcd is only exposed on localhost level. We are using ssh method
|
||||
if TestContext.Provider == "gke" {
|
||||
Logf("Not grabbing scheduler metrics through master SSH: unsupported for gke")
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
cmd := "curl http://localhost:2379/metrics"
|
||||
sshResult, err := SSH(cmd, GetMasterHost()+":22", TestContext.Provider)
|
||||
if err != nil || sshResult.Code != 0 {
|
||||
return nil, fmt.Errorf("unexpected error (code: %d) in ssh connection to master: %#v", sshResult.Code, err)
|
||||
}
|
||||
data := sshResult.Stdout
|
||||
|
||||
samples, err := extractMetricSamples(data)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
result := newEtcdMetrics()
|
||||
for _, sample := range samples {
|
||||
switch sample.Metric[model.MetricNameLabel] {
|
||||
case "etcd_disk_backend_commit_duration_seconds_bucket":
|
||||
convertSampleToBucket(sample, &result.BackendCommitDuration)
|
||||
case "etcd_debugging_snap_save_total_duration_seconds_bucket":
|
||||
convertSampleToBucket(sample, &result.SnapshotSaveTotalDuration)
|
||||
case "etcd_disk_wal_fsync_duration_seconds_bucket":
|
||||
convertSampleToBucket(sample, &result.WalFsyncDuration)
|
||||
case "etcd_network_peer_round_trip_time_seconds_bucket":
|
||||
convertSampleToBucket(sample, &result.PeerRoundTripTime)
|
||||
}
|
||||
}
|
||||
return result, nil
|
||||
}
|
||||
|
||||
func PrettyPrintJSON(metrics interface{}) string {
|
||||
output := &bytes.Buffer{}
|
||||
if err := json.NewEncoder(output).Encode(metrics); err != nil {
|
||||
|
@ -395,6 +395,7 @@ var _ = SIGDescribe("Density", func() {
|
||||
missingMeasurements := 0
|
||||
var testPhaseDurations *timer.TestPhaseTimer
|
||||
var profileGathererStopCh chan struct{}
|
||||
var etcdMetricsCollector *framework.EtcdMetricsCollector
|
||||
|
||||
// Gathers data prior to framework namespace teardown
|
||||
AfterEach(func() {
|
||||
@ -426,7 +427,7 @@ var _ = SIGDescribe("Density", func() {
|
||||
summaries = append(summaries, metrics)
|
||||
}
|
||||
|
||||
// Verify scheduler metrics.
|
||||
// Summarize scheduler metrics.
|
||||
latency, err := framework.VerifySchedulerLatency(c)
|
||||
framework.ExpectNoError(err)
|
||||
if err == nil {
|
||||
@ -443,10 +444,11 @@ var _ = SIGDescribe("Density", func() {
|
||||
summaries = append(summaries, latency)
|
||||
}
|
||||
|
||||
etcdMetrics, err := framework.VerifyEtcdMetrics(c)
|
||||
// Summarize etcd metrics.
|
||||
err = etcdMetricsCollector.StopAndSummarize()
|
||||
framework.ExpectNoError(err)
|
||||
if err == nil {
|
||||
summaries = append(summaries, etcdMetrics)
|
||||
summaries = append(summaries, etcdMetricsCollector.GetMetrics())
|
||||
}
|
||||
|
||||
summaries = append(summaries, testPhaseDurations)
|
||||
@ -509,6 +511,10 @@ var _ = SIGDescribe("Density", func() {
|
||||
// Start apiserver CPU profile gatherer with frequency based on cluster size.
|
||||
profileGatheringDelay := time.Duration(5+nodeCount/100) * time.Minute
|
||||
profileGathererStopCh = framework.StartCPUProfileGatherer("kube-apiserver", "density", profileGatheringDelay)
|
||||
|
||||
// Start etcs metrics collection.
|
||||
etcdMetricsCollector = framework.NewEtcdMetricsCollector()
|
||||
etcdMetricsCollector.StartCollecting(time.Minute)
|
||||
})
|
||||
|
||||
type Density struct {
|
||||
|
Loading…
Reference in New Issue
Block a user