diff --git a/pkg/cloudprovider/providers/aws/aws.go b/pkg/cloudprovider/providers/aws/aws.go index 811c2e107e8..683eedf84ad 100644 --- a/pkg/cloudprovider/providers/aws/aws.go +++ b/pkg/cloudprovider/providers/aws/aws.go @@ -228,13 +228,57 @@ type awsSdkEC2 struct { type awsSDKProvider struct { creds *credentials.Credentials + + mutex sync.Mutex + regionDelayers map[string]*CrossRequestRetryDelay } -func addHandlers(h *request.Handlers) { +func newAWSSDKProvider(creds *credentials.Credentials) *awsSDKProvider { + return &awsSDKProvider{ + creds: creds, + regionDelayers: make(map[string]*CrossRequestRetryDelay), + } +} + +func (p *awsSDKProvider) addHandlers(regionName string, h *request.Handlers) { h.Sign.PushFrontNamed(request.NamedHandler{ Name: "k8s/logger", Fn: awsHandlerLogger, }) + + delayer := p.getCrossRequestRetryDelay(regionName) + if delayer != nil { + h.Sign.PushFrontNamed(request.NamedHandler{ + Name: "k8s/delay-presign", + Fn: delayer.BeforeSign, + }) + + h.AfterRetry.PushFrontNamed(request.NamedHandler{ + Name: "k8s/delay-afterretry", + Fn: delayer.AfterRetry, + }) + } +} + +// Get a CrossRequestRetryDelay, scoped to the region, not to the request. +// This means that when we hit a limit on a call, we will delay _all_ calls to the API. +// We do this to protect the AWS account from becoming overloaded and effectively locked. +// We also log when we hit request limits. +// Note that this delays the current goroutine; this is bad behaviour and will +// likely cause k8s to become slow or unresponsive for cloud operations. +// However, this throttle is intended only as a last resort. When we observe +// this throttling, we need to address the root cause (e.g. add a delay to a +// controller retry loop) +func (p *awsSDKProvider) getCrossRequestRetryDelay(regionName string) *CrossRequestRetryDelay { + p.mutex.Lock() + defer p.mutex.Unlock() + + delayer, found := p.regionDelayers[regionName] + if !found { + delayer = NewCrossRequestRetryDelay() + p.regionDelayers[regionName] = delayer + } + return delayer } func (p *awsSDKProvider) Compute(regionName string) (EC2, error) { @@ -243,7 +287,7 @@ func (p *awsSDKProvider) Compute(regionName string) (EC2, error) { Credentials: p.creds, })) - addHandlers(&service.Handlers) + p.addHandlers(regionName, &service.Handlers) ec2 := &awsSdkEC2{ ec2: service, @@ -257,7 +301,7 @@ func (p *awsSDKProvider) LoadBalancing(regionName string) (ELB, error) { Credentials: p.creds, })) - addHandlers(&elbClient.Handlers) + p.addHandlers(regionName, &elbClient.Handlers) return elbClient, nil } @@ -268,7 +312,7 @@ func (p *awsSDKProvider) Autoscaling(regionName string) (ASG, error) { Credentials: p.creds, })) - addHandlers(&client.Handlers) + p.addHandlers(regionName, &client.Handlers) return client, nil } @@ -458,7 +502,7 @@ func init() { }, &credentials.SharedCredentialsProvider{}, }) - aws := &awsSDKProvider{creds: creds} + aws := newAWSSDKProvider(creds) return newAWSCloud(config, aws) }) } diff --git a/pkg/cloudprovider/providers/aws/retry_handler.go b/pkg/cloudprovider/providers/aws/retry_handler.go new file mode 100644 index 00000000000..d6d68812899 --- /dev/null +++ b/pkg/cloudprovider/providers/aws/retry_handler.go @@ -0,0 +1,154 @@ +/* +Copyright 2015 The Kubernetes Authors All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. 
+You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package aws + +import ( + "math" + "sync" + "time" + + "github.com/aws/aws-sdk-go/aws/awserr" + "github.com/aws/aws-sdk-go/aws/request" + "github.com/golang/glog" +) + +const ( + decayIntervalSeconds = 20 + decayFraction = 0.8 + maxDelay = 60 * time.Second +) + +// CrossRequestRetryDelay inserts delays before AWS calls, when we are observing RequestLimitExceeded errors +// Note that we share a CrossRequestRetryDelay across multiple AWS requests; this is a process-wide back-off, +// whereas the aws-sdk-go implements a per-request exponential backoff/retry +type CrossRequestRetryDelay struct { + backoff Backoff +} + +// Create a new CrossRequestRetryDelay +func NewCrossRequestRetryDelay() *CrossRequestRetryDelay { + c := &CrossRequestRetryDelay{} + c.backoff.init() + return c +} + +// Added to the Sign chain; called before each request +func (c *CrossRequestRetryDelay) BeforeSign(r *request.Request) { + now := time.Now() + delay := c.backoff.ComputeDelayForRequest(now) + if delay > 0 { + glog.Warningf("Inserting delay before AWS request (%s) to avoid RequestLimitExceeded: %s", + describeRequest(r), delay.String()) + r.Config.SleepDelay(delay) + + // Avoid clock skew problems + r.Time = now + } +} + +// Return a user-friendly string describing the request, for use in log messages +func describeRequest(r *request.Request) string { + service := r.ClientInfo.ServiceName + + name := "?" + if r.Operation != nil { + name = r.Operation.Name + } + + return service + "::" + name +} + +// Added to the AfterRetry chain; called after any error +func (c *CrossRequestRetryDelay) AfterRetry(r *request.Request) { + if r.Error == nil { + return + } + awsError, ok := r.Error.(awserr.Error) + if !ok { + return + } + if awsError.Code() == "RequestLimitExceeded" { + c.backoff.ReportError() + glog.Warningf("Got RequestLimitExceeded error on AWS request (%s)", + describeRequest(r)) + } +} + +// Backoff manages a backoff that varies based on the recently observed failures +type Backoff struct { + mutex sync.Mutex + + // We count all requests & the number of requests which hit a + // RequestLimit. We only really care about 'recent' requests, so we + // decay the counts exponentially to bias towards recent values. 
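+	// With decayFraction=0.8 and decayIntervalSeconds=20, a count recorded
+	// a minute ago carries a weight of 0.8^3 ~= 0.51, so in practice the
+	// counters reflect roughly the last minute or two of traffic.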
+ countErrorsRequestLimit float32 + countRequests float32 + lastDecay int64 +} + +func (b *Backoff) init() { + b.lastDecay = time.Now().Unix() + // Bias so that if the first request hits the limit we don't immediately apply the full delay + b.countRequests = 4 +} + +// Computes the delay required for a request, also updating internal state to count this request +func (b *Backoff) ComputeDelayForRequest(now time.Time) time.Duration { + b.mutex.Lock() + defer b.mutex.Unlock() + + // Apply exponential decay to the counters + timeDeltaSeconds := now.Unix() - b.lastDecay + if timeDeltaSeconds > decayIntervalSeconds { + intervals := float64(timeDeltaSeconds) / float64(decayIntervalSeconds) + decay := float32(math.Pow(decayFraction, intervals)) + b.countErrorsRequestLimit *= decay + b.countRequests *= decay + b.lastDecay = now.Unix() + } + + // Count this request + b.countRequests += 1.0 + + // Compute the failure rate + errorFraction := float32(0.0) + if b.countRequests > 0.5 { + // Avoid tiny residuals & rounding errors + errorFraction = b.countErrorsRequestLimit / b.countRequests + } + + // Ignore a low fraction of errors + // This also allows them to time-out + if errorFraction < 0.1 { + return time.Duration(0) + } + + // Delay by the max delay multiplied by the recent error rate + // (i.e. we apply a linear delay function) + // TODO: This is pretty arbitrary + delay := time.Nanosecond * time.Duration(float32(maxDelay.Nanoseconds())*errorFraction) + // Round down to the nearest second for sanity + return time.Second * time.Duration(int(delay.Seconds())) +} + +// Called when we observe a throttling error +func (b *Backoff) ReportError() { + b.mutex.Lock() + defer b.mutex.Unlock() + + b.countErrorsRequestLimit += 1.0 +} diff --git a/pkg/cloudprovider/providers/aws/retry_handler_test.go b/pkg/cloudprovider/providers/aws/retry_handler_test.go new file mode 100644 index 00000000000..5a615e2637d --- /dev/null +++ b/pkg/cloudprovider/providers/aws/retry_handler_test.go @@ -0,0 +1,135 @@ +/* +Copyright 2016 The Kubernetes Authors All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package aws + +import ( + "testing" + "time" +) + +// There follows a group of tests for the backoff logic. There's nothing +// particularly special about the values chosen: if we tweak the values in the +// backoff logic then we might well have to update the tests. However the key +// behavioural elements should remain (e.g. no errors => no backoff), and these +// are each tested by one of the tests below. + +// Test that we don't apply any delays when there are no errors +func TestBackoffNoErrors(t *testing.T) { + b := &Backoff{} + b.init() + + now := time.Now() + for i := 0; i < 100; i++ { + d := b.ComputeDelayForRequest(now) + if d.Nanoseconds() != 0 { + t.Fatalf("unexpected delay during no-error case") + } + now = now.Add(time.Second) + } +} + +// Test that we always apply a delay when there are errors, and also that we +// don't "flap" - that our own delay doesn't cause us to oscillate between +// delay and no-delay. 
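+// (Each iteration below advances the simulated clock by the delay we were
+// given, so if the backoff "flapped" - i.e. returned zero once its own delay
+// had elapsed - the 5-second assertion would catch it.)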
+func TestBackoffAllErrors(t *testing.T) { + b := &Backoff{} + b.init() + + now := time.Now() + // Warm up + for i := 0; i < 10; i++ { + _ = b.ComputeDelayForRequest(now) + b.ReportError() + now = now.Add(time.Second) + } + + for i := 0; i < 100; i++ { + d := b.ComputeDelayForRequest(now) + b.ReportError() + if d.Seconds() < 5 { + t.Fatalf("unexpected short-delay during all-error case: %v", d) + } + t.Logf("delay @%d %v", i, d) + now = now.Add(d) + } +} + +// Test that we do come close to our max delay, when we see all errors at 1 +// second intervals (this simulates multiple concurrent requests, because we +// don't wait for delay in between requests) +func TestBackoffHitsMax(t *testing.T) { + b := &Backoff{} + b.init() + + now := time.Now() + for i := 0; i < 100; i++ { + _ = b.ComputeDelayForRequest(now) + b.ReportError() + now = now.Add(time.Second) + } + + for i := 0; i < 10; i++ { + d := b.ComputeDelayForRequest(now) + b.ReportError() + if float32(d.Nanoseconds()) < (float32(maxDelay.Nanoseconds()) * 0.95) { + t.Fatalf("expected delay to be >= 95 percent of max delay, was %v", d) + } + t.Logf("delay @%d %v", i, d) + now = now.Add(time.Second) + } +} + +// Test that after a phase of errors, we eventually stop applying a delay once there are +// no more errors. +func TestBackoffRecovers(t *testing.T) { + b := &Backoff{} + b.init() + + now := time.Now() + + // Phase of all-errors + for i := 0; i < 100; i++ { + _ = b.ComputeDelayForRequest(now) + b.ReportError() + now = now.Add(time.Second) + } + + for i := 0; i < 10; i++ { + d := b.ComputeDelayForRequest(now) + b.ReportError() + if d.Seconds() < 5 { + t.Fatalf("unexpected short-delay during all-error phase: %v", d) + } + t.Logf("error phase delay @%d %v", i, d) + now = now.Add(time.Second) + } + + // Phase of no errors + for i := 0; i < 100; i++ { + _ = b.ComputeDelayForRequest(now) + now = now.Add(3 * time.Second) + } + + for i := 0; i < 10; i++ { + d := b.ComputeDelayForRequest(now) + if d.Seconds() != 0 { + t.Fatalf("unexpected delay during error recovery phase: %v", d) + } + t.Logf("no-error phase delay @%d %v", i, d) + now = now.Add(time.Second) + } +}
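
Reviewer note: a small, hypothetical sketch (not part of this diff) of why the delayer is scoped per region rather than per client. Every client built for a region shares one CrossRequestRetryDelay, so throttling errors observed by one caller delay all callers in that region. The function name sketchSharedDelay and the region string are illustrative only; the snippet lives in package aws because it touches the unexported backoff field.

package aws

import (
	"fmt"
	"time"
)

// sketchSharedDelay is illustrative only: it drives the Backoff directly,
// the way the k8s/delay-presign and k8s/delay-afterretry handlers would.
func sketchSharedDelay() {
	p := newAWSSDKProvider(nil) // nil creds: no real AWS calls are made here

	// Both lookups return the same delayer, exactly as the EC2, ELB and ASG
	// clients for one region would.
	d1 := p.getCrossRequestRetryDelay("us-east-1")
	d2 := p.getCrossRequestRetryDelay("us-east-1")
	fmt.Println("shared delayer:", d1 == d2) // true

	// Caller A is throttled on every request for ~20 seconds...
	now := time.Now()
	for i := 0; i < 20; i++ {
		d1.backoff.ComputeDelayForRequest(now) // BeforeSign would sleep this long
		d1.backoff.ReportError()               // AfterRetry saw RequestLimitExceeded
		now = now.Add(time.Second)
	}

	// ...so caller B's next request in the same region is also delayed.
	fmt.Println("caller B delay:", d2.backoff.ComputeDelayForRequest(now))
}

With the constants in retry_handler.go, caller B's printed delay comes out around 45-50 seconds: the error fraction is roughly 20 errors over ~25 counted requests (the init bias starts countRequests at 4), and the delay is maxDelay scaled by that fraction.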