Make the interface local to each package

This commit is contained in:
Artur Żyliński 2022-10-18 11:32:16 +02:00
parent 9f31669a53
commit b0fac15cd6
10 changed files with 106 additions and 63 deletions

View File

@ -31,7 +31,6 @@ import (
kubecontainer "k8s.io/kubernetes/pkg/kubelet/container" kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
"k8s.io/kubernetes/pkg/kubelet/events" "k8s.io/kubernetes/pkg/kubelet/events"
kubetypes "k8s.io/kubernetes/pkg/kubelet/types" kubetypes "k8s.io/kubernetes/pkg/kubelet/types"
"k8s.io/kubernetes/pkg/kubelet/util"
"k8s.io/kubernetes/pkg/kubelet/util/format" "k8s.io/kubernetes/pkg/kubelet/util/format"
"k8s.io/kubernetes/pkg/util/config" "k8s.io/kubernetes/pkg/util/config"
) )
@ -53,6 +52,10 @@ const (
PodConfigNotificationIncremental PodConfigNotificationIncremental
) )
type podStartupSLIObserver interface {
ObservedPodOnWatch(pod *v1.Pod, when time.Time)
}
// PodConfig is a configuration mux that merges many sources of pod configuration into a single // PodConfig is a configuration mux that merges many sources of pod configuration into a single
// consistent structure, and then delivers incremental change notifications to listeners // consistent structure, and then delivers incremental change notifications to listeners
// in order. // in order.
@ -70,9 +73,9 @@ type PodConfig struct {
// NewPodConfig creates an object that can merge many configuration sources into a stream // NewPodConfig creates an object that can merge many configuration sources into a stream
// of normalized updates to a pod configuration. // of normalized updates to a pod configuration.
func NewPodConfig(mode PodConfigNotificationMode, recorder record.EventRecorder, podStartupLatencyTracker *util.PodStartupLatencyTracker) *PodConfig { func NewPodConfig(mode PodConfigNotificationMode, recorder record.EventRecorder, startupSLIObserver podStartupSLIObserver) *PodConfig {
updates := make(chan kubetypes.PodUpdate, 50) updates := make(chan kubetypes.PodUpdate, 50)
storage := newPodStorage(updates, mode, recorder, podStartupLatencyTracker) storage := newPodStorage(updates, mode, recorder, startupSLIObserver)
podConfig := &PodConfig{ podConfig := &PodConfig{
pods: storage, pods: storage,
mux: config.NewMux(storage), mux: config.NewMux(storage),
@ -135,20 +138,20 @@ type podStorage struct {
// the EventRecorder to use // the EventRecorder to use
recorder record.EventRecorder recorder record.EventRecorder
podStartupLatencyTracker *util.PodStartupLatencyTracker startupSLIObserver podStartupSLIObserver
} }
// TODO: PodConfigNotificationMode could be handled by a listener to the updates channel // TODO: PodConfigNotificationMode could be handled by a listener to the updates channel
// in the future, especially with multiple listeners. // in the future, especially with multiple listeners.
// TODO: allow initialization of the current state of the store with snapshotted version. // TODO: allow initialization of the current state of the store with snapshotted version.
func newPodStorage(updates chan<- kubetypes.PodUpdate, mode PodConfigNotificationMode, recorder record.EventRecorder, podStartupLatencyTracker *util.PodStartupLatencyTracker) *podStorage { func newPodStorage(updates chan<- kubetypes.PodUpdate, mode PodConfigNotificationMode, recorder record.EventRecorder, startupSLIObserver podStartupSLIObserver) *podStorage {
return &podStorage{ return &podStorage{
pods: make(map[string]map[types.UID]*v1.Pod), pods: make(map[string]map[types.UID]*v1.Pod),
mode: mode, mode: mode,
updates: updates, updates: updates,
sourcesSeen: sets.String{}, sourcesSeen: sets.String{},
recorder: recorder, recorder: recorder,
podStartupLatencyTracker: podStartupLatencyTracker, startupSLIObserver: startupSLIObserver,
} }
} }
@ -242,7 +245,7 @@ func (s *podStorage) merge(source string, change interface{}) (adds, updates, de
ref.Annotations[kubetypes.ConfigSourceAnnotationKey] = source ref.Annotations[kubetypes.ConfigSourceAnnotationKey] = source
// ignore static pods // ignore static pods
if !kubetypes.IsStaticPod(ref) { if !kubetypes.IsStaticPod(ref) {
s.podStartupLatencyTracker.ObservedPodOnWatch(ref, time.Now()) s.startupSLIObserver.ObservedPodOnWatch(ref, time.Now())
} }
if existing, found := oldPods[ref.UID]; found { if existing, found := oldPods[ref.UID]; found {
pods[ref.UID] = existing pods[ref.UID] = existing

View File

@ -34,7 +34,6 @@ import (
"k8s.io/client-go/kubernetes/scheme" "k8s.io/client-go/kubernetes/scheme"
"k8s.io/client-go/tools/record" "k8s.io/client-go/tools/record"
kubetypes "k8s.io/kubernetes/pkg/kubelet/types" kubetypes "k8s.io/kubernetes/pkg/kubelet/types"
"k8s.io/kubernetes/pkg/kubelet/util"
"k8s.io/kubernetes/pkg/securitycontext" "k8s.io/kubernetes/pkg/securitycontext"
) )
@ -62,6 +61,10 @@ func (s sortedPods) Less(i, j int) bool {
return s[i].Namespace < s[j].Namespace return s[i].Namespace < s[j].Namespace
} }
type mockPodStartupSLIObserver struct{}
func (m *mockPodStartupSLIObserver) ObservedPodOnWatch(pod *v1.Pod, when time.Time) {}
func CreateValidPod(name, namespace string) *v1.Pod { func CreateValidPod(name, namespace string) *v1.Pod {
return &v1.Pod{ return &v1.Pod{
ObjectMeta: metav1.ObjectMeta{ ObjectMeta: metav1.ObjectMeta{
@ -91,7 +94,7 @@ func CreatePodUpdate(op kubetypes.PodOperation, source string, pods ...*v1.Pod)
func createPodConfigTester(ctx context.Context, mode PodConfigNotificationMode) (chan<- interface{}, <-chan kubetypes.PodUpdate, *PodConfig) { func createPodConfigTester(ctx context.Context, mode PodConfigNotificationMode) (chan<- interface{}, <-chan kubetypes.PodUpdate, *PodConfig) {
eventBroadcaster := record.NewBroadcaster() eventBroadcaster := record.NewBroadcaster()
config := NewPodConfig(mode, eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "kubelet"}), util.NewPodStartupLatencyTracker()) config := NewPodConfig(mode, eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "kubelet"}), &mockPodStartupSLIObserver{})
channel := config.Channel(ctx, TestSource) channel := config.Channel(ctx, TestSource)
ch := config.Updates() ch := config.Updates()
return channel, ch, config return channel, ch, config
@ -462,7 +465,7 @@ func TestPodUpdateLabels(t *testing.T) {
func TestPodConfigRace(t *testing.T) { func TestPodConfigRace(t *testing.T) {
eventBroadcaster := record.NewBroadcaster() eventBroadcaster := record.NewBroadcaster()
config := NewPodConfig(PodConfigNotificationIncremental, eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "kubelet"}), util.NewPodStartupLatencyTracker()) config := NewPodConfig(PodConfigNotificationIncremental, eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "kubelet"}), &mockPodStartupSLIObserver{})
seenSources := sets.NewString(TestSource) seenSources := sets.NewString(TestSource)
var wg sync.WaitGroup var wg sync.WaitGroup
const iterations = 100 const iterations = 100

View File

@ -22,6 +22,7 @@ import (
dockerref "github.com/docker/distribution/reference" dockerref "github.com/docker/distribution/reference"
v1 "k8s.io/api/core/v1" v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/tools/record" "k8s.io/client-go/tools/record"
"k8s.io/client-go/util/flowcontrol" "k8s.io/client-go/util/flowcontrol"
"k8s.io/klog/v2" "k8s.io/klog/v2"
@ -29,9 +30,13 @@ import (
runtimeapi "k8s.io/cri-api/pkg/apis/runtime/v1" runtimeapi "k8s.io/cri-api/pkg/apis/runtime/v1"
kubecontainer "k8s.io/kubernetes/pkg/kubelet/container" kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
"k8s.io/kubernetes/pkg/kubelet/events" "k8s.io/kubernetes/pkg/kubelet/events"
kubeletutil "k8s.io/kubernetes/pkg/kubelet/util"
) )
type ImagePodPullingTimeRecorder interface {
RecordImageStartedPulling(podUID types.UID)
RecordImageFinishedPulling(podUID types.UID)
}
// imageManager provides the functionalities for image pulling. // imageManager provides the functionalities for image pulling.
type imageManager struct { type imageManager struct {
recorder record.EventRecorder recorder record.EventRecorder
@ -40,13 +45,13 @@ type imageManager struct {
// It will check the presence of the image, and report the 'image pulling', image pulled' events correspondingly. // It will check the presence of the image, and report the 'image pulling', image pulled' events correspondingly.
puller imagePuller puller imagePuller
podStartupLatencyTracker *kubeletutil.PodStartupLatencyTracker podPullingTimeRecorder ImagePodPullingTimeRecorder
} }
var _ ImageManager = &imageManager{} var _ ImageManager = &imageManager{}
// NewImageManager instantiates a new ImageManager object. // NewImageManager instantiates a new ImageManager object.
func NewImageManager(recorder record.EventRecorder, imageService kubecontainer.ImageService, imageBackOff *flowcontrol.Backoff, serialized bool, qps float32, burst int, podStartupLatencyTracker *kubeletutil.PodStartupLatencyTracker) ImageManager { func NewImageManager(recorder record.EventRecorder, imageService kubecontainer.ImageService, imageBackOff *flowcontrol.Backoff, serialized bool, qps float32, burst int, podPullingTimeRecorder ImagePodPullingTimeRecorder) ImageManager {
imageService = throttleImagePulling(imageService, qps, burst) imageService = throttleImagePulling(imageService, qps, burst)
var puller imagePuller var puller imagePuller
@ -56,11 +61,11 @@ func NewImageManager(recorder record.EventRecorder, imageService kubecontainer.I
puller = newParallelImagePuller(imageService) puller = newParallelImagePuller(imageService)
} }
return &imageManager{ return &imageManager{
recorder: recorder, recorder: recorder,
imageService: imageService, imageService: imageService,
backOff: imageBackOff, backOff: imageBackOff,
puller: puller, puller: puller,
podStartupLatencyTracker: podStartupLatencyTracker, podPullingTimeRecorder: podPullingTimeRecorder,
} }
} }
@ -142,7 +147,7 @@ func (m *imageManager) EnsureImageExists(pod *v1.Pod, container *v1.Container, p
m.logIt(ref, v1.EventTypeNormal, events.BackOffPullImage, logPrefix, msg, klog.Info) m.logIt(ref, v1.EventTypeNormal, events.BackOffPullImage, logPrefix, msg, klog.Info)
return "", msg, ErrImagePullBackOff return "", msg, ErrImagePullBackOff
} }
m.podStartupLatencyTracker.RecordImageStartedPulling(pod.UID) m.podPullingTimeRecorder.RecordImageStartedPulling(pod.UID)
m.logIt(ref, v1.EventTypeNormal, events.PullingImage, logPrefix, fmt.Sprintf("Pulling image %q", container.Image), klog.Info) m.logIt(ref, v1.EventTypeNormal, events.PullingImage, logPrefix, fmt.Sprintf("Pulling image %q", container.Image), klog.Info)
startTime := time.Now() startTime := time.Now()
pullChan := make(chan pullResult) pullChan := make(chan pullResult)
@ -158,7 +163,7 @@ func (m *imageManager) EnsureImageExists(pod *v1.Pod, container *v1.Container, p
return "", imagePullResult.err.Error(), ErrImagePull return "", imagePullResult.err.Error(), ErrImagePull
} }
m.podStartupLatencyTracker.RecordImageFinishedPulling(pod.UID) m.podPullingTimeRecorder.RecordImageFinishedPulling(pod.UID)
m.logIt(ref, v1.EventTypeNormal, events.PulledImage, logPrefix, fmt.Sprintf("Successfully pulled image %q in %v", container.Image, time.Since(startTime)), klog.Info) m.logIt(ref, v1.EventTypeNormal, events.PulledImage, logPrefix, fmt.Sprintf("Successfully pulled image %q in %v", container.Image, time.Since(startTime)), klog.Info)
m.backOff.GC() m.backOff.GC()
return imagePullResult.imageRef, "", nil return imagePullResult.imageRef, "", nil

View File

@ -24,11 +24,11 @@ import (
"github.com/stretchr/testify/assert" "github.com/stretchr/testify/assert"
v1 "k8s.io/api/core/v1" v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/tools/record" "k8s.io/client-go/tools/record"
"k8s.io/client-go/util/flowcontrol" "k8s.io/client-go/util/flowcontrol"
. "k8s.io/kubernetes/pkg/kubelet/container" . "k8s.io/kubernetes/pkg/kubelet/container"
ctest "k8s.io/kubernetes/pkg/kubelet/container/testing" ctest "k8s.io/kubernetes/pkg/kubelet/container/testing"
kubeletutil "k8s.io/kubernetes/pkg/kubelet/util"
testingclock "k8s.io/utils/clock/testing" testingclock "k8s.io/utils/clock/testing"
) )
@ -159,6 +159,12 @@ func pullerTestCases() []pullerTestCase {
} }
} }
type mockPodPullingTimeRecorder struct{}
func (m *mockPodPullingTimeRecorder) RecordImageStartedPulling(podUID types.UID) {}
func (m *mockPodPullingTimeRecorder) RecordImageFinishedPulling(podUID types.UID) {}
func pullerTestEnv(c pullerTestCase, serialized bool) (puller ImageManager, fakeClock *testingclock.FakeClock, fakeRuntime *ctest.FakeRuntime, container *v1.Container) { func pullerTestEnv(c pullerTestCase, serialized bool) (puller ImageManager, fakeClock *testingclock.FakeClock, fakeRuntime *ctest.FakeRuntime, container *v1.Container) {
container = &v1.Container{ container = &v1.Container{
Name: "container_name", Name: "container_name",
@ -177,9 +183,7 @@ func pullerTestEnv(c pullerTestCase, serialized bool) (puller ImageManager, fake
fakeRuntime.Err = c.pullerErr fakeRuntime.Err = c.pullerErr
fakeRuntime.InspectErr = c.inspectErr fakeRuntime.InspectErr = c.inspectErr
podStartupLatencyTracker := kubeletutil.NewPodStartupLatencyTracker() puller = NewImageManager(fakeRecorder, fakeRuntime, backOff, serialized, c.qps, c.burst, &mockPodPullingTimeRecorder{})
puller = NewImageManager(fakeRecorder, fakeRuntime, backOff, serialized, c.qps, c.burst, podStartupLatencyTracker)
return return
} }

View File

@ -244,7 +244,7 @@ type Dependencies struct {
TLSOptions *server.TLSOptions TLSOptions *server.TLSOptions
RemoteRuntimeService internalapi.RuntimeService RemoteRuntimeService internalapi.RuntimeService
RemoteImageService internalapi.ImageManagerService RemoteImageService internalapi.ImageManagerService
PodStartupLatencyTracker *util.PodStartupLatencyTracker PodStartupLatencyTracker util.PodStartupLatencyTracker
// remove it after cadvisor.UsingLegacyCadvisorStats dropped. // remove it after cadvisor.UsingLegacyCadvisorStats dropped.
useLegacyCadvisorStats bool useLegacyCadvisorStats bool
} }

View File

@ -36,7 +36,6 @@ import (
"k8s.io/kubernetes/pkg/kubelet/lifecycle" "k8s.io/kubernetes/pkg/kubelet/lifecycle"
"k8s.io/kubernetes/pkg/kubelet/logs" "k8s.io/kubernetes/pkg/kubelet/logs"
proberesults "k8s.io/kubernetes/pkg/kubelet/prober/results" proberesults "k8s.io/kubernetes/pkg/kubelet/prober/results"
"k8s.io/kubernetes/pkg/kubelet/util"
) )
const ( const (
@ -83,6 +82,12 @@ func (f *fakePodStateProvider) ShouldPodContentBeRemoved(uid types.UID) bool {
return found return found
} }
type fakePodPullingTimeRecorder struct{}
func (f *fakePodPullingTimeRecorder) RecordImageStartedPulling(podUID types.UID) {}
func (f *fakePodPullingTimeRecorder) RecordImageFinishedPulling(podUID types.UID) {}
func newFakeKubeRuntimeManager(runtimeService internalapi.RuntimeService, imageService internalapi.ImageManagerService, machineInfo *cadvisorapi.MachineInfo, osInterface kubecontainer.OSInterface, runtimeHelper kubecontainer.RuntimeHelper, keyring credentialprovider.DockerKeyring) (*kubeGenericRuntimeManager, error) { func newFakeKubeRuntimeManager(runtimeService internalapi.RuntimeService, imageService internalapi.ImageManagerService, machineInfo *cadvisorapi.MachineInfo, osInterface kubecontainer.OSInterface, runtimeHelper kubecontainer.RuntimeHelper, keyring credentialprovider.DockerKeyring) (*kubeGenericRuntimeManager, error) {
recorder := &record.FakeRecorder{} recorder := &record.FakeRecorder{}
logManager, err := logs.NewContainerLogManager(runtimeService, osInterface, "1", 2) logManager, err := logs.NewContainerLogManager(runtimeService, osInterface, "1", 2)
@ -124,7 +129,7 @@ func newFakeKubeRuntimeManager(runtimeService internalapi.RuntimeService, imageS
false, false,
0, // Disable image pull throttling by setting QPS to 0, 0, // Disable image pull throttling by setting QPS to 0,
0, 0,
util.NewPodStartupLatencyTracker(), &fakePodPullingTimeRecorder{},
) )
kubeRuntimeManager.runner = lifecycle.NewHandlerRunner( kubeRuntimeManager.runner = lifecycle.NewHandlerRunner(
&fakeHTTP{}, &fakeHTTP{},

View File

@ -54,7 +54,6 @@ import (
"k8s.io/kubernetes/pkg/kubelet/runtimeclass" "k8s.io/kubernetes/pkg/kubelet/runtimeclass"
"k8s.io/kubernetes/pkg/kubelet/sysctl" "k8s.io/kubernetes/pkg/kubelet/sysctl"
"k8s.io/kubernetes/pkg/kubelet/types" "k8s.io/kubernetes/pkg/kubelet/types"
"k8s.io/kubernetes/pkg/kubelet/util"
"k8s.io/kubernetes/pkg/kubelet/util/cache" "k8s.io/kubernetes/pkg/kubelet/util/cache"
"k8s.io/kubernetes/pkg/kubelet/util/format" "k8s.io/kubernetes/pkg/kubelet/util/format"
sc "k8s.io/kubernetes/pkg/securitycontext" sc "k8s.io/kubernetes/pkg/securitycontext"
@ -195,7 +194,7 @@ func NewKubeGenericRuntimeManager(
memorySwapBehavior string, memorySwapBehavior string,
getNodeAllocatable func() v1.ResourceList, getNodeAllocatable func() v1.ResourceList,
memoryThrottlingFactor float64, memoryThrottlingFactor float64,
podStartupLatencyTracker *util.PodStartupLatencyTracker, podPullingTimeRecorder images.ImagePodPullingTimeRecorder,
) (KubeGenericRuntime, error) { ) (KubeGenericRuntime, error) {
runtimeService = newInstrumentedRuntimeService(runtimeService) runtimeService = newInstrumentedRuntimeService(runtimeService)
imageService = newInstrumentedImageManagerService(imageService) imageService = newInstrumentedImageManagerService(imageService)
@ -267,7 +266,7 @@ func NewKubeGenericRuntimeManager(
serializeImagePulls, serializeImagePulls,
imagePullQPS, imagePullQPS,
imagePullBurst, imagePullBurst,
podStartupLatencyTracker) podPullingTimeRecorder)
kubeRuntimeManager.runner = lifecycle.NewHandlerRunner(insecureContainerLifecycleHTTPClient, kubeRuntimeManager, kubeRuntimeManager, recorder) kubeRuntimeManager.runner = lifecycle.NewHandlerRunner(insecureContainerLifecycleHTTPClient, kubeRuntimeManager, kubeRuntimeManager, recorder)
kubeRuntimeManager.containerGC = newContainerGC(runtimeService, podStateProvider, kubeRuntimeManager) kubeRuntimeManager.containerGC = newContainerGC(runtimeService, podStateProvider, kubeRuntimeManager)
kubeRuntimeManager.podStateProvider = podStateProvider kubeRuntimeManager.podStateProvider = podStateProvider

View File

@ -40,7 +40,6 @@ import (
"k8s.io/kubernetes/pkg/kubelet/metrics" "k8s.io/kubernetes/pkg/kubelet/metrics"
kubepod "k8s.io/kubernetes/pkg/kubelet/pod" kubepod "k8s.io/kubernetes/pkg/kubelet/pod"
kubetypes "k8s.io/kubernetes/pkg/kubelet/types" kubetypes "k8s.io/kubernetes/pkg/kubelet/types"
"k8s.io/kubernetes/pkg/kubelet/util"
statusutil "k8s.io/kubernetes/pkg/util/pod" statusutil "k8s.io/kubernetes/pkg/util/pod"
) )
@ -77,7 +76,7 @@ type manager struct {
apiStatusVersions map[kubetypes.MirrorPodUID]uint64 apiStatusVersions map[kubetypes.MirrorPodUID]uint64
podDeletionSafety PodDeletionSafetyProvider podDeletionSafety PodDeletionSafetyProvider
podStartupLatencyTracker *util.PodStartupLatencyTracker podStartupLatencyHelper PodStartupLatencyStateHelper
} }
// PodStatusProvider knows how to provide status for a pod. It's intended to be used by other components // PodStatusProvider knows how to provide status for a pod. It's intended to be used by other components
@ -96,6 +95,11 @@ type PodDeletionSafetyProvider interface {
PodCouldHaveRunningContainers(pod *v1.Pod) bool PodCouldHaveRunningContainers(pod *v1.Pod) bool
} }
type PodStartupLatencyStateHelper interface {
RecordStatusUpdated(pod *v1.Pod)
DeletePodStartupState(podUID types.UID)
}
// Manager is the Source of truth for kubelet pod status, and should be kept up-to-date with // Manager is the Source of truth for kubelet pod status, and should be kept up-to-date with
// the latest v1.PodStatus. It also syncs updates back to the API server. // the latest v1.PodStatus. It also syncs updates back to the API server.
type Manager interface { type Manager interface {
@ -127,15 +131,15 @@ type Manager interface {
const syncPeriod = 10 * time.Second const syncPeriod = 10 * time.Second
// NewManager returns a functional Manager. // NewManager returns a functional Manager.
func NewManager(kubeClient clientset.Interface, podManager kubepod.Manager, podDeletionSafety PodDeletionSafetyProvider, podStartupLatencyTracker *util.PodStartupLatencyTracker) Manager { func NewManager(kubeClient clientset.Interface, podManager kubepod.Manager, podDeletionSafety PodDeletionSafetyProvider, podStartupLatencyHelper PodStartupLatencyStateHelper) Manager {
return &manager{ return &manager{
kubeClient: kubeClient, kubeClient: kubeClient,
podManager: podManager, podManager: podManager,
podStatuses: make(map[types.UID]versionedPodStatus), podStatuses: make(map[types.UID]versionedPodStatus),
podStatusChannel: make(chan podStatusSyncRequest, 1000), // Buffer up to 1000 statuses podStatusChannel: make(chan podStatusSyncRequest, 1000), // Buffer up to 1000 statuses
apiStatusVersions: make(map[kubetypes.MirrorPodUID]uint64), apiStatusVersions: make(map[kubetypes.MirrorPodUID]uint64),
podDeletionSafety: podDeletionSafety, podDeletionSafety: podDeletionSafety,
podStartupLatencyTracker: podStartupLatencyTracker, podStartupLatencyHelper: podStartupLatencyHelper,
} }
} }
@ -603,7 +607,7 @@ func (m *manager) deletePodStatus(uid types.UID) {
m.podStatusesLock.Lock() m.podStatusesLock.Lock()
defer m.podStatusesLock.Unlock() defer m.podStatusesLock.Unlock()
delete(m.podStatuses, uid) delete(m.podStatuses, uid)
m.podStartupLatencyTracker.DeletePodStartupState(uid) m.podStartupLatencyHelper.DeletePodStartupState(uid)
} }
// TODO(filipg): It'd be cleaner if we can do this without signal from user. // TODO(filipg): It'd be cleaner if we can do this without signal from user.
@ -716,7 +720,7 @@ func (m *manager) syncPod(uid types.UID, status versionedPodStatus) {
klog.V(3).InfoS("Status for pod updated successfully", "pod", klog.KObj(pod), "statusVersion", status.version, "status", mergedStatus) klog.V(3).InfoS("Status for pod updated successfully", "pod", klog.KObj(pod), "statusVersion", status.version, "status", mergedStatus)
pod = newPod pod = newPod
// We pass a new object (result of API call which contains updated ResourceVersion) // We pass a new object (result of API call which contains updated ResourceVersion)
m.podStartupLatencyTracker.RecordStatusUpdated(pod) m.podStartupLatencyHelper.RecordStatusUpdated(pod)
} }
// measure how long the status update took to propagate from generation to update on the server // measure how long the status update took to propagate from generation to update on the server

View File

@ -29,7 +29,15 @@ import (
// PodStartupLatencyTracker records key moments for startup latency calculation, // PodStartupLatencyTracker records key moments for startup latency calculation,
// e.g. image pulling or pod observed running on watch. // e.g. image pulling or pod observed running on watch.
type PodStartupLatencyTracker struct { type PodStartupLatencyTracker interface {
ObservedPodOnWatch(pod *v1.Pod, when time.Time)
RecordImageStartedPulling(podUID types.UID)
RecordImageFinishedPulling(podUID types.UID)
RecordStatusUpdated(pod *v1.Pod)
DeletePodStartupState(podUID types.UID)
}
type basicPodStartupLatencyTracker struct {
// protect against concurrent read and write on pods map // protect against concurrent read and write on pods map
lock sync.Mutex lock sync.Mutex
pods map[types.UID]*perPodState pods map[types.UID]*perPodState
@ -46,15 +54,15 @@ type perPodState struct {
metricRecorded bool metricRecorded bool
} }
func NewPodStartupLatencyTracker() *PodStartupLatencyTracker { // NewPodStartupLatencyTracker creates an instance of PodStartupLatencyTracker
return &PodStartupLatencyTracker{ func NewPodStartupLatencyTracker() PodStartupLatencyTracker {
return &basicPodStartupLatencyTracker{
pods: map[types.UID]*perPodState{}, pods: map[types.UID]*perPodState{},
clock: clock.RealClock{}, clock: clock.RealClock{},
} }
} }
// ObservedPodOnWatch to be called from somewhere where we look for pods. func (p *basicPodStartupLatencyTracker) ObservedPodOnWatch(pod *v1.Pod, when time.Time) {
func (p *PodStartupLatencyTracker) ObservedPodOnWatch(pod *v1.Pod, when time.Time) {
p.lock.Lock() p.lock.Lock()
defer p.lock.Unlock() defer p.lock.Unlock()
@ -105,7 +113,7 @@ func (p *PodStartupLatencyTracker) ObservedPodOnWatch(pod *v1.Pod, when time.Tim
} }
} }
func (p *PodStartupLatencyTracker) RecordImageStartedPulling(podUID types.UID) { func (p *basicPodStartupLatencyTracker) RecordImageStartedPulling(podUID types.UID) {
p.lock.Lock() p.lock.Lock()
defer p.lock.Unlock() defer p.lock.Unlock()
@ -119,7 +127,7 @@ func (p *PodStartupLatencyTracker) RecordImageStartedPulling(podUID types.UID) {
} }
} }
func (p *PodStartupLatencyTracker) RecordImageFinishedPulling(podUID types.UID) { func (p *basicPodStartupLatencyTracker) RecordImageFinishedPulling(podUID types.UID) {
p.lock.Lock() p.lock.Lock()
defer p.lock.Unlock() defer p.lock.Unlock()
@ -131,7 +139,7 @@ func (p *PodStartupLatencyTracker) RecordImageFinishedPulling(podUID types.UID)
state.lastFinishedPulling = p.clock.Now() // Now is always grater than values from the past. state.lastFinishedPulling = p.clock.Now() // Now is always grater than values from the past.
} }
func (p *PodStartupLatencyTracker) RecordStatusUpdated(pod *v1.Pod) { func (p *basicPodStartupLatencyTracker) RecordStatusUpdated(pod *v1.Pod) {
p.lock.Lock() p.lock.Lock()
defer p.lock.Unlock() defer p.lock.Unlock()
@ -151,7 +159,7 @@ func (p *PodStartupLatencyTracker) RecordStatusUpdated(pod *v1.Pod) {
} }
if hasPodStartedSLO(pod) { if hasPodStartedSLO(pod) {
klog.V(2).InfoS("Mark when the pod was running for the first time", "pod", klog.KObj(pod), "rv", pod.ResourceVersion) klog.V(3).InfoS("Mark when the pod was running for the first time", "pod", klog.KObj(pod), "rv", pod.ResourceVersion)
state.observedRunningTime = p.clock.Now() state.observedRunningTime = p.clock.Now()
} }
} }
@ -170,7 +178,7 @@ func hasPodStartedSLO(pod *v1.Pod) bool {
return true return true
} }
func (p *PodStartupLatencyTracker) DeletePodStartupState(podUID types.UID) { func (p *basicPodStartupLatencyTracker) DeletePodStartupState(podUID types.UID) {
p.lock.Lock() p.lock.Lock()
defer p.lock.Unlock() defer p.lock.Unlock()

View File

@ -45,7 +45,10 @@ func TestNoEvents(t *testing.T) {
wants := "" wants := ""
metrics.Register() metrics.Register()
tracker := NewPodStartupLatencyTracker()
tracker := &basicPodStartupLatencyTracker{
pods: map[types.UID]*perPodState{},
}
if err := testutil.GatherAndCompare(metrics.GetGather(), strings.NewReader(wants), metricsName); err != nil { if err := testutil.GatherAndCompare(metrics.GetGather(), strings.NewReader(wants), metricsName); err != nil {
t.Fatal(err) t.Fatal(err)
@ -63,7 +66,10 @@ func TestPodsRunningBeforeKubeletStarted(t *testing.T) {
wants := "" wants := ""
metrics.Register() metrics.Register()
tracker := NewPodStartupLatencyTracker()
tracker := &basicPodStartupLatencyTracker{
pods: map[types.UID]*perPodState{},
}
if err := testutil.GatherAndCompare(metrics.GetGather(), strings.NewReader(wants), metricsName); err != nil { if err := testutil.GatherAndCompare(metrics.GetGather(), strings.NewReader(wants), metricsName); err != nil {
t.Fatal(err) t.Fatal(err)
@ -121,8 +127,11 @@ kubelet_pod_start_sli_duration_seconds_count 1
fakeClock := testingclock.NewFakeClock(frozenTime) fakeClock := testingclock.NewFakeClock(frozenTime)
metrics.Register() metrics.Register()
tracker := NewPodStartupLatencyTracker()
tracker.clock = fakeClock tracker := &basicPodStartupLatencyTracker{
pods: map[types.UID]*perPodState{},
clock: fakeClock,
}
podInit := buildInitializingPod() podInit := buildInitializingPod()
tracker.ObservedPodOnWatch(podInit, frozenTime) tracker.ObservedPodOnWatch(podInit, frozenTime)
@ -190,8 +199,11 @@ kubelet_pod_start_sli_duration_seconds_count 1
fakeClock := testingclock.NewFakeClock(frozenTime) fakeClock := testingclock.NewFakeClock(frozenTime)
metrics.Register() metrics.Register()
tracker := NewPodStartupLatencyTracker()
tracker.clock = fakeClock tracker := &basicPodStartupLatencyTracker{
pods: map[types.UID]*perPodState{},
clock: fakeClock,
}
podInitializing := buildInitializingPod() podInitializing := buildInitializingPod()
tracker.ObservedPodOnWatch(podInitializing, frozenTime) tracker.ObservedPodOnWatch(podInitializing, frozenTime)