diff --git a/pkg/controlplane/apiserver/config.go b/pkg/controlplane/apiserver/config.go index 07a3260f1f5..35ec3db5936 100644 --- a/pkg/controlplane/apiserver/config.go +++ b/pkg/controlplane/apiserver/config.go @@ -17,6 +17,7 @@ limitations under the License. package apiserver import ( + "context" "fmt" "time" @@ -24,6 +25,7 @@ import ( "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/util/sets" + "k8s.io/apimachinery/pkg/util/wait" "k8s.io/apiserver/pkg/authorization/authorizer" "k8s.io/apiserver/pkg/endpoints/discovery/aggregated" openapinamer "k8s.io/apiserver/pkg/endpoints/openapi" @@ -148,7 +150,13 @@ func BuildGenericConfig( } var enablesRBAC bool - genericConfig.Authorization.Authorizer, genericConfig.RuleResolver, enablesRBAC, err = BuildAuthorizer(s, genericConfig.EgressSelector, versionedInformers) + genericConfig.Authorization.Authorizer, genericConfig.RuleResolver, enablesRBAC, err = BuildAuthorizer( + wait.ContextForChannel(genericConfig.ShutdownInitiatedNotify()), + s, + genericConfig.EgressSelector, + genericConfig.APIServerID, + versionedInformers, + ) if err != nil { lastErr = fmt.Errorf("invalid authorization config: %v", err) return @@ -170,7 +178,7 @@ func BuildGenericConfig( } // BuildAuthorizer constructs the authorizer. If authorization is not set in s, it returns nil, nil, false, nil -func BuildAuthorizer(s controlplaneapiserver.CompletedOptions, egressSelector *egressselector.EgressSelector, versionedInformers clientgoinformers.SharedInformerFactory) (authorizer.Authorizer, authorizer.RuleResolver, bool, error) { +func BuildAuthorizer(ctx context.Context, s controlplaneapiserver.CompletedOptions, egressSelector *egressselector.EgressSelector, apiserverID string, versionedInformers clientgoinformers.SharedInformerFactory) (authorizer.Authorizer, authorizer.RuleResolver, bool, error) { authorizationConfig, err := s.Authorization.ToAuthorizationConfig(versionedInformers) if err != nil { return nil, nil, false, err @@ -195,7 +203,7 @@ func BuildAuthorizer(s controlplaneapiserver.CompletedOptions, egressSelector *e } } - authorizer, ruleResolver, err := authorizationConfig.New() + authorizer, ruleResolver, err := authorizationConfig.New(ctx, apiserverID) return authorizer, ruleResolver, enablesRBAC, err } diff --git a/pkg/kubeapiserver/authorizer/config.go b/pkg/kubeapiserver/authorizer/config.go index 7c4b0e6f221..06bc0b9d373 100644 --- a/pkg/kubeapiserver/authorizer/config.go +++ b/pkg/kubeapiserver/authorizer/config.go @@ -17,8 +17,11 @@ limitations under the License. package authorizer import ( + "context" "fmt" + "os" "strings" + "time" utilerrors "k8s.io/apimachinery/pkg/util/errors" utilnet "k8s.io/apimachinery/pkg/util/net" @@ -56,6 +59,8 @@ type Config struct { // Optional field, custom dial function used to connect to webhook CustomDial utilnet.DialFunc + // ReloadFile holds the filename to reload authorization configuration from + ReloadFile string // AuthorizationConfiguration stores the configuration for the Authorizer chain // It will deprecate most of the above flags when GA AuthorizationConfiguration *authzconfig.AuthorizationConfiguration @@ -63,17 +68,25 @@ type Config struct { // New returns the right sort of union of multiple authorizer.Authorizer objects // based on the authorizationMode or an error. -func (config Config) New() (authorizer.Authorizer, authorizer.RuleResolver, error) { +// stopCh is used to shut down config reload goroutines when the server is shutting down. +func (config Config) New(ctx context.Context, serverID string) (authorizer.Authorizer, authorizer.RuleResolver, error) { if len(config.AuthorizationConfiguration.Authorizers) == 0 { return nil, nil, fmt.Errorf("at least one authorization mode must be passed") } r := &reloadableAuthorizerResolver{ - initialConfig: config, + initialConfig: config, + apiServerID: serverID, + lastLoadedConfig: config.AuthorizationConfiguration, + reloadInterval: time.Minute, } + seenTypes := sets.New[authzconfig.AuthorizerType]() + // Build and store authorizers which will persist across reloads for _, configuredAuthorizer := range config.AuthorizationConfiguration.Authorizers { + seenTypes.Insert(configuredAuthorizer.Type) + // Keep cases in sync with constant list in k8s.io/kubernetes/pkg/kubeapiserver/authorizer/modes/modes.go. switch configuredAuthorizer.Type { case authzconfig.AuthorizerType(modes.ModeNode): @@ -104,16 +117,25 @@ func (config Config) New() (authorizer.Authorizer, authorizer.RuleResolver, erro } } + // Require all non-webhook authorizer types to remain specified in the file on reload + seenTypes.Delete(authzconfig.TypeWebhook) + r.requireNonWebhookTypes = seenTypes + // Construct the authorizers / ruleResolvers for the given configuration authorizer, ruleResolver, err := r.newForConfig(r.initialConfig.AuthorizationConfiguration) if err != nil { return nil, nil, err } + r.current.Store(&authorizerResolver{ authorizer: authorizer, ruleResolver: ruleResolver, }) + if r.initialConfig.ReloadFile != "" { + go r.runReload(ctx) + } + return r, r, nil } @@ -127,10 +149,18 @@ func GetNameForAuthorizerMode(mode string) string { } func LoadAndValidateFile(configFile string, requireNonWebhookTypes sets.Set[authzconfig.AuthorizerType]) (*authzconfig.AuthorizationConfiguration, error) { - // load the file and check for errors - authorizationConfiguration, err := load.LoadFromFile(configFile) + data, err := os.ReadFile(configFile) if err != nil { - return nil, fmt.Errorf("failed to load AuthorizationConfiguration from file: %v", err) + return nil, err + } + return LoadAndValidateData(data, requireNonWebhookTypes) +} + +func LoadAndValidateData(data []byte, requireNonWebhookTypes sets.Set[authzconfig.AuthorizerType]) (*authzconfig.AuthorizationConfiguration, error) { + // load the file and check for errors + authorizationConfiguration, err := load.LoadFromData(data) + if err != nil { + return nil, fmt.Errorf("failed to load AuthorizationConfiguration from file: %w", err) } // validate the file and return any error diff --git a/pkg/kubeapiserver/authorizer/reload.go b/pkg/kubeapiserver/authorizer/reload.go index f18bffb10a4..af025b3da84 100644 --- a/pkg/kubeapiserver/authorizer/reload.go +++ b/pkg/kubeapiserver/authorizer/reload.go @@ -17,20 +17,29 @@ limitations under the License. package authorizer import ( + "bytes" "context" "errors" "fmt" + "os" + "reflect" + "sync" "sync/atomic" + "time" + "k8s.io/apimachinery/pkg/util/sets" authzconfig "k8s.io/apiserver/pkg/apis/apiserver" "k8s.io/apiserver/pkg/authentication/user" "k8s.io/apiserver/pkg/authorization/authorizer" "k8s.io/apiserver/pkg/authorization/authorizerfactory" "k8s.io/apiserver/pkg/authorization/union" + "k8s.io/apiserver/pkg/server/options/authorizationconfig/metrics" webhookutil "k8s.io/apiserver/pkg/util/webhook" "k8s.io/apiserver/plugin/pkg/authorizer/webhook" + "k8s.io/klog/v2" "k8s.io/kubernetes/pkg/auth/authorizer/abac" "k8s.io/kubernetes/pkg/kubeapiserver/authorizer/modes" + "k8s.io/kubernetes/pkg/util/filesystem" "k8s.io/kubernetes/plugin/pkg/auth/authorizer/node" "k8s.io/kubernetes/plugin/pkg/auth/authorizer/rbac" ) @@ -41,10 +50,19 @@ type reloadableAuthorizerResolver struct { // configuration file (dial function, backoff settings, etc) initialConfig Config + apiServerID string + + reloadInterval time.Duration + requireNonWebhookTypes sets.Set[authzconfig.AuthorizerType] + nodeAuthorizer *node.NodeAuthorizer rbacAuthorizer *rbac.RBACAuthorizer abacAuthorizer abac.PolicyList + lastLoadedLock sync.Mutex + lastLoadedConfig *authzconfig.AuthorizationConfiguration + lastReadData []byte + current atomic.Pointer[authorizerResolver] } @@ -142,3 +160,68 @@ func (r *reloadableAuthorizerResolver) newForConfig(authzConfig *authzconfig.Aut return union.New(authorizers...), union.NewRuleResolvers(ruleResolvers...), nil } + +// runReload starts checking the config file for changes and reloads the authorizer when it changes. +// Blocks until ctx is complete. +func (r *reloadableAuthorizerResolver) runReload(ctx context.Context) { + metrics.RegisterMetrics() + metrics.RecordAuthorizationConfigAutomaticReloadSuccess(r.apiServerID) + + filesystem.WatchUntil( + ctx, + r.reloadInterval, + r.initialConfig.ReloadFile, + func() { + r.checkFile(ctx) + }, + func(err error) { + klog.ErrorS(err, "watching authorization config file") + }, + ) +} + +func (r *reloadableAuthorizerResolver) checkFile(ctx context.Context) { + r.lastLoadedLock.Lock() + defer r.lastLoadedLock.Unlock() + + data, err := os.ReadFile(r.initialConfig.ReloadFile) + if err != nil { + klog.ErrorS(err, "reloading authorization config") + metrics.RecordAuthorizationConfigAutomaticReloadFailure(r.apiServerID) + return + } + if bytes.Equal(data, r.lastReadData) { + // no change + return + } + klog.InfoS("found new authorization config data") + r.lastReadData = data + + config, err := LoadAndValidateData(data, r.requireNonWebhookTypes) + if err != nil { + klog.ErrorS(err, "reloading authorization config") + metrics.RecordAuthorizationConfigAutomaticReloadFailure(r.apiServerID) + return + } + if reflect.DeepEqual(config, r.lastLoadedConfig) { + // no change + return + } + klog.InfoS("found new authorization config") + r.lastLoadedConfig = config + + authorizer, ruleResolver, err := r.newForConfig(config) + if err != nil { + klog.ErrorS(err, "reloading authorization config") + metrics.RecordAuthorizationConfigAutomaticReloadFailure(r.apiServerID) + return + } + klog.InfoS("constructed new authorizer") + + r.current.Store(&authorizerResolver{ + authorizer: authorizer, + ruleResolver: ruleResolver, + }) + klog.InfoS("reloaded authz config") + metrics.RecordAuthorizationConfigAutomaticReloadSuccess(r.apiServerID) +} diff --git a/pkg/kubeapiserver/options/authorization.go b/pkg/kubeapiserver/options/authorization.go index f1bee22f43c..dd50323eebe 100644 --- a/pkg/kubeapiserver/options/authorization.go +++ b/pkg/kubeapiserver/options/authorization.go @@ -249,6 +249,7 @@ func (o *BuiltInAuthorizationOptions) ToAuthorizationConfig(versionedInformerFac VersionedInformerFactory: versionedInformerFactory, WebhookRetryBackoff: o.WebhookRetryBackoff, + ReloadFile: o.AuthorizationConfigurationFile, AuthorizationConfiguration: authorizationConfiguration, }, nil } diff --git a/pkg/util/filesystem/watcher.go b/pkg/util/filesystem/watcher.go index 5141d97b1cb..cbbc83985de 100644 --- a/pkg/util/filesystem/watcher.go +++ b/pkg/util/filesystem/watcher.go @@ -17,6 +17,10 @@ limitations under the License. package filesystem import ( + "context" + "fmt" + "time" + "github.com/fsnotify/fsnotify" ) @@ -87,3 +91,126 @@ func (w *fsnotifyWatcher) Run() { } }() } + +type watchAddRemover interface { + Add(path string) error + Remove(path string) error +} +type noopWatcher struct{} + +func (noopWatcher) Add(path string) error { return nil } +func (noopWatcher) Remove(path string) error { return nil } + +// WatchUntil watches the specified path for changes and blocks until ctx is canceled. +// eventHandler() must be non-nil, and pollInterval must be greater than 0. +// eventHandler() is invoked whenever a change event is observed or pollInterval elapses. +// errorHandler() is invoked (if non-nil) whenever an error occurs initializing or watching the specified path. +// +// If path is a directory, only the directory and immediate children are watched. +// +// If path does not exist or cannot be watched, an error is passed to errorHandler() and eventHandler() is called at pollInterval. +// +// Multiple observed events may collapse to a single invocation of eventHandler(). +// +// eventHandler() is invoked immediately after successful initialization of the filesystem watch, +// in case the path changed concurrent with calling WatchUntil(). +func WatchUntil(ctx context.Context, pollInterval time.Duration, path string, eventHandler func(), errorHandler func(err error)) { + if pollInterval <= 0 { + panic(fmt.Errorf("pollInterval must be > 0")) + } + if eventHandler == nil { + panic(fmt.Errorf("eventHandler must be non-nil")) + } + if errorHandler == nil { + errorHandler = func(err error) {} + } + + // Initialize watcher, fall back to no-op + var ( + eventsCh chan fsnotify.Event + errorCh chan error + watcher watchAddRemover + ) + if w, err := fsnotify.NewWatcher(); err != nil { + errorHandler(fmt.Errorf("error creating file watcher, falling back to poll at interval %s: %w", pollInterval, err)) + watcher = noopWatcher{} + } else { + watcher = w + eventsCh = w.Events + errorCh = w.Errors + defer func() { + _ = w.Close() + }() + } + + // Initialize background poll + t := time.NewTicker(pollInterval) + defer t.Stop() + + attemptPeriodicRewatch := false + + // Start watching the path + if err := watcher.Add(path); err != nil { + errorHandler(err) + attemptPeriodicRewatch = true + } else { + // Invoke handle() at least once after successfully registering the listener, + // in case the file changed concurrent with calling WatchUntil. + eventHandler() + } + + for { + select { + case <-ctx.Done(): + return + + case <-t.C: + // Prioritize exiting if context is canceled + if ctx.Err() != nil { + return + } + + // Try to re-establish the watcher if we previously got a watch error + if attemptPeriodicRewatch { + _ = watcher.Remove(path) + if err := watcher.Add(path); err != nil { + errorHandler(err) + } else { + attemptPeriodicRewatch = false + } + } + + // Handle + eventHandler() + + case e := <-eventsCh: + // Prioritize exiting if context is canceled + if ctx.Err() != nil { + return + } + + // Try to re-establish the watcher for events which dropped the existing watch + if e.Name == path && (e.Has(fsnotify.Remove) || e.Has(fsnotify.Rename)) { + _ = watcher.Remove(path) + if err := watcher.Add(path); err != nil { + errorHandler(err) + attemptPeriodicRewatch = true + } + } + + // Handle + eventHandler() + + case err := <-errorCh: + // Prioritize exiting if context is canceled + if ctx.Err() != nil { + return + } + + // If the error occurs in response to calling watcher.Add, re-adding here could hot-loop. + // The periodic poll will attempt to re-establish the watch. + errorHandler(err) + attemptPeriodicRewatch = true + } + } +} diff --git a/staging/src/k8s.io/apiserver/pkg/server/config.go b/staging/src/k8s.io/apiserver/pkg/server/config.go index d42baab63bc..a48bee2c939 100644 --- a/staging/src/k8s.io/apiserver/pkg/server/config.go +++ b/staging/src/k8s.io/apiserver/pkg/server/config.go @@ -668,6 +668,11 @@ func (c *Config) DrainedNotify() <-chan struct{} { return c.lifecycleSignals.InFlightRequestsDrained.Signaled() } +// ShutdownInitiated returns a lifecycle signal of apiserver shutdown having been initiated. +func (c *Config) ShutdownInitiatedNotify() <-chan struct{} { + return c.lifecycleSignals.ShutdownInitiated.Signaled() +} + // Complete fills in any fields not set that are required to have valid data and can be derived // from other fields. If you're going to `ApplyOptions`, do that first. It's mutating the receiver. func (c *Config) Complete(informers informers.SharedInformerFactory) CompletedConfig { diff --git a/staging/src/k8s.io/apiserver/pkg/server/options/authorizationconfig/metrics/metrics.go b/staging/src/k8s.io/apiserver/pkg/server/options/authorizationconfig/metrics/metrics.go new file mode 100644 index 00000000000..09089348a76 --- /dev/null +++ b/staging/src/k8s.io/apiserver/pkg/server/options/authorizationconfig/metrics/metrics.go @@ -0,0 +1,101 @@ +/* +Copyright 2024 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package metrics + +import ( + "crypto/sha256" + "fmt" + "hash" + "sync" + + "k8s.io/component-base/metrics" + "k8s.io/component-base/metrics/legacyregistry" +) + +const ( + namespace = "apiserver" + subsystem = "authorization_config_controller" +) + +var ( + authorizationConfigAutomaticReloadsTotal = metrics.NewCounterVec( + &metrics.CounterOpts{ + Namespace: namespace, + Subsystem: subsystem, + Name: "automatic_reloads_total", + Help: "Total number of automatic reloads of authorization configuration split by status and apiserver identity.", + StabilityLevel: metrics.ALPHA, + }, + []string{"status", "apiserver_id_hash"}, + ) + + authorizationConfigAutomaticReloadLastTimestampSeconds = metrics.NewGaugeVec( + &metrics.GaugeOpts{ + Namespace: namespace, + Subsystem: subsystem, + Name: "automatic_reload_last_timestamp_seconds", + Help: "Timestamp of the last automatic reload of authorization configuration split by status and apiserver identity.", + StabilityLevel: metrics.ALPHA, + }, + []string{"status", "apiserver_id_hash"}, + ) +) + +var registerMetrics sync.Once +var hashPool *sync.Pool + +func RegisterMetrics() { + registerMetrics.Do(func() { + hashPool = &sync.Pool{ + New: func() interface{} { + return sha256.New() + }, + } + legacyregistry.MustRegister(authorizationConfigAutomaticReloadsTotal) + legacyregistry.MustRegister(authorizationConfigAutomaticReloadLastTimestampSeconds) + }) +} + +func ResetMetricsForTest() { + authorizationConfigAutomaticReloadsTotal.Reset() + authorizationConfigAutomaticReloadLastTimestampSeconds.Reset() + legacyregistry.Reset() +} + +func RecordAuthorizationConfigAutomaticReloadFailure(apiServerID string) { + apiServerIDHash := getHash(apiServerID) + authorizationConfigAutomaticReloadsTotal.WithLabelValues("failure", apiServerIDHash).Inc() + authorizationConfigAutomaticReloadLastTimestampSeconds.WithLabelValues("failure", apiServerIDHash).SetToCurrentTime() +} + +func RecordAuthorizationConfigAutomaticReloadSuccess(apiServerID string) { + apiServerIDHash := getHash(apiServerID) + authorizationConfigAutomaticReloadsTotal.WithLabelValues("success", apiServerIDHash).Inc() + authorizationConfigAutomaticReloadLastTimestampSeconds.WithLabelValues("success", apiServerIDHash).SetToCurrentTime() +} + +func getHash(data string) string { + if len(data) == 0 { + return "" + } + h := hashPool.Get().(hash.Hash) + h.Reset() + h.Write([]byte(data)) + dataHash := fmt.Sprintf("sha256:%x", h.Sum(nil)) + hashPool.Put(h) + return dataHash +} diff --git a/staging/src/k8s.io/apiserver/pkg/server/options/authorizationconfig/metrics/metrics_test.go b/staging/src/k8s.io/apiserver/pkg/server/options/authorizationconfig/metrics/metrics_test.go new file mode 100644 index 00000000000..1b7c7e0a653 --- /dev/null +++ b/staging/src/k8s.io/apiserver/pkg/server/options/authorizationconfig/metrics/metrics_test.go @@ -0,0 +1,109 @@ +/* +Copyright 2024 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package metrics + +import ( + "strings" + "testing" + + "k8s.io/component-base/metrics/legacyregistry" + "k8s.io/component-base/metrics/testutil" +) + +const ( + testAPIServerID = "testAPIServerID" + testAPIServerIDHash = "sha256:14f9d63e669337ac6bfda2e2162915ee6a6067743eddd4e5c374b572f951ff37" +) + +func TestRecordAuthorizationConfigAutomaticReloadFailure(t *testing.T) { + expectedValue := ` + # HELP apiserver_authorization_config_controller_automatic_reloads_total [ALPHA] Total number of automatic reloads of authorization configuration split by status and apiserver identity. + # TYPE apiserver_authorization_config_controller_automatic_reloads_total counter + apiserver_authorization_config_controller_automatic_reloads_total {apiserver_id_hash="sha256:14f9d63e669337ac6bfda2e2162915ee6a6067743eddd4e5c374b572f951ff37",status="failure"} 1 + ` + metrics := []string{ + namespace + "_" + subsystem + "_automatic_reloads_total", + } + + authorizationConfigAutomaticReloadsTotal.Reset() + RegisterMetrics() + + RecordAuthorizationConfigAutomaticReloadFailure(testAPIServerID) + if err := testutil.GatherAndCompare(legacyregistry.DefaultGatherer, strings.NewReader(expectedValue), metrics...); err != nil { + t.Fatal(err) + } +} + +func TestRecordAuthorizationConfigAutomaticReloadSuccess(t *testing.T) { + expectedValue := ` + # HELP apiserver_authorization_config_controller_automatic_reloads_total [ALPHA] Total number of automatic reloads of authorization configuration split by status and apiserver identity. + # TYPE apiserver_authorization_config_controller_automatic_reloads_total counter + apiserver_authorization_config_controller_automatic_reloads_total {apiserver_id_hash="sha256:14f9d63e669337ac6bfda2e2162915ee6a6067743eddd4e5c374b572f951ff37",status="success"} 1 + ` + metrics := []string{ + namespace + "_" + subsystem + "_automatic_reloads_total", + } + + authorizationConfigAutomaticReloadsTotal.Reset() + RegisterMetrics() + + RecordAuthorizationConfigAutomaticReloadSuccess(testAPIServerID) + if err := testutil.GatherAndCompare(legacyregistry.DefaultGatherer, strings.NewReader(expectedValue), metrics...); err != nil { + t.Fatal(err) + } +} + +func TestAuthorizationConfigAutomaticReloadLastTimestampSeconds(t *testing.T) { + testCases := []struct { + expectedValue string + resultLabel string + timestamp int64 + }{ + { + expectedValue: ` + # HELP apiserver_authorization_config_controller_automatic_reload_last_timestamp_seconds [ALPHA] Timestamp of the last automatic reload of authorization configuration split by status and apiserver identity. + # TYPE apiserver_authorization_config_controller_automatic_reload_last_timestamp_seconds gauge + apiserver_authorization_config_controller_automatic_reload_last_timestamp_seconds{apiserver_id_hash="sha256:14f9d63e669337ac6bfda2e2162915ee6a6067743eddd4e5c374b572f951ff37",status="failure"} 1.689101941e+09 + `, + resultLabel: "failure", + timestamp: 1689101941, + }, + { + expectedValue: ` + # HELP apiserver_authorization_config_controller_automatic_reload_last_timestamp_seconds [ALPHA] Timestamp of the last automatic reload of authorization configuration split by status and apiserver identity. + # TYPE apiserver_authorization_config_controller_automatic_reload_last_timestamp_seconds gauge + apiserver_authorization_config_controller_automatic_reload_last_timestamp_seconds{apiserver_id_hash="sha256:14f9d63e669337ac6bfda2e2162915ee6a6067743eddd4e5c374b572f951ff37",status="success"} 1.689101941e+09 + `, + resultLabel: "success", + timestamp: 1689101941, + }, + } + + metrics := []string{ + namespace + "_" + subsystem + "_automatic_reload_last_timestamp_seconds", + } + RegisterMetrics() + + for _, tc := range testCases { + authorizationConfigAutomaticReloadLastTimestampSeconds.Reset() + authorizationConfigAutomaticReloadLastTimestampSeconds.WithLabelValues(tc.resultLabel, testAPIServerIDHash).Set(float64(tc.timestamp)) + + if err := testutil.GatherAndCompare(legacyregistry.DefaultGatherer, strings.NewReader(tc.expectedValue), metrics...); err != nil { + t.Fatal(err) + } + } +} diff --git a/test/integration/auth/authz_config_test.go b/test/integration/auth/authz_config_test.go index b0ccc334998..5dd8d69c823 100644 --- a/test/integration/auth/authz_config_test.go +++ b/test/integration/auth/authz_config_test.go @@ -25,6 +25,8 @@ import ( "net/http/httptest" "os" "path/filepath" + "strconv" + "strings" "sync/atomic" "testing" "time" @@ -32,7 +34,9 @@ import ( authorizationv1 "k8s.io/api/authorization/v1" rbacv1 "k8s.io/api/rbac/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/util/wait" "k8s.io/apiserver/pkg/features" + authzmetrics "k8s.io/apiserver/pkg/server/options/authorizationconfig/metrics" utilfeature "k8s.io/apiserver/pkg/util/feature" clientset "k8s.io/client-go/kubernetes" "k8s.io/client-go/rest" @@ -113,6 +117,8 @@ authorizers: } func TestMultiWebhookAuthzConfig(t *testing.T) { + authzmetrics.ResetMetricsForTest() + defer authzmetrics.ResetMetricsForTest() defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.StructuredAuthorizationConfiguration, true)() dir := t.TempDir() @@ -235,14 +241,36 @@ users: t.Fatal(err) } + // returns an allow response when called + serverAllowReloadedCalled := atomic.Int32{} + serverAllowReloaded := httptest.NewTLSServer(http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) { + serverAllowReloadedCalled.Add(1) + sar := &authorizationv1.SubjectAccessReview{} + if err := json.NewDecoder(req.Body).Decode(sar); err != nil { + t.Error(err) + } + t.Log("serverAllowReloaded", sar) + sar.Status.Allowed = true + sar.Status.Reason = "allowed2 by webhook" + if err := json.NewEncoder(w).Encode(sar); err != nil { + t.Error(err) + } + })) + defer serverAllowReloaded.Close() + serverAllowReloadedKubeconfigName := filepath.Join(dir, "serverAllowReloaded.yaml") + if err := os.WriteFile(serverAllowReloadedKubeconfigName, []byte(fmt.Sprintf(kubeconfigTemplate, serverAllowReloaded.URL)), os.FileMode(0644)); err != nil { + t.Fatal(err) + } + resetCounts := func() { serverErrorCalled.Store(0) serverTimeoutCalled.Store(0) serverDenyCalled.Store(0) serverNoOpinionCalled.Store(0) serverAllowCalled.Store(0) + serverAllowReloadedCalled.Store(0) } - assertCounts := func(errorCount, timeoutCount, denyCount, noOpinionCount, allowCount int32) { + assertCounts := func(errorCount, timeoutCount, denyCount, noOpinionCount, allowCount, allowReloadedCount int32) { t.Helper() if e, a := errorCount, serverErrorCalled.Load(); e != a { t.Errorf("expected fail webhook calls: %d, got %d", e, a) @@ -259,6 +287,9 @@ users: if e, a := allowCount, serverAllowCalled.Load(); e != a { t.Errorf("expected allow webhook calls: %d, got %d", e, a) } + if e, a := allowReloadedCount, serverAllowReloadedCalled.Load(); e != a { + t.Errorf("expected allowReloaded webhook calls: %d, got %d", e, a) + } resetCounts() } @@ -274,6 +305,8 @@ authorizers: failurePolicy: Deny subjectAccessReviewVersion: v1 matchConditionSubjectAccessReviewVersion: v1 + authorizedTTL: 1ms + unauthorizedTTL: 1ms connectionInfo: type: KubeConfigFile kubeConfigFile: `+serverErrorKubeconfigName+` @@ -289,6 +322,8 @@ authorizers: failurePolicy: Deny subjectAccessReviewVersion: v1 matchConditionSubjectAccessReviewVersion: v1 + authorizedTTL: 1ms + unauthorizedTTL: 1ms connectionInfo: type: KubeConfigFile kubeConfigFile: `+serverTimeoutKubeconfigName+` @@ -304,6 +339,8 @@ authorizers: failurePolicy: NoOpinion subjectAccessReviewVersion: v1 matchConditionSubjectAccessReviewVersion: v1 + authorizedTTL: 1ms + unauthorizedTTL: 1ms connectionInfo: type: KubeConfigFile kubeConfigFile: `+serverDenyKubeconfigName+` @@ -317,6 +354,8 @@ authorizers: timeout: 5s failurePolicy: Deny subjectAccessReviewVersion: v1 + authorizedTTL: 1ms + unauthorizedTTL: 1ms connectionInfo: type: KubeConfigFile kubeConfigFile: `+serverNoOpinionKubeconfigName+` @@ -327,6 +366,8 @@ authorizers: timeout: 5s failurePolicy: Deny subjectAccessReviewVersion: v1 + authorizedTTL: 1ms + unauthorizedTTL: 1ms connectionInfo: type: KubeConfigFile kubeConfigFile: `+serverAllowKubeconfigName+` @@ -362,7 +403,7 @@ authorizers: t.Fatal("expected denied, got allowed") } else { t.Log(result.Status.Reason) - assertCounts(1, 0, 0, 0, 0) + assertCounts(1, 0, 0, 0, 0, 0) } // timeout webhook short circuits @@ -383,7 +424,7 @@ authorizers: t.Fatal("expected denied, got allowed") } else { t.Log(result.Status.Reason) - assertCounts(0, 1, 0, 0, 0) + assertCounts(0, 1, 0, 0, 0, 0) } // deny webhook short circuits @@ -404,7 +445,7 @@ authorizers: t.Fatal("expected denied, got allowed") } else { t.Log(result.Status.Reason) - assertCounts(0, 0, 1, 0, 0) + assertCounts(0, 0, 1, 0, 0, 0) } // no-opinion webhook passes through, allow webhook allows @@ -425,6 +466,231 @@ authorizers: t.Fatal("expected allowed, got denied") } else { t.Log(result.Status.Reason) - assertCounts(0, 0, 0, 1, 1) + assertCounts(0, 0, 0, 1, 1, 0) + } + + // check last loaded success/failure metric timestamps, ensure success is present, failure is not + initialReloadSuccess, initialReloadFailure, err := getReloadTimes(t, adminClient) + if err != nil { + t.Fatal(err) + } + if initialReloadSuccess == nil { + t.Fatal("expected success timestamp, got none") + } + if initialReloadFailure != nil { + t.Fatal("expected no failure timestamp, got one") + } + + // write bogus file + if err := os.WriteFile(configFileName, []byte(`apiVersion: apiserver.config.k8s.io`), os.FileMode(0644)); err != nil { + t.Fatal(err) + } + + // wait for failure timestamp > success timestamp + var reload1Success, reload1Failure *time.Time + err = wait.PollUntilContextTimeout(context.TODO(), time.Second, wait.ForeverTestTimeout, true, func(ctx context.Context) (bool, error) { + reload1Success, reload1Failure, err = getReloadTimes(t, adminClient) + if err != nil { + t.Fatal(err) + } + if reload1Success == nil { + t.Fatal("expected success timestamp, got none") + } + if !reload1Success.Equal(*initialReloadSuccess) { + t.Fatalf("success timestamp changed from initial success %s to %s unexpectedly", initialReloadSuccess.String(), reload1Success.String()) + } + if reload1Failure == nil { + t.Log("expected failure timestamp, got nil, retrying") + return false, nil + } + if !reload1Failure.After(*reload1Success) { + t.Fatalf("expected failure timestamp to be more recent than success timestamp, got %s <= %s", reload1Failure.String(), reload1Success.String()) + } + return true, nil + }) + if err != nil { + t.Fatal(err) + } + + // ensure authz still works + t.Log("checking allow") + if result, err := adminClient.AuthorizationV1().SubjectAccessReviews().Create(context.TODO(), &authorizationv1.SubjectAccessReview{Spec: authorizationv1.SubjectAccessReviewSpec{ + User: "alice", + ResourceAttributes: &authorizationv1.ResourceAttributes{ + Verb: "list", + Group: "", + Version: "v1", + Resource: "configmaps", + Namespace: "allow", + Name: "", + }, + }}, metav1.CreateOptions{}); err != nil { + t.Fatal(err) + } else if !result.Status.Allowed { + t.Fatal("expected allowed, got denied") + } else { + t.Log(result.Status.Reason) + assertCounts(0, 0, 0, 1, 1, 0) + } + + // write good config with different webhook + if err := os.WriteFile(configFileName, []byte(` +apiVersion: apiserver.config.k8s.io/v1alpha1 +kind: AuthorizationConfiguration +authorizers: +- type: Webhook + name: allowreloaded.example.com + webhook: + timeout: 5s + failurePolicy: Deny + subjectAccessReviewVersion: v1 + authorizedTTL: 1ms + unauthorizedTTL: 1ms + connectionInfo: + type: KubeConfigFile + kubeConfigFile: `+serverAllowReloadedKubeconfigName+` +`), os.FileMode(0644)); err != nil { + t.Fatal(err) + } + + // wait for success timestamp > reload1Failure timestamp + var reload2Success, reload2Failure *time.Time + err = wait.PollUntilContextTimeout(context.TODO(), time.Second, wait.ForeverTestTimeout, true, func(ctx context.Context) (bool, error) { + reload2Success, reload2Failure, err = getReloadTimes(t, adminClient) + if err != nil { + t.Fatal(err) + } + if reload2Failure == nil { + t.Log("expected failure timestamp, got nil, retrying") + return false, nil + } + if !reload2Failure.Equal(*reload1Failure) { + t.Fatalf("failure timestamp changed from reload1Failure %s to %s unexpectedly", reload1Failure.String(), reload2Failure.String()) + } + if reload2Success == nil { + t.Fatal("expected success timestamp, got none") + } + if reload2Success.Equal(*initialReloadSuccess) { + t.Log("success timestamp hasn't updated from initial success, retrying") + return false, nil + } + if !reload2Success.After(*reload2Failure) { + t.Fatalf("expected success timestamp to be more recent than failure, got %s <= %s", reload2Success.String(), reload2Failure.String()) + } + return true, nil + }) + if err != nil { + t.Fatal(err) + } + + // ensure authz still works, new webhook is called + t.Log("checking allow") + if result, err := adminClient.AuthorizationV1().SubjectAccessReviews().Create(context.TODO(), &authorizationv1.SubjectAccessReview{Spec: authorizationv1.SubjectAccessReviewSpec{ + User: "alice", + ResourceAttributes: &authorizationv1.ResourceAttributes{ + Verb: "list", + Group: "", + Version: "v1", + Resource: "configmaps", + Namespace: "allow", + Name: "", + }, + }}, metav1.CreateOptions{}); err != nil { + t.Fatal(err) + } else if !result.Status.Allowed { + t.Fatal("expected allowed, got denied") + } else { + t.Log(result.Status.Reason) + assertCounts(0, 0, 0, 0, 0, 1) + } + + // delete file (do this test last because it makes file watch fall back to one minute poll interval) + if err := os.Remove(configFileName); err != nil { + t.Fatal(err) + } + + // wait for failure timestamp > success timestamp + var reload3Success, reload3Failure *time.Time + err = wait.PollUntilContextTimeout(context.TODO(), time.Second, wait.ForeverTestTimeout, true, func(ctx context.Context) (bool, error) { + reload3Success, reload3Failure, err = getReloadTimes(t, adminClient) + if err != nil { + t.Fatal(err) + } + if reload3Success == nil { + t.Fatal("expected success timestamp, got none") + } + if !reload3Success.Equal(*reload2Success) { + t.Fatalf("success timestamp changed from %s to %s unexpectedly", reload2Success.String(), reload3Success.String()) + } + if reload3Failure == nil { + t.Log("expected failure timestamp, got nil, retrying") + return false, nil + } + if reload3Failure.Equal(*reload2Failure) { + t.Log("failure timestamp hasn't updated, retrying") + return false, nil + } + if !reload3Failure.After(*reload3Success) { + t.Fatalf("expected failure timestamp to be more recent than success, got %s <= %s", reload3Failure.String(), reload3Success.String()) + } + return true, nil + }) + if err != nil { + t.Fatal(err) + } + + // ensure authz still works, new webhook is called + t.Log("checking allow") + if result, err := adminClient.AuthorizationV1().SubjectAccessReviews().Create(context.TODO(), &authorizationv1.SubjectAccessReview{Spec: authorizationv1.SubjectAccessReviewSpec{ + User: "alice", + ResourceAttributes: &authorizationv1.ResourceAttributes{ + Verb: "list", + Group: "", + Version: "v1", + Resource: "configmaps", + Namespace: "allow", + Name: "", + }, + }}, metav1.CreateOptions{}); err != nil { + t.Fatal(err) + } else if !result.Status.Allowed { + t.Fatal("expected allowed, got denied") + } else { + t.Log(result.Status.Reason) + assertCounts(0, 0, 0, 0, 0, 1) } } + +func getReloadTimes(t *testing.T, client *clientset.Clientset) (*time.Time, *time.Time, error) { + data, err := client.RESTClient().Get().AbsPath("/metrics").DoRaw(context.TODO()) + + // apiserver_authorization_config_controller_automatic_reload_last_timestamp_seconds{apiserver_id_hash="sha256:4b86cfa719a83dd63a4dc6a9831edb2b59240d0f59cf215b2d51aacb3f5c395e",status="success"} 1.7002567356895502e+09 + // apiserver_authorization_config_controller_automatic_reload_last_timestamp_seconds{apiserver_id_hash="sha256:4b86cfa719a83dd63a4dc6a9831edb2b59240d0f59cf215b2d51aacb3f5c395e",status="failure"} 1.7002567356895502e+09 + if err != nil { + return nil, nil, err + } + + var success, failure *time.Time + for _, line := range strings.Split(string(data), "\n") { + if strings.HasPrefix(line, "apiserver_authorization_config_controller_automatic_reload_last_timestamp_seconds") { + t.Log(line) + values := strings.Split(line, " ") + value, err := strconv.ParseFloat(values[len(values)-1], 64) + if err != nil { + return nil, nil, err + } + seconds := int64(value) + nanoseconds := int64((value - float64(seconds)) * 1000000000) + tm := time.Unix(seconds, nanoseconds) + if strings.Contains(line, `"success"`) { + success = &tm + t.Log("success", success.String()) + } + if strings.Contains(line, `"failure"`) { + failure = &tm + t.Log("failure", failure.String()) + } + } + } + return success, failure, nil +} diff --git a/vendor/modules.txt b/vendor/modules.txt index 6cd78471b49..f786e462a46 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -1472,6 +1472,7 @@ k8s.io/apiserver/pkg/server/healthz k8s.io/apiserver/pkg/server/httplog k8s.io/apiserver/pkg/server/mux k8s.io/apiserver/pkg/server/options +k8s.io/apiserver/pkg/server/options/authorizationconfig/metrics k8s.io/apiserver/pkg/server/options/encryptionconfig k8s.io/apiserver/pkg/server/options/encryptionconfig/controller k8s.io/apiserver/pkg/server/options/encryptionconfig/metrics