make backoff parameters configurable for webhook

Currently webhook retry backoff parameters are hard coded, we want
to have the ability to configure the backoff parameters for webhook
retry logic.
This commit is contained in:
Abu Kashem 2020-10-30 11:25:32 -04:00
parent a0d23de086
commit 53a1307f68
No known key found for this signature in database
GPG Key ID: 76146D1A14E658ED
34 changed files with 346 additions and 92 deletions

View File

@ -260,9 +260,10 @@ func TestAddFlags(t *testing.T) {
ClientCA: "/client-ca",
},
WebHook: &kubeoptions.WebHookAuthenticationOptions{
CacheTTL: 180000000000,
ConfigFile: "/token-webhook-config",
Version: "v1beta1",
CacheTTL: 180000000000,
ConfigFile: "/token-webhook-config",
Version: "v1beta1",
RetryBackoff: apiserveroptions.DefaultAuthWebhookRetryBackoff(),
},
BootstrapToken: &kubeoptions.BootstrapTokenAuthenticationOptions{},
OIDC: &kubeoptions.OIDCAuthenticationOptions{
@ -284,6 +285,7 @@ func TestAddFlags(t *testing.T) {
WebhookCacheAuthorizedTTL: 180000000000,
WebhookCacheUnauthorizedTTL: 60000000000,
WebhookVersion: "v1beta1",
WebhookRetryBackoff: apiserveroptions.DefaultAuthWebhookRetryBackoff(),
},
CloudProvider: &kubeoptions.CloudProviderOptions{
CloudConfigFile: "/cloud-config",

View File

@ -405,8 +405,9 @@ func TestAddFlags(t *testing.T) {
BindNetwork: "tcp",
}).WithLoopback(),
Authentication: &apiserveroptions.DelegatingAuthenticationOptions{
CacheTTL: 10 * time.Second,
ClientCert: apiserveroptions.ClientCertAuthenticationOptions{},
CacheTTL: 10 * time.Second,
WebhookRetryBackoff: apiserveroptions.DefaultAuthWebhookRetryBackoff(),
ClientCert: apiserveroptions.ClientCertAuthenticationOptions{},
RequestHeader: apiserveroptions.RequestHeaderAuthenticationOptions{
UsernameHeaders: []string{"x-remote-user"},
GroupHeaders: []string{"x-remote-group"},
@ -418,6 +419,7 @@ func TestAddFlags(t *testing.T) {
AllowCacheTTL: 10 * time.Second,
DenyCacheTTL: 10 * time.Second,
ClientTimeout: 10 * time.Second,
WebhookRetryBackoff: apiserveroptions.DefaultAuthWebhookRetryBackoff(),
RemoteKubeConfigFileOptional: true,
AlwaysAllowPaths: []string{"/healthz"}, // note: this does not match /healthz/ or /healthz/*
},

View File

@ -12,6 +12,7 @@ go_library(
deps = [
"//pkg/serviceaccount:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/net:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/authentication/authenticator:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/authentication/authenticatorfactory:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/authentication/group:go_default_library",
@ -25,6 +26,7 @@ go_library(
"//staging/src/k8s.io/apiserver/pkg/authentication/token/tokenfile:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/authentication/token/union:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/server/dynamiccertificates:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/server/options:go_default_library",
"//staging/src/k8s.io/apiserver/plugin/pkg/authenticator/token/oidc:go_default_library",
"//staging/src/k8s.io/apiserver/plugin/pkg/authenticator/token/webhook:go_default_library",
"//staging/src/k8s.io/client-go/plugin/pkg/client/auth:go_default_library",

View File

@ -22,6 +22,7 @@ import (
"github.com/go-openapi/spec"
utilnet "k8s.io/apimachinery/pkg/util/net"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/apiserver/pkg/authentication/authenticator"
"k8s.io/apiserver/pkg/authentication/authenticatorfactory"
"k8s.io/apiserver/pkg/authentication/group"
@ -35,6 +36,7 @@ import (
"k8s.io/apiserver/pkg/authentication/token/tokenfile"
tokenunion "k8s.io/apiserver/pkg/authentication/token/union"
"k8s.io/apiserver/pkg/server/dynamiccertificates"
genericoptions "k8s.io/apiserver/pkg/server/options"
"k8s.io/apiserver/plugin/pkg/authenticator/token/oidc"
"k8s.io/apiserver/plugin/pkg/authenticator/token/webhook"
@ -66,6 +68,10 @@ type Config struct {
WebhookTokenAuthnConfigFile string
WebhookTokenAuthnVersion string
WebhookTokenAuthnCacheTTL time.Duration
// WebhookRetryBackoff specifies the backoff parameters for the authentication webhook retry logic.
// This allows us to configure the sleep time at each iteration and the maximum number of retries allowed
// before we fail the webhook call in order to limit the fan out that ensues when the system is degraded.
WebhookRetryBackoff *wait.Backoff
TokenSuccessCacheTTL time.Duration
TokenFailureCacheTTL time.Duration
@ -280,7 +286,13 @@ func newServiceAccountAuthenticator(iss string, keyfiles []string, apiAudiences
}
func newWebhookTokenAuthenticator(config Config) (authenticator.Token, error) {
webhookTokenAuthenticator, err := webhook.New(config.WebhookTokenAuthnConfigFile, config.WebhookTokenAuthnVersion, config.APIAudiences, config.CustomDial)
// Provide a default if WebhookRetryBackoff has not been set by the user.
retryBackoff := config.WebhookRetryBackoff
if retryBackoff == nil {
retryBackoff = genericoptions.DefaultAuthWebhookRetryBackoff()
}
webhookTokenAuthenticator, err := webhook.New(config.WebhookTokenAuthnConfigFile, config.WebhookTokenAuthnVersion, config.APIAudiences, *retryBackoff, config.CustomDial)
if err != nil {
return nil, err
}

View File

@ -17,9 +17,11 @@ go_library(
"//plugin/pkg/auth/authorizer/rbac:go_default_library",
"//plugin/pkg/auth/authorizer/rbac/bootstrappolicy:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/net:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/authorization/authorizer:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/authorization/authorizerfactory:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/authorization/union:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/server/options:go_default_library",
"//staging/src/k8s.io/apiserver/plugin/pkg/authorizer/webhook:go_default_library",
"//staging/src/k8s.io/client-go/informers:go_default_library",
],

View File

@ -21,9 +21,11 @@ import (
"time"
utilnet "k8s.io/apimachinery/pkg/util/net"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/apiserver/pkg/authorization/authorizer"
"k8s.io/apiserver/pkg/authorization/authorizerfactory"
"k8s.io/apiserver/pkg/authorization/union"
genericoptions "k8s.io/apiserver/pkg/server/options"
"k8s.io/apiserver/plugin/pkg/authorizer/webhook"
versionedinformers "k8s.io/client-go/informers"
"k8s.io/kubernetes/pkg/auth/authorizer/abac"
@ -53,6 +55,10 @@ type Config struct {
WebhookCacheAuthorizedTTL time.Duration
// TTL for caching of unauthorized responses from the webhook server.
WebhookCacheUnauthorizedTTL time.Duration
// WebhookRetryBackoff specifies the backoff parameters for the authorization webhook retry logic.
// This allows us to configure the sleep time at each iteration and the maximum number of retries allowed
// before we fail the webhook call in order to limit the fan out that ensues when the system is degraded.
WebhookRetryBackoff *wait.Backoff
VersionedInformerFactory versionedinformers.SharedInformerFactory
@ -104,10 +110,17 @@ func (config Config) New() (authorizer.Authorizer, authorizer.RuleResolver, erro
authorizers = append(authorizers, abacAuthorizer)
ruleResolvers = append(ruleResolvers, abacAuthorizer)
case modes.ModeWebhook:
// Provide a default if WebhookRetryBackoff has not been set by the user.
retryBackoff := config.WebhookRetryBackoff
if retryBackoff == nil {
retryBackoff = genericoptions.DefaultAuthWebhookRetryBackoff()
}
webhookAuthorizer, err := webhook.New(config.WebhookConfigFile,
config.WebhookVersion,
config.WebhookCacheAuthorizedTTL,
config.WebhookCacheUnauthorizedTTL,
*retryBackoff,
config.CustomDial)
if err != nil {
return nil, nil, err

View File

@ -53,6 +53,7 @@ go_library(
"//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/net:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/sets:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/admission:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/admission/plugin/namespace/lifecycle:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/admission/plugin/resourcequota:go_default_library",

View File

@ -27,6 +27,7 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/apiserver/pkg/authentication/authenticator"
genericapiserver "k8s.io/apiserver/pkg/server"
"k8s.io/apiserver/pkg/server/egressselector"
@ -104,6 +105,11 @@ type WebHookAuthenticationOptions struct {
ConfigFile string
Version string
CacheTTL time.Duration
// RetryBackoff specifies the backoff parameters for the authentication webhook retry logic.
// This allows us to configure the sleep time at each iteration and the maximum number of retries allowed
// before we fail the webhook call in order to limit the fan out that ensues when the system is degraded.
RetryBackoff *wait.Backoff
}
// NewBuiltInAuthenticationOptions create a new BuiltInAuthenticationOptions, just set default token cache TTL
@ -172,8 +178,9 @@ func (o *BuiltInAuthenticationOptions) WithTokenFile() *BuiltInAuthenticationOpt
// WithWebHook set default value for web hook authentication
func (o *BuiltInAuthenticationOptions) WithWebHook() *BuiltInAuthenticationOptions {
o.WebHook = &WebHookAuthenticationOptions{
Version: "v1beta1",
CacheTTL: 2 * time.Minute,
Version: "v1beta1",
CacheTTL: 2 * time.Minute,
RetryBackoff: genericoptions.DefaultAuthWebhookRetryBackoff(),
}
return o
}
@ -216,6 +223,13 @@ func (o *BuiltInAuthenticationOptions) Validate() []error {
}
}
if o.WebHook != nil {
retryBackoff := o.WebHook.RetryBackoff
if retryBackoff != nil && retryBackoff.Steps <= 0 {
allErrors = append(allErrors, fmt.Errorf("number of webhook retry attempts must be greater than 1, but is: %d", retryBackoff.Steps))
}
}
return allErrors
}
@ -415,6 +429,7 @@ func (o *BuiltInAuthenticationOptions) ToAuthenticationConfig() (kubeauthenticat
ret.WebhookTokenAuthnConfigFile = o.WebHook.ConfigFile
ret.WebhookTokenAuthnVersion = o.WebHook.Version
ret.WebhookTokenAuthnCacheTTL = o.WebHook.CacheTTL
ret.WebhookRetryBackoff = o.WebHook.RetryBackoff
if len(o.WebHook.ConfigFile) > 0 && o.WebHook.CacheTTL > 0 {
if o.TokenSuccessCacheTTL > 0 && o.WebHook.CacheTTL < o.TokenSuccessCacheTTL {

View File

@ -24,6 +24,8 @@ import (
"github.com/spf13/pflag"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apimachinery/pkg/util/wait"
genericoptions "k8s.io/apiserver/pkg/server/options"
versionedinformers "k8s.io/client-go/informers"
"k8s.io/kubernetes/pkg/kubeapiserver/authorizer"
authzmodes "k8s.io/kubernetes/pkg/kubeapiserver/authorizer/modes"
@ -37,6 +39,10 @@ type BuiltInAuthorizationOptions struct {
WebhookVersion string
WebhookCacheAuthorizedTTL time.Duration
WebhookCacheUnauthorizedTTL time.Duration
// WebhookRetryBackoff specifies the backoff parameters for the authorization webhook retry logic.
// This allows us to configure the sleep time at each iteration and the maximum number of retries allowed
// before we fail the webhook call in order to limit the fan out that ensues when the system is degraded.
WebhookRetryBackoff *wait.Backoff
}
// NewBuiltInAuthorizationOptions create a BuiltInAuthorizationOptions with default value
@ -46,6 +52,7 @@ func NewBuiltInAuthorizationOptions() *BuiltInAuthorizationOptions {
WebhookVersion: "v1beta1",
WebhookCacheAuthorizedTTL: 5 * time.Minute,
WebhookCacheUnauthorizedTTL: 30 * time.Second,
WebhookRetryBackoff: genericoptions.DefaultAuthWebhookRetryBackoff(),
}
}
@ -89,6 +96,10 @@ func (o *BuiltInAuthorizationOptions) Validate() []error {
allErrors = append(allErrors, fmt.Errorf("authorization-mode %q has mode specified more than once", o.Modes))
}
if o.WebhookRetryBackoff != nil && o.WebhookRetryBackoff.Steps <= 0 {
allErrors = append(allErrors, fmt.Errorf("number of webhook retry attempts must be greater than 1, but is: %d", o.WebhookRetryBackoff.Steps))
}
return allErrors
}
@ -127,5 +138,6 @@ func (o *BuiltInAuthorizationOptions) ToAuthorizationConfig(versionedInformerFac
WebhookCacheAuthorizedTTL: o.WebhookCacheAuthorizedTTL,
WebhookCacheUnauthorizedTTL: o.WebhookCacheUnauthorizedTTL,
VersionedInformerFactory: versionedInformerFactory,
WebhookRetryBackoff: o.WebhookRetryBackoff,
}
}

View File

@ -261,7 +261,8 @@ func NewImagePolicyWebhook(configFile io.Reader) (*Plugin, error) {
return nil, err
}
gw, err := webhook.NewGenericWebhook(legacyscheme.Scheme, legacyscheme.Codecs, whConfig.KubeConfigFile, groupVersions, whConfig.RetryBackoff, nil)
retryBackoff := webhook.DefaultRetryBackoffWithInitialDelay(whConfig.RetryBackoff)
gw, err := webhook.NewGenericWebhook(legacyscheme.Scheme, legacyscheme.Codecs, whConfig.KubeConfigFile, groupVersions, retryBackoff, nil)
if err != nil {
return nil, err
}

View File

@ -15,6 +15,7 @@ go_library(
importmap = "k8s.io/kubernetes/vendor/k8s.io/apiserver/pkg/authentication/authenticatorfactory",
importpath = "k8s.io/apiserver/pkg/authentication/authenticatorfactory",
deps = [
"//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/authentication/authenticator:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/authentication/group:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/authentication/request/anonymous:go_default_library",

View File

@ -22,6 +22,7 @@ import (
"github.com/go-openapi/spec"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/apiserver/pkg/authentication/authenticator"
"k8s.io/apiserver/pkg/authentication/group"
"k8s.io/apiserver/pkg/authentication/request/anonymous"
@ -43,6 +44,11 @@ type DelegatingAuthenticatorConfig struct {
// TokenAccessReviewClient is a client to do token review. It can be nil. Then every token is ignored.
TokenAccessReviewClient authenticationclient.TokenReviewInterface
// WebhookRetryBackoff specifies the backoff parameters for the authentication webhook retry logic.
// This allows us to configure the sleep time at each iteration and the maximum number of retries allowed
// before we fail the webhook call in order to limit the fan out that ensues when the system is degraded.
WebhookRetryBackoff *wait.Backoff
// CacheTTL is the length of time that a token authentication answer will be cached.
CacheTTL time.Duration
@ -79,7 +85,13 @@ func (c DelegatingAuthenticatorConfig) New() (authenticator.Request, *spec.Secur
}
if c.TokenAccessReviewClient != nil {
tokenAuth, err := webhooktoken.NewFromInterface(c.TokenAccessReviewClient, c.APIAudiences)
// Provide a default if WebhookRetryBackoff has not been set by the user.
retryBackoff := c.WebhookRetryBackoff
if retryBackoff == nil {
retryBackoff = webhooktoken.DefaultRetryBackoff()
}
tokenAuth, err := webhooktoken.NewFromInterface(c.TokenAccessReviewClient, c.APIAudiences, *retryBackoff)
if err != nil {
return nil, nil, err
}

View File

@ -25,6 +25,7 @@ go_library(
importmap = "k8s.io/kubernetes/vendor/k8s.io/apiserver/pkg/authorization/authorizerfactory",
importpath = "k8s.io/apiserver/pkg/authorization/authorizerfactory",
deps = [
"//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/authentication/user:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/authorization/authorizer:go_default_library",
"//staging/src/k8s.io/apiserver/plugin/pkg/authorizer/webhook:go_default_library",

View File

@ -19,6 +19,7 @@ package authorizerfactory
import (
"time"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/apiserver/pkg/authorization/authorizer"
"k8s.io/apiserver/plugin/pkg/authorizer/webhook"
authorizationclient "k8s.io/client-go/kubernetes/typed/authorization/v1"
@ -35,12 +36,24 @@ type DelegatingAuthorizerConfig struct {
// DenyCacheTTL is the length of time that an unsuccessful authorization response will be cached.
// You generally want more responsive, "deny, try again" flows.
DenyCacheTTL time.Duration
// WebhookRetryBackoff specifies the backoff parameters for the authorization webhook retry logic.
// This allows us to configure the sleep time at each iteration and the maximum number of retries allowed
// before we fail the webhook call in order to limit the fan out that ensues when the system is degraded.
WebhookRetryBackoff *wait.Backoff
}
func (c DelegatingAuthorizerConfig) New() (authorizer.Authorizer, error) {
// Provide a default if WebhookRetryBackoff has not been set by the user.
retryBackoff := c.WebhookRetryBackoff
if retryBackoff == nil {
retryBackoff = webhook.DefaultRetryBackoff()
}
return webhook.NewFromInterface(
c.SubjectAccessReviewClient,
c.AllowCacheTTL,
c.DenyCacheTTL,
*retryBackoff,
)
}

View File

@ -34,6 +34,7 @@ go_library(
"//staging/src/k8s.io/apimachinery/pkg/util/net:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/runtime:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/sets:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/admission:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/admission/initializer:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/admission/metrics:go_default_library",
@ -69,6 +70,7 @@ go_library(
"//staging/src/k8s.io/apiserver/pkg/storage/storagebackend/factory:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/util/feature:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/util/flowcontrol:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/util/webhook:go_default_library",
"//staging/src/k8s.io/apiserver/plugin/pkg/audit/buffered:go_default_library",
"//staging/src/k8s.io/apiserver/plugin/pkg/audit/log:go_default_library",
"//staging/src/k8s.io/apiserver/plugin/pkg/audit/truncate:go_default_library",

View File

@ -37,6 +37,7 @@ import (
"k8s.io/apiserver/pkg/audit/policy"
"k8s.io/apiserver/pkg/server"
"k8s.io/apiserver/pkg/server/egressselector"
"k8s.io/apiserver/pkg/util/webhook"
pluginbuffered "k8s.io/apiserver/plugin/pkg/audit/buffered"
pluginlog "k8s.io/apiserver/plugin/pkg/audit/log"
plugintruncate "k8s.io/apiserver/plugin/pkg/audit/truncate"
@ -154,7 +155,7 @@ type AuditDynamicOptions struct {
func NewAuditOptions() *AuditOptions {
return &AuditOptions{
WebhookOptions: AuditWebhookOptions{
InitialBackoff: pluginwebhook.DefaultInitialBackoff,
InitialBackoff: pluginwebhook.DefaultInitialBackoffDelay,
BatchOptions: AuditBatchOptions{
Mode: ModeBatch,
BatchConfig: defaultWebhookBatchConfig(),
@ -569,7 +570,7 @@ func (o *AuditWebhookOptions) enabled() bool {
// this is done so that the same trucate backend can wrap both the webhook and dynamic backends
func (o *AuditWebhookOptions) newUntruncatedBackend(customDial utilnet.DialFunc) (audit.Backend, error) {
groupVersion, _ := schema.ParseGroupVersion(o.GroupVersionString)
webhook, err := pluginwebhook.NewBackend(o.ConfigFile, groupVersion, o.InitialBackoff, customDial)
webhook, err := pluginwebhook.NewBackend(o.ConfigFile, groupVersion, webhook.DefaultRetryBackoffWithInitialDelay(o.InitialBackoff), customDial)
if err != nil {
return nil, fmt.Errorf("initializing audit webhook: %v", err)
}

View File

@ -26,6 +26,7 @@ import (
"github.com/spf13/pflag"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/apiserver/pkg/authentication/authenticatorfactory"
"k8s.io/apiserver/pkg/authentication/request/headerrequest"
"k8s.io/apiserver/pkg/server"
@ -36,6 +37,17 @@ import (
openapicommon "k8s.io/kube-openapi/pkg/common"
)
// DefaultAuthWebhookRetryBackoff is the default backoff parameters for
// both authentication and authorization webhook used by the apiserver.
func DefaultAuthWebhookRetryBackoff() *wait.Backoff {
return &wait.Backoff{
Duration: 500 * time.Millisecond,
Factor: 1.5,
Jitter: 0.2,
Steps: 5,
}
}
type RequestHeaderAuthenticationOptions struct {
// ClientCAFile is the root certificate bundle to verify client certificates on incoming requests
// before trusting usernames in headers.
@ -177,6 +189,11 @@ type DelegatingAuthenticationOptions struct {
// TolerateInClusterLookupFailure indicates failures to look up authentication configuration from the cluster configmap should not be fatal.
// Setting this can result in an authenticator that will reject all requests.
TolerateInClusterLookupFailure bool
// WebhookRetryBackoff specifies the backoff parameters for the authentication webhook retry logic.
// This allows us to configure the sleep time at each iteration and the maximum number of retries allowed
// before we fail the webhook call in order to limit the fan out that ensues when the system is degraded.
WebhookRetryBackoff *wait.Backoff
}
func NewDelegatingAuthenticationOptions() *DelegatingAuthenticationOptions {
@ -189,13 +206,23 @@ func NewDelegatingAuthenticationOptions() *DelegatingAuthenticationOptions {
GroupHeaders: []string{"x-remote-group"},
ExtraHeaderPrefixes: []string{"x-remote-extra-"},
},
WebhookRetryBackoff: DefaultAuthWebhookRetryBackoff(),
}
}
// WithCustomRetryBackoff sets the custom backoff parameters for the authentication webhook retry logic.
func (s *DelegatingAuthenticationOptions) WithCustomRetryBackoff(backoff wait.Backoff) {
s.WebhookRetryBackoff = &backoff
}
func (s *DelegatingAuthenticationOptions) Validate() []error {
allErrors := []error{}
allErrors = append(allErrors, s.RequestHeader.Validate()...)
if s.WebhookRetryBackoff != nil && s.WebhookRetryBackoff.Steps <= 0 {
allErrors = append(allErrors, fmt.Errorf("number of webhook retry attempts must be greater than 1, but is: %d", s.WebhookRetryBackoff.Steps))
}
return allErrors
}
@ -233,8 +260,9 @@ func (s *DelegatingAuthenticationOptions) ApplyTo(authenticationInfo *server.Aut
}
cfg := authenticatorfactory.DelegatingAuthenticatorConfig{
Anonymous: true,
CacheTTL: s.CacheTTL,
Anonymous: true,
CacheTTL: s.CacheTTL,
WebhookRetryBackoff: s.WebhookRetryBackoff,
}
client, err := s.getClient()

View File

@ -23,6 +23,7 @@ import (
"github.com/spf13/pflag"
"k8s.io/klog/v2"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/apiserver/pkg/authorization/authorizer"
"k8s.io/apiserver/pkg/authorization/authorizerfactory"
"k8s.io/apiserver/pkg/authorization/path"
@ -63,14 +64,20 @@ type DelegatingAuthorizationOptions struct {
// ClientTimeout specifies a time limit for requests made by SubjectAccessReviews client.
// The default value is set to 10 seconds.
ClientTimeout time.Duration
// WebhookRetryBackoff specifies the backoff parameters for the authorization webhook retry logic.
// This allows us to configure the sleep time at each iteration and the maximum number of retries allowed
// before we fail the webhook call in order to limit the fan out that ensues when the system is degraded.
WebhookRetryBackoff *wait.Backoff
}
func NewDelegatingAuthorizationOptions() *DelegatingAuthorizationOptions {
return &DelegatingAuthorizationOptions{
// very low for responsiveness, but high enough to handle storms
AllowCacheTTL: 10 * time.Second,
DenyCacheTTL: 10 * time.Second,
ClientTimeout: 10 * time.Second,
AllowCacheTTL: 10 * time.Second,
DenyCacheTTL: 10 * time.Second,
ClientTimeout: 10 * time.Second,
WebhookRetryBackoff: DefaultAuthWebhookRetryBackoff(),
}
}
@ -91,8 +98,18 @@ func (s *DelegatingAuthorizationOptions) WithClientTimeout(timeout time.Duration
s.ClientTimeout = timeout
}
// WithCustomRetryBackoff sets the custom backoff parameters for the authorization webhook retry logic.
func (s *DelegatingAuthorizationOptions) WithCustomRetryBackoff(backoff wait.Backoff) {
s.WebhookRetryBackoff = &backoff
}
func (s *DelegatingAuthorizationOptions) Validate() []error {
allErrors := []error{}
if s.WebhookRetryBackoff != nil && s.WebhookRetryBackoff.Steps <= 0 {
allErrors = append(allErrors, fmt.Errorf("number of webhook retry attempts must be greater than 1, but is: %d", s.WebhookRetryBackoff.Steps))
}
return allErrors
}
@ -159,6 +176,7 @@ func (s *DelegatingAuthorizationOptions) toAuthorizer(client kubernetes.Interfac
SubjectAccessReviewClient: client.AuthorizationV1().SubjectAccessReviews(),
AllowCacheTTL: s.AllowCacheTTL,
DenyCacheTTL: s.DenyCacheTTL,
WebhookRetryBackoff: s.WebhookRetryBackoff,
}
delegatedAuthorizer, err := cfg.New()
if err != nil {

View File

@ -52,6 +52,7 @@ go_test(
"//staging/src/k8s.io/apimachinery/pkg/runtime:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/runtime/schema:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/diff:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library",
"//staging/src/k8s.io/client-go/kubernetes/scheme:go_default_library",
"//staging/src/k8s.io/client-go/rest:go_default_library",
"//staging/src/k8s.io/client-go/tools/clientcmd/api:go_default_library",

View File

@ -36,12 +36,23 @@ import (
// timeout of the HTTP request, including reading the response body.
const defaultRequestTimeout = 30 * time.Second
// DefaultRetryBackoffWithInitialDelay returns the default backoff parameters for webhook retry from a given initial delay.
// Handy for the client that provides a custom initial delay only.
func DefaultRetryBackoffWithInitialDelay(initialBackoffDelay time.Duration) wait.Backoff {
return wait.Backoff{
Duration: initialBackoffDelay,
Factor: 1.5,
Jitter: 0.2,
Steps: 5,
}
}
// GenericWebhook defines a generic client for webhooks with commonly used capabilities,
// such as retry requests.
type GenericWebhook struct {
RestClient *rest.RESTClient
InitialBackoff time.Duration
ShouldRetry func(error) bool
RestClient *rest.RESTClient
RetryBackoff wait.Backoff
ShouldRetry func(error) bool
}
// DefaultShouldRetry is a default implementation for the GenericWebhook ShouldRetry function property.
@ -61,11 +72,11 @@ func DefaultShouldRetry(err error) bool {
}
// NewGenericWebhook creates a new GenericWebhook from the provided kubeconfig file.
func NewGenericWebhook(scheme *runtime.Scheme, codecFactory serializer.CodecFactory, kubeConfigFile string, groupVersions []schema.GroupVersion, initialBackoff time.Duration, customDial utilnet.DialFunc) (*GenericWebhook, error) {
return newGenericWebhook(scheme, codecFactory, kubeConfigFile, groupVersions, initialBackoff, defaultRequestTimeout, customDial)
func NewGenericWebhook(scheme *runtime.Scheme, codecFactory serializer.CodecFactory, kubeConfigFile string, groupVersions []schema.GroupVersion, retryBackoff wait.Backoff, customDial utilnet.DialFunc) (*GenericWebhook, error) {
return newGenericWebhook(scheme, codecFactory, kubeConfigFile, groupVersions, retryBackoff, defaultRequestTimeout, customDial)
}
func newGenericWebhook(scheme *runtime.Scheme, codecFactory serializer.CodecFactory, kubeConfigFile string, groupVersions []schema.GroupVersion, initialBackoff, requestTimeout time.Duration, customDial utilnet.DialFunc) (*GenericWebhook, error) {
func newGenericWebhook(scheme *runtime.Scheme, codecFactory serializer.CodecFactory, kubeConfigFile string, groupVersions []schema.GroupVersion, retryBackoff wait.Backoff, requestTimeout time.Duration, customDial utilnet.DialFunc) (*GenericWebhook, error) {
for _, groupVersion := range groupVersions {
if !scheme.IsVersionRegistered(groupVersion) {
return nil, fmt.Errorf("webhook plugin requires enabling extension resource: %s", groupVersion)
@ -102,19 +113,20 @@ func newGenericWebhook(scheme *runtime.Scheme, codecFactory serializer.CodecFact
return nil, err
}
return &GenericWebhook{restClient, initialBackoff, DefaultShouldRetry}, nil
return &GenericWebhook{restClient, retryBackoff, DefaultShouldRetry}, nil
}
// WithExponentialBackoff will retry webhookFn() up to 5 times with exponentially increasing backoff when
// it returns an error for which this GenericWebhook's ShouldRetry function returns true, confirming it to
// be retriable. If no ShouldRetry has been defined for the webhook, then the default one is used (DefaultShouldRetry).
// WithExponentialBackoff will retry webhookFn() as specified by the given backoff parameters with exponentially
// increasing backoff when it returns an error for which this GenericWebhook's ShouldRetry function returns true,
// confirming it to be retriable. If no ShouldRetry has been defined for the webhook,
// then the default one is used (DefaultShouldRetry).
func (g *GenericWebhook) WithExponentialBackoff(ctx context.Context, webhookFn func() rest.Result) rest.Result {
var result rest.Result
shouldRetry := g.ShouldRetry
if shouldRetry == nil {
shouldRetry = DefaultShouldRetry
}
WithExponentialBackoff(ctx, g.InitialBackoff, func() error {
WithExponentialBackoff(ctx, g.RetryBackoff, func() error {
result = webhookFn()
return result.Error()
}, shouldRetry)
@ -123,18 +135,11 @@ func (g *GenericWebhook) WithExponentialBackoff(ctx context.Context, webhookFn f
// WithExponentialBackoff will retry webhookFn up to 5 times with exponentially increasing backoff when
// it returns an error for which shouldRetry returns true, confirming it to be retriable.
func WithExponentialBackoff(ctx context.Context, initialBackoff time.Duration, webhookFn func() error, shouldRetry func(error) bool) error {
backoff := wait.Backoff{
Duration: initialBackoff,
Factor: 1.5,
Jitter: 0.2,
Steps: 5,
}
func WithExponentialBackoff(ctx context.Context, retryBackoff wait.Backoff, webhookFn func() error, shouldRetry func(error) bool) error {
// having a webhook error allows us to track the last actual webhook error for requests that
// are later cancelled or time out.
var webhookErr error
err := wait.ExponentialBackoffWithContext(ctx, backoff, func() (bool, error) {
err := wait.ExponentialBackoffWithContext(ctx, retryBackoff, func() (bool, error) {
webhookErr = webhookFn()
if shouldRetry(webhookErr) {
return false, nil

View File

@ -36,6 +36,7 @@ import (
apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/kubernetes/scheme"
"k8s.io/client-go/rest"
v1 "k8s.io/client-go/tools/clientcmd/api/v1"
@ -69,7 +70,7 @@ var (
Name: "test-cluster",
}
groupVersions = []schema.GroupVersion{}
retryBackoff = time.Duration(500) * time.Millisecond
retryBackoff = DefaultRetryBackoffWithInitialDelay(time.Duration(500) * time.Millisecond)
)
// TestKubeConfigFile ensures that a kube config file, regardless of validity, is handled properly
@ -670,7 +671,8 @@ func TestWithExponentialBackoffContextIsAlreadyCanceled(t *testing.T) {
cancel()
// We don't expect the webhook function to be called since the context is already canceled.
err := WithExponentialBackoff(ctx, time.Millisecond, webhookFunc, alwaysRetry)
retryBackoff := wait.Backoff{Steps: 5}
err := WithExponentialBackoff(ctx, retryBackoff, webhookFunc, alwaysRetry)
errExpected := fmt.Errorf("webhook call failed: %s", context.Canceled)
if errExpected.Error() != err.Error() {
@ -699,7 +701,8 @@ func TestWithExponentialBackoffWebhookErrorIsMostImportant(t *testing.T) {
}
// webhook err has higher priority than ctx error. we expect the webhook error to be returned.
err := WithExponentialBackoff(ctx, time.Millisecond, webhookFunc, alwaysRetry)
retryBackoff := wait.Backoff{Steps: 5}
err := WithExponentialBackoff(ctx, retryBackoff, webhookFunc, alwaysRetry)
if attemptsGot != 1 {
t.Errorf("expected %d webhook attempts, but got: %d", 1, attemptsGot)
@ -708,3 +711,56 @@ func TestWithExponentialBackoffWebhookErrorIsMostImportant(t *testing.T) {
t.Errorf("expected error: %v, but got: %v", errExpected, err)
}
}
func TestWithExponentialBackoffParametersNotSet(t *testing.T) {
alwaysRetry := func(e error) bool {
return true
}
attemptsGot := 0
webhookFunc := func() error {
attemptsGot++
return nil
}
err := WithExponentialBackoff(context.TODO(), wait.Backoff{}, webhookFunc, alwaysRetry)
errExpected := fmt.Errorf("webhook call failed: %s", wait.ErrWaitTimeout)
if errExpected.Error() != err.Error() {
t.Errorf("expected error: %v, but got: %v", errExpected, err)
}
if attemptsGot != 0 {
t.Errorf("expected %d webhook attempts, but got: %d", 0, attemptsGot)
}
}
func TestGenericWebhookWithExponentialBackoff(t *testing.T) {
attemptsPerCallExpected := 5
webhook := &GenericWebhook{
RetryBackoff: wait.Backoff{
Duration: time.Millisecond,
Factor: 1.5,
Jitter: 0.2,
Steps: attemptsPerCallExpected,
},
ShouldRetry: func(e error) bool {
return true
},
}
attemptsGot := 0
webhookFunc := func() rest.Result {
attemptsGot++
return rest.Result{}
}
// number of retries should always be local to each call.
totalAttemptsExpected := attemptsPerCallExpected * 2
webhook.WithExponentialBackoff(context.TODO(), webhookFunc)
webhook.WithExponentialBackoff(context.TODO(), webhookFunc)
if totalAttemptsExpected != attemptsGot {
t.Errorf("expected a total of %d webhook attempts but got: %d", totalAttemptsExpected, attemptsGot)
}
}

View File

@ -14,6 +14,7 @@ go_test(
"//staging/src/k8s.io/apimachinery/pkg/runtime:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/runtime/schema:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/runtime/serializer/json:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/apis/audit:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/apis/audit/v1:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/apis/audit/v1beta1:go_default_library",
@ -32,6 +33,7 @@ go_library(
deps = [
"//staging/src/k8s.io/apimachinery/pkg/runtime/schema:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/net:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/apis/audit:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/apis/audit/install:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/audit:go_default_library",

View File

@ -24,6 +24,7 @@ import (
"k8s.io/apimachinery/pkg/runtime/schema"
utilnet "k8s.io/apimachinery/pkg/util/net"
"k8s.io/apimachinery/pkg/util/wait"
auditinternal "k8s.io/apiserver/pkg/apis/audit"
"k8s.io/apiserver/pkg/apis/audit/install"
"k8s.io/apiserver/pkg/audit"
@ -36,9 +37,9 @@ const (
// PluginName is the name of this plugin, to be used in help and logs.
PluginName = "webhook"
// DefaultInitialBackoff is the default amount of time to wait before
// DefaultInitialBackoffDelay is the default amount of time to wait before
// retrying sending audit events through a webhook.
DefaultInitialBackoff = 10 * time.Second
DefaultInitialBackoffDelay = 10 * time.Second
)
func init() {
@ -61,9 +62,9 @@ func retryOnError(err error) bool {
return false
}
func loadWebhook(configFile string, groupVersion schema.GroupVersion, initialBackoff time.Duration, customDial utilnet.DialFunc) (*webhook.GenericWebhook, error) {
func loadWebhook(configFile string, groupVersion schema.GroupVersion, retryBackoff wait.Backoff, customDial utilnet.DialFunc) (*webhook.GenericWebhook, error) {
w, err := webhook.NewGenericWebhook(audit.Scheme, audit.Codecs, configFile,
[]schema.GroupVersion{groupVersion}, initialBackoff, customDial)
[]schema.GroupVersion{groupVersion}, retryBackoff, customDial)
if err != nil {
return nil, err
}
@ -79,20 +80,20 @@ type backend struct {
// NewDynamicBackend returns an audit backend configured from a REST client that
// sends events over HTTP to an external service.
func NewDynamicBackend(rc *rest.RESTClient, initialBackoff time.Duration) audit.Backend {
func NewDynamicBackend(rc *rest.RESTClient, retryBackoff wait.Backoff) audit.Backend {
return &backend{
w: &webhook.GenericWebhook{
RestClient: rc,
InitialBackoff: initialBackoff,
ShouldRetry: retryOnError,
RestClient: rc,
RetryBackoff: retryBackoff,
ShouldRetry: retryOnError,
},
name: fmt.Sprintf("dynamic_%s", PluginName),
}
}
// NewBackend returns an audit backend that sends events over HTTP to an external service.
func NewBackend(kubeConfigFile string, groupVersion schema.GroupVersion, initialBackoff time.Duration, customDial utilnet.DialFunc) (audit.Backend, error) {
w, err := loadWebhook(kubeConfigFile, groupVersion, initialBackoff, customDial)
func NewBackend(kubeConfigFile string, groupVersion schema.GroupVersion, retryBackoff wait.Backoff, customDial utilnet.DialFunc) (audit.Backend, error) {
w, err := loadWebhook(kubeConfigFile, groupVersion, retryBackoff, customDial)
if err != nil {
return nil, err
}

View File

@ -26,6 +26,7 @@ import (
"os"
"reflect"
"testing"
"time"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
@ -33,6 +34,7 @@ import (
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/runtime/serializer/json"
"k8s.io/apimachinery/pkg/util/wait"
auditinternal "k8s.io/apiserver/pkg/apis/audit"
auditv1 "k8s.io/apiserver/pkg/apis/audit/v1"
auditv1beta1 "k8s.io/apiserver/pkg/apis/audit/v1beta1"
@ -106,7 +108,13 @@ func newWebhook(t *testing.T, endpoint string, groupVersion schema.GroupVersion)
// NOTE(ericchiang): Do we need to use a proper serializer?
require.NoError(t, stdjson.NewEncoder(f).Encode(config), "writing kubeconfig")
b, err := NewBackend(f.Name(), groupVersion, DefaultInitialBackoff, nil)
retryBackoff := wait.Backoff{
Duration: 500 * time.Millisecond,
Factor: 1.5,
Jitter: 0.2,
Steps: 5,
}
b, err := NewBackend(f.Name(), groupVersion, retryBackoff, nil)
require.NoError(t, err, "initializing backend")
return b.(*backend)

View File

@ -20,6 +20,7 @@ go_test(
"//staging/src/k8s.io/api/authentication/v1beta1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/diff:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/authentication/authenticator:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/authentication/token/cache:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/authentication/user:go_default_library",
@ -40,6 +41,7 @@ go_library(
"//staging/src/k8s.io/apimachinery/pkg/runtime:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/runtime/schema:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/net:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/authentication/authenticator:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/authentication/user:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/util/webhook:go_default_library",

View File

@ -29,6 +29,7 @@ import (
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
utilnet "k8s.io/apimachinery/pkg/util/net"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/apiserver/pkg/authentication/authenticator"
"k8s.io/apiserver/pkg/authentication/user"
"k8s.io/apiserver/pkg/util/webhook"
@ -37,7 +38,11 @@ import (
"k8s.io/klog/v2"
)
const retryBackoff = 500 * time.Millisecond
// DefaultRetryBackoff returns the default backoff parameters for webhook retry.
func DefaultRetryBackoff() *wait.Backoff {
backoff := webhook.DefaultRetryBackoffWithInitialDelay(500 * time.Millisecond)
return &backoff
}
// Ensure WebhookTokenAuthenticator implements the authenticator.Token interface.
var _ authenticator.Token = (*WebhookTokenAuthenticator)(nil)
@ -47,16 +52,16 @@ type tokenReviewer interface {
}
type WebhookTokenAuthenticator struct {
tokenReview tokenReviewer
initialBackoff time.Duration
implicitAuds authenticator.Audiences
tokenReview tokenReviewer
retryBackoff wait.Backoff
implicitAuds authenticator.Audiences
}
// NewFromInterface creates a webhook authenticator using the given tokenReview
// client. It is recommend to wrap this authenticator with the token cache
// authenticator implemented in
// k8s.io/apiserver/pkg/authentication/token/cache.
func NewFromInterface(tokenReview authenticationv1client.TokenReviewInterface, implicitAuds authenticator.Audiences) (*WebhookTokenAuthenticator, error) {
func NewFromInterface(tokenReview authenticationv1client.TokenReviewInterface, implicitAuds authenticator.Audiences, retryBackoff wait.Backoff) (*WebhookTokenAuthenticator, error) {
return newWithBackoff(tokenReview, retryBackoff, implicitAuds)
}
@ -64,8 +69,8 @@ func NewFromInterface(tokenReview authenticationv1client.TokenReviewInterface, i
// file. It is recommend to wrap this authenticator with the token cache
// authenticator implemented in
// k8s.io/apiserver/pkg/authentication/token/cache.
func New(kubeConfigFile string, version string, implicitAuds authenticator.Audiences, customDial utilnet.DialFunc) (*WebhookTokenAuthenticator, error) {
tokenReview, err := tokenReviewInterfaceFromKubeconfig(kubeConfigFile, version, customDial)
func New(kubeConfigFile string, version string, implicitAuds authenticator.Audiences, retryBackoff wait.Backoff, customDial utilnet.DialFunc) (*WebhookTokenAuthenticator, error) {
tokenReview, err := tokenReviewInterfaceFromKubeconfig(kubeConfigFile, version, retryBackoff, customDial)
if err != nil {
return nil, err
}
@ -73,8 +78,8 @@ func New(kubeConfigFile string, version string, implicitAuds authenticator.Audie
}
// newWithBackoff allows tests to skip the sleep.
func newWithBackoff(tokenReview tokenReviewer, initialBackoff time.Duration, implicitAuds authenticator.Audiences) (*WebhookTokenAuthenticator, error) {
return &WebhookTokenAuthenticator{tokenReview, initialBackoff, implicitAuds}, nil
func newWithBackoff(tokenReview tokenReviewer, retryBackoff wait.Backoff, implicitAuds authenticator.Audiences) (*WebhookTokenAuthenticator, error) {
return &WebhookTokenAuthenticator{tokenReview, retryBackoff, implicitAuds}, nil
}
// AuthenticateToken implements the authenticator.Token interface.
@ -102,7 +107,7 @@ func (w *WebhookTokenAuthenticator) AuthenticateToken(ctx context.Context, token
err error
auds authenticator.Audiences
)
webhook.WithExponentialBackoff(ctx, w.initialBackoff, func() error {
webhook.WithExponentialBackoff(ctx, w.retryBackoff, func() error {
result, err = w.tokenReview.Create(ctx, r, metav1.CreateOptions{})
return err
}, webhook.DefaultShouldRetry)
@ -154,7 +159,7 @@ func (w *WebhookTokenAuthenticator) AuthenticateToken(ctx context.Context, token
// tokenReviewInterfaceFromKubeconfig builds a client from the specified kubeconfig file,
// and returns a TokenReviewInterface that uses that client. Note that the client submits TokenReview
// requests to the exact path specified in the kubeconfig file, so arbitrary non-API servers can be targeted.
func tokenReviewInterfaceFromKubeconfig(kubeConfigFile string, version string, customDial utilnet.DialFunc) (tokenReviewer, error) {
func tokenReviewInterfaceFromKubeconfig(kubeConfigFile string, version string, retryBackoff wait.Backoff, customDial utilnet.DialFunc) (tokenReviewer, error) {
localScheme := runtime.NewScheme()
if err := scheme.AddToScheme(localScheme); err != nil {
return nil, err
@ -166,7 +171,7 @@ func tokenReviewInterfaceFromKubeconfig(kubeConfigFile string, version string, c
if err := localScheme.SetVersionPriority(groupVersions...); err != nil {
return nil, err
}
gw, err := webhook.NewGenericWebhook(localScheme, scheme.Codecs, kubeConfigFile, groupVersions, 0, customDial)
gw, err := webhook.NewGenericWebhook(localScheme, scheme.Codecs, kubeConfigFile, groupVersions, retryBackoff, customDial)
if err != nil {
return nil, err
}
@ -177,7 +182,7 @@ func tokenReviewInterfaceFromKubeconfig(kubeConfigFile string, version string, c
if err := localScheme.SetVersionPriority(groupVersions...); err != nil {
return nil, err
}
gw, err := webhook.NewGenericWebhook(localScheme, scheme.Codecs, kubeConfigFile, groupVersions, 0, customDial)
gw, err := webhook.NewGenericWebhook(localScheme, scheme.Codecs, kubeConfigFile, groupVersions, retryBackoff, customDial)
if err != nil {
return nil, err
}

View File

@ -33,12 +33,20 @@ import (
authenticationv1 "k8s.io/api/authentication/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/apiserver/pkg/authentication/authenticator"
"k8s.io/apiserver/pkg/authentication/token/cache"
"k8s.io/apiserver/pkg/authentication/user"
v1 "k8s.io/client-go/tools/clientcmd/api/v1"
)
var testRetryBackoff = wait.Backoff{
Duration: 5 * time.Millisecond,
Factor: 1.5,
Jitter: 0.2,
Steps: 5,
}
// V1Service mocks a remote authentication service.
type V1Service interface {
// Review looks at the TokenReviewSpec and provides an authentication
@ -193,12 +201,12 @@ func newV1TokenAuthenticator(serverURL string, clientCert, clientKey, ca []byte,
return nil, err
}
c, err := tokenReviewInterfaceFromKubeconfig(p, "v1", nil)
c, err := tokenReviewInterfaceFromKubeconfig(p, "v1", testRetryBackoff, nil)
if err != nil {
return nil, err
}
authn, err := newWithBackoff(c, 0, implicitAuds)
authn, err := newWithBackoff(c, testRetryBackoff, implicitAuds)
if err != nil {
return nil, err
}

View File

@ -195,12 +195,12 @@ func newV1beta1TokenAuthenticator(serverURL string, clientCert, clientKey, ca []
return nil, err
}
c, err := tokenReviewInterfaceFromKubeconfig(p, "v1beta1", nil)
c, err := tokenReviewInterfaceFromKubeconfig(p, "v1beta1", testRetryBackoff, nil)
if err != nil {
return nil, err
}
authn, err := newWithBackoff(c, 0, implicitAuds)
authn, err := newWithBackoff(c, testRetryBackoff, implicitAuds)
if err != nil {
return nil, err
}

View File

@ -20,6 +20,7 @@ go_test(
"//staging/src/k8s.io/api/authorization/v1beta1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/diff:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/authentication/user:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/authorization/authorizer:go_default_library",
"//staging/src/k8s.io/client-go/tools/clientcmd/api/v1:go_default_library",
@ -40,6 +41,7 @@ go_library(
"//staging/src/k8s.io/apimachinery/pkg/runtime/schema:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/cache:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/net:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/authentication/user:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/authorization/authorizer:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/util/webhook:go_default_library",

View File

@ -32,6 +32,7 @@ import (
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/util/cache"
utilnet "k8s.io/apimachinery/pkg/util/net"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/apiserver/pkg/authentication/user"
"k8s.io/apiserver/pkg/authorization/authorizer"
"k8s.io/apiserver/pkg/util/webhook"
@ -40,11 +41,16 @@ import (
)
const (
retryBackoff = 500 * time.Millisecond
// The maximum length of requester-controlled attributes to allow caching.
maxControlledAttrCacheSize = 10000
)
// DefaultRetryBackoff returns the default backoff parameters for webhook retry.
func DefaultRetryBackoff() *wait.Backoff {
backoff := webhook.DefaultRetryBackoffWithInitialDelay(500 * time.Millisecond)
return &backoff
}
// Ensure Webhook implements the authorizer.Authorizer interface.
var _ authorizer.Authorizer = (*WebhookAuthorizer)(nil)
@ -57,12 +63,12 @@ type WebhookAuthorizer struct {
responseCache *cache.LRUExpireCache
authorizedTTL time.Duration
unauthorizedTTL time.Duration
initialBackoff time.Duration
retryBackoff wait.Backoff
decisionOnError authorizer.Decision
}
// NewFromInterface creates a WebhookAuthorizer using the given subjectAccessReview client
func NewFromInterface(subjectAccessReview authorizationv1client.SubjectAccessReviewInterface, authorizedTTL, unauthorizedTTL time.Duration) (*WebhookAuthorizer, error) {
func NewFromInterface(subjectAccessReview authorizationv1client.SubjectAccessReviewInterface, authorizedTTL, unauthorizedTTL time.Duration, retryBackoff wait.Backoff) (*WebhookAuthorizer, error) {
return newWithBackoff(subjectAccessReview, authorizedTTL, unauthorizedTTL, retryBackoff)
}
@ -85,8 +91,8 @@ func NewFromInterface(subjectAccessReview authorizationv1client.SubjectAccessRev
//
// For additional HTTP configuration, refer to the kubeconfig documentation
// https://kubernetes.io/docs/user-guide/kubeconfig-file/.
func New(kubeConfigFile string, version string, authorizedTTL, unauthorizedTTL time.Duration, customDial utilnet.DialFunc) (*WebhookAuthorizer, error) {
subjectAccessReview, err := subjectAccessReviewInterfaceFromKubeconfig(kubeConfigFile, version, customDial)
func New(kubeConfigFile string, version string, authorizedTTL, unauthorizedTTL time.Duration, retryBackoff wait.Backoff, customDial utilnet.DialFunc) (*WebhookAuthorizer, error) {
subjectAccessReview, err := subjectAccessReviewInterfaceFromKubeconfig(kubeConfigFile, version, retryBackoff, customDial)
if err != nil {
return nil, err
}
@ -94,13 +100,13 @@ func New(kubeConfigFile string, version string, authorizedTTL, unauthorizedTTL t
}
// newWithBackoff allows tests to skip the sleep.
func newWithBackoff(subjectAccessReview subjectAccessReviewer, authorizedTTL, unauthorizedTTL, initialBackoff time.Duration) (*WebhookAuthorizer, error) {
func newWithBackoff(subjectAccessReview subjectAccessReviewer, authorizedTTL, unauthorizedTTL time.Duration, retryBackoff wait.Backoff) (*WebhookAuthorizer, error) {
return &WebhookAuthorizer{
subjectAccessReview: subjectAccessReview,
responseCache: cache.NewLRUExpireCache(8192),
authorizedTTL: authorizedTTL,
unauthorizedTTL: unauthorizedTTL,
initialBackoff: initialBackoff,
retryBackoff: retryBackoff,
decisionOnError: authorizer.DecisionNoOpinion,
}, nil
}
@ -190,7 +196,7 @@ func (w *WebhookAuthorizer) Authorize(ctx context.Context, attr authorizer.Attri
result *authorizationv1.SubjectAccessReview
err error
)
webhook.WithExponentialBackoff(ctx, w.initialBackoff, func() error {
webhook.WithExponentialBackoff(ctx, w.retryBackoff, func() error {
result, err = w.subjectAccessReview.Create(ctx, r, metav1.CreateOptions{})
return err
}, webhook.DefaultShouldRetry)
@ -246,7 +252,7 @@ func convertToSARExtra(extra map[string][]string) map[string]authorizationv1.Ext
// subjectAccessReviewInterfaceFromKubeconfig builds a client from the specified kubeconfig file,
// and returns a SubjectAccessReviewInterface that uses that client. Note that the client submits SubjectAccessReview
// requests to the exact path specified in the kubeconfig file, so arbitrary non-API servers can be targeted.
func subjectAccessReviewInterfaceFromKubeconfig(kubeConfigFile string, version string, customDial utilnet.DialFunc) (subjectAccessReviewer, error) {
func subjectAccessReviewInterfaceFromKubeconfig(kubeConfigFile string, version string, retryBackoff wait.Backoff, customDial utilnet.DialFunc) (subjectAccessReviewer, error) {
localScheme := runtime.NewScheme()
if err := scheme.AddToScheme(localScheme); err != nil {
return nil, err
@ -258,7 +264,7 @@ func subjectAccessReviewInterfaceFromKubeconfig(kubeConfigFile string, version s
if err := localScheme.SetVersionPriority(groupVersions...); err != nil {
return nil, err
}
gw, err := webhook.NewGenericWebhook(localScheme, scheme.Codecs, kubeConfigFile, groupVersions, 0, customDial)
gw, err := webhook.NewGenericWebhook(localScheme, scheme.Codecs, kubeConfigFile, groupVersions, retryBackoff, customDial)
if err != nil {
return nil, err
}
@ -269,7 +275,7 @@ func subjectAccessReviewInterfaceFromKubeconfig(kubeConfigFile string, version s
if err := localScheme.SetVersionPriority(groupVersions...); err != nil {
return nil, err
}
gw, err := webhook.NewGenericWebhook(localScheme, scheme.Codecs, kubeConfigFile, groupVersions, 0, customDial)
gw, err := webhook.NewGenericWebhook(localScheme, scheme.Codecs, kubeConfigFile, groupVersions, retryBackoff, customDial)
if err != nil {
return nil, err
}

View File

@ -37,11 +37,19 @@ import (
authorizationv1 "k8s.io/api/authorization/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/diff"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/apiserver/pkg/authentication/user"
"k8s.io/apiserver/pkg/authorization/authorizer"
v1 "k8s.io/client-go/tools/clientcmd/api/v1"
)
var testRetryBackoff = wait.Backoff{
Duration: 5 * time.Millisecond,
Factor: 1.5,
Jitter: 0.2,
Steps: 5,
}
func TestV1NewFromConfig(t *testing.T) {
dir, err := ioutil.TempDir("", "")
if err != nil {
@ -186,11 +194,11 @@ current-context: default
return fmt.Errorf("failed to execute test template: %v", err)
}
// Create a new authorizer
sarClient, err := subjectAccessReviewInterfaceFromKubeconfig(p, "v1", nil)
sarClient, err := subjectAccessReviewInterfaceFromKubeconfig(p, "v1", testRetryBackoff, nil)
if err != nil {
return fmt.Errorf("error building sar client: %v", err)
}
_, err = newWithBackoff(sarClient, 0, 0, 0)
_, err = newWithBackoff(sarClient, 0, 0, testRetryBackoff)
return err
}()
if err != nil && !tt.wantErr {
@ -325,11 +333,11 @@ func newV1Authorizer(callbackURL string, clientCert, clientKey, ca []byte, cache
if err := json.NewEncoder(tempfile).Encode(config); err != nil {
return nil, err
}
sarClient, err := subjectAccessReviewInterfaceFromKubeconfig(p, "v1", nil)
sarClient, err := subjectAccessReviewInterfaceFromKubeconfig(p, "v1", testRetryBackoff, nil)
if err != nil {
return nil, fmt.Errorf("error building sar client: %v", err)
}
return newWithBackoff(sarClient, cacheTime, cacheTime, 0)
return newWithBackoff(sarClient, cacheTime, cacheTime, testRetryBackoff)
}
func TestV1TLSConfig(t *testing.T) {

View File

@ -186,11 +186,11 @@ current-context: default
return fmt.Errorf("failed to execute test template: %v", err)
}
// Create a new authorizer
sarClient, err := subjectAccessReviewInterfaceFromKubeconfig(p, "v1beta1", nil)
sarClient, err := subjectAccessReviewInterfaceFromKubeconfig(p, "v1beta1", testRetryBackoff, nil)
if err != nil {
return fmt.Errorf("error building sar client: %v", err)
}
_, err = newWithBackoff(sarClient, 0, 0, 0)
_, err = newWithBackoff(sarClient, 0, 0, testRetryBackoff)
return err
}()
if err != nil && !tt.wantErr {
@ -325,11 +325,11 @@ func newV1beta1Authorizer(callbackURL string, clientCert, clientKey, ca []byte,
if err := json.NewEncoder(tempfile).Encode(config); err != nil {
return nil, err
}
sarClient, err := subjectAccessReviewInterfaceFromKubeconfig(p, "v1beta1", nil)
sarClient, err := subjectAccessReviewInterfaceFromKubeconfig(p, "v1beta1", testRetryBackoff, nil)
if err != nil {
return nil, fmt.Errorf("error building sar client: %v", err)
}
return newWithBackoff(sarClient, cacheTime, cacheTime, 0)
return newWithBackoff(sarClient, cacheTime, cacheTime, testRetryBackoff)
}
func TestV1beta1TLSConfig(t *testing.T) {

View File

@ -103,8 +103,9 @@ func TestDefaultFlags(t *testing.T) {
BindNetwork: "tcp",
}).WithLoopback(),
Authentication: &apiserveroptions.DelegatingAuthenticationOptions{
CacheTTL: 10 * time.Second,
ClientCert: apiserveroptions.ClientCertAuthenticationOptions{},
CacheTTL: 10 * time.Second,
WebhookRetryBackoff: apiserveroptions.DefaultAuthWebhookRetryBackoff(),
ClientCert: apiserveroptions.ClientCertAuthenticationOptions{},
RequestHeader: apiserveroptions.RequestHeaderAuthenticationOptions{
UsernameHeaders: []string{"x-remote-user"},
GroupHeaders: []string{"x-remote-group"},
@ -116,6 +117,7 @@ func TestDefaultFlags(t *testing.T) {
AllowCacheTTL: 10 * time.Second,
DenyCacheTTL: 10 * time.Second,
ClientTimeout: 10 * time.Second,
WebhookRetryBackoff: apiserveroptions.DefaultAuthWebhookRetryBackoff(),
RemoteKubeConfigFileOptional: true,
AlwaysAllowPaths: []string{"/healthz"}, // note: this does not match /healthz/ or
},
@ -236,8 +238,9 @@ func TestAddFlags(t *testing.T) {
BindNetwork: "tcp",
}).WithLoopback(),
Authentication: &apiserveroptions.DelegatingAuthenticationOptions{
CacheTTL: 10 * time.Second,
ClientCert: apiserveroptions.ClientCertAuthenticationOptions{},
CacheTTL: 10 * time.Second,
WebhookRetryBackoff: apiserveroptions.DefaultAuthWebhookRetryBackoff(),
ClientCert: apiserveroptions.ClientCertAuthenticationOptions{},
RequestHeader: apiserveroptions.RequestHeaderAuthenticationOptions{
UsernameHeaders: []string{"x-remote-user"},
GroupHeaders: []string{"x-remote-group"},
@ -249,6 +252,7 @@ func TestAddFlags(t *testing.T) {
AllowCacheTTL: 10 * time.Second,
DenyCacheTTL: 10 * time.Second,
ClientTimeout: 10 * time.Second,
WebhookRetryBackoff: apiserveroptions.DefaultAuthWebhookRetryBackoff(),
RemoteKubeConfigFileOptional: true,
AlwaysAllowPaths: []string{"/healthz"}, // note: this does not match /healthz/ or
},

View File

@ -38,6 +38,7 @@ import (
authenticationv1beta1 "k8s.io/api/authentication/v1beta1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
utilnet "k8s.io/apimachinery/pkg/util/net"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/apiserver/pkg/authentication/authenticator"
"k8s.io/apiserver/pkg/authentication/group"
"k8s.io/apiserver/pkg/authentication/request/bearertoken"
@ -86,7 +87,14 @@ func getTestWebhookTokenAuth(serverURL string, customDial utilnet.DialFunc) (aut
if err := json.NewEncoder(kubecfgFile).Encode(config); err != nil {
return nil, err
}
webhookTokenAuth, err := webhook.New(kubecfgFile.Name(), "v1beta1", nil, customDial)
retryBackoff := wait.Backoff{
Duration: 500 * time.Millisecond,
Factor: 1.5,
Jitter: 0.2,
Steps: 5,
}
webhookTokenAuth, err := webhook.New(kubecfgFile.Name(), "v1beta1", nil, retryBackoff, customDial)
if err != nil {
return nil, err
}