diff --git a/pkg/kubelet/kubelet.go b/pkg/kubelet/kubelet.go
index c1ae3312209..d5bbf795b3d 100644
--- a/pkg/kubelet/kubelet.go
+++ b/pkg/kubelet/kubelet.go
@@ -115,14 +115,8 @@ const (
 	plegChannelCapacity = 1000
 
 	// Generic PLEG relies on relisting for discovering container events.
-	// A longer period means that kubelet will take longer to detect container
-	// changes and to update pod status. On the other hand, a shorter period
-	// will cause more frequent relisting (e.g., container runtime operations),
-	// leading to higher cpu usage.
-	// Note that even though we set the period to 1s, the relisting itself can
-	// take more than 1s to finish if the container runtime responds slowly
-	// and/or when there are many container changes in one cycle.
-	plegRelistPeriod = time.Second * 1
+	// The period directly affects the response time of kubelet.
+	plegRelistPeriod = time.Second * 3
 
 	// backOffPeriod is the period to back off when pod syncing resulting in an
 	// error. It is also used as the base period for the exponential backoff
@@ -346,8 +340,6 @@ func NewMainKubelet(
 
 	klet.livenessManager = proberesults.NewManager()
 
-	klet.podCache = kubecontainer.NewCache()
-
 	// Initialize the runtime.
 	switch containerRuntime {
 	case "docker":
@@ -373,6 +365,8 @@ func NewMainKubelet(
 			imageBackOff,
 			serializeImagePulls,
 		)
+
+		klet.pleg = pleg.NewGenericPLEG(klet.containerRuntime, plegChannelCapacity, plegRelistPeriod, nil)
 	case "rkt":
 		conf := &rkt.Config{
 			Path: rktPath,
@@ -393,13 +387,14 @@ func NewMainKubelet(
 			return nil, err
 		}
 		klet.containerRuntime = rktRuntime
+		klet.pleg = pleg.NewGenericPLEG(klet.containerRuntime, plegChannelCapacity, plegRelistPeriod, nil)
+
 		// No Docker daemon to put in a container.
 		dockerDaemonContainer = ""
 	default:
 		return nil, fmt.Errorf("unsupported container runtime %q specified", containerRuntime)
 	}
 
-	klet.pleg = pleg.NewGenericPLEG(klet.containerRuntime, plegChannelCapacity, plegRelistPeriod, klet.podCache)
 	klet.runtimeState = newRuntimeState(maxWaitForContainerRuntime, configureCBR0, podCIDR, klet.isContainerRuntimeVersionCompatible)
 
 	// setup containerGC
@@ -446,7 +441,7 @@ func NewMainKubelet(
 	}
 	klet.runtimeCache = runtimeCache
 	klet.workQueue = queue.NewBasicWorkQueue()
-	klet.podWorkers = newPodWorkers(runtimeCache, klet.syncPod, recorder, klet.workQueue, klet.resyncInterval, backOffPeriod, klet.podCache)
+	klet.podWorkers = newPodWorkers(runtimeCache, klet.syncPod, recorder, klet.workQueue, klet.resyncInterval, backOffPeriod)
 	klet.backOff = util.NewBackOff(backOffPeriod, MaxContainerBackOff)
 	klet.podKillingCh = make(chan *kubecontainer.Pod, podKillingChannelCapacity)
 
@@ -583,9 +578,6 @@ type Kubelet struct {
 	// Generates pod events.
 	pleg pleg.PodLifecycleEventGenerator
 
-	// Store kubecontainer.PodStatus for all pods.
-	podCache kubecontainer.Cache
-
 	// The name of the resource-only container to run the Kubelet in (empty for no container).
 	// Name must be absolute.
 	resourceContainer string
@@ -1572,42 +1564,31 @@ func (kl *Kubelet) makePodDataDirs(pod *api.Pod) error {
 	return nil
 }
 
-// TODO: Remove runningPod from the arguments.
-func (kl *Kubelet) syncPod(pod *api.Pod, mirrorPod *api.Pod, runningPod kubecontainer.Pod, updateType kubetypes.SyncPodType) error {
+func (kl *Kubelet) syncPod(pod *api.Pod, mirrorPod *api.Pod, runningPod kubecontainer.Pod, updateType kubetypes.SyncPodType) (syncErr error) {
+	start := kl.clock.Now()
 	var firstSeenTime time.Time
-	if firstSeenTimeStr, ok := pod.Annotations[kubetypes.ConfigFirstSeenAnnotationKey]; ok {
+	if firstSeenTimeStr, ok := pod.Annotations[kubetypes.ConfigFirstSeenAnnotationKey]; !ok {
+		glog.V(3).Infof("First seen time not recorded for pod %q", pod.UID)
+	} else {
 		firstSeenTime = kubetypes.ConvertToTimestamp(firstSeenTimeStr).Get()
 	}
 
-	if updateType == kubetypes.SyncPodCreate {
-		if !firstSeenTime.IsZero() {
-			// This is the first time we are syncing the pod. Record the latency
-			// since kubelet first saw the pod if firstSeenTime is set.
-			metrics.PodWorkerStartLatency.Observe(metrics.SinceInMicroseconds(firstSeenTime))
+	// Before returning, regenerate status and store it in the cache.
+	defer func() {
+		status, err := kl.generatePodStatus(pod)
+		if err != nil {
+			glog.Errorf("Unable to generate status for pod %q with error(%v)", format.Pod(pod), err)
+			// Propagate the error upstream.
+			syncErr = err
 		} else {
-			glog.V(3).Infof("First seen time not recorded for pod %q", pod.UID)
+			existingStatus, ok := kl.statusManager.GetPodStatus(pod.UID)
+			if !ok || existingStatus.Phase == api.PodPending && status.Phase == api.PodRunning &&
+				!firstSeenTime.IsZero() {
+				metrics.PodStartLatency.Observe(metrics.SinceInMicroseconds(firstSeenTime))
+			}
+			kl.statusManager.SetPodStatus(pod, status)
 		}
-	}
-
-	// Query the container runtime (or cache) to retrieve the pod status, and
-	// update it in the status manager.
-	podStatus, statusErr := kl.getRuntimePodStatus(pod)
-	apiPodStatus, err := kl.generatePodStatus(pod, podStatus, statusErr)
-	if err != nil {
-		return err
-	}
-	// Record the time it takes for the pod to become running.
-	existingStatus, ok := kl.statusManager.GetPodStatus(pod.UID)
-	// TODO: The logic seems wrong since the pod phase can become pending when
-	// the container runtime is temporarily not available.
-	if statusErr == nil && !ok || existingStatus.Phase == api.PodPending && apiPodStatus.Phase == api.PodRunning &&
-		!firstSeenTime.IsZero() {
-		metrics.PodStartLatency.Observe(metrics.SinceInMicroseconds(firstSeenTime))
-	}
-	kl.statusManager.SetPodStatus(pod, apiPodStatus)
-	if statusErr != nil {
-		return statusErr
-	}
+	}()
 
 	// Kill pods we can't run.
 	if err := canRunPod(pod); err != nil || pod.DeletionTimestamp != nil {
@@ -1658,6 +1639,51 @@ func (kl *Kubelet) syncPod(pod *api.Pod, mirrorPod *api.Pod, runningPod kubecont
 	}
 	kl.volumeManager.SetVolumes(pod.UID, podVolumes)
 
+	// The kubelet is the source of truth for pod status. It ignores the status sent from
+	// the apiserver and regenerates status for every pod update, incrementally updating
+	// the status it received at pod creation time.
+	//
+	// The container runtime needs 2 pieces of information from the status to sync a pod:
+	// The terminated state of containers (to restart them) and the podIp (for liveness probes).
+	// New pods don't have either, so we skip the expensive status generation step.
+	//
+	// If we end up here with a create event for an already running pod, it could result in a
+	// restart of its containers. This cannot happen unless the kubelet restarts, because the
+	// delete before the second create would cancel this pod worker.
+	//
+	// If the kubelet restarts, we have a bunch of running containers for which we get create
+	// events. This is ok, because the pod status for these will include the podIp and terminated
+	// status. Any race conditions here effectively boils down to -- the pod worker didn't sync
+	// state of a newly started container with the apiserver before the kubelet restarted, so
+	// it's OK to pretend like the kubelet started them after it restarted.
+
+	var apiPodStatus api.PodStatus
+	var podStatus *kubecontainer.PodStatus
+
+	// Always generate the kubecontainer.PodStatus to know whether there are
+	// running containers associated with the pod.
+	podStatusPtr, apiPodStatusPtr, err := kl.containerRuntime.GetPodStatusAndAPIPodStatus(pod)
+	if err != nil {
+		glog.Errorf("Unable to get status for pod %q: %v", format.Pod(pod), err)
+		return err
+	}
+	apiPodStatus = *apiPodStatusPtr
+	podStatus = podStatusPtr
+
+	if updateType == kubetypes.SyncPodCreate {
+		// This is the first time we are syncing the pod. Record the latency
+		// since kubelet first saw the pod if firstSeenTime is set.
+		if !firstSeenTime.IsZero() {
+			metrics.PodWorkerStartLatency.Observe(metrics.SinceInMicroseconds(firstSeenTime))
+		}
+		// kubelet may have just been restarted. Re-use the last known
+		// apiPodStatus.
+		apiPodStatus = pod.Status
+		apiPodStatus.StartTime = &unversioned.Time{Time: start}
+		kl.statusManager.SetPodStatus(pod, apiPodStatus)
+		glog.V(3).Infof("Reusing api pod status for new pod %q", format.Pod(pod))
+	}
+
 	pullSecrets, err := kl.getPullSecretsForPod(pod)
 	if err != nil {
 		glog.Errorf("Unable to get pull secrets for pod %q: %v", format.Pod(pod), err)
@@ -2259,6 +2285,10 @@ func (kl *Kubelet) syncLoopIteration(updates <-chan kubetypes.PodUpdate, handler
 			glog.Errorf("Kubelet does not support snapshot update")
 		}
 	case e := <-plegCh:
+		// Filter out started events since we don't use them now.
+		if e.Type == pleg.ContainerStarted {
+			break
+		}
 		pod, ok := kl.podManager.GetPodByUID(e.ID)
 		if !ok {
 			// If the pod no longer exists, ignore the event.
@@ -3054,21 +3084,19 @@ func GetPhase(spec *api.PodSpec, info []api.ContainerStatus) api.PodPhase {
 	}
 }
 
-// Get the internal PodStatus from the cache if the cache exists;
-// otherwise, query the runtime directly.
-func (kl *Kubelet) getRuntimePodStatus(pod *api.Pod) (*kubecontainer.PodStatus, error) {
+// By passing the pod directly, this method avoids pod lookup, which requires
+// grabbing a lock.
+// TODO(random-liu): api.PodStatus is named as podStatus, this maybe confusing, this may happen in other functions
+// after refactoring, modify them later.
+func (kl *Kubelet) generatePodStatus(pod *api.Pod) (api.PodStatus, error) {
+	start := kl.clock.Now()
 	defer func() {
 		metrics.PodStatusLatency.Observe(metrics.SinceInMicroseconds(start))
 	}()
-	if kl.podCache != nil {
-		return kl.podCache.Get(pod.UID)
-	}
-	return kl.containerRuntime.GetPodStatus(pod.UID, pod.Name, pod.Namespace)
-}
 
-func (kl *Kubelet) generatePodStatus(pod *api.Pod, podStatus *kubecontainer.PodStatus, statusErr error) (api.PodStatus, error) {
 	glog.V(3).Infof("Generating status for %q", format.Pod(pod))
+
+	// TODO: Consider include the container information.
 	if kl.pastActiveDeadline(pod) {
 		reason := "DeadlineExceeded"
@@ -3079,41 +3107,44 @@ func (kl *Kubelet) generatePodStatus(pod *api.Pod, podStatus *kubecontainer.PodS
 			Message: "Pod was active on the node longer than specified deadline"}, nil
 	}
 
-	if statusErr != nil {
-		// TODO: Re-evaluate whether we should set the status to "Pending".
-		glog.Infof("Query container info for pod %q failed with error (%v)", format.Pod(pod), statusErr)
-		return api.PodStatus{
+	spec := &pod.Spec
+	podStatus, err := kl.containerRuntime.GetAPIPodStatus(pod)
+
+	if err != nil {
+		// Error handling
+		glog.Infof("Query container info for pod %q failed with error (%v)", format.Pod(pod), err)
+		if strings.Contains(err.Error(), "resource temporarily unavailable") {
+			// Leave upstream layer to decide what to do
+			return api.PodStatus{}, err
+		}
+
+		pendingStatus := api.PodStatus{
 			Phase:   api.PodPending,
 			Reason:  "GeneralError",
-			Message: fmt.Sprintf("Query container info failed with error (%v)", statusErr),
-		}, nil
-	}
-	// Ask the runtime to convert the internal PodStatus to api.PodStatus.
-	s, err := kl.containerRuntime.ConvertPodStatusToAPIPodStatus(pod, podStatus)
-	if err != nil {
-		glog.Infof("Failed to convert PodStatus to api.PodStatus for %q: %v", format.Pod(pod), err)
-		return api.PodStatus{}, err
+			Message: fmt.Sprintf("Query container info failed with error (%v)", err),
+		}
+		return pendingStatus, nil
 	}
 
 	// Assume info is ready to process
-	spec := &pod.Spec
-	s.Phase = GetPhase(spec, s.ContainerStatuses)
-	kl.probeManager.UpdatePodStatus(pod.UID, s)
-	s.Conditions = append(s.Conditions, status.GeneratePodReadyCondition(spec, s.ContainerStatuses, s.Phase))
+	podStatus.Phase = GetPhase(spec, podStatus.ContainerStatuses)
+	kl.probeManager.UpdatePodStatus(pod.UID, podStatus)
+
+	podStatus.Conditions = append(podStatus.Conditions, status.GeneratePodReadyCondition(spec, podStatus.ContainerStatuses, podStatus.Phase))
 
 	if !kl.standaloneMode {
 		hostIP, err := kl.GetHostIP()
 		if err != nil {
 			glog.V(4).Infof("Cannot get host IP: %v", err)
 		} else {
-			s.HostIP = hostIP.String()
-			if podUsesHostNetwork(pod) && s.PodIP == "" {
-				s.PodIP = hostIP.String()
+			podStatus.HostIP = hostIP.String()
+			if podUsesHostNetwork(pod) && podStatus.PodIP == "" {
+				podStatus.PodIP = hostIP.String()
 			}
 		}
 	}
 
-	return *s, nil
+	return *podStatus, nil
 }
 
 // Returns logs of current machine.
diff --git a/pkg/kubelet/pod_workers.go b/pkg/kubelet/pod_workers.go
index 36d8178fe5d..d05b647d650 100644
--- a/pkg/kubelet/pod_workers.go
+++ b/pkg/kubelet/pod_workers.go
@@ -71,9 +71,6 @@ type podWorkers struct {
 
 	// resyncInterval is the duration to wait until the next sync.
 	resyncInterval time.Duration
-
-	// podCache stores kubecontainer.PodStatus for all pods.
-	podCache kubecontainer.Cache
 }
 
 type workUpdate struct {
@@ -91,7 +88,7 @@ type workUpdate struct {
 }
 
 func newPodWorkers(runtimeCache kubecontainer.RuntimeCache, syncPodFn syncPodFnType,
-	recorder record.EventRecorder, workQueue queue.WorkQueue, resyncInterval, backOffPeriod time.Duration, podCache kubecontainer.Cache) *podWorkers {
+	recorder record.EventRecorder, workQueue queue.WorkQueue, resyncInterval, backOffPeriod time.Duration) *podWorkers {
 	return &podWorkers{
 		podUpdates:     map[types.UID]chan workUpdate{},
 		isWorking:      map[types.UID]bool{},
@@ -102,27 +99,13 @@ func newPodWorkers(runtimeCache kubecontainer.RuntimeCache, syncPodFn syncPodFnT
 		workQueue:      workQueue,
 		resyncInterval: resyncInterval,
 		backOffPeriod:  backOffPeriod,
-		podCache:       podCache,
 	}
 }
 
 func (p *podWorkers) managePodLoop(podUpdates <-chan workUpdate) {
 	var minRuntimeCacheTime time.Time
-
 	for newWork := range podUpdates {
 		err := func() (err error) {
-			podID := newWork.pod.UID
-			if p.podCache != nil {
-				// This is a blocking call that would return only if the cache
-				// has an entry for the pod that is newer than minRuntimeCache
-				// Time. This ensures the worker doesn't start syncing until
-				// after the cache is at least newer than the finished time of
-				// the previous sync.
-				// TODO: We don't consume the return PodStatus yet, but we
-				// should pass it to syncPod() eventually.
-				p.podCache.GetNewerThan(podID, minRuntimeCacheTime)
-			}
-			// TODO: Deprecate the runtime cache.
 			// We would like to have the state of the containers from at least
 			// the moment when we finished the previous processing of that pod.
 			if err := p.runtimeCache.ForceUpdateIfOlder(minRuntimeCacheTime); err != nil {
@@ -223,13 +206,10 @@ func (p *podWorkers) ForgetNonExistingPodWorkers(desiredPods map[types.UID]empty
 
 func (p *podWorkers) wrapUp(uid types.UID, syncErr error) {
 	// Requeue the last update if the last sync returned error.
-	switch {
-	case syncErr == nil:
-		// No error; requeue at the regular resync interval.
-		p.workQueue.Enqueue(uid, p.resyncInterval)
-	default:
-		// Error occurred during the sync; back off and then retry.
+	if syncErr != nil {
 		p.workQueue.Enqueue(uid, p.backOffPeriod)
+	} else {
+		p.workQueue.Enqueue(uid, p.resyncInterval)
 	}
 	p.checkForUpdates(uid)
 }
diff --git a/pkg/kubelet/pod_workers_test.go b/pkg/kubelet/pod_workers_test.go
index 3d90a1f9300..9eccaf3fc67 100644
--- a/pkg/kubelet/pod_workers_test.go
+++ b/pkg/kubelet/pod_workers_test.go
@@ -60,7 +60,6 @@ func createPodWorkers() (*podWorkers, map[types.UID][]string) {
 		queue.NewBasicWorkQueue(),
 		time.Second,
 		time.Second,
-		nil,
 	)
 	return podWorkers, processed
 }
@@ -191,7 +190,7 @@ func TestFakePodWorkers(t *testing.T) {
 	kubeletForRealWorkers := &simpleFakeKubelet{}
 	kubeletForFakeWorkers := &simpleFakeKubelet{}
 
-	realPodWorkers := newPodWorkers(fakeRuntimeCache, kubeletForRealWorkers.syncPodWithWaitGroup, fakeRecorder, queue.NewBasicWorkQueue(), time.Second, time.Second, nil)
+	realPodWorkers := newPodWorkers(fakeRuntimeCache, kubeletForRealWorkers.syncPodWithWaitGroup, fakeRecorder, queue.NewBasicWorkQueue(), time.Second, time.Second)
 	fakePodWorkers := &fakePodWorkers{kubeletForFakeWorkers.syncPod, fakeRuntimeCache, t}
 
 	tests := []struct {