From a179203bdba8b05ac1a40fbc279b76e872ecb15d Mon Sep 17 00:00:00 2001 From: Timo Reimann Date: Fri, 21 Oct 2022 14:08:17 +0200 Subject: [PATCH 01/10] Support specifying custom LB retry period from cloud provider This change allows cloud providers to specify a custom retry period by returning a RetryError. The purpose is to bypass the work queue-driven exponential backoff algorithm when there is no need to back off. Specifically, this can be the case when a cloud load balancer operation such as a create or delete is still pending and the cloud API should be polled for completion at a constant interval. A backoff algorithm would not always be reasonable to apply here since there is no API or performance degradation warranting an increasing wait time between API requests. --- .../k8s.io/cloud-provider/api/retry_error.go | 43 +++++++++++++++++++ .../controllers/service/controller.go | 15 +++++-- 2 files changed, 55 insertions(+), 3 deletions(-) create mode 100644 staging/src/k8s.io/cloud-provider/api/retry_error.go diff --git a/staging/src/k8s.io/cloud-provider/api/retry_error.go b/staging/src/k8s.io/cloud-provider/api/retry_error.go new file mode 100644 index 00000000000..6708cd3bcd0 --- /dev/null +++ b/staging/src/k8s.io/cloud-provider/api/retry_error.go @@ -0,0 +1,43 @@ +/* +Copyright 2020 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package api + +import "time" + +// RetryError indicates after what time a service reconciliation should be +// retried. +type RetryError struct { + msg string + retryAfter time.Duration +} + +// NewRetryError returns a RetryError. +func NewRetryError(msg string, retryAfter time.Duration) *RetryError { + return &RetryError{ + msg: msg, + retryAfter: retryAfter, + } +} + +func (re *RetryError) Error() string { + return re.msg +} + +// RetryAfter returns the defined retry-after duration. +func (re *RetryError) RetryAfter() time.Duration { + return re.retryAfter +} diff --git a/staging/src/k8s.io/cloud-provider/controllers/service/controller.go b/staging/src/k8s.io/cloud-provider/controllers/service/controller.go index a8b4bdf477f..de4c7ceb2e9 100644 --- a/staging/src/k8s.io/cloud-provider/controllers/service/controller.go +++ b/staging/src/k8s.io/cloud-provider/controllers/service/controller.go @@ -18,6 +18,7 @@ package service import ( "context" + stderrors "errors" "fmt" "reflect" "sync" @@ -39,6 +40,7 @@ import ( "k8s.io/client-go/tools/record" "k8s.io/client-go/util/workqueue" cloudprovider "k8s.io/cloud-provider" + "k8s.io/cloud-provider/api" servicehelper "k8s.io/cloud-provider/service/helpers" "k8s.io/component-base/featuregate" controllersmetrics "k8s.io/component-base/metrics/prometheus/controllers" @@ -288,8 +290,15 @@ func (c *Controller) processNextServiceItem(ctx context.Context) bool { return true } - runtime.HandleError(fmt.Errorf("error processing service %v (will retry): %v", key, err)) - c.serviceQueue.AddRateLimited(key) + var re *api.RetryError + if stderrors.As(err, &re) { + klog.V(4).Infof("Retrying processing for service %v in %s", key, re.RetryAfter()) + c.serviceQueue.AddAfter(key, re.RetryAfter()) + } else { + runtime.HandleError(fmt.Errorf("error processing service %v (will retry): %v", key, err)) + c.serviceQueue.AddRateLimited(key) + } + return true } @@ -401,7 +410,7 @@ func (c *Controller) syncLoadBalancerIfNeeded(ctx context.Context, service *v1.S klog.V(4).Infof("LoadBalancer for service %s implemented by a different controller %s, Ignoring error", key, c.cloud.ProviderName()) return op, nil } - return op, fmt.Errorf("failed to ensure load balancer: %v", err) + return op, fmt.Errorf("failed to ensure load balancer: %w", err) } if newStatus == nil { return op, fmt.Errorf("service status returned by EnsureLoadBalancer is nil") From ea4ce5dc28e097dd0fd0d82c810414a5bbe8f1f4 Mon Sep 17 00:00:00 2001 From: Timo Reimann Date: Fri, 21 Oct 2022 14:08:29 +0200 Subject: [PATCH 02/10] Alias api/errors instead of stdlib errors --- .../cloud-provider/controllers/service/controller.go | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/staging/src/k8s.io/cloud-provider/controllers/service/controller.go b/staging/src/k8s.io/cloud-provider/controllers/service/controller.go index de4c7ceb2e9..230ca0248e2 100644 --- a/staging/src/k8s.io/cloud-provider/controllers/service/controller.go +++ b/staging/src/k8s.io/cloud-provider/controllers/service/controller.go @@ -18,14 +18,14 @@ package service import ( "context" - stderrors "errors" + "errors" "fmt" "reflect" "sync" "time" v1 "k8s.io/api/core/v1" - "k8s.io/apimachinery/pkg/api/errors" + apierrors "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/util/runtime" "k8s.io/apimachinery/pkg/util/sets" @@ -291,7 +291,7 @@ func (c *Controller) processNextServiceItem(ctx context.Context) bool { } var re *api.RetryError - if stderrors.As(err, &re) { + if errors.As(err, &re) { klog.V(4).Infof("Retrying processing for service %v in %s", key, re.RetryAfter()) c.serviceQueue.AddAfter(key, re.RetryAfter()) } else { @@ -424,7 +424,7 @@ func (c *Controller) syncLoadBalancerIfNeeded(ctx context.Context, service *v1.S // - Not found error mostly happens when service disappears right after // we remove the finalizer. // - We can't patch status on non-exist service anyway. - if !errors.IsNotFound(err) { + if !apierrors.IsNotFound(err) { return op, fmt.Errorf("failed to update load balancer status: %v", err) } } @@ -846,7 +846,7 @@ func (c *Controller) syncService(ctx context.Context, key string) error { // service holds the latest service info from apiserver service, err := c.serviceLister.Services(namespace).Get(name) switch { - case errors.IsNotFound(err): + case apierrors.IsNotFound(err): // service absence in store means watcher caught the deletion, ensure LB info is cleaned err = c.processServiceDeletion(ctx, key) case err != nil: From fc8b4657c18a6e58b2587060ad3c862d3d2ae262 Mon Sep 17 00:00:00 2001 From: Timo Reimann Date: Sat, 22 Oct 2022 00:47:04 +0200 Subject: [PATCH 03/10] Add tests --- .../client-go/util/workqueue/testing/queue.go | 53 +++++++ .../controllers/service/controller_test.go | 141 ++++++++++++++++-- 2 files changed, 181 insertions(+), 13 deletions(-) create mode 100644 staging/src/k8s.io/client-go/util/workqueue/testing/queue.go diff --git a/staging/src/k8s.io/client-go/util/workqueue/testing/queue.go b/staging/src/k8s.io/client-go/util/workqueue/testing/queue.go new file mode 100644 index 00000000000..03368ae16c0 --- /dev/null +++ b/staging/src/k8s.io/client-go/util/workqueue/testing/queue.go @@ -0,0 +1,53 @@ +/* +Copyright 2022 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package testing + +import ( + "time" + + "k8s.io/client-go/util/workqueue" +) + +// SpyWorkQueue implements a work queue and adds the ability to inspect processed +// items for testing purposes. +type SpyWorkQueue struct { + workqueue.RateLimitingInterface + items []SpyQueueItem +} + +// SpyQueueItem represents an item that was being processed. +type SpyQueueItem struct { + Key interface{} + // Delay represents the delayed duration if and only if AddAfter was invoked. + Delay time.Duration +} + +// AddAfter is like workqueue.RateLimitingInterface.AddAfter but records the +// added key and delay internally. +func (f *SpyWorkQueue) AddAfter(key interface{}, delay time.Duration) { + f.items = append(f.items, SpyQueueItem{ + Key: key, + Delay: delay, + }) + + f.RateLimitingInterface.AddAfter(key, delay) +} + +// GetItems returns all items that were recorded. +func (f *SpyWorkQueue) GetItems() []SpyQueueItem { + return f.items +} diff --git a/staging/src/k8s.io/cloud-provider/controllers/service/controller_test.go b/staging/src/k8s.io/cloud-provider/controllers/service/controller_test.go index 3e31faf3c92..d59a6877d36 100644 --- a/staging/src/k8s.io/cloud-provider/controllers/service/controller_test.go +++ b/staging/src/k8s.io/cloud-provider/controllers/service/controller_test.go @@ -41,9 +41,13 @@ import ( "k8s.io/client-go/kubernetes/fake" "k8s.io/client-go/kubernetes/scheme" v1core "k8s.io/client-go/kubernetes/typed/core/v1" + corelisters "k8s.io/client-go/listers/core/v1" core "k8s.io/client-go/testing" + "k8s.io/client-go/tools/cache" "k8s.io/client-go/tools/record" "k8s.io/client-go/util/workqueue" + queuetesting "k8s.io/client-go/util/workqueue/testing" + "k8s.io/cloud-provider/api" fakecloud "k8s.io/cloud-provider/fake" servicehelper "k8s.io/cloud-provider/service/helpers" featuregatetesting "k8s.io/component-base/featuregate/testing" @@ -1093,22 +1097,24 @@ func TestSyncService(t *testing.T) { } for _, tc := range testCases { - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() + t.Run(tc.testName, func(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() - tc.updateFn() - obtainedErr := controller.syncService(ctx, tc.key) + tc.updateFn() + obtainedErr := controller.syncService(ctx, tc.key) - //expected matches obtained ??. - if exp := tc.expectedFn(obtainedErr); exp != nil { - t.Errorf("%v Error:%v", tc.testName, exp) - } + //expected matches obtained ??. + if exp := tc.expectedFn(obtainedErr); exp != nil { + t.Errorf("%v Error:%v", tc.testName, exp) + } - //Post processing, the element should not be in the sync queue. - _, exist := controller.cache.get(tc.key) - if exist { - t.Fatalf("%v working Queue should be empty, but contains %s", tc.testName, tc.key) - } + //Post processing, the element should not be in the sync queue. + _, exist := controller.cache.get(tc.key) + if exist { + t.Fatalf("%v working Queue should be empty, but contains %s", tc.testName, tc.key) + } + }) } } @@ -2253,6 +2259,82 @@ func Test_shouldSyncUpdatedNode_compoundedPredicates(t *testing.T) { } } +func TestServiceQueueDelay(t *testing.T) { + const ns = metav1.NamespaceDefault + + tests := []struct { + name string + lbCloudErr error + wantRetryDelay time.Duration + }{ + { + name: "processing successful", + lbCloudErr: nil, + }, + { + name: "regular error", + lbCloudErr: errors.New("something went wrong"), + }, + { + name: "retry error", + lbCloudErr: api.NewRetryError("LB create in progress", 42*time.Second), + wantRetryDelay: 42 * time.Second, + }, + } + + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + controller, cloud, client := newController() + queue := &queuetesting.SpyWorkQueue{RateLimitingInterface: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "test-service-queue-delay")} + controller.serviceQueue = queue + cloud.Err = tc.lbCloudErr + + svc := defaultExternalService() + controller.serviceLister = newFakeServiceLister(svc) + + ctx := context.Background() + _, err := client.CoreV1().Services(ns).Create(ctx, svc, metav1.CreateOptions{}) + if err != nil { + t.Fatal(err) + } + + key, err := cache.MetaNamespaceKeyFunc(svc) + if err != nil { + t.Fatalf("creating meta namespace key: %s", err) + } + queue.Add(key) + + done := controller.processNextServiceItem(ctx) + if !done { + t.Fatal("processNextServiceItem stopped prematurely") + } + + // Expect no requeues unless we hit an error that is not a retry + // error. + wantNumRequeues := 0 + var re *api.RetryError + isRetryError := errors.As(tc.lbCloudErr, &re) + if tc.lbCloudErr != nil && !isRetryError { + wantNumRequeues = 1 + } + + if gotNumRequeues := queue.NumRequeues(key); gotNumRequeues != wantNumRequeues { + t.Fatalf("got %d requeue(s), want %d", gotNumRequeues, wantNumRequeues) + } + + if tc.wantRetryDelay > 0 { + items := queue.GetItems() + if len(items) != 1 { + t.Fatalf("got %d item(s), want 1", len(items)) + } + if gotDelay := items[0].Delay; gotDelay != tc.wantRetryDelay { + t.Fatalf("got delay %s, want %s", gotDelay, tc.wantRetryDelay) + } + } + }) + } +} + type fakeNodeLister struct { cache []*v1.Node err error @@ -2281,3 +2363,36 @@ func (l *fakeNodeLister) Get(name string) (*v1.Node, error) { } return nil, nil } + +type fakeServiceLister struct { + cache []*v1.Service +} + +func newFakeServiceLister(services ...*v1.Service) *fakeServiceLister { + ret := &fakeServiceLister{} + ret.cache = services + return ret +} + +// List lists all Nodes in the indexer. +// Objects returned here must be treated as read-only. +func (l *fakeServiceLister) List(_ labels.Selector) ([]*v1.Service, error) { + return l.cache, nil +} + +// Get retrieves the Service from the index for a given name. +// Objects returned here must be treated as read-only. +func (l *fakeServiceLister) Get(name string) (*v1.Service, error) { + for _, svc := range l.cache { + if svc.Name == name { + return svc, nil + } + } + return nil, nil +} + +// Services returns an object that can list and get Services. +// This is fakeServiceLister itself. +func (l *fakeServiceLister) Services(_ string) corelisters.ServiceNamespaceLister { + return l +} From 18e978a89dffc48dfd804ce25753085104d21594 Mon Sep 17 00:00:00 2001 From: Timo Reimann Date: Sun, 23 Oct 2022 10:15:51 +0200 Subject: [PATCH 04/10] Improve RetryError GoDocs --- staging/src/k8s.io/cloud-provider/api/retry_error.go | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) diff --git a/staging/src/k8s.io/cloud-provider/api/retry_error.go b/staging/src/k8s.io/cloud-provider/api/retry_error.go index 6708cd3bcd0..b6dc86dead9 100644 --- a/staging/src/k8s.io/cloud-provider/api/retry_error.go +++ b/staging/src/k8s.io/cloud-provider/api/retry_error.go @@ -1,5 +1,5 @@ /* -Copyright 2020 The Kubernetes Authors. +Copyright 2022 The Kubernetes Authors. Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. @@ -16,10 +16,12 @@ limitations under the License. package api -import "time" +import ( + "time" +) -// RetryError indicates after what time a service reconciliation should be -// retried. +// RetryError indicates that a service reconciliation should be retried after a +// fixed duration (as opposed to backing off exponentially). type RetryError struct { msg string retryAfter time.Duration @@ -33,6 +35,7 @@ func NewRetryError(msg string, retryAfter time.Duration) *RetryError { } } +// Error shows the details of the retry reason. func (re *RetryError) Error() string { return re.msg } From 73bc9861d25102aae830494ede899f6205226214 Mon Sep 17 00:00:00 2001 From: Timo Reimann Date: Sun, 23 Oct 2022 10:16:35 +0200 Subject: [PATCH 05/10] Improve error logging in processNextServiceItem --- .../k8s.io/cloud-provider/controllers/service/controller.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/staging/src/k8s.io/cloud-provider/controllers/service/controller.go b/staging/src/k8s.io/cloud-provider/controllers/service/controller.go index 230ca0248e2..ad02b3d72e4 100644 --- a/staging/src/k8s.io/cloud-provider/controllers/service/controller.go +++ b/staging/src/k8s.io/cloud-provider/controllers/service/controller.go @@ -292,10 +292,10 @@ func (c *Controller) processNextServiceItem(ctx context.Context) bool { var re *api.RetryError if errors.As(err, &re) { - klog.V(4).Infof("Retrying processing for service %v in %s", key, re.RetryAfter()) + klog.Warningf("error processing service %v (retrying in %s): %v", key, re.RetryAfter(), err) c.serviceQueue.AddAfter(key, re.RetryAfter()) } else { - runtime.HandleError(fmt.Errorf("error processing service %v (will retry): %v", key, err)) + runtime.HandleError(fmt.Errorf("error processing service %v (retrying with exponential backoff): %v", key, err)) c.serviceQueue.AddRateLimited(key) } From a15013ec8b1b3c52464a01f06b97cd27e3a5bb34 Mon Sep 17 00:00:00 2001 From: Timo Reimann Date: Sun, 23 Oct 2022 10:27:26 +0200 Subject: [PATCH 06/10] Document RetryError properly --- staging/src/k8s.io/cloud-provider/cloud.go | 12 +++++++++--- .../cloud-provider/controllers/service/controller.go | 1 + 2 files changed, 10 insertions(+), 3 deletions(-) diff --git a/staging/src/k8s.io/cloud-provider/cloud.go b/staging/src/k8s.io/cloud-provider/cloud.go index 7e7bf9dfab8..c9a04085f48 100644 --- a/staging/src/k8s.io/cloud-provider/cloud.go +++ b/staging/src/k8s.io/cloud-provider/cloud.go @@ -131,11 +131,11 @@ func GetInstanceProviderID(ctx context.Context, cloud Interface, nodeName types. // irrespective of the ImplementedElsewhere error. Additional finalizers for // LB services must be managed in the alternate implementation. type LoadBalancer interface { - // TODO: Break this up into different interfaces (LB, etc) when we have more than one type of service // GetLoadBalancer returns whether the specified load balancer exists, and // if so, what its status is. // Implementations must treat the *v1.Service parameter as read-only and not modify it. - // Parameter 'clusterName' is the name of the cluster as presented to kube-controller-manager + // Parameter 'clusterName' is the name of the cluster as presented to kube-controller-manager. + // TODO: Break this up into different interfaces (LB, etc) when we have more than one type of service GetLoadBalancer(ctx context.Context, clusterName string, service *v1.Service) (status *v1.LoadBalancerStatus, exists bool, err error) // GetLoadBalancerName returns the name of the load balancer. Implementations must treat the // *v1.Service parameter as read-only and not modify it. @@ -143,7 +143,13 @@ type LoadBalancer interface { // EnsureLoadBalancer creates a new load balancer 'name', or updates the existing one. Returns the status of the balancer // Implementations must treat the *v1.Service and *v1.Node // parameters as read-only and not modify them. - // Parameter 'clusterName' is the name of the cluster as presented to kube-controller-manager + // Parameter 'clusterName' is the name of the cluster as presented to kube-controller-manager. + // + // Implementations may return a (possibly wrapped) api.RetryError to enforce + // backing off at a fixed duration. This can be used for cases like when the + // load balancer is not ready yet (e.g., it is still being provisioned) and + // polling at a fixed rate is preferred over backing off exponentially in + // order to minimize latency. EnsureLoadBalancer(ctx context.Context, clusterName string, service *v1.Service, nodes []*v1.Node) (*v1.LoadBalancerStatus, error) // UpdateLoadBalancer updates hosts under the specified load balancer. // Implementations must treat the *v1.Service and *v1.Node diff --git a/staging/src/k8s.io/cloud-provider/controllers/service/controller.go b/staging/src/k8s.io/cloud-provider/controllers/service/controller.go index ad02b3d72e4..179b2d4e8f0 100644 --- a/staging/src/k8s.io/cloud-provider/controllers/service/controller.go +++ b/staging/src/k8s.io/cloud-provider/controllers/service/controller.go @@ -410,6 +410,7 @@ func (c *Controller) syncLoadBalancerIfNeeded(ctx context.Context, service *v1.S klog.V(4).Infof("LoadBalancer for service %s implemented by a different controller %s, Ignoring error", key, c.cloud.ProviderName()) return op, nil } + // Use %w deliberately so that a returned RetryError can be handled. return op, fmt.Errorf("failed to ensure load balancer: %w", err) } if newStatus == nil { From 208af2d1d83488de933977849581207149b829ee Mon Sep 17 00:00:00 2001 From: Timo Reimann Date: Wed, 9 Nov 2022 07:32:40 +0100 Subject: [PATCH 07/10] Localize SpyWorkQueue to cloud-provider for now --- .../cloud-provider/controllers/service/controller_test.go | 2 +- .../controllers/service}/testing/queue.go | 0 2 files changed, 1 insertion(+), 1 deletion(-) rename staging/src/k8s.io/{client-go/util/workqueue => cloud-provider/controllers/service}/testing/queue.go (100%) diff --git a/staging/src/k8s.io/cloud-provider/controllers/service/controller_test.go b/staging/src/k8s.io/cloud-provider/controllers/service/controller_test.go index d59a6877d36..a995407ec1b 100644 --- a/staging/src/k8s.io/cloud-provider/controllers/service/controller_test.go +++ b/staging/src/k8s.io/cloud-provider/controllers/service/controller_test.go @@ -46,8 +46,8 @@ import ( "k8s.io/client-go/tools/cache" "k8s.io/client-go/tools/record" "k8s.io/client-go/util/workqueue" - queuetesting "k8s.io/client-go/util/workqueue/testing" "k8s.io/cloud-provider/api" + queuetesting "k8s.io/cloud-provider/controllers/service/testing" fakecloud "k8s.io/cloud-provider/fake" servicehelper "k8s.io/cloud-provider/service/helpers" featuregatetesting "k8s.io/component-base/featuregate/testing" diff --git a/staging/src/k8s.io/client-go/util/workqueue/testing/queue.go b/staging/src/k8s.io/cloud-provider/controllers/service/testing/queue.go similarity index 100% rename from staging/src/k8s.io/client-go/util/workqueue/testing/queue.go rename to staging/src/k8s.io/cloud-provider/controllers/service/testing/queue.go From 0fcf42f3211d51b3c0ec4b75633286b284da255e Mon Sep 17 00:00:00 2001 From: Timo Reimann Date: Wed, 14 Dec 2022 22:08:18 +0100 Subject: [PATCH 08/10] Move test double queue to test file and unexport --- .../controllers/service/controller_test.go | 35 ++++++++++-- .../controllers/service/testing/queue.go | 53 ------------------- 2 files changed, 32 insertions(+), 56 deletions(-) delete mode 100644 staging/src/k8s.io/cloud-provider/controllers/service/testing/queue.go diff --git a/staging/src/k8s.io/cloud-provider/controllers/service/controller_test.go b/staging/src/k8s.io/cloud-provider/controllers/service/controller_test.go index a995407ec1b..ae36ebaed8f 100644 --- a/staging/src/k8s.io/cloud-provider/controllers/service/controller_test.go +++ b/staging/src/k8s.io/cloud-provider/controllers/service/controller_test.go @@ -47,7 +47,6 @@ import ( "k8s.io/client-go/tools/record" "k8s.io/client-go/util/workqueue" "k8s.io/cloud-provider/api" - queuetesting "k8s.io/cloud-provider/controllers/service/testing" fakecloud "k8s.io/cloud-provider/fake" servicehelper "k8s.io/cloud-provider/service/helpers" featuregatetesting "k8s.io/component-base/featuregate/testing" @@ -2285,7 +2284,7 @@ func TestServiceQueueDelay(t *testing.T) { for _, tc := range tests { t.Run(tc.name, func(t *testing.T) { controller, cloud, client := newController() - queue := &queuetesting.SpyWorkQueue{RateLimitingInterface: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "test-service-queue-delay")} + queue := &spyWorkQueue{RateLimitingInterface: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "test-service-queue-delay")} controller.serviceQueue = queue cloud.Err = tc.lbCloudErr @@ -2323,7 +2322,7 @@ func TestServiceQueueDelay(t *testing.T) { } if tc.wantRetryDelay > 0 { - items := queue.GetItems() + items := queue.getItems() if len(items) != 1 { t.Fatalf("got %d item(s), want 1", len(items)) } @@ -2396,3 +2395,33 @@ func (l *fakeServiceLister) Get(name string) (*v1.Service, error) { func (l *fakeServiceLister) Services(_ string) corelisters.ServiceNamespaceLister { return l } + +// spyWorkQueue implements a work queue and adds the ability to inspect processed +// items for testing purposes. +type spyWorkQueue struct { + workqueue.RateLimitingInterface + items []spyQueueItem +} + +// spyQueueItem represents an item that was being processed. +type spyQueueItem struct { + Key interface{} + // Delay represents the delayed duration if and only if AddAfter was invoked. + Delay time.Duration +} + +// AddAfter is like workqueue.RateLimitingInterface.AddAfter but records the +// added key and delay internally. +func (f *spyWorkQueue) AddAfter(key interface{}, delay time.Duration) { + f.items = append(f.items, spyQueueItem{ + Key: key, + Delay: delay, + }) + + f.RateLimitingInterface.AddAfter(key, delay) +} + +// getItems returns all items that were recorded. +func (f *spyWorkQueue) getItems() []spyQueueItem { + return f.items +} diff --git a/staging/src/k8s.io/cloud-provider/controllers/service/testing/queue.go b/staging/src/k8s.io/cloud-provider/controllers/service/testing/queue.go deleted file mode 100644 index 03368ae16c0..00000000000 --- a/staging/src/k8s.io/cloud-provider/controllers/service/testing/queue.go +++ /dev/null @@ -1,53 +0,0 @@ -/* -Copyright 2022 The Kubernetes Authors. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package testing - -import ( - "time" - - "k8s.io/client-go/util/workqueue" -) - -// SpyWorkQueue implements a work queue and adds the ability to inspect processed -// items for testing purposes. -type SpyWorkQueue struct { - workqueue.RateLimitingInterface - items []SpyQueueItem -} - -// SpyQueueItem represents an item that was being processed. -type SpyQueueItem struct { - Key interface{} - // Delay represents the delayed duration if and only if AddAfter was invoked. - Delay time.Duration -} - -// AddAfter is like workqueue.RateLimitingInterface.AddAfter but records the -// added key and delay internally. -func (f *SpyWorkQueue) AddAfter(key interface{}, delay time.Duration) { - f.items = append(f.items, SpyQueueItem{ - Key: key, - Delay: delay, - }) - - f.RateLimitingInterface.AddAfter(key, delay) -} - -// GetItems returns all items that were recorded. -func (f *SpyWorkQueue) GetItems() []SpyQueueItem { - return f.items -} From b464d08427864e38d5401eb3e6650331c8190460 Mon Sep 17 00:00:00 2001 From: Timo Reimann Date: Tue, 2 May 2023 07:52:35 +0200 Subject: [PATCH 09/10] Remove custom fake svc lister in favor of regular indexer --- .../controllers/service/controller_test.go | 40 +++---------------- 1 file changed, 6 insertions(+), 34 deletions(-) diff --git a/staging/src/k8s.io/cloud-provider/controllers/service/controller_test.go b/staging/src/k8s.io/cloud-provider/controllers/service/controller_test.go index ae36ebaed8f..9b7b422c4ad 100644 --- a/staging/src/k8s.io/cloud-provider/controllers/service/controller_test.go +++ b/staging/src/k8s.io/cloud-provider/controllers/service/controller_test.go @@ -2288,8 +2288,13 @@ func TestServiceQueueDelay(t *testing.T) { controller.serviceQueue = queue cloud.Err = tc.lbCloudErr + serviceCache := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}) + controller.serviceLister = corelisters.NewServiceLister(serviceCache) + svc := defaultExternalService() - controller.serviceLister = newFakeServiceLister(svc) + if err := serviceCache.Add(svc); err != nil { + t.Fatalf("adding service %s to cache: %s", svc.Name, err) + } ctx := context.Background() _, err := client.CoreV1().Services(ns).Create(ctx, svc, metav1.CreateOptions{}) @@ -2363,39 +2368,6 @@ func (l *fakeNodeLister) Get(name string) (*v1.Node, error) { return nil, nil } -type fakeServiceLister struct { - cache []*v1.Service -} - -func newFakeServiceLister(services ...*v1.Service) *fakeServiceLister { - ret := &fakeServiceLister{} - ret.cache = services - return ret -} - -// List lists all Nodes in the indexer. -// Objects returned here must be treated as read-only. -func (l *fakeServiceLister) List(_ labels.Selector) ([]*v1.Service, error) { - return l.cache, nil -} - -// Get retrieves the Service from the index for a given name. -// Objects returned here must be treated as read-only. -func (l *fakeServiceLister) Get(name string) (*v1.Service, error) { - for _, svc := range l.cache { - if svc.Name == name { - return svc, nil - } - } - return nil, nil -} - -// Services returns an object that can list and get Services. -// This is fakeServiceLister itself. -func (l *fakeServiceLister) Services(_ string) corelisters.ServiceNamespaceLister { - return l -} - // spyWorkQueue implements a work queue and adds the ability to inspect processed // items for testing purposes. type spyWorkQueue struct { From 2ad2c15c9ca1e0cad0837fe3398f3818adcc8441 Mon Sep 17 00:00:00 2001 From: Timo Reimann Date: Tue, 2 May 2023 07:59:48 +0200 Subject: [PATCH 10/10] Update copyright year to 2023 --- staging/src/k8s.io/cloud-provider/api/retry_error.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/staging/src/k8s.io/cloud-provider/api/retry_error.go b/staging/src/k8s.io/cloud-provider/api/retry_error.go index b6dc86dead9..ac0e5e6e728 100644 --- a/staging/src/k8s.io/cloud-provider/api/retry_error.go +++ b/staging/src/k8s.io/cloud-provider/api/retry_error.go @@ -1,5 +1,5 @@ /* -Copyright 2022 The Kubernetes Authors. +Copyright 2023 The Kubernetes Authors. Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License.