diff --git a/test/e2e/framework/metrics_util.go b/test/e2e/framework/metrics_util.go
index 410919aed38..1ef6b37268e 100644
--- a/test/e2e/framework/metrics_util.go
+++ b/test/e2e/framework/metrics_util.go
@@ -27,6 +27,7 @@ import (
 	"sort"
 	"strconv"
 	"strings"
+	"sync"
 	"time"
 
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
@@ -252,6 +253,7 @@ type EtcdMetrics struct {
 	SnapshotSaveTotalDuration HistogramVec `json:"snapshotSaveTotalDuration"`
 	PeerRoundTripTime         HistogramVec `json:"peerRoundTripTime"`
 	WalFsyncDuration          HistogramVec `json:"walFsyncDuration"`
+	MaxDatabaseSize           float64      `json:"maxDatabaseSize"`
 }
 
 func newEtcdMetrics() *EtcdMetrics {
@@ -275,6 +277,100 @@ func (l *EtcdMetrics) PrintJSON() string {
 	return PrettyPrintJSON(l)
 }
 
+type EtcdMetricsCollector struct {
+	stopCh  chan struct{}
+	wg      *sync.WaitGroup
+	metrics *EtcdMetrics
+}
+
+func NewEtcdMetricsCollector() *EtcdMetricsCollector {
+	return &EtcdMetricsCollector{
+		stopCh:  make(chan struct{}),
+		wg:      &sync.WaitGroup{},
+		metrics: newEtcdMetrics(),
+	}
+}
+
+func getEtcdMetrics() ([]*model.Sample, error) {
+	// Etcd is only exposed on localhost, so we scrape its metrics over SSH to the master.
+	if TestContext.Provider == "gke" {
+		Logf("Not grabbing etcd metrics through master SSH: unsupported for gke")
+		return nil, nil
+	}
+
+	cmd := "curl http://localhost:2379/metrics"
+	sshResult, err := SSH(cmd, GetMasterHost()+":22", TestContext.Provider)
+	if err != nil || sshResult.Code != 0 {
+		return nil, fmt.Errorf("unexpected error (code: %d) in ssh connection to master: %#v", sshResult.Code, err)
+	}
+	data := sshResult.Stdout
+
+	return extractMetricSamples(data)
+}
+
+func getEtcdDatabaseSize() (float64, error) {
+	samples, err := getEtcdMetrics()
+	if err != nil {
+		return 0, err
+	}
+	for _, sample := range samples {
+		if sample.Metric[model.MetricNameLabel] == "etcd_debugging_mvcc_db_total_size_in_bytes" {
+			return float64(sample.Value), nil
+		}
+	}
+	return 0, fmt.Errorf("couldn't find etcd database size metric")
+}
+
+// StartCollecting starts to collect the etcd database size metric periodically
+// and updates MaxDatabaseSize accordingly.
+func (mc *EtcdMetricsCollector) StartCollecting(interval time.Duration) {
+	mc.wg.Add(1)
+	go func() {
+		defer mc.wg.Done()
+		for {
+			select {
+			case <-time.After(interval):
+				dbSize, err := getEtcdDatabaseSize()
+				if err != nil {
+					Logf("Failed to collect etcd database size: %v", err)
+					continue
+				}
+				mc.metrics.MaxDatabaseSize = math.Max(mc.metrics.MaxDatabaseSize, dbSize)
+			case <-mc.stopCh:
+				return
+			}
+		}
+	}()
+}
+
+func (mc *EtcdMetricsCollector) StopAndSummarize() error {
+	close(mc.stopCh)
+	mc.wg.Wait()
+
+	// Do some one-off collection of metrics.
+	samples, err := getEtcdMetrics()
+	if err != nil {
+		return err
+	}
+	for _, sample := range samples {
+		switch sample.Metric[model.MetricNameLabel] {
+		case "etcd_disk_backend_commit_duration_seconds_bucket":
+			convertSampleToBucket(sample, &mc.metrics.BackendCommitDuration)
+		case "etcd_debugging_snap_save_total_duration_seconds_bucket":
+			convertSampleToBucket(sample, &mc.metrics.SnapshotSaveTotalDuration)
+		case "etcd_disk_wal_fsync_duration_seconds_bucket":
+			convertSampleToBucket(sample, &mc.metrics.WalFsyncDuration)
+		case "etcd_network_peer_round_trip_time_seconds_bucket":
+			convertSampleToBucket(sample, &mc.metrics.PeerRoundTripTime)
+		}
+	}
+	return nil
+}
+
+func (mc *EtcdMetricsCollector) GetMetrics() *EtcdMetrics {
+	return mc.metrics
+}
+
 type SaturationTime struct {
 	TimeToSaturate time.Duration `json:"timeToSaturate"`
 	NumberOfNodes  int           `json:"numberOfNodes"`
@@ -619,42 +715,6 @@ func convertSampleToBucket(sample *model.Sample, h *HistogramVec) {
 	hist.Buckets[string(sample.Metric["le"])] = int(sample.Value)
 }
 
-// VerifyEtcdMetrics verifies etcd metrics by logging them
-func VerifyEtcdMetrics(c clientset.Interface) (*EtcdMetrics, error) {
-	// Etcd is only exposed on localhost level. We are using ssh method
-	if TestContext.Provider == "gke" {
-		Logf("Not grabbing scheduler metrics through master SSH: unsupported for gke")
-		return nil, nil
-	}
-
-	cmd := "curl http://localhost:2379/metrics"
-	sshResult, err := SSH(cmd, GetMasterHost()+":22", TestContext.Provider)
-	if err != nil || sshResult.Code != 0 {
-		return nil, fmt.Errorf("unexpected error (code: %d) in ssh connection to master: %#v", sshResult.Code, err)
-	}
-	data := sshResult.Stdout
-
-	samples, err := extractMetricSamples(data)
-	if err != nil {
-		return nil, err
-	}
-
-	result := newEtcdMetrics()
-	for _, sample := range samples {
-		switch sample.Metric[model.MetricNameLabel] {
-		case "etcd_disk_backend_commit_duration_seconds_bucket":
-			convertSampleToBucket(sample, &result.BackendCommitDuration)
-		case "etcd_debugging_snap_save_total_duration_seconds_bucket":
-			convertSampleToBucket(sample, &result.SnapshotSaveTotalDuration)
-		case "etcd_disk_wal_fsync_duration_seconds_bucket":
-			convertSampleToBucket(sample, &result.WalFsyncDuration)
-		case "etcd_network_peer_round_trip_time_seconds_bucket":
-			convertSampleToBucket(sample, &result.PeerRoundTripTime)
-		}
-	}
-	return result, nil
-}
-
 func PrettyPrintJSON(metrics interface{}) string {
 	output := &bytes.Buffer{}
 	if err := json.NewEncoder(output).Encode(metrics); err != nil {
diff --git a/test/e2e/scalability/density.go b/test/e2e/scalability/density.go
index c966119be79..a06f43c2daf 100644
--- a/test/e2e/scalability/density.go
+++ b/test/e2e/scalability/density.go
@@ -395,6 +395,7 @@ var _ = SIGDescribe("Density", func() {
 	missingMeasurements := 0
 	var testPhaseDurations *timer.TestPhaseTimer
 	var profileGathererStopCh chan struct{}
+	var etcdMetricsCollector *framework.EtcdMetricsCollector
 
 	// Gathers data prior to framework namespace teardown
 	AfterEach(func() {
@@ -426,7 +427,7 @@ var _ = SIGDescribe("Density", func() {
 			summaries = append(summaries, metrics)
 		}
 
-		// Verify scheduler metrics.
+		// Summarize scheduler metrics.
 		latency, err := framework.VerifySchedulerLatency(c)
 		framework.ExpectNoError(err)
 		if err == nil {
@@ -443,10 +444,11 @@ var _ = SIGDescribe("Density", func() {
 			summaries = append(summaries, latency)
 		}
 
-		etcdMetrics, err := framework.VerifyEtcdMetrics(c)
+		// Summarize etcd metrics.
+		err = etcdMetricsCollector.StopAndSummarize()
 		framework.ExpectNoError(err)
 		if err == nil {
-			summaries = append(summaries, etcdMetrics)
+			summaries = append(summaries, etcdMetricsCollector.GetMetrics())
 		}
 
 		summaries = append(summaries, testPhaseDurations)
@@ -509,6 +511,10 @@ var _ = SIGDescribe("Density", func() {
 		// Start apiserver CPU profile gatherer with frequency based on cluster size.
 		profileGatheringDelay := time.Duration(5+nodeCount/100) * time.Minute
 		profileGathererStopCh = framework.StartCPUProfileGatherer("kube-apiserver", "density", profileGatheringDelay)
+
+		// Start etcd metrics collection.
+		etcdMetricsCollector = framework.NewEtcdMetricsCollector()
+		etcdMetricsCollector.StartCollecting(time.Minute)
 	})
 
 	type Density struct {
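
Usage note: outside of density.go, the new collector follows the same start/run/stop lifecycle as the wiring above. The following is a minimal sketch, assuming a test package that imports the e2e framework and a provider where SSH to the master works; the wrapper function collectEtcdMetrics and its workload parameter are illustrative and not part of this change.

package scalability

import (
	"time"

	"k8s.io/kubernetes/test/e2e/framework"
)

// collectEtcdMetrics runs an arbitrary workload while periodically sampling
// the etcd database size, then returns the summarized etcd metrics.
func collectEtcdMetrics(runWorkload func()) (*framework.EtcdMetrics, error) {
	collector := framework.NewEtcdMetricsCollector()
	// Poll the etcd database size once per minute; MaxDatabaseSize keeps the peak.
	collector.StartCollecting(time.Minute)

	runWorkload()

	// Stop polling and do the one-off scrape of the etcd latency histograms.
	if err := collector.StopAndSummarize(); err != nil {
		return nil, err
	}
	return collector.GetMetrics(), nil
}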