diff --git a/pkg/kubeapiserver/admission/BUILD b/pkg/kubeapiserver/admission/BUILD index 333d3f76e4c..a55bf4ec166 100644 --- a/pkg/kubeapiserver/admission/BUILD +++ b/pkg/kubeapiserver/admission/BUILD @@ -42,6 +42,9 @@ filegroup( filegroup( name = "all-srcs", - srcs = [":package-srcs"], + srcs = [ + ":package-srcs", + "//pkg/kubeapiserver/admission/configuration:all-srcs", + ], tags = ["automanaged"], ) diff --git a/pkg/kubeapiserver/admission/configuration/BUILD b/pkg/kubeapiserver/admission/configuration/BUILD new file mode 100644 index 00000000000..d557d01dff7 --- /dev/null +++ b/pkg/kubeapiserver/admission/configuration/BUILD @@ -0,0 +1,49 @@ +package(default_visibility = ["//visibility:public"]) + +licenses(["notice"]) + +load( + "@io_bazel_rules_go//go:def.bzl", + "go_library", + "go_test", +) + +go_test( + name = "go_default_test", + srcs = ["initializer_manager_test.go"], + library = ":go_default_library", + tags = ["automanaged"], + deps = [ + "//pkg/apis/admissionregistration/v1alpha1:go_default_library", + "//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", + ], +) + +go_library( + name = "go_default_library", + srcs = [ + "configuration_manager.go", + "external_admission_hook_manager.go", + "initializer_manager.go", + ], + tags = ["automanaged"], + deps = [ + "//pkg/apis/admissionregistration/v1alpha1:go_default_library", + "//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", + "//vendor/k8s.io/apimachinery/pkg/runtime:go_default_library", + "//vendor/k8s.io/apimachinery/pkg/util/wait:go_default_library", + ], +) + +filegroup( + name = "package-srcs", + srcs = glob(["**"]), + tags = ["automanaged"], + visibility = ["//visibility:private"], +) + +filegroup( + name = "all-srcs", + srcs = [":package-srcs"], + tags = ["automanaged"], +) diff --git a/pkg/kubeapiserver/admission/configuration/configuration_manager.go b/pkg/kubeapiserver/admission/configuration/configuration_manager.go new file mode 100644 index 00000000000..cce198ec4d9 --- /dev/null +++ b/pkg/kubeapiserver/admission/configuration/configuration_manager.go @@ -0,0 +1,100 @@ +/* +Copyright 2017 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package configuration + +import ( + "fmt" + "sync" + "time" + + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/util/wait" +) + +const ( + defaultInterval = 1 * time.Second + defaultFailureThreshold = 5 +) + +type getFunc func() (runtime.Object, error) + +// When running, poller calls `get` every `interval`. If `get` is +// successful, `Ready()` returns ready and `configuration()` returns the +// `mergedConfiguration`; if `get` has failed more than `failureThreshold ` times, +// `Ready()` returns not ready and `configuration()` returns nil configuration. +// In an HA setup, the poller is consistent only if the `get` is +// doing consistent read. +type poller struct { + // a function to consistently read the latest configuration + get getFunc + // consistent read interval + interval time.Duration + // if the number of consecutive read failure equals or exceeds the failureThreshold , the + // configuration is regarded as not ready. + failureThreshold int + // if the configuration is regarded as ready. + ready bool + mergedConfiguration runtime.Object + // lock much be hold when reading ready or mergedConfiguration + lock sync.RWMutex +} + +func newPoller(get getFunc) *poller { + return &poller{ + get: get, + interval: defaultInterval, + failureThreshold: defaultFailureThreshold, + } +} + +func (a *poller) notReady() { + a.lock.Lock() + defer a.lock.Unlock() + a.ready = false +} + +func (a *poller) configuration() (runtime.Object, error) { + a.lock.RLock() + defer a.lock.RUnlock() + if !a.ready { + return nil, fmt.Errorf("configuration is not ready") + } + return a.mergedConfiguration, nil +} + +func (a *poller) setConfigurationAndReady(value runtime.Object) { + a.lock.Lock() + defer a.lock.Unlock() + a.mergedConfiguration = value + a.ready = true +} + +func (a *poller) Run(stopCh <-chan struct{}) { + var failure int + go wait.Until(func() { + configuration, err := a.get() + if err != nil { + failure++ + if failure >= a.failureThreshold { + a.notReady() + } + return + } + failure = 0 + a.setConfigurationAndReady(configuration) + }, a.interval, stopCh) +} diff --git a/pkg/kubeapiserver/admission/configuration/external_admission_hook_manager.go b/pkg/kubeapiserver/admission/configuration/external_admission_hook_manager.go new file mode 100644 index 00000000000..56faf029f0c --- /dev/null +++ b/pkg/kubeapiserver/admission/configuration/external_admission_hook_manager.go @@ -0,0 +1,71 @@ +/* +Copyright 2017 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package configuration + +import ( + "fmt" + "reflect" + + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/kubernetes/pkg/apis/admissionregistration/v1alpha1" +) + +type ExternalAdmissionHookConfigurationLister interface { + List(opts metav1.ListOptions) (*v1alpha1.ExternalAdmissionHookConfigurationList, error) +} + +type ExternalAdmissionHookConfigurationManager struct { + *poller +} + +// ExternalAdmissionHooks returns the merged ExternalAdmissionHookConfiguration. +func (im *ExternalAdmissionHookConfigurationManager) ExternalAdmissionHooks() (*v1alpha1.ExternalAdmissionHookConfiguration, error) { + configuration, err := im.poller.configuration() + if err != nil { + return nil, err + } + externalAdmissionHookConfiguration, ok := configuration.(*v1alpha1.ExternalAdmissionHookConfiguration) + if !ok { + return nil, fmt.Errorf("expected type %v, got type %v", reflect.TypeOf(externalAdmissionHookConfiguration), reflect.TypeOf(configuration)) + } + return externalAdmissionHookConfiguration, nil +} + +func NewExternalAdmissionHookConfigurationManager(c ExternalAdmissionHookConfigurationLister) *ExternalAdmissionHookConfigurationManager { + getFn := func() (runtime.Object, error) { + list, err := c.List(metav1.ListOptions{}) + if err != nil { + return nil, err + } + return mergeExternalAdmissionHookConfigurations(list), nil + } + + return &ExternalAdmissionHookConfigurationManager{ + newPoller(getFn)} +} + +func mergeExternalAdmissionHookConfigurations( + list *v1alpha1.ExternalAdmissionHookConfigurationList, +) *v1alpha1.ExternalAdmissionHookConfiguration { + configurations := list.Items + var ret v1alpha1.ExternalAdmissionHookConfiguration + for _, c := range configurations { + ret.ExternalAdmissionHooks = append(ret.ExternalAdmissionHooks, c.ExternalAdmissionHooks...) + } + return &ret +} diff --git a/pkg/kubeapiserver/admission/configuration/initializer_manager.go b/pkg/kubeapiserver/admission/configuration/initializer_manager.go new file mode 100644 index 00000000000..eba763f943b --- /dev/null +++ b/pkg/kubeapiserver/admission/configuration/initializer_manager.go @@ -0,0 +1,76 @@ +/* +Copyright 2017 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package configuration + +import ( + "fmt" + "reflect" + "sort" + + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/kubernetes/pkg/apis/admissionregistration/v1alpha1" +) + +type InitializerConfigurationLister interface { + List(opts metav1.ListOptions) (*v1alpha1.InitializerConfigurationList, error) +} + +type InitializerConfigurationManager struct { + *poller +} + +// Initializers returns the merged InitializerConfiguration. +func (im *InitializerConfigurationManager) Initializers() (*v1alpha1.InitializerConfiguration, error) { + configuration, err := im.poller.configuration() + if err != nil { + return nil, err + } + initializerConfiguration, ok := configuration.(*v1alpha1.InitializerConfiguration) + if !ok { + return nil, fmt.Errorf("expected type %v, got type %v", reflect.TypeOf(initializerConfiguration), reflect.TypeOf(configuration)) + } + return initializerConfiguration, nil +} + +func NewInitializerConfigurationManager(c InitializerConfigurationLister) *InitializerConfigurationManager { + getFn := func() (runtime.Object, error) { + list, err := c.List(metav1.ListOptions{}) + if err != nil { + return nil, err + } + return mergeInitializerConfigurations(list), nil + } + return &InitializerConfigurationManager{ + newPoller(getFn)} +} + +func mergeInitializerConfigurations(initializerConfigurationList *v1alpha1.InitializerConfigurationList) *v1alpha1.InitializerConfiguration { + configurations := initializerConfigurationList.Items + sort.SliceStable(configurations, InitializerConfigurationSorter(configurations).ByName) + var ret v1alpha1.InitializerConfiguration + for _, c := range configurations { + ret.Initializers = append(ret.Initializers, c.Initializers...) + } + return &ret +} + +type InitializerConfigurationSorter []v1alpha1.InitializerConfiguration + +func (a InitializerConfigurationSorter) ByName(i, j int) bool { + return a[i].Name < a[j].Name +} diff --git a/pkg/kubeapiserver/admission/configuration/initializer_manager_test.go b/pkg/kubeapiserver/admission/configuration/initializer_manager_test.go new file mode 100644 index 00000000000..7f6bfd23506 --- /dev/null +++ b/pkg/kubeapiserver/admission/configuration/initializer_manager_test.go @@ -0,0 +1,171 @@ +/* +Copyright 2017 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package configuration + +import ( + "fmt" + "reflect" + "testing" + "time" + + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/kubernetes/pkg/apis/admissionregistration/v1alpha1" +) + +type mockLister struct { + invoked int + successes int + failures int + stopCh chan struct{} + configurationList v1alpha1.InitializerConfigurationList + t *testing.T +} + +func newMockLister(successes, failures int, configurationList v1alpha1.InitializerConfigurationList, t *testing.T) *mockLister { + return &mockLister{ + failures: failures, + successes: successes, + configurationList: configurationList, + stopCh: make(chan struct{}), + t: t, + } +} + +// The first List will be successful; the next m.failures List will +// fail; the next m.successes List will be successful; the stopCh is closed at +// the 1+m.failures+m.successes call. +func (m *mockLister) List(options metav1.ListOptions) (*v1alpha1.InitializerConfigurationList, error) { + m.invoked++ + // m.successes could be 0, so call this `if` first. + if m.invoked == 1+m.failures+m.successes { + close(m.stopCh) + } + if m.invoked == 1 { + return &m.configurationList, nil + } + if m.invoked <= 1+m.failures { + return nil, fmt.Errorf("some error") + } + if m.invoked <= 1+m.failures+m.successes { + return &m.configurationList, nil + } + m.t.Fatalf("unexpected call to List, stopCh has been closed at the %d time call", 1+m.successes+m.failures) + return nil, nil +} + +var _ InitializerConfigurationLister = &mockLister{} + +func TestConfiguration(t *testing.T) { + cases := []struct { + name string + failures int + // note that the first call to mockLister is always a success. + successes int + expectReady bool + }{ + { + name: "number of failures hasn't reached failureThreshold", + failures: defaultFailureThreshold - 1, + expectReady: true, + }, + { + name: "number of failures just reaches failureThreshold", + failures: defaultFailureThreshold, + expectReady: false, + }, + { + name: "number of failures exceeds failureThreshold", + failures: defaultFailureThreshold + 1, + expectReady: false, + }, + { + name: "number of failures exceeds failureThreshold, but then get another success", + failures: defaultFailureThreshold + 1, + successes: 1, + expectReady: true, + }, + } + for _, c := range cases { + mock := newMockLister(c.successes, c.failures, v1alpha1.InitializerConfigurationList{}, t) + manager := NewInitializerConfigurationManager(mock) + manager.interval = 1 * time.Millisecond + manager.Run(mock.stopCh) + <-mock.stopCh + _, err := manager.Initializers() + if err != nil && c.expectReady { + t.Errorf("case %s, expect ready, got: %v", c.name, err) + } + if err == nil && !c.expectReady { + t.Errorf("case %s, expect not ready", c.name) + } + } +} + +func TestMergeInitializerConfigurations(t *testing.T) { + configurationsList := v1alpha1.InitializerConfigurationList{ + Items: []v1alpha1.InitializerConfiguration{ + { + ObjectMeta: metav1.ObjectMeta{ + Name: "provider_2", + }, + Initializers: []v1alpha1.Initializer{ + { + Name: "initializer_a", + }, + { + Name: "initializer_b", + }, + }, + }, + { + ObjectMeta: metav1.ObjectMeta{ + Name: "provider_1", + }, + Initializers: []v1alpha1.Initializer{ + { + Name: "initializer_c", + }, + { + Name: "initializer_d", + }, + }, + }, + }, + } + + expected := &v1alpha1.InitializerConfiguration{ + Initializers: []v1alpha1.Initializer{ + { + Name: "initializer_c", + }, + { + Name: "initializer_d", + }, + { + Name: "initializer_a", + }, + { + Name: "initializer_b", + }, + }, + } + + got := mergeInitializerConfigurations(&configurationsList) + if !reflect.DeepEqual(got, expected) { + t.Errorf("expected: %#v, got: %#v", expected, got) + } +}