New histogram: Pod start SLI duration

Artur Żyliński 2022-07-15 14:39:48 +02:00
parent 5539a5b80f
commit 9f31669a53
19 changed files with 570 additions and 54 deletions

View File

@ -99,6 +99,7 @@ import (
kubeletmetrics "k8s.io/kubernetes/pkg/kubelet/metrics"
"k8s.io/kubernetes/pkg/kubelet/server"
"k8s.io/kubernetes/pkg/kubelet/stats/pidlimit"
kubeletutil "k8s.io/kubernetes/pkg/kubelet/util"
utilfs "k8s.io/kubernetes/pkg/util/filesystem"
"k8s.io/kubernetes/pkg/util/flock"
nodeutil "k8s.io/kubernetes/pkg/util/node"
@ -759,6 +760,10 @@ func run(ctx context.Context, s *options.KubeletServer, kubeDeps *kubelet.Depend
}
}
if kubeDeps.PodStartupLatencyTracker == nil {
kubeDeps.PodStartupLatencyTracker = kubeletutil.NewPodStartupLatencyTracker()
}
// TODO(vmarmol): Do this through container config.
oomAdjuster := kubeDeps.OOMAdjuster
if err := oomAdjuster.ApplyOOMScoreAdj(0, int(s.OOMScoreAdj)); err != nil {

View File

@ -21,6 +21,7 @@ import (
"fmt"
"reflect"
"sync"
"time"
"k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/types"
@ -30,6 +31,7 @@ import (
kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
"k8s.io/kubernetes/pkg/kubelet/events"
kubetypes "k8s.io/kubernetes/pkg/kubelet/types"
"k8s.io/kubernetes/pkg/kubelet/util"
"k8s.io/kubernetes/pkg/kubelet/util/format"
"k8s.io/kubernetes/pkg/util/config"
)
@ -68,9 +70,9 @@ type PodConfig struct {
// NewPodConfig creates an object that can merge many configuration sources into a stream
// of normalized updates to a pod configuration.
func NewPodConfig(mode PodConfigNotificationMode, recorder record.EventRecorder) *PodConfig {
func NewPodConfig(mode PodConfigNotificationMode, recorder record.EventRecorder, podStartupLatencyTracker *util.PodStartupLatencyTracker) *PodConfig {
updates := make(chan kubetypes.PodUpdate, 50)
storage := newPodStorage(updates, mode, recorder)
storage := newPodStorage(updates, mode, recorder, podStartupLatencyTracker)
podConfig := &PodConfig{
pods: storage,
mux: config.NewMux(storage),
@ -132,18 +134,21 @@ type podStorage struct {
// the EventRecorder to use
recorder record.EventRecorder
podStartupLatencyTracker *util.PodStartupLatencyTracker
}
// TODO: PodConfigNotificationMode could be handled by a listener to the updates channel
// in the future, especially with multiple listeners.
// TODO: allow initialization of the current state of the store with snapshotted version.
func newPodStorage(updates chan<- kubetypes.PodUpdate, mode PodConfigNotificationMode, recorder record.EventRecorder) *podStorage {
func newPodStorage(updates chan<- kubetypes.PodUpdate, mode PodConfigNotificationMode, recorder record.EventRecorder, podStartupLatencyTracker *util.PodStartupLatencyTracker) *podStorage {
return &podStorage{
pods: make(map[string]map[types.UID]*v1.Pod),
mode: mode,
updates: updates,
sourcesSeen: sets.String{},
recorder: recorder,
pods: make(map[string]map[types.UID]*v1.Pod),
mode: mode,
updates: updates,
sourcesSeen: sets.String{},
recorder: recorder,
podStartupLatencyTracker: podStartupLatencyTracker,
}
}
@ -235,6 +240,10 @@ func (s *podStorage) merge(source string, change interface{}) (adds, updates, de
ref.Annotations = make(map[string]string)
}
ref.Annotations[kubetypes.ConfigSourceAnnotationKey] = source
// ignore static pods
if !kubetypes.IsStaticPod(ref) {
s.podStartupLatencyTracker.ObservedPodOnWatch(ref, time.Now())
}
if existing, found := oldPods[ref.UID]; found {
pods[ref.UID] = existing
needUpdate, needReconcile, needGracefulDelete := checkAndUpdatePod(existing, ref)

View File

@ -34,6 +34,7 @@ import (
"k8s.io/client-go/kubernetes/scheme"
"k8s.io/client-go/tools/record"
kubetypes "k8s.io/kubernetes/pkg/kubelet/types"
"k8s.io/kubernetes/pkg/kubelet/util"
"k8s.io/kubernetes/pkg/securitycontext"
)
@ -90,7 +91,7 @@ func CreatePodUpdate(op kubetypes.PodOperation, source string, pods ...*v1.Pod)
func createPodConfigTester(ctx context.Context, mode PodConfigNotificationMode) (chan<- interface{}, <-chan kubetypes.PodUpdate, *PodConfig) {
eventBroadcaster := record.NewBroadcaster()
config := NewPodConfig(mode, eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "kubelet"}))
config := NewPodConfig(mode, eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "kubelet"}), util.NewPodStartupLatencyTracker())
channel := config.Channel(ctx, TestSource)
ch := config.Updates()
return channel, ch, config
@ -461,7 +462,7 @@ func TestPodUpdateLabels(t *testing.T) {
func TestPodConfigRace(t *testing.T) {
eventBroadcaster := record.NewBroadcaster()
config := NewPodConfig(PodConfigNotificationIncremental, eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "kubelet"}))
config := NewPodConfig(PodConfigNotificationIncremental, eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "kubelet"}), util.NewPodStartupLatencyTracker())
seenSources := sets.NewString(TestSource)
var wg sync.WaitGroup
const iterations = 100

View File

@ -29,6 +29,7 @@ import (
runtimeapi "k8s.io/cri-api/pkg/apis/runtime/v1"
kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
"k8s.io/kubernetes/pkg/kubelet/events"
kubeletutil "k8s.io/kubernetes/pkg/kubelet/util"
)
// imageManager provides the functionalities for image pulling.
@ -38,12 +39,14 @@ type imageManager struct {
backOff *flowcontrol.Backoff
// It will check the presence of the image, and report the 'image pulling', 'image pulled' events correspondingly.
puller imagePuller
podStartupLatencyTracker *kubeletutil.PodStartupLatencyTracker
}
var _ ImageManager = &imageManager{}
// NewImageManager instantiates a new ImageManager object.
func NewImageManager(recorder record.EventRecorder, imageService kubecontainer.ImageService, imageBackOff *flowcontrol.Backoff, serialized bool, qps float32, burst int) ImageManager {
func NewImageManager(recorder record.EventRecorder, imageService kubecontainer.ImageService, imageBackOff *flowcontrol.Backoff, serialized bool, qps float32, burst int, podStartupLatencyTracker *kubeletutil.PodStartupLatencyTracker) ImageManager {
imageService = throttleImagePulling(imageService, qps, burst)
var puller imagePuller
@ -53,10 +56,11 @@ func NewImageManager(recorder record.EventRecorder, imageService kubecontainer.I
puller = newParallelImagePuller(imageService)
}
return &imageManager{
recorder: recorder,
imageService: imageService,
backOff: imageBackOff,
puller: puller,
recorder: recorder,
imageService: imageService,
backOff: imageBackOff,
puller: puller,
podStartupLatencyTracker: podStartupLatencyTracker,
}
}
@ -138,6 +142,7 @@ func (m *imageManager) EnsureImageExists(pod *v1.Pod, container *v1.Container, p
m.logIt(ref, v1.EventTypeNormal, events.BackOffPullImage, logPrefix, msg, klog.Info)
return "", msg, ErrImagePullBackOff
}
m.podStartupLatencyTracker.RecordImageStartedPulling(pod.UID)
m.logIt(ref, v1.EventTypeNormal, events.PullingImage, logPrefix, fmt.Sprintf("Pulling image %q", container.Image), klog.Info)
startTime := time.Now()
pullChan := make(chan pullResult)
@ -153,6 +158,7 @@ func (m *imageManager) EnsureImageExists(pod *v1.Pod, container *v1.Container, p
return "", imagePullResult.err.Error(), ErrImagePull
}
m.podStartupLatencyTracker.RecordImageFinishedPulling(pod.UID)
m.logIt(ref, v1.EventTypeNormal, events.PulledImage, logPrefix, fmt.Sprintf("Successfully pulled image %q in %v", container.Image, time.Since(startTime)), klog.Info)
m.backOff.GC()
return imagePullResult.imageRef, "", nil
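
RecordImageStartedPulling and RecordImageFinishedPulling bracket each pull above, but the tracker introduced later in this commit keeps only the earliest start and the latest finish per pod, so overlapping pulls for several containers collapse into one exclusion window. A small illustrative sketch of that bookkeeping (the type and method names here are assumptions, not the tracker's API):

package sketch

import "time"

// pullWindow collapses any number of possibly overlapping image pulls into a
// single [firstStarted, lastFinished] interval, the way the tracker does.
type pullWindow struct {
	firstStarted time.Time
	lastFinished time.Time
}

func (w *pullWindow) startedPulling(now time.Time) {
	if w.firstStarted.IsZero() {
		w.firstStarted = now // only the earliest start is kept
	}
}

func (w *pullWindow) finishedPulling(now time.Time) {
	w.lastFinished = now // every finish pushes the window's end forward
}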

View File

@ -28,6 +28,7 @@ import (
"k8s.io/client-go/util/flowcontrol"
. "k8s.io/kubernetes/pkg/kubelet/container"
ctest "k8s.io/kubernetes/pkg/kubelet/container/testing"
kubeletutil "k8s.io/kubernetes/pkg/kubelet/util"
testingclock "k8s.io/utils/clock/testing"
)
@ -176,7 +177,9 @@ func pullerTestEnv(c pullerTestCase, serialized bool) (puller ImageManager, fake
fakeRuntime.Err = c.pullerErr
fakeRuntime.InspectErr = c.inspectErr
puller = NewImageManager(fakeRecorder, fakeRuntime, backOff, serialized, c.qps, c.burst)
podStartupLatencyTracker := kubeletutil.NewPodStartupLatencyTracker()
puller = NewImageManager(fakeRecorder, fakeRuntime, backOff, serialized, c.qps, c.burst, podStartupLatencyTracker)
return
}

View File

@ -222,28 +222,29 @@ type Dependencies struct {
Options []Option
// Injected Dependencies
Auth server.AuthInterface
CAdvisorInterface cadvisor.Interface
Cloud cloudprovider.Interface
ContainerManager cm.ContainerManager
EventClient v1core.EventsGetter
HeartbeatClient clientset.Interface
OnHeartbeatFailure func()
KubeClient clientset.Interface
Mounter mount.Interface
HostUtil hostutil.HostUtils
OOMAdjuster *oom.OOMAdjuster
OSInterface kubecontainer.OSInterface
PodConfig *config.PodConfig
ProbeManager prober.Manager
Recorder record.EventRecorder
Subpather subpath.Interface
TracerProvider trace.TracerProvider
VolumePlugins []volume.VolumePlugin
DynamicPluginProber volume.DynamicPluginProber
TLSOptions *server.TLSOptions
RemoteRuntimeService internalapi.RuntimeService
RemoteImageService internalapi.ImageManagerService
Auth server.AuthInterface
CAdvisorInterface cadvisor.Interface
Cloud cloudprovider.Interface
ContainerManager cm.ContainerManager
EventClient v1core.EventsGetter
HeartbeatClient clientset.Interface
OnHeartbeatFailure func()
KubeClient clientset.Interface
Mounter mount.Interface
HostUtil hostutil.HostUtils
OOMAdjuster *oom.OOMAdjuster
OSInterface kubecontainer.OSInterface
PodConfig *config.PodConfig
ProbeManager prober.Manager
Recorder record.EventRecorder
Subpather subpath.Interface
TracerProvider trace.TracerProvider
VolumePlugins []volume.VolumePlugin
DynamicPluginProber volume.DynamicPluginProber
TLSOptions *server.TLSOptions
RemoteRuntimeService internalapi.RuntimeService
RemoteImageService internalapi.ImageManagerService
PodStartupLatencyTracker *util.PodStartupLatencyTracker
// remove it after cadvisor.UsingLegacyCadvisorStats dropped.
useLegacyCadvisorStats bool
}
@ -261,7 +262,7 @@ func makePodSourceConfig(kubeCfg *kubeletconfiginternal.KubeletConfiguration, ku
}
// source of all configuration
cfg := config.NewPodConfig(config.PodConfigNotificationIncremental, kubeDeps.Recorder)
cfg := config.NewPodConfig(config.PodConfigNotificationIncremental, kubeDeps.Recorder, kubeDeps.PodStartupLatencyTracker)
// TODO: it needs to be replaced by a proper context in the future
ctx := context.TODO()
@ -593,7 +594,7 @@ func NewMainKubelet(kubeCfg *kubeletconfiginternal.KubeletConfiguration,
mirrorPodClient := kubepod.NewBasicMirrorClient(klet.kubeClient, string(nodeName), nodeLister)
klet.podManager = kubepod.NewBasicPodManager(mirrorPodClient, secretManager, configMapManager)
klet.statusManager = status.NewManager(klet.kubeClient, klet.podManager, klet)
klet.statusManager = status.NewManager(klet.kubeClient, klet.podManager, klet, kubeDeps.PodStartupLatencyTracker)
klet.resourceAnalyzer = serverstats.NewResourceAnalyzer(klet, kubeCfg.VolumeStatsAggPeriod.Duration, kubeDeps.Recorder)
@ -657,6 +658,7 @@ func NewMainKubelet(kubeCfg *kubeletconfiginternal.KubeletConfiguration,
kubeCfg.MemorySwap.SwapBehavior,
kubeDeps.ContainerManager.GetNodeAllocatableAbsolute,
*kubeCfg.MemoryThrottlingFactor,
kubeDeps.PodStartupLatencyTracker,
)
if err != nil {
return nil, err
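
One thing the hunks above make explicit: the single *util.PodStartupLatencyTracker held in Dependencies is handed to three consumers, the pod config (watch observations), the status manager (first-Running detection and cleanup), and, through the runtime manager, the image manager (pull bracketing). A condensed wiring sketch follows; the constructor signatures match the diff, while the surrounding function and its parameters are illustrative assumptions.

package sketch

import (
	clientset "k8s.io/client-go/kubernetes"
	"k8s.io/client-go/tools/record"
	"k8s.io/client-go/util/flowcontrol"
	"k8s.io/kubernetes/pkg/kubelet/config"
	kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
	"k8s.io/kubernetes/pkg/kubelet/images"
	kubepod "k8s.io/kubernetes/pkg/kubelet/pod"
	"k8s.io/kubernetes/pkg/kubelet/status"
	kubeletutil "k8s.io/kubernetes/pkg/kubelet/util"
)

// wireTracker shows how one tracker instance is shared by the three consumers
// touched in this commit; every other dependency is assumed to be built elsewhere.
func wireTracker(
	recorder record.EventRecorder,
	kubeClient clientset.Interface,
	podManager kubepod.Manager,
	safety status.PodDeletionSafetyProvider,
	imageService kubecontainer.ImageService,
	backOff *flowcontrol.Backoff,
) {
	tracker := kubeletutil.NewPodStartupLatencyTracker()

	// Watch path: podStorage.merge reports every non-static pod it sees.
	_ = config.NewPodConfig(config.PodConfigNotificationIncremental, recorder, tracker)

	// Status path: marks when the pod first became Running and cleans up state.
	_ = status.NewManager(kubeClient, podManager, safety, tracker)

	// Image path: brackets image pulls so their duration can be excluded.
	_ = images.NewImageManager(recorder, imageService, backOff, false /*serialized*/, 0 /*qps*/, 0 /*burst*/, tracker)
}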

View File

@ -72,6 +72,7 @@ import (
statustest "k8s.io/kubernetes/pkg/kubelet/status/testing"
"k8s.io/kubernetes/pkg/kubelet/token"
kubetypes "k8s.io/kubernetes/pkg/kubelet/types"
kubeletutil "k8s.io/kubernetes/pkg/kubelet/util"
"k8s.io/kubernetes/pkg/kubelet/util/queue"
kubeletvolume "k8s.io/kubernetes/pkg/kubelet/volumemanager"
schedulerframework "k8s.io/kubernetes/pkg/scheduler/framework"
@ -245,7 +246,8 @@ func newTestKubeletWithImageList(
configMapManager := configmap.NewSimpleConfigMapManager(kubelet.kubeClient)
kubelet.configMapManager = configMapManager
kubelet.podManager = kubepod.NewBasicPodManager(fakeMirrorClient, kubelet.secretManager, kubelet.configMapManager)
kubelet.statusManager = status.NewManager(fakeKubeClient, kubelet.podManager, &statustest.FakePodDeletionSafetyProvider{})
podStartupLatencyTracker := kubeletutil.NewPodStartupLatencyTracker()
kubelet.statusManager = status.NewManager(fakeKubeClient, kubelet.podManager, &statustest.FakePodDeletionSafetyProvider{}, podStartupLatencyTracker)
kubelet.containerRuntime = fakeRuntime
kubelet.runtimeCache = containertest.NewFakeRuntimeCache(kubelet.containerRuntime)

View File

@ -36,6 +36,7 @@ import (
"k8s.io/kubernetes/pkg/kubelet/lifecycle"
"k8s.io/kubernetes/pkg/kubelet/logs"
proberesults "k8s.io/kubernetes/pkg/kubelet/prober/results"
"k8s.io/kubernetes/pkg/kubelet/util"
)
const (
@ -123,6 +124,7 @@ func newFakeKubeRuntimeManager(runtimeService internalapi.RuntimeService, imageS
false,
0, // Disable image pull throttling by setting QPS to 0,
0,
util.NewPodStartupLatencyTracker(),
)
kubeRuntimeManager.runner = lifecycle.NewHandlerRunner(
&fakeHTTP{},

View File

@ -54,6 +54,7 @@ import (
"k8s.io/kubernetes/pkg/kubelet/runtimeclass"
"k8s.io/kubernetes/pkg/kubelet/sysctl"
"k8s.io/kubernetes/pkg/kubelet/types"
"k8s.io/kubernetes/pkg/kubelet/util"
"k8s.io/kubernetes/pkg/kubelet/util/cache"
"k8s.io/kubernetes/pkg/kubelet/util/format"
sc "k8s.io/kubernetes/pkg/securitycontext"
@ -194,6 +195,7 @@ func NewKubeGenericRuntimeManager(
memorySwapBehavior string,
getNodeAllocatable func() v1.ResourceList,
memoryThrottlingFactor float64,
podStartupLatencyTracker *util.PodStartupLatencyTracker,
) (KubeGenericRuntime, error) {
runtimeService = newInstrumentedRuntimeService(runtimeService)
imageService = newInstrumentedImageManagerService(imageService)
@ -264,7 +266,8 @@ func NewKubeGenericRuntimeManager(
imageBackOff,
serializeImagePulls,
imagePullQPS,
imagePullBurst)
imagePullBurst,
podStartupLatencyTracker)
kubeRuntimeManager.runner = lifecycle.NewHandlerRunner(insecureContainerLifecycleHTTPClient, kubeRuntimeManager, kubeRuntimeManager, recorder)
kubeRuntimeManager.containerGC = newContainerGC(runtimeService, podStateProvider, kubeRuntimeManager)
kubeRuntimeManager.podStateProvider = podStateProvider

View File

@ -35,6 +35,7 @@ const (
NodeLabelKey = "node"
PodWorkerDurationKey = "pod_worker_duration_seconds"
PodStartDurationKey = "pod_start_duration_seconds"
PodStartSLIDurationKey = "pod_start_sli_duration_seconds"
CgroupManagerOperationsKey = "cgroup_manager_duration_seconds"
PodWorkerStartDurationKey = "pod_worker_start_duration_seconds"
PodStatusSyncDurationKey = "pod_status_sync_duration_seconds"
@ -136,6 +137,24 @@ var (
StabilityLevel: metrics.ALPHA,
},
)
// PodStartSLIDuration is a Histogram that tracks the duration (in seconds) it takes for a single pod to start running,
// excluding the time spent pulling images. This metric should reflect the "Pod startup latency SLI" definition
// ref: https://github.com/kubernetes/community/blob/master/sig-scalability/slos/pod_startup_latency.md
//
// The histogram bucket boundaries for pod startup latency metrics, measured in seconds. These are hand-picked
// so as to be roughly exponential but still round numbers in everyday units. This is to minimise the number
// of buckets while allowing accurate measurement of thresholds which might be used in SLOs
// e.g. x% of pods start up within 30 seconds, or 15 minutes, etc.
PodStartSLIDuration = metrics.NewHistogramVec(
&metrics.HistogramOpts{
Subsystem: KubeletSubsystem,
Name: PodStartSLIDurationKey,
Help: "Duration in seconds to start a pod, excluding time to pull images and run init containers, measured from pod creation timestamp to when all its containers are reported as started and observed via watch",
Buckets: []float64{0.5, 1, 2, 3, 4, 5, 6, 8, 10, 20, 30, 45, 60, 120, 180, 240, 300, 360, 480, 600, 900, 1200, 1800, 2700, 3600},
StabilityLevel: metrics.ALPHA,
},
[]string{},
)
// CgroupManagerDuration is a Histogram that tracks the duration (in seconds) it takes for cgroup manager operations to complete.
// Broken down by method.
CgroupManagerDuration = metrics.NewHistogramVec(
@ -517,6 +536,7 @@ func Register(collectors ...metrics.StableCollector) {
legacyregistry.MustRegister(NodeName)
legacyregistry.MustRegister(PodWorkerDuration)
legacyregistry.MustRegister(PodStartDuration)
legacyregistry.MustRegister(PodStartSLIDuration)
legacyregistry.MustRegister(CgroupManagerDuration)
legacyregistry.MustRegister(PodWorkerStartDuration)
legacyregistry.MustRegister(PodStatusSyncDuration)
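
The value observed into this histogram is not the raw creation-to-running time: the image-pulling window is subtracted first, which is what keeps the hand-picked buckets above meaningful for the SLI. A minimal sketch of the computation, an illustrative helper that mirrors what ObservedPodOnWatch in the tracker added later in this commit does:

package sketch

import "time"

// podStartSLISeconds mirrors the tracker's subtraction: time from the pod's
// creation timestamp to the watch event that showed it running, minus the
// window in which any of its images were being pulled.
func podStartSLISeconds(creation, watchObserved, firstStartedPulling, lastFinishedPulling time.Time) float64 {
	total := watchObserved.Sub(creation)
	pulling := lastFinishedPulling.Sub(firstStartedPulling) // zero when nothing was pulled
	return (total - pulling).Seconds()
}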

View File

@ -29,6 +29,7 @@ import (
"k8s.io/kubernetes/pkg/kubelet/prober/results"
"k8s.io/kubernetes/pkg/kubelet/status"
statustest "k8s.io/kubernetes/pkg/kubelet/status/testing"
kubeletutil "k8s.io/kubernetes/pkg/kubelet/util"
"k8s.io/kubernetes/pkg/probe"
"k8s.io/utils/exec"
)
@ -105,10 +106,11 @@ func setTestProbe(pod *v1.Pod, probeType probeType, probeSpec v1.Probe) {
func newTestManager() *manager {
podManager := kubepod.NewBasicPodManager(nil, nil, nil)
podStartupLatencyTracker := kubeletutil.NewPodStartupLatencyTracker()
// Add test pod to pod manager, so that status manager can get the pod from pod manager if needed.
podManager.AddPod(getTestPod())
m := NewManager(
status.NewManager(&fake.Clientset{}, podManager, &statustest.FakePodDeletionSafetyProvider{}),
status.NewManager(&fake.Clientset{}, podManager, &statustest.FakePodDeletionSafetyProvider{}, podStartupLatencyTracker),
results.NewManager(),
results.NewManager(),
results.NewManager(),

View File

@ -30,6 +30,7 @@ import (
"k8s.io/kubernetes/pkg/kubelet/prober/results"
"k8s.io/kubernetes/pkg/kubelet/status"
statustest "k8s.io/kubernetes/pkg/kubelet/status/testing"
kubeletutil "k8s.io/kubernetes/pkg/kubelet/util"
"k8s.io/kubernetes/pkg/probe"
)
@ -150,7 +151,7 @@ func TestDoProbe(t *testing.T) {
}
// Clean up.
m.statusManager = status.NewManager(&fake.Clientset{}, kubepod.NewBasicPodManager(nil, nil, nil), &statustest.FakePodDeletionSafetyProvider{})
m.statusManager = status.NewManager(&fake.Clientset{}, kubepod.NewBasicPodManager(nil, nil, nil), &statustest.FakePodDeletionSafetyProvider{}, kubeletutil.NewPodStartupLatencyTracker())
resultsManager(m, probeType).Remove(testContainerID)
}
}

View File

@ -44,6 +44,7 @@ import (
"k8s.io/kubernetes/pkg/kubelet/server/stats"
"k8s.io/kubernetes/pkg/kubelet/status"
statustest "k8s.io/kubernetes/pkg/kubelet/status/testing"
kubeletutil "k8s.io/kubernetes/pkg/kubelet/util"
"k8s.io/kubernetes/pkg/kubelet/volumemanager"
"k8s.io/kubernetes/pkg/volume"
volumetest "k8s.io/kubernetes/pkg/volume/testing"
@ -71,6 +72,7 @@ func TestRunOnce(t *testing.T) {
podManager := kubepod.NewBasicPodManager(
podtest.NewFakeMirrorClient(), fakeSecretManager, fakeConfigMapManager)
fakeRuntime := &containertest.FakeRuntime{}
podStartupLatencyTracker := kubeletutil.NewPodStartupLatencyTracker()
basePath, err := utiltesting.MkTmpdir("kubelet")
if err != nil {
t.Fatalf("can't make a temp rootdir %v", err)
@ -81,7 +83,7 @@ func TestRunOnce(t *testing.T) {
recorder: &record.FakeRecorder{},
cadvisor: cadvisor,
nodeLister: testNodeLister{},
statusManager: status.NewManager(nil, podManager, &statustest.FakePodDeletionSafetyProvider{}),
statusManager: status.NewManager(nil, podManager, &statustest.FakePodDeletionSafetyProvider{}, podStartupLatencyTracker),
podManager: podManager,
podWorkers: &fakePodWorkers{},
os: &containertest.FakeOS{},

View File

@ -40,6 +40,7 @@ import (
"k8s.io/kubernetes/pkg/kubelet/metrics"
kubepod "k8s.io/kubernetes/pkg/kubelet/pod"
kubetypes "k8s.io/kubernetes/pkg/kubelet/types"
"k8s.io/kubernetes/pkg/kubelet/util"
statusutil "k8s.io/kubernetes/pkg/util/pod"
)
@ -75,6 +76,8 @@ type manager struct {
// apiStatusVersions must only be accessed from the sync thread.
apiStatusVersions map[kubetypes.MirrorPodUID]uint64
podDeletionSafety PodDeletionSafetyProvider
podStartupLatencyTracker *util.PodStartupLatencyTracker
}
// PodStatusProvider knows how to provide status for a pod. It's intended to be used by other components
@ -124,14 +127,15 @@ type Manager interface {
const syncPeriod = 10 * time.Second
// NewManager returns a functional Manager.
func NewManager(kubeClient clientset.Interface, podManager kubepod.Manager, podDeletionSafety PodDeletionSafetyProvider) Manager {
func NewManager(kubeClient clientset.Interface, podManager kubepod.Manager, podDeletionSafety PodDeletionSafetyProvider, podStartupLatencyTracker *util.PodStartupLatencyTracker) Manager {
return &manager{
kubeClient: kubeClient,
podManager: podManager,
podStatuses: make(map[types.UID]versionedPodStatus),
podStatusChannel: make(chan podStatusSyncRequest, 1000), // Buffer up to 1000 statuses
apiStatusVersions: make(map[kubetypes.MirrorPodUID]uint64),
podDeletionSafety: podDeletionSafety,
kubeClient: kubeClient,
podManager: podManager,
podStatuses: make(map[types.UID]versionedPodStatus),
podStatusChannel: make(chan podStatusSyncRequest, 1000), // Buffer up to 1000 statuses
apiStatusVersions: make(map[kubetypes.MirrorPodUID]uint64),
podDeletionSafety: podDeletionSafety,
podStartupLatencyTracker: podStartupLatencyTracker,
}
}
@ -599,6 +603,7 @@ func (m *manager) deletePodStatus(uid types.UID) {
m.podStatusesLock.Lock()
defer m.podStatusesLock.Unlock()
delete(m.podStatuses, uid)
m.podStartupLatencyTracker.DeletePodStartupState(uid)
}
// TODO(filipg): It'd be cleaner if we can do this without signal from user.
@ -710,6 +715,8 @@ func (m *manager) syncPod(uid types.UID, status versionedPodStatus) {
} else {
klog.V(3).InfoS("Status for pod updated successfully", "pod", klog.KObj(pod), "statusVersion", status.version, "status", mergedStatus)
pod = newPod
// We pass a new object (result of API call which contains updated ResourceVersion)
m.podStartupLatencyTracker.RecordStatusUpdated(pod)
}
// measure how long the status update took to propagate from generation to update on the server

View File

@ -46,6 +46,7 @@ import (
kubesecret "k8s.io/kubernetes/pkg/kubelet/secret"
statustest "k8s.io/kubernetes/pkg/kubelet/status/testing"
kubetypes "k8s.io/kubernetes/pkg/kubelet/types"
"k8s.io/kubernetes/pkg/kubelet/util"
)
// Generate new instance of test pod with the same initial value.
@ -84,7 +85,8 @@ func (m *manager) testSyncBatch() {
func newTestManager(kubeClient clientset.Interface) *manager {
podManager := kubepod.NewBasicPodManager(podtest.NewFakeMirrorClient(), kubesecret.NewFakeManager(), kubeconfigmap.NewFakeManager())
podManager.AddPod(getTestPod())
return NewManager(kubeClient, podManager, &statustest.FakePodDeletionSafetyProvider{}).(*manager)
podStartupLatencyTracker := util.NewPodStartupLatencyTracker()
return NewManager(kubeClient, podManager, &statustest.FakePodDeletionSafetyProvider{}, podStartupLatencyTracker).(*manager)
}
func generateRandomMessage() string {
@ -958,7 +960,8 @@ func TestTerminatePod_DefaultUnknownStatus(t *testing.T) {
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
podManager := kubepod.NewBasicPodManager(podtest.NewFakeMirrorClient(), kubesecret.NewFakeManager(), kubeconfigmap.NewFakeManager())
syncer := NewManager(&fake.Clientset{}, podManager, &statustest.FakePodDeletionSafetyProvider{}).(*manager)
podStartupLatencyTracker := util.NewPodStartupLatencyTracker()
syncer := NewManager(&fake.Clientset{}, podManager, &statustest.FakePodDeletionSafetyProvider{}, podStartupLatencyTracker).(*manager)
original := tc.pod.DeepCopy()
syncer.SetPodStatus(original, original.Status)

View File

@ -0,0 +1,178 @@
/*
Copyright 2022 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package util
import (
"sync"
"time"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/klog/v2"
"k8s.io/kubernetes/pkg/kubelet/metrics"
"k8s.io/utils/clock"
)
// PodStartupLatencyTracker records key moments for startup latency calculation,
// e.g. image pulling or pod observed running on watch.
type PodStartupLatencyTracker struct {
// protect against concurrent read and write on pods map
lock sync.Mutex
pods map[types.UID]*perPodState
// For testability
clock clock.Clock
}
type perPodState struct {
firstStartedPulling time.Time
lastFinishedPulling time.Time
// first time the pod status changed to Running
observedRunningTime time.Time
// whether the pod's startup latency metric has already been recorded
metricRecorded bool
}
func NewPodStartupLatencyTracker() *PodStartupLatencyTracker {
return &PodStartupLatencyTracker{
pods: map[types.UID]*perPodState{},
clock: clock.RealClock{},
}
}
// ObservedPodOnWatch should be called from wherever the kubelet observes pod updates (the pod config merge path).
func (p *PodStartupLatencyTracker) ObservedPodOnWatch(pod *v1.Pod, when time.Time) {
p.lock.Lock()
defer p.lock.Unlock()
// if the pod is terminal, we do not have to track it anymore for startup
if pod.Status.Phase == v1.PodFailed || pod.Status.Phase == v1.PodSucceeded {
delete(p.pods, pod.UID)
return
}
state := p.pods[pod.UID]
if state == nil {
// create a new record for the pod only if it has not yet been acknowledged by the Kubelet;
// this is required because we want to record the metric only for pods that were scheduled
// after the Kubelet started
if pod.Status.StartTime.IsZero() {
p.pods[pod.UID] = &perPodState{}
}
return
}
if state.observedRunningTime.IsZero() {
// skip, pod didn't start yet
return
}
if state.metricRecorded {
// skip, pod's latency already recorded
return
}
if hasPodStartedSLO(pod) {
podStartingDuration := when.Sub(pod.CreationTimestamp.Time)
imagePullingDuration := state.lastFinishedPulling.Sub(state.firstStartedPulling)
podStartSLOduration := (podStartingDuration - imagePullingDuration).Seconds()
klog.InfoS("Observed pod startup duration",
"pod", klog.KObj(pod),
"podStartSLOduration", podStartSLOduration,
"pod.CreationTimestamp", pod.CreationTimestamp.Time,
"firstStartedPulling", state.firstStartedPulling,
"lastFinishedPulling", state.lastFinishedPulling,
"observedRunningTime", state.observedRunningTime,
"watchObservedRunningTime", when)
metrics.PodStartSLIDuration.WithLabelValues().Observe(podStartSLOduration)
state.metricRecorded = true
}
}
func (p *PodStartupLatencyTracker) RecordImageStartedPulling(podUID types.UID) {
p.lock.Lock()
defer p.lock.Unlock()
state := p.pods[podUID]
if state == nil {
return
}
if state.firstStartedPulling.IsZero() {
state.firstStartedPulling = p.clock.Now()
}
}
func (p *PodStartupLatencyTracker) RecordImageFinishedPulling(podUID types.UID) {
p.lock.Lock()
defer p.lock.Unlock()
state := p.pods[podUID]
if state == nil {
return
}
state.lastFinishedPulling = p.clock.Now() // Now() is always greater than any earlier value.
}
func (p *PodStartupLatencyTracker) RecordStatusUpdated(pod *v1.Pod) {
p.lock.Lock()
defer p.lock.Unlock()
state := p.pods[pod.UID]
if state == nil {
return
}
if state.metricRecorded {
// skip, pod latency already recorded
return
}
if !state.observedRunningTime.IsZero() {
// skip, pod already started
return
}
if hasPodStartedSLO(pod) {
klog.V(2).InfoS("Mark when the pod was running for the first time", "pod", klog.KObj(pod), "rv", pod.ResourceVersion)
state.observedRunningTime = p.clock.Now()
}
}
// hasPodStartedSLO checks whether, for the given pod, each container has been started at least once
//
// This should reflect "Pod startup latency SLI" definition
// ref: https://github.com/kubernetes/community/blob/master/sig-scalability/slos/pod_startup_latency.md
func hasPodStartedSLO(pod *v1.Pod) bool {
for _, cs := range pod.Status.ContainerStatuses {
if cs.State.Running == nil || cs.State.Running.StartedAt.IsZero() {
return false
}
}
return true
}
func (p *PodStartupLatencyTracker) DeletePodStartupState(podUID types.UID) {
p.lock.Lock()
defer p.lock.Unlock()
delete(p.pods, podUID)
}
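
End to end, a pod moves through the tracker in a fixed call order, which the test file that follows exercises with a fake clock. A compressed, self-contained sketch of that sequence (the pod object and registration of the kubelet metrics are assumed; nothing here is part of the commit):

package sketch

import (
	"time"

	corev1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/types"
	kubeletutil "k8s.io/kubernetes/pkg/kubelet/util"
)

// trackOnePod walks a single pod through the tracker's full call sequence.
func trackOnePod() {
	tracker := kubeletutil.NewPodStartupLatencyTracker()
	created := time.Now()

	pod := &corev1.Pod{
		ObjectMeta: metav1.ObjectMeta{
			UID:               types.UID("example-uid"),
			CreationTimestamp: metav1.NewTime(created),
		},
	}

	// 1. Pod config sees the pod before it has run; a per-pod record is created
	//    only because Status.StartTime is still unset (the pod is new to this kubelet).
	tracker.ObservedPodOnWatch(pod, created)

	// 2. Image manager brackets the pull(s).
	tracker.RecordImageStartedPulling(pod.UID)
	tracker.RecordImageFinishedPulling(pod.UID)

	// 3. Status manager reports the pod with all containers running.
	pod.Status.ContainerStatuses = []corev1.ContainerStatus{
		{State: corev1.ContainerState{Running: &corev1.ContainerStateRunning{StartedAt: metav1.NewTime(time.Now())}}},
	}
	tracker.RecordStatusUpdated(pod)

	// 4. The next watch observation of the now-running pod records the
	//    kubelet_pod_start_sli_duration_seconds sample, exactly once.
	tracker.ObservedPodOnWatch(pod, time.Now())

	// 5. When the status manager forgets the pod, the tracker state goes too.
	tracker.DeletePodStartupState(pod.UID)
}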

View File

@ -0,0 +1,266 @@
/*
Copyright 2022 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package util
import (
"strings"
"testing"
"time"
"github.com/stretchr/testify/assert"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/component-base/metrics/testutil"
"k8s.io/kubernetes/pkg/kubelet/metrics"
testingclock "k8s.io/utils/clock/testing"
)
var frozenTime = time.Date(2020, 1, 1, 0, 0, 0, 0, time.UTC)
const (
uid = "3c1df8a9-11a8-4791-aeae-184c18cca686"
metricsName = "kubelet_pod_start_sli_duration_seconds"
)
func TestNoEvents(t *testing.T) {
t.Run("metrics registered; no incoming events", func(t *testing.T) {
// expects no metrics in the output
wants := ""
metrics.Register()
tracker := NewPodStartupLatencyTracker()
if err := testutil.GatherAndCompare(metrics.GetGather(), strings.NewReader(wants), metricsName); err != nil {
t.Fatal(err)
}
assert.Empty(t, tracker.pods)
metrics.PodStartSLIDuration.Reset()
})
}
func TestPodsRunningBeforeKubeletStarted(t *testing.T) {
t.Run("pod was running for 10m before kubelet started", func(t *testing.T) {
// expects no metrics in the output
wants := ""
metrics.Register()
tracker := NewPodStartupLatencyTracker()
if err := testutil.GatherAndCompare(metrics.GetGather(), strings.NewReader(wants), metricsName); err != nil {
t.Fatal(err)
}
podStarted := &corev1.Pod{
Status: corev1.PodStatus{
StartTime: &metav1.Time{Time: frozenTime.Add(-10 * time.Minute)},
},
}
tracker.ObservedPodOnWatch(podStarted, frozenTime)
assert.Empty(t, tracker.pods)
metrics.PodStartSLIDuration.Reset()
})
}
func TestSinglePodOneImageDownloadRecorded(t *testing.T) {
t.Run("single pod; started in 3s, image pulling 100ms", func(t *testing.T) {
wants := `
# HELP kubelet_pod_start_sli_duration_seconds [ALPHA] Duration in seconds to start a pod, excluding time to pull images and run init containers, measured from pod creation timestamp to when all its containers are reported as started and observed via watch
# TYPE kubelet_pod_start_sli_duration_seconds histogram
kubelet_pod_start_sli_duration_seconds_bucket{le="0.5"} 0
kubelet_pod_start_sli_duration_seconds_bucket{le="1"} 0
kubelet_pod_start_sli_duration_seconds_bucket{le="2"} 0
kubelet_pod_start_sli_duration_seconds_bucket{le="3"} 1
kubelet_pod_start_sli_duration_seconds_bucket{le="4"} 1
kubelet_pod_start_sli_duration_seconds_bucket{le="5"} 1
kubelet_pod_start_sli_duration_seconds_bucket{le="6"} 1
kubelet_pod_start_sli_duration_seconds_bucket{le="8"} 1
kubelet_pod_start_sli_duration_seconds_bucket{le="10"} 1
kubelet_pod_start_sli_duration_seconds_bucket{le="20"} 1
kubelet_pod_start_sli_duration_seconds_bucket{le="30"} 1
kubelet_pod_start_sli_duration_seconds_bucket{le="45"} 1
kubelet_pod_start_sli_duration_seconds_bucket{le="60"} 1
kubelet_pod_start_sli_duration_seconds_bucket{le="120"} 1
kubelet_pod_start_sli_duration_seconds_bucket{le="180"} 1
kubelet_pod_start_sli_duration_seconds_bucket{le="240"} 1
kubelet_pod_start_sli_duration_seconds_bucket{le="300"} 1
kubelet_pod_start_sli_duration_seconds_bucket{le="360"} 1
kubelet_pod_start_sli_duration_seconds_bucket{le="480"} 1
kubelet_pod_start_sli_duration_seconds_bucket{le="600"} 1
kubelet_pod_start_sli_duration_seconds_bucket{le="900"} 1
kubelet_pod_start_sli_duration_seconds_bucket{le="1200"} 1
kubelet_pod_start_sli_duration_seconds_bucket{le="1800"} 1
kubelet_pod_start_sli_duration_seconds_bucket{le="2700"} 1
kubelet_pod_start_sli_duration_seconds_bucket{le="3600"} 1
kubelet_pod_start_sli_duration_seconds_bucket{le="+Inf"} 1
kubelet_pod_start_sli_duration_seconds_sum 2.9
kubelet_pod_start_sli_duration_seconds_count 1
`
fakeClock := testingclock.NewFakeClock(frozenTime)
metrics.Register()
tracker := NewPodStartupLatencyTracker()
tracker.clock = fakeClock
podInit := buildInitializingPod()
tracker.ObservedPodOnWatch(podInit, frozenTime)
// image pulling took 100ms
tracker.RecordImageStartedPulling(podInit.UID)
fakeClock.Step(time.Millisecond * 100)
tracker.RecordImageFinishedPulling(podInit.UID)
podStarted := buildRunningPod()
tracker.RecordStatusUpdated(podStarted)
// 3s later, observe the same pod on watch
tracker.ObservedPodOnWatch(podStarted, frozenTime.Add(time.Second*3))
if err := testutil.GatherAndCompare(metrics.GetGather(), strings.NewReader(wants), metricsName); err != nil {
t.Fatal(err)
}
// cleanup
tracker.DeletePodStartupState(podStarted.UID)
assert.Empty(t, tracker.pods)
metrics.PodStartSLIDuration.Reset()
})
}
func TestSinglePodMultipleDownloadsAndRestartsRecorded(t *testing.T) {
t.Run("single pod; started in 30s, image pulling between 10th and 20th seconds", func(t *testing.T) {
wants := `
# HELP kubelet_pod_start_sli_duration_seconds [ALPHA] Duration in seconds to start a pod, excluding time to pull images and run init containers, measured from pod creation timestamp to when all its containers are reported as started and observed via watch
# TYPE kubelet_pod_start_sli_duration_seconds histogram
kubelet_pod_start_sli_duration_seconds_bucket{le="0.5"} 0
kubelet_pod_start_sli_duration_seconds_bucket{le="1"} 0
kubelet_pod_start_sli_duration_seconds_bucket{le="2"} 0
kubelet_pod_start_sli_duration_seconds_bucket{le="3"} 0
kubelet_pod_start_sli_duration_seconds_bucket{le="4"} 0
kubelet_pod_start_sli_duration_seconds_bucket{le="5"} 0
kubelet_pod_start_sli_duration_seconds_bucket{le="6"} 0
kubelet_pod_start_sli_duration_seconds_bucket{le="8"} 0
kubelet_pod_start_sli_duration_seconds_bucket{le="10"} 0
kubelet_pod_start_sli_duration_seconds_bucket{le="20"} 1
kubelet_pod_start_sli_duration_seconds_bucket{le="30"} 1
kubelet_pod_start_sli_duration_seconds_bucket{le="45"} 1
kubelet_pod_start_sli_duration_seconds_bucket{le="60"} 1
kubelet_pod_start_sli_duration_seconds_bucket{le="120"} 1
kubelet_pod_start_sli_duration_seconds_bucket{le="180"} 1
kubelet_pod_start_sli_duration_seconds_bucket{le="240"} 1
kubelet_pod_start_sli_duration_seconds_bucket{le="300"} 1
kubelet_pod_start_sli_duration_seconds_bucket{le="360"} 1
kubelet_pod_start_sli_duration_seconds_bucket{le="480"} 1
kubelet_pod_start_sli_duration_seconds_bucket{le="600"} 1
kubelet_pod_start_sli_duration_seconds_bucket{le="900"} 1
kubelet_pod_start_sli_duration_seconds_bucket{le="1200"} 1
kubelet_pod_start_sli_duration_seconds_bucket{le="1800"} 1
kubelet_pod_start_sli_duration_seconds_bucket{le="2700"} 1
kubelet_pod_start_sli_duration_seconds_bucket{le="3600"} 1
kubelet_pod_start_sli_duration_seconds_bucket{le="+Inf"} 1
kubelet_pod_start_sli_duration_seconds_sum 20
kubelet_pod_start_sli_duration_seconds_count 1
`
fakeClock := testingclock.NewFakeClock(frozenTime)
metrics.Register()
tracker := NewPodStartupLatencyTracker()
tracker.clock = fakeClock
podInitializing := buildInitializingPod()
tracker.ObservedPodOnWatch(podInitializing, frozenTime)
// image pulling started at 10s and the last pull finished at 20s
// first image starts pulling at 10s
fakeClock.SetTime(frozenTime.Add(time.Second * 10))
tracker.RecordImageStartedPulling(podInitializing.UID)
// second image starts pulling at 11s
fakeClock.SetTime(frozenTime.Add(time.Second * 11))
tracker.RecordImageStartedPulling(podInitializing.UID)
// third image starts pulling at 14s
fakeClock.SetTime(frozenTime.Add(time.Second * 14))
tracker.RecordImageStartedPulling(podInitializing.UID)
// first image finished pulling at 18s
fakeClock.SetTime(frozenTime.Add(time.Second * 18))
tracker.RecordImageFinishedPulling(podInitializing.UID)
// second and third finished pulling at 20s
fakeClock.SetTime(frozenTime.Add(time.Second * 20))
tracker.RecordImageFinishedPulling(podInitializing.UID)
// pod started
podStarted := buildRunningPod()
tracker.RecordStatusUpdated(podStarted)
// at 30s observe the same pod on watch
tracker.ObservedPodOnWatch(podStarted, frozenTime.Add(time.Second*30))
if err := testutil.GatherAndCompare(metrics.GetGather(), strings.NewReader(wants), metricsName); err != nil {
t.Fatal(err)
}
// any new pod observations should not impact the metrics, as the pod should be recorded only once
tracker.ObservedPodOnWatch(podStarted, frozenTime.Add(time.Second*150))
tracker.ObservedPodOnWatch(podStarted, frozenTime.Add(time.Second*200))
tracker.ObservedPodOnWatch(podStarted, frozenTime.Add(time.Second*250))
if err := testutil.GatherAndCompare(metrics.GetGather(), strings.NewReader(wants), metricsName); err != nil {
t.Fatal(err)
}
// cleanup
tracker.DeletePodStartupState(podStarted.UID)
assert.Empty(t, tracker.pods)
metrics.PodStartSLIDuration.Reset()
})
}
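
As a quick check of the expected kubelet_pod_start_sli_duration_seconds_sum of 20 above: the pod takes 30 seconds from creation to the final watch observation, and all pulls fall inside a single 10 second window (first start at 10s, last finish at 20s), which the SLI excludes. An illustrative arithmetic check, not part of the test:

package sketch

import (
	"fmt"
	"time"
)

// sliSum reproduces the expected histogram sum for the test above:
// 30s creation-to-observed-running minus the 10s-20s pulling window.
func sliSum() {
	total := 30 * time.Second
	pullWindow := 10 * time.Second // last finish (20s) - first start (10s)
	fmt.Println((total - pullWindow).Seconds()) // prints 20 when called
}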
func buildInitializingPod() *corev1.Pod {
return buildPodWithStatus([]corev1.ContainerStatus{
{State: corev1.ContainerState{Waiting: &corev1.ContainerStateWaiting{Reason: "PodInitializing"}}},
})
}
func buildRunningPod() *corev1.Pod {
return buildPodWithStatus([]corev1.ContainerStatus{
{State: corev1.ContainerState{Running: &corev1.ContainerStateRunning{StartedAt: metav1.NewTime(frozenTime)}}},
})
}
func buildPodWithStatus(cs []corev1.ContainerStatus) *corev1.Pod {
return &corev1.Pod{
ObjectMeta: metav1.ObjectMeta{
UID: types.UID(uid),
CreationTimestamp: metav1.NewTime(frozenTime),
},
Status: corev1.PodStatus{
ContainerStatuses: cs,
},
}
}

View File

@ -46,6 +46,7 @@ var interestingKubeletMetrics = []string{
"kubelet_docker_operations_errors_total",
"kubelet_docker_operations_duration_seconds",
"kubelet_pod_start_duration_seconds",
"kubelet_pod_start_sli_duration_seconds",
"kubelet_pod_worker_duration_seconds",
"kubelet_pod_worker_start_duration_seconds",
}

View File

@ -44,6 +44,8 @@ const (
// Taken from k8s.io/kubernetes/pkg/kubelet/metrics
podStartDurationKey = "pod_start_duration_seconds"
// Taken from k8s.io/kubernetes/pkg/kubelet/metrics
PodStartSLIDurationKey = "pod_start_sli_duration_seconds"
// Taken from k8s.io/kubernetes/pkg/kubelet/metrics
cgroupManagerOperationsKey = "cgroup_manager_duration_seconds"
// Taken from k8s.io/kubernetes/pkg/kubelet/metrics
podWorkerStartDurationKey = "pod_worker_start_duration_seconds"
@ -175,6 +177,7 @@ func GetDefaultKubeletLatencyMetrics(ms KubeletMetrics) KubeletLatencyMetrics {
podWorkerDurationKey,
podWorkerStartDurationKey,
podStartDurationKey,
PodStartSLIDurationKey,
cgroupManagerOperationsKey,
dockerOperationsLatencyKey,
podWorkerStartDurationKey,