diff --git a/pkg/cloudprovider/providers/aws/aws.go b/pkg/cloudprovider/providers/aws/aws.go index 826989656e0..ae6ebcbc464 100644 --- a/pkg/cloudprovider/providers/aws/aws.go +++ b/pkg/cloudprovider/providers/aws/aws.go @@ -377,9 +377,7 @@ type Cloud struct { // Note that we cache some state in awsInstance (mountpoints), so we must preserve the instance selfAWSInstance *awsInstance - mutex sync.Mutex - lastNodeNames sets.String - lastInstancesByNodeNames []*ec2.Instance + instanceCache instanceCache // We keep an active list of devices we have assigned but not yet // attached, to avoid a race condition where we assign a device mapping @@ -862,6 +860,7 @@ func newAWSCloud(config io.Reader, awsServices Services) (*Cloud, error) { attaching: make(map[types.NodeName]map[mountDevice]awsVolumeID), deviceAllocators: make(map[types.NodeName]DeviceAllocator), } + awsCloud.instanceCache.cloud = awsCloud if cfg.Global.VPC != "" && cfg.Global.SubnetID != "" && (cfg.Global.KubernetesClusterTag != "" || cfg.Global.KubernetesClusterID != "") { // When the master is running on a different AWS account, cloud provider or on-premise @@ -2556,14 +2555,6 @@ func buildListener(port v1.ServicePort, annotations map[string]string, sslPorts return listener, nil } -func nodeNames(nodes []*v1.Node) sets.String { - ret := sets.String{} - for _, node := range nodes { - ret.Insert(node.Name) - } - return ret -} - // EnsureLoadBalancer implements LoadBalancer.EnsureLoadBalancer func (c *Cloud) EnsureLoadBalancer(clusterName string, apiService *v1.Service, nodes []*v1.Node) (*v1.LoadBalancerStatus, error) { annotations := apiService.Annotations @@ -2601,7 +2592,7 @@ func (c *Cloud) EnsureLoadBalancer(clusterName string, apiService *v1.Service, n return nil, fmt.Errorf("LoadBalancerIP cannot be specified for AWS ELB") } - instances, err := c.getInstancesByNodeNamesCached(nodeNames(nodes), "running") + instances, err := c.findInstancesForELB(nodes) if err != nil { return nil, err } @@ -2955,7 +2946,7 @@ func (c *Cloud) getTaggedSecurityGroups() (map[string]*ec2.SecurityGroup, error) // Open security group ingress rules on the instances so that the load balancer can talk to them // Will also remove any security groups ingress rules for the load balancer that are _not_ needed for allInstances -func (c *Cloud) updateInstanceSecurityGroupsForLoadBalancer(lb *elb.LoadBalancerDescription, allInstances []*ec2.Instance) error { +func (c *Cloud) updateInstanceSecurityGroupsForLoadBalancer(lb *elb.LoadBalancerDescription, instances map[awsInstanceID]*ec2.Instance) error { if c.cfg.Global.DisableSecurityGroupIngress { return nil } @@ -3010,7 +3001,7 @@ func (c *Cloud) updateInstanceSecurityGroupsForLoadBalancer(lb *elb.LoadBalancer instanceSecurityGroupIds := map[string]bool{} // Scan instances for groups we want open - for _, instance := range allInstances { + for _, instance := range instances { securityGroup, err := findSecurityGroupForInstance(instance, taggedSecurityGroups) if err != nil { return err @@ -3188,7 +3179,7 @@ func (c *Cloud) EnsureLoadBalancerDeleted(clusterName string, service *v1.Servic // UpdateLoadBalancer implements LoadBalancer.UpdateLoadBalancer func (c *Cloud) UpdateLoadBalancer(clusterName string, service *v1.Service, nodes []*v1.Node) error { - instances, err := c.getInstancesByNodeNamesCached(nodeNames(nodes), "running") + instances, err := c.findInstancesForELB(nodes) if err != nil { return err } @@ -3203,7 +3194,7 @@ func (c *Cloud) UpdateLoadBalancer(clusterName string, service *v1.Service, node return fmt.Errorf("Load balancer not found") } - err = c.ensureLoadBalancerInstances(orEmpty(lb.LoadBalancerName), lb.Instances, instances) + err = c.ensureLoadBalancerInstances(aws.StringValue(lb.LoadBalancerName), lb.Instances, instances) if err != nil { return nil } @@ -3260,37 +3251,6 @@ func (c *Cloud) getInstancesByIDs(instanceIDs []*string) (map[string]*ec2.Instan return instancesByID, nil } -// Fetches and caches instances in the given state, by node names; returns an error if any cannot be found. If no states -// are given, no state filter is used and instances of all states are fetched. -// This is implemented with a multi value filter on the node names, fetching the desired instances with a single query. -// TODO(therc): make all the caching more rational during the 1.4 timeframe -func (c *Cloud) getInstancesByNodeNamesCached(nodeNames sets.String, states ...string) ([]*ec2.Instance, error) { - c.mutex.Lock() - defer c.mutex.Unlock() - if nodeNames.Equal(c.lastNodeNames) { - if len(c.lastInstancesByNodeNames) > 0 { - // We assume that if the list of nodes is the same, the underlying - // instances have not changed. Later we might guard this with TTLs. - glog.V(2).Infof("Returning cached instances for %v", nodeNames) - return c.lastInstancesByNodeNames, nil - } - } - instances, err := c.getInstancesByNodeNames(nodeNames.List(), states...) - - if err != nil { - return nil, err - } - - if len(instances) == 0 { - return nil, nil - } - - glog.V(2).Infof("Caching instances for %v", nodeNames) - c.lastNodeNames = nodeNames - c.lastInstancesByNodeNames = instances - return instances, nil -} - func (c *Cloud) getInstancesByNodeNames(nodeNames []string, states ...string) ([]*ec2.Instance, error) { names := aws.StringSlice(nodeNames) ec2Instances := []*ec2.Instance{} @@ -3328,6 +3288,7 @@ func (c *Cloud) getInstancesByNodeNames(nodeNames []string, states ...string) ([ return ec2Instances, nil } +// TODO: Move to instanceCache func (c *Cloud) describeInstances(filters []*ec2.Filter) ([]*ec2.Instance, error) { filters = c.tagging.addFilters(filters) request := &ec2.DescribeInstancesInput{ diff --git a/pkg/cloudprovider/providers/aws/aws_loadbalancer.go b/pkg/cloudprovider/providers/aws/aws_loadbalancer.go index 46e1a05cf21..5147e584390 100644 --- a/pkg/cloudprovider/providers/aws/aws_loadbalancer.go +++ b/pkg/cloudprovider/providers/aws/aws_loadbalancer.go @@ -28,6 +28,7 @@ import ( "github.com/golang/glog" "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/sets" + "k8s.io/kubernetes/pkg/api/v1" ) const ProxyProtocolPolicyName = "k8s-proxyprotocol-enabled" @@ -417,10 +418,10 @@ func (c *Cloud) ensureLoadBalancerHealthCheck(loadBalancer *elb.LoadBalancerDesc } // Makes sure that exactly the specified hosts are registered as instances with the load balancer -func (c *Cloud) ensureLoadBalancerInstances(loadBalancerName string, lbInstances []*elb.Instance, instances []*ec2.Instance) error { +func (c *Cloud) ensureLoadBalancerInstances(loadBalancerName string, lbInstances []*elb.Instance, instanceIDs map[awsInstanceID]*ec2.Instance) error { expected := sets.NewString() - for _, instance := range instances { - expected.Insert(orEmpty(instance.InstanceId)) + for id := range instanceIDs { + expected.Insert(string(id)) } actual := sets.NewString() @@ -519,3 +520,25 @@ func proxyProtocolEnabled(backend *elb.BackendServerDescription) bool { return false } + +// findInstancesForELB gets the EC2 instances corresponding to the Nodes, for setting up an ELB +// We ignore Nodes (with a log message) where the instanceid cannot be determined from the provider, +// and we ignore instances which are not found +func (c *Cloud) findInstancesForELB(nodes []*v1.Node) (map[awsInstanceID]*ec2.Instance, error) { + // Map to instance ids ignoring Nodes where we cannot find the id (but logging) + instanceIDs := mapToAWSInstanceIDsTolerant(nodes) + + cacheCriteria := cacheCriteria{ + // MaxAge not required, because we only care about security groups, which should not change + HasInstances: instanceIDs, // Refresh if any of the instance ids are missing + } + snapshot, err := c.instanceCache.describeAllInstancesCached(cacheCriteria) + if err != nil { + return nil, err + } + + instances := snapshot.FindInstances(instanceIDs) + // We ignore instances that cannot be found + + return instances, nil +} diff --git a/pkg/cloudprovider/providers/aws/aws_test.go b/pkg/cloudprovider/providers/aws/aws_test.go index 0fbc86eb668..7c1b9b56da4 100644 --- a/pkg/cloudprovider/providers/aws/aws_test.go +++ b/pkg/cloudprovider/providers/aws/aws_test.go @@ -1012,62 +1012,6 @@ func TestFindInstanceByNodeNameExcludesTerminatedInstances(t *testing.T) { } } -func TestFindInstancesByNodeNameCached(t *testing.T) { - awsServices := NewFakeAWSServices() - - nodeNameOne := "my-dns.internal" - nodeNameTwo := "my-dns-two.internal" - - var tag ec2.Tag - tag.Key = aws.String(TagNameKubernetesClusterPrefix + TestClusterId) - tag.Value = aws.String("") - tags := []*ec2.Tag{&tag} - - var runningInstance ec2.Instance - runningInstance.InstanceId = aws.String("i-running") - runningInstance.PrivateDnsName = aws.String(nodeNameOne) - runningInstance.State = &ec2.InstanceState{Code: aws.Int64(16), Name: aws.String("running")} - runningInstance.Tags = tags - - var secondInstance ec2.Instance - - secondInstance.InstanceId = aws.String("i-running") - secondInstance.PrivateDnsName = aws.String(nodeNameTwo) - secondInstance.State = &ec2.InstanceState{Code: aws.Int64(48), Name: aws.String("running")} - secondInstance.Tags = tags - - var terminatedInstance ec2.Instance - terminatedInstance.InstanceId = aws.String("i-terminated") - terminatedInstance.PrivateDnsName = aws.String(nodeNameOne) - terminatedInstance.State = &ec2.InstanceState{Code: aws.Int64(48), Name: aws.String("terminated")} - terminatedInstance.Tags = tags - - instances := []*ec2.Instance{&secondInstance, &runningInstance, &terminatedInstance} - awsServices.instances = append(awsServices.instances, instances...) - - c, err := newAWSCloud(strings.NewReader("[global]"), awsServices) - if err != nil { - t.Errorf("Error building aws cloud: %v", err) - return - } - - nodeNames := sets.NewString(nodeNameOne) - returnedInstances, errr := c.getInstancesByNodeNamesCached(nodeNames, "running") - - if errr != nil { - t.Errorf("Failed to find instance: %v", err) - return - } - - if len(returnedInstances) != 1 { - t.Errorf("Expected a single isntance but found: %v", returnedInstances) - } - - if *returnedInstances[0].PrivateDnsName != nodeNameOne { - t.Errorf("Expected node name %v but got %v", nodeNameOne, returnedInstances[0].PrivateDnsName) - } -} - func TestGetInstanceByNodeNameBatching(t *testing.T) { awsServices := NewFakeAWSServices() c, err := newAWSCloud(strings.NewReader("[global]"), awsServices) diff --git a/pkg/cloudprovider/providers/aws/instances.go b/pkg/cloudprovider/providers/aws/instances.go index 2d8c1ea9655..a8733d42737 100644 --- a/pkg/cloudprovider/providers/aws/instances.go +++ b/pkg/cloudprovider/providers/aws/instances.go @@ -23,6 +23,10 @@ import ( "github.com/aws/aws-sdk-go/aws" "github.com/aws/aws-sdk-go/service/ec2" + "github.com/golang/glog" + "k8s.io/kubernetes/pkg/api/v1" + "sync" + "time" ) // awsInstanceID represents the ID of the instance in the AWS API, e.g. i-12345678 @@ -80,6 +84,42 @@ func (name kubernetesInstanceID) mapToAWSInstanceID() (awsInstanceID, error) { return awsInstanceID(awsID), nil } +// mapToAWSInstanceID extracts the awsInstanceIDs from the Nodes, returning an error if a Node cannot be mapped +func mapToAWSInstanceIDs(nodes []*v1.Node) ([]awsInstanceID, error) { + var instanceIDs []awsInstanceID + for _, node := range nodes { + if node.Spec.ProviderID == "" { + return nil, fmt.Errorf("node %q did not have ProviderID set", node.Name) + } + instanceID, err := kubernetesInstanceID(node.Spec.ProviderID).mapToAWSInstanceID() + if err != nil { + return nil, fmt.Errorf("unable to parse ProviderID %q for node %q", node.Spec.ProviderID, node.Name) + } + instanceIDs = append(instanceIDs, instanceID) + } + + return instanceIDs, nil +} + +// mapToAWSInstanceIDsTolerant extracts the awsInstanceIDs from the Nodes, skipping Nodes that cannot be mapped +func mapToAWSInstanceIDsTolerant(nodes []*v1.Node) []awsInstanceID { + var instanceIDs []awsInstanceID + for _, node := range nodes { + if node.Spec.ProviderID == "" { + glog.Warningf("node %q did not have ProviderID set", node.Name) + continue + } + instanceID, err := kubernetesInstanceID(node.Spec.ProviderID).mapToAWSInstanceID() + if err != nil { + glog.Warningf("unable to parse ProviderID %q for node %q", node.Spec.ProviderID, node.Name) + continue + } + instanceIDs = append(instanceIDs, instanceID) + } + + return instanceIDs +} + // Gets the full information about this instance from the EC2 API func describeInstance(ec2Client EC2, instanceID awsInstanceID) (*ec2.Instance, error) { request := &ec2.DescribeInstancesInput{ @@ -98,3 +138,132 @@ func describeInstance(ec2Client EC2, instanceID awsInstanceID) (*ec2.Instance, e } return instances[0], nil } + +// instanceCache manages the cache of DescribeInstances +type instanceCache struct { + // TODO: Get rid of this field, send all calls through the instanceCache + cloud *Cloud + + mutex sync.Mutex + snapshot *allInstancesSnapshot +} + +// Gets the full information about these instance from the EC2 API +func (c *instanceCache) describeAllInstancesUncached() (*allInstancesSnapshot, error) { + now := time.Now() + + glog.V(4).Infof("EC2 DescribeInstances - fetching all instances") + + filters := []*ec2.Filter{} + instances, err := c.cloud.describeInstances(filters) + if err != nil { + return nil, err + } + + m := make(map[awsInstanceID]*ec2.Instance) + for _, i := range instances { + id := awsInstanceID(aws.StringValue(i.InstanceId)) + m[id] = i + } + + snapshot := &allInstancesSnapshot{now, m} + + c.mutex.Lock() + defer c.mutex.Unlock() + + if c.snapshot != nil && snapshot.olderThan(c.snapshot) { + // If this happens a lot, we could run this function in a mutex and only return one result + glog.Infof("Not caching concurrent AWS DescribeInstances results") + } else { + c.snapshot = snapshot + } + + return snapshot, nil +} + +// cacheCriteria holds criteria that must hold to use a cached snapshot +type cacheCriteria struct { + // MaxAge indicates the maximum age of a cached snapshot we can accept. + // If set to 0 (i.e. unset), cached values will not time out because of age. + MaxAge time.Duration + + // HasInstances is a list of awsInstanceIDs that must be in a cached snapshot for it to be considered valid. + // If an instance is not found in the cached snapshot, the snapshot be ignored and we will re-fetch. + HasInstances []awsInstanceID +} + +// describeAllInstancesCached returns all instances, using cached results if applicable +func (c *instanceCache) describeAllInstancesCached(criteria cacheCriteria) (*allInstancesSnapshot, error) { + var err error + snapshot := c.getSnapshot() + if snapshot != nil && !snapshot.MeetsCriteria(criteria) { + snapshot = nil + } + + if snapshot == nil { + snapshot, err = c.describeAllInstancesUncached() + if err != nil { + return nil, err + } + } else { + glog.V(6).Infof("EC2 DescribeInstances - using cached results") + } + + return snapshot, nil +} + +// getSnapshot returns a snapshot if one exists +func (c *instanceCache) getSnapshot() *allInstancesSnapshot { + c.mutex.Lock() + defer c.mutex.Unlock() + + return c.snapshot +} + +// olderThan is a simple helper to encapsulate timestamp comparison +func (s *allInstancesSnapshot) olderThan(other *allInstancesSnapshot) bool { + // After() is technically broken by time changes until we have monotonic time + return other.timestamp.After(s.timestamp) +} + +// MeetsCriteria returns true if the snapshot meets the criteria in cacheCriteria +func (s *allInstancesSnapshot) MeetsCriteria(criteria cacheCriteria) bool { + if criteria.MaxAge > 0 { + // Sub() is technically broken by time changes until we have monotonic time + now := time.Now() + if now.Sub(s.timestamp) > criteria.MaxAge { + glog.V(6).Infof("instanceCache snapshot cannot be used as is older than MaxAge=%s", criteria.MaxAge) + return false + } + } + + if len(criteria.HasInstances) != 0 { + for _, id := range criteria.HasInstances { + if nil == s.instances[id] { + glog.V(6).Infof("instanceCache snapshot cannot be used as does not contain instance %s", id) + return false + } + } + } + + return true +} + +// allInstancesSnapshot holds the results from querying for all instances, +// along with the timestamp for cache-invalidation purposes +type allInstancesSnapshot struct { + timestamp time.Time + instances map[awsInstanceID]*ec2.Instance +} + +// FindInstances returns the instances corresponding to the specified ids. If an id is not found, it is ignored. +func (s *allInstancesSnapshot) FindInstances(ids []awsInstanceID) map[awsInstanceID]*ec2.Instance { + m := make(map[awsInstanceID]*ec2.Instance) + for _, id := range ids { + instance := s.instances[id] + if instance != nil { + m[id] = instance + } + } + return m +} diff --git a/pkg/cloudprovider/providers/aws/instances_test.go b/pkg/cloudprovider/providers/aws/instances_test.go index 79637ad91d5..6740e686db6 100644 --- a/pkg/cloudprovider/providers/aws/instances_test.go +++ b/pkg/cloudprovider/providers/aws/instances_test.go @@ -17,7 +17,12 @@ limitations under the License. package aws import ( + "github.com/aws/aws-sdk-go/aws" + "github.com/aws/aws-sdk-go/service/ec2" + "github.com/stretchr/testify/assert" + "k8s.io/kubernetes/pkg/api/v1" "testing" + "time" ) func TestParseInstance(t *testing.T) { @@ -86,4 +91,109 @@ func TestParseInstance(t *testing.T) { } } } + + for _, test := range tests { + node := &v1.Node{} + node.Spec.ProviderID = string(test.Kubernetes) + + awsInstanceIds, err := mapToAWSInstanceIDs([]*v1.Node{node}) + if err != nil { + if !test.ExpectError { + t.Errorf("unexpected error parsing %s: %v", test.Kubernetes, err) + } + } else { + if test.ExpectError { + t.Errorf("expected error parsing %s", test.Kubernetes) + } else if len(awsInstanceIds) != 1 { + t.Errorf("unexpected value parsing %s, got %s", test.Kubernetes, awsInstanceIds) + } else if awsInstanceIds[0] != test.Aws { + t.Errorf("unexpected value parsing %s, got %s", test.Kubernetes, awsInstanceIds) + } + } + + awsInstanceIds = mapToAWSInstanceIDsTolerant([]*v1.Node{node}) + if test.ExpectError { + if len(awsInstanceIds) != 0 { + t.Errorf("unexpected results parsing %s: %s", test.Kubernetes, awsInstanceIds) + } + } else { + if len(awsInstanceIds) != 1 { + t.Errorf("unexpected value parsing %s, got %s", test.Kubernetes, awsInstanceIds) + } else if awsInstanceIds[0] != test.Aws { + t.Errorf("unexpected value parsing %s, got %s", test.Kubernetes, awsInstanceIds) + } + } + } +} + +func TestSnapshotMeetsCriteria(t *testing.T) { + snapshot := &allInstancesSnapshot{timestamp: time.Now().Add(-3601 * time.Second)} + + if !snapshot.MeetsCriteria(cacheCriteria{}) { + t.Errorf("Snapshot should always meet empty criteria") + } + + if snapshot.MeetsCriteria(cacheCriteria{MaxAge: time.Hour}) { + t.Errorf("Snapshot did not honor MaxAge") + } + + if snapshot.MeetsCriteria(cacheCriteria{HasInstances: []awsInstanceID{awsInstanceID("i-12345678")}}) { + t.Errorf("Snapshot did not honor HasInstances with missing instances") + } + + snapshot.instances = make(map[awsInstanceID]*ec2.Instance) + snapshot.instances[awsInstanceID("i-12345678")] = &ec2.Instance{} + + if !snapshot.MeetsCriteria(cacheCriteria{HasInstances: []awsInstanceID{awsInstanceID("i-12345678")}}) { + t.Errorf("Snapshot did not honor HasInstances with matching instances") + } + + if snapshot.MeetsCriteria(cacheCriteria{HasInstances: []awsInstanceID{awsInstanceID("i-12345678"), awsInstanceID("i-00000000")}}) { + t.Errorf("Snapshot did not honor HasInstances with partially matching instances") + } +} + +func TestOlderThan(t *testing.T) { + t1 := time.Now() + t2 := t1.Add(time.Second) + + s1 := &allInstancesSnapshot{timestamp: t1} + s2 := &allInstancesSnapshot{timestamp: t2} + + assert.True(t, s1.olderThan(s2), "s1 should be olderThan s2") + assert.False(t, s2.olderThan(s1), "s2 not should be olderThan s1") + assert.False(t, s1.olderThan(s1), "s1 not should be olderThan itself") +} + +func TestSnapshotFindInstances(t *testing.T) { + snapshot := &allInstancesSnapshot{} + + snapshot.instances = make(map[awsInstanceID]*ec2.Instance) + { + id := awsInstanceID("i-12345678") + snapshot.instances[id] = &ec2.Instance{InstanceId: id.awsString()} + } + { + id := awsInstanceID("i-23456789") + snapshot.instances[id] = &ec2.Instance{InstanceId: id.awsString()} + } + + instances := snapshot.FindInstances([]awsInstanceID{awsInstanceID("i-12345678"), awsInstanceID("i-23456789"), awsInstanceID("i-00000000")}) + if len(instances) != 2 { + t.Errorf("findInstances returned %d results, expected 2", len(instances)) + } + + for _, id := range []awsInstanceID{awsInstanceID("i-12345678"), awsInstanceID("i-23456789")} { + i := instances[id] + if i == nil { + t.Errorf("findInstances did not return %s", id) + continue + } + if aws.StringValue(i.InstanceId) != string(id) { + t.Errorf("findInstances did not return expected instanceId for %s", id) + } + if i != snapshot.instances[id] { + t.Errorf("findInstances did not return expected instance (reference equality) for %s", id) + } + } }