mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-08-03 01:06:27 +00:00
Merge pull request #76749 from mcrute/ec2-rate-limit-fix
Avoid using tag filters for EC2 API where possible
This commit is contained in:
commit
f5d958af4b
@ -904,12 +904,28 @@ func (s *awsSdkEC2) DescribeInstances(request *ec2.DescribeInstancesInput) ([]*e
|
|||||||
|
|
||||||
// Implements EC2.DescribeSecurityGroups
|
// Implements EC2.DescribeSecurityGroups
|
||||||
func (s *awsSdkEC2) DescribeSecurityGroups(request *ec2.DescribeSecurityGroupsInput) ([]*ec2.SecurityGroup, error) {
|
func (s *awsSdkEC2) DescribeSecurityGroups(request *ec2.DescribeSecurityGroupsInput) ([]*ec2.SecurityGroup, error) {
|
||||||
// Security groups are not paged
|
// Security groups are paged
|
||||||
|
results := []*ec2.SecurityGroup{}
|
||||||
|
var nextToken *string
|
||||||
|
requestTime := time.Now()
|
||||||
|
for {
|
||||||
response, err := s.ec2.DescribeSecurityGroups(request)
|
response, err := s.ec2.DescribeSecurityGroups(request)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
recordAWSMetric("describe_security_groups", 0, err)
|
||||||
return nil, fmt.Errorf("error listing AWS security groups: %q", err)
|
return nil, fmt.Errorf("error listing AWS security groups: %q", err)
|
||||||
}
|
}
|
||||||
return response.SecurityGroups, nil
|
|
||||||
|
results = append(results, response.SecurityGroups...)
|
||||||
|
|
||||||
|
nextToken = response.NextToken
|
||||||
|
if aws.StringValue(nextToken) == "" {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
request.NextToken = nextToken
|
||||||
|
}
|
||||||
|
timeTaken := time.Since(requestTime).Seconds()
|
||||||
|
recordAWSMetric("describe_security_groups", timeTaken, nil)
|
||||||
|
return results, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *awsSdkEC2) AttachVolume(request *ec2.AttachVolumeInput) (*ec2.VolumeAttachment, error) {
|
func (s *awsSdkEC2) AttachVolume(request *ec2.AttachVolumeInput) (*ec2.VolumeAttachment, error) {
|
||||||
@ -1034,12 +1050,27 @@ func (s *awsSdkEC2) CreateTags(request *ec2.CreateTagsInput) (*ec2.CreateTagsOut
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (s *awsSdkEC2) DescribeRouteTables(request *ec2.DescribeRouteTablesInput) ([]*ec2.RouteTable, error) {
|
func (s *awsSdkEC2) DescribeRouteTables(request *ec2.DescribeRouteTablesInput) ([]*ec2.RouteTable, error) {
|
||||||
// Not paged
|
results := []*ec2.RouteTable{}
|
||||||
|
var nextToken *string
|
||||||
|
requestTime := time.Now()
|
||||||
|
for {
|
||||||
response, err := s.ec2.DescribeRouteTables(request)
|
response, err := s.ec2.DescribeRouteTables(request)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
recordAWSMetric("describe_route_tables", 0, err)
|
||||||
return nil, fmt.Errorf("error listing AWS route tables: %q", err)
|
return nil, fmt.Errorf("error listing AWS route tables: %q", err)
|
||||||
}
|
}
|
||||||
return response.RouteTables, nil
|
|
||||||
|
results = append(results, response.RouteTables...)
|
||||||
|
|
||||||
|
nextToken = response.NextToken
|
||||||
|
if aws.StringValue(nextToken) == "" {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
request.NextToken = nextToken
|
||||||
|
}
|
||||||
|
timeTaken := time.Since(requestTime).Seconds()
|
||||||
|
recordAWSMetric("describe_route_tables", timeTaken, nil)
|
||||||
|
return results, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *awsSdkEC2) CreateRoute(request *ec2.CreateRouteInput) (*ec2.CreateRouteOutput, error) {
|
func (s *awsSdkEC2) CreateRoute(request *ec2.CreateRouteInput) (*ec2.CreateRouteOutput, error) {
|
||||||
@ -1573,13 +1604,32 @@ func (c *Cloud) GetCandidateZonesForDynamicVolume() (sets.String, error) {
|
|||||||
// TODO: Caching / expose v1.Nodes to the cloud provider?
|
// TODO: Caching / expose v1.Nodes to the cloud provider?
|
||||||
// TODO: We could also query for subnets, I think
|
// TODO: We could also query for subnets, I think
|
||||||
|
|
||||||
filters := []*ec2.Filter{newEc2Filter("instance-state-name", "running")}
|
// Note: It is more efficient to call the EC2 API twice with different tag
|
||||||
|
// filters than to call it once with a tag filter that results in a logical
|
||||||
|
// OR. For really large clusters the logical OR will result in EC2 API rate
|
||||||
|
// limiting.
|
||||||
|
instances := []*ec2.Instance{}
|
||||||
|
|
||||||
instances, err := c.describeInstances(filters)
|
baseFilters := []*ec2.Filter{newEc2Filter("instance-state-name", "running")}
|
||||||
|
|
||||||
|
filters := c.tagging.addFilters(baseFilters)
|
||||||
|
di, err := c.describeInstances(filters)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
instances = append(instances, di...)
|
||||||
|
|
||||||
|
if c.tagging.usesLegacyTags {
|
||||||
|
filters = c.tagging.addLegacyFilters(baseFilters)
|
||||||
|
di, err = c.describeInstances(filters)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
instances = append(instances, di...)
|
||||||
|
}
|
||||||
|
|
||||||
if len(instances) == 0 {
|
if len(instances) == 0 {
|
||||||
return nil, fmt.Errorf("no instances returned")
|
return nil, fmt.Errorf("no instances returned")
|
||||||
}
|
}
|
||||||
@ -3022,17 +3072,16 @@ func (c *Cloud) ensureSecurityGroup(name string, description string, additionalT
|
|||||||
for {
|
for {
|
||||||
attempt++
|
attempt++
|
||||||
|
|
||||||
request := &ec2.DescribeSecurityGroupsInput{}
|
|
||||||
filters := []*ec2.Filter{
|
|
||||||
newEc2Filter("group-name", name),
|
|
||||||
newEc2Filter("vpc-id", c.vpcID),
|
|
||||||
}
|
|
||||||
// Note that we do _not_ add our tag filters; group-name + vpc-id is the EC2 primary key.
|
// Note that we do _not_ add our tag filters; group-name + vpc-id is the EC2 primary key.
|
||||||
// However, we do check that it matches our tags.
|
// However, we do check that it matches our tags.
|
||||||
// If it doesn't have any tags, we tag it; this is how we recover if we failed to tag before.
|
// If it doesn't have any tags, we tag it; this is how we recover if we failed to tag before.
|
||||||
// If it has a different cluster's tags, that is an error.
|
// If it has a different cluster's tags, that is an error.
|
||||||
// This shouldn't happen because name is expected to be globally unique (UUID derived)
|
// This shouldn't happen because name is expected to be globally unique (UUID derived)
|
||||||
request.Filters = filters
|
request := &ec2.DescribeSecurityGroupsInput{}
|
||||||
|
request.Filters = []*ec2.Filter{
|
||||||
|
newEc2Filter("group-name", name),
|
||||||
|
newEc2Filter("vpc-id", c.vpcID),
|
||||||
|
}
|
||||||
|
|
||||||
securityGroups, err := c.ec2.DescribeSecurityGroups(request)
|
securityGroups, err := c.ec2.DescribeSecurityGroups(request)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -3108,8 +3157,7 @@ func findTag(tags []*ec2.Tag, key string) (string, bool) {
|
|||||||
// However, in future this will likely be treated as an error.
|
// However, in future this will likely be treated as an error.
|
||||||
func (c *Cloud) findSubnets() ([]*ec2.Subnet, error) {
|
func (c *Cloud) findSubnets() ([]*ec2.Subnet, error) {
|
||||||
request := &ec2.DescribeSubnetsInput{}
|
request := &ec2.DescribeSubnetsInput{}
|
||||||
filters := []*ec2.Filter{newEc2Filter("vpc-id", c.vpcID)}
|
request.Filters = []*ec2.Filter{newEc2Filter("vpc-id", c.vpcID)}
|
||||||
request.Filters = c.tagging.addFilters(filters)
|
|
||||||
|
|
||||||
subnets, err := c.ec2.DescribeSubnets(request)
|
subnets, err := c.ec2.DescribeSubnets(request)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -3131,8 +3179,7 @@ func (c *Cloud) findSubnets() ([]*ec2.Subnet, error) {
|
|||||||
klog.Warningf("No tagged subnets found; will fall-back to the current subnet only. This is likely to be an error in a future version of k8s.")
|
klog.Warningf("No tagged subnets found; will fall-back to the current subnet only. This is likely to be an error in a future version of k8s.")
|
||||||
|
|
||||||
request = &ec2.DescribeSubnetsInput{}
|
request = &ec2.DescribeSubnetsInput{}
|
||||||
filters = []*ec2.Filter{newEc2Filter("subnet-id", c.selfAWSInstance.subnetID)}
|
request.Filters = []*ec2.Filter{newEc2Filter("subnet-id", c.selfAWSInstance.subnetID)}
|
||||||
request.Filters = filters
|
|
||||||
|
|
||||||
subnets, err = c.ec2.DescribeSubnets(request)
|
subnets, err = c.ec2.DescribeSubnets(request)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -3888,7 +3935,6 @@ func findSecurityGroupForInstance(instance *ec2.Instance, taggedSecurityGroups m
|
|||||||
// Return all the security groups that are tagged as being part of our cluster
|
// Return all the security groups that are tagged as being part of our cluster
|
||||||
func (c *Cloud) getTaggedSecurityGroups() (map[string]*ec2.SecurityGroup, error) {
|
func (c *Cloud) getTaggedSecurityGroups() (map[string]*ec2.SecurityGroup, error) {
|
||||||
request := &ec2.DescribeSecurityGroupsInput{}
|
request := &ec2.DescribeSecurityGroupsInput{}
|
||||||
request.Filters = c.tagging.addFilters(nil)
|
|
||||||
groups, err := c.ec2.DescribeSecurityGroups(request)
|
groups, err := c.ec2.DescribeSecurityGroups(request)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, fmt.Errorf("error querying security groups: %q", err)
|
return nil, fmt.Errorf("error querying security groups: %q", err)
|
||||||
@ -3937,10 +3983,9 @@ func (c *Cloud) updateInstanceSecurityGroupsForLoadBalancer(lb *elb.LoadBalancer
|
|||||||
var actualGroups []*ec2.SecurityGroup
|
var actualGroups []*ec2.SecurityGroup
|
||||||
{
|
{
|
||||||
describeRequest := &ec2.DescribeSecurityGroupsInput{}
|
describeRequest := &ec2.DescribeSecurityGroupsInput{}
|
||||||
filters := []*ec2.Filter{
|
describeRequest.Filters = []*ec2.Filter{
|
||||||
newEc2Filter("ip-permission.group-id", loadBalancerSecurityGroupID),
|
newEc2Filter("ip-permission.group-id", loadBalancerSecurityGroupID),
|
||||||
}
|
}
|
||||||
describeRequest.Filters = c.tagging.addFilters(filters)
|
|
||||||
response, err := c.ec2.DescribeSecurityGroups(describeRequest)
|
response, err := c.ec2.DescribeSecurityGroups(describeRequest)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("error querying security groups for ELB: %q", err)
|
return fmt.Errorf("error querying security groups for ELB: %q", err)
|
||||||
@ -4098,10 +4143,9 @@ func (c *Cloud) EnsureLoadBalancerDeleted(ctx context.Context, clusterName strin
|
|||||||
{
|
{
|
||||||
// Server side filter
|
// Server side filter
|
||||||
describeRequest := &ec2.DescribeSecurityGroupsInput{}
|
describeRequest := &ec2.DescribeSecurityGroupsInput{}
|
||||||
filters := []*ec2.Filter{
|
describeRequest.Filters = []*ec2.Filter{
|
||||||
newEc2Filter("ip-permission.protocol", "tcp"),
|
newEc2Filter("ip-permission.protocol", "tcp"),
|
||||||
}
|
}
|
||||||
describeRequest.Filters = c.tagging.addFilters(filters)
|
|
||||||
response, err := c.ec2.DescribeSecurityGroups(describeRequest)
|
response, err := c.ec2.DescribeSecurityGroups(describeRequest)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("Error querying security groups for NLB: %q", err)
|
return fmt.Errorf("Error querying security groups for NLB: %q", err)
|
||||||
@ -4229,10 +4273,9 @@ func (c *Cloud) EnsureLoadBalancerDeleted(ctx context.Context, clusterName strin
|
|||||||
var loadBalancerSGs = aws.StringValueSlice(lb.SecurityGroups)
|
var loadBalancerSGs = aws.StringValueSlice(lb.SecurityGroups)
|
||||||
|
|
||||||
describeRequest := &ec2.DescribeSecurityGroupsInput{}
|
describeRequest := &ec2.DescribeSecurityGroupsInput{}
|
||||||
filters := []*ec2.Filter{
|
describeRequest.Filters = []*ec2.Filter{
|
||||||
newEc2Filter("group-id", loadBalancerSGs...),
|
newEc2Filter("group-id", loadBalancerSGs...),
|
||||||
}
|
}
|
||||||
describeRequest.Filters = c.tagging.addFilters(filters)
|
|
||||||
response, err := c.ec2.DescribeSecurityGroups(describeRequest)
|
response, err := c.ec2.DescribeSecurityGroups(describeRequest)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("error querying security groups for ELB: %q", err)
|
return fmt.Errorf("error querying security groups for ELB: %q", err)
|
||||||
@ -4444,7 +4487,6 @@ func (c *Cloud) getInstancesByNodeNames(nodeNames []string, states ...string) ([
|
|||||||
|
|
||||||
// TODO: Move to instanceCache
|
// TODO: Move to instanceCache
|
||||||
func (c *Cloud) describeInstances(filters []*ec2.Filter) ([]*ec2.Instance, error) {
|
func (c *Cloud) describeInstances(filters []*ec2.Filter) ([]*ec2.Instance, error) {
|
||||||
filters = c.tagging.addFilters(filters)
|
|
||||||
request := &ec2.DescribeInstancesInput{
|
request := &ec2.DescribeInstancesInput{
|
||||||
Filters: filters,
|
Filters: filters,
|
||||||
}
|
}
|
||||||
|
@ -935,10 +935,10 @@ func (c *Cloud) updateInstanceSecurityGroupsForNLB(mappings []nlbPortMapping, in
|
|||||||
{
|
{
|
||||||
// Server side filter
|
// Server side filter
|
||||||
describeRequest := &ec2.DescribeSecurityGroupsInput{}
|
describeRequest := &ec2.DescribeSecurityGroupsInput{}
|
||||||
filters := []*ec2.Filter{
|
describeRequest.Filters = []*ec2.Filter{
|
||||||
newEc2Filter("ip-permission.protocol", "tcp"),
|
newEc2Filter("ip-permission.protocol", "tcp"),
|
||||||
|
newEc2Filter("vpc-id", c.vpcID),
|
||||||
}
|
}
|
||||||
describeRequest.Filters = c.tagging.addFilters(filters)
|
|
||||||
response, err := c.ec2.DescribeSecurityGroups(describeRequest)
|
response, err := c.ec2.DescribeSecurityGroups(describeRequest)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("Error querying security groups for NLB: %q", err)
|
return fmt.Errorf("Error querying security groups for NLB: %q", err)
|
||||||
|
@ -42,7 +42,7 @@ func (c *Cloud) findRouteTable(clusterName string) (*ec2.RouteTable, error) {
|
|||||||
|
|
||||||
tables = response
|
tables = response
|
||||||
} else {
|
} else {
|
||||||
request := &ec2.DescribeRouteTablesInput{Filters: c.tagging.addFilters(nil)}
|
request := &ec2.DescribeRouteTablesInput{}
|
||||||
response, err := c.ec2.DescribeRouteTables(request)
|
response, err := c.ec2.DescribeRouteTables(request)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
|
@ -247,9 +247,32 @@ func (t *awsTagging) addFilters(filters []*ec2.Filter) []*ec2.Filter {
|
|||||||
}
|
}
|
||||||
return filters
|
return filters
|
||||||
}
|
}
|
||||||
// For 1.6, we always recognize the legacy tag, for the 1.5 -> 1.6 upgrade
|
|
||||||
// There are no "or" filters by key, so we look for both the legacy and new key, and then we have to post-filter
|
f := newEc2Filter("tag-key", t.clusterTagKey())
|
||||||
f := newEc2Filter("tag-key", TagNameKubernetesClusterLegacy, t.clusterTagKey())
|
|
||||||
|
// We can't pass a zero-length Filters to AWS (it's an error)
|
||||||
|
// So if we end up with no filters; we need to return nil
|
||||||
|
filters = append(filters, f)
|
||||||
|
return filters
|
||||||
|
}
|
||||||
|
|
||||||
|
// Add additional filters, to match on our tags. This uses the tag for legacy
|
||||||
|
// 1.5 -> 1.6 clusters and exists for backwards compatibility
|
||||||
|
//
|
||||||
|
// This lets us run multiple k8s clusters in a single EC2 AZ
|
||||||
|
func (t *awsTagging) addLegacyFilters(filters []*ec2.Filter) []*ec2.Filter {
|
||||||
|
// if there are no clusterID configured - no filtering by special tag names
|
||||||
|
// should be applied to revert to legacy behaviour.
|
||||||
|
if len(t.ClusterID) == 0 {
|
||||||
|
if len(filters) == 0 {
|
||||||
|
// We can't pass a zero-length Filters to AWS (it's an error)
|
||||||
|
// So if we end up with no filters; just return nil
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
return filters
|
||||||
|
}
|
||||||
|
|
||||||
|
f := newEc2Filter(fmt.Sprintf("tag:%s", TagNameKubernetesClusterLegacy), t.ClusterID)
|
||||||
|
|
||||||
// We can't pass a zero-length Filters to AWS (it's an error)
|
// We can't pass a zero-length Filters to AWS (it's an error)
|
||||||
// So if we end up with no filters; we need to return nil
|
// So if we end up with no filters; we need to return nil
|
||||||
|
Loading…
Reference in New Issue
Block a user