diff --git a/pkg/kubeapiserver/admission/configuration/configuration_manager.go b/pkg/kubeapiserver/admission/configuration/configuration_manager.go index 450befa9539..d31b391c0b7 100644 --- a/pkg/kubeapiserver/admission/configuration/configuration_manager.go +++ b/pkg/kubeapiserver/admission/configuration/configuration_manager.go @@ -26,8 +26,10 @@ import ( ) const ( - defaultInterval = 1 * time.Second - defaultFailureThreshold = 5 + defaultInterval = 1 * time.Second + defaultFailureThreshold = 5 + defaultBootstrapRetries = 5 + defaultBootstrapGraceperiod = 5 * time.Second ) var ( @@ -47,26 +49,43 @@ type poller struct { // a function to consistently read the latest configuration get getFunc // consistent read interval + // read-only interval time.Duration // if the number of consecutive read failure equals or exceeds the failureThreshold , the // configuration is regarded as not ready. + // read-only failureThreshold int // number of consecutive failures so far. failures int + // If the poller has passed the bootstrap phase. The poller is considered + // bootstrapped either bootstrapGracePeriod after the first call of + // configuration(), or when setConfigurationAndReady() is called, whichever + // comes first. + bootstrapped bool + // configuration() retries bootstrapRetries times if poller is not bootstrapped + // read-only + bootstrapRetries int + // Grace period for bootstrapping + // read-only + bootstrapGracePeriod time.Duration + once sync.Once // if the configuration is regarded as ready. ready bool mergedConfiguration runtime.Object - // lock much be hold when reading ready or mergedConfiguration - lock sync.RWMutex - lastErr error + lastErr error + // lock must be hold when reading/writing the data fields of poller. + lock sync.RWMutex } func newPoller(get getFunc) *poller { - return &poller{ - get: get, - interval: defaultInterval, - failureThreshold: defaultFailureThreshold, + p := poller{ + get: get, + interval: defaultInterval, + failureThreshold: defaultFailureThreshold, + bootstrapRetries: defaultBootstrapRetries, + bootstrapGracePeriod: defaultBootstrapGraceperiod, } + return &p } func (a *poller) lastError(err error) { @@ -81,21 +100,47 @@ func (a *poller) notReady() { a.ready = false } +func (a *poller) bootstrapping() { + // bootstrapGracePeriod is read-only, so no lock is required + timer := time.NewTimer(a.bootstrapGracePeriod) + go func() { + <-timer.C + a.lock.Lock() + defer a.lock.Unlock() + a.bootstrapped = true + }() +} + +// If the poller is not bootstrapped yet, the configuration() gets a few chances +// to retry. This hides transient failures during system startup. func (a *poller) configuration() (runtime.Object, error) { + a.once.Do(a.bootstrapping) a.lock.RLock() defer a.lock.RUnlock() - if !a.ready { - if a.lastErr != nil { - return nil, a.lastErr - } - return nil, ErrNotReady + retries := 1 + if !a.bootstrapped { + retries = a.bootstrapRetries } - return a.mergedConfiguration, nil + for count := 0; count < retries; count++ { + if count > 0 { + a.lock.RUnlock() + time.Sleep(a.interval) + a.lock.RLock() + } + if a.ready { + return a.mergedConfiguration, nil + } + } + if a.lastErr != nil { + return nil, a.lastErr + } + return nil, ErrNotReady } func (a *poller) setConfigurationAndReady(value runtime.Object) { a.lock.Lock() defer a.lock.Unlock() + a.bootstrapped = true a.mergedConfiguration = value a.ready = true a.lastErr = nil diff --git a/pkg/kubeapiserver/admission/configuration/configuration_manager_test.go b/pkg/kubeapiserver/admission/configuration/configuration_manager_test.go new file mode 100644 index 00000000000..26c262e3ef6 --- /dev/null +++ b/pkg/kubeapiserver/admission/configuration/configuration_manager_test.go @@ -0,0 +1,93 @@ +/* +Copyright 2017 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package configuration + +import ( + "fmt" + "math" + "sync" + "testing" + "time" + + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/util/wait" +) + +func TestTolerateBootstrapFailure(t *testing.T) { + var fakeGetSucceed bool + var fakeGetSucceedLock sync.RWMutex + fakeGetFn := func() (runtime.Object, error) { + fakeGetSucceedLock.RLock() + defer fakeGetSucceedLock.RUnlock() + if fakeGetSucceed { + return nil, nil + } else { + return nil, fmt.Errorf("this error shouldn't be exposed to caller") + } + } + poller := newPoller(fakeGetFn) + poller.bootstrapGracePeriod = 100 * time.Second + poller.bootstrapRetries = math.MaxInt32 + // set failureThreshold to 0 so that one single failure will set "ready" to false. + poller.failureThreshold = 0 + stopCh := make(chan struct{}) + defer close(stopCh) + go poller.Run(stopCh) + go func() { + // The test might have false negative, but won't be flaky + timer := time.NewTimer(2 * time.Second) + <-timer.C + fakeGetSucceedLock.Lock() + defer fakeGetSucceedLock.Unlock() + fakeGetSucceed = true + }() + + done := make(chan struct{}) + go func(t *testing.T) { + _, err := poller.configuration() + if err != nil { + t.Errorf("unexpected error: %v", err) + } + close(done) + }(t) + <-done +} + +func TestNotTolerateNonbootstrapFailure(t *testing.T) { + fakeGetFn := func() (runtime.Object, error) { + return nil, fmt.Errorf("this error should be exposed to caller") + } + poller := newPoller(fakeGetFn) + poller.bootstrapGracePeriod = 1 * time.Second + poller.interval = 1 * time.Millisecond + stopCh := make(chan struct{}) + defer close(stopCh) + go poller.Run(stopCh) + // to kick the bootstrap timer + go poller.configuration() + + wait.PollInfinite(1*time.Second, func() (bool, error) { + poller.lock.Lock() + defer poller.lock.Unlock() + return poller.bootstrapped, nil + }) + + _, err := poller.configuration() + if err == nil { + t.Errorf("unexpected no error") + } +} diff --git a/pkg/kubeapiserver/admission/init_test.go b/pkg/kubeapiserver/admission/init_test.go index d09e4cc601c..74d24fe58ac 100644 --- a/pkg/kubeapiserver/admission/init_test.go +++ b/pkg/kubeapiserver/admission/init_test.go @@ -22,7 +22,6 @@ import ( "k8s.io/apiserver/pkg/admission" "k8s.io/apiserver/pkg/authorization/authorizer" - "k8s.io/kubernetes/pkg/apis/admissionregistration" ) // TestAuthorizer is a testing struct for testing that fulfills the authorizer interface. @@ -123,26 +122,3 @@ func TestWantsClientCert(t *testing.T) { t.Errorf("plumbing fail - %v %v", ccw.gotCert, ccw.gotKey) } } - -type fakeHookSource struct{} - -func (f *fakeHookSource) List() ([]admissionregistration.ExternalAdmissionHook, error) { - return nil, nil -} - -type hookSourceWanter struct { - doNothingAdmission - got WebhookSource -} - -func (s *hookSourceWanter) SetWebhookSource(w WebhookSource) { s.got = w } - -func TestWantsWebhookSource(t *testing.T) { - hsw := &hookSourceWanter{} - fhs := &fakeHookSource{} - i := &PluginInitializer{} - i.SetWebhookSource(fhs).Initialize(hsw) - if got, ok := hsw.got.(*fakeHookSource); !ok || got != fhs { - t.Errorf("plumbing fail - %v %v#", ok, got) - } -} diff --git a/pkg/kubeapiserver/admission/initializer.go b/pkg/kubeapiserver/admission/initializer.go index 275fde87f37..95b83dcd3ec 100644 --- a/pkg/kubeapiserver/admission/initializer.go +++ b/pkg/kubeapiserver/admission/initializer.go @@ -22,7 +22,6 @@ import ( "k8s.io/apimachinery/pkg/api/meta" "k8s.io/apiserver/pkg/admission" "k8s.io/apiserver/pkg/authorization/authorizer" - "k8s.io/kubernetes/pkg/apis/admissionregistration" "k8s.io/kubernetes/pkg/client/clientset_generated/clientset" "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset" informers "k8s.io/kubernetes/pkg/client/informers/informers_generated/internalversion" @@ -83,23 +82,12 @@ type WantsClientCert interface { SetClientCert(cert, key []byte) } -// WantsWebhookSource defines a function that accepts a webhook lister for the -// dynamic webhook plugin. -type WantsWebhookSource interface { - SetWebhookSource(WebhookSource) -} - // ServiceResolver knows how to convert a service reference into an actual // location. type ServiceResolver interface { ResolveEndpoint(namespace, name string) (*url.URL, error) } -// WebhookSource can list dynamic webhook plugins. -type WebhookSource interface { - List() ([]admissionregistration.ExternalAdmissionHook, error) -} - type PluginInitializer struct { internalClient internalclientset.Interface externalClient clientset.Interface @@ -109,7 +97,6 @@ type PluginInitializer struct { restMapper meta.RESTMapper quotaRegistry quota.Registry serviceResolver ServiceResolver - webhookSource WebhookSource // for proving we are apiserver in call-outs clientCert []byte @@ -155,13 +142,6 @@ func (i *PluginInitializer) SetClientCert(cert, key []byte) *PluginInitializer { return i } -// SetWebhookSource sets the webhook source-- admittedly this is probably -// specific to the external admission hook plugin. -func (i *PluginInitializer) SetWebhookSource(w WebhookSource) *PluginInitializer { - i.webhookSource = w - return i -} - // Initialize checks the initialization interfaces implemented by each plugin // and provide the appropriate initialization data func (i *PluginInitializer) Initialize(plugin admission.Interface) { @@ -206,11 +186,4 @@ func (i *PluginInitializer) Initialize(plugin admission.Interface) { } wants.SetClientCert(i.clientCert, i.clientKey) } - - if wants, ok := plugin.(WantsWebhookSource); ok { - if i.webhookSource == nil { - panic("An admission plugin wants a webhook source, but it was not provided.") - } - wants.SetWebhookSource(i.webhookSource) - } }