diff --git a/pkg/kubelet/container/fake_cache.go b/pkg/kubelet/container/fake_cache.go new file mode 100644 index 00000000000..adfe706c8da --- /dev/null +++ b/pkg/kubelet/container/fake_cache.go @@ -0,0 +1,48 @@ +/* +Copyright 2016 The Kubernetes Authors All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package container + +import ( + "time" + + "k8s.io/kubernetes/pkg/types" +) + +type fakeCache struct { + runtime Runtime +} + +func NewFakeCache(runtime Runtime) Cache { + return &fakeCache{runtime: runtime} +} + +func (c *fakeCache) Get(id types.UID) (*PodStatus, error) { + return c.runtime.GetPodStatus(id, "", "") +} + +func (c *fakeCache) GetNewerThan(id types.UID, minTime time.Time) (*PodStatus, error) { + return c.Get(id) +} + +func (c *fakeCache) Set(id types.UID, status *PodStatus, err error, timestamp time.Time) { +} + +func (c *fakeCache) Delete(id types.UID) { +} + +func (c *fakeCache) UpdateTime(_ time.Time) { +} diff --git a/pkg/kubelet/fake_pod_workers.go b/pkg/kubelet/fake_pod_workers.go index 405eb21d22c..790b8ddad16 100644 --- a/pkg/kubelet/fake_pod_workers.go +++ b/pkg/kubelet/fake_pod_workers.go @@ -26,17 +26,17 @@ import ( // fakePodWorkers runs sync pod function in serial, so we can have // deterministic behaviour in testing. type fakePodWorkers struct { - syncPodFn syncPodFnType - runtimeCache kubecontainer.RuntimeCache - t TestingInterface + syncPodFn syncPodFnType + cache kubecontainer.Cache + t TestingInterface } func (f *fakePodWorkers) UpdatePod(pod *api.Pod, mirrorPod *api.Pod, updateType kubetypes.SyncPodType, updateComplete func()) { - pods, err := f.runtimeCache.GetPods() + status, err := f.cache.Get(pod.UID) if err != nil { f.t.Errorf("Unexpected error: %v", err) } - if err := f.syncPodFn(pod, mirrorPod, kubecontainer.Pods(pods).FindPodByID(pod.UID), kubetypes.SyncPodUpdate); err != nil { + if err := f.syncPodFn(pod, mirrorPod, status, kubetypes.SyncPodUpdate); err != nil { f.t.Errorf("Unexpected error: %v", err) } } diff --git a/pkg/kubelet/kubelet.go b/pkg/kubelet/kubelet.go index 9597903acc5..46595323b3c 100644 --- a/pkg/kubelet/kubelet.go +++ b/pkg/kubelet/kubelet.go @@ -447,7 +447,7 @@ func NewMainKubelet( klet.runtimeCache = runtimeCache klet.reasonCache = NewReasonCache() klet.workQueue = queue.NewBasicWorkQueue() - klet.podWorkers = newPodWorkers(runtimeCache, klet.syncPod, recorder, klet.workQueue, klet.resyncInterval, backOffPeriod, klet.podCache) + klet.podWorkers = newPodWorkers(klet.syncPod, recorder, klet.workQueue, klet.resyncInterval, backOffPeriod, klet.podCache) klet.backOff = util.NewBackOff(backOffPeriod, MaxContainerBackOff) klet.podKillingCh = make(chan *kubecontainer.Pod, podKillingChannelCapacity) @@ -1571,9 +1571,16 @@ func parseResolvConf(reader io.Reader, dnsScrubber dnsScrubber) (nameservers []s return nameservers, searches, nil } -// Kill all running containers in a pod (includes the pod infra container). -func (kl *Kubelet) killPod(pod *api.Pod, runningPod kubecontainer.Pod) error { - return kl.containerRuntime.KillPod(pod, runningPod) +// One of the following aruguements must be non-nil: runningPod, status. +// TODO: Modify containerRuntime.KillPod() to accept the right arguements. +func (kl *Kubelet) killPod(pod *api.Pod, runningPod *kubecontainer.Pod, status *kubecontainer.PodStatus) error { + var p kubecontainer.Pod + if runningPod != nil { + p = *runningPod + } else if status != nil { + p = kubecontainer.ConvertPodStatusToRunningPod(status) + } + return kl.containerRuntime.KillPod(pod, p) } type empty struct{} @@ -1593,8 +1600,7 @@ func (kl *Kubelet) makePodDataDirs(pod *api.Pod) error { return nil } -// TODO: Remove runningPod from the arguments. -func (kl *Kubelet) syncPod(pod *api.Pod, mirrorPod *api.Pod, runningPod kubecontainer.Pod, updateType kubetypes.SyncPodType) error { +func (kl *Kubelet) syncPod(pod *api.Pod, mirrorPod *api.Pod, podStatus *kubecontainer.PodStatus, updateType kubetypes.SyncPodType) error { var firstSeenTime time.Time if firstSeenTimeStr, ok := pod.Annotations[kubetypes.ConfigFirstSeenAnnotationKey]; ok { firstSeenTime = kubetypes.ConvertToTimestamp(firstSeenTimeStr).Get() @@ -1610,29 +1616,21 @@ func (kl *Kubelet) syncPod(pod *api.Pod, mirrorPod *api.Pod, runningPod kubecont } } - // Query the container runtime (or cache) to retrieve the pod status, and - // update it in the status manager. - podStatus, statusErr := kl.getRuntimePodStatus(pod) - apiPodStatus, err := kl.generatePodStatus(pod, podStatus, statusErr) + apiPodStatus, err := kl.generatePodStatus(pod, podStatus) if err != nil { return err } // Record the time it takes for the pod to become running. existingStatus, ok := kl.statusManager.GetPodStatus(pod.UID) - // TODO: The logic seems wrong since the pod phase can become pending when - // the container runtime is temporarily not available. - if statusErr == nil && !ok || existingStatus.Phase == api.PodPending && apiPodStatus.Phase == api.PodRunning && + if !ok || existingStatus.Phase == api.PodPending && apiPodStatus.Phase == api.PodRunning && !firstSeenTime.IsZero() { metrics.PodStartLatency.Observe(metrics.SinceInMicroseconds(firstSeenTime)) } kl.statusManager.SetPodStatus(pod, apiPodStatus) - if statusErr != nil { - return statusErr - } // Kill pods we can't run. if err := canRunPod(pod); err != nil || pod.DeletionTimestamp != nil { - if err := kl.killPod(pod, runningPod); err != nil { + if err := kl.killPod(pod, nil, podStatus); err != nil { utilruntime.HandleError(err) } return err @@ -2100,7 +2098,7 @@ func (kl *Kubelet) podKiller() { ch <- pod.ID }() glog.V(2).Infof("Killing unwanted pod %q", pod.Name) - err := kl.killPod(nil, *pod) + err := kl.killPod(nil, pod, nil) if err != nil { glog.Errorf("Failed killing the pod %q: %v", pod.Name, err) } @@ -3092,7 +3090,7 @@ func (kl *Kubelet) getRuntimePodStatus(pod *api.Pod) (*kubecontainer.PodStatus, return kl.containerRuntime.GetPodStatus(pod.UID, pod.Name, pod.Namespace) } -func (kl *Kubelet) generatePodStatus(pod *api.Pod, podStatus *kubecontainer.PodStatus, statusErr error) (api.PodStatus, error) { +func (kl *Kubelet) generatePodStatus(pod *api.Pod, podStatus *kubecontainer.PodStatus) (api.PodStatus, error) { glog.V(3).Infof("Generating status for %q", format.Pod(pod)) // TODO: Consider include the container information. if kl.pastActiveDeadline(pod) { @@ -3104,16 +3102,6 @@ func (kl *Kubelet) generatePodStatus(pod *api.Pod, podStatus *kubecontainer.PodS Message: "Pod was active on the node longer than specified deadline"}, nil } - if statusErr != nil { - // TODO: Re-evaluate whether we should set the status to "Pending". - glog.Infof("Query container info for pod %q failed with error (%v)", format.Pod(pod), statusErr) - return api.PodStatus{ - Phase: api.PodPending, - Reason: "GeneralError", - Message: fmt.Sprintf("Query container info failed with error (%v)", statusErr), - }, nil - } - // Convert the internal PodStatus to api.PodStatus. s := kl.convertStatusToAPIStatus(pod, podStatus) // Assume info is ready to process diff --git a/pkg/kubelet/kubelet_test.go b/pkg/kubelet/kubelet_test.go index 4cbe0c35e68..8a38084929c 100644 --- a/pkg/kubelet/kubelet_test.go +++ b/pkg/kubelet/kubelet_test.go @@ -150,10 +150,11 @@ func newTestKubelet(t *testing.T) *TestKubelet { kubelet.containerRuntime = fakeRuntime kubelet.runtimeCache = kubecontainer.NewFakeRuntimeCache(kubelet.containerRuntime) kubelet.reasonCache = NewReasonCache() + kubelet.podCache = kubecontainer.NewFakeCache(kubelet.containerRuntime) kubelet.podWorkers = &fakePodWorkers{ - syncPodFn: kubelet.syncPod, - runtimeCache: kubelet.runtimeCache, - t: t, + syncPodFn: kubelet.syncPod, + cache: kubelet.podCache, + t: t, } kubelet.probeManager = prober.FakeManager{} @@ -3390,7 +3391,7 @@ func TestCreateMirrorPod(t *testing.T) { } pods := []*api.Pod{pod} kl.podManager.SetPods(pods) - err := kl.syncPod(pod, nil, container.Pod{}, updateType) + err := kl.syncPod(pod, nil, &container.PodStatus{}, updateType) if err != nil { t.Errorf("unexpected error: %v", err) } @@ -3448,7 +3449,7 @@ func TestDeleteOutdatedMirrorPod(t *testing.T) { pods := []*api.Pod{pod, mirrorPod} kl.podManager.SetPods(pods) - err := kl.syncPod(pod, mirrorPod, container.Pod{}, kubetypes.SyncPodUpdate) + err := kl.syncPod(pod, mirrorPod, &container.PodStatus{}, kubetypes.SyncPodUpdate) if err != nil { t.Errorf("unexpected error: %v", err) } @@ -3614,7 +3615,7 @@ func TestHostNetworkAllowed(t *testing.T) { }, } kubelet.podManager.SetPods([]*api.Pod{pod}) - err := kubelet.syncPod(pod, nil, container.Pod{}, kubetypes.SyncPodUpdate) + err := kubelet.syncPod(pod, nil, &container.PodStatus{}, kubetypes.SyncPodUpdate) if err != nil { t.Errorf("expected pod infra creation to succeed: %v", err) } @@ -3647,7 +3648,7 @@ func TestHostNetworkDisallowed(t *testing.T) { }, }, } - err := kubelet.syncPod(pod, nil, container.Pod{}, kubetypes.SyncPodUpdate) + err := kubelet.syncPod(pod, nil, &container.PodStatus{}, kubetypes.SyncPodUpdate) if err == nil { t.Errorf("expected pod infra creation to fail") } @@ -3674,7 +3675,7 @@ func TestPrivilegeContainerAllowed(t *testing.T) { }, } kubelet.podManager.SetPods([]*api.Pod{pod}) - err := kubelet.syncPod(pod, nil, container.Pod{}, kubetypes.SyncPodUpdate) + err := kubelet.syncPod(pod, nil, &container.PodStatus{}, kubetypes.SyncPodUpdate) if err != nil { t.Errorf("expected pod infra creation to succeed: %v", err) } @@ -3700,7 +3701,7 @@ func TestPrivilegeContainerDisallowed(t *testing.T) { }, }, } - err := kubelet.syncPod(pod, nil, container.Pod{}, kubetypes.SyncPodUpdate) + err := kubelet.syncPod(pod, nil, &container.PodStatus{}, kubetypes.SyncPodUpdate) if err == nil { t.Errorf("expected pod infra creation to fail") } diff --git a/pkg/kubelet/pod_workers.go b/pkg/kubelet/pod_workers.go index 5b93137d93a..9e1f9efa60c 100644 --- a/pkg/kubelet/pod_workers.go +++ b/pkg/kubelet/pod_workers.go @@ -37,7 +37,7 @@ type PodWorkers interface { ForgetWorker(uid types.UID) } -type syncPodFnType func(*api.Pod, *api.Pod, kubecontainer.Pod, kubetypes.SyncPodType) error +type syncPodFnType func(*api.Pod, *api.Pod, *kubecontainer.PodStatus, kubetypes.SyncPodType) error type podWorkers struct { // Protects all per worker fields. @@ -53,8 +53,6 @@ type podWorkers struct { // Tracks the last undelivered work item for this pod - a work item is // undelivered if it comes in while the worker is working. lastUndeliveredWorkUpdate map[types.UID]workUpdate - // runtimeCache is used for listing running containers. - runtimeCache kubecontainer.RuntimeCache workQueue queue.WorkQueue @@ -90,13 +88,12 @@ type workUpdate struct { updateType kubetypes.SyncPodType } -func newPodWorkers(runtimeCache kubecontainer.RuntimeCache, syncPodFn syncPodFnType, - recorder record.EventRecorder, workQueue queue.WorkQueue, resyncInterval, backOffPeriod time.Duration, podCache kubecontainer.Cache) *podWorkers { +func newPodWorkers(syncPodFn syncPodFnType, recorder record.EventRecorder, workQueue queue.WorkQueue, + resyncInterval, backOffPeriod time.Duration, podCache kubecontainer.Cache) *podWorkers { return &podWorkers{ podUpdates: map[types.UID]chan workUpdate{}, isWorking: map[types.UID]bool{}, lastUndeliveredWorkUpdate: map[types.UID]workUpdate{}, - runtimeCache: runtimeCache, syncPodFn: syncPodFn, recorder: recorder, workQueue: workQueue, @@ -107,45 +104,31 @@ func newPodWorkers(runtimeCache kubecontainer.RuntimeCache, syncPodFn syncPodFnT } func (p *podWorkers) managePodLoop(podUpdates <-chan workUpdate) { - var minRuntimeCacheTime time.Time - + var lastSyncTime time.Time for newWork := range podUpdates { - err := func() (err error) { + err := func() error { podID := newWork.pod.UID - if p.podCache != nil { - // This is a blocking call that would return only if the cache - // has an entry for the pod that is newer than minRuntimeCache - // Time. This ensures the worker doesn't start syncing until - // after the cache is at least newer than the finished time of - // the previous sync. - // TODO: We don't consume the return PodStatus yet, but we - // should pass it to syncPod() eventually. - p.podCache.GetNewerThan(podID, minRuntimeCacheTime) - } - // TODO: Deprecate the runtime cache. - // We would like to have the state of the containers from at least - // the moment when we finished the previous processing of that pod. - if err := p.runtimeCache.ForceUpdateIfOlder(minRuntimeCacheTime); err != nil { - glog.Errorf("Error updating the container runtime cache: %v", err) + // This is a blocking call that would return only if the cache + // has an entry for the pod that is newer than minRuntimeCache + // Time. This ensures the worker doesn't start syncing until + // after the cache is at least newer than the finished time of + // the previous sync. + status, err := p.podCache.GetNewerThan(podID, lastSyncTime) + if err != nil { return err } - pods, err := p.runtimeCache.GetPods() + err = p.syncPodFn(newWork.pod, newWork.mirrorPod, status, newWork.updateType) + lastSyncTime = time.Now() if err != nil { - glog.Errorf("Error getting pods while syncing pod: %v", err) - return err - } - - err = p.syncPodFn(newWork.pod, newWork.mirrorPod, - kubecontainer.Pods(pods).FindPodByID(newWork.pod.UID), newWork.updateType) - minRuntimeCacheTime = time.Now() - if err != nil { - glog.Errorf("Error syncing pod %s, skipping: %v", newWork.pod.UID, err) - p.recorder.Eventf(newWork.pod, api.EventTypeWarning, kubecontainer.FailedSync, "Error syncing pod, skipping: %v", err) return err } newWork.updateCompleteFn() return nil }() + if err != nil { + glog.Errorf("Error syncing pod %s, skipping: %v", newWork.pod.UID, err) + p.recorder.Eventf(newWork.pod, api.EventTypeWarning, kubecontainer.FailedSync, "Error syncing pod, skipping: %v", err) + } p.wrapUp(newWork.pod.UID, err) } } diff --git a/pkg/kubelet/pod_workers_test.go b/pkg/kubelet/pod_workers_test.go index 3d90a1f9300..ddbb063a522 100644 --- a/pkg/kubelet/pod_workers_test.go +++ b/pkg/kubelet/pod_workers_test.go @@ -18,7 +18,6 @@ package kubelet import ( "reflect" - "sort" "sync" "testing" "time" @@ -45,10 +44,9 @@ func createPodWorkers() (*podWorkers, map[types.UID][]string) { processed := make(map[types.UID][]string) fakeRecorder := &record.FakeRecorder{} fakeRuntime := &kubecontainer.FakeRuntime{} - fakeRuntimeCache := kubecontainer.NewFakeRuntimeCache(fakeRuntime) + fakeCache := kubecontainer.NewFakeCache(fakeRuntime) podWorkers := newPodWorkers( - fakeRuntimeCache, - func(pod *api.Pod, mirrorPod *api.Pod, runningPod kubecontainer.Pod, updateType kubetypes.SyncPodType) error { + func(pod *api.Pod, mirrorPod *api.Pod, status *kubecontainer.PodStatus, updateType kubetypes.SyncPodType) error { func() { lock.Lock() defer lock.Unlock() @@ -60,7 +58,7 @@ func createPodWorkers() (*podWorkers, map[types.UID][]string) { queue.NewBasicWorkQueue(), time.Second, time.Second, - nil, + fakeCache, ) return podWorkers, processed } @@ -151,19 +149,19 @@ func TestForgetNonExistingPodWorkers(t *testing.T) { } type simpleFakeKubelet struct { - pod *api.Pod - mirrorPod *api.Pod - runningPod kubecontainer.Pod - wg sync.WaitGroup + pod *api.Pod + mirrorPod *api.Pod + podStatus *kubecontainer.PodStatus + wg sync.WaitGroup } -func (kl *simpleFakeKubelet) syncPod(pod *api.Pod, mirrorPod *api.Pod, runningPod kubecontainer.Pod, updateType kubetypes.SyncPodType) error { - kl.pod, kl.mirrorPod, kl.runningPod = pod, mirrorPod, runningPod +func (kl *simpleFakeKubelet) syncPod(pod *api.Pod, mirrorPod *api.Pod, status *kubecontainer.PodStatus, updateType kubetypes.SyncPodType) error { + kl.pod, kl.mirrorPod, kl.podStatus = pod, mirrorPod, status return nil } -func (kl *simpleFakeKubelet) syncPodWithWaitGroup(pod *api.Pod, mirrorPod *api.Pod, runningPod kubecontainer.Pod, updateType kubetypes.SyncPodType) error { - kl.pod, kl.mirrorPod, kl.runningPod = pod, mirrorPod, runningPod +func (kl *simpleFakeKubelet) syncPodWithWaitGroup(pod *api.Pod, mirrorPod *api.Pod, status *kubecontainer.PodStatus, updateType kubetypes.SyncPodType) error { + kl.pod, kl.mirrorPod, kl.podStatus = pod, mirrorPod, status kl.wg.Done() return nil } @@ -186,25 +184,21 @@ func (b byContainerName) Less(i, j int) bool { func TestFakePodWorkers(t *testing.T) { fakeRecorder := &record.FakeRecorder{} fakeRuntime := &kubecontainer.FakeRuntime{} - fakeRuntimeCache := kubecontainer.NewFakeRuntimeCache(fakeRuntime) + fakeCache := kubecontainer.NewFakeCache(fakeRuntime) kubeletForRealWorkers := &simpleFakeKubelet{} kubeletForFakeWorkers := &simpleFakeKubelet{} - realPodWorkers := newPodWorkers(fakeRuntimeCache, kubeletForRealWorkers.syncPodWithWaitGroup, fakeRecorder, queue.NewBasicWorkQueue(), time.Second, time.Second, nil) - fakePodWorkers := &fakePodWorkers{kubeletForFakeWorkers.syncPod, fakeRuntimeCache, t} + realPodWorkers := newPodWorkers(kubeletForRealWorkers.syncPodWithWaitGroup, fakeRecorder, queue.NewBasicWorkQueue(), time.Second, time.Second, fakeCache) + fakePodWorkers := &fakePodWorkers{kubeletForFakeWorkers.syncPod, fakeCache, t} tests := []struct { - pod *api.Pod - mirrorPod *api.Pod - podList []*kubecontainer.Pod - containersInRunningPod int + pod *api.Pod + mirrorPod *api.Pod }{ { &api.Pod{}, &api.Pod{}, - []*kubecontainer.Pod{}, - 0, }, { &api.Pod{ @@ -221,29 +215,6 @@ func TestFakePodWorkers(t *testing.T) { Namespace: "new", }, }, - []*kubecontainer.Pod{ - { - ID: "12345678", - Name: "foo", - Namespace: "new", - Containers: []*kubecontainer.Container{ - { - Name: "fooContainer", - }, - }, - }, - { - ID: "12345678", - Name: "fooMirror", - Namespace: "new", - Containers: []*kubecontainer.Container{ - { - Name: "fooContainerMirror", - }, - }, - }, - }, - 1, }, { &api.Pod{ @@ -260,42 +231,11 @@ func TestFakePodWorkers(t *testing.T) { Namespace: "new", }, }, - []*kubecontainer.Pod{ - { - ID: "98765", - Name: "bar", - Namespace: "new", - Containers: []*kubecontainer.Container{ - { - Name: "barContainer0", - }, - { - Name: "barContainer1", - }, - }, - }, - { - ID: "98765", - Name: "barMirror", - Namespace: "new", - Containers: []*kubecontainer.Container{ - { - Name: "barContainerMirror0", - }, - { - Name: "barContainerMirror1", - }, - }, - }, - }, - 2, }, } for i, tt := range tests { kubeletForRealWorkers.wg.Add(1) - - fakeRuntime.PodList = tt.podList realPodWorkers.UpdatePod(tt.pod, tt.mirrorPod, kubetypes.SyncPodUpdate, func() {}) fakePodWorkers.UpdatePod(tt.pod, tt.mirrorPod, kubetypes.SyncPodUpdate, func() {}) @@ -309,14 +249,8 @@ func TestFakePodWorkers(t *testing.T) { t.Errorf("%d: Expected: %#v, Actual: %#v", i, kubeletForRealWorkers.mirrorPod, kubeletForFakeWorkers.mirrorPod) } - if tt.containersInRunningPod != len(kubeletForFakeWorkers.runningPod.Containers) { - t.Errorf("%d: Expected: %#v, Actual: %#v", i, tt.containersInRunningPod, len(kubeletForFakeWorkers.runningPod.Containers)) - } - - sort.Sort(byContainerName(kubeletForRealWorkers.runningPod)) - sort.Sort(byContainerName(kubeletForFakeWorkers.runningPod)) - if !reflect.DeepEqual(kubeletForRealWorkers.runningPod, kubeletForFakeWorkers.runningPod) { - t.Errorf("%d: Expected: %#v, Actual: %#v", i, kubeletForRealWorkers.runningPod, kubeletForFakeWorkers.runningPod) + if !reflect.DeepEqual(kubeletForRealWorkers.podStatus, kubeletForFakeWorkers.podStatus) { + t.Errorf("%d: Expected: %#v, Actual: %#v", i, kubeletForRealWorkers.podStatus, kubeletForFakeWorkers.podStatus) } } } diff --git a/pkg/kubelet/runonce.go b/pkg/kubelet/runonce.go index 0dfca2ab1fa..d1088e31061 100644 --- a/pkg/kubelet/runonce.go +++ b/pkg/kubelet/runonce.go @@ -122,13 +122,17 @@ func (kl *Kubelet) runPod(pod *api.Pod, retryDelay time.Duration) error { } glog.Infof("pod %q containers not running: syncing", pod.Name) + status, err := kl.containerRuntime.GetPodStatus(pod.UID, pod.Name, pod.Namespace) + if err != nil { + glog.Errorf("Unable to get status for pod %q: %v", pod.Name, err) + } glog.Infof("Creating a mirror pod for static pod %q", format.Pod(pod)) if err := kl.podManager.CreateMirrorPod(pod); err != nil { glog.Errorf("Failed creating a mirror pod %q: %v", format.Pod(pod), err) } mirrorPod, _ := kl.podManager.GetMirrorPodByPod(pod) - if err = kl.syncPod(pod, mirrorPod, p, kubetypes.SyncPodUpdate); err != nil { + if err = kl.syncPod(pod, mirrorPod, status, kubetypes.SyncPodUpdate); err != nil { return fmt.Errorf("error syncing pod: %v", err) } if retry >= runOnceMaxRetries {