Mirror of https://github.com/k3s-io/kubernetes.git (synced 2025-07-23 11:50:44 +00:00)
Merge pull request #109706 from alexanderConstantinescu/etp-local-svc

Avoid re-syncing LBs for ETP=local services

commit aee13fc3de
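For context: "ETP=local" refers to LoadBalancer Services whose spec.externalTrafficPolicy is Local, where the cloud load balancer health-checks the nodes itself (via the service's health check node port) rather than relying on the controller to drop NotReady nodes from the backend pool. That is what lets this commit skip load balancer re-syncs for such services when only node readiness changes. A minimal sketch of such a Service object (the name and namespace are made up):

package main

import (
	"fmt"

	v1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

func main() {
	// A LoadBalancer Service with externalTrafficPolicy: Local. The cloud LB
	// health-checks nodes itself for this kind of service, so the controller
	// does not need to re-sync it on readiness-only node changes.
	svc := &v1.Service{
		ObjectMeta: metav1.ObjectMeta{Name: "my-app", Namespace: "default"},
		Spec: v1.ServiceSpec{
			Type:                  v1.ServiceTypeLoadBalancer,
			ExternalTrafficPolicy: v1.ServiceExternalTrafficPolicyTypeLocal,
		},
	}
	fmt.Println(svc.Spec.ExternalTrafficPolicy) // Local
}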
@@ -98,6 +98,9 @@ type Controller struct {
 	nodeSyncCh chan interface{}
 	// needFullSync indicates if the nodeSyncInternal will do a full node sync on all LB services.
 	needFullSync bool
+	// lastSyncedNodes is used when reconciling node state and keeps track of the last synced set of
+	// nodes. Access to this attribute by multiple go-routines is protected by nodeSyncLock
+	lastSyncedNodes []*v1.Node
 }
 
 // New returns a new service controller to keep cloud provider service resources
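The new lastSyncedNodes field caches the node set used for the previous sync so later syncs can be skipped when nothing relevant changed. A minimal, standalone sketch of that compare-then-update pattern (simplified types; the real controller stores []*v1.Node, guards the field with nodeSyncLock, and compares node-name sets):

package main

import (
	"fmt"
	"sort"
	"sync"
)

// tracker is a simplified stand-in for the controller: it remembers the node
// names used in the previous sync, guarded by a mutex.
type tracker struct {
	mu              sync.Mutex
	lastSyncedNodes []string
}

// syncIfChanged returns true (and records the new set) only when the node set
// differs from the previously synced one.
func (t *tracker) syncIfChanged(nodes []string) bool {
	t.mu.Lock()
	defer t.mu.Unlock()
	sorted := append([]string(nil), nodes...)
	sort.Strings(sorted)
	last := append([]string(nil), t.lastSyncedNodes...)
	sort.Strings(last)
	if fmt.Sprint(sorted) == fmt.Sprint(last) {
		return false // unchanged: skip the cloud provider call
	}
	t.lastSyncedNodes = nodes
	return true
}

func main() {
	t := &tracker{}
	fmt.Println(t.syncIfChanged([]string{"node-1", "node-2"})) // true: first sync
	fmt.Println(t.syncIfChanged([]string{"node-2", "node-1"})) // false: same set
	fmt.Println(t.syncIfChanged([]string{"node-1"}))           // true: node removed
}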
@@ -132,7 +135,8 @@ func New(
 		nodeListerSynced: nodeInformer.Informer().HasSynced,
 		queue:            workqueue.NewNamedRateLimitingQueue(workqueue.NewItemExponentialFailureRateLimiter(minRetryDelay, maxRetryDelay), "service"),
 		// nodeSyncCh has a size 1 buffer. Only one pending sync signal would be cached.
 		nodeSyncCh: make(chan interface{}, 1),
+		lastSyncedNodes: []*v1.Node{},
 	}
 
 	serviceInformer.Informer().AddEventHandlerWithResyncPeriod(
@@ -176,7 +180,7 @@ func New(
 				return
 			}
 
-			if !shouldSyncNode(oldNode, curNode) {
+			if !shouldSyncUpdatedNode(oldNode, curNode) {
 				return
 			}
 
@@ -257,7 +261,7 @@ func (c *Controller) Run(ctx context.Context, workers int, controllerManagerMetr
 func (c *Controller) triggerNodeSync() {
 	c.nodeSyncLock.Lock()
 	defer c.nodeSyncLock.Unlock()
-	newHosts, err := listWithPredicate(c.nodeLister, c.getNodeConditionPredicate())
+	newHosts, err := listWithPredicates(c.nodeLister, allNodePredicates...)
 	if err != nil {
 		runtime.HandleError(fmt.Errorf("Failed to retrieve current set of nodes from node lister: %v", err))
 		// if node list cannot be retrieve, trigger full node sync to be safe.
@@ -453,7 +457,7 @@ func (c *Controller) syncLoadBalancerIfNeeded(ctx context.Context, service *v1.S
 }
 
 func (c *Controller) ensureLoadBalancer(ctx context.Context, service *v1.Service) (*v1.LoadBalancerStatus, error) {
-	nodes, err := listWithPredicate(c.nodeLister, c.getNodeConditionPredicate())
+	nodes, err := listWithPredicates(c.nodeLister, getNodePredicatesForService(service)...)
 	if err != nil {
 		return nil, err
 	}
@@ -667,6 +671,15 @@ func portEqualForLB(x, y *v1.ServicePort) bool {
 	return true
 }
 
+func serviceKeys(services []*v1.Service) sets.String {
+	ret := sets.NewString()
+	for _, service := range services {
+		key, _ := cache.MetaNamespaceKeyFunc(service)
+		ret.Insert(key)
+	}
+	return ret
+}
+
 func nodeNames(nodes []*v1.Node) sets.String {
 	ret := sets.NewString()
 	for _, node := range nodes {
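serviceKeys feeds the retry bookkeeping used further down in updateLoadBalancerHosts: when the node list cannot be read, every service is reported back for retry by its "<namespace>/<name>" key. A small sketch of that key format, using a made-up Service:

package main

import (
	"fmt"

	v1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/tools/cache"
)

func main() {
	// MetaNamespaceKeyFunc produces the "<namespace>/<name>" keys that
	// serviceKeys collects into a sets.String.
	svc := &v1.Service{ObjectMeta: metav1.ObjectMeta{Name: "web", Namespace: "prod"}}
	key, err := cache.MetaNamespaceKeyFunc(svc)
	if err != nil {
		panic(err)
	}
	fmt.Println(key) // prod/web
}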
@@ -682,58 +695,21 @@ func nodeSlicesEqualForLB(x, y []*v1.Node) bool {
 	return nodeNames(x).Equal(nodeNames(y))
 }
 
-func (c *Controller) getNodeConditionPredicate() NodeConditionPredicate {
-	return func(node *v1.Node) bool {
-		if _, hasExcludeBalancerLabel := node.Labels[v1.LabelNodeExcludeBalancers]; hasExcludeBalancerLabel {
-			return false
-		}
-
-		// Remove nodes that are about to be deleted by the cluster autoscaler.
-		for _, taint := range node.Spec.Taints {
-			if taint.Key == ToBeDeletedTaint {
-				klog.V(4).Infof("Ignoring node %v with autoscaler taint %+v", node.Name, taint)
-				return false
-			}
-		}
-
-		// If we have no info, don't accept
-		if len(node.Status.Conditions) == 0 {
-			return false
-		}
-		for _, cond := range node.Status.Conditions {
-			// We consider the node for load balancing only when its NodeReady condition status
-			// is ConditionTrue
-			if cond.Type == v1.NodeReady && cond.Status != v1.ConditionTrue {
-				klog.V(4).Infof("Ignoring node %v with %v condition status %v", node.Name, cond.Type, cond.Status)
-				return false
-			}
-		}
-		return true
-	}
-}
-
-func shouldSyncNode(oldNode, newNode *v1.Node) bool {
-	if oldNode.Spec.Unschedulable != newNode.Spec.Unschedulable {
-		return true
-	}
-
-	if !reflect.DeepEqual(oldNode.Labels, newNode.Labels) {
-		return true
-	}
-
-	return nodeReadyConditionStatus(oldNode) != nodeReadyConditionStatus(newNode)
-}
-
-func nodeReadyConditionStatus(node *v1.Node) v1.ConditionStatus {
-	for _, condition := range node.Status.Conditions {
-		if condition.Type != v1.NodeReady {
-			continue
-		}
-
-		return condition.Status
-	}
-
-	return ""
-}
+func shouldSyncUpdatedNode(oldNode, newNode *v1.Node) bool {
+	// Evaluate the individual node exclusion predicate before evaluating the
+	// compounded result of all predicates. We don't sync ETP=local services
+	// for changes on the readiness condition, hence if a node remains NotReady
+	// and a user adds the exclusion label we will need to sync as to make sure
+	// this change is reflected correctly on ETP=local services. The sync
+	// function compares lastSyncedNodes with the new (existing) set of nodes
+	// for each service, so services which are synced with the same set of nodes
+	// should be skipped internally in the sync function. This is needed as to
+	// trigger a global sync for all services and make sure no service gets
+	// skipped due to a changing node predicate.
+	if respectsPredicates(oldNode, nodeIncludedPredicate) != respectsPredicates(newNode, nodeIncludedPredicate) {
+		return true
+	}
+	return respectsPredicates(oldNode, allNodePredicates...) != respectsPredicates(newNode, allNodePredicates...)
+}
 
 // nodeSyncInternal handles updating the hosts pointed to by all load
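shouldSyncUpdatedNode checks the exclusion predicate on its own before the compound result, so that labelling a NotReady node for exclusion still triggers a sync. A standalone sketch with simplified stand-in predicates (not the package's unexported ones) showing both branches:

package main

import "fmt"

// node is a simplified stand-in for *v1.Node.
type node struct {
	ready    bool
	excluded bool
}

type predicate func(node) bool

func included(n node) bool { return !n.excluded }
func ready(n node) bool    { return n.ready }

// respects reports whether the node passes every predicate.
func respects(n node, preds ...predicate) bool {
	for _, p := range preds {
		if !p(n) {
			return false
		}
	}
	return true
}

// shouldSync mirrors the shape of shouldSyncUpdatedNode: a change in the
// exclusion predicate alone forces a sync; otherwise sync only when the
// compound result over all predicates flips.
func shouldSync(oldNode, newNode node) bool {
	if respects(oldNode, included) != respects(newNode, included) {
		return true
	}
	return respects(oldNode, included, ready) != respects(newNode, included, ready)
}

func main() {
	notReady := node{ready: false}
	notReadyExcluded := node{ready: false, excluded: true}

	// A NotReady node that gains the exclusion label triggers a sync even
	// though the compound result (false) is unchanged; this keeps ETP=local
	// services, which ignore readiness, correct.
	fmt.Println(shouldSync(notReady, notReadyExcluded)) // true

	// A node that merely stays NotReady does not.
	fmt.Println(shouldSync(notReady, notReady)) // false
}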
@@ -775,24 +751,29 @@ func (c *Controller) nodeSyncInternal(ctx context.Context, workers int) {
 		numServices-len(c.servicesToUpdate), numServices)
 }
 
-// nodeSyncService syncs the nodes for one load balancer type service
-func (c *Controller) nodeSyncService(svc *v1.Service) bool {
+// nodeSyncService syncs the nodes for one load balancer type service. The return value
+// indicates if we should retry. Hence, this functions returns false if we've updated
+// load balancers and finished doing it successfully, or didn't try to at all because
+// there's no need. This function returns true if we tried to update load balancers and
+// failed, indicating to the caller that we should try again.
+func (c *Controller) nodeSyncService(svc *v1.Service, oldNodes, newNodes []*v1.Node) bool {
+	retSuccess := false
+	retNeedRetry := true
 	if svc == nil || !wantsLoadBalancer(svc) {
-		return false
+		return retSuccess
+	}
+	newNodes = filterWithPredicates(newNodes, getNodePredicatesForService(svc)...)
+	oldNodes = filterWithPredicates(oldNodes, getNodePredicatesForService(svc)...)
+	if nodeNames(newNodes).Equal(nodeNames(oldNodes)) {
+		return retSuccess
 	}
 	klog.V(4).Infof("nodeSyncService started for service %s/%s", svc.Namespace, svc.Name)
-	hosts, err := listWithPredicate(c.nodeLister, c.getNodeConditionPredicate())
-	if err != nil {
-		runtime.HandleError(fmt.Errorf("failed to retrieve node list: %v", err))
-		return true
-	}
-
-	if err := c.lockedUpdateLoadBalancerHosts(svc, hosts); err != nil {
+	if err := c.lockedUpdateLoadBalancerHosts(svc, newNodes); err != nil {
 		runtime.HandleError(fmt.Errorf("failed to update load balancer hosts for service %s/%s: %v", svc.Namespace, svc.Name, err))
-		return true
+		return retNeedRetry
 	}
 	klog.V(4).Infof("nodeSyncService finished successfully for service %s/%s", svc.Namespace, svc.Name)
-	return false
+	return retSuccess
 }
 
 // updateLoadBalancerHosts updates all existing load balancers so that
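The early return in nodeSyncService hinges on comparing the old and new node sets after per-service filtering; the comparison is by node name and ignores ordering. A small sketch using the apimachinery sets package, with made-up node names:

package main

import (
	"fmt"

	"k8s.io/apimachinery/pkg/util/sets"
)

func main() {
	// nodeSyncService skips the cloud call when the filtered node-name sets
	// are equal; order and duplicates do not matter for sets.String.
	oldNodes := sets.NewString("node-a", "node-b")
	newNodes := sets.NewString("node-b", "node-a")
	fmt.Println(oldNodes.Equal(newNodes)) // true -> no LB update needed

	newNodes.Insert("node-c")
	fmt.Println(oldNodes.Equal(newNodes)) // false -> LB hosts must be updated
}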
@@ -801,11 +782,20 @@ func (c *Controller) nodeSyncService(svc *v1.Service) bool {
 func (c *Controller) updateLoadBalancerHosts(ctx context.Context, services []*v1.Service, workers int) (servicesToRetry sets.String) {
 	klog.V(4).Infof("Running updateLoadBalancerHosts(len(services)==%d, workers==%d)", len(services), workers)
+
+	// Include all nodes and let nodeSyncService filter and figure out if
+	// the update is relevant for the service in question.
+	nodes, err := listWithPredicates(c.nodeLister)
+	if err != nil {
+		runtime.HandleError(fmt.Errorf("failed to retrieve node list: %v", err))
+		return serviceKeys(services)
+	}
+
 	// lock for servicesToRetry
 	servicesToRetry = sets.NewString()
 	lock := sync.Mutex{}
 
 	doWork := func(piece int) {
-		if shouldRetry := c.nodeSyncService(services[piece]); !shouldRetry {
+		if shouldRetry := c.nodeSyncService(services[piece], c.lastSyncedNodes, nodes); !shouldRetry {
 			return
 		}
 		lock.Lock()
@@ -815,6 +805,7 @@ func (c *Controller) updateLoadBalancerHosts(ctx context.Context, services []*v1
 	}
 
 	workqueue.ParallelizeUntil(ctx, workers, len(services), doWork)
+	c.lastSyncedNodes = nodes
 	klog.V(4).Infof("Finished updateLoadBalancerHosts")
 	return servicesToRetry
 }
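updateLoadBalancerHosts fans the per-service work out over a bounded worker pool with workqueue.ParallelizeUntil and only afterwards records the node snapshot in lastSyncedNodes. A minimal sketch of that call shape, with made-up service keys and a faked failure:

package main

import (
	"context"
	"fmt"
	"sync"

	"k8s.io/client-go/util/workqueue"
)

func main() {
	services := []string{"default/web", "default/api", "kube-system/ingress"}

	var mu sync.Mutex
	var toRetry []string

	// Each piece handles one service; pieces run on a bounded worker pool,
	// mirroring how updateLoadBalancerHosts drives nodeSyncService.
	doWork := func(piece int) {
		if piece == 1 { // pretend this service failed and must be retried
			mu.Lock()
			defer mu.Unlock()
			toRetry = append(toRetry, services[piece])
		}
	}
	workqueue.ParallelizeUntil(context.Background(), 2, len(services), doWork)
	fmt.Println("to retry:", toRetry)
}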
@@ -990,19 +981,82 @@ func (c *Controller) patchStatus(service *v1.Service, previousStatus, newStatus
 // some set of criteria defined by the function.
 type NodeConditionPredicate func(node *v1.Node) bool
 
-// listWithPredicate gets nodes that matches predicate function.
-func listWithPredicate(nodeLister corelisters.NodeLister, predicate NodeConditionPredicate) ([]*v1.Node, error) {
+var (
+	allNodePredicates []NodeConditionPredicate = []NodeConditionPredicate{
+		nodeIncludedPredicate,
+		nodeSchedulablePredicate,
+		nodeUnTaintedPredicate,
+		nodeReadyPredicate,
+	}
+	etpLocalNodePredicates []NodeConditionPredicate = []NodeConditionPredicate{
+		nodeIncludedPredicate,
+		nodeSchedulablePredicate,
+		nodeUnTaintedPredicate,
+	}
+)
+
+func getNodePredicatesForService(service *v1.Service) []NodeConditionPredicate {
+	if service.Spec.ExternalTrafficPolicy == v1.ServiceExternalTrafficPolicyTypeLocal {
+		return etpLocalNodePredicates
+	}
+	return allNodePredicates
+}
+
+// We consider the node for load balancing only when the node is not labelled for exclusion.
+func nodeIncludedPredicate(node *v1.Node) bool {
+	_, hasExcludeBalancerLabel := node.Labels[v1.LabelNodeExcludeBalancers]
+	return !hasExcludeBalancerLabel
+}
+
+// We consider the node for load balancing only when the node is schedulable.
+func nodeSchedulablePredicate(node *v1.Node) bool {
+	return !node.Spec.Unschedulable
+}
+
+// We consider the node for load balancing only when its not tainted for deletion by the cluster autoscaler.
+func nodeUnTaintedPredicate(node *v1.Node) bool {
+	for _, taint := range node.Spec.Taints {
+		if taint.Key == ToBeDeletedTaint {
+			return false
+		}
+	}
+	return true
+}
+
+// We consider the node for load balancing only when its NodeReady condition status is ConditionTrue
+func nodeReadyPredicate(node *v1.Node) bool {
+	for _, cond := range node.Status.Conditions {
+		if cond.Type == v1.NodeReady {
+			return cond.Status == v1.ConditionTrue
+		}
+	}
+	return false
+}
+
+// listWithPredicates gets nodes that matches all predicate functions.
+func listWithPredicates(nodeLister corelisters.NodeLister, predicates ...NodeConditionPredicate) ([]*v1.Node, error) {
 	nodes, err := nodeLister.List(labels.Everything())
 	if err != nil {
 		return nil, err
 	}
-
+	return filterWithPredicates(nodes, predicates...), nil
+}
+
+func filterWithPredicates(nodes []*v1.Node, predicates ...NodeConditionPredicate) []*v1.Node {
 	var filtered []*v1.Node
 	for i := range nodes {
-		if predicate(nodes[i]) {
+		if respectsPredicates(nodes[i], predicates...) {
 			filtered = append(filtered, nodes[i])
 		}
 	}
-
-	return filtered, nil
+	return filtered
+}
+
+func respectsPredicates(node *v1.Node, predicates ...NodeConditionPredicate) bool {
+	for _, p := range predicates {
+		if !p(node) {
+			return false
+		}
+	}
+	return true
 }
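The two predicate sets differ only in nodeReadyPredicate: ETP=Local services keep NotReady nodes in the load balancer pool (the LB's own health check is expected to drop them), while all other services only get Ready nodes. A standalone sketch that re-implements some of the predicates for illustration (these are not the package's unexported functions, and the autoscaler-taint check is omitted for brevity) and filters a sample node list both ways:

package main

import (
	"fmt"

	v1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

// nodePredicate mirrors the controller's NodeConditionPredicate.
type nodePredicate func(*v1.Node) bool

// Simplified re-implementations for illustration only.
var (
	included nodePredicate = func(n *v1.Node) bool {
		_, excluded := n.Labels[v1.LabelNodeExcludeBalancers]
		return !excluded
	}
	schedulable nodePredicate = func(n *v1.Node) bool {
		return !n.Spec.Unschedulable
	}
	ready nodePredicate = func(n *v1.Node) bool {
		for _, c := range n.Status.Conditions {
			if c.Type == v1.NodeReady {
				return c.Status == v1.ConditionTrue
			}
		}
		return false
	}
)

// filter keeps the names of nodes that pass every predicate.
func filter(nodes []*v1.Node, preds ...nodePredicate) []string {
	var names []string
	for _, n := range nodes {
		keep := true
		for _, p := range preds {
			if !p(n) {
				keep = false
				break
			}
		}
		if keep {
			names = append(names, n.Name)
		}
	}
	return names
}

func makeNode(name string, readyStatus v1.ConditionStatus, excluded bool) *v1.Node {
	n := &v1.Node{
		ObjectMeta: metav1.ObjectMeta{Name: name, Labels: map[string]string{}},
		Status: v1.NodeStatus{
			Conditions: []v1.NodeCondition{{Type: v1.NodeReady, Status: readyStatus}},
		},
	}
	if excluded {
		n.Labels[v1.LabelNodeExcludeBalancers] = ""
	}
	return n
}

func main() {
	nodes := []*v1.Node{
		makeNode("ready-node", v1.ConditionTrue, false),
		makeNode("not-ready-node", v1.ConditionFalse, false),
		makeNode("excluded-node", v1.ConditionTrue, true),
	}

	// ETP=Cluster (and all other services): readiness is part of the filter.
	fmt.Println(filter(nodes, included, schedulable, ready)) // [ready-node]

	// ETP=Local services: readiness is ignored, so NotReady nodes stay.
	fmt.Println(filter(nodes, included, schedulable)) // [ready-node not-ready-node]
}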