From 03728c1ebef88c3e26a4c5ef7c54f1525985689f Mon Sep 17 00:00:00 2001 From: Keisuke Ishigami Date: Thu, 29 May 2025 00:34:43 +0900 Subject: [PATCH 1/6] handle context in process loop Kubernetes-commit: f67d30b3529ea970dd5fd069eaddfc7f61d74169 --- tools/cache/controller.go | 16 +++++++++------- 1 file changed, 9 insertions(+), 7 deletions(-) diff --git a/tools/cache/controller.go b/tools/cache/controller.go index 1497700d..f0a483b6 100644 --- a/tools/cache/controller.go +++ b/tools/cache/controller.go @@ -204,13 +204,15 @@ func (c *controller) LastSyncResourceVersion() string { // concurrently. func (c *controller) processLoop(ctx context.Context) { for { - // TODO: Plumb through the ctx so that this can - // actually exit when the controller is stopped. Or just give up on this stuff - // ever being stoppable. - _, err := c.config.Queue.Pop(PopProcessFunc(c.config.Process)) - if err != nil { - if err == ErrFIFOClosed { - return + select { + case <-ctx.Done(): + return + default: + _, err := c.config.Queue.Pop(PopProcessFunc(c.config.Process)) + if err != nil { + if err == ErrFIFOClosed { + return + } } } } From d56ed5816f9fe01c7df0afdc27760e393b6eb521 Mon Sep 17 00:00:00 2001 From: Keisuke Ishigami Date: Sat, 14 Jun 2025 21:59:56 +0900 Subject: [PATCH 2/6] pop respects the context Kubernetes-commit: 1c33d98762511c10f89c40358a1935250b03b0c8 --- tools/cache/controller.go | 4 ++-- tools/cache/delta_fifo.go | 8 +++++++- tools/cache/fifo.go | 21 +++++++++++++++------ tools/cache/the_real_fifo.go | 8 +++++++- 4 files changed, 31 insertions(+), 10 deletions(-) diff --git a/tools/cache/controller.go b/tools/cache/controller.go index f0a483b6..8bafcb67 100644 --- a/tools/cache/controller.go +++ b/tools/cache/controller.go @@ -208,9 +208,9 @@ func (c *controller) processLoop(ctx context.Context) { case <-ctx.Done(): return default: - _, err := c.config.Queue.Pop(PopProcessFunc(c.config.Process)) + _, err := c.config.Queue.Pop(ctx, PopProcessFunc(c.config.Process)) if err != nil { - if err == ErrFIFOClosed { + if errors.Is(err, ErrFIFOClosed) || errors.Is(err, ErrCtxDone) { return } } diff --git a/tools/cache/delta_fifo.go b/tools/cache/delta_fifo.go index 9d9e238c..bfa4deb1 100644 --- a/tools/cache/delta_fifo.go +++ b/tools/cache/delta_fifo.go @@ -17,6 +17,7 @@ limitations under the License. package cache import ( + "context" "errors" "fmt" "sync" @@ -495,11 +496,16 @@ func (f *DeltaFIFO) IsClosed() bool { // // Pop returns a 'Deltas', which has a complete list of all the things // that happened to the object (deltas) while it was sitting in the queue. -func (f *DeltaFIFO) Pop(process PopProcessFunc) (interface{}, error) { +func (f *DeltaFIFO) Pop(ctx context.Context, process PopProcessFunc) (interface{}, error) { f.lock.Lock() defer f.lock.Unlock() for { for len(f.queue) == 0 { + // if the length of the queue is 0 and ctx is done, return here + if ctx.Err() != nil { + return nil, ErrCtxDone + } + // When the queue is empty, invocation of Pop() is blocked until new item is enqueued. // When Close() is called, the f.closed is set and the condition is broadcasted. // Which causes this loop to continue and return from the Pop(). diff --git a/tools/cache/fifo.go b/tools/cache/fifo.go index 5c2ca900..fde34edc 100644 --- a/tools/cache/fifo.go +++ b/tools/cache/fifo.go @@ -17,6 +17,7 @@ limitations under the License. package cache import ( + "context" "errors" "sync" @@ -30,6 +31,9 @@ type PopProcessFunc func(obj interface{}, isInInitialList bool) error // ErrFIFOClosed used when FIFO is closed var ErrFIFOClosed = errors.New("DeltaFIFO: manipulating with closed queue") +// ErrCtxDone is used when context is done +var ErrCtxDone = errors.New("DeltaFIFO: context done") + // Queue extends ReflectorStore with a collection of Store keys to "process". // Every Add, Update, or Delete may put the object's key in that collection. // A Queue has a way to derive the corresponding key given an accumulator. @@ -38,8 +42,8 @@ var ErrFIFOClosed = errors.New("DeltaFIFO: manipulating with closed queue") type Queue interface { ReflectorStore - // Pop blocks until there is at least one key to process or the - // Queue is closed. In the latter case Pop returns with an error. + // Pop blocks until there is at least one key to process, ctx is done, + // or the Queue is closed. In the latter case Pop returns with an error. // In the former case Pop atomically picks one key to process, // removes that (key, accumulator) association from the Store, and // processes the accumulator. Pop returns the accumulator that @@ -48,7 +52,7 @@ type Queue interface { // return that (key, accumulator) association to the Queue as part // of the atomic processing and (b) return the inner error from // Pop. - Pop(PopProcessFunc) (interface{}, error) + Pop(context.Context, PopProcessFunc) (interface{}, error) // HasSynced returns true if the first batch of keys have all been // popped. The first batch of keys are those of the first Replace @@ -66,9 +70,9 @@ type Queue interface { // // NOTE: This function is deprecated and may be removed in the future without // additional warning. -func Pop(queue Queue) interface{} { +func Pop(ctx context.Context, queue Queue) interface{} { var result interface{} - queue.Pop(func(obj interface{}, isInInitialList bool) error { + queue.Pop(ctx, func(obj interface{}, isInInitialList bool) error { result = obj return nil }) @@ -191,11 +195,16 @@ func (f *FIFO) IsClosed() bool { // so if you don't successfully process it, it should be added back with // AddIfNotPresent(). process function is called under lock, so it is safe // update data structures in it that need to be in sync with the queue. -func (f *FIFO) Pop(process PopProcessFunc) (interface{}, error) { +func (f *FIFO) Pop(ctx context.Context, process PopProcessFunc) (interface{}, error) { f.lock.Lock() defer f.lock.Unlock() for { for len(f.queue) == 0 { + // if the length of the queue is 0 and ctx is done, return here + if ctx.Err() != nil { + return nil, ErrCtxDone + } + // When the queue is empty, invocation of Pop() is blocked until new item is enqueued. // When Close() is called, the f.closed is set and the condition is broadcasted. // Which causes this loop to continue and return from the Pop(). diff --git a/tools/cache/the_real_fifo.go b/tools/cache/the_real_fifo.go index ef322bea..6d8f76ec 100644 --- a/tools/cache/the_real_fifo.go +++ b/tools/cache/the_real_fifo.go @@ -17,6 +17,7 @@ limitations under the License. package cache import ( + "context" "fmt" "sync" "time" @@ -192,11 +193,16 @@ func (f *RealFIFO) IsClosed() bool { // The item is removed from the queue (and the store) before it is processed. // process function is called under lock, so it is safe // update data structures in it that need to be in sync with the queue. -func (f *RealFIFO) Pop(process PopProcessFunc) (interface{}, error) { +func (f *RealFIFO) Pop(ctx context.Context, process PopProcessFunc) (interface{}, error) { f.lock.Lock() defer f.lock.Unlock() for len(f.items) == 0 { + // if the length of the queue is 0 and ctx is done, return here + if ctx.Err() != nil { + return nil, ErrCtxDone + } + // When the queue is empty, invocation of Pop() is blocked until new item is enqueued. // When Close() is called, the f.closed is set and the condition is broadcasted. // Which causes this loop to continue and return from the Pop(). From 00429ab356ab4f3f691db841b3b919d00c2f130b Mon Sep 17 00:00:00 2001 From: Keisuke Ishigami Date: Sat, 14 Jun 2025 22:04:22 +0900 Subject: [PATCH 3/6] modify tests Kubernetes-commit: 2cd5dbbdaab98020b37df7125f622db7a6dd885a --- tools/cache/delta_fifo_test.go | 43 ++++++++++++++++--------------- tools/cache/fifo_test.go | 23 +++++++++-------- tools/cache/reflector_test.go | 2 +- tools/cache/the_real_fifo_test.go | 33 ++++++++++++------------ 4 files changed, 52 insertions(+), 49 deletions(-) diff --git a/tools/cache/delta_fifo_test.go b/tools/cache/delta_fifo_test.go index 9c67012c..b3e80897 100644 --- a/tools/cache/delta_fifo_test.go +++ b/tools/cache/delta_fifo_test.go @@ -17,6 +17,7 @@ limitations under the License. package cache import ( + "context" "fmt" "reflect" "runtime" @@ -86,12 +87,12 @@ func (f *DeltaFIFO) GetByKey(key string) (item interface{}, exists bool, err err // helper function to reduce stuttering func testPop(f *DeltaFIFO) testFifoObject { - return Pop(f).(Deltas).Newest().Object.(testFifoObject) + return Pop(context.Background(), f).(Deltas).Newest().Object.(testFifoObject) } // testPopIfAvailable returns `{}, false` if Pop returns a nil object func testPopIfAvailable(f *DeltaFIFO) (testFifoObject, bool) { - obj := Pop(f) + obj := Pop(context.Background(), f) if obj == nil { return testFifoObject{}, false } @@ -179,7 +180,7 @@ func TestDeltaFIFO_replaceWithDeleteDeltaIn(t *testing.T) { f.Delete(oldObj) f.Replace([]interface{}{newObj}, "") - actualDeltas := Pop(f) + actualDeltas := Pop(context.Background(), f) expectedDeltas := Deltas{ Delta{Type: Deleted, Object: oldObj}, Delta{Type: Sync, Object: newObj}, @@ -289,7 +290,7 @@ func TestDeltaFIFOW_ReplaceMakesDeletionsForObjectsOnlyInQueue(t *testing.T) { }), }) tt.operations(fWithKnownObjects) - actualDeltasWithKnownObjects := Pop(fWithKnownObjects) + actualDeltasWithKnownObjects := Pop(context.Background(), fWithKnownObjects) if !reflect.DeepEqual(tt.expectedDeltas, actualDeltasWithKnownObjects) { t.Errorf("expected %#v, got %#v", tt.expectedDeltas, actualDeltasWithKnownObjects) } @@ -302,7 +303,7 @@ func TestDeltaFIFOW_ReplaceMakesDeletionsForObjectsOnlyInQueue(t *testing.T) { KeyFunction: testFifoObjectKeyFunc, }) tt.operations(fWithoutKnownObjects) - actualDeltasWithoutKnownObjects := Pop(fWithoutKnownObjects) + actualDeltasWithoutKnownObjects := Pop(context.Background(), fWithoutKnownObjects) if !reflect.DeepEqual(tt.expectedDeltas, actualDeltasWithoutKnownObjects) { t.Errorf("expected %#v, got %#v", tt.expectedDeltas, actualDeltasWithoutKnownObjects) } @@ -402,7 +403,7 @@ func TestDeltaFIFO_transformer(t *testing.T) { } for i := 0; i < 2; i++ { - obj, err := f.Pop(func(o interface{}, isInInitialList bool) error { return nil }) + obj, err := f.Pop(context.Background(), func(o interface{}, isInInitialList bool) error { return nil }) if err != nil { t.Fatalf("got nothing on try %v?", i) } @@ -592,7 +593,7 @@ func TestDeltaFIFO_ReplaceMakesDeletions(t *testing.T) { } for _, expected := range expectedList { - cur := Pop(f).(Deltas) + cur := Pop(context.Background(), f).(Deltas) if e, a := expected, cur; !reflect.DeepEqual(e, a) { t.Errorf("Expected %#v, got %#v", e, a) } @@ -618,7 +619,7 @@ func TestDeltaFIFO_ReplaceMakesDeletions(t *testing.T) { } for _, expected := range expectedList { - cur := Pop(f).(Deltas) + cur := Pop(context.Background(), f).(Deltas) if e, a := expected, cur; !reflect.DeepEqual(e, a) { t.Errorf("Expected %#v, got %#v", e, a) } @@ -648,7 +649,7 @@ func TestDeltaFIFO_ReplaceMakesDeletions(t *testing.T) { } for _, expected := range expectedList { - cur := Pop(f).(Deltas) + cur := Pop(context.Background(), f).(Deltas) if e, a := expected, cur; !reflect.DeepEqual(e, a) { t.Errorf("Expected %#v, got %#v", e, a) } @@ -679,7 +680,7 @@ func TestDeltaFIFO_ReplaceMakesDeletions(t *testing.T) { } for _, expected := range expectedList { - cur := Pop(f).(Deltas) + cur := Pop(context.Background(), f).(Deltas) if e, a := expected, cur; !reflect.DeepEqual(e, a) { t.Errorf("Expected %#v, got %#v", e, a) } @@ -697,7 +698,7 @@ func TestDeltaFIFO_ReplaceMakesDeletions(t *testing.T) { } for _, expected := range expectedList { - cur := Pop(f).(Deltas) + cur := Pop(context.Background(), f).(Deltas) if e, a := expected, cur; !reflect.DeepEqual(e, a) { t.Errorf("Expected %#v, got %#v", e, a) } @@ -727,7 +728,7 @@ func TestDeltaFIFO_ReplaceMakesDeletionsReplaced(t *testing.T) { } for _, expected := range expectedList { - cur := Pop(f).(Deltas) + cur := Pop(context.Background(), f).(Deltas) if e, a := expected, cur; !reflect.DeepEqual(e, a) { t.Errorf("Expected %#v, got %#v", e, a) } @@ -751,7 +752,7 @@ func TestDeltaFIFO_ReplaceDeltaType(t *testing.T) { } for _, expected := range expectedList { - cur := Pop(f).(Deltas) + cur := Pop(context.Background(), f).(Deltas) if e, a := expected, cur; !reflect.DeepEqual(e, a) { t.Errorf("Expected %#v, got %#v", e, a) } @@ -773,18 +774,18 @@ func TestDeltaFIFO_UpdateResyncRace(t *testing.T) { } for _, expected := range expectedList { - cur := Pop(f).(Deltas) + cur := Pop(context.Background(), f).(Deltas) if e, a := expected, cur; !reflect.DeepEqual(e, a) { t.Errorf("Expected %#v, got %#v", e, a) } } } -// pop2 captures both parameters, unlike Pop(). +// pop2 captures both parameters, unlike Pop(context.Background(), ). func pop2[T any](queue Queue) (T, bool) { var result interface{} var isList bool - queue.Pop(func(obj interface{}, isInInitialList bool) error { + queue.Pop(context.Background(), func(obj interface{}, isInInitialList bool) error { result = obj isList = isInInitialList return nil @@ -909,15 +910,15 @@ func TestDeltaFIFO_HasSynced(t *testing.T) { { actions: []func(f *DeltaFIFO){ func(f *DeltaFIFO) { f.Replace([]interface{}{mkFifoObj("a", 1), mkFifoObj("b", 2)}, "0") }, - func(f *DeltaFIFO) { Pop(f) }, + func(f *DeltaFIFO) { Pop(context.Background(), f) }, }, expectedSynced: false, }, { actions: []func(f *DeltaFIFO){ func(f *DeltaFIFO) { f.Replace([]interface{}{mkFifoObj("a", 1), mkFifoObj("b", 2)}, "0") }, - func(f *DeltaFIFO) { Pop(f) }, - func(f *DeltaFIFO) { Pop(f) }, + func(f *DeltaFIFO) { Pop(context.Background(), f) }, + func(f *DeltaFIFO) { Pop(context.Background(), f) }, }, expectedSynced: true, }, @@ -926,7 +927,7 @@ func TestDeltaFIFO_HasSynced(t *testing.T) { // there cannot be duplicate keys in the list or apiserver is broken. actions: []func(f *DeltaFIFO){ func(f *DeltaFIFO) { f.Replace([]interface{}{mkFifoObj("a", 1), mkFifoObj("a", 2)}, "0") }, - func(f *DeltaFIFO) { Pop(f) }, + func(f *DeltaFIFO) { Pop(context.Background(), f) }, }, expectedSynced: true, }, @@ -958,7 +959,7 @@ func TestDeltaFIFO_PopShouldUnblockWhenClosed(t *testing.T) { const jobs = 10 for i := 0; i < jobs; i++ { go func() { - f.Pop(func(obj interface{}, isInInitialList bool) error { + f.Pop(context.Background(), func(obj interface{}, isInInitialList bool) error { return nil }) c <- struct{}{} diff --git a/tools/cache/fifo_test.go b/tools/cache/fifo_test.go index 39c27c6d..327f9f2d 100644 --- a/tools/cache/fifo_test.go +++ b/tools/cache/fifo_test.go @@ -17,6 +17,7 @@ limitations under the License. package cache import ( + "context" "reflect" "runtime" "testing" @@ -97,7 +98,7 @@ func TestFIFO_basic(t *testing.T) { lastInt := int(0) lastUint := uint64(0) for i := 0; i < amount*2; i++ { - switch obj := Pop(f).(testFifoObject).val.(type) { + switch obj := Pop(context.Background(), f).(testFifoObject).val.(type) { case int: if obj <= lastInt { t.Errorf("got %v (int) out of order, last was %v", obj, lastInt) @@ -131,7 +132,7 @@ func TestFIFO_addUpdate(t *testing.T) { got := make(chan testFifoObject, 2) go func() { for { - obj := Pop(f) + obj := Pop(context.Background(), f) if obj == nil { return } @@ -162,7 +163,7 @@ func TestFIFO_addReplace(t *testing.T) { got := make(chan testFifoObject, 2) go func() { for { - obj := Pop(f) + obj := Pop(context.Background(), f) if obj == nil { return } @@ -194,21 +195,21 @@ func TestFIFO_detectLineJumpers(t *testing.T) { f.Add(mkFifoObj("foo", 13)) f.Add(mkFifoObj("zab", 30)) - if e, a := 13, Pop(f).(testFifoObject).val; a != e { + if e, a := 13, Pop(context.Background(), f).(testFifoObject).val; a != e { t.Fatalf("expected %d, got %d", e, a) } f.Add(mkFifoObj("foo", 14)) // ensure foo doesn't jump back in line - if e, a := 1, Pop(f).(testFifoObject).val; a != e { + if e, a := 1, Pop(context.Background(), f).(testFifoObject).val; a != e { t.Fatalf("expected %d, got %d", e, a) } - if e, a := 30, Pop(f).(testFifoObject).val; a != e { + if e, a := 30, Pop(context.Background(), f).(testFifoObject).val; a != e { t.Fatalf("expected %d, got %d", e, a) } - if e, a := 14, Pop(f).(testFifoObject).val; a != e { + if e, a := 14, Pop(context.Background(), f).(testFifoObject).val; a != e { t.Fatalf("expected %d, got %d", e, a) } } @@ -243,15 +244,15 @@ func TestFIFO_HasSynced(t *testing.T) { { actions: []func(f *FIFO){ func(f *FIFO) { f.Replace([]interface{}{mkFifoObj("a", 1), mkFifoObj("b", 2)}, "0") }, - func(f *FIFO) { Pop(f) }, + func(f *FIFO) { Pop(context.Background(), f) }, }, expectedSynced: false, }, { actions: []func(f *FIFO){ func(f *FIFO) { f.Replace([]interface{}{mkFifoObj("a", 1), mkFifoObj("b", 2)}, "0") }, - func(f *FIFO) { Pop(f) }, - func(f *FIFO) { Pop(f) }, + func(f *FIFO) { Pop(context.Background(), f) }, + func(f *FIFO) { Pop(context.Background(), f) }, }, expectedSynced: true, }, @@ -278,7 +279,7 @@ func TestFIFO_PopShouldUnblockWhenClosed(t *testing.T) { const jobs = 10 for i := 0; i < jobs; i++ { go func() { - f.Pop(func(obj interface{}, isInInitialList bool) error { + f.Pop(context.Background(), func(obj interface{}, isInInitialList bool) error { return nil }) c <- struct{}{} diff --git a/tools/cache/reflector_test.go b/tools/cache/reflector_test.go index 558cdcee..497cf45f 100644 --- a/tools/cache/reflector_test.go +++ b/tools/cache/reflector_test.go @@ -602,7 +602,7 @@ func TestReflectorListAndWatch(t *testing.T) { // Verify we received the right objects with the right resource versions. for _, expectedObj := range tc.expectedStore { - storeObj := Pop(s).(metav1.Object) + storeObj := Pop(context.Background(), s).(metav1.Object) assert.Equal(t, expectedObj.GetName(), storeObj.GetName()) assert.Equal(t, expectedObj.GetResourceVersion(), storeObj.GetResourceVersion()) } diff --git a/tools/cache/the_real_fifo_test.go b/tools/cache/the_real_fifo_test.go index 649ea368..7aa16863 100644 --- a/tools/cache/the_real_fifo_test.go +++ b/tools/cache/the_real_fifo_test.go @@ -17,6 +17,7 @@ limitations under the License. package cache import ( + "context" "fmt" "reflect" "runtime" @@ -38,7 +39,7 @@ const closedFIFOName = "FIFO WAS CLOSED" func popN(queue Queue, count int) []interface{} { result := []interface{}{} for i := 0; i < count; i++ { - queue.Pop(func(obj interface{}, isInInitialList bool) error { + queue.Pop(context.Background(), func(obj interface{}, isInInitialList bool) error { result = append(result, obj) return nil }) @@ -48,7 +49,7 @@ func popN(queue Queue, count int) []interface{} { // helper function to reduce stuttering func testRealFIFOPop(f *RealFIFO) testFifoObject { - val := Pop(f) + val := Pop(context.Background(), f) if val == nil { return testFifoObject{name: closedFIFOName} } @@ -404,7 +405,7 @@ func TestRealFIFO_transformer(t *testing.T) { } for i := 0; i < len(expected1); i++ { - obj, err := f.Pop(func(o interface{}, isInInitialList bool) error { return nil }) + obj, err := f.Pop(context.Background(), func(o interface{}, isInInitialList bool) error { return nil }) if err != nil { t.Fatalf("got nothing on try %v?", i) } @@ -593,7 +594,7 @@ func TestRealFIFO_ReplaceMakesDeletions(t *testing.T) { } for _, expected := range expectedList { - cur := Pop(f).(Deltas) + cur := Pop(context.Background(), f).(Deltas) if e, a := expected, cur; !reflect.DeepEqual(e, a) { t.Errorf("Expected %#v, got %#v", e, a) } @@ -622,7 +623,7 @@ func TestRealFIFO_ReplaceMakesDeletions(t *testing.T) { } for _, expected := range expectedList { - cur := Pop(f).(Deltas) + cur := Pop(context.Background(), f).(Deltas) if e, a := expected, cur; !reflect.DeepEqual(e, a) { t.Errorf("Expected %#v, got %#v", e, a) } @@ -653,7 +654,7 @@ func TestRealFIFO_ReplaceMakesDeletions(t *testing.T) { } for _, expected := range expectedList { - cur := Pop(f).(Deltas) + cur := Pop(context.Background(), f).(Deltas) if e, a := expected, cur; !reflect.DeepEqual(e, a) { t.Errorf("Expected %#v, got %#v", e, a) } @@ -685,7 +686,7 @@ func TestRealFIFO_ReplaceMakesDeletions(t *testing.T) { } for i, expected := range expectedList { - cur := Pop(f).(Deltas) + cur := Pop(context.Background(), f).(Deltas) if e, a := expected, cur; !reflect.DeepEqual(e, a) { t.Errorf("%d Expected %#v, got %#v", i, e, a) } @@ -707,7 +708,7 @@ func TestRealFIFO_ReplaceMakesDeletions(t *testing.T) { } for _, expected := range expectedList { - cur := Pop(f).(Deltas) + cur := Pop(context.Background(), f).(Deltas) if e, a := expected, cur; !reflect.DeepEqual(e, a) { t.Errorf("Expected %#v, got %#v", e, a) } @@ -738,7 +739,7 @@ func TestRealFIFO_ReplaceMakesDeletionsReplaced(t *testing.T) { } for _, expected := range expectedList { - cur := Pop(f).(Deltas) + cur := Pop(context.Background(), f).(Deltas) if e, a := expected, cur; !reflect.DeepEqual(e, a) { t.Errorf("Expected %#v, got %#v", e, a) } @@ -764,7 +765,7 @@ func TestRealFIFO_UpdateResyncRace(t *testing.T) { } for _, expected := range expectedList { - cur := Pop(f).(Deltas) + cur := Pop(context.Background(), f).(Deltas) if e, a := expected, cur; !reflect.DeepEqual(e, a) { t.Errorf("Expected %#v, got %#v", e, a) } @@ -900,15 +901,15 @@ func TestRealFIFO_HasSynced(t *testing.T) { { actions: []func(f *RealFIFO){ func(f *RealFIFO) { f.Replace([]interface{}{mkFifoObj("a", 1), mkFifoObj("b", 2)}, "0") }, - func(f *RealFIFO) { Pop(f) }, + func(f *RealFIFO) { Pop(context.Background(), f) }, }, expectedSynced: false, }, { actions: []func(f *RealFIFO){ func(f *RealFIFO) { f.Replace([]interface{}{mkFifoObj("a", 1), mkFifoObj("b", 2)}, "0") }, - func(f *RealFIFO) { Pop(f) }, - func(f *RealFIFO) { Pop(f) }, + func(f *RealFIFO) { Pop(context.Background(), f) }, + func(f *RealFIFO) { Pop(context.Background(), f) }, }, expectedSynced: true, }, @@ -917,9 +918,9 @@ func TestRealFIFO_HasSynced(t *testing.T) { // there cannot be duplicate keys in the list or apiserver is broken. actions: []func(f *RealFIFO){ func(f *RealFIFO) { f.Replace([]interface{}{mkFifoObj("a", 1), mkFifoObj("a", 2)}, "0") }, - func(f *RealFIFO) { Pop(f) }, + func(f *RealFIFO) { Pop(context.Background(), f) }, // ATTENTION: difference with delta_fifo_test, every event is delivered, so a is listed twice and must be popped twice to remove both - func(f *RealFIFO) { Pop(f) }, + func(f *RealFIFO) { Pop(context.Background(), f) }, }, expectedSynced: true, }, @@ -956,7 +957,7 @@ func TestRealFIFO_PopShouldUnblockWhenClosed(t *testing.T) { const jobs = 10 for i := 0; i < jobs; i++ { go func() { - f.Pop(func(obj interface{}, isInInitialList bool) error { + f.Pop(context.Background(), func(obj interface{}, isInInitialList bool) error { return nil }) c <- struct{}{} From bc193638216a4bf729909eec91919fdc9e05b000 Mon Sep 17 00:00:00 2001 From: Keisuke Ishigami Date: Thu, 26 Jun 2025 23:49:25 +0900 Subject: [PATCH 4/6] Revert "modify tests" This reverts commit 2cd5dbbdaab98020b37df7125f622db7a6dd885a. Kubernetes-commit: 2dcce93336e52eb16aab595fcc75254f4abfa5ae --- tools/cache/delta_fifo_test.go | 43 +++++++++++++++---------------- tools/cache/fifo_test.go | 23 ++++++++--------- tools/cache/reflector_test.go | 2 +- tools/cache/the_real_fifo_test.go | 33 ++++++++++++------------ 4 files changed, 49 insertions(+), 52 deletions(-) diff --git a/tools/cache/delta_fifo_test.go b/tools/cache/delta_fifo_test.go index b3e80897..9c67012c 100644 --- a/tools/cache/delta_fifo_test.go +++ b/tools/cache/delta_fifo_test.go @@ -17,7 +17,6 @@ limitations under the License. package cache import ( - "context" "fmt" "reflect" "runtime" @@ -87,12 +86,12 @@ func (f *DeltaFIFO) GetByKey(key string) (item interface{}, exists bool, err err // helper function to reduce stuttering func testPop(f *DeltaFIFO) testFifoObject { - return Pop(context.Background(), f).(Deltas).Newest().Object.(testFifoObject) + return Pop(f).(Deltas).Newest().Object.(testFifoObject) } // testPopIfAvailable returns `{}, false` if Pop returns a nil object func testPopIfAvailable(f *DeltaFIFO) (testFifoObject, bool) { - obj := Pop(context.Background(), f) + obj := Pop(f) if obj == nil { return testFifoObject{}, false } @@ -180,7 +179,7 @@ func TestDeltaFIFO_replaceWithDeleteDeltaIn(t *testing.T) { f.Delete(oldObj) f.Replace([]interface{}{newObj}, "") - actualDeltas := Pop(context.Background(), f) + actualDeltas := Pop(f) expectedDeltas := Deltas{ Delta{Type: Deleted, Object: oldObj}, Delta{Type: Sync, Object: newObj}, @@ -290,7 +289,7 @@ func TestDeltaFIFOW_ReplaceMakesDeletionsForObjectsOnlyInQueue(t *testing.T) { }), }) tt.operations(fWithKnownObjects) - actualDeltasWithKnownObjects := Pop(context.Background(), fWithKnownObjects) + actualDeltasWithKnownObjects := Pop(fWithKnownObjects) if !reflect.DeepEqual(tt.expectedDeltas, actualDeltasWithKnownObjects) { t.Errorf("expected %#v, got %#v", tt.expectedDeltas, actualDeltasWithKnownObjects) } @@ -303,7 +302,7 @@ func TestDeltaFIFOW_ReplaceMakesDeletionsForObjectsOnlyInQueue(t *testing.T) { KeyFunction: testFifoObjectKeyFunc, }) tt.operations(fWithoutKnownObjects) - actualDeltasWithoutKnownObjects := Pop(context.Background(), fWithoutKnownObjects) + actualDeltasWithoutKnownObjects := Pop(fWithoutKnownObjects) if !reflect.DeepEqual(tt.expectedDeltas, actualDeltasWithoutKnownObjects) { t.Errorf("expected %#v, got %#v", tt.expectedDeltas, actualDeltasWithoutKnownObjects) } @@ -403,7 +402,7 @@ func TestDeltaFIFO_transformer(t *testing.T) { } for i := 0; i < 2; i++ { - obj, err := f.Pop(context.Background(), func(o interface{}, isInInitialList bool) error { return nil }) + obj, err := f.Pop(func(o interface{}, isInInitialList bool) error { return nil }) if err != nil { t.Fatalf("got nothing on try %v?", i) } @@ -593,7 +592,7 @@ func TestDeltaFIFO_ReplaceMakesDeletions(t *testing.T) { } for _, expected := range expectedList { - cur := Pop(context.Background(), f).(Deltas) + cur := Pop(f).(Deltas) if e, a := expected, cur; !reflect.DeepEqual(e, a) { t.Errorf("Expected %#v, got %#v", e, a) } @@ -619,7 +618,7 @@ func TestDeltaFIFO_ReplaceMakesDeletions(t *testing.T) { } for _, expected := range expectedList { - cur := Pop(context.Background(), f).(Deltas) + cur := Pop(f).(Deltas) if e, a := expected, cur; !reflect.DeepEqual(e, a) { t.Errorf("Expected %#v, got %#v", e, a) } @@ -649,7 +648,7 @@ func TestDeltaFIFO_ReplaceMakesDeletions(t *testing.T) { } for _, expected := range expectedList { - cur := Pop(context.Background(), f).(Deltas) + cur := Pop(f).(Deltas) if e, a := expected, cur; !reflect.DeepEqual(e, a) { t.Errorf("Expected %#v, got %#v", e, a) } @@ -680,7 +679,7 @@ func TestDeltaFIFO_ReplaceMakesDeletions(t *testing.T) { } for _, expected := range expectedList { - cur := Pop(context.Background(), f).(Deltas) + cur := Pop(f).(Deltas) if e, a := expected, cur; !reflect.DeepEqual(e, a) { t.Errorf("Expected %#v, got %#v", e, a) } @@ -698,7 +697,7 @@ func TestDeltaFIFO_ReplaceMakesDeletions(t *testing.T) { } for _, expected := range expectedList { - cur := Pop(context.Background(), f).(Deltas) + cur := Pop(f).(Deltas) if e, a := expected, cur; !reflect.DeepEqual(e, a) { t.Errorf("Expected %#v, got %#v", e, a) } @@ -728,7 +727,7 @@ func TestDeltaFIFO_ReplaceMakesDeletionsReplaced(t *testing.T) { } for _, expected := range expectedList { - cur := Pop(context.Background(), f).(Deltas) + cur := Pop(f).(Deltas) if e, a := expected, cur; !reflect.DeepEqual(e, a) { t.Errorf("Expected %#v, got %#v", e, a) } @@ -752,7 +751,7 @@ func TestDeltaFIFO_ReplaceDeltaType(t *testing.T) { } for _, expected := range expectedList { - cur := Pop(context.Background(), f).(Deltas) + cur := Pop(f).(Deltas) if e, a := expected, cur; !reflect.DeepEqual(e, a) { t.Errorf("Expected %#v, got %#v", e, a) } @@ -774,18 +773,18 @@ func TestDeltaFIFO_UpdateResyncRace(t *testing.T) { } for _, expected := range expectedList { - cur := Pop(context.Background(), f).(Deltas) + cur := Pop(f).(Deltas) if e, a := expected, cur; !reflect.DeepEqual(e, a) { t.Errorf("Expected %#v, got %#v", e, a) } } } -// pop2 captures both parameters, unlike Pop(context.Background(), ). +// pop2 captures both parameters, unlike Pop(). func pop2[T any](queue Queue) (T, bool) { var result interface{} var isList bool - queue.Pop(context.Background(), func(obj interface{}, isInInitialList bool) error { + queue.Pop(func(obj interface{}, isInInitialList bool) error { result = obj isList = isInInitialList return nil @@ -910,15 +909,15 @@ func TestDeltaFIFO_HasSynced(t *testing.T) { { actions: []func(f *DeltaFIFO){ func(f *DeltaFIFO) { f.Replace([]interface{}{mkFifoObj("a", 1), mkFifoObj("b", 2)}, "0") }, - func(f *DeltaFIFO) { Pop(context.Background(), f) }, + func(f *DeltaFIFO) { Pop(f) }, }, expectedSynced: false, }, { actions: []func(f *DeltaFIFO){ func(f *DeltaFIFO) { f.Replace([]interface{}{mkFifoObj("a", 1), mkFifoObj("b", 2)}, "0") }, - func(f *DeltaFIFO) { Pop(context.Background(), f) }, - func(f *DeltaFIFO) { Pop(context.Background(), f) }, + func(f *DeltaFIFO) { Pop(f) }, + func(f *DeltaFIFO) { Pop(f) }, }, expectedSynced: true, }, @@ -927,7 +926,7 @@ func TestDeltaFIFO_HasSynced(t *testing.T) { // there cannot be duplicate keys in the list or apiserver is broken. actions: []func(f *DeltaFIFO){ func(f *DeltaFIFO) { f.Replace([]interface{}{mkFifoObj("a", 1), mkFifoObj("a", 2)}, "0") }, - func(f *DeltaFIFO) { Pop(context.Background(), f) }, + func(f *DeltaFIFO) { Pop(f) }, }, expectedSynced: true, }, @@ -959,7 +958,7 @@ func TestDeltaFIFO_PopShouldUnblockWhenClosed(t *testing.T) { const jobs = 10 for i := 0; i < jobs; i++ { go func() { - f.Pop(context.Background(), func(obj interface{}, isInInitialList bool) error { + f.Pop(func(obj interface{}, isInInitialList bool) error { return nil }) c <- struct{}{} diff --git a/tools/cache/fifo_test.go b/tools/cache/fifo_test.go index 327f9f2d..39c27c6d 100644 --- a/tools/cache/fifo_test.go +++ b/tools/cache/fifo_test.go @@ -17,7 +17,6 @@ limitations under the License. package cache import ( - "context" "reflect" "runtime" "testing" @@ -98,7 +97,7 @@ func TestFIFO_basic(t *testing.T) { lastInt := int(0) lastUint := uint64(0) for i := 0; i < amount*2; i++ { - switch obj := Pop(context.Background(), f).(testFifoObject).val.(type) { + switch obj := Pop(f).(testFifoObject).val.(type) { case int: if obj <= lastInt { t.Errorf("got %v (int) out of order, last was %v", obj, lastInt) @@ -132,7 +131,7 @@ func TestFIFO_addUpdate(t *testing.T) { got := make(chan testFifoObject, 2) go func() { for { - obj := Pop(context.Background(), f) + obj := Pop(f) if obj == nil { return } @@ -163,7 +162,7 @@ func TestFIFO_addReplace(t *testing.T) { got := make(chan testFifoObject, 2) go func() { for { - obj := Pop(context.Background(), f) + obj := Pop(f) if obj == nil { return } @@ -195,21 +194,21 @@ func TestFIFO_detectLineJumpers(t *testing.T) { f.Add(mkFifoObj("foo", 13)) f.Add(mkFifoObj("zab", 30)) - if e, a := 13, Pop(context.Background(), f).(testFifoObject).val; a != e { + if e, a := 13, Pop(f).(testFifoObject).val; a != e { t.Fatalf("expected %d, got %d", e, a) } f.Add(mkFifoObj("foo", 14)) // ensure foo doesn't jump back in line - if e, a := 1, Pop(context.Background(), f).(testFifoObject).val; a != e { + if e, a := 1, Pop(f).(testFifoObject).val; a != e { t.Fatalf("expected %d, got %d", e, a) } - if e, a := 30, Pop(context.Background(), f).(testFifoObject).val; a != e { + if e, a := 30, Pop(f).(testFifoObject).val; a != e { t.Fatalf("expected %d, got %d", e, a) } - if e, a := 14, Pop(context.Background(), f).(testFifoObject).val; a != e { + if e, a := 14, Pop(f).(testFifoObject).val; a != e { t.Fatalf("expected %d, got %d", e, a) } } @@ -244,15 +243,15 @@ func TestFIFO_HasSynced(t *testing.T) { { actions: []func(f *FIFO){ func(f *FIFO) { f.Replace([]interface{}{mkFifoObj("a", 1), mkFifoObj("b", 2)}, "0") }, - func(f *FIFO) { Pop(context.Background(), f) }, + func(f *FIFO) { Pop(f) }, }, expectedSynced: false, }, { actions: []func(f *FIFO){ func(f *FIFO) { f.Replace([]interface{}{mkFifoObj("a", 1), mkFifoObj("b", 2)}, "0") }, - func(f *FIFO) { Pop(context.Background(), f) }, - func(f *FIFO) { Pop(context.Background(), f) }, + func(f *FIFO) { Pop(f) }, + func(f *FIFO) { Pop(f) }, }, expectedSynced: true, }, @@ -279,7 +278,7 @@ func TestFIFO_PopShouldUnblockWhenClosed(t *testing.T) { const jobs = 10 for i := 0; i < jobs; i++ { go func() { - f.Pop(context.Background(), func(obj interface{}, isInInitialList bool) error { + f.Pop(func(obj interface{}, isInInitialList bool) error { return nil }) c <- struct{}{} diff --git a/tools/cache/reflector_test.go b/tools/cache/reflector_test.go index 497cf45f..558cdcee 100644 --- a/tools/cache/reflector_test.go +++ b/tools/cache/reflector_test.go @@ -602,7 +602,7 @@ func TestReflectorListAndWatch(t *testing.T) { // Verify we received the right objects with the right resource versions. for _, expectedObj := range tc.expectedStore { - storeObj := Pop(context.Background(), s).(metav1.Object) + storeObj := Pop(s).(metav1.Object) assert.Equal(t, expectedObj.GetName(), storeObj.GetName()) assert.Equal(t, expectedObj.GetResourceVersion(), storeObj.GetResourceVersion()) } diff --git a/tools/cache/the_real_fifo_test.go b/tools/cache/the_real_fifo_test.go index 7aa16863..649ea368 100644 --- a/tools/cache/the_real_fifo_test.go +++ b/tools/cache/the_real_fifo_test.go @@ -17,7 +17,6 @@ limitations under the License. package cache import ( - "context" "fmt" "reflect" "runtime" @@ -39,7 +38,7 @@ const closedFIFOName = "FIFO WAS CLOSED" func popN(queue Queue, count int) []interface{} { result := []interface{}{} for i := 0; i < count; i++ { - queue.Pop(context.Background(), func(obj interface{}, isInInitialList bool) error { + queue.Pop(func(obj interface{}, isInInitialList bool) error { result = append(result, obj) return nil }) @@ -49,7 +48,7 @@ func popN(queue Queue, count int) []interface{} { // helper function to reduce stuttering func testRealFIFOPop(f *RealFIFO) testFifoObject { - val := Pop(context.Background(), f) + val := Pop(f) if val == nil { return testFifoObject{name: closedFIFOName} } @@ -405,7 +404,7 @@ func TestRealFIFO_transformer(t *testing.T) { } for i := 0; i < len(expected1); i++ { - obj, err := f.Pop(context.Background(), func(o interface{}, isInInitialList bool) error { return nil }) + obj, err := f.Pop(func(o interface{}, isInInitialList bool) error { return nil }) if err != nil { t.Fatalf("got nothing on try %v?", i) } @@ -594,7 +593,7 @@ func TestRealFIFO_ReplaceMakesDeletions(t *testing.T) { } for _, expected := range expectedList { - cur := Pop(context.Background(), f).(Deltas) + cur := Pop(f).(Deltas) if e, a := expected, cur; !reflect.DeepEqual(e, a) { t.Errorf("Expected %#v, got %#v", e, a) } @@ -623,7 +622,7 @@ func TestRealFIFO_ReplaceMakesDeletions(t *testing.T) { } for _, expected := range expectedList { - cur := Pop(context.Background(), f).(Deltas) + cur := Pop(f).(Deltas) if e, a := expected, cur; !reflect.DeepEqual(e, a) { t.Errorf("Expected %#v, got %#v", e, a) } @@ -654,7 +653,7 @@ func TestRealFIFO_ReplaceMakesDeletions(t *testing.T) { } for _, expected := range expectedList { - cur := Pop(context.Background(), f).(Deltas) + cur := Pop(f).(Deltas) if e, a := expected, cur; !reflect.DeepEqual(e, a) { t.Errorf("Expected %#v, got %#v", e, a) } @@ -686,7 +685,7 @@ func TestRealFIFO_ReplaceMakesDeletions(t *testing.T) { } for i, expected := range expectedList { - cur := Pop(context.Background(), f).(Deltas) + cur := Pop(f).(Deltas) if e, a := expected, cur; !reflect.DeepEqual(e, a) { t.Errorf("%d Expected %#v, got %#v", i, e, a) } @@ -708,7 +707,7 @@ func TestRealFIFO_ReplaceMakesDeletions(t *testing.T) { } for _, expected := range expectedList { - cur := Pop(context.Background(), f).(Deltas) + cur := Pop(f).(Deltas) if e, a := expected, cur; !reflect.DeepEqual(e, a) { t.Errorf("Expected %#v, got %#v", e, a) } @@ -739,7 +738,7 @@ func TestRealFIFO_ReplaceMakesDeletionsReplaced(t *testing.T) { } for _, expected := range expectedList { - cur := Pop(context.Background(), f).(Deltas) + cur := Pop(f).(Deltas) if e, a := expected, cur; !reflect.DeepEqual(e, a) { t.Errorf("Expected %#v, got %#v", e, a) } @@ -765,7 +764,7 @@ func TestRealFIFO_UpdateResyncRace(t *testing.T) { } for _, expected := range expectedList { - cur := Pop(context.Background(), f).(Deltas) + cur := Pop(f).(Deltas) if e, a := expected, cur; !reflect.DeepEqual(e, a) { t.Errorf("Expected %#v, got %#v", e, a) } @@ -901,15 +900,15 @@ func TestRealFIFO_HasSynced(t *testing.T) { { actions: []func(f *RealFIFO){ func(f *RealFIFO) { f.Replace([]interface{}{mkFifoObj("a", 1), mkFifoObj("b", 2)}, "0") }, - func(f *RealFIFO) { Pop(context.Background(), f) }, + func(f *RealFIFO) { Pop(f) }, }, expectedSynced: false, }, { actions: []func(f *RealFIFO){ func(f *RealFIFO) { f.Replace([]interface{}{mkFifoObj("a", 1), mkFifoObj("b", 2)}, "0") }, - func(f *RealFIFO) { Pop(context.Background(), f) }, - func(f *RealFIFO) { Pop(context.Background(), f) }, + func(f *RealFIFO) { Pop(f) }, + func(f *RealFIFO) { Pop(f) }, }, expectedSynced: true, }, @@ -918,9 +917,9 @@ func TestRealFIFO_HasSynced(t *testing.T) { // there cannot be duplicate keys in the list or apiserver is broken. actions: []func(f *RealFIFO){ func(f *RealFIFO) { f.Replace([]interface{}{mkFifoObj("a", 1), mkFifoObj("a", 2)}, "0") }, - func(f *RealFIFO) { Pop(context.Background(), f) }, + func(f *RealFIFO) { Pop(f) }, // ATTENTION: difference with delta_fifo_test, every event is delivered, so a is listed twice and must be popped twice to remove both - func(f *RealFIFO) { Pop(context.Background(), f) }, + func(f *RealFIFO) { Pop(f) }, }, expectedSynced: true, }, @@ -957,7 +956,7 @@ func TestRealFIFO_PopShouldUnblockWhenClosed(t *testing.T) { const jobs = 10 for i := 0; i < jobs; i++ { go func() { - f.Pop(context.Background(), func(obj interface{}, isInInitialList bool) error { + f.Pop(func(obj interface{}, isInInitialList bool) error { return nil }) c <- struct{}{} From fc748aa158fcf9cf1a0df1a87e2e9781d768aac9 Mon Sep 17 00:00:00 2001 From: Keisuke Ishigami Date: Thu, 26 Jun 2025 23:49:34 +0900 Subject: [PATCH 5/6] Revert "pop respects the context" This reverts commit 1c33d98762511c10f89c40358a1935250b03b0c8. Kubernetes-commit: 74af3ac8ad1122528bb9971c3a2d282eff529beb --- tools/cache/controller.go | 4 ++-- tools/cache/delta_fifo.go | 8 +------- tools/cache/fifo.go | 21 ++++++--------------- tools/cache/the_real_fifo.go | 8 +------- 4 files changed, 10 insertions(+), 31 deletions(-) diff --git a/tools/cache/controller.go b/tools/cache/controller.go index 8bafcb67..f0a483b6 100644 --- a/tools/cache/controller.go +++ b/tools/cache/controller.go @@ -208,9 +208,9 @@ func (c *controller) processLoop(ctx context.Context) { case <-ctx.Done(): return default: - _, err := c.config.Queue.Pop(ctx, PopProcessFunc(c.config.Process)) + _, err := c.config.Queue.Pop(PopProcessFunc(c.config.Process)) if err != nil { - if errors.Is(err, ErrFIFOClosed) || errors.Is(err, ErrCtxDone) { + if err == ErrFIFOClosed { return } } diff --git a/tools/cache/delta_fifo.go b/tools/cache/delta_fifo.go index bfa4deb1..9d9e238c 100644 --- a/tools/cache/delta_fifo.go +++ b/tools/cache/delta_fifo.go @@ -17,7 +17,6 @@ limitations under the License. package cache import ( - "context" "errors" "fmt" "sync" @@ -496,16 +495,11 @@ func (f *DeltaFIFO) IsClosed() bool { // // Pop returns a 'Deltas', which has a complete list of all the things // that happened to the object (deltas) while it was sitting in the queue. -func (f *DeltaFIFO) Pop(ctx context.Context, process PopProcessFunc) (interface{}, error) { +func (f *DeltaFIFO) Pop(process PopProcessFunc) (interface{}, error) { f.lock.Lock() defer f.lock.Unlock() for { for len(f.queue) == 0 { - // if the length of the queue is 0 and ctx is done, return here - if ctx.Err() != nil { - return nil, ErrCtxDone - } - // When the queue is empty, invocation of Pop() is blocked until new item is enqueued. // When Close() is called, the f.closed is set and the condition is broadcasted. // Which causes this loop to continue and return from the Pop(). diff --git a/tools/cache/fifo.go b/tools/cache/fifo.go index fde34edc..5c2ca900 100644 --- a/tools/cache/fifo.go +++ b/tools/cache/fifo.go @@ -17,7 +17,6 @@ limitations under the License. package cache import ( - "context" "errors" "sync" @@ -31,9 +30,6 @@ type PopProcessFunc func(obj interface{}, isInInitialList bool) error // ErrFIFOClosed used when FIFO is closed var ErrFIFOClosed = errors.New("DeltaFIFO: manipulating with closed queue") -// ErrCtxDone is used when context is done -var ErrCtxDone = errors.New("DeltaFIFO: context done") - // Queue extends ReflectorStore with a collection of Store keys to "process". // Every Add, Update, or Delete may put the object's key in that collection. // A Queue has a way to derive the corresponding key given an accumulator. @@ -42,8 +38,8 @@ var ErrCtxDone = errors.New("DeltaFIFO: context done") type Queue interface { ReflectorStore - // Pop blocks until there is at least one key to process, ctx is done, - // or the Queue is closed. In the latter case Pop returns with an error. + // Pop blocks until there is at least one key to process or the + // Queue is closed. In the latter case Pop returns with an error. // In the former case Pop atomically picks one key to process, // removes that (key, accumulator) association from the Store, and // processes the accumulator. Pop returns the accumulator that @@ -52,7 +48,7 @@ type Queue interface { // return that (key, accumulator) association to the Queue as part // of the atomic processing and (b) return the inner error from // Pop. - Pop(context.Context, PopProcessFunc) (interface{}, error) + Pop(PopProcessFunc) (interface{}, error) // HasSynced returns true if the first batch of keys have all been // popped. The first batch of keys are those of the first Replace @@ -70,9 +66,9 @@ type Queue interface { // // NOTE: This function is deprecated and may be removed in the future without // additional warning. -func Pop(ctx context.Context, queue Queue) interface{} { +func Pop(queue Queue) interface{} { var result interface{} - queue.Pop(ctx, func(obj interface{}, isInInitialList bool) error { + queue.Pop(func(obj interface{}, isInInitialList bool) error { result = obj return nil }) @@ -195,16 +191,11 @@ func (f *FIFO) IsClosed() bool { // so if you don't successfully process it, it should be added back with // AddIfNotPresent(). process function is called under lock, so it is safe // update data structures in it that need to be in sync with the queue. -func (f *FIFO) Pop(ctx context.Context, process PopProcessFunc) (interface{}, error) { +func (f *FIFO) Pop(process PopProcessFunc) (interface{}, error) { f.lock.Lock() defer f.lock.Unlock() for { for len(f.queue) == 0 { - // if the length of the queue is 0 and ctx is done, return here - if ctx.Err() != nil { - return nil, ErrCtxDone - } - // When the queue is empty, invocation of Pop() is blocked until new item is enqueued. // When Close() is called, the f.closed is set and the condition is broadcasted. // Which causes this loop to continue and return from the Pop(). diff --git a/tools/cache/the_real_fifo.go b/tools/cache/the_real_fifo.go index 6d8f76ec..ef322bea 100644 --- a/tools/cache/the_real_fifo.go +++ b/tools/cache/the_real_fifo.go @@ -17,7 +17,6 @@ limitations under the License. package cache import ( - "context" "fmt" "sync" "time" @@ -193,16 +192,11 @@ func (f *RealFIFO) IsClosed() bool { // The item is removed from the queue (and the store) before it is processed. // process function is called under lock, so it is safe // update data structures in it that need to be in sync with the queue. -func (f *RealFIFO) Pop(ctx context.Context, process PopProcessFunc) (interface{}, error) { +func (f *RealFIFO) Pop(process PopProcessFunc) (interface{}, error) { f.lock.Lock() defer f.lock.Unlock() for len(f.items) == 0 { - // if the length of the queue is 0 and ctx is done, return here - if ctx.Err() != nil { - return nil, ErrCtxDone - } - // When the queue is empty, invocation of Pop() is blocked until new item is enqueued. // When Close() is called, the f.closed is set and the condition is broadcasted. // Which causes this loop to continue and return from the Pop(). From d9cda8802e17eb835d51a5bc010ac60803baef20 Mon Sep 17 00:00:00 2001 From: Keisuke Ishigami Date: Fri, 27 Jun 2025 00:49:02 +0900 Subject: [PATCH 6/6] resolve linter check Kubernetes-commit: 5cca03792748714547c39330ac9cb8cb9c7c60ae --- tools/cache/controller.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tools/cache/controller.go b/tools/cache/controller.go index f0a483b6..5f983b6b 100644 --- a/tools/cache/controller.go +++ b/tools/cache/controller.go @@ -208,9 +208,9 @@ func (c *controller) processLoop(ctx context.Context) { case <-ctx.Done(): return default: - _, err := c.config.Queue.Pop(PopProcessFunc(c.config.Process)) + _, err := c.config.Pop(PopProcessFunc(c.config.Process)) if err != nil { - if err == ErrFIFOClosed { + if errors.Is(err, ErrFIFOClosed) { return } }