Change LoadBalancer methods to take api.Service

This is a better abstraction than passing in specific pieces of the
Service that each of the cloudproviders may or may not need. For
instance, many of the providers don't need a region, yet this is passed
in. Similarly many of the providers want a string IP for the load
balancer, but it passes in a converted net ip. Affinity is unused by
AWS. A provider change may also require adding a new parameter which has
an effect on all other cloud provider implementations.

Further, this will simplify adding provider specific load balancer
options, such as with labels or some other metadata. For example, we
could add labels for configuring the details of an AWS elastic load
balancer, such as idle timeout on connections, whether it is
internal or external, cross-zone load balancing, and so on.

Authors: @chbatey, @jsravn
This commit is contained in:
Chris Batey and James Ravn 2016-02-17 11:36:50 +00:00 committed by Chris Batey
parent 76369c42be
commit be9ce30897
10 changed files with 254 additions and 243 deletions

View File

@ -19,11 +19,9 @@ package cloudprovider
import (
"errors"
"fmt"
"net"
"strings"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/types"
)
// Interface is an abstract, pluggable interface for cloud providers.
@ -82,18 +80,22 @@ type LoadBalancer interface {
// TODO: Break this up into different interfaces (LB, etc) when we have more than one type of service
// GetLoadBalancer returns whether the specified load balancer exists, and
// if so, what its status is.
GetLoadBalancer(name, region string) (status *api.LoadBalancerStatus, exists bool, err error)
// Implementations must treat the *api.Service parameter as read-only and not modify it.
GetLoadBalancer(service *api.Service) (status *api.LoadBalancerStatus, exists bool, err error)
// EnsureLoadBalancer creates a new load balancer 'name', or updates the existing one. Returns the status of the balancer
EnsureLoadBalancer(name, region string, loadBalancerIP net.IP, ports []*api.ServicePort, hosts []string, serviceName types.NamespacedName, affinityType api.ServiceAffinity, annotations map[string]string) (*api.LoadBalancerStatus, error)
// Implementations must treat the *api.Service parameter as read-only and not modify it.
EnsureLoadBalancer(service *api.Service, hosts []string, annotations map[string]string) (*api.LoadBalancerStatus, error)
// UpdateLoadBalancer updates hosts under the specified load balancer.
UpdateLoadBalancer(name, region string, hosts []string) error
// Implementations must treat the *api.Service parameter as read-only and not modify it.
UpdateLoadBalancer(service *api.Service, hosts []string) error
// EnsureLoadBalancerDeleted deletes the specified load balancer if it
// exists, returning nil if the load balancer specified either didn't exist or
// was successfully deleted.
// This construction is useful because many cloud providers' load balancers
// have multiple underlying components, meaning a Get could say that the LB
// doesn't exist even if some part of it is still laying around.
EnsureLoadBalancerDeleted(name, region string) error
// Implementations must treat the *api.Service parameter as read-only and not modify it.
EnsureLoadBalancerDeleted(service *api.Service) error
}
// Instances is an abstract, pluggable interface for sets of instances.

View File

@ -2099,31 +2099,27 @@ func isSubnetPublic(rt []*ec2.RouteTable, subnetID string) (bool, error) {
}
// EnsureLoadBalancer implements LoadBalancer.EnsureLoadBalancer
// TODO(justinsb) It is weird that these take a region. I suspect it won't work cross-region anyway.
func (s *AWSCloud) EnsureLoadBalancer(name, region string, publicIP net.IP, ports []*api.ServicePort, hosts []string, serviceName types.NamespacedName, affinity api.ServiceAffinity, annotations map[string]string) (*api.LoadBalancerStatus, error) {
glog.V(2).Infof("EnsureLoadBalancer(%v, %v, %v, %v, %v, %v, %v)", name, region, publicIP, ports, hosts, serviceName, annotations)
func (s *AWSCloud) EnsureLoadBalancer(apiService *api.Service, hosts []string, annotations map[string]string) (*api.LoadBalancerStatus, error) {
glog.V(2).Infof("EnsureLoadBalancer(%v, %v, %v, %v, %v, %v, %v)",
apiService.Namespace, apiService.Name, s.region, apiService.Spec.LoadBalancerIP, apiService.Spec.Ports, hosts, annotations)
if region != s.region {
return nil, fmt.Errorf("requested load balancer region '%s' does not match cluster region '%s'", region, s.region)
}
if affinity != api.ServiceAffinityNone {
if apiService.Spec.SessionAffinity != api.ServiceAffinityNone {
// ELB supports sticky sessions, but only when configured for HTTP/HTTPS
return nil, fmt.Errorf("unsupported load balancer affinity: %v", affinity)
return nil, fmt.Errorf("unsupported load balancer affinity: %v", apiService.Spec.SessionAffinity)
}
if len(ports) == 0 {
if len(apiService.Spec.Ports) == 0 {
return nil, fmt.Errorf("requested load balancer with no ports")
}
for _, port := range ports {
for _, port := range apiService.Spec.Ports {
if port.Protocol != api.ProtocolTCP {
return nil, fmt.Errorf("Only TCP LoadBalancer is supported for AWS ELB")
}
}
if publicIP != nil {
return nil, fmt.Errorf("publicIP cannot be specified for AWS ELB")
if apiService.Spec.LoadBalancerIP != "" {
return nil, fmt.Errorf("LoadBalancerIP cannot be specified for AWS ELB")
}
instances, err := s.getInstancesByNodeNames(hosts)
@ -2162,11 +2158,14 @@ func (s *AWSCloud) EnsureLoadBalancer(name, region string, publicIP net.IP, port
return nil, fmt.Errorf("could not find any suitable subnets for creating the ELB")
}
loadBalancerName := cloudprovider.GetLoadBalancerName(apiService)
serviceName := types.NamespacedName{Namespace: apiService.Namespace, Name: apiService.Name}
// Create a security group for the load balancer
var securityGroupID string
{
sgName := "k8s-elb-" + name
sgDescription := fmt.Sprintf("Security group for Kubernetes ELB %s (%v)", name, serviceName)
sgName := "k8s-elb-" + loadBalancerName
sgDescription := fmt.Sprintf("Security group for Kubernetes ELB %s (%v)", loadBalancerName, serviceName)
securityGroupID, err = s.ensureSecurityGroup(sgName, sgDescription)
if err != nil {
glog.Error("Error creating load balancer security group: ", err)
@ -2179,7 +2178,7 @@ func (s *AWSCloud) EnsureLoadBalancer(name, region string, publicIP net.IP, port
}
permissions := NewIPPermissionSet()
for _, port := range ports {
for _, port := range apiService.Spec.Ports {
portInt64 := int64(port.Port)
protocol := strings.ToLower(string(port.Protocol))
@ -2200,7 +2199,7 @@ func (s *AWSCloud) EnsureLoadBalancer(name, region string, publicIP net.IP, port
// Figure out what mappings we want on the load balancer
listeners := []*elb.Listener{}
for _, port := range ports {
for _, port := range apiService.Spec.Ports {
if port.NodePort == 0 {
glog.Errorf("Ignoring port without NodePort defined: %v", port)
continue
@ -2219,7 +2218,7 @@ func (s *AWSCloud) EnsureLoadBalancer(name, region string, publicIP net.IP, port
}
// Build the load balancer itself
loadBalancer, err := s.ensureLoadBalancer(serviceName, name, listeners, subnetIDs, securityGroupIDs, internalELB)
loadBalancer, err := s.ensureLoadBalancer(serviceName, loadBalancerName, listeners, subnetIDs, securityGroupIDs, internalELB)
if err != nil {
return nil, err
}
@ -2241,7 +2240,7 @@ func (s *AWSCloud) EnsureLoadBalancer(name, region string, publicIP net.IP, port
return nil, err
}
glog.V(1).Infof("Loadbalancer %s (%v) has DNS name %s", name, serviceName, orEmpty(loadBalancer.DNSName))
glog.V(1).Infof("Loadbalancer %s (%v) has DNS name %s", loadBalancerName, serviceName, orEmpty(loadBalancer.DNSName))
// TODO: Wait for creation?
@ -2250,12 +2249,9 @@ func (s *AWSCloud) EnsureLoadBalancer(name, region string, publicIP net.IP, port
}
// GetLoadBalancer is an implementation of LoadBalancer.GetLoadBalancer
func (s *AWSCloud) GetLoadBalancer(name, region string) (*api.LoadBalancerStatus, bool, error) {
if region != s.region {
return nil, false, fmt.Errorf("requested load balancer region '%s' does not match cluster region '%s'", region, s.region)
}
lb, err := s.describeLoadBalancer(name)
func (s *AWSCloud) GetLoadBalancer(service *api.Service) (*api.LoadBalancerStatus, bool, error) {
loadBalancerName := cloudprovider.GetLoadBalancerName(service)
lb, err := s.describeLoadBalancer(loadBalancerName)
if err != nil {
return nil, false, err
}
@ -2464,18 +2460,15 @@ func (s *AWSCloud) updateInstanceSecurityGroupsForLoadBalancer(lb *elb.LoadBalan
}
// EnsureLoadBalancerDeleted implements LoadBalancer.EnsureLoadBalancerDeleted.
func (s *AWSCloud) EnsureLoadBalancerDeleted(name, region string) error {
if region != s.region {
return fmt.Errorf("requested load balancer region '%s' does not match cluster region '%s'", region, s.region)
}
lb, err := s.describeLoadBalancer(name)
func (s *AWSCloud) EnsureLoadBalancerDeleted(service *api.Service) error {
loadBalancerName := cloudprovider.GetLoadBalancerName(service)
lb, err := s.describeLoadBalancer(loadBalancerName)
if err != nil {
return err
}
if lb == nil {
glog.Info("Load balancer already deleted: ", name)
glog.Info("Load balancer already deleted: ", loadBalancerName)
return nil
}
@ -2510,7 +2503,7 @@ func (s *AWSCloud) EnsureLoadBalancerDeleted(name, region string) error {
securityGroupIDs := map[string]struct{}{}
for _, securityGroupID := range lb.SecurityGroups {
if isNilOrEmpty(securityGroupID) {
glog.Warning("Ignoring empty security group in ", name)
glog.Warning("Ignoring empty security group in ", service.Name)
continue
}
securityGroupIDs[*securityGroupID] = struct{}{}
@ -2540,7 +2533,7 @@ func (s *AWSCloud) EnsureLoadBalancerDeleted(name, region string) error {
}
if len(securityGroupIDs) == 0 {
glog.V(2).Info("Deleted all security groups for load balancer: ", name)
glog.V(2).Info("Deleted all security groups for load balancer: ", service.Name)
break
}
@ -2550,10 +2543,10 @@ func (s *AWSCloud) EnsureLoadBalancerDeleted(name, region string) error {
ids = append(ids, id)
}
return fmt.Errorf("timed out deleting ELB: %s. Could not delete security groups %v", name, strings.Join(ids, ","))
return fmt.Errorf("timed out deleting ELB: %s. Could not delete security groups %v", service.Name, strings.Join(ids, ","))
}
glog.V(2).Info("Waiting for load-balancer to delete so we can delete security groups: ", name)
glog.V(2).Info("Waiting for load-balancer to delete so we can delete security groups: ", service.Name)
time.Sleep(10 * time.Second)
}
@ -2563,17 +2556,14 @@ func (s *AWSCloud) EnsureLoadBalancerDeleted(name, region string) error {
}
// UpdateLoadBalancer implements LoadBalancer.UpdateLoadBalancer
func (s *AWSCloud) UpdateLoadBalancer(name, region string, hosts []string) error {
if region != s.region {
return fmt.Errorf("requested load balancer region '%s' does not match cluster region '%s'", region, s.region)
}
func (s *AWSCloud) UpdateLoadBalancer(service *api.Service, hosts []string) error {
instances, err := s.getInstancesByNodeNames(hosts)
if err != nil {
return err
}
lb, err := s.describeLoadBalancer(name)
loadBalancerName := cloudprovider.GetLoadBalancerName(service)
lb, err := s.describeLoadBalancer(loadBalancerName)
if err != nil {
return err
}

View File

@ -28,8 +28,8 @@ import (
"k8s.io/kubernetes/pkg/util/sets"
)
func (s *AWSCloud) ensureLoadBalancer(namespacedName types.NamespacedName, name string, listeners []*elb.Listener, subnetIDs []string, securityGroupIDs []string, internalELB bool) (*elb.LoadBalancerDescription, error) {
loadBalancer, err := s.describeLoadBalancer(name)
func (s *AWSCloud) ensureLoadBalancer(namespacedName types.NamespacedName, loadBalancerName string, listeners []*elb.Listener, subnetIDs []string, securityGroupIDs []string, internalELB bool) (*elb.LoadBalancerDescription, error) {
loadBalancer, err := s.describeLoadBalancer(loadBalancerName)
if err != nil {
return nil, err
}
@ -38,7 +38,7 @@ func (s *AWSCloud) ensureLoadBalancer(namespacedName types.NamespacedName, name
if loadBalancer == nil {
createRequest := &elb.CreateLoadBalancerInput{}
createRequest.LoadBalancerName = aws.String(name)
createRequest.LoadBalancerName = aws.String(loadBalancerName)
createRequest.Listeners = listeners
@ -57,7 +57,7 @@ func (s *AWSCloud) ensureLoadBalancer(namespacedName types.NamespacedName, name
{Key: aws.String(TagNameKubernetesService), Value: aws.String(namespacedName.String())},
}
glog.Infof("Creating load balancer for %v with name: %s", namespacedName, name)
glog.Infof("Creating load balancer for %v with name: ", namespacedName, loadBalancerName)
_, err := s.elb.CreateLoadBalancer(createRequest)
if err != nil {
return nil, err
@ -76,7 +76,7 @@ func (s *AWSCloud) ensureLoadBalancer(namespacedName types.NamespacedName, name
if removals.Len() != 0 {
request := &elb.DetachLoadBalancerFromSubnetsInput{}
request.LoadBalancerName = aws.String(name)
request.LoadBalancerName = aws.String(loadBalancerName)
request.Subnets = stringSetToPointers(removals)
glog.V(2).Info("Detaching load balancer from removed subnets")
_, err := s.elb.DetachLoadBalancerFromSubnets(request)
@ -88,7 +88,7 @@ func (s *AWSCloud) ensureLoadBalancer(namespacedName types.NamespacedName, name
if additions.Len() != 0 {
request := &elb.AttachLoadBalancerToSubnetsInput{}
request.LoadBalancerName = aws.String(name)
request.LoadBalancerName = aws.String(loadBalancerName)
request.Subnets = stringSetToPointers(additions)
glog.V(2).Info("Attaching load balancer to added subnets")
_, err := s.elb.AttachLoadBalancerToSubnets(request)
@ -107,7 +107,7 @@ func (s *AWSCloud) ensureLoadBalancer(namespacedName types.NamespacedName, name
if !expected.Equal(actual) {
// This call just replaces the security groups, unlike e.g. subnets (!)
request := &elb.ApplySecurityGroupsToLoadBalancerInput{}
request.LoadBalancerName = aws.String(name)
request.LoadBalancerName = aws.String(loadBalancerName)
request.SecurityGroups = stringPointerArray(securityGroupIDs)
glog.V(2).Info("Applying updated security groups to load balancer")
_, err := s.elb.ApplySecurityGroupsToLoadBalancer(request)
@ -127,7 +127,7 @@ func (s *AWSCloud) ensureLoadBalancer(namespacedName types.NamespacedName, name
for _, listenerDescription := range listenerDescriptions {
actual := listenerDescription.Listener
if actual == nil {
glog.Warning("Ignoring empty listener in AWS loadbalancer: ", name)
glog.Warning("Ignoring empty listener in AWS loadbalancer: ", loadBalancerName)
continue
}
@ -167,7 +167,7 @@ func (s *AWSCloud) ensureLoadBalancer(namespacedName types.NamespacedName, name
if len(removals) != 0 {
request := &elb.DeleteLoadBalancerListenersInput{}
request.LoadBalancerName = aws.String(name)
request.LoadBalancerName = aws.String(loadBalancerName)
request.LoadBalancerPorts = removals
glog.V(2).Info("Deleting removed load balancer listeners")
_, err := s.elb.DeleteLoadBalancerListeners(request)
@ -179,7 +179,7 @@ func (s *AWSCloud) ensureLoadBalancer(namespacedName types.NamespacedName, name
if len(additions) != 0 {
request := &elb.CreateLoadBalancerListenersInput{}
request.LoadBalancerName = aws.String(name)
request.LoadBalancerName = aws.String(loadBalancerName)
request.Listeners = additions
glog.V(2).Info("Creating added load balancer listeners")
_, err := s.elb.CreateLoadBalancerListeners(request)
@ -192,7 +192,7 @@ func (s *AWSCloud) ensureLoadBalancer(namespacedName types.NamespacedName, name
}
if dirty {
loadBalancer, err = s.describeLoadBalancer(name)
loadBalancer, err = s.describeLoadBalancer(loadBalancerName)
if err != nil {
glog.Warning("Unable to retrieve load balancer after creation/update")
return nil, err

View File

@ -17,7 +17,6 @@ limitations under the License.
package aws
import (
"fmt"
"io"
"reflect"
"strings"
@ -31,7 +30,6 @@ import (
"github.com/golang/glog"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/api/unversioned"
"k8s.io/kubernetes/pkg/types"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
@ -283,15 +281,14 @@ func instanceMatchesFilter(instance *ec2.Instance, filter *ec2.Filter) bool {
return contains(filter.Values, *instance.State.Name)
}
if name == "tag:"+TagNameKubernetesCluster {
for _, tag := range instance.Tags {
if *tag.Key == TagNameKubernetesCluster {
return contains(filter.Values, *tag.Value)
if strings.HasPrefix(name, "tag:") {
tagName := name[4:]
for _, instanceTag := range instance.Tags {
if aws.StringValue(instanceTag.Key) == tagName && contains(filter.Values, aws.StringValue(instanceTag.Value)) {
return true
}
}
return false
}
panic("Unknown filter name: " + name)
}
@ -443,18 +440,20 @@ func (s *FakeEC2) ModifyInstanceAttribute(request *ec2.ModifyInstanceAttributeIn
type FakeELB struct {
aws *FakeAWSServices
mock.Mock
}
func (ec2 *FakeELB) CreateLoadBalancer(*elb.CreateLoadBalancerInput) (*elb.CreateLoadBalancerOutput, error) {
panic("Not implemented")
}
func (ec2 *FakeELB) DeleteLoadBalancer(*elb.DeleteLoadBalancerInput) (*elb.DeleteLoadBalancerOutput, error) {
func (ec2 *FakeELB) DeleteLoadBalancer(input *elb.DeleteLoadBalancerInput) (*elb.DeleteLoadBalancerOutput, error) {
panic("Not implemented")
}
func (ec2 *FakeELB) DescribeLoadBalancers(*elb.DescribeLoadBalancersInput) (*elb.DescribeLoadBalancersOutput, error) {
panic("Not implemented")
func (ec2 *FakeELB) DescribeLoadBalancers(input *elb.DescribeLoadBalancersInput) (*elb.DescribeLoadBalancersOutput, error) {
args := ec2.Called(input)
return args.Get(0).(*elb.DescribeLoadBalancersOutput), nil
}
func (ec2 *FakeELB) RegisterInstancesWithLoadBalancer(*elb.RegisterInstancesWithLoadBalancerInput) (*elb.RegisterInstancesWithLoadBalancerOutput, error) {
panic("Not implemented")
@ -730,40 +729,6 @@ func TestFindVPCID(t *testing.T) {
}
}
func TestLoadBalancerMatchesClusterRegion(t *testing.T) {
awsServices := NewFakeAWSServices()
c, err := newAWSCloud(strings.NewReader("[global]"), awsServices)
if err != nil {
t.Errorf("Error building aws cloud: %v", err)
return
}
badELBRegion := "bad-elb-region"
errorMessage := fmt.Sprintf("requested load balancer region '%s' does not match cluster region '%s'", badELBRegion, c.region)
_, _, err = c.GetLoadBalancer("elb-name", badELBRegion)
if err == nil || err.Error() != errorMessage {
t.Errorf("Expected GetLoadBalancer region mismatch error.")
}
serviceName := types.NamespacedName{Namespace: "foo", Name: "bar"}
_, err = c.EnsureLoadBalancer("elb-name", badELBRegion, nil, nil, nil, serviceName, api.ServiceAffinityNone, nil)
if err == nil || err.Error() != errorMessage {
t.Errorf("Expected EnsureLoadBalancer region mismatch error.")
}
err = c.EnsureLoadBalancerDeleted("elb-name", badELBRegion)
if err == nil || err.Error() != errorMessage {
t.Errorf("Expected EnsureLoadBalancerDeleted region mismatch error.")
}
err = c.UpdateLoadBalancer("elb-name", badELBRegion, nil)
if err == nil || err.Error() != errorMessage {
t.Errorf("Expected UpdateLoadBalancer region mismatch error.")
}
}
func constructSubnets(subnetsIn map[int]map[string]string) (subnetsOut []*ec2.Subnet) {
for i := range subnetsIn {
subnetsOut = append(
@ -1076,7 +1041,6 @@ func TestIpPermissionExistsHandlesMultipleGroupIdsWithUserIds(t *testing.T) {
t.Errorf("Should have not been considered equal since first is not in the second array of groups")
}
}
func TestFindInstanceByNodeNameExcludesTerminatedInstances(t *testing.T) {
awsServices := NewFakeAWSServices()
@ -1197,3 +1161,41 @@ func TestGetVolumeLabels(t *testing.T) {
unversioned.LabelZoneRegion: "us-east-1"}, labels)
awsServices.ec2.AssertExpectations(t)
}
func (self *FakeELB) expectDescribeLoadBalancers(loadBalancerName string) {
self.On("DescribeLoadBalancers", &elb.DescribeLoadBalancersInput{LoadBalancerNames: []*string{aws.String(loadBalancerName)}}).Return(&elb.DescribeLoadBalancersOutput{
LoadBalancerDescriptions: []*elb.LoadBalancerDescription{{}},
})
}
func TestDescribeLoadBalancerOnDelete(t *testing.T) {
awsServices := NewFakeAWSServices()
c, _ := newAWSCloud(strings.NewReader("[global]"), awsServices)
awsServices.elb.expectDescribeLoadBalancers("aid")
c.EnsureLoadBalancerDeleted(&api.Service{ObjectMeta: api.ObjectMeta{Name: "myservice", UID: "id"}})
}
func TestDescribeLoadBalancerOnUpdate(t *testing.T) {
awsServices := NewFakeAWSServices()
c, _ := newAWSCloud(strings.NewReader("[global]"), awsServices)
awsServices.elb.expectDescribeLoadBalancers("aid")
c.UpdateLoadBalancer(&api.Service{ObjectMeta: api.ObjectMeta{Name: "myservice", UID: "id"}}, []string{})
}
func TestDescribeLoadBalancerOnGet(t *testing.T) {
awsServices := NewFakeAWSServices()
c, _ := newAWSCloud(strings.NewReader("[global]"), awsServices)
awsServices.elb.expectDescribeLoadBalancers("aid")
c.GetLoadBalancer(&api.Service{ObjectMeta: api.ObjectMeta{Name: "myservice", UID: "id"}})
}
func TestDescribeLoadBalancerOnEnsure(t *testing.T) {
awsServices := NewFakeAWSServices()
c, _ := newAWSCloud(strings.NewReader("[global]"), awsServices)
awsServices.elb.expectDescribeLoadBalancers("aid")
c.EnsureLoadBalancer(&api.Service{ObjectMeta: api.ObjectMeta{Name: "myservice", UID: "id"}}, []string{}, map[string]string{})
}

View File

@ -25,24 +25,22 @@ import (
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/cloudprovider"
"k8s.io/kubernetes/pkg/types"
)
const ProviderName = "fake"
// FakeBalancer is a fake storage of balancer information
type FakeBalancer struct {
Name string
Region string
ExternalIP net.IP
Ports []*api.ServicePort
Hosts []string
Name string
Region string
LoadBalancerIP string
Ports []api.ServicePort
Hosts []string
}
type FakeUpdateBalancerCall struct {
Name string
Region string
Hosts []string
Service *api.Service
Hosts []string
}
// FakeCloud is a test-double implementation of Interface, LoadBalancer, Instances, and Routes. It is useful for testing.
@ -123,7 +121,7 @@ func (f *FakeCloud) Routes() (cloudprovider.Routes, bool) {
}
// GetLoadBalancer is a stub implementation of LoadBalancer.GetLoadBalancer.
func (f *FakeCloud) GetLoadBalancer(name, region string) (*api.LoadBalancerStatus, bool, error) {
func (f *FakeCloud) GetLoadBalancer(service *api.Service) (*api.LoadBalancerStatus, bool, error) {
status := &api.LoadBalancerStatus{}
status.Ingress = []api.LoadBalancerIngress{{IP: f.ExternalIP.String()}}
@ -132,12 +130,22 @@ func (f *FakeCloud) GetLoadBalancer(name, region string) (*api.LoadBalancerStatu
// EnsureLoadBalancer is a test-spy implementation of LoadBalancer.EnsureLoadBalancer.
// It adds an entry "create" into the internal method call record.
func (f *FakeCloud) EnsureLoadBalancer(name, region string, externalIP net.IP, ports []*api.ServicePort, hosts []string, serviceName types.NamespacedName, affinityType api.ServiceAffinity, annotations map[string]string) (*api.LoadBalancerStatus, error) {
func (f *FakeCloud) EnsureLoadBalancer(service *api.Service, hosts []string, annotations map[string]string) (*api.LoadBalancerStatus, error) {
f.addCall("create")
if f.Balancers == nil {
f.Balancers = make(map[string]FakeBalancer)
}
f.Balancers[name] = FakeBalancer{name, region, externalIP, ports, hosts}
name := cloudprovider.GetLoadBalancerName(service)
spec := service.Spec
zone, err := f.GetZone()
if err != nil {
return nil, err
}
region := zone.Region
f.Balancers[name] = FakeBalancer{name, region, spec.LoadBalancerIP, spec.Ports, hosts}
status := &api.LoadBalancerStatus{}
status.Ingress = []api.LoadBalancerIngress{{IP: f.ExternalIP.String()}}
@ -147,15 +155,15 @@ func (f *FakeCloud) EnsureLoadBalancer(name, region string, externalIP net.IP, p
// UpdateLoadBalancer is a test-spy implementation of LoadBalancer.UpdateLoadBalancer.
// It adds an entry "update" into the internal method call record.
func (f *FakeCloud) UpdateLoadBalancer(name, region string, hosts []string) error {
func (f *FakeCloud) UpdateLoadBalancer(service *api.Service, hosts []string) error {
f.addCall("update")
f.UpdateCalls = append(f.UpdateCalls, FakeUpdateBalancerCall{name, region, hosts})
f.UpdateCalls = append(f.UpdateCalls, FakeUpdateBalancerCall{service, hosts})
return f.Err
}
// EnsureLoadBalancerDeleted is a test-spy implementation of LoadBalancer.EnsureLoadBalancerDeleted.
// It adds an entry "delete" into the internal method call record.
func (f *FakeCloud) EnsureLoadBalancerDeleted(name, region string) error {
func (f *FakeCloud) EnsureLoadBalancerDeleted(service *api.Service) error {
f.addCall("delete")
return f.Err
}

View File

@ -20,7 +20,6 @@ import (
"encoding/json"
"fmt"
"io"
"net"
"net/http"
"path"
"regexp"
@ -439,8 +438,9 @@ func (gce *GCECloud) waitForZoneOp(op *compute.Operation, zone string) error {
}
// GetLoadBalancer is an implementation of LoadBalancer.GetLoadBalancer
func (gce *GCECloud) GetLoadBalancer(name, region string) (*api.LoadBalancerStatus, bool, error) {
fwd, err := gce.service.ForwardingRules.Get(gce.projectID, region, name).Do()
func (gce *GCECloud) GetLoadBalancer(service *api.Service) (*api.LoadBalancerStatus, bool, error) {
loadBalancerName := cloudprovider.GetLoadBalancerName(service)
fwd, err := gce.service.ForwardingRules.Get(gce.projectID, gce.region, loadBalancerName).Do()
if err == nil {
status := &api.LoadBalancerStatus{}
status.Ingress = []api.LoadBalancerIngress{{IP: fwd.IPAddress}}
@ -465,14 +465,7 @@ func isHTTPErrorCode(err error, code int) bool {
// Due to an interesting series of design decisions, this handles both creating
// new load balancers and updating existing load balancers, recognizing when
// each is needed.
func (gce *GCECloud) EnsureLoadBalancer(name, region string, requestedIP net.IP, ports []*api.ServicePort, hostNames []string, svc types.NamespacedName, affinityType api.ServiceAffinity, annotations map[string]string) (*api.LoadBalancerStatus, error) {
portStr := []string{}
for _, p := range ports {
portStr = append(portStr, fmt.Sprintf("%s/%d", p.Protocol, p.Port))
}
serviceName := svc.String()
glog.V(2).Infof("EnsureLoadBalancer(%v, %v, %v, %v, %v, %v, %v)", name, region, requestedIP, portStr, hostNames, serviceName, annotations)
func (gce *GCECloud) EnsureLoadBalancer(apiService *api.Service, hostNames []string, annotations map[string]string) (*api.LoadBalancerStatus, error) {
if len(hostNames) == 0 {
return nil, fmt.Errorf("Cannot EnsureLoadBalancer() with no hosts")
}
@ -482,8 +475,21 @@ func (gce *GCECloud) EnsureLoadBalancer(name, region string, requestedIP net.IP,
return nil, err
}
loadBalancerName := cloudprovider.GetLoadBalancerName(apiService)
loadBalancerIP := apiService.Spec.LoadBalancerIP
ports := apiService.Spec.Ports
portStr := []string{}
for _, p := range apiService.Spec.Ports {
portStr = append(portStr, fmt.Sprintf("%s/%d", p.Protocol, p.Port))
}
affinityType := apiService.Spec.SessionAffinity
serviceName := types.NamespacedName{Namespace: apiService.Namespace, Name: apiService.Name}
glog.V(2).Infof("EnsureLoadBalancer(%v, %v, %v, %v, %v, %v, %v)", loadBalancerName, gce.region, loadBalancerIP, portStr, hosts, serviceName, annotations)
// Check if the forwarding rule exists, and if so, what its IP is.
fwdRuleExists, fwdRuleNeedsUpdate, fwdRuleIP, err := gce.forwardingRuleNeedsUpdate(name, region, requestedIP, ports)
fwdRuleExists, fwdRuleNeedsUpdate, fwdRuleIP, err := gce.forwardingRuleNeedsUpdate(loadBalancerName, gce.region, loadBalancerIP, ports)
if err != nil {
return nil, err
}
@ -517,45 +523,45 @@ func (gce *GCECloud) EnsureLoadBalancer(name, region string, requestedIP net.IP,
return
}
if isSafeToReleaseIP {
if err := gce.deleteStaticIP(name, region); err != nil {
glog.Errorf("failed to release static IP %s for load balancer (%v(%v), %v): %v", ipAddress, name, serviceName, region, err)
if err := gce.deleteStaticIP(loadBalancerName, gce.region); err != nil {
glog.Errorf("failed to release static IP %s for load balancer (%v(%v), %v): %v", ipAddress, loadBalancerName, serviceName, gce.region, err)
}
glog.V(2).Infof("EnsureLoadBalancer(%v(%v)): released static IP %s", name, serviceName, ipAddress)
glog.V(2).Infof("EnsureLoadBalancer(%v(%v)): released static IP %s", loadBalancerName, serviceName, ipAddress)
} else {
glog.Warningf("orphaning static IP %s during update of load balancer (%v(%v), %v): %v", ipAddress, name, serviceName, region, err)
glog.Warningf("orphaning static IP %s during update of load balancer (%v(%v), %v): %v", ipAddress, loadBalancerName, serviceName, gce.region, err)
}
}()
if requestedIP != nil {
if loadBalancerIP != "" {
// If a specific IP address has been requested, we have to respect the
// user's request and use that IP. If the forwarding rule was already using
// a different IP, it will be harmlessly abandoned because it was only an
// ephemeral IP (or it was a different static IP owned by the user, in which
// case we shouldn't delete it anyway).
if isStatic, err := gce.projectOwnsStaticIP(name, region, requestedIP.String()); err != nil {
return nil, fmt.Errorf("failed to test if this GCE project owns the static IP %s: %v", requestedIP.String(), err)
if isStatic, err := gce.projectOwnsStaticIP(loadBalancerName, gce.region, loadBalancerIP); err != nil {
return nil, fmt.Errorf("failed to test if this GCE project owns the static IP %s: %v", loadBalancerIP, err)
} else if isStatic {
// The requested IP is a static IP, owned and managed by the user.
isUserOwnedIP = true
isSafeToReleaseIP = false
ipAddress = requestedIP.String()
glog.V(4).Infof("EnsureLoadBalancer(%v(%v)): using user-provided static IP %s", name, serviceName, ipAddress)
} else if requestedIP.String() == fwdRuleIP {
ipAddress = loadBalancerIP
glog.V(4).Infof("EnsureLoadBalancer(%v(%v)): using user-provided static IP %s", loadBalancerName, serviceName, ipAddress)
} else if loadBalancerIP == fwdRuleIP {
// The requested IP is not a static IP, but is currently assigned
// to this forwarding rule, so we can keep it.
isUserOwnedIP = false
isSafeToReleaseIP = true
ipAddress, _, err = gce.ensureStaticIP(name, serviceName, region, fwdRuleIP)
ipAddress, _, err = gce.ensureStaticIP(loadBalancerName, serviceName.String(), gce.region, fwdRuleIP)
if err != nil {
return nil, fmt.Errorf("failed to ensure static IP %s: %v", fwdRuleIP, err)
}
glog.V(4).Infof("EnsureLoadBalancer(%v(%v)): using user-provided non-static IP %s", name, serviceName, ipAddress)
glog.V(4).Infof("EnsureLoadBalancer(%v(%v)): using user-provided non-static IP %s", loadBalancerName, serviceName, ipAddress)
} else {
// The requested IP is not static and it is not assigned to the
// current forwarding rule. It might be attached to a different
// rule or it might not be part of this project at all. Either
// way, we can't use it.
return nil, fmt.Errorf("requested ip %s is neither static nor assigned to LB %s(%v): %v", requestedIP.String(), name, serviceName, err)
return nil, fmt.Errorf("requested ip %s is neither static nor assigned to LB %s(%v): %v", loadBalancerIP, loadBalancerName, serviceName, err)
}
} else {
// The user did not request a specific IP.
@ -566,7 +572,7 @@ func (gce *GCECloud) EnsureLoadBalancer(name, region string, requestedIP net.IP,
// IP from ephemeral to static, or it will just get the IP if it is
// already static.
existed := false
ipAddress, existed, err = gce.ensureStaticIP(name, serviceName, region, fwdRuleIP)
ipAddress, existed, err = gce.ensureStaticIP(loadBalancerName, serviceName.String(), gce.region, fwdRuleIP)
if err != nil {
return nil, fmt.Errorf("failed to ensure static IP %s: %v", fwdRuleIP, err)
}
@ -576,13 +582,13 @@ func (gce *GCECloud) EnsureLoadBalancer(name, region string, requestedIP net.IP,
// use this IP and try to run through the process again, but we
// should not release the IP unless it is explicitly flagged as OK.
isSafeToReleaseIP = false
glog.V(4).Infof("EnsureLoadBalancer(%v(%v)): adopting static IP %s", name, serviceName, ipAddress)
glog.V(4).Infof("EnsureLoadBalancer(%v(%v)): adopting static IP %s", loadBalancerName, serviceName, ipAddress)
} else {
// For total clarity. The IP did not pre-exist and the user did
// not ask for a particular one, so we can release the IP in case
// of failure or success.
isSafeToReleaseIP = true
glog.V(4).Infof("EnsureLoadBalancer(%v(%v)): allocated static IP %s", name, serviceName, ipAddress)
glog.V(4).Infof("EnsureLoadBalancer(%v(%v)): allocated static IP %s", loadBalancerName, serviceName, ipAddress)
}
}
@ -595,29 +601,29 @@ func (gce *GCECloud) EnsureLoadBalancer(name, region string, requestedIP net.IP,
return nil, err
}
firewallExists, firewallNeedsUpdate, err := gce.firewallNeedsUpdate(name, serviceName, region, ipAddress, ports, sourceRanges)
firewallExists, firewallNeedsUpdate, err := gce.firewallNeedsUpdate(loadBalancerName, serviceName.String(), gce.region, ipAddress, ports, sourceRanges)
if err != nil {
return nil, err
}
if firewallNeedsUpdate {
desc := makeFirewallDescription(serviceName, ipAddress)
desc := makeFirewallDescription(serviceName.String(), ipAddress)
// Unlike forwarding rules and target pools, firewalls can be updated
// without needing to be deleted and recreated.
if firewallExists {
if err := gce.updateFirewall(name, region, desc, sourceRanges, ports, hosts); err != nil {
if err := gce.updateFirewall(loadBalancerName, gce.region, desc, sourceRanges, ports, hosts); err != nil {
return nil, err
}
glog.V(4).Infof("EnsureLoadBalancer(%v(%v)): updated firewall", name, serviceName)
glog.V(4).Infof("EnsureLoadBalancer(%v(%v)): updated firewall", loadBalancerName, serviceName)
} else {
if err := gce.createFirewall(name, region, desc, sourceRanges, ports, hosts); err != nil {
if err := gce.createFirewall(loadBalancerName, gce.region, desc, sourceRanges, ports, hosts); err != nil {
return nil, err
}
glog.V(4).Infof("EnsureLoadBalancer(%v(%v)): created firewall", name, serviceName)
glog.V(4).Infof("EnsureLoadBalancer(%v(%v)): created firewall", loadBalancerName, serviceName)
}
}
tpExists, tpNeedsUpdate, err := gce.targetPoolNeedsUpdate(name, region, affinityType)
tpExists, tpNeedsUpdate, err := gce.targetPoolNeedsUpdate(loadBalancerName, gce.region, affinityType)
if err != nil {
return nil, err
}
@ -634,36 +640,36 @@ func (gce *GCECloud) EnsureLoadBalancer(name, region string, requestedIP net.IP,
// and something should fail before we recreate it, don't release the
// IP. That way we can come back to it later.
isSafeToReleaseIP = false
if err := gce.deleteForwardingRule(name, region); err != nil {
return nil, fmt.Errorf("failed to delete existing forwarding rule %s for load balancer update: %v", name, err)
if err := gce.deleteForwardingRule(loadBalancerName, gce.region); err != nil {
return nil, fmt.Errorf("failed to delete existing forwarding rule %s for load balancer update: %v", loadBalancerName, err)
}
glog.V(4).Infof("EnsureLoadBalancer(%v(%v)): deleted forwarding rule", name, serviceName)
glog.V(4).Infof("EnsureLoadBalancer(%v(%v)): deleted forwarding rule", loadBalancerName, serviceName)
}
if tpExists && tpNeedsUpdate {
if err := gce.deleteTargetPool(name, region); err != nil {
return nil, fmt.Errorf("failed to delete existing target pool %s for load balancer update: %v", name, err)
if err := gce.deleteTargetPool(loadBalancerName, gce.region); err != nil {
return nil, fmt.Errorf("failed to delete existing target pool %s for load balancer update: %v", loadBalancerName, err)
}
glog.V(4).Infof("EnsureLoadBalancer(%v(%v)): deleted target pool", name, serviceName)
glog.V(4).Infof("EnsureLoadBalancer(%v(%v)): deleted target pool", loadBalancerName, serviceName)
}
// Once we've deleted the resources (if necessary), build them back up (or for
// the first time if they're new).
if tpNeedsUpdate {
if err := gce.createTargetPool(name, serviceName, region, hosts, affinityType); err != nil {
return nil, fmt.Errorf("failed to create target pool %s: %v", name, err)
if err := gce.createTargetPool(loadBalancerName, serviceName.String(), gce.region, hosts, affinityType); err != nil {
return nil, fmt.Errorf("failed to create target pool %s: %v", loadBalancerName, err)
}
glog.V(4).Infof("EnsureLoadBalancer(%v(%v)): created target pool", name, serviceName)
glog.V(4).Infof("EnsureLoadBalancer(%v(%v)): created target pool", loadBalancerName, serviceName)
}
if tpNeedsUpdate || fwdRuleNeedsUpdate {
if err := gce.createForwardingRule(name, serviceName, region, ipAddress, ports); err != nil {
return nil, fmt.Errorf("failed to create forwarding rule %s: %v", name, err)
if err := gce.createForwardingRule(loadBalancerName, serviceName.String(), gce.region, ipAddress, ports); err != nil {
return nil, fmt.Errorf("failed to create forwarding rule %s: %v", loadBalancerName, err)
}
// End critical section. It is safe to release the static IP (which
// just demotes it to ephemeral) now that it is attached. In the case
// of a user-requested IP, the "is user-owned" flag will be set,
// preventing it from actually being released.
isSafeToReleaseIP = true
glog.V(4).Infof("EnsureLoadBalancer(%v(%v)): created forwarding rule, IP %s", name, serviceName, ipAddress)
glog.V(4).Infof("EnsureLoadBalancer(%v(%v)): created forwarding rule, IP %s", loadBalancerName, serviceName, ipAddress)
}
status := &api.LoadBalancerStatus{}
@ -675,7 +681,7 @@ func (gce *GCECloud) EnsureLoadBalancer(name, region string, requestedIP net.IP,
// IP is being requested.
// Returns whether the forwarding rule exists, whether it needs to be updated,
// what its IP address is (if it exists), and any error we encountered.
func (gce *GCECloud) forwardingRuleNeedsUpdate(name, region string, requestedIP net.IP, ports []*api.ServicePort) (exists bool, needsUpdate bool, ipAddress string, err error) {
func (gce *GCECloud) forwardingRuleNeedsUpdate(name, region string, loadBalancerIP string, ports []api.ServicePort) (exists bool, needsUpdate bool, ipAddress string, err error) {
fwd, err := gce.service.ForwardingRules.Get(gce.projectID, region, name).Do()
if err != nil {
if isHTTPErrorCode(err, http.StatusNotFound) {
@ -683,7 +689,7 @@ func (gce *GCECloud) forwardingRuleNeedsUpdate(name, region string, requestedIP
}
return false, false, "", fmt.Errorf("error getting load balancer's forwarding rule: %v", err)
}
if requestedIP != nil && requestedIP.String() != fwd.IPAddress {
if loadBalancerIP != fwd.IPAddress {
return true, true, fwd.IPAddress, nil
}
portRange, err := loadBalancerPortRange(ports)
@ -701,7 +707,7 @@ func (gce *GCECloud) forwardingRuleNeedsUpdate(name, region string, requestedIP
return true, false, fwd.IPAddress, nil
}
func loadBalancerPortRange(ports []*api.ServicePort) (string, error) {
func loadBalancerPortRange(ports []api.ServicePort) (string, error) {
if len(ports) == 0 {
return "", fmt.Errorf("no ports specified for GCE load balancer")
}
@ -753,7 +759,7 @@ func translateAffinityType(affinityType api.ServiceAffinity) string {
}
}
func (gce *GCECloud) firewallNeedsUpdate(name, serviceName, region, ipAddress string, ports []*api.ServicePort, sourceRanges netsets.IPNet) (exists bool, needsUpdate bool, err error) {
func (gce *GCECloud) firewallNeedsUpdate(name, serviceName, region, ipAddress string, ports []api.ServicePort, sourceRanges netsets.IPNet) (exists bool, needsUpdate bool, err error) {
fw, err := gce.service.Firewalls.Get(gce.projectID, makeFirewallName(name)).Do()
if err != nil {
if isHTTPErrorCode(err, http.StatusNotFound) {
@ -814,7 +820,7 @@ func slicesEqual(x, y []string) bool {
return true
}
func (gce *GCECloud) createForwardingRule(name, serviceName, region, ipAddress string, ports []*api.ServicePort) error {
func (gce *GCECloud) createForwardingRule(name, serviceName, region, ipAddress string, ports []api.ServicePort) error {
portRange, err := loadBalancerPortRange(ports)
if err != nil {
return err
@ -865,7 +871,7 @@ func (gce *GCECloud) createTargetPool(name, serviceName, region string, hosts []
return nil
}
func (gce *GCECloud) createFirewall(name, region, desc string, sourceRanges netsets.IPNet, ports []*api.ServicePort, hosts []*gceInstance) error {
func (gce *GCECloud) createFirewall(name, region, desc string, sourceRanges netsets.IPNet, ports []api.ServicePort, hosts []*gceInstance) error {
firewall, err := gce.firewallObject(name, region, desc, sourceRanges, ports, hosts)
if err != nil {
return err
@ -883,7 +889,7 @@ func (gce *GCECloud) createFirewall(name, region, desc string, sourceRanges nets
return nil
}
func (gce *GCECloud) updateFirewall(name, region, desc string, sourceRanges netsets.IPNet, ports []*api.ServicePort, hosts []*gceInstance) error {
func (gce *GCECloud) updateFirewall(name, region, desc string, sourceRanges netsets.IPNet, ports []api.ServicePort, hosts []*gceInstance) error {
firewall, err := gce.firewallObject(name, region, desc, sourceRanges, ports, hosts)
if err != nil {
return err
@ -901,7 +907,7 @@ func (gce *GCECloud) updateFirewall(name, region, desc string, sourceRanges nets
return nil
}
func (gce *GCECloud) firewallObject(name, region, desc string, sourceRanges netsets.IPNet, ports []*api.ServicePort, hosts []*gceInstance) (*compute.Firewall, error) {
func (gce *GCECloud) firewallObject(name, region, desc string, sourceRanges netsets.IPNet, ports []api.ServicePort, hosts []*gceInstance) (*compute.Firewall, error) {
allowedPorts := make([]string, len(ports))
for ix := range ports {
allowedPorts[ix] = strconv.Itoa(ports[ix].Port)
@ -1053,13 +1059,14 @@ func (gce *GCECloud) ensureStaticIP(name, serviceName, region, existingIP string
}
// UpdateLoadBalancer is an implementation of LoadBalancer.UpdateLoadBalancer.
func (gce *GCECloud) UpdateLoadBalancer(name, region string, hostNames []string) error {
func (gce *GCECloud) UpdateLoadBalancer(service *api.Service, hostNames []string) error {
hosts, err := gce.getInstancesByNames(hostNames)
if err != nil {
return err
}
pool, err := gce.service.TargetPools.Get(gce.projectID, region, name).Do()
loadBalancerName := cloudprovider.GetLoadBalancerName(service)
pool, err := gce.service.TargetPools.Get(gce.projectID, gce.region, loadBalancerName).Do()
if err != nil {
return err
}
@ -1083,22 +1090,22 @@ func (gce *GCECloud) UpdateLoadBalancer(name, region string, hostNames []string)
if len(toAdd) > 0 {
add := &compute.TargetPoolsAddInstanceRequest{Instances: toAdd}
op, err := gce.service.TargetPools.AddInstance(gce.projectID, region, name, add).Do()
op, err := gce.service.TargetPools.AddInstance(gce.projectID, gce.region, loadBalancerName, add).Do()
if err != nil {
return err
}
if err := gce.waitForRegionOp(op, region); err != nil {
if err := gce.waitForRegionOp(op, gce.region); err != nil {
return err
}
}
if len(toRemove) > 0 {
rm := &compute.TargetPoolsRemoveInstanceRequest{Instances: toRemove}
op, err := gce.service.TargetPools.RemoveInstance(gce.projectID, region, name, rm).Do()
op, err := gce.service.TargetPools.RemoveInstance(gce.projectID, gce.region, loadBalancerName, rm).Do()
if err != nil {
return err
}
if err := gce.waitForRegionOp(op, region); err != nil {
if err := gce.waitForRegionOp(op, gce.region); err != nil {
return err
}
}
@ -1106,41 +1113,44 @@ func (gce *GCECloud) UpdateLoadBalancer(name, region string, hostNames []string)
// Try to verify that the correct number of nodes are now in the target pool.
// We've been bitten by a bug here before (#11327) where all nodes were
// accidentally removed and want to make similar problems easier to notice.
updatedPool, err := gce.service.TargetPools.Get(gce.projectID, region, name).Do()
updatedPool, err := gce.service.TargetPools.Get(gce.projectID, gce.region, loadBalancerName).Do()
if err != nil {
return err
}
if len(updatedPool.Instances) != len(hosts) {
glog.Errorf("Unexpected number of instances (%d) in target pool %s after updating (expected %d). Instances in updated pool: %s",
len(updatedPool.Instances), name, len(hosts), strings.Join(updatedPool.Instances, ","))
return fmt.Errorf("Unexpected number of instances (%d) in target pool %s after update (expected %d)", len(updatedPool.Instances), name, len(hosts))
len(updatedPool.Instances), loadBalancerName, len(hosts), strings.Join(updatedPool.Instances, ","))
return fmt.Errorf("Unexpected number of instances (%d) in target pool %s after update (expected %d)", len(updatedPool.Instances), loadBalancerName, len(hosts))
}
return nil
}
// EnsureLoadBalancerDeleted is an implementation of LoadBalancer.EnsureLoadBalancerDeleted.
func (gce *GCECloud) EnsureLoadBalancerDeleted(name, region string) error {
glog.V(2).Infof("EnsureLoadBalancerDeleted(%v, %v", name, region)
err := utilerrors.AggregateGoroutines(
func() error { return gce.deleteFirewall(name, region) },
func (gce *GCECloud) EnsureLoadBalancerDeleted(service *api.Service) error {
loadBalancerName := cloudprovider.GetLoadBalancerName(service)
glog.V(2).Infof("EnsureLoadBalancerDeleted(%v, %v, %v, %v)", service.Namespace, service.Name, loadBalancerName,
gce.region)
errs := utilerrors.AggregateGoroutines(
func() error { return gce.deleteFirewall(loadBalancerName, gce.region) },
// Even though we don't hold on to static IPs for load balancers, it's
// possible that EnsureLoadBalancer left one around in a failed
// creation/update attempt, so make sure we clean it up here just in case.
func() error { return gce.deleteStaticIP(name, region) },
func() error { return gce.deleteStaticIP(loadBalancerName, gce.region) },
func() error {
// The forwarding rule must be deleted before either the target pool can,
// unfortunately, so we have to do these two serially.
if err := gce.deleteForwardingRule(name, region); err != nil {
if err := gce.deleteForwardingRule(loadBalancerName, gce.region); err != nil {
return err
}
if err := gce.deleteTargetPool(name, region); err != nil {
if err := gce.deleteTargetPool(loadBalancerName, gce.region); err != nil {
return err
}
return nil
},
)
if err != nil {
return utilerrors.Flatten(err)
if errs != nil {
return utilerrors.Flatten(errs)
}
return nil
}
@ -1226,9 +1236,9 @@ func (gce *GCECloud) CreateFirewall(name, desc string, sourceRanges netsets.IPNe
}
// TODO: This completely breaks modularity in the cloudprovider but the methods
// shared with the TCPLoadBalancer take api.ServicePorts.
svcPorts := []*api.ServicePort{}
svcPorts := []api.ServicePort{}
for _, p := range ports {
svcPorts = append(svcPorts, &api.ServicePort{Port: int(p)})
svcPorts = append(svcPorts, api.ServicePort{Port: int(p)})
}
hosts, err := gce.getInstancesByNames(hostNames)
if err != nil {
@ -1255,9 +1265,9 @@ func (gce *GCECloud) UpdateFirewall(name, desc string, sourceRanges netsets.IPNe
}
// TODO: This completely breaks modularity in the cloudprovider but the methods
// shared with the TCPLoadBalancer take api.ServicePorts.
svcPorts := []*api.ServicePort{}
svcPorts := []api.ServicePort{}
for _, p := range ports {
svcPorts = append(svcPorts, &api.ServicePort{Port: int(p)})
svcPorts = append(svcPorts, api.ServicePort{Port: int(p)})
}
hosts, err := gce.getInstancesByNames(hostNames)
if err != nil {

View File

@ -22,7 +22,6 @@ import (
"fmt"
"io"
"io/ioutil"
"net"
"net/http"
"regexp"
"strings"
@ -47,7 +46,6 @@ import (
"k8s.io/kubernetes/pkg/api/resource"
"k8s.io/kubernetes/pkg/api/service"
"k8s.io/kubernetes/pkg/cloudprovider"
"k8s.io/kubernetes/pkg/types"
)
const ProviderName = "openstack"
@ -641,8 +639,9 @@ func getFloatingIPByPortID(client *gophercloud.ServiceClient, portID string) (*f
return &floatingIPList[0], nil
}
func (lb *LoadBalancer) GetLoadBalancer(name, region string) (*api.LoadBalancerStatus, bool, error) {
vip, err := getVipByName(lb.network, name)
func (lb *LoadBalancer) GetLoadBalancer(service *api.Service) (*api.LoadBalancerStatus, bool, error) {
loadBalancerName := cloudprovider.GetLoadBalancerName(service)
vip, err := getVipByName(lb.network, loadBalancerName)
if err == ErrNotFound {
return nil, false, nil
}
@ -661,9 +660,10 @@ func (lb *LoadBalancer) GetLoadBalancer(name, region string) (*api.LoadBalancerS
// a list of regions (from config) and query/create loadbalancers in
// each region.
func (lb *LoadBalancer) EnsureLoadBalancer(name, region string, loadBalancerIP net.IP, ports []*api.ServicePort, hosts []string, serviceName types.NamespacedName, affinity api.ServiceAffinity, annotations map[string]string) (*api.LoadBalancerStatus, error) {
glog.V(4).Infof("EnsureLoadBalancer(%v, %v, %v, %v, %v, %v, %v)", name, region, loadBalancerIP, ports, hosts, serviceName, annotations)
func (lb *LoadBalancer) EnsureLoadBalancer(apiService *api.Service, hosts []string, annotations map[string]string) (*api.LoadBalancerStatus, error) {
glog.V(4).Infof("EnsureLoadBalancer(%v, %v, %v, %v, %v, %v)", apiService.Namespace, apiService.Name, apiService.Spec.LoadBalancerIP, apiService.Spec.Ports, hosts, annotations)
ports := apiService.Spec.Ports
if len(ports) > 1 {
return nil, fmt.Errorf("multiple ports are not yet supported in openstack load balancers")
} else if len(ports) == 0 {
@ -676,6 +676,7 @@ func (lb *LoadBalancer) EnsureLoadBalancer(name, region string, loadBalancerIP n
return nil, fmt.Errorf("Only TCP LoadBalancer is supported for openstack load balancers")
}
affinity := apiService.Spec.SessionAffinity
var persistence *vips.SessionPersistence
switch affinity {
case api.ServiceAffinityNone:
@ -695,8 +696,8 @@ func (lb *LoadBalancer) EnsureLoadBalancer(name, region string, loadBalancerIP n
return nil, fmt.Errorf("Source range restrictions are not supported for openstack load balancers")
}
glog.V(2).Infof("Checking if openstack load balancer already exists: %s", name)
_, exists, err := lb.GetLoadBalancer(name, region)
glog.V(2).Infof("Checking if openstack load balancer already exists: %s", cloudprovider.GetLoadBalancerName(apiService))
_, exists, err := lb.GetLoadBalancer(apiService)
if err != nil {
return nil, fmt.Errorf("error checking if openstack load balancer already exists: %v", err)
}
@ -704,7 +705,7 @@ func (lb *LoadBalancer) EnsureLoadBalancer(name, region string, loadBalancerIP n
// TODO: Implement a more efficient update strategy for common changes than delete & create
// In particular, if we implement hosts update, we can get rid of UpdateHosts
if exists {
err := lb.EnsureLoadBalancerDeleted(name, region)
err := lb.EnsureLoadBalancerDeleted(apiService)
if err != nil {
return nil, fmt.Errorf("error deleting existing openstack load balancer: %v", err)
}
@ -714,6 +715,7 @@ func (lb *LoadBalancer) EnsureLoadBalancer(name, region string, loadBalancerIP n
if lbmethod == "" {
lbmethod = pools.LBMethodRoundRobin
}
name := cloudprovider.GetLoadBalancerName(apiService)
pool, err := pools.Create(lb.network, pools.CreateOpts{
Name: name,
Protocol: pools.ProtocolTCP,
@ -771,8 +773,10 @@ func (lb *LoadBalancer) EnsureLoadBalancer(name, region string, loadBalancerIP n
SubnetID: lb.opts.SubnetId,
Persistence: persistence,
}
if loadBalancerIP != nil {
createOpts.Address = loadBalancerIP.String()
loadBalancerIP := apiService.Spec.LoadBalancerIP
if loadBalancerIP != "" {
createOpts.Address = loadBalancerIP
}
vip, err := vips.Create(lb.network, createOpts).Extract()
@ -805,10 +809,11 @@ func (lb *LoadBalancer) EnsureLoadBalancer(name, region string, loadBalancerIP n
}
func (lb *LoadBalancer) UpdateLoadBalancer(name, region string, hosts []string) error {
glog.V(4).Infof("UpdateLoadBalancer(%v, %v, %v)", name, region, hosts)
func (lb *LoadBalancer) UpdateLoadBalancer(service *api.Service, hosts []string) error {
loadBalancerName := cloudprovider.GetLoadBalancerName(service)
glog.V(4).Infof("UpdateLoadBalancer(%v, %v)", loadBalancerName, hosts)
vip, err := getVipByName(lb.network, name)
vip, err := getVipByName(lb.network, loadBalancerName)
if err != nil {
return err
}
@ -866,10 +871,11 @@ func (lb *LoadBalancer) UpdateLoadBalancer(name, region string, hosts []string)
return nil
}
func (lb *LoadBalancer) EnsureLoadBalancerDeleted(name, region string) error {
glog.V(4).Infof("EnsureLoadBalancerDeleted(%v, %v)", name, region)
func (lb *LoadBalancer) EnsureLoadBalancerDeleted(service *api.Service) error {
loadBalancerName := cloudprovider.GetLoadBalancerName(service)
glog.V(4).Infof("EnsureLoadBalancerDeleted(%v)", loadBalancerName)
vip, err := getVipByName(lb.network, name)
vip, err := getVipByName(lb.network, loadBalancerName)
if err != nil && err != ErrNotFound {
return err
}
@ -907,7 +913,7 @@ func (lb *LoadBalancer) EnsureLoadBalancerDeleted(name, region string) error {
// still exists that we failed to delete on some
// previous occasion. Make a best effort attempt to
// cleanup any pools with the same name as the VIP.
pool, err = getPoolByName(lb.network, name)
pool, err = getPoolByName(lb.network, service.Name)
if err != nil && err != ErrNotFound {
return err
}

View File

@ -25,6 +25,7 @@ import (
"k8s.io/kubernetes/pkg/util/rand"
"github.com/rackspace/gophercloud"
"k8s.io/kubernetes/pkg/api"
)
func TestReadConfig(t *testing.T) {
@ -169,7 +170,7 @@ func TestLoadBalancer(t *testing.T) {
t.Fatalf("LoadBalancer() returned false - perhaps your stack doesn't support Neutron?")
}
_, exists, err := lb.GetLoadBalancer("noexist", "region")
_, exists, err := lb.GetLoadBalancer(&api.Service{ObjectMeta: api.ObjectMeta{Name: "noexist"}})
if err != nil {
t.Fatalf("GetLoadBalancer(\"noexist\") returned error: %s", err)
}

View File

@ -18,7 +18,6 @@ package service
import (
"fmt"
"net"
"sort"
"sync"
"time"
@ -31,7 +30,7 @@ import (
"k8s.io/kubernetes/pkg/client/cache"
clientset "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset"
"k8s.io/kubernetes/pkg/client/record"
unversionedcore "k8s.io/kubernetes/pkg/client/typed/generated/core/unversioned"
unversioned_core "k8s.io/kubernetes/pkg/client/typed/generated/core/unversioned"
"k8s.io/kubernetes/pkg/cloudprovider"
"k8s.io/kubernetes/pkg/fields"
"k8s.io/kubernetes/pkg/types"
@ -90,7 +89,7 @@ type ServiceController struct {
// (like load balancers) in sync with the registry.
func New(cloud cloudprovider.Interface, kubeClient clientset.Interface, clusterName string) *ServiceController {
broadcaster := record.NewBroadcaster()
broadcaster.StartRecordingToSink(&unversionedcore.EventSinkImpl{kubeClient.Core().Events("")})
broadcaster.StartRecordingToSink(&unversioned_core.EventSinkImpl{kubeClient.Core().Events("")})
recorder := broadcaster.NewRecorder(api.EventSource{Component: "service-controller"})
return &ServiceController{
@ -251,7 +250,7 @@ func (s *ServiceController) processDelta(delta *cache.Delta) (error, time.Durati
} else if errors.IsNotFound(err) {
glog.V(2).Infof("Service %v not found, ensuring load balancer is deleted", namespacedName)
s.eventRecorder.Event(service, api.EventTypeNormal, "DeletingLoadBalancer", "Deleting load balancer")
err := s.balancer.EnsureLoadBalancerDeleted(s.loadBalancerName(deltaService), s.zone.Region)
err := s.balancer.EnsureLoadBalancerDeleted(deltaService)
if err != nil {
message := "Error deleting load balancer (will retry): " + err.Error()
s.eventRecorder.Event(deltaService, api.EventTypeWarning, "DeletingLoadBalancerFailed", message)
@ -315,7 +314,7 @@ func (s *ServiceController) createLoadBalancerIfNeeded(namespacedName types.Name
// If we don't have any cached memory of the load balancer, we have to ask
// the cloud provider for what it knows about it.
// Technically EnsureLoadBalancerDeleted can cope, but we want to post meaningful events
_, exists, err := s.balancer.GetLoadBalancer(s.loadBalancerName(service), s.zone.Region)
_, exists, err := s.balancer.GetLoadBalancer(service)
if err != nil {
return fmt.Errorf("Error getting LB for service %s: %v", namespacedName, err), retryable
}
@ -327,7 +326,7 @@ func (s *ServiceController) createLoadBalancerIfNeeded(namespacedName types.Name
if needDelete {
glog.Infof("Deleting existing load balancer for service %s that no longer needs a load balancer.", namespacedName)
s.eventRecorder.Event(service, api.EventTypeNormal, "DeletingLoadBalancer", "Deleting load balancer")
if err := s.balancer.EnsureLoadBalancerDeleted(s.loadBalancerName(service), s.zone.Region); err != nil {
if err := s.balancer.EnsureLoadBalancerDeleted(service); err != nil {
return err, retryable
}
s.eventRecorder.Event(service, api.EventTypeNormal, "DeletedLoadBalancer", "Deleted load balancer")
@ -341,8 +340,7 @@ func (s *ServiceController) createLoadBalancerIfNeeded(namespacedName types.Name
// The load balancer doesn't exist yet, so create it.
s.eventRecorder.Event(service, api.EventTypeNormal, "CreatingLoadBalancer", "Creating load balancer")
err := s.createLoadBalancer(service, namespacedName)
err := s.createLoadBalancer(service)
if err != nil {
return fmt.Errorf("Failed to create load balancer for service %s: %v", namespacedName, err), retryable
}
@ -392,21 +390,16 @@ func (s *ServiceController) persistUpdate(service *api.Service) error {
return err
}
func (s *ServiceController) createLoadBalancer(service *api.Service, serviceName types.NamespacedName) error {
ports, err := getPortsForLB(service)
if err != nil {
return err
}
func (s *ServiceController) createLoadBalancer(service *api.Service) error {
nodes, err := s.nodeLister.List()
if err != nil {
return err
}
name := s.loadBalancerName(service)
// - Only one protocol supported per service
// - Not all cloud providers support all protocols and the next step is expected to return
// an error for unsupported protocols
status, err := s.balancer.EnsureLoadBalancer(name, s.zone.Region, net.ParseIP(service.Spec.LoadBalancerIP),
ports, hostsFromNodeList(&nodes), serviceName, service.Spec.SessionAffinity, service.ObjectMeta.Annotations)
status, err := s.balancer.EnsureLoadBalancer(service, hostsFromNodeList(&nodes), service.ObjectMeta.Annotations)
if err != nil {
return err
} else {
@ -727,16 +720,15 @@ func (s *ServiceController) lockedUpdateLoadBalancerHosts(service *api.Service,
}
// This operation doesn't normally take very long (and happens pretty often), so we only record the final event
name := cloudprovider.GetLoadBalancerName(service)
err := s.balancer.UpdateLoadBalancer(name, s.zone.Region, hosts)
err := s.balancer.UpdateLoadBalancer(service, hosts)
if err == nil {
s.eventRecorder.Event(service, api.EventTypeNormal, "UpdatedLoadBalancer", "Updated load balancer with new hosts")
return nil
}
// It's only an actual error if the load balancer still exists.
if _, exists, err := s.balancer.GetLoadBalancer(name, s.zone.Region); err != nil {
glog.Errorf("External error while checking if load balancer %q exists: name, %v", name, err)
if _, exists, err := s.balancer.GetLoadBalancer(service); err != nil {
glog.Errorf("External error while checking if load balancer %q exists: name, %v", cloudprovider.GetLoadBalancerName(service), err)
} else if !exists {
return nil
}

View File

@ -166,7 +166,7 @@ func TestUpdateNodesInExternalLoadBalancer(t *testing.T) {
newService("s0", "333", api.ServiceTypeLoadBalancer),
},
expectedUpdateCalls: []fakecloud.FakeUpdateBalancerCall{
{Name: "a333", Region: region, Hosts: []string{"node0", "node1", "node73"}},
{newService("s0", "333", api.ServiceTypeLoadBalancer), hosts},
},
},
{
@ -177,9 +177,9 @@ func TestUpdateNodesInExternalLoadBalancer(t *testing.T) {
newService("s2", "666", api.ServiceTypeLoadBalancer),
},
expectedUpdateCalls: []fakecloud.FakeUpdateBalancerCall{
{Name: "a444", Region: region, Hosts: []string{"node0", "node1", "node73"}},
{Name: "a555", Region: region, Hosts: []string{"node0", "node1", "node73"}},
{Name: "a666", Region: region, Hosts: []string{"node0", "node1", "node73"}},
{newService("s0", "444", api.ServiceTypeLoadBalancer), hosts},
{newService("s1", "555", api.ServiceTypeLoadBalancer), hosts},
{newService("s2", "666", api.ServiceTypeLoadBalancer), hosts},
},
},
{
@ -191,8 +191,8 @@ func TestUpdateNodesInExternalLoadBalancer(t *testing.T) {
newService("s4", "123", api.ServiceTypeClusterIP),
},
expectedUpdateCalls: []fakecloud.FakeUpdateBalancerCall{
{Name: "a888", Region: region, Hosts: []string{"node0", "node1", "node73"}},
{Name: "a999", Region: region, Hosts: []string{"node0", "node1", "node73"}},
{newService("s1", "888", api.ServiceTypeLoadBalancer), hosts},
{newService("s3", "999", api.ServiceTypeLoadBalancer), hosts},
},
},
{
@ -202,7 +202,7 @@ func TestUpdateNodesInExternalLoadBalancer(t *testing.T) {
nil,
},
expectedUpdateCalls: []fakecloud.FakeUpdateBalancerCall{
{Name: "a234", Region: region, Hosts: []string{"node0", "node1", "node73"}},
{newService("s0", "234", api.ServiceTypeLoadBalancer), hosts},
},
},
}