mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-07-24 20:24:09 +00:00
Merge pull request #46672 from smarterclayton/initializer_with_config
Automatic merge from submit-queue (batch tested with PRs 46967, 46992, 43338, 46717, 46672) Select initializers from the dynamic configuration Continues #36721 kubernetes/features#209
This commit is contained in:
commit
a552ee61a0
@ -25,6 +25,7 @@ go_library(
|
||||
"//pkg/apis/extensions:go_default_library",
|
||||
"//pkg/apis/networking:go_default_library",
|
||||
"//pkg/capabilities:go_default_library",
|
||||
"//pkg/client/clientset_generated/clientset:go_default_library",
|
||||
"//pkg/client/clientset_generated/internalclientset:go_default_library",
|
||||
"//pkg/client/informers/informers_generated/internalversion:go_default_library",
|
||||
"//pkg/cloudprovider:go_default_library",
|
||||
|
@ -61,6 +61,7 @@ import (
|
||||
"k8s.io/kubernetes/pkg/apis/extensions"
|
||||
"k8s.io/kubernetes/pkg/apis/networking"
|
||||
"k8s.io/kubernetes/pkg/capabilities"
|
||||
"k8s.io/kubernetes/pkg/client/clientset_generated/clientset"
|
||||
"k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset"
|
||||
informers "k8s.io/kubernetes/pkg/client/informers/informers_generated/internalversion"
|
||||
"k8s.io/kubernetes/pkg/cloudprovider"
|
||||
@ -380,6 +381,10 @@ func BuildGenericConfig(s *options.ServerRunOptions) (*genericapiserver.Config,
|
||||
// TODO: get rid of KUBE_API_VERSIONS or define sane behaviour if set
|
||||
glog.Errorf("Failed to create clientset with KUBE_API_VERSIONS=%q. KUBE_API_VERSIONS is only for testing. Things will break.", kubeAPIVersions)
|
||||
}
|
||||
externalClient, err := clientset.NewForConfig(genericConfig.LoopbackClientConfig)
|
||||
if err != nil {
|
||||
return nil, nil, nil, fmt.Errorf("failed to create external clientset: %v", err)
|
||||
}
|
||||
sharedInformers := informers.NewSharedInformerFactory(client, 10*time.Minute)
|
||||
|
||||
genericConfig.Authenticator, genericConfig.OpenAPIConfig.SecurityDefinitions, err = BuildAuthenticator(s, storageFactory, client, sharedInformers)
|
||||
@ -398,6 +403,7 @@ func BuildGenericConfig(s *options.ServerRunOptions) (*genericapiserver.Config,
|
||||
pluginInitializer, err := BuildAdmissionPluginInitializer(
|
||||
s,
|
||||
client,
|
||||
externalClient,
|
||||
sharedInformers,
|
||||
genericConfig.Authorizer,
|
||||
)
|
||||
@ -415,7 +421,7 @@ func BuildGenericConfig(s *options.ServerRunOptions) (*genericapiserver.Config,
|
||||
}
|
||||
|
||||
// BuildAdmissionPluginInitializer constructs the admission plugin initializer
|
||||
func BuildAdmissionPluginInitializer(s *options.ServerRunOptions, client internalclientset.Interface, sharedInformers informers.SharedInformerFactory, apiAuthorizer authorizer.Authorizer) (admission.PluginInitializer, error) {
|
||||
func BuildAdmissionPluginInitializer(s *options.ServerRunOptions, client internalclientset.Interface, externalClient clientset.Interface, sharedInformers informers.SharedInformerFactory, apiAuthorizer authorizer.Authorizer) (admission.PluginInitializer, error) {
|
||||
var cloudConfig []byte
|
||||
|
||||
if s.CloudProvider.CloudConfigFile != "" {
|
||||
@ -433,7 +439,7 @@ func BuildAdmissionPluginInitializer(s *options.ServerRunOptions, client interna
|
||||
// do not require us to open watches for all items tracked by quota.
|
||||
quotaRegistry := quotainstall.NewRegistry(nil, nil)
|
||||
|
||||
pluginInitializer := kubeapiserveradmission.NewPluginInitializer(client, sharedInformers, apiAuthorizer, cloudConfig, restMapper, quotaRegistry)
|
||||
pluginInitializer := kubeapiserveradmission.NewPluginInitializer(client, externalClient, sharedInformers, apiAuthorizer, cloudConfig, restMapper, quotaRegistry)
|
||||
|
||||
// Read client cert/key for plugins that need to make calls out
|
||||
if len(s.ProxyClientCertFile) > 0 && len(s.ProxyClientKeyFile) > 0 {
|
||||
|
@ -258,7 +258,7 @@ function create-federation-api-objects {
|
||||
export FEDERATION_APISERVER_KEY_BASE64="${FEDERATION_APISERVER_KEY_BASE64}"
|
||||
|
||||
# Enable the NamespaceLifecycle admission control by default.
|
||||
export FEDERATION_ADMISSION_CONTROL="${FEDERATION_ADMISSION_CONTROL:-Initializers,NamespaceLifecycle}"
|
||||
export FEDERATION_ADMISSION_CONTROL="${FEDERATION_ADMISSION_CONTROL:-NamespaceLifecycle}"
|
||||
|
||||
for file in federation-etcd-pvc.yaml federation-apiserver-{deployment,secrets}.yaml federation-controller-manager-deployment.yaml; do
|
||||
echo "Creating manifest: ${file}"
|
||||
|
@ -41,6 +41,7 @@ go_library(
|
||||
"//pkg/apis/extensions:go_default_library",
|
||||
"//pkg/apis/extensions/install:go_default_library",
|
||||
"//pkg/apis/extensions/v1beta1:go_default_library",
|
||||
"//pkg/client/clientset_generated/clientset:go_default_library",
|
||||
"//pkg/client/clientset_generated/internalclientset:go_default_library",
|
||||
"//pkg/client/informers/informers_generated/internalversion:go_default_library",
|
||||
"//pkg/cloudprovider/providers:go_default_library",
|
||||
|
@ -42,6 +42,7 @@ import (
|
||||
"k8s.io/kubernetes/pkg/api"
|
||||
apiv1 "k8s.io/kubernetes/pkg/api/v1"
|
||||
extensionsapiv1beta1 "k8s.io/kubernetes/pkg/apis/extensions/v1beta1"
|
||||
"k8s.io/kubernetes/pkg/client/clientset_generated/clientset"
|
||||
"k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset"
|
||||
informers "k8s.io/kubernetes/pkg/client/informers/informers_generated/internalversion"
|
||||
"k8s.io/kubernetes/pkg/generated/openapi"
|
||||
@ -180,6 +181,10 @@ func NonBlockingRun(s *options.ServerRunOptions, stopCh <-chan struct{}) error {
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to create clientset: %v", err)
|
||||
}
|
||||
externalClient, err := clientset.NewForConfig(genericConfig.LoopbackClientConfig)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to create external clientset: %v", err)
|
||||
}
|
||||
sharedInformers := informers.NewSharedInformerFactory(client, 10*time.Minute)
|
||||
|
||||
authorizationConfig := s.Authorization.ToAuthorizationConfig(sharedInformers)
|
||||
@ -199,7 +204,7 @@ func NonBlockingRun(s *options.ServerRunOptions, stopCh <-chan struct{}) error {
|
||||
// NOTE: we do not provide informers to the quota registry because admission level decisions
|
||||
// do not require us to open watches for all items tracked by quota.
|
||||
quotaRegistry := quotainstall.NewRegistry(nil, nil)
|
||||
pluginInitializer := kubeapiserveradmission.NewPluginInitializer(client, sharedInformers, apiAuthorizer, cloudConfig, nil, quotaRegistry)
|
||||
pluginInitializer := kubeapiserveradmission.NewPluginInitializer(client, externalClient, sharedInformers, apiAuthorizer, cloudConfig, nil, quotaRegistry)
|
||||
|
||||
err = s.Admission.ApplyTo(
|
||||
genericConfig,
|
||||
|
@ -692,7 +692,7 @@ func createAPIServer(clientset client.Interface, namespace, name, federationName
|
||||
"--client-ca-file": "/etc/federation/apiserver/ca.crt",
|
||||
"--tls-cert-file": "/etc/federation/apiserver/server.crt",
|
||||
"--tls-private-key-file": "/etc/federation/apiserver/server.key",
|
||||
"--admission-control": "Initializers,NamespaceLifecycle",
|
||||
"--admission-control": "NamespaceLifecycle",
|
||||
}
|
||||
|
||||
if advertiseAddress != "" {
|
||||
|
@ -869,7 +869,7 @@ func fakeInitHostFactory(apiserverServiceType v1.ServiceType, federationName, na
|
||||
fmt.Sprintf("--secure-port=%d", apiServerSecurePort),
|
||||
"--tls-cert-file=/etc/federation/apiserver/server.crt",
|
||||
"--tls-private-key-file=/etc/federation/apiserver/server.key",
|
||||
"--admission-control=Initializers,NamespaceLifecycle",
|
||||
"--admission-control=NamespaceLifecycle",
|
||||
fmt.Sprintf("--advertise-address=%s", address),
|
||||
}
|
||||
|
||||
|
@ -34,7 +34,7 @@ function run_federation_apiserver() {
|
||||
kube::log::status "Starting federation-apiserver"
|
||||
|
||||
# Admission Controllers to invoke prior to persisting objects in cluster
|
||||
ADMISSION_CONTROL="Initializers,NamespaceLifecycle"
|
||||
ADMISSION_CONTROL="NamespaceLifecycle"
|
||||
|
||||
"${KUBE_OUTPUT_HOSTBIN}/federation-apiserver" \
|
||||
--insecure-port="${API_PORT}" \
|
||||
|
@ -26,6 +26,7 @@ go_library(
|
||||
tags = ["automanaged"],
|
||||
deps = [
|
||||
"//pkg/apis/admissionregistration:go_default_library",
|
||||
"//pkg/client/clientset_generated/clientset:go_default_library",
|
||||
"//pkg/client/clientset_generated/internalclientset:go_default_library",
|
||||
"//pkg/client/informers/informers_generated/internalversion:go_default_library",
|
||||
"//pkg/quota:go_default_library",
|
||||
|
@ -29,6 +29,8 @@ go_library(
|
||||
tags = ["automanaged"],
|
||||
deps = [
|
||||
"//pkg/apis/admissionregistration/v1alpha1:go_default_library",
|
||||
"//vendor/github.com/golang/glog:go_default_library",
|
||||
"//vendor/k8s.io/apimachinery/pkg/api/errors:go_default_library",
|
||||
"//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
|
||||
"//vendor/k8s.io/apimachinery/pkg/runtime:go_default_library",
|
||||
"//vendor/k8s.io/apimachinery/pkg/util/wait:go_default_library",
|
||||
|
@ -30,6 +30,11 @@ const (
|
||||
defaultFailureThreshold = 5
|
||||
)
|
||||
|
||||
var (
|
||||
ErrNotReady = fmt.Errorf("configuration is not ready")
|
||||
ErrDisabled = fmt.Errorf("disabled")
|
||||
)
|
||||
|
||||
type getFunc func() (runtime.Object, error)
|
||||
|
||||
// When running, poller calls `get` every `interval`. If `get` is
|
||||
@ -52,7 +57,8 @@ type poller struct {
|
||||
ready bool
|
||||
mergedConfiguration runtime.Object
|
||||
// lock much be hold when reading ready or mergedConfiguration
|
||||
lock sync.RWMutex
|
||||
lock sync.RWMutex
|
||||
lastErr error
|
||||
}
|
||||
|
||||
func newPoller(get getFunc) *poller {
|
||||
@ -63,6 +69,12 @@ func newPoller(get getFunc) *poller {
|
||||
}
|
||||
}
|
||||
|
||||
func (a *poller) lastError(err error) {
|
||||
a.lock.Lock()
|
||||
defer a.lock.Unlock()
|
||||
a.lastErr = err
|
||||
}
|
||||
|
||||
func (a *poller) notReady() {
|
||||
a.lock.Lock()
|
||||
defer a.lock.Unlock()
|
||||
@ -73,7 +85,10 @@ func (a *poller) configuration() (runtime.Object, error) {
|
||||
a.lock.RLock()
|
||||
defer a.lock.RUnlock()
|
||||
if !a.ready {
|
||||
return nil, fmt.Errorf("configuration is not ready")
|
||||
if a.lastErr != nil {
|
||||
return nil, a.lastErr
|
||||
}
|
||||
return nil, ErrNotReady
|
||||
}
|
||||
return a.mergedConfiguration, nil
|
||||
}
|
||||
@ -83,6 +98,7 @@ func (a *poller) setConfigurationAndReady(value runtime.Object) {
|
||||
defer a.lock.Unlock()
|
||||
a.mergedConfiguration = value
|
||||
a.ready = true
|
||||
a.lastErr = nil
|
||||
}
|
||||
|
||||
func (a *poller) Run(stopCh <-chan struct{}) {
|
||||
@ -93,6 +109,7 @@ func (a *poller) sync() {
|
||||
configuration, err := a.get()
|
||||
if err != nil {
|
||||
a.failures++
|
||||
a.lastError(err)
|
||||
if a.failures >= a.failureThreshold {
|
||||
a.notReady()
|
||||
}
|
||||
|
@ -33,6 +33,20 @@ type ExternalAdmissionHookConfigurationManager struct {
|
||||
*poller
|
||||
}
|
||||
|
||||
func NewExternalAdmissionHookConfigurationManager(c ExternalAdmissionHookConfigurationLister) *ExternalAdmissionHookConfigurationManager {
|
||||
getFn := func() (runtime.Object, error) {
|
||||
list, err := c.List(metav1.ListOptions{})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return mergeExternalAdmissionHookConfigurations(list), nil
|
||||
}
|
||||
|
||||
return &ExternalAdmissionHookConfigurationManager{
|
||||
newPoller(getFn),
|
||||
}
|
||||
}
|
||||
|
||||
// ExternalAdmissionHooks returns the merged ExternalAdmissionHookConfiguration.
|
||||
func (im *ExternalAdmissionHookConfigurationManager) ExternalAdmissionHooks() (*v1alpha1.ExternalAdmissionHookConfiguration, error) {
|
||||
configuration, err := im.poller.configuration()
|
||||
@ -46,17 +60,8 @@ func (im *ExternalAdmissionHookConfigurationManager) ExternalAdmissionHooks() (*
|
||||
return externalAdmissionHookConfiguration, nil
|
||||
}
|
||||
|
||||
func NewExternalAdmissionHookConfigurationManager(c ExternalAdmissionHookConfigurationLister) *ExternalAdmissionHookConfigurationManager {
|
||||
getFn := func() (runtime.Object, error) {
|
||||
list, err := c.List(metav1.ListOptions{})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return mergeExternalAdmissionHookConfigurations(list), nil
|
||||
}
|
||||
|
||||
return &ExternalAdmissionHookConfigurationManager{
|
||||
newPoller(getFn)}
|
||||
func (im *ExternalAdmissionHookConfigurationManager) Run(stopCh <-chan struct{}) {
|
||||
im.poller.Run(stopCh)
|
||||
}
|
||||
|
||||
func mergeExternalAdmissionHookConfigurations(
|
||||
|
@ -21,6 +21,9 @@ import (
|
||||
"reflect"
|
||||
"sort"
|
||||
|
||||
"github.com/golang/glog"
|
||||
|
||||
"k8s.io/apimachinery/pkg/api/errors"
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
"k8s.io/apimachinery/pkg/runtime"
|
||||
"k8s.io/kubernetes/pkg/apis/admissionregistration/v1alpha1"
|
||||
@ -34,6 +37,23 @@ type InitializerConfigurationManager struct {
|
||||
*poller
|
||||
}
|
||||
|
||||
func NewInitializerConfigurationManager(c InitializerConfigurationLister) *InitializerConfigurationManager {
|
||||
getFn := func() (runtime.Object, error) {
|
||||
list, err := c.List(metav1.ListOptions{})
|
||||
if err != nil {
|
||||
if errors.IsNotFound(err) || errors.IsForbidden(err) {
|
||||
glog.V(5).Infof("Initializers are disabled due to an error: %v", err)
|
||||
return nil, ErrDisabled
|
||||
}
|
||||
return nil, err
|
||||
}
|
||||
return mergeInitializerConfigurations(list), nil
|
||||
}
|
||||
return &InitializerConfigurationManager{
|
||||
newPoller(getFn),
|
||||
}
|
||||
}
|
||||
|
||||
// Initializers returns the merged InitializerConfiguration.
|
||||
func (im *InitializerConfigurationManager) Initializers() (*v1alpha1.InitializerConfiguration, error) {
|
||||
configuration, err := im.poller.configuration()
|
||||
@ -47,16 +67,8 @@ func (im *InitializerConfigurationManager) Initializers() (*v1alpha1.Initializer
|
||||
return initializerConfiguration, nil
|
||||
}
|
||||
|
||||
func NewInitializerConfigurationManager(c InitializerConfigurationLister) *InitializerConfigurationManager {
|
||||
getFn := func() (runtime.Object, error) {
|
||||
list, err := c.List(metav1.ListOptions{})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return mergeInitializerConfigurations(list), nil
|
||||
}
|
||||
return &InitializerConfigurationManager{
|
||||
newPoller(getFn)}
|
||||
func (im *InitializerConfigurationManager) Run(stopCh <-chan struct{}) {
|
||||
im.poller.Run(stopCh)
|
||||
}
|
||||
|
||||
func mergeInitializerConfigurations(initializerConfigurationList *v1alpha1.InitializerConfigurationList) *v1alpha1.InitializerConfiguration {
|
||||
|
@ -57,7 +57,7 @@ var _ WantsAuthorizer = &WantAuthorizerAdmission{}
|
||||
// TestWantsAuthorizer ensures that the authorizer is injected when the WantsAuthorizer
|
||||
// interface is implemented.
|
||||
func TestWantsAuthorizer(t *testing.T) {
|
||||
initializer := NewPluginInitializer(nil, nil, &TestAuthorizer{}, nil, nil, nil)
|
||||
initializer := NewPluginInitializer(nil, nil, nil, &TestAuthorizer{}, nil, nil, nil)
|
||||
wantAuthorizerAdmission := &WantAuthorizerAdmission{}
|
||||
initializer.Initialize(wantAuthorizerAdmission)
|
||||
if wantAuthorizerAdmission.auth == nil {
|
||||
@ -76,7 +76,7 @@ func (self *WantsCloudConfigAdmissionPlugin) SetCloudConfig(cloudConfig []byte)
|
||||
|
||||
func TestCloudConfigAdmissionPlugin(t *testing.T) {
|
||||
cloudConfig := []byte("cloud-configuration")
|
||||
initializer := NewPluginInitializer(nil, nil, &TestAuthorizer{}, cloudConfig, nil, nil)
|
||||
initializer := NewPluginInitializer(nil, nil, nil, &TestAuthorizer{}, cloudConfig, nil, nil)
|
||||
wantsCloudConfigAdmission := &WantsCloudConfigAdmissionPlugin{}
|
||||
initializer.Initialize(wantsCloudConfigAdmission)
|
||||
|
||||
|
@ -23,6 +23,7 @@ import (
|
||||
"k8s.io/apiserver/pkg/admission"
|
||||
"k8s.io/apiserver/pkg/authorization/authorizer"
|
||||
"k8s.io/kubernetes/pkg/apis/admissionregistration"
|
||||
"k8s.io/kubernetes/pkg/client/clientset_generated/clientset"
|
||||
"k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset"
|
||||
informers "k8s.io/kubernetes/pkg/client/informers/informers_generated/internalversion"
|
||||
"k8s.io/kubernetes/pkg/quota"
|
||||
@ -36,6 +37,12 @@ type WantsInternalKubeClientSet interface {
|
||||
admission.Validator
|
||||
}
|
||||
|
||||
// WantsExternalKubeClientSet defines a function which sets ClientSet for admission plugins that need it
|
||||
type WantsExternalKubeClientSet interface {
|
||||
SetExternalKubeClientSet(clientset.Interface)
|
||||
admission.Validator
|
||||
}
|
||||
|
||||
// WantsInternalKubeInformerFactory defines a function which sets InformerFactory for admission plugins that need it
|
||||
type WantsInternalKubeInformerFactory interface {
|
||||
SetInternalKubeInformerFactory(informers.SharedInformerFactory)
|
||||
@ -95,6 +102,7 @@ type WebhookSource interface {
|
||||
|
||||
type PluginInitializer struct {
|
||||
internalClient internalclientset.Interface
|
||||
externalClient clientset.Interface
|
||||
informers informers.SharedInformerFactory
|
||||
authorizer authorizer.Authorizer
|
||||
cloudConfig []byte
|
||||
@ -113,14 +121,18 @@ var _ admission.PluginInitializer = &PluginInitializer{}
|
||||
// NewPluginInitializer constructs new instance of PluginInitializer
|
||||
// TODO: switch these parameters to use the builder pattern or just make them
|
||||
// all public, this construction method is pointless boilerplate.
|
||||
func NewPluginInitializer(internalClient internalclientset.Interface,
|
||||
func NewPluginInitializer(
|
||||
internalClient internalclientset.Interface,
|
||||
externalClient clientset.Interface,
|
||||
sharedInformers informers.SharedInformerFactory,
|
||||
authz authorizer.Authorizer,
|
||||
cloudConfig []byte,
|
||||
restMapper meta.RESTMapper,
|
||||
quotaRegistry quota.Registry) *PluginInitializer {
|
||||
quotaRegistry quota.Registry,
|
||||
) *PluginInitializer {
|
||||
return &PluginInitializer{
|
||||
internalClient: internalClient,
|
||||
externalClient: externalClient,
|
||||
informers: sharedInformers,
|
||||
authorizer: authz,
|
||||
cloudConfig: cloudConfig,
|
||||
@ -157,6 +169,10 @@ func (i *PluginInitializer) Initialize(plugin admission.Interface) {
|
||||
wants.SetInternalKubeClientSet(i.internalClient)
|
||||
}
|
||||
|
||||
if wants, ok := plugin.(WantsExternalKubeClientSet); ok {
|
||||
wants.SetExternalKubeClientSet(i.externalClient)
|
||||
}
|
||||
|
||||
if wants, ok := plugin.(WantsInternalKubeInformerFactory); ok {
|
||||
wants.SetInternalKubeInformerFactory(i.informers)
|
||||
}
|
||||
|
@ -74,7 +74,7 @@ func newGCPermissionsEnforcement() *gcPermissionsEnforcement {
|
||||
Handler: admission.NewHandler(admission.Create, admission.Update),
|
||||
whiteList: whiteList,
|
||||
}
|
||||
pluginInitializer := kubeadmission.NewPluginInitializer(nil, nil, fakeAuthorizer{}, nil, api.Registry.RESTMapper(), nil)
|
||||
pluginInitializer := kubeadmission.NewPluginInitializer(nil, nil, nil, fakeAuthorizer{}, nil, api.Registry.RESTMapper(), nil)
|
||||
pluginInitializer.Initialize(gcAdmit)
|
||||
return gcAdmit
|
||||
}
|
||||
|
@ -5,6 +5,7 @@ licenses(["notice"])
|
||||
load(
|
||||
"@io_bazel_rules_go//go:def.bzl",
|
||||
"go_library",
|
||||
"go_test",
|
||||
)
|
||||
|
||||
go_library(
|
||||
@ -12,6 +13,10 @@ go_library(
|
||||
srcs = ["initialization.go"],
|
||||
tags = ["automanaged"],
|
||||
deps = [
|
||||
"//pkg/api:go_default_library",
|
||||
"//pkg/apis/admissionregistration/v1alpha1:go_default_library",
|
||||
"//pkg/client/clientset_generated/clientset:go_default_library",
|
||||
"//pkg/kubeapiserver/admission/configuration:go_default_library",
|
||||
"//vendor/github.com/golang/glog:go_default_library",
|
||||
"//vendor/k8s.io/apimachinery/pkg/api/errors:go_default_library",
|
||||
"//vendor/k8s.io/apimachinery/pkg/api/meta:go_default_library",
|
||||
@ -19,6 +24,7 @@ go_library(
|
||||
"//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
|
||||
"//vendor/k8s.io/apimachinery/pkg/runtime/schema:go_default_library",
|
||||
"//vendor/k8s.io/apimachinery/pkg/util/validation/field:go_default_library",
|
||||
"//vendor/k8s.io/apimachinery/pkg/util/wait:go_default_library",
|
||||
"//vendor/k8s.io/apiserver/pkg/admission:go_default_library",
|
||||
"//vendor/k8s.io/apiserver/pkg/authorization/authorizer:go_default_library",
|
||||
],
|
||||
@ -36,3 +42,14 @@ filegroup(
|
||||
srcs = [":package-srcs"],
|
||||
tags = ["automanaged"],
|
||||
)
|
||||
|
||||
go_test(
|
||||
name = "go_default_test",
|
||||
srcs = ["initialization_test.go"],
|
||||
library = ":go_default_library",
|
||||
tags = ["automanaged"],
|
||||
deps = [
|
||||
"//pkg/apis/admissionregistration/v1alpha1:go_default_library",
|
||||
"//vendor/k8s.io/apimachinery/pkg/runtime/schema:go_default_library",
|
||||
],
|
||||
)
|
||||
|
@ -1,5 +1,5 @@
|
||||
/*
|
||||
Copyright 2014 The Kubernetes Authors.
|
||||
Copyright 2017 The Kubernetes Authors.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
@ -19,6 +19,12 @@ package initialization
|
||||
import (
|
||||
"fmt"
|
||||
"io"
|
||||
"net"
|
||||
"net/url"
|
||||
"os"
|
||||
"strings"
|
||||
"syscall"
|
||||
"time"
|
||||
|
||||
"github.com/golang/glog"
|
||||
|
||||
@ -28,8 +34,13 @@ import (
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
"k8s.io/apimachinery/pkg/runtime/schema"
|
||||
"k8s.io/apimachinery/pkg/util/validation/field"
|
||||
"k8s.io/apimachinery/pkg/util/wait"
|
||||
"k8s.io/apiserver/pkg/admission"
|
||||
"k8s.io/apiserver/pkg/authorization/authorizer"
|
||||
"k8s.io/kubernetes/pkg/api"
|
||||
"k8s.io/kubernetes/pkg/apis/admissionregistration/v1alpha1"
|
||||
"k8s.io/kubernetes/pkg/client/clientset_generated/clientset"
|
||||
"k8s.io/kubernetes/pkg/kubeapiserver/admission/configuration"
|
||||
)
|
||||
|
||||
// Register registers a plugin
|
||||
@ -43,38 +54,131 @@ type initializerOptions struct {
|
||||
Initializers []string
|
||||
}
|
||||
|
||||
type InitializationConfig interface {
|
||||
Run(stopCh <-chan struct{})
|
||||
Initializers() (*v1alpha1.InitializerConfiguration, error)
|
||||
}
|
||||
|
||||
type initializer struct {
|
||||
resources map[schema.GroupResource]initializerOptions
|
||||
config InitializationConfig
|
||||
authorizer authorizer.Authorizer
|
||||
}
|
||||
|
||||
// NewAlwaysAdmit creates a new always admit admission handler
|
||||
// Retry config loading failures for up to four and a half seconds if the config hasn't been loaded
|
||||
// yet or if the server is down. Creation requests are delayed during this interval, which prevents
|
||||
// racy failures during startup until the initializer configuration becomes available.
|
||||
// TODO: move into InitializationConfigurationManager, since these values depend on the config
|
||||
// refresh loop.
|
||||
const (
|
||||
retryTemporaryConfigFailures = 8
|
||||
retryTemporaryConfigInterval = 550 * time.Millisecond
|
||||
)
|
||||
|
||||
// NewInitializer creates a new initializer plugin which assigns newly created resources initializers
|
||||
// based on configuration loaded from the admission API group.
|
||||
// FUTURE: this may be moved to the storage layer of the apiserver, but for now this is an alpha feature
|
||||
// that can be disabled.
|
||||
func NewInitializer() admission.Interface {
|
||||
return &initializer{
|
||||
resources: map[schema.GroupResource]initializerOptions{
|
||||
//schema.GroupResource{Resource: "pods"}: {Initializers: []string{"Test"}},
|
||||
},
|
||||
}
|
||||
return &initializer{}
|
||||
}
|
||||
|
||||
func (i *initializer) Validate() error {
|
||||
if i.config == nil {
|
||||
return fmt.Errorf("the Initializer admission plugin requires a Kubernetes client to be provided")
|
||||
}
|
||||
i.config.Run(wait.NeverStop)
|
||||
return nil
|
||||
}
|
||||
|
||||
func (i *initializer) SetExternalKubeClientSet(client clientset.Interface) {
|
||||
i.config = configuration.NewInitializerConfigurationManager(client.Admissionregistration().InitializerConfigurations())
|
||||
}
|
||||
|
||||
func (i *initializer) SetAuthorizer(a authorizer.Authorizer) {
|
||||
i.authorizer = a
|
||||
}
|
||||
|
||||
var initializerFieldPath = field.NewPath("metadata", "initializers")
|
||||
|
||||
// temporaryConnectionError returns true if the error is considered temporary
|
||||
func temporaryConnectionError(err error) bool {
|
||||
if urlError, ok := err.(*url.Error); ok {
|
||||
if urlError.Temporary() {
|
||||
return true
|
||||
}
|
||||
if opError, ok := urlError.Err.(*net.OpError); ok {
|
||||
if syscallError, ok := opError.Err.(*os.SyscallError); ok {
|
||||
if errno, ok := syscallError.Err.(syscall.Errno); ok && errno == syscall.ECONNREFUSED {
|
||||
return true
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
// readConfigWithRetry holds requests instead of failing them if the server is not yet initialized
|
||||
// or is unresponsive. It formats the returned error for client use if necessary.
|
||||
func (i *initializer) readConfigWithRetry(a admission.Attributes) (*v1alpha1.InitializerConfiguration, error) {
|
||||
var lastErr error
|
||||
for count := 0; count < retryTemporaryConfigFailures; count++ {
|
||||
if count > 0 {
|
||||
time.Sleep(retryTemporaryConfigInterval)
|
||||
}
|
||||
|
||||
// read initializers from config
|
||||
config, err := i.config.Initializers()
|
||||
if err == nil {
|
||||
return config, nil
|
||||
}
|
||||
|
||||
// if initializer configuration is disabled, fail open
|
||||
if err == configuration.ErrDisabled {
|
||||
return &v1alpha1.InitializerConfiguration{}, nil
|
||||
}
|
||||
|
||||
// retry certain errors
|
||||
lastErr = err
|
||||
if err != configuration.ErrNotReady && !temporaryConnectionError(err) {
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
e := errors.NewServerTimeout(a.GetResource().GroupResource(), "create", 1)
|
||||
if lastErr == configuration.ErrNotReady {
|
||||
e.ErrStatus.Message = fmt.Sprintf("Waiting for initialization configuration to load: %v", lastErr)
|
||||
e.ErrStatus.Reason = "LoadingConfiguration"
|
||||
e.ErrStatus.Details.Causes = append(e.ErrStatus.Details.Causes, metav1.StatusCause{
|
||||
Type: "InitializerConfigurationPending",
|
||||
Message: "The server is waiting for the initializer configuration to be loaded.",
|
||||
})
|
||||
} else {
|
||||
e.ErrStatus.Message = fmt.Sprintf("Unable to refresh the initializer configuration: %v", lastErr)
|
||||
e.ErrStatus.Reason = "LoadingConfiguration"
|
||||
e.ErrStatus.Details.Causes = append(e.ErrStatus.Details.Causes, metav1.StatusCause{
|
||||
Type: "InitializerConfigurationFailure",
|
||||
Message: "An error has occurred while refreshing the initializer configuration, no resources can be created until a refresh succeeds.",
|
||||
})
|
||||
}
|
||||
return nil, e
|
||||
}
|
||||
|
||||
// Admit checks for create requests to add initializers, or update request to enforce invariants.
|
||||
// The admission controller fails open if the object doesn't have ObjectMeta (can't be initialized).
|
||||
// A client with sufficient permission ("initialize" verb on resource) can specify its own initializers
|
||||
// or an empty initializers struct (which bypasses initialization). Only clients with the initialize verb
|
||||
// can update objects that have not completed initialization. Sub resources can still be modified on
|
||||
// resources that are undergoing initialization.
|
||||
// TODO: once this logic is ready for beta, move it into the REST storage layer.
|
||||
func (i *initializer) Admit(a admission.Attributes) (err error) {
|
||||
// TODO: sub-resource action should be denied until the object is initialized
|
||||
if len(a.GetSubresource()) > 0 {
|
||||
switch a.GetOperation() {
|
||||
case admission.Create, admission.Update:
|
||||
default:
|
||||
return nil
|
||||
}
|
||||
|
||||
resource, ok := i.resources[a.GetResource().GroupResource()]
|
||||
if !ok {
|
||||
// TODO: should sub-resource action should be denied until the object is initialized?
|
||||
if len(a.GetSubresource()) > 0 {
|
||||
return nil
|
||||
}
|
||||
|
||||
@ -87,15 +191,44 @@ func (i *initializer) Admit(a admission.Attributes) (err error) {
|
||||
return nil
|
||||
}
|
||||
existing := accessor.GetInitializers()
|
||||
// it must be possible for some users to bypass initialization - for now, check the initialize operation
|
||||
if existing != nil {
|
||||
if err := i.canInitialize(a); err != nil {
|
||||
glog.V(5).Infof("Admin bypassing initialization for %s", a.GetResource())
|
||||
|
||||
// it must be possible for some users to bypass initialization - for now, check the initialize operation
|
||||
if err := i.canInitialize(a, "create with initializers denied"); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
// allow administrators to bypass initialization by setting an empty initializers struct
|
||||
if len(existing.Pending) == 0 && existing.Result == nil {
|
||||
accessor.SetInitializers(nil)
|
||||
return nil
|
||||
}
|
||||
} else {
|
||||
glog.V(5).Infof("Checking initialization for %s", a.GetResource())
|
||||
|
||||
// TODO: pull this from config
|
||||
accessor.SetInitializers(copiedInitializers(resource.Initializers))
|
||||
config, err := i.readConfigWithRetry(a)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Mirror pods are exempt from initialization because they are created and initialized
|
||||
// on the Kubelet before they appear in the API.
|
||||
// TODO: once this moves to REST storage layer, this becomes a pod specific concern
|
||||
if pod, ok := a.GetObject().(*api.Pod); ok && pod != nil {
|
||||
if _, isMirror := pod.Annotations[api.MirrorPodAnnotationKey]; isMirror {
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
||||
names := findInitializers(config, a.GetResource())
|
||||
if len(names) == 0 {
|
||||
glog.V(5).Infof("No initializers needed")
|
||||
return nil
|
||||
}
|
||||
|
||||
glog.V(5).Infof("Found initializers for %s: %v", a.GetResource(), names)
|
||||
accessor.SetInitializers(newInitializers(names))
|
||||
}
|
||||
|
||||
case admission.Update:
|
||||
accessor, err := meta.Accessor(a.GetObject())
|
||||
@ -113,13 +246,20 @@ func (i *initializer) Admit(a admission.Attributes) (err error) {
|
||||
}
|
||||
existing := existingAccessor.GetInitializers()
|
||||
|
||||
// updates on initialized resources are allowed
|
||||
if updated == nil && existing == nil {
|
||||
return nil
|
||||
}
|
||||
|
||||
glog.V(5).Infof("Modifying uninitialized resource %s", a.GetResource())
|
||||
|
||||
// because we are called before validation, we need to ensure the update transition is valid.
|
||||
if errs := validation.ValidateInitializersUpdate(updated, existing, initializerFieldPath); len(errs) > 0 {
|
||||
return errors.NewInvalid(a.GetKind().GroupKind(), a.GetName(), errs)
|
||||
}
|
||||
|
||||
// caller must have the ability to mutate un-initialized resources
|
||||
if err := i.canInitialize(a); err != nil {
|
||||
if err := i.canInitialize(a, "update to uninitialized resource denied"); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
@ -129,7 +269,7 @@ func (i *initializer) Admit(a admission.Attributes) (err error) {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (i *initializer) canInitialize(a admission.Attributes) error {
|
||||
func (i *initializer) canInitialize(a admission.Attributes, message string) error {
|
||||
// if no authorizer is present, the initializer plugin allows modification of uninitialized resources
|
||||
if i.authorizer == nil {
|
||||
glog.V(4).Infof("No authorizer provided to initialization admission control, unable to check permissions")
|
||||
@ -150,16 +290,17 @@ func (i *initializer) canInitialize(a admission.Attributes) error {
|
||||
return err
|
||||
}
|
||||
if !authorized {
|
||||
return fmt.Errorf("user must have permission to initialize resources: %s", reason)
|
||||
return errors.NewForbidden(a.GetResource().GroupResource(), a.GetName(), fmt.Errorf("%s: %s", message, reason))
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (i *initializer) Handles(op admission.Operation) bool {
|
||||
return true
|
||||
return op == admission.Create || op == admission.Update
|
||||
}
|
||||
|
||||
func copiedInitializers(names []string) *metav1.Initializers {
|
||||
// newInitializers populates an Initializers struct.
|
||||
func newInitializers(names []string) *metav1.Initializers {
|
||||
if len(names) == 0 {
|
||||
return nil
|
||||
}
|
||||
@ -171,3 +312,71 @@ func copiedInitializers(names []string) *metav1.Initializers {
|
||||
Pending: init,
|
||||
}
|
||||
}
|
||||
|
||||
// findInitializers returns the list of initializer names that apply to a config. It returns an empty list
|
||||
// if no initializers apply.
|
||||
func findInitializers(initializers *v1alpha1.InitializerConfiguration, gvr schema.GroupVersionResource) []string {
|
||||
var names []string
|
||||
for _, init := range initializers.Initializers {
|
||||
if !matchRule(init.Rules, gvr) {
|
||||
continue
|
||||
}
|
||||
names = append(names, init.Name)
|
||||
}
|
||||
return names
|
||||
}
|
||||
|
||||
// matchRule returns true if any rule matches the provided group version resource.
|
||||
func matchRule(rules []v1alpha1.Rule, gvr schema.GroupVersionResource) bool {
|
||||
for _, rule := range rules {
|
||||
if !hasGroup(rule.APIGroups, gvr.Group) {
|
||||
return false
|
||||
}
|
||||
if !hasVersion(rule.APIVersions, gvr.Version) {
|
||||
return false
|
||||
}
|
||||
if !hasResource(rule.Resources, gvr.Resource) {
|
||||
return false
|
||||
}
|
||||
}
|
||||
return len(rules) > 0
|
||||
}
|
||||
|
||||
func hasGroup(groups []string, group string) bool {
|
||||
if groups[0] == "*" {
|
||||
return true
|
||||
}
|
||||
for _, g := range groups {
|
||||
if g == group {
|
||||
return true
|
||||
}
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
func hasVersion(versions []string, version string) bool {
|
||||
if versions[0] == "*" {
|
||||
return true
|
||||
}
|
||||
for _, v := range versions {
|
||||
if v == version {
|
||||
return true
|
||||
}
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
func hasResource(resources []string, resource string) bool {
|
||||
if resources[0] == "*" || resources[0] == "*/*" {
|
||||
return true
|
||||
}
|
||||
for _, r := range resources {
|
||||
if strings.Contains(r, "/") {
|
||||
continue
|
||||
}
|
||||
if r == resource {
|
||||
return true
|
||||
}
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
99
plugin/pkg/admission/initialization/initialization_test.go
Normal file
99
plugin/pkg/admission/initialization/initialization_test.go
Normal file
@ -0,0 +1,99 @@
|
||||
/*
|
||||
Copyright 2017 The Kubernetes Authors.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package initialization
|
||||
|
||||
import (
|
||||
"reflect"
|
||||
"testing"
|
||||
|
||||
"k8s.io/apimachinery/pkg/runtime/schema"
|
||||
"k8s.io/kubernetes/pkg/apis/admissionregistration/v1alpha1"
|
||||
)
|
||||
|
||||
func newInitializer(name string, rules ...v1alpha1.Rule) *v1alpha1.InitializerConfiguration {
|
||||
return addInitializer(&v1alpha1.InitializerConfiguration{}, name, rules...)
|
||||
}
|
||||
|
||||
func addInitializer(base *v1alpha1.InitializerConfiguration, name string, rules ...v1alpha1.Rule) *v1alpha1.InitializerConfiguration {
|
||||
base.Initializers = append(base.Initializers, v1alpha1.Initializer{
|
||||
Name: name,
|
||||
Rules: rules,
|
||||
})
|
||||
return base
|
||||
}
|
||||
|
||||
func TestFindInitializers(t *testing.T) {
|
||||
type args struct {
|
||||
initializers *v1alpha1.InitializerConfiguration
|
||||
gvr schema.GroupVersionResource
|
||||
}
|
||||
tests := []struct {
|
||||
name string
|
||||
args args
|
||||
want []string
|
||||
}{
|
||||
{
|
||||
name: "empty",
|
||||
args: args{
|
||||
gvr: schema.GroupVersionResource{},
|
||||
initializers: newInitializer("1"),
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "everything",
|
||||
args: args{
|
||||
gvr: schema.GroupVersionResource{},
|
||||
initializers: newInitializer("1", v1alpha1.Rule{APIGroups: []string{"*"}, APIVersions: []string{"*"}, Resources: []string{"*"}}),
|
||||
},
|
||||
want: []string{"1"},
|
||||
},
|
||||
{
|
||||
name: "empty group",
|
||||
args: args{
|
||||
gvr: schema.GroupVersionResource{},
|
||||
initializers: newInitializer("1", v1alpha1.Rule{APIGroups: []string{""}, APIVersions: []string{"*"}, Resources: []string{"*"}}),
|
||||
},
|
||||
want: []string{"1"},
|
||||
},
|
||||
{
|
||||
name: "pod",
|
||||
args: args{
|
||||
gvr: schema.GroupVersionResource{Resource: "pods"},
|
||||
initializers: addInitializer(
|
||||
newInitializer("1", v1alpha1.Rule{APIGroups: []string{""}, APIVersions: []string{"*"}, Resources: []string{"pods"}}),
|
||||
"2", v1alpha1.Rule{APIGroups: []string{""}, APIVersions: []string{"*"}, Resources: []string{"pods"}},
|
||||
),
|
||||
},
|
||||
want: []string{"1", "2"},
|
||||
},
|
||||
{
|
||||
name: "multiple matches",
|
||||
args: args{
|
||||
gvr: schema.GroupVersionResource{Resource: "pods"},
|
||||
initializers: newInitializer("1", v1alpha1.Rule{APIGroups: []string{""}, APIVersions: []string{"*"}, Resources: []string{"pods"}}),
|
||||
},
|
||||
want: []string{"1"},
|
||||
},
|
||||
}
|
||||
for _, tt := range tests {
|
||||
t.Run(tt.name, func(t *testing.T) {
|
||||
if got := findInitializers(tt.args.initializers, tt.args.gvr); !reflect.DeepEqual(got, tt.want) {
|
||||
t.Errorf("findInitializers() = %v, want %v", got, tt.want)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
@ -595,7 +595,7 @@ func newHandlerForTest(c clientset.Interface) (admission.Interface, informers.Sh
|
||||
if err != nil {
|
||||
return nil, f, err
|
||||
}
|
||||
pluginInitializer := kubeadmission.NewPluginInitializer(c, f, nil, nil, nil, nil)
|
||||
pluginInitializer := kubeadmission.NewPluginInitializer(c, nil, f, nil, nil, nil, nil)
|
||||
pluginInitializer.Initialize(handler)
|
||||
err = admission.Validate(handler)
|
||||
return handler, f, err
|
||||
|
@ -38,7 +38,7 @@ import (
|
||||
func newHandlerForTest(c clientset.Interface) (admission.Interface, informers.SharedInformerFactory, error) {
|
||||
f := informers.NewSharedInformerFactory(c, 5*time.Minute)
|
||||
handler := NewProvision()
|
||||
pluginInitializer := kubeadmission.NewPluginInitializer(c, f, nil, nil, nil, nil)
|
||||
pluginInitializer := kubeadmission.NewPluginInitializer(c, nil, f, nil, nil, nil, nil)
|
||||
pluginInitializer.Initialize(handler)
|
||||
err := admission.Validate(handler)
|
||||
return handler, f, err
|
||||
|
@ -37,7 +37,7 @@ import (
|
||||
func newHandlerForTest(c clientset.Interface) (admission.Interface, informers.SharedInformerFactory, error) {
|
||||
f := informers.NewSharedInformerFactory(c, 5*time.Minute)
|
||||
handler := NewExists()
|
||||
pluginInitializer := kubeadmission.NewPluginInitializer(c, f, nil, nil, nil, nil)
|
||||
pluginInitializer := kubeadmission.NewPluginInitializer(c, nil, f, nil, nil, nil, nil)
|
||||
pluginInitializer.Initialize(handler)
|
||||
err := admission.Validate(handler)
|
||||
return handler, f, err
|
||||
|
@ -191,7 +191,7 @@ func TestHandles(t *testing.T) {
|
||||
func newHandlerForTest(c clientset.Interface) (*podNodeSelector, informers.SharedInformerFactory, error) {
|
||||
f := informers.NewSharedInformerFactory(c, 5*time.Minute)
|
||||
handler := NewPodNodeSelector(nil)
|
||||
pluginInitializer := kubeadmission.NewPluginInitializer(c, f, nil, nil, nil, nil)
|
||||
pluginInitializer := kubeadmission.NewPluginInitializer(c, nil, f, nil, nil, nil, nil)
|
||||
pluginInitializer.Initialize(handler)
|
||||
err := admission.Validate(handler)
|
||||
return handler, f, err
|
||||
|
@ -193,7 +193,7 @@ func newHandlerForTest(c clientset.Interface) (*podTolerationsPlugin, informers.
|
||||
return nil, nil, err
|
||||
}
|
||||
handler := NewPodTolerationsPlugin(pluginConfig)
|
||||
pluginInitializer := kubeadmission.NewPluginInitializer(c, f, nil, nil, nil, nil)
|
||||
pluginInitializer := kubeadmission.NewPluginInitializer(c, nil, f, nil, nil, nil, nil)
|
||||
pluginInitializer.Initialize(handler)
|
||||
err = admission.Validate(handler)
|
||||
return handler, f, err
|
||||
|
@ -41,7 +41,7 @@ type AdmissionOptions struct {
|
||||
func NewAdmissionOptions() *AdmissionOptions {
|
||||
options := &AdmissionOptions{
|
||||
Plugins: &admission.Plugins{},
|
||||
PluginNames: []string{"Initializers"},
|
||||
PluginNames: []string{},
|
||||
}
|
||||
server.RegisterAllAdmissionPlugins(options.Plugins)
|
||||
return options
|
||||
|
@ -13,6 +13,9 @@ go_library(
|
||||
tags = ["automanaged"],
|
||||
deps = [
|
||||
"//pkg/api/v1:go_default_library",
|
||||
"//pkg/apis/admissionregistration/v1alpha1:go_default_library",
|
||||
"//pkg/client/clientset_generated/clientset:go_default_library",
|
||||
"//pkg/client/retry:go_default_library",
|
||||
"//test/e2e/framework:go_default_library",
|
||||
"//vendor/github.com/onsi/ginkgo:go_default_library",
|
||||
"//vendor/github.com/onsi/gomega:go_default_library",
|
||||
|
@ -23,10 +23,14 @@ import (
|
||||
|
||||
. "github.com/onsi/ginkgo"
|
||||
. "github.com/onsi/gomega"
|
||||
|
||||
"k8s.io/apimachinery/pkg/api/errors"
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
"k8s.io/apimachinery/pkg/util/wait"
|
||||
"k8s.io/kubernetes/pkg/api/v1"
|
||||
"k8s.io/kubernetes/pkg/apis/admissionregistration/v1alpha1"
|
||||
"k8s.io/kubernetes/pkg/client/clientset_generated/clientset"
|
||||
clientretry "k8s.io/kubernetes/pkg/client/retry"
|
||||
"k8s.io/kubernetes/test/e2e/framework"
|
||||
)
|
||||
|
||||
@ -65,10 +69,19 @@ var _ = framework.KubeDescribe("Initializers", func() {
|
||||
|
||||
// verify that we can update an initializing pod
|
||||
pod, err := c.Core().Pods(ns).Get(podName, metav1.GetOptions{})
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
pod.Annotations = map[string]string{"update-1": "test"}
|
||||
pod, err = c.Core().Pods(ns).Update(pod)
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
|
||||
// verify the list call filters out uninitialized pods
|
||||
pods, err := c.Core().Pods(ns).List(metav1.ListOptions{IncludeUninitialized: true})
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
Expect(pods.Items).To(HaveLen(1))
|
||||
pods, err = c.Core().Pods(ns).List(metav1.ListOptions{})
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
Expect(pods.Items).To(HaveLen(0))
|
||||
|
||||
// clear initializers
|
||||
pod.Initializers = nil
|
||||
pod, err = c.Core().Pods(ns).Update(pod)
|
||||
@ -93,17 +106,120 @@ var _ = framework.KubeDescribe("Initializers", func() {
|
||||
}
|
||||
})
|
||||
|
||||
It("should dynamically register and apply initializers to pods [Serial]", func() {
|
||||
ns := f.Namespace.Name
|
||||
c := f.ClientSet
|
||||
|
||||
podName := "uninitialized-pod"
|
||||
framework.Logf("Creating pod %s", podName)
|
||||
|
||||
// create and register an initializer
|
||||
initializerName := "pod.test.e2e.kubernetes.io"
|
||||
initializerConfigName := "e2e-test-initializer"
|
||||
_, err := c.AdmissionregistrationV1alpha1().InitializerConfigurations().Create(&v1alpha1.InitializerConfiguration{
|
||||
ObjectMeta: metav1.ObjectMeta{Name: initializerConfigName},
|
||||
Initializers: []v1alpha1.Initializer{
|
||||
{
|
||||
Name: initializerName,
|
||||
Rules: []v1alpha1.Rule{
|
||||
{APIGroups: []string{""}, APIVersions: []string{"*"}, Resources: []string{"pods"}},
|
||||
},
|
||||
},
|
||||
},
|
||||
})
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
|
||||
// we must remove the initializer when the test is complete and ensure no pods are pending for that initializer
|
||||
defer func() {
|
||||
if err := c.AdmissionregistrationV1alpha1().InitializerConfigurations().Delete(initializerConfigName, nil); err != nil && !errors.IsNotFound(err) {
|
||||
framework.Logf("got error on deleting %s", initializerConfigName)
|
||||
}
|
||||
// poller configuration is 1 second, wait at least that long
|
||||
time.Sleep(3 * time.Second)
|
||||
// clear our initializer from anyone who got it
|
||||
removeInitializersFromAllPods(c, initializerName)
|
||||
}()
|
||||
|
||||
// poller configuration is 1 second, wait at least that long
|
||||
time.Sleep(3 * time.Second)
|
||||
|
||||
// run create that blocks
|
||||
ch := make(chan struct{})
|
||||
go func() {
|
||||
defer close(ch)
|
||||
_, err := c.Core().Pods(ns).Create(newPod(podName))
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
}()
|
||||
|
||||
// wait until the pod shows up uninitialized
|
||||
By("Waiting until the pod is visible to a client")
|
||||
var pod *v1.Pod
|
||||
err = wait.PollImmediate(2*time.Second, 15*time.Second, func() (bool, error) {
|
||||
pod, err = c.Core().Pods(ns).Get(podName, metav1.GetOptions{IncludeUninitialized: true})
|
||||
if errors.IsNotFound(err) {
|
||||
return false, nil
|
||||
}
|
||||
if err != nil {
|
||||
return false, err
|
||||
}
|
||||
return true, nil
|
||||
})
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
Expect(pod.Initializers).NotTo(BeNil())
|
||||
Expect(pod.Initializers.Pending).To(HaveLen(1))
|
||||
Expect(pod.Initializers.Pending[0].Name).To(Equal(initializerName))
|
||||
|
||||
// pretend we are an initializer
|
||||
By("Completing initialization")
|
||||
pod.Initializers = nil
|
||||
pod, err = c.Core().Pods(ns).Update(pod)
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
|
||||
// ensure create call returns
|
||||
<-ch
|
||||
|
||||
// pod should now start running
|
||||
err = framework.WaitForPodRunningInNamespace(c, pod)
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
|
||||
// bypass initialization by explicitly passing an empty pending list
|
||||
By("Setting an empty initializer as an admin to bypass initialization")
|
||||
podName = "preinitialized-pod"
|
||||
pod = newUninitializedPod(podName)
|
||||
pod.Initializers.Pending = nil
|
||||
pod, err = c.Core().Pods(ns).Create(pod)
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
Expect(pod.Initializers).To(BeNil())
|
||||
|
||||
// bypass initialization for mirror pods
|
||||
By("Creating a mirror pod that bypasses initialization")
|
||||
podName = "mirror-pod"
|
||||
pod = newPod(podName)
|
||||
pod.Annotations = map[string]string{
|
||||
v1.MirrorPodAnnotationKey: "true",
|
||||
}
|
||||
pod.Spec.NodeName = "node-does-not-yet-exist"
|
||||
pod, err = c.Core().Pods(ns).Create(pod)
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
Expect(pod.Initializers).To(BeNil())
|
||||
Expect(pod.Annotations[v1.MirrorPodAnnotationKey]).To(Equal("true"))
|
||||
})
|
||||
})
|
||||
|
||||
func newUninitializedPod(podName string) *v1.Pod {
|
||||
pod := newPod(podName)
|
||||
pod.Initializers = &metav1.Initializers{
|
||||
Pending: []metav1.Initializer{{Name: "Test"}},
|
||||
}
|
||||
return pod
|
||||
}
|
||||
|
||||
func newPod(podName string) *v1.Pod {
|
||||
containerName := fmt.Sprintf("%s-container", podName)
|
||||
port := 8080
|
||||
pod := &v1.Pod{
|
||||
ObjectMeta: metav1.ObjectMeta{
|
||||
Name: podName,
|
||||
Initializers: &metav1.Initializers{
|
||||
Pending: []metav1.Initializer{{Name: "Test"}},
|
||||
},
|
||||
},
|
||||
Spec: v1.PodSpec{
|
||||
Containers: []v1.Container{
|
||||
@ -119,3 +235,48 @@ func newUninitializedPod(podName string) *v1.Pod {
|
||||
}
|
||||
return pod
|
||||
}
|
||||
|
||||
// removeInitializersFromAllPods walks all pods and ensures they don't have the provided initializer,
|
||||
// to guarantee completing the test doesn't block the entire cluster.
|
||||
func removeInitializersFromAllPods(c clientset.Interface, initializerName string) {
|
||||
pods, err := c.Core().Pods("").List(metav1.ListOptions{IncludeUninitialized: true})
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
for _, p := range pods.Items {
|
||||
if p.Initializers == nil {
|
||||
continue
|
||||
}
|
||||
err := clientretry.RetryOnConflict(clientretry.DefaultRetry, func() error {
|
||||
pod, err := c.Core().Pods(p.Namespace).Get(p.Name, metav1.GetOptions{IncludeUninitialized: true})
|
||||
if err != nil {
|
||||
if errors.IsNotFound(err) {
|
||||
return nil
|
||||
}
|
||||
return err
|
||||
}
|
||||
if pod.Initializers == nil {
|
||||
return nil
|
||||
}
|
||||
var updated []metav1.Initializer
|
||||
for _, pending := range pod.Initializers.Pending {
|
||||
if pending.Name != initializerName {
|
||||
updated = append(updated, pending)
|
||||
}
|
||||
}
|
||||
if len(updated) == len(pod.Initializers.Pending) {
|
||||
return nil
|
||||
}
|
||||
pod.Initializers.Pending = updated
|
||||
if len(updated) == 0 {
|
||||
pod.Initializers = nil
|
||||
}
|
||||
framework.Logf("Found initializer on pod %s in ns %s", pod.Name, pod.Namespace)
|
||||
_, err = c.Core().Pods(p.Namespace).Update(pod)
|
||||
return err
|
||||
})
|
||||
if err != nil {
|
||||
framework.Logf("Unable to remove initializer from pod %s in ns %s: %v", p.Name, p.Namespace, err)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user