scheduler: add FIFO queue

This is a basic implementation of a first-in-first-out queue with unbounded
size. It's useful for cases where a channel with fixed size might deadlock.

The caller is responsible for locking.
This commit is contained in:
Patrick Ohly 2024-04-01 14:50:58 +02:00
parent bae83009d3
commit 639f86915b
2 changed files with 227 additions and 0 deletions

View File

@ -0,0 +1,110 @@
/*
Copyright 2024 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package queue
const (
// normalSize limits the size of the buffer that is kept
// for reuse.
normalSize = 4
)
// FIFO implements a first-in-first-out queue with unbounded size.
// The null FIFO is a valid empty queue.
//
// Access must be protected by the caller when used concurrently by
// different goroutines, the queue itself implements no locking.
type FIFO[T any] struct {
// elements contains a buffer for elements which have been
// pushed and not popped yet. Two scenarios are possible:
// - one chunk in the middle (start <= end)
// - one chunk at the end, followed by one chunk at the
// beginning (end <= start)
//
// start == end can be either an empty queue or a completely
// full one (with two chunks).
elements []T
// len counts the number of elements which have been pushed and
// not popped yet.
len int
// start is the index of the first valid element.
start int
// end is the index after the last valid element.
end int
}
func (q *FIFO[T]) Len() int {
return q.len
}
func (q *FIFO[T]) Push(element T) {
size := len(q.elements)
if q.len == size {
// Need larger buffer.
newSize := size * 2
if newSize == 0 {
newSize = normalSize
}
elements := make([]T, newSize)
if q.start == 0 {
copy(elements, q.elements)
} else {
copy(elements, q.elements[q.start:])
copy(elements[len(q.elements)-q.start:], q.elements[0:q.end])
}
q.start = 0
q.end = q.len
q.elements = elements
size = newSize
}
if q.end == size {
// Wrap around.
q.elements[0] = element
q.end = 1
q.len++
return
}
q.elements[q.end] = element
q.end++
q.len++
}
func (q *FIFO[T]) Pop() (element T, ok bool) {
if q.len == 0 {
return
}
element = q.elements[q.start]
q.start++
if q.start == len(q.elements) {
// Wrap around.
q.start = 0
}
q.len--
// Once it is empty, shrink down to avoid hanging onto
// a large buffer forever.
if q.len == 0 && len(q.elements) > normalSize {
q.elements = make([]T, normalSize)
q.start = 0
q.end = 0
}
ok = true
return
}

View File

@ -0,0 +1,117 @@
/*
Copyright 2024 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package queue
import (
"testing"
"github.com/stretchr/testify/require"
)
func verifyPop(t *testing.T, expectedValue int, expectedOk bool, queue *FIFO[int]) {
t.Helper()
actual, ok := queue.Pop()
require.Equal(t, expectedOk, ok)
require.Equal(t, expectedValue, actual)
}
func verifyEmpty(t *testing.T, queue *FIFO[int]) {
t.Helper()
require.Equal(t, 0, queue.Len())
verifyPop(t, 0, false, queue)
}
func TestNull(t *testing.T) {
var queue FIFO[int]
verifyEmpty(t, &queue)
}
func TestOnePushPop(t *testing.T) {
var queue FIFO[int]
expected := 10
queue.Push(10)
require.Equal(t, 1, queue.Len())
verifyPop(t, expected, true, &queue)
verifyEmpty(t, &queue)
}
// Pushes some elements, pops all of them, then the same again.
func TestWrapAroundEmpty(t *testing.T) {
var queue FIFO[int]
for i := 0; i < 5; i++ {
queue.Push(i)
}
require.Equal(t, 5, queue.Len())
for i := 0; i < 5; i++ {
verifyPop(t, i, true, &queue)
}
verifyEmpty(t, &queue)
for i := 5; i < 10; i++ {
queue.Push(i)
}
for i := 5; i < 10; i++ {
verifyPop(t, i, true, &queue)
}
verifyEmpty(t, &queue)
}
// Pushes some elements, pops one, adds more, then pops all.
func TestWrapAroundPartial(t *testing.T) {
var queue FIFO[int]
for i := 0; i < 5; i++ {
queue.Push(i)
}
require.Equal(t, 5, queue.Len())
verifyPop(t, 0, true, &queue)
for i := 5; i < 10; i++ {
queue.Push(i)
}
for i := 1; i < 10; i++ {
verifyPop(t, i, true, &queue)
}
verifyEmpty(t, &queue)
}
// Push an unusual amount of elements, pop all, and verify that
// the FIFO shrinks back again.
func TestShrink(t *testing.T) {
var queue FIFO[int]
for i := 0; i < normalSize*2; i++ {
queue.Push(i)
}
require.Equal(t, normalSize*2, queue.Len())
require.LessOrEqual(t, 2*normalSize, len(queue.elements))
// Pop all, should be shrunken when done.
for i := 0; i < normalSize*2; i++ {
verifyPop(t, i, true, &queue)
}
require.Equal(t, 0, queue.Len())
require.Equal(t, normalSize, len(queue.elements))
// Still usable after shrinking?
queue.Push(42)
verifyPop(t, 42, true, &queue)
require.Equal(t, 0, queue.Len())
require.Equal(t, normalSize, len(queue.elements))
}