From 6ade0a12177c1b9e69ee45296fd1dc743a7cdc96 Mon Sep 17 00:00:00 2001
From: gmarek
Date: Thu, 26 Nov 2015 15:43:30 +0100
Subject: [PATCH] Add a tool for monitoring log generation rates

---
 hack/verify-flags/known-flags.txt |   1 +
 test/e2e/e2e_test.go              |   1 +
 test/e2e/framework.go             |  21 +++
 test/e2e/log_size_monitoring.go   | 228 ++++++++++++++++++++++++++++++
 test/e2e/util.go                  |   1 +
 5 files changed, 252 insertions(+)
 create mode 100644 test/e2e/log_size_monitoring.go

diff --git a/hack/verify-flags/known-flags.txt b/hack/verify-flags/known-flags.txt
index d86df0a7d24..a1564c2c570 100644
--- a/hack/verify-flags/known-flags.txt
+++ b/hack/verify-flags/known-flags.txt
@@ -103,6 +103,7 @@ framework-name
 framework-weburi
 func-dest
 fuzz-iters
+gather-logs-sizes
 gather-resource-usage
 gce-project
 gce-service-account
diff --git a/test/e2e/e2e_test.go b/test/e2e/e2e_test.go
index dd4f26893c8..f9aed5f7e37 100644
--- a/test/e2e/e2e_test.go
+++ b/test/e2e/e2e_test.go
@@ -89,6 +89,7 @@ func init() {
 	flag.BoolVar(&testContext.DeleteNamespace, "delete-namespace", true, "If true tests will delete namespace after completion. It is only designed to make debugging easier, DO NOT turn it off by default.")
 	flag.BoolVar(&testContext.CleanStart, "clean-start", false, "If true, purge all namespaces except default and system before running tests. This serves to cleanup test namespaces from failed/interrupted e2e runs in a long-lived cluster.")
 	flag.BoolVar(&testContext.GatherKubeSystemResourceUsageData, "gather-resource-usage", false, "If set to true framework will be monitoring resource usage of system add-ons in (some) e2e tests.")
+	flag.BoolVar(&testContext.GatherLogsSizes, "gather-logs-sizes", false, "If set to true framework will be monitoring log sizes on all machines running e2e tests.")
 }

 func TestE2E(t *testing.T) {
diff --git a/test/e2e/framework.go b/test/e2e/framework.go
index 9b48c7a5328..d72cf82148b 100644
--- a/test/e2e/framework.go
+++ b/test/e2e/framework.go
@@ -20,6 +20,7 @@ import (
 	"bytes"
 	"fmt"
 	"strings"
+	"sync"
 	"time"

 	"k8s.io/kubernetes/pkg/api"
@@ -42,6 +43,10 @@ type Framework struct {
 	NamespaceDeletionTimeout time.Duration

 	gatherer containerResourceGatherer
+
+	logsSizeWaitGroup    sync.WaitGroup
+	logsSizeCloseChannel chan bool
+	logsSizeVerifier     *LogsSizeVerifier
 }

 // NewFramework makes a new framework and sets up a BeforeEach/AfterEach for
@@ -82,6 +87,17 @@ func (f *Framework) beforeEach() {
 	if testContext.GatherKubeSystemResourceUsageData {
 		f.gatherer.startGatheringData(c, time.Minute)
 	}
+
+	if testContext.GatherLogsSizes {
+		f.logsSizeWaitGroup = sync.WaitGroup{}
+		f.logsSizeWaitGroup.Add(1)
+		f.logsSizeCloseChannel = make(chan bool)
+		f.logsSizeVerifier = NewLogsVerifier(c, f.logsSizeCloseChannel)
+		go func() {
+			f.logsSizeVerifier.Run()
+			f.logsSizeWaitGroup.Done()
+		}()
+	}
 }

 // afterEach deletes the namespace, after reading its events.
@@ -126,6 +142,11 @@ func (f *Framework) afterEach() {
 	if testContext.GatherKubeSystemResourceUsageData {
 		f.gatherer.stopAndPrintData([]int{50, 90, 99, 100})
 	}
+
+	if testContext.GatherLogsSizes {
+		close(f.logsSizeCloseChannel)
+		f.logsSizeWaitGroup.Wait()
+	}
 	// Paranoia-- prevent reuse!
 	f.Namespace = nil
 	f.Client = nil
diff --git a/test/e2e/log_size_monitoring.go b/test/e2e/log_size_monitoring.go
new file mode 100644
index 00000000000..b948da53960
--- /dev/null
+++ b/test/e2e/log_size_monitoring.go
@@ -0,0 +1,228 @@
+/*
+Copyright 2015 The Kubernetes Authors All rights reserved.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package e2e
+
+import (
+	"bytes"
+	"fmt"
+	"strconv"
+	"strings"
+	"sync"
+	"text/tabwriter"
+	"time"
+
+	client "k8s.io/kubernetes/pkg/client/unversioned"
+)
+
+const (
+	// Minimal period between consecutive polls of log sizes from components
+	pollingPeriod            = 5 * time.Second
+	workersNo                = 5
+	kubeletLogsPath          = "/var/log/kubelet.log"
+	kubeProxyLogsPath        = "/var/log/kube-proxy.log"
+	kubeAddonsLogsPath       = "/var/log/kube-addons.log"
+	kubeMasterAddonsLogsPath = "/var/log/kube-master-addons.log"
+	apiServerLogsPath        = "/var/log/kube-apiserver.log"
+	controllersLogsPath      = "/var/log/kube-controller-manager.log"
+	schedulerLogsPath        = "/var/log/kube-scheduler.log"
+)
+
+var (
+	nodeLogsToCheck   = []string{kubeletLogsPath, kubeProxyLogsPath}
+	masterLogsToCheck = []string{kubeletLogsPath, kubeAddonsLogsPath, kubeMasterAddonsLogsPath,
+		apiServerLogsPath, controllersLogsPath, schedulerLogsPath}
+)
+
+// TimestampedSize contains a size together with a time of measurement.
+type TimestampedSize struct {
+	timestamp time.Time
+	size      int
+}
+
+// LogSizeGatherer is a worker which grabs a WorkItem from the channel and does assigned work.
+type LogSizeGatherer struct {
+	stopChannel chan bool
+	data        *LogsSizeData
+	wg          *sync.WaitGroup
+	workChannel chan WorkItem
+}
+
+// LogsSizeVerifier gathers data about log file sizes from master and node machines.
+// It oversees the workers that do the gathering.
+type LogsSizeVerifier struct {
+	client      *client.Client
+	stopChannel chan bool
+	// data stores LogsSizeData grouped per IP and log_path
+	data          LogsSizeData
+	masterAddress string
+	nodeAddresses []string
+	wg            sync.WaitGroup
+	workChannel   chan WorkItem
+	workers       []*LogSizeGatherer
+}
+
+// LogsSizeData is a lock-guarded collection of log size measurements,
+// keyed first by machine IP and then by log path.
+type LogsSizeData struct {
+	data map[string]map[string][]TimestampedSize
+	lock sync.Mutex
+}
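+
+// For illustration only, a hypothetical snapshot of LogsSizeData.data: the outer
+// key is the machine's SSH address, the inner key is the log path (the addresses,
+// timestamps, and sizes below are made up, not real output):
+//
+//	"10.0.0.1:22" -> "/var/log/kubelet.log"    -> [{10:00:00, 1024}, {10:00:05, 1536}]
+//	"10.0.0.1:22" -> "/var/log/kube-proxy.log" -> [{10:00:00, 512}, {10:00:05, 512}]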
+
+// WorkItem is a command for a worker that contains an IP of the machine from which
+// we want to gather data, and the paths to all files we're interested in.
+type WorkItem struct {
+	ip    string
+	paths []string
+}
+
+func prepareData(masterAddress string, nodeAddresses []string) LogsSizeData {
+	data := make(map[string]map[string][]TimestampedSize)
+	// Build a fresh slice instead of appending to nodeAddresses, so the caller's
+	// backing array is never mutated.
+	ips := append([]string{masterAddress}, nodeAddresses...)
+	for _, ip := range ips {
+		data[ip] = make(map[string][]TimestampedSize)
+	}
+	return LogsSizeData{
+		data: data,
+		lock: sync.Mutex{},
+	}
+}
+
+// AddNewData appends a single size measurement for the given machine and log path.
+func (d *LogsSizeData) AddNewData(ip, path string, timestamp time.Time, size int) {
+	d.lock.Lock()
+	defer d.lock.Unlock()
+	d.data[ip][path] = append(
+		d.data[ip][path],
+		TimestampedSize{
+			timestamp: timestamp,
+			size:      size,
+		},
+	)
+}
+
+// PrintData returns a tab-formatted summary with the average growth rate of each log file.
+func (d *LogsSizeData) PrintData() string {
+	d.lock.Lock()
+	defer d.lock.Unlock()
+	buf := &bytes.Buffer{}
+	w := tabwriter.NewWriter(buf, 1, 0, 1, ' ', 0)
+	fmt.Fprintf(w, "host\tlog_file\taverage_rate (B/s)\tnumber_of_probes\n")
+	for k, v := range d.data {
+		fmt.Fprintf(w, "%v\t\t\t\n", k)
+		for path, data := range v {
+			if len(data) > 1 {
+				last := data[len(data)-1]
+				first := data[0]
+				rate := (last.size - first.size) / int(last.timestamp.Sub(first.timestamp)/time.Second)
+				fmt.Fprintf(w, "\t%v\t%v\t%v\n", path, rate, len(data))
+			}
+		}
+	}
+	w.Flush()
+	return buf.String()
+}
+
+// NewLogsVerifier creates a new LogsSizeVerifier which will stop when stopChannel is closed
+func NewLogsVerifier(c *client.Client, stopChannel chan bool) *LogsSizeVerifier {
+	nodeAddresses, err := NodeSSHHosts(c)
+	expectNoError(err)
+	masterAddress := getMasterHost() + ":22"
+
+	workChannel := make(chan WorkItem, len(nodeAddresses)+1)
+	workers := make([]*LogSizeGatherer, workersNo)
+
+	verifier := &LogsSizeVerifier{
+		client:        c,
+		stopChannel:   stopChannel,
+		data:          prepareData(masterAddress, nodeAddresses),
+		masterAddress: masterAddress,
+		nodeAddresses: nodeAddresses,
+		wg:            sync.WaitGroup{},
+		workChannel:   workChannel,
+		workers:       workers,
+	}
+	verifier.wg.Add(workersNo)
+	for i := 0; i < workersNo; i++ {
+		workers[i] = &LogSizeGatherer{
+			stopChannel: stopChannel,
+			data:        &verifier.data,
+			wg:          &verifier.wg,
+			workChannel: workChannel,
+		}
+	}
+	return verifier
+}
+
+// PrintData returns a string with formatted results
+func (v *LogsSizeVerifier) PrintData() string {
+	return v.data.PrintData()
+}
+
+// Run starts log size gathering. It starts a goroutine for every worker and then blocks until stopChannel is closed
+func (v *LogsSizeVerifier) Run() {
+	v.workChannel <- WorkItem{
+		ip:    v.masterAddress,
+		paths: masterLogsToCheck,
+	}
+	for _, node := range v.nodeAddresses {
+		v.workChannel <- WorkItem{
+			ip:    node,
+			paths: nodeLogsToCheck,
+		}
+	}
+	for _, worker := range v.workers {
+		go worker.Run()
+	}
+	<-v.stopChannel
+	v.wg.Wait()
+
+	Logf("\n%v", v.PrintData())
+}
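+
+// A minimal usage sketch, mirroring what framework.go does in beforeEach/afterEach
+// (c is assumed to be an already-initialized *client.Client):
+//
+//	stopCh := make(chan bool)
+//	verifier := NewLogsVerifier(c, stopCh)
+//	var wg sync.WaitGroup
+//	wg.Add(1)
+//	go func() {
+//		verifier.Run() // blocks until stopCh is closed, then logs the summary
+//		wg.Done()
+//	}()
+//	// ... run the test ...
+//	close(stopCh)
+//	wg.Wait()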
+
+func (g *LogSizeGatherer) Run() {
+	for g.Work() {
+	}
+}
+
+// Work does a single unit of work: tries to take out a WorkItem from the queue, ssh-es into a given machine,
+// gathers data, writes it to the shared map, and creates a goroutine which reinserts the work item into
+// the queue with a delay.
+func (g *LogSizeGatherer) Work() bool {
+	var workItem WorkItem
+	select {
+	case <-g.stopChannel:
+		g.wg.Done()
+		return false
+	case workItem = <-g.workChannel:
+	}
+	// The remote command prints "<path> <size>" pairs separated by single spaces,
+	// e.g. "/var/log/kubelet.log 1024 /var/log/kube-proxy.log 512 ".
+	sshResult, err := SSH(
+		fmt.Sprintf("ls -l %v | awk '{print $9, $5}' | tr '\n' ' '", strings.Join(workItem.paths, " ")),
+		workItem.ip,
+		testContext.Provider,
+	)
+	expectNoError(err)
+	results := strings.Split(sshResult.Stdout, " ")
+
+	now := time.Now()
+	for i := 0; i+1 < len(results); i += 2 {
+		path := results[i]
+		size, err := strconv.Atoi(results[i+1])
+		expectNoError(err)
+		g.data.AddNewData(workItem.ip, path, now, size)
+	}
+	go func() {
+		time.Sleep(pollingPeriod)
+		g.workChannel <- workItem
+	}()
+	return true
+}
diff --git a/test/e2e/util.go b/test/e2e/util.go
index 49a9696426a..5ede8a7cf44 100644
--- a/test/e2e/util.go
+++ b/test/e2e/util.go
@@ -139,6 +139,7 @@ type TestContextType struct {
 	// If set to true framework will start a goroutine monitoring resource usage of system add-ons.
 	// It will read the data every 30 seconds from all Nodes and print summary during afterEach.
 	GatherKubeSystemResourceUsageData bool
+	GatherLogsSizes                   bool
 }

 var testContext TestContextType
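
For reference, the summary logged by LogsSizeVerifier.Run() via PrintData() is a
tab-aligned table along these lines (the host, rates, and probe counts below are
illustrative, not real output):

 host         log_file                 average_rate (B/s) number_of_probes
 10.0.0.1:22
              /var/log/kubelet.log     126                24
              /var/log/kube-proxy.log  53                 24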