AWS: Support shared tag

We recognize an additional cluster tag:

kubernetes.io/cluster/<clusterid>

This now allows us to share resources, in particular subnets.

In addition, the value is used to track ownership/lifecycle.  When we
create objects, we record the value as "owned".

We also refactor out tags into its own file & class, as we are touching
most of these functions anyway.
This commit is contained in:
Justin Santa Barbara 2017-02-18 13:11:08 -05:00
parent ff12e5688c
commit 0b5ae5391e
7 changed files with 488 additions and 206 deletions

View File

@ -21,6 +21,7 @@ go_library(
"regions.go",
"retry_handler.go",
"sets_ippermissions.go",
"tags.go",
"volumes.go",
],
tags = ["automanaged"],
@ -56,6 +57,7 @@ go_test(
"device_allocator_test.go",
"regions_test.go",
"retry_handler_test.go",
"tags_test.go",
],
library = ":go_default_library",
tags = ["automanaged"],

View File

@ -54,10 +54,6 @@ import (
// ProviderName is the name of this cloud provider.
const ProviderName = "aws"
// TagNameKubernetesCluster is the tag name we use to differentiate multiple
// logically independent clusters running in the same AZ
const TagNameKubernetesCluster = "KubernetesCluster"
// TagNameKubernetesService is the tag name we use to differentiate multiple
// services. Used currently for ELBs only.
const TagNameKubernetesService = "kubernetes.io/service-name"
@ -359,7 +355,7 @@ type Cloud struct {
region string
vpcID string
filterTags map[string]string
tagging awsTagging
// The AWS instance that we are running on
// Note that we cache some state in awsInstance (mountpoints), so we must preserve the instance
@ -388,7 +384,10 @@ type CloudConfig struct {
// Maybe if we're not running on AWS, e.g. bootstrap; for now it is not very useful
Zone string
// KubernetesClusterTag is the legacy cluster id we'll use to identify our cluster resources
KubernetesClusterTag string
// KubernetesClusterTag is the cluster id we'll use to identify our cluster resources
KubernetesClusterID string
//The aws provider creates an inbound rule per load balancer on the node security
//group. However, this can run into the AWS security group rule limit of 50 if
@ -534,12 +533,12 @@ func orEmpty(s *string) string {
return aws.StringValue(s)
}
func newEc2Filter(name string, value string) *ec2.Filter {
func newEc2Filter(name string, values ...string) *ec2.Filter {
filter := &ec2.Filter{
Name: aws.String(name),
Values: []*string{
aws.String(value),
},
}
for _, value := range values {
filter.Values = append(filter.Values, aws.String(value))
}
return filter
}
@ -817,33 +816,21 @@ func newAWSCloud(config io.Reader, awsServices Services) (*Cloud, error) {
awsCloud.selfAWSInstance = selfAWSInstance
awsCloud.vpcID = selfAWSInstance.vpcID
filterTags := map[string]string{}
if cfg.Global.KubernetesClusterTag != "" {
filterTags[TagNameKubernetesCluster] = cfg.Global.KubernetesClusterTag
if cfg.Global.KubernetesClusterTag != "" || cfg.Global.KubernetesClusterID != "" {
if err := awsCloud.tagging.init(cfg.Global.KubernetesClusterTag, cfg.Global.KubernetesClusterID); err != nil {
return nil, err
}
} else {
// TODO: Clean up double-API query
info, err := selfAWSInstance.describeInstance()
if err != nil {
return nil, err
}
for _, tag := range info.Tags {
if orEmpty(tag.Key) == TagNameKubernetesCluster {
filterTags[TagNameKubernetesCluster] = orEmpty(tag.Value)
}
if err := awsCloud.tagging.initFromTags(info.Tags); err != nil {
return nil, err
}
}
if filterTags[TagNameKubernetesCluster] == "" {
glog.Errorf("Tag %q not found; Kubernetes may behave unexpectedly.", TagNameKubernetesCluster)
}
awsCloud.filterTags = filterTags
if len(filterTags) > 0 {
glog.Infof("AWS cloud filtering on tags: %v", filterTags)
} else {
glog.Infof("AWS cloud - no tag filtering")
}
// Register regions, in particular for ECR credentials
once.Do(func() {
RecognizeWellKnownRegions()
@ -990,15 +977,12 @@ func (c *Cloud) InstanceType(nodeName types.NodeName) (string, error) {
// Return a list of instances matching regex string.
func (c *Cloud) getInstancesByRegex(regex string) ([]types.NodeName, error) {
filters := []*ec2.Filter{newEc2Filter("instance-state-name", "running")}
filters = c.addFilters(filters)
request := &ec2.DescribeInstancesInput{
Filters: filters,
}
instances, err := c.ec2.DescribeInstances(request)
instances, err := c.describeInstances(filters)
if err != nil {
return []types.NodeName{}, err
}
if len(instances) == 0 {
return []types.NodeName{}, fmt.Errorf("no instances returned")
}
@ -1050,15 +1034,12 @@ func (c *Cloud) getAllZones() (sets.String, error) {
// TODO: We could also query for subnets, I think
filters := []*ec2.Filter{newEc2Filter("instance-state-name", "running")}
filters = c.addFilters(filters)
request := &ec2.DescribeInstancesInput{
Filters: filters,
}
instances, err := c.ec2.DescribeInstances(request)
instances, err := c.describeInstances(filters)
if err != nil {
return nil, err
}
if len(instances) == 0 {
return nil, fmt.Errorf("no instances returned")
}
@ -1647,26 +1628,16 @@ func (c *Cloud) CreateDisk(volumeOptions *VolumeOptions) (KubernetesVolumeID, er
volumeName := KubernetesVolumeID("aws://" + aws.StringValue(response.AvailabilityZone) + "/" + string(awsID))
// apply tags
tags := make(map[string]string)
for k, v := range volumeOptions.Tags {
tags[k] = v
}
if c.getClusterName() != "" {
tags[TagNameKubernetesCluster] = c.getClusterName()
}
if len(tags) != 0 {
if err := c.createTags(string(awsID), tags); err != nil {
// delete the volume and hope it succeeds
_, delerr := c.DeleteDisk(volumeName)
if delerr != nil {
// delete did not succeed, we have a stray volume!
return "", fmt.Errorf("error tagging volume %s, could not delete the volume: %v", volumeName, delerr)
}
return "", fmt.Errorf("error tagging volume %s: %v", volumeName, err)
if err := c.tagging.createTags(c.ec2, string(awsID), ResourceLifecycleOwned, volumeOptions.Tags); err != nil {
// delete the volume and hope it succeeds
_, delerr := c.DeleteDisk(volumeName)
if delerr != nil {
// delete did not succeed, we have a stray volume!
return "", fmt.Errorf("error tagging volume %s, could not delete the volume: %v", volumeName, delerr)
}
return "", fmt.Errorf("error tagging volume %s: %v", volumeName, err)
}
return volumeName, nil
}
@ -2115,36 +2086,6 @@ func (c *Cloud) removeSecurityGroupIngress(securityGroupID string, removePermiss
return true, nil
}
// Ensure that a resource has the correct tags
// If it has no tags, we assume that this was a problem caused by an error in between creation and tagging,
// and we add the tags. If it has a different cluster's tags, that is an error.
func (c *Cloud) ensureClusterTags(resourceID string, tags []*ec2.Tag) error {
actualTags := make(map[string]string)
for _, tag := range tags {
actualTags[aws.StringValue(tag.Key)] = aws.StringValue(tag.Value)
}
addTags := make(map[string]string)
for k, expected := range c.filterTags {
actual := actualTags[k]
if actual == expected {
continue
}
if actual == "" {
glog.Warningf("Resource %q was missing expected cluster tag %q. Will add (with value %q)", resourceID, k, expected)
addTags[k] = expected
} else {
return fmt.Errorf("resource %q has tag belonging to another cluster: %q=%q (expected %q)", resourceID, k, actual, expected)
}
}
if err := c.createTags(resourceID, addTags); err != nil {
return fmt.Errorf("error adding missing tags to resource %q: %v", resourceID, err)
}
return nil
}
// Makes sure the security group exists.
// For multi-cluster isolation, name must be globally unique, for example derived from the service UUID.
// Returns the security group id or error
@ -2175,7 +2116,9 @@ func (c *Cloud) ensureSecurityGroup(name string, description string) (string, er
if len(securityGroups) > 1 {
glog.Warningf("Found multiple security groups with name: %q", name)
}
err := c.ensureClusterTags(aws.StringValue(securityGroups[0].GroupId), securityGroups[0].Tags)
err := c.tagging.readRepairClusterTags(
c.ec2, aws.StringValue(securityGroups[0].GroupId),
ResourceLifecycleOwned, nil, securityGroups[0].Tags)
if err != nil {
return "", err
}
@ -2212,7 +2155,7 @@ func (c *Cloud) ensureSecurityGroup(name string, description string) (string, er
return "", fmt.Errorf("created security group, but id was not returned: %s", name)
}
err := c.createTags(groupID, c.filterTags)
err := c.tagging.createTags(c.ec2, groupID, ResourceLifecycleOwned, nil)
if err != nil {
// If we retry, ensureClusterTags will recover from this - it
// will add the missing tags. We could delete the security
@ -2223,52 +2166,6 @@ func (c *Cloud) ensureSecurityGroup(name string, description string) (string, er
return groupID, nil
}
// createTags calls EC2 CreateTags, but adds retry-on-failure logic
// We retry mainly because if we create an object, we cannot tag it until it is "fully created" (eventual consistency)
// The error code varies though (depending on what we are tagging), so we simply retry on all errors
func (c *Cloud) createTags(resourceID string, tags map[string]string) error {
if tags == nil || len(tags) == 0 {
return nil
}
var awsTags []*ec2.Tag
for k, v := range tags {
tag := &ec2.Tag{
Key: aws.String(k),
Value: aws.String(v),
}
awsTags = append(awsTags, tag)
}
backoff := wait.Backoff{
Duration: createTagInitialDelay,
Factor: createTagFactor,
Steps: createTagSteps,
}
request := &ec2.CreateTagsInput{}
request.Resources = []*string{&resourceID}
request.Tags = awsTags
var lastErr error
err := wait.ExponentialBackoff(backoff, func() (bool, error) {
_, err := c.ec2.CreateTags(request)
if err == nil {
return true, nil
}
// We could check that the error is retryable, but the error code changes based on what we are tagging
// SecurityGroup: InvalidGroup.NotFound
glog.V(2).Infof("Failed to create tags; will retry. Error was %v", err)
lastErr = err
return false, nil
})
if err == wait.ErrWaitTimeout {
// return real CreateTags error instead of timeout
err = lastErr
}
return err
}
// Finds the value for a given tag.
func findTag(tags []*ec2.Tag, key string) (string, bool) {
for _, tag := range tags {
@ -2284,18 +2181,23 @@ func findTag(tags []*ec2.Tag, key string) (string, bool) {
// However, in future this will likely be treated as an error.
func (c *Cloud) findSubnets() ([]*ec2.Subnet, error) {
request := &ec2.DescribeSubnetsInput{}
vpcIDFilter := newEc2Filter("vpc-id", c.vpcID)
filters := []*ec2.Filter{vpcIDFilter}
filters = c.addFilters(filters)
request.Filters = filters
filters := []*ec2.Filter{newEc2Filter("vpc-id", c.vpcID)}
request.Filters = c.tagging.addFilters(filters)
subnets, err := c.ec2.DescribeSubnets(request)
if err != nil {
return nil, fmt.Errorf("error describing subnets: %v", err)
}
if len(subnets) != 0 {
return subnets, nil
var matches []*ec2.Subnet
for _, subnet := range subnets {
if c.tagging.hasClusterTag(subnet.Tags) {
matches = append(matches, subnet)
}
}
if len(matches) != 0 {
return matches, nil
}
// Fall back to the current instance subnets, if nothing is tagged
@ -2850,7 +2752,7 @@ func findSecurityGroupForInstance(instance *ec2.Instance, taggedSecurityGroups m
// Return all the security groups that are tagged as being part of our cluster
func (c *Cloud) getTaggedSecurityGroups() (map[string]*ec2.SecurityGroup, error) {
request := &ec2.DescribeSecurityGroupsInput{}
request.Filters = c.addFilters(nil)
request.Filters = c.tagging.addFilters(nil)
groups, err := c.ec2.DescribeSecurityGroups(request)
if err != nil {
return nil, fmt.Errorf("error querying security groups: %v", err)
@ -2858,6 +2760,10 @@ func (c *Cloud) getTaggedSecurityGroups() (map[string]*ec2.SecurityGroup, error)
m := make(map[string]*ec2.SecurityGroup)
for _, group := range groups {
if !c.tagging.hasClusterTag(group.Tags) {
continue
}
id := aws.StringValue(group.GroupId)
if id == "" {
glog.Warningf("Ignoring group without id: %v", group)
@ -2892,13 +2798,23 @@ func (c *Cloud) updateInstanceSecurityGroupsForLoadBalancer(lb *elb.LoadBalancer
}
// Get the actual list of groups that allow ingress from the load-balancer
describeRequest := &ec2.DescribeSecurityGroupsInput{}
filters := []*ec2.Filter{}
filters = append(filters, newEc2Filter("ip-permission.group-id", loadBalancerSecurityGroupID))
describeRequest.Filters = c.addFilters(filters)
actualGroups, err := c.ec2.DescribeSecurityGroups(describeRequest)
if err != nil {
return fmt.Errorf("error querying security groups for ELB: %v", err)
var actualGroups []*ec2.SecurityGroup
{
describeRequest := &ec2.DescribeSecurityGroupsInput{}
filters := []*ec2.Filter{
newEc2Filter("ip-permission.group-id", loadBalancerSecurityGroupID),
}
describeRequest.Filters = c.tagging.addFilters(filters)
response, err := c.ec2.DescribeSecurityGroups(describeRequest)
if err != nil {
return fmt.Errorf("error querying security groups for ELB: %v", err)
}
for _, sg := range response {
if !c.tagging.hasClusterTag(sg.Tags) {
continue
}
actualGroups = append(actualGroups, sg)
}
}
taggedSecurityGroups, err := c.getTaggedSecurityGroups()
@ -3187,12 +3103,7 @@ func (c *Cloud) getInstancesByNodeNamesCached(nodeNames sets.String) ([]*ec2.Ins
newEc2Filter("instance-state-name", "running"),
}
filters = c.addFilters(filters)
request := &ec2.DescribeInstancesInput{
Filters: filters,
}
instances, err := c.ec2.DescribeInstances(request)
instances, err := c.describeInstances(filters)
if err != nil {
glog.V(2).Infof("Failed to describe instances %v", nodeNames)
return nil, err
@ -3209,6 +3120,26 @@ func (c *Cloud) getInstancesByNodeNamesCached(nodeNames sets.String) ([]*ec2.Ins
return instances, nil
}
func (c *Cloud) describeInstances(filters []*ec2.Filter) ([]*ec2.Instance, error) {
filters = c.tagging.addFilters(filters)
request := &ec2.DescribeInstancesInput{
Filters: filters,
}
response, err := c.ec2.DescribeInstances(request)
if err != nil {
return nil, err
}
var matches []*ec2.Instance
for _, instance := range response {
if c.tagging.hasClusterTag(instance.Tags) {
matches = append(matches, instance)
}
}
return matches, nil
}
// mapNodeNameToPrivateDNSName maps a k8s NodeName to an AWS Instance PrivateDNSName
// This is a simple string cast
func mapNodeNameToPrivateDNSName(nodeName types.NodeName) string {
@ -3228,15 +3159,12 @@ func (c *Cloud) findInstanceByNodeName(nodeName types.NodeName) (*ec2.Instance,
newEc2Filter("private-dns-name", privateDNSName),
newEc2Filter("instance-state-name", "running"),
}
filters = c.addFilters(filters)
request := &ec2.DescribeInstancesInput{
Filters: filters,
}
instances, err := c.ec2.DescribeInstances(request)
instances, err := c.describeInstances(filters)
if err != nil {
return nil, err
}
if len(instances) == 0 {
return nil, nil
}
@ -3268,23 +3196,3 @@ func (c *Cloud) getFullInstance(nodeName types.NodeName) (*awsInstance, *ec2.Ins
awsInstance := newAWSInstance(c.ec2, instance)
return awsInstance, instance, err
}
// Add additional filters, to match on our tags
// This lets us run multiple k8s clusters in a single EC2 AZ
func (c *Cloud) addFilters(filters []*ec2.Filter) []*ec2.Filter {
for k, v := range c.filterTags {
filters = append(filters, newEc2Filter("tag:"+k, v))
}
if len(filters) == 0 {
// We can't pass a zero-length Filters to AWS (it's an error)
// So if we end up with no filters; just return nil
return nil
}
return filters
}
// Returns the cluster name or an empty string
func (c *Cloud) getClusterName() string {
return c.filterTags[TagNameKubernetesCluster]
}

View File

@ -55,9 +55,14 @@ func (c *Cloud) ensureLoadBalancer(namespacedName types.NamespacedName, loadBala
createRequest.SecurityGroups = stringPointerArray(securityGroupIDs)
createRequest.Tags = []*elb.Tag{
{Key: aws.String(TagNameKubernetesCluster), Value: aws.String(c.getClusterName())},
{Key: aws.String(TagNameKubernetesService), Value: aws.String(namespacedName.String())},
tags := c.tagging.buildTags(ResourceLifecycleOwned, map[string]string{
TagNameKubernetesService: namespacedName.String(),
})
for k, v := range tags {
createRequest.Tags = append(createRequest.Tags, &elb.Tag{
Key: aws.String(k), Value: aws.String(v),
})
}
glog.Infof("Creating load balancer for %v with name: %s", namespacedName, loadBalancerName)

View File

@ -29,14 +29,20 @@ func (c *Cloud) findRouteTable(clusterName string) (*ec2.RouteTable, error) {
// This should be unnecessary (we already filter on TagNameKubernetesCluster,
// and something is broken if cluster name doesn't match, but anyway...
// TODO: All clouds should be cluster-aware by default
filters := []*ec2.Filter{newEc2Filter("tag:"+TagNameKubernetesCluster, clusterName)}
request := &ec2.DescribeRouteTablesInput{Filters: c.addFilters(filters)}
request := &ec2.DescribeRouteTablesInput{Filters: c.tagging.addFilters(nil)}
tables, err := c.ec2.DescribeRouteTables(request)
response, err := c.ec2.DescribeRouteTables(request)
if err != nil {
return nil, err
}
var tables []*ec2.RouteTable
for _, table := range response {
if c.tagging.hasClusterTag(table.Tags) {
tables = append(tables, table)
}
}
if len(tables) == 0 {
return nil, fmt.Errorf("unable to find route table for AWS cluster: %s", clusterName)
}

View File

@ -146,7 +146,7 @@ func NewFakeAWSServices() *FakeAWSServices {
s.instances = []*ec2.Instance{selfInstance}
var tag ec2.Tag
tag.Key = aws.String(TagNameKubernetesCluster)
tag.Key = aws.String(TagNameKubernetesClusterLegacy)
tag.Value = aws.String(TestClusterId)
selfInstance.Tags = []*ec2.Tag{&tag}
@ -177,24 +177,6 @@ func (s *FakeAWSServices) Metadata() (EC2Metadata, error) {
return s.metadata, nil
}
func TestFilterTags(t *testing.T) {
awsServices := NewFakeAWSServices()
c, err := newAWSCloud(strings.NewReader("[global]"), awsServices)
if err != nil {
t.Errorf("Error building aws cloud: %v", err)
return
}
if len(c.filterTags) != 1 {
t.Errorf("unexpected filter tags: %v", c.filterTags)
return
}
if c.filterTags[TagNameKubernetesCluster] != TestClusterId {
t.Errorf("unexpected filter tags: %v", c.filterTags)
}
}
func TestNewAWSCloud(t *testing.T) {
tests := []struct {
name string
@ -279,6 +261,15 @@ func instanceMatchesFilter(instance *ec2.Instance, filter *ec2.Filter) bool {
return contains(filter.Values, *instance.State.Name)
}
if name == "tag-key" {
for _, instanceTag := range instance.Tags {
if contains(filter.Values, aws.StringValue(instanceTag.Key)) {
return true
}
}
return false
}
if strings.HasPrefix(name, "tag:") {
tagName := name[4:]
for _, instanceTag := range instance.Tags {
@ -286,7 +277,9 @@ func instanceMatchesFilter(instance *ec2.Instance, filter *ec2.Filter) bool {
return true
}
}
return false
}
panic("Unknown filter name: " + name)
}
@ -969,13 +962,14 @@ func TestIpPermissionExistsHandlesMultipleGroupIdsWithUserIds(t *testing.T) {
t.Errorf("Should have not been considered equal since first is not in the second array of groups")
}
}
func TestFindInstanceByNodeNameExcludesTerminatedInstances(t *testing.T) {
awsServices := NewFakeAWSServices()
nodeName := types.NodeName("my-dns.internal")
var tag ec2.Tag
tag.Key = aws.String(TagNameKubernetesCluster)
tag.Key = aws.String(TagNameKubernetesClusterLegacy)
tag.Value = aws.String(TestClusterId)
tags := []*ec2.Tag{&tag}
@ -1019,8 +1013,8 @@ func TestFindInstancesByNodeNameCached(t *testing.T) {
nodeNameTwo := "my-dns-two.internal"
var tag ec2.Tag
tag.Key = aws.String(TagNameKubernetesCluster)
tag.Value = aws.String(TestClusterId)
tag.Key = aws.String(TagNameKubernetesClusterPrefix + TestClusterId)
tag.Value = aws.String("")
tags := []*ec2.Tag{&tag}
var runningInstance ec2.Instance

View File

@ -0,0 +1,256 @@
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package aws
import (
"fmt"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/service/ec2"
"github.com/golang/glog"
"k8s.io/apimachinery/pkg/util/wait"
"strings"
)
// TagNameKubernetesClusterPrefix is the tag name we use to differentiate multiple
// logically independent clusters running in the same AZ.
// The tag key = TagNameKubernetesClusterPrefix + clusterID
// The tag value is an ownership value
const TagNameKubernetesClusterPrefix = "kubernetes.io/cluster/"
// TagNameKubernetesClusterLegacy is the legacy tag name we use to differentiate multiple
// logically independent clusters running in the same AZ. The problem with it was that it
// did not allow shared resources.
const TagNameKubernetesClusterLegacy = "KubernetesCluster"
type ResourceLifecycle string
const (
// ResourceLifecycleOwned is the value we use when tagging resources to indicate
// that the resource is considered owned and managed by the cluster,
// and in particular that the lifecycle is tied to the lifecycle of the cluster.
ResourceLifecycleOwned = "owned"
// ResourceLifecycleShared is the value we use when tagging resources to indicate
// that the resource is shared between multiple clusters, and should not be destroyed
// if the cluster is destroyed.
ResourceLifecycleShared = "shared"
)
type awsTagging struct {
// ClusterID is our cluster identifier: we tag AWS resources with this value,
// and thus we can run two independent clusters in the same VPC or subnets.
// This gives us similar functionality to GCE projects.
ClusterID string
// usesLegacyTags is true if we are using the legacy TagNameKubernetesClusterLegacy tags
usesLegacyTags bool
}
func (t *awsTagging) init(legacyClusterID string, clusterID string) error {
if legacyClusterID != "" {
if clusterID != "" && legacyClusterID != clusterID {
return fmt.Errorf("ClusterID tags did not match: %q vs %q", clusterID, legacyClusterID)
}
t.usesLegacyTags = true
clusterID = legacyClusterID
}
t.ClusterID = clusterID
if clusterID != "" {
glog.Infof("AWS cloud filtering on ClusterID: %v", clusterID)
} else {
glog.Infof("AWS cloud - no clusterID filtering")
}
return nil
}
// Extracts a clusterID from the given tags, if one is present
// If no clusterID is found, returns "", nil
// If multiple (different) clusterIDs are found, returns an error
func (t *awsTagging) initFromTags(tags []*ec2.Tag) error {
legacyClusterID, newClusterID, err := findClusterIDs(tags)
if err != nil {
return err
}
if legacyClusterID == "" && newClusterID == "" {
glog.Errorf("Tag %q nor %q not found; Kubernetes may behave unexpectedly.", TagNameKubernetesClusterLegacy, TagNameKubernetesClusterPrefix+"...")
}
return t.init(legacyClusterID, newClusterID)
}
// Extracts the legacy & new cluster ids from the given tags, if they are present
// If duplicate tags are found, returns an error
func findClusterIDs(tags []*ec2.Tag) (string, string, error) {
legacyClusterID := ""
newClusterID := ""
for _, tag := range tags {
tagKey := aws.StringValue(tag.Key)
if strings.HasPrefix(tagKey, TagNameKubernetesClusterPrefix) {
id := strings.TrimPrefix(tagKey, TagNameKubernetesClusterPrefix)
if newClusterID != "" {
return "", "", fmt.Errorf("Found multiple cluster tags with prefix %s (%q and %q)", TagNameKubernetesClusterPrefix, newClusterID, id)
}
newClusterID = id
}
if tagKey == TagNameKubernetesClusterLegacy {
id := aws.StringValue(tag.Value)
if legacyClusterID != "" {
return "", "", fmt.Errorf("Found multiple %s tags (%q and %q)", TagNameKubernetesClusterLegacy, legacyClusterID, id)
}
legacyClusterID = id
}
}
return legacyClusterID, newClusterID, nil
}
func (t *awsTagging) clusterTagKey() string {
return TagNameKubernetesClusterPrefix + t.ClusterID
}
func (t *awsTagging) hasClusterTag(tags []*ec2.Tag) bool {
clusterTagKey := t.clusterTagKey()
for _, tag := range tags {
tagKey := aws.StringValue(tag.Key)
// For 1.6, we continue to recognize the legacy tags, for the 1.5 -> 1.6 upgrade
if tagKey == TagNameKubernetesClusterLegacy {
return aws.StringValue(tag.Value) == t.ClusterID
}
if tagKey == clusterTagKey {
return true
}
}
return false
}
// Ensure that a resource has the correct tags
// If it has no tags, we assume that this was a problem caused by an error in between creation and tagging,
// and we add the tags. If it has a different cluster's tags, that is an error.
func (c *awsTagging) readRepairClusterTags(client EC2, resourceID string, lifecycle ResourceLifecycle, additionalTags map[string]string, observedTags []*ec2.Tag) error {
actualTagMap := make(map[string]string)
for _, tag := range observedTags {
actualTagMap[aws.StringValue(tag.Key)] = aws.StringValue(tag.Value)
}
expectedTags := c.buildTags(lifecycle, additionalTags)
addTags := make(map[string]string)
for k, expected := range expectedTags {
actual := actualTagMap[k]
if actual == expected {
continue
}
if actual == "" {
glog.Warningf("Resource %q was missing expected cluster tag %q. Will add (with value %q)", resourceID, k, expected)
addTags[k] = expected
} else {
return fmt.Errorf("resource %q has tag belonging to another cluster: %q=%q (expected %q)", resourceID, k, actual, expected)
}
}
if err := c.createTags(client, resourceID, lifecycle, additionalTags); err != nil {
return fmt.Errorf("error adding missing tags to resource %q: %v", resourceID, err)
}
return nil
}
// createTags calls EC2 CreateTags, but adds retry-on-failure logic
// We retry mainly because if we create an object, we cannot tag it until it is "fully created" (eventual consistency)
// The error code varies though (depending on what we are tagging), so we simply retry on all errors
func (t *awsTagging) createTags(client EC2, resourceID string, lifecycle ResourceLifecycle, additionalTags map[string]string) error {
tags := t.buildTags(lifecycle, additionalTags)
if tags == nil || len(tags) == 0 {
return nil
}
var awsTags []*ec2.Tag
for k, v := range tags {
tag := &ec2.Tag{
Key: aws.String(k),
Value: aws.String(v),
}
awsTags = append(awsTags, tag)
}
backoff := wait.Backoff{
Duration: createTagInitialDelay,
Factor: createTagFactor,
Steps: createTagSteps,
}
request := &ec2.CreateTagsInput{}
request.Resources = []*string{&resourceID}
request.Tags = awsTags
var lastErr error
err := wait.ExponentialBackoff(backoff, func() (bool, error) {
_, err := client.CreateTags(request)
if err == nil {
return true, nil
}
// We could check that the error is retryable, but the error code changes based on what we are tagging
// SecurityGroup: InvalidGroup.NotFound
glog.V(2).Infof("Failed to create tags; will retry. Error was %v", err)
lastErr = err
return false, nil
})
if err == wait.ErrWaitTimeout {
// return real CreateTags error instead of timeout
err = lastErr
}
return err
}
// Add additional filters, to match on our tags
// This lets us run multiple k8s clusters in a single EC2 AZ
func (t *awsTagging) addFilters(filters []*ec2.Filter) []*ec2.Filter {
// For 1.6, we always recognize the legacy tag, for the 1.5 -> 1.6 upgrade
// There are no "or" filters by key, so we look for both the legacy and new key, and then we have to post-filter
f := newEc2Filter("tag-key", TagNameKubernetesClusterLegacy, t.clusterTagKey())
filters = append(filters, f)
if len(filters) == 0 {
// We can't pass a zero-length Filters to AWS (it's an error)
// So if we end up with no filters; just return nil
return nil
}
return filters
}
func (t *awsTagging) buildTags(lifecycle ResourceLifecycle, additionalTags map[string]string) map[string]string {
tags := make(map[string]string)
for k, v := range additionalTags {
tags[k] = v
}
// We only create legacy tags if we are using legacy tags, i.e. if we have seen a legacy tag on our instance
if t.usesLegacyTags {
tags[TagNameKubernetesClusterLegacy] = t.ClusterID
}
tags[t.clusterTagKey()] = string(lifecycle)
return tags
}

View File

@ -0,0 +1,111 @@
/*
Copyright 2014 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package aws
import (
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/service/ec2"
"strings"
"testing"
)
func TestFilterTags(t *testing.T) {
awsServices := NewFakeAWSServices()
c, err := newAWSCloud(strings.NewReader("[global]"), awsServices)
if err != nil {
t.Errorf("Error building aws cloud: %v", err)
return
}
if c.tagging.ClusterID != TestClusterId {
t.Errorf("unexpected ClusterID: %v", c.tagging.ClusterID)
}
}
func TestFindClusterID(t *testing.T) {
grid := []struct {
Tags map[string]string
ExpectedNew string
ExpectedLegacy string
ExpectError bool
}{
{
Tags: map[string]string{},
},
{
Tags: map[string]string{
TagNameKubernetesClusterLegacy: "a",
},
ExpectedLegacy: "a",
},
{
Tags: map[string]string{
TagNameKubernetesClusterPrefix + "a": "owned",
},
ExpectedNew: "a",
},
{
Tags: map[string]string{
TagNameKubernetesClusterPrefix + "a": "",
},
ExpectedNew: "a",
},
{
Tags: map[string]string{
TagNameKubernetesClusterLegacy: "a",
TagNameKubernetesClusterPrefix + "a": "",
},
ExpectedLegacy: "a",
ExpectedNew: "a",
},
{
Tags: map[string]string{
TagNameKubernetesClusterPrefix + "a": "",
TagNameKubernetesClusterPrefix + "b": "",
},
ExpectError: true,
},
}
for _, g := range grid {
var ec2Tags []*ec2.Tag
for k, v := range g.Tags {
ec2Tags = append(ec2Tags, &ec2.Tag{Key: aws.String(k), Value: aws.String(v)})
}
actualLegacy, actualNew, err := findClusterIDs(ec2Tags)
if g.ExpectError {
if err == nil {
t.Errorf("expected error for tags %v", g.Tags)
continue
}
} else {
if err != nil {
t.Errorf("unexpected error for tags %v: %v", g.Tags, err)
continue
}
if g.ExpectedNew != actualNew {
t.Errorf("unexpected new clusterid for tags %v: %s vs %s", g.Tags, g.ExpectedNew, actualNew)
continue
}
if g.ExpectedLegacy != actualLegacy {
t.Errorf("unexpected new clusterid for tags %v: %s vs %s", g.Tags, g.ExpectedLegacy, actualLegacy)
continue
}
}
}
}