mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-07-31 23:37:01 +00:00
Merge pull request #25189 from mfanjie/kube-service-controller-rewritten
Automatic merge from submit-queue Rewrite service controller to apply best controller pattern This PR is a long term solution for #21625: We apply the same pattern like replication controller to service controller to avoid the potential process order messes in service controller, the change includes: 1. introduce informer controller to watch service changes from kube-apiserver, so that every changes on same service will be kept in serviceStore as the only element. 2. put the service name to be processed to working queue 3. when process service, always get info from serviceStore to ensure the info is up-to-date 4. keep the retry mechanism, sleep for certain interval and add it back to queue. 5. remote the logic of reading last service info from kube-apiserver before processing the LB info as we trust the info from serviceStore. The UT has been passed, manual test passed after I hardcode the cloud provider as FakeCloud, however I am not able to boot a k8s cluster with any available cloudprovider, so e2e test is not done. Submit this PR first for review and for triggering a e2e test.
This commit is contained in:
commit
c41c3d4d14
@ -240,9 +240,11 @@ func StartControllers(s *options.CMServer, kubeClient *client.Client, kubeconfig
|
||||
nodeController.Run(s.NodeSyncPeriod.Duration)
|
||||
time.Sleep(wait.Jitter(s.ControllerStartInterval.Duration, ControllerStartJitter))
|
||||
|
||||
serviceController := servicecontroller.New(cloud, clientset.NewForConfigOrDie(restclient.AddUserAgent(kubeconfig, "service-controller")), s.ClusterName)
|
||||
if err := serviceController.Run(s.ServiceSyncPeriod.Duration, s.NodeSyncPeriod.Duration); err != nil {
|
||||
serviceController, err := servicecontroller.New(cloud, clientset.NewForConfigOrDie(restclient.AddUserAgent(kubeconfig, "service-controller")), s.ClusterName)
|
||||
if err != nil {
|
||||
glog.Errorf("Failed to start service controller: %v", err)
|
||||
} else {
|
||||
serviceController.Run(int(s.ConcurrentServiceSyncs))
|
||||
}
|
||||
time.Sleep(wait.Jitter(s.ControllerStartInterval.Duration, ControllerStartJitter))
|
||||
|
||||
|
@ -46,6 +46,7 @@ func NewCMServer() *CMServer {
|
||||
Port: ports.ControllerManagerPort,
|
||||
Address: "0.0.0.0",
|
||||
ConcurrentEndpointSyncs: 5,
|
||||
ConcurrentServiceSyncs: 1,
|
||||
ConcurrentRCSyncs: 5,
|
||||
ConcurrentRSSyncs: 5,
|
||||
ConcurrentDaemonSetSyncs: 2,
|
||||
@ -108,6 +109,7 @@ func (s *CMServer) AddFlags(fs *pflag.FlagSet) {
|
||||
fs.StringVar(&s.CloudProvider, "cloud-provider", s.CloudProvider, "The provider for cloud services. Empty string for no provider.")
|
||||
fs.StringVar(&s.CloudConfigFile, "cloud-config", s.CloudConfigFile, "The path to the cloud provider configuration file. Empty string for no configuration file.")
|
||||
fs.Int32Var(&s.ConcurrentEndpointSyncs, "concurrent-endpoint-syncs", s.ConcurrentEndpointSyncs, "The number of endpoint syncing operations that will be done concurrently. Larger number = faster endpoint updating, but more CPU (and network) load")
|
||||
fs.Int32Var(&s.ConcurrentServiceSyncs, "concurrent-service-syncs", s.ConcurrentServiceSyncs, "The number of services that are allowed to sync concurrently. Larger number = more responsive service management, but more CPU (and network) load")
|
||||
fs.Int32Var(&s.ConcurrentRCSyncs, "concurrent_rc_syncs", s.ConcurrentRCSyncs, "The number of replication controllers that are allowed to sync concurrently. Larger number = more responsive replica management, but more CPU (and network) load")
|
||||
fs.Int32Var(&s.ConcurrentRSSyncs, "concurrent-replicaset-syncs", s.ConcurrentRSSyncs, "The number of replica sets that are allowed to sync concurrently. Larger number = more responsive replica management, but more CPU (and network) load")
|
||||
fs.Int32Var(&s.ConcurrentResourceQuotaSyncs, "concurrent-resource-quota-syncs", s.ConcurrentResourceQuotaSyncs, "The number of resource quotas that are allowed to sync concurrently. Larger number = more responsive quota management, but more CPU (and network) load")
|
||||
|
@ -171,9 +171,11 @@ func (s *CMServer) Run(_ []string) error {
|
||||
glog.Fatalf("Failed to start node status update controller: %v", err)
|
||||
}
|
||||
|
||||
serviceController := servicecontroller.New(cloud, clientset.NewForConfigOrDie(restclient.AddUserAgent(kubeconfig, "service-controller")), s.ClusterName)
|
||||
if err := serviceController.Run(s.ServiceSyncPeriod.Duration, s.NodeSyncPeriod.Duration); err != nil {
|
||||
serviceController, err := servicecontroller.New(cloud, clientset.NewForConfigOrDie(restclient.AddUserAgent(kubeconfig, "service-controller")), s.ClusterName)
|
||||
if err != nil {
|
||||
glog.Errorf("Failed to start service controller: %v", err)
|
||||
} else {
|
||||
serviceController.Run(int(s.ConcurrentServiceSyncs))
|
||||
}
|
||||
|
||||
if s.AllocateNodeCIDRs && s.ConfigureCloudRoutes {
|
||||
|
File diff suppressed because it is too large
Load Diff
@ -479,6 +479,10 @@ type KubeControllerManagerConfiguration struct {
|
||||
// allowed to sync concurrently. Larger number = more responsive replica
|
||||
// management, but more CPU (and network) load.
|
||||
ConcurrentRCSyncs int32 `json:"concurrentRCSyncs"`
|
||||
// concurrentServiceSyncs is the number of services that are
|
||||
// allowed to sync concurrently. Larger number = more responsive service
|
||||
// management, but more CPU (and network) load.
|
||||
ConcurrentServiceSyncs int32 `json:"concurrentServiceSyncs"`
|
||||
// concurrentResourceQuotaSyncs is the number of resource quotas that are
|
||||
// allowed to sync concurrently. Larger number = more responsive quota
|
||||
// management, but more CPU (and network) load.
|
||||
|
@ -71,6 +71,7 @@ func DeepCopy_componentconfig_KubeControllerManagerConfiguration(in interface{},
|
||||
out.ConcurrentEndpointSyncs = in.ConcurrentEndpointSyncs
|
||||
out.ConcurrentRSSyncs = in.ConcurrentRSSyncs
|
||||
out.ConcurrentRCSyncs = in.ConcurrentRCSyncs
|
||||
out.ConcurrentServiceSyncs = in.ConcurrentServiceSyncs
|
||||
out.ConcurrentResourceQuotaSyncs = in.ConcurrentResourceQuotaSyncs
|
||||
out.ConcurrentDeploymentSyncs = in.ConcurrentDeploymentSyncs
|
||||
out.ConcurrentDaemonSetSyncs = in.ConcurrentDaemonSetSyncs
|
||||
|
@ -32,14 +32,22 @@ import (
|
||||
unversioned_core "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset/typed/core/unversioned"
|
||||
"k8s.io/kubernetes/pkg/client/record"
|
||||
"k8s.io/kubernetes/pkg/cloudprovider"
|
||||
"k8s.io/kubernetes/pkg/controller"
|
||||
"k8s.io/kubernetes/pkg/controller/framework"
|
||||
"k8s.io/kubernetes/pkg/fields"
|
||||
"k8s.io/kubernetes/pkg/types"
|
||||
pkg_runtime "k8s.io/kubernetes/pkg/runtime"
|
||||
"k8s.io/kubernetes/pkg/util/metrics"
|
||||
"k8s.io/kubernetes/pkg/util/runtime"
|
||||
"k8s.io/kubernetes/pkg/util/wait"
|
||||
"k8s.io/kubernetes/pkg/util/workqueue"
|
||||
"k8s.io/kubernetes/pkg/watch"
|
||||
)
|
||||
|
||||
const (
|
||||
workerGoroutines = 10
|
||||
// Interval of synchoronizing service status from apiserver
|
||||
serviceSyncPeriod = 30 * time.Second
|
||||
// Interval of synchoronizing node status from apiserver
|
||||
nodeSyncPeriod = 100 * time.Second
|
||||
|
||||
// How long to wait before retrying the processing of a service change.
|
||||
// If this changes, the sleep in hack/jenkins/e2e.sh before downing a cluster
|
||||
@ -57,14 +65,8 @@ const (
|
||||
)
|
||||
|
||||
type cachedService struct {
|
||||
// Ensures only one goroutine can operate on this service at any given time.
|
||||
mu sync.Mutex
|
||||
|
||||
// The last-known state of the service
|
||||
lastState *api.Service
|
||||
// The state as successfully applied to the load balancer
|
||||
appliedState *api.Service
|
||||
|
||||
// The cached state of the service
|
||||
state *api.Service
|
||||
// Controls error back-off
|
||||
lastRetryDelay time.Duration
|
||||
}
|
||||
@ -75,20 +77,27 @@ type serviceCache struct {
|
||||
}
|
||||
|
||||
type ServiceController struct {
|
||||
cloud cloudprovider.Interface
|
||||
kubeClient clientset.Interface
|
||||
clusterName string
|
||||
balancer cloudprovider.LoadBalancer
|
||||
zone cloudprovider.Zone
|
||||
cache *serviceCache
|
||||
eventBroadcaster record.EventBroadcaster
|
||||
eventRecorder record.EventRecorder
|
||||
nodeLister cache.StoreToNodeLister
|
||||
cloud cloudprovider.Interface
|
||||
knownHosts []string
|
||||
kubeClient clientset.Interface
|
||||
clusterName string
|
||||
balancer cloudprovider.LoadBalancer
|
||||
zone cloudprovider.Zone
|
||||
cache *serviceCache
|
||||
// A store of services, populated by the serviceController
|
||||
serviceStore cache.StoreToServiceLister
|
||||
// Watches changes to all services
|
||||
serviceController *framework.Controller
|
||||
eventBroadcaster record.EventBroadcaster
|
||||
eventRecorder record.EventRecorder
|
||||
nodeLister cache.StoreToNodeLister
|
||||
// services that need to be synced
|
||||
workingQueue workqueue.DelayingInterface
|
||||
}
|
||||
|
||||
// New returns a new service controller to keep cloud provider service resources
|
||||
// (like load balancers) in sync with the registry.
|
||||
func New(cloud cloudprovider.Interface, kubeClient clientset.Interface, clusterName string) *ServiceController {
|
||||
func New(cloud cloudprovider.Interface, kubeClient clientset.Interface, clusterName string) (*ServiceController, error) {
|
||||
broadcaster := record.NewBroadcaster()
|
||||
broadcaster.StartRecordingToSink(&unversioned_core.EventSinkImpl{Interface: kubeClient.Core().Events("")})
|
||||
recorder := broadcaster.NewRecorder(api.EventSource{Component: "service-controller"})
|
||||
@ -97,8 +106,9 @@ func New(cloud cloudprovider.Interface, kubeClient clientset.Interface, clusterN
|
||||
metrics.RegisterMetricAndTrackRateLimiterUsage("service_controller", kubeClient.Core().GetRESTClient().GetRateLimiter())
|
||||
}
|
||||
|
||||
return &ServiceController{
|
||||
s := &ServiceController{
|
||||
cloud: cloud,
|
||||
knownHosts: []string{},
|
||||
kubeClient: kubeClient,
|
||||
clusterName: clusterName,
|
||||
cache: &serviceCache{serviceMap: make(map[string]*cachedService)},
|
||||
@ -107,7 +117,45 @@ func New(cloud cloudprovider.Interface, kubeClient clientset.Interface, clusterN
|
||||
nodeLister: cache.StoreToNodeLister{
|
||||
Store: cache.NewStore(cache.MetaNamespaceKeyFunc),
|
||||
},
|
||||
workingQueue: workqueue.NewDelayingQueue(),
|
||||
}
|
||||
s.serviceStore.Store, s.serviceController = framework.NewInformer(
|
||||
&cache.ListWatch{
|
||||
ListFunc: func(options api.ListOptions) (pkg_runtime.Object, error) {
|
||||
return s.kubeClient.Core().Services(api.NamespaceAll).List(options)
|
||||
},
|
||||
WatchFunc: func(options api.ListOptions) (watch.Interface, error) {
|
||||
return s.kubeClient.Core().Services(api.NamespaceAll).Watch(options)
|
||||
},
|
||||
},
|
||||
&api.Service{},
|
||||
serviceSyncPeriod,
|
||||
framework.ResourceEventHandlerFuncs{
|
||||
AddFunc: s.enqueueService,
|
||||
UpdateFunc: func(old, cur interface{}) {
|
||||
oldSvc, ok1 := old.(*api.Service)
|
||||
curSvc, ok2 := cur.(*api.Service)
|
||||
if ok1 && ok2 && s.needsUpdate(oldSvc, curSvc) {
|
||||
s.enqueueService(cur)
|
||||
}
|
||||
},
|
||||
DeleteFunc: s.enqueueService,
|
||||
},
|
||||
)
|
||||
if err := s.init(); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return s, nil
|
||||
}
|
||||
|
||||
// obj could be an *api.Service, or a DeletionFinalStateUnknown marker item.
|
||||
func (s *ServiceController) enqueueService(obj interface{}) {
|
||||
key, err := controller.KeyFunc(obj)
|
||||
if err != nil {
|
||||
glog.Errorf("Couldn't get key for object %#v: %v", obj, err)
|
||||
return
|
||||
}
|
||||
s.workingQueue.Add(key)
|
||||
}
|
||||
|
||||
// Run starts a background goroutine that watches for changes to services that
|
||||
@ -120,42 +168,33 @@ func New(cloud cloudprovider.Interface, kubeClient clientset.Interface, clusterN
|
||||
//
|
||||
// It's an error to call Run() more than once for a given ServiceController
|
||||
// object.
|
||||
func (s *ServiceController) Run(serviceSyncPeriod, nodeSyncPeriod time.Duration) error {
|
||||
if err := s.init(); err != nil {
|
||||
return err
|
||||
func (s *ServiceController) Run(workers int) {
|
||||
defer runtime.HandleCrash()
|
||||
go s.serviceController.Run(wait.NeverStop)
|
||||
for i := 0; i < workers; i++ {
|
||||
go wait.Until(s.worker, time.Second, wait.NeverStop)
|
||||
}
|
||||
|
||||
// We have to make this check because the ListWatch that we use in
|
||||
// WatchServices requires Client functions that aren't in the interface
|
||||
// for some reason.
|
||||
if _, ok := s.kubeClient.(*clientset.Clientset); !ok {
|
||||
return fmt.Errorf("ServiceController only works with real Client objects, but was passed something else satisfying the clientset.Interface.")
|
||||
}
|
||||
|
||||
// Get the currently existing set of services and then all future creates
|
||||
// and updates of services.
|
||||
// A delta compressor is needed for the DeltaFIFO queue because we only ever
|
||||
// care about the most recent state.
|
||||
serviceQueue := cache.NewDeltaFIFO(
|
||||
cache.MetaNamespaceKeyFunc,
|
||||
cache.DeltaCompressorFunc(func(d cache.Deltas) cache.Deltas {
|
||||
if len(d) == 0 {
|
||||
return d
|
||||
}
|
||||
return cache.Deltas{*d.Newest()}
|
||||
}),
|
||||
s.cache,
|
||||
)
|
||||
lw := cache.NewListWatchFromClient(s.kubeClient.(*clientset.Clientset).CoreClient, "services", api.NamespaceAll, fields.Everything())
|
||||
cache.NewReflector(lw, &api.Service{}, serviceQueue, serviceSyncPeriod).Run()
|
||||
for i := 0; i < workerGoroutines; i++ {
|
||||
go s.watchServices(serviceQueue)
|
||||
}
|
||||
|
||||
nodeLW := cache.NewListWatchFromClient(s.kubeClient.(*clientset.Clientset).CoreClient, "nodes", api.NamespaceAll, fields.Everything())
|
||||
cache.NewReflector(nodeLW, &api.Node{}, s.nodeLister.Store, 0).Run()
|
||||
go s.nodeSyncLoop(nodeSyncPeriod)
|
||||
return nil
|
||||
go wait.Until(s.nodeSyncLoop, nodeSyncPeriod, wait.NeverStop)
|
||||
}
|
||||
|
||||
// worker runs a worker thread that just dequeues items, processes them, and marks them done.
|
||||
// It enforces that the syncHandler is never invoked concurrently with the same key.
|
||||
func (s *ServiceController) worker() {
|
||||
for {
|
||||
func() {
|
||||
key, quit := s.workingQueue.Get()
|
||||
if quit {
|
||||
return
|
||||
}
|
||||
defer s.workingQueue.Done(key)
|
||||
err := s.syncService(key.(string))
|
||||
if err != nil {
|
||||
glog.Errorf("Error syncing service: %v", err)
|
||||
}
|
||||
}()
|
||||
}
|
||||
}
|
||||
|
||||
func (s *ServiceController) init() error {
|
||||
@ -181,104 +220,14 @@ func (s *ServiceController) init() error {
|
||||
return nil
|
||||
}
|
||||
|
||||
// Loop infinitely, processing all service updates provided by the queue.
|
||||
func (s *ServiceController) watchServices(serviceQueue *cache.DeltaFIFO) {
|
||||
for {
|
||||
serviceQueue.Pop(func(obj interface{}) error {
|
||||
deltas, ok := obj.(cache.Deltas)
|
||||
if !ok {
|
||||
runtime.HandleError(fmt.Errorf("Received object from service watcher that wasn't Deltas: %+v", obj))
|
||||
return nil
|
||||
}
|
||||
delta := deltas.Newest()
|
||||
if delta == nil {
|
||||
runtime.HandleError(fmt.Errorf("Received nil delta from watcher queue."))
|
||||
return nil
|
||||
}
|
||||
err, retryDelay := s.processDelta(delta)
|
||||
if retryDelay != 0 {
|
||||
// Add the failed service back to the queue so we'll retry it.
|
||||
runtime.HandleError(fmt.Errorf("Failed to process service delta. Retrying in %s: %v", retryDelay, err))
|
||||
go func(deltas cache.Deltas, delay time.Duration) {
|
||||
time.Sleep(delay)
|
||||
if err := serviceQueue.AddIfNotPresent(deltas); err != nil {
|
||||
runtime.HandleError(fmt.Errorf("Error requeuing service delta - will not retry: %v", err))
|
||||
}
|
||||
}(deltas, retryDelay)
|
||||
} else if err != nil {
|
||||
runtime.HandleError(fmt.Errorf("Failed to process service delta. Not retrying: %v", err))
|
||||
}
|
||||
return nil
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
// Returns an error if processing the delta failed, along with a time.Duration
|
||||
// Returns an error if processing the service update failed, along with a time.Duration
|
||||
// indicating whether processing should be retried; zero means no-retry; otherwise
|
||||
// we should retry in that Duration.
|
||||
func (s *ServiceController) processDelta(delta *cache.Delta) (error, time.Duration) {
|
||||
var (
|
||||
namespacedName types.NamespacedName
|
||||
cachedService *cachedService
|
||||
)
|
||||
func (s *ServiceController) processServiceUpdate(cachedService *cachedService, service *api.Service, key string) (error, time.Duration) {
|
||||
|
||||
deltaService, ok := delta.Object.(*api.Service)
|
||||
if ok {
|
||||
namespacedName.Namespace = deltaService.Namespace
|
||||
namespacedName.Name = deltaService.Name
|
||||
cachedService = s.cache.getOrCreate(namespacedName.String())
|
||||
} else {
|
||||
// If the DeltaFIFO saw a key in our cache that it didn't know about, it
|
||||
// can send a deletion with an unknown state. Grab the service from our
|
||||
// cache for deleting.
|
||||
key, ok := delta.Object.(cache.DeletedFinalStateUnknown)
|
||||
if !ok {
|
||||
return fmt.Errorf("delta contained object that wasn't a service or a deleted key: %#v", delta), doNotRetry
|
||||
}
|
||||
cachedService, ok = s.cache.get(key.Key)
|
||||
if !ok {
|
||||
return fmt.Errorf("service %s not in cache even though the watcher thought it was. Ignoring the deletion.", key), doNotRetry
|
||||
}
|
||||
deltaService = cachedService.lastState
|
||||
delta.Object = deltaService
|
||||
namespacedName = types.NamespacedName{Namespace: deltaService.Namespace, Name: deltaService.Name}
|
||||
}
|
||||
glog.V(2).Infof("Got new %s delta for service: %v", delta.Type, namespacedName)
|
||||
|
||||
// Ensure that no other goroutine will interfere with our processing of the
|
||||
// service.
|
||||
cachedService.mu.Lock()
|
||||
defer cachedService.mu.Unlock()
|
||||
|
||||
// Get the most recent state of the service from the API directly rather than
|
||||
// trusting the body of the delta. This avoids update re-ordering problems.
|
||||
// TODO: Handle sync delta types differently rather than doing a get on every
|
||||
// service every time we sync?
|
||||
service, err := s.kubeClient.Core().Services(namespacedName.Namespace).Get(namespacedName.Name)
|
||||
if err != nil && !errors.IsNotFound(err) {
|
||||
glog.Warningf("Failed to get most recent state of service %v from API (will retry): %v", namespacedName, err)
|
||||
return err, cachedService.nextRetryDelay()
|
||||
}
|
||||
if errors.IsNotFound(err) {
|
||||
glog.V(2).Infof("Service %v not found, ensuring load balancer is deleted", namespacedName)
|
||||
s.eventRecorder.Event(deltaService, api.EventTypeNormal, "DeletingLoadBalancer", "Deleting load balancer")
|
||||
err := s.balancer.EnsureLoadBalancerDeleted(s.clusterName, deltaService)
|
||||
if err != nil {
|
||||
message := "Error deleting load balancer (will retry): " + err.Error()
|
||||
s.eventRecorder.Event(deltaService, api.EventTypeWarning, "DeletingLoadBalancerFailed", message)
|
||||
return err, cachedService.nextRetryDelay()
|
||||
}
|
||||
s.eventRecorder.Event(deltaService, api.EventTypeNormal, "DeletedLoadBalancer", "Deleted load balancer")
|
||||
s.cache.delete(namespacedName.String())
|
||||
|
||||
cachedService.resetRetryDelay()
|
||||
return nil, doNotRetry
|
||||
}
|
||||
|
||||
// Update the cached service (used above for populating synthetic deletes)
|
||||
cachedService.lastState = service
|
||||
|
||||
err, retry := s.createLoadBalancerIfNeeded(namespacedName, service, cachedService.appliedState)
|
||||
// cache the service, we need the info for service deletion
|
||||
cachedService.state = service
|
||||
err, retry := s.createLoadBalancerIfNeeded(key, service)
|
||||
if err != nil {
|
||||
message := "Error creating load balancer"
|
||||
if retry {
|
||||
@ -295,8 +244,7 @@ func (s *ServiceController) processDelta(delta *cache.Delta) (error, time.Durati
|
||||
// NOTE: Since we update the cached service if and only if we successfully
|
||||
// processed it, a cached service being nil implies that it hasn't yet
|
||||
// been successfully processed.
|
||||
cachedService.appliedState = service
|
||||
s.cache.set(namespacedName.String(), cachedService)
|
||||
s.cache.set(key, cachedService)
|
||||
|
||||
cachedService.resetRetryDelay()
|
||||
return nil, doNotRetry
|
||||
@ -304,11 +252,7 @@ func (s *ServiceController) processDelta(delta *cache.Delta) (error, time.Durati
|
||||
|
||||
// Returns whatever error occurred along with a boolean indicator of whether it
|
||||
// should be retried.
|
||||
func (s *ServiceController) createLoadBalancerIfNeeded(namespacedName types.NamespacedName, service, appliedState *api.Service) (error, bool) {
|
||||
if appliedState != nil && !s.needsUpdate(appliedState, service) {
|
||||
glog.Infof("LB doesn't need update for service %s", namespacedName)
|
||||
return nil, notRetryable
|
||||
}
|
||||
func (s *ServiceController) createLoadBalancerIfNeeded(key string, service *api.Service) (error, bool) {
|
||||
|
||||
// Note: It is safe to just call EnsureLoadBalancer. But, on some clouds that requires a delete & create,
|
||||
// which may involve service interruption. Also, we would like user-friendly events.
|
||||
@ -316,39 +260,18 @@ func (s *ServiceController) createLoadBalancerIfNeeded(namespacedName types.Name
|
||||
// Save the state so we can avoid a write if it doesn't change
|
||||
previousState := api.LoadBalancerStatusDeepCopy(&service.Status.LoadBalancer)
|
||||
|
||||
if wantsLoadBalancer(service) {
|
||||
glog.V(2).Infof("Ensuring LB for service %s", namespacedName)
|
||||
|
||||
// TODO: We could do a dry-run here if wanted to avoid the spurious cloud-calls & events when we restart
|
||||
|
||||
// The load balancer doesn't exist yet, so create it.
|
||||
s.eventRecorder.Event(service, api.EventTypeNormal, "CreatingLoadBalancer", "Creating load balancer")
|
||||
err := s.createLoadBalancer(service)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to create load balancer for service %s: %v", namespacedName, err), retryable
|
||||
}
|
||||
s.eventRecorder.Event(service, api.EventTypeNormal, "CreatedLoadBalancer", "Created load balancer")
|
||||
} else {
|
||||
if !wantsLoadBalancer(service) {
|
||||
needDelete := true
|
||||
if appliedState != nil {
|
||||
if !wantsLoadBalancer(appliedState) {
|
||||
needDelete = false
|
||||
}
|
||||
} else {
|
||||
// If we don't have any cached memory of the load balancer, we have to ask
|
||||
// the cloud provider for what it knows about it.
|
||||
// Technically EnsureLoadBalancerDeleted can cope, but we want to post meaningful events
|
||||
_, exists, err := s.balancer.GetLoadBalancer(s.clusterName, service)
|
||||
if err != nil {
|
||||
return fmt.Errorf("error getting LB for service %s: %v", namespacedName, err), retryable
|
||||
}
|
||||
if !exists {
|
||||
needDelete = false
|
||||
}
|
||||
_, exists, err := s.balancer.GetLoadBalancer(s.clusterName, service)
|
||||
if err != nil {
|
||||
return fmt.Errorf("Error getting LB for service %s: %v", key, err), retryable
|
||||
}
|
||||
if !exists {
|
||||
needDelete = false
|
||||
}
|
||||
|
||||
if needDelete {
|
||||
glog.Infof("Deleting existing load balancer for service %s that no longer needs a load balancer.", namespacedName)
|
||||
glog.Infof("Deleting existing load balancer for service %s that no longer needs a load balancer.", key)
|
||||
s.eventRecorder.Event(service, api.EventTypeNormal, "DeletingLoadBalancer", "Deleting load balancer")
|
||||
if err := s.balancer.EnsureLoadBalancerDeleted(s.clusterName, service); err != nil {
|
||||
return err, retryable
|
||||
@ -357,16 +280,28 @@ func (s *ServiceController) createLoadBalancerIfNeeded(namespacedName types.Name
|
||||
}
|
||||
|
||||
service.Status.LoadBalancer = api.LoadBalancerStatus{}
|
||||
} else {
|
||||
glog.V(2).Infof("Ensuring LB for service %s", key)
|
||||
|
||||
// TODO: We could do a dry-run here if wanted to avoid the spurious cloud-calls & events when we restart
|
||||
|
||||
// The load balancer doesn't exist yet, so create it.
|
||||
s.eventRecorder.Event(service, api.EventTypeNormal, "CreatingLoadBalancer", "Creating load balancer")
|
||||
err := s.createLoadBalancer(service)
|
||||
if err != nil {
|
||||
return fmt.Errorf("Failed to create load balancer for service %s: %v", key, err), retryable
|
||||
}
|
||||
s.eventRecorder.Event(service, api.EventTypeNormal, "CreatedLoadBalancer", "Created load balancer")
|
||||
}
|
||||
|
||||
// Write the state if changed
|
||||
// TODO: Be careful here ... what if there were other changes to the service?
|
||||
if api.LoadBalancerStatusEqual(previousState, &service.Status.LoadBalancer) {
|
||||
glog.V(2).Infof("Not persisting unchanged LoadBalancerStatus to registry.")
|
||||
} else {
|
||||
if !api.LoadBalancerStatusEqual(previousState, &service.Status.LoadBalancer) {
|
||||
if err := s.persistUpdate(service); err != nil {
|
||||
return fmt.Errorf("failed to persist updated status to apiserver, even after retries. Giving up: %v", err), notRetryable
|
||||
return fmt.Errorf("Failed to persist updated status to apiserver, even after retries. Giving up: %v", err), notRetryable
|
||||
}
|
||||
} else {
|
||||
glog.V(2).Infof("Not persisting unchanged LoadBalancerStatus to registry.")
|
||||
}
|
||||
|
||||
return nil, notRetryable
|
||||
@ -376,26 +311,25 @@ func (s *ServiceController) persistUpdate(service *api.Service) error {
|
||||
var err error
|
||||
for i := 0; i < clientRetryCount; i++ {
|
||||
_, err = s.kubeClient.Core().Services(service.Namespace).UpdateStatus(service)
|
||||
|
||||
switch {
|
||||
case err == nil:
|
||||
if err == nil {
|
||||
return nil
|
||||
case errors.IsNotFound(err):
|
||||
// If the object no longer exists, we don't want to recreate it. Just bail
|
||||
// out so that we can process the delete, which we should soon be receiving
|
||||
// if we haven't already.
|
||||
}
|
||||
// If the object no longer exists, we don't want to recreate it. Just bail
|
||||
// out so that we can process the delete, which we should soon be receiving
|
||||
// if we haven't already.
|
||||
if errors.IsNotFound(err) {
|
||||
glog.Infof("Not persisting update to service '%s/%s' that no longer exists: %v",
|
||||
service.Namespace, service.Name, err)
|
||||
return nil
|
||||
case errors.IsConflict(err):
|
||||
// TODO: Try to resolve the conflict if the change was unrelated to load
|
||||
// balancer status. For now, just rely on the fact that we'll
|
||||
// also process the update that caused the resource version to change.
|
||||
}
|
||||
// TODO: Try to resolve the conflict if the change was unrelated to load
|
||||
// balancer status. For now, just rely on the fact that we'll
|
||||
// also process the update that caused the resource version to change.
|
||||
if errors.IsConflict(err) {
|
||||
glog.V(4).Infof("Not persisting update to service '%s/%s' that has been changed since we received it: %v",
|
||||
service.Namespace, service.Name, err)
|
||||
return nil
|
||||
}
|
||||
|
||||
glog.Warningf("Failed to persist updated LoadBalancerStatus to service '%s/%s' after creating its load balancer: %v",
|
||||
service.Namespace, service.Name, err)
|
||||
time.Sleep(clientRetryInterval)
|
||||
@ -415,9 +349,10 @@ func (s *ServiceController) createLoadBalancer(service *api.Service) error {
|
||||
status, err := s.balancer.EnsureLoadBalancer(s.clusterName, service, hostsFromNodeList(&nodes))
|
||||
if err != nil {
|
||||
return err
|
||||
} else {
|
||||
service.Status.LoadBalancer = *status
|
||||
}
|
||||
|
||||
service.Status.LoadBalancer = *status
|
||||
return nil
|
||||
}
|
||||
|
||||
@ -443,12 +378,14 @@ func (s *serviceCache) GetByKey(key string) (interface{}, bool, error) {
|
||||
return nil, false, nil
|
||||
}
|
||||
|
||||
func (s *serviceCache) allServices() []*cachedService {
|
||||
// ListKeys implements the interface required by DeltaFIFO to list the keys we
|
||||
// already know about.
|
||||
func (s *serviceCache) allServices() []*api.Service {
|
||||
s.mu.Lock()
|
||||
defer s.mu.Unlock()
|
||||
services := make([]*cachedService, 0, len(s.serviceMap))
|
||||
services := make([]*api.Service, 0, len(s.serviceMap))
|
||||
for _, v := range s.serviceMap {
|
||||
services = append(services, v)
|
||||
services = append(services, v.state)
|
||||
}
|
||||
return services
|
||||
}
|
||||
@ -681,52 +618,43 @@ func getNodeConditionPredicate() cache.NodeConditionPredicate {
|
||||
|
||||
// nodeSyncLoop handles updating the hosts pointed to by all load
|
||||
// balancers whenever the set of nodes in the cluster changes.
|
||||
func (s *ServiceController) nodeSyncLoop(period time.Duration) {
|
||||
var prevHosts []string
|
||||
var servicesToUpdate []*cachedService
|
||||
for range time.Tick(period) {
|
||||
nodes, err := s.nodeLister.NodeCondition(getNodeConditionPredicate()).List()
|
||||
if err != nil {
|
||||
glog.Errorf("Failed to retrieve current set of nodes from node lister: %v", err)
|
||||
continue
|
||||
}
|
||||
newHosts := hostsFromNodeSlice(nodes)
|
||||
if stringSlicesEqual(newHosts, prevHosts) {
|
||||
// The set of nodes in the cluster hasn't changed, but we can retry
|
||||
// updating any services that we failed to update last time around.
|
||||
servicesToUpdate = s.updateLoadBalancerHosts(servicesToUpdate, newHosts)
|
||||
continue
|
||||
}
|
||||
glog.Infof("Detected change in list of current cluster nodes. New node set: %v", newHosts)
|
||||
|
||||
// Try updating all services, and save the ones that fail to try again next
|
||||
// round.
|
||||
servicesToUpdate = s.cache.allServices()
|
||||
numServices := len(servicesToUpdate)
|
||||
servicesToUpdate = s.updateLoadBalancerHosts(servicesToUpdate, newHosts)
|
||||
glog.Infof("Successfully updated %d out of %d load balancers to direct traffic to the updated set of nodes",
|
||||
numServices-len(servicesToUpdate), numServices)
|
||||
|
||||
prevHosts = newHosts
|
||||
func (s *ServiceController) nodeSyncLoop() {
|
||||
var servicesToUpdate []*api.Service
|
||||
nodes, err := s.nodeLister.NodeCondition(getNodeConditionPredicate()).List()
|
||||
if err != nil {
|
||||
glog.Errorf("Failed to retrieve current set of nodes from node lister: %v", err)
|
||||
return
|
||||
}
|
||||
newHosts := hostsFromNodeSlice(nodes)
|
||||
if stringSlicesEqual(newHosts, s.knownHosts) {
|
||||
// The set of nodes in the cluster hasn't changed, but we can retry
|
||||
// updating any services that we failed to update last time around.
|
||||
servicesToUpdate = s.updateLoadBalancerHosts(servicesToUpdate, newHosts)
|
||||
return
|
||||
}
|
||||
glog.Infof("Detected change in list of current cluster nodes. New node set: %v", newHosts)
|
||||
|
||||
// Try updating all services, and save the ones that fail to try again next
|
||||
// round.
|
||||
servicesToUpdate = s.cache.allServices()
|
||||
numServices := len(servicesToUpdate)
|
||||
servicesToUpdate = s.updateLoadBalancerHosts(servicesToUpdate, newHosts)
|
||||
glog.Infof("Successfully updated %d out of %d load balancers to direct traffic to the updated set of nodes",
|
||||
numServices-len(servicesToUpdate), numServices)
|
||||
|
||||
s.knownHosts = newHosts
|
||||
}
|
||||
|
||||
// updateLoadBalancerHosts updates all existing load balancers so that
|
||||
// they will match the list of hosts provided.
|
||||
// Returns the list of services that couldn't be updated.
|
||||
func (s *ServiceController) updateLoadBalancerHosts(services []*cachedService, hosts []string) (servicesToRetry []*cachedService) {
|
||||
func (s *ServiceController) updateLoadBalancerHosts(services []*api.Service, hosts []string) (servicesToRetry []*api.Service) {
|
||||
for _, service := range services {
|
||||
func() {
|
||||
service.mu.Lock()
|
||||
defer service.mu.Unlock()
|
||||
// If the applied state is nil, that means it hasn't yet been successfully dealt
|
||||
// with by the load balancer reconciler. We can trust the load balancer
|
||||
// reconciler to ensure the service's load balancer is created to target
|
||||
// the correct nodes.
|
||||
if service.appliedState == nil {
|
||||
if service == nil {
|
||||
return
|
||||
}
|
||||
if err := s.lockedUpdateLoadBalancerHosts(service.appliedState, hosts); err != nil {
|
||||
if err := s.lockedUpdateLoadBalancerHosts(service, hosts); err != nil {
|
||||
glog.Errorf("External error while updating load balancer: %v.", err)
|
||||
servicesToRetry = append(servicesToRetry, service)
|
||||
}
|
||||
@ -785,3 +713,81 @@ func (s *cachedService) nextRetryDelay() time.Duration {
|
||||
func (s *cachedService) resetRetryDelay() {
|
||||
s.lastRetryDelay = time.Duration(0)
|
||||
}
|
||||
|
||||
// syncService will sync the Service with the given key if it has had its expectations fulfilled,
|
||||
// meaning it did not expect to see any more of its pods created or deleted. This function is not meant to be
|
||||
// invoked concurrently with the same key.
|
||||
func (s *ServiceController) syncService(key string) error {
|
||||
startTime := time.Now()
|
||||
var cachedService *cachedService
|
||||
var retryDelay time.Duration
|
||||
defer func() {
|
||||
glog.V(4).Infof("Finished syncing service %q (%v)", key, time.Now().Sub(startTime))
|
||||
}()
|
||||
// obj holds the latest service info from apiserver
|
||||
obj, exists, err := s.serviceStore.Store.GetByKey(key)
|
||||
if err != nil {
|
||||
glog.Infof("Unable to retrieve service %v from store: %v", key, err)
|
||||
s.workingQueue.Add(key)
|
||||
return err
|
||||
}
|
||||
if !exists {
|
||||
// service absence in store means watcher caught the deletion, ensure LB info is cleaned
|
||||
glog.Infof("Service has been deleted %v", key)
|
||||
err, retryDelay = s.processServiceDeletion(key)
|
||||
} else {
|
||||
service, ok := obj.(*api.Service)
|
||||
if ok {
|
||||
cachedService = s.cache.getOrCreate(key)
|
||||
err, retryDelay = s.processServiceUpdate(cachedService, service, key)
|
||||
} else {
|
||||
tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
|
||||
if !ok {
|
||||
return fmt.Errorf("object contained wasn't a service or a deleted key: %#v", obj)
|
||||
}
|
||||
glog.Infof("Found tombstone for %v", key)
|
||||
err, retryDelay = s.processServiceDeletion(tombstone.Key)
|
||||
}
|
||||
}
|
||||
|
||||
if retryDelay != 0 {
|
||||
// Add the failed service back to the queue so we'll retry it.
|
||||
glog.Errorf("Failed to process service. Retrying in %s: %v", retryDelay, err)
|
||||
go func(obj interface{}, delay time.Duration) {
|
||||
// put back the service key to working queue, it is possible that more entries of the service
|
||||
// were added into the queue during the delay, but it does not mess as when handling the retry,
|
||||
// it always get the last service info from service store
|
||||
s.workingQueue.AddAfter(obj, delay)
|
||||
}(key, retryDelay)
|
||||
} else if err != nil {
|
||||
runtime.HandleError(fmt.Errorf("Failed to process service. Not retrying: %v", err))
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// Returns an error if processing the service deletion failed, along with a time.Duration
|
||||
// indicating whether processing should be retried; zero means no-retry; otherwise
|
||||
// we should retry after that Duration.
|
||||
func (s *ServiceController) processServiceDeletion(key string) (error, time.Duration) {
|
||||
cachedService, ok := s.cache.get(key)
|
||||
if !ok {
|
||||
return fmt.Errorf("Service %s not in cache even though the watcher thought it was. Ignoring the deletion.", key), doNotRetry
|
||||
}
|
||||
service := cachedService.state
|
||||
// delete load balancer info only if the service type is LoadBalancer
|
||||
if !wantsLoadBalancer(service) {
|
||||
return nil, doNotRetry
|
||||
}
|
||||
s.eventRecorder.Event(service, api.EventTypeNormal, "DeletingLoadBalancer", "Deleting load balancer")
|
||||
err := s.balancer.EnsureLoadBalancerDeleted(s.clusterName, service)
|
||||
if err != nil {
|
||||
message := "Error deleting load balancer (will retry): " + err.Error()
|
||||
s.eventRecorder.Event(service, api.EventTypeWarning, "DeletingLoadBalancerFailed", message)
|
||||
return err, cachedService.nextRetryDelay()
|
||||
}
|
||||
s.eventRecorder.Event(service, api.EventTypeNormal, "DeletedLoadBalancer", "Deleted load balancer")
|
||||
s.cache.delete(key)
|
||||
|
||||
cachedService.resetRetryDelay()
|
||||
return nil, doNotRetry
|
||||
}
|
||||
|
@ -21,6 +21,7 @@ import (
|
||||
"testing"
|
||||
|
||||
"k8s.io/kubernetes/pkg/api"
|
||||
"k8s.io/kubernetes/pkg/api/testapi"
|
||||
"k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset/fake"
|
||||
fakecloud "k8s.io/kubernetes/pkg/cloudprovider/providers/fake"
|
||||
"k8s.io/kubernetes/pkg/types"
|
||||
@ -29,7 +30,7 @@ import (
|
||||
const region = "us-central"
|
||||
|
||||
func newService(name string, uid types.UID, serviceType api.ServiceType) *api.Service {
|
||||
return &api.Service{ObjectMeta: api.ObjectMeta{Name: name, Namespace: "namespace", UID: uid}, Spec: api.ServiceSpec{Type: serviceType}}
|
||||
return &api.Service{ObjectMeta: api.ObjectMeta{Name: name, Namespace: "namespace", UID: uid, SelfLink: testapi.Default.SelfLink("services", name)}, Spec: api.ServiceSpec{Type: serviceType}}
|
||||
}
|
||||
|
||||
func TestCreateExternalLoadBalancer(t *testing.T) {
|
||||
@ -56,6 +57,7 @@ func TestCreateExternalLoadBalancer(t *testing.T) {
|
||||
ObjectMeta: api.ObjectMeta{
|
||||
Name: "udp-service",
|
||||
Namespace: "default",
|
||||
SelfLink: testapi.Default.SelfLink("services", "udp-service"),
|
||||
},
|
||||
Spec: api.ServiceSpec{
|
||||
Ports: []api.ServicePort{{
|
||||
@ -73,6 +75,7 @@ func TestCreateExternalLoadBalancer(t *testing.T) {
|
||||
ObjectMeta: api.ObjectMeta{
|
||||
Name: "basic-service1",
|
||||
Namespace: "default",
|
||||
SelfLink: testapi.Default.SelfLink("services", "basic-service1"),
|
||||
},
|
||||
Spec: api.ServiceSpec{
|
||||
Ports: []api.ServicePort{{
|
||||
@ -91,11 +94,11 @@ func TestCreateExternalLoadBalancer(t *testing.T) {
|
||||
cloud := &fakecloud.FakeCloud{}
|
||||
cloud.Region = region
|
||||
client := &fake.Clientset{}
|
||||
controller := New(cloud, client, "test-cluster")
|
||||
controller, _ := New(cloud, client, "test-cluster")
|
||||
controller.init()
|
||||
cloud.Calls = nil // ignore any cloud calls made in init()
|
||||
client.ClearActions() // ignore any client calls made in init()
|
||||
err, _ := controller.createLoadBalancerIfNeeded(types.NamespacedName{Namespace: "foo", Name: "bar"}, item.service, nil)
|
||||
err, _ := controller.createLoadBalancerIfNeeded("foo/bar", item.service)
|
||||
if !item.expectErr && err != nil {
|
||||
t.Errorf("unexpected error: %v", err)
|
||||
} else if item.expectErr && err == nil {
|
||||
@ -211,13 +214,13 @@ func TestUpdateNodesInExternalLoadBalancer(t *testing.T) {
|
||||
|
||||
cloud.Region = region
|
||||
client := &fake.Clientset{}
|
||||
controller := New(cloud, client, "test-cluster2")
|
||||
controller, _ := New(cloud, client, "test-cluster2")
|
||||
controller.init()
|
||||
cloud.Calls = nil // ignore any cloud calls made in init()
|
||||
|
||||
var services []*cachedService
|
||||
var services []*api.Service
|
||||
for _, service := range item.services {
|
||||
services = append(services, &cachedService{lastState: service, appliedState: service})
|
||||
services = append(services, service)
|
||||
}
|
||||
if err := controller.updateLoadBalancerHosts(services, hosts); err != nil {
|
||||
t.Errorf("unexpected error: %v", err)
|
||||
|
Loading…
Reference in New Issue
Block a user