Merge pull request #9457 from yujuhong/seen

Kubelet: record the initial pod processing latency
This commit is contained in:
Satnam Singh 2015-06-19 15:24:16 -07:00
commit 2012238204
6 changed files with 74 additions and 4 deletions

View File

@ -26,6 +26,7 @@ import (
"github.com/GoogleCloudPlatform/kubernetes/pkg/client/record" "github.com/GoogleCloudPlatform/kubernetes/pkg/client/record"
"github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet" "github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet"
kubecontainer "github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/container" kubecontainer "github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/container"
kubeletTypes "github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/types"
"github.com/GoogleCloudPlatform/kubernetes/pkg/util" "github.com/GoogleCloudPlatform/kubernetes/pkg/util"
"github.com/GoogleCloudPlatform/kubernetes/pkg/util/config" "github.com/GoogleCloudPlatform/kubernetes/pkg/util/config"
utilerrors "github.com/GoogleCloudPlatform/kubernetes/pkg/util/errors" utilerrors "github.com/GoogleCloudPlatform/kubernetes/pkg/util/errors"
@ -183,6 +184,11 @@ func (s *podStorage) Merge(source string, change interface{}) error {
return nil return nil
} }
// recordFirstSeenTime records the first seen time of this pod.
func recordFirstSeenTime(pod *api.Pod) {
pod.Annotations[kubelet.ConfigFirstSeenAnnotationKey] = kubeletTypes.NewTimestamp().GetString()
}
func (s *podStorage) merge(source string, change interface{}) (adds, updates, deletes *kubelet.PodUpdate) { func (s *podStorage) merge(source string, change interface{}) (adds, updates, deletes *kubelet.PodUpdate) {
s.podLock.Lock() s.podLock.Lock()
defer s.podLock.Unlock() defer s.podLock.Unlock()
@ -223,6 +229,7 @@ func (s *podStorage) merge(source string, change interface{}) (adds, updates, de
ref.Annotations = make(map[string]string) ref.Annotations = make(map[string]string)
} }
ref.Annotations[kubelet.ConfigSourceAnnotationKey] = source ref.Annotations[kubelet.ConfigSourceAnnotationKey] = source
recordFirstSeenTime(ref)
pods[name] = ref pods[name] = ref
adds.Pods = append(adds.Pods, ref) adds.Pods = append(adds.Pods, ref)
} }
@ -265,6 +272,7 @@ func (s *podStorage) merge(source string, change interface{}) (adds, updates, de
ref.Annotations = make(map[string]string) ref.Annotations = make(map[string]string)
} }
ref.Annotations[kubelet.ConfigSourceAnnotationKey] = source ref.Annotations[kubelet.ConfigSourceAnnotationKey] = source
recordFirstSeenTime(ref)
pods[name] = ref pods[name] = ref
adds.Pods = append(adds.Pods, ref) adds.Pods = append(adds.Pods, ref)
} }

View File

@ -91,6 +91,14 @@ func expectPodUpdate(t *testing.T, ch <-chan kubelet.PodUpdate, expected ...kube
for i := range expected { for i := range expected {
update := <-ch update := <-ch
sort.Sort(sortedPods(update.Pods)) sort.Sort(sortedPods(update.Pods))
// Clear the annotation field before the comparision.
// TODO: consider mock out recordFirstSeen in config.go
for _, pod := range update.Pods {
delete(pod.Annotations, kubelet.ConfigFirstSeenAnnotationKey)
}
for _, pod := range expected[i].Pods {
delete(pod.Annotations, kubelet.ConfigFirstSeenAnnotationKey)
}
if !api.Semantic.DeepEqual(expected[i], update) { if !api.Semantic.DeepEqual(expected[i], update) {
t.Fatalf("Expected %#v, Got %#v", expected[i], update) t.Fatalf("Expected %#v, Got %#v", expected[i], update)
} }

View File

@ -1098,6 +1098,12 @@ func (kl *Kubelet) syncPod(pod *api.Pod, mirrorPod *api.Pod, runningPod kubecont
podFullName := kubecontainer.GetPodFullName(pod) podFullName := kubecontainer.GetPodFullName(pod)
uid := pod.UID uid := pod.UID
start := time.Now() start := time.Now()
var firstSeenTime time.Time
if firstSeenTimeStr, ok := pod.Annotations[ConfigFirstSeenAnnotationKey]; !ok {
glog.V(3).Infof("First seen time not recorded for pod %q", pod.UID)
} else {
firstSeenTime = kubeletTypes.ConvertToTimestamp(firstSeenTimeStr).Get()
}
// Before returning, regenerate status and store it in the cache. // Before returning, regenerate status and store it in the cache.
defer func() { defer func() {
@ -1115,9 +1121,9 @@ func (kl *Kubelet) syncPod(pod *api.Pod, mirrorPod *api.Pod, runningPod kubecont
podToUpdate = mirrorPod podToUpdate = mirrorPod
} }
existingStatus, ok := kl.statusManager.GetPodStatus(podFullName) existingStatus, ok := kl.statusManager.GetPodStatus(podFullName)
if !ok || existingStatus.Phase == api.PodPending && status.Phase == api.PodRunning { if !ok || existingStatus.Phase == api.PodPending && status.Phase == api.PodRunning &&
// TODO: Check the pod annotation instead of using `start` !firstSeenTime.IsZero() {
metrics.PodStartLatency.Observe(metrics.SinceInMicroseconds(start)) metrics.PodStartLatency.Observe(metrics.SinceInMicroseconds(firstSeenTime))
} }
kl.statusManager.SetPodStatus(podToUpdate, status) kl.statusManager.SetPodStatus(podToUpdate, status)
} }
@ -1175,6 +1181,12 @@ func (kl *Kubelet) syncPod(pod *api.Pod, mirrorPod *api.Pod, runningPod kubecont
var podStatus api.PodStatus var podStatus api.PodStatus
if updateType == SyncPodCreate { if updateType == SyncPodCreate {
// This is the first time we are syncing the pod. Record the latency
// since kubelet first saw the pod if firstSeenTime is set.
if !firstSeenTime.IsZero() {
metrics.PodWorkerStartLatency.Observe(metrics.SinceInMicroseconds(firstSeenTime))
}
podStatus = pod.Status podStatus = pod.Status
podStatus.StartTime = &util.Time{start} podStatus.StartTime = &util.Time{start}
kl.statusManager.SetPodStatus(pod, podStatus) kl.statusManager.SetPodStatus(pod, podStatus)

View File

@ -34,6 +34,7 @@ const (
ContainerManagerOperationsKey = "container_manager_latency_microseconds" ContainerManagerOperationsKey = "container_manager_latency_microseconds"
DockerOperationsKey = "docker_operations_latency_microseconds" DockerOperationsKey = "docker_operations_latency_microseconds"
DockerErrorsKey = "docker_errors" DockerErrorsKey = "docker_errors"
PodWorkerStartLatencyKey = "pod_worker_start_latency_microseconds"
) )
var ( var (
@ -81,6 +82,13 @@ var (
}, },
[]string{"operation_type"}, []string{"operation_type"},
) )
PodWorkerStartLatency = prometheus.NewSummary(
prometheus.SummaryOpts{
Subsystem: KubeletSubsystem,
Name: PodWorkerStartLatencyKey,
Help: "Latency in microseconds from seeing a pod to starting a worker.",
},
)
DockerOperationsLatency = prometheus.NewSummaryVec( DockerOperationsLatency = prometheus.NewSummaryVec(
prometheus.SummaryOpts{ prometheus.SummaryOpts{
Subsystem: KubeletSubsystem, Subsystem: KubeletSubsystem,
@ -111,6 +119,7 @@ func Register(containerCache kubecontainer.RuntimeCache) {
prometheus.MustRegister(DockerOperationsLatency) prometheus.MustRegister(DockerOperationsLatency)
prometheus.MustRegister(ContainerManagerLatency) prometheus.MustRegister(ContainerManagerLatency)
prometheus.MustRegister(SyncPodsLatency) prometheus.MustRegister(SyncPodsLatency)
prometheus.MustRegister(PodWorkerStartLatency)
prometheus.MustRegister(ContainersPerPodCount) prometheus.MustRegister(ContainersPerPodCount)
prometheus.MustRegister(DockerErrors) prometheus.MustRegister(DockerErrors)
prometheus.MustRegister(newPodAndContainerCollector(containerCache)) prometheus.MustRegister(newPodAndContainerCollector(containerCache))

View File

@ -24,6 +24,7 @@ import (
const ConfigSourceAnnotationKey = "kubernetes.io/config.source" const ConfigSourceAnnotationKey = "kubernetes.io/config.source"
const ConfigMirrorAnnotationKey = "kubernetes.io/config.mirror" const ConfigMirrorAnnotationKey = "kubernetes.io/config.mirror"
const ConfigFirstSeenAnnotationKey = "kubernetes.io/config.seen"
// PodOperation defines what changes will be made on a pod configuration. // PodOperation defines what changes will be made on a pod configuration.
type PodOperation int type PodOperation int

View File

@ -16,7 +16,10 @@ limitations under the License.
package types package types
import "net/http" import (
"net/http"
"time"
)
// DockerID is an ID of docker container. It is a type to make it clear when we're working with docker container Ids // DockerID is an ID of docker container. It is a type to make it clear when we're working with docker container Ids
type DockerID string type DockerID string
@ -24,3 +27,32 @@ type DockerID string
type HttpGetter interface { type HttpGetter interface {
Get(url string) (*http.Response, error) Get(url string) (*http.Response, error)
} }
// Timestamp wraps around time.Time and offers utilities to format and parse
// the time using RFC3339Nano
type Timestamp struct {
time time.Time
}
// NewTimestamp returns a Timestamp object using the current time.
func NewTimestamp() *Timestamp {
return &Timestamp{time.Now()}
}
// ConvertToTimestamp takes a string, parses it using the RFC3339Nano layout,
// and converts it to a Timestamp object.
func ConvertToTimestamp(timeString string) *Timestamp {
parsed, _ := time.Parse(time.RFC3339Nano, timeString)
return &Timestamp{parsed}
}
// Get returns the time as time.Time.
func (t *Timestamp) Get() time.Time {
return t.time
}
// GetString returns the time in the string format using the RFC3339Nano
// layout.
func (t *Timestamp) GetString() string {
return t.time.Format(time.RFC3339Nano)
}