diff --git a/pkg/cloudprovider/providers/gce/cloud/BUILD b/pkg/cloudprovider/providers/gce/cloud/BUILD index b4c51d476eb..04cd70a8119 100644 --- a/pkg/cloudprovider/providers/gce/cloud/BUILD +++ b/pkg/cloudprovider/providers/gce/cloud/BUILD @@ -6,6 +6,7 @@ go_library( "constants.go", "context.go", "doc.go", + "errors.go", "gce_projects.go", "gen.go", "op.go", @@ -32,6 +33,8 @@ go_test( srcs = [ "gen_test.go", "mock_test.go", + "ratelimit_test.go", + "service_test.go", "utils_test.go", ], embed = [":go_default_library"], diff --git a/pkg/cloudprovider/providers/gce/cloud/errors.go b/pkg/cloudprovider/providers/gce/cloud/errors.go new file mode 100644 index 00000000000..1896a6d9e02 --- /dev/null +++ b/pkg/cloudprovider/providers/gce/cloud/errors.go @@ -0,0 +1,48 @@ +/* +Copyright 2018 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package cloud + +import "fmt" + +// OperationPollingError occurs when the GCE Operation cannot be retrieved for a prolonged period. +type OperationPollingError struct { + LastPollError error +} + +// Error returns a string representation including the last poll error encountered. +func (e *OperationPollingError) Error() string { + return fmt.Sprintf("GCE operation polling error: %v", e.LastPollError) +} + +// GCEOperationError occurs when the GCE Operation finishes with an error. +type GCEOperationError struct { + // HTTPStatusCode is the HTTP status code of the final error. + // For example, a failed operation may have 400 - BadRequest. + HTTPStatusCode int + // Code is GCE's code of what went wrong. + // For example, RESOURCE_IN_USE_BY_ANOTHER_RESOURCE + Code string + // Message is a human readable message. + // For example, "The network resource 'xxx' is already being used by 'xxx'" + Message string +} + +// Error returns a string representation including the HTTP Status code, GCE's error code +// and a human readable message. +func (e *GCEOperationError) Error() string { + return fmt.Sprintf("GCE %v - %v: %v", e.HTTPStatusCode, e.Code, e.Message) +} diff --git a/pkg/cloudprovider/providers/gce/cloud/op.go b/pkg/cloudprovider/providers/gce/cloud/op.go index 1bd2e0c484b..358598776fc 100644 --- a/pkg/cloudprovider/providers/gce/cloud/op.go +++ b/pkg/cloudprovider/providers/gce/cloud/op.go @@ -29,10 +29,17 @@ import ( "k8s.io/kubernetes/pkg/cloudprovider/providers/gce/cloud/meta" ) +const ( + operationStatusDone = "DONE" +) + // operation is a GCE operation that can be watied on. type operation interface { // isDone queries GCE for the done status. This call can block. isDone(ctx context.Context) (bool, error) + // error returns the resulting error of the operation. This may be nil if the operations + // was successful. + error() error // rateLimitKey returns the rate limit key to use for the given operation. // This rate limit will govern how fast the server will be polled for // operation completion status. @@ -43,6 +50,7 @@ type gaOperation struct { s *Service projectID string key *meta.Key + err error } func (o *gaOperation) String() string { @@ -71,7 +79,15 @@ func (o *gaOperation) isDone(ctx context.Context) (bool, error) { if err != nil { return false, err } - return op != nil && op.Status == "DONE", nil + if op == nil || op.Status != operationStatusDone { + return false, nil + } + + if op.Error != nil && len(op.Error.Errors) > 0 && op.Error.Errors[0] != nil { + e := op.Error.Errors[0] + o.err = &GCEOperationError{HTTPStatusCode: op.HTTPStatusCode, Code: e.Code, Message: e.Message} + } + return true, nil } func (o *gaOperation) rateLimitKey() *RateLimitKey { @@ -83,10 +99,15 @@ func (o *gaOperation) rateLimitKey() *RateLimitKey { } } +func (o *gaOperation) error() error { + return o.err +} + type alphaOperation struct { s *Service projectID string key *meta.Key + err error } func (o *alphaOperation) String() string { @@ -115,7 +136,15 @@ func (o *alphaOperation) isDone(ctx context.Context) (bool, error) { if err != nil { return false, err } - return op != nil && op.Status == "DONE", nil + if op == nil || op.Status != operationStatusDone { + return false, nil + } + + if op.Error != nil && len(op.Error.Errors) > 0 && op.Error.Errors[0] != nil { + e := op.Error.Errors[0] + o.err = &GCEOperationError{HTTPStatusCode: op.HTTPStatusCode, Code: e.Code, Message: e.Message} + } + return true, nil } func (o *alphaOperation) rateLimitKey() *RateLimitKey { @@ -127,10 +156,15 @@ func (o *alphaOperation) rateLimitKey() *RateLimitKey { } } +func (o *alphaOperation) error() error { + return o.err +} + type betaOperation struct { s *Service projectID string key *meta.Key + err error } func (o *betaOperation) String() string { @@ -159,7 +193,15 @@ func (o *betaOperation) isDone(ctx context.Context) (bool, error) { if err != nil { return false, err } - return op != nil && op.Status == "DONE", nil + if op == nil || op.Status != operationStatusDone { + return false, nil + } + + if op.Error != nil && len(op.Error.Errors) > 0 && op.Error.Errors[0] != nil { + e := op.Error.Errors[0] + o.err = &GCEOperationError{HTTPStatusCode: op.HTTPStatusCode, Code: e.Code, Message: e.Message} + } + return true, nil } func (o *betaOperation) rateLimitKey() *RateLimitKey { @@ -170,3 +212,7 @@ func (o *betaOperation) rateLimitKey() *RateLimitKey { Version: meta.VersionBeta, } } + +func (o *betaOperation) error() error { + return o.err +} diff --git a/pkg/cloudprovider/providers/gce/cloud/ratelimit.go b/pkg/cloudprovider/providers/gce/cloud/ratelimit.go index e38b8f7de3c..ca1278a008d 100644 --- a/pkg/cloudprovider/providers/gce/cloud/ratelimit.go +++ b/pkg/cloudprovider/providers/gce/cloud/ratelimit.go @@ -47,22 +47,60 @@ type RateLimiter interface { Accept(ctx context.Context, key *RateLimitKey) error } +// acceptor is an object which blocks within Accept until a call is allowed to run. +// Accept is a behavior of the flowcontrol.RateLimiter interface. +type acceptor interface { + // Accept blocks until a call is allowed to run. + Accept() +} + +// AcceptRateLimiter wraps an Acceptor with RateLimiter parameters. +type AcceptRateLimiter struct { + // Acceptor is the underlying rate limiter. + Acceptor acceptor +} + +// Accept wraps an Acceptor and blocks on Accept or context.Done(). Key is ignored. +func (rl *AcceptRateLimiter) Accept(ctx context.Context, key *RateLimitKey) error { + ch := make(chan struct{}) + go func() { + rl.Acceptor.Accept() + close(ch) + }() + select { + case <-ch: + break + case <-ctx.Done(): + return ctx.Err() + } + return nil +} + // NopRateLimiter is a rate limiter that performs no rate limiting. type NopRateLimiter struct { } -// Accept the operation to be rate limited. +// Accept everything immediately. func (*NopRateLimiter) Accept(ctx context.Context, key *RateLimitKey) error { - // Rate limit polling of the Operation status to avoid hammering GCE - // for the status of an operation. - const pollTime = time.Duration(1) * time.Second - if key.Operation == "Get" && key.Service == "Operations" { - select { - case <-time.NewTimer(pollTime).C: - break - case <-ctx.Done(): - return ctx.Err() - } - } return nil } + +// MinimumRateLimiter wraps a RateLimiter and will only call its Accept until the minimum +// duration has been met or the context is cancelled. +type MinimumRateLimiter struct { + // RateLimiter is the underlying ratelimiter which is called after the mininum time is reacehd. + RateLimiter RateLimiter + // Minimum is the minimum wait time before the underlying ratelimiter is called. + Minimum time.Duration +} + +// Accept blocks on the minimum duration and context. Once the minimum duration is met, +// the func is blocked on the underlying ratelimiter. +func (m *MinimumRateLimiter) Accept(ctx context.Context, key *RateLimitKey) error { + select { + case <-time.After(m.Minimum): + return m.RateLimiter.Accept(ctx, key) + case <-ctx.Done(): + return ctx.Err() + } +} diff --git a/pkg/cloudprovider/providers/gce/cloud/ratelimit_test.go b/pkg/cloudprovider/providers/gce/cloud/ratelimit_test.go new file mode 100644 index 00000000000..4bb4512e133 --- /dev/null +++ b/pkg/cloudprovider/providers/gce/cloud/ratelimit_test.go @@ -0,0 +1,80 @@ +/* +Copyright 2018 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package cloud + +import ( + "context" + "testing" + "time" +) + +type FakeAcceptor struct{ accept func() } + +func (f *FakeAcceptor) Accept() { + f.accept() +} + +func TestAcceptRateLimiter(t *testing.T) { + fa := &FakeAcceptor{accept: func() {}} + arl := &AcceptRateLimiter{fa} + err := arl.Accept(context.Background(), nil) + if err != nil { + t.Errorf("AcceptRateLimiter.Accept() = %v, want nil", err) + } + + // Use context that has been cancelled and expect a context error returned. + ctxCancelled, cancelled := context.WithCancel(context.Background()) + cancelled() + // Verify context is cancelled by now. + <-ctxCancelled.Done() + + fa.accept = func() { time.Sleep(1 * time.Second) } + err = arl.Accept(ctxCancelled, nil) + if err != ctxCancelled.Err() { + t.Errorf("AcceptRateLimiter.Accept() = %v, want %v", err, ctxCancelled.Err()) + } +} + +func TestMinimumRateLimiter(t *testing.T) { + fa := &FakeAcceptor{accept: func() {}} + arl := &AcceptRateLimiter{fa} + var called bool + fa.accept = func() { called = true } + m := &MinimumRateLimiter{RateLimiter: arl, Minimum: 10 * time.Millisecond} + + err := m.Accept(context.Background(), nil) + if err != nil { + t.Errorf("MinimumRateLimiter.Accept = %v, want nil", err) + } + if !called { + t.Errorf("`called` = false, want true") + } + + // Use context that has been cancelled and expect a context error returned. + ctxCancelled, cancelled := context.WithCancel(context.Background()) + cancelled() + // Verify context is cancelled by now. + <-ctxCancelled.Done() + called = false + err = m.Accept(ctxCancelled, nil) + if err != ctxCancelled.Err() { + t.Errorf("AcceptRateLimiter.Accept() = %v, want %v", err, ctxCancelled.Err()) + } + if called { + t.Errorf("`called` = true, want false") + } +} diff --git a/pkg/cloudprovider/providers/gce/cloud/service.go b/pkg/cloudprovider/providers/gce/cloud/service.go index 99ed7d226b7..2f332dfff85 100644 --- a/pkg/cloudprovider/providers/gce/cloud/service.go +++ b/pkg/cloudprovider/providers/gce/cloud/service.go @@ -45,19 +45,19 @@ func (s *Service) wrapOperation(anyOp interface{}) (operation, error) { if err != nil { return nil, err } - return &gaOperation{s, r.ProjectID, r.Key}, nil + return &gaOperation{s: s, projectID: r.ProjectID, key: r.Key}, nil case *alpha.Operation: r, err := ParseResourceURL(o.SelfLink) if err != nil { return nil, err } - return &alphaOperation{s, r.ProjectID, r.Key}, nil + return &alphaOperation{s: s, projectID: r.ProjectID, key: r.Key}, nil case *beta.Operation: r, err := ParseResourceURL(o.SelfLink) if err != nil { return nil, err } - return &betaOperation{s, r.ProjectID, r.Key}, nil + return &betaOperation{s: s, projectID: r.ProjectID, key: r.Key}, nil default: return nil, fmt.Errorf("invalid type %T", anyOp) } @@ -72,14 +72,39 @@ func (s *Service) WaitForCompletion(ctx context.Context, genericOp interface{}) glog.Errorf("wrapOperation(%+v) error: %v", genericOp, err) return err } - for done, err := op.isDone(ctx); !done; done, err = op.isDone(ctx) { - if err != nil { - glog.V(4).Infof("op.isDone(%v) error; op = %v, err = %v", ctx, op, err) - return err - } - glog.V(5).Infof("op.isDone(%v) waiting; op = %v", ctx, op) - s.RateLimiter.Accept(ctx, op.rateLimitKey()) - } - glog.V(5).Infof("op.isDone(%v) complete; op = %v", ctx, op) - return nil + + return s.pollOperation(ctx, op) +} + +// pollOperation calls operations.isDone until the function comes back true or context is Done. +// If an error occurs retrieving the operation, the loop will continue until the context is done. +// This is to prevent a transient error from bubbling up to controller-level logic. +func (s *Service) pollOperation(ctx context.Context, op operation) error { + var pollCount int + for { + // Check if context has been cancelled. Note that ctx.Done() must be checked before + // returning ctx.Err(). + select { + case <-ctx.Done(): + glog.V(5).Infof("op.pollOperation(%v, %v) not completed, poll count = %d, ctx.Err = %v", ctx, op, pollCount, ctx.Err()) + return ctx.Err() + default: + // ctx is not canceled, continue immediately + } + + pollCount++ + glog.V(5).Infof("op.isDone(%v) waiting; op = %v, poll count = %d", ctx, op, pollCount) + s.RateLimiter.Accept(ctx, op.rateLimitKey()) + done, err := op.isDone(ctx) + if err != nil { + glog.V(5).Infof("op.isDone(%v) error; op = %v, poll count = %d, err = %v, retrying", ctx, op, pollCount, err) + } + + if done { + break + } + } + + glog.V(5).Infof("op.isDone(%v) complete; op = %v, poll count = %d, op.err = %v", ctx, op, pollCount, op.error()) + return op.error() } diff --git a/pkg/cloudprovider/providers/gce/cloud/service_test.go b/pkg/cloudprovider/providers/gce/cloud/service_test.go new file mode 100644 index 00000000000..fc6fbb51d89 --- /dev/null +++ b/pkg/cloudprovider/providers/gce/cloud/service_test.go @@ -0,0 +1,84 @@ +/* +Copyright 2018 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package cloud + +import ( + "context" + "fmt" + "testing" +) + +func TestPollOperation(t *testing.T) { + const totalAttempts = 10 + var attempts int + fo := &fakeOperation{isDoneFunc: func(ctx context.Context) (bool, error) { + attempts++ + if attempts < totalAttempts { + return false, nil + } + return true, nil + }} + s := Service{RateLimiter: &NopRateLimiter{}} + // Check that pollOperation will retry the operation multiple times. + err := s.pollOperation(context.Background(), fo) + if err != nil { + t.Errorf("pollOperation() = %v, want nil", err) + } + if attempts != totalAttempts { + t.Errorf("`attempts` = %d, want %d", attempts, totalAttempts) + } + + // Check that the operation's error is returned. + fo.err = fmt.Errorf("test operation failed") + err = s.pollOperation(context.Background(), fo) + if err != fo.err { + t.Errorf("pollOperation() = %v, want %v", err, fo.err) + } + fo.err = nil + + fo.isDoneFunc = func(ctx context.Context) (bool, error) { + return false, nil + } + // Use context that has been cancelled and expect a context error returned. + ctxCancelled, cancelled := context.WithCancel(context.Background()) + cancelled() + // Verify context is cancelled by now. + <-ctxCancelled.Done() + // Check that pollOperation returns because the context is cancelled. + err = s.pollOperation(ctxCancelled, fo) + if err == nil { + t.Errorf("pollOperation() = nil, want: %v", ctxCancelled.Err()) + } +} + +type fakeOperation struct { + isDoneFunc func(ctx context.Context) (bool, error) + err error + rateKey *RateLimitKey +} + +func (f *fakeOperation) isDone(ctx context.Context) (bool, error) { + return f.isDoneFunc(ctx) +} + +func (f *fakeOperation) error() error { + return f.err +} + +func (f *fakeOperation) rateLimitKey() *RateLimitKey { + return f.rateKey +} diff --git a/pkg/cloudprovider/providers/gce/gce.go b/pkg/cloudprovider/providers/gce/gce.go index 383744f23d4..c2defec3896 100644 --- a/pkg/cloudprovider/providers/gce/gce.go +++ b/pkg/cloudprovider/providers/gce/gce.go @@ -68,7 +68,7 @@ const ( // AffinityTypeClientIPProto - affinity based on Client IP and port. gceAffinityTypeClientIPProto = "CLIENT_IP_PROTO" - operationPollInterval = 3 * time.Second + operationPollInterval = time.Second // Creating Route in very large clusters, may take more than half an hour. operationPollTimeoutDuration = time.Hour @@ -484,7 +484,7 @@ func CreateGCECloud(config *CloudConfig) (*GCECloud, error) { glog.Infof("managing multiple zones: %v", config.ManagedZones) } - operationPollRateLimiter := flowcontrol.NewTokenBucketRateLimiter(10, 100) // 10 qps, 100 bucket size. + operationPollRateLimiter := flowcontrol.NewTokenBucketRateLimiter(5, 5) // 5 qps, 5 burst. gce := &GCECloud{ service: service, diff --git a/pkg/cloudprovider/providers/gce/support.go b/pkg/cloudprovider/providers/gce/support.go index 37dc75b0b0a..7861e08acbc 100644 --- a/pkg/cloudprovider/providers/gce/support.go +++ b/pkg/cloudprovider/providers/gce/support.go @@ -50,17 +50,15 @@ type gceRateLimiter struct { // operations. func (l *gceRateLimiter) Accept(ctx context.Context, key *cloud.RateLimitKey) error { if key.Operation == "Get" && key.Service == "Operations" { - ch := make(chan struct{}) - go func() { - l.gce.operationPollRateLimiter.Accept() - close(ch) - }() - select { - case <-ch: - break - case <-ctx.Done(): - return ctx.Err() + // Wait a minimum amount of time regardless of rate limiter. + rl := &cloud.MinimumRateLimiter{ + // Convert flowcontrol.RateLimiter into cloud.RateLimiter + RateLimiter: &cloud.AcceptRateLimiter{ + Acceptor: l.gce.operationPollRateLimiter, + }, + Minimum: operationPollInterval, } + return rl.Accept(ctx, key) } return nil }