diff --git a/cluster/saltbase/salt/kube-controller-manager/default b/cluster/saltbase/salt/kube-controller-manager/default index c10a3976ac8..17a7b5d39cc 100644 --- a/cluster/saltbase/salt/kube-controller-manager/default +++ b/cluster/saltbase/salt/kube-controller-manager/default @@ -6,12 +6,16 @@ {% set master="--master=127.0.0.1:8080" -%} {% set machines = ""-%} +{% set cluster_name = "" -%} {% set minion_regexp = "--minion_regexp=.*" -%} {% set sync_nodes = "--sync_nodes=true" -%} {% if pillar['node_instance_prefix'] is defined -%} {% set minion_regexp = "--minion_regexp='" + pillar['node_instance_prefix'] + ".*'" -%} {% endif -%} +{% if pillar['instance_prefix'] is defined -%} + {% set cluster_name = "--cluster_name=" + pillar['instance_prefix'] -%} +{% endif -%} {% set cloud_provider = "" -%} {% set cloud_config = "" -%} @@ -49,4 +53,4 @@ {% endif -%} # grains.cloud is defined -DAEMON_ARGS="{{daemon_args}} {{master}} {{machines}} {{ minion_regexp }} {{ cloud_provider }} {{ sync_nodes }} {{ cloud_config }} {{pillar['log_level']}}" +DAEMON_ARGS="{{daemon_args}} {{master}} {{machines}} {{cluster_name}} {{ minion_regexp }} {{ cloud_provider }} {{ sync_nodes }} {{ cloud_config }} {{pillar['log_level']}}" diff --git a/cmd/kube-controller-manager/app/controllermanager.go b/cmd/kube-controller-manager/app/controllermanager.go index db71f46c8ca..06f800bd48d 100644 --- a/cmd/kube-controller-manager/app/controllermanager.go +++ b/cmd/kube-controller-manager/app/controllermanager.go @@ -31,6 +31,7 @@ import ( "github.com/GoogleCloudPlatform/kubernetes/pkg/client" "github.com/GoogleCloudPlatform/kubernetes/pkg/cloudprovider" "github.com/GoogleCloudPlatform/kubernetes/pkg/cloudprovider/nodecontroller" + "github.com/GoogleCloudPlatform/kubernetes/pkg/cloudprovider/servicecontroller" replicationControllerPkg "github.com/GoogleCloudPlatform/kubernetes/pkg/controller" "github.com/GoogleCloudPlatform/kubernetes/pkg/master/ports" "github.com/GoogleCloudPlatform/kubernetes/pkg/namespace" @@ -49,6 +50,7 @@ type CMServer struct { ClientConfig client.Config CloudProvider string CloudConfigFile string + ClusterName string MinionRegexp string NodeSyncPeriod time.Duration ResourceQuotaSyncPeriod time.Duration @@ -100,6 +102,7 @@ func (s *CMServer) AddFlags(fs *pflag.FlagSet) { client.BindClientConfigFlags(fs, &s.ClientConfig) fs.StringVar(&s.CloudProvider, "cloud_provider", s.CloudProvider, "The provider for cloud services. Empty string for no provider.") fs.StringVar(&s.CloudConfigFile, "cloud_config", s.CloudConfigFile, "The path to the cloud provider configuration file. Empty string for no configuration file.") + fs.StringVar(&s.ClusterName, "cluster_name", s.ClusterName, "The instance prefix for the cluster") fs.StringVar(&s.MinionRegexp, "minion_regexp", s.MinionRegexp, "If non empty, and --cloud_provider is specified, a regular expression for matching minion VMs.") fs.DurationVar(&s.NodeSyncPeriod, "node_sync_period", s.NodeSyncPeriod, ""+ "The period for syncing nodes from cloudprovider. 
Longer periods will result in "+ @@ -192,6 +195,11 @@ func (s *CMServer) Run(_ []string) error { s.NodeMonitorGracePeriod, s.NodeStartupGracePeriod, s.NodeMonitorPeriod, s.ClusterName) nodeController.Run(s.NodeSyncPeriod, s.SyncNodeList) + serviceController := servicecontroller.New(cloud, kubeClient, s.ClusterName) + if err := serviceController.Run(); err != nil { + glog.Errorf("Failed to start service controller: %v", err) + } + resourceQuotaManager := resourcequota.NewResourceQuotaManager(kubeClient) resourceQuotaManager.Run(s.ResourceQuotaSyncPeriod) diff --git a/cmd/kubernetes/kubernetes.go b/cmd/kubernetes/kubernetes.go index 1f4f1f9948e..8ec247d9b30 100644 --- a/cmd/kubernetes/kubernetes.go +++ b/cmd/kubernetes/kubernetes.go @@ -35,6 +35,7 @@ import ( "github.com/GoogleCloudPlatform/kubernetes/pkg/apiserver" "github.com/GoogleCloudPlatform/kubernetes/pkg/client" "github.com/GoogleCloudPlatform/kubernetes/pkg/cloudprovider/nodecontroller" + "github.com/GoogleCloudPlatform/kubernetes/pkg/cloudprovider/servicecontroller" "github.com/GoogleCloudPlatform/kubernetes/pkg/controller" "github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/cadvisor" "github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/dockertools" @@ -132,6 +133,11 @@ func runControllerManager(machineList []string, cl *client.Client, nodeMilliCPU, nil, "", machineList, nodeResources, cl, 10, 5*time.Minute, util.NewTokenBucketRateLimiter(*deletingPodsQps, *deletingPodsBurst), 40*time.Second, 60*time.Second, 5*time.Second, "") nodeController.Run(10*time.Second, true) + serviceController := servicecontroller.New(nil, cl, "kubernetes") + if err := serviceController.Run(); err != nil { + glog.Warningf("Running without a service controller: %v", err) + } + endpoints := service.NewEndpointController(cl) go util.Forever(func() { endpoints.SyncServiceEndpoints() }, time.Second*10) diff --git a/pkg/cloudprovider/cloud.go b/pkg/cloudprovider/cloud.go index 3f76e80147f..ddf74af7da0 100644 --- a/pkg/cloudprovider/cloud.go +++ b/pkg/cloudprovider/cloud.go @@ -50,6 +50,8 @@ func GetLoadBalancerName(clusterName, serviceNamespace, serviceName string) stri type TCPLoadBalancer interface { // TCPLoadBalancerExists returns whether the specified load balancer exists. // TODO: Break this up into different interfaces (LB, etc) when we have more than one type of service + // TODO: This should really return the details of the load balancer so we can + // determine if it matches the needs of a service rather than if it exists. TCPLoadBalancerExists(name, region string) (bool, error) // CreateTCPLoadBalancer creates a new tcp load balancer. Returns the IP address or hostname of the balancer CreateTCPLoadBalancer(name, region string, externalIP net.IP, ports []int, hosts []string, affinityType api.AffinityType) (string, error) diff --git a/pkg/cloudprovider/gce/gce.go b/pkg/cloudprovider/gce/gce.go index 188aa34ba52..f5f577bce38 100644 --- a/pkg/cloudprovider/gce/gce.go +++ b/pkg/cloudprovider/gce/gce.go @@ -17,7 +17,6 @@ limitations under the License. 
package gce_cloud import ( - "errors" "fmt" "io" "io/ioutil" @@ -37,6 +36,7 @@ import ( "code.google.com/p/gcfg" compute "code.google.com/p/google-api-go-client/compute/v1" container "code.google.com/p/google-api-go-client/container/v1beta1" + "code.google.com/p/google-api-go-client/googleapi" "github.com/golang/glog" "golang.org/x/oauth2" "golang.org/x/oauth2/google" @@ -196,7 +196,7 @@ const ( GCEAffinityTypeClientIPProto GCEAffinityType = "CLIENT_IP_PROTO" ) -func (gce *GCECloud) makeTargetPool(name, region string, hosts []string, affinityType GCEAffinityType) (string, error) { +func (gce *GCECloud) makeTargetPool(name, region string, hosts []string, affinityType GCEAffinityType) error { var instances []string for _, host := range hosts { instances = append(instances, makeHostLink(gce.projectID, gce.zone, host)) @@ -208,13 +208,16 @@ func (gce *GCECloud) makeTargetPool(name, region string, hosts []string, affinit } op, err := gce.service.TargetPools.Insert(gce.projectID, region, pool).Do() if err != nil { - return "", err + return err } if err = gce.waitForRegionOp(op, region); err != nil { - return "", err + return err } - link := fmt.Sprintf("https://www.googleapis.com/compute/v1/projects/%s/regions/%s/targetPools/%s", gce.projectID, region, name) - return link, nil + return nil +} + +func (gce *GCECloud) targetPoolURL(name, region string) string { + return fmt.Sprintf("https://www.googleapis.com/compute/v1/projects/%s/regions/%s/targetPools/%s", gce.projectID, region, name) } func (gce *GCECloud) waitForRegionOp(op *compute.Operation, region string) error { @@ -228,7 +231,10 @@ func (gce *GCECloud) waitForRegionOp(op *compute.Operation, region string) error } } if pollOp.Error != nil && len(pollOp.Error.Errors) > 0 { - return errors.New(pollOp.Error.Errors[0].Message) + return &googleapi.Error{ + Code: int(pollOp.HttpErrorStatusCode), + Message: pollOp.Error.Errors[0].Message, + } } return nil } @@ -236,10 +242,21 @@ func (gce *GCECloud) waitForRegionOp(op *compute.Operation, region string) error // TCPLoadBalancerExists is an implementation of TCPLoadBalancer.TCPLoadBalancerExists. func (gce *GCECloud) TCPLoadBalancerExists(name, region string) (bool, error) { _, err := gce.service.ForwardingRules.Get(gce.projectID, region, name).Do() + if err == nil { + return true, nil + } + if isHTTPErrorCode(err, http.StatusNotFound) { + return false, nil + } return false, err } -//translate from what K8s supports to what the cloud provider supports for session affinity. +func isHTTPErrorCode(err error, code int) bool { + apiErr, ok := err.(*googleapi.Error) + return ok && apiErr.Code == code +} + +// translate from what K8s supports to what the cloud provider supports for session affinity. func translateAffinityType(affinityType api.AffinityType) GCEAffinityType { switch affinityType { case api.AffinityTypeClientIP: @@ -253,10 +270,15 @@ func translateAffinityType(affinityType api.AffinityType) GCEAffinityType { } // CreateTCPLoadBalancer is an implementation of TCPLoadBalancer.CreateTCPLoadBalancer. +// TODO(a-robinson): Don't just ignore specified IP addresses. Check if they're +// owned by the project and available to be used, and use them if they are. 
func (gce *GCECloud) CreateTCPLoadBalancer(name, region string, externalIP net.IP, ports []int, hosts []string, affinityType api.AffinityType) (string, error) { - pool, err := gce.makeTargetPool(name, region, hosts, translateAffinityType(affinityType)) + err := gce.makeTargetPool(name, region, hosts, translateAffinityType(affinityType)) if err != nil { - return "", err + if !isHTTPErrorCode(err, http.StatusConflict) { + return "", err + } + glog.Infof("Creating forwarding rule pointing at target pool that already exists: %v", err) } if len(ports) == 0 { @@ -276,18 +298,17 @@ func (gce *GCECloud) CreateTCPLoadBalancer(name, region string, externalIP net.I Name: name, IPProtocol: "TCP", PortRange: fmt.Sprintf("%d-%d", minPort, maxPort), - Target: pool, - } - if len(externalIP) > 0 { - req.IPAddress = externalIP.String() + Target: gce.targetPoolURL(name, region), } op, err := gce.service.ForwardingRules.Insert(gce.projectID, region, req).Do() - if err != nil { + if err != nil && !isHTTPErrorCode(err, http.StatusConflict) { return "", err } - err = gce.waitForRegionOp(op, region) - if err != nil { - return "", err + if op != nil { + err = gce.waitForRegionOp(op, region) + if err != nil && !isHTTPErrorCode(err, http.StatusConflict) { + return "", err + } } fwd, err := gce.service.ForwardingRules.Get(gce.projectID, region, name).Do() if err != nil { @@ -343,23 +364,28 @@ func (gce *GCECloud) UpdateTCPLoadBalancer(name, region string, hosts []string) // DeleteTCPLoadBalancer is an implementation of TCPLoadBalancer.DeleteTCPLoadBalancer. func (gce *GCECloud) DeleteTCPLoadBalancer(name, region string) error { op, err := gce.service.ForwardingRules.Delete(gce.projectID, region, name).Do() - if err != nil { - glog.Warningln("Failed to delete Forwarding Rules %s: got error %s. Trying to delete Target Pool", name, err.Error()) + if err != nil && isHTTPErrorCode(err, http.StatusNotFound) { + glog.Infof("Forwarding rule %s already deleted. Continuing to delete target pool.", name) + } else if err != nil { + glog.Warningf("Failed to delete Forwarding Rules %s: got error %s.", name, err.Error()) return err } else { err = gce.waitForRegionOp(op, region) if err != nil { - glog.Warningln("Failed waiting for Forwarding Rule %s to be deleted: got error %s. Trying to delete Target Pool", name, err.Error()) + glog.Warningf("Failed waiting for Forwarding Rule %s to be deleted: got error %s.", name, err.Error()) } } op, err = gce.service.TargetPools.Delete(gce.projectID, region, name).Do() - if err != nil { - glog.Warningln("Failed to delete Target Pool %s, got error %s.", name, err.Error()) + if err != nil && isHTTPErrorCode(err, http.StatusNotFound) { + glog.Infof("Target pool %s already deleted.", name) + return nil + } else if err != nil { + glog.Warningf("Failed to delete Target Pool %s, got error %s.", name, err.Error()) return err } err = gce.waitForRegionOp(op, region) if err != nil { - glog.Warningln("Failed waiting for Target Pool %s to be deleted: got error %s.", name, err.Error()) + glog.Warningf("Failed waiting for Target Pool %s to be deleted: got error %s.", name, err.Error()) } return err } diff --git a/pkg/cloudprovider/servicecontroller/doc.go b/pkg/cloudprovider/servicecontroller/doc.go new file mode 100644 index 00000000000..697b65d991f --- /dev/null +++ b/pkg/cloudprovider/servicecontroller/doc.go @@ -0,0 +1,19 @@ +/* +Copyright 2015 Google Inc. All rights reserved. 
+ +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +// Package servicecontroller contains code for syncing cloud load balancers +// with the service registry. +package servicecontroller diff --git a/pkg/cloudprovider/servicecontroller/servicecontroller.go b/pkg/cloudprovider/servicecontroller/servicecontroller.go new file mode 100644 index 00000000000..d77e6241d3e --- /dev/null +++ b/pkg/cloudprovider/servicecontroller/servicecontroller.go @@ -0,0 +1,434 @@ +/* +Copyright 2015 Google Inc. All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package servicecontroller + +import ( + "fmt" + "net" + "sync" + "time" + + "github.com/GoogleCloudPlatform/kubernetes/pkg/api" + "github.com/GoogleCloudPlatform/kubernetes/pkg/client" + "github.com/GoogleCloudPlatform/kubernetes/pkg/client/cache" + "github.com/GoogleCloudPlatform/kubernetes/pkg/cloudprovider" + "github.com/GoogleCloudPlatform/kubernetes/pkg/fields" + "github.com/GoogleCloudPlatform/kubernetes/pkg/labels" + "github.com/golang/glog" +) + +const ( + clientRetryCount = 5 + clientRetryInterval = 5 * time.Second + + retryable = true + notRetryable = false +) + +type ServiceController struct { + cloud cloudprovider.Interface + kubeClient client.Interface + clusterName string + balancer cloudprovider.TCPLoadBalancer + zone cloudprovider.Zone + mu sync.Mutex // protects serviceMap + serviceMap map[string]*api.Service // keys generated by cache.MetaNamespaceKeyFunc +} + +// New returns a new service controller to keep cloud provider service resources +// (like external load balancers) in sync with the registry. +func New(cloud cloudprovider.Interface, kubeClient client.Interface, clusterName string) *ServiceController { + return &ServiceController{ + cloud: cloud, + kubeClient: kubeClient, + clusterName: clusterName, + serviceMap: make(map[string]*api.Service), + } +} + +// Run starts a background goroutine that watches for changes to services that +// have (or had) externalLoadBalancers=true and ensures that they have external +// load balancers created and deleted appropriately. +func (s *ServiceController) Run() error { + if err := s.init(); err != nil { + return err + } + + // We have to make this check because the ListWatch that we use in + // WatchServices requires Client functions that aren't in the interface + // for some reason.
+ if _, ok := s.kubeClient.(*client.Client); !ok { + return fmt.Errorf("ServiceController only works with real Client objects, but was passed something else satisfying the client Interface.") + } + + go s.watchServices() + return nil +} + +func (s *ServiceController) init() error { + if s.cloud == nil { + return fmt.Errorf("ServiceController should not be run without a cloudprovider.") + } + + balancer, ok := s.cloud.TCPLoadBalancer() + if !ok { + return fmt.Errorf("the cloud provider does not support external TCP load balancers.") + } + s.balancer = balancer + + zones, ok := s.cloud.Zones() + if !ok { + return fmt.Errorf("the cloud provider does not support zone enumeration, which is required for creating external load balancers.") + } + zone, err := zones.GetZone() + if err != nil { + return fmt.Errorf("failed to get zone from cloud provider, will not be able to create external load balancers: %v", err) + } + s.zone = zone + return nil +} + +func (s *ServiceController) watchServices() { + // Get the currently existing set of services and then all future creates + // and updates of services. + // TODO: Add a compressor that intelligently squashes together updates? + keyLister := cache.KeyListerFunc(func() []string { return s.listKeys() }) + serviceQueue := cache.NewDeltaFIFO(cache.MetaNamespaceKeyFunc, nil, keyLister) + lw := cache.NewListWatchFromClient(s.kubeClient.(*client.Client), "services", api.NamespaceAll, fields.Everything()) + cache.NewReflector(lw, &api.Service{}, serviceQueue, 0).Run() + // TODO: Add proper retries rather than just re-adding to the queue? + for { + newItem := serviceQueue.Pop() + deltas, ok := newItem.(cache.Deltas) + if !ok { + glog.Errorf("Received object from service watcher that wasn't Deltas: %+v", newItem) + } + delta := deltas.Newest() + if delta == nil { + glog.Errorf("Received nil delta from watcher queue.") + continue + } + err, shouldRetry := s.processDelta(delta) + if shouldRetry { + // Add the failed service back to the queue so we'll retry it. + glog.Errorf("Failed to process service delta. Retrying: %v", err) + time.Sleep(5 * time.Second) + serviceQueue.AddIfNotPresent(deltas) + } else if err != nil { + glog.Errorf("Failed to process service delta. Not retrying: %v", err) + } + } +} + +// Returns an error if processing the delta failed, along with a boolean +// indicator of whether the processing should be retried. +func (s *ServiceController) processDelta(delta *cache.Delta) (error, bool) { + service, ok := delta.Object.(*api.Service) + if !ok { + // If the DeltaFIFO saw a key in our cache that it didn't know about, it + // can send a deletion with an unknown state. Grab the service from our + // cache for deleting. + key, ok := delta.Object.(cache.DeletedFinalStateUnknown) + if !ok { + return fmt.Errorf("Delta contained object that wasn't a service or a deleted key: %+v", delta), notRetryable + } + service, ok = s.getService(key.Key) + if !ok { + return fmt.Errorf("Service %s not in cache even though the watcher thought it was. Ignoring the deletion.", key), notRetryable + } + delta.Object = service + } + glog.V(2).Infof("Got new %s delta for service: %+v", delta.Type, service) + + // TODO: Make this more parallel. The only things that need to serialized + // are changes to services with the same namespace and name. + // TODO: Handle added, updated, and sync differently? 
+ switch delta.Type { + case cache.Added: + fallthrough + case cache.Updated: + fallthrough + case cache.Sync: + return s.createLoadBalancerIfNeeded(service) + case cache.Deleted: + return s.handleDelete(service) + default: + glog.Errorf("Unexpected delta type: %v", delta.Type) + } + return nil, notRetryable +} + +// Returns whatever error occurred along with a boolean indicator of whether it +// should be retried. +func (s *ServiceController) createLoadBalancerIfNeeded(service *api.Service) (error, bool) { + namespacedName, err := cache.MetaNamespaceKeyFunc(service) + if err != nil { + return fmt.Errorf("Couldn't generate namespaced name for service: %v", err), notRetryable + } + + cachedService, cached := s.getService(namespacedName) + if cached && !needsUpdate(cachedService, service) { + glog.Infof("LB already exists and doesn't need update for service %s", namespacedName) + return nil, notRetryable + } + if cached { + // If the service already exists but needs to be updated, delete it so that + // we can recreate it cleanly. + if cachedService.Spec.CreateExternalLoadBalancer { + glog.Infof("Deleting existing load balancer for service %s that needs an updated load balancer.", namespacedName) + if err := s.ensureLBDeleted(cachedService); err != nil { + return err, retryable + } + } + } else { + // If we don't have any cached memory of the load balancer and it already + // exists, optimistically consider our work done. + // TODO: If we could read the spec of the existing load balancer, we could + // determine if an update is necessary. + exists, err := s.balancer.TCPLoadBalancerExists(s.loadBalancerName(service), s.zone.Region) + if err != nil { + return fmt.Errorf("Error getting LB for service %s", namespacedName), retryable + } + if exists && len(service.Spec.PublicIPs) == 0 { + // The load balancer exists, but we apparently don't know about its public + // IPs, so just delete it and recreate it to get back to a sane state. + // TODO: Ideally the cloud provider interface would return the IP for us. + glog.Infof("Deleting old LB for service with no public IPs %s", namespacedName) + if err := s.ensureLBDeleted(service); err != nil { + return err, retryable + } + } else if exists { + // TODO: Better handle updates for non-cached services, this is optimistic. + glog.Infof("LB already exists for service %s", namespacedName) + return nil, notRetryable + } + } + + if !service.Spec.CreateExternalLoadBalancer { + glog.Infof("Not creating LB for service %s that doesn't want one.", namespacedName) + return nil, notRetryable + } + + glog.V(2).Infof("Creating LB for service %s", namespacedName) + + // The load balancer doesn't exist yet, so create it. + publicIPstring := fmt.Sprint(service.Spec.PublicIPs) + err = s.createExternalLoadBalancer(service) + if err != nil { + return fmt.Errorf("failed to create external load balancer for service %s: %v", namespacedName, err), retryable + } + + if publicIPstring == fmt.Sprint(service.Spec.PublicIPs) { + glog.Infof("Not persisting unchanged service to registry.") + return nil, notRetryable + } + s.setService(namespacedName, service) + + // If creating the load balancer succeeded, persist the updated service. + if err = s.persistUpdate(service); err != nil { + return fmt.Errorf("Failed to persist updated publicIPs to apiserver, even after retries. Giving up: %v", err), notRetryable + } + return nil, notRetryable +} + +// TODO(a-robinson): Handle repeated failures due to ResourceVersion changes or +// the object having been deleted. 
+func (s *ServiceController) persistUpdate(service *api.Service) error { + var err error + for i := 0; i < clientRetryCount; i++ { + _, err = s.kubeClient.Services(service.Namespace).Update(service) + if err == nil { + return nil + } + glog.Warningf("Failed to persist updated PublicIPs to service %s after creating its external load balancer: %v", + service.Name, err) + time.Sleep(clientRetryInterval) + } + return err +} + +func (s *ServiceController) createExternalLoadBalancer(service *api.Service) error { + ports, err := getTCPPorts(service) + if err != nil { + return err + } + nodes, err := s.kubeClient.Nodes().List(labels.Everything()) + if err != nil { + return err + } + name := s.loadBalancerName(service) + if len(service.Spec.PublicIPs) > 0 { + for _, publicIP := range service.Spec.PublicIPs { + // TODO: Make this actually work for multiple IPs by using different + // names for each. For now, we'll just create the first and break. + endpoint, err := s.balancer.CreateTCPLoadBalancer(name, s.zone.Region, net.ParseIP(publicIP), + ports, hostsFromNodeList(nodes), service.Spec.SessionAffinity) + if err != nil { + return err + } + service.Spec.PublicIPs = []string{endpoint} + break + } + } else { + endpoint, err := s.balancer.CreateTCPLoadBalancer(name, s.zone.Region, nil, + ports, hostsFromNodeList(nodes), service.Spec.SessionAffinity) + if err != nil { + return err + } + service.Spec.PublicIPs = []string{endpoint} + } + return nil +} + +// Returns whatever error occurred along with a boolean indicator of whether it +// should be retried. +func (s *ServiceController) handleDelete(service *api.Service) (error, bool) { + if err := s.ensureLBDeleted(service); err != nil { + return err, retryable + } + namespacedName, err := cache.MetaNamespaceKeyFunc(service) + if err != nil { + // This is panic-worthy, since the queue shouldn't have been able to + // handle the service if it couldn't generate a name for it. + return fmt.Errorf("Couldn't generate namespaced name for service: %v", err), notRetryable + } + s.deleteService(namespacedName) + return nil, notRetryable +} + +// Ensures that the load balancer associated with the given service is deleted, +// doing the deletion if necessary. +func (s *ServiceController) ensureLBDeleted(service *api.Service) error { + // This is only needed because not all delete load balancer implementations + // are currently idempotent to the LB not existing. + if exists, err := s.balancer.TCPLoadBalancerExists(s.loadBalancerName(service), s.zone.Region); err != nil { + return err + } else if !exists { + return nil + } + + if err := s.balancer.DeleteTCPLoadBalancer(s.loadBalancerName(service), s.zone.Region); err != nil { + return err + } + return nil +} + +// listKeys implements the interface required by DeltaFIFO to list the keys we +// already know about. 
+func (s *ServiceController) listKeys() []string { + s.mu.Lock() + defer s.mu.Unlock() + keys := make([]string, 0, len(s.serviceMap)) + for k := range s.serviceMap { + keys = append(keys, k) + } + return keys +} + +func (s *ServiceController) getService(serviceName string) (*api.Service, bool) { + s.mu.Lock() + defer s.mu.Unlock() + info, ok := s.serviceMap[serviceName] + return info, ok +} + +func (s *ServiceController) setService(serviceName string, info *api.Service) { + s.mu.Lock() + defer s.mu.Unlock() + s.serviceMap[serviceName] = info +} + +func (s *ServiceController) deleteService(serviceName string) { + s.mu.Lock() + defer s.mu.Unlock() + delete(s.serviceMap, serviceName) +} + +func needsUpdate(oldService *api.Service, newService *api.Service) bool { + if !oldService.Spec.CreateExternalLoadBalancer && !newService.Spec.CreateExternalLoadBalancer { + return false + } + if oldService.Spec.CreateExternalLoadBalancer != newService.Spec.CreateExternalLoadBalancer { + return true + } + if !portsEqual(oldService, newService) || oldService.Spec.SessionAffinity != newService.Spec.SessionAffinity { + return true + } + if len(oldService.Spec.PublicIPs) != len(newService.Spec.PublicIPs) { + return true + } + for i := range oldService.Spec.PublicIPs { + if oldService.Spec.PublicIPs[i] != newService.Spec.PublicIPs[i] { + return true + } + } + return false +} + +// TODO: Use a shorter name that's less likely to be longer than cloud +// providers' length limits. +func (s *ServiceController) loadBalancerName(service *api.Service) string { + return cloudprovider.GetLoadBalancerName(s.clusterName, service.Namespace, service.Name) +} + +// TODO: Deduplicate this with the copy in pkg/registry/service/rest.go. +func getTCPPorts(service *api.Service) ([]int, error) { + ports := []int{} + for i := range service.Spec.Ports { + // TODO: Support UDP. Remove the check from the API validation package once + // it's supported. + sp := &service.Spec.Ports[i] + if sp.Protocol != api.ProtocolTCP { + return nil, fmt.Errorf("external load balancers for non TCP services are not currently supported.") + } + ports = append(ports, sp.Port) + } + return ports, nil +} + +func portsEqual(x, y *api.Service) bool { + xPorts, err := getTCPPorts(x) + if err != nil { + return false + } + yPorts, err := getTCPPorts(y) + if err != nil { + return false + } + if len(xPorts) != len(yPorts) { + return false + } + // Use a map for comparison since port slices aren't necessarily sorted. + xPortMap := make(map[int]bool) + for _, xPort := range xPorts { + xPortMap[xPort] = true + } + for _, yPort := range yPorts { + if !xPortMap[yPort] { + return false + } + } + return true +} + +func hostsFromNodeList(list *api.NodeList) []string { + result := make([]string, len(list.Items)) + for ix := range list.Items { + result[ix] = list.Items[ix].Name + } + return result +} diff --git a/pkg/cloudprovider/servicecontroller/servicecontroller_test.go b/pkg/cloudprovider/servicecontroller/servicecontroller_test.go new file mode 100644 index 00000000000..73aa5ff08b4 --- /dev/null +++ b/pkg/cloudprovider/servicecontroller/servicecontroller_test.go @@ -0,0 +1,126 @@ +/* +Copyright 2015 Google Inc. All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License.
+You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package servicecontroller + +import ( + "testing" + + "github.com/GoogleCloudPlatform/kubernetes/pkg/api" + "github.com/GoogleCloudPlatform/kubernetes/pkg/client/testclient" + fake_cloud "github.com/GoogleCloudPlatform/kubernetes/pkg/cloudprovider/fake" +) + +const region = "us-central" + +func TestCreateExternalLoadBalancer(t *testing.T) { + table := []struct { + service *api.Service + expectErr bool + expectCreateAttempt bool + }{ + { + service: &api.Service{ + ObjectMeta: api.ObjectMeta{ + Name: "no-external-balancer", + Namespace: "default", + }, + Spec: api.ServiceSpec{ + CreateExternalLoadBalancer: false, + }, + }, + expectErr: false, + expectCreateAttempt: false, + }, + { + service: &api.Service{ + ObjectMeta: api.ObjectMeta{ + Name: "udp-service", + Namespace: "default", + }, + Spec: api.ServiceSpec{ + Ports: []api.ServicePort{{ + Port: 80, + Protocol: api.ProtocolUDP, + }}, + CreateExternalLoadBalancer: true, + }, + }, + expectErr: true, + expectCreateAttempt: false, + }, + { + service: &api.Service{ + ObjectMeta: api.ObjectMeta{ + Name: "basic-service1", + Namespace: "default", + }, + Spec: api.ServiceSpec{ + Ports: []api.ServicePort{{ + Port: 80, + Protocol: api.ProtocolTCP, + }}, + CreateExternalLoadBalancer: true, + }, + }, + expectErr: false, + expectCreateAttempt: true, + }, + } + + for _, item := range table { + cloud := &fake_cloud.FakeCloud{} + cloud.Region = region + client := &testclient.Fake{} + controller := New(cloud, client, "test-cluster") + controller.init() + cloud.Calls = nil // ignore any cloud calls made in init() + client.Actions = nil // ignore any client calls made in init() + err, _ := controller.createLoadBalancerIfNeeded(item.service) + if !item.expectErr && err != nil { + t.Errorf("unexpected error: %v", err) + } else if item.expectErr && err == nil { + t.Errorf("expected error creating %v, got nil", item.service) + } + if !item.expectCreateAttempt { + if len(cloud.Calls) > 0 { + t.Errorf("unexpected cloud provider calls: %v", cloud.Calls) + } + if len(client.Actions) > 0 { + t.Errorf("unexpected client actions: %v", client.Actions) + } + } else { + if len(cloud.Balancers) != 1 { + t.Errorf("expected one load balancer to be created, got %v", cloud.Balancers) + } else if cloud.Balancers[0].Name != controller.loadBalancerName(item.service) || + cloud.Balancers[0].Region != region || + cloud.Balancers[0].Ports[0] != item.service.Spec.Ports[0].Port { + t.Errorf("created load balancer has incorrect parameters: %v", cloud.Balancers[0]) + } + actionFound := false + for _, action := range client.Actions { + if action.Action == "update-service" { + actionFound = true + } + } + if !actionFound { + t.Errorf("expected updated service to be sent to client, got these actions instead: %v", client.Actions) + } + } + } +} + +// TODO(a-robinson): Add tests for update/sync/delete. diff --git a/pkg/registry/service/doc.go b/pkg/registry/service/doc.go index bd503f2b0e1..1022a6e73c7 100644 --- a/pkg/registry/service/doc.go +++ b/pkg/registry/service/doc.go @@ -14,6 +14,6 @@ See the License for the specific language governing permissions and limitations under the License. 
*/ -// Package service provides Registry interface and it's RESTStorage +// Package service provides the Registry interface and its RESTStorage // implementation for storing Service api objects. package service diff --git a/pkg/registry/service/rest.go b/pkg/registry/service/rest.go index f84e53ef6ad..2402f31d3e7 100644 --- a/pkg/registry/service/rest.go +++ b/pkg/registry/service/rest.go @@ -42,7 +42,8 @@ import ( // REST adapts a service registry into apiserver's RESTStorage model. type REST struct { - registry Registry + registry Registry + // TODO(a-robinson): Remove cloud cloud cloudprovider.Interface machines minion.Registry endpoints endpoint.Registry @@ -97,6 +98,14 @@ func (rs *REST) Create(ctx api.Context, obj runtime.Object) (runtime.Object, err return nil, err } + // Make sure that we'll be able to create a load balancer for the service, + // even though it'll be created by the ServiceController. + if service.Spec.CreateExternalLoadBalancer { + if _, err := getTCPPorts(service); err != nil { + return nil, err + } + } + releaseServiceIP := false defer func() { if releaseServiceIP { @@ -123,15 +132,6 @@ func (rs *REST) Create(ctx api.Context, obj runtime.Object) (runtime.Object, err releaseServiceIP = true } - // TODO: Move this to post-creation rectification loop, so that we make/remove external load balancers - // correctly no matter what http operations happen. - if service.Spec.CreateExternalLoadBalancer { - err := rs.createExternalLoadBalancer(ctx, service) - if err != nil { - return nil, err - } - } - out, err := rs.registry.CreateService(ctx, service) if err != nil { err = rest.CheckGeneratedNameError(rest.Services, err, service) @@ -144,14 +144,6 @@ func (rs *REST) Create(ctx api.Context, obj runtime.Object) (runtime.Object, err return out, err } -func hostsFromMinionList(list *api.NodeList) []string { - result := make([]string, len(list.Items)) - for ix := range list.Items { - result[ix] = list.Items[ix].Name - } - return result -} - func (rs *REST) Delete(ctx api.Context, id string) (runtime.Object, error) { service, err := rs.registry.GetService(ctx, id) if err != nil { @@ -160,9 +152,6 @@ func (rs *REST) Delete(ctx api.Context, id string) (runtime.Object, error) { if api.IsServiceIPSet(service) { rs.portalMgr.Release(net.ParseIP(service.Spec.PortalIP)) } - if service.Spec.CreateExternalLoadBalancer { - rs.deleteExternalLoadBalancer(ctx, service) - } return &api.Status{Status: api.StatusSuccess}, rs.registry.DeleteService(ctx, id) } @@ -219,22 +208,6 @@ func (rs *REST) Update(ctx api.Context, obj runtime.Object) (runtime.Object, boo if errs := validation.ValidateServiceUpdate(oldService, service); len(errs) > 0 { return nil, false, errors.NewInvalid("service", service.Name, errs) } - // Recreate external load balancer if changed. 
- if externalLoadBalancerNeedsUpdate(oldService, service) { - // TODO: support updating existing balancers - if oldService.Spec.CreateExternalLoadBalancer { - err = rs.deleteExternalLoadBalancer(ctx, oldService) - if err != nil { - return nil, false, err - } - } - if service.Spec.CreateExternalLoadBalancer { - err = rs.createExternalLoadBalancer(ctx, service) - if err != nil { - return nil, false, err - } - } - } out, err := rs.registry.UpdateService(ctx, service) return out, false, err } @@ -283,52 +256,7 @@ func (rs *REST) ResourceLocation(ctx api.Context, id string) (*url.URL, http.Rou return nil, nil, fmt.Errorf("no endpoints available for %q", id) } -func (rs *REST) createExternalLoadBalancer(ctx api.Context, service *api.Service) error { - if rs.cloud == nil { - return fmt.Errorf("requested an external service, but no cloud provider supplied.") - } - - ports, err := getTCPPorts(service) - if err != nil { - return err - } - - balancer, ok := rs.cloud.TCPLoadBalancer() - if !ok { - return fmt.Errorf("the cloud provider does not support external TCP load balancers.") - } - zones, ok := rs.cloud.Zones() - if !ok { - return fmt.Errorf("the cloud provider does not support zone enumeration.") - } - hosts, err := rs.machines.ListMinions(ctx, labels.Everything(), fields.Everything()) - if err != nil { - return err - } - zone, err := zones.GetZone() - if err != nil { - return err - } - name := cloudprovider.GetLoadBalancerName(rs.clusterName, api.NamespaceValue(ctx), service.Name) - var affinityType api.AffinityType = service.Spec.SessionAffinity - if len(service.Spec.PublicIPs) > 0 { - for _, publicIP := range service.Spec.PublicIPs { - _, err = balancer.CreateTCPLoadBalancer(name, zone.Region, net.ParseIP(publicIP), ports, hostsFromMinionList(hosts), affinityType) - if err != nil { - // TODO: have to roll-back any successful calls. - return err - } - } - } else { - endpoint, err := balancer.CreateTCPLoadBalancer(name, zone.Region, nil, ports, hostsFromMinionList(hosts), affinityType) - if err != nil { - return err - } - service.Spec.PublicIPs = []string{endpoint} - } - return nil -} - +// TODO: Deduplicate with the copy of this in pkg/registry/service/rest.go func getTCPPorts(service *api.Service) ([]int, error) { ports := []int{} for i := range service.Spec.Ports { @@ -341,71 +269,3 @@ func getTCPPorts(service *api.Service) ([]int, error) { } return ports, nil } - -func portsEqual(x, y *api.Service) bool { - xPorts, err := getTCPPorts(x) - if err != nil { - return false - } - yPorts, err := getTCPPorts(y) - if err != nil { - return false - } - if len(xPorts) != len(yPorts) { - return false - } - for i := range xPorts { - if xPorts[i] != yPorts[i] { - return false - } - } - return true -} - -func (rs *REST) deleteExternalLoadBalancer(ctx api.Context, service *api.Service) error { - if rs.cloud == nil { - return fmt.Errorf("requested an external service, but no cloud provider supplied.") - } - zones, ok := rs.cloud.Zones() - if !ok { - // We failed to get zone enumerator. - // As this should have failed when we tried in "create" too, - // assume external load balancer was never created. - return nil - } - balancer, ok := rs.cloud.TCPLoadBalancer() - if !ok { - // See comment above. 
- return nil - } - zone, err := zones.GetZone() - if err != nil { - return err - } - name := cloudprovider.GetLoadBalancerName(rs.clusterName, api.NamespaceValue(ctx), service.Name) - if err := balancer.DeleteTCPLoadBalancer(name, zone.Region); err != nil { - return err - } - return nil -} - -func externalLoadBalancerNeedsUpdate(oldService, newService *api.Service) bool { - if !oldService.Spec.CreateExternalLoadBalancer && !newService.Spec.CreateExternalLoadBalancer { - return false - } - if oldService.Spec.CreateExternalLoadBalancer != newService.Spec.CreateExternalLoadBalancer { - return true - } - if !portsEqual(oldService, newService) || oldService.Spec.SessionAffinity != newService.Spec.SessionAffinity { - return true - } - if len(oldService.Spec.PublicIPs) != len(newService.Spec.PublicIPs) { - return true - } - for i := range oldService.Spec.PublicIPs { - if oldService.Spec.PublicIPs[i] != newService.Spec.PublicIPs[i] { - return true - } - } - return false -} diff --git a/pkg/registry/service/rest_test.go b/pkg/registry/service/rest_test.go index b96c17c24e2..cbf3f29f87c 100644 --- a/pkg/registry/service/rest_test.go +++ b/pkg/registry/service/rest_test.go @@ -19,7 +19,6 @@ package service import ( "bytes" "encoding/gob" - "fmt" "net" "strings" "testing" @@ -257,10 +256,11 @@ func TestServiceRegistryExternalService(t *testing.T) { }}, }, } - if _, err := storage.Create(ctx, svc); err != nil { - t.Fatalf("Unexpected error: %v", err) + _, err := storage.Create(ctx, svc) + if err != nil { + t.Errorf("Failed to create service: %#v", err) } - if len(fakeCloud.Calls) != 2 || fakeCloud.Calls[0] != "get-zone" || fakeCloud.Calls[1] != "create" { + if len(fakeCloud.Calls) != 0 { t.Errorf("Unexpected call(s): %#v", fakeCloud.Calls) } srv, err := registry.GetService(ctx, svc.Name) @@ -270,38 +270,11 @@ func TestServiceRegistryExternalService(t *testing.T) { if srv == nil { t.Errorf("Failed to find service: %s", svc.Name) } - if len(fakeCloud.Balancers) != 1 || fakeCloud.Balancers[0].Name != "kubernetes-default-foo" || fakeCloud.Balancers[0].Ports[0] != 6502 { + if len(fakeCloud.Balancers) != 0 { t.Errorf("Unexpected balancer created: %v", fakeCloud.Balancers) } } -func TestServiceRegistryExternalServiceError(t *testing.T) { - storage, registry, fakeCloud := NewTestREST(t, nil) - fakeCloud.Err = fmt.Errorf("test error") - svc := &api.Service{ - ObjectMeta: api.ObjectMeta{Name: "foo"}, - Spec: api.ServiceSpec{ - Selector: map[string]string{"bar": "baz"}, - CreateExternalLoadBalancer: true, - SessionAffinity: api.AffinityTypeNone, - Ports: []api.ServicePort{{ - Port: 6502, - Protocol: api.ProtocolTCP, - }}, - }, - } - ctx := api.NewDefaultContext() - if _, err := storage.Create(ctx, svc); err == nil { - t.Fatalf("Unexpected success") - } - if len(fakeCloud.Calls) != 1 || fakeCloud.Calls[0] != "get-zone" { - t.Errorf("Unexpected call(s): %#v", fakeCloud.Calls) - } - if registry.Service != nil { - t.Errorf("Expected registry.CreateService to not get called, but it got %#v", registry.Service) - } -} - func TestServiceRegistryDelete(t *testing.T) { ctx := api.NewDefaultContext() storage, registry, fakeCloud := NewTestREST(t, nil) @@ -343,7 +316,7 @@ func TestServiceRegistryDeleteExternal(t *testing.T) { } registry.CreateService(ctx, svc) storage.Delete(ctx, svc.Name) - if len(fakeCloud.Calls) != 2 || fakeCloud.Calls[0] != "get-zone" || fakeCloud.Calls[1] != "delete" { + if len(fakeCloud.Calls) != 0 { t.Errorf("Unexpected call(s): %#v", fakeCloud.Calls) } if e, a := "foo", registry.DeletedID; e != a { 
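These registry tests now expect zero calls into the fake cloud because all load balancer work has moved into the ServiceController's asynchronous watch loop, which retries a failed delta by re-queueing it. Below is a minimal, self-contained sketch of that retry-by-requeue shape, with a plain buffered channel standing in for cache.DeltaFIFO and the hypothetical names item and processItem standing in for Deltas and processDelta; it illustrates the pattern and is not the controller's actual code.

package main

import (
	"fmt"
	"time"
)

// item stands in for one batch of watch deltas; processItem stands in for
// ServiceController.processDelta, which returns an error plus a flag saying
// whether the failure is transient and worth retrying.
type item struct{ key string }

func processItem(it item) (error, bool) {
	// A real implementation would talk to the cloud provider here and report
	// transient failures as retryable.
	return nil, false
}

func main() {
	queue := make(chan item, 10)
	queue <- item{key: "default/my-service"}

	for {
		select {
		case it := <-queue:
			if err, shouldRetry := processItem(it); shouldRetry {
				// Back off briefly, then put the item back on the queue,
				// mirroring serviceQueue.AddIfNotPresent(deltas) above.
				fmt.Printf("retrying %s after error: %v\n", it.key, err)
				time.Sleep(5 * time.Second)
				queue <- it
			} else if err != nil {
				fmt.Printf("giving up on %s: %v\n", it.key, err)
			}
		default:
			// Nothing left to process in this toy example.
			return
		}
	}
}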
@@ -381,7 +354,7 @@ func TestServiceRegistryUpdateExternalService(t *testing.T) { if _, _, err := storage.Update(ctx, svc2); err != nil { t.Fatalf("Unexpected error: %v", err) } - if len(fakeCloud.Calls) != 2 || fakeCloud.Calls[0] != "get-zone" || fakeCloud.Calls[1] != "create" { + if len(fakeCloud.Calls) != 0 { t.Errorf("Unexpected call(s): %#v", fakeCloud.Calls) } @@ -391,9 +364,7 @@ func TestServiceRegistryUpdateExternalService(t *testing.T) { if _, _, err := storage.Update(ctx, svc3); err != nil { t.Fatalf("Unexpected error: %v", err) } - if len(fakeCloud.Calls) != 6 || fakeCloud.Calls[0] != "get-zone" || fakeCloud.Calls[1] != "create" || - fakeCloud.Calls[2] != "get-zone" || fakeCloud.Calls[3] != "delete" || - fakeCloud.Calls[4] != "get-zone" || fakeCloud.Calls[5] != "create" { + if len(fakeCloud.Calls) != 0 { t.Errorf("Unexpected call(s): %#v", fakeCloud.Calls) } } @@ -423,7 +394,7 @@ func TestServiceRegistryUpdateMultiPortExternalService(t *testing.T) { if _, err := storage.Create(ctx, svc1); err != nil { t.Fatalf("Unexpected error: %v", err) } - if len(fakeCloud.Calls) != 2 || fakeCloud.Calls[0] != "get-zone" || fakeCloud.Calls[1] != "create" { + if len(fakeCloud.Calls) != 0 { t.Errorf("Unexpected call(s): %#v", fakeCloud.Calls) } @@ -433,9 +404,7 @@ func TestServiceRegistryUpdateMultiPortExternalService(t *testing.T) { if _, _, err := storage.Update(ctx, svc2); err != nil { t.Fatalf("Unexpected error: %v", err) } - if len(fakeCloud.Calls) != 6 || fakeCloud.Calls[0] != "get-zone" || fakeCloud.Calls[1] != "create" || - fakeCloud.Calls[2] != "get-zone" || fakeCloud.Calls[3] != "delete" || - fakeCloud.Calls[4] != "get-zone" || fakeCloud.Calls[5] != "create" { + if len(fakeCloud.Calls) != 0 { t.Errorf("Unexpected call(s): %#v", fakeCloud.Calls) } } @@ -744,7 +713,7 @@ func TestServiceRegistryIPExternalLoadBalancer(t *testing.T) { if err != nil { t.Errorf("Unexpected error %v", err) } - if len(fakeCloud.Balancers) != 1 || fakeCloud.Balancers[0].Name != "kubernetes-default-foo" || fakeCloud.Balancers[0].Ports[0] != 6502 { + if len(fakeCloud.Balancers) != 0 { t.Errorf("Unexpected balancer created: %v", fakeCloud.Balancers) } } diff --git a/test/e2e/service.go b/test/e2e/service.go index 2cce8805e79..d98efe50d3e 100644 --- a/test/e2e/service.go +++ b/test/e2e/service.go @@ -286,6 +286,15 @@ var _ = Describe("Services", func() { Expect(err).NotTo(HaveOccurred()) }(ns, serviceName) + // Wait for the load balancer to be created asynchronously, which is + // (unfortunately) currently indicated by a public IP address being + // added to the spec. + for t := time.Now(); time.Since(t) < 4*time.Minute; time.Sleep(5 * time.Second) { + result, _ = c.Services(ns).Get(serviceName) + if len(result.Spec.PublicIPs) == 1 { + break + } + } if len(result.Spec.PublicIPs) != 1 { Failf("got unexpected number (%d) of public IPs for externally load balanced service: %v", result.Spec.PublicIPs, result) } @@ -325,7 +334,7 @@ var _ = Describe("Services", func() { By("hitting the pod through the service's external load balancer") var resp *http.Response - for t := time.Now(); time.Since(t) < 4*time.Minute; time.Sleep(5 * time.Second) { + for t := time.Now(); time.Since(t) < time.Minute; time.Sleep(5 * time.Second) { resp, err = http.Get(fmt.Sprintf("http://%s:%d", ip, port)) if err == nil { break
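The e2e change above polls for up to four minutes because the public IP is now assigned asynchronously by the ServiceController after the Create call returns. A generic poll-until-true helper along these lines could replace the inline loop; this is a self-contained sketch with invented names (waitFor, publicIPs), not the actual test utility.

package main

import (
	"fmt"
	"time"
)

// waitFor polls condition every interval until it returns true or the timeout
// elapses, mirroring the inline loop added to test/e2e/service.go.
func waitFor(condition func() bool, timeout, interval time.Duration) bool {
	for start := time.Now(); time.Since(start) < timeout; time.Sleep(interval) {
		if condition() {
			return true
		}
	}
	return false
}

func main() {
	// Stand-in for c.Services(ns).Get(serviceName): pretend the controller
	// publishes a public IP a few seconds after the service is created.
	created := time.Now()
	publicIPs := func() []string {
		if time.Since(created) > 3*time.Second {
			return []string{"1.2.3.4"}
		}
		return nil
	}

	provisioned := waitFor(func() bool { return len(publicIPs()) == 1 }, 30*time.Second, time.Second)
	fmt.Println("load balancer provisioned:", provisioned)
}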
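The gce.go changes make creation and deletion idempotent by looking at the HTTP status carried in googleapi.Error: a 409 Conflict during create means the resource is already there and can be reused, and a 404 Not Found during delete means the work is already done. Here is a standalone sketch of that pattern using a stand-in error type (statusError) instead of the real googleapi package; the helper mirrors the isHTTPErrorCode added above.

package main

import (
	"fmt"
	"net/http"
)

// statusError is a stand-in for googleapi.Error, which carries the HTTP
// status code of a failed API call.
type statusError struct {
	Code    int
	Message string
}

func (e *statusError) Error() string { return fmt.Sprintf("%d: %s", e.Code, e.Message) }

// isHTTPErrorCode returns true only when err is the API error type and
// carries the given status code, like the helper added to gce.go.
func isHTTPErrorCode(err error, code int) bool {
	apiErr, ok := err.(*statusError)
	return ok && apiErr.Code == code
}

// createTargetPool is a fake cloud call that fails with 409 Conflict if the
// pool already exists.
func createTargetPool(name string, existing map[string]bool) error {
	if existing[name] {
		return &statusError{Code: http.StatusConflict, Message: "target pool already exists"}
	}
	existing[name] = true
	return nil
}

func main() {
	pools := map[string]bool{"kubernetes-default-foo": true}

	// Treat "already exists" as success so a crashed or retried create
	// converges instead of failing forever.
	if err := createTargetPool("kubernetes-default-foo", pools); err != nil {
		if !isHTTPErrorCode(err, http.StatusConflict) {
			fmt.Println("create failed:", err)
			return
		}
		fmt.Println("reusing existing target pool:", err)
	}
	fmt.Println("create converged; pools:", pools)
}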
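Finally, the new --cluster_name flag (set from the instance_prefix pillar in the Salt template) exists so that load balancer names stay unique when several clusters share one cloud project. The old REST-layer tests expected the name "kubernetes-default-foo" for cluster "kubernetes", namespace "default", and service "foo", which suggests GetLoadBalancerName joins the three parts with dashes. The sketch below only illustrates that implied convention; the real cloudprovider.GetLoadBalancerName may differ in detail.

package main

import "fmt"

// getLoadBalancerName mimics the naming convention implied by the test
// expectation "kubernetes-default-foo"; it is an illustration, not the real
// cloudprovider helper.
func getLoadBalancerName(clusterName, namespace, serviceName string) string {
	return fmt.Sprintf("%s-%s-%s", clusterName, namespace, serviceName)
}

func main() {
	// Two clusters in the same project can each expose a default/foo service
	// without their load balancer names colliding.
	fmt.Println(getLoadBalancerName("kubernetes", "default", "foo"))
	fmt.Println(getLoadBalancerName("staging", "default", "foo"))
}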