mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-08-10 20:42:26 +00:00
Merge pull request #9912 from justinsb/loadbalancer_logic
servicecontroller: last state applied to LB vs last state seen
This commit is contained in:
commit
185eb5e26a
@ -52,7 +52,11 @@ const (
|
||||
)
|
||||
|
||||
type cachedService struct {
|
||||
service *api.Service
|
||||
// The last-known state of the service
|
||||
lastState *api.Service
|
||||
// The state as successfully applied to the load balancer
|
||||
appliedState *api.Service
|
||||
|
||||
// Ensures only one goroutine can operate on this service at any given time.
|
||||
mu sync.Mutex
|
||||
}
|
||||
@ -191,8 +195,8 @@ func (s *ServiceController) processDelta(delta *cache.Delta) (error, bool) {
|
||||
if !ok {
|
||||
return fmt.Errorf("Service %s not in cache even though the watcher thought it was. Ignoring the deletion.", key), notRetryable
|
||||
}
|
||||
service = cachedService.service
|
||||
delta.Object = cachedService.service
|
||||
service = cachedService.lastState
|
||||
delta.Object = cachedService.lastState
|
||||
namespacedName = types.NamespacedName{service.Namespace, service.Name}
|
||||
} else {
|
||||
namespacedName.Namespace = service.Namespace
|
||||
@ -206,6 +210,9 @@ func (s *ServiceController) processDelta(delta *cache.Delta) (error, bool) {
|
||||
cachedService.mu.Lock()
|
||||
defer cachedService.mu.Unlock()
|
||||
|
||||
// Update the cached service (used above for populating synthetic deletes)
|
||||
cachedService.lastState = service
|
||||
|
||||
// TODO: Handle added, updated, and sync differently?
|
||||
switch delta.Type {
|
||||
case cache.Added:
|
||||
@ -213,7 +220,7 @@ func (s *ServiceController) processDelta(delta *cache.Delta) (error, bool) {
|
||||
case cache.Updated:
|
||||
fallthrough
|
||||
case cache.Sync:
|
||||
err, retry := s.createLoadBalancerIfNeeded(namespacedName, service, cachedService.service)
|
||||
err, retry := s.createLoadBalancerIfNeeded(namespacedName, service, cachedService.appliedState)
|
||||
if err != nil {
|
||||
s.eventRecorder.Event(service, "creating loadbalancer failed", err.Error())
|
||||
return err, retry
|
||||
@ -222,7 +229,7 @@ func (s *ServiceController) processDelta(delta *cache.Delta) (error, bool) {
|
||||
// NOTE: Since we update the cached service if and only if we successully
|
||||
// processed it, a cached service being nil implies that it hasn't yet
|
||||
// been successfully processed.
|
||||
cachedService.service = service
|
||||
cachedService.appliedState = service
|
||||
s.cache.set(namespacedName.String(), cachedService)
|
||||
case cache.Deleted:
|
||||
err := s.balancer.EnsureTCPLoadBalancerDeleted(s.loadBalancerName(service), s.zone.Region)
|
||||
@ -609,10 +616,10 @@ func (s *ServiceController) updateLoadBalancerHosts(services []*cachedService, h
|
||||
// with by the load balancer reconciler. We can trust the load balancer
|
||||
// reconciler to ensure the service's load balancer is created to target
|
||||
// the correct nodes.
|
||||
if service.service == nil {
|
||||
if service.appliedState == nil {
|
||||
return
|
||||
}
|
||||
if err := s.lockedUpdateLoadBalancerHosts(service.service, hosts); err != nil {
|
||||
if err := s.lockedUpdateLoadBalancerHosts(service.appliedState, hosts); err != nil {
|
||||
glog.Errorf("External error while updating TCP load balancer: %v.", err)
|
||||
servicesToRetry = append(servicesToRetry, service)
|
||||
}
|
||||
|
@ -206,7 +206,7 @@ func TestUpdateNodesInExternalLoadBalancer(t *testing.T) {
|
||||
|
||||
var services []*cachedService
|
||||
for _, service := range item.services {
|
||||
services = append(services, &cachedService{service: service})
|
||||
services = append(services, &cachedService{lastState: service, appliedState: service})
|
||||
}
|
||||
if err := controller.updateLoadBalancerHosts(services, hosts); err != nil {
|
||||
t.Errorf("unexpected error: %v", err)
|
||||
|
Loading…
Reference in New Issue
Block a user