mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-08-04 09:49:50 +00:00
controller/service: minor cleanup
This commit is contained in:
parent
c12de567cd
commit
67f670df2e
@ -57,14 +57,14 @@ const (
|
|||||||
)
|
)
|
||||||
|
|
||||||
type cachedService struct {
|
type cachedService struct {
|
||||||
|
// Ensures only one goroutine can operate on this service at any given time.
|
||||||
|
mu sync.Mutex
|
||||||
|
|
||||||
// The last-known state of the service
|
// The last-known state of the service
|
||||||
lastState *api.Service
|
lastState *api.Service
|
||||||
// The state as successfully applied to the load balancer
|
// The state as successfully applied to the load balancer
|
||||||
appliedState *api.Service
|
appliedState *api.Service
|
||||||
|
|
||||||
// Ensures only one goroutine can operate on this service at any given time.
|
|
||||||
mu sync.Mutex
|
|
||||||
|
|
||||||
// Controls error back-off
|
// Controls error back-off
|
||||||
lastRetryDelay time.Duration
|
lastRetryDelay time.Duration
|
||||||
}
|
}
|
||||||
@ -217,28 +217,31 @@ func (s *ServiceController) watchServices(serviceQueue *cache.DeltaFIFO) {
|
|||||||
// indicating whether processing should be retried; zero means no-retry; otherwise
|
// indicating whether processing should be retried; zero means no-retry; otherwise
|
||||||
// we should retry in that Duration.
|
// we should retry in that Duration.
|
||||||
func (s *ServiceController) processDelta(delta *cache.Delta) (error, time.Duration) {
|
func (s *ServiceController) processDelta(delta *cache.Delta) (error, time.Duration) {
|
||||||
|
var (
|
||||||
|
namespacedName types.NamespacedName
|
||||||
|
cachedService *cachedService
|
||||||
|
)
|
||||||
|
|
||||||
deltaService, ok := delta.Object.(*api.Service)
|
deltaService, ok := delta.Object.(*api.Service)
|
||||||
var namespacedName types.NamespacedName
|
if ok {
|
||||||
var cachedService *cachedService
|
namespacedName.Namespace = deltaService.Namespace
|
||||||
if !ok {
|
namespacedName.Name = deltaService.Name
|
||||||
|
cachedService = s.cache.getOrCreate(namespacedName.String())
|
||||||
|
} else {
|
||||||
// If the DeltaFIFO saw a key in our cache that it didn't know about, it
|
// If the DeltaFIFO saw a key in our cache that it didn't know about, it
|
||||||
// can send a deletion with an unknown state. Grab the service from our
|
// can send a deletion with an unknown state. Grab the service from our
|
||||||
// cache for deleting.
|
// cache for deleting.
|
||||||
key, ok := delta.Object.(cache.DeletedFinalStateUnknown)
|
key, ok := delta.Object.(cache.DeletedFinalStateUnknown)
|
||||||
if !ok {
|
if !ok {
|
||||||
return fmt.Errorf("Delta contained object that wasn't a service or a deleted key: %+v", delta), doNotRetry
|
return fmt.Errorf("delta contained object that wasn't a service or a deleted key: %+v", delta), doNotRetry
|
||||||
}
|
}
|
||||||
cachedService, ok = s.cache.get(key.Key)
|
cachedService, ok = s.cache.get(key.Key)
|
||||||
if !ok {
|
if !ok {
|
||||||
return fmt.Errorf("Service %s not in cache even though the watcher thought it was. Ignoring the deletion.", key), doNotRetry
|
return fmt.Errorf("service %s not in cache even though the watcher thought it was. Ignoring the deletion.", key), doNotRetry
|
||||||
}
|
}
|
||||||
deltaService = cachedService.lastState
|
deltaService = cachedService.lastState
|
||||||
delta.Object = deltaService
|
delta.Object = deltaService
|
||||||
namespacedName = types.NamespacedName{Namespace: deltaService.Namespace, Name: deltaService.Name}
|
namespacedName = types.NamespacedName{Namespace: deltaService.Namespace, Name: deltaService.Name}
|
||||||
} else {
|
|
||||||
namespacedName.Namespace = deltaService.Namespace
|
|
||||||
namespacedName.Name = deltaService.Name
|
|
||||||
cachedService = s.cache.getOrCreate(namespacedName.String())
|
|
||||||
}
|
}
|
||||||
glog.V(2).Infof("Got new %s delta for service: %v", delta.Type, namespacedName)
|
glog.V(2).Infof("Got new %s delta for service: %v", delta.Type, namespacedName)
|
||||||
|
|
||||||
@ -255,7 +258,8 @@ func (s *ServiceController) processDelta(delta *cache.Delta) (error, time.Durati
|
|||||||
if err != nil && !errors.IsNotFound(err) {
|
if err != nil && !errors.IsNotFound(err) {
|
||||||
glog.Warningf("Failed to get most recent state of service %v from API (will retry): %v", namespacedName, err)
|
glog.Warningf("Failed to get most recent state of service %v from API (will retry): %v", namespacedName, err)
|
||||||
return err, cachedService.nextRetryDelay()
|
return err, cachedService.nextRetryDelay()
|
||||||
} else if errors.IsNotFound(err) {
|
}
|
||||||
|
if errors.IsNotFound(err) {
|
||||||
glog.V(2).Infof("Service %v not found, ensuring load balancer is deleted", namespacedName)
|
glog.V(2).Infof("Service %v not found, ensuring load balancer is deleted", namespacedName)
|
||||||
s.eventRecorder.Event(deltaService, api.EventTypeNormal, "DeletingLoadBalancer", "Deleting load balancer")
|
s.eventRecorder.Event(deltaService, api.EventTypeNormal, "DeletingLoadBalancer", "Deleting load balancer")
|
||||||
err := s.balancer.EnsureLoadBalancerDeleted(deltaService)
|
err := s.balancer.EnsureLoadBalancerDeleted(deltaService)
|
||||||
@ -312,7 +316,19 @@ func (s *ServiceController) createLoadBalancerIfNeeded(namespacedName types.Name
|
|||||||
// Save the state so we can avoid a write if it doesn't change
|
// Save the state so we can avoid a write if it doesn't change
|
||||||
previousState := api.LoadBalancerStatusDeepCopy(&service.Status.LoadBalancer)
|
previousState := api.LoadBalancerStatusDeepCopy(&service.Status.LoadBalancer)
|
||||||
|
|
||||||
if !wantsLoadBalancer(service) {
|
if wantsLoadBalancer(service) {
|
||||||
|
glog.V(2).Infof("Ensuring LB for service %s", namespacedName)
|
||||||
|
|
||||||
|
// TODO: We could do a dry-run here if wanted to avoid the spurious cloud-calls & events when we restart
|
||||||
|
|
||||||
|
// The load balancer doesn't exist yet, so create it.
|
||||||
|
s.eventRecorder.Event(service, api.EventTypeNormal, "CreatingLoadBalancer", "Creating load balancer")
|
||||||
|
err := s.createLoadBalancer(service)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("failed to create load balancer for service %s: %v", namespacedName, err), retryable
|
||||||
|
}
|
||||||
|
s.eventRecorder.Event(service, api.EventTypeNormal, "CreatedLoadBalancer", "Created load balancer")
|
||||||
|
} else {
|
||||||
needDelete := true
|
needDelete := true
|
||||||
if appliedState != nil {
|
if appliedState != nil {
|
||||||
if !wantsLoadBalancer(appliedState) {
|
if !wantsLoadBalancer(appliedState) {
|
||||||
@ -324,7 +340,7 @@ func (s *ServiceController) createLoadBalancerIfNeeded(namespacedName types.Name
|
|||||||
// Technically EnsureLoadBalancerDeleted can cope, but we want to post meaningful events
|
// Technically EnsureLoadBalancerDeleted can cope, but we want to post meaningful events
|
||||||
_, exists, err := s.balancer.GetLoadBalancer(service)
|
_, exists, err := s.balancer.GetLoadBalancer(service)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("Error getting LB for service %s: %v", namespacedName, err), retryable
|
return fmt.Errorf("error getting LB for service %s: %v", namespacedName, err), retryable
|
||||||
}
|
}
|
||||||
if !exists {
|
if !exists {
|
||||||
needDelete = false
|
needDelete = false
|
||||||
@ -341,28 +357,16 @@ func (s *ServiceController) createLoadBalancerIfNeeded(namespacedName types.Name
|
|||||||
}
|
}
|
||||||
|
|
||||||
service.Status.LoadBalancer = api.LoadBalancerStatus{}
|
service.Status.LoadBalancer = api.LoadBalancerStatus{}
|
||||||
} else {
|
|
||||||
glog.V(2).Infof("Ensuring LB for service %s", namespacedName)
|
|
||||||
|
|
||||||
// TODO: We could do a dry-run here if wanted to avoid the spurious cloud-calls & events when we restart
|
|
||||||
|
|
||||||
// The load balancer doesn't exist yet, so create it.
|
|
||||||
s.eventRecorder.Event(service, api.EventTypeNormal, "CreatingLoadBalancer", "Creating load balancer")
|
|
||||||
err := s.createLoadBalancer(service)
|
|
||||||
if err != nil {
|
|
||||||
return fmt.Errorf("Failed to create load balancer for service %s: %v", namespacedName, err), retryable
|
|
||||||
}
|
|
||||||
s.eventRecorder.Event(service, api.EventTypeNormal, "CreatedLoadBalancer", "Created load balancer")
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Write the state if changed
|
// Write the state if changed
|
||||||
// TODO: Be careful here ... what if there were other changes to the service?
|
// TODO: Be careful here ... what if there were other changes to the service?
|
||||||
if !api.LoadBalancerStatusEqual(previousState, &service.Status.LoadBalancer) {
|
if api.LoadBalancerStatusEqual(previousState, &service.Status.LoadBalancer) {
|
||||||
if err := s.persistUpdate(service); err != nil {
|
|
||||||
return fmt.Errorf("Failed to persist updated status to apiserver, even after retries. Giving up: %v", err), notRetryable
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
glog.V(2).Infof("Not persisting unchanged LoadBalancerStatus to registry.")
|
glog.V(2).Infof("Not persisting unchanged LoadBalancerStatus to registry.")
|
||||||
|
} else {
|
||||||
|
if err := s.persistUpdate(service); err != nil {
|
||||||
|
return fmt.Errorf("failed to persist updated status to apiserver, even after retries. Giving up: %v", err), notRetryable
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
return nil, notRetryable
|
return nil, notRetryable
|
||||||
@ -372,25 +376,26 @@ func (s *ServiceController) persistUpdate(service *api.Service) error {
|
|||||||
var err error
|
var err error
|
||||||
for i := 0; i < clientRetryCount; i++ {
|
for i := 0; i < clientRetryCount; i++ {
|
||||||
_, err = s.kubeClient.Core().Services(service.Namespace).UpdateStatus(service)
|
_, err = s.kubeClient.Core().Services(service.Namespace).UpdateStatus(service)
|
||||||
if err == nil {
|
|
||||||
|
switch {
|
||||||
|
case err == nil:
|
||||||
return nil
|
return nil
|
||||||
}
|
case errors.IsNotFound(err):
|
||||||
// If the object no longer exists, we don't want to recreate it. Just bail
|
// If the object no longer exists, we don't want to recreate it. Just bail
|
||||||
// out so that we can process the delete, which we should soon be receiving
|
// out so that we can process the delete, which we should soon be receiving
|
||||||
// if we haven't already.
|
// if we haven't already.
|
||||||
if errors.IsNotFound(err) {
|
|
||||||
glog.Infof("Not persisting update to service '%s/%s' that no longer exists: %v",
|
glog.Infof("Not persisting update to service '%s/%s' that no longer exists: %v",
|
||||||
service.Namespace, service.Name, err)
|
service.Namespace, service.Name, err)
|
||||||
return nil
|
return nil
|
||||||
}
|
case errors.IsConflict(err):
|
||||||
// TODO: Try to resolve the conflict if the change was unrelated to load
|
// TODO: Try to resolve the conflict if the change was unrelated to load
|
||||||
// balancer status. For now, just rely on the fact that we'll
|
// balancer status. For now, just rely on the fact that we'll
|
||||||
// also process the update that caused the resource version to change.
|
// also process the update that caused the resource version to change.
|
||||||
if errors.IsConflict(err) {
|
|
||||||
glog.V(4).Infof("Not persisting update to service '%s/%s' that has been changed since we received it: %v",
|
glog.V(4).Infof("Not persisting update to service '%s/%s' that has been changed since we received it: %v",
|
||||||
service.Namespace, service.Name, err)
|
service.Namespace, service.Name, err)
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
glog.Warningf("Failed to persist updated LoadBalancerStatus to service '%s/%s' after creating its load balancer: %v",
|
glog.Warningf("Failed to persist updated LoadBalancerStatus to service '%s/%s' after creating its load balancer: %v",
|
||||||
service.Namespace, service.Name, err)
|
service.Namespace, service.Name, err)
|
||||||
time.Sleep(clientRetryInterval)
|
time.Sleep(clientRetryInterval)
|
||||||
@ -410,10 +415,9 @@ func (s *ServiceController) createLoadBalancer(service *api.Service) error {
|
|||||||
status, err := s.balancer.EnsureLoadBalancer(service, hostsFromNodeList(&nodes))
|
status, err := s.balancer.EnsureLoadBalancer(service, hostsFromNodeList(&nodes))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
} else {
|
|
||||||
service.Status.LoadBalancer = *status
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
service.Status.LoadBalancer = *status
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Loading…
Reference in New Issue
Block a user