diff --git a/staging/src/k8s.io/cloud-provider/controllers/service/controller.go b/staging/src/k8s.io/cloud-provider/controllers/service/controller.go index 984fe74cbfe..5265c718a46 100644 --- a/staging/src/k8s.io/cloud-provider/controllers/service/controller.go +++ b/staging/src/k8s.io/cloud-provider/controllers/service/controller.go @@ -196,22 +196,22 @@ func New( } // needFullSyncAndUnmark returns the value and needFullSync and marks the field to false. -func (s *Controller) needFullSyncAndUnmark() bool { - s.nodeSyncLock.Lock() - defer s.nodeSyncLock.Unlock() - ret := s.needFullSync - s.needFullSync = false +func (c *Controller) needFullSyncAndUnmark() bool { + c.nodeSyncLock.Lock() + defer c.nodeSyncLock.Unlock() + ret := c.needFullSync + c.needFullSync = false return ret } // obj could be an *v1.Service, or a DeletionFinalStateUnknown marker item. -func (s *Controller) enqueueService(obj interface{}) { +func (c *Controller) enqueueService(obj interface{}) { key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(obj) if err != nil { runtime.HandleError(fmt.Errorf("couldn't get key for object %#v: %v", obj, err)) return } - s.queue.Add(key) + c.queue.Add(key) } // Run starts a background goroutine that watches for changes to services that @@ -224,54 +224,54 @@ func (s *Controller) enqueueService(obj interface{}) { // // It's an error to call Run() more than once for a given ServiceController // object. -func (s *Controller) Run(ctx context.Context, workers int) { +func (c *Controller) Run(ctx context.Context, workers int) { defer runtime.HandleCrash() - defer s.queue.ShutDown() + defer c.queue.ShutDown() // Start event processing pipeline. - s.eventBroadcaster.StartStructuredLogging(0) - s.eventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: s.kubeClient.CoreV1().Events("")}) - defer s.eventBroadcaster.Shutdown() + c.eventBroadcaster.StartStructuredLogging(0) + c.eventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: c.kubeClient.CoreV1().Events("")}) + defer c.eventBroadcaster.Shutdown() klog.Info("Starting service controller") defer klog.Info("Shutting down service controller") - if !cache.WaitForNamedCacheSync("service", ctx.Done(), s.serviceListerSynced, s.nodeListerSynced) { + if !cache.WaitForNamedCacheSync("service", ctx.Done(), c.serviceListerSynced, c.nodeListerSynced) { return } for i := 0; i < workers; i++ { - go wait.UntilWithContext(ctx, s.worker, time.Second) + go wait.UntilWithContext(ctx, c.worker, time.Second) } - go s.nodeSyncLoop(ctx, workers) - go wait.Until(s.triggerNodeSync, nodeSyncPeriod, ctx.Done()) + go c.nodeSyncLoop(ctx, workers) + go wait.Until(c.triggerNodeSync, nodeSyncPeriod, ctx.Done()) <-ctx.Done() } // triggerNodeSync triggers a nodeSync asynchronously -func (s *Controller) triggerNodeSync() { - s.nodeSyncLock.Lock() - defer s.nodeSyncLock.Unlock() - newHosts, err := listWithPredicate(s.nodeLister, s.getNodeConditionPredicate()) +func (c *Controller) triggerNodeSync() { + c.nodeSyncLock.Lock() + defer c.nodeSyncLock.Unlock() + newHosts, err := listWithPredicate(c.nodeLister, c.getNodeConditionPredicate()) if err != nil { runtime.HandleError(fmt.Errorf("Failed to retrieve current set of nodes from node lister: %v", err)) // if node list cannot be retrieve, trigger full node sync to be safe. - s.needFullSync = true - } else if !nodeSlicesEqualForLB(newHosts, s.knownHosts) { + c.needFullSync = true + } else if !nodeSlicesEqualForLB(newHosts, c.knownHosts) { // Here the last known state is recorded as knownHosts. For each // LB update, the latest node list is retrieved. This is to prevent // a stale set of nodes were used to be update loadbalancers when // there are many loadbalancers in the clusters. nodeSyncInternal // would be triggered until all loadbalancers are updated to the new state. klog.V(2).Infof("Node changes detected, triggering a full node sync on all loadbalancer services") - s.needFullSync = true - s.knownHosts = newHosts + c.needFullSync = true + c.knownHosts = newHosts } select { - case s.nodeSyncCh <- struct{}{}: + case c.nodeSyncCh <- struct{}{}: klog.V(4).Info("Triggering nodeSync") return default: @@ -282,82 +282,82 @@ func (s *Controller) triggerNodeSync() { // worker runs a worker thread that just dequeues items, processes them, and marks them done. // It enforces that the syncHandler is never invoked concurrently with the same key. -func (s *Controller) worker(ctx context.Context) { - for s.processNextWorkItem(ctx) { +func (c *Controller) worker(ctx context.Context) { + for c.processNextWorkItem(ctx) { } } // nodeSyncLoop takes nodeSync signal and triggers nodeSync -func (s *Controller) nodeSyncLoop(ctx context.Context, workers int) { +func (c *Controller) nodeSyncLoop(ctx context.Context, workers int) { klog.V(4).Info("nodeSyncLoop Started") for { select { - case <-s.nodeSyncCh: + case <-c.nodeSyncCh: klog.V(4).Info("nodeSync has been triggered") - s.nodeSyncInternal(ctx, workers) + c.nodeSyncInternal(ctx, workers) case <-ctx.Done(): return } } } -func (s *Controller) processNextWorkItem(ctx context.Context) bool { - key, quit := s.queue.Get() +func (c *Controller) processNextWorkItem(ctx context.Context) bool { + key, quit := c.queue.Get() if quit { return false } - defer s.queue.Done(key) + defer c.queue.Done(key) - err := s.syncService(ctx, key.(string)) + err := c.syncService(ctx, key.(string)) if err == nil { - s.queue.Forget(key) + c.queue.Forget(key) return true } runtime.HandleError(fmt.Errorf("error processing service %v (will retry): %v", key, err)) - s.queue.AddRateLimited(key) + c.queue.AddRateLimited(key) return true } -func (s *Controller) init() error { - if s.cloud == nil { +func (c *Controller) init() error { + if c.cloud == nil { return fmt.Errorf("WARNING: no cloud provider provided, services of type LoadBalancer will fail") } - balancer, ok := s.cloud.LoadBalancer() + balancer, ok := c.cloud.LoadBalancer() if !ok { return fmt.Errorf("the cloud provider does not support external load balancers") } - s.balancer = balancer + c.balancer = balancer return nil } // processServiceCreateOrUpdate operates loadbalancers for the incoming service accordingly. // Returns an error if processing the service update failed. -func (s *Controller) processServiceCreateOrUpdate(ctx context.Context, service *v1.Service, key string) error { +func (c *Controller) processServiceCreateOrUpdate(ctx context.Context, service *v1.Service, key string) error { // TODO(@MrHohn): Remove the cache once we get rid of the non-finalizer deletion // path. Ref https://github.com/kubernetes/enhancements/issues/980. - cachedService := s.cache.getOrCreate(key) + cachedService := c.cache.getOrCreate(key) if cachedService.state != nil && cachedService.state.UID != service.UID { // This happens only when a service is deleted and re-created // in a short period, which is only possible when it doesn't // contain finalizer. - if err := s.processLoadBalancerDelete(ctx, cachedService.state, key); err != nil { + if err := c.processLoadBalancerDelete(ctx, cachedService.state, key); err != nil { return err } } // Always cache the service, we need the info for service deletion in case // when load balancer cleanup is not handled via finalizer. cachedService.state = service - op, err := s.syncLoadBalancerIfNeeded(ctx, service, key) + op, err := c.syncLoadBalancerIfNeeded(ctx, service, key) if err != nil { - s.eventRecorder.Eventf(service, v1.EventTypeWarning, "SyncLoadBalancerFailed", "Error syncing load balancer: %v", err) + c.eventRecorder.Eventf(service, v1.EventTypeWarning, "SyncLoadBalancerFailed", "Error syncing load balancer: %v", err) return err } if op == deleteLoadBalancer { // Only delete the cache upon successful load balancer deletion. - s.cache.delete(key) + c.cache.delete(key) } return nil @@ -373,7 +373,7 @@ const ( // syncLoadBalancerIfNeeded ensures that service's status is synced up with loadbalancer // i.e. creates loadbalancer for service if requested and deletes loadbalancer if the service // doesn't want a loadbalancer no more. Returns whatever error occurred. -func (s *Controller) syncLoadBalancerIfNeeded(ctx context.Context, service *v1.Service, key string) (loadBalancerOperation, error) { +func (c *Controller) syncLoadBalancerIfNeeded(ctx context.Context, service *v1.Service, key string) (loadBalancerOperation, error) { // Note: It is safe to just call EnsureLoadBalancer. But, on some clouds that requires a delete & create, // which may involve service interruption. Also, we would like user-friendly events. @@ -387,16 +387,16 @@ func (s *Controller) syncLoadBalancerIfNeeded(ctx context.Context, service *v1.S // Delete the load balancer if service no longer wants one, or if service needs cleanup. op = deleteLoadBalancer newStatus = &v1.LoadBalancerStatus{} - _, exists, err := s.balancer.GetLoadBalancer(ctx, s.clusterName, service) + _, exists, err := c.balancer.GetLoadBalancer(ctx, c.clusterName, service) if err != nil { return op, fmt.Errorf("failed to check if load balancer exists before cleanup: %v", err) } if exists { klog.V(2).Infof("Deleting existing load balancer for service %s", key) - s.eventRecorder.Event(service, v1.EventTypeNormal, "DeletingLoadBalancer", "Deleting load balancer") - if err := s.balancer.EnsureLoadBalancerDeleted(ctx, s.clusterName, service); err != nil { + c.eventRecorder.Event(service, v1.EventTypeNormal, "DeletingLoadBalancer", "Deleting load balancer") + if err := c.balancer.EnsureLoadBalancerDeleted(ctx, c.clusterName, service); err != nil { if err == cloudprovider.ImplementedElsewhere { - klog.V(4).Infof("LoadBalancer for service %s implemented by a different controller %s, Ignoring error on deletion", key, s.cloud.ProviderName()) + klog.V(4).Infof("LoadBalancer for service %s implemented by a different controller %s, Ignoring error on deletion", key, c.cloud.ProviderName()) } else { return op, fmt.Errorf("failed to delete load balancer: %v", err) } @@ -404,27 +404,27 @@ func (s *Controller) syncLoadBalancerIfNeeded(ctx context.Context, service *v1.S } // Always remove finalizer when load balancer is deleted, this ensures Services // can be deleted after all corresponding load balancer resources are deleted. - if err := s.removeFinalizer(service); err != nil { + if err := c.removeFinalizer(service); err != nil { return op, fmt.Errorf("failed to remove load balancer cleanup finalizer: %v", err) } - s.eventRecorder.Event(service, v1.EventTypeNormal, "DeletedLoadBalancer", "Deleted load balancer") + c.eventRecorder.Event(service, v1.EventTypeNormal, "DeletedLoadBalancer", "Deleted load balancer") } else { // Create or update the load balancer if service wants one. op = ensureLoadBalancer klog.V(2).Infof("Ensuring load balancer for service %s", key) - s.eventRecorder.Event(service, v1.EventTypeNormal, "EnsuringLoadBalancer", "Ensuring load balancer") + c.eventRecorder.Event(service, v1.EventTypeNormal, "EnsuringLoadBalancer", "Ensuring load balancer") // Always add a finalizer prior to creating load balancers, this ensures Services // can't be deleted until all corresponding load balancer resources are also deleted. - if err := s.addFinalizer(service); err != nil { + if err := c.addFinalizer(service); err != nil { return op, fmt.Errorf("failed to add load balancer cleanup finalizer: %v", err) } - newStatus, err = s.ensureLoadBalancer(ctx, service) + newStatus, err = c.ensureLoadBalancer(ctx, service) if err != nil { if err == cloudprovider.ImplementedElsewhere { // ImplementedElsewhere indicates that the ensureLoadBalancer is a nop and the // functionality is implemented by a different controller. In this case, we // return immediately without doing anything. - klog.V(4).Infof("LoadBalancer for service %s implemented by a different controller %s, Ignoring error", key, s.cloud.ProviderName()) + klog.V(4).Infof("LoadBalancer for service %s implemented by a different controller %s, Ignoring error", key, c.cloud.ProviderName()) return op, nil } return op, fmt.Errorf("failed to ensure load balancer: %v", err) @@ -433,10 +433,10 @@ func (s *Controller) syncLoadBalancerIfNeeded(ctx context.Context, service *v1.S return op, fmt.Errorf("service status returned by EnsureLoadBalancer is nil") } - s.eventRecorder.Event(service, v1.EventTypeNormal, "EnsuredLoadBalancer", "Ensured load balancer") + c.eventRecorder.Event(service, v1.EventTypeNormal, "EnsuredLoadBalancer", "Ensured load balancer") } - if err := s.patchStatus(service, previousStatus, newStatus); err != nil { + if err := c.patchStatus(service, previousStatus, newStatus); err != nil { // Only retry error that isn't not found: // - Not found error mostly happens when service disappears right after // we remove the finalizer. @@ -449,21 +449,21 @@ func (s *Controller) syncLoadBalancerIfNeeded(ctx context.Context, service *v1.S return op, nil } -func (s *Controller) ensureLoadBalancer(ctx context.Context, service *v1.Service) (*v1.LoadBalancerStatus, error) { - nodes, err := listWithPredicate(s.nodeLister, s.getNodeConditionPredicate()) +func (c *Controller) ensureLoadBalancer(ctx context.Context, service *v1.Service) (*v1.LoadBalancerStatus, error) { + nodes, err := listWithPredicate(c.nodeLister, c.getNodeConditionPredicate()) if err != nil { return nil, err } // If there are no available nodes for LoadBalancer service, make a EventTypeWarning event for it. if len(nodes) == 0 { - s.eventRecorder.Event(service, v1.EventTypeWarning, "UnAvailableLoadBalancer", "There are no available nodes for LoadBalancer") + c.eventRecorder.Event(service, v1.EventTypeWarning, "UnAvailableLoadBalancer", "There are no available nodes for LoadBalancer") } // - Only one protocol supported per service // - Not all cloud providers support all protocols and the next step is expected to return // an error for unsupported protocols - return s.balancer.EnsureLoadBalancer(ctx, s.clusterName, service, nodes) + return c.balancer.EnsureLoadBalancer(ctx, c.clusterName, service, nodes) } // ListKeys implements the interface required by DeltaFIFO to list the keys we @@ -549,18 +549,18 @@ func needsCleanup(service *v1.Service) bool { } // needsUpdate checks if load balancer needs to be updated due to change in attributes. -func (s *Controller) needsUpdate(oldService *v1.Service, newService *v1.Service) bool { +func (c *Controller) needsUpdate(oldService *v1.Service, newService *v1.Service) bool { if !wantsLoadBalancer(oldService) && !wantsLoadBalancer(newService) { return false } if wantsLoadBalancer(oldService) != wantsLoadBalancer(newService) { - s.eventRecorder.Eventf(newService, v1.EventTypeNormal, "Type", "%v -> %v", + c.eventRecorder.Eventf(newService, v1.EventTypeNormal, "Type", "%v -> %v", oldService.Spec.Type, newService.Spec.Type) return true } if wantsLoadBalancer(newService) && !reflect.DeepEqual(oldService.Spec.LoadBalancerSourceRanges, newService.Spec.LoadBalancerSourceRanges) { - s.eventRecorder.Eventf(newService, v1.EventTypeNormal, "LoadBalancerSourceRanges", "%v -> %v", + c.eventRecorder.Eventf(newService, v1.EventTypeNormal, "LoadBalancerSourceRanges", "%v -> %v", oldService.Spec.LoadBalancerSourceRanges, newService.Spec.LoadBalancerSourceRanges) return true } @@ -573,18 +573,18 @@ func (s *Controller) needsUpdate(oldService *v1.Service, newService *v1.Service) return true } if !loadBalancerIPsAreEqual(oldService, newService) { - s.eventRecorder.Eventf(newService, v1.EventTypeNormal, "LoadbalancerIP", "%v -> %v", + c.eventRecorder.Eventf(newService, v1.EventTypeNormal, "LoadbalancerIP", "%v -> %v", oldService.Spec.LoadBalancerIP, newService.Spec.LoadBalancerIP) return true } if len(oldService.Spec.ExternalIPs) != len(newService.Spec.ExternalIPs) { - s.eventRecorder.Eventf(newService, v1.EventTypeNormal, "ExternalIP", "Count: %v -> %v", + c.eventRecorder.Eventf(newService, v1.EventTypeNormal, "ExternalIP", "Count: %v -> %v", len(oldService.Spec.ExternalIPs), len(newService.Spec.ExternalIPs)) return true } for i := range oldService.Spec.ExternalIPs { if oldService.Spec.ExternalIPs[i] != newService.Spec.ExternalIPs[i] { - s.eventRecorder.Eventf(newService, v1.EventTypeNormal, "ExternalIP", "Added: %v", + c.eventRecorder.Eventf(newService, v1.EventTypeNormal, "ExternalIP", "Added: %v", newService.Spec.ExternalIPs[i]) return true } @@ -593,17 +593,17 @@ func (s *Controller) needsUpdate(oldService *v1.Service, newService *v1.Service) return true } if oldService.UID != newService.UID { - s.eventRecorder.Eventf(newService, v1.EventTypeNormal, "UID", "%v -> %v", + c.eventRecorder.Eventf(newService, v1.EventTypeNormal, "UID", "%v -> %v", oldService.UID, newService.UID) return true } if oldService.Spec.ExternalTrafficPolicy != newService.Spec.ExternalTrafficPolicy { - s.eventRecorder.Eventf(newService, v1.EventTypeNormal, "ExternalTrafficPolicy", "%v -> %v", + c.eventRecorder.Eventf(newService, v1.EventTypeNormal, "ExternalTrafficPolicy", "%v -> %v", oldService.Spec.ExternalTrafficPolicy, newService.Spec.ExternalTrafficPolicy) return true } if oldService.Spec.HealthCheckNodePort != newService.Spec.HealthCheckNodePort { - s.eventRecorder.Eventf(newService, v1.EventTypeNormal, "HealthCheckNodePort", "%v -> %v", + c.eventRecorder.Eventf(newService, v1.EventTypeNormal, "HealthCheckNodePort", "%v -> %v", oldService.Spec.HealthCheckNodePort, newService.Spec.HealthCheckNodePort) return true } @@ -679,7 +679,7 @@ func nodeSlicesEqualForLB(x, y []*v1.Node) bool { return nodeNames(x).Equal(nodeNames(y)) } -func (s *Controller) getNodeConditionPredicate() NodeConditionPredicate { +func (c *Controller) getNodeConditionPredicate() NodeConditionPredicate { return func(node *v1.Node) bool { if _, hasExcludeBalancerLabel := node.Labels[v1.LabelNodeExcludeBalancers]; hasExcludeBalancerLabel { return false @@ -735,7 +735,7 @@ func nodeReadyConditionStatus(node *v1.Node) v1.ConditionStatus { // nodeSyncInternal handles updating the hosts pointed to by all load // balancers whenever the set of nodes in the cluster changes. -func (s *Controller) nodeSyncInternal(ctx context.Context, workers int) { +func (c *Controller) nodeSyncInternal(ctx context.Context, workers int) { startTime := time.Now() defer func() { latency := time.Since(startTime).Seconds() @@ -743,14 +743,14 @@ func (s *Controller) nodeSyncInternal(ctx context.Context, workers int) { nodeSyncLatency.Observe(latency) }() - if !s.needFullSyncAndUnmark() { + if !c.needFullSyncAndUnmark() { // The set of nodes in the cluster hasn't changed, but we can retry // updating any services that we failed to update last time around. - // It is required to call `s.cache.get()` on each Service in case there was + // It is required to call `c.cache.get()` on each Service in case there was // an update event that occurred between retries. var servicesToUpdate []*v1.Service - for key := range s.servicesToUpdate { - cachedService, exist := s.cache.get(key) + for key := range c.servicesToUpdate { + cachedService, exist := c.cache.get(key) if !exist { klog.Errorf("Service %q should be in the cache but not", key) continue @@ -758,33 +758,33 @@ func (s *Controller) nodeSyncInternal(ctx context.Context, workers int) { servicesToUpdate = append(servicesToUpdate, cachedService.state) } - s.servicesToUpdate = s.updateLoadBalancerHosts(ctx, servicesToUpdate, workers) + c.servicesToUpdate = c.updateLoadBalancerHosts(ctx, servicesToUpdate, workers) return } klog.V(2).Infof("Syncing backends for all LB services.") // Try updating all services, and save the failed ones to try again next // round. - servicesToUpdate := s.cache.allServices() + servicesToUpdate := c.cache.allServices() numServices := len(servicesToUpdate) - s.servicesToUpdate = s.updateLoadBalancerHosts(ctx, servicesToUpdate, workers) + c.servicesToUpdate = c.updateLoadBalancerHosts(ctx, servicesToUpdate, workers) klog.V(2).Infof("Successfully updated %d out of %d load balancers to direct traffic to the updated set of nodes", - numServices-len(s.servicesToUpdate), numServices) + numServices-len(c.servicesToUpdate), numServices) } // nodeSyncService syncs the nodes for one load balancer type service -func (s *Controller) nodeSyncService(svc *v1.Service) bool { +func (c *Controller) nodeSyncService(svc *v1.Service) bool { if svc == nil || !wantsLoadBalancer(svc) { return false } klog.V(4).Infof("nodeSyncService started for service %s/%s", svc.Namespace, svc.Name) - hosts, err := listWithPredicate(s.nodeLister, s.getNodeConditionPredicate()) + hosts, err := listWithPredicate(c.nodeLister, c.getNodeConditionPredicate()) if err != nil { runtime.HandleError(fmt.Errorf("failed to retrieve node list: %v", err)) return true } - if err := s.lockedUpdateLoadBalancerHosts(svc, hosts); err != nil { + if err := c.lockedUpdateLoadBalancerHosts(svc, hosts); err != nil { runtime.HandleError(fmt.Errorf("failed to update load balancer hosts for service %s/%s: %v", svc.Namespace, svc.Name, err)) return true } @@ -795,14 +795,14 @@ func (s *Controller) nodeSyncService(svc *v1.Service) bool { // updateLoadBalancerHosts updates all existing load balancers so that // they will match the latest list of nodes with input number of workers. // Returns the list of services that couldn't be updated. -func (s *Controller) updateLoadBalancerHosts(ctx context.Context, services []*v1.Service, workers int) (servicesToRetry sets.String) { +func (c *Controller) updateLoadBalancerHosts(ctx context.Context, services []*v1.Service, workers int) (servicesToRetry sets.String) { klog.V(4).Infof("Running updateLoadBalancerHosts(len(services)==%d, workers==%d)", len(services), workers) // lock for servicesToRetry servicesToRetry = sets.NewString() lock := sync.Mutex{} doWork := func(piece int) { - if shouldRetry := s.nodeSyncService(services[piece]); !shouldRetry { + if shouldRetry := c.nodeSyncService(services[piece]); !shouldRetry { return } lock.Lock() @@ -818,7 +818,7 @@ func (s *Controller) updateLoadBalancerHosts(ctx context.Context, services []*v1 // Updates the load balancer of a service, assuming we hold the mutex // associated with the service. -func (s *Controller) lockedUpdateLoadBalancerHosts(service *v1.Service, hosts []*v1.Node) error { +func (c *Controller) lockedUpdateLoadBalancerHosts(service *v1.Service, hosts []*v1.Node) error { startTime := time.Now() defer func() { latency := time.Since(startTime).Seconds() @@ -828,13 +828,13 @@ func (s *Controller) lockedUpdateLoadBalancerHosts(service *v1.Service, hosts [] klog.V(2).Infof("Updating backends for load balancer %s/%s with node set: %v", service.Namespace, service.Name, nodeNames(hosts)) // This operation doesn't normally take very long (and happens pretty often), so we only record the final event - err := s.balancer.UpdateLoadBalancer(context.TODO(), s.clusterName, service, hosts) + err := c.balancer.UpdateLoadBalancer(context.TODO(), c.clusterName, service, hosts) if err == nil { // If there are no available nodes for LoadBalancer service, make a EventTypeWarning event for it. if len(hosts) == 0 { - s.eventRecorder.Event(service, v1.EventTypeWarning, "UnAvailableLoadBalancer", "There are no available nodes for LoadBalancer") + c.eventRecorder.Event(service, v1.EventTypeWarning, "UnAvailableLoadBalancer", "There are no available nodes for LoadBalancer") } else { - s.eventRecorder.Event(service, v1.EventTypeNormal, "UpdatedLoadBalancer", "Updated load balancer with new hosts") + c.eventRecorder.Event(service, v1.EventTypeNormal, "UpdatedLoadBalancer", "Updated load balancer with new hosts") } return nil } @@ -845,13 +845,13 @@ func (s *Controller) lockedUpdateLoadBalancerHosts(service *v1.Service, hosts [] return nil } // It's only an actual error if the load balancer still exists. - if _, exists, err := s.balancer.GetLoadBalancer(context.TODO(), s.clusterName, service); err != nil { + if _, exists, err := c.balancer.GetLoadBalancer(context.TODO(), c.clusterName, service); err != nil { runtime.HandleError(fmt.Errorf("failed to check if load balancer exists for service %s/%s: %v", service.Namespace, service.Name, err)) } else if !exists { return nil } - s.eventRecorder.Eventf(service, v1.EventTypeWarning, "UpdateLoadBalancerFailed", "Error updating load balancer with new hosts %v: %v", nodeNames(hosts), err) + c.eventRecorder.Eventf(service, v1.EventTypeWarning, "UpdateLoadBalancerFailed", "Error updating load balancer with new hosts %v: %v", nodeNames(hosts), err) return err } @@ -867,7 +867,7 @@ func loadBalancerIPsAreEqual(oldService, newService *v1.Service) bool { // syncService will sync the Service with the given key if it has had its expectations fulfilled, // meaning it did not expect to see any more of its pods created or deleted. This function is not meant to be // invoked concurrently with the same key. -func (s *Controller) syncService(ctx context.Context, key string) error { +func (c *Controller) syncService(ctx context.Context, key string) error { startTime := time.Now() defer func() { klog.V(4).Infof("Finished syncing service %q (%v)", key, time.Since(startTime)) @@ -879,25 +879,25 @@ func (s *Controller) syncService(ctx context.Context, key string) error { } // service holds the latest service info from apiserver - service, err := s.serviceLister.Services(namespace).Get(name) + service, err := c.serviceLister.Services(namespace).Get(name) switch { case errors.IsNotFound(err): // service absence in store means watcher caught the deletion, ensure LB info is cleaned - err = s.processServiceDeletion(ctx, key) + err = c.processServiceDeletion(ctx, key) case err != nil: runtime.HandleError(fmt.Errorf("Unable to retrieve service %v from store: %v", key, err)) default: // It is not safe to modify an object returned from an informer. // As reconcilers may modify the service object we need to copy // it first. - err = s.processServiceCreateOrUpdate(ctx, service.DeepCopy(), key) + err = c.processServiceCreateOrUpdate(ctx, service.DeepCopy(), key) } return err } -func (s *Controller) processServiceDeletion(ctx context.Context, key string) error { - cachedService, ok := s.cache.get(key) +func (c *Controller) processServiceDeletion(ctx context.Context, key string) error { + cachedService, ok := c.cache.get(key) if !ok { // Cache does not contains the key means: // - We didn't create a Load Balancer for the deleted service at all. @@ -906,29 +906,29 @@ func (s *Controller) processServiceDeletion(ctx context.Context, key string) err return nil } klog.V(2).Infof("Service %v has been deleted. Attempting to cleanup load balancer resources", key) - if err := s.processLoadBalancerDelete(ctx, cachedService.state, key); err != nil { + if err := c.processLoadBalancerDelete(ctx, cachedService.state, key); err != nil { return err } - s.cache.delete(key) + c.cache.delete(key) return nil } -func (s *Controller) processLoadBalancerDelete(ctx context.Context, service *v1.Service, key string) error { +func (c *Controller) processLoadBalancerDelete(ctx context.Context, service *v1.Service, key string) error { // delete load balancer info only if the service type is LoadBalancer if !wantsLoadBalancer(service) { return nil } - s.eventRecorder.Event(service, v1.EventTypeNormal, "DeletingLoadBalancer", "Deleting load balancer") - if err := s.balancer.EnsureLoadBalancerDeleted(ctx, s.clusterName, service); err != nil { - s.eventRecorder.Eventf(service, v1.EventTypeWarning, "DeleteLoadBalancerFailed", "Error deleting load balancer: %v", err) + c.eventRecorder.Event(service, v1.EventTypeNormal, "DeletingLoadBalancer", "Deleting load balancer") + if err := c.balancer.EnsureLoadBalancerDeleted(ctx, c.clusterName, service); err != nil { + c.eventRecorder.Eventf(service, v1.EventTypeWarning, "DeleteLoadBalancerFailed", "Error deleting load balancer: %v", err) return err } - s.eventRecorder.Event(service, v1.EventTypeNormal, "DeletedLoadBalancer", "Deleted load balancer") + c.eventRecorder.Event(service, v1.EventTypeNormal, "DeletedLoadBalancer", "Deleted load balancer") return nil } // addFinalizer patches the service to add finalizer. -func (s *Controller) addFinalizer(service *v1.Service) error { +func (c *Controller) addFinalizer(service *v1.Service) error { if servicehelper.HasLBFinalizer(service) { return nil } @@ -938,12 +938,12 @@ func (s *Controller) addFinalizer(service *v1.Service) error { updated.ObjectMeta.Finalizers = append(updated.ObjectMeta.Finalizers, servicehelper.LoadBalancerCleanupFinalizer) klog.V(2).Infof("Adding finalizer to service %s/%s", updated.Namespace, updated.Name) - _, err := servicehelper.PatchService(s.kubeClient.CoreV1(), service, updated) + _, err := servicehelper.PatchService(c.kubeClient.CoreV1(), service, updated) return err } // removeFinalizer patches the service to remove finalizer. -func (s *Controller) removeFinalizer(service *v1.Service) error { +func (c *Controller) removeFinalizer(service *v1.Service) error { if !servicehelper.HasLBFinalizer(service) { return nil } @@ -953,7 +953,7 @@ func (s *Controller) removeFinalizer(service *v1.Service) error { updated.ObjectMeta.Finalizers = removeString(updated.ObjectMeta.Finalizers, servicehelper.LoadBalancerCleanupFinalizer) klog.V(2).Infof("Removing finalizer from service %s/%s", updated.Namespace, updated.Name) - _, err := servicehelper.PatchService(s.kubeClient.CoreV1(), service, updated) + _, err := servicehelper.PatchService(c.kubeClient.CoreV1(), service, updated) return err } @@ -970,7 +970,7 @@ func removeString(slice []string, s string) []string { } // patchStatus patches the service with the given LoadBalancerStatus. -func (s *Controller) patchStatus(service *v1.Service, previousStatus, newStatus *v1.LoadBalancerStatus) error { +func (c *Controller) patchStatus(service *v1.Service, previousStatus, newStatus *v1.LoadBalancerStatus) error { if servicehelper.LoadBalancerStatusEqual(previousStatus, newStatus) { return nil } @@ -979,7 +979,7 @@ func (s *Controller) patchStatus(service *v1.Service, previousStatus, newStatus updated.Status.LoadBalancer = *newStatus klog.V(2).Infof("Patching status for service %s/%s", updated.Namespace, updated.Name) - _, err := servicehelper.PatchService(s.kubeClient.CoreV1(), service, updated) + _, err := servicehelper.PatchService(c.kubeClient.CoreV1(), service, updated) return err }