From 2eb17df46b1dffa3b0a4cbc7133fdd18ed6d3223 Mon Sep 17 00:00:00 2001 From: Yu-Ju Hong Date: Wed, 2 Sep 2015 10:18:11 -0700 Subject: [PATCH] kubelet: independent pod syncs and backoff on error Currently kubelet syncs all pods every 10s. This is not preferred because * Some pods may have been sync'd recently. * This may cause all the pods to be sync'd at once, causing undesirable CPU spikes. This PR replaces the global syncs with independent, periodic pod syncs. At the end of syncing, each pod worker will enqueue itslef with a future timestamp ( current time + sync interval), when it will be due for another sync. * If the pod worker encoutners an sync error, it may requeue with a different timestamp to retry sooner. * If a sync is triggered by the update channel (events or spec changes), the pod worker would enqueue a new sync time. This change is necessary for moving to long or no periodic sync period once pod lifecycle event generator is completed. We will still rely on the mechanism to requeue the pod on sync error. This change also makes sure that if a sync does not succeed (either due to real error or the per-container backoff mechanism), an error would be propagated back to the pod worker, which is responsible for requeuing. --- pkg/kubelet/dockertools/manager.go | 12 ++-- pkg/kubelet/dockertools/manager_test.go | 65 ++++++++++--------- pkg/kubelet/kubelet.go | 45 +++++++++---- pkg/kubelet/kubelet_test.go | 2 + pkg/kubelet/pod/manager.go | 10 +++ pkg/kubelet/pod_workers.go | 40 +++++++++--- pkg/kubelet/pod_workers_test.go | 6 +- pkg/kubelet/util/queue/work_queue.go | 73 +++++++++++++++++++++ pkg/kubelet/util/queue/work_queue_test.go | 77 +++++++++++++++++++++++ 9 files changed, 272 insertions(+), 58 deletions(-) create mode 100644 pkg/kubelet/util/queue/work_queue.go create mode 100644 pkg/kubelet/util/queue/work_queue_test.go diff --git a/pkg/kubelet/dockertools/manager.go b/pkg/kubelet/dockertools/manager.go index 4304ec701a5..1332ff0b354 100644 --- a/pkg/kubelet/dockertools/manager.go +++ b/pkg/kubelet/dockertools/manager.go @@ -1827,8 +1827,7 @@ func (dm *DockerManager) SyncPod(pod *api.Pod, runningPod kubecontainer.Pod, pod } // Killing phase: if we want to start new infra container, or nothing is running kill everything (including infra container) - err = dm.KillPod(pod, runningPod) - if err != nil { + if err := dm.KillPod(pod, runningPod); err != nil { return err } } else { @@ -1845,9 +1844,9 @@ func (dm *DockerManager) SyncPod(pod *api.Pod, runningPod kubecontainer.Pod, pod break } } - err = dm.KillContainerInPod(container.ID, podContainer, pod) - if err != nil { + if err := dm.KillContainerInPod(container.ID, podContainer, pod); err != nil { glog.Errorf("Error killing container: %v", err) + return err } } } @@ -1893,6 +1892,7 @@ func (dm *DockerManager) SyncPod(pod *api.Pod, runningPod kubecontainer.Pod, pod pod.Status.PodIP = dm.determineContainerIP(pod.Name, pod.Namespace, podInfraContainer) } + containersStarted := 0 // Start everything for idx := range containerChanges.ContainersToStart { container := &pod.Spec.Containers[idx] @@ -1946,11 +1946,15 @@ func (dm *DockerManager) SyncPod(pod *api.Pod, runningPod kubecontainer.Pod, pod glog.Errorf("Error running pod %q container %q: %v", kubecontainer.GetPodFullName(pod), container.Name, err) continue } + containersStarted++ // Successfully started the container; clear the entry in the failure // reason cache. dm.clearReasonCache(pod, container) } + if containersStarted != len(containerChanges.ContainersToStart) { + return fmt.Errorf("not all containers have started: %d != %d", containersStarted, containerChanges.ContainersToStart) + } return nil } diff --git a/pkg/kubelet/dockertools/manager_test.go b/pkg/kubelet/dockertools/manager_test.go index 85779bde752..dbbe9d8da93 100644 --- a/pkg/kubelet/dockertools/manager_test.go +++ b/pkg/kubelet/dockertools/manager_test.go @@ -538,7 +538,7 @@ func generatePodInfraContainerHash(pod *api.Pod) uint64 { // runSyncPod is a helper function to retrieve the running pods from the fake // docker client and runs SyncPod for the given pod. -func runSyncPod(t *testing.T, dm *DockerManager, fakeDocker *FakeDockerClient, pod *api.Pod, backOff *util.Backoff) { +func runSyncPod(t *testing.T, dm *DockerManager, fakeDocker *FakeDockerClient, pod *api.Pod, backOff *util.Backoff, expectErr bool) { runningPods, err := dm.GetPods(false) if err != nil { t.Fatalf("unexpected error: %v", err) @@ -554,8 +554,10 @@ func runSyncPod(t *testing.T, dm *DockerManager, fakeDocker *FakeDockerClient, p backOff = util.NewBackOff(time.Second, time.Minute) } err = dm.SyncPod(pod, runningPod, *podStatus, []api.Secret{}, backOff) - if err != nil { + if err != nil && !expectErr { t.Errorf("unexpected error: %v", err) + } else if err == nil && expectErr { + t.Errorf("expected error didn't occur") } } @@ -576,7 +578,7 @@ func TestSyncPodCreateNetAndContainer(t *testing.T) { }, } - runSyncPod(t, dm, fakeDocker, pod, nil) + runSyncPod(t, dm, fakeDocker, pod, nil, false) verifyCalls(t, fakeDocker, []string{ // Create pod infra container. "create", "start", "inspect_container", "inspect_container", @@ -623,7 +625,7 @@ func TestSyncPodCreatesNetAndContainerPullsImage(t *testing.T) { }, } - runSyncPod(t, dm, fakeDocker, pod, nil) + runSyncPod(t, dm, fakeDocker, pod, nil, false) verifyCalls(t, fakeDocker, []string{ // Create pod infra container. @@ -675,7 +677,7 @@ func TestSyncPodWithPodInfraCreatesContainer(t *testing.T) { }, } - runSyncPod(t, dm, fakeDocker, pod, nil) + runSyncPod(t, dm, fakeDocker, pod, nil, false) verifyCalls(t, fakeDocker, []string{ // Inspect pod infra container (but does not create)" @@ -722,7 +724,7 @@ func TestSyncPodDeletesWithNoPodInfraContainer(t *testing.T) { }, } - runSyncPod(t, dm, fakeDocker, pod, nil) + runSyncPod(t, dm, fakeDocker, pod, nil, false) verifyCalls(t, fakeDocker, []string{ // Kill the container since pod infra container is not running. @@ -795,7 +797,7 @@ func TestSyncPodDeletesDuplicate(t *testing.T) { }, } - runSyncPod(t, dm, fakeDocker, pod, nil) + runSyncPod(t, dm, fakeDocker, pod, nil, false) verifyCalls(t, fakeDocker, []string{ // Check the pod infra container. @@ -849,7 +851,7 @@ func TestSyncPodBadHash(t *testing.T) { }, } - runSyncPod(t, dm, fakeDocker, pod, nil) + runSyncPod(t, dm, fakeDocker, pod, nil, false) verifyCalls(t, fakeDocker, []string{ // Check the pod infra container. @@ -906,7 +908,7 @@ func TestSyncPodsUnhealthy(t *testing.T) { } dm.livenessManager.Set(kubetypes.DockerID(unhealthyContainerID).ContainerID(), proberesults.Failure, nil) - runSyncPod(t, dm, fakeDocker, pod, nil) + runSyncPod(t, dm, fakeDocker, pod, nil, false) verifyCalls(t, fakeDocker, []string{ // Check the pod infra container. @@ -963,7 +965,7 @@ func TestSyncPodsDoesNothing(t *testing.T) { }, } - runSyncPod(t, dm, fakeDocker, pod, nil) + runSyncPod(t, dm, fakeDocker, pod, nil, false) verifyCalls(t, fakeDocker, []string{ // Check the pod infra contianer. @@ -1004,7 +1006,7 @@ func TestSyncPodWithPullPolicy(t *testing.T) { Message: "Container image \"pull_never_image\" is not present with pull policy of Never"}}, } - runSyncPod(t, dm, fakeDocker, pod, nil) + runSyncPod(t, dm, fakeDocker, pod, nil, true) statuses, err := dm.GetPodStatus(pod) if err != nil { t.Errorf("unable to get pod status") @@ -1147,7 +1149,7 @@ func TestSyncPodWithRestartPolicy(t *testing.T) { fakeDocker.ContainerMap = containerMap pod.Spec.RestartPolicy = tt.policy - runSyncPod(t, dm, fakeDocker, pod, nil) + runSyncPod(t, dm, fakeDocker, pod, nil, false) // 'stop' is because the pod infra container is killed when no container is running. verifyCalls(t, fakeDocker, tt.calls) @@ -1267,7 +1269,7 @@ func TestGetPodStatusWithLastTermination(t *testing.T) { }, } - runSyncPod(t, dm, fakeDocker, pod, nil) + runSyncPod(t, dm, fakeDocker, pod, nil, false) // Check if we can retrieve the pod status. status, err := dm.GetPodStatus(pod) @@ -1377,16 +1379,17 @@ func TestSyncPodBackoff(t *testing.T) { backoff int killDelay int result []string + expectErr bool }{ - {1, 1, 1, startCalls}, - {2, 2, 2, startCalls}, - {3, 2, 3, backOffCalls}, - {4, 4, 4, startCalls}, - {5, 4, 5, backOffCalls}, - {6, 4, 6, backOffCalls}, - {7, 4, 7, backOffCalls}, - {8, 8, 129, startCalls}, - {130, 1, 0, startCalls}, + {1, 1, 1, startCalls, false}, + {2, 2, 2, startCalls, false}, + {3, 2, 3, backOffCalls, true}, + {4, 4, 4, startCalls, false}, + {5, 4, 5, backOffCalls, true}, + {6, 4, 6, backOffCalls, true}, + {7, 4, 7, backOffCalls, true}, + {8, 8, 129, startCalls, false}, + {130, 1, 0, startCalls, false}, } backOff := util.NewBackOff(time.Second, time.Minute) @@ -1397,7 +1400,7 @@ func TestSyncPodBackoff(t *testing.T) { fakeDocker.ContainerList = containerList fakeClock.Time = startTime.Add(time.Duration(c.tick) * time.Second) - runSyncPod(t, dm, fakeDocker, pod, backOff) + runSyncPod(t, dm, fakeDocker, pod, backOff, c.expectErr) verifyCalls(t, fakeDocker, c.result) if backOff.Get(stableId) != time.Duration(c.backoff)*time.Second { @@ -1448,7 +1451,7 @@ func TestGetPodCreationFailureReason(t *testing.T) { }, } - runSyncPod(t, dm, fakeDocker, pod, nil) + runSyncPod(t, dm, fakeDocker, pod, nil, true) // Check if we can retrieve the pod status. status, err := dm.GetPodStatus(pod) if err != nil { @@ -1504,7 +1507,7 @@ func TestGetPodPullImageFailureReason(t *testing.T) { }, } - runSyncPod(t, dm, fakeDocker, pod, nil) + runSyncPod(t, dm, fakeDocker, pod, nil, true) // Check if we can retrieve the pod status. status, err := dm.GetPodStatus(pod) if err != nil { @@ -1544,7 +1547,7 @@ func TestGetRestartCount(t *testing.T) { // Helper function for verifying the restart count. verifyRestartCount := func(pod *api.Pod, expectedCount int) api.PodStatus { - runSyncPod(t, dm, fakeDocker, pod, nil) + runSyncPod(t, dm, fakeDocker, pod, nil, false) status, err := dm.GetPodStatus(pod) if err != nil { t.Fatalf("unexpected error %v", err) @@ -1621,7 +1624,7 @@ func TestGetTerminationMessagePath(t *testing.T) { fakeDocker.ContainerMap = map[string]*docker.Container{} - runSyncPod(t, dm, fakeDocker, pod, nil) + runSyncPod(t, dm, fakeDocker, pod, nil, false) containerList := fakeDocker.ContainerList if len(containerList) != 2 { @@ -1680,7 +1683,7 @@ func TestSyncPodWithPodInfraCreatesContainerCallsHandler(t *testing.T) { }, } - runSyncPod(t, dm, fakeDocker, pod, nil) + runSyncPod(t, dm, fakeDocker, pod, nil, false) verifyCalls(t, fakeDocker, []string{ // Check the pod infra container. @@ -1743,7 +1746,7 @@ func TestSyncPodEventHandlerFails(t *testing.T) { }, } - runSyncPod(t, dm, fakeDocker, pod, nil) + runSyncPod(t, dm, fakeDocker, pod, nil, true) verifyCalls(t, fakeDocker, []string{ // Check the pod infra container. @@ -1817,7 +1820,7 @@ func TestSyncPodWithTerminationLog(t *testing.T) { }, } - runSyncPod(t, dm, fakeDocker, pod, nil) + runSyncPod(t, dm, fakeDocker, pod, nil, false) verifyCalls(t, fakeDocker, []string{ // Create pod infra container. "create", "start", "inspect_container", "inspect_container", @@ -1857,7 +1860,7 @@ func TestSyncPodWithHostNetwork(t *testing.T) { }, } - runSyncPod(t, dm, fakeDocker, pod, nil) + runSyncPod(t, dm, fakeDocker, pod, nil, false) verifyCalls(t, fakeDocker, []string{ // Create pod infra container. diff --git a/pkg/kubelet/kubelet.go b/pkg/kubelet/kubelet.go index e83e0498bbe..cf156a56da8 100644 --- a/pkg/kubelet/kubelet.go +++ b/pkg/kubelet/kubelet.go @@ -61,6 +61,7 @@ import ( "k8s.io/kubernetes/pkg/kubelet/status" kubetypes "k8s.io/kubernetes/pkg/kubelet/types" kubeletutil "k8s.io/kubernetes/pkg/kubelet/util" + "k8s.io/kubernetes/pkg/kubelet/util/queue" "k8s.io/kubernetes/pkg/labels" "k8s.io/kubernetes/pkg/runtime" "k8s.io/kubernetes/pkg/securitycontext" @@ -432,7 +433,10 @@ func NewMainKubelet( return nil, err } klet.runtimeCache = runtimeCache - klet.podWorkers = newPodWorkers(runtimeCache, klet.syncPod, recorder) + klet.workQueue = queue.NewBasicWorkQueue() + // TODO(yujuhong): backoff and resync interval should be set differently + // once we switch to using pod event generator. + klet.podWorkers = newPodWorkers(runtimeCache, klet.syncPod, recorder, klet.workQueue, klet.resyncInterval, klet.resyncInterval) metrics.Register(runtimeCache) @@ -468,13 +472,14 @@ type nodeLister interface { // Kubelet is the main kubelet implementation. type Kubelet struct { - hostname string - nodeName string - dockerClient dockertools.DockerInterface - runtimeCache kubecontainer.RuntimeCache - kubeClient client.Interface - rootDirectory string - podWorkers PodWorkers + hostname string + nodeName string + dockerClient dockertools.DockerInterface + runtimeCache kubecontainer.RuntimeCache + kubeClient client.Interface + rootDirectory string + podWorkers PodWorkers + resyncInterval time.Duration resyncTicker *time.Ticker sourcesReady SourcesReadyFn @@ -642,6 +647,9 @@ type Kubelet struct { // Information about the ports which are opened by daemons on Node running this Kubelet server. daemonEndpoints *api.NodeDaemonEndpoints + + // A queue used to trigger pod workers. + workQueue queue.WorkQueue } func (kl *Kubelet) allSourcesReady() bool { @@ -1417,7 +1425,7 @@ func (kl *Kubelet) makePodDataDirs(pod *api.Pod) error { return nil } -func (kl *Kubelet) syncPod(pod *api.Pod, mirrorPod *api.Pod, runningPod kubecontainer.Pod, updateType kubetypes.SyncPodType) error { +func (kl *Kubelet) syncPod(pod *api.Pod, mirrorPod *api.Pod, runningPod kubecontainer.Pod, updateType kubetypes.SyncPodType) (syncErr error) { podFullName := kubecontainer.GetPodFullName(pod) uid := pod.UID start := time.Now() @@ -1438,6 +1446,8 @@ func (kl *Kubelet) syncPod(pod *api.Pod, mirrorPod *api.Pod, runningPod kubecont status, err := kl.generatePodStatus(pod) if err != nil { glog.Errorf("Unable to generate status for pod with name %q and uid %q info with error(%v)", podFullName, uid, err) + // Propagate the error upstream. + syncErr = err } else { podToUpdate := pod if mirrorPod != nil { @@ -2073,7 +2083,10 @@ func (kl *Kubelet) canAdmitPod(pods []*api.Pod, pod *api.Pod) (bool, string, str // state every sync-frequency seconds. Never returns. func (kl *Kubelet) syncLoop(updates <-chan kubetypes.PodUpdate, handler SyncHandler) { glog.Info("Starting kubelet main sync loop.") - kl.resyncTicker = time.NewTicker(kl.resyncInterval) + // The resyncTicker wakes up kubelet to checks if there are any pod workers + // that need to be sync'd. A one-second period is sufficient because the + // sync interval is defaulted to 10s. + kl.resyncTicker = time.NewTicker(time.Second) var housekeepingTimestamp time.Time for { if !kl.containerRuntimeUp() { @@ -2138,9 +2151,15 @@ func (kl *Kubelet) syncLoopIteration(updates <-chan kubetypes.PodUpdate, handler glog.Errorf("Kubelet does not support snapshot update") } case <-kl.resyncTicker.C: - // Periodically syncs all the pods and performs cleanup tasks. - glog.V(4).Infof("SyncLoop (periodic sync)") - handler.HandlePodSyncs(kl.podManager.GetPods()) + podUIDs := kl.workQueue.GetWork() + var podsToSync []*api.Pod + for _, uid := range podUIDs { + if pod, ok := kl.podManager.GetPodByUID(uid); ok { + podsToSync = append(podsToSync, pod) + } + } + glog.V(2).Infof("SyncLoop (SYNC): %d pods", len(podsToSync)) + kl.HandlePodSyncs(podsToSync) case update := <-kl.livenessManager.Updates(): // We only care about failures (signalling container death) here. if update.Result == proberesults.Failure { diff --git a/pkg/kubelet/kubelet_test.go b/pkg/kubelet/kubelet_test.go index 34922a96767..d4929756a1f 100644 --- a/pkg/kubelet/kubelet_test.go +++ b/pkg/kubelet/kubelet_test.go @@ -50,6 +50,7 @@ import ( proberesults "k8s.io/kubernetes/pkg/kubelet/prober/results" "k8s.io/kubernetes/pkg/kubelet/status" kubetypes "k8s.io/kubernetes/pkg/kubelet/types" + "k8s.io/kubernetes/pkg/kubelet/util/queue" "k8s.io/kubernetes/pkg/runtime" "k8s.io/kubernetes/pkg/types" "k8s.io/kubernetes/pkg/util" @@ -145,6 +146,7 @@ func newTestKubelet(t *testing.T) *TestKubelet { kubelet.backOff.Clock = fakeClock kubelet.podKillingCh = make(chan *kubecontainer.Pod, 20) kubelet.resyncInterval = 10 * time.Second + kubelet.workQueue = queue.NewBasicWorkQueue() return &TestKubelet{kubelet, fakeRuntime, mockCadvisor, fakeKubeClient, fakeMirrorClient} } diff --git a/pkg/kubelet/pod/manager.go b/pkg/kubelet/pod/manager.go index fb4813f8c1c..292ec99afcb 100644 --- a/pkg/kubelet/pod/manager.go +++ b/pkg/kubelet/pod/manager.go @@ -43,6 +43,7 @@ type Manager interface { GetPods() []*api.Pod GetPodByFullName(podFullName string) (*api.Pod, bool) GetPodByName(namespace, name string) (*api.Pod, bool) + GetPodByUID(types.UID) (*api.Pod, bool) GetPodByMirrorPod(*api.Pod) (*api.Pod, bool) GetMirrorPodByPod(*api.Pod) (*api.Pod, bool) GetPodsAndMirrorPods() ([]*api.Pod, []*api.Pod) @@ -177,6 +178,15 @@ func (pm *basicManager) GetPodByName(namespace, name string) (*api.Pod, bool) { return pm.GetPodByFullName(podFullName) } +// GetPodByUID provides the (non-mirror) pod that matches pod UID as well as +// whether the pod was found. +func (pm *basicManager) GetPodByUID(uid types.UID) (*api.Pod, bool) { + pm.lock.RLock() + defer pm.lock.RUnlock() + pod, ok := pm.podByUID[uid] + return pod, ok +} + // GetPodByName returns the (non-mirror) pod that matches full name, as well as // whether the pod was found. func (pm *basicManager) GetPodByFullName(podFullName string) (*api.Pod, bool) { diff --git a/pkg/kubelet/pod_workers.go b/pkg/kubelet/pod_workers.go index 4ab840572dc..b820aa80c26 100644 --- a/pkg/kubelet/pod_workers.go +++ b/pkg/kubelet/pod_workers.go @@ -25,6 +25,7 @@ import ( "k8s.io/kubernetes/pkg/client/record" kubecontainer "k8s.io/kubernetes/pkg/kubelet/container" kubetypes "k8s.io/kubernetes/pkg/kubelet/types" + "k8s.io/kubernetes/pkg/kubelet/util/queue" "k8s.io/kubernetes/pkg/types" "k8s.io/kubernetes/pkg/util" ) @@ -55,6 +56,8 @@ type podWorkers struct { // runtimeCache is used for listing running containers. runtimeCache kubecontainer.RuntimeCache + workQueue queue.WorkQueue + // This function is run to sync the desired stated of pod. // NOTE: This function has to be thread-safe - it can be called for // different pods at the same time. @@ -62,6 +65,12 @@ type podWorkers struct { // The EventRecorder to use recorder record.EventRecorder + + // backOffPeriod is the duration to back off when there is a sync error. + backOffPeriod time.Duration + + // resyncInterval is the duration to wait until the next sync. + resyncInterval time.Duration } type workUpdate struct { @@ -79,7 +88,7 @@ type workUpdate struct { } func newPodWorkers(runtimeCache kubecontainer.RuntimeCache, syncPodFn syncPodFnType, - recorder record.EventRecorder) *podWorkers { + recorder record.EventRecorder, workQueue queue.WorkQueue, resyncInterval, backOffPeriod time.Duration) *podWorkers { return &podWorkers{ podUpdates: map[types.UID]chan workUpdate{}, isWorking: map[types.UID]bool{}, @@ -87,37 +96,40 @@ func newPodWorkers(runtimeCache kubecontainer.RuntimeCache, syncPodFn syncPodFnT runtimeCache: runtimeCache, syncPodFn: syncPodFn, recorder: recorder, + workQueue: workQueue, + resyncInterval: resyncInterval, + backOffPeriod: backOffPeriod, } } func (p *podWorkers) managePodLoop(podUpdates <-chan workUpdate) { var minRuntimeCacheTime time.Time for newWork := range podUpdates { - func() { - defer p.checkForUpdates(newWork.pod.UID, newWork.updateCompleteFn) + err := func() (err error) { // We would like to have the state of the containers from at least // the moment when we finished the previous processing of that pod. if err := p.runtimeCache.ForceUpdateIfOlder(minRuntimeCacheTime); err != nil { glog.Errorf("Error updating the container runtime cache: %v", err) - return + return err } pods, err := p.runtimeCache.GetPods() if err != nil { glog.Errorf("Error getting pods while syncing pod: %v", err) - return + return err } err = p.syncPodFn(newWork.pod, newWork.mirrorPod, kubecontainer.Pods(pods).FindPodByID(newWork.pod.UID), newWork.updateType) + minRuntimeCacheTime = time.Now() if err != nil { glog.Errorf("Error syncing pod %s, skipping: %v", newWork.pod.UID, err) p.recorder.Eventf(newWork.pod, "FailedSync", "Error syncing pod, skipping: %v", err) - return + return err } - minRuntimeCacheTime = time.Now() - newWork.updateCompleteFn() + return nil }() + p.wrapUp(newWork.pod.UID, err) } } @@ -192,7 +204,17 @@ func (p *podWorkers) ForgetNonExistingPodWorkers(desiredPods map[types.UID]empty } } -func (p *podWorkers) checkForUpdates(uid types.UID, updateComplete func()) { +func (p *podWorkers) wrapUp(uid types.UID, syncErr error) { + // Requeue the last update if the last sync returned error. + if syncErr != nil { + p.workQueue.Enqueue(uid, p.backOffPeriod) + } else { + p.workQueue.Enqueue(uid, p.resyncInterval) + } + p.checkForUpdates(uid) +} + +func (p *podWorkers) checkForUpdates(uid types.UID) { p.podLock.Lock() defer p.podLock.Unlock() if workUpdate, exists := p.lastUndeliveredWorkUpdate[uid]; exists { diff --git a/pkg/kubelet/pod_workers_test.go b/pkg/kubelet/pod_workers_test.go index bee5ee1f3e4..2c01755c76e 100644 --- a/pkg/kubelet/pod_workers_test.go +++ b/pkg/kubelet/pod_workers_test.go @@ -31,6 +31,7 @@ import ( "k8s.io/kubernetes/pkg/kubelet/dockertools" "k8s.io/kubernetes/pkg/kubelet/network" kubetypes "k8s.io/kubernetes/pkg/kubelet/types" + "k8s.io/kubernetes/pkg/kubelet/util/queue" "k8s.io/kubernetes/pkg/types" ) @@ -66,6 +67,9 @@ func createPodWorkers() (*podWorkers, map[types.UID][]string) { return nil }, fakeRecorder, + queue.NewBasicWorkQueue(), + time.Second, + time.Second, ) return podWorkers, processed } @@ -200,7 +204,7 @@ func TestFakePodWorkers(t *testing.T) { kubeletForRealWorkers := &simpleFakeKubelet{} kubeletForFakeWorkers := &simpleFakeKubelet{} - realPodWorkers := newPodWorkers(fakeRuntimeCache, kubeletForRealWorkers.syncPodWithWaitGroup, fakeRecorder) + realPodWorkers := newPodWorkers(fakeRuntimeCache, kubeletForRealWorkers.syncPodWithWaitGroup, fakeRecorder, queue.NewBasicWorkQueue(), time.Second, time.Second) fakePodWorkers := &fakePodWorkers{kubeletForFakeWorkers.syncPod, fakeRuntimeCache, t} tests := []struct { diff --git a/pkg/kubelet/util/queue/work_queue.go b/pkg/kubelet/util/queue/work_queue.go new file mode 100644 index 00000000000..97ab440b79c --- /dev/null +++ b/pkg/kubelet/util/queue/work_queue.go @@ -0,0 +1,73 @@ +/* +Copyright 2015 The Kubernetes Authors All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package queue + +import ( + "sync" + "time" + + "k8s.io/kubernetes/pkg/types" + "k8s.io/kubernetes/pkg/util" +) + +// WorkQueue allows queueing items with a timestamp. An item is +// considered ready to process if the timestamp has expired. +type WorkQueue interface { + // GetWork dequeues and returns all ready items. + GetWork() []types.UID + // Enqueue inserts a new item or overwrites an existing item with the + // new timestamp (time.Now() + delay) if it is greater. + Enqueue(item types.UID, delay time.Duration) +} + +type basicWorkQueue struct { + clock util.Clock + lock sync.Mutex + queue map[types.UID]time.Time +} + +var _ WorkQueue = &basicWorkQueue{} + +func NewBasicWorkQueue() WorkQueue { + queue := make(map[types.UID]time.Time) + return &basicWorkQueue{queue: queue, clock: util.RealClock{}} +} + +func (q *basicWorkQueue) GetWork() []types.UID { + q.lock.Lock() + defer q.lock.Unlock() + now := q.clock.Now() + var items []types.UID + for k, v := range q.queue { + if v.Before(now) { + items = append(items, k) + delete(q.queue, k) + } + } + return items +} + +func (q *basicWorkQueue) Enqueue(item types.UID, delay time.Duration) { + q.lock.Lock() + defer q.lock.Unlock() + now := q.clock.Now() + timestamp := now.Add(delay) + existing, ok := q.queue[item] + if !ok || (ok && existing.Before(timestamp)) { + q.queue[item] = timestamp + } +} diff --git a/pkg/kubelet/util/queue/work_queue_test.go b/pkg/kubelet/util/queue/work_queue_test.go new file mode 100644 index 00000000000..f105b037321 --- /dev/null +++ b/pkg/kubelet/util/queue/work_queue_test.go @@ -0,0 +1,77 @@ +/* +Copyright 2015 The Kubernetes Authors All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package queue + +import ( + "testing" + "time" + + "k8s.io/kubernetes/pkg/types" + "k8s.io/kubernetes/pkg/util" + "k8s.io/kubernetes/pkg/util/sets" +) + +func newTestBasicWorkQueue() (*basicWorkQueue, *util.FakeClock) { + fakeClock := &util.FakeClock{Time: time.Now()} + wq := &basicWorkQueue{ + clock: fakeClock, + queue: make(map[types.UID]time.Time), + } + return wq, fakeClock +} + +func compareResults(t *testing.T, expected, actual []types.UID) { + expectedSet := sets.NewString() + for _, u := range expected { + expectedSet.Insert(string(u)) + } + actualSet := sets.NewString() + for _, u := range actual { + actualSet.Insert(string(u)) + } + if !expectedSet.Equal(actualSet) { + t.Errorf("Expected %#v, got %#v", expectedSet.List(), actualSet.List()) + } +} + +func TestGetWork(t *testing.T) { + q, clock := newTestBasicWorkQueue() + q.Enqueue(types.UID("foo1"), -1*time.Minute) + q.Enqueue(types.UID("foo2"), -1*time.Minute) + q.Enqueue(types.UID("foo3"), 1*time.Minute) + q.Enqueue(types.UID("foo4"), 1*time.Minute) + expected := []types.UID{types.UID("foo1"), types.UID("foo2")} + compareResults(t, expected, q.GetWork()) + compareResults(t, []types.UID{}, q.GetWork()) + // Dial the time to 1 hour ahead. + clock.Step(time.Hour) + expected = []types.UID{types.UID("foo3"), types.UID("foo4")} + compareResults(t, expected, q.GetWork()) + compareResults(t, []types.UID{}, q.GetWork()) +} + +func TestEnqueueKeepGreaterTimestamp(t *testing.T) { + q, _ := newTestBasicWorkQueue() + item := types.UID("foo") + q.Enqueue(item, -7*time.Hour) + q.Enqueue(item, 3*time.Hour) + compareResults(t, []types.UID{}, q.GetWork()) + + q.Enqueue(item, 3*time.Hour) + q.Enqueue(item, -7*time.Hour) + compareResults(t, []types.UID{}, q.GetWork()) +}