mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-08-01 15:58:37 +00:00
Wait a minimum amount of time for polling operations
This commit is contained in:
parent
de8cc31355
commit
115ddc5a8e
@ -6,6 +6,7 @@ go_library(
|
||||
"constants.go",
|
||||
"context.go",
|
||||
"doc.go",
|
||||
"errors.go",
|
||||
"gce_projects.go",
|
||||
"gen.go",
|
||||
"op.go",
|
||||
@ -32,6 +33,8 @@ go_test(
|
||||
srcs = [
|
||||
"gen_test.go",
|
||||
"mock_test.go",
|
||||
"ratelimit_test.go",
|
||||
"service_test.go",
|
||||
"utils_test.go",
|
||||
],
|
||||
embed = [":go_default_library"],
|
||||
|
48
pkg/cloudprovider/providers/gce/cloud/errors.go
Normal file
48
pkg/cloudprovider/providers/gce/cloud/errors.go
Normal file
@ -0,0 +1,48 @@
|
||||
/*
|
||||
Copyright 2018 The Kubernetes Authors.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package cloud
|
||||
|
||||
import "fmt"
|
||||
|
||||
// OperationPollingError reports that the status of a GCE Operation could not
// be retrieved for a prolonged period.
type OperationPollingError struct {
	// LastPollError is the error included in the message produced by Error.
	LastPollError error
}

// Error implements the error interface. The returned text is a fixed prefix
// followed by the default formatting of LastPollError.
func (e *OperationPollingError) Error() string {
	return "GCE operation polling error: " + fmt.Sprint(e.LastPollError)
}
|
||||
|
||||
// GCEOperationError describes a GCE Operation that finished with an error.
type GCEOperationError struct {
	// HTTPStatusCode is the HTTP status code of the final error,
	// e.g. 400 (BadRequest) for a failed operation.
	HTTPStatusCode int
	// Code is GCE's own code for what went wrong,
	// e.g. RESOURCE_IN_USE_BY_ANOTHER_RESOURCE.
	Code string
	// Message is the human readable description,
	// e.g. "The network resource 'xxx' is already being used by 'xxx'".
	Message string
}

// Error renders the HTTP status code, GCE's error code and the human readable
// message in the form "GCE <status> - <code>: <message>".
func (e *GCEOperationError) Error() string {
	return fmt.Sprintf("GCE %d - %s: %s", e.HTTPStatusCode, e.Code, e.Message)
}
|
@ -29,10 +29,17 @@ import (
|
||||
"k8s.io/kubernetes/pkg/cloudprovider/providers/gce/cloud/meta"
|
||||
)
|
||||
|
||||
// operationStatusDone is the Status value that marks an operation as finished.
const operationStatusDone = "DONE"
|
||||
|
||||
// operation is a GCE operation that can be waited on.
|
||||
type operation interface {
|
||||
// isDone queries GCE for the done status. This call can block.
|
||||
isDone(ctx context.Context) (bool, error)
|
||||
// error returns the resulting error of the operation. This may be nil if the operations
|
||||
// was successful.
|
||||
error() error
|
||||
// rateLimitKey returns the rate limit key to use for the given operation.
|
||||
// This rate limit will govern how fast the server will be polled for
|
||||
// operation completion status.
|
||||
@ -43,6 +50,7 @@ type gaOperation struct {
|
||||
s *Service
|
||||
projectID string
|
||||
key *meta.Key
|
||||
err error
|
||||
}
|
||||
|
||||
func (o *gaOperation) String() string {
|
||||
@ -71,7 +79,15 @@ func (o *gaOperation) isDone(ctx context.Context) (bool, error) {
|
||||
if err != nil {
|
||||
return false, err
|
||||
}
|
||||
return op != nil && op.Status == "DONE", nil
|
||||
if op == nil || op.Status != operationStatusDone {
|
||||
return false, nil
|
||||
}
|
||||
|
||||
if op.Error != nil && len(op.Error.Errors) > 0 && op.Error.Errors[0] != nil {
|
||||
e := op.Error.Errors[0]
|
||||
o.err = &GCEOperationError{HTTPStatusCode: op.HTTPStatusCode, Code: e.Code, Message: e.Message}
|
||||
}
|
||||
return true, nil
|
||||
}
|
||||
|
||||
func (o *gaOperation) rateLimitKey() *RateLimitKey {
|
||||
@ -83,10 +99,15 @@ func (o *gaOperation) rateLimitKey() *RateLimitKey {
|
||||
}
|
||||
}
|
||||
|
||||
func (o *gaOperation) error() error {
|
||||
return o.err
|
||||
}
|
||||
|
||||
type alphaOperation struct {
|
||||
s *Service
|
||||
projectID string
|
||||
key *meta.Key
|
||||
err error
|
||||
}
|
||||
|
||||
func (o *alphaOperation) String() string {
|
||||
@ -115,7 +136,15 @@ func (o *alphaOperation) isDone(ctx context.Context) (bool, error) {
|
||||
if err != nil {
|
||||
return false, err
|
||||
}
|
||||
return op != nil && op.Status == "DONE", nil
|
||||
if op == nil || op.Status != operationStatusDone {
|
||||
return false, nil
|
||||
}
|
||||
|
||||
if op.Error != nil && len(op.Error.Errors) > 0 && op.Error.Errors[0] != nil {
|
||||
e := op.Error.Errors[0]
|
||||
o.err = &GCEOperationError{HTTPStatusCode: op.HTTPStatusCode, Code: e.Code, Message: e.Message}
|
||||
}
|
||||
return true, nil
|
||||
}
|
||||
|
||||
func (o *alphaOperation) rateLimitKey() *RateLimitKey {
|
||||
@ -127,10 +156,15 @@ func (o *alphaOperation) rateLimitKey() *RateLimitKey {
|
||||
}
|
||||
}
|
||||
|
||||
func (o *alphaOperation) error() error {
|
||||
return o.err
|
||||
}
|
||||
|
||||
type betaOperation struct {
|
||||
s *Service
|
||||
projectID string
|
||||
key *meta.Key
|
||||
err error
|
||||
}
|
||||
|
||||
func (o *betaOperation) String() string {
|
||||
@ -159,7 +193,15 @@ func (o *betaOperation) isDone(ctx context.Context) (bool, error) {
|
||||
if err != nil {
|
||||
return false, err
|
||||
}
|
||||
return op != nil && op.Status == "DONE", nil
|
||||
if op == nil || op.Status != operationStatusDone {
|
||||
return false, nil
|
||||
}
|
||||
|
||||
if op.Error != nil && len(op.Error.Errors) > 0 && op.Error.Errors[0] != nil {
|
||||
e := op.Error.Errors[0]
|
||||
o.err = &GCEOperationError{HTTPStatusCode: op.HTTPStatusCode, Code: e.Code, Message: e.Message}
|
||||
}
|
||||
return true, nil
|
||||
}
|
||||
|
||||
func (o *betaOperation) rateLimitKey() *RateLimitKey {
|
||||
@ -170,3 +212,7 @@ func (o *betaOperation) rateLimitKey() *RateLimitKey {
|
||||
Version: meta.VersionBeta,
|
||||
}
|
||||
}
|
||||
|
||||
func (o *betaOperation) error() error {
|
||||
return o.err
|
||||
}
|
||||
|
@ -47,22 +47,60 @@ type RateLimiter interface {
|
||||
Accept(ctx context.Context, key *RateLimitKey) error
|
||||
}
|
||||
|
||||
// acceptor captures the Accept behavior of the flowcontrol.RateLimiter
// interface: an object whose Accept blocks until a call is allowed to run.
type acceptor interface {
	// Accept returns once the caller is allowed to proceed.
	Accept()
}
|
||||
|
||||
// AcceptRateLimiter wraps an Acceptor with RateLimiter parameters.
|
||||
type AcceptRateLimiter struct {
|
||||
// Acceptor is the underlying rate limiter.
|
||||
Acceptor acceptor
|
||||
}
|
||||
|
||||
// Accept wraps an Acceptor and blocks on Accept or context.Done(). Key is ignored.
|
||||
func (rl *AcceptRateLimiter) Accept(ctx context.Context, key *RateLimitKey) error {
|
||||
ch := make(chan struct{})
|
||||
go func() {
|
||||
rl.Acceptor.Accept()
|
||||
close(ch)
|
||||
}()
|
||||
select {
|
||||
case <-ch:
|
||||
break
|
||||
case <-ctx.Done():
|
||||
return ctx.Err()
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// NopRateLimiter is a rate limiter that performs no rate limiting.
|
||||
type NopRateLimiter struct {
|
||||
}
|
||||
|
||||
// Accept the operation to be rate limited.
|
||||
// Accept everything immediately.
|
||||
func (*NopRateLimiter) Accept(ctx context.Context, key *RateLimitKey) error {
|
||||
// Rate limit polling of the Operation status to avoid hammering GCE
|
||||
// for the status of an operation.
|
||||
const pollTime = time.Duration(1) * time.Second
|
||||
if key.Operation == "Get" && key.Service == "Operations" {
|
||||
select {
|
||||
case <-time.NewTimer(pollTime).C:
|
||||
break
|
||||
case <-ctx.Done():
|
||||
return ctx.Err()
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// MinimumRateLimiter wraps a RateLimiter and will only call its Accept until the minimum
|
||||
// duration has been met or the context is cancelled.
|
||||
type MinimumRateLimiter struct {
|
||||
// RateLimiter is the underlying ratelimiter which is called after the mininum time is reacehd.
|
||||
RateLimiter RateLimiter
|
||||
// Minimum is the minimum wait time before the underlying ratelimiter is called.
|
||||
Minimum time.Duration
|
||||
}
|
||||
|
||||
// Accept blocks on the minimum duration and context. Once the minimum duration is met,
|
||||
// the func is blocked on the underlying ratelimiter.
|
||||
func (m *MinimumRateLimiter) Accept(ctx context.Context, key *RateLimitKey) error {
|
||||
select {
|
||||
case <-time.After(m.Minimum):
|
||||
return m.RateLimiter.Accept(ctx, key)
|
||||
case <-ctx.Done():
|
||||
return ctx.Err()
|
||||
}
|
||||
}
|
||||
|
80
pkg/cloudprovider/providers/gce/cloud/ratelimit_test.go
Normal file
80
pkg/cloudprovider/providers/gce/cloud/ratelimit_test.go
Normal file
@ -0,0 +1,80 @@
|
||||
/*
|
||||
Copyright 2018 The Kubernetes Authors.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package cloud
|
||||
|
||||
import (
|
||||
"context"
|
||||
"testing"
|
||||
"time"
|
||||
)
|
||||
|
||||
// FakeAcceptor is a test acceptor whose behavior is supplied by the accept func.
type FakeAcceptor struct {
	accept func()
}

// Accept invokes the configured accept func.
func (f *FakeAcceptor) Accept() { f.accept() }
|
||||
|
||||
func TestAcceptRateLimiter(t *testing.T) {
|
||||
fa := &FakeAcceptor{accept: func() {}}
|
||||
arl := &AcceptRateLimiter{fa}
|
||||
err := arl.Accept(context.Background(), nil)
|
||||
if err != nil {
|
||||
t.Errorf("AcceptRateLimiter.Accept() = %v, want nil", err)
|
||||
}
|
||||
|
||||
// Use context that has been cancelled and expect a context error returned.
|
||||
ctxCancelled, cancelled := context.WithCancel(context.Background())
|
||||
cancelled()
|
||||
// Verify context is cancelled by now.
|
||||
<-ctxCancelled.Done()
|
||||
|
||||
fa.accept = func() { time.Sleep(1 * time.Second) }
|
||||
err = arl.Accept(ctxCancelled, nil)
|
||||
if err != ctxCancelled.Err() {
|
||||
t.Errorf("AcceptRateLimiter.Accept() = %v, want %v", err, ctxCancelled.Err())
|
||||
}
|
||||
}
|
||||
|
||||
func TestMinimumRateLimiter(t *testing.T) {
|
||||
fa := &FakeAcceptor{accept: func() {}}
|
||||
arl := &AcceptRateLimiter{fa}
|
||||
var called bool
|
||||
fa.accept = func() { called = true }
|
||||
m := &MinimumRateLimiter{RateLimiter: arl, Minimum: 10 * time.Millisecond}
|
||||
|
||||
err := m.Accept(context.Background(), nil)
|
||||
if err != nil {
|
||||
t.Errorf("MinimumRateLimiter.Accept = %v, want nil", err)
|
||||
}
|
||||
if !called {
|
||||
t.Errorf("`called` = false, want true")
|
||||
}
|
||||
|
||||
// Use context that has been cancelled and expect a context error returned.
|
||||
ctxCancelled, cancelled := context.WithCancel(context.Background())
|
||||
cancelled()
|
||||
// Verify context is cancelled by now.
|
||||
<-ctxCancelled.Done()
|
||||
called = false
|
||||
err = m.Accept(ctxCancelled, nil)
|
||||
if err != ctxCancelled.Err() {
|
||||
t.Errorf("AcceptRateLimiter.Accept() = %v, want %v", err, ctxCancelled.Err())
|
||||
}
|
||||
if called {
|
||||
t.Errorf("`called` = true, want false")
|
||||
}
|
||||
}
|
@ -45,19 +45,19 @@ func (s *Service) wrapOperation(anyOp interface{}) (operation, error) {
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return &gaOperation{s, r.ProjectID, r.Key}, nil
|
||||
return &gaOperation{s: s, projectID: r.ProjectID, key: r.Key}, nil
|
||||
case *alpha.Operation:
|
||||
r, err := ParseResourceURL(o.SelfLink)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return &alphaOperation{s, r.ProjectID, r.Key}, nil
|
||||
return &alphaOperation{s: s, projectID: r.ProjectID, key: r.Key}, nil
|
||||
case *beta.Operation:
|
||||
r, err := ParseResourceURL(o.SelfLink)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return &betaOperation{s, r.ProjectID, r.Key}, nil
|
||||
return &betaOperation{s: s, projectID: r.ProjectID, key: r.Key}, nil
|
||||
default:
|
||||
return nil, fmt.Errorf("invalid type %T", anyOp)
|
||||
}
|
||||
@ -72,14 +72,39 @@ func (s *Service) WaitForCompletion(ctx context.Context, genericOp interface{})
|
||||
glog.Errorf("wrapOperation(%+v) error: %v", genericOp, err)
|
||||
return err
|
||||
}
|
||||
for done, err := op.isDone(ctx); !done; done, err = op.isDone(ctx) {
|
||||
if err != nil {
|
||||
glog.V(4).Infof("op.isDone(%v) error; op = %v, err = %v", ctx, op, err)
|
||||
return err
|
||||
}
|
||||
glog.V(5).Infof("op.isDone(%v) waiting; op = %v", ctx, op)
|
||||
s.RateLimiter.Accept(ctx, op.rateLimitKey())
|
||||
}
|
||||
glog.V(5).Infof("op.isDone(%v) complete; op = %v", ctx, op)
|
||||
return nil
|
||||
|
||||
return s.pollOperation(ctx, op)
|
||||
}
|
||||
|
||||
// pollOperation calls operations.isDone until the function comes back true or context is Done.
|
||||
// If an error occurs retrieving the operation, the loop will continue until the context is done.
|
||||
// This is to prevent a transient error from bubbling up to controller-level logic.
|
||||
func (s *Service) pollOperation(ctx context.Context, op operation) error {
|
||||
var pollCount int
|
||||
for {
|
||||
// Check if context has been cancelled. Note that ctx.Done() must be checked before
|
||||
// returning ctx.Err().
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
glog.V(5).Infof("op.pollOperation(%v, %v) not completed, poll count = %d, ctx.Err = %v", ctx, op, pollCount, ctx.Err())
|
||||
return ctx.Err()
|
||||
default:
|
||||
// ctx is not canceled, continue immediately
|
||||
}
|
||||
|
||||
pollCount++
|
||||
glog.V(5).Infof("op.isDone(%v) waiting; op = %v, poll count = %d", ctx, op, pollCount)
|
||||
s.RateLimiter.Accept(ctx, op.rateLimitKey())
|
||||
done, err := op.isDone(ctx)
|
||||
if err != nil {
|
||||
glog.V(5).Infof("op.isDone(%v) error; op = %v, poll count = %d, err = %v, retrying", ctx, op, pollCount, err)
|
||||
}
|
||||
|
||||
if done {
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
glog.V(5).Infof("op.isDone(%v) complete; op = %v, poll count = %d, op.err = %v", ctx, op, pollCount, op.error())
|
||||
return op.error()
|
||||
}
|
||||
|
84
pkg/cloudprovider/providers/gce/cloud/service_test.go
Normal file
84
pkg/cloudprovider/providers/gce/cloud/service_test.go
Normal file
@ -0,0 +1,84 @@
|
||||
/*
|
||||
Copyright 2018 The Kubernetes Authors.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package cloud
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"testing"
|
||||
)
|
||||
|
||||
func TestPollOperation(t *testing.T) {
|
||||
const totalAttempts = 10
|
||||
var attempts int
|
||||
fo := &fakeOperation{isDoneFunc: func(ctx context.Context) (bool, error) {
|
||||
attempts++
|
||||
if attempts < totalAttempts {
|
||||
return false, nil
|
||||
}
|
||||
return true, nil
|
||||
}}
|
||||
s := Service{RateLimiter: &NopRateLimiter{}}
|
||||
// Check that pollOperation will retry the operation multiple times.
|
||||
err := s.pollOperation(context.Background(), fo)
|
||||
if err != nil {
|
||||
t.Errorf("pollOperation() = %v, want nil", err)
|
||||
}
|
||||
if attempts != totalAttempts {
|
||||
t.Errorf("`attempts` = %d, want %d", attempts, totalAttempts)
|
||||
}
|
||||
|
||||
// Check that the operation's error is returned.
|
||||
fo.err = fmt.Errorf("test operation failed")
|
||||
err = s.pollOperation(context.Background(), fo)
|
||||
if err != fo.err {
|
||||
t.Errorf("pollOperation() = %v, want %v", err, fo.err)
|
||||
}
|
||||
fo.err = nil
|
||||
|
||||
fo.isDoneFunc = func(ctx context.Context) (bool, error) {
|
||||
return false, nil
|
||||
}
|
||||
// Use context that has been cancelled and expect a context error returned.
|
||||
ctxCancelled, cancelled := context.WithCancel(context.Background())
|
||||
cancelled()
|
||||
// Verify context is cancelled by now.
|
||||
<-ctxCancelled.Done()
|
||||
// Check that pollOperation returns because the context is cancelled.
|
||||
err = s.pollOperation(ctxCancelled, fo)
|
||||
if err == nil {
|
||||
t.Errorf("pollOperation() = nil, want: %v", ctxCancelled.Err())
|
||||
}
|
||||
}
|
||||
|
||||
type fakeOperation struct {
|
||||
isDoneFunc func(ctx context.Context) (bool, error)
|
||||
err error
|
||||
rateKey *RateLimitKey
|
||||
}
|
||||
|
||||
func (f *fakeOperation) isDone(ctx context.Context) (bool, error) {
|
||||
return f.isDoneFunc(ctx)
|
||||
}
|
||||
|
||||
func (f *fakeOperation) error() error {
|
||||
return f.err
|
||||
}
|
||||
|
||||
func (f *fakeOperation) rateLimitKey() *RateLimitKey {
|
||||
return f.rateKey
|
||||
}
|
@ -68,7 +68,7 @@ const (
|
||||
// AffinityTypeClientIPProto - affinity based on Client IP and port.
|
||||
gceAffinityTypeClientIPProto = "CLIENT_IP_PROTO"
|
||||
|
||||
operationPollInterval = 3 * time.Second
|
||||
operationPollInterval = time.Second
|
||||
// Creating Route in very large clusters, may take more than half an hour.
|
||||
operationPollTimeoutDuration = time.Hour
|
||||
|
||||
@ -484,7 +484,7 @@ func CreateGCECloud(config *CloudConfig) (*GCECloud, error) {
|
||||
glog.Infof("managing multiple zones: %v", config.ManagedZones)
|
||||
}
|
||||
|
||||
operationPollRateLimiter := flowcontrol.NewTokenBucketRateLimiter(10, 100) // 10 qps, 100 bucket size.
|
||||
operationPollRateLimiter := flowcontrol.NewTokenBucketRateLimiter(5, 5) // 5 qps, 5 burst.
|
||||
|
||||
gce := &GCECloud{
|
||||
service: service,
|
||||
|
@ -50,17 +50,15 @@ type gceRateLimiter struct {
|
||||
// operations.
|
||||
func (l *gceRateLimiter) Accept(ctx context.Context, key *cloud.RateLimitKey) error {
|
||||
if key.Operation == "Get" && key.Service == "Operations" {
|
||||
ch := make(chan struct{})
|
||||
go func() {
|
||||
l.gce.operationPollRateLimiter.Accept()
|
||||
close(ch)
|
||||
}()
|
||||
select {
|
||||
case <-ch:
|
||||
break
|
||||
case <-ctx.Done():
|
||||
return ctx.Err()
|
||||
// Wait a minimum amount of time regardless of rate limiter.
|
||||
rl := &cloud.MinimumRateLimiter{
|
||||
// Convert flowcontrol.RateLimiter into cloud.RateLimiter
|
||||
RateLimiter: &cloud.AcceptRateLimiter{
|
||||
Acceptor: l.gce.operationPollRateLimiter,
|
||||
},
|
||||
Minimum: operationPollInterval,
|
||||
}
|
||||
return rl.Accept(ctx, key)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user