Add a resync period for services in the service controller.

This should ensure all load balancers get deleted even if a reordering of
watch events causes us to strand one after its service has been deleted,
because the sync will notice that the service controller's cache has a
service in it that no longer exists in the apiserver.

It could still leak in the case that the controller manager is killed
between when it leaks something and the sync runs, but this should
improve things.
This commit is contained in:
Alex Robinson 2015-07-27 18:03:13 +00:00
parent d63e35247f
commit 60611c253e
4 changed files with 12 additions and 6 deletions

View File

@ -61,6 +61,7 @@ type CMServer struct {
CloudConfigFile string
ConcurrentEndpointSyncs int
ConcurrentRCSyncs int
ServiceSyncPeriod time.Duration
NodeSyncPeriod time.Duration
ResourceQuotaSyncPeriod time.Duration
NamespaceSyncPeriod time.Duration
@ -92,6 +93,7 @@ func NewCMServer() *CMServer {
Address: util.IP(net.ParseIP("127.0.0.1")),
ConcurrentEndpointSyncs: 5,
ConcurrentRCSyncs: 5,
ServiceSyncPeriod: 5 * time.Minute,
NodeSyncPeriod: 10 * time.Second,
ResourceQuotaSyncPeriod: 10 * time.Second,
NamespaceSyncPeriod: 5 * time.Minute,
@ -111,13 +113,14 @@ func (s *CMServer) AddFlags(fs *pflag.FlagSet) {
fs.StringVar(&s.CloudConfigFile, "cloud-config", s.CloudConfigFile, "The path to the cloud provider configuration file. Empty string for no configuration file.")
fs.IntVar(&s.ConcurrentEndpointSyncs, "concurrent-endpoint-syncs", s.ConcurrentEndpointSyncs, "The number of endpoint syncing operations that will be done concurrently. Larger number = faster endpoint updating, but more CPU (and network) load")
fs.IntVar(&s.ConcurrentRCSyncs, "concurrent_rc_syncs", s.ConcurrentRCSyncs, "The number of replication controllers that are allowed to sync concurrently. Larger number = more reponsive replica management, but more CPU (and network) load")
fs.DurationVar(&s.ServiceSyncPeriod, "service-sync-period", s.ServiceSyncPeriod, "The period for syncing services with their external load balancers")
fs.DurationVar(&s.NodeSyncPeriod, "node-sync-period", s.NodeSyncPeriod, ""+
"The period for syncing nodes from cloudprovider. Longer periods will result in "+
"fewer calls to cloud provider, but may delay addition of new nodes to cluster.")
fs.DurationVar(&s.ResourceQuotaSyncPeriod, "resource-quota-sync-period", s.ResourceQuotaSyncPeriod, "The period for syncing quota usage status in the system")
fs.DurationVar(&s.NamespaceSyncPeriod, "namespace-sync-period", s.NamespaceSyncPeriod, "The period for syncing namespace life-cycle updates")
fs.DurationVar(&s.PVClaimBinderSyncPeriod, "pvclaimbinder-sync-period", s.PVClaimBinderSyncPeriod, "The period for syncing persistent volumes and persistent volume claims")
fs.DurationVar(&s.PodEvictionTimeout, "pod-eviction-timeout", s.PodEvictionTimeout, "The grace peroid for deleting pods on failed nodes.")
fs.DurationVar(&s.PodEvictionTimeout, "pod-eviction-timeout", s.PodEvictionTimeout, "The grace period for deleting pods on failed nodes.")
fs.Float32Var(&s.DeletingPodsQps, "deleting-pods-qps", 0.1, "Number of nodes per second on which pods are deleted in case of node failure.")
fs.IntVar(&s.DeletingPodsBurst, "deleting-pods-burst", 10, "Number of nodes on which pods are bursty deleted in case of node failure. For more details look into RateLimiter.")
fs.IntVar(&s.RegisterRetryCount, "register-retry-count", s.RegisterRetryCount, ""+
@ -197,7 +200,7 @@ func (s *CMServer) Run(_ []string) error {
nodeController.Run(s.NodeSyncPeriod)
serviceController := servicecontroller.New(cloud, kubeClient, s.ClusterName)
if err := serviceController.Run(s.NodeSyncPeriod); err != nil {
if err := serviceController.Run(s.ServiceSyncPeriod, s.NodeSyncPeriod); err != nil {
glog.Errorf("Failed to start service controller: %v", err)
}

View File

@ -119,6 +119,7 @@ func runScheduler(cl *client.Client) {
// RunControllerManager starts a controller
func runControllerManager(cl *client.Client) {
const serviceSyncPeriod = 5 * time.Minute
const nodeSyncPeriod = 10 * time.Second
nodeController := nodecontroller.NewNodeController(
nil, cl, 10, 5*time.Minute, nodecontroller.NewPodEvictor(util.NewTokenBucketRateLimiter(*deletingPodsQps, *deletingPodsBurst)),
@ -126,7 +127,7 @@ func runControllerManager(cl *client.Client) {
nodeController.Run(nodeSyncPeriod)
serviceController := servicecontroller.New(nil, cl, "kubernetes")
if err := serviceController.Run(nodeSyncPeriod); err != nil {
if err := serviceController.Run(serviceSyncPeriod, nodeSyncPeriod); err != nil {
glog.Warningf("Running without a service controller: %v", err)
}

View File

@ -128,7 +128,7 @@ func (s *CMServer) Run(_ []string) error {
nodeController.Run(s.NodeSyncPeriod)
serviceController := servicecontroller.New(cloud, kubeClient, s.ClusterName)
if err := serviceController.Run(s.NodeSyncPeriod); err != nil {
if err := serviceController.Run(s.ServiceSyncPeriod, s.NodeSyncPeriod); err != nil {
glog.Errorf("Failed to start service controller: %v", err)
}

View File

@ -100,12 +100,14 @@ func New(cloud cloudprovider.Interface, kubeClient client.Interface, clusterName
// Run starts a background goroutine that watches for changes to services that
// have (or had) externalLoadBalancers=true and ensures that they have external
// load balancers created and deleted appropriately.
// serviceSyncPeriod controls how often we check the cluster's services to
// ensure that the correct external load balancers exist.
// nodeSyncPeriod controls how often we check the cluster's nodes to determine
// if external load balancers need to be updated to point to a new set.
//
// It's an error to call Run() more than once for a given ServiceController
// object.
func (s *ServiceController) Run(nodeSyncPeriod time.Duration) error {
func (s *ServiceController) Run(serviceSyncPeriod, nodeSyncPeriod time.Duration) error {
if err := s.init(); err != nil {
return err
}
@ -132,7 +134,7 @@ func (s *ServiceController) Run(nodeSyncPeriod time.Duration) error {
s.cache,
)
lw := cache.NewListWatchFromClient(s.kubeClient.(*client.Client), "services", api.NamespaceAll, fields.Everything())
cache.NewReflector(lw, &api.Service{}, serviceQueue, 0).Run()
cache.NewReflector(lw, &api.Service{}, serviceQueue, serviceSyncPeriod).Run()
for i := 0; i < workerGoroutines; i++ {
go s.watchServices(serviceQueue)
}