diff --git a/hack/verify-flags/known-flags.txt b/hack/verify-flags/known-flags.txt
index 2774c24d665..61394fb14a6 100644
--- a/hack/verify-flags/known-flags.txt
+++ b/hack/verify-flags/known-flags.txt
@@ -236,6 +236,7 @@ oidc-username-claim
 oom-score-adj
 output-base
 output-package
+output-print-type
 output-version
 out-version
 path-override
diff --git a/test/e2e/e2e_test.go b/test/e2e/e2e_test.go
index 1da3e22b1f5..21b29771dec 100644
--- a/test/e2e/e2e_test.go
+++ b/test/e2e/e2e_test.go
@@ -91,6 +91,7 @@ func init() {
 	flag.BoolVar(&testContext.GatherKubeSystemResourceUsageData, "gather-resource-usage", false, "If set to true framework will be monitoring resource usage of system add-ons in (some) e2e tests.")
 	flag.BoolVar(&testContext.GatherLogsSizes, "gather-logs-sizes", false, "If set to true framework will be monitoring logs sizes on all machines running e2e tests.")
 	flag.BoolVar(&testContext.GatherMetricsAfterTest, "gather-metrics-at-teardown", false, "If set to true framwork will gather metrics from all components after each test.")
+	flag.StringVar(&testContext.OutputPrintType, "output-print-type", "hr", "Comma-separated list: 'hr' for human-readable summaries, 'json' for JSON ones.")
 }
 
 func TestE2E(t *testing.T) {
diff --git a/test/e2e/framework.go b/test/e2e/framework.go
index afad37ce894..68cee2cd25f 100644
--- a/test/e2e/framework.go
+++ b/test/e2e/framework.go
@@ -52,6 +52,11 @@ type Framework struct {
 	logsSizeVerifier *LogsSizeVerifier
 }
 
+type TestDataSummary interface {
+	PrintHumanReadable() string
+	PrintJSON() string
+}
+
 // NewFramework makes a new framework and sets up a BeforeEach/AfterEach for
 // you (you can write additional before/after each functions).
 func NewFramework(baseName string) *Framework {
@@ -91,7 +96,7 @@ func (f *Framework) beforeEach() {
 	}
 
 	if testContext.GatherKubeSystemResourceUsageData {
-		f.gatherer.startGatheringData(c, time.Minute)
+		f.gatherer.startGatheringData(c, resourceDataGatheringPeriodSeconds*time.Second)
 	}
 
 	if testContext.GatherLogsSizes {
@@ -145,13 +150,31 @@ func (f *Framework) afterEach() {
 		Logf("Found DeleteNamespace=false, skipping namespace deletion!")
 	}
 
+	summaries := make([]TestDataSummary, 0)
 	if testContext.GatherKubeSystemResourceUsageData {
-		f.gatherer.stopAndPrintData([]int{50, 90, 99, 100}, f.addonResourceConstraints)
+		summaries = append(summaries, f.gatherer.stopAndSummarize([]int{50, 90, 99, 100}, f.addonResourceConstraints))
 	}
 
 	if testContext.GatherLogsSizes {
 		close(f.logsSizeCloseChannel)
 		f.logsSizeWaitGroup.Wait()
+		summaries = append(summaries, f.logsSizeVerifier.GetSummary())
+	}
+
+	outputTypes := strings.Split(testContext.OutputPrintType, ",")
+	for _, printType := range outputTypes {
+		switch printType {
+		case "hr":
+			for i := range summaries {
+				Logf(summaries[i].PrintHumanReadable())
+			}
+		case "json":
+			for i := range summaries {
+				Logf(summaries[i].PrintJSON())
+			}
+		default:
+			Logf("Unknown output type: %v. Skipping.", printType)
+		}
 	}
 
 	if testContext.GatherMetricsAfterTest {
diff --git a/test/e2e/kubelet_stats.go b/test/e2e/kubelet_stats.go
index 90f00a3e62a..1bea066d151 100644
--- a/test/e2e/kubelet_stats.go
+++ b/test/e2e/kubelet_stats.go
@@ -21,7 +21,6 @@ import (
 	"encoding/json"
 	"fmt"
 	"io/ioutil"
-	"math"
 	"net/http"
 	"sort"
 	"strconv"
@@ -39,8 +38,6 @@ import (
 	"k8s.io/kubernetes/pkg/master/ports"
 	"k8s.io/kubernetes/pkg/util"
 	"k8s.io/kubernetes/pkg/util/sets"
-
-	. "github.com/onsi/gomega"
 )
 
 // KubeletMetric stores metrics scraped from the kubelet server's /metric endpoint.
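Before the kubelet_stats.go cleanup continues below, the framework.go change above is worth illustrating: reporting is decoupled from gathering, so anything that can render itself both as text and as JSON satisfies the new `TestDataSummary` interface, and `afterEach` dispatches once per type listed in `--output-print-type`. A minimal, self-contained sketch of that pattern — the `fakeSummary` type is invented for illustration and is not part of this patch:

```go
package main

import (
	"fmt"
	"strings"
)

// TestDataSummary mirrors the interface added in framework.go.
type TestDataSummary interface {
	PrintHumanReadable() string
	PrintJSON() string
}

// fakeSummary is a hypothetical stand-in for ResourceUsageSummary or
// LogsSizeDataSummary from the patch.
type fakeSummary struct{ msg string }

func (f fakeSummary) PrintHumanReadable() string { return "summary: " + f.msg }
func (f fakeSummary) PrintJSON() string          { return fmt.Sprintf(`{"summary": %q}`, f.msg) }

func main() {
	summaries := []TestDataSummary{fakeSummary{"cpu ok"}, fakeSummary{"logs ok"}}
	// Same dispatch shape as afterEach: each requested output type
	// walks all collected summaries once.
	for _, printType := range strings.Split("hr,json", ",") {
		switch printType {
		case "hr":
			for _, s := range summaries {
				fmt.Println(s.PrintHumanReadable())
			}
		case "json":
			for _, s := range summaries {
				fmt.Println(s.PrintJSON())
			}
		default:
			fmt.Printf("Unknown output type: %v. Skipping.\n", printType)
		}
	}
}
```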
@@ -269,52 +266,6 @@ func getOneTimeResourceUsageOnNode(
 	return usageMap, nil
 }
 
-func getKubeSystemContainersResourceUsage(c *client.Client) (resourceUsagePerContainer, error) {
-	pods, err := c.Pods("kube-system").List(api.ListOptions{})
-	if err != nil {
-		return resourceUsagePerContainer{}, err
-	}
-	nodes, err := c.Nodes().List(api.ListOptions{})
-	if err != nil {
-		return resourceUsagePerContainer{}, err
-	}
-	containerIDToNameMap := make(map[string]string)
-	containerIDs := make([]string, 0)
-	for _, pod := range pods.Items {
-		for _, container := range pod.Status.ContainerStatuses {
-			containerID := strings.TrimPrefix(container.ContainerID, "docker:/")
-			containerIDToNameMap[containerID] = pod.Name + "/" + container.Name
-			containerIDs = append(containerIDs, containerID)
-		}
-	}
-
-	mutex := sync.Mutex{}
-	wg := sync.WaitGroup{}
-	wg.Add(len(nodes.Items))
-	errors := make([]error, 0)
-	nameToUsageMap := make(resourceUsagePerContainer, len(containerIDToNameMap))
-	for _, node := range nodes.Items {
-		go func(nodeName string) {
-			defer wg.Done()
-			nodeUsage, err := getOneTimeResourceUsageOnNode(c, nodeName, 15*time.Second, func() []string { return containerIDs }, true)
-			mutex.Lock()
-			defer mutex.Unlock()
-			if err != nil {
-				errors = append(errors, err)
-				return
-			}
-			for k, v := range nodeUsage {
-				nameToUsageMap[containerIDToNameMap[k]] = v
-			}
-		}(node.Name)
-	}
-	wg.Wait()
-	if len(errors) != 0 {
-		return resourceUsagePerContainer{}, fmt.Errorf("Errors while gathering usage data: %v", errors)
-	}
-	return nameToUsageMap, nil
-}
-
 // logOneTimeResourceUsageSummary collects container resource for the list of
 // nodes, formats and logs the stats.
 func logOneTimeResourceUsageSummary(c *client.Client, nodeNames []string, cpuInterval time.Duration) {
@@ -361,139 +312,6 @@ type usageDataPerContainer struct {
 	memWorkSetData []int64
 }
 
-func computePercentiles(timeSeries map[time.Time]resourceUsagePerContainer, percentilesToCompute []int) map[int]resourceUsagePerContainer {
-	if len(timeSeries) == 0 {
-		return make(map[int]resourceUsagePerContainer)
-	}
-	dataMap := make(map[string]*usageDataPerContainer)
-	for _, singleStatistic := range timeSeries {
-		for name, data := range singleStatistic {
-			if dataMap[name] == nil {
-				dataMap[name] = &usageDataPerContainer{
-					cpuData:        make([]float64, len(timeSeries)),
-					memUseData:     make([]int64, len(timeSeries)),
-					memWorkSetData: make([]int64, len(timeSeries)),
-				}
-			}
-			dataMap[name].cpuData = append(dataMap[name].cpuData, data.CPUUsageInCores)
-			dataMap[name].memUseData = append(dataMap[name].memUseData, data.MemoryUsageInBytes)
-			dataMap[name].memWorkSetData = append(dataMap[name].memWorkSetData, data.MemoryWorkingSetInBytes)
-		}
-	}
-	for _, v := range dataMap {
-		sort.Float64s(v.cpuData)
-		sort.Sort(int64arr(v.memUseData))
-		sort.Sort(int64arr(v.memWorkSetData))
-	}
-
-	result := make(map[int]resourceUsagePerContainer)
-	for _, perc := range percentilesToCompute {
-		data := make(resourceUsagePerContainer)
-		for k, v := range dataMap {
-			percentileIndex := int(math.Ceil(float64(len(v.cpuData)*perc)/100)) - 1
-			data[k] = &containerResourceUsage{
-				Name:                    k,
-				CPUUsageInCores:         v.cpuData[percentileIndex],
-				MemoryUsageInBytes:      v.memUseData[percentileIndex],
-				MemoryWorkingSetInBytes: v.memWorkSetData[percentileIndex],
-			}
-		}
-		result[perc] = data
-	}
-	return result
-}
-
-type resourceConstraint struct {
-	cpuConstraint    float64
-	memoryConstraint int64
-}
-
-type containerResourceGatherer struct {
-	usageTimeseries map[time.Time]resourceUsagePerContainer
-	stopCh          chan struct{}
-	timer           *time.Ticker
-	wg              sync.WaitGroup
-}
-
-func (g *containerResourceGatherer) startGatheringData(c *client.Client, period time.Duration) {
-	g.usageTimeseries = make(map[time.Time]resourceUsagePerContainer)
-	g.wg.Add(1)
-	g.stopCh = make(chan struct{})
-	g.timer = time.NewTicker(period)
-	go func() error {
-		for {
-			select {
-			case <-g.timer.C:
-				now := time.Now()
-				data, err := getKubeSystemContainersResourceUsage(c)
-				if err != nil {
-					return err
-				}
-				g.usageTimeseries[now] = data
-			case <-g.stopCh:
-				g.wg.Done()
-				return nil
-			}
-		}
-	}()
-}
-
-func (g *containerResourceGatherer) stopAndPrintData(percentiles []int, constraints map[string]resourceConstraint) {
-	close(g.stopCh)
-	g.timer.Stop()
-	g.wg.Wait()
-	if len(percentiles) == 0 {
-		Logf("Warning! Empty percentile list for stopAndPrintData.")
-		return
-	}
-	stats := computePercentiles(g.usageTimeseries, percentiles)
-	sortedKeys := []string{}
-	for name := range stats[percentiles[0]] {
-		sortedKeys = append(sortedKeys, name)
-	}
-	sort.Strings(sortedKeys)
-	violatedConstraints := make([]string, 0)
-	for _, perc := range percentiles {
-		buf := &bytes.Buffer{}
-		w := tabwriter.NewWriter(buf, 1, 0, 1, ' ', 0)
-		fmt.Fprintf(w, "container\tcpu(cores)\tmemory(MB)\n")
-		for _, name := range sortedKeys {
-			usage := stats[perc][name]
-			fmt.Fprintf(w, "%q\t%.3f\t%.2f\n", name, usage.CPUUsageInCores, float64(usage.MemoryWorkingSetInBytes)/(1024*1024))
-			// Verifying 99th percentile of resource usage
-			if perc == 99 {
-				// Name has a form: <pod_name>/<container_name>
-				containerName := strings.Split(name, "/")[1]
-				if constraint, ok := constraints[containerName]; ok {
-					if usage.CPUUsageInCores > constraint.cpuConstraint {
-						violatedConstraints = append(
-							violatedConstraints,
-							fmt.Sprintf("Container %v is using %v/%v CPU",
-								name,
-								usage.CPUUsageInCores,
-								constraint.cpuConstraint,
-							),
-						)
-					}
-					if usage.MemoryWorkingSetInBytes > constraint.memoryConstraint {
-						violatedConstraints = append(
-							violatedConstraints,
-							fmt.Sprintf("Container %v is using %v/%v MB of memory",
-								name,
-								float64(usage.MemoryWorkingSetInBytes)/(1024*1024),
-								float64(constraint.memoryConstraint)/(1024*1024),
-							),
-						)
-					}
-				}
-			}
-		}
-		w.Flush()
-		Logf("%v percentile:\n%v", perc, buf.String())
-	}
-	Expect(violatedConstraints).To(BeEmpty())
-}
-
 // Performs a get on a node proxy endpoint given the nodename and rest client.
 func nodeProxyRequest(c *client.Client, node, endpoint string) client.Result {
 	return c.Get().
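That removes the gathering code from kubelet_stats.go; it reappears, largely verbatim, in the new resource_usage_gatherer.go below. The percentile math both versions share is a nearest-rank rule: sort each container's samples ascending, then pick index `ceil(n*perc/100) - 1`. A standalone sketch of just that rule — the `percentile` helper is hypothetical, for illustration only:

```go
package main

import (
	"fmt"
	"math"
	"sort"
)

// percentile applies the same nearest-rank rule as computePercentiles:
// sort ascending, then take index ceil(n*perc/100) - 1, for perc in 1..100.
func percentile(samples []float64, perc int) float64 {
	sorted := append([]float64(nil), samples...) // don't mutate the caller's slice
	sort.Float64s(sorted)
	idx := int(math.Ceil(float64(len(sorted)*perc)/100)) - 1
	return sorted[idx]
}

func main() {
	cpu := []float64{0.12, 0.30, 0.07, 0.55, 0.21} // CPU cores sampled over time
	for _, p := range []int{50, 90, 99, 100} {
		fmt.Printf("p%d = %.2f cores\n", p, percentile(cpu, p))
	}
	// Prints: p50 = 0.21, p90 = 0.55, p99 = 0.55, p100 = 0.55
}
```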
diff --git a/test/e2e/log_size_monitoring.go b/test/e2e/log_size_monitoring.go
index 106d7f62fc3..e7419e1d7d4 100644
--- a/test/e2e/log_size_monitoring.go
+++ b/test/e2e/log_size_monitoring.go
@@ -75,8 +75,35 @@ type LogsSizeVerifier struct {
 	workers []*LogSizeGatherer
 }
 
+// node -> file -> data
+type LogsSizeDataSummary map[string]map[string][]TimestampedSize
+
+// TODO: make sure that we don't need locking here
+func (s *LogsSizeDataSummary) PrintHumanReadable() string {
+	buf := &bytes.Buffer{}
+	w := tabwriter.NewWriter(buf, 1, 0, 1, ' ', 0)
+	fmt.Fprintf(w, "host\tlog_file\taverage_rate (B/s)\tnumber_of_probes\n")
+	for k, v := range *s {
+		fmt.Fprintf(w, "%v\t\t\t\n", k)
+		for path, data := range v {
+			if len(data) > 1 {
+				last := data[len(data)-1]
+				first := data[0]
+				rate := (last.size - first.size) / int(last.timestamp.Sub(first.timestamp)/time.Second)
+				fmt.Fprintf(w, "\t%v\t%v\t%v\n", path, rate, len(data))
+			}
+		}
+	}
+	w.Flush()
+	return buf.String()
+}
+
+func (s *LogsSizeDataSummary) PrintJSON() string {
+	return "JSON printer not implemented for LogsSizeDataSummary"
+}
+
 type LogsSizeData struct {
-	data map[string]map[string][]TimestampedSize
+	data LogsSizeDataSummary
 	lock sync.Mutex
 }
@@ -88,7 +115,7 @@ type WorkItem struct {
 }
 
 func prepareData(masterAddress string, nodeAddresses []string) LogsSizeData {
-	data := make(map[string]map[string][]TimestampedSize)
+	data := make(LogsSizeDataSummary)
 	ips := append(nodeAddresses, masterAddress)
 	for _, ip := range ips {
 		data[ip] = make(map[string][]TimestampedSize)
@@ -111,27 +138,6 @@ func (d *LogsSizeData) AddNewData(ip, path string, timestamp time.Time, size int
 	)
 }
 
-func (d *LogsSizeData) PrintData() string {
-	d.lock.Lock()
-	defer d.lock.Unlock()
-	buf := &bytes.Buffer{}
-	w := tabwriter.NewWriter(buf, 1, 0, 1, ' ', 0)
-	fmt.Fprintf(w, "host\tlog_file\taverage_rate (B/s)\tnumber_of_probes\n")
-	for k, v := range d.data {
-		fmt.Fprintf(w, "%v\t\t\t\n", k)
-		for path, data := range v {
-			if len(data) > 1 {
-				last := data[len(data)-1]
-				first := data[0]
-				rate := (last.size - first.size) / int(last.timestamp.Sub(first.timestamp)/time.Second)
-				fmt.Fprintf(w, "\t%v\t%v\t%v\n", path, rate, len(data))
-			}
-		}
-	}
-	w.Flush()
-	return buf.String()
-}
-
 // NewLogsVerifier creates a new LogsSizeVerifier which will stop when stopChannel is closed
 func NewLogsVerifier(c *client.Client, stopChannel chan bool) *LogsSizeVerifier {
 	nodeAddresses, err := NodeSSHHosts(c)
@@ -164,8 +170,8 @@ func NewLogsVerifier(c *client.Client, stopChannel chan bool) *LogsSizeVerifier
 }
 
-// PrintData returns a string with formated results
-func (v *LogsSizeVerifier) PrintData() string {
-	return v.data.PrintData()
+// GetSummary returns the log size data gathered so far, as a LogsSizeDataSummary
+func (v *LogsSizeVerifier) GetSummary() *LogsSizeDataSummary {
+	return &v.data.data
 }
 
 // Run starts log size gathering. It starts a gorouting for every worker and then blocks until stopChannel is closed
@@ -185,8 +191,6 @@ func (v *LogsSizeVerifier) Run() {
 	}
 	<-v.stopChannel
 	v.wg.Wait()
-
-	Logf("\n%v", v.PrintData())
 }
 
 func (g *LogSizeGatherer) Run() {
diff --git a/test/e2e/resource_usage_gatherer.go b/test/e2e/resource_usage_gatherer.go
new file mode 100644
index 00000000000..563c65079d6
--- /dev/null
+++ b/test/e2e/resource_usage_gatherer.go
@@ -0,0 +1,242 @@
+/*
+Copyright 2015 The Kubernetes Authors All rights reserved.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package e2e
+
+import (
+	"bytes"
+	"fmt"
+	"math"
+	"sort"
+	"strings"
+	"sync"
+	"text/tabwriter"
+	"time"
+
+	. "github.com/onsi/gomega"
+	"k8s.io/kubernetes/pkg/api"
+	client "k8s.io/kubernetes/pkg/client/unversioned"
+)
+
+const (
+	resourceDataGatheringPeriodSeconds = 60
+)
+
+type resourceConstraint struct {
+	cpuConstraint    float64
+	memoryConstraint int64
+}
+
+type containerResourceGatherer struct {
+	usageTimeseries map[time.Time]resourceUsagePerContainer
+	stopCh          chan struct{}
+	timer           *time.Ticker
+	wg              sync.WaitGroup
+}
+
+type singleContainerSummary struct {
+	name string
+	cpu  float64
+	mem  int64
+}
+
+type ResourceUsageSummary map[int][]singleContainerSummary
+
+func (s *ResourceUsageSummary) PrintHumanReadable() string {
+	buf := &bytes.Buffer{}
+	w := tabwriter.NewWriter(buf, 1, 0, 1, ' ', 0)
+	for perc, summaries := range *s {
+		buf.WriteString(fmt.Sprintf("%v percentile:\n", perc))
+		fmt.Fprintf(w, "container\tcpu(cores)\tmemory(MB)\n")
+		for _, summary := range summaries {
+			fmt.Fprintf(w, "%q\t%.3f\t%.2f\n", summary.name, summary.cpu, float64(summary.mem)/(1024*1024))
+		}
+		w.Flush()
+	}
+	return buf.String()
+}
+
+func (s *ResourceUsageSummary) PrintJSON() string {
+	return "JSON printer not implemented for ResourceUsageSummary"
+}
+
+func (g *containerResourceGatherer) startGatheringData(c *client.Client, period time.Duration) {
+	g.usageTimeseries = make(map[time.Time]resourceUsagePerContainer)
+	g.wg.Add(1)
+	g.stopCh = make(chan struct{})
+	g.timer = time.NewTicker(period)
+	go func() error {
+		for {
+			select {
+			case <-g.timer.C:
+				now := time.Now()
+				data, err := g.getKubeSystemContainersResourceUsage(c)
+				if err != nil {
+					return err
+				}
+				g.usageTimeseries[now] = data
+			case <-g.stopCh:
+				g.wg.Done()
+				return nil
+			}
+		}
+	}()
+}
+
+func (g *containerResourceGatherer) stopAndSummarize(percentiles []int, constraints map[string]resourceConstraint) *ResourceUsageSummary {
+	close(g.stopCh)
+	g.timer.Stop()
+	g.wg.Wait()
+	if len(percentiles) == 0 {
+		Logf("Warning! Empty percentile list for stopAndSummarize.")
+		return &ResourceUsageSummary{}
+	}
+	stats := g.computePercentiles(g.usageTimeseries, percentiles)
+	sortedKeys := []string{}
+	for name := range stats[percentiles[0]] {
+		sortedKeys = append(sortedKeys, name)
+	}
+	sort.Strings(sortedKeys)
+	violatedConstraints := make([]string, 0)
+	summary := make(ResourceUsageSummary)
+	for _, perc := range percentiles {
+		for _, name := range sortedKeys {
+			usage := stats[perc][name]
+			summary[perc] = append(summary[perc], singleContainerSummary{
+				name: name,
+				cpu:  usage.CPUUsageInCores,
+				mem:  usage.MemoryWorkingSetInBytes,
+			})
+			// Verifying 99th percentile of resource usage
+			if perc == 99 {
+				// Name has a form: <pod_name>/<container_name>
+				containerName := strings.Split(name, "/")[1]
+				if constraint, ok := constraints[containerName]; ok {
+					if usage.CPUUsageInCores > constraint.cpuConstraint {
+						violatedConstraints = append(
+							violatedConstraints,
+							fmt.Sprintf("Container %v is using %v/%v CPU",
+								name,
+								usage.CPUUsageInCores,
+								constraint.cpuConstraint,
+							),
+						)
+					}
+					if usage.MemoryWorkingSetInBytes > constraint.memoryConstraint {
+						violatedConstraints = append(
+							violatedConstraints,
+							fmt.Sprintf("Container %v is using %v/%v MB of memory",
+								name,
+								float64(usage.MemoryWorkingSetInBytes)/(1024*1024),
+								float64(constraint.memoryConstraint)/(1024*1024),
+							),
+						)
+					}
+				}
+			}
+		}
+	}
+	Expect(violatedConstraints).To(BeEmpty())
+	return &summary
+}
+
+func (g *containerResourceGatherer) computePercentiles(timeSeries map[time.Time]resourceUsagePerContainer, percentilesToCompute []int) map[int]resourceUsagePerContainer {
+	if len(timeSeries) == 0 {
+		return make(map[int]resourceUsagePerContainer)
+	}
+	dataMap := make(map[string]*usageDataPerContainer)
+	for _, singleStatistic := range timeSeries {
+		for name, data := range singleStatistic {
+			if dataMap[name] == nil {
+				dataMap[name] = &usageDataPerContainer{
+					cpuData:        make([]float64, 0, len(timeSeries)),
+					memUseData:     make([]int64, 0, len(timeSeries)),
+					memWorkSetData: make([]int64, 0, len(timeSeries)),
+				}
+			}
+			dataMap[name].cpuData = append(dataMap[name].cpuData, data.CPUUsageInCores)
+			dataMap[name].memUseData = append(dataMap[name].memUseData, data.MemoryUsageInBytes)
+			dataMap[name].memWorkSetData = append(dataMap[name].memWorkSetData, data.MemoryWorkingSetInBytes)
+		}
+	}
+	for _, v := range dataMap {
+		sort.Float64s(v.cpuData)
+		sort.Sort(int64arr(v.memUseData))
+		sort.Sort(int64arr(v.memWorkSetData))
+	}
+
+	result := make(map[int]resourceUsagePerContainer)
+	for _, perc := range percentilesToCompute {
+		data := make(resourceUsagePerContainer)
+		for k, v := range dataMap {
+			percentileIndex := int(math.Ceil(float64(len(v.cpuData)*perc)/100)) - 1
+			data[k] = &containerResourceUsage{
+				Name:                    k,
+				CPUUsageInCores:         v.cpuData[percentileIndex],
+				MemoryUsageInBytes:      v.memUseData[percentileIndex],
+				MemoryWorkingSetInBytes: v.memWorkSetData[percentileIndex],
+			}
+		}
+		result[perc] = data
+	}
+	return result
+}
+
+func (g *containerResourceGatherer) getKubeSystemContainersResourceUsage(c *client.Client) (resourceUsagePerContainer, error) {
+	pods, err := c.Pods("kube-system").List(api.ListOptions{})
+	if err != nil {
+		return resourceUsagePerContainer{}, err
+	}
+	nodes, err := c.Nodes().List(api.ListOptions{})
+	if err != nil {
+		return resourceUsagePerContainer{}, err
+	}
+	containerIDToNameMap := make(map[string]string)
+	containerIDs := make([]string, 0)
+	for _, pod := range pods.Items {
+		for _, container := range pod.Status.ContainerStatuses {
+			containerID := strings.TrimPrefix(container.ContainerID, "docker:/")
+			containerIDToNameMap[containerID] = pod.Name + "/" + container.Name
+			containerIDs = append(containerIDs, containerID)
+		}
+	}
+
+	mutex := sync.Mutex{}
+	wg := sync.WaitGroup{}
+	wg.Add(len(nodes.Items))
+	errors := make([]error, 0)
+	nameToUsageMap := make(resourceUsagePerContainer, len(containerIDToNameMap))
+	for _, node := range nodes.Items {
+		go func(nodeName string) {
+			defer wg.Done()
+			nodeUsage, err := getOneTimeResourceUsageOnNode(c, nodeName, 15*time.Second, func() []string { return containerIDs }, true)
+			mutex.Lock()
+			defer mutex.Unlock()
+			if err != nil {
+				errors = append(errors, err)
+				return
+			}
+			for k, v := range nodeUsage {
+				nameToUsageMap[containerIDToNameMap[k]] = v
+			}
+		}(node.Name)
+	}
+	wg.Wait()
+	if len(errors) != 0 {
+		return resourceUsagePerContainer{}, fmt.Errorf("Errors while gathering usage data: %v", errors)
+	}
+	return nameToUsageMap, nil
+}
diff --git a/test/e2e/util.go b/test/e2e/util.go
index 8ed47b77043..4b305ff496d 100644
--- a/test/e2e/util.go
+++ b/test/e2e/util.go
@@ -154,6 +154,8 @@ type TestContextType struct {
 	GatherKubeSystemResourceUsageData bool
 	GatherLogsSizes                   bool
 	GatherMetricsAfterTest            bool
+	// Currently supported values are 'hr' for human-readable and 'json'. It's a comma-separated list.
+	OutputPrintType string
 }
 
 var testContext TestContextType
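For reference, the gatherer's concurrency skeleton (ticker-driven sampling, shut down by closing a channel, then waiting on a WaitGroup) is worth seeing in isolation, since both startGatheringData and stopAndSummarize rely on it. A minimal sketch assuming nothing from the e2e framework — the `poller` type and its methods are invented for illustration, with the kube-system sampling replaced by a stub:

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// poller mirrors containerResourceGatherer's loop: sample on every tick,
// exit cleanly once the stop channel is closed.
type poller struct {
	stopCh chan struct{}
	timer  *time.Ticker
	wg     sync.WaitGroup
}

func (p *poller) start(period time.Duration, sample func()) {
	p.stopCh = make(chan struct{})
	p.timer = time.NewTicker(period)
	p.wg.Add(1)
	go func() {
		defer p.wg.Done()
		for {
			select {
			case <-p.timer.C:
				sample()
			case <-p.stopCh:
				return
			}
		}
	}()
}

// stop follows the same shutdown order as stopAndSummarize:
// close the channel, stop the ticker, wait for the goroutine.
func (p *poller) stop() {
	close(p.stopCh)
	p.timer.Stop()
	p.wg.Wait()
}

func main() {
	var p poller
	p.start(100*time.Millisecond, func() { fmt.Println("sample at", time.Now().Format(time.StampMilli)) })
	time.Sleep(350 * time.Millisecond)
	p.stop()
}
```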