AWS: Make load balancer creation idempotent on AWS

This turned out to be a little convoluted, but is needed because deleting an ELB on AWS
is a painful UX - it won't have the same endpoint when it is recreated.

Also started splitting the provider into files, but only for new functions (so far!)
This commit is contained in:
Justin Santa Barbara 2015-06-13 14:45:38 -04:00
parent 8c365d51c7
commit 924350d5f6
5 changed files with 378 additions and 128 deletions

View File

@ -0,0 +1,248 @@
/*
Copyright 2014 The Kubernetes Authors All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package aws_cloud
import (
"fmt"
"github.com/GoogleCloudPlatform/kubernetes/pkg/util"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/service/ec2"
"github.com/aws/aws-sdk-go/service/elb"
"github.com/golang/glog"
)
func (s *AWSCloud) ensureLoadBalancer(region, name string, listeners []*elb.Listener, subnetIDs []string, securityGroupIDs []string) (*elb.LoadBalancerDescription, error) {
elbClient, err := s.getELBClient(region)
if err != nil {
return nil, err
}
loadBalancer, err := s.describeLoadBalancer(region, name)
if err != nil {
return nil, err
}
dirty := false
if loadBalancer == nil {
createRequest := &elb.CreateLoadBalancerInput{}
createRequest.LoadBalancerName = aws.String(name)
createRequest.Listeners = listeners
// We are supposed to specify one subnet per AZ.
// TODO: What happens if we have more than one subnet per AZ?
createRequest.Subnets = stringPointerArray(subnetIDs)
createRequest.SecurityGroups = stringPointerArray(securityGroupIDs)
glog.Info("Creating load balancer with name: ", name)
_, err := elbClient.CreateLoadBalancer(createRequest)
if err != nil {
return nil, err
}
dirty = true
} else {
{
// Sync subnets
expected := util.NewStringSet(subnetIDs...)
actual := stringSetFromPointers(loadBalancer.Subnets)
additions := expected.Difference(actual)
removals := actual.Difference(expected)
if len(removals) != 0 {
request := &elb.DetachLoadBalancerFromSubnetsInput{}
request.LoadBalancerName = aws.String(name)
request.Subnets = stringSetToPointers(removals)
glog.V(2).Info("Detaching load balancer from removed subnets")
_, err := elbClient.DetachLoadBalancerFromSubnets(request)
if err != nil {
return nil, fmt.Errorf("error detaching AWS loadbalancer from subnets: %v", err)
}
dirty = true
}
if len(additions) != 0 {
request := &elb.AttachLoadBalancerToSubnetsInput{}
request.LoadBalancerName = aws.String(name)
request.Subnets = stringSetToPointers(additions)
glog.V(2).Info("Attaching load balancer to added subnets")
_, err := elbClient.AttachLoadBalancerToSubnets(request)
if err != nil {
return nil, fmt.Errorf("error attaching AWS loadbalancer to subnets: %v", err)
}
dirty = true
}
}
{
// Sync security groups
expected := util.NewStringSet(securityGroupIDs...)
actual := stringSetFromPointers(loadBalancer.SecurityGroups)
if !expected.Equal(actual) {
// This call just replaces the security groups, unlike e.g. subnets (!)
request := &elb.ApplySecurityGroupsToLoadBalancerInput{}
request.LoadBalancerName = aws.String(name)
request.SecurityGroups = stringPointerArray(securityGroupIDs)
glog.V(2).Info("Applying updated security groups to load balancer")
_, err := elbClient.ApplySecurityGroupsToLoadBalancer(request)
if err != nil {
return nil, fmt.Errorf("error applying AWS loadbalancer security groups: %v", err)
}
dirty = true
}
}
{
// Sync listeners
listenerDescriptions := loadBalancer.ListenerDescriptions
foundSet := make(map[int]bool)
removals := []*int64{}
for _, listenerDescription := range listenerDescriptions {
actual := listenerDescription.Listener
if actual == nil {
glog.Warning("Ignoring empty listener in AWS loadbalancer: ", name)
continue
}
found := -1
for i, expected := range listeners {
if orEmpty(actual.Protocol) != orEmpty(expected.Protocol) {
continue
}
if orEmpty(actual.InstanceProtocol) != orEmpty(expected.InstanceProtocol) {
continue
}
if orZero(actual.InstancePort) != orZero(expected.InstancePort) {
continue
}
if orZero(actual.LoadBalancerPort) != orZero(expected.LoadBalancerPort) {
continue
}
if orEmpty(actual.SSLCertificateID) != orEmpty(expected.SSLCertificateID) {
continue
}
found = i
}
if found != -1 {
foundSet[found] = true
} else {
removals = append(removals, actual.LoadBalancerPort)
}
}
additions := []*elb.Listener{}
for i := range listeners {
if foundSet[i] {
continue
}
additions = append(additions, listeners[i])
}
if len(removals) != 0 {
request := &elb.DeleteLoadBalancerListenersInput{}
request.LoadBalancerName = aws.String(name)
request.LoadBalancerPorts = removals
glog.V(2).Info("Deleting removed load balancer listeners")
_, err := elbClient.DeleteLoadBalancerListeners(request)
if err != nil {
return nil, fmt.Errorf("error deleting AWS loadbalancer listeners: %v", err)
}
dirty = true
}
if len(additions) != 0 {
request := &elb.CreateLoadBalancerListenersInput{}
request.LoadBalancerName = aws.String(name)
request.Listeners = additions
glog.V(2).Info("Creating added load balancer listeners")
_, err := elbClient.CreateLoadBalancerListeners(request)
if err != nil {
return nil, fmt.Errorf("error creating AWS loadbalancer listeners: %v", err)
}
dirty = true
}
}
}
if dirty {
loadBalancer, err = s.describeLoadBalancer(region, name)
if err != nil {
glog.Warning("Unable to retrieve load balancer after creation/update")
return nil, err
}
}
return loadBalancer, nil
}
// Makes sure that exactly the specified hosts are registered as instances with the load balancer
func (s *AWSCloud) ensureLoadBalancerInstances(elbClient ELB, loadBalancerName string, lbInstances []*elb.Instance, instances []*ec2.Instance) error {
expected := util.NewStringSet()
for _, instance := range instances {
expected.Insert(orEmpty(instance.InstanceID))
}
actual := util.NewStringSet()
for _, lbInstance := range lbInstances {
actual.Insert(orEmpty(lbInstance.InstanceID))
}
additions := expected.Difference(actual)
removals := actual.Difference(expected)
addInstances := []*elb.Instance{}
for instanceId := range additions {
addInstance := &elb.Instance{}
addInstance.InstanceID = aws.String(instanceId)
addInstances = append(addInstances, addInstance)
}
removeInstances := []*elb.Instance{}
for instanceId := range removals {
removeInstance := &elb.Instance{}
removeInstance.InstanceID = aws.String(instanceId)
removeInstances = append(removeInstances, removeInstance)
}
if len(addInstances) > 0 {
registerRequest := &elb.RegisterInstancesWithLoadBalancerInput{}
registerRequest.Instances = addInstances
registerRequest.LoadBalancerName = aws.String(loadBalancerName)
_, err := elbClient.RegisterInstancesWithLoadBalancer(registerRequest)
if err != nil {
return err
}
glog.V(1).Infof("Instances added to load-balancer %s", loadBalancerName)
}
if len(removeInstances) > 0 {
deregisterRequest := &elb.DeregisterInstancesFromLoadBalancerInput{}
deregisterRequest.Instances = removeInstances
deregisterRequest.LoadBalancerName = aws.String(loadBalancerName)
_, err := elbClient.DeregisterInstancesFromLoadBalancer(deregisterRequest)
if err != nil {
return err
}
glog.V(1).Infof("Instances removed from load-balancer %s", loadBalancerName)
}
return nil
}

View File

@ -0,0 +1,51 @@
/*
Copyright 2014 The Kubernetes Authors All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package aws_cloud
import (
"github.com/GoogleCloudPlatform/kubernetes/pkg/util"
"github.com/aws/aws-sdk-go/aws"
)
func stringSetToPointers(in util.StringSet) []*string {
if in == nil {
return nil
}
out := make([]*string, len(in))
for k := range in {
out = append(out, aws.String(k))
}
return out
}
func stringSetFromPointers(in []*string) util.StringSet {
if in == nil {
return nil
}
out := util.NewStringSet()
for i := range in {
out.Insert(orEmpty(in[i]))
}
return out
}
func orZero(v *int64) int64 {
if v == nil {
return 0
}
return *v
}

View File

@ -109,6 +109,14 @@ type ELB interface {
DescribeLoadBalancers(*elb.DescribeLoadBalancersInput) (*elb.DescribeLoadBalancersOutput, error)
RegisterInstancesWithLoadBalancer(*elb.RegisterInstancesWithLoadBalancerInput) (*elb.RegisterInstancesWithLoadBalancerOutput, error)
DeregisterInstancesFromLoadBalancer(*elb.DeregisterInstancesFromLoadBalancerInput) (*elb.DeregisterInstancesFromLoadBalancerOutput, error)
DetachLoadBalancerFromSubnets(*elb.DetachLoadBalancerFromSubnetsInput) (*elb.DetachLoadBalancerFromSubnetsOutput, error)
AttachLoadBalancerToSubnets(*elb.AttachLoadBalancerToSubnetsInput) (*elb.AttachLoadBalancerToSubnetsOutput, error)
CreateLoadBalancerListeners(*elb.CreateLoadBalancerListenersInput) (*elb.CreateLoadBalancerListenersOutput, error)
DeleteLoadBalancerListeners(*elb.DeleteLoadBalancerListenersInput) (*elb.DeleteLoadBalancerListenersOutput, error)
ApplySecurityGroupsToLoadBalancer(*elb.ApplySecurityGroupsToLoadBalancerInput) (*elb.ApplySecurityGroupsToLoadBalancerOutput, error)
}
// This is a simple pass-through of the Autoscaling client interface, which allows for testing
@ -1581,27 +1589,11 @@ func (s *AWSCloud) createTags(request *ec2.CreateTagsInput) (*ec2.CreateTagsOutp
}
}
// CreateTCPLoadBalancer implements TCPLoadBalancer.CreateTCPLoadBalancer
// TODO(justinsb): This must be idempotent
// EnsureTCPLoadBalancer implements TCPLoadBalancer.EnsureTCPLoadBalancer
// TODO(justinsb) It is weird that these take a region. I suspect it won't work cross-region anwyay.
func (s *AWSCloud) EnsureTCPLoadBalancer(name, region string, publicIP net.IP, ports []*api.ServicePort, hosts []string, affinity api.ServiceAffinity) (*api.LoadBalancerStatus, error) {
glog.V(2).Infof("EnsureTCPLoadBalancer(%v, %v, %v, %v, %v)", name, region, publicIP, ports, hosts)
glog.V(2).Info("Checking if load balancer already exists: %s", name)
_, exists, err := s.GetTCPLoadBalancer(name, region)
if err != nil {
return nil, fmt.Errorf("error checking if AWS load balancer already exists: %v", err)
}
// TODO: Implement a more efficient update strategy for common changes than delete & create
// In particular, if we implement hosts update, we can get rid of UpdateHosts
if exists {
err := s.EnsureTCPLoadBalancerDeleted(name, region)
if err != nil {
return nil, fmt.Errorf("error deleting existing AWS load balancer: %v", err)
}
}
elbClient, err := s.getELBClient(region)
if err != nil {
return nil, err
@ -1631,7 +1623,7 @@ func (s *AWSCloud) EnsureTCPLoadBalancer(name, region string, publicIP net.IP, p
}
// Construct list of configured subnets
subnetIds := []*string{}
subnetIDs := []string{}
{
request := &ec2.DescribeSubnetsInput{}
filters := []*ec2.Filter{}
@ -1647,7 +1639,7 @@ func (s *AWSCloud) EnsureTCPLoadBalancer(name, region string, publicIP net.IP, p
// zones := []string{}
for _, subnet := range subnets {
subnetIds = append(subnetIds, subnet.SubnetID)
subnetIDs = append(subnetIDs, orEmpty(subnet.SubnetID))
if !strings.HasPrefix(orEmpty(subnet.AvailabilityZone), region) {
glog.Error("found AZ that did not match region", orEmpty(subnet.AvailabilityZone), " vs ", region)
return nil, fmt.Errorf("invalid AZ for region")
@ -1686,60 +1678,32 @@ func (s *AWSCloud) EnsureTCPLoadBalancer(name, region string, publicIP net.IP, p
return nil, err
}
}
securityGroupIDs := []string{securityGroupID}
// Figure out what mappings we want on the load balancer
listeners := []*elb.Listener{}
for _, port := range ports {
if port.NodePort == 0 {
glog.Errorf("Ignoring port without NodePort defined: %v", port)
continue
}
instancePort := int64(port.NodePort)
loadBalancerPort := int64(port.Port)
protocol := strings.ToLower(string(port.Protocol))
listener := &elb.Listener{}
listener.InstancePort = &instancePort
listener.LoadBalancerPort = &loadBalancerPort
listener.Protocol = &protocol
listener.InstanceProtocol = &protocol
listeners = append(listeners, listener)
}
// Build the load balancer itself
var loadBalancer *elb.LoadBalancerDescription
{
loadBalancer, err = s.describeLoadBalancer(region, name)
if err != nil {
return nil, err
}
if loadBalancer == nil {
createRequest := &elb.CreateLoadBalancerInput{}
createRequest.LoadBalancerName = aws.String(name)
listeners := []*elb.Listener{}
for _, port := range ports {
if port.NodePort == 0 {
glog.Errorf("Ignoring port without NodePort defined: %v", port)
continue
}
instancePort := int64(port.NodePort)
loadBalancerPort := int64(port.Port)
protocol := strings.ToLower(string(port.Protocol))
listener := &elb.Listener{}
listener.InstancePort = &instancePort
listener.LoadBalancerPort = &loadBalancerPort
listener.Protocol = &protocol
listener.InstanceProtocol = &protocol
listeners = append(listeners, listener)
}
createRequest.Listeners = listeners
// We are supposed to specify one subnet per AZ.
// TODO: What happens if we have more than one subnet per AZ?
createRequest.Subnets = subnetIds
createRequest.SecurityGroups = []*string{&securityGroupID}
glog.Info("Creating load balancer with name: ", name)
_, err := elbClient.CreateLoadBalancer(createRequest)
if err != nil {
return nil, err
}
loadBalancer, err = s.describeLoadBalancer(region, name)
if err != nil {
glog.Warning("Unable to retrieve load balancer immediately after creation")
return nil, err
}
} else {
// TODO: Verify that load balancer configuration matches?
}
loadBalancer, err := s.ensureLoadBalancer(region, name, listeners, subnetIDs, securityGroupIDs)
if err != nil {
return nil, err
}
err = s.updateInstanceSecurityGroupsForLoadBalancer(loadBalancer, instances)
@ -1748,22 +1712,12 @@ func (s *AWSCloud) EnsureTCPLoadBalancer(name, region string, publicIP net.IP, p
return nil, err
}
registerRequest := &elb.RegisterInstancesWithLoadBalancerInput{}
registerRequest.LoadBalancerName = loadBalancer.LoadBalancerName
for _, instance := range instances {
registerInstance := &elb.Instance{}
registerInstance.InstanceID = instance.InstanceID
registerRequest.Instances = append(registerRequest.Instances, registerInstance)
}
registerResponse, err := elbClient.RegisterInstancesWithLoadBalancer(registerRequest)
err = s.ensureLoadBalancerInstances(elbClient, orEmpty(loadBalancer.LoadBalancerName), loadBalancer.Instances, instances)
if err != nil {
// TODO: Is it better to delete the load balancer entirely?
glog.Warningf("Error registering instances with load-balancer %s: %v", name, err)
glog.Warning("Error registering instances with the load balancer: %v", err)
return nil, err
}
glog.V(1).Infof("Updated instances registered with load-balancer %s: %v", name, registerResponse.Instances)
glog.V(1).Infof("Loadbalancer %s has DNS name %s", name, orEmpty(loadBalancer.DNSName))
// TODO: Wait for creation?
@ -2010,6 +1964,7 @@ func (s *AWSCloud) EnsureTCPLoadBalancerDeleted(name, region string) error {
}
if len(securityGroupIDs) == 0 {
glog.V(2).Info("deleted all security groups for load balancer: ", name)
break
}
@ -2047,51 +2002,9 @@ func (s *AWSCloud) UpdateTCPLoadBalancer(name, region string, hosts []string) er
return fmt.Errorf("Load balancer not found")
}
existingInstances := map[string]*elb.Instance{}
for _, instance := range lb.Instances {
existingInstances[orEmpty(instance.InstanceID)] = instance
}
wantInstances := map[string]*ec2.Instance{}
for _, instance := range instances {
wantInstances[orEmpty(instance.InstanceID)] = instance
}
addInstances := []*elb.Instance{}
for instanceId := range wantInstances {
addInstance := &elb.Instance{}
addInstance.InstanceID = aws.String(instanceId)
addInstances = append(addInstances, addInstance)
}
removeInstances := []*elb.Instance{}
for instanceId := range existingInstances {
_, found := wantInstances[instanceId]
if !found {
removeInstance := &elb.Instance{}
removeInstance.InstanceID = aws.String(instanceId)
removeInstances = append(removeInstances, removeInstance)
}
}
if len(addInstances) > 0 {
registerRequest := &elb.RegisterInstancesWithLoadBalancerInput{}
registerRequest.Instances = addInstances
registerRequest.LoadBalancerName = lb.LoadBalancerName
_, err = elbClient.RegisterInstancesWithLoadBalancer(registerRequest)
if err != nil {
return err
}
}
if len(removeInstances) > 0 {
deregisterRequest := &elb.DeregisterInstancesFromLoadBalancerInput{}
deregisterRequest.Instances = removeInstances
deregisterRequest.LoadBalancerName = lb.LoadBalancerName
_, err = elbClient.DeregisterInstancesFromLoadBalancer(deregisterRequest)
if err != nil {
return err
}
err = s.ensureLoadBalancerInstances(elbClient, orEmpty(lb.LoadBalancerName), lb.Instances, instances)
if err != nil {
return nil
}
err = s.updateInstanceSecurityGroupsForLoadBalancer(lb, instances)

View File

@ -394,19 +394,42 @@ type FakeELB struct {
func (ec2 *FakeELB) CreateLoadBalancer(*elb.CreateLoadBalancerInput) (*elb.CreateLoadBalancerOutput, error) {
panic("Not implemented")
}
func (ec2 *FakeELB) DeleteLoadBalancer(*elb.DeleteLoadBalancerInput) (*elb.DeleteLoadBalancerOutput, error) {
panic("Not implemented")
}
func (ec2 *FakeELB) DescribeLoadBalancers(*elb.DescribeLoadBalancersInput) (*elb.DescribeLoadBalancersOutput, error) {
panic("Not implemented")
}
func (ec2 *FakeELB) RegisterInstancesWithLoadBalancer(*elb.RegisterInstancesWithLoadBalancerInput) (*elb.RegisterInstancesWithLoadBalancerOutput, error) {
panic("Not implemented")
}
func (ec2 *FakeELB) DeregisterInstancesFromLoadBalancer(*elb.DeregisterInstancesFromLoadBalancerInput) (*elb.DeregisterInstancesFromLoadBalancerOutput, error) {
panic("Not implemented")
}
func (ec2 *FakeELB) DetachLoadBalancerFromSubnets(*elb.DetachLoadBalancerFromSubnetsInput) (*elb.DetachLoadBalancerFromSubnetsOutput, error) {
panic("Not implemented")
}
func (ec2 *FakeELB) AttachLoadBalancerToSubnets(*elb.AttachLoadBalancerToSubnetsInput) (*elb.AttachLoadBalancerToSubnetsOutput, error) {
panic("Not implemented")
}
func (ec2 *FakeELB) CreateLoadBalancerListeners(*elb.CreateLoadBalancerListenersInput) (*elb.CreateLoadBalancerListenersOutput, error) {
panic("Not implemented")
}
func (ec2 *FakeELB) DeleteLoadBalancerListeners(*elb.DeleteLoadBalancerListenersInput) (*elb.DeleteLoadBalancerListenersOutput, error) {
panic("Not implemented")
}
func (ec2 *FakeELB) ApplySecurityGroupsToLoadBalancer(*elb.ApplySecurityGroupsToLoadBalancerInput) (*elb.ApplySecurityGroupsToLoadBalancerOutput, error) {
panic("Not implemented")
}
type FakeASG struct {
aws *FakeAWSServices
}

View File

@ -131,6 +131,21 @@ func (s1 StringSet) IsSuperset(s2 StringSet) bool {
return true
}
// Equal returns true iff s1 is equal (as a set) to s2.
// Two sets are equal if their membership is identical.
// (In practice, this means same elements, order doesn't matter)
func (s1 StringSet) Equal(s2 StringSet) bool {
if len(s1) != len(s2) {
return false
}
for item := range s2 {
if !s1.Has(item) {
return false
}
}
return true
}
// List returns the contents as a sorted string slice.
func (s StringSet) List() []string {
res := make([]string, 0, len(s))