diff --git a/pkg/kubelet/kubelet.go b/pkg/kubelet/kubelet.go
index ab98f07aee3..f80b9302b86 100644
--- a/pkg/kubelet/kubelet.go
+++ b/pkg/kubelet/kubelet.go
@@ -1525,6 +1525,9 @@ func (kl *Kubelet) syncPod(pod *api.Pod, mirrorPod *api.Pod, runningPod kubecont
 	} else {
 		var err error
 		podStatus, err = kl.generatePodStatus(pod)
+		// TODO(random-liu): It's strange that generatePodStatus can produce a podStatus already
+		// in the Failed or Pending phase, sometimes with empty ContainerStatuses, yet syncPod
+		// keeps going. This may need refactoring.
 		if err != nil {
 			glog.Errorf("Unable to get status for pod %q (uid %q): %v", podFullName, uid, err)
 			return err
@@ -1776,6 +1779,31 @@ func (kl *Kubelet) pastActiveDeadline(pod *api.Pod) bool {
 	return false
 }
 
+// getPodsToSync returns pods which should be resynchronized. Currently, the following pods are resynchronized:
+//   * pods whose work is ready.
+//   * pods which have passed the active deadline.
+func (kl *Kubelet) getPodsToSync() []*api.Pod {
+	allPods := kl.podManager.GetPods()
+	podUIDs := kl.workQueue.GetWork()
+	podUIDSet := sets.NewString()
+	for _, podUID := range podUIDs {
+		podUIDSet.Insert(string(podUID))
+	}
+	var podsToSync []*api.Pod
+	for _, pod := range allPods {
+		if kl.pastActiveDeadline(pod) {
+			// The pod has passed the active deadline.
+			podsToSync = append(podsToSync, pod)
+			continue
+		}
+		if podUIDSet.Has(string(pod.UID)) {
+			// The work of the pod is ready.
+			podsToSync = append(podsToSync, pod)
+		}
+	}
+	return podsToSync
+}
+
 // Returns true if pod is in the terminated state ("Failed" or "Succeeded").
 func (kl *Kubelet) podIsTerminated(pod *api.Pod) bool {
 	var status api.PodStatus
@@ -2130,13 +2158,7 @@ func (kl *Kubelet) syncLoopIteration(updates <-chan kubetypes.PodUpdate, handler
 			glog.Errorf("Kubelet does not support snapshot update")
 		}
 	case <-syncCh:
-		podUIDs := kl.workQueue.GetWork()
-		var podsToSync []*api.Pod
-		for _, uid := range podUIDs {
-			if pod, ok := kl.podManager.GetPodByUID(uid); ok {
-				podsToSync = append(podsToSync, pod)
-			}
-		}
+		podsToSync := kl.getPodsToSync()
 		if len(podsToSync) == 0 {
 			break
 		}
diff --git a/pkg/kubelet/kubelet_test.go b/pkg/kubelet/kubelet_test.go
index f9bfeb28b75..cdec7b9447d 100644
--- a/pkg/kubelet/kubelet_test.go
+++ b/pkg/kubelet/kubelet_test.go
@@ -4014,3 +4014,51 @@ func TestExtractBandwidthResources(t *testing.T) {
 		}
 	}
 }
+
+func TestGetPodsToSync(t *testing.T) {
+	testKubelet := newTestKubelet(t)
+	kubelet := testKubelet.kubelet
+	pods := newTestPods(5)
+
+	exceededActiveDeadlineSeconds := int64(30)
+	notYetActiveDeadlineSeconds := int64(120)
+	now := unversioned.Now()
+	startTime := unversioned.NewTime(now.Time.Add(-1 * time.Minute))
+	pods[0].Status.StartTime = &startTime
+	pods[0].Spec.ActiveDeadlineSeconds = &exceededActiveDeadlineSeconds
+	pods[1].Status.StartTime = &startTime
+	pods[1].Spec.ActiveDeadlineSeconds = &notYetActiveDeadlineSeconds
+	pods[2].Status.StartTime = &startTime
+	pods[2].Spec.ActiveDeadlineSeconds = &exceededActiveDeadlineSeconds
+
+	kubelet.podManager.SetPods(pods)
+	kubelet.workQueue.Enqueue(pods[2].UID, 0)
+	kubelet.workQueue.Enqueue(pods[3].UID, 0)
+	kubelet.workQueue.Enqueue(pods[4].UID, time.Hour)
+
+	expectedPodsUID := []types.UID{pods[0].UID, pods[2].UID, pods[3].UID}
+
+	podsToSync := kubelet.getPodsToSync()
+
+	if len(podsToSync) == len(expectedPodsUID) {
+		var rightNum int
+		for _, podUID := range expectedPodsUID {
+			for _, podToSync := range podsToSync {
+				if podToSync.UID == podUID {
+					rightNum++
+					break
+				}
+			}
+		}
+		if rightNum != len(expectedPodsUID) {
+			// Collect the actual UIDs only to report the mismatch.
+			podsToSyncUID := []types.UID{}
+			for _, podToSync := range podsToSync {
+				podsToSyncUID = append(podsToSyncUID, podToSync.UID)
+			}
+			t.Errorf("expected pods %v to sync, got %v", expectedPodsUID, podsToSyncUID)
+		}
+	} else {
+		t.Errorf("expected %d pods to sync, got %d", len(expectedPodsUID), len(podsToSync))
+	}
+}
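
Reviewer note: below is a minimal standalone sketch of the selection logic introduced in getPodsToSync, with hypothetical pod/queue stand-ins (a plain struct and map instead of podManager, workQueue, and sets.String); it is an illustration only, not part of the patch.

package main

import "fmt"

// pod is a hypothetical stand-in for *api.Pod.
type pod struct {
	uid          string
	pastDeadline bool
}

// getPodsToSync mirrors the patched logic: select a pod if it has passed
// its active deadline, or if its UID is in the set of ready work.
func getPodsToSync(allPods []pod, readyUIDs []string) []pod {
	ready := map[string]bool{}
	for _, uid := range readyUIDs {
		ready[uid] = true
	}
	var podsToSync []pod
	for _, p := range allPods {
		if p.pastDeadline {
			// Past the active deadline; the continue ensures the pod is
			// appended only once even if its work is also ready.
			podsToSync = append(podsToSync, p)
			continue
		}
		if ready[p.uid] {
			// The work of the pod is ready.
			podsToSync = append(podsToSync, p)
		}
	}
	return podsToSync
}

func main() {
	pods := []pod{
		{uid: "a", pastDeadline: true}, // selected: past deadline
		{uid: "b"},                     // not selected
		{uid: "c", pastDeadline: true}, // selected once: past deadline and ready
		{uid: "d"},                     // selected: work is ready
	}
	fmt.Println(getPodsToSync(pods, []string{"c", "d"}))
	// Prints: [{a true} {c true} {d false}]
}

The deadline check runs first so that a pod past its deadline is synced (and can be killed) even when the work queue has no entry for it, which is exactly the case pods[0] exercises in TestGetPodsToSync.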