diff --git a/cluster/cluster.go b/cluster/cluster.go index 24866997..066f0b36 100644 --- a/cluster/cluster.go +++ b/cluster/cluster.go @@ -85,6 +85,7 @@ const ( FullStateConfigMapName = "full-cluster-state" UpdateStateTimeout = 30 GetStateTimeout = 30 + RewriteWorkers = 5 SyncWorkers = 10 NoneAuthorizationMode = "none" LocalNodeAddress = "127.0.0.1" diff --git a/cluster/encryption.go b/cluster/encryption.go index 0e95394c..0bf355d2 100644 --- a/cluster/encryption.go +++ b/cluster/encryption.go @@ -40,6 +40,7 @@ type encryptionKey struct { Name string Secret string } + type keyList struct { KeyList []*encryptionKey } @@ -142,9 +143,12 @@ func (c *Cluster) RewriteSecrets(ctx context.Context) error { return true } + logrus.Debugf("[%v] retrieving cluster secrets with batch size: %v", rewriteSecretsOperation, secretBatchSize) + var continueToken string var secrets []v1.Secret var restart bool + var batchNum uint for { err := retry.OnError(retry.DefaultRetry, retryErr, func() error { l, err := k8sClient.CoreV1().Secrets("").List(ctx, metav1.ListOptions{ @@ -156,11 +160,15 @@ func (c *Cluster) RewriteSecrets(ctx context.Context) error { logrus.Debugf("[%v] continue token expired, restarting list operation", rewriteSecretsOperation) continueToken = "" restart = true + batchNum = 0 return nil } return err } + batchNum++ + logrus.Debugf("[%v] batch %v, retrieved %v secrets from cluster", rewriteSecretsOperation, batchNum, len(l.Items)) + secrets = append(secrets, l.Items...) continueToken = l.Continue @@ -191,14 +199,16 @@ func (c *Cluster) RewriteSecrets(ctx context.Context) error { // NOTE: since we retrieve secrets in batches, we don't know total number of secrets up front. // Telling the user how many we've rewritten so far is the best we can do - done := make(chan struct{}, SyncWorkers) - defer close(done) - go func() { - var rewritten int + done := make(chan struct{}, RewriteWorkers) + var numRewritten int + var wg sync.WaitGroup + wg.Add(1) + go func() { // track progress of secret rewrites + defer wg.Done() for range done { - rewritten++ - if rewritten%50 == 0 { // log a message every 50 secrets - log.Infof(ctx, "[%s] %v secrets rewritten", rewriteSecretsOperation, rewritten) + numRewritten++ + if numRewritten%50 == 0 { // log a message every 50 secrets + log.Infof(ctx, "[%s] %v secrets rewritten", rewriteSecretsOperation, numRewritten) } } }() @@ -214,7 +224,7 @@ func (c *Cluster) RewriteSecrets(ctx context.Context) error { // spawn workers to perform secret rewrites var errgrp errgroup.Group - for w := 0; w < SyncWorkers; w++ { + for w := 0; w < RewriteWorkers; w++ { errgrp.Go(func() error { var errList []error for secret := range rewrites { @@ -247,10 +257,14 @@ func (c *Cluster) RewriteSecrets(ctx context.Context) error { return err // worker error from rewrites } + // All secrets have been sent for rewrite, send exit signal to progress tracking go routine and wait for exit + close(done) + wg.Wait() + if cliErr != nil { log.Infof(ctx, "[%s] Operation encountered error: %v", rewriteSecretsOperation, cliErr) } else { - log.Infof(ctx, "[%s] Operation completed", rewriteSecretsOperation) + log.Infof(ctx, "[%s] Operation completed, %v secrets rewritten", rewriteSecretsOperation, numRewritten) } return cliErr