Do service creation/update/deletion work in a pool of goroutines, protecting each service with a lock to ensure that no two goroutines will process a service at the same time. This is needed to avoid weird race conditions.
This commit is contained in:
Alex Robinson 2015-04-09 20:48:27 +00:00
parent 9a351e3670
commit fc08a0a71b
4 changed files with 107 additions and 72 deletions

View File

@ -50,7 +50,6 @@ type CMServer struct {
ClientConfig client.Config ClientConfig client.Config
CloudProvider string CloudProvider string
CloudConfigFile string CloudConfigFile string
ClusterName string
MinionRegexp string MinionRegexp string
NodeSyncPeriod time.Duration NodeSyncPeriod time.Duration
ResourceQuotaSyncPeriod time.Duration ResourceQuotaSyncPeriod time.Duration
@ -102,7 +101,6 @@ func (s *CMServer) AddFlags(fs *pflag.FlagSet) {
client.BindClientConfigFlags(fs, &s.ClientConfig) client.BindClientConfigFlags(fs, &s.ClientConfig)
fs.StringVar(&s.CloudProvider, "cloud_provider", s.CloudProvider, "The provider for cloud services. Empty string for no provider.") fs.StringVar(&s.CloudProvider, "cloud_provider", s.CloudProvider, "The provider for cloud services. Empty string for no provider.")
fs.StringVar(&s.CloudConfigFile, "cloud_config", s.CloudConfigFile, "The path to the cloud provider configuration file. Empty string for no configuration file.") fs.StringVar(&s.CloudConfigFile, "cloud_config", s.CloudConfigFile, "The path to the cloud provider configuration file. Empty string for no configuration file.")
fs.StringVar(&s.ClusterName, "cluster_name", s.ClusterName, "The instance prefix for the cluster")
fs.StringVar(&s.MinionRegexp, "minion_regexp", s.MinionRegexp, "If non empty, and --cloud_provider is specified, a regular expression for matching minion VMs.") fs.StringVar(&s.MinionRegexp, "minion_regexp", s.MinionRegexp, "If non empty, and --cloud_provider is specified, a regular expression for matching minion VMs.")
fs.DurationVar(&s.NodeSyncPeriod, "node_sync_period", s.NodeSyncPeriod, ""+ fs.DurationVar(&s.NodeSyncPeriod, "node_sync_period", s.NodeSyncPeriod, ""+
"The period for syncing nodes from cloudprovider. Longer periods will result in "+ "The period for syncing nodes from cloudprovider. Longer periods will result in "+

View File

@ -42,6 +42,8 @@ type Clusters interface {
Master(clusterName string) (string, error) Master(clusterName string) (string, error)
} }
// TODO(#6812): Use a shorter name that's less likely to be longer than cloud
// providers' name length limits.
func GetLoadBalancerName(clusterName, serviceNamespace, serviceName string) string { func GetLoadBalancerName(clusterName, serviceNamespace, serviceName string) string {
return clusterName + "-" + serviceNamespace + "-" + serviceName return clusterName + "-" + serviceNamespace + "-" + serviceName
} }

View File

@ -19,19 +19,25 @@ package servicecontroller
import ( import (
"fmt" "fmt"
"net" "net"
"sort"
"sync" "sync"
"time" "time"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api" "github.com/GoogleCloudPlatform/kubernetes/pkg/api"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api/errors"
"github.com/GoogleCloudPlatform/kubernetes/pkg/client" "github.com/GoogleCloudPlatform/kubernetes/pkg/client"
"github.com/GoogleCloudPlatform/kubernetes/pkg/client/cache" "github.com/GoogleCloudPlatform/kubernetes/pkg/client/cache"
"github.com/GoogleCloudPlatform/kubernetes/pkg/cloudprovider" "github.com/GoogleCloudPlatform/kubernetes/pkg/cloudprovider"
"github.com/GoogleCloudPlatform/kubernetes/pkg/fields" "github.com/GoogleCloudPlatform/kubernetes/pkg/fields"
"github.com/GoogleCloudPlatform/kubernetes/pkg/labels" "github.com/GoogleCloudPlatform/kubernetes/pkg/labels"
"github.com/GoogleCloudPlatform/kubernetes/pkg/types"
"github.com/GoogleCloudPlatform/kubernetes/pkg/util"
"github.com/golang/glog" "github.com/golang/glog"
) )
const ( const (
workerGoroutines = 10
clientRetryCount = 5 clientRetryCount = 5
clientRetryInterval = 5 * time.Second clientRetryInterval = 5 * time.Second
@ -39,14 +45,24 @@ const (
notRetryable = false notRetryable = false
) )
type cachedService struct {
service *api.Service
// Ensures only one goroutine can operate on this service at any given time.
mu sync.Mutex
}
type serviceCache struct {
mu sync.Mutex // protects serviceMap
serviceMap map[string]*cachedService
}
type ServiceController struct { type ServiceController struct {
cloud cloudprovider.Interface cloud cloudprovider.Interface
kubeClient client.Interface kubeClient client.Interface
clusterName string clusterName string
balancer cloudprovider.TCPLoadBalancer balancer cloudprovider.TCPLoadBalancer
zone cloudprovider.Zone zone cloudprovider.Zone
mu sync.Mutex // protects serviceMap cache *serviceCache
serviceMap map[string]*api.Service // keys generated by cache.MetaNamespaceKeyFunc
} }
// New returns a new service controller to keep cloud provider service resources // New returns a new service controller to keep cloud provider service resources
@ -56,7 +72,7 @@ func New(cloud cloudprovider.Interface, kubeClient client.Interface, clusterName
cloud: cloud, cloud: cloud,
kubeClient: kubeClient, kubeClient: kubeClient,
clusterName: clusterName, clusterName: clusterName,
serviceMap: make(map[string]*api.Service), cache: &serviceCache{serviceMap: make(map[string]*cachedService)},
} }
} }
@ -75,7 +91,16 @@ func (s *ServiceController) Run() error {
return fmt.Errorf("ServiceController only works with real Client objects, but was passed something else satisfying the client Interface.") return fmt.Errorf("ServiceController only works with real Client objects, but was passed something else satisfying the client Interface.")
} }
go s.watchServices() // Get the currently existing set of services and then all future creates
// and updates of services.
// No delta compressor is needed for the DeltaFIFO queue because we only ever
// care about the most recent state.
serviceQueue := cache.NewDeltaFIFO(cache.MetaNamespaceKeyFunc, nil, s.cache)
lw := cache.NewListWatchFromClient(s.kubeClient.(*client.Client), "services", api.NamespaceAll, fields.Everything())
cache.NewReflector(lw, &api.Service{}, serviceQueue, 0).Run()
for i := 0; i < workerGoroutines; i++ {
go s.watchServices(serviceQueue)
}
return nil return nil
} }
@ -102,15 +127,8 @@ func (s *ServiceController) init() error {
return nil return nil
} }
func (s *ServiceController) watchServices() { // Loop infinitely, processing all service updates provided by the queue.
// Get the currently existing set of services and then all future creates func (s *ServiceController) watchServices(serviceQueue *cache.DeltaFIFO) {
// and updates of services.
// TODO: Add a compressor that intelligently squashes together updates?
keyLister := cache.KeyListerFunc(func() []string { return s.listKeys() })
serviceQueue := cache.NewDeltaFIFO(cache.MetaNamespaceKeyFunc, nil, keyLister)
lw := cache.NewListWatchFromClient(s.kubeClient.(*client.Client), "services", api.NamespaceAll, fields.Everything())
cache.NewReflector(lw, &api.Service{}, serviceQueue, 0).Run()
// TODO: Add proper retries rather than just re-adding to the queue?
for { for {
newItem := serviceQueue.Pop() newItem := serviceQueue.Pop()
deltas, ok := newItem.(cache.Deltas) deltas, ok := newItem.(cache.Deltas)
@ -129,7 +147,7 @@ func (s *ServiceController) watchServices() {
time.Sleep(5 * time.Second) time.Sleep(5 * time.Second)
serviceQueue.AddIfNotPresent(deltas) serviceQueue.AddIfNotPresent(deltas)
} else if err != nil { } else if err != nil {
glog.Errorf("Failed to process service delta. Not retrying: %v", err) util.HandleError(fmt.Errorf("Failed to process service delta. Not retrying: %v", err))
} }
} }
} }
@ -138,6 +156,8 @@ func (s *ServiceController) watchServices() {
// indicator of whether the processing should be retried. // indicator of whether the processing should be retried.
func (s *ServiceController) processDelta(delta *cache.Delta) (error, bool) { func (s *ServiceController) processDelta(delta *cache.Delta) (error, bool) {
service, ok := delta.Object.(*api.Service) service, ok := delta.Object.(*api.Service)
var namespacedName types.NamespacedName
var cachedService *cachedService
if !ok { if !ok {
// If the DeltaFIFO saw a key in our cache that it didn't know about, it // If the DeltaFIFO saw a key in our cache that it didn't know about, it
// can send a deletion with an unknown state. Grab the service from our // can send a deletion with an unknown state. Grab the service from our
@ -146,16 +166,25 @@ func (s *ServiceController) processDelta(delta *cache.Delta) (error, bool) {
if !ok { if !ok {
return fmt.Errorf("Delta contained object that wasn't a service or a deleted key: %+v", delta), notRetryable return fmt.Errorf("Delta contained object that wasn't a service or a deleted key: %+v", delta), notRetryable
} }
service, ok = s.getService(key.Key) cachedService, ok = s.cache.get(key.Key)
if !ok { if !ok {
return fmt.Errorf("Service %s not in cache even though the watcher thought it was. Ignoring the deletion.", key), notRetryable return fmt.Errorf("Service %s not in cache even though the watcher thought it was. Ignoring the deletion.", key), notRetryable
} }
delta.Object = service namespacedName = types.NamespacedName{service.Namespace, service.Name}
service = cachedService.service
delta.Object = cachedService.service
} else {
namespacedName.Namespace = service.Namespace
namespacedName.Name = service.Name
cachedService = s.cache.getOrCreate(namespacedName.String())
} }
glog.V(2).Infof("Got new %s delta for service: %+v", delta.Type, service) glog.V(2).Infof("Got new %s delta for service: %+v", delta.Type, service)
// TODO: Make this more parallel. The only things that need to serialized // Ensure that no other goroutine will interfere with our processing of the
// are changes to services with the same namespace and name. // service.
cachedService.mu.Lock()
defer cachedService.mu.Unlock()
// TODO: Handle added, updated, and sync differently? // TODO: Handle added, updated, and sync differently?
switch delta.Type { switch delta.Type {
case cache.Added: case cache.Added:
@ -163,9 +192,19 @@ func (s *ServiceController) processDelta(delta *cache.Delta) (error, bool) {
case cache.Updated: case cache.Updated:
fallthrough fallthrough
case cache.Sync: case cache.Sync:
return s.createLoadBalancerIfNeeded(service) err, retry := s.createLoadBalancerIfNeeded(namespacedName, service, cachedService.service)
if err != nil {
return err, retry
}
// Always update the cache upon success
cachedService.service = service
s.cache.set(namespacedName.String(), cachedService)
case cache.Deleted: case cache.Deleted:
return s.handleDelete(service) err := s.ensureLBDeleted(service)
if err != nil {
return err, retryable
}
s.cache.delete(namespacedName.String())
default: default:
glog.Errorf("Unexpected delta type: %v", delta.Type) glog.Errorf("Unexpected delta type: %v", delta.Type)
} }
@ -174,18 +213,12 @@ func (s *ServiceController) processDelta(delta *cache.Delta) (error, bool) {
// Returns whatever error occurred along with a boolean indicator of whether it // Returns whatever error occurred along with a boolean indicator of whether it
// should be retried. // should be retried.
func (s *ServiceController) createLoadBalancerIfNeeded(service *api.Service) (error, bool) { func (s *ServiceController) createLoadBalancerIfNeeded(namespacedName types.NamespacedName, service, cachedService *api.Service) (error, bool) {
namespacedName, err := cache.MetaNamespaceKeyFunc(service) if cachedService != nil && !needsUpdate(cachedService, service) {
if err != nil {
return fmt.Errorf("Couldn't generate namespaced name for service: %v", err), notRetryable
}
cachedService, cached := s.getService(namespacedName)
if cached && !needsUpdate(cachedService, service) {
glog.Infof("LB already exists and doesn't need update for service %s", namespacedName) glog.Infof("LB already exists and doesn't need update for service %s", namespacedName)
return nil, notRetryable return nil, notRetryable
} }
if cached { if cachedService != nil {
// If the service already exists but needs to be updated, delete it so that // If the service already exists but needs to be updated, delete it so that
// we can recreate it cleanly. // we can recreate it cleanly.
if cachedService.Spec.CreateExternalLoadBalancer { if cachedService.Spec.CreateExternalLoadBalancer {
@ -227,7 +260,7 @@ func (s *ServiceController) createLoadBalancerIfNeeded(service *api.Service) (er
// The load balancer doesn't exist yet, so create it. // The load balancer doesn't exist yet, so create it.
publicIPstring := fmt.Sprint(service.Spec.PublicIPs) publicIPstring := fmt.Sprint(service.Spec.PublicIPs)
err = s.createExternalLoadBalancer(service) err := s.createExternalLoadBalancer(service)
if err != nil { if err != nil {
return fmt.Errorf("failed to create external load balancer for service %s: %v", namespacedName, err), retryable return fmt.Errorf("failed to create external load balancer for service %s: %v", namespacedName, err), retryable
} }
@ -236,7 +269,6 @@ func (s *ServiceController) createLoadBalancerIfNeeded(service *api.Service) (er
glog.Infof("Not persisting unchanged service to registry.") glog.Infof("Not persisting unchanged service to registry.")
return nil, notRetryable return nil, notRetryable
} }
s.setService(namespacedName, service)
// If creating the load balancer succeeded, persist the updated service. // If creating the load balancer succeeded, persist the updated service.
if err = s.persistUpdate(service); err != nil { if err = s.persistUpdate(service); err != nil {
@ -245,8 +277,6 @@ func (s *ServiceController) createLoadBalancerIfNeeded(service *api.Service) (er
return nil, notRetryable return nil, notRetryable
} }
// TODO(a-robinson): Handle repeated failures due to ResourceVersion changes or
// the object having been deleted.
func (s *ServiceController) persistUpdate(service *api.Service) error { func (s *ServiceController) persistUpdate(service *api.Service) error {
var err error var err error
for i := 0; i < clientRetryCount; i++ { for i := 0; i < clientRetryCount; i++ {
@ -254,6 +284,20 @@ func (s *ServiceController) persistUpdate(service *api.Service) error {
if err == nil { if err == nil {
return nil return nil
} }
// If the object no longer exists, we don't want to recreate it. Just bail
// out so that we can process the delete, which we should soon be receiving
// if we haven't already.
if errors.IsNotFound(err) {
glog.Infof("Not persisting update to service that no longer exists: %v", err)
return nil
}
// TODO: Try to resolve the conflict if the change was unrelated to load
// balancers and public IPs. For now, just rely on the fact that we'll
// also process the update that caused the resource version to change.
if errors.IsConflict(err) {
glog.Infof("Not persisting update to service that has been changed since we received it: %v", err)
return nil
}
glog.Warningf("Failed to persist updated PublicIPs to service %s after creating its external load balancer: %v", glog.Warningf("Failed to persist updated PublicIPs to service %s after creating its external load balancer: %v",
service.Name, err) service.Name, err)
time.Sleep(clientRetryInterval) time.Sleep(clientRetryInterval)
@ -266,7 +310,7 @@ func (s *ServiceController) createExternalLoadBalancer(service *api.Service) err
if err != nil { if err != nil {
return err return err
} }
nodes, err := s.kubeClient.Nodes().List(labels.Everything()) nodes, err := s.kubeClient.Nodes().List(labels.Everything(), fields.Everything())
if err != nil { if err != nil {
return err return err
} }
@ -294,24 +338,8 @@ func (s *ServiceController) createExternalLoadBalancer(service *api.Service) err
return nil return nil
} }
// Returns whatever error occurred along with a boolean indicator of whether it
// should be retried.
func (s *ServiceController) handleDelete(service *api.Service) (error, bool) {
if err := s.ensureLBDeleted(service); err != nil {
return err, retryable
}
namespacedName, err := cache.MetaNamespaceKeyFunc(service)
if err != nil {
// This is panic-worthy, since the queue shouldn't have been able to
// handle the service if it couldn't generate a name for it.
return fmt.Errorf("Couldn't generate namespaced name for service: %v", err), notRetryable
}
s.deleteService(namespacedName)
return nil, notRetryable
}
// Ensures that the load balancer associated with the given service is deleted, // Ensures that the load balancer associated with the given service is deleted,
// doing the deletion if necessary. // doing the deletion if necessary. Should always be retried upon failure.
func (s *ServiceController) ensureLBDeleted(service *api.Service) error { func (s *ServiceController) ensureLBDeleted(service *api.Service) error {
// This is only needed because not all delete load balancer implementations // This is only needed because not all delete load balancer implementations
// are currently idempotent to the LB not existing. // are currently idempotent to the LB not existing.
@ -327,9 +355,9 @@ func (s *ServiceController) ensureLBDeleted(service *api.Service) error {
return nil return nil
} }
// listKeys implements the interface required by DeltaFIFO to list the keys we // ListKeys implements the interface required by DeltaFIFO to list the keys we
// already know about. // already know about.
func (s *ServiceController) listKeys() []string { func (s *serviceCache) ListKeys() []string {
s.mu.Lock() s.mu.Lock()
defer s.mu.Unlock() defer s.mu.Unlock()
keys := make([]string, 0, len(s.serviceMap)) keys := make([]string, 0, len(s.serviceMap))
@ -339,20 +367,31 @@ func (s *ServiceController) listKeys() []string {
return keys return keys
} }
func (s *ServiceController) getService(serviceName string) (*api.Service, bool) { func (s *serviceCache) get(serviceName string) (*cachedService, bool) {
s.mu.Lock() s.mu.Lock()
defer s.mu.Unlock() defer s.mu.Unlock()
info, ok := s.serviceMap[serviceName] service, ok := s.serviceMap[serviceName]
return info, ok return service, ok
} }
func (s *ServiceController) setService(serviceName string, info *api.Service) { func (s *serviceCache) getOrCreate(serviceName string) *cachedService {
s.mu.Lock() s.mu.Lock()
defer s.mu.Unlock() defer s.mu.Unlock()
s.serviceMap[serviceName] = info service, ok := s.serviceMap[serviceName]
if !ok {
service = &cachedService{}
s.serviceMap[serviceName] = service
}
return service
} }
func (s *ServiceController) deleteService(serviceName string) { func (s *serviceCache) set(serviceName string, service *cachedService) {
s.mu.Lock()
defer s.mu.Unlock()
s.serviceMap[serviceName] = service
}
func (s *serviceCache) delete(serviceName string) {
s.mu.Lock() s.mu.Lock()
defer s.mu.Unlock() defer s.mu.Unlock()
delete(s.serviceMap, serviceName) delete(s.serviceMap, serviceName)
@ -379,10 +418,8 @@ func needsUpdate(oldService *api.Service, newService *api.Service) bool {
return false return false
} }
// TODO: Use a shorter name that's less likely to be longer than cloud
// providers' length limits.
func (s *ServiceController) loadBalancerName(service *api.Service) string { func (s *ServiceController) loadBalancerName(service *api.Service) string {
return s.cloud.GetLoadBalancerName(s.clusterName, service.Namespace, service.Name) return cloudprovider.GetLoadBalancerName(s.clusterName, service.Namespace, service.Name)
} }
func getTCPPorts(service *api.Service) ([]int, error) { func getTCPPorts(service *api.Service) ([]int, error) {
@ -411,13 +448,10 @@ func portsEqual(x, y *api.Service) bool {
if len(xPorts) != len(yPorts) { if len(xPorts) != len(yPorts) {
return false return false
} }
// Use a map for comparison since port slices aren't necessarily sorted. sort.Ints(xPorts)
xPortMap := make(map[int]bool) sort.Ints(yPorts)
for _, xPort := range xPorts { for i := range xPorts {
xPortMap[xPort] = true if xPorts[i] != yPorts[i] {
}
for _, yPort := range yPorts {
if !xPortMap[yPort] {
return false return false
} }
} }

View File

@ -22,6 +22,7 @@ import (
"github.com/GoogleCloudPlatform/kubernetes/pkg/api" "github.com/GoogleCloudPlatform/kubernetes/pkg/api"
"github.com/GoogleCloudPlatform/kubernetes/pkg/client/testclient" "github.com/GoogleCloudPlatform/kubernetes/pkg/client/testclient"
fake_cloud "github.com/GoogleCloudPlatform/kubernetes/pkg/cloudprovider/fake" fake_cloud "github.com/GoogleCloudPlatform/kubernetes/pkg/cloudprovider/fake"
"github.com/GoogleCloudPlatform/kubernetes/pkg/types"
) )
const region = "us-central" const region = "us-central"
@ -89,7 +90,7 @@ func TestCreateExternalLoadBalancer(t *testing.T) {
controller.init() controller.init()
cloud.Calls = nil // ignore any cloud calls made in init() cloud.Calls = nil // ignore any cloud calls made in init()
client.Actions = nil // ignore any client calls made in init() client.Actions = nil // ignore any client calls made in init()
err, _ := controller.createLoadBalancerIfNeeded(item.service) err, _ := controller.createLoadBalancerIfNeeded(types.NamespacedName{"foo", "bar"}, item.service, nil)
if !item.expectErr && err != nil { if !item.expectErr && err != nil {
t.Errorf("unexpected error: %v", err) t.Errorf("unexpected error: %v", err)
} else if item.expectErr && err == nil { } else if item.expectErr && err == nil {