diff --git a/pkg/cloudprovider/aws/aws_loadbalancer.go b/pkg/cloudprovider/aws/aws_loadbalancer.go new file mode 100644 index 00000000000..519968be676 --- /dev/null +++ b/pkg/cloudprovider/aws/aws_loadbalancer.go @@ -0,0 +1,248 @@ +/* +Copyright 2014 The Kubernetes Authors All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package aws_cloud + +import ( + "fmt" + "github.com/GoogleCloudPlatform/kubernetes/pkg/util" + "github.com/aws/aws-sdk-go/aws" + "github.com/aws/aws-sdk-go/service/ec2" + "github.com/aws/aws-sdk-go/service/elb" + "github.com/golang/glog" +) + +func (s *AWSCloud) ensureLoadBalancer(region, name string, listeners []*elb.Listener, subnetIDs []string, securityGroupIDs []string) (*elb.LoadBalancerDescription, error) { + elbClient, err := s.getELBClient(region) + if err != nil { + return nil, err + } + + loadBalancer, err := s.describeLoadBalancer(region, name) + if err != nil { + return nil, err + } + + dirty := false + + if loadBalancer == nil { + createRequest := &elb.CreateLoadBalancerInput{} + createRequest.LoadBalancerName = aws.String(name) + + createRequest.Listeners = listeners + + // We are supposed to specify one subnet per AZ. + // TODO: What happens if we have more than one subnet per AZ? + createRequest.Subnets = stringPointerArray(subnetIDs) + + createRequest.SecurityGroups = stringPointerArray(securityGroupIDs) + + glog.Info("Creating load balancer with name: ", name) + _, err := elbClient.CreateLoadBalancer(createRequest) + if err != nil { + return nil, err + } + dirty = true + } else { + { + // Sync subnets + expected := util.NewStringSet(subnetIDs...) + actual := stringSetFromPointers(loadBalancer.Subnets) + + additions := expected.Difference(actual) + removals := actual.Difference(expected) + + if len(removals) != 0 { + request := &elb.DetachLoadBalancerFromSubnetsInput{} + request.LoadBalancerName = aws.String(name) + request.Subnets = stringSetToPointers(removals) + glog.V(2).Info("Detaching load balancer from removed subnets") + _, err := elbClient.DetachLoadBalancerFromSubnets(request) + if err != nil { + return nil, fmt.Errorf("error detaching AWS loadbalancer from subnets: %v", err) + } + dirty = true + } + + if len(additions) != 0 { + request := &elb.AttachLoadBalancerToSubnetsInput{} + request.LoadBalancerName = aws.String(name) + request.Subnets = stringSetToPointers(additions) + glog.V(2).Info("Attaching load balancer to added subnets") + _, err := elbClient.AttachLoadBalancerToSubnets(request) + if err != nil { + return nil, fmt.Errorf("error attaching AWS loadbalancer to subnets: %v", err) + } + dirty = true + } + } + + { + // Sync security groups + expected := util.NewStringSet(securityGroupIDs...) + actual := stringSetFromPointers(loadBalancer.SecurityGroups) + + if !expected.Equal(actual) { + // This call just replaces the security groups, unlike e.g. subnets (!) + request := &elb.ApplySecurityGroupsToLoadBalancerInput{} + request.LoadBalancerName = aws.String(name) + request.SecurityGroups = stringPointerArray(securityGroupIDs) + glog.V(2).Info("Applying updated security groups to load balancer") + _, err := elbClient.ApplySecurityGroupsToLoadBalancer(request) + if err != nil { + return nil, fmt.Errorf("error applying AWS loadbalancer security groups: %v", err) + } + dirty = true + } + } + + { + // Sync listeners + listenerDescriptions := loadBalancer.ListenerDescriptions + + foundSet := make(map[int]bool) + removals := []*int64{} + for _, listenerDescription := range listenerDescriptions { + actual := listenerDescription.Listener + if actual == nil { + glog.Warning("Ignoring empty listener in AWS loadbalancer: ", name) + continue + } + + found := -1 + for i, expected := range listeners { + if orEmpty(actual.Protocol) != orEmpty(expected.Protocol) { + continue + } + if orEmpty(actual.InstanceProtocol) != orEmpty(expected.InstanceProtocol) { + continue + } + if orZero(actual.InstancePort) != orZero(expected.InstancePort) { + continue + } + if orZero(actual.LoadBalancerPort) != orZero(expected.LoadBalancerPort) { + continue + } + if orEmpty(actual.SSLCertificateID) != orEmpty(expected.SSLCertificateID) { + continue + } + found = i + } + if found != -1 { + foundSet[found] = true + } else { + removals = append(removals, actual.LoadBalancerPort) + } + } + + additions := []*elb.Listener{} + for i := range listeners { + if foundSet[i] { + continue + } + additions = append(additions, listeners[i]) + } + + if len(removals) != 0 { + request := &elb.DeleteLoadBalancerListenersInput{} + request.LoadBalancerName = aws.String(name) + request.LoadBalancerPorts = removals + glog.V(2).Info("Deleting removed load balancer listeners") + _, err := elbClient.DeleteLoadBalancerListeners(request) + if err != nil { + return nil, fmt.Errorf("error deleting AWS loadbalancer listeners: %v", err) + } + dirty = true + } + + if len(additions) != 0 { + request := &elb.CreateLoadBalancerListenersInput{} + request.LoadBalancerName = aws.String(name) + request.Listeners = additions + glog.V(2).Info("Creating added load balancer listeners") + _, err := elbClient.CreateLoadBalancerListeners(request) + if err != nil { + return nil, fmt.Errorf("error creating AWS loadbalancer listeners: %v", err) + } + dirty = true + } + } + } + + if dirty { + loadBalancer, err = s.describeLoadBalancer(region, name) + if err != nil { + glog.Warning("Unable to retrieve load balancer after creation/update") + return nil, err + } + } + + return loadBalancer, nil +} + +// Makes sure that exactly the specified hosts are registered as instances with the load balancer +func (s *AWSCloud) ensureLoadBalancerInstances(elbClient ELB, loadBalancerName string, lbInstances []*elb.Instance, instances []*ec2.Instance) error { + expected := util.NewStringSet() + for _, instance := range instances { + expected.Insert(orEmpty(instance.InstanceID)) + } + + actual := util.NewStringSet() + for _, lbInstance := range lbInstances { + actual.Insert(orEmpty(lbInstance.InstanceID)) + } + + additions := expected.Difference(actual) + removals := actual.Difference(expected) + + addInstances := []*elb.Instance{} + for instanceId := range additions { + addInstance := &elb.Instance{} + addInstance.InstanceID = aws.String(instanceId) + addInstances = append(addInstances, addInstance) + } + + removeInstances := []*elb.Instance{} + for instanceId := range removals { + removeInstance := &elb.Instance{} + removeInstance.InstanceID = aws.String(instanceId) + removeInstances = append(removeInstances, removeInstance) + } + + if len(addInstances) > 0 { + registerRequest := &elb.RegisterInstancesWithLoadBalancerInput{} + registerRequest.Instances = addInstances + registerRequest.LoadBalancerName = aws.String(loadBalancerName) + _, err := elbClient.RegisterInstancesWithLoadBalancer(registerRequest) + if err != nil { + return err + } + glog.V(1).Infof("Instances added to load-balancer %s", loadBalancerName) + } + + if len(removeInstances) > 0 { + deregisterRequest := &elb.DeregisterInstancesFromLoadBalancerInput{} + deregisterRequest.Instances = removeInstances + deregisterRequest.LoadBalancerName = aws.String(loadBalancerName) + _, err := elbClient.DeregisterInstancesFromLoadBalancer(deregisterRequest) + if err != nil { + return err + } + glog.V(1).Infof("Instances removed from load-balancer %s", loadBalancerName) + } + + return nil +} diff --git a/pkg/cloudprovider/aws/aws_utils.go b/pkg/cloudprovider/aws/aws_utils.go new file mode 100644 index 00000000000..a4f80ad8a6b --- /dev/null +++ b/pkg/cloudprovider/aws/aws_utils.go @@ -0,0 +1,51 @@ +/* +Copyright 2014 The Kubernetes Authors All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package aws_cloud + +import ( + "github.com/GoogleCloudPlatform/kubernetes/pkg/util" + "github.com/aws/aws-sdk-go/aws" +) + +func stringSetToPointers(in util.StringSet) []*string { + if in == nil { + return nil + } + out := make([]*string, len(in)) + for k := range in { + out = append(out, aws.String(k)) + } + return out +} + +func stringSetFromPointers(in []*string) util.StringSet { + if in == nil { + return nil + } + out := util.NewStringSet() + for i := range in { + out.Insert(orEmpty(in[i])) + } + return out +} + +func orZero(v *int64) int64 { + if v == nil { + return 0 + } + return *v +} diff --git a/pkg/cloudprovider/providers/aws/aws.go b/pkg/cloudprovider/providers/aws/aws.go index cf8381fe552..54cd9d4a586 100644 --- a/pkg/cloudprovider/providers/aws/aws.go +++ b/pkg/cloudprovider/providers/aws/aws.go @@ -109,6 +109,14 @@ type ELB interface { DescribeLoadBalancers(*elb.DescribeLoadBalancersInput) (*elb.DescribeLoadBalancersOutput, error) RegisterInstancesWithLoadBalancer(*elb.RegisterInstancesWithLoadBalancerInput) (*elb.RegisterInstancesWithLoadBalancerOutput, error) DeregisterInstancesFromLoadBalancer(*elb.DeregisterInstancesFromLoadBalancerInput) (*elb.DeregisterInstancesFromLoadBalancerOutput, error) + + DetachLoadBalancerFromSubnets(*elb.DetachLoadBalancerFromSubnetsInput) (*elb.DetachLoadBalancerFromSubnetsOutput, error) + AttachLoadBalancerToSubnets(*elb.AttachLoadBalancerToSubnetsInput) (*elb.AttachLoadBalancerToSubnetsOutput, error) + + CreateLoadBalancerListeners(*elb.CreateLoadBalancerListenersInput) (*elb.CreateLoadBalancerListenersOutput, error) + DeleteLoadBalancerListeners(*elb.DeleteLoadBalancerListenersInput) (*elb.DeleteLoadBalancerListenersOutput, error) + + ApplySecurityGroupsToLoadBalancer(*elb.ApplySecurityGroupsToLoadBalancerInput) (*elb.ApplySecurityGroupsToLoadBalancerOutput, error) } // This is a simple pass-through of the Autoscaling client interface, which allows for testing @@ -1581,27 +1589,11 @@ func (s *AWSCloud) createTags(request *ec2.CreateTagsInput) (*ec2.CreateTagsOutp } } -// CreateTCPLoadBalancer implements TCPLoadBalancer.CreateTCPLoadBalancer -// TODO(justinsb): This must be idempotent +// EnsureTCPLoadBalancer implements TCPLoadBalancer.EnsureTCPLoadBalancer // TODO(justinsb) It is weird that these take a region. I suspect it won't work cross-region anwyay. func (s *AWSCloud) EnsureTCPLoadBalancer(name, region string, publicIP net.IP, ports []*api.ServicePort, hosts []string, affinity api.ServiceAffinity) (*api.LoadBalancerStatus, error) { glog.V(2).Infof("EnsureTCPLoadBalancer(%v, %v, %v, %v, %v)", name, region, publicIP, ports, hosts) - glog.V(2).Info("Checking if load balancer already exists: %s", name) - _, exists, err := s.GetTCPLoadBalancer(name, region) - if err != nil { - return nil, fmt.Errorf("error checking if AWS load balancer already exists: %v", err) - } - - // TODO: Implement a more efficient update strategy for common changes than delete & create - // In particular, if we implement hosts update, we can get rid of UpdateHosts - if exists { - err := s.EnsureTCPLoadBalancerDeleted(name, region) - if err != nil { - return nil, fmt.Errorf("error deleting existing AWS load balancer: %v", err) - } - } - elbClient, err := s.getELBClient(region) if err != nil { return nil, err @@ -1631,7 +1623,7 @@ func (s *AWSCloud) EnsureTCPLoadBalancer(name, region string, publicIP net.IP, p } // Construct list of configured subnets - subnetIds := []*string{} + subnetIDs := []string{} { request := &ec2.DescribeSubnetsInput{} filters := []*ec2.Filter{} @@ -1647,7 +1639,7 @@ func (s *AWSCloud) EnsureTCPLoadBalancer(name, region string, publicIP net.IP, p // zones := []string{} for _, subnet := range subnets { - subnetIds = append(subnetIds, subnet.SubnetID) + subnetIDs = append(subnetIDs, orEmpty(subnet.SubnetID)) if !strings.HasPrefix(orEmpty(subnet.AvailabilityZone), region) { glog.Error("found AZ that did not match region", orEmpty(subnet.AvailabilityZone), " vs ", region) return nil, fmt.Errorf("invalid AZ for region") @@ -1686,60 +1678,32 @@ func (s *AWSCloud) EnsureTCPLoadBalancer(name, region string, publicIP net.IP, p return nil, err } } + securityGroupIDs := []string{securityGroupID} + + // Figure out what mappings we want on the load balancer + listeners := []*elb.Listener{} + for _, port := range ports { + if port.NodePort == 0 { + glog.Errorf("Ignoring port without NodePort defined: %v", port) + continue + } + instancePort := int64(port.NodePort) + loadBalancerPort := int64(port.Port) + protocol := strings.ToLower(string(port.Protocol)) + + listener := &elb.Listener{} + listener.InstancePort = &instancePort + listener.LoadBalancerPort = &loadBalancerPort + listener.Protocol = &protocol + listener.InstanceProtocol = &protocol + + listeners = append(listeners, listener) + } // Build the load balancer itself - var loadBalancer *elb.LoadBalancerDescription - { - loadBalancer, err = s.describeLoadBalancer(region, name) - if err != nil { - return nil, err - } - - if loadBalancer == nil { - createRequest := &elb.CreateLoadBalancerInput{} - createRequest.LoadBalancerName = aws.String(name) - - listeners := []*elb.Listener{} - for _, port := range ports { - if port.NodePort == 0 { - glog.Errorf("Ignoring port without NodePort defined: %v", port) - continue - } - instancePort := int64(port.NodePort) - loadBalancerPort := int64(port.Port) - protocol := strings.ToLower(string(port.Protocol)) - - listener := &elb.Listener{} - listener.InstancePort = &instancePort - listener.LoadBalancerPort = &loadBalancerPort - listener.Protocol = &protocol - listener.InstanceProtocol = &protocol - - listeners = append(listeners, listener) - } - - createRequest.Listeners = listeners - - // We are supposed to specify one subnet per AZ. - // TODO: What happens if we have more than one subnet per AZ? - createRequest.Subnets = subnetIds - - createRequest.SecurityGroups = []*string{&securityGroupID} - - glog.Info("Creating load balancer with name: ", name) - _, err := elbClient.CreateLoadBalancer(createRequest) - if err != nil { - return nil, err - } - - loadBalancer, err = s.describeLoadBalancer(region, name) - if err != nil { - glog.Warning("Unable to retrieve load balancer immediately after creation") - return nil, err - } - } else { - // TODO: Verify that load balancer configuration matches? - } + loadBalancer, err := s.ensureLoadBalancer(region, name, listeners, subnetIDs, securityGroupIDs) + if err != nil { + return nil, err } err = s.updateInstanceSecurityGroupsForLoadBalancer(loadBalancer, instances) @@ -1748,22 +1712,12 @@ func (s *AWSCloud) EnsureTCPLoadBalancer(name, region string, publicIP net.IP, p return nil, err } - registerRequest := &elb.RegisterInstancesWithLoadBalancerInput{} - registerRequest.LoadBalancerName = loadBalancer.LoadBalancerName - for _, instance := range instances { - registerInstance := &elb.Instance{} - registerInstance.InstanceID = instance.InstanceID - registerRequest.Instances = append(registerRequest.Instances, registerInstance) - } - - registerResponse, err := elbClient.RegisterInstancesWithLoadBalancer(registerRequest) + err = s.ensureLoadBalancerInstances(elbClient, orEmpty(loadBalancer.LoadBalancerName), loadBalancer.Instances, instances) if err != nil { - // TODO: Is it better to delete the load balancer entirely? - glog.Warningf("Error registering instances with load-balancer %s: %v", name, err) + glog.Warning("Error registering instances with the load balancer: %v", err) return nil, err } - glog.V(1).Infof("Updated instances registered with load-balancer %s: %v", name, registerResponse.Instances) glog.V(1).Infof("Loadbalancer %s has DNS name %s", name, orEmpty(loadBalancer.DNSName)) // TODO: Wait for creation? @@ -2010,6 +1964,7 @@ func (s *AWSCloud) EnsureTCPLoadBalancerDeleted(name, region string) error { } if len(securityGroupIDs) == 0 { + glog.V(2).Info("deleted all security groups for load balancer: ", name) break } @@ -2047,51 +2002,9 @@ func (s *AWSCloud) UpdateTCPLoadBalancer(name, region string, hosts []string) er return fmt.Errorf("Load balancer not found") } - existingInstances := map[string]*elb.Instance{} - for _, instance := range lb.Instances { - existingInstances[orEmpty(instance.InstanceID)] = instance - } - - wantInstances := map[string]*ec2.Instance{} - for _, instance := range instances { - wantInstances[orEmpty(instance.InstanceID)] = instance - } - - addInstances := []*elb.Instance{} - for instanceId := range wantInstances { - addInstance := &elb.Instance{} - addInstance.InstanceID = aws.String(instanceId) - addInstances = append(addInstances, addInstance) - } - - removeInstances := []*elb.Instance{} - for instanceId := range existingInstances { - _, found := wantInstances[instanceId] - if !found { - removeInstance := &elb.Instance{} - removeInstance.InstanceID = aws.String(instanceId) - removeInstances = append(removeInstances, removeInstance) - } - } - - if len(addInstances) > 0 { - registerRequest := &elb.RegisterInstancesWithLoadBalancerInput{} - registerRequest.Instances = addInstances - registerRequest.LoadBalancerName = lb.LoadBalancerName - _, err = elbClient.RegisterInstancesWithLoadBalancer(registerRequest) - if err != nil { - return err - } - } - - if len(removeInstances) > 0 { - deregisterRequest := &elb.DeregisterInstancesFromLoadBalancerInput{} - deregisterRequest.Instances = removeInstances - deregisterRequest.LoadBalancerName = lb.LoadBalancerName - _, err = elbClient.DeregisterInstancesFromLoadBalancer(deregisterRequest) - if err != nil { - return err - } + err = s.ensureLoadBalancerInstances(elbClient, orEmpty(lb.LoadBalancerName), lb.Instances, instances) + if err != nil { + return nil } err = s.updateInstanceSecurityGroupsForLoadBalancer(lb, instances) diff --git a/pkg/cloudprovider/providers/aws/aws_test.go b/pkg/cloudprovider/providers/aws/aws_test.go index 817363f7c8d..5066ca1407c 100644 --- a/pkg/cloudprovider/providers/aws/aws_test.go +++ b/pkg/cloudprovider/providers/aws/aws_test.go @@ -394,19 +394,42 @@ type FakeELB struct { func (ec2 *FakeELB) CreateLoadBalancer(*elb.CreateLoadBalancerInput) (*elb.CreateLoadBalancerOutput, error) { panic("Not implemented") } + func (ec2 *FakeELB) DeleteLoadBalancer(*elb.DeleteLoadBalancerInput) (*elb.DeleteLoadBalancerOutput, error) { panic("Not implemented") } + func (ec2 *FakeELB) DescribeLoadBalancers(*elb.DescribeLoadBalancersInput) (*elb.DescribeLoadBalancersOutput, error) { panic("Not implemented") } func (ec2 *FakeELB) RegisterInstancesWithLoadBalancer(*elb.RegisterInstancesWithLoadBalancerInput) (*elb.RegisterInstancesWithLoadBalancerOutput, error) { panic("Not implemented") } + func (ec2 *FakeELB) DeregisterInstancesFromLoadBalancer(*elb.DeregisterInstancesFromLoadBalancerInput) (*elb.DeregisterInstancesFromLoadBalancerOutput, error) { panic("Not implemented") } +func (ec2 *FakeELB) DetachLoadBalancerFromSubnets(*elb.DetachLoadBalancerFromSubnetsInput) (*elb.DetachLoadBalancerFromSubnetsOutput, error) { + panic("Not implemented") +} + +func (ec2 *FakeELB) AttachLoadBalancerToSubnets(*elb.AttachLoadBalancerToSubnetsInput) (*elb.AttachLoadBalancerToSubnetsOutput, error) { + panic("Not implemented") +} + +func (ec2 *FakeELB) CreateLoadBalancerListeners(*elb.CreateLoadBalancerListenersInput) (*elb.CreateLoadBalancerListenersOutput, error) { + panic("Not implemented") +} + +func (ec2 *FakeELB) DeleteLoadBalancerListeners(*elb.DeleteLoadBalancerListenersInput) (*elb.DeleteLoadBalancerListenersOutput, error) { + panic("Not implemented") +} + +func (ec2 *FakeELB) ApplySecurityGroupsToLoadBalancer(*elb.ApplySecurityGroupsToLoadBalancerInput) (*elb.ApplySecurityGroupsToLoadBalancerOutput, error) { + panic("Not implemented") +} + type FakeASG struct { aws *FakeAWSServices } diff --git a/pkg/util/set.go b/pkg/util/set.go index 084593f06ba..431312c6d51 100644 --- a/pkg/util/set.go +++ b/pkg/util/set.go @@ -131,6 +131,21 @@ func (s1 StringSet) IsSuperset(s2 StringSet) bool { return true } +// Equal returns true iff s1 is equal (as a set) to s2. +// Two sets are equal if their membership is identical. +// (In practice, this means same elements, order doesn't matter) +func (s1 StringSet) Equal(s2 StringSet) bool { + if len(s1) != len(s2) { + return false + } + for item := range s2 { + if !s1.Has(item) { + return false + } + } + return true +} + // List returns the contents as a sorted string slice. func (s StringSet) List() []string { res := make([]string, 0, len(s))