AWS: Filter by Cluster tag, rationalize EC2 abstraction

Whenever we do a list we now filter on tags so we only see resources relating
to our cluster.

Also, rationalize all the DescribeX calls:
 * They all take a request object (so that we can pass filters)
 * They do paging if that is required (and return the underlying resources)
 * They wrap any error with a "error while listing X: %v" message
This commit is contained in:
Justin Santa Barbara 2015-06-04 22:03:50 -04:00
parent cd3eea43db
commit 33a3d884f2
3 changed files with 182 additions and 124 deletions

View File

@ -508,12 +508,13 @@ function kube-up {
echo "Using VPC $VPC_ID" echo "Using VPC $VPC_ID"
SUBNET_ID=$($AWS_CMD describe-subnets | get_subnet_id $VPC_ID $ZONE) SUBNET_ID=$($AWS_CMD describe-subnets --filters Name=tag:KubernetesCluster,Values=${CLUSTER_ID} | get_subnet_id $VPC_ID $ZONE)
if [[ -z "$SUBNET_ID" ]]; then if [[ -z "$SUBNET_ID" ]]; then
echo "Creating subnet." echo "Creating subnet."
SUBNET_ID=$($AWS_CMD create-subnet --cidr-block $INTERNAL_IP_BASE.0/24 --vpc-id $VPC_ID --availability-zone ${ZONE} | json_val '["Subnet"]["SubnetId"]') SUBNET_ID=$($AWS_CMD create-subnet --cidr-block $INTERNAL_IP_BASE.0/24 --vpc-id $VPC_ID --availability-zone ${ZONE} | json_val '["Subnet"]["SubnetId"]')
add-tag $SUBNET_ID KubernetesCluster ${CLUSTER_ID}
else else
EXISTING_CIDR=$($AWS_CMD describe-subnets | get_cidr $VPC_ID $ZONE) EXISTING_CIDR=$($AWS_CMD describe-subnets --filters Name=tag:KubernetesCluster,Values=${CLUSTER_ID} | get_cidr $VPC_ID $ZONE)
echo "Using existing CIDR $EXISTING_CIDR" echo "Using existing CIDR $EXISTING_CIDR"
INTERNAL_IP_BASE=${EXISTING_CIDR%.*} INTERNAL_IP_BASE=${EXISTING_CIDR%.*}
MASTER_INTERNAL_IP=${INTERNAL_IP_BASE}${MASTER_IP_SUFFIX} MASTER_INTERNAL_IP=${INTERNAL_IP_BASE}${MASTER_IP_SUFFIX}
@ -863,6 +864,7 @@ function kube-down {
subnet_ids=$($AWS_CMD --output text describe-subnets \ subnet_ids=$($AWS_CMD --output text describe-subnets \
--filters Name=vpc-id,Values=${vpc_id} \ --filters Name=vpc-id,Values=${vpc_id} \
Name=tag:KubernetesCluster,Values=${CLUSTER_ID} \
--query Subnets[].SubnetId \ --query Subnets[].SubnetId \
| tr "\t" "\n") | tr "\t" "\n")
for subnet_id in ${subnet_ids}; do for subnet_id in ${subnet_ids}; do

View File

@ -57,30 +57,30 @@ type AWSServices interface {
// TODO: Should we rename this to AWS (EBS & ELB are not technically part of EC2) // TODO: Should we rename this to AWS (EBS & ELB are not technically part of EC2)
// Abstraction over EC2, to allow mocking/other implementations // Abstraction over EC2, to allow mocking/other implementations
// Note that the DescribeX functions return a list, so callers don't need to deal with paging
type EC2 interface { type EC2 interface {
// Query EC2 for instances matching the filter // Query EC2 for instances matching the filter
Instances(instanceIds []string, filter *ec2InstanceFilter) (instances []*ec2.Instance, err error) DescribeInstances(request *ec2.DescribeInstancesInput) ([]*ec2.Instance, error)
// Attach a volume to an instance // Attach a volume to an instance
AttachVolume(volumeID, instanceId, mountDevice string) (resp *ec2.VolumeAttachment, err error) AttachVolume(volumeID, instanceId, mountDevice string) (resp *ec2.VolumeAttachment, err error)
// Detach a volume from an instance it is attached to // Detach a volume from an instance it is attached to
DetachVolume(request *ec2.DetachVolumeInput) (resp *ec2.VolumeAttachment, err error) DetachVolume(request *ec2.DetachVolumeInput) (resp *ec2.VolumeAttachment, err error)
// Lists volumes // Lists volumes
Volumes(volumeIDs []string, filter *ec2.Filter) (resp *ec2.DescribeVolumesOutput, err error) DescribeVolumes(request *ec2.DescribeVolumesInput) ([]*ec2.Volume, error)
// Create an EBS volume // Create an EBS volume
CreateVolume(request *ec2.CreateVolumeInput) (resp *ec2.Volume, err error) CreateVolume(request *ec2.CreateVolumeInput) (resp *ec2.Volume, err error)
// Delete an EBS volume // Delete an EBS volume
DeleteVolume(volumeID string) (resp *ec2.DeleteVolumeOutput, err error) DeleteVolume(volumeID string) (resp *ec2.DeleteVolumeOutput, err error)
DescribeSecurityGroups(groupIds []string, filterName string, filterVPCId string) ([]*ec2.SecurityGroup, error) DescribeSecurityGroups(request *ec2.DescribeSecurityGroupsInput) ([]*ec2.SecurityGroup, error)
// TODO(justinsb): Make all of these into pass-through methods, now that we have a much better binding
CreateSecurityGroup(*ec2.CreateSecurityGroupInput) (*ec2.CreateSecurityGroupOutput, error) CreateSecurityGroup(*ec2.CreateSecurityGroupInput) (*ec2.CreateSecurityGroupOutput, error)
AuthorizeSecurityGroupIngress(*ec2.AuthorizeSecurityGroupIngressInput) (*ec2.AuthorizeSecurityGroupIngressOutput, error) AuthorizeSecurityGroupIngress(*ec2.AuthorizeSecurityGroupIngressInput) (*ec2.AuthorizeSecurityGroupIngressOutput, error)
DescribeVPCs(*ec2.DescribeVPCsInput) (*ec2.DescribeVPCsOutput, error) DescribeVPCs(*ec2.DescribeVPCsInput) ([]*ec2.VPC, error)
DescribeSubnets(*ec2.DescribeSubnetsInput) (*ec2.DescribeSubnetsOutput, error) DescribeSubnets(*ec2.DescribeSubnetsInput) ([]*ec2.Subnet, error)
} }
// This is a simple pass-through of the ELB client interface, which allows for testing // This is a simple pass-through of the ELB client interface, which allows for testing
@ -145,20 +145,6 @@ type AWSCloudConfig struct {
} }
} }
// Similar to ec2.Filter, but the filter values can be read from tests
// (ec2.Filter only has private members)
type ec2InstanceFilter struct {
PrivateDNSName string
}
// True if the passed instance matches the filter
func (f *ec2InstanceFilter) Matches(instance *ec2.Instance) bool {
if f.PrivateDNSName != "" && orEmpty(instance.PrivateDNSName) != f.PrivateDNSName {
return false
}
return true
}
// awsSdkEC2 is an implementation of the EC2 interface, backed by aws-sdk-go // awsSdkEC2 is an implementation of the EC2 interface, backed by aws-sdk-go
type awsSdkEC2 struct { type awsSdkEC2 struct {
ec2 *ec2.EC2 ec2 *ec2.EC2
@ -240,39 +226,29 @@ func newEc2Filter(name string, value string) *ec2.Filter {
} }
// Implementation of EC2.Instances // Implementation of EC2.Instances
func (self *awsSdkEC2) Instances(instanceIds []string, filter *ec2InstanceFilter) (resp []*ec2.Instance, err error) { func (self *awsSdkEC2) DescribeInstances(request *ec2.DescribeInstancesInput) ([]*ec2.Instance, error) {
var filters []*ec2.Filter // Instances are paged
if filter != nil && filter.PrivateDNSName != "" { results := []*ec2.Instance{}
filters = []*ec2.Filter{
newEc2Filter("private-dns-name", filter.PrivateDNSName),
}
}
fetchedInstances := []*ec2.Instance{}
var nextToken *string var nextToken *string
for { for {
res, err := self.ec2.DescribeInstances(&ec2.DescribeInstancesInput{ response, err := self.ec2.DescribeInstances(request)
InstanceIDs: stringPointerArray(instanceIds),
Filters: filters,
NextToken: nextToken,
})
if err != nil { if err != nil {
return nil, err return nil, fmt.Errorf("error listing AWS instances: %v", err)
} }
for _, reservation := range res.Reservations { for _, reservation := range response.Reservations {
fetchedInstances = append(fetchedInstances, reservation.Instances...) results = append(results, reservation.Instances...)
} }
nextToken = res.NextToken nextToken = response.NextToken
if isNilOrEmpty(nextToken) { if isNilOrEmpty(nextToken) {
break break
} }
request.NextToken = nextToken
} }
return fetchedInstances, nil return results, nil
} }
type awsSdkMetadata struct { type awsSdkMetadata struct {
@ -307,36 +283,16 @@ func (self *awsSdkMetadata) GetMetaData(key string) ([]byte, error) {
} }
// Implements EC2.DescribeSecurityGroups // Implements EC2.DescribeSecurityGroups
func (s *awsSdkEC2) DescribeSecurityGroups(securityGroupIds []string, filterName string, filterVPCId string) ([]*ec2.SecurityGroup, error) { func (s *awsSdkEC2) DescribeSecurityGroups(request *ec2.DescribeSecurityGroupsInput) ([]*ec2.SecurityGroup, error) {
filters := []*ec2.Filter{} // Security groups are not paged
if filterName != "" {
filters = append(filters, newEc2Filter("group-name", filterName))
}
if filterVPCId != "" {
filters = append(filters, newEc2Filter("vpc-id", filterVPCId))
}
request := &ec2.DescribeSecurityGroupsInput{}
if len(securityGroupIds) != 0 {
request.GroupIDs = []*string{}
for _, securityGroupId := range securityGroupIds {
request.GroupIDs = append(request.GroupIDs, &securityGroupId)
}
}
if len(filters) != 0 {
request.Filters = filters
}
response, err := s.ec2.DescribeSecurityGroups(request) response, err := s.ec2.DescribeSecurityGroups(request)
if err != nil { if err != nil {
glog.Error("error describing groups: ", err) return nil, fmt.Errorf("error listing AWS security groups: %v", err)
return nil, err
} }
return response.SecurityGroups, nil return response.SecurityGroups, nil
} }
func (s *awsSdkEC2) AttachVolume(volumeID, instanceId, device string) (resp *ec2.VolumeAttachment, err error) { func (s *awsSdkEC2) AttachVolume(volumeID, instanceId, device string) (resp *ec2.VolumeAttachment, err error) {
request := ec2.AttachVolumeInput{ request := ec2.AttachVolumeInput{
Device: &device, Device: &device,
InstanceID: &instanceId, InstanceID: &instanceId,
@ -349,11 +305,28 @@ func (s *awsSdkEC2) DetachVolume(request *ec2.DetachVolumeInput) (*ec2.VolumeAtt
return s.ec2.DetachVolume(request) return s.ec2.DetachVolume(request)
} }
func (s *awsSdkEC2) Volumes(volumeIDs []string, filter *ec2.Filter) (resp *ec2.DescribeVolumesOutput, err error) { func (s *awsSdkEC2) DescribeVolumes(request *ec2.DescribeVolumesInput) ([]*ec2.Volume, error) {
request := ec2.DescribeVolumesInput{ // Volumes are paged
VolumeIDs: stringPointerArray(volumeIDs), results := []*ec2.Volume{}
var nextToken *string
for {
response, err := s.ec2.DescribeVolumes(request)
if err != nil {
return nil, fmt.Errorf("error listing AWS volumes: %v", err)
}
results = append(results, response.Volumes...)
nextToken = response.NextToken
if isNilOrEmpty(nextToken) {
break
}
request.NextToken = nextToken
} }
return s.ec2.DescribeVolumes(&request)
return results, nil
} }
func (s *awsSdkEC2) CreateVolume(request *ec2.CreateVolumeInput) (resp *ec2.Volume, err error) { func (s *awsSdkEC2) CreateVolume(request *ec2.CreateVolumeInput) (resp *ec2.Volume, err error) {
@ -365,12 +338,22 @@ func (s *awsSdkEC2) DeleteVolume(volumeID string) (resp *ec2.DeleteVolumeOutput,
return s.ec2.DeleteVolume(&request) return s.ec2.DeleteVolume(&request)
} }
func (s *awsSdkEC2) DescribeVPCs(request *ec2.DescribeVPCsInput) (*ec2.DescribeVPCsOutput, error) { func (s *awsSdkEC2) DescribeVPCs(request *ec2.DescribeVPCsInput) ([]*ec2.VPC, error) {
return s.ec2.DescribeVPCs(request) // VPCs are not paged
response, err := s.ec2.DescribeVPCs(request)
if err != nil {
return nil, fmt.Errorf("error listing AWS VPCs: %v", err)
}
return response.VPCs, nil
} }
func (s *awsSdkEC2) DescribeSubnets(request *ec2.DescribeSubnetsInput) (*ec2.DescribeSubnetsOutput, error) { func (s *awsSdkEC2) DescribeSubnets(request *ec2.DescribeSubnetsInput) ([]*ec2.Subnet, error) {
return s.ec2.DescribeSubnets(request) // Subnets are not paged
response, err := s.ec2.DescribeSubnets(request)
if err != nil {
return nil, fmt.Errorf("error listing AWS subnets: %v", err)
}
return response.Subnets, nil
} }
func (s *awsSdkEC2) CreateSecurityGroup(request *ec2.CreateSecurityGroupInput) (*ec2.CreateSecurityGroupOutput, error) { func (s *awsSdkEC2) CreateSecurityGroup(request *ec2.CreateSecurityGroupInput) (*ec2.CreateSecurityGroupOutput, error) {
@ -597,10 +580,15 @@ func (aws *AWSCloud) InstanceID(name string) (string, error) {
// Return the instances matching the relevant private dns name. // Return the instances matching the relevant private dns name.
func (s *AWSCloud) getInstanceByDnsName(name string) (*ec2.Instance, error) { func (s *AWSCloud) getInstanceByDnsName(name string) (*ec2.Instance, error) {
f := &ec2InstanceFilter{} filters := []*ec2.Filter{
f.PrivateDNSName = name newEc2Filter("private-dns-name", name),
}
filters = s.addFilters(filters)
request := &ec2.DescribeInstancesInput{
Filters: filters,
}
instances, err := s.ec2.Instances(nil, f) instances, err := s.ec2.DescribeInstances(request)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -650,8 +638,14 @@ func isAlive(instance *ec2.Instance) bool {
} }
// Return a list of instances matching regex string. // Return a list of instances matching regex string.
func (aws *AWSCloud) getInstancesByRegex(regex string) ([]string, error) { func (s *AWSCloud) getInstancesByRegex(regex string) ([]string, error) {
instances, err := aws.ec2.Instances(nil, nil) filters := []*ec2.Filter{}
filters = s.addFilters(filters)
request := &ec2.DescribeInstancesInput{
Filters: filters,
}
instances, err := s.ec2.DescribeInstances(request)
if err != nil { if err != nil {
return []string{}, err return []string{}, err
} }
@ -930,9 +924,14 @@ func (self *awsInstance) getInstanceType() *awsInstanceType {
// Gets the full information about this instance from the EC2 API // Gets the full information about this instance from the EC2 API
func (self *awsInstance) getInfo() (*ec2.Instance, error) { func (self *awsInstance) getInfo() (*ec2.Instance, error) {
instances, err := self.ec2.Instances([]string{self.awsID}, nil) instanceID := self.awsID
request := &ec2.DescribeInstancesInput{
InstanceIDs: []*string{&instanceID},
}
instances, err := self.ec2.DescribeInstances(request)
if err != nil { if err != nil {
return nil, fmt.Errorf("error querying ec2 for instance info: %v", err) return nil, err
} }
if len(instances) == 0 { if len(instances) == 0 {
return nil, fmt.Errorf("no instances found for instance: %s", self.awsID) return nil, fmt.Errorf("no instances found for instance: %s", self.awsID)
@ -1061,17 +1060,23 @@ func newAWSDisk(ec2 EC2, name string) (*awsDisk, error) {
// Gets the full information about this volume from the EC2 API // Gets the full information about this volume from the EC2 API
func (self *awsDisk) getInfo() (*ec2.Volume, error) { func (self *awsDisk) getInfo() (*ec2.Volume, error) {
resp, err := self.ec2.Volumes([]string{self.awsID}, nil) volumeID := self.awsID
request := &ec2.DescribeVolumesInput{
VolumeIDs: []*string{&volumeID},
}
volumes, err := self.ec2.DescribeVolumes(request)
if err != nil { if err != nil {
return nil, fmt.Errorf("error querying ec2 for volume info: %v", err) return nil, fmt.Errorf("error querying ec2 for volume info: %v", err)
} }
if len(resp.Volumes) == 0 { if len(volumes) == 0 {
return nil, fmt.Errorf("no volumes found for volume: %s", self.awsID) return nil, fmt.Errorf("no volumes found for volume: %s", self.awsID)
} }
if len(resp.Volumes) > 1 { if len(volumes) > 1 {
return nil, fmt.Errorf("multiple volumes found for volume: %s", self.awsID) return nil, fmt.Errorf("multiple volumes found for volume: %s", self.awsID)
} }
return resp.Volumes[0], nil return volumes[0], nil
} }
func (self *awsDisk) waitForAttachmentStatus(status string) error { func (self *awsDisk) waitForAttachmentStatus(status string) error {
@ -1259,6 +1264,9 @@ func (aws *AWSCloud) DetachDisk(instanceName string, diskName string) error {
// Implements Volumes.CreateVolume // Implements Volumes.CreateVolume
func (aws *AWSCloud) CreateVolume(volumeOptions *VolumeOptions) (string, error) { func (aws *AWSCloud) CreateVolume(volumeOptions *VolumeOptions) (string, error) {
// TODO: Should we tag this with the cluster id (so it gets deleted when the cluster does?)
// This is only used for testing right now
request := &ec2.CreateVolumeInput{} request := &ec2.CreateVolumeInput{}
request.AvailabilityZone = &aws.availabilityZone request.AvailabilityZone = &aws.availabilityZone
volSize := (int64(volumeOptions.CapacityMB) + 1023) / 1024 volSize := (int64(volumeOptions.CapacityMB) + 1023) / 1024
@ -1340,20 +1348,16 @@ func (self *AWSCloud) TCPLoadBalancerExists(name, region string) (bool, error) {
func (self *AWSCloud) findVPC() (*ec2.VPC, error) { func (self *AWSCloud) findVPC() (*ec2.VPC, error) {
request := &ec2.DescribeVPCsInput{} request := &ec2.DescribeVPCsInput{}
// TODO: How do we want to identify our VPC? Issue #6006
name := "kubernetes-vpc" name := "kubernetes-vpc"
request.Filters = []*ec2.Filter{newEc2Filter("tag:Name", name)} filters := []*ec2.Filter{newEc2Filter("tag:Name", name)}
request.Filters = self.addFilters(filters)
response, err := self.ec2.DescribeVPCs(request) vpcs, err := self.ec2.DescribeVPCs(request)
if err != nil { if err != nil {
glog.Error("error listing VPCs", err) glog.Error("error listing VPCs", err)
return nil, err return nil, err
} }
vpcs := response.VPCs
if err != nil {
return nil, err
}
if len(vpcs) == 0 { if len(vpcs) == 0 {
return nil, nil return nil, nil
} }
@ -1367,7 +1371,11 @@ func (self *AWSCloud) findVPC() (*ec2.VPC, error) {
// Returns true iff changes were made // Returns true iff changes were made
// The security group must already exist // The security group must already exist
func (s *AWSCloud) ensureSecurityGroupIngess(securityGroupId string, sourceIp string, ports []*api.ServicePort) (bool, error) { func (s *AWSCloud) ensureSecurityGroupIngess(securityGroupId string, sourceIp string, ports []*api.ServicePort) (bool, error) {
groups, err := s.ec2.DescribeSecurityGroups([]string{securityGroupId}, "", "") describeSecurityGroupsRequest := &ec2.DescribeSecurityGroupsInput{
GroupIDs: []*string{&securityGroupId},
}
// No filters as querying by id
groups, err := s.ec2.DescribeSecurityGroups(describeSecurityGroupsRequest)
if err != nil { if err != nil {
glog.Warning("error retrieving security group", err) glog.Warning("error retrieving security group", err)
return false, err return false, err
@ -1424,6 +1432,8 @@ func (s *AWSCloud) ensureSecurityGroupIngess(securityGroupId string, sourceIp st
return false, nil return false, nil
} }
glog.V(2).Infof("Adding security group ingress: %s %v", securityGroupId, newPermissions)
request := &ec2.AuthorizeSecurityGroupIngressInput{} request := &ec2.AuthorizeSecurityGroupIngressInput{}
request.GroupID = &securityGroupId request.GroupID = &securityGroupId
request.IPPermissions = newPermissions request.IPPermissions = newPermissions
@ -1477,16 +1487,17 @@ func (s *AWSCloud) CreateTCPLoadBalancer(name, region string, publicIP net.IP, p
request := &ec2.DescribeSubnetsInput{} request := &ec2.DescribeSubnetsInput{}
filters := []*ec2.Filter{} filters := []*ec2.Filter{}
filters = append(filters, newEc2Filter("vpc-id", orEmpty(vpc.VPCID))) filters = append(filters, newEc2Filter("vpc-id", orEmpty(vpc.VPCID)))
filters = s.addFilters(filters)
request.Filters = filters request.Filters = filters
response, err := s.ec2.DescribeSubnets(request) subnets, err := s.ec2.DescribeSubnets(request)
if err != nil { if err != nil {
glog.Error("error describing subnets: ", err) glog.Error("error describing subnets: ", err)
return nil, err return nil, err
} }
// zones := []string{} // zones := []string{}
for _, subnet := range response.Subnets { for _, subnet := range subnets {
subnetIds = append(subnetIds, subnet.SubnetID) subnetIds = append(subnetIds, subnet.SubnetID)
if !strings.HasPrefix(orEmpty(subnet.AvailabilityZone), region) { if !strings.HasPrefix(orEmpty(subnet.AvailabilityZone), region) {
glog.Error("found AZ that did not match region", orEmpty(subnet.AvailabilityZone), " vs ", region) glog.Error("found AZ that did not match region", orEmpty(subnet.AvailabilityZone), " vs ", region)
@ -1497,9 +1508,9 @@ func (s *AWSCloud) CreateTCPLoadBalancer(name, region string, publicIP net.IP, p
} }
// Build the load balancer itself // Build the load balancer itself
var loadBalancerName, dnsName *string var loadBalancer *elb.LoadBalancerDescription
{ {
loadBalancer, err := s.describeLoadBalancer(region, name) loadBalancer, err = s.describeLoadBalancer(region, name)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -1529,8 +1540,6 @@ func (s *AWSCloud) CreateTCPLoadBalancer(name, region string, publicIP net.IP, p
createRequest.Listeners = listeners createRequest.Listeners = listeners
// TODO: Should we use a better identifier (the kubernetes uuid?)
// We are supposed to specify one subnet per AZ. // We are supposed to specify one subnet per AZ.
// TODO: What happens if we have more than one subnet per AZ? // TODO: What happens if we have more than one subnet per AZ?
createRequest.Subnets = subnetIds createRequest.Subnets = subnetIds
@ -1539,8 +1548,13 @@ func (s *AWSCloud) CreateTCPLoadBalancer(name, region string, publicIP net.IP, p
sgDescription := "Security group for Kubernetes ELB " + name sgDescription := "Security group for Kubernetes ELB " + name
{ {
// TODO: Should we do something more reliable ?? .Where("tag:kubernetes-id", kubernetesId) describeSecurityGroupsRequest := &ec2.DescribeSecurityGroupsInput{}
securityGroups, err := s.ec2.DescribeSecurityGroups(nil, sgName, orEmpty(vpc.VPCID)) filters := []*ec2.Filter{
newEc2Filter("group-name", sgName),
newEc2Filter("vpc-id", orEmpty(vpc.VPCID)),
}
describeSecurityGroupsRequest.Filters = s.addFilters(filters)
securityGroups, err := s.ec2.DescribeSecurityGroups(describeSecurityGroupsRequest)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -1575,22 +1589,18 @@ func (s *AWSCloud) CreateTCPLoadBalancer(name, region string, publicIP net.IP, p
createRequest.SecurityGroups = []*string{securityGroupId} createRequest.SecurityGroups = []*string{securityGroupId}
} }
glog.Info("Creating load balancer with name: ", createRequest.LoadBalancerName) loadBalancer, err = s.describeLoadBalancer(region, name)
createResponse, err := elbClient.CreateLoadBalancer(createRequest)
if err != nil { if err != nil {
glog.Warning("Unable to retrieve load balancer immediately after creation")
return nil, err return nil, err
} }
dnsName = createResponse.DNSName
loadBalancerName = createRequest.LoadBalancerName
} else { } else {
// TODO: Verify that load balancer configuration matches? // TODO: Verify that load balancer configuration matches?
dnsName = loadBalancer.DNSName
loadBalancerName = loadBalancer.LoadBalancerName
} }
} }
registerRequest := &elb.RegisterInstancesWithLoadBalancerInput{} registerRequest := &elb.RegisterInstancesWithLoadBalancerInput{}
registerRequest.LoadBalancerName = loadBalancerName registerRequest.LoadBalancerName = loadBalancer.LoadBalancerName
for _, instance := range instances { for _, instance := range instances {
registerInstance := &elb.Instance{} registerInstance := &elb.Instance{}
registerInstance.InstanceID = instance.InstanceID registerInstance.InstanceID = instance.InstanceID
@ -1601,14 +1611,15 @@ func (s *AWSCloud) CreateTCPLoadBalancer(name, region string, publicIP net.IP, p
if err != nil { if err != nil {
// TODO: Is it better to delete the load balancer entirely? // TODO: Is it better to delete the load balancer entirely?
glog.Warningf("Error registering instances with load-balancer %s: %v", name, err) glog.Warningf("Error registering instances with load-balancer %s: %v", name, err)
return nil, err
} }
glog.V(1).Infof("Updated instances registered with load-balancer %s: %v", name, registerResponse.Instances) glog.V(1).Infof("Updated instances registered with load-balancer %s: %v", name, registerResponse.Instances)
glog.V(1).Infof("Loadbalancer %s has DNS name %s", name, dnsName) glog.V(1).Infof("Loadbalancer %s has DNS name %s", name, orEmpty(loadBalancer.DNSName))
// TODO: Wait for creation? // TODO: Wait for creation?
status := toStatus(loadBalancerName, dnsName) status := toStatus(loadBalancer)
return status, nil return status, nil
} }
@ -1623,16 +1634,16 @@ func (s *AWSCloud) GetTCPLoadBalancer(name, region string) (*api.LoadBalancerSta
return nil, false, nil return nil, false, nil
} }
status := toStatus(lb.LoadBalancerName, lb.DNSName) status := toStatus(lb)
return status, true, nil return status, true, nil
} }
func toStatus(loadBalancerName *string, dnsName *string) *api.LoadBalancerStatus { func toStatus(lb *elb.LoadBalancerDescription) *api.LoadBalancerStatus {
status := &api.LoadBalancerStatus{} status := &api.LoadBalancerStatus{}
if !isNilOrEmpty(dnsName) { if !isNilOrEmpty(lb.DNSName) {
var ingress api.LoadBalancerIngress var ingress api.LoadBalancerIngress
ingress.Hostname = *dnsName ingress.Hostname = orEmpty(lb.DNSName)
status.Ingress = []api.LoadBalancerIngress{ingress} status.Ingress = []api.LoadBalancerIngress{ingress}
} }
@ -1756,3 +1767,12 @@ func (a *AWSCloud) getInstancesByDnsNames(names []string) ([]*ec2.Instance, erro
} }
return instances, nil return instances, nil
} }
// Add additional filters, to match on our tags
// This lets us run multiple k8s clusters in a single EC2 AZ
func (s *AWSCloud) addFilters(filters []*ec2.Filter) []*ec2.Filter {
for k, v := range s.filterTags {
filters = append(filters, newEc2Filter("tag:"+k, v))
}
return filters
}

View File

@ -28,6 +28,7 @@ import (
"github.com/GoogleCloudPlatform/kubernetes/pkg/api" "github.com/GoogleCloudPlatform/kubernetes/pkg/api"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api/resource" "github.com/GoogleCloudPlatform/kubernetes/pkg/api/resource"
"github.com/golang/glog"
) )
const TestClusterId = "clusterid.test" const TestClusterId = "clusterid.test"
@ -234,23 +235,58 @@ type FakeEC2 struct {
aws *FakeAWSServices aws *FakeAWSServices
} }
func contains(haystack []string, needle string) bool { func contains(haystack []*string, needle string) bool {
for _, s := range haystack { for _, s := range haystack {
if needle == s { // (deliberately panic if s == nil)
if needle == *s {
return true return true
} }
} }
return false return false
} }
func (self *FakeEC2) Instances(instanceIds []string, filter *ec2InstanceFilter) (instances []*ec2.Instance, err error) { func instanceMatchesFilter(instance *ec2.Instance, filter *ec2.Filter) bool {
name := *filter.Name
if name == "private-dns-name" {
if instance.PrivateDNSName == nil {
return false
}
return contains(filter.Values, *instance.PrivateDNSName)
}
panic("Unknown filter name: " + name)
}
func (self *FakeEC2) DescribeInstances(request *ec2.DescribeInstancesInput) ([]*ec2.Instance, error) {
matches := []*ec2.Instance{} matches := []*ec2.Instance{}
for _, instance := range self.aws.instances { for _, instance := range self.aws.instances {
if filter != nil && !filter.Matches(instance) { if request.InstanceIDs != nil {
continue if instance.InstanceID == nil {
glog.Warning("Instance with no instance id: ", instance)
continue
}
found := false
for _, instanceId := range request.InstanceIDs {
if *instanceId == *instance.InstanceID {
found = true
break
}
}
if !found {
continue
}
} }
if instanceIds != nil && !contains(instanceIds, *instance.InstanceID) { if request.Filters != nil {
continue allMatch := true
for _, filter := range request.Filters {
if !instanceMatchesFilter(instance, filter) {
allMatch = false
break
}
}
if !allMatch {
continue
}
} }
matches = append(matches, instance) matches = append(matches, instance)
} }
@ -280,7 +316,7 @@ func (ec2 *FakeEC2) DetachVolume(request *ec2.DetachVolumeInput) (resp *ec2.Volu
panic("Not implemented") panic("Not implemented")
} }
func (ec2 *FakeEC2) Volumes(volumeIDs []string, filter *ec2.Filter) (resp *ec2.DescribeVolumesOutput, err error) { func (ec2 *FakeEC2) DescribeVolumes(request *ec2.DescribeVolumesInput) ([]*ec2.Volume, error) {
panic("Not implemented") panic("Not implemented")
} }
@ -292,7 +328,7 @@ func (ec2 *FakeEC2) DeleteVolume(volumeID string) (resp *ec2.DeleteVolumeOutput,
panic("Not implemented") panic("Not implemented")
} }
func (ec2 *FakeEC2) DescribeSecurityGroups(groupIds []string, filterName string, filterVpcId string) ([]*ec2.SecurityGroup, error) { func (ec2 *FakeEC2) DescribeSecurityGroups(request *ec2.DescribeSecurityGroupsInput) ([]*ec2.SecurityGroup, error) {
panic("Not implemented") panic("Not implemented")
} }
@ -304,11 +340,11 @@ func (ec2 *FakeEC2) AuthorizeSecurityGroupIngress(*ec2.AuthorizeSecurityGroupIng
panic("Not implemented") panic("Not implemented")
} }
func (ec2 *FakeEC2) DescribeVPCs(*ec2.DescribeVPCsInput) (*ec2.DescribeVPCsOutput, error) { func (ec2 *FakeEC2) DescribeVPCs(*ec2.DescribeVPCsInput) ([]*ec2.VPC, error) {
panic("Not implemented") panic("Not implemented")
} }
func (ec2 *FakeEC2) DescribeSubnets(*ec2.DescribeSubnetsInput) (*ec2.DescribeSubnetsOutput, error) { func (ec2 *FakeEC2) DescribeSubnets(*ec2.DescribeSubnetsInput) ([]*ec2.Subnet, error) {
panic("Not implemented") panic("Not implemented")
} }