CCM [Service controller]: align function pointer variable with struct name

This commit is contained in:
Alexander Constantinescu 2022-07-29 13:22:42 +02:00
parent 95303390ac
commit dd9623cbea

View File

@ -196,22 +196,22 @@ func New(
} }
// needFullSyncAndUnmark returns the value and needFullSync and marks the field to false. // needFullSyncAndUnmark returns the value and needFullSync and marks the field to false.
func (s *Controller) needFullSyncAndUnmark() bool { func (c *Controller) needFullSyncAndUnmark() bool {
s.nodeSyncLock.Lock() c.nodeSyncLock.Lock()
defer s.nodeSyncLock.Unlock() defer c.nodeSyncLock.Unlock()
ret := s.needFullSync ret := c.needFullSync
s.needFullSync = false c.needFullSync = false
return ret return ret
} }
// obj could be an *v1.Service, or a DeletionFinalStateUnknown marker item. // obj could be an *v1.Service, or a DeletionFinalStateUnknown marker item.
func (s *Controller) enqueueService(obj interface{}) { func (c *Controller) enqueueService(obj interface{}) {
key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(obj) key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(obj)
if err != nil { if err != nil {
runtime.HandleError(fmt.Errorf("couldn't get key for object %#v: %v", obj, err)) runtime.HandleError(fmt.Errorf("couldn't get key for object %#v: %v", obj, err))
return return
} }
s.queue.Add(key) c.queue.Add(key)
} }
// Run starts a background goroutine that watches for changes to services that // Run starts a background goroutine that watches for changes to services that
@ -224,54 +224,54 @@ func (s *Controller) enqueueService(obj interface{}) {
// //
// It's an error to call Run() more than once for a given ServiceController // It's an error to call Run() more than once for a given ServiceController
// object. // object.
func (s *Controller) Run(ctx context.Context, workers int) { func (c *Controller) Run(ctx context.Context, workers int) {
defer runtime.HandleCrash() defer runtime.HandleCrash()
defer s.queue.ShutDown() defer c.queue.ShutDown()
// Start event processing pipeline. // Start event processing pipeline.
s.eventBroadcaster.StartStructuredLogging(0) c.eventBroadcaster.StartStructuredLogging(0)
s.eventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: s.kubeClient.CoreV1().Events("")}) c.eventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: c.kubeClient.CoreV1().Events("")})
defer s.eventBroadcaster.Shutdown() defer c.eventBroadcaster.Shutdown()
klog.Info("Starting service controller") klog.Info("Starting service controller")
defer klog.Info("Shutting down service controller") defer klog.Info("Shutting down service controller")
if !cache.WaitForNamedCacheSync("service", ctx.Done(), s.serviceListerSynced, s.nodeListerSynced) { if !cache.WaitForNamedCacheSync("service", ctx.Done(), c.serviceListerSynced, c.nodeListerSynced) {
return return
} }
for i := 0; i < workers; i++ { for i := 0; i < workers; i++ {
go wait.UntilWithContext(ctx, s.worker, time.Second) go wait.UntilWithContext(ctx, c.worker, time.Second)
} }
go s.nodeSyncLoop(ctx, workers) go c.nodeSyncLoop(ctx, workers)
go wait.Until(s.triggerNodeSync, nodeSyncPeriod, ctx.Done()) go wait.Until(c.triggerNodeSync, nodeSyncPeriod, ctx.Done())
<-ctx.Done() <-ctx.Done()
} }
// triggerNodeSync triggers a nodeSync asynchronously // triggerNodeSync triggers a nodeSync asynchronously
func (s *Controller) triggerNodeSync() { func (c *Controller) triggerNodeSync() {
s.nodeSyncLock.Lock() c.nodeSyncLock.Lock()
defer s.nodeSyncLock.Unlock() defer c.nodeSyncLock.Unlock()
newHosts, err := listWithPredicate(s.nodeLister, s.getNodeConditionPredicate()) newHosts, err := listWithPredicate(c.nodeLister, c.getNodeConditionPredicate())
if err != nil { if err != nil {
runtime.HandleError(fmt.Errorf("Failed to retrieve current set of nodes from node lister: %v", err)) runtime.HandleError(fmt.Errorf("Failed to retrieve current set of nodes from node lister: %v", err))
// if node list cannot be retrieve, trigger full node sync to be safe. // if node list cannot be retrieve, trigger full node sync to be safe.
s.needFullSync = true c.needFullSync = true
} else if !nodeSlicesEqualForLB(newHosts, s.knownHosts) { } else if !nodeSlicesEqualForLB(newHosts, c.knownHosts) {
// Here the last known state is recorded as knownHosts. For each // Here the last known state is recorded as knownHosts. For each
// LB update, the latest node list is retrieved. This is to prevent // LB update, the latest node list is retrieved. This is to prevent
// a stale set of nodes were used to be update loadbalancers when // a stale set of nodes were used to be update loadbalancers when
// there are many loadbalancers in the clusters. nodeSyncInternal // there are many loadbalancers in the clusters. nodeSyncInternal
// would be triggered until all loadbalancers are updated to the new state. // would be triggered until all loadbalancers are updated to the new state.
klog.V(2).Infof("Node changes detected, triggering a full node sync on all loadbalancer services") klog.V(2).Infof("Node changes detected, triggering a full node sync on all loadbalancer services")
s.needFullSync = true c.needFullSync = true
s.knownHosts = newHosts c.knownHosts = newHosts
} }
select { select {
case s.nodeSyncCh <- struct{}{}: case c.nodeSyncCh <- struct{}{}:
klog.V(4).Info("Triggering nodeSync") klog.V(4).Info("Triggering nodeSync")
return return
default: default:
@ -282,82 +282,82 @@ func (s *Controller) triggerNodeSync() {
// worker runs a worker thread that just dequeues items, processes them, and marks them done. // worker runs a worker thread that just dequeues items, processes them, and marks them done.
// It enforces that the syncHandler is never invoked concurrently with the same key. // It enforces that the syncHandler is never invoked concurrently with the same key.
func (s *Controller) worker(ctx context.Context) { func (c *Controller) worker(ctx context.Context) {
for s.processNextWorkItem(ctx) { for c.processNextWorkItem(ctx) {
} }
} }
// nodeSyncLoop takes nodeSync signal and triggers nodeSync // nodeSyncLoop takes nodeSync signal and triggers nodeSync
func (s *Controller) nodeSyncLoop(ctx context.Context, workers int) { func (c *Controller) nodeSyncLoop(ctx context.Context, workers int) {
klog.V(4).Info("nodeSyncLoop Started") klog.V(4).Info("nodeSyncLoop Started")
for { for {
select { select {
case <-s.nodeSyncCh: case <-c.nodeSyncCh:
klog.V(4).Info("nodeSync has been triggered") klog.V(4).Info("nodeSync has been triggered")
s.nodeSyncInternal(ctx, workers) c.nodeSyncInternal(ctx, workers)
case <-ctx.Done(): case <-ctx.Done():
return return
} }
} }
} }
func (s *Controller) processNextWorkItem(ctx context.Context) bool { func (c *Controller) processNextWorkItem(ctx context.Context) bool {
key, quit := s.queue.Get() key, quit := c.queue.Get()
if quit { if quit {
return false return false
} }
defer s.queue.Done(key) defer c.queue.Done(key)
err := s.syncService(ctx, key.(string)) err := c.syncService(ctx, key.(string))
if err == nil { if err == nil {
s.queue.Forget(key) c.queue.Forget(key)
return true return true
} }
runtime.HandleError(fmt.Errorf("error processing service %v (will retry): %v", key, err)) runtime.HandleError(fmt.Errorf("error processing service %v (will retry): %v", key, err))
s.queue.AddRateLimited(key) c.queue.AddRateLimited(key)
return true return true
} }
func (s *Controller) init() error { func (c *Controller) init() error {
if s.cloud == nil { if c.cloud == nil {
return fmt.Errorf("WARNING: no cloud provider provided, services of type LoadBalancer will fail") return fmt.Errorf("WARNING: no cloud provider provided, services of type LoadBalancer will fail")
} }
balancer, ok := s.cloud.LoadBalancer() balancer, ok := c.cloud.LoadBalancer()
if !ok { if !ok {
return fmt.Errorf("the cloud provider does not support external load balancers") return fmt.Errorf("the cloud provider does not support external load balancers")
} }
s.balancer = balancer c.balancer = balancer
return nil return nil
} }
// processServiceCreateOrUpdate operates loadbalancers for the incoming service accordingly. // processServiceCreateOrUpdate operates loadbalancers for the incoming service accordingly.
// Returns an error if processing the service update failed. // Returns an error if processing the service update failed.
func (s *Controller) processServiceCreateOrUpdate(ctx context.Context, service *v1.Service, key string) error { func (c *Controller) processServiceCreateOrUpdate(ctx context.Context, service *v1.Service, key string) error {
// TODO(@MrHohn): Remove the cache once we get rid of the non-finalizer deletion // TODO(@MrHohn): Remove the cache once we get rid of the non-finalizer deletion
// path. Ref https://github.com/kubernetes/enhancements/issues/980. // path. Ref https://github.com/kubernetes/enhancements/issues/980.
cachedService := s.cache.getOrCreate(key) cachedService := c.cache.getOrCreate(key)
if cachedService.state != nil && cachedService.state.UID != service.UID { if cachedService.state != nil && cachedService.state.UID != service.UID {
// This happens only when a service is deleted and re-created // This happens only when a service is deleted and re-created
// in a short period, which is only possible when it doesn't // in a short period, which is only possible when it doesn't
// contain finalizer. // contain finalizer.
if err := s.processLoadBalancerDelete(ctx, cachedService.state, key); err != nil { if err := c.processLoadBalancerDelete(ctx, cachedService.state, key); err != nil {
return err return err
} }
} }
// Always cache the service, we need the info for service deletion in case // Always cache the service, we need the info for service deletion in case
// when load balancer cleanup is not handled via finalizer. // when load balancer cleanup is not handled via finalizer.
cachedService.state = service cachedService.state = service
op, err := s.syncLoadBalancerIfNeeded(ctx, service, key) op, err := c.syncLoadBalancerIfNeeded(ctx, service, key)
if err != nil { if err != nil {
s.eventRecorder.Eventf(service, v1.EventTypeWarning, "SyncLoadBalancerFailed", "Error syncing load balancer: %v", err) c.eventRecorder.Eventf(service, v1.EventTypeWarning, "SyncLoadBalancerFailed", "Error syncing load balancer: %v", err)
return err return err
} }
if op == deleteLoadBalancer { if op == deleteLoadBalancer {
// Only delete the cache upon successful load balancer deletion. // Only delete the cache upon successful load balancer deletion.
s.cache.delete(key) c.cache.delete(key)
} }
return nil return nil
@ -373,7 +373,7 @@ const (
// syncLoadBalancerIfNeeded ensures that service's status is synced up with loadbalancer // syncLoadBalancerIfNeeded ensures that service's status is synced up with loadbalancer
// i.e. creates loadbalancer for service if requested and deletes loadbalancer if the service // i.e. creates loadbalancer for service if requested and deletes loadbalancer if the service
// doesn't want a loadbalancer no more. Returns whatever error occurred. // doesn't want a loadbalancer no more. Returns whatever error occurred.
func (s *Controller) syncLoadBalancerIfNeeded(ctx context.Context, service *v1.Service, key string) (loadBalancerOperation, error) { func (c *Controller) syncLoadBalancerIfNeeded(ctx context.Context, service *v1.Service, key string) (loadBalancerOperation, error) {
// Note: It is safe to just call EnsureLoadBalancer. But, on some clouds that requires a delete & create, // Note: It is safe to just call EnsureLoadBalancer. But, on some clouds that requires a delete & create,
// which may involve service interruption. Also, we would like user-friendly events. // which may involve service interruption. Also, we would like user-friendly events.
@ -387,16 +387,16 @@ func (s *Controller) syncLoadBalancerIfNeeded(ctx context.Context, service *v1.S
// Delete the load balancer if service no longer wants one, or if service needs cleanup. // Delete the load balancer if service no longer wants one, or if service needs cleanup.
op = deleteLoadBalancer op = deleteLoadBalancer
newStatus = &v1.LoadBalancerStatus{} newStatus = &v1.LoadBalancerStatus{}
_, exists, err := s.balancer.GetLoadBalancer(ctx, s.clusterName, service) _, exists, err := c.balancer.GetLoadBalancer(ctx, c.clusterName, service)
if err != nil { if err != nil {
return op, fmt.Errorf("failed to check if load balancer exists before cleanup: %v", err) return op, fmt.Errorf("failed to check if load balancer exists before cleanup: %v", err)
} }
if exists { if exists {
klog.V(2).Infof("Deleting existing load balancer for service %s", key) klog.V(2).Infof("Deleting existing load balancer for service %s", key)
s.eventRecorder.Event(service, v1.EventTypeNormal, "DeletingLoadBalancer", "Deleting load balancer") c.eventRecorder.Event(service, v1.EventTypeNormal, "DeletingLoadBalancer", "Deleting load balancer")
if err := s.balancer.EnsureLoadBalancerDeleted(ctx, s.clusterName, service); err != nil { if err := c.balancer.EnsureLoadBalancerDeleted(ctx, c.clusterName, service); err != nil {
if err == cloudprovider.ImplementedElsewhere { if err == cloudprovider.ImplementedElsewhere {
klog.V(4).Infof("LoadBalancer for service %s implemented by a different controller %s, Ignoring error on deletion", key, s.cloud.ProviderName()) klog.V(4).Infof("LoadBalancer for service %s implemented by a different controller %s, Ignoring error on deletion", key, c.cloud.ProviderName())
} else { } else {
return op, fmt.Errorf("failed to delete load balancer: %v", err) return op, fmt.Errorf("failed to delete load balancer: %v", err)
} }
@ -404,27 +404,27 @@ func (s *Controller) syncLoadBalancerIfNeeded(ctx context.Context, service *v1.S
} }
// Always remove finalizer when load balancer is deleted, this ensures Services // Always remove finalizer when load balancer is deleted, this ensures Services
// can be deleted after all corresponding load balancer resources are deleted. // can be deleted after all corresponding load balancer resources are deleted.
if err := s.removeFinalizer(service); err != nil { if err := c.removeFinalizer(service); err != nil {
return op, fmt.Errorf("failed to remove load balancer cleanup finalizer: %v", err) return op, fmt.Errorf("failed to remove load balancer cleanup finalizer: %v", err)
} }
s.eventRecorder.Event(service, v1.EventTypeNormal, "DeletedLoadBalancer", "Deleted load balancer") c.eventRecorder.Event(service, v1.EventTypeNormal, "DeletedLoadBalancer", "Deleted load balancer")
} else { } else {
// Create or update the load balancer if service wants one. // Create or update the load balancer if service wants one.
op = ensureLoadBalancer op = ensureLoadBalancer
klog.V(2).Infof("Ensuring load balancer for service %s", key) klog.V(2).Infof("Ensuring load balancer for service %s", key)
s.eventRecorder.Event(service, v1.EventTypeNormal, "EnsuringLoadBalancer", "Ensuring load balancer") c.eventRecorder.Event(service, v1.EventTypeNormal, "EnsuringLoadBalancer", "Ensuring load balancer")
// Always add a finalizer prior to creating load balancers, this ensures Services // Always add a finalizer prior to creating load balancers, this ensures Services
// can't be deleted until all corresponding load balancer resources are also deleted. // can't be deleted until all corresponding load balancer resources are also deleted.
if err := s.addFinalizer(service); err != nil { if err := c.addFinalizer(service); err != nil {
return op, fmt.Errorf("failed to add load balancer cleanup finalizer: %v", err) return op, fmt.Errorf("failed to add load balancer cleanup finalizer: %v", err)
} }
newStatus, err = s.ensureLoadBalancer(ctx, service) newStatus, err = c.ensureLoadBalancer(ctx, service)
if err != nil { if err != nil {
if err == cloudprovider.ImplementedElsewhere { if err == cloudprovider.ImplementedElsewhere {
// ImplementedElsewhere indicates that the ensureLoadBalancer is a nop and the // ImplementedElsewhere indicates that the ensureLoadBalancer is a nop and the
// functionality is implemented by a different controller. In this case, we // functionality is implemented by a different controller. In this case, we
// return immediately without doing anything. // return immediately without doing anything.
klog.V(4).Infof("LoadBalancer for service %s implemented by a different controller %s, Ignoring error", key, s.cloud.ProviderName()) klog.V(4).Infof("LoadBalancer for service %s implemented by a different controller %s, Ignoring error", key, c.cloud.ProviderName())
return op, nil return op, nil
} }
return op, fmt.Errorf("failed to ensure load balancer: %v", err) return op, fmt.Errorf("failed to ensure load balancer: %v", err)
@ -433,10 +433,10 @@ func (s *Controller) syncLoadBalancerIfNeeded(ctx context.Context, service *v1.S
return op, fmt.Errorf("service status returned by EnsureLoadBalancer is nil") return op, fmt.Errorf("service status returned by EnsureLoadBalancer is nil")
} }
s.eventRecorder.Event(service, v1.EventTypeNormal, "EnsuredLoadBalancer", "Ensured load balancer") c.eventRecorder.Event(service, v1.EventTypeNormal, "EnsuredLoadBalancer", "Ensured load balancer")
} }
if err := s.patchStatus(service, previousStatus, newStatus); err != nil { if err := c.patchStatus(service, previousStatus, newStatus); err != nil {
// Only retry error that isn't not found: // Only retry error that isn't not found:
// - Not found error mostly happens when service disappears right after // - Not found error mostly happens when service disappears right after
// we remove the finalizer. // we remove the finalizer.
@ -449,21 +449,21 @@ func (s *Controller) syncLoadBalancerIfNeeded(ctx context.Context, service *v1.S
return op, nil return op, nil
} }
func (s *Controller) ensureLoadBalancer(ctx context.Context, service *v1.Service) (*v1.LoadBalancerStatus, error) { func (c *Controller) ensureLoadBalancer(ctx context.Context, service *v1.Service) (*v1.LoadBalancerStatus, error) {
nodes, err := listWithPredicate(s.nodeLister, s.getNodeConditionPredicate()) nodes, err := listWithPredicate(c.nodeLister, c.getNodeConditionPredicate())
if err != nil { if err != nil {
return nil, err return nil, err
} }
// If there are no available nodes for LoadBalancer service, make a EventTypeWarning event for it. // If there are no available nodes for LoadBalancer service, make a EventTypeWarning event for it.
if len(nodes) == 0 { if len(nodes) == 0 {
s.eventRecorder.Event(service, v1.EventTypeWarning, "UnAvailableLoadBalancer", "There are no available nodes for LoadBalancer") c.eventRecorder.Event(service, v1.EventTypeWarning, "UnAvailableLoadBalancer", "There are no available nodes for LoadBalancer")
} }
// - Only one protocol supported per service // - Only one protocol supported per service
// - Not all cloud providers support all protocols and the next step is expected to return // - Not all cloud providers support all protocols and the next step is expected to return
// an error for unsupported protocols // an error for unsupported protocols
return s.balancer.EnsureLoadBalancer(ctx, s.clusterName, service, nodes) return c.balancer.EnsureLoadBalancer(ctx, c.clusterName, service, nodes)
} }
// ListKeys implements the interface required by DeltaFIFO to list the keys we // ListKeys implements the interface required by DeltaFIFO to list the keys we
@ -549,18 +549,18 @@ func needsCleanup(service *v1.Service) bool {
} }
// needsUpdate checks if load balancer needs to be updated due to change in attributes. // needsUpdate checks if load balancer needs to be updated due to change in attributes.
func (s *Controller) needsUpdate(oldService *v1.Service, newService *v1.Service) bool { func (c *Controller) needsUpdate(oldService *v1.Service, newService *v1.Service) bool {
if !wantsLoadBalancer(oldService) && !wantsLoadBalancer(newService) { if !wantsLoadBalancer(oldService) && !wantsLoadBalancer(newService) {
return false return false
} }
if wantsLoadBalancer(oldService) != wantsLoadBalancer(newService) { if wantsLoadBalancer(oldService) != wantsLoadBalancer(newService) {
s.eventRecorder.Eventf(newService, v1.EventTypeNormal, "Type", "%v -> %v", c.eventRecorder.Eventf(newService, v1.EventTypeNormal, "Type", "%v -> %v",
oldService.Spec.Type, newService.Spec.Type) oldService.Spec.Type, newService.Spec.Type)
return true return true
} }
if wantsLoadBalancer(newService) && !reflect.DeepEqual(oldService.Spec.LoadBalancerSourceRanges, newService.Spec.LoadBalancerSourceRanges) { if wantsLoadBalancer(newService) && !reflect.DeepEqual(oldService.Spec.LoadBalancerSourceRanges, newService.Spec.LoadBalancerSourceRanges) {
s.eventRecorder.Eventf(newService, v1.EventTypeNormal, "LoadBalancerSourceRanges", "%v -> %v", c.eventRecorder.Eventf(newService, v1.EventTypeNormal, "LoadBalancerSourceRanges", "%v -> %v",
oldService.Spec.LoadBalancerSourceRanges, newService.Spec.LoadBalancerSourceRanges) oldService.Spec.LoadBalancerSourceRanges, newService.Spec.LoadBalancerSourceRanges)
return true return true
} }
@ -573,18 +573,18 @@ func (s *Controller) needsUpdate(oldService *v1.Service, newService *v1.Service)
return true return true
} }
if !loadBalancerIPsAreEqual(oldService, newService) { if !loadBalancerIPsAreEqual(oldService, newService) {
s.eventRecorder.Eventf(newService, v1.EventTypeNormal, "LoadbalancerIP", "%v -> %v", c.eventRecorder.Eventf(newService, v1.EventTypeNormal, "LoadbalancerIP", "%v -> %v",
oldService.Spec.LoadBalancerIP, newService.Spec.LoadBalancerIP) oldService.Spec.LoadBalancerIP, newService.Spec.LoadBalancerIP)
return true return true
} }
if len(oldService.Spec.ExternalIPs) != len(newService.Spec.ExternalIPs) { if len(oldService.Spec.ExternalIPs) != len(newService.Spec.ExternalIPs) {
s.eventRecorder.Eventf(newService, v1.EventTypeNormal, "ExternalIP", "Count: %v -> %v", c.eventRecorder.Eventf(newService, v1.EventTypeNormal, "ExternalIP", "Count: %v -> %v",
len(oldService.Spec.ExternalIPs), len(newService.Spec.ExternalIPs)) len(oldService.Spec.ExternalIPs), len(newService.Spec.ExternalIPs))
return true return true
} }
for i := range oldService.Spec.ExternalIPs { for i := range oldService.Spec.ExternalIPs {
if oldService.Spec.ExternalIPs[i] != newService.Spec.ExternalIPs[i] { if oldService.Spec.ExternalIPs[i] != newService.Spec.ExternalIPs[i] {
s.eventRecorder.Eventf(newService, v1.EventTypeNormal, "ExternalIP", "Added: %v", c.eventRecorder.Eventf(newService, v1.EventTypeNormal, "ExternalIP", "Added: %v",
newService.Spec.ExternalIPs[i]) newService.Spec.ExternalIPs[i])
return true return true
} }
@ -593,17 +593,17 @@ func (s *Controller) needsUpdate(oldService *v1.Service, newService *v1.Service)
return true return true
} }
if oldService.UID != newService.UID { if oldService.UID != newService.UID {
s.eventRecorder.Eventf(newService, v1.EventTypeNormal, "UID", "%v -> %v", c.eventRecorder.Eventf(newService, v1.EventTypeNormal, "UID", "%v -> %v",
oldService.UID, newService.UID) oldService.UID, newService.UID)
return true return true
} }
if oldService.Spec.ExternalTrafficPolicy != newService.Spec.ExternalTrafficPolicy { if oldService.Spec.ExternalTrafficPolicy != newService.Spec.ExternalTrafficPolicy {
s.eventRecorder.Eventf(newService, v1.EventTypeNormal, "ExternalTrafficPolicy", "%v -> %v", c.eventRecorder.Eventf(newService, v1.EventTypeNormal, "ExternalTrafficPolicy", "%v -> %v",
oldService.Spec.ExternalTrafficPolicy, newService.Spec.ExternalTrafficPolicy) oldService.Spec.ExternalTrafficPolicy, newService.Spec.ExternalTrafficPolicy)
return true return true
} }
if oldService.Spec.HealthCheckNodePort != newService.Spec.HealthCheckNodePort { if oldService.Spec.HealthCheckNodePort != newService.Spec.HealthCheckNodePort {
s.eventRecorder.Eventf(newService, v1.EventTypeNormal, "HealthCheckNodePort", "%v -> %v", c.eventRecorder.Eventf(newService, v1.EventTypeNormal, "HealthCheckNodePort", "%v -> %v",
oldService.Spec.HealthCheckNodePort, newService.Spec.HealthCheckNodePort) oldService.Spec.HealthCheckNodePort, newService.Spec.HealthCheckNodePort)
return true return true
} }
@ -679,7 +679,7 @@ func nodeSlicesEqualForLB(x, y []*v1.Node) bool {
return nodeNames(x).Equal(nodeNames(y)) return nodeNames(x).Equal(nodeNames(y))
} }
func (s *Controller) getNodeConditionPredicate() NodeConditionPredicate { func (c *Controller) getNodeConditionPredicate() NodeConditionPredicate {
return func(node *v1.Node) bool { return func(node *v1.Node) bool {
if _, hasExcludeBalancerLabel := node.Labels[v1.LabelNodeExcludeBalancers]; hasExcludeBalancerLabel { if _, hasExcludeBalancerLabel := node.Labels[v1.LabelNodeExcludeBalancers]; hasExcludeBalancerLabel {
return false return false
@ -735,7 +735,7 @@ func nodeReadyConditionStatus(node *v1.Node) v1.ConditionStatus {
// nodeSyncInternal handles updating the hosts pointed to by all load // nodeSyncInternal handles updating the hosts pointed to by all load
// balancers whenever the set of nodes in the cluster changes. // balancers whenever the set of nodes in the cluster changes.
func (s *Controller) nodeSyncInternal(ctx context.Context, workers int) { func (c *Controller) nodeSyncInternal(ctx context.Context, workers int) {
startTime := time.Now() startTime := time.Now()
defer func() { defer func() {
latency := time.Since(startTime).Seconds() latency := time.Since(startTime).Seconds()
@ -743,14 +743,14 @@ func (s *Controller) nodeSyncInternal(ctx context.Context, workers int) {
nodeSyncLatency.Observe(latency) nodeSyncLatency.Observe(latency)
}() }()
if !s.needFullSyncAndUnmark() { if !c.needFullSyncAndUnmark() {
// The set of nodes in the cluster hasn't changed, but we can retry // The set of nodes in the cluster hasn't changed, but we can retry
// updating any services that we failed to update last time around. // updating any services that we failed to update last time around.
// It is required to call `s.cache.get()` on each Service in case there was // It is required to call `c.cache.get()` on each Service in case there was
// an update event that occurred between retries. // an update event that occurred between retries.
var servicesToUpdate []*v1.Service var servicesToUpdate []*v1.Service
for key := range s.servicesToUpdate { for key := range c.servicesToUpdate {
cachedService, exist := s.cache.get(key) cachedService, exist := c.cache.get(key)
if !exist { if !exist {
klog.Errorf("Service %q should be in the cache but not", key) klog.Errorf("Service %q should be in the cache but not", key)
continue continue
@ -758,33 +758,33 @@ func (s *Controller) nodeSyncInternal(ctx context.Context, workers int) {
servicesToUpdate = append(servicesToUpdate, cachedService.state) servicesToUpdate = append(servicesToUpdate, cachedService.state)
} }
s.servicesToUpdate = s.updateLoadBalancerHosts(ctx, servicesToUpdate, workers) c.servicesToUpdate = c.updateLoadBalancerHosts(ctx, servicesToUpdate, workers)
return return
} }
klog.V(2).Infof("Syncing backends for all LB services.") klog.V(2).Infof("Syncing backends for all LB services.")
// Try updating all services, and save the failed ones to try again next // Try updating all services, and save the failed ones to try again next
// round. // round.
servicesToUpdate := s.cache.allServices() servicesToUpdate := c.cache.allServices()
numServices := len(servicesToUpdate) numServices := len(servicesToUpdate)
s.servicesToUpdate = s.updateLoadBalancerHosts(ctx, servicesToUpdate, workers) c.servicesToUpdate = c.updateLoadBalancerHosts(ctx, servicesToUpdate, workers)
klog.V(2).Infof("Successfully updated %d out of %d load balancers to direct traffic to the updated set of nodes", klog.V(2).Infof("Successfully updated %d out of %d load balancers to direct traffic to the updated set of nodes",
numServices-len(s.servicesToUpdate), numServices) numServices-len(c.servicesToUpdate), numServices)
} }
// nodeSyncService syncs the nodes for one load balancer type service // nodeSyncService syncs the nodes for one load balancer type service
func (s *Controller) nodeSyncService(svc *v1.Service) bool { func (c *Controller) nodeSyncService(svc *v1.Service) bool {
if svc == nil || !wantsLoadBalancer(svc) { if svc == nil || !wantsLoadBalancer(svc) {
return false return false
} }
klog.V(4).Infof("nodeSyncService started for service %s/%s", svc.Namespace, svc.Name) klog.V(4).Infof("nodeSyncService started for service %s/%s", svc.Namespace, svc.Name)
hosts, err := listWithPredicate(s.nodeLister, s.getNodeConditionPredicate()) hosts, err := listWithPredicate(c.nodeLister, c.getNodeConditionPredicate())
if err != nil { if err != nil {
runtime.HandleError(fmt.Errorf("failed to retrieve node list: %v", err)) runtime.HandleError(fmt.Errorf("failed to retrieve node list: %v", err))
return true return true
} }
if err := s.lockedUpdateLoadBalancerHosts(svc, hosts); err != nil { if err := c.lockedUpdateLoadBalancerHosts(svc, hosts); err != nil {
runtime.HandleError(fmt.Errorf("failed to update load balancer hosts for service %s/%s: %v", svc.Namespace, svc.Name, err)) runtime.HandleError(fmt.Errorf("failed to update load balancer hosts for service %s/%s: %v", svc.Namespace, svc.Name, err))
return true return true
} }
@ -795,14 +795,14 @@ func (s *Controller) nodeSyncService(svc *v1.Service) bool {
// updateLoadBalancerHosts updates all existing load balancers so that // updateLoadBalancerHosts updates all existing load balancers so that
// they will match the latest list of nodes with input number of workers. // they will match the latest list of nodes with input number of workers.
// Returns the list of services that couldn't be updated. // Returns the list of services that couldn't be updated.
func (s *Controller) updateLoadBalancerHosts(ctx context.Context, services []*v1.Service, workers int) (servicesToRetry sets.String) { func (c *Controller) updateLoadBalancerHosts(ctx context.Context, services []*v1.Service, workers int) (servicesToRetry sets.String) {
klog.V(4).Infof("Running updateLoadBalancerHosts(len(services)==%d, workers==%d)", len(services), workers) klog.V(4).Infof("Running updateLoadBalancerHosts(len(services)==%d, workers==%d)", len(services), workers)
// lock for servicesToRetry // lock for servicesToRetry
servicesToRetry = sets.NewString() servicesToRetry = sets.NewString()
lock := sync.Mutex{} lock := sync.Mutex{}
doWork := func(piece int) { doWork := func(piece int) {
if shouldRetry := s.nodeSyncService(services[piece]); !shouldRetry { if shouldRetry := c.nodeSyncService(services[piece]); !shouldRetry {
return return
} }
lock.Lock() lock.Lock()
@ -818,7 +818,7 @@ func (s *Controller) updateLoadBalancerHosts(ctx context.Context, services []*v1
// Updates the load balancer of a service, assuming we hold the mutex // Updates the load balancer of a service, assuming we hold the mutex
// associated with the service. // associated with the service.
func (s *Controller) lockedUpdateLoadBalancerHosts(service *v1.Service, hosts []*v1.Node) error { func (c *Controller) lockedUpdateLoadBalancerHosts(service *v1.Service, hosts []*v1.Node) error {
startTime := time.Now() startTime := time.Now()
defer func() { defer func() {
latency := time.Since(startTime).Seconds() latency := time.Since(startTime).Seconds()
@ -828,13 +828,13 @@ func (s *Controller) lockedUpdateLoadBalancerHosts(service *v1.Service, hosts []
klog.V(2).Infof("Updating backends for load balancer %s/%s with node set: %v", service.Namespace, service.Name, nodeNames(hosts)) klog.V(2).Infof("Updating backends for load balancer %s/%s with node set: %v", service.Namespace, service.Name, nodeNames(hosts))
// This operation doesn't normally take very long (and happens pretty often), so we only record the final event // This operation doesn't normally take very long (and happens pretty often), so we only record the final event
err := s.balancer.UpdateLoadBalancer(context.TODO(), s.clusterName, service, hosts) err := c.balancer.UpdateLoadBalancer(context.TODO(), c.clusterName, service, hosts)
if err == nil { if err == nil {
// If there are no available nodes for LoadBalancer service, make a EventTypeWarning event for it. // If there are no available nodes for LoadBalancer service, make a EventTypeWarning event for it.
if len(hosts) == 0 { if len(hosts) == 0 {
s.eventRecorder.Event(service, v1.EventTypeWarning, "UnAvailableLoadBalancer", "There are no available nodes for LoadBalancer") c.eventRecorder.Event(service, v1.EventTypeWarning, "UnAvailableLoadBalancer", "There are no available nodes for LoadBalancer")
} else { } else {
s.eventRecorder.Event(service, v1.EventTypeNormal, "UpdatedLoadBalancer", "Updated load balancer with new hosts") c.eventRecorder.Event(service, v1.EventTypeNormal, "UpdatedLoadBalancer", "Updated load balancer with new hosts")
} }
return nil return nil
} }
@ -845,13 +845,13 @@ func (s *Controller) lockedUpdateLoadBalancerHosts(service *v1.Service, hosts []
return nil return nil
} }
// It's only an actual error if the load balancer still exists. // It's only an actual error if the load balancer still exists.
if _, exists, err := s.balancer.GetLoadBalancer(context.TODO(), s.clusterName, service); err != nil { if _, exists, err := c.balancer.GetLoadBalancer(context.TODO(), c.clusterName, service); err != nil {
runtime.HandleError(fmt.Errorf("failed to check if load balancer exists for service %s/%s: %v", service.Namespace, service.Name, err)) runtime.HandleError(fmt.Errorf("failed to check if load balancer exists for service %s/%s: %v", service.Namespace, service.Name, err))
} else if !exists { } else if !exists {
return nil return nil
} }
s.eventRecorder.Eventf(service, v1.EventTypeWarning, "UpdateLoadBalancerFailed", "Error updating load balancer with new hosts %v: %v", nodeNames(hosts), err) c.eventRecorder.Eventf(service, v1.EventTypeWarning, "UpdateLoadBalancerFailed", "Error updating load balancer with new hosts %v: %v", nodeNames(hosts), err)
return err return err
} }
@ -867,7 +867,7 @@ func loadBalancerIPsAreEqual(oldService, newService *v1.Service) bool {
// syncService will sync the Service with the given key if it has had its expectations fulfilled, // syncService will sync the Service with the given key if it has had its expectations fulfilled,
// meaning it did not expect to see any more of its pods created or deleted. This function is not meant to be // meaning it did not expect to see any more of its pods created or deleted. This function is not meant to be
// invoked concurrently with the same key. // invoked concurrently with the same key.
func (s *Controller) syncService(ctx context.Context, key string) error { func (c *Controller) syncService(ctx context.Context, key string) error {
startTime := time.Now() startTime := time.Now()
defer func() { defer func() {
klog.V(4).Infof("Finished syncing service %q (%v)", key, time.Since(startTime)) klog.V(4).Infof("Finished syncing service %q (%v)", key, time.Since(startTime))
@ -879,25 +879,25 @@ func (s *Controller) syncService(ctx context.Context, key string) error {
} }
// service holds the latest service info from apiserver // service holds the latest service info from apiserver
service, err := s.serviceLister.Services(namespace).Get(name) service, err := c.serviceLister.Services(namespace).Get(name)
switch { switch {
case errors.IsNotFound(err): case errors.IsNotFound(err):
// service absence in store means watcher caught the deletion, ensure LB info is cleaned // service absence in store means watcher caught the deletion, ensure LB info is cleaned
err = s.processServiceDeletion(ctx, key) err = c.processServiceDeletion(ctx, key)
case err != nil: case err != nil:
runtime.HandleError(fmt.Errorf("Unable to retrieve service %v from store: %v", key, err)) runtime.HandleError(fmt.Errorf("Unable to retrieve service %v from store: %v", key, err))
default: default:
// It is not safe to modify an object returned from an informer. // It is not safe to modify an object returned from an informer.
// As reconcilers may modify the service object we need to copy // As reconcilers may modify the service object we need to copy
// it first. // it first.
err = s.processServiceCreateOrUpdate(ctx, service.DeepCopy(), key) err = c.processServiceCreateOrUpdate(ctx, service.DeepCopy(), key)
} }
return err return err
} }
func (s *Controller) processServiceDeletion(ctx context.Context, key string) error { func (c *Controller) processServiceDeletion(ctx context.Context, key string) error {
cachedService, ok := s.cache.get(key) cachedService, ok := c.cache.get(key)
if !ok { if !ok {
// Cache does not contains the key means: // Cache does not contains the key means:
// - We didn't create a Load Balancer for the deleted service at all. // - We didn't create a Load Balancer for the deleted service at all.
@ -906,29 +906,29 @@ func (s *Controller) processServiceDeletion(ctx context.Context, key string) err
return nil return nil
} }
klog.V(2).Infof("Service %v has been deleted. Attempting to cleanup load balancer resources", key) klog.V(2).Infof("Service %v has been deleted. Attempting to cleanup load balancer resources", key)
if err := s.processLoadBalancerDelete(ctx, cachedService.state, key); err != nil { if err := c.processLoadBalancerDelete(ctx, cachedService.state, key); err != nil {
return err return err
} }
s.cache.delete(key) c.cache.delete(key)
return nil return nil
} }
func (s *Controller) processLoadBalancerDelete(ctx context.Context, service *v1.Service, key string) error { func (c *Controller) processLoadBalancerDelete(ctx context.Context, service *v1.Service, key string) error {
// delete load balancer info only if the service type is LoadBalancer // delete load balancer info only if the service type is LoadBalancer
if !wantsLoadBalancer(service) { if !wantsLoadBalancer(service) {
return nil return nil
} }
s.eventRecorder.Event(service, v1.EventTypeNormal, "DeletingLoadBalancer", "Deleting load balancer") c.eventRecorder.Event(service, v1.EventTypeNormal, "DeletingLoadBalancer", "Deleting load balancer")
if err := s.balancer.EnsureLoadBalancerDeleted(ctx, s.clusterName, service); err != nil { if err := c.balancer.EnsureLoadBalancerDeleted(ctx, c.clusterName, service); err != nil {
s.eventRecorder.Eventf(service, v1.EventTypeWarning, "DeleteLoadBalancerFailed", "Error deleting load balancer: %v", err) c.eventRecorder.Eventf(service, v1.EventTypeWarning, "DeleteLoadBalancerFailed", "Error deleting load balancer: %v", err)
return err return err
} }
s.eventRecorder.Event(service, v1.EventTypeNormal, "DeletedLoadBalancer", "Deleted load balancer") c.eventRecorder.Event(service, v1.EventTypeNormal, "DeletedLoadBalancer", "Deleted load balancer")
return nil return nil
} }
// addFinalizer patches the service to add finalizer. // addFinalizer patches the service to add finalizer.
func (s *Controller) addFinalizer(service *v1.Service) error { func (c *Controller) addFinalizer(service *v1.Service) error {
if servicehelper.HasLBFinalizer(service) { if servicehelper.HasLBFinalizer(service) {
return nil return nil
} }
@ -938,12 +938,12 @@ func (s *Controller) addFinalizer(service *v1.Service) error {
updated.ObjectMeta.Finalizers = append(updated.ObjectMeta.Finalizers, servicehelper.LoadBalancerCleanupFinalizer) updated.ObjectMeta.Finalizers = append(updated.ObjectMeta.Finalizers, servicehelper.LoadBalancerCleanupFinalizer)
klog.V(2).Infof("Adding finalizer to service %s/%s", updated.Namespace, updated.Name) klog.V(2).Infof("Adding finalizer to service %s/%s", updated.Namespace, updated.Name)
_, err := servicehelper.PatchService(s.kubeClient.CoreV1(), service, updated) _, err := servicehelper.PatchService(c.kubeClient.CoreV1(), service, updated)
return err return err
} }
// removeFinalizer patches the service to remove finalizer. // removeFinalizer patches the service to remove finalizer.
func (s *Controller) removeFinalizer(service *v1.Service) error { func (c *Controller) removeFinalizer(service *v1.Service) error {
if !servicehelper.HasLBFinalizer(service) { if !servicehelper.HasLBFinalizer(service) {
return nil return nil
} }
@ -953,7 +953,7 @@ func (s *Controller) removeFinalizer(service *v1.Service) error {
updated.ObjectMeta.Finalizers = removeString(updated.ObjectMeta.Finalizers, servicehelper.LoadBalancerCleanupFinalizer) updated.ObjectMeta.Finalizers = removeString(updated.ObjectMeta.Finalizers, servicehelper.LoadBalancerCleanupFinalizer)
klog.V(2).Infof("Removing finalizer from service %s/%s", updated.Namespace, updated.Name) klog.V(2).Infof("Removing finalizer from service %s/%s", updated.Namespace, updated.Name)
_, err := servicehelper.PatchService(s.kubeClient.CoreV1(), service, updated) _, err := servicehelper.PatchService(c.kubeClient.CoreV1(), service, updated)
return err return err
} }
@ -970,7 +970,7 @@ func removeString(slice []string, s string) []string {
} }
// patchStatus patches the service with the given LoadBalancerStatus. // patchStatus patches the service with the given LoadBalancerStatus.
func (s *Controller) patchStatus(service *v1.Service, previousStatus, newStatus *v1.LoadBalancerStatus) error { func (c *Controller) patchStatus(service *v1.Service, previousStatus, newStatus *v1.LoadBalancerStatus) error {
if servicehelper.LoadBalancerStatusEqual(previousStatus, newStatus) { if servicehelper.LoadBalancerStatusEqual(previousStatus, newStatus) {
return nil return nil
} }
@ -979,7 +979,7 @@ func (s *Controller) patchStatus(service *v1.Service, previousStatus, newStatus
updated.Status.LoadBalancer = *newStatus updated.Status.LoadBalancer = *newStatus
klog.V(2).Infof("Patching status for service %s/%s", updated.Namespace, updated.Name) klog.V(2).Infof("Patching status for service %s/%s", updated.Namespace, updated.Name)
_, err := servicehelper.PatchService(s.kubeClient.CoreV1(), service, updated) _, err := servicehelper.PatchService(c.kubeClient.CoreV1(), service, updated)
return err return err
} }