mirror of https://github.com/k3s-io/kubernetes.git
synced 2025-08-03 17:30:00 +00:00

Merge pull request #16505 from gmarek/addon_monitoring

Auto commit by PR queue bot

commit 62307e03d5
@@ -112,6 +112,7 @@ var _ = Describe("Density", func() {
 	framework := NewFramework("density")
 	framework.NamespaceDeletionTimeout = time.Hour
+	framework.GatherKubeSystemResourceUsageData = true

 	BeforeEach(func() {
 		c = framework.Client
@@ -39,6 +39,11 @@ type Framework struct {
 	Namespace *api.Namespace
 	Client *client.Client
 	NamespaceDeletionTimeout time.Duration

+	// If set to true framework will start a goroutine monitoring resource usage of system add-ons.
+	// It will read the data every 30 seconds from all Nodes and print summary during afterEach.
+	GatherKubeSystemResourceUsageData bool
+
+	gatherer containerResourceGatherer
 }

 // NewFramework makes a new framework and sets up a BeforeEach/AfterEach for
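
The two new fields above drive the add-on monitoring introduced in this PR. A minimal sketch of how a test is expected to opt in (the test name here is hypothetical; the actual start/stop wiring is in the beforeEach/afterEach hunks below):

	// Hypothetical e2e test that opts into kube-system resource monitoring.
	framework := NewFramework("example")
	framework.GatherKubeSystemResourceUsageData = true
	// beforeEach then calls f.gatherer.startGatheringData(c, time.Minute),
	// and afterEach calls f.gatherer.stopAndPrintData([]int{50, 90, 99, 100}).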
@@ -75,6 +80,10 @@ func (f *Framework) beforeEach() {
 	} else {
 		Logf("Skipping waiting for service account")
 	}
+
+	if f.GatherKubeSystemResourceUsageData {
+		f.gatherer.startGatheringData(c, time.Minute)
+	}
 }

 // afterEach deletes the namespace, after reading its events.
@@ -113,6 +122,10 @@ func (f *Framework) afterEach() {
 	} else {
 		Logf("Found DeleteNamespace=false, skipping namespace deletion!")
 	}
+
+	if f.GatherKubeSystemResourceUsageData {
+		f.gatherer.stopAndPrintData([]int{50, 90, 99, 100})
+	}
 	// Paranoia-- prevent reuse!
 	f.Namespace = nil
 	f.Client = nil
@@ -21,6 +21,7 @@ import (
 	"encoding/json"
 	"fmt"
 	"io/ioutil"
+	"math"
 	"net/http"
 	"sort"
 	"strconv"
@@ -209,8 +210,10 @@ func (r *containerResourceUsage) isStrictlyGreaterThan(rhs *containerResourceUsage) bool {
 	return r.CPUUsageInCores > rhs.CPUUsageInCores && r.MemoryUsageInBytes > rhs.MemoryUsageInBytes && r.MemoryWorkingSetInBytes > rhs.MemoryWorkingSetInBytes
 }

+type resourceUsagePerContainer map[string]*containerResourceUsage
+
 // getOneTimeResourceUsageOnNode queries the node's /stats/container endpoint
-// and returns the resource usage of targetContainers for the past
+// and returns the resource usage of all containerNames for the past
 // cpuInterval.
 // The acceptable range of the interval is 2s~120s. Be warned that as the
 // interval (and #containers) increases, the size of kubelet's response
@@ -223,7 +226,19 @@ func (r *containerResourceUsage) isStrictlyGreaterThan(rhs *containerResourceUsage) bool {
 // Note that this is an approximation and may not be accurate, hence we also
 // write the actual interval used for calculation (based on the timestamps of
 // the stats points in containerResourceUsage.CPUInterval.
-func getOneTimeResourceUsageOnNode(c *client.Client, nodeName string, cpuInterval time.Duration) (map[string]*containerResourceUsage, error) {
+//
+// containerNames is a function returning a collection of container names in which
+// user is interested in. ExpectMissingContainers is a flag which says if the test
+// should fail if one of containers listed by containerNames is missing on any node
+// (useful e.g. when looking for system containers or daemons). If set to true function
+// is more forgiving and ignores missing containers.
+func getOneTimeResourceUsageOnNode(
+	c *client.Client,
+	nodeName string,
+	cpuInterval time.Duration,
+	containerNames func() []string,
+	expectMissingContainers bool,
+) (resourceUsagePerContainer, error) {
 	numStats := int(float64(cpuInterval.Seconds()) / cadvisorStatsPollingIntervalInSeconds)
 	if numStats < 2 || numStats > maxNumStatsToRequest {
 		return nil, fmt.Errorf("numStats needs to be > 1 and < %d", maxNumStatsToRequest)
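
A sketch of a call site under the new signature, assuming a caller that only cares about a fixed set of system containers; passing true for expectMissingContainers makes the function skip names that are absent on a node instead of returning an error:

	usage, err := getOneTimeResourceUsageOnNode(
		c, node.Name, 5*time.Second,
		func() []string { return []string{"/kubelet", "/kube-proxy"} },
		true, // tolerate containers missing on this node
	)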
@@ -238,12 +253,15 @@ func getOneTimeResourceUsageOnNode(c *client.Client, nodeName string, cpuInterval time.Duration) (map[string]*containerResourceUsage, error) {
 		return nil, err
 	}
 	// Process container infos that are relevant to us.
-	containers := targetContainers()
-	usageMap := make(map[string]*containerResourceUsage, len(containers))
+	containers := containerNames()
+	usageMap := make(resourceUsagePerContainer, len(containers))
 	for _, name := range containers {
 		info, ok := containerInfos[name]
 		if !ok {
-			return nil, fmt.Errorf("missing info for container %q on node %q", name, nodeName)
+			if !expectMissingContainers {
+				return nil, fmt.Errorf("missing info for container %q on node %q", name, nodeName)
+			}
+			continue
 		}
 		first := info.Stats[0]
 		last := info.Stats[len(info.Stats)-1]
@@ -252,12 +270,58 @@ func getOneTimeResourceUsageOnNode(c *client.Client, nodeName string, cpuInterval time.Duration) (map[string]*containerResourceUsage, error) {
 	return usageMap, nil
 }

+func getKubeSystemContainersResourceUsage(c *client.Client) (resourceUsagePerContainer, error) {
+	pods, err := c.Pods("kube-system").List(labels.Everything(), fields.Everything())
+	if err != nil {
+		return resourceUsagePerContainer{}, err
+	}
+	nodes, err := c.Nodes().List(labels.Everything(), fields.Everything())
+	if err != nil {
+		return resourceUsagePerContainer{}, err
+	}
+	containerIDToNameMap := make(map[string]string)
+	containerIDs := make([]string, 0)
+	for _, pod := range pods.Items {
+		for _, container := range pod.Status.ContainerStatuses {
+			containerID := strings.TrimPrefix(container.ContainerID, "docker:/")
+			containerIDToNameMap[containerID] = pod.Name + "/" + container.Name
+			containerIDs = append(containerIDs, containerID)
+		}
+	}
+
+	mutex := sync.Mutex{}
+	wg := sync.WaitGroup{}
+	wg.Add(len(nodes.Items))
+	errors := make([]error, 0)
+	nameToUsageMap := make(resourceUsagePerContainer, len(containerIDToNameMap))
+	for _, node := range nodes.Items {
+		go func(nodeName string) {
+			defer wg.Done()
+			nodeUsage, err := getOneTimeResourceUsageOnNode(c, nodeName, 5*time.Second, func() []string { return containerIDs }, true)
+			mutex.Lock()
+			defer mutex.Unlock()
+			if err != nil {
+				errors = append(errors, err)
+				return
+			}
+			for k, v := range nodeUsage {
+				nameToUsageMap[containerIDToNameMap[k]] = v
+			}
+		}(node.Name)
+	}
+	wg.Wait()
+	if len(errors) != 0 {
+		return resourceUsagePerContainer{}, fmt.Errorf("Errors while gathering usage data: %v", errors)
+	}
+	return nameToUsageMap, nil
+}
+
 // logOneTimeResourceUsageSummary collects container resource for the list of
 // nodes, formats and logs the stats.
 func logOneTimeResourceUsageSummary(c *client.Client, nodeNames []string, cpuInterval time.Duration) {
 	var summary []string
 	for _, nodeName := range nodeNames {
-		stats, err := getOneTimeResourceUsageOnNode(c, nodeName, cpuInterval)
+		stats, err := getOneTimeResourceUsageOnNode(c, nodeName, cpuInterval, targetContainers, false)
 		if err != nil {
 			summary = append(summary, fmt.Sprintf("Error getting resource usage from node %q, err: %v", nodeName, err))
 		} else {
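
The new getKubeSystemContainersResourceUsage helper fans out one goroutine per node, reuses getOneTimeResourceUsageOnNode with the kube-system container IDs, and merges the per-node results under a mutex; keys in the returned map have the form "podName/containerName". A hedged usage sketch:

	usage, err := getKubeSystemContainersResourceUsage(c)
	if err != nil {
		Logf("Failed to gather kube-system usage: %v", err)
		return
	}
	for name, u := range usage {
		// name is built as pod.Name + "/" + container.Name above.
		Logf("%s: %.3f cores, %d bytes working set", name, u.CPUUsageInCores, u.MemoryWorkingSetInBytes)
	}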
@@ -267,7 +331,7 @@ func logOneTimeResourceUsageSummary(c *client.Client, nodeNames []string, cpuInterval time.Duration) {
 	Logf("\n%s", strings.Join(summary, "\n"))
 }

-func formatResourceUsageStats(nodeName string, containerStats map[string]*containerResourceUsage) string {
+func formatResourceUsageStats(nodeName string, containerStats resourceUsagePerContainer) string {
 	// Example output:
 	//
 	// Resource usage for node "e2e-test-foo-minion-abcde":
@@ -287,6 +351,120 @@ func formatResourceUsageStats(nodeName string, containerStats map[string]*containerResourceUsage) string {
 	return fmt.Sprintf("Resource usage on node %q:\n%s", nodeName, buf.String())
 }

+type int64arr []int64
+
+func (a int64arr) Len() int { return len(a) }
+func (a int64arr) Swap(i, j int) { a[i], a[j] = a[j], a[i] }
+func (a int64arr) Less(i, j int) bool { return a[i] < a[j] }
+
+type usageDataPerContainer struct {
+	cpuData []float64
+	memUseData []int64
+	memWorkSetData []int64
+}
+
+func computePercentiles(timeSeries map[time.Time]resourceUsagePerContainer, percentilesToCompute []int) map[int]resourceUsagePerContainer {
+	if len(timeSeries) == 0 {
+		return make(map[int]resourceUsagePerContainer)
+	}
+	dataMap := make(map[string]*usageDataPerContainer)
+	for _, v := range timeSeries {
+		for k := range v {
+			dataMap[k] = &usageDataPerContainer{
+				cpuData: make([]float64, len(timeSeries)),
+				memUseData: make([]int64, len(timeSeries)),
+				memWorkSetData: make([]int64, len(timeSeries)),
+			}
+		}
+		break
+	}
+	for _, singleStatistic := range timeSeries {
+		for name, data := range singleStatistic {
+			dataMap[name].cpuData = append(dataMap[name].cpuData, data.CPUUsageInCores)
+			dataMap[name].memUseData = append(dataMap[name].memUseData, data.MemoryUsageInBytes)
+			dataMap[name].memWorkSetData = append(dataMap[name].memWorkSetData, data.MemoryWorkingSetInBytes)
+		}
+	}
+	for _, v := range dataMap {
+		sort.Float64s(v.cpuData)
+		sort.Sort(int64arr(v.memUseData))
+		sort.Sort(int64arr(v.memWorkSetData))
+	}
+
+	result := make(map[int]resourceUsagePerContainer)
+	for _, perc := range percentilesToCompute {
+		data := make(resourceUsagePerContainer)
+		for k, v := range dataMap {
+			percentileIndex := int(math.Ceil(float64(len(v.cpuData)*perc)/100)) - 1
+			data[k] = &containerResourceUsage{
+				Name: k,
+				CPUUsageInCores: v.cpuData[percentileIndex],
+				MemoryUsageInBytes: v.memUseData[percentileIndex],
+				MemoryWorkingSetInBytes: v.memWorkSetData[percentileIndex],
+			}
+		}
+		result[perc] = data
+	}
+	return result
+}
+
+type containerResourceGatherer struct {
+	usageTimeseries map[time.Time]resourceUsagePerContainer
+	stopCh chan struct{}
+	timer *time.Ticker
+	wg sync.WaitGroup
+}
+
+func (g *containerResourceGatherer) startGatheringData(c *client.Client, period time.Duration) {
+	g.usageTimeseries = make(map[time.Time]resourceUsagePerContainer)
+	g.wg.Add(1)
+	g.stopCh = make(chan struct{})
+	g.timer = time.NewTicker(period)
+	go func() error {
+		for {
+			select {
+			case <-g.timer.C:
+				now := time.Now()
+				data, err := getKubeSystemContainersResourceUsage(c)
+				if err != nil {
+					return err
+				}
+				g.usageTimeseries[now] = data
+			case <-g.stopCh:
+				g.wg.Done()
+				return nil
+			}
+		}
+	}()
+}
+
+func (g *containerResourceGatherer) stopAndPrintData(percentiles []int) {
+	close(g.stopCh)
+	g.timer.Stop()
+	g.wg.Wait()
+	if len(percentiles) == 0 {
+		Logf("Warning! Empty percentile list for stopAndPrintData.")
+		return
+	}
+	stats := computePercentiles(g.usageTimeseries, percentiles)
+	sortedKeys := []string{}
+	for name := range stats[percentiles[0]] {
+		sortedKeys = append(sortedKeys, name)
+	}
+	sort.Strings(sortedKeys)
+	for _, perc := range percentiles {
+		buf := &bytes.Buffer{}
+		w := tabwriter.NewWriter(buf, 1, 0, 1, ' ', 0)
+		fmt.Fprintf(w, "container\tcpu(cores)\tmemory(MB)\n")
+		for _, name := range sortedKeys {
+			usage := stats[perc][name]
+			fmt.Fprintf(w, "%q\t%.3f\t%.2f\n", name, usage.CPUUsageInCores, float64(usage.MemoryWorkingSetInBytes)/(1024*1024))
+		}
+		w.Flush()
+		Logf("%v percentile:\n%v", perc, buf.String())
+	}
+}
+
 // Performs a get on a node proxy endpoint given the nodename and rest client.
 func nodeProxyRequest(c *client.Client, node, endpoint string) client.Result {
 	return c.Get().
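
In computePercentiles, each per-container series is sorted and the value at index ceil(len*perc/100) - 1 is reported; for example, with 20 samples, perc=90 selects index 17 and perc=100 selects index 19, i.e. the largest observed sample. A minimal sketch of that index arithmetic in isolation:

	// Percentile index selection as used by computePercentiles:
	// with n sorted samples, perc=100 always picks index n-1 (the maximum).
	n, perc := 20, 90
	idx := int(math.Ceil(float64(n*perc)/100)) - 1 // idx == 17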
@@ -110,7 +110,8 @@ func readLatencyMetrics(c *client.Client) (APIResponsiveness, error) {
 	}

 	ignoredResources := sets.NewString("events")
-	ignoredVerbs := sets.NewString("WATCHLIST", "PROXY")
+	// TODO: figure out why we're getting non-capitalized proxy and fix this.
+	ignoredVerbs := sets.NewString("WATCHLIST", "PROXY", "proxy")

 	for _, sample := range samples {
 		// Example line:
@@ -29,8 +29,6 @@ import (
 const datapointAmount = 5

-type resourceUsagePerContainer map[string]*containerResourceUsage
-
 var systemContainers = []string{"/docker-daemon", "/kubelet", "/kube-proxy", "/system"}

 //TODO tweak those values.
@@ -102,7 +100,13 @@ var _ = Describe("Resource usage of system containers", func() {

 		for i := 0; i < datapointAmount; i++ {
 			for _, node := range nodeList.Items {
-				resourceUsage, err := getOneTimeResourceUsageOnNode(c, node.Name, 5*time.Second)
+				resourceUsage, err := getOneTimeResourceUsageOnNode(c, node.Name, 5*time.Second, func() []string {
+					if providerIs("gce", "gke") {
+						return systemContainers
+					} else {
+						return []string{}
+					}
+				}, false)
 				expectNoError(err)
 				resourceUsagePerNode[node.Name] = append(resourceUsagePerNode[node.Name], resourceUsage)
 			}
@@ -119,6 +123,9 @@ var _ = Describe("Resource usage of system containers", func() {
 			for container, cUsage := range usage {
 				Logf("%v on %v usage: %#v", container, node, cUsage)
 				if !allowedUsage[container].isStrictlyGreaterThan(cUsage) {
+					if _, ok := violating[node]; !ok {
+						violating[node] = make(resourceUsagePerContainer)
+					}
 					if allowedUsage[container].CPUUsageInCores < cUsage.CPUUsageInCores {
 						Logf("CPU is too high for %s (%v)", container, cUsage.CPUUsageInCores)
 					}