From fc08a0a71b7a8b00271da28c14c4d0897c502988 Mon Sep 17 00:00:00 2001
From: Alex Robinson
Date: Thu, 9 Apr 2015 20:48:27 +0000
Subject: [PATCH] Do service creation/update/deletion work in a pool of
 goroutines, protecting each service with a lock to ensure that no two
 goroutines will process a service at the same time. This is needed to
 avoid races in which two workers concurrently create, update, or delete
 the same service's load balancer.
---
 .../app/controllermanager.go                  |   2 -
 pkg/cloudprovider/cloud.go                    |   2 +
 .../servicecontroller/servicecontroller.go    | 172 +++++++++++-------
 .../servicecontroller_test.go                 |   3 +-
 4 files changed, 107 insertions(+), 72 deletions(-)

diff --git a/cmd/kube-controller-manager/app/controllermanager.go b/cmd/kube-controller-manager/app/controllermanager.go
index 06f800bd48d..060c26239c0 100644
--- a/cmd/kube-controller-manager/app/controllermanager.go
+++ b/cmd/kube-controller-manager/app/controllermanager.go
@@ -50,7 +50,6 @@ type CMServer struct {
 	ClientConfig            client.Config
 	CloudProvider           string
 	CloudConfigFile         string
-	ClusterName             string
 	MinionRegexp            string
 	NodeSyncPeriod          time.Duration
 	ResourceQuotaSyncPeriod time.Duration
@@ -102,7 +101,6 @@ func (s *CMServer) AddFlags(fs *pflag.FlagSet) {
 	client.BindClientConfigFlags(fs, &s.ClientConfig)
 	fs.StringVar(&s.CloudProvider, "cloud_provider", s.CloudProvider, "The provider for cloud services. Empty string for no provider.")
 	fs.StringVar(&s.CloudConfigFile, "cloud_config", s.CloudConfigFile, "The path to the cloud provider configuration file. Empty string for no configuration file.")
-	fs.StringVar(&s.ClusterName, "cluster_name", s.ClusterName, "The instance prefix for the cluster")
 	fs.StringVar(&s.MinionRegexp, "minion_regexp", s.MinionRegexp, "If non empty, and --cloud_provider is specified, a regular expression for matching minion VMs.")
 	fs.DurationVar(&s.NodeSyncPeriod, "node_sync_period", s.NodeSyncPeriod, ""+
 		"The period for syncing nodes from cloudprovider. Longer periods will result in "+
diff --git a/pkg/cloudprovider/cloud.go b/pkg/cloudprovider/cloud.go
index ddf74af7da0..f3c3b18491d 100644
--- a/pkg/cloudprovider/cloud.go
+++ b/pkg/cloudprovider/cloud.go
@@ -42,6 +42,8 @@ type Clusters interface {
 	Master(clusterName string) (string, error)
 }
 
+// TODO(#6812): Use a shorter name that's less likely to be longer than cloud
+// providers' name length limits.
func GetLoadBalancerName(clusterName, serviceNamespace, serviceName string) string { return clusterName + "-" + serviceNamespace + "-" + serviceName } diff --git a/pkg/cloudprovider/servicecontroller/servicecontroller.go b/pkg/cloudprovider/servicecontroller/servicecontroller.go index 9a4c98a0d2d..1d17323f10d 100644 --- a/pkg/cloudprovider/servicecontroller/servicecontroller.go +++ b/pkg/cloudprovider/servicecontroller/servicecontroller.go @@ -19,19 +19,25 @@ package servicecontroller import ( "fmt" "net" + "sort" "sync" "time" "github.com/GoogleCloudPlatform/kubernetes/pkg/api" + "github.com/GoogleCloudPlatform/kubernetes/pkg/api/errors" "github.com/GoogleCloudPlatform/kubernetes/pkg/client" "github.com/GoogleCloudPlatform/kubernetes/pkg/client/cache" "github.com/GoogleCloudPlatform/kubernetes/pkg/cloudprovider" "github.com/GoogleCloudPlatform/kubernetes/pkg/fields" "github.com/GoogleCloudPlatform/kubernetes/pkg/labels" + "github.com/GoogleCloudPlatform/kubernetes/pkg/types" + "github.com/GoogleCloudPlatform/kubernetes/pkg/util" "github.com/golang/glog" ) const ( + workerGoroutines = 10 + clientRetryCount = 5 clientRetryInterval = 5 * time.Second @@ -39,14 +45,24 @@ const ( notRetryable = false ) +type cachedService struct { + service *api.Service + // Ensures only one goroutine can operate on this service at any given time. + mu sync.Mutex +} + +type serviceCache struct { + mu sync.Mutex // protects serviceMap + serviceMap map[string]*cachedService +} + type ServiceController struct { cloud cloudprovider.Interface kubeClient client.Interface clusterName string balancer cloudprovider.TCPLoadBalancer zone cloudprovider.Zone - mu sync.Mutex // protects serviceMap - serviceMap map[string]*api.Service // keys generated by cache.MetaNamespaceKeyFunc + cache *serviceCache } // New returns a new service controller to keep cloud provider service resources @@ -56,7 +72,7 @@ func New(cloud cloudprovider.Interface, kubeClient client.Interface, clusterName cloud: cloud, kubeClient: kubeClient, clusterName: clusterName, - serviceMap: make(map[string]*api.Service), + cache: &serviceCache{serviceMap: make(map[string]*cachedService)}, } } @@ -75,7 +91,16 @@ func (s *ServiceController) Run() error { return fmt.Errorf("ServiceController only works with real Client objects, but was passed something else satisfying the client Interface.") } - go s.watchServices() + // Get the currently existing set of services and then all future creates + // and updates of services. + // No delta compressor is needed for the DeltaFIFO queue because we only ever + // care about the most recent state. + serviceQueue := cache.NewDeltaFIFO(cache.MetaNamespaceKeyFunc, nil, s.cache) + lw := cache.NewListWatchFromClient(s.kubeClient.(*client.Client), "services", api.NamespaceAll, fields.Everything()) + cache.NewReflector(lw, &api.Service{}, serviceQueue, 0).Run() + for i := 0; i < workerGoroutines; i++ { + go s.watchServices(serviceQueue) + } return nil } @@ -102,15 +127,8 @@ func (s *ServiceController) init() error { return nil } -func (s *ServiceController) watchServices() { - // Get the currently existing set of services and then all future creates - // and updates of services. - // TODO: Add a compressor that intelligently squashes together updates? 
-	keyLister := cache.KeyListerFunc(func() []string { return s.listKeys() })
-	serviceQueue := cache.NewDeltaFIFO(cache.MetaNamespaceKeyFunc, nil, keyLister)
-	lw := cache.NewListWatchFromClient(s.kubeClient.(*client.Client), "services", api.NamespaceAll, fields.Everything())
-	cache.NewReflector(lw, &api.Service{}, serviceQueue, 0).Run()
-	// TODO: Add proper retries rather than just re-adding to the queue?
+// Loop infinitely, processing all service updates provided by the queue.
+func (s *ServiceController) watchServices(serviceQueue *cache.DeltaFIFO) {
 	for {
 		newItem := serviceQueue.Pop()
 		deltas, ok := newItem.(cache.Deltas)
@@ -129,7 +147,7 @@
 			time.Sleep(5 * time.Second)
 			serviceQueue.AddIfNotPresent(deltas)
 		} else if err != nil {
-			glog.Errorf("Failed to process service delta. Not retrying: %v", err)
+			util.HandleError(fmt.Errorf("Failed to process service delta. Not retrying: %v", err))
 		}
 	}
 }
@@ -138,6 +156,8 @@
 // indicator of whether the processing should be retried.
 func (s *ServiceController) processDelta(delta *cache.Delta) (error, bool) {
 	service, ok := delta.Object.(*api.Service)
+	var namespacedName types.NamespacedName
+	var cachedService *cachedService
 	if !ok {
 		// If the DeltaFIFO saw a key in our cache that it didn't know about, it
 		// can send a deletion with an unknown state. Grab the service from our
 		// cache for deleting.
 		key, ok := delta.Object.(cache.DeletedFinalStateUnknown)
 		if !ok {
 			return fmt.Errorf("Delta contained object that wasn't a service or a deleted key: %+v", delta), notRetryable
 		}
-		service, ok = s.getService(key.Key)
+		cachedService, ok = s.cache.get(key.Key)
 		if !ok {
 			return fmt.Errorf("Service %s not in cache even though the watcher thought it was. Ignoring the deletion.", key), notRetryable
 		}
-		delta.Object = service
+		service = cachedService.service
+		delta.Object = cachedService.service
+		namespacedName = types.NamespacedName{service.Namespace, service.Name}
+	} else {
+		namespacedName.Namespace = service.Namespace
+		namespacedName.Name = service.Name
+		cachedService = s.cache.getOrCreate(namespacedName.String())
 	}
 	glog.V(2).Infof("Got new %s delta for service: %+v", delta.Type, service)
-	// TODO: Make this more parallel. The only things that need to serialized
-	// are changes to services with the same namespace and name.
+	// Ensure that no other goroutine will interfere with our processing of the
+	// service.
+	cachedService.mu.Lock()
+	defer cachedService.mu.Unlock()
+	// TODO: Handle added, updated, and sync differently?
switch delta.Type { case cache.Added: @@ -163,9 +192,19 @@ func (s *ServiceController) processDelta(delta *cache.Delta) (error, bool) { case cache.Updated: fallthrough case cache.Sync: - return s.createLoadBalancerIfNeeded(service) + err, retry := s.createLoadBalancerIfNeeded(namespacedName, service, cachedService.service) + if err != nil { + return err, retry + } + // Always update the cache upon success + cachedService.service = service + s.cache.set(namespacedName.String(), cachedService) case cache.Deleted: - return s.handleDelete(service) + err := s.ensureLBDeleted(service) + if err != nil { + return err, retryable + } + s.cache.delete(namespacedName.String()) default: glog.Errorf("Unexpected delta type: %v", delta.Type) } @@ -174,18 +213,12 @@ func (s *ServiceController) processDelta(delta *cache.Delta) (error, bool) { // Returns whatever error occurred along with a boolean indicator of whether it // should be retried. -func (s *ServiceController) createLoadBalancerIfNeeded(service *api.Service) (error, bool) { - namespacedName, err := cache.MetaNamespaceKeyFunc(service) - if err != nil { - return fmt.Errorf("Couldn't generate namespaced name for service: %v", err), notRetryable - } - - cachedService, cached := s.getService(namespacedName) - if cached && !needsUpdate(cachedService, service) { +func (s *ServiceController) createLoadBalancerIfNeeded(namespacedName types.NamespacedName, service, cachedService *api.Service) (error, bool) { + if cachedService != nil && !needsUpdate(cachedService, service) { glog.Infof("LB already exists and doesn't need update for service %s", namespacedName) return nil, notRetryable } - if cached { + if cachedService != nil { // If the service already exists but needs to be updated, delete it so that // we can recreate it cleanly. if cachedService.Spec.CreateExternalLoadBalancer { @@ -227,7 +260,7 @@ func (s *ServiceController) createLoadBalancerIfNeeded(service *api.Service) (er // The load balancer doesn't exist yet, so create it. publicIPstring := fmt.Sprint(service.Spec.PublicIPs) - err = s.createExternalLoadBalancer(service) + err := s.createExternalLoadBalancer(service) if err != nil { return fmt.Errorf("failed to create external load balancer for service %s: %v", namespacedName, err), retryable } @@ -236,7 +269,6 @@ func (s *ServiceController) createLoadBalancerIfNeeded(service *api.Service) (er glog.Infof("Not persisting unchanged service to registry.") return nil, notRetryable } - s.setService(namespacedName, service) // If creating the load balancer succeeded, persist the updated service. if err = s.persistUpdate(service); err != nil { @@ -245,8 +277,6 @@ func (s *ServiceController) createLoadBalancerIfNeeded(service *api.Service) (er return nil, notRetryable } -// TODO(a-robinson): Handle repeated failures due to ResourceVersion changes or -// the object having been deleted. func (s *ServiceController) persistUpdate(service *api.Service) error { var err error for i := 0; i < clientRetryCount; i++ { @@ -254,6 +284,20 @@ func (s *ServiceController) persistUpdate(service *api.Service) error { if err == nil { return nil } + // If the object no longer exists, we don't want to recreate it. Just bail + // out so that we can process the delete, which we should soon be receiving + // if we haven't already. + if errors.IsNotFound(err) { + glog.Infof("Not persisting update to service that no longer exists: %v", err) + return nil + } + // TODO: Try to resolve the conflict if the change was unrelated to load + // balancers and public IPs. 
For now, just rely on the fact that we'll + // also process the update that caused the resource version to change. + if errors.IsConflict(err) { + glog.Infof("Not persisting update to service that has been changed since we received it: %v", err) + return nil + } glog.Warningf("Failed to persist updated PublicIPs to service %s after creating its external load balancer: %v", service.Name, err) time.Sleep(clientRetryInterval) @@ -266,7 +310,7 @@ func (s *ServiceController) createExternalLoadBalancer(service *api.Service) err if err != nil { return err } - nodes, err := s.kubeClient.Nodes().List(labels.Everything()) + nodes, err := s.kubeClient.Nodes().List(labels.Everything(), fields.Everything()) if err != nil { return err } @@ -294,24 +338,8 @@ func (s *ServiceController) createExternalLoadBalancer(service *api.Service) err return nil } -// Returns whatever error occurred along with a boolean indicator of whether it -// should be retried. -func (s *ServiceController) handleDelete(service *api.Service) (error, bool) { - if err := s.ensureLBDeleted(service); err != nil { - return err, retryable - } - namespacedName, err := cache.MetaNamespaceKeyFunc(service) - if err != nil { - // This is panic-worthy, since the queue shouldn't have been able to - // handle the service if it couldn't generate a name for it. - return fmt.Errorf("Couldn't generate namespaced name for service: %v", err), notRetryable - } - s.deleteService(namespacedName) - return nil, notRetryable -} - // Ensures that the load balancer associated with the given service is deleted, -// doing the deletion if necessary. +// doing the deletion if necessary. Should always be retried upon failure. func (s *ServiceController) ensureLBDeleted(service *api.Service) error { // This is only needed because not all delete load balancer implementations // are currently idempotent to the LB not existing. @@ -327,9 +355,9 @@ func (s *ServiceController) ensureLBDeleted(service *api.Service) error { return nil } -// listKeys implements the interface required by DeltaFIFO to list the keys we +// ListKeys implements the interface required by DeltaFIFO to list the keys we // already know about. 
-func (s *ServiceController) listKeys() []string { +func (s *serviceCache) ListKeys() []string { s.mu.Lock() defer s.mu.Unlock() keys := make([]string, 0, len(s.serviceMap)) @@ -339,20 +367,31 @@ func (s *ServiceController) listKeys() []string { return keys } -func (s *ServiceController) getService(serviceName string) (*api.Service, bool) { +func (s *serviceCache) get(serviceName string) (*cachedService, bool) { s.mu.Lock() defer s.mu.Unlock() - info, ok := s.serviceMap[serviceName] - return info, ok + service, ok := s.serviceMap[serviceName] + return service, ok } -func (s *ServiceController) setService(serviceName string, info *api.Service) { +func (s *serviceCache) getOrCreate(serviceName string) *cachedService { s.mu.Lock() defer s.mu.Unlock() - s.serviceMap[serviceName] = info + service, ok := s.serviceMap[serviceName] + if !ok { + service = &cachedService{} + s.serviceMap[serviceName] = service + } + return service } -func (s *ServiceController) deleteService(serviceName string) { +func (s *serviceCache) set(serviceName string, service *cachedService) { + s.mu.Lock() + defer s.mu.Unlock() + s.serviceMap[serviceName] = service +} + +func (s *serviceCache) delete(serviceName string) { s.mu.Lock() defer s.mu.Unlock() delete(s.serviceMap, serviceName) @@ -379,10 +418,8 @@ func needsUpdate(oldService *api.Service, newService *api.Service) bool { return false } -// TODO: Use a shorter name that's less likely to be longer than cloud -// providers' length limits. func (s *ServiceController) loadBalancerName(service *api.Service) string { - return s.cloud.GetLoadBalancerName(s.clusterName, service.Namespace, service.Name) + return cloudprovider.GetLoadBalancerName(s.clusterName, service.Namespace, service.Name) } func getTCPPorts(service *api.Service) ([]int, error) { @@ -411,13 +448,10 @@ func portsEqual(x, y *api.Service) bool { if len(xPorts) != len(yPorts) { return false } - // Use a map for comparison since port slices aren't necessarily sorted. - xPortMap := make(map[int]bool) - for _, xPort := range xPorts { - xPortMap[xPort] = true - } - for _, yPort := range yPorts { - if !xPortMap[yPort] { + sort.Ints(xPorts) + sort.Ints(yPorts) + for i := range xPorts { + if xPorts[i] != yPorts[i] { return false } } diff --git a/pkg/cloudprovider/servicecontroller/servicecontroller_test.go b/pkg/cloudprovider/servicecontroller/servicecontroller_test.go index 73aa5ff08b4..d1a5a46a817 100644 --- a/pkg/cloudprovider/servicecontroller/servicecontroller_test.go +++ b/pkg/cloudprovider/servicecontroller/servicecontroller_test.go @@ -22,6 +22,7 @@ import ( "github.com/GoogleCloudPlatform/kubernetes/pkg/api" "github.com/GoogleCloudPlatform/kubernetes/pkg/client/testclient" fake_cloud "github.com/GoogleCloudPlatform/kubernetes/pkg/cloudprovider/fake" + "github.com/GoogleCloudPlatform/kubernetes/pkg/types" ) const region = "us-central" @@ -89,7 +90,7 @@ func TestCreateExternalLoadBalancer(t *testing.T) { controller.init() cloud.Calls = nil // ignore any cloud calls made in init() client.Actions = nil // ignore any client calls made in init() - err, _ := controller.createLoadBalancerIfNeeded(item.service) + err, _ := controller.createLoadBalancerIfNeeded(types.NamespacedName{"foo", "bar"}, item.service, nil) if !item.expectErr && err != nil { t.Errorf("unexpected error: %v", err) } else if item.expectErr && err == nil {
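
The following sketch is editorial, not part of the patch: a minimal,
self-contained Go program that isolates the concurrency pattern introduced
above. A fixed pool of worker goroutines drains a shared queue, the cache's
mutex guards only map access, and a per-service mutex is held for the full
processing of each item so that no two workers ever operate on the same
service at once. The channel-based queue, the worker() helper, and the
hard-coded keys are illustrative stand-ins; the real controller uses
cache.DeltaFIFO and the serviceCache/cachedService types above.

	package main

	import (
		"fmt"
		"sync"
		"time"
	)

	// cachedService mirrors the patch: its mutex ensures only one goroutine
	// can operate on this service at any given time.
	type cachedService struct {
		mu   sync.Mutex
		name string
	}

	// serviceCache mirrors the patch: its mutex protects only the map, not
	// the per-service processing.
	type serviceCache struct {
		mu         sync.Mutex
		serviceMap map[string]*cachedService
	}

	func (c *serviceCache) getOrCreate(key string) *cachedService {
		c.mu.Lock()
		defer c.mu.Unlock()
		s, ok := c.serviceMap[key]
		if !ok {
			s = &cachedService{name: key}
			c.serviceMap[key] = s
		}
		return s
	}

	// worker stands in for watchServices: it pops keys off the queue and
	// processes each one while holding that service's lock.
	func worker(id int, cache *serviceCache, queue <-chan string, wg *sync.WaitGroup) {
		defer wg.Done()
		for key := range queue {
			svc := cache.getOrCreate(key)
			svc.mu.Lock()
			fmt.Printf("worker %d processing %s\n", id, svc.name)
			time.Sleep(10 * time.Millisecond) // stand-in for a slow cloud API call
			svc.mu.Unlock()
		}
	}

	func main() {
		cache := &serviceCache{serviceMap: make(map[string]*cachedService)}
		queue := make(chan string)
		var wg sync.WaitGroup
		for i := 0; i < 4; i++ { // the patch uses workerGoroutines = 10
			wg.Add(1)
			go worker(i, cache, queue, &wg)
		}
		// The duplicate key exercises the per-service lock: two workers may
		// both pick up "default/a", but only one can process it at a time.
		for _, key := range []string{"default/a", "default/b", "default/a"} {
			queue <- key
		}
		close(queue)
		wg.Wait()
	}

Holding only the per-service lock during the simulated cloud call keeps work
on different services parallel while serializing work on any one service,
which is exactly the race the commit message is guarding against.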