Merge pull request #127789 from googs1025/dra/controller/workqueue

feature(dra): use mock workqueue for controller unit test
Authored by Kubernetes Prow Robot on 2024-10-02 18:15:49 +01:00; committed by GitHub
commit 3a0f6c1ea3
3 changed files with 257 additions and 38 deletions
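In short: controller.sync now takes its workqueue as a parameter instead of reading ctrl.queue directly, Run passes the real queue through a closure, and the unit test injects an in-memory Mock queue. Test cases then assert on the resulting queue state (rate-limited retries in Failures, delayed rechecks in Later) rather than comparing error strings. A minimal sketch of the resulting test pattern, using the names that appear in the diff below:

    // Drive a single work item through the controller's sync function via the
    // mock queue, then compare the whole queue state in one assertion.
    var workQueueState Mock[string]
    c := ctrl.(*controller)
    workQueueState.SyncOne(test.key, c.sync)
    assert.Equal(t, test.expectedWorkQueueState, workQueueState)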

View File

@@ -328,7 +328,7 @@ func (ctrl *controller) Run(workers int) {
}
for i := 0; i < workers; i++ {
go wait.Until(ctrl.sync, 0, stopCh)
go wait.Until(func() { ctrl.sync(ctrl.queue) }, 0, stopCh)
}
<-stopCh
@@ -344,12 +344,12 @@ var errRequeue = errors.New("requeue")
var errPeriodic = errors.New("periodic")
// sync is the main worker.
func (ctrl *controller) sync() {
key, quit := ctrl.queue.Get()
func (ctrl *controller) sync(queue workqueue.TypedRateLimitingInterface[string]) {
key, quit := queue.Get()
if quit {
return
}
defer ctrl.queue.Done(key)
defer queue.Done(key)
logger := klog.LoggerWithValues(ctrl.logger, "key", key)
ctx := klog.NewContext(ctrl.ctx, logger)
@@ -358,20 +358,20 @@ func (ctrl *controller) sync() {
switch err {
case nil:
logger.V(5).Info("completed")
ctrl.queue.Forget(key)
queue.Forget(key)
case errRequeue:
logger.V(5).Info("requeue")
ctrl.queue.AddRateLimited(key)
queue.AddRateLimited(key)
case errPeriodic:
logger.V(5).Info("recheck periodically")
ctrl.queue.AddAfter(key, recheckDelay)
queue.AddAfter(key, recheckDelay)
default:
logger.Error(err, "processing failed")
if obj != nil {
// TODO: We don't know here *what* failed. Determine based on error?
ctrl.eventRecorder.Event(obj, v1.EventTypeWarning, "Failed", err.Error())
}
ctrl.queue.AddRateLimited(key)
queue.AddRateLimited(key)
}
}

View File

@@ -21,6 +21,7 @@ import (
"errors"
"fmt"
"testing"
"time"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
@@ -49,7 +50,7 @@ func TestController(t *testing.T) {
claim := createClaim(claimName, claimNamespace, driverName)
otherClaim := createClaim(claimName, claimNamespace, otherDriverName)
podName := "pod"
podKey := "schedulingCtx:default/pod"
podSchedulingCtxKey := "schedulingCtx:default/pod"
pod := createPod(podName, claimNamespace, nil)
podClaimName := "my-pod-claim"
podSchedulingCtx := createPodSchedulingContexts(pod)
@@ -125,11 +126,15 @@ func TestController(t *testing.T) {
pod *corev1.Pod
schedulingCtx, expectedSchedulingCtx *resourceapi.PodSchedulingContext
claim, expectedClaim *resourceapi.ResourceClaim
expectedError string
expectedWorkQueueState Mock[string]
}{
"invalid-key": {
key: "claim:x/y/z",
expectedError: `unexpected key format: "x/y/z"`,
key: "claim:x/y/z",
expectedWorkQueueState: Mock[string]{
Failures: map[string]int{
"claim:x/y/z": 1,
},
},
},
"not-found": {
key: "claim:default/claim",
@@ -154,7 +159,11 @@ func TestController(t *testing.T) {
claim: withDeallocate(withAllocate(claim)),
driver: m.expectDeallocate(map[string]error{claimName: errors.New("fake error")}),
expectedClaim: withDeallocate(withAllocate(claim)),
expectedError: "deallocate: fake error",
expectedWorkQueueState: Mock[string]{
Failures: map[string]int{
claimKey: 1,
},
},
},
// deletion time stamp set, our finalizer set, not allocated -> remove finalizer
@@ -170,7 +179,11 @@ func TestController(t *testing.T) {
claim: withFinalizer(withDeletionTimestamp(claim), ourFinalizer),
driver: m.expectDeallocate(map[string]error{claimName: errors.New("fake error")}),
expectedClaim: withFinalizer(withDeletionTimestamp(claim), ourFinalizer),
expectedError: "stop allocation: fake error",
expectedWorkQueueState: Mock[string]{
Failures: map[string]int{
claimKey: 1,
},
},
},
// deletion time stamp set, other finalizer set, not allocated -> do nothing
"deleted-finalizer-no-removal": {
@@ -191,7 +204,11 @@ func TestController(t *testing.T) {
claim: withAllocate(withDeletionTimestamp(claim)),
driver: m.expectDeallocate(map[string]error{claimName: errors.New("fake error")}),
expectedClaim: withAllocate(withDeletionTimestamp(claim)),
expectedError: "deallocate: fake error",
expectedWorkQueueState: Mock[string]{
Failures: map[string]int{
claimKey: 1,
},
},
},
// deletion time stamp set, finalizer not set -> do nothing
"deleted-no-finalizer": {
@@ -208,16 +225,23 @@ func TestController(t *testing.T) {
// pod with no claims -> shouldn't occur, check again anyway
"pod-nop": {
key: podKey,
key: podSchedulingCtxKey,
pod: pod,
schedulingCtx: withSelectedNode(podSchedulingCtx),
expectedSchedulingCtx: withSelectedNode(podSchedulingCtx),
expectedError: errPeriodic.Error(),
expectedWorkQueueState: Mock[string]{
Later: []MockDelayedItem[string]{
{
Item: podSchedulingCtxKey,
Duration: time.Second * 30,
},
},
},
},
// no potential nodes -> shouldn't occur
"no-nodes": {
key: podKey,
key: podSchedulingCtxKey,
claim: claim,
expectedClaim: claim,
pod: podWithClaim,
@@ -227,7 +251,7 @@ func TestController(t *testing.T) {
// potential nodes -> provide unsuitable nodes
"info": {
key: podKey,
key: podSchedulingCtxKey,
claim: claim,
expectedClaim: claim,
pod: podWithClaim,
@@ -236,12 +260,19 @@ func TestController(t *testing.T) {
expectClaimParameters(map[string]interface{}{claimName: 2}).
expectUnsuitableNodes(map[string][]string{podClaimName: unsuitableNodes}, nil),
expectedSchedulingCtx: withUnsuitableNodes(withPotentialNodes(podSchedulingCtx)),
expectedError: errPeriodic.Error(),
expectedWorkQueueState: Mock[string]{
Later: []MockDelayedItem[string]{
{
Item: podSchedulingCtxKey,
Duration: time.Second * 30,
},
},
},
},
// potential nodes, selected node -> allocate
"allocate": {
key: podKey,
key: podSchedulingCtxKey,
claim: claim,
expectedClaim: withReservedFor(withAllocate(claim), pod),
pod: podWithClaim,
@@ -251,11 +282,18 @@ func TestController(t *testing.T) {
expectUnsuitableNodes(map[string][]string{podClaimName: unsuitableNodes}, nil).
expectAllocate(map[string]allocate{claimName: {allocResult: &allocation, selectedNode: nodeName, allocErr: nil}}),
expectedSchedulingCtx: withUnsuitableNodes(withSelectedNode(withPotentialNodes(podSchedulingCtx))),
expectedError: errPeriodic.Error(),
expectedWorkQueueState: Mock[string]{
Later: []MockDelayedItem[string]{
{
Item: "schedulingCtx:default/pod",
Duration: time.Second * 30,
},
},
},
},
// potential nodes, selected node, all unsuitable -> update unsuitable nodes
"is-potential-node": {
key: podKey,
key: podSchedulingCtxKey,
claim: claim,
expectedClaim: claim,
pod: podWithClaim,
@@ -264,11 +302,18 @@ func TestController(t *testing.T) {
expectClaimParameters(map[string]interface{}{claimName: 2}).
expectUnsuitableNodes(map[string][]string{podClaimName: potentialNodes}, nil),
expectedSchedulingCtx: withSpecificUnsuitableNodes(withSelectedNode(withPotentialNodes(podSchedulingCtx)), potentialNodes),
expectedError: errPeriodic.Error(),
expectedWorkQueueState: Mock[string]{
Later: []MockDelayedItem[string]{
{
Item: podSchedulingCtxKey,
Duration: time.Second * 30,
},
},
},
},
// max potential nodes, other selected node, all unsuitable -> update unsuitable nodes with truncation at start
"is-potential-node-truncate-first": {
key: podKey,
key: podSchedulingCtxKey,
claim: claim,
expectedClaim: claim,
pod: podWithClaim,
@@ -277,11 +322,18 @@ func TestController(t *testing.T) {
expectClaimParameters(map[string]interface{}{claimName: 2}).
expectUnsuitableNodes(map[string][]string{podClaimName: append(maxNodes, nodeName)}, nil),
expectedSchedulingCtx: withSpecificUnsuitableNodes(withSelectedNode(withSpecificPotentialNodes(podSchedulingCtx, maxNodes)), append(maxNodes[1:], nodeName)),
expectedError: errPeriodic.Error(),
expectedWorkQueueState: Mock[string]{
Later: []MockDelayedItem[string]{
{
Item: podSchedulingCtxKey,
Duration: time.Second * 30,
},
},
},
},
// max potential nodes, other selected node, all unsuitable (but in reverse order) -> update unsuitable nodes with truncation at end
"pod-selected-is-potential-node-truncate-last": {
key: podKey,
key: podSchedulingCtxKey,
claim: claim,
expectedClaim: claim,
pod: podWithClaim,
@@ -290,7 +342,14 @@ func TestController(t *testing.T) {
expectClaimParameters(map[string]interface{}{claimName: 2}).
expectUnsuitableNodes(map[string][]string{podClaimName: append([]string{nodeName}, maxNodes...)}, nil),
expectedSchedulingCtx: withSpecificUnsuitableNodes(withSelectedNode(withSpecificPotentialNodes(podSchedulingCtx, maxNodes)), append([]string{nodeName}, maxNodes[:len(maxNodes)-1]...)),
expectedError: errPeriodic.Error(),
expectedWorkQueueState: Mock[string]{
Later: []MockDelayedItem[string]{
{
Item: podSchedulingCtxKey,
Duration: time.Second * 30,
},
},
},
},
} {
t.Run(name, func(t *testing.T) {
@@ -340,16 +399,11 @@ func TestController(t *testing.T) {
) {
t.Fatal("could not sync caches")
}
_, err := ctrl.(*controller).syncKey(ctx, test.key)
if err != nil && test.expectedError == "" {
t.Fatalf("unexpected error: %v", err)
}
if err == nil && test.expectedError != "" {
t.Fatalf("did not get expected error %q", test.expectedError)
}
if err != nil && err.Error() != test.expectedError {
t.Fatalf("expected error %q, got %q", test.expectedError, err.Error())
}
var workQueueState Mock[string]
c := ctrl.(*controller)
workQueueState.SyncOne(test.key, c.sync)
assert.Equal(t, test.expectedWorkQueueState, workQueueState)
claims, err := kubeClient.ResourceV1alpha3().ResourceClaims("").List(ctx, metav1.ListOptions{})
require.NoError(t, err, "list claims")
var expectedClaims []resourceapi.ResourceClaim

View File

@@ -0,0 +1,165 @@
/*
Copyright 2024 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package controller
import (
"slices"
"time"
"k8s.io/client-go/util/workqueue"
)
// TODO (pohly): move this to k8s.io/client-go/util/workqueue/workqueue.go
// if it turns out to be generally useful. Doc comments are already written
// as if the code was there.
// Mock is an implementation of [TypedRateLimitingInterface] which
// can be used to test a function which pulls work items out of a queue
// and processes them.
//
// The zero value is directly usable. The usual usage is:
//
// var m workqueue.Mock[string]
// m.SyncOne("some-item", func(queue workqueue.TypedRateLimitingInterface[string]) { ... } )
// if diff := cmp.Diff(workqueue.Mock[string]{}, m); diff != "" {
// t.Errorf("unexpected state of mock work queue after sync (-want, +got):\n%s", diff)
// }
//
// All slices get reset to nil when they become empty, so there are no spurious
// differences because of the nil vs. empty slice.
type Mock[T comparable] struct {
// Ready contains the items which are ready for processing.
Ready []T
// InFlight contains the items which are currently being processed (= Get
// was called, Done not yet).
InFlight []T
// MismatchedDone contains the items for which Done was called without
// a matching Get.
MismatchedDone []T
// Later contains the items which are meant to be added to the queue after
// a certain delay (= AddAfter was called for them).
Later []MockDelayedItem[T]
// Failures contains the items and their retry count which failed to be
// processed (AddRateLimited called at least once, Forget not yet).
// The retry count is always larger than zero.
Failures map[T]int
// ShutDownCalled tracks how often ShutDown got called.
ShutDownCalled int
// ShutDownWithDrainCalled tracks how often ShutDownWithDrain got called.
ShutDownWithDrainCalled int
}
// MockDelayedItem is an item which was queued for later processing.
type MockDelayedItem[T comparable] struct {
Item T
Duration time.Duration
}
// SyncOne adds the item to the work queue and calls sync.
// That sync function can pull one or more items from the work
// queue until the queue is empty. Then it is told that the queue
// is shutting down, which must cause it to return.
//
// The test can then retrieve the state of the queue to check the result.
func (m *Mock[T]) SyncOne(item T, sync func(workqueue.TypedRateLimitingInterface[T])) {
m.Ready = append(m.Ready, item)
sync(m)
}
// Add implements [TypedInterface].
func (m *Mock[T]) Add(item T) {
m.Ready = append(m.Ready, item)
}
// Len implements [TypedInterface].
func (m *Mock[T]) Len() int {
return len(m.Ready)
}
// Get implements [TypedInterface].
func (m *Mock[T]) Get() (item T, shutdown bool) {
if len(m.Ready) == 0 {
shutdown = true
return
}
item = m.Ready[0]
m.Ready = m.Ready[1:]
if len(m.Ready) == 0 {
m.Ready = nil
}
m.InFlight = append(m.InFlight, item)
return item, false
}
// Done implements [TypedInterface].
func (m *Mock[T]) Done(item T) {
index := slices.Index(m.InFlight, item)
if index < 0 {
m.MismatchedDone = append(m.MismatchedDone, item)
return
}
m.InFlight = slices.Delete(m.InFlight, index, index+1)
if len(m.InFlight) == 0 {
m.InFlight = nil
}
}
// ShutDown implements [TypedInterface].
func (m *Mock[T]) ShutDown() {
m.ShutDownCalled++
}
// ShutDownWithDrain implements [TypedInterface].
func (m *Mock[T]) ShutDownWithDrain() {
m.ShutDownWithDrainCalled++
}
// ShuttingDown implements [TypedInterface].
func (m *Mock[T]) ShuttingDown() bool {
return m.ShutDownCalled > 0 || m.ShutDownWithDrainCalled > 0
}
// AddAfter implements [TypedDelayingInterface.AddAfter].
func (m *Mock[T]) AddAfter(item T, duration time.Duration) {
m.Later = append(m.Later, MockDelayedItem[T]{Item: item, Duration: duration})
}
// AddRateLimited implements [TypedRateLimitingInterface.AddRateLimited].
func (m *Mock[T]) AddRateLimited(item T) {
if m.Failures == nil {
m.Failures = make(map[T]int)
}
m.Failures[item]++
}
// Forget implements [TypedRateLimitingInterface.Forget].
func (m *Mock[T]) Forget(item T) {
if m.Failures == nil {
return
}
delete(m.Failures, item)
}
// NumRequeues implements [TypedRateLimitingInterface.NumRequeues].
func (m *Mock[T]) NumRequeues(item T) int {
return m.Failures[item]
}