e2e: add resourceMonitor to poll resource usage on the nodes

This change adds resourceMonitor, which spawns a goroutine per node to poll the
container stats of known, relevant containers, compute their resource usage, and
store the results. Users can then examine the buffered data to get the resource
usage of each individual container.
Yu-Ju Hong 2015-07-13 12:09:57 -07:00
parent bfd22a6974
commit 12a252bd8b
2 changed files with 255 additions and 60 deletions
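
In a nutshell, the new API is used by the updated kubelet e2e test below roughly as follows (a condensed sketch assembled from this diff, not additional code in the commit; framework and targetContainers come from the existing e2e package):

    monitor := newResourceMonitor(framework.Client, targetContainers, containerStatsPollingInterval)
    monitor.Start()         // spawns one polling goroutine per node
    defer monitor.Stop()
    time.Sleep(resourceCollectionTime)
    monitor.LogLatest()     // latest usage of each tracked container, per node
    monitor.LogCPUSummary() // 5th/50th/90th/95th percentile CPU usage per container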

View File

@@ -34,8 +34,9 @@ import (
const (
    // Interval to poll /runningpods on a node
    pollInterval = 1 * time.Second
-   // Interval used compute cpu usage of a container
-   cpuIntervalInSeconds = 60
+   // Interval to poll /stats/container on a node
+   containerStatsPollingInterval = 5 * time.Second
+   resourceCollectionTime        = 1 * time.Minute
)

// getPodMatches returns a set of pod names on the given node that matches the
@@ -87,10 +88,11 @@ func waitTillNPodsRunningOnNodes(c *client.Client, nodeNames util.StringSet, pod
    })
}

-var _ = Describe("Clean up pods on node", func() {
+var _ = Describe("kubelet", func() {
    var numNodes int
    var nodeNames util.StringSet
-   framework := NewFramework("kubelet-delete")
+   framework := NewFramework("kubelet")
+   var resourceMonitor *resourceMonitor

    BeforeEach(func() {
        nodes, err := framework.Client.Nodes().List(labels.Everything(), fields.Everything())
@@ -100,56 +102,73 @@ var _ = Describe("Clean up pods on node", func() {
        for _, node := range nodes.Items {
            nodeNames.Insert(node.Name)
        }
-       logOneTimeResourceUsageSummary(framework.Client, nodeNames.List(), cpuIntervalInSeconds)
+       resourceMonitor = newResourceMonitor(framework.Client, targetContainers, containerStatsPollingInterval)
+       resourceMonitor.Start()
    })

+   AfterEach(func() {
+       resourceMonitor.Stop()
+   })
+
+   Describe("Clean up pods on node", func() {
        type DeleteTest struct {
            podsPerNode int
            timeout     time.Duration
        }
        deleteTests := []DeleteTest{
            {podsPerNode: 10, timeout: 1 * time.Minute},
        }
        for _, itArg := range deleteTests {
            name := fmt.Sprintf(
                "kubelet should be able to delete %d pods per node in %v.", itArg.podsPerNode, itArg.timeout)
            It(name, func() {
                totalPods := itArg.podsPerNode * numNodes

                By(fmt.Sprintf("Creating a RC of %d pods and wait until all pods of this RC are running", totalPods))
                rcName := fmt.Sprintf("cleanup%d-%s", totalPods, string(util.NewUUID()))

                Expect(RunRC(RCConfig{
                    Client:    framework.Client,
                    Name:      rcName,
                    Namespace: framework.Namespace.Name,
                    Image:     "gcr.io/google_containers/pause:go",
                    Replicas:  totalPods,
                })).NotTo(HaveOccurred())
                // Perform a sanity check so that we know all desired pods are
                // running on the nodes according to kubelet. The timeout is set to
                // only 30 seconds here because RunRC already waited for all pods to
                // transition to the running status.
                Expect(waitTillNPodsRunningOnNodes(framework.Client, nodeNames, rcName, framework.Namespace.Name, totalPods,
                    time.Second*30)).NotTo(HaveOccurred())
-               logOneTimeResourceUsageSummary(framework.Client, nodeNames.List(), cpuIntervalInSeconds)
+               resourceMonitor.LogLatest()

                By("Deleting the RC")
                DeleteRC(framework.Client, framework.Namespace.Name, rcName)
                // Check that the pods really are gone by querying /runningpods on the
                // node. The /runningpods handler checks the container runtime (or its
                // cache) and returns a list of running pods. Some possible causes of
                // failures are:
                //   - kubelet deadlock
                //   - a bug in graceful termination (if it is enabled)
                //   - docker slow to delete pods (or resource problems causing slowness)
                start := time.Now()
                Expect(waitTillNPodsRunningOnNodes(framework.Client, nodeNames, rcName, framework.Namespace.Name, 0,
                    itArg.timeout)).NotTo(HaveOccurred())
                Logf("Deleting %d pods on %d nodes completed in %v after the RC was deleted", totalPods, len(nodeNames),
                    time.Since(start))
+               resourceMonitor.LogCPUSummary()
            })
        }
+   })
+
+   Describe("Monitor resource usage on node", func() {
+       It("Ask kubelet to report container resource usage", func() {
+           // TODO: After gathering some numbers, we should set a resource
+           // limit for each container and fail the test if the usage exceeds
+           // the preset limit.
+           By(fmt.Sprintf("Waiting %v to collect resource usage on node", resourceCollectionTime))
+           time.Sleep(resourceCollectionTime)
+           resourceMonitor.LogLatest()
+           resourceMonitor.LogCPUSummary()
+       })
+   })
})

View File

@@ -30,9 +30,12 @@ import (
    "github.com/GoogleCloudPlatform/kubernetes/pkg/api"
    "github.com/GoogleCloudPlatform/kubernetes/pkg/client"
+   "github.com/GoogleCloudPlatform/kubernetes/pkg/fields"
    "github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet"
    "github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/metrics"
+   "github.com/GoogleCloudPlatform/kubernetes/pkg/labels"
    "github.com/GoogleCloudPlatform/kubernetes/pkg/master/ports"
+   "github.com/GoogleCloudPlatform/kubernetes/pkg/util"
    cadvisor "github.com/google/cadvisor/info/v1"
)
@@ -234,9 +237,6 @@ type containerResourceUsage struct {
// interval (and #containers) increases, the size of kubelet's response
// could be sigificant. E.g., the 60s interval stats for ~20 containers is
// ~1.5MB. Don't hammer the node with frequent, heavy requests.
-// TODO: Implement a constant, lightweight resource monitor, which polls
-// kubelet every few second, stores the data, and reports meaningful statistics
-// numbers over a longer period (e.g., max/mean cpu usage in the last hour).
//
// cadvisor records cumulative cpu usage in nanoseconds, so we need to have two
// stats points to compute the cpu usage over the interval. Assuming cadvisor
@@ -267,14 +267,7 @@ func getOneTimeResourceUsageOnNode(c *client.Client, nodeName string, cpuInterva
        }
        first := info.Stats[0]
        last := info.Stats[len(info.Stats)-1]
-       usageMap[name] = &containerResourceUsage{
-           Name:                    name,
-           Timestamp:               last.Timestamp,
-           CPUUsageInCores:         float64(last.Cpu.Usage.Total-first.Cpu.Usage.Total) / float64(last.Timestamp.Sub(first.Timestamp).Nanoseconds()),
-           MemoryUsageInBytes:      int64(last.Memory.Usage),
-           MemoryWorkingSetInBytes: int64(last.Memory.WorkingSet),
-           CPUInterval:             last.Timestamp.Sub(first.Timestamp),
-       }
+       usageMap[name] = computeContainerResourceUsage(name, first, last)
    }
    return usageMap, nil
}
@@ -358,3 +351,186 @@ func GetKubeletPods(c *client.Client, node string) (*api.PodList, error) {
    }
    return result, nil
}
+
+func computeContainerResourceUsage(name string, oldStats, newStats *cadvisor.ContainerStats) *containerResourceUsage {
+    return &containerResourceUsage{
+        Name:                    name,
+        Timestamp:               newStats.Timestamp,
+        CPUUsageInCores:         float64(newStats.Cpu.Usage.Total-oldStats.Cpu.Usage.Total) / float64(newStats.Timestamp.Sub(oldStats.Timestamp).Nanoseconds()),
+        MemoryUsageInBytes:      int64(newStats.Memory.Usage),
+        MemoryWorkingSetInBytes: int64(newStats.Memory.WorkingSet),
+        CPUInterval:             newStats.Timestamp.Sub(oldStats.Timestamp),
+    }
+}
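
For intuition, a worked example of the computation above (hypothetical numbers; cadvisor reports cumulative CPU time in nanoseconds, so usage in cores is the CPU-time delta divided by the wall-clock delta):

    t0 := time.Now()
    prev := &cadvisor.ContainerStats{Timestamp: t0}
    prev.Cpu.Usage.Total = 10e9 // 10s of CPU time accumulated so far
    cur := &cadvisor.ContainerStats{Timestamp: t0.Add(5 * time.Second)}
    cur.Cpu.Usage.Total = 13e9 // 3s more CPU time over a 5s wall-clock window
    usage := computeContainerResourceUsage("/kubelet", prev, cur)
    // usage.CPUUsageInCores == 3e9/5e9 == 0.6 cores; usage.CPUInterval == 5s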
+
+// resourceCollector periodically polls the node, collects stats for a given
+// list of containers, and computes and caches the resource usage, up to
+// maxEntriesPerContainer entries for each container.
+type resourceCollector struct {
+    node            string
+    containers      []string
+    client          *client.Client
+    buffers         map[string][]*containerResourceUsage
+    pollingInterval time.Duration
+    stopCh          chan struct{}
+}
+
+func newResourceCollector(c *client.Client, nodeName string, containerNames []string, pollingInterval time.Duration) *resourceCollector {
+    buffers := make(map[string][]*containerResourceUsage)
+    return &resourceCollector{
+        node:            nodeName,
+        containers:      containerNames,
+        client:          c,
+        buffers:         buffers,
+        pollingInterval: pollingInterval,
+    }
+}
+
+// Start starts a goroutine to poll the node every pollingInterval.
+func (r *resourceCollector) Start() {
+    r.stopCh = make(chan struct{}, 1)
+    // Keep the last observed stats for comparison.
+    oldStats := make(map[string]*cadvisor.ContainerStats)
+    go util.Until(func() { r.collectStats(oldStats) }, r.pollingInterval, r.stopCh)
+}
+
+// Stop sends a signal to terminate the stats-collecting goroutine.
+func (r *resourceCollector) Stop() {
+    close(r.stopCh)
+}
+
+// collectStats gets the latest stats from kubelet's /stats/container, computes
+// the resource usage, and pushes it to the buffer.
+func (r *resourceCollector) collectStats(oldStats map[string]*cadvisor.ContainerStats) {
+    infos, err := getContainerInfo(r.client, r.node, &kubelet.StatsRequest{
+        ContainerName: "/",
+        NumStats:      1,
+        Subcontainers: true,
+    })
+    if err != nil {
+        Logf("Error getting container info on %q, err: %v", r.node, err)
+        return
+    }
+    for _, name := range r.containers {
+        info, ok := infos[name]
+        if !ok || len(info.Stats) < 1 {
+            Logf("Missing info/stats for container %q on node %q", name, r.node)
+            return
+        }
+        // The first poll only seeds oldStats; usage is computed from the delta
+        // between two consecutive polls.
+        if _, ok := oldStats[name]; ok {
+            r.buffers[name] = append(r.buffers[name], computeContainerResourceUsage(name, oldStats[name], info.Stats[0]))
+        }
+        oldStats[name] = info.Stats[0]
+    }
+}
+
+// LogLatest logs the latest resource usage of each container.
+func (r *resourceCollector) LogLatest() {
+    stats := make(map[string]*containerResourceUsage)
+    for _, name := range r.containers {
+        usages := r.buffers[name]
+        if len(usages) == 0 {
+            Logf("Resource usage on node %q is not ready yet", r.node)
+            return
+        }
+        stats[name] = usages[len(usages)-1]
+    }
+    Logf("\n%s", formatResourceUsageStats(r.node, stats))
+}
+
+type resourceUsageByCPU []*containerResourceUsage
+
+func (r resourceUsageByCPU) Len() int           { return len(r) }
+func (r resourceUsageByCPU) Swap(i, j int)      { r[i], r[j] = r[j], r[i] }
+func (r resourceUsageByCPU) Less(i, j int) bool { return r[i].CPUUsageInCores < r[j].CPUUsageInCores }
+
+var percentiles = [...]float64{0.05, 0.50, 0.90, 0.95}
+
+// GetBasicCPUStats returns the 5th, 50th, 90th, and 95th percentiles of the cpu
+// usage in cores for containerName. This method examines all data currently in the buffer.
+func (r *resourceCollector) GetBasicCPUStats(containerName string) map[float64]float64 {
+    result := make(map[float64]float64, len(percentiles))
+    usages := r.buffers[containerName]
+    sort.Sort(resourceUsageByCPU(usages))
+    for _, q := range percentiles {
+        index := int(float64(len(usages))*q) - 1
+        if index < 0 {
+            index = 0
+        }
+        result[q] = usages[index].CPUUsageInCores
+    }
+    return result
+}
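
To make the index selection above concrete: at the 5s polling interval, a one-minute collection window yields roughly a dozen samples per container (the first poll only seeds the baseline), and a quantile q maps to a sorted-slice index as in the sketch below (a hypothetical helper mirroring the loop above, not part of the commit):

    func percentileIndex(n int, q float64) int {
        idx := int(float64(n)*q) - 1
        if idx < 0 {
            idx = 0
        }
        return idx
    }
    // e.g. percentileIndex(12, 0.90) == 9, percentileIndex(12, 0.95) == 10,
    // and percentileIndex(12, 0.05) == 0.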
+
+// resourceMonitor manages a resourceCollector per node.
+type resourceMonitor struct {
+    client          *client.Client
+    containers      []string
+    pollingInterval time.Duration
+    collectors      map[string]*resourceCollector
+}
+
+func newResourceMonitor(c *client.Client, containerNames []string, pollingInterval time.Duration) *resourceMonitor {
+    return &resourceMonitor{
+        containers:      containerNames,
+        client:          c,
+        pollingInterval: pollingInterval,
+    }
+}
+
+func (r *resourceMonitor) Start() {
+    nodes, err := r.client.Nodes().List(labels.Everything(), fields.Everything())
+    if err != nil {
+        Failf("resourceMonitor: unable to get list of nodes: %v", err)
+    }
+    r.collectors = make(map[string]*resourceCollector)
+    for _, node := range nodes.Items {
+        // Each per-node collector polls at the monitor's configured interval.
+        collector := newResourceCollector(r.client, node.Name, r.containers, r.pollingInterval)
+        r.collectors[node.Name] = collector
+        collector.Start()
+    }
+}
+
+func (r *resourceMonitor) Stop() {
+    for _, collector := range r.collectors {
+        collector.Stop()
+    }
+}
+
+func (r *resourceMonitor) LogLatest() {
+    for _, collector := range r.collectors {
+        collector.LogLatest()
+    }
+}
+
+func (r *resourceMonitor) LogCPUSummary() {
+    // Example output for a node:
+    //   CPU usage of containers on node "e2e-test-yjhong-minion-0vj7":
+    //   container        5th%  50th% 90th% 95th%
+    //   "/"              0.051 0.159 0.387 0.455
+    //   "/docker-daemon" 0.000 0.000 0.146 0.166
+    //   "/kubelet"       0.036 0.053 0.091 0.154
+    //   "/kube-proxy"    0.017 0.000 0.000 0.000
+    //   "/system"        0.001 0.001 0.001 0.002
+    var header []string
+    header = append(header, "container")
+    for _, p := range percentiles {
+        header = append(header, fmt.Sprintf("%.0fth%%", p*100))
+    }
+    for nodeName, collector := range r.collectors {
+        buf := &bytes.Buffer{}
+        w := tabwriter.NewWriter(buf, 1, 0, 1, ' ', 0)
+        fmt.Fprintf(w, "%s\n", strings.Join(header, "\t"))
+        for _, containerName := range targetContainers {
+            data := collector.GetBasicCPUStats(containerName)
+            var s []string
+            s = append(s, fmt.Sprintf("%q", containerName))
+            for _, p := range percentiles {
+                s = append(s, fmt.Sprintf("%.3f", data[p]))
+            }
+            fmt.Fprintf(w, "%s\n", strings.Join(s, "\t"))
+        }
+        w.Flush()
+        Logf("\nCPU usage of containers on node %q:\n%s", nodeName, buf.String())
+    }
+}
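
One way the TODO in the new "Monitor resource usage on node" test could later be addressed with this API is sketched below (hypothetical container names and limits; r is a *resourceMonitor that has been collecting for a while; not part of this commit):

    limits := map[string]float64{"/kubelet": 0.2, "/docker-daemon": 0.3} // made-up limits, in cores
    for nodeName, collector := range r.collectors {
        for container, limit := range limits {
            if usage := collector.GetBasicCPUStats(container)[0.95]; usage > limit {
                Failf("95th%% CPU of %q on node %q is %.3f cores, exceeding the limit of %.3f",
                    container, nodeName, usage, limit)
            }
        }
    }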