Add a framework for multiple writers for various test data summaries

gmarek
2015-12-29 09:19:54 +01:00
parent fec5206f1a
commit 1df78a2398
7 changed files with 302 additions and 211 deletions

View File

@@ -236,6 +236,7 @@ oidc-username-claim
oom-score-adj
output-base
output-package
output-print-type
output-version
out-version
path-override

View File

@@ -91,6 +91,7 @@ func init() {
flag.BoolVar(&testContext.GatherKubeSystemResourceUsageData, "gather-resource-usage", false, "If set to true, framework will monitor resource usage of system add-ons in (some) e2e tests.")
flag.BoolVar(&testContext.GatherLogsSizes, "gather-logs-sizes", false, "If set to true, framework will monitor logs sizes on all machines running e2e tests.")
flag.BoolVar(&testContext.GatherMetricsAfterTest, "gather-metrics-at-teardown", false, "If set to true, framework will gather metrics from all components after each test.")
flag.StringVar(&testContext.OutputPrintType, "output-print-type", "hr", "Comma-separated list: 'hr' for human-readable summaries, 'json' for JSON ones.")
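// e.g. --output-print-type=hr,json (an illustrative invocation) prints each
// summary twice, once per format.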
}
func TestE2E(t *testing.T) {

View File

@@ -52,6 +52,11 @@ type Framework struct {
logsSizeVerifier *LogsSizeVerifier
}
type TestDataSummary interface {
PrintHumanReadable() string
PrintJSON() string
}
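// A minimal sketch (illustrative, not part of this commit) of a type that
// satisfies TestDataSummary. The name and fields are hypothetical; it assumes
// fmt is imported in this file.
type exampleSummary struct {
	label string
	value float64
}

func (s *exampleSummary) PrintHumanReadable() string {
	return fmt.Sprintf("%v: %.3f", s.label, s.value)
}

func (s *exampleSummary) PrintJSON() string {
	return fmt.Sprintf("{%q: %v}", s.label, s.value)
}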
// NewFramework makes a new framework and sets up a BeforeEach/AfterEach for
// you (you can write additional before/after each functions).
func NewFramework(baseName string) *Framework {
@@ -91,7 +96,7 @@ func (f *Framework) beforeEach() {
}
if testContext.GatherKubeSystemResourceUsageData {
f.gatherer.startGatheringData(c, time.Minute)
f.gatherer.startGatheringData(c, resourceDataGatheringPeriodSeconds*time.Second)
}
if testContext.GatherLogsSizes {
@@ -145,13 +150,31 @@ func (f *Framework) afterEach() {
Logf("Found DeleteNamespace=false, skipping namespace deletion!")
}
summaries := make([]TestDataSummary, 0)
if testContext.GatherKubeSystemResourceUsageData {
f.gatherer.stopAndPrintData([]int{50, 90, 99, 100}, f.addonResourceConstraints)
summaries = append(summaries, f.gatherer.stopAndSummarize([]int{50, 90, 99, 100}, f.addonResourceConstraints))
}
if testContext.GatherLogsSizes {
close(f.logsSizeCloseChannel)
f.logsSizeWaitGroup.Wait()
summaries = append(summaries, f.logsSizeVerifier.GetSummary())
}
outputTypes := strings.Split(testContext.OutputPrintType, ",")
for _, printType := range outputTypes {
switch printType {
case "hr":
for i := range summaries {
Logf(summaries[i].PrintHumanReadable())
}
case "json":
for i := range summaries {
Logf(summaries[i].PrintJSON())
}
default:
Logf("Unknown ouptut type: %v. Skipping.", printType)
}
}
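// A new data source only needs to implement TestDataSummary and be appended
// to summaries above; a new output format (e.g. a hypothetical "csv") would
// add a method to the interface and a matching case to this switch.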
if testContext.GatherMetricsAfterTest {

View File

@@ -21,7 +21,6 @@ import (
"encoding/json"
"fmt"
"io/ioutil"
"math"
"net/http"
"sort"
"strconv"
@@ -39,8 +38,6 @@ import (
"k8s.io/kubernetes/pkg/master/ports"
"k8s.io/kubernetes/pkg/util"
"k8s.io/kubernetes/pkg/util/sets"
. "github.com/onsi/gomega"
)
// KubeletMetric stores metrics scraped from the kubelet server's /metrics endpoint.
@@ -269,52 +266,6 @@ func getOneTimeResourceUsageOnNode(
return usageMap, nil
}
func getKubeSystemContainersResourceUsage(c *client.Client) (resourceUsagePerContainer, error) {
pods, err := c.Pods("kube-system").List(api.ListOptions{})
if err != nil {
return resourceUsagePerContainer{}, err
}
nodes, err := c.Nodes().List(api.ListOptions{})
if err != nil {
return resourceUsagePerContainer{}, err
}
containerIDToNameMap := make(map[string]string)
containerIDs := make([]string, 0)
for _, pod := range pods.Items {
for _, container := range pod.Status.ContainerStatuses {
containerID := strings.TrimPrefix(container.ContainerID, "docker:/")
containerIDToNameMap[containerID] = pod.Name + "/" + container.Name
containerIDs = append(containerIDs, containerID)
}
}
mutex := sync.Mutex{}
wg := sync.WaitGroup{}
wg.Add(len(nodes.Items))
errors := make([]error, 0)
nameToUsageMap := make(resourceUsagePerContainer, len(containerIDToNameMap))
for _, node := range nodes.Items {
go func(nodeName string) {
defer wg.Done()
nodeUsage, err := getOneTimeResourceUsageOnNode(c, nodeName, 15*time.Second, func() []string { return containerIDs }, true)
mutex.Lock()
defer mutex.Unlock()
if err != nil {
errors = append(errors, err)
return
}
for k, v := range nodeUsage {
nameToUsageMap[containerIDToNameMap[k]] = v
}
}(node.Name)
}
wg.Wait()
if len(errors) != 0 {
return resourceUsagePerContainer{}, fmt.Errorf("Errors while gathering usage data: %v", errors)
}
return nameToUsageMap, nil
}
// logOneTimeResourceUsageSummary collects container resource usage for the
// list of nodes, formats and logs the stats.
func logOneTimeResourceUsageSummary(c *client.Client, nodeNames []string, cpuInterval time.Duration) {
@@ -361,139 +312,6 @@ type usageDataPerContainer struct {
memWorkSetData []int64
}
func computePercentiles(timeSeries map[time.Time]resourceUsagePerContainer, percentilesToCompute []int) map[int]resourceUsagePerContainer {
if len(timeSeries) == 0 {
return make(map[int]resourceUsagePerContainer)
}
dataMap := make(map[string]*usageDataPerContainer)
for _, singleStatistic := range timeSeries {
for name, data := range singleStatistic {
if dataMap[name] == nil {
dataMap[name] = &usageDataPerContainer{
cpuData: make([]float64, len(timeSeries)),
memUseData: make([]int64, len(timeSeries)),
memWorkSetData: make([]int64, len(timeSeries)),
}
}
dataMap[name].cpuData = append(dataMap[name].cpuData, data.CPUUsageInCores)
dataMap[name].memUseData = append(dataMap[name].memUseData, data.MemoryUsageInBytes)
dataMap[name].memWorkSetData = append(dataMap[name].memWorkSetData, data.MemoryWorkingSetInBytes)
}
}
for _, v := range dataMap {
sort.Float64s(v.cpuData)
sort.Sort(int64arr(v.memUseData))
sort.Sort(int64arr(v.memWorkSetData))
}
result := make(map[int]resourceUsagePerContainer)
for _, perc := range percentilesToCompute {
data := make(resourceUsagePerContainer)
for k, v := range dataMap {
percentileIndex := int(math.Ceil(float64(len(v.cpuData)*perc)/100)) - 1
data[k] = &containerResourceUsage{
Name: k,
CPUUsageInCores: v.cpuData[percentileIndex],
MemoryUsageInBytes: v.memUseData[percentileIndex],
MemoryWorkingSetInBytes: v.memWorkSetData[percentileIndex],
}
}
result[perc] = data
}
return result
}
type resourceConstraint struct {
cpuConstraint float64
memoryConstraint int64
}
type containerResourceGatherer struct {
usageTimeseries map[time.Time]resourceUsagePerContainer
stopCh chan struct{}
timer *time.Ticker
wg sync.WaitGroup
}
func (g *containerResourceGatherer) startGatheringData(c *client.Client, period time.Duration) {
g.usageTimeseries = make(map[time.Time]resourceUsagePerContainer)
g.wg.Add(1)
g.stopCh = make(chan struct{})
g.timer = time.NewTicker(period)
go func() error {
for {
select {
case <-g.timer.C:
now := time.Now()
data, err := getKubeSystemContainersResourceUsage(c)
if err != nil {
return err
}
g.usageTimeseries[now] = data
case <-g.stopCh:
g.wg.Done()
return nil
}
}
}()
}
func (g *containerResourceGatherer) stopAndPrintData(percentiles []int, constraints map[string]resourceConstraint) {
close(g.stopCh)
g.timer.Stop()
g.wg.Wait()
if len(percentiles) == 0 {
Logf("Warning! Empty percentile list for stopAndPrintData.")
return
}
stats := computePercentiles(g.usageTimeseries, percentiles)
sortedKeys := []string{}
for name := range stats[percentiles[0]] {
sortedKeys = append(sortedKeys, name)
}
sort.Strings(sortedKeys)
violatedConstraints := make([]string, 0)
for _, perc := range percentiles {
buf := &bytes.Buffer{}
w := tabwriter.NewWriter(buf, 1, 0, 1, ' ', 0)
fmt.Fprintf(w, "container\tcpu(cores)\tmemory(MB)\n")
for _, name := range sortedKeys {
usage := stats[perc][name]
fmt.Fprintf(w, "%q\t%.3f\t%.2f\n", name, usage.CPUUsageInCores, float64(usage.MemoryWorkingSetInBytes)/(1024*1024))
// Verifying 99th percentile of resource usage
if perc == 99 {
// Name has a form: <pod_name>/<container_name>
containerName := strings.Split(name, "/")[1]
if constraint, ok := constraints[containerName]; ok {
if usage.CPUUsageInCores > constraint.cpuConstraint {
violatedConstraints = append(
violatedConstraints,
fmt.Sprintf("Container %v is using %v/%v CPU",
name,
usage.CPUUsageInCores,
constraint.cpuConstraint,
),
)
}
if usage.MemoryWorkingSetInBytes > constraint.memoryConstraint {
violatedConstraints = append(
violatedConstraints,
fmt.Sprintf("Container %v is using %v/%v MB of memory",
name,
float64(usage.MemoryWorkingSetInBytes)/(1024*1024),
float64(constraint.memoryConstraint)/(1024*1024),
),
)
}
}
}
}
w.Flush()
Logf("%v percentile:\n%v", perc, buf.String())
}
Expect(violatedConstraints).To(BeEmpty())
}
// Performs a get on a node proxy endpoint given the nodename and rest client.
func nodeProxyRequest(c *client.Client, node, endpoint string) client.Result {
return c.Get().

View File

@@ -75,8 +75,35 @@ type LogsSizeVerifier struct {
workers []*LogSizeGatherer
}
// node -> file -> data
type LogsSizeDataSummary map[string]map[string][]TimestampedSize
// TODO: make sure that we don't need locking here
func (s *LogsSizeDataSummary) PrintHumanReadable() string {
buf := &bytes.Buffer{}
w := tabwriter.NewWriter(buf, 1, 0, 1, ' ', 0)
fmt.Fprintf(w, "host\tlog_file\taverage_rate (B/s)\tnumber_of_probes\n")
for k, v := range *s {
fmt.Fprintf(w, "%v\t\t\t\n", k)
for path, data := range v {
if len(data) > 1 {
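// Average growth rate in B/s: bytes added between the first and the last
// probe, divided by the whole seconds elapsed between them.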
last := data[len(data)-1]
first := data[0]
rate := (last.size - first.size) / int(last.timestamp.Sub(first.timestamp)/time.Second)
fmt.Fprintf(w, "\t%v\t%v\t%v\n", path, rate, len(data))
}
}
}
w.Flush()
return buf.String()
}
func (s *LogsSizeDataSummary) PrintJSON() string {
return "JSON printer not implemented for LogsSizeDataSummary"
}
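// Note: a json.Marshal-based implementation would need to copy into exported
// fields first, since TimestampedSize's size and timestamp are unexported.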
type LogsSizeData struct {
data map[string]map[string][]TimestampedSize
data LogsSizeDataSummary
lock sync.Mutex
}
@@ -88,7 +115,7 @@ type WorkItem struct {
}
func prepareData(masterAddress string, nodeAddresses []string) LogsSizeData {
data := make(map[string]map[string][]TimestampedSize)
data := make(LogsSizeDataSummary)
ips := append(nodeAddresses, masterAddress)
for _, ip := range ips {
data[ip] = make(map[string][]TimestampedSize)
@@ -111,27 +138,6 @@ func (d *LogsSizeData) AddNewData(ip, path string, timestamp time.Time, size int
)
}
func (d *LogsSizeData) PrintData() string {
d.lock.Lock()
defer d.lock.Unlock()
buf := &bytes.Buffer{}
w := tabwriter.NewWriter(buf, 1, 0, 1, ' ', 0)
fmt.Fprintf(w, "host\tlog_file\taverage_rate (B/s)\tnumber_of_probes\n")
for k, v := range d.data {
fmt.Fprintf(w, "%v\t\t\t\n", k)
for path, data := range v {
if len(data) > 1 {
last := data[len(data)-1]
first := data[0]
rate := (last.size - first.size) / int(last.timestamp.Sub(first.timestamp)/time.Second)
fmt.Fprintf(w, "\t%v\t%v\t%v\n", path, rate, len(data))
}
}
}
w.Flush()
return buf.String()
}
// NewLogsVerifier creates a new LogsSizeVerifier which will stop when stopChannel is closed
func NewLogsVerifier(c *client.Client, stopChannel chan bool) *LogsSizeVerifier {
nodeAddresses, err := NodeSSHHosts(c)
@@ -164,8 +170,8 @@ func NewLogsVerifier(c *client.Client, stopChannel chan bool) *LogsSizeVerifier
}
// PrintData returns a string with formatted results
func (v *LogsSizeVerifier) PrintData() string {
return v.data.PrintData()
func (v *LogsSizeVerifier) GetSummary() *LogsSizeDataSummary {
return &v.data.data
}
// Run starts log size gathering. It starts a goroutine for every worker and then blocks until stopChannel is closed
@@ -185,8 +191,6 @@ func (v *LogsSizeVerifier) Run() {
}
<-v.stopChannel
v.wg.Wait()
Logf("\n%v", v.PrintData())
}
func (g *LogSizeGatherer) Run() {

View File

@@ -0,0 +1,242 @@
/*
Copyright 2015 The Kubernetes Authors All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package e2e
import (
"bytes"
"fmt"
"math"
"sort"
"strings"
"sync"
"text/tabwriter"
"time"
. "github.com/onsi/gomega"
"k8s.io/kubernetes/pkg/api"
client "k8s.io/kubernetes/pkg/client/unversioned"
)
const (
resourceDataGatheringPeriodSeconds = 60
)
type resourceConstraint struct {
cpuConstraint float64
memoryConstraint int64
}
type containerResourceGatherer struct {
usageTimeseries map[time.Time]resourceUsagePerContainer
stopCh chan struct{}
timer *time.Ticker
wg sync.WaitGroup
}
type singleContainerSummary struct {
name string
cpu float64
mem int64
}
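// ResourceUsageSummary maps a percentile to the per-container usage observed
// at that percentile.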
type ResourceUsageSummary map[int][]singleContainerSummary
func (s *ResourceUsageSummary) PrintHumanReadable() string {
buf := &bytes.Buffer{}
w := tabwriter.NewWriter(buf, 1, 0, 1, ' ', 0)
for perc, summaries := range *s {
buf.WriteString(fmt.Sprintf("%v percentile:\n", perc))
fmt.Fprintf(w, "container\tcpu(cores)\tmemory(MB)\n")
for _, summary := range summaries {
fmt.Fprintf(w, "%q\t%.3f\t%.2f\n", summary.name, summary.cpu, float64(summary.mem)/(1024*1024))
}
w.Flush()
}
return buf.String()
}
func (s *ResourceUsageSummary) PrintJSON() string {
return "JSON printer not implemented for ResourceUsageSummary"
}
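// A sketch (illustrative, not part of this commit) of what the JSON printer
// could look like. json.Marshal skips unexported fields, so the data is
// copied into a hypothetical exported shape first; assumes encoding/json is
// imported.
func (s *ResourceUsageSummary) printJSONSketch() string {
	type containerSummaryJSON struct {
		Name string  `json:"name"`
		CPU  float64 `json:"cpu"`
		Mem  int64   `json:"mem"`
	}
	out := make(map[string][]containerSummaryJSON)
	for perc, summaries := range *s {
		key := fmt.Sprintf("%d", perc)
		for _, c := range summaries {
			out[key] = append(out[key], containerSummaryJSON{Name: c.name, CPU: c.cpu, Mem: c.mem})
		}
	}
	b, err := json.Marshal(out)
	if err != nil {
		return fmt.Sprintf("JSON marshaling failed: %v", err)
	}
	return string(b)
}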
func (g *containerResourceGatherer) startGatheringData(c *client.Client, period time.Duration) {
g.usageTimeseries = make(map[time.Time]resourceUsagePerContainer)
g.wg.Add(1)
g.stopCh = make(chan struct{})
g.timer = time.NewTicker(period)
go func() error {
// Deferring wg.Done ensures wg.Wait in stopAndSummarize returns even if
// a scrape error makes this goroutine exit early.
defer g.wg.Done()
for {
select {
case <-g.timer.C:
now := time.Now()
data, err := g.getKubeSystemContainersResourceUsage(c)
if err != nil {
return err
}
g.usageTimeseries[now] = data
case <-g.stopCh:
return nil
}
}
}()
}
func (g *containerResourceGatherer) stopAndSummarize(percentiles []int, constraints map[string]resourceConstraint) *ResourceUsageSummary {
close(g.stopCh)
g.timer.Stop()
g.wg.Wait()
if len(percentiles) == 0 {
Logf("Warning! Empty percentile list for stopAndPrintData.")
return &ResourceUsageSummary{}
}
stats := g.computePercentiles(g.usageTimeseries, percentiles)
sortedKeys := []string{}
for name := range stats[percentiles[0]] {
sortedKeys = append(sortedKeys, name)
}
sort.Strings(sortedKeys)
violatedConstraints := make([]string, 0)
summary := make(ResourceUsageSummary)
for _, perc := range percentiles {
for _, name := range sortedKeys {
usage := stats[perc][name]
summary[perc] = append(summary[perc], singleContainerSummary{
name: name,
cpu: usage.CPUUsageInCores,
mem: usage.MemoryWorkingSetInBytes,
})
// Verifying 99th percentile of resource usage
if perc == 99 {
// Name has a form: <pod_name>/<container_name>
containerName := strings.Split(name, "/")[1]
if constraint, ok := constraints[containerName]; ok {
if usage.CPUUsageInCores > constraint.cpuConstraint {
violatedConstraints = append(
violatedConstraints,
fmt.Sprintf("Container %v is using %v/%v CPU",
name,
usage.CPUUsageInCores,
constraint.cpuConstraint,
),
)
}
if usage.MemoryWorkingSetInBytes > constraint.memoryConstraint {
violatedConstraints = append(
violatedConstraints,
fmt.Sprintf("Container %v is using %v/%v MB of memory",
name,
float64(usage.MemoryWorkingSetInBytes)/(1024*1024),
float64(constraint.memoryConstraint)/(1024*1024),
),
)
}
}
}
}
}
Expect(violatedConstraints).To(BeEmpty())
return &summary
}
func (g *containerResourceGatherer) computePercentiles(timeSeries map[time.Time]resourceUsagePerContainer, percentilesToCompute []int) map[int]resourceUsagePerContainer {
if len(timeSeries) == 0 {
return make(map[int]resourceUsagePerContainer)
}
dataMap := make(map[string]*usageDataPerContainer)
for _, singleStatistic := range timeSeries {
for name, data := range singleStatistic {
if dataMap[name] == nil {
dataMap[name] = &usageDataPerContainer{
// Zero length, capacity len(timeSeries): append must not leave zero-valued
// padding at the front of each series.
cpuData: make([]float64, 0, len(timeSeries)),
memUseData: make([]int64, 0, len(timeSeries)),
memWorkSetData: make([]int64, 0, len(timeSeries)),
}
}
dataMap[name].cpuData = append(dataMap[name].cpuData, data.CPUUsageInCores)
dataMap[name].memUseData = append(dataMap[name].memUseData, data.MemoryUsageInBytes)
dataMap[name].memWorkSetData = append(dataMap[name].memWorkSetData, data.MemoryWorkingSetInBytes)
}
}
for _, v := range dataMap {
sort.Float64s(v.cpuData)
sort.Sort(int64arr(v.memUseData))
sort.Sort(int64arr(v.memWorkSetData))
}
result := make(map[int]resourceUsagePerContainer)
for _, perc := range percentilesToCompute {
data := make(resourceUsagePerContainer)
for k, v := range dataMap {
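// Nearest-rank percentile: index = ceil(n*perc/100)-1 for n samples, e.g.
// n=10, perc=90 gives index 8, i.e. the 9th smallest sample.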
percentileIndex := int(math.Ceil(float64(len(v.cpuData)*perc)/100)) - 1
data[k] = &containerResourceUsage{
Name: k,
CPUUsageInCores: v.cpuData[percentileIndex],
MemoryUsageInBytes: v.memUseData[percentileIndex],
MemoryWorkingSetInBytes: v.memWorkSetData[percentileIndex],
}
}
result[perc] = data
}
return result
}
func (g *containerResourceGatherer) getKubeSystemContainersResourceUsage(c *client.Client) (resourceUsagePerContainer, error) {
pods, err := c.Pods("kube-system").List(api.ListOptions{})
if err != nil {
return resourceUsagePerContainer{}, err
}
nodes, err := c.Nodes().List(api.ListOptions{})
if err != nil {
return resourceUsagePerContainer{}, err
}
containerIDToNameMap := make(map[string]string)
containerIDs := make([]string, 0)
for _, pod := range pods.Items {
for _, container := range pod.Status.ContainerStatuses {
containerID := strings.TrimPrefix(container.ContainerID, "docker:/")
containerIDToNameMap[containerID] = pod.Name + "/" + container.Name
containerIDs = append(containerIDs, containerID)
}
}
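// Fan out one goroutine per node; the mutex guards the shared error slice
// and the aggregated name->usage map.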
mutex := sync.Mutex{}
wg := sync.WaitGroup{}
wg.Add(len(nodes.Items))
errors := make([]error, 0)
nameToUsageMap := make(resourceUsagePerContainer, len(containerIDToNameMap))
for _, node := range nodes.Items {
go func(nodeName string) {
defer wg.Done()
nodeUsage, err := getOneTimeResourceUsageOnNode(c, nodeName, 15*time.Second, func() []string { return containerIDs }, true)
mutex.Lock()
defer mutex.Unlock()
if err != nil {
errors = append(errors, err)
return
}
for k, v := range nodeUsage {
nameToUsageMap[containerIDToNameMap[k]] = v
}
}(node.Name)
}
wg.Wait()
if len(errors) != 0 {
return resourceUsagePerContainer{}, fmt.Errorf("Errors while gathering usage data: %v", errors)
}
return nameToUsageMap, nil
}

View File

@@ -154,6 +154,8 @@ type TestContextType struct {
GatherKubeSystemResourceUsageData bool
GatherLogsSizes bool
GatherMetricsAfterTest bool
// Comma-separated list of output printers; currently supported values are 'hr' for human-readable and 'json'.
OutputPrintType string
}
var testContext TestContextType