Avoid re-syncing LBs for ETP=local svc
This removes the CCM's load balancer sync for ETP=local services when nodes are only transitioning their readiness state.
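For illustration only, here is a standalone sketch of the idea behind the change; it is not code from this commit, and the simplified node/predicate types are hypothetical stand-ins for *v1.Node and NodeConditionPredicate. With externalTrafficPolicy: Local the readiness predicate is dropped from the node filter, so a Ready/NotReady flip leaves the filtered host set unchanged and the load balancer update is skipped, while services with the default policy are still re-synced.

package main

import "fmt"

// Simplified stand-in for a node; the controller uses *v1.Node.
type node struct {
	name     string
	ready    bool
	excluded bool
}

type predicate func(node) bool

func included(n node) bool { return !n.excluded }
func ready(n node) bool    { return n.ready }

// Mirrors the spirit of getNodePredicatesForService from the diff:
// ETP=local services drop the readiness predicate, everything else keeps it.
func predicatesFor(etpLocal bool) []predicate {
	if etpLocal {
		return []predicate{included}
	}
	return []predicate{included, ready}
}

// filter returns the names of nodes that pass all predicates.
func filter(nodes []node, preds []predicate) []string {
	var out []string
	for _, n := range nodes {
		ok := true
		for _, p := range preds {
			if !p(n) {
				ok = false
				break
			}
		}
		if ok {
			out = append(out, n.name)
		}
	}
	return out
}

func main() {
	before := []node{{name: "node0", ready: true}, {name: "node1", ready: true}}
	after := []node{{name: "node0", ready: true}, {name: "node1", ready: false}} // node1 flips NotReady

	// ETP=local: the candidate set is identical before and after, so the sync is skipped.
	fmt.Println(filter(before, predicatesFor(true)), filter(after, predicatesFor(true)))
	// Default policy: the set shrinks, so the load balancer is re-synced.
	fmt.Println(filter(before, predicatesFor(false)), filter(after, predicatesFor(false)))
}

Running the sketch prints the same host set twice for the ETP=local case and a shrunken set for the default case, which is exactly the comparison nodeSyncService performs below.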
@@ -97,6 +97,9 @@ type Controller struct {
 	nodeSyncCh chan interface{}
 	// needFullSync indicates if the nodeSyncInternal will do a full node sync on all LB services.
 	needFullSync bool
+	// lastSyncedNodes is used when reconciling node state and keeps track of the last synced set of
+	// nodes. Access to this attribute by multiple go-routines is protected by nodeSyncLock
+	lastSyncedNodes []*v1.Node
 }

 // New returns a new service controller to keep cloud provider service resources
@@ -131,7 +134,8 @@ func New(
 		nodeListerSynced: nodeInformer.Informer().HasSynced,
 		queue:            workqueue.NewNamedRateLimitingQueue(workqueue.NewItemExponentialFailureRateLimiter(minRetryDelay, maxRetryDelay), "service"),
 		// nodeSyncCh has a size 1 buffer. Only one pending sync signal would be cached.
 		nodeSyncCh: make(chan interface{}, 1),
+		lastSyncedNodes: []*v1.Node{},
 	}

 	serviceInformer.Informer().AddEventHandlerWithResyncPeriod(
@@ -450,7 +454,7 @@ func (c *Controller) syncLoadBalancerIfNeeded(ctx context.Context, service *v1.S
 }

 func (c *Controller) ensureLoadBalancer(ctx context.Context, service *v1.Service) (*v1.LoadBalancerStatus, error) {
-	nodes, err := listWithPredicates(c.nodeLister, allNodePredicates...)
+	nodes, err := listWithPredicates(c.nodeLister, getNodePredicatesForService(service)...)
 	if err != nil {
 		return nil, err
 	}
@@ -664,6 +668,15 @@ func portEqualForLB(x, y *v1.ServicePort) bool {
 	return true
 }

+func serviceKeys(services []*v1.Service) sets.String {
+	ret := sets.NewString()
+	for _, service := range services {
+		key, _ := cache.MetaNamespaceKeyFunc(service)
+		ret.Insert(key)
+	}
+	return ret
+}
+
 func nodeNames(nodes []*v1.Node) sets.String {
 	ret := sets.NewString()
 	for _, node := range nodes {
@@ -680,6 +693,19 @@ func nodeSlicesEqualForLB(x, y []*v1.Node) bool {
 }

 func shouldSyncUpdatedNode(oldNode, newNode *v1.Node) bool {
+	// Evaluate the individual node exclusion predicate before evaluating the
+	// compounded result of all predicates. We don't sync ETP=local services
+	// for changes on the readiness condition, hence if a node remains NotReady
+	// and a user adds the exclusion label we will need to sync as to make sure
+	// this change is reflected correctly on ETP=local services. The sync
+	// function compares lastSyncedNodes with the new (existing) set of nodes
+	// for each service, so services which are synced with the same set of nodes
+	// should be skipped internally in the sync function. This is needed as to
+	// trigger a global sync for all services and make sure no service gets
+	// skipped due to a changing node predicate.
+	if respectsPredicates(oldNode, nodeIncludedPredicate) != respectsPredicates(newNode, nodeIncludedPredicate) {
+		return true
+	}
 	return respectsPredicates(oldNode, allNodePredicates...) != respectsPredicates(newNode, allNodePredicates...)
 }

@@ -722,24 +748,29 @@ func (c *Controller) nodeSyncInternal(ctx context.Context, workers int) {
 			numServices-len(c.servicesToUpdate), numServices)
 	}

-// nodeSyncService syncs the nodes for one load balancer type service
-func (c *Controller) nodeSyncService(svc *v1.Service) bool {
+// nodeSyncService syncs the nodes for one load balancer type service. The return value
+// indicates if we should retry. Hence, this functions returns false if we've updated
+// load balancers and finished doing it successfully, or didn't try to at all because
+// there's no need. This function returns true if we tried to update load balancers and
+// failed, indicating to the caller that we should try again.
+func (c *Controller) nodeSyncService(svc *v1.Service, oldNodes, newNodes []*v1.Node) bool {
+	retSuccess := false
+	retNeedRetry := true
 	if svc == nil || !wantsLoadBalancer(svc) {
-		return false
+		return retSuccess
+	}
+	newNodes = filterWithPredicates(newNodes, getNodePredicatesForService(svc)...)
+	oldNodes = filterWithPredicates(oldNodes, getNodePredicatesForService(svc)...)
+	if nodeNames(newNodes).Equal(nodeNames(oldNodes)) {
+		return retSuccess
 	}
 	klog.V(4).Infof("nodeSyncService started for service %s/%s", svc.Namespace, svc.Name)
-	hosts, err := listWithPredicates(c.nodeLister, allNodePredicates...)
-	if err != nil {
-		runtime.HandleError(fmt.Errorf("failed to retrieve node list: %v", err))
-		return true
-	}
-
-	if err := c.lockedUpdateLoadBalancerHosts(svc, hosts); err != nil {
+	if err := c.lockedUpdateLoadBalancerHosts(svc, newNodes); err != nil {
 		runtime.HandleError(fmt.Errorf("failed to update load balancer hosts for service %s/%s: %v", svc.Namespace, svc.Name, err))
-		return true
+		return retNeedRetry
 	}
 	klog.V(4).Infof("nodeSyncService finished successfully for service %s/%s", svc.Namespace, svc.Name)
-	return false
+	return retSuccess
 }

 // updateLoadBalancerHosts updates all existing load balancers so that
@@ -748,11 +779,20 @@ func (c *Controller) nodeSyncService(svc *v1.Service) bool {
 func (c *Controller) updateLoadBalancerHosts(ctx context.Context, services []*v1.Service, workers int) (servicesToRetry sets.String) {
 	klog.V(4).Infof("Running updateLoadBalancerHosts(len(services)==%d, workers==%d)", len(services), workers)

+	// Include all nodes and let nodeSyncService filter and figure out if
+	// the update is relevant for the service in question.
+	nodes, err := listWithPredicates(c.nodeLister)
+	if err != nil {
+		runtime.HandleError(fmt.Errorf("failed to retrieve node list: %v", err))
+		return serviceKeys(services)
+	}
+
 	// lock for servicesToRetry
 	servicesToRetry = sets.NewString()
 	lock := sync.Mutex{}
+
 	doWork := func(piece int) {
-		if shouldRetry := c.nodeSyncService(services[piece]); !shouldRetry {
+		if shouldRetry := c.nodeSyncService(services[piece], c.lastSyncedNodes, nodes); !shouldRetry {
 			return
 		}
 		lock.Lock()
@@ -762,6 +802,7 @@ func (c *Controller) updateLoadBalancerHosts(ctx context.Context, services []*v1
 	}

 	workqueue.ParallelizeUntil(ctx, workers, len(services), doWork)
+	c.lastSyncedNodes = nodes
 	klog.V(4).Infof("Finished updateLoadBalancerHosts")
 	return servicesToRetry
 }
@@ -944,8 +985,20 @@ var (
 		nodeUnTaintedPredicate,
 		nodeReadyPredicate,
 	}
+	etpLocalNodePredicates []NodeConditionPredicate = []NodeConditionPredicate{
+		nodeIncludedPredicate,
+		nodeSchedulablePredicate,
+		nodeUnTaintedPredicate,
+	}
 )

+func getNodePredicatesForService(service *v1.Service) []NodeConditionPredicate {
+	if service.Spec.ExternalTrafficPolicy == v1.ServiceExternalTrafficPolicyTypeLocal {
+		return etpLocalNodePredicates
+	}
+	return allNodePredicates
+}
+
 // We consider the node for load balancing only when the node is not labelled for exclusion.
 func nodeIncludedPredicate(node *v1.Node) bool {
 	_, hasExcludeBalancerLabel := node.Labels[v1.LabelNodeExcludeBalancers]
@@ -44,6 +44,7 @@ import (
 	"k8s.io/client-go/util/workqueue"
 	fakecloud "k8s.io/cloud-provider/fake"
 	servicehelper "k8s.io/cloud-provider/service/helpers"
+
 	utilpointer "k8s.io/utils/pointer"

 	"github.com/stretchr/testify/assert"
@@ -64,6 +65,20 @@ func newService(name string, uid types.UID, serviceType v1.ServiceType) *v1.Serv
 	}
 }

+func newETPLocalService(name string, serviceType v1.ServiceType) *v1.Service {
+	return &v1.Service{
+		ObjectMeta: metav1.ObjectMeta{
+			Name:      name,
+			Namespace: "default",
+			UID:       "777",
+		},
+		Spec: v1.ServiceSpec{
+			Type:                  serviceType,
+			ExternalTrafficPolicy: v1.ServiceExternalTrafficPolicyTypeLocal,
+		},
+	}
+}
+
 // Wrap newService so that you don't have to call default arguments again and again.
 func defaultExternalService() *v1.Service {
 	return newService("external-balancer", types.UID("123"), v1.ServiceTypeLoadBalancer)
@@ -96,6 +111,7 @@ func newController() (*Controller, *fakecloud.Cloud, *fake.Clientset) {
 		nodeListerSynced: nodeInformer.Informer().HasSynced,
 		queue:            workqueue.NewNamedRateLimitingQueue(workqueue.NewItemExponentialFailureRateLimiter(minRetryDelay, maxRetryDelay), "service"),
 		nodeSyncCh:       make(chan interface{}, 1),
+		lastSyncedNodes:  []*v1.Node{},
 	}

 	balancer, _ := cloud.LoadBalancer()
@@ -559,6 +575,193 @@ func TestUpdateNodesInExternalLoadBalancer(t *testing.T) {
 	}
 }

+func TestNodeChangesForExternalTrafficPolicyLocalServices(t *testing.T) {
+	node1 := &v1.Node{ObjectMeta: metav1.ObjectMeta{Name: "node0"}, Status: v1.NodeStatus{Conditions: []v1.NodeCondition{{Type: v1.NodeReady, Status: v1.ConditionTrue}}}}
+	node2 := &v1.Node{ObjectMeta: metav1.ObjectMeta{Name: "node1"}, Status: v1.NodeStatus{Conditions: []v1.NodeCondition{{Type: v1.NodeReady, Status: v1.ConditionTrue}}}}
+	node2NotReady := &v1.Node{ObjectMeta: metav1.ObjectMeta{Name: "node1"}, Status: v1.NodeStatus{Conditions: []v1.NodeCondition{{Type: v1.NodeReady, Status: v1.ConditionFalse}}}}
+	node2Tainted := &v1.Node{ObjectMeta: metav1.ObjectMeta{Name: "node1"}, Spec: v1.NodeSpec{Taints: []v1.Taint{{Key: ToBeDeletedTaint}}}, Status: v1.NodeStatus{Conditions: []v1.NodeCondition{{Type: v1.NodeReady, Status: v1.ConditionFalse}}}}
+	node2Unschedulable := &v1.Node{ObjectMeta: metav1.ObjectMeta{Name: "node1"}, Spec: v1.NodeSpec{Unschedulable: true}, Status: v1.NodeStatus{Conditions: []v1.NodeCondition{{Type: v1.NodeReady, Status: v1.ConditionTrue}}}}
+	node2SpuriousChange := &v1.Node{ObjectMeta: metav1.ObjectMeta{Name: "node1"}, Status: v1.NodeStatus{Phase: v1.NodeTerminated, Conditions: []v1.NodeCondition{{Type: v1.NodeReady, Status: v1.ConditionTrue}}}}
+	node2Exclude := &v1.Node{ObjectMeta: metav1.ObjectMeta{Name: "node1", Labels: map[string]string{v1.LabelNodeExcludeBalancers: ""}}, Status: v1.NodeStatus{Conditions: []v1.NodeCondition{{Type: v1.NodeReady, Status: v1.ConditionTrue}}}}
+	node3 := &v1.Node{ObjectMeta: metav1.ObjectMeta{Name: "node73"}, Status: v1.NodeStatus{Conditions: []v1.NodeCondition{{Type: v1.NodeReady, Status: v1.ConditionTrue}}}}
+
+	type stateChanges struct {
+		nodes       []*v1.Node
+		syncCallErr bool
+	}
+
+	etpLocalservice1 := newETPLocalService("s0", v1.ServiceTypeLoadBalancer)
+	etpLocalservice2 := newETPLocalService("s1", v1.ServiceTypeLoadBalancer)
+	service3 := defaultExternalService()
+
+	services := []*v1.Service{etpLocalservice1, etpLocalservice2, service3}
+
+	for _, tc := range []struct {
+		desc                string
+		expectedUpdateCalls []fakecloud.UpdateBalancerCall
+		stateChanges        []stateChanges
+		initialState        []*v1.Node
+	}{
+		{
+			desc:         "No node changes",
+			initialState: []*v1.Node{node1, node2, node3},
+			stateChanges: []stateChanges{
+				{
+					nodes: []*v1.Node{node1, node2, node3},
+				},
+			},
+			expectedUpdateCalls: []fakecloud.UpdateBalancerCall{},
+		},
+		{
+			desc:         "1 new node gets added",
+			initialState: []*v1.Node{node1, node2},
+			stateChanges: []stateChanges{
+				{
+					nodes: []*v1.Node{node1, node2, node3},
+				},
+			},
+			expectedUpdateCalls: []fakecloud.UpdateBalancerCall{
+				{Service: etpLocalservice1, Hosts: []*v1.Node{node1, node2, node3}},
+				{Service: etpLocalservice2, Hosts: []*v1.Node{node1, node2, node3}},
+				{Service: service3, Hosts: []*v1.Node{node1, node2, node3}},
+			},
+		},
+		{
+			desc:         "1 new node gets added - with retries",
+			initialState: []*v1.Node{node1, node2},
+			stateChanges: []stateChanges{
+				{
+					nodes:       []*v1.Node{node1, node2, node3},
+					syncCallErr: true,
+				},
+				{
+					nodes: []*v1.Node{node1, node2, node3},
+				},
+			},
+			expectedUpdateCalls: []fakecloud.UpdateBalancerCall{
+				{Service: etpLocalservice1, Hosts: []*v1.Node{node1, node2, node3}},
+				{Service: etpLocalservice2, Hosts: []*v1.Node{node1, node2, node3}},
+				{Service: service3, Hosts: []*v1.Node{node1, node2, node3}},
+			},
+		},
+		{
+			desc:         "1 node goes NotReady",
+			initialState: []*v1.Node{node1, node2, node3},
+			stateChanges: []stateChanges{
+				{
+					nodes: []*v1.Node{node1, node2NotReady, node3},
+				},
+			},
+			expectedUpdateCalls: []fakecloud.UpdateBalancerCall{
+				{Service: service3, Hosts: []*v1.Node{node1, node3}},
+			},
+		},
+		{
+			desc:         "1 node gets Tainted",
+			initialState: []*v1.Node{node1, node2, node3},
+			stateChanges: []stateChanges{
+				{
+					nodes: []*v1.Node{node1, node2Tainted, node3},
+				},
+			},
+			expectedUpdateCalls: []fakecloud.UpdateBalancerCall{
+				{Service: etpLocalservice1, Hosts: []*v1.Node{node1, node3}},
+				{Service: etpLocalservice2, Hosts: []*v1.Node{node1, node3}},
+				{Service: service3, Hosts: []*v1.Node{node1, node3}},
+			},
+		},
+		{
+			desc:         "1 node goes unschedulable",
+			initialState: []*v1.Node{node1, node2, node3},
+			stateChanges: []stateChanges{
+				{
+					nodes: []*v1.Node{node1, node2Unschedulable, node3},
+				},
+			},
+			expectedUpdateCalls: []fakecloud.UpdateBalancerCall{
+				{Service: etpLocalservice1, Hosts: []*v1.Node{node1, node3}},
+				{Service: etpLocalservice2, Hosts: []*v1.Node{node1, node3}},
+				{Service: service3, Hosts: []*v1.Node{node1, node3}},
+			},
+		},
+		{
+			desc:         "1 node goes Ready",
+			initialState: []*v1.Node{node1, node2NotReady, node3},
+			stateChanges: []stateChanges{
+				{
+					nodes: []*v1.Node{node1, node2, node3},
+				},
+			},
+			expectedUpdateCalls: []fakecloud.UpdateBalancerCall{
+				{Service: service3, Hosts: []*v1.Node{node1, node2, node3}},
+			},
+		},
+		{
+			desc:         "1 node get excluded",
+			initialState: []*v1.Node{node1, node2, node3},
+			stateChanges: []stateChanges{
+				{
+					nodes: []*v1.Node{node1, node2Exclude, node3},
+				},
+			},
+			expectedUpdateCalls: []fakecloud.UpdateBalancerCall{
+				{Service: etpLocalservice1, Hosts: []*v1.Node{node1, node3}},
+				{Service: etpLocalservice2, Hosts: []*v1.Node{node1, node3}},
+				{Service: service3, Hosts: []*v1.Node{node1, node3}},
+			},
+		},
+		{
+			desc:         "1 old node gets deleted",
+			initialState: []*v1.Node{node1, node2, node3},
+			stateChanges: []stateChanges{
+				{
+					nodes: []*v1.Node{node1, node2},
+				},
+			},
+			expectedUpdateCalls: []fakecloud.UpdateBalancerCall{
+				{Service: etpLocalservice1, Hosts: []*v1.Node{node1, node2}},
+				{Service: etpLocalservice2, Hosts: []*v1.Node{node1, node2}},
+				{Service: service3, Hosts: []*v1.Node{node1, node2}},
+			},
+		},
+		{
+			desc:         "1 spurious node update",
+			initialState: []*v1.Node{node1, node2, node3},
+			stateChanges: []stateChanges{
+				{
+					nodes: []*v1.Node{node1, node2SpuriousChange, node3},
+				},
+			},
+			expectedUpdateCalls: []fakecloud.UpdateBalancerCall{},
+		},
+	} {
+		t.Run(tc.desc, func(t *testing.T) {
+			controller, cloud, _ := newController()
+			ctx, cancel := context.WithCancel(context.Background())
+			defer cancel()
+
+			controller.lastSyncedNodes = tc.initialState
+
+			for _, state := range tc.stateChanges {
+				setupState := func() {
+					controller.nodeLister = newFakeNodeLister(nil, state.nodes...)
+					if state.syncCallErr {
+						cloud.Err = fmt.Errorf("error please")
+					}
+				}
+				cleanupState := func() {
+					cloud.Err = nil
+				}
+				setupState()
+				controller.updateLoadBalancerHosts(ctx, services, 3)
+				cleanupState()
+			}
+
+			compareUpdateCalls(t, tc.expectedUpdateCalls, cloud.UpdateCalls)
+		})
+	}
+}
+
 func TestNodeChangesInExternalLoadBalancer(t *testing.T) {
 	node1 := &v1.Node{ObjectMeta: metav1.ObjectMeta{Name: "node0"}, Status: v1.NodeStatus{Conditions: []v1.NodeCondition{{Type: v1.NodeReady, Status: v1.ConditionTrue}}}}
 	node2 := &v1.Node{ObjectMeta: metav1.ObjectMeta{Name: "node1"}, Status: v1.NodeStatus{Conditions: []v1.NodeCondition{{Type: v1.NodeReady, Status: v1.ConditionTrue}}}}
@@ -2782,7 +2985,7 @@ func Test_shouldSyncUpdatedNode_compoundedPredicates(t *testing.T) {
 					},
 				},
 			},
-			shouldSync: false,
+			shouldSync: true,
 		},
 		{
 			name: "unschedable T, excluded F->T",
@@ -2822,7 +3025,7 @@ func Test_shouldSyncUpdatedNode_compoundedPredicates(t *testing.T) {
 					},
 				},
 			},
-			shouldSync: false,
+			shouldSync: true,
 		},
 		{
 			name: "unschedable T, ready F->T",
@@ -3034,7 +3237,7 @@ func Test_shouldSyncUpdatedNode_compoundedPredicates(t *testing.T) {
 					},
 				},
 			},
-			shouldSync: false,
+			shouldSync: true,
 		},
 		{
 			name: "tainted T, excluded T->F",
@@ -3082,7 +3285,7 @@ func Test_shouldSyncUpdatedNode_compoundedPredicates(t *testing.T) {
 					},
 				},
 			},
-			shouldSync: false,
+			shouldSync: true,
 		},
 		{
 			name: "tainted T, ready F->T",
@@ -3604,7 +3807,7 @@ func Test_shouldSyncUpdatedNode_compoundedPredicates(t *testing.T) {
 					},
 				},
 			},
-			shouldSync: false,
+			shouldSync: true,
 		},
 		{
 			name: "ready F, excluded T->F",
@@ -3638,7 +3841,7 @@ func Test_shouldSyncUpdatedNode_compoundedPredicates(t *testing.T) {
 					},
 				},
 			},
-			shouldSync: false,
+			shouldSync: true,
 		},
 	}
 	for _, testcase := range testcases {