|
|
|
@@ -100,9 +100,9 @@ type serviceCache struct {
|
|
|
|
|
serviceMap map[string]*cachedService
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// ServiceController keeps cloud provider service resources
|
|
|
|
|
// Controller keeps cloud provider service resources
|
|
|
|
|
// (like load balancers) in sync with the registry.
|
|
|
|
|
type ServiceController struct {
|
|
|
|
|
type Controller struct {
|
|
|
|
|
cloud cloudprovider.Interface
|
|
|
|
|
knownHosts []*v1.Node
|
|
|
|
|
servicesToUpdate []*v1.Service
|
|
|
|
@@ -128,7 +128,7 @@ func New(
|
|
|
|
|
serviceInformer coreinformers.ServiceInformer,
|
|
|
|
|
nodeInformer coreinformers.NodeInformer,
|
|
|
|
|
clusterName string,
|
|
|
|
|
) (*ServiceController, error) {
|
|
|
|
|
) (*Controller, error) {
|
|
|
|
|
broadcaster := record.NewBroadcaster()
|
|
|
|
|
broadcaster.StartLogging(klog.Infof)
|
|
|
|
|
broadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: kubeClient.CoreV1().Events("")})
|
|
|
|
@@ -140,7 +140,7 @@ func New(
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
s := &ServiceController{
|
|
|
|
|
s := &Controller{
|
|
|
|
|
cloud: cloud,
|
|
|
|
|
knownHosts: []*v1.Node{},
|
|
|
|
|
kubeClient: kubeClient,
|
|
|
|
@@ -192,7 +192,7 @@ func New(
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// obj could be an *v1.Service, or a DeletionFinalStateUnknown marker item.
|
|
|
|
|
func (s *ServiceController) enqueueService(obj interface{}) {
|
|
|
|
|
func (s *Controller) enqueueService(obj interface{}) {
|
|
|
|
|
key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(obj)
|
|
|
|
|
if err != nil {
|
|
|
|
|
runtime.HandleError(fmt.Errorf("couldn't get key for object %#v: %v", obj, err))
|
|
|
|
@@ -211,7 +211,7 @@ func (s *ServiceController) enqueueService(obj interface{}) {
|
|
|
|
|
//
|
|
|
|
|
// It's an error to call Run() more than once for a given ServiceController
|
|
|
|
|
// object.
|
|
|
|
|
func (s *ServiceController) Run(stopCh <-chan struct{}, workers int) {
|
|
|
|
|
func (s *Controller) Run(stopCh <-chan struct{}, workers int) {
|
|
|
|
|
defer runtime.HandleCrash()
|
|
|
|
|
defer s.queue.ShutDown()
|
|
|
|
|
|
|
|
|
@@ -233,12 +233,12 @@ func (s *ServiceController) Run(stopCh <-chan struct{}, workers int) {
|
|
|
|
|
|
|
|
|
|
// worker runs a worker thread that just dequeues items, processes them, and marks them done.
|
|
|
|
|
// It enforces that the syncHandler is never invoked concurrently with the same key.
|
|
|
|
|
func (s *ServiceController) worker() {
|
|
|
|
|
func (s *Controller) worker() {
|
|
|
|
|
for s.processNextWorkItem() {
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (s *ServiceController) processNextWorkItem() bool {
|
|
|
|
|
func (s *Controller) processNextWorkItem() bool {
|
|
|
|
|
key, quit := s.queue.Get()
|
|
|
|
|
if quit {
|
|
|
|
|
return false
|
|
|
|
@@ -256,7 +256,7 @@ func (s *ServiceController) processNextWorkItem() bool {
|
|
|
|
|
return true
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (s *ServiceController) init() error {
|
|
|
|
|
func (s *Controller) init() error {
|
|
|
|
|
if s.cloud == nil {
|
|
|
|
|
return fmt.Errorf("WARNING: no cloud provider provided, services of type LoadBalancer will fail")
|
|
|
|
|
}
|
|
|
|
@@ -272,7 +272,7 @@ func (s *ServiceController) init() error {
|
|
|
|
|
|
|
|
|
|
// processServiceCreateOrUpdate operates loadbalancers for the incoming service accordingly.
|
|
|
|
|
// Returns an error if processing the service update failed.
|
|
|
|
|
func (s *ServiceController) processServiceCreateOrUpdate(service *v1.Service, key string) error {
|
|
|
|
|
func (s *Controller) processServiceCreateOrUpdate(service *v1.Service, key string) error {
|
|
|
|
|
// TODO(@MrHohn): Remove the cache once we get rid of the non-finalizer deletion
|
|
|
|
|
// path. Ref https://github.com/kubernetes/enhancements/issues/980.
|
|
|
|
|
cachedService := s.cache.getOrCreate(key)
|
|
|
|
@@ -310,7 +310,7 @@ const (
|
|
|
|
|
// syncLoadBalancerIfNeeded ensures that service's status is synced up with loadbalancer
|
|
|
|
|
// i.e. creates loadbalancer for service if requested and deletes loadbalancer if the service
|
|
|
|
|
// doesn't want a loadbalancer no more. Returns whatever error occurred.
|
|
|
|
|
func (s *ServiceController) syncLoadBalancerIfNeeded(service *v1.Service, key string) (loadBalancerOperation, error) {
|
|
|
|
|
func (s *Controller) syncLoadBalancerIfNeeded(service *v1.Service, key string) (loadBalancerOperation, error) {
|
|
|
|
|
// Note: It is safe to just call EnsureLoadBalancer. But, on some clouds that requires a delete & create,
|
|
|
|
|
// which may involve service interruption. Also, we would like user-friendly events.
|
|
|
|
|
|
|
|
|
@@ -385,7 +385,7 @@ func (s *ServiceController) syncLoadBalancerIfNeeded(service *v1.Service, key st
|
|
|
|
|
return op, nil
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (s *ServiceController) ensureLoadBalancer(service *v1.Service) (*v1.LoadBalancerStatus, error) {
|
|
|
|
|
func (s *Controller) ensureLoadBalancer(service *v1.Service) (*v1.LoadBalancerStatus, error) {
|
|
|
|
|
nodes, err := s.nodeLister.ListWithPredicate(getNodeConditionPredicate())
|
|
|
|
|
if err != nil {
|
|
|
|
|
return nil, err
|
|
|
|
@@ -485,7 +485,7 @@ func needsCleanup(service *v1.Service) bool {
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// needsUpdate checks if load balancer needs to be updated due to change in attributes.
|
|
|
|
|
func (s *ServiceController) needsUpdate(oldService *v1.Service, newService *v1.Service) bool {
|
|
|
|
|
func (s *Controller) needsUpdate(oldService *v1.Service, newService *v1.Service) bool {
|
|
|
|
|
if !wantsLoadBalancer(oldService) && !wantsLoadBalancer(newService) {
|
|
|
|
|
return false
|
|
|
|
|
}
|
|
|
|
@@ -658,7 +658,7 @@ func getNodeConditionPredicate() corelisters.NodeConditionPredicate {
|
|
|
|
|
|
|
|
|
|
// nodeSyncLoop handles updating the hosts pointed to by all load
|
|
|
|
|
// balancers whenever the set of nodes in the cluster changes.
|
|
|
|
|
func (s *ServiceController) nodeSyncLoop() {
|
|
|
|
|
func (s *Controller) nodeSyncLoop() {
|
|
|
|
|
newHosts, err := s.nodeLister.ListWithPredicate(getNodeConditionPredicate())
|
|
|
|
|
if err != nil {
|
|
|
|
|
runtime.HandleError(fmt.Errorf("Failed to retrieve current set of nodes from node lister: %v", err))
|
|
|
|
@@ -688,7 +688,7 @@ func (s *ServiceController) nodeSyncLoop() {
|
|
|
|
|
// updateLoadBalancerHosts updates all existing load balancers so that
|
|
|
|
|
// they will match the list of hosts provided.
|
|
|
|
|
// Returns the list of services that couldn't be updated.
|
|
|
|
|
func (s *ServiceController) updateLoadBalancerHosts(services []*v1.Service, hosts []*v1.Node) (servicesToRetry []*v1.Service) {
|
|
|
|
|
func (s *Controller) updateLoadBalancerHosts(services []*v1.Service, hosts []*v1.Node) (servicesToRetry []*v1.Service) {
|
|
|
|
|
for _, service := range services {
|
|
|
|
|
func() {
|
|
|
|
|
if service == nil {
|
|
|
|
@@ -705,7 +705,7 @@ func (s *ServiceController) updateLoadBalancerHosts(services []*v1.Service, host
|
|
|
|
|
|
|
|
|
|
// Updates the load balancer of a service, assuming we hold the mutex
|
|
|
|
|
// associated with the service.
|
|
|
|
|
func (s *ServiceController) lockedUpdateLoadBalancerHosts(service *v1.Service, hosts []*v1.Node) error {
|
|
|
|
|
func (s *Controller) lockedUpdateLoadBalancerHosts(service *v1.Service, hosts []*v1.Node) error {
|
|
|
|
|
if !wantsLoadBalancer(service) {
|
|
|
|
|
return nil
|
|
|
|
|
}
|
|
|
|
@@ -749,7 +749,7 @@ func loadBalancerIPsAreEqual(oldService, newService *v1.Service) bool {
|
|
|
|
|
// syncService will sync the Service with the given key if it has had its expectations fulfilled,
|
|
|
|
|
// meaning it did not expect to see any more of its pods created or deleted. This function is not meant to be
|
|
|
|
|
// invoked concurrently with the same key.
|
|
|
|
|
func (s *ServiceController) syncService(key string) error {
|
|
|
|
|
func (s *Controller) syncService(key string) error {
|
|
|
|
|
startTime := time.Now()
|
|
|
|
|
defer func() {
|
|
|
|
|
klog.V(4).Infof("Finished syncing service %q (%v)", key, time.Since(startTime))
|
|
|
|
@@ -775,7 +775,7 @@ func (s *ServiceController) syncService(key string) error {
|
|
|
|
|
return err
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (s *ServiceController) processServiceDeletion(key string) error {
|
|
|
|
|
func (s *Controller) processServiceDeletion(key string) error {
|
|
|
|
|
cachedService, ok := s.cache.get(key)
|
|
|
|
|
if !ok {
|
|
|
|
|
// Cache does not contains the key means:
|
|
|
|
@@ -792,7 +792,7 @@ func (s *ServiceController) processServiceDeletion(key string) error {
|
|
|
|
|
return nil
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (s *ServiceController) processLoadBalancerDelete(service *v1.Service, key string) error {
|
|
|
|
|
func (s *Controller) processLoadBalancerDelete(service *v1.Service, key string) error {
|
|
|
|
|
// delete load balancer info only if the service type is LoadBalancer
|
|
|
|
|
if !wantsLoadBalancer(service) {
|
|
|
|
|
return nil
|
|
|
|
@@ -807,7 +807,7 @@ func (s *ServiceController) processLoadBalancerDelete(service *v1.Service, key s
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// addFinalizer patches the service to add finalizer.
|
|
|
|
|
func (s *ServiceController) addFinalizer(service *v1.Service) error {
|
|
|
|
|
func (s *Controller) addFinalizer(service *v1.Service) error {
|
|
|
|
|
if servicehelper.HasLBFinalizer(service) {
|
|
|
|
|
return nil
|
|
|
|
|
}
|
|
|
|
@@ -822,7 +822,7 @@ func (s *ServiceController) addFinalizer(service *v1.Service) error {
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// removeFinalizer patches the service to remove finalizer.
|
|
|
|
|
func (s *ServiceController) removeFinalizer(service *v1.Service) error {
|
|
|
|
|
func (s *Controller) removeFinalizer(service *v1.Service) error {
|
|
|
|
|
if !servicehelper.HasLBFinalizer(service) {
|
|
|
|
|
return nil
|
|
|
|
|
}
|
|
|
|
@@ -849,11 +849,10 @@ func removeString(slice []string, s string) []string {
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// patchStatus patches the service with the given LoadBalancerStatus.
|
|
|
|
|
func (s *ServiceController) patchStatus(service *v1.Service, previousStatus, newStatus *v1.LoadBalancerStatus) error {
|
|
|
|
|
func (s *Controller) patchStatus(service *v1.Service, previousStatus, newStatus *v1.LoadBalancerStatus) error {
|
|
|
|
|
if servicehelper.LoadBalancerStatusEqual(previousStatus, newStatus) {
|
|
|
|
|
return nil
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Make a copy so we don't mutate the shared informer cache.
|
|
|
|
|
updated := service.DeepCopy()
|
|
|
|
|
updated.Status.LoadBalancer = *newStatus
|