Rate limit events in kubelet

1. Add EvnetRecordQps and EventBurst parameter in kubelet.
2. If  EvnetRecordQps and EventBurst was set, rate limit events in kubelet
with a independent ratelimiter as setted.
This commit is contained in:
jiangyaoguo 2015-09-09 16:57:21 +08:00
parent 45742e885c
commit 1460a1fb9e
4 changed files with 294 additions and 270 deletions

View File

@ -84,6 +84,8 @@ type KubeletServer struct {
HostNetworkSources string HostNetworkSources string
RegistryPullQPS float64 RegistryPullQPS float64
RegistryBurst int RegistryBurst int
EventRecordQPS float32
EventBurst int
RunOnce bool RunOnce bool
EnableDebuggingHandlers bool EnableDebuggingHandlers bool
MinimumGCAge time.Duration MinimumGCAge time.Duration
@ -220,6 +222,8 @@ func (s *KubeletServer) AddFlags(fs *pflag.FlagSet) {
fs.StringVar(&s.HostNetworkSources, "host-network-sources", s.HostNetworkSources, "Comma-separated list of sources from which the Kubelet allows pods to use of host network. For all sources use \"*\" [default=\"file\"]") fs.StringVar(&s.HostNetworkSources, "host-network-sources", s.HostNetworkSources, "Comma-separated list of sources from which the Kubelet allows pods to use of host network. For all sources use \"*\" [default=\"file\"]")
fs.Float64Var(&s.RegistryPullQPS, "registry-qps", s.RegistryPullQPS, "If > 0, limit registry pull QPS to this value. If 0, unlimited. [default=0.0]") fs.Float64Var(&s.RegistryPullQPS, "registry-qps", s.RegistryPullQPS, "If > 0, limit registry pull QPS to this value. If 0, unlimited. [default=0.0]")
fs.IntVar(&s.RegistryBurst, "registry-burst", s.RegistryBurst, "Maximum size of a bursty pulls, temporarily allows pulls to burst to this number, while still not exceeding registry-qps. Only used if --registry-qps > 0") fs.IntVar(&s.RegistryBurst, "registry-burst", s.RegistryBurst, "Maximum size of a bursty pulls, temporarily allows pulls to burst to this number, while still not exceeding registry-qps. Only used if --registry-qps > 0")
fs.Float32Var(&s.EventRecordQPS, "event-qps", s.EventRecordQPS, "If > 0, limit event creations per second to this value. If 0, unlimited. [default=0.0]")
fs.IntVar(&s.EventBurst, "event-burst", s.EventBurst, "Maximum size of a bursty event records, temporarily allows event records to burst to this number, while still not exceeding event-qps. Only used if --event-qps > 0")
fs.BoolVar(&s.RunOnce, "runonce", s.RunOnce, "If true, exit after spawning pods from local manifests or remote urls. Exclusive with --api-servers, and --enable-server") fs.BoolVar(&s.RunOnce, "runonce", s.RunOnce, "If true, exit after spawning pods from local manifests or remote urls. Exclusive with --api-servers, and --enable-server")
fs.BoolVar(&s.EnableDebuggingHandlers, "enable-debugging-handlers", s.EnableDebuggingHandlers, "Enables server endpoints for log collection and local running of containers and commands") fs.BoolVar(&s.EnableDebuggingHandlers, "enable-debugging-handlers", s.EnableDebuggingHandlers, "Enables server endpoints for log collection and local running of containers and commands")
fs.DurationVar(&s.MinimumGCAge, "minimum-container-ttl-duration", s.MinimumGCAge, "Minimum age for a finished container before it is garbage collected. Examples: '300ms', '10s' or '2h45m'") fs.DurationVar(&s.MinimumGCAge, "minimum-container-ttl-duration", s.MinimumGCAge, "Minimum age for a finished container before it is garbage collected. Examples: '300ms', '10s' or '2h45m'")
@ -327,6 +331,8 @@ func (s *KubeletServer) KubeletConfig() (*KubeletConfig, error) {
SyncFrequency: s.SyncFrequency, SyncFrequency: s.SyncFrequency,
RegistryPullQPS: s.RegistryPullQPS, RegistryPullQPS: s.RegistryPullQPS,
RegistryBurst: s.RegistryBurst, RegistryBurst: s.RegistryBurst,
EventRecordQPS: s.EventRecordQPS,
EventBurst: s.EventBurst,
MinimumGCAge: s.MinimumGCAge, MinimumGCAge: s.MinimumGCAge,
MaxPerPodContainerCount: s.MaxPerPodContainerCount, MaxPerPodContainerCount: s.MaxPerPodContainerCount,
MaxContainerCount: s.MaxContainerCount, MaxContainerCount: s.MaxContainerCount,
@ -646,7 +652,13 @@ func RunKubelet(kcfg *KubeletConfig, builder KubeletBuilder) error {
eventBroadcaster.StartLogging(glog.V(3).Infof) eventBroadcaster.StartLogging(glog.V(3).Infof)
if kcfg.KubeClient != nil { if kcfg.KubeClient != nil {
glog.V(4).Infof("Sending events to api server.") glog.V(4).Infof("Sending events to api server.")
if kcfg.EventRecordQPS == 0.0 {
eventBroadcaster.StartRecordingToSink(kcfg.KubeClient.Events("")) eventBroadcaster.StartRecordingToSink(kcfg.KubeClient.Events(""))
} else {
eventClient := *kcfg.KubeClient
eventClient.Throttle = util.NewTokenBucketRateLimiter(kcfg.EventRecordQPS, kcfg.EventBurst)
eventBroadcaster.StartRecordingToSink(eventClient.Events(""))
}
} else { } else {
glog.Warning("No api server defined - no events will be sent to API server.") glog.Warning("No api server defined - no events will be sent to API server.")
} }
@ -742,6 +754,8 @@ type KubeletConfig struct {
SyncFrequency time.Duration SyncFrequency time.Duration
RegistryPullQPS float64 RegistryPullQPS float64
RegistryBurst int RegistryBurst int
EventRecordQPS float32
EventBurst int
MinimumGCAge time.Duration MinimumGCAge time.Duration
MaxPerPodContainerCount int MaxPerPodContainerCount int
MaxContainerCount int MaxContainerCount int
@ -809,6 +823,8 @@ func createAndInitKubelet(kc *KubeletConfig) (k KubeletBootstrap, pc *config.Pod
kc.SyncFrequency, kc.SyncFrequency,
float32(kc.RegistryPullQPS), float32(kc.RegistryPullQPS),
kc.RegistryBurst, kc.RegistryBurst,
kc.EventRecordQPS,
kc.EventBurst,
gcPolicy, gcPolicy,
pc.SeenAllSources, pc.SeenAllSources,
kc.RegisterNode, kc.RegisterNode,

View File

@ -301,6 +301,8 @@ func (ks *KubeletExecutorServer) createAndInitKubelet(
kc.SyncFrequency, kc.SyncFrequency,
float32(kc.RegistryPullQPS), float32(kc.RegistryPullQPS),
kc.RegistryBurst, kc.RegistryBurst,
kc.EventRecordQPS,
kc.EventBurst,
gcPolicy, gcPolicy,
pc.SeenAllSources, pc.SeenAllSources,
kc.RegisterNode, kc.RegisterNode,

View File

@ -9,7 +9,9 @@ algorithm-provider
all-namespaces all-namespaces
allocate-node-cidrs allocate-node-cidrs
allow-privileged allow-privileged
api-burst
api-prefix api-prefix
api-rate
api-servers api-servers
api-token api-token
api-version api-version
@ -71,6 +73,8 @@ etcd-config
etcd-prefix etcd-prefix
etcd-server etcd-server
etcd-servers etcd-servers
event-burst
event-qps
event-ttl event-ttl
executor-bindall executor-bindall
executor-logv executor-logv

View File

@ -139,6 +139,8 @@ func NewMainKubelet(
resyncInterval time.Duration, resyncInterval time.Duration,
pullQPS float32, pullQPS float32,
pullBurst int, pullBurst int,
eventQPS float32,
eventBurst int,
containerGCPolicy ContainerGCPolicy, containerGCPolicy ContainerGCPolicy,
sourcesReady SourcesReadyFn, sourcesReady SourcesReadyFn,
registerNode bool, registerNode bool,