From b5ed0e9b139eaa725b842d021a6d55a7ed7d53d1 Mon Sep 17 00:00:00 2001 From: Prashanth Balasubramanian Date: Tue, 9 Jun 2015 17:50:15 -0700 Subject: [PATCH] Dont generatePodStatus twice for new pods --- pkg/kubelet/fake_pod_workers.go | 2 +- pkg/kubelet/kubelet.go | 52 ++++++++++++++++++++------- pkg/kubelet/kubelet_test.go | 63 +++++++++++++++++---------------- pkg/kubelet/metrics/metrics.go | 21 ----------- pkg/kubelet/pod_manager.go | 33 +++++++++++++---- pkg/kubelet/pod_workers.go | 23 ++++++++++-- pkg/kubelet/pod_workers_test.go | 48 +++++++++++++++++++++---- pkg/kubelet/runonce.go | 2 +- 8 files changed, 163 insertions(+), 81 deletions(-) diff --git a/pkg/kubelet/fake_pod_workers.go b/pkg/kubelet/fake_pod_workers.go index 687f1e5a5be..5ccc39f7e7f 100644 --- a/pkg/kubelet/fake_pod_workers.go +++ b/pkg/kubelet/fake_pod_workers.go @@ -37,7 +37,7 @@ func (f *fakePodWorkers) UpdatePod(pod *api.Pod, mirrorPod *api.Pod, updateCompl if err != nil { f.t.Errorf("Unexpected error: %v", err) } - if err := f.syncPodFn(pod, mirrorPod, kubecontainer.Pods(pods).FindPodByID(pod.UID)); err != nil { + if err := f.syncPodFn(pod, mirrorPod, kubecontainer.Pods(pods).FindPodByID(pod.UID), SyncPodUpdate); err != nil { f.t.Errorf("Unexpected error: %v", err) } } diff --git a/pkg/kubelet/kubelet.go b/pkg/kubelet/kubelet.go index f071f35f24e..5207dbf7e6a 100644 --- a/pkg/kubelet/kubelet.go +++ b/pkg/kubelet/kubelet.go @@ -88,7 +88,7 @@ type SyncHandler interface { // Syncs current state to match the specified pods. SyncPodType specified what // type of sync is occuring per pod. StartTime specifies the time at which // syncing began (for use in monitoring). - SyncPods(pods []*api.Pod, podSyncTypes map[types.UID]metrics.SyncPodType, mirrorPods map[string]*api.Pod, + SyncPods(pods []*api.Pod, podSyncTypes map[types.UID]SyncPodType, mirrorPods map[string]*api.Pod, startTime time.Time) error } @@ -1078,7 +1078,7 @@ func (kl *Kubelet) makePodDataDirs(pod *api.Pod) error { return nil } -func (kl *Kubelet) syncPod(pod *api.Pod, mirrorPod *api.Pod, runningPod kubecontainer.Pod) error { +func (kl *Kubelet) syncPod(pod *api.Pod, mirrorPod *api.Pod, runningPod kubecontainer.Pod, updateType SyncPodType) error { podFullName := kubecontainer.GetPodFullName(pod) uid := pod.UID @@ -1130,10 +1130,38 @@ func (kl *Kubelet) syncPod(pod *api.Pod, mirrorPod *api.Pod, runningPod kubecont } kl.volumeManager.SetVolumes(pod.UID, podVolumes) - podStatus, err := kl.generatePodStatus(pod) - if err != nil { - glog.Errorf("Unable to get status for pod %q (uid %q): %v", podFullName, uid, err) - return err + // The kubelet is the source of truth for pod status. It ignores the status sent from + // the apiserver and regenerates status for every pod update, incrementally updating + // the status it received at pod creation time. + // + // The container runtime needs 2 pieces of information from the status to sync a pod: + // The terminated state of containers (to restart them) and the podIp (for liveness probes). + // New pods don't have either, so we skip the expensive status generation step. + // + // If we end up here with a create event for an already running pod, it could result in a + // restart of its containers. This cannot happen unless the kubelet restarts, because the + // delete before the second create is processed by SyncPods, which cancels this pod worker. + // + // If the kubelet restarts, we have a bunch of running containers for which we get create + // events. This is ok, because the pod status for these will include the podIp and terminated + // status. Any race conditions here effectively boils down to -- the pod worker didn't sync + // state of a newly started container with the apiserver before the kubelet restarted, so + // it's OK to pretend like the kubelet started them after it restarted. + // + // Also note that deletes currently have an updateType of `create` set in UpdatePods. + // This, again, does not matter because deletes are not processed by this method. + + var podStatus api.PodStatus + if updateType == SyncPodCreate { + podStatus = pod.Status + glog.V(3).Infof("Not generating pod status for new pod %v", podFullName) + } else { + var err error + podStatus, err = kl.generatePodStatus(pod) + if err != nil { + glog.Errorf("Unable to get status for pod %q (uid %q): %v", podFullName, uid, err) + return err + } } pullSecrets, err := kl.getPullSecretsForPod(pod) @@ -1306,7 +1334,7 @@ func (kl *Kubelet) filterOutTerminatedPods(allPods []*api.Pod) []*api.Pod { } // SyncPods synchronizes the configured list of pods (desired state) with the host current state. -func (kl *Kubelet) SyncPods(allPods []*api.Pod, podSyncTypes map[types.UID]metrics.SyncPodType, +func (kl *Kubelet) SyncPods(allPods []*api.Pod, podSyncTypes map[types.UID]SyncPodType, mirrorPods map[string]*api.Pod, start time.Time) error { defer func() { metrics.SyncPodsLatency.Observe(metrics.SinceInMicroseconds(start)) @@ -1344,7 +1372,7 @@ func (kl *Kubelet) SyncPods(allPods []*api.Pod, podSyncTypes map[types.UID]metri }) // Note the number of containers for new pods. - if val, ok := podSyncTypes[pod.UID]; ok && (val == metrics.SyncPodCreate) { + if val, ok := podSyncTypes[pod.UID]; ok && (val == SyncPodCreate) { metrics.ContainersPerPodCount.Observe(float64(len(pod.Spec.Containers))) } } @@ -1486,7 +1514,7 @@ func (kl *Kubelet) checkCapacityExceeded(pods []*api.Pod) (fitting []*api.Pod, n } // handleOutOfDisk detects if pods can't fit due to lack of disk space. -func (kl *Kubelet) handleOutOfDisk(pods []*api.Pod, podSyncTypes map[types.UID]metrics.SyncPodType) []*api.Pod { +func (kl *Kubelet) handleOutOfDisk(pods []*api.Pod, podSyncTypes map[types.UID]SyncPodType) []*api.Pod { if len(podSyncTypes) == 0 { // regular sync. no new pods return pods @@ -1519,7 +1547,7 @@ func (kl *Kubelet) handleOutOfDisk(pods []*api.Pod, podSyncTypes map[types.UID]m for i := range pods { pod := pods[i] // Only reject pods that didn't start yet. - if podSyncTypes[pod.UID] == metrics.SyncPodCreate { + if podSyncTypes[pod.UID] == SyncPodCreate { kl.recorder.Eventf(pod, "OutOfDisk", "Cannot start the pod due to lack of disk space.") kl.statusManager.SetPodStatus(pod, api.PodStatus{ Phase: api.PodFailed, @@ -1578,7 +1606,7 @@ func (kl *Kubelet) handleNotFittingPods(pods []*api.Pod) []*api.Pod { // admitPods handles pod admission. It filters out terminated pods, and pods // that don't fit on the node, and may reject pods if node is overcommitted. -func (kl *Kubelet) admitPods(allPods []*api.Pod, podSyncTypes map[types.UID]metrics.SyncPodType) []*api.Pod { +func (kl *Kubelet) admitPods(allPods []*api.Pod, podSyncTypes map[types.UID]SyncPodType) []*api.Pod { // Pod phase progresses monotonically. Once a pod has reached a final state, // it should never leave irregardless of the restart policy. The statuses // of such pods should not be changed, and there is no need to sync them. @@ -1616,7 +1644,7 @@ func (kl *Kubelet) syncLoop(updates <-chan PodUpdate, handler SyncHandler) { glog.Info("Starting kubelet main sync loop.") for { unsyncedPod := false - podSyncTypes := make(map[types.UID]metrics.SyncPodType) + podSyncTypes := make(map[types.UID]SyncPodType) select { case u, ok := <-updates: if !ok { diff --git a/pkg/kubelet/kubelet_test.go b/pkg/kubelet/kubelet_test.go index e4b52692599..ea5d0d406f5 100644 --- a/pkg/kubelet/kubelet_test.go +++ b/pkg/kubelet/kubelet_test.go @@ -42,7 +42,6 @@ import ( "github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/container" kubecontainer "github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/container" "github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/dockertools" - "github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/metrics" "github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/network" "github.com/GoogleCloudPlatform/kubernetes/pkg/runtime" "github.com/GoogleCloudPlatform/kubernetes/pkg/types" @@ -427,7 +426,7 @@ func apiContainerToContainer(c docker.APIContainers) container.Container { } } -var emptyPodUIDs map[types.UID]metrics.SyncPodType +var emptyPodUIDs map[types.UID]SyncPodType // TODO: Remove this function after all docker-specifc tests have been migrated // to dockertools. @@ -2774,31 +2773,33 @@ func TestUpdateNodeStatusError(t *testing.T) { } func TestCreateMirrorPod(t *testing.T) { - testKubelet := newTestKubeletWithFakeRuntime(t) - kl := testKubelet.kubelet - manager := testKubelet.fakeMirrorClient - pod := &api.Pod{ - ObjectMeta: api.ObjectMeta{ - UID: "12345678", - Name: "bar", - Namespace: "foo", - Annotations: map[string]string{ - ConfigSourceAnnotationKey: "file", + for _, updateType := range []SyncPodType{SyncPodCreate, SyncPodUpdate} { + testKubelet := newTestKubeletWithFakeRuntime(t) + kl := testKubelet.kubelet + manager := testKubelet.fakeMirrorClient + pod := &api.Pod{ + ObjectMeta: api.ObjectMeta{ + UID: "12345678", + Name: "bar", + Namespace: "foo", + Annotations: map[string]string{ + ConfigSourceAnnotationKey: "file", + }, }, - }, - } - pods := []*api.Pod{pod} - kl.podManager.SetPods(pods) - err := kl.syncPod(pod, nil, container.Pod{}) - if err != nil { - t.Errorf("unexpected error: %v", err) - } - podFullName := kubecontainer.GetPodFullName(pod) - if !manager.HasPod(podFullName) { - t.Errorf("expected mirror pod %q to be created", podFullName) - } - if manager.NumOfPods() != 1 || !manager.HasPod(podFullName) { - t.Errorf("expected one mirror pod %q, got %v", podFullName, manager.GetPods()) + } + pods := []*api.Pod{pod} + kl.podManager.SetPods(pods) + err := kl.syncPod(pod, nil, container.Pod{}, updateType) + if err != nil { + t.Errorf("unexpected error: %v", err) + } + podFullName := kubecontainer.GetPodFullName(pod) + if !manager.HasPod(podFullName) { + t.Errorf("expected mirror pod %q to be created", podFullName) + } + if manager.NumOfPods() != 1 || !manager.HasPod(podFullName) { + t.Errorf("expected one mirror pod %q, got %v", podFullName, manager.GetPods()) + } } } @@ -2844,7 +2845,7 @@ func TestDeleteOutdatedMirrorPod(t *testing.T) { pods := []*api.Pod{pod, mirrorPod} kl.podManager.SetPods(pods) - err := kl.syncPod(pod, mirrorPod, container.Pod{}) + err := kl.syncPod(pod, mirrorPod, container.Pod{}, SyncPodUpdate) if err != nil { t.Errorf("unexpected error: %v", err) } @@ -3044,7 +3045,7 @@ func TestHostNetworkAllowed(t *testing.T) { }, } kubelet.podManager.SetPods([]*api.Pod{pod}) - err := kubelet.syncPod(pod, nil, container.Pod{}) + err := kubelet.syncPod(pod, nil, container.Pod{}, SyncPodUpdate) if err != nil { t.Errorf("expected pod infra creation to succeed: %v", err) } @@ -3073,7 +3074,7 @@ func TestHostNetworkDisallowed(t *testing.T) { HostNetwork: true, }, } - err := kubelet.syncPod(pod, nil, container.Pod{}) + err := kubelet.syncPod(pod, nil, container.Pod{}, SyncPodUpdate) if err == nil { t.Errorf("expected pod infra creation to fail") } @@ -3100,7 +3101,7 @@ func TestPrivilegeContainerAllowed(t *testing.T) { }, } kubelet.podManager.SetPods([]*api.Pod{pod}) - err := kubelet.syncPod(pod, nil, container.Pod{}) + err := kubelet.syncPod(pod, nil, container.Pod{}, SyncPodUpdate) if err != nil { t.Errorf("expected pod infra creation to succeed: %v", err) } @@ -3126,7 +3127,7 @@ func TestPrivilegeContainerDisallowed(t *testing.T) { }, }, } - err := kubelet.syncPod(pod, nil, container.Pod{}) + err := kubelet.syncPod(pod, nil, container.Pod{}, SyncPodUpdate) if err == nil { t.Errorf("expected pod infra creation to fail") } diff --git a/pkg/kubelet/metrics/metrics.go b/pkg/kubelet/metrics/metrics.go index 20ba153a946..199c52b9fef 100644 --- a/pkg/kubelet/metrics/metrics.go +++ b/pkg/kubelet/metrics/metrics.go @@ -83,27 +83,6 @@ func Register(containerCache kubecontainer.RuntimeCache) { }) } -type SyncPodType int - -const ( - SyncPodSync SyncPodType = iota - SyncPodUpdate - SyncPodCreate -) - -func (sp SyncPodType) String() string { - switch sp { - case SyncPodCreate: - return "create" - case SyncPodUpdate: - return "update" - case SyncPodSync: - return "sync" - default: - return "unknown" - } -} - // Gets the time since the specified start in microseconds. func SinceInMicroseconds(start time.Time) float64 { return float64(time.Since(start).Nanoseconds() / time.Microsecond.Nanoseconds()) diff --git a/pkg/kubelet/pod_manager.go b/pkg/kubelet/pod_manager.go index 646eb9f2596..28501abbca1 100644 --- a/pkg/kubelet/pod_manager.go +++ b/pkg/kubelet/pod_manager.go @@ -22,7 +22,6 @@ import ( "github.com/GoogleCloudPlatform/kubernetes/pkg/api" "github.com/GoogleCloudPlatform/kubernetes/pkg/client" kubecontainer "github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/container" - "github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/metrics" "github.com/GoogleCloudPlatform/kubernetes/pkg/types" "github.com/golang/glog" ) @@ -48,13 +47,35 @@ type podManager interface { GetPodByName(namespace, name string) (*api.Pod, bool) GetPodsAndMirrorMap() ([]*api.Pod, map[string]*api.Pod) SetPods(pods []*api.Pod) - UpdatePods(u PodUpdate, podSyncTypes map[types.UID]metrics.SyncPodType) + UpdatePods(u PodUpdate, podSyncTypes map[types.UID]SyncPodType) DeleteOrphanedMirrorPods() TranslatePodUID(uid types.UID) types.UID IsMirrorPodOf(mirrorPod, pod *api.Pod) bool mirrorClient } +// SyncPodType classifies pod updates, eg: create, update. +type SyncPodType int + +const ( + SyncPodSync SyncPodType = iota + SyncPodUpdate + SyncPodCreate +) + +func (sp SyncPodType) String() string { + switch sp { + case SyncPodCreate: + return "create" + case SyncPodUpdate: + return "update" + case SyncPodSync: + return "sync" + default: + return "unknown" + } +} + // All maps in basicPodManager should be set by calling UpdatePods(); // individual arrays/maps are not immutable and no other methods should attempt // to modify them. @@ -83,7 +104,7 @@ func newBasicPodManager(apiserverClient client.Interface) *basicPodManager { } // Update the internal pods with those provided by the update. -func (pm *basicPodManager) UpdatePods(u PodUpdate, podSyncTypes map[types.UID]metrics.SyncPodType) { +func (pm *basicPodManager) UpdatePods(u PodUpdate, podSyncTypes map[types.UID]SyncPodType) { pm.lock.Lock() defer pm.lock.Unlock() switch u.Op { @@ -101,7 +122,7 @@ func (pm *basicPodManager) UpdatePods(u PodUpdate, podSyncTypes map[types.UID]me for uid := range pm.podByUID { if _, ok := existingPods[uid]; !ok { - podSyncTypes[uid] = metrics.SyncPodCreate + podSyncTypes[uid] = SyncPodCreate } } case UPDATE: @@ -110,7 +131,7 @@ func (pm *basicPodManager) UpdatePods(u PodUpdate, podSyncTypes map[types.UID]me // Store the updated pods. Don't worry about filtering host ports since those // pods will never be looked up. for i := range u.Pods { - podSyncTypes[u.Pods[i].UID] = metrics.SyncPodUpdate + podSyncTypes[u.Pods[i].UID] = SyncPodUpdate } allPods := applyUpdates(u.Pods, pm.getAllPods()) pm.setPods(allPods) @@ -121,7 +142,7 @@ func (pm *basicPodManager) UpdatePods(u PodUpdate, podSyncTypes map[types.UID]me // Mark all remaining pods as sync. for uid := range pm.podByUID { if _, ok := podSyncTypes[uid]; !ok { - podSyncTypes[uid] = metrics.SyncPodSync + podSyncTypes[uid] = SyncPodSync } } } diff --git a/pkg/kubelet/pod_workers.go b/pkg/kubelet/pod_workers.go index c88286878e6..e6af4c65c7c 100644 --- a/pkg/kubelet/pod_workers.go +++ b/pkg/kubelet/pod_workers.go @@ -34,7 +34,7 @@ type PodWorkers interface { ForgetNonExistingPodWorkers(desiredPods map[types.UID]empty) } -type syncPodFnType func(*api.Pod, *api.Pod, kubecontainer.Pod) error +type syncPodFnType func(*api.Pod, *api.Pod, kubecontainer.Pod, SyncPodType) error type podWorkers struct { // Protects all per worker fields. @@ -71,6 +71,9 @@ type workUpdate struct { // Function to call when the update is complete. updateCompleteFn func() + + // A string describing the type of this update, eg: create + updateType SyncPodType } func newPodWorkers(runtimeCache kubecontainer.RuntimeCache, syncPodFn syncPodFnType, @@ -103,7 +106,7 @@ func (p *podWorkers) managePodLoop(podUpdates <-chan workUpdate) { } err = p.syncPodFn(newWork.pod, newWork.mirrorPod, - kubecontainer.Pods(pods).FindPodByID(newWork.pod.UID)) + kubecontainer.Pods(pods).FindPodByID(newWork.pod.UID), newWork.updateType) if err != nil { glog.Errorf("Error syncing pod %s, skipping: %v", newWork.pod.UID, err) p.recorder.Eventf(newWork.pod, "failedSync", "Error syncing pod, skipping: %v", err) @@ -122,6 +125,14 @@ func (p *podWorkers) UpdatePod(pod *api.Pod, mirrorPod *api.Pod, updateComplete var podUpdates chan workUpdate var exists bool + // TODO: Pipe this through from the kubelet. Currently kubelets operating with + // snapshot updates (PodConfigNotificationSnapshot) will send updates, creates + // and deletes as SET operations, which makes updates indistinguishable from + // creates. The intent here is to communicate to the pod worker that it can take + // certain liberties, like skipping status generation, when it receives a create + // event for a pod. + updateType := SyncPodUpdate + p.podLock.Lock() defer p.podLock.Unlock() if podUpdates, exists = p.podUpdates[uid]; !exists { @@ -131,6 +142,12 @@ func (p *podWorkers) UpdatePod(pod *api.Pod, mirrorPod *api.Pod, updateComplete // the channel is empty, so buffer of size 1 is enough. podUpdates = make(chan workUpdate, 1) p.podUpdates[uid] = podUpdates + + // Creating a new pod worker either means this is a new pod, or that the + // kubelet just restarted. In either case the kubelet is willing to believe + // the status of the pod for the first pod worker sync. See corresponding + // comment in syncPod. + updateType = SyncPodCreate go func() { defer util.HandleCrash() p.managePodLoop(podUpdates) @@ -142,12 +159,14 @@ func (p *podWorkers) UpdatePod(pod *api.Pod, mirrorPod *api.Pod, updateComplete pod: pod, mirrorPod: mirrorPod, updateCompleteFn: updateComplete, + updateType: updateType, } } else { p.lastUndeliveredWorkUpdate[pod.UID] = workUpdate{ pod: pod, mirrorPod: mirrorPod, updateCompleteFn: updateComplete, + updateType: updateType, } } } diff --git a/pkg/kubelet/pod_workers_test.go b/pkg/kubelet/pod_workers_test.go index 93a1a2ddc95..16867b1916c 100644 --- a/pkg/kubelet/pod_workers_test.go +++ b/pkg/kubelet/pod_workers_test.go @@ -41,19 +41,21 @@ func newPod(uid, name string) *api.Pod { } } -func createPodWorkers() (*podWorkers, map[types.UID][]string) { +func createFakeRuntimeCache(fakeRecorder *record.FakeRecorder) kubecontainer.RuntimeCache { fakeDocker := &dockertools.FakeDockerClient{} - fakeRecorder := &record.FakeRecorder{} np, _ := network.InitNetworkPlugin([]network.NetworkPlugin{}, "", network.NewFakeHost(nil)) dockerManager := dockertools.NewFakeDockerManager(fakeDocker, fakeRecorder, nil, nil, dockertools.PodInfraContainerImage, 0, 0, "", kubecontainer.FakeOS{}, np, nil, nil, newKubeletRuntimeHooks(fakeRecorder)) - fakeRuntimeCache := kubecontainer.NewFakeRuntimeCache(dockerManager) + return kubecontainer.NewFakeRuntimeCache(dockerManager) +} +func createPodWorkers() (*podWorkers, map[types.UID][]string) { lock := sync.Mutex{} processed := make(map[types.UID][]string) - + fakeRecorder := &record.FakeRecorder{} + fakeRuntimeCache := createFakeRuntimeCache(fakeRecorder) podWorkers := newPodWorkers( fakeRuntimeCache, - func(pod *api.Pod, mirrorPod *api.Pod, runningPod kubecontainer.Pod) error { + func(pod *api.Pod, mirrorPod *api.Pod, runningPod kubecontainer.Pod, updateType SyncPodType) error { func() { lock.Lock() defer lock.Unlock() @@ -118,6 +120,38 @@ func TestUpdatePod(t *testing.T) { } } +func TestUpdateType(t *testing.T) { + syncType := make(chan SyncPodType) + fakeRecorder := &record.FakeRecorder{} + podWorkers := newPodWorkers( + createFakeRuntimeCache(fakeRecorder), + func(pod *api.Pod, mirrorPod *api.Pod, runningPod kubecontainer.Pod, updateType SyncPodType) error { + func() { + syncType <- updateType + }() + return nil + }, + fakeRecorder, + ) + cases := map[*api.Pod][]SyncPodType{ + newPod("u1", "n1"): {SyncPodCreate, SyncPodUpdate}, + newPod("u2", "n1"): {SyncPodCreate}, + } + for p, expectedTypes := range cases { + for i := range expectedTypes { + podWorkers.UpdatePod(p, nil, func() {}) + select { + case gotType := <-syncType: + if gotType != expectedTypes[i] { + t.Fatalf("Expected sync type %v got %v for pod with uid %v", expectedTypes[i], gotType, p.UID) + } + case <-time.After(100 * time.Millisecond): + t.Errorf("Unexpected delay is running pod worker") + } + } + } +} + func TestForgetNonExistingPodWorkers(t *testing.T) { podWorkers, _ := createPodWorkers() @@ -159,12 +193,12 @@ type simpleFakeKubelet struct { wg sync.WaitGroup } -func (kl *simpleFakeKubelet) syncPod(pod *api.Pod, mirrorPod *api.Pod, runningPod kubecontainer.Pod) error { +func (kl *simpleFakeKubelet) syncPod(pod *api.Pod, mirrorPod *api.Pod, runningPod kubecontainer.Pod, updateType SyncPodType) error { kl.pod, kl.mirrorPod, kl.runningPod = pod, mirrorPod, runningPod return nil } -func (kl *simpleFakeKubelet) syncPodWithWaitGroup(pod *api.Pod, mirrorPod *api.Pod, runningPod kubecontainer.Pod) error { +func (kl *simpleFakeKubelet) syncPodWithWaitGroup(pod *api.Pod, mirrorPod *api.Pod, runningPod kubecontainer.Pod, updateType SyncPodType) error { kl.pod, kl.mirrorPod, kl.runningPod = pod, mirrorPod, runningPod kl.wg.Done() return nil diff --git a/pkg/kubelet/runonce.go b/pkg/kubelet/runonce.go index 84583a038e7..a34fd8ee53c 100644 --- a/pkg/kubelet/runonce.go +++ b/pkg/kubelet/runonce.go @@ -104,7 +104,7 @@ func (kl *Kubelet) runPod(pod *api.Pod, retryDelay time.Duration) error { glog.Infof("pod %q containers not running: syncing", pod.Name) // We don't create mirror pods in this mode; pass a dummy boolean value // to sycnPod. - if err = kl.syncPod(pod, nil, p); err != nil { + if err = kl.syncPod(pod, nil, p, SyncPodUpdate); err != nil { return fmt.Errorf("error syncing pod: %v", err) } if retry >= RunOnceMaxRetries {