Merge pull request #6271 from a-robinson/asynclb

Manage load balancer creation and deletion asynchronously in a ServiceController
This commit is contained in:
Quinton Hoole 2015-04-14 17:50:07 -07:00
commit cd6daae014
20 changed files with 791 additions and 307 deletions

View File

@ -6,12 +6,16 @@
{% set master="--master=127.0.0.1:8080" -%}
{% set machines = ""-%}
{% set cluster_name = "" -%}
{% set minion_regexp = "--minion_regexp=.*" -%}
{% set sync_nodes = "--sync_nodes=true" -%}
{% if pillar['node_instance_prefix'] is defined -%}
{% set minion_regexp = "--minion_regexp='" + pillar['node_instance_prefix'] + ".*'" -%}
{% endif -%}
{% if pillar['instance_prefix'] is defined -%}
{% set cluster_name = "--cluster_name=" + pillar['instance_prefix'] -%}
{% endif -%}
{% set cloud_provider = "" -%}
{% set cloud_config = "" -%}
@ -51,4 +55,4 @@
{% endif -%} # grains.cloud is defined
DAEMON_ARGS="{{daemon_args}} {{master}} {{machines}} {{ minion_regexp }} {{ cloud_provider }} {{ sync_nodes }} {{ cloud_config }} {{pillar['log_level']}}"
DAEMON_ARGS="{{daemon_args}} {{master}} {{machines}} {{cluster_name}} {{ minion_regexp }} {{ cloud_provider }} {{ sync_nodes }} {{ cloud_config }} {{pillar['log_level']}}"

View File

@ -41,7 +41,7 @@ import (
"github.com/GoogleCloudPlatform/kubernetes/pkg/apiserver"
"github.com/GoogleCloudPlatform/kubernetes/pkg/client"
"github.com/GoogleCloudPlatform/kubernetes/pkg/client/record"
nodeControllerPkg "github.com/GoogleCloudPlatform/kubernetes/pkg/cloudprovider/controller"
"github.com/GoogleCloudPlatform/kubernetes/pkg/cloudprovider/nodecontroller"
replicationControllerPkg "github.com/GoogleCloudPlatform/kubernetes/pkg/controller"
"github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet"
"github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/cadvisor"
@ -221,7 +221,7 @@ func startComponents(firstManifestURL, secondManifestURL, apiVersion string) (st
api.ResourceName(api.ResourceMemory): resource.MustParse("10G"),
}}
nodeController := nodeControllerPkg.NewNodeController(nil, "", machineList, nodeResources, cl, 10, 5*time.Minute, util.NewFakeRateLimiter(), 40*time.Second, 60*time.Second, 5*time.Second, "")
nodeController := nodecontroller.NewNodeController(nil, "", machineList, nodeResources, cl, 10, 5*time.Minute, util.NewFakeRateLimiter(), 40*time.Second, 60*time.Second, 5*time.Second, "")
nodeController.Run(5*time.Second, true)
cadvisorInterface := new(cadvisor.Fake)

View File

@ -272,7 +272,6 @@ func (s *APIServer) Run(_ []string) error {
}
config := &master.Config{
Cloud: cloud,
EtcdHelper: helper,
EventTTL: s.EventTTL,
KubeletClient: kubeletClient,

View File

@ -30,7 +30,8 @@ import (
"github.com/GoogleCloudPlatform/kubernetes/pkg/api/resource"
"github.com/GoogleCloudPlatform/kubernetes/pkg/client"
"github.com/GoogleCloudPlatform/kubernetes/pkg/cloudprovider"
nodeControllerPkg "github.com/GoogleCloudPlatform/kubernetes/pkg/cloudprovider/controller"
"github.com/GoogleCloudPlatform/kubernetes/pkg/cloudprovider/nodecontroller"
"github.com/GoogleCloudPlatform/kubernetes/pkg/cloudprovider/servicecontroller"
replicationControllerPkg "github.com/GoogleCloudPlatform/kubernetes/pkg/controller"
"github.com/GoogleCloudPlatform/kubernetes/pkg/master/ports"
"github.com/GoogleCloudPlatform/kubernetes/pkg/namespace"
@ -187,11 +188,16 @@ func (s *CMServer) Run(_ []string) error {
glog.Warning("DEPRECATION NOTICE: sync_node_status flag is being deprecated. It has no effect now and it will be removed in a future version.")
}
nodeController := nodeControllerPkg.NewNodeController(cloud, s.MinionRegexp, s.MachineList, nodeResources,
nodeController := nodecontroller.NewNodeController(cloud, s.MinionRegexp, s.MachineList, nodeResources,
kubeClient, s.RegisterRetryCount, s.PodEvictionTimeout, util.NewTokenBucketRateLimiter(s.DeletingPodsQps, s.DeletingPodsBurst),
s.NodeMonitorGracePeriod, s.NodeStartupGracePeriod, s.NodeMonitorPeriod, s.ClusterName)
nodeController.Run(s.NodeSyncPeriod, s.SyncNodeList)
serviceController := servicecontroller.New(cloud, kubeClient, s.ClusterName)
if err := serviceController.Run(); err != nil {
glog.Errorf("Failed to start service controller: %v", err)
}
resourceQuotaManager := resourcequota.NewResourceQuotaManager(kubeClient)
resourceQuotaManager.Run(s.ResourceQuotaSyncPeriod)

View File

@ -34,7 +34,8 @@ import (
"github.com/GoogleCloudPlatform/kubernetes/pkg/api/testapi"
"github.com/GoogleCloudPlatform/kubernetes/pkg/apiserver"
"github.com/GoogleCloudPlatform/kubernetes/pkg/client"
nodeControllerPkg "github.com/GoogleCloudPlatform/kubernetes/pkg/cloudprovider/controller"
"github.com/GoogleCloudPlatform/kubernetes/pkg/cloudprovider/nodecontroller"
"github.com/GoogleCloudPlatform/kubernetes/pkg/cloudprovider/servicecontroller"
"github.com/GoogleCloudPlatform/kubernetes/pkg/controller"
"github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/cadvisor"
"github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/dockertools"
@ -128,10 +129,15 @@ func runControllerManager(machineList []string, cl *client.Client, nodeMilliCPU,
},
}
nodeController := nodeControllerPkg.NewNodeController(
nodeController := nodecontroller.NewNodeController(
nil, "", machineList, nodeResources, cl, 10, 5*time.Minute, util.NewTokenBucketRateLimiter(*deletingPodsQps, *deletingPodsBurst), 40*time.Second, 60*time.Second, 5*time.Second, "")
nodeController.Run(10*time.Second, true)
serviceController := servicecontroller.New(nil, cl, "kubernetes")
if err := serviceController.Run(); err != nil {
glog.Warningf("Running without a service controller: %v", err)
}
endpoints := service.NewEndpointController(cl)
go util.Forever(func() { endpoints.SyncServiceEndpoints() }, time.Second*10)

View File

@ -927,6 +927,14 @@ func ValidateService(service *api.Service) errs.ValidationErrorList {
}
}
if service.Spec.CreateExternalLoadBalancer {
for i := range service.Spec.Ports {
if service.Spec.Ports[i].Protocol != api.ProtocolTCP {
allErrs = append(allErrs, errs.NewFieldInvalid("spec.ports", service.Spec.Ports[i], "cannot create an external load balancer with non-TCP ports"))
}
}
}
return allErrs
}

View File

@ -1583,6 +1583,22 @@ func TestValidateService(t *testing.T) {
},
numErrs: 1,
},
{
name: "invalid load balancer protocol 1",
tweakSvc: func(s *api.Service) {
s.Spec.CreateExternalLoadBalancer = true
s.Spec.Ports[0].Protocol = "UDP"
},
numErrs: 1,
},
{
name: "invalid load balancer protocol 2",
tweakSvc: func(s *api.Service) {
s.Spec.CreateExternalLoadBalancer = true
s.Spec.Ports = append(s.Spec.Ports, api.ServicePort{Name: "p", Port: 12345, Protocol: "UDP"})
},
numErrs: 1,
},
{
name: "valid 1",
tweakSvc: func(s *api.Service) {
@ -1620,6 +1636,21 @@ func TestValidateService(t *testing.T) {
},
numErrs: 0,
},
{
name: "valid external load balancer",
tweakSvc: func(s *api.Service) {
s.Spec.CreateExternalLoadBalancer = true
},
numErrs: 0,
},
{
name: "valid external load balancer 2 ports",
tweakSvc: func(s *api.Service) {
s.Spec.CreateExternalLoadBalancer = true
s.Spec.Ports = append(s.Spec.Ports, api.ServicePort{Name: "p", Port: 12345, Protocol: "TCP"})
},
numErrs: 0,
},
}
for _, tc := range testCases {

View File

@ -42,6 +42,8 @@ type Clusters interface {
Master(clusterName string) (string, error)
}
// TODO(#6812): Use a shorter name that's less likely to be longer than cloud
// providers' name length limits.
func GetLoadBalancerName(clusterName, serviceNamespace, serviceName string) string {
return clusterName + "-" + serviceNamespace + "-" + serviceName
}
@ -50,6 +52,8 @@ func GetLoadBalancerName(clusterName, serviceNamespace, serviceName string) stri
type TCPLoadBalancer interface {
// TCPLoadBalancerExists returns whether the specified load balancer exists.
// TODO: Break this up into different interfaces (LB, etc) when we have more than one type of service
// TODO: This should really return the details of the load balancer so we can
// determine if it matches the needs of a service rather than if it exists.
TCPLoadBalancerExists(name, region string) (bool, error)
// CreateTCPLoadBalancer creates a new tcp load balancer. Returns the IP address or hostname of the balancer
CreateTCPLoadBalancer(name, region string, externalIP net.IP, ports []int, hosts []string, affinityType api.AffinityType) (string, error)

View File

@ -17,7 +17,6 @@ limitations under the License.
package gce_cloud
import (
"errors"
"fmt"
"io"
"io/ioutil"
@ -37,6 +36,7 @@ import (
"code.google.com/p/gcfg"
compute "code.google.com/p/google-api-go-client/compute/v1"
container "code.google.com/p/google-api-go-client/container/v1beta1"
"code.google.com/p/google-api-go-client/googleapi"
"github.com/golang/glog"
"golang.org/x/oauth2"
"golang.org/x/oauth2/google"
@ -196,7 +196,7 @@ const (
GCEAffinityTypeClientIPProto GCEAffinityType = "CLIENT_IP_PROTO"
)
func (gce *GCECloud) makeTargetPool(name, region string, hosts []string, affinityType GCEAffinityType) (string, error) {
func (gce *GCECloud) makeTargetPool(name, region string, hosts []string, affinityType GCEAffinityType) error {
var instances []string
for _, host := range hosts {
instances = append(instances, makeHostLink(gce.projectID, gce.zone, host))
@ -208,13 +208,16 @@ func (gce *GCECloud) makeTargetPool(name, region string, hosts []string, affinit
}
op, err := gce.service.TargetPools.Insert(gce.projectID, region, pool).Do()
if err != nil {
return "", err
return err
}
if err = gce.waitForRegionOp(op, region); err != nil {
return "", err
return err
}
link := fmt.Sprintf("https://www.googleapis.com/compute/v1/projects/%s/regions/%s/targetPools/%s", gce.projectID, region, name)
return link, nil
return nil
}
func (gce *GCECloud) targetPoolURL(name, region string) string {
return fmt.Sprintf("https://www.googleapis.com/compute/v1/projects/%s/regions/%s/targetPools/%s", gce.projectID, region, name)
}
func (gce *GCECloud) waitForRegionOp(op *compute.Operation, region string) error {
@ -228,7 +231,10 @@ func (gce *GCECloud) waitForRegionOp(op *compute.Operation, region string) error
}
}
if pollOp.Error != nil && len(pollOp.Error.Errors) > 0 {
return errors.New(pollOp.Error.Errors[0].Message)
return &googleapi.Error{
Code: int(pollOp.HttpErrorStatusCode),
Message: pollOp.Error.Errors[0].Message,
}
}
return nil
}
@ -236,10 +242,21 @@ func (gce *GCECloud) waitForRegionOp(op *compute.Operation, region string) error
// TCPLoadBalancerExists is an implementation of TCPLoadBalancer.TCPLoadBalancerExists.
func (gce *GCECloud) TCPLoadBalancerExists(name, region string) (bool, error) {
_, err := gce.service.ForwardingRules.Get(gce.projectID, region, name).Do()
if err == nil {
return true, nil
}
if isHTTPErrorCode(err, http.StatusNotFound) {
return false, nil
}
return false, err
}
//translate from what K8s supports to what the cloud provider supports for session affinity.
func isHTTPErrorCode(err error, code int) bool {
apiErr, ok := err.(*googleapi.Error)
return ok && apiErr.Code == code
}
// translate from what K8s supports to what the cloud provider supports for session affinity.
func translateAffinityType(affinityType api.AffinityType) GCEAffinityType {
switch affinityType {
case api.AffinityTypeClientIP:
@ -253,10 +270,15 @@ func translateAffinityType(affinityType api.AffinityType) GCEAffinityType {
}
// CreateTCPLoadBalancer is an implementation of TCPLoadBalancer.CreateTCPLoadBalancer.
// TODO(a-robinson): Don't just ignore specified IP addresses. Check if they're
// owned by the project and available to be used, and use them if they are.
func (gce *GCECloud) CreateTCPLoadBalancer(name, region string, externalIP net.IP, ports []int, hosts []string, affinityType api.AffinityType) (string, error) {
pool, err := gce.makeTargetPool(name, region, hosts, translateAffinityType(affinityType))
err := gce.makeTargetPool(name, region, hosts, translateAffinityType(affinityType))
if err != nil {
return "", err
if !isHTTPErrorCode(err, http.StatusConflict) {
return "", err
}
glog.Infof("Creating forwarding rule pointing at target pool that already exists: %v", err)
}
if len(ports) == 0 {
@ -276,18 +298,17 @@ func (gce *GCECloud) CreateTCPLoadBalancer(name, region string, externalIP net.I
Name: name,
IPProtocol: "TCP",
PortRange: fmt.Sprintf("%d-%d", minPort, maxPort),
Target: pool,
}
if len(externalIP) > 0 {
req.IPAddress = externalIP.String()
Target: gce.targetPoolURL(name, region),
}
op, err := gce.service.ForwardingRules.Insert(gce.projectID, region, req).Do()
if err != nil {
if err != nil && !isHTTPErrorCode(err, http.StatusConflict) {
return "", err
}
err = gce.waitForRegionOp(op, region)
if err != nil {
return "", err
if op != nil {
err = gce.waitForRegionOp(op, region)
if err != nil && !isHTTPErrorCode(err, http.StatusConflict) {
return "", err
}
}
fwd, err := gce.service.ForwardingRules.Get(gce.projectID, region, name).Do()
if err != nil {
@ -343,23 +364,28 @@ func (gce *GCECloud) UpdateTCPLoadBalancer(name, region string, hosts []string)
// DeleteTCPLoadBalancer is an implementation of TCPLoadBalancer.DeleteTCPLoadBalancer.
func (gce *GCECloud) DeleteTCPLoadBalancer(name, region string) error {
op, err := gce.service.ForwardingRules.Delete(gce.projectID, region, name).Do()
if err != nil {
glog.Warningln("Failed to delete Forwarding Rules %s: got error %s. Trying to delete Target Pool", name, err.Error())
if err != nil && isHTTPErrorCode(err, http.StatusNotFound) {
glog.Infof("Forwarding rule %s already deleted. Continuing to delete target pool.", name)
} else if err != nil {
glog.Warningf("Failed to delete Forwarding Rules %s: got error %s.", name, err.Error())
return err
} else {
err = gce.waitForRegionOp(op, region)
if err != nil {
glog.Warningln("Failed waiting for Forwarding Rule %s to be deleted: got error %s. Trying to delete Target Pool", name, err.Error())
glog.Warningf("Failed waiting for Forwarding Rule %s to be deleted: got error %s.", name, err.Error())
}
}
op, err = gce.service.TargetPools.Delete(gce.projectID, region, name).Do()
if err != nil {
glog.Warningln("Failed to delete Target Pool %s, got error %s.", name, err.Error())
if err != nil && isHTTPErrorCode(err, http.StatusNotFound) {
glog.Infof("Target pool %s already deleted.", name)
return nil
} else if err != nil {
glog.Warningf("Failed to delete Target Pool %s, got error %s.", name, err.Error())
return err
}
err = gce.waitForRegionOp(op, region)
if err != nil {
glog.Warningln("Failed waiting for Target Pool %s to be deleted: got error %s.", name, err.Error())
glog.Warningf("Failed waiting for Target Pool %s to be deleted: got error %s.", name, err.Error())
}
return err
}

View File

@ -14,6 +14,6 @@ See the License for the specific language governing permissions and
limitations under the License.
*/
// Package controller contains code for syncing cloud instances with
// Package nodecontroller contains code for syncing cloud instances with
// minion registry
package controller
package nodecontroller

View File

@ -14,7 +14,7 @@ See the License for the specific language governing permissions and
limitations under the License.
*/
package controller
package nodecontroller
import (
"errors"

View File

@ -14,7 +14,7 @@ See the License for the specific language governing permissions and
limitations under the License.
*/
package controller
package nodecontroller
import (
"errors"

View File

@ -0,0 +1,19 @@
/*
Copyright 2015 Google Inc. All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
// Package servicecontroller contains code for syncing cloud load balancers
// with the service registry.
package servicecontroller

View File

@ -0,0 +1,467 @@
/*
Copyright 2015 Google Inc. All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package servicecontroller
import (
"fmt"
"net"
"sort"
"sync"
"time"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api/errors"
"github.com/GoogleCloudPlatform/kubernetes/pkg/client"
"github.com/GoogleCloudPlatform/kubernetes/pkg/client/cache"
"github.com/GoogleCloudPlatform/kubernetes/pkg/cloudprovider"
"github.com/GoogleCloudPlatform/kubernetes/pkg/fields"
"github.com/GoogleCloudPlatform/kubernetes/pkg/labels"
"github.com/GoogleCloudPlatform/kubernetes/pkg/types"
"github.com/GoogleCloudPlatform/kubernetes/pkg/util"
"github.com/golang/glog"
)
const (
workerGoroutines = 10
clientRetryCount = 5
clientRetryInterval = 5 * time.Second
retryable = true
notRetryable = false
)
type cachedService struct {
service *api.Service
// Ensures only one goroutine can operate on this service at any given time.
mu sync.Mutex
}
type serviceCache struct {
mu sync.Mutex // protects serviceMap
serviceMap map[string]*cachedService
}
type ServiceController struct {
cloud cloudprovider.Interface
kubeClient client.Interface
clusterName string
balancer cloudprovider.TCPLoadBalancer
zone cloudprovider.Zone
cache *serviceCache
}
// New returns a new service controller to keep cloud provider service resources
// (like external load balancers) in sync with the registry.
func New(cloud cloudprovider.Interface, kubeClient client.Interface, clusterName string) *ServiceController {
return &ServiceController{
cloud: cloud,
kubeClient: kubeClient,
clusterName: clusterName,
cache: &serviceCache{serviceMap: make(map[string]*cachedService)},
}
}
// Run starts a background goroutine that watches for changes to services that
// have (or had) externalLoadBalancers=true and ensures that they have external
// load balancers created and deleted appropriately.
func (s *ServiceController) Run() error {
if err := s.init(); err != nil {
return err
}
// We have to make this check beecause the ListWatch that we use in
// WatchServices requires Client functions that aren't in the interface
// for some reason.
if _, ok := s.kubeClient.(*client.Client); !ok {
return fmt.Errorf("ServiceController only works with real Client objects, but was passed something else satisfying the client Interface.")
}
// Get the currently existing set of services and then all future creates
// and updates of services.
// No delta compressor is needed for the DeltaFIFO queue because we only ever
// care about the most recent state.
serviceQueue := cache.NewDeltaFIFO(cache.MetaNamespaceKeyFunc, nil, s.cache)
lw := cache.NewListWatchFromClient(s.kubeClient.(*client.Client), "services", api.NamespaceAll, fields.Everything())
cache.NewReflector(lw, &api.Service{}, serviceQueue, 0).Run()
for i := 0; i < workerGoroutines; i++ {
go s.watchServices(serviceQueue)
}
return nil
}
func (s *ServiceController) init() error {
if s.cloud == nil {
return fmt.Errorf("ServiceController should not be run without a cloudprovider.")
}
balancer, ok := s.cloud.TCPLoadBalancer()
if !ok {
return fmt.Errorf("the cloud provider does not support external TCP load balancers.")
}
s.balancer = balancer
zones, ok := s.cloud.Zones()
if !ok {
return fmt.Errorf("the cloud provider does not support zone enumeration, which is required for creating external load balancers.")
}
zone, err := zones.GetZone()
if err != nil {
return fmt.Errorf("failed to get zone from cloud provider, will not be able to create external load balancers: %v", err)
}
s.zone = zone
return nil
}
// Loop infinitely, processing all service updates provided by the queue.
func (s *ServiceController) watchServices(serviceQueue *cache.DeltaFIFO) {
for {
newItem := serviceQueue.Pop()
deltas, ok := newItem.(cache.Deltas)
if !ok {
glog.Errorf("Received object from service watcher that wasn't Deltas: %+v", newItem)
}
delta := deltas.Newest()
if delta == nil {
glog.Errorf("Received nil delta from watcher queue.")
continue
}
err, shouldRetry := s.processDelta(delta)
if shouldRetry {
// Add the failed service back to the queue so we'll retry it.
glog.Errorf("Failed to process service delta. Retrying: %v", err)
time.Sleep(5 * time.Second)
serviceQueue.AddIfNotPresent(deltas)
} else if err != nil {
util.HandleError(fmt.Errorf("Failed to process service delta. Not retrying: %v", err))
}
}
}
// Returns an error if processing the delta failed, along with a boolean
// indicator of whether the processing should be retried.
func (s *ServiceController) processDelta(delta *cache.Delta) (error, bool) {
service, ok := delta.Object.(*api.Service)
var namespacedName types.NamespacedName
var cachedService *cachedService
if !ok {
// If the DeltaFIFO saw a key in our cache that it didn't know about, it
// can send a deletion with an unknown state. Grab the service from our
// cache for deleting.
key, ok := delta.Object.(cache.DeletedFinalStateUnknown)
if !ok {
return fmt.Errorf("Delta contained object that wasn't a service or a deleted key: %+v", delta), notRetryable
}
cachedService, ok = s.cache.get(key.Key)
if !ok {
return fmt.Errorf("Service %s not in cache even though the watcher thought it was. Ignoring the deletion.", key), notRetryable
}
namespacedName = types.NamespacedName{service.Namespace, service.Name}
service = cachedService.service
delta.Object = cachedService.service
} else {
namespacedName.Namespace = service.Namespace
namespacedName.Name = service.Name
cachedService = s.cache.getOrCreate(namespacedName.String())
}
glog.V(2).Infof("Got new %s delta for service: %+v", delta.Type, service)
// Ensure that no other goroutine will interfere with our processing of the
// service.
cachedService.mu.Lock()
defer cachedService.mu.Unlock()
// TODO: Handle added, updated, and sync differently?
switch delta.Type {
case cache.Added:
fallthrough
case cache.Updated:
fallthrough
case cache.Sync:
err, retry := s.createLoadBalancerIfNeeded(namespacedName, service, cachedService.service)
if err != nil {
return err, retry
}
// Always update the cache upon success
cachedService.service = service
s.cache.set(namespacedName.String(), cachedService)
case cache.Deleted:
err := s.ensureLBDeleted(service)
if err != nil {
return err, retryable
}
s.cache.delete(namespacedName.String())
default:
glog.Errorf("Unexpected delta type: %v", delta.Type)
}
return nil, notRetryable
}
// Returns whatever error occurred along with a boolean indicator of whether it
// should be retried.
func (s *ServiceController) createLoadBalancerIfNeeded(namespacedName types.NamespacedName, service, cachedService *api.Service) (error, bool) {
if cachedService != nil && !needsUpdate(cachedService, service) {
glog.Infof("LB already exists and doesn't need update for service %s", namespacedName)
return nil, notRetryable
}
if cachedService != nil {
// If the service already exists but needs to be updated, delete it so that
// we can recreate it cleanly.
if cachedService.Spec.CreateExternalLoadBalancer {
glog.Infof("Deleting existing load balancer for service %s that needs an updated load balancer.", namespacedName)
if err := s.ensureLBDeleted(cachedService); err != nil {
return err, retryable
}
}
} else {
// If we don't have any cached memory of the load balancer and it already
// exists, optimistically consider our work done.
// TODO: If we could read the spec of the existing load balancer, we could
// determine if an update is necessary.
exists, err := s.balancer.TCPLoadBalancerExists(s.loadBalancerName(service), s.zone.Region)
if err != nil {
return fmt.Errorf("Error getting LB for service %s", namespacedName), retryable
}
if exists && len(service.Spec.PublicIPs) == 0 {
// The load balancer exists, but we apparently don't know about its public
// IPs, so just delete it and recreate it to get back to a sane state.
// TODO: Ideally the cloud provider interface would return the IP for us.
glog.Infof("Deleting old LB for service with no public IPs %s", namespacedName)
if err := s.ensureLBDeleted(service); err != nil {
return err, retryable
}
} else if exists {
// TODO: Better handle updates for non-cached services, this is optimistic.
glog.Infof("LB already exists for service %s", namespacedName)
return nil, notRetryable
}
}
if !service.Spec.CreateExternalLoadBalancer {
glog.Infof("Not creating LB for service %s that doesn't want one.", namespacedName)
return nil, notRetryable
}
glog.V(2).Infof("Creating LB for service %s", namespacedName)
// The load balancer doesn't exist yet, so create it.
publicIPstring := fmt.Sprint(service.Spec.PublicIPs)
err := s.createExternalLoadBalancer(service)
if err != nil {
return fmt.Errorf("failed to create external load balancer for service %s: %v", namespacedName, err), retryable
}
if publicIPstring == fmt.Sprint(service.Spec.PublicIPs) {
glog.Infof("Not persisting unchanged service to registry.")
return nil, notRetryable
}
// If creating the load balancer succeeded, persist the updated service.
if err = s.persistUpdate(service); err != nil {
return fmt.Errorf("Failed to persist updated publicIPs to apiserver, even after retries. Giving up: %v", err), notRetryable
}
return nil, notRetryable
}
func (s *ServiceController) persistUpdate(service *api.Service) error {
var err error
for i := 0; i < clientRetryCount; i++ {
_, err = s.kubeClient.Services(service.Namespace).Update(service)
if err == nil {
return nil
}
// If the object no longer exists, we don't want to recreate it. Just bail
// out so that we can process the delete, which we should soon be receiving
// if we haven't already.
if errors.IsNotFound(err) {
glog.Infof("Not persisting update to service that no longer exists: %v", err)
return nil
}
// TODO: Try to resolve the conflict if the change was unrelated to load
// balancers and public IPs. For now, just rely on the fact that we'll
// also process the update that caused the resource version to change.
if errors.IsConflict(err) {
glog.Infof("Not persisting update to service that has been changed since we received it: %v", err)
return nil
}
glog.Warningf("Failed to persist updated PublicIPs to service %s after creating its external load balancer: %v",
service.Name, err)
time.Sleep(clientRetryInterval)
}
return err
}
func (s *ServiceController) createExternalLoadBalancer(service *api.Service) error {
ports, err := getTCPPorts(service)
if err != nil {
return err
}
nodes, err := s.kubeClient.Nodes().List(labels.Everything(), fields.Everything())
if err != nil {
return err
}
name := s.loadBalancerName(service)
if len(service.Spec.PublicIPs) > 0 {
for _, publicIP := range service.Spec.PublicIPs {
// TODO: Make this actually work for multiple IPs by using different
// names for each. For now, we'll just create the first and break.
endpoint, err := s.balancer.CreateTCPLoadBalancer(name, s.zone.Region, net.ParseIP(publicIP),
ports, hostsFromNodeList(nodes), service.Spec.SessionAffinity)
if err != nil {
return err
}
service.Spec.PublicIPs = []string{endpoint}
break
}
} else {
endpoint, err := s.balancer.CreateTCPLoadBalancer(name, s.zone.Region, nil,
ports, hostsFromNodeList(nodes), service.Spec.SessionAffinity)
if err != nil {
return err
}
service.Spec.PublicIPs = []string{endpoint}
}
return nil
}
// Ensures that the load balancer associated with the given service is deleted,
// doing the deletion if necessary. Should always be retried upon failure.
func (s *ServiceController) ensureLBDeleted(service *api.Service) error {
// This is only needed because not all delete load balancer implementations
// are currently idempotent to the LB not existing.
if exists, err := s.balancer.TCPLoadBalancerExists(s.loadBalancerName(service), s.zone.Region); err != nil {
return err
} else if !exists {
return nil
}
if err := s.balancer.DeleteTCPLoadBalancer(s.loadBalancerName(service), s.zone.Region); err != nil {
return err
}
return nil
}
// ListKeys implements the interface required by DeltaFIFO to list the keys we
// already know about.
func (s *serviceCache) ListKeys() []string {
s.mu.Lock()
defer s.mu.Unlock()
keys := make([]string, 0, len(s.serviceMap))
for k := range s.serviceMap {
keys = append(keys, k)
}
return keys
}
func (s *serviceCache) get(serviceName string) (*cachedService, bool) {
s.mu.Lock()
defer s.mu.Unlock()
service, ok := s.serviceMap[serviceName]
return service, ok
}
func (s *serviceCache) getOrCreate(serviceName string) *cachedService {
s.mu.Lock()
defer s.mu.Unlock()
service, ok := s.serviceMap[serviceName]
if !ok {
service = &cachedService{}
s.serviceMap[serviceName] = service
}
return service
}
func (s *serviceCache) set(serviceName string, service *cachedService) {
s.mu.Lock()
defer s.mu.Unlock()
s.serviceMap[serviceName] = service
}
func (s *serviceCache) delete(serviceName string) {
s.mu.Lock()
defer s.mu.Unlock()
delete(s.serviceMap, serviceName)
}
func needsUpdate(oldService *api.Service, newService *api.Service) bool {
if !oldService.Spec.CreateExternalLoadBalancer && !newService.Spec.CreateExternalLoadBalancer {
return false
}
if oldService.Spec.CreateExternalLoadBalancer != newService.Spec.CreateExternalLoadBalancer {
return true
}
if !portsEqual(oldService, newService) || oldService.Spec.SessionAffinity != newService.Spec.SessionAffinity {
return true
}
if len(oldService.Spec.PublicIPs) != len(newService.Spec.PublicIPs) {
return true
}
for i := range oldService.Spec.PublicIPs {
if oldService.Spec.PublicIPs[i] != newService.Spec.PublicIPs[i] {
return true
}
}
return false
}
func (s *ServiceController) loadBalancerName(service *api.Service) string {
return cloudprovider.GetLoadBalancerName(s.clusterName, service.Namespace, service.Name)
}
func getTCPPorts(service *api.Service) ([]int, error) {
ports := []int{}
for i := range service.Spec.Ports {
// TODO: Support UDP. Remove the check from the API validation package once
// it's supported.
sp := &service.Spec.Ports[i]
if sp.Protocol != api.ProtocolTCP {
return nil, fmt.Errorf("external load balancers for non TCP services are not currently supported.")
}
ports = append(ports, sp.Port)
}
return ports, nil
}
func portsEqual(x, y *api.Service) bool {
xPorts, err := getTCPPorts(x)
if err != nil {
return false
}
yPorts, err := getTCPPorts(y)
if err != nil {
return false
}
if len(xPorts) != len(yPorts) {
return false
}
sort.Ints(xPorts)
sort.Ints(yPorts)
for i := range xPorts {
if xPorts[i] != yPorts[i] {
return false
}
}
return true
}
func hostsFromNodeList(list *api.NodeList) []string {
result := make([]string, len(list.Items))
for ix := range list.Items {
result[ix] = list.Items[ix].Name
}
return result
}

View File

@ -0,0 +1,127 @@
/*
Copyright 2015 Google Inc. All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package servicecontroller
import (
"testing"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
"github.com/GoogleCloudPlatform/kubernetes/pkg/client/testclient"
fake_cloud "github.com/GoogleCloudPlatform/kubernetes/pkg/cloudprovider/fake"
"github.com/GoogleCloudPlatform/kubernetes/pkg/types"
)
const region = "us-central"
func TestCreateExternalLoadBalancer(t *testing.T) {
table := []struct {
service *api.Service
expectErr bool
expectCreateAttempt bool
}{
{
service: &api.Service{
ObjectMeta: api.ObjectMeta{
Name: "no-external-balancer",
Namespace: "default",
},
Spec: api.ServiceSpec{
CreateExternalLoadBalancer: false,
},
},
expectErr: false,
expectCreateAttempt: false,
},
{
service: &api.Service{
ObjectMeta: api.ObjectMeta{
Name: "udp-service",
Namespace: "default",
},
Spec: api.ServiceSpec{
Ports: []api.ServicePort{{
Port: 80,
Protocol: api.ProtocolUDP,
}},
CreateExternalLoadBalancer: true,
},
},
expectErr: true,
expectCreateAttempt: false,
},
{
service: &api.Service{
ObjectMeta: api.ObjectMeta{
Name: "basic-service1",
Namespace: "default",
},
Spec: api.ServiceSpec{
Ports: []api.ServicePort{{
Port: 80,
Protocol: api.ProtocolTCP,
}},
CreateExternalLoadBalancer: true,
},
},
expectErr: false,
expectCreateAttempt: true,
},
}
for _, item := range table {
cloud := &fake_cloud.FakeCloud{}
cloud.Region = region
client := &testclient.Fake{}
controller := New(cloud, client, "test-cluster")
controller.init()
cloud.Calls = nil // ignore any cloud calls made in init()
client.Actions = nil // ignore any client calls made in init()
err, _ := controller.createLoadBalancerIfNeeded(types.NamespacedName{"foo", "bar"}, item.service, nil)
if !item.expectErr && err != nil {
t.Errorf("unexpected error: %v", err)
} else if item.expectErr && err == nil {
t.Errorf("expected error creating %v, got nil", item.service)
}
if !item.expectCreateAttempt {
if len(cloud.Calls) > 0 {
t.Errorf("unexpected cloud provider calls: %v", cloud.Calls)
}
if len(client.Actions) > 0 {
t.Errorf("unexpected client actions: %v", client.Actions)
}
} else {
if len(cloud.Balancers) != 1 {
t.Errorf("expected one load balancer to be created, got %v", cloud.Balancers)
} else if cloud.Balancers[0].Name != controller.loadBalancerName(item.service) ||
cloud.Balancers[0].Region != region ||
cloud.Balancers[0].Ports[0] != item.service.Spec.Ports[0].Port {
t.Errorf("created load balancer has incorrect parameters: %v", cloud.Balancers[0])
}
actionFound := false
for _, action := range client.Actions {
if action.Action == "update-service" {
actionFound = true
}
}
if !actionFound {
t.Errorf("expected updated service to be sent to client, got these actions instead: %v", client.Actions)
}
}
}
}
// TODO(a-robinson): Add tests for update/sync/delete.

View File

@ -40,7 +40,6 @@ import (
"github.com/GoogleCloudPlatform/kubernetes/pkg/auth/authorizer"
"github.com/GoogleCloudPlatform/kubernetes/pkg/auth/handlers"
"github.com/GoogleCloudPlatform/kubernetes/pkg/client"
"github.com/GoogleCloudPlatform/kubernetes/pkg/cloudprovider"
"github.com/GoogleCloudPlatform/kubernetes/pkg/fields"
"github.com/GoogleCloudPlatform/kubernetes/pkg/labels"
"github.com/GoogleCloudPlatform/kubernetes/pkg/master/ports"
@ -72,7 +71,6 @@ import (
// Config is a structure used to configure a Master.
type Config struct {
Cloud cloudprovider.Interface
EtcdHelper tools.EtcdHelper
EventTTL time.Duration
MinionRegexp string
@ -396,7 +394,7 @@ func (m *Master) init(c *Config) {
"bindings": podStorage.Binding,
"replicationControllers": controllerStorage,
"services": service.NewStorage(m.serviceRegistry, c.Cloud, m.nodeRegistry, m.endpointRegistry, m.portalNet, c.ClusterName),
"services": service.NewStorage(m.serviceRegistry, m.nodeRegistry, m.endpointRegistry, m.portalNet, c.ClusterName),
"endpoints": endpointsStorage,
"minions": nodeStorage,
"minions/status": nodeStatusStorage,

View File

@ -14,6 +14,6 @@ See the License for the specific language governing permissions and
limitations under the License.
*/
// Package service provides Registry interface and it's RESTStorage
// Package service provides the Registry interface and its RESTStorage
// implementation for storing Service api objects.
package service

View File

@ -29,7 +29,6 @@ import (
"github.com/GoogleCloudPlatform/kubernetes/pkg/api/errors"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api/rest"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api/validation"
"github.com/GoogleCloudPlatform/kubernetes/pkg/cloudprovider"
"github.com/GoogleCloudPlatform/kubernetes/pkg/fields"
"github.com/GoogleCloudPlatform/kubernetes/pkg/labels"
"github.com/GoogleCloudPlatform/kubernetes/pkg/registry/endpoint"
@ -43,7 +42,6 @@ import (
// REST adapts a service registry into apiserver's RESTStorage model.
type REST struct {
registry Registry
cloud cloudprovider.Interface
machines minion.Registry
endpoints endpoint.Registry
portalMgr *ipAllocator
@ -51,7 +49,7 @@ type REST struct {
}
// NewStorage returns a new REST.
func NewStorage(registry Registry, cloud cloudprovider.Interface, machines minion.Registry, endpoints endpoint.Registry, portalNet *net.IPNet,
func NewStorage(registry Registry, machines minion.Registry, endpoints endpoint.Registry, portalNet *net.IPNet,
clusterName string) *REST {
// TODO: Before we can replicate masters, this has to be synced (e.g. lives in etcd)
ipa := newIPAllocator(portalNet)
@ -62,7 +60,6 @@ func NewStorage(registry Registry, cloud cloudprovider.Interface, machines minio
return &REST{
registry: registry,
cloud: cloud,
machines: machines,
endpoints: endpoints,
portalMgr: ipa,
@ -123,15 +120,6 @@ func (rs *REST) Create(ctx api.Context, obj runtime.Object) (runtime.Object, err
releaseServiceIP = true
}
// TODO: Move this to post-creation rectification loop, so that we make/remove external load balancers
// correctly no matter what http operations happen.
if service.Spec.CreateExternalLoadBalancer {
err := rs.createExternalLoadBalancer(ctx, service)
if err != nil {
return nil, err
}
}
out, err := rs.registry.CreateService(ctx, service)
if err != nil {
err = rest.CheckGeneratedNameError(rest.Services, err, service)
@ -144,14 +132,6 @@ func (rs *REST) Create(ctx api.Context, obj runtime.Object) (runtime.Object, err
return out, err
}
func hostsFromMinionList(list *api.NodeList) []string {
result := make([]string, len(list.Items))
for ix := range list.Items {
result[ix] = list.Items[ix].Name
}
return result
}
func (rs *REST) Delete(ctx api.Context, id string) (runtime.Object, error) {
service, err := rs.registry.GetService(ctx, id)
if err != nil {
@ -160,9 +140,6 @@ func (rs *REST) Delete(ctx api.Context, id string) (runtime.Object, error) {
if api.IsServiceIPSet(service) {
rs.portalMgr.Release(net.ParseIP(service.Spec.PortalIP))
}
if service.Spec.CreateExternalLoadBalancer {
rs.deleteExternalLoadBalancer(ctx, service)
}
return &api.Status{Status: api.StatusSuccess}, rs.registry.DeleteService(ctx, id)
}
@ -219,22 +196,6 @@ func (rs *REST) Update(ctx api.Context, obj runtime.Object) (runtime.Object, boo
if errs := validation.ValidateServiceUpdate(oldService, service); len(errs) > 0 {
return nil, false, errors.NewInvalid("service", service.Name, errs)
}
// Recreate external load balancer if changed.
if externalLoadBalancerNeedsUpdate(oldService, service) {
// TODO: support updating existing balancers
if oldService.Spec.CreateExternalLoadBalancer {
err = rs.deleteExternalLoadBalancer(ctx, oldService)
if err != nil {
return nil, false, err
}
}
if service.Spec.CreateExternalLoadBalancer {
err = rs.createExternalLoadBalancer(ctx, service)
if err != nil {
return nil, false, err
}
}
}
out, err := rs.registry.UpdateService(ctx, service)
return out, false, err
}
@ -282,130 +243,3 @@ func (rs *REST) ResourceLocation(ctx api.Context, id string) (*url.URL, http.Rou
}
return nil, nil, fmt.Errorf("no endpoints available for %q", id)
}
func (rs *REST) createExternalLoadBalancer(ctx api.Context, service *api.Service) error {
if rs.cloud == nil {
return fmt.Errorf("requested an external service, but no cloud provider supplied.")
}
ports, err := getTCPPorts(service)
if err != nil {
return err
}
balancer, ok := rs.cloud.TCPLoadBalancer()
if !ok {
return fmt.Errorf("the cloud provider does not support external TCP load balancers.")
}
zones, ok := rs.cloud.Zones()
if !ok {
return fmt.Errorf("the cloud provider does not support zone enumeration.")
}
hosts, err := rs.machines.ListMinions(ctx, labels.Everything(), fields.Everything())
if err != nil {
return err
}
zone, err := zones.GetZone()
if err != nil {
return err
}
name := cloudprovider.GetLoadBalancerName(rs.clusterName, api.NamespaceValue(ctx), service.Name)
var affinityType api.AffinityType = service.Spec.SessionAffinity
if len(service.Spec.PublicIPs) > 0 {
for _, publicIP := range service.Spec.PublicIPs {
_, err = balancer.CreateTCPLoadBalancer(name, zone.Region, net.ParseIP(publicIP), ports, hostsFromMinionList(hosts), affinityType)
if err != nil {
// TODO: have to roll-back any successful calls.
return err
}
}
} else {
endpoint, err := balancer.CreateTCPLoadBalancer(name, zone.Region, nil, ports, hostsFromMinionList(hosts), affinityType)
if err != nil {
return err
}
service.Spec.PublicIPs = []string{endpoint}
}
return nil
}
func getTCPPorts(service *api.Service) ([]int, error) {
ports := []int{}
for i := range service.Spec.Ports {
sp := &service.Spec.Ports[i]
if sp.Protocol != api.ProtocolTCP {
// TODO: Support UDP here too.
return nil, fmt.Errorf("external load balancers for non TCP services are not currently supported.")
}
ports = append(ports, sp.Port)
}
return ports, nil
}
func portsEqual(x, y *api.Service) bool {
xPorts, err := getTCPPorts(x)
if err != nil {
return false
}
yPorts, err := getTCPPorts(y)
if err != nil {
return false
}
if len(xPorts) != len(yPorts) {
return false
}
for i := range xPorts {
if xPorts[i] != yPorts[i] {
return false
}
}
return true
}
func (rs *REST) deleteExternalLoadBalancer(ctx api.Context, service *api.Service) error {
if rs.cloud == nil {
return fmt.Errorf("requested an external service, but no cloud provider supplied.")
}
zones, ok := rs.cloud.Zones()
if !ok {
// We failed to get zone enumerator.
// As this should have failed when we tried in "create" too,
// assume external load balancer was never created.
return nil
}
balancer, ok := rs.cloud.TCPLoadBalancer()
if !ok {
// See comment above.
return nil
}
zone, err := zones.GetZone()
if err != nil {
return err
}
name := cloudprovider.GetLoadBalancerName(rs.clusterName, api.NamespaceValue(ctx), service.Name)
if err := balancer.DeleteTCPLoadBalancer(name, zone.Region); err != nil {
return err
}
return nil
}
func externalLoadBalancerNeedsUpdate(oldService, newService *api.Service) bool {
if !oldService.Spec.CreateExternalLoadBalancer && !newService.Spec.CreateExternalLoadBalancer {
return false
}
if oldService.Spec.CreateExternalLoadBalancer != newService.Spec.CreateExternalLoadBalancer {
return true
}
if !portsEqual(oldService, newService) || oldService.Spec.SessionAffinity != newService.Spec.SessionAffinity {
return true
}
if len(oldService.Spec.PublicIPs) != len(newService.Spec.PublicIPs) {
return true
}
for i := range oldService.Spec.PublicIPs {
if oldService.Spec.PublicIPs[i] != newService.Spec.PublicIPs[i] {
return true
}
}
return false
}

View File

@ -19,7 +19,6 @@ package service
import (
"bytes"
"encoding/gob"
"fmt"
"net"
"strings"
"testing"
@ -28,22 +27,20 @@ import (
"github.com/GoogleCloudPlatform/kubernetes/pkg/api/errors"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api/rest"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api/rest/resttest"
cloud "github.com/GoogleCloudPlatform/kubernetes/pkg/cloudprovider/fake"
"github.com/GoogleCloudPlatform/kubernetes/pkg/fields"
"github.com/GoogleCloudPlatform/kubernetes/pkg/labels"
"github.com/GoogleCloudPlatform/kubernetes/pkg/registry/registrytest"
)
func NewTestREST(t *testing.T, endpoints *api.EndpointsList) (*REST, *registrytest.ServiceRegistry, *cloud.FakeCloud) {
func NewTestREST(t *testing.T, endpoints *api.EndpointsList) (*REST, *registrytest.ServiceRegistry) {
registry := registrytest.NewServiceRegistry()
fakeCloud := &cloud.FakeCloud{}
machines := []string{"foo", "bar", "baz"}
endpointRegistry := &registrytest.EndpointRegistry{
Endpoints: endpoints,
}
nodeRegistry := registrytest.NewMinionRegistry(machines, api.NodeResources{})
storage := NewStorage(registry, fakeCloud, nodeRegistry, endpointRegistry, makeIPNet(t), "kubernetes")
return storage, registry, fakeCloud
storage := NewStorage(registry, nodeRegistry, endpointRegistry, makeIPNet(t), "kubernetes")
return storage, registry
}
func makeIPNet(t *testing.T) *net.IPNet {
@ -65,7 +62,7 @@ func deepCloneService(svc *api.Service) *api.Service {
}
func TestServiceRegistryCreate(t *testing.T) {
storage, registry, fakeCloud := NewTestREST(t, nil)
storage, registry := NewTestREST(t, nil)
storage.portalMgr.randomAttempts = 0
svc := &api.Service{
@ -97,9 +94,6 @@ func TestServiceRegistryCreate(t *testing.T) {
if created_service.Spec.PortalIP != "1.2.3.1" {
t.Errorf("Unexpected PortalIP: %s", created_service.Spec.PortalIP)
}
if len(fakeCloud.Calls) != 0 {
t.Errorf("Unexpected call(s): %#v", fakeCloud.Calls)
}
srv, err := registry.GetService(ctx, svc.Name)
if err != nil {
t.Errorf("unexpected error: %v", err)
@ -110,7 +104,7 @@ func TestServiceRegistryCreate(t *testing.T) {
}
func TestServiceStorageValidatesCreate(t *testing.T) {
storage, _, _ := NewTestREST(t, nil)
storage, _ := NewTestREST(t, nil)
failureCases := map[string]api.Service{
"empty ID": {
ObjectMeta: api.ObjectMeta{Name: ""},
@ -148,7 +142,7 @@ func TestServiceStorageValidatesCreate(t *testing.T) {
func TestServiceRegistryUpdate(t *testing.T) {
ctx := api.NewDefaultContext()
storage, registry, _ := NewTestREST(t, nil)
storage, registry := NewTestREST(t, nil)
svc, err := registry.CreateService(ctx, &api.Service{
ObjectMeta: api.ObjectMeta{Name: "foo", ResourceVersion: "1", Namespace: api.NamespaceDefault},
Spec: api.ServiceSpec{
@ -196,7 +190,7 @@ func TestServiceRegistryUpdate(t *testing.T) {
func TestServiceStorageValidatesUpdate(t *testing.T) {
ctx := api.NewDefaultContext()
storage, registry, _ := NewTestREST(t, nil)
storage, registry := NewTestREST(t, nil)
registry.CreateService(ctx, &api.Service{
ObjectMeta: api.ObjectMeta{Name: "foo"},
Spec: api.ServiceSpec{
@ -244,7 +238,7 @@ func TestServiceStorageValidatesUpdate(t *testing.T) {
func TestServiceRegistryExternalService(t *testing.T) {
ctx := api.NewDefaultContext()
storage, registry, fakeCloud := NewTestREST(t, nil)
storage, registry := NewTestREST(t, nil)
svc := &api.Service{
ObjectMeta: api.ObjectMeta{Name: "foo"},
Spec: api.ServiceSpec{
@ -257,11 +251,9 @@ func TestServiceRegistryExternalService(t *testing.T) {
}},
},
}
if _, err := storage.Create(ctx, svc); err != nil {
t.Fatalf("Unexpected error: %v", err)
}
if len(fakeCloud.Calls) != 2 || fakeCloud.Calls[0] != "get-zone" || fakeCloud.Calls[1] != "create" {
t.Errorf("Unexpected call(s): %#v", fakeCloud.Calls)
_, err := storage.Create(ctx, svc)
if err != nil {
t.Errorf("Failed to create service: %#v", err)
}
srv, err := registry.GetService(ctx, svc.Name)
if err != nil {
@ -270,41 +262,11 @@ func TestServiceRegistryExternalService(t *testing.T) {
if srv == nil {
t.Errorf("Failed to find service: %s", svc.Name)
}
if len(fakeCloud.Balancers) != 1 || fakeCloud.Balancers[0].Name != "kubernetes-default-foo" || fakeCloud.Balancers[0].Ports[0] != 6502 {
t.Errorf("Unexpected balancer created: %v", fakeCloud.Balancers)
}
}
func TestServiceRegistryExternalServiceError(t *testing.T) {
storage, registry, fakeCloud := NewTestREST(t, nil)
fakeCloud.Err = fmt.Errorf("test error")
svc := &api.Service{
ObjectMeta: api.ObjectMeta{Name: "foo"},
Spec: api.ServiceSpec{
Selector: map[string]string{"bar": "baz"},
CreateExternalLoadBalancer: true,
SessionAffinity: api.AffinityTypeNone,
Ports: []api.ServicePort{{
Port: 6502,
Protocol: api.ProtocolTCP,
}},
},
}
ctx := api.NewDefaultContext()
if _, err := storage.Create(ctx, svc); err == nil {
t.Fatalf("Unexpected success")
}
if len(fakeCloud.Calls) != 1 || fakeCloud.Calls[0] != "get-zone" {
t.Errorf("Unexpected call(s): %#v", fakeCloud.Calls)
}
if registry.Service != nil {
t.Errorf("Expected registry.CreateService to not get called, but it got %#v", registry.Service)
}
}
func TestServiceRegistryDelete(t *testing.T) {
ctx := api.NewDefaultContext()
storage, registry, fakeCloud := NewTestREST(t, nil)
storage, registry := NewTestREST(t, nil)
svc := &api.Service{
ObjectMeta: api.ObjectMeta{Name: "foo"},
Spec: api.ServiceSpec{
@ -318,9 +280,6 @@ func TestServiceRegistryDelete(t *testing.T) {
}
registry.CreateService(ctx, svc)
storage.Delete(ctx, svc.Name)
if len(fakeCloud.Calls) != 0 {
t.Errorf("Unexpected call(s): %#v", fakeCloud.Calls)
}
if e, a := "foo", registry.DeletedID; e != a {
t.Errorf("Expected %v, but got %v", e, a)
}
@ -328,7 +287,7 @@ func TestServiceRegistryDelete(t *testing.T) {
func TestServiceRegistryDeleteExternal(t *testing.T) {
ctx := api.NewDefaultContext()
storage, registry, fakeCloud := NewTestREST(t, nil)
storage, registry := NewTestREST(t, nil)
svc := &api.Service{
ObjectMeta: api.ObjectMeta{Name: "foo"},
Spec: api.ServiceSpec{
@ -343,9 +302,6 @@ func TestServiceRegistryDeleteExternal(t *testing.T) {
}
registry.CreateService(ctx, svc)
storage.Delete(ctx, svc.Name)
if len(fakeCloud.Calls) != 2 || fakeCloud.Calls[0] != "get-zone" || fakeCloud.Calls[1] != "delete" {
t.Errorf("Unexpected call(s): %#v", fakeCloud.Calls)
}
if e, a := "foo", registry.DeletedID; e != a {
t.Errorf("Expected %v, but got %v", e, a)
}
@ -353,7 +309,7 @@ func TestServiceRegistryDeleteExternal(t *testing.T) {
func TestServiceRegistryUpdateExternalService(t *testing.T) {
ctx := api.NewDefaultContext()
storage, _, fakeCloud := NewTestREST(t, nil)
storage, _ := NewTestREST(t, nil)
// Create non-external load balancer.
svc1 := &api.Service{
@ -371,9 +327,6 @@ func TestServiceRegistryUpdateExternalService(t *testing.T) {
if _, err := storage.Create(ctx, svc1); err != nil {
t.Fatalf("Unexpected error: %v", err)
}
if len(fakeCloud.Calls) != 0 {
t.Errorf("Unexpected call(s): %#v", fakeCloud.Calls)
}
// Modify load balancer to be external.
svc2 := deepCloneService(svc1)
@ -381,9 +334,6 @@ func TestServiceRegistryUpdateExternalService(t *testing.T) {
if _, _, err := storage.Update(ctx, svc2); err != nil {
t.Fatalf("Unexpected error: %v", err)
}
if len(fakeCloud.Calls) != 2 || fakeCloud.Calls[0] != "get-zone" || fakeCloud.Calls[1] != "create" {
t.Errorf("Unexpected call(s): %#v", fakeCloud.Calls)
}
// Change port.
svc3 := deepCloneService(svc2)
@ -391,16 +341,11 @@ func TestServiceRegistryUpdateExternalService(t *testing.T) {
if _, _, err := storage.Update(ctx, svc3); err != nil {
t.Fatalf("Unexpected error: %v", err)
}
if len(fakeCloud.Calls) != 6 || fakeCloud.Calls[0] != "get-zone" || fakeCloud.Calls[1] != "create" ||
fakeCloud.Calls[2] != "get-zone" || fakeCloud.Calls[3] != "delete" ||
fakeCloud.Calls[4] != "get-zone" || fakeCloud.Calls[5] != "create" {
t.Errorf("Unexpected call(s): %#v", fakeCloud.Calls)
}
}
func TestServiceRegistryUpdateMultiPortExternalService(t *testing.T) {
ctx := api.NewDefaultContext()
storage, _, fakeCloud := NewTestREST(t, nil)
storage, _ := NewTestREST(t, nil)
// Create external load balancer.
svc1 := &api.Service{
@ -423,9 +368,6 @@ func TestServiceRegistryUpdateMultiPortExternalService(t *testing.T) {
if _, err := storage.Create(ctx, svc1); err != nil {
t.Fatalf("Unexpected error: %v", err)
}
if len(fakeCloud.Calls) != 2 || fakeCloud.Calls[0] != "get-zone" || fakeCloud.Calls[1] != "create" {
t.Errorf("Unexpected call(s): %#v", fakeCloud.Calls)
}
// Modify ports
svc2 := deepCloneService(svc1)
@ -433,16 +375,11 @@ func TestServiceRegistryUpdateMultiPortExternalService(t *testing.T) {
if _, _, err := storage.Update(ctx, svc2); err != nil {
t.Fatalf("Unexpected error: %v", err)
}
if len(fakeCloud.Calls) != 6 || fakeCloud.Calls[0] != "get-zone" || fakeCloud.Calls[1] != "create" ||
fakeCloud.Calls[2] != "get-zone" || fakeCloud.Calls[3] != "delete" ||
fakeCloud.Calls[4] != "get-zone" || fakeCloud.Calls[5] != "create" {
t.Errorf("Unexpected call(s): %#v", fakeCloud.Calls)
}
}
func TestServiceRegistryGet(t *testing.T) {
ctx := api.NewDefaultContext()
storage, registry, fakeCloud := NewTestREST(t, nil)
storage, registry := NewTestREST(t, nil)
registry.CreateService(ctx, &api.Service{
ObjectMeta: api.ObjectMeta{Name: "foo"},
Spec: api.ServiceSpec{
@ -450,9 +387,6 @@ func TestServiceRegistryGet(t *testing.T) {
},
})
storage.Get(ctx, "foo")
if len(fakeCloud.Calls) != 0 {
t.Errorf("Unexpected call(s): %#v", fakeCloud.Calls)
}
if e, a := "foo", registry.GottenID; e != a {
t.Errorf("Expected %v, but got %v", e, a)
}
@ -474,7 +408,7 @@ func TestServiceRegistryResourceLocation(t *testing.T) {
},
},
}
storage, registry, _ := NewTestREST(t, endpoints)
storage, registry := NewTestREST(t, endpoints)
registry.CreateService(ctx, &api.Service{
ObjectMeta: api.ObjectMeta{Name: "foo"},
Spec: api.ServiceSpec{
@ -521,7 +455,7 @@ func TestServiceRegistryResourceLocation(t *testing.T) {
func TestServiceRegistryList(t *testing.T) {
ctx := api.NewDefaultContext()
storage, registry, fakeCloud := NewTestREST(t, nil)
storage, registry := NewTestREST(t, nil)
registry.CreateService(ctx, &api.Service{
ObjectMeta: api.ObjectMeta{Name: "foo", Namespace: api.NamespaceDefault},
Spec: api.ServiceSpec{
@ -537,9 +471,6 @@ func TestServiceRegistryList(t *testing.T) {
registry.List.ResourceVersion = "1"
s, _ := storage.List(ctx, labels.Everything(), fields.Everything())
sl := s.(*api.ServiceList)
if len(fakeCloud.Calls) != 0 {
t.Errorf("Unexpected call(s): %#v", fakeCloud.Calls)
}
if len(sl.Items) != 2 {
t.Fatalf("Expected 2 services, but got %v", len(sl.Items))
}
@ -555,7 +486,7 @@ func TestServiceRegistryList(t *testing.T) {
}
func TestServiceRegistryIPAllocation(t *testing.T) {
rest, _, _ := NewTestREST(t, nil)
rest, _ := NewTestREST(t, nil)
rest.portalMgr.randomAttempts = 0
svc1 := &api.Service{
@ -620,7 +551,7 @@ func TestServiceRegistryIPAllocation(t *testing.T) {
}
func TestServiceRegistryIPReallocation(t *testing.T) {
rest, _, _ := NewTestREST(t, nil)
rest, _ := NewTestREST(t, nil)
rest.portalMgr.randomAttempts = 0
svc1 := &api.Service{
@ -669,7 +600,7 @@ func TestServiceRegistryIPReallocation(t *testing.T) {
}
func TestServiceRegistryIPUpdate(t *testing.T) {
rest, _, _ := NewTestREST(t, nil)
rest, _ := NewTestREST(t, nil)
rest.portalMgr.randomAttempts = 0
svc := &api.Service{
@ -713,7 +644,7 @@ func TestServiceRegistryIPUpdate(t *testing.T) {
}
func TestServiceRegistryIPExternalLoadBalancer(t *testing.T) {
rest, _, fakeCloud := NewTestREST(t, nil)
rest, _ := NewTestREST(t, nil)
rest.portalMgr.randomAttempts = 0
svc := &api.Service{
@ -744,18 +675,14 @@ func TestServiceRegistryIPExternalLoadBalancer(t *testing.T) {
if err != nil {
t.Errorf("Unexpected error %v", err)
}
if len(fakeCloud.Balancers) != 1 || fakeCloud.Balancers[0].Name != "kubernetes-default-foo" || fakeCloud.Balancers[0].Ports[0] != 6502 {
t.Errorf("Unexpected balancer created: %v", fakeCloud.Balancers)
}
}
func TestServiceRegistryIPReloadFromStorage(t *testing.T) {
registry := registrytest.NewServiceRegistry()
fakeCloud := &cloud.FakeCloud{}
machines := []string{"foo", "bar", "baz"}
nodeRegistry := registrytest.NewMinionRegistry(machines, api.NodeResources{})
endpoints := &registrytest.EndpointRegistry{}
rest1 := NewStorage(registry, fakeCloud, nodeRegistry, endpoints, makeIPNet(t), "kubernetes")
rest1 := NewStorage(registry, nodeRegistry, endpoints, makeIPNet(t), "kubernetes")
rest1.portalMgr.randomAttempts = 0
svc := &api.Service{
@ -786,7 +713,7 @@ func TestServiceRegistryIPReloadFromStorage(t *testing.T) {
// This will reload from storage, finding the previous 2
nodeRegistry = registrytest.NewMinionRegistry(machines, api.NodeResources{})
rest2 := NewStorage(registry, fakeCloud, nodeRegistry, endpoints, makeIPNet(t), "kubernetes")
rest2 := NewStorage(registry, nodeRegistry, endpoints, makeIPNet(t), "kubernetes")
rest2.portalMgr.randomAttempts = 0
svc = &api.Service{
@ -845,7 +772,7 @@ func TestUpdateServiceWithConflictingNamespace(t *testing.T) {
}
func TestCreate(t *testing.T) {
rest, registry, _ := NewTestREST(t, nil)
rest, registry := NewTestREST(t, nil)
rest.portalMgr.randomAttempts = 0
test := resttest.New(t, rest, registry.SetError)

View File

@ -286,6 +286,10 @@ var _ = Describe("Services", func() {
Expect(err).NotTo(HaveOccurred())
}(ns, serviceName)
// Wait for the load balancer to be created asynchronously, which is
// currently indicated by a public IP address being added to the spec.
result, err = waitForPublicIPs(c, serviceName, ns)
Expect(err).NotTo(HaveOccurred())
if len(result.Spec.PublicIPs) != 1 {
Failf("got unexpected number (%d) of public IPs for externally load balanced service: %v", result.Spec.PublicIPs, result)
}
@ -325,7 +329,7 @@ var _ = Describe("Services", func() {
By("hitting the pod through the service's external load balancer")
var resp *http.Response
for t := time.Now(); time.Since(t) < 4*time.Minute; time.Sleep(5 * time.Second) {
for t := time.Now(); time.Since(t) < time.Minute; time.Sleep(5 * time.Second) {
resp, err = http.Get(fmt.Sprintf("http://%s:%d", ip, port))
if err == nil {
break
@ -369,13 +373,19 @@ var _ = Describe("Services", func() {
service.ObjectMeta.Name = serviceName
service.ObjectMeta.Namespace = namespace
By("creating service " + serviceName + " in namespace " + namespace)
result, err := c.Services(namespace).Create(service)
_, err := c.Services(namespace).Create(service)
Expect(err).NotTo(HaveOccurred())
defer func(namespace, serviceName string) { // clean up when we're done
By("deleting service " + serviceName + " in namespace " + namespace)
err := c.Services(namespace).Delete(serviceName)
Expect(err).NotTo(HaveOccurred())
}(namespace, serviceName)
}
}
for _, namespace := range namespaces {
for _, serviceName := range serviceNames {
result, err := waitForPublicIPs(c, serviceName, namespace)
Expect(err).NotTo(HaveOccurred())
publicIPs = append(publicIPs, result.Spec.PublicIPs...) // Save 'em to check uniqueness
}
}
@ -383,6 +393,24 @@ var _ = Describe("Services", func() {
})
})
func waitForPublicIPs(c *client.Client, serviceName, namespace string) (*api.Service, error) {
const timeout = 4 * time.Minute
var service *api.Service
By(fmt.Sprintf("waiting up to %v for service %s in namespace %s to have a public IP", timeout, serviceName, namespace))
for start := time.Now(); time.Since(start) < timeout; time.Sleep(5 * time.Second) {
service, err := c.Services(namespace).Get(serviceName)
if err != nil {
Logf("Get service failed, ignoring for 5s: %v", err)
continue
}
if len(service.Spec.PublicIPs) > 0 {
return service, nil
}
Logf("Waiting for service %s in namespace %s to have a public IP (%v)", serviceName, namespace, time.Since(start))
}
return service, fmt.Errorf("service %s in namespace %s to have a public IP after %.2f seconds", nil, serviceName, namespace, podStartTimeout.Seconds())
}
func validateUniqueOrFail(s []string) {
By(fmt.Sprintf("validating unique: %v", s))
sort.Strings(s)