diff --git a/pkg/kubelet/dockertools/manager.go b/pkg/kubelet/dockertools/manager.go
index 4304ec701a5..1332ff0b354 100644
--- a/pkg/kubelet/dockertools/manager.go
+++ b/pkg/kubelet/dockertools/manager.go
@@ -1827,8 +1827,7 @@ func (dm *DockerManager) SyncPod(pod *api.Pod, runningPod kubecontainer.Pod, pod
 		}
 
 		// Killing phase: if we want to start new infra container, or nothing is running kill everything (including infra container)
-		err = dm.KillPod(pod, runningPod)
-		if err != nil {
+		if err := dm.KillPod(pod, runningPod); err != nil {
 			return err
 		}
 	} else {
@@ -1845,9 +1844,9 @@ func (dm *DockerManager) SyncPod(pod *api.Pod, runningPod kubecontainer.Pod, pod
 					break
 				}
 			}
-			err = dm.KillContainerInPod(container.ID, podContainer, pod)
-			if err != nil {
+			if err := dm.KillContainerInPod(container.ID, podContainer, pod); err != nil {
 				glog.Errorf("Error killing container: %v", err)
+				return err
 			}
 		}
 	}
@@ -1893,6 +1892,7 @@ func (dm *DockerManager) SyncPod(pod *api.Pod, runningPod kubecontainer.Pod, pod
 		pod.Status.PodIP = dm.determineContainerIP(pod.Name, pod.Namespace, podInfraContainer)
 	}
 
+	containersStarted := 0
 	// Start everything
 	for idx := range containerChanges.ContainersToStart {
 		container := &pod.Spec.Containers[idx]
@@ -1946,11 +1946,15 @@ func (dm *DockerManager) SyncPod(pod *api.Pod, runningPod kubecontainer.Pod, pod
 			glog.Errorf("Error running pod %q container %q: %v", kubecontainer.GetPodFullName(pod), container.Name, err)
 			continue
 		}
+		containersStarted++
 		// Successfully started the container; clear the entry in the failure
 		// reason cache.
 		dm.clearReasonCache(pod, container)
 	}
 
+	if containersStarted != len(containerChanges.ContainersToStart) {
+		return fmt.Errorf("not all containers have started: %d != %d", containersStarted, len(containerChanges.ContainersToStart))
+	}
 	return nil
 }
 
diff --git a/pkg/kubelet/dockertools/manager_test.go b/pkg/kubelet/dockertools/manager_test.go
index 85779bde752..dbbe9d8da93 100644
--- a/pkg/kubelet/dockertools/manager_test.go
+++ b/pkg/kubelet/dockertools/manager_test.go
@@ -538,7 +538,7 @@ func generatePodInfraContainerHash(pod *api.Pod) uint64 {
 
 // runSyncPod is a helper function to retrieve the running pods from the fake
 // docker client and runs SyncPod for the given pod.
-func runSyncPod(t *testing.T, dm *DockerManager, fakeDocker *FakeDockerClient, pod *api.Pod, backOff *util.Backoff) {
+func runSyncPod(t *testing.T, dm *DockerManager, fakeDocker *FakeDockerClient, pod *api.Pod, backOff *util.Backoff, expectErr bool) {
 	runningPods, err := dm.GetPods(false)
 	if err != nil {
 		t.Fatalf("unexpected error: %v", err)
@@ -554,8 +554,10 @@ func runSyncPod(t *testing.T, dm *DockerManager, fakeDocker *FakeDockerClient, p
 		backOff = util.NewBackOff(time.Second, time.Minute)
 	}
 	err = dm.SyncPod(pod, runningPod, *podStatus, []api.Secret{}, backOff)
-	if err != nil {
+	if err != nil && !expectErr {
 		t.Errorf("unexpected error: %v", err)
+	} else if err == nil && expectErr {
+		t.Errorf("expected error didn't occur")
 	}
 }
 
@@ -576,7 +578,7 @@ func TestSyncPodCreateNetAndContainer(t *testing.T) {
 		},
 	}
 
-	runSyncPod(t, dm, fakeDocker, pod, nil)
+	runSyncPod(t, dm, fakeDocker, pod, nil, false)
 	verifyCalls(t, fakeDocker, []string{
 		// Create pod infra container.
 		"create", "start", "inspect_container", "inspect_container",
@@ -623,7 +625,7 @@ func TestSyncPodCreatesNetAndContainerPullsImage(t *testing.T) {
 		},
 	}
 
-	runSyncPod(t, dm, fakeDocker, pod, nil)
+	runSyncPod(t, dm, fakeDocker, pod, nil, false)
 
 	verifyCalls(t, fakeDocker, []string{
 		// Create pod infra container.
@@ -675,7 +677,7 @@ func TestSyncPodWithPodInfraCreatesContainer(t *testing.T) {
 		},
 	}
 
-	runSyncPod(t, dm, fakeDocker, pod, nil)
+	runSyncPod(t, dm, fakeDocker, pod, nil, false)
 
 	verifyCalls(t, fakeDocker, []string{
 		// Inspect pod infra container (but does not create)"
@@ -722,7 +724,7 @@ func TestSyncPodDeletesWithNoPodInfraContainer(t *testing.T) {
 		},
 	}
 
-	runSyncPod(t, dm, fakeDocker, pod, nil)
+	runSyncPod(t, dm, fakeDocker, pod, nil, false)
 
 	verifyCalls(t, fakeDocker, []string{
 		// Kill the container since pod infra container is not running.
@@ -795,7 +797,7 @@ func TestSyncPodDeletesDuplicate(t *testing.T) {
 		},
 	}
 
-	runSyncPod(t, dm, fakeDocker, pod, nil)
+	runSyncPod(t, dm, fakeDocker, pod, nil, false)
 
 	verifyCalls(t, fakeDocker, []string{
 		// Check the pod infra container.
@@ -849,7 +851,7 @@ func TestSyncPodBadHash(t *testing.T) {
 		},
 	}
 
-	runSyncPod(t, dm, fakeDocker, pod, nil)
+	runSyncPod(t, dm, fakeDocker, pod, nil, false)
 
 	verifyCalls(t, fakeDocker, []string{
 		// Check the pod infra container.
@@ -906,7 +908,7 @@ func TestSyncPodsUnhealthy(t *testing.T) {
 	}
 	dm.livenessManager.Set(kubetypes.DockerID(unhealthyContainerID).ContainerID(), proberesults.Failure, nil)
 
-	runSyncPod(t, dm, fakeDocker, pod, nil)
+	runSyncPod(t, dm, fakeDocker, pod, nil, false)
 
 	verifyCalls(t, fakeDocker, []string{
 		// Check the pod infra container.
@@ -963,7 +965,7 @@ func TestSyncPodsDoesNothing(t *testing.T) {
 		},
 	}
 
-	runSyncPod(t, dm, fakeDocker, pod, nil)
+	runSyncPod(t, dm, fakeDocker, pod, nil, false)
 
 	verifyCalls(t, fakeDocker, []string{
 		// Check the pod infra contianer.
@@ -1004,7 +1006,7 @@ func TestSyncPodWithPullPolicy(t *testing.T) {
 			Message: "Container image \"pull_never_image\" is not present with pull policy of Never"}},
 	}
 
-	runSyncPod(t, dm, fakeDocker, pod, nil)
+	runSyncPod(t, dm, fakeDocker, pod, nil, true)
 	statuses, err := dm.GetPodStatus(pod)
 	if err != nil {
 		t.Errorf("unable to get pod status")
@@ -1147,7 +1149,7 @@ func TestSyncPodWithRestartPolicy(t *testing.T) {
 		fakeDocker.ContainerMap = containerMap
 		pod.Spec.RestartPolicy = tt.policy
 
-		runSyncPod(t, dm, fakeDocker, pod, nil)
+		runSyncPod(t, dm, fakeDocker, pod, nil, false)
 
 		// 'stop' is because the pod infra container is killed when no container is running.
 		verifyCalls(t, fakeDocker, tt.calls)
@@ -1267,7 +1269,7 @@ func TestGetPodStatusWithLastTermination(t *testing.T) {
 		},
 	}
 
-	runSyncPod(t, dm, fakeDocker, pod, nil)
+	runSyncPod(t, dm, fakeDocker, pod, nil, false)
 
 	// Check if we can retrieve the pod status.
 	status, err := dm.GetPodStatus(pod)
@@ -1377,16 +1379,17 @@ func TestSyncPodBackoff(t *testing.T) {
 		backoff   int
 		killDelay int
 		result    []string
+		expectErr bool
 	}{
-		{1, 1, 1, startCalls},
-		{2, 2, 2, startCalls},
-		{3, 2, 3, backOffCalls},
-		{4, 4, 4, startCalls},
-		{5, 4, 5, backOffCalls},
-		{6, 4, 6, backOffCalls},
-		{7, 4, 7, backOffCalls},
-		{8, 8, 129, startCalls},
-		{130, 1, 0, startCalls},
+		{1, 1, 1, startCalls, false},
+		{2, 2, 2, startCalls, false},
+		{3, 2, 3, backOffCalls, true},
+		{4, 4, 4, startCalls, false},
+		{5, 4, 5, backOffCalls, true},
+		{6, 4, 6, backOffCalls, true},
+		{7, 4, 7, backOffCalls, true},
+		{8, 8, 129, startCalls, false},
+		{130, 1, 0, startCalls, false},
 	}
 
 	backOff := util.NewBackOff(time.Second, time.Minute)
@@ -1397,7 +1400,7 @@ func TestSyncPodBackoff(t *testing.T) {
 		fakeDocker.ContainerList = containerList
 		fakeClock.Time = startTime.Add(time.Duration(c.tick) * time.Second)
 
-		runSyncPod(t, dm, fakeDocker, pod, backOff)
+		runSyncPod(t, dm, fakeDocker, pod, backOff, c.expectErr)
 		verifyCalls(t, fakeDocker, c.result)
 
 		if backOff.Get(stableId) != time.Duration(c.backoff)*time.Second {
@@ -1448,7 +1451,7 @@ func TestGetPodCreationFailureReason(t *testing.T) {
 		},
 	}
 
-	runSyncPod(t, dm, fakeDocker, pod, nil)
+	runSyncPod(t, dm, fakeDocker, pod, nil, true)
 	// Check if we can retrieve the pod status.
 	status, err := dm.GetPodStatus(pod)
 	if err != nil {
@@ -1504,7 +1507,7 @@ func TestGetPodPullImageFailureReason(t *testing.T) {
 		},
 	}
 
-	runSyncPod(t, dm, fakeDocker, pod, nil)
+	runSyncPod(t, dm, fakeDocker, pod, nil, true)
 	// Check if we can retrieve the pod status.
 	status, err := dm.GetPodStatus(pod)
 	if err != nil {
@@ -1544,7 +1547,7 @@ func TestGetRestartCount(t *testing.T) {
 
 	// Helper function for verifying the restart count.
 	verifyRestartCount := func(pod *api.Pod, expectedCount int) api.PodStatus {
-		runSyncPod(t, dm, fakeDocker, pod, nil)
+		runSyncPod(t, dm, fakeDocker, pod, nil, false)
 		status, err := dm.GetPodStatus(pod)
 		if err != nil {
 			t.Fatalf("unexpected error %v", err)
@@ -1621,7 +1624,7 @@ func TestGetTerminationMessagePath(t *testing.T) {
 
 	fakeDocker.ContainerMap = map[string]*docker.Container{}
 
-	runSyncPod(t, dm, fakeDocker, pod, nil)
+	runSyncPod(t, dm, fakeDocker, pod, nil, false)
 
 	containerList := fakeDocker.ContainerList
 	if len(containerList) != 2 {
@@ -1680,7 +1683,7 @@ func TestSyncPodWithPodInfraCreatesContainerCallsHandler(t *testing.T) {
 		},
 	}
 
-	runSyncPod(t, dm, fakeDocker, pod, nil)
+	runSyncPod(t, dm, fakeDocker, pod, nil, false)
 
 	verifyCalls(t, fakeDocker, []string{
 		// Check the pod infra container.
@@ -1743,7 +1746,7 @@ func TestSyncPodEventHandlerFails(t *testing.T) {
 		},
 	}
 
-	runSyncPod(t, dm, fakeDocker, pod, nil)
+	runSyncPod(t, dm, fakeDocker, pod, nil, true)
 
 	verifyCalls(t, fakeDocker, []string{
 		// Check the pod infra container.
@@ -1817,7 +1820,7 @@ func TestSyncPodWithTerminationLog(t *testing.T) {
 		},
 	}
 
-	runSyncPod(t, dm, fakeDocker, pod, nil)
+	runSyncPod(t, dm, fakeDocker, pod, nil, false)
 	verifyCalls(t, fakeDocker, []string{
 		// Create pod infra container.
 		"create", "start", "inspect_container", "inspect_container",
@@ -1857,7 +1860,7 @@ func TestSyncPodWithHostNetwork(t *testing.T) {
 		},
 	}
 
-	runSyncPod(t, dm, fakeDocker, pod, nil)
+	runSyncPod(t, dm, fakeDocker, pod, nil, false)
 
 	verifyCalls(t, fakeDocker, []string{
 		// Create pod infra container.
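Note on the dockertools changes above: SyncPod now counts the containers it actually started and returns an error when the count falls short, instead of silently returning nil, and runSyncPod takes an expectErr argument so each test states whether it expects that error. Below is a minimal, self-contained sketch of the resulting caller-side contract; syncPod, backOffPeriod and resyncInterval here are illustrative stand-ins, not the real kubelet wiring (the actual consumer is podWorkers.wrapUp in pod_workers.go further down).

package main

import (
	"fmt"
	"time"
)

// syncPod is a stand-in for DockerManager.SyncPod after this patch: it
// returns an error whenever any of the desired containers could not be
// started.
func syncPod(started, desired int) error {
	if started != desired {
		return fmt.Errorf("not all containers have started: %d != %d", started, desired)
	}
	return nil
}

func main() {
	// Illustrative intervals; the real values come from the kubelet configuration.
	backOffPeriod := 10 * time.Second
	resyncInterval := 10 * time.Second

	// Because the error is no longer swallowed, the caller can pick the next
	// sync delay from the result (this is what podWorkers.wrapUp does below).
	delay := resyncInterval
	if err := syncPod(1, 2); err != nil {
		fmt.Println("sync failed, backing off:", err)
		delay = backOffPeriod
	}
	fmt.Println("requeue after", delay)
}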
diff --git a/pkg/kubelet/kubelet.go b/pkg/kubelet/kubelet.go
index e83e0498bbe..cf156a56da8 100644
--- a/pkg/kubelet/kubelet.go
+++ b/pkg/kubelet/kubelet.go
@@ -61,6 +61,7 @@ import (
 	"k8s.io/kubernetes/pkg/kubelet/status"
 	kubetypes "k8s.io/kubernetes/pkg/kubelet/types"
 	kubeletutil "k8s.io/kubernetes/pkg/kubelet/util"
+	"k8s.io/kubernetes/pkg/kubelet/util/queue"
 	"k8s.io/kubernetes/pkg/labels"
 	"k8s.io/kubernetes/pkg/runtime"
 	"k8s.io/kubernetes/pkg/securitycontext"
@@ -432,7 +433,10 @@ func NewMainKubelet(
 		return nil, err
 	}
 	klet.runtimeCache = runtimeCache
-	klet.podWorkers = newPodWorkers(runtimeCache, klet.syncPod, recorder)
+	klet.workQueue = queue.NewBasicWorkQueue()
+	// TODO(yujuhong): backoff and resync interval should be set differently
+	// once we switch to using pod event generator.
+	klet.podWorkers = newPodWorkers(runtimeCache, klet.syncPod, recorder, klet.workQueue, klet.resyncInterval, klet.resyncInterval)
 
 	metrics.Register(runtimeCache)
 
@@ -468,13 +472,14 @@ type nodeLister interface {
 
 // Kubelet is the main kubelet implementation.
 type Kubelet struct {
-	hostname      string
-	nodeName      string
-	dockerClient  dockertools.DockerInterface
-	runtimeCache  kubecontainer.RuntimeCache
-	kubeClient    client.Interface
-	rootDirectory string
-	podWorkers    PodWorkers
+	hostname       string
+	nodeName       string
+	dockerClient   dockertools.DockerInterface
+	runtimeCache   kubecontainer.RuntimeCache
+	kubeClient     client.Interface
+	rootDirectory  string
+	podWorkers     PodWorkers
+	resyncInterval time.Duration
 	resyncTicker *time.Ticker
 
 	sourcesReady SourcesReadyFn
@@ -642,6 +647,9 @@ type Kubelet struct {
 
 	// Information about the ports which are opened by daemons on Node running this Kubelet server.
 	daemonEndpoints *api.NodeDaemonEndpoints
+
+	// A queue used to trigger pod workers.
+	workQueue queue.WorkQueue
 }
 
 func (kl *Kubelet) allSourcesReady() bool {
@@ -1417,7 +1425,7 @@ func (kl *Kubelet) makePodDataDirs(pod *api.Pod) error {
 	return nil
 }
 
-func (kl *Kubelet) syncPod(pod *api.Pod, mirrorPod *api.Pod, runningPod kubecontainer.Pod, updateType kubetypes.SyncPodType) error {
+func (kl *Kubelet) syncPod(pod *api.Pod, mirrorPod *api.Pod, runningPod kubecontainer.Pod, updateType kubetypes.SyncPodType) (syncErr error) {
 	podFullName := kubecontainer.GetPodFullName(pod)
 	uid := pod.UID
 	start := time.Now()
@@ -1438,6 +1446,8 @@ func (kl *Kubelet) syncPod(pod *api.Pod, mirrorPod *api.Pod, runningPod kubecont
 	status, err := kl.generatePodStatus(pod)
 	if err != nil {
 		glog.Errorf("Unable to generate status for pod with name %q and uid %q info with error(%v)", podFullName, uid, err)
+		// Propagate the error upstream.
+		syncErr = err
 	} else {
 		podToUpdate := pod
 		if mirrorPod != nil {
@@ -2073,7 +2083,10 @@ func (kl *Kubelet) canAdmitPod(pods []*api.Pod, pod *api.Pod) (bool, string, str
 // state every sync-frequency seconds. Never returns.
 func (kl *Kubelet) syncLoop(updates <-chan kubetypes.PodUpdate, handler SyncHandler) {
 	glog.Info("Starting kubelet main sync loop.")
-	kl.resyncTicker = time.NewTicker(kl.resyncInterval)
+	// The resyncTicker wakes up the kubelet to check if there are any pod
+	// workers that need to be synced. A one-second period is sufficient
+	// because the sync interval defaults to 10s.
+	kl.resyncTicker = time.NewTicker(time.Second)
 	var housekeepingTimestamp time.Time
 	for {
 		if !kl.containerRuntimeUp() {
@@ -2138,9 +2151,15 @@ func (kl *Kubelet) syncLoopIteration(updates <-chan kubetypes.PodUpdate, handler
 			glog.Errorf("Kubelet does not support snapshot update")
 		}
 	case <-kl.resyncTicker.C:
-		// Periodically syncs all the pods and performs cleanup tasks.
-		glog.V(4).Infof("SyncLoop (periodic sync)")
-		handler.HandlePodSyncs(kl.podManager.GetPods())
+		podUIDs := kl.workQueue.GetWork()
+		var podsToSync []*api.Pod
+		for _, uid := range podUIDs {
+			if pod, ok := kl.podManager.GetPodByUID(uid); ok {
+				podsToSync = append(podsToSync, pod)
+			}
+		}
+		glog.V(2).Infof("SyncLoop (SYNC): %d pods", len(podsToSync))
+		kl.HandlePodSyncs(podsToSync)
 	case update := <-kl.livenessManager.Updates():
 		// We only care about failures (signalling container death) here.
 		if update.Result == proberesults.Failure {
diff --git a/pkg/kubelet/kubelet_test.go b/pkg/kubelet/kubelet_test.go
index 34922a96767..d4929756a1f 100644
--- a/pkg/kubelet/kubelet_test.go
+++ b/pkg/kubelet/kubelet_test.go
@@ -50,6 +50,7 @@ import (
 	proberesults "k8s.io/kubernetes/pkg/kubelet/prober/results"
 	"k8s.io/kubernetes/pkg/kubelet/status"
 	kubetypes "k8s.io/kubernetes/pkg/kubelet/types"
+	"k8s.io/kubernetes/pkg/kubelet/util/queue"
 	"k8s.io/kubernetes/pkg/runtime"
 	"k8s.io/kubernetes/pkg/types"
 	"k8s.io/kubernetes/pkg/util"
@@ -145,6 +146,7 @@ func newTestKubelet(t *testing.T) *TestKubelet {
 	kubelet.backOff.Clock = fakeClock
 	kubelet.podKillingCh = make(chan *kubecontainer.Pod, 20)
 	kubelet.resyncInterval = 10 * time.Second
+	kubelet.workQueue = queue.NewBasicWorkQueue()
 	return &TestKubelet{kubelet, fakeRuntime, mockCadvisor, fakeKubeClient, fakeMirrorClient}
 }
 
diff --git a/pkg/kubelet/pod/manager.go b/pkg/kubelet/pod/manager.go
index fb4813f8c1c..292ec99afcb 100644
--- a/pkg/kubelet/pod/manager.go
+++ b/pkg/kubelet/pod/manager.go
@@ -43,6 +43,7 @@ type Manager interface {
 	GetPods() []*api.Pod
 	GetPodByFullName(podFullName string) (*api.Pod, bool)
 	GetPodByName(namespace, name string) (*api.Pod, bool)
+	GetPodByUID(types.UID) (*api.Pod, bool)
 	GetPodByMirrorPod(*api.Pod) (*api.Pod, bool)
 	GetMirrorPodByPod(*api.Pod) (*api.Pod, bool)
 	GetPodsAndMirrorPods() ([]*api.Pod, []*api.Pod)
@@ -177,6 +178,15 @@ func (pm *basicManager) GetPodByName(namespace, name string) (*api.Pod, bool) {
 	return pm.GetPodByFullName(podFullName)
 }
 
+// GetPodByUID provides the (non-mirror) pod that matches pod UID, as well as
+// whether the pod was found.
+func (pm *basicManager) GetPodByUID(uid types.UID) (*api.Pod, bool) {
+	pm.lock.RLock()
+	defer pm.lock.RUnlock()
+	pod, ok := pm.podByUID[uid]
+	return pod, ok
+}
+
 // GetPodByName returns the (non-mirror) pod that matches full name, as well as
 // whether the pod was found.
 func (pm *basicManager) GetPodByFullName(podFullName string) (*api.Pod, bool) {
diff --git a/pkg/kubelet/pod_workers.go b/pkg/kubelet/pod_workers.go
index 4ab840572dc..b820aa80c26 100644
--- a/pkg/kubelet/pod_workers.go
+++ b/pkg/kubelet/pod_workers.go
@@ -25,6 +25,7 @@ import (
 	"k8s.io/kubernetes/pkg/client/record"
 	kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
 	kubetypes "k8s.io/kubernetes/pkg/kubelet/types"
+	"k8s.io/kubernetes/pkg/kubelet/util/queue"
 	"k8s.io/kubernetes/pkg/types"
 	"k8s.io/kubernetes/pkg/util"
 )
@@ -55,6 +56,8 @@ type podWorkers struct {
 	// runtimeCache is used for listing running containers.
 	runtimeCache kubecontainer.RuntimeCache
 
+	workQueue queue.WorkQueue
+
 	// This function is run to sync the desired stated of pod.
 	// NOTE: This function has to be thread-safe - it can be called for
 	// different pods at the same time.
@@ -62,6 +65,12 @@ syncPodFn syncPodFnType
 
 	// The EventRecorder to use
 	recorder record.EventRecorder
+
+	// backOffPeriod is the duration to back off when there is a sync error.
+	backOffPeriod time.Duration
+
+	// resyncInterval is the duration to wait until the next sync.
+	resyncInterval time.Duration
 }
 
 type workUpdate struct {
@@ -79,7 +88,7 @@
 }
 
 func newPodWorkers(runtimeCache kubecontainer.RuntimeCache, syncPodFn syncPodFnType,
-	recorder record.EventRecorder) *podWorkers {
+	recorder record.EventRecorder, workQueue queue.WorkQueue, resyncInterval, backOffPeriod time.Duration) *podWorkers {
 	return &podWorkers{
 		podUpdates:     map[types.UID]chan workUpdate{},
 		isWorking:      map[types.UID]bool{},
@@ -87,37 +96,40 @@ func newPodWorkers(runtimeCache kubecontainer.RuntimeCache, syncPodFn syncPodFnT
 		runtimeCache:   runtimeCache,
 		syncPodFn:      syncPodFn,
 		recorder:       recorder,
+		workQueue:      workQueue,
+		resyncInterval: resyncInterval,
+		backOffPeriod:  backOffPeriod,
 	}
 }
 
 func (p *podWorkers) managePodLoop(podUpdates <-chan workUpdate) {
 	var minRuntimeCacheTime time.Time
 	for newWork := range podUpdates {
-		func() {
-			defer p.checkForUpdates(newWork.pod.UID, newWork.updateCompleteFn)
+		err := func() (err error) {
 			// We would like to have the state of the containers from at least
 			// the moment when we finished the previous processing of that pod.
 			if err := p.runtimeCache.ForceUpdateIfOlder(minRuntimeCacheTime); err != nil {
 				glog.Errorf("Error updating the container runtime cache: %v", err)
-				return
+				return err
 			}
 			pods, err := p.runtimeCache.GetPods()
 			if err != nil {
 				glog.Errorf("Error getting pods while syncing pod: %v", err)
-				return
+				return err
 			}
 			err = p.syncPodFn(newWork.pod, newWork.mirrorPod,
 				kubecontainer.Pods(pods).FindPodByID(newWork.pod.UID), newWork.updateType)
+			minRuntimeCacheTime = time.Now()
 			if err != nil {
 				glog.Errorf("Error syncing pod %s, skipping: %v", newWork.pod.UID, err)
 				p.recorder.Eventf(newWork.pod, "FailedSync", "Error syncing pod, skipping: %v", err)
-				return
+				return err
 			}
-			minRuntimeCacheTime = time.Now()
-			newWork.updateCompleteFn()
+			return nil
 		}()
+		p.wrapUp(newWork.pod.UID, err)
 	}
 }
 
@@ -192,7 +204,17 @@ func (p *podWorkers) ForgetNonExistingPodWorkers(desiredPods map[types.UID]empty
 	}
 }
 
-func (p *podWorkers) checkForUpdates(uid types.UID, updateComplete func()) {
+func (p *podWorkers) wrapUp(uid types.UID, syncErr error) {
+	// Requeue the last update if the last sync returned an error.
+	if syncErr != nil {
+		p.workQueue.Enqueue(uid, p.backOffPeriod)
+	} else {
+		p.workQueue.Enqueue(uid, p.resyncInterval)
+	}
+	p.checkForUpdates(uid)
+}
+
+func (p *podWorkers) checkForUpdates(uid types.UID) {
 	p.podLock.Lock()
 	defer p.podLock.Unlock()
 	if workUpdate, exists := p.lastUndeliveredWorkUpdate[uid]; exists {
diff --git a/pkg/kubelet/pod_workers_test.go b/pkg/kubelet/pod_workers_test.go
index bee5ee1f3e4..2c01755c76e 100644
--- a/pkg/kubelet/pod_workers_test.go
+++ b/pkg/kubelet/pod_workers_test.go
@@ -31,6 +31,7 @@ import (
 	"k8s.io/kubernetes/pkg/kubelet/dockertools"
 	"k8s.io/kubernetes/pkg/kubelet/network"
 	kubetypes "k8s.io/kubernetes/pkg/kubelet/types"
+	"k8s.io/kubernetes/pkg/kubelet/util/queue"
 	"k8s.io/kubernetes/pkg/types"
 )
 
@@ -66,6 +67,9 @@ func createPodWorkers() (*podWorkers, map[types.UID][]string) {
 			return nil
 		},
 		fakeRecorder,
+		queue.NewBasicWorkQueue(),
+		time.Second,
+		time.Second,
 	)
 	return podWorkers, processed
 }
@@ -200,7 +204,7 @@ func TestFakePodWorkers(t *testing.T) {
 	kubeletForRealWorkers := &simpleFakeKubelet{}
 	kubeletForFakeWorkers := &simpleFakeKubelet{}
 
-	realPodWorkers := newPodWorkers(fakeRuntimeCache, kubeletForRealWorkers.syncPodWithWaitGroup, fakeRecorder)
+	realPodWorkers := newPodWorkers(fakeRuntimeCache, kubeletForRealWorkers.syncPodWithWaitGroup, fakeRecorder, queue.NewBasicWorkQueue(), time.Second, time.Second)
 	fakePodWorkers := &fakePodWorkers{kubeletForFakeWorkers.syncPod, fakeRuntimeCache, t}
 
 	tests := []struct {
diff --git a/pkg/kubelet/util/queue/work_queue.go b/pkg/kubelet/util/queue/work_queue.go
new file mode 100644
index 00000000000..97ab440b79c
--- /dev/null
+++ b/pkg/kubelet/util/queue/work_queue.go
@@ -0,0 +1,73 @@
+/*
+Copyright 2015 The Kubernetes Authors All rights reserved.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package queue
+
+import (
+	"sync"
+	"time"
+
+	"k8s.io/kubernetes/pkg/types"
+	"k8s.io/kubernetes/pkg/util"
+)
+
+// WorkQueue allows queueing items with a timestamp. An item is
+// considered ready to process if the timestamp has expired.
+type WorkQueue interface {
+	// GetWork dequeues and returns all ready items.
+	GetWork() []types.UID
+	// Enqueue inserts a new item or overwrites an existing item with the
+	// new timestamp (time.Now() + delay) if it is greater.
+	Enqueue(item types.UID, delay time.Duration)
+}
+
+type basicWorkQueue struct {
+	clock util.Clock
+	lock  sync.Mutex
+	queue map[types.UID]time.Time
+}
+
+var _ WorkQueue = &basicWorkQueue{}
+
+func NewBasicWorkQueue() WorkQueue {
+	queue := make(map[types.UID]time.Time)
+	return &basicWorkQueue{queue: queue, clock: util.RealClock{}}
+}
+
+func (q *basicWorkQueue) GetWork() []types.UID {
+	q.lock.Lock()
+	defer q.lock.Unlock()
+	now := q.clock.Now()
+	var items []types.UID
+	for k, v := range q.queue {
+		if v.Before(now) {
+			items = append(items, k)
+			delete(q.queue, k)
+		}
+	}
+	return items
+}
+
+func (q *basicWorkQueue) Enqueue(item types.UID, delay time.Duration) {
+	q.lock.Lock()
+	defer q.lock.Unlock()
+	now := q.clock.Now()
+	timestamp := now.Add(delay)
+	existing, ok := q.queue[item]
+	if !ok || (ok && existing.Before(timestamp)) {
+		q.queue[item] = timestamp
+	}
+}
diff --git a/pkg/kubelet/util/queue/work_queue_test.go b/pkg/kubelet/util/queue/work_queue_test.go
new file mode 100644
index 00000000000..f105b037321
--- /dev/null
+++ b/pkg/kubelet/util/queue/work_queue_test.go
@@ -0,0 +1,77 @@
+/*
+Copyright 2015 The Kubernetes Authors All rights reserved.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package queue
+
+import (
+	"testing"
+	"time"
+
+	"k8s.io/kubernetes/pkg/types"
+	"k8s.io/kubernetes/pkg/util"
+	"k8s.io/kubernetes/pkg/util/sets"
+)
+
+func newTestBasicWorkQueue() (*basicWorkQueue, *util.FakeClock) {
+	fakeClock := &util.FakeClock{Time: time.Now()}
+	wq := &basicWorkQueue{
+		clock: fakeClock,
+		queue: make(map[types.UID]time.Time),
+	}
+	return wq, fakeClock
+}
+
+func compareResults(t *testing.T, expected, actual []types.UID) {
+	expectedSet := sets.NewString()
+	for _, u := range expected {
+		expectedSet.Insert(string(u))
+	}
+	actualSet := sets.NewString()
+	for _, u := range actual {
+		actualSet.Insert(string(u))
+	}
+	if !expectedSet.Equal(actualSet) {
+		t.Errorf("Expected %#v, got %#v", expectedSet.List(), actualSet.List())
+	}
+}
+
+func TestGetWork(t *testing.T) {
+	q, clock := newTestBasicWorkQueue()
+	q.Enqueue(types.UID("foo1"), -1*time.Minute)
+	q.Enqueue(types.UID("foo2"), -1*time.Minute)
+	q.Enqueue(types.UID("foo3"), 1*time.Minute)
+	q.Enqueue(types.UID("foo4"), 1*time.Minute)
+	expected := []types.UID{types.UID("foo1"), types.UID("foo2")}
+	compareResults(t, expected, q.GetWork())
+	compareResults(t, []types.UID{}, q.GetWork())
+	// Advance the clock by one hour.
+	clock.Step(time.Hour)
+	expected = []types.UID{types.UID("foo3"), types.UID("foo4")}
+	compareResults(t, expected, q.GetWork())
+	compareResults(t, []types.UID{}, q.GetWork())
+}
+
+func TestEnqueueKeepGreaterTimestamp(t *testing.T) {
+	q, _ := newTestBasicWorkQueue()
+	item := types.UID("foo")
+	q.Enqueue(item, -7*time.Hour)
+	q.Enqueue(item, 3*time.Hour)
+	compareResults(t, []types.UID{}, q.GetWork())
+
+	q.Enqueue(item, 3*time.Hour)
+	q.Enqueue(item, -7*time.Hour)
+	compareResults(t, []types.UID{}, q.GetWork())
+}
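Taken together, the new queue drives per-pod resyncs: a pod worker enqueues its pod UID with backOffPeriod after a failed sync or resyncInterval after a successful one (podWorkers.wrapUp), and the kubelet's one-second resync ticker drains whatever is ready via GetWork and hands those pods to HandlePodSyncs. Below is a self-contained sketch of that control flow; plain strings stand in for types.UID, a simplified map-based queue mirrors basicWorkQueue, and the timings are illustrative only.

package main

import (
	"fmt"
	"sync"
	"time"
)

// workQueue mirrors the semantics of pkg/kubelet/util/queue.WorkQueue,
// using plain strings instead of types.UID so the sketch is self-contained.
type workQueue struct {
	lock  sync.Mutex
	queue map[string]time.Time
}

func newWorkQueue() *workQueue {
	return &workQueue{queue: map[string]time.Time{}}
}

// Enqueue records when the item becomes ready, keeping the later deadline
// if the item is already queued (as basicWorkQueue.Enqueue does).
func (q *workQueue) Enqueue(item string, delay time.Duration) {
	q.lock.Lock()
	defer q.lock.Unlock()
	ts := time.Now().Add(delay)
	if existing, ok := q.queue[item]; !ok || existing.Before(ts) {
		q.queue[item] = ts
	}
}

// GetWork returns and removes every item whose deadline has passed.
func (q *workQueue) GetWork() []string {
	q.lock.Lock()
	defer q.lock.Unlock()
	var ready []string
	now := time.Now()
	for item, ts := range q.queue {
		if ts.Before(now) {
			ready = append(ready, item)
			delete(q.queue, item)
		}
	}
	return ready
}

func main() {
	const (
		resyncInterval = 2 * time.Second // illustrative; the kubelet default is 10s
		backOffPeriod  = 1 * time.Second
	)
	q := newWorkQueue()

	// A pod worker finishing a sync requeues its pod, choosing the delay
	// from the sync result (cf. podWorkers.wrapUp above).
	q.Enqueue("pod-ok", resyncInterval)
	q.Enqueue("pod-failed", backOffPeriod)

	// The kubelet's resync ticker periodically drains whatever is ready
	// (cf. the <-kl.resyncTicker.C case in syncLoopIteration).
	ticker := time.NewTicker(time.Second)
	defer ticker.Stop()
	for i := 0; i < 3; i++ {
		<-ticker.C
		fmt.Println("tick", i, "pods to sync:", q.GetWork())
	}
}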