Avoid using tag filters for EC2 API where possible

For very large clusters, tag filters are not efficient within the EC2 API
and will result in rate limiting. Most of these queries already have
filters that are targeted narrowly enough that removing the tag filters
will not return significantly more data, while the queries themselves
will be executed more efficiently by the EC2 API.

Additionally, some API wrappers did not support pagination despite the
underlying API calls being paginated. This change adds pagination to
prevent truncating the returned results.
This commit is contained in:
Mike Crute 2019-04-17 21:45:32 -07:00
parent 11611ee3e9
commit c8edfa2417
4 changed files with 101 additions and 36 deletions

View File

@ -904,12 +904,28 @@ func (s *awsSdkEC2) DescribeInstances(request *ec2.DescribeInstancesInput) ([]*e
// Implements EC2.DescribeSecurityGroups // Implements EC2.DescribeSecurityGroups
func (s *awsSdkEC2) DescribeSecurityGroups(request *ec2.DescribeSecurityGroupsInput) ([]*ec2.SecurityGroup, error) { func (s *awsSdkEC2) DescribeSecurityGroups(request *ec2.DescribeSecurityGroupsInput) ([]*ec2.SecurityGroup, error) {
// Security groups are not paged // Security groups are paged
results := []*ec2.SecurityGroup{}
var nextToken *string
requestTime := time.Now()
for {
response, err := s.ec2.DescribeSecurityGroups(request) response, err := s.ec2.DescribeSecurityGroups(request)
if err != nil { if err != nil {
recordAWSMetric("describe_security_groups", 0, err)
return nil, fmt.Errorf("error listing AWS security groups: %q", err) return nil, fmt.Errorf("error listing AWS security groups: %q", err)
} }
return response.SecurityGroups, nil
results = append(results, response.SecurityGroups...)
nextToken = response.NextToken
if aws.StringValue(nextToken) == "" {
break
}
request.NextToken = nextToken
}
timeTaken := time.Since(requestTime).Seconds()
recordAWSMetric("describe_security_groups", timeTaken, nil)
return results, nil
} }
func (s *awsSdkEC2) AttachVolume(request *ec2.AttachVolumeInput) (*ec2.VolumeAttachment, error) { func (s *awsSdkEC2) AttachVolume(request *ec2.AttachVolumeInput) (*ec2.VolumeAttachment, error) {
@ -1034,12 +1050,27 @@ func (s *awsSdkEC2) CreateTags(request *ec2.CreateTagsInput) (*ec2.CreateTagsOut
} }
func (s *awsSdkEC2) DescribeRouteTables(request *ec2.DescribeRouteTablesInput) ([]*ec2.RouteTable, error) { func (s *awsSdkEC2) DescribeRouteTables(request *ec2.DescribeRouteTablesInput) ([]*ec2.RouteTable, error) {
// Not paged results := []*ec2.RouteTable{}
var nextToken *string
requestTime := time.Now()
for {
response, err := s.ec2.DescribeRouteTables(request) response, err := s.ec2.DescribeRouteTables(request)
if err != nil { if err != nil {
recordAWSMetric("describe_route_tables", 0, err)
return nil, fmt.Errorf("error listing AWS route tables: %q", err) return nil, fmt.Errorf("error listing AWS route tables: %q", err)
} }
return response.RouteTables, nil
results = append(results, response.RouteTables...)
nextToken = response.NextToken
if aws.StringValue(nextToken) == "" {
break
}
request.NextToken = nextToken
}
timeTaken := time.Since(requestTime).Seconds()
recordAWSMetric("describe_route_tables", timeTaken, nil)
return results, nil
} }
func (s *awsSdkEC2) CreateRoute(request *ec2.CreateRouteInput) (*ec2.CreateRouteOutput, error) { func (s *awsSdkEC2) CreateRoute(request *ec2.CreateRouteInput) (*ec2.CreateRouteOutput, error) {
@ -1573,13 +1604,32 @@ func (c *Cloud) GetCandidateZonesForDynamicVolume() (sets.String, error) {
// TODO: Caching / expose v1.Nodes to the cloud provider? // TODO: Caching / expose v1.Nodes to the cloud provider?
// TODO: We could also query for subnets, I think // TODO: We could also query for subnets, I think
filters := []*ec2.Filter{newEc2Filter("instance-state-name", "running")} // Note: It is more efficient to call the EC2 API twice with different tag
// filters than to call it once with a tag filter that results in a logical
// OR. For really large clusters the logical OR will result in EC2 API rate
// limiting.
instances := []*ec2.Instance{}
instances, err := c.describeInstances(filters) baseFilters := []*ec2.Filter{newEc2Filter("instance-state-name", "running")}
filters := c.tagging.addFilters(baseFilters)
di, err := c.describeInstances(filters)
if err != nil { if err != nil {
return nil, err return nil, err
} }
instances = append(instances, di...)
if c.tagging.usesLegacyTags {
filters = c.tagging.addLegacyFilters(baseFilters)
di, err = c.describeInstances(filters)
if err != nil {
return nil, err
}
instances = append(instances, di...)
}
if len(instances) == 0 { if len(instances) == 0 {
return nil, fmt.Errorf("no instances returned") return nil, fmt.Errorf("no instances returned")
} }
@ -3022,17 +3072,16 @@ func (c *Cloud) ensureSecurityGroup(name string, description string, additionalT
for { for {
attempt++ attempt++
request := &ec2.DescribeSecurityGroupsInput{}
filters := []*ec2.Filter{
newEc2Filter("group-name", name),
newEc2Filter("vpc-id", c.vpcID),
}
// Note that we do _not_ add our tag filters; group-name + vpc-id is the EC2 primary key. // Note that we do _not_ add our tag filters; group-name + vpc-id is the EC2 primary key.
// However, we do check that it matches our tags. // However, we do check that it matches our tags.
// If it doesn't have any tags, we tag it; this is how we recover if we failed to tag before. // If it doesn't have any tags, we tag it; this is how we recover if we failed to tag before.
// If it has a different cluster's tags, that is an error. // If it has a different cluster's tags, that is an error.
// This shouldn't happen because name is expected to be globally unique (UUID derived) // This shouldn't happen because name is expected to be globally unique (UUID derived)
request.Filters = filters request := &ec2.DescribeSecurityGroupsInput{}
request.Filters = []*ec2.Filter{
newEc2Filter("group-name", name),
newEc2Filter("vpc-id", c.vpcID),
}
securityGroups, err := c.ec2.DescribeSecurityGroups(request) securityGroups, err := c.ec2.DescribeSecurityGroups(request)
if err != nil { if err != nil {
@ -3108,8 +3157,7 @@ func findTag(tags []*ec2.Tag, key string) (string, bool) {
// However, in future this will likely be treated as an error. // However, in future this will likely be treated as an error.
func (c *Cloud) findSubnets() ([]*ec2.Subnet, error) { func (c *Cloud) findSubnets() ([]*ec2.Subnet, error) {
request := &ec2.DescribeSubnetsInput{} request := &ec2.DescribeSubnetsInput{}
filters := []*ec2.Filter{newEc2Filter("vpc-id", c.vpcID)} request.Filters = []*ec2.Filter{newEc2Filter("vpc-id", c.vpcID)}
request.Filters = c.tagging.addFilters(filters)
subnets, err := c.ec2.DescribeSubnets(request) subnets, err := c.ec2.DescribeSubnets(request)
if err != nil { if err != nil {
@ -3131,8 +3179,7 @@ func (c *Cloud) findSubnets() ([]*ec2.Subnet, error) {
klog.Warningf("No tagged subnets found; will fall-back to the current subnet only. This is likely to be an error in a future version of k8s.") klog.Warningf("No tagged subnets found; will fall-back to the current subnet only. This is likely to be an error in a future version of k8s.")
request = &ec2.DescribeSubnetsInput{} request = &ec2.DescribeSubnetsInput{}
filters = []*ec2.Filter{newEc2Filter("subnet-id", c.selfAWSInstance.subnetID)} request.Filters = []*ec2.Filter{newEc2Filter("subnet-id", c.selfAWSInstance.subnetID)}
request.Filters = filters
subnets, err = c.ec2.DescribeSubnets(request) subnets, err = c.ec2.DescribeSubnets(request)
if err != nil { if err != nil {
@ -3888,7 +3935,6 @@ func findSecurityGroupForInstance(instance *ec2.Instance, taggedSecurityGroups m
// Return all the security groups that are tagged as being part of our cluster // Return all the security groups that are tagged as being part of our cluster
func (c *Cloud) getTaggedSecurityGroups() (map[string]*ec2.SecurityGroup, error) { func (c *Cloud) getTaggedSecurityGroups() (map[string]*ec2.SecurityGroup, error) {
request := &ec2.DescribeSecurityGroupsInput{} request := &ec2.DescribeSecurityGroupsInput{}
request.Filters = c.tagging.addFilters(nil)
groups, err := c.ec2.DescribeSecurityGroups(request) groups, err := c.ec2.DescribeSecurityGroups(request)
if err != nil { if err != nil {
return nil, fmt.Errorf("error querying security groups: %q", err) return nil, fmt.Errorf("error querying security groups: %q", err)
@ -3937,10 +3983,9 @@ func (c *Cloud) updateInstanceSecurityGroupsForLoadBalancer(lb *elb.LoadBalancer
var actualGroups []*ec2.SecurityGroup var actualGroups []*ec2.SecurityGroup
{ {
describeRequest := &ec2.DescribeSecurityGroupsInput{} describeRequest := &ec2.DescribeSecurityGroupsInput{}
filters := []*ec2.Filter{ describeRequest.Filters = []*ec2.Filter{
newEc2Filter("ip-permission.group-id", loadBalancerSecurityGroupID), newEc2Filter("ip-permission.group-id", loadBalancerSecurityGroupID),
} }
describeRequest.Filters = c.tagging.addFilters(filters)
response, err := c.ec2.DescribeSecurityGroups(describeRequest) response, err := c.ec2.DescribeSecurityGroups(describeRequest)
if err != nil { if err != nil {
return fmt.Errorf("error querying security groups for ELB: %q", err) return fmt.Errorf("error querying security groups for ELB: %q", err)
@ -4098,10 +4143,9 @@ func (c *Cloud) EnsureLoadBalancerDeleted(ctx context.Context, clusterName strin
{ {
// Server side filter // Server side filter
describeRequest := &ec2.DescribeSecurityGroupsInput{} describeRequest := &ec2.DescribeSecurityGroupsInput{}
filters := []*ec2.Filter{ describeRequest.Filters = []*ec2.Filter{
newEc2Filter("ip-permission.protocol", "tcp"), newEc2Filter("ip-permission.protocol", "tcp"),
} }
describeRequest.Filters = c.tagging.addFilters(filters)
response, err := c.ec2.DescribeSecurityGroups(describeRequest) response, err := c.ec2.DescribeSecurityGroups(describeRequest)
if err != nil { if err != nil {
return fmt.Errorf("Error querying security groups for NLB: %q", err) return fmt.Errorf("Error querying security groups for NLB: %q", err)
@ -4229,10 +4273,9 @@ func (c *Cloud) EnsureLoadBalancerDeleted(ctx context.Context, clusterName strin
var loadBalancerSGs = aws.StringValueSlice(lb.SecurityGroups) var loadBalancerSGs = aws.StringValueSlice(lb.SecurityGroups)
describeRequest := &ec2.DescribeSecurityGroupsInput{} describeRequest := &ec2.DescribeSecurityGroupsInput{}
filters := []*ec2.Filter{ describeRequest.Filters = []*ec2.Filter{
newEc2Filter("group-id", loadBalancerSGs...), newEc2Filter("group-id", loadBalancerSGs...),
} }
describeRequest.Filters = c.tagging.addFilters(filters)
response, err := c.ec2.DescribeSecurityGroups(describeRequest) response, err := c.ec2.DescribeSecurityGroups(describeRequest)
if err != nil { if err != nil {
return fmt.Errorf("error querying security groups for ELB: %q", err) return fmt.Errorf("error querying security groups for ELB: %q", err)
@ -4444,7 +4487,6 @@ func (c *Cloud) getInstancesByNodeNames(nodeNames []string, states ...string) ([
// TODO: Move to instanceCache // TODO: Move to instanceCache
func (c *Cloud) describeInstances(filters []*ec2.Filter) ([]*ec2.Instance, error) { func (c *Cloud) describeInstances(filters []*ec2.Filter) ([]*ec2.Instance, error) {
filters = c.tagging.addFilters(filters)
request := &ec2.DescribeInstancesInput{ request := &ec2.DescribeInstancesInput{
Filters: filters, Filters: filters,
} }

View File

@ -935,10 +935,10 @@ func (c *Cloud) updateInstanceSecurityGroupsForNLB(mappings []nlbPortMapping, in
{ {
// Server side filter // Server side filter
describeRequest := &ec2.DescribeSecurityGroupsInput{} describeRequest := &ec2.DescribeSecurityGroupsInput{}
filters := []*ec2.Filter{ describeRequest.Filters = []*ec2.Filter{
newEc2Filter("ip-permission.protocol", "tcp"), newEc2Filter("ip-permission.protocol", "tcp"),
newEc2Filter("vpc-id", c.vpcID),
} }
describeRequest.Filters = c.tagging.addFilters(filters)
response, err := c.ec2.DescribeSecurityGroups(describeRequest) response, err := c.ec2.DescribeSecurityGroups(describeRequest)
if err != nil { if err != nil {
return fmt.Errorf("Error querying security groups for NLB: %q", err) return fmt.Errorf("Error querying security groups for NLB: %q", err)

View File

@ -42,7 +42,7 @@ func (c *Cloud) findRouteTable(clusterName string) (*ec2.RouteTable, error) {
tables = response tables = response
} else { } else {
request := &ec2.DescribeRouteTablesInput{Filters: c.tagging.addFilters(nil)} request := &ec2.DescribeRouteTablesInput{}
response, err := c.ec2.DescribeRouteTables(request) response, err := c.ec2.DescribeRouteTables(request)
if err != nil { if err != nil {
return nil, err return nil, err

View File

@ -247,9 +247,32 @@ func (t *awsTagging) addFilters(filters []*ec2.Filter) []*ec2.Filter {
} }
return filters return filters
} }
// For 1.6, we always recognize the legacy tag, for the 1.5 -> 1.6 upgrade
// There are no "or" filters by key, so we look for both the legacy and new key, and then we have to post-filter f := newEc2Filter("tag-key", t.clusterTagKey())
f := newEc2Filter("tag-key", TagNameKubernetesClusterLegacy, t.clusterTagKey())
// We can't pass a zero-length Filters to AWS (it's an error)
// So if we end up with no filters; we need to return nil
filters = append(filters, f)
return filters
}
// Add additional filters, to match on our tags. This uses the tag for legacy
// 1.5 -> 1.6 clusters and exists for backwards compatibility
//
// This lets us run multiple k8s clusters in a single EC2 AZ
func (t *awsTagging) addLegacyFilters(filters []*ec2.Filter) []*ec2.Filter {
// if there are no clusterID configured - no filtering by special tag names
// should be applied to revert to legacy behaviour.
if len(t.ClusterID) == 0 {
if len(filters) == 0 {
// We can't pass a zero-length Filters to AWS (it's an error)
// So if we end up with no filters; just return nil
return nil
}
return filters
}
f := newEc2Filter(fmt.Sprintf("tag:%s", TagNameKubernetesClusterLegacy), t.ClusterID)
// We can't pass a zero-length Filters to AWS (it's an error) // We can't pass a zero-length Filters to AWS (it's an error)
// So if we end up with no filters; we need to return nil // So if we end up with no filters; we need to return nil