diff --git a/pkg/cloudprovider/cloud.go b/pkg/cloudprovider/cloud.go index dd120735e42..cda6db22d73 100644 --- a/pkg/cloudprovider/cloud.go +++ b/pkg/cloudprovider/cloud.go @@ -19,11 +19,9 @@ package cloudprovider import ( "errors" "fmt" - "net" "strings" "k8s.io/kubernetes/pkg/api" - "k8s.io/kubernetes/pkg/types" ) // Interface is an abstract, pluggable interface for cloud providers. @@ -82,18 +80,22 @@ type LoadBalancer interface { // TODO: Break this up into different interfaces (LB, etc) when we have more than one type of service // GetLoadBalancer returns whether the specified load balancer exists, and // if so, what its status is. - GetLoadBalancer(name, region string) (status *api.LoadBalancerStatus, exists bool, err error) + // Implementations must treat the *api.Service parameter as read-only and not modify it. + GetLoadBalancer(service *api.Service) (status *api.LoadBalancerStatus, exists bool, err error) // EnsureLoadBalancer creates a new load balancer 'name', or updates the existing one. Returns the status of the balancer - EnsureLoadBalancer(name, region string, loadBalancerIP net.IP, ports []*api.ServicePort, hosts []string, serviceName types.NamespacedName, affinityType api.ServiceAffinity, annotations map[string]string) (*api.LoadBalancerStatus, error) + // Implementations must treat the *api.Service parameter as read-only and not modify it. + EnsureLoadBalancer(service *api.Service, hosts []string, annotations map[string]string) (*api.LoadBalancerStatus, error) // UpdateLoadBalancer updates hosts under the specified load balancer. - UpdateLoadBalancer(name, region string, hosts []string) error + // Implementations must treat the *api.Service parameter as read-only and not modify it. + UpdateLoadBalancer(service *api.Service, hosts []string) error // EnsureLoadBalancerDeleted deletes the specified load balancer if it // exists, returning nil if the load balancer specified either didn't exist or // was successfully deleted. // This construction is useful because many cloud providers' load balancers // have multiple underlying components, meaning a Get could say that the LB // doesn't exist even if some part of it is still laying around. - EnsureLoadBalancerDeleted(name, region string) error + // Implementations must treat the *api.Service parameter as read-only and not modify it. + EnsureLoadBalancerDeleted(service *api.Service) error } // Instances is an abstract, pluggable interface for sets of instances. diff --git a/pkg/cloudprovider/providers/aws/aws.go b/pkg/cloudprovider/providers/aws/aws.go index 9db23bb61d8..eb7c00c4782 100644 --- a/pkg/cloudprovider/providers/aws/aws.go +++ b/pkg/cloudprovider/providers/aws/aws.go @@ -2099,31 +2099,27 @@ func isSubnetPublic(rt []*ec2.RouteTable, subnetID string) (bool, error) { } // EnsureLoadBalancer implements LoadBalancer.EnsureLoadBalancer -// TODO(justinsb) It is weird that these take a region. I suspect it won't work cross-region anyway. -func (s *AWSCloud) EnsureLoadBalancer(name, region string, publicIP net.IP, ports []*api.ServicePort, hosts []string, serviceName types.NamespacedName, affinity api.ServiceAffinity, annotations map[string]string) (*api.LoadBalancerStatus, error) { - glog.V(2).Infof("EnsureLoadBalancer(%v, %v, %v, %v, %v, %v, %v)", name, region, publicIP, ports, hosts, serviceName, annotations) +func (s *AWSCloud) EnsureLoadBalancer(apiService *api.Service, hosts []string, annotations map[string]string) (*api.LoadBalancerStatus, error) { + glog.V(2).Infof("EnsureLoadBalancer(%v, %v, %v, %v, %v, %v, %v)", + apiService.Namespace, apiService.Name, s.region, apiService.Spec.LoadBalancerIP, apiService.Spec.Ports, hosts, annotations) - if region != s.region { - return nil, fmt.Errorf("requested load balancer region '%s' does not match cluster region '%s'", region, s.region) - } - - if affinity != api.ServiceAffinityNone { + if apiService.Spec.SessionAffinity != api.ServiceAffinityNone { // ELB supports sticky sessions, but only when configured for HTTP/HTTPS - return nil, fmt.Errorf("unsupported load balancer affinity: %v", affinity) + return nil, fmt.Errorf("unsupported load balancer affinity: %v", apiService.Spec.SessionAffinity) } - if len(ports) == 0 { + if len(apiService.Spec.Ports) == 0 { return nil, fmt.Errorf("requested load balancer with no ports") } - for _, port := range ports { + for _, port := range apiService.Spec.Ports { if port.Protocol != api.ProtocolTCP { return nil, fmt.Errorf("Only TCP LoadBalancer is supported for AWS ELB") } } - if publicIP != nil { - return nil, fmt.Errorf("publicIP cannot be specified for AWS ELB") + if apiService.Spec.LoadBalancerIP != "" { + return nil, fmt.Errorf("LoadBalancerIP cannot be specified for AWS ELB") } instances, err := s.getInstancesByNodeNames(hosts) @@ -2162,11 +2158,14 @@ func (s *AWSCloud) EnsureLoadBalancer(name, region string, publicIP net.IP, port return nil, fmt.Errorf("could not find any suitable subnets for creating the ELB") } + loadBalancerName := cloudprovider.GetLoadBalancerName(apiService) + serviceName := types.NamespacedName{Namespace: apiService.Namespace, Name: apiService.Name} + // Create a security group for the load balancer var securityGroupID string { - sgName := "k8s-elb-" + name - sgDescription := fmt.Sprintf("Security group for Kubernetes ELB %s (%v)", name, serviceName) + sgName := "k8s-elb-" + loadBalancerName + sgDescription := fmt.Sprintf("Security group for Kubernetes ELB %s (%v)", loadBalancerName, serviceName) securityGroupID, err = s.ensureSecurityGroup(sgName, sgDescription) if err != nil { glog.Error("Error creating load balancer security group: ", err) @@ -2179,7 +2178,7 @@ func (s *AWSCloud) EnsureLoadBalancer(name, region string, publicIP net.IP, port } permissions := NewIPPermissionSet() - for _, port := range ports { + for _, port := range apiService.Spec.Ports { portInt64 := int64(port.Port) protocol := strings.ToLower(string(port.Protocol)) @@ -2200,7 +2199,7 @@ func (s *AWSCloud) EnsureLoadBalancer(name, region string, publicIP net.IP, port // Figure out what mappings we want on the load balancer listeners := []*elb.Listener{} - for _, port := range ports { + for _, port := range apiService.Spec.Ports { if port.NodePort == 0 { glog.Errorf("Ignoring port without NodePort defined: %v", port) continue @@ -2219,7 +2218,7 @@ func (s *AWSCloud) EnsureLoadBalancer(name, region string, publicIP net.IP, port } // Build the load balancer itself - loadBalancer, err := s.ensureLoadBalancer(serviceName, name, listeners, subnetIDs, securityGroupIDs, internalELB) + loadBalancer, err := s.ensureLoadBalancer(serviceName, loadBalancerName, listeners, subnetIDs, securityGroupIDs, internalELB) if err != nil { return nil, err } @@ -2241,7 +2240,7 @@ func (s *AWSCloud) EnsureLoadBalancer(name, region string, publicIP net.IP, port return nil, err } - glog.V(1).Infof("Loadbalancer %s (%v) has DNS name %s", name, serviceName, orEmpty(loadBalancer.DNSName)) + glog.V(1).Infof("Loadbalancer %s (%v) has DNS name %s", loadBalancerName, serviceName, orEmpty(loadBalancer.DNSName)) // TODO: Wait for creation? @@ -2250,12 +2249,9 @@ func (s *AWSCloud) EnsureLoadBalancer(name, region string, publicIP net.IP, port } // GetLoadBalancer is an implementation of LoadBalancer.GetLoadBalancer -func (s *AWSCloud) GetLoadBalancer(name, region string) (*api.LoadBalancerStatus, bool, error) { - if region != s.region { - return nil, false, fmt.Errorf("requested load balancer region '%s' does not match cluster region '%s'", region, s.region) - } - - lb, err := s.describeLoadBalancer(name) +func (s *AWSCloud) GetLoadBalancer(service *api.Service) (*api.LoadBalancerStatus, bool, error) { + loadBalancerName := cloudprovider.GetLoadBalancerName(service) + lb, err := s.describeLoadBalancer(loadBalancerName) if err != nil { return nil, false, err } @@ -2464,18 +2460,15 @@ func (s *AWSCloud) updateInstanceSecurityGroupsForLoadBalancer(lb *elb.LoadBalan } // EnsureLoadBalancerDeleted implements LoadBalancer.EnsureLoadBalancerDeleted. -func (s *AWSCloud) EnsureLoadBalancerDeleted(name, region string) error { - if region != s.region { - return fmt.Errorf("requested load balancer region '%s' does not match cluster region '%s'", region, s.region) - } - - lb, err := s.describeLoadBalancer(name) +func (s *AWSCloud) EnsureLoadBalancerDeleted(service *api.Service) error { + loadBalancerName := cloudprovider.GetLoadBalancerName(service) + lb, err := s.describeLoadBalancer(loadBalancerName) if err != nil { return err } if lb == nil { - glog.Info("Load balancer already deleted: ", name) + glog.Info("Load balancer already deleted: ", loadBalancerName) return nil } @@ -2510,7 +2503,7 @@ func (s *AWSCloud) EnsureLoadBalancerDeleted(name, region string) error { securityGroupIDs := map[string]struct{}{} for _, securityGroupID := range lb.SecurityGroups { if isNilOrEmpty(securityGroupID) { - glog.Warning("Ignoring empty security group in ", name) + glog.Warning("Ignoring empty security group in ", service.Name) continue } securityGroupIDs[*securityGroupID] = struct{}{} @@ -2540,7 +2533,7 @@ func (s *AWSCloud) EnsureLoadBalancerDeleted(name, region string) error { } if len(securityGroupIDs) == 0 { - glog.V(2).Info("Deleted all security groups for load balancer: ", name) + glog.V(2).Info("Deleted all security groups for load balancer: ", service.Name) break } @@ -2550,10 +2543,10 @@ func (s *AWSCloud) EnsureLoadBalancerDeleted(name, region string) error { ids = append(ids, id) } - return fmt.Errorf("timed out deleting ELB: %s. Could not delete security groups %v", name, strings.Join(ids, ",")) + return fmt.Errorf("timed out deleting ELB: %s. Could not delete security groups %v", service.Name, strings.Join(ids, ",")) } - glog.V(2).Info("Waiting for load-balancer to delete so we can delete security groups: ", name) + glog.V(2).Info("Waiting for load-balancer to delete so we can delete security groups: ", service.Name) time.Sleep(10 * time.Second) } @@ -2563,17 +2556,14 @@ func (s *AWSCloud) EnsureLoadBalancerDeleted(name, region string) error { } // UpdateLoadBalancer implements LoadBalancer.UpdateLoadBalancer -func (s *AWSCloud) UpdateLoadBalancer(name, region string, hosts []string) error { - if region != s.region { - return fmt.Errorf("requested load balancer region '%s' does not match cluster region '%s'", region, s.region) - } - +func (s *AWSCloud) UpdateLoadBalancer(service *api.Service, hosts []string) error { instances, err := s.getInstancesByNodeNames(hosts) if err != nil { return err } - lb, err := s.describeLoadBalancer(name) + loadBalancerName := cloudprovider.GetLoadBalancerName(service) + lb, err := s.describeLoadBalancer(loadBalancerName) if err != nil { return err } diff --git a/pkg/cloudprovider/providers/aws/aws_loadbalancer.go b/pkg/cloudprovider/providers/aws/aws_loadbalancer.go index d5dbcc0e7e3..fe48045cf6f 100644 --- a/pkg/cloudprovider/providers/aws/aws_loadbalancer.go +++ b/pkg/cloudprovider/providers/aws/aws_loadbalancer.go @@ -28,8 +28,8 @@ import ( "k8s.io/kubernetes/pkg/util/sets" ) -func (s *AWSCloud) ensureLoadBalancer(namespacedName types.NamespacedName, name string, listeners []*elb.Listener, subnetIDs []string, securityGroupIDs []string, internalELB bool) (*elb.LoadBalancerDescription, error) { - loadBalancer, err := s.describeLoadBalancer(name) +func (s *AWSCloud) ensureLoadBalancer(namespacedName types.NamespacedName, loadBalancerName string, listeners []*elb.Listener, subnetIDs []string, securityGroupIDs []string, internalELB bool) (*elb.LoadBalancerDescription, error) { + loadBalancer, err := s.describeLoadBalancer(loadBalancerName) if err != nil { return nil, err } @@ -38,7 +38,7 @@ func (s *AWSCloud) ensureLoadBalancer(namespacedName types.NamespacedName, name if loadBalancer == nil { createRequest := &elb.CreateLoadBalancerInput{} - createRequest.LoadBalancerName = aws.String(name) + createRequest.LoadBalancerName = aws.String(loadBalancerName) createRequest.Listeners = listeners @@ -57,7 +57,7 @@ func (s *AWSCloud) ensureLoadBalancer(namespacedName types.NamespacedName, name {Key: aws.String(TagNameKubernetesService), Value: aws.String(namespacedName.String())}, } - glog.Infof("Creating load balancer for %v with name: %s", namespacedName, name) + glog.Infof("Creating load balancer for %v with name: ", namespacedName, loadBalancerName) _, err := s.elb.CreateLoadBalancer(createRequest) if err != nil { return nil, err @@ -76,7 +76,7 @@ func (s *AWSCloud) ensureLoadBalancer(namespacedName types.NamespacedName, name if removals.Len() != 0 { request := &elb.DetachLoadBalancerFromSubnetsInput{} - request.LoadBalancerName = aws.String(name) + request.LoadBalancerName = aws.String(loadBalancerName) request.Subnets = stringSetToPointers(removals) glog.V(2).Info("Detaching load balancer from removed subnets") _, err := s.elb.DetachLoadBalancerFromSubnets(request) @@ -88,7 +88,7 @@ func (s *AWSCloud) ensureLoadBalancer(namespacedName types.NamespacedName, name if additions.Len() != 0 { request := &elb.AttachLoadBalancerToSubnetsInput{} - request.LoadBalancerName = aws.String(name) + request.LoadBalancerName = aws.String(loadBalancerName) request.Subnets = stringSetToPointers(additions) glog.V(2).Info("Attaching load balancer to added subnets") _, err := s.elb.AttachLoadBalancerToSubnets(request) @@ -107,7 +107,7 @@ func (s *AWSCloud) ensureLoadBalancer(namespacedName types.NamespacedName, name if !expected.Equal(actual) { // This call just replaces the security groups, unlike e.g. subnets (!) request := &elb.ApplySecurityGroupsToLoadBalancerInput{} - request.LoadBalancerName = aws.String(name) + request.LoadBalancerName = aws.String(loadBalancerName) request.SecurityGroups = stringPointerArray(securityGroupIDs) glog.V(2).Info("Applying updated security groups to load balancer") _, err := s.elb.ApplySecurityGroupsToLoadBalancer(request) @@ -127,7 +127,7 @@ func (s *AWSCloud) ensureLoadBalancer(namespacedName types.NamespacedName, name for _, listenerDescription := range listenerDescriptions { actual := listenerDescription.Listener if actual == nil { - glog.Warning("Ignoring empty listener in AWS loadbalancer: ", name) + glog.Warning("Ignoring empty listener in AWS loadbalancer: ", loadBalancerName) continue } @@ -167,7 +167,7 @@ func (s *AWSCloud) ensureLoadBalancer(namespacedName types.NamespacedName, name if len(removals) != 0 { request := &elb.DeleteLoadBalancerListenersInput{} - request.LoadBalancerName = aws.String(name) + request.LoadBalancerName = aws.String(loadBalancerName) request.LoadBalancerPorts = removals glog.V(2).Info("Deleting removed load balancer listeners") _, err := s.elb.DeleteLoadBalancerListeners(request) @@ -179,7 +179,7 @@ func (s *AWSCloud) ensureLoadBalancer(namespacedName types.NamespacedName, name if len(additions) != 0 { request := &elb.CreateLoadBalancerListenersInput{} - request.LoadBalancerName = aws.String(name) + request.LoadBalancerName = aws.String(loadBalancerName) request.Listeners = additions glog.V(2).Info("Creating added load balancer listeners") _, err := s.elb.CreateLoadBalancerListeners(request) @@ -192,7 +192,7 @@ func (s *AWSCloud) ensureLoadBalancer(namespacedName types.NamespacedName, name } if dirty { - loadBalancer, err = s.describeLoadBalancer(name) + loadBalancer, err = s.describeLoadBalancer(loadBalancerName) if err != nil { glog.Warning("Unable to retrieve load balancer after creation/update") return nil, err diff --git a/pkg/cloudprovider/providers/aws/aws_test.go b/pkg/cloudprovider/providers/aws/aws_test.go index 24e3378224e..7c956f1ac66 100644 --- a/pkg/cloudprovider/providers/aws/aws_test.go +++ b/pkg/cloudprovider/providers/aws/aws_test.go @@ -17,7 +17,6 @@ limitations under the License. package aws import ( - "fmt" "io" "reflect" "strings" @@ -31,7 +30,6 @@ import ( "github.com/golang/glog" "k8s.io/kubernetes/pkg/api" "k8s.io/kubernetes/pkg/api/unversioned" - "k8s.io/kubernetes/pkg/types" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/mock" @@ -283,15 +281,14 @@ func instanceMatchesFilter(instance *ec2.Instance, filter *ec2.Filter) bool { return contains(filter.Values, *instance.State.Name) } - if name == "tag:"+TagNameKubernetesCluster { - for _, tag := range instance.Tags { - if *tag.Key == TagNameKubernetesCluster { - return contains(filter.Values, *tag.Value) + if strings.HasPrefix(name, "tag:") { + tagName := name[4:] + for _, instanceTag := range instance.Tags { + if aws.StringValue(instanceTag.Key) == tagName && contains(filter.Values, aws.StringValue(instanceTag.Value)) { + return true } } - return false } - panic("Unknown filter name: " + name) } @@ -443,18 +440,20 @@ func (s *FakeEC2) ModifyInstanceAttribute(request *ec2.ModifyInstanceAttributeIn type FakeELB struct { aws *FakeAWSServices + mock.Mock } func (ec2 *FakeELB) CreateLoadBalancer(*elb.CreateLoadBalancerInput) (*elb.CreateLoadBalancerOutput, error) { panic("Not implemented") } -func (ec2 *FakeELB) DeleteLoadBalancer(*elb.DeleteLoadBalancerInput) (*elb.DeleteLoadBalancerOutput, error) { +func (ec2 *FakeELB) DeleteLoadBalancer(input *elb.DeleteLoadBalancerInput) (*elb.DeleteLoadBalancerOutput, error) { panic("Not implemented") } -func (ec2 *FakeELB) DescribeLoadBalancers(*elb.DescribeLoadBalancersInput) (*elb.DescribeLoadBalancersOutput, error) { - panic("Not implemented") +func (ec2 *FakeELB) DescribeLoadBalancers(input *elb.DescribeLoadBalancersInput) (*elb.DescribeLoadBalancersOutput, error) { + args := ec2.Called(input) + return args.Get(0).(*elb.DescribeLoadBalancersOutput), nil } func (ec2 *FakeELB) RegisterInstancesWithLoadBalancer(*elb.RegisterInstancesWithLoadBalancerInput) (*elb.RegisterInstancesWithLoadBalancerOutput, error) { panic("Not implemented") @@ -730,40 +729,6 @@ func TestFindVPCID(t *testing.T) { } } -func TestLoadBalancerMatchesClusterRegion(t *testing.T) { - awsServices := NewFakeAWSServices() - c, err := newAWSCloud(strings.NewReader("[global]"), awsServices) - if err != nil { - t.Errorf("Error building aws cloud: %v", err) - return - } - - badELBRegion := "bad-elb-region" - errorMessage := fmt.Sprintf("requested load balancer region '%s' does not match cluster region '%s'", badELBRegion, c.region) - - _, _, err = c.GetLoadBalancer("elb-name", badELBRegion) - if err == nil || err.Error() != errorMessage { - t.Errorf("Expected GetLoadBalancer region mismatch error.") - } - - serviceName := types.NamespacedName{Namespace: "foo", Name: "bar"} - - _, err = c.EnsureLoadBalancer("elb-name", badELBRegion, nil, nil, nil, serviceName, api.ServiceAffinityNone, nil) - if err == nil || err.Error() != errorMessage { - t.Errorf("Expected EnsureLoadBalancer region mismatch error.") - } - - err = c.EnsureLoadBalancerDeleted("elb-name", badELBRegion) - if err == nil || err.Error() != errorMessage { - t.Errorf("Expected EnsureLoadBalancerDeleted region mismatch error.") - } - - err = c.UpdateLoadBalancer("elb-name", badELBRegion, nil) - if err == nil || err.Error() != errorMessage { - t.Errorf("Expected UpdateLoadBalancer region mismatch error.") - } -} - func constructSubnets(subnetsIn map[int]map[string]string) (subnetsOut []*ec2.Subnet) { for i := range subnetsIn { subnetsOut = append( @@ -1076,7 +1041,6 @@ func TestIpPermissionExistsHandlesMultipleGroupIdsWithUserIds(t *testing.T) { t.Errorf("Should have not been considered equal since first is not in the second array of groups") } } - func TestFindInstanceByNodeNameExcludesTerminatedInstances(t *testing.T) { awsServices := NewFakeAWSServices() @@ -1197,3 +1161,41 @@ func TestGetVolumeLabels(t *testing.T) { unversioned.LabelZoneRegion: "us-east-1"}, labels) awsServices.ec2.AssertExpectations(t) } + +func (self *FakeELB) expectDescribeLoadBalancers(loadBalancerName string) { + self.On("DescribeLoadBalancers", &elb.DescribeLoadBalancersInput{LoadBalancerNames: []*string{aws.String(loadBalancerName)}}).Return(&elb.DescribeLoadBalancersOutput{ + LoadBalancerDescriptions: []*elb.LoadBalancerDescription{{}}, + }) +} + +func TestDescribeLoadBalancerOnDelete(t *testing.T) { + awsServices := NewFakeAWSServices() + c, _ := newAWSCloud(strings.NewReader("[global]"), awsServices) + awsServices.elb.expectDescribeLoadBalancers("aid") + + c.EnsureLoadBalancerDeleted(&api.Service{ObjectMeta: api.ObjectMeta{Name: "myservice", UID: "id"}}) +} + +func TestDescribeLoadBalancerOnUpdate(t *testing.T) { + awsServices := NewFakeAWSServices() + c, _ := newAWSCloud(strings.NewReader("[global]"), awsServices) + awsServices.elb.expectDescribeLoadBalancers("aid") + + c.UpdateLoadBalancer(&api.Service{ObjectMeta: api.ObjectMeta{Name: "myservice", UID: "id"}}, []string{}) +} + +func TestDescribeLoadBalancerOnGet(t *testing.T) { + awsServices := NewFakeAWSServices() + c, _ := newAWSCloud(strings.NewReader("[global]"), awsServices) + awsServices.elb.expectDescribeLoadBalancers("aid") + + c.GetLoadBalancer(&api.Service{ObjectMeta: api.ObjectMeta{Name: "myservice", UID: "id"}}) +} + +func TestDescribeLoadBalancerOnEnsure(t *testing.T) { + awsServices := NewFakeAWSServices() + c, _ := newAWSCloud(strings.NewReader("[global]"), awsServices) + awsServices.elb.expectDescribeLoadBalancers("aid") + + c.EnsureLoadBalancer(&api.Service{ObjectMeta: api.ObjectMeta{Name: "myservice", UID: "id"}}, []string{}, map[string]string{}) +} diff --git a/pkg/cloudprovider/providers/fake/fake.go b/pkg/cloudprovider/providers/fake/fake.go index d81492fc212..6bc0a0e761b 100644 --- a/pkg/cloudprovider/providers/fake/fake.go +++ b/pkg/cloudprovider/providers/fake/fake.go @@ -25,24 +25,22 @@ import ( "k8s.io/kubernetes/pkg/api" "k8s.io/kubernetes/pkg/cloudprovider" - "k8s.io/kubernetes/pkg/types" ) const ProviderName = "fake" // FakeBalancer is a fake storage of balancer information type FakeBalancer struct { - Name string - Region string - ExternalIP net.IP - Ports []*api.ServicePort - Hosts []string + Name string + Region string + LoadBalancerIP string + Ports []api.ServicePort + Hosts []string } type FakeUpdateBalancerCall struct { - Name string - Region string - Hosts []string + Service *api.Service + Hosts []string } // FakeCloud is a test-double implementation of Interface, LoadBalancer, Instances, and Routes. It is useful for testing. @@ -123,7 +121,7 @@ func (f *FakeCloud) Routes() (cloudprovider.Routes, bool) { } // GetLoadBalancer is a stub implementation of LoadBalancer.GetLoadBalancer. -func (f *FakeCloud) GetLoadBalancer(name, region string) (*api.LoadBalancerStatus, bool, error) { +func (f *FakeCloud) GetLoadBalancer(service *api.Service) (*api.LoadBalancerStatus, bool, error) { status := &api.LoadBalancerStatus{} status.Ingress = []api.LoadBalancerIngress{{IP: f.ExternalIP.String()}} @@ -132,12 +130,22 @@ func (f *FakeCloud) GetLoadBalancer(name, region string) (*api.LoadBalancerStatu // EnsureLoadBalancer is a test-spy implementation of LoadBalancer.EnsureLoadBalancer. // It adds an entry "create" into the internal method call record. -func (f *FakeCloud) EnsureLoadBalancer(name, region string, externalIP net.IP, ports []*api.ServicePort, hosts []string, serviceName types.NamespacedName, affinityType api.ServiceAffinity, annotations map[string]string) (*api.LoadBalancerStatus, error) { +func (f *FakeCloud) EnsureLoadBalancer(service *api.Service, hosts []string, annotations map[string]string) (*api.LoadBalancerStatus, error) { f.addCall("create") if f.Balancers == nil { f.Balancers = make(map[string]FakeBalancer) } - f.Balancers[name] = FakeBalancer{name, region, externalIP, ports, hosts} + + name := cloudprovider.GetLoadBalancerName(service) + spec := service.Spec + + zone, err := f.GetZone() + if err != nil { + return nil, err + } + region := zone.Region + + f.Balancers[name] = FakeBalancer{name, region, spec.LoadBalancerIP, spec.Ports, hosts} status := &api.LoadBalancerStatus{} status.Ingress = []api.LoadBalancerIngress{{IP: f.ExternalIP.String()}} @@ -147,15 +155,15 @@ func (f *FakeCloud) EnsureLoadBalancer(name, region string, externalIP net.IP, p // UpdateLoadBalancer is a test-spy implementation of LoadBalancer.UpdateLoadBalancer. // It adds an entry "update" into the internal method call record. -func (f *FakeCloud) UpdateLoadBalancer(name, region string, hosts []string) error { +func (f *FakeCloud) UpdateLoadBalancer(service *api.Service, hosts []string) error { f.addCall("update") - f.UpdateCalls = append(f.UpdateCalls, FakeUpdateBalancerCall{name, region, hosts}) + f.UpdateCalls = append(f.UpdateCalls, FakeUpdateBalancerCall{service, hosts}) return f.Err } // EnsureLoadBalancerDeleted is a test-spy implementation of LoadBalancer.EnsureLoadBalancerDeleted. // It adds an entry "delete" into the internal method call record. -func (f *FakeCloud) EnsureLoadBalancerDeleted(name, region string) error { +func (f *FakeCloud) EnsureLoadBalancerDeleted(service *api.Service) error { f.addCall("delete") return f.Err } diff --git a/pkg/cloudprovider/providers/gce/gce.go b/pkg/cloudprovider/providers/gce/gce.go index ceb0fa2445a..a00b17b60c7 100644 --- a/pkg/cloudprovider/providers/gce/gce.go +++ b/pkg/cloudprovider/providers/gce/gce.go @@ -20,7 +20,6 @@ import ( "encoding/json" "fmt" "io" - "net" "net/http" "path" "regexp" @@ -439,8 +438,9 @@ func (gce *GCECloud) waitForZoneOp(op *compute.Operation, zone string) error { } // GetLoadBalancer is an implementation of LoadBalancer.GetLoadBalancer -func (gce *GCECloud) GetLoadBalancer(name, region string) (*api.LoadBalancerStatus, bool, error) { - fwd, err := gce.service.ForwardingRules.Get(gce.projectID, region, name).Do() +func (gce *GCECloud) GetLoadBalancer(service *api.Service) (*api.LoadBalancerStatus, bool, error) { + loadBalancerName := cloudprovider.GetLoadBalancerName(service) + fwd, err := gce.service.ForwardingRules.Get(gce.projectID, gce.region, loadBalancerName).Do() if err == nil { status := &api.LoadBalancerStatus{} status.Ingress = []api.LoadBalancerIngress{{IP: fwd.IPAddress}} @@ -465,14 +465,7 @@ func isHTTPErrorCode(err error, code int) bool { // Due to an interesting series of design decisions, this handles both creating // new load balancers and updating existing load balancers, recognizing when // each is needed. -func (gce *GCECloud) EnsureLoadBalancer(name, region string, requestedIP net.IP, ports []*api.ServicePort, hostNames []string, svc types.NamespacedName, affinityType api.ServiceAffinity, annotations map[string]string) (*api.LoadBalancerStatus, error) { - portStr := []string{} - for _, p := range ports { - portStr = append(portStr, fmt.Sprintf("%s/%d", p.Protocol, p.Port)) - } - serviceName := svc.String() - glog.V(2).Infof("EnsureLoadBalancer(%v, %v, %v, %v, %v, %v, %v)", name, region, requestedIP, portStr, hostNames, serviceName, annotations) - +func (gce *GCECloud) EnsureLoadBalancer(apiService *api.Service, hostNames []string, annotations map[string]string) (*api.LoadBalancerStatus, error) { if len(hostNames) == 0 { return nil, fmt.Errorf("Cannot EnsureLoadBalancer() with no hosts") } @@ -482,8 +475,21 @@ func (gce *GCECloud) EnsureLoadBalancer(name, region string, requestedIP net.IP, return nil, err } + loadBalancerName := cloudprovider.GetLoadBalancerName(apiService) + loadBalancerIP := apiService.Spec.LoadBalancerIP + ports := apiService.Spec.Ports + portStr := []string{} + for _, p := range apiService.Spec.Ports { + portStr = append(portStr, fmt.Sprintf("%s/%d", p.Protocol, p.Port)) + } + + affinityType := apiService.Spec.SessionAffinity + + serviceName := types.NamespacedName{Namespace: apiService.Namespace, Name: apiService.Name} + glog.V(2).Infof("EnsureLoadBalancer(%v, %v, %v, %v, %v, %v, %v)", loadBalancerName, gce.region, loadBalancerIP, portStr, hosts, serviceName, annotations) + // Check if the forwarding rule exists, and if so, what its IP is. - fwdRuleExists, fwdRuleNeedsUpdate, fwdRuleIP, err := gce.forwardingRuleNeedsUpdate(name, region, requestedIP, ports) + fwdRuleExists, fwdRuleNeedsUpdate, fwdRuleIP, err := gce.forwardingRuleNeedsUpdate(loadBalancerName, gce.region, loadBalancerIP, ports) if err != nil { return nil, err } @@ -517,45 +523,45 @@ func (gce *GCECloud) EnsureLoadBalancer(name, region string, requestedIP net.IP, return } if isSafeToReleaseIP { - if err := gce.deleteStaticIP(name, region); err != nil { - glog.Errorf("failed to release static IP %s for load balancer (%v(%v), %v): %v", ipAddress, name, serviceName, region, err) + if err := gce.deleteStaticIP(loadBalancerName, gce.region); err != nil { + glog.Errorf("failed to release static IP %s for load balancer (%v(%v), %v): %v", ipAddress, loadBalancerName, serviceName, gce.region, err) } - glog.V(2).Infof("EnsureLoadBalancer(%v(%v)): released static IP %s", name, serviceName, ipAddress) + glog.V(2).Infof("EnsureLoadBalancer(%v(%v)): released static IP %s", loadBalancerName, serviceName, ipAddress) } else { - glog.Warningf("orphaning static IP %s during update of load balancer (%v(%v), %v): %v", ipAddress, name, serviceName, region, err) + glog.Warningf("orphaning static IP %s during update of load balancer (%v(%v), %v): %v", ipAddress, loadBalancerName, serviceName, gce.region, err) } }() - if requestedIP != nil { + if loadBalancerIP != "" { // If a specific IP address has been requested, we have to respect the // user's request and use that IP. If the forwarding rule was already using // a different IP, it will be harmlessly abandoned because it was only an // ephemeral IP (or it was a different static IP owned by the user, in which // case we shouldn't delete it anyway). - if isStatic, err := gce.projectOwnsStaticIP(name, region, requestedIP.String()); err != nil { - return nil, fmt.Errorf("failed to test if this GCE project owns the static IP %s: %v", requestedIP.String(), err) + if isStatic, err := gce.projectOwnsStaticIP(loadBalancerName, gce.region, loadBalancerIP); err != nil { + return nil, fmt.Errorf("failed to test if this GCE project owns the static IP %s: %v", loadBalancerIP, err) } else if isStatic { // The requested IP is a static IP, owned and managed by the user. isUserOwnedIP = true isSafeToReleaseIP = false - ipAddress = requestedIP.String() - glog.V(4).Infof("EnsureLoadBalancer(%v(%v)): using user-provided static IP %s", name, serviceName, ipAddress) - } else if requestedIP.String() == fwdRuleIP { + ipAddress = loadBalancerIP + glog.V(4).Infof("EnsureLoadBalancer(%v(%v)): using user-provided static IP %s", loadBalancerName, serviceName, ipAddress) + } else if loadBalancerIP == fwdRuleIP { // The requested IP is not a static IP, but is currently assigned // to this forwarding rule, so we can keep it. isUserOwnedIP = false isSafeToReleaseIP = true - ipAddress, _, err = gce.ensureStaticIP(name, serviceName, region, fwdRuleIP) + ipAddress, _, err = gce.ensureStaticIP(loadBalancerName, serviceName.String(), gce.region, fwdRuleIP) if err != nil { return nil, fmt.Errorf("failed to ensure static IP %s: %v", fwdRuleIP, err) } - glog.V(4).Infof("EnsureLoadBalancer(%v(%v)): using user-provided non-static IP %s", name, serviceName, ipAddress) + glog.V(4).Infof("EnsureLoadBalancer(%v(%v)): using user-provided non-static IP %s", loadBalancerName, serviceName, ipAddress) } else { // The requested IP is not static and it is not assigned to the // current forwarding rule. It might be attached to a different // rule or it might not be part of this project at all. Either // way, we can't use it. - return nil, fmt.Errorf("requested ip %s is neither static nor assigned to LB %s(%v): %v", requestedIP.String(), name, serviceName, err) + return nil, fmt.Errorf("requested ip %s is neither static nor assigned to LB %s(%v): %v", loadBalancerIP, loadBalancerName, serviceName, err) } } else { // The user did not request a specific IP. @@ -566,7 +572,7 @@ func (gce *GCECloud) EnsureLoadBalancer(name, region string, requestedIP net.IP, // IP from ephemeral to static, or it will just get the IP if it is // already static. existed := false - ipAddress, existed, err = gce.ensureStaticIP(name, serviceName, region, fwdRuleIP) + ipAddress, existed, err = gce.ensureStaticIP(loadBalancerName, serviceName.String(), gce.region, fwdRuleIP) if err != nil { return nil, fmt.Errorf("failed to ensure static IP %s: %v", fwdRuleIP, err) } @@ -576,13 +582,13 @@ func (gce *GCECloud) EnsureLoadBalancer(name, region string, requestedIP net.IP, // use this IP and try to run through the process again, but we // should not release the IP unless it is explicitly flagged as OK. isSafeToReleaseIP = false - glog.V(4).Infof("EnsureLoadBalancer(%v(%v)): adopting static IP %s", name, serviceName, ipAddress) + glog.V(4).Infof("EnsureLoadBalancer(%v(%v)): adopting static IP %s", loadBalancerName, serviceName, ipAddress) } else { // For total clarity. The IP did not pre-exist and the user did // not ask for a particular one, so we can release the IP in case // of failure or success. isSafeToReleaseIP = true - glog.V(4).Infof("EnsureLoadBalancer(%v(%v)): allocated static IP %s", name, serviceName, ipAddress) + glog.V(4).Infof("EnsureLoadBalancer(%v(%v)): allocated static IP %s", loadBalancerName, serviceName, ipAddress) } } @@ -595,29 +601,29 @@ func (gce *GCECloud) EnsureLoadBalancer(name, region string, requestedIP net.IP, return nil, err } - firewallExists, firewallNeedsUpdate, err := gce.firewallNeedsUpdate(name, serviceName, region, ipAddress, ports, sourceRanges) + firewallExists, firewallNeedsUpdate, err := gce.firewallNeedsUpdate(loadBalancerName, serviceName.String(), gce.region, ipAddress, ports, sourceRanges) if err != nil { return nil, err } if firewallNeedsUpdate { - desc := makeFirewallDescription(serviceName, ipAddress) + desc := makeFirewallDescription(serviceName.String(), ipAddress) // Unlike forwarding rules and target pools, firewalls can be updated // without needing to be deleted and recreated. if firewallExists { - if err := gce.updateFirewall(name, region, desc, sourceRanges, ports, hosts); err != nil { + if err := gce.updateFirewall(loadBalancerName, gce.region, desc, sourceRanges, ports, hosts); err != nil { return nil, err } - glog.V(4).Infof("EnsureLoadBalancer(%v(%v)): updated firewall", name, serviceName) + glog.V(4).Infof("EnsureLoadBalancer(%v(%v)): updated firewall", loadBalancerName, serviceName) } else { - if err := gce.createFirewall(name, region, desc, sourceRanges, ports, hosts); err != nil { + if err := gce.createFirewall(loadBalancerName, gce.region, desc, sourceRanges, ports, hosts); err != nil { return nil, err } - glog.V(4).Infof("EnsureLoadBalancer(%v(%v)): created firewall", name, serviceName) + glog.V(4).Infof("EnsureLoadBalancer(%v(%v)): created firewall", loadBalancerName, serviceName) } } - tpExists, tpNeedsUpdate, err := gce.targetPoolNeedsUpdate(name, region, affinityType) + tpExists, tpNeedsUpdate, err := gce.targetPoolNeedsUpdate(loadBalancerName, gce.region, affinityType) if err != nil { return nil, err } @@ -634,36 +640,36 @@ func (gce *GCECloud) EnsureLoadBalancer(name, region string, requestedIP net.IP, // and something should fail before we recreate it, don't release the // IP. That way we can come back to it later. isSafeToReleaseIP = false - if err := gce.deleteForwardingRule(name, region); err != nil { - return nil, fmt.Errorf("failed to delete existing forwarding rule %s for load balancer update: %v", name, err) + if err := gce.deleteForwardingRule(loadBalancerName, gce.region); err != nil { + return nil, fmt.Errorf("failed to delete existing forwarding rule %s for load balancer update: %v", loadBalancerName, err) } - glog.V(4).Infof("EnsureLoadBalancer(%v(%v)): deleted forwarding rule", name, serviceName) + glog.V(4).Infof("EnsureLoadBalancer(%v(%v)): deleted forwarding rule", loadBalancerName, serviceName) } if tpExists && tpNeedsUpdate { - if err := gce.deleteTargetPool(name, region); err != nil { - return nil, fmt.Errorf("failed to delete existing target pool %s for load balancer update: %v", name, err) + if err := gce.deleteTargetPool(loadBalancerName, gce.region); err != nil { + return nil, fmt.Errorf("failed to delete existing target pool %s for load balancer update: %v", loadBalancerName, err) } - glog.V(4).Infof("EnsureLoadBalancer(%v(%v)): deleted target pool", name, serviceName) + glog.V(4).Infof("EnsureLoadBalancer(%v(%v)): deleted target pool", loadBalancerName, serviceName) } // Once we've deleted the resources (if necessary), build them back up (or for // the first time if they're new). if tpNeedsUpdate { - if err := gce.createTargetPool(name, serviceName, region, hosts, affinityType); err != nil { - return nil, fmt.Errorf("failed to create target pool %s: %v", name, err) + if err := gce.createTargetPool(loadBalancerName, serviceName.String(), gce.region, hosts, affinityType); err != nil { + return nil, fmt.Errorf("failed to create target pool %s: %v", loadBalancerName, err) } - glog.V(4).Infof("EnsureLoadBalancer(%v(%v)): created target pool", name, serviceName) + glog.V(4).Infof("EnsureLoadBalancer(%v(%v)): created target pool", loadBalancerName, serviceName) } if tpNeedsUpdate || fwdRuleNeedsUpdate { - if err := gce.createForwardingRule(name, serviceName, region, ipAddress, ports); err != nil { - return nil, fmt.Errorf("failed to create forwarding rule %s: %v", name, err) + if err := gce.createForwardingRule(loadBalancerName, serviceName.String(), gce.region, ipAddress, ports); err != nil { + return nil, fmt.Errorf("failed to create forwarding rule %s: %v", loadBalancerName, err) } // End critical section. It is safe to release the static IP (which // just demotes it to ephemeral) now that it is attached. In the case // of a user-requested IP, the "is user-owned" flag will be set, // preventing it from actually being released. isSafeToReleaseIP = true - glog.V(4).Infof("EnsureLoadBalancer(%v(%v)): created forwarding rule, IP %s", name, serviceName, ipAddress) + glog.V(4).Infof("EnsureLoadBalancer(%v(%v)): created forwarding rule, IP %s", loadBalancerName, serviceName, ipAddress) } status := &api.LoadBalancerStatus{} @@ -675,7 +681,7 @@ func (gce *GCECloud) EnsureLoadBalancer(name, region string, requestedIP net.IP, // IP is being requested. // Returns whether the forwarding rule exists, whether it needs to be updated, // what its IP address is (if it exists), and any error we encountered. -func (gce *GCECloud) forwardingRuleNeedsUpdate(name, region string, requestedIP net.IP, ports []*api.ServicePort) (exists bool, needsUpdate bool, ipAddress string, err error) { +func (gce *GCECloud) forwardingRuleNeedsUpdate(name, region string, loadBalancerIP string, ports []api.ServicePort) (exists bool, needsUpdate bool, ipAddress string, err error) { fwd, err := gce.service.ForwardingRules.Get(gce.projectID, region, name).Do() if err != nil { if isHTTPErrorCode(err, http.StatusNotFound) { @@ -683,7 +689,7 @@ func (gce *GCECloud) forwardingRuleNeedsUpdate(name, region string, requestedIP } return false, false, "", fmt.Errorf("error getting load balancer's forwarding rule: %v", err) } - if requestedIP != nil && requestedIP.String() != fwd.IPAddress { + if loadBalancerIP != fwd.IPAddress { return true, true, fwd.IPAddress, nil } portRange, err := loadBalancerPortRange(ports) @@ -701,7 +707,7 @@ func (gce *GCECloud) forwardingRuleNeedsUpdate(name, region string, requestedIP return true, false, fwd.IPAddress, nil } -func loadBalancerPortRange(ports []*api.ServicePort) (string, error) { +func loadBalancerPortRange(ports []api.ServicePort) (string, error) { if len(ports) == 0 { return "", fmt.Errorf("no ports specified for GCE load balancer") } @@ -753,7 +759,7 @@ func translateAffinityType(affinityType api.ServiceAffinity) string { } } -func (gce *GCECloud) firewallNeedsUpdate(name, serviceName, region, ipAddress string, ports []*api.ServicePort, sourceRanges netsets.IPNet) (exists bool, needsUpdate bool, err error) { +func (gce *GCECloud) firewallNeedsUpdate(name, serviceName, region, ipAddress string, ports []api.ServicePort, sourceRanges netsets.IPNet) (exists bool, needsUpdate bool, err error) { fw, err := gce.service.Firewalls.Get(gce.projectID, makeFirewallName(name)).Do() if err != nil { if isHTTPErrorCode(err, http.StatusNotFound) { @@ -814,7 +820,7 @@ func slicesEqual(x, y []string) bool { return true } -func (gce *GCECloud) createForwardingRule(name, serviceName, region, ipAddress string, ports []*api.ServicePort) error { +func (gce *GCECloud) createForwardingRule(name, serviceName, region, ipAddress string, ports []api.ServicePort) error { portRange, err := loadBalancerPortRange(ports) if err != nil { return err @@ -865,7 +871,7 @@ func (gce *GCECloud) createTargetPool(name, serviceName, region string, hosts [] return nil } -func (gce *GCECloud) createFirewall(name, region, desc string, sourceRanges netsets.IPNet, ports []*api.ServicePort, hosts []*gceInstance) error { +func (gce *GCECloud) createFirewall(name, region, desc string, sourceRanges netsets.IPNet, ports []api.ServicePort, hosts []*gceInstance) error { firewall, err := gce.firewallObject(name, region, desc, sourceRanges, ports, hosts) if err != nil { return err @@ -883,7 +889,7 @@ func (gce *GCECloud) createFirewall(name, region, desc string, sourceRanges nets return nil } -func (gce *GCECloud) updateFirewall(name, region, desc string, sourceRanges netsets.IPNet, ports []*api.ServicePort, hosts []*gceInstance) error { +func (gce *GCECloud) updateFirewall(name, region, desc string, sourceRanges netsets.IPNet, ports []api.ServicePort, hosts []*gceInstance) error { firewall, err := gce.firewallObject(name, region, desc, sourceRanges, ports, hosts) if err != nil { return err @@ -901,7 +907,7 @@ func (gce *GCECloud) updateFirewall(name, region, desc string, sourceRanges nets return nil } -func (gce *GCECloud) firewallObject(name, region, desc string, sourceRanges netsets.IPNet, ports []*api.ServicePort, hosts []*gceInstance) (*compute.Firewall, error) { +func (gce *GCECloud) firewallObject(name, region, desc string, sourceRanges netsets.IPNet, ports []api.ServicePort, hosts []*gceInstance) (*compute.Firewall, error) { allowedPorts := make([]string, len(ports)) for ix := range ports { allowedPorts[ix] = strconv.Itoa(ports[ix].Port) @@ -1053,13 +1059,14 @@ func (gce *GCECloud) ensureStaticIP(name, serviceName, region, existingIP string } // UpdateLoadBalancer is an implementation of LoadBalancer.UpdateLoadBalancer. -func (gce *GCECloud) UpdateLoadBalancer(name, region string, hostNames []string) error { +func (gce *GCECloud) UpdateLoadBalancer(service *api.Service, hostNames []string) error { hosts, err := gce.getInstancesByNames(hostNames) if err != nil { return err } - pool, err := gce.service.TargetPools.Get(gce.projectID, region, name).Do() + loadBalancerName := cloudprovider.GetLoadBalancerName(service) + pool, err := gce.service.TargetPools.Get(gce.projectID, gce.region, loadBalancerName).Do() if err != nil { return err } @@ -1083,22 +1090,22 @@ func (gce *GCECloud) UpdateLoadBalancer(name, region string, hostNames []string) if len(toAdd) > 0 { add := &compute.TargetPoolsAddInstanceRequest{Instances: toAdd} - op, err := gce.service.TargetPools.AddInstance(gce.projectID, region, name, add).Do() + op, err := gce.service.TargetPools.AddInstance(gce.projectID, gce.region, loadBalancerName, add).Do() if err != nil { return err } - if err := gce.waitForRegionOp(op, region); err != nil { + if err := gce.waitForRegionOp(op, gce.region); err != nil { return err } } if len(toRemove) > 0 { rm := &compute.TargetPoolsRemoveInstanceRequest{Instances: toRemove} - op, err := gce.service.TargetPools.RemoveInstance(gce.projectID, region, name, rm).Do() + op, err := gce.service.TargetPools.RemoveInstance(gce.projectID, gce.region, loadBalancerName, rm).Do() if err != nil { return err } - if err := gce.waitForRegionOp(op, region); err != nil { + if err := gce.waitForRegionOp(op, gce.region); err != nil { return err } } @@ -1106,41 +1113,44 @@ func (gce *GCECloud) UpdateLoadBalancer(name, region string, hostNames []string) // Try to verify that the correct number of nodes are now in the target pool. // We've been bitten by a bug here before (#11327) where all nodes were // accidentally removed and want to make similar problems easier to notice. - updatedPool, err := gce.service.TargetPools.Get(gce.projectID, region, name).Do() + updatedPool, err := gce.service.TargetPools.Get(gce.projectID, gce.region, loadBalancerName).Do() if err != nil { return err } if len(updatedPool.Instances) != len(hosts) { glog.Errorf("Unexpected number of instances (%d) in target pool %s after updating (expected %d). Instances in updated pool: %s", - len(updatedPool.Instances), name, len(hosts), strings.Join(updatedPool.Instances, ",")) - return fmt.Errorf("Unexpected number of instances (%d) in target pool %s after update (expected %d)", len(updatedPool.Instances), name, len(hosts)) + len(updatedPool.Instances), loadBalancerName, len(hosts), strings.Join(updatedPool.Instances, ",")) + return fmt.Errorf("Unexpected number of instances (%d) in target pool %s after update (expected %d)", len(updatedPool.Instances), loadBalancerName, len(hosts)) } return nil } // EnsureLoadBalancerDeleted is an implementation of LoadBalancer.EnsureLoadBalancerDeleted. -func (gce *GCECloud) EnsureLoadBalancerDeleted(name, region string) error { - glog.V(2).Infof("EnsureLoadBalancerDeleted(%v, %v", name, region) - err := utilerrors.AggregateGoroutines( - func() error { return gce.deleteFirewall(name, region) }, +func (gce *GCECloud) EnsureLoadBalancerDeleted(service *api.Service) error { + loadBalancerName := cloudprovider.GetLoadBalancerName(service) + glog.V(2).Infof("EnsureLoadBalancerDeleted(%v, %v, %v, %v)", service.Namespace, service.Name, loadBalancerName, + gce.region) + + errs := utilerrors.AggregateGoroutines( + func() error { return gce.deleteFirewall(loadBalancerName, gce.region) }, // Even though we don't hold on to static IPs for load balancers, it's // possible that EnsureLoadBalancer left one around in a failed // creation/update attempt, so make sure we clean it up here just in case. - func() error { return gce.deleteStaticIP(name, region) }, + func() error { return gce.deleteStaticIP(loadBalancerName, gce.region) }, func() error { // The forwarding rule must be deleted before either the target pool can, // unfortunately, so we have to do these two serially. - if err := gce.deleteForwardingRule(name, region); err != nil { + if err := gce.deleteForwardingRule(loadBalancerName, gce.region); err != nil { return err } - if err := gce.deleteTargetPool(name, region); err != nil { + if err := gce.deleteTargetPool(loadBalancerName, gce.region); err != nil { return err } return nil }, ) - if err != nil { - return utilerrors.Flatten(err) + if errs != nil { + return utilerrors.Flatten(errs) } return nil } @@ -1226,9 +1236,9 @@ func (gce *GCECloud) CreateFirewall(name, desc string, sourceRanges netsets.IPNe } // TODO: This completely breaks modularity in the cloudprovider but the methods // shared with the TCPLoadBalancer take api.ServicePorts. - svcPorts := []*api.ServicePort{} + svcPorts := []api.ServicePort{} for _, p := range ports { - svcPorts = append(svcPorts, &api.ServicePort{Port: int(p)}) + svcPorts = append(svcPorts, api.ServicePort{Port: int(p)}) } hosts, err := gce.getInstancesByNames(hostNames) if err != nil { @@ -1255,9 +1265,9 @@ func (gce *GCECloud) UpdateFirewall(name, desc string, sourceRanges netsets.IPNe } // TODO: This completely breaks modularity in the cloudprovider but the methods // shared with the TCPLoadBalancer take api.ServicePorts. - svcPorts := []*api.ServicePort{} + svcPorts := []api.ServicePort{} for _, p := range ports { - svcPorts = append(svcPorts, &api.ServicePort{Port: int(p)}) + svcPorts = append(svcPorts, api.ServicePort{Port: int(p)}) } hosts, err := gce.getInstancesByNames(hostNames) if err != nil { diff --git a/pkg/cloudprovider/providers/openstack/openstack.go b/pkg/cloudprovider/providers/openstack/openstack.go index b791e8d59fb..0fe53fcdda9 100644 --- a/pkg/cloudprovider/providers/openstack/openstack.go +++ b/pkg/cloudprovider/providers/openstack/openstack.go @@ -22,7 +22,6 @@ import ( "fmt" "io" "io/ioutil" - "net" "net/http" "regexp" "strings" @@ -47,7 +46,6 @@ import ( "k8s.io/kubernetes/pkg/api/resource" "k8s.io/kubernetes/pkg/api/service" "k8s.io/kubernetes/pkg/cloudprovider" - "k8s.io/kubernetes/pkg/types" ) const ProviderName = "openstack" @@ -641,8 +639,9 @@ func getFloatingIPByPortID(client *gophercloud.ServiceClient, portID string) (*f return &floatingIPList[0], nil } -func (lb *LoadBalancer) GetLoadBalancer(name, region string) (*api.LoadBalancerStatus, bool, error) { - vip, err := getVipByName(lb.network, name) +func (lb *LoadBalancer) GetLoadBalancer(service *api.Service) (*api.LoadBalancerStatus, bool, error) { + loadBalancerName := cloudprovider.GetLoadBalancerName(service) + vip, err := getVipByName(lb.network, loadBalancerName) if err == ErrNotFound { return nil, false, nil } @@ -661,9 +660,10 @@ func (lb *LoadBalancer) GetLoadBalancer(name, region string) (*api.LoadBalancerS // a list of regions (from config) and query/create loadbalancers in // each region. -func (lb *LoadBalancer) EnsureLoadBalancer(name, region string, loadBalancerIP net.IP, ports []*api.ServicePort, hosts []string, serviceName types.NamespacedName, affinity api.ServiceAffinity, annotations map[string]string) (*api.LoadBalancerStatus, error) { - glog.V(4).Infof("EnsureLoadBalancer(%v, %v, %v, %v, %v, %v, %v)", name, region, loadBalancerIP, ports, hosts, serviceName, annotations) +func (lb *LoadBalancer) EnsureLoadBalancer(apiService *api.Service, hosts []string, annotations map[string]string) (*api.LoadBalancerStatus, error) { + glog.V(4).Infof("EnsureLoadBalancer(%v, %v, %v, %v, %v, %v)", apiService.Namespace, apiService.Name, apiService.Spec.LoadBalancerIP, apiService.Spec.Ports, hosts, annotations) + ports := apiService.Spec.Ports if len(ports) > 1 { return nil, fmt.Errorf("multiple ports are not yet supported in openstack load balancers") } else if len(ports) == 0 { @@ -676,6 +676,7 @@ func (lb *LoadBalancer) EnsureLoadBalancer(name, region string, loadBalancerIP n return nil, fmt.Errorf("Only TCP LoadBalancer is supported for openstack load balancers") } + affinity := apiService.Spec.SessionAffinity var persistence *vips.SessionPersistence switch affinity { case api.ServiceAffinityNone: @@ -695,8 +696,8 @@ func (lb *LoadBalancer) EnsureLoadBalancer(name, region string, loadBalancerIP n return nil, fmt.Errorf("Source range restrictions are not supported for openstack load balancers") } - glog.V(2).Infof("Checking if openstack load balancer already exists: %s", name) - _, exists, err := lb.GetLoadBalancer(name, region) + glog.V(2).Infof("Checking if openstack load balancer already exists: %s", cloudprovider.GetLoadBalancerName(apiService)) + _, exists, err := lb.GetLoadBalancer(apiService) if err != nil { return nil, fmt.Errorf("error checking if openstack load balancer already exists: %v", err) } @@ -704,7 +705,7 @@ func (lb *LoadBalancer) EnsureLoadBalancer(name, region string, loadBalancerIP n // TODO: Implement a more efficient update strategy for common changes than delete & create // In particular, if we implement hosts update, we can get rid of UpdateHosts if exists { - err := lb.EnsureLoadBalancerDeleted(name, region) + err := lb.EnsureLoadBalancerDeleted(apiService) if err != nil { return nil, fmt.Errorf("error deleting existing openstack load balancer: %v", err) } @@ -714,6 +715,7 @@ func (lb *LoadBalancer) EnsureLoadBalancer(name, region string, loadBalancerIP n if lbmethod == "" { lbmethod = pools.LBMethodRoundRobin } + name := cloudprovider.GetLoadBalancerName(apiService) pool, err := pools.Create(lb.network, pools.CreateOpts{ Name: name, Protocol: pools.ProtocolTCP, @@ -771,8 +773,10 @@ func (lb *LoadBalancer) EnsureLoadBalancer(name, region string, loadBalancerIP n SubnetID: lb.opts.SubnetId, Persistence: persistence, } - if loadBalancerIP != nil { - createOpts.Address = loadBalancerIP.String() + + loadBalancerIP := apiService.Spec.LoadBalancerIP + if loadBalancerIP != "" { + createOpts.Address = loadBalancerIP } vip, err := vips.Create(lb.network, createOpts).Extract() @@ -805,10 +809,11 @@ func (lb *LoadBalancer) EnsureLoadBalancer(name, region string, loadBalancerIP n } -func (lb *LoadBalancer) UpdateLoadBalancer(name, region string, hosts []string) error { - glog.V(4).Infof("UpdateLoadBalancer(%v, %v, %v)", name, region, hosts) +func (lb *LoadBalancer) UpdateLoadBalancer(service *api.Service, hosts []string) error { + loadBalancerName := cloudprovider.GetLoadBalancerName(service) + glog.V(4).Infof("UpdateLoadBalancer(%v, %v)", loadBalancerName, hosts) - vip, err := getVipByName(lb.network, name) + vip, err := getVipByName(lb.network, loadBalancerName) if err != nil { return err } @@ -866,10 +871,11 @@ func (lb *LoadBalancer) UpdateLoadBalancer(name, region string, hosts []string) return nil } -func (lb *LoadBalancer) EnsureLoadBalancerDeleted(name, region string) error { - glog.V(4).Infof("EnsureLoadBalancerDeleted(%v, %v)", name, region) +func (lb *LoadBalancer) EnsureLoadBalancerDeleted(service *api.Service) error { + loadBalancerName := cloudprovider.GetLoadBalancerName(service) + glog.V(4).Infof("EnsureLoadBalancerDeleted(%v)", loadBalancerName) - vip, err := getVipByName(lb.network, name) + vip, err := getVipByName(lb.network, loadBalancerName) if err != nil && err != ErrNotFound { return err } @@ -907,7 +913,7 @@ func (lb *LoadBalancer) EnsureLoadBalancerDeleted(name, region string) error { // still exists that we failed to delete on some // previous occasion. Make a best effort attempt to // cleanup any pools with the same name as the VIP. - pool, err = getPoolByName(lb.network, name) + pool, err = getPoolByName(lb.network, service.Name) if err != nil && err != ErrNotFound { return err } diff --git a/pkg/cloudprovider/providers/openstack/openstack_test.go b/pkg/cloudprovider/providers/openstack/openstack_test.go index a4db2fafb3c..84ff57dd831 100644 --- a/pkg/cloudprovider/providers/openstack/openstack_test.go +++ b/pkg/cloudprovider/providers/openstack/openstack_test.go @@ -25,6 +25,7 @@ import ( "k8s.io/kubernetes/pkg/util/rand" "github.com/rackspace/gophercloud" + "k8s.io/kubernetes/pkg/api" ) func TestReadConfig(t *testing.T) { @@ -169,7 +170,7 @@ func TestLoadBalancer(t *testing.T) { t.Fatalf("LoadBalancer() returned false - perhaps your stack doesn't support Neutron?") } - _, exists, err := lb.GetLoadBalancer("noexist", "region") + _, exists, err := lb.GetLoadBalancer(&api.Service{ObjectMeta: api.ObjectMeta{Name: "noexist"}}) if err != nil { t.Fatalf("GetLoadBalancer(\"noexist\") returned error: %s", err) } diff --git a/pkg/controller/service/servicecontroller.go b/pkg/controller/service/servicecontroller.go index e8135247bd9..e6ace262274 100644 --- a/pkg/controller/service/servicecontroller.go +++ b/pkg/controller/service/servicecontroller.go @@ -18,7 +18,6 @@ package service import ( "fmt" - "net" "sort" "sync" "time" @@ -31,7 +30,7 @@ import ( "k8s.io/kubernetes/pkg/client/cache" clientset "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset" "k8s.io/kubernetes/pkg/client/record" - unversionedcore "k8s.io/kubernetes/pkg/client/typed/generated/core/unversioned" + unversioned_core "k8s.io/kubernetes/pkg/client/typed/generated/core/unversioned" "k8s.io/kubernetes/pkg/cloudprovider" "k8s.io/kubernetes/pkg/fields" "k8s.io/kubernetes/pkg/types" @@ -90,7 +89,7 @@ type ServiceController struct { // (like load balancers) in sync with the registry. func New(cloud cloudprovider.Interface, kubeClient clientset.Interface, clusterName string) *ServiceController { broadcaster := record.NewBroadcaster() - broadcaster.StartRecordingToSink(&unversionedcore.EventSinkImpl{kubeClient.Core().Events("")}) + broadcaster.StartRecordingToSink(&unversioned_core.EventSinkImpl{kubeClient.Core().Events("")}) recorder := broadcaster.NewRecorder(api.EventSource{Component: "service-controller"}) return &ServiceController{ @@ -251,7 +250,7 @@ func (s *ServiceController) processDelta(delta *cache.Delta) (error, time.Durati } else if errors.IsNotFound(err) { glog.V(2).Infof("Service %v not found, ensuring load balancer is deleted", namespacedName) s.eventRecorder.Event(service, api.EventTypeNormal, "DeletingLoadBalancer", "Deleting load balancer") - err := s.balancer.EnsureLoadBalancerDeleted(s.loadBalancerName(deltaService), s.zone.Region) + err := s.balancer.EnsureLoadBalancerDeleted(deltaService) if err != nil { message := "Error deleting load balancer (will retry): " + err.Error() s.eventRecorder.Event(deltaService, api.EventTypeWarning, "DeletingLoadBalancerFailed", message) @@ -315,7 +314,7 @@ func (s *ServiceController) createLoadBalancerIfNeeded(namespacedName types.Name // If we don't have any cached memory of the load balancer, we have to ask // the cloud provider for what it knows about it. // Technically EnsureLoadBalancerDeleted can cope, but we want to post meaningful events - _, exists, err := s.balancer.GetLoadBalancer(s.loadBalancerName(service), s.zone.Region) + _, exists, err := s.balancer.GetLoadBalancer(service) if err != nil { return fmt.Errorf("Error getting LB for service %s: %v", namespacedName, err), retryable } @@ -327,7 +326,7 @@ func (s *ServiceController) createLoadBalancerIfNeeded(namespacedName types.Name if needDelete { glog.Infof("Deleting existing load balancer for service %s that no longer needs a load balancer.", namespacedName) s.eventRecorder.Event(service, api.EventTypeNormal, "DeletingLoadBalancer", "Deleting load balancer") - if err := s.balancer.EnsureLoadBalancerDeleted(s.loadBalancerName(service), s.zone.Region); err != nil { + if err := s.balancer.EnsureLoadBalancerDeleted(service); err != nil { return err, retryable } s.eventRecorder.Event(service, api.EventTypeNormal, "DeletedLoadBalancer", "Deleted load balancer") @@ -341,8 +340,7 @@ func (s *ServiceController) createLoadBalancerIfNeeded(namespacedName types.Name // The load balancer doesn't exist yet, so create it. s.eventRecorder.Event(service, api.EventTypeNormal, "CreatingLoadBalancer", "Creating load balancer") - - err := s.createLoadBalancer(service, namespacedName) + err := s.createLoadBalancer(service) if err != nil { return fmt.Errorf("Failed to create load balancer for service %s: %v", namespacedName, err), retryable } @@ -392,21 +390,16 @@ func (s *ServiceController) persistUpdate(service *api.Service) error { return err } -func (s *ServiceController) createLoadBalancer(service *api.Service, serviceName types.NamespacedName) error { - ports, err := getPortsForLB(service) - if err != nil { - return err - } +func (s *ServiceController) createLoadBalancer(service *api.Service) error { nodes, err := s.nodeLister.List() if err != nil { return err } - name := s.loadBalancerName(service) + // - Only one protocol supported per service // - Not all cloud providers support all protocols and the next step is expected to return // an error for unsupported protocols - status, err := s.balancer.EnsureLoadBalancer(name, s.zone.Region, net.ParseIP(service.Spec.LoadBalancerIP), - ports, hostsFromNodeList(&nodes), serviceName, service.Spec.SessionAffinity, service.ObjectMeta.Annotations) + status, err := s.balancer.EnsureLoadBalancer(service, hostsFromNodeList(&nodes), service.ObjectMeta.Annotations) if err != nil { return err } else { @@ -727,16 +720,15 @@ func (s *ServiceController) lockedUpdateLoadBalancerHosts(service *api.Service, } // This operation doesn't normally take very long (and happens pretty often), so we only record the final event - name := cloudprovider.GetLoadBalancerName(service) - err := s.balancer.UpdateLoadBalancer(name, s.zone.Region, hosts) + err := s.balancer.UpdateLoadBalancer(service, hosts) if err == nil { s.eventRecorder.Event(service, api.EventTypeNormal, "UpdatedLoadBalancer", "Updated load balancer with new hosts") return nil } // It's only an actual error if the load balancer still exists. - if _, exists, err := s.balancer.GetLoadBalancer(name, s.zone.Region); err != nil { - glog.Errorf("External error while checking if load balancer %q exists: name, %v", name, err) + if _, exists, err := s.balancer.GetLoadBalancer(service); err != nil { + glog.Errorf("External error while checking if load balancer %q exists: name, %v", cloudprovider.GetLoadBalancerName(service), err) } else if !exists { return nil } diff --git a/pkg/controller/service/servicecontroller_test.go b/pkg/controller/service/servicecontroller_test.go index 8c8bdffa5ca..a696be1c50f 100644 --- a/pkg/controller/service/servicecontroller_test.go +++ b/pkg/controller/service/servicecontroller_test.go @@ -166,7 +166,7 @@ func TestUpdateNodesInExternalLoadBalancer(t *testing.T) { newService("s0", "333", api.ServiceTypeLoadBalancer), }, expectedUpdateCalls: []fakecloud.FakeUpdateBalancerCall{ - {Name: "a333", Region: region, Hosts: []string{"node0", "node1", "node73"}}, + {newService("s0", "333", api.ServiceTypeLoadBalancer), hosts}, }, }, { @@ -177,9 +177,9 @@ func TestUpdateNodesInExternalLoadBalancer(t *testing.T) { newService("s2", "666", api.ServiceTypeLoadBalancer), }, expectedUpdateCalls: []fakecloud.FakeUpdateBalancerCall{ - {Name: "a444", Region: region, Hosts: []string{"node0", "node1", "node73"}}, - {Name: "a555", Region: region, Hosts: []string{"node0", "node1", "node73"}}, - {Name: "a666", Region: region, Hosts: []string{"node0", "node1", "node73"}}, + {newService("s0", "444", api.ServiceTypeLoadBalancer), hosts}, + {newService("s1", "555", api.ServiceTypeLoadBalancer), hosts}, + {newService("s2", "666", api.ServiceTypeLoadBalancer), hosts}, }, }, { @@ -191,8 +191,8 @@ func TestUpdateNodesInExternalLoadBalancer(t *testing.T) { newService("s4", "123", api.ServiceTypeClusterIP), }, expectedUpdateCalls: []fakecloud.FakeUpdateBalancerCall{ - {Name: "a888", Region: region, Hosts: []string{"node0", "node1", "node73"}}, - {Name: "a999", Region: region, Hosts: []string{"node0", "node1", "node73"}}, + {newService("s1", "888", api.ServiceTypeLoadBalancer), hosts}, + {newService("s3", "999", api.ServiceTypeLoadBalancer), hosts}, }, }, { @@ -202,7 +202,7 @@ func TestUpdateNodesInExternalLoadBalancer(t *testing.T) { nil, }, expectedUpdateCalls: []fakecloud.FakeUpdateBalancerCall{ - {Name: "a234", Region: region, Hosts: []string{"node0", "node1", "node73"}}, + {newService("s0", "234", api.ServiceTypeLoadBalancer), hosts}, }, }, }