KCCM: fix slow node sync + service update
parent a8673fa5b4
commit 60338c79d7
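Before this change the controller tracked a single lastSyncedNodes slice, updated only once a full node sync pass completed. When a node sync was slow, an update applied in the meantime by the service sync was compared against that stale snapshot and could be reverted by the node sync, as the old TestSlowNodeSync expectation shows: its third update call removed the previously added node3. The fix tracks the last synced set of nodes per service key, guarded by a lock since both the service and node controllers access the map, and records the set both when a load balancer is ensured and when its hosts are synced.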
@@ -93,10 +93,10 @@ type Controller struct {
 	serviceQueue workqueue.RateLimitingInterface
 	nodeQueue    workqueue.RateLimitingInterface
 	// lastSyncedNodes is used when reconciling node state and keeps track of
-	// the last synced set of nodes. This field is concurrently safe because the
-	// nodeQueue is serviced by only one go-routine, so node events are not
-	// processed concurrently.
-	lastSyncedNodes []*v1.Node
+	// the last synced set of nodes per service key. This is accessed from the
+	// service and node controllers, hence it is protected by a lock.
+	lastSyncedNodes     map[string][]*v1.Node
+	lastSyncedNodesLock sync.Mutex
 }

 // New returns a new service controller to keep cloud provider service resources
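A minimal standalone sketch of the guarded-map pattern these two fields implement (the type and field names here are illustrative, not from the patch): every read and write takes the lock, because the service controller and the node controller touch the map from separate goroutines.

package main

import (
	"fmt"
	"sync"
)

// tracker mirrors the lastSyncedNodes/lastSyncedNodesLock pairing.
type tracker struct {
	mu   sync.Mutex
	last map[string][]string // stand-in for map[string][]*v1.Node
}

func (t *tracker) store(key string, nodes []string) {
	t.mu.Lock()
	defer t.mu.Unlock()
	t.last[key] = nodes
}

func (t *tracker) get(key string) []string {
	t.mu.Lock()
	defer t.mu.Unlock()
	return t.last[key]
}

func main() {
	t := &tracker{last: make(map[string][]string)}
	var wg sync.WaitGroup
	for i := 0; i < 2; i++ { // two concurrent writers, like the two controllers
		wg.Add(1)
		go func(n int) {
			defer wg.Done()
			t.store("default/svc", []string{fmt.Sprintf("node-%d", n)})
		}(i)
	}
	wg.Wait()
	fmt.Println(t.get("default/svc")) // one of the two writes, race-free
}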
@@ -124,7 +124,7 @@ func New(
 		nodeListerSynced: nodeInformer.Informer().HasSynced,
 		serviceQueue:     workqueue.NewNamedRateLimitingQueue(workqueue.NewItemExponentialFailureRateLimiter(minRetryDelay, maxRetryDelay), "service"),
 		nodeQueue:        workqueue.NewNamedRateLimitingQueue(workqueue.NewItemExponentialFailureRateLimiter(minRetryDelay, maxRetryDelay), "node"),
-		lastSyncedNodes:  []*v1.Node{},
+		lastSyncedNodes:  make(map[string][]*v1.Node),
 	}

 	serviceInformer.Informer().AddEventHandlerWithResyncPeriod(
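One Go property the map-based initialization relies on, shown in a tiny self-contained sketch (the key is made up for illustration): indexing a missing key returns the zero value, so a service that has never been synced simply yields a nil, empty node list.

package main

import "fmt"

func main() {
	last := make(map[string][]string)     // stand-in for map[string][]*v1.Node
	nodes := last["default/never-synced"] // hypothetical key, not in the map
	fmt.Println(len(nodes), nodes == nil) // prints: 0 true
}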
@@ -439,16 +439,32 @@ func (c *Controller) ensureLoadBalancer(ctx context.Context, service *v1.Service
 	if err != nil {
 		return nil, err
 	}

 	// If there are no available nodes for LoadBalancer service, make a EventTypeWarning event for it.
 	if len(nodes) == 0 {
 		c.eventRecorder.Event(service, v1.EventTypeWarning, "UnAvailableLoadBalancer", "There are no available nodes for LoadBalancer")
 	}

-	// - Only one protocol supported per service
 	// - Not all cloud providers support all protocols and the next step is expected to return
 	// an error for unsupported protocols
-	return c.balancer.EnsureLoadBalancer(ctx, c.clusterName, service, nodes)
+	status, err := c.balancer.EnsureLoadBalancer(ctx, c.clusterName, service, nodes)
+	if err != nil {
+		return nil, err
+	}
+	c.storeLastSyncedNodes(service, nodes)
+	return status, nil
+}
+
+func (c *Controller) storeLastSyncedNodes(svc *v1.Service, nodes []*v1.Node) {
+	c.lastSyncedNodesLock.Lock()
+	defer c.lastSyncedNodesLock.Unlock()
+	key, _ := cache.MetaNamespaceKeyFunc(svc)
+	c.lastSyncedNodes[key] = nodes
+}
+
+func (c *Controller) getLastSyncedNodes(svc *v1.Service) []*v1.Node {
+	c.lastSyncedNodesLock.Lock()
+	defer c.lastSyncedNodesLock.Unlock()
+	key, _ := cache.MetaNamespaceKeyFunc(svc)
+	return c.lastSyncedNodes[key]
 }

 // ListKeys implements the interface required by DeltaFIFO to list the keys we
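Both new helpers key the map with cache.MetaNamespaceKeyFunc. A compilable sketch of the key format it produces for a Service (the object's namespace and name here are made up):

package main

import (
	"fmt"

	v1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/tools/cache"
)

func main() {
	svc := &v1.Service{ObjectMeta: metav1.ObjectMeta{Namespace: "default", Name: "my-svc"}}
	key, err := cache.MetaNamespaceKeyFunc(svc)
	if err != nil {
		panic(err)
	}
	fmt.Println(key) // prints: default/my-svc
}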
@@ -662,15 +678,6 @@ func portEqualForLB(x, y *v1.ServicePort) bool {
 	return true
 }

-func serviceKeys(services []*v1.Service) sets.String {
-	ret := sets.NewString()
-	for _, service := range services {
-		key, _ := cache.MetaNamespaceKeyFunc(service)
-		ret.Insert(key)
-	}
-	return ret
-}
-
 func nodeNames(nodes []*v1.Node) sets.String {
 	ret := sets.NewString()
 	for _, node := range nodes {
@@ -737,19 +744,29 @@ func (c *Controller) syncNodes(ctx context.Context, workers int) sets.String {
 // load balancers and finished doing it successfully, or didn't try to at all because
 // there's no need. This function returns true if we tried to update load balancers and
 // failed, indicating to the caller that we should try again.
-func (c *Controller) nodeSyncService(svc *v1.Service, oldNodes, newNodes []*v1.Node) bool {
+func (c *Controller) nodeSyncService(svc *v1.Service) bool {
 	const retSuccess = false
 	const retNeedRetry = true
 	if svc == nil || !wantsLoadBalancer(svc) {
 		return retSuccess
 	}
+	newNodes, err := listWithPredicates(c.nodeLister)
+	if err != nil {
+		runtime.HandleError(fmt.Errorf("failed to retrieve node list: %v", err))
+		nodeSyncErrorCount.Inc()
+		return retNeedRetry
+	}
 	newNodes = filterWithPredicates(newNodes, getNodePredicatesForService(svc)...)
-	oldNodes = filterWithPredicates(oldNodes, getNodePredicatesForService(svc)...)
+	oldNodes := filterWithPredicates(c.getLastSyncedNodes(svc), getNodePredicatesForService(svc)...)
+	// Store last synced nodes without actually determining if we successfully
+	// synced them or not. Failed node syncs are passed off to retries in the
+	// service queue, so there is no need to wait. If we don't store it now, we
+	// risk re-syncing all LBs twice: once from the node sync and once from the
+	// service sync.
+	c.storeLastSyncedNodes(svc, newNodes)
 	if nodesSufficientlyEqual(oldNodes, newNodes) {
 		return retSuccess
 	}

 	klog.V(4).Infof("nodeSyncService started for service %s/%s", svc.Namespace, svc.Name)
 	if err := c.lockedUpdateLoadBalancerHosts(svc, newNodes); err != nil {
 		runtime.HandleError(fmt.Errorf("failed to update load balancer hosts for service %s/%s: %v", svc.Namespace, svc.Name, err))
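nodeSyncService now lists the candidate nodes itself and narrows both the fresh and last-synced sets with the service's node predicates. A self-contained sketch of that filtering idea, using placeholder types rather than the patch's listWithPredicates/filterWithPredicates helpers:

package main

import "fmt"

type node struct {
	name  string
	ready bool
}

type nodePredicate func(node) bool

// filter keeps only the nodes for which every predicate returns true,
// mirroring the role of filterWithPredicates in the patch.
func filter(nodes []node, preds ...nodePredicate) []node {
	var out []node
	for _, n := range nodes {
		keep := true
		for _, p := range preds {
			if !p(n) {
				keep = false
				break
			}
		}
		if keep {
			out = append(out, n)
		}
	}
	return out
}

func main() {
	nodes := []node{{name: "node1", ready: true}, {name: "node2", ready: false}}
	ready := func(n node) bool { return n.ready }
	fmt.Println(filter(nodes, ready)) // prints: [{node1 true}]
}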
@@ -794,20 +811,12 @@ func nodesSufficientlyEqual(oldNodes, newNodes []*v1.Node) bool {
 func (c *Controller) updateLoadBalancerHosts(ctx context.Context, services []*v1.Service, workers int) (servicesToRetry sets.String) {
 	klog.V(4).Infof("Running updateLoadBalancerHosts(len(services)==%d, workers==%d)", len(services), workers)

-	// Include all nodes and let nodeSyncService filter and figure out if
-	// the update is relevant for the service in question.
-	nodes, err := listWithPredicates(c.nodeLister)
-	if err != nil {
-		runtime.HandleError(fmt.Errorf("failed to retrieve node list: %v", err))
-		return serviceKeys(services)
-	}
-
 	// lock for servicesToRetry
 	servicesToRetry = sets.NewString()
 	lock := sync.Mutex{}

 	doWork := func(piece int) {
-		if shouldRetry := c.nodeSyncService(services[piece], c.lastSyncedNodes, nodes); !shouldRetry {
+		if shouldRetry := c.nodeSyncService(services[piece]); !shouldRetry {
 			return
 		}
 		lock.Lock()
@@ -816,7 +825,6 @@ func (c *Controller) updateLoadBalancerHosts(ctx context.Context, services []*v1
 		servicesToRetry.Insert(key)
 	}
 	workqueue.ParallelizeUntil(ctx, workers, len(services), doWork)
-	c.lastSyncedNodes = nodes
 	klog.V(4).Infof("Finished updateLoadBalancerHosts")
 	return servicesToRetry
 }
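updateLoadBalancerHosts now only fans nodeSyncService out per service; listing nodes moved into nodeSyncService itself. A compilable sketch of this fan-out pattern (the service keys and the syncOne success predicate are placeholders): workqueue.ParallelizeUntil runs the work function once per index across the given number of workers, and a mutex guards the shared retry set.

package main

import (
	"context"
	"fmt"
	"strings"
	"sync"

	"k8s.io/apimachinery/pkg/util/sets"
	"k8s.io/client-go/util/workqueue"
)

func main() {
	items := []string{"default/svc-a", "default/svc-b", "kube-system/svc-c"}
	// Placeholder for nodeSyncService: report success for "default/" services.
	syncOne := func(key string) bool { return strings.HasPrefix(key, "default/") }
	retry := sets.NewString()
	var mu sync.Mutex
	doWork := func(piece int) {
		if ok := syncOne(items[piece]); ok {
			return
		}
		mu.Lock()
		defer mu.Unlock()
		retry.Insert(items[piece])
	}
	workqueue.ParallelizeUntil(context.Background(), 2, len(items), doWork)
	fmt.Println(retry.List()) // prints: [kube-system/svc-c]
}

The hunks below update the controller's unit tests to seed the new per-service map.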
@@ -177,7 +177,7 @@ func newController(stopCh <-chan struct{}, objects ...runtime.Object) (*Controll
 		nodeListerSynced: nodeInformer.Informer().HasSynced,
 		serviceQueue:     workqueue.NewNamedRateLimitingQueue(workqueue.NewItemExponentialFailureRateLimiter(minRetryDelay, maxRetryDelay), "service"),
 		nodeQueue:        workqueue.NewNamedRateLimitingQueue(workqueue.NewItemExponentialFailureRateLimiter(minRetryDelay, maxRetryDelay), "node"),
-		lastSyncedNodes:  []*v1.Node{},
+		lastSyncedNodes:  make(map[string][]*v1.Node),
 	}

 	informerFactory.Start(stopCh)
@@ -609,7 +609,10 @@ func TestNodeChangesForExternalTrafficPolicyLocalServices(t *testing.T) {
 			ctx, cancel := context.WithCancel(context.Background())
 			defer cancel()

-			controller.lastSyncedNodes = tc.initialState
+			for _, svc := range services {
+				key, _ := cache.MetaNamespaceKeyFunc(svc)
+				controller.lastSyncedNodes[key] = tc.initialState
+			}

 			for _, state := range tc.stateChanges {
 				setupState := func() {
@@ -782,7 +785,10 @@ func TestNodeChangesForStableNodeSetEnabled(t *testing.T) {
 			ctx, cancel := context.WithCancel(context.Background())
 			defer cancel()

-			controller.lastSyncedNodes = tc.initialState
+			for _, svc := range services {
+				key, _ := cache.MetaNamespaceKeyFunc(svc)
+				controller.lastSyncedNodes[key] = tc.initialState
+			}

 			for _, state := range tc.stateChanges {
 				setupState := func() {
@@ -989,7 +995,10 @@ func TestNodesNotEqual(t *testing.T) {
 			defer cancel()
 			controller.nodeLister = newFakeNodeLister(nil, tc.newNodes...)

-			controller.lastSyncedNodes = tc.lastSyncNodes
+			for _, svc := range services {
+				key, _ := cache.MetaNamespaceKeyFunc(svc)
+				controller.lastSyncedNodes[key] = tc.lastSyncNodes
+			}

 			controller.updateLoadBalancerHosts(ctx, services, 5)
 			compareUpdateCalls(t, tc.expectedUpdateCalls, cloud.UpdateCalls)
@@ -1420,9 +1429,8 @@ func TestSlowNodeSync(t *testing.T) {
 		{Service: service1, Hosts: []*v1.Node{node1, node2}},
 		// Second update call for impacted service from controller.syncService
 		{Service: service2, Hosts: []*v1.Node{node1, node2, node3}},
-		// Third update call for second service from controller.syncNodes. Here
-		// is the problem: this update call removes the previously added node3.
-		{Service: service2, Hosts: []*v1.Node{node1, node2}},
+		// Third update call for second service from controller.syncNodes.
+		{Service: service2, Hosts: []*v1.Node{node1, node2, node3}},
 	}

 	wg := sync.WaitGroup{}
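With per-service tracking the expectation changes accordingly: the node sync's third update for service2 is computed from the current node set and keeps node3, instead of reverting the hosts to the stale list captured before the service sync.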