Implement a ServiceController that watches services and handles keeping

external load balancers up-to-date based on the service's specs, using
the new DeltaFIFO watch queue class. Remove the old registry REST
handler code for creating/updating/deleting load balancers.

Also clean up a bunch of the GCE cloudprovider code related to load balancers.
This commit is contained in:
Alex Robinson 2015-03-24 17:32:43 +00:00
parent a11106edd3
commit ccc300289f
12 changed files with 683 additions and 220 deletions

View File

@ -6,12 +6,16 @@
{% set master="--master=127.0.0.1:8080" -%}
{% set machines = ""-%}
{% set cluster_name = "" -%}
{% set minion_regexp = "--minion_regexp=.*" -%}
{% set sync_nodes = "--sync_nodes=true" -%}
{% if pillar['node_instance_prefix'] is defined -%}
{% set minion_regexp = "--minion_regexp='" + pillar['node_instance_prefix'] + ".*'" -%}
{% endif -%}
{% if pillar['instance_prefix'] is defined -%}
{% set cluster_name = "--cluster_name=" + pillar['instance_prefix'] -%}
{% endif -%}
{% set cloud_provider = "" -%}
{% set cloud_config = "" -%}
@ -49,4 +53,4 @@
{% endif -%} # grains.cloud is defined
DAEMON_ARGS="{{daemon_args}} {{master}} {{machines}} {{ minion_regexp }} {{ cloud_provider }} {{ sync_nodes }} {{ cloud_config }} {{pillar['log_level']}}"
DAEMON_ARGS="{{daemon_args}} {{master}} {{machines}} {{cluster_name}} {{ minion_regexp }} {{ cloud_provider }} {{ sync_nodes }} {{ cloud_config }} {{pillar['log_level']}}"

View File

@ -31,6 +31,7 @@ import (
"github.com/GoogleCloudPlatform/kubernetes/pkg/client"
"github.com/GoogleCloudPlatform/kubernetes/pkg/cloudprovider"
"github.com/GoogleCloudPlatform/kubernetes/pkg/cloudprovider/nodecontroller"
"github.com/GoogleCloudPlatform/kubernetes/pkg/cloudprovider/servicecontroller"
replicationControllerPkg "github.com/GoogleCloudPlatform/kubernetes/pkg/controller"
"github.com/GoogleCloudPlatform/kubernetes/pkg/master/ports"
"github.com/GoogleCloudPlatform/kubernetes/pkg/namespace"
@ -49,6 +50,7 @@ type CMServer struct {
ClientConfig client.Config
CloudProvider string
CloudConfigFile string
ClusterName string
MinionRegexp string
NodeSyncPeriod time.Duration
ResourceQuotaSyncPeriod time.Duration
@ -100,6 +102,7 @@ func (s *CMServer) AddFlags(fs *pflag.FlagSet) {
client.BindClientConfigFlags(fs, &s.ClientConfig)
fs.StringVar(&s.CloudProvider, "cloud_provider", s.CloudProvider, "The provider for cloud services. Empty string for no provider.")
fs.StringVar(&s.CloudConfigFile, "cloud_config", s.CloudConfigFile, "The path to the cloud provider configuration file. Empty string for no configuration file.")
fs.StringVar(&s.ClusterName, "cluster_name", s.ClusterName, "The instance prefix for the cluster")
fs.StringVar(&s.MinionRegexp, "minion_regexp", s.MinionRegexp, "If non empty, and --cloud_provider is specified, a regular expression for matching minion VMs.")
fs.DurationVar(&s.NodeSyncPeriod, "node_sync_period", s.NodeSyncPeriod, ""+
"The period for syncing nodes from cloudprovider. Longer periods will result in "+
@ -192,6 +195,11 @@ func (s *CMServer) Run(_ []string) error {
s.NodeMonitorGracePeriod, s.NodeStartupGracePeriod, s.NodeMonitorPeriod, s.ClusterName)
nodeController.Run(s.NodeSyncPeriod, s.SyncNodeList)
serviceController := servicecontroller.New(cloud, kubeClient, s.ClusterName)
if err := serviceController.Run(); err != nil {
glog.Errorf("Failed to start service controller: %v", err)
}
resourceQuotaManager := resourcequota.NewResourceQuotaManager(kubeClient)
resourceQuotaManager.Run(s.ResourceQuotaSyncPeriod)

View File

@ -35,6 +35,7 @@ import (
"github.com/GoogleCloudPlatform/kubernetes/pkg/apiserver"
"github.com/GoogleCloudPlatform/kubernetes/pkg/client"
"github.com/GoogleCloudPlatform/kubernetes/pkg/cloudprovider/nodecontroller"
"github.com/GoogleCloudPlatform/kubernetes/pkg/cloudprovider/servicecontroller"
"github.com/GoogleCloudPlatform/kubernetes/pkg/controller"
"github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/cadvisor"
"github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/dockertools"
@ -132,6 +133,11 @@ func runControllerManager(machineList []string, cl *client.Client, nodeMilliCPU,
nil, "", machineList, nodeResources, cl, 10, 5*time.Minute, util.NewTokenBucketRateLimiter(*deletingPodsQps, *deletingPodsBurst), 40*time.Second, 60*time.Second, 5*time.Second, "")
nodeController.Run(10*time.Second, true)
serviceController := servicecontroller.New(nil, cl, "kubernetes")
if err := serviceController.Run(); err != nil {
glog.Warningf("Running without a service controller: %v", err)
}
endpoints := service.NewEndpointController(cl)
go util.Forever(func() { endpoints.SyncServiceEndpoints() }, time.Second*10)

View File

@ -50,6 +50,8 @@ func GetLoadBalancerName(clusterName, serviceNamespace, serviceName string) stri
type TCPLoadBalancer interface {
// TCPLoadBalancerExists returns whether the specified load balancer exists.
// TODO: Break this up into different interfaces (LB, etc) when we have more than one type of service
// TODO: This should really return the details of the load balancer so we can
// determine if it matches the needs of a service rather than if it exists.
TCPLoadBalancerExists(name, region string) (bool, error)
// CreateTCPLoadBalancer creates a new tcp load balancer. Returns the IP address or hostname of the balancer
CreateTCPLoadBalancer(name, region string, externalIP net.IP, ports []int, hosts []string, affinityType api.AffinityType) (string, error)

View File

@ -17,7 +17,6 @@ limitations under the License.
package gce_cloud
import (
"errors"
"fmt"
"io"
"io/ioutil"
@ -37,6 +36,7 @@ import (
"code.google.com/p/gcfg"
compute "code.google.com/p/google-api-go-client/compute/v1"
container "code.google.com/p/google-api-go-client/container/v1beta1"
"code.google.com/p/google-api-go-client/googleapi"
"github.com/golang/glog"
"golang.org/x/oauth2"
"golang.org/x/oauth2/google"
@ -196,7 +196,7 @@ const (
GCEAffinityTypeClientIPProto GCEAffinityType = "CLIENT_IP_PROTO"
)
func (gce *GCECloud) makeTargetPool(name, region string, hosts []string, affinityType GCEAffinityType) (string, error) {
func (gce *GCECloud) makeTargetPool(name, region string, hosts []string, affinityType GCEAffinityType) error {
var instances []string
for _, host := range hosts {
instances = append(instances, makeHostLink(gce.projectID, gce.zone, host))
@ -208,13 +208,16 @@ func (gce *GCECloud) makeTargetPool(name, region string, hosts []string, affinit
}
op, err := gce.service.TargetPools.Insert(gce.projectID, region, pool).Do()
if err != nil {
return "", err
return err
}
if err = gce.waitForRegionOp(op, region); err != nil {
return "", err
return err
}
link := fmt.Sprintf("https://www.googleapis.com/compute/v1/projects/%s/regions/%s/targetPools/%s", gce.projectID, region, name)
return link, nil
return nil
}
// targetPoolURL builds the fully-qualified Compute API URL of the target pool
// with the given name in the given region of this cloud's project.
func (gce *GCECloud) targetPoolURL(name, region string) string {
	const urlFormat = "https://www.googleapis.com/compute/v1/projects/%s/regions/%s/targetPools/%s"
	return fmt.Sprintf(urlFormat, gce.projectID, region, name)
}
func (gce *GCECloud) waitForRegionOp(op *compute.Operation, region string) error {
@ -228,7 +231,10 @@ func (gce *GCECloud) waitForRegionOp(op *compute.Operation, region string) error
}
}
if pollOp.Error != nil && len(pollOp.Error.Errors) > 0 {
return errors.New(pollOp.Error.Errors[0].Message)
return &googleapi.Error{
Code: int(pollOp.HttpErrorStatusCode),
Message: pollOp.Error.Errors[0].Message,
}
}
return nil
}
@ -236,10 +242,21 @@ func (gce *GCECloud) waitForRegionOp(op *compute.Operation, region string) error
// TCPLoadBalancerExists is an implementation of TCPLoadBalancer.TCPLoadBalancerExists.
// It looks up the forwarding rule with the given name: a successful lookup
// reports true, an HTTP 404 reports false with no error, and any other failure
// is returned to the caller.
func (gce *GCECloud) TCPLoadBalancerExists(name, region string) (bool, error) {
	_, err := gce.service.ForwardingRules.Get(gce.projectID, region, name).Do()
	switch {
	case err == nil:
		return true, nil
	case isHTTPErrorCode(err, http.StatusNotFound):
		return false, nil
	default:
		return false, err
	}
}
//translate from what K8s supports to what the cloud provider supports for session affinity.
// isHTTPErrorCode reports whether err is a *googleapi.Error whose Code equals
// the given HTTP status code.
func isHTTPErrorCode(err error, code int) bool {
	if apiErr, ok := err.(*googleapi.Error); ok {
		return apiErr.Code == code
	}
	return false
}
// translate from what K8s supports to what the cloud provider supports for session affinity.
func translateAffinityType(affinityType api.AffinityType) GCEAffinityType {
switch affinityType {
case api.AffinityTypeClientIP:
@ -253,10 +270,15 @@ func translateAffinityType(affinityType api.AffinityType) GCEAffinityType {
}
// CreateTCPLoadBalancer is an implementation of TCPLoadBalancer.CreateTCPLoadBalancer.
// TODO(a-robinson): Don't just ignore specified IP addresses. Check if they're
// owned by the project and available to be used, and use them if they are.
func (gce *GCECloud) CreateTCPLoadBalancer(name, region string, externalIP net.IP, ports []int, hosts []string, affinityType api.AffinityType) (string, error) {
pool, err := gce.makeTargetPool(name, region, hosts, translateAffinityType(affinityType))
err := gce.makeTargetPool(name, region, hosts, translateAffinityType(affinityType))
if err != nil {
return "", err
if !isHTTPErrorCode(err, http.StatusConflict) {
return "", err
}
glog.Infof("Creating forwarding rule pointing at target pool that already exists: %v", err)
}
if len(ports) == 0 {
@ -276,18 +298,17 @@ func (gce *GCECloud) CreateTCPLoadBalancer(name, region string, externalIP net.I
Name: name,
IPProtocol: "TCP",
PortRange: fmt.Sprintf("%d-%d", minPort, maxPort),
Target: pool,
}
if len(externalIP) > 0 {
req.IPAddress = externalIP.String()
Target: gce.targetPoolURL(name, region),
}
op, err := gce.service.ForwardingRules.Insert(gce.projectID, region, req).Do()
if err != nil {
if err != nil && !isHTTPErrorCode(err, http.StatusConflict) {
return "", err
}
err = gce.waitForRegionOp(op, region)
if err != nil {
return "", err
if op != nil {
err = gce.waitForRegionOp(op, region)
if err != nil && !isHTTPErrorCode(err, http.StatusConflict) {
return "", err
}
}
fwd, err := gce.service.ForwardingRules.Get(gce.projectID, region, name).Do()
if err != nil {
@ -343,23 +364,28 @@ func (gce *GCECloud) UpdateTCPLoadBalancer(name, region string, hosts []string)
// DeleteTCPLoadBalancer is an implementation of TCPLoadBalancer.DeleteTCPLoadBalancer.
func (gce *GCECloud) DeleteTCPLoadBalancer(name, region string) error {
op, err := gce.service.ForwardingRules.Delete(gce.projectID, region, name).Do()
if err != nil {
glog.Warningln("Failed to delete Forwarding Rules %s: got error %s. Trying to delete Target Pool", name, err.Error())
if err != nil && isHTTPErrorCode(err, http.StatusNotFound) {
glog.Infof("Forwarding rule %s already deleted. Continuing to delete target pool.", name)
} else if err != nil {
glog.Warningf("Failed to delete Forwarding Rules %s: got error %s.", name, err.Error())
return err
} else {
err = gce.waitForRegionOp(op, region)
if err != nil {
glog.Warningln("Failed waiting for Forwarding Rule %s to be deleted: got error %s. Trying to delete Target Pool", name, err.Error())
glog.Warningf("Failed waiting for Forwarding Rule %s to be deleted: got error %s.", name, err.Error())
}
}
op, err = gce.service.TargetPools.Delete(gce.projectID, region, name).Do()
if err != nil {
glog.Warningln("Failed to delete Target Pool %s, got error %s.", name, err.Error())
if err != nil && isHTTPErrorCode(err, http.StatusNotFound) {
glog.Infof("Target pool %s already deleted.", name)
return nil
} else if err != nil {
glog.Warningf("Failed to delete Target Pool %s, got error %s.", name, err.Error())
return err
}
err = gce.waitForRegionOp(op, region)
if err != nil {
glog.Warningln("Failed waiting for Target Pool %s to be deleted: got error %s.", name, err.Error())
glog.Warningf("Failed waiting for Target Pool %s to be deleted: got error %s.", name, err.Error())
}
return err
}

View File

@ -0,0 +1,19 @@
/*
Copyright 2015 Google Inc. All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
// Package servicecontroller contains code for syncing cloud load balancers
// with the service registry.
package servicecontroller

View File

@ -0,0 +1,434 @@
/*
Copyright 2015 Google Inc. All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package servicecontroller
import (
"fmt"
"net"
"sync"
"time"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
"github.com/GoogleCloudPlatform/kubernetes/pkg/client"
"github.com/GoogleCloudPlatform/kubernetes/pkg/client/cache"
"github.com/GoogleCloudPlatform/kubernetes/pkg/cloudprovider"
"github.com/GoogleCloudPlatform/kubernetes/pkg/fields"
"github.com/GoogleCloudPlatform/kubernetes/pkg/labels"
"github.com/golang/glog"
)
// Retry tuning for API writes, plus named values for the "should retry"
// boolean returned alongside errors by the delta-processing functions.
const (
// clientRetryCount bounds the number of update attempts made by persistUpdate.
clientRetryCount = 5
// clientRetryInterval is how long persistUpdate sleeps after a failed attempt.
clientRetryInterval = 5 * time.Second
// retryable and notRetryable give readable names to the boolean retry
// indicator returned by processDelta and the functions it calls.
retryable = true
notRetryable = false
)
// ServiceController watches services and creates or deletes cloud provider
// TCP load balancers for them, remembering the services it has handled in
// serviceMap.
type ServiceController struct {
// cloud, kubeClient and clusterName are supplied by the caller of New.
cloud cloudprovider.Interface
kubeClient client.Interface
clusterName string
// balancer and zone are obtained from cloud by init.
balancer cloudprovider.TCPLoadBalancer
zone cloudprovider.Zone
mu sync.Mutex // protects serviceMap
serviceMap map[string]*api.Service // keys generated by cache.MetaNamespaceKeyFunc
}
// New constructs a ServiceController that will keep cloud provider service
// resources (like external load balancers) in sync with the registry. The
// returned controller starts with an empty service cache; the load balancer
// and zone handles are only populated later by init.
func New(cloud cloudprovider.Interface, kubeClient client.Interface, clusterName string) *ServiceController {
	controller := &ServiceController{serviceMap: make(map[string]*api.Service)}
	controller.cloud = cloud
	controller.kubeClient = kubeClient
	controller.clusterName = clusterName
	return controller
}
// Run initializes the controller and then launches watchServices on a
// background goroutine, which watches for changes to services that have (or
// had) externalLoadBalancers=true and ensures that their external load
// balancers are created and deleted appropriately.
//
// It returns an error, without starting the goroutine, if initialization
// fails or if kubeClient is not a concrete *client.Client.
func (s *ServiceController) Run() error {
	err := s.init()
	if err != nil {
		return err
	}

	// This check is required because the ListWatch built in watchServices
	// needs Client functions that are not part of the client Interface.
	_, isRealClient := s.kubeClient.(*client.Client)
	if !isRealClient {
		return fmt.Errorf("ServiceController only works with real Client objects, but was passed something else satisfying the client Interface.")
	}

	go s.watchServices()
	return nil
}
// init resolves the cloud provider facilities the controller depends on: the
// TCP load balancer interface and the zone the cloud provider reports. It
// returns an error if no cloud provider was supplied, if either facility is
// unsupported, or if the zone lookup fails.
func (s *ServiceController) init() error {
	if s.cloud == nil {
		return fmt.Errorf("ServiceController should not be run without a cloudprovider.")
	}

	lb, supported := s.cloud.TCPLoadBalancer()
	if !supported {
		return fmt.Errorf("the cloud provider does not support external TCP load balancers.")
	}
	s.balancer = lb

	zoneProvider, supported := s.cloud.Zones()
	if !supported {
		return fmt.Errorf("the cloud provider does not support zone enumeration, which is required for creating external load balancers.")
	}

	currentZone, err := zoneProvider.GetZone()
	if err != nil {
		return fmt.Errorf("failed to get zone from cloud provider, will not be able to create external load balancers: %v", err)
	}
	s.zone = currentZone
	return nil
}
// watchServices lists and watches services in all namespaces through a
// Reflector feeding a DeltaFIFO, then loops forever popping queued deltas and
// handing the newest delta of each item to processDelta. Items whose
// processing fails with a retryable error are re-queued after a short pause.
// The loop has no exit, which is why Run invokes it on its own goroutine.
func (s *ServiceController) watchServices() {
	// Get the currently existing set of services and then all future creates
	// and updates of services.
	// TODO: Add a compressor that intelligently squashes together updates?
	keyLister := cache.KeyListerFunc(func() []string { return s.listKeys() })
	serviceQueue := cache.NewDeltaFIFO(cache.MetaNamespaceKeyFunc, nil, keyLister)
	lw := cache.NewListWatchFromClient(s.kubeClient.(*client.Client), "services", api.NamespaceAll, fields.Everything())
	cache.NewReflector(lw, &api.Service{}, serviceQueue, 0).Run()
	// TODO: Add proper retries rather than just re-adding to the queue?
	for {
		newItem := serviceQueue.Pop()
		deltas, ok := newItem.(cache.Deltas)
		if !ok {
			glog.Errorf("Received object from service watcher that wasn't Deltas: %+v", newItem)
			// There is nothing usable in this item; move on instead of
			// operating on the zero-value Deltas and logging a second,
			// misleading "nil delta" error.
			continue
		}
		delta := deltas.Newest()
		if delta == nil {
			glog.Errorf("Received nil delta from watcher queue.")
			continue
		}
		err, shouldRetry := s.processDelta(delta)
		if shouldRetry {
			// Add the failed service back to the queue so we'll retry it.
			glog.Errorf("Failed to process service delta. Retrying: %v", err)
			time.Sleep(5 * time.Second)
			serviceQueue.AddIfNotPresent(deltas)
		} else if err != nil {
			glog.Errorf("Failed to process service delta. Not retrying: %v", err)
		}
	}
}
// processDelta applies a single delta from the service queue.
//
// Added, Updated and Sync deltas are routed to createLoadBalancerIfNeeded;
// Deleted deltas are routed to handleDelete. When the delta carries a
// cache.DeletedFinalStateUnknown marker instead of a service, the service is
// looked up in the controller's own cache by the marker's key and written
// back into delta.Object.
//
// Returns an error if processing the delta failed, along with a boolean
// indicator of whether the processing should be retried.
func (s *ServiceController) processDelta(delta *cache.Delta) (error, bool) {
	service, ok := delta.Object.(*api.Service)
	if !ok {
		// If the DeltaFIFO saw a key in our cache that it didn't know about, it
		// can send a deletion with an unknown state. Grab the service from our
		// cache for deleting.
		tombstone, isTombstone := delta.Object.(cache.DeletedFinalStateUnknown)
		if !isTombstone {
			return fmt.Errorf("Delta contained object that wasn't a service or a deleted key: %+v", delta), notRetryable
		}
		cached, found := s.getService(tombstone.Key)
		if !found {
			// Report the key itself; formatting the whole marker struct with
			// %s produces an unreadable message.
			return fmt.Errorf("Service %s not in cache even though the watcher thought it was. Ignoring the deletion.", tombstone.Key), notRetryable
		}
		service = cached
		delta.Object = cached
	}
	glog.V(2).Infof("Got new %s delta for service: %+v", delta.Type, service)

	// TODO: Make this more parallel. The only things that need to serialized
	// are changes to services with the same namespace and name.
	// TODO: Handle added, updated, and sync differently?
	switch delta.Type {
	case cache.Added, cache.Updated, cache.Sync:
		return s.createLoadBalancerIfNeeded(service)
	case cache.Deleted:
		return s.handleDelete(service)
	default:
		glog.Errorf("Unexpected delta type: %v", delta.Type)
	}
	return nil, notRetryable
}
// createLoadBalancerIfNeeded reconciles the external load balancer for the
// given service against what the controller has cached for it.
//
//   - A cached service that needs no update is left alone.
//   - A cached service that needs an update has its old load balancer deleted
//     (if it had one) so it can be recreated.
//   - An uncached service whose load balancer already exists is assumed to be
//     fine, unless the service has no PublicIPs, in which case the balancer is
//     deleted and recreated.
//
// If the service wants a load balancer, one is then created; when that
// changes Spec.PublicIPs the service is cached and persisted to the apiserver.
//
// Returns whatever error occurred along with a boolean indicator of whether it
// should be retried.
func (s *ServiceController) createLoadBalancerIfNeeded(service *api.Service) (error, bool) {
	namespacedName, err := cache.MetaNamespaceKeyFunc(service)
	if err != nil {
		return fmt.Errorf("Couldn't generate namespaced name for service: %v", err), notRetryable
	}

	cachedService, cached := s.getService(namespacedName)
	if cached && !needsUpdate(cachedService, service) {
		glog.Infof("LB already exists and doesn't need update for service %s", namespacedName)
		return nil, notRetryable
	}
	if cached {
		// If the service already exists but needs to be updated, delete it so that
		// we can recreate it cleanly.
		if cachedService.Spec.CreateExternalLoadBalancer {
			glog.Infof("Deleting existing load balancer for service %s that needs an updated load balancer.", namespacedName)
			if err := s.ensureLBDeleted(cachedService); err != nil {
				return err, retryable
			}
		}
	} else {
		// If we don't have any cached memory of the load balancer and it already
		// exists, optimistically consider our work done.
		// TODO: If we could read the spec of the existing load balancer, we could
		// determine if an update is necessary.
		exists, err := s.balancer.TCPLoadBalancerExists(s.loadBalancerName(service), s.zone.Region)
		if err != nil {
			// Keep the provider's error in the message so the failure can be
			// diagnosed from the logs.
			return fmt.Errorf("Error getting LB for service %s: %v", namespacedName, err), retryable
		}
		if exists && len(service.Spec.PublicIPs) == 0 {
			// The load balancer exists, but we apparently don't know about its public
			// IPs, so just delete it and recreate it to get back to a sane state.
			// TODO: Ideally the cloud provider interface would return the IP for us.
			glog.Infof("Deleting old LB for service with no public IPs %s", namespacedName)
			if err := s.ensureLBDeleted(service); err != nil {
				return err, retryable
			}
		} else if exists {
			// TODO: Better handle updates for non-cached services, this is optimistic.
			glog.Infof("LB already exists for service %s", namespacedName)
			return nil, notRetryable
		}
	}

	if !service.Spec.CreateExternalLoadBalancer {
		glog.Infof("Not creating LB for service %s that doesn't want one.", namespacedName)
		return nil, notRetryable
	}

	glog.V(2).Infof("Creating LB for service %s", namespacedName)

	// The load balancer doesn't exist yet, so create it. Snapshot the public
	// IPs first so we can tell afterwards whether creation changed them.
	publicIPstring := fmt.Sprint(service.Spec.PublicIPs)
	err = s.createExternalLoadBalancer(service)
	if err != nil {
		return fmt.Errorf("failed to create external load balancer for service %s: %v", namespacedName, err), retryable
	}

	if publicIPstring == fmt.Sprint(service.Spec.PublicIPs) {
		glog.Infof("Not persisting unchanged service to registry.")
		return nil, notRetryable
	}
	s.setService(namespacedName, service)

	// If creating the load balancer succeeded, persist the updated service.
	if err = s.persistUpdate(service); err != nil {
		return fmt.Errorf("Failed to persist updated publicIPs to apiserver, even after retries. Giving up: %v", err), notRetryable
	}
	return nil, notRetryable
}
// persistUpdate writes the service back through the API client, making up to
// clientRetryCount attempts and pausing clientRetryInterval between them. It
// returns nil on the first successful update, or the last error once all
// attempts are exhausted.
//
// TODO(a-robinson): Handle repeated failures due to ResourceVersion changes or
// the object having been deleted.
func (s *ServiceController) persistUpdate(service *api.Service) error {
	var err error
	for attempt := 0; attempt < clientRetryCount; attempt++ {
		// Pause only between attempts; sleeping after the final failure would
		// just delay returning the error to the caller.
		if attempt > 0 {
			time.Sleep(clientRetryInterval)
		}
		_, err = s.kubeClient.Services(service.Namespace).Update(service)
		if err == nil {
			return nil
		}
		glog.Warningf("Failed to persist updated PublicIPs to service %s after creating its external load balancer: %v",
			service.Name, err)
	}
	return err
}
// createExternalLoadBalancer asks the cloud provider for a TCP load balancer
// covering the service's ports and every node returned by the API client, then
// replaces service.Spec.PublicIPs with the single endpoint the provider
// returned. If the service already lists public IPs, the first one is parsed
// and passed along as the requested external IP; otherwise none is requested.
func (s *ServiceController) createExternalLoadBalancer(service *api.Service) error {
	ports, err := getTCPPorts(service)
	if err != nil {
		return err
	}
	nodes, err := s.kubeClient.Nodes().List(labels.Everything())
	if err != nil {
		return err
	}
	name := s.loadBalancerName(service)

	// TODO: Make this actually work for multiple IPs by using different
	// names for each. For now, only the first public IP is used.
	var requestedIP net.IP
	if len(service.Spec.PublicIPs) > 0 {
		requestedIP = net.ParseIP(service.Spec.PublicIPs[0])
	}

	endpoint, err := s.balancer.CreateTCPLoadBalancer(name, s.zone.Region, requestedIP,
		ports, hostsFromNodeList(nodes), service.Spec.SessionAffinity)
	if err != nil {
		return err
	}
	service.Spec.PublicIPs = []string{endpoint}
	return nil
}
// handleDelete tears down the load balancer belonging to a deleted service and
// then drops the service from the controller's cache.
//
// Returns whatever error occurred along with a boolean indicator of whether it
// should be retried.
func (s *ServiceController) handleDelete(service *api.Service) (error, bool) {
	deleteErr := s.ensureLBDeleted(service)
	if deleteErr != nil {
		return deleteErr, retryable
	}

	key, keyErr := cache.MetaNamespaceKeyFunc(service)
	if keyErr != nil {
		// This is panic-worthy, since the queue shouldn't have been able to
		// handle the service if it couldn't generate a name for it.
		return fmt.Errorf("Couldn't generate namespaced name for service: %v", keyErr), notRetryable
	}

	s.deleteService(key)
	return nil, notRetryable
}
// ensureLBDeleted makes sure the load balancer associated with the given
// service is gone, deleting it through the cloud provider if it still exists.
func (s *ServiceController) ensureLBDeleted(service *api.Service) error {
	// The existence check is only needed because not all delete load balancer
	// implementations are currently idempotent to the LB not existing.
	present, err := s.balancer.TCPLoadBalancerExists(s.loadBalancerName(service), s.zone.Region)
	if err != nil {
		return err
	}
	if !present {
		return nil
	}
	return s.balancer.DeleteTCPLoadBalancer(s.loadBalancerName(service), s.zone.Region)
}
// listKeys returns the keys of every service currently held in serviceMap.
// It backs the key lister handed to the DeltaFIFO in watchServices.
func (s *ServiceController) listKeys() []string {
	s.mu.Lock()
	defer s.mu.Unlock()

	known := make([]string, 0, len(s.serviceMap))
	for name := range s.serviceMap {
		known = append(known, name)
	}
	return known
}
// getService looks up the cached service stored under serviceName, reporting
// whether an entry was present. Access to the map is guarded by s.mu.
func (s *ServiceController) getService(serviceName string) (*api.Service, bool) {
	s.mu.Lock()
	defer s.mu.Unlock()

	svc, found := s.serviceMap[serviceName]
	return svc, found
}
// setService stores info in the cache under serviceName, replacing any
// existing entry. Access to the map is guarded by s.mu.
func (s *ServiceController) setService(serviceName string, info *api.Service) {
	s.mu.Lock()
	s.serviceMap[serviceName] = info
	s.mu.Unlock()
}
// deleteService removes the cache entry stored under serviceName, if any.
// Access to the map is guarded by s.mu.
func (s *ServiceController) deleteService(serviceName string) {
	s.mu.Lock()
	delete(s.serviceMap, serviceName)
	s.mu.Unlock()
}
// needsUpdate reports whether the load balancer for a service must be
// recreated when moving from oldService to newService. Nothing is needed when
// neither version wants a load balancer; otherwise any difference in the
// load-balancer flag, ports, session affinity, or the ordered list of public
// IPs counts as a change.
func needsUpdate(oldService *api.Service, newService *api.Service) bool {
	oldSpec, newSpec := &oldService.Spec, &newService.Spec

	switch {
	case !oldSpec.CreateExternalLoadBalancer && !newSpec.CreateExternalLoadBalancer:
		return false
	case oldSpec.CreateExternalLoadBalancer != newSpec.CreateExternalLoadBalancer:
		return true
	case !portsEqual(oldService, newService):
		return true
	case oldSpec.SessionAffinity != newSpec.SessionAffinity:
		return true
	case len(oldSpec.PublicIPs) != len(newSpec.PublicIPs):
		return true
	}

	for i, oldIP := range oldSpec.PublicIPs {
		if oldIP != newSpec.PublicIPs[i] {
			return true
		}
	}
	return false
}
// loadBalancerName asks the cloud provider for the load balancer name that
// corresponds to this cluster and the service's namespace and name.
//
// TODO: Use a shorter name that's less likely to be longer than cloud
// providers' length limits.
func (s *ServiceController) loadBalancerName(service *api.Service) string {
	namespace, name := service.Namespace, service.Name
	return s.cloud.GetLoadBalancerName(s.clusterName, namespace, name)
}
// getTCPPorts collects the port numbers of the service in declaration order.
// It returns an error, and no ports, as soon as it meets a port whose protocol
// is not TCP. A service without ports yields an empty, non-nil slice.
//
// TODO: Deduplicate this with the copy in pkg/registry/service/rest.go.
func getTCPPorts(service *api.Service) ([]int, error) {
	servicePorts := service.Spec.Ports
	ports := make([]int, 0, len(servicePorts))
	for _, sp := range servicePorts {
		// TODO: Support UDP. Remove the check from the API validation package once
		// it's supported.
		if sp.Protocol != api.ProtocolTCP {
			return nil, fmt.Errorf("external load balancers for non TCP services are not currently supported.")
		}
		ports = append(ports, sp.Port)
	}
	return ports, nil
}
// portsEqual reports whether two services expose the same TCP ports,
// regardless of ordering. Services for which getTCPPorts fails (i.e. that
// contain a non-TCP port) are never considered equal.
//
// The comparison is multiset-based: each port must occur the same number of
// times on both sides, so [80, 443] and [80, 80] are correctly reported as
// different even though they have the same length.
func portsEqual(x, y *api.Service) bool {
	xPorts, err := getTCPPorts(x)
	if err != nil {
		return false
	}
	yPorts, err := getTCPPorts(y)
	if err != nil {
		return false
	}
	if len(xPorts) != len(yPorts) {
		return false
	}

	// Count occurrences rather than sorting since port slices aren't
	// necessarily sorted and may contain repeats.
	remaining := make(map[int]int, len(xPorts))
	for _, port := range xPorts {
		remaining[port]++
	}
	for _, port := range yPorts {
		if remaining[port] == 0 {
			return false
		}
		remaining[port]--
	}
	// Equal lengths plus every y port consuming a distinct x port means the
	// two multisets match.
	return true
}
// hostsFromNodeList extracts the Name of every node in the list, preserving
// the list's order.
func hostsFromNodeList(list *api.NodeList) []string {
	names := make([]string, 0, len(list.Items))
	for i := range list.Items {
		names = append(names, list.Items[i].Name)
	}
	return names
}

View File

@ -0,0 +1,126 @@
/*
Copyright 2015 Google Inc. All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package servicecontroller
import (
"testing"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
"github.com/GoogleCloudPlatform/kubernetes/pkg/client/testclient"
fake_cloud "github.com/GoogleCloudPlatform/kubernetes/pkg/cloudprovider/fake"
)
const region = "us-central"
// TestCreateExternalLoadBalancer drives createLoadBalancerIfNeeded against a
// fake cloud and a fake API client for three services: one that does not want
// a load balancer, one with a UDP port (expected to fail), and a plain TCP
// service (expected to create one balancer and send an update-service action).
func TestCreateExternalLoadBalancer(t *testing.T) {
	table := []struct {
		service             *api.Service
		expectErr           bool
		expectCreateAttempt bool
	}{
		{
			service: &api.Service{
				ObjectMeta: api.ObjectMeta{
					Name:      "no-external-balancer",
					Namespace: "default",
				},
				Spec: api.ServiceSpec{
					CreateExternalLoadBalancer: false,
				},
			},
			expectErr:           false,
			expectCreateAttempt: false,
		},
		{
			service: &api.Service{
				ObjectMeta: api.ObjectMeta{
					Name:      "udp-service",
					Namespace: "default",
				},
				Spec: api.ServiceSpec{
					Ports: []api.ServicePort{{
						Port:     80,
						Protocol: api.ProtocolUDP,
					}},
					CreateExternalLoadBalancer: true,
				},
			},
			expectErr:           true,
			expectCreateAttempt: false,
		},
		{
			service: &api.Service{
				ObjectMeta: api.ObjectMeta{
					Name:      "basic-service1",
					Namespace: "default",
				},
				Spec: api.ServiceSpec{
					Ports: []api.ServicePort{{
						Port:     80,
						Protocol: api.ProtocolTCP,
					}},
					CreateExternalLoadBalancer: true,
				},
			},
			expectErr:           false,
			expectCreateAttempt: true,
		},
	}

	for _, item := range table {
		cloud := &fake_cloud.FakeCloud{}
		cloud.Region = region
		client := &testclient.Fake{}
		controller := New(cloud, client, "test-cluster")
		// A failed init leaves the controller without a balancer or zone, so
		// stop here rather than letting later assertions fail confusingly.
		if err := controller.init(); err != nil {
			t.Fatalf("unexpected error initializing service controller: %v", err)
		}
		cloud.Calls = nil    // ignore any cloud calls made in init()
		client.Actions = nil // ignore any client calls made in init()
		err, _ := controller.createLoadBalancerIfNeeded(item.service)
		if !item.expectErr && err != nil {
			t.Errorf("unexpected error: %v", err)
		} else if item.expectErr && err == nil {
			t.Errorf("expected error creating %v, got nil", item.service)
		}
		if !item.expectCreateAttempt {
			if len(cloud.Calls) > 0 {
				t.Errorf("unexpected cloud provider calls: %v", cloud.Calls)
			}
			if len(client.Actions) > 0 {
				t.Errorf("unexpected client actions: %v", client.Actions)
			}
		} else {
			if len(cloud.Balancers) != 1 {
				t.Errorf("expected one load balancer to be created, got %v", cloud.Balancers)
			} else if cloud.Balancers[0].Name != controller.loadBalancerName(item.service) ||
				cloud.Balancers[0].Region != region ||
				cloud.Balancers[0].Ports[0] != item.service.Spec.Ports[0].Port {
				t.Errorf("created load balancer has incorrect parameters: %v", cloud.Balancers[0])
			}
			actionFound := false
			for _, action := range client.Actions {
				if action.Action == "update-service" {
					actionFound = true
				}
			}
			if !actionFound {
				t.Errorf("expected updated service to be sent to client, got these actions instead: %v", client.Actions)
			}
		}
	}
}
// TODO(a-robinson): Add tests for update/sync/delete.

View File

@ -14,6 +14,6 @@ See the License for the specific language governing permissions and
limitations under the License.
*/
// Package service provides Registry interface and it's RESTStorage
// Package service provides the Registry interface and its RESTStorage
// implementation for storing Service api objects.
package service

View File

@ -42,7 +42,8 @@ import (
// REST adapts a service registry into apiserver's RESTStorage model.
type REST struct {
registry Registry
registry Registry
// TODO(a-robinson): Remove cloud
cloud cloudprovider.Interface
machines minion.Registry
endpoints endpoint.Registry
@ -97,6 +98,14 @@ func (rs *REST) Create(ctx api.Context, obj runtime.Object) (runtime.Object, err
return nil, err
}
// Make sure that we'll be able to create a load balancer for the service,
// even though it'll be created by the ServiceController.
if service.Spec.CreateExternalLoadBalancer {
if _, err := getTCPPorts(service); err != nil {
return nil, err
}
}
releaseServiceIP := false
defer func() {
if releaseServiceIP {
@ -123,15 +132,6 @@ func (rs *REST) Create(ctx api.Context, obj runtime.Object) (runtime.Object, err
releaseServiceIP = true
}
// TODO: Move this to post-creation rectification loop, so that we make/remove external load balancers
// correctly no matter what http operations happen.
if service.Spec.CreateExternalLoadBalancer {
err := rs.createExternalLoadBalancer(ctx, service)
if err != nil {
return nil, err
}
}
out, err := rs.registry.CreateService(ctx, service)
if err != nil {
err = rest.CheckGeneratedNameError(rest.Services, err, service)
@ -144,14 +144,6 @@ func (rs *REST) Create(ctx api.Context, obj runtime.Object) (runtime.Object, err
return out, err
}
func hostsFromMinionList(list *api.NodeList) []string {
result := make([]string, len(list.Items))
for ix := range list.Items {
result[ix] = list.Items[ix].Name
}
return result
}
func (rs *REST) Delete(ctx api.Context, id string) (runtime.Object, error) {
service, err := rs.registry.GetService(ctx, id)
if err != nil {
@ -160,9 +152,6 @@ func (rs *REST) Delete(ctx api.Context, id string) (runtime.Object, error) {
if api.IsServiceIPSet(service) {
rs.portalMgr.Release(net.ParseIP(service.Spec.PortalIP))
}
if service.Spec.CreateExternalLoadBalancer {
rs.deleteExternalLoadBalancer(ctx, service)
}
return &api.Status{Status: api.StatusSuccess}, rs.registry.DeleteService(ctx, id)
}
@ -219,22 +208,6 @@ func (rs *REST) Update(ctx api.Context, obj runtime.Object) (runtime.Object, boo
if errs := validation.ValidateServiceUpdate(oldService, service); len(errs) > 0 {
return nil, false, errors.NewInvalid("service", service.Name, errs)
}
// Recreate external load balancer if changed.
if externalLoadBalancerNeedsUpdate(oldService, service) {
// TODO: support updating existing balancers
if oldService.Spec.CreateExternalLoadBalancer {
err = rs.deleteExternalLoadBalancer(ctx, oldService)
if err != nil {
return nil, false, err
}
}
if service.Spec.CreateExternalLoadBalancer {
err = rs.createExternalLoadBalancer(ctx, service)
if err != nil {
return nil, false, err
}
}
}
out, err := rs.registry.UpdateService(ctx, service)
return out, false, err
}
@ -283,52 +256,7 @@ func (rs *REST) ResourceLocation(ctx api.Context, id string) (*url.URL, http.Rou
return nil, nil, fmt.Errorf("no endpoints available for %q", id)
}
// createExternalLoadBalancer asks the configured cloud provider to create a
// TCP load balancer for service.
//
// The balancer name comes from cloudprovider.GetLoadBalancerName, built from
// the cluster name, the namespace carried by ctx and the service name. Each
// create call is given the Region of the zone reported by the provider, the
// ports produced by getTCPPorts, the names of all minions returned by the
// minion registry, and the service's session affinity.
//
// If the service already lists PublicIPs, one create call is issued per
// address. Otherwise a single create call is made without a requested address
// and the endpoint it returns is stored as the service's only PublicIP, so the
// caller sees that mutation on service.
//
// An error is returned when no cloud provider is configured, when the provider
// does not offer TCP load balancers or zones, or when getTCPPorts, the minion
// listing, the zone lookup or any create call fails.
func (rs *REST) createExternalLoadBalancer(ctx api.Context, service *api.Service) error {
	if rs.cloud == nil {
		return fmt.Errorf("requested an external service, but no cloud provider supplied.")
	}
	ports, err := getTCPPorts(service)
	if err != nil {
		return err
	}
	balancer, ok := rs.cloud.TCPLoadBalancer()
	if !ok {
		return fmt.Errorf("the cloud provider does not support external TCP load balancers.")
	}
	zones, ok := rs.cloud.Zones()
	if !ok {
		return fmt.Errorf("the cloud provider does not support zone enumeration.")
	}
	minions, err := rs.machines.ListMinions(ctx, labels.Everything(), fields.Everything())
	if err != nil {
		return err
	}
	zone, err := zones.GetZone()
	if err != nil {
		return err
	}

	name := cloudprovider.GetLoadBalancerName(rs.clusterName, api.NamespaceValue(ctx), service.Name)
	var affinityType api.AffinityType = service.Spec.SessionAffinity
	// The host list is the same for every create call, so build it once
	// instead of once per public IP.
	hostNames := hostsFromMinionList(minions)

	if len(service.Spec.PublicIPs) == 0 {
		// No address requested: record whatever endpoint the provider returns.
		endpoint, err := balancer.CreateTCPLoadBalancer(name, zone.Region, nil, ports, hostNames, affinityType)
		if err != nil {
			return err
		}
		service.Spec.PublicIPs = []string{endpoint}
		return nil
	}

	for _, publicIP := range service.Spec.PublicIPs {
		if _, err := balancer.CreateTCPLoadBalancer(name, zone.Region, net.ParseIP(publicIP), ports, hostNames, affinityType); err != nil {
			// TODO: have to roll-back any successful calls.
			return err
		}
	}
	return nil
}
// TODO: Deduplicate with the copy of this in pkg/cloudprovider/servicecontroller
func getTCPPorts(service *api.Service) ([]int, error) {
ports := []int{}
for i := range service.Spec.Ports {
@ -341,71 +269,3 @@ func getTCPPorts(service *api.Service) ([]int, error) {
}
return ports, nil
}
// portsEqual reports whether x and y yield the same TCP port list, in the same
// order, according to getTCPPorts. If getTCPPorts fails for either service the
// two are treated as not equal.
func portsEqual(x, y *api.Service) bool {
	left, err := getTCPPorts(x)
	if err != nil {
		return false
	}
	right, err := getTCPPorts(y)
	if err != nil || len(left) != len(right) {
		return false
	}
	for i, port := range left {
		if right[i] != port {
			return false
		}
	}
	return true
}
// deleteExternalLoadBalancer asks the cloud provider to delete the TCP load
// balancer whose name is derived from the cluster name, the namespace carried
// by ctx and the service name.
//
// It returns an error when no cloud provider is configured, when the zone
// lookup fails, or when the provider's delete call fails. When the provider
// offers no zone or TCP load balancer support, nothing is deleted and nil is
// returned.
func (rs *REST) deleteExternalLoadBalancer(ctx api.Context, service *api.Service) error {
	if rs.cloud == nil {
		return fmt.Errorf("requested an external service, but no cloud provider supplied.")
	}

	// Missing zone or load balancer support should already have made the
	// matching "create" fail, so assume no external balancer was ever created.
	zoneProvider, hasZones := rs.cloud.Zones()
	if !hasZones {
		return nil
	}
	lb, hasBalancer := rs.cloud.TCPLoadBalancer()
	if !hasBalancer {
		return nil
	}

	zone, err := zoneProvider.GetZone()
	if err != nil {
		return err
	}
	lbName := cloudprovider.GetLoadBalancerName(rs.clusterName, api.NamespaceValue(ctx), service.Name)
	return lb.DeleteTCPLoadBalancer(lbName, zone.Region)
}
// externalLoadBalancerNeedsUpdate reports whether the change from oldService to
// newService affects the external load balancer. It returns false when neither
// version asks for one. Otherwise it returns true when the
// CreateExternalLoadBalancer flag, the TCP ports (per portsEqual), the session
// affinity, or the ordered list of PublicIPs differ.
func externalLoadBalancerNeedsUpdate(oldService, newService *api.Service) bool {
	hadBalancer := oldService.Spec.CreateExternalLoadBalancer
	wantsBalancer := newService.Spec.CreateExternalLoadBalancer
	oldIPs, newIPs := oldService.Spec.PublicIPs, newService.Spec.PublicIPs

	switch {
	case !hadBalancer && !wantsBalancer:
		return false
	case hadBalancer != wantsBalancer:
		return true
	case !portsEqual(oldService, newService):
		return true
	case oldService.Spec.SessionAffinity != newService.Spec.SessionAffinity:
		return true
	case len(oldIPs) != len(newIPs):
		return true
	}

	for i, ip := range oldIPs {
		if ip != newIPs[i] {
			return true
		}
	}
	return false
}

View File

@ -19,7 +19,6 @@ package service
import (
"bytes"
"encoding/gob"
"fmt"
"net"
"strings"
"testing"
@ -257,10 +256,11 @@ func TestServiceRegistryExternalService(t *testing.T) {
}},
},
}
if _, err := storage.Create(ctx, svc); err != nil {
t.Fatalf("Unexpected error: %v", err)
_, err := storage.Create(ctx, svc)
if err != nil {
t.Errorf("Failed to create service: %#v", err)
}
if len(fakeCloud.Calls) != 2 || fakeCloud.Calls[0] != "get-zone" || fakeCloud.Calls[1] != "create" {
if len(fakeCloud.Calls) != 0 {
t.Errorf("Unexpected call(s): %#v", fakeCloud.Calls)
}
srv, err := registry.GetService(ctx, svc.Name)
@ -270,38 +270,11 @@ func TestServiceRegistryExternalService(t *testing.T) {
if srv == nil {
t.Errorf("Failed to find service: %s", svc.Name)
}
if len(fakeCloud.Balancers) != 1 || fakeCloud.Balancers[0].Name != "kubernetes-default-foo" || fakeCloud.Balancers[0].Ports[0] != 6502 {
if len(fakeCloud.Balancers) != 0 {
t.Errorf("Unexpected balancer created: %v", fakeCloud.Balancers)
}
}
func TestServiceRegistryExternalServiceError(t *testing.T) {
storage, registry, fakeCloud := NewTestREST(t, nil)
fakeCloud.Err = fmt.Errorf("test error")
svc := &api.Service{
ObjectMeta: api.ObjectMeta{Name: "foo"},
Spec: api.ServiceSpec{
Selector: map[string]string{"bar": "baz"},
CreateExternalLoadBalancer: true,
SessionAffinity: api.AffinityTypeNone,
Ports: []api.ServicePort{{
Port: 6502,
Protocol: api.ProtocolTCP,
}},
},
}
ctx := api.NewDefaultContext()
if _, err := storage.Create(ctx, svc); err == nil {
t.Fatalf("Unexpected success")
}
if len(fakeCloud.Calls) != 1 || fakeCloud.Calls[0] != "get-zone" {
t.Errorf("Unexpected call(s): %#v", fakeCloud.Calls)
}
if registry.Service != nil {
t.Errorf("Expected registry.CreateService to not get called, but it got %#v", registry.Service)
}
}
func TestServiceRegistryDelete(t *testing.T) {
ctx := api.NewDefaultContext()
storage, registry, fakeCloud := NewTestREST(t, nil)
@ -343,7 +316,7 @@ func TestServiceRegistryDeleteExternal(t *testing.T) {
}
registry.CreateService(ctx, svc)
storage.Delete(ctx, svc.Name)
if len(fakeCloud.Calls) != 2 || fakeCloud.Calls[0] != "get-zone" || fakeCloud.Calls[1] != "delete" {
if len(fakeCloud.Calls) != 0 {
t.Errorf("Unexpected call(s): %#v", fakeCloud.Calls)
}
if e, a := "foo", registry.DeletedID; e != a {
@ -381,7 +354,7 @@ func TestServiceRegistryUpdateExternalService(t *testing.T) {
if _, _, err := storage.Update(ctx, svc2); err != nil {
t.Fatalf("Unexpected error: %v", err)
}
if len(fakeCloud.Calls) != 2 || fakeCloud.Calls[0] != "get-zone" || fakeCloud.Calls[1] != "create" {
if len(fakeCloud.Calls) != 0 {
t.Errorf("Unexpected call(s): %#v", fakeCloud.Calls)
}
@ -391,9 +364,7 @@ func TestServiceRegistryUpdateExternalService(t *testing.T) {
if _, _, err := storage.Update(ctx, svc3); err != nil {
t.Fatalf("Unexpected error: %v", err)
}
if len(fakeCloud.Calls) != 6 || fakeCloud.Calls[0] != "get-zone" || fakeCloud.Calls[1] != "create" ||
fakeCloud.Calls[2] != "get-zone" || fakeCloud.Calls[3] != "delete" ||
fakeCloud.Calls[4] != "get-zone" || fakeCloud.Calls[5] != "create" {
if len(fakeCloud.Calls) != 0 {
t.Errorf("Unexpected call(s): %#v", fakeCloud.Calls)
}
}
@ -423,7 +394,7 @@ func TestServiceRegistryUpdateMultiPortExternalService(t *testing.T) {
if _, err := storage.Create(ctx, svc1); err != nil {
t.Fatalf("Unexpected error: %v", err)
}
if len(fakeCloud.Calls) != 2 || fakeCloud.Calls[0] != "get-zone" || fakeCloud.Calls[1] != "create" {
if len(fakeCloud.Calls) != 0 {
t.Errorf("Unexpected call(s): %#v", fakeCloud.Calls)
}
@ -433,9 +404,7 @@ func TestServiceRegistryUpdateMultiPortExternalService(t *testing.T) {
if _, _, err := storage.Update(ctx, svc2); err != nil {
t.Fatalf("Unexpected error: %v", err)
}
if len(fakeCloud.Calls) != 6 || fakeCloud.Calls[0] != "get-zone" || fakeCloud.Calls[1] != "create" ||
fakeCloud.Calls[2] != "get-zone" || fakeCloud.Calls[3] != "delete" ||
fakeCloud.Calls[4] != "get-zone" || fakeCloud.Calls[5] != "create" {
if len(fakeCloud.Calls) != 0 {
t.Errorf("Unexpected call(s): %#v", fakeCloud.Calls)
}
}
@ -744,7 +713,7 @@ func TestServiceRegistryIPExternalLoadBalancer(t *testing.T) {
if err != nil {
t.Errorf("Unexpected error %v", err)
}
if len(fakeCloud.Balancers) != 1 || fakeCloud.Balancers[0].Name != "kubernetes-default-foo" || fakeCloud.Balancers[0].Ports[0] != 6502 {
if len(fakeCloud.Balancers) != 0 {
t.Errorf("Unexpected balancer created: %v", fakeCloud.Balancers)
}
}

View File

@ -286,6 +286,15 @@ var _ = Describe("Services", func() {
Expect(err).NotTo(HaveOccurred())
}(ns, serviceName)
// Wait for the load balancer to be created asynchronously, which is
// (unfortunately) currently indicated by a public IP address being
// added to the spec.
for t := time.Now(); time.Since(t) < 4*time.Minute; time.Sleep(5 * time.Second) {
	svc, getErr := c.Services(ns).Get(serviceName)
	if getErr != nil {
		// A failed read is treated as transient: keep the last good
		// result and poll again rather than overwriting it with whatever
		// the failed call returned.
		continue
	}
	result = svc
	if len(result.Spec.PublicIPs) == 1 {
		break
	}
}
if len(result.Spec.PublicIPs) != 1 {
	// %d needs the count, not the slice itself.
	Failf("got unexpected number (%d) of public IPs for externally load balanced service: %v", len(result.Spec.PublicIPs), result)
}
@ -325,7 +334,7 @@ var _ = Describe("Services", func() {
By("hitting the pod through the service's external load balancer")
var resp *http.Response
for t := time.Now(); time.Since(t) < 4*time.Minute; time.Sleep(5 * time.Second) {
for t := time.Now(); time.Since(t) < time.Minute; time.Sleep(5 * time.Second) {
resp, err = http.Get(fmt.Sprintf("http://%s:%d", ip, port))
if err == nil {
break