Mirror of https://github.com/rancher/dynamiclistener.git (synced 2025-09-09 00:50:38 +00:00)

Compare commits: main...renovate/m (1 commit)

Author | SHA1 | Date
---|---|---
 | a6357ebebf | 
@@ -195,10 +195,6 @@ func (t *TLS) generateCert(secret *v1.Secret, cn ...string) (*v1.Secret, bool, e
 }
 
 func (t *TLS) IsExpired(secret *v1.Secret) bool {
-	if secret == nil {
-		return false
-	}
-
 	certsPem := secret.Data[v1.TLSCertKey]
 	if len(certsPem) == 0 {
 		return false
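Only the guard clauses at the top of IsExpired are visible in this hunk; the expiry evaluation itself is below the shown context. As a hedged sketch of what an expiry check over the PEM bundle stored in secret.Data[v1.TLSCertKey] generally looks like (illustrative helper, not the repository's code):

```go
package main

import (
	"crypto/x509"
	"encoding/pem"
	"fmt"
	"time"
)

// anyExpired is an illustrative stand-in for the kind of check IsExpired
// performs on the PEM bundle; it is not the library's implementation.
func anyExpired(certsPem []byte, now time.Time) bool {
	for {
		block, rest := pem.Decode(certsPem)
		if block == nil {
			return false
		}
		cert, err := x509.ParseCertificate(block.Bytes)
		if err == nil && now.After(cert.NotAfter) {
			return true
		}
		certsPem = rest
	}
}

func main() {
	fmt.Println(anyExpired(nil, time.Now())) // no certs: false
}
```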
go.mod (2 changes)
@@ -2,7 +2,7 @@ module github.com/rancher/dynamiclistener
 
 go 1.24.0
 
-toolchain go1.24.3
+toolchain go1.24.7
 
 require (
 	github.com/rancher/wrangler/v3 v3.2.2-rc.3
@@ -2,47 +2,33 @@ package kubernetes
 
 import (
 	"context"
-	"maps"
 	"sync"
 	"time"
 
 	"github.com/rancher/dynamiclistener"
 	"github.com/rancher/dynamiclistener/cert"
 	"github.com/rancher/wrangler/v3/pkg/generated/controllers/core"
 	v1controller "github.com/rancher/wrangler/v3/pkg/generated/controllers/core/v1"
+	"github.com/rancher/wrangler/v3/pkg/start"
 	"github.com/sirupsen/logrus"
 	v1 "k8s.io/api/core/v1"
 	"k8s.io/apimachinery/pkg/api/errors"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
-	"k8s.io/apimachinery/pkg/fields"
-	"k8s.io/apimachinery/pkg/runtime"
 	"k8s.io/apimachinery/pkg/util/wait"
-	"k8s.io/apimachinery/pkg/watch"
-	"k8s.io/client-go/tools/cache"
-	toolswatch "k8s.io/client-go/tools/watch"
 	"k8s.io/client-go/util/retry"
-	"k8s.io/client-go/util/workqueue"
 )
 
 type CoreGetter func() *core.Factory
 
-type storage struct {
-	namespace, name string
-	storage         dynamiclistener.TLSStorage
-	secrets         v1controller.SecretController
-	tls             dynamiclistener.TLSFactory
-	queue           workqueue.TypedInterface[string]
-	queuedSecret    *v1.Secret
-}
-
 func Load(ctx context.Context, secrets v1controller.SecretController, namespace, name string, backing dynamiclistener.TLSStorage) dynamiclistener.TLSStorage {
 	storage := &storage{
 		name:      name,
 		namespace: namespace,
 		storage:   backing,
-		queue:     workqueue.NewTyped[string](),
+		ctx:       ctx,
+		initSync:  &sync.Once{},
 	}
-	storage.runQueue()
-	storage.init(ctx, secrets)
+	storage.init(secrets)
 	return storage
 }
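Both variants of Load delegate persistence to the backing dynamiclistener.TLSStorage passed in by the caller. As a hedged illustration (assuming the contract is just the Get/Update pair exercised in this diff), a minimal in-memory backing store could look like:

```go
package main

import (
	"sync"

	v1 "k8s.io/api/core/v1"
)

// memoryStorage is a hypothetical in-memory stand-in for the `backing`
// argument passed to Load/New; it only illustrates the Get/Update shape
// the controller relies on here.
type memoryStorage struct {
	mu     sync.Mutex
	secret *v1.Secret
}

func (m *memoryStorage) Get() (*v1.Secret, error) {
	m.mu.Lock()
	defer m.mu.Unlock()
	return m.secret, nil
}

func (m *memoryStorage) Update(secret *v1.Secret) error {
	m.mu.Lock()
	defer m.mu.Unlock()
	m.secret = secret
	return nil
}

func main() {
	var s memoryStorage
	_ = s.Update(&v1.Secret{})
	_, _ = s.Get()
}
```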
@@ -51,16 +37,16 @@ func New(ctx context.Context, core CoreGetter, namespace, name string, backing d
 		name:      name,
 		namespace: namespace,
 		storage:   backing,
-		queue:     workqueue.NewTyped[string](),
+		ctx:       ctx,
+		initSync:  &sync.Once{},
 	}
-	storage.runQueue()
 
 	// lazy init
 	go func() {
 		wait.PollImmediateUntilWithContext(ctx, time.Second, func(cxt context.Context) (bool, error) {
 			if coreFactory := core(); coreFactory != nil {
-				storage.init(ctx, coreFactory.Core().V1().Secret())
-				return true, nil
+				storage.init(coreFactory.Core().V1().Secret())
+				return true, start.All(ctx, 5, coreFactory)
 			}
 			return false, nil
 		})
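In both variants, New defers the controller wiring until the CoreGetter actually returns a factory, polling once per second. A rough standalone sketch of that lazy-init polling shape, with assumed get/init callbacks that are purely illustrative:

```go
package example

import (
	"context"
	"time"

	"k8s.io/apimachinery/pkg/util/wait"
)

// lazyInit polls until get() returns a non-nil value, then runs init once.
// It mirrors the wait.PollImmediateUntilWithContext loop in New above.
func lazyInit[T any](ctx context.Context, get func() *T, init func(*T)) {
	go func() {
		_ = wait.PollImmediateUntilWithContext(ctx, time.Second, func(ctx context.Context) (bool, error) {
			if v := get(); v != nil {
				init(v)
				return true, nil
			}
			return false, nil
		})
	}()
}
```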
@@ -69,94 +55,100 @@ func New(ctx context.Context, core CoreGetter, namespace, name string, backing d
 	return storage
 }
 
-// always return secret from backing storage
-func (s *storage) Get() (*v1.Secret, error) {
-	return s.storage.Get()
-}
-
-// sync secret to Kubernetes and backing storage via workqueue
-func (s *storage) Update(secret *v1.Secret) error {
-	// Asynchronously update the Kubernetes secret, as doing so inline may block the listener from
-	// accepting new connections if the apiserver becomes unavailable after the Secrets controller
-	// has been initialized.
-	s.queuedSecret = secret
-	s.queue.Add(s.name)
-	return nil
-}
+type storage struct {
+	sync.RWMutex
+
+	namespace, name string
+	storage         dynamiclistener.TLSStorage
+	secrets         v1controller.SecretController
+	ctx             context.Context
+	tls             dynamiclistener.TLSFactory
+	initialized     bool
+	initSync        *sync.Once
+}
 
 func (s *storage) SetFactory(tls dynamiclistener.TLSFactory) {
 	s.Lock()
 	defer s.Unlock()
 	s.tls = tls
 }
 
-func (s *storage) init(ctx context.Context, secrets v1controller.SecretController) {
-	s.secrets = secrets
+func (s *storage) init(secrets v1controller.SecretController) {
+	s.Lock()
+	defer s.Unlock()
 
-	// Watch just the target secret, instead of using a wrangler OnChange handler
-	// which watches all secrets in all namespaces. Changes to the secret
-	// will be sent through the workqueue.
-	go func() {
-		fieldSelector := fields.Set{"metadata.name": s.name}.String()
-		lw := &cache.ListWatch{
-			ListFunc: func(options metav1.ListOptions) (object runtime.Object, e error) {
-				options.FieldSelector = fieldSelector
-				return secrets.List(s.namespace, options)
-			},
-			WatchFunc: func(options metav1.ListOptions) (i watch.Interface, e error) {
-				options.FieldSelector = fieldSelector
-				return secrets.Watch(s.namespace, options)
-			},
-		}
-		_, _, watch, done := toolswatch.NewIndexerInformerWatcher(lw, &v1.Secret{})
-
-		defer func() {
-			s.queue.ShutDown()
-			watch.Stop()
-			<-done
-		}()
-
-		for {
-			select {
-			case <-ctx.Done():
-				return
-			case ev := <-watch.ResultChan():
-				if secret, ok := ev.Object.(*v1.Secret); ok {
-					s.queuedSecret = secret
-					s.queue.Add(secret.Name)
-				}
-			}
-		}
-	}()
-
-	// enqueue initial sync of the backing secret
-	s.queuedSecret, _ = s.Get()
-	s.queue.Add(s.name)
+	secrets.OnChange(s.ctx, "tls-storage", func(key string, secret *v1.Secret) (*v1.Secret, error) {
+		if secret == nil {
+			return nil, nil
+		}
+		if secret.Namespace == s.namespace && secret.Name == s.name {
+			if err := s.Update(secret); err != nil {
+				return nil, err
+			}
+		}
+		return secret, nil
+	})
+	s.secrets = secrets
+
+	// Asynchronously sync the backing storage to the Kubernetes secret, as doing so inline may
+	// block the listener from accepting new connections if the apiserver becomes unavailable
+	// after the Secrets controller has been initialized. We're not passing around any contexts
+	// here, nor does the controller accept any, so there's no good way to soft-fail with a
+	// reasonable timeout.
+	go s.syncStorage()
 }
 
-// runQueue starts a goroutine to process secrets updates from the workqueue
-func (s *storage) runQueue() {
-	go func() {
-		for s.processQueue() {
-		}
-	}()
-}
-
-// processQueue processes the secret update queue.
-// The key doesn't actually matter, as we are only handling a single secret with a single worker.
-func (s *storage) processQueue() bool {
-	key, shutdown := s.queue.Get()
-	if shutdown {
-		return false
-	}
-
-	defer s.queue.Done(key)
-	if err := s.update(); err != nil {
-		logrus.Errorf("Failed to update Secret %s/%s: %v", s.namespace, s.name, err)
-	}
-
-	return true
-}
+func (s *storage) syncStorage() {
+	var updateStorage bool
+	secret, err := s.Get()
+	if err == nil && cert.IsValidTLSSecret(secret) {
+		// local storage had a cached secret, ensure that it exists in Kubernetes
+		_, err := s.secrets.Create(&v1.Secret{
+			ObjectMeta: metav1.ObjectMeta{
+				Name:        s.name,
+				Namespace:   s.namespace,
+				Annotations: secret.Annotations,
+			},
+			Type: v1.SecretTypeTLS,
+			Data: secret.Data,
+		})
+		if err != nil && !errors.IsAlreadyExists(err) {
+			logrus.Warnf("Failed to create Kubernetes secret: %v", err)
+		}
+	} else {
+		// local storage was empty, try to populate it
+		secret, err = s.secrets.Get(s.namespace, s.name, metav1.GetOptions{})
+		if err != nil {
+			if !errors.IsNotFound(err) {
+				logrus.Warnf("Failed to init Kubernetes secret: %v", err)
+			}
+		} else {
+			updateStorage = true
+		}
+	}
+
+	s.Lock()
+	defer s.Unlock()
+	s.initialized = true
+	if updateStorage {
+		if err := s.storage.Update(secret); err != nil {
+			logrus.Warnf("Failed to init backing storage secret: %v", err)
+		}
+	}
+}
+
+func (s *storage) Get() (*v1.Secret, error) {
+	s.RLock()
+	defer s.RUnlock()
+
+	return s.storage.Get()
+}
 
 func (s *storage) targetSecret() (*v1.Secret, error) {
+	s.RLock()
+	defer s.RUnlock()
+
 	existingSecret, err := s.secrets.Get(s.namespace, s.name, metav1.GetOptions{})
 	if errors.IsNotFound(err) {
 		return &v1.Secret{
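For context, the watch-based variant removed above scopes the informer to a single named secret with a field selector, instead of handling every secret in the cluster through an OnChange handler. A hedged, standalone sketch of that pattern against a plain client-go clientset (names here are illustrative, not the library's):

```go
package example

import (
	"context"

	v1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/fields"
	"k8s.io/apimachinery/pkg/runtime"
	"k8s.io/apimachinery/pkg/watch"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/tools/cache"
	toolswatch "k8s.io/client-go/tools/watch"
)

// watchOneSecret streams events for a single secret by name, roughly the
// shape used in the removed init() above (illustrative only).
func watchOneSecret(ctx context.Context, client kubernetes.Interface, namespace, name string) {
	fieldSelector := fields.Set{"metadata.name": name}.String()
	lw := &cache.ListWatch{
		ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
			options.FieldSelector = fieldSelector
			return client.CoreV1().Secrets(namespace).List(ctx, options)
		},
		WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
			options.FieldSelector = fieldSelector
			return client.CoreV1().Secrets(namespace).Watch(ctx, options)
		},
	}
	_, _, w, done := toolswatch.NewIndexerInformerWatcher(lw, &v1.Secret{})
	defer func() {
		w.Stop()
		<-done
	}()

	for {
		select {
		case <-ctx.Done():
			return
		case ev := <-w.ResultChan():
			if secret, ok := ev.Object.(*v1.Secret); ok {
				_ = secret // hand off to a workqueue or handler here
			}
		}
	}
}
```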
@@ -170,16 +162,22 @@ func (s *storage) targetSecret() (*v1.Secret, error) {
 	return existingSecret, err
 }
 
-// saveInK8s handles merging the provided secret with the kubernetes secret.
-// This includes calling the tls factory to sign a new certificate with the
-// merged SAN entries, if possible. Note that the provided secret could be
-// either from Kubernetes due to the secret being changed by another client, or
-// from the listener trying to add SANs or regenerate the cert.
 func (s *storage) saveInK8s(secret *v1.Secret) (*v1.Secret, error) {
-	// secret controller not initialized yet, just return the current secret.
-	// if there is an existing secret in Kubernetes, that will get synced by the
-	// list/watch once the controller is initialized.
-	if s.secrets == nil {
+	if !s.initComplete() {
+		// Start a goroutine to attempt to save the secret later, once init is complete.
+		// If this was already handled by initComplete, it should be a no-op, or at worst get
+		// merged with the Kubernetes secret.
+		go s.initSync.Do(func() {
+			if err := wait.Poll(100*time.Millisecond, 15*time.Minute, func() (bool, error) {
+				if !s.initComplete() {
+					return false, nil
+				}
+				_, err := s.saveInK8s(secret)
+				return true, err
+			}); err != nil {
+				logrus.Errorf("Failed to save TLS secret after controller init: %v", err)
+			}
+		})
 		return secret, nil
 	}
 
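The branch side defers the save with a sync.Once wrapped around a poll loop, so at most one goroutine waits for initialization and then retries the save. A rough standalone sketch of that "run once, after a condition becomes true" shape (illustrative names, not the library's API):

```go
package example

import (
	"sync"
	"time"

	"k8s.io/apimachinery/pkg/util/wait"
)

// runOnceWhenReady polls ready() and then calls action exactly once,
// mirroring the initSync.Do + wait.Poll pattern in the diff above.
func runOnceWhenReady(once *sync.Once, ready func() bool, action func() error) {
	go once.Do(func() {
		_ = wait.Poll(100*time.Millisecond, 15*time.Minute, func() (bool, error) {
			if !ready() {
				return false, nil
			}
			return true, action()
		})
	})
}
```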
@@ -216,38 +214,54 @@ func (s *storage) saveInK8s(secret *v1.Secret) (*v1.Secret, error) {
 		return targetSecret, nil
 	}
 
-	// Any changes to the cert will change the fingerprint annotation, so we can use that
-	// for change detection, and skip updating an existing secret if it has not changed.
-	changed := !maps.Equal(targetSecret.Annotations, secret.Annotations)
-
-	targetSecret.Type = v1.SecretTypeTLS
 	targetSecret.Annotations = secret.Annotations
+	targetSecret.Type = v1.SecretTypeTLS
 	targetSecret.Data = secret.Data
 
 	if targetSecret.UID == "" {
 		logrus.Infof("Creating new TLS secret for %s/%s (count: %d): %v", targetSecret.Namespace, targetSecret.Name, len(targetSecret.Annotations)-1, targetSecret.Annotations)
 		return s.secrets.Create(targetSecret)
-	} else if changed {
-		logrus.Infof("Updating TLS secret for %s/%s (count: %d): %v", targetSecret.Namespace, targetSecret.Name, len(targetSecret.Annotations)-1, targetSecret.Annotations)
-		return s.secrets.Update(targetSecret)
 	}
-	return targetSecret, nil
+	logrus.Infof("Updating TLS secret for %s/%s (count: %d): %v", targetSecret.Namespace, targetSecret.Name, len(targetSecret.Annotations)-1, targetSecret.Annotations)
+	return s.secrets.Update(targetSecret)
 }
 
+func (s *storage) Update(secret *v1.Secret) error {
+	// Asynchronously update the Kubernetes secret, as doing so inline may block the listener from
+	// accepting new connections if the apiserver becomes unavailable after the Secrets controller
+	// has been initialized. We're not passing around any contexts here, nor does the controller
+	// accept any, so there's no good way to soft-fail with a reasonable timeout.
+	go func() {
+		if err := s.update(secret); err != nil {
+			logrus.Errorf("Failed to save TLS secret for %s/%s: %v", secret.Namespace, secret.Name, err)
+		}
+	}()
+	return nil
+}
+
 func isConflictOrAlreadyExists(err error) bool {
 	return errors.IsConflict(err) || errors.IsAlreadyExists(err)
 }
 
-// update wraps a conflict retry around saveInK8s, which handles merging the
-// queued secret with the Kubernetes secret. Only after successfully
-// updating the Kubernetes secret will the backing storage be updated.
-func (s *storage) update() (err error) {
+func (s *storage) update(secret *v1.Secret) (err error) {
 	var newSecret *v1.Secret
-	if err := retry.OnError(retry.DefaultRetry, isConflictOrAlreadyExists, func() error {
-		newSecret, err = s.saveInK8s(s.queuedSecret)
+	err = retry.OnError(retry.DefaultRetry, isConflictOrAlreadyExists, func() error {
+		newSecret, err = s.saveInK8s(secret)
 		return err
-	}); err != nil {
+	})
+
+	if err != nil {
 		return err
 	}
 
 	// Only hold the lock while updating underlying storage
 	s.Lock()
 	defer s.Unlock()
 	return s.storage.Update(newSecret)
 }
+
+func (s *storage) initComplete() bool {
+	s.RLock()
+	defer s.RUnlock()
+	return s.initialized
+}
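Both sides of this hunk wrap saveInK8s in retry.OnError, so Conflict and AlreadyExists races against the apiserver are retried with the default backoff. A hedged, minimal usage sketch of that helper (the doUpdate callback is hypothetical, not the library's code):

```go
package example

import (
	"k8s.io/apimachinery/pkg/api/errors"
	"k8s.io/client-go/util/retry"
)

// saveWithRetry retries doUpdate on Conflict/AlreadyExists errors using the
// default backoff, the same shape as update() in the diff above.
func saveWithRetry(doUpdate func() error) error {
	return retry.OnError(retry.DefaultRetry, func(err error) bool {
		return errors.IsConflict(err) || errors.IsAlreadyExists(err)
	}, doUpdate)
}
```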