From a4542ae5285bdd220da7293a6877e97663490c02 Mon Sep 17 00:00:00 2001 From: staebler Date: Thu, 27 Jul 2017 16:41:19 -0400 Subject: [PATCH] Create an EventRateLimit admission control plug-in for the API Server. The EventRateLimit plug-in limits the number of events that the API Server will accept in a given time period. It allows for server-wide, per-namespace, per-user,and per-source+object rate limiting. --- cmd/kube-apiserver/app/options/BUILD | 1 + cmd/kube-apiserver/app/options/plugins.go | 2 + hack/.golint_failures | 2 + plugin/BUILD | 1 + plugin/pkg/admission/eventratelimit/BUILD | 74 +++ .../pkg/admission/eventratelimit/admission.go | 92 ++++ .../eventratelimit/admission_test.go | 502 ++++++++++++++++++ .../eventratelimit/apis/eventratelimit/BUILD | 43 ++ .../eventratelimit/apis/eventratelimit/OWNERS | 7 + .../eventratelimit/apis/eventratelimit/doc.go | 19 + .../apis/eventratelimit/install/BUILD | 34 ++ .../apis/eventratelimit/install/install.go | 43 ++ .../apis/eventratelimit/register.go | 51 ++ .../apis/eventratelimit/types.go | 85 +++ .../apis/eventratelimit/v1alpha1/BUILD | 42 ++ .../apis/eventratelimit/v1alpha1/defaults.go | 25 + .../apis/eventratelimit/v1alpha1/doc.go | 23 + .../apis/eventratelimit/v1alpha1/register.go | 50 ++ .../apis/eventratelimit/v1alpha1/types.go | 85 +++ .../v1alpha1/zz_generated.conversion.go | 89 ++++ .../v1alpha1/zz_generated.deepcopy.go | 95 ++++ .../v1alpha1/zz_generated.defaults.go | 37 ++ .../apis/eventratelimit/validation/BUILD | 40 ++ .../eventratelimit/validation/validation.go | 63 +++ .../validation/validation_test.go | 192 +++++++ .../eventratelimit/zz_generated.deepcopy.go | 95 ++++ plugin/pkg/admission/eventratelimit/cache.go | 57 ++ .../admission/eventratelimit/cache_test.go | 119 +++++ plugin/pkg/admission/eventratelimit/clock.go | 34 ++ plugin/pkg/admission/eventratelimit/config.go | 72 +++ plugin/pkg/admission/eventratelimit/doc.go | 18 + .../admission/eventratelimit/limitenforcer.go | 145 +++++ .../apimachinery/pkg/api/errors/errors.go | 18 + 33 files changed, 2255 insertions(+) create mode 100644 plugin/pkg/admission/eventratelimit/BUILD create mode 100644 plugin/pkg/admission/eventratelimit/admission.go create mode 100644 plugin/pkg/admission/eventratelimit/admission_test.go create mode 100644 plugin/pkg/admission/eventratelimit/apis/eventratelimit/BUILD create mode 100755 plugin/pkg/admission/eventratelimit/apis/eventratelimit/OWNERS create mode 100644 plugin/pkg/admission/eventratelimit/apis/eventratelimit/doc.go create mode 100644 plugin/pkg/admission/eventratelimit/apis/eventratelimit/install/BUILD create mode 100644 plugin/pkg/admission/eventratelimit/apis/eventratelimit/install/install.go create mode 100644 plugin/pkg/admission/eventratelimit/apis/eventratelimit/register.go create mode 100644 plugin/pkg/admission/eventratelimit/apis/eventratelimit/types.go create mode 100644 plugin/pkg/admission/eventratelimit/apis/eventratelimit/v1alpha1/BUILD create mode 100644 plugin/pkg/admission/eventratelimit/apis/eventratelimit/v1alpha1/defaults.go create mode 100644 plugin/pkg/admission/eventratelimit/apis/eventratelimit/v1alpha1/doc.go create mode 100644 plugin/pkg/admission/eventratelimit/apis/eventratelimit/v1alpha1/register.go create mode 100644 plugin/pkg/admission/eventratelimit/apis/eventratelimit/v1alpha1/types.go create mode 100644 plugin/pkg/admission/eventratelimit/apis/eventratelimit/v1alpha1/zz_generated.conversion.go create mode 100644 plugin/pkg/admission/eventratelimit/apis/eventratelimit/v1alpha1/zz_generated.deepcopy.go create mode 100644 plugin/pkg/admission/eventratelimit/apis/eventratelimit/v1alpha1/zz_generated.defaults.go create mode 100644 plugin/pkg/admission/eventratelimit/apis/eventratelimit/validation/BUILD create mode 100644 plugin/pkg/admission/eventratelimit/apis/eventratelimit/validation/validation.go create mode 100644 plugin/pkg/admission/eventratelimit/apis/eventratelimit/validation/validation_test.go create mode 100644 plugin/pkg/admission/eventratelimit/apis/eventratelimit/zz_generated.deepcopy.go create mode 100644 plugin/pkg/admission/eventratelimit/cache.go create mode 100644 plugin/pkg/admission/eventratelimit/cache_test.go create mode 100644 plugin/pkg/admission/eventratelimit/clock.go create mode 100644 plugin/pkg/admission/eventratelimit/config.go create mode 100644 plugin/pkg/admission/eventratelimit/doc.go create mode 100644 plugin/pkg/admission/eventratelimit/limitenforcer.go diff --git a/cmd/kube-apiserver/app/options/BUILD b/cmd/kube-apiserver/app/options/BUILD index a9fe44b77f6..9a36fd5b360 100644 --- a/cmd/kube-apiserver/app/options/BUILD +++ b/cmd/kube-apiserver/app/options/BUILD @@ -26,6 +26,7 @@ go_library( "//plugin/pkg/admission/antiaffinity:go_default_library", "//plugin/pkg/admission/defaulttolerationseconds:go_default_library", "//plugin/pkg/admission/deny:go_default_library", + "//plugin/pkg/admission/eventratelimit:go_default_library", "//plugin/pkg/admission/exec:go_default_library", "//plugin/pkg/admission/gc:go_default_library", "//plugin/pkg/admission/imagepolicy:go_default_library", diff --git a/cmd/kube-apiserver/app/options/plugins.go b/cmd/kube-apiserver/app/options/plugins.go index 7d0e8bb04a3..192fd275d38 100644 --- a/cmd/kube-apiserver/app/options/plugins.go +++ b/cmd/kube-apiserver/app/options/plugins.go @@ -30,6 +30,7 @@ import ( "k8s.io/kubernetes/plugin/pkg/admission/antiaffinity" "k8s.io/kubernetes/plugin/pkg/admission/defaulttolerationseconds" "k8s.io/kubernetes/plugin/pkg/admission/deny" + "k8s.io/kubernetes/plugin/pkg/admission/eventratelimit" "k8s.io/kubernetes/plugin/pkg/admission/exec" "k8s.io/kubernetes/plugin/pkg/admission/gc" "k8s.io/kubernetes/plugin/pkg/admission/imagepolicy" @@ -59,6 +60,7 @@ func RegisterAllAdmissionPlugins(plugins *admission.Plugins) { antiaffinity.Register(plugins) defaulttolerationseconds.Register(plugins) deny.Register(plugins) + eventratelimit.Register(plugins) exec.Register(plugins) gc.Register(plugins) imagepolicy.Register(plugins) diff --git a/hack/.golint_failures b/hack/.golint_failures index 3098d5667b3..32b943d60ca 100644 --- a/hack/.golint_failures +++ b/hack/.golint_failures @@ -467,6 +467,8 @@ pkg/volume/util pkg/volume/vsphere_volume plugin/cmd/kube-scheduler/app plugin/pkg/admission/antiaffinity +plugin/pkg/admission/eventratelimit/apis/eventratelimit +plugin/pkg/admission/eventratelimit/apis/eventratelimit/v1alpha1 plugin/pkg/admission/initialization plugin/pkg/admission/initialresources plugin/pkg/admission/limitranger diff --git a/plugin/BUILD b/plugin/BUILD index 216302fe787..0fa7c38696f 100644 --- a/plugin/BUILD +++ b/plugin/BUILD @@ -17,6 +17,7 @@ filegroup( "//plugin/pkg/admission/antiaffinity:all-srcs", "//plugin/pkg/admission/defaulttolerationseconds:all-srcs", "//plugin/pkg/admission/deny:all-srcs", + "//plugin/pkg/admission/eventratelimit:all-srcs", "//plugin/pkg/admission/exec:all-srcs", "//plugin/pkg/admission/gc:all-srcs", "//plugin/pkg/admission/imagepolicy:all-srcs", diff --git a/plugin/pkg/admission/eventratelimit/BUILD b/plugin/pkg/admission/eventratelimit/BUILD new file mode 100644 index 00000000000..e2b9f31d81a --- /dev/null +++ b/plugin/pkg/admission/eventratelimit/BUILD @@ -0,0 +1,74 @@ +package(default_visibility = ["//visibility:public"]) + +licenses(["notice"]) + +load( + "@io_bazel_rules_go//go:def.bzl", + "go_library", + "go_test", +) + +go_test( + name = "go_default_test", + srcs = [ + "admission_test.go", + "cache_test.go", + ], + library = ":go_default_library", + tags = ["automanaged"], + deps = [ + "//pkg/api:go_default_library", + "//plugin/pkg/admission/eventratelimit/apis/eventratelimit:go_default_library", + "//vendor/github.com/hashicorp/golang-lru:go_default_library", + "//vendor/k8s.io/apimachinery/pkg/api/errors:go_default_library", + "//vendor/k8s.io/apimachinery/pkg/types:go_default_library", + "//vendor/k8s.io/apimachinery/pkg/util/clock:go_default_library", + "//vendor/k8s.io/apiserver/pkg/admission:go_default_library", + "//vendor/k8s.io/apiserver/pkg/authentication/user:go_default_library", + "//vendor/k8s.io/client-go/util/flowcontrol:go_default_library", + ], +) + +go_library( + name = "go_default_library", + srcs = [ + "admission.go", + "cache.go", + "clock.go", + "config.go", + "doc.go", + "limitenforcer.go", + ], + tags = ["automanaged"], + deps = [ + "//pkg/api:go_default_library", + "//plugin/pkg/admission/eventratelimit/apis/eventratelimit:go_default_library", + "//plugin/pkg/admission/eventratelimit/apis/eventratelimit/install:go_default_library", + "//plugin/pkg/admission/eventratelimit/apis/eventratelimit/v1alpha1:go_default_library", + "//plugin/pkg/admission/eventratelimit/apis/eventratelimit/validation:go_default_library", + "//vendor/github.com/hashicorp/golang-lru:go_default_library", + "//vendor/k8s.io/apimachinery/pkg/api/errors:go_default_library", + "//vendor/k8s.io/apimachinery/pkg/apimachinery/announced:go_default_library", + "//vendor/k8s.io/apimachinery/pkg/apimachinery/registered:go_default_library", + "//vendor/k8s.io/apimachinery/pkg/runtime:go_default_library", + "//vendor/k8s.io/apimachinery/pkg/runtime/serializer:go_default_library", + "//vendor/k8s.io/apiserver/pkg/admission:go_default_library", + "//vendor/k8s.io/client-go/util/flowcontrol:go_default_library", + ], +) + +filegroup( + name = "package-srcs", + srcs = glob(["**"]), + tags = ["automanaged"], + visibility = ["//visibility:private"], +) + +filegroup( + name = "all-srcs", + srcs = [ + ":package-srcs", + "//plugin/pkg/admission/eventratelimit/apis/eventratelimit:all-srcs", + ], + tags = ["automanaged"], +) diff --git a/plugin/pkg/admission/eventratelimit/admission.go b/plugin/pkg/admission/eventratelimit/admission.go new file mode 100644 index 00000000000..4318d66c688 --- /dev/null +++ b/plugin/pkg/admission/eventratelimit/admission.go @@ -0,0 +1,92 @@ +/* +Copyright 2017 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package eventratelimit + +import ( + "io" + + "k8s.io/apiserver/pkg/admission" + "k8s.io/client-go/util/flowcontrol" + "k8s.io/kubernetes/pkg/api" + eventratelimitapi "k8s.io/kubernetes/plugin/pkg/admission/eventratelimit/apis/eventratelimit" + "k8s.io/kubernetes/plugin/pkg/admission/eventratelimit/apis/eventratelimit/validation" +) + +// Register registers a plugin +func Register(plugins *admission.Plugins) { + plugins.Register("EventRateLimit", + func(config io.Reader) (admission.Interface, error) { + // load the configuration provided (if any) + configuration, err := LoadConfiguration(config) + if err != nil { + return nil, err + } + // validate the configuration (if any) + if configuration != nil { + if errs := validation.ValidateConfiguration(configuration); len(errs) != 0 { + return nil, errs.ToAggregate() + } + } + return newEventRateLimit(configuration, realClock{}) + }) +} + +// eventRateLimitAdmission implements an admission controller that can enforce event rate limits +type eventRateLimitAdmission struct { + *admission.Handler + // limitEnforcers is the collection of limit enforcers. There is one limit enforcer for each + // active limit type. As there are 4 limit types, the length of the array will be at most 4. + // The array is read-only after construction. + limitEnforcers []*limitEnforcer +} + +// newEventRateLimit configures an admission controller that can enforce event rate limits +func newEventRateLimit(config *eventratelimitapi.Configuration, clock flowcontrol.Clock) (admission.Interface, error) { + limitEnforcers := make([]*limitEnforcer, 0, len(config.Limits)) + for _, limitConfig := range config.Limits { + enforcer, err := newLimitEnforcer(limitConfig, clock) + if err != nil { + return nil, err + } + limitEnforcers = append(limitEnforcers, enforcer) + } + + eventRateLimitAdmission := &eventRateLimitAdmission{ + Handler: admission.NewHandler(admission.Create, admission.Update), + limitEnforcers: limitEnforcers, + } + + return eventRateLimitAdmission, nil +} + +// Admit makes admission decisions while enforcing event rate limits +func (a *eventRateLimitAdmission) Admit(attr admission.Attributes) (err error) { + // ignore all operations that do not correspond to an Event kind + if attr.GetKind().GroupKind() != api.Kind("Event") { + return nil + } + + var rejectionError error + // give each limit enforcer a chance to reject the event + for _, enforcer := range a.limitEnforcers { + if err := enforcer.accept(attr); err != nil { + rejectionError = err + } + } + + return rejectionError +} diff --git a/plugin/pkg/admission/eventratelimit/admission_test.go b/plugin/pkg/admission/eventratelimit/admission_test.go new file mode 100644 index 00000000000..579ab36fb1c --- /dev/null +++ b/plugin/pkg/admission/eventratelimit/admission_test.go @@ -0,0 +1,502 @@ +/* +Copyright 2017 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package eventratelimit + +import ( + "testing" + "time" + + "k8s.io/apimachinery/pkg/api/errors" + "k8s.io/apimachinery/pkg/types" + "k8s.io/apimachinery/pkg/util/clock" + "k8s.io/apiserver/pkg/admission" + "k8s.io/apiserver/pkg/authentication/user" + "k8s.io/kubernetes/pkg/api" + eventratelimitapi "k8s.io/kubernetes/plugin/pkg/admission/eventratelimit/apis/eventratelimit" +) + +const ( + qps = 1 + eventKind = "Event" + nonEventKind = "NonEvent" +) + +// attributesForRequest generates the admission.Attributes that for the specified request +func attributesForRequest(rq request) admission.Attributes { + return admission.NewAttributesRecord( + rq.event, + nil, + api.Kind(rq.kind).WithVersion("version"), + rq.namespace, + "name", + api.Resource("resource").WithVersion("version"), + "", + admission.Create, + &user.DefaultInfo{Name: rq.username}) +} + +type request struct { + kind string + namespace string + username string + event *api.Event + delay time.Duration + accepted bool +} + +func newRequest(kind string) request { + return request{ + kind: kind, + accepted: true, + } +} + +func newEventRequest() request { + return newRequest(eventKind) +} + +func newNonEventRequest() request { + return newRequest(nonEventKind) +} + +func (r request) withNamespace(namespace string) request { + r.namespace = namespace + return r +} + +func (r request) withEvent(event *api.Event) request { + r.event = event + return r +} + +func (r request) withEventComponent(component string) request { + return r.withEvent(&api.Event{ + Source: api.EventSource{ + Component: component, + }, + }) +} + +func (r request) withUser(name string) request { + r.username = name + return r +} + +func (r request) blocked() request { + r.accepted = false + return r +} + +// withDelay will adjust the clock to simulate the specified delay, in seconds +func (r request) withDelay(delayInSeconds int) request { + r.delay = time.Duration(delayInSeconds) * time.Second + return r +} + +// createSourceAndObjectKeyInclusionRequests creates a series of requests that can be used +// to test that a particular part of the event is included in the source+object key +func createSourceAndObjectKeyInclusionRequests(eventFactory func(label string) *api.Event) []request { + return []request{ + newEventRequest().withEvent(eventFactory("A")), + newEventRequest().withEvent(eventFactory("A")).blocked(), + newEventRequest().withEvent(eventFactory("B")), + } +} + +func TestEventRateLimiting(t *testing.T) { + cases := []struct { + name string + serverBurst int32 + namespaceBurst int32 + namespaceCacheSize int32 + sourceAndObjectBurst int32 + sourceAndObjectCacheSize int32 + userBurst int32 + userCacheSize int32 + requests []request + }{ + { + name: "event not blocked when tokens available", + serverBurst: 3, + requests: []request{ + newEventRequest(), + }, + }, + { + name: "non-event not blocked", + serverBurst: 3, + requests: []request{ + newNonEventRequest(), + }, + }, + { + name: "event blocked after tokens exhausted", + serverBurst: 3, + requests: []request{ + newEventRequest(), + newEventRequest(), + newEventRequest(), + newEventRequest().blocked(), + }, + }, + { + name: "non-event not blocked after tokens exhausted", + serverBurst: 3, + requests: []request{ + newEventRequest(), + newEventRequest(), + newEventRequest(), + newNonEventRequest(), + }, + }, + { + name: "non-events should not count against limit", + serverBurst: 3, + requests: []request{ + newEventRequest(), + newEventRequest(), + newNonEventRequest(), + newEventRequest(), + }, + }, + { + name: "event accepted after token refill", + serverBurst: 3, + requests: []request{ + newEventRequest(), + newEventRequest(), + newEventRequest(), + newEventRequest().blocked(), + newEventRequest().withDelay(1), + }, + }, + { + name: "event blocked by namespace limits", + serverBurst: 100, + namespaceBurst: 3, + namespaceCacheSize: 10, + requests: []request{ + newEventRequest().withNamespace("A"), + newEventRequest().withNamespace("A"), + newEventRequest().withNamespace("A"), + newEventRequest().withNamespace("A").blocked(), + }, + }, + { + name: "event from other namespace not blocked", + serverBurst: 100, + namespaceBurst: 3, + namespaceCacheSize: 10, + requests: []request{ + newEventRequest().withNamespace("A"), + newEventRequest().withNamespace("A"), + newEventRequest().withNamespace("A"), + newEventRequest().withNamespace("B"), + }, + }, + { + name: "events from other namespaces should not count against limit", + serverBurst: 100, + namespaceBurst: 3, + namespaceCacheSize: 10, + requests: []request{ + newEventRequest().withNamespace("A"), + newEventRequest().withNamespace("A"), + newEventRequest().withNamespace("B"), + newEventRequest().withNamespace("A"), + }, + }, + { + name: "event accepted after namespace token refill", + serverBurst: 100, + namespaceBurst: 3, + namespaceCacheSize: 10, + requests: []request{ + newEventRequest().withNamespace("A"), + newEventRequest().withNamespace("A"), + newEventRequest().withNamespace("A"), + newEventRequest().withNamespace("A").blocked(), + newEventRequest().withNamespace("A").withDelay(1), + }, + }, + { + name: "event from other namespaces should not clear namespace limits", + serverBurst: 100, + namespaceBurst: 3, + namespaceCacheSize: 10, + requests: []request{ + newEventRequest().withNamespace("A"), + newEventRequest().withNamespace("A"), + newEventRequest().withNamespace("A"), + newEventRequest().withNamespace("B"), + newEventRequest().withNamespace("A").blocked(), + }, + }, + { + name: "namespace limits from lru namespace should clear when cache size exceeded", + serverBurst: 100, + namespaceBurst: 3, + namespaceCacheSize: 2, + requests: []request{ + newEventRequest().withNamespace("A"), + newEventRequest().withNamespace("A"), + newEventRequest().withNamespace("B"), + newEventRequest().withNamespace("B"), + newEventRequest().withNamespace("B"), + newEventRequest().withNamespace("A"), + newEventRequest().withNamespace("B").blocked(), + newEventRequest().withNamespace("A").blocked(), + // This should clear out namespace B from the lru cache + newEventRequest().withNamespace("C"), + newEventRequest().withNamespace("A").blocked(), + newEventRequest().withNamespace("B"), + }, + }, + { + name: "event blocked by source+object limits", + serverBurst: 100, + sourceAndObjectBurst: 3, + sourceAndObjectCacheSize: 10, + requests: []request{ + newEventRequest().withEventComponent("A"), + newEventRequest().withEventComponent("A"), + newEventRequest().withEventComponent("A"), + newEventRequest().withEventComponent("A").blocked(), + }, + }, + { + name: "event from other source+object not blocked", + serverBurst: 100, + sourceAndObjectBurst: 3, + sourceAndObjectCacheSize: 10, + requests: []request{ + newEventRequest().withEventComponent("A"), + newEventRequest().withEventComponent("A"), + newEventRequest().withEventComponent("A"), + newEventRequest().withEventComponent("B"), + }, + }, + { + name: "events from other source+object should not count against limit", + serverBurst: 100, + sourceAndObjectBurst: 3, + sourceAndObjectCacheSize: 10, + requests: []request{ + newEventRequest().withEventComponent("A"), + newEventRequest().withEventComponent("A"), + newEventRequest().withEventComponent("B"), + newEventRequest().withEventComponent("A"), + }, + }, + { + name: "event accepted after source+object token refill", + serverBurst: 100, + sourceAndObjectBurst: 3, + sourceAndObjectCacheSize: 10, + requests: []request{ + newEventRequest().withEventComponent("A"), + newEventRequest().withEventComponent("A"), + newEventRequest().withEventComponent("A"), + newEventRequest().withEventComponent("A").blocked(), + newEventRequest().withEventComponent("A").withDelay(1), + }, + }, + { + name: "event from other source+object should not clear source+object limits", + serverBurst: 100, + sourceAndObjectBurst: 3, + sourceAndObjectCacheSize: 10, + requests: []request{ + newEventRequest().withEventComponent("A"), + newEventRequest().withEventComponent("A"), + newEventRequest().withEventComponent("A"), + newEventRequest().withEventComponent("B"), + newEventRequest().withEventComponent("A").blocked(), + }, + }, + { + name: "source+object limits from lru source+object should clear when cache size exceeded", + serverBurst: 100, + sourceAndObjectBurst: 3, + sourceAndObjectCacheSize: 2, + requests: []request{ + newEventRequest().withEventComponent("A"), + newEventRequest().withEventComponent("A"), + newEventRequest().withEventComponent("B"), + newEventRequest().withEventComponent("B"), + newEventRequest().withEventComponent("B"), + newEventRequest().withEventComponent("A"), + newEventRequest().withEventComponent("B").blocked(), + newEventRequest().withEventComponent("A").blocked(), + // This should clear out component B from the lru cache + newEventRequest().withEventComponent("C"), + newEventRequest().withEventComponent("A").blocked(), + newEventRequest().withEventComponent("B"), + }, + }, + { + name: "source host should be included in source+object key", + serverBurst: 100, + sourceAndObjectBurst: 1, + sourceAndObjectCacheSize: 10, + requests: createSourceAndObjectKeyInclusionRequests(func(label string) *api.Event { + return &api.Event{Source: api.EventSource{Host: label}} + }), + }, + { + name: "involved object kind should be included in source+object key", + serverBurst: 100, + sourceAndObjectBurst: 1, + sourceAndObjectCacheSize: 10, + requests: createSourceAndObjectKeyInclusionRequests(func(label string) *api.Event { + return &api.Event{InvolvedObject: api.ObjectReference{Kind: label}} + }), + }, + { + name: "involved object namespace should be included in source+object key", + serverBurst: 100, + sourceAndObjectBurst: 1, + sourceAndObjectCacheSize: 10, + requests: createSourceAndObjectKeyInclusionRequests(func(label string) *api.Event { + return &api.Event{InvolvedObject: api.ObjectReference{Namespace: label}} + }), + }, + { + name: "involved object name should be included in source+object key", + serverBurst: 100, + sourceAndObjectBurst: 1, + sourceAndObjectCacheSize: 10, + requests: createSourceAndObjectKeyInclusionRequests(func(label string) *api.Event { + return &api.Event{InvolvedObject: api.ObjectReference{Name: label}} + }), + }, + { + name: "involved object UID should be included in source+object key", + serverBurst: 100, + sourceAndObjectBurst: 1, + sourceAndObjectCacheSize: 10, + requests: createSourceAndObjectKeyInclusionRequests(func(label string) *api.Event { + return &api.Event{InvolvedObject: api.ObjectReference{UID: types.UID(label)}} + }), + }, + { + name: "involved object APIVersion should be included in source+object key", + serverBurst: 100, + sourceAndObjectBurst: 1, + sourceAndObjectCacheSize: 10, + requests: createSourceAndObjectKeyInclusionRequests(func(label string) *api.Event { + return &api.Event{InvolvedObject: api.ObjectReference{APIVersion: label}} + }), + }, + { + name: "event blocked by user limits", + userBurst: 3, + userCacheSize: 10, + requests: []request{ + newEventRequest().withUser("A"), + newEventRequest().withUser("A"), + newEventRequest().withUser("A"), + newEventRequest().withUser("A").blocked(), + }, + }, + { + name: "event from other user not blocked", + requests: []request{ + newEventRequest().withUser("A"), + newEventRequest().withUser("A"), + newEventRequest().withUser("A"), + newEventRequest().withUser("B"), + }, + }, + { + name: "events from other user should not count against limit", + requests: []request{ + newEventRequest().withUser("A"), + newEventRequest().withUser("A"), + newEventRequest().withUser("B"), + newEventRequest().withUser("A"), + }, + }, + } + + for _, tc := range cases { + t.Run(tc.name, func(t *testing.T) { + clock := clock.NewFakeClock(time.Now()) + config := &eventratelimitapi.Configuration{} + if tc.serverBurst > 0 { + serverLimit := eventratelimitapi.Limit{ + Type: eventratelimitapi.ServerLimitType, + QPS: qps, + Burst: tc.serverBurst, + } + config.Limits = append(config.Limits, serverLimit) + } + if tc.namespaceBurst > 0 { + namespaceLimit := eventratelimitapi.Limit{ + Type: eventratelimitapi.NamespaceLimitType, + Burst: tc.namespaceBurst, + QPS: qps, + CacheSize: tc.namespaceCacheSize, + } + config.Limits = append(config.Limits, namespaceLimit) + } + if tc.userBurst > 0 { + userLimit := eventratelimitapi.Limit{ + Type: eventratelimitapi.UserLimitType, + Burst: tc.userBurst, + QPS: qps, + CacheSize: tc.userCacheSize, + } + config.Limits = append(config.Limits, userLimit) + } + if tc.sourceAndObjectBurst > 0 { + sourceAndObjectLimit := eventratelimitapi.Limit{ + Type: eventratelimitapi.SourceAndObjectLimitType, + Burst: tc.sourceAndObjectBurst, + QPS: qps, + CacheSize: tc.sourceAndObjectCacheSize, + } + config.Limits = append(config.Limits, sourceAndObjectLimit) + } + eventratelimit, err := newEventRateLimit(config, clock) + if err != nil { + t.Fatalf("%v: Could not create EventRateLimit: %v", tc.name, err) + } + + for rqIndex, rq := range tc.requests { + if rq.delay > 0 { + clock.Step(rq.delay) + } + attributes := attributesForRequest(rq) + err = eventratelimit.Admit(attributes) + if rq.accepted != (err == nil) { + expectedAction := "admitted" + if !rq.accepted { + expectedAction = "blocked" + } + t.Fatalf("%v: Request %v should have been %v: %v", tc.name, rqIndex, expectedAction, err) + } + if err != nil { + statusErr, ok := err.(*errors.StatusError) + if ok && statusErr.ErrStatus.Code != errors.StatusTooManyRequests { + t.Fatalf("%v: Request %v should yield a 429 response: %v", tc.name, rqIndex, err) + } + } + } + }) + } +} diff --git a/plugin/pkg/admission/eventratelimit/apis/eventratelimit/BUILD b/plugin/pkg/admission/eventratelimit/apis/eventratelimit/BUILD new file mode 100644 index 00000000000..5c30dafeae5 --- /dev/null +++ b/plugin/pkg/admission/eventratelimit/apis/eventratelimit/BUILD @@ -0,0 +1,43 @@ +package(default_visibility = ["//visibility:public"]) + +licenses(["notice"]) + +load( + "@io_bazel_rules_go//go:def.bzl", + "go_library", +) + +go_library( + name = "go_default_library", + srcs = [ + "doc.go", + "register.go", + "types.go", + "zz_generated.deepcopy.go", + ], + tags = ["automanaged"], + deps = [ + "//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", + "//vendor/k8s.io/apimachinery/pkg/conversion:go_default_library", + "//vendor/k8s.io/apimachinery/pkg/runtime:go_default_library", + "//vendor/k8s.io/apimachinery/pkg/runtime/schema:go_default_library", + ], +) + +filegroup( + name = "package-srcs", + srcs = glob(["**"]), + tags = ["automanaged"], + visibility = ["//visibility:private"], +) + +filegroup( + name = "all-srcs", + srcs = [ + ":package-srcs", + "//plugin/pkg/admission/eventratelimit/apis/eventratelimit/install:all-srcs", + "//plugin/pkg/admission/eventratelimit/apis/eventratelimit/v1alpha1:all-srcs", + "//plugin/pkg/admission/eventratelimit/apis/eventratelimit/validation:all-srcs", + ], + tags = ["automanaged"], +) diff --git a/plugin/pkg/admission/eventratelimit/apis/eventratelimit/OWNERS b/plugin/pkg/admission/eventratelimit/apis/eventratelimit/OWNERS new file mode 100755 index 00000000000..6c48a1a83ef --- /dev/null +++ b/plugin/pkg/admission/eventratelimit/apis/eventratelimit/OWNERS @@ -0,0 +1,7 @@ +reviewers: +- deads2k +- derekwaynecarr +approvers: +- deads2k +- derekwaynecarr +- smarterclayton \ No newline at end of file diff --git a/plugin/pkg/admission/eventratelimit/apis/eventratelimit/doc.go b/plugin/pkg/admission/eventratelimit/apis/eventratelimit/doc.go new file mode 100644 index 00000000000..c70a60a56fd --- /dev/null +++ b/plugin/pkg/admission/eventratelimit/apis/eventratelimit/doc.go @@ -0,0 +1,19 @@ +/* +Copyright 2016 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +// +k8s:deepcopy-gen=package,register + +package eventratelimit // import "k8s.io/kubernetes/plugin/pkg/admission/eventratelimit/apis/eventratelimit" diff --git a/plugin/pkg/admission/eventratelimit/apis/eventratelimit/install/BUILD b/plugin/pkg/admission/eventratelimit/apis/eventratelimit/install/BUILD new file mode 100644 index 00000000000..a5d67e1170b --- /dev/null +++ b/plugin/pkg/admission/eventratelimit/apis/eventratelimit/install/BUILD @@ -0,0 +1,34 @@ +package(default_visibility = ["//visibility:public"]) + +licenses(["notice"]) + +load( + "@io_bazel_rules_go//go:def.bzl", + "go_library", +) + +go_library( + name = "go_default_library", + srcs = ["install.go"], + tags = ["automanaged"], + deps = [ + "//plugin/pkg/admission/eventratelimit/apis/eventratelimit:go_default_library", + "//plugin/pkg/admission/eventratelimit/apis/eventratelimit/v1alpha1:go_default_library", + "//vendor/k8s.io/apimachinery/pkg/apimachinery/announced:go_default_library", + "//vendor/k8s.io/apimachinery/pkg/apimachinery/registered:go_default_library", + "//vendor/k8s.io/apimachinery/pkg/runtime:go_default_library", + ], +) + +filegroup( + name = "package-srcs", + srcs = glob(["**"]), + tags = ["automanaged"], + visibility = ["//visibility:private"], +) + +filegroup( + name = "all-srcs", + srcs = [":package-srcs"], + tags = ["automanaged"], +) diff --git a/plugin/pkg/admission/eventratelimit/apis/eventratelimit/install/install.go b/plugin/pkg/admission/eventratelimit/apis/eventratelimit/install/install.go new file mode 100644 index 00000000000..805cb579ffc --- /dev/null +++ b/plugin/pkg/admission/eventratelimit/apis/eventratelimit/install/install.go @@ -0,0 +1,43 @@ +/* +Copyright 2017 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +// Package install installs the experimental API group, making it available as +// an option to all of the API encoding/decoding machinery. +package install + +import ( + "k8s.io/apimachinery/pkg/apimachinery/announced" + "k8s.io/apimachinery/pkg/apimachinery/registered" + "k8s.io/apimachinery/pkg/runtime" + internalapi "k8s.io/kubernetes/plugin/pkg/admission/eventratelimit/apis/eventratelimit" + versionedapi "k8s.io/kubernetes/plugin/pkg/admission/eventratelimit/apis/eventratelimit/v1alpha1" +) + +// Install registers the API group and adds types to a scheme +func Install(groupFactoryRegistry announced.APIGroupFactoryRegistry, registry *registered.APIRegistrationManager, scheme *runtime.Scheme) { + if err := announced.NewGroupMetaFactory( + &announced.GroupMetaFactoryArgs{ + GroupName: internalapi.GroupName, + VersionPreferenceOrder: []string{versionedapi.SchemeGroupVersion.Version}, + AddInternalObjectsToScheme: internalapi.AddToScheme, + }, + announced.VersionToSchemeFunc{ + versionedapi.SchemeGroupVersion.Version: versionedapi.AddToScheme, + }, + ).Announce(groupFactoryRegistry).RegisterAndEnable(registry, scheme); err != nil { + panic(err) + } +} diff --git a/plugin/pkg/admission/eventratelimit/apis/eventratelimit/register.go b/plugin/pkg/admission/eventratelimit/apis/eventratelimit/register.go new file mode 100644 index 00000000000..d763c55cb24 --- /dev/null +++ b/plugin/pkg/admission/eventratelimit/apis/eventratelimit/register.go @@ -0,0 +1,51 @@ +/* +Copyright 2017 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package eventratelimit + +import ( + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/runtime/schema" +) + +var ( + SchemeBuilder = runtime.NewSchemeBuilder(addKnownTypes) + AddToScheme = SchemeBuilder.AddToScheme +) + +// GroupName is the group name use in this package +const GroupName = "eventratelimit.admission.k8s.io" + +// SchemeGroupVersion is group version used to register these objects +var SchemeGroupVersion = schema.GroupVersion{Group: GroupName, Version: runtime.APIVersionInternal} + +// Kind takes an unqualified kind and returns a Group qualified GroupKind +func Kind(kind string) schema.GroupKind { + return SchemeGroupVersion.WithKind(kind).GroupKind() +} + +// Resource takes an unqualified resource and returns a Group qualified GroupResource +func Resource(resource string) schema.GroupResource { + return SchemeGroupVersion.WithResource(resource).GroupResource() +} + +func addKnownTypes(scheme *runtime.Scheme) error { + // TODO this will get cleaned up with the scheme types are fixed + scheme.AddKnownTypes(SchemeGroupVersion, + &Configuration{}, + ) + return nil +} diff --git a/plugin/pkg/admission/eventratelimit/apis/eventratelimit/types.go b/plugin/pkg/admission/eventratelimit/apis/eventratelimit/types.go new file mode 100644 index 00000000000..397027c2493 --- /dev/null +++ b/plugin/pkg/admission/eventratelimit/apis/eventratelimit/types.go @@ -0,0 +1,85 @@ +/* +Copyright 2017 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package eventratelimit + +import metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + +// LimitType is the type of the limit (e.g., per-namespace) +type LimitType string + +const ( + // ServerLimitType is a type of limit where there is one bucket shared by + // all of the event queries received by the API Server. + ServerLimitType LimitType = "Server" + // NamespaceLimitType is a type of limit where there is one bucket used by + // each namespace + NamespaceLimitType LimitType = "Namespace" + // UserLimitType is a type of limit where there is one bucket used by each + // user + UserLimitType LimitType = "User" + // SourceAndObjectLimitType is a type of limit where there is one bucket used + // by each combination of source and involved object of the event. + SourceAndObjectLimitType LimitType = "SourceAndObject" +) + +// +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object + +// Configuration provides configuration for the EventRateLimit admission +// controller. +type Configuration struct { + metav1.TypeMeta `json:",inline"` + + // limits are the limits to place on event queries received. + // Limits can be placed on events received server-wide, per namespace, + // per user, and per source+object. + // At least one limit is required. + Limits []Limit `json:"limits"` +} + +// Limit is the configuration for a particular limit type +type Limit struct { + // type is the type of limit to which this configuration applies + Type LimitType `json:"type"` + + // qps is the number of event queries per second that are allowed for this + // type of limit. The qps and burst fields are used together to determine if + // a particular event query is accepted. The qps determines how many queries + // are accepted once the burst amount of queries has been exhausted. + QPS int32 `json:"qps"` + + // burst is the burst number of event queries that are allowed for this type + // of limit. The qps and burst fields are used together to determine if a + // particular event query is accepted. The burst determines the maximum size + // of the allowance granted for a particular bucket. For example, if the burst + // is 10 and the qps is 3, then the admission control will accept 10 queries + // before blocking any queries. Every second, 3 more queries will be allowed. + // If some of that allowance is not used, then it will roll over to the next + // second, until the maximum allowance of 10 is reached. + Burst int32 `json:"burst"` + + // cacheSize is the size of the LRU cache for this type of limit. If a bucket + // is evicted from the cache, then the allowance for that bucket is reset. If + // more queries are later received for an evicted bucket, then that bucket + // will re-enter the cache with a clean slate, giving that bucket a full + // allowance of burst queries. + // + // The default cache size is 4096. + // + // If limitType is 'server', then cacheSize is ignored. + // +optional + CacheSize int32 `json:"cacheSize,omitempty"` +} diff --git a/plugin/pkg/admission/eventratelimit/apis/eventratelimit/v1alpha1/BUILD b/plugin/pkg/admission/eventratelimit/apis/eventratelimit/v1alpha1/BUILD new file mode 100644 index 00000000000..ec318571dd8 --- /dev/null +++ b/plugin/pkg/admission/eventratelimit/apis/eventratelimit/v1alpha1/BUILD @@ -0,0 +1,42 @@ +package(default_visibility = ["//visibility:public"]) + +licenses(["notice"]) + +load( + "@io_bazel_rules_go//go:def.bzl", + "go_library", +) + +go_library( + name = "go_default_library", + srcs = [ + "defaults.go", + "doc.go", + "register.go", + "types.go", + "zz_generated.conversion.go", + "zz_generated.deepcopy.go", + "zz_generated.defaults.go", + ], + tags = ["automanaged"], + deps = [ + "//plugin/pkg/admission/eventratelimit/apis/eventratelimit:go_default_library", + "//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", + "//vendor/k8s.io/apimachinery/pkg/conversion:go_default_library", + "//vendor/k8s.io/apimachinery/pkg/runtime:go_default_library", + "//vendor/k8s.io/apimachinery/pkg/runtime/schema:go_default_library", + ], +) + +filegroup( + name = "package-srcs", + srcs = glob(["**"]), + tags = ["automanaged"], + visibility = ["//visibility:private"], +) + +filegroup( + name = "all-srcs", + srcs = [":package-srcs"], + tags = ["automanaged"], +) diff --git a/plugin/pkg/admission/eventratelimit/apis/eventratelimit/v1alpha1/defaults.go b/plugin/pkg/admission/eventratelimit/apis/eventratelimit/v1alpha1/defaults.go new file mode 100644 index 00000000000..ebade2de231 --- /dev/null +++ b/plugin/pkg/admission/eventratelimit/apis/eventratelimit/v1alpha1/defaults.go @@ -0,0 +1,25 @@ +/* +Copyright 2017 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package v1alpha1 + +import kruntime "k8s.io/apimachinery/pkg/runtime" + +func addDefaultingFuncs(scheme *kruntime.Scheme) error { + return RegisterDefaults(scheme) +} + +func SetDefaults_Configuration(obj *Configuration) {} diff --git a/plugin/pkg/admission/eventratelimit/apis/eventratelimit/v1alpha1/doc.go b/plugin/pkg/admission/eventratelimit/apis/eventratelimit/v1alpha1/doc.go new file mode 100644 index 00000000000..483c858deec --- /dev/null +++ b/plugin/pkg/admission/eventratelimit/apis/eventratelimit/v1alpha1/doc.go @@ -0,0 +1,23 @@ +/* +Copyright 2016 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +// +k8s:deepcopy-gen=package,register +// +k8s:conversion-gen=k8s.io/kubernetes/plugin/pkg/admission/eventratelimit/apis/eventratelimit +// +k8s:defaulter-gen=TypeMeta + +// Package v1alpha1 is the v1alpha1 version of the API. +// +groupName=eventratelimit.admission.k8s.io +package v1alpha1 // import "k8s.io/kubernetes/plugin/pkg/admission/eventratelimit/apis/eventratelimit/v1alpha1" diff --git a/plugin/pkg/admission/eventratelimit/apis/eventratelimit/v1alpha1/register.go b/plugin/pkg/admission/eventratelimit/apis/eventratelimit/v1alpha1/register.go new file mode 100644 index 00000000000..74c039344d3 --- /dev/null +++ b/plugin/pkg/admission/eventratelimit/apis/eventratelimit/v1alpha1/register.go @@ -0,0 +1,50 @@ +/* +Copyright 2017 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package v1alpha1 + +import ( + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/runtime/schema" +) + +// GroupName is the group name use in this package +const GroupName = "eventratelimit.admission.k8s.io" + +// SchemeGroupVersion is group version used to register these objects +var SchemeGroupVersion = schema.GroupVersion{Group: GroupName, Version: "v1alpha1"} + +var ( + // TODO: move SchemeBuilder with zz_generated.deepcopy.go to k8s.io/api. + // localSchemeBuilder and AddToScheme will stay in k8s.io/kubernetes. + SchemeBuilder runtime.SchemeBuilder + localSchemeBuilder = &SchemeBuilder + AddToScheme = localSchemeBuilder.AddToScheme +) + +func init() { + // We only register manually written functions here. The registration of the + // generated functions takes place in the generated files. The separation + // makes the code compile even when the generated files are missing. + localSchemeBuilder.Register(addKnownTypes, addDefaultingFuncs) +} + +func addKnownTypes(scheme *runtime.Scheme) error { + scheme.AddKnownTypes(SchemeGroupVersion, + &Configuration{}, + ) + return nil +} diff --git a/plugin/pkg/admission/eventratelimit/apis/eventratelimit/v1alpha1/types.go b/plugin/pkg/admission/eventratelimit/apis/eventratelimit/v1alpha1/types.go new file mode 100644 index 00000000000..2f9d482c4e5 --- /dev/null +++ b/plugin/pkg/admission/eventratelimit/apis/eventratelimit/v1alpha1/types.go @@ -0,0 +1,85 @@ +/* +Copyright 2017 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package v1alpha1 + +import metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + +// LimitType is the type of the limit (e.g., per-namespace) +type LimitType string + +const ( + // ServerLimitType is a type of limit where there is one bucket shared by + // all of the event queries received by the API Server. + ServerLimitType LimitType = "Server" + // NamespaceLimitType is a type of limit where there is one bucket used by + // each namespace + NamespaceLimitType LimitType = "Namespace" + // UserLimitType is a type of limit where there is one bucket used by each + // user + UserLimitType LimitType = "User" + // SourceAndObjectLimitType is a type of limit where there is one bucket used + // by each combination of source and involved object of the event. + SourceAndObjectLimitType LimitType = "SourceAndObject" +) + +// +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object + +// Configuration provides configuration for the EventRateLimit admission +// controller. +type Configuration struct { + metav1.TypeMeta `json:",inline"` + + // limits are the limits to place on event queries received. + // Limits can be placed on events received server-wide, per namespace, + // per user, and per source+object. + // At least one limit is required. + Limits []Limit `json:"limits"` +} + +// Limit is the configuration for a particular limit type +type Limit struct { + // type is the type of limit to which this configuration applies + Type LimitType `json:"type"` + + // qps is the number of event queries per second that are allowed for this + // type of limit. The qps and burst fields are used together to determine if + // a particular event query is accepted. The qps determines how many queries + // are accepted once the burst amount of queries has been exhausted. + QPS int32 `json:"qps"` + + // burst is the burst number of event queries that are allowed for this type + // of limit. The qps and burst fields are used together to determine if a + // particular event query is accepted. The burst determines the maximum size + // of the allowance granted for a particular bucket. For example, if the burst + // is 10 and the qps is 3, then the admission control will accept 10 queries + // before blocking any queries. Every second, 3 more queries will be allowed. + // If some of that allowance is not used, then it will roll over to the next + // second, until the maximum allowance of 10 is reached. + Burst int32 `json:"burst"` + + // cacheSize is the size of the LRU cache for this type of limit. If a bucket + // is evicted from the cache, then the allowance for that bucket is reset. If + // more queries are later received for an evicted bucket, then that bucket + // will re-enter the cache with a clean slate, giving that bucket a full + // allowance of burst queries. + // + // The default cache size is 4096. + // + // If limitType is 'server', then cacheSize is ignored. + // +optional + CacheSize int32 `json:"cacheSize,omitempty"` +} diff --git a/plugin/pkg/admission/eventratelimit/apis/eventratelimit/v1alpha1/zz_generated.conversion.go b/plugin/pkg/admission/eventratelimit/apis/eventratelimit/v1alpha1/zz_generated.conversion.go new file mode 100644 index 00000000000..b8da46acb6d --- /dev/null +++ b/plugin/pkg/admission/eventratelimit/apis/eventratelimit/v1alpha1/zz_generated.conversion.go @@ -0,0 +1,89 @@ +// +build !ignore_autogenerated + +/* +Copyright 2017 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +// This file was autogenerated by conversion-gen. Do not edit it manually! + +package v1alpha1 + +import ( + conversion "k8s.io/apimachinery/pkg/conversion" + runtime "k8s.io/apimachinery/pkg/runtime" + eventratelimit "k8s.io/kubernetes/plugin/pkg/admission/eventratelimit/apis/eventratelimit" + unsafe "unsafe" +) + +func init() { + localSchemeBuilder.Register(RegisterConversions) +} + +// RegisterConversions adds conversion functions to the given scheme. +// Public to allow building arbitrary schemes. +func RegisterConversions(scheme *runtime.Scheme) error { + return scheme.AddGeneratedConversionFuncs( + Convert_v1alpha1_Configuration_To_eventratelimit_Configuration, + Convert_eventratelimit_Configuration_To_v1alpha1_Configuration, + Convert_v1alpha1_Limit_To_eventratelimit_Limit, + Convert_eventratelimit_Limit_To_v1alpha1_Limit, + ) +} + +func autoConvert_v1alpha1_Configuration_To_eventratelimit_Configuration(in *Configuration, out *eventratelimit.Configuration, s conversion.Scope) error { + out.Limits = *(*[]eventratelimit.Limit)(unsafe.Pointer(&in.Limits)) + return nil +} + +// Convert_v1alpha1_Configuration_To_eventratelimit_Configuration is an autogenerated conversion function. +func Convert_v1alpha1_Configuration_To_eventratelimit_Configuration(in *Configuration, out *eventratelimit.Configuration, s conversion.Scope) error { + return autoConvert_v1alpha1_Configuration_To_eventratelimit_Configuration(in, out, s) +} + +func autoConvert_eventratelimit_Configuration_To_v1alpha1_Configuration(in *eventratelimit.Configuration, out *Configuration, s conversion.Scope) error { + out.Limits = *(*[]Limit)(unsafe.Pointer(&in.Limits)) + return nil +} + +// Convert_eventratelimit_Configuration_To_v1alpha1_Configuration is an autogenerated conversion function. +func Convert_eventratelimit_Configuration_To_v1alpha1_Configuration(in *eventratelimit.Configuration, out *Configuration, s conversion.Scope) error { + return autoConvert_eventratelimit_Configuration_To_v1alpha1_Configuration(in, out, s) +} + +func autoConvert_v1alpha1_Limit_To_eventratelimit_Limit(in *Limit, out *eventratelimit.Limit, s conversion.Scope) error { + out.Type = eventratelimit.LimitType(in.Type) + out.QPS = in.QPS + out.Burst = in.Burst + out.CacheSize = in.CacheSize + return nil +} + +// Convert_v1alpha1_Limit_To_eventratelimit_Limit is an autogenerated conversion function. +func Convert_v1alpha1_Limit_To_eventratelimit_Limit(in *Limit, out *eventratelimit.Limit, s conversion.Scope) error { + return autoConvert_v1alpha1_Limit_To_eventratelimit_Limit(in, out, s) +} + +func autoConvert_eventratelimit_Limit_To_v1alpha1_Limit(in *eventratelimit.Limit, out *Limit, s conversion.Scope) error { + out.Type = LimitType(in.Type) + out.QPS = in.QPS + out.Burst = in.Burst + out.CacheSize = in.CacheSize + return nil +} + +// Convert_eventratelimit_Limit_To_v1alpha1_Limit is an autogenerated conversion function. +func Convert_eventratelimit_Limit_To_v1alpha1_Limit(in *eventratelimit.Limit, out *Limit, s conversion.Scope) error { + return autoConvert_eventratelimit_Limit_To_v1alpha1_Limit(in, out, s) +} diff --git a/plugin/pkg/admission/eventratelimit/apis/eventratelimit/v1alpha1/zz_generated.deepcopy.go b/plugin/pkg/admission/eventratelimit/apis/eventratelimit/v1alpha1/zz_generated.deepcopy.go new file mode 100644 index 00000000000..49fc8779134 --- /dev/null +++ b/plugin/pkg/admission/eventratelimit/apis/eventratelimit/v1alpha1/zz_generated.deepcopy.go @@ -0,0 +1,95 @@ +// +build !ignore_autogenerated + +/* +Copyright 2017 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +// This file was autogenerated by deepcopy-gen. Do not edit it manually! + +package v1alpha1 + +import ( + conversion "k8s.io/apimachinery/pkg/conversion" + runtime "k8s.io/apimachinery/pkg/runtime" + reflect "reflect" +) + +func init() { + SchemeBuilder.Register(RegisterDeepCopies) +} + +// RegisterDeepCopies adds deep-copy functions to the given scheme. Public +// to allow building arbitrary schemes. +// +// Deprecated: deepcopy registration will go away when static deepcopy is fully implemented. +func RegisterDeepCopies(scheme *runtime.Scheme) error { + return scheme.AddGeneratedDeepCopyFuncs( + conversion.GeneratedDeepCopyFunc{Fn: func(in interface{}, out interface{}, c *conversion.Cloner) error { + in.(*Configuration).DeepCopyInto(out.(*Configuration)) + return nil + }, InType: reflect.TypeOf(&Configuration{})}, + conversion.GeneratedDeepCopyFunc{Fn: func(in interface{}, out interface{}, c *conversion.Cloner) error { + in.(*Limit).DeepCopyInto(out.(*Limit)) + return nil + }, InType: reflect.TypeOf(&Limit{})}, + ) +} + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *Configuration) DeepCopyInto(out *Configuration) { + *out = *in + out.TypeMeta = in.TypeMeta + if in.Limits != nil { + in, out := &in.Limits, &out.Limits + *out = make([]Limit, len(*in)) + copy(*out, *in) + } + return +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new Configuration. +func (in *Configuration) DeepCopy() *Configuration { + if in == nil { + return nil + } + out := new(Configuration) + in.DeepCopyInto(out) + return out +} + +// DeepCopyObject is an autogenerated deepcopy function, copying the receiver, creating a new runtime.Object. +func (in *Configuration) DeepCopyObject() runtime.Object { + if c := in.DeepCopy(); c != nil { + return c + } else { + return nil + } +} + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *Limit) DeepCopyInto(out *Limit) { + *out = *in + return +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new Limit. +func (in *Limit) DeepCopy() *Limit { + if in == nil { + return nil + } + out := new(Limit) + in.DeepCopyInto(out) + return out +} diff --git a/plugin/pkg/admission/eventratelimit/apis/eventratelimit/v1alpha1/zz_generated.defaults.go b/plugin/pkg/admission/eventratelimit/apis/eventratelimit/v1alpha1/zz_generated.defaults.go new file mode 100644 index 00000000000..53f9cb92ef7 --- /dev/null +++ b/plugin/pkg/admission/eventratelimit/apis/eventratelimit/v1alpha1/zz_generated.defaults.go @@ -0,0 +1,37 @@ +// +build !ignore_autogenerated + +/* +Copyright 2017 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +// This file was autogenerated by defaulter-gen. Do not edit it manually! + +package v1alpha1 + +import ( + runtime "k8s.io/apimachinery/pkg/runtime" +) + +// RegisterDefaults adds defaulters functions to the given scheme. +// Public to allow building arbitrary schemes. +// All generated defaulters are covering - they call all nested defaulters. +func RegisterDefaults(scheme *runtime.Scheme) error { + scheme.AddTypeDefaultingFunc(&Configuration{}, func(obj interface{}) { SetObjectDefaults_Configuration(obj.(*Configuration)) }) + return nil +} + +func SetObjectDefaults_Configuration(in *Configuration) { + SetDefaults_Configuration(in) +} diff --git a/plugin/pkg/admission/eventratelimit/apis/eventratelimit/validation/BUILD b/plugin/pkg/admission/eventratelimit/apis/eventratelimit/validation/BUILD new file mode 100644 index 00000000000..551eb5f51d5 --- /dev/null +++ b/plugin/pkg/admission/eventratelimit/apis/eventratelimit/validation/BUILD @@ -0,0 +1,40 @@ +package(default_visibility = ["//visibility:public"]) + +licenses(["notice"]) + +load( + "@io_bazel_rules_go//go:def.bzl", + "go_library", + "go_test", +) + +go_library( + name = "go_default_library", + srcs = ["validation.go"], + tags = ["automanaged"], + deps = [ + "//plugin/pkg/admission/eventratelimit/apis/eventratelimit:go_default_library", + "//vendor/k8s.io/apimachinery/pkg/util/validation/field:go_default_library", + ], +) + +filegroup( + name = "package-srcs", + srcs = glob(["**"]), + tags = ["automanaged"], + visibility = ["//visibility:private"], +) + +filegroup( + name = "all-srcs", + srcs = [":package-srcs"], + tags = ["automanaged"], +) + +go_test( + name = "go_default_test", + srcs = ["validation_test.go"], + library = ":go_default_library", + tags = ["automanaged"], + deps = ["//plugin/pkg/admission/eventratelimit/apis/eventratelimit:go_default_library"], +) diff --git a/plugin/pkg/admission/eventratelimit/apis/eventratelimit/validation/validation.go b/plugin/pkg/admission/eventratelimit/apis/eventratelimit/validation/validation.go new file mode 100644 index 00000000000..f09acfd1322 --- /dev/null +++ b/plugin/pkg/admission/eventratelimit/apis/eventratelimit/validation/validation.go @@ -0,0 +1,63 @@ +/* +Copyright 2017 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package validation + +import ( + "k8s.io/apimachinery/pkg/util/validation/field" + + eventratelimitapi "k8s.io/kubernetes/plugin/pkg/admission/eventratelimit/apis/eventratelimit" +) + +var limitTypes = map[eventratelimitapi.LimitType]bool{ + eventratelimitapi.ServerLimitType: true, + eventratelimitapi.NamespaceLimitType: true, + eventratelimitapi.UserLimitType: true, + eventratelimitapi.SourceAndObjectLimitType: true, +} + +// ValidateConfiguration validates the configuration. +func ValidateConfiguration(config *eventratelimitapi.Configuration) field.ErrorList { + allErrs := field.ErrorList{} + limitsPath := field.NewPath("limits") + if len(config.Limits) == 0 { + allErrs = append(allErrs, field.Invalid(limitsPath, config.Limits, "must not be empty")) + } + for i, limit := range config.Limits { + idxPath := limitsPath.Index(i) + if !limitTypes[limit.Type] { + allowedValues := make([]string, len(limitTypes)) + i := 0 + for limitType := range limitTypes { + allowedValues[i] = string(limitType) + i++ + } + allErrs = append(allErrs, field.NotSupported(idxPath.Child("type"), limit.Type, allowedValues)) + } + if limit.Burst <= 0 { + allErrs = append(allErrs, field.Invalid(idxPath.Child("burst"), limit.Burst, "must be positive")) + } + if limit.QPS <= 0 { + allErrs = append(allErrs, field.Invalid(idxPath.Child("qps"), limit.QPS, "must be positive")) + } + if limit.Type != eventratelimitapi.ServerLimitType { + if limit.CacheSize < 0 { + allErrs = append(allErrs, field.Invalid(idxPath.Child("cacheSize"), limit.CacheSize, "must not be negative")) + } + } + } + return allErrs +} diff --git a/plugin/pkg/admission/eventratelimit/apis/eventratelimit/validation/validation_test.go b/plugin/pkg/admission/eventratelimit/apis/eventratelimit/validation/validation_test.go new file mode 100644 index 00000000000..03415d0449d --- /dev/null +++ b/plugin/pkg/admission/eventratelimit/apis/eventratelimit/validation/validation_test.go @@ -0,0 +1,192 @@ +/* +Copyright 2017 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package validation + +import ( + "testing" + + eventratelimitapi "k8s.io/kubernetes/plugin/pkg/admission/eventratelimit/apis/eventratelimit" +) + +func TestValidateConfiguration(t *testing.T) { + cases := []struct { + name string + config eventratelimitapi.Configuration + expectedResult bool + }{ + { + name: "valid server", + config: eventratelimitapi.Configuration{ + Limits: []eventratelimitapi.Limit{ + { + Type: "Server", + Burst: 5, + QPS: 1, + }, + }, + }, + expectedResult: true, + }, + { + name: "valid namespace", + config: eventratelimitapi.Configuration{ + Limits: []eventratelimitapi.Limit{ + { + Type: "Namespace", + Burst: 10, + QPS: 2, + CacheSize: 100, + }, + }, + }, + expectedResult: true, + }, + { + name: "valid user", + config: eventratelimitapi.Configuration{ + Limits: []eventratelimitapi.Limit{ + { + Type: "User", + Burst: 10, + QPS: 2, + CacheSize: 100, + }, + }, + }, + expectedResult: true, + }, + { + name: "valid source+object", + config: eventratelimitapi.Configuration{ + Limits: []eventratelimitapi.Limit{ + { + Type: "SourceAndObject", + Burst: 5, + QPS: 1, + CacheSize: 1000, + }, + }, + }, + expectedResult: true, + }, + { + name: "valid multiple", + config: eventratelimitapi.Configuration{ + Limits: []eventratelimitapi.Limit{ + { + Type: "Server", + Burst: 5, + QPS: 1, + }, + { + Type: "Namespace", + Burst: 10, + QPS: 2, + CacheSize: 100, + }, + { + Type: "SourceAndObject", + Burst: 25, + QPS: 10, + CacheSize: 1000, + }, + }, + }, + expectedResult: true, + }, + { + name: "missing limits", + config: eventratelimitapi.Configuration{}, + expectedResult: false, + }, + { + name: "missing type", + config: eventratelimitapi.Configuration{ + Limits: []eventratelimitapi.Limit{ + { + Burst: 25, + QPS: 10, + CacheSize: 1000, + }, + }, + }, + expectedResult: false, + }, + { + name: "invalid type", + config: eventratelimitapi.Configuration{ + Limits: []eventratelimitapi.Limit{ + { + Type: "unknown-type", + Burst: 25, + QPS: 10, + CacheSize: 1000, + }, + }, + }, + expectedResult: false, + }, + { + name: "missing burst", + config: eventratelimitapi.Configuration{ + Limits: []eventratelimitapi.Limit{ + { + Type: "Server", + QPS: 1, + }, + }, + }, + expectedResult: false, + }, + { + name: "missing qps", + config: eventratelimitapi.Configuration{ + Limits: []eventratelimitapi.Limit{ + { + Type: "Server", + Burst: 5, + }, + }, + }, + expectedResult: false, + }, + { + name: "negative cache size", + config: eventratelimitapi.Configuration{ + Limits: []eventratelimitapi.Limit{ + { + Type: "Namespace", + Burst: 10, + QPS: 2, + CacheSize: -1, + }, + }, + }, + expectedResult: false, + }, + } + for _, tc := range cases { + errs := ValidateConfiguration(&tc.config) + if e, a := tc.expectedResult, len(errs) == 0; e != a { + if e { + t.Errorf("%v: expected success: %v", tc.name, errs) + } else { + t.Errorf("%v: expected failure", tc.name) + } + } + } +} diff --git a/plugin/pkg/admission/eventratelimit/apis/eventratelimit/zz_generated.deepcopy.go b/plugin/pkg/admission/eventratelimit/apis/eventratelimit/zz_generated.deepcopy.go new file mode 100644 index 00000000000..a1bd7475410 --- /dev/null +++ b/plugin/pkg/admission/eventratelimit/apis/eventratelimit/zz_generated.deepcopy.go @@ -0,0 +1,95 @@ +// +build !ignore_autogenerated + +/* +Copyright 2017 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +// This file was autogenerated by deepcopy-gen. Do not edit it manually! + +package eventratelimit + +import ( + conversion "k8s.io/apimachinery/pkg/conversion" + runtime "k8s.io/apimachinery/pkg/runtime" + reflect "reflect" +) + +func init() { + SchemeBuilder.Register(RegisterDeepCopies) +} + +// RegisterDeepCopies adds deep-copy functions to the given scheme. Public +// to allow building arbitrary schemes. +// +// Deprecated: deepcopy registration will go away when static deepcopy is fully implemented. +func RegisterDeepCopies(scheme *runtime.Scheme) error { + return scheme.AddGeneratedDeepCopyFuncs( + conversion.GeneratedDeepCopyFunc{Fn: func(in interface{}, out interface{}, c *conversion.Cloner) error { + in.(*Configuration).DeepCopyInto(out.(*Configuration)) + return nil + }, InType: reflect.TypeOf(&Configuration{})}, + conversion.GeneratedDeepCopyFunc{Fn: func(in interface{}, out interface{}, c *conversion.Cloner) error { + in.(*Limit).DeepCopyInto(out.(*Limit)) + return nil + }, InType: reflect.TypeOf(&Limit{})}, + ) +} + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *Configuration) DeepCopyInto(out *Configuration) { + *out = *in + out.TypeMeta = in.TypeMeta + if in.Limits != nil { + in, out := &in.Limits, &out.Limits + *out = make([]Limit, len(*in)) + copy(*out, *in) + } + return +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new Configuration. +func (in *Configuration) DeepCopy() *Configuration { + if in == nil { + return nil + } + out := new(Configuration) + in.DeepCopyInto(out) + return out +} + +// DeepCopyObject is an autogenerated deepcopy function, copying the receiver, creating a new runtime.Object. +func (in *Configuration) DeepCopyObject() runtime.Object { + if c := in.DeepCopy(); c != nil { + return c + } else { + return nil + } +} + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *Limit) DeepCopyInto(out *Limit) { + *out = *in + return +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new Limit. +func (in *Limit) DeepCopy() *Limit { + if in == nil { + return nil + } + out := new(Limit) + in.DeepCopyInto(out) + return out +} diff --git a/plugin/pkg/admission/eventratelimit/cache.go b/plugin/pkg/admission/eventratelimit/cache.go new file mode 100644 index 00000000000..7e1016372e8 --- /dev/null +++ b/plugin/pkg/admission/eventratelimit/cache.go @@ -0,0 +1,57 @@ +/* +Copyright 2017 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package eventratelimit + +import ( + "github.com/hashicorp/golang-lru" + + "k8s.io/client-go/util/flowcontrol" +) + +// cache is an interface for caching the limits of a particular type +type cache interface { + // get the rate limiter associated with the specified key + get(key interface{}) flowcontrol.RateLimiter +} + +// singleCache is a cache that only stores a single, constant item +type singleCache struct { + // the single rate limiter held by the cache + rateLimiter flowcontrol.RateLimiter +} + +func (c *singleCache) get(key interface{}) flowcontrol.RateLimiter { + return c.rateLimiter +} + +// lruCache is a least-recently-used cache +type lruCache struct { + // factory to use to create new rate limiters + rateLimiterFactory func() flowcontrol.RateLimiter + // the actual LRU cache + cache *lru.Cache +} + +func (c *lruCache) get(key interface{}) flowcontrol.RateLimiter { + value, found := c.cache.Get(key) + if !found { + rateLimter := c.rateLimiterFactory() + c.cache.Add(key, rateLimter) + return rateLimter + } + return value.(flowcontrol.RateLimiter) +} diff --git a/plugin/pkg/admission/eventratelimit/cache_test.go b/plugin/pkg/admission/eventratelimit/cache_test.go new file mode 100644 index 00000000000..bced5f7d346 --- /dev/null +++ b/plugin/pkg/admission/eventratelimit/cache_test.go @@ -0,0 +1,119 @@ +/* +Copyright 2017 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package eventratelimit + +import ( + "testing" + + "github.com/hashicorp/golang-lru" + + "k8s.io/client-go/util/flowcontrol" +) + +func TestSingleCache(t *testing.T) { + rateLimiter := flowcontrol.NewTokenBucketRateLimiter(1., 1) + cache := singleCache{ + rateLimiter: rateLimiter, + } + cases := []interface{}{nil, "key1", "key2"} + for _, tc := range cases { + actual := cache.get(tc) + if e, a := rateLimiter, actual; e != a { + t.Errorf("unexpected entry in cache for key %v: expected %v, got %v", tc, e, a) + } + } +} + +func TestLRUCache(t *testing.T) { + rateLimiters := []flowcontrol.RateLimiter{ + flowcontrol.NewTokenBucketRateLimiter(1., 1), + flowcontrol.NewTokenBucketRateLimiter(2., 2), + flowcontrol.NewTokenBucketRateLimiter(3., 3), + flowcontrol.NewTokenBucketRateLimiter(4., 4), + } + nextRateLimiter := 0 + rateLimiterFactory := func() flowcontrol.RateLimiter { + rateLimiter := rateLimiters[nextRateLimiter] + nextRateLimiter++ + return rateLimiter + } + underlyingCache, err := lru.New(2) + if err != nil { + t.Fatalf("Could not create LRU cache: %v", err) + } + cache := lruCache{ + rateLimiterFactory: rateLimiterFactory, + cache: underlyingCache, + } + cases := []struct { + name string + key int + expected flowcontrol.RateLimiter + }{ + { + name: "first added", + key: 0, + expected: rateLimiters[0], + }, + { + name: "first obtained", + key: 0, + expected: rateLimiters[0], + }, + { + name: "second added", + key: 1, + expected: rateLimiters[1], + }, + { + name: "second obtained", + key: 1, + expected: rateLimiters[1], + }, + { + name: "first obtained second time", + key: 0, + expected: rateLimiters[0], + }, + { + name: "third added", + key: 2, + expected: rateLimiters[2], + }, + { + name: "third obtained", + key: 2, + expected: rateLimiters[2], + }, + { + name: "first obtained third time", + key: 0, + expected: rateLimiters[0], + }, + { + name: "second re-added after eviction", + key: 1, + expected: rateLimiters[3], + }, + } + for _, tc := range cases { + actual := cache.get(tc.key) + if e, a := tc.expected, actual; e != a { + t.Errorf("%v: unexpected entry in cache for key %v: expected %v, got %v", tc.name, tc.key, e, a) + } + } +} diff --git a/plugin/pkg/admission/eventratelimit/clock.go b/plugin/pkg/admission/eventratelimit/clock.go new file mode 100644 index 00000000000..507d57a7505 --- /dev/null +++ b/plugin/pkg/admission/eventratelimit/clock.go @@ -0,0 +1,34 @@ +/* +Copyright 2017 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package eventratelimit + +import ( + "time" +) + +// realClock implements flowcontrol.Clock in terms of standard time functions. +type realClock struct{} + +// Now is identical to time.Now. +func (realClock) Now() time.Time { + return time.Now() +} + +// Sleep is identical to time.Sleep. +func (realClock) Sleep(d time.Duration) { + time.Sleep(d) +} diff --git a/plugin/pkg/admission/eventratelimit/config.go b/plugin/pkg/admission/eventratelimit/config.go new file mode 100644 index 00000000000..993ac9ef445 --- /dev/null +++ b/plugin/pkg/admission/eventratelimit/config.go @@ -0,0 +1,72 @@ +/* +Copyright 2017 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package eventratelimit + +import ( + "fmt" + "io" + "io/ioutil" + "os" + + "k8s.io/apimachinery/pkg/apimachinery/announced" + "k8s.io/apimachinery/pkg/apimachinery/registered" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/runtime/serializer" + eventratelimitapi "k8s.io/kubernetes/plugin/pkg/admission/eventratelimit/apis/eventratelimit" + "k8s.io/kubernetes/plugin/pkg/admission/eventratelimit/apis/eventratelimit/install" + eventratelimitv1alpha1 "k8s.io/kubernetes/plugin/pkg/admission/eventratelimit/apis/eventratelimit/v1alpha1" +) + +var ( + groupFactoryRegistry = make(announced.APIGroupFactoryRegistry) + registry = registered.NewOrDie(os.Getenv("KUBE_API_VERSIONS")) + scheme = runtime.NewScheme() + codecs = serializer.NewCodecFactory(scheme) +) + +func init() { + install.Install(groupFactoryRegistry, registry, scheme) +} + +// LoadConfiguration loads the provided configuration. +func LoadConfiguration(config io.Reader) (*eventratelimitapi.Configuration, error) { + // if no config is provided, return a default configuration + if config == nil { + externalConfig := &eventratelimitv1alpha1.Configuration{} + scheme.Default(externalConfig) + internalConfig := &eventratelimitapi.Configuration{} + if err := scheme.Convert(externalConfig, internalConfig, nil); err != nil { + return nil, err + } + return internalConfig, nil + } + // we have a config so parse it. + data, err := ioutil.ReadAll(config) + if err != nil { + return nil, err + } + decoder := codecs.UniversalDecoder() + decodedObj, err := runtime.Decode(decoder, data) + if err != nil { + return nil, err + } + resourceQuotaConfiguration, ok := decodedObj.(*eventratelimitapi.Configuration) + if !ok { + return nil, fmt.Errorf("unexpected type: %T", decodedObj) + } + return resourceQuotaConfiguration, nil +} diff --git a/plugin/pkg/admission/eventratelimit/doc.go b/plugin/pkg/admission/eventratelimit/doc.go new file mode 100644 index 00000000000..d51d2379b6d --- /dev/null +++ b/plugin/pkg/admission/eventratelimit/doc.go @@ -0,0 +1,18 @@ +/* +Copyright 2017 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +// Package eventratelimit contains an admission controller that enforces a rate limit on events +package eventratelimit // import "k8s.io/kubernetes/plugin/pkg/admission/eventratelimit" diff --git a/plugin/pkg/admission/eventratelimit/limitenforcer.go b/plugin/pkg/admission/eventratelimit/limitenforcer.go new file mode 100644 index 00000000000..4fa5ee90f73 --- /dev/null +++ b/plugin/pkg/admission/eventratelimit/limitenforcer.go @@ -0,0 +1,145 @@ +/* +Copyright 2017 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package eventratelimit + +import ( + "fmt" + "strings" + + "github.com/hashicorp/golang-lru" + + apierrors "k8s.io/apimachinery/pkg/api/errors" + "k8s.io/apiserver/pkg/admission" + "k8s.io/client-go/util/flowcontrol" + "k8s.io/kubernetes/pkg/api" + eventratelimitapi "k8s.io/kubernetes/plugin/pkg/admission/eventratelimit/apis/eventratelimit" +) + +const ( + // cache size to use if the user did not specify a cache size + defaultCacheSize = 4096 +) + +// limitEnforcer enforces a single type of event rate limit, such as server, namespace, or source+object +type limitEnforcer struct { + // type of this limit + limitType eventratelimitapi.LimitType + // cache for holding the rate limiters + cache cache + // a keyFunc which is responsible for computing a single key based on input + keyFunc func(admission.Attributes) string +} + +func newLimitEnforcer(config eventratelimitapi.Limit, clock flowcontrol.Clock) (*limitEnforcer, error) { + rateLimiterFactory := func() flowcontrol.RateLimiter { + return flowcontrol.NewTokenBucketRateLimiterWithClock(float32(config.QPS), int(config.Burst), clock) + } + + if config.Type == eventratelimitapi.ServerLimitType { + return &limitEnforcer{ + limitType: config.Type, + cache: &singleCache{ + rateLimiter: rateLimiterFactory(), + }, + keyFunc: getServerKey, + }, nil + } + + cacheSize := int(config.CacheSize) + if cacheSize == 0 { + cacheSize = defaultCacheSize + } + underlyingCache, err := lru.New(cacheSize) + if err != nil { + return nil, fmt.Errorf("could not create lru cache: %v", err) + } + cache := &lruCache{ + rateLimiterFactory: rateLimiterFactory, + cache: underlyingCache, + } + + var keyFunc func(admission.Attributes) string + switch t := config.Type; t { + case eventratelimitapi.NamespaceLimitType: + keyFunc = getNamespaceKey + case eventratelimitapi.UserLimitType: + keyFunc = getUserKey + case eventratelimitapi.SourceAndObjectLimitType: + keyFunc = getSourceAndObjectKey + default: + return nil, fmt.Errorf("unknown event rate limit type: %v", t) + } + + return &limitEnforcer{ + limitType: config.Type, + cache: cache, + keyFunc: keyFunc, + }, nil +} + +func (enforcer *limitEnforcer) accept(attr admission.Attributes) error { + key := enforcer.keyFunc(attr) + rateLimiter := enforcer.cache.get(key) + + // ensure we have available rate + allow := rateLimiter.TryAccept() + + if !allow { + return apierrors.NewTooManyRequestsError(fmt.Sprintf("limit reached on type %v for key %v", enforcer.limitType, key)) + } + + return nil +} + +func getServerKey(attr admission.Attributes) string { + return "" +} + +// getNamespaceKey returns a cache key that is based on the namespace of the event request +func getNamespaceKey(attr admission.Attributes) string { + return attr.GetNamespace() +} + +// getUserKey returns a cache key that is based on the user of the event request +func getUserKey(attr admission.Attributes) string { + userInfo := attr.GetUserInfo() + if userInfo == nil { + return "" + } + return userInfo.GetName() +} + +// getSourceAndObjectKey returns a cache key that is based on the source+object of the event +func getSourceAndObjectKey(attr admission.Attributes) string { + object := attr.GetObject() + if object == nil { + return "" + } + event, ok := object.(*api.Event) + if !ok { + return "" + } + return strings.Join([]string{ + event.Source.Component, + event.Source.Host, + event.InvolvedObject.Kind, + event.InvolvedObject.Namespace, + event.InvolvedObject.Name, + string(event.InvolvedObject.UID), + event.InvolvedObject.APIVersion, + }, "") +} diff --git a/staging/src/k8s.io/apimachinery/pkg/api/errors/errors.go b/staging/src/k8s.io/apimachinery/pkg/api/errors/errors.go index 98160227024..905a4934162 100644 --- a/staging/src/k8s.io/apimachinery/pkg/api/errors/errors.go +++ b/staging/src/k8s.io/apimachinery/pkg/api/errors/errors.go @@ -28,6 +28,12 @@ import ( "k8s.io/apimachinery/pkg/util/validation/field" ) +const ( + // StatusTooManyRequests means the server experienced too many requests within a + // given window and that the client must wait to perform the action again. + StatusTooManyRequests = 429 +) + // StatusError is an error intended for consumption by a REST API server; it can also be // reconstructed by clients from a REST response. Public to allow easy type switches. type StatusError struct { @@ -298,6 +304,18 @@ func NewTimeoutError(message string, retryAfterSeconds int) *StatusError { }} } +// NewTooManyRequestsError returns an error indicating that the request was rejected because +// the server has received too many requests. Client should wait and retry. But if the request +// is perishable, then the client should not retry the request. +func NewTooManyRequestsError(message string) *StatusError { + return &StatusError{metav1.Status{ + Status: metav1.StatusFailure, + Code: StatusTooManyRequests, + Reason: metav1.StatusReasonTooManyRequests, + Message: fmt.Sprintf("Too many requests: %s", message), + }} +} + // NewGenericServerResponse returns a new error for server responses that are not in a recognizable form. func NewGenericServerResponse(code int, verb string, qualifiedResource schema.GroupResource, name, serverMessage string, retryAfterSeconds int, isUnexpectedResponse bool) *StatusError { reason := metav1.StatusReasonUnknown