Mirror of https://github.com/k3s-io/kubernetes.git, synced 2025-08-23 18:40:53 +00:00
Cleanup service sync path
It dawned on me that `needFullSync` can never be false. `needFullSync` was used to compare the set of nodes that existed the last time the node event handler was triggered with the current set of nodes for this run. However, if `triggerNodeSync` gets called it is always because the set of nodes has changed, either through a condition changing on one node or through a node being added or removed. And if `needFullSync` can never be false, then a lot of things in the service sync path were spurious, for example `servicesToRetry` and `knownHosts`. Essentially: if we ever need to `triggerNodeSync`, then the set of nodes has changed in some way and we always need to re-sync all services.

Before this patch series there was a possibility for `needFullSync` to be set to false: `shouldSyncNode` and the predicates used to list nodes were not aligned, specifically for Unschedulable nodes. That means we could be triggered by a change to the schedulable state without actually computing any diff between the old and the new set of nodes; whenever the schedulable state changed we would only retry the service updates that had failed during the previous sync. But I believe this to be an overlooked coincidence rather than something actually intended.
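To illustrate the point, here is a minimal, self-contained sketch of the old guarded path versus the unconditional full sync that remains after this commit. The types and fields below (controllerSketch and friends) are simplified stand-ins, not the controller's real fields or signatures.

// sketch.go - illustrative only; controllerSketch and its fields are hypothetical
// stand-ins for the real Controller, service cache, and node types.
package main

import "fmt"

type controllerSketch struct {
	needFullSync     bool            // field removed by this commit
	servicesToUpdate map[string]bool // retry set keyed by "namespace/name"; also removed
	allServices      []string        // stand-in for c.cache.allServices()
}

// Old flow: only do a full sync when needFullSync was set, otherwise retry the
// services that failed last time. Since triggerNodeSync is only ever called when
// the node set changed, needFullSync was effectively always true and the retry
// branch never ran in practice.
func (c *controllerSketch) nodeSyncOld() {
	if !c.needFullSync {
		fmt.Printf("retrying %d previously failed services\n", len(c.servicesToUpdate))
		return
	}
	c.needFullSync = false
	fmt.Printf("full sync of all %d LB services\n", len(c.allServices))
}

// New flow: every node sync unconditionally re-syncs all LB services.
func (c *controllerSketch) nodeSyncNew() {
	fmt.Printf("full sync of all %d LB services\n", len(c.allServices))
}

func main() {
	c := &controllerSketch{
		needFullSync: true, // always ended up true before this change
		allServices:  []string{"default/svc-a", "default/svc-b"},
	}
	c.nodeSyncOld() // takes the full-sync branch
	c.nodeSyncNew() // same outcome, without the flag or the retry set
}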
This commit is contained in:
parent bd9444c1cf
commit 72c1c6559e
@@ -75,8 +75,6 @@ type serviceCache struct {
 // (like load balancers) in sync with the registry.
 type Controller struct {
     cloud cloudprovider.Interface
-    knownHosts []*v1.Node
-    servicesToUpdate sets.String
     kubeClient clientset.Interface
     clusterName string
     balancer cloudprovider.LoadBalancer

@@ -96,8 +94,6 @@ type Controller struct {
     nodeSyncLock sync.Mutex
     // nodeSyncCh triggers nodeSyncLoop to run
     nodeSyncCh chan interface{}
-    // needFullSync indicates if the nodeSyncInternal will do a full node sync on all LB services.
-    needFullSync bool
     // lastSyncedNodes is used when reconciling node state and keeps track of the last synced set of
     // nodes. Access to this attribute by multiple go-routines is protected by nodeSyncLock
     lastSyncedNodes []*v1.Node

@@ -125,7 +121,6 @@ func New(
     registerMetrics()
     s := &Controller{
         cloud: cloud,
-        knownHosts: []*v1.Node{},
         kubeClient: kubeClient,
         clusterName: clusterName,
         cache: &serviceCache{serviceMap: make(map[string]*cachedService)},

@@ -200,15 +195,6 @@ func New(
     return s, nil
 }
 
-// needFullSyncAndUnmark returns the value and needFullSync and marks the field to false.
-func (c *Controller) needFullSyncAndUnmark() bool {
-    c.nodeSyncLock.Lock()
-    defer c.nodeSyncLock.Unlock()
-    ret := c.needFullSync
-    c.needFullSync = false
-    return ret
-}
-
 // obj could be an *v1.Service, or a DeletionFinalStateUnknown marker item.
 func (c *Controller) enqueueService(obj interface{}) {
     key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(obj)

@@ -261,22 +247,6 @@ func (c *Controller) Run(ctx context.Context, workers int, controllerManagerMetr
 func (c *Controller) triggerNodeSync() {
     c.nodeSyncLock.Lock()
     defer c.nodeSyncLock.Unlock()
-    newHosts, err := listWithPredicates(c.nodeLister, allNodePredicates...)
-    if err != nil {
-        runtime.HandleError(fmt.Errorf("Failed to retrieve current set of nodes from node lister: %v", err))
-        // if node list cannot be retrieve, trigger full node sync to be safe.
-        c.needFullSync = true
-    } else if !nodeSlicesEqualForLB(newHosts, c.knownHosts) {
-        // Here the last known state is recorded as knownHosts. For each
-        // LB update, the latest node list is retrieved. This is to prevent
-        // a stale set of nodes were used to be update loadbalancers when
-        // there are many loadbalancers in the clusters. nodeSyncInternal
-        // would be triggered until all loadbalancers are updated to the new state.
-        klog.V(2).Infof("Node changes detected, triggering a full node sync on all loadbalancer services")
-        c.needFullSync = true
-        c.knownHosts = newHosts
-    }
-
     select {
     case c.nodeSyncCh <- struct{}{}:
         klog.V(4).Info("Triggering nodeSync")

@@ -688,13 +658,6 @@ func nodeNames(nodes []*v1.Node) sets.String {
     return ret
 }
 
-func nodeSlicesEqualForLB(x, y []*v1.Node) bool {
-    if len(x) != len(y) {
-        return false
-    }
-    return nodeNames(x).Equal(nodeNames(y))
-}
-
 func shouldSyncUpdatedNode(oldNode, newNode *v1.Node) bool {
     // Evaluate the individual node exclusion predicate before evaluating the
     // compounded result of all predicates. We don't sync ETP=local services

@@ -722,33 +685,12 @@ func (c *Controller) nodeSyncInternal(ctx context.Context, workers int) {
         nodeSyncLatency.Observe(latency)
     }()
 
-    if !c.needFullSyncAndUnmark() {
-        // The set of nodes in the cluster hasn't changed, but we can retry
-        // updating any services that we failed to update last time around.
-        // It is required to call `c.cache.get()` on each Service in case there was
-        // an update event that occurred between retries.
-        var servicesToUpdate []*v1.Service
-        for key := range c.servicesToUpdate {
-            cachedService, exist := c.cache.get(key)
-            if !exist {
-                klog.Errorf("Service %q should be in the cache but not", key)
-                continue
-            }
-            servicesToUpdate = append(servicesToUpdate, cachedService.state)
-        }
-
-        c.servicesToUpdate = c.updateLoadBalancerHosts(ctx, servicesToUpdate, workers)
-        return
-    }
     klog.V(2).Infof("Syncing backends for all LB services.")
-
-    // Try updating all services, and save the failed ones to try again next
-    // round.
     servicesToUpdate := c.cache.allServices()
     numServices := len(servicesToUpdate)
-    c.servicesToUpdate = c.updateLoadBalancerHosts(ctx, servicesToUpdate, workers)
+    servicesToRetry := c.updateLoadBalancerHosts(ctx, servicesToUpdate, workers)
     klog.V(2).Infof("Successfully updated %d out of %d load balancers to direct traffic to the updated set of nodes",
-        numServices-len(c.servicesToUpdate), numServices)
+        numServices-len(servicesToRetry), numServices)
 }
 
 // nodeSyncService syncs the nodes for one load balancer type service. The return value

@@ -803,7 +745,6 @@ func (c *Controller) updateLoadBalancerHosts(ctx context.Context, services []*v1
         key := fmt.Sprintf("%s/%s", services[piece].Namespace, services[piece].Name)
         servicesToRetry.Insert(key)
     }
-
     workqueue.ParallelizeUntil(ctx, workers, len(services), doWork)
     c.lastSyncedNodes = nodes
     klog.V(4).Infof("Finished updateLoadBalancerHosts")

@@ -26,6 +26,7 @@ import (
     "testing"
     "time"
 
+    "github.com/stretchr/testify/assert"
     v1 "k8s.io/api/core/v1"
     apierrors "k8s.io/apimachinery/pkg/api/errors"
     metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

@@ -46,8 +47,6 @@ import (
     servicehelper "k8s.io/cloud-provider/service/helpers"
 
     utilpointer "k8s.io/utils/pointer"
-
-    "github.com/stretchr/testify/assert"
 )
 
 const region = "us-central"

@@ -101,7 +100,6 @@ func newController() (*Controller, *fakecloud.Cloud, *fake.Clientset) {
 
     controller := &Controller{
         cloud: cloud,
-        knownHosts: []*v1.Node{},
         kubeClient: kubeClient,
         clusterName: "test-cluster",
         cache: &serviceCache{serviceMap: make(map[string]*cachedService)},

@@ -566,7 +564,6 @@ func TestUpdateNodesInExternalLoadBalancer(t *testing.T) {
     defer cancel()
     controller, cloud, _ := newController()
     controller.nodeLister = newFakeNodeLister(nil, nodes...)
-
     if servicesToRetry := controller.updateLoadBalancerHosts(ctx, item.services, item.workers); len(servicesToRetry) != 0 {
         t.Errorf("for case %q, unexpected servicesToRetry: %v", item.desc, servicesToRetry)
     }

@@ -1516,28 +1513,6 @@ func TestServiceCache(t *testing.T) {
     }
 }
 
-// Test a utility functions as it's not easy to unit test nodeSyncInternal directly
-func TestNodeSlicesEqualForLB(t *testing.T) {
-    numNodes := 10
-    nArray := make([]*v1.Node, numNodes)
-    mArray := make([]*v1.Node, numNodes)
-    for i := 0; i < numNodes; i++ {
-        nArray[i] = &v1.Node{}
-        nArray[i].Name = fmt.Sprintf("node%d", i)
-    }
-    for i := 0; i < numNodes; i++ {
-        mArray[i] = &v1.Node{}
-        mArray[i].Name = fmt.Sprintf("node%d", i+1)
-    }
-
-    if !nodeSlicesEqualForLB(nArray, nArray) {
-        t.Errorf("nodeSlicesEqualForLB() Expected=true Obtained=false")
-    }
-    if nodeSlicesEqualForLB(nArray, mArray) {
-        t.Errorf("nodeSlicesEqualForLB() Expected=false Obtained=true")
-    }
-}
-
 // TODO(@MrHohn): Verify the end state when below issue is resolved:
 // https://github.com/kubernetes/client-go/issues/607
 func TestAddFinalizer(t *testing.T) {

@@ -3227,32 +3202,6 @@ func TestTriggerNodeSync(t *testing.T) {
     tryReadFromChannel(t, controller.nodeSyncCh, false)
 }
 
-func TestMarkAndUnmarkFullSync(t *testing.T) {
-    controller, _, _ := newController()
-    if controller.needFullSync != false {
-        t.Errorf("expect controller.needFullSync to be false, but got true")
-    }
-
-    ret := controller.needFullSyncAndUnmark()
-    if ret != false {
-        t.Errorf("expect ret == false, but got true")
-    }
-
-    ret = controller.needFullSyncAndUnmark()
-    if ret != false {
-        t.Errorf("expect ret == false, but got true")
-    }
-    controller.needFullSync = true
-    ret = controller.needFullSyncAndUnmark()
-    if ret != true {
-        t.Errorf("expect ret == true, but got false")
-    }
-    ret = controller.needFullSyncAndUnmark()
-    if ret != false {
-        t.Errorf("expect ret == false, but got true")
-    }
-}
-
 func tryReadFromChannel(t *testing.T, ch chan interface{}, expectValue bool) {
     select {
     case _, ok := <-ch: