diff --git a/factory/gen.go b/factory/gen.go
index 9888a3e..aa72784 100644
--- a/factory/gen.go
+++ b/factory/gen.go
@@ -195,6 +195,10 @@ func (t *TLS) generateCert(secret *v1.Secret, cn ...string) (*v1.Secret, bool, e
 }
 
 func (t *TLS) IsExpired(secret *v1.Secret) bool {
+	if secret == nil {
+		return false
+	}
+
 	certsPem := secret.Data[v1.TLSCertKey]
 	if len(certsPem) == 0 {
 		return false
diff --git a/storage/kubernetes/controller.go b/storage/kubernetes/controller.go
index e82bb40..a6f6fe1 100644
--- a/storage/kubernetes/controller.go
+++ b/storage/kubernetes/controller.go
@@ -2,33 +2,47 @@ package kubernetes
 
 import (
 	"context"
-	"sync"
+	"maps"
 	"time"
 
 	"github.com/rancher/dynamiclistener"
 	"github.com/rancher/dynamiclistener/cert"
 	"github.com/rancher/wrangler/v3/pkg/generated/controllers/core"
 	v1controller "github.com/rancher/wrangler/v3/pkg/generated/controllers/core/v1"
-	"github.com/rancher/wrangler/v3/pkg/start"
 	"github.com/sirupsen/logrus"
 	v1 "k8s.io/api/core/v1"
 	"k8s.io/apimachinery/pkg/api/errors"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	"k8s.io/apimachinery/pkg/fields"
+	"k8s.io/apimachinery/pkg/runtime"
 	"k8s.io/apimachinery/pkg/util/wait"
+	"k8s.io/apimachinery/pkg/watch"
+	"k8s.io/client-go/tools/cache"
+	toolswatch "k8s.io/client-go/tools/watch"
 	"k8s.io/client-go/util/retry"
+	"k8s.io/client-go/util/workqueue"
 )
 
 type CoreGetter func() *core.Factory
 
+type storage struct {
+	namespace, name string
+	storage         dynamiclistener.TLSStorage
+	secrets         v1controller.SecretController
+	tls             dynamiclistener.TLSFactory
+	queue           workqueue.TypedInterface[string]
+	queuedSecret    *v1.Secret
+}
+
 func Load(ctx context.Context, secrets v1controller.SecretController, namespace, name string, backing dynamiclistener.TLSStorage) dynamiclistener.TLSStorage {
 	storage := &storage{
 		name:      name,
 		namespace: namespace,
 		storage:   backing,
-		ctx:       ctx,
-		initSync:  &sync.Once{},
+		queue:     workqueue.NewTyped[string](),
 	}
-	storage.init(secrets)
+	storage.runQueue()
+	storage.init(ctx, secrets)
 	return storage
 }
 
@@ -37,16 +51,16 @@ func New(ctx context.Context, core CoreGetter, namespace, name string, backing d
 		name:      name,
 		namespace: namespace,
 		storage:   backing,
-		ctx:       ctx,
-		initSync:  &sync.Once{},
+		queue:     workqueue.NewTyped[string](),
 	}
+	storage.runQueue()
 
 	// lazy init
 	go func() {
 		wait.PollImmediateUntilWithContext(ctx, time.Second, func(cxt context.Context) (bool, error) {
 			if coreFactory := core(); coreFactory != nil {
-				storage.init(coreFactory.Core().V1().Secret())
-				return true, start.All(ctx, 5, coreFactory)
+				storage.init(ctx, coreFactory.Core().V1().Secret())
+				return true, nil
 			}
 			return false, nil
 		})
@@ -55,100 +69,94 @@ func New(ctx context.Context, core CoreGetter, namespace, name string, backing d
 	return storage
 }
 
-type storage struct {
-	sync.RWMutex
-
-	namespace, name string
-	storage         dynamiclistener.TLSStorage
-	secrets         v1controller.SecretController
-	ctx             context.Context
-	tls             dynamiclistener.TLSFactory
-	initialized     bool
-	initSync        *sync.Once
-}
-
-func (s *storage) SetFactory(tls dynamiclistener.TLSFactory) {
-	s.Lock()
-	defer s.Unlock()
-	s.tls = tls
-}
-
-func (s *storage) init(secrets v1controller.SecretController) {
-	s.Lock()
-	defer s.Unlock()
-
-	secrets.OnChange(s.ctx, "tls-storage", func(key string, secret *v1.Secret) (*v1.Secret, error) {
-		if secret == nil {
-			return nil, nil
-		}
-
-		if secret.Namespace == s.namespace && secret.Name == s.name {
-			if err := s.Update(secret); err != nil {
-				return nil, err
-			}
-		}
-
-		return secret, nil
-	})
-	s.secrets = secrets
-
-	// Asynchronously sync the backing storage to the Kubernetes secret, as doing so inline may
-	// block the listener from accepting new connections if the apiserver becomes unavailable
-	// after the Secrets controller has been initialized. We're not passing around any contexts
-	// here, nor does the controller accept any, so there's no good way to soft-fail with a
-	// reasonable timeout.
-	go s.syncStorage()
-}
-
-func (s *storage) syncStorage() {
-	var updateStorage bool
-	secret, err := s.Get()
-	if err == nil && cert.IsValidTLSSecret(secret) {
-		// local storage had a cached secret, ensure that it exists in Kubernetes
-		_, err := s.secrets.Create(&v1.Secret{
-			ObjectMeta: metav1.ObjectMeta{
-				Name:        s.name,
-				Namespace:   s.namespace,
-				Annotations: secret.Annotations,
-			},
-			Type: v1.SecretTypeTLS,
-			Data: secret.Data,
-		})
-		if err != nil && !errors.IsAlreadyExists(err) {
-			logrus.Warnf("Failed to create Kubernetes secret: %v", err)
-		}
-	} else {
-		// local storage was empty, try to populate it
-		secret, err = s.secrets.Get(s.namespace, s.name, metav1.GetOptions{})
-		if err != nil {
-			if !errors.IsNotFound(err) {
-				logrus.Warnf("Failed to init Kubernetes secret: %v", err)
-			}
-		} else {
-			updateStorage = true
-		}
-	}
-
-	s.Lock()
-	defer s.Unlock()
-	s.initialized = true
-	if updateStorage {
-		if err := s.storage.Update(secret); err != nil {
-			logrus.Warnf("Failed to init backing storage secret: %v", err)
-		}
-	}
-}
-
+// always return secret from backing storage
 func (s *storage) Get() (*v1.Secret, error) {
-	s.RLock()
-	defer s.RUnlock()
-
 	return s.storage.Get()
 }
 
-func (s *storage) targetSecret() (*v1.Secret, error) {
-	s.RLock()
-	defer s.RUnlock()
+// sync secret to Kubernetes and backing storage via workqueue
+func (s *storage) Update(secret *v1.Secret) error {
+	// Asynchronously update the Kubernetes secret, as doing so inline may block the listener from
+	// accepting new connections if the apiserver becomes unavailable after the Secrets controller
+	// has been initialized.
+	s.queuedSecret = secret
+	s.queue.Add(s.name)
+	return nil
+}
+
+func (s *storage) SetFactory(tls dynamiclistener.TLSFactory) {
+	s.tls = tls
+}
+
+func (s *storage) init(ctx context.Context, secrets v1controller.SecretController) {
+	s.secrets = secrets
+
+	// Watch just the target secret, instead of using a wrangler OnChange handler
+	// which watches all secrets in all namespaces. Changes to the secret
+	// will be sent through the workqueue.
+	go func() {
+		fieldSelector := fields.Set{"metadata.name": s.name}.String()
+		lw := &cache.ListWatch{
+			ListFunc: func(options metav1.ListOptions) (object runtime.Object, e error) {
+				options.FieldSelector = fieldSelector
+				return secrets.List(s.namespace, options)
+			},
+			WatchFunc: func(options metav1.ListOptions) (i watch.Interface, e error) {
+				options.FieldSelector = fieldSelector
+				return secrets.Watch(s.namespace, options)
+			},
+		}
+		_, _, watch, done := toolswatch.NewIndexerInformerWatcher(lw, &v1.Secret{})
+
+		defer func() {
+			s.queue.ShutDown()
+			watch.Stop()
+			<-done
+		}()
+
+		for {
+			select {
+			case <-ctx.Done():
+				return
+			case ev := <-watch.ResultChan():
+				if secret, ok := ev.Object.(*v1.Secret); ok {
+					s.queuedSecret = secret
+					s.queue.Add(secret.Name)
+				}
+			}
+		}
+	}()
+
+	// enqueue initial sync of the backing secret
+	s.queuedSecret, _ = s.Get()
+	s.queue.Add(s.name)
+}
+
+// runQueue starts a goroutine to process secret updates from the workqueue
+func (s *storage) runQueue() {
+	go func() {
+		for s.processQueue() {
+		}
+	}()
+}
+
+// processQueue processes the secret update queue.
+// The key doesn't actually matter, as we are only handling a single secret with a single worker.
+func (s *storage) processQueue() bool {
+	key, shutdown := s.queue.Get()
+	if shutdown {
+		return false
+	}
+
+	defer s.queue.Done(key)
+	if err := s.update(); err != nil {
+		logrus.Errorf("Failed to update Secret %s/%s: %v", s.namespace, s.name, err)
+	}
+
+	return true
+}
+
+func (s *storage) targetSecret() (*v1.Secret, error) {
 	existingSecret, err := s.secrets.Get(s.namespace, s.name, metav1.GetOptions{})
 	if errors.IsNotFound(err) {
 		return &v1.Secret{
@@ -162,22 +170,16 @@ func (s *storage) targetSecret() (*v1.Secret, error) {
 	return existingSecret, err
 }
 
+// saveInK8s handles merging the provided secret with the kubernetes secret.
+// This includes calling the tls factory to sign a new certificate with the
+// merged SAN entries, if possible. Note that the provided secret could be
+// either from Kubernetes due to the secret being changed by another client, or
+// from the listener trying to add SANs or regenerate the cert.
 func (s *storage) saveInK8s(secret *v1.Secret) (*v1.Secret, error) {
-	if !s.initComplete() {
-		// Start a goroutine to attempt to save the secret later, once init is complete.
-		// If this was already handled by initComplete, it should be a no-op, or at worst get
-		// merged with the Kubernetes secret.
-		go s.initSync.Do(func() {
-			if err := wait.Poll(100*time.Millisecond, 15*time.Minute, func() (bool, error) {
-				if !s.initComplete() {
-					return false, nil
-				}
-				_, err := s.saveInK8s(secret)
-				return true, err
-			}); err != nil {
-				logrus.Errorf("Failed to save TLS secret after controller init: %v", err)
-			}
-		})
+	// secret controller not initialized yet, just return the current secret.
+	// if there is an existing secret in Kubernetes, that will get synced by the
+	// list/watch once the controller is initialized.
+	if s.secrets == nil {
 		return secret, nil
 	}
 
@@ -214,54 +216,38 @@ func (s *storage) saveInK8s(secret *v1.Secret) (*v1.Secret, error) {
 		return targetSecret, nil
 	}
 
-	targetSecret.Annotations = secret.Annotations
+	// Any changes to the cert will change the fingerprint annotation, so we can use that
+	// for change detection, and skip updating an existing secret if it has not changed.
+	changed := !maps.Equal(targetSecret.Annotations, secret.Annotations)
+	targetSecret.Type = v1.SecretTypeTLS
+	targetSecret.Annotations = secret.Annotations
 	targetSecret.Data = secret.Data
 
 	if targetSecret.UID == "" {
 		logrus.Infof("Creating new TLS secret for %s/%s (count: %d): %v", targetSecret.Namespace, targetSecret.Name, len(targetSecret.Annotations)-1, targetSecret.Annotations)
 		return s.secrets.Create(targetSecret)
+	} else if changed {
+		logrus.Infof("Updating TLS secret for %s/%s (count: %d): %v", targetSecret.Namespace, targetSecret.Name, len(targetSecret.Annotations)-1, targetSecret.Annotations)
+		return s.secrets.Update(targetSecret)
 	}
 
-	logrus.Infof("Updating TLS secret for %s/%s (count: %d): %v", targetSecret.Namespace, targetSecret.Name, len(targetSecret.Annotations)-1, targetSecret.Annotations)
-	return s.secrets.Update(targetSecret)
-}
-
-func (s *storage) Update(secret *v1.Secret) error {
-	// Asynchronously update the Kubernetes secret, as doing so inline may block the listener from
-	// accepting new connections if the apiserver becomes unavailable after the Secrets controller
-	// has been initialized. We're not passing around any contexts here, nor does the controller
-	// accept any, so there's no good way to soft-fail with a reasonable timeout.
-	go func() {
-		if err := s.update(secret); err != nil {
-			logrus.Errorf("Failed to save TLS secret for %s/%s: %v", secret.Namespace, secret.Name, err)
-		}
-	}()
-	return nil
+	return targetSecret, nil
 }
 
 func isConflictOrAlreadyExists(err error) bool {
 	return errors.IsConflict(err) || errors.IsAlreadyExists(err)
 }
 
-func (s *storage) update(secret *v1.Secret) (err error) {
+// update wraps a conflict retry around saveInK8s, which handles merging the
+// queued secret with the Kubernetes secret. Only after successfully
+// updating the Kubernetes secret will the backing storage be updated.
+func (s *storage) update() (err error) {
 	var newSecret *v1.Secret
-	err = retry.OnError(retry.DefaultRetry, isConflictOrAlreadyExists, func() error {
-		newSecret, err = s.saveInK8s(secret)
+	if err := retry.OnError(retry.DefaultRetry, isConflictOrAlreadyExists, func() error {
+		newSecret, err = s.saveInK8s(s.queuedSecret)
 		return err
-	})
-
-	if err != nil {
+	}); err != nil {
 		return err
 	}
-
-	// Only hold the lock while updating underlying storage
-	s.Lock()
-	defer s.Unlock()
 	return s.storage.Update(newSecret)
 }
-
-func (s *storage) initComplete() bool {
-	s.RLock()
-	defer s.RUnlock()
-	return s.initialized
-}
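For context, here is a minimal standalone sketch of the pattern the patch adopts: a typed workqueue drained by a single worker, where `Update` only records the most recent secret and re-adds a fixed key, so rapid updates coalesce and apiserver writes stay off the caller's hot path. The `demoStorage` type, the `atomic.Pointer` field (the patch uses a plain field and relies on queue serialization), and the `main` wiring are illustrative assumptions, not code from this repository.

```go
package main

import (
	"fmt"
	"sync/atomic"
	"time"

	v1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/util/workqueue"
)

type demoStorage struct {
	name   string
	queue  workqueue.TypedInterface[string]
	queued atomic.Pointer[v1.Secret] // latest queued secret; atomic only to keep this demo race-free
}

// Update records the latest secret and enqueues a fixed key; duplicate adds
// for a key that is still pending collapse into a single work item.
func (s *demoStorage) Update(secret *v1.Secret) {
	s.queued.Store(secret)
	s.queue.Add(s.name)
}

// runQueue drains the queue with a single worker, so at most one "sync" runs
// at a time and it always sees the most recently stored secret.
func (s *demoStorage) runQueue() {
	go func() {
		for {
			key, shutdown := s.queue.Get()
			if shutdown {
				return
			}
			if secret := s.queued.Load(); secret != nil {
				fmt.Printf("processing key %q -> secret %s (rv %s)\n", key, secret.Name, secret.ResourceVersion)
			}
			s.queue.Done(key)
		}
	}()
}

func main() {
	s := &demoStorage{name: "serving-cert", queue: workqueue.NewTyped[string]()}
	s.runQueue()

	// Two rapid updates; only the most recent secret matters when the worker runs.
	s.Update(&v1.Secret{ObjectMeta: metav1.ObjectMeta{Name: "serving-cert", ResourceVersion: "1"}})
	s.Update(&v1.Secret{ObjectMeta: metav1.ObjectMeta{Name: "serving-cert", ResourceVersion: "2"}})

	time.Sleep(100 * time.Millisecond)
	s.queue.ShutDown()
}
```

Because the key is constant (the secret name), the queue effectively acts as a dirty flag plus a serialization point, which is why the patch can drop the `sync.RWMutex`, `initSync`, and the ad-hoc retry goroutines that the old code needed.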