mirror of
https://github.com/rancher/rke.git
synced 2025-05-09 08:47:43 +00:00
reduce rewrite workers, add additional logging around secrets retrieval
This commit is contained in:
parent
c3e5f6f768
commit
c14c39f8c5
cluster
@ -85,6 +85,7 @@ const (
|
||||
FullStateConfigMapName = "full-cluster-state"
|
||||
UpdateStateTimeout = 30
|
||||
GetStateTimeout = 30
|
||||
RewriteWorkers = 5
|
||||
SyncWorkers = 10
|
||||
NoneAuthorizationMode = "none"
|
||||
LocalNodeAddress = "127.0.0.1"
|
||||
|
@ -40,6 +40,7 @@ type encryptionKey struct {
|
||||
Name string
|
||||
Secret string
|
||||
}
|
||||
|
||||
type keyList struct {
|
||||
KeyList []*encryptionKey
|
||||
}
|
||||
@ -142,9 +143,12 @@ func (c *Cluster) RewriteSecrets(ctx context.Context) error {
|
||||
return true
|
||||
}
|
||||
|
||||
logrus.Debugf("[%v] retrieving cluster secrets with batch size: %v", rewriteSecretsOperation, secretBatchSize)
|
||||
|
||||
var continueToken string
|
||||
var secrets []v1.Secret
|
||||
var restart bool
|
||||
var batchNum uint
|
||||
for {
|
||||
err := retry.OnError(retry.DefaultRetry, retryErr, func() error {
|
||||
l, err := k8sClient.CoreV1().Secrets("").List(ctx, metav1.ListOptions{
|
||||
@ -156,11 +160,15 @@ func (c *Cluster) RewriteSecrets(ctx context.Context) error {
|
||||
logrus.Debugf("[%v] continue token expired, restarting list operation", rewriteSecretsOperation)
|
||||
continueToken = ""
|
||||
restart = true
|
||||
batchNum = 0
|
||||
return nil
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
batchNum++
|
||||
logrus.Debugf("[%v] batch %v, retrieved %v secrets from cluster", rewriteSecretsOperation, batchNum, len(l.Items))
|
||||
|
||||
secrets = append(secrets, l.Items...)
|
||||
continueToken = l.Continue
|
||||
|
||||
@ -191,14 +199,16 @@ func (c *Cluster) RewriteSecrets(ctx context.Context) error {
|
||||
|
||||
// NOTE: since we retrieve secrets in batches, we don't know total number of secrets up front.
|
||||
// Telling the user how many we've rewritten so far is the best we can do
|
||||
done := make(chan struct{}, SyncWorkers)
|
||||
defer close(done)
|
||||
go func() {
|
||||
var rewritten int
|
||||
done := make(chan struct{}, RewriteWorkers)
|
||||
var numRewritten int
|
||||
var wg sync.WaitGroup
|
||||
wg.Add(1)
|
||||
go func() { // track progress of secret rewrites
|
||||
defer wg.Done()
|
||||
for range done {
|
||||
rewritten++
|
||||
if rewritten%50 == 0 { // log a message every 50 secrets
|
||||
log.Infof(ctx, "[%s] %v secrets rewritten", rewriteSecretsOperation, rewritten)
|
||||
numRewritten++
|
||||
if numRewritten%50 == 0 { // log a message every 50 secrets
|
||||
log.Infof(ctx, "[%s] %v secrets rewritten", rewriteSecretsOperation, numRewritten)
|
||||
}
|
||||
}
|
||||
}()
|
||||
@ -214,7 +224,7 @@ func (c *Cluster) RewriteSecrets(ctx context.Context) error {
|
||||
|
||||
// spawn workers to perform secret rewrites
|
||||
var errgrp errgroup.Group
|
||||
for w := 0; w < SyncWorkers; w++ {
|
||||
for w := 0; w < RewriteWorkers; w++ {
|
||||
errgrp.Go(func() error {
|
||||
var errList []error
|
||||
for secret := range rewrites {
|
||||
@ -247,10 +257,14 @@ func (c *Cluster) RewriteSecrets(ctx context.Context) error {
|
||||
return err // worker error from rewrites
|
||||
}
|
||||
|
||||
// All secrets have been sent for rewrite, send exit signal to progress tracking go routine and wait for exit
|
||||
close(done)
|
||||
wg.Wait()
|
||||
|
||||
if cliErr != nil {
|
||||
log.Infof(ctx, "[%s] Operation encountered error: %v", rewriteSecretsOperation, cliErr)
|
||||
} else {
|
||||
log.Infof(ctx, "[%s] Operation completed", rewriteSecretsOperation)
|
||||
log.Infof(ctx, "[%s] Operation completed, %v secrets rewritten", rewriteSecretsOperation, numRewritten)
|
||||
}
|
||||
|
||||
return cliErr
|
||||
|
Loading…
Reference in New Issue
Block a user