mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-08-02 16:29:21 +00:00
Protect against race conditions in the service controller.
Re-GET the service object from the API when we process a delta rather than trusting the object carried in the delta. Because we resync all the services every 5 minutes, this will result in many more service GET requests, but it will avoid processing updates out of order and continually retrying stale updates, as has been described in a few other issues and PRs.
This commit is contained in:
parent
df234d83cd
commit
4ce5f68ed0
@ -23,6 +23,8 @@ import (
|
|||||||
"sync"
|
"sync"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
"reflect"
|
||||||
|
|
||||||
"github.com/golang/glog"
|
"github.com/golang/glog"
|
||||||
"k8s.io/kubernetes/pkg/api"
|
"k8s.io/kubernetes/pkg/api"
|
||||||
"k8s.io/kubernetes/pkg/api/errors"
|
"k8s.io/kubernetes/pkg/api/errors"
|
||||||
@ -34,7 +36,6 @@ import (
|
|||||||
"k8s.io/kubernetes/pkg/fields"
|
"k8s.io/kubernetes/pkg/fields"
|
||||||
"k8s.io/kubernetes/pkg/types"
|
"k8s.io/kubernetes/pkg/types"
|
||||||
"k8s.io/kubernetes/pkg/util/runtime"
|
"k8s.io/kubernetes/pkg/util/runtime"
|
||||||
"reflect"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
const (
|
const (
|
||||||
@ -198,7 +199,7 @@ func (s *ServiceController) watchServices(serviceQueue *cache.DeltaFIFO) {
|
|||||||
// Returns an error if processing the delta failed, along with a boolean
|
// Returns an error if processing the delta failed, along with a boolean
|
||||||
// indicator of whether the processing should be retried.
|
// indicator of whether the processing should be retried.
|
||||||
func (s *ServiceController) processDelta(delta *cache.Delta) (error, bool) {
|
func (s *ServiceController) processDelta(delta *cache.Delta) (error, bool) {
|
||||||
service, ok := delta.Object.(*api.Service)
|
deltaService, ok := delta.Object.(*api.Service)
|
||||||
var namespacedName types.NamespacedName
|
var namespacedName types.NamespacedName
|
||||||
var cachedService *cachedService
|
var cachedService *cachedService
|
||||||
if !ok {
|
if !ok {
|
||||||
@ -213,31 +214,46 @@ func (s *ServiceController) processDelta(delta *cache.Delta) (error, bool) {
|
|||||||
if !ok {
|
if !ok {
|
||||||
return fmt.Errorf("Service %s not in cache even though the watcher thought it was. Ignoring the deletion.", key), notRetryable
|
return fmt.Errorf("Service %s not in cache even though the watcher thought it was. Ignoring the deletion.", key), notRetryable
|
||||||
}
|
}
|
||||||
service = cachedService.lastState
|
deltaService = cachedService.lastState
|
||||||
delta.Object = cachedService.lastState
|
delta.Object = deltaService
|
||||||
namespacedName = types.NamespacedName{Namespace: service.Namespace, Name: service.Name}
|
namespacedName = types.NamespacedName{Namespace: deltaService.Namespace, Name: deltaService.Name}
|
||||||
} else {
|
} else {
|
||||||
namespacedName.Namespace = service.Namespace
|
namespacedName.Namespace = deltaService.Namespace
|
||||||
namespacedName.Name = service.Name
|
namespacedName.Name = deltaService.Name
|
||||||
cachedService = s.cache.getOrCreate(namespacedName.String())
|
cachedService = s.cache.getOrCreate(namespacedName.String())
|
||||||
}
|
}
|
||||||
glog.V(2).Infof("Got new %s delta for service: %+v", delta.Type, service)
|
glog.V(2).Infof("Got new %s delta for service: %+v", delta.Type, deltaService)
|
||||||
|
|
||||||
// Ensure that no other goroutine will interfere with our processing of the
|
// Ensure that no other goroutine will interfere with our processing of the
|
||||||
// service.
|
// service.
|
||||||
cachedService.mu.Lock()
|
cachedService.mu.Lock()
|
||||||
defer cachedService.mu.Unlock()
|
defer cachedService.mu.Unlock()
|
||||||
|
|
||||||
|
// Get the most recent state of the service from the API directly rather than
|
||||||
|
// trusting the body of the delta. This avoids update re-ordering problems.
|
||||||
|
// TODO: Handle sync delta types differently rather than doing a get on every
|
||||||
|
// service every time we sync?
|
||||||
|
service, err := s.kubeClient.Core().Services(namespacedName.Namespace).Get(namespacedName.Name)
|
||||||
|
if err != nil && !errors.IsNotFound(err) {
|
||||||
|
glog.Warningf("Failed to get most recent state of service %v from API (will retry): %v", namespacedName, err)
|
||||||
|
return err, retryable
|
||||||
|
} else if errors.IsNotFound(err) {
|
||||||
|
glog.V(2).Infof("Service %v not found, ensuring load balancer is deleted", namespacedName)
|
||||||
|
s.eventRecorder.Event(service, api.EventTypeNormal, "DeletingLoadBalancer", "Deleting load balancer")
|
||||||
|
err := s.balancer.EnsureLoadBalancerDeleted(s.loadBalancerName(deltaService), s.zone.Region)
|
||||||
|
if err != nil {
|
||||||
|
message := "Error deleting load balancer (will retry): " + err.Error()
|
||||||
|
s.eventRecorder.Event(deltaService, api.EventTypeWarning, "DeletingLoadBalancerFailed", message)
|
||||||
|
return err, retryable
|
||||||
|
}
|
||||||
|
s.eventRecorder.Event(deltaService, api.EventTypeNormal, "DeletedLoadBalancer", "Deleted load balancer")
|
||||||
|
s.cache.delete(namespacedName.String())
|
||||||
|
return nil, notRetryable
|
||||||
|
}
|
||||||
|
|
||||||
// Update the cached service (used above for populating synthetic deletes)
|
// Update the cached service (used above for populating synthetic deletes)
|
||||||
cachedService.lastState = service
|
cachedService.lastState = service
|
||||||
|
|
||||||
// TODO: Handle added, updated, and sync differently?
|
|
||||||
switch delta.Type {
|
|
||||||
case cache.Added:
|
|
||||||
fallthrough
|
|
||||||
case cache.Updated:
|
|
||||||
fallthrough
|
|
||||||
case cache.Sync:
|
|
||||||
err, retry := s.createLoadBalancerIfNeeded(namespacedName, service, cachedService.appliedState)
|
err, retry := s.createLoadBalancerIfNeeded(namespacedName, service, cachedService.appliedState)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
message := "Error creating load balancer"
|
message := "Error creating load balancer"
|
||||||
@ -257,19 +273,7 @@ func (s *ServiceController) processDelta(delta *cache.Delta) (error, bool) {
|
|||||||
// been successfully processed.
|
// been successfully processed.
|
||||||
cachedService.appliedState = service
|
cachedService.appliedState = service
|
||||||
s.cache.set(namespacedName.String(), cachedService)
|
s.cache.set(namespacedName.String(), cachedService)
|
||||||
case cache.Deleted:
|
|
||||||
s.eventRecorder.Event(service, api.EventTypeNormal, "DeletingLoadBalancer", "Deleting load balancer")
|
|
||||||
err := s.balancer.EnsureLoadBalancerDeleted(s.loadBalancerName(service), s.zone.Region)
|
|
||||||
if err != nil {
|
|
||||||
message := "Error deleting load balancer (will retry): " + err.Error()
|
|
||||||
s.eventRecorder.Event(service, api.EventTypeWarning, "DeletingLoadBalancerFailed", message)
|
|
||||||
return err, retryable
|
|
||||||
}
|
|
||||||
s.eventRecorder.Event(service, api.EventTypeNormal, "DeletedLoadBalancer", "Deleted load balancer")
|
|
||||||
s.cache.delete(namespacedName.String())
|
|
||||||
default:
|
|
||||||
glog.Errorf("Unexpected delta type: %v", delta.Type)
|
|
||||||
}
|
|
||||||
return nil, notRetryable
|
return nil, notRetryable
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -277,7 +281,7 @@ func (s *ServiceController) processDelta(delta *cache.Delta) (error, bool) {
|
|||||||
// should be retried.
|
// should be retried.
|
||||||
func (s *ServiceController) createLoadBalancerIfNeeded(namespacedName types.NamespacedName, service, appliedState *api.Service) (error, bool) {
|
func (s *ServiceController) createLoadBalancerIfNeeded(namespacedName types.NamespacedName, service, appliedState *api.Service) (error, bool) {
|
||||||
if appliedState != nil && !s.needsUpdate(appliedState, service) {
|
if appliedState != nil && !s.needsUpdate(appliedState, service) {
|
||||||
glog.Infof("LB already exists and doesn't need update for service %s", namespacedName)
|
glog.Infof("LB doesn't need update for service %s", namespacedName)
|
||||||
return nil, notRetryable
|
return nil, notRetryable
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Loading…
Reference in New Issue
Block a user