From 3cfe3d048ff37c1c6994d131ed8557f3c8bddc8a Mon Sep 17 00:00:00 2001 From: Quan Tian Date: Tue, 3 Aug 2021 21:28:01 +0800 Subject: [PATCH] Improve dynamic cert file change detection DynamicFileCAContent and DynamicCertKeyPairContent used a periodic job to check whether the file content has changed, leading to 1 minute of delay in the worst case. This patch improves it by leveraging an fsnotify watcher. The content change will be reflected immediately. --- staging/src/k8s.io/apiserver/go.mod | 1 + .../dynamic_cafile_content.go | 64 ++++++++++++++--- .../dynamic_serving_content.go | 71 ++++++++++++++++--- .../apiserver/certreload/certreload_test.go | 43 +++++++++-- 4 files changed, 155 insertions(+), 24 deletions(-) diff --git a/staging/src/k8s.io/apiserver/go.mod b/staging/src/k8s.io/apiserver/go.mod index 10f5f8ddbd5..d5fac09abf0 100644 --- a/staging/src/k8s.io/apiserver/go.mod +++ b/staging/src/k8s.io/apiserver/go.mod @@ -11,6 +11,7 @@ require ( github.com/davecgh/go-spew v1.1.1 github.com/emicklei/go-restful v2.9.5+incompatible github.com/evanphx/json-patch v4.11.0+incompatible + github.com/fsnotify/fsnotify v1.4.9 github.com/go-openapi/jsonpointer v0.19.5 // indirect github.com/go-openapi/jsonreference v0.19.5 // indirect github.com/go-openapi/swag v0.19.14 // indirect diff --git a/staging/src/k8s.io/apiserver/pkg/server/dynamiccertificates/dynamic_cafile_content.go b/staging/src/k8s.io/apiserver/pkg/server/dynamiccertificates/dynamic_cafile_content.go index 15f3c1dad18..fb1515c182a 100644 --- a/staging/src/k8s.io/apiserver/pkg/server/dynamiccertificates/dynamic_cafile_content.go +++ b/staging/src/k8s.io/apiserver/pkg/server/dynamiccertificates/dynamic_cafile_content.go @@ -24,6 +24,7 @@ import ( "sync/atomic" "time" + "github.com/fsnotify/fsnotify" "k8s.io/client-go/util/cert" utilruntime "k8s.io/apimachinery/pkg/util/runtime" @@ -44,7 +45,7 @@ type ControllerRunner interface { Run(workers int, stopCh <-chan struct{}) } -// DynamicFileCAContent provies a 
CAContentProvider that can dynamically react to new file content +// DynamicFileCAContent provides a CAContentProvider that can dynamically react to new file content // It also fulfills the authenticator interface to provide verifyoptions type DynamicFileCAContent struct { name string @@ -147,7 +148,7 @@ func (c *DynamicFileCAContent) RunOnce() error { return c.loadCABundle() } -// Run starts the kube-apiserver and blocks until stopCh is closed. +// Run starts the controller and blocks until stopCh is closed. func (c *DynamicFileCAContent) Run(workers int, stopCh <-chan struct{}) { defer utilruntime.HandleCrash() defer c.queue.ShutDown() @@ -158,17 +159,62 @@ func (c *DynamicFileCAContent) Run(workers int, stopCh <-chan struct{}) { // doesn't matter what workers say, only start one. go wait.Until(c.runWorker, time.Second, stopCh) - // start timer that rechecks every minute, just in case. this also serves to prime the controller quickly. - go wait.PollImmediateUntil(FileRefreshDuration, func() (bool, error) { - c.queue.Add(workItemKey) - return false, nil - }, stopCh) - - // TODO this can be wired to an fsnotifier as well. + // start the loop that watches the CA file until stopCh is closed. + go wait.Until(func() { + if err := c.watchCAFile(stopCh); err != nil { + klog.ErrorS(err, "Failed to watch CA file, will retry later") + } + }, time.Minute, stopCh) <-stopCh } +func (c *DynamicFileCAContent) watchCAFile(stopCh <-chan struct{}) error { + // Trigger a check here to ensure the content will be checked periodically even if the following watch fails. + c.queue.Add(workItemKey) + + w, err := fsnotify.NewWatcher() + if err != nil { + return fmt.Errorf("error creating fsnotify watcher: %v", err) + } + defer w.Close() + + if err = w.Add(c.filename); err != nil { + return fmt.Errorf("error adding watch for file %s: %v", c.filename, err) + } + // Trigger a check in case the file is updated before the watch starts. 
+ c.queue.Add(workItemKey) + + for { + select { + case e := <-w.Events: + if err := c.handleWatchEvent(e, w); err != nil { + return err + } + case err := <-w.Errors: + return fmt.Errorf("received fsnotify error: %v", err) + case <-stopCh: + return nil + } + } +} + +// handleWatchEvent triggers reloading the CA file, and restarts a new watch if it's a Remove or Rename event. +func (c *DynamicFileCAContent) handleWatchEvent(e fsnotify.Event, w *fsnotify.Watcher) error { + // This should be executed after restarting the watch (if applicable) to ensure no file event will be missing. + defer c.queue.Add(workItemKey) + if e.Op&(fsnotify.Remove|fsnotify.Rename) == 0 { + return nil + } + if err := w.Remove(c.filename); err != nil { + klog.InfoS("Failed to remove file watch, it may have been deleted", "file", c.filename, "err", err) + } + if err := w.Add(c.filename); err != nil { + return fmt.Errorf("error adding watch for file %s: %v", c.filename, err) + } + return nil +} + func (c *DynamicFileCAContent) runWorker() { for c.processNextWorkItem() { } diff --git a/staging/src/k8s.io/apiserver/pkg/server/dynamiccertificates/dynamic_serving_content.go b/staging/src/k8s.io/apiserver/pkg/server/dynamiccertificates/dynamic_serving_content.go index de79fb58f5f..00117176b0a 100644 --- a/staging/src/k8s.io/apiserver/pkg/server/dynamiccertificates/dynamic_serving_content.go +++ b/staging/src/k8s.io/apiserver/pkg/server/dynamiccertificates/dynamic_serving_content.go @@ -23,6 +23,8 @@ import ( "sync/atomic" "time" + "github.com/fsnotify/fsnotify" + utilruntime "k8s.io/apimachinery/pkg/util/runtime" "k8s.io/apimachinery/pkg/util/wait" "k8s.io/client-go/util/workqueue" @@ -38,7 +40,7 @@ type DynamicCertKeyPairContent struct { // keyFile is the name of the key file to read. 
keyFile string - // servingCert is a certKeyContent that contains the last read, non-zero length content of the key and cert + // certKeyPair is a certKeyContent that contains the last read, non-zero length content of the key and cert certKeyPair atomic.Value listeners []Listener @@ -75,7 +77,7 @@ func (c *DynamicCertKeyPairContent) AddListener(listener Listener) { c.listeners = append(c.listeners, listener) } -// loadServingCert determines the next set of content for the file. +// loadCertKeyPair determines the next set of content for the file. func (c *DynamicCertKeyPairContent) loadCertKeyPair() error { cert, err := ioutil.ReadFile(c.certFile) if err != nil { @@ -132,17 +134,68 @@ func (c *DynamicCertKeyPairContent) Run(workers int, stopCh <-chan struct{}) { // doesn't matter what workers say, only start one. go wait.Until(c.runWorker, time.Second, stopCh) - // start timer that rechecks every minute, just in case. this also serves to prime the controller quickly. - go wait.PollImmediateUntil(FileRefreshDuration, func() (bool, error) { - c.queue.Add(workItemKey) - return false, nil - }, stopCh) - - // TODO this can be wired to an fsnotifier as well. + // start the loop that watches the cert and key files until stopCh is closed. + go wait.Until(func() { + if err := c.watchCertKeyFile(stopCh); err != nil { + klog.ErrorS(err, "Failed to watch cert and key file, will retry later") + } + }, time.Minute, stopCh) <-stopCh } +func (c *DynamicCertKeyPairContent) watchCertKeyFile(stopCh <-chan struct{}) error { + // Trigger a check here to ensure the content will be checked periodically even if the following watch fails. 
+ c.queue.Add(workItemKey) + + w, err := fsnotify.NewWatcher() + if err != nil { + return fmt.Errorf("error creating fsnotify watcher: %v", err) + } + defer w.Close() + + if err := w.Add(c.certFile); err != nil { + return fmt.Errorf("error adding watch for file %s: %v", c.certFile, err) + } + if err := w.Add(c.keyFile); err != nil { + return fmt.Errorf("error adding watch for file %s: %v", c.keyFile, err) + } + // Trigger a check in case the file is updated before the watch starts. + c.queue.Add(workItemKey) + + for { + select { + case e := <-w.Events: + if err := c.handleWatchEvent(e, w); err != nil { + return err + } + case err := <-w.Errors: + return fmt.Errorf("received fsnotify error: %v", err) + case <-stopCh: + return nil + } + } +} + +// handleWatchEvent triggers reloading the cert and key file, and restarts a new watch if it's a Remove or Rename event. +// If one file is updated before the other, the loadCertKeyPair method will catch the mismatch and will not apply the +// change. When an event of the other file is received, it will trigger reloading the files again and the new content +// will be loaded and used. +func (c *DynamicCertKeyPairContent) handleWatchEvent(e fsnotify.Event, w *fsnotify.Watcher) error { + // This should be executed after restarting the watch (if applicable) to ensure no file event will be missing. 
+ defer c.queue.Add(workItemKey) + if e.Op&(fsnotify.Remove|fsnotify.Rename) == 0 { + return nil + } + if err := w.Remove(e.Name); err != nil { + klog.InfoS("Failed to remove file watch, it may have been deleted", "file", e.Name, "err", err) + } + if err := w.Add(e.Name); err != nil { + return fmt.Errorf("error adding watch for file %s: %v", e.Name, err) + } + return nil +} + func (c *DynamicCertKeyPairContent) runWorker() { for c.processNextWorkItem() { } diff --git a/test/integration/apiserver/certreload/certreload_test.go b/test/integration/apiserver/certreload/certreload_test.go index e12bb9e6ce6..eeddb6e5bfa 100644 --- a/test/integration/apiserver/certreload/certreload_test.go +++ b/test/integration/apiserver/certreload/certreload_test.go @@ -27,6 +27,7 @@ import ( "fmt" "io/ioutil" "math/big" + "os" "path" "strings" "testing" @@ -36,7 +37,6 @@ import ( apierrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/util/wait" - "k8s.io/apiserver/pkg/server/dynamiccertificates" "k8s.io/client-go/kubernetes" "k8s.io/client-go/kubernetes/scheme" "k8s.io/client-go/rest" @@ -127,7 +127,15 @@ func newTestCAWithClient(caSubject pkix.Name, caSerial *big.Int, clientSubject p }, nil } -func TestClientCA(t *testing.T) { +func TestClientCAUpdate(t *testing.T) { + testClientCA(t, false) +} + +func TestClientCARecreate(t *testing.T) { + testClientCA(t, true) +} + +func testClientCA(t *testing.T, recreate bool) { stopCh := make(chan struct{}) defer close(stopCh) @@ -172,7 +180,6 @@ func TestClientCA(t *testing.T) { clientCAFilename = opts.Authentication.ClientCert.ClientCA frontProxyCAFilename = opts.Authentication.RequestHeader.ClientCAFile opts.Authentication.RequestHeader.AllowedNames = append(opts.Authentication.RequestHeader.AllowedNames, "test-aggregated-apiserver") - dynamiccertificates.FileRefreshDuration = 1 * time.Second }, }) @@ -187,6 +194,15 @@ func TestClientCA(t *testing.T) { t.Fatal(err) } + if recreate { 
+ if err := os.Remove(path.Join(clientCAFilename)); err != nil { + t.Fatal(err) + } + if err := os.Remove(path.Join(frontProxyCAFilename)); err != nil { + t.Fatal(err) + } + } + // when we run this the second time, we know which one we are expecting if err := ioutil.WriteFile(clientCAFilename, clientCA.CACert, 0644); err != nil { t.Fatal(err) @@ -446,7 +462,15 @@ iUnnLkZt2ya1cDJDiCnJjo7r5KxMo0XXFDc= -----END CERTIFICATE----- `) -func TestServingCert(t *testing.T) { +func TestServingCertUpdate(t *testing.T) { + testServingCert(t, false) +} + +func TestServingCertRecreate(t *testing.T) { + testServingCert(t, true) +} + +func testServingCert(t *testing.T, recreate bool) { stopCh := make(chan struct{}) defer close(stopCh) @@ -456,10 +480,18 @@ func TestServingCert(t *testing.T) { ModifyServerRunOptions: func(opts *options.ServerRunOptions) { opts.GenericServerRunOptions.MaxRequestBodyBytes = 1024 * 1024 servingCertPath = opts.SecureServing.ServerCert.CertDirectory - dynamiccertificates.FileRefreshDuration = 1 * time.Second }, }) + if recreate { + if err := os.Remove(path.Join(servingCertPath, "apiserver.key")); err != nil { + t.Fatal(err) + } + if err := os.Remove(path.Join(servingCertPath, "apiserver.crt")); err != nil { + t.Fatal(err) + } + } + if err := ioutil.WriteFile(path.Join(servingCertPath, "apiserver.key"), serverKey, 0644); err != nil { t.Fatal(err) } @@ -497,7 +529,6 @@ func TestSNICert(t *testing.T) { t.Fatal(err) } - dynamiccertificates.FileRefreshDuration = 1 * time.Second opts.SecureServing.SNICertKeys = []flag.NamedCertKey{{ Names: []string{"foo"}, CertFile: path.Join(servingCertPath, "foo.crt"),