Mirror of https://github.com/k3s-io/kubernetes.git (synced 2025-08-08 03:33:56 +00:00)
Merge pull request #11196 from yujuhong/resource_monitor
Auto commit by PR queue bot
This commit is contained in: commit 1b6089a783
@@ -34,8 +34,9 @@ import (
 const (
     // Interval to poll /runningpods on a node
     pollInterval = 1 * time.Second
-    // Interval used compute cpu usage of a container
-    cpuIntervalInSeconds = 60
+    // Interval to poll /stats/container on a node
+    containerStatsPollingInterval = 5 * time.Second
+    resourceCollectionTime        = 1 * time.Minute
 )
 
 // getPodMatches returns a set of pod names on the given node that matches the
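For scale, here is a standalone sketch (illustration only, not part of the diff) of how many usage samples a one-minute collection window implies when the collector buffers one sample per 5s poll; the interval actually in effect depends on how the collector is started.

package main

import (
    "fmt"
    "time"
)

func main() {
    // Illustrative arithmetic only: samples buffered per container is roughly
    // the collection window divided by the polling interval.
    containerStatsPollingInterval := 5 * time.Second
    resourceCollectionTime := 1 * time.Minute
    fmt.Println(int(resourceCollectionTime / containerStatsPollingInterval)) // 12
}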
@@ -87,10 +88,11 @@ func waitTillNPodsRunningOnNodes(c *client.Client, nodeNames util.StringSet, pod
     })
 }
 
-var _ = Describe("Clean up pods on node", func() {
+var _ = Describe("kubelet", func() {
     var numNodes int
     var nodeNames util.StringSet
-    framework := NewFramework("kubelet-delete")
+    framework := NewFramework("kubelet")
+    var resourceMonitor *resourceMonitor
 
     BeforeEach(func() {
         nodes, err := framework.Client.Nodes().List(labels.Everything(), fields.Everything())
@@ -100,24 +102,27 @@ var _ = Describe("Clean up pods on node", func() {
         for _, node := range nodes.Items {
             nodeNames.Insert(node.Name)
         }
-        logOneTimeResourceUsageSummary(framework.Client, nodeNames.List(), cpuIntervalInSeconds)
+        resourceMonitor = newResourceMonitor(framework.Client, targetContainers, containerStatsPollingInterval)
+        resourceMonitor.Start()
     })
 
+    AfterEach(func() {
+        resourceMonitor.Stop()
+    })
+
+    Describe("Clean up pods on node", func() {
         type DeleteTest struct {
             podsPerNode int
             timeout     time.Duration
         }
 
         deleteTests := []DeleteTest{
             {podsPerNode: 10, timeout: 1 * time.Minute},
         }
 
         for _, itArg := range deleteTests {
             name := fmt.Sprintf(
                 "kubelet should be able to delete %d pods per node in %v.", itArg.podsPerNode, itArg.timeout)
             It(name, func() {
                 totalPods := itArg.podsPerNode * numNodes
 
                 By(fmt.Sprintf("Creating a RC of %d pods and wait until all pods of this RC are running", totalPods))
                 rcName := fmt.Sprintf("cleanup%d-%s", totalPods, string(util.NewUUID()))
 
@@ -134,7 +139,7 @@ var _ = Describe("Clean up pods on node", func() {
                 // transition to the running status.
                 Expect(waitTillNPodsRunningOnNodes(framework.Client, nodeNames, rcName, framework.Namespace.Name, totalPods,
                     time.Second*30)).NotTo(HaveOccurred())
-                logOneTimeResourceUsageSummary(framework.Client, nodeNames.List(), cpuIntervalInSeconds)
+                resourceMonitor.LogLatest()
 
                 By("Deleting the RC")
                 DeleteRC(framework.Client, framework.Namespace.Name, rcName)
@@ -150,6 +155,20 @@ var _ = Describe("Clean up pods on node", func() {
                     itArg.timeout)).NotTo(HaveOccurred())
                 Logf("Deleting %d pods on %d nodes completed in %v after the RC was deleted", totalPods, len(nodeNames),
                     time.Since(start))
+                resourceMonitor.LogCPUSummary()
             })
         }
     })
+
+    Describe("Monitor resource usage on node", func() {
+        It("Ask kubelet to report container resource usage", func() {
+            // TODO: After gathering some numbers, we should set a resource
+            // limit for each container and fail the test if the usage exceeds
+            // the preset limit.
+            By(fmt.Sprintf("Waiting %v to collect resource usage on node", resourceCollectionTime))
+            time.Sleep(resourceCollectionTime)
+            resourceMonitor.LogLatest()
+            resourceMonitor.LogCPUSummary()
+        })
+    })
+})
@@ -30,8 +30,10 @@ import (
 
     "github.com/GoogleCloudPlatform/kubernetes/pkg/api"
     "github.com/GoogleCloudPlatform/kubernetes/pkg/client"
+    "github.com/GoogleCloudPlatform/kubernetes/pkg/fields"
     "github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet"
     "github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/metrics"
+    "github.com/GoogleCloudPlatform/kubernetes/pkg/labels"
     "github.com/GoogleCloudPlatform/kubernetes/pkg/master/ports"
     "github.com/GoogleCloudPlatform/kubernetes/pkg/util"
     cadvisor "github.com/google/cadvisor/info/v1"
@@ -200,9 +202,6 @@ type containerResourceUsage struct {
 // interval (and #containers) increases, the size of kubelet's response
 // could be sigificant. E.g., the 60s interval stats for ~20 containers is
 // ~1.5MB. Don't hammer the node with frequent, heavy requests.
-// TODO: Implement a constant, lightweight resource monitor, which polls
-// kubelet every few second, stores the data, and reports meaningful statistics
-// numbers over a longer period (e.g., max/mean cpu usage in the last hour).
 //
 // cadvisor records cumulative cpu usage in nanoseconds, so we need to have two
 // stats points to compute the cpu usage over the interval. Assuming cadvisor
@@ -233,14 +232,7 @@ func getOneTimeResourceUsageOnNode(c *client.Client, nodeName string, cpuInterva
         }
         first := info.Stats[0]
         last := info.Stats[len(info.Stats)-1]
-        usageMap[name] = &containerResourceUsage{
-            Name:                    name,
-            Timestamp:               last.Timestamp,
-            CPUUsageInCores:         float64(last.Cpu.Usage.Total-first.Cpu.Usage.Total) / float64(last.Timestamp.Sub(first.Timestamp).Nanoseconds()),
-            MemoryUsageInBytes:      int64(last.Memory.Usage),
-            MemoryWorkingSetInBytes: int64(last.Memory.WorkingSet),
-            CPUInterval:             last.Timestamp.Sub(first.Timestamp),
-        }
+        usageMap[name] = computeContainerResourceUsage(name, first, last)
     }
     return usageMap, nil
 }
@@ -324,3 +316,186 @@ func GetKubeletPods(c *client.Client, node string) (*api.PodList, error) {
     }
     return result, nil
 }
+
+func computeContainerResourceUsage(name string, oldStats, newStats *cadvisor.ContainerStats) *containerResourceUsage {
+    return &containerResourceUsage{
+        Name:                    name,
+        Timestamp:               newStats.Timestamp,
+        CPUUsageInCores:         float64(newStats.Cpu.Usage.Total-oldStats.Cpu.Usage.Total) / float64(newStats.Timestamp.Sub(oldStats.Timestamp).Nanoseconds()),
+        MemoryUsageInBytes:      int64(newStats.Memory.Usage),
+        MemoryWorkingSetInBytes: int64(newStats.Memory.WorkingSet),
+        CPUInterval:             newStats.Timestamp.Sub(oldStats.Timestamp),
+    }
+}
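The helper above folds the cumulative-counter arithmetic into one place: cadvisor reports total CPU time in nanoseconds, so usage in "cores" over an interval is the CPU-time delta divided by the wall-clock delta. A self-contained sketch of that calculation, with made-up counter values:

package main

import (
    "fmt"
    "time"
)

func main() {
    // Made-up numbers: two cumulative CPU counters (nanoseconds) sampled 5s apart.
    interval := 5 * time.Second
    oldTotal := uint64(120000000000) // cumulative CPU ns at the first sample
    newTotal := uint64(121500000000) // cumulative CPU ns at the second sample

    // 1.5e9 ns of CPU over 5e9 ns of wall clock = 0.300 cores.
    cores := float64(newTotal-oldTotal) / float64(interval.Nanoseconds())
    fmt.Printf("%.3f cores\n", cores)
}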
+
+// resourceCollector periodically polls the node, collect stats for a given
+// list of containers, computes and cache resource usage up to
+// maxEntriesPerContainer for each container.
+type resourceCollector struct {
+    node            string
+    containers      []string
+    client          *client.Client
+    buffers         map[string][]*containerResourceUsage
+    pollingInterval time.Duration
+    stopCh          chan struct{}
+}
+
+func newResourceCollector(c *client.Client, nodeName string, containerNames []string, pollingInterval time.Duration) *resourceCollector {
+    buffers := make(map[string][]*containerResourceUsage)
+    return &resourceCollector{
+        node:            nodeName,
+        containers:      containerNames,
+        client:          c,
+        buffers:         buffers,
+        pollingInterval: pollingInterval,
+    }
+}
+
+// Start starts a goroutine to poll the node every pollingInerval.
+func (r *resourceCollector) Start() {
+    r.stopCh = make(chan struct{}, 1)
+    // Keep the last observed stats for comparison.
+    oldStats := make(map[string]*cadvisor.ContainerStats)
+    go util.Until(func() { r.collectStats(oldStats) }, r.pollingInterval, r.stopCh)
+}
+
+// Stop sends a signal to terminate the stats collecting goroutine.
+func (r *resourceCollector) Stop() {
+    close(r.stopCh)
+}
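Start hands the polling loop to util.Until from pkg/util, which keeps invoking the function until the stop channel is closed, so Stop only has to close the channel. A standalone sketch of that loop shape; the until helper here is my own stand-in, not the real utility:

package main

import (
    "fmt"
    "time"
)

// until calls f roughly every period until stopCh is closed; a sketch of the
// contract the collector relies on, not the actual Kubernetes helper.
func until(f func(), period time.Duration, stopCh chan struct{}) {
    ticker := time.NewTicker(period)
    defer ticker.Stop()
    for {
        f()
        select {
        case <-stopCh:
            return
        case <-ticker.C:
        }
    }
}

func main() {
    stopCh := make(chan struct{})
    go until(func() { fmt.Println("poll /stats/container") }, time.Second, stopCh)
    time.Sleep(3 * time.Second)
    close(stopCh) // like Stop(): closing the channel ends the polling goroutine
    time.Sleep(100 * time.Millisecond)
}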
+
+// collectStats gets the latest stats from kubelet's /stats/container, computes
+// the resource usage, and pushes it to the buffer.
+func (r *resourceCollector) collectStats(oldStats map[string]*cadvisor.ContainerStats) {
+    infos, err := getContainerInfo(r.client, r.node, &kubelet.StatsRequest{
+        ContainerName: "/",
+        NumStats:      1,
+        Subcontainers: true,
+    })
+    if err != nil {
+        Logf("Error getting container info on %q, err: %v", r.node, err)
+        return
+    }
+    for _, name := range r.containers {
+        info, ok := infos[name]
+        if !ok || len(info.Stats) < 1 {
+            Logf("Missing info/stats for container %q on node %q", name, r.node)
+            return
+        }
+        if _, ok := oldStats[name]; ok {
+            r.buffers[name] = append(r.buffers[name], computeContainerResourceUsage(name, oldStats[name], info.Stats[0]))
+        }
+        oldStats[name] = info.Stats[0]
+    }
+}
+
+// LogLatest logs the latest resource usage of each container.
+func (r *resourceCollector) LogLatest() {
+    stats := make(map[string]*containerResourceUsage)
+    for _, name := range r.containers {
+        s := r.buffers[name][len(r.buffers)-1]
+        if s == nil {
+            Logf("Resource usage on node %q is not ready yet", r.node)
+            return
+        }
+        stats[name] = s
+    }
+    Logf("\n%s", formatResourceUsageStats(r.node, stats))
+}
+
+type resourceUsageByCPU []*containerResourceUsage
+
+func (r resourceUsageByCPU) Len() int           { return len(r) }
+func (r resourceUsageByCPU) Swap(i, j int)      { r[i], r[j] = r[j], r[i] }
+func (r resourceUsageByCPU) Less(i, j int) bool { return r[i].CPUUsageInCores < r[j].CPUUsageInCores }
+
+var percentiles = [...]float64{0.05, 0.50, 0.90, 0.95}
+
+// GetBasicCPUStats returns the 5-th, 50-th, and 95-th, percentiles the cpu
+// usage in cores for containerName. This method examines all data currently in the buffer.
+func (r *resourceCollector) GetBasicCPUStats(containerName string) map[float64]float64 {
+    result := make(map[float64]float64, len(percentiles))
+    usages := r.buffers[containerName]
+    sort.Sort(resourceUsageByCPU(usages))
+    for _, q := range percentiles {
+        index := int(float64(len(usages))*q) - 1
+        if index < 0 {
+            index = 0
+        }
+        result[q] = usages[index].CPUUsageInCores
+    }
+    return result
+}
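GetBasicCPUStats uses a nearest-rank style lookup: sort the buffered samples and index at int(len*q)-1, clamped to zero. A runnable sketch of that index arithmetic with made-up samples:

package main

import (
    "fmt"
    "sort"
)

func main() {
    // Made-up per-poll CPU usage samples (cores) for one container.
    samples := []float64{0.12, 0.05, 0.40, 0.09, 0.11, 0.08, 0.07, 0.10, 0.06, 0.13, 0.09, 0.35}
    sort.Float64s(samples)
    for _, q := range []float64{0.05, 0.50, 0.90, 0.95} {
        // Same nearest-rank lookup as above: int(len*q)-1, clamped to 0.
        index := int(float64(len(samples))*q) - 1
        if index < 0 {
            index = 0
        }
        fmt.Printf("%.0fth%%: %.2f cores\n", q*100, samples[index])
    }
}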
+
+// resourceMonitor manages a resourceCollector per node.
+type resourceMonitor struct {
+    client          *client.Client
+    containers      []string
+    pollingInterval time.Duration
+    collectors      map[string]*resourceCollector
+}
+
+func newResourceMonitor(c *client.Client, containerNames []string, pollingInterval time.Duration) *resourceMonitor {
+    return &resourceMonitor{
+        containers:      containerNames,
+        client:          c,
+        pollingInterval: pollingInterval,
+    }
+}
+
+func (r *resourceMonitor) Start() {
+    nodes, err := r.client.Nodes().List(labels.Everything(), fields.Everything())
+    if err != nil {
+        Failf("resourceMonitor: unable to get list of nodes: %v", err)
+    }
+    r.collectors = make(map[string]*resourceCollector, 0)
+    for _, node := range nodes.Items {
+        collector := newResourceCollector(r.client, node.Name, r.containers, pollInterval)
+        r.collectors[node.Name] = collector
+        collector.Start()
+    }
+}
+
+func (r *resourceMonitor) Stop() {
+    for _, collector := range r.collectors {
+        collector.Stop()
+    }
+}
+
+func (r *resourceMonitor) LogLatest() {
+    for _, collector := range r.collectors {
+        collector.LogLatest()
+    }
+}
+
+func (r *resourceMonitor) LogCPUSummary() {
+    // Example output for a node:
+    // CPU usage of containers on node "e2e-test-yjhong-minion-0vj7":
+    // container        5th%  50th% 90th% 95th%
+    // "/"              0.051 0.159 0.387 0.455
+    // "/docker-daemon" 0.000 0.000 0.146 0.166
+    // "/kubelet"       0.036 0.053 0.091 0.154
+    // "/kube-proxy"    0.017 0.000 0.000 0.000
+    // "/system"        0.001 0.001 0.001 0.002
+    var header []string
+    header = append(header, "container")
+    for _, p := range percentiles {
+        header = append(header, fmt.Sprintf("%.0fth%%", p*100))
+    }
+    for nodeName, collector := range r.collectors {
+        buf := &bytes.Buffer{}
+        w := tabwriter.NewWriter(buf, 1, 0, 1, ' ', 0)
+        fmt.Fprintf(w, "%s\n", strings.Join(header, "\t"))
+        for _, containerName := range targetContainers {
+            data := collector.GetBasicCPUStats(containerName)
+            var s []string
+            s = append(s, fmt.Sprintf("%q", containerName))
+            for _, p := range percentiles {
+                s = append(s, fmt.Sprintf("%.3f", data[p]))
+            }
+            fmt.Fprintf(w, "%s\n", strings.Join(s, "\t"))
+        }
+        w.Flush()
+        Logf("\nCPU usage of containers on node %q:\n%s", nodeName, buf.String())
+    }
+}
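LogCPUSummary leans on text/tabwriter to line up the percentile columns. A standalone sketch of the same table formatting, with made-up values in place of live collector data:

package main

import (
    "bytes"
    "fmt"
    "strings"
    "text/tabwriter"
)

func main() {
    percentiles := []float64{0.05, 0.50, 0.90, 0.95}
    // Made-up percentile values keyed by container name.
    data := map[string][]float64{
        "/kubelet":    {0.036, 0.053, 0.091, 0.154},
        "/kube-proxy": {0.017, 0.000, 0.000, 0.000},
    }

    header := []string{"container"}
    for _, p := range percentiles {
        header = append(header, fmt.Sprintf("%.0fth%%", p*100))
    }

    buf := &bytes.Buffer{}
    // Same tabwriter settings as LogCPUSummary: minwidth 1, padding 1, space-padded.
    w := tabwriter.NewWriter(buf, 1, 0, 1, ' ', 0)
    fmt.Fprintf(w, "%s\n", strings.Join(header, "\t"))
    for name, values := range data {
        row := []string{fmt.Sprintf("%q", name)}
        for _, v := range values {
            row = append(row, fmt.Sprintf("%.3f", v))
        }
        fmt.Fprintf(w, "%s\n", strings.Join(row, "\t"))
    }
    w.Flush()
    fmt.Print(buf.String())
}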