mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-07-29 22:46:12 +00:00
Merge pull request #11891 from a-robinson/lb
Add a resync period for services in the service controller
This commit is contained in:
commit
6c36f0dfa4
@ -61,6 +61,7 @@ type CMServer struct {
|
||||
CloudConfigFile string
|
||||
ConcurrentEndpointSyncs int
|
||||
ConcurrentRCSyncs int
|
||||
ServiceSyncPeriod time.Duration
|
||||
NodeSyncPeriod time.Duration
|
||||
ResourceQuotaSyncPeriod time.Duration
|
||||
NamespaceSyncPeriod time.Duration
|
||||
@ -92,6 +93,7 @@ func NewCMServer() *CMServer {
|
||||
Address: util.IP(net.ParseIP("127.0.0.1")),
|
||||
ConcurrentEndpointSyncs: 5,
|
||||
ConcurrentRCSyncs: 5,
|
||||
ServiceSyncPeriod: 5 * time.Minute,
|
||||
NodeSyncPeriod: 10 * time.Second,
|
||||
ResourceQuotaSyncPeriod: 10 * time.Second,
|
||||
NamespaceSyncPeriod: 5 * time.Minute,
|
||||
@ -111,13 +113,14 @@ func (s *CMServer) AddFlags(fs *pflag.FlagSet) {
|
||||
fs.StringVar(&s.CloudConfigFile, "cloud-config", s.CloudConfigFile, "The path to the cloud provider configuration file. Empty string for no configuration file.")
|
||||
fs.IntVar(&s.ConcurrentEndpointSyncs, "concurrent-endpoint-syncs", s.ConcurrentEndpointSyncs, "The number of endpoint syncing operations that will be done concurrently. Larger number = faster endpoint updating, but more CPU (and network) load")
|
||||
fs.IntVar(&s.ConcurrentRCSyncs, "concurrent_rc_syncs", s.ConcurrentRCSyncs, "The number of replication controllers that are allowed to sync concurrently. Larger number = more reponsive replica management, but more CPU (and network) load")
|
||||
fs.DurationVar(&s.ServiceSyncPeriod, "service-sync-period", s.ServiceSyncPeriod, "The period for syncing services with their external load balancers")
|
||||
fs.DurationVar(&s.NodeSyncPeriod, "node-sync-period", s.NodeSyncPeriod, ""+
|
||||
"The period for syncing nodes from cloudprovider. Longer periods will result in "+
|
||||
"fewer calls to cloud provider, but may delay addition of new nodes to cluster.")
|
||||
fs.DurationVar(&s.ResourceQuotaSyncPeriod, "resource-quota-sync-period", s.ResourceQuotaSyncPeriod, "The period for syncing quota usage status in the system")
|
||||
fs.DurationVar(&s.NamespaceSyncPeriod, "namespace-sync-period", s.NamespaceSyncPeriod, "The period for syncing namespace life-cycle updates")
|
||||
fs.DurationVar(&s.PVClaimBinderSyncPeriod, "pvclaimbinder-sync-period", s.PVClaimBinderSyncPeriod, "The period for syncing persistent volumes and persistent volume claims")
|
||||
fs.DurationVar(&s.PodEvictionTimeout, "pod-eviction-timeout", s.PodEvictionTimeout, "The grace peroid for deleting pods on failed nodes.")
|
||||
fs.DurationVar(&s.PodEvictionTimeout, "pod-eviction-timeout", s.PodEvictionTimeout, "The grace period for deleting pods on failed nodes.")
|
||||
fs.Float32Var(&s.DeletingPodsQps, "deleting-pods-qps", 0.1, "Number of nodes per second on which pods are deleted in case of node failure.")
|
||||
fs.IntVar(&s.DeletingPodsBurst, "deleting-pods-burst", 10, "Number of nodes on which pods are bursty deleted in case of node failure. For more details look into RateLimiter.")
|
||||
fs.IntVar(&s.RegisterRetryCount, "register-retry-count", s.RegisterRetryCount, ""+
|
||||
@ -197,7 +200,7 @@ func (s *CMServer) Run(_ []string) error {
|
||||
nodeController.Run(s.NodeSyncPeriod)
|
||||
|
||||
serviceController := servicecontroller.New(cloud, kubeClient, s.ClusterName)
|
||||
if err := serviceController.Run(s.NodeSyncPeriod); err != nil {
|
||||
if err := serviceController.Run(s.ServiceSyncPeriod, s.NodeSyncPeriod); err != nil {
|
||||
glog.Errorf("Failed to start service controller: %v", err)
|
||||
}
|
||||
|
||||
|
@ -119,6 +119,7 @@ func runScheduler(cl *client.Client) {
|
||||
|
||||
// RunControllerManager starts a controller
|
||||
func runControllerManager(cl *client.Client) {
|
||||
const serviceSyncPeriod = 5 * time.Minute
|
||||
const nodeSyncPeriod = 10 * time.Second
|
||||
nodeController := nodecontroller.NewNodeController(
|
||||
nil, cl, 10, 5*time.Minute, nodecontroller.NewPodEvictor(util.NewTokenBucketRateLimiter(*deletingPodsQps, *deletingPodsBurst)),
|
||||
@ -126,7 +127,7 @@ func runControllerManager(cl *client.Client) {
|
||||
nodeController.Run(nodeSyncPeriod)
|
||||
|
||||
serviceController := servicecontroller.New(nil, cl, "kubernetes")
|
||||
if err := serviceController.Run(nodeSyncPeriod); err != nil {
|
||||
if err := serviceController.Run(serviceSyncPeriod, nodeSyncPeriod); err != nil {
|
||||
glog.Warningf("Running without a service controller: %v", err)
|
||||
}
|
||||
|
||||
|
@ -128,7 +128,7 @@ func (s *CMServer) Run(_ []string) error {
|
||||
nodeController.Run(s.NodeSyncPeriod)
|
||||
|
||||
serviceController := servicecontroller.New(cloud, kubeClient, s.ClusterName)
|
||||
if err := serviceController.Run(s.NodeSyncPeriod); err != nil {
|
||||
if err := serviceController.Run(s.ServiceSyncPeriod, s.NodeSyncPeriod); err != nil {
|
||||
glog.Errorf("Failed to start service controller: %v", err)
|
||||
}
|
||||
|
||||
|
@ -100,12 +100,14 @@ func New(cloud cloudprovider.Interface, kubeClient client.Interface, clusterName
|
||||
// Run starts a background goroutine that watches for changes to services that
|
||||
// have (or had) externalLoadBalancers=true and ensures that they have external
|
||||
// load balancers created and deleted appropriately.
|
||||
// serviceSyncPeriod controls how often we check the cluster's services to
|
||||
// ensure that the correct external load balancers exist.
|
||||
// nodeSyncPeriod controls how often we check the cluster's nodes to determine
|
||||
// if external load balancers need to be updated to point to a new set.
|
||||
//
|
||||
// It's an error to call Run() more than once for a given ServiceController
|
||||
// object.
|
||||
func (s *ServiceController) Run(nodeSyncPeriod time.Duration) error {
|
||||
func (s *ServiceController) Run(serviceSyncPeriod, nodeSyncPeriod time.Duration) error {
|
||||
if err := s.init(); err != nil {
|
||||
return err
|
||||
}
|
||||
@ -132,7 +134,7 @@ func (s *ServiceController) Run(nodeSyncPeriod time.Duration) error {
|
||||
s.cache,
|
||||
)
|
||||
lw := cache.NewListWatchFromClient(s.kubeClient.(*client.Client), "services", api.NamespaceAll, fields.Everything())
|
||||
cache.NewReflector(lw, &api.Service{}, serviceQueue, 0).Run()
|
||||
cache.NewReflector(lw, &api.Service{}, serviceQueue, serviceSyncPeriod).Run()
|
||||
for i := 0; i < workerGoroutines; i++ {
|
||||
go s.watchServices(serviceQueue)
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user