mirror of
				https://github.com/k3s-io/kubernetes.git
				synced 2025-10-25 18:09:10 +00:00 
			
		
		
		
	update client-gen to use the term "internalversion" rather than "unversioned"; leave internal one unqualified; cleanup client-gen
		
			
				
	
	
		
			794 lines
		
	
	
		
			26 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
			
		
		
	
	
			794 lines
		
	
	
		
			26 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
| /*
 | |
| Copyright 2015 The Kubernetes Authors.
 | |
| 
 | |
| Licensed under the Apache License, Version 2.0 (the "License");
 | |
| you may not use this file except in compliance with the License.
 | |
| You may obtain a copy of the License at
 | |
| 
 | |
|     http://www.apache.org/licenses/LICENSE-2.0
 | |
| 
 | |
| Unless required by applicable law or agreed to in writing, software
 | |
| distributed under the License is distributed on an "AS IS" BASIS,
 | |
| WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 | |
| See the License for the specific language governing permissions and
 | |
| limitations under the License.
 | |
| */
 | |
| 
 | |
| package service
 | |
| 
 | |
| import (
 | |
| 	"fmt"
 | |
| 	"sort"
 | |
| 	"sync"
 | |
| 	"time"
 | |
| 
 | |
| 	"reflect"
 | |
| 
 | |
| 	"github.com/golang/glog"
 | |
| 	"k8s.io/kubernetes/pkg/api"
 | |
| 	"k8s.io/kubernetes/pkg/api/errors"
 | |
| 	"k8s.io/kubernetes/pkg/client/cache"
 | |
| 	clientset "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset"
 | |
| 	unversioned_core "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset/typed/core/internalversion"
 | |
| 	"k8s.io/kubernetes/pkg/client/record"
 | |
| 	"k8s.io/kubernetes/pkg/cloudprovider"
 | |
| 	"k8s.io/kubernetes/pkg/controller"
 | |
| 	"k8s.io/kubernetes/pkg/fields"
 | |
| 	pkg_runtime "k8s.io/kubernetes/pkg/runtime"
 | |
| 	"k8s.io/kubernetes/pkg/util/metrics"
 | |
| 	"k8s.io/kubernetes/pkg/util/runtime"
 | |
| 	"k8s.io/kubernetes/pkg/util/wait"
 | |
| 	"k8s.io/kubernetes/pkg/util/workqueue"
 | |
| 	"k8s.io/kubernetes/pkg/watch"
 | |
| )
 | |
| 
 | |
// Timing and retry tunables for the service controller.
const (
	// serviceSyncPeriod is the resync period handed to the service informer.
	serviceSyncPeriod = 30 * time.Second
	// nodeSyncPeriod is how often nodeSyncLoop re-examines the node set.
	nodeSyncPeriod = 100 * time.Second

	// minRetryDelay and maxRetryDelay bound the exponential back-off applied
	// before retrying the processing of a service change.
	// If these change, the sleep in hack/jenkins/e2e.sh before downing a
	// cluster should be changed appropriately.
	minRetryDelay = 5 * time.Second
	maxRetryDelay = 300 * time.Second

	// clientRetryCount attempts, spaced clientRetryInterval apart, are made by
	// persistUpdate when writing service status to the apiserver.
	clientRetryCount    = 5
	clientRetryInterval = 5 * time.Second

	// Readable names for the "should this be retried" flag.
	retryable    = true
	notRetryable = false

	// doNotRetry is the zero retry delay, meaning "do not requeue".
	doNotRetry = time.Duration(0)
)
 | |
| 
 | |
| type cachedService struct {
 | |
| 	// The cached state of the service
 | |
| 	state *api.Service
 | |
| 	// Controls error back-off
 | |
| 	lastRetryDelay time.Duration
 | |
| }
 | |
| 
 | |
| type serviceCache struct {
 | |
| 	mu         sync.Mutex // protects serviceMap
 | |
| 	serviceMap map[string]*cachedService
 | |
| }
 | |
| 
 | |
| type ServiceController struct {
 | |
| 	cloud            cloudprovider.Interface
 | |
| 	knownHosts       []string
 | |
| 	servicesToUpdate []*api.Service
 | |
| 	kubeClient       clientset.Interface
 | |
| 	clusterName      string
 | |
| 	balancer         cloudprovider.LoadBalancer
 | |
| 	zone             cloudprovider.Zone
 | |
| 	cache            *serviceCache
 | |
| 	// A store of services, populated by the serviceController
 | |
| 	serviceStore cache.StoreToServiceLister
 | |
| 	// Watches changes to all services
 | |
| 	serviceController *cache.Controller
 | |
| 	eventBroadcaster  record.EventBroadcaster
 | |
| 	eventRecorder     record.EventRecorder
 | |
| 	nodeLister        cache.StoreToNodeLister
 | |
| 	// services that need to be synced
 | |
| 	workingQueue workqueue.DelayingInterface
 | |
| }
 | |
| 
 | |
| // New returns a new service controller to keep cloud provider service resources
 | |
| // (like load balancers) in sync with the registry.
 | |
| func New(cloud cloudprovider.Interface, kubeClient clientset.Interface, clusterName string) (*ServiceController, error) {
 | |
| 	broadcaster := record.NewBroadcaster()
 | |
| 	broadcaster.StartRecordingToSink(&unversioned_core.EventSinkImpl{Interface: kubeClient.Core().Events("")})
 | |
| 	recorder := broadcaster.NewRecorder(api.EventSource{Component: "service-controller"})
 | |
| 
 | |
| 	if kubeClient != nil && kubeClient.Core().RESTClient().GetRateLimiter() != nil {
 | |
| 		metrics.RegisterMetricAndTrackRateLimiterUsage("service_controller", kubeClient.Core().RESTClient().GetRateLimiter())
 | |
| 	}
 | |
| 
 | |
| 	s := &ServiceController{
 | |
| 		cloud:            cloud,
 | |
| 		knownHosts:       []string{},
 | |
| 		kubeClient:       kubeClient,
 | |
| 		clusterName:      clusterName,
 | |
| 		cache:            &serviceCache{serviceMap: make(map[string]*cachedService)},
 | |
| 		eventBroadcaster: broadcaster,
 | |
| 		eventRecorder:    recorder,
 | |
| 		nodeLister: cache.StoreToNodeLister{
 | |
| 			Store: cache.NewStore(cache.MetaNamespaceKeyFunc),
 | |
| 		},
 | |
| 		workingQueue: workqueue.NewDelayingQueue(),
 | |
| 	}
 | |
| 	s.serviceStore.Indexer, s.serviceController = cache.NewIndexerInformer(
 | |
| 		&cache.ListWatch{
 | |
| 			ListFunc: func(options api.ListOptions) (pkg_runtime.Object, error) {
 | |
| 				return s.kubeClient.Core().Services(api.NamespaceAll).List(options)
 | |
| 			},
 | |
| 			WatchFunc: func(options api.ListOptions) (watch.Interface, error) {
 | |
| 				return s.kubeClient.Core().Services(api.NamespaceAll).Watch(options)
 | |
| 			},
 | |
| 		},
 | |
| 		&api.Service{},
 | |
| 		serviceSyncPeriod,
 | |
| 		cache.ResourceEventHandlerFuncs{
 | |
| 			AddFunc: s.enqueueService,
 | |
| 			UpdateFunc: func(old, cur interface{}) {
 | |
| 				oldSvc, ok1 := old.(*api.Service)
 | |
| 				curSvc, ok2 := cur.(*api.Service)
 | |
| 				if ok1 && ok2 && s.needsUpdate(oldSvc, curSvc) {
 | |
| 					s.enqueueService(cur)
 | |
| 				}
 | |
| 			},
 | |
| 			DeleteFunc: s.enqueueService,
 | |
| 		},
 | |
| 		cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc},
 | |
| 	)
 | |
| 	if err := s.init(); err != nil {
 | |
| 		return nil, err
 | |
| 	}
 | |
| 	return s, nil
 | |
| }
 | |
| 
 | |
| // obj could be an *api.Service, or a DeletionFinalStateUnknown marker item.
 | |
| func (s *ServiceController) enqueueService(obj interface{}) {
 | |
| 	key, err := controller.KeyFunc(obj)
 | |
| 	if err != nil {
 | |
| 		glog.Errorf("Couldn't get key for object %#v: %v", obj, err)
 | |
| 		return
 | |
| 	}
 | |
| 	s.workingQueue.Add(key)
 | |
| }
 | |
| 
 | |
| // Run starts a background goroutine that watches for changes to services that
 | |
| // have (or had) LoadBalancers=true and ensures that they have
 | |
| // load balancers created and deleted appropriately.
 | |
| // serviceSyncPeriod controls how often we check the cluster's services to
 | |
| // ensure that the correct load balancers exist.
 | |
| // nodeSyncPeriod controls how often we check the cluster's nodes to determine
 | |
| // if load balancers need to be updated to point to a new set.
 | |
| //
 | |
| // It's an error to call Run() more than once for a given ServiceController
 | |
| // object.
 | |
| func (s *ServiceController) Run(workers int) {
 | |
| 	defer runtime.HandleCrash()
 | |
| 	go s.serviceController.Run(wait.NeverStop)
 | |
| 	for i := 0; i < workers; i++ {
 | |
| 		go wait.Until(s.worker, time.Second, wait.NeverStop)
 | |
| 	}
 | |
| 	nodeLW := cache.NewListWatchFromClient(s.kubeClient.Core().RESTClient(), "nodes", api.NamespaceAll, fields.Everything())
 | |
| 	cache.NewReflector(nodeLW, &api.Node{}, s.nodeLister.Store, 0).Run()
 | |
| 	go wait.Until(s.nodeSyncLoop, nodeSyncPeriod, wait.NeverStop)
 | |
| }
 | |
| 
 | |
| // worker runs a worker thread that just dequeues items, processes them, and marks them done.
 | |
| // It enforces that the syncHandler is never invoked concurrently with the same key.
 | |
| func (s *ServiceController) worker() {
 | |
| 	for {
 | |
| 		func() {
 | |
| 			key, quit := s.workingQueue.Get()
 | |
| 			if quit {
 | |
| 				return
 | |
| 			}
 | |
| 			defer s.workingQueue.Done(key)
 | |
| 			err := s.syncService(key.(string))
 | |
| 			if err != nil {
 | |
| 				glog.Errorf("Error syncing service: %v", err)
 | |
| 			}
 | |
| 		}()
 | |
| 	}
 | |
| }
 | |
| 
 | |
| func (s *ServiceController) init() error {
 | |
| 	if s.cloud == nil {
 | |
| 		return fmt.Errorf("WARNING: no cloud provider provided, services of type LoadBalancer will fail.")
 | |
| 	}
 | |
| 
 | |
| 	balancer, ok := s.cloud.LoadBalancer()
 | |
| 	if !ok {
 | |
| 		return fmt.Errorf("the cloud provider does not support external load balancers.")
 | |
| 	}
 | |
| 	s.balancer = balancer
 | |
| 
 | |
| 	zones, ok := s.cloud.Zones()
 | |
| 	if !ok {
 | |
| 		return fmt.Errorf("the cloud provider does not support zone enumeration, which is required for creating load balancers.")
 | |
| 	}
 | |
| 	zone, err := zones.GetZone()
 | |
| 	if err != nil {
 | |
| 		return fmt.Errorf("failed to get zone from cloud provider, will not be able to create load balancers: %v", err)
 | |
| 	}
 | |
| 	s.zone = zone
 | |
| 	return nil
 | |
| }
 | |
| 
 | |
| // Returns an error if processing the service update failed, along with a time.Duration
 | |
| // indicating whether processing should be retried; zero means no-retry; otherwise
 | |
| // we should retry in that Duration.
 | |
| func (s *ServiceController) processServiceUpdate(cachedService *cachedService, service *api.Service, key string) (error, time.Duration) {
 | |
| 
 | |
| 	// cache the service, we need the info for service deletion
 | |
| 	cachedService.state = service
 | |
| 	err, retry := s.createLoadBalancerIfNeeded(key, service)
 | |
| 	if err != nil {
 | |
| 		message := "Error creating load balancer"
 | |
| 		if retry {
 | |
| 			message += " (will retry): "
 | |
| 		} else {
 | |
| 			message += " (will not retry): "
 | |
| 		}
 | |
| 		message += err.Error()
 | |
| 		s.eventRecorder.Event(service, api.EventTypeWarning, "CreatingLoadBalancerFailed", message)
 | |
| 
 | |
| 		return err, cachedService.nextRetryDelay()
 | |
| 	}
 | |
| 	// Always update the cache upon success.
 | |
| 	// NOTE: Since we update the cached service if and only if we successfully
 | |
| 	// processed it, a cached service being nil implies that it hasn't yet
 | |
| 	// been successfully processed.
 | |
| 	s.cache.set(key, cachedService)
 | |
| 
 | |
| 	cachedService.resetRetryDelay()
 | |
| 	return nil, doNotRetry
 | |
| }
 | |
| 
 | |
| // Returns whatever error occurred along with a boolean indicator of whether it
 | |
| // should be retried.
 | |
| func (s *ServiceController) createLoadBalancerIfNeeded(key string, service *api.Service) (error, bool) {
 | |
| 
 | |
| 	// Note: It is safe to just call EnsureLoadBalancer.  But, on some clouds that requires a delete & create,
 | |
| 	// which may involve service interruption.  Also, we would like user-friendly events.
 | |
| 
 | |
| 	// Save the state so we can avoid a write if it doesn't change
 | |
| 	previousState := api.LoadBalancerStatusDeepCopy(&service.Status.LoadBalancer)
 | |
| 
 | |
| 	if !wantsLoadBalancer(service) {
 | |
| 		needDelete := true
 | |
| 		_, exists, err := s.balancer.GetLoadBalancer(s.clusterName, service)
 | |
| 		if err != nil {
 | |
| 			return fmt.Errorf("Error getting LB for service %s: %v", key, err), retryable
 | |
| 		}
 | |
| 		if !exists {
 | |
| 			needDelete = false
 | |
| 		}
 | |
| 
 | |
| 		if needDelete {
 | |
| 			glog.Infof("Deleting existing load balancer for service %s that no longer needs a load balancer.", key)
 | |
| 			s.eventRecorder.Event(service, api.EventTypeNormal, "DeletingLoadBalancer", "Deleting load balancer")
 | |
| 			if err := s.balancer.EnsureLoadBalancerDeleted(s.clusterName, service); err != nil {
 | |
| 				return err, retryable
 | |
| 			}
 | |
| 			s.eventRecorder.Event(service, api.EventTypeNormal, "DeletedLoadBalancer", "Deleted load balancer")
 | |
| 		}
 | |
| 
 | |
| 		service.Status.LoadBalancer = api.LoadBalancerStatus{}
 | |
| 	} else {
 | |
| 		glog.V(2).Infof("Ensuring LB for service %s", key)
 | |
| 
 | |
| 		// TODO: We could do a dry-run here if wanted to avoid the spurious cloud-calls & events when we restart
 | |
| 
 | |
| 		// The load balancer doesn't exist yet, so create it.
 | |
| 		s.eventRecorder.Event(service, api.EventTypeNormal, "CreatingLoadBalancer", "Creating load balancer")
 | |
| 		err := s.createLoadBalancer(service)
 | |
| 		if err != nil {
 | |
| 			return fmt.Errorf("Failed to create load balancer for service %s: %v", key, err), retryable
 | |
| 		}
 | |
| 		s.eventRecorder.Event(service, api.EventTypeNormal, "CreatedLoadBalancer", "Created load balancer")
 | |
| 	}
 | |
| 
 | |
| 	// Write the state if changed
 | |
| 	// TODO: Be careful here ... what if there were other changes to the service?
 | |
| 	if !api.LoadBalancerStatusEqual(previousState, &service.Status.LoadBalancer) {
 | |
| 		if err := s.persistUpdate(service); err != nil {
 | |
| 			return fmt.Errorf("Failed to persist updated status to apiserver, even after retries. Giving up: %v", err), notRetryable
 | |
| 		}
 | |
| 	} else {
 | |
| 		glog.V(2).Infof("Not persisting unchanged LoadBalancerStatus to registry.")
 | |
| 	}
 | |
| 
 | |
| 	return nil, notRetryable
 | |
| }
 | |
| 
 | |
| func (s *ServiceController) persistUpdate(service *api.Service) error {
 | |
| 	var err error
 | |
| 	for i := 0; i < clientRetryCount; i++ {
 | |
| 		_, err = s.kubeClient.Core().Services(service.Namespace).UpdateStatus(service)
 | |
| 		if err == nil {
 | |
| 			return nil
 | |
| 		}
 | |
| 		// If the object no longer exists, we don't want to recreate it. Just bail
 | |
| 		// out so that we can process the delete, which we should soon be receiving
 | |
| 		// if we haven't already.
 | |
| 		if errors.IsNotFound(err) {
 | |
| 			glog.Infof("Not persisting update to service '%s/%s' that no longer exists: %v",
 | |
| 				service.Namespace, service.Name, err)
 | |
| 			return nil
 | |
| 		}
 | |
| 		// TODO: Try to resolve the conflict if the change was unrelated to load
 | |
| 		// balancer status. For now, just rely on the fact that we'll
 | |
| 		// also process the update that caused the resource version to change.
 | |
| 		if errors.IsConflict(err) {
 | |
| 			glog.V(4).Infof("Not persisting update to service '%s/%s' that has been changed since we received it: %v",
 | |
| 				service.Namespace, service.Name, err)
 | |
| 			return nil
 | |
| 		}
 | |
| 		glog.Warningf("Failed to persist updated LoadBalancerStatus to service '%s/%s' after creating its load balancer: %v",
 | |
| 			service.Namespace, service.Name, err)
 | |
| 		time.Sleep(clientRetryInterval)
 | |
| 	}
 | |
| 	return err
 | |
| }
 | |
| 
 | |
| func (s *ServiceController) createLoadBalancer(service *api.Service) error {
 | |
| 	nodes, err := s.nodeLister.List()
 | |
| 	if err != nil {
 | |
| 		return err
 | |
| 	}
 | |
| 
 | |
| 	// - Only one protocol supported per service
 | |
| 	// - Not all cloud providers support all protocols and the next step is expected to return
 | |
| 	//   an error for unsupported protocols
 | |
| 	status, err := s.balancer.EnsureLoadBalancer(s.clusterName, service, hostsFromNodeList(&nodes))
 | |
| 	if err != nil {
 | |
| 		return err
 | |
| 	} else {
 | |
| 		service.Status.LoadBalancer = *status
 | |
| 	}
 | |
| 
 | |
| 	return nil
 | |
| }
 | |
| 
 | |
| // ListKeys implements the interface required by DeltaFIFO to list the keys we
 | |
| // already know about.
 | |
| func (s *serviceCache) ListKeys() []string {
 | |
| 	s.mu.Lock()
 | |
| 	defer s.mu.Unlock()
 | |
| 	keys := make([]string, 0, len(s.serviceMap))
 | |
| 	for k := range s.serviceMap {
 | |
| 		keys = append(keys, k)
 | |
| 	}
 | |
| 	return keys
 | |
| }
 | |
| 
 | |
| // GetByKey returns the value stored in the serviceMap under the given key
 | |
| func (s *serviceCache) GetByKey(key string) (interface{}, bool, error) {
 | |
| 	s.mu.Lock()
 | |
| 	defer s.mu.Unlock()
 | |
| 	if v, ok := s.serviceMap[key]; ok {
 | |
| 		return v, true, nil
 | |
| 	}
 | |
| 	return nil, false, nil
 | |
| }
 | |
| 
 | |
| // ListKeys implements the interface required by DeltaFIFO to list the keys we
 | |
| // already know about.
 | |
| func (s *serviceCache) allServices() []*api.Service {
 | |
| 	s.mu.Lock()
 | |
| 	defer s.mu.Unlock()
 | |
| 	services := make([]*api.Service, 0, len(s.serviceMap))
 | |
| 	for _, v := range s.serviceMap {
 | |
| 		services = append(services, v.state)
 | |
| 	}
 | |
| 	return services
 | |
| }
 | |
| 
 | |
| func (s *serviceCache) get(serviceName string) (*cachedService, bool) {
 | |
| 	s.mu.Lock()
 | |
| 	defer s.mu.Unlock()
 | |
| 	service, ok := s.serviceMap[serviceName]
 | |
| 	return service, ok
 | |
| }
 | |
| 
 | |
| func (s *serviceCache) getOrCreate(serviceName string) *cachedService {
 | |
| 	s.mu.Lock()
 | |
| 	defer s.mu.Unlock()
 | |
| 	service, ok := s.serviceMap[serviceName]
 | |
| 	if !ok {
 | |
| 		service = &cachedService{}
 | |
| 		s.serviceMap[serviceName] = service
 | |
| 	}
 | |
| 	return service
 | |
| }
 | |
| 
 | |
| func (s *serviceCache) set(serviceName string, service *cachedService) {
 | |
| 	s.mu.Lock()
 | |
| 	defer s.mu.Unlock()
 | |
| 	s.serviceMap[serviceName] = service
 | |
| }
 | |
| 
 | |
| func (s *serviceCache) delete(serviceName string) {
 | |
| 	s.mu.Lock()
 | |
| 	defer s.mu.Unlock()
 | |
| 	delete(s.serviceMap, serviceName)
 | |
| }
 | |
| 
 | |
| func (s *ServiceController) needsUpdate(oldService *api.Service, newService *api.Service) bool {
 | |
| 	if !wantsLoadBalancer(oldService) && !wantsLoadBalancer(newService) {
 | |
| 		return false
 | |
| 	}
 | |
| 	if wantsLoadBalancer(oldService) != wantsLoadBalancer(newService) {
 | |
| 		s.eventRecorder.Eventf(newService, api.EventTypeNormal, "Type", "%v -> %v",
 | |
| 			oldService.Spec.Type, newService.Spec.Type)
 | |
| 		return true
 | |
| 	}
 | |
| 	if !portsEqualForLB(oldService, newService) || oldService.Spec.SessionAffinity != newService.Spec.SessionAffinity {
 | |
| 		return true
 | |
| 	}
 | |
| 	if !loadBalancerIPsAreEqual(oldService, newService) {
 | |
| 		s.eventRecorder.Eventf(newService, api.EventTypeNormal, "LoadbalancerIP", "%v -> %v",
 | |
| 			oldService.Spec.LoadBalancerIP, newService.Spec.LoadBalancerIP)
 | |
| 		return true
 | |
| 	}
 | |
| 	if len(oldService.Spec.ExternalIPs) != len(newService.Spec.ExternalIPs) {
 | |
| 		s.eventRecorder.Eventf(newService, api.EventTypeNormal, "ExternalIP", "Count: %v -> %v",
 | |
| 			len(oldService.Spec.ExternalIPs), len(newService.Spec.ExternalIPs))
 | |
| 		return true
 | |
| 	}
 | |
| 	for i := range oldService.Spec.ExternalIPs {
 | |
| 		if oldService.Spec.ExternalIPs[i] != newService.Spec.ExternalIPs[i] {
 | |
| 			s.eventRecorder.Eventf(newService, api.EventTypeNormal, "ExternalIP", "Added: %v",
 | |
| 				newService.Spec.ExternalIPs[i])
 | |
| 			return true
 | |
| 		}
 | |
| 	}
 | |
| 	if !reflect.DeepEqual(oldService.Annotations, newService.Annotations) {
 | |
| 		return true
 | |
| 	}
 | |
| 	if oldService.UID != newService.UID {
 | |
| 		s.eventRecorder.Eventf(newService, api.EventTypeNormal, "UID", "%v -> %v",
 | |
| 			oldService.UID, newService.UID)
 | |
| 		return true
 | |
| 	}
 | |
| 
 | |
| 	return false
 | |
| }
 | |
| 
 | |
| func (s *ServiceController) loadBalancerName(service *api.Service) string {
 | |
| 	return cloudprovider.GetLoadBalancerName(service)
 | |
| }
 | |
| 
 | |
| func getPortsForLB(service *api.Service) ([]*api.ServicePort, error) {
 | |
| 	var protocol api.Protocol
 | |
| 
 | |
| 	ports := []*api.ServicePort{}
 | |
| 	for i := range service.Spec.Ports {
 | |
| 		sp := &service.Spec.Ports[i]
 | |
| 		// The check on protocol was removed here.  The cloud provider itself is now responsible for all protocol validation
 | |
| 		ports = append(ports, sp)
 | |
| 		if protocol == "" {
 | |
| 			protocol = sp.Protocol
 | |
| 		} else if protocol != sp.Protocol && wantsLoadBalancer(service) {
 | |
| 			// TODO:  Convert error messages to use event recorder
 | |
| 			return nil, fmt.Errorf("mixed protocol external load balancers are not supported.")
 | |
| 		}
 | |
| 	}
 | |
| 	return ports, nil
 | |
| }
 | |
| 
 | |
| func portsEqualForLB(x, y *api.Service) bool {
 | |
| 	xPorts, err := getPortsForLB(x)
 | |
| 	if err != nil {
 | |
| 		return false
 | |
| 	}
 | |
| 	yPorts, err := getPortsForLB(y)
 | |
| 	if err != nil {
 | |
| 		return false
 | |
| 	}
 | |
| 	return portSlicesEqualForLB(xPorts, yPorts)
 | |
| }
 | |
| 
 | |
| func portSlicesEqualForLB(x, y []*api.ServicePort) bool {
 | |
| 	if len(x) != len(y) {
 | |
| 		return false
 | |
| 	}
 | |
| 
 | |
| 	for i := range x {
 | |
| 		if !portEqualForLB(x[i], y[i]) {
 | |
| 			return false
 | |
| 		}
 | |
| 	}
 | |
| 	return true
 | |
| }
 | |
| 
 | |
| func portEqualForLB(x, y *api.ServicePort) bool {
 | |
| 	// TODO: Should we check name?  (In theory, an LB could expose it)
 | |
| 	if x.Name != y.Name {
 | |
| 		return false
 | |
| 	}
 | |
| 
 | |
| 	if x.Protocol != y.Protocol {
 | |
| 		return false
 | |
| 	}
 | |
| 
 | |
| 	if x.Port != y.Port {
 | |
| 		return false
 | |
| 	}
 | |
| 
 | |
| 	if x.NodePort != y.NodePort {
 | |
| 		return false
 | |
| 	}
 | |
| 
 | |
| 	// We don't check TargetPort; that is not relevant for load balancing
 | |
| 	// TODO: Should we blank it out?  Or just check it anyway?
 | |
| 
 | |
| 	return true
 | |
| }
 | |
| 
 | |
// intSlicesEqual reports whether x and y hold the same elements with the same
// multiplicities, ignoring order. Neither argument is modified. Unsorted
// inputs are copied before sorting (the previous version sorted the caller's
// slices in place, silently reordering them).
func intSlicesEqual(x, y []int) bool {
	if len(x) != len(y) {
		return false
	}
	if !sort.IntsAreSorted(x) {
		x = append([]int(nil), x...)
		sort.Ints(x)
	}
	if !sort.IntsAreSorted(y) {
		y = append([]int(nil), y...)
		sort.Ints(y)
	}
	for i := range x {
		if x[i] != y[i] {
			return false
		}
	}
	return true
}
 | |
| 
 | |
// stringSlicesEqual reports whether x and y hold the same strings with the
// same multiplicities, ignoring order. Neither argument is modified.
// Unsorted inputs are copied before sorting. The previous version sorted
// in place, so callers such as nodeSyncLoop had their host lists reordered
// as a side effect.
func stringSlicesEqual(x, y []string) bool {
	if len(x) != len(y) {
		return false
	}
	if !sort.StringsAreSorted(x) {
		x = append([]string(nil), x...)
		sort.Strings(x)
	}
	if !sort.StringsAreSorted(y) {
		y = append([]string(nil), y...)
		sort.Strings(y)
	}
	for i := range x {
		if x[i] != y[i] {
			return false
		}
	}
	return true
}
 | |
| 
 | |
| func includeNodeFromNodeList(node *api.Node) bool {
 | |
| 	return !node.Spec.Unschedulable
 | |
| }
 | |
| 
 | |
| func hostsFromNodeList(list *api.NodeList) []string {
 | |
| 	result := []string{}
 | |
| 	for ix := range list.Items {
 | |
| 		if includeNodeFromNodeList(&list.Items[ix]) {
 | |
| 			result = append(result, list.Items[ix].Name)
 | |
| 		}
 | |
| 	}
 | |
| 	return result
 | |
| }
 | |
| 
 | |
| func hostsFromNodeSlice(nodes []*api.Node) []string {
 | |
| 	result := []string{}
 | |
| 	for _, node := range nodes {
 | |
| 		if includeNodeFromNodeList(node) {
 | |
| 			result = append(result, node.Name)
 | |
| 		}
 | |
| 	}
 | |
| 	return result
 | |
| }
 | |
| 
 | |
| func getNodeConditionPredicate() cache.NodeConditionPredicate {
 | |
| 	return func(node *api.Node) bool {
 | |
| 		// We add the master to the node list, but its unschedulable.  So we use this to filter
 | |
| 		// the master.
 | |
| 		// TODO: Use a node annotation to indicate the master
 | |
| 		if node.Spec.Unschedulable {
 | |
| 			return false
 | |
| 		}
 | |
| 		// If we have no info, don't accept
 | |
| 		if len(node.Status.Conditions) == 0 {
 | |
| 			return false
 | |
| 		}
 | |
| 		for _, cond := range node.Status.Conditions {
 | |
| 			// We consider the node for load balancing only when its NodeReady condition status
 | |
| 			// is ConditionTrue
 | |
| 			if cond.Type == api.NodeReady && cond.Status != api.ConditionTrue {
 | |
| 				glog.V(4).Infof("Ignoring node %v with %v condition status %v", node.Name, cond.Type, cond.Status)
 | |
| 				return false
 | |
| 			}
 | |
| 		}
 | |
| 		return true
 | |
| 	}
 | |
| }
 | |
| 
 | |
| // nodeSyncLoop handles updating the hosts pointed to by all load
 | |
| // balancers whenever the set of nodes in the cluster changes.
 | |
| func (s *ServiceController) nodeSyncLoop() {
 | |
| 	nodes, err := s.nodeLister.NodeCondition(getNodeConditionPredicate()).List()
 | |
| 	if err != nil {
 | |
| 		glog.Errorf("Failed to retrieve current set of nodes from node lister: %v", err)
 | |
| 		return
 | |
| 	}
 | |
| 	newHosts := hostsFromNodeSlice(nodes)
 | |
| 	if stringSlicesEqual(newHosts, s.knownHosts) {
 | |
| 		// The set of nodes in the cluster hasn't changed, but we can retry
 | |
| 		// updating any services that we failed to update last time around.
 | |
| 		s.servicesToUpdate = s.updateLoadBalancerHosts(s.servicesToUpdate, newHosts)
 | |
| 		return
 | |
| 	}
 | |
| 	glog.Infof("Detected change in list of current cluster nodes. New node set: %v", newHosts)
 | |
| 
 | |
| 	// Try updating all services, and save the ones that fail to try again next
 | |
| 	// round.
 | |
| 	s.servicesToUpdate = s.cache.allServices()
 | |
| 	numServices := len(s.servicesToUpdate)
 | |
| 	s.servicesToUpdate = s.updateLoadBalancerHosts(s.servicesToUpdate, newHosts)
 | |
| 	glog.Infof("Successfully updated %d out of %d load balancers to direct traffic to the updated set of nodes",
 | |
| 		numServices-len(s.servicesToUpdate), numServices)
 | |
| 
 | |
| 	s.knownHosts = newHosts
 | |
| }
 | |
| 
 | |
| // updateLoadBalancerHosts updates all existing load balancers so that
 | |
| // they will match the list of hosts provided.
 | |
| // Returns the list of services that couldn't be updated.
 | |
| func (s *ServiceController) updateLoadBalancerHosts(services []*api.Service, hosts []string) (servicesToRetry []*api.Service) {
 | |
| 	for _, service := range services {
 | |
| 		func() {
 | |
| 			if service == nil {
 | |
| 				return
 | |
| 			}
 | |
| 			if err := s.lockedUpdateLoadBalancerHosts(service, hosts); err != nil {
 | |
| 				glog.Errorf("External error while updating load balancer: %v.", err)
 | |
| 				servicesToRetry = append(servicesToRetry, service)
 | |
| 			}
 | |
| 		}()
 | |
| 	}
 | |
| 	return servicesToRetry
 | |
| }
 | |
| 
 | |
| // Updates the load balancer of a service, assuming we hold the mutex
 | |
| // associated with the service.
 | |
| func (s *ServiceController) lockedUpdateLoadBalancerHosts(service *api.Service, hosts []string) error {
 | |
| 	if !wantsLoadBalancer(service) {
 | |
| 		return nil
 | |
| 	}
 | |
| 
 | |
| 	// This operation doesn't normally take very long (and happens pretty often), so we only record the final event
 | |
| 	err := s.balancer.UpdateLoadBalancer(s.clusterName, service, hosts)
 | |
| 	if err == nil {
 | |
| 		s.eventRecorder.Event(service, api.EventTypeNormal, "UpdatedLoadBalancer", "Updated load balancer with new hosts")
 | |
| 		return nil
 | |
| 	}
 | |
| 
 | |
| 	// It's only an actual error if the load balancer still exists.
 | |
| 	if _, exists, err := s.balancer.GetLoadBalancer(s.clusterName, service); err != nil {
 | |
| 		glog.Errorf("External error while checking if load balancer %q exists: name, %v", cloudprovider.GetLoadBalancerName(service), err)
 | |
| 	} else if !exists {
 | |
| 		return nil
 | |
| 	}
 | |
| 
 | |
| 	s.eventRecorder.Eventf(service, api.EventTypeWarning, "LoadBalancerUpdateFailed", "Error updating load balancer with new hosts %v: %v", hosts, err)
 | |
| 	return err
 | |
| }
 | |
| 
 | |
| func wantsLoadBalancer(service *api.Service) bool {
 | |
| 	return service.Spec.Type == api.ServiceTypeLoadBalancer
 | |
| }
 | |
| 
 | |
| func loadBalancerIPsAreEqual(oldService, newService *api.Service) bool {
 | |
| 	return oldService.Spec.LoadBalancerIP == newService.Spec.LoadBalancerIP
 | |
| }
 | |
| 
 | |
| // Computes the next retry, using exponential backoff
 | |
| // mutex must be held.
 | |
| func (s *cachedService) nextRetryDelay() time.Duration {
 | |
| 	s.lastRetryDelay = s.lastRetryDelay * 2
 | |
| 	if s.lastRetryDelay < minRetryDelay {
 | |
| 		s.lastRetryDelay = minRetryDelay
 | |
| 	}
 | |
| 	if s.lastRetryDelay > maxRetryDelay {
 | |
| 		s.lastRetryDelay = maxRetryDelay
 | |
| 	}
 | |
| 	return s.lastRetryDelay
 | |
| }
 | |
| 
 | |
| // Resets the retry exponential backoff.  mutex must be held.
 | |
| func (s *cachedService) resetRetryDelay() {
 | |
| 	s.lastRetryDelay = time.Duration(0)
 | |
| }
 | |
| 
 | |
| // syncService will sync the Service with the given key if it has had its expectations fulfilled,
 | |
| // meaning it did not expect to see any more of its pods created or deleted. This function is not meant to be
 | |
| // invoked concurrently with the same key.
 | |
| func (s *ServiceController) syncService(key string) error {
 | |
| 	startTime := time.Now()
 | |
| 	var cachedService *cachedService
 | |
| 	var retryDelay time.Duration
 | |
| 	defer func() {
 | |
| 		glog.V(4).Infof("Finished syncing service %q (%v)", key, time.Now().Sub(startTime))
 | |
| 	}()
 | |
| 	// obj holds the latest service info from apiserver
 | |
| 	obj, exists, err := s.serviceStore.Indexer.GetByKey(key)
 | |
| 	if err != nil {
 | |
| 		glog.Infof("Unable to retrieve service %v from store: %v", key, err)
 | |
| 		s.workingQueue.Add(key)
 | |
| 		return err
 | |
| 	}
 | |
| 	if !exists {
 | |
| 		// service absence in store means watcher caught the deletion, ensure LB info is cleaned
 | |
| 		glog.Infof("Service has been deleted %v", key)
 | |
| 		err, retryDelay = s.processServiceDeletion(key)
 | |
| 	} else {
 | |
| 		service, ok := obj.(*api.Service)
 | |
| 		if ok {
 | |
| 			cachedService = s.cache.getOrCreate(key)
 | |
| 			err, retryDelay = s.processServiceUpdate(cachedService, service, key)
 | |
| 		} else {
 | |
| 			tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
 | |
| 			if !ok {
 | |
| 				return fmt.Errorf("object contained wasn't a service or a deleted key: %#v", obj)
 | |
| 			}
 | |
| 			glog.Infof("Found tombstone for %v", key)
 | |
| 			err, retryDelay = s.processServiceDeletion(tombstone.Key)
 | |
| 		}
 | |
| 	}
 | |
| 
 | |
| 	if retryDelay != 0 {
 | |
| 		// Add the failed service back to the queue so we'll retry it.
 | |
| 		glog.Errorf("Failed to process service. Retrying in %s: %v", retryDelay, err)
 | |
| 		go func(obj interface{}, delay time.Duration) {
 | |
| 			// put back the service key to working queue, it is possible that more entries of the service
 | |
| 			// were added into the queue during the delay, but it does not mess as when handling the retry,
 | |
| 			// it always get the last service info from service store
 | |
| 			s.workingQueue.AddAfter(obj, delay)
 | |
| 		}(key, retryDelay)
 | |
| 	} else if err != nil {
 | |
| 		runtime.HandleError(fmt.Errorf("Failed to process service. Not retrying: %v", err))
 | |
| 	}
 | |
| 	return nil
 | |
| }
 | |
| 
 | |
| // Returns an error if processing the service deletion failed, along with a time.Duration
 | |
| // indicating whether processing should be retried; zero means no-retry; otherwise
 | |
| // we should retry after that Duration.
 | |
| func (s *ServiceController) processServiceDeletion(key string) (error, time.Duration) {
 | |
| 	cachedService, ok := s.cache.get(key)
 | |
| 	if !ok {
 | |
| 		return fmt.Errorf("Service %s not in cache even though the watcher thought it was. Ignoring the deletion.", key), doNotRetry
 | |
| 	}
 | |
| 	service := cachedService.state
 | |
| 	// delete load balancer info only if the service type is LoadBalancer
 | |
| 	if !wantsLoadBalancer(service) {
 | |
| 		return nil, doNotRetry
 | |
| 	}
 | |
| 	s.eventRecorder.Event(service, api.EventTypeNormal, "DeletingLoadBalancer", "Deleting load balancer")
 | |
| 	err := s.balancer.EnsureLoadBalancerDeleted(s.clusterName, service)
 | |
| 	if err != nil {
 | |
| 		message := "Error deleting load balancer (will retry): " + err.Error()
 | |
| 		s.eventRecorder.Event(service, api.EventTypeWarning, "DeletingLoadBalancerFailed", message)
 | |
| 		return err, cachedService.nextRetryDelay()
 | |
| 	}
 | |
| 	s.eventRecorder.Event(service, api.EventTypeNormal, "DeletedLoadBalancer", "Deleted load balancer")
 | |
| 	s.cache.delete(key)
 | |
| 
 | |
| 	cachedService.resetRetryDelay()
 | |
| 	return nil, doNotRetry
 | |
| }
 |