Merge pull request #95705 from tkashem/webhook-retry

Make webhook retry backoff parameters configurable
This commit is contained in:
Kubernetes Prow Robot 2020-11-02 04:22:52 -08:00 committed by GitHub
commit cb0389c827
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
36 changed files with 339 additions and 92 deletions

View File

@ -260,9 +260,10 @@ func TestAddFlags(t *testing.T) {
ClientCA: "/client-ca", ClientCA: "/client-ca",
}, },
WebHook: &kubeoptions.WebHookAuthenticationOptions{ WebHook: &kubeoptions.WebHookAuthenticationOptions{
CacheTTL: 180000000000, CacheTTL: 180000000000,
ConfigFile: "/token-webhook-config", ConfigFile: "/token-webhook-config",
Version: "v1beta1", Version: "v1beta1",
RetryBackoff: apiserveroptions.DefaultAuthWebhookRetryBackoff(),
}, },
BootstrapToken: &kubeoptions.BootstrapTokenAuthenticationOptions{}, BootstrapToken: &kubeoptions.BootstrapTokenAuthenticationOptions{},
OIDC: &kubeoptions.OIDCAuthenticationOptions{ OIDC: &kubeoptions.OIDCAuthenticationOptions{
@ -284,6 +285,7 @@ func TestAddFlags(t *testing.T) {
WebhookCacheAuthorizedTTL: 180000000000, WebhookCacheAuthorizedTTL: 180000000000,
WebhookCacheUnauthorizedTTL: 60000000000, WebhookCacheUnauthorizedTTL: 60000000000,
WebhookVersion: "v1beta1", WebhookVersion: "v1beta1",
WebhookRetryBackoff: apiserveroptions.DefaultAuthWebhookRetryBackoff(),
}, },
CloudProvider: &kubeoptions.CloudProviderOptions{ CloudProvider: &kubeoptions.CloudProviderOptions{
CloudConfigFile: "/cloud-config", CloudConfigFile: "/cloud-config",

View File

@ -405,8 +405,9 @@ func TestAddFlags(t *testing.T) {
BindNetwork: "tcp", BindNetwork: "tcp",
}).WithLoopback(), }).WithLoopback(),
Authentication: &apiserveroptions.DelegatingAuthenticationOptions{ Authentication: &apiserveroptions.DelegatingAuthenticationOptions{
CacheTTL: 10 * time.Second, CacheTTL: 10 * time.Second,
ClientCert: apiserveroptions.ClientCertAuthenticationOptions{}, WebhookRetryBackoff: apiserveroptions.DefaultAuthWebhookRetryBackoff(),
ClientCert: apiserveroptions.ClientCertAuthenticationOptions{},
RequestHeader: apiserveroptions.RequestHeaderAuthenticationOptions{ RequestHeader: apiserveroptions.RequestHeaderAuthenticationOptions{
UsernameHeaders: []string{"x-remote-user"}, UsernameHeaders: []string{"x-remote-user"},
GroupHeaders: []string{"x-remote-group"}, GroupHeaders: []string{"x-remote-group"},
@ -418,6 +419,7 @@ func TestAddFlags(t *testing.T) {
AllowCacheTTL: 10 * time.Second, AllowCacheTTL: 10 * time.Second,
DenyCacheTTL: 10 * time.Second, DenyCacheTTL: 10 * time.Second,
ClientTimeout: 10 * time.Second, ClientTimeout: 10 * time.Second,
WebhookRetryBackoff: apiserveroptions.DefaultAuthWebhookRetryBackoff(),
RemoteKubeConfigFileOptional: true, RemoteKubeConfigFileOptional: true,
AlwaysAllowPaths: []string{"/healthz"}, // note: this does not match /healthz/ or /healthz/* AlwaysAllowPaths: []string{"/healthz"}, // note: this does not match /healthz/ or /healthz/*
}, },

View File

@ -95,6 +95,7 @@ go_library(
"//staging/src/k8s.io/apiserver/pkg/server:go_default_library", "//staging/src/k8s.io/apiserver/pkg/server:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/server/dynamiccertificates:go_default_library", "//staging/src/k8s.io/apiserver/pkg/server/dynamiccertificates:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/server/healthz:go_default_library", "//staging/src/k8s.io/apiserver/pkg/server/healthz:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/server/options:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/util/feature:go_default_library", "//staging/src/k8s.io/apiserver/pkg/util/feature:go_default_library",
"//staging/src/k8s.io/client-go/kubernetes:go_default_library", "//staging/src/k8s.io/client-go/kubernetes:go_default_library",
"//staging/src/k8s.io/client-go/kubernetes/typed/authentication/v1:go_default_library", "//staging/src/k8s.io/client-go/kubernetes/typed/authentication/v1:go_default_library",

View File

@ -27,6 +27,7 @@ import (
"k8s.io/apiserver/pkg/authorization/authorizer" "k8s.io/apiserver/pkg/authorization/authorizer"
"k8s.io/apiserver/pkg/authorization/authorizerfactory" "k8s.io/apiserver/pkg/authorization/authorizerfactory"
"k8s.io/apiserver/pkg/server/dynamiccertificates" "k8s.io/apiserver/pkg/server/dynamiccertificates"
genericoptions "k8s.io/apiserver/pkg/server/options"
clientset "k8s.io/client-go/kubernetes" clientset "k8s.io/client-go/kubernetes"
authenticationclient "k8s.io/client-go/kubernetes/typed/authentication/v1" authenticationclient "k8s.io/client-go/kubernetes/typed/authentication/v1"
authorizationclient "k8s.io/client-go/kubernetes/typed/authorization/v1" authorizationclient "k8s.io/client-go/kubernetes/typed/authorization/v1"
@ -84,6 +85,7 @@ func BuildAuthn(client authenticationclient.TokenReviewInterface, authn kubeletc
if client == nil { if client == nil {
return nil, nil, errors.New("no client provided, cannot use webhook authentication") return nil, nil, errors.New("no client provided, cannot use webhook authentication")
} }
authenticatorConfig.WebhookRetryBackoff = genericoptions.DefaultAuthWebhookRetryBackoff()
authenticatorConfig.TokenAccessReviewClient = client authenticatorConfig.TokenAccessReviewClient = client
} }
@ -113,6 +115,7 @@ func BuildAuthz(client authorizationclient.SubjectAccessReviewInterface, authz k
SubjectAccessReviewClient: client, SubjectAccessReviewClient: client,
AllowCacheTTL: authz.Webhook.CacheAuthorizedTTL.Duration, AllowCacheTTL: authz.Webhook.CacheAuthorizedTTL.Duration,
DenyCacheTTL: authz.Webhook.CacheUnauthorizedTTL.Duration, DenyCacheTTL: authz.Webhook.CacheUnauthorizedTTL.Duration,
WebhookRetryBackoff: genericoptions.DefaultAuthWebhookRetryBackoff(),
} }
return authorizerConfig.New() return authorizerConfig.New()

View File

@ -12,6 +12,7 @@ go_library(
deps = [ deps = [
"//pkg/serviceaccount:go_default_library", "//pkg/serviceaccount:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/net:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/util/net:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/authentication/authenticator:go_default_library", "//staging/src/k8s.io/apiserver/pkg/authentication/authenticator:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/authentication/authenticatorfactory:go_default_library", "//staging/src/k8s.io/apiserver/pkg/authentication/authenticatorfactory:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/authentication/group:go_default_library", "//staging/src/k8s.io/apiserver/pkg/authentication/group:go_default_library",

View File

@ -17,11 +17,13 @@ limitations under the License.
package authenticator package authenticator
import ( import (
"errors"
"time" "time"
"github.com/go-openapi/spec" "github.com/go-openapi/spec"
utilnet "k8s.io/apimachinery/pkg/util/net" utilnet "k8s.io/apimachinery/pkg/util/net"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/apiserver/pkg/authentication/authenticator" "k8s.io/apiserver/pkg/authentication/authenticator"
"k8s.io/apiserver/pkg/authentication/authenticatorfactory" "k8s.io/apiserver/pkg/authentication/authenticatorfactory"
"k8s.io/apiserver/pkg/authentication/group" "k8s.io/apiserver/pkg/authentication/group"
@ -66,6 +68,10 @@ type Config struct {
WebhookTokenAuthnConfigFile string WebhookTokenAuthnConfigFile string
WebhookTokenAuthnVersion string WebhookTokenAuthnVersion string
WebhookTokenAuthnCacheTTL time.Duration WebhookTokenAuthnCacheTTL time.Duration
// WebhookRetryBackoff specifies the backoff parameters for the authentication webhook retry logic.
// This allows us to configure the sleep time at each iteration and the maximum number of retries allowed
// before we fail the webhook call in order to limit the fan out that ensues when the system is degraded.
WebhookRetryBackoff *wait.Backoff
TokenSuccessCacheTTL time.Duration TokenSuccessCacheTTL time.Duration
TokenFailureCacheTTL time.Duration TokenFailureCacheTTL time.Duration
@ -280,7 +286,11 @@ func newServiceAccountAuthenticator(iss string, keyfiles []string, apiAudiences
} }
func newWebhookTokenAuthenticator(config Config) (authenticator.Token, error) { func newWebhookTokenAuthenticator(config Config) (authenticator.Token, error) {
webhookTokenAuthenticator, err := webhook.New(config.WebhookTokenAuthnConfigFile, config.WebhookTokenAuthnVersion, config.APIAudiences, config.CustomDial) if config.WebhookRetryBackoff == nil {
return nil, errors.New("retry backoff parameters for authentication webhook has not been specified")
}
webhookTokenAuthenticator, err := webhook.New(config.WebhookTokenAuthnConfigFile, config.WebhookTokenAuthnVersion, config.APIAudiences, *config.WebhookRetryBackoff, config.CustomDial)
if err != nil { if err != nil {
return nil, err return nil, err
} }

View File

@ -17,6 +17,7 @@ go_library(
"//plugin/pkg/auth/authorizer/rbac:go_default_library", "//plugin/pkg/auth/authorizer/rbac:go_default_library",
"//plugin/pkg/auth/authorizer/rbac/bootstrappolicy:go_default_library", "//plugin/pkg/auth/authorizer/rbac/bootstrappolicy:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/net:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/util/net:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/authorization/authorizer:go_default_library", "//staging/src/k8s.io/apiserver/pkg/authorization/authorizer:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/authorization/authorizerfactory:go_default_library", "//staging/src/k8s.io/apiserver/pkg/authorization/authorizerfactory:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/authorization/union:go_default_library", "//staging/src/k8s.io/apiserver/pkg/authorization/union:go_default_library",

View File

@ -17,10 +17,12 @@ limitations under the License.
package authorizer package authorizer
import ( import (
"errors"
"fmt" "fmt"
"time" "time"
utilnet "k8s.io/apimachinery/pkg/util/net" utilnet "k8s.io/apimachinery/pkg/util/net"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/apiserver/pkg/authorization/authorizer" "k8s.io/apiserver/pkg/authorization/authorizer"
"k8s.io/apiserver/pkg/authorization/authorizerfactory" "k8s.io/apiserver/pkg/authorization/authorizerfactory"
"k8s.io/apiserver/pkg/authorization/union" "k8s.io/apiserver/pkg/authorization/union"
@ -53,6 +55,10 @@ type Config struct {
WebhookCacheAuthorizedTTL time.Duration WebhookCacheAuthorizedTTL time.Duration
// TTL for caching of unauthorized responses from the webhook server. // TTL for caching of unauthorized responses from the webhook server.
WebhookCacheUnauthorizedTTL time.Duration WebhookCacheUnauthorizedTTL time.Duration
// WebhookRetryBackoff specifies the backoff parameters for the authorization webhook retry logic.
// This allows us to configure the sleep time at each iteration and the maximum number of retries allowed
// before we fail the webhook call in order to limit the fan out that ensues when the system is degraded.
WebhookRetryBackoff *wait.Backoff
VersionedInformerFactory versionedinformers.SharedInformerFactory VersionedInformerFactory versionedinformers.SharedInformerFactory
@ -104,10 +110,14 @@ func (config Config) New() (authorizer.Authorizer, authorizer.RuleResolver, erro
authorizers = append(authorizers, abacAuthorizer) authorizers = append(authorizers, abacAuthorizer)
ruleResolvers = append(ruleResolvers, abacAuthorizer) ruleResolvers = append(ruleResolvers, abacAuthorizer)
case modes.ModeWebhook: case modes.ModeWebhook:
if config.WebhookRetryBackoff == nil {
return nil, nil, errors.New("retry backoff parameters for authorization webhook has not been specified")
}
webhookAuthorizer, err := webhook.New(config.WebhookConfigFile, webhookAuthorizer, err := webhook.New(config.WebhookConfigFile,
config.WebhookVersion, config.WebhookVersion,
config.WebhookCacheAuthorizedTTL, config.WebhookCacheAuthorizedTTL,
config.WebhookCacheUnauthorizedTTL, config.WebhookCacheUnauthorizedTTL,
*config.WebhookRetryBackoff,
config.CustomDial) config.CustomDial)
if err != nil { if err != nil {
return nil, nil, err return nil, nil, err

View File

@ -53,6 +53,7 @@ go_library(
"//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/net:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/util/net:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/sets:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/util/sets:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/admission:go_default_library", "//staging/src/k8s.io/apiserver/pkg/admission:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/admission/plugin/namespace/lifecycle:go_default_library", "//staging/src/k8s.io/apiserver/pkg/admission/plugin/namespace/lifecycle:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/admission/plugin/resourcequota:go_default_library", "//staging/src/k8s.io/apiserver/pkg/admission/plugin/resourcequota:go_default_library",

View File

@ -27,6 +27,7 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/sets" "k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/apiserver/pkg/authentication/authenticator" "k8s.io/apiserver/pkg/authentication/authenticator"
genericapiserver "k8s.io/apiserver/pkg/server" genericapiserver "k8s.io/apiserver/pkg/server"
"k8s.io/apiserver/pkg/server/egressselector" "k8s.io/apiserver/pkg/server/egressselector"
@ -104,6 +105,11 @@ type WebHookAuthenticationOptions struct {
ConfigFile string ConfigFile string
Version string Version string
CacheTTL time.Duration CacheTTL time.Duration
// RetryBackoff specifies the backoff parameters for the authentication webhook retry logic.
// This allows us to configure the sleep time at each iteration and the maximum number of retries allowed
// before we fail the webhook call in order to limit the fan out that ensues when the system is degraded.
RetryBackoff *wait.Backoff
} }
// NewBuiltInAuthenticationOptions create a new BuiltInAuthenticationOptions, just set default token cache TTL // NewBuiltInAuthenticationOptions create a new BuiltInAuthenticationOptions, just set default token cache TTL
@ -172,8 +178,9 @@ func (o *BuiltInAuthenticationOptions) WithTokenFile() *BuiltInAuthenticationOpt
// WithWebHook set default value for web hook authentication // WithWebHook set default value for web hook authentication
func (o *BuiltInAuthenticationOptions) WithWebHook() *BuiltInAuthenticationOptions { func (o *BuiltInAuthenticationOptions) WithWebHook() *BuiltInAuthenticationOptions {
o.WebHook = &WebHookAuthenticationOptions{ o.WebHook = &WebHookAuthenticationOptions{
Version: "v1beta1", Version: "v1beta1",
CacheTTL: 2 * time.Minute, CacheTTL: 2 * time.Minute,
RetryBackoff: genericoptions.DefaultAuthWebhookRetryBackoff(),
} }
return o return o
} }
@ -216,6 +223,13 @@ func (o *BuiltInAuthenticationOptions) Validate() []error {
} }
} }
if o.WebHook != nil {
retryBackoff := o.WebHook.RetryBackoff
if retryBackoff != nil && retryBackoff.Steps <= 0 {
allErrors = append(allErrors, fmt.Errorf("number of webhook retry attempts must be greater than 1, but is: %d", retryBackoff.Steps))
}
}
return allErrors return allErrors
} }
@ -415,6 +429,7 @@ func (o *BuiltInAuthenticationOptions) ToAuthenticationConfig() (kubeauthenticat
ret.WebhookTokenAuthnConfigFile = o.WebHook.ConfigFile ret.WebhookTokenAuthnConfigFile = o.WebHook.ConfigFile
ret.WebhookTokenAuthnVersion = o.WebHook.Version ret.WebhookTokenAuthnVersion = o.WebHook.Version
ret.WebhookTokenAuthnCacheTTL = o.WebHook.CacheTTL ret.WebhookTokenAuthnCacheTTL = o.WebHook.CacheTTL
ret.WebhookRetryBackoff = o.WebHook.RetryBackoff
if len(o.WebHook.ConfigFile) > 0 && o.WebHook.CacheTTL > 0 { if len(o.WebHook.ConfigFile) > 0 && o.WebHook.CacheTTL > 0 {
if o.TokenSuccessCacheTTL > 0 && o.WebHook.CacheTTL < o.TokenSuccessCacheTTL { if o.TokenSuccessCacheTTL > 0 && o.WebHook.CacheTTL < o.TokenSuccessCacheTTL {

View File

@ -24,6 +24,8 @@ import (
"github.com/spf13/pflag" "github.com/spf13/pflag"
"k8s.io/apimachinery/pkg/util/sets" "k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apimachinery/pkg/util/wait"
genericoptions "k8s.io/apiserver/pkg/server/options"
versionedinformers "k8s.io/client-go/informers" versionedinformers "k8s.io/client-go/informers"
"k8s.io/kubernetes/pkg/kubeapiserver/authorizer" "k8s.io/kubernetes/pkg/kubeapiserver/authorizer"
authzmodes "k8s.io/kubernetes/pkg/kubeapiserver/authorizer/modes" authzmodes "k8s.io/kubernetes/pkg/kubeapiserver/authorizer/modes"
@ -37,6 +39,10 @@ type BuiltInAuthorizationOptions struct {
WebhookVersion string WebhookVersion string
WebhookCacheAuthorizedTTL time.Duration WebhookCacheAuthorizedTTL time.Duration
WebhookCacheUnauthorizedTTL time.Duration WebhookCacheUnauthorizedTTL time.Duration
// WebhookRetryBackoff specifies the backoff parameters for the authorization webhook retry logic.
// This allows us to configure the sleep time at each iteration and the maximum number of retries allowed
// before we fail the webhook call in order to limit the fan out that ensues when the system is degraded.
WebhookRetryBackoff *wait.Backoff
} }
// NewBuiltInAuthorizationOptions create a BuiltInAuthorizationOptions with default value // NewBuiltInAuthorizationOptions create a BuiltInAuthorizationOptions with default value
@ -46,6 +52,7 @@ func NewBuiltInAuthorizationOptions() *BuiltInAuthorizationOptions {
WebhookVersion: "v1beta1", WebhookVersion: "v1beta1",
WebhookCacheAuthorizedTTL: 5 * time.Minute, WebhookCacheAuthorizedTTL: 5 * time.Minute,
WebhookCacheUnauthorizedTTL: 30 * time.Second, WebhookCacheUnauthorizedTTL: 30 * time.Second,
WebhookRetryBackoff: genericoptions.DefaultAuthWebhookRetryBackoff(),
} }
} }
@ -89,6 +96,10 @@ func (o *BuiltInAuthorizationOptions) Validate() []error {
allErrors = append(allErrors, fmt.Errorf("authorization-mode %q has mode specified more than once", o.Modes)) allErrors = append(allErrors, fmt.Errorf("authorization-mode %q has mode specified more than once", o.Modes))
} }
if o.WebhookRetryBackoff != nil && o.WebhookRetryBackoff.Steps <= 0 {
allErrors = append(allErrors, fmt.Errorf("number of webhook retry attempts must be greater than 1, but is: %d", o.WebhookRetryBackoff.Steps))
}
return allErrors return allErrors
} }
@ -127,5 +138,6 @@ func (o *BuiltInAuthorizationOptions) ToAuthorizationConfig(versionedInformerFac
WebhookCacheAuthorizedTTL: o.WebhookCacheAuthorizedTTL, WebhookCacheAuthorizedTTL: o.WebhookCacheAuthorizedTTL,
WebhookCacheUnauthorizedTTL: o.WebhookCacheUnauthorizedTTL, WebhookCacheUnauthorizedTTL: o.WebhookCacheUnauthorizedTTL,
VersionedInformerFactory: versionedInformerFactory, VersionedInformerFactory: versionedInformerFactory,
WebhookRetryBackoff: o.WebhookRetryBackoff,
} }
} }

View File

@ -261,7 +261,8 @@ func NewImagePolicyWebhook(configFile io.Reader) (*Plugin, error) {
return nil, err return nil, err
} }
gw, err := webhook.NewGenericWebhook(legacyscheme.Scheme, legacyscheme.Codecs, whConfig.KubeConfigFile, groupVersions, whConfig.RetryBackoff, nil) retryBackoff := webhook.DefaultRetryBackoffWithInitialDelay(whConfig.RetryBackoff)
gw, err := webhook.NewGenericWebhook(legacyscheme.Scheme, legacyscheme.Codecs, whConfig.KubeConfigFile, groupVersions, retryBackoff, nil)
if err != nil { if err != nil {
return nil, err return nil, err
} }

View File

@ -15,6 +15,7 @@ go_library(
importmap = "k8s.io/kubernetes/vendor/k8s.io/apiserver/pkg/authentication/authenticatorfactory", importmap = "k8s.io/kubernetes/vendor/k8s.io/apiserver/pkg/authentication/authenticatorfactory",
importpath = "k8s.io/apiserver/pkg/authentication/authenticatorfactory", importpath = "k8s.io/apiserver/pkg/authentication/authenticatorfactory",
deps = [ deps = [
"//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/authentication/authenticator:go_default_library", "//staging/src/k8s.io/apiserver/pkg/authentication/authenticator:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/authentication/group:go_default_library", "//staging/src/k8s.io/apiserver/pkg/authentication/group:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/authentication/request/anonymous:go_default_library", "//staging/src/k8s.io/apiserver/pkg/authentication/request/anonymous:go_default_library",

View File

@ -22,6 +22,7 @@ import (
"github.com/go-openapi/spec" "github.com/go-openapi/spec"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/apiserver/pkg/authentication/authenticator" "k8s.io/apiserver/pkg/authentication/authenticator"
"k8s.io/apiserver/pkg/authentication/group" "k8s.io/apiserver/pkg/authentication/group"
"k8s.io/apiserver/pkg/authentication/request/anonymous" "k8s.io/apiserver/pkg/authentication/request/anonymous"
@ -43,6 +44,11 @@ type DelegatingAuthenticatorConfig struct {
// TokenAccessReviewClient is a client to do token review. It can be nil. Then every token is ignored. // TokenAccessReviewClient is a client to do token review. It can be nil. Then every token is ignored.
TokenAccessReviewClient authenticationclient.TokenReviewInterface TokenAccessReviewClient authenticationclient.TokenReviewInterface
// WebhookRetryBackoff specifies the backoff parameters for the authentication webhook retry logic.
// This allows us to configure the sleep time at each iteration and the maximum number of retries allowed
// before we fail the webhook call in order to limit the fan out that ensues when the system is degraded.
WebhookRetryBackoff *wait.Backoff
// CacheTTL is the length of time that a token authentication answer will be cached. // CacheTTL is the length of time that a token authentication answer will be cached.
CacheTTL time.Duration CacheTTL time.Duration
@ -79,7 +85,10 @@ func (c DelegatingAuthenticatorConfig) New() (authenticator.Request, *spec.Secur
} }
if c.TokenAccessReviewClient != nil { if c.TokenAccessReviewClient != nil {
tokenAuth, err := webhooktoken.NewFromInterface(c.TokenAccessReviewClient, c.APIAudiences) if c.WebhookRetryBackoff == nil {
return nil, nil, errors.New("retry backoff parameters for delegating authentication webhook has not been specified")
}
tokenAuth, err := webhooktoken.NewFromInterface(c.TokenAccessReviewClient, c.APIAudiences, *c.WebhookRetryBackoff)
if err != nil { if err != nil {
return nil, nil, err return nil, nil, err
} }

View File

@ -25,6 +25,7 @@ go_library(
importmap = "k8s.io/kubernetes/vendor/k8s.io/apiserver/pkg/authorization/authorizerfactory", importmap = "k8s.io/kubernetes/vendor/k8s.io/apiserver/pkg/authorization/authorizerfactory",
importpath = "k8s.io/apiserver/pkg/authorization/authorizerfactory", importpath = "k8s.io/apiserver/pkg/authorization/authorizerfactory",
deps = [ deps = [
"//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/authentication/user:go_default_library", "//staging/src/k8s.io/apiserver/pkg/authentication/user:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/authorization/authorizer:go_default_library", "//staging/src/k8s.io/apiserver/pkg/authorization/authorizer:go_default_library",
"//staging/src/k8s.io/apiserver/plugin/pkg/authorizer/webhook:go_default_library", "//staging/src/k8s.io/apiserver/plugin/pkg/authorizer/webhook:go_default_library",

View File

@ -17,8 +17,10 @@ limitations under the License.
package authorizerfactory package authorizerfactory
import ( import (
"errors"
"time" "time"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/apiserver/pkg/authorization/authorizer" "k8s.io/apiserver/pkg/authorization/authorizer"
"k8s.io/apiserver/plugin/pkg/authorizer/webhook" "k8s.io/apiserver/plugin/pkg/authorizer/webhook"
authorizationclient "k8s.io/client-go/kubernetes/typed/authorization/v1" authorizationclient "k8s.io/client-go/kubernetes/typed/authorization/v1"
@ -35,12 +37,22 @@ type DelegatingAuthorizerConfig struct {
// DenyCacheTTL is the length of time that an unsuccessful authorization response will be cached. // DenyCacheTTL is the length of time that an unsuccessful authorization response will be cached.
// You generally want more responsive, "deny, try again" flows. // You generally want more responsive, "deny, try again" flows.
DenyCacheTTL time.Duration DenyCacheTTL time.Duration
// WebhookRetryBackoff specifies the backoff parameters for the authorization webhook retry logic.
// This allows us to configure the sleep time at each iteration and the maximum number of retries allowed
// before we fail the webhook call in order to limit the fan out that ensues when the system is degraded.
WebhookRetryBackoff *wait.Backoff
} }
func (c DelegatingAuthorizerConfig) New() (authorizer.Authorizer, error) { func (c DelegatingAuthorizerConfig) New() (authorizer.Authorizer, error) {
if c.WebhookRetryBackoff == nil {
return nil, errors.New("retry backoff parameters for delegating authorization webhook has not been specified")
}
return webhook.NewFromInterface( return webhook.NewFromInterface(
c.SubjectAccessReviewClient, c.SubjectAccessReviewClient,
c.AllowCacheTTL, c.AllowCacheTTL,
c.DenyCacheTTL, c.DenyCacheTTL,
*c.WebhookRetryBackoff,
) )
} }

View File

@ -34,6 +34,7 @@ go_library(
"//staging/src/k8s.io/apimachinery/pkg/util/net:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/util/net:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/runtime:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/util/runtime:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/sets:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/util/sets:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/admission:go_default_library", "//staging/src/k8s.io/apiserver/pkg/admission:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/admission/initializer:go_default_library", "//staging/src/k8s.io/apiserver/pkg/admission/initializer:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/admission/metrics:go_default_library", "//staging/src/k8s.io/apiserver/pkg/admission/metrics:go_default_library",
@ -69,6 +70,7 @@ go_library(
"//staging/src/k8s.io/apiserver/pkg/storage/storagebackend/factory:go_default_library", "//staging/src/k8s.io/apiserver/pkg/storage/storagebackend/factory:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/util/feature:go_default_library", "//staging/src/k8s.io/apiserver/pkg/util/feature:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/util/flowcontrol:go_default_library", "//staging/src/k8s.io/apiserver/pkg/util/flowcontrol:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/util/webhook:go_default_library",
"//staging/src/k8s.io/apiserver/plugin/pkg/audit/buffered:go_default_library", "//staging/src/k8s.io/apiserver/plugin/pkg/audit/buffered:go_default_library",
"//staging/src/k8s.io/apiserver/plugin/pkg/audit/log:go_default_library", "//staging/src/k8s.io/apiserver/plugin/pkg/audit/log:go_default_library",
"//staging/src/k8s.io/apiserver/plugin/pkg/audit/truncate:go_default_library", "//staging/src/k8s.io/apiserver/plugin/pkg/audit/truncate:go_default_library",

View File

@ -37,6 +37,7 @@ import (
"k8s.io/apiserver/pkg/audit/policy" "k8s.io/apiserver/pkg/audit/policy"
"k8s.io/apiserver/pkg/server" "k8s.io/apiserver/pkg/server"
"k8s.io/apiserver/pkg/server/egressselector" "k8s.io/apiserver/pkg/server/egressselector"
"k8s.io/apiserver/pkg/util/webhook"
pluginbuffered "k8s.io/apiserver/plugin/pkg/audit/buffered" pluginbuffered "k8s.io/apiserver/plugin/pkg/audit/buffered"
pluginlog "k8s.io/apiserver/plugin/pkg/audit/log" pluginlog "k8s.io/apiserver/plugin/pkg/audit/log"
plugintruncate "k8s.io/apiserver/plugin/pkg/audit/truncate" plugintruncate "k8s.io/apiserver/plugin/pkg/audit/truncate"
@ -154,7 +155,7 @@ type AuditDynamicOptions struct {
func NewAuditOptions() *AuditOptions { func NewAuditOptions() *AuditOptions {
return &AuditOptions{ return &AuditOptions{
WebhookOptions: AuditWebhookOptions{ WebhookOptions: AuditWebhookOptions{
InitialBackoff: pluginwebhook.DefaultInitialBackoff, InitialBackoff: pluginwebhook.DefaultInitialBackoffDelay,
BatchOptions: AuditBatchOptions{ BatchOptions: AuditBatchOptions{
Mode: ModeBatch, Mode: ModeBatch,
BatchConfig: defaultWebhookBatchConfig(), BatchConfig: defaultWebhookBatchConfig(),
@ -569,7 +570,7 @@ func (o *AuditWebhookOptions) enabled() bool {
// this is done so that the same trucate backend can wrap both the webhook and dynamic backends // this is done so that the same trucate backend can wrap both the webhook and dynamic backends
func (o *AuditWebhookOptions) newUntruncatedBackend(customDial utilnet.DialFunc) (audit.Backend, error) { func (o *AuditWebhookOptions) newUntruncatedBackend(customDial utilnet.DialFunc) (audit.Backend, error) {
groupVersion, _ := schema.ParseGroupVersion(o.GroupVersionString) groupVersion, _ := schema.ParseGroupVersion(o.GroupVersionString)
webhook, err := pluginwebhook.NewBackend(o.ConfigFile, groupVersion, o.InitialBackoff, customDial) webhook, err := pluginwebhook.NewBackend(o.ConfigFile, groupVersion, webhook.DefaultRetryBackoffWithInitialDelay(o.InitialBackoff), customDial)
if err != nil { if err != nil {
return nil, fmt.Errorf("initializing audit webhook: %v", err) return nil, fmt.Errorf("initializing audit webhook: %v", err)
} }

View File

@ -26,6 +26,7 @@ import (
"github.com/spf13/pflag" "github.com/spf13/pflag"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/apiserver/pkg/authentication/authenticatorfactory" "k8s.io/apiserver/pkg/authentication/authenticatorfactory"
"k8s.io/apiserver/pkg/authentication/request/headerrequest" "k8s.io/apiserver/pkg/authentication/request/headerrequest"
"k8s.io/apiserver/pkg/server" "k8s.io/apiserver/pkg/server"
@ -36,6 +37,17 @@ import (
openapicommon "k8s.io/kube-openapi/pkg/common" openapicommon "k8s.io/kube-openapi/pkg/common"
) )
// DefaultAuthWebhookRetryBackoff is the default backoff parameters for
// both authentication and authorization webhook used by the apiserver.
func DefaultAuthWebhookRetryBackoff() *wait.Backoff {
return &wait.Backoff{
Duration: 500 * time.Millisecond,
Factor: 1.5,
Jitter: 0.2,
Steps: 5,
}
}
type RequestHeaderAuthenticationOptions struct { type RequestHeaderAuthenticationOptions struct {
// ClientCAFile is the root certificate bundle to verify client certificates on incoming requests // ClientCAFile is the root certificate bundle to verify client certificates on incoming requests
// before trusting usernames in headers. // before trusting usernames in headers.
@ -177,6 +189,11 @@ type DelegatingAuthenticationOptions struct {
// TolerateInClusterLookupFailure indicates failures to look up authentication configuration from the cluster configmap should not be fatal. // TolerateInClusterLookupFailure indicates failures to look up authentication configuration from the cluster configmap should not be fatal.
// Setting this can result in an authenticator that will reject all requests. // Setting this can result in an authenticator that will reject all requests.
TolerateInClusterLookupFailure bool TolerateInClusterLookupFailure bool
// WebhookRetryBackoff specifies the backoff parameters for the authentication webhook retry logic.
// This allows us to configure the sleep time at each iteration and the maximum number of retries allowed
// before we fail the webhook call in order to limit the fan out that ensues when the system is degraded.
WebhookRetryBackoff *wait.Backoff
} }
func NewDelegatingAuthenticationOptions() *DelegatingAuthenticationOptions { func NewDelegatingAuthenticationOptions() *DelegatingAuthenticationOptions {
@ -189,13 +206,23 @@ func NewDelegatingAuthenticationOptions() *DelegatingAuthenticationOptions {
GroupHeaders: []string{"x-remote-group"}, GroupHeaders: []string{"x-remote-group"},
ExtraHeaderPrefixes: []string{"x-remote-extra-"}, ExtraHeaderPrefixes: []string{"x-remote-extra-"},
}, },
WebhookRetryBackoff: DefaultAuthWebhookRetryBackoff(),
} }
} }
// WithCustomRetryBackoff sets the custom backoff parameters for the authentication webhook retry logic.
func (s *DelegatingAuthenticationOptions) WithCustomRetryBackoff(backoff wait.Backoff) {
s.WebhookRetryBackoff = &backoff
}
func (s *DelegatingAuthenticationOptions) Validate() []error { func (s *DelegatingAuthenticationOptions) Validate() []error {
allErrors := []error{} allErrors := []error{}
allErrors = append(allErrors, s.RequestHeader.Validate()...) allErrors = append(allErrors, s.RequestHeader.Validate()...)
if s.WebhookRetryBackoff != nil && s.WebhookRetryBackoff.Steps <= 0 {
allErrors = append(allErrors, fmt.Errorf("number of webhook retry attempts must be greater than 1, but is: %d", s.WebhookRetryBackoff.Steps))
}
return allErrors return allErrors
} }
@ -233,8 +260,9 @@ func (s *DelegatingAuthenticationOptions) ApplyTo(authenticationInfo *server.Aut
} }
cfg := authenticatorfactory.DelegatingAuthenticatorConfig{ cfg := authenticatorfactory.DelegatingAuthenticatorConfig{
Anonymous: true, Anonymous: true,
CacheTTL: s.CacheTTL, CacheTTL: s.CacheTTL,
WebhookRetryBackoff: s.WebhookRetryBackoff,
} }
client, err := s.getClient() client, err := s.getClient()

View File

@ -23,6 +23,7 @@ import (
"github.com/spf13/pflag" "github.com/spf13/pflag"
"k8s.io/klog/v2" "k8s.io/klog/v2"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/apiserver/pkg/authorization/authorizer" "k8s.io/apiserver/pkg/authorization/authorizer"
"k8s.io/apiserver/pkg/authorization/authorizerfactory" "k8s.io/apiserver/pkg/authorization/authorizerfactory"
"k8s.io/apiserver/pkg/authorization/path" "k8s.io/apiserver/pkg/authorization/path"
@ -63,14 +64,20 @@ type DelegatingAuthorizationOptions struct {
// ClientTimeout specifies a time limit for requests made by SubjectAccessReviews client. // ClientTimeout specifies a time limit for requests made by SubjectAccessReviews client.
// The default value is set to 10 seconds. // The default value is set to 10 seconds.
ClientTimeout time.Duration ClientTimeout time.Duration
// WebhookRetryBackoff specifies the backoff parameters for the authorization webhook retry logic.
// This allows us to configure the sleep time at each iteration and the maximum number of retries allowed
// before we fail the webhook call in order to limit the fan out that ensues when the system is degraded.
WebhookRetryBackoff *wait.Backoff
} }
func NewDelegatingAuthorizationOptions() *DelegatingAuthorizationOptions { func NewDelegatingAuthorizationOptions() *DelegatingAuthorizationOptions {
return &DelegatingAuthorizationOptions{ return &DelegatingAuthorizationOptions{
// very low for responsiveness, but high enough to handle storms // very low for responsiveness, but high enough to handle storms
AllowCacheTTL: 10 * time.Second, AllowCacheTTL: 10 * time.Second,
DenyCacheTTL: 10 * time.Second, DenyCacheTTL: 10 * time.Second,
ClientTimeout: 10 * time.Second, ClientTimeout: 10 * time.Second,
WebhookRetryBackoff: DefaultAuthWebhookRetryBackoff(),
} }
} }
@ -91,8 +98,18 @@ func (s *DelegatingAuthorizationOptions) WithClientTimeout(timeout time.Duration
s.ClientTimeout = timeout s.ClientTimeout = timeout
} }
// WithCustomRetryBackoff sets the custom backoff parameters for the authorization webhook retry logic.
func (s *DelegatingAuthorizationOptions) WithCustomRetryBackoff(backoff wait.Backoff) {
s.WebhookRetryBackoff = &backoff
}
func (s *DelegatingAuthorizationOptions) Validate() []error { func (s *DelegatingAuthorizationOptions) Validate() []error {
allErrors := []error{} allErrors := []error{}
if s.WebhookRetryBackoff != nil && s.WebhookRetryBackoff.Steps <= 0 {
allErrors = append(allErrors, fmt.Errorf("number of webhook retry attempts must be greater than 1, but is: %d", s.WebhookRetryBackoff.Steps))
}
return allErrors return allErrors
} }
@ -159,6 +176,7 @@ func (s *DelegatingAuthorizationOptions) toAuthorizer(client kubernetes.Interfac
SubjectAccessReviewClient: client.AuthorizationV1().SubjectAccessReviews(), SubjectAccessReviewClient: client.AuthorizationV1().SubjectAccessReviews(),
AllowCacheTTL: s.AllowCacheTTL, AllowCacheTTL: s.AllowCacheTTL,
DenyCacheTTL: s.DenyCacheTTL, DenyCacheTTL: s.DenyCacheTTL,
WebhookRetryBackoff: s.WebhookRetryBackoff,
} }
delegatedAuthorizer, err := cfg.New() delegatedAuthorizer, err := cfg.New()
if err != nil { if err != nil {

View File

@ -52,6 +52,7 @@ go_test(
"//staging/src/k8s.io/apimachinery/pkg/runtime:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/runtime:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/runtime/schema:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/runtime/schema:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/diff:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/util/diff:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library",
"//staging/src/k8s.io/client-go/kubernetes/scheme:go_default_library", "//staging/src/k8s.io/client-go/kubernetes/scheme:go_default_library",
"//staging/src/k8s.io/client-go/rest:go_default_library", "//staging/src/k8s.io/client-go/rest:go_default_library",
"//staging/src/k8s.io/client-go/tools/clientcmd/api:go_default_library", "//staging/src/k8s.io/client-go/tools/clientcmd/api:go_default_library",

View File

@ -36,12 +36,23 @@ import (
// timeout of the HTTP request, including reading the response body. // timeout of the HTTP request, including reading the response body.
const defaultRequestTimeout = 30 * time.Second const defaultRequestTimeout = 30 * time.Second
// DefaultRetryBackoffWithInitialDelay returns the default backoff parameters for webhook retry from a given initial delay.
// Handy for the client that provides a custom initial delay only.
func DefaultRetryBackoffWithInitialDelay(initialBackoffDelay time.Duration) wait.Backoff {
return wait.Backoff{
Duration: initialBackoffDelay,
Factor: 1.5,
Jitter: 0.2,
Steps: 5,
}
}
// GenericWebhook defines a generic client for webhooks with commonly used capabilities, // GenericWebhook defines a generic client for webhooks with commonly used capabilities,
// such as retry requests. // such as retry requests.
type GenericWebhook struct { type GenericWebhook struct {
RestClient *rest.RESTClient RestClient *rest.RESTClient
InitialBackoff time.Duration RetryBackoff wait.Backoff
ShouldRetry func(error) bool ShouldRetry func(error) bool
} }
// DefaultShouldRetry is a default implementation for the GenericWebhook ShouldRetry function property. // DefaultShouldRetry is a default implementation for the GenericWebhook ShouldRetry function property.
@ -61,11 +72,11 @@ func DefaultShouldRetry(err error) bool {
} }
// NewGenericWebhook creates a new GenericWebhook from the provided kubeconfig file. // NewGenericWebhook creates a new GenericWebhook from the provided kubeconfig file.
func NewGenericWebhook(scheme *runtime.Scheme, codecFactory serializer.CodecFactory, kubeConfigFile string, groupVersions []schema.GroupVersion, initialBackoff time.Duration, customDial utilnet.DialFunc) (*GenericWebhook, error) { func NewGenericWebhook(scheme *runtime.Scheme, codecFactory serializer.CodecFactory, kubeConfigFile string, groupVersions []schema.GroupVersion, retryBackoff wait.Backoff, customDial utilnet.DialFunc) (*GenericWebhook, error) {
return newGenericWebhook(scheme, codecFactory, kubeConfigFile, groupVersions, initialBackoff, defaultRequestTimeout, customDial) return newGenericWebhook(scheme, codecFactory, kubeConfigFile, groupVersions, retryBackoff, defaultRequestTimeout, customDial)
} }
func newGenericWebhook(scheme *runtime.Scheme, codecFactory serializer.CodecFactory, kubeConfigFile string, groupVersions []schema.GroupVersion, initialBackoff, requestTimeout time.Duration, customDial utilnet.DialFunc) (*GenericWebhook, error) { func newGenericWebhook(scheme *runtime.Scheme, codecFactory serializer.CodecFactory, kubeConfigFile string, groupVersions []schema.GroupVersion, retryBackoff wait.Backoff, requestTimeout time.Duration, customDial utilnet.DialFunc) (*GenericWebhook, error) {
for _, groupVersion := range groupVersions { for _, groupVersion := range groupVersions {
if !scheme.IsVersionRegistered(groupVersion) { if !scheme.IsVersionRegistered(groupVersion) {
return nil, fmt.Errorf("webhook plugin requires enabling extension resource: %s", groupVersion) return nil, fmt.Errorf("webhook plugin requires enabling extension resource: %s", groupVersion)
@ -102,19 +113,20 @@ func newGenericWebhook(scheme *runtime.Scheme, codecFactory serializer.CodecFact
return nil, err return nil, err
} }
return &GenericWebhook{restClient, initialBackoff, DefaultShouldRetry}, nil return &GenericWebhook{restClient, retryBackoff, DefaultShouldRetry}, nil
} }
// WithExponentialBackoff will retry webhookFn() up to 5 times with exponentially increasing backoff when // WithExponentialBackoff will retry webhookFn() as specified by the given backoff parameters with exponentially
// it returns an error for which this GenericWebhook's ShouldRetry function returns true, confirming it to // increasing backoff when it returns an error for which this GenericWebhook's ShouldRetry function returns true,
// be retriable. If no ShouldRetry has been defined for the webhook, then the default one is used (DefaultShouldRetry). // confirming it to be retriable. If no ShouldRetry has been defined for the webhook,
// then the default one is used (DefaultShouldRetry).
func (g *GenericWebhook) WithExponentialBackoff(ctx context.Context, webhookFn func() rest.Result) rest.Result { func (g *GenericWebhook) WithExponentialBackoff(ctx context.Context, webhookFn func() rest.Result) rest.Result {
var result rest.Result var result rest.Result
shouldRetry := g.ShouldRetry shouldRetry := g.ShouldRetry
if shouldRetry == nil { if shouldRetry == nil {
shouldRetry = DefaultShouldRetry shouldRetry = DefaultShouldRetry
} }
WithExponentialBackoff(ctx, g.InitialBackoff, func() error { WithExponentialBackoff(ctx, g.RetryBackoff, func() error {
result = webhookFn() result = webhookFn()
return result.Error() return result.Error()
}, shouldRetry) }, shouldRetry)
@ -123,18 +135,11 @@ func (g *GenericWebhook) WithExponentialBackoff(ctx context.Context, webhookFn f
// WithExponentialBackoff will retry webhookFn up to 5 times with exponentially increasing backoff when // WithExponentialBackoff will retry webhookFn up to 5 times with exponentially increasing backoff when
// it returns an error for which shouldRetry returns true, confirming it to be retriable. // it returns an error for which shouldRetry returns true, confirming it to be retriable.
func WithExponentialBackoff(ctx context.Context, initialBackoff time.Duration, webhookFn func() error, shouldRetry func(error) bool) error { func WithExponentialBackoff(ctx context.Context, retryBackoff wait.Backoff, webhookFn func() error, shouldRetry func(error) bool) error {
backoff := wait.Backoff{
Duration: initialBackoff,
Factor: 1.5,
Jitter: 0.2,
Steps: 5,
}
// having a webhook error allows us to track the last actual webhook error for requests that // having a webhook error allows us to track the last actual webhook error for requests that
// are later cancelled or time out. // are later cancelled or time out.
var webhookErr error var webhookErr error
err := wait.ExponentialBackoffWithContext(ctx, backoff, func() (bool, error) { err := wait.ExponentialBackoffWithContext(ctx, retryBackoff, func() (bool, error) {
webhookErr = webhookFn() webhookErr = webhookFn()
if shouldRetry(webhookErr) { if shouldRetry(webhookErr) {
return false, nil return false, nil

View File

@ -36,6 +36,7 @@ import (
apierrors "k8s.io/apimachinery/pkg/api/errors" apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/kubernetes/scheme" "k8s.io/client-go/kubernetes/scheme"
"k8s.io/client-go/rest" "k8s.io/client-go/rest"
v1 "k8s.io/client-go/tools/clientcmd/api/v1" v1 "k8s.io/client-go/tools/clientcmd/api/v1"
@ -69,7 +70,7 @@ var (
Name: "test-cluster", Name: "test-cluster",
} }
groupVersions = []schema.GroupVersion{} groupVersions = []schema.GroupVersion{}
retryBackoff = time.Duration(500) * time.Millisecond retryBackoff = DefaultRetryBackoffWithInitialDelay(time.Duration(500) * time.Millisecond)
) )
// TestKubeConfigFile ensures that a kube config file, regardless of validity, is handled properly // TestKubeConfigFile ensures that a kube config file, regardless of validity, is handled properly
@ -670,7 +671,8 @@ func TestWithExponentialBackoffContextIsAlreadyCanceled(t *testing.T) {
cancel() cancel()
// We don't expect the webhook function to be called since the context is already canceled. // We don't expect the webhook function to be called since the context is already canceled.
err := WithExponentialBackoff(ctx, time.Millisecond, webhookFunc, alwaysRetry) retryBackoff := wait.Backoff{Steps: 5}
err := WithExponentialBackoff(ctx, retryBackoff, webhookFunc, alwaysRetry)
errExpected := fmt.Errorf("webhook call failed: %s", context.Canceled) errExpected := fmt.Errorf("webhook call failed: %s", context.Canceled)
if errExpected.Error() != err.Error() { if errExpected.Error() != err.Error() {
@ -699,7 +701,8 @@ func TestWithExponentialBackoffWebhookErrorIsMostImportant(t *testing.T) {
} }
// webhook err has higher priority than ctx error. we expect the webhook error to be returned. // webhook err has higher priority than ctx error. we expect the webhook error to be returned.
err := WithExponentialBackoff(ctx, time.Millisecond, webhookFunc, alwaysRetry) retryBackoff := wait.Backoff{Steps: 5}
err := WithExponentialBackoff(ctx, retryBackoff, webhookFunc, alwaysRetry)
if attemptsGot != 1 { if attemptsGot != 1 {
t.Errorf("expected %d webhook attempts, but got: %d", 1, attemptsGot) t.Errorf("expected %d webhook attempts, but got: %d", 1, attemptsGot)
@ -708,3 +711,56 @@ func TestWithExponentialBackoffWebhookErrorIsMostImportant(t *testing.T) {
t.Errorf("expected error: %v, but got: %v", errExpected, err) t.Errorf("expected error: %v, but got: %v", errExpected, err)
} }
} }
func TestWithExponentialBackoffParametersNotSet(t *testing.T) {
alwaysRetry := func(e error) bool {
return true
}
attemptsGot := 0
webhookFunc := func() error {
attemptsGot++
return nil
}
err := WithExponentialBackoff(context.TODO(), wait.Backoff{}, webhookFunc, alwaysRetry)
errExpected := fmt.Errorf("webhook call failed: %s", wait.ErrWaitTimeout)
if errExpected.Error() != err.Error() {
t.Errorf("expected error: %v, but got: %v", errExpected, err)
}
if attemptsGot != 0 {
t.Errorf("expected %d webhook attempts, but got: %d", 0, attemptsGot)
}
}
func TestGenericWebhookWithExponentialBackoff(t *testing.T) {
attemptsPerCallExpected := 5
webhook := &GenericWebhook{
RetryBackoff: wait.Backoff{
Duration: time.Millisecond,
Factor: 1.5,
Jitter: 0.2,
Steps: attemptsPerCallExpected,
},
ShouldRetry: func(e error) bool {
return true
},
}
attemptsGot := 0
webhookFunc := func() rest.Result {
attemptsGot++
return rest.Result{}
}
// number of retries should always be local to each call.
totalAttemptsExpected := attemptsPerCallExpected * 2
webhook.WithExponentialBackoff(context.TODO(), webhookFunc)
webhook.WithExponentialBackoff(context.TODO(), webhookFunc)
if totalAttemptsExpected != attemptsGot {
t.Errorf("expected a total of %d webhook attempts but got: %d", totalAttemptsExpected, attemptsGot)
}
}

View File

@ -14,6 +14,7 @@ go_test(
"//staging/src/k8s.io/apimachinery/pkg/runtime:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/runtime:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/runtime/schema:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/runtime/schema:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/runtime/serializer/json:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/runtime/serializer/json:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/apis/audit:go_default_library", "//staging/src/k8s.io/apiserver/pkg/apis/audit:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/apis/audit/v1:go_default_library", "//staging/src/k8s.io/apiserver/pkg/apis/audit/v1:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/apis/audit/v1beta1:go_default_library", "//staging/src/k8s.io/apiserver/pkg/apis/audit/v1beta1:go_default_library",
@ -32,6 +33,7 @@ go_library(
deps = [ deps = [
"//staging/src/k8s.io/apimachinery/pkg/runtime/schema:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/runtime/schema:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/net:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/util/net:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/apis/audit:go_default_library", "//staging/src/k8s.io/apiserver/pkg/apis/audit:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/apis/audit/install:go_default_library", "//staging/src/k8s.io/apiserver/pkg/apis/audit/install:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/audit:go_default_library", "//staging/src/k8s.io/apiserver/pkg/audit:go_default_library",

View File

@ -24,6 +24,7 @@ import (
"k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/runtime/schema"
utilnet "k8s.io/apimachinery/pkg/util/net" utilnet "k8s.io/apimachinery/pkg/util/net"
"k8s.io/apimachinery/pkg/util/wait"
auditinternal "k8s.io/apiserver/pkg/apis/audit" auditinternal "k8s.io/apiserver/pkg/apis/audit"
"k8s.io/apiserver/pkg/apis/audit/install" "k8s.io/apiserver/pkg/apis/audit/install"
"k8s.io/apiserver/pkg/audit" "k8s.io/apiserver/pkg/audit"
@ -36,9 +37,9 @@ const (
// PluginName is the name of this plugin, to be used in help and logs. // PluginName is the name of this plugin, to be used in help and logs.
PluginName = "webhook" PluginName = "webhook"
// DefaultInitialBackoff is the default amount of time to wait before // DefaultInitialBackoffDelay is the default amount of time to wait before
// retrying sending audit events through a webhook. // retrying sending audit events through a webhook.
DefaultInitialBackoff = 10 * time.Second DefaultInitialBackoffDelay = 10 * time.Second
) )
func init() { func init() {
@ -61,9 +62,9 @@ func retryOnError(err error) bool {
return false return false
} }
func loadWebhook(configFile string, groupVersion schema.GroupVersion, initialBackoff time.Duration, customDial utilnet.DialFunc) (*webhook.GenericWebhook, error) { func loadWebhook(configFile string, groupVersion schema.GroupVersion, retryBackoff wait.Backoff, customDial utilnet.DialFunc) (*webhook.GenericWebhook, error) {
w, err := webhook.NewGenericWebhook(audit.Scheme, audit.Codecs, configFile, w, err := webhook.NewGenericWebhook(audit.Scheme, audit.Codecs, configFile,
[]schema.GroupVersion{groupVersion}, initialBackoff, customDial) []schema.GroupVersion{groupVersion}, retryBackoff, customDial)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -79,20 +80,20 @@ type backend struct {
// NewDynamicBackend returns an audit backend configured from a REST client that // NewDynamicBackend returns an audit backend configured from a REST client that
// sends events over HTTP to an external service. // sends events over HTTP to an external service.
func NewDynamicBackend(rc *rest.RESTClient, initialBackoff time.Duration) audit.Backend { func NewDynamicBackend(rc *rest.RESTClient, retryBackoff wait.Backoff) audit.Backend {
return &backend{ return &backend{
w: &webhook.GenericWebhook{ w: &webhook.GenericWebhook{
RestClient: rc, RestClient: rc,
InitialBackoff: initialBackoff, RetryBackoff: retryBackoff,
ShouldRetry: retryOnError, ShouldRetry: retryOnError,
}, },
name: fmt.Sprintf("dynamic_%s", PluginName), name: fmt.Sprintf("dynamic_%s", PluginName),
} }
} }
// NewBackend returns an audit backend that sends events over HTTP to an external service. // NewBackend returns an audit backend that sends events over HTTP to an external service.
func NewBackend(kubeConfigFile string, groupVersion schema.GroupVersion, initialBackoff time.Duration, customDial utilnet.DialFunc) (audit.Backend, error) { func NewBackend(kubeConfigFile string, groupVersion schema.GroupVersion, retryBackoff wait.Backoff, customDial utilnet.DialFunc) (audit.Backend, error) {
w, err := loadWebhook(kubeConfigFile, groupVersion, initialBackoff, customDial) w, err := loadWebhook(kubeConfigFile, groupVersion, retryBackoff, customDial)
if err != nil { if err != nil {
return nil, err return nil, err
} }

View File

@ -26,6 +26,7 @@ import (
"os" "os"
"reflect" "reflect"
"testing" "testing"
"time"
"github.com/stretchr/testify/assert" "github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
@ -33,6 +34,7 @@ import (
"k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/runtime/serializer/json" "k8s.io/apimachinery/pkg/runtime/serializer/json"
"k8s.io/apimachinery/pkg/util/wait"
auditinternal "k8s.io/apiserver/pkg/apis/audit" auditinternal "k8s.io/apiserver/pkg/apis/audit"
auditv1 "k8s.io/apiserver/pkg/apis/audit/v1" auditv1 "k8s.io/apiserver/pkg/apis/audit/v1"
auditv1beta1 "k8s.io/apiserver/pkg/apis/audit/v1beta1" auditv1beta1 "k8s.io/apiserver/pkg/apis/audit/v1beta1"
@ -106,7 +108,13 @@ func newWebhook(t *testing.T, endpoint string, groupVersion schema.GroupVersion)
// NOTE(ericchiang): Do we need to use a proper serializer? // NOTE(ericchiang): Do we need to use a proper serializer?
require.NoError(t, stdjson.NewEncoder(f).Encode(config), "writing kubeconfig") require.NoError(t, stdjson.NewEncoder(f).Encode(config), "writing kubeconfig")
b, err := NewBackend(f.Name(), groupVersion, DefaultInitialBackoff, nil) retryBackoff := wait.Backoff{
Duration: 500 * time.Millisecond,
Factor: 1.5,
Jitter: 0.2,
Steps: 5,
}
b, err := NewBackend(f.Name(), groupVersion, retryBackoff, nil)
require.NoError(t, err, "initializing backend") require.NoError(t, err, "initializing backend")
return b.(*backend) return b.(*backend)

View File

@ -20,6 +20,7 @@ go_test(
"//staging/src/k8s.io/api/authentication/v1beta1:go_default_library", "//staging/src/k8s.io/api/authentication/v1beta1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/diff:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/util/diff:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/authentication/authenticator:go_default_library", "//staging/src/k8s.io/apiserver/pkg/authentication/authenticator:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/authentication/token/cache:go_default_library", "//staging/src/k8s.io/apiserver/pkg/authentication/token/cache:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/authentication/user:go_default_library", "//staging/src/k8s.io/apiserver/pkg/authentication/user:go_default_library",
@ -40,6 +41,7 @@ go_library(
"//staging/src/k8s.io/apimachinery/pkg/runtime:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/runtime:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/runtime/schema:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/runtime/schema:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/net:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/util/net:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/authentication/authenticator:go_default_library", "//staging/src/k8s.io/apiserver/pkg/authentication/authenticator:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/authentication/user:go_default_library", "//staging/src/k8s.io/apiserver/pkg/authentication/user:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/util/webhook:go_default_library", "//staging/src/k8s.io/apiserver/pkg/util/webhook:go_default_library",

View File

@ -29,6 +29,7 @@ import (
"k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/runtime/schema"
utilnet "k8s.io/apimachinery/pkg/util/net" utilnet "k8s.io/apimachinery/pkg/util/net"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/apiserver/pkg/authentication/authenticator" "k8s.io/apiserver/pkg/authentication/authenticator"
"k8s.io/apiserver/pkg/authentication/user" "k8s.io/apiserver/pkg/authentication/user"
"k8s.io/apiserver/pkg/util/webhook" "k8s.io/apiserver/pkg/util/webhook"
@ -37,7 +38,11 @@ import (
"k8s.io/klog/v2" "k8s.io/klog/v2"
) )
const retryBackoff = 500 * time.Millisecond // DefaultRetryBackoff returns the default backoff parameters for webhook retry.
func DefaultRetryBackoff() *wait.Backoff {
backoff := webhook.DefaultRetryBackoffWithInitialDelay(500 * time.Millisecond)
return &backoff
}
// Ensure WebhookTokenAuthenticator implements the authenticator.Token interface. // Ensure WebhookTokenAuthenticator implements the authenticator.Token interface.
var _ authenticator.Token = (*WebhookTokenAuthenticator)(nil) var _ authenticator.Token = (*WebhookTokenAuthenticator)(nil)
@ -47,16 +52,16 @@ type tokenReviewer interface {
} }
type WebhookTokenAuthenticator struct { type WebhookTokenAuthenticator struct {
tokenReview tokenReviewer tokenReview tokenReviewer
initialBackoff time.Duration retryBackoff wait.Backoff
implicitAuds authenticator.Audiences implicitAuds authenticator.Audiences
} }
// NewFromInterface creates a webhook authenticator using the given tokenReview // NewFromInterface creates a webhook authenticator using the given tokenReview
// client. It is recommend to wrap this authenticator with the token cache // client. It is recommend to wrap this authenticator with the token cache
// authenticator implemented in // authenticator implemented in
// k8s.io/apiserver/pkg/authentication/token/cache. // k8s.io/apiserver/pkg/authentication/token/cache.
func NewFromInterface(tokenReview authenticationv1client.TokenReviewInterface, implicitAuds authenticator.Audiences) (*WebhookTokenAuthenticator, error) { func NewFromInterface(tokenReview authenticationv1client.TokenReviewInterface, implicitAuds authenticator.Audiences, retryBackoff wait.Backoff) (*WebhookTokenAuthenticator, error) {
return newWithBackoff(tokenReview, retryBackoff, implicitAuds) return newWithBackoff(tokenReview, retryBackoff, implicitAuds)
} }
@ -64,8 +69,8 @@ func NewFromInterface(tokenReview authenticationv1client.TokenReviewInterface, i
// file. It is recommend to wrap this authenticator with the token cache // file. It is recommend to wrap this authenticator with the token cache
// authenticator implemented in // authenticator implemented in
// k8s.io/apiserver/pkg/authentication/token/cache. // k8s.io/apiserver/pkg/authentication/token/cache.
func New(kubeConfigFile string, version string, implicitAuds authenticator.Audiences, customDial utilnet.DialFunc) (*WebhookTokenAuthenticator, error) { func New(kubeConfigFile string, version string, implicitAuds authenticator.Audiences, retryBackoff wait.Backoff, customDial utilnet.DialFunc) (*WebhookTokenAuthenticator, error) {
tokenReview, err := tokenReviewInterfaceFromKubeconfig(kubeConfigFile, version, customDial) tokenReview, err := tokenReviewInterfaceFromKubeconfig(kubeConfigFile, version, retryBackoff, customDial)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -73,8 +78,8 @@ func New(kubeConfigFile string, version string, implicitAuds authenticator.Audie
} }
// newWithBackoff allows tests to skip the sleep. // newWithBackoff allows tests to skip the sleep.
func newWithBackoff(tokenReview tokenReviewer, initialBackoff time.Duration, implicitAuds authenticator.Audiences) (*WebhookTokenAuthenticator, error) { func newWithBackoff(tokenReview tokenReviewer, retryBackoff wait.Backoff, implicitAuds authenticator.Audiences) (*WebhookTokenAuthenticator, error) {
return &WebhookTokenAuthenticator{tokenReview, initialBackoff, implicitAuds}, nil return &WebhookTokenAuthenticator{tokenReview, retryBackoff, implicitAuds}, nil
} }
// AuthenticateToken implements the authenticator.Token interface. // AuthenticateToken implements the authenticator.Token interface.
@ -102,7 +107,7 @@ func (w *WebhookTokenAuthenticator) AuthenticateToken(ctx context.Context, token
err error err error
auds authenticator.Audiences auds authenticator.Audiences
) )
webhook.WithExponentialBackoff(ctx, w.initialBackoff, func() error { webhook.WithExponentialBackoff(ctx, w.retryBackoff, func() error {
result, err = w.tokenReview.Create(ctx, r, metav1.CreateOptions{}) result, err = w.tokenReview.Create(ctx, r, metav1.CreateOptions{})
return err return err
}, webhook.DefaultShouldRetry) }, webhook.DefaultShouldRetry)
@ -154,7 +159,7 @@ func (w *WebhookTokenAuthenticator) AuthenticateToken(ctx context.Context, token
// tokenReviewInterfaceFromKubeconfig builds a client from the specified kubeconfig file, // tokenReviewInterfaceFromKubeconfig builds a client from the specified kubeconfig file,
// and returns a TokenReviewInterface that uses that client. Note that the client submits TokenReview // and returns a TokenReviewInterface that uses that client. Note that the client submits TokenReview
// requests to the exact path specified in the kubeconfig file, so arbitrary non-API servers can be targeted. // requests to the exact path specified in the kubeconfig file, so arbitrary non-API servers can be targeted.
func tokenReviewInterfaceFromKubeconfig(kubeConfigFile string, version string, customDial utilnet.DialFunc) (tokenReviewer, error) { func tokenReviewInterfaceFromKubeconfig(kubeConfigFile string, version string, retryBackoff wait.Backoff, customDial utilnet.DialFunc) (tokenReviewer, error) {
localScheme := runtime.NewScheme() localScheme := runtime.NewScheme()
if err := scheme.AddToScheme(localScheme); err != nil { if err := scheme.AddToScheme(localScheme); err != nil {
return nil, err return nil, err
@ -166,7 +171,7 @@ func tokenReviewInterfaceFromKubeconfig(kubeConfigFile string, version string, c
if err := localScheme.SetVersionPriority(groupVersions...); err != nil { if err := localScheme.SetVersionPriority(groupVersions...); err != nil {
return nil, err return nil, err
} }
gw, err := webhook.NewGenericWebhook(localScheme, scheme.Codecs, kubeConfigFile, groupVersions, 0, customDial) gw, err := webhook.NewGenericWebhook(localScheme, scheme.Codecs, kubeConfigFile, groupVersions, retryBackoff, customDial)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -177,7 +182,7 @@ func tokenReviewInterfaceFromKubeconfig(kubeConfigFile string, version string, c
if err := localScheme.SetVersionPriority(groupVersions...); err != nil { if err := localScheme.SetVersionPriority(groupVersions...); err != nil {
return nil, err return nil, err
} }
gw, err := webhook.NewGenericWebhook(localScheme, scheme.Codecs, kubeConfigFile, groupVersions, 0, customDial) gw, err := webhook.NewGenericWebhook(localScheme, scheme.Codecs, kubeConfigFile, groupVersions, retryBackoff, customDial)
if err != nil { if err != nil {
return nil, err return nil, err
} }

View File

@ -33,12 +33,20 @@ import (
authenticationv1 "k8s.io/api/authentication/v1" authenticationv1 "k8s.io/api/authentication/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/apiserver/pkg/authentication/authenticator" "k8s.io/apiserver/pkg/authentication/authenticator"
"k8s.io/apiserver/pkg/authentication/token/cache" "k8s.io/apiserver/pkg/authentication/token/cache"
"k8s.io/apiserver/pkg/authentication/user" "k8s.io/apiserver/pkg/authentication/user"
v1 "k8s.io/client-go/tools/clientcmd/api/v1" v1 "k8s.io/client-go/tools/clientcmd/api/v1"
) )
var testRetryBackoff = wait.Backoff{
Duration: 5 * time.Millisecond,
Factor: 1.5,
Jitter: 0.2,
Steps: 5,
}
// V1Service mocks a remote authentication service. // V1Service mocks a remote authentication service.
type V1Service interface { type V1Service interface {
// Review looks at the TokenReviewSpec and provides an authentication // Review looks at the TokenReviewSpec and provides an authentication
@ -193,12 +201,12 @@ func newV1TokenAuthenticator(serverURL string, clientCert, clientKey, ca []byte,
return nil, err return nil, err
} }
c, err := tokenReviewInterfaceFromKubeconfig(p, "v1", nil) c, err := tokenReviewInterfaceFromKubeconfig(p, "v1", testRetryBackoff, nil)
if err != nil { if err != nil {
return nil, err return nil, err
} }
authn, err := newWithBackoff(c, 0, implicitAuds) authn, err := newWithBackoff(c, testRetryBackoff, implicitAuds)
if err != nil { if err != nil {
return nil, err return nil, err
} }

View File

@ -195,12 +195,12 @@ func newV1beta1TokenAuthenticator(serverURL string, clientCert, clientKey, ca []
return nil, err return nil, err
} }
c, err := tokenReviewInterfaceFromKubeconfig(p, "v1beta1", nil) c, err := tokenReviewInterfaceFromKubeconfig(p, "v1beta1", testRetryBackoff, nil)
if err != nil { if err != nil {
return nil, err return nil, err
} }
authn, err := newWithBackoff(c, 0, implicitAuds) authn, err := newWithBackoff(c, testRetryBackoff, implicitAuds)
if err != nil { if err != nil {
return nil, err return nil, err
} }

View File

@ -20,6 +20,7 @@ go_test(
"//staging/src/k8s.io/api/authorization/v1beta1:go_default_library", "//staging/src/k8s.io/api/authorization/v1beta1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/diff:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/util/diff:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/authentication/user:go_default_library", "//staging/src/k8s.io/apiserver/pkg/authentication/user:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/authorization/authorizer:go_default_library", "//staging/src/k8s.io/apiserver/pkg/authorization/authorizer:go_default_library",
"//staging/src/k8s.io/client-go/tools/clientcmd/api/v1:go_default_library", "//staging/src/k8s.io/client-go/tools/clientcmd/api/v1:go_default_library",
@ -40,6 +41,7 @@ go_library(
"//staging/src/k8s.io/apimachinery/pkg/runtime/schema:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/runtime/schema:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/cache:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/util/cache:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/net:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/util/net:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/authentication/user:go_default_library", "//staging/src/k8s.io/apiserver/pkg/authentication/user:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/authorization/authorizer:go_default_library", "//staging/src/k8s.io/apiserver/pkg/authorization/authorizer:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/util/webhook:go_default_library", "//staging/src/k8s.io/apiserver/pkg/util/webhook:go_default_library",

View File

@ -32,6 +32,7 @@ import (
"k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/util/cache" "k8s.io/apimachinery/pkg/util/cache"
utilnet "k8s.io/apimachinery/pkg/util/net" utilnet "k8s.io/apimachinery/pkg/util/net"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/apiserver/pkg/authentication/user" "k8s.io/apiserver/pkg/authentication/user"
"k8s.io/apiserver/pkg/authorization/authorizer" "k8s.io/apiserver/pkg/authorization/authorizer"
"k8s.io/apiserver/pkg/util/webhook" "k8s.io/apiserver/pkg/util/webhook"
@ -40,11 +41,16 @@ import (
) )
const ( const (
retryBackoff = 500 * time.Millisecond
// The maximum length of requester-controlled attributes to allow caching. // The maximum length of requester-controlled attributes to allow caching.
maxControlledAttrCacheSize = 10000 maxControlledAttrCacheSize = 10000
) )
// DefaultRetryBackoff returns the default backoff parameters for webhook retry.
func DefaultRetryBackoff() *wait.Backoff {
backoff := webhook.DefaultRetryBackoffWithInitialDelay(500 * time.Millisecond)
return &backoff
}
// Ensure Webhook implements the authorizer.Authorizer interface. // Ensure Webhook implements the authorizer.Authorizer interface.
var _ authorizer.Authorizer = (*WebhookAuthorizer)(nil) var _ authorizer.Authorizer = (*WebhookAuthorizer)(nil)
@ -57,12 +63,12 @@ type WebhookAuthorizer struct {
responseCache *cache.LRUExpireCache responseCache *cache.LRUExpireCache
authorizedTTL time.Duration authorizedTTL time.Duration
unauthorizedTTL time.Duration unauthorizedTTL time.Duration
initialBackoff time.Duration retryBackoff wait.Backoff
decisionOnError authorizer.Decision decisionOnError authorizer.Decision
} }
// NewFromInterface creates a WebhookAuthorizer using the given subjectAccessReview client // NewFromInterface creates a WebhookAuthorizer using the given subjectAccessReview client
func NewFromInterface(subjectAccessReview authorizationv1client.SubjectAccessReviewInterface, authorizedTTL, unauthorizedTTL time.Duration) (*WebhookAuthorizer, error) { func NewFromInterface(subjectAccessReview authorizationv1client.SubjectAccessReviewInterface, authorizedTTL, unauthorizedTTL time.Duration, retryBackoff wait.Backoff) (*WebhookAuthorizer, error) {
return newWithBackoff(subjectAccessReview, authorizedTTL, unauthorizedTTL, retryBackoff) return newWithBackoff(subjectAccessReview, authorizedTTL, unauthorizedTTL, retryBackoff)
} }
@ -85,8 +91,8 @@ func NewFromInterface(subjectAccessReview authorizationv1client.SubjectAccessRev
// //
// For additional HTTP configuration, refer to the kubeconfig documentation // For additional HTTP configuration, refer to the kubeconfig documentation
// https://kubernetes.io/docs/user-guide/kubeconfig-file/. // https://kubernetes.io/docs/user-guide/kubeconfig-file/.
func New(kubeConfigFile string, version string, authorizedTTL, unauthorizedTTL time.Duration, customDial utilnet.DialFunc) (*WebhookAuthorizer, error) { func New(kubeConfigFile string, version string, authorizedTTL, unauthorizedTTL time.Duration, retryBackoff wait.Backoff, customDial utilnet.DialFunc) (*WebhookAuthorizer, error) {
subjectAccessReview, err := subjectAccessReviewInterfaceFromKubeconfig(kubeConfigFile, version, customDial) subjectAccessReview, err := subjectAccessReviewInterfaceFromKubeconfig(kubeConfigFile, version, retryBackoff, customDial)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -94,13 +100,13 @@ func New(kubeConfigFile string, version string, authorizedTTL, unauthorizedTTL t
} }
// newWithBackoff allows tests to skip the sleep. // newWithBackoff allows tests to skip the sleep.
func newWithBackoff(subjectAccessReview subjectAccessReviewer, authorizedTTL, unauthorizedTTL, initialBackoff time.Duration) (*WebhookAuthorizer, error) { func newWithBackoff(subjectAccessReview subjectAccessReviewer, authorizedTTL, unauthorizedTTL time.Duration, retryBackoff wait.Backoff) (*WebhookAuthorizer, error) {
return &WebhookAuthorizer{ return &WebhookAuthorizer{
subjectAccessReview: subjectAccessReview, subjectAccessReview: subjectAccessReview,
responseCache: cache.NewLRUExpireCache(8192), responseCache: cache.NewLRUExpireCache(8192),
authorizedTTL: authorizedTTL, authorizedTTL: authorizedTTL,
unauthorizedTTL: unauthorizedTTL, unauthorizedTTL: unauthorizedTTL,
initialBackoff: initialBackoff, retryBackoff: retryBackoff,
decisionOnError: authorizer.DecisionNoOpinion, decisionOnError: authorizer.DecisionNoOpinion,
}, nil }, nil
} }
@ -190,7 +196,7 @@ func (w *WebhookAuthorizer) Authorize(ctx context.Context, attr authorizer.Attri
result *authorizationv1.SubjectAccessReview result *authorizationv1.SubjectAccessReview
err error err error
) )
webhook.WithExponentialBackoff(ctx, w.initialBackoff, func() error { webhook.WithExponentialBackoff(ctx, w.retryBackoff, func() error {
result, err = w.subjectAccessReview.Create(ctx, r, metav1.CreateOptions{}) result, err = w.subjectAccessReview.Create(ctx, r, metav1.CreateOptions{})
return err return err
}, webhook.DefaultShouldRetry) }, webhook.DefaultShouldRetry)
@ -246,7 +252,7 @@ func convertToSARExtra(extra map[string][]string) map[string]authorizationv1.Ext
// subjectAccessReviewInterfaceFromKubeconfig builds a client from the specified kubeconfig file, // subjectAccessReviewInterfaceFromKubeconfig builds a client from the specified kubeconfig file,
// and returns a SubjectAccessReviewInterface that uses that client. Note that the client submits SubjectAccessReview // and returns a SubjectAccessReviewInterface that uses that client. Note that the client submits SubjectAccessReview
// requests to the exact path specified in the kubeconfig file, so arbitrary non-API servers can be targeted. // requests to the exact path specified in the kubeconfig file, so arbitrary non-API servers can be targeted.
func subjectAccessReviewInterfaceFromKubeconfig(kubeConfigFile string, version string, customDial utilnet.DialFunc) (subjectAccessReviewer, error) { func subjectAccessReviewInterfaceFromKubeconfig(kubeConfigFile string, version string, retryBackoff wait.Backoff, customDial utilnet.DialFunc) (subjectAccessReviewer, error) {
localScheme := runtime.NewScheme() localScheme := runtime.NewScheme()
if err := scheme.AddToScheme(localScheme); err != nil { if err := scheme.AddToScheme(localScheme); err != nil {
return nil, err return nil, err
@ -258,7 +264,7 @@ func subjectAccessReviewInterfaceFromKubeconfig(kubeConfigFile string, version s
if err := localScheme.SetVersionPriority(groupVersions...); err != nil { if err := localScheme.SetVersionPriority(groupVersions...); err != nil {
return nil, err return nil, err
} }
gw, err := webhook.NewGenericWebhook(localScheme, scheme.Codecs, kubeConfigFile, groupVersions, 0, customDial) gw, err := webhook.NewGenericWebhook(localScheme, scheme.Codecs, kubeConfigFile, groupVersions, retryBackoff, customDial)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -269,7 +275,7 @@ func subjectAccessReviewInterfaceFromKubeconfig(kubeConfigFile string, version s
if err := localScheme.SetVersionPriority(groupVersions...); err != nil { if err := localScheme.SetVersionPriority(groupVersions...); err != nil {
return nil, err return nil, err
} }
gw, err := webhook.NewGenericWebhook(localScheme, scheme.Codecs, kubeConfigFile, groupVersions, 0, customDial) gw, err := webhook.NewGenericWebhook(localScheme, scheme.Codecs, kubeConfigFile, groupVersions, retryBackoff, customDial)
if err != nil { if err != nil {
return nil, err return nil, err
} }

View File

@ -37,11 +37,19 @@ import (
authorizationv1 "k8s.io/api/authorization/v1" authorizationv1 "k8s.io/api/authorization/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/diff" "k8s.io/apimachinery/pkg/util/diff"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/apiserver/pkg/authentication/user" "k8s.io/apiserver/pkg/authentication/user"
"k8s.io/apiserver/pkg/authorization/authorizer" "k8s.io/apiserver/pkg/authorization/authorizer"
v1 "k8s.io/client-go/tools/clientcmd/api/v1" v1 "k8s.io/client-go/tools/clientcmd/api/v1"
) )
var testRetryBackoff = wait.Backoff{
Duration: 5 * time.Millisecond,
Factor: 1.5,
Jitter: 0.2,
Steps: 5,
}
func TestV1NewFromConfig(t *testing.T) { func TestV1NewFromConfig(t *testing.T) {
dir, err := ioutil.TempDir("", "") dir, err := ioutil.TempDir("", "")
if err != nil { if err != nil {
@ -186,11 +194,11 @@ current-context: default
return fmt.Errorf("failed to execute test template: %v", err) return fmt.Errorf("failed to execute test template: %v", err)
} }
// Create a new authorizer // Create a new authorizer
sarClient, err := subjectAccessReviewInterfaceFromKubeconfig(p, "v1", nil) sarClient, err := subjectAccessReviewInterfaceFromKubeconfig(p, "v1", testRetryBackoff, nil)
if err != nil { if err != nil {
return fmt.Errorf("error building sar client: %v", err) return fmt.Errorf("error building sar client: %v", err)
} }
_, err = newWithBackoff(sarClient, 0, 0, 0) _, err = newWithBackoff(sarClient, 0, 0, testRetryBackoff)
return err return err
}() }()
if err != nil && !tt.wantErr { if err != nil && !tt.wantErr {
@ -325,11 +333,11 @@ func newV1Authorizer(callbackURL string, clientCert, clientKey, ca []byte, cache
if err := json.NewEncoder(tempfile).Encode(config); err != nil { if err := json.NewEncoder(tempfile).Encode(config); err != nil {
return nil, err return nil, err
} }
sarClient, err := subjectAccessReviewInterfaceFromKubeconfig(p, "v1", nil) sarClient, err := subjectAccessReviewInterfaceFromKubeconfig(p, "v1", testRetryBackoff, nil)
if err != nil { if err != nil {
return nil, fmt.Errorf("error building sar client: %v", err) return nil, fmt.Errorf("error building sar client: %v", err)
} }
return newWithBackoff(sarClient, cacheTime, cacheTime, 0) return newWithBackoff(sarClient, cacheTime, cacheTime, testRetryBackoff)
} }
func TestV1TLSConfig(t *testing.T) { func TestV1TLSConfig(t *testing.T) {

View File

@ -186,11 +186,11 @@ current-context: default
return fmt.Errorf("failed to execute test template: %v", err) return fmt.Errorf("failed to execute test template: %v", err)
} }
// Create a new authorizer // Create a new authorizer
sarClient, err := subjectAccessReviewInterfaceFromKubeconfig(p, "v1beta1", nil) sarClient, err := subjectAccessReviewInterfaceFromKubeconfig(p, "v1beta1", testRetryBackoff, nil)
if err != nil { if err != nil {
return fmt.Errorf("error building sar client: %v", err) return fmt.Errorf("error building sar client: %v", err)
} }
_, err = newWithBackoff(sarClient, 0, 0, 0) _, err = newWithBackoff(sarClient, 0, 0, testRetryBackoff)
return err return err
}() }()
if err != nil && !tt.wantErr { if err != nil && !tt.wantErr {
@ -325,11 +325,11 @@ func newV1beta1Authorizer(callbackURL string, clientCert, clientKey, ca []byte,
if err := json.NewEncoder(tempfile).Encode(config); err != nil { if err := json.NewEncoder(tempfile).Encode(config); err != nil {
return nil, err return nil, err
} }
sarClient, err := subjectAccessReviewInterfaceFromKubeconfig(p, "v1beta1", nil) sarClient, err := subjectAccessReviewInterfaceFromKubeconfig(p, "v1beta1", testRetryBackoff, nil)
if err != nil { if err != nil {
return nil, fmt.Errorf("error building sar client: %v", err) return nil, fmt.Errorf("error building sar client: %v", err)
} }
return newWithBackoff(sarClient, cacheTime, cacheTime, 0) return newWithBackoff(sarClient, cacheTime, cacheTime, testRetryBackoff)
} }
func TestV1beta1TLSConfig(t *testing.T) { func TestV1beta1TLSConfig(t *testing.T) {

View File

@ -103,8 +103,9 @@ func TestDefaultFlags(t *testing.T) {
BindNetwork: "tcp", BindNetwork: "tcp",
}).WithLoopback(), }).WithLoopback(),
Authentication: &apiserveroptions.DelegatingAuthenticationOptions{ Authentication: &apiserveroptions.DelegatingAuthenticationOptions{
CacheTTL: 10 * time.Second, CacheTTL: 10 * time.Second,
ClientCert: apiserveroptions.ClientCertAuthenticationOptions{}, WebhookRetryBackoff: apiserveroptions.DefaultAuthWebhookRetryBackoff(),
ClientCert: apiserveroptions.ClientCertAuthenticationOptions{},
RequestHeader: apiserveroptions.RequestHeaderAuthenticationOptions{ RequestHeader: apiserveroptions.RequestHeaderAuthenticationOptions{
UsernameHeaders: []string{"x-remote-user"}, UsernameHeaders: []string{"x-remote-user"},
GroupHeaders: []string{"x-remote-group"}, GroupHeaders: []string{"x-remote-group"},
@ -116,6 +117,7 @@ func TestDefaultFlags(t *testing.T) {
AllowCacheTTL: 10 * time.Second, AllowCacheTTL: 10 * time.Second,
DenyCacheTTL: 10 * time.Second, DenyCacheTTL: 10 * time.Second,
ClientTimeout: 10 * time.Second, ClientTimeout: 10 * time.Second,
WebhookRetryBackoff: apiserveroptions.DefaultAuthWebhookRetryBackoff(),
RemoteKubeConfigFileOptional: true, RemoteKubeConfigFileOptional: true,
AlwaysAllowPaths: []string{"/healthz"}, // note: this does not match /healthz/ or AlwaysAllowPaths: []string{"/healthz"}, // note: this does not match /healthz/ or
}, },
@ -236,8 +238,9 @@ func TestAddFlags(t *testing.T) {
BindNetwork: "tcp", BindNetwork: "tcp",
}).WithLoopback(), }).WithLoopback(),
Authentication: &apiserveroptions.DelegatingAuthenticationOptions{ Authentication: &apiserveroptions.DelegatingAuthenticationOptions{
CacheTTL: 10 * time.Second, CacheTTL: 10 * time.Second,
ClientCert: apiserveroptions.ClientCertAuthenticationOptions{}, WebhookRetryBackoff: apiserveroptions.DefaultAuthWebhookRetryBackoff(),
ClientCert: apiserveroptions.ClientCertAuthenticationOptions{},
RequestHeader: apiserveroptions.RequestHeaderAuthenticationOptions{ RequestHeader: apiserveroptions.RequestHeaderAuthenticationOptions{
UsernameHeaders: []string{"x-remote-user"}, UsernameHeaders: []string{"x-remote-user"},
GroupHeaders: []string{"x-remote-group"}, GroupHeaders: []string{"x-remote-group"},
@ -249,6 +252,7 @@ func TestAddFlags(t *testing.T) {
AllowCacheTTL: 10 * time.Second, AllowCacheTTL: 10 * time.Second,
DenyCacheTTL: 10 * time.Second, DenyCacheTTL: 10 * time.Second,
ClientTimeout: 10 * time.Second, ClientTimeout: 10 * time.Second,
WebhookRetryBackoff: apiserveroptions.DefaultAuthWebhookRetryBackoff(),
RemoteKubeConfigFileOptional: true, RemoteKubeConfigFileOptional: true,
AlwaysAllowPaths: []string{"/healthz"}, // note: this does not match /healthz/ or AlwaysAllowPaths: []string{"/healthz"}, // note: this does not match /healthz/ or
}, },

View File

@ -38,6 +38,7 @@ import (
authenticationv1beta1 "k8s.io/api/authentication/v1beta1" authenticationv1beta1 "k8s.io/api/authentication/v1beta1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
utilnet "k8s.io/apimachinery/pkg/util/net" utilnet "k8s.io/apimachinery/pkg/util/net"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/apiserver/pkg/authentication/authenticator" "k8s.io/apiserver/pkg/authentication/authenticator"
"k8s.io/apiserver/pkg/authentication/group" "k8s.io/apiserver/pkg/authentication/group"
"k8s.io/apiserver/pkg/authentication/request/bearertoken" "k8s.io/apiserver/pkg/authentication/request/bearertoken"
@ -86,7 +87,14 @@ func getTestWebhookTokenAuth(serverURL string, customDial utilnet.DialFunc) (aut
if err := json.NewEncoder(kubecfgFile).Encode(config); err != nil { if err := json.NewEncoder(kubecfgFile).Encode(config); err != nil {
return nil, err return nil, err
} }
webhookTokenAuth, err := webhook.New(kubecfgFile.Name(), "v1beta1", nil, customDial)
retryBackoff := wait.Backoff{
Duration: 500 * time.Millisecond,
Factor: 1.5,
Jitter: 0.2,
Steps: 5,
}
webhookTokenAuth, err := webhook.New(kubecfgFile.Name(), "v1beta1", nil, retryBackoff, customDial)
if err != nil { if err != nil {
return nil, err return nil, err
} }