Merge pull request #9413 from lavalamp/serviceFix

Don't allow services to get stuck on an intermediate bad state
This commit is contained in:
Prashanth B 2015-07-23 17:46:41 -07:00
commit 90a2b7d91e

View File

@ -30,7 +30,6 @@ import (
"github.com/GoogleCloudPlatform/kubernetes/pkg/client/record"
"github.com/GoogleCloudPlatform/kubernetes/pkg/cloudprovider"
"github.com/GoogleCloudPlatform/kubernetes/pkg/fields"
"github.com/GoogleCloudPlatform/kubernetes/pkg/labels"
"github.com/GoogleCloudPlatform/kubernetes/pkg/types"
"github.com/GoogleCloudPlatform/kubernetes/pkg/util"
"github.com/golang/glog"
@ -75,6 +74,7 @@ type ServiceController struct {
cache *serviceCache
eventBroadcaster record.EventBroadcaster
eventRecorder record.EventRecorder
nodeLister cache.StoreToNodeLister
}
// New returns a new service controller to keep cloud provider service resources
@ -91,6 +91,9 @@ func New(cloud cloudprovider.Interface, kubeClient client.Interface, clusterName
cache: &serviceCache{serviceMap: make(map[string]*cachedService)},
eventBroadcaster: broadcaster,
eventRecorder: recorder,
nodeLister: cache.StoreToNodeLister{
Store: cache.NewStore(cache.MetaNamespaceKeyFunc),
},
}
}
@ -99,6 +102,9 @@ func New(cloud cloudprovider.Interface, kubeClient client.Interface, clusterName
// load balancers created and deleted appropriately.
// nodeSyncPeriod controls how often we check the cluster's nodes to determine
// if external load balancers need to be updated to point to a new set.
//
// It's an error to call Run() more than once for a given ServiceController
// object.
func (s *ServiceController) Run(nodeSyncPeriod time.Duration) error {
if err := s.init(); err != nil {
return err
@ -113,19 +119,27 @@ func (s *ServiceController) Run(nodeSyncPeriod time.Duration) error {
// Get the currently existing set of services and then all future creates
// and updates of services.
// No delta compressor is needed for the DeltaFIFO queue because we only ever
// A delta compressor is needed for the DeltaFIFO queue because we only ever
// care about the most recent state.
serviceQueue := cache.NewDeltaFIFO(cache.MetaNamespaceKeyFunc, nil, s.cache)
serviceQueue := cache.NewDeltaFIFO(
cache.MetaNamespaceKeyFunc,
cache.DeltaCompressorFunc(func(d cache.Deltas) cache.Deltas {
if len(d) == 0 {
return d
}
return cache.Deltas{*d.Newest()}
}),
s.cache,
)
lw := cache.NewListWatchFromClient(s.kubeClient.(*client.Client), "services", api.NamespaceAll, fields.Everything())
cache.NewReflector(lw, &api.Service{}, serviceQueue, 0).Run()
for i := 0; i < workerGoroutines; i++ {
go s.watchServices(serviceQueue)
}
nodeLister := &cache.StoreToNodeLister{cache.NewStore(cache.MetaNamespaceKeyFunc)}
nodeLW := cache.NewListWatchFromClient(s.kubeClient.(*client.Client), "nodes", api.NamespaceAll, fields.Everything())
cache.NewReflector(nodeLW, &api.Node{}, nodeLister.Store, 0).Run()
go s.nodeSyncLoop(nodeLister, nodeSyncPeriod)
cache.NewReflector(nodeLW, &api.Node{}, s.nodeLister.Store, 0).Run()
go s.nodeSyncLoop(nodeSyncPeriod)
return nil
}
@ -342,7 +356,7 @@ func (s *ServiceController) createExternalLoadBalancer(service *api.Service) err
if err != nil {
return err
}
nodes, err := s.kubeClient.Nodes().List(labels.Everything(), fields.Everything())
nodes, err := s.nodeLister.List()
if err != nil {
return err
}
@ -352,7 +366,7 @@ func (s *ServiceController) createExternalLoadBalancer(service *api.Service) err
// TODO: Make this actually work for multiple IPs by using different
// names for each. For now, we'll just create the first and break.
status, err := s.balancer.CreateTCPLoadBalancer(name, s.zone.Region, net.ParseIP(publicIP),
ports, hostsFromNodeList(nodes), service.Spec.SessionAffinity)
ports, hostsFromNodeList(&nodes), service.Spec.SessionAffinity)
if err != nil {
return err
} else {
@ -362,7 +376,7 @@ func (s *ServiceController) createExternalLoadBalancer(service *api.Service) err
}
} else {
status, err := s.balancer.CreateTCPLoadBalancer(name, s.zone.Region, nil,
ports, hostsFromNodeList(nodes), service.Spec.SessionAffinity)
ports, hostsFromNodeList(&nodes), service.Spec.SessionAffinity)
if err != nil {
return err
} else {
@ -570,7 +584,7 @@ func hostsFromNodeList(list *api.NodeList) []string {
// nodeSyncLoop handles updating the hosts pointed to by all external load
// balancers whenever the set of nodes in the cluster changes.
func (s *ServiceController) nodeSyncLoop(nodeLister *cache.StoreToNodeLister, period time.Duration) {
func (s *ServiceController) nodeSyncLoop(period time.Duration) {
var prevHosts []string
var servicesToUpdate []*cachedService
// TODO: Eliminate the unneeded now variable once we stop compiling in go1.3.
@ -578,7 +592,7 @@ func (s *ServiceController) nodeSyncLoop(nodeLister *cache.StoreToNodeLister, pe
// something to compile, and gofmt1.4 complains about using `_ = range`.
for now := range time.Tick(period) {
_ = now
nodes, err := nodeLister.List()
nodes, err := s.nodeLister.List()
if err != nil {
glog.Errorf("Failed to retrieve current set of nodes from node lister: %v", err)
continue