rewrite service controller to apply the latest controller pattern

mfanjie 2016-07-26 13:35:40 +08:00
parent 52559696e9
commit 5fa640490e
8 changed files with 1121 additions and 1060 deletions


@ -250,9 +250,11 @@ func StartControllers(s *options.CMServer, kubeClient *client.Client, kubeconfig
nodeController.Run(s.NodeSyncPeriod.Duration)
time.Sleep(wait.Jitter(s.ControllerStartInterval.Duration, ControllerStartJitter))
serviceController := servicecontroller.New(cloud, clientset.NewForConfigOrDie(restclient.AddUserAgent(kubeconfig, "service-controller")), s.ClusterName)
if err := serviceController.Run(s.ServiceSyncPeriod.Duration, s.NodeSyncPeriod.Duration); err != nil {
serviceController, err := servicecontroller.New(cloud, clientset.NewForConfigOrDie(restclient.AddUserAgent(kubeconfig, "service-controller")), s.ClusterName)
if err != nil {
glog.Errorf("Failed to start service controller: %v", err)
} else {
serviceController.Run(int(s.ConcurrentServiceSyncs))
}
time.Sleep(wait.Jitter(s.ControllerStartInterval.Duration, ControllerStartJitter))
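
For illustration, a minimal self-contained sketch of the construct-then-run shape the caller switches to above: New validates up front and returns an error, while Run only takes a worker count and returns immediately. The Controller type and its queue below are hypothetical stand-ins, not the real servicecontroller package.

package main

import (
	"errors"
	"fmt"
	"time"
)

// Controller is a toy stand-in for the rewritten service controller.
type Controller struct {
	queue chan string // keys of services that need to be synced
}

// New fails fast so the caller can log the error and skip starting the controller.
func New(clusterName string) (*Controller, error) {
	if clusterName == "" {
		return nil, errors.New("cluster name must not be empty")
	}
	return &Controller{queue: make(chan string, 16)}, nil
}

// Run starts the requested number of workers and returns without an error.
func (c *Controller) Run(workers int) {
	for i := 0; i < workers; i++ {
		go func(id int) {
			for key := range c.queue {
				fmt.Printf("worker %d syncing %s\n", id, key)
			}
		}(i)
	}
}

func main() {
	controller, err := New("test-cluster")
	if err != nil {
		fmt.Printf("Failed to start service controller: %v\n", err)
		return
	}
	controller.Run(1) // mirrors Run(int(s.ConcurrentServiceSyncs))
	controller.queue <- "default/my-service"
	time.Sleep(50 * time.Millisecond)
}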


@ -46,6 +46,7 @@ func NewCMServer() *CMServer {
Port: ports.ControllerManagerPort,
Address: "0.0.0.0",
ConcurrentEndpointSyncs: 5,
ConcurrentServiceSyncs: 1,
ConcurrentRCSyncs: 5,
ConcurrentRSSyncs: 5,
ConcurrentDaemonSetSyncs: 2,
@ -108,6 +109,7 @@ func (s *CMServer) AddFlags(fs *pflag.FlagSet) {
fs.StringVar(&s.CloudProvider, "cloud-provider", s.CloudProvider, "The provider for cloud services. Empty string for no provider.")
fs.StringVar(&s.CloudConfigFile, "cloud-config", s.CloudConfigFile, "The path to the cloud provider configuration file. Empty string for no configuration file.")
fs.Int32Var(&s.ConcurrentEndpointSyncs, "concurrent-endpoint-syncs", s.ConcurrentEndpointSyncs, "The number of endpoint syncing operations that will be done concurrently. Larger number = faster endpoint updating, but more CPU (and network) load")
fs.Int32Var(&s.ConcurrentServiceSyncs, "concurrent-service-syncs", s.ConcurrentServiceSyncs, "The number of services that are allowed to sync concurrently. Larger number = more responsive service management, but more CPU (and network) load")
fs.Int32Var(&s.ConcurrentRCSyncs, "concurrent_rc_syncs", s.ConcurrentRCSyncs, "The number of replication controllers that are allowed to sync concurrently. Larger number = more responsive replica management, but more CPU (and network) load")
fs.Int32Var(&s.ConcurrentRSSyncs, "concurrent-replicaset-syncs", s.ConcurrentRSSyncs, "The number of replica sets that are allowed to sync concurrently. Larger number = more responsive replica management, but more CPU (and network) load")
fs.Int32Var(&s.ConcurrentResourceQuotaSyncs, "concurrent-resource-quota-syncs", s.ConcurrentResourceQuotaSyncs, "The number of resource quotas that are allowed to sync concurrently. Larger number = more responsive quota management, but more CPU (and network) load")
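
As a rough usage sketch, the new flag follows the same pflag pattern as the ones around it; the stand-alone options struct and flag set below are hypothetical, but Int32Var is the github.com/spf13/pflag call used above.

package main

import (
	"fmt"

	"github.com/spf13/pflag"
)

type options struct {
	ConcurrentServiceSyncs int32
}

func main() {
	// Default of 1 mirrors the value added to NewCMServer().
	opts := options{ConcurrentServiceSyncs: 1}
	fs := pflag.NewFlagSet("controller-manager", pflag.ExitOnError)
	fs.Int32Var(&opts.ConcurrentServiceSyncs, "concurrent-service-syncs", opts.ConcurrentServiceSyncs,
		"The number of services that are allowed to sync concurrently.")
	fs.Parse([]string{"--concurrent-service-syncs=5"})
	fmt.Println(opts.ConcurrentServiceSyncs) // prints 5; later passed to Run(int(...))
}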


@ -171,9 +171,11 @@ func (s *CMServer) Run(_ []string) error {
glog.Fatalf("Failed to start node status update controller: %v", err)
}
serviceController := servicecontroller.New(cloud, clientset.NewForConfigOrDie(restclient.AddUserAgent(kubeconfig, "service-controller")), s.ClusterName)
if err := serviceController.Run(s.ServiceSyncPeriod.Duration, s.NodeSyncPeriod.Duration); err != nil {
serviceController, err := servicecontroller.New(cloud, clientset.NewForConfigOrDie(restclient.AddUserAgent(kubeconfig, "service-controller")), s.ClusterName)
if err != nil {
glog.Errorf("Failed to start service controller: %v", err)
} else {
serviceController.Run(int(s.ConcurrentServiceSyncs))
}
if s.AllocateNodeCIDRs && s.ConfigureCloudRoutes {

File diff suppressed because it is too large.


@ -479,6 +479,10 @@ type KubeControllerManagerConfiguration struct {
// allowed to sync concurrently. Larger number = more responsive replica
// management, but more CPU (and network) load.
ConcurrentRCSyncs int32 `json:"concurrentRCSyncs"`
// concurrentServiceSyncs is the number of services that are
// allowed to sync concurrently. Larger number = more responsive service
// management, but more CPU (and network) load.
ConcurrentServiceSyncs int32 `json:"concurrentServiceSyncs"`
// concurrentResourceQuotaSyncs is the number of resource quotas that are
// allowed to sync concurrently. Larger number = more responsive quota
// management, but more CPU (and network) load.


@ -71,6 +71,7 @@ func DeepCopy_componentconfig_KubeControllerManagerConfiguration(in interface{},
out.ConcurrentEndpointSyncs = in.ConcurrentEndpointSyncs
out.ConcurrentRSSyncs = in.ConcurrentRSSyncs
out.ConcurrentRCSyncs = in.ConcurrentRCSyncs
out.ConcurrentServiceSyncs = in.ConcurrentServiceSyncs
out.ConcurrentResourceQuotaSyncs = in.ConcurrentResourceQuotaSyncs
out.ConcurrentDeploymentSyncs = in.ConcurrentDeploymentSyncs
out.ConcurrentDaemonSetSyncs = in.ConcurrentDaemonSetSyncs


@ -32,14 +32,22 @@ import (
unversioned_core "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset/typed/core/unversioned"
"k8s.io/kubernetes/pkg/client/record"
"k8s.io/kubernetes/pkg/cloudprovider"
"k8s.io/kubernetes/pkg/controller"
"k8s.io/kubernetes/pkg/controller/framework"
"k8s.io/kubernetes/pkg/fields"
"k8s.io/kubernetes/pkg/types"
pkg_runtime "k8s.io/kubernetes/pkg/runtime"
"k8s.io/kubernetes/pkg/util/metrics"
"k8s.io/kubernetes/pkg/util/runtime"
"k8s.io/kubernetes/pkg/util/wait"
"k8s.io/kubernetes/pkg/util/workqueue"
"k8s.io/kubernetes/pkg/watch"
)
const (
workerGoroutines = 10
// Interval of synchronizing service status from apiserver
serviceSyncPeriod = 30 * time.Second
// Interval of synchronizing node status from apiserver
nodeSyncPeriod = 100 * time.Second
// How long to wait before retrying the processing of a service change.
// If this changes, the sleep in hack/jenkins/e2e.sh before downing a cluster
@ -57,14 +65,8 @@ const (
)
type cachedService struct {
// Ensures only one goroutine can operate on this service at any given time.
mu sync.Mutex
// The last-known state of the service
lastState *api.Service
// The state as successfully applied to the load balancer
appliedState *api.Service
// The cached state of the service
state *api.Service
// Controls error back-off
lastRetryDelay time.Duration
}
@ -75,20 +77,27 @@ type serviceCache struct {
}
type ServiceController struct {
cloud cloudprovider.Interface
kubeClient clientset.Interface
clusterName string
balancer cloudprovider.LoadBalancer
zone cloudprovider.Zone
cache *serviceCache
eventBroadcaster record.EventBroadcaster
eventRecorder record.EventRecorder
nodeLister cache.StoreToNodeLister
cloud cloudprovider.Interface
knownHosts []string
kubeClient clientset.Interface
clusterName string
balancer cloudprovider.LoadBalancer
zone cloudprovider.Zone
cache *serviceCache
// A store of services, populated by the serviceController
serviceStore cache.StoreToServiceLister
// Watches changes to all services
serviceController *framework.Controller
eventBroadcaster record.EventBroadcaster
eventRecorder record.EventRecorder
nodeLister cache.StoreToNodeLister
// services that need to be synced
workingQueue workqueue.DelayingInterface
}
// New returns a new service controller to keep cloud provider service resources
// (like load balancers) in sync with the registry.
func New(cloud cloudprovider.Interface, kubeClient clientset.Interface, clusterName string) *ServiceController {
func New(cloud cloudprovider.Interface, kubeClient clientset.Interface, clusterName string) (*ServiceController, error) {
broadcaster := record.NewBroadcaster()
broadcaster.StartRecordingToSink(&unversioned_core.EventSinkImpl{Interface: kubeClient.Core().Events("")})
recorder := broadcaster.NewRecorder(api.EventSource{Component: "service-controller"})
@ -97,8 +106,9 @@ func New(cloud cloudprovider.Interface, kubeClient clientset.Interface, clusterN
metrics.RegisterMetricAndTrackRateLimiterUsage("service_controller", kubeClient.Core().GetRESTClient().GetRateLimiter())
}
return &ServiceController{
s := &ServiceController{
cloud: cloud,
knownHosts: []string{},
kubeClient: kubeClient,
clusterName: clusterName,
cache: &serviceCache{serviceMap: make(map[string]*cachedService)},
@ -107,7 +117,45 @@ func New(cloud cloudprovider.Interface, kubeClient clientset.Interface, clusterN
nodeLister: cache.StoreToNodeLister{
Store: cache.NewStore(cache.MetaNamespaceKeyFunc),
},
workingQueue: workqueue.NewDelayingQueue(),
}
s.serviceStore.Store, s.serviceController = framework.NewInformer(
&cache.ListWatch{
ListFunc: func(options api.ListOptions) (pkg_runtime.Object, error) {
return s.kubeClient.Core().Services(api.NamespaceAll).List(options)
},
WatchFunc: func(options api.ListOptions) (watch.Interface, error) {
return s.kubeClient.Core().Services(api.NamespaceAll).Watch(options)
},
},
&api.Service{},
serviceSyncPeriod,
framework.ResourceEventHandlerFuncs{
AddFunc: s.enqueueService,
UpdateFunc: func(old, cur interface{}) {
oldSvc, ok1 := old.(*api.Service)
curSvc, ok2 := cur.(*api.Service)
if ok1 && ok2 && s.needsUpdate(oldSvc, curSvc) {
s.enqueueService(cur)
}
},
DeleteFunc: s.enqueueService,
},
)
if err := s.init(); err != nil {
return nil, err
}
return s, nil
}
// obj could be an *api.Service, or a DeletedFinalStateUnknown marker item.
func (s *ServiceController) enqueueService(obj interface{}) {
key, err := controller.KeyFunc(obj)
if err != nil {
glog.Errorf("Couldn't get key for object %#v: %v", obj, err)
return
}
s.workingQueue.Add(key)
}
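The informer wiring above reduces every add, update and delete to a namespace/name key on the work queue, so workers always resync from the latest stored object. A minimal stand-alone sketch of that reduction, with a plain channel standing in for the delaying workqueue and a toy key function in place of controller.KeyFunc:

package main

import "fmt"

type service struct {
	Namespace, Name string
}

// metaNamespaceKey imitates the "namespace/name" keys produced by controller.KeyFunc.
func metaNamespaceKey(svc *service) string {
	return svc.Namespace + "/" + svc.Name
}

type controller struct {
	workingQueue chan string // stand-in for workqueue.DelayingInterface
}

// enqueueService collapses any event about a service to its key; the worker will
// look the object up again, so stale event payloads never get acted on directly.
func (c *controller) enqueueService(svc *service) {
	c.workingQueue <- metaNamespaceKey(svc)
}

func main() {
	c := &controller{workingQueue: make(chan string, 8)}
	c.enqueueService(&service{Namespace: "default", Name: "my-service"})
	fmt.Println(<-c.workingQueue) // default/my-service
}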
// Run starts a background goroutine that watches for changes to services that
@ -120,42 +168,33 @@ func New(cloud cloudprovider.Interface, kubeClient clientset.Interface, clusterN
//
// It's an error to call Run() more than once for a given ServiceController
// object.
func (s *ServiceController) Run(serviceSyncPeriod, nodeSyncPeriod time.Duration) error {
if err := s.init(); err != nil {
return err
func (s *ServiceController) Run(workers int) {
defer runtime.HandleCrash()
go s.serviceController.Run(wait.NeverStop)
for i := 0; i < workers; i++ {
go wait.Until(s.worker, time.Second, wait.NeverStop)
}
// We have to make this check because the ListWatch that we use in
// WatchServices requires Client functions that aren't in the interface
// for some reason.
if _, ok := s.kubeClient.(*clientset.Clientset); !ok {
return fmt.Errorf("ServiceController only works with real Client objects, but was passed something else satisfying the clientset.Interface.")
}
// Get the currently existing set of services and then all future creates
// and updates of services.
// A delta compressor is needed for the DeltaFIFO queue because we only ever
// care about the most recent state.
serviceQueue := cache.NewDeltaFIFO(
cache.MetaNamespaceKeyFunc,
cache.DeltaCompressorFunc(func(d cache.Deltas) cache.Deltas {
if len(d) == 0 {
return d
}
return cache.Deltas{*d.Newest()}
}),
s.cache,
)
lw := cache.NewListWatchFromClient(s.kubeClient.(*clientset.Clientset).CoreClient, "services", api.NamespaceAll, fields.Everything())
cache.NewReflector(lw, &api.Service{}, serviceQueue, serviceSyncPeriod).Run()
for i := 0; i < workerGoroutines; i++ {
go s.watchServices(serviceQueue)
}
nodeLW := cache.NewListWatchFromClient(s.kubeClient.(*clientset.Clientset).CoreClient, "nodes", api.NamespaceAll, fields.Everything())
cache.NewReflector(nodeLW, &api.Node{}, s.nodeLister.Store, 0).Run()
go s.nodeSyncLoop(nodeSyncPeriod)
return nil
go wait.Until(s.nodeSyncLoop, nodeSyncPeriod, wait.NeverStop)
}
// worker runs a worker thread that just dequeues items, processes them, and marks them done.
// It enforces that the syncHandler is never invoked concurrently with the same key.
func (s *ServiceController) worker() {
for {
func() {
key, quit := s.workingQueue.Get()
if quit {
return
}
defer s.workingQueue.Done(key)
err := s.syncService(key.(string))
if err != nil {
glog.Errorf("Error syncing service: %v", err)
}
}()
}
}
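For reference, a self-contained sketch of the worker shape above, using a plain channel and WaitGroup instead of the workqueue package; the real workqueue additionally deduplicates keys and guarantees that two workers never process the same key at once.

package main

import (
	"fmt"
	"sync"
)

type controller struct {
	queue chan string
}

// worker drains the queue until it is closed, logging sync errors and moving on,
// just like the loop above logs and continues on syncService failures.
func (c *controller) worker(wg *sync.WaitGroup) {
	defer wg.Done()
	for key := range c.queue {
		if err := c.syncService(key); err != nil {
			fmt.Printf("Error syncing service: %v\n", err)
		}
	}
}

func (c *controller) syncService(key string) error {
	fmt.Println("synced", key)
	return nil
}

func main() {
	c := &controller{queue: make(chan string, 8)}
	var wg sync.WaitGroup
	for i := 0; i < 3; i++ { // the equivalent of Run(3)
		wg.Add(1)
		go c.worker(&wg)
	}
	c.queue <- "default/a"
	c.queue <- "default/b"
	close(c.queue)
	wg.Wait()
}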
func (s *ServiceController) init() error {
@ -181,104 +220,14 @@ func (s *ServiceController) init() error {
return nil
}
// Loop infinitely, processing all service updates provided by the queue.
func (s *ServiceController) watchServices(serviceQueue *cache.DeltaFIFO) {
for {
serviceQueue.Pop(func(obj interface{}) error {
deltas, ok := obj.(cache.Deltas)
if !ok {
runtime.HandleError(fmt.Errorf("Received object from service watcher that wasn't Deltas: %+v", obj))
return nil
}
delta := deltas.Newest()
if delta == nil {
runtime.HandleError(fmt.Errorf("Received nil delta from watcher queue."))
return nil
}
err, retryDelay := s.processDelta(delta)
if retryDelay != 0 {
// Add the failed service back to the queue so we'll retry it.
runtime.HandleError(fmt.Errorf("Failed to process service delta. Retrying in %s: %v", retryDelay, err))
go func(deltas cache.Deltas, delay time.Duration) {
time.Sleep(delay)
if err := serviceQueue.AddIfNotPresent(deltas); err != nil {
runtime.HandleError(fmt.Errorf("Error requeuing service delta - will not retry: %v", err))
}
}(deltas, retryDelay)
} else if err != nil {
runtime.HandleError(fmt.Errorf("Failed to process service delta. Not retrying: %v", err))
}
return nil
})
}
}
// Returns an error if processing the delta failed, along with a time.Duration
// Returns an error if processing the service update failed, along with a time.Duration
// indicating whether processing should be retried; zero means no-retry; otherwise
// we should retry in that Duration.
func (s *ServiceController) processDelta(delta *cache.Delta) (error, time.Duration) {
var (
namespacedName types.NamespacedName
cachedService *cachedService
)
func (s *ServiceController) processServiceUpdate(cachedService *cachedService, service *api.Service, key string) (error, time.Duration) {
deltaService, ok := delta.Object.(*api.Service)
if ok {
namespacedName.Namespace = deltaService.Namespace
namespacedName.Name = deltaService.Name
cachedService = s.cache.getOrCreate(namespacedName.String())
} else {
// If the DeltaFIFO saw a key in our cache that it didn't know about, it
// can send a deletion with an unknown state. Grab the service from our
// cache for deleting.
key, ok := delta.Object.(cache.DeletedFinalStateUnknown)
if !ok {
return fmt.Errorf("delta contained object that wasn't a service or a deleted key: %#v", delta), doNotRetry
}
cachedService, ok = s.cache.get(key.Key)
if !ok {
return fmt.Errorf("service %s not in cache even though the watcher thought it was. Ignoring the deletion.", key), doNotRetry
}
deltaService = cachedService.lastState
delta.Object = deltaService
namespacedName = types.NamespacedName{Namespace: deltaService.Namespace, Name: deltaService.Name}
}
glog.V(2).Infof("Got new %s delta for service: %v", delta.Type, namespacedName)
// Ensure that no other goroutine will interfere with our processing of the
// service.
cachedService.mu.Lock()
defer cachedService.mu.Unlock()
// Get the most recent state of the service from the API directly rather than
// trusting the body of the delta. This avoids update re-ordering problems.
// TODO: Handle sync delta types differently rather than doing a get on every
// service every time we sync?
service, err := s.kubeClient.Core().Services(namespacedName.Namespace).Get(namespacedName.Name)
if err != nil && !errors.IsNotFound(err) {
glog.Warningf("Failed to get most recent state of service %v from API (will retry): %v", namespacedName, err)
return err, cachedService.nextRetryDelay()
}
if errors.IsNotFound(err) {
glog.V(2).Infof("Service %v not found, ensuring load balancer is deleted", namespacedName)
s.eventRecorder.Event(deltaService, api.EventTypeNormal, "DeletingLoadBalancer", "Deleting load balancer")
err := s.balancer.EnsureLoadBalancerDeleted(s.clusterName, deltaService)
if err != nil {
message := "Error deleting load balancer (will retry): " + err.Error()
s.eventRecorder.Event(deltaService, api.EventTypeWarning, "DeletingLoadBalancerFailed", message)
return err, cachedService.nextRetryDelay()
}
s.eventRecorder.Event(deltaService, api.EventTypeNormal, "DeletedLoadBalancer", "Deleted load balancer")
s.cache.delete(namespacedName.String())
cachedService.resetRetryDelay()
return nil, doNotRetry
}
// Update the cached service (used above for populating synthetic deletes)
cachedService.lastState = service
err, retry := s.createLoadBalancerIfNeeded(namespacedName, service, cachedService.appliedState)
// cache the service, we need the info for service deletion
cachedService.state = service
err, retry := s.createLoadBalancerIfNeeded(key, service)
if err != nil {
message := "Error creating load balancer"
if retry {
@ -295,8 +244,7 @@ func (s *ServiceController) processDelta(delta *cache.Delta) (error, time.Durati
// NOTE: Since we update the cached service if and only if we successfully
// processed it, a cached service being nil implies that it hasn't yet
// been successfully processed.
cachedService.appliedState = service
s.cache.set(namespacedName.String(), cachedService)
s.cache.set(key, cachedService)
cachedService.resetRetryDelay()
return nil, doNotRetry
@ -304,11 +252,7 @@ func (s *ServiceController) processDelta(delta *cache.Delta) (error, time.Durati
// Returns whatever error occurred along with a boolean indicator of whether it
// should be retried.
func (s *ServiceController) createLoadBalancerIfNeeded(namespacedName types.NamespacedName, service, appliedState *api.Service) (error, bool) {
if appliedState != nil && !s.needsUpdate(appliedState, service) {
glog.Infof("LB doesn't need update for service %s", namespacedName)
return nil, notRetryable
}
func (s *ServiceController) createLoadBalancerIfNeeded(key string, service *api.Service) (error, bool) {
// Note: It is safe to just call EnsureLoadBalancer. But, on some clouds that requires a delete & create,
// which may involve service interruption. Also, we would like user-friendly events.
@ -316,39 +260,18 @@ func (s *ServiceController) createLoadBalancerIfNeeded(namespacedName types.Name
// Save the state so we can avoid a write if it doesn't change
previousState := api.LoadBalancerStatusDeepCopy(&service.Status.LoadBalancer)
if wantsLoadBalancer(service) {
glog.V(2).Infof("Ensuring LB for service %s", namespacedName)
// TODO: We could do a dry-run here if wanted to avoid the spurious cloud-calls & events when we restart
// The load balancer doesn't exist yet, so create it.
s.eventRecorder.Event(service, api.EventTypeNormal, "CreatingLoadBalancer", "Creating load balancer")
err := s.createLoadBalancer(service)
if err != nil {
return fmt.Errorf("failed to create load balancer for service %s: %v", namespacedName, err), retryable
}
s.eventRecorder.Event(service, api.EventTypeNormal, "CreatedLoadBalancer", "Created load balancer")
} else {
if !wantsLoadBalancer(service) {
needDelete := true
if appliedState != nil {
if !wantsLoadBalancer(appliedState) {
needDelete = false
}
} else {
// If we don't have any cached memory of the load balancer, we have to ask
// the cloud provider for what it knows about it.
// Technically EnsureLoadBalancerDeleted can cope, but we want to post meaningful events
_, exists, err := s.balancer.GetLoadBalancer(s.clusterName, service)
if err != nil {
return fmt.Errorf("error getting LB for service %s: %v", namespacedName, err), retryable
}
if !exists {
needDelete = false
}
_, exists, err := s.balancer.GetLoadBalancer(s.clusterName, service)
if err != nil {
return fmt.Errorf("Error getting LB for service %s: %v", key, err), retryable
}
if !exists {
needDelete = false
}
if needDelete {
glog.Infof("Deleting existing load balancer for service %s that no longer needs a load balancer.", namespacedName)
glog.Infof("Deleting existing load balancer for service %s that no longer needs a load balancer.", key)
s.eventRecorder.Event(service, api.EventTypeNormal, "DeletingLoadBalancer", "Deleting load balancer")
if err := s.balancer.EnsureLoadBalancerDeleted(s.clusterName, service); err != nil {
return err, retryable
@ -357,16 +280,28 @@ func (s *ServiceController) createLoadBalancerIfNeeded(namespacedName types.Name
}
service.Status.LoadBalancer = api.LoadBalancerStatus{}
} else {
glog.V(2).Infof("Ensuring LB for service %s", key)
// TODO: We could do a dry-run here if wanted to avoid the spurious cloud-calls & events when we restart
// The load balancer doesn't exist yet, so create it.
s.eventRecorder.Event(service, api.EventTypeNormal, "CreatingLoadBalancer", "Creating load balancer")
err := s.createLoadBalancer(service)
if err != nil {
return fmt.Errorf("Failed to create load balancer for service %s: %v", key, err), retryable
}
s.eventRecorder.Event(service, api.EventTypeNormal, "CreatedLoadBalancer", "Created load balancer")
}
// Write the state if changed
// TODO: Be careful here ... what if there were other changes to the service?
if api.LoadBalancerStatusEqual(previousState, &service.Status.LoadBalancer) {
glog.V(2).Infof("Not persisting unchanged LoadBalancerStatus to registry.")
} else {
if !api.LoadBalancerStatusEqual(previousState, &service.Status.LoadBalancer) {
if err := s.persistUpdate(service); err != nil {
return fmt.Errorf("failed to persist updated status to apiserver, even after retries. Giving up: %v", err), notRetryable
return fmt.Errorf("Failed to persist updated status to apiserver, even after retries. Giving up: %v", err), notRetryable
}
} else {
glog.V(2).Infof("Not persisting unchanged LoadBalancerStatus to registry.")
}
return nil, notRetryable
@ -376,26 +311,25 @@ func (s *ServiceController) persistUpdate(service *api.Service) error {
var err error
for i := 0; i < clientRetryCount; i++ {
_, err = s.kubeClient.Core().Services(service.Namespace).UpdateStatus(service)
switch {
case err == nil:
if err == nil {
return nil
case errors.IsNotFound(err):
// If the object no longer exists, we don't want to recreate it. Just bail
// out so that we can process the delete, which we should soon be receiving
// if we haven't already.
}
// If the object no longer exists, we don't want to recreate it. Just bail
// out so that we can process the delete, which we should soon be receiving
// if we haven't already.
if errors.IsNotFound(err) {
glog.Infof("Not persisting update to service '%s/%s' that no longer exists: %v",
service.Namespace, service.Name, err)
return nil
case errors.IsConflict(err):
// TODO: Try to resolve the conflict if the change was unrelated to load
// balancer status. For now, just rely on the fact that we'll
// also process the update that caused the resource version to change.
}
// TODO: Try to resolve the conflict if the change was unrelated to load
// balancer status. For now, just rely on the fact that we'll
// also process the update that caused the resource version to change.
if errors.IsConflict(err) {
glog.V(4).Infof("Not persisting update to service '%s/%s' that has been changed since we received it: %v",
service.Namespace, service.Name, err)
return nil
}
glog.Warningf("Failed to persist updated LoadBalancerStatus to service '%s/%s' after creating its load balancer: %v",
service.Namespace, service.Name, err)
time.Sleep(clientRetryInterval)
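The loop above retries transient apiserver failures but returns early in the two cases it cannot usefully retry. A stand-alone sketch of that shape, with hypothetical sentinel errors in place of errors.IsNotFound and errors.IsConflict:

package main

import (
	"errors"
	"fmt"
	"time"
)

// Hypothetical sentinels standing in for errors.IsNotFound / errors.IsConflict.
var (
	errNotFound = errors.New("not found")
	errConflict = errors.New("conflict")
)

// persistUpdate retries transient failures, but treats "object gone" and
// "someone else updated it first" as cases to drop rather than retry.
func persistUpdate(update func() error, retries int, interval time.Duration) error {
	var err error
	for i := 0; i < retries; i++ {
		err = update()
		if err == nil {
			return nil
		}
		if errors.Is(err, errNotFound) || errors.Is(err, errConflict) {
			return nil
		}
		fmt.Printf("update failed, will retry: %v\n", err)
		time.Sleep(interval)
	}
	return err
}

func main() {
	calls := 0
	err := persistUpdate(func() error {
		calls++
		if calls < 3 {
			return errors.New("transient apiserver error")
		}
		return nil
	}, 5, 10*time.Millisecond)
	fmt.Println(err, calls) // <nil> 3
}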
@ -415,9 +349,10 @@ func (s *ServiceController) createLoadBalancer(service *api.Service) error {
status, err := s.balancer.EnsureLoadBalancer(s.clusterName, service, hostsFromNodeList(&nodes))
if err != nil {
return err
} else {
service.Status.LoadBalancer = *status
}
service.Status.LoadBalancer = *status
return nil
}
@ -443,12 +378,14 @@ func (s *serviceCache) GetByKey(key string) (interface{}, bool, error) {
return nil, false, nil
}
func (s *serviceCache) allServices() []*cachedService {
// ListKeys implements the interface required by DeltaFIFO to list the keys we
// already know about.
func (s *serviceCache) allServices() []*api.Service {
s.mu.Lock()
defer s.mu.Unlock()
services := make([]*cachedService, 0, len(s.serviceMap))
services := make([]*api.Service, 0, len(s.serviceMap))
for _, v := range s.serviceMap {
services = append(services, v)
services = append(services, v.state)
}
return services
}
@ -681,52 +618,43 @@ func getNodeConditionPredicate() cache.NodeConditionPredicate {
// nodeSyncLoop handles updating the hosts pointed to by all load
// balancers whenever the set of nodes in the cluster changes.
func (s *ServiceController) nodeSyncLoop(period time.Duration) {
var prevHosts []string
var servicesToUpdate []*cachedService
for range time.Tick(period) {
nodes, err := s.nodeLister.NodeCondition(getNodeConditionPredicate()).List()
if err != nil {
glog.Errorf("Failed to retrieve current set of nodes from node lister: %v", err)
continue
}
newHosts := hostsFromNodeSlice(nodes)
if stringSlicesEqual(newHosts, prevHosts) {
// The set of nodes in the cluster hasn't changed, but we can retry
// updating any services that we failed to update last time around.
servicesToUpdate = s.updateLoadBalancerHosts(servicesToUpdate, newHosts)
continue
}
glog.Infof("Detected change in list of current cluster nodes. New node set: %v", newHosts)
// Try updating all services, and save the ones that fail to try again next
// round.
servicesToUpdate = s.cache.allServices()
numServices := len(servicesToUpdate)
servicesToUpdate = s.updateLoadBalancerHosts(servicesToUpdate, newHosts)
glog.Infof("Successfully updated %d out of %d load balancers to direct traffic to the updated set of nodes",
numServices-len(servicesToUpdate), numServices)
prevHosts = newHosts
func (s *ServiceController) nodeSyncLoop() {
var servicesToUpdate []*api.Service
nodes, err := s.nodeLister.NodeCondition(getNodeConditionPredicate()).List()
if err != nil {
glog.Errorf("Failed to retrieve current set of nodes from node lister: %v", err)
return
}
newHosts := hostsFromNodeSlice(nodes)
if stringSlicesEqual(newHosts, s.knownHosts) {
// The set of nodes in the cluster hasn't changed, but we can retry
// updating any services that we failed to update last time around.
servicesToUpdate = s.updateLoadBalancerHosts(servicesToUpdate, newHosts)
return
}
glog.Infof("Detected change in list of current cluster nodes. New node set: %v", newHosts)
// Try updating all services, and save the ones that fail to try again next
// round.
servicesToUpdate = s.cache.allServices()
numServices := len(servicesToUpdate)
servicesToUpdate = s.updateLoadBalancerHosts(servicesToUpdate, newHosts)
glog.Infof("Successfully updated %d out of %d load balancers to direct traffic to the updated set of nodes",
numServices-len(servicesToUpdate), numServices)
s.knownHosts = newHosts
}
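nodeSyncLoop is now a single pass driven by wait.Until rather than an internal time.Tick loop, and the previously seen host set lives on the controller as knownHosts. A stand-alone sketch of the change-detection part, with a toy syncer type in place of the ServiceController:

package main

import (
	"fmt"
	"reflect"
	"sort"
)

type syncer struct {
	knownHosts []string
}

// syncNodes reports whether the load balancers need re-pointing: it sorts the
// current host list and compares it against the set seen on the previous pass.
func (s *syncer) syncNodes(current []string) bool {
	hosts := append([]string(nil), current...)
	sort.Strings(hosts)
	if reflect.DeepEqual(hosts, s.knownHosts) {
		// Unchanged node set; only previously failed services would be retried.
		return false
	}
	s.knownHosts = hosts
	return true
}

func main() {
	s := &syncer{}
	fmt.Println(s.syncNodes([]string{"node-b", "node-a"})) // true: first pass
	fmt.Println(s.syncNodes([]string{"node-a", "node-b"})) // false: same set
}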
// updateLoadBalancerHosts updates all existing load balancers so that
// they will match the list of hosts provided.
// Returns the list of services that couldn't be updated.
func (s *ServiceController) updateLoadBalancerHosts(services []*cachedService, hosts []string) (servicesToRetry []*cachedService) {
func (s *ServiceController) updateLoadBalancerHosts(services []*api.Service, hosts []string) (servicesToRetry []*api.Service) {
for _, service := range services {
func() {
service.mu.Lock()
defer service.mu.Unlock()
// If the applied state is nil, that means it hasn't yet been successfully dealt
// with by the load balancer reconciler. We can trust the load balancer
// reconciler to ensure the service's load balancer is created to target
// the correct nodes.
if service.appliedState == nil {
if service == nil {
return
}
if err := s.lockedUpdateLoadBalancerHosts(service.appliedState, hosts); err != nil {
if err := s.lockedUpdateLoadBalancerHosts(service, hosts); err != nil {
glog.Errorf("External error while updating load balancer: %v.", err)
servicesToRetry = append(servicesToRetry, service)
}
@ -785,3 +713,81 @@ func (s *cachedService) nextRetryDelay() time.Duration {
func (s *cachedService) resetRetryDelay() {
s.lastRetryDelay = time.Duration(0)
}
// syncService will sync the Service with the given key, reading the latest state from the
// service store and reconciling the cloud load balancer with it. This function is not meant
// to be invoked concurrently with the same key.
func (s *ServiceController) syncService(key string) error {
startTime := time.Now()
var cachedService *cachedService
var retryDelay time.Duration
defer func() {
glog.V(4).Infof("Finished syncing service %q (%v)", key, time.Now().Sub(startTime))
}()
// obj holds the latest service info from apiserver
obj, exists, err := s.serviceStore.Store.GetByKey(key)
if err != nil {
glog.Infof("Unable to retrieve service %v from store: %v", key, err)
s.workingQueue.Add(key)
return err
}
if !exists {
// service absence in store means watcher caught the deletion, ensure LB info is cleaned
glog.Infof("Service has been deleted %v", key)
err, retryDelay = s.processServiceDeletion(key)
} else {
service, ok := obj.(*api.Service)
if ok {
cachedService = s.cache.getOrCreate(key)
err, retryDelay = s.processServiceUpdate(cachedService, service, key)
} else {
tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
if !ok {
return fmt.Errorf("object contained wasn't a service or a deleted key: %#v", obj)
}
glog.Infof("Found tombstone for %v", key)
err, retryDelay = s.processServiceDeletion(tombstone.Key)
}
}
if retryDelay != 0 {
// Add the failed service back to the queue so we'll retry it.
glog.Errorf("Failed to process service. Retrying in %s: %v", retryDelay, err)
go func(obj interface{}, delay time.Duration) {
// Put the service key back onto the working queue. More updates for this service may have
// been enqueued during the delay, but that is harmless: when handling the retry we always
// read the latest service state from the service store.
s.workingQueue.AddAfter(obj, delay)
}(key, retryDelay)
} else if err != nil {
runtime.HandleError(fmt.Errorf("Failed to process service. Not retrying: %v", err))
}
return nil
}
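The retry path above re-adds only the key after a delay, which is why duplicate enqueues during the back-off are harmless. A minimal sketch of that delayed requeue using time.AfterFunc in place of workingQueue.AddAfter:

package main

import (
	"fmt"
	"time"
)

type controller struct {
	queue chan string
}

// requeueAfter puts the key (never the stale object) back on the queue after the
// back-off delay, so the retry re-reads the latest state from the store.
func (c *controller) requeueAfter(key string, delay time.Duration) {
	time.AfterFunc(delay, func() { c.queue <- key })
}

func main() {
	c := &controller{queue: make(chan string, 1)}
	c.requeueAfter("default/my-service", 20*time.Millisecond)
	fmt.Println(<-c.queue) // arrives once the delay has elapsed
}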
// Returns an error if processing the service deletion failed, along with a time.Duration
// indicating whether processing should be retried; zero means no-retry; otherwise
// we should retry after that Duration.
func (s *ServiceController) processServiceDeletion(key string) (error, time.Duration) {
cachedService, ok := s.cache.get(key)
if !ok {
return fmt.Errorf("Service %s not in cache even though the watcher thought it was. Ignoring the deletion.", key), doNotRetry
}
service := cachedService.state
// delete load balancer info only if the service type is LoadBalancer
if !wantsLoadBalancer(service) {
return nil, doNotRetry
}
s.eventRecorder.Event(service, api.EventTypeNormal, "DeletingLoadBalancer", "Deleting load balancer")
err := s.balancer.EnsureLoadBalancerDeleted(s.clusterName, service)
if err != nil {
message := "Error deleting load balancer (will retry): " + err.Error()
s.eventRecorder.Event(service, api.EventTypeWarning, "DeletingLoadBalancerFailed", message)
return err, cachedService.nextRetryDelay()
}
s.eventRecorder.Event(service, api.EventTypeNormal, "DeletedLoadBalancer", "Deleted load balancer")
s.cache.delete(key)
cachedService.resetRetryDelay()
return nil, doNotRetry
}
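Because the deleted object is no longer in the store, processServiceDeletion works from the state cached during the last successful sync. A toy sketch of that cache-backed deletion path (simplified service and cache types, not the real ones):

package main

import "fmt"

type service struct {
	Type string // "LoadBalancer" or "ClusterIP" in this sketch
}

type serviceCache struct {
	m map[string]*service
}

// processDeletion uses the cached copy of the service, since the apiserver
// object is already gone, and only tears down cloud resources for LB services.
func (c *serviceCache) processDeletion(key string) {
	svc, ok := c.m[key]
	if !ok {
		fmt.Printf("service %s not in cache, ignoring deletion\n", key)
		return
	}
	if svc.Type == "LoadBalancer" {
		fmt.Println("deleting load balancer for", key)
	}
	delete(c.m, key)
}

func main() {
	c := &serviceCache{m: map[string]*service{
		"default/my-service": {Type: "LoadBalancer"},
	}}
	c.processDeletion("default/my-service")
	c.processDeletion("default/gone")
}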


@ -21,6 +21,7 @@ import (
"testing"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/api/testapi"
"k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset/fake"
fakecloud "k8s.io/kubernetes/pkg/cloudprovider/providers/fake"
"k8s.io/kubernetes/pkg/types"
@ -29,7 +30,7 @@ import (
const region = "us-central"
func newService(name string, uid types.UID, serviceType api.ServiceType) *api.Service {
return &api.Service{ObjectMeta: api.ObjectMeta{Name: name, Namespace: "namespace", UID: uid}, Spec: api.ServiceSpec{Type: serviceType}}
return &api.Service{ObjectMeta: api.ObjectMeta{Name: name, Namespace: "namespace", UID: uid, SelfLink: testapi.Default.SelfLink("services", name)}, Spec: api.ServiceSpec{Type: serviceType}}
}
func TestCreateExternalLoadBalancer(t *testing.T) {
@ -56,6 +57,7 @@ func TestCreateExternalLoadBalancer(t *testing.T) {
ObjectMeta: api.ObjectMeta{
Name: "udp-service",
Namespace: "default",
SelfLink: testapi.Default.SelfLink("services", "udp-service"),
},
Spec: api.ServiceSpec{
Ports: []api.ServicePort{{
@ -73,6 +75,7 @@ func TestCreateExternalLoadBalancer(t *testing.T) {
ObjectMeta: api.ObjectMeta{
Name: "basic-service1",
Namespace: "default",
SelfLink: testapi.Default.SelfLink("services", "basic-service1"),
},
Spec: api.ServiceSpec{
Ports: []api.ServicePort{{
@ -91,11 +94,11 @@ func TestCreateExternalLoadBalancer(t *testing.T) {
cloud := &fakecloud.FakeCloud{}
cloud.Region = region
client := &fake.Clientset{}
controller := New(cloud, client, "test-cluster")
controller, _ := New(cloud, client, "test-cluster")
controller.init()
cloud.Calls = nil // ignore any cloud calls made in init()
client.ClearActions() // ignore any client calls made in init()
err, _ := controller.createLoadBalancerIfNeeded(types.NamespacedName{Namespace: "foo", Name: "bar"}, item.service, nil)
err, _ := controller.createLoadBalancerIfNeeded("foo/bar", item.service)
if !item.expectErr && err != nil {
t.Errorf("unexpected error: %v", err)
} else if item.expectErr && err == nil {
@ -211,13 +214,13 @@ func TestUpdateNodesInExternalLoadBalancer(t *testing.T) {
cloud.Region = region
client := &fake.Clientset{}
controller := New(cloud, client, "test-cluster2")
controller, _ := New(cloud, client, "test-cluster2")
controller.init()
cloud.Calls = nil // ignore any cloud calls made in init()
var services []*cachedService
var services []*api.Service
for _, service := range item.services {
services = append(services, &cachedService{lastState: service, appliedState: service})
services = append(services, service)
}
if err := controller.updateLoadBalancerHosts(services, hosts); err != nil {
t.Errorf("unexpected error: %v", err)