Merge pull request #47516 from gnufied/fix-filter-limit-aws

Automatic merge from submit-queue (batch tested with PRs 47510, 47516, 47482, 47521, 47537)

Batch AWS getInstancesByNodeNames calls with FilterNodeLimit

We are going to limit the getInstancesByNodeNames call with a batch
size of 150.

Fixes - #47271

```release-note
AWS: Batch DescribeInstance calls with nodeNames to 150 limit, to stay within AWS filter limits.
```
This commit is contained in:
Kubernetes Submit Queue 2017-06-14 20:32:45 -07:00 committed by GitHub
commit 8e4ec18adf
2 changed files with 59 additions and 16 deletions

View File

@ -156,6 +156,10 @@ const (
createTagInitialDelay = 1 * time.Second
createTagFactor = 2.0
createTagSteps = 9
// Number of node names that can be added to a filter. The AWS limit is 200
// but we are using a lower limit on purpose
filterNodeLimit = 150
)
// awsTagNameMasterRoles is a set of well-known AWS tag names that indicate the instance is a master
@ -3289,28 +3293,39 @@ func (c *Cloud) getInstancesByNodeNamesCached(nodeNames sets.String, states ...s
func (c *Cloud) getInstancesByNodeNames(nodeNames []string, states ...string) ([]*ec2.Instance, error) {
names := aws.StringSlice(nodeNames)
ec2Instances := []*ec2.Instance{}
nodeNameFilter := &ec2.Filter{
Name: aws.String("private-dns-name"),
Values: names,
for i := 0; i < len(names); i += filterNodeLimit {
end := i + filterNodeLimit
if end > len(names) {
end = len(names)
}
nameSlice := names[i:end]
nodeNameFilter := &ec2.Filter{
Name: aws.String("private-dns-name"),
Values: nameSlice,
}
filters := []*ec2.Filter{nodeNameFilter}
if len(states) > 0 {
filters = append(filters, newEc2Filter("instance-state-name", states...))
}
instances, err := c.describeInstances(filters)
if err != nil {
glog.V(2).Infof("Failed to describe instances %v", nodeNames)
return nil, err
}
ec2Instances = append(ec2Instances, instances...)
}
filters := []*ec2.Filter{nodeNameFilter}
if len(states) > 0 {
filters = append(filters, newEc2Filter("instance-state-name", states...))
}
instances, err := c.describeInstances(filters)
if err != nil {
glog.V(2).Infof("Failed to describe instances %v", nodeNames)
return nil, err
}
if len(instances) == 0 {
if len(ec2Instances) == 0 {
glog.V(3).Infof("Failed to find any instances %v", nodeNames)
return nil, nil
}
return instances, nil
return ec2Instances, nil
}
func (c *Cloud) describeInstances(filters []*ec2.Filter) ([]*ec2.Instance, error) {

View File

@ -17,6 +17,7 @@ limitations under the License.
package aws
import (
"fmt"
"io"
"reflect"
"strings"
@ -1067,6 +1068,33 @@ func TestFindInstancesByNodeNameCached(t *testing.T) {
}
}
func TestGetInstanceByNodeNameBatching(t *testing.T) {
awsServices := NewFakeAWSServices()
c, err := newAWSCloud(strings.NewReader("[global]"), awsServices)
assert.Nil(t, err, "Error building aws cloud: %v", err)
var tag ec2.Tag
tag.Key = aws.String(TagNameKubernetesClusterPrefix + TestClusterId)
tag.Value = aws.String("")
tags := []*ec2.Tag{&tag}
nodeNames := []string{}
for i := 0; i < 200; i++ {
nodeName := fmt.Sprintf("ip-171-20-42-%d.ec2.internal", i)
nodeNames = append(nodeNames, nodeName)
ec2Instance := &ec2.Instance{}
instanceId := fmt.Sprintf("i-abcedf%d", i)
ec2Instance.InstanceId = aws.String(instanceId)
ec2Instance.PrivateDnsName = aws.String(nodeName)
ec2Instance.State = &ec2.InstanceState{Code: aws.Int64(48), Name: aws.String("running")}
ec2Instance.Tags = tags
awsServices.instances = append(awsServices.instances, ec2Instance)
}
instances, err := c.getInstancesByNodeNames(nodeNames)
assert.NotEmpty(t, instances)
assert.Equal(t, 200, len(instances), "Expected 200 but got less")
}
func TestGetVolumeLabels(t *testing.T) {
awsServices := NewFakeAWSServices()
c, err := newAWSCloud(strings.NewReader("[global]"), awsServices)