Mirror of https://github.com/k3s-io/kubernetes.git, synced 2025-07-31 23:37:01 +00:00
Merge pull request #111663 from alexanderConstantinescu/ccm-service-sync-cleanup
[CCM - service controller] Clean up node sync and fix re-sync of failed services.
This commit is contained in:
commit 448575bbb0
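In practice, the change replaces the controller's channel-and-mutex node sync machinery (triggerNodeSync, nodeSyncCh, needFullSync) with a second rate-limited workqueue for node events, serviced by exactly one worker, and re-queues each service whose load balancer failed to update so it is retried individually. Below is a rough, self-contained sketch of that queue layout — not the controller's real sync logic; the 5s/300s delays stand in for this package's minRetryDelay/maxRetryDelay constants, and the worker bodies are placeholders:

	package main

	import (
		"fmt"
		"time"

		"k8s.io/client-go/util/workqueue"
	)

	func main() {
		// Two independent rate-limited queues, as in the patched controller.
		serviceQueue := workqueue.NewNamedRateLimitingQueue(
			workqueue.NewItemExponentialFailureRateLimiter(5*time.Second, 300*time.Second), "service")
		nodeQueue := workqueue.NewNamedRateLimitingQueue(
			workqueue.NewItemExponentialFailureRateLimiter(5*time.Second, 300*time.Second), "node")
		defer serviceQueue.ShutDown()
		defer nodeQueue.ShutDown()

		// A node event is enqueued by key rather than signalled over a channel.
		nodeQueue.Add("node-a")

		// The single node worker drains the node queue and hands any services
		// that failed their load-balancer update back to the service queue.
		key, _ := nodeQueue.Get()
		for _, svc := range []string{"default/lb-1", "default/lb-2"} { // pretend these failed
			serviceQueue.Add(svc)
		}
		nodeQueue.Forget(key)
		nodeQueue.Done(key)

		// Service workers then retry each failed service individually.
		for serviceQueue.Len() > 0 {
			k, _ := serviceQueue.Get()
			fmt.Println("syncing service:", k)
			serviceQueue.Done(k)
		}
	}

Because only one goroutine services the node queue, state touched during node sync (such as lastSyncedNodes in the diff below) no longer needs a mutex.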
@@ -74,12 +74,10 @@ type serviceCache struct {
 // Controller keeps cloud provider service resources
 // (like load balancers) in sync with the registry.
 type Controller struct {
-	cloud            cloudprovider.Interface
-	knownHosts       []*v1.Node
-	servicesToUpdate sets.String
-	kubeClient       clientset.Interface
-	clusterName      string
-	balancer         cloudprovider.LoadBalancer
+	cloud       cloudprovider.Interface
+	kubeClient  clientset.Interface
+	clusterName string
+	balancer    cloudprovider.LoadBalancer
 	// TODO(#85155): Stop relying on this and remove the cache completely.
 	cache *serviceCache
 	serviceLister corelisters.ServiceLister
@@ -88,18 +86,13 @@ type Controller struct {
 	eventRecorder    record.EventRecorder
 	nodeLister       corelisters.NodeLister
 	nodeListerSynced cache.InformerSynced
-	// services that need to be synced
-	queue workqueue.RateLimitingInterface
-
-	// nodeSyncLock ensures there is only one instance of triggerNodeSync getting executed at one time
-	// and protects internal states (needFullSync) of nodeSync
-	nodeSyncLock sync.Mutex
-	// nodeSyncCh triggers nodeSyncLoop to run
-	nodeSyncCh chan interface{}
-	// needFullSync indicates if the nodeSyncInternal will do a full node sync on all LB services.
-	needFullSync bool
-	// lastSyncedNodes is used when reconciling node state and keeps track of the last synced set of
-	// nodes. Access to this attribute by multiple go-routines is protected by nodeSyncLock
+	// services and nodes that need to be synced
+	serviceQueue workqueue.RateLimitingInterface
+	nodeQueue    workqueue.RateLimitingInterface
+	// lastSyncedNodes is used when reconciling node state and keeps track of
+	// the last synced set of nodes. This field is concurrently safe because the
+	// nodeQueue is serviced by only one go-routine, so node events are not
+	// processed concurrently.
 	lastSyncedNodes []*v1.Node
 }
 
@@ -125,7 +118,6 @@ func New(
 	registerMetrics()
 	s := &Controller{
 		cloud:            cloud,
-		knownHosts:       []*v1.Node{},
 		kubeClient:       kubeClient,
 		clusterName:      clusterName,
 		cache:            &serviceCache{serviceMap: make(map[string]*cachedService)},
@@ -133,10 +125,9 @@ func New(
 		eventRecorder:    recorder,
 		nodeLister:       nodeInformer.Lister(),
 		nodeListerSynced: nodeInformer.Informer().HasSynced,
-		queue:            workqueue.NewNamedRateLimitingQueue(workqueue.NewItemExponentialFailureRateLimiter(minRetryDelay, maxRetryDelay), "service"),
-		// nodeSyncCh has a size 1 buffer. Only one pending sync signal would be cached.
-		nodeSyncCh:      make(chan interface{}, 1),
-		lastSyncedNodes: []*v1.Node{},
+		serviceQueue:     workqueue.NewNamedRateLimitingQueue(workqueue.NewItemExponentialFailureRateLimiter(minRetryDelay, maxRetryDelay), "service"),
+		nodeQueue:        workqueue.NewNamedRateLimitingQueue(workqueue.NewItemExponentialFailureRateLimiter(minRetryDelay, maxRetryDelay), "node"),
+		lastSyncedNodes:  []*v1.Node{},
 	}
 
 	serviceInformer.Informer().AddEventHandlerWithResyncPeriod(
@@ -167,7 +158,7 @@ func New(
 	nodeInformer.Informer().AddEventHandlerWithResyncPeriod(
 		cache.ResourceEventHandlerFuncs{
 			AddFunc: func(cur interface{}) {
-				s.triggerNodeSync()
+				s.enqueueNode(cur)
 			},
 			UpdateFunc: func(old, cur interface{}) {
 				oldNode, ok := old.(*v1.Node)
@@ -184,13 +175,13 @@ func New(
 					return
 				}
 
-				s.triggerNodeSync()
+				s.enqueueNode(curNode)
 			},
 			DeleteFunc: func(old interface{}) {
-				s.triggerNodeSync()
+				s.enqueueNode(old)
 			},
 		},
-		time.Duration(0),
+		nodeSyncPeriod,
 	)
 
 	if err := s.init(); err != nil {
@@ -200,15 +191,6 @@ func New(
 	return s, nil
 }
 
-// needFullSyncAndUnmark returns the value and needFullSync and marks the field to false.
-func (c *Controller) needFullSyncAndUnmark() bool {
-	c.nodeSyncLock.Lock()
-	defer c.nodeSyncLock.Unlock()
-	ret := c.needFullSync
-	c.needFullSync = false
-	return ret
-}
-
 // obj could be an *v1.Service, or a DeletionFinalStateUnknown marker item.
 func (c *Controller) enqueueService(obj interface{}) {
 	key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(obj)
@@ -216,7 +198,17 @@ func (c *Controller) enqueueService(obj interface{}) {
 		runtime.HandleError(fmt.Errorf("couldn't get key for object %#v: %v", obj, err))
 		return
 	}
-	c.queue.Add(key)
+	c.serviceQueue.Add(key)
 }
 
+// obj could be an *v1.Service, or a DeletionFinalStateUnknown marker item.
+func (c *Controller) enqueueNode(obj interface{}) {
+	key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(obj)
+	if err != nil {
+		runtime.HandleError(fmt.Errorf("couldn't get key for object %#v: %v", obj, err))
+		return
+	}
+	c.nodeQueue.Add(key)
+}
+
 // Run starts a background goroutine that watches for changes to services that
@@ -231,7 +223,8 @@ func (c *Controller) enqueueService(obj interface{}) {
 // object.
 func (c *Controller) Run(ctx context.Context, workers int, controllerManagerMetrics *controllersmetrics.ControllerManagerMetrics) {
 	defer runtime.HandleCrash()
-	defer c.queue.ShutDown()
+	defer c.serviceQueue.ShutDown()
+	defer c.nodeQueue.ShutDown()
 
 	// Start event processing pipeline.
 	c.eventBroadcaster.StartStructuredLogging(0)
@@ -248,81 +241,60 @@ func (c *Controller) Run(ctx context.Context, workers int, controllerManagerMetrics *controllersmetrics.ControllerManagerMetrics) {
 	}
 
 	for i := 0; i < workers; i++ {
-		go wait.UntilWithContext(ctx, c.worker, time.Second)
+		go wait.UntilWithContext(ctx, c.serviceWorker, time.Second)
 	}
 
-	go c.nodeSyncLoop(ctx, workers)
-	go wait.Until(c.triggerNodeSync, nodeSyncPeriod, ctx.Done())
+	// Initialize one go-routine servicing node events. This ensure we only
+	// process one node at any given moment in time
+	go wait.UntilWithContext(ctx, func(ctx context.Context) { c.nodeWorker(ctx, workers) }, time.Second)
 
 	<-ctx.Done()
 }
 
-// triggerNodeSync triggers a nodeSync asynchronously
-func (c *Controller) triggerNodeSync() {
-	c.nodeSyncLock.Lock()
-	defer c.nodeSyncLock.Unlock()
-	newHosts, err := listWithPredicates(c.nodeLister, allNodePredicates...)
-	if err != nil {
-		runtime.HandleError(fmt.Errorf("Failed to retrieve current set of nodes from node lister: %v", err))
-		// if node list cannot be retrieve, trigger full node sync to be safe.
-		c.needFullSync = true
-	} else if !nodeSlicesEqualForLB(newHosts, c.knownHosts) {
-		// Here the last known state is recorded as knownHosts. For each
-		// LB update, the latest node list is retrieved. This is to prevent
-		// a stale set of nodes were used to be update loadbalancers when
-		// there are many loadbalancers in the clusters. nodeSyncInternal
-		// would be triggered until all loadbalancers are updated to the new state.
-		klog.V(2).Infof("Node changes detected, triggering a full node sync on all loadbalancer services")
-		c.needFullSync = true
-		c.knownHosts = newHosts
-	}
-
-	select {
-	case c.nodeSyncCh <- struct{}{}:
-		klog.V(4).Info("Triggering nodeSync")
-		return
-	default:
-		klog.V(4).Info("A pending nodeSync is already in queue")
-		return
-	}
-}
-
 // worker runs a worker thread that just dequeues items, processes them, and marks them done.
 // It enforces that the syncHandler is never invoked concurrently with the same key.
-func (c *Controller) worker(ctx context.Context) {
-	for c.processNextWorkItem(ctx) {
+func (c *Controller) serviceWorker(ctx context.Context) {
+	for c.processNextServiceItem(ctx) {
+	}
+}
+
+// worker runs a worker thread that just dequeues items, processes them, and marks them done.
+// It enforces that the syncHandler is never invoked concurrently with the same key.
+func (c *Controller) nodeWorker(ctx context.Context, workers int) {
+	for c.processNextNodeItem(ctx, workers) {
 	}
 }
 
-// nodeSyncLoop takes nodeSync signal and triggers nodeSync
-func (c *Controller) nodeSyncLoop(ctx context.Context, workers int) {
-	klog.V(4).Info("nodeSyncLoop Started")
-	for {
-		select {
-		case <-c.nodeSyncCh:
-			klog.V(4).Info("nodeSync has been triggered")
-			c.nodeSyncInternal(ctx, workers)
-		case <-ctx.Done():
-			return
-		}
-	}
-}
-
-func (c *Controller) processNextWorkItem(ctx context.Context) bool {
-	key, quit := c.queue.Get()
+func (c *Controller) processNextNodeItem(ctx context.Context, workers int) bool {
+	key, quit := c.nodeQueue.Get()
 	if quit {
 		return false
 	}
-	defer c.queue.Done(key)
+	defer c.nodeQueue.Done(key)
+
+	for serviceToRetry := range c.syncNodes(ctx, workers) {
+		c.serviceQueue.Add(serviceToRetry)
+	}
+
+	c.nodeQueue.Forget(key)
+	return true
+}
+
+func (c *Controller) processNextServiceItem(ctx context.Context) bool {
+	key, quit := c.serviceQueue.Get()
+	if quit {
+		return false
+	}
+	defer c.serviceQueue.Done(key)
 
 	err := c.syncService(ctx, key.(string))
 	if err == nil {
-		c.queue.Forget(key)
+		c.serviceQueue.Forget(key)
 		return true
 	}
 
 	runtime.HandleError(fmt.Errorf("error processing service %v (will retry): %v", key, err))
-	c.queue.AddRateLimited(key)
+	c.serviceQueue.AddRateLimited(key)
 
 	return true
 }
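The retry path above is what fixes the re-sync of failed services: a failed service key goes back through AddRateLimited, so the queue's exponential failure rate limiter spaces out successive attempts, and Forget resets the backoff once a sync succeeds. A hedged, standalone illustration of that limiter's behavior (again using 5s/300s as stand-ins for the package's minRetryDelay/maxRetryDelay):

	package main

	import (
		"fmt"
		"time"

		"k8s.io/client-go/util/workqueue"
	)

	func main() {
		limiter := workqueue.NewItemExponentialFailureRateLimiter(5*time.Second, 300*time.Second)

		// Each consecutive failure of the same key doubles the delay: 5s, 10s, 20s, 40s.
		for i := 0; i < 4; i++ {
			fmt.Println("retry in:", limiter.When("default/my-lb"))
		}

		// After a successful sync the controller calls Forget, so the next
		// failure starts over at the minimum delay.
		limiter.Forget("default/my-lb")
		fmt.Println("after Forget:", limiter.When("default/my-lb"))
	}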
@@ -688,13 +660,6 @@ func nodeNames(nodes []*v1.Node) sets.String {
 	return ret
 }
 
-func nodeSlicesEqualForLB(x, y []*v1.Node) bool {
-	if len(x) != len(y) {
-		return false
-	}
-	return nodeNames(x).Equal(nodeNames(y))
-}
-
 func shouldSyncUpdatedNode(oldNode, newNode *v1.Node) bool {
 	// Evaluate the individual node exclusion predicate before evaluating the
 	// compounded result of all predicates. We don't sync ETP=local services
@@ -712,43 +677,23 @@ func shouldSyncUpdatedNode(oldNode, newNode *v1.Node) bool {
 	return respectsPredicates(oldNode, allNodePredicates...) != respectsPredicates(newNode, allNodePredicates...)
 }
 
-// nodeSyncInternal handles updating the hosts pointed to by all load
+// syncNodes handles updating the hosts pointed to by all load
 // balancers whenever the set of nodes in the cluster changes.
-func (c *Controller) nodeSyncInternal(ctx context.Context, workers int) {
+func (c *Controller) syncNodes(ctx context.Context, workers int) sets.String {
 	startTime := time.Now()
 	defer func() {
 		latency := time.Since(startTime).Seconds()
-		klog.V(4).Infof("It took %v seconds to finish nodeSyncInternal", latency)
+		klog.V(4).Infof("It took %v seconds to finish syncNodes", latency)
 		nodeSyncLatency.Observe(latency)
 	}()
 
-	if !c.needFullSyncAndUnmark() {
-		// The set of nodes in the cluster hasn't changed, but we can retry
-		// updating any services that we failed to update last time around.
-		// It is required to call `c.cache.get()` on each Service in case there was
-		// an update event that occurred between retries.
-		var servicesToUpdate []*v1.Service
-		for key := range c.servicesToUpdate {
-			cachedService, exist := c.cache.get(key)
-			if !exist {
-				klog.Errorf("Service %q should be in the cache but not", key)
-				continue
-			}
-			servicesToUpdate = append(servicesToUpdate, cachedService.state)
-		}
-
-		c.servicesToUpdate = c.updateLoadBalancerHosts(ctx, servicesToUpdate, workers)
-		return
-	}
 	klog.V(2).Infof("Syncing backends for all LB services.")
-
-	// Try updating all services, and save the failed ones to try again next
-	// round.
 	servicesToUpdate := c.cache.allServices()
 	numServices := len(servicesToUpdate)
-	c.servicesToUpdate = c.updateLoadBalancerHosts(ctx, servicesToUpdate, workers)
+	servicesToRetry := c.updateLoadBalancerHosts(ctx, servicesToUpdate, workers)
 	klog.V(2).Infof("Successfully updated %d out of %d load balancers to direct traffic to the updated set of nodes",
-		numServices-len(c.servicesToUpdate), numServices)
+		numServices-len(servicesToRetry), numServices)
+	return servicesToRetry
 }
 
 // nodeSyncService syncs the nodes for one load balancer type service. The return value
@@ -803,7 +748,6 @@ func (c *Controller) updateLoadBalancerHosts(ctx context.Context, services []*v1.Service, workers int) (servicesToRetry sets.String) {
 			key := fmt.Sprintf("%s/%s", services[piece].Namespace, services[piece].Name)
 			servicesToRetry.Insert(key)
 		}
 
 	workqueue.ParallelizeUntil(ctx, workers, len(services), doWork)
-	c.lastSyncedNodes = nodes
 	klog.V(4).Infof("Finished updateLoadBalancerHosts")
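For context, workqueue.ParallelizeUntil (kept above) fans the per-service updates out over a bounded number of goroutines while collecting the keys that failed, which syncNodes now returns to the node worker. A minimal sketch of that collection pattern, with hypothetical service names and a plain mutex standing in for however the controller guards the retry set:

	package main

	import (
		"context"
		"fmt"
		"sync"

		"k8s.io/client-go/util/workqueue"
	)

	func main() {
		services := []string{"default/a", "default/b", "default/c"}

		var mu sync.Mutex
		var failed []string

		doWork := func(piece int) {
			// Pretend every second service fails its load-balancer update.
			if piece%2 == 1 {
				mu.Lock()
				failed = append(failed, services[piece])
				mu.Unlock()
			}
		}

		// Run at most 2 workers over len(services) pieces, stopping early if
		// the context is cancelled.
		workqueue.ParallelizeUntil(context.Background(), 2, len(services), doWork)
		fmt.Println("services to retry:", failed)
	}

The remaining hunks adapt the controller's unit tests to the new queue fields and drop the tests for the removed channel machinery.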
@@ -26,6 +26,7 @@ import (
 	"testing"
 	"time"
 
+	"github.com/stretchr/testify/assert"
 	v1 "k8s.io/api/core/v1"
 	apierrors "k8s.io/apimachinery/pkg/api/errors"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
@@ -46,8 +47,6 @@ import (
 	servicehelper "k8s.io/cloud-provider/service/helpers"
 
 	utilpointer "k8s.io/utils/pointer"
-
-	"github.com/stretchr/testify/assert"
 )
 
 const region = "us-central"
@@ -101,7 +100,6 @@ func newController() (*Controller, *fakecloud.Cloud, *fake.Clientset) {
 
 	controller := &Controller{
 		cloud:            cloud,
-		knownHosts:       []*v1.Node{},
 		kubeClient:       kubeClient,
 		clusterName:      "test-cluster",
 		cache:            &serviceCache{serviceMap: make(map[string]*cachedService)},
@@ -109,8 +107,8 @@ func newController() (*Controller, *fakecloud.Cloud, *fake.Clientset) {
 		eventRecorder:    recorder,
 		nodeLister:       newFakeNodeLister(nil),
 		nodeListerSynced: nodeInformer.Informer().HasSynced,
-		queue:            workqueue.NewNamedRateLimitingQueue(workqueue.NewItemExponentialFailureRateLimiter(minRetryDelay, maxRetryDelay), "service"),
-		nodeSyncCh:       make(chan interface{}, 1),
+		serviceQueue:     workqueue.NewNamedRateLimitingQueue(workqueue.NewItemExponentialFailureRateLimiter(minRetryDelay, maxRetryDelay), "service"),
+		nodeQueue:        workqueue.NewNamedRateLimitingQueue(workqueue.NewItemExponentialFailureRateLimiter(minRetryDelay, maxRetryDelay), "node"),
 		lastSyncedNodes:  []*v1.Node{},
 	}
 
@@ -566,7 +564,6 @@ func TestUpdateNodesInExternalLoadBalancer(t *testing.T) {
 			defer cancel()
 			controller, cloud, _ := newController()
 			controller.nodeLister = newFakeNodeLister(nil, nodes...)
 
 			if servicesToRetry := controller.updateLoadBalancerHosts(ctx, item.services, item.workers); len(servicesToRetry) != 0 {
 				t.Errorf("for case %q, unexpected servicesToRetry: %v", item.desc, servicesToRetry)
 			}
@@ -908,7 +905,7 @@ func TestProcessServiceCreateOrUpdate(t *testing.T) {
 			cachedServiceTest.state = svc
 			controller.cache.set(keyExpected, cachedServiceTest)
 
-			keyGot, quit := controller.queue.Get()
+			keyGot, quit := controller.serviceQueue.Get()
 			if quit {
 				t.Fatalf("get no queue element")
 			}
@@ -1516,28 +1513,6 @@ func TestServiceCache(t *testing.T) {
 	}
 }
 
-// Test a utility functions as it's not easy to unit test nodeSyncInternal directly
-func TestNodeSlicesEqualForLB(t *testing.T) {
-	numNodes := 10
-	nArray := make([]*v1.Node, numNodes)
-	mArray := make([]*v1.Node, numNodes)
-	for i := 0; i < numNodes; i++ {
-		nArray[i] = &v1.Node{}
-		nArray[i].Name = fmt.Sprintf("node%d", i)
-	}
-	for i := 0; i < numNodes; i++ {
-		mArray[i] = &v1.Node{}
-		mArray[i].Name = fmt.Sprintf("node%d", i+1)
-	}
-
-	if !nodeSlicesEqualForLB(nArray, nArray) {
-		t.Errorf("nodeSlicesEqualForLB() Expected=true Obtained=false")
-	}
-	if nodeSlicesEqualForLB(nArray, mArray) {
-		t.Errorf("nodeSlicesEqualForLB() Expected=false Obtained=true")
-	}
-}
-
 // TODO(@MrHohn): Verify the end state when below issue is resolved:
 // https://github.com/kubernetes/client-go/issues/607
 func TestAddFinalizer(t *testing.T) {
@@ -3209,66 +3184,6 @@ func Test_shouldSyncUpdatedNode_compoundedPredicates(t *testing.T) {
 	}
 }
 
-func TestTriggerNodeSync(t *testing.T) {
-	controller, _, _ := newController()
-
-	tryReadFromChannel(t, controller.nodeSyncCh, false)
-	controller.triggerNodeSync()
-	tryReadFromChannel(t, controller.nodeSyncCh, true)
-	tryReadFromChannel(t, controller.nodeSyncCh, false)
-	tryReadFromChannel(t, controller.nodeSyncCh, false)
-	tryReadFromChannel(t, controller.nodeSyncCh, false)
-	controller.triggerNodeSync()
-	controller.triggerNodeSync()
-	controller.triggerNodeSync()
-	tryReadFromChannel(t, controller.nodeSyncCh, true)
-	tryReadFromChannel(t, controller.nodeSyncCh, false)
-	tryReadFromChannel(t, controller.nodeSyncCh, false)
-	tryReadFromChannel(t, controller.nodeSyncCh, false)
-}
-
-func TestMarkAndUnmarkFullSync(t *testing.T) {
-	controller, _, _ := newController()
-	if controller.needFullSync != false {
-		t.Errorf("expect controller.needFullSync to be false, but got true")
-	}
-
-	ret := controller.needFullSyncAndUnmark()
-	if ret != false {
-		t.Errorf("expect ret == false, but got true")
-	}
-
-	ret = controller.needFullSyncAndUnmark()
-	if ret != false {
-		t.Errorf("expect ret == false, but got true")
-	}
-	controller.needFullSync = true
-	ret = controller.needFullSyncAndUnmark()
-	if ret != true {
-		t.Errorf("expect ret == true, but got false")
-	}
-	ret = controller.needFullSyncAndUnmark()
-	if ret != false {
-		t.Errorf("expect ret == false, but got true")
-	}
-}
-
-func tryReadFromChannel(t *testing.T, ch chan interface{}, expectValue bool) {
-	select {
-	case _, ok := <-ch:
-		if !ok {
-			t.Errorf("The channel is closed")
-		}
-		if !expectValue {
-			t.Errorf("Does not expect value from the channel, but got a value")
-		}
-	default:
-		if expectValue {
-			t.Errorf("Expect value from the channel, but got none")
-		}
-	}
-}
-
 type fakeNodeLister struct {
 	cache []*v1.Node
 	err   error