From 1e99426d5b3068c6622569756779e18c66946a83 Mon Sep 17 00:00:00 2001 From: Justin Santa Barbara Date: Fri, 22 May 2015 20:12:53 -0400 Subject: [PATCH] Support for AWS ELB --- hack/ginkgo-e2e.sh | 1 + pkg/cloudprovider/aws/aws.go | 720 ++++++++++++++++++++++++++++-- pkg/cloudprovider/aws/aws_test.go | 192 ++++++-- test/e2e/e2e_test.go | 7 + test/e2e/kubectl.go | 4 +- test/e2e/util.go | 1 + 6 files changed, 834 insertions(+), 91 deletions(-) diff --git a/hack/ginkgo-e2e.sh b/hack/ginkgo-e2e.sh index acfb5e219cd..3437873425b 100755 --- a/hack/ginkgo-e2e.sh +++ b/hack/ginkgo-e2e.sh @@ -101,6 +101,7 @@ export PATH=$(dirname "${e2e_test}"):"${PATH}" --gce-project="${PROJECT:-}" \ --gce-zone="${ZONE:-}" \ --kube-master="${KUBE_MASTER:-}" \ + --cluster-tag="${CLUSTER_ID:-}" \ --repo-root="${KUBE_VERSION_ROOT}" \ --node-instance-group="${NODE_INSTANCE_GROUP:-}" \ --num-nodes="${NUM_MINIONS:-}" \ diff --git a/pkg/cloudprovider/aws/aws.go b/pkg/cloudprovider/aws/aws.go index 3e905e81608..47f3da920aa 100644 --- a/pkg/cloudprovider/aws/aws.go +++ b/pkg/cloudprovider/aws/aws.go @@ -33,6 +33,7 @@ import ( "github.com/awslabs/aws-sdk-go/aws" "github.com/awslabs/aws-sdk-go/aws/credentials" "github.com/awslabs/aws-sdk-go/service/ec2" + "github.com/awslabs/aws-sdk-go/service/elb" "github.com/GoogleCloudPlatform/kubernetes/pkg/api" "github.com/GoogleCloudPlatform/kubernetes/pkg/api/resource" @@ -43,6 +44,17 @@ import ( const ProviderName = "aws" +// The tag name we use to differentiate multiple logically independent clusters running in the same AZ +const TagNameKubernetesCluster = "KubernetesCluster" + +// Abstraction over AWS, to allow mocking/other implementations +type AWSServices interface { + Compute(region string) (EC2, error) + LoadBalancing(region string) (ELB, error) + Metadata() AWSMetadata +} + +// TODO: Should we rename this to AWS (EBS & ELB are not technically part of EC2) // Abstraction over EC2, to allow mocking/other implementations type EC2 interface { // Query EC2 
for instances matching the filter @@ -58,6 +70,25 @@ type EC2 interface { CreateVolume(request *ec2.CreateVolumeInput) (resp *ec2.Volume, err error) // Delete an EBS volume DeleteVolume(volumeID string) (resp *ec2.DeleteVolumeOutput, err error) + + DescribeSecurityGroups(groupIds []string, filterName string, filterVPCId string) ([]*ec2.SecurityGroup, error) + + // TODO(justinsb): Make all of these into pass-through methods, now that we have a much better binding + CreateSecurityGroup(*ec2.CreateSecurityGroupInput) (*ec2.CreateSecurityGroupOutput, error) + AuthorizeSecurityGroupIngress(*ec2.AuthorizeSecurityGroupIngressInput) (*ec2.AuthorizeSecurityGroupIngressOutput, error) + + DescribeVPCs(*ec2.DescribeVPCsInput) (*ec2.DescribeVPCsOutput, error) + + DescribeSubnets(*ec2.DescribeSubnetsInput) (*ec2.DescribeSubnetsOutput, error) +} + +// This is a simple pass-through of the ELB client interface, which allows for testing +type ELB interface { + CreateLoadBalancer(*elb.CreateLoadBalancerInput) (*elb.CreateLoadBalancerOutput, error) + DeleteLoadBalancer(*elb.DeleteLoadBalancerInput) (*elb.DeleteLoadBalancerOutput, error) + DescribeLoadBalancers(*elb.DescribeLoadBalancersInput) (*elb.DescribeLoadBalancersOutput, error) + RegisterInstancesWithLoadBalancer(*elb.RegisterInstancesWithLoadBalancerInput) (*elb.RegisterInstancesWithLoadBalancerOutput, error) + DeregisterInstancesFromLoadBalancer(*elb.DeregisterInstancesFromLoadBalancerInput) (*elb.DeregisterInstancesFromLoadBalancerOutput, error) } // Abstraction over the AWS metadata service @@ -87,22 +118,29 @@ type Volumes interface { // AWSCloud is an implementation of Interface, TCPLoadBalancer and Instances for Amazon Web Services. 
type AWSCloud struct { + awsServices AWSServices ec2 EC2 - metadata AWSMetadata cfg *AWSCloudConfig availabilityZone string region string + filterTags map[string]string + // The AWS instance that we are running on selfAWSInstance *awsInstance mutex sync.Mutex + // Protects elbClients + elbClients map[string]ELB } type AWSCloudConfig struct { Global struct { // TODO: Is there any use for this? We can get it from the instance metadata service + // Maybe if we're not running on AWS, e.g. bootstrap; for now it is not very useful Zone string + + KubernetesClusterTag string } } @@ -125,11 +163,53 @@ type awsSdkEC2 struct { ec2 *ec2.EC2 } +type awsSDKProvider struct { + creds *credentials.Credentials +} + +func (p *awsSDKProvider) Compute(regionName string) (EC2, error) { + ec2 := &awsSdkEC2{ + ec2: ec2.New(&aws.Config{ + Region: regionName, + Credentials: p.creds, + }), + } + return ec2, nil +} + +func (p *awsSDKProvider) LoadBalancing(regionName string) (ELB, error) { + elbClient := elb.New(&aws.Config{ + Region: regionName, + Credentials: p.creds, + }) + return elbClient, nil +} + +func (p *awsSDKProvider) Metadata() AWSMetadata { + return &awsSdkMetadata{} +} + +// Builds an ELB client for the specified region +func (s *AWSCloud) getELBClient(regionName string) (ELB, error) { + s.mutex.Lock() + defer s.mutex.Unlock() + + elbClient, found := s.elbClients[regionName] + if !found { + var err error + elbClient, err = s.awsServices.LoadBalancing(regionName) + if err != nil { + return nil, err + } + s.elbClients[regionName] = elbClient + } + return elbClient, nil +} + func stringPointerArray(orig []string) []*string { if orig == nil { return nil } - n := make([]*string, len(orig)) for i := range orig { n[i] = &orig[i] @@ -141,17 +221,29 @@ func isNilOrEmpty(s *string) bool { return s == nil || *s == "" } +func orEmpty(s *string) string { + if s == nil { + return "" + } + return *s +} + +func newEc2Filter(name string, value string) *ec2.Filter { + filter := &ec2.Filter{ + 
Name: aws.String(name), + Values: []*string{ + aws.String(value), + }, + } + return filter +} + // Implementation of EC2.Instances func (self *awsSdkEC2) Instances(instanceIds []string, filter *ec2InstanceFilter) (resp []*ec2.Instance, err error) { var filters []*ec2.Filter if filter != nil && filter.PrivateDNSName != "" { filters = []*ec2.Filter{ - { - Name: aws.String("private-dns-name"), - Values: []*string{ - aws.String(filter.PrivateDNSName), - }, - }, + newEc2Filter("private-dns-name", filter.PrivateDNSName), } } @@ -213,7 +305,34 @@ func (self *awsSdkMetadata) GetMetaData(key string) ([]byte, error) { return []byte(body), nil } -type AuthFunc func() (creds *credentials.Credentials) +// Implements EC2.DescribeSecurityGroups +func (s *awsSdkEC2) DescribeSecurityGroups(securityGroupIds []string, filterName string, filterVPCId string) ([]*ec2.SecurityGroup, error) { + filters := []*ec2.Filter{} + if filterName != "" { + filters = append(filters, newEc2Filter("group-name", filterName)) + } + if filterVPCId != "" { + filters = append(filters, newEc2Filter("vpc-id", filterVPCId)) + } + + request := &ec2.DescribeSecurityGroupsInput{} + if len(securityGroupIds) != 0 { + request.GroupIDs = []*string{} + for _, securityGroupId := range securityGroupIds { + request.GroupIDs = append(request.GroupIDs, &securityGroupId) + } + } + if len(filters) != 0 { + request.Filters = filters + } + + response, err := s.ec2.DescribeSecurityGroups(request) + if err != nil { + glog.Error("error describing groups: ", err) + return nil, err + } + return response.SecurityGroups, nil +} func (s *awsSdkEC2) AttachVolume(volumeID, instanceId, device string) (resp *ec2.VolumeAttachment, err error) { @@ -245,19 +364,32 @@ func (s *awsSdkEC2) DeleteVolume(volumeID string) (resp *ec2.DeleteVolumeOutput, return s.ec2.DeleteVolume(&request) } -func init() { - cloudprovider.RegisterCloudProvider(ProviderName, func(config io.Reader) (cloudprovider.Interface, error) { - metadata := &awsSdkMetadata{} - 
return newAWSCloud(config, getAuth, metadata) - }) +func (s *awsSdkEC2) DescribeVPCs(request *ec2.DescribeVPCsInput) (*ec2.DescribeVPCsOutput, error) { + return s.ec2.DescribeVPCs(request) } -func getAuth() (creds *credentials.Credentials) { - return credentials.NewChainCredentials( - []credentials.Provider{ - &credentials.EnvProvider{}, - &credentials.EC2RoleProvider{}, - }) +func (s *awsSdkEC2) DescribeSubnets(request *ec2.DescribeSubnetsInput) (*ec2.DescribeSubnetsOutput, error) { + return s.ec2.DescribeSubnets(request) +} + +func (s *awsSdkEC2) CreateSecurityGroup(request *ec2.CreateSecurityGroupInput) (*ec2.CreateSecurityGroupOutput, error) { + return s.ec2.CreateSecurityGroup(request) +} + +func (s *awsSdkEC2) AuthorizeSecurityGroupIngress(request *ec2.AuthorizeSecurityGroupIngressInput) (*ec2.AuthorizeSecurityGroupIngressOutput, error) { + return s.ec2.AuthorizeSecurityGroupIngress(request) +} + +func init() { + cloudprovider.RegisterCloudProvider(ProviderName, func(config io.Reader) (cloudprovider.Interface, error) { + creds := credentials.NewChainCredentials( + []credentials.Provider{ + &credentials.EnvProvider{}, + &credentials.EC2RoleProvider{}, + }) + aws := &awsSDKProvider{creds: creds} + return newAWSCloud(config, aws) + }) } // readAWSCloudConfig reads an instance of AWSCloudConfig from config reader. @@ -320,15 +452,14 @@ func isRegionValid(region string) bool { } // newAWSCloud creates a new instance of AWSCloud. 
-// authFunc and instanceId are primarily for tests -func newAWSCloud(config io.Reader, authFunc AuthFunc, metadata AWSMetadata) (*AWSCloud, error) { +// AWSProvider and instanceId are primarily for tests +func newAWSCloud(config io.Reader, awsServices AWSServices) (*AWSCloud, error) { + metadata := awsServices.Metadata() cfg, err := readAWSCloudConfig(config, metadata) if err != nil { return nil, fmt.Errorf("unable to read AWS cloud provider config file: %v", err) } - creds := authFunc() - zone := cfg.Global.Zone if len(zone) <= 1 { return nil, fmt.Errorf("invalid AWS zone in config file: %s", zone) @@ -340,19 +471,41 @@ func newAWSCloud(config io.Reader, authFunc AuthFunc, metadata AWSMetadata) (*AW return nil, fmt.Errorf("not a valid AWS zone (unknown region): %s", zone) } - ec2 := &awsSdkEC2{ - ec2: ec2.New(&aws.Config{ - Region: regionName, - Credentials: creds, - }), - } + ec2, err := awsServices.Compute(regionName) awsCloud := &AWSCloud{ + awsServices: awsServices, ec2: ec2, cfg: cfg, region: regionName, availabilityZone: zone, - metadata: metadata, + elbClients: map[string]ELB{}, + } + + filterTags := map[string]string{} + if cfg.Global.KubernetesClusterTag != "" { + filterTags[TagNameKubernetesCluster] = cfg.Global.KubernetesClusterTag + } else { + selfInstance, err := awsCloud.getSelfAWSInstance() + if err != nil { + return nil, err + } + selfInstanceInfo, err := selfInstance.getInfo() + if err != nil { + return nil, err + } + for _, tag := range selfInstanceInfo.Tags { + if orEmpty(tag.Key) == TagNameKubernetesCluster { + filterTags[TagNameKubernetesCluster] = orEmpty(tag.Value) + } + } + } + + awsCloud.filterTags = filterTags + if len(filterTags) > 0 { + glog.Infof("AWS cloud filtering on tags: %v", filterTags) + } else { + glog.Infof("AWS cloud - no tag filtering") } return awsCloud, nil @@ -368,8 +521,8 @@ func (aws *AWSCloud) ProviderName() string { } // TCPLoadBalancer returns an implementation of TCPLoadBalancer for Amazon Web Services. 
-func (aws *AWSCloud) TCPLoadBalancer() (cloudprovider.TCPLoadBalancer, bool) { - return nil, false +func (s *AWSCloud) TCPLoadBalancer() (cloudprovider.TCPLoadBalancer, bool) { + return s, true } // Instances returns an implementation of Instances for Amazon Web Services. @@ -389,7 +542,7 @@ func (aws *AWSCloud) Routes() (cloudprovider.Routes, bool) { // NodeAddresses is an implementation of Instances.NodeAddresses. func (aws *AWSCloud) NodeAddresses(name string) ([]api.NodeAddress, error) { - instance, err := aws.getInstancesByDnsName(name) + instance, err := aws.getInstanceByDnsName(name) if err != nil { return nil, err } @@ -423,7 +576,7 @@ func (aws *AWSCloud) NodeAddresses(name string) ([]api.NodeAddress, error) { // ExternalID returns the cloud provider ID of the specified instance (deprecated). func (aws *AWSCloud) ExternalID(name string) (string, error) { - inst, err := aws.getInstancesByDnsName(name) + inst, err := aws.getInstanceByDnsName(name) if err != nil { return "", err } @@ -432,7 +585,7 @@ func (aws *AWSCloud) ExternalID(name string) (string, error) { // InstanceID returns the cloud provider ID of the specified instance. func (aws *AWSCloud) InstanceID(name string) (string, error) { - inst, err := aws.getInstancesByDnsName(name) + inst, err := aws.getInstanceByDnsName(name) if err != nil { return "", err } @@ -442,11 +595,11 @@ func (aws *AWSCloud) InstanceID(name string) (string, error) { } // Return the instances matching the relevant private dns name. 
-func (aws *AWSCloud) getInstancesByDnsName(name string) (*ec2.Instance, error) { +func (s *AWSCloud) getInstanceByDnsName(name string) (*ec2.Instance, error) { f := &ec2InstanceFilter{} f.PrivateDNSName = name - instances, err := aws.ec2.Instances(nil, f) + instances, err := s.ec2.Instances(nil, f) if err != nil { return nil, err } @@ -551,7 +704,7 @@ func (aws *AWSCloud) List(filter string) ([]string, error) { // GetNodeResources implements Instances.GetNodeResources func (aws *AWSCloud) GetNodeResources(name string) (*api.NodeResources, error) { - instance, err := aws.getInstancesByDnsName(name) + instance, err := aws.getInstanceByDnsName(name) if err != nil { return nil, err } @@ -956,20 +1109,21 @@ func (self *awsDisk) delete() error { // Gets the awsInstance for the EC2 instance on which we are running // may return nil in case of error -func (aws *AWSCloud) getSelfAWSInstance() (*awsInstance, error) { +func (s *AWSCloud) getSelfAWSInstance() (*awsInstance, error) { // Note that we cache some state in awsInstance (mountpoints), so we must preserve the instance - aws.mutex.Lock() - defer aws.mutex.Unlock() + s.mutex.Lock() + defer s.mutex.Unlock() - i := aws.selfAWSInstance + i := s.selfAWSInstance if i == nil { - instanceIdBytes, err := aws.metadata.GetMetaData("instance-id") + metadata := s.awsServices.Metadata() + instanceIdBytes, err := metadata.GetMetaData("instance-id") if err != nil { return nil, fmt.Errorf("error fetching instance-id from ec2 metadata service: %v", err) } - i = newAWSInstance(aws.ec2, string(instanceIdBytes)) - aws.selfAWSInstance = i + i = newAWSInstance(s.ec2, string(instanceIdBytes)) + s.selfAWSInstance = i } return i, nil @@ -985,7 +1139,7 @@ func (aws *AWSCloud) getAwsInstance(instanceName string) (*awsInstance, error) { return nil, fmt.Errorf("error getting self-instance: %v", err) } } else { - instance, err := aws.getInstancesByDnsName(instanceName) + instance, err := aws.getInstanceByDnsName(instanceName) if err != nil { return 
nil, fmt.Errorf("error finding instance: %v", err) } @@ -1110,3 +1264,475 @@ func (aws *AWSCloud) DeleteVolume(volumeName string) error { } return awsDisk.delete() } + +func (v *AWSCloud) Configure(name string, spec *api.NodeSpec) error { + return nil +} + +func (v *AWSCloud) Release(name string) error { + return nil +} + +// Gets the current load balancer state +func (s *AWSCloud) describeLoadBalancer(region, name string) (*elb.LoadBalancerDescription, error) { + elbClient, err := s.getELBClient(region) + if err != nil { + return nil, err + } + + request := &elb.DescribeLoadBalancersInput{} + request.LoadBalancerNames = []*string{&name} + + response, err := elbClient.DescribeLoadBalancers(request) + if err != nil { + if awsError := aws.Error(err); awsError != nil { + if awsError.Code == "LoadBalancerNotFound" { + return nil, nil + } + } + return nil, err + } + + var ret *elb.LoadBalancerDescription + for _, loadBalancer := range response.LoadBalancerDescriptions { + if ret != nil { + glog.Errorf("Found multiple load balancers with name: %s", name) + } + ret = loadBalancer + } + return ret, nil +} + +// TCPLoadBalancerExists implements TCPLoadBalancer.TCPLoadBalancerExists. +func (self *AWSCloud) TCPLoadBalancerExists(name, region string) (bool, error) { + lb, err := self.describeLoadBalancer(name, region) + if err != nil { + return false, err + } + + if lb != nil { + return true, nil + } + return false, nil +} + +// Find the kubernetes VPC +func (self *AWSCloud) findVPC() (*ec2.VPC, error) { + request := &ec2.DescribeVPCsInput{} + + // TODO: How do we want to identify our VPC? 
Issue #6006 + name := "kubernetes-vpc" + request.Filters = []*ec2.Filter{newEc2Filter("tag:Name", name)} + + response, err := self.ec2.DescribeVPCs(request) + if err != nil { + glog.Error("error listing VPCs", err) + return nil, err + } + + vpcs := response.VPCs + if err != nil { + return nil, err + } + if len(vpcs) == 0 { + return nil, nil + } + if len(vpcs) == 1 { + return vpcs[0], nil + } + return nil, fmt.Errorf("Found multiple matching VPCs for name: %s", name) +} + +// Makes sure the security group allows ingress on the specified ports (with sourceIp & protocol) +// Returns true iff changes were made +// The security group must already exist +func (s *AWSCloud) ensureSecurityGroupIngess(securityGroupId string, sourceIp string, ports []*api.ServicePort) (bool, error) { + groups, err := s.ec2.DescribeSecurityGroups([]string{securityGroupId}, "", "") + if err != nil { + glog.Warning("error retrieving security group", err) + return false, err + } + + if len(groups) == 0 { + // We require that the security group already exist + return false, fmt.Errorf("security group not found") + } + if len(groups) != 1 { + // This should not be possible - ids should be unique + return false, fmt.Errorf("multiple security groups found with same id") + } + group := groups[0] + + newPermissions := []*ec2.IPPermission{} + + for _, port := range ports { + found := false + portInt64 := int64(port.Port) + protocol := strings.ToLower(string(port.Protocol)) + for _, permission := range group.IPPermissions { + if permission.FromPort == nil || *permission.FromPort != portInt64 { + continue + } + if permission.ToPort == nil || *permission.ToPort != portInt64 { + continue + } + if permission.IPProtocol == nil || *permission.IPProtocol != protocol { + continue + } + if len(permission.IPRanges) != 1 { + continue + } + if orEmpty(permission.IPRanges[0].CIDRIP) != sourceIp { + continue + } + found = true + break + } + + if !found { + newPermission := &ec2.IPPermission{} + newPermission.FromPort 
= &portInt64 + newPermission.ToPort = &portInt64 + newPermission.IPRanges = []*ec2.IPRange{{CIDRIP: &sourceIp}} + newPermission.IPProtocol = &protocol + + newPermissions = append(newPermissions, newPermission) + } + } + + if len(newPermissions) == 0 { + return false, nil + } + + request := &ec2.AuthorizeSecurityGroupIngressInput{} + request.GroupID = &securityGroupId + request.IPPermissions = newPermissions + + _, err = s.ec2.AuthorizeSecurityGroupIngress(request) + if err != nil { + glog.Warning("error authorizing security group ingress", err) + return false, err + } + + return true, nil +} + +// CreateTCPLoadBalancer implements TCPLoadBalancer.CreateTCPLoadBalancer +// TODO(justinsb): This must be idempotent +// TODO(justinsb) It is weird that these take a region. I suspect it won't work cross-region anyway. +func (s *AWSCloud) CreateTCPLoadBalancer(name, region string, publicIP net.IP, ports []*api.ServicePort, hosts []string, affinity api.ServiceAffinity) (*api.LoadBalancerStatus, error) { + glog.V(2).Infof("CreateTCPLoadBalancer(%v, %v, %v, %v, %v)", name, region, publicIP, ports, hosts) + + elbClient, err := s.getELBClient(region) + if err != nil { + return nil, err + } + + if affinity != api.ServiceAffinityNone { + // ELB supports sticky sessions, but only when configured for HTTP/HTTPS + return nil, fmt.Errorf("unsupported load balancer affinity: %v", affinity) + } + + if publicIP != nil { + return nil, fmt.Errorf("publicIP cannot be specified for AWS ELB") + } + + instances, err := s.getInstancesByDnsNames(hosts) + if err != nil { + return nil, err + } + + vpc, err := s.findVPC() + if err != nil { + glog.Error("error finding VPC", err) + return nil, err + } + if vpc == nil { + return nil, fmt.Errorf("Unable to find VPC") + } + + // Construct list of configured subnets + subnetIds := []*string{} + { + request := &ec2.DescribeSubnetsInput{} + filters := []*ec2.Filter{} + filters = append(filters, newEc2Filter("vpc-id", orEmpty(vpc.VPCID))) + request.Filters
= filters + + response, err := s.ec2.DescribeSubnets(request) + if err != nil { + glog.Error("error describing subnets: ", err) + return nil, err + } + + // zones := []string{} + for _, subnet := range response.Subnets { + subnetIds = append(subnetIds, subnet.SubnetID) + if !strings.HasPrefix(orEmpty(subnet.AvailabilityZone), region) { + glog.Error("found AZ that did not match region", orEmpty(subnet.AvailabilityZone), " vs ", region) + return nil, fmt.Errorf("invalid AZ for region") + } + // zones = append(zones, subnet.AvailabilityZone) + } + } + + // Build the load balancer itself + var loadBalancerName, dnsName *string + { + loadBalancer, err := s.describeLoadBalancer(region, name) + if err != nil { + return nil, err + } + + if loadBalancer == nil { + createRequest := &elb.CreateLoadBalancerInput{} + createRequest.LoadBalancerName = aws.String(name) + + listeners := []*elb.Listener{} + for _, port := range ports { + if port.NodePort == 0 { + glog.Errorf("Ignoring port without NodePort defined: %v", port) + continue + } + instancePort := int64(port.NodePort) + loadBalancerPort := int64(port.Port) + protocol := strings.ToLower(string(port.Protocol)) + + listener := &elb.Listener{} + listener.InstancePort = &instancePort + listener.LoadBalancerPort = &loadBalancerPort + listener.Protocol = &protocol + listener.InstanceProtocol = &protocol + + listeners = append(listeners, listener) + } + + createRequest.Listeners = listeners + + // TODO: Should we use a better identifier (the kubernetes uuid?) + + // We are supposed to specify one subnet per AZ. + // TODO: What happens if we have more than one subnet per AZ? + createRequest.Subnets = subnetIds + + sgName := "k8s-elb-" + name + sgDescription := "Security group for Kubernetes ELB " + name + + { + // TODO: Should we do something more reliable ?? 
.Where("tag:kubernetes-id", kubernetesId) + securityGroups, err := s.ec2.DescribeSecurityGroups(nil, sgName, orEmpty(vpc.VPCID)) + if err != nil { + return nil, err + } + var securityGroupId *string + for _, securityGroup := range securityGroups { + if securityGroupId != nil { + glog.Warning("Found multiple security groups with name:", sgName) + } + securityGroupId = securityGroup.GroupID + } + if securityGroupId == nil { + createSecurityGroupRequest := &ec2.CreateSecurityGroupInput{} + createSecurityGroupRequest.VPCID = vpc.VPCID + createSecurityGroupRequest.GroupName = &sgName + createSecurityGroupRequest.Description = &sgDescription + + createSecurityGroupResponse, err := s.ec2.CreateSecurityGroup(createSecurityGroupRequest) + if err != nil { + glog.Error("error creating security group: ", err) + return nil, err + } + + securityGroupId = createSecurityGroupResponse.GroupID + if isNilOrEmpty(securityGroupId) { + return nil, fmt.Errorf("created security group, but id was not returned") + } + } + _, err = s.ensureSecurityGroupIngess(*securityGroupId, "0.0.0.0/0", ports) + if err != nil { + return nil, err + } + createRequest.SecurityGroups = []*string{securityGroupId} + } + + glog.Info("Creating load balancer with name: ", createRequest.LoadBalancerName) + createResponse, err := elbClient.CreateLoadBalancer(createRequest) + if err != nil { + return nil, err + } + dnsName = createResponse.DNSName + loadBalancerName = createRequest.LoadBalancerName + } else { + // TODO: Verify that load balancer configuration matches? 
+ dnsName = loadBalancer.DNSName + loadBalancerName = loadBalancer.LoadBalancerName + } + } + + registerRequest := &elb.RegisterInstancesWithLoadBalancerInput{} + registerRequest.LoadBalancerName = loadBalancerName + for _, instance := range instances { + registerInstance := &elb.Instance{} + registerInstance.InstanceID = instance.InstanceID + registerRequest.Instances = append(registerRequest.Instances, registerInstance) + } + + registerResponse, err := elbClient.RegisterInstancesWithLoadBalancer(registerRequest) + if err != nil { + // TODO: Is it better to delete the load balancer entirely? + glog.Warningf("Error registering instances with load-balancer %s: %v", name, err) + } + + glog.V(1).Infof("Updated instances registered with load-balancer %s: %v", name, registerResponse.Instances) + glog.V(1).Infof("Loadbalancer %s has DNS name %s", name, dnsName) + + // TODO: Wait for creation? + + status := toStatus(loadBalancerName, dnsName) + return status, nil +} + +// GetTCPLoadBalancer is an implementation of TCPLoadBalancer.GetTCPLoadBalancer +func (s *AWSCloud) GetTCPLoadBalancer(name, region string) (*api.LoadBalancerStatus, bool, error) { + lb, err := s.describeLoadBalancer(region, name) + if err != nil { + return nil, false, err + } + + if lb == nil { + return nil, false, nil + } + + status := toStatus(lb.LoadBalancerName, lb.DNSName) + return status, true, nil +} + +func toStatus(loadBalancerName *string, dnsName *string) *api.LoadBalancerStatus { + status := &api.LoadBalancerStatus{} + + if !isNilOrEmpty(dnsName) { + var ingress api.LoadBalancerIngress + ingress.Hostname = *dnsName + status.Ingress = []api.LoadBalancerIngress{ingress} + } + + return status +} + +// EnsureTCPLoadBalancerDeleted implements TCPLoadBalancer.EnsureTCPLoadBalancerDeleted. 
+func (s *AWSCloud) EnsureTCPLoadBalancerDeleted(name, region string) error { + // TODO(justinsb): Delete security group + + elbClient, err := s.getELBClient(region) + if err != nil { + return err + } + + lb, err := s.describeLoadBalancer(region, name) + if err != nil { + return err + } + + if lb == nil { + glog.Info("Load balancer already deleted: ", name) + return nil + } + + request := &elb.DeleteLoadBalancerInput{} + request.LoadBalancerName = lb.LoadBalancerName + + _, err = elbClient.DeleteLoadBalancer(request) + if err != nil { + // TODO: Check if error was because load balancer was concurrently deleted + glog.Error("error deleting load balancer: ", err) + return err + } + return nil +} + +// UpdateTCPLoadBalancer implements TCPLoadBalancer.UpdateTCPLoadBalancer +func (s *AWSCloud) UpdateTCPLoadBalancer(name, region string, hosts []string) error { + instances, err := s.getInstancesByDnsNames(hosts) + if err != nil { + return err + } + + elbClient, err := s.getELBClient(region) + if err != nil { + return err + } + + lb, err := s.describeLoadBalancer(region, name) + if err != nil { + return err + } + + if lb == nil { + return fmt.Errorf("Load balancer not found") + } + + existingInstances := map[string]*elb.Instance{} + for _, instance := range lb.Instances { + existingInstances[orEmpty(instance.InstanceID)] = instance + } + + wantInstances := map[string]*ec2.Instance{} + for _, instance := range instances { + wantInstances[orEmpty(instance.InstanceID)] = instance + } + + addInstances := []*elb.Instance{} + for instanceId := range wantInstances { + addInstance := &elb.Instance{} + addInstance.InstanceID = aws.String(instanceId) + addInstances = append(addInstances, addInstance) + } + + removeInstances := []*elb.Instance{} + for instanceId := range existingInstances { + _, found := wantInstances[instanceId] + if !found { + removeInstance := &elb.Instance{} + removeInstance.InstanceID = aws.String(instanceId) + removeInstances = append(removeInstances, 
removeInstance) + } + } + + if len(addInstances) > 0 { + registerRequest := &elb.RegisterInstancesWithLoadBalancerInput{} + registerRequest.Instances = addInstances + registerRequest.LoadBalancerName = lb.LoadBalancerName + _, err = elbClient.RegisterInstancesWithLoadBalancer(registerRequest) + if err != nil { + return err + } + } + + if len(removeInstances) > 0 { + deregisterRequest := &elb.DeregisterInstancesFromLoadBalancerInput{} + deregisterRequest.Instances = removeInstances + deregisterRequest.LoadBalancerName = lb.LoadBalancerName + _, err = elbClient.DeregisterInstancesFromLoadBalancer(deregisterRequest) + if err != nil { + return err + } + } + + return nil +} + +// TODO: Make efficient +func (a *AWSCloud) getInstancesByDnsNames(names []string) ([]*ec2.Instance, error) { + instances := []*ec2.Instance{} + for _, name := range names { + instance, err := a.getInstanceByDnsName(name) + if err != nil { + return nil, err + } + if instance == nil { + return nil, fmt.Errorf("unable to find instance " + name) + } + instances = append(instances, instance) + } + return instances, nil +} diff --git a/pkg/cloudprovider/aws/aws_test.go b/pkg/cloudprovider/aws/aws_test.go index 40e62f21edf..35ef5b4d7a4 100644 --- a/pkg/cloudprovider/aws/aws_test.go +++ b/pkg/cloudprovider/aws/aws_test.go @@ -23,19 +23,21 @@ import ( "testing" "github.com/awslabs/aws-sdk-go/aws" - "github.com/awslabs/aws-sdk-go/aws/credentials" "github.com/awslabs/aws-sdk-go/service/ec2" + "github.com/awslabs/aws-sdk-go/service/elb" "github.com/GoogleCloudPlatform/kubernetes/pkg/api" "github.com/GoogleCloudPlatform/kubernetes/pkg/api/resource" ) +const TestClusterId = "clusterid.test" + func TestReadAWSCloudConfig(t *testing.T) { tests := []struct { name string - reader io.Reader - metadata AWSMetadata + reader io.Reader + aws AWSServices expectError bool zone string @@ -62,27 +64,31 @@ func TestReadAWSCloudConfig(t *testing.T) { }, { "No zone in config, metadata does not have zone", - 
strings.NewReader("[global]\n"), &FakeMetadata{}, + strings.NewReader("[global]\n"), NewFakeAWSServices().withAz(""), true, "", }, { "No zone in config, metadata has zone", - strings.NewReader("[global]\n"), &FakeMetadata{availabilityZone: "eu-west-1a"}, - false, "eu-west-1a", + strings.NewReader("[global]\n"), NewFakeAWSServices(), + false, "us-east-1a", }, { "Zone in config should take precedence over metadata", - strings.NewReader("[global]\nzone = us-east-1a"), &FakeMetadata{availabilityZone: "eu-west-1a"}, - false, "us-east-1a", + strings.NewReader("[global]\nzone = eu-west-1a"), NewFakeAWSServices(), + false, "eu-west-1a", }, } for _, test := range tests { t.Logf("Running test case %s", test.name) - cfg, err := readAWSCloudConfig(test.reader, test.metadata) + var metadata AWSMetadata + if test.aws != nil { + metadata = test.aws.Metadata() + } + cfg, err := readAWSCloudConfig(test.reader, metadata) if test.expectError { if err == nil { - t.Errorf("Should error for case %s", test.name) + t.Errorf("Should error for case %s (cfg=%v)", test.name, cfg) } } else { if err != nil { @@ -96,64 +102,127 @@ func TestReadAWSCloudConfig(t *testing.T) { } } -func TestNewAWSCloud(t *testing.T) { - fakeAuthFunc := func() (creds *credentials.Credentials) { - return credentials.NewStaticCredentials("", "", "") +type FakeAWSServices struct { + availabilityZone string + instances []*ec2.Instance + instanceId string + + ec2 *FakeEC2 + elb *FakeELB + metadata *FakeMetadata +} + +func NewFakeAWSServices() *FakeAWSServices { + s := &FakeAWSServices{} + s.availabilityZone = "us-east-1a" + s.ec2 = &FakeEC2{aws: s} + s.elb = &FakeELB{aws: s} + s.metadata = &FakeMetadata{aws: s} + + s.instanceId = "i-self" + var selfInstance ec2.Instance + selfInstance.InstanceID = &s.instanceId + s.instances = []*ec2.Instance{&selfInstance} + + var tag ec2.Tag + tag.Key = aws.String(TagNameKubernetesCluster) + tag.Value = aws.String(TestClusterId) + selfInstance.Tags = []*ec2.Tag{&tag} + + return s +} + 
+func (s *FakeAWSServices) withAz(az string) *FakeAWSServices { + s.availabilityZone = az + return s +} + +func (s *FakeAWSServices) withInstances(instances []*ec2.Instance) *FakeAWSServices { + s.instances = instances + return s +} + +func (s *FakeAWSServices) Compute(region string) (EC2, error) { + return s.ec2, nil +} + +func (s *FakeAWSServices) LoadBalancing(region string) (ELB, error) { + return s.elb, nil +} + +func (s *FakeAWSServices) Metadata() AWSMetadata { + return s.metadata +} + +func TestFilterTags(t *testing.T) { + awsServices := NewFakeAWSServices() + c, err := newAWSCloud(strings.NewReader("[global]"), awsServices) + if err != nil { + t.Errorf("Error building aws cloud: %v", err) + return } + if len(c.filterTags) != 1 { + t.Errorf("unexpected filter tags: %v", c.filterTags) + return + } + + if c.filterTags[TagNameKubernetesCluster] != TestClusterId { + t.Errorf("unexpected filter tags: %v", c.filterTags) + } +} + +func TestNewAWSCloud(t *testing.T) { tests := []struct { name string - reader io.Reader - authFunc AuthFunc - metadata AWSMetadata + reader io.Reader + awsServices AWSServices expectError bool zone string }{ { "No config reader", - nil, fakeAuthFunc, &FakeMetadata{}, + nil, NewFakeAWSServices().withAz(""), true, "", }, { "Config specified invalid zone", - strings.NewReader("[global]\nzone = blahonga"), fakeAuthFunc, &FakeMetadata{}, + strings.NewReader("[global]\nzone = blahonga"), NewFakeAWSServices(), true, "", }, { "Config specifies valid zone", - strings.NewReader("[global]\nzone = eu-west-1a"), fakeAuthFunc, &FakeMetadata{}, + strings.NewReader("[global]\nzone = eu-west-1a"), NewFakeAWSServices(), false, "eu-west-1a", }, { "Gets zone from metadata when not in config", strings.NewReader("[global]\n"), - fakeAuthFunc, - &FakeMetadata{availabilityZone: "us-east-1a"}, - + NewFakeAWSServices(), false, "us-east-1a", }, { "No zone in config or metadata", - strings.NewReader("[global]\n"), fakeAuthFunc, &FakeMetadata{}, + 
strings.NewReader("[global]\n"), + NewFakeAWSServices().withAz(""), true, "", }, } for _, test := range tests { t.Logf("Running test case %s", test.name) - c, err := newAWSCloud(test.reader, test.authFunc, test.metadata) + c, err := newAWSCloud(test.reader, test.awsServices) if test.expectError { if err == nil { t.Errorf("Should error for case %s", test.name) } } else { if err != nil { - t.Errorf("Should succeed for case: %s", test.name) - } - if c.availabilityZone != test.zone { + t.Errorf("Should succeed for case: %s, got %v", test.name, err) + } else if c.availabilityZone != test.zone { t.Errorf("Incorrect zone value (%s vs %s) for case: %s", c.availabilityZone, test.zone, test.name) } @@ -162,7 +231,7 @@ func TestNewAWSCloud(t *testing.T) { } type FakeEC2 struct { - instances []*ec2.Instance + aws *FakeAWSServices } func contains(haystack []string, needle string) bool { @@ -176,7 +245,7 @@ func contains(haystack []string, needle string) bool { func (self *FakeEC2) Instances(instanceIds []string, filter *ec2InstanceFilter) (instances []*ec2.Instance, err error) { matches := []*ec2.Instance{} - for _, instance := range self.instances { + for _, instance := range self.aws.instances { if filter != nil && !filter.Matches(instance) { continue } @@ -190,15 +259,14 @@ func (self *FakeEC2) Instances(instanceIds []string, filter *ec2InstanceFilter) } type FakeMetadata struct { - availabilityZone string - instanceId string + aws *FakeAWSServices } func (self *FakeMetadata) GetMetaData(key string) ([]byte, error) { if key == "placement/availability-zone" { - return []byte(self.availabilityZone), nil + return []byte(self.aws.availabilityZone), nil } else if key == "instance-id" { - return []byte(self.instanceId), nil + return []byte(self.aws.instanceId), nil } else { return nil, nil } @@ -224,23 +292,63 @@ func (ec2 *FakeEC2) DeleteVolume(volumeID string) (resp *ec2.DeleteVolumeOutput, panic("Not implemented") } -func mockInstancesResp(instances []*ec2.Instance) (aws 
*AWSCloud) { - availabilityZone := "us-west-2d" +func (ec2 *FakeEC2) DescribeSecurityGroups(groupIds []string, filterName string, filterVpcId string) ([]*ec2.SecurityGroup, error) { + panic("Not implemented") +} + +func (ec2 *FakeEC2) CreateSecurityGroup(*ec2.CreateSecurityGroupInput) (*ec2.CreateSecurityGroupOutput, error) { + panic("Not implemented") +} + +func (ec2 *FakeEC2) AuthorizeSecurityGroupIngress(*ec2.AuthorizeSecurityGroupIngressInput) (*ec2.AuthorizeSecurityGroupIngressOutput, error) { + panic("Not implemented") +} + +func (ec2 *FakeEC2) DescribeVPCs(*ec2.DescribeVPCsInput) (*ec2.DescribeVPCsOutput, error) { + panic("Not implemented") +} + +func (ec2 *FakeEC2) DescribeSubnets(*ec2.DescribeSubnetsInput) (*ec2.DescribeSubnetsOutput, error) { + panic("Not implemented") +} + +type FakeELB struct { + aws *FakeAWSServices +} + +func (ec2 *FakeELB) CreateLoadBalancer(*elb.CreateLoadBalancerInput) (*elb.CreateLoadBalancerOutput, error) { + panic("Not implemented") +} +func (ec2 *FakeELB) DeleteLoadBalancer(*elb.DeleteLoadBalancerInput) (*elb.DeleteLoadBalancerOutput, error) { + panic("Not implemented") +} +func (ec2 *FakeELB) DescribeLoadBalancers(*elb.DescribeLoadBalancersInput) (*elb.DescribeLoadBalancersOutput, error) { + panic("Not implemented") +} +func (ec2 *FakeELB) RegisterInstancesWithLoadBalancer(*elb.RegisterInstancesWithLoadBalancerInput) (*elb.RegisterInstancesWithLoadBalancerOutput, error) { + panic("Not implemented") +} +func (ec2 *FakeELB) DeregisterInstancesFromLoadBalancer(*elb.DeregisterInstancesFromLoadBalancerInput) (*elb.DeregisterInstancesFromLoadBalancerOutput, error) { + panic("Not implemented") +} + +func mockInstancesResp(instances []*ec2.Instance) *AWSCloud { + awsServices := NewFakeAWSServices().withInstances(instances) return &AWSCloud{ - ec2: &FakeEC2{ - instances: instances, - }, - availabilityZone: availabilityZone, + awsServices: awsServices, + ec2: awsServices.ec2, + availabilityZone: awsServices.availabilityZone, } } func 
mockAvailabilityZone(region string, availabilityZone string) *AWSCloud { + awsServices := NewFakeAWSServices().withAz(availabilityZone) return &AWSCloud{ - ec2: &FakeEC2{}, - availabilityZone: availabilityZone, + awsServices: awsServices, + ec2: awsServices.ec2, + availabilityZone: awsServices.availabilityZone, region: region, } - } func TestList(t *testing.T) { diff --git a/test/e2e/e2e_test.go b/test/e2e/e2e_test.go index bd3f28ef3e9..185ce146990 100644 --- a/test/e2e/e2e_test.go +++ b/test/e2e/e2e_test.go @@ -107,6 +107,8 @@ func init() { flag.StringVar(&cloudConfig.Zone, "gce-zone", "", "GCE zone being used, if applicable") flag.StringVar(&cloudConfig.NodeInstanceGroup, "node-instance-group", "", "Name of the managed instance group for nodes. Valid only for gce") flag.IntVar(&cloudConfig.NumNodes, "num-nodes", -1, "Number of nodes in the cluster") + + flag.StringVar(&cloudConfig.ClusterTag, "cluster-tag", "", "Tag used to identify resources. Only required if provider is aws.") } func TestE2E(t *testing.T) { @@ -126,6 +128,11 @@ func TestE2E(t *testing.T) { } awsConfig += fmt.Sprintf("Zone=%s\n", cloudConfig.Zone) + if cloudConfig.ClusterTag == "" { + glog.Fatal("--cluster-tag must be specified for AWS") + } + awsConfig += fmt.Sprintf("KubernetesClusterTag=%s\n", cloudConfig.ClusterTag) + var err error cloudConfig.Provider, err = cloudprovider.GetCloudProvider(testContext.Provider, strings.NewReader(awsConfig)) if err != nil { diff --git a/test/e2e/kubectl.go b/test/e2e/kubectl.go index f79985f4c4d..e6d069f417b 100644 --- a/test/e2e/kubectl.go +++ b/test/e2e/kubectl.go @@ -113,8 +113,8 @@ var _ = Describe("kubectl", func() { }) It("should create and stop a working application", func() { - if !providerIs("gce", "gke") { - By(fmt.Sprintf("Skipping guestbook, uses createExternalLoadBalancer, a (gce|gke) feature")) + if !providerIs("gce", "gke", "aws") { + By(fmt.Sprintf("Skipping guestbook, uses createExternalLoadBalancer, a (gce|gke|aws) feature")) return } diff 
--git a/test/e2e/util.go b/test/e2e/util.go index a5792fca606..5250f59d57f 100644 --- a/test/e2e/util.go +++ b/test/e2e/util.go @@ -72,6 +72,7 @@ type CloudConfig struct { MasterName string NodeInstanceGroup string NumNodes int + ClusterTag string Provider cloudprovider.Interface }