diff --git a/pkg/client/cache/delta_fifo.go b/pkg/client/cache/delta_fifo.go index 3603c5f0028..8ff9d7a36c9 100644 --- a/pkg/client/cache/delta_fifo.go +++ b/pkg/client/cache/delta_fifo.go @@ -363,10 +363,12 @@ func (f *DeltaFIFO) GetByKey(key string) (item interface{}, exists bool, err err // added/updated. The item is removed from the queue (and the store) before it // is returned, so if you don't successfully process it, you need to add it back // with AddIfNotPresent(). +// process function is called under lock, so it is safe update data structures +// in it that need to be in sync with the queue (e.g. knownKeys). // // Pop returns a 'Deltas', which has a complete list of all the things // that happened to the object (deltas) while it was sitting in the queue. -func (f *DeltaFIFO) Pop() interface{} { +func (f *DeltaFIFO) Pop(process PopProcessFunc) (interface{}, error) { f.lock.Lock() defer f.lock.Unlock() for { @@ -386,7 +388,7 @@ func (f *DeltaFIFO) Pop() interface{} { delete(f.items, id) // Don't need to copyDeltas here, because we're transferring // ownership to the caller. - return item + return item, process(item) } } diff --git a/pkg/client/cache/delta_fifo_test.go b/pkg/client/cache/delta_fifo_test.go index 8efd982b5a4..2bead89a848 100644 --- a/pkg/client/cache/delta_fifo_test.go +++ b/pkg/client/cache/delta_fifo_test.go @@ -24,7 +24,7 @@ import ( // helper function to reduce stuttering func testPop(f *DeltaFIFO) testFifoObject { - return f.Pop().(Deltas).Newest().Object.(testFifoObject) + return Pop(f).(Deltas).Newest().Object.(testFifoObject) } // keyLookupFunc adapts a raw function to be a KeyLookup. @@ -104,7 +104,7 @@ func TestDeltaFIFO_compressorWorks(t *testing.T) { if e, a := expect, oldestTypes; !reflect.DeepEqual(e, a) { t.Errorf("Expected %#v, got %#v", e, a) } - if e, a := (Deltas{{Added, mkFifoObj("foo", 25)}}), f.Pop().(Deltas); !reflect.DeepEqual(e, a) { + if e, a := (Deltas{{Added, mkFifoObj("foo", 25)}}), Pop(f).(Deltas); !reflect.DeepEqual(e, a) { t.Fatalf("Expected %#v, got %#v", e, a) } @@ -126,7 +126,7 @@ func TestDeltaFIFO_addUpdate(t *testing.T) { got := make(chan testFifoObject, 2) go func() { for { - obj := f.Pop().(Deltas).Newest().Object.(testFifoObject) + obj := testPop(f) t.Logf("got a thing %#v", obj) t.Logf("D len: %v", len(f.queue)) got <- obj @@ -240,7 +240,7 @@ func TestDeltaFIFO_ReplaceMakesDeletions(t *testing.T) { } for _, expected := range expectedList { - cur := f.Pop().(Deltas) + cur := Pop(f).(Deltas) if e, a := expected, cur; !reflect.DeepEqual(e, a) { t.Errorf("Expected %#v, got %#v", e, a) } @@ -279,9 +279,9 @@ func TestDeltaFIFO_addIfNotPresent(t *testing.T) { f := NewDeltaFIFO(testFifoObjectKeyFunc, nil, nil) f.Add(mkFifoObj("b", 3)) - b3 := f.Pop() + b3 := Pop(f) f.Add(mkFifoObj("c", 4)) - c4 := f.Pop() + c4 := Pop(f) if e, a := 0, len(f.items); e != a { t.Fatalf("Expected %v, got %v items in queue", e, a) } @@ -358,15 +358,15 @@ func TestDeltaFIFO_HasSynced(t *testing.T) { { actions: []func(f *DeltaFIFO){ func(f *DeltaFIFO) { f.Replace([]interface{}{mkFifoObj("a", 1), mkFifoObj("b", 2)}, "0") }, - func(f *DeltaFIFO) { f.Pop() }, + func(f *DeltaFIFO) { Pop(f) }, }, expectedSynced: false, }, { actions: []func(f *DeltaFIFO){ func(f *DeltaFIFO) { f.Replace([]interface{}{mkFifoObj("a", 1), mkFifoObj("b", 2)}, "0") }, - func(f *DeltaFIFO) { f.Pop() }, - func(f *DeltaFIFO) { f.Pop() }, + func(f *DeltaFIFO) { Pop(f) }, + func(f *DeltaFIFO) { Pop(f) }, }, expectedSynced: true, }, diff --git a/pkg/client/cache/fifo.go b/pkg/client/cache/fifo.go index f98bea6f445..17e52929775 100644 --- a/pkg/client/cache/fifo.go +++ b/pkg/client/cache/fifo.go @@ -22,12 +22,17 @@ import ( "k8s.io/kubernetes/pkg/util/sets" ) +// PopProcessFunc is passed to Pop() method of Queue interface. +// It is supposed to process the element popped from the queue. +type PopProcessFunc func(interface{}) error + // Queue is exactly like a Store, but has a Pop() method too. type Queue interface { Store - // Pop blocks until it has something to return. - Pop() interface{} + // Pop blocks until it has something to process. + // It returns the object that was process and the result of processing. + Pop(PopProcessFunc) (interface{}, error) // AddIfNotPresent adds a value previously // returned by Pop back into the queue as long @@ -39,6 +44,16 @@ type Queue interface { HasSynced() bool } +// Helper function for popping from Queue. +func Pop(queue Queue) interface{} { + var result interface{} + queue.Pop(func(obj interface{}) error { + result = obj + return nil + }) + return result +} + // FIFO receives adds and updates from a Reflector, and puts them in a queue for // FIFO order processing. If multiple adds/updates of a single item happen while // an item is in the queue before it has been processed, it will only be @@ -183,12 +198,13 @@ func (f *FIFO) GetByKey(key string) (item interface{}, exists bool, err error) { return item, exists, nil } -// Pop waits until an item is ready and returns it. If multiple items are +// Pop waits until an item is ready and processes it. If multiple items are // ready, they are returned in the order in which they were added/updated. -// The item is removed from the queue (and the store) before it is returned, -// so if you don't successfully process it, you need to add it back with -// AddIfNotPresent(). -func (f *FIFO) Pop() interface{} { +// The item is removed from the queue (and the store) before it is processed, +// so if you don't successfully process it, it should be added back with +// AddIfNotPresent(). process function is called under lock, so it is safe +// update data structures in it that need to be in sync with the queue. +func (f *FIFO) Pop(process PopProcessFunc) (interface{}, error) { f.lock.Lock() defer f.lock.Unlock() for { @@ -206,7 +222,7 @@ func (f *FIFO) Pop() interface{} { continue } delete(f.items, id) - return item + return item, process(item) } } diff --git a/pkg/client/cache/fifo_test.go b/pkg/client/cache/fifo_test.go index 974fa6d3b73..0f2ceb8eaa1 100644 --- a/pkg/client/cache/fifo_test.go +++ b/pkg/client/cache/fifo_test.go @@ -52,7 +52,7 @@ func TestFIFO_basic(t *testing.T) { lastInt := int(0) lastUint := uint64(0) for i := 0; i < amount*2; i++ { - switch obj := f.Pop().(testFifoObject).val.(type) { + switch obj := Pop(f).(testFifoObject).val.(type) { case int: if obj <= lastInt { t.Errorf("got %v (int) out of order, last was %v", obj, lastInt) @@ -85,7 +85,7 @@ func TestFIFO_addUpdate(t *testing.T) { got := make(chan testFifoObject, 2) go func() { for { - got <- f.Pop().(testFifoObject) + got <- Pop(f).(testFifoObject) } }() @@ -111,7 +111,7 @@ func TestFIFO_addReplace(t *testing.T) { got := make(chan testFifoObject, 2) go func() { for { - got <- f.Pop().(testFifoObject) + got <- Pop(f).(testFifoObject) } }() @@ -139,21 +139,21 @@ func TestFIFO_detectLineJumpers(t *testing.T) { f.Add(mkFifoObj("foo", 13)) f.Add(mkFifoObj("zab", 30)) - if e, a := 13, f.Pop().(testFifoObject).val; a != e { + if e, a := 13, Pop(f).(testFifoObject).val; a != e { t.Fatalf("expected %d, got %d", e, a) } f.Add(mkFifoObj("foo", 14)) // ensure foo doesn't jump back in line - if e, a := 1, f.Pop().(testFifoObject).val; a != e { + if e, a := 1, Pop(f).(testFifoObject).val; a != e { t.Fatalf("expected %d, got %d", e, a) } - if e, a := 30, f.Pop().(testFifoObject).val; a != e { + if e, a := 30, Pop(f).(testFifoObject).val; a != e { t.Fatalf("expected %d, got %d", e, a) } - if e, a := 14, f.Pop().(testFifoObject).val; a != e { + if e, a := 14, Pop(f).(testFifoObject).val; a != e { t.Fatalf("expected %d, got %d", e, a) } } @@ -172,7 +172,7 @@ func TestFIFO_addIfNotPresent(t *testing.T) { expectedValues := []int{1, 2, 4} for _, expected := range expectedValues { - if actual := f.Pop().(testFifoObject).val; actual != expected { + if actual := Pop(f).(testFifoObject).val; actual != expected { t.Fatalf("expected value %d, got %d", expected, actual) } } @@ -208,15 +208,15 @@ func TestFIFO_HasSynced(t *testing.T) { { actions: []func(f *FIFO){ func(f *FIFO) { f.Replace([]interface{}{mkFifoObj("a", 1), mkFifoObj("b", 2)}, "0") }, - func(f *FIFO) { f.Pop() }, + func(f *FIFO) { Pop(f) }, }, expectedSynced: false, }, { actions: []func(f *FIFO){ func(f *FIFO) { f.Replace([]interface{}{mkFifoObj("a", 1), mkFifoObj("b", 2)}, "0") }, - func(f *FIFO) { f.Pop() }, - func(f *FIFO) { f.Pop() }, + func(f *FIFO) { Pop(f) }, + func(f *FIFO) { Pop(f) }, }, expectedSynced: true, }, diff --git a/pkg/client/cache/reflector_test.go b/pkg/client/cache/reflector_test.go index d28bdb7b247..f560e449cae 100644 --- a/pkg/client/cache/reflector_test.go +++ b/pkg/client/cache/reflector_test.go @@ -252,7 +252,7 @@ func TestReflectorListAndWatch(t *testing.T) { // Verify we received the right ids with the right resource versions. for i, id := range ids { - pod := s.Pop().(*api.Pod) + pod := Pop(s).(*api.Pod) if e, a := id, pod.Name; e != a { t.Errorf("%v: Expected %v, got %v", i, e, a) } diff --git a/pkg/controller/framework/controller.go b/pkg/controller/framework/controller.go index cfffabe8cd4..c6363952b0c 100644 --- a/pkg/controller/framework/controller.go +++ b/pkg/controller/framework/controller.go @@ -124,8 +124,7 @@ func (c *Controller) Requeue(obj interface{}) error { // concurrently. func (c *Controller) processLoop() { for { - obj := c.config.Queue.Pop() - err := c.config.Process(obj) + obj, err := c.config.Queue.Pop(cache.PopProcessFunc(c.config.Process)) if err != nil { if c.config.RetryOnError { // This is the safe way to re-enqueue. diff --git a/pkg/controller/service/servicecontroller.go b/pkg/controller/service/servicecontroller.go index 0fb1820869e..30ebe095497 100644 --- a/pkg/controller/service/servicecontroller.go +++ b/pkg/controller/service/servicecontroller.go @@ -184,29 +184,32 @@ func (s *ServiceController) init() error { // Loop infinitely, processing all service updates provided by the queue. func (s *ServiceController) watchServices(serviceQueue *cache.DeltaFIFO) { for { - newItem := serviceQueue.Pop() - deltas, ok := newItem.(cache.Deltas) - if !ok { - glog.Errorf("Received object from service watcher that wasn't Deltas: %+v", newItem) - } - delta := deltas.Newest() - if delta == nil { - glog.Errorf("Received nil delta from watcher queue.") - continue - } - err, retryDelay := s.processDelta(delta) - if retryDelay != 0 { - // Add the failed service back to the queue so we'll retry it. - glog.Errorf("Failed to process service delta. Retrying in %s: %v", retryDelay, err) - go func(deltas cache.Deltas, delay time.Duration) { - time.Sleep(delay) - if err := serviceQueue.AddIfNotPresent(deltas); err != nil { - glog.Errorf("Error requeuing service delta - will not retry: %v", err) - } - }(deltas, retryDelay) - } else if err != nil { - runtime.HandleError(fmt.Errorf("Failed to process service delta. Not retrying: %v", err)) - } + serviceQueue.Pop(func(obj interface{}) error { + deltas, ok := obj.(cache.Deltas) + if !ok { + runtime.HandleError(fmt.Errorf("Received object from service watcher that wasn't Deltas: %+v", obj)) + return nil + } + delta := deltas.Newest() + if delta == nil { + runtime.HandleError(fmt.Errorf("Received nil delta from watcher queue.")) + return nil + } + err, retryDelay := s.processDelta(delta) + if retryDelay != 0 { + // Add the failed service back to the queue so we'll retry it. + runtime.HandleError(fmt.Errorf("Failed to process service delta. Retrying in %s: %v", retryDelay, err)) + go func(deltas cache.Deltas, delay time.Duration) { + time.Sleep(delay) + if err := serviceQueue.AddIfNotPresent(deltas); err != nil { + runtime.HandleError(fmt.Errorf("Error requeuing service delta - will not retry: %v", err)) + } + }(deltas, retryDelay) + } else if err != nil { + runtime.HandleError(fmt.Errorf("Failed to process service delta. Not retrying: %v", err)) + } + return nil + }) } } diff --git a/pkg/volume/util.go b/pkg/volume/util.go index 0b20a4be734..696f9254b08 100644 --- a/pkg/volume/util.go +++ b/pkg/volume/util.go @@ -142,8 +142,7 @@ func (c *realRecyclerClient) WatchPod(name, namespace string, stopChannel chan s cache.NewReflector(podLW, &api.Pod{}, queue, 1*time.Minute).RunUntil(stopChannel) return func() *api.Pod { - obj := queue.Pop() - return obj.(*api.Pod) + return cache.Pop(queue).(*api.Pod) } } diff --git a/plugin/pkg/scheduler/factory/factory.go b/plugin/pkg/scheduler/factory/factory.go index f02d7fa877b..0bbbcbf3329 100644 --- a/plugin/pkg/scheduler/factory/factory.go +++ b/plugin/pkg/scheduler/factory/factory.go @@ -415,7 +415,7 @@ func (f *ConfigFactory) Run() { func (f *ConfigFactory) getNextPod() *api.Pod { for { - pod := f.PodQueue.Pop().(*api.Pod) + pod := cache.Pop(f.PodQueue).(*api.Pod) if f.responsibleForPod(pod) { glog.V(4).Infof("About to try and schedule pod %v", pod.Name) return pod diff --git a/plugin/pkg/scheduler/scheduler_test.go b/plugin/pkg/scheduler/scheduler_test.go index 106815f89a0..cb326ec0e26 100644 --- a/plugin/pkg/scheduler/scheduler_test.go +++ b/plugin/pkg/scheduler/scheduler_test.go @@ -26,7 +26,7 @@ import ( "k8s.io/kubernetes/pkg/api" "k8s.io/kubernetes/pkg/api/testapi" - "k8s.io/kubernetes/pkg/client/cache" + clientcache "k8s.io/kubernetes/pkg/client/cache" "k8s.io/kubernetes/pkg/client/record" "k8s.io/kubernetes/pkg/labels" "k8s.io/kubernetes/pkg/util/diff" @@ -202,8 +202,8 @@ func TestSchedulerForgetAssumedPodAfterDelete(t *testing.T) { // Setup stores to test pod's workflow: // - queuedPodStore: pods queued before processing // - scheduledPodStore: pods that has a scheduling decision - scheduledPodStore := cache.NewStore(cache.MetaNamespaceKeyFunc) - queuedPodStore := cache.NewFIFO(cache.MetaNamespaceKeyFunc) + scheduledPodStore := clientcache.NewStore(clientcache.MetaNamespaceKeyFunc) + queuedPodStore := clientcache.NewFIFO(clientcache.MetaNamespaceKeyFunc) // Port is the easiest way to cause a fit predicate failure podPort := 8080 @@ -232,7 +232,7 @@ func TestSchedulerForgetAssumedPodAfterDelete(t *testing.T) { return nil }}, NextPod: func() *api.Pod { - return queuedPodStore.Pop().(*api.Pod) + return clientcache.Pop(queuedPodStore).(*api.Pod) }, Error: func(p *api.Pod, err error) { t.Errorf("Unexpected error when scheduling pod %+v: %v", p, err)