From 0071a8627c9fedb2fd51ceffc12aa4e780543b52 Mon Sep 17 00:00:00 2001
From: gmarek
Date: Fri, 15 Apr 2016 01:00:19 +0200
Subject: [PATCH] Make resource gatherer work for Kubemark

---
 test/e2e/framework/framework.go               |   2 +-
 test/e2e/framework/resource_usage_gatherer.go | 121 ++++++++++++------
 2 files changed, 86 insertions(+), 37 deletions(-)

diff --git a/test/e2e/framework/framework.go b/test/e2e/framework/framework.go
index 97794eedec2..ddacb38d111 100644
--- a/test/e2e/framework/framework.go
+++ b/test/e2e/framework/framework.go
@@ -142,7 +142,7 @@ func (f *Framework) BeforeEach() {
 	}
 
 	if TestContext.GatherKubeSystemResourceUsageData {
-		f.gatherer, err = NewResourceUsageGatherer(c)
+		f.gatherer, err = NewResourceUsageGatherer(c, ResourceGathererOptions{inKubemark: ProviderIs("kubemark")})
 		if err != nil {
 			Logf("Error while creating NewResourceUsageGatherer: %v", err)
 		} else {
diff --git a/test/e2e/framework/resource_usage_gatherer.go b/test/e2e/framework/resource_usage_gatherer.go
index d8365c0458e..067becbeffc 100644
--- a/test/e2e/framework/resource_usage_gatherer.go
+++ b/test/e2e/framework/resource_usage_gatherer.go
@@ -17,6 +17,7 @@ limitations under the License.
 package framework
 
 import (
+	"bufio"
 	"bytes"
 	"fmt"
 	"math"
@@ -135,17 +136,26 @@ type resourceGatherWorker struct {
 	stopCh               chan struct{}
 	dataSeries           []ResourceUsagePerContainer
 	finished             bool
+	inKubemark           bool
 }
 
 func (w *resourceGatherWorker) singleProbe() {
-	data := make(ResourceUsagePerContainer)
-	nodeUsage, err := getOneTimeResourceUsageOnNode(w.c, w.nodeName, probeDuration, func() []string { return w.containerIDs }, true)
-	if err != nil {
-		Logf("Error while reading data from %v: %v", w.nodeName, err)
-		return
-	}
-	for k, v := range nodeUsage {
-		data[w.containerIDToNameMap[k]] = v
+	var data ResourceUsagePerContainer
+	if w.inKubemark {
+		data = getKubemarkMasterComponentsResourceUsage()
+		if data == nil {
+			return
+		}
+	} else {
+		data = make(ResourceUsagePerContainer)
+		nodeUsage, err := getOneTimeResourceUsageOnNode(w.c, w.nodeName, probeDuration, func() []string { return w.containerIDs }, true)
+		if err != nil {
+			Logf("Error while reading data from %v: %v", w.nodeName, err)
+			return
+		}
+		for k, v := range nodeUsage {
+			data[w.containerIDToNameMap[k]] = v
+		}
 	}
 	w.dataSeries = append(w.dataSeries, data)
 }
@@ -171,6 +181,28 @@ func (w *resourceGatherWorker) gather(initialSleep time.Duration) {
 	}
 }
 
+func getKubemarkMasterComponentsResourceUsage() ResourceUsagePerContainer {
+	result := make(ResourceUsagePerContainer)
+	sshResult, err := SSH("ps ax -o %cpu,rss,command | tail -n +2 | grep kube", GetMasterHost()+":22", TestContext.Provider)
+	if err != nil {
+		Logf("Error when trying to SSH to master machine. Skipping probe. %v", err)
+		return nil
+	}
+	scanner := bufio.NewScanner(strings.NewReader(sshResult.Stdout))
+	for scanner.Scan() {
+		var cpu float64
+		var mem uint64
+		var name string
+		fmt.Sscanf(strings.TrimSpace(scanner.Text()), "%f %d kubernetes/server/bin/%s", &cpu, &mem, &name)
+		if name != "" {
+			// Gatherer expects pod_name/container_name format; ps reports rss in KiB and %cpu in percent.
+			fullName := name + "/" + name
+			result[fullName] = &ContainerResourceUsage{Name: fullName, MemoryWorkingSetInBytes: mem * 1024, CPUUsageInCores: cpu / 100}
+		}
+	}
+	return result
+}
+
 func (g *containerResourceGatherer) getKubeSystemContainersResourceUsage(c *client.Client) {
 	delayPeriod := resourceDataGatheringPeriod / time.Duration(len(g.workers))
 	delay := time.Duration(0)
@@ -188,45 +220,62 @@ type containerResourceGatherer struct {
 	workerWg             sync.WaitGroup
 	containerIDToNameMap map[string]string
 	containerIDs         []string
+	options              ResourceGathererOptions
 }
 
-func NewResourceUsageGatherer(c *client.Client) (*containerResourceGatherer, error) {
+type ResourceGathererOptions struct {
+	inKubemark bool
+}
+
+func NewResourceUsageGatherer(c *client.Client, options ResourceGathererOptions) (*containerResourceGatherer, error) {
 	g := containerResourceGatherer{
 		client:               c,
 		stopCh:               make(chan struct{}),
 		containerIDToNameMap: make(map[string]string),
 		containerIDs:         make([]string, 0),
+		options:              options,
 	}
 
-	pods, err := c.Pods("kube-system").List(api.ListOptions{})
-	if err != nil {
-		Logf("Error while listing Pods: %v", err)
-		return nil, err
-	}
-	for _, pod := range pods.Items {
-		for _, container := range pod.Status.ContainerStatuses {
-			containerID := strings.TrimPrefix(container.ContainerID, "docker:/")
-			g.containerIDToNameMap[containerID] = pod.Name + "/" + container.Name
-			g.containerIDs = append(g.containerIDs, containerID)
-		}
-	}
-	nodeList, err := c.Nodes().List(api.ListOptions{})
-	if err != nil {
-		Logf("Error while listing Nodes: %v", err)
-		return nil, err
-	}
-
-	g.workerWg.Add(len(nodeList.Items))
-	for _, node := range nodeList.Items {
+	if options.inKubemark {
+		g.workerWg.Add(1)
 		g.workers = append(g.workers, resourceGatherWorker{
-			c:                    c,
-			nodeName:             node.Name,
-			wg:                   &g.workerWg,
-			containerIDToNameMap: g.containerIDToNameMap,
-			containerIDs:         g.containerIDs,
-			stopCh:               g.stopCh,
-			finished:             false,
+			inKubemark: true,
+			stopCh:     g.stopCh,
+			wg:         &g.workerWg,
+			finished:   false,
 		})
+	} else {
+		pods, err := c.Pods("kube-system").List(api.ListOptions{})
+		if err != nil {
+			Logf("Error while listing Pods: %v", err)
+			return nil, err
+		}
+		for _, pod := range pods.Items {
+			for _, container := range pod.Status.ContainerStatuses {
+				containerID := strings.TrimPrefix(container.ContainerID, "docker:/")
+				g.containerIDToNameMap[containerID] = pod.Name + "/" + container.Name
+				g.containerIDs = append(g.containerIDs, containerID)
+			}
+		}
+		nodeList, err := c.Nodes().List(api.ListOptions{})
+		if err != nil {
+			Logf("Error while listing Nodes: %v", err)
+			return nil, err
+		}
+
+		g.workerWg.Add(len(nodeList.Items))
+		for _, node := range nodeList.Items {
+			g.workers = append(g.workers, resourceGatherWorker{
+				c:                    c,
+				nodeName:             node.Name,
+				wg:                   &g.workerWg,
+				containerIDToNameMap: g.containerIDToNameMap,
+				containerIDs:         g.containerIDs,
+				stopCh:               g.stopCh,
+				finished:             false,
+				inKubemark:           false,
+			})
+		}
	}
 	return &g, nil
 }
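
For reference, a minimal standalone sketch of the ps-output parsing technique that
getKubemarkMasterComponentsResourceUsage introduces above. The sample ps output is
invented for illustration, and the KiB-to-bytes / percent-to-cores conversions assume
the standard ps(1) semantics for the rss and %cpu columns; everything else mirrors
the bufio.Scanner + fmt.Sscanf approach in the patch.

package main

import (
	"bufio"
	"fmt"
	"strings"
)

// Invented sample of `ps ax -o %cpu,rss,command | tail -n +2 | grep kube`
// output on a Kubemark master; real values will differ.
const psOutput = ` 12.5 204800 kubernetes/server/bin/kube-apiserver --etcd-servers=http://127.0.0.1:2379
  3.2  51200 kubernetes/server/bin/kube-controller-manager --master=127.0.0.1:8080
  0.9  25600 kubernetes/server/bin/kube-scheduler --master=127.0.0.1:8080
  0.0   1024 grep kube`

func main() {
	scanner := bufio.NewScanner(strings.NewReader(psOutput))
	for scanner.Scan() {
		var cpu float64 // %cpu column: percentage of a single core
		var mem uint64  // rss column: resident set size in KiB
		var name string // %s stops at the first space, so command-line flags are dropped
		fmt.Sscanf(strings.TrimSpace(scanner.Text()), "%f %d kubernetes/server/bin/%s", &cpu, &mem, &name)
		if name == "" {
			// Line didn't match the expected binary path (e.g. the grep process itself).
			continue
		}
		// The gatherer keys results as pod_name/container_name.
		fmt.Printf("%s/%s: cpu=%.3f cores, workingSet=%d bytes\n", name, name, cpu/100, mem*1024)
	}
}

Running this prints one line per matched component, e.g.
"kube-apiserver/kube-apiserver: cpu=0.125 cores, workingSet=209715200 bytes"; the
grep line is skipped because the Sscanf format string requires the
kubernetes/server/bin/ prefix, which is also what filters it out in the patch.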