make admission configuration manager retry 5 times if it's not bootstrapped yet

This commit is contained in:
Chao Xu 2017-06-14 10:19:20 -07:00
parent cc568f6433
commit 4d834b22ea
4 changed files with 153 additions and 66 deletions

View File

@ -26,8 +26,10 @@ import (
)
const (
defaultInterval = 1 * time.Second
defaultFailureThreshold = 5
defaultInterval = 1 * time.Second
defaultFailureThreshold = 5
defaultBootstrapRetries = 5
defaultBootstrapGraceperiod = 5 * time.Second
)
var (
@ -47,26 +49,43 @@ type poller struct {
// a function to consistently read the latest configuration
get getFunc
// consistent read interval
// read-only
interval time.Duration
// if the number of consecutive read failure equals or exceeds the failureThreshold , the
// configuration is regarded as not ready.
// read-only
failureThreshold int
// number of consecutive failures so far.
failures int
// If the poller has passed the bootstrap phase. The poller is considered
// bootstrapped either bootstrapGracePeriod after the first call of
// configuration(), or when setConfigurationAndReady() is called, whichever
// comes first.
bootstrapped bool
// configuration() retries bootstrapRetries times if poller is not bootstrapped
// read-only
bootstrapRetries int
// Grace period for bootstrapping
// read-only
bootstrapGracePeriod time.Duration
once sync.Once
// if the configuration is regarded as ready.
ready bool
mergedConfiguration runtime.Object
// lock much be hold when reading ready or mergedConfiguration
lock sync.RWMutex
lastErr error
lastErr error
// lock must be hold when reading/writing the data fields of poller.
lock sync.RWMutex
}
func newPoller(get getFunc) *poller {
return &poller{
get: get,
interval: defaultInterval,
failureThreshold: defaultFailureThreshold,
p := poller{
get: get,
interval: defaultInterval,
failureThreshold: defaultFailureThreshold,
bootstrapRetries: defaultBootstrapRetries,
bootstrapGracePeriod: defaultBootstrapGraceperiod,
}
return &p
}
func (a *poller) lastError(err error) {
@ -81,21 +100,47 @@ func (a *poller) notReady() {
a.ready = false
}
func (a *poller) bootstrapping() {
// bootstrapGracePeriod is read-only, so no lock is required
timer := time.NewTimer(a.bootstrapGracePeriod)
go func() {
<-timer.C
a.lock.Lock()
defer a.lock.Unlock()
a.bootstrapped = true
}()
}
// If the poller is not bootstrapped yet, the configuration() gets a few chances
// to retry. This hides transient failures during system startup.
func (a *poller) configuration() (runtime.Object, error) {
a.once.Do(a.bootstrapping)
a.lock.RLock()
defer a.lock.RUnlock()
if !a.ready {
if a.lastErr != nil {
return nil, a.lastErr
}
return nil, ErrNotReady
retries := 1
if !a.bootstrapped {
retries = a.bootstrapRetries
}
return a.mergedConfiguration, nil
for count := 0; count < retries; count++ {
if count > 0 {
a.lock.RUnlock()
time.Sleep(a.interval)
a.lock.RLock()
}
if a.ready {
return a.mergedConfiguration, nil
}
}
if a.lastErr != nil {
return nil, a.lastErr
}
return nil, ErrNotReady
}
func (a *poller) setConfigurationAndReady(value runtime.Object) {
a.lock.Lock()
defer a.lock.Unlock()
a.bootstrapped = true
a.mergedConfiguration = value
a.ready = true
a.lastErr = nil

View File

@ -0,0 +1,93 @@
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package configuration
import (
"fmt"
"math"
"sync"
"testing"
"time"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/util/wait"
)
func TestTolerateBootstrapFailure(t *testing.T) {
var fakeGetSucceed bool
var fakeGetSucceedLock sync.RWMutex
fakeGetFn := func() (runtime.Object, error) {
fakeGetSucceedLock.RLock()
defer fakeGetSucceedLock.RUnlock()
if fakeGetSucceed {
return nil, nil
} else {
return nil, fmt.Errorf("this error shouldn't be exposed to caller")
}
}
poller := newPoller(fakeGetFn)
poller.bootstrapGracePeriod = 100 * time.Second
poller.bootstrapRetries = math.MaxInt32
// set failureThreshold to 0 so that one single failure will set "ready" to false.
poller.failureThreshold = 0
stopCh := make(chan struct{})
defer close(stopCh)
go poller.Run(stopCh)
go func() {
// The test might have false negative, but won't be flaky
timer := time.NewTimer(2 * time.Second)
<-timer.C
fakeGetSucceedLock.Lock()
defer fakeGetSucceedLock.Unlock()
fakeGetSucceed = true
}()
done := make(chan struct{})
go func(t *testing.T) {
_, err := poller.configuration()
if err != nil {
t.Errorf("unexpected error: %v", err)
}
close(done)
}(t)
<-done
}
func TestNotTolerateNonbootstrapFailure(t *testing.T) {
fakeGetFn := func() (runtime.Object, error) {
return nil, fmt.Errorf("this error should be exposed to caller")
}
poller := newPoller(fakeGetFn)
poller.bootstrapGracePeriod = 1 * time.Second
poller.interval = 1 * time.Millisecond
stopCh := make(chan struct{})
defer close(stopCh)
go poller.Run(stopCh)
// to kick the bootstrap timer
go poller.configuration()
wait.PollInfinite(1*time.Second, func() (bool, error) {
poller.lock.Lock()
defer poller.lock.Unlock()
return poller.bootstrapped, nil
})
_, err := poller.configuration()
if err == nil {
t.Errorf("unexpected no error")
}
}

View File

@ -22,7 +22,6 @@ import (
"k8s.io/apiserver/pkg/admission"
"k8s.io/apiserver/pkg/authorization/authorizer"
"k8s.io/kubernetes/pkg/apis/admissionregistration"
)
// TestAuthorizer is a testing struct for testing that fulfills the authorizer interface.
@ -123,26 +122,3 @@ func TestWantsClientCert(t *testing.T) {
t.Errorf("plumbing fail - %v %v", ccw.gotCert, ccw.gotKey)
}
}
type fakeHookSource struct{}
func (f *fakeHookSource) List() ([]admissionregistration.ExternalAdmissionHook, error) {
return nil, nil
}
type hookSourceWanter struct {
doNothingAdmission
got WebhookSource
}
func (s *hookSourceWanter) SetWebhookSource(w WebhookSource) { s.got = w }
func TestWantsWebhookSource(t *testing.T) {
hsw := &hookSourceWanter{}
fhs := &fakeHookSource{}
i := &PluginInitializer{}
i.SetWebhookSource(fhs).Initialize(hsw)
if got, ok := hsw.got.(*fakeHookSource); !ok || got != fhs {
t.Errorf("plumbing fail - %v %v#", ok, got)
}
}

View File

@ -22,7 +22,6 @@ import (
"k8s.io/apimachinery/pkg/api/meta"
"k8s.io/apiserver/pkg/admission"
"k8s.io/apiserver/pkg/authorization/authorizer"
"k8s.io/kubernetes/pkg/apis/admissionregistration"
"k8s.io/kubernetes/pkg/client/clientset_generated/clientset"
"k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset"
informers "k8s.io/kubernetes/pkg/client/informers/informers_generated/internalversion"
@ -83,23 +82,12 @@ type WantsClientCert interface {
SetClientCert(cert, key []byte)
}
// WantsWebhookSource defines a function that accepts a webhook lister for the
// dynamic webhook plugin.
type WantsWebhookSource interface {
SetWebhookSource(WebhookSource)
}
// ServiceResolver knows how to convert a service reference into an actual
// location.
type ServiceResolver interface {
ResolveEndpoint(namespace, name string) (*url.URL, error)
}
// WebhookSource can list dynamic webhook plugins.
type WebhookSource interface {
List() ([]admissionregistration.ExternalAdmissionHook, error)
}
type PluginInitializer struct {
internalClient internalclientset.Interface
externalClient clientset.Interface
@ -109,7 +97,6 @@ type PluginInitializer struct {
restMapper meta.RESTMapper
quotaRegistry quota.Registry
serviceResolver ServiceResolver
webhookSource WebhookSource
// for proving we are apiserver in call-outs
clientCert []byte
@ -155,13 +142,6 @@ func (i *PluginInitializer) SetClientCert(cert, key []byte) *PluginInitializer {
return i
}
// SetWebhookSource sets the webhook source-- admittedly this is probably
// specific to the external admission hook plugin.
func (i *PluginInitializer) SetWebhookSource(w WebhookSource) *PluginInitializer {
i.webhookSource = w
return i
}
// Initialize checks the initialization interfaces implemented by each plugin
// and provide the appropriate initialization data
func (i *PluginInitializer) Initialize(plugin admission.Interface) {
@ -206,11 +186,4 @@ func (i *PluginInitializer) Initialize(plugin admission.Interface) {
}
wants.SetClientCert(i.clientCert, i.clientKey)
}
if wants, ok := plugin.(WantsWebhookSource); ok {
if i.webhookSource == nil {
panic("An admission plugin wants a webhook source, but it was not provided.")
}
wants.SetWebhookSource(i.webhookSource)
}
}