Merge pull request #121091 from alexanderConstantinescu/kccm-service-sync-fix

KCCM: fix transient node addition + removal while syncing load balancers
commit 77d72a30e1
Authored by Kubernetes Prow Robot on 2023-11-14 17:17:10 +01:00, committed by GitHub
3 changed files with 256 additions and 89 deletions
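
In short: the controller previously kept a single lastSyncedNodes slice, which was only safe because the node queue is serviced by one worker. The fix keys the last-synced node set by service ("namespace/name") and guards it with a mutex, so the service sync path (ensureLoadBalancer) and the node sync path (nodeSyncService) can both record and read it without a slow node sync clobbering fresher state. A condensed sketch of that bookkeeping, with the Controller type trimmed to the fields the fix touches (the authoritative definitions are in the first file's hunks below):

package sketch

import (
	"sync"

	v1 "k8s.io/api/core/v1"
	"k8s.io/client-go/tools/cache"
)

// Controller is reduced here to the per-service bookkeeping added by the PR.
type Controller struct {
	lastSyncedNodes     map[string][]*v1.Node // keyed by service key, e.g. "default/service1"
	lastSyncedNodesLock sync.Mutex            // taken by both the service and node sync paths
}

func (c *Controller) storeLastSyncedNodes(svc *v1.Service, nodes []*v1.Node) {
	c.lastSyncedNodesLock.Lock()
	defer c.lastSyncedNodesLock.Unlock()
	key, _ := cache.MetaNamespaceKeyFunc(svc)
	c.lastSyncedNodes[key] = nodes
}

func (c *Controller) getLastSyncedNodes(svc *v1.Service) []*v1.Node {
	c.lastSyncedNodesLock.Lock()
	defer c.lastSyncedNodesLock.Unlock()
	key, _ := cache.MetaNamespaceKeyFunc(svc)
	return c.lastSyncedNodes[key]
}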


@@ -93,10 +93,10 @@ type Controller struct {
 	serviceQueue workqueue.RateLimitingInterface
 	nodeQueue    workqueue.RateLimitingInterface
 	// lastSyncedNodes is used when reconciling node state and keeps track of
-	// the last synced set of nodes. This field is concurrently safe because the
-	// nodeQueue is serviced by only one go-routine, so node events are not
-	// processed concurrently.
-	lastSyncedNodes []*v1.Node
+	// the last synced set of nodes per service key. This is accessed from the
+	// service and node controllers, hence it is protected by a lock.
+	lastSyncedNodes     map[string][]*v1.Node
+	lastSyncedNodesLock sync.Mutex
 }

 // New returns a new service controller to keep cloud provider service resources
@@ -124,7 +124,7 @@ func New(
 		nodeListerSynced: nodeInformer.Informer().HasSynced,
 		serviceQueue:     workqueue.NewNamedRateLimitingQueue(workqueue.NewItemExponentialFailureRateLimiter(minRetryDelay, maxRetryDelay), "service"),
 		nodeQueue:        workqueue.NewNamedRateLimitingQueue(workqueue.NewItemExponentialFailureRateLimiter(minRetryDelay, maxRetryDelay), "node"),
-		lastSyncedNodes:  []*v1.Node{},
+		lastSyncedNodes:  make(map[string][]*v1.Node),
 	}

 	serviceInformer.Informer().AddEventHandlerWithResyncPeriod(
@@ -439,16 +439,32 @@ func (c *Controller) ensureLoadBalancer(ctx context.Context, service *v1.Service
 	if err != nil {
 		return nil, err
 	}

 	// If there are no available nodes for LoadBalancer service, make a EventTypeWarning event for it.
 	if len(nodes) == 0 {
 		c.eventRecorder.Event(service, v1.EventTypeWarning, "UnAvailableLoadBalancer", "There are no available nodes for LoadBalancer")
 	}

+	c.storeLastSyncedNodes(service, nodes)
 	// - Only one protocol supported per service
 	// - Not all cloud providers support all protocols and the next step is expected to return
 	// an error for unsupported protocols
-	return c.balancer.EnsureLoadBalancer(ctx, c.clusterName, service, nodes)
+	status, err := c.balancer.EnsureLoadBalancer(ctx, c.clusterName, service, nodes)
+	if err != nil {
+		return nil, err
+	}
+	return status, nil
+}
+
+func (c *Controller) storeLastSyncedNodes(svc *v1.Service, nodes []*v1.Node) {
+	c.lastSyncedNodesLock.Lock()
+	defer c.lastSyncedNodesLock.Unlock()
+	key, _ := cache.MetaNamespaceKeyFunc(svc)
+	c.lastSyncedNodes[key] = nodes
+}
+
+func (c *Controller) getLastSyncedNodes(svc *v1.Service) []*v1.Node {
+	c.lastSyncedNodesLock.Lock()
+	defer c.lastSyncedNodesLock.Unlock()
+	key, _ := cache.MetaNamespaceKeyFunc(svc)
+	return c.lastSyncedNodes[key]
 }

 // ListKeys implements the interface required by DeltaFIFO to list the keys we
@@ -662,15 +678,6 @@ func portEqualForLB(x, y *v1.ServicePort) bool {
 	return true
 }

-func serviceKeys(services []*v1.Service) sets.String {
-	ret := sets.NewString()
-	for _, service := range services {
-		key, _ := cache.MetaNamespaceKeyFunc(service)
-		ret.Insert(key)
-	}
-	return ret
-}
-
 func nodeNames(nodes []*v1.Node) sets.String {
 	ret := sets.NewString()
 	for _, node := range nodes {
@@ -737,19 +744,29 @@ func (c *Controller) syncNodes(ctx context.Context, workers int) sets.String {
 // load balancers and finished doing it successfully, or didn't try to at all because
 // there's no need. This function returns true if we tried to update load balancers and
 // failed, indicating to the caller that we should try again.
-func (c *Controller) nodeSyncService(svc *v1.Service, oldNodes, newNodes []*v1.Node) bool {
+func (c *Controller) nodeSyncService(svc *v1.Service) bool {
 	const retSuccess = false
 	const retNeedRetry = true
 	if svc == nil || !wantsLoadBalancer(svc) {
 		return retSuccess
 	}
+	newNodes, err := listWithPredicates(c.nodeLister)
+	if err != nil {
+		runtime.HandleError(fmt.Errorf("failed to retrieve node list: %v", err))
+		nodeSyncErrorCount.Inc()
+		return retNeedRetry
+	}
 	newNodes = filterWithPredicates(newNodes, getNodePredicatesForService(svc)...)
-	oldNodes = filterWithPredicates(oldNodes, getNodePredicatesForService(svc)...)
+	oldNodes := filterWithPredicates(c.getLastSyncedNodes(svc), getNodePredicatesForService(svc)...)
+	// Store last synced nodes without actually determining if we successfully
+	// synced them or not. Failed node syncs are passed off to retries in the
+	// service queue, so no need to wait. If we don't store it now, we risk
+	// re-syncing all LBs twice, one from another sync in the node sync and
+	// from the service sync
+	c.storeLastSyncedNodes(svc, newNodes)
 	if nodesSufficientlyEqual(oldNodes, newNodes) {
 		return retSuccess
 	}
 	klog.V(4).Infof("nodeSyncService started for service %s/%s", svc.Namespace, svc.Name)
 	if err := c.lockedUpdateLoadBalancerHosts(svc, newNodes); err != nil {
 		runtime.HandleError(fmt.Errorf("failed to update load balancer hosts for service %s/%s: %v", svc.Namespace, svc.Name, err))
@@ -794,20 +811,12 @@ func nodesSufficientlyEqual(oldNodes, newNodes []*v1.Node) bool {
 func (c *Controller) updateLoadBalancerHosts(ctx context.Context, services []*v1.Service, workers int) (servicesToRetry sets.String) {
 	klog.V(4).Infof("Running updateLoadBalancerHosts(len(services)==%d, workers==%d)", len(services), workers)

-	// Include all nodes and let nodeSyncService filter and figure out if
-	// the update is relevant for the service in question.
-	nodes, err := listWithPredicates(c.nodeLister)
-	if err != nil {
-		runtime.HandleError(fmt.Errorf("failed to retrieve node list: %v", err))
-		return serviceKeys(services)
-	}
-
 	// lock for servicesToRetry
 	servicesToRetry = sets.NewString()
 	lock := sync.Mutex{}
 	doWork := func(piece int) {
-		if shouldRetry := c.nodeSyncService(services[piece], c.lastSyncedNodes, nodes); !shouldRetry {
+		if shouldRetry := c.nodeSyncService(services[piece]); !shouldRetry {
 			return
 		}
 		lock.Lock()
@@ -816,7 +825,6 @@ func (c *Controller) updateLoadBalancerHosts(ctx context.Context, services []*v1
 		servicesToRetry.Insert(key)
 	}
 	workqueue.ParallelizeUntil(ctx, workers, len(services), doWork)
-	c.lastSyncedNodes = nodes
 	klog.V(4).Infof("Finished updateLoadBalancerHosts")
 	return servicesToRetry
 }
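
That is the whole of the controller change: updateLoadBalancerHosts no longer lists nodes once for every service, and each nodeSyncService call now resolves the node set itself, compares it against that service's last-synced snapshot, and records the new snapshot before attempting the cloud update (failed updates are retried through the service queue instead of by re-syncing every load balancer). A condensed sketch of the reworked function, with logging and the error-count metric trimmed; the exact code is in the hunks above, and the hunks that follow are the controller's unit tests:

// Sketch only: returns true when the caller should retry this service.
func (c *Controller) nodeSyncService(svc *v1.Service) bool {
	if svc == nil || !wantsLoadBalancer(svc) {
		return false
	}
	newNodes, err := listWithPredicates(c.nodeLister)
	if err != nil {
		return true // could not list nodes; retry
	}
	newNodes = filterWithPredicates(newNodes, getNodePredicatesForService(svc)...)
	oldNodes := filterWithPredicates(c.getLastSyncedNodes(svc), getNodePredicatesForService(svc)...)
	// Record the snapshot up front: a failed update is retried via the service
	// queue, and recording it late would let the node sync and the service sync
	// both re-sync the same load balancer.
	c.storeLastSyncedNodes(svc, newNodes)
	if nodesSufficientlyEqual(oldNodes, newNodes) {
		return false
	}
	return c.lockedUpdateLoadBalancerHosts(svc, newNodes) != nil
}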


@@ -23,6 +23,7 @@ import (
 	"reflect"
 	"sort"
 	"strings"
+	"sync"
 	"testing"
 	"time"
@@ -146,13 +147,16 @@ func defaultExternalService() *v1.Service {
 	return newService("external-balancer", v1.ServiceTypeLoadBalancer)
 }

-func alwaysReady() bool { return true }
-
-func newController() (*Controller, *fakecloud.Cloud, *fake.Clientset) {
+// newController creates a new service controller. Callers have the option to
+// specify `stopChan` for test cases which might require running the
+// node/service informers and reacting to resource events. Callers can also
+// specify `objects` which represent the initial state of objects, used to
+// populate the client set / informer cache at start-up.
+func newController(stopCh <-chan struct{}, objects ...runtime.Object) (*Controller, *fakecloud.Cloud, *fake.Clientset) {
 	cloud := &fakecloud.Cloud{}
 	cloud.Region = region

-	kubeClient := fake.NewSimpleClientset()
+	kubeClient := fake.NewSimpleClientset(objects...)
 	informerFactory := informers.NewSharedInformerFactory(kubeClient, 0)
 	serviceInformer := informerFactory.Core().V1().Services()
 	nodeInformer := informerFactory.Core().V1().Nodes()
@@ -162,26 +166,36 @@ func newController() (*Controller, *fakecloud.Cloud, *fake.Clientset) {
 	recorder := broadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "service-controller"})

 	controller := &Controller{
-		cloud:            cloud,
-		kubeClient:       kubeClient,
-		clusterName:      "test-cluster",
-		cache:            &serviceCache{serviceMap: make(map[string]*cachedService)},
-		eventBroadcaster: broadcaster,
-		eventRecorder:    recorder,
-		nodeLister:       newFakeNodeLister(nil),
-		nodeListerSynced: nodeInformer.Informer().HasSynced,
-		serviceQueue:     workqueue.NewNamedRateLimitingQueue(workqueue.NewItemExponentialFailureRateLimiter(minRetryDelay, maxRetryDelay), "service"),
-		nodeQueue:        workqueue.NewNamedRateLimitingQueue(workqueue.NewItemExponentialFailureRateLimiter(minRetryDelay, maxRetryDelay), "node"),
-		lastSyncedNodes:  []*v1.Node{},
+		cloud:               cloud,
+		kubeClient:          kubeClient,
+		clusterName:         "test-cluster",
+		eventBroadcaster:    broadcaster,
+		eventRecorder:       recorder,
+		serviceLister:       serviceInformer.Lister(),
+		serviceListerSynced: serviceInformer.Informer().HasSynced,
+		nodeLister:          nodeInformer.Lister(),
+		nodeListerSynced:    nodeInformer.Informer().HasSynced,
+		serviceQueue:        workqueue.NewNamedRateLimitingQueue(workqueue.NewItemExponentialFailureRateLimiter(minRetryDelay, maxRetryDelay), "service"),
+		nodeQueue:           workqueue.NewNamedRateLimitingQueue(workqueue.NewItemExponentialFailureRateLimiter(minRetryDelay, maxRetryDelay), "node"),
+		lastSyncedNodes:     make(map[string][]*v1.Node),
 	}

+	informerFactory.Start(stopCh)
+	informerFactory.WaitForCacheSync(stopCh)
+
+	serviceMap := make(map[string]*cachedService)
+	services, _ := serviceInformer.Lister().List(labels.Everything())
+	for _, service := range services {
+		serviceMap[service.Name] = &cachedService{
+			state: service,
+		}
+	}
+	controller.cache = &serviceCache{serviceMap: serviceMap}
+
 	balancer, _ := cloud.LoadBalancer()
 	controller.balancer = balancer

-	controller.serviceLister = serviceInformer.Lister()
-
-	controller.nodeListerSynced = alwaysReady
-	controller.serviceListerSynced = alwaysReady
-
 	controller.eventRecorder = record.NewFakeRecorder(100)

 	cloud.Calls = nil // ignore any cloud calls made in init()
@@ -265,7 +279,7 @@ func TestSyncLoadBalancerIfNeeded(t *testing.T) {
 		t.Run(tc.desc, func(t *testing.T) {
 			ctx, cancel := context.WithCancel(context.Background())
 			defer cancel()
-			controller, cloud, client := newController()
+			controller, cloud, client := newController(nil)
 			cloud.Exists = tc.lbExists
 			key := fmt.Sprintf("%s/%s", tc.service.Namespace, tc.service.Name)
 			if _, err := client.CoreV1().Services(tc.service.Namespace).Create(ctx, tc.service, metav1.CreateOptions{}); err != nil {
@@ -439,7 +453,7 @@ func TestUpdateNodesInExternalLoadBalancer(t *testing.T) {
 		t.Run(item.desc, func(t *testing.T) {
 			ctx, cancel := context.WithCancel(context.Background())
 			defer cancel()
-			controller, cloud, _ := newController()
+			controller, cloud, _ := newController(nil)
 			controller.nodeLister = newFakeNodeLister(nil, nodes...)
 			if servicesToRetry := controller.updateLoadBalancerHosts(ctx, item.services, item.workers); len(servicesToRetry) != 0 {
 				t.Errorf("for case %q, unexpected servicesToRetry: %v", item.desc, servicesToRetry)
@@ -590,11 +604,15 @@ func TestNodeChangesForExternalTrafficPolicyLocalServices(t *testing.T) {
 		expectedUpdateCalls: []fakecloud.UpdateBalancerCall{},
 	}} {
 		t.Run(tc.desc, func(t *testing.T) {
-			controller, cloud, _ := newController()
+			controller, cloud, _ := newController(nil)
 			ctx, cancel := context.WithCancel(context.Background())
 			defer cancel()
-			controller.lastSyncedNodes = tc.initialState
+			for _, svc := range services {
+				key, _ := cache.MetaNamespaceKeyFunc(svc)
+				controller.lastSyncedNodes[key] = tc.initialState
+			}

 			for _, state := range tc.stateChanges {
 				setupState := func() {
@@ -762,11 +780,15 @@ func TestNodeChangesForStableNodeSetEnabled(t *testing.T) {
 		expectedUpdateCalls: []fakecloud.UpdateBalancerCall{},
 	}} {
 		t.Run(tc.desc, func(t *testing.T) {
-			controller, cloud, _ := newController()
+			controller, cloud, _ := newController(nil)
 			ctx, cancel := context.WithCancel(context.Background())
 			defer cancel()
-			controller.lastSyncedNodes = tc.initialState
+			for _, svc := range services {
+				key, _ := cache.MetaNamespaceKeyFunc(svc)
+				controller.lastSyncedNodes[key] = tc.initialState
+			}

 			for _, state := range tc.stateChanges {
 				setupState := func() {
@@ -806,7 +828,7 @@ func TestNodeChangesInExternalLoadBalancer(t *testing.T) {
 		serviceNames.Insert(fmt.Sprintf("%s/%s", svc.GetObjectMeta().GetNamespace(), svc.GetObjectMeta().GetName()))
 	}

-	controller, cloud, _ := newController()
+	controller, cloud, _ := newController(nil)
 	for _, tc := range []struct {
 		desc         string
 		nodes        []*v1.Node
@@ -901,8 +923,28 @@ func compareUpdateCalls(t *testing.T, left, right []fakecloud.UpdateBalancerCall
 	}
 }

+// compareHostSets compares if the nodes in left are in right, despite the order.
+func compareHostSets(t *testing.T, left, right []*v1.Node) bool {
+	if len(left) != len(right) {
+		return false
+	}
+	for _, lHost := range left {
+		found := false
+		for _, rHost := range right {
+			if reflect.DeepEqual(lHost, rHost) {
+				found = true
+				break
+			}
+		}
+		if !found {
+			return false
+		}
+	}
+	return true
+}
+
 func TestNodesNotEqual(t *testing.T) {
-	controller, cloud, _ := newController()
+	controller, cloud, _ := newController(nil)

 	services := []*v1.Service{
 		newService("s0", v1.ServiceTypeLoadBalancer),
@@ -952,7 +994,12 @@ func TestNodesNotEqual(t *testing.T) {
 			ctx, cancel := context.WithCancel(context.Background())
 			defer cancel()
 			controller.nodeLister = newFakeNodeLister(nil, tc.newNodes...)
-			controller.lastSyncedNodes = tc.lastSyncNodes
+
+			for _, svc := range services {
+				key, _ := cache.MetaNamespaceKeyFunc(svc)
+				controller.lastSyncedNodes[key] = tc.lastSyncNodes
+			}
 			controller.updateLoadBalancerHosts(ctx, services, 5)
 			compareUpdateCalls(t, tc.expectedUpdateCalls, cloud.UpdateCalls)
 			cloud.UpdateCalls = []fakecloud.UpdateBalancerCall{}
@@ -961,7 +1008,7 @@ func TestNodesNotEqual(t *testing.T) {
 }

 func TestProcessServiceCreateOrUpdate(t *testing.T) {
-	controller, _, client := newController()
+	controller, _, client := newController(nil)

 	//A pair of old and new loadbalancer IP address
 	oldLBIP := "192.168.1.1"
@@ -1076,7 +1123,7 @@ func TestProcessServiceCreateOrUpdateK8sError(t *testing.T) {
 			svc := newService(svcName, v1.ServiceTypeLoadBalancer)
 			// Preset finalizer so k8s error only happens when patching status.
 			svc.Finalizers = []string{servicehelper.LoadBalancerCleanupFinalizer}
-			controller, _, client := newController()
+			controller, _, client := newController(nil)
 			client.PrependReactor("patch", "services", func(action core.Action) (bool, runtime.Object, error) {
 				return true, nil, tc.k8sErr
 			})
@@ -1120,7 +1167,7 @@ func TestSyncService(t *testing.T) {
 			testName: "if an invalid service name is synced",
 			key:      "invalid/key/string",
 			updateFn: func() {
-				controller, _, _ = newController()
+				controller, _, _ = newController(nil)
 			},
 			expectedFn: func(e error) error {
 				//TODO: should find a way to test for dependent package errors in such a way that it won't break
@@ -1152,7 +1199,7 @@ func TestSyncService(t *testing.T) {
 			key: "external-balancer",
 			updateFn: func() {
 				testSvc := defaultExternalService()
-				controller, _, _ = newController()
+				controller, _, _ = newController(nil)
 				controller.enqueueService(testSvc)
 				svc := controller.cache.getOrCreate("external-balancer")
 				svc.state = testSvc
@@ -1258,7 +1305,7 @@ func TestProcessServiceDeletion(t *testing.T) {
 			defer cancel()

 			//Create a new controller.
-			controller, cloud, _ = newController()
+			controller, cloud, _ = newController(nil)
 			tc.updateFn(controller)
 			obtainedErr := controller.processServiceDeletion(ctx, svcKey)
 			if err := tc.expectedFn(obtainedErr); err != nil {
@@ -1333,8 +1380,96 @@ func TestNeedsCleanup(t *testing.T) {
 }

+// This tests a service update while a slow node sync is happening. If we have multiple
+// services to process from a node sync: each service will experience a sync delta.
+// If a new Node is added and a service is synced while this happens: we want to
+// make sure that the slow node sync never removes the Node from LB set because it
+// has stale data.
+func TestSlowNodeSync(t *testing.T) {
+	stopCh, syncServiceDone, syncService := make(chan struct{}), make(chan string), make(chan string)
+	defer close(stopCh)
+	defer close(syncService)
+
+	node1 := makeNode(tweakName("node1"))
+	node2 := makeNode(tweakName("node2"))
+	node3 := makeNode(tweakName("node3"))
+	service1 := newService("service1", v1.ServiceTypeLoadBalancer)
+	service2 := newService("service2", v1.ServiceTypeLoadBalancer)
+
+	sKey1, _ := cache.MetaNamespaceKeyFunc(service1)
+	sKey2, _ := cache.MetaNamespaceKeyFunc(service2)
+	serviceKeys := sets.New(sKey1, sKey2)
+
+	controller, cloudProvider, kubeClient := newController(stopCh, node1, node2, service1, service2)
+	cloudProvider.UpdateCallCb = func(update fakecloud.UpdateBalancerCall) {
+		key, _ := cache.MetaNamespaceKeyFunc(update.Service)
+		impactedService := serviceKeys.Difference(sets.New(key)).UnsortedList()[0]
+		syncService <- impactedService
+		<-syncServiceDone
+	}
+	cloudProvider.EnsureCallCb = func(update fakecloud.UpdateBalancerCall) {
+		syncServiceDone <- update.Service.Name
+	}
+
+	// Two update calls are expected. This is because this test calls
+	// controller.syncNodes once with two existing services, but with one
+	// controller.syncService while that is happening. The end result is
+	// therefore two update calls - since the second controller.syncNodes won't
+	// trigger an update call because the syncService already did. Each update
+	// call takes cloudProvider.RequestDelay to process. The test asserts that
+	// the order of the Hosts defined by the update calls is respected, but
+	// doesn't necessarily assert the order of the Service. This is because the
+	// controller implementation doesn't use a deterministic order when syncing
+	// services. The test therefor works out which service is impacted by the
+	// slow node sync (which will be whatever service is not synced first) and
+	// then validates that the Hosts for each update call is respected.
+	expectedUpdateCalls := []fakecloud.UpdateBalancerCall{
+		// First update call for first service from controller.syncNodes
+		{Service: service1, Hosts: []*v1.Node{node1, node2}},
+	}
+	expectedEnsureCalls := []fakecloud.UpdateBalancerCall{
+		// Second update call for impacted service from controller.syncService
+		{Service: service2, Hosts: []*v1.Node{node1, node2, node3}},
+	}
+
+	wg := sync.WaitGroup{}
+	wg.Add(1)
+	go func() {
+		defer wg.Done()
+		controller.syncNodes(context.TODO(), 1)
+	}()
+
+	key := <-syncService
+	if _, err := kubeClient.CoreV1().Nodes().Create(context.TODO(), node3, metav1.CreateOptions{}); err != nil {
+		t.Fatalf("error creating node3, err: %v", err)
+	}
+
+	// Sync the service
+	if err := controller.syncService(context.TODO(), key); err != nil {
+		t.Fatalf("unexpected service sync error, err: %v", err)
+	}
+
+	wg.Wait()
+
+	if len(expectedUpdateCalls) != len(cloudProvider.UpdateCalls) {
+		t.Fatalf("unexpected amount of update calls, expected: %v, got: %v", len(expectedUpdateCalls), len(cloudProvider.UpdateCalls))
+	}
+	for idx, update := range cloudProvider.UpdateCalls {
+		if !compareHostSets(t, expectedUpdateCalls[idx].Hosts, update.Hosts) {
+			t.Fatalf("unexpected updated hosts for update: %v, expected: %v, got: %v", idx, expectedUpdateCalls[idx].Hosts, update.Hosts)
+		}
+	}
+	if len(expectedEnsureCalls) != len(cloudProvider.EnsureCalls) {
+		t.Fatalf("unexpected amount of ensure calls, expected: %v, got: %v", len(expectedEnsureCalls), len(cloudProvider.EnsureCalls))
+	}
+	for idx, ensure := range cloudProvider.EnsureCalls {
+		if !compareHostSets(t, expectedEnsureCalls[idx].Hosts, ensure.Hosts) {
+			t.Fatalf("unexpected updated hosts for ensure: %v, expected: %v, got: %v", idx, expectedEnsureCalls[idx].Hosts, ensure.Hosts)
+		}
+	}
+}
+
 func TestNeedsUpdate(t *testing.T) {
 	testCases := []struct {
 		testName            string                            //Name of the test case
 		updateFn            func() (*v1.Service, *v1.Service) //Function to update the service object
@@ -1494,7 +1629,7 @@ func TestNeedsUpdate(t *testing.T) {
 		expectedNeedsUpdate: true,
 	}}

-	controller, _, _ := newController()
+	controller, _, _ := newController(nil)
 	for _, tc := range testCases {
 		oldSvc, newSvc := tc.updateFn()
 		obtainedResult := controller.needsUpdate(oldSvc, newSvc)
@@ -2441,7 +2576,7 @@ func TestServiceQueueDelay(t *testing.T) {
 	for _, tc := range tests {
 		t.Run(tc.name, func(t *testing.T) {
-			controller, cloud, client := newController()
+			controller, cloud, client := newController(nil)
 			queue := &spyWorkQueue{RateLimitingInterface: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "test-service-queue-delay")}
 			controller.serviceQueue = queue
 			cloud.Err = tc.lbCloudErr
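
Those are all of the test call-site updates: existing cases simply pass nil, while TestSlowNodeSync passes a stop channel plus seed objects so the informers run against the fake clientset. A small illustration of the two calling modes of the reworked helper (the test name below is made up for illustration); the remaining hunks are in the fake cloud provider these tests use:

func TestNewControllerCallingModes(t *testing.T) {
	// Most existing tests: no informers running, no seed objects.
	controller, cloud, client := newController(nil)
	_, _, _ = controller, cloud, client

	// TestSlowNodeSync-style setup: seed the fake clientset and run informers.
	stopCh := make(chan struct{})
	defer close(stopCh)
	node := makeNode(tweakName("node1"))
	svc := newService("service1", v1.ServiceTypeLoadBalancer)
	controller2, _, kubeClient := newController(stopCh, node, svc)
	_, _ = controller2, kubeClient
}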


@@ -73,24 +73,29 @@ type Cloud struct {
 	ErrShutdownByProviderID error
 	MetadataErr             error
 	Calls                   []string
 	Addresses               []v1.NodeAddress
 	addressesMux            sync.Mutex
 	ExtID                   map[types.NodeName]string
 	ExtIDErr                map[types.NodeName]error
 	InstanceTypes           map[types.NodeName]string
 	Machines                []types.NodeName
 	NodeResources           *v1.NodeResources
 	ClusterList             []string
 	MasterName              string
 	ExternalIP              net.IP
 	Balancers               map[string]Balancer
-	UpdateCalls             []UpdateBalancerCall
-	RouteMap                map[string]*Route
-	Lock                    sync.Mutex
-	Provider                string
-	ProviderID              map[types.NodeName]string
-	addCallLock             sync.Mutex
+	updateCallLock          sync.Mutex
+	UpdateCalls             []UpdateBalancerCall
+	ensureCallLock          sync.Mutex
+	EnsureCalls             []UpdateBalancerCall
+	EnsureCallCb            func(UpdateBalancerCall)
+	UpdateCallCb            func(UpdateBalancerCall)
+	RouteMap                map[string]*Route
+	Lock                    sync.Mutex
+	Provider                string
+	ProviderID              map[types.NodeName]string
+	addCallLock             sync.Mutex
 	cloudprovider.Zone
 	VolumeLabelMap          map[string]map[string]string
@@ -201,6 +206,7 @@ func (f *Cloud) GetLoadBalancerName(ctx context.Context, clusterName string, ser
 // It adds an entry "create" into the internal method call record.
 func (f *Cloud) EnsureLoadBalancer(ctx context.Context, clusterName string, service *v1.Service, nodes []*v1.Node) (*v1.LoadBalancerStatus, error) {
 	f.addCall("create")
+	f.markEnsureCall(service, nodes)
 	if f.Balancers == nil {
 		f.Balancers = make(map[string]Balancer)
 	}
@@ -222,13 +228,31 @@ func (f *Cloud) EnsureLoadBalancer(ctx context.Context, clusterName string, serv
 	return status, f.Err
 }

+func (f *Cloud) markUpdateCall(service *v1.Service, nodes []*v1.Node) {
+	f.updateCallLock.Lock()
+	defer f.updateCallLock.Unlock()
+	update := UpdateBalancerCall{service, nodes}
+	f.UpdateCalls = append(f.UpdateCalls, update)
+	if f.UpdateCallCb != nil {
+		f.UpdateCallCb(update)
+	}
+}
+
+func (f *Cloud) markEnsureCall(service *v1.Service, nodes []*v1.Node) {
+	f.ensureCallLock.Lock()
+	defer f.ensureCallLock.Unlock()
+	update := UpdateBalancerCall{service, nodes}
+	f.EnsureCalls = append(f.EnsureCalls, update)
+	if f.EnsureCallCb != nil {
+		f.EnsureCallCb(update)
+	}
+}
+
 // UpdateLoadBalancer is a test-spy implementation of LoadBalancer.UpdateLoadBalancer.
 // It adds an entry "update" into the internal method call record.
 func (f *Cloud) UpdateLoadBalancer(ctx context.Context, clusterName string, service *v1.Service, nodes []*v1.Node) error {
 	f.addCall("update")
-	f.Lock.Lock()
-	defer f.Lock.Unlock()
-	f.UpdateCalls = append(f.UpdateCalls, UpdateBalancerCall{service, nodes})
+	f.markUpdateCall(service, nodes)
 	return f.Err
 }
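
The fake cloud's new UpdateCallCb and EnsureCallCb hooks are what make the race reproducible in TestSlowNodeSync: they run synchronously inside markUpdateCall and markEnsureCall, so a test can park an in-flight load-balancer call on a channel until it decides to let it finish. A minimal sketch of that pattern, with a hypothetical helper name and channel that are not part of the PR:

// newPausableFakeCloud returns a fake cloud whose UpdateLoadBalancer calls
// block until the returned channel is closed (or receives a value). Because
// UpdateCallCb is invoked synchronously from markUpdateCall, blocking here
// holds the controller's node-sync goroutine mid-update.
func newPausableFakeCloud() (*fakecloud.Cloud, chan struct{}) {
	release := make(chan struct{})
	cloud := &fakecloud.Cloud{}
	cloud.UpdateCallCb = func(call fakecloud.UpdateBalancerCall) {
		<-release
	}
	return cloud, release
}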