Merge pull request #97042 from joelsmith/eviction

Measure/compute used ephemeral storage in stats provider, not eviction manager
This commit is contained in:
Kubernetes Prow Robot 2020-12-14 08:47:30 -08:00 committed by GitHub
commit ac101cbdda
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
17 changed files with 343 additions and 264 deletions

View File

@ -59,11 +59,9 @@ go_library(
"//pkg/kubelet/server/stats:go_default_library",
"//pkg/kubelet/types:go_default_library",
"//pkg/kubelet/util/format:go_default_library",
"//pkg/volume/util:go_default_library",
"//staging/src/k8s.io/api/core/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/api/resource:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/types:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/clock:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/util/feature:go_default_library",
"//staging/src/k8s.io/client-go/tools/record:go_default_library",

View File

@ -26,7 +26,6 @@ import (
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/clock"
utilfeature "k8s.io/apiserver/pkg/util/feature"
"k8s.io/client-go/tools/record"
@ -99,8 +98,6 @@ type managerImpl struct {
thresholdNotifiers []ThresholdNotifier
// thresholdsLastUpdated is the last time the thresholdNotifiers were updated.
thresholdsLastUpdated time.Time
// etcHostsPath is a function that will get the etc-hosts file's path for a pod given its UID
etcHostsPath func(podUID types.UID) string
}
// ensure it implements the required interface
@ -117,7 +114,6 @@ func NewManager(
recorder record.EventRecorder,
nodeRef *v1.ObjectReference,
clock clock.Clock,
etcHostsPath func(types.UID) string,
) (Manager, lifecycle.PodAdmitHandler) {
manager := &managerImpl{
clock: clock,
@ -133,7 +129,6 @@ func NewManager(
thresholdsFirstObservedAt: thresholdsObservedAt{},
dedicatedImageFs: nil,
thresholdNotifiers: []ThresholdNotifier{},
etcHostsPath: etcHostsPath,
}
return manager, manager
}
@ -512,20 +507,11 @@ func (m *managerImpl) podEphemeralStorageLimitEviction(podStats statsapi.PodStat
return false
}
// pod stats api summarizes ephemeral storage usage (container, emptyDir, host[etc-hosts, logs])
podEphemeralStorageTotalUsage := &resource.Quantity{}
var fsStatsSet []fsStatsType
if *m.dedicatedImageFs {
fsStatsSet = []fsStatsType{fsStatsLogs, fsStatsLocalVolumeSource}
} else {
fsStatsSet = []fsStatsType{fsStatsRoot, fsStatsLogs, fsStatsLocalVolumeSource}
if podStats.EphemeralStorage != nil {
podEphemeralStorageTotalUsage = resource.NewQuantity(int64(*podStats.EphemeralStorage.UsedBytes), resource.BinarySI)
}
podEphemeralUsage, err := podLocalEphemeralStorageUsage(podStats, pod, fsStatsSet, m.etcHostsPath(pod.UID))
if err != nil {
klog.Errorf("eviction manager: error getting pod disk usage %v", err)
return false
}
podEphemeralStorageTotalUsage.Add(podEphemeralUsage[v1.ResourceEphemeralStorage])
podEphemeralStorageLimit := podLimits[v1.ResourceEphemeralStorage]
if podEphemeralStorageTotalUsage.Cmp(podEphemeralStorageLimit) > 0 {
// the total usage of pod exceeds the total size limit of containers, evict the pod

View File

@ -18,7 +18,6 @@ package eviction
import (
"fmt"
"os"
"sort"
"strconv"
"strings"
@ -32,7 +31,6 @@ import (
v1resource "k8s.io/kubernetes/pkg/api/v1/resource"
evictionapi "k8s.io/kubernetes/pkg/kubelet/eviction/api"
kubetypes "k8s.io/kubernetes/pkg/kubelet/types"
volumeutils "k8s.io/kubernetes/pkg/volume/util"
)
const (
@ -413,44 +411,6 @@ func podDiskUsage(podStats statsapi.PodStats, pod *v1.Pod, statsToMeasure []fsSt
}, nil
}
// localEphemeralVolumeNames returns the set of ephemeral volumes for the pod that are local
func localEphemeralVolumeNames(pod *v1.Pod) []string {
result := []string{}
for _, volume := range pod.Spec.Volumes {
if volumeutils.IsLocalEphemeralVolume(volume) {
result = append(result, volume.Name)
}
}
return result
}
// podLocalEphemeralStorageUsage aggregates pod local ephemeral storage usage and inode consumption for the specified stats to measure.
func podLocalEphemeralStorageUsage(podStats statsapi.PodStats, pod *v1.Pod, statsToMeasure []fsStatsType, etcHostsPath string) (v1.ResourceList, error) {
disk := resource.Quantity{Format: resource.BinarySI}
inodes := resource.Quantity{Format: resource.DecimalSI}
containerUsageList := containerUsage(podStats, statsToMeasure)
disk.Add(containerUsageList[v1.ResourceEphemeralStorage])
inodes.Add(containerUsageList[resourceInodes])
if hasFsStatsType(statsToMeasure, fsStatsLocalVolumeSource) {
volumeNames := localEphemeralVolumeNames(pod)
podLocalVolumeUsageList := podLocalVolumeUsage(volumeNames, podStats)
disk.Add(podLocalVolumeUsageList[v1.ResourceEphemeralStorage])
inodes.Add(podLocalVolumeUsageList[resourceInodes])
}
if len(etcHostsPath) > 0 {
if stat, err := os.Stat(etcHostsPath); err == nil {
disk.Add(*resource.NewQuantity(int64(stat.Size()), resource.BinarySI))
inodes.Add(*resource.NewQuantity(int64(1), resource.DecimalSI))
}
}
return v1.ResourceList{
v1.ResourceEphemeralStorage: disk,
resourceInodes: inodes,
}, nil
}
// formatThreshold formats a threshold for logging.
func formatThreshold(threshold evictionapi.Threshold) string {
return fmt.Sprintf("threshold(signal=%v, operator=%v, value=%v, gracePeriod=%v)", threshold.Signal, threshold.Operator, evictionapi.ThresholdValue(threshold.Value), threshold.GracePeriod)

View File

@ -628,6 +628,10 @@ func NewMainKubelet(kubeCfg *kubeletconfiginternal.KubeletConfiguration,
}
klet.runtimeCache = runtimeCache
// common provider to get host file system usage associated with a pod managed by kubelet
hostStatsProvider := stats.NewHostStatsProvider(kubecontainer.RealOS{}, func(podUID types.UID) string {
return getEtcHostsPath(klet.getPodDir(podUID))
})
if kubeDeps.useLegacyCadvisorStats {
klet.StatsProvider = stats.NewCadvisorStatsProvider(
klet.cadvisor,
@ -635,7 +639,8 @@ func NewMainKubelet(kubeCfg *kubeletconfiginternal.KubeletConfiguration,
klet.podManager,
klet.runtimeCache,
klet.containerRuntime,
klet.statusManager)
klet.statusManager,
hostStatsProvider)
} else {
klet.StatsProvider = stats.NewCRIStatsProvider(
klet.cadvisor,
@ -644,8 +649,7 @@ func NewMainKubelet(kubeCfg *kubeletconfiginternal.KubeletConfiguration,
klet.runtimeCache,
kubeDeps.RemoteRuntimeService,
kubeDeps.RemoteImageService,
stats.NewLogMetricsService(),
kubecontainer.RealOS{})
hostStatsProvider)
}
klet.pleg = pleg.NewGenericPLEG(klet.containerRuntime, plegChannelCapacity, plegRelistPeriod, klet.podCache, clock.RealClock{})
@ -740,9 +744,8 @@ func NewMainKubelet(kubeCfg *kubeletconfiginternal.KubeletConfiguration,
klet.backOff = flowcontrol.NewBackOff(backOffPeriod, MaxContainerBackOff)
klet.podKiller = NewPodKiller(klet)
etcHostsPathFunc := func(podUID types.UID) string { return getEtcHostsPath(klet.getPodDir(podUID)) }
// setup eviction manager
evictionManager, evictionAdmitHandler := eviction.NewManager(klet.resourceAnalyzer, evictionConfig, killPodNow(klet.podWorkers, kubeDeps.Recorder), klet.podManager.GetMirrorPodByPod, klet.imageManager, klet.containerGC, kubeDeps.Recorder, nodeRef, klet.clock, etcHostsPathFunc)
evictionManager, evictionAdmitHandler := eviction.NewManager(klet.resourceAnalyzer, evictionConfig, killPodNow(klet.podWorkers, kubeDeps.Recorder), klet.podManager.GetMirrorPodByPod, klet.imageManager, klet.containerGC, kubeDeps.Recorder, nodeRef, klet.clock)
klet.evictionManager = evictionManager
klet.admitHandlers.AddPodAdmitHandler(evictionAdmitHandler)

View File

@ -250,13 +250,17 @@ func newTestKubeletWithImageList(
volumeStatsAggPeriod := time.Second * 10
kubelet.resourceAnalyzer = serverstats.NewResourceAnalyzer(kubelet, volumeStatsAggPeriod)
fakeHostStatsProvider := stats.NewFakeHostStatsProvider()
kubelet.StatsProvider = stats.NewCadvisorStatsProvider(
kubelet.cadvisor,
kubelet.resourceAnalyzer,
kubelet.podManager,
kubelet.runtimeCache,
fakeRuntime,
kubelet.statusManager)
kubelet.statusManager,
fakeHostStatsProvider,
)
fakeImageGCPolicy := images.ImageGCPolicy{
HighThresholdPercent: 90,
LowThresholdPercent: 80,
@ -293,9 +297,8 @@ func newTestKubeletWithImageList(
UID: types.UID(kubelet.nodeName),
Namespace: "",
}
etcHostsPathFunc := func(podUID types.UID) string { return getEtcHostsPath(kubelet.getPodDir(podUID)) }
// setup eviction manager
evictionManager, evictionAdmitHandler := eviction.NewManager(kubelet.resourceAnalyzer, eviction.Config{}, killPodNow(kubelet.podWorkers, fakeRecorder), kubelet.podManager.GetMirrorPodByPod, kubelet.imageManager, kubelet.containerGC, fakeRecorder, nodeRef, kubelet.clock, etcHostsPathFunc)
evictionManager, evictionAdmitHandler := eviction.NewManager(kubelet.resourceAnalyzer, eviction.Config{}, killPodNow(kubelet.podWorkers, fakeRecorder), kubelet.podManager.GetMirrorPodByPod, kubelet.imageManager, kubelet.containerGC, fakeRecorder, nodeRef, kubelet.clock)
kubelet.evictionManager = evictionManager
kubelet.admitHandlers.AddPodAdmitHandler(evictionAdmitHandler)

View File

@ -126,8 +126,7 @@ func TestRunOnce(t *testing.T) {
return nil
}
fakeMirrodPodFunc := func(*v1.Pod) (*v1.Pod, bool) { return nil, false }
etcHostsPathFunc := func(podUID types.UID) string { return getEtcHostsPath(kb.getPodDir(podUID)) }
evictionManager, evictionAdmitHandler := eviction.NewManager(kb.resourceAnalyzer, eviction.Config{}, fakeKillPodFunc, fakeMirrodPodFunc, nil, nil, kb.recorder, nodeRef, kb.clock, etcHostsPathFunc)
evictionManager, evictionAdmitHandler := eviction.NewManager(kb.resourceAnalyzer, eviction.Config{}, fakeKillPodFunc, fakeMirrodPodFunc, nil, nil, kb.recorder, nodeRef, kb.clock)
kb.evictionManager = evictionManager
kb.admitHandlers.AddPodAdmitHandler(evictionAdmitHandler)

View File

@ -8,7 +8,8 @@ go_library(
"cri_stats_provider_others.go",
"cri_stats_provider_windows.go",
"helper.go",
"log_metrics_provider.go",
"host_stats_provider.go",
"host_stats_provider_fake.go",
"provider.go",
],
importpath = "k8s.io/kubernetes/pkg/kubelet/stats",
@ -17,6 +18,7 @@ go_library(
"//pkg/kubelet/cadvisor:go_default_library",
"//pkg/kubelet/cm:go_default_library",
"//pkg/kubelet/container:go_default_library",
"//pkg/kubelet/container/testing:go_default_library",
"//pkg/kubelet/kuberuntime:go_default_library",
"//pkg/kubelet/leaky:go_default_library",
"//pkg/kubelet/pod:go_default_library",
@ -66,7 +68,6 @@ go_test(
"cadvisor_stats_provider_test.go",
"cri_stats_provider_test.go",
"helper_test.go",
"log_metrics_provider_test.go",
"provider_test.go",
],
embed = [":go_default_library"],

View File

@ -52,6 +52,8 @@ type cadvisorStatsProvider struct {
imageService kubecontainer.ImageService
// statusProvider is used to get pod metadata
statusProvider status.PodStatusProvider
// hostStatsProvider is used to get pod host stat usage.
hostStatsProvider HostStatsProvider
}
// newCadvisorStatsProvider returns a containerStatsProvider that provides
@ -61,12 +63,14 @@ func newCadvisorStatsProvider(
resourceAnalyzer stats.ResourceAnalyzer,
imageService kubecontainer.ImageService,
statusProvider status.PodStatusProvider,
hostStatsProvider HostStatsProvider,
) containerStatsProvider {
return &cadvisorStatsProvider{
cadvisor: cadvisor,
resourceAnalyzer: resourceAnalyzer,
imageService: imageService,
statusProvider: statusProvider,
cadvisor: cadvisor,
resourceAnalyzer: resourceAnalyzer,
imageService: imageService,
statusProvider: statusProvider,
hostStatsProvider: hostStatsProvider,
}
}
@ -137,7 +141,17 @@ func (p *cadvisorStatsProvider) ListPodStats() ([]statsapi.PodStats, error) {
copy(ephemeralStats, vstats.EphemeralVolumes)
podStats.VolumeStats = append(append([]statsapi.VolumeStats{}, vstats.EphemeralVolumes...), vstats.PersistentVolumes...)
}
podStats.EphemeralStorage = calcEphemeralStorage(podStats.Containers, ephemeralStats, &rootFsInfo, nil, false)
logStats, err := p.hostStatsProvider.getPodLogStats(podStats.PodRef.Namespace, podStats.PodRef.Name, podUID, &rootFsInfo)
if err != nil {
klog.Errorf("Unable to fetch pod log stats: %v", err)
}
etcHostsStats, err := p.hostStatsProvider.getPodEtcHostsStats(podUID, &rootFsInfo)
if err != nil {
klog.Errorf("unable to fetch pod etc hosts stats: %v", err)
}
podStats.EphemeralStorage = calcEphemeralStorage(podStats.Containers, ephemeralStats, &rootFsInfo, logStats, etcHostsStats, false)
// Lookup the pod-level cgroup's CPU and memory stats
podInfo := getCadvisorPodInfoFromPodUID(podUID, allInfos)
if podInfo != nil {

View File

@ -22,7 +22,7 @@ import (
cadvisorapiv2 "github.com/google/cadvisor/info/v2"
"github.com/stretchr/testify/assert"
"k8s.io/api/core/v1"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
statsapi "k8s.io/kubelet/pkg/apis/stats/v1alpha1"
@ -232,7 +232,7 @@ func TestCadvisorListPodStats(t *testing.T) {
resourceAnalyzer := &fakeResourceAnalyzer{podVolumeStats: volumeStats}
p := NewCadvisorStatsProvider(mockCadvisor, resourceAnalyzer, nil, nil, mockRuntime, mockStatus)
p := NewCadvisorStatsProvider(mockCadvisor, resourceAnalyzer, nil, nil, mockRuntime, mockStatus, NewFakeHostStatsProvider())
pods, err := p.ListPodStats()
assert.NoError(t, err)
@ -400,7 +400,7 @@ func TestCadvisorListPodCPUAndMemoryStats(t *testing.T) {
resourceAnalyzer := &fakeResourceAnalyzer{podVolumeStats: volumeStats}
p := NewCadvisorStatsProvider(mockCadvisor, resourceAnalyzer, nil, nil, nil, nil)
p := NewCadvisorStatsProvider(mockCadvisor, resourceAnalyzer, nil, nil, nil, nil, NewFakeHostStatsProvider())
pods, err := p.ListPodCPUAndMemoryStats()
assert.NoError(t, err)
@ -486,7 +486,7 @@ func TestCadvisorImagesFsStats(t *testing.T) {
mockCadvisor.On("ImagesFsInfo").Return(imageFsInfo, nil)
mockRuntime.On("ImageStats").Return(imageStats, nil)
provider := newCadvisorStatsProvider(mockCadvisor, &fakeResourceAnalyzer{}, mockRuntime, nil)
provider := newCadvisorStatsProvider(mockCadvisor, &fakeResourceAnalyzer{}, mockRuntime, nil, NewFakeHostStatsProvider())
stats, err := provider.ImageFsStats()
assert.NoError(err)

View File

@ -20,7 +20,6 @@ import (
"errors"
"fmt"
"path"
"path/filepath"
"sort"
"strings"
"sync"
@ -35,8 +34,6 @@ import (
"k8s.io/klog/v2"
statsapi "k8s.io/kubelet/pkg/apis/stats/v1alpha1"
"k8s.io/kubernetes/pkg/kubelet/cadvisor"
kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
"k8s.io/kubernetes/pkg/kubelet/kuberuntime"
"k8s.io/kubernetes/pkg/kubelet/server/stats"
kubetypes "k8s.io/kubernetes/pkg/kubelet/types"
)
@ -66,10 +63,8 @@ type criStatsProvider struct {
runtimeService internalapi.RuntimeService
// imageService is used to get the stats of the image filesystem.
imageService internalapi.ImageManagerService
// logMetrics provides the metrics for container logs
logMetricsService LogMetricsService
// osInterface is the interface for syscalls.
osInterface kubecontainer.OSInterface
// hostStatsProvider is used to get the status of the host filesystem consumed by pods.
hostStatsProvider HostStatsProvider
// cpuUsageCache caches the cpu usage for containers.
cpuUsageCache map[string]*cpuUsageRecord
@ -83,16 +78,14 @@ func newCRIStatsProvider(
resourceAnalyzer stats.ResourceAnalyzer,
runtimeService internalapi.RuntimeService,
imageService internalapi.ImageManagerService,
logMetricsService LogMetricsService,
osInterface kubecontainer.OSInterface,
hostStatsProvider HostStatsProvider,
) containerStatsProvider {
return &criStatsProvider{
cadvisor: cadvisor,
resourceAnalyzer: resourceAnalyzer,
runtimeService: runtimeService,
imageService: imageService,
logMetricsService: logMetricsService,
osInterface: osInterface,
hostStatsProvider: hostStatsProvider,
cpuUsageCache: make(map[string]*cpuUsageRecord),
}
}
@ -401,19 +394,22 @@ func (p *criStatsProvider) makePodStorageStats(s *statsapi.PodStats, rootFsInfo
if !found {
return
}
podLogDir := kuberuntime.BuildPodLogsDirectory(podNs, podName, podUID)
logStats, err := p.getPodLogStats(podLogDir, rootFsInfo)
logStats, err := p.hostStatsProvider.getPodLogStats(podNs, podName, podUID, rootFsInfo)
if err != nil {
klog.Errorf("Unable to fetch pod log stats for path %s: %v ", podLogDir, err)
klog.Errorf("Unable to fetch pod log stats: %v", err)
// If people do in-place upgrade, there might be pods still using
// the old log path. For those pods, no pod log stats is returned.
// We should continue generating other stats in that case.
// calcEphemeralStorage tolerants logStats == nil.
}
etcHostsStats, err := p.hostStatsProvider.getPodEtcHostsStats(podUID, rootFsInfo)
if err != nil {
klog.Errorf("unable to fetch pod etc hosts stats: %v", err)
}
ephemeralStats := make([]statsapi.VolumeStats, len(vstats.EphemeralVolumes))
copy(ephemeralStats, vstats.EphemeralVolumes)
s.VolumeStats = append(append([]statsapi.VolumeStats{}, vstats.EphemeralVolumes...), vstats.PersistentVolumes...)
s.EphemeralStorage = calcEphemeralStorage(s.Containers, ephemeralStats, rootFsInfo, logStats, true)
s.EphemeralStorage = calcEphemeralStorage(s.Containers, ephemeralStats, rootFsInfo, logStats, etcHostsStats, true)
}
func (p *criStatsProvider) addPodNetworkStats(
@ -581,14 +577,10 @@ func (p *criStatsProvider) makeContainerStats(
// NOTE: This doesn't support the old pod log path, `/var/log/pods/UID`. For containers
// using old log path, empty log stats are returned. This is fine, because we don't
// officially support in-place upgrade anyway.
var (
containerLogPath = kuberuntime.BuildContainerLogsDirectory(meta.GetNamespace(),
meta.GetName(), types.UID(meta.GetUid()), container.GetMetadata().GetName())
err error
)
result.Logs, err = p.getPathFsStats(containerLogPath, rootFsInfo)
var err error
result.Logs, err = p.hostStatsProvider.getPodContainerLogStats(meta.GetNamespace(), meta.GetName(), types.UID(meta.GetUid()), container.GetMetadata().GetName(), rootFsInfo)
if err != nil {
klog.Errorf("Unable to fetch container log stats for path %s: %v ", containerLogPath, err)
klog.Errorf("Unable to fetch container log stats: %v ", err)
}
return result
}
@ -832,58 +824,3 @@ func getCRICadvisorStats(infos map[string]cadvisorapiv2.ContainerInfo) map[strin
}
return stats
}
func (p *criStatsProvider) getPathFsStats(path string, rootFsInfo *cadvisorapiv2.FsInfo) (*statsapi.FsStats, error) {
m := p.logMetricsService.createLogMetricsProvider(path)
logMetrics, err := m.GetMetrics()
if err != nil {
return nil, err
}
result := &statsapi.FsStats{
Time: metav1.NewTime(rootFsInfo.Timestamp),
AvailableBytes: &rootFsInfo.Available,
CapacityBytes: &rootFsInfo.Capacity,
InodesFree: rootFsInfo.InodesFree,
Inodes: rootFsInfo.Inodes,
}
usedbytes := uint64(logMetrics.Used.Value())
result.UsedBytes = &usedbytes
inodesUsed := uint64(logMetrics.InodesUsed.Value())
result.InodesUsed = &inodesUsed
result.Time = maxUpdateTime(&result.Time, &logMetrics.Time)
return result, nil
}
// getPodLogStats gets stats for logs under the pod log directory. Container logs usually exist
// under the container log directory. However, for some container runtimes, e.g. kata, gvisor,
// they may want to keep some pod level logs, in that case they can put those logs directly under
// the pod log directory. And kubelet will take those logs into account as part of pod ephemeral
// storage.
func (p *criStatsProvider) getPodLogStats(path string, rootFsInfo *cadvisorapiv2.FsInfo) (*statsapi.FsStats, error) {
files, err := p.osInterface.ReadDir(path)
if err != nil {
return nil, err
}
result := &statsapi.FsStats{
Time: metav1.NewTime(rootFsInfo.Timestamp),
AvailableBytes: &rootFsInfo.Available,
CapacityBytes: &rootFsInfo.Capacity,
InodesFree: rootFsInfo.InodesFree,
Inodes: rootFsInfo.Inodes,
}
for _, f := range files {
if f.IsDir() {
continue
}
// Only include *files* under pod log directory.
fpath := filepath.Join(path, f.Name())
fstats, err := p.getPathFsStats(fpath, rootFsInfo)
if err != nil {
return nil, fmt.Errorf("failed to get fsstats for %q: %v", fpath, err)
}
result.UsedBytes = addUsage(result.UsedBytes, fstats.UsedBytes)
result.InodesUsed = addUsage(result.InodesUsed, fstats.InodesUsed)
result.Time = maxUpdateTime(&result.Time, &fstats.Time)
}
return result, nil
}

View File

@ -192,7 +192,7 @@ func TestCRIListPodStats(t *testing.T) {
PersistentVolumes: persistentVolumes,
}
fakeLogStats := map[string]*volume.Metrics{
fakeStats := map[string]*volume.Metrics{
kuberuntime.BuildContainerLogsDirectory("sandbox0-ns", "sandbox0-name", types.UID("sandbox0-uid"), cName0): containerLogStats0,
kuberuntime.BuildContainerLogsDirectory("sandbox0-ns", "sandbox0-name", types.UID("sandbox0-uid"), cName1): containerLogStats1,
kuberuntime.BuildContainerLogsDirectory("sandbox1-ns", "sandbox1-name", types.UID("sandbox1-uid"), cName2): containerLogStats2,
@ -202,7 +202,6 @@ func TestCRIListPodStats(t *testing.T) {
filepath.Join(kuberuntime.BuildPodLogsDirectory("sandbox0-ns", "sandbox0-name", types.UID("sandbox0-uid")), podLogName0): podLogStats0,
filepath.Join(kuberuntime.BuildPodLogsDirectory("sandbox1-ns", "sandbox1-name", types.UID("sandbox1-uid")), podLogName1): podLogStats1,
}
fakeLogStatsProvider := NewFakeLogMetricsService(fakeLogStats)
ctrl := gomock.NewController(t)
defer ctrl.Finish()
@ -231,8 +230,7 @@ func TestCRIListPodStats(t *testing.T) {
mockRuntimeCache,
fakeRuntimeService,
fakeImageService,
fakeLogStatsProvider,
fakeOS,
NewFakeHostStatsProviderWithData(fakeStats, fakeOS),
)
stats, err := provider.ListPodStats()
@ -427,8 +425,7 @@ func TestCRIListPodCPUAndMemoryStats(t *testing.T) {
mockRuntimeCache,
fakeRuntimeService,
nil,
nil,
&kubecontainertest.FakeOS{},
NewFakeHostStatsProvider(),
)
stats, err := provider.ListPodCPUAndMemoryStats()
@ -536,13 +533,12 @@ func TestCRIImagesFsStats(t *testing.T) {
imageFsUsage = makeFakeImageFsUsage(imageFsMountpoint)
)
var (
mockCadvisor = new(cadvisortest.Mock)
mockRuntimeCache = new(kubecontainertest.MockRuntimeCache)
mockPodManager = new(kubepodtest.MockManager)
resourceAnalyzer = new(fakeResourceAnalyzer)
fakeRuntimeService = critest.NewFakeRuntimeService()
fakeImageService = critest.NewFakeImageService()
fakeLogStatsProvider = NewFakeLogMetricsService(nil)
mockCadvisor = new(cadvisortest.Mock)
mockRuntimeCache = new(kubecontainertest.MockRuntimeCache)
mockPodManager = new(kubepodtest.MockManager)
resourceAnalyzer = new(fakeResourceAnalyzer)
fakeRuntimeService = critest.NewFakeRuntimeService()
fakeImageService = critest.NewFakeImageService()
)
mockCadvisor.On("GetDirFsInfo", imageFsMountpoint).Return(imageFsInfo, nil)
@ -557,8 +553,7 @@ func TestCRIImagesFsStats(t *testing.T) {
mockRuntimeCache,
fakeRuntimeService,
fakeImageService,
fakeLogStatsProvider,
&kubecontainertest.FakeOS{},
NewFakeHostStatsProvider(),
)
stats, err := provider.ImageFsStats()

View File

@ -372,7 +372,7 @@ func uint64Ptr(i uint64) *uint64 {
}
func calcEphemeralStorage(containers []statsapi.ContainerStats, volumes []statsapi.VolumeStats, rootFsInfo *cadvisorapiv2.FsInfo,
podLogStats *statsapi.FsStats, isCRIStatsProvider bool) *statsapi.FsStats {
podLogStats *statsapi.FsStats, etcHostsStats *statsapi.FsStats, isCRIStatsProvider bool) *statsapi.FsStats {
result := &statsapi.FsStats{
Time: metav1.NewTime(rootFsInfo.Timestamp),
AvailableBytes: &rootFsInfo.Available,
@ -393,6 +393,11 @@ func calcEphemeralStorage(containers []statsapi.ContainerStats, volumes []statsa
result.InodesUsed = addUsage(result.InodesUsed, podLogStats.InodesUsed)
result.Time = maxUpdateTime(&result.Time, &podLogStats.Time)
}
if etcHostsStats != nil {
result.UsedBytes = addUsage(result.UsedBytes, etcHostsStats.UsedBytes)
result.InodesUsed = addUsage(result.InodesUsed, etcHostsStats.InodesUsed)
result.Time = maxUpdateTime(&result.Time, &etcHostsStats.Time)
}
return result
}

View File

@ -0,0 +1,155 @@
/*
Copyright 2018 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package stats
import (
"fmt"
"path/filepath"
cadvisorapiv2 "github.com/google/cadvisor/info/v2"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
statsapi "k8s.io/kubelet/pkg/apis/stats/v1alpha1"
kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
"k8s.io/kubernetes/pkg/kubelet/kuberuntime"
"k8s.io/kubernetes/pkg/volume"
)
// PodEtcHostsPathFunc is a function to fetch a etc hosts path by pod uid.
type PodEtcHostsPathFunc func(podUID types.UID) string
// metricsProviderByPath maps a path to its metrics provider
type metricsProviderByPath map[string]volume.MetricsProvider
// HostStatsProvider defines an interface for providing host stats associated with pod.
type HostStatsProvider interface {
// getPodLogStats gets stats associated with pod log usage
getPodLogStats(podNamespace, podName string, podUID types.UID, rootFsInfo *cadvisorapiv2.FsInfo) (*statsapi.FsStats, error)
// getPodContainerLogStats gets stats associated with container log usage
getPodContainerLogStats(podNamespace, podName string, podUID types.UID, containerName string, rootFsInfo *cadvisorapiv2.FsInfo) (*statsapi.FsStats, error)
// getPodEtcHostsStats gets stats associated with pod etc-hosts usage
getPodEtcHostsStats(podUID types.UID, rootFsInfo *cadvisorapiv2.FsInfo) (*statsapi.FsStats, error)
}
type hostStatsProvider struct {
// osInterface is the interface for syscalls.
osInterface kubecontainer.OSInterface
// podEtcHostsPathFunc fetches a pod etc hosts path by uid.
podEtcHostsPathFunc PodEtcHostsPathFunc
}
// NewHostStatsProvider returns a new HostStatsProvider type struct.
func NewHostStatsProvider(osInterface kubecontainer.OSInterface, podEtcHostsPathFunc PodEtcHostsPathFunc) HostStatsProvider {
return hostStatsProvider{
osInterface: osInterface,
podEtcHostsPathFunc: podEtcHostsPathFunc,
}
}
func (h hostStatsProvider) getPodLogStats(podNamespace, podName string, podUID types.UID, rootFsInfo *cadvisorapiv2.FsInfo) (*statsapi.FsStats, error) {
metricsByPath, err := h.podLogMetrics(podNamespace, podName, podUID)
if err != nil {
return nil, err
}
return metricsByPathToFsStats(metricsByPath, rootFsInfo)
}
// getPodContainerLogStats gets stats for container
func (h hostStatsProvider) getPodContainerLogStats(podNamespace, podName string, podUID types.UID, containerName string, rootFsInfo *cadvisorapiv2.FsInfo) (*statsapi.FsStats, error) {
metricsByPath, err := h.podContainerLogMetrics(podNamespace, podName, podUID, containerName)
if err != nil {
return nil, err
}
return metricsByPathToFsStats(metricsByPath, rootFsInfo)
}
// getPodEtcHostsStats gets status for pod etc hosts usage
func (h hostStatsProvider) getPodEtcHostsStats(podUID types.UID, rootFsInfo *cadvisorapiv2.FsInfo) (*statsapi.FsStats, error) {
metrics := h.podEtcHostsMetrics(podUID)
hostMetrics, err := metrics.GetMetrics()
if err != nil {
return nil, fmt.Errorf("failed to get stats %v", err)
}
result := rootFsInfoToFsStats(rootFsInfo)
usedBytes := uint64(hostMetrics.Used.Value())
inodesUsed := uint64(hostMetrics.InodesUsed.Value())
result.UsedBytes = addUsage(result.UsedBytes, &usedBytes)
result.InodesUsed = addUsage(result.InodesUsed, &inodesUsed)
result.Time = maxUpdateTime(&result.Time, &hostMetrics.Time)
return result, nil
}
func (h hostStatsProvider) podLogMetrics(podNamespace, podName string, podUID types.UID) (metricsProviderByPath, error) {
podLogsDirectoryPath := kuberuntime.BuildPodLogsDirectory(podNamespace, podName, podUID)
return h.fileMetricsByDir(podLogsDirectoryPath)
}
func (h hostStatsProvider) podContainerLogMetrics(podNamespace, podName string, podUID types.UID, containerName string) (metricsProviderByPath, error) {
podContainerLogsDirectoryPath := kuberuntime.BuildContainerLogsDirectory(podNamespace, podName, podUID, containerName)
return h.fileMetricsByDir(podContainerLogsDirectoryPath)
}
func (h hostStatsProvider) podEtcHostsMetrics(podUID types.UID) volume.MetricsProvider {
podEtcHostsPath := h.podEtcHostsPathFunc(podUID)
return volume.NewMetricsDu(podEtcHostsPath)
}
// fileMetricsByDir returns metrics by path for each file under specified directory
func (h hostStatsProvider) fileMetricsByDir(dirname string) (metricsProviderByPath, error) {
files, err := h.osInterface.ReadDir(dirname)
if err != nil {
return nil, err
}
results := metricsProviderByPath{}
for _, f := range files {
if f.IsDir() {
continue
}
// Only include *files* under pod log directory.
fpath := filepath.Join(dirname, f.Name())
results[fpath] = volume.NewMetricsDu(fpath)
}
return results, nil
}
// metricsByPathToFsStats converts a metrics provider by path to fs stats
func metricsByPathToFsStats(metricsByPath metricsProviderByPath, rootFsInfo *cadvisorapiv2.FsInfo) (*statsapi.FsStats, error) {
result := rootFsInfoToFsStats(rootFsInfo)
for fpath, metrics := range metricsByPath {
hostMetrics, err := metrics.GetMetrics()
if err != nil {
return nil, fmt.Errorf("failed to get fsstats for %q: %v", fpath, err)
}
usedBytes := uint64(hostMetrics.Used.Value())
inodesUsed := uint64(hostMetrics.InodesUsed.Value())
result.UsedBytes = addUsage(result.UsedBytes, &usedBytes)
result.InodesUsed = addUsage(result.InodesUsed, &inodesUsed)
result.Time = maxUpdateTime(&result.Time, &hostMetrics.Time)
}
return result, nil
}
// rootFsInfoToFsStats is a utility to convert rootFsInfo into statsapi.FsStats
func rootFsInfoToFsStats(rootFsInfo *cadvisorapiv2.FsInfo) *statsapi.FsStats {
return &statsapi.FsStats{
Time: metav1.NewTime(rootFsInfo.Timestamp),
AvailableBytes: &rootFsInfo.Available,
CapacityBytes: &rootFsInfo.Capacity,
InodesFree: rootFsInfo.InodesFree,
Inodes: rootFsInfo.Inodes,
}
}

View File

@ -0,0 +1,110 @@
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package stats
import (
"fmt"
"path/filepath"
cadvisorapiv2 "github.com/google/cadvisor/info/v2"
"k8s.io/apimachinery/pkg/types"
statsapi "k8s.io/kubelet/pkg/apis/stats/v1alpha1"
kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
kubecontainertest "k8s.io/kubernetes/pkg/kubelet/container/testing"
"k8s.io/kubernetes/pkg/kubelet/kuberuntime"
"k8s.io/kubernetes/pkg/volume"
)
type fakeHostStatsProvider struct {
fakeStats map[string]*volume.Metrics
osInterface kubecontainer.OSInterface
}
// NewFakeHostStatsProvider provides a way to test with fake host statistics
func NewFakeHostStatsProvider() HostStatsProvider {
return &fakeHostStatsProvider{
osInterface: &kubecontainertest.FakeOS{},
}
}
// NewFakeHostStatsProviderWithData provides a way to test with fake host statistics
func NewFakeHostStatsProviderWithData(fakeStats map[string]*volume.Metrics, osInterface kubecontainer.OSInterface) HostStatsProvider {
return &fakeHostStatsProvider{
fakeStats: fakeStats,
osInterface: osInterface,
}
}
func (f *fakeHostStatsProvider) getPodLogStats(podNamespace, podName string, podUID types.UID, rootFsInfo *cadvisorapiv2.FsInfo) (*statsapi.FsStats, error) {
path := kuberuntime.BuildPodLogsDirectory(podNamespace, podName, podUID)
files, err := f.osInterface.ReadDir(path)
if err != nil {
return nil, err
}
var results []volume.MetricsProvider
for _, file := range files {
if file.IsDir() {
continue
}
// Only include *files* under pod log directory.
fpath := filepath.Join(path, file.Name())
results = append(results, NewFakeMetricsDu(fpath, f.fakeStats[fpath]))
}
return fakeMetricsProvidersToStats(results, rootFsInfo)
}
func (f *fakeHostStatsProvider) getPodContainerLogStats(podNamespace, podName string, podUID types.UID, containerName string, rootFsInfo *cadvisorapiv2.FsInfo) (*statsapi.FsStats, error) {
path := kuberuntime.BuildContainerLogsDirectory(podNamespace, podName, podUID, containerName)
metricsProvider := NewFakeMetricsDu(path, f.fakeStats[path])
return fakeMetricsProvidersToStats([]volume.MetricsProvider{metricsProvider}, rootFsInfo)
}
func (f *fakeHostStatsProvider) getPodEtcHostsStats(podUID types.UID, rootFsInfo *cadvisorapiv2.FsInfo) (*statsapi.FsStats, error) {
return nil, fmt.Errorf("not implemented")
}
func fakeMetricsProvidersToStats(metricsProviders []volume.MetricsProvider, rootFsInfo *cadvisorapiv2.FsInfo) (*statsapi.FsStats, error) {
result := rootFsInfoToFsStats(rootFsInfo)
for i, metricsProvider := range metricsProviders {
hostMetrics, err := metricsProvider.GetMetrics()
if err != nil {
return nil, fmt.Errorf("failed to get stats for item %d: %v", i, err)
}
usedBytes := uint64(hostMetrics.Used.Value())
inodesUsed := uint64(hostMetrics.InodesUsed.Value())
result.UsedBytes = addUsage(result.UsedBytes, &usedBytes)
result.InodesUsed = addUsage(result.InodesUsed, &inodesUsed)
result.Time = maxUpdateTime(&result.Time, &hostMetrics.Time)
}
return result, nil
}
type fakeMetricsDu struct {
fakeStats *volume.Metrics
}
// NewFakeMetricsDu inserts fake statistics when asked for metrics
func NewFakeMetricsDu(path string, stats *volume.Metrics) volume.MetricsProvider {
return &fakeMetricsDu{fakeStats: stats}
}
func (f *fakeMetricsDu) GetMetrics() (*volume.Metrics, error) {
if f.fakeStats == nil {
return nil, fmt.Errorf("no stats provided")
}
return f.fakeStats, nil
}

View File

@ -1,37 +0,0 @@
/*
Copyright 2018 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package stats
import (
"k8s.io/kubernetes/pkg/volume"
)
// LogMetricsService defines an interface for providing LogMetrics functionality.
type LogMetricsService interface {
createLogMetricsProvider(path string) volume.MetricsProvider
}
type logMetrics struct{}
// NewLogMetricsService returns a new LogMetricsService type struct.
func NewLogMetricsService() LogMetricsService {
return logMetrics{}
}
func (l logMetrics) createLogMetricsProvider(path string) volume.MetricsProvider {
return volume.NewMetricsDu(path)
}

View File

@ -1,50 +0,0 @@
/*
Copyright 2018 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package stats
import (
"fmt"
"k8s.io/kubernetes/pkg/volume"
)
type fakeLogMetrics struct {
fakeStats map[string]*volume.Metrics
}
func NewFakeLogMetricsService(stats map[string]*volume.Metrics) LogMetricsService {
return &fakeLogMetrics{fakeStats: stats}
}
func (l *fakeLogMetrics) createLogMetricsProvider(path string) volume.MetricsProvider {
return NewFakeMetricsDu(path, l.fakeStats[path])
}
type fakeMetricsDu struct {
fakeStats *volume.Metrics
}
func NewFakeMetricsDu(path string, stats *volume.Metrics) volume.MetricsProvider {
return &fakeMetricsDu{fakeStats: stats}
}
func (f *fakeMetricsDu) GetMetrics() (*volume.Metrics, error) {
if f.fakeStats == nil {
return nil, fmt.Errorf("no stats provided")
}
return f.fakeStats, nil
}

View File

@ -41,11 +41,10 @@ func NewCRIStatsProvider(
runtimeCache kubecontainer.RuntimeCache,
runtimeService internalapi.RuntimeService,
imageService internalapi.ImageManagerService,
logMetricsService LogMetricsService,
osInterface kubecontainer.OSInterface,
hostStatsProvider HostStatsProvider,
) *Provider {
return newStatsProvider(cadvisor, podManager, runtimeCache, newCRIStatsProvider(cadvisor, resourceAnalyzer,
runtimeService, imageService, logMetricsService, osInterface))
runtimeService, imageService, hostStatsProvider))
}
// NewCadvisorStatsProvider returns a containerStatsProvider that provides both
@ -57,8 +56,9 @@ func NewCadvisorStatsProvider(
runtimeCache kubecontainer.RuntimeCache,
imageService kubecontainer.ImageService,
statusProvider status.PodStatusProvider,
hostStatsProvider HostStatsProvider,
) *Provider {
return newStatsProvider(cadvisor, podManager, runtimeCache, newCadvisorStatsProvider(cadvisor, resourceAnalyzer, imageService, statusProvider))
return newStatsProvider(cadvisor, podManager, runtimeCache, newCadvisorStatsProvider(cadvisor, resourceAnalyzer, imageService, statusProvider, hostStatsProvider))
}
// newStatsProvider returns a new Provider that provides node stats from