diff --git a/staging/src/k8s.io/cloud-provider/api/retry_error.go b/staging/src/k8s.io/cloud-provider/api/retry_error.go
new file mode 100644
index 00000000000..ac0e5e6e728
--- /dev/null
+++ b/staging/src/k8s.io/cloud-provider/api/retry_error.go
@@ -0,0 +1,46 @@
+/*
+Copyright 2023 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package api
+
+import (
+	"time"
+)
+
+// RetryError indicates that a service reconciliation should be retried after a
+// fixed duration (as opposed to backing off exponentially).
+type RetryError struct {
+	msg        string
+	retryAfter time.Duration
+}
+
+// NewRetryError returns a RetryError.
+func NewRetryError(msg string, retryAfter time.Duration) *RetryError {
+	return &RetryError{
+		msg:        msg,
+		retryAfter: retryAfter,
+	}
+}
+
+// Error returns the details of the retry reason.
+func (re *RetryError) Error() string {
+	return re.msg
+}
+
+// RetryAfter returns the defined retry-after duration.
+func (re *RetryError) RetryAfter() time.Duration {
+	return re.retryAfter
+}
diff --git a/staging/src/k8s.io/cloud-provider/cloud.go b/staging/src/k8s.io/cloud-provider/cloud.go
index 7e7bf9dfab8..c9a04085f48 100644
--- a/staging/src/k8s.io/cloud-provider/cloud.go
+++ b/staging/src/k8s.io/cloud-provider/cloud.go
@@ -131,11 +131,11 @@ func GetInstanceProviderID(ctx context.Context, cloud Interface, nodeName types.
 // irrespective of the ImplementedElsewhere error. Additional finalizers for
 // LB services must be managed in the alternate implementation.
 type LoadBalancer interface {
-	// TODO: Break this up into different interfaces (LB, etc) when we have more than one type of service
 	// GetLoadBalancer returns whether the specified load balancer exists, and
 	// if so, what its status is.
 	// Implementations must treat the *v1.Service parameter as read-only and not modify it.
-	// Parameter 'clusterName' is the name of the cluster as presented to kube-controller-manager
+	// Parameter 'clusterName' is the name of the cluster as presented to kube-controller-manager.
+	// TODO: Break this up into different interfaces (LB, etc) when we have more than one type of service
 	GetLoadBalancer(ctx context.Context, clusterName string, service *v1.Service) (status *v1.LoadBalancerStatus, exists bool, err error)
 	// GetLoadBalancerName returns the name of the load balancer. Implementations must treat the
 	// *v1.Service parameter as read-only and not modify it.
@@ -143,7 +143,13 @@ type LoadBalancer interface {
 	// EnsureLoadBalancer creates a new load balancer 'name', or updates the existing one. Returns the status of the balancer
 	// Implementations must treat the *v1.Service and *v1.Node
 	// parameters as read-only and not modify them.
-	// Parameter 'clusterName' is the name of the cluster as presented to kube-controller-manager
+	// Parameter 'clusterName' is the name of the cluster as presented to kube-controller-manager.
+	//
+	// Implementations may return a (possibly wrapped) api.RetryError to enforce
+	// backing off at a fixed duration.
+	// This can be used in cases where the load balancer is not ready yet
+	// (e.g., it is still being provisioned) and polling at a fixed rate is
+	// preferable to backing off exponentially, in order to minimize latency.
 	EnsureLoadBalancer(ctx context.Context, clusterName string, service *v1.Service, nodes []*v1.Node) (*v1.LoadBalancerStatus, error)
 	// UpdateLoadBalancer updates hosts under the specified load balancer.
 	// Implementations must treat the *v1.Service and *v1.Node
diff --git a/staging/src/k8s.io/cloud-provider/controllers/service/controller.go b/staging/src/k8s.io/cloud-provider/controllers/service/controller.go
index a8b4bdf477f..179b2d4e8f0 100644
--- a/staging/src/k8s.io/cloud-provider/controllers/service/controller.go
+++ b/staging/src/k8s.io/cloud-provider/controllers/service/controller.go
@@ -18,13 +18,14 @@ package service

 import (
 	"context"
+	"errors"
 	"fmt"
 	"reflect"
 	"sync"
 	"time"

 	v1 "k8s.io/api/core/v1"
-	"k8s.io/apimachinery/pkg/api/errors"
+	apierrors "k8s.io/apimachinery/pkg/api/errors"
 	"k8s.io/apimachinery/pkg/labels"
 	"k8s.io/apimachinery/pkg/util/runtime"
 	"k8s.io/apimachinery/pkg/util/sets"
@@ -39,6 +40,7 @@ import (
 	"k8s.io/client-go/tools/record"
 	"k8s.io/client-go/util/workqueue"
 	cloudprovider "k8s.io/cloud-provider"
+	"k8s.io/cloud-provider/api"
 	servicehelper "k8s.io/cloud-provider/service/helpers"
 	"k8s.io/component-base/featuregate"
 	controllersmetrics "k8s.io/component-base/metrics/prometheus/controllers"
@@ -288,8 +290,15 @@ func (c *Controller) processNextServiceItem(ctx context.Context) bool {
 		return true
 	}

-	runtime.HandleError(fmt.Errorf("error processing service %v (will retry): %v", key, err))
-	c.serviceQueue.AddRateLimited(key)
+	var re *api.RetryError
+	if errors.As(err, &re) {
+		klog.Warningf("error processing service %v (retrying in %s): %v", key, re.RetryAfter(), err)
+		c.serviceQueue.AddAfter(key, re.RetryAfter())
+	} else {
+		runtime.HandleError(fmt.Errorf("error processing service %v (retrying with exponential backoff): %v", key, err))
+		c.serviceQueue.AddRateLimited(key)
+	}
+
 	return true
 }

@@ -401,7 +410,8 @@ func (c *Controller) syncLoadBalancerIfNeeded(ctx context.Context, service *v1.S
 			klog.V(4).Infof("LoadBalancer for service %s implemented by a different controller %s, Ignoring error", key, c.cloud.ProviderName())
 			return op, nil
 		}
-		return op, fmt.Errorf("failed to ensure load balancer: %v", err)
+		// Use %w deliberately so that a returned RetryError can be handled.
+		return op, fmt.Errorf("failed to ensure load balancer: %w", err)
 	}
 	if newStatus == nil {
 		return op, fmt.Errorf("service status returned by EnsureLoadBalancer is nil")
@@ -415,7 +425,7 @@ func (c *Controller) syncLoadBalancerIfNeeded(ctx context.Context, service *v1.S
 		// - Not found error mostly happens when service disappears right after
 		//   we remove the finalizer.
 		// - We can't patch status on non-exist service anyway.
-		if !errors.IsNotFound(err) {
+		if !apierrors.IsNotFound(err) {
 			return op, fmt.Errorf("failed to update load balancer status: %v", err)
 		}
 	}
@@ -837,7 +847,7 @@ func (c *Controller) syncService(ctx context.Context, key string) error {
 	// service holds the latest service info from apiserver
 	service, err := c.serviceLister.Services(namespace).Get(name)
 	switch {
-	case errors.IsNotFound(err):
+	case apierrors.IsNotFound(err):
 		// service absence in store means watcher caught the deletion, ensure LB info is cleaned
 		err = c.processServiceDeletion(ctx, key)
 	case err != nil:
diff --git a/staging/src/k8s.io/cloud-provider/controllers/service/controller_test.go b/staging/src/k8s.io/cloud-provider/controllers/service/controller_test.go
index 3e31faf3c92..9b7b422c4ad 100644
--- a/staging/src/k8s.io/cloud-provider/controllers/service/controller_test.go
+++ b/staging/src/k8s.io/cloud-provider/controllers/service/controller_test.go
@@ -41,9 +41,12 @@ import (
 	"k8s.io/client-go/kubernetes/fake"
 	"k8s.io/client-go/kubernetes/scheme"
 	v1core "k8s.io/client-go/kubernetes/typed/core/v1"
+	corelisters "k8s.io/client-go/listers/core/v1"
 	core "k8s.io/client-go/testing"
+	"k8s.io/client-go/tools/cache"
 	"k8s.io/client-go/tools/record"
 	"k8s.io/client-go/util/workqueue"
+	"k8s.io/cloud-provider/api"
 	fakecloud "k8s.io/cloud-provider/fake"
 	servicehelper "k8s.io/cloud-provider/service/helpers"
 	featuregatetesting "k8s.io/component-base/featuregate/testing"
@@ -1093,22 +1096,24 @@ func TestSyncService(t *testing.T) {
 	}

 	for _, tc := range testCases {
-		ctx, cancel := context.WithCancel(context.Background())
-		defer cancel()
+		t.Run(tc.testName, func(t *testing.T) {
+			ctx, cancel := context.WithCancel(context.Background())
+			defer cancel()

-		tc.updateFn()
-		obtainedErr := controller.syncService(ctx, tc.key)
+			tc.updateFn()
+			obtainedErr := controller.syncService(ctx, tc.key)

-		//expected matches obtained ??.
-		if exp := tc.expectedFn(obtainedErr); exp != nil {
-			t.Errorf("%v Error:%v", tc.testName, exp)
-		}
+			// Check that the expected result matches the obtained one.
+			if exp := tc.expectedFn(obtainedErr); exp != nil {
+				t.Errorf("%v Error:%v", tc.testName, exp)
+			}

-		//Post processing, the element should not be in the sync queue.
-		_, exist := controller.cache.get(tc.key)
-		if exist {
-			t.Fatalf("%v working Queue should be empty, but contains %s", tc.testName, tc.key)
-		}
+			// Post-processing: the element should not be in the sync queue.
+			_, exist := controller.cache.get(tc.key)
+			if exist {
+				t.Fatalf("%v working Queue should be empty, but contains %s", tc.testName, tc.key)
+			}
+		})
 	}
 }

@@ -2253,6 +2258,87 @@ func Test_shouldSyncUpdatedNode_compoundedPredicates(t *testing.T) {
 	}
 }

+func TestServiceQueueDelay(t *testing.T) {
+	const ns = metav1.NamespaceDefault
+
+	tests := []struct {
+		name           string
+		lbCloudErr     error
+		wantRetryDelay time.Duration
+	}{
+		{
+			name:       "processing successful",
+			lbCloudErr: nil,
+		},
+		{
+			name:       "regular error",
+			lbCloudErr: errors.New("something went wrong"),
+		},
+		{
+			name:           "retry error",
+			lbCloudErr:     api.NewRetryError("LB create in progress", 42*time.Second),
+			wantRetryDelay: 42 * time.Second,
+		},
+	}
+
+	for _, tc := range tests {
+		t.Run(tc.name, func(t *testing.T) {
+			controller, cloud, client := newController()
+			queue := &spyWorkQueue{RateLimitingInterface: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "test-service-queue-delay")}
+			controller.serviceQueue = queue
+			cloud.Err = tc.lbCloudErr
+
+			serviceCache := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc})
+			controller.serviceLister = corelisters.NewServiceLister(serviceCache)
+
+			svc := defaultExternalService()
+			if err := serviceCache.Add(svc); err != nil {
+				t.Fatalf("adding service %s to cache: %s", svc.Name, err)
+			}
+
+			ctx := context.Background()
+			_, err := client.CoreV1().Services(ns).Create(ctx, svc, metav1.CreateOptions{})
+			if err != nil {
+				t.Fatal(err)
+			}
+
+			key, err := cache.MetaNamespaceKeyFunc(svc)
+			if err != nil {
+				t.Fatalf("creating meta namespace key: %s", err)
+			}
+			queue.Add(key)
+
+			done := controller.processNextServiceItem(ctx)
+			if !done {
+				t.Fatal("processNextServiceItem stopped prematurely")
+			}
+
+			// Expect no requeues unless we hit an error that is not a retry
+			// error.
+			wantNumRequeues := 0
+			var re *api.RetryError
+			isRetryError := errors.As(tc.lbCloudErr, &re)
+			if tc.lbCloudErr != nil && !isRetryError {
+				wantNumRequeues = 1
+			}
+
+			if gotNumRequeues := queue.NumRequeues(key); gotNumRequeues != wantNumRequeues {
+				t.Fatalf("got %d requeue(s), want %d", gotNumRequeues, wantNumRequeues)
+			}
+
+			if tc.wantRetryDelay > 0 {
+				items := queue.getItems()
+				if len(items) != 1 {
+					t.Fatalf("got %d item(s), want 1", len(items))
+				}
+				if gotDelay := items[0].Delay; gotDelay != tc.wantRetryDelay {
+					t.Fatalf("got delay %s, want %s", gotDelay, tc.wantRetryDelay)
+				}
+			}
+		})
+	}
+}
+
 type fakeNodeLister struct {
 	cache []*v1.Node
 	err   error
@@ -2281,3 +2367,33 @@ func (l *fakeNodeLister) Get(name string) (*v1.Node, error) {
 	}
 	return nil, nil
 }
+
+// spyWorkQueue implements a work queue and adds the ability to inspect processed
+// items for testing purposes.
+type spyWorkQueue struct {
+	workqueue.RateLimitingInterface
+	items []spyQueueItem
+}
+
+// spyQueueItem represents an item that was being processed.
+type spyQueueItem struct {
+	Key interface{}
+	// Delay represents the delayed duration if and only if AddAfter was invoked.
+	Delay time.Duration
+}
+
+// AddAfter is like workqueue.RateLimitingInterface.AddAfter but records the
+// added key and delay internally.
+func (f *spyWorkQueue) AddAfter(key interface{}, delay time.Duration) {
+	f.items = append(f.items, spyQueueItem{
+		Key:   key,
+		Delay: delay,
+	})

+	f.RateLimitingInterface.AddAfter(key, delay)
+}
+
+// getItems returns all items that were recorded.
+func (f *spyWorkQueue) getItems() []spyQueueItem {
+	return f.items
+}
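
Usage sketch (illustration only, not part of the diff): how a cloud provider could surface the new api.RetryError from EnsureLoadBalancer. Only api.NewRetryError and the EnsureLoadBalancer signature come from the change above; the exampleprovider package, the provider type, the lbClient interface, and the EnsureLB call are hypothetical stand-ins for a real cloud SDK.

package exampleprovider

import (
	"context"
	"time"

	v1 "k8s.io/api/core/v1"
	"k8s.io/cloud-provider/api"
)

// lbClient is a hypothetical cloud SDK client.
type lbClient interface {
	EnsureLB(ctx context.Context, name string) (ip string, ready bool, err error)
}

// provider is a hypothetical implementation of cloudprovider.LoadBalancer.
type provider struct {
	client lbClient
}

func (p *provider) EnsureLoadBalancer(ctx context.Context, clusterName string, service *v1.Service, nodes []*v1.Node) (*v1.LoadBalancerStatus, error) {
	ip, ready, err := p.client.EnsureLB(ctx, service.Name)
	if err != nil {
		// Ordinary errors keep the default behavior: the service controller
		// requeues the key with exponential backoff.
		return nil, err
	}
	if !ready {
		// The load balancer is still being provisioned: ask the controller to
		// retry at a fixed 10-second interval instead of backing off.
		return nil, api.NewRetryError("load balancer is still being provisioned", 10*time.Second)
	}
	return &v1.LoadBalancerStatus{
		Ingress: []v1.LoadBalancerIngress{{IP: ip}},
	}, nil
}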
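
The switch from %v to %w in syncLoadBalancerIfNeeded is what makes this work end to end: processNextServiceItem detects the RetryError with errors.As, which only unwraps errors wrapped via %w. A minimal standalone sketch of that interaction (the error message strings are made up):

package main

import (
	"errors"
	"fmt"
	"time"

	"k8s.io/cloud-provider/api"
)

func main() {
	cloudErr := api.NewRetryError("LB create in progress", 42*time.Second)

	// syncLoadBalancerIfNeeded wraps the provider error with %w, so the
	// RetryError is still discoverable after wrapping.
	wrapped := fmt.Errorf("failed to ensure load balancer: %w", cloudErr)

	var re *api.RetryError
	if errors.As(wrapped, &re) {
		fmt.Println("fixed-delay retry after:", re.RetryAfter()) // prints 42s
	}

	// With %v the chain is broken: errors.As fails, and the controller would
	// fall back to the exponential-backoff path.
	flattened := fmt.Errorf("failed to ensure load balancer: %v", cloudErr)
	fmt.Println("detected after %v-style flattening:", errors.As(flattened, &re)) // false
}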
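
The two queue calls the controller now chooses between behave quite differently, which is what TestServiceQueueDelay asserts. A standalone sketch with the real workqueue package (the "svc-key" item and "demo" queue name are made up): AddAfter delays one item by a fixed duration, while AddRateLimited consults a rate limiter whose per-item delay grows with each requeue.

package main

import (
	"fmt"
	"time"

	"k8s.io/client-go/util/workqueue"
)

func main() {
	// Fixed-delay requeue, the path taken for a RetryError: the key becomes
	// visible again after exactly the requested duration.
	q := workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "demo")
	defer q.ShutDown()
	q.AddAfter("svc-key", 50*time.Millisecond)

	key, _ := q.Get() // blocks until the 50ms delay has elapsed
	fmt.Println("dequeued:", key)
	q.Done(key)

	// Rate-limited requeue, the path for all other errors: the per-item delay
	// grows exponentially (5ms, 10ms, 20ms, ... with the default limiter).
	rl := workqueue.DefaultControllerRateLimiter()
	for i := 0; i < 3; i++ {
		fmt.Printf("backoff after failure %d: %s\n", i+1, rl.When("svc-key"))
	}
}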