diff --git a/cmd/cloud-controller-manager/app/controllermanager.go b/cmd/cloud-controller-manager/app/controllermanager.go
index 2b9fdd58adf..85c1fc60649 100644
--- a/cmd/cloud-controller-manager/app/controllermanager.go
+++ b/cmd/cloud-controller-manager/app/controllermanager.go
@@ -216,11 +216,17 @@ func StartControllers(s *options.CloudControllerManagerServer, kubeconfig *restc
 	time.Sleep(wait.Jitter(s.ControllerStartInterval.Duration, ControllerStartJitter))
 
 	// Start the service controller
-	serviceController, err := servicecontroller.New(cloud, client("service-controller"), s.ClusterName)
+	serviceController, err := servicecontroller.New(
+		cloud,
+		client("service-controller"),
+		newSharedInformers.Core().V1().Services(),
+		newSharedInformers.Core().V1().Nodes(),
+		s.ClusterName,
+	)
 	if err != nil {
 		glog.Errorf("Failed to start service controller: %v", err)
 	} else {
-		serviceController.Run(int(s.ConcurrentServiceSyncs))
+		go serviceController.Run(stop, int(s.ConcurrentServiceSyncs))
 	}
 
 	time.Sleep(wait.Jitter(s.ControllerStartInterval.Duration, ControllerStartJitter))
diff --git a/cmd/kube-controller-manager/app/controllermanager.go b/cmd/kube-controller-manager/app/controllermanager.go
index 791f3b41c32..b89f17087e7 100644
--- a/cmd/kube-controller-manager/app/controllermanager.go
+++ b/cmd/kube-controller-manager/app/controllermanager.go
@@ -450,11 +450,17 @@ func StartControllers(controllers map[string]InitFunc, s *options.CMServer, root
 	nodeController.Run()
 	time.Sleep(wait.Jitter(s.ControllerStartInterval.Duration, ControllerStartJitter))
 
-	serviceController, err := servicecontroller.New(cloud, clientBuilder.ClientOrDie("service-controller"), s.ClusterName)
+	serviceController, err := servicecontroller.New(
+		cloud,
+		clientBuilder.ClientOrDie("service-controller"),
+		newSharedInformers.Core().V1().Services(),
+		newSharedInformers.Core().V1().Nodes(),
+		s.ClusterName,
+	)
 	if err != nil {
 		glog.Errorf("Failed to start service controller: %v", err)
 	} else {
-		serviceController.Run(int(s.ConcurrentServiceSyncs))
+		go serviceController.Run(stop, int(s.ConcurrentServiceSyncs))
 	}
 
 	time.Sleep(wait.Jitter(s.ControllerStartInterval.Duration, ControllerStartJitter))
diff --git a/pkg/controller/service/BUILD b/pkg/controller/service/BUILD
index d460f512361..fc3ffc6230f 100644
--- a/pkg/controller/service/BUILD
+++ b/pkg/controller/service/BUILD
@@ -19,18 +19,16 @@ go_library(
         "//pkg/api:go_default_library",
         "//pkg/api/v1:go_default_library",
         "//pkg/client/clientset_generated/clientset:go_default_library",
-        "//pkg/client/legacylisters:go_default_library",
+        "//pkg/client/informers/informers_generated/externalversions/core/v1:go_default_library",
+        "//pkg/client/listers/core/v1:go_default_library",
         "//pkg/cloudprovider:go_default_library",
         "//pkg/controller:go_default_library",
         "//pkg/util/metrics:go_default_library",
         "//vendor:github.com/golang/glog",
         "//vendor:k8s.io/apimachinery/pkg/api/errors",
-        "//vendor:k8s.io/apimachinery/pkg/apis/meta/v1",
-        "//vendor:k8s.io/apimachinery/pkg/fields",
-        "//vendor:k8s.io/apimachinery/pkg/runtime",
+        "//vendor:k8s.io/apimachinery/pkg/labels",
         "//vendor:k8s.io/apimachinery/pkg/util/runtime",
         "//vendor:k8s.io/apimachinery/pkg/util/wait",
-        "//vendor:k8s.io/apimachinery/pkg/watch",
         "//vendor:k8s.io/client-go/kubernetes/typed/core/v1",
         "//vendor:k8s.io/client-go/pkg/api/v1",
         "//vendor:k8s.io/client-go/tools/cache",
@@ -48,7 +46,9 @@ go_test(
         "//pkg/api/testapi:go_default_library",
         "//pkg/api/v1:go_default_library",
"//pkg/client/clientset_generated/clientset/fake:go_default_library", + "//pkg/client/informers/informers_generated/externalversions:go_default_library", "//pkg/cloudprovider/providers/fake:go_default_library", + "//pkg/controller:go_default_library", "//vendor:k8s.io/apimachinery/pkg/apis/meta/v1", "//vendor:k8s.io/apimachinery/pkg/types", "//vendor:k8s.io/client-go/tools/record", diff --git a/pkg/controller/service/servicecontroller.go b/pkg/controller/service/servicecontroller.go index 6167040b24d..6a9066b257c 100644 --- a/pkg/controller/service/servicecontroller.go +++ b/pkg/controller/service/servicecontroller.go @@ -26,12 +26,9 @@ import ( "github.com/golang/glog" "k8s.io/apimachinery/pkg/api/errors" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/fields" - pkgruntime "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/util/runtime" "k8s.io/apimachinery/pkg/util/wait" - "k8s.io/apimachinery/pkg/watch" v1core "k8s.io/client-go/kubernetes/typed/core/v1" clientv1 "k8s.io/client-go/pkg/api/v1" "k8s.io/client-go/tools/cache" @@ -40,7 +37,8 @@ import ( "k8s.io/kubernetes/pkg/api" "k8s.io/kubernetes/pkg/api/v1" "k8s.io/kubernetes/pkg/client/clientset_generated/clientset" - "k8s.io/kubernetes/pkg/client/legacylisters" + coreinformers "k8s.io/kubernetes/pkg/client/informers/informers_generated/externalversions/core/v1" + corelisters "k8s.io/kubernetes/pkg/client/listers/core/v1" "k8s.io/kubernetes/pkg/cloudprovider" "k8s.io/kubernetes/pkg/controller" "k8s.io/kubernetes/pkg/util/metrics" @@ -80,28 +78,33 @@ type serviceCache struct { } type ServiceController struct { - cloud cloudprovider.Interface - knownHosts []*v1.Node - servicesToUpdate []*v1.Service - kubeClient clientset.Interface - clusterName string - balancer cloudprovider.LoadBalancer - zone cloudprovider.Zone - cache *serviceCache - // A store of services, populated by the serviceController - serviceStore listers.StoreToServiceLister - // Watches changes to all services - serviceController cache.Controller - eventBroadcaster record.EventBroadcaster - eventRecorder record.EventRecorder - nodeLister listers.StoreToNodeLister + cloud cloudprovider.Interface + knownHosts []*v1.Node + servicesToUpdate []*v1.Service + kubeClient clientset.Interface + clusterName string + balancer cloudprovider.LoadBalancer + zone cloudprovider.Zone + cache *serviceCache + serviceLister corelisters.ServiceLister + serviceListerSynced cache.InformerSynced + eventBroadcaster record.EventBroadcaster + eventRecorder record.EventRecorder + nodeLister corelisters.NodeLister + nodeListerSynced cache.InformerSynced // services that need to be synced workingQueue workqueue.DelayingInterface } // New returns a new service controller to keep cloud provider service resources // (like load balancers) in sync with the registry. 
-func New(cloud cloudprovider.Interface, kubeClient clientset.Interface, clusterName string) (*ServiceController, error) {
+func New(
+	cloud cloudprovider.Interface,
+	kubeClient clientset.Interface,
+	serviceInformer coreinformers.ServiceInformer,
+	nodeInformer coreinformers.NodeInformer,
+	clusterName string,
+) (*ServiceController, error) {
 	broadcaster := record.NewBroadcaster()
 	broadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: v1core.New(kubeClient.Core().RESTClient()).Events("")})
 	recorder := broadcaster.NewRecorder(api.Scheme, clientv1.EventSource{Component: "service-controller"})
@@ -118,22 +121,12 @@ func New(cloud cloudprovider.Interface, kubeClient clientset.Interface, clusterN
 		cache:            &serviceCache{serviceMap: make(map[string]*cachedService)},
 		eventBroadcaster: broadcaster,
 		eventRecorder:    recorder,
-		nodeLister: listers.StoreToNodeLister{
-			Store: cache.NewStore(cache.MetaNamespaceKeyFunc),
-		},
-		workingQueue: workqueue.NewDelayingQueue(),
+		nodeLister:       nodeInformer.Lister(),
+		nodeListerSynced: nodeInformer.Informer().HasSynced,
+		workingQueue:     workqueue.NewNamedDelayingQueue("service"),
 	}
-	s.serviceStore.Indexer, s.serviceController = cache.NewIndexerInformer(
-		&cache.ListWatch{
-			ListFunc: func(options metav1.ListOptions) (pkgruntime.Object, error) {
-				return s.kubeClient.Core().Services(metav1.NamespaceAll).List(options)
-			},
-			WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
-				return s.kubeClient.Core().Services(metav1.NamespaceAll).Watch(options)
-			},
-		},
-		&v1.Service{},
-		serviceSyncPeriod,
+
+	serviceInformer.Informer().AddEventHandlerWithResyncPeriod(
 		cache.ResourceEventHandlerFuncs{
 			AddFunc: s.enqueueService,
 			UpdateFunc: func(old, cur interface{}) {
@@ -145,8 +138,11 @@ func New(cloud cloudprovider.Interface, kubeClient clientset.Interface, clusterN
 			},
 			DeleteFunc: s.enqueueService,
 		},
-		cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc},
+		serviceSyncPeriod,
 	)
+	s.serviceLister = serviceInformer.Lister()
+	s.serviceListerSynced = serviceInformer.Informer().HasSynced
+
 	if err := s.init(); err != nil {
 		return nil, err
 	}
@@ -173,15 +169,24 @@ func (s *ServiceController) enqueueService(obj interface{}) {
 //
 // It's an error to call Run() more than once for a given ServiceController
 // object.
-func (s *ServiceController) Run(workers int) {
+func (s *ServiceController) Run(stopCh <-chan struct{}, workers int) {
 	defer runtime.HandleCrash()
-	go s.serviceController.Run(wait.NeverStop)
-	for i := 0; i < workers; i++ {
-		go wait.Until(s.worker, time.Second, wait.NeverStop)
+	defer s.workingQueue.ShutDown()
+
+	glog.Info("Starting service controller")
+
+	if !cache.WaitForCacheSync(stopCh, s.serviceListerSynced, s.nodeListerSynced) {
+		runtime.HandleError(fmt.Errorf("timed out waiting for caches to sync"))
 	}
-	nodeLW := cache.NewListWatchFromClient(s.kubeClient.Core().RESTClient(), "nodes", metav1.NamespaceAll, fields.Everything())
-	cache.NewReflector(nodeLW, &v1.Node{}, s.nodeLister.Store, 0).Run()
-	go wait.Until(s.nodeSyncLoop, nodeSyncPeriod, wait.NeverStop)
+
+	for i := 0; i < workers; i++ {
+		go wait.Until(s.worker, time.Second, stopCh)
+	}
+
+	go wait.Until(s.nodeSyncLoop, nodeSyncPeriod, stopCh)
+
+	<-stopCh
+	glog.Info("Stopping service controller")
 }
 
 // worker runs a worker thread that just dequeues items, processes them, and marks them done.
@@ -258,12 +263,13 @@ func (s *ServiceController) processServiceUpdate(cachedService *cachedService, s
 // Returns whatever error occurred along with a boolean indicator of whether it
 // should be retried.
 func (s *ServiceController) createLoadBalancerIfNeeded(key string, service *v1.Service) (error, bool) {
-
 	// Note: It is safe to just call EnsureLoadBalancer. But, on some clouds that requires a delete & create,
 	// which may involve service interruption. Also, we would like user-friendly events.
 
 	// Save the state so we can avoid a write if it doesn't change
 	previousState := v1.LoadBalancerStatusDeepCopy(&service.Status.LoadBalancer)
+	var newState *v1.LoadBalancerStatus
+	var err error
 
 	if !wantsLoadBalancer(service) {
 		needDelete := true
@@ -284,7 +290,7 @@ func (s *ServiceController) createLoadBalancerIfNeeded(key string, service *v1.S
 			s.eventRecorder.Event(service, v1.EventTypeNormal, "DeletedLoadBalancer", "Deleted load balancer")
 		}
 
-		service.Status.LoadBalancer = v1.LoadBalancerStatus{}
+		newState = &v1.LoadBalancerStatus{}
 	} else {
 		glog.V(2).Infof("Ensuring LB for service %s", key)
 
@@ -292,7 +298,7 @@ func (s *ServiceController) createLoadBalancerIfNeeded(key string, service *v1.S
 
 		// The load balancer doesn't exist yet, so create it.
 		s.eventRecorder.Event(service, v1.EventTypeNormal, "CreatingLoadBalancer", "Creating load balancer")
-		err := s.createLoadBalancer(service)
+		newState, err = s.createLoadBalancer(service)
 		if err != nil {
 			return fmt.Errorf("Failed to create load balancer for service %s: %v", key, err), retryable
 		}
@@ -301,7 +307,17 @@ func (s *ServiceController) createLoadBalancerIfNeeded(key string, service *v1.S
 
 	// Write the state if changed
 	// TODO: Be careful here ... what if there were other changes to the service?
-	if !v1.LoadBalancerStatusEqual(previousState, &service.Status.LoadBalancer) {
+	if !v1.LoadBalancerStatusEqual(previousState, newState) {
+		// Make a copy so we don't mutate the shared informer cache
+		copy, err := api.Scheme.DeepCopy(service)
+		if err != nil {
+			return err, retryable
+		}
+		service = copy.(*v1.Service)
+
+		// Update the status on the copy
+		service.Status.LoadBalancer = *newState
+
 		if err := s.persistUpdate(service); err != nil {
 			return fmt.Errorf("Failed to persist updated status to apiserver, even after retries. Giving up: %v", err), notRetryable
 		}
@@ -340,30 +356,23 @@ func (s *ServiceController) persistUpdate(service *v1.Service) error {
 	return err
 }
 
-func (s *ServiceController) createLoadBalancer(service *v1.Service) error {
-	nodes, err := s.nodeLister.List()
+func (s *ServiceController) createLoadBalancer(service *v1.Service) (*v1.LoadBalancerStatus, error) {
+	nodes, err := s.nodeLister.List(labels.Everything())
 	if err != nil {
-		return err
+		return nil, err
 	}
 
 	lbNodes := []*v1.Node{}
-	for ix := range nodes.Items {
-		if includeNodeFromNodeList(&nodes.Items[ix]) {
-			lbNodes = append(lbNodes, &nodes.Items[ix])
+	for ix := range nodes {
+		if includeNodeFromNodeList(nodes[ix]) {
+			lbNodes = append(lbNodes, nodes[ix])
 		}
 	}
 
 	// - Only one protocol supported per service
 	// - Not all cloud providers support all protocols and the next step is expected to return
 	// an error for unsupported protocols
-	status, err := s.balancer.EnsureLoadBalancer(s.clusterName, service, lbNodes)
-	if err != nil {
-		return err
-	} else {
-		service.Status.LoadBalancer = *status
-	}
-
-	return nil
+	return s.balancer.EnsureLoadBalancer(s.clusterName, service, lbNodes)
 }
 
 // ListKeys implements the interface required by DeltaFIFO to list the keys we
@@ -604,7 +613,7 @@ func includeNodeFromNodeList(node *v1.Node) bool {
 	return !node.Spec.Unschedulable
 }
 
-func getNodeConditionPredicate() listers.NodeConditionPredicate {
+func getNodeConditionPredicate() corelisters.NodeConditionPredicate {
 	return func(node *v1.Node) bool {
 		// We add the master to the node list, but its unschedulable. So we use this to filter
 		// the master.
@@ -631,7 +640,7 @@ func getNodeConditionPredicate() listers.NodeConditionPredicate {
 // nodeSyncLoop handles updating the hosts pointed to by all load
 // balancers whenever the set of nodes in the cluster changes.
 func (s *ServiceController) nodeSyncLoop() {
-	newHosts, err := s.nodeLister.NodeCondition(getNodeConditionPredicate()).List()
+	newHosts, err := s.nodeLister.ListWithPredicate(getNodeConditionPredicate())
 	if err != nil {
 		glog.Errorf("Failed to retrieve current set of nodes from node lister: %v", err)
 		return
@@ -736,30 +745,26 @@ func (s *ServiceController) syncService(key string) error {
 	defer func() {
 		glog.V(4).Infof("Finished syncing service %q (%v)", key, time.Now().Sub(startTime))
 	}()
-	// obj holds the latest service info from apiserver
-	obj, exists, err := s.serviceStore.Indexer.GetByKey(key)
+
+	namespace, name, err := cache.SplitMetaNamespaceKey(key)
 	if err != nil {
-		glog.Infof("Unable to retrieve service %v from store: %v", key, err)
-		s.workingQueue.Add(key)
 		return err
 	}
-	if !exists {
+
+	// service holds the latest service info from apiserver
+	service, err := s.serviceLister.Services(namespace).Get(name)
+	switch {
+	case errors.IsNotFound(err):
 		// service absence in store means watcher caught the deletion, ensure LB info is cleaned
 		glog.Infof("Service has been deleted %v", key)
 		err, retryDelay = s.processServiceDeletion(key)
-	} else {
-		service, ok := obj.(*v1.Service)
-		if ok {
-			cachedService = s.cache.getOrCreate(key)
-			err, retryDelay = s.processServiceUpdate(cachedService, service, key)
-		} else {
-			tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
-			if !ok {
-				return fmt.Errorf("object contained wasn't a service or a deleted key: %#v", obj)
-			}
-			glog.Infof("Found tombstone for %v", key)
-			err, retryDelay = s.processServiceDeletion(tombstone.Key)
-		}
+	case err != nil:
+		glog.Infof("Unable to retrieve service %v from store: %v", key, err)
+		s.workingQueue.Add(key)
+		return err
+	default:
+		cachedService = s.cache.getOrCreate(key)
+		err, retryDelay = s.processServiceUpdate(cachedService, service, key)
 	}
 
 	if retryDelay != 0 {
diff --git a/pkg/controller/service/servicecontroller_test.go b/pkg/controller/service/servicecontroller_test.go
index 8b1e8bf9c36..066f02841a0 100644
--- a/pkg/controller/service/servicecontroller_test.go
+++ b/pkg/controller/service/servicecontroller_test.go
@@ -26,7 +26,9 @@ import (
 	"k8s.io/kubernetes/pkg/api/testapi"
 	"k8s.io/kubernetes/pkg/api/v1"
 	"k8s.io/kubernetes/pkg/client/clientset_generated/clientset/fake"
+	informers "k8s.io/kubernetes/pkg/client/informers/informers_generated/externalversions"
 	fakecloud "k8s.io/kubernetes/pkg/cloudprovider/providers/fake"
+	"k8s.io/kubernetes/pkg/controller"
 )
 
 const region = "us-central"
@@ -35,6 +37,30 @@ func newService(name string, uid types.UID, serviceType v1.ServiceType) *v1.Serv
 	return &v1.Service{ObjectMeta: metav1.ObjectMeta{Name: name, Namespace: "namespace", UID: uid, SelfLink: testapi.Default.SelfLink("services", name)}, Spec: v1.ServiceSpec{Type: serviceType}}
 }
 
+func alwaysReady() bool { return true }
+
+func newController() (*ServiceController, *fakecloud.FakeCloud, *fake.Clientset) {
+	cloud := &fakecloud.FakeCloud{}
+	cloud.Region = region
+
+	client := fake.NewSimpleClientset()
+
+	informerFactory := informers.NewSharedInformerFactory(client, controller.NoResyncPeriodFunc())
+	serviceInformer := informerFactory.Core().V1().Services()
+	nodeInformer := informerFactory.Core().V1().Nodes()
+
+	controller, _ := New(cloud, client, serviceInformer, nodeInformer, "test-cluster")
+	controller.nodeListerSynced = alwaysReady
+	controller.serviceListerSynced = alwaysReady
+	controller.eventRecorder = record.NewFakeRecorder(100)
+
+	controller.init()
+	cloud.Calls = nil     // ignore any cloud calls made in init()
+	client.ClearActions() // ignore any client calls made in init()
+
+	return controller, cloud, client
+}
+
 func TestCreateExternalLoadBalancer(t *testing.T) {
 	table := []struct {
 		service *v1.Service
@@ -93,14 +119,7 @@ func TestCreateExternalLoadBalancer(t *testing.T) {
 	}
 
 	for _, item := range table {
-		cloud := &fakecloud.FakeCloud{}
-		cloud.Region = region
-		client := &fake.Clientset{}
-		controller, _ := New(cloud, client, "test-cluster")
-		controller.eventRecorder = record.NewFakeRecorder(100)
-		controller.init()
-		cloud.Calls = nil     // ignore any cloud calls made in init()
-		client.ClearActions() // ignore any client calls made in init()
+		controller, cloud, client := newController()
 		err, _ := controller.createLoadBalancerIfNeeded("foo/bar", item.service)
 		if !item.expectErr && err != nil {
 			t.Errorf("unexpected error: %v", err)
@@ -217,14 +236,7 @@ func TestUpdateNodesInExternalLoadBalancer(t *testing.T) {
 		},
 	}
 
 	for _, item := range table {
-		cloud := &fakecloud.FakeCloud{}
-
-		cloud.Region = region
-		client := &fake.Clientset{}
-		controller, _ := New(cloud, client, "test-cluster2")
-		controller.eventRecorder = record.NewFakeRecorder(100)
-		controller.init()
-		cloud.Calls = nil // ignore any cloud calls made in init()
+		controller, cloud, _ := newController()
 		var services []*v1.Service
 		for _, service := range item.services {