From 639f86915b2a75f0839a0e57434a244af1427263 Mon Sep 17 00:00:00 2001 From: Patrick Ohly Date: Mon, 1 Apr 2024 14:50:58 +0200 Subject: [PATCH] scheduler: add FIFO queue This is a basic implementation of a first-in-first-out queue with unbounded size. It's useful for cases where a channel with fixed size might deadlock. The caller is responsible for locking. --- pkg/scheduler/util/queue/fifo.go | 110 ++++++++++++++++++++++++ pkg/scheduler/util/queue/fifo_test.go | 117 ++++++++++++++++++++++++++ 2 files changed, 227 insertions(+) create mode 100644 pkg/scheduler/util/queue/fifo.go create mode 100644 pkg/scheduler/util/queue/fifo_test.go diff --git a/pkg/scheduler/util/queue/fifo.go b/pkg/scheduler/util/queue/fifo.go new file mode 100644 index 00000000000..ee66733fe43 --- /dev/null +++ b/pkg/scheduler/util/queue/fifo.go @@ -0,0 +1,110 @@ +/* +Copyright 2024 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package queue + +const ( + // normalSize limits the size of the buffer that is kept + // for reuse. + normalSize = 4 +) + +// FIFO implements a first-in-first-out queue with unbounded size. +// The null FIFO is a valid empty queue. +// +// Access must be protected by the caller when used concurrently by +// different goroutines, the queue itself implements no locking. +type FIFO[T any] struct { + // elements contains a buffer for elements which have been + // pushed and not popped yet. Two scenarios are possible: + // - one chunk in the middle (start <= end) + // - one chunk at the end, followed by one chunk at the + // beginning (end <= start) + // + // start == end can be either an empty queue or a completely + // full one (with two chunks). + elements []T + + // len counts the number of elements which have been pushed and + // not popped yet. + len int + + // start is the index of the first valid element. + start int + + // end is the index after the last valid element. + end int +} + +func (q *FIFO[T]) Len() int { + return q.len +} + +func (q *FIFO[T]) Push(element T) { + size := len(q.elements) + if q.len == size { + // Need larger buffer. + newSize := size * 2 + if newSize == 0 { + newSize = normalSize + } + elements := make([]T, newSize) + if q.start == 0 { + copy(elements, q.elements) + } else { + copy(elements, q.elements[q.start:]) + copy(elements[len(q.elements)-q.start:], q.elements[0:q.end]) + } + q.start = 0 + q.end = q.len + q.elements = elements + size = newSize + } + if q.end == size { + // Wrap around. + q.elements[0] = element + q.end = 1 + q.len++ + return + } + q.elements[q.end] = element + q.end++ + q.len++ +} + +func (q *FIFO[T]) Pop() (element T, ok bool) { + if q.len == 0 { + return + } + element = q.elements[q.start] + q.start++ + if q.start == len(q.elements) { + // Wrap around. + q.start = 0 + } + q.len-- + + // Once it is empty, shrink down to avoid hanging onto + // a large buffer forever. + if q.len == 0 && len(q.elements) > normalSize { + q.elements = make([]T, normalSize) + q.start = 0 + q.end = 0 + } + + ok = true + return +} diff --git a/pkg/scheduler/util/queue/fifo_test.go b/pkg/scheduler/util/queue/fifo_test.go new file mode 100644 index 00000000000..3d272a90248 --- /dev/null +++ b/pkg/scheduler/util/queue/fifo_test.go @@ -0,0 +1,117 @@ +/* +Copyright 2024 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package queue + +import ( + "testing" + + "github.com/stretchr/testify/require" +) + +func verifyPop(t *testing.T, expectedValue int, expectedOk bool, queue *FIFO[int]) { + t.Helper() + actual, ok := queue.Pop() + require.Equal(t, expectedOk, ok) + require.Equal(t, expectedValue, actual) +} + +func verifyEmpty(t *testing.T, queue *FIFO[int]) { + t.Helper() + require.Equal(t, 0, queue.Len()) + verifyPop(t, 0, false, queue) +} + +func TestNull(t *testing.T) { + var queue FIFO[int] + verifyEmpty(t, &queue) +} + +func TestOnePushPop(t *testing.T) { + var queue FIFO[int] + + expected := 10 + queue.Push(10) + require.Equal(t, 1, queue.Len()) + verifyPop(t, expected, true, &queue) + verifyEmpty(t, &queue) +} + +// Pushes some elements, pops all of them, then the same again. +func TestWrapAroundEmpty(t *testing.T) { + var queue FIFO[int] + + for i := 0; i < 5; i++ { + queue.Push(i) + } + require.Equal(t, 5, queue.Len()) + for i := 0; i < 5; i++ { + verifyPop(t, i, true, &queue) + } + verifyEmpty(t, &queue) + + for i := 5; i < 10; i++ { + queue.Push(i) + } + for i := 5; i < 10; i++ { + verifyPop(t, i, true, &queue) + } + verifyEmpty(t, &queue) +} + +// Pushes some elements, pops one, adds more, then pops all. +func TestWrapAroundPartial(t *testing.T) { + var queue FIFO[int] + + for i := 0; i < 5; i++ { + queue.Push(i) + } + require.Equal(t, 5, queue.Len()) + verifyPop(t, 0, true, &queue) + + for i := 5; i < 10; i++ { + queue.Push(i) + } + for i := 1; i < 10; i++ { + verifyPop(t, i, true, &queue) + } + verifyEmpty(t, &queue) +} + +// Push an unusual amount of elements, pop all, and verify that +// the FIFO shrinks back again. +func TestShrink(t *testing.T) { + var queue FIFO[int] + + for i := 0; i < normalSize*2; i++ { + queue.Push(i) + } + require.Equal(t, normalSize*2, queue.Len()) + require.LessOrEqual(t, 2*normalSize, len(queue.elements)) + + // Pop all, should be shrunken when done. + for i := 0; i < normalSize*2; i++ { + verifyPop(t, i, true, &queue) + } + require.Equal(t, 0, queue.Len()) + require.Equal(t, normalSize, len(queue.elements)) + + // Still usable after shrinking? + queue.Push(42) + verifyPop(t, 42, true, &queue) + require.Equal(t, 0, queue.Len()) + require.Equal(t, normalSize, len(queue.elements)) +}