Merge pull request #21378 from sky-uk/pass-service-to-lb-provider

Auto commit by PR queue bot
This commit is contained in:
k8s-merge-robot 2016-03-23 18:18:58 -07:00
commit f3af5d26a4
10 changed files with 254 additions and 243 deletions

View File

@ -19,11 +19,9 @@ package cloudprovider
import ( import (
"errors" "errors"
"fmt" "fmt"
"net"
"strings" "strings"
"k8s.io/kubernetes/pkg/api" "k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/types"
) )
// Interface is an abstract, pluggable interface for cloud providers. // Interface is an abstract, pluggable interface for cloud providers.
@ -82,18 +80,22 @@ type LoadBalancer interface {
// TODO: Break this up into different interfaces (LB, etc) when we have more than one type of service // TODO: Break this up into different interfaces (LB, etc) when we have more than one type of service
// GetLoadBalancer returns whether the specified load balancer exists, and // GetLoadBalancer returns whether the specified load balancer exists, and
// if so, what its status is. // if so, what its status is.
GetLoadBalancer(name, region string) (status *api.LoadBalancerStatus, exists bool, err error) // Implementations must treat the *api.Service parameter as read-only and not modify it.
GetLoadBalancer(service *api.Service) (status *api.LoadBalancerStatus, exists bool, err error)
// EnsureLoadBalancer creates a new load balancer 'name', or updates the existing one. Returns the status of the balancer // EnsureLoadBalancer creates a new load balancer 'name', or updates the existing one. Returns the status of the balancer
EnsureLoadBalancer(name, region string, loadBalancerIP net.IP, ports []*api.ServicePort, hosts []string, serviceName types.NamespacedName, affinityType api.ServiceAffinity, annotations map[string]string) (*api.LoadBalancerStatus, error) // Implementations must treat the *api.Service parameter as read-only and not modify it.
EnsureLoadBalancer(service *api.Service, hosts []string, annotations map[string]string) (*api.LoadBalancerStatus, error)
// UpdateLoadBalancer updates hosts under the specified load balancer. // UpdateLoadBalancer updates hosts under the specified load balancer.
UpdateLoadBalancer(name, region string, hosts []string) error // Implementations must treat the *api.Service parameter as read-only and not modify it.
UpdateLoadBalancer(service *api.Service, hosts []string) error
// EnsureLoadBalancerDeleted deletes the specified load balancer if it // EnsureLoadBalancerDeleted deletes the specified load balancer if it
// exists, returning nil if the load balancer specified either didn't exist or // exists, returning nil if the load balancer specified either didn't exist or
// was successfully deleted. // was successfully deleted.
// This construction is useful because many cloud providers' load balancers // This construction is useful because many cloud providers' load balancers
// have multiple underlying components, meaning a Get could say that the LB // have multiple underlying components, meaning a Get could say that the LB
// doesn't exist even if some part of it is still laying around. // doesn't exist even if some part of it is still laying around.
EnsureLoadBalancerDeleted(name, region string) error // Implementations must treat the *api.Service parameter as read-only and not modify it.
EnsureLoadBalancerDeleted(service *api.Service) error
} }
// Instances is an abstract, pluggable interface for sets of instances. // Instances is an abstract, pluggable interface for sets of instances.

View File

@ -2099,31 +2099,27 @@ func isSubnetPublic(rt []*ec2.RouteTable, subnetID string) (bool, error) {
} }
// EnsureLoadBalancer implements LoadBalancer.EnsureLoadBalancer // EnsureLoadBalancer implements LoadBalancer.EnsureLoadBalancer
// TODO(justinsb) It is weird that these take a region. I suspect it won't work cross-region anyway. func (s *AWSCloud) EnsureLoadBalancer(apiService *api.Service, hosts []string, annotations map[string]string) (*api.LoadBalancerStatus, error) {
func (s *AWSCloud) EnsureLoadBalancer(name, region string, publicIP net.IP, ports []*api.ServicePort, hosts []string, serviceName types.NamespacedName, affinity api.ServiceAffinity, annotations map[string]string) (*api.LoadBalancerStatus, error) { glog.V(2).Infof("EnsureLoadBalancer(%v, %v, %v, %v, %v, %v, %v)",
glog.V(2).Infof("EnsureLoadBalancer(%v, %v, %v, %v, %v, %v, %v)", name, region, publicIP, ports, hosts, serviceName, annotations) apiService.Namespace, apiService.Name, s.region, apiService.Spec.LoadBalancerIP, apiService.Spec.Ports, hosts, annotations)
if region != s.region { if apiService.Spec.SessionAffinity != api.ServiceAffinityNone {
return nil, fmt.Errorf("requested load balancer region '%s' does not match cluster region '%s'", region, s.region)
}
if affinity != api.ServiceAffinityNone {
// ELB supports sticky sessions, but only when configured for HTTP/HTTPS // ELB supports sticky sessions, but only when configured for HTTP/HTTPS
return nil, fmt.Errorf("unsupported load balancer affinity: %v", affinity) return nil, fmt.Errorf("unsupported load balancer affinity: %v", apiService.Spec.SessionAffinity)
} }
if len(ports) == 0 { if len(apiService.Spec.Ports) == 0 {
return nil, fmt.Errorf("requested load balancer with no ports") return nil, fmt.Errorf("requested load balancer with no ports")
} }
for _, port := range ports { for _, port := range apiService.Spec.Ports {
if port.Protocol != api.ProtocolTCP { if port.Protocol != api.ProtocolTCP {
return nil, fmt.Errorf("Only TCP LoadBalancer is supported for AWS ELB") return nil, fmt.Errorf("Only TCP LoadBalancer is supported for AWS ELB")
} }
} }
if publicIP != nil { if apiService.Spec.LoadBalancerIP != "" {
return nil, fmt.Errorf("publicIP cannot be specified for AWS ELB") return nil, fmt.Errorf("LoadBalancerIP cannot be specified for AWS ELB")
} }
instances, err := s.getInstancesByNodeNames(hosts) instances, err := s.getInstancesByNodeNames(hosts)
@ -2162,11 +2158,14 @@ func (s *AWSCloud) EnsureLoadBalancer(name, region string, publicIP net.IP, port
return nil, fmt.Errorf("could not find any suitable subnets for creating the ELB") return nil, fmt.Errorf("could not find any suitable subnets for creating the ELB")
} }
loadBalancerName := cloudprovider.GetLoadBalancerName(apiService)
serviceName := types.NamespacedName{Namespace: apiService.Namespace, Name: apiService.Name}
// Create a security group for the load balancer // Create a security group for the load balancer
var securityGroupID string var securityGroupID string
{ {
sgName := "k8s-elb-" + name sgName := "k8s-elb-" + loadBalancerName
sgDescription := fmt.Sprintf("Security group for Kubernetes ELB %s (%v)", name, serviceName) sgDescription := fmt.Sprintf("Security group for Kubernetes ELB %s (%v)", loadBalancerName, serviceName)
securityGroupID, err = s.ensureSecurityGroup(sgName, sgDescription) securityGroupID, err = s.ensureSecurityGroup(sgName, sgDescription)
if err != nil { if err != nil {
glog.Error("Error creating load balancer security group: ", err) glog.Error("Error creating load balancer security group: ", err)
@ -2179,7 +2178,7 @@ func (s *AWSCloud) EnsureLoadBalancer(name, region string, publicIP net.IP, port
} }
permissions := NewIPPermissionSet() permissions := NewIPPermissionSet()
for _, port := range ports { for _, port := range apiService.Spec.Ports {
portInt64 := int64(port.Port) portInt64 := int64(port.Port)
protocol := strings.ToLower(string(port.Protocol)) protocol := strings.ToLower(string(port.Protocol))
@ -2200,7 +2199,7 @@ func (s *AWSCloud) EnsureLoadBalancer(name, region string, publicIP net.IP, port
// Figure out what mappings we want on the load balancer // Figure out what mappings we want on the load balancer
listeners := []*elb.Listener{} listeners := []*elb.Listener{}
for _, port := range ports { for _, port := range apiService.Spec.Ports {
if port.NodePort == 0 { if port.NodePort == 0 {
glog.Errorf("Ignoring port without NodePort defined: %v", port) glog.Errorf("Ignoring port without NodePort defined: %v", port)
continue continue
@ -2219,7 +2218,7 @@ func (s *AWSCloud) EnsureLoadBalancer(name, region string, publicIP net.IP, port
} }
// Build the load balancer itself // Build the load balancer itself
loadBalancer, err := s.ensureLoadBalancer(serviceName, name, listeners, subnetIDs, securityGroupIDs, internalELB) loadBalancer, err := s.ensureLoadBalancer(serviceName, loadBalancerName, listeners, subnetIDs, securityGroupIDs, internalELB)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -2241,7 +2240,7 @@ func (s *AWSCloud) EnsureLoadBalancer(name, region string, publicIP net.IP, port
return nil, err return nil, err
} }
glog.V(1).Infof("Loadbalancer %s (%v) has DNS name %s", name, serviceName, orEmpty(loadBalancer.DNSName)) glog.V(1).Infof("Loadbalancer %s (%v) has DNS name %s", loadBalancerName, serviceName, orEmpty(loadBalancer.DNSName))
// TODO: Wait for creation? // TODO: Wait for creation?
@ -2250,12 +2249,9 @@ func (s *AWSCloud) EnsureLoadBalancer(name, region string, publicIP net.IP, port
} }
// GetLoadBalancer is an implementation of LoadBalancer.GetLoadBalancer // GetLoadBalancer is an implementation of LoadBalancer.GetLoadBalancer
func (s *AWSCloud) GetLoadBalancer(name, region string) (*api.LoadBalancerStatus, bool, error) { func (s *AWSCloud) GetLoadBalancer(service *api.Service) (*api.LoadBalancerStatus, bool, error) {
if region != s.region { loadBalancerName := cloudprovider.GetLoadBalancerName(service)
return nil, false, fmt.Errorf("requested load balancer region '%s' does not match cluster region '%s'", region, s.region) lb, err := s.describeLoadBalancer(loadBalancerName)
}
lb, err := s.describeLoadBalancer(name)
if err != nil { if err != nil {
return nil, false, err return nil, false, err
} }
@ -2464,18 +2460,15 @@ func (s *AWSCloud) updateInstanceSecurityGroupsForLoadBalancer(lb *elb.LoadBalan
} }
// EnsureLoadBalancerDeleted implements LoadBalancer.EnsureLoadBalancerDeleted. // EnsureLoadBalancerDeleted implements LoadBalancer.EnsureLoadBalancerDeleted.
func (s *AWSCloud) EnsureLoadBalancerDeleted(name, region string) error { func (s *AWSCloud) EnsureLoadBalancerDeleted(service *api.Service) error {
if region != s.region { loadBalancerName := cloudprovider.GetLoadBalancerName(service)
return fmt.Errorf("requested load balancer region '%s' does not match cluster region '%s'", region, s.region) lb, err := s.describeLoadBalancer(loadBalancerName)
}
lb, err := s.describeLoadBalancer(name)
if err != nil { if err != nil {
return err return err
} }
if lb == nil { if lb == nil {
glog.Info("Load balancer already deleted: ", name) glog.Info("Load balancer already deleted: ", loadBalancerName)
return nil return nil
} }
@ -2510,7 +2503,7 @@ func (s *AWSCloud) EnsureLoadBalancerDeleted(name, region string) error {
securityGroupIDs := map[string]struct{}{} securityGroupIDs := map[string]struct{}{}
for _, securityGroupID := range lb.SecurityGroups { for _, securityGroupID := range lb.SecurityGroups {
if isNilOrEmpty(securityGroupID) { if isNilOrEmpty(securityGroupID) {
glog.Warning("Ignoring empty security group in ", name) glog.Warning("Ignoring empty security group in ", service.Name)
continue continue
} }
securityGroupIDs[*securityGroupID] = struct{}{} securityGroupIDs[*securityGroupID] = struct{}{}
@ -2540,7 +2533,7 @@ func (s *AWSCloud) EnsureLoadBalancerDeleted(name, region string) error {
} }
if len(securityGroupIDs) == 0 { if len(securityGroupIDs) == 0 {
glog.V(2).Info("Deleted all security groups for load balancer: ", name) glog.V(2).Info("Deleted all security groups for load balancer: ", service.Name)
break break
} }
@ -2550,10 +2543,10 @@ func (s *AWSCloud) EnsureLoadBalancerDeleted(name, region string) error {
ids = append(ids, id) ids = append(ids, id)
} }
return fmt.Errorf("timed out deleting ELB: %s. Could not delete security groups %v", name, strings.Join(ids, ",")) return fmt.Errorf("timed out deleting ELB: %s. Could not delete security groups %v", service.Name, strings.Join(ids, ","))
} }
glog.V(2).Info("Waiting for load-balancer to delete so we can delete security groups: ", name) glog.V(2).Info("Waiting for load-balancer to delete so we can delete security groups: ", service.Name)
time.Sleep(10 * time.Second) time.Sleep(10 * time.Second)
} }
@ -2563,17 +2556,14 @@ func (s *AWSCloud) EnsureLoadBalancerDeleted(name, region string) error {
} }
// UpdateLoadBalancer implements LoadBalancer.UpdateLoadBalancer // UpdateLoadBalancer implements LoadBalancer.UpdateLoadBalancer
func (s *AWSCloud) UpdateLoadBalancer(name, region string, hosts []string) error { func (s *AWSCloud) UpdateLoadBalancer(service *api.Service, hosts []string) error {
if region != s.region {
return fmt.Errorf("requested load balancer region '%s' does not match cluster region '%s'", region, s.region)
}
instances, err := s.getInstancesByNodeNames(hosts) instances, err := s.getInstancesByNodeNames(hosts)
if err != nil { if err != nil {
return err return err
} }
lb, err := s.describeLoadBalancer(name) loadBalancerName := cloudprovider.GetLoadBalancerName(service)
lb, err := s.describeLoadBalancer(loadBalancerName)
if err != nil { if err != nil {
return err return err
} }

View File

@ -28,8 +28,8 @@ import (
"k8s.io/kubernetes/pkg/util/sets" "k8s.io/kubernetes/pkg/util/sets"
) )
func (s *AWSCloud) ensureLoadBalancer(namespacedName types.NamespacedName, name string, listeners []*elb.Listener, subnetIDs []string, securityGroupIDs []string, internalELB bool) (*elb.LoadBalancerDescription, error) { func (s *AWSCloud) ensureLoadBalancer(namespacedName types.NamespacedName, loadBalancerName string, listeners []*elb.Listener, subnetIDs []string, securityGroupIDs []string, internalELB bool) (*elb.LoadBalancerDescription, error) {
loadBalancer, err := s.describeLoadBalancer(name) loadBalancer, err := s.describeLoadBalancer(loadBalancerName)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -38,7 +38,7 @@ func (s *AWSCloud) ensureLoadBalancer(namespacedName types.NamespacedName, name
if loadBalancer == nil { if loadBalancer == nil {
createRequest := &elb.CreateLoadBalancerInput{} createRequest := &elb.CreateLoadBalancerInput{}
createRequest.LoadBalancerName = aws.String(name) createRequest.LoadBalancerName = aws.String(loadBalancerName)
createRequest.Listeners = listeners createRequest.Listeners = listeners
@ -57,7 +57,7 @@ func (s *AWSCloud) ensureLoadBalancer(namespacedName types.NamespacedName, name
{Key: aws.String(TagNameKubernetesService), Value: aws.String(namespacedName.String())}, {Key: aws.String(TagNameKubernetesService), Value: aws.String(namespacedName.String())},
} }
glog.Infof("Creating load balancer for %v with name: %s", namespacedName, name) glog.Infof("Creating load balancer for %v with name: ", namespacedName, loadBalancerName)
_, err := s.elb.CreateLoadBalancer(createRequest) _, err := s.elb.CreateLoadBalancer(createRequest)
if err != nil { if err != nil {
return nil, err return nil, err
@ -76,7 +76,7 @@ func (s *AWSCloud) ensureLoadBalancer(namespacedName types.NamespacedName, name
if removals.Len() != 0 { if removals.Len() != 0 {
request := &elb.DetachLoadBalancerFromSubnetsInput{} request := &elb.DetachLoadBalancerFromSubnetsInput{}
request.LoadBalancerName = aws.String(name) request.LoadBalancerName = aws.String(loadBalancerName)
request.Subnets = stringSetToPointers(removals) request.Subnets = stringSetToPointers(removals)
glog.V(2).Info("Detaching load balancer from removed subnets") glog.V(2).Info("Detaching load balancer from removed subnets")
_, err := s.elb.DetachLoadBalancerFromSubnets(request) _, err := s.elb.DetachLoadBalancerFromSubnets(request)
@ -88,7 +88,7 @@ func (s *AWSCloud) ensureLoadBalancer(namespacedName types.NamespacedName, name
if additions.Len() != 0 { if additions.Len() != 0 {
request := &elb.AttachLoadBalancerToSubnetsInput{} request := &elb.AttachLoadBalancerToSubnetsInput{}
request.LoadBalancerName = aws.String(name) request.LoadBalancerName = aws.String(loadBalancerName)
request.Subnets = stringSetToPointers(additions) request.Subnets = stringSetToPointers(additions)
glog.V(2).Info("Attaching load balancer to added subnets") glog.V(2).Info("Attaching load balancer to added subnets")
_, err := s.elb.AttachLoadBalancerToSubnets(request) _, err := s.elb.AttachLoadBalancerToSubnets(request)
@ -107,7 +107,7 @@ func (s *AWSCloud) ensureLoadBalancer(namespacedName types.NamespacedName, name
if !expected.Equal(actual) { if !expected.Equal(actual) {
// This call just replaces the security groups, unlike e.g. subnets (!) // This call just replaces the security groups, unlike e.g. subnets (!)
request := &elb.ApplySecurityGroupsToLoadBalancerInput{} request := &elb.ApplySecurityGroupsToLoadBalancerInput{}
request.LoadBalancerName = aws.String(name) request.LoadBalancerName = aws.String(loadBalancerName)
request.SecurityGroups = stringPointerArray(securityGroupIDs) request.SecurityGroups = stringPointerArray(securityGroupIDs)
glog.V(2).Info("Applying updated security groups to load balancer") glog.V(2).Info("Applying updated security groups to load balancer")
_, err := s.elb.ApplySecurityGroupsToLoadBalancer(request) _, err := s.elb.ApplySecurityGroupsToLoadBalancer(request)
@ -127,7 +127,7 @@ func (s *AWSCloud) ensureLoadBalancer(namespacedName types.NamespacedName, name
for _, listenerDescription := range listenerDescriptions { for _, listenerDescription := range listenerDescriptions {
actual := listenerDescription.Listener actual := listenerDescription.Listener
if actual == nil { if actual == nil {
glog.Warning("Ignoring empty listener in AWS loadbalancer: ", name) glog.Warning("Ignoring empty listener in AWS loadbalancer: ", loadBalancerName)
continue continue
} }
@ -167,7 +167,7 @@ func (s *AWSCloud) ensureLoadBalancer(namespacedName types.NamespacedName, name
if len(removals) != 0 { if len(removals) != 0 {
request := &elb.DeleteLoadBalancerListenersInput{} request := &elb.DeleteLoadBalancerListenersInput{}
request.LoadBalancerName = aws.String(name) request.LoadBalancerName = aws.String(loadBalancerName)
request.LoadBalancerPorts = removals request.LoadBalancerPorts = removals
glog.V(2).Info("Deleting removed load balancer listeners") glog.V(2).Info("Deleting removed load balancer listeners")
_, err := s.elb.DeleteLoadBalancerListeners(request) _, err := s.elb.DeleteLoadBalancerListeners(request)
@ -179,7 +179,7 @@ func (s *AWSCloud) ensureLoadBalancer(namespacedName types.NamespacedName, name
if len(additions) != 0 { if len(additions) != 0 {
request := &elb.CreateLoadBalancerListenersInput{} request := &elb.CreateLoadBalancerListenersInput{}
request.LoadBalancerName = aws.String(name) request.LoadBalancerName = aws.String(loadBalancerName)
request.Listeners = additions request.Listeners = additions
glog.V(2).Info("Creating added load balancer listeners") glog.V(2).Info("Creating added load balancer listeners")
_, err := s.elb.CreateLoadBalancerListeners(request) _, err := s.elb.CreateLoadBalancerListeners(request)
@ -192,7 +192,7 @@ func (s *AWSCloud) ensureLoadBalancer(namespacedName types.NamespacedName, name
} }
if dirty { if dirty {
loadBalancer, err = s.describeLoadBalancer(name) loadBalancer, err = s.describeLoadBalancer(loadBalancerName)
if err != nil { if err != nil {
glog.Warning("Unable to retrieve load balancer after creation/update") glog.Warning("Unable to retrieve load balancer after creation/update")
return nil, err return nil, err

View File

@ -17,7 +17,6 @@ limitations under the License.
package aws package aws
import ( import (
"fmt"
"io" "io"
"reflect" "reflect"
"strings" "strings"
@ -31,7 +30,6 @@ import (
"github.com/golang/glog" "github.com/golang/glog"
"k8s.io/kubernetes/pkg/api" "k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/api/unversioned" "k8s.io/kubernetes/pkg/api/unversioned"
"k8s.io/kubernetes/pkg/types"
"github.com/stretchr/testify/assert" "github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock" "github.com/stretchr/testify/mock"
@ -283,15 +281,14 @@ func instanceMatchesFilter(instance *ec2.Instance, filter *ec2.Filter) bool {
return contains(filter.Values, *instance.State.Name) return contains(filter.Values, *instance.State.Name)
} }
if name == "tag:"+TagNameKubernetesCluster { if strings.HasPrefix(name, "tag:") {
for _, tag := range instance.Tags { tagName := name[4:]
if *tag.Key == TagNameKubernetesCluster { for _, instanceTag := range instance.Tags {
return contains(filter.Values, *tag.Value) if aws.StringValue(instanceTag.Key) == tagName && contains(filter.Values, aws.StringValue(instanceTag.Value)) {
return true
} }
} }
return false
} }
panic("Unknown filter name: " + name) panic("Unknown filter name: " + name)
} }
@ -443,18 +440,20 @@ func (s *FakeEC2) ModifyInstanceAttribute(request *ec2.ModifyInstanceAttributeIn
type FakeELB struct { type FakeELB struct {
aws *FakeAWSServices aws *FakeAWSServices
mock.Mock
} }
func (ec2 *FakeELB) CreateLoadBalancer(*elb.CreateLoadBalancerInput) (*elb.CreateLoadBalancerOutput, error) { func (ec2 *FakeELB) CreateLoadBalancer(*elb.CreateLoadBalancerInput) (*elb.CreateLoadBalancerOutput, error) {
panic("Not implemented") panic("Not implemented")
} }
func (ec2 *FakeELB) DeleteLoadBalancer(*elb.DeleteLoadBalancerInput) (*elb.DeleteLoadBalancerOutput, error) { func (ec2 *FakeELB) DeleteLoadBalancer(input *elb.DeleteLoadBalancerInput) (*elb.DeleteLoadBalancerOutput, error) {
panic("Not implemented") panic("Not implemented")
} }
func (ec2 *FakeELB) DescribeLoadBalancers(*elb.DescribeLoadBalancersInput) (*elb.DescribeLoadBalancersOutput, error) { func (ec2 *FakeELB) DescribeLoadBalancers(input *elb.DescribeLoadBalancersInput) (*elb.DescribeLoadBalancersOutput, error) {
panic("Not implemented") args := ec2.Called(input)
return args.Get(0).(*elb.DescribeLoadBalancersOutput), nil
} }
func (ec2 *FakeELB) RegisterInstancesWithLoadBalancer(*elb.RegisterInstancesWithLoadBalancerInput) (*elb.RegisterInstancesWithLoadBalancerOutput, error) { func (ec2 *FakeELB) RegisterInstancesWithLoadBalancer(*elb.RegisterInstancesWithLoadBalancerInput) (*elb.RegisterInstancesWithLoadBalancerOutput, error) {
panic("Not implemented") panic("Not implemented")
@ -730,40 +729,6 @@ func TestFindVPCID(t *testing.T) {
} }
} }
func TestLoadBalancerMatchesClusterRegion(t *testing.T) {
awsServices := NewFakeAWSServices()
c, err := newAWSCloud(strings.NewReader("[global]"), awsServices)
if err != nil {
t.Errorf("Error building aws cloud: %v", err)
return
}
badELBRegion := "bad-elb-region"
errorMessage := fmt.Sprintf("requested load balancer region '%s' does not match cluster region '%s'", badELBRegion, c.region)
_, _, err = c.GetLoadBalancer("elb-name", badELBRegion)
if err == nil || err.Error() != errorMessage {
t.Errorf("Expected GetLoadBalancer region mismatch error.")
}
serviceName := types.NamespacedName{Namespace: "foo", Name: "bar"}
_, err = c.EnsureLoadBalancer("elb-name", badELBRegion, nil, nil, nil, serviceName, api.ServiceAffinityNone, nil)
if err == nil || err.Error() != errorMessage {
t.Errorf("Expected EnsureLoadBalancer region mismatch error.")
}
err = c.EnsureLoadBalancerDeleted("elb-name", badELBRegion)
if err == nil || err.Error() != errorMessage {
t.Errorf("Expected EnsureLoadBalancerDeleted region mismatch error.")
}
err = c.UpdateLoadBalancer("elb-name", badELBRegion, nil)
if err == nil || err.Error() != errorMessage {
t.Errorf("Expected UpdateLoadBalancer region mismatch error.")
}
}
func constructSubnets(subnetsIn map[int]map[string]string) (subnetsOut []*ec2.Subnet) { func constructSubnets(subnetsIn map[int]map[string]string) (subnetsOut []*ec2.Subnet) {
for i := range subnetsIn { for i := range subnetsIn {
subnetsOut = append( subnetsOut = append(
@ -1076,7 +1041,6 @@ func TestIpPermissionExistsHandlesMultipleGroupIdsWithUserIds(t *testing.T) {
t.Errorf("Should have not been considered equal since first is not in the second array of groups") t.Errorf("Should have not been considered equal since first is not in the second array of groups")
} }
} }
func TestFindInstanceByNodeNameExcludesTerminatedInstances(t *testing.T) { func TestFindInstanceByNodeNameExcludesTerminatedInstances(t *testing.T) {
awsServices := NewFakeAWSServices() awsServices := NewFakeAWSServices()
@ -1197,3 +1161,41 @@ func TestGetVolumeLabels(t *testing.T) {
unversioned.LabelZoneRegion: "us-east-1"}, labels) unversioned.LabelZoneRegion: "us-east-1"}, labels)
awsServices.ec2.AssertExpectations(t) awsServices.ec2.AssertExpectations(t)
} }
func (self *FakeELB) expectDescribeLoadBalancers(loadBalancerName string) {
self.On("DescribeLoadBalancers", &elb.DescribeLoadBalancersInput{LoadBalancerNames: []*string{aws.String(loadBalancerName)}}).Return(&elb.DescribeLoadBalancersOutput{
LoadBalancerDescriptions: []*elb.LoadBalancerDescription{{}},
})
}
func TestDescribeLoadBalancerOnDelete(t *testing.T) {
awsServices := NewFakeAWSServices()
c, _ := newAWSCloud(strings.NewReader("[global]"), awsServices)
awsServices.elb.expectDescribeLoadBalancers("aid")
c.EnsureLoadBalancerDeleted(&api.Service{ObjectMeta: api.ObjectMeta{Name: "myservice", UID: "id"}})
}
func TestDescribeLoadBalancerOnUpdate(t *testing.T) {
awsServices := NewFakeAWSServices()
c, _ := newAWSCloud(strings.NewReader("[global]"), awsServices)
awsServices.elb.expectDescribeLoadBalancers("aid")
c.UpdateLoadBalancer(&api.Service{ObjectMeta: api.ObjectMeta{Name: "myservice", UID: "id"}}, []string{})
}
func TestDescribeLoadBalancerOnGet(t *testing.T) {
awsServices := NewFakeAWSServices()
c, _ := newAWSCloud(strings.NewReader("[global]"), awsServices)
awsServices.elb.expectDescribeLoadBalancers("aid")
c.GetLoadBalancer(&api.Service{ObjectMeta: api.ObjectMeta{Name: "myservice", UID: "id"}})
}
func TestDescribeLoadBalancerOnEnsure(t *testing.T) {
awsServices := NewFakeAWSServices()
c, _ := newAWSCloud(strings.NewReader("[global]"), awsServices)
awsServices.elb.expectDescribeLoadBalancers("aid")
c.EnsureLoadBalancer(&api.Service{ObjectMeta: api.ObjectMeta{Name: "myservice", UID: "id"}}, []string{}, map[string]string{})
}

View File

@ -25,24 +25,22 @@ import (
"k8s.io/kubernetes/pkg/api" "k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/cloudprovider" "k8s.io/kubernetes/pkg/cloudprovider"
"k8s.io/kubernetes/pkg/types"
) )
const ProviderName = "fake" const ProviderName = "fake"
// FakeBalancer is a fake storage of balancer information // FakeBalancer is a fake storage of balancer information
type FakeBalancer struct { type FakeBalancer struct {
Name string Name string
Region string Region string
ExternalIP net.IP LoadBalancerIP string
Ports []*api.ServicePort Ports []api.ServicePort
Hosts []string Hosts []string
} }
type FakeUpdateBalancerCall struct { type FakeUpdateBalancerCall struct {
Name string Service *api.Service
Region string Hosts []string
Hosts []string
} }
// FakeCloud is a test-double implementation of Interface, LoadBalancer, Instances, and Routes. It is useful for testing. // FakeCloud is a test-double implementation of Interface, LoadBalancer, Instances, and Routes. It is useful for testing.
@ -123,7 +121,7 @@ func (f *FakeCloud) Routes() (cloudprovider.Routes, bool) {
} }
// GetLoadBalancer is a stub implementation of LoadBalancer.GetLoadBalancer. // GetLoadBalancer is a stub implementation of LoadBalancer.GetLoadBalancer.
func (f *FakeCloud) GetLoadBalancer(name, region string) (*api.LoadBalancerStatus, bool, error) { func (f *FakeCloud) GetLoadBalancer(service *api.Service) (*api.LoadBalancerStatus, bool, error) {
status := &api.LoadBalancerStatus{} status := &api.LoadBalancerStatus{}
status.Ingress = []api.LoadBalancerIngress{{IP: f.ExternalIP.String()}} status.Ingress = []api.LoadBalancerIngress{{IP: f.ExternalIP.String()}}
@ -132,12 +130,22 @@ func (f *FakeCloud) GetLoadBalancer(name, region string) (*api.LoadBalancerStatu
// EnsureLoadBalancer is a test-spy implementation of LoadBalancer.EnsureLoadBalancer. // EnsureLoadBalancer is a test-spy implementation of LoadBalancer.EnsureLoadBalancer.
// It adds an entry "create" into the internal method call record. // It adds an entry "create" into the internal method call record.
func (f *FakeCloud) EnsureLoadBalancer(name, region string, externalIP net.IP, ports []*api.ServicePort, hosts []string, serviceName types.NamespacedName, affinityType api.ServiceAffinity, annotations map[string]string) (*api.LoadBalancerStatus, error) { func (f *FakeCloud) EnsureLoadBalancer(service *api.Service, hosts []string, annotations map[string]string) (*api.LoadBalancerStatus, error) {
f.addCall("create") f.addCall("create")
if f.Balancers == nil { if f.Balancers == nil {
f.Balancers = make(map[string]FakeBalancer) f.Balancers = make(map[string]FakeBalancer)
} }
f.Balancers[name] = FakeBalancer{name, region, externalIP, ports, hosts}
name := cloudprovider.GetLoadBalancerName(service)
spec := service.Spec
zone, err := f.GetZone()
if err != nil {
return nil, err
}
region := zone.Region
f.Balancers[name] = FakeBalancer{name, region, spec.LoadBalancerIP, spec.Ports, hosts}
status := &api.LoadBalancerStatus{} status := &api.LoadBalancerStatus{}
status.Ingress = []api.LoadBalancerIngress{{IP: f.ExternalIP.String()}} status.Ingress = []api.LoadBalancerIngress{{IP: f.ExternalIP.String()}}
@ -147,15 +155,15 @@ func (f *FakeCloud) EnsureLoadBalancer(name, region string, externalIP net.IP, p
// UpdateLoadBalancer is a test-spy implementation of LoadBalancer.UpdateLoadBalancer. // UpdateLoadBalancer is a test-spy implementation of LoadBalancer.UpdateLoadBalancer.
// It adds an entry "update" into the internal method call record. // It adds an entry "update" into the internal method call record.
func (f *FakeCloud) UpdateLoadBalancer(name, region string, hosts []string) error { func (f *FakeCloud) UpdateLoadBalancer(service *api.Service, hosts []string) error {
f.addCall("update") f.addCall("update")
f.UpdateCalls = append(f.UpdateCalls, FakeUpdateBalancerCall{name, region, hosts}) f.UpdateCalls = append(f.UpdateCalls, FakeUpdateBalancerCall{service, hosts})
return f.Err return f.Err
} }
// EnsureLoadBalancerDeleted is a test-spy implementation of LoadBalancer.EnsureLoadBalancerDeleted. // EnsureLoadBalancerDeleted is a test-spy implementation of LoadBalancer.EnsureLoadBalancerDeleted.
// It adds an entry "delete" into the internal method call record. // It adds an entry "delete" into the internal method call record.
func (f *FakeCloud) EnsureLoadBalancerDeleted(name, region string) error { func (f *FakeCloud) EnsureLoadBalancerDeleted(service *api.Service) error {
f.addCall("delete") f.addCall("delete")
return f.Err return f.Err
} }

View File

@ -20,7 +20,6 @@ import (
"encoding/json" "encoding/json"
"fmt" "fmt"
"io" "io"
"net"
"net/http" "net/http"
"path" "path"
"regexp" "regexp"
@ -439,8 +438,9 @@ func (gce *GCECloud) waitForZoneOp(op *compute.Operation, zone string) error {
} }
// GetLoadBalancer is an implementation of LoadBalancer.GetLoadBalancer // GetLoadBalancer is an implementation of LoadBalancer.GetLoadBalancer
func (gce *GCECloud) GetLoadBalancer(name, region string) (*api.LoadBalancerStatus, bool, error) { func (gce *GCECloud) GetLoadBalancer(service *api.Service) (*api.LoadBalancerStatus, bool, error) {
fwd, err := gce.service.ForwardingRules.Get(gce.projectID, region, name).Do() loadBalancerName := cloudprovider.GetLoadBalancerName(service)
fwd, err := gce.service.ForwardingRules.Get(gce.projectID, gce.region, loadBalancerName).Do()
if err == nil { if err == nil {
status := &api.LoadBalancerStatus{} status := &api.LoadBalancerStatus{}
status.Ingress = []api.LoadBalancerIngress{{IP: fwd.IPAddress}} status.Ingress = []api.LoadBalancerIngress{{IP: fwd.IPAddress}}
@ -465,14 +465,7 @@ func isHTTPErrorCode(err error, code int) bool {
// Due to an interesting series of design decisions, this handles both creating // Due to an interesting series of design decisions, this handles both creating
// new load balancers and updating existing load balancers, recognizing when // new load balancers and updating existing load balancers, recognizing when
// each is needed. // each is needed.
func (gce *GCECloud) EnsureLoadBalancer(name, region string, requestedIP net.IP, ports []*api.ServicePort, hostNames []string, svc types.NamespacedName, affinityType api.ServiceAffinity, annotations map[string]string) (*api.LoadBalancerStatus, error) { func (gce *GCECloud) EnsureLoadBalancer(apiService *api.Service, hostNames []string, annotations map[string]string) (*api.LoadBalancerStatus, error) {
portStr := []string{}
for _, p := range ports {
portStr = append(portStr, fmt.Sprintf("%s/%d", p.Protocol, p.Port))
}
serviceName := svc.String()
glog.V(2).Infof("EnsureLoadBalancer(%v, %v, %v, %v, %v, %v, %v)", name, region, requestedIP, portStr, hostNames, serviceName, annotations)
if len(hostNames) == 0 { if len(hostNames) == 0 {
return nil, fmt.Errorf("Cannot EnsureLoadBalancer() with no hosts") return nil, fmt.Errorf("Cannot EnsureLoadBalancer() with no hosts")
} }
@ -482,8 +475,21 @@ func (gce *GCECloud) EnsureLoadBalancer(name, region string, requestedIP net.IP,
return nil, err return nil, err
} }
loadBalancerName := cloudprovider.GetLoadBalancerName(apiService)
loadBalancerIP := apiService.Spec.LoadBalancerIP
ports := apiService.Spec.Ports
portStr := []string{}
for _, p := range apiService.Spec.Ports {
portStr = append(portStr, fmt.Sprintf("%s/%d", p.Protocol, p.Port))
}
affinityType := apiService.Spec.SessionAffinity
serviceName := types.NamespacedName{Namespace: apiService.Namespace, Name: apiService.Name}
glog.V(2).Infof("EnsureLoadBalancer(%v, %v, %v, %v, %v, %v, %v)", loadBalancerName, gce.region, loadBalancerIP, portStr, hosts, serviceName, annotations)
// Check if the forwarding rule exists, and if so, what its IP is. // Check if the forwarding rule exists, and if so, what its IP is.
fwdRuleExists, fwdRuleNeedsUpdate, fwdRuleIP, err := gce.forwardingRuleNeedsUpdate(name, region, requestedIP, ports) fwdRuleExists, fwdRuleNeedsUpdate, fwdRuleIP, err := gce.forwardingRuleNeedsUpdate(loadBalancerName, gce.region, loadBalancerIP, ports)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -517,45 +523,45 @@ func (gce *GCECloud) EnsureLoadBalancer(name, region string, requestedIP net.IP,
return return
} }
if isSafeToReleaseIP { if isSafeToReleaseIP {
if err := gce.deleteStaticIP(name, region); err != nil { if err := gce.deleteStaticIP(loadBalancerName, gce.region); err != nil {
glog.Errorf("failed to release static IP %s for load balancer (%v(%v), %v): %v", ipAddress, name, serviceName, region, err) glog.Errorf("failed to release static IP %s for load balancer (%v(%v), %v): %v", ipAddress, loadBalancerName, serviceName, gce.region, err)
} }
glog.V(2).Infof("EnsureLoadBalancer(%v(%v)): released static IP %s", name, serviceName, ipAddress) glog.V(2).Infof("EnsureLoadBalancer(%v(%v)): released static IP %s", loadBalancerName, serviceName, ipAddress)
} else { } else {
glog.Warningf("orphaning static IP %s during update of load balancer (%v(%v), %v): %v", ipAddress, name, serviceName, region, err) glog.Warningf("orphaning static IP %s during update of load balancer (%v(%v), %v): %v", ipAddress, loadBalancerName, serviceName, gce.region, err)
} }
}() }()
if requestedIP != nil { if loadBalancerIP != "" {
// If a specific IP address has been requested, we have to respect the // If a specific IP address has been requested, we have to respect the
// user's request and use that IP. If the forwarding rule was already using // user's request and use that IP. If the forwarding rule was already using
// a different IP, it will be harmlessly abandoned because it was only an // a different IP, it will be harmlessly abandoned because it was only an
// ephemeral IP (or it was a different static IP owned by the user, in which // ephemeral IP (or it was a different static IP owned by the user, in which
// case we shouldn't delete it anyway). // case we shouldn't delete it anyway).
if isStatic, err := gce.projectOwnsStaticIP(name, region, requestedIP.String()); err != nil { if isStatic, err := gce.projectOwnsStaticIP(loadBalancerName, gce.region, loadBalancerIP); err != nil {
return nil, fmt.Errorf("failed to test if this GCE project owns the static IP %s: %v", requestedIP.String(), err) return nil, fmt.Errorf("failed to test if this GCE project owns the static IP %s: %v", loadBalancerIP, err)
} else if isStatic { } else if isStatic {
// The requested IP is a static IP, owned and managed by the user. // The requested IP is a static IP, owned and managed by the user.
isUserOwnedIP = true isUserOwnedIP = true
isSafeToReleaseIP = false isSafeToReleaseIP = false
ipAddress = requestedIP.String() ipAddress = loadBalancerIP
glog.V(4).Infof("EnsureLoadBalancer(%v(%v)): using user-provided static IP %s", name, serviceName, ipAddress) glog.V(4).Infof("EnsureLoadBalancer(%v(%v)): using user-provided static IP %s", loadBalancerName, serviceName, ipAddress)
} else if requestedIP.String() == fwdRuleIP { } else if loadBalancerIP == fwdRuleIP {
// The requested IP is not a static IP, but is currently assigned // The requested IP is not a static IP, but is currently assigned
// to this forwarding rule, so we can keep it. // to this forwarding rule, so we can keep it.
isUserOwnedIP = false isUserOwnedIP = false
isSafeToReleaseIP = true isSafeToReleaseIP = true
ipAddress, _, err = gce.ensureStaticIP(name, serviceName, region, fwdRuleIP) ipAddress, _, err = gce.ensureStaticIP(loadBalancerName, serviceName.String(), gce.region, fwdRuleIP)
if err != nil { if err != nil {
return nil, fmt.Errorf("failed to ensure static IP %s: %v", fwdRuleIP, err) return nil, fmt.Errorf("failed to ensure static IP %s: %v", fwdRuleIP, err)
} }
glog.V(4).Infof("EnsureLoadBalancer(%v(%v)): using user-provided non-static IP %s", name, serviceName, ipAddress) glog.V(4).Infof("EnsureLoadBalancer(%v(%v)): using user-provided non-static IP %s", loadBalancerName, serviceName, ipAddress)
} else { } else {
// The requested IP is not static and it is not assigned to the // The requested IP is not static and it is not assigned to the
// current forwarding rule. It might be attached to a different // current forwarding rule. It might be attached to a different
// rule or it might not be part of this project at all. Either // rule or it might not be part of this project at all. Either
// way, we can't use it. // way, we can't use it.
return nil, fmt.Errorf("requested ip %s is neither static nor assigned to LB %s(%v): %v", requestedIP.String(), name, serviceName, err) return nil, fmt.Errorf("requested ip %s is neither static nor assigned to LB %s(%v): %v", loadBalancerIP, loadBalancerName, serviceName, err)
} }
} else { } else {
// The user did not request a specific IP. // The user did not request a specific IP.
@ -566,7 +572,7 @@ func (gce *GCECloud) EnsureLoadBalancer(name, region string, requestedIP net.IP,
// IP from ephemeral to static, or it will just get the IP if it is // IP from ephemeral to static, or it will just get the IP if it is
// already static. // already static.
existed := false existed := false
ipAddress, existed, err = gce.ensureStaticIP(name, serviceName, region, fwdRuleIP) ipAddress, existed, err = gce.ensureStaticIP(loadBalancerName, serviceName.String(), gce.region, fwdRuleIP)
if err != nil { if err != nil {
return nil, fmt.Errorf("failed to ensure static IP %s: %v", fwdRuleIP, err) return nil, fmt.Errorf("failed to ensure static IP %s: %v", fwdRuleIP, err)
} }
@ -576,13 +582,13 @@ func (gce *GCECloud) EnsureLoadBalancer(name, region string, requestedIP net.IP,
// use this IP and try to run through the process again, but we // use this IP and try to run through the process again, but we
// should not release the IP unless it is explicitly flagged as OK. // should not release the IP unless it is explicitly flagged as OK.
isSafeToReleaseIP = false isSafeToReleaseIP = false
glog.V(4).Infof("EnsureLoadBalancer(%v(%v)): adopting static IP %s", name, serviceName, ipAddress) glog.V(4).Infof("EnsureLoadBalancer(%v(%v)): adopting static IP %s", loadBalancerName, serviceName, ipAddress)
} else { } else {
// For total clarity. The IP did not pre-exist and the user did // For total clarity. The IP did not pre-exist and the user did
// not ask for a particular one, so we can release the IP in case // not ask for a particular one, so we can release the IP in case
// of failure or success. // of failure or success.
isSafeToReleaseIP = true isSafeToReleaseIP = true
glog.V(4).Infof("EnsureLoadBalancer(%v(%v)): allocated static IP %s", name, serviceName, ipAddress) glog.V(4).Infof("EnsureLoadBalancer(%v(%v)): allocated static IP %s", loadBalancerName, serviceName, ipAddress)
} }
} }
@ -595,29 +601,29 @@ func (gce *GCECloud) EnsureLoadBalancer(name, region string, requestedIP net.IP,
return nil, err return nil, err
} }
firewallExists, firewallNeedsUpdate, err := gce.firewallNeedsUpdate(name, serviceName, region, ipAddress, ports, sourceRanges) firewallExists, firewallNeedsUpdate, err := gce.firewallNeedsUpdate(loadBalancerName, serviceName.String(), gce.region, ipAddress, ports, sourceRanges)
if err != nil { if err != nil {
return nil, err return nil, err
} }
if firewallNeedsUpdate { if firewallNeedsUpdate {
desc := makeFirewallDescription(serviceName, ipAddress) desc := makeFirewallDescription(serviceName.String(), ipAddress)
// Unlike forwarding rules and target pools, firewalls can be updated // Unlike forwarding rules and target pools, firewalls can be updated
// without needing to be deleted and recreated. // without needing to be deleted and recreated.
if firewallExists { if firewallExists {
if err := gce.updateFirewall(name, region, desc, sourceRanges, ports, hosts); err != nil { if err := gce.updateFirewall(loadBalancerName, gce.region, desc, sourceRanges, ports, hosts); err != nil {
return nil, err return nil, err
} }
glog.V(4).Infof("EnsureLoadBalancer(%v(%v)): updated firewall", name, serviceName) glog.V(4).Infof("EnsureLoadBalancer(%v(%v)): updated firewall", loadBalancerName, serviceName)
} else { } else {
if err := gce.createFirewall(name, region, desc, sourceRanges, ports, hosts); err != nil { if err := gce.createFirewall(loadBalancerName, gce.region, desc, sourceRanges, ports, hosts); err != nil {
return nil, err return nil, err
} }
glog.V(4).Infof("EnsureLoadBalancer(%v(%v)): created firewall", name, serviceName) glog.V(4).Infof("EnsureLoadBalancer(%v(%v)): created firewall", loadBalancerName, serviceName)
} }
} }
tpExists, tpNeedsUpdate, err := gce.targetPoolNeedsUpdate(name, region, affinityType) tpExists, tpNeedsUpdate, err := gce.targetPoolNeedsUpdate(loadBalancerName, gce.region, affinityType)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -634,36 +640,36 @@ func (gce *GCECloud) EnsureLoadBalancer(name, region string, requestedIP net.IP,
// and something should fail before we recreate it, don't release the // and something should fail before we recreate it, don't release the
// IP. That way we can come back to it later. // IP. That way we can come back to it later.
isSafeToReleaseIP = false isSafeToReleaseIP = false
if err := gce.deleteForwardingRule(name, region); err != nil { if err := gce.deleteForwardingRule(loadBalancerName, gce.region); err != nil {
return nil, fmt.Errorf("failed to delete existing forwarding rule %s for load balancer update: %v", name, err) return nil, fmt.Errorf("failed to delete existing forwarding rule %s for load balancer update: %v", loadBalancerName, err)
} }
glog.V(4).Infof("EnsureLoadBalancer(%v(%v)): deleted forwarding rule", name, serviceName) glog.V(4).Infof("EnsureLoadBalancer(%v(%v)): deleted forwarding rule", loadBalancerName, serviceName)
} }
if tpExists && tpNeedsUpdate { if tpExists && tpNeedsUpdate {
if err := gce.deleteTargetPool(name, region); err != nil { if err := gce.deleteTargetPool(loadBalancerName, gce.region); err != nil {
return nil, fmt.Errorf("failed to delete existing target pool %s for load balancer update: %v", name, err) return nil, fmt.Errorf("failed to delete existing target pool %s for load balancer update: %v", loadBalancerName, err)
} }
glog.V(4).Infof("EnsureLoadBalancer(%v(%v)): deleted target pool", name, serviceName) glog.V(4).Infof("EnsureLoadBalancer(%v(%v)): deleted target pool", loadBalancerName, serviceName)
} }
// Once we've deleted the resources (if necessary), build them back up (or for // Once we've deleted the resources (if necessary), build them back up (or for
// the first time if they're new). // the first time if they're new).
if tpNeedsUpdate { if tpNeedsUpdate {
if err := gce.createTargetPool(name, serviceName, region, hosts, affinityType); err != nil { if err := gce.createTargetPool(loadBalancerName, serviceName.String(), gce.region, hosts, affinityType); err != nil {
return nil, fmt.Errorf("failed to create target pool %s: %v", name, err) return nil, fmt.Errorf("failed to create target pool %s: %v", loadBalancerName, err)
} }
glog.V(4).Infof("EnsureLoadBalancer(%v(%v)): created target pool", name, serviceName) glog.V(4).Infof("EnsureLoadBalancer(%v(%v)): created target pool", loadBalancerName, serviceName)
} }
if tpNeedsUpdate || fwdRuleNeedsUpdate { if tpNeedsUpdate || fwdRuleNeedsUpdate {
if err := gce.createForwardingRule(name, serviceName, region, ipAddress, ports); err != nil { if err := gce.createForwardingRule(loadBalancerName, serviceName.String(), gce.region, ipAddress, ports); err != nil {
return nil, fmt.Errorf("failed to create forwarding rule %s: %v", name, err) return nil, fmt.Errorf("failed to create forwarding rule %s: %v", loadBalancerName, err)
} }
// End critical section. It is safe to release the static IP (which // End critical section. It is safe to release the static IP (which
// just demotes it to ephemeral) now that it is attached. In the case // just demotes it to ephemeral) now that it is attached. In the case
// of a user-requested IP, the "is user-owned" flag will be set, // of a user-requested IP, the "is user-owned" flag will be set,
// preventing it from actually being released. // preventing it from actually being released.
isSafeToReleaseIP = true isSafeToReleaseIP = true
glog.V(4).Infof("EnsureLoadBalancer(%v(%v)): created forwarding rule, IP %s", name, serviceName, ipAddress) glog.V(4).Infof("EnsureLoadBalancer(%v(%v)): created forwarding rule, IP %s", loadBalancerName, serviceName, ipAddress)
} }
status := &api.LoadBalancerStatus{} status := &api.LoadBalancerStatus{}
@ -675,7 +681,7 @@ func (gce *GCECloud) EnsureLoadBalancer(name, region string, requestedIP net.IP,
// IP is being requested. // IP is being requested.
// Returns whether the forwarding rule exists, whether it needs to be updated, // Returns whether the forwarding rule exists, whether it needs to be updated,
// what its IP address is (if it exists), and any error we encountered. // what its IP address is (if it exists), and any error we encountered.
func (gce *GCECloud) forwardingRuleNeedsUpdate(name, region string, requestedIP net.IP, ports []*api.ServicePort) (exists bool, needsUpdate bool, ipAddress string, err error) { func (gce *GCECloud) forwardingRuleNeedsUpdate(name, region string, loadBalancerIP string, ports []api.ServicePort) (exists bool, needsUpdate bool, ipAddress string, err error) {
fwd, err := gce.service.ForwardingRules.Get(gce.projectID, region, name).Do() fwd, err := gce.service.ForwardingRules.Get(gce.projectID, region, name).Do()
if err != nil { if err != nil {
if isHTTPErrorCode(err, http.StatusNotFound) { if isHTTPErrorCode(err, http.StatusNotFound) {
@ -683,7 +689,7 @@ func (gce *GCECloud) forwardingRuleNeedsUpdate(name, region string, requestedIP
} }
return false, false, "", fmt.Errorf("error getting load balancer's forwarding rule: %v", err) return false, false, "", fmt.Errorf("error getting load balancer's forwarding rule: %v", err)
} }
if requestedIP != nil && requestedIP.String() != fwd.IPAddress { if loadBalancerIP != fwd.IPAddress {
return true, true, fwd.IPAddress, nil return true, true, fwd.IPAddress, nil
} }
portRange, err := loadBalancerPortRange(ports) portRange, err := loadBalancerPortRange(ports)
@ -701,7 +707,7 @@ func (gce *GCECloud) forwardingRuleNeedsUpdate(name, region string, requestedIP
return true, false, fwd.IPAddress, nil return true, false, fwd.IPAddress, nil
} }
func loadBalancerPortRange(ports []*api.ServicePort) (string, error) { func loadBalancerPortRange(ports []api.ServicePort) (string, error) {
if len(ports) == 0 { if len(ports) == 0 {
return "", fmt.Errorf("no ports specified for GCE load balancer") return "", fmt.Errorf("no ports specified for GCE load balancer")
} }
@ -753,7 +759,7 @@ func translateAffinityType(affinityType api.ServiceAffinity) string {
} }
} }
func (gce *GCECloud) firewallNeedsUpdate(name, serviceName, region, ipAddress string, ports []*api.ServicePort, sourceRanges netsets.IPNet) (exists bool, needsUpdate bool, err error) { func (gce *GCECloud) firewallNeedsUpdate(name, serviceName, region, ipAddress string, ports []api.ServicePort, sourceRanges netsets.IPNet) (exists bool, needsUpdate bool, err error) {
fw, err := gce.service.Firewalls.Get(gce.projectID, makeFirewallName(name)).Do() fw, err := gce.service.Firewalls.Get(gce.projectID, makeFirewallName(name)).Do()
if err != nil { if err != nil {
if isHTTPErrorCode(err, http.StatusNotFound) { if isHTTPErrorCode(err, http.StatusNotFound) {
@ -814,7 +820,7 @@ func slicesEqual(x, y []string) bool {
return true return true
} }
func (gce *GCECloud) createForwardingRule(name, serviceName, region, ipAddress string, ports []*api.ServicePort) error { func (gce *GCECloud) createForwardingRule(name, serviceName, region, ipAddress string, ports []api.ServicePort) error {
portRange, err := loadBalancerPortRange(ports) portRange, err := loadBalancerPortRange(ports)
if err != nil { if err != nil {
return err return err
@ -865,7 +871,7 @@ func (gce *GCECloud) createTargetPool(name, serviceName, region string, hosts []
return nil return nil
} }
func (gce *GCECloud) createFirewall(name, region, desc string, sourceRanges netsets.IPNet, ports []*api.ServicePort, hosts []*gceInstance) error { func (gce *GCECloud) createFirewall(name, region, desc string, sourceRanges netsets.IPNet, ports []api.ServicePort, hosts []*gceInstance) error {
firewall, err := gce.firewallObject(name, region, desc, sourceRanges, ports, hosts) firewall, err := gce.firewallObject(name, region, desc, sourceRanges, ports, hosts)
if err != nil { if err != nil {
return err return err
@ -883,7 +889,7 @@ func (gce *GCECloud) createFirewall(name, region, desc string, sourceRanges nets
return nil return nil
} }
func (gce *GCECloud) updateFirewall(name, region, desc string, sourceRanges netsets.IPNet, ports []*api.ServicePort, hosts []*gceInstance) error { func (gce *GCECloud) updateFirewall(name, region, desc string, sourceRanges netsets.IPNet, ports []api.ServicePort, hosts []*gceInstance) error {
firewall, err := gce.firewallObject(name, region, desc, sourceRanges, ports, hosts) firewall, err := gce.firewallObject(name, region, desc, sourceRanges, ports, hosts)
if err != nil { if err != nil {
return err return err
@ -901,7 +907,7 @@ func (gce *GCECloud) updateFirewall(name, region, desc string, sourceRanges nets
return nil return nil
} }
func (gce *GCECloud) firewallObject(name, region, desc string, sourceRanges netsets.IPNet, ports []*api.ServicePort, hosts []*gceInstance) (*compute.Firewall, error) { func (gce *GCECloud) firewallObject(name, region, desc string, sourceRanges netsets.IPNet, ports []api.ServicePort, hosts []*gceInstance) (*compute.Firewall, error) {
allowedPorts := make([]string, len(ports)) allowedPorts := make([]string, len(ports))
for ix := range ports { for ix := range ports {
allowedPorts[ix] = strconv.Itoa(ports[ix].Port) allowedPorts[ix] = strconv.Itoa(ports[ix].Port)
@ -1053,13 +1059,14 @@ func (gce *GCECloud) ensureStaticIP(name, serviceName, region, existingIP string
} }
// UpdateLoadBalancer is an implementation of LoadBalancer.UpdateLoadBalancer. // UpdateLoadBalancer is an implementation of LoadBalancer.UpdateLoadBalancer.
func (gce *GCECloud) UpdateLoadBalancer(name, region string, hostNames []string) error { func (gce *GCECloud) UpdateLoadBalancer(service *api.Service, hostNames []string) error {
hosts, err := gce.getInstancesByNames(hostNames) hosts, err := gce.getInstancesByNames(hostNames)
if err != nil { if err != nil {
return err return err
} }
pool, err := gce.service.TargetPools.Get(gce.projectID, region, name).Do() loadBalancerName := cloudprovider.GetLoadBalancerName(service)
pool, err := gce.service.TargetPools.Get(gce.projectID, gce.region, loadBalancerName).Do()
if err != nil { if err != nil {
return err return err
} }
@ -1083,22 +1090,22 @@ func (gce *GCECloud) UpdateLoadBalancer(name, region string, hostNames []string)
if len(toAdd) > 0 { if len(toAdd) > 0 {
add := &compute.TargetPoolsAddInstanceRequest{Instances: toAdd} add := &compute.TargetPoolsAddInstanceRequest{Instances: toAdd}
op, err := gce.service.TargetPools.AddInstance(gce.projectID, region, name, add).Do() op, err := gce.service.TargetPools.AddInstance(gce.projectID, gce.region, loadBalancerName, add).Do()
if err != nil { if err != nil {
return err return err
} }
if err := gce.waitForRegionOp(op, region); err != nil { if err := gce.waitForRegionOp(op, gce.region); err != nil {
return err return err
} }
} }
if len(toRemove) > 0 { if len(toRemove) > 0 {
rm := &compute.TargetPoolsRemoveInstanceRequest{Instances: toRemove} rm := &compute.TargetPoolsRemoveInstanceRequest{Instances: toRemove}
op, err := gce.service.TargetPools.RemoveInstance(gce.projectID, region, name, rm).Do() op, err := gce.service.TargetPools.RemoveInstance(gce.projectID, gce.region, loadBalancerName, rm).Do()
if err != nil { if err != nil {
return err return err
} }
if err := gce.waitForRegionOp(op, region); err != nil { if err := gce.waitForRegionOp(op, gce.region); err != nil {
return err return err
} }
} }
@ -1106,41 +1113,44 @@ func (gce *GCECloud) UpdateLoadBalancer(name, region string, hostNames []string)
// Try to verify that the correct number of nodes are now in the target pool. // Try to verify that the correct number of nodes are now in the target pool.
// We've been bitten by a bug here before (#11327) where all nodes were // We've been bitten by a bug here before (#11327) where all nodes were
// accidentally removed and want to make similar problems easier to notice. // accidentally removed and want to make similar problems easier to notice.
updatedPool, err := gce.service.TargetPools.Get(gce.projectID, region, name).Do() updatedPool, err := gce.service.TargetPools.Get(gce.projectID, gce.region, loadBalancerName).Do()
if err != nil { if err != nil {
return err return err
} }
if len(updatedPool.Instances) != len(hosts) { if len(updatedPool.Instances) != len(hosts) {
glog.Errorf("Unexpected number of instances (%d) in target pool %s after updating (expected %d). Instances in updated pool: %s", glog.Errorf("Unexpected number of instances (%d) in target pool %s after updating (expected %d). Instances in updated pool: %s",
len(updatedPool.Instances), name, len(hosts), strings.Join(updatedPool.Instances, ",")) len(updatedPool.Instances), loadBalancerName, len(hosts), strings.Join(updatedPool.Instances, ","))
return fmt.Errorf("Unexpected number of instances (%d) in target pool %s after update (expected %d)", len(updatedPool.Instances), name, len(hosts)) return fmt.Errorf("Unexpected number of instances (%d) in target pool %s after update (expected %d)", len(updatedPool.Instances), loadBalancerName, len(hosts))
} }
return nil return nil
} }
// EnsureLoadBalancerDeleted is an implementation of LoadBalancer.EnsureLoadBalancerDeleted. // EnsureLoadBalancerDeleted is an implementation of LoadBalancer.EnsureLoadBalancerDeleted.
func (gce *GCECloud) EnsureLoadBalancerDeleted(name, region string) error { func (gce *GCECloud) EnsureLoadBalancerDeleted(service *api.Service) error {
glog.V(2).Infof("EnsureLoadBalancerDeleted(%v, %v", name, region) loadBalancerName := cloudprovider.GetLoadBalancerName(service)
err := utilerrors.AggregateGoroutines( glog.V(2).Infof("EnsureLoadBalancerDeleted(%v, %v, %v, %v)", service.Namespace, service.Name, loadBalancerName,
func() error { return gce.deleteFirewall(name, region) }, gce.region)
errs := utilerrors.AggregateGoroutines(
func() error { return gce.deleteFirewall(loadBalancerName, gce.region) },
// Even though we don't hold on to static IPs for load balancers, it's // Even though we don't hold on to static IPs for load balancers, it's
// possible that EnsureLoadBalancer left one around in a failed // possible that EnsureLoadBalancer left one around in a failed
// creation/update attempt, so make sure we clean it up here just in case. // creation/update attempt, so make sure we clean it up here just in case.
func() error { return gce.deleteStaticIP(name, region) }, func() error { return gce.deleteStaticIP(loadBalancerName, gce.region) },
func() error { func() error {
// The forwarding rule must be deleted before either the target pool can, // The forwarding rule must be deleted before either the target pool can,
// unfortunately, so we have to do these two serially. // unfortunately, so we have to do these two serially.
if err := gce.deleteForwardingRule(name, region); err != nil { if err := gce.deleteForwardingRule(loadBalancerName, gce.region); err != nil {
return err return err
} }
if err := gce.deleteTargetPool(name, region); err != nil { if err := gce.deleteTargetPool(loadBalancerName, gce.region); err != nil {
return err return err
} }
return nil return nil
}, },
) )
if err != nil { if errs != nil {
return utilerrors.Flatten(err) return utilerrors.Flatten(errs)
} }
return nil return nil
} }
@ -1226,9 +1236,9 @@ func (gce *GCECloud) CreateFirewall(name, desc string, sourceRanges netsets.IPNe
} }
// TODO: This completely breaks modularity in the cloudprovider but the methods // TODO: This completely breaks modularity in the cloudprovider but the methods
// shared with the TCPLoadBalancer take api.ServicePorts. // shared with the TCPLoadBalancer take api.ServicePorts.
svcPorts := []*api.ServicePort{} svcPorts := []api.ServicePort{}
for _, p := range ports { for _, p := range ports {
svcPorts = append(svcPorts, &api.ServicePort{Port: int(p)}) svcPorts = append(svcPorts, api.ServicePort{Port: int(p)})
} }
hosts, err := gce.getInstancesByNames(hostNames) hosts, err := gce.getInstancesByNames(hostNames)
if err != nil { if err != nil {
@ -1255,9 +1265,9 @@ func (gce *GCECloud) UpdateFirewall(name, desc string, sourceRanges netsets.IPNe
} }
// TODO: This completely breaks modularity in the cloudprovider but the methods // TODO: This completely breaks modularity in the cloudprovider but the methods
// shared with the TCPLoadBalancer take api.ServicePorts. // shared with the TCPLoadBalancer take api.ServicePorts.
svcPorts := []*api.ServicePort{} svcPorts := []api.ServicePort{}
for _, p := range ports { for _, p := range ports {
svcPorts = append(svcPorts, &api.ServicePort{Port: int(p)}) svcPorts = append(svcPorts, api.ServicePort{Port: int(p)})
} }
hosts, err := gce.getInstancesByNames(hostNames) hosts, err := gce.getInstancesByNames(hostNames)
if err != nil { if err != nil {

View File

@ -22,7 +22,6 @@ import (
"fmt" "fmt"
"io" "io"
"io/ioutil" "io/ioutil"
"net"
"net/http" "net/http"
"regexp" "regexp"
"strings" "strings"
@ -47,7 +46,6 @@ import (
"k8s.io/kubernetes/pkg/api/resource" "k8s.io/kubernetes/pkg/api/resource"
"k8s.io/kubernetes/pkg/api/service" "k8s.io/kubernetes/pkg/api/service"
"k8s.io/kubernetes/pkg/cloudprovider" "k8s.io/kubernetes/pkg/cloudprovider"
"k8s.io/kubernetes/pkg/types"
) )
const ProviderName = "openstack" const ProviderName = "openstack"
@ -641,8 +639,9 @@ func getFloatingIPByPortID(client *gophercloud.ServiceClient, portID string) (*f
return &floatingIPList[0], nil return &floatingIPList[0], nil
} }
func (lb *LoadBalancer) GetLoadBalancer(name, region string) (*api.LoadBalancerStatus, bool, error) { func (lb *LoadBalancer) GetLoadBalancer(service *api.Service) (*api.LoadBalancerStatus, bool, error) {
vip, err := getVipByName(lb.network, name) loadBalancerName := cloudprovider.GetLoadBalancerName(service)
vip, err := getVipByName(lb.network, loadBalancerName)
if err == ErrNotFound { if err == ErrNotFound {
return nil, false, nil return nil, false, nil
} }
@ -661,9 +660,10 @@ func (lb *LoadBalancer) GetLoadBalancer(name, region string) (*api.LoadBalancerS
// a list of regions (from config) and query/create loadbalancers in // a list of regions (from config) and query/create loadbalancers in
// each region. // each region.
func (lb *LoadBalancer) EnsureLoadBalancer(name, region string, loadBalancerIP net.IP, ports []*api.ServicePort, hosts []string, serviceName types.NamespacedName, affinity api.ServiceAffinity, annotations map[string]string) (*api.LoadBalancerStatus, error) { func (lb *LoadBalancer) EnsureLoadBalancer(apiService *api.Service, hosts []string, annotations map[string]string) (*api.LoadBalancerStatus, error) {
glog.V(4).Infof("EnsureLoadBalancer(%v, %v, %v, %v, %v, %v, %v)", name, region, loadBalancerIP, ports, hosts, serviceName, annotations) glog.V(4).Infof("EnsureLoadBalancer(%v, %v, %v, %v, %v, %v)", apiService.Namespace, apiService.Name, apiService.Spec.LoadBalancerIP, apiService.Spec.Ports, hosts, annotations)
ports := apiService.Spec.Ports
if len(ports) > 1 { if len(ports) > 1 {
return nil, fmt.Errorf("multiple ports are not yet supported in openstack load balancers") return nil, fmt.Errorf("multiple ports are not yet supported in openstack load balancers")
} else if len(ports) == 0 { } else if len(ports) == 0 {
@ -676,6 +676,7 @@ func (lb *LoadBalancer) EnsureLoadBalancer(name, region string, loadBalancerIP n
return nil, fmt.Errorf("Only TCP LoadBalancer is supported for openstack load balancers") return nil, fmt.Errorf("Only TCP LoadBalancer is supported for openstack load balancers")
} }
affinity := apiService.Spec.SessionAffinity
var persistence *vips.SessionPersistence var persistence *vips.SessionPersistence
switch affinity { switch affinity {
case api.ServiceAffinityNone: case api.ServiceAffinityNone:
@ -695,8 +696,8 @@ func (lb *LoadBalancer) EnsureLoadBalancer(name, region string, loadBalancerIP n
return nil, fmt.Errorf("Source range restrictions are not supported for openstack load balancers") return nil, fmt.Errorf("Source range restrictions are not supported for openstack load balancers")
} }
glog.V(2).Infof("Checking if openstack load balancer already exists: %s", name) glog.V(2).Infof("Checking if openstack load balancer already exists: %s", cloudprovider.GetLoadBalancerName(apiService))
_, exists, err := lb.GetLoadBalancer(name, region) _, exists, err := lb.GetLoadBalancer(apiService)
if err != nil { if err != nil {
return nil, fmt.Errorf("error checking if openstack load balancer already exists: %v", err) return nil, fmt.Errorf("error checking if openstack load balancer already exists: %v", err)
} }
@ -704,7 +705,7 @@ func (lb *LoadBalancer) EnsureLoadBalancer(name, region string, loadBalancerIP n
// TODO: Implement a more efficient update strategy for common changes than delete & create // TODO: Implement a more efficient update strategy for common changes than delete & create
// In particular, if we implement hosts update, we can get rid of UpdateHosts // In particular, if we implement hosts update, we can get rid of UpdateHosts
if exists { if exists {
err := lb.EnsureLoadBalancerDeleted(name, region) err := lb.EnsureLoadBalancerDeleted(apiService)
if err != nil { if err != nil {
return nil, fmt.Errorf("error deleting existing openstack load balancer: %v", err) return nil, fmt.Errorf("error deleting existing openstack load balancer: %v", err)
} }
@ -714,6 +715,7 @@ func (lb *LoadBalancer) EnsureLoadBalancer(name, region string, loadBalancerIP n
if lbmethod == "" { if lbmethod == "" {
lbmethod = pools.LBMethodRoundRobin lbmethod = pools.LBMethodRoundRobin
} }
name := cloudprovider.GetLoadBalancerName(apiService)
pool, err := pools.Create(lb.network, pools.CreateOpts{ pool, err := pools.Create(lb.network, pools.CreateOpts{
Name: name, Name: name,
Protocol: pools.ProtocolTCP, Protocol: pools.ProtocolTCP,
@ -771,8 +773,10 @@ func (lb *LoadBalancer) EnsureLoadBalancer(name, region string, loadBalancerIP n
SubnetID: lb.opts.SubnetId, SubnetID: lb.opts.SubnetId,
Persistence: persistence, Persistence: persistence,
} }
if loadBalancerIP != nil {
createOpts.Address = loadBalancerIP.String() loadBalancerIP := apiService.Spec.LoadBalancerIP
if loadBalancerIP != "" {
createOpts.Address = loadBalancerIP
} }
vip, err := vips.Create(lb.network, createOpts).Extract() vip, err := vips.Create(lb.network, createOpts).Extract()
@ -805,10 +809,11 @@ func (lb *LoadBalancer) EnsureLoadBalancer(name, region string, loadBalancerIP n
} }
func (lb *LoadBalancer) UpdateLoadBalancer(name, region string, hosts []string) error { func (lb *LoadBalancer) UpdateLoadBalancer(service *api.Service, hosts []string) error {
glog.V(4).Infof("UpdateLoadBalancer(%v, %v, %v)", name, region, hosts) loadBalancerName := cloudprovider.GetLoadBalancerName(service)
glog.V(4).Infof("UpdateLoadBalancer(%v, %v)", loadBalancerName, hosts)
vip, err := getVipByName(lb.network, name) vip, err := getVipByName(lb.network, loadBalancerName)
if err != nil { if err != nil {
return err return err
} }
@ -866,10 +871,11 @@ func (lb *LoadBalancer) UpdateLoadBalancer(name, region string, hosts []string)
return nil return nil
} }
func (lb *LoadBalancer) EnsureLoadBalancerDeleted(name, region string) error { func (lb *LoadBalancer) EnsureLoadBalancerDeleted(service *api.Service) error {
glog.V(4).Infof("EnsureLoadBalancerDeleted(%v, %v)", name, region) loadBalancerName := cloudprovider.GetLoadBalancerName(service)
glog.V(4).Infof("EnsureLoadBalancerDeleted(%v)", loadBalancerName)
vip, err := getVipByName(lb.network, name) vip, err := getVipByName(lb.network, loadBalancerName)
if err != nil && err != ErrNotFound { if err != nil && err != ErrNotFound {
return err return err
} }
@ -907,7 +913,7 @@ func (lb *LoadBalancer) EnsureLoadBalancerDeleted(name, region string) error {
// still exists that we failed to delete on some // still exists that we failed to delete on some
// previous occasion. Make a best effort attempt to // previous occasion. Make a best effort attempt to
// cleanup any pools with the same name as the VIP. // cleanup any pools with the same name as the VIP.
pool, err = getPoolByName(lb.network, name) pool, err = getPoolByName(lb.network, service.Name)
if err != nil && err != ErrNotFound { if err != nil && err != ErrNotFound {
return err return err
} }

View File

@ -25,6 +25,7 @@ import (
"k8s.io/kubernetes/pkg/util/rand" "k8s.io/kubernetes/pkg/util/rand"
"github.com/rackspace/gophercloud" "github.com/rackspace/gophercloud"
"k8s.io/kubernetes/pkg/api"
) )
func TestReadConfig(t *testing.T) { func TestReadConfig(t *testing.T) {
@ -169,7 +170,7 @@ func TestLoadBalancer(t *testing.T) {
t.Fatalf("LoadBalancer() returned false - perhaps your stack doesn't support Neutron?") t.Fatalf("LoadBalancer() returned false - perhaps your stack doesn't support Neutron?")
} }
_, exists, err := lb.GetLoadBalancer("noexist", "region") _, exists, err := lb.GetLoadBalancer(&api.Service{ObjectMeta: api.ObjectMeta{Name: "noexist"}})
if err != nil { if err != nil {
t.Fatalf("GetLoadBalancer(\"noexist\") returned error: %s", err) t.Fatalf("GetLoadBalancer(\"noexist\") returned error: %s", err)
} }

View File

@ -18,7 +18,6 @@ package service
import ( import (
"fmt" "fmt"
"net"
"sort" "sort"
"sync" "sync"
"time" "time"
@ -31,7 +30,7 @@ import (
"k8s.io/kubernetes/pkg/client/cache" "k8s.io/kubernetes/pkg/client/cache"
clientset "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset" clientset "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset"
"k8s.io/kubernetes/pkg/client/record" "k8s.io/kubernetes/pkg/client/record"
unversionedcore "k8s.io/kubernetes/pkg/client/typed/generated/core/unversioned" unversioned_core "k8s.io/kubernetes/pkg/client/typed/generated/core/unversioned"
"k8s.io/kubernetes/pkg/cloudprovider" "k8s.io/kubernetes/pkg/cloudprovider"
"k8s.io/kubernetes/pkg/fields" "k8s.io/kubernetes/pkg/fields"
"k8s.io/kubernetes/pkg/types" "k8s.io/kubernetes/pkg/types"
@ -90,7 +89,7 @@ type ServiceController struct {
// (like load balancers) in sync with the registry. // (like load balancers) in sync with the registry.
func New(cloud cloudprovider.Interface, kubeClient clientset.Interface, clusterName string) *ServiceController { func New(cloud cloudprovider.Interface, kubeClient clientset.Interface, clusterName string) *ServiceController {
broadcaster := record.NewBroadcaster() broadcaster := record.NewBroadcaster()
broadcaster.StartRecordingToSink(&unversionedcore.EventSinkImpl{kubeClient.Core().Events("")}) broadcaster.StartRecordingToSink(&unversioned_core.EventSinkImpl{kubeClient.Core().Events("")})
recorder := broadcaster.NewRecorder(api.EventSource{Component: "service-controller"}) recorder := broadcaster.NewRecorder(api.EventSource{Component: "service-controller"})
return &ServiceController{ return &ServiceController{
@ -251,7 +250,7 @@ func (s *ServiceController) processDelta(delta *cache.Delta) (error, time.Durati
} else if errors.IsNotFound(err) { } else if errors.IsNotFound(err) {
glog.V(2).Infof("Service %v not found, ensuring load balancer is deleted", namespacedName) glog.V(2).Infof("Service %v not found, ensuring load balancer is deleted", namespacedName)
s.eventRecorder.Event(service, api.EventTypeNormal, "DeletingLoadBalancer", "Deleting load balancer") s.eventRecorder.Event(service, api.EventTypeNormal, "DeletingLoadBalancer", "Deleting load balancer")
err := s.balancer.EnsureLoadBalancerDeleted(s.loadBalancerName(deltaService), s.zone.Region) err := s.balancer.EnsureLoadBalancerDeleted(deltaService)
if err != nil { if err != nil {
message := "Error deleting load balancer (will retry): " + err.Error() message := "Error deleting load balancer (will retry): " + err.Error()
s.eventRecorder.Event(deltaService, api.EventTypeWarning, "DeletingLoadBalancerFailed", message) s.eventRecorder.Event(deltaService, api.EventTypeWarning, "DeletingLoadBalancerFailed", message)
@ -315,7 +314,7 @@ func (s *ServiceController) createLoadBalancerIfNeeded(namespacedName types.Name
// If we don't have any cached memory of the load balancer, we have to ask // If we don't have any cached memory of the load balancer, we have to ask
// the cloud provider for what it knows about it. // the cloud provider for what it knows about it.
// Technically EnsureLoadBalancerDeleted can cope, but we want to post meaningful events // Technically EnsureLoadBalancerDeleted can cope, but we want to post meaningful events
_, exists, err := s.balancer.GetLoadBalancer(s.loadBalancerName(service), s.zone.Region) _, exists, err := s.balancer.GetLoadBalancer(service)
if err != nil { if err != nil {
return fmt.Errorf("Error getting LB for service %s: %v", namespacedName, err), retryable return fmt.Errorf("Error getting LB for service %s: %v", namespacedName, err), retryable
} }
@ -327,7 +326,7 @@ func (s *ServiceController) createLoadBalancerIfNeeded(namespacedName types.Name
if needDelete { if needDelete {
glog.Infof("Deleting existing load balancer for service %s that no longer needs a load balancer.", namespacedName) glog.Infof("Deleting existing load balancer for service %s that no longer needs a load balancer.", namespacedName)
s.eventRecorder.Event(service, api.EventTypeNormal, "DeletingLoadBalancer", "Deleting load balancer") s.eventRecorder.Event(service, api.EventTypeNormal, "DeletingLoadBalancer", "Deleting load balancer")
if err := s.balancer.EnsureLoadBalancerDeleted(s.loadBalancerName(service), s.zone.Region); err != nil { if err := s.balancer.EnsureLoadBalancerDeleted(service); err != nil {
return err, retryable return err, retryable
} }
s.eventRecorder.Event(service, api.EventTypeNormal, "DeletedLoadBalancer", "Deleted load balancer") s.eventRecorder.Event(service, api.EventTypeNormal, "DeletedLoadBalancer", "Deleted load balancer")
@ -341,8 +340,7 @@ func (s *ServiceController) createLoadBalancerIfNeeded(namespacedName types.Name
// The load balancer doesn't exist yet, so create it. // The load balancer doesn't exist yet, so create it.
s.eventRecorder.Event(service, api.EventTypeNormal, "CreatingLoadBalancer", "Creating load balancer") s.eventRecorder.Event(service, api.EventTypeNormal, "CreatingLoadBalancer", "Creating load balancer")
err := s.createLoadBalancer(service)
err := s.createLoadBalancer(service, namespacedName)
if err != nil { if err != nil {
return fmt.Errorf("Failed to create load balancer for service %s: %v", namespacedName, err), retryable return fmt.Errorf("Failed to create load balancer for service %s: %v", namespacedName, err), retryable
} }
@ -392,21 +390,16 @@ func (s *ServiceController) persistUpdate(service *api.Service) error {
return err return err
} }
func (s *ServiceController) createLoadBalancer(service *api.Service, serviceName types.NamespacedName) error { func (s *ServiceController) createLoadBalancer(service *api.Service) error {
ports, err := getPortsForLB(service)
if err != nil {
return err
}
nodes, err := s.nodeLister.List() nodes, err := s.nodeLister.List()
if err != nil { if err != nil {
return err return err
} }
name := s.loadBalancerName(service)
// - Only one protocol supported per service // - Only one protocol supported per service
// - Not all cloud providers support all protocols and the next step is expected to return // - Not all cloud providers support all protocols and the next step is expected to return
// an error for unsupported protocols // an error for unsupported protocols
status, err := s.balancer.EnsureLoadBalancer(name, s.zone.Region, net.ParseIP(service.Spec.LoadBalancerIP), status, err := s.balancer.EnsureLoadBalancer(service, hostsFromNodeList(&nodes), service.ObjectMeta.Annotations)
ports, hostsFromNodeList(&nodes), serviceName, service.Spec.SessionAffinity, service.ObjectMeta.Annotations)
if err != nil { if err != nil {
return err return err
} else { } else {
@ -727,16 +720,15 @@ func (s *ServiceController) lockedUpdateLoadBalancerHosts(service *api.Service,
} }
// This operation doesn't normally take very long (and happens pretty often), so we only record the final event // This operation doesn't normally take very long (and happens pretty often), so we only record the final event
name := cloudprovider.GetLoadBalancerName(service) err := s.balancer.UpdateLoadBalancer(service, hosts)
err := s.balancer.UpdateLoadBalancer(name, s.zone.Region, hosts)
if err == nil { if err == nil {
s.eventRecorder.Event(service, api.EventTypeNormal, "UpdatedLoadBalancer", "Updated load balancer with new hosts") s.eventRecorder.Event(service, api.EventTypeNormal, "UpdatedLoadBalancer", "Updated load balancer with new hosts")
return nil return nil
} }
// It's only an actual error if the load balancer still exists. // It's only an actual error if the load balancer still exists.
if _, exists, err := s.balancer.GetLoadBalancer(name, s.zone.Region); err != nil { if _, exists, err := s.balancer.GetLoadBalancer(service); err != nil {
glog.Errorf("External error while checking if load balancer %q exists: name, %v", name, err) glog.Errorf("External error while checking if load balancer %q exists: name, %v", cloudprovider.GetLoadBalancerName(service), err)
} else if !exists { } else if !exists {
return nil return nil
} }

View File

@ -166,7 +166,7 @@ func TestUpdateNodesInExternalLoadBalancer(t *testing.T) {
newService("s0", "333", api.ServiceTypeLoadBalancer), newService("s0", "333", api.ServiceTypeLoadBalancer),
}, },
expectedUpdateCalls: []fakecloud.FakeUpdateBalancerCall{ expectedUpdateCalls: []fakecloud.FakeUpdateBalancerCall{
{Name: "a333", Region: region, Hosts: []string{"node0", "node1", "node73"}}, {newService("s0", "333", api.ServiceTypeLoadBalancer), hosts},
}, },
}, },
{ {
@ -177,9 +177,9 @@ func TestUpdateNodesInExternalLoadBalancer(t *testing.T) {
newService("s2", "666", api.ServiceTypeLoadBalancer), newService("s2", "666", api.ServiceTypeLoadBalancer),
}, },
expectedUpdateCalls: []fakecloud.FakeUpdateBalancerCall{ expectedUpdateCalls: []fakecloud.FakeUpdateBalancerCall{
{Name: "a444", Region: region, Hosts: []string{"node0", "node1", "node73"}}, {newService("s0", "444", api.ServiceTypeLoadBalancer), hosts},
{Name: "a555", Region: region, Hosts: []string{"node0", "node1", "node73"}}, {newService("s1", "555", api.ServiceTypeLoadBalancer), hosts},
{Name: "a666", Region: region, Hosts: []string{"node0", "node1", "node73"}}, {newService("s2", "666", api.ServiceTypeLoadBalancer), hosts},
}, },
}, },
{ {
@ -191,8 +191,8 @@ func TestUpdateNodesInExternalLoadBalancer(t *testing.T) {
newService("s4", "123", api.ServiceTypeClusterIP), newService("s4", "123", api.ServiceTypeClusterIP),
}, },
expectedUpdateCalls: []fakecloud.FakeUpdateBalancerCall{ expectedUpdateCalls: []fakecloud.FakeUpdateBalancerCall{
{Name: "a888", Region: region, Hosts: []string{"node0", "node1", "node73"}}, {newService("s1", "888", api.ServiceTypeLoadBalancer), hosts},
{Name: "a999", Region: region, Hosts: []string{"node0", "node1", "node73"}}, {newService("s3", "999", api.ServiceTypeLoadBalancer), hosts},
}, },
}, },
{ {
@ -202,7 +202,7 @@ func TestUpdateNodesInExternalLoadBalancer(t *testing.T) {
nil, nil,
}, },
expectedUpdateCalls: []fakecloud.FakeUpdateBalancerCall{ expectedUpdateCalls: []fakecloud.FakeUpdateBalancerCall{
{Name: "a234", Region: region, Hosts: []string{"node0", "node1", "node73"}}, {newService("s0", "234", api.ServiceTypeLoadBalancer), hosts},
}, },
}, },
} }