Merge pull request #47410 from justinsb/fix_45050

Automatic merge from submit-queue (batch tested with PRs 47451, 47410, 47598, 47616, 47473)

AWS: Cache instances for ELB to avoid #45050

We maintain a cache of all instances, and we invalidate the cache
whenever we see a new instance.  For ELBs that should be sufficient,
because our usage is limited to instance ids and security groups, which
should not change.

Fix #45050

```release-note
AWS: Maintain a cache of all instances, to fix problem with > 200 nodes with ELBs
```
This commit is contained in:
Kubernetes Submit Queue 2017-06-15 19:52:07 -07:00 committed by GitHub
commit 06e8e0c877
5 changed files with 313 additions and 106 deletions

View File

@ -377,9 +377,7 @@ type Cloud struct {
// Note that we cache some state in awsInstance (mountpoints), so we must preserve the instance
selfAWSInstance *awsInstance
mutex sync.Mutex
lastNodeNames sets.String
lastInstancesByNodeNames []*ec2.Instance
instanceCache instanceCache
// We keep an active list of devices we have assigned but not yet
// attached, to avoid a race condition where we assign a device mapping
@ -862,6 +860,7 @@ func newAWSCloud(config io.Reader, awsServices Services) (*Cloud, error) {
attaching: make(map[types.NodeName]map[mountDevice]awsVolumeID),
deviceAllocators: make(map[types.NodeName]DeviceAllocator),
}
awsCloud.instanceCache.cloud = awsCloud
if cfg.Global.VPC != "" && cfg.Global.SubnetID != "" && (cfg.Global.KubernetesClusterTag != "" || cfg.Global.KubernetesClusterID != "") {
// When the master is running on a different AWS account, cloud provider or on-premise
@ -2556,14 +2555,6 @@ func buildListener(port v1.ServicePort, annotations map[string]string, sslPorts
return listener, nil
}
func nodeNames(nodes []*v1.Node) sets.String {
ret := sets.String{}
for _, node := range nodes {
ret.Insert(node.Name)
}
return ret
}
// EnsureLoadBalancer implements LoadBalancer.EnsureLoadBalancer
func (c *Cloud) EnsureLoadBalancer(clusterName string, apiService *v1.Service, nodes []*v1.Node) (*v1.LoadBalancerStatus, error) {
annotations := apiService.Annotations
@ -2601,7 +2592,7 @@ func (c *Cloud) EnsureLoadBalancer(clusterName string, apiService *v1.Service, n
return nil, fmt.Errorf("LoadBalancerIP cannot be specified for AWS ELB")
}
instances, err := c.getInstancesByNodeNamesCached(nodeNames(nodes), "running")
instances, err := c.findInstancesForELB(nodes)
if err != nil {
return nil, err
}
@ -2955,7 +2946,7 @@ func (c *Cloud) getTaggedSecurityGroups() (map[string]*ec2.SecurityGroup, error)
// Open security group ingress rules on the instances so that the load balancer can talk to them
// Will also remove any security groups ingress rules for the load balancer that are _not_ needed for allInstances
func (c *Cloud) updateInstanceSecurityGroupsForLoadBalancer(lb *elb.LoadBalancerDescription, allInstances []*ec2.Instance) error {
func (c *Cloud) updateInstanceSecurityGroupsForLoadBalancer(lb *elb.LoadBalancerDescription, instances map[awsInstanceID]*ec2.Instance) error {
if c.cfg.Global.DisableSecurityGroupIngress {
return nil
}
@ -3010,7 +3001,7 @@ func (c *Cloud) updateInstanceSecurityGroupsForLoadBalancer(lb *elb.LoadBalancer
instanceSecurityGroupIds := map[string]bool{}
// Scan instances for groups we want open
for _, instance := range allInstances {
for _, instance := range instances {
securityGroup, err := findSecurityGroupForInstance(instance, taggedSecurityGroups)
if err != nil {
return err
@ -3188,7 +3179,7 @@ func (c *Cloud) EnsureLoadBalancerDeleted(clusterName string, service *v1.Servic
// UpdateLoadBalancer implements LoadBalancer.UpdateLoadBalancer
func (c *Cloud) UpdateLoadBalancer(clusterName string, service *v1.Service, nodes []*v1.Node) error {
instances, err := c.getInstancesByNodeNamesCached(nodeNames(nodes), "running")
instances, err := c.findInstancesForELB(nodes)
if err != nil {
return err
}
@ -3203,7 +3194,7 @@ func (c *Cloud) UpdateLoadBalancer(clusterName string, service *v1.Service, node
return fmt.Errorf("Load balancer not found")
}
err = c.ensureLoadBalancerInstances(orEmpty(lb.LoadBalancerName), lb.Instances, instances)
err = c.ensureLoadBalancerInstances(aws.StringValue(lb.LoadBalancerName), lb.Instances, instances)
if err != nil {
return nil
}
@ -3260,37 +3251,6 @@ func (c *Cloud) getInstancesByIDs(instanceIDs []*string) (map[string]*ec2.Instan
return instancesByID, nil
}
// Fetches and caches instances in the given state, by node names; returns an error if any cannot be found. If no states
// are given, no state filter is used and instances of all states are fetched.
// This is implemented with a multi value filter on the node names, fetching the desired instances with a single query.
// TODO(therc): make all the caching more rational during the 1.4 timeframe
func (c *Cloud) getInstancesByNodeNamesCached(nodeNames sets.String, states ...string) ([]*ec2.Instance, error) {
c.mutex.Lock()
defer c.mutex.Unlock()
if nodeNames.Equal(c.lastNodeNames) {
if len(c.lastInstancesByNodeNames) > 0 {
// We assume that if the list of nodes is the same, the underlying
// instances have not changed. Later we might guard this with TTLs.
glog.V(2).Infof("Returning cached instances for %v", nodeNames)
return c.lastInstancesByNodeNames, nil
}
}
instances, err := c.getInstancesByNodeNames(nodeNames.List(), states...)
if err != nil {
return nil, err
}
if len(instances) == 0 {
return nil, nil
}
glog.V(2).Infof("Caching instances for %v", nodeNames)
c.lastNodeNames = nodeNames
c.lastInstancesByNodeNames = instances
return instances, nil
}
func (c *Cloud) getInstancesByNodeNames(nodeNames []string, states ...string) ([]*ec2.Instance, error) {
names := aws.StringSlice(nodeNames)
ec2Instances := []*ec2.Instance{}
@ -3328,6 +3288,7 @@ func (c *Cloud) getInstancesByNodeNames(nodeNames []string, states ...string) ([
return ec2Instances, nil
}
// TODO: Move to instanceCache
func (c *Cloud) describeInstances(filters []*ec2.Filter) ([]*ec2.Instance, error) {
filters = c.tagging.addFilters(filters)
request := &ec2.DescribeInstancesInput{

View File

@ -28,6 +28,7 @@ import (
"github.com/golang/glog"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/kubernetes/pkg/api/v1"
)
const ProxyProtocolPolicyName = "k8s-proxyprotocol-enabled"
@ -417,10 +418,10 @@ func (c *Cloud) ensureLoadBalancerHealthCheck(loadBalancer *elb.LoadBalancerDesc
}
// Makes sure that exactly the specified hosts are registered as instances with the load balancer
func (c *Cloud) ensureLoadBalancerInstances(loadBalancerName string, lbInstances []*elb.Instance, instances []*ec2.Instance) error {
func (c *Cloud) ensureLoadBalancerInstances(loadBalancerName string, lbInstances []*elb.Instance, instanceIDs map[awsInstanceID]*ec2.Instance) error {
expected := sets.NewString()
for _, instance := range instances {
expected.Insert(orEmpty(instance.InstanceId))
for id := range instanceIDs {
expected.Insert(string(id))
}
actual := sets.NewString()
@ -519,3 +520,25 @@ func proxyProtocolEnabled(backend *elb.BackendServerDescription) bool {
return false
}
// findInstancesForELB gets the EC2 instances corresponding to the Nodes, for setting up an ELB
// We ignore Nodes (with a log message) where the instanceid cannot be determined from the provider,
// and we ignore instances which are not found
func (c *Cloud) findInstancesForELB(nodes []*v1.Node) (map[awsInstanceID]*ec2.Instance, error) {
// Map to instance ids ignoring Nodes where we cannot find the id (but logging)
instanceIDs := mapToAWSInstanceIDsTolerant(nodes)
cacheCriteria := cacheCriteria{
// MaxAge not required, because we only care about security groups, which should not change
HasInstances: instanceIDs, // Refresh if any of the instance ids are missing
}
snapshot, err := c.instanceCache.describeAllInstancesCached(cacheCriteria)
if err != nil {
return nil, err
}
instances := snapshot.FindInstances(instanceIDs)
// We ignore instances that cannot be found
return instances, nil
}

View File

@ -1012,62 +1012,6 @@ func TestFindInstanceByNodeNameExcludesTerminatedInstances(t *testing.T) {
}
}
func TestFindInstancesByNodeNameCached(t *testing.T) {
awsServices := NewFakeAWSServices()
nodeNameOne := "my-dns.internal"
nodeNameTwo := "my-dns-two.internal"
var tag ec2.Tag
tag.Key = aws.String(TagNameKubernetesClusterPrefix + TestClusterId)
tag.Value = aws.String("")
tags := []*ec2.Tag{&tag}
var runningInstance ec2.Instance
runningInstance.InstanceId = aws.String("i-running")
runningInstance.PrivateDnsName = aws.String(nodeNameOne)
runningInstance.State = &ec2.InstanceState{Code: aws.Int64(16), Name: aws.String("running")}
runningInstance.Tags = tags
var secondInstance ec2.Instance
secondInstance.InstanceId = aws.String("i-running")
secondInstance.PrivateDnsName = aws.String(nodeNameTwo)
secondInstance.State = &ec2.InstanceState{Code: aws.Int64(48), Name: aws.String("running")}
secondInstance.Tags = tags
var terminatedInstance ec2.Instance
terminatedInstance.InstanceId = aws.String("i-terminated")
terminatedInstance.PrivateDnsName = aws.String(nodeNameOne)
terminatedInstance.State = &ec2.InstanceState{Code: aws.Int64(48), Name: aws.String("terminated")}
terminatedInstance.Tags = tags
instances := []*ec2.Instance{&secondInstance, &runningInstance, &terminatedInstance}
awsServices.instances = append(awsServices.instances, instances...)
c, err := newAWSCloud(strings.NewReader("[global]"), awsServices)
if err != nil {
t.Errorf("Error building aws cloud: %v", err)
return
}
nodeNames := sets.NewString(nodeNameOne)
returnedInstances, errr := c.getInstancesByNodeNamesCached(nodeNames, "running")
if errr != nil {
t.Errorf("Failed to find instance: %v", err)
return
}
if len(returnedInstances) != 1 {
t.Errorf("Expected a single isntance but found: %v", returnedInstances)
}
if *returnedInstances[0].PrivateDnsName != nodeNameOne {
t.Errorf("Expected node name %v but got %v", nodeNameOne, returnedInstances[0].PrivateDnsName)
}
}
func TestGetInstanceByNodeNameBatching(t *testing.T) {
awsServices := NewFakeAWSServices()
c, err := newAWSCloud(strings.NewReader("[global]"), awsServices)

View File

@ -23,6 +23,10 @@ import (
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/service/ec2"
"github.com/golang/glog"
"k8s.io/kubernetes/pkg/api/v1"
"sync"
"time"
)
// awsInstanceID represents the ID of the instance in the AWS API, e.g. i-12345678
@ -80,6 +84,42 @@ func (name kubernetesInstanceID) mapToAWSInstanceID() (awsInstanceID, error) {
return awsInstanceID(awsID), nil
}
// mapToAWSInstanceID extracts the awsInstanceIDs from the Nodes, returning an error if a Node cannot be mapped
func mapToAWSInstanceIDs(nodes []*v1.Node) ([]awsInstanceID, error) {
var instanceIDs []awsInstanceID
for _, node := range nodes {
if node.Spec.ProviderID == "" {
return nil, fmt.Errorf("node %q did not have ProviderID set", node.Name)
}
instanceID, err := kubernetesInstanceID(node.Spec.ProviderID).mapToAWSInstanceID()
if err != nil {
return nil, fmt.Errorf("unable to parse ProviderID %q for node %q", node.Spec.ProviderID, node.Name)
}
instanceIDs = append(instanceIDs, instanceID)
}
return instanceIDs, nil
}
// mapToAWSInstanceIDsTolerant extracts the awsInstanceIDs from the Nodes, skipping Nodes that cannot be mapped
func mapToAWSInstanceIDsTolerant(nodes []*v1.Node) []awsInstanceID {
var instanceIDs []awsInstanceID
for _, node := range nodes {
if node.Spec.ProviderID == "" {
glog.Warningf("node %q did not have ProviderID set", node.Name)
continue
}
instanceID, err := kubernetesInstanceID(node.Spec.ProviderID).mapToAWSInstanceID()
if err != nil {
glog.Warningf("unable to parse ProviderID %q for node %q", node.Spec.ProviderID, node.Name)
continue
}
instanceIDs = append(instanceIDs, instanceID)
}
return instanceIDs
}
// Gets the full information about this instance from the EC2 API
func describeInstance(ec2Client EC2, instanceID awsInstanceID) (*ec2.Instance, error) {
request := &ec2.DescribeInstancesInput{
@ -98,3 +138,132 @@ func describeInstance(ec2Client EC2, instanceID awsInstanceID) (*ec2.Instance, e
}
return instances[0], nil
}
// instanceCache manages the cache of DescribeInstances
type instanceCache struct {
// TODO: Get rid of this field, send all calls through the instanceCache
cloud *Cloud
mutex sync.Mutex
snapshot *allInstancesSnapshot
}
// Gets the full information about these instance from the EC2 API
func (c *instanceCache) describeAllInstancesUncached() (*allInstancesSnapshot, error) {
now := time.Now()
glog.V(4).Infof("EC2 DescribeInstances - fetching all instances")
filters := []*ec2.Filter{}
instances, err := c.cloud.describeInstances(filters)
if err != nil {
return nil, err
}
m := make(map[awsInstanceID]*ec2.Instance)
for _, i := range instances {
id := awsInstanceID(aws.StringValue(i.InstanceId))
m[id] = i
}
snapshot := &allInstancesSnapshot{now, m}
c.mutex.Lock()
defer c.mutex.Unlock()
if c.snapshot != nil && snapshot.olderThan(c.snapshot) {
// If this happens a lot, we could run this function in a mutex and only return one result
glog.Infof("Not caching concurrent AWS DescribeInstances results")
} else {
c.snapshot = snapshot
}
return snapshot, nil
}
// cacheCriteria holds criteria that must hold to use a cached snapshot
type cacheCriteria struct {
// MaxAge indicates the maximum age of a cached snapshot we can accept.
// If set to 0 (i.e. unset), cached values will not time out because of age.
MaxAge time.Duration
// HasInstances is a list of awsInstanceIDs that must be in a cached snapshot for it to be considered valid.
// If an instance is not found in the cached snapshot, the snapshot be ignored and we will re-fetch.
HasInstances []awsInstanceID
}
// describeAllInstancesCached returns all instances, using cached results if applicable
func (c *instanceCache) describeAllInstancesCached(criteria cacheCriteria) (*allInstancesSnapshot, error) {
var err error
snapshot := c.getSnapshot()
if snapshot != nil && !snapshot.MeetsCriteria(criteria) {
snapshot = nil
}
if snapshot == nil {
snapshot, err = c.describeAllInstancesUncached()
if err != nil {
return nil, err
}
} else {
glog.V(6).Infof("EC2 DescribeInstances - using cached results")
}
return snapshot, nil
}
// getSnapshot returns a snapshot if one exists
func (c *instanceCache) getSnapshot() *allInstancesSnapshot {
c.mutex.Lock()
defer c.mutex.Unlock()
return c.snapshot
}
// olderThan is a simple helper to encapsulate timestamp comparison
func (s *allInstancesSnapshot) olderThan(other *allInstancesSnapshot) bool {
// After() is technically broken by time changes until we have monotonic time
return other.timestamp.After(s.timestamp)
}
// MeetsCriteria returns true if the snapshot meets the criteria in cacheCriteria
func (s *allInstancesSnapshot) MeetsCriteria(criteria cacheCriteria) bool {
if criteria.MaxAge > 0 {
// Sub() is technically broken by time changes until we have monotonic time
now := time.Now()
if now.Sub(s.timestamp) > criteria.MaxAge {
glog.V(6).Infof("instanceCache snapshot cannot be used as is older than MaxAge=%s", criteria.MaxAge)
return false
}
}
if len(criteria.HasInstances) != 0 {
for _, id := range criteria.HasInstances {
if nil == s.instances[id] {
glog.V(6).Infof("instanceCache snapshot cannot be used as does not contain instance %s", id)
return false
}
}
}
return true
}
// allInstancesSnapshot holds the results from querying for all instances,
// along with the timestamp for cache-invalidation purposes
type allInstancesSnapshot struct {
timestamp time.Time
instances map[awsInstanceID]*ec2.Instance
}
// FindInstances returns the instances corresponding to the specified ids. If an id is not found, it is ignored.
func (s *allInstancesSnapshot) FindInstances(ids []awsInstanceID) map[awsInstanceID]*ec2.Instance {
m := make(map[awsInstanceID]*ec2.Instance)
for _, id := range ids {
instance := s.instances[id]
if instance != nil {
m[id] = instance
}
}
return m
}

View File

@ -17,7 +17,12 @@ limitations under the License.
package aws
import (
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/service/ec2"
"github.com/stretchr/testify/assert"
"k8s.io/kubernetes/pkg/api/v1"
"testing"
"time"
)
func TestParseInstance(t *testing.T) {
@ -86,4 +91,109 @@ func TestParseInstance(t *testing.T) {
}
}
}
for _, test := range tests {
node := &v1.Node{}
node.Spec.ProviderID = string(test.Kubernetes)
awsInstanceIds, err := mapToAWSInstanceIDs([]*v1.Node{node})
if err != nil {
if !test.ExpectError {
t.Errorf("unexpected error parsing %s: %v", test.Kubernetes, err)
}
} else {
if test.ExpectError {
t.Errorf("expected error parsing %s", test.Kubernetes)
} else if len(awsInstanceIds) != 1 {
t.Errorf("unexpected value parsing %s, got %s", test.Kubernetes, awsInstanceIds)
} else if awsInstanceIds[0] != test.Aws {
t.Errorf("unexpected value parsing %s, got %s", test.Kubernetes, awsInstanceIds)
}
}
awsInstanceIds = mapToAWSInstanceIDsTolerant([]*v1.Node{node})
if test.ExpectError {
if len(awsInstanceIds) != 0 {
t.Errorf("unexpected results parsing %s: %s", test.Kubernetes, awsInstanceIds)
}
} else {
if len(awsInstanceIds) != 1 {
t.Errorf("unexpected value parsing %s, got %s", test.Kubernetes, awsInstanceIds)
} else if awsInstanceIds[0] != test.Aws {
t.Errorf("unexpected value parsing %s, got %s", test.Kubernetes, awsInstanceIds)
}
}
}
}
func TestSnapshotMeetsCriteria(t *testing.T) {
snapshot := &allInstancesSnapshot{timestamp: time.Now().Add(-3601 * time.Second)}
if !snapshot.MeetsCriteria(cacheCriteria{}) {
t.Errorf("Snapshot should always meet empty criteria")
}
if snapshot.MeetsCriteria(cacheCriteria{MaxAge: time.Hour}) {
t.Errorf("Snapshot did not honor MaxAge")
}
if snapshot.MeetsCriteria(cacheCriteria{HasInstances: []awsInstanceID{awsInstanceID("i-12345678")}}) {
t.Errorf("Snapshot did not honor HasInstances with missing instances")
}
snapshot.instances = make(map[awsInstanceID]*ec2.Instance)
snapshot.instances[awsInstanceID("i-12345678")] = &ec2.Instance{}
if !snapshot.MeetsCriteria(cacheCriteria{HasInstances: []awsInstanceID{awsInstanceID("i-12345678")}}) {
t.Errorf("Snapshot did not honor HasInstances with matching instances")
}
if snapshot.MeetsCriteria(cacheCriteria{HasInstances: []awsInstanceID{awsInstanceID("i-12345678"), awsInstanceID("i-00000000")}}) {
t.Errorf("Snapshot did not honor HasInstances with partially matching instances")
}
}
func TestOlderThan(t *testing.T) {
t1 := time.Now()
t2 := t1.Add(time.Second)
s1 := &allInstancesSnapshot{timestamp: t1}
s2 := &allInstancesSnapshot{timestamp: t2}
assert.True(t, s1.olderThan(s2), "s1 should be olderThan s2")
assert.False(t, s2.olderThan(s1), "s2 not should be olderThan s1")
assert.False(t, s1.olderThan(s1), "s1 not should be olderThan itself")
}
func TestSnapshotFindInstances(t *testing.T) {
snapshot := &allInstancesSnapshot{}
snapshot.instances = make(map[awsInstanceID]*ec2.Instance)
{
id := awsInstanceID("i-12345678")
snapshot.instances[id] = &ec2.Instance{InstanceId: id.awsString()}
}
{
id := awsInstanceID("i-23456789")
snapshot.instances[id] = &ec2.Instance{InstanceId: id.awsString()}
}
instances := snapshot.FindInstances([]awsInstanceID{awsInstanceID("i-12345678"), awsInstanceID("i-23456789"), awsInstanceID("i-00000000")})
if len(instances) != 2 {
t.Errorf("findInstances returned %d results, expected 2", len(instances))
}
for _, id := range []awsInstanceID{awsInstanceID("i-12345678"), awsInstanceID("i-23456789")} {
i := instances[id]
if i == nil {
t.Errorf("findInstances did not return %s", id)
continue
}
if aws.StringValue(i.InstanceId) != string(id) {
t.Errorf("findInstances did not return expected instanceId for %s", id)
}
if i != snapshot.instances[id] {
t.Errorf("findInstances did not return expected instance (reference equality) for %s", id)
}
}
}