Mirror of https://github.com/k3s-io/kubernetes.git (synced 2025-07-23 19:56:01 +00:00)

Merge pull request #105510 from damemi/wire-contexts-bootstrap

Wire contexts to Bootstrap controllers

This commit is contained in: 3fdeb490e0
@@ -34,7 +34,7 @@ func startBootstrapSignerController(ctx context.Context, controllerContext Contr
 	if err != nil {
 		return nil, true, fmt.Errorf("error creating BootstrapSigner controller: %v", err)
 	}
-	go bsc.Run(ctx.Done())
+	go bsc.Run(ctx)
 	return nil, true, nil
 }
 
@@ -47,6 +47,6 @@ func startTokenCleanerController(ctx context.Context, controllerContext Controll
 	if err != nil {
 		return nil, true, fmt.Errorf("error creating TokenCleaner controller: %v", err)
 	}
-	go tcc.Run(ctx.Done())
+	go tcc.Run(ctx)
 	return nil, true, nil
 }
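
The two start functions above previously collapsed the context into a bare stop channel with ctx.Done(); after this change the controllers receive the context itself, so they can also observe deadlines and the cancellation cause. A minimal runnable sketch of the difference between the two styles (runOld and runNew are hypothetical stand-ins, not code from this PR):

package main

import (
	"context"
	"fmt"
	"sync"
)

// Old style: the caller flattens the context into a stop channel, so the
// controller can only learn "stop", never why or by when.
func runOld(stopCh <-chan struct{}) { <-stopCh; fmt.Println("old style stopped") }

// New style: the controller gets the whole context and derives the stop
// channel itself where one is still needed.
func runNew(ctx context.Context) { <-ctx.Done(); fmt.Println("new style stopped:", ctx.Err()) }

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	var wg sync.WaitGroup
	wg.Add(2)
	go func() { defer wg.Done(); runOld(ctx.Done()) }()
	go func() { defer wg.Done(); runNew(ctx) }()
	cancel() // stops both; only runNew can also report the reason
	wg.Wait()
}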
@@ -155,18 +155,18 @@ func NewSigner(cl clientset.Interface, secrets informers.SecretInformer, configM
 }
 
 // Run runs controller loops and returns when they are done
-func (e *Signer) Run(stopCh <-chan struct{}) {
+func (e *Signer) Run(ctx context.Context) {
 	// Shut down queues
 	defer utilruntime.HandleCrash()
 	defer e.syncQueue.ShutDown()
 
-	if !cache.WaitForNamedCacheSync("bootstrap_signer", stopCh, e.configMapSynced, e.secretSynced) {
+	if !cache.WaitForNamedCacheSync("bootstrap_signer", ctx.Done(), e.configMapSynced, e.secretSynced) {
		return
 	}
 
 	klog.V(5).Infof("Starting workers")
-	go wait.Until(e.serviceConfigMapQueue, 0, stopCh)
-	<-stopCh
+	go wait.UntilWithContext(ctx, e.serviceConfigMapQueue, 0)
+	<-ctx.Done()
 	klog.V(1).Infof("Shutting down")
 }
 
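
The Signer's worker launch switches from wait.Until, which drives a plain func off a stop channel, to wait.UntilWithContext, which threads the context into each invocation of the worker. A small runnable sketch of the two helpers from k8s.io/apimachinery/pkg/util/wait:

package main

import (
	"context"
	"fmt"
	"time"

	"k8s.io/apimachinery/pkg/util/wait"
)

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 250*time.Millisecond)
	defer cancel()

	// wait.Until(f func(), period, stopCh): f gets no context at all.
	go wait.Until(func() { fmt.Println("tick (channel style)") }, 100*time.Millisecond, ctx.Done())

	// wait.UntilWithContext(ctx, f func(context.Context), period): f receives
	// the context and can forward it to API calls, as serviceConfigMapQueue now does.
	go wait.UntilWithContext(ctx, func(ctx context.Context) {
		fmt.Println("tick (context style)")
	}, 100*time.Millisecond)

	<-ctx.Done()
}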
@@ -174,19 +174,19 @@ func (e *Signer) pokeConfigMapSync() {
 	e.syncQueue.Add(e.configMapKey)
 }
 
-func (e *Signer) serviceConfigMapQueue() {
+func (e *Signer) serviceConfigMapQueue(ctx context.Context) {
 	key, quit := e.syncQueue.Get()
 	if quit {
 		return
 	}
 	defer e.syncQueue.Done(key)
 
-	e.signConfigMap()
+	e.signConfigMap(ctx)
 }
 
 // signConfigMap computes the signatures on our latest cached objects and writes
 // back if necessary.
-func (e *Signer) signConfigMap() {
+func (e *Signer) signConfigMap(ctx context.Context) {
 	origCM := e.getConfigMap()
 
 	if origCM == nil {
@@ -239,12 +239,12 @@ func (e *Signer) signConfigMap() {
 	}
 
 	if needUpdate {
-		e.updateConfigMap(newCM)
+		e.updateConfigMap(ctx, newCM)
 	}
 }
 
-func (e *Signer) updateConfigMap(cm *v1.ConfigMap) {
-	_, err := e.client.CoreV1().ConfigMaps(cm.Namespace).Update(context.TODO(), cm, metav1.UpdateOptions{})
+func (e *Signer) updateConfigMap(ctx context.Context, cm *v1.ConfigMap) {
+	_, err := e.client.CoreV1().ConfigMaps(cm.Namespace).Update(ctx, cm, metav1.UpdateOptions{})
 	if err != nil && !apierrors.IsConflict(err) && !apierrors.IsNotFound(err) {
 		klog.V(3).Infof("Error updating ConfigMap: %v", err)
 	}
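
With the context plumbed all the way down, the client-go write no longer needs a context.TODO() placeholder. A sketch of the call shape using the fake clientset (object names are illustrative; against a real API server, cancelling ctx aborts the in-flight request — the fake client only demonstrates the signature):

package main

import (
	"context"
	"fmt"

	v1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/kubernetes/fake"
)

func main() {
	cm := &v1.ConfigMap{ObjectMeta: metav1.ObjectMeta{Name: "cluster-info", Namespace: "kube-public"}}
	cl := fake.NewSimpleClientset(cm)

	// Since client-go v0.18, every request method takes ctx as its first
	// argument; in the controller this is the wired-through ctx.
	ctx := context.Background()
	updated := cm.DeepCopy()
	updated.Data = map[string]string{"kubeconfig": "..."}
	_, err := cl.CoreV1().ConfigMaps("kube-public").Update(ctx, updated, metav1.UpdateOptions{})
	fmt.Println("update error:", err)
}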
@@ -17,6 +17,7 @@ limitations under the License.
 package bootstrap
 
 import (
+	"context"
 	"testing"
 
 	"github.com/davecgh/go-spew/spew"
@@ -74,7 +75,7 @@ func TestNoConfigMap(t *testing.T) {
 	if err != nil {
 		t.Fatalf("error creating Signer: %v", err)
 	}
-	signer.signConfigMap()
+	signer.signConfigMap(context.TODO())
 	verifyActions(t, []core.Action{}, cl.Actions())
 }
 
@@ -91,7 +92,7 @@ func TestSimpleSign(t *testing.T) {
 	addSecretSigningUsage(secret, "true")
 	secrets.Informer().GetIndexer().Add(secret)
 
-	signer.signConfigMap()
+	signer.signConfigMap(context.TODO())
 
 	expected := []core.Action{
 		core.NewUpdateAction(schema.GroupVersionResource{Version: "v1", Resource: "configmaps"},
@@ -115,7 +116,7 @@ func TestNoSignNeeded(t *testing.T) {
 	addSecretSigningUsage(secret, "true")
 	secrets.Informer().GetIndexer().Add(secret)
 
-	signer.signConfigMap()
+	signer.signConfigMap(context.TODO())
 
 	verifyActions(t, []core.Action{}, cl.Actions())
 }
@@ -133,7 +134,7 @@ func TestUpdateSignature(t *testing.T) {
 	addSecretSigningUsage(secret, "true")
 	secrets.Informer().GetIndexer().Add(secret)
 
-	signer.signConfigMap()
+	signer.signConfigMap(context.TODO())
 
 	expected := []core.Action{
 		core.NewUpdateAction(schema.GroupVersionResource{Version: "v1", Resource: "configmaps"},
@@ -153,7 +154,7 @@ func TestRemoveSignature(t *testing.T) {
 	cm := newConfigMap(testTokenID, "old signature")
 	configMaps.Informer().GetIndexer().Add(cm)
 
-	signer.signConfigMap()
+	signer.signConfigMap(context.TODO())
 
 	expected := []core.Action{
 		core.NewUpdateAction(schema.GroupVersionResource{Version: "v1", Resource: "configmaps"},
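
The Signer tests above satisfy the new signatures with context.TODO(), since they never exercise cancellation. TODO and Background are behaviorally identical never-cancelled contexts; TODO merely flags that a real context should eventually arrive, which is exactly what this PR does for the production paths. A tiny demonstration of that equivalence:

package main

import (
	"context"
	"fmt"
)

func main() {
	// Both TODO and Background carry no deadline and are never cancelled;
	// Done() is a nil channel, so it never fires.
	todo := context.TODO()
	fmt.Println(todo.Err(), todo.Done() == nil) // <nil> true
}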
@@ -111,20 +111,20 @@ func NewTokenCleaner(cl clientset.Interface, secrets coreinformers.SecretInforme
 }
 
 // Run runs controller loops and returns when they are done
-func (tc *TokenCleaner) Run(stopCh <-chan struct{}) {
+func (tc *TokenCleaner) Run(ctx context.Context) {
 	defer utilruntime.HandleCrash()
 	defer tc.queue.ShutDown()
 
 	klog.Infof("Starting token cleaner controller")
 	defer klog.Infof("Shutting down token cleaner controller")
 
-	if !cache.WaitForNamedCacheSync("token_cleaner", stopCh, tc.secretSynced) {
+	if !cache.WaitForNamedCacheSync("token_cleaner", ctx.Done(), tc.secretSynced) {
 		return
 	}
 
-	go wait.Until(tc.worker, 10*time.Second, stopCh)
+	go wait.UntilWithContext(ctx, tc.worker, 10*time.Second)
 
-	<-stopCh
+	<-ctx.Done()
 }
 
 func (tc *TokenCleaner) enqueueSecrets(obj interface{}) {
@@ -137,20 +137,20 @@ func (tc *TokenCleaner) enqueueSecrets(obj interface{}) {
 }
 
 // worker runs a thread that dequeues secrets, handles them, and marks them done.
-func (tc *TokenCleaner) worker() {
-	for tc.processNextWorkItem() {
+func (tc *TokenCleaner) worker(ctx context.Context) {
+	for tc.processNextWorkItem(ctx) {
 	}
 }
 
 // processNextWorkItem deals with one key off the queue. It returns false when it's time to quit.
-func (tc *TokenCleaner) processNextWorkItem() bool {
+func (tc *TokenCleaner) processNextWorkItem(ctx context.Context) bool {
 	key, quit := tc.queue.Get()
 	if quit {
 		return false
 	}
 	defer tc.queue.Done(key)
 
-	if err := tc.syncFunc(key.(string)); err != nil {
+	if err := tc.syncFunc(ctx, key.(string)); err != nil {
 		tc.queue.AddRateLimited(key)
 		utilruntime.HandleError(fmt.Errorf("Sync %v failed with : %v", key, err))
 		return true
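
The TokenCleaner's queue loop keeps its shape; only the context is threaded through to the per-key sync. A self-contained sketch of the same processNextWorkItem pattern against k8s.io/client-go/util/workqueue (processNext and the queued key are illustrative, not this PR's code):

package main

import (
	"context"
	"fmt"

	"k8s.io/client-go/util/workqueue"
)

// processNext mirrors processNextWorkItem: pop one key, hand the context to
// the per-key sync, and report whether the loop should continue.
func processNext(ctx context.Context, queue workqueue.Interface, sync func(context.Context, string) error) bool {
	key, quit := queue.Get()
	if quit {
		return false
	}
	defer queue.Done(key)
	if err := sync(ctx, key.(string)); err != nil {
		fmt.Println("sync failed:", err) // the real cleaner rate-limits and retries here
	}
	return true
}

func main() {
	queue := workqueue.New()
	queue.Add("kube-system/bootstrap-token-abcdef") // illustrative key
	queue.ShutDown()                                // Get drains remaining items, then reports quit

	ctx := context.Background()
	for processNext(ctx, queue, func(ctx context.Context, key string) error {
		fmt.Println("syncing", key)
		return nil
	}) {
	}
}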
@@ -160,7 +160,7 @@ func (tc *TokenCleaner) processNextWorkItem() bool {
 	return true
 }
 
-func (tc *TokenCleaner) syncFunc(key string) error {
+func (tc *TokenCleaner) syncFunc(ctx context.Context, key string) error {
 	startTime := time.Now()
 	defer func() {
 		klog.V(4).Infof("Finished syncing secret %q (%v)", key, time.Since(startTime))
@@ -182,12 +182,12 @@ func (tc *TokenCleaner) syncFunc(key string) error {
 	}
 
 	if ret.Type == bootstrapapi.SecretTypeBootstrapToken {
-		tc.evalSecret(ret)
+		tc.evalSecret(ctx, ret)
 	}
 	return nil
 }
 
-func (tc *TokenCleaner) evalSecret(o interface{}) {
+func (tc *TokenCleaner) evalSecret(ctx context.Context, o interface{}) {
 	secret := o.(*v1.Secret)
 	ttl, alreadyExpired := bootstrapsecretutil.GetExpiration(secret, time.Now())
 	if alreadyExpired {
@@ -196,7 +196,7 @@ func (tc *TokenCleaner) evalSecret(o interface{}) {
 	if len(secret.UID) > 0 {
 		options.Preconditions = &metav1.Preconditions{UID: &secret.UID}
 	}
-	err := tc.client.CoreV1().Secrets(secret.Namespace).Delete(context.TODO(), secret.Name, options)
+	err := tc.client.CoreV1().Secrets(secret.Namespace).Delete(ctx, secret.Name, options)
 	// NotFound isn't a real error (it's already been deleted)
 	// Conflict isn't a real error (the UID precondition failed)
 	if err != nil && !apierrors.IsConflict(err) && !apierrors.IsNotFound(err) {
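
evalSecret deletes expired token secrets with a UID precondition, so a secret that was deleted and recreated under the same name (new UID) is left alone; the caller's context now rides along on the Delete. A sketch of a preconditioned delete using the fake clientset (names illustrative; the comment describes real API server behavior, which the fake does not fully enforce):

package main

import (
	"context"
	"fmt"

	v1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/types"
	"k8s.io/client-go/kubernetes/fake"
)

func main() {
	secret := &v1.Secret{ObjectMeta: metav1.ObjectMeta{
		Name: "bootstrap-token-abcdef", Namespace: "kube-system", UID: types.UID("uid-1"),
	}}
	cl := fake.NewSimpleClientset(secret)

	// If the stored secret's UID no longer matches, a real API server
	// rejects this delete with a Conflict, which the cleaner ignores.
	opts := metav1.DeleteOptions{Preconditions: &metav1.Preconditions{UID: &secret.UID}}
	err := cl.CoreV1().Secrets("kube-system").Delete(context.Background(), secret.Name, opts)
	fmt.Println("delete error:", err)
}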
@@ -17,6 +17,7 @@ limitations under the License.
 package bootstrap
 
 import (
+	"context"
 	"testing"
 	"time"
 
@@ -57,7 +58,7 @@ func TestCleanerNoExpiration(t *testing.T) {
 	secret := newTokenSecret("tokenID", "tokenSecret")
 	secrets.Informer().GetIndexer().Add(secret)
 
-	cleaner.evalSecret(secret)
+	cleaner.evalSecret(context.TODO(), secret)
 
 	expected := []core.Action{}
 
@@ -74,7 +75,7 @@ func TestCleanerExpired(t *testing.T) {
 	addSecretExpiration(secret, timeString(-time.Hour))
 	secrets.Informer().GetIndexer().Add(secret)
 
-	cleaner.evalSecret(secret)
+	cleaner.evalSecret(context.TODO(), secret)
 
 	expected := []core.Action{
 		core.NewDeleteActionWithOptions(
@@ -99,7 +100,7 @@ func TestCleanerNotExpired(t *testing.T) {
 	addSecretExpiration(secret, timeString(time.Hour))
 	secrets.Informer().GetIndexer().Add(secret)
 
-	cleaner.evalSecret(secret)
+	cleaner.evalSecret(context.TODO(), secret)
 
 	expected := []core.Action{}
 
@@ -118,7 +119,7 @@ func TestCleanerExpiredAt(t *testing.T) {
 	cleaner.enqueueSecrets(secret)
 	expected := []core.Action{}
 	verifyFunc := func() {
-		cleaner.processNextWorkItem()
+		cleaner.processNextWorkItem(context.TODO())
 		verifyActions(t, expected, cl.Actions())
 	}
 	// token has not expired currently
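
The diff only threads the context through; producing and cancelling it stays with the caller. A hedged sketch of one typical top-level wiring using signal.NotifyContext — a plausible shape, not kube-controller-manager's actual setup:

package main

import (
	"context"
	"fmt"
	"os"
	"os/signal"
	"syscall"
)

func main() {
	// One context at the top of the process; every controller's Run(ctx)
	// observes the same shutdown signal.
	ctx, stop := signal.NotifyContext(context.Background(), os.Interrupt, syscall.SIGTERM)
	defer stop()

	go func(ctx context.Context) { // stands in for bsc.Run(ctx) / tcc.Run(ctx)
		<-ctx.Done()
		fmt.Println("controller shutting down")
	}(ctx)

	<-ctx.Done() // blocks until SIGINT/SIGTERM
}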