From 744f33d886a509ae714b5312a3d2f71b340481c9 Mon Sep 17 00:00:00 2001
From: Victor Marmol
Date: Tue, 24 Feb 2015 15:29:18 -0800
Subject: [PATCH] Adding sync pod latency metric.

Latency is broken down by create, update, and sync pods. Part of #4604.
---
 pkg/kubelet/kubelet.go         | 52 ++++++++++++++++++++++++++++------
 pkg/kubelet/kubelet_test.go    | 29 ++++++++++---------
 pkg/kubelet/metrics/metrics.go | 38 ++++++++++++++++++++++++-
 pkg/kubelet/pod_workers.go     | 34 +++++++++++++++-------
 4 files changed, 120 insertions(+), 33 deletions(-)

diff --git a/pkg/kubelet/kubelet.go b/pkg/kubelet/kubelet.go
index 06625aee68f..c7a5e321b54 100644
--- a/pkg/kubelet/kubelet.go
+++ b/pkg/kubelet/kubelet.go
@@ -61,7 +61,10 @@ const podOomScoreAdj = -100
 
 // SyncHandler is an interface implemented by Kubelet, for testability
 type SyncHandler interface {
-	SyncPods([]api.BoundPod) error
+	// Syncs current state to match the specified pods. SyncPodType specifies what
+	// type of sync is occurring per pod. StartTime specifies the time at which
+	// syncing began (for use in monitoring).
+	SyncPods(pods []api.BoundPod, podSyncTypes map[types.UID]metrics.SyncPodType, startTime time.Time) error
 }
 
 type SourceReadyFn func(source string) bool
@@ -942,7 +945,7 @@ func (kl *Kubelet) createPodInfraContainer(pod *api.BoundPod) (dockertools.Docke
 func (kl *Kubelet) pullImage(img string, ref *api.ObjectReference) error {
 	start := time.Now()
 	defer func() {
-		metrics.ImagePullLatency.Observe(float64(time.Since(start).Nanoseconds() / time.Microsecond.Nanoseconds()))
+		metrics.ImagePullLatency.Observe(metrics.SinceInMicroseconds(start))
 	}()
 
 	if err := kl.dockerPuller.Pull(img); err != nil {
@@ -1270,7 +1273,7 @@ func (kl *Kubelet) cleanupOrphanedVolumes(pods []api.BoundPod, running []*docker
 }
 
 // SyncPods synchronizes the configured list of pods (desired state) with the host current state.
-func (kl *Kubelet) SyncPods(pods []api.BoundPod) error {
+func (kl *Kubelet) SyncPods(pods []api.BoundPod, podSyncTypes map[types.UID]metrics.SyncPodType, start time.Time) error {
 	glog.V(4).Infof("Desired: %#v", pods)
 	var err error
 	desiredContainers := make(map[podContainer]empty)
@@ -1296,7 +1299,9 @@ func (kl *Kubelet) SyncPods(pods []api.BoundPod) error {
 		}
 
 		// Run the sync in an async manifest worker.
-		kl.podWorkers.UpdatePod(*pod)
+		kl.podWorkers.UpdatePod(pod, func() {
+			metrics.SyncPodLatency.WithLabelValues(podSyncTypes[pod.UID].String()).Observe(metrics.SinceInMicroseconds(start))
+		})
 	}
 
 	// Stop the workers for no-longer existing pods.
@@ -1416,19 +1421,21 @@ func (kl *Kubelet) handleUpdate(u PodUpdate) {
 func (kl *Kubelet) syncLoop(updates <-chan PodUpdate, handler SyncHandler) {
 	for {
 		unsyncedPod := false
+		podSyncTypes := make(map[types.UID]metrics.SyncPodType)
 		select {
 		case u := <-updates:
-			kl.updatePods(u)
+			kl.updatePods(u, podSyncTypes)
 			unsyncedPod = true
 		case <-time.After(kl.resyncInterval):
 			glog.V(4).Infof("Periodic sync")
 		}
+		start := time.Now()
 		// If we already caught some update, try to wait for some short time
 		// to possibly batch it with other incoming updates.
 		for unsyncedPod {
 			select {
 			case u := <-updates:
-				kl.updatePods(u)
+				kl.updatePods(u, podSyncTypes)
 			case <-time.After(5 * time.Millisecond):
 				// Break the for loop.
 				unsyncedPod = false
@@ -1440,25 +1447,54 @@ func (kl *Kubelet) syncLoop(updates <-chan PodUpdate, handler SyncHandler) {
 			glog.Errorf("Failed to get bound pods.")
 			return
 		}
-		if err := handler.SyncPods(pods); err != nil {
+		if err := handler.SyncPods(pods, podSyncTypes, start); err != nil {
 			glog.Errorf("Couldn't sync containers: %v", err)
 		}
 	}
 }
 
-func (kl *Kubelet) updatePods(u PodUpdate) {
+// Updates the Kubelet's internal pods with those provided by the update.
+// Records the sync type of new and updated pods in podSyncTypes.
+func (kl *Kubelet) updatePods(u PodUpdate, podSyncTypes map[types.UID]metrics.SyncPodType) {
 	switch u.Op {
 	case SET:
 		glog.V(3).Infof("SET: Containers changed")
+
+		// Store the new pods. Don't worry about filtering host ports since those
+		// pods will never be looked up.
+		existingPods := make(map[types.UID]struct{})
+		for i := range kl.pods {
+			existingPods[kl.pods[i].UID] = struct{}{}
+		}
+		for i := range u.Pods {
+			if _, ok := existingPods[u.Pods[i].UID]; !ok {
+				podSyncTypes[u.Pods[i].UID] = metrics.SyncPodCreate
+			}
+		}
+
 		kl.pods = u.Pods
 		kl.pods = filterHostPortConflicts(kl.pods)
 	case UPDATE:
 		glog.V(3).Infof("Update: Containers changed")
+
+		// Store the updated pods. Don't worry about filtering host ports since those
+		// pods will never be looked up.
+		for i := range u.Pods {
+			podSyncTypes[u.Pods[i].UID] = metrics.SyncPodUpdate
+		}
+
 		kl.pods = updateBoundPods(u.Pods, kl.pods)
 		kl.pods = filterHostPortConflicts(kl.pods)
 	default:
 		panic("syncLoop does not support incremental changes")
 	}
+
+	// Mark all remaining pods as sync.
+	for i := range kl.pods {
+		if _, ok := podSyncTypes[kl.pods[i].UID]; !ok {
+			podSyncTypes[kl.pods[i].UID] = metrics.SyncPodSync
+		}
+	}
 }
 
 // Returns Docker version for this Kubelet.
diff --git a/pkg/kubelet/kubelet_test.go b/pkg/kubelet/kubelet_test.go
index ad110072e5b..805e45196ab 100644
--- a/pkg/kubelet/kubelet_test.go
+++ b/pkg/kubelet/kubelet_test.go
@@ -34,6 +34,7 @@ import (
 
 	"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
 	"github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/dockertools"
+	"github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/metrics"
 	"github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/volume"
 	_ "github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/volume/host_path"
 	"github.com/GoogleCloudPlatform/kubernetes/pkg/types"
@@ -382,6 +383,8 @@ func (cr *channelReader) GetList() [][]api.BoundPod {
 	return cr.list
 }
 
+var emptyPodUIDs map[types.UID]metrics.SyncPodType
+
 func TestSyncPodsDoesNothing(t *testing.T) {
 	kubelet, fakeDocker, waitGroup := newTestKubelet(t)
 	container := api.Container{Name: "bar"}
@@ -413,7 +416,7 @@ func TestSyncPodsDoesNothing(t *testing.T) {
 		},
 	}
 	waitGroup.Add(1)
-	err := kubelet.SyncPods(kubelet.pods)
+	err := kubelet.SyncPods(kubelet.pods, emptyPodUIDs, time.Now())
 	if err != nil {
 		t.Errorf("unexpected error: %v", err)
 	}
@@ -444,7 +447,7 @@ func TestSyncPodsWithTerminationLog(t *testing.T) {
 		},
 	}
 	waitGroup.Add(1)
-	err := kubelet.SyncPods(kubelet.pods)
+	err := kubelet.SyncPods(kubelet.pods, emptyPodUIDs, time.Now())
 	if err != nil {
 		t.Errorf("unexpected error: %v", err)
 	}
@@ -491,7 +494,7 @@ func TestSyncPodsCreatesNetAndContainer(t *testing.T) {
 		},
 	}
 	waitGroup.Add(1)
-	err := kubelet.SyncPods(kubelet.pods)
+	err := kubelet.SyncPods(kubelet.pods, emptyPodUIDs, time.Now())
 	if err != nil {
 		t.Errorf("unexpected error: %v", err)
 	}
@@ -542,7 +545,7 @@ func TestSyncPodsCreatesNetAndContainerPullsImage(t *testing.T) {
 		},
 	}
 	waitGroup.Add(1)
-	err := kubelet.SyncPods(kubelet.pods)
+	err := kubelet.SyncPods(kubelet.pods, emptyPodUIDs, time.Now())
 	if err != nil {
 		t.Errorf("unexpected error: %v", err)
 	}
@@ -590,7 +593,7 @@ func TestSyncPodsWithPodInfraCreatesContainer(t *testing.T) {
 		},
 	}
 	waitGroup.Add(1)
-	err := kubelet.SyncPods(kubelet.pods)
+	err := kubelet.SyncPods(kubelet.pods, emptyPodUIDs, time.Now())
 	if err != nil {
 		t.Errorf("unexpected error: %v", err)
 	}
@@ -645,7 +648,7 @@ func TestSyncPodsWithPodInfraCreatesContainerCallsHandler(t *testing.T) {
 		},
 	}
 	waitGroup.Add(1)
-	err := kubelet.SyncPods(kubelet.pods)
+	err := kubelet.SyncPods(kubelet.pods, emptyPodUIDs, time.Now())
 	if err != nil {
 		t.Errorf("unexpected error: %v", err)
 	}
@@ -690,7 +693,7 @@ func TestSyncPodsDeletesWithNoPodInfraContainer(t *testing.T) {
 		},
 	}
 	waitGroup.Add(1)
-	err := kubelet.SyncPods(kubelet.pods)
+	err := kubelet.SyncPods(kubelet.pods, emptyPodUIDs, time.Now())
 	if err != nil {
 		t.Errorf("unexpected error: %v", err)
 	}
@@ -728,7 +731,7 @@ func TestSyncPodsDeletesWhenSourcesAreReady(t *testing.T) {
 			ID: "9876",
 		},
 	}
-	if err := kubelet.SyncPods([]api.BoundPod{}); err != nil {
+	if err := kubelet.SyncPods([]api.BoundPod{}, emptyPodUIDs, time.Now()); err != nil {
 		t.Errorf("unexpected error: %v", err)
 	}
 	// Validate nothing happened.
@@ -736,7 +739,7 @@
 	fakeDocker.ClearCalls()
 
 	ready = true
-	if err := kubelet.SyncPods([]api.BoundPod{}); err != nil {
+	if err := kubelet.SyncPods([]api.BoundPod{}, emptyPodUIDs, time.Now()); err != nil {
 		t.Errorf("unexpected error: %v", err)
 	}
 	verifyCalls(t, fakeDocker, []string{"list", "stop", "stop", "inspect_container", "inspect_container"})
@@ -787,7 +790,7 @@ func TestSyncPodsDeletesWhenContainerSourceReady(t *testing.T) {
 			ID: "9876",
 		},
 	}
-	if err := kubelet.SyncPods([]api.BoundPod{}); err != nil {
+	if err := kubelet.SyncPods([]api.BoundPod{}, emptyPodUIDs, time.Now()); err != nil {
 		t.Errorf("unexpected error: %v", err)
 	}
 	// Validate nothing happened.
@@ -795,7 +798,7 @@
 	fakeDocker.ClearCalls()
 
 	ready = true
-	if err := kubelet.SyncPods([]api.BoundPod{}); err != nil {
+	if err := kubelet.SyncPods([]api.BoundPod{}, emptyPodUIDs, time.Now()); err != nil {
 		t.Errorf("unexpected error: %v", err)
 	}
 	verifyCalls(t, fakeDocker, []string{"list", "stop", "stop", "inspect_container", "inspect_container"})
@@ -833,7 +836,7 @@ func TestSyncPodsDeletes(t *testing.T) {
 			ID: "4567",
 		},
 	}
-	err := kubelet.SyncPods([]api.BoundPod{})
+	err := kubelet.SyncPods([]api.BoundPod{}, emptyPodUIDs, time.Now())
 	if err != nil {
 		t.Errorf("unexpected error: %v", err)
 	}
@@ -2091,7 +2094,7 @@ func TestSyncPodsWithPullPolicy(t *testing.T) {
 				},
 			},
 		},
-	})
+	}, emptyPodUIDs, time.Now())
 	if err != nil {
 		t.Errorf("unexpected error: %v", err)
 	}
diff --git a/pkg/kubelet/metrics/metrics.go b/pkg/kubelet/metrics/metrics.go
index 1ce8d29e2ab..bb6a5267822 100644
--- a/pkg/kubelet/metrics/metrics.go
+++ b/pkg/kubelet/metrics/metrics.go
@@ -18,6 +18,7 @@ package metrics
 
 import (
 	"sync"
+	"time"
 
 	"github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/dockertools"
 	"github.com/GoogleCloudPlatform/kubernetes/pkg/types"
@@ -35,8 +36,16 @@ var (
 			Help:      "Image pull latency in microseconds.",
 		},
 	)
+	// TODO(vmarmol): Break down by number of containers in pod?
+	SyncPodLatency = prometheus.NewSummaryVec(
+		prometheus.SummaryOpts{
+			Subsystem: kubeletSubsystem,
+			Name:      "sync_pod_latency_microseconds",
+			Help:      "Latency in microseconds to sync a single pod. Broken down by operation type: create, update, or sync",
Broken down by operation type: create, update, or sync", + }, + []string{"operation_type"}, + ) // TODO(vmarmol): Containers per pod - // TODO(vmarmol): Latency of pod startup // TODO(vmarmol): Latency of SyncPods ) @@ -47,10 +56,37 @@ func Register(containerCache dockertools.DockerCache) { // Register the metrics. registerMetrics.Do(func() { prometheus.MustRegister(ImagePullLatency) + prometheus.MustRegister(SyncPodLatency) prometheus.MustRegister(newPodAndContainerCollector(containerCache)) }) } +type SyncPodType int + +const ( + SyncPodCreate SyncPodType = iota + SyncPodUpdate + SyncPodSync +) + +func (self SyncPodType) String() string { + switch self { + case SyncPodCreate: + return "create" + case SyncPodUpdate: + return "update" + case SyncPodSync: + return "sync" + default: + return "unknown" + } +} + +// Gets the time since the specified start in microseconds. +func SinceInMicroseconds(start time.Time) float64 { + return float64(time.Since(start).Nanoseconds() / time.Microsecond.Nanoseconds()) +} + func newPodAndContainerCollector(containerCache dockertools.DockerCache) *podAndContainerCollector { return &podAndContainerCollector{ containerCache: containerCache, diff --git a/pkg/kubelet/pod_workers.go b/pkg/kubelet/pod_workers.go index b8995ccaa1c..a4eab949799 100644 --- a/pkg/kubelet/pod_workers.go +++ b/pkg/kubelet/pod_workers.go @@ -35,7 +35,7 @@ type podWorkers struct { // Tracks all running per-pod goroutines - per-pod goroutine will be // processing updates received through its corresponding channel. - podUpdates map[types.UID]chan api.BoundPod + podUpdates map[types.UID]chan workUpdate // DockerCache is used for listing running containers. dockerCache dockertools.DockerCache @@ -45,16 +45,24 @@ type podWorkers struct { syncPodFun syncPodFunType } +type workUpdate struct { + // The pod state to reflect. + pod *api.BoundPod + + // Function to call when the update is complete. + updateCompleteFun func() +} + func newPodWorkers(dockerCache dockertools.DockerCache, syncPodFun syncPodFunType) *podWorkers { return &podWorkers{ - podUpdates: map[types.UID]chan api.BoundPod{}, + podUpdates: map[types.UID]chan workUpdate{}, dockerCache: dockerCache, syncPodFun: syncPodFun, } } -func (p *podWorkers) managePodLoop(podUpdates <-chan api.BoundPod) { - for newPod := range podUpdates { +func (p *podWorkers) managePodLoop(podUpdates <-chan workUpdate) { + for newWork := range podUpdates { // Since we use docker cache, getting current state shouldn't cause // performance overhead on Docker. Moreover, as long as we run syncPod // no matter if it changes anything, having an old version of "containers" @@ -64,29 +72,33 @@ func (p *podWorkers) managePodLoop(podUpdates <-chan api.BoundPod) { glog.Errorf("Error listing containers while syncing pod: %v", err) continue } - err = p.syncPodFun(&newPod, containers) + err = p.syncPodFun(newWork.pod, containers) if err != nil { - glog.Errorf("Error syncing pod %s, skipping: %v", newPod.UID, err) - record.Eventf(&newPod, "failedSync", "Error syncing pod, skipping: %v", err) + glog.Errorf("Error syncing pod %s, skipping: %v", newWork.pod.UID, err) + record.Eventf(newWork.pod, "failedSync", "Error syncing pod, skipping: %v", err) continue } } } -func (p *podWorkers) UpdatePod(pod api.BoundPod) { +// Apply the new setting to the specified pod. updateComplete is called when the update is completed. 
+func (p *podWorkers) UpdatePod(pod *api.BoundPod, updateComplete func()) {
 	uid := pod.UID
-	var podUpdates chan api.BoundPod
+	var podUpdates chan workUpdate
 	var exists bool
 
 	p.podLock.Lock()
 	defer p.podLock.Unlock()
 	if podUpdates, exists = p.podUpdates[uid]; !exists {
 		// TODO(wojtek-t): Adjust the size of the buffer in this channel
-		podUpdates = make(chan api.BoundPod, 5)
+		podUpdates = make(chan workUpdate, 5)
 		p.podUpdates[uid] = podUpdates
 		go p.managePodLoop(podUpdates)
 	}
-	podUpdates <- pod
+	podUpdates <- workUpdate{
+		pod:               pod,
+		updateCompleteFun: updateComplete,
+	}
 }
 
 func (p *podWorkers) ForgetNonExistingPodWorkers(desiredPods map[types.UID]empty) {
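
For context, below is a minimal, self-contained sketch of the pattern this patch wires together: a Prometheus SummaryVec labeled by operation type, a microsecond-latency helper mirroring metrics.SinceInMicroseconds, and a per-pod worker loop that invokes a completion callback which records the latency. It is not code from the patch; it assumes only the Prometheus Go client (github.com/prometheus/client_golang/prometheus), and the names demoSyncLatency, workItem, and podWorkerLoop are illustrative stand-ins for the kubelet's SyncPodLatency, workUpdate, and managePodLoop.

package main

import (
	"fmt"
	"time"

	"github.com/prometheus/client_golang/prometheus"
)

// Stand-in for metrics.SyncPodLatency: per-operation-type sync latency summary.
var demoSyncLatency = prometheus.NewSummaryVec(
	prometheus.SummaryOpts{
		Subsystem: "kubelet",
		Name:      "sync_pod_latency_microseconds",
		Help:      "Latency in microseconds to sync a single pod.",
	},
	[]string{"operation_type"},
)

// Mirrors metrics.SinceInMicroseconds: elapsed time since start, in microseconds.
func sinceInMicroseconds(start time.Time) float64 {
	return float64(time.Since(start).Nanoseconds() / time.Microsecond.Nanoseconds())
}

// workItem plays the role of workUpdate: the pod to sync plus a callback to
// run once the update has been processed.
type workItem struct {
	podName        string
	updateComplete func()
}

// podWorkerLoop plays the role of managePodLoop: drain updates for one pod,
// do the sync work, then signal completion so the caller can observe latency.
func podWorkerLoop(updates <-chan workItem) {
	for w := range updates {
		time.Sleep(10 * time.Millisecond) // stand-in for the actual sync work
		w.updateComplete()
	}
}

func main() {
	prometheus.MustRegister(demoSyncLatency)

	updates := make(chan workItem, 5)
	done := make(chan struct{})
	go podWorkerLoop(updates)

	// Corresponds to the start time that SyncPods threads through to the callback.
	start := time.Now()
	updates <- workItem{
		podName: "demo-pod",
		updateComplete: func() {
			// "create" is one of the operation_type label values ("create",
			// "update", "sync") produced by SyncPodType.String().
			demoSyncLatency.WithLabelValues("create").Observe(sinceInMicroseconds(start))
			close(done)
		},
	}
	<-done
	fmt.Println("observed create-sync latency for demo-pod")
}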