Extend CRI stats provider to support PSI

This commit is contained in:
Haowei Cai 2025-03-20 19:28:45 +00:00
parent 77118d4ca0
commit 6bbaf8cb10
4 changed files with 103 additions and 2 deletions

View File

@ -33,11 +33,13 @@ import (
"google.golang.org/grpc/status" "google.golang.org/grpc/status"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/types"
utilfeature "k8s.io/apiserver/pkg/util/feature"
internalapi "k8s.io/cri-api/pkg/apis" internalapi "k8s.io/cri-api/pkg/apis"
runtimeapi "k8s.io/cri-api/pkg/apis/runtime/v1" runtimeapi "k8s.io/cri-api/pkg/apis/runtime/v1"
"k8s.io/klog/v2" "k8s.io/klog/v2"
statsapi "k8s.io/kubelet/pkg/apis/stats/v1alpha1" statsapi "k8s.io/kubelet/pkg/apis/stats/v1alpha1"
kubetypes "k8s.io/kubelet/pkg/types" kubetypes "k8s.io/kubelet/pkg/types"
"k8s.io/kubernetes/pkg/features"
"k8s.io/kubernetes/pkg/kubelet/cadvisor" "k8s.io/kubernetes/pkg/kubelet/cadvisor"
"k8s.io/kubernetes/pkg/kubelet/server/stats" "k8s.io/kubernetes/pkg/kubelet/server/stats"
"k8s.io/utils/clock" "k8s.io/utils/clock"
@ -211,6 +213,7 @@ func (p *criStatsProvider) listPodStatsPartiallyFromCRI(ctx context.Context, upd
p.addPodNetworkStats(ps, podSandboxID, caInfos, cs, containerNetworkStats[podSandboxID]) p.addPodNetworkStats(ps, podSandboxID, caInfos, cs, containerNetworkStats[podSandboxID])
p.addPodCPUMemoryStats(ps, types.UID(podSandbox.Metadata.Uid), allInfos, cs) p.addPodCPUMemoryStats(ps, types.UID(podSandbox.Metadata.Uid), allInfos, cs)
p.addSwapStats(ps, types.UID(podSandbox.Metadata.Uid), allInfos, cs) p.addSwapStats(ps, types.UID(podSandbox.Metadata.Uid), allInfos, cs)
p.addIOStats(ps, types.UID(podSandbox.Metadata.Uid), allInfos, cs)
// If cadvisor stats is available for the container, use it to populate // If cadvisor stats is available for the container, use it to populate
// container stats // container stats
@ -260,6 +263,7 @@ func (p *criStatsProvider) listPodStatsStrictlyFromCRI(ctx context.Context, upda
addCRIPodCPUStats(ps, criSandboxStat) addCRIPodCPUStats(ps, criSandboxStat)
addCRIPodMemoryStats(ps, criSandboxStat) addCRIPodMemoryStats(ps, criSandboxStat)
addCRIPodProcessStats(ps, criSandboxStat) addCRIPodProcessStats(ps, criSandboxStat)
addCRIPodIOStats(ps, criSandboxStat)
makePodStorageStats(ps, rootFsInfo, p.resourceAnalyzer, p.hostStatsProvider, true) makePodStorageStats(ps, rootFsInfo, p.resourceAnalyzer, p.hostStatsProvider, true)
summarySandboxStats = append(summarySandboxStats, *ps) summarySandboxStats = append(summarySandboxStats, *ps)
} }
@ -535,6 +539,7 @@ func (p *criStatsProvider) addPodCPUMemoryStats(
usageNanoCores := getUint64Value(cs.CPU.UsageNanoCores) + getUint64Value(ps.CPU.UsageNanoCores) usageNanoCores := getUint64Value(cs.CPU.UsageNanoCores) + getUint64Value(ps.CPU.UsageNanoCores)
ps.CPU.UsageCoreNanoSeconds = &usageCoreNanoSeconds ps.CPU.UsageCoreNanoSeconds = &usageCoreNanoSeconds
ps.CPU.UsageNanoCores = &usageNanoCores ps.CPU.UsageNanoCores = &usageNanoCores
// Pod level PSI stats cannot be calculated from container level
} }
if cs.Memory != nil { if cs.Memory != nil {
@ -555,6 +560,7 @@ func (p *criStatsProvider) addPodCPUMemoryStats(
ps.Memory.RSSBytes = &rSSBytes ps.Memory.RSSBytes = &rSSBytes
ps.Memory.PageFaults = &pageFaults ps.Memory.PageFaults = &pageFaults
ps.Memory.MajorPageFaults = &majorPageFaults ps.Memory.MajorPageFaults = &majorPageFaults
// Pod level PSI stats cannot be calculated from container level
} }
} }
@ -564,14 +570,14 @@ func (p *criStatsProvider) addSwapStats(
allInfos map[string]cadvisorapiv2.ContainerInfo, allInfos map[string]cadvisorapiv2.ContainerInfo,
cs *statsapi.ContainerStats, cs *statsapi.ContainerStats,
) { ) {
// try get cpu and memory stats from cadvisor first. // try get swap stats from cadvisor first.
podCgroupInfo := getCadvisorPodInfoFromPodUID(podUID, allInfos) podCgroupInfo := getCadvisorPodInfoFromPodUID(podUID, allInfos)
if podCgroupInfo != nil { if podCgroupInfo != nil {
ps.Swap = cadvisorInfoToSwapStats(podCgroupInfo) ps.Swap = cadvisorInfoToSwapStats(podCgroupInfo)
return return
} }
// Sum Pod cpu and memory stats from containers stats. // Sum Pod swap stats from containers stats.
if cs.Swap != nil { if cs.Swap != nil {
if ps.Swap == nil { if ps.Swap == nil {
ps.Swap = &statsapi.SwapStats{Time: cs.Swap.Time} ps.Swap = &statsapi.SwapStats{Time: cs.Swap.Time}
@ -583,6 +589,30 @@ func (p *criStatsProvider) addSwapStats(
} }
} }
// addIOStats populates ps.IO for the pod identified by podUID. It does
// nothing unless the KubeletPSI feature gate is enabled.
//
// The cadvisor pod entry is preferred: when getCadvisorPodInfoFromPodUID
// finds one in allInfos, ps.IO is set to the result of cadvisorInfoToIOStats
// and the container stats are not consulted. Otherwise, if the container
// reports IO stats and the pod has none yet, ps.IO is seeded with only the
// container's timestamp.
func (p *criStatsProvider) addIOStats(
	ps *statsapi.PodStats,
	podUID types.UID,
	allInfos map[string]cadvisorapiv2.ContainerInfo,
	cs *statsapi.ContainerStats,
) {
	if !utilfeature.DefaultFeatureGate.Enabled(features.KubeletPSI) {
		return
	}

	// First choice: IO stats taken from the pod's own cadvisor entry.
	if cgroupInfo := getCadvisorPodInfoFromPodUID(podUID, allInfos); cgroupInfo != nil {
		ps.IO = cadvisorInfoToIOStats(cgroupInfo)
		return
	}

	// Fallback: nothing to do when the container has no IO stats or the pod
	// entry already exists.
	if cs.IO == nil || ps.IO != nil {
		return
	}

	// Only the timestamp is carried over; pod level PSI stats cannot be
	// calculated from container level values.
	ps.IO = &statsapi.IOStats{Time: cs.IO.Time}
}
func (p *criStatsProvider) addProcessStats( func (p *criStatsProvider) addProcessStats(
ps *statsapi.PodStats, ps *statsapi.PodStats,
container *cadvisorapiv2.ContainerInfo, container *cadvisorapiv2.ContainerInfo,
@ -624,6 +654,7 @@ func (p *criStatsProvider) makeContainerStats(
if usageNanoCores != nil { if usageNanoCores != nil {
result.CPU.UsageNanoCores = usageNanoCores result.CPU.UsageNanoCores = usageNanoCores
} }
result.CPU.PSI = makePSIStats(stats.Cpu.Psi)
} else { } else {
result.CPU.Time = metav1.NewTime(time.Unix(0, time.Now().UnixNano())) result.CPU.Time = metav1.NewTime(time.Unix(0, time.Now().UnixNano()))
result.CPU.UsageCoreNanoSeconds = uint64Ptr(0) result.CPU.UsageCoreNanoSeconds = uint64Ptr(0)
@ -634,6 +665,7 @@ func (p *criStatsProvider) makeContainerStats(
if stats.Memory.WorkingSetBytes != nil { if stats.Memory.WorkingSetBytes != nil {
result.Memory.WorkingSetBytes = &stats.Memory.WorkingSetBytes.Value result.Memory.WorkingSetBytes = &stats.Memory.WorkingSetBytes.Value
} }
result.Memory.PSI = makePSIStats(stats.Memory.Psi)
} else { } else {
result.Memory.Time = metav1.NewTime(time.Unix(0, time.Now().UnixNano())) result.Memory.Time = metav1.NewTime(time.Unix(0, time.Now().UnixNano()))
result.Memory.WorkingSetBytes = uint64Ptr(0) result.Memory.WorkingSetBytes = uint64Ptr(0)
@ -651,6 +683,15 @@ func (p *criStatsProvider) makeContainerStats(
result.Swap.SwapUsageBytes = uint64Ptr(0) result.Swap.SwapUsageBytes = uint64Ptr(0)
result.Swap.SwapAvailableBytes = uint64Ptr(0) result.Swap.SwapAvailableBytes = uint64Ptr(0)
} }
if utilfeature.DefaultFeatureGate.Enabled(features.KubeletPSI) {
result.IO = &statsapi.IOStats{}
if stats.Io != nil {
result.IO.Time = metav1.NewTime(time.Unix(0, stats.Io.Timestamp))
result.IO.PSI = makePSIStats(stats.Io.Psi)
} else {
result.IO.Time = metav1.NewTime(time.Unix(0, time.Now().UnixNano()))
}
}
if stats.WritableLayer != nil { if stats.WritableLayer != nil {
result.Rootfs.Time = metav1.NewTime(time.Unix(0, stats.WritableLayer.Timestamp)) result.Rootfs.Time = metav1.NewTime(time.Unix(0, stats.WritableLayer.Timestamp))
if stats.WritableLayer.UsedBytes != nil { if stats.WritableLayer.UsedBytes != nil {
@ -714,6 +755,7 @@ func (p *criStatsProvider) makeContainerCPUAndMemoryStats(
if usageNanoCores != nil { if usageNanoCores != nil {
result.CPU.UsageNanoCores = usageNanoCores result.CPU.UsageNanoCores = usageNanoCores
} }
result.CPU.PSI = makePSIStats(stats.Cpu.Psi)
} else { } else {
result.CPU.Time = metav1.NewTime(time.Unix(0, time.Now().UnixNano())) result.CPU.Time = metav1.NewTime(time.Unix(0, time.Now().UnixNano()))
result.CPU.UsageCoreNanoSeconds = uint64Ptr(0) result.CPU.UsageCoreNanoSeconds = uint64Ptr(0)
@ -724,6 +766,7 @@ func (p *criStatsProvider) makeContainerCPUAndMemoryStats(
if stats.Memory.WorkingSetBytes != nil { if stats.Memory.WorkingSetBytes != nil {
result.Memory.WorkingSetBytes = &stats.Memory.WorkingSetBytes.Value result.Memory.WorkingSetBytes = &stats.Memory.WorkingSetBytes.Value
} }
result.Memory.PSI = makePSIStats(stats.Memory.Psi)
} else { } else {
result.Memory.Time = metav1.NewTime(time.Unix(0, time.Now().UnixNano())) result.Memory.Time = metav1.NewTime(time.Unix(0, time.Now().UnixNano()))
result.Memory.WorkingSetBytes = uint64Ptr(0) result.Memory.WorkingSetBytes = uint64Ptr(0)
@ -732,6 +775,33 @@ func (p *criStatsProvider) makeContainerCPUAndMemoryStats(
return result return result
} }
// makePSIStats converts PSI stats reported through the CRI into the kubelet
// stats API representation.
//
// It returns nil when the KubeletPSI feature gate is disabled or when stats is
// nil. Otherwise it returns a non-nil *statsapi.PSIStats in which Full and
// Some are copied field by field (Total, Avg10, Avg60, Avg300) from whichever
// of stats.Full / stats.Some is non-nil; a missing section is left at its
// zero value.
func makePSIStats(stats *runtimeapi.PsiStats) *statsapi.PSIStats {
	if !utilfeature.DefaultFeatureGate.Enabled(features.KubeletPSI) || stats == nil {
		return nil
	}

	var psi statsapi.PSIStats

	if full := stats.Full; full != nil {
		psi.Full.Total = full.Total
		psi.Full.Avg10 = full.Avg10
		psi.Full.Avg60 = full.Avg60
		psi.Full.Avg300 = full.Avg300
	}

	if some := stats.Some; some != nil {
		psi.Some.Total = some.Total
		psi.Some.Avg10 = some.Avg10
		psi.Some.Avg60 = some.Avg60
		psi.Some.Avg300 = some.Avg300
	}

	return &psi
}
// getContainerUsageNanoCores first attempts to get the usage nano cores from the stats reported // getContainerUsageNanoCores first attempts to get the usage nano cores from the stats reported
// by the CRI. If it is unable to, it gets the information from the cache instead. // by the CRI. If it is unable to, it gets the information from the cache instead.
func (p *criStatsProvider) getContainerUsageNanoCores(stats *runtimeapi.ContainerStats) *uint64 { func (p *criStatsProvider) getContainerUsageNanoCores(stats *runtimeapi.ContainerStats) *uint64 {
@ -910,6 +980,13 @@ func (p *criStatsProvider) addCadvisorContainerStats(
if swap != nil { if swap != nil {
cs.Swap = swap cs.Swap = swap
} }
if utilfeature.DefaultFeatureGate.Enabled(features.KubeletPSI) {
io := cadvisorInfoToIOStats(caPodStats)
if io != nil {
cs.IO = io
}
}
} }
func (p *criStatsProvider) addCadvisorContainerCPUAndMemoryStats( func (p *criStatsProvider) addCadvisorContainerCPUAndMemoryStats(

View File

@ -25,8 +25,10 @@ import (
cadvisorapiv2 "github.com/google/cadvisor/info/v2" cadvisorapiv2 "github.com/google/cadvisor/info/v2"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
utilfeature "k8s.io/apiserver/pkg/util/feature"
runtimeapi "k8s.io/cri-api/pkg/apis/runtime/v1" runtimeapi "k8s.io/cri-api/pkg/apis/runtime/v1"
statsapi "k8s.io/kubelet/pkg/apis/stats/v1alpha1" statsapi "k8s.io/kubelet/pkg/apis/stats/v1alpha1"
"k8s.io/kubernetes/pkg/features"
) )
func (p *criStatsProvider) addCRIPodContainerStats(criSandboxStat *runtimeapi.PodSandboxStats, func (p *criStatsProvider) addCRIPodContainerStats(criSandboxStat *runtimeapi.PodSandboxStats,
@ -79,6 +81,7 @@ func addCRIPodMemoryStats(ps *statsapi.PodStats, criPodStat *runtimeapi.PodSandb
RSSBytes: valueOfUInt64Value(criMemory.RssBytes), RSSBytes: valueOfUInt64Value(criMemory.RssBytes),
PageFaults: valueOfUInt64Value(criMemory.PageFaults), PageFaults: valueOfUInt64Value(criMemory.PageFaults),
MajorPageFaults: valueOfUInt64Value(criMemory.MajorPageFaults), MajorPageFaults: valueOfUInt64Value(criMemory.MajorPageFaults),
PSI: makePSIStats(criMemory.Psi),
} }
} }
@ -91,6 +94,21 @@ func addCRIPodCPUStats(ps *statsapi.PodStats, criPodStat *runtimeapi.PodSandboxS
Time: metav1.NewTime(time.Unix(0, criCPU.Timestamp)), Time: metav1.NewTime(time.Unix(0, criCPU.Timestamp)),
UsageNanoCores: valueOfUInt64Value(criCPU.UsageNanoCores), UsageNanoCores: valueOfUInt64Value(criCPU.UsageNanoCores),
UsageCoreNanoSeconds: valueOfUInt64Value(criCPU.UsageCoreNanoSeconds), UsageCoreNanoSeconds: valueOfUInt64Value(criCPU.UsageCoreNanoSeconds),
PSI: makePSIStats(criCPU.Psi),
}
}
func addCRIPodIOStats(ps *statsapi.PodStats, criPodStat *runtimeapi.PodSandboxStats) {
if !utilfeature.DefaultFeatureGate.Enabled(features.KubeletPSI) {
return
}
if criPodStat == nil || criPodStat.Linux == nil || criPodStat.Linux.Io == nil {
return
}
criIO := criPodStat.Linux.Io
ps.IO = &statsapi.IOStats{
Time: metav1.NewTime(time.Unix(0, criIO.Timestamp)),
PSI: makePSIStats(criIO.Psi),
} }
} }

View File

@ -50,3 +50,6 @@ func addCRIPodCPUStats(ps *statsapi.PodStats, criPodStat *runtimeapi.PodSandboxS
func addCRIPodProcessStats(ps *statsapi.PodStats, criPodStat *runtimeapi.PodSandboxStats) { func addCRIPodProcessStats(ps *statsapi.PodStats, criPodStat *runtimeapi.PodSandboxStats) {
} }
// addCRIPodIOStats is a no-op in this file: the body is empty, so ps is
// returned to the caller unmodified and criPodStat is never read.
// NOTE(review): this looks like the stub for platforms without CRI pod IO
// stats — confirm against the file's build constraints.
func addCRIPodIOStats(ps *statsapi.PodStats, criPodStat *runtimeapi.PodSandboxStats) {
}

View File

@ -236,6 +236,9 @@ func addCRIPodMemoryStats(ps *statsapi.PodStats, criPodStat *runtimeapi.PodSandb
} }
} }
// addCRIPodIOStats is a no-op in this file: the body is empty, so ps is
// returned to the caller unmodified and criPodStat is never read.
// NOTE(review): neighbouring helpers read criPodStat.Windows, so this is
// presumably the Windows variant, where no IO stats are populated — confirm.
func addCRIPodIOStats(ps *statsapi.PodStats, criPodStat *runtimeapi.PodSandboxStats) {
}
func addCRIPodProcessStats(ps *statsapi.PodStats, criPodStat *runtimeapi.PodSandboxStats) { func addCRIPodProcessStats(ps *statsapi.PodStats, criPodStat *runtimeapi.PodSandboxStats) {
if criPodStat == nil || criPodStat.Windows == nil || criPodStat.Windows.Process == nil { if criPodStat == nil || criPodStat.Windows == nil || criPodStat.Windows.Process == nil {
return return