mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-07-23 03:41:45 +00:00
Improve dynamic cert file change detection
DynamicFileCAContent and DynamicCertKeyPairContent used periodical job to check whether the file content has changed, leading to 1 minute of delay in worst case. This patch improves it by leveraging fsnotify watcher. The content change will be reflected immediately.
This commit is contained in:
parent
5be21c50c2
commit
3cfe3d048f
@ -11,6 +11,7 @@ require (
|
|||||||
github.com/davecgh/go-spew v1.1.1
|
github.com/davecgh/go-spew v1.1.1
|
||||||
github.com/emicklei/go-restful v2.9.5+incompatible
|
github.com/emicklei/go-restful v2.9.5+incompatible
|
||||||
github.com/evanphx/json-patch v4.11.0+incompatible
|
github.com/evanphx/json-patch v4.11.0+incompatible
|
||||||
|
github.com/fsnotify/fsnotify v1.4.9
|
||||||
github.com/go-openapi/jsonpointer v0.19.5 // indirect
|
github.com/go-openapi/jsonpointer v0.19.5 // indirect
|
||||||
github.com/go-openapi/jsonreference v0.19.5 // indirect
|
github.com/go-openapi/jsonreference v0.19.5 // indirect
|
||||||
github.com/go-openapi/swag v0.19.14 // indirect
|
github.com/go-openapi/swag v0.19.14 // indirect
|
||||||
|
@ -24,6 +24,7 @@ import (
|
|||||||
"sync/atomic"
|
"sync/atomic"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
"github.com/fsnotify/fsnotify"
|
||||||
"k8s.io/client-go/util/cert"
|
"k8s.io/client-go/util/cert"
|
||||||
|
|
||||||
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
|
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
|
||||||
@ -44,7 +45,7 @@ type ControllerRunner interface {
|
|||||||
Run(workers int, stopCh <-chan struct{})
|
Run(workers int, stopCh <-chan struct{})
|
||||||
}
|
}
|
||||||
|
|
||||||
// DynamicFileCAContent provies a CAContentProvider that can dynamically react to new file content
|
// DynamicFileCAContent provides a CAContentProvider that can dynamically react to new file content
|
||||||
// It also fulfills the authenticator interface to provide verifyoptions
|
// It also fulfills the authenticator interface to provide verifyoptions
|
||||||
type DynamicFileCAContent struct {
|
type DynamicFileCAContent struct {
|
||||||
name string
|
name string
|
||||||
@ -147,7 +148,7 @@ func (c *DynamicFileCAContent) RunOnce() error {
|
|||||||
return c.loadCABundle()
|
return c.loadCABundle()
|
||||||
}
|
}
|
||||||
|
|
||||||
// Run starts the kube-apiserver and blocks until stopCh is closed.
|
// Run starts the controller and blocks until stopCh is closed.
|
||||||
func (c *DynamicFileCAContent) Run(workers int, stopCh <-chan struct{}) {
|
func (c *DynamicFileCAContent) Run(workers int, stopCh <-chan struct{}) {
|
||||||
defer utilruntime.HandleCrash()
|
defer utilruntime.HandleCrash()
|
||||||
defer c.queue.ShutDown()
|
defer c.queue.ShutDown()
|
||||||
@ -158,17 +159,62 @@ func (c *DynamicFileCAContent) Run(workers int, stopCh <-chan struct{}) {
|
|||||||
// doesn't matter what workers say, only start one.
|
// doesn't matter what workers say, only start one.
|
||||||
go wait.Until(c.runWorker, time.Second, stopCh)
|
go wait.Until(c.runWorker, time.Second, stopCh)
|
||||||
|
|
||||||
// start timer that rechecks every minute, just in case. this also serves to prime the controller quickly.
|
// start the loop that watches the CA file until stopCh is closed.
|
||||||
go wait.PollImmediateUntil(FileRefreshDuration, func() (bool, error) {
|
go wait.Until(func() {
|
||||||
c.queue.Add(workItemKey)
|
if err := c.watchCAFile(stopCh); err != nil {
|
||||||
return false, nil
|
klog.ErrorS(err, "Failed to watch CA file, will retry later")
|
||||||
}, stopCh)
|
}
|
||||||
|
}, time.Minute, stopCh)
|
||||||
// TODO this can be wired to an fsnotifier as well.
|
|
||||||
|
|
||||||
<-stopCh
|
<-stopCh
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (c *DynamicFileCAContent) watchCAFile(stopCh <-chan struct{}) error {
|
||||||
|
// Trigger a check here to ensure the content will be checked periodically even if the following watch fails.
|
||||||
|
c.queue.Add(workItemKey)
|
||||||
|
|
||||||
|
w, err := fsnotify.NewWatcher()
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("error creating fsnotify watcher: %v", err)
|
||||||
|
}
|
||||||
|
defer w.Close()
|
||||||
|
|
||||||
|
if err = w.Add(c.filename); err != nil {
|
||||||
|
return fmt.Errorf("error adding watch for file %s: %v", c.filename, err)
|
||||||
|
}
|
||||||
|
// Trigger a check in case the file is updated before the watch starts.
|
||||||
|
c.queue.Add(workItemKey)
|
||||||
|
|
||||||
|
for {
|
||||||
|
select {
|
||||||
|
case e := <-w.Events:
|
||||||
|
if err := c.handleWatchEvent(e, w); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
case err := <-w.Errors:
|
||||||
|
return fmt.Errorf("received fsnotify error: %v", err)
|
||||||
|
case <-stopCh:
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// handleWatchEvent triggers reloading the CA file, and restarts a new watch if it's a Remove or Rename event.
|
||||||
|
func (c *DynamicFileCAContent) handleWatchEvent(e fsnotify.Event, w *fsnotify.Watcher) error {
|
||||||
|
// This should be executed after restarting the watch (if applicable) to ensure no file event will be missing.
|
||||||
|
defer c.queue.Add(workItemKey)
|
||||||
|
if e.Op&(fsnotify.Remove|fsnotify.Rename) == 0 {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
if err := w.Remove(c.filename); err != nil {
|
||||||
|
klog.InfoS("Failed to remove file watch, it may have been deleted", "file", c.filename, "err", err)
|
||||||
|
}
|
||||||
|
if err := w.Add(c.filename); err != nil {
|
||||||
|
return fmt.Errorf("error adding watch for file %s: %v", c.filename, err)
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
func (c *DynamicFileCAContent) runWorker() {
|
func (c *DynamicFileCAContent) runWorker() {
|
||||||
for c.processNextWorkItem() {
|
for c.processNextWorkItem() {
|
||||||
}
|
}
|
||||||
|
@ -23,6 +23,8 @@ import (
|
|||||||
"sync/atomic"
|
"sync/atomic"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
"github.com/fsnotify/fsnotify"
|
||||||
|
|
||||||
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
|
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
|
||||||
"k8s.io/apimachinery/pkg/util/wait"
|
"k8s.io/apimachinery/pkg/util/wait"
|
||||||
"k8s.io/client-go/util/workqueue"
|
"k8s.io/client-go/util/workqueue"
|
||||||
@ -38,7 +40,7 @@ type DynamicCertKeyPairContent struct {
|
|||||||
// keyFile is the name of the key file to read.
|
// keyFile is the name of the key file to read.
|
||||||
keyFile string
|
keyFile string
|
||||||
|
|
||||||
// servingCert is a certKeyContent that contains the last read, non-zero length content of the key and cert
|
// certKeyPair is a certKeyContent that contains the last read, non-zero length content of the key and cert
|
||||||
certKeyPair atomic.Value
|
certKeyPair atomic.Value
|
||||||
|
|
||||||
listeners []Listener
|
listeners []Listener
|
||||||
@ -75,7 +77,7 @@ func (c *DynamicCertKeyPairContent) AddListener(listener Listener) {
|
|||||||
c.listeners = append(c.listeners, listener)
|
c.listeners = append(c.listeners, listener)
|
||||||
}
|
}
|
||||||
|
|
||||||
// loadServingCert determines the next set of content for the file.
|
// loadCertKeyPair determines the next set of content for the file.
|
||||||
func (c *DynamicCertKeyPairContent) loadCertKeyPair() error {
|
func (c *DynamicCertKeyPairContent) loadCertKeyPair() error {
|
||||||
cert, err := ioutil.ReadFile(c.certFile)
|
cert, err := ioutil.ReadFile(c.certFile)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -132,17 +134,68 @@ func (c *DynamicCertKeyPairContent) Run(workers int, stopCh <-chan struct{}) {
|
|||||||
// doesn't matter what workers say, only start one.
|
// doesn't matter what workers say, only start one.
|
||||||
go wait.Until(c.runWorker, time.Second, stopCh)
|
go wait.Until(c.runWorker, time.Second, stopCh)
|
||||||
|
|
||||||
// start timer that rechecks every minute, just in case. this also serves to prime the controller quickly.
|
// start the loop that watches the cert and key files until stopCh is closed.
|
||||||
go wait.PollImmediateUntil(FileRefreshDuration, func() (bool, error) {
|
go wait.Until(func() {
|
||||||
c.queue.Add(workItemKey)
|
if err := c.watchCertKeyFile(stopCh); err != nil {
|
||||||
return false, nil
|
klog.ErrorS(err, "Failed to watch cert and key file, will retry later")
|
||||||
}, stopCh)
|
}
|
||||||
|
}, time.Minute, stopCh)
|
||||||
// TODO this can be wired to an fsnotifier as well.
|
|
||||||
|
|
||||||
<-stopCh
|
<-stopCh
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (c *DynamicCertKeyPairContent) watchCertKeyFile(stopCh <-chan struct{}) error {
|
||||||
|
// Trigger a check here to ensure the content will be checked periodically even if the following watch fails.
|
||||||
|
c.queue.Add(workItemKey)
|
||||||
|
|
||||||
|
w, err := fsnotify.NewWatcher()
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("error creating fsnotify watcher: %v", err)
|
||||||
|
}
|
||||||
|
defer w.Close()
|
||||||
|
|
||||||
|
if err := w.Add(c.certFile); err != nil {
|
||||||
|
return fmt.Errorf("error adding watch for file %s: %v", c.certFile, err)
|
||||||
|
}
|
||||||
|
if err := w.Add(c.keyFile); err != nil {
|
||||||
|
return fmt.Errorf("error adding watch for file %s: %v", c.keyFile, err)
|
||||||
|
}
|
||||||
|
// Trigger a check in case the file is updated before the watch starts.
|
||||||
|
c.queue.Add(workItemKey)
|
||||||
|
|
||||||
|
for {
|
||||||
|
select {
|
||||||
|
case e := <-w.Events:
|
||||||
|
if err := c.handleWatchEvent(e, w); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
case err := <-w.Errors:
|
||||||
|
return fmt.Errorf("received fsnotify error: %v", err)
|
||||||
|
case <-stopCh:
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// handleWatchEvent triggers reloading the cert and key file, and restarts a new watch if it's a Remove or Rename event.
|
||||||
|
// If one file is updated before the other, the loadCertKeyPair method will catch the mismatch and will not apply the
|
||||||
|
// change. When an event of the other file is received, it will trigger reloading the files again and the new content
|
||||||
|
// will be loaded and used.
|
||||||
|
func (c *DynamicCertKeyPairContent) handleWatchEvent(e fsnotify.Event, w *fsnotify.Watcher) error {
|
||||||
|
// This should be executed after restarting the watch (if applicable) to ensure no file event will be missing.
|
||||||
|
defer c.queue.Add(workItemKey)
|
||||||
|
if e.Op&(fsnotify.Remove|fsnotify.Rename) == 0 {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
if err := w.Remove(e.Name); err != nil {
|
||||||
|
klog.InfoS("Failed to remove file watch, it may have been deleted", "file", e.Name, "err", err)
|
||||||
|
}
|
||||||
|
if err := w.Add(e.Name); err != nil {
|
||||||
|
return fmt.Errorf("error adding watch for file %s: %v", e.Name, err)
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
func (c *DynamicCertKeyPairContent) runWorker() {
|
func (c *DynamicCertKeyPairContent) runWorker() {
|
||||||
for c.processNextWorkItem() {
|
for c.processNextWorkItem() {
|
||||||
}
|
}
|
||||||
|
@ -27,6 +27,7 @@ import (
|
|||||||
"fmt"
|
"fmt"
|
||||||
"io/ioutil"
|
"io/ioutil"
|
||||||
"math/big"
|
"math/big"
|
||||||
|
"os"
|
||||||
"path"
|
"path"
|
||||||
"strings"
|
"strings"
|
||||||
"testing"
|
"testing"
|
||||||
@ -36,7 +37,6 @@ import (
|
|||||||
apierrors "k8s.io/apimachinery/pkg/api/errors"
|
apierrors "k8s.io/apimachinery/pkg/api/errors"
|
||||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||||
"k8s.io/apimachinery/pkg/util/wait"
|
"k8s.io/apimachinery/pkg/util/wait"
|
||||||
"k8s.io/apiserver/pkg/server/dynamiccertificates"
|
|
||||||
"k8s.io/client-go/kubernetes"
|
"k8s.io/client-go/kubernetes"
|
||||||
"k8s.io/client-go/kubernetes/scheme"
|
"k8s.io/client-go/kubernetes/scheme"
|
||||||
"k8s.io/client-go/rest"
|
"k8s.io/client-go/rest"
|
||||||
@ -127,7 +127,15 @@ func newTestCAWithClient(caSubject pkix.Name, caSerial *big.Int, clientSubject p
|
|||||||
}, nil
|
}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestClientCA(t *testing.T) {
|
func TestClientCAUpdate(t *testing.T) {
|
||||||
|
testClientCA(t, false)
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestClientCARecreate(t *testing.T) {
|
||||||
|
testClientCA(t, true)
|
||||||
|
}
|
||||||
|
|
||||||
|
func testClientCA(t *testing.T, recreate bool) {
|
||||||
stopCh := make(chan struct{})
|
stopCh := make(chan struct{})
|
||||||
defer close(stopCh)
|
defer close(stopCh)
|
||||||
|
|
||||||
@ -172,7 +180,6 @@ func TestClientCA(t *testing.T) {
|
|||||||
clientCAFilename = opts.Authentication.ClientCert.ClientCA
|
clientCAFilename = opts.Authentication.ClientCert.ClientCA
|
||||||
frontProxyCAFilename = opts.Authentication.RequestHeader.ClientCAFile
|
frontProxyCAFilename = opts.Authentication.RequestHeader.ClientCAFile
|
||||||
opts.Authentication.RequestHeader.AllowedNames = append(opts.Authentication.RequestHeader.AllowedNames, "test-aggregated-apiserver")
|
opts.Authentication.RequestHeader.AllowedNames = append(opts.Authentication.RequestHeader.AllowedNames, "test-aggregated-apiserver")
|
||||||
dynamiccertificates.FileRefreshDuration = 1 * time.Second
|
|
||||||
},
|
},
|
||||||
})
|
})
|
||||||
|
|
||||||
@ -187,6 +194,15 @@ func TestClientCA(t *testing.T) {
|
|||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if recreate {
|
||||||
|
if err := os.Remove(path.Join(clientCAFilename)); err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
if err := os.Remove(path.Join(frontProxyCAFilename)); err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// when we run this the second time, we know which one we are expecting
|
// when we run this the second time, we know which one we are expecting
|
||||||
if err := ioutil.WriteFile(clientCAFilename, clientCA.CACert, 0644); err != nil {
|
if err := ioutil.WriteFile(clientCAFilename, clientCA.CACert, 0644); err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
@ -446,7 +462,15 @@ iUnnLkZt2ya1cDJDiCnJjo7r5KxMo0XXFDc=
|
|||||||
-----END CERTIFICATE-----
|
-----END CERTIFICATE-----
|
||||||
`)
|
`)
|
||||||
|
|
||||||
func TestServingCert(t *testing.T) {
|
func TestServingCertUpdate(t *testing.T) {
|
||||||
|
testServingCert(t, false)
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestServingCertRecreate(t *testing.T) {
|
||||||
|
testServingCert(t, true)
|
||||||
|
}
|
||||||
|
|
||||||
|
func testServingCert(t *testing.T, recreate bool) {
|
||||||
stopCh := make(chan struct{})
|
stopCh := make(chan struct{})
|
||||||
defer close(stopCh)
|
defer close(stopCh)
|
||||||
|
|
||||||
@ -456,10 +480,18 @@ func TestServingCert(t *testing.T) {
|
|||||||
ModifyServerRunOptions: func(opts *options.ServerRunOptions) {
|
ModifyServerRunOptions: func(opts *options.ServerRunOptions) {
|
||||||
opts.GenericServerRunOptions.MaxRequestBodyBytes = 1024 * 1024
|
opts.GenericServerRunOptions.MaxRequestBodyBytes = 1024 * 1024
|
||||||
servingCertPath = opts.SecureServing.ServerCert.CertDirectory
|
servingCertPath = opts.SecureServing.ServerCert.CertDirectory
|
||||||
dynamiccertificates.FileRefreshDuration = 1 * time.Second
|
|
||||||
},
|
},
|
||||||
})
|
})
|
||||||
|
|
||||||
|
if recreate {
|
||||||
|
if err := os.Remove(path.Join(servingCertPath, "apiserver.key")); err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
if err := os.Remove(path.Join(servingCertPath, "apiserver.crt")); err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
if err := ioutil.WriteFile(path.Join(servingCertPath, "apiserver.key"), serverKey, 0644); err != nil {
|
if err := ioutil.WriteFile(path.Join(servingCertPath, "apiserver.key"), serverKey, 0644); err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
@ -497,7 +529,6 @@ func TestSNICert(t *testing.T) {
|
|||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
dynamiccertificates.FileRefreshDuration = 1 * time.Second
|
|
||||||
opts.SecureServing.SNICertKeys = []flag.NamedCertKey{{
|
opts.SecureServing.SNICertKeys = []flag.NamedCertKey{{
|
||||||
Names: []string{"foo"},
|
Names: []string{"foo"},
|
||||||
CertFile: path.Join(servingCertPath, "foo.crt"),
|
CertFile: path.Join(servingCertPath, "foo.crt"),
|
||||||
|
Loading…
Reference in New Issue
Block a user