Add Load Balancer finalizer support

- Always try to remove finalizer upon load balancer cleanup
- Add finalizer prior to load balancer creation (feature gated)
- Cache logic fix-ups
- Event type/message fix-ups
- Use runtime.HandleError() on eaten errors

Co-authored-by: Josh Horwitz <horwitzja@gmail.com>
Zihong Zheng 2019-05-23 00:01:08 -07:00
parent e44fb7333e
commit aa3f81d657
2 changed files with 180 additions and 84 deletions
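Taken together, the bullets above describe a finalizer-protected lifecycle: the finalizer is attached before the cloud load balancer is created, so the Service object cannot be fully deleted before the controller has torn the load balancer down. A minimal, self-contained sketch of that ordering (plain Go; the types and helper here are made up for illustration, only the finalizer string comes from this commit):

    package main

    import "fmt"

    const lbCleanupFinalizer = "service.kubernetes.io/load-balancer-cleanup"

    // svc stands in for v1.Service; deleting models a non-nil deletionTimestamp.
    type svc struct {
        finalizers []string
        deleting   bool
    }

    func sync(s *svc) {
        if s.deleting {
            // Cleanup path: tear down the cloud load balancer first...
            fmt.Println("deleting cloud load balancer")
            // ...and only then release the pin so the API server can
            // finish deleting the object.
            s.finalizers = nil
            return
        }
        // Create path: pin the object before provisioning anything.
        s.finalizers = append(s.finalizers, lbCleanupFinalizer)
        fmt.Println("ensuring cloud load balancer")
    }

    func main() {
        s := &svc{}
        sync(s) // create: finalizer goes on before the load balancer exists
        s.deleting = true
        sync(s) // delete: arrives as an update; cleanup runs before the pin is released
    }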

--- File 1 of 2: the service controller ---

@@ -39,11 +39,13 @@ import (
     "k8s.io/client-go/tools/record"
     "k8s.io/client-go/util/workqueue"
     cloudprovider "k8s.io/cloud-provider"
+    servicehelper "k8s.io/cloud-provider/service/helpers"
     "k8s.io/klog"
     v1helper "k8s.io/kubernetes/pkg/apis/core/v1/helper"
     "k8s.io/kubernetes/pkg/controller"
     kubefeatures "k8s.io/kubernetes/pkg/features"
     "k8s.io/kubernetes/pkg/util/metrics"
+    "k8s.io/kubernetes/pkg/util/slice"
 )

 const (
@@ -135,15 +137,28 @@ func New(
     serviceInformer.Informer().AddEventHandlerWithResyncPeriod(
         cache.ResourceEventHandlerFuncs{
-            AddFunc: s.enqueueService,
-            UpdateFunc: func(old, cur interface{}) {
-                oldSvc, ok1 := old.(*v1.Service)
-                curSvc, ok2 := cur.(*v1.Service)
-                if ok1 && ok2 && s.needsUpdate(oldSvc, curSvc) {
+            AddFunc: func(cur interface{}) {
+                svc, ok := cur.(*v1.Service)
+                if ok && (wantsLoadBalancer(svc) || needsCleanup(svc)) {
                     s.enqueueService(cur)
                 }
             },
-            DeleteFunc: s.enqueueService,
+            UpdateFunc: func(old, cur interface{}) {
+                oldSvc, ok1 := old.(*v1.Service)
+                curSvc, ok2 := cur.(*v1.Service)
+                if ok1 && ok2 && (s.needsUpdate(oldSvc, curSvc) || needsCleanup(curSvc)) {
+                    s.enqueueService(cur)
+                }
+            },
+            DeleteFunc: func(old interface{}) {
+                if utilfeature.DefaultFeatureGate.Enabled(kubefeatures.ServiceLoadBalancerFinalizer) {
+                    // No need to handle deletion event if finalizer feature gate is
+                    // enabled. Because the deletion would be handled by the update
+                    // path when the deletion timestamp is added.
+                    return
+                }
+                s.enqueueService(old)
+            },
         },
         serviceSyncPeriod,
     )
@@ -160,7 +175,7 @@ func New(
 func (s *ServiceController) enqueueService(obj interface{}) {
     key, err := controller.KeyFunc(obj)
     if err != nil {
-        klog.Errorf("Couldn't get key for object %#v: %v", obj, err)
+        runtime.HandleError(fmt.Errorf("couldn't get key for object %#v: %v", obj, err))
         return
     }
     s.queue.Add(key)
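The runtime.HandleError calls introduced throughout this diff (per the "Use runtime.HandleError() on eaten errors" bullet) come from k8s.io/apimachinery/pkg/util/runtime. Rather than logging directly, they route errors that the caller is about to swallow through registered handlers, which by default log with rate limiting. A small sketch of the pattern:

    package main

    import (
        "fmt"

        utilruntime "k8s.io/apimachinery/pkg/util/runtime"
    )

    func validate(obj interface{}) error {
        if obj == nil {
            return fmt.Errorf("nil object")
        }
        return nil
    }

    func process(obj interface{}) {
        if err := validate(obj); err != nil {
            // The error is "eaten" (process returns normally), but it is
            // still surfaced through the registered error handlers.
            utilruntime.HandleError(fmt.Errorf("processing %v failed: %v", obj, err))
            return
        }
        fmt.Println("processed", obj)
    }

    func main() {
        process(nil)
        process("default/my-service")
    }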
@@ -235,95 +250,112 @@ func (s *ServiceController) init() error {
     return nil
 }

-// processServiceUpdate operates loadbalancers for the incoming service accordingly.
+// processServiceCreateOrUpdate operates loadbalancers for the incoming service accordingly.
 // Returns an error if processing the service update failed.
-func (s *ServiceController) processServiceUpdate(cachedService *cachedService, service *v1.Service, key string) error {
-    if cachedService.state != nil {
-        if cachedService.state.UID != service.UID {
-            err := s.processLoadBalancerDelete(cachedService, key)
-            if err != nil {
-                return err
-            }
+func (s *ServiceController) processServiceCreateOrUpdate(service *v1.Service, key string) error {
+    // TODO(@MrHohn): Remove the cache once we get rid of the non-finalizer deletion
+    // path. Ref https://github.com/kubernetes/enhancements/issues/980.
+    cachedService := s.cache.getOrCreate(key)
+    if cachedService.state != nil && cachedService.state.UID != service.UID {
+        // This happens only when a service is deleted and re-created
+        // in a short period, which is only possible when it doesn't
+        // contain finalizer.
+        if err := s.processLoadBalancerDelete(cachedService.state, key); err != nil {
+            return err
         }
     }
-    // cache the service, we need the info for service deletion
+    // Always cache the service, we need the info for service deletion in case
+    // when load balancer cleanup is not handled via finalizer.
     cachedService.state = service
-    err := s.syncLoadBalancerIfNeeded(key, service)
+    op, err := s.syncLoadBalancerIfNeeded(service, key)
     if err != nil {
-        eventType := "CreatingLoadBalancerFailed"
-        message := "Error creating load balancer (will retry): "
-        if !wantsLoadBalancer(service) {
-            eventType = "CleanupLoadBalancerFailed"
-            message = "Error cleaning up load balancer (will retry): "
-        }
-        message += err.Error()
-        s.eventRecorder.Event(service, v1.EventTypeWarning, eventType, message)
+        s.eventRecorder.Eventf(service, v1.EventTypeWarning, "SyncLoadBalancerFailed", "Error syncing load balancer: %v", err)
         return err
     }
-    // Always update the cache upon success.
-    // NOTE: Since we update the cached service if and only if we successfully
-    // processed it, a cached service being nil implies that it hasn't yet
-    // been successfully processed.
-    s.cache.set(key, cachedService)
+    if op == deleteLoadBalancer {
+        // Only delete the cache upon successful load balancer deletion.
+        s.cache.delete(key)
+    }
     return nil
 }

+type loadBalancerOperation int
+
+const (
+    deleteLoadBalancer loadBalancerOperation = iota
+    ensureLoadBalancer
+)
+
 // syncLoadBalancerIfNeeded ensures that service's status is synced up with loadbalancer
 // i.e. creates loadbalancer for service if requested and deletes loadbalancer if the service
 // doesn't want a loadbalancer no more. Returns whatever error occurred.
-func (s *ServiceController) syncLoadBalancerIfNeeded(key string, service *v1.Service) error {
+func (s *ServiceController) syncLoadBalancerIfNeeded(service *v1.Service, key string) (loadBalancerOperation, error) {
     // Note: It is safe to just call EnsureLoadBalancer. But, on some clouds that requires a delete & create,
     // which may involve service interruption. Also, we would like user-friendly events.

     // Save the state so we can avoid a write if it doesn't change
-    previousState := v1helper.LoadBalancerStatusDeepCopy(&service.Status.LoadBalancer)
-    var newState *v1.LoadBalancerStatus
+    previousStatus := v1helper.LoadBalancerStatusDeepCopy(&service.Status.LoadBalancer)
+    var newStatus *v1.LoadBalancerStatus
+    var op loadBalancerOperation
     var err error

-    if !wantsLoadBalancer(service) {
+    if !wantsLoadBalancer(service) || needsCleanup(service) {
+        // Delete the load balancer if service no longer wants one, or if service needs cleanup.
+        op = deleteLoadBalancer
+        newStatus = &v1.LoadBalancerStatus{}
         _, exists, err := s.balancer.GetLoadBalancer(context.TODO(), s.clusterName, service)
         if err != nil {
-            return fmt.Errorf("error getting LB for service %s: %v", key, err)
+            return op, fmt.Errorf("failed to check if load balancer exists before cleanup: %v", err)
         }
         if exists {
-            klog.Infof("Deleting existing load balancer for service %s that no longer needs a load balancer.", key)
+            klog.V(2).Infof("Deleting existing load balancer for service %s", key)
             s.eventRecorder.Event(service, v1.EventTypeNormal, "DeletingLoadBalancer", "Deleting load balancer")
             if err := s.balancer.EnsureLoadBalancerDeleted(context.TODO(), s.clusterName, service); err != nil {
-                return err
+                return op, fmt.Errorf("failed to delete load balancer: %v", err)
             }
+            s.eventRecorder.Event(service, v1.EventTypeNormal, "DeletedLoadBalancer", "Deleted load balancer")
         }
-        newState = &v1.LoadBalancerStatus{}
+        // Always try to remove finalizer when load balancer is deleted.
+        // It will be a no-op if finalizer does not exist.
+        // Note this also clears up finalizer if the cluster is downgraded
+        // from a version that attaches finalizer to a version that doesn't.
+        if err := s.removeFinalizer(service); err != nil {
+            return op, fmt.Errorf("failed to remove load balancer cleanup finalizer: %v", err)
+        }
-        s.eventRecorder.Event(service, v1.EventTypeNormal, "DeletedLoadBalancer", "Deleted load balancer")
     } else {
-        klog.V(2).Infof("Ensuring LB for service %s", key)
         // TODO: We could do a dry-run here if wanted to avoid the spurious cloud-calls & events when we restart
+        // Create or update the load balancer if service wants one.
+        op = ensureLoadBalancer
+        klog.V(2).Infof("Ensuring load balancer for service %s", key)
         s.eventRecorder.Event(service, v1.EventTypeNormal, "EnsuringLoadBalancer", "Ensuring load balancer")
-        newState, err = s.ensureLoadBalancer(service)
+        if utilfeature.DefaultFeatureGate.Enabled(kubefeatures.ServiceLoadBalancerFinalizer) {
+            // Always try to add finalizer prior to load balancer creation.
+            // It will be a no-op if finalizer already exists.
+            // Note this also retrospectively puts on finalizer if the cluster
+            // is upgraded from a version that doesn't attach finalizer to a
+            // version that does.
+            if err := s.addFinalizer(service); err != nil {
+                return op, fmt.Errorf("failed to add load balancer cleanup finalizer: %v", err)
+            }
+        }
+        newStatus, err = s.ensureLoadBalancer(service)
         if err != nil {
-            return fmt.Errorf("failed to ensure load balancer for service %s: %v", key, err)
+            return op, fmt.Errorf("failed to ensure load balancer: %v", err)
         }
         s.eventRecorder.Event(service, v1.EventTypeNormal, "EnsuredLoadBalancer", "Ensured load balancer")
     }

-    // If there are any changes to the status then patch the service.
-    if !v1helper.LoadBalancerStatusEqual(previousState, newState) {
-        // Make a copy so we don't mutate the shared informer cache
-        updated := service.DeepCopy()
-        updated.Status.LoadBalancer = *newState
-        if _, err := patch(s.kubeClient.CoreV1(), service, updated); err != nil {
-            return fmt.Errorf("failed to patch status for service %s: %v", key, err)
+    if err := s.patchStatus(service, previousStatus, newStatus); err != nil {
+        // Only retry error that isn't not found:
+        // - Not found error mostly happens when service disappears right after
+        //   we remove the finalizer.
+        // - We can't patch status on non-exist service anyway.
+        if !errors.IsNotFound(err) {
+            return op, fmt.Errorf("failed to update load balancer status: %v", err)
         }
-        klog.V(4).Infof("Successfully patched status for service %s", key)
-    } else {
-        klog.V(4).Infof("Not persisting unchanged LoadBalancerStatus for service %s to registry.", key)
     }

-    return nil
+    return op, nil
 }

 func (s *ServiceController) ensureLoadBalancer(service *v1.Service) (*v1.LoadBalancerStatus, error) {
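One subtlety in the new syncLoadBalancerIfNeeded: a failed status patch is retried only when the failure is not a NotFound, because removing the finalizer can let the Service disappear before the patch lands. A sketch of that error classification with apimachinery (the resource and name are hypothetical):

    package main

    import (
        "fmt"

        apierrors "k8s.io/apimachinery/pkg/api/errors"
        "k8s.io/apimachinery/pkg/runtime/schema"
    )

    func main() {
        services := schema.GroupResource{Group: "", Resource: "services"}
        err := apierrors.NewNotFound(services, "my-lb")

        // Mirrors the check above: swallow NotFound, propagate (and hence
        // retry) everything else.
        if !apierrors.IsNotFound(err) {
            fmt.Println("would retry:", err)
            return
        }
        fmt.Println("service already gone, nothing to patch:", err)
    }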
@@ -334,7 +366,7 @@ func (s *ServiceController) ensureLoadBalancer(service *v1.Service) (*v1.LoadBal
     // If there are no available nodes for LoadBalancer service, make a EventTypeWarning event for it.
     if len(nodes) == 0 {
-        s.eventRecorder.Eventf(service, v1.EventTypeWarning, "UnAvailableLoadBalancer", "There are no available nodes for LoadBalancer service %s/%s", service.Namespace, service.Name)
+        s.eventRecorder.Event(service, v1.EventTypeWarning, "UnAvailableLoadBalancer", "There are no available nodes for LoadBalancer")
     }

     // - Only one protocol supported per service
@@ -407,6 +439,12 @@ func (s *serviceCache) delete(serviceName string) {
     delete(s.serviceMap, serviceName)
 }

+// needsCleanup checks if load balancer needs to be cleaned up as indicated by finalizer.
+func needsCleanup(service *v1.Service) bool {
+    return service.ObjectMeta.DeletionTimestamp != nil && servicehelper.HasLBFinalizer(service)
+}
+
 // needsUpdate checks if load balancer needs to be updated due to change in attributes.
 func (s *ServiceController) needsUpdate(oldService *v1.Service, newService *v1.Service) bool {
     if !wantsLoadBalancer(oldService) && !wantsLoadBalancer(newService) {
         return false
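needsCleanup is what turns a user's delete into work on the update path: with the finalizer present, deletion merely sets metadata.deletionTimestamp and the object lingers. A sketch of a Service in exactly that state, using the real API types (name and namespace are made up; the finalizer check is inlined for illustration):

    package main

    import (
        "fmt"
        "time"

        v1 "k8s.io/api/core/v1"
        metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    )

    func main() {
        now := metav1.NewTime(time.Now())
        service := &v1.Service{
            ObjectMeta: metav1.ObjectMeta{
                Name:              "my-lb",
                Namespace:         "default",
                DeletionTimestamp: &now, // set by the API server on delete
                Finalizers:        []string{"service.kubernetes.io/load-balancer-cleanup"},
            },
            Spec: v1.ServiceSpec{Type: v1.ServiceTypeLoadBalancer},
        }

        // The same two conditions needsCleanup tests: a deletion timestamp
        // plus the load balancer cleanup finalizer.
        needs := false
        if service.ObjectMeta.DeletionTimestamp != nil {
            for _, f := range service.ObjectMeta.Finalizers {
                if f == "service.kubernetes.io/load-balancer-cleanup" {
                    needs = true
                }
            }
        }
        fmt.Println("needs cleanup:", needs) // true
    }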
@@ -592,7 +630,7 @@ func getNodeConditionPredicate() corelisters.NodeConditionPredicate {
 func (s *ServiceController) nodeSyncLoop() {
     newHosts, err := s.nodeLister.ListWithPredicate(getNodeConditionPredicate())
     if err != nil {
-        klog.Errorf("Failed to retrieve current set of nodes from node lister: %v", err)
+        runtime.HandleError(fmt.Errorf("Failed to retrieve current set of nodes from node lister: %v", err))
         return
     }
     if nodeSlicesEqualForLB(newHosts, s.knownHosts) {
@@ -602,7 +640,7 @@ func (s *ServiceController) nodeSyncLoop() {
         return
     }
-    klog.Infof("Detected change in list of current cluster nodes. New node set: %v",
+    klog.V(2).Infof("Detected change in list of current cluster nodes. New node set: %v",
         nodeNames(newHosts))

     // Try updating all services, and save the ones that fail to try again next
@@ -610,7 +648,7 @@ func (s *ServiceController) nodeSyncLoop() {
     s.servicesToUpdate = s.cache.allServices()
     numServices := len(s.servicesToUpdate)
     s.servicesToUpdate = s.updateLoadBalancerHosts(s.servicesToUpdate, newHosts)
-    klog.Infof("Successfully updated %d out of %d load balancers to direct traffic to the updated set of nodes",
+    klog.V(2).Infof("Successfully updated %d out of %d load balancers to direct traffic to the updated set of nodes",
         numServices-len(s.servicesToUpdate), numServices)

     s.knownHosts = newHosts
@@ -626,7 +664,7 @@ func (s *ServiceController) updateLoadBalancerHosts(services []*v1.Service, host
             return
         }
         if err := s.lockedUpdateLoadBalancerHosts(service, hosts); err != nil {
-            klog.Errorf("External error while updating load balancer: %v.", err)
+            runtime.HandleError(fmt.Errorf("failed to update load balancer hosts for service %s/%s: %v", service.Namespace, service.Name, err))
             servicesToRetry = append(servicesToRetry, service)
         }
     }()
@@ -646,7 +684,7 @@ func (s *ServiceController) lockedUpdateLoadBalancerHosts(service *v1.Service, h
     if err == nil {
         // If there are no available nodes for LoadBalancer service, make a EventTypeWarning event for it.
         if len(hosts) == 0 {
-            s.eventRecorder.Eventf(service, v1.EventTypeWarning, "UnAvailableLoadBalancer", "There are no available nodes for LoadBalancer service %s/%s", service.Namespace, service.Name)
+            s.eventRecorder.Event(service, v1.EventTypeWarning, "UnAvailableLoadBalancer", "There are no available nodes for LoadBalancer")
         } else {
             s.eventRecorder.Event(service, v1.EventTypeNormal, "UpdatedLoadBalancer", "Updated load balancer with new hosts")
         }
@@ -655,12 +693,12 @@ func (s *ServiceController) lockedUpdateLoadBalancerHosts(service *v1.Service, h
     // It's only an actual error if the load balancer still exists.
     if _, exists, err := s.balancer.GetLoadBalancer(context.TODO(), s.clusterName, service); err != nil {
-        klog.Errorf("External error while checking if load balancer %q exists: name, %v", s.balancer.GetLoadBalancerName(context.TODO(), s.clusterName, service), err)
+        runtime.HandleError(fmt.Errorf("failed to check if load balancer exists for service %s/%s: %v", service.Namespace, service.Name, err))
     } else if !exists {
         return nil
     }

-    s.eventRecorder.Eventf(service, v1.EventTypeWarning, "LoadBalancerUpdateFailed", "Error updating load balancer with new hosts %v: %v", nodeNames(hosts), err)
+    s.eventRecorder.Eventf(service, v1.EventTypeWarning, "UpdateLoadBalancerFailed", "Error updating load balancer with new hosts %v: %v", nodeNames(hosts), err)
     return err
 }
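The event fix-ups in this diff normalize reason strings into a verb-first scheme (LoadBalancerUpdateFailed becomes UpdateLoadBalancerFailed, DeletingLoadBalancerFailed becomes DeleteLoadBalancerFailed) and pick Event or Eventf depending on whether the message is formatted. For reference, the relevant part of client-go's recorder interface (abbreviated from k8s.io/client-go/tools/record):

    // EventRecorder records events on behalf of an object.
    type EventRecorder interface {
        // Event records a fixed-message event.
        Event(object runtime.Object, eventtype, reason, message string)
        // Eventf is Event with Sprintf-style message formatting.
        Eventf(object runtime.Object, eventtype, reason, messageFmt string, args ...interface{})
        // ... (PastEventf, AnnotatedEventf elided)
    }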
@@ -677,7 +715,6 @@ func loadBalancerIPsAreEqual(oldService, newService *v1.Service) bool {
 // invoked concurrently with the same key.
 func (s *ServiceController) syncService(key string) error {
     startTime := time.Now()
-    var cachedService *cachedService
     defer func() {
         klog.V(4).Infof("Finished syncing service %q (%v)", key, time.Since(startTime))
     }()
@@ -692,44 +729,88 @@ func (s *ServiceController) syncService(key string) error {
     switch {
     case errors.IsNotFound(err):
         // service absence in store means watcher caught the deletion, ensure LB info is cleaned
-        klog.Infof("Service has been deleted %v. Attempting to cleanup load balancer resources", key)
         err = s.processServiceDeletion(key)
     case err != nil:
-        klog.Infof("Unable to retrieve service %v from store: %v", key, err)
+        runtime.HandleError(fmt.Errorf("Unable to retrieve service %v from store: %v", key, err))
     default:
-        cachedService = s.cache.getOrCreate(key)
-        err = s.processServiceUpdate(cachedService, service, key)
+        err = s.processServiceCreateOrUpdate(service, key)
     }

     return err
 }

-// Returns an error if processing the service deletion failed, along with a time.Duration
-// indicating whether processing should be retried; zero means no-retry; otherwise
-// we should retry after that Duration.
 func (s *ServiceController) processServiceDeletion(key string) error {
     cachedService, ok := s.cache.get(key)
     if !ok {
-        klog.Errorf("service %s not in cache even though the watcher thought it was. Ignoring the deletion", key)
+        // Cache does not contains the key means:
+        // - We didn't create a Load Balancer for the deleted service at all.
+        // - We already deleted the Load Balancer that was created for the service.
+        // In both cases we have nothing left to do.
         return nil
     }
-    return s.processLoadBalancerDelete(cachedService, key)
+    klog.V(2).Infof("Service %v has been deleted. Attempting to cleanup load balancer resources", key)
+    if err := s.processLoadBalancerDelete(cachedService.state, key); err != nil {
+        return err
+    }
+    s.cache.delete(key)
+    return nil
 }

-func (s *ServiceController) processLoadBalancerDelete(cachedService *cachedService, key string) error {
-    service := cachedService.state
+func (s *ServiceController) processLoadBalancerDelete(service *v1.Service, key string) error {
     // delete load balancer info only if the service type is LoadBalancer
     if !wantsLoadBalancer(service) {
         return nil
     }
     s.eventRecorder.Event(service, v1.EventTypeNormal, "DeletingLoadBalancer", "Deleting load balancer")
-    err := s.balancer.EnsureLoadBalancerDeleted(context.TODO(), s.clusterName, service)
-    if err != nil {
-        s.eventRecorder.Eventf(service, v1.EventTypeWarning, "DeletingLoadBalancerFailed", "Error deleting load balancer (will retry): %v", err)
+    if err := s.balancer.EnsureLoadBalancerDeleted(context.TODO(), s.clusterName, service); err != nil {
+        s.eventRecorder.Eventf(service, v1.EventTypeWarning, "DeleteLoadBalancerFailed", "Error deleting load balancer: %v", err)
         return err
     }
     s.eventRecorder.Event(service, v1.EventTypeNormal, "DeletedLoadBalancer", "Deleted load balancer")
-    s.cache.delete(key)
     return nil
 }

+// addFinalizer patches the service to add finalizer.
+func (s *ServiceController) addFinalizer(service *v1.Service) error {
+    if servicehelper.HasLBFinalizer(service) {
+        return nil
+    }
+
+    // Make a copy so we don't mutate the shared informer cache.
+    updated := service.DeepCopy()
+    updated.ObjectMeta.Finalizers = append(updated.ObjectMeta.Finalizers, servicehelper.LoadBalancerCleanupFinalizer)
+
+    klog.V(2).Infof("Adding finalizer to service %s/%s", updated.Namespace, updated.Name)
+    _, err := patch(s.kubeClient.CoreV1(), service, updated)
+    return err
+}
+
+// removeFinalizer patches the service to remove finalizer.
+func (s *ServiceController) removeFinalizer(service *v1.Service) error {
+    if !servicehelper.HasLBFinalizer(service) {
+        return nil
+    }
+
+    // Make a copy so we don't mutate the shared informer cache.
+    updated := service.DeepCopy()
+    updated.ObjectMeta.Finalizers = slice.RemoveString(updated.ObjectMeta.Finalizers, servicehelper.LoadBalancerCleanupFinalizer, nil)
+
+    klog.V(2).Infof("Removing finalizer from service %s/%s", updated.Namespace, updated.Name)
+    _, err := patch(s.kubeClient.CoreV1(), service, updated)
+    return err
+}
+
+// patchStatus patches the service with the given LoadBalancerStatus.
+func (s *ServiceController) patchStatus(service *v1.Service, previousStatus, newStatus *v1.LoadBalancerStatus) error {
+    if v1helper.LoadBalancerStatusEqual(previousStatus, newStatus) {
+        return nil
+    }
+
+    // Make a copy so we don't mutate the shared informer cache.
+    updated := service.DeepCopy()
+    updated.Status.LoadBalancer = *newStatus
+
+    klog.V(2).Infof("Patching status for service %s/%s", updated.Namespace, updated.Name)
+    _, err := patch(s.kubeClient.CoreV1(), service, updated)
+    return err
+}
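addFinalizer, removeFinalizer, and patchStatus all funnel into a patch helper that is not part of this diff. A plausible shape for it, assuming client-go's strategic-merge-patch utilities; the signature is inferred from the call sites above, and the real helper may additionally target the status subresource when only status changed:

    package service

    import (
        "encoding/json"

        v1 "k8s.io/api/core/v1"
        "k8s.io/apimachinery/pkg/types"
        "k8s.io/apimachinery/pkg/util/strategicpatch"
        corev1client "k8s.io/client-go/kubernetes/typed/core/v1"
    )

    // patch sends the minimal diff between oldSvc and newSvc to the API server.
    func patch(c corev1client.CoreV1Interface, oldSvc, newSvc *v1.Service) (*v1.Service, error) {
        oldData, err := json.Marshal(oldSvc)
        if err != nil {
            return nil, err
        }
        newData, err := json.Marshal(newSvc)
        if err != nil {
            return nil, err
        }
        // Compute a strategic merge patch containing only the changed fields.
        patchBytes, err := strategicpatch.CreateTwoWayMergePatch(oldData, newData, v1.Service{})
        if err != nil {
            return nil, err
        }
        return c.Services(oldSvc.Namespace).Patch(oldSvc.Name, types.StrategicMergePatchType, patchBytes)
    }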

--- File 2 of 2: the cloud-provider service helpers (imported above as servicehelper) ---

@@ -20,7 +20,7 @@ import (
     "fmt"
     "strings"

-    "k8s.io/api/core/v1"
+    v1 "k8s.io/api/core/v1"
     utilnet "k8s.io/utils/net"
 )
@@ -31,6 +31,11 @@ in order for in-tree cloud providers to not depend on internal packages.
 const (
     defaultLoadBalancerSourceRanges = "0.0.0.0/0"

+    // LoadBalancerCleanupFinalizer is the finalizer added to load balancer
+    // services to ensure the Service resource is not fully deleted until
+    // the correlating load balancer resources are deleted.
+    LoadBalancerCleanupFinalizer = "service.kubernetes.io/load-balancer-cleanup"
 )

 // IsAllowAll checks whether the utilnet.IPNet allows traffic from 0.0.0.0/0
@@ -100,3 +105,13 @@ func NeedsHealthCheck(service *v1.Service) bool {
     }
     return RequestsOnlyLocalTraffic(service)
 }
+
+// HasLBFinalizer checks if service contains LoadBalancerCleanupFinalizer.
+func HasLBFinalizer(service *v1.Service) bool {
+    for _, finalizer := range service.ObjectMeta.Finalizers {
+        if finalizer == LoadBalancerCleanupFinalizer {
+            return true
+        }
+    }
+    return false
+}
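A short usage sketch of the two new exports in this file, as a consumer outside the package would see them (assuming the package is vendored at k8s.io/cloud-provider/service/helpers, matching the servicehelper import added to the controller above):

    package main

    import (
        "fmt"

        v1 "k8s.io/api/core/v1"
        metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
        servicehelper "k8s.io/cloud-provider/service/helpers"
    )

    func main() {
        service := &v1.Service{ObjectMeta: metav1.ObjectMeta{
            Finalizers: []string{servicehelper.LoadBalancerCleanupFinalizer},
        }}
        fmt.Println(servicehelper.HasLBFinalizer(service)) // true

        service.ObjectMeta.Finalizers = nil
        fmt.Println(servicehelper.HasLBFinalizer(service)) // false
    }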