From 38a104219922fb5ba6fe447546ac50f641adb8c8 Mon Sep 17 00:00:00 2001 From: CJ Cullen Date: Thu, 23 Jun 2016 17:37:09 -0700 Subject: [PATCH] Add a 5x exponential backoff on 429s & 5xxs to the webhook Authenticator/Authorizer. --- .../authenticator/token/webhook/webhook.go | 14 ++++++-- .../token/webhook/webhook_test.go | 2 +- plugin/pkg/auth/authorizer/webhook/webhook.go | 14 ++++++-- .../auth/authorizer/webhook/webhook_test.go | 4 +-- plugin/pkg/webhook/webhook.go | 35 +++++++++++++++++-- 5 files changed, 59 insertions(+), 10 deletions(-) diff --git a/plugin/pkg/auth/authenticator/token/webhook/webhook.go b/plugin/pkg/auth/authenticator/token/webhook/webhook.go index 43e76d00d87..b6a41083017 100644 --- a/plugin/pkg/auth/authenticator/token/webhook/webhook.go +++ b/plugin/pkg/auth/authenticator/token/webhook/webhook.go @@ -25,6 +25,7 @@ import ( "k8s.io/kubernetes/pkg/apis/authentication.k8s.io/v1beta1" "k8s.io/kubernetes/pkg/auth/authenticator" "k8s.io/kubernetes/pkg/auth/user" + "k8s.io/kubernetes/pkg/client/restclient" "k8s.io/kubernetes/pkg/util/cache" "k8s.io/kubernetes/plugin/pkg/webhook" @@ -35,6 +36,8 @@ var ( groupVersions = []unversioned.GroupVersion{v1beta1.SchemeGroupVersion} ) +const retryBackoff = 500 * time.Millisecond + // Ensure WebhookTokenAuthenticator implements the authenticator.Token interface. var _ authenticator.Token = (*WebhookTokenAuthenticator)(nil) @@ -46,7 +49,12 @@ type WebhookTokenAuthenticator struct { // New creates a new WebhookTokenAuthenticator from the provided kubeconfig file. func New(kubeConfigFile string, ttl time.Duration) (*WebhookTokenAuthenticator, error) { - gw, err := webhook.NewGenericWebhook(kubeConfigFile, groupVersions) + return newWithBackoff(kubeConfigFile, ttl, retryBackoff) +} + +// newWithBackoff allows tests to skip the sleep. +func newWithBackoff(kubeConfigFile string, ttl, initialBackoff time.Duration) (*WebhookTokenAuthenticator, error) { + gw, err := webhook.NewGenericWebhook(kubeConfigFile, groupVersions, initialBackoff) if err != nil { return nil, err } @@ -61,7 +69,9 @@ func (w *WebhookTokenAuthenticator) AuthenticateToken(token string) (user.Info, if entry, ok := w.responseCache.Get(r.Spec); ok { r.Status = entry.(v1beta1.TokenReviewStatus) } else { - result := w.RestClient.Post().Body(r).Do() + result := w.WithExponentialBackoff(func() restclient.Result { + return w.RestClient.Post().Body(r).Do() + }) if err := result.Error(); err != nil { return nil, false, err } diff --git a/plugin/pkg/auth/authenticator/token/webhook/webhook_test.go b/plugin/pkg/auth/authenticator/token/webhook/webhook_test.go index cbf63a24603..5cfdcd3d7e0 100644 --- a/plugin/pkg/auth/authenticator/token/webhook/webhook_test.go +++ b/plugin/pkg/auth/authenticator/token/webhook/webhook_test.go @@ -148,7 +148,7 @@ func newTokenAuthenticator(serverURL string, clientCert, clientKey, ca []byte, c if err := json.NewEncoder(tempfile).Encode(config); err != nil { return nil, err } - return New(p, cacheTime) + return newWithBackoff(p, cacheTime, 0) } func TestTLSConfig(t *testing.T) { diff --git a/plugin/pkg/auth/authorizer/webhook/webhook.go b/plugin/pkg/auth/authorizer/webhook/webhook.go index 956789864f5..f4b844ea387 100644 --- a/plugin/pkg/auth/authorizer/webhook/webhook.go +++ b/plugin/pkg/auth/authorizer/webhook/webhook.go @@ -26,6 +26,7 @@ import ( "k8s.io/kubernetes/pkg/api/unversioned" "k8s.io/kubernetes/pkg/apis/authorization/v1beta1" "k8s.io/kubernetes/pkg/auth/authorizer" + "k8s.io/kubernetes/pkg/client/restclient" "k8s.io/kubernetes/pkg/util/cache" "k8s.io/kubernetes/plugin/pkg/webhook" @@ -36,6 +37,8 @@ var ( groupVersions = []unversioned.GroupVersion{v1beta1.SchemeGroupVersion} ) +const retryBackoff = 500 * time.Millisecond + // Ensure Webhook implements the authorizer.Authorizer interface. var _ authorizer.Authorizer = (*WebhookAuthorizer)(nil) @@ -67,7 +70,12 @@ type WebhookAuthorizer struct { // For additional HTTP configuration, refer to the kubeconfig documentation // http://kubernetes.io/v1.1/docs/user-guide/kubeconfig-file.html. func New(kubeConfigFile string, authorizedTTL, unauthorizedTTL time.Duration) (*WebhookAuthorizer, error) { - gw, err := webhook.NewGenericWebhook(kubeConfigFile, groupVersions) + return newWithBackoff(kubeConfigFile, authorizedTTL, unauthorizedTTL, retryBackoff) +} + +// newWithBackoff allows tests to skip the sleep. +func newWithBackoff(kubeConfigFile string, authorizedTTL, unauthorizedTTL, initialBackoff time.Duration) (*WebhookAuthorizer, error) { + gw, err := webhook.NewGenericWebhook(kubeConfigFile, groupVersions, initialBackoff) if err != nil { return nil, err } @@ -148,7 +156,9 @@ func (w *WebhookAuthorizer) Authorize(attr authorizer.Attributes) (err error) { if entry, ok := w.responseCache.Get(string(key)); ok { r.Status = entry.(v1beta1.SubjectAccessReviewStatus) } else { - result := w.RestClient.Post().Body(r).Do() + result := w.WithExponentialBackoff(func() restclient.Result { + return w.RestClient.Post().Body(r).Do() + }) if err := result.Error(); err != nil { return err } diff --git a/plugin/pkg/auth/authorizer/webhook/webhook_test.go b/plugin/pkg/auth/authorizer/webhook/webhook_test.go index 2f95751c02d..7d3db499881 100644 --- a/plugin/pkg/auth/authorizer/webhook/webhook_test.go +++ b/plugin/pkg/auth/authorizer/webhook/webhook_test.go @@ -183,7 +183,7 @@ current-context: default return fmt.Errorf("failed to execute test template: %v", err) } // Create a new authorizer - _, err = New(p, 0, 0) + _, err = newWithBackoff(p, 0, 0, 0) return err }() if err != nil && !tt.wantErr { @@ -291,7 +291,7 @@ func newAuthorizer(callbackURL string, clientCert, clientKey, ca []byte, cacheTi if err := json.NewEncoder(tempfile).Encode(config); err != nil { return nil, err } - return New(p, cacheTime, cacheTime) + return newWithBackoff(p, cacheTime, cacheTime, 0) } func TestTLSConfig(t *testing.T) { diff --git a/plugin/pkg/webhook/webhook.go b/plugin/pkg/webhook/webhook.go index d00d54a5e5a..a907444b95c 100644 --- a/plugin/pkg/webhook/webhook.go +++ b/plugin/pkg/webhook/webhook.go @@ -19,6 +19,7 @@ package webhook import ( "fmt" + "time" "k8s.io/kubernetes/pkg/api" "k8s.io/kubernetes/pkg/api/unversioned" @@ -27,16 +28,18 @@ import ( "k8s.io/kubernetes/pkg/client/unversioned/clientcmd" "k8s.io/kubernetes/pkg/runtime" runtimeserializer "k8s.io/kubernetes/pkg/runtime/serializer" + "k8s.io/kubernetes/pkg/util/wait" _ "k8s.io/kubernetes/pkg/apis/authorization/install" ) type GenericWebhook struct { - RestClient *restclient.RESTClient + RestClient *restclient.RESTClient + initialBackoff time.Duration } // New creates a new GenericWebhook from the provided kubeconfig file. -func NewGenericWebhook(kubeConfigFile string, groupVersions []unversioned.GroupVersion) (*GenericWebhook, error) { +func NewGenericWebhook(kubeConfigFile string, groupVersions []unversioned.GroupVersion, initialBackoff time.Duration) (*GenericWebhook, error) { for _, groupVersion := range groupVersions { if !registered.IsEnabledVersion(groupVersion) { return nil, fmt.Errorf("webhook plugin requires enabling extension resource: %s", groupVersion) @@ -64,5 +67,31 @@ func NewGenericWebhook(kubeConfigFile string, groupVersions []unversioned.GroupV // TODO(ericchiang): Can we ensure remote service is reachable? - return &GenericWebhook{restClient}, nil + return &GenericWebhook{restClient, initialBackoff}, nil +} + +// WithExponentialBackoff will retry webhookFn 5 times w/ exponentially +// increasing backoff when a 429 or a 5xx response code is returned. +func (g *GenericWebhook) WithExponentialBackoff(webhookFn func() restclient.Result) restclient.Result { + backoff := wait.Backoff{ + Duration: g.initialBackoff, + Factor: 1.5, + Jitter: 0.2, + Steps: 5, + } + var result restclient.Result + wait.ExponentialBackoff(backoff, func() (bool, error) { + result = webhookFn() + // Return from Request.Do() errors immediately. + if err := result.Error(); err != nil { + return false, err + } + // Retry 429s, and 5xxs. + var statusCode int + if result.StatusCode(&statusCode); statusCode == 429 || statusCode >= 500 { + return false, nil + } + return true, nil + }) + return result }