Fix race in informer

This commit is contained in:
Wojciech Tyczynski 2016-06-14 09:32:57 +02:00
parent e0cdf47809
commit 5d702a32c1
10 changed files with 82 additions and 63 deletions

View File

@ -363,10 +363,12 @@ func (f *DeltaFIFO) GetByKey(key string) (item interface{}, exists bool, err err
// added/updated. The item is removed from the queue (and the store) before it // added/updated. The item is removed from the queue (and the store) before it
// is returned, so if you don't successfully process it, you need to add it back // is returned, so if you don't successfully process it, you need to add it back
// with AddIfNotPresent(). // with AddIfNotPresent().
// process function is called under lock, so it is safe to update data structures
// in it that need to be in sync with the queue (e.g. knownKeys).
// //
// Pop returns a 'Deltas', which has a complete list of all the things // Pop returns a 'Deltas', which has a complete list of all the things
// that happened to the object (deltas) while it was sitting in the queue. // that happened to the object (deltas) while it was sitting in the queue.
func (f *DeltaFIFO) Pop() interface{} { func (f *DeltaFIFO) Pop(process PopProcessFunc) (interface{}, error) {
f.lock.Lock() f.lock.Lock()
defer f.lock.Unlock() defer f.lock.Unlock()
for { for {
@ -386,7 +388,7 @@ func (f *DeltaFIFO) Pop() interface{} {
delete(f.items, id) delete(f.items, id)
// Don't need to copyDeltas here, because we're transferring // Don't need to copyDeltas here, because we're transferring
// ownership to the caller. // ownership to the caller.
return item return item, process(item)
} }
} }

View File

@ -24,7 +24,7 @@ import (
// helper function to reduce stuttering // helper function to reduce stuttering
func testPop(f *DeltaFIFO) testFifoObject { func testPop(f *DeltaFIFO) testFifoObject {
return f.Pop().(Deltas).Newest().Object.(testFifoObject) return Pop(f).(Deltas).Newest().Object.(testFifoObject)
} }
// keyLookupFunc adapts a raw function to be a KeyLookup. // keyLookupFunc adapts a raw function to be a KeyLookup.
@ -104,7 +104,7 @@ func TestDeltaFIFO_compressorWorks(t *testing.T) {
if e, a := expect, oldestTypes; !reflect.DeepEqual(e, a) { if e, a := expect, oldestTypes; !reflect.DeepEqual(e, a) {
t.Errorf("Expected %#v, got %#v", e, a) t.Errorf("Expected %#v, got %#v", e, a)
} }
if e, a := (Deltas{{Added, mkFifoObj("foo", 25)}}), f.Pop().(Deltas); !reflect.DeepEqual(e, a) { if e, a := (Deltas{{Added, mkFifoObj("foo", 25)}}), Pop(f).(Deltas); !reflect.DeepEqual(e, a) {
t.Fatalf("Expected %#v, got %#v", e, a) t.Fatalf("Expected %#v, got %#v", e, a)
} }
@ -126,7 +126,7 @@ func TestDeltaFIFO_addUpdate(t *testing.T) {
got := make(chan testFifoObject, 2) got := make(chan testFifoObject, 2)
go func() { go func() {
for { for {
obj := f.Pop().(Deltas).Newest().Object.(testFifoObject) obj := testPop(f)
t.Logf("got a thing %#v", obj) t.Logf("got a thing %#v", obj)
t.Logf("D len: %v", len(f.queue)) t.Logf("D len: %v", len(f.queue))
got <- obj got <- obj
@ -240,7 +240,7 @@ func TestDeltaFIFO_ReplaceMakesDeletions(t *testing.T) {
} }
for _, expected := range expectedList { for _, expected := range expectedList {
cur := f.Pop().(Deltas) cur := Pop(f).(Deltas)
if e, a := expected, cur; !reflect.DeepEqual(e, a) { if e, a := expected, cur; !reflect.DeepEqual(e, a) {
t.Errorf("Expected %#v, got %#v", e, a) t.Errorf("Expected %#v, got %#v", e, a)
} }
@ -279,9 +279,9 @@ func TestDeltaFIFO_addIfNotPresent(t *testing.T) {
f := NewDeltaFIFO(testFifoObjectKeyFunc, nil, nil) f := NewDeltaFIFO(testFifoObjectKeyFunc, nil, nil)
f.Add(mkFifoObj("b", 3)) f.Add(mkFifoObj("b", 3))
b3 := f.Pop() b3 := Pop(f)
f.Add(mkFifoObj("c", 4)) f.Add(mkFifoObj("c", 4))
c4 := f.Pop() c4 := Pop(f)
if e, a := 0, len(f.items); e != a { if e, a := 0, len(f.items); e != a {
t.Fatalf("Expected %v, got %v items in queue", e, a) t.Fatalf("Expected %v, got %v items in queue", e, a)
} }
@ -358,15 +358,15 @@ func TestDeltaFIFO_HasSynced(t *testing.T) {
{ {
actions: []func(f *DeltaFIFO){ actions: []func(f *DeltaFIFO){
func(f *DeltaFIFO) { f.Replace([]interface{}{mkFifoObj("a", 1), mkFifoObj("b", 2)}, "0") }, func(f *DeltaFIFO) { f.Replace([]interface{}{mkFifoObj("a", 1), mkFifoObj("b", 2)}, "0") },
func(f *DeltaFIFO) { f.Pop() }, func(f *DeltaFIFO) { Pop(f) },
}, },
expectedSynced: false, expectedSynced: false,
}, },
{ {
actions: []func(f *DeltaFIFO){ actions: []func(f *DeltaFIFO){
func(f *DeltaFIFO) { f.Replace([]interface{}{mkFifoObj("a", 1), mkFifoObj("b", 2)}, "0") }, func(f *DeltaFIFO) { f.Replace([]interface{}{mkFifoObj("a", 1), mkFifoObj("b", 2)}, "0") },
func(f *DeltaFIFO) { f.Pop() }, func(f *DeltaFIFO) { Pop(f) },
func(f *DeltaFIFO) { f.Pop() }, func(f *DeltaFIFO) { Pop(f) },
}, },
expectedSynced: true, expectedSynced: true,
}, },

View File

@ -22,12 +22,17 @@ import (
"k8s.io/kubernetes/pkg/util/sets" "k8s.io/kubernetes/pkg/util/sets"
) )
// PopProcessFunc is passed to Pop() method of Queue interface.
// It is supposed to process the element popped from the queue.
// It receives the popped element as its only argument, and the error it
// returns is handed back to the caller of Pop as Pop's second result.
// FIFO.Pop and DeltaFIFO.Pop invoke it while still holding the queue's lock.
// NOTE(review): presumably it must therefore not call back into methods of
// the same queue that acquire that lock — confirm against the lock type.
type PopProcessFunc func(interface{}) error
// Queue is exactly like a Store, but has a Pop() method too. // Queue is exactly like a Store, but has a Pop() method too.
type Queue interface { type Queue interface {
Store Store
// Pop blocks until it has something to return. // Pop blocks until it has something to process.
Pop() interface{} // It returns the object that was processed and the result of processing.
Pop(PopProcessFunc) (interface{}, error)
// AddIfNotPresent adds a value previously // AddIfNotPresent adds a value previously
// returned by Pop back into the queue as long // returned by Pop back into the queue as long
@ -39,6 +44,16 @@ type Queue interface {
HasSynced() bool HasSynced() bool
} }
// Pop is a helper for callers that only need the next element from a Queue.
// It hands queue.Pop a callback that records the popped element and always
// returns nil, then returns whatever the callback recorded (nil if the
// callback was never invoked).
func Pop(queue Queue) interface{} {
	var popped interface{}
	capture := func(item interface{}) error {
		popped = item
		return nil
	}
	// Both results of queue.Pop are intentionally discarded: the element is
	// already captured above and the callback itself never reports an error.
	_, _ = queue.Pop(capture)
	return popped
}
// FIFO receives adds and updates from a Reflector, and puts them in a queue for // FIFO receives adds and updates from a Reflector, and puts them in a queue for
// FIFO order processing. If multiple adds/updates of a single item happen while // FIFO order processing. If multiple adds/updates of a single item happen while
// an item is in the queue before it has been processed, it will only be // an item is in the queue before it has been processed, it will only be
@ -183,12 +198,13 @@ func (f *FIFO) GetByKey(key string) (item interface{}, exists bool, err error) {
return item, exists, nil return item, exists, nil
} }
// Pop waits until an item is ready and returns it. If multiple items are // Pop waits until an item is ready and processes it. If multiple items are
// ready, they are returned in the order in which they were added/updated. // ready, they are returned in the order in which they were added/updated.
// The item is removed from the queue (and the store) before it is returned, // The item is removed from the queue (and the store) before it is processed,
// so if you don't successfully process it, you need to add it back with // so if you don't successfully process it, it should be added back with
// AddIfNotPresent(). // AddIfNotPresent(). process function is called under lock, so it is safe to
func (f *FIFO) Pop() interface{} { // update data structures in it that need to be in sync with the queue.
func (f *FIFO) Pop(process PopProcessFunc) (interface{}, error) {
f.lock.Lock() f.lock.Lock()
defer f.lock.Unlock() defer f.lock.Unlock()
for { for {
@ -206,7 +222,7 @@ func (f *FIFO) Pop() interface{} {
continue continue
} }
delete(f.items, id) delete(f.items, id)
return item return item, process(item)
} }
} }

View File

@ -52,7 +52,7 @@ func TestFIFO_basic(t *testing.T) {
lastInt := int(0) lastInt := int(0)
lastUint := uint64(0) lastUint := uint64(0)
for i := 0; i < amount*2; i++ { for i := 0; i < amount*2; i++ {
switch obj := f.Pop().(testFifoObject).val.(type) { switch obj := Pop(f).(testFifoObject).val.(type) {
case int: case int:
if obj <= lastInt { if obj <= lastInt {
t.Errorf("got %v (int) out of order, last was %v", obj, lastInt) t.Errorf("got %v (int) out of order, last was %v", obj, lastInt)
@ -85,7 +85,7 @@ func TestFIFO_addUpdate(t *testing.T) {
got := make(chan testFifoObject, 2) got := make(chan testFifoObject, 2)
go func() { go func() {
for { for {
got <- f.Pop().(testFifoObject) got <- Pop(f).(testFifoObject)
} }
}() }()
@ -111,7 +111,7 @@ func TestFIFO_addReplace(t *testing.T) {
got := make(chan testFifoObject, 2) got := make(chan testFifoObject, 2)
go func() { go func() {
for { for {
got <- f.Pop().(testFifoObject) got <- Pop(f).(testFifoObject)
} }
}() }()
@ -139,21 +139,21 @@ func TestFIFO_detectLineJumpers(t *testing.T) {
f.Add(mkFifoObj("foo", 13)) f.Add(mkFifoObj("foo", 13))
f.Add(mkFifoObj("zab", 30)) f.Add(mkFifoObj("zab", 30))
if e, a := 13, f.Pop().(testFifoObject).val; a != e { if e, a := 13, Pop(f).(testFifoObject).val; a != e {
t.Fatalf("expected %d, got %d", e, a) t.Fatalf("expected %d, got %d", e, a)
} }
f.Add(mkFifoObj("foo", 14)) // ensure foo doesn't jump back in line f.Add(mkFifoObj("foo", 14)) // ensure foo doesn't jump back in line
if e, a := 1, f.Pop().(testFifoObject).val; a != e { if e, a := 1, Pop(f).(testFifoObject).val; a != e {
t.Fatalf("expected %d, got %d", e, a) t.Fatalf("expected %d, got %d", e, a)
} }
if e, a := 30, f.Pop().(testFifoObject).val; a != e { if e, a := 30, Pop(f).(testFifoObject).val; a != e {
t.Fatalf("expected %d, got %d", e, a) t.Fatalf("expected %d, got %d", e, a)
} }
if e, a := 14, f.Pop().(testFifoObject).val; a != e { if e, a := 14, Pop(f).(testFifoObject).val; a != e {
t.Fatalf("expected %d, got %d", e, a) t.Fatalf("expected %d, got %d", e, a)
} }
} }
@ -172,7 +172,7 @@ func TestFIFO_addIfNotPresent(t *testing.T) {
expectedValues := []int{1, 2, 4} expectedValues := []int{1, 2, 4}
for _, expected := range expectedValues { for _, expected := range expectedValues {
if actual := f.Pop().(testFifoObject).val; actual != expected { if actual := Pop(f).(testFifoObject).val; actual != expected {
t.Fatalf("expected value %d, got %d", expected, actual) t.Fatalf("expected value %d, got %d", expected, actual)
} }
} }
@ -208,15 +208,15 @@ func TestFIFO_HasSynced(t *testing.T) {
{ {
actions: []func(f *FIFO){ actions: []func(f *FIFO){
func(f *FIFO) { f.Replace([]interface{}{mkFifoObj("a", 1), mkFifoObj("b", 2)}, "0") }, func(f *FIFO) { f.Replace([]interface{}{mkFifoObj("a", 1), mkFifoObj("b", 2)}, "0") },
func(f *FIFO) { f.Pop() }, func(f *FIFO) { Pop(f) },
}, },
expectedSynced: false, expectedSynced: false,
}, },
{ {
actions: []func(f *FIFO){ actions: []func(f *FIFO){
func(f *FIFO) { f.Replace([]interface{}{mkFifoObj("a", 1), mkFifoObj("b", 2)}, "0") }, func(f *FIFO) { f.Replace([]interface{}{mkFifoObj("a", 1), mkFifoObj("b", 2)}, "0") },
func(f *FIFO) { f.Pop() }, func(f *FIFO) { Pop(f) },
func(f *FIFO) { f.Pop() }, func(f *FIFO) { Pop(f) },
}, },
expectedSynced: true, expectedSynced: true,
}, },

View File

@ -252,7 +252,7 @@ func TestReflectorListAndWatch(t *testing.T) {
// Verify we received the right ids with the right resource versions. // Verify we received the right ids with the right resource versions.
for i, id := range ids { for i, id := range ids {
pod := s.Pop().(*api.Pod) pod := Pop(s).(*api.Pod)
if e, a := id, pod.Name; e != a { if e, a := id, pod.Name; e != a {
t.Errorf("%v: Expected %v, got %v", i, e, a) t.Errorf("%v: Expected %v, got %v", i, e, a)
} }

View File

@ -124,8 +124,7 @@ func (c *Controller) Requeue(obj interface{}) error {
// concurrently. // concurrently.
func (c *Controller) processLoop() { func (c *Controller) processLoop() {
for { for {
obj := c.config.Queue.Pop() obj, err := c.config.Queue.Pop(cache.PopProcessFunc(c.config.Process))
err := c.config.Process(obj)
if err != nil { if err != nil {
if c.config.RetryOnError { if c.config.RetryOnError {
// This is the safe way to re-enqueue. // This is the safe way to re-enqueue.

View File

@ -184,29 +184,32 @@ func (s *ServiceController) init() error {
// Loop infinitely, processing all service updates provided by the queue. // Loop infinitely, processing all service updates provided by the queue.
func (s *ServiceController) watchServices(serviceQueue *cache.DeltaFIFO) { func (s *ServiceController) watchServices(serviceQueue *cache.DeltaFIFO) {
for { for {
newItem := serviceQueue.Pop() serviceQueue.Pop(func(obj interface{}) error {
deltas, ok := newItem.(cache.Deltas) deltas, ok := obj.(cache.Deltas)
if !ok { if !ok {
glog.Errorf("Received object from service watcher that wasn't Deltas: %+v", newItem) runtime.HandleError(fmt.Errorf("Received object from service watcher that wasn't Deltas: %+v", obj))
} return nil
delta := deltas.Newest() }
if delta == nil { delta := deltas.Newest()
glog.Errorf("Received nil delta from watcher queue.") if delta == nil {
continue runtime.HandleError(fmt.Errorf("Received nil delta from watcher queue."))
} return nil
err, retryDelay := s.processDelta(delta) }
if retryDelay != 0 { err, retryDelay := s.processDelta(delta)
// Add the failed service back to the queue so we'll retry it. if retryDelay != 0 {
glog.Errorf("Failed to process service delta. Retrying in %s: %v", retryDelay, err) // Add the failed service back to the queue so we'll retry it.
go func(deltas cache.Deltas, delay time.Duration) { runtime.HandleError(fmt.Errorf("Failed to process service delta. Retrying in %s: %v", retryDelay, err))
time.Sleep(delay) go func(deltas cache.Deltas, delay time.Duration) {
if err := serviceQueue.AddIfNotPresent(deltas); err != nil { time.Sleep(delay)
glog.Errorf("Error requeuing service delta - will not retry: %v", err) if err := serviceQueue.AddIfNotPresent(deltas); err != nil {
} runtime.HandleError(fmt.Errorf("Error requeuing service delta - will not retry: %v", err))
}(deltas, retryDelay) }
} else if err != nil { }(deltas, retryDelay)
runtime.HandleError(fmt.Errorf("Failed to process service delta. Not retrying: %v", err)) } else if err != nil {
} runtime.HandleError(fmt.Errorf("Failed to process service delta. Not retrying: %v", err))
}
return nil
})
} }
} }

View File

@ -142,8 +142,7 @@ func (c *realRecyclerClient) WatchPod(name, namespace string, stopChannel chan s
cache.NewReflector(podLW, &api.Pod{}, queue, 1*time.Minute).RunUntil(stopChannel) cache.NewReflector(podLW, &api.Pod{}, queue, 1*time.Minute).RunUntil(stopChannel)
return func() *api.Pod { return func() *api.Pod {
obj := queue.Pop() return cache.Pop(queue).(*api.Pod)
return obj.(*api.Pod)
} }
} }

View File

@ -415,7 +415,7 @@ func (f *ConfigFactory) Run() {
func (f *ConfigFactory) getNextPod() *api.Pod { func (f *ConfigFactory) getNextPod() *api.Pod {
for { for {
pod := f.PodQueue.Pop().(*api.Pod) pod := cache.Pop(f.PodQueue).(*api.Pod)
if f.responsibleForPod(pod) { if f.responsibleForPod(pod) {
glog.V(4).Infof("About to try and schedule pod %v", pod.Name) glog.V(4).Infof("About to try and schedule pod %v", pod.Name)
return pod return pod

View File

@ -26,7 +26,7 @@ import (
"k8s.io/kubernetes/pkg/api" "k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/api/testapi" "k8s.io/kubernetes/pkg/api/testapi"
"k8s.io/kubernetes/pkg/client/cache" clientcache "k8s.io/kubernetes/pkg/client/cache"
"k8s.io/kubernetes/pkg/client/record" "k8s.io/kubernetes/pkg/client/record"
"k8s.io/kubernetes/pkg/labels" "k8s.io/kubernetes/pkg/labels"
"k8s.io/kubernetes/pkg/util/diff" "k8s.io/kubernetes/pkg/util/diff"
@ -202,8 +202,8 @@ func TestSchedulerForgetAssumedPodAfterDelete(t *testing.T) {
// Setup stores to test pod's workflow: // Setup stores to test pod's workflow:
// - queuedPodStore: pods queued before processing // - queuedPodStore: pods queued before processing
// - scheduledPodStore: pods that has a scheduling decision // - scheduledPodStore: pods that has a scheduling decision
scheduledPodStore := cache.NewStore(cache.MetaNamespaceKeyFunc) scheduledPodStore := clientcache.NewStore(clientcache.MetaNamespaceKeyFunc)
queuedPodStore := cache.NewFIFO(cache.MetaNamespaceKeyFunc) queuedPodStore := clientcache.NewFIFO(clientcache.MetaNamespaceKeyFunc)
// Port is the easiest way to cause a fit predicate failure // Port is the easiest way to cause a fit predicate failure
podPort := 8080 podPort := 8080
@ -232,7 +232,7 @@ func TestSchedulerForgetAssumedPodAfterDelete(t *testing.T) {
return nil return nil
}}, }},
NextPod: func() *api.Pod { NextPod: func() *api.Pod {
return queuedPodStore.Pop().(*api.Pod) return clientcache.Pop(queuedPodStore).(*api.Pod)
}, },
Error: func(p *api.Pod, err error) { Error: func(p *api.Pod, err error) {
t.Errorf("Unexpected error when scheduling pod %+v: %v", p, err) t.Errorf("Unexpected error when scheduling pod %+v: %v", p, err)