diff --git a/pkg/features/kube_features.go b/pkg/features/kube_features.go
index 5aa4d536450..fe36edd4a62 100644
--- a/pkg/features/kube_features.go
+++ b/pkg/features/kube_features.go
@@ -784,6 +784,14 @@ const (
 	// node affinity, selector and tolerations. This is allowed only for suspended jobs
 	// that have never been unsuspended before.
 	JobMutableNodeSchedulingDirectives featuregate.Feature = "JobMutableNodeSchedulingDirectives"
+
+	// owner: @haircommander
+	// kep: http://kep.k8s.io/2364
+	// alpha: v1.23
+	//
+	// Configures the Kubelet to use the CRI to populate pod and container stats, instead of supplementing with stats from cAdvisor.
+	// Requires that the CRI implementation supports supplying the required stats.
+	PodAndContainerStatsFromCRI featuregate.Feature = "PodAndContainerStatsFromCRI"
 )
 
 func init() {
@@ -899,6 +907,7 @@ var defaultKubernetesFeatureGates = map[featuregate.Feature]featuregate.FeatureS
 	CPUManagerPolicyBetaOptions:        {Default: true, PreRelease: featuregate.Beta},
 	JobMutableNodeSchedulingDirectives: {Default: true, PreRelease: featuregate.Beta},
 	IdentifyPodOS:                      {Default: false, PreRelease: featuregate.Alpha},
+	PodAndContainerStatsFromCRI:        {Default: false, PreRelease: featuregate.Alpha},
 
 	// inherited features from generic apiserver, relisted here to get a conflict if it is changed
 	// unintentionally on either side:
diff --git a/pkg/kubelet/kubelet.go b/pkg/kubelet/kubelet.go
index f174235d262..2b286285411 100644
--- a/pkg/kubelet/kubelet.go
+++ b/pkg/kubelet/kubelet.go
@@ -723,7 +723,8 @@ func NewMainKubelet(kubeCfg *kubeletconfiginternal.KubeletConfiguration,
 			kubeDeps.RemoteRuntimeService,
 			kubeDeps.RemoteImageService,
 			hostStatsProvider,
-			utilfeature.DefaultFeatureGate.Enabled(features.DisableAcceleratorUsageMetrics))
+			utilfeature.DefaultFeatureGate.Enabled(features.DisableAcceleratorUsageMetrics),
+			utilfeature.DefaultFeatureGate.Enabled(features.PodAndContainerStatsFromCRI))
 	}
 
 	klet.pleg = pleg.NewGenericPLEG(klet.containerRuntime, plegChannelCapacity, plegRelistPeriod, klet.podCache, clock.RealClock{})
diff --git a/pkg/kubelet/stats/cri_stats_provider.go b/pkg/kubelet/stats/cri_stats_provider.go
index e68aa7c42ac..50f182e73b9 100644
--- a/pkg/kubelet/stats/cri_stats_provider.go
+++ b/pkg/kubelet/stats/cri_stats_provider.go
@@ -27,6 +27,8 @@ import (
 	cadvisorfs "github.com/google/cadvisor/fs"
 	cadvisorapiv2 "github.com/google/cadvisor/info/v2"
+	"google.golang.org/grpc/codes"
+	"google.golang.org/grpc/status"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/apimachinery/pkg/types"
 	internalapi "k8s.io/cri-api/pkg/apis"
@@ -70,6 +72,7 @@ type criStatsProvider struct {
 	cpuUsageCache                  map[string]*cpuUsageRecord
 	mutex                          sync.RWMutex
 	disableAcceleratorUsageMetrics bool
+	podAndContainerStatsFromCRI    bool
 }
 
 // newCRIStatsProvider returns a containerStatsProvider implementation that
@@ -80,7 +83,8 @@ func newCRIStatsProvider(
 	runtimeService internalapi.RuntimeService,
 	imageService internalapi.ImageManagerService,
 	hostStatsProvider HostStatsProvider,
-	disableAcceleratorUsageMetrics bool,
+	disableAcceleratorUsageMetrics,
+	podAndContainerStatsFromCRI bool,
 ) containerStatsProvider {
 	return &criStatsProvider{
 		cadvisor:                       cadvisor,
@@ -90,6 +94,7 @@ func newCRIStatsProvider(
 		hostStatsProvider:              hostStatsProvider,
 		cpuUsageCache:                  make(map[string]*cpuUsageRecord),
 		disableAcceleratorUsageMetrics: disableAcceleratorUsageMetrics,
+		podAndContainerStatsFromCRI:    podAndContainerStatsFromCRI,
 	}
 }
 
@@ -122,21 +127,29 @@ func (p *criStatsProvider) listPodStats(updateCPUNanoCoreUsage bool) ([]statsapi
 		return nil, fmt.Errorf("failed to get rootFs info: %v", err)
 	}
 
-	containers, err := p.runtimeService.ListContainers(&runtimeapi.ContainerFilter{})
+	containerMap, podSandboxMap, err := p.getPodAndContainerMaps()
 	if err != nil {
-		return nil, fmt.Errorf("failed to list all containers: %v", err)
+		return nil, fmt.Errorf("failed to get pod or container map: %v", err)
 	}
 
-	// Creates pod sandbox map.
-	podSandboxMap := make(map[string]*runtimeapi.PodSandbox)
-	podSandboxes, err := p.runtimeService.ListPodSandbox(&runtimeapi.PodSandboxFilter{})
-	if err != nil {
-		return nil, fmt.Errorf("failed to list all pod sandboxes: %v", err)
-	}
-	podSandboxes = removeTerminatedPods(podSandboxes)
-	for _, s := range podSandboxes {
-		podSandboxMap[s.Id] = s
+	if p.podAndContainerStatsFromCRI {
+		result, err := p.listPodStatsStrictlyFromCRI(updateCPUNanoCoreUsage, containerMap, podSandboxMap, &rootFsInfo)
+		if err == nil {
+			// The CRI implementation supplied the full set of stats; no cAdvisor fallback is needed.
+			return result, nil
+		}
+		s, ok := status.FromError(err)
+		// A legitimate failure, rather than the CRI implementation not supporting ListPodSandboxStats.
+		if !ok || s.Code() != codes.Unimplemented {
+			return nil, err
+		}
+		// CRI implementation doesn't support ListPodSandboxStats, warn and fall back.
+		klog.V(5).ErrorS(err,
+			"CRI implementation must be updated to support ListPodSandboxStats if PodAndContainerStatsFromCRI feature gate is enabled. Falling back to populating with cAdvisor; this call will fail in the future.",
+		)
 	}
+	return p.listPodStatsPartiallyFromCRI(updateCPUNanoCoreUsage, containerMap, podSandboxMap, &rootFsInfo)
+}
+
+func (p *criStatsProvider) listPodStatsPartiallyFromCRI(updateCPUNanoCoreUsage bool, containerMap map[string]*runtimeapi.Container, podSandboxMap map[string]*runtimeapi.PodSandbox, rootFsInfo *cadvisorapiv2.FsInfo) ([]statsapi.PodStats, error) {
 	// fsIDtoInfo is a map from filesystem id to its stats. This will be used
 	// as a cache to avoid querying cAdvisor for the filesystem stats with the
 	// same filesystem id many times.
@@ -149,14 +162,6 @@ func (p *criStatsProvider) listPodStats(updateCPUNanoCoreUsage bool) ([]statsapi
 	if err != nil {
 		return nil, fmt.Errorf("failed to list all container stats: %v", err)
 	}
-
-	containers = removeTerminatedContainers(containers)
-	// Creates container map.
-	containerMap := make(map[string]*runtimeapi.Container)
-	for _, c := range containers {
-		containerMap[c.Id] = c
-	}
-
 	allInfos, err := getCadvisorContainerInfo(p.cadvisor)
 	if err != nil {
 		return nil, fmt.Errorf("failed to fetch cadvisor stats: %v", err)
 	}
@@ -192,7 +197,7 @@ func (p *criStatsProvider) listPodStats(updateCPUNanoCoreUsage bool) ([]statsapi
 		}
 
 		// Fill available stats for full set of required pod stats
-		cs := p.makeContainerStats(stats, container, &rootFsInfo, fsIDtoInfo, podSandbox.GetMetadata(), updateCPUNanoCoreUsage)
+		cs := p.makeContainerStats(stats, container, rootFsInfo, fsIDtoInfo, podSandbox.GetMetadata(), updateCPUNanoCoreUsage)
 		p.addPodNetworkStats(ps, podSandboxID, caInfos, cs, containerNetworkStats[podSandboxID])
 		p.addPodCPUMemoryStats(ps, types.UID(podSandbox.Metadata.Uid), allInfos, cs)
 		p.addProcessStats(ps, types.UID(podSandbox.Metadata.Uid), allInfos, cs)
@@ -212,45 +217,92 @@ func (p *criStatsProvider) listPodStats(updateCPUNanoCoreUsage bool) ([]statsapi
 	result := make([]statsapi.PodStats, 0, len(sandboxIDToPodStats))
 	for _, s := range sandboxIDToPodStats {
-		p.makePodStorageStats(s, &rootFsInfo)
+		p.makePodStorageStats(s, rootFsInfo)
 		result = append(result, *s)
 	}
 	return result, nil
 }
 
+func (p *criStatsProvider) listPodStatsStrictlyFromCRI(updateCPUNanoCoreUsage bool, containerMap map[string]*runtimeapi.Container, podSandboxMap map[string]*runtimeapi.PodSandbox, rootFsInfo *cadvisorapiv2.FsInfo) ([]statsapi.PodStats, error) {
+	criSandboxStats, err := p.runtimeService.ListPodSandboxStats(&runtimeapi.PodSandboxStatsFilter{})
+	if err != nil {
+		return nil, err
+	}
+
+	fsIDtoInfo := make(map[runtimeapi.FilesystemIdentifier]*cadvisorapiv2.FsInfo)
+	summarySandboxStats := make([]statsapi.PodStats, 0, len(podSandboxMap))
+	for _, criSandboxStat := range criSandboxStats {
+		if criSandboxStat == nil || criSandboxStat.Attributes == nil {
+			klog.V(5).InfoS("Unable to find CRI stats for sandbox")
+			continue
+		}
+		podSandbox, found := podSandboxMap[criSandboxStat.Attributes.Id]
+		if !found {
+			continue
+		}
+		ps := buildPodStats(podSandbox)
+		for _, criContainerStat := range criSandboxStat.Linux.Containers {
+			container, found := containerMap[criContainerStat.Attributes.Id]
+			if !found {
+				continue
+			}
+			// Fill available stats for full set of required pod stats
+			cs := p.makeContainerStats(criContainerStat, container, rootFsInfo, fsIDtoInfo, podSandbox.GetMetadata(), updateCPUNanoCoreUsage)
+			ps.Containers = append(ps.Containers, *cs)
+		}
+		addCRIPodNetworkStats(ps, criSandboxStat)
+		addCRIPodCPUStats(ps, criSandboxStat)
+		addCRIPodMemoryStats(ps, criSandboxStat)
+		addCRIPodProcessStats(ps, criSandboxStat)
+		p.makePodStorageStats(ps, rootFsInfo)
+		summarySandboxStats = append(summarySandboxStats, *ps)
+	}
+	return summarySandboxStats, nil
+}
+
 // ListPodCPUAndMemoryStats returns the CPU and Memory stats of all the pod-managed containers.
 func (p *criStatsProvider) ListPodCPUAndMemoryStats() ([]statsapi.PodStats, error) {
-	containers, err := p.runtimeService.ListContainers(&runtimeapi.ContainerFilter{})
-	if err != nil {
-		return nil, fmt.Errorf("failed to list all containers: %v", err)
-	}
-
-	// Creates pod sandbox map.
-	podSandboxMap := make(map[string]*runtimeapi.PodSandbox)
-	podSandboxes, err := p.runtimeService.ListPodSandbox(&runtimeapi.PodSandboxFilter{})
-	if err != nil {
-		return nil, fmt.Errorf("failed to list all pod sandboxes: %v", err)
-	}
-	podSandboxes = removeTerminatedPods(podSandboxes)
-	for _, s := range podSandboxes {
-		podSandboxMap[s.Id] = s
-	}
-
 	// sandboxIDToPodStats is a temporary map from sandbox ID to its pod stats.
 	sandboxIDToPodStats := make(map[string]*statsapi.PodStats)
+	containerMap, podSandboxMap, err := p.getPodAndContainerMaps()
+	if err != nil {
+		return nil, fmt.Errorf("failed to get pod or container map: %v", err)
+	}
+
+	result := make([]statsapi.PodStats, 0, len(podSandboxMap))
+	if p.podAndContainerStatsFromCRI {
+		criSandboxStats, err := p.runtimeService.ListPodSandboxStats(&runtimeapi.PodSandboxStatsFilter{})
+		// Call succeeded
+		if err == nil {
+			for _, criSandboxStat := range criSandboxStats {
+				podSandbox, found := podSandboxMap[criSandboxStat.Attributes.Id]
+				if !found {
+					continue
+				}
+				ps := buildPodStats(podSandbox)
+				addCRIPodCPUStats(ps, criSandboxStat)
+				addCRIPodMemoryStats(ps, criSandboxStat)
+				result = append(result, *ps)
+			}
+			return result, err
+		}
+		// Call failed, why?
+		s, ok := status.FromError(err)
+		// A legitimate failure, rather than the CRI implementation not supporting ListPodSandboxStats.
+		if !ok || s.Code() != codes.Unimplemented {
+			return nil, err
+		}
+		// CRI implementation doesn't support ListPodSandboxStats, warn and fall back.
+		klog.ErrorS(err,
+			"CRI implementation must be updated to support ListPodSandboxStats if PodAndContainerStatsFromCRI feature gate is enabled. Falling back to populating with cAdvisor; this call will fail in the future.",
+		)
+	}
 	resp, err := p.runtimeService.ListContainerStats(&runtimeapi.ContainerStatsFilter{})
 	if err != nil {
 		return nil, fmt.Errorf("failed to list all container stats: %v", err)
 	}
-	containers = removeTerminatedContainers(containers)
-	// Creates container map.
-	containerMap := make(map[string]*runtimeapi.Container)
-	for _, c := range containers {
-		containerMap[c.Id] = c
-	}
-
 	allInfos, err := getCadvisorContainerInfo(p.cadvisor)
 	if err != nil {
 		return nil, fmt.Errorf("failed to fetch cadvisor stats: %v", err)
@@ -295,13 +347,38 @@ func (p *criStatsProvider) ListPodCPUAndMemoryStats() ([]statsapi.PodStats, erro
 	// cleanup outdated caches.
 	p.cleanupOutdatedCaches()
 
-	result := make([]statsapi.PodStats, 0, len(sandboxIDToPodStats))
 	for _, s := range sandboxIDToPodStats {
 		result = append(result, *s)
 	}
 	return result, nil
 }
 
+func (p *criStatsProvider) getPodAndContainerMaps() (map[string]*runtimeapi.Container, map[string]*runtimeapi.PodSandbox, error) {
+	containers, err := p.runtimeService.ListContainers(&runtimeapi.ContainerFilter{})
+	if err != nil {
+		return nil, nil, fmt.Errorf("failed to list all containers: %v", err)
+	}
+
+	// Creates pod sandbox map between the pod sandbox ID and the PodSandbox object.
+	podSandboxMap := make(map[string]*runtimeapi.PodSandbox)
+	podSandboxes, err := p.runtimeService.ListPodSandbox(&runtimeapi.PodSandboxFilter{})
+	if err != nil {
+		return nil, nil, fmt.Errorf("failed to list all pod sandboxes: %v", err)
+	}
+	podSandboxes = removeTerminatedPods(podSandboxes)
+	for _, s := range podSandboxes {
+		podSandboxMap[s.Id] = s
+	}
+
+	containers = removeTerminatedContainers(containers)
+	// Creates container map between the container ID and the Container object.
+	containerMap := make(map[string]*runtimeapi.Container)
+	for _, c := range containers {
+		containerMap[c.Id] = c
+	}
+	return containerMap, podSandboxMap, nil
+}
+
 // ImageFsStats returns the stats of the image filesystem.
 func (p *criStatsProvider) ImageFsStats() (*statsapi.FsStats, error) {
 	resp, err := p.imageService.ImageFsInfo()
@@ -628,12 +705,18 @@ func (p *criStatsProvider) makeContainerCPUAndMemoryStats(
 	return result
 }
 
-// getContainerUsageNanoCores gets the cached usageNanoCores.
+// getContainerUsageNanoCores first attempts to get the usage nano cores from the stats reported
+// by the CRI. If it is unable to, it gets the information from the cache instead.
func (p *criStatsProvider) getContainerUsageNanoCores(stats *runtimeapi.ContainerStats) *uint64 {
 	if stats == nil || stats.Attributes == nil {
 		return nil
 	}
 
+	// Bypass the cache if the CRI implementation specified the UsageNanoCores.
+	if stats.Cpu != nil && stats.Cpu.UsageNanoCores != nil {
+		return &stats.Cpu.UsageNanoCores.Value
+	}
+
 	p.mutex.RLock()
 	defer p.mutex.RUnlock()
 
@@ -646,11 +729,19 @@ func (p *criStatsProvider) getContainerUsageNanoCores(stats *runtimeapi.Containe
 	return &latestUsage
 }
 
-// getContainerUsageNanoCores computes usageNanoCores based on the given and
-// the cached usageCoreNanoSeconds, updates the cache with the computed
-// usageNanoCores, and returns the usageNanoCores.
+// getAndUpdateContainerUsageNanoCores first attempts to get the usage nano cores from the stats reported
+// by the CRI. If it is unable to, it computes usageNanoCores based on the given and the cached usageCoreNanoSeconds,
+// updates the cache with the computed usageNanoCores, and returns the usageNanoCores.
 func (p *criStatsProvider) getAndUpdateContainerUsageNanoCores(stats *runtimeapi.ContainerStats) *uint64 {
-	if stats == nil || stats.Attributes == nil || stats.Cpu == nil || stats.Cpu.UsageCoreNanoSeconds == nil {
+	if stats == nil || stats.Attributes == nil || stats.Cpu == nil {
+		return nil
+	}
+	// Bypass the cache if the CRI implementation specified the UsageNanoCores.
+	if stats.Cpu.UsageNanoCores != nil {
+		return &stats.Cpu.UsageNanoCores.Value
+	}
+	// If there is neither UsageNanoCores nor UsageCoreNanoSeconds, there is no information to use.
+	if stats.Cpu.UsageCoreNanoSeconds == nil {
 		return nil
 	}
 	id := stats.Attributes.Id
@@ -847,3 +938,73 @@ func extractIDFromCgroupPath(cgroupPath string) string {
 	}
 	return id
 }
+
+func addCRIPodNetworkStats(ps *statsapi.PodStats, criPodStat *runtimeapi.PodSandboxStats) {
+	if criPodStat == nil || criPodStat.Linux == nil || criPodStat.Linux.Network == nil {
+		return
+	}
+	criNetwork := criPodStat.Linux.Network
+	iStats := statsapi.NetworkStats{
+		Time:           metav1.NewTime(time.Unix(0, criNetwork.Timestamp)),
+		InterfaceStats: criInterfaceToSummary(criNetwork.DefaultInterface),
+		Interfaces:     make([]statsapi.InterfaceStats, 0, len(criNetwork.Interfaces)),
+	}
+	for _, iface := range criNetwork.Interfaces {
+		iStats.Interfaces = append(iStats.Interfaces, criInterfaceToSummary(iface))
+	}
+	ps.Network = &iStats
+}
+
+func criInterfaceToSummary(criIface *runtimeapi.NetworkInterfaceUsage) statsapi.InterfaceStats {
+	return statsapi.InterfaceStats{
+		Name:     criIface.Name,
+		RxBytes:  valueOfUInt64Value(criIface.RxBytes),
+		RxErrors: valueOfUInt64Value(criIface.RxErrors),
+		TxBytes:  valueOfUInt64Value(criIface.TxBytes),
+		TxErrors: valueOfUInt64Value(criIface.TxErrors),
+	}
+}
+
+func addCRIPodCPUStats(ps *statsapi.PodStats, criPodStat *runtimeapi.PodSandboxStats) {
+	if criPodStat == nil || criPodStat.Linux == nil || criPodStat.Linux.Cpu == nil {
+		return
+	}
+	criCPU := criPodStat.Linux.Cpu
+	ps.CPU = &statsapi.CPUStats{
+		Time:                 metav1.NewTime(time.Unix(0, criCPU.Timestamp)),
+		UsageNanoCores:       valueOfUInt64Value(criCPU.UsageNanoCores),
+		UsageCoreNanoSeconds: valueOfUInt64Value(criCPU.UsageCoreNanoSeconds),
+	}
+}
+
+func addCRIPodMemoryStats(ps *statsapi.PodStats, criPodStat *runtimeapi.PodSandboxStats) {
+	if criPodStat == nil || criPodStat.Linux == nil || criPodStat.Linux.Memory == nil {
+		return
+	}
+	criMemory := criPodStat.Linux.Memory
+	ps.Memory = &statsapi.MemoryStats{
+		Time:            metav1.NewTime(time.Unix(0, criMemory.Timestamp)),
+		AvailableBytes:  valueOfUInt64Value(criMemory.AvailableBytes),
+		UsageBytes:      valueOfUInt64Value(criMemory.UsageBytes),
+		WorkingSetBytes: valueOfUInt64Value(criMemory.WorkingSetBytes),
+		RSSBytes:        valueOfUInt64Value(criMemory.RssBytes),
+		PageFaults:      valueOfUInt64Value(criMemory.PageFaults),
+		MajorPageFaults: valueOfUInt64Value(criMemory.MajorPageFaults),
+	}
+}
+
+func addCRIPodProcessStats(ps *statsapi.PodStats, criPodStat *runtimeapi.PodSandboxStats) {
+	if criPodStat == nil || criPodStat.Linux == nil || criPodStat.Linux.Process == nil {
+		return
+	}
+	ps.ProcessStats = &statsapi.ProcessStats{
+		ProcessCount: valueOfUInt64Value(criPodStat.Linux.Process.ProcessCount),
+	}
+}
+
+func valueOfUInt64Value(value *runtimeapi.UInt64Value) *uint64 {
+	if value == nil {
+		return nil
+	}
+	return &value.Value
+}
diff --git a/pkg/kubelet/stats/cri_stats_provider_test.go b/pkg/kubelet/stats/cri_stats_provider_test.go
index a5857c1a0a9..52a3ea069a0 100644
--- a/pkg/kubelet/stats/cri_stats_provider_test.go
+++ b/pkg/kubelet/stats/cri_stats_provider_test.go
@@ -236,6 +236,7 @@ func TestCRIListPodStats(t *testing.T) {
 		fakeImageService,
 		NewFakeHostStatsProviderWithData(fakeStats, fakeOS),
 		false,
+		false,
 	)
 
 	stats, err := provider.ListPodStats()
@@ -396,6 +397,7 @@ func TestAcceleratorUsageStatsCanBeDisabled(t *testing.T) {
 		fakeImageService,
 		NewFakeHostStatsProvider(),
 		true, // this is what the test is actually testing
+		false,
 	)
 
 	stats, err := provider.ListPodStats()
@@ -541,6 +543,7 @@ func TestCRIListPodCPUAndMemoryStats(t *testing.T) {
 		nil,
 		NewFakeHostStatsProvider(),
 		false,
+		false,
 	)
 
 	stats, err := provider.ListPodCPUAndMemoryStats()
@@ -671,6 +674,7 @@ func TestCRIImagesFsStats(t *testing.T) {
 		fakeImageService,
 		NewFakeHostStatsProvider(),
 		false,
+		false,
 	)
 
 	stats, err := provider.ImageFsStats()
diff --git a/pkg/kubelet/stats/provider.go b/pkg/kubelet/stats/provider.go
index fd3c5dd8248..7c06a550949 100644
--- a/pkg/kubelet/stats/provider.go
+++ b/pkg/kubelet/stats/provider.go
@@ -42,10 +42,10 @@ func NewCRIStatsProvider(
 	runtimeService internalapi.RuntimeService,
 	imageService internalapi.ImageManagerService,
 	hostStatsProvider HostStatsProvider,
-	disableAcceleratorUsageMetrics bool,
+	disableAcceleratorUsageMetrics, podAndContainerStatsFromCRI bool,
 ) *Provider {
 	return newStatsProvider(cadvisor, podManager, runtimeCache, newCRIStatsProvider(cadvisor, resourceAnalyzer,
-		runtimeService, imageService, hostStatsProvider, disableAcceleratorUsageMetrics))
+		runtimeService, imageService, hostStatsProvider, disableAcceleratorUsageMetrics, podAndContainerStatsFromCRI))
 }
 
 // NewCadvisorStatsProvider returns a containerStatsProvider that provides both
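A note on the fallback used in `listPodStats` and `ListPodCPUAndMemoryStats` above: the provider treats a gRPC `Unimplemented` status as "the runtime has not implemented ListPodSandboxStats yet" and falls back to the cAdvisor-backed path, while any other error is surfaced to the caller. The following is a minimal standalone sketch of that pattern; the two lister functions are hypothetical stand-ins, not kubelet code.

```go
package main

import (
	"fmt"

	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/status"
)

// listWithFallback prefers the CRI-backed lister and only falls back to the
// legacy (cAdvisor-backed) lister when the runtime reports codes.Unimplemented.
// Every other error is returned to the caller unchanged.
func listWithFallback(fromCRI, fromCadvisor func() ([]string, error)) ([]string, error) {
	stats, err := fromCRI()
	if err == nil {
		return stats, nil
	}
	s, ok := status.FromError(err)
	if !ok || s.Code() != codes.Unimplemented {
		// A legitimate failure rather than a missing RPC; do not mask it.
		return nil, err
	}
	// The runtime does not implement the RPC yet; use the legacy path.
	return fromCadvisor()
}

func main() {
	stats, err := listWithFallback(
		func() ([]string, error) { return nil, status.Error(codes.Unimplemented, "ListPodSandboxStats") },
		func() ([]string, error) { return []string{"pod-a", "pod-b"}, nil },
	)
	fmt.Println(stats, err) // [pod-a pod-b] <nil>
}
```

Keying the fallback off `codes.Unimplemented` keeps a genuine runtime failure from being silently papered over with cAdvisor data, which is also why the non-Unimplemented branch returns the error instead of logging it.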
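When the runtime does not report `UsageNanoCores` directly, `getAndUpdateContainerUsageNanoCores` falls back to deriving it from two cumulative `UsageCoreNanoSeconds` samples held in `cpuUsageCache`: roughly `(delta usageCoreNanoSeconds / delta time) * 1e9` nano-cores. Below is a self-contained sketch of that calculation under those assumptions, with illustrative names rather than the kubelet's actual cache types.

```go
package main

import (
	"fmt"
	"time"
)

// cpuSample is one cumulative CPU reading as a CRI runtime would report it:
// a timestamp in nanoseconds and the total CPU time consumed so far.
type cpuSample struct {
	timestampNs          int64
	usageCoreNanoSeconds uint64
}

// usageNanoCores derives an instantaneous usage rate (in nano-cores) from two
// cumulative samples: (delta usage / delta time) scaled to nano-cores.
func usageNanoCores(prev, cur cpuSample) uint64 {
	if cur.timestampNs <= prev.timestampNs || cur.usageCoreNanoSeconds < prev.usageCoreNanoSeconds {
		// Reordered samples or a counter reset; nothing sensible to report.
		return 0
	}
	deltaUsage := float64(cur.usageCoreNanoSeconds - prev.usageCoreNanoSeconds)
	deltaTime := float64(cur.timestampNs - prev.timestampNs)
	return uint64(deltaUsage / deltaTime * float64(time.Second/time.Nanosecond))
}

func main() {
	prev := cpuSample{timestampNs: 0, usageCoreNanoSeconds: 0}
	// Ten seconds later the container has consumed five seconds of CPU time.
	cur := cpuSample{
		timestampNs:          int64(10 * time.Second),
		usageCoreNanoSeconds: uint64(5 * time.Second),
	}
	fmt.Println(usageNanoCores(prev, cur)) // 500000000, i.e. half a core
}
```

This is also why the new CRI-first branches matter: a runtime that fills in `UsageNanoCores` itself lets the kubelet skip the cache and the delta arithmetic entirely.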