diff --git a/cluster/aws/util.sh b/cluster/aws/util.sh index 1c10ea70c80..116014004cc 100644 --- a/cluster/aws/util.sh +++ b/cluster/aws/util.sh @@ -44,6 +44,9 @@ INTERNAL_IP_BASE=172.20.0 MASTER_IP_SUFFIX=.9 MASTER_INTERNAL_IP=${INTERNAL_IP_BASE}${MASTER_IP_SUFFIX} +MASTER_SG_NAME="kubernetes-master-${CLUSTER_ID}" +MINION_SG_NAME="kubernetes-minion-${CLUSTER_ID}" + function json_val { python -c 'import json,sys;obj=json.load(sys.stdin);print obj'$1'' } @@ -102,6 +105,17 @@ function get_instance_private_ip { --query Reservations[].Instances[].NetworkInterfaces[0].PrivateIpAddress } +# Gets a security group id, by name ($1) +function get_security_group_id { + local name=$1 + $AWS_CMD --output text describe-security-groups \ + --filters Name=vpc-id,Values=${VPC_ID} \ + Name=group-name,Values=${name} \ + Name=tag:KubernetesCluster,Values=${CLUSTER_ID} \ + --query SecurityGroups[].GroupId \ + | tr "\t" "\n" +} + function detect-master () { KUBE_MASTER=${MASTER_NAME} if [[ -z "${KUBE_MASTER_IP-}" ]]; then @@ -132,6 +146,27 @@ function detect-minions () { fi } +function detect-security-groups { + if [[ -z "${MASTER_SG_ID-}" ]]; then + MASTER_SG_ID=$(get_security_group_id "${MASTER_SG_NAME}") + if [[ -z "${MASTER_SG_ID}" ]]; then + echo "Could not detect Kubernetes master security group. Make sure you've launched a cluster with 'kube-up.sh'" + exit 1 + else + echo "Using master security group: ${MASTER_SG_NAME} ${MASTER_SG_ID}" + fi + fi + if [[ -z "${MINION_SG_ID-}" ]]; then + MINION_SG_ID=$(get_security_group_id "${MINION_SG_NAME}") + if [[ -z "${MINION_SG_ID}" ]]; then + echo "Could not detect Kubernetes minion security group. Make sure you've launched a cluster with 'kube-up.sh'" + exit 1 + else + echo "Using minion security group: ${MINION_SG_NAME} ${MINION_SG_ID}" + fi + fi +} + # Detects the AMI to use (considering the region) # # Vars set: @@ -224,6 +259,43 @@ function import-public-key { fi } +# Robustly try to create a security group, if it does not exist. +# $1: The name of security group; will be created if not exists +# $2: Description for security group (used if created) +# +# Note that this doesn't actually return the sgid; we need to re-query +function create-security-group { + local -r name=$1 + local -r description=$2 + + local sgid=$(get_security_group_id "${name}") + if [[ -z "$sgid" ]]; then + echo "Creating security group ${name}." + sgid=$($AWS_CMD create-security-group --group-name "${name}" --description "${description}" --vpc-id "${VPC_ID}" --query GroupId --output text) + add-tag $sgid KubernetesCluster ${CLUSTER_ID} + fi +} + +# Authorize ingress to a security group. +# Attempts to be idempotent, though we end up checking the output looking for error-strings. +# $1 group-id +# $2.. arguments to pass to authorize-security-group-ingress +function authorize-security-group-ingress { + local -r sgid=$1 + shift + local ok=1 + local output="" + output=$($AWS_CMD authorize-security-group-ingress --group-id "${sgid}" $@ 2>&1) || ok=0 + if [[ ${ok} == 0 ]]; then + # Idempotency: ignore if duplicate rule + if [[ "${output}" != *"InvalidPermission.Duplicate"* ]]; then + echo "Error creating security group ingress rule" + echo "Output: ${output}" + exit 1 + fi + fi +} + # Verify prereqs function verify-prereqs { if [[ "$(which aws)" == "" ]]; then @@ -541,17 +613,39 @@ function kube-up { echo "Using Route Table $ROUTE_TABLE_ID" - SEC_GROUP_ID=$($AWS_CMD --output text describe-security-groups \ - --filters Name=vpc-id,Values=$VPC_ID \ - Name=group-name,Values=kubernetes-sec-group \ - --query SecurityGroups[].GroupId \ - | tr "\t" "\n") - - if [[ -z "$SEC_GROUP_ID" ]]; then - echo "Creating security group." - SEC_GROUP_ID=$($AWS_CMD create-security-group --group-name kubernetes-sec-group --description kubernetes-sec-group --vpc-id $VPC_ID | json_val '["GroupId"]') - $AWS_CMD authorize-security-group-ingress --group-id $SEC_GROUP_ID --protocol -1 --port all --cidr 0.0.0.0/0 > $LOG + # Create security groups + MASTER_SG_ID=$(get_security_group_id "${MASTER_SG_NAME}") + if [[ -z "${MASTER_SG_ID}" ]]; then + echo "Creating master security group." + create-security-group "${MASTER_SG_NAME}" "Kubernetes security group applied to master nodes" fi + MINION_SG_ID=$(get_security_group_id "${MINION_SG_NAME}") + if [[ -z "${MINION_SG_ID}" ]]; then + echo "Creating minion security group." + create-security-group "${MINION_SG_NAME}" "Kubernetes security group applied to minion nodes" + fi + + detect-security-groups + + # Masters can talk to master + authorize-security-group-ingress "${MASTER_SG_ID}" "--source-group ${MASTER_SG_ID} --protocol all" + + # Minions can talk to minions + authorize-security-group-ingress "${MINION_SG_ID}" "--source-group ${MINION_SG_ID} --protocol all" + + # Masters and minions can talk to each other + authorize-security-group-ingress "${MASTER_SG_ID}" "--source-group ${MINION_SG_ID} --protocol all" + authorize-security-group-ingress "${MINION_SG_ID}" "--source-group ${MASTER_SG_ID} --protocol all" + + # TODO(justinsb): Would be fairly easy to replace 0.0.0.0/0 in these rules + + # SSH is open to the world + authorize-security-group-ingress "${MASTER_SG_ID}" "--protocol tcp --port 22 --cidr 0.0.0.0/0" + authorize-security-group-ingress "${MINION_SG_ID}" "--protocol tcp --port 22 --cidr 0.0.0.0/0" + + # HTTPS to the master is allowed (for API access) + authorize-security-group-ingress "${MASTER_SG_ID}" "--protocol tcp --port 443 --cidr 0.0.0.0/0" + ( # We pipe this to the ami as a startup script in the user-data field. Requires a compatible ami echo "#! /bin/bash" @@ -595,7 +689,7 @@ function kube-up { --subnet-id $SUBNET_ID \ --private-ip-address $MASTER_INTERNAL_IP \ --key-name ${AWS_SSH_KEY_NAME} \ - --security-group-ids $SEC_GROUP_ID \ + --security-group-ids ${MASTER_SG_ID} \ --associate-public-ip-address \ --user-data file://${KUBE_TEMP}/master-start.sh | json_val '["Instances"][0]["InstanceId"]') add-tag $master_id Name $MASTER_NAME @@ -676,7 +770,7 @@ function kube-up { --subnet-id $SUBNET_ID \ --private-ip-address $INTERNAL_IP_BASE.1${i} \ --key-name ${AWS_SSH_KEY_NAME} \ - --security-group-ids $SEC_GROUP_ID \ + --security-group-ids ${MINION_SG_ID} \ ${public_ip_option} \ --user-data "file://${KUBE_TEMP}/minion-user-data-${i}" | json_val '["Instances"][0]["InstanceId"]') @@ -848,18 +942,38 @@ function kube-down { echo "Deleting VPC: ${vpc_id}" default_sg_id=$($AWS_CMD --output text describe-security-groups \ - --filters Name=vpc-id,Values=${vpc_id} Name=group-name,Values=default \ + --filters Name=vpc-id,Values=${vpc_id} \ + Name=group-name,Values=default \ --query SecurityGroups[].GroupId \ | tr "\t" "\n") sg_ids=$($AWS_CMD --output text describe-security-groups \ --filters Name=vpc-id,Values=${vpc_id} \ + Name=tag:KubernetesCluster,Values=${CLUSTER_ID} \ --query SecurityGroups[].GroupId \ | tr "\t" "\n") + # First delete any inter-security group ingress rules + # (otherwise we get dependency violations) for sg_id in ${sg_ids}; do # EC2 doesn't let us delete the default security group - if [[ "${sg_id}" != "${default_sg_id}" ]]; then - $AWS_CMD delete-security-group --group-id ${sg_id} > $LOG + if [[ "${sg_id}" == "${default_sg_id}" ]]; then + continue fi + + echo "Cleaning up security group: ${sg_id}" + other_sgids=$(aws ec2 describe-security-groups --group-id "${sg_id}" --query SecurityGroups[].IpPermissions[].UserIdGroupPairs[].GroupId --output text) + for other_sgid in ${other_sgids}; do + $AWS_CMD revoke-security-group-ingress --group-id "${sg_id}" --source-group "${other_sgid}" --protocol all > $LOG + done + done + + for sg_id in ${sg_ids}; do + # EC2 doesn't let us delete the default security group + if [[ "${sg_id}" == "${default_sg_id}" ]]; then + continue + fi + + echo "Deleting security group: ${sg_id}" + $AWS_CMD delete-security-group --group-id ${sg_id} > $LOG done subnet_ids=$($AWS_CMD --output text describe-subnets \ @@ -942,12 +1056,25 @@ function test-build-release { # Assumed vars: # Variables from config.sh function test-setup { + VPC_ID=$(get_vpc_id) + detect-security-groups + + # Open up port 80 & 8080 so common containers on minions can be reached + # TODO(roberthbailey): Remove this once we are no longer relying on hostPorts. + authorize-security-group-ingress "${MINION_SG_ID}" "--protocol tcp --port 80 --cidr 0.0.0.0/0" + authorize-security-group-ingress "${MINION_SG_ID}" "--protocol tcp --port 8080 --cidr 0.0.0.0/0" + + # Open up the NodePort range + # TODO(justinsb): Move to main setup, if we decide whether we want to do this by default. + authorize-security-group-ingress "${MINION_SG_ID}" "--protocol all --port 30000-32767 --cidr 0.0.0.0/0" + echo "test-setup complete" } # Execute after running tests to perform any required clean-up. This is called # from hack/e2e.go function test-teardown { + # (ingress rules will be deleted along with the security group) echo "Shutting down test cluster." "${KUBE_ROOT}/cluster/kube-down.sh" } diff --git a/pkg/cloudprovider/aws/aws.go b/pkg/cloudprovider/aws/aws.go index 0f5e084e744..6262f016a4e 100644 --- a/pkg/cloudprovider/aws/aws.go +++ b/pkg/cloudprovider/aws/aws.go @@ -76,11 +76,16 @@ type EC2 interface { DescribeSecurityGroups(request *ec2.DescribeSecurityGroupsInput) ([]*ec2.SecurityGroup, error) CreateSecurityGroup(*ec2.CreateSecurityGroupInput) (*ec2.CreateSecurityGroupOutput, error) + DeleteSecurityGroup(request *ec2.DeleteSecurityGroupInput) (*ec2.DeleteSecurityGroupOutput, error) + AuthorizeSecurityGroupIngress(*ec2.AuthorizeSecurityGroupIngressInput) (*ec2.AuthorizeSecurityGroupIngressOutput, error) + RevokeSecurityGroupIngress(*ec2.RevokeSecurityGroupIngressInput) (*ec2.RevokeSecurityGroupIngressOutput, error) DescribeVPCs(*ec2.DescribeVPCsInput) ([]*ec2.VPC, error) DescribeSubnets(*ec2.DescribeSubnetsInput) ([]*ec2.Subnet, error) + + CreateTags(*ec2.CreateTagsInput) (*ec2.CreateTagsOutput, error) } // This is a simple pass-through of the ELB client interface, which allows for testing @@ -360,10 +365,22 @@ func (s *awsSdkEC2) CreateSecurityGroup(request *ec2.CreateSecurityGroupInput) ( return s.ec2.CreateSecurityGroup(request) } +func (s *awsSdkEC2) DeleteSecurityGroup(request *ec2.DeleteSecurityGroupInput) (*ec2.DeleteSecurityGroupOutput, error) { + return s.ec2.DeleteSecurityGroup(request) +} + func (s *awsSdkEC2) AuthorizeSecurityGroupIngress(request *ec2.AuthorizeSecurityGroupIngressInput) (*ec2.AuthorizeSecurityGroupIngressOutput, error) { return s.ec2.AuthorizeSecurityGroupIngress(request) } +func (s *awsSdkEC2) RevokeSecurityGroupIngress(request *ec2.RevokeSecurityGroupIngressInput) (*ec2.RevokeSecurityGroupIngressOutput, error) { + return s.ec2.RevokeSecurityGroupIngress(request) +} + +func (s *awsSdkEC2) CreateTags(request *ec2.CreateTagsInput) (*ec2.CreateTagsOutput, error) { + return s.ec2.CreateTags(request) +} + func init() { cloudprovider.RegisterCloudProvider(ProviderName, func(config io.Reader) (cloudprovider.Interface, error) { creds := credentials.NewChainCredentials( @@ -1367,77 +1384,120 @@ func (self *AWSCloud) findVPC() (*ec2.VPC, error) { return nil, fmt.Errorf("Found multiple matching VPCs for name: %s", name) } -// Makes sure the security group allows ingress on the specified ports (with sourceIp & protocol) -// Returns true iff changes were made -// The security group must already exist -func (s *AWSCloud) ensureSecurityGroupIngess(securityGroupId string, sourceIp string, ports []*api.ServicePort) (bool, error) { +// Retrieves the specified security group from the AWS API, or returns nil if not found +func (s *AWSCloud) findSecurityGroup(securityGroupId string) (*ec2.SecurityGroup, error) { describeSecurityGroupsRequest := &ec2.DescribeSecurityGroupsInput{ GroupIDs: []*string{&securityGroupId}, } - // No filters as querying by id + groups, err := s.ec2.DescribeSecurityGroups(describeSecurityGroupsRequest) + if err != nil { + glog.Warning("error retrieving security group", err) + return nil, err + } + + if len(groups) == 0 { + return nil, nil + } + if len(groups) != 1 { + // This should not be possible - ids should be unique + return nil, fmt.Errorf("multiple security groups found with same id") + } + group := groups[0] + return group, nil +} + +func isEqualIntPointer(l, r *int64) bool { + if l == nil { + return r == nil + } + if r == nil { + return l == nil + } + return *l == *r +} +func isEqualStringPointer(l, r *string) bool { + if l == nil { + return r == nil + } + if r == nil { + return l == nil + } + return *l == *r +} + +func isEqualIPPermission(l, r *ec2.IPPermission) bool { + if !isEqualIntPointer(l.FromPort, r.FromPort) { + return false + } + if !isEqualIntPointer(l.ToPort, r.ToPort) { + return false + } + if !isEqualStringPointer(l.IPProtocol, r.IPProtocol) { + return false + } + if len(l.IPRanges) != len(r.IPRanges) { + return false + } + for j := range l.IPRanges { + if !isEqualStringPointer(l.IPRanges[j].CIDRIP, r.IPRanges[j].CIDRIP) { + return false + } + } + + if len(l.UserIDGroupPairs) != len(r.UserIDGroupPairs) { + return false + } + for j := range l.UserIDGroupPairs { + if !isEqualStringPointer(l.UserIDGroupPairs[j].GroupID, r.UserIDGroupPairs[j].GroupID) { + return false + } + if !isEqualStringPointer(l.UserIDGroupPairs[j].UserID, r.UserIDGroupPairs[j].UserID) { + return false + } + } + + return true +} + +// Makes sure the security group includes the specified permissions +// Returns true iff changes were made +// The security group must already exist +func (s *AWSCloud) ensureSecurityGroupIngess(securityGroupId string, addPermissions []*ec2.IPPermission) (bool, error) { + group, err := s.findSecurityGroup(securityGroupId) if err != nil { glog.Warning("error retrieving security group", err) return false, err } - if len(groups) == 0 { - // We require that the security group already exist - return false, fmt.Errorf("security group not found") + if group == nil { + return false, fmt.Errorf("security group not found: %s", securityGroupId) } - if len(groups) != 1 { - // This should not be possible - ids should be unique - return false, fmt.Errorf("multiple security groups found with same id") - } - group := groups[0] - newPermissions := []*ec2.IPPermission{} - - for _, port := range ports { + changes := []*ec2.IPPermission{} + for _, addPermission := range addPermissions { found := false - portInt64 := int64(port.Port) - protocol := strings.ToLower(string(port.Protocol)) - for _, permission := range group.IPPermissions { - if permission.FromPort == nil || *permission.FromPort != portInt64 { - continue + for _, groupPermission := range group.IPPermissions { + if isEqualIPPermission(addPermission, groupPermission) { + found = true + break } - if permission.ToPort == nil || *permission.ToPort != portInt64 { - continue - } - if permission.IPProtocol == nil || *permission.IPProtocol != protocol { - continue - } - if len(permission.IPRanges) != 1 { - continue - } - if orEmpty(permission.IPRanges[0].CIDRIP) != sourceIp { - continue - } - found = true - break } if !found { - newPermission := &ec2.IPPermission{} - newPermission.FromPort = &portInt64 - newPermission.ToPort = &portInt64 - newPermission.IPRanges = []*ec2.IPRange{{CIDRIP: &sourceIp}} - newPermission.IPProtocol = &protocol - - newPermissions = append(newPermissions, newPermission) + changes = append(changes, addPermission) } } - if len(newPermissions) == 0 { + if len(changes) == 0 { return false, nil } - glog.V(2).Infof("Adding security group ingress: %s %v", securityGroupId, newPermissions) + glog.V(2).Infof("Adding security group ingress: %s %v", securityGroupId, changes) request := &ec2.AuthorizeSecurityGroupIngressInput{} request.GroupID = &securityGroupId - request.IPPermissions = newPermissions - + request.IPPermissions = changes _, err = s.ec2.AuthorizeSecurityGroupIngress(request) if err != nil { glog.Warning("error authorizing security group ingress", err) @@ -1447,6 +1507,111 @@ func (s *AWSCloud) ensureSecurityGroupIngess(securityGroupId string, sourceIp st return true, nil } +// Makes sure the security group no longer includes the specified permissions +// Returns true iff changes were made +// Returns true if the security group no longer exists +func (s *AWSCloud) removeSecurityGroupIngess(securityGroupId string, removePermissions []*ec2.IPPermission) (bool, error) { + group, err := s.findSecurityGroup(securityGroupId) + if err != nil { + glog.Warning("error retrieving security group", err) + return false, err + } + + if group == nil { + glog.Warning("security group not found: ", securityGroupId) + return false, nil + } + + changes := []*ec2.IPPermission{} + for _, removePermission := range removePermissions { + found := false + for _, groupPermission := range group.IPPermissions { + if isEqualIPPermission(groupPermission, removePermission) { + found = true + break + } + } + + if found { + changes = append(changes, removePermission) + } + } + + if len(changes) == 0 { + return false, nil + } + + glog.V(2).Infof("Removing security group ingress: %s %v", securityGroupId, changes) + + request := &ec2.RevokeSecurityGroupIngressInput{} + request.GroupID = &securityGroupId + request.IPPermissions = changes + _, err = s.ec2.RevokeSecurityGroupIngress(request) + if err != nil { + glog.Warning("error revoking security group ingress", err) + return false, err + } + + return true, nil +} + +// Makes sure the security group exists +// Returns the security group id or error +func (s *AWSCloud) ensureSecurityGroup(name string, description string, vpcID string) (string, error) { + request := &ec2.DescribeSecurityGroupsInput{} + filters := []*ec2.Filter{ + newEc2Filter("group-name", name), + newEc2Filter("vpc-id", vpcID), + } + request.Filters = s.addFilters(filters) + + securityGroups, err := s.ec2.DescribeSecurityGroups(request) + if err != nil { + return "", err + } + + if len(securityGroups) >= 1 { + if len(securityGroups) > 1 { + glog.Warning("Found multiple security groups with name:", name) + } + return orEmpty(securityGroups[0].GroupID), nil + } + + createRequest := &ec2.CreateSecurityGroupInput{} + createRequest.VPCID = &vpcID + createRequest.GroupName = &name + createRequest.Description = &description + + createResponse, err := s.ec2.CreateSecurityGroup(createRequest) + if err != nil { + glog.Error("error creating security group: ", err) + return "", err + } + + groupID := orEmpty(createResponse.GroupID) + if groupID == "" { + return "", fmt.Errorf("created security group, but id was not returned: %s", name) + } + + tags := []*ec2.Tag{} + for k, v := range s.filterTags { + tag := &ec2.Tag{} + tag.Key = aws.String(k) + tag.Value = aws.String(v) + tags = append(tags, tag) + } + + tagRequest := &ec2.CreateTagsInput{} + tagRequest.Resources = []*string{&groupID} + tagRequest.Tags = tags + _, err = s.ec2.CreateTags(tagRequest) + if err != nil { + // Not clear how to recover fully from this; we're OK because we don't match on tags, but that is a little odd + return "", fmt.Errorf("error tagging security group: %v", err) + } + return groupID, nil +} + // CreateTCPLoadBalancer implements TCPLoadBalancer.CreateTCPLoadBalancer // TODO(justinsb): This must be idempotent // TODO(justinsb) It is weird that these take a region. I suspect it won't work cross-region anwyay. @@ -1507,6 +1672,37 @@ func (s *AWSCloud) CreateTCPLoadBalancer(name, region string, publicIP net.IP, p } } + // Create a security group for the load balancer + var securityGroupID string + { + sgName := "k8s-elb-" + name + sgDescription := "Security group for Kubernetes ELB " + name + securityGroupID, err = s.ensureSecurityGroup(sgName, sgDescription, orEmpty(vpc.VPCID)) + if err != nil { + glog.Error("error creating load balancer security group: ", err) + return nil, err + } + + permissions := []*ec2.IPPermission{} + for _, port := range ports { + portInt64 := int64(port.Port) + protocol := strings.ToLower(string(port.Protocol)) + sourceIp := "0.0.0.0/0" + + permission := &ec2.IPPermission{} + permission.FromPort = &portInt64 + permission.ToPort = &portInt64 + permission.IPRanges = []*ec2.IPRange{{CIDRIP: &sourceIp}} + permission.IPProtocol = &protocol + + permissions = append(permissions, permission) + } + _, err = s.ensureSecurityGroupIngess(securityGroupID, permissions) + if err != nil { + return nil, err + } + } + // Build the load balancer itself var loadBalancer *elb.LoadBalancerDescription { @@ -1544,49 +1740,12 @@ func (s *AWSCloud) CreateTCPLoadBalancer(name, region string, publicIP net.IP, p // TODO: What happens if we have more than one subnet per AZ? createRequest.Subnets = subnetIds - sgName := "k8s-elb-" + name - sgDescription := "Security group for Kubernetes ELB " + name + createRequest.SecurityGroups = []*string{&securityGroupID} - { - describeSecurityGroupsRequest := &ec2.DescribeSecurityGroupsInput{} - filters := []*ec2.Filter{ - newEc2Filter("group-name", sgName), - newEc2Filter("vpc-id", orEmpty(vpc.VPCID)), - } - describeSecurityGroupsRequest.Filters = s.addFilters(filters) - securityGroups, err := s.ec2.DescribeSecurityGroups(describeSecurityGroupsRequest) - if err != nil { - return nil, err - } - var securityGroupId *string - for _, securityGroup := range securityGroups { - if securityGroupId != nil { - glog.Warning("Found multiple security groups with name:", sgName) - } - securityGroupId = securityGroup.GroupID - } - if securityGroupId == nil { - createSecurityGroupRequest := &ec2.CreateSecurityGroupInput{} - createSecurityGroupRequest.VPCID = vpc.VPCID - createSecurityGroupRequest.GroupName = &sgName - createSecurityGroupRequest.Description = &sgDescription - - createSecurityGroupResponse, err := s.ec2.CreateSecurityGroup(createSecurityGroupRequest) - if err != nil { - glog.Error("error creating security group: ", err) - return nil, err - } - - securityGroupId = createSecurityGroupResponse.GroupID - if isNilOrEmpty(securityGroupId) { - return nil, fmt.Errorf("created security group, but id was not returned") - } - } - _, err = s.ensureSecurityGroupIngess(*securityGroupId, "0.0.0.0/0", ports) - if err != nil { - return nil, err - } - createRequest.SecurityGroups = []*string{securityGroupId} + glog.Info("Creating load balancer with name: ", name) + _, err := elbClient.CreateLoadBalancer(createRequest) + if err != nil { + return nil, err } loadBalancer, err = s.describeLoadBalancer(region, name) @@ -1599,6 +1758,12 @@ func (s *AWSCloud) CreateTCPLoadBalancer(name, region string, publicIP net.IP, p } } + err = s.updateInstanceSecurityGroupsForLoadBalancer(loadBalancer, instances) + if err != nil { + glog.Warning("Error opening ingress rules for the load balancer to the instances: ", err) + return nil, err + } + registerRequest := &elb.RegisterInstancesWithLoadBalancerInput{} registerRequest.LoadBalancerName = loadBalancer.LoadBalancerName for _, instance := range instances { @@ -1650,10 +1815,141 @@ func toStatus(lb *elb.LoadBalancerDescription) *api.LoadBalancerStatus { return status } +// Returns the first security group for an instance, or nil +// We only create instances with one security group, so we warn if there are multiple or none +func findSecurityGroupForInstance(instance *ec2.Instance) *string { + var securityGroupId *string + for _, securityGroup := range instance.SecurityGroups { + if securityGroup == nil || securityGroup.GroupID == nil { + // Not expected, but avoid panic + glog.Warning("Unexpected empty security group for instance: ", orEmpty(instance.InstanceID)) + continue + } + + if securityGroupId != nil { + // We create instances with one SG + glog.Warning("Multiple security groups found for instance (%s); will use first group (%s)", orEmpty(instance.InstanceID), *securityGroupId) + continue + } else { + securityGroupId = securityGroup.GroupID + } + } + + if securityGroupId == nil { + glog.Warning("No security group found for instance ", orEmpty(instance.InstanceID)) + } + + return securityGroupId +} + +// Open security group ingress rules on the instances so that the load balancer can talk to them +// Will also remove any security groups ingress rules for the load balancer that are _not_ needed for allInstances +func (s *AWSCloud) updateInstanceSecurityGroupsForLoadBalancer(lb *elb.LoadBalancerDescription, allInstances []*ec2.Instance) error { + // Determine the load balancer security group id + loadBalancerSecurityGroupId := "" + for _, securityGroup := range lb.SecurityGroups { + if isNilOrEmpty(securityGroup) { + continue + } + if loadBalancerSecurityGroupId != "" { + // We create LBs with one SG + glog.Warning("Multiple security groups for load balancer: ", orEmpty(lb.LoadBalancerName)) + } + loadBalancerSecurityGroupId = *securityGroup + } + if loadBalancerSecurityGroupId == "" { + return fmt.Errorf("Could not determine security group for load balancer: %s", orEmpty(lb.LoadBalancerName)) + } + + // Get the actual list of groups that allow ingress from the load-balancer + describeRequest := &ec2.DescribeSecurityGroupsInput{} + filters := []*ec2.Filter{} + filters = append(filters, newEc2Filter("ip-permission.group-id", loadBalancerSecurityGroupId)) + describeRequest.Filters = s.addFilters(filters) + actualGroups, err := s.ec2.DescribeSecurityGroups(describeRequest) + if err != nil { + return fmt.Errorf("error querying security groups: %v", err) + } + + // Open the firewall from the load balancer to the instance + // We don't actually have a trivial way to know in advance which security group the instance is in + // (it is probably the minion security group, but we don't easily have that). + // However, we _do_ have the list of security groups on the instance records. + + // Map containing the changes we want to make; true to add, false to remove + instanceSecurityGroupIds := map[string]bool{} + + // Scan instances for groups we want open + for _, instance := range allInstances { + securityGroupId := findSecurityGroupForInstance(instance) + if isNilOrEmpty(securityGroupId) { + glog.Warning("ignoring instance without security group: ", orEmpty(instance.InstanceID)) + continue + } + + instanceSecurityGroupIds[*securityGroupId] = true + } + + // Compare to actual groups + for _, actualGroup := range actualGroups { + if isNilOrEmpty(actualGroup.GroupID) { + glog.Warning("ignoring group without ID: ", actualGroup) + continue + } + + actualGroupID := *actualGroup.GroupID + + adding, found := instanceSecurityGroupIds[actualGroupID] + if found && adding { + // We don't need to make a change; the permission is already in place + delete(instanceSecurityGroupIds, actualGroupID) + } else { + // This group is not needed by allInstances; delete it + instanceSecurityGroupIds[actualGroupID] = false + } + } + + for instanceSecurityGroupId, add := range instanceSecurityGroupIds { + if add { + glog.V(2).Infof("Adding rule for traffic from the load balancer (%s) to instances (%s)", loadBalancerSecurityGroupId, instanceSecurityGroupId) + } else { + glog.V(2).Infof("Removing rule for traffic from the load balancer (%s) to instance (%s)", loadBalancerSecurityGroupId, instanceSecurityGroupId) + } + sourceGroupId := &ec2.UserIDGroupPair{} + sourceGroupId.GroupID = &loadBalancerSecurityGroupId + + allProtocols := "-1" + + permission := &ec2.IPPermission{} + permission.IPProtocol = &allProtocols + permission.UserIDGroupPairs = []*ec2.UserIDGroupPair{sourceGroupId} + + permissions := []*ec2.IPPermission{permission} + + if add { + changed, err := s.ensureSecurityGroupIngess(instanceSecurityGroupId, permissions) + if err != nil { + return err + } + if !changed { + glog.Warning("allowing ingress was not needed; concurrent change? groupId=", instanceSecurityGroupId) + } + } else { + changed, err := s.removeSecurityGroupIngess(instanceSecurityGroupId, permissions) + if err != nil { + return err + } + if !changed { + glog.Warning("revoking ingress was not needed; concurrent change? groupId=", instanceSecurityGroupId) + } + } + } + + return nil +} + // EnsureTCPLoadBalancerDeleted implements TCPLoadBalancer.EnsureTCPLoadBalancerDeleted. func (s *AWSCloud) EnsureTCPLoadBalancerDeleted(name, region string) error { - // TODO(justinsb): Delete security group - elbClient, err := s.getELBClient(region) if err != nil { return err @@ -1669,15 +1965,46 @@ func (s *AWSCloud) EnsureTCPLoadBalancerDeleted(name, region string) error { return nil } - request := &elb.DeleteLoadBalancerInput{} - request.LoadBalancerName = lb.LoadBalancerName - - _, err = elbClient.DeleteLoadBalancer(request) - if err != nil { - // TODO: Check if error was because load balancer was concurrently deleted - glog.Error("error deleting load balancer: ", err) - return err + { + // De-authorize the load balancer security group from the instances security group + err = s.updateInstanceSecurityGroupsForLoadBalancer(lb, nil) + if err != nil { + glog.Error("error deregistering load balancer from instance security groups: ", err) + return err + } } + + { + // Delete the load balancer itself + request := &elb.DeleteLoadBalancerInput{} + request.LoadBalancerName = lb.LoadBalancerName + + _, err = elbClient.DeleteLoadBalancer(request) + if err != nil { + // TODO: Check if error was because load balancer was concurrently deleted + glog.Error("error deleting load balancer: ", err) + return err + } + } + + { + // Delete the security group + for _, securityGroupId := range lb.SecurityGroups { + if isNilOrEmpty(securityGroupId) { + glog.Warning("Ignoring empty security group in ", name) + continue + } + + request := &ec2.DeleteSecurityGroupInput{} + request.GroupID = securityGroupId + _, err := s.ec2.DeleteSecurityGroup(request) + if err != nil { + glog.Errorf("error deleting security group (%s): %v", orEmpty(securityGroupId), err) + return err + } + } + } + return nil } @@ -1749,6 +2076,11 @@ func (s *AWSCloud) UpdateTCPLoadBalancer(name, region string, hosts []string) er } } + err = s.updateInstanceSecurityGroupsForLoadBalancer(lb, instances) + if err != nil { + return err + } + return nil } diff --git a/pkg/cloudprovider/aws/aws_test.go b/pkg/cloudprovider/aws/aws_test.go index 108c3055c98..469b8e352da 100644 --- a/pkg/cloudprovider/aws/aws_test.go +++ b/pkg/cloudprovider/aws/aws_test.go @@ -336,10 +336,18 @@ func (ec2 *FakeEC2) CreateSecurityGroup(*ec2.CreateSecurityGroupInput) (*ec2.Cre panic("Not implemented") } +func (ec2 *FakeEC2) DeleteSecurityGroup(*ec2.DeleteSecurityGroupInput) (*ec2.DeleteSecurityGroupOutput, error) { + panic("Not implemented") +} + func (ec2 *FakeEC2) AuthorizeSecurityGroupIngress(*ec2.AuthorizeSecurityGroupIngressInput) (*ec2.AuthorizeSecurityGroupIngressOutput, error) { panic("Not implemented") } +func (ec2 *FakeEC2) RevokeSecurityGroupIngress(*ec2.RevokeSecurityGroupIngressInput) (*ec2.RevokeSecurityGroupIngressOutput, error) { + panic("Not implemented") +} + func (ec2 *FakeEC2) DescribeVPCs(*ec2.DescribeVPCsInput) ([]*ec2.VPC, error) { panic("Not implemented") } @@ -348,6 +356,10 @@ func (ec2 *FakeEC2) DescribeSubnets(*ec2.DescribeSubnetsInput) ([]*ec2.Subnet, e panic("Not implemented") } +func (ec2 *FakeEC2) CreateTags(*ec2.CreateTagsInput) (*ec2.CreateTagsOutput, error) { + panic("Not implemented") +} + type FakeELB struct { aws *FakeAWSServices }