diff --git a/pkg/kubelet/kubelet.go b/pkg/kubelet/kubelet.go
index d5bbf795b3d..c1ae3312209 100644
--- a/pkg/kubelet/kubelet.go
+++ b/pkg/kubelet/kubelet.go
@@ -115,8 +115,14 @@ const (
 	plegChannelCapacity = 1000

 	// Generic PLEG relies on relisting for discovering container events.
-	// The period directly affects the response time of kubelet.
-	plegRelistPeriod = time.Second * 3
+	// A longer period means that kubelet will take longer to detect container
+	// changes and to update pod status. On the other hand, a shorter period
+	// will cause more frequent relisting (e.g., container runtime operations),
+	// leading to higher CPU usage.
+	// Note that even though we set the period to 1s, the relisting itself can
+	// take more than 1s to finish if the container runtime responds slowly
+	// and/or when there are many container changes in one cycle.
+	plegRelistPeriod = time.Second * 1

 	// backOffPeriod is the period to back off when pod syncing resulting in an
 	// error. It is also used as the base period for the exponential backoff
@@ -340,6 +346,8 @@ func NewMainKubelet(
 	klet.livenessManager = proberesults.NewManager()

+	klet.podCache = kubecontainer.NewCache()
+
 	// Initialize the runtime.
 	switch containerRuntime {
 	case "docker":
@@ -365,8 +373,6 @@ func NewMainKubelet(
 			imageBackOff,
 			serializeImagePulls,
 		)
-
-		klet.pleg = pleg.NewGenericPLEG(klet.containerRuntime, plegChannelCapacity, plegRelistPeriod, nil)
 	case "rkt":
 		conf := &rkt.Config{
 			Path: rktPath,
@@ -387,14 +393,13 @@ func NewMainKubelet(
 			return nil, err
 		}
 		klet.containerRuntime = rktRuntime
-		klet.pleg = pleg.NewGenericPLEG(klet.containerRuntime, plegChannelCapacity, plegRelistPeriod, nil)
-
 		// No Docker daemon to put in a container.
 		dockerDaemonContainer = ""
 	default:
 		return nil, fmt.Errorf("unsupported container runtime %q specified", containerRuntime)
 	}

+	klet.pleg = pleg.NewGenericPLEG(klet.containerRuntime, plegChannelCapacity, plegRelistPeriod, klet.podCache)
 	klet.runtimeState = newRuntimeState(maxWaitForContainerRuntime, configureCBR0, podCIDR, klet.isContainerRuntimeVersionCompatible)

 	// setup containerGC
@@ -441,7 +446,7 @@ func NewMainKubelet(
 	}
 	klet.runtimeCache = runtimeCache
 	klet.workQueue = queue.NewBasicWorkQueue()
-	klet.podWorkers = newPodWorkers(runtimeCache, klet.syncPod, recorder, klet.workQueue, klet.resyncInterval, backOffPeriod)
+	klet.podWorkers = newPodWorkers(runtimeCache, klet.syncPod, recorder, klet.workQueue, klet.resyncInterval, backOffPeriod, klet.podCache)
 	klet.backOff = util.NewBackOff(backOffPeriod, MaxContainerBackOff)
 	klet.podKillingCh = make(chan *kubecontainer.Pod, podKillingChannelCapacity)
@@ -578,6 +583,9 @@ type Kubelet struct {
 	// Generates pod events.
 	pleg pleg.PodLifecycleEventGenerator

+	// Store kubecontainer.PodStatus for all pods.
+	podCache kubecontainer.Cache
+
 	// The name of the resource-only container to run the Kubelet in (empty for no container).
 	// Name must be absolute.
 	resourceContainer string
@@ -1564,31 +1572,42 @@ func (kl *Kubelet) makePodDataDirs(pod *api.Pod) error {
 	return nil
 }

-func (kl *Kubelet) syncPod(pod *api.Pod, mirrorPod *api.Pod, runningPod kubecontainer.Pod, updateType kubetypes.SyncPodType) (syncErr error) {
-	start := kl.clock.Now()
+// TODO: Remove runningPod from the arguments.
+func (kl *Kubelet) syncPod(pod *api.Pod, mirrorPod *api.Pod, runningPod kubecontainer.Pod, updateType kubetypes.SyncPodType) error {
 	var firstSeenTime time.Time
-	if firstSeenTimeStr, ok := pod.Annotations[kubetypes.ConfigFirstSeenAnnotationKey]; !ok {
-		glog.V(3).Infof("First seen time not recorded for pod %q", pod.UID)
-	} else {
+	if firstSeenTimeStr, ok := pod.Annotations[kubetypes.ConfigFirstSeenAnnotationKey]; ok {
 		firstSeenTime = kubetypes.ConvertToTimestamp(firstSeenTimeStr).Get()
 	}

-	// Before returning, regenerate status and store it in the cache.
-	defer func() {
-		status, err := kl.generatePodStatus(pod)
-		if err != nil {
-			glog.Errorf("Unable to generate status for pod %q with error(%v)", format.Pod(pod), err)
-			// Propagate the error upstream.
-			syncErr = err
+	if updateType == kubetypes.SyncPodCreate {
+		if !firstSeenTime.IsZero() {
+			// This is the first time we are syncing the pod. Record the latency
+			// since kubelet first saw the pod if firstSeenTime is set.
+			metrics.PodWorkerStartLatency.Observe(metrics.SinceInMicroseconds(firstSeenTime))
 		} else {
-			existingStatus, ok := kl.statusManager.GetPodStatus(pod.UID)
-			if !ok || existingStatus.Phase == api.PodPending && status.Phase == api.PodRunning &&
-				!firstSeenTime.IsZero() {
-				metrics.PodStartLatency.Observe(metrics.SinceInMicroseconds(firstSeenTime))
-			}
-			kl.statusManager.SetPodStatus(pod, status)
+			glog.V(3).Infof("First seen time not recorded for pod %q", pod.UID)
 		}
-	}()
+	}
+
+	// Query the container runtime (or cache) to retrieve the pod status, and
+	// update it in the status manager.
+	podStatus, statusErr := kl.getRuntimePodStatus(pod)
+	apiPodStatus, err := kl.generatePodStatus(pod, podStatus, statusErr)
+	if err != nil {
+		return err
+	}
+	// Record the time it takes for the pod to become running.
+	existingStatus, ok := kl.statusManager.GetPodStatus(pod.UID)
+	// TODO: The logic seems wrong since the pod phase can become pending when
+	// the container runtime is temporarily not available.
+	if statusErr == nil && !ok || existingStatus.Phase == api.PodPending && apiPodStatus.Phase == api.PodRunning &&
+		!firstSeenTime.IsZero() {
+		metrics.PodStartLatency.Observe(metrics.SinceInMicroseconds(firstSeenTime))
+	}
+	kl.statusManager.SetPodStatus(pod, apiPodStatus)
+	if statusErr != nil {
+		return statusErr
+	}

 	// Kill pods we can't run.
 	if err := canRunPod(pod); err != nil || pod.DeletionTimestamp != nil {
@@ -1639,51 +1658,6 @@ func (kl *Kubelet) syncPod(pod *api.Pod, mirrorPod *api.Pod, runningPod kubecont
 	}
 	kl.volumeManager.SetVolumes(pod.UID, podVolumes)

-	// The kubelet is the source of truth for pod status. It ignores the status sent from
-	// the apiserver and regenerates status for every pod update, incrementally updating
-	// the status it received at pod creation time.
-	//
-	// The container runtime needs 2 pieces of information from the status to sync a pod:
-	// The terminated state of containers (to restart them) and the podIp (for liveness probes).
-	// New pods don't have either, so we skip the expensive status generation step.
-	//
-	// If we end up here with a create event for an already running pod, it could result in a
-	// restart of its containers. This cannot happen unless the kubelet restarts, because the
-	// delete before the second create would cancel this pod worker.
-	//
-	// If the kubelet restarts, we have a bunch of running containers for which we get create
-	// events. This is ok, because the pod status for these will include the podIp and terminated
-	// status. Any race conditions here effectively boils down to -- the pod worker didn't sync
-	// state of a newly started container with the apiserver before the kubelet restarted, so
-	// it's OK to pretend like the kubelet started them after it restarted.
-
-	var apiPodStatus api.PodStatus
-	var podStatus *kubecontainer.PodStatus
-
-	// Always generate the kubecontainer.PodStatus to know whether there are
-	// running containers associated with the pod.
-	podStatusPtr, apiPodStatusPtr, err := kl.containerRuntime.GetPodStatusAndAPIPodStatus(pod)
-	if err != nil {
-		glog.Errorf("Unable to get status for pod %q: %v", format.Pod(pod), err)
-		return err
-	}
-	apiPodStatus = *apiPodStatusPtr
-	podStatus = podStatusPtr
-
-	if updateType == kubetypes.SyncPodCreate {
-		// This is the first time we are syncing the pod. Record the latency
-		// since kubelet first saw the pod if firstSeenTime is set.
-		if !firstSeenTime.IsZero() {
-			metrics.PodWorkerStartLatency.Observe(metrics.SinceInMicroseconds(firstSeenTime))
-		}
-		// kubelet may have just been restarted. Re-use the last known
-		// apiPodStatus.
-		apiPodStatus = pod.Status
-		apiPodStatus.StartTime = &unversioned.Time{Time: start}
-		kl.statusManager.SetPodStatus(pod, apiPodStatus)
-		glog.V(3).Infof("Reusing api pod status for new pod %q", format.Pod(pod))
-	}
-
 	pullSecrets, err := kl.getPullSecretsForPod(pod)
 	if err != nil {
 		glog.Errorf("Unable to get pull secrets for pod %q: %v", format.Pod(pod), err)
@@ -2285,10 +2259,6 @@ func (kl *Kubelet) syncLoopIteration(updates <-chan kubetypes.PodUpdate, handler
 			glog.Errorf("Kubelet does not support snapshot update")
 		}
 	case e := <-plegCh:
-		// Filter out started events since we don't use them now.
-		if e.Type == pleg.ContainerStarted {
-			break
-		}
 		pod, ok := kl.podManager.GetPodByUID(e.ID)
 		if !ok {
 			// If the pod no longer exists, ignore the event.
@@ -3084,19 +3054,21 @@ func GetPhase(spec *api.PodSpec, info []api.ContainerStatus) api.PodPhase {
 	}
 }

-// By passing the pod directly, this method avoids pod lookup, which requires
-// grabbing a lock.
-// TODO(random-liu): api.PodStatus is named as podStatus, this maybe confusing, this may happen in other functions
-// after refactoring, modify them later.
-func (kl *Kubelet) generatePodStatus(pod *api.Pod) (api.PodStatus, error) {
-
+// Get the internal PodStatus from the cache if the cache exists;
+// otherwise, query the runtime directly.
+func (kl *Kubelet) getRuntimePodStatus(pod *api.Pod) (*kubecontainer.PodStatus, error) {
 	start := kl.clock.Now()
 	defer func() {
 		metrics.PodStatusLatency.Observe(metrics.SinceInMicroseconds(start))
 	}()
+	if kl.podCache != nil {
+		return kl.podCache.Get(pod.UID)
+	}
+	return kl.containerRuntime.GetPodStatus(pod.UID, pod.Name, pod.Namespace)
+}
+
+func (kl *Kubelet) generatePodStatus(pod *api.Pod, podStatus *kubecontainer.PodStatus, statusErr error) (api.PodStatus, error) {
 	glog.V(3).Infof("Generating status for %q", format.Pod(pod))

-	// TODO: Consider include the container information.
 	if kl.pastActiveDeadline(pod) {
 		reason := "DeadlineExceeded"
@@ -3107,44 +3079,41 @@ func (kl *Kubelet) generatePodStatus(pod *api.Pod) (api.PodStatus, error) {
 			Message: "Pod was active on the node longer than specified deadline"}, nil
 	}

-	spec := &pod.Spec
-	podStatus, err := kl.containerRuntime.GetAPIPodStatus(pod)
-
-	if err != nil {
-		// Error handling
-		glog.Infof("Query container info for pod %q failed with error (%v)", format.Pod(pod), err)
-		if strings.Contains(err.Error(), "resource temporarily unavailable") {
-			// Leave upstream layer to decide what to do
-			return api.PodStatus{}, err
-		}
-
-		pendingStatus := api.PodStatus{
+	if statusErr != nil {
+		// TODO: Re-evaluate whether we should set the status to "Pending".
+		glog.Infof("Query container info for pod %q failed with error (%v)", format.Pod(pod), statusErr)
+		return api.PodStatus{
 			Phase:   api.PodPending,
 			Reason:  "GeneralError",
-			Message: fmt.Sprintf("Query container info failed with error (%v)", err),
-		}
-		return pendingStatus, nil
+			Message: fmt.Sprintf("Query container info failed with error (%v)", statusErr),
+		}, nil
+	}
+	// Ask the runtime to convert the internal PodStatus to api.PodStatus.
+	s, err := kl.containerRuntime.ConvertPodStatusToAPIPodStatus(pod, podStatus)
+	if err != nil {
+		glog.Infof("Failed to convert PodStatus to api.PodStatus for %q: %v", format.Pod(pod), err)
+		return api.PodStatus{}, err
 	}

 	// Assume info is ready to process
-	podStatus.Phase = GetPhase(spec, podStatus.ContainerStatuses)
-	kl.probeManager.UpdatePodStatus(pod.UID, podStatus)
-
-	podStatus.Conditions = append(podStatus.Conditions, status.GeneratePodReadyCondition(spec, podStatus.ContainerStatuses, podStatus.Phase))
+	spec := &pod.Spec
+	s.Phase = GetPhase(spec, s.ContainerStatuses)
+	kl.probeManager.UpdatePodStatus(pod.UID, s)
+	s.Conditions = append(s.Conditions, status.GeneratePodReadyCondition(spec, s.ContainerStatuses, s.Phase))

 	if !kl.standaloneMode {
 		hostIP, err := kl.GetHostIP()
 		if err != nil {
 			glog.V(4).Infof("Cannot get host IP: %v", err)
 		} else {
-			podStatus.HostIP = hostIP.String()
-			if podUsesHostNetwork(pod) && podStatus.PodIP == "" {
-				podStatus.PodIP = hostIP.String()
+			s.HostIP = hostIP.String()
+			if podUsesHostNetwork(pod) && s.PodIP == "" {
+				s.PodIP = hostIP.String()
 			}
 		}
 	}

-	return *podStatus, nil
+	return *s, nil
 }

 // Returns logs of current machine.
diff --git a/pkg/kubelet/pod_workers.go b/pkg/kubelet/pod_workers.go
index d05b647d650..36d8178fe5d 100644
--- a/pkg/kubelet/pod_workers.go
+++ b/pkg/kubelet/pod_workers.go
@@ -71,6 +71,9 @@ type podWorkers struct {
 	// resyncInterval is the duration to wait until the next sync.
 	resyncInterval time.Duration
+
+	// podCache stores kubecontainer.PodStatus for all pods.
+	podCache kubecontainer.Cache
 }

 type workUpdate struct {
@@ -88,7 +91,7 @@ type workUpdate struct {
 }

 func newPodWorkers(runtimeCache kubecontainer.RuntimeCache, syncPodFn syncPodFnType,
-	recorder record.EventRecorder, workQueue queue.WorkQueue, resyncInterval, backOffPeriod time.Duration) *podWorkers {
+	recorder record.EventRecorder, workQueue queue.WorkQueue, resyncInterval, backOffPeriod time.Duration, podCache kubecontainer.Cache) *podWorkers {
 	return &podWorkers{
 		podUpdates: map[types.UID]chan workUpdate{},
 		isWorking:  map[types.UID]bool{},
@@ -99,13 +102,27 @@ func newPodWorkers(runtimeCache kubecontainer.RuntimeCache, syncPodFn syncPodFnT
 		workQueue:      workQueue,
 		resyncInterval: resyncInterval,
 		backOffPeriod:  backOffPeriod,
+		podCache:       podCache,
 	}
 }

 func (p *podWorkers) managePodLoop(podUpdates <-chan workUpdate) {
 	var minRuntimeCacheTime time.Time
+
 	for newWork := range podUpdates {
 		err := func() (err error) {
+			podID := newWork.pod.UID
+			if p.podCache != nil {
+				// This is a blocking call that returns only when the cache
+				// has an entry for the pod that is newer than
+				// minRuntimeCacheTime. This ensures the worker doesn't start
+				// syncing until the cache is at least as new as the finish
+				// time of the previous sync.
+				// TODO: We don't consume the returned PodStatus yet, but we
+				// should pass it to syncPod() eventually.
+				p.podCache.GetNewerThan(podID, minRuntimeCacheTime)
+			}
+			// TODO: Deprecate the runtime cache.
 			// We would like to have the state of the containers from at least
 			// the moment when we finished the previous processing of that pod.
 			if err := p.runtimeCache.ForceUpdateIfOlder(minRuntimeCacheTime); err != nil {
@@ -206,10 +223,13 @@ func (p *podWorkers) ForgetNonExistingPodWorkers(desiredPods map[types.UID]empty

 func (p *podWorkers) wrapUp(uid types.UID, syncErr error) {
 	// Requeue the last update if the last sync returned error.
-	if syncErr != nil {
-		p.workQueue.Enqueue(uid, p.backOffPeriod)
-	} else {
+	switch {
+	case syncErr == nil:
+		// No error; requeue at the regular resync interval.
 		p.workQueue.Enqueue(uid, p.resyncInterval)
+	default:
+		// An error occurred during the sync; back off and then retry.
+		p.workQueue.Enqueue(uid, p.backOffPeriod)
 	}
 	p.checkForUpdates(uid)
 }
diff --git a/pkg/kubelet/pod_workers_test.go b/pkg/kubelet/pod_workers_test.go
index 9eccaf3fc67..3d90a1f9300 100644
--- a/pkg/kubelet/pod_workers_test.go
+++ b/pkg/kubelet/pod_workers_test.go
@@ -60,6 +60,7 @@ func createPodWorkers() (*podWorkers, map[types.UID][]string) {
 		queue.NewBasicWorkQueue(),
 		time.Second,
 		time.Second,
+		nil,
 	)
 	return podWorkers, processed
 }
@@ -190,7 +191,7 @@ func TestFakePodWorkers(t *testing.T) {
 	kubeletForRealWorkers := &simpleFakeKubelet{}
 	kubeletForFakeWorkers := &simpleFakeKubelet{}

-	realPodWorkers := newPodWorkers(fakeRuntimeCache, kubeletForRealWorkers.syncPodWithWaitGroup, fakeRecorder, queue.NewBasicWorkQueue(), time.Second, time.Second)
+	realPodWorkers := newPodWorkers(fakeRuntimeCache, kubeletForRealWorkers.syncPodWithWaitGroup, fakeRecorder, queue.NewBasicWorkQueue(), time.Second, time.Second, nil)
 	fakePodWorkers := &fakePodWorkers{kubeletForFakeWorkers.syncPod, fakeRuntimeCache, t}

 	tests := []struct {
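Note on the mechanism this change introduces: the generic PLEG now writes a kubecontainer.PodStatus snapshot into klet.podCache on every relist, and each pod worker blocks in podCache.GetNewerThan(podID, minRuntimeCacheTime) until the cached entry is newer than the finish time of its previous sync, so syncPod never acts on a stale snapshot. The sketch below illustrates that blocking handshake with a condition variable. It is a minimal, illustrative model only; the names (cache, Set, GetNewerThan) and the condition-variable design are assumptions made for the example, not the actual kubecontainer.Cache implementation added by this change.

package main

import (
	"fmt"
	"sync"
	"time"
)

// PodStatus stands in for kubecontainer.PodStatus in this sketch.
type PodStatus struct{ Name string }

// entry pairs a status snapshot with the time it was recorded.
type entry struct {
	status  *PodStatus
	modTime time.Time
}

// cache is an illustrative status cache. GetNewerThan blocks until the
// stored entry for a pod is newer than a given time, which is how a pod
// worker can wait for a snapshot taken after its previous sync finished.
type cache struct {
	mu      sync.Mutex
	cond    *sync.Cond
	entries map[string]entry
}

func newCache() *cache {
	c := &cache{entries: map[string]entry{}}
	c.cond = sync.NewCond(&c.mu)
	return c
}

// Set records a snapshot (e.g., from a PLEG relist) and wakes all
// blocked readers so they can re-check their freshness requirement.
func (c *cache) Set(uid string, status *PodStatus, modTime time.Time) {
	c.mu.Lock()
	defer c.mu.Unlock()
	c.entries[uid] = entry{status: status, modTime: modTime}
	c.cond.Broadcast()
}

// GetNewerThan blocks until the cached entry for uid is strictly newer
// than minTime, then returns it.
func (c *cache) GetNewerThan(uid string, minTime time.Time) *PodStatus {
	c.mu.Lock()
	defer c.mu.Unlock()
	for {
		if e, ok := c.entries[uid]; ok && e.modTime.After(minTime) {
			return e.status
		}
		// Wait releases the lock and reacquires it when woken by Set.
		c.cond.Wait()
	}
}

func main() {
	c := newCache()
	lastSyncFinished := time.Now()

	// Simulate a PLEG relist storing a fresh snapshot a bit later.
	go func() {
		time.Sleep(50 * time.Millisecond)
		c.Set("pod-1", &PodStatus{Name: "pod-1"}, time.Now())
	}()

	// The pod worker blocks until a post-sync snapshot is available.
	s := c.GetNewerThan("pod-1", lastSyncFinished)
	fmt.Println("got fresh status for", s.Name)
}

A single condition variable (rather than per-pod channels) fits the relist model: one global relist refreshes entries for many pods at once, and Broadcast lets every waiting worker re-check its own pod's timestamp.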