Wire contexts to Bootstrap controllers

This commit is contained in:
Mike Dame 2021-04-22 14:20:58 -04:00
parent afd55590e2
commit 6ce2924818
5 changed files with 35 additions and 33 deletions

View File

@ -34,7 +34,7 @@ func startBootstrapSignerController(ctx context.Context, controllerContext Contr
if err != nil {
return nil, true, fmt.Errorf("error creating BootstrapSigner controller: %v", err)
}
go bsc.Run(ctx.Done())
go bsc.Run(ctx)
return nil, true, nil
}
@ -47,6 +47,6 @@ func startTokenCleanerController(ctx context.Context, controllerContext Controll
if err != nil {
return nil, true, fmt.Errorf("error creating TokenCleaner controller: %v", err)
}
go tcc.Run(ctx.Done())
go tcc.Run(ctx)
return nil, true, nil
}

View File

@ -155,18 +155,18 @@ func NewSigner(cl clientset.Interface, secrets informers.SecretInformer, configM
}
// Run runs controller loops and returns when they are done
func (e *Signer) Run(stopCh <-chan struct{}) {
func (e *Signer) Run(ctx context.Context) {
// Shut down queues
defer utilruntime.HandleCrash()
defer e.syncQueue.ShutDown()
if !cache.WaitForNamedCacheSync("bootstrap_signer", stopCh, e.configMapSynced, e.secretSynced) {
if !cache.WaitForNamedCacheSync("bootstrap_signer", ctx.Done(), e.configMapSynced, e.secretSynced) {
return
}
klog.V(5).Infof("Starting workers")
go wait.Until(e.serviceConfigMapQueue, 0, stopCh)
<-stopCh
go wait.UntilWithContext(ctx, e.serviceConfigMapQueue, 0)
<-ctx.Done()
klog.V(1).Infof("Shutting down")
}
@ -174,19 +174,19 @@ func (e *Signer) pokeConfigMapSync() {
e.syncQueue.Add(e.configMapKey)
}
func (e *Signer) serviceConfigMapQueue() {
func (e *Signer) serviceConfigMapQueue(ctx context.Context) {
key, quit := e.syncQueue.Get()
if quit {
return
}
defer e.syncQueue.Done(key)
e.signConfigMap()
e.signConfigMap(ctx)
}
// signConfigMap computes the signatures on our latest cached objects and writes
// back if necessary.
func (e *Signer) signConfigMap() {
func (e *Signer) signConfigMap(ctx context.Context) {
origCM := e.getConfigMap()
if origCM == nil {
@ -239,12 +239,12 @@ func (e *Signer) signConfigMap() {
}
if needUpdate {
e.updateConfigMap(newCM)
e.updateConfigMap(ctx, newCM)
}
}
func (e *Signer) updateConfigMap(cm *v1.ConfigMap) {
_, err := e.client.CoreV1().ConfigMaps(cm.Namespace).Update(context.TODO(), cm, metav1.UpdateOptions{})
func (e *Signer) updateConfigMap(ctx context.Context, cm *v1.ConfigMap) {
_, err := e.client.CoreV1().ConfigMaps(cm.Namespace).Update(ctx, cm, metav1.UpdateOptions{})
if err != nil && !apierrors.IsConflict(err) && !apierrors.IsNotFound(err) {
klog.V(3).Infof("Error updating ConfigMap: %v", err)
}

View File

@ -17,6 +17,7 @@ limitations under the License.
package bootstrap
import (
"context"
"testing"
"github.com/davecgh/go-spew/spew"
@ -74,7 +75,7 @@ func TestNoConfigMap(t *testing.T) {
if err != nil {
t.Fatalf("error creating Signer: %v", err)
}
signer.signConfigMap()
signer.signConfigMap(context.TODO())
verifyActions(t, []core.Action{}, cl.Actions())
}
@ -91,7 +92,7 @@ func TestSimpleSign(t *testing.T) {
addSecretSigningUsage(secret, "true")
secrets.Informer().GetIndexer().Add(secret)
signer.signConfigMap()
signer.signConfigMap(context.TODO())
expected := []core.Action{
core.NewUpdateAction(schema.GroupVersionResource{Version: "v1", Resource: "configmaps"},
@ -115,7 +116,7 @@ func TestNoSignNeeded(t *testing.T) {
addSecretSigningUsage(secret, "true")
secrets.Informer().GetIndexer().Add(secret)
signer.signConfigMap()
signer.signConfigMap(context.TODO())
verifyActions(t, []core.Action{}, cl.Actions())
}
@ -133,7 +134,7 @@ func TestUpdateSignature(t *testing.T) {
addSecretSigningUsage(secret, "true")
secrets.Informer().GetIndexer().Add(secret)
signer.signConfigMap()
signer.signConfigMap(context.TODO())
expected := []core.Action{
core.NewUpdateAction(schema.GroupVersionResource{Version: "v1", Resource: "configmaps"},
@ -153,7 +154,7 @@ func TestRemoveSignature(t *testing.T) {
cm := newConfigMap(testTokenID, "old signature")
configMaps.Informer().GetIndexer().Add(cm)
signer.signConfigMap()
signer.signConfigMap(context.TODO())
expected := []core.Action{
core.NewUpdateAction(schema.GroupVersionResource{Version: "v1", Resource: "configmaps"},

View File

@ -111,20 +111,20 @@ func NewTokenCleaner(cl clientset.Interface, secrets coreinformers.SecretInforme
}
// Run runs controller loops and returns when they are done
func (tc *TokenCleaner) Run(stopCh <-chan struct{}) {
func (tc *TokenCleaner) Run(ctx context.Context) {
defer utilruntime.HandleCrash()
defer tc.queue.ShutDown()
klog.Infof("Starting token cleaner controller")
defer klog.Infof("Shutting down token cleaner controller")
if !cache.WaitForNamedCacheSync("token_cleaner", stopCh, tc.secretSynced) {
if !cache.WaitForNamedCacheSync("token_cleaner", ctx.Done(), tc.secretSynced) {
return
}
go wait.Until(tc.worker, 10*time.Second, stopCh)
go wait.UntilWithContext(ctx, tc.worker, 10*time.Second)
<-stopCh
<-ctx.Done()
}
func (tc *TokenCleaner) enqueueSecrets(obj interface{}) {
@ -137,20 +137,20 @@ func (tc *TokenCleaner) enqueueSecrets(obj interface{}) {
}
// worker runs a thread that dequeues secrets, handles them, and marks them done.
func (tc *TokenCleaner) worker() {
for tc.processNextWorkItem() {
func (tc *TokenCleaner) worker(ctx context.Context) {
for tc.processNextWorkItem(ctx) {
}
}
// processNextWorkItem deals with one key off the queue. It returns false when it's time to quit.
func (tc *TokenCleaner) processNextWorkItem() bool {
func (tc *TokenCleaner) processNextWorkItem(ctx context.Context) bool {
key, quit := tc.queue.Get()
if quit {
return false
}
defer tc.queue.Done(key)
if err := tc.syncFunc(key.(string)); err != nil {
if err := tc.syncFunc(ctx, key.(string)); err != nil {
tc.queue.AddRateLimited(key)
utilruntime.HandleError(fmt.Errorf("Sync %v failed with : %v", key, err))
return true
@ -160,7 +160,7 @@ func (tc *TokenCleaner) processNextWorkItem() bool {
return true
}
func (tc *TokenCleaner) syncFunc(key string) error {
func (tc *TokenCleaner) syncFunc(ctx context.Context, key string) error {
startTime := time.Now()
defer func() {
klog.V(4).Infof("Finished syncing secret %q (%v)", key, time.Since(startTime))
@ -182,12 +182,12 @@ func (tc *TokenCleaner) syncFunc(key string) error {
}
if ret.Type == bootstrapapi.SecretTypeBootstrapToken {
tc.evalSecret(ret)
tc.evalSecret(ctx, ret)
}
return nil
}
func (tc *TokenCleaner) evalSecret(o interface{}) {
func (tc *TokenCleaner) evalSecret(ctx context.Context, o interface{}) {
secret := o.(*v1.Secret)
ttl, alreadyExpired := bootstrapsecretutil.GetExpiration(secret, time.Now())
if alreadyExpired {
@ -196,7 +196,7 @@ func (tc *TokenCleaner) evalSecret(o interface{}) {
if len(secret.UID) > 0 {
options.Preconditions = &metav1.Preconditions{UID: &secret.UID}
}
err := tc.client.CoreV1().Secrets(secret.Namespace).Delete(context.TODO(), secret.Name, options)
err := tc.client.CoreV1().Secrets(secret.Namespace).Delete(ctx, secret.Name, options)
// NotFound isn't a real error (it's already been deleted)
// Conflict isn't a real error (the UID precondition failed)
if err != nil && !apierrors.IsConflict(err) && !apierrors.IsNotFound(err) {

View File

@ -17,6 +17,7 @@ limitations under the License.
package bootstrap
import (
"context"
"testing"
"time"
@ -56,7 +57,7 @@ func TestCleanerNoExpiration(t *testing.T) {
secret := newTokenSecret("tokenID", "tokenSecret")
secrets.Informer().GetIndexer().Add(secret)
cleaner.evalSecret(secret)
cleaner.evalSecret(context.TODO(), secret)
expected := []core.Action{}
@ -73,7 +74,7 @@ func TestCleanerExpired(t *testing.T) {
addSecretExpiration(secret, timeString(-time.Hour))
secrets.Informer().GetIndexer().Add(secret)
cleaner.evalSecret(secret)
cleaner.evalSecret(context.TODO(), secret)
expected := []core.Action{
core.NewDeleteAction(
@ -95,7 +96,7 @@ func TestCleanerNotExpired(t *testing.T) {
addSecretExpiration(secret, timeString(time.Hour))
secrets.Informer().GetIndexer().Add(secret)
cleaner.evalSecret(secret)
cleaner.evalSecret(context.TODO(), secret)
expected := []core.Action{}
@ -114,7 +115,7 @@ func TestCleanerExpiredAt(t *testing.T) {
cleaner.enqueueSecrets(secret)
expected := []core.Action{}
verifyFunc := func() {
cleaner.processNextWorkItem()
cleaner.processNextWorkItem(context.TODO())
verifyActions(t, expected, cl.Actions())
}
// token has not expired currently