From b1d65efb6f8309327ed7beb41463056379ec18eb Mon Sep 17 00:00:00 2001
From: Brad Davidson <brad.davidson@rancher.com>
Date: Fri, 29 Apr 2022 17:38:08 -0700
Subject: [PATCH] Move Kubernetes Secrets storage update to goroutine

Fixes issue where apiserver outages can block dynamiclistener from accepting new connections.

Signed-off-by: Brad Davidson <brad.davidson@rancher.com>
---
 cert/secret.go                   | 16 +++++++
 factory/gen.go                   |  1 +
 go.mod                           |  1 +
 listener.go                      |  3 ++
 storage/kubernetes/controller.go | 81 +++++++++++++++++++++-----------
 storage/memory/memory.go         |  2 +-
 6 files changed, 76 insertions(+), 28 deletions(-)
 create mode 100644 cert/secret.go

diff --git a/cert/secret.go b/cert/secret.go
new file mode 100644
index 0000000..5143bbb
--- /dev/null
+++ b/cert/secret.go
@@ -0,0 +1,16 @@
+package cert
+
+import v1 "k8s.io/api/core/v1"
+
+// IsValidTLSSecret reports whether secret is non-nil and its Data map
+// contains entries for both the TLS certificate and the TLS private key.
+func IsValidTLSSecret(secret *v1.Secret) bool {
+	if secret == nil {
+		return false
+	}
+
+	// Only the presence of each key is checked; the values are not inspected.
+	_, hasCert := secret.Data[v1.TLSCertKey]
+	_, hasKey := secret.Data[v1.TLSPrivateKeyKey]
+	return hasCert && hasKey
+}
diff --git a/factory/gen.go b/factory/gen.go
index 48d21d8..b59f527 100644
--- a/factory/gen.go
+++ b/factory/gen.go
@@ -179,6 +179,7 @@ func (t *TLS) generateCert(secret *v1.Secret, cn ...string) (*v1.Secret, bool, e
 	if secret.Data == nil {
 		secret.Data = map[string][]byte{}
 	}
+	secret.Type = v1.SecretTypeTLS
 	secret.Data[v1.TLSCertKey] = certBytes
 	secret.Data[v1.TLSPrivateKeyKey] = keyBytes
 	secret.Annotations[fingerprint] = fmt.Sprintf("SHA1=%X", sha1.Sum(newCert.Raw))
diff --git a/go.mod b/go.mod
index a70fff5..7f8c3e9 100644
--- a/go.mod
+++ b/go.mod
@@ -8,4 +8,5 @@ require (
 	golang.org/x/crypto v0.0.0-20210322153248-0c34fe9e7dc2
 	k8s.io/api v0.18.8
 	k8s.io/apimachinery v0.18.8
+	k8s.io/client-go v0.18.8
 )
diff --git a/listener.go b/listener.go
index 072fa3e..c7f189b 100644
--- a/listener.go
+++ b/listener.go
@@ -408,6 +408,9 @@ func (l *listener) loadCert(currentConn net.Conn) (*tls.Certificate, error) {
 	if err != nil {
 		return nil, err
 	}
+	if !cert.IsValidTLSSecret(secret) {
+		return l.cert, nil
+	}
 	if l.cert != nil && l.version == secret.ResourceVersion && secret.ResourceVersion != "" {
 		return l.cert, nil
 	}
diff --git a/storage/kubernetes/controller.go b/storage/kubernetes/controller.go
index b1a9e40..9f10854 100644
--- a/storage/kubernetes/controller.go
+++ b/storage/kubernetes/controller.go
@@ -6,6 +6,7 @@ import (
 	"time"
 
 	"github.com/rancher/dynamiclistener"
+	"github.com/rancher/dynamiclistener/cert"
 	"github.com/rancher/wrangler/pkg/generated/controllers/core"
 	v1controller "github.com/rancher/wrangler/pkg/generated/controllers/core/v1"
 	"github.com/rancher/wrangler/pkg/start"
@@ -13,6 +14,7 @@ import (
 	v1 "k8s.io/api/core/v1"
 	"k8s.io/apimachinery/pkg/api/errors"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	"k8s.io/client-go/util/retry"
 )
 
 type CoreGetter func() *core.Factory
@@ -39,10 +41,9 @@ func New(ctx context.Context, core CoreGetter, namespace, name string, backing d
 	// lazy init
 	go func() {
 		for {
-			core := core()
-			if core != nil {
-				storage.init(core.Core().V1().Secret())
-				_ = start.All(ctx, 5, core)
+			if coreFactory := core(); coreFactory != nil {
+				storage.init(coreFactory.Core().V1().Secret())
+				_ = start.All(ctx, 5, coreFactory)
 				return
 			}
 
@@ -58,11 +59,11 @@ func New(ctx context.Context, core CoreGetter, namespace, name string, backing d
 }
 
 type storage struct {
-	sync.Mutex
+	sync.RWMutex
 
 	namespace, name string
 	storage         dynamiclistener.TLSStorage
-	secrets         v1controller.SecretClient
+	secrets         v1controller.SecretController
 	ctx             context.Context
 	tls             dynamiclistener.TLSFactory
 }
@@ -90,7 +91,7 @@ func (s *storage) init(secrets v1controller.SecretController) {
 	s.secrets = secrets
 
 	secret, err := s.storage.Get()
-	if err == nil && secret != nil && len(secret.Data) > 0 {
+	if err == nil && cert.IsValidTLSSecret(secret) {
 		// local storage had a cached secret, ensure that it exists in Kubernetes
 		_, err := s.secrets.Create(&v1.Secret{
 			ObjectMeta: metav1.ObjectMeta{
@@ -119,13 +120,16 @@ func (s *storage) init(secrets v1controller.SecretController) {
 }
 
 func (s *storage) Get() (*v1.Secret, error) {
-	s.Lock()
-	defer s.Unlock()
+	s.RLock()
+	defer s.RUnlock()
 
 	return s.storage.Get()
 }
 
 func (s *storage) targetSecret() (*v1.Secret, error) {
+	s.RLock()
+	defer s.RUnlock()
+
 	existingSecret, err := s.secrets.Get(s.namespace, s.name, metav1.GetOptions{})
 	if errors.IsNotFound(err) {
 		return &v1.Secret{
@@ -133,13 +137,14 @@ func (s *storage) targetSecret() (*v1.Secret, error) {
 				Name:      s.name,
 				Namespace: s.namespace,
 			},
+			Type: v1.SecretTypeTLS,
 		}, nil
 	}
 	return existingSecret, err
 }
 
 func (s *storage) saveInK8s(secret *v1.Secret) (*v1.Secret, error) {
-	if s.secrets == nil {
+	if !s.controllerHasSynced() {
 		return secret, nil
 	}
 
@@ -152,14 +157,14 @@ func (s *storage) saveInK8s(secret *v1.Secret) (*v1.Secret, error) {
 	// in favor of just blindly replacing the fields on the Kubernetes secret.
 	if s.tls != nil {
 		// merge new secret with secret from backing storage, if one exists
-		if existing, err := s.storage.Get(); err == nil && existing != nil && len(existing.Data) > 0 {
+		if existing, err := s.Get(); err == nil && cert.IsValidTLSSecret(existing) {
 			if newSecret, updated, err := s.tls.Merge(existing, secret); err == nil && updated {
 				secret = newSecret
 			}
 		}
 
 		// merge new secret with existing secret from Kubernetes, if one exists
-		if len(targetSecret.Data) > 0 {
+		if cert.IsValidTLSSecret(targetSecret) {
 			if newSecret, updated, err := s.tls.Merge(targetSecret, secret); err != nil {
 				return nil, err
 			} else if !updated {
@@ -170,36 +175,58 @@ func (s *storage) saveInK8s(secret *v1.Secret) (*v1.Secret, error) {
 		}
 	}
 
+	// ensure that the merged secret actually contains data before overwriting the existing fields
+	if !cert.IsValidTLSSecret(secret) {
+		logrus.Warnf("Skipping save of TLS secret for %s/%s due to missing certificate data", secret.Namespace, secret.Name)
+		return targetSecret, nil
+	}
+
 	targetSecret.Annotations = secret.Annotations
 	targetSecret.Type = v1.SecretTypeTLS
 	targetSecret.Data = secret.Data
 
 	if targetSecret.UID == "" {
-		logrus.Infof("Creating new TLS secret for %v (count: %d): %v", targetSecret.Name, len(targetSecret.Annotations)-1, targetSecret.Annotations)
+		logrus.Infof("Creating new TLS secret for %s/%s (count: %d): %v", targetSecret.Namespace, targetSecret.Name, len(targetSecret.Annotations)-1, targetSecret.Annotations)
 		return s.secrets.Create(targetSecret)
 	}
-	logrus.Infof("Updating TLS secret for %v (count: %d): %v", targetSecret.Name, len(targetSecret.Annotations)-1, targetSecret.Annotations)
+	logrus.Infof("Updating TLS secret for %s/%s (count: %d): %v", targetSecret.Namespace, targetSecret.Name, len(targetSecret.Annotations)-1, targetSecret.Annotations)
 	return s.secrets.Update(targetSecret)
 }
 
-func (s *storage) Update(secret *v1.Secret) (err error) {
-	s.Lock()
-	defer s.Unlock()
-
-	for i := 0; i < 3; i++ {
-		secret, err = s.saveInK8s(secret)
-		if errors.IsConflict(err) {
-			continue
-		} else if err != nil {
-			return err
+func (s *storage) Update(secret *v1.Secret) error {
+	// Asynchronously update the Kubernetes secret; this always returns nil and failures are only
+	// logged. Updating inline may block the listener from accepting new connections if the apiserver
+	// becomes unavailable after the Secrets controller has been initialized. No contexts are passed
+	// around here, nor does the controller accept any, so there is no good way to soft-fail with a timeout.
+	go func() {
+		if err := s.update(secret); err != nil {
+			logrus.Errorf("Failed to save TLS secret for %s/%s: %v", secret.Namespace, secret.Name, err)
 		}
-		break
-	}
+	}()
+	return nil
+}
+
+// update saves secret to Kubernetes, then to the backing storage (the only step run under Lock).
+// Conflict retries restart from the caller's secret, never from a failed attempt's result.
+func (s *storage) update(secret *v1.Secret) (err error) {
+	var saved *v1.Secret
+	err = retry.RetryOnConflict(retry.DefaultRetry, func() error {
+		saved, err = s.saveInK8s(secret)
+		return err
+	})
-
 	if err != nil {
 		return err
 	}
-
-	// update underlying storage
+	s.Lock()
+	defer s.Unlock()
-	return s.storage.Update(secret)
+	return s.storage.Update(saved)
 }
+
+// controllerHasSynced reports whether a Secrets controller has been set and
+// its informer has synced. The read lock is held for the whole check.
+func (s *storage) controllerHasSynced() bool {
+	s.RLock()
+	defer s.RUnlock()
+
+	return s.secrets != nil && s.secrets.Informer().HasSynced()
+}
diff --git a/storage/memory/memory.go b/storage/memory/memory.go
index 0edfd1b..cf73996 100644
--- a/storage/memory/memory.go
+++ b/storage/memory/memory.go
@@ -39,7 +39,7 @@ func (m *memory) Update(secret *v1.Secret) error {
 			}
 		}
 
-		logrus.Infof("Active TLS secret %s (ver=%s) (count %d): %v", secret.Name, secret.ResourceVersion, len(secret.Annotations)-1, secret.Annotations)
+		logrus.Infof("Active TLS secret %s/%s (ver=%s) (count %d): %v", secret.Namespace, secret.Name, secret.ResourceVersion, len(secret.Annotations)-1, secret.Annotations)
 		m.secret = secret
 	}
 	return nil