diff --git a/cluster/aws/util.sh b/cluster/aws/util.sh index 22b849584ef..116014004cc 100644 --- a/cluster/aws/util.sh +++ b/cluster/aws/util.sh @@ -44,6 +44,9 @@ INTERNAL_IP_BASE=172.20.0 MASTER_IP_SUFFIX=.9 MASTER_INTERNAL_IP=${INTERNAL_IP_BASE}${MASTER_IP_SUFFIX} +MASTER_SG_NAME="kubernetes-master-${CLUSTER_ID}" +MINION_SG_NAME="kubernetes-minion-${CLUSTER_ID}" + function json_val { python -c 'import json,sys;obj=json.load(sys.stdin);print obj'$1'' } @@ -102,6 +105,17 @@ function get_instance_private_ip { --query Reservations[].Instances[].NetworkInterfaces[0].PrivateIpAddress } +# Gets a security group id, by name ($1) +function get_security_group_id { + local name=$1 + $AWS_CMD --output text describe-security-groups \ + --filters Name=vpc-id,Values=${VPC_ID} \ + Name=group-name,Values=${name} \ + Name=tag:KubernetesCluster,Values=${CLUSTER_ID} \ + --query SecurityGroups[].GroupId \ + | tr "\t" "\n" +} + function detect-master () { KUBE_MASTER=${MASTER_NAME} if [[ -z "${KUBE_MASTER_IP-}" ]]; then @@ -132,6 +146,27 @@ function detect-minions () { fi } +function detect-security-groups { + if [[ -z "${MASTER_SG_ID-}" ]]; then + MASTER_SG_ID=$(get_security_group_id "${MASTER_SG_NAME}") + if [[ -z "${MASTER_SG_ID}" ]]; then + echo "Could not detect Kubernetes master security group. Make sure you've launched a cluster with 'kube-up.sh'" + exit 1 + else + echo "Using master security group: ${MASTER_SG_NAME} ${MASTER_SG_ID}" + fi + fi + if [[ -z "${MINION_SG_ID-}" ]]; then + MINION_SG_ID=$(get_security_group_id "${MINION_SG_NAME}") + if [[ -z "${MINION_SG_ID}" ]]; then + echo "Could not detect Kubernetes minion security group. Make sure you've launched a cluster with 'kube-up.sh'" + exit 1 + else + echo "Using minion security group: ${MINION_SG_NAME} ${MINION_SG_ID}" + fi + fi +} + # Detects the AMI to use (considering the region) # # Vars set: @@ -224,6 +259,43 @@ function import-public-key { fi } +# Robustly try to create a security group, if it does not exist. +# $1: The name of security group; will be created if not exists +# $2: Description for security group (used if created) +# +# Note that this doesn't actually return the sgid; we need to re-query +function create-security-group { + local -r name=$1 + local -r description=$2 + + local sgid=$(get_security_group_id "${name}") + if [[ -z "$sgid" ]]; then + echo "Creating security group ${name}." + sgid=$($AWS_CMD create-security-group --group-name "${name}" --description "${description}" --vpc-id "${VPC_ID}" --query GroupId --output text) + add-tag $sgid KubernetesCluster ${CLUSTER_ID} + fi +} + +# Authorize ingress to a security group. +# Attempts to be idempotent, though we end up checking the output looking for error-strings. +# $1 group-id +# $2.. arguments to pass to authorize-security-group-ingress +function authorize-security-group-ingress { + local -r sgid=$1 + shift + local ok=1 + local output="" + output=$($AWS_CMD authorize-security-group-ingress --group-id "${sgid}" $@ 2>&1) || ok=0 + if [[ ${ok} == 0 ]]; then + # Idempotency: ignore if duplicate rule + if [[ "${output}" != *"InvalidPermission.Duplicate"* ]]; then + echo "Error creating security group ingress rule" + echo "Output: ${output}" + exit 1 + fi + fi +} + # Verify prereqs function verify-prereqs { if [[ "$(which aws)" == "" ]]; then @@ -508,12 +580,13 @@ function kube-up { echo "Using VPC $VPC_ID" - SUBNET_ID=$($AWS_CMD describe-subnets | get_subnet_id $VPC_ID $ZONE) + SUBNET_ID=$($AWS_CMD describe-subnets --filters Name=tag:KubernetesCluster,Values=${CLUSTER_ID} | get_subnet_id $VPC_ID $ZONE) if [[ -z "$SUBNET_ID" ]]; then echo "Creating subnet." SUBNET_ID=$($AWS_CMD create-subnet --cidr-block $INTERNAL_IP_BASE.0/24 --vpc-id $VPC_ID --availability-zone ${ZONE} | json_val '["Subnet"]["SubnetId"]') + add-tag $SUBNET_ID KubernetesCluster ${CLUSTER_ID} else - EXISTING_CIDR=$($AWS_CMD describe-subnets | get_cidr $VPC_ID $ZONE) + EXISTING_CIDR=$($AWS_CMD describe-subnets --filters Name=tag:KubernetesCluster,Values=${CLUSTER_ID} | get_cidr $VPC_ID $ZONE) echo "Using existing CIDR $EXISTING_CIDR" INTERNAL_IP_BASE=${EXISTING_CIDR%.*} MASTER_INTERNAL_IP=${INTERNAL_IP_BASE}${MASTER_IP_SUFFIX} @@ -540,17 +613,39 @@ function kube-up { echo "Using Route Table $ROUTE_TABLE_ID" - SEC_GROUP_ID=$($AWS_CMD --output text describe-security-groups \ - --filters Name=vpc-id,Values=$VPC_ID \ - Name=group-name,Values=kubernetes-sec-group \ - --query SecurityGroups[].GroupId \ - | tr "\t" "\n") - - if [[ -z "$SEC_GROUP_ID" ]]; then - echo "Creating security group." - SEC_GROUP_ID=$($AWS_CMD create-security-group --group-name kubernetes-sec-group --description kubernetes-sec-group --vpc-id $VPC_ID | json_val '["GroupId"]') - $AWS_CMD authorize-security-group-ingress --group-id $SEC_GROUP_ID --protocol -1 --port all --cidr 0.0.0.0/0 > $LOG + # Create security groups + MASTER_SG_ID=$(get_security_group_id "${MASTER_SG_NAME}") + if [[ -z "${MASTER_SG_ID}" ]]; then + echo "Creating master security group." + create-security-group "${MASTER_SG_NAME}" "Kubernetes security group applied to master nodes" fi + MINION_SG_ID=$(get_security_group_id "${MINION_SG_NAME}") + if [[ -z "${MINION_SG_ID}" ]]; then + echo "Creating minion security group." + create-security-group "${MINION_SG_NAME}" "Kubernetes security group applied to minion nodes" + fi + + detect-security-groups + + # Masters can talk to master + authorize-security-group-ingress "${MASTER_SG_ID}" "--source-group ${MASTER_SG_ID} --protocol all" + + # Minions can talk to minions + authorize-security-group-ingress "${MINION_SG_ID}" "--source-group ${MINION_SG_ID} --protocol all" + + # Masters and minions can talk to each other + authorize-security-group-ingress "${MASTER_SG_ID}" "--source-group ${MINION_SG_ID} --protocol all" + authorize-security-group-ingress "${MINION_SG_ID}" "--source-group ${MASTER_SG_ID} --protocol all" + + # TODO(justinsb): Would be fairly easy to replace 0.0.0.0/0 in these rules + + # SSH is open to the world + authorize-security-group-ingress "${MASTER_SG_ID}" "--protocol tcp --port 22 --cidr 0.0.0.0/0" + authorize-security-group-ingress "${MINION_SG_ID}" "--protocol tcp --port 22 --cidr 0.0.0.0/0" + + # HTTPS to the master is allowed (for API access) + authorize-security-group-ingress "${MASTER_SG_ID}" "--protocol tcp --port 443 --cidr 0.0.0.0/0" + ( # We pipe this to the ami as a startup script in the user-data field. Requires a compatible ami echo "#! /bin/bash" @@ -594,7 +689,7 @@ function kube-up { --subnet-id $SUBNET_ID \ --private-ip-address $MASTER_INTERNAL_IP \ --key-name ${AWS_SSH_KEY_NAME} \ - --security-group-ids $SEC_GROUP_ID \ + --security-group-ids ${MASTER_SG_ID} \ --associate-public-ip-address \ --user-data file://${KUBE_TEMP}/master-start.sh | json_val '["Instances"][0]["InstanceId"]') add-tag $master_id Name $MASTER_NAME @@ -675,7 +770,7 @@ function kube-up { --subnet-id $SUBNET_ID \ --private-ip-address $INTERNAL_IP_BASE.1${i} \ --key-name ${AWS_SSH_KEY_NAME} \ - --security-group-ids $SEC_GROUP_ID \ + --security-group-ids ${MINION_SG_ID} \ ${public_ip_option} \ --user-data "file://${KUBE_TEMP}/minion-user-data-${i}" | json_val '["Instances"][0]["InstanceId"]') @@ -847,22 +942,43 @@ function kube-down { echo "Deleting VPC: ${vpc_id}" default_sg_id=$($AWS_CMD --output text describe-security-groups \ - --filters Name=vpc-id,Values=${vpc_id} Name=group-name,Values=default \ + --filters Name=vpc-id,Values=${vpc_id} \ + Name=group-name,Values=default \ --query SecurityGroups[].GroupId \ | tr "\t" "\n") sg_ids=$($AWS_CMD --output text describe-security-groups \ --filters Name=vpc-id,Values=${vpc_id} \ + Name=tag:KubernetesCluster,Values=${CLUSTER_ID} \ --query SecurityGroups[].GroupId \ | tr "\t" "\n") + # First delete any inter-security group ingress rules + # (otherwise we get dependency violations) for sg_id in ${sg_ids}; do # EC2 doesn't let us delete the default security group - if [[ "${sg_id}" != "${default_sg_id}" ]]; then - $AWS_CMD delete-security-group --group-id ${sg_id} > $LOG + if [[ "${sg_id}" == "${default_sg_id}" ]]; then + continue fi + + echo "Cleaning up security group: ${sg_id}" + other_sgids=$(aws ec2 describe-security-groups --group-id "${sg_id}" --query SecurityGroups[].IpPermissions[].UserIdGroupPairs[].GroupId --output text) + for other_sgid in ${other_sgids}; do + $AWS_CMD revoke-security-group-ingress --group-id "${sg_id}" --source-group "${other_sgid}" --protocol all > $LOG + done + done + + for sg_id in ${sg_ids}; do + # EC2 doesn't let us delete the default security group + if [[ "${sg_id}" == "${default_sg_id}" ]]; then + continue + fi + + echo "Deleting security group: ${sg_id}" + $AWS_CMD delete-security-group --group-id ${sg_id} > $LOG done subnet_ids=$($AWS_CMD --output text describe-subnets \ --filters Name=vpc-id,Values=${vpc_id} \ + Name=tag:KubernetesCluster,Values=${CLUSTER_ID} \ --query Subnets[].SubnetId \ | tr "\t" "\n") for subnet_id in ${subnet_ids}; do @@ -940,12 +1056,25 @@ function test-build-release { # Assumed vars: # Variables from config.sh function test-setup { + VPC_ID=$(get_vpc_id) + detect-security-groups + + # Open up port 80 & 8080 so common containers on minions can be reached + # TODO(roberthbailey): Remove this once we are no longer relying on hostPorts. + authorize-security-group-ingress "${MINION_SG_ID}" "--protocol tcp --port 80 --cidr 0.0.0.0/0" + authorize-security-group-ingress "${MINION_SG_ID}" "--protocol tcp --port 8080 --cidr 0.0.0.0/0" + + # Open up the NodePort range + # TODO(justinsb): Move to main setup, if we decide whether we want to do this by default. + authorize-security-group-ingress "${MINION_SG_ID}" "--protocol all --port 30000-32767 --cidr 0.0.0.0/0" + echo "test-setup complete" } # Execute after running tests to perform any required clean-up. This is called # from hack/e2e.go function test-teardown { + # (ingress rules will be deleted along with the security group) echo "Shutting down test cluster." "${KUBE_ROOT}/cluster/kube-down.sh" } diff --git a/pkg/cloudprovider/aws/aws.go b/pkg/cloudprovider/aws/aws.go index 97651634f3d..6262f016a4e 100644 --- a/pkg/cloudprovider/aws/aws.go +++ b/pkg/cloudprovider/aws/aws.go @@ -57,30 +57,35 @@ type AWSServices interface { // TODO: Should we rename this to AWS (EBS & ELB are not technically part of EC2) // Abstraction over EC2, to allow mocking/other implementations +// Note that the DescribeX functions return a list, so callers don't need to deal with paging type EC2 interface { // Query EC2 for instances matching the filter - Instances(instanceIds []string, filter *ec2InstanceFilter) (instances []*ec2.Instance, err error) + DescribeInstances(request *ec2.DescribeInstancesInput) ([]*ec2.Instance, error) // Attach a volume to an instance AttachVolume(volumeID, instanceId, mountDevice string) (resp *ec2.VolumeAttachment, err error) // Detach a volume from an instance it is attached to DetachVolume(request *ec2.DetachVolumeInput) (resp *ec2.VolumeAttachment, err error) // Lists volumes - Volumes(volumeIDs []string, filter *ec2.Filter) (resp *ec2.DescribeVolumesOutput, err error) + DescribeVolumes(request *ec2.DescribeVolumesInput) ([]*ec2.Volume, error) // Create an EBS volume CreateVolume(request *ec2.CreateVolumeInput) (resp *ec2.Volume, err error) // Delete an EBS volume DeleteVolume(volumeID string) (resp *ec2.DeleteVolumeOutput, err error) - DescribeSecurityGroups(groupIds []string, filterName string, filterVPCId string) ([]*ec2.SecurityGroup, error) + DescribeSecurityGroups(request *ec2.DescribeSecurityGroupsInput) ([]*ec2.SecurityGroup, error) - // TODO(justinsb): Make all of these into pass-through methods, now that we have a much better binding CreateSecurityGroup(*ec2.CreateSecurityGroupInput) (*ec2.CreateSecurityGroupOutput, error) + DeleteSecurityGroup(request *ec2.DeleteSecurityGroupInput) (*ec2.DeleteSecurityGroupOutput, error) + AuthorizeSecurityGroupIngress(*ec2.AuthorizeSecurityGroupIngressInput) (*ec2.AuthorizeSecurityGroupIngressOutput, error) + RevokeSecurityGroupIngress(*ec2.RevokeSecurityGroupIngressInput) (*ec2.RevokeSecurityGroupIngressOutput, error) - DescribeVPCs(*ec2.DescribeVPCsInput) (*ec2.DescribeVPCsOutput, error) + DescribeVPCs(*ec2.DescribeVPCsInput) ([]*ec2.VPC, error) - DescribeSubnets(*ec2.DescribeSubnetsInput) (*ec2.DescribeSubnetsOutput, error) + DescribeSubnets(*ec2.DescribeSubnetsInput) ([]*ec2.Subnet, error) + + CreateTags(*ec2.CreateTagsInput) (*ec2.CreateTagsOutput, error) } // This is a simple pass-through of the ELB client interface, which allows for testing @@ -145,20 +150,6 @@ type AWSCloudConfig struct { } } -// Similar to ec2.Filter, but the filter values can be read from tests -// (ec2.Filter only has private members) -type ec2InstanceFilter struct { - PrivateDNSName string -} - -// True if the passed instance matches the filter -func (f *ec2InstanceFilter) Matches(instance *ec2.Instance) bool { - if f.PrivateDNSName != "" && orEmpty(instance.PrivateDNSName) != f.PrivateDNSName { - return false - } - return true -} - // awsSdkEC2 is an implementation of the EC2 interface, backed by aws-sdk-go type awsSdkEC2 struct { ec2 *ec2.EC2 @@ -240,39 +231,29 @@ func newEc2Filter(name string, value string) *ec2.Filter { } // Implementation of EC2.Instances -func (self *awsSdkEC2) Instances(instanceIds []string, filter *ec2InstanceFilter) (resp []*ec2.Instance, err error) { - var filters []*ec2.Filter - if filter != nil && filter.PrivateDNSName != "" { - filters = []*ec2.Filter{ - newEc2Filter("private-dns-name", filter.PrivateDNSName), - } - } - - fetchedInstances := []*ec2.Instance{} +func (self *awsSdkEC2) DescribeInstances(request *ec2.DescribeInstancesInput) ([]*ec2.Instance, error) { + // Instances are paged + results := []*ec2.Instance{} var nextToken *string for { - res, err := self.ec2.DescribeInstances(&ec2.DescribeInstancesInput{ - InstanceIDs: stringPointerArray(instanceIds), - Filters: filters, - NextToken: nextToken, - }) - + response, err := self.ec2.DescribeInstances(request) if err != nil { - return nil, err + return nil, fmt.Errorf("error listing AWS instances: %v", err) } - for _, reservation := range res.Reservations { - fetchedInstances = append(fetchedInstances, reservation.Instances...) + for _, reservation := range response.Reservations { + results = append(results, reservation.Instances...) } - nextToken = res.NextToken + nextToken = response.NextToken if isNilOrEmpty(nextToken) { break } + request.NextToken = nextToken } - return fetchedInstances, nil + return results, nil } type awsSdkMetadata struct { @@ -307,36 +288,16 @@ func (self *awsSdkMetadata) GetMetaData(key string) ([]byte, error) { } // Implements EC2.DescribeSecurityGroups -func (s *awsSdkEC2) DescribeSecurityGroups(securityGroupIds []string, filterName string, filterVPCId string) ([]*ec2.SecurityGroup, error) { - filters := []*ec2.Filter{} - if filterName != "" { - filters = append(filters, newEc2Filter("group-name", filterName)) - } - if filterVPCId != "" { - filters = append(filters, newEc2Filter("vpc-id", filterVPCId)) - } - - request := &ec2.DescribeSecurityGroupsInput{} - if len(securityGroupIds) != 0 { - request.GroupIDs = []*string{} - for _, securityGroupId := range securityGroupIds { - request.GroupIDs = append(request.GroupIDs, &securityGroupId) - } - } - if len(filters) != 0 { - request.Filters = filters - } - +func (s *awsSdkEC2) DescribeSecurityGroups(request *ec2.DescribeSecurityGroupsInput) ([]*ec2.SecurityGroup, error) { + // Security groups are not paged response, err := s.ec2.DescribeSecurityGroups(request) if err != nil { - glog.Error("error describing groups: ", err) - return nil, err + return nil, fmt.Errorf("error listing AWS security groups: %v", err) } return response.SecurityGroups, nil } func (s *awsSdkEC2) AttachVolume(volumeID, instanceId, device string) (resp *ec2.VolumeAttachment, err error) { - request := ec2.AttachVolumeInput{ Device: &device, InstanceID: &instanceId, @@ -349,11 +310,28 @@ func (s *awsSdkEC2) DetachVolume(request *ec2.DetachVolumeInput) (*ec2.VolumeAtt return s.ec2.DetachVolume(request) } -func (s *awsSdkEC2) Volumes(volumeIDs []string, filter *ec2.Filter) (resp *ec2.DescribeVolumesOutput, err error) { - request := ec2.DescribeVolumesInput{ - VolumeIDs: stringPointerArray(volumeIDs), +func (s *awsSdkEC2) DescribeVolumes(request *ec2.DescribeVolumesInput) ([]*ec2.Volume, error) { + // Volumes are paged + results := []*ec2.Volume{} + var nextToken *string + + for { + response, err := s.ec2.DescribeVolumes(request) + + if err != nil { + return nil, fmt.Errorf("error listing AWS volumes: %v", err) + } + + results = append(results, response.Volumes...) + + nextToken = response.NextToken + if isNilOrEmpty(nextToken) { + break + } + request.NextToken = nextToken } - return s.ec2.DescribeVolumes(&request) + + return results, nil } func (s *awsSdkEC2) CreateVolume(request *ec2.CreateVolumeInput) (resp *ec2.Volume, err error) { @@ -365,22 +343,44 @@ func (s *awsSdkEC2) DeleteVolume(volumeID string) (resp *ec2.DeleteVolumeOutput, return s.ec2.DeleteVolume(&request) } -func (s *awsSdkEC2) DescribeVPCs(request *ec2.DescribeVPCsInput) (*ec2.DescribeVPCsOutput, error) { - return s.ec2.DescribeVPCs(request) +func (s *awsSdkEC2) DescribeVPCs(request *ec2.DescribeVPCsInput) ([]*ec2.VPC, error) { + // VPCs are not paged + response, err := s.ec2.DescribeVPCs(request) + if err != nil { + return nil, fmt.Errorf("error listing AWS VPCs: %v", err) + } + return response.VPCs, nil } -func (s *awsSdkEC2) DescribeSubnets(request *ec2.DescribeSubnetsInput) (*ec2.DescribeSubnetsOutput, error) { - return s.ec2.DescribeSubnets(request) +func (s *awsSdkEC2) DescribeSubnets(request *ec2.DescribeSubnetsInput) ([]*ec2.Subnet, error) { + // Subnets are not paged + response, err := s.ec2.DescribeSubnets(request) + if err != nil { + return nil, fmt.Errorf("error listing AWS subnets: %v", err) + } + return response.Subnets, nil } func (s *awsSdkEC2) CreateSecurityGroup(request *ec2.CreateSecurityGroupInput) (*ec2.CreateSecurityGroupOutput, error) { return s.ec2.CreateSecurityGroup(request) } +func (s *awsSdkEC2) DeleteSecurityGroup(request *ec2.DeleteSecurityGroupInput) (*ec2.DeleteSecurityGroupOutput, error) { + return s.ec2.DeleteSecurityGroup(request) +} + func (s *awsSdkEC2) AuthorizeSecurityGroupIngress(request *ec2.AuthorizeSecurityGroupIngressInput) (*ec2.AuthorizeSecurityGroupIngressOutput, error) { return s.ec2.AuthorizeSecurityGroupIngress(request) } +func (s *awsSdkEC2) RevokeSecurityGroupIngress(request *ec2.RevokeSecurityGroupIngressInput) (*ec2.RevokeSecurityGroupIngressOutput, error) { + return s.ec2.RevokeSecurityGroupIngress(request) +} + +func (s *awsSdkEC2) CreateTags(request *ec2.CreateTagsInput) (*ec2.CreateTagsOutput, error) { + return s.ec2.CreateTags(request) +} + func init() { cloudprovider.RegisterCloudProvider(ProviderName, func(config io.Reader) (cloudprovider.Interface, error) { creds := credentials.NewChainCredentials( @@ -597,10 +597,15 @@ func (aws *AWSCloud) InstanceID(name string) (string, error) { // Return the instances matching the relevant private dns name. func (s *AWSCloud) getInstanceByDnsName(name string) (*ec2.Instance, error) { - f := &ec2InstanceFilter{} - f.PrivateDNSName = name + filters := []*ec2.Filter{ + newEc2Filter("private-dns-name", name), + } + filters = s.addFilters(filters) + request := &ec2.DescribeInstancesInput{ + Filters: filters, + } - instances, err := s.ec2.Instances(nil, f) + instances, err := s.ec2.DescribeInstances(request) if err != nil { return nil, err } @@ -650,8 +655,14 @@ func isAlive(instance *ec2.Instance) bool { } // Return a list of instances matching regex string. -func (aws *AWSCloud) getInstancesByRegex(regex string) ([]string, error) { - instances, err := aws.ec2.Instances(nil, nil) +func (s *AWSCloud) getInstancesByRegex(regex string) ([]string, error) { + filters := []*ec2.Filter{} + filters = s.addFilters(filters) + request := &ec2.DescribeInstancesInput{ + Filters: filters, + } + + instances, err := s.ec2.DescribeInstances(request) if err != nil { return []string{}, err } @@ -930,9 +941,14 @@ func (self *awsInstance) getInstanceType() *awsInstanceType { // Gets the full information about this instance from the EC2 API func (self *awsInstance) getInfo() (*ec2.Instance, error) { - instances, err := self.ec2.Instances([]string{self.awsID}, nil) + instanceID := self.awsID + request := &ec2.DescribeInstancesInput{ + InstanceIDs: []*string{&instanceID}, + } + + instances, err := self.ec2.DescribeInstances(request) if err != nil { - return nil, fmt.Errorf("error querying ec2 for instance info: %v", err) + return nil, err } if len(instances) == 0 { return nil, fmt.Errorf("no instances found for instance: %s", self.awsID) @@ -1061,17 +1077,23 @@ func newAWSDisk(ec2 EC2, name string) (*awsDisk, error) { // Gets the full information about this volume from the EC2 API func (self *awsDisk) getInfo() (*ec2.Volume, error) { - resp, err := self.ec2.Volumes([]string{self.awsID}, nil) + volumeID := self.awsID + + request := &ec2.DescribeVolumesInput{ + VolumeIDs: []*string{&volumeID}, + } + + volumes, err := self.ec2.DescribeVolumes(request) if err != nil { return nil, fmt.Errorf("error querying ec2 for volume info: %v", err) } - if len(resp.Volumes) == 0 { + if len(volumes) == 0 { return nil, fmt.Errorf("no volumes found for volume: %s", self.awsID) } - if len(resp.Volumes) > 1 { + if len(volumes) > 1 { return nil, fmt.Errorf("multiple volumes found for volume: %s", self.awsID) } - return resp.Volumes[0], nil + return volumes[0], nil } func (self *awsDisk) waitForAttachmentStatus(status string) error { @@ -1259,6 +1281,9 @@ func (aws *AWSCloud) DetachDisk(instanceName string, diskName string) error { // Implements Volumes.CreateVolume func (aws *AWSCloud) CreateVolume(volumeOptions *VolumeOptions) (string, error) { + // TODO: Should we tag this with the cluster id (so it gets deleted when the cluster does?) + // This is only used for testing right now + request := &ec2.CreateVolumeInput{} request.AvailabilityZone = &aws.availabilityZone volSize := (int64(volumeOptions.CapacityMB) + 1023) / 1024 @@ -1340,20 +1365,16 @@ func (self *AWSCloud) TCPLoadBalancerExists(name, region string) (bool, error) { func (self *AWSCloud) findVPC() (*ec2.VPC, error) { request := &ec2.DescribeVPCsInput{} - // TODO: How do we want to identify our VPC? Issue #6006 name := "kubernetes-vpc" - request.Filters = []*ec2.Filter{newEc2Filter("tag:Name", name)} + filters := []*ec2.Filter{newEc2Filter("tag:Name", name)} + request.Filters = self.addFilters(filters) - response, err := self.ec2.DescribeVPCs(request) + vpcs, err := self.ec2.DescribeVPCs(request) if err != nil { glog.Error("error listing VPCs", err) return nil, err } - vpcs := response.VPCs - if err != nil { - return nil, err - } if len(vpcs) == 0 { return nil, nil } @@ -1363,71 +1384,120 @@ func (self *AWSCloud) findVPC() (*ec2.VPC, error) { return nil, fmt.Errorf("Found multiple matching VPCs for name: %s", name) } -// Makes sure the security group allows ingress on the specified ports (with sourceIp & protocol) +// Retrieves the specified security group from the AWS API, or returns nil if not found +func (s *AWSCloud) findSecurityGroup(securityGroupId string) (*ec2.SecurityGroup, error) { + describeSecurityGroupsRequest := &ec2.DescribeSecurityGroupsInput{ + GroupIDs: []*string{&securityGroupId}, + } + + groups, err := s.ec2.DescribeSecurityGroups(describeSecurityGroupsRequest) + if err != nil { + glog.Warning("error retrieving security group", err) + return nil, err + } + + if len(groups) == 0 { + return nil, nil + } + if len(groups) != 1 { + // This should not be possible - ids should be unique + return nil, fmt.Errorf("multiple security groups found with same id") + } + group := groups[0] + return group, nil +} + +func isEqualIntPointer(l, r *int64) bool { + if l == nil { + return r == nil + } + if r == nil { + return l == nil + } + return *l == *r +} +func isEqualStringPointer(l, r *string) bool { + if l == nil { + return r == nil + } + if r == nil { + return l == nil + } + return *l == *r +} + +func isEqualIPPermission(l, r *ec2.IPPermission) bool { + if !isEqualIntPointer(l.FromPort, r.FromPort) { + return false + } + if !isEqualIntPointer(l.ToPort, r.ToPort) { + return false + } + if !isEqualStringPointer(l.IPProtocol, r.IPProtocol) { + return false + } + if len(l.IPRanges) != len(r.IPRanges) { + return false + } + for j := range l.IPRanges { + if !isEqualStringPointer(l.IPRanges[j].CIDRIP, r.IPRanges[j].CIDRIP) { + return false + } + } + + if len(l.UserIDGroupPairs) != len(r.UserIDGroupPairs) { + return false + } + for j := range l.UserIDGroupPairs { + if !isEqualStringPointer(l.UserIDGroupPairs[j].GroupID, r.UserIDGroupPairs[j].GroupID) { + return false + } + if !isEqualStringPointer(l.UserIDGroupPairs[j].UserID, r.UserIDGroupPairs[j].UserID) { + return false + } + } + + return true +} + +// Makes sure the security group includes the specified permissions // Returns true iff changes were made // The security group must already exist -func (s *AWSCloud) ensureSecurityGroupIngess(securityGroupId string, sourceIp string, ports []*api.ServicePort) (bool, error) { - groups, err := s.ec2.DescribeSecurityGroups([]string{securityGroupId}, "", "") +func (s *AWSCloud) ensureSecurityGroupIngess(securityGroupId string, addPermissions []*ec2.IPPermission) (bool, error) { + group, err := s.findSecurityGroup(securityGroupId) if err != nil { glog.Warning("error retrieving security group", err) return false, err } - if len(groups) == 0 { - // We require that the security group already exist - return false, fmt.Errorf("security group not found") + if group == nil { + return false, fmt.Errorf("security group not found: %s", securityGroupId) } - if len(groups) != 1 { - // This should not be possible - ids should be unique - return false, fmt.Errorf("multiple security groups found with same id") - } - group := groups[0] - newPermissions := []*ec2.IPPermission{} - - for _, port := range ports { + changes := []*ec2.IPPermission{} + for _, addPermission := range addPermissions { found := false - portInt64 := int64(port.Port) - protocol := strings.ToLower(string(port.Protocol)) - for _, permission := range group.IPPermissions { - if permission.FromPort == nil || *permission.FromPort != portInt64 { - continue + for _, groupPermission := range group.IPPermissions { + if isEqualIPPermission(addPermission, groupPermission) { + found = true + break } - if permission.ToPort == nil || *permission.ToPort != portInt64 { - continue - } - if permission.IPProtocol == nil || *permission.IPProtocol != protocol { - continue - } - if len(permission.IPRanges) != 1 { - continue - } - if orEmpty(permission.IPRanges[0].CIDRIP) != sourceIp { - continue - } - found = true - break } if !found { - newPermission := &ec2.IPPermission{} - newPermission.FromPort = &portInt64 - newPermission.ToPort = &portInt64 - newPermission.IPRanges = []*ec2.IPRange{{CIDRIP: &sourceIp}} - newPermission.IPProtocol = &protocol - - newPermissions = append(newPermissions, newPermission) + changes = append(changes, addPermission) } } - if len(newPermissions) == 0 { + if len(changes) == 0 { return false, nil } + glog.V(2).Infof("Adding security group ingress: %s %v", securityGroupId, changes) + request := &ec2.AuthorizeSecurityGroupIngressInput{} request.GroupID = &securityGroupId - request.IPPermissions = newPermissions - + request.IPPermissions = changes _, err = s.ec2.AuthorizeSecurityGroupIngress(request) if err != nil { glog.Warning("error authorizing security group ingress", err) @@ -1437,6 +1507,111 @@ func (s *AWSCloud) ensureSecurityGroupIngess(securityGroupId string, sourceIp st return true, nil } +// Makes sure the security group no longer includes the specified permissions +// Returns true iff changes were made +// Returns true if the security group no longer exists +func (s *AWSCloud) removeSecurityGroupIngess(securityGroupId string, removePermissions []*ec2.IPPermission) (bool, error) { + group, err := s.findSecurityGroup(securityGroupId) + if err != nil { + glog.Warning("error retrieving security group", err) + return false, err + } + + if group == nil { + glog.Warning("security group not found: ", securityGroupId) + return false, nil + } + + changes := []*ec2.IPPermission{} + for _, removePermission := range removePermissions { + found := false + for _, groupPermission := range group.IPPermissions { + if isEqualIPPermission(groupPermission, removePermission) { + found = true + break + } + } + + if found { + changes = append(changes, removePermission) + } + } + + if len(changes) == 0 { + return false, nil + } + + glog.V(2).Infof("Removing security group ingress: %s %v", securityGroupId, changes) + + request := &ec2.RevokeSecurityGroupIngressInput{} + request.GroupID = &securityGroupId + request.IPPermissions = changes + _, err = s.ec2.RevokeSecurityGroupIngress(request) + if err != nil { + glog.Warning("error revoking security group ingress", err) + return false, err + } + + return true, nil +} + +// Makes sure the security group exists +// Returns the security group id or error +func (s *AWSCloud) ensureSecurityGroup(name string, description string, vpcID string) (string, error) { + request := &ec2.DescribeSecurityGroupsInput{} + filters := []*ec2.Filter{ + newEc2Filter("group-name", name), + newEc2Filter("vpc-id", vpcID), + } + request.Filters = s.addFilters(filters) + + securityGroups, err := s.ec2.DescribeSecurityGroups(request) + if err != nil { + return "", err + } + + if len(securityGroups) >= 1 { + if len(securityGroups) > 1 { + glog.Warning("Found multiple security groups with name:", name) + } + return orEmpty(securityGroups[0].GroupID), nil + } + + createRequest := &ec2.CreateSecurityGroupInput{} + createRequest.VPCID = &vpcID + createRequest.GroupName = &name + createRequest.Description = &description + + createResponse, err := s.ec2.CreateSecurityGroup(createRequest) + if err != nil { + glog.Error("error creating security group: ", err) + return "", err + } + + groupID := orEmpty(createResponse.GroupID) + if groupID == "" { + return "", fmt.Errorf("created security group, but id was not returned: %s", name) + } + + tags := []*ec2.Tag{} + for k, v := range s.filterTags { + tag := &ec2.Tag{} + tag.Key = aws.String(k) + tag.Value = aws.String(v) + tags = append(tags, tag) + } + + tagRequest := &ec2.CreateTagsInput{} + tagRequest.Resources = []*string{&groupID} + tagRequest.Tags = tags + _, err = s.ec2.CreateTags(tagRequest) + if err != nil { + // Not clear how to recover fully from this; we're OK because we don't match on tags, but that is a little odd + return "", fmt.Errorf("error tagging security group: %v", err) + } + return groupID, nil +} + // CreateTCPLoadBalancer implements TCPLoadBalancer.CreateTCPLoadBalancer // TODO(justinsb): This must be idempotent // TODO(justinsb) It is weird that these take a region. I suspect it won't work cross-region anwyay. @@ -1477,16 +1652,17 @@ func (s *AWSCloud) CreateTCPLoadBalancer(name, region string, publicIP net.IP, p request := &ec2.DescribeSubnetsInput{} filters := []*ec2.Filter{} filters = append(filters, newEc2Filter("vpc-id", orEmpty(vpc.VPCID))) + filters = s.addFilters(filters) request.Filters = filters - response, err := s.ec2.DescribeSubnets(request) + subnets, err := s.ec2.DescribeSubnets(request) if err != nil { glog.Error("error describing subnets: ", err) return nil, err } // zones := []string{} - for _, subnet := range response.Subnets { + for _, subnet := range subnets { subnetIds = append(subnetIds, subnet.SubnetID) if !strings.HasPrefix(orEmpty(subnet.AvailabilityZone), region) { glog.Error("found AZ that did not match region", orEmpty(subnet.AvailabilityZone), " vs ", region) @@ -1496,10 +1672,41 @@ func (s *AWSCloud) CreateTCPLoadBalancer(name, region string, publicIP net.IP, p } } - // Build the load balancer itself - var loadBalancerName, dnsName *string + // Create a security group for the load balancer + var securityGroupID string { - loadBalancer, err := s.describeLoadBalancer(region, name) + sgName := "k8s-elb-" + name + sgDescription := "Security group for Kubernetes ELB " + name + securityGroupID, err = s.ensureSecurityGroup(sgName, sgDescription, orEmpty(vpc.VPCID)) + if err != nil { + glog.Error("error creating load balancer security group: ", err) + return nil, err + } + + permissions := []*ec2.IPPermission{} + for _, port := range ports { + portInt64 := int64(port.Port) + protocol := strings.ToLower(string(port.Protocol)) + sourceIp := "0.0.0.0/0" + + permission := &ec2.IPPermission{} + permission.FromPort = &portInt64 + permission.ToPort = &portInt64 + permission.IPRanges = []*ec2.IPRange{{CIDRIP: &sourceIp}} + permission.IPProtocol = &protocol + + permissions = append(permissions, permission) + } + _, err = s.ensureSecurityGroupIngess(securityGroupID, permissions) + if err != nil { + return nil, err + } + } + + // Build the load balancer itself + var loadBalancer *elb.LoadBalancerDescription + { + loadBalancer, err = s.describeLoadBalancer(region, name) if err != nil { return nil, err } @@ -1529,68 +1736,36 @@ func (s *AWSCloud) CreateTCPLoadBalancer(name, region string, publicIP net.IP, p createRequest.Listeners = listeners - // TODO: Should we use a better identifier (the kubernetes uuid?) - // We are supposed to specify one subnet per AZ. // TODO: What happens if we have more than one subnet per AZ? createRequest.Subnets = subnetIds - sgName := "k8s-elb-" + name - sgDescription := "Security group for Kubernetes ELB " + name + createRequest.SecurityGroups = []*string{&securityGroupID} - { - // TODO: Should we do something more reliable ?? .Where("tag:kubernetes-id", kubernetesId) - securityGroups, err := s.ec2.DescribeSecurityGroups(nil, sgName, orEmpty(vpc.VPCID)) - if err != nil { - return nil, err - } - var securityGroupId *string - for _, securityGroup := range securityGroups { - if securityGroupId != nil { - glog.Warning("Found multiple security groups with name:", sgName) - } - securityGroupId = securityGroup.GroupID - } - if securityGroupId == nil { - createSecurityGroupRequest := &ec2.CreateSecurityGroupInput{} - createSecurityGroupRequest.VPCID = vpc.VPCID - createSecurityGroupRequest.GroupName = &sgName - createSecurityGroupRequest.Description = &sgDescription - - createSecurityGroupResponse, err := s.ec2.CreateSecurityGroup(createSecurityGroupRequest) - if err != nil { - glog.Error("error creating security group: ", err) - return nil, err - } - - securityGroupId = createSecurityGroupResponse.GroupID - if isNilOrEmpty(securityGroupId) { - return nil, fmt.Errorf("created security group, but id was not returned") - } - } - _, err = s.ensureSecurityGroupIngess(*securityGroupId, "0.0.0.0/0", ports) - if err != nil { - return nil, err - } - createRequest.SecurityGroups = []*string{securityGroupId} - } - - glog.Info("Creating load balancer with name: ", createRequest.LoadBalancerName) - createResponse, err := elbClient.CreateLoadBalancer(createRequest) + glog.Info("Creating load balancer with name: ", name) + _, err := elbClient.CreateLoadBalancer(createRequest) if err != nil { return nil, err } - dnsName = createResponse.DNSName - loadBalancerName = createRequest.LoadBalancerName + + loadBalancer, err = s.describeLoadBalancer(region, name) + if err != nil { + glog.Warning("Unable to retrieve load balancer immediately after creation") + return nil, err + } } else { // TODO: Verify that load balancer configuration matches? - dnsName = loadBalancer.DNSName - loadBalancerName = loadBalancer.LoadBalancerName } } + err = s.updateInstanceSecurityGroupsForLoadBalancer(loadBalancer, instances) + if err != nil { + glog.Warning("Error opening ingress rules for the load balancer to the instances: ", err) + return nil, err + } + registerRequest := &elb.RegisterInstancesWithLoadBalancerInput{} - registerRequest.LoadBalancerName = loadBalancerName + registerRequest.LoadBalancerName = loadBalancer.LoadBalancerName for _, instance := range instances { registerInstance := &elb.Instance{} registerInstance.InstanceID = instance.InstanceID @@ -1601,14 +1776,15 @@ func (s *AWSCloud) CreateTCPLoadBalancer(name, region string, publicIP net.IP, p if err != nil { // TODO: Is it better to delete the load balancer entirely? glog.Warningf("Error registering instances with load-balancer %s: %v", name, err) + return nil, err } glog.V(1).Infof("Updated instances registered with load-balancer %s: %v", name, registerResponse.Instances) - glog.V(1).Infof("Loadbalancer %s has DNS name %s", name, dnsName) + glog.V(1).Infof("Loadbalancer %s has DNS name %s", name, orEmpty(loadBalancer.DNSName)) // TODO: Wait for creation? - status := toStatus(loadBalancerName, dnsName) + status := toStatus(loadBalancer) return status, nil } @@ -1623,26 +1799,157 @@ func (s *AWSCloud) GetTCPLoadBalancer(name, region string) (*api.LoadBalancerSta return nil, false, nil } - status := toStatus(lb.LoadBalancerName, lb.DNSName) + status := toStatus(lb) return status, true, nil } -func toStatus(loadBalancerName *string, dnsName *string) *api.LoadBalancerStatus { +func toStatus(lb *elb.LoadBalancerDescription) *api.LoadBalancerStatus { status := &api.LoadBalancerStatus{} - if !isNilOrEmpty(dnsName) { + if !isNilOrEmpty(lb.DNSName) { var ingress api.LoadBalancerIngress - ingress.Hostname = *dnsName + ingress.Hostname = orEmpty(lb.DNSName) status.Ingress = []api.LoadBalancerIngress{ingress} } return status } +// Returns the first security group for an instance, or nil +// We only create instances with one security group, so we warn if there are multiple or none +func findSecurityGroupForInstance(instance *ec2.Instance) *string { + var securityGroupId *string + for _, securityGroup := range instance.SecurityGroups { + if securityGroup == nil || securityGroup.GroupID == nil { + // Not expected, but avoid panic + glog.Warning("Unexpected empty security group for instance: ", orEmpty(instance.InstanceID)) + continue + } + + if securityGroupId != nil { + // We create instances with one SG + glog.Warning("Multiple security groups found for instance (%s); will use first group (%s)", orEmpty(instance.InstanceID), *securityGroupId) + continue + } else { + securityGroupId = securityGroup.GroupID + } + } + + if securityGroupId == nil { + glog.Warning("No security group found for instance ", orEmpty(instance.InstanceID)) + } + + return securityGroupId +} + +// Open security group ingress rules on the instances so that the load balancer can talk to them +// Will also remove any security groups ingress rules for the load balancer that are _not_ needed for allInstances +func (s *AWSCloud) updateInstanceSecurityGroupsForLoadBalancer(lb *elb.LoadBalancerDescription, allInstances []*ec2.Instance) error { + // Determine the load balancer security group id + loadBalancerSecurityGroupId := "" + for _, securityGroup := range lb.SecurityGroups { + if isNilOrEmpty(securityGroup) { + continue + } + if loadBalancerSecurityGroupId != "" { + // We create LBs with one SG + glog.Warning("Multiple security groups for load balancer: ", orEmpty(lb.LoadBalancerName)) + } + loadBalancerSecurityGroupId = *securityGroup + } + if loadBalancerSecurityGroupId == "" { + return fmt.Errorf("Could not determine security group for load balancer: %s", orEmpty(lb.LoadBalancerName)) + } + + // Get the actual list of groups that allow ingress from the load-balancer + describeRequest := &ec2.DescribeSecurityGroupsInput{} + filters := []*ec2.Filter{} + filters = append(filters, newEc2Filter("ip-permission.group-id", loadBalancerSecurityGroupId)) + describeRequest.Filters = s.addFilters(filters) + actualGroups, err := s.ec2.DescribeSecurityGroups(describeRequest) + if err != nil { + return fmt.Errorf("error querying security groups: %v", err) + } + + // Open the firewall from the load balancer to the instance + // We don't actually have a trivial way to know in advance which security group the instance is in + // (it is probably the minion security group, but we don't easily have that). + // However, we _do_ have the list of security groups on the instance records. + + // Map containing the changes we want to make; true to add, false to remove + instanceSecurityGroupIds := map[string]bool{} + + // Scan instances for groups we want open + for _, instance := range allInstances { + securityGroupId := findSecurityGroupForInstance(instance) + if isNilOrEmpty(securityGroupId) { + glog.Warning("ignoring instance without security group: ", orEmpty(instance.InstanceID)) + continue + } + + instanceSecurityGroupIds[*securityGroupId] = true + } + + // Compare to actual groups + for _, actualGroup := range actualGroups { + if isNilOrEmpty(actualGroup.GroupID) { + glog.Warning("ignoring group without ID: ", actualGroup) + continue + } + + actualGroupID := *actualGroup.GroupID + + adding, found := instanceSecurityGroupIds[actualGroupID] + if found && adding { + // We don't need to make a change; the permission is already in place + delete(instanceSecurityGroupIds, actualGroupID) + } else { + // This group is not needed by allInstances; delete it + instanceSecurityGroupIds[actualGroupID] = false + } + } + + for instanceSecurityGroupId, add := range instanceSecurityGroupIds { + if add { + glog.V(2).Infof("Adding rule for traffic from the load balancer (%s) to instances (%s)", loadBalancerSecurityGroupId, instanceSecurityGroupId) + } else { + glog.V(2).Infof("Removing rule for traffic from the load balancer (%s) to instance (%s)", loadBalancerSecurityGroupId, instanceSecurityGroupId) + } + sourceGroupId := &ec2.UserIDGroupPair{} + sourceGroupId.GroupID = &loadBalancerSecurityGroupId + + allProtocols := "-1" + + permission := &ec2.IPPermission{} + permission.IPProtocol = &allProtocols + permission.UserIDGroupPairs = []*ec2.UserIDGroupPair{sourceGroupId} + + permissions := []*ec2.IPPermission{permission} + + if add { + changed, err := s.ensureSecurityGroupIngess(instanceSecurityGroupId, permissions) + if err != nil { + return err + } + if !changed { + glog.Warning("allowing ingress was not needed; concurrent change? groupId=", instanceSecurityGroupId) + } + } else { + changed, err := s.removeSecurityGroupIngess(instanceSecurityGroupId, permissions) + if err != nil { + return err + } + if !changed { + glog.Warning("revoking ingress was not needed; concurrent change? groupId=", instanceSecurityGroupId) + } + } + } + + return nil +} + // EnsureTCPLoadBalancerDeleted implements TCPLoadBalancer.EnsureTCPLoadBalancerDeleted. func (s *AWSCloud) EnsureTCPLoadBalancerDeleted(name, region string) error { - // TODO(justinsb): Delete security group - elbClient, err := s.getELBClient(region) if err != nil { return err @@ -1658,15 +1965,46 @@ func (s *AWSCloud) EnsureTCPLoadBalancerDeleted(name, region string) error { return nil } - request := &elb.DeleteLoadBalancerInput{} - request.LoadBalancerName = lb.LoadBalancerName - - _, err = elbClient.DeleteLoadBalancer(request) - if err != nil { - // TODO: Check if error was because load balancer was concurrently deleted - glog.Error("error deleting load balancer: ", err) - return err + { + // De-authorize the load balancer security group from the instances security group + err = s.updateInstanceSecurityGroupsForLoadBalancer(lb, nil) + if err != nil { + glog.Error("error deregistering load balancer from instance security groups: ", err) + return err + } } + + { + // Delete the load balancer itself + request := &elb.DeleteLoadBalancerInput{} + request.LoadBalancerName = lb.LoadBalancerName + + _, err = elbClient.DeleteLoadBalancer(request) + if err != nil { + // TODO: Check if error was because load balancer was concurrently deleted + glog.Error("error deleting load balancer: ", err) + return err + } + } + + { + // Delete the security group + for _, securityGroupId := range lb.SecurityGroups { + if isNilOrEmpty(securityGroupId) { + glog.Warning("Ignoring empty security group in ", name) + continue + } + + request := &ec2.DeleteSecurityGroupInput{} + request.GroupID = securityGroupId + _, err := s.ec2.DeleteSecurityGroup(request) + if err != nil { + glog.Errorf("error deleting security group (%s): %v", orEmpty(securityGroupId), err) + return err + } + } + } + return nil } @@ -1738,6 +2076,11 @@ func (s *AWSCloud) UpdateTCPLoadBalancer(name, region string, hosts []string) er } } + err = s.updateInstanceSecurityGroupsForLoadBalancer(lb, instances) + if err != nil { + return err + } + return nil } @@ -1756,3 +2099,12 @@ func (a *AWSCloud) getInstancesByDnsNames(names []string) ([]*ec2.Instance, erro } return instances, nil } + +// Add additional filters, to match on our tags +// This lets us run multiple k8s clusters in a single EC2 AZ +func (s *AWSCloud) addFilters(filters []*ec2.Filter) []*ec2.Filter { + for k, v := range s.filterTags { + filters = append(filters, newEc2Filter("tag:"+k, v)) + } + return filters +} diff --git a/pkg/cloudprovider/aws/aws_test.go b/pkg/cloudprovider/aws/aws_test.go index 7b3ccd6266a..469b8e352da 100644 --- a/pkg/cloudprovider/aws/aws_test.go +++ b/pkg/cloudprovider/aws/aws_test.go @@ -28,6 +28,7 @@ import ( "github.com/GoogleCloudPlatform/kubernetes/pkg/api" "github.com/GoogleCloudPlatform/kubernetes/pkg/api/resource" + "github.com/golang/glog" ) const TestClusterId = "clusterid.test" @@ -234,23 +235,58 @@ type FakeEC2 struct { aws *FakeAWSServices } -func contains(haystack []string, needle string) bool { +func contains(haystack []*string, needle string) bool { for _, s := range haystack { - if needle == s { + // (deliberately panic if s == nil) + if needle == *s { return true } } return false } -func (self *FakeEC2) Instances(instanceIds []string, filter *ec2InstanceFilter) (instances []*ec2.Instance, err error) { +func instanceMatchesFilter(instance *ec2.Instance, filter *ec2.Filter) bool { + name := *filter.Name + if name == "private-dns-name" { + if instance.PrivateDNSName == nil { + return false + } + return contains(filter.Values, *instance.PrivateDNSName) + } + panic("Unknown filter name: " + name) +} + +func (self *FakeEC2) DescribeInstances(request *ec2.DescribeInstancesInput) ([]*ec2.Instance, error) { matches := []*ec2.Instance{} for _, instance := range self.aws.instances { - if filter != nil && !filter.Matches(instance) { - continue + if request.InstanceIDs != nil { + if instance.InstanceID == nil { + glog.Warning("Instance with no instance id: ", instance) + continue + } + + found := false + for _, instanceId := range request.InstanceIDs { + if *instanceId == *instance.InstanceID { + found = true + break + } + } + if !found { + continue + } } - if instanceIds != nil && !contains(instanceIds, *instance.InstanceID) { - continue + if request.Filters != nil { + allMatch := true + for _, filter := range request.Filters { + if !instanceMatchesFilter(instance, filter) { + allMatch = false + break + } + } + if !allMatch { + continue + } } matches = append(matches, instance) } @@ -280,7 +316,7 @@ func (ec2 *FakeEC2) DetachVolume(request *ec2.DetachVolumeInput) (resp *ec2.Volu panic("Not implemented") } -func (ec2 *FakeEC2) Volumes(volumeIDs []string, filter *ec2.Filter) (resp *ec2.DescribeVolumesOutput, err error) { +func (ec2 *FakeEC2) DescribeVolumes(request *ec2.DescribeVolumesInput) ([]*ec2.Volume, error) { panic("Not implemented") } @@ -292,7 +328,7 @@ func (ec2 *FakeEC2) DeleteVolume(volumeID string) (resp *ec2.DeleteVolumeOutput, panic("Not implemented") } -func (ec2 *FakeEC2) DescribeSecurityGroups(groupIds []string, filterName string, filterVpcId string) ([]*ec2.SecurityGroup, error) { +func (ec2 *FakeEC2) DescribeSecurityGroups(request *ec2.DescribeSecurityGroupsInput) ([]*ec2.SecurityGroup, error) { panic("Not implemented") } @@ -300,15 +336,27 @@ func (ec2 *FakeEC2) CreateSecurityGroup(*ec2.CreateSecurityGroupInput) (*ec2.Cre panic("Not implemented") } +func (ec2 *FakeEC2) DeleteSecurityGroup(*ec2.DeleteSecurityGroupInput) (*ec2.DeleteSecurityGroupOutput, error) { + panic("Not implemented") +} + func (ec2 *FakeEC2) AuthorizeSecurityGroupIngress(*ec2.AuthorizeSecurityGroupIngressInput) (*ec2.AuthorizeSecurityGroupIngressOutput, error) { panic("Not implemented") } -func (ec2 *FakeEC2) DescribeVPCs(*ec2.DescribeVPCsInput) (*ec2.DescribeVPCsOutput, error) { +func (ec2 *FakeEC2) RevokeSecurityGroupIngress(*ec2.RevokeSecurityGroupIngressInput) (*ec2.RevokeSecurityGroupIngressOutput, error) { panic("Not implemented") } -func (ec2 *FakeEC2) DescribeSubnets(*ec2.DescribeSubnetsInput) (*ec2.DescribeSubnetsOutput, error) { +func (ec2 *FakeEC2) DescribeVPCs(*ec2.DescribeVPCsInput) ([]*ec2.VPC, error) { + panic("Not implemented") +} + +func (ec2 *FakeEC2) DescribeSubnets(*ec2.DescribeSubnetsInput) ([]*ec2.Subnet, error) { + panic("Not implemented") +} + +func (ec2 *FakeEC2) CreateTags(*ec2.CreateTagsInput) (*ec2.CreateTagsOutput, error) { panic("Not implemented") }