Merge pull request #24291 from gmarek/gatherer

Automatic merge from submit-queue

Make resource gatherer work for Kubemark
This commit is contained in:
k8s-merge-robot 2016-04-23 02:53:58 -07:00
commit e61e396de7
2 changed files with 86 additions and 37 deletions

View File

@ -144,7 +144,7 @@ func (f *Framework) BeforeEach() {
}
if TestContext.GatherKubeSystemResourceUsageData {
f.gatherer, err = NewResourceUsageGatherer(c)
f.gatherer, err = NewResourceUsageGatherer(c, ResourceGathererOptions{inKubemark: ProviderIs("kubemark")})
if err != nil {
Logf("Error while creating NewResourceUsageGatherer: %v", err)
} else {

View File

@ -17,6 +17,7 @@ limitations under the License.
package framework
import (
"bufio"
"bytes"
"fmt"
"math"
@ -135,17 +136,26 @@ type resourceGatherWorker struct {
stopCh chan struct{}
dataSeries []ResourceUsagePerContainer
finished bool
inKubemark bool
}
func (w *resourceGatherWorker) singleProbe() {
data := make(ResourceUsagePerContainer)
nodeUsage, err := getOneTimeResourceUsageOnNode(w.c, w.nodeName, probeDuration, func() []string { return w.containerIDs }, true)
if err != nil {
Logf("Error while reading data from %v: %v", w.nodeName, err)
return
}
for k, v := range nodeUsage {
data[w.containerIDToNameMap[k]] = v
var data ResourceUsagePerContainer
if w.inKubemark {
data = getKubemarkMasterComponentsResourceUsage()
if data == nil {
return
}
} else {
data = make(ResourceUsagePerContainer)
nodeUsage, err := getOneTimeResourceUsageOnNode(w.c, w.nodeName, probeDuration, func() []string { return w.containerIDs }, true)
if err != nil {
Logf("Error while reading data from %v: %v", w.nodeName, err)
return
}
for k, v := range nodeUsage {
data[w.containerIDToNameMap[k]] = v
}
}
w.dataSeries = append(w.dataSeries, data)
}
@ -171,6 +181,28 @@ func (w *resourceGatherWorker) gather(initialSleep time.Duration) {
}
}
func getKubemarkMasterComponentsResourceUsage() ResourceUsagePerContainer {
result := make(ResourceUsagePerContainer)
sshResult, err := SSH("ps ax -o %cpu,rss,command | tail -n +2 | grep kube", GetMasterHost()+":22", TestContext.Provider)
if err != nil {
Logf("Error when trying to SSH to master machine. Skipping probe")
return nil
}
scanner := bufio.NewScanner(strings.NewReader(sshResult.Stdout))
for scanner.Scan() {
var cpu float64
var mem uint64
var name string
fmt.Sscanf(strings.TrimSpace(scanner.Text()), "%f %d kubernetes/server/bin/%s", &cpu, &mem, &name)
if name != "" {
// Gatherer expects pod_name/container_name format
fullName := name + "/" + name
result[fullName] = &ContainerResourceUsage{Name: fullName, MemoryWorkingSetInBytes: mem, CPUUsageInCores: cpu}
}
}
return result
}
func (g *containerResourceGatherer) getKubeSystemContainersResourceUsage(c *client.Client) {
delayPeriod := resourceDataGatheringPeriod / time.Duration(len(g.workers))
delay := time.Duration(0)
@ -188,45 +220,62 @@ type containerResourceGatherer struct {
workerWg sync.WaitGroup
containerIDToNameMap map[string]string
containerIDs []string
options ResourceGathererOptions
}
func NewResourceUsageGatherer(c *client.Client) (*containerResourceGatherer, error) {
type ResourceGathererOptions struct {
inKubemark bool
}
func NewResourceUsageGatherer(c *client.Client, options ResourceGathererOptions) (*containerResourceGatherer, error) {
g := containerResourceGatherer{
client: c,
stopCh: make(chan struct{}),
containerIDToNameMap: make(map[string]string),
containerIDs: make([]string, 0),
options: options,
}
pods, err := c.Pods("kube-system").List(api.ListOptions{})
if err != nil {
Logf("Error while listing Pods: %v", err)
return nil, err
}
for _, pod := range pods.Items {
for _, container := range pod.Status.ContainerStatuses {
containerID := strings.TrimPrefix(container.ContainerID, "docker:/")
g.containerIDToNameMap[containerID] = pod.Name + "/" + container.Name
g.containerIDs = append(g.containerIDs, containerID)
}
}
nodeList, err := c.Nodes().List(api.ListOptions{})
if err != nil {
Logf("Error while listing Nodes: %v", err)
return nil, err
}
g.workerWg.Add(len(nodeList.Items))
for _, node := range nodeList.Items {
if options.inKubemark {
g.workerWg.Add(1)
g.workers = append(g.workers, resourceGatherWorker{
c: c,
nodeName: node.Name,
wg: &g.workerWg,
containerIDToNameMap: g.containerIDToNameMap,
containerIDs: g.containerIDs,
stopCh: g.stopCh,
finished: false,
inKubemark: true,
stopCh: g.stopCh,
wg: &g.workerWg,
finished: false,
})
} else {
pods, err := c.Pods("kube-system").List(api.ListOptions{})
if err != nil {
Logf("Error while listing Pods: %v", err)
return nil, err
}
for _, pod := range pods.Items {
for _, container := range pod.Status.ContainerStatuses {
containerID := strings.TrimPrefix(container.ContainerID, "docker:/")
g.containerIDToNameMap[containerID] = pod.Name + "/" + container.Name
g.containerIDs = append(g.containerIDs, containerID)
}
}
nodeList, err := c.Nodes().List(api.ListOptions{})
if err != nil {
Logf("Error while listing Nodes: %v", err)
return nil, err
}
g.workerWg.Add(len(nodeList.Items))
for _, node := range nodeList.Items {
g.workers = append(g.workers, resourceGatherWorker{
c: c,
nodeName: node.Name,
wg: &g.workerWg,
containerIDToNameMap: g.containerIDToNameMap,
containerIDs: g.containerIDs,
stopCh: g.stopCh,
finished: false,
inKubemark: false,
})
}
}
return &g, nil
}