Merge pull request #63294 from bertinatto/throttle_aws

Automatic merge from submit-queue (batch tested with PRs 63349, 63294). If you want to cherry-pick this change to another branch, please follow the instructions <a href="https://github.com/kubernetes/community/blob/master/contributors/devel/cherry-picks.md">here</a>.

Add metric for throttled requests in AWS

**What this PR does / why we need it**:

This PR adds a metric for request throttling in AWS.

**Special notes for your reviewer**:

* Added metric.
* Moved metrics-related code to `aws_metrics.go`.
* Capitalized acronyms, e.g., `recordAwsMetric` to `recordAWSMetric`.

**Release note**:

```release-note
NONE
```
This commit is contained in:
Kubernetes Submit Queue 2018-05-02 11:43:10 -07:00 committed by GitHub
commit 0d43bdec2b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 56 additions and 42 deletions

View File

@ -44,7 +44,6 @@ import (
"github.com/aws/aws-sdk-go/service/kms"
"github.com/aws/aws-sdk-go/service/sts"
"github.com/golang/glog"
"github.com/prometheus/client_golang/prometheus"
clientset "k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/record"
@ -754,7 +753,7 @@ func (s *awsSdkEC2) DescribeInstances(request *ec2.DescribeInstancesInput) ([]*e
for {
response, err := s.ec2.DescribeInstances(request)
if err != nil {
recordAwsMetric("describe_instance", 0, err)
recordAWSMetric("describe_instance", 0, err)
return nil, fmt.Errorf("error listing AWS instances: %q", err)
}
@ -769,7 +768,7 @@ func (s *awsSdkEC2) DescribeInstances(request *ec2.DescribeInstancesInput) ([]*e
request.NextToken = nextToken
}
timeTaken := time.Since(requestTime).Seconds()
recordAwsMetric("describe_instance", timeTaken, nil)
recordAWSMetric("describe_instance", timeTaken, nil)
return results, nil
}
@ -787,7 +786,7 @@ func (s *awsSdkEC2) AttachVolume(request *ec2.AttachVolumeInput) (*ec2.VolumeAtt
requestTime := time.Now()
resp, err := s.ec2.AttachVolume(request)
timeTaken := time.Since(requestTime).Seconds()
recordAwsMetric("attach_volume", timeTaken, err)
recordAWSMetric("attach_volume", timeTaken, err)
return resp, err
}
@ -795,7 +794,7 @@ func (s *awsSdkEC2) DetachVolume(request *ec2.DetachVolumeInput) (*ec2.VolumeAtt
requestTime := time.Now()
resp, err := s.ec2.DetachVolume(request)
timeTaken := time.Since(requestTime).Seconds()
recordAwsMetric("detach_volume", timeTaken, err)
recordAWSMetric("detach_volume", timeTaken, err)
return resp, err
}
@ -808,7 +807,7 @@ func (s *awsSdkEC2) DescribeVolumes(request *ec2.DescribeVolumesInput) ([]*ec2.V
response, err := s.ec2.DescribeVolumes(request)
if err != nil {
recordAwsMetric("describe_volume", 0, err)
recordAWSMetric("describe_volume", 0, err)
return nil, err
}
@ -821,7 +820,7 @@ func (s *awsSdkEC2) DescribeVolumes(request *ec2.DescribeVolumesInput) ([]*ec2.V
request.NextToken = nextToken
}
timeTaken := time.Since(requestTime).Seconds()
recordAwsMetric("describe_volume", timeTaken, nil)
recordAWSMetric("describe_volume", timeTaken, nil)
return results, nil
}
@ -829,7 +828,7 @@ func (s *awsSdkEC2) CreateVolume(request *ec2.CreateVolumeInput) (*ec2.Volume, e
requestTime := time.Now()
resp, err := s.ec2.CreateVolume(request)
timeTaken := time.Since(requestTime).Seconds()
recordAwsMetric("create_volume", timeTaken, err)
recordAWSMetric("create_volume", timeTaken, err)
return resp, err
}
@ -837,7 +836,7 @@ func (s *awsSdkEC2) DeleteVolume(request *ec2.DeleteVolumeInput) (*ec2.DeleteVol
requestTime := time.Now()
resp, err := s.ec2.DeleteVolume(request)
timeTaken := time.Since(requestTime).Seconds()
recordAwsMetric("delete_volume", timeTaken, err)
recordAWSMetric("delete_volume", timeTaken, err)
return resp, err
}
@ -845,7 +844,7 @@ func (s *awsSdkEC2) ModifyVolume(request *ec2.ModifyVolumeInput) (*ec2.ModifyVol
requestTime := time.Now()
resp, err := s.ec2.ModifyVolume(request)
timeTaken := time.Since(requestTime).Seconds()
recordAwsMetric("modify_volume", timeTaken, err)
recordAWSMetric("modify_volume", timeTaken, err)
return resp, err
}
@ -856,7 +855,7 @@ func (s *awsSdkEC2) DescribeVolumeModifications(request *ec2.DescribeVolumesModi
for {
resp, err := s.ec2.DescribeVolumesModifications(request)
if err != nil {
recordAwsMetric("describe_volume_modification", 0, err)
recordAWSMetric("describe_volume_modification", 0, err)
return nil, fmt.Errorf("error listing volume modifictions : %v", err)
}
results = append(results, resp.VolumesModifications...)
@ -867,7 +866,7 @@ func (s *awsSdkEC2) DescribeVolumeModifications(request *ec2.DescribeVolumesModi
request.NextToken = nextToken
}
timeTaken := time.Since(requestTime).Seconds()
recordAwsMetric("describe_volume_modification", timeTaken, nil)
recordAWSMetric("describe_volume_modification", timeTaken, nil)
return results, nil
}
@ -900,7 +899,7 @@ func (s *awsSdkEC2) CreateTags(request *ec2.CreateTagsInput) (*ec2.CreateTagsOut
requestTime := time.Now()
resp, err := s.ec2.CreateTags(request)
timeTaken := time.Since(requestTime).Seconds()
recordAwsMetric("create_tags", timeTaken, err)
recordAWSMetric("create_tags", timeTaken, err)
return resp, err
}
@ -4329,12 +4328,3 @@ func setNodeDisk(
}
volumeMap[volumeID] = check
}
func recordAwsMetric(actionName string, timeTaken float64, err error) {
if err != nil {
awsApiErrorMetric.With(prometheus.Labels{"request": actionName}).Inc()
} else {
awsApiMetric.With(prometheus.Labels{"request": actionName}).Observe(timeTaken)
}
}

View File

@ -18,23 +18,43 @@ package aws
import "github.com/prometheus/client_golang/prometheus"
var awsApiMetric = prometheus.NewHistogramVec(
prometheus.HistogramOpts{
Name: "cloudprovider_aws_api_request_duration_seconds",
Help: "Latency of aws api call",
},
[]string{"request"},
// Prometheus collectors describing calls made to the AWS API.
var (
// awsAPIMetric is a histogram of AWS API call latency, partitioned by the
// "request" label.
awsAPIMetric = prometheus.NewHistogramVec(
prometheus.HistogramOpts{
Name: "cloudprovider_aws_api_request_duration_seconds",
Help: "Latency of AWS API calls",
},
[]string{"request"})
// awsAPIErrorMetric counts AWS API errors, partitioned by the "request"
// label.
awsAPIErrorMetric = prometheus.NewCounterVec(
prometheus.CounterOpts{
Name: "cloudprovider_aws_api_request_errors",
Help: "AWS API errors",
},
[]string{"request"})
// awsAPIThrottlesMetric counts throttled AWS API requests. Unlike the two
// collectors above, it is partitioned by an "operation_name" label.
awsAPIThrottlesMetric = prometheus.NewCounterVec(
prometheus.CounterOpts{
Name: "cloudprovider_aws_api_throttled_requests_total",
Help: "AWS API throttled requests",
},
[]string{"operation_name"})
)
// awsApiErrorMetric counts AWS API errors, partitioned by the "request"
// label.
var awsApiErrorMetric = prometheus.NewCounterVec(
prometheus.CounterOpts{
Name: "cloudprovider_aws_api_request_errors",
Help: "AWS Api errors",
},
[]string{"request"},
)
func recordAWSMetric(actionName string, timeTaken float64, err error) {
if err != nil {
awsAPIErrorMetric.With(prometheus.Labels{"request": actionName}).Inc()
} else {
awsAPIMetric.With(prometheus.Labels{"request": actionName}).Observe(timeTaken)
}
}
// recordAWSThrottlesMetric adds one to the throttled-requests counter for
// the given operation, which is used as the "operation_name" label value.
func recordAWSThrottlesMetric(operation string) {
	throttled := awsAPIThrottlesMetric.With(prometheus.Labels{"operation_name": operation})
	throttled.Inc()
}
// registerMetrics registers the AWS API metric collectors with Prometheus
// via prometheus.MustRegister.
// NOTE(review): both the pre-rename (awsApiMetric, awsApiErrorMetric) and the
// post-rename (awsAPIMetric, awsAPIErrorMetric) identifiers are listed here —
// presumably an artifact of the diff listing; confirm only one set remains.
func registerMetrics() {
prometheus.MustRegister(awsApiMetric)
prometheus.MustRegister(awsApiErrorMetric)
prometheus.MustRegister(awsAPIMetric)
prometheus.MustRegister(awsAPIErrorMetric)
prometheus.MustRegister(awsAPIThrottlesMetric)
}

View File

@ -69,16 +69,19 @@ func (c *CrossRequestRetryDelay) BeforeSign(r *request.Request) {
}
}
// Return a user-friendly string describing the request, for use in log messages
func describeRequest(r *request.Request) string {
service := r.ClientInfo.ServiceName
// operationName returns the name of the request's operation, for use in log
// messages and metrics. It returns "?" when the request carries no operation.
func operationName(r *request.Request) string {
	if op := r.Operation; op != nil {
		return op.Name
	}
	return "?"
}
return service + "::" + name
// describeRequest returns a user-friendly "<service>::<operation>" string
// describing the request, for use in log messages.
func describeRequest(r *request.Request) string {
	return r.ClientInfo.ServiceName + "::" + operationName(r)
}
// Added to the AfterRetry chain; called after any error
@ -92,6 +95,7 @@ func (c *CrossRequestRetryDelay) AfterRetry(r *request.Request) {
}
if awsError.Code() == "RequestLimitExceeded" {
c.backoff.ReportError()
recordAWSThrottlesMetric(operationName(r))
glog.Warningf("Got RequestLimitExceeded error on AWS request (%s)",
describeRequest(r))
}