diff --git a/pkg/cloudprovider/servicecontroller/servicecontroller.go b/pkg/cloudprovider/servicecontroller/servicecontroller.go index a13c867b2e5..161f40548d1 100644 --- a/pkg/cloudprovider/servicecontroller/servicecontroller.go +++ b/pkg/cloudprovider/servicecontroller/servicecontroller.go @@ -30,7 +30,6 @@ import ( "github.com/GoogleCloudPlatform/kubernetes/pkg/client/record" "github.com/GoogleCloudPlatform/kubernetes/pkg/cloudprovider" "github.com/GoogleCloudPlatform/kubernetes/pkg/fields" - "github.com/GoogleCloudPlatform/kubernetes/pkg/labels" "github.com/GoogleCloudPlatform/kubernetes/pkg/types" "github.com/GoogleCloudPlatform/kubernetes/pkg/util" "github.com/golang/glog" @@ -75,6 +74,7 @@ type ServiceController struct { cache *serviceCache eventBroadcaster record.EventBroadcaster eventRecorder record.EventRecorder + nodeLister cache.StoreToNodeLister } // New returns a new service controller to keep cloud provider service resources @@ -91,6 +91,9 @@ func New(cloud cloudprovider.Interface, kubeClient client.Interface, clusterName cache: &serviceCache{serviceMap: make(map[string]*cachedService)}, eventBroadcaster: broadcaster, eventRecorder: recorder, + nodeLister: cache.StoreToNodeLister{ + Store: cache.NewStore(cache.MetaNamespaceKeyFunc), + }, } } @@ -99,6 +102,9 @@ func New(cloud cloudprovider.Interface, kubeClient client.Interface, clusterName // load balancers created and deleted appropriately. // nodeSyncPeriod controls how often we check the cluster's nodes to determine // if external load balancers need to be updated to point to a new set. +// +// It's an error to call Run() more than once for a given ServiceController +// object. func (s *ServiceController) Run(nodeSyncPeriod time.Duration) error { if err := s.init(); err != nil { return err @@ -113,19 +119,27 @@ func (s *ServiceController) Run(nodeSyncPeriod time.Duration) error { // Get the currently existing set of services and then all future creates // and updates of services. - // No delta compressor is needed for the DeltaFIFO queue because we only ever + // A delta compressor is needed for the DeltaFIFO queue because we only ever // care about the most recent state. - serviceQueue := cache.NewDeltaFIFO(cache.MetaNamespaceKeyFunc, nil, s.cache) + serviceQueue := cache.NewDeltaFIFO( + cache.MetaNamespaceKeyFunc, + cache.DeltaCompressorFunc(func(d cache.Deltas) cache.Deltas { + if len(d) == 0 { + return d + } + return cache.Deltas{*d.Newest()} + }), + s.cache, + ) lw := cache.NewListWatchFromClient(s.kubeClient.(*client.Client), "services", api.NamespaceAll, fields.Everything()) cache.NewReflector(lw, &api.Service{}, serviceQueue, 0).Run() for i := 0; i < workerGoroutines; i++ { go s.watchServices(serviceQueue) } - nodeLister := &cache.StoreToNodeLister{cache.NewStore(cache.MetaNamespaceKeyFunc)} nodeLW := cache.NewListWatchFromClient(s.kubeClient.(*client.Client), "nodes", api.NamespaceAll, fields.Everything()) - cache.NewReflector(nodeLW, &api.Node{}, nodeLister.Store, 0).Run() - go s.nodeSyncLoop(nodeLister, nodeSyncPeriod) + cache.NewReflector(nodeLW, &api.Node{}, s.nodeLister.Store, 0).Run() + go s.nodeSyncLoop(nodeSyncPeriod) return nil } @@ -342,7 +356,7 @@ func (s *ServiceController) createExternalLoadBalancer(service *api.Service) err if err != nil { return err } - nodes, err := s.kubeClient.Nodes().List(labels.Everything(), fields.Everything()) + nodes, err := s.nodeLister.List() if err != nil { return err } @@ -352,7 +366,7 @@ func (s *ServiceController) createExternalLoadBalancer(service *api.Service) err // TODO: Make this actually work for multiple IPs by using different // names for each. For now, we'll just create the first and break. status, err := s.balancer.CreateTCPLoadBalancer(name, s.zone.Region, net.ParseIP(publicIP), - ports, hostsFromNodeList(nodes), service.Spec.SessionAffinity) + ports, hostsFromNodeList(&nodes), service.Spec.SessionAffinity) if err != nil { return err } else { @@ -362,7 +376,7 @@ func (s *ServiceController) createExternalLoadBalancer(service *api.Service) err } } else { status, err := s.balancer.CreateTCPLoadBalancer(name, s.zone.Region, nil, - ports, hostsFromNodeList(nodes), service.Spec.SessionAffinity) + ports, hostsFromNodeList(&nodes), service.Spec.SessionAffinity) if err != nil { return err } else { @@ -570,7 +584,7 @@ func hostsFromNodeList(list *api.NodeList) []string { // nodeSyncLoop handles updating the hosts pointed to by all external load // balancers whenever the set of nodes in the cluster changes. -func (s *ServiceController) nodeSyncLoop(nodeLister *cache.StoreToNodeLister, period time.Duration) { +func (s *ServiceController) nodeSyncLoop(period time.Duration) { var prevHosts []string var servicesToUpdate []*cachedService // TODO: Eliminate the unneeded now variable once we stop compiling in go1.3. @@ -578,7 +592,7 @@ func (s *ServiceController) nodeSyncLoop(nodeLister *cache.StoreToNodeLister, pe // something to compile, and gofmt1.4 complains about using `_ = range`. for now := range time.Tick(period) { _ = now - nodes, err := nodeLister.List() + nodes, err := s.nodeLister.List() if err != nil { glog.Errorf("Failed to retrieve current set of nodes from node lister: %v", err) continue