diff --git a/pkg/cloudprovider/providers/aws/BUILD b/pkg/cloudprovider/providers/aws/BUILD
index 67e41d258ff..33249cb917a 100644
--- a/pkg/cloudprovider/providers/aws/BUILD
+++ b/pkg/cloudprovider/providers/aws/BUILD
@@ -21,6 +21,7 @@ go_library(
         "regions.go",
         "retry_handler.go",
         "sets_ippermissions.go",
+        "tags.go",
         "volumes.go",
     ],
     tags = ["automanaged"],
@@ -56,6 +57,7 @@ go_test(
         "device_allocator_test.go",
         "regions_test.go",
         "retry_handler_test.go",
+        "tags_test.go",
     ],
     library = ":go_default_library",
     tags = ["automanaged"],
diff --git a/pkg/cloudprovider/providers/aws/aws.go b/pkg/cloudprovider/providers/aws/aws.go
index d3f9456767b..050f89c4da9 100644
--- a/pkg/cloudprovider/providers/aws/aws.go
+++ b/pkg/cloudprovider/providers/aws/aws.go
@@ -54,10 +54,6 @@ import (
 // ProviderName is the name of this cloud provider.
 const ProviderName = "aws"
 
-// TagNameKubernetesCluster is the tag name we use to differentiate multiple
-// logically independent clusters running in the same AZ
-const TagNameKubernetesCluster = "KubernetesCluster"
-
 // TagNameKubernetesService is the tag name we use to differentiate multiple
 // services. Used currently for ELBs only.
 const TagNameKubernetesService = "kubernetes.io/service-name"
@@ -359,7 +355,7 @@ type Cloud struct {
 	region string
 	vpcID  string
 
-	filterTags map[string]string
+	tagging awsTagging
 
 	// The AWS instance that we are running on
 	// Note that we cache some state in awsInstance (mountpoints), so we must preserve the instance
@@ -388,7 +384,10 @@ type CloudConfig struct {
 		// Maybe if we're not running on AWS, e.g. bootstrap; for now it is not very useful
 		Zone string
 
+		// KubernetesClusterTag is the legacy cluster id we'll use to identify our cluster resources
 		KubernetesClusterTag string
+		// KubernetesClusterID is the cluster id we'll use to identify our cluster resources
+		KubernetesClusterID string
 
 		//The aws provider creates an inbound rule per load balancer on the node security
 		//group. However, this can run into the AWS security group rule limit of 50 if
@@ -534,12 +533,12 @@ func orEmpty(s *string) string {
 	return aws.StringValue(s)
 }
 
-func newEc2Filter(name string, value string) *ec2.Filter {
+func newEc2Filter(name string, values ...string) *ec2.Filter {
 	filter := &ec2.Filter{
 		Name: aws.String(name),
-		Values: []*string{
-			aws.String(value),
-		},
+	}
+	for _, value := range values {
+		filter.Values = append(filter.Values, aws.String(value))
 	}
 	return filter
 }
@@ -817,33 +816,21 @@ func newAWSCloud(config io.Reader, awsServices Services) (*Cloud, error) {
 	awsCloud.selfAWSInstance = selfAWSInstance
 	awsCloud.vpcID = selfAWSInstance.vpcID
 
-	filterTags := map[string]string{}
-	if cfg.Global.KubernetesClusterTag != "" {
-		filterTags[TagNameKubernetesCluster] = cfg.Global.KubernetesClusterTag
+	if cfg.Global.KubernetesClusterTag != "" || cfg.Global.KubernetesClusterID != "" {
+		if err := awsCloud.tagging.init(cfg.Global.KubernetesClusterTag, cfg.Global.KubernetesClusterID); err != nil {
+			return nil, err
+		}
 	} else {
 		// TODO: Clean up double-API query
 		info, err := selfAWSInstance.describeInstance()
 		if err != nil {
 			return nil, err
 		}
-		for _, tag := range info.Tags {
-			if orEmpty(tag.Key) == TagNameKubernetesCluster {
-				filterTags[TagNameKubernetesCluster] = orEmpty(tag.Value)
-			}
+		if err := awsCloud.tagging.initFromTags(info.Tags); err != nil {
+			return nil, err
 		}
 	}
 
-	if filterTags[TagNameKubernetesCluster] == "" {
-		glog.Errorf("Tag %q not found; Kubernetes may behave unexpectedly.", TagNameKubernetesCluster)
-	}
-
-	awsCloud.filterTags = filterTags
-	if len(filterTags) > 0 {
-		glog.Infof("AWS cloud filtering on tags: %v", filterTags)
-	} else {
-		glog.Infof("AWS cloud - no tag filtering")
-	}
-
 	// Register regions, in particular for ECR credentials
 	once.Do(func() {
 		RecognizeWellKnownRegions()
@@ -990,15 +977,12 @@ func (c *Cloud) InstanceType(nodeName types.NodeName) (string, error) {
 
 // Return a list of instances matching regex string.
 func (c *Cloud) getInstancesByRegex(regex string) ([]types.NodeName, error) {
 	filters := []*ec2.Filter{newEc2Filter("instance-state-name", "running")}
-	filters = c.addFilters(filters)
-	request := &ec2.DescribeInstancesInput{
-		Filters: filters,
-	}
 
-	instances, err := c.ec2.DescribeInstances(request)
+	instances, err := c.describeInstances(filters)
 	if err != nil {
 		return []types.NodeName{}, err
 	}
+
 	if len(instances) == 0 {
 		return []types.NodeName{}, fmt.Errorf("no instances returned")
 	}
@@ -1050,15 +1034,12 @@ func (c *Cloud) getAllZones() (sets.String, error) {
 	// TODO: We could also query for subnets, I think
 	filters := []*ec2.Filter{newEc2Filter("instance-state-name", "running")}
-	filters = c.addFilters(filters)
-	request := &ec2.DescribeInstancesInput{
-		Filters: filters,
-	}
 
-	instances, err := c.ec2.DescribeInstances(request)
+	instances, err := c.describeInstances(filters)
 	if err != nil {
 		return nil, err
 	}
+
 	if len(instances) == 0 {
 		return nil, fmt.Errorf("no instances returned")
 	}
@@ -1647,26 +1628,16 @@ func (c *Cloud) CreateDisk(volumeOptions *VolumeOptions) (KubernetesVolumeID, er
 	volumeName := KubernetesVolumeID("aws://" + aws.StringValue(response.AvailabilityZone) + "/" + string(awsID))
 
 	// apply tags
-	tags := make(map[string]string)
-	for k, v := range volumeOptions.Tags {
-		tags[k] = v
-	}
-
-	if c.getClusterName() != "" {
-		tags[TagNameKubernetesCluster] = c.getClusterName()
-	}
-
-	if len(tags) != 0 {
-		if err := c.createTags(string(awsID), tags); err != nil {
-			// delete the volume and hope it succeeds
-			_, delerr := c.DeleteDisk(volumeName)
-			if delerr != nil {
-				// delete did not succeed, we have a stray volume!
-				return "", fmt.Errorf("error tagging volume %s, could not delete the volume: %v", volumeName, delerr)
-			}
-			return "", fmt.Errorf("error tagging volume %s: %v", volumeName, err)
+	if err := c.tagging.createTags(c.ec2, string(awsID), ResourceLifecycleOwned, volumeOptions.Tags); err != nil {
+		// delete the volume and hope it succeeds
+		_, delerr := c.DeleteDisk(volumeName)
+		if delerr != nil {
+			// delete did not succeed, we have a stray volume!
+			return "", fmt.Errorf("error tagging volume %s, could not delete the volume: %v", volumeName, delerr)
 		}
+		return "", fmt.Errorf("error tagging volume %s: %v", volumeName, err)
 	}
+
 	return volumeName, nil
 }
@@ -2115,36 +2086,6 @@ func (c *Cloud) removeSecurityGroupIngress(securityGroupID string, removePermiss
 	return true, nil
 }
 
-// Ensure that a resource has the correct tags
-// If it has no tags, we assume that this was a problem caused by an error in between creation and tagging,
-// and we add the tags. If it has a different cluster's tags, that is an error.
-func (c *Cloud) ensureClusterTags(resourceID string, tags []*ec2.Tag) error {
-	actualTags := make(map[string]string)
-	for _, tag := range tags {
-		actualTags[aws.StringValue(tag.Key)] = aws.StringValue(tag.Value)
-	}
-
-	addTags := make(map[string]string)
-	for k, expected := range c.filterTags {
-		actual := actualTags[k]
-		if actual == expected {
-			continue
-		}
-		if actual == "" {
-			glog.Warningf("Resource %q was missing expected cluster tag %q. Will add (with value %q)", resourceID, k, expected)
-			addTags[k] = expected
-		} else {
-			return fmt.Errorf("resource %q has tag belonging to another cluster: %q=%q (expected %q)", resourceID, k, actual, expected)
-		}
-	}
-
-	if err := c.createTags(resourceID, addTags); err != nil {
-		return fmt.Errorf("error adding missing tags to resource %q: %v", resourceID, err)
-	}
-
-	return nil
-}
-
 // Makes sure the security group exists.
 // For multi-cluster isolation, name must be globally unique, for example derived from the service UUID.
 // Returns the security group id or error
@@ -2175,7 +2116,9 @@ func (c *Cloud) ensureSecurityGroup(name string, description string) (string, er
 		if len(securityGroups) > 1 {
 			glog.Warningf("Found multiple security groups with name: %q", name)
 		}
-		err := c.ensureClusterTags(aws.StringValue(securityGroups[0].GroupId), securityGroups[0].Tags)
+		err := c.tagging.readRepairClusterTags(
+			c.ec2, aws.StringValue(securityGroups[0].GroupId),
+			ResourceLifecycleOwned, nil, securityGroups[0].Tags)
 		if err != nil {
 			return "", err
 		}
@@ -2212,7 +2155,7 @@ func (c *Cloud) ensureSecurityGroup(name string, description string) (string, er
 		return "", fmt.Errorf("created security group, but id was not returned: %s", name)
 	}
 
-	err := c.createTags(groupID, c.filterTags)
+	err := c.tagging.createTags(c.ec2, groupID, ResourceLifecycleOwned, nil)
 	if err != nil {
 		// If we retry, ensureClusterTags will recover from this - it
 		// will add the missing tags. We could delete the security
@@ -2223,52 +2166,6 @@ func (c *Cloud) ensureSecurityGroup(name string, description string) (string, er
 	return groupID, nil
 }
 
-// createTags calls EC2 CreateTags, but adds retry-on-failure logic
-// We retry mainly because if we create an object, we cannot tag it until it is "fully created" (eventual consistency)
-// The error code varies though (depending on what we are tagging), so we simply retry on all errors
-func (c *Cloud) createTags(resourceID string, tags map[string]string) error {
-	if tags == nil || len(tags) == 0 {
-		return nil
-	}
-
-	var awsTags []*ec2.Tag
-	for k, v := range tags {
-		tag := &ec2.Tag{
-			Key:   aws.String(k),
-			Value: aws.String(v),
-		}
-		awsTags = append(awsTags, tag)
-	}
-
-	backoff := wait.Backoff{
-		Duration: createTagInitialDelay,
-		Factor:   createTagFactor,
-		Steps:    createTagSteps,
-	}
-	request := &ec2.CreateTagsInput{}
-	request.Resources = []*string{&resourceID}
-	request.Tags = awsTags
-
-	var lastErr error
-	err := wait.ExponentialBackoff(backoff, func() (bool, error) {
-		_, err := c.ec2.CreateTags(request)
-		if err == nil {
-			return true, nil
-		}
-
-		// We could check that the error is retryable, but the error code changes based on what we are tagging
-		// SecurityGroup: InvalidGroup.NotFound
-		glog.V(2).Infof("Failed to create tags; will retry. Error was %v", err)
-		lastErr = err
-		return false, nil
-	})
-	if err == wait.ErrWaitTimeout {
-		// return real CreateTags error instead of timeout
-		err = lastErr
-	}
-	return err
-}
-
 // Finds the value for a given tag.
 func findTag(tags []*ec2.Tag, key string) (string, bool) {
 	for _, tag := range tags {
@@ -2284,18 +2181,23 @@
 // However, in future this will likely be treated as an error.
 func (c *Cloud) findSubnets() ([]*ec2.Subnet, error) {
 	request := &ec2.DescribeSubnetsInput{}
-	vpcIDFilter := newEc2Filter("vpc-id", c.vpcID)
-	filters := []*ec2.Filter{vpcIDFilter}
-	filters = c.addFilters(filters)
-	request.Filters = filters
+	filters := []*ec2.Filter{newEc2Filter("vpc-id", c.vpcID)}
+	request.Filters = c.tagging.addFilters(filters)
 
 	subnets, err := c.ec2.DescribeSubnets(request)
 	if err != nil {
 		return nil, fmt.Errorf("error describing subnets: %v", err)
 	}
 
-	if len(subnets) != 0 {
-		return subnets, nil
+	var matches []*ec2.Subnet
+	for _, subnet := range subnets {
+		if c.tagging.hasClusterTag(subnet.Tags) {
+			matches = append(matches, subnet)
+		}
+	}
+
+	if len(matches) != 0 {
+		return matches, nil
 	}
 
 	// Fall back to the current instance subnets, if nothing is tagged
@@ -2850,7 +2752,7 @@ func findSecurityGroupForInstance(instance *ec2.Instance, taggedSecurityGroups m
 // Return all the security groups that are tagged as being part of our cluster
 func (c *Cloud) getTaggedSecurityGroups() (map[string]*ec2.SecurityGroup, error) {
 	request := &ec2.DescribeSecurityGroupsInput{}
-	request.Filters = c.addFilters(nil)
+	request.Filters = c.tagging.addFilters(nil)
 	groups, err := c.ec2.DescribeSecurityGroups(request)
 	if err != nil {
 		return nil, fmt.Errorf("error querying security groups: %v", err)
@@ -2858,6 +2760,10 @@ func (c *Cloud) getTaggedSecurityGroups() (map[string]*ec2.SecurityGroup, error)
 
 	m := make(map[string]*ec2.SecurityGroup)
 	for _, group := range groups {
+		if !c.tagging.hasClusterTag(group.Tags) {
+			continue
+		}
+
 		id := aws.StringValue(group.GroupId)
 		if id == "" {
 			glog.Warningf("Ignoring group without id: %v", group)
@@ -2892,13 +2798,23 @@ func (c *Cloud) updateInstanceSecurityGroupsForLoadBalancer(lb *elb.LoadBalancer
 	}
 
 	// Get the actual list of groups that allow ingress from the load-balancer
-	describeRequest := &ec2.DescribeSecurityGroupsInput{}
-	filters := []*ec2.Filter{}
-	filters = append(filters, newEc2Filter("ip-permission.group-id", loadBalancerSecurityGroupID))
-	describeRequest.Filters = c.addFilters(filters)
-	actualGroups, err := c.ec2.DescribeSecurityGroups(describeRequest)
-	if err != nil {
-		return fmt.Errorf("error querying security groups for ELB: %v", err)
+	var actualGroups []*ec2.SecurityGroup
+	{
+		describeRequest := &ec2.DescribeSecurityGroupsInput{}
+		filters := []*ec2.Filter{
+			newEc2Filter("ip-permission.group-id", loadBalancerSecurityGroupID),
+		}
+		describeRequest.Filters = c.tagging.addFilters(filters)
+		response, err := c.ec2.DescribeSecurityGroups(describeRequest)
+		if err != nil {
+			return fmt.Errorf("error querying security groups for ELB: %v", err)
+		}
+		for _, sg := range response {
+			if !c.tagging.hasClusterTag(sg.Tags) {
+				continue
+			}
+			actualGroups = append(actualGroups, sg)
+		}
 	}
 
 	taggedSecurityGroups, err := c.getTaggedSecurityGroups()
@@ -3187,12 +3103,7 @@ func (c *Cloud) getInstancesByNodeNamesCached(nodeNames sets.String) ([]*ec2.Ins
 		newEc2Filter("instance-state-name", "running"),
 	}
 
-	filters = c.addFilters(filters)
-	request := &ec2.DescribeInstancesInput{
-		Filters: filters,
-	}
-
-	instances, err := c.ec2.DescribeInstances(request)
+	instances, err := c.describeInstances(filters)
 	if err != nil {
 		glog.V(2).Infof("Failed to describe instances %v", nodeNames)
 		return nil, err
@@ -3209,6 +3120,26 @@ func (c *Cloud) getInstancesByNodeNamesCached(nodeNames sets.String) ([]*ec2.Ins
 	return instances, nil
 }
 
+func (c *Cloud) describeInstances(filters []*ec2.Filter) ([]*ec2.Instance, error) {
+	filters = c.tagging.addFilters(filters)
+	request := &ec2.DescribeInstancesInput{
+		Filters: filters,
+	}
+
+	response, err := c.ec2.DescribeInstances(request)
+	if err != nil {
+		return nil, err
+	}
+
+	var matches []*ec2.Instance
+	for _, instance := range response {
+		if c.tagging.hasClusterTag(instance.Tags) {
+			matches = append(matches, instance)
+		}
+	}
+	return matches, nil
+}
+
 // mapNodeNameToPrivateDNSName maps a k8s NodeName to an AWS Instance PrivateDNSName
 // This is a simple string cast
 func mapNodeNameToPrivateDNSName(nodeName types.NodeName) string {
@@ -3228,15 +3159,12 @@ func (c *Cloud) findInstanceByNodeName(nodeName types.NodeName) (*ec2.Instance,
 		newEc2Filter("private-dns-name", privateDNSName),
 		newEc2Filter("instance-state-name", "running"),
 	}
-	filters = c.addFilters(filters)
-	request := &ec2.DescribeInstancesInput{
-		Filters: filters,
-	}
 
-	instances, err := c.ec2.DescribeInstances(request)
+	instances, err := c.describeInstances(filters)
 	if err != nil {
 		return nil, err
 	}
+
 	if len(instances) == 0 {
 		return nil, nil
 	}
@@ -3268,23 +3196,3 @@ func (c *Cloud) getFullInstance(nodeName types.NodeName) (*awsInstance, *ec2.Ins
 	awsInstance := newAWSInstance(c.ec2, instance)
 	return awsInstance, instance, err
 }
-
-// Add additional filters, to match on our tags
-// This lets us run multiple k8s clusters in a single EC2 AZ
-func (c *Cloud) addFilters(filters []*ec2.Filter) []*ec2.Filter {
-	for k, v := range c.filterTags {
-		filters = append(filters, newEc2Filter("tag:"+k, v))
-	}
-	if len(filters) == 0 {
-		// We can't pass a zero-length Filters to AWS (it's an error)
-		// So if we end up with no filters; just return nil
-		return nil
-	}
-
-	return filters
-}
-
-// Returns the cluster name or an empty string
-func (c *Cloud) getClusterName() string {
-	return c.filterTags[TagNameKubernetesCluster]
-}
diff --git a/pkg/cloudprovider/providers/aws/aws_loadbalancer.go b/pkg/cloudprovider/providers/aws/aws_loadbalancer.go
index 8bacc6035a4..2c33e708f66 100644
--- a/pkg/cloudprovider/providers/aws/aws_loadbalancer.go
+++ b/pkg/cloudprovider/providers/aws/aws_loadbalancer.go
@@ -55,9 +55,14 @@ func (c *Cloud) ensureLoadBalancer(namespacedName types.NamespacedName, loadBala
 
 		createRequest.SecurityGroups = stringPointerArray(securityGroupIDs)
 
-		createRequest.Tags = []*elb.Tag{
-			{Key: aws.String(TagNameKubernetesCluster), Value: aws.String(c.getClusterName())},
-			{Key: aws.String(TagNameKubernetesService), Value: aws.String(namespacedName.String())},
+		tags := c.tagging.buildTags(ResourceLifecycleOwned, map[string]string{
+			TagNameKubernetesService: namespacedName.String(),
+		})
+
+		for k, v := range tags {
+			createRequest.Tags = append(createRequest.Tags, &elb.Tag{
+				Key: aws.String(k), Value: aws.String(v),
+			})
 		}
 
 		glog.Infof("Creating load balancer for %v with name: %s", namespacedName, loadBalancerName)
diff --git a/pkg/cloudprovider/providers/aws/aws_routes.go b/pkg/cloudprovider/providers/aws/aws_routes.go
index c2b2ae50a8b..4246849e8c4 100644
--- a/pkg/cloudprovider/providers/aws/aws_routes.go
+++ b/pkg/cloudprovider/providers/aws/aws_routes.go
@@ -29,14 +29,20 @@ func (c *Cloud) findRouteTable(clusterName string) (*ec2.RouteTable, error) {
 	// This should be unnecessary (we already filter on TagNameKubernetesCluster,
 	// and something is broken if cluster name doesn't match, but anyway...
 	// TODO: All clouds should be cluster-aware by default
-	filters := []*ec2.Filter{newEc2Filter("tag:"+TagNameKubernetesCluster, clusterName)}
-	request := &ec2.DescribeRouteTablesInput{Filters: c.addFilters(filters)}
+	request := &ec2.DescribeRouteTablesInput{Filters: c.tagging.addFilters(nil)}
 
-	tables, err := c.ec2.DescribeRouteTables(request)
+	response, err := c.ec2.DescribeRouteTables(request)
 	if err != nil {
 		return nil, err
 	}
 
+	var tables []*ec2.RouteTable
+	for _, table := range response {
+		if c.tagging.hasClusterTag(table.Tags) {
+			tables = append(tables, table)
+		}
+	}
+
 	if len(tables) == 0 {
 		return nil, fmt.Errorf("unable to find route table for AWS cluster: %s", clusterName)
 	}
diff --git a/pkg/cloudprovider/providers/aws/aws_test.go b/pkg/cloudprovider/providers/aws/aws_test.go
index 4096a7b4141..49efa3ba56e 100644
--- a/pkg/cloudprovider/providers/aws/aws_test.go
+++ b/pkg/cloudprovider/providers/aws/aws_test.go
@@ -146,7 +146,7 @@ func NewFakeAWSServices() *FakeAWSServices {
 	s.instances = []*ec2.Instance{selfInstance}
 
 	var tag ec2.Tag
-	tag.Key = aws.String(TagNameKubernetesCluster)
+	tag.Key = aws.String(TagNameKubernetesClusterLegacy)
 	tag.Value = aws.String(TestClusterId)
 	selfInstance.Tags = []*ec2.Tag{&tag}
 
@@ -177,24 +177,6 @@ func (s *FakeAWSServices) Metadata() (EC2Metadata, error) {
 	return s.metadata, nil
 }
 
-func TestFilterTags(t *testing.T) {
-	awsServices := NewFakeAWSServices()
-	c, err := newAWSCloud(strings.NewReader("[global]"), awsServices)
-	if err != nil {
-		t.Errorf("Error building aws cloud: %v", err)
-		return
-	}
-
-	if len(c.filterTags) != 1 {
-		t.Errorf("unexpected filter tags: %v", c.filterTags)
-		return
-	}
-
-	if c.filterTags[TagNameKubernetesCluster] != TestClusterId {
-		t.Errorf("unexpected filter tags: %v", c.filterTags)
-	}
-}
-
 func TestNewAWSCloud(t *testing.T) {
 	tests := []struct {
 		name string
@@ -279,6 +261,15 @@ func instanceMatchesFilter(instance *ec2.Instance, filter *ec2.Filter) bool {
 		return contains(filter.Values, *instance.State.Name)
 	}
 
+	if name == "tag-key" {
+		for _, instanceTag := range instance.Tags {
+			if contains(filter.Values, aws.StringValue(instanceTag.Key)) {
+				return true
+			}
+		}
+		return false
+	}
+
 	if strings.HasPrefix(name, "tag:") {
 		tagName := name[4:]
 		for _, instanceTag := range instance.Tags {
@@ -286,7 +277,9 @@ func instanceMatchesFilter(instance *ec2.Instance, filter *ec2.Filter) bool {
 				return true
 			}
 		}
+		return false
 	}
+
 	panic("Unknown filter name: " + name)
 }
 
@@ -969,13 +962,14 @@ func TestIpPermissionExistsHandlesMultipleGroupIdsWithUserIds(t *testing.T) {
 		t.Errorf("Should have not been considered equal since first is not in the second array of groups")
 	}
 }
+
 func TestFindInstanceByNodeNameExcludesTerminatedInstances(t *testing.T) {
 	awsServices := NewFakeAWSServices()
 
 	nodeName := types.NodeName("my-dns.internal")
 
 	var tag ec2.Tag
-	tag.Key = aws.String(TagNameKubernetesCluster)
+	tag.Key = aws.String(TagNameKubernetesClusterLegacy)
 	tag.Value = aws.String(TestClusterId)
 	tags := []*ec2.Tag{&tag}
 
@@ -1019,8 +1013,8 @@ func TestFindInstancesByNodeNameCached(t *testing.T) {
 	nodeNameTwo := "my-dns-two.internal"
 
 	var tag ec2.Tag
-	tag.Key = aws.String(TagNameKubernetesCluster)
-	tag.Value = aws.String(TestClusterId)
+	tag.Key = aws.String(TagNameKubernetesClusterPrefix + TestClusterId)
+	tag.Value = aws.String("")
 	tags := []*ec2.Tag{&tag}
 
 	var runningInstance ec2.Instance
diff --git a/pkg/cloudprovider/providers/aws/tags.go b/pkg/cloudprovider/providers/aws/tags.go
new file mode 100644
index 00000000000..e135da11ff3
--- /dev/null
+++ b/pkg/cloudprovider/providers/aws/tags.go
@@ -0,0 +1,256 @@
+/*
+Copyright 2017 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package aws
+
+import (
+	"fmt"
+	"strings"
+
+	"github.com/aws/aws-sdk-go/aws"
+	"github.com/aws/aws-sdk-go/service/ec2"
+	"github.com/golang/glog"
+	"k8s.io/apimachinery/pkg/util/wait"
+)
+
+// TagNameKubernetesClusterPrefix is the tag name we use to differentiate multiple
+// logically independent clusters running in the same AZ.
+// The tag key = TagNameKubernetesClusterPrefix + clusterID
+// The tag value is an ownership value
+const TagNameKubernetesClusterPrefix = "kubernetes.io/cluster/"
+
+// TagNameKubernetesClusterLegacy is the legacy tag name we use to differentiate multiple
+// logically independent clusters running in the same AZ. The problem with it was that it
+// did not allow shared resources.
+const TagNameKubernetesClusterLegacy = "KubernetesCluster"
+
+// ResourceLifecycle is the ownership value we record in cluster tags
+type ResourceLifecycle string
+
+const (
+	// ResourceLifecycleOwned is the value we use when tagging resources to indicate
+	// that the resource is considered owned and managed by the cluster,
+	// and in particular that the lifecycle is tied to the lifecycle of the cluster.
+	ResourceLifecycleOwned = "owned"
+	// ResourceLifecycleShared is the value we use when tagging resources to indicate
+	// that the resource is shared between multiple clusters, and should not be destroyed
+	// if the cluster is destroyed.
+	ResourceLifecycleShared = "shared"
+)
+
+type awsTagging struct {
+	// ClusterID is our cluster identifier: we tag AWS resources with this value,
+	// and thus we can run two independent clusters in the same VPC or subnets.
+	// This gives us similar functionality to GCE projects.
+	ClusterID string
+
+	// usesLegacyTags is true if we are using the legacy TagNameKubernetesClusterLegacy tags
+	usesLegacyTags bool
+}
+
+func (t *awsTagging) init(legacyClusterID string, clusterID string) error {
+	if legacyClusterID != "" {
+		if clusterID != "" && legacyClusterID != clusterID {
+			return fmt.Errorf("ClusterID tags did not match: %q vs %q", clusterID, legacyClusterID)
+		}
+		t.usesLegacyTags = true
+		clusterID = legacyClusterID
+	}
+
+	t.ClusterID = clusterID
+
+	if clusterID != "" {
+		glog.Infof("AWS cloud filtering on ClusterID: %v", clusterID)
+	} else {
+		glog.Infof("AWS cloud - no clusterID filtering")
+	}
+
+	return nil
+}
+
+// Extracts a clusterID from the given tags, if one is present
+// If no clusterID is found, we log an error and leave cluster filtering disabled
+// If multiple (different) clusterIDs are found, returns an error
+func (t *awsTagging) initFromTags(tags []*ec2.Tag) error {
+	legacyClusterID, newClusterID, err := findClusterIDs(tags)
+	if err != nil {
+		return err
+	}
+
+	if legacyClusterID == "" && newClusterID == "" {
+		glog.Errorf("Neither the legacy tag %q nor a tag with prefix %q was found; Kubernetes may behave unexpectedly.", TagNameKubernetesClusterLegacy, TagNameKubernetesClusterPrefix)
+	}
+
+	return t.init(legacyClusterID, newClusterID)
+}
+
+// Extracts the legacy & new cluster ids from the given tags, if they are present
+// If duplicate tags are found, returns an error
+func findClusterIDs(tags []*ec2.Tag) (string, string, error) {
+	legacyClusterID := ""
+	newClusterID := ""
+
+	for _, tag := range tags {
+		tagKey := aws.StringValue(tag.Key)
+		if strings.HasPrefix(tagKey, TagNameKubernetesClusterPrefix) {
+			id := strings.TrimPrefix(tagKey, TagNameKubernetesClusterPrefix)
+			if newClusterID != "" {
+				return "", "", fmt.Errorf("found multiple cluster tags with prefix %s (%q and %q)", TagNameKubernetesClusterPrefix, newClusterID, id)
+			}
+			newClusterID = id
+		}
+
+		if tagKey == TagNameKubernetesClusterLegacy {
+			id := aws.StringValue(tag.Value)
+			if legacyClusterID != "" {
+				return "", "", fmt.Errorf("found multiple %s tags (%q and %q)", TagNameKubernetesClusterLegacy, legacyClusterID, id)
+			}
+			legacyClusterID = id
+		}
+	}
+
+	return legacyClusterID, newClusterID, nil
+}
+
+func (t *awsTagging) clusterTagKey() string {
+	return TagNameKubernetesClusterPrefix + t.ClusterID
+}
+
+func (t *awsTagging) hasClusterTag(tags []*ec2.Tag) bool {
+	clusterTagKey := t.clusterTagKey()
+	for _, tag := range tags {
+		tagKey := aws.StringValue(tag.Key)
+		// For 1.6, we continue to recognize the legacy tags, for the 1.5 -> 1.6 upgrade
+		if tagKey == TagNameKubernetesClusterLegacy {
+			return aws.StringValue(tag.Value) == t.ClusterID
+		}
+
+		if tagKey == clusterTagKey {
+			return true
+		}
+	}
+	return false
+}
+
+// Ensure that a resource has the correct tags
+// If it has no tags, we assume that this was a problem caused by an error in between creation and tagging,
+// and we add the tags. If it has a different cluster's tags, that is an error.
+func (t *awsTagging) readRepairClusterTags(client EC2, resourceID string, lifecycle ResourceLifecycle, additionalTags map[string]string, observedTags []*ec2.Tag) error {
+	actualTagMap := make(map[string]string)
+	for _, tag := range observedTags {
+		actualTagMap[aws.StringValue(tag.Key)] = aws.StringValue(tag.Value)
+	}
+
+	expectedTags := t.buildTags(lifecycle, additionalTags)
+
+	addTags := make(map[string]string)
+	for k, expected := range expectedTags {
+		actual := actualTagMap[k]
+		if actual == expected {
+			continue
+		}
+		if actual == "" {
+			glog.Warningf("Resource %q was missing expected cluster tag %q. Will add (with value %q)", resourceID, k, expected)
+			addTags[k] = expected
+		} else {
+			return fmt.Errorf("resource %q has tag belonging to another cluster: %q=%q (expected %q)", resourceID, k, actual, expected)
+		}
+	}
+
+	if len(addTags) == 0 {
+		// All expected tags are already in place; skip the CreateTags call
+		return nil
+	}
+
+	if err := t.createTags(client, resourceID, lifecycle, additionalTags); err != nil {
+		return fmt.Errorf("error adding missing tags to resource %q: %v", resourceID, err)
+	}
+
+	return nil
+}
+
+// createTags calls EC2 CreateTags, but adds retry-on-failure logic
+// We retry mainly because if we create an object, we cannot tag it until it is "fully created" (eventual consistency)
+// The error code varies though (depending on what we are tagging), so we simply retry on all errors
+func (t *awsTagging) createTags(client EC2, resourceID string, lifecycle ResourceLifecycle, additionalTags map[string]string) error {
+	tags := t.buildTags(lifecycle, additionalTags)
+
+	if len(tags) == 0 {
+		return nil
+	}
+
+	var awsTags []*ec2.Tag
+	for k, v := range tags {
+		tag := &ec2.Tag{
+			Key:   aws.String(k),
+			Value: aws.String(v),
+		}
+		awsTags = append(awsTags, tag)
+	}
+
+	backoff := wait.Backoff{
+		Duration: createTagInitialDelay,
+		Factor:   createTagFactor,
+		Steps:    createTagSteps,
+	}
+	request := &ec2.CreateTagsInput{}
+	request.Resources = []*string{&resourceID}
+	request.Tags = awsTags
+
+	var lastErr error
+	err := wait.ExponentialBackoff(backoff, func() (bool, error) {
+		_, err := client.CreateTags(request)
+		if err == nil {
+			return true, nil
+		}
+
+		// We could check that the error is retryable, but the error code changes based on what we are tagging
+		// SecurityGroup: InvalidGroup.NotFound
+		glog.V(2).Infof("Failed to create tags; will retry. Error was %v", err)
+		lastErr = err
+		return false, nil
+	})
+	if err == wait.ErrWaitTimeout {
+		// return real CreateTags error instead of timeout
+		err = lastErr
+	}
+	return err
+}
+
+// Add additional filters, to match on our tags
+// This lets us run multiple k8s clusters in a single EC2 AZ
+func (t *awsTagging) addFilters(filters []*ec2.Filter) []*ec2.Filter {
+	// For 1.6, we always recognize the legacy tag, for the 1.5 -> 1.6 upgrade
+	// There are no "or" filters by key, so we look for both the legacy and new key, and then we have to post-filter
+	f := newEc2Filter("tag-key", TagNameKubernetesClusterLegacy, t.clusterTagKey())
+	filters = append(filters, f)
+
+	// Because we always append the tag-key filter, we never pass a
+	// zero-length Filters to AWS (which would be an error)
+	return filters
+}
+
+func (t *awsTagging) buildTags(lifecycle ResourceLifecycle, additionalTags map[string]string) map[string]string {
+	tags := make(map[string]string)
+	for k, v := range additionalTags {
+		tags[k] = v
+	}
+	// We only create legacy tags if we are using legacy tags, i.e. if we have seen a legacy tag on our instance
+	if t.usesLegacyTags {
+		tags[TagNameKubernetesClusterLegacy] = t.ClusterID
+	}
+	tags[t.clusterTagKey()] = string(lifecycle)
+
+	return tags
+}
diff --git a/pkg/cloudprovider/providers/aws/tags_test.go b/pkg/cloudprovider/providers/aws/tags_test.go
new file mode 100644
index 00000000000..5721a54bf31
--- /dev/null
+++ b/pkg/cloudprovider/providers/aws/tags_test.go
@@ -0,0 +1,111 @@
+/*
+Copyright 2017 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package aws
+
+import (
+	"strings"
+	"testing"
+
+	"github.com/aws/aws-sdk-go/aws"
+	"github.com/aws/aws-sdk-go/service/ec2"
+)
+
+func TestFilterTags(t *testing.T) {
+	awsServices := NewFakeAWSServices()
+	c, err := newAWSCloud(strings.NewReader("[global]"), awsServices)
+	if err != nil {
+		t.Errorf("Error building aws cloud: %v", err)
+		return
+	}
+
+	if c.tagging.ClusterID != TestClusterId {
+		t.Errorf("unexpected ClusterID: %v", c.tagging.ClusterID)
+	}
+}
+
+func TestFindClusterID(t *testing.T) {
+	grid := []struct {
+		Tags           map[string]string
+		ExpectedNew    string
+		ExpectedLegacy string
+		ExpectError    bool
+	}{
+		{
+			Tags: map[string]string{},
+		},
+		{
+			Tags: map[string]string{
+				TagNameKubernetesClusterLegacy: "a",
+			},
+			ExpectedLegacy: "a",
+		},
+		{
+			Tags: map[string]string{
+				TagNameKubernetesClusterPrefix + "a": "owned",
+			},
+			ExpectedNew: "a",
+		},
+		{
+			Tags: map[string]string{
+				TagNameKubernetesClusterPrefix + "a": "",
+			},
+			ExpectedNew: "a",
+		},
+		{
+			Tags: map[string]string{
+				TagNameKubernetesClusterLegacy:       "a",
+				TagNameKubernetesClusterPrefix + "a": "",
+			},
+			ExpectedLegacy: "a",
+			ExpectedNew:    "a",
+		},
+		{
+			Tags: map[string]string{
+				TagNameKubernetesClusterPrefix + "a": "",
+				TagNameKubernetesClusterPrefix + "b": "",
+			},
+			ExpectError: true,
+		},
+	}
+	for _, g := range grid {
+		var ec2Tags []*ec2.Tag
+		for k, v := range g.Tags {
+			ec2Tags = append(ec2Tags, &ec2.Tag{Key: aws.String(k), Value: aws.String(v)})
+		}
+		actualLegacy, actualNew, err := findClusterIDs(ec2Tags)
+		if g.ExpectError {
+			if err == nil {
+				t.Errorf("expected error for tags %v", g.Tags)
+				continue
+			}
+		} else {
+			if err != nil {
+				t.Errorf("unexpected error for tags %v: %v", g.Tags, err)
+				continue
+			}
+
+			if g.ExpectedNew != actualNew {
+				t.Errorf("unexpected new clusterid for tags %v: %s vs %s", g.Tags, g.ExpectedNew, actualNew)
+				continue
+			}
+
+			if g.ExpectedLegacy != actualLegacy {
+				t.Errorf("unexpected legacy clusterid for tags %v: %s vs %s", g.Tags, g.ExpectedLegacy, actualLegacy)
+				continue
+			}
+		}
+	}
+}
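
Reviewer note (not part of the patch): the change replaces the single legacy tag (KubernetesCluster=<clusterID>) with a per-cluster key, kubernetes.io/cluster/<clusterID>, whose value records ownership (owned or shared), so two clusters can both mark a subnet as shared - something the single-valued legacy tag could not express. Below is a minimal, self-contained Go sketch of the resulting tag sets and the matching rule; buildClusterTags and ownedByCluster are illustrative stand-ins for the patch's awsTagging.buildTags and awsTagging.hasClusterTag, and "mycluster" is an assumed cluster name.

package main

import "fmt"

const (
	clusterTagPrefix = "kubernetes.io/cluster/" // mirrors TagNameKubernetesClusterPrefix
	legacyClusterTag = "KubernetesCluster"      // mirrors TagNameKubernetesClusterLegacy
)

// tag is a plain stand-in for *ec2.Tag so this sketch has no SDK dependency.
type tag struct{ Key, Value string }

// buildClusterTags mirrors awsTagging.buildTags: the new scheme encodes the
// clusterID in the tag key and the lifecycle ("owned"/"shared") in the value;
// the legacy tag is emitted only when the instance already carried one.
func buildClusterTags(clusterID, lifecycle string, usesLegacyTags bool) []tag {
	tags := []tag{{Key: clusterTagPrefix + clusterID, Value: lifecycle}}
	if usesLegacyTags {
		tags = append(tags, tag{Key: legacyClusterTag, Value: clusterID})
	}
	return tags
}

// ownedByCluster mirrors the happy path of awsTagging.hasClusterTag: a resource
// belongs to us if it carries our per-cluster key, or a legacy tag whose value
// is our clusterID. (The real method additionally short-circuits to false on a
// legacy tag with a different value.)
func ownedByCluster(clusterID string, tags []tag) bool {
	for _, t := range tags {
		if t.Key == clusterTagPrefix+clusterID {
			return true
		}
		if t.Key == legacyClusterTag && t.Value == clusterID {
			return true
		}
	}
	return false
}

func main() {
	tags := buildClusterTags("mycluster", "owned", true)
	fmt.Println(tags)                              // [{kubernetes.io/cluster/mycluster owned} {KubernetesCluster mycluster}]
	fmt.Println(ownedByCluster("mycluster", tags)) // true
	fmt.Println(ownedByCluster("other", tags))     // false
}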
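
The query side works the same way in reverse. EC2 offers no "or" across different filters, but the values within a single filter are OR'd, so addFilters asks for resources carrying either tag key with one tag-key filter, and callers such as describeInstances, findSubnets, and findRouteTable post-filter the response with hasClusterTag. A sketch of the request shape under the same assumed cluster name, using real aws-sdk-go types:

package main

import (
	"fmt"

	"github.com/aws/aws-sdk-go/aws"
	"github.com/aws/aws-sdk-go/service/ec2"
)

func main() {
	clusterTagKey := "kubernetes.io/cluster/mycluster" // TagNameKubernetesClusterPrefix + ClusterID

	// One filter, two values: matches resources that carry either the legacy
	// key or the new per-cluster key (tag-key ignores tag values entirely).
	request := &ec2.DescribeInstancesInput{
		Filters: []*ec2.Filter{
			{
				Name: aws.String("tag-key"),
				Values: []*string{
					aws.String("KubernetesCluster"), // TagNameKubernetesClusterLegacy
					aws.String(clusterTagKey),
				},
			},
		},
	}
	fmt.Println(request)

	// The tag-key match over-selects - another cluster's legacy tag would also
	// match - which is exactly why the patch post-filters every response
	// through tagging.hasClusterTag rather than trusting the server-side filter.
}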