De-duping node "update enqueuing"/sync predicates

This commit is contained in:
Alexander Constantinescu 2022-07-20 23:36:45 +02:00
parent 7bcd739851
commit 5a73ab9310
2 changed files with 1958 additions and 83 deletions

View File

@ -175,7 +175,7 @@ func New(
return return
} }
if !shouldSyncNode(oldNode, curNode) { if !shouldSyncUpdatedNode(oldNode, curNode) {
return return
} }
@ -254,7 +254,7 @@ func (c *Controller) Run(ctx context.Context, workers int) {
func (c *Controller) triggerNodeSync() { func (c *Controller) triggerNodeSync() {
c.nodeSyncLock.Lock() c.nodeSyncLock.Lock()
defer c.nodeSyncLock.Unlock() defer c.nodeSyncLock.Unlock()
newHosts, err := listWithPredicate(c.nodeLister, c.getNodeConditionPredicate()) newHosts, err := listWithPredicates(c.nodeLister, allNodePredicates...)
if err != nil { if err != nil {
runtime.HandleError(fmt.Errorf("Failed to retrieve current set of nodes from node lister: %v", err)) runtime.HandleError(fmt.Errorf("Failed to retrieve current set of nodes from node lister: %v", err))
// if node list cannot be retrieve, trigger full node sync to be safe. // if node list cannot be retrieve, trigger full node sync to be safe.
@ -450,7 +450,7 @@ func (c *Controller) syncLoadBalancerIfNeeded(ctx context.Context, service *v1.S
} }
func (c *Controller) ensureLoadBalancer(ctx context.Context, service *v1.Service) (*v1.LoadBalancerStatus, error) { func (c *Controller) ensureLoadBalancer(ctx context.Context, service *v1.Service) (*v1.LoadBalancerStatus, error) {
nodes, err := listWithPredicate(c.nodeLister, c.getNodeConditionPredicate()) nodes, err := listWithPredicates(c.nodeLister, allNodePredicates...)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -679,58 +679,8 @@ func nodeSlicesEqualForLB(x, y []*v1.Node) bool {
return nodeNames(x).Equal(nodeNames(y)) return nodeNames(x).Equal(nodeNames(y))
} }
func (c *Controller) getNodeConditionPredicate() NodeConditionPredicate { func shouldSyncUpdatedNode(oldNode, newNode *v1.Node) bool {
return func(node *v1.Node) bool { return respectsPredicates(oldNode, allNodePredicates...) != respectsPredicates(newNode, allNodePredicates...)
if _, hasExcludeBalancerLabel := node.Labels[v1.LabelNodeExcludeBalancers]; hasExcludeBalancerLabel {
return false
}
// Remove nodes that are about to be deleted by the cluster autoscaler.
for _, taint := range node.Spec.Taints {
if taint.Key == ToBeDeletedTaint {
klog.V(4).Infof("Ignoring node %v with autoscaler taint %+v", node.Name, taint)
return false
}
}
// If we have no info, don't accept
if len(node.Status.Conditions) == 0 {
return false
}
for _, cond := range node.Status.Conditions {
// We consider the node for load balancing only when its NodeReady condition status
// is ConditionTrue
if cond.Type == v1.NodeReady && cond.Status != v1.ConditionTrue {
klog.V(4).Infof("Ignoring node %v with %v condition status %v", node.Name, cond.Type, cond.Status)
return false
}
}
return true
}
}
func shouldSyncNode(oldNode, newNode *v1.Node) bool {
if oldNode.Spec.Unschedulable != newNode.Spec.Unschedulable {
return true
}
if !reflect.DeepEqual(oldNode.Labels, newNode.Labels) {
return true
}
return nodeReadyConditionStatus(oldNode) != nodeReadyConditionStatus(newNode)
}
func nodeReadyConditionStatus(node *v1.Node) v1.ConditionStatus {
for _, condition := range node.Status.Conditions {
if condition.Type != v1.NodeReady {
continue
}
return condition.Status
}
return ""
} }
// nodeSyncInternal handles updating the hosts pointed to by all load // nodeSyncInternal handles updating the hosts pointed to by all load
@ -778,7 +728,7 @@ func (c *Controller) nodeSyncService(svc *v1.Service) bool {
return false return false
} }
klog.V(4).Infof("nodeSyncService started for service %s/%s", svc.Namespace, svc.Name) klog.V(4).Infof("nodeSyncService started for service %s/%s", svc.Namespace, svc.Name)
hosts, err := listWithPredicate(c.nodeLister, c.getNodeConditionPredicate()) hosts, err := listWithPredicates(c.nodeLister, allNodePredicates...)
if err != nil { if err != nil {
runtime.HandleError(fmt.Errorf("failed to retrieve node list: %v", err)) runtime.HandleError(fmt.Errorf("failed to retrieve node list: %v", err))
return true return true
@ -987,19 +937,70 @@ func (c *Controller) patchStatus(service *v1.Service, previousStatus, newStatus
// some set of criteria defined by the function. // some set of criteria defined by the function.
type NodeConditionPredicate func(node *v1.Node) bool type NodeConditionPredicate func(node *v1.Node) bool
// listWithPredicate gets nodes that matches predicate function. var (
func listWithPredicate(nodeLister corelisters.NodeLister, predicate NodeConditionPredicate) ([]*v1.Node, error) { allNodePredicates []NodeConditionPredicate = []NodeConditionPredicate{
nodeIncludedPredicate,
nodeSchedulablePredicate,
nodeUnTaintedPredicate,
nodeReadyPredicate,
}
)
// We consider the node for load balancing only when the node is not labelled for exclusion.
func nodeIncludedPredicate(node *v1.Node) bool {
_, hasExcludeBalancerLabel := node.Labels[v1.LabelNodeExcludeBalancers]
return !hasExcludeBalancerLabel
}
// We consider the node for load balancing only when the node is schedulable.
func nodeSchedulablePredicate(node *v1.Node) bool {
return !node.Spec.Unschedulable
}
// We consider the node for load balancing only when its not tainted for deletion by the cluster autoscaler.
func nodeUnTaintedPredicate(node *v1.Node) bool {
for _, taint := range node.Spec.Taints {
if taint.Key == ToBeDeletedTaint {
return false
}
}
return true
}
// We consider the node for load balancing only when its NodeReady condition status is ConditionTrue
func nodeReadyPredicate(node *v1.Node) bool {
for _, cond := range node.Status.Conditions {
if cond.Type == v1.NodeReady {
return cond.Status == v1.ConditionTrue
}
}
return false
}
// listWithPredicate gets nodes that matches all predicate functions.
func listWithPredicates(nodeLister corelisters.NodeLister, predicates ...NodeConditionPredicate) ([]*v1.Node, error) {
nodes, err := nodeLister.List(labels.Everything()) nodes, err := nodeLister.List(labels.Everything())
if err != nil { if err != nil {
return nil, err return nil, err
} }
return filterWithPredicates(nodes, predicates...), nil
}
func filterWithPredicates(nodes []*v1.Node, predicates ...NodeConditionPredicate) []*v1.Node {
var filtered []*v1.Node var filtered []*v1.Node
for i := range nodes { for i := range nodes {
if predicate(nodes[i]) { if respectsPredicates(nodes[i], predicates...) {
filtered = append(filtered, nodes[i]) filtered = append(filtered, nodes[i])
} }
} }
return filtered
return filtered, nil }
func respectsPredicates(node *v1.Node, predicates ...NodeConditionPredicate) bool {
for _, p := range predicates {
if !p(node) {
return false
}
}
return true
} }