From fc8b4657c18a6e58b2587060ad3c862d3d2ae262 Mon Sep 17 00:00:00 2001 From: Timo Reimann Date: Sat, 22 Oct 2022 00:47:04 +0200 Subject: [PATCH] Add tests --- .../client-go/util/workqueue/testing/queue.go | 53 +++++++ .../controllers/service/controller_test.go | 141 ++++++++++++++++-- 2 files changed, 181 insertions(+), 13 deletions(-) create mode 100644 staging/src/k8s.io/client-go/util/workqueue/testing/queue.go diff --git a/staging/src/k8s.io/client-go/util/workqueue/testing/queue.go b/staging/src/k8s.io/client-go/util/workqueue/testing/queue.go new file mode 100644 index 00000000000..03368ae16c0 --- /dev/null +++ b/staging/src/k8s.io/client-go/util/workqueue/testing/queue.go @@ -0,0 +1,53 @@ +/* +Copyright 2022 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package testing + +import ( + "time" + + "k8s.io/client-go/util/workqueue" +) + +// SpyWorkQueue implements a work queue and adds the ability to inspect processed +// items for testing purposes. +type SpyWorkQueue struct { + workqueue.RateLimitingInterface + items []SpyQueueItem +} + +// SpyQueueItem represents an item that was being processed. +type SpyQueueItem struct { + Key interface{} + // Delay represents the delayed duration if and only if AddAfter was invoked. + Delay time.Duration +} + +// AddAfter is like workqueue.RateLimitingInterface.AddAfter but records the +// added key and delay internally. +func (f *SpyWorkQueue) AddAfter(key interface{}, delay time.Duration) { + f.items = append(f.items, SpyQueueItem{ + Key: key, + Delay: delay, + }) + + f.RateLimitingInterface.AddAfter(key, delay) +} + +// GetItems returns all items that were recorded. +func (f *SpyWorkQueue) GetItems() []SpyQueueItem { + return f.items +} diff --git a/staging/src/k8s.io/cloud-provider/controllers/service/controller_test.go b/staging/src/k8s.io/cloud-provider/controllers/service/controller_test.go index 3e31faf3c92..d59a6877d36 100644 --- a/staging/src/k8s.io/cloud-provider/controllers/service/controller_test.go +++ b/staging/src/k8s.io/cloud-provider/controllers/service/controller_test.go @@ -41,9 +41,13 @@ import ( "k8s.io/client-go/kubernetes/fake" "k8s.io/client-go/kubernetes/scheme" v1core "k8s.io/client-go/kubernetes/typed/core/v1" + corelisters "k8s.io/client-go/listers/core/v1" core "k8s.io/client-go/testing" + "k8s.io/client-go/tools/cache" "k8s.io/client-go/tools/record" "k8s.io/client-go/util/workqueue" + queuetesting "k8s.io/client-go/util/workqueue/testing" + "k8s.io/cloud-provider/api" fakecloud "k8s.io/cloud-provider/fake" servicehelper "k8s.io/cloud-provider/service/helpers" featuregatetesting "k8s.io/component-base/featuregate/testing" @@ -1093,22 +1097,24 @@ func TestSyncService(t *testing.T) { } for _, tc := range testCases { - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() + t.Run(tc.testName, func(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() - tc.updateFn() - obtainedErr := controller.syncService(ctx, tc.key) + tc.updateFn() + obtainedErr := controller.syncService(ctx, tc.key) - //expected matches obtained ??. - if exp := tc.expectedFn(obtainedErr); exp != nil { - t.Errorf("%v Error:%v", tc.testName, exp) - } + //expected matches obtained ??. + if exp := tc.expectedFn(obtainedErr); exp != nil { + t.Errorf("%v Error:%v", tc.testName, exp) + } - //Post processing, the element should not be in the sync queue. - _, exist := controller.cache.get(tc.key) - if exist { - t.Fatalf("%v working Queue should be empty, but contains %s", tc.testName, tc.key) - } + //Post processing, the element should not be in the sync queue. + _, exist := controller.cache.get(tc.key) + if exist { + t.Fatalf("%v working Queue should be empty, but contains %s", tc.testName, tc.key) + } + }) } } @@ -2253,6 +2259,82 @@ func Test_shouldSyncUpdatedNode_compoundedPredicates(t *testing.T) { } } +func TestServiceQueueDelay(t *testing.T) { + const ns = metav1.NamespaceDefault + + tests := []struct { + name string + lbCloudErr error + wantRetryDelay time.Duration + }{ + { + name: "processing successful", + lbCloudErr: nil, + }, + { + name: "regular error", + lbCloudErr: errors.New("something went wrong"), + }, + { + name: "retry error", + lbCloudErr: api.NewRetryError("LB create in progress", 42*time.Second), + wantRetryDelay: 42 * time.Second, + }, + } + + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + controller, cloud, client := newController() + queue := &queuetesting.SpyWorkQueue{RateLimitingInterface: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "test-service-queue-delay")} + controller.serviceQueue = queue + cloud.Err = tc.lbCloudErr + + svc := defaultExternalService() + controller.serviceLister = newFakeServiceLister(svc) + + ctx := context.Background() + _, err := client.CoreV1().Services(ns).Create(ctx, svc, metav1.CreateOptions{}) + if err != nil { + t.Fatal(err) + } + + key, err := cache.MetaNamespaceKeyFunc(svc) + if err != nil { + t.Fatalf("creating meta namespace key: %s", err) + } + queue.Add(key) + + done := controller.processNextServiceItem(ctx) + if !done { + t.Fatal("processNextServiceItem stopped prematurely") + } + + // Expect no requeues unless we hit an error that is not a retry + // error. + wantNumRequeues := 0 + var re *api.RetryError + isRetryError := errors.As(tc.lbCloudErr, &re) + if tc.lbCloudErr != nil && !isRetryError { + wantNumRequeues = 1 + } + + if gotNumRequeues := queue.NumRequeues(key); gotNumRequeues != wantNumRequeues { + t.Fatalf("got %d requeue(s), want %d", gotNumRequeues, wantNumRequeues) + } + + if tc.wantRetryDelay > 0 { + items := queue.GetItems() + if len(items) != 1 { + t.Fatalf("got %d item(s), want 1", len(items)) + } + if gotDelay := items[0].Delay; gotDelay != tc.wantRetryDelay { + t.Fatalf("got delay %s, want %s", gotDelay, tc.wantRetryDelay) + } + } + }) + } +} + type fakeNodeLister struct { cache []*v1.Node err error @@ -2281,3 +2363,36 @@ func (l *fakeNodeLister) Get(name string) (*v1.Node, error) { } return nil, nil } + +type fakeServiceLister struct { + cache []*v1.Service +} + +func newFakeServiceLister(services ...*v1.Service) *fakeServiceLister { + ret := &fakeServiceLister{} + ret.cache = services + return ret +} + +// List lists all Nodes in the indexer. +// Objects returned here must be treated as read-only. +func (l *fakeServiceLister) List(_ labels.Selector) ([]*v1.Service, error) { + return l.cache, nil +} + +// Get retrieves the Service from the index for a given name. +// Objects returned here must be treated as read-only. +func (l *fakeServiceLister) Get(name string) (*v1.Service, error) { + for _, svc := range l.cache { + if svc.Name == name { + return svc, nil + } + } + return nil, nil +} + +// Services returns an object that can list and get Services. +// This is fakeServiceLister itself. +func (l *fakeServiceLister) Services(_ string) corelisters.ServiceNamespaceLister { + return l +}