Optimize and parallelize LoadBalancer Host update

Minhan Xia 2021-01-27 13:57:27 -08:00
parent 0ced9d2854
commit 4af1f4b30b
3 changed files with 376 additions and 57 deletions


@@ -72,7 +72,6 @@ type serviceCache struct {
type Controller struct {
cloud cloudprovider.Interface
knownHosts []*v1.Node
knownHostsLock sync.Mutex
servicesToUpdate []*v1.Service
kubeClient clientset.Interface
clusterName string
@@ -87,6 +86,14 @@ type Controller struct {
nodeListerSynced cache.InformerSynced
// services that need to be synced
queue workqueue.RateLimitingInterface
// nodeSyncLock ensures that only one instance of triggerNodeSync runs at a time
// and protects the internal state (needFullSync) of the node sync logic
nodeSyncLock sync.Mutex
// nodeSyncCh triggers nodeSyncLoop to run
nodeSyncCh chan interface{}
// needFullSync indicates whether nodeSyncInternal will do a full node sync on all LB services.
needFullSync bool
}
// New returns a new service controller to keep cloud provider service resources
@@ -122,6 +129,8 @@ func New(
nodeLister: nodeInformer.Lister(),
nodeListerSynced: nodeInformer.Informer().HasSynced,
queue: workqueue.NewNamedRateLimitingQueue(workqueue.NewItemExponentialFailureRateLimiter(minRetryDelay, maxRetryDelay), "service"),
// nodeSyncCh has a buffer of size 1; only one pending sync signal is cached at a time.
nodeSyncCh: make(chan interface{}, 1),
}
serviceInformer.Informer().AddEventHandlerWithResyncPeriod(
@@ -152,7 +161,7 @@ func New(
nodeInformer.Informer().AddEventHandlerWithResyncPeriod(
cache.ResourceEventHandlerFuncs{
AddFunc: func(cur interface{}) {
s.nodeSyncLoop()
s.triggerNodeSync()
},
UpdateFunc: func(old, cur interface{}) {
oldNode, ok := old.(*v1.Node)
@@ -169,10 +178,10 @@ func New(
return
}
s.nodeSyncLoop()
s.triggerNodeSync()
},
DeleteFunc: func(old interface{}) {
s.nodeSyncLoop()
s.triggerNodeSync()
},
},
time.Duration(0),
@@ -185,6 +194,15 @@ func New(
return s, nil
}
// needFullSyncAndUnmark returns the value of needFullSync and resets the field to false.
func (s *Controller) needFullSyncAndUnmark() bool {
s.nodeSyncLock.Lock()
defer s.nodeSyncLock.Unlock()
ret := s.needFullSync
s.needFullSync = false
return ret
}
// obj could be an *v1.Service, or a DeletionFinalStateUnknown marker item.
func (s *Controller) enqueueService(obj interface{}) {
key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(obj)
@@ -220,11 +238,42 @@ func (s *Controller) Run(stopCh <-chan struct{}, workers int) {
go wait.Until(s.worker, time.Second, stopCh)
}
go wait.Until(s.nodeSyncLoop, nodeSyncPeriod, stopCh)
go s.nodeSyncLoop(workers)
go wait.Until(s.triggerNodeSync, nodeSyncPeriod, stopCh)
<-stopCh
}
// triggerNodeSync triggers a nodeSync asynchronously
func (s *Controller) triggerNodeSync() {
s.nodeSyncLock.Lock()
defer s.nodeSyncLock.Unlock()
newHosts, err := listWithPredicate(s.nodeLister, s.getNodeConditionPredicate())
if err != nil {
runtime.HandleError(fmt.Errorf("Failed to retrieve current set of nodes from node lister: %v", err))
// If the node list cannot be retrieved, trigger a full node sync to be safe.
s.needFullSync = true
} else if !nodeSlicesEqualForLB(newHosts, s.knownHosts) {
// The last known state is recorded in knownHosts, and the latest node
// list is retrieved for each LB update. This prevents a stale set of
// nodes from being used to update load balancers when there are many
// load balancers in the cluster. nodeSyncInternal keeps being triggered
// until all load balancers are updated to the new state.
klog.V(2).Infof("Node changes detected, triggering a full node sync on all loadbalancer services")
s.needFullSync = true
s.knownHosts = newHosts
}
select {
case s.nodeSyncCh <- struct{}{}:
klog.V(4).Info("Triggering nodeSync")
return
default:
klog.V(4).Info("A pending nodeSync is already in queue")
return
}
}
// worker runs a worker thread that just dequeues items, processes them, and marks them done.
// It enforces that the syncHandler is never invoked concurrently with the same key.
func (s *Controller) worker() {
@@ -232,6 +281,16 @@ func (s *Controller) worker() {
}
}
// nodeSyncLoop waits for nodeSync signals on nodeSyncCh and triggers a node sync for each one
func (s *Controller) nodeSyncLoop(workers int) {
klog.V(4).Info("nodeSyncLoop Started")
for range s.nodeSyncCh {
klog.V(4).Info("nodeSync has been triggered")
s.nodeSyncInternal(workers)
}
klog.V(2).Info("s.nodeSyncCh is closed. Exiting nodeSyncLoop")
}
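
For readers tracing the new control flow: triggerNodeSync, the size-1 buffered nodeSyncCh, and nodeSyncLoop together form a coalescing trigger, where any burst of node events collapses into at most one pending sync signal consumed by a single loop. The following is a minimal, self-contained sketch of that pattern; syncTrigger, Trigger, and Loop are illustrative names and are not part of this patch.

package main

import (
	"fmt"
	"time"
)

// syncTrigger coalesces many Trigger calls into at most one pending signal,
// mirroring how nodeSyncCh is used above. All names here are illustrative.
type syncTrigger struct {
	ch chan struct{} // buffered with size 1
}

func newSyncTrigger() *syncTrigger {
	return &syncTrigger{ch: make(chan struct{}, 1)}
}

// Trigger never blocks: if a signal is already pending, the new one is dropped.
func (t *syncTrigger) Trigger() {
	select {
	case t.ch <- struct{}{}:
	default: // a sync is already queued; coalesce
	}
}

// Loop runs sync once per received signal until the channel is closed.
func (t *syncTrigger) Loop(sync func()) {
	for range t.ch {
		sync()
	}
}

func main() {
	trigger := newSyncTrigger()
	go trigger.Loop(func() { fmt.Println("syncing") })
	trigger.Trigger()
	trigger.Trigger() // coalesced with the first if it has not been consumed yet
	time.Sleep(100 * time.Millisecond)
}
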
func (s *Controller) processNextWorkItem() bool {
key, quit := s.queue.Get()
if quit {
@@ -652,68 +711,78 @@ func nodeReadyConditionStatus(node *v1.Node) v1.ConditionStatus {
return ""
}
// nodeSyncLoop handles updating the hosts pointed to by all load
// nodeSyncInternal handles updating the hosts pointed to by all load
// balancers whenever the set of nodes in the cluster changes.
func (s *Controller) nodeSyncLoop() {
s.knownHostsLock.Lock()
defer s.knownHostsLock.Unlock()
func (s *Controller) nodeSyncInternal(workers int) {
startTime := time.Now()
defer func() {
latency := time.Now().Sub(startTime).Seconds()
klog.V(4).Infof("It took %v seconds to finish nodeSyncLoop", latency)
klog.V(4).Infof("It took %v seconds to finish nodeSyncInternal", latency)
nodeSyncLatency.Observe(latency)
}()
newHosts, err := listWithPredicate(s.nodeLister, s.getNodeConditionPredicate())
if err != nil {
runtime.HandleError(fmt.Errorf("Failed to retrieve current set of nodes from node lister: %v", err))
return
}
if nodeSlicesEqualForLB(newHosts, s.knownHosts) {
if !s.needFullSyncAndUnmark() {
// The set of nodes in the cluster hasn't changed, but we can retry
// updating any services that we failed to update last time around.
s.servicesToUpdate = s.updateLoadBalancerHosts(s.servicesToUpdate, newHosts)
s.servicesToUpdate = s.updateLoadBalancerHosts(s.servicesToUpdate, workers)
return
}
klog.V(2).Infof("Detected change in list of current cluster nodes. New node set: %v",
nodeNames(newHosts))
klog.V(2).Infof("Syncing backends for all LB services.")
// Try updating all services, and save the ones that fail to try again next
// round.
s.servicesToUpdate = s.cache.allServices()
numServices := len(s.servicesToUpdate)
s.servicesToUpdate = s.updateLoadBalancerHosts(s.servicesToUpdate, newHosts)
s.servicesToUpdate = s.updateLoadBalancerHosts(s.servicesToUpdate, workers)
klog.V(2).Infof("Successfully updated %d out of %d load balancers to direct traffic to the updated set of nodes",
numServices-len(s.servicesToUpdate), numServices)
}
s.knownHosts = newHosts
// nodeSyncService syncs the nodes for one load balancer type service
func (s *Controller) nodeSyncService(svc *v1.Service) bool {
if svc == nil || !wantsLoadBalancer(svc) {
return false
}
klog.V(4).Infof("nodeSyncService started for service %s/%s", svc.Namespace, svc.Name)
hosts, err := listWithPredicate(s.nodeLister, s.getNodeConditionPredicate())
if err != nil {
runtime.HandleError(fmt.Errorf("failed to retrieve node list: %v", err))
return true
}
if err := s.lockedUpdateLoadBalancerHosts(svc, hosts); err != nil {
runtime.HandleError(fmt.Errorf("failed to update load balancer hosts for service %s/%s: %v", svc.Namespace, svc.Name, err))
return true
}
klog.V(4).Infof("nodeSyncService finished successfully for service %s/%s", svc.Namespace, svc.Name)
return false
}
// updateLoadBalancerHosts updates all existing load balancers so that
// they will match the list of hosts provided.
// they will match the latest list of nodes, using the given number of workers.
// Returns the list of services that couldn't be updated.
func (s *Controller) updateLoadBalancerHosts(services []*v1.Service, hosts []*v1.Node) (servicesToRetry []*v1.Service) {
for _, service := range services {
func() {
if service == nil {
return
}
if err := s.lockedUpdateLoadBalancerHosts(service, hosts); err != nil {
runtime.HandleError(fmt.Errorf("failed to update load balancer hosts for service %s/%s: %v", service.Namespace, service.Name, err))
servicesToRetry = append(servicesToRetry, service)
}
}()
func (s *Controller) updateLoadBalancerHosts(services []*v1.Service, workers int) (servicesToRetry []*v1.Service) {
klog.V(4).Infof("Running updateLoadBalancerHosts(len(services)==%d, workers==%d)", len(services), workers)
// lock protects servicesToRetry, which may be appended to from multiple workers
lock := sync.Mutex{}
doWork := func(piece int) {
if shouldRetry := s.nodeSyncService(services[piece]); !shouldRetry {
return
}
lock.Lock()
defer lock.Unlock()
servicesToRetry = append(servicesToRetry, services[piece])
}
workqueue.ParallelizeUntil(context.TODO(), workers, len(services), doWork)
klog.V(4).Infof("Finished updateLoadBalancerHosts")
return servicesToRetry
}
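
The fan-out above relies on workqueue.ParallelizeUntil from k8s.io/client-go/util/workqueue, which runs doWork(piece) for every piece in [0, pieces) across the requested number of worker goroutines and returns once all pieces are processed or the context is cancelled. Below is a minimal standalone sketch of the same retry-collection pattern; syncOne is an illustrative stand-in for nodeSyncService and is not part of this patch.

package main

import (
	"context"
	"fmt"
	"sync"

	"k8s.io/client-go/util/workqueue"
)

// syncOne is a stand-in for nodeSyncService: it reports whether the item
// needs to be retried. The name and logic are illustrative only.
func syncOne(item string) (shouldRetry bool) {
	return item == "flaky"
}

func main() {
	items := []string{"a", "b", "flaky", "c"}

	var mu sync.Mutex // protects retries, which is appended to from multiple workers
	var retries []string

	doWork := func(piece int) {
		if !syncOne(items[piece]) {
			return
		}
		mu.Lock()
		defer mu.Unlock()
		retries = append(retries, items[piece])
	}

	// Run doWork for every index in [0, len(items)) using 2 workers.
	workqueue.ParallelizeUntil(context.TODO(), 2, len(items), doWork)

	fmt.Println("needs retry:", retries) // prints: needs retry: [flaky]
}
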
// Updates the load balancer of a service, assuming we hold the mutex
// associated with the service.
func (s *Controller) lockedUpdateLoadBalancerHosts(service *v1.Service, hosts []*v1.Node) error {
if !wantsLoadBalancer(service) {
return nil
}
startTime := time.Now()
defer func() {
latency := time.Now().Sub(startTime).Seconds()
@@ -721,6 +790,7 @@ func (s *Controller) lockedUpdateLoadBalancerHosts(service *v1.Service, hosts []
updateLoadBalancerHostLatency.Observe(latency)
}()
klog.V(2).Infof("Updating backends for load balancer %s/%s with node set: %v", service.Namespace, service.Name, nodeNames(hosts))
// This operation doesn't normally take very long (and happens pretty often), so we only record the final event
err := s.balancer.UpdateLoadBalancer(context.TODO(), s.clusterName, service, hosts)
if err == nil {


@@ -29,6 +29,7 @@ import (
v1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/types"
@@ -72,11 +73,9 @@ func newController() (*Controller, *fakecloud.Cloud, *fake.Clientset) {
cloud.Region = region
kubeClient := fake.NewSimpleClientset()
informerFactory := informers.NewSharedInformerFactory(kubeClient, 0)
serviceInformer := informerFactory.Core().V1().Services()
nodeInformer := informerFactory.Core().V1().Nodes()
broadcaster := record.NewBroadcaster()
broadcaster.StartStructuredLogging(0)
broadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: kubeClient.CoreV1().Events("")})
@@ -90,9 +89,10 @@ func newController() (*Controller, *fakecloud.Cloud, *fake.Clientset) {
cache: &serviceCache{serviceMap: make(map[string]*cachedService)},
eventBroadcaster: broadcaster,
eventRecorder: recorder,
nodeLister: nodeInformer.Lister(),
nodeLister: newFakeNodeLister(nil),
nodeListerSynced: nodeInformer.Informer().HasSynced,
queue: workqueue.NewNamedRateLimitingQueue(workqueue.NewItemExponentialFailureRateLimiter(minRetryDelay, maxRetryDelay), "service"),
nodeSyncCh: make(chan interface{}, 1),
}
balancer, _ := cloud.LoadBalancer()
@@ -416,38 +416,43 @@ func TestSyncLoadBalancerIfNeeded(t *testing.T) {
// TODO: Finish converting and update comments
func TestUpdateNodesInExternalLoadBalancer(t *testing.T) {
nodes := []*v1.Node{
{ObjectMeta: metav1.ObjectMeta{Name: "node0"}},
{ObjectMeta: metav1.ObjectMeta{Name: "node1"}},
{ObjectMeta: metav1.ObjectMeta{Name: "node73"}},
{ObjectMeta: metav1.ObjectMeta{Name: "node0"}, Status: v1.NodeStatus{Conditions: []v1.NodeCondition{{Type: v1.NodeReady, Status: v1.ConditionTrue}}}},
{ObjectMeta: metav1.ObjectMeta{Name: "node1"}, Status: v1.NodeStatus{Conditions: []v1.NodeCondition{{Type: v1.NodeReady, Status: v1.ConditionTrue}}}},
{ObjectMeta: metav1.ObjectMeta{Name: "node73"}, Status: v1.NodeStatus{Conditions: []v1.NodeCondition{{Type: v1.NodeReady, Status: v1.ConditionTrue}}}},
}
table := []struct {
desc string
services []*v1.Service
expectedUpdateCalls []fakecloud.UpdateBalancerCall
workers int
}{
{
// No services present: no calls should be made.
desc: "No services present: no calls should be made.",
services: []*v1.Service{},
expectedUpdateCalls: nil,
workers: 1,
},
{
// Services do not have external load balancers: no calls should be made.
desc: "Services do not have external load balancers: no calls should be made.",
services: []*v1.Service{
newService("s0", "111", v1.ServiceTypeClusterIP),
newService("s1", "222", v1.ServiceTypeNodePort),
},
expectedUpdateCalls: nil,
workers: 2,
},
{
// Services does have an external load balancer: one call should be made.
desc: "Services does have an external load balancer: one call should be made.",
services: []*v1.Service{
newService("s0", "333", v1.ServiceTypeLoadBalancer),
},
expectedUpdateCalls: []fakecloud.UpdateBalancerCall{
{Service: newService("s0", "333", v1.ServiceTypeLoadBalancer), Hosts: nodes},
},
workers: 3,
},
{
// Three services have an external load balancer: three calls.
desc: "Three services have an external load balancer: three calls.",
services: []*v1.Service{
newService("s0", "444", v1.ServiceTypeLoadBalancer),
newService("s1", "555", v1.ServiceTypeLoadBalancer),
@@ -458,9 +463,10 @@ func TestUpdateNodesInExternalLoadBalancer(t *testing.T) {
{Service: newService("s1", "555", v1.ServiceTypeLoadBalancer), Hosts: nodes},
{Service: newService("s2", "666", v1.ServiceTypeLoadBalancer), Hosts: nodes},
},
workers: 4,
},
{
// Two services have an external load balancer and two don't: two calls.
desc: "Two services have an external load balancer and two don't: two calls.",
services: []*v1.Service{
newService("s0", "777", v1.ServiceTypeNodePort),
newService("s1", "888", v1.ServiceTypeLoadBalancer),
@@ -471,9 +477,10 @@ func TestUpdateNodesInExternalLoadBalancer(t *testing.T) {
{Service: newService("s1", "888", v1.ServiceTypeLoadBalancer), Hosts: nodes},
{Service: newService("s3", "999", v1.ServiceTypeLoadBalancer), Hosts: nodes},
},
workers: 5,
},
{
// One service has an external load balancer and one is nil: one call.
desc: "One service has an external load balancer and one is nil: one call.",
services: []*v1.Service{
newService("s0", "234", v1.ServiceTypeLoadBalancer),
nil,
@@ -481,21 +488,172 @@ func TestUpdateNodesInExternalLoadBalancer(t *testing.T) {
expectedUpdateCalls: []fakecloud.UpdateBalancerCall{
{Service: newService("s0", "234", v1.ServiceTypeLoadBalancer), Hosts: nodes},
},
workers: 6,
},
{
desc: "Four services have external load balancer with only 2 workers",
services: []*v1.Service{
newService("s0", "777", v1.ServiceTypeLoadBalancer),
newService("s1", "888", v1.ServiceTypeLoadBalancer),
newService("s3", "999", v1.ServiceTypeLoadBalancer),
newService("s4", "123", v1.ServiceTypeLoadBalancer),
},
expectedUpdateCalls: []fakecloud.UpdateBalancerCall{
{Service: newService("s0", "777", v1.ServiceTypeLoadBalancer), Hosts: nodes},
{Service: newService("s1", "888", v1.ServiceTypeLoadBalancer), Hosts: nodes},
{Service: newService("s3", "999", v1.ServiceTypeLoadBalancer), Hosts: nodes},
{Service: newService("s4", "123", v1.ServiceTypeLoadBalancer), Hosts: nodes},
},
workers: 2,
},
}
for _, item := range table {
controller, cloud, _ := newController()
t.Run(item.desc, func(t *testing.T) {
controller, cloud, _ := newController()
controller.nodeLister = newFakeNodeLister(nil, nodes...)
var services []*v1.Service
services = append(services, item.services...)
if servicesToRetry := controller.updateLoadBalancerHosts(item.services, item.workers); servicesToRetry != nil {
t.Errorf("for case %q, unexpected servicesToRetry: %v", item.desc, servicesToRetry)
}
compareUpdateCalls(t, item.expectedUpdateCalls, cloud.UpdateCalls)
})
}
}
if servicesToRetry := controller.updateLoadBalancerHosts(services, nodes); servicesToRetry != nil {
t.Errorf("unexpected servicesToRetry: %v", servicesToRetry)
func TestNodeChangesInExternalLoadBalancer(t *testing.T) {
node1 := &v1.Node{ObjectMeta: metav1.ObjectMeta{Name: "node0"}, Status: v1.NodeStatus{Conditions: []v1.NodeCondition{{Type: v1.NodeReady, Status: v1.ConditionTrue}}}}
node2 := &v1.Node{ObjectMeta: metav1.ObjectMeta{Name: "node1"}, Status: v1.NodeStatus{Conditions: []v1.NodeCondition{{Type: v1.NodeReady, Status: v1.ConditionTrue}}}}
node3 := &v1.Node{ObjectMeta: metav1.ObjectMeta{Name: "node73"}, Status: v1.NodeStatus{Conditions: []v1.NodeCondition{{Type: v1.NodeReady, Status: v1.ConditionTrue}}}}
node4 := &v1.Node{ObjectMeta: metav1.ObjectMeta{Name: "node4"}, Status: v1.NodeStatus{Conditions: []v1.NodeCondition{{Type: v1.NodeReady, Status: v1.ConditionTrue}}}}
services := []*v1.Service{
newService("s0", "777", v1.ServiceTypeLoadBalancer),
newService("s1", "888", v1.ServiceTypeLoadBalancer),
newService("s3", "999", v1.ServiceTypeLoadBalancer),
newService("s4", "123", v1.ServiceTypeLoadBalancer),
}
controller, cloud, _ := newController()
for _, tc := range []struct {
desc string
nodes []*v1.Node
expectedUpdateCalls []fakecloud.UpdateBalancerCall
worker int
nodeListerErr error
expectedRetryServices []*v1.Service
}{
{
desc: "only 1 node",
nodes: []*v1.Node{node1},
expectedUpdateCalls: []fakecloud.UpdateBalancerCall{
{Service: newService("s0", "777", v1.ServiceTypeLoadBalancer), Hosts: []*v1.Node{node1}},
{Service: newService("s1", "888", v1.ServiceTypeLoadBalancer), Hosts: []*v1.Node{node1}},
{Service: newService("s3", "999", v1.ServiceTypeLoadBalancer), Hosts: []*v1.Node{node1}},
{Service: newService("s4", "123", v1.ServiceTypeLoadBalancer), Hosts: []*v1.Node{node1}},
},
worker: 3,
nodeListerErr: nil,
expectedRetryServices: []*v1.Service{},
},
{
desc: "2 nodes",
nodes: []*v1.Node{node1, node2},
expectedUpdateCalls: []fakecloud.UpdateBalancerCall{
{Service: newService("s0", "777", v1.ServiceTypeLoadBalancer), Hosts: []*v1.Node{node1, node2}},
{Service: newService("s1", "888", v1.ServiceTypeLoadBalancer), Hosts: []*v1.Node{node1, node2}},
{Service: newService("s3", "999", v1.ServiceTypeLoadBalancer), Hosts: []*v1.Node{node1, node2}},
{Service: newService("s4", "123", v1.ServiceTypeLoadBalancer), Hosts: []*v1.Node{node1, node2}},
},
worker: 1,
nodeListerErr: nil,
expectedRetryServices: []*v1.Service{},
},
{
desc: "4 nodes",
nodes: []*v1.Node{node1, node2, node3, node4},
expectedUpdateCalls: []fakecloud.UpdateBalancerCall{
{Service: newService("s0", "777", v1.ServiceTypeLoadBalancer), Hosts: []*v1.Node{node1, node2, node3, node4}},
{Service: newService("s1", "888", v1.ServiceTypeLoadBalancer), Hosts: []*v1.Node{node1, node2, node3, node4}},
{Service: newService("s3", "999", v1.ServiceTypeLoadBalancer), Hosts: []*v1.Node{node1, node2, node3, node4}},
{Service: newService("s4", "123", v1.ServiceTypeLoadBalancer), Hosts: []*v1.Node{node1, node2, node3, node4}},
},
worker: 3,
nodeListerErr: nil,
expectedRetryServices: []*v1.Service{},
},
{
desc: "error occur during sync",
nodes: []*v1.Node{node1, node2, node3, node4},
expectedUpdateCalls: []fakecloud.UpdateBalancerCall{},
worker: 3,
nodeListerErr: fmt.Errorf("random error"),
expectedRetryServices: services,
},
{
desc: "error occur during sync with 1 workers",
nodes: []*v1.Node{node1, node2, node3, node4},
expectedUpdateCalls: []fakecloud.UpdateBalancerCall{},
worker: 1,
nodeListerErr: fmt.Errorf("random error"),
expectedRetryServices: services,
},
} {
t.Run(tc.desc, func(t *testing.T) {
controller.nodeLister = newFakeNodeLister(tc.nodeListerErr, tc.nodes...)
servicesToRetry := controller.updateLoadBalancerHosts(services, tc.worker)
compareServiceList(t, tc.expectedRetryServices, servicesToRetry)
compareUpdateCalls(t, tc.expectedUpdateCalls, cloud.UpdateCalls)
cloud.UpdateCalls = []fakecloud.UpdateBalancerCall{}
})
}
}
// compareServiceList checks that the left and right inputs contain the same services, regardless of order.
func compareServiceList(t *testing.T, left, right []*v1.Service) {
if len(left) != len(right) {
t.Errorf("expect len(left) == len(right), but got %v != %v", len(left), len(right))
}
mismatch := false
for _, l := range left {
found := false
for _, r := range right {
if reflect.DeepEqual(l, r) {
found = true
}
}
if !reflect.DeepEqual(item.expectedUpdateCalls, cloud.UpdateCalls) {
t.Errorf("expected update calls mismatch, expected %+v, got %+v", item.expectedUpdateCalls, cloud.UpdateCalls)
if !found {
mismatch = true
break
}
}
if mismatch {
t.Errorf("expected service list to match, expected %+v, got %+v", left, right)
}
}
// compareUpdateCalls checks that the same update calls were made in the left and right inputs, regardless of order.
func compareUpdateCalls(t *testing.T, left, right []fakecloud.UpdateBalancerCall) {
if len(left) != len(right) {
t.Errorf("expect len(left) == len(right), but got %v != %v", len(left), len(right))
}
mismatch := false
for _, l := range left {
found := false
for _, r := range right {
if reflect.DeepEqual(l, r) {
found = true
}
}
if !found {
mismatch = true
break
}
}
if mismatch {
t.Errorf("expected update calls to match, expected %+v, got %+v", left, right)
}
}
func TestProcessServiceCreateOrUpdate(t *testing.T) {
@@ -1138,7 +1296,7 @@ func TestServiceCache(t *testing.T) {
}
}
//Test a utility functions as it's not easy to unit test nodeSyncLoop directly
// Test a utility function, as it's not easy to unit test nodeSyncInternal directly
func TestNodeSlicesEqualForLB(t *testing.T) {
numNodes := 10
nArray := make([]*v1.Node, numNodes)
@@ -1577,3 +1735,92 @@ func Test_shouldSyncNode(t *testing.T) {
})
}
}
func TestTriggerNodeSync(t *testing.T) {
controller, _, _ := newController()
tryReadFromChannel(t, controller.nodeSyncCh, false)
controller.triggerNodeSync()
tryReadFromChannel(t, controller.nodeSyncCh, true)
tryReadFromChannel(t, controller.nodeSyncCh, false)
tryReadFromChannel(t, controller.nodeSyncCh, false)
tryReadFromChannel(t, controller.nodeSyncCh, false)
controller.triggerNodeSync()
controller.triggerNodeSync()
controller.triggerNodeSync()
tryReadFromChannel(t, controller.nodeSyncCh, true)
tryReadFromChannel(t, controller.nodeSyncCh, false)
tryReadFromChannel(t, controller.nodeSyncCh, false)
tryReadFromChannel(t, controller.nodeSyncCh, false)
}
func TestMarkAndUnmarkFullSync(t *testing.T) {
controller, _, _ := newController()
if controller.needFullSync != false {
t.Errorf("expect controller.needFullSync to be false, but got true")
}
ret := controller.needFullSyncAndUnmark()
if ret != false {
t.Errorf("expect ret == false, but got true")
}
ret = controller.needFullSyncAndUnmark()
if ret != false {
t.Errorf("expect ret == false, but got true")
}
controller.needFullSync = true
ret = controller.needFullSyncAndUnmark()
if ret != true {
t.Errorf("expect ret == true, but got false")
}
ret = controller.needFullSyncAndUnmark()
if ret != false {
t.Errorf("expect ret == false, but got true")
}
}
func tryReadFromChannel(t *testing.T, ch chan interface{}, expectValue bool) {
select {
case _, ok := <-ch:
if !ok {
t.Errorf("The channel is closed")
}
if !expectValue {
t.Errorf("Does not expect value from the channel, but got a value")
}
default:
if expectValue {
t.Errorf("Expect value from the channel, but got none")
}
}
}
type fakeNodeLister struct {
cache []*v1.Node
err error
}
func newFakeNodeLister(err error, nodes ...*v1.Node) *fakeNodeLister {
ret := &fakeNodeLister{}
ret.cache = nodes
ret.err = err
return ret
}
// List lists all Nodes in the indexer.
// Objects returned here must be treated as read-only.
func (l *fakeNodeLister) List(selector labels.Selector) (ret []*v1.Node, err error) {
return l.cache, l.err
}
// Get retrieves the Node from the index for a given name.
// Objects returned here must be treated as read-only.
func (l *fakeNodeLister) Get(name string) (*v1.Node, error) {
for _, node := range l.cache {
if node.Name == name {
return node, nil
}
}
return nil, nil
}


@@ -224,6 +224,8 @@ func (f *Cloud) EnsureLoadBalancer(ctx context.Context, clusterName string, serv
// It adds an entry "update" into the internal method call record.
func (f *Cloud) UpdateLoadBalancer(ctx context.Context, clusterName string, service *v1.Service, nodes []*v1.Node) error {
f.addCall("update")
f.Lock.Lock()
defer f.Lock.Unlock()
f.UpdateCalls = append(f.UpdateCalls, UpdateBalancerCall{service, nodes})
return f.Err
}