Move Kubernetes Secrets storage update to goroutine

Fixes issue where apiserver outages can block dynamiclistener from accepting new connections.

Signed-off-by: Brad Davidson <brad.davidson@rancher.com>
This commit is contained in:
Brad Davidson 2022-04-29 17:38:08 -07:00 committed by Brad Davidson
parent 148d38076d
commit b1d65efb6f
6 changed files with 76 additions and 28 deletions

16
cert/secret.go Normal file
View File

@ -0,0 +1,16 @@
package cert
import v1 "k8s.io/api/core/v1"
func IsValidTLSSecret(secret *v1.Secret) bool {
if secret == nil {
return false
}
if _, ok := secret.Data[v1.TLSCertKey]; !ok {
return false
}
if _, ok := secret.Data[v1.TLSPrivateKeyKey]; !ok {
return false
}
return true
}

View File

@ -179,6 +179,7 @@ func (t *TLS) generateCert(secret *v1.Secret, cn ...string) (*v1.Secret, bool, e
if secret.Data == nil {
secret.Data = map[string][]byte{}
}
secret.Type = v1.SecretTypeTLS
secret.Data[v1.TLSCertKey] = certBytes
secret.Data[v1.TLSPrivateKeyKey] = keyBytes
secret.Annotations[fingerprint] = fmt.Sprintf("SHA1=%X", sha1.Sum(newCert.Raw))

1
go.mod
View File

@ -8,4 +8,5 @@ require (
golang.org/x/crypto v0.0.0-20210322153248-0c34fe9e7dc2
k8s.io/api v0.18.8
k8s.io/apimachinery v0.18.8
k8s.io/client-go v0.18.8
)

View File

@ -408,6 +408,9 @@ func (l *listener) loadCert(currentConn net.Conn) (*tls.Certificate, error) {
if err != nil {
return nil, err
}
if !cert.IsValidTLSSecret(secret) {
return l.cert, nil
}
if l.cert != nil && l.version == secret.ResourceVersion && secret.ResourceVersion != "" {
return l.cert, nil
}

View File

@ -6,6 +6,7 @@ import (
"time"
"github.com/rancher/dynamiclistener"
"github.com/rancher/dynamiclistener/cert"
"github.com/rancher/wrangler/pkg/generated/controllers/core"
v1controller "github.com/rancher/wrangler/pkg/generated/controllers/core/v1"
"github.com/rancher/wrangler/pkg/start"
@ -13,6 +14,7 @@ import (
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/util/retry"
)
type CoreGetter func() *core.Factory
@ -39,10 +41,9 @@ func New(ctx context.Context, core CoreGetter, namespace, name string, backing d
// lazy init
go func() {
for {
core := core()
if core != nil {
storage.init(core.Core().V1().Secret())
_ = start.All(ctx, 5, core)
if coreFactory := core(); coreFactory != nil {
storage.init(coreFactory.Core().V1().Secret())
_ = start.All(ctx, 5, coreFactory)
return
}
@ -58,11 +59,11 @@ func New(ctx context.Context, core CoreGetter, namespace, name string, backing d
}
type storage struct {
sync.Mutex
sync.RWMutex
namespace, name string
storage dynamiclistener.TLSStorage
secrets v1controller.SecretClient
secrets v1controller.SecretController
ctx context.Context
tls dynamiclistener.TLSFactory
}
@ -90,7 +91,7 @@ func (s *storage) init(secrets v1controller.SecretController) {
s.secrets = secrets
secret, err := s.storage.Get()
if err == nil && secret != nil && len(secret.Data) > 0 {
if err == nil && cert.IsValidTLSSecret(secret) {
// local storage had a cached secret, ensure that it exists in Kubernetes
_, err := s.secrets.Create(&v1.Secret{
ObjectMeta: metav1.ObjectMeta{
@ -119,13 +120,16 @@ func (s *storage) init(secrets v1controller.SecretController) {
}
func (s *storage) Get() (*v1.Secret, error) {
s.Lock()
defer s.Unlock()
s.RLock()
defer s.RUnlock()
return s.storage.Get()
}
func (s *storage) targetSecret() (*v1.Secret, error) {
s.RLock()
defer s.RUnlock()
existingSecret, err := s.secrets.Get(s.namespace, s.name, metav1.GetOptions{})
if errors.IsNotFound(err) {
return &v1.Secret{
@ -133,13 +137,14 @@ func (s *storage) targetSecret() (*v1.Secret, error) {
Name: s.name,
Namespace: s.namespace,
},
Type: v1.SecretTypeTLS,
}, nil
}
return existingSecret, err
}
func (s *storage) saveInK8s(secret *v1.Secret) (*v1.Secret, error) {
if s.secrets == nil {
if !s.controllerHasSynced() {
return secret, nil
}
@ -152,14 +157,14 @@ func (s *storage) saveInK8s(secret *v1.Secret) (*v1.Secret, error) {
// in favor of just blindly replacing the fields on the Kubernetes secret.
if s.tls != nil {
// merge new secret with secret from backing storage, if one exists
if existing, err := s.storage.Get(); err == nil && existing != nil && len(existing.Data) > 0 {
if existing, err := s.Get(); err == nil && cert.IsValidTLSSecret(existing) {
if newSecret, updated, err := s.tls.Merge(existing, secret); err == nil && updated {
secret = newSecret
}
}
// merge new secret with existing secret from Kubernetes, if one exists
if len(targetSecret.Data) > 0 {
if cert.IsValidTLSSecret(targetSecret) {
if newSecret, updated, err := s.tls.Merge(targetSecret, secret); err != nil {
return nil, err
} else if !updated {
@ -170,36 +175,58 @@ func (s *storage) saveInK8s(secret *v1.Secret) (*v1.Secret, error) {
}
}
// ensure that the merged secret actually contains data before overwriting the existing fields
if !cert.IsValidTLSSecret(secret) {
logrus.Warnf("Skipping save of TLS secret for %s/%s due to missing certificate data", secret.Namespace, secret.Name)
return targetSecret, nil
}
targetSecret.Annotations = secret.Annotations
targetSecret.Type = v1.SecretTypeTLS
targetSecret.Data = secret.Data
if targetSecret.UID == "" {
logrus.Infof("Creating new TLS secret for %v (count: %d): %v", targetSecret.Name, len(targetSecret.Annotations)-1, targetSecret.Annotations)
logrus.Infof("Creating new TLS secret for %s/%s (count: %d): %v", targetSecret.Namespace, targetSecret.Name, len(targetSecret.Annotations)-1, targetSecret.Annotations)
return s.secrets.Create(targetSecret)
}
logrus.Infof("Updating TLS secret for %v (count: %d): %v", targetSecret.Name, len(targetSecret.Annotations)-1, targetSecret.Annotations)
logrus.Infof("Updating TLS secret for %s/%s (count: %d): %v", targetSecret.Namespace, targetSecret.Name, len(targetSecret.Annotations)-1, targetSecret.Annotations)
return s.secrets.Update(targetSecret)
}
func (s *storage) Update(secret *v1.Secret) (err error) {
s.Lock()
defer s.Unlock()
for i := 0; i < 3; i++ {
secret, err = s.saveInK8s(secret)
if errors.IsConflict(err) {
continue
} else if err != nil {
return err
func (s *storage) Update(secret *v1.Secret) error {
// Asynchronously update the Kubernetes secret, as doing so inline may block the listener from
// accepting new connections if the apiserver becomes unavailable after the Secrets controller
// has been initialized. We're not passing around any contexts here, nor does the controller
// accept any, so there's no good way to soft-fail with a reasonable timeout.
go func() {
if err := s.update(secret); err != nil {
logrus.Errorf("Failed to save TLS secret for %s/%s: %v", secret.Namespace, secret.Name, err)
}
break
}
}()
return nil
}
func (s *storage) update(secret *v1.Secret) (err error) {
err = retry.RetryOnConflict(retry.DefaultRetry, func() error {
secret, err = s.saveInK8s(secret)
return err
})
if err != nil {
return err
}
// update underlying storage
// Only hold the lock while updating underlying storage
s.Lock()
defer s.Unlock()
return s.storage.Update(secret)
}
func (s *storage) controllerHasSynced() bool {
s.RLock()
defer s.RUnlock()
if s.secrets == nil {
return false
}
return s.secrets.Informer().HasSynced()
}

View File

@ -39,7 +39,7 @@ func (m *memory) Update(secret *v1.Secret) error {
}
}
logrus.Infof("Active TLS secret %s (ver=%s) (count %d): %v", secret.Name, secret.ResourceVersion, len(secret.Annotations)-1, secret.Annotations)
logrus.Infof("Active TLS secret %s/%s (ver=%s) (count %d): %v", secret.Namespace, secret.Name, secret.ResourceVersion, len(secret.Annotations)-1, secret.Annotations)
m.secret = secret
}
return nil