Mirror of https://github.com/rancher/rke.git (synced 2025-04-27 11:21:08 +00:00)
Merge pull request #2374 from rancher/revert-2326-rotate-encryption-keys
Revert "Encryption Key Rotation Changes"
Commit: a6327bbc2f
@@ -6,30 +6,26 @@ import (
    "encoding/base64"
    "encoding/json"
    "fmt"
    "strings"
    "sync"

    ghodssyaml "github.com/ghodss/yaml"
    "github.com/pkg/errors"
    normantypes "github.com/rancher/norman/types"
    "github.com/sirupsen/logrus"
    "golang.org/x/sync/errgroup"
    sigsyaml "sigs.k8s.io/yaml"

    "github.com/rancher/rke/k8s"
    "github.com/rancher/rke/log"
    "github.com/rancher/rke/services"
    "github.com/rancher/rke/templates"
    v3 "github.com/rancher/rke/types"
    "github.com/rancher/rke/util"
    "github.com/sirupsen/logrus"
    "golang.org/x/sync/errgroup"
    v1 "k8s.io/api/core/v1"
    apierrors "k8s.io/apimachinery/pkg/api/errors"
    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    "k8s.io/apimachinery/pkg/runtime"
    "k8s.io/apimachinery/pkg/runtime/serializer"
    apiserverconfig "k8s.io/apiserver/pkg/apis/config"
    apiserverconfigv1 "k8s.io/apiserver/pkg/apis/config/v1"
    "k8s.io/client-go/kubernetes"
    "k8s.io/client-go/util/retry"
    sigsyaml "sigs.k8s.io/yaml"
)

const (
@@ -117,205 +113,62 @@ func (c *Cluster) DisableSecretsEncryption(ctx context.Context, currentCluster *
    return nil
}

const (
    rewriteSecretsOperation = "rewrite-secrets"
    secretBatchSize         = 250
)

// RewriteSecrets does the following:
// - retrieves all cluster secrets in batches with size of <secretBatchSize>
// - triggers rewrites with new encryption key by sending each secret over a channel consumed by workers that perform the rewrite
// - logs progress of rewrite operation
func (c *Cluster) RewriteSecrets(ctx context.Context) error {
    log.Infof(ctx, "Rewriting cluster secrets")

    k8sClient, cliErr := k8s.NewClient(c.LocalKubeConfigPath, c.K8sWrapTransport)
    if cliErr != nil {
        return fmt.Errorf("failed to initialize new kubernetes client: %v", cliErr)
    }

    rewrites := make(chan interface{}, secretBatchSize)
    go func() {
        defer close(rewrites) // exiting this go routine triggers workers to exit

        retryErr := func(err error) bool { // all returned errors can be retried
            return true
        }

        var continueToken string
        var secrets []v1.Secret
        var restart bool
        for {
            err := retry.OnError(retry.DefaultRetry, retryErr, func() error {
                l, err := k8sClient.CoreV1().Secrets("").List(ctx, metav1.ListOptions{
                    Limit:    secretBatchSize, // keep the per request secrets batch size small to avoid client timeouts
                    Continue: continueToken,
                })
                if err != nil {
                    if isExpiredTokenErr(err) { // restart list operation due to token expiration
                        logrus.Debugf("[%v] continue token expired, restarting list operation", rewriteSecretsOperation)
                        continueToken = ""
                        restart = true
                        return nil
                    }
                    return err
                }

                secrets = append(secrets, l.Items...)
                continueToken = l.Continue

                return nil
            })
            if err != nil {
                cliErr = err
                break
            }

            // send this batch to workers for rewrite
            // duplicates are ok because we cache the names of secrets that have been rewritten, thus workers will only rewrite each secret once
            for _, s := range secrets {
                rewrites <- s
            }
            secrets = nil // reset secrets since they've been sent to workers

            // if there's no continue token and the list operation doesn't need to be restarted, we've retrieved all secrets
            if continueToken == "" && !restart {
                break
            }

            restart = false
        }

        logrus.Debugf("[%v] All secrets retrieved and sent for rewrite", rewriteSecretsOperation)
    }()

    // NOTE: since we retrieve secrets in batches, we don't know total number of secrets up front.
    // Telling the user how many we've rewritten so far is the best we can do
    done := make(chan struct{}, SyncWorkers)
    defer close(done)
    go func() {
        var rewritten int
        for range done {
            rewritten++
            if rewritten%50 == 0 { // log a message every 50 secrets
                log.Infof(ctx, "[%s] %v secrets rewritten", rewriteSecretsOperation, rewritten)
            }
        }
    }()

    getSecretID := func(s v1.Secret) string {
        return strings.Join([]string{s.Namespace, s.Name}, "/")
    }

    // track secrets that have been rewritten
    // this is needed in case the continue token expires and the list secrets operation needs to be restarted
    rewritten := make(map[string]struct{})
    var rmtx sync.RWMutex

    // spawn workers to perform secret rewrites
    var errgrp errgroup.Group
    k8sClient, err := k8s.NewClient(c.LocalKubeConfigPath, c.K8sWrapTransport)
    if err != nil {
        return fmt.Errorf("failed to initialize new kubernetes client: %v", err)
    }
    secretsList, err := k8s.GetSecretsList(k8sClient, "")
    if err != nil {
        return err
    }

    secretsQueue := util.GetObjectQueue(secretsList.Items)
    for w := 0; w < SyncWorkers; w++ {
        errgrp.Go(func() error {
            var errList []error
            for secret := range rewrites {
            for secret := range secretsQueue {
                s := secret.(v1.Secret)
                id := getSecretID(s)

                rmtx.RLock()
                _, ok := rewritten[id]
                rmtx.RUnlock()

                if !ok {
                    err := rewriteSecret(k8sClient, &s)
                    if err != nil {
                        errList = append(errList, err)
                    }

                    rmtx.Lock()
                    rewritten[id] = struct{}{}
                    rmtx.Unlock()

                    done <- struct{}{}
                err := rewriteSecret(k8sClient, &s)
                if err != nil {
                    errList = append(errList, err)
                }
            }

            return util.ErrList(errList)
        })
    }
    if err := errgrp.Wait(); err != nil {
        logrus.Errorf("[%v] error: %v", rewriteSecretsOperation, err)
        return err // worker error from rewrites
        return err
    }

    if cliErr != nil {
        log.Infof(ctx, "[%s] Operation encountered error: %v", rewriteSecretsOperation, cliErr)
    } else {
        log.Infof(ctx, "[%s] Operation completed", rewriteSecretsOperation)
    }

    return cliErr
    log.Infof(ctx, "Cluster secrets rewritten successfully")
    return nil
}
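An aside for orientation: a minimal sketch of what a "rewrite" amounts to. The helper name rewriteOnce is hypothetical (RKE's own helper is k8s.UpdateSecret, shown further down); the point is that an Update which changes nothing still makes the kube-apiserver re-encrypt the object at rest with whichever key is currently active in the provider config.

// rewrite_sketch.go — illustration only; helper name is hypothetical.
package sketch

import (
    "context"

    v1 "k8s.io/api/core/v1"
    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    "k8s.io/client-go/kubernetes"
)

// rewriteOnce performs a no-op Update; the write itself is what triggers
// re-encryption at rest under the currently active encryption key.
func rewriteOnce(ctx context.Context, client kubernetes.Interface, s *v1.Secret) error {
    _, err := client.CoreV1().Secrets(s.Namespace).Update(ctx, s, metav1.UpdateOptions{})
    return err
}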

func (c *Cluster) RotateEncryptionKey(ctx context.Context, fullState *FullState) error {
    // generate new key
    //generate new key
    newKey, err := generateEncryptionKey()
    if err != nil {
        return err
    }

    oldKey, err := c.extractActiveKey(c.EncryptionConfig.EncryptionProviderFile)
    if err != nil {
        return err
    }

    logrus.Debug("adding new encryption key, provider config: [newKey, oldKey]")

    // Ensure encryption is done with newKey
    err = c.updateEncryptionProvider(ctx, []*encryptionKey{newKey, oldKey}, fullState)
    // reverse the keys order in the file, making newKey the Active Key
    initialKeyList := []*encryptionKey{ // order is critical here!
        newKey,
        oldKey,
    }
    initialProviderConfig, err := providerFileFromKeyList(keyList{KeyList: initialKeyList})
    if err != nil {
        return err
    }

    // rewrite secrets via updates to secrets
    if err := c.RewriteSecrets(ctx); err != nil {
        // if there's a rewrite error, the cluster will need to be restored, so redeploy the initial encryption provider config
        var updateErr error
        for i := 0; i < 3; i++ { // up to 3 retries
            updateErr = c.updateEncryptionProvider(ctx, []*encryptionKey{oldKey}, fullState)
            if updateErr == nil {
                break
            }
        }

        if updateErr != nil {
            err = errors.Wrap(err, updateErr.Error())
        }

        return err
    }

    // At this point, all secrets have been rewritten using the newKey, so we remove the old one.
    logrus.Debug("removing old encryption key, provider config: [newKey]")

    err = c.updateEncryptionProvider(ctx, []*encryptionKey{newKey}, fullState)
    if err != nil {
        return err
    }

    return nil
}

func (c *Cluster) updateEncryptionProvider(ctx context.Context, keys []*encryptionKey, fullState *FullState) error {
    providerConfig, err := providerFileFromKeyList(keyList{KeyList: keys})
    if err != nil {
        return err
    }

    c.EncryptionConfig.EncryptionProviderFile = providerConfig
    c.EncryptionConfig.EncryptionProviderFile = initialProviderConfig
    if err := c.DeployEncryptionProviderFile(ctx); err != nil {
        return err
    }

    // commit to state as soon as possible
    logrus.Debugf("[%s] Updating cluster state", services.ControlRole)
    if err := c.UpdateClusterCurrentState(ctx, fullState); err != nil {
@@ -324,7 +177,30 @@ func (c *Cluster) updateEncryptionProvider(ctx context.Context, keys []*encrypti
    if err := services.RestartKubeAPIWithHealthcheck(ctx, c.ControlPlaneHosts, c.LocalConnDialerFactory, c.Certificates); err != nil {
        return err
    }

    // rewrite secrets
    if err := c.RewriteSecrets(ctx); err != nil {
        return err
    }
    // At this point, all secrets have been rewritten using the newKey, so we remove the old one.
    finalKeyList := []*encryptionKey{
        newKey,
    }
    finalProviderConfig, err := providerFileFromKeyList(keyList{KeyList: finalKeyList})
    if err != nil {
        return err
    }
    c.EncryptionConfig.EncryptionProviderFile = finalProviderConfig
    if err := c.DeployEncryptionProviderFile(ctx); err != nil {
        return err
    }
    // commit to state
    logrus.Debugf("[%s] Updating cluster state", services.ControlRole)
    if err := c.UpdateClusterCurrentState(ctx, fullState); err != nil {
        return err
    }
    if err := services.RestartKubeAPIWithHealthcheck(ctx, c.ControlPlaneHosts, c.LocalConnDialerFactory, c.Certificates); err != nil {
        return err
    }
    return nil
}
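Background on why key order in these lists matters: the kube-apiserver encrypts new writes with the first AES-CBC key in the provider file and can decrypt with any listed key, so putting newKey first makes it the active key while oldKey keeps existing data readable until the secrets are rewritten. Below is a minimal sketch using the apiserverconfigv1 types imported above; providerConfigSketch and the key material are placeholders, not RKE's providerFileFromKeyList.

// provider_sketch.go — illustration only.
package sketch

import (
    "fmt"

    apiserverconfigv1 "k8s.io/apiserver/pkg/apis/config/v1"
    sigsyaml "sigs.k8s.io/yaml"
)

func providerConfigSketch(newKey, oldKey apiserverconfigv1.Key) (string, error) {
    conf := apiserverconfigv1.EncryptionConfiguration{
        Resources: []apiserverconfigv1.ResourceConfiguration{{
            Resources: []string{"secrets"},
            Providers: []apiserverconfigv1.ProviderConfiguration{
                // newKey listed first: it becomes the active encryption key;
                // oldKey stays so existing secrets remain readable until rewritten.
                {AESCBC: &apiserverconfigv1.AESConfiguration{
                    Keys: []apiserverconfigv1.Key{newKey, oldKey},
                }},
                // identity allows reading any data that was never encrypted.
                {Identity: &apiserverconfigv1.IdentityConfiguration{}},
            },
        }},
    }
    out, err := sigsyaml.Marshal(conf)
    return string(out), err
}

func PrintProviderConfig() {
    yamlDoc, _ := providerConfigSketch(
        apiserverconfigv1.Key{Name: "key-new", Secret: "<base64 32-byte key>"},
        apiserverconfigv1.Key{Name: "key-old", Secret: "<base64 32-byte key>"},
    )
    fmt.Println(yamlDoc)
}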

@@ -425,18 +301,6 @@ func (c *Cluster) generateDisabledEncryptionProviderFile() (string, error) {
    return disabledProviderFileFromKey(key)
}

const (
    errExpiredToken = "The provided continue parameter is too old"
)

// isExpiredTokenErr returns true if the error passed in is due to a continue token expiring
func isExpiredTokenErr(err error) bool {
    if strings.Contains(err.Error(), errExpiredToken) {
        return true
    }
    return false
}
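isExpiredTokenErr above matches on the error string; for comparison, apimachinery also exposes a status-based check for expired continue tokens. A minimal sketch of that alternative (isExpiredContinueToken is a hypothetical name; this is not what the RKE code uses):

// expired_token_sketch.go — illustration only.
package sketch

import (
    apierrors "k8s.io/apimachinery/pkg/api/errors"
)

// isExpiredContinueToken reports whether a List call failed because its
// continue token lapsed (HTTP 410 Gone with reason "Expired").
func isExpiredContinueToken(err error) bool {
    return apierrors.IsResourceExpired(err)
}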

func rewriteSecret(k8sClient *kubernetes.Clientset, secret *v1.Secret) error {
    var err error
    if err = k8s.UpdateSecret(k8sClient, secret); err == nil {
@@ -445,10 +309,6 @@ func rewriteSecret(k8sClient *kubernetes.Clientset, secret *v1.Secret) error {
    if apierrors.IsConflict(err) {
        secret, err = k8s.GetSecret(k8sClient, secret.Name, secret.Namespace)
        if err != nil {
            // if the secret no longer exists, we can skip it since it does not need to be rewritten
            if apierrors.IsNotFound(err) {
                return nil
            }
            return err
        }
        err = k8s.UpdateSecret(k8sClient, secret)
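The conflict handling above (refetch the latest version, then update again) is the same pattern that client-go packages as retry.RetryOnConflict, shown in the vendored util.go further down. A minimal sketch of a rewrite expressed that way; rewriteWithRetry is a hypothetical helper and assumes the client-go version vendored in this repo:

// conflict_retry_sketch.go — illustration only.
package sketch

import (
    "context"

    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    "k8s.io/client-go/kubernetes"
    "k8s.io/client-go/util/retry"
)

// rewriteWithRetry refetches the secret on every attempt so that a
// "Conflict" from a concurrent writer is resolved against the latest version.
func rewriteWithRetry(ctx context.Context, client kubernetes.Interface, namespace, name string) error {
    return retry.RetryOnConflict(retry.DefaultRetry, func() error {
        s, err := client.CoreV1().Secrets(namespace).Get(ctx, name, metav1.GetOptions{})
        if err != nil {
            return err
        }
        _, err = client.CoreV1().Secrets(namespace).Update(ctx, s, metav1.UpdateOptions{})
        return err
    })
}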
@@ -475,7 +335,6 @@ func isEncryptionEnabled(rkeConfig *v3.RancherKubernetesEngineConfig) bool {
    }
    return false
}

func isEncryptionCustomConfig(rkeConfig *v3.RancherKubernetesEngineConfig) bool {
    if isEncryptionEnabled(rkeConfig) &&
        rkeConfig.Services.KubeAPI.SecretsEncryptionConfig.CustomConfig != nil {
@@ -8,7 +8,6 @@ import (
    "github.com/rancher/rke/hosts"
    "github.com/rancher/rke/log"
    "github.com/rancher/rke/pki"
    "github.com/rancher/rke/pki/cert"
    v3 "github.com/rancher/rke/types"
    "github.com/sirupsen/logrus"
    "github.com/urfave/cli"
@@ -58,25 +57,18 @@ func rotateEncryptionKeyFromCli(ctx *cli.Context) error {
    // setting up the flags
    flags := cluster.GetExternalFlags(false, false, false, false, "", filePath)

    _, _, _, _, _, err = RotateEncryptionKey(context.Background(), rkeConfig, hosts.DialersOptions{}, flags)
    return err
    return RotateEncryptionKey(context.Background(), rkeConfig, hosts.DialersOptions{}, flags)
}

func RotateEncryptionKey(
    ctx context.Context,
    rkeConfig *v3.RancherKubernetesEngineConfig,
    dialersOptions hosts.DialersOptions,
    flags cluster.ExternalFlags,
) (string, string, string, string, map[string]pki.CertificatePKI, error) {
    log.Infof(ctx, "Rotating cluster secrets encryption key")

    var APIURL, caCrt, clientCert, clientKey string

func RotateEncryptionKey(ctx context.Context, rkeConfig *v3.RancherKubernetesEngineConfig,
    dialersOptions hosts.DialersOptions, flags cluster.ExternalFlags) error {
    log.Infof(ctx, "Rotating cluster secrets encryption key..")
    stateFilePath := cluster.GetStateFilePath(flags.ClusterFilePath, flags.ConfigDir)
    rkeFullState, _ := cluster.ReadStateFile(ctx, stateFilePath)

    // We generate the first encryption config in ClusterInit, to store it ASAP. It's written to the DesiredState
    // We generate the first encryption config in ClusterInit, to store it ASAP. It's written
    // to the DesiredState
    stateEncryptionConfig := rkeFullState.DesiredState.EncryptionConfig

    // if CurrentState has EncryptionConfig, it means this is NOT the first time we enable encryption, we should use the _latest_ applied value from the current cluster
    if rkeFullState.CurrentState.EncryptionConfig != "" {
        stateEncryptionConfig = rkeFullState.CurrentState.EncryptionConfig
@@ -84,43 +76,32 @@ func RotateEncryptionKey(

    kubeCluster, err := cluster.InitClusterObject(ctx, rkeConfig, flags, stateEncryptionConfig)
    if err != nil {
        return APIURL, caCrt, clientCert, clientKey, nil, err
        return err
    }

    if kubeCluster.IsEncryptionCustomConfig() {
        return APIURL, caCrt, clientCert, clientKey, nil, fmt.Errorf("can't rotate encryption keys: Key Rotation is not supported with custom configuration")
        return fmt.Errorf("can't rotate encryption keys: Key Rotation is not supported with custom configuration")
    }
    if !kubeCluster.IsEncryptionEnabled() {
        return APIURL, caCrt, clientCert, clientKey, nil, fmt.Errorf("can't rotate encryption keys: Encryption Configuration is disabled")
        return fmt.Errorf("can't rotate encryption keys: Encryption Configuration is disabled")
    }

    kubeCluster.Certificates = rkeFullState.DesiredState.CertificatesBundle
    if err := kubeCluster.SetupDialers(ctx, dialersOptions); err != nil {
        return APIURL, caCrt, clientCert, clientKey, nil, err
        return err
    }
    if err := kubeCluster.TunnelHosts(ctx, flags); err != nil {
        return APIURL, caCrt, clientCert, clientKey, nil, err
        return err
    }
    if len(kubeCluster.ControlPlaneHosts) > 0 {
        APIURL = fmt.Sprintf("https://%s:6443", kubeCluster.ControlPlaneHosts[0].Address)
    }
    clientCert = string(cert.EncodeCertPEM(kubeCluster.Certificates[pki.KubeAdminCertName].Certificate))
    clientKey = string(cert.EncodePrivateKeyPEM(kubeCluster.Certificates[pki.KubeAdminCertName].Key))
    caCrt = string(cert.EncodeCertPEM(kubeCluster.Certificates[pki.CACertName].Certificate))

    err = kubeCluster.RotateEncryptionKey(ctx, rkeFullState)
    if err != nil {
        return APIURL, caCrt, clientCert, clientKey, nil, err
        return err
    }

    // make sure we have the latest state
    rkeFullState, _ = cluster.ReadStateFile(ctx, stateFilePath)

    log.Infof(ctx, "Reconciling cluster state")
    if err := kubeCluster.ReconcileDesiredStateEncryptionConfig(ctx, rkeFullState); err != nil {
        return APIURL, caCrt, clientCert, clientKey, nil, err
        return err
    }

    log.Infof(ctx, "Cluster secrets encryption key rotated successfully")
    return APIURL, caCrt, clientCert, clientKey, kubeCluster.Certificates, nil
    return nil
}
@@ -85,9 +85,10 @@ func ClusterUp(ctx context.Context, dialersOptions hosts.DialersOptions, flags c
    if err != nil {
        return APIURL, caCrt, clientCert, clientKey, nil, err
    }

    // We generate the first encryption config in ClusterInit, to store it ASAP. It's written to the DesiredState
    // We generate the first encryption config in ClusterInit, to store it ASAP. It's written
    // to the DesiredState
    stateEncryptionConfig := clusterState.DesiredState.EncryptionConfig

    // if CurrentState has EncryptionConfig, it means this is NOT the first time we enable encryption, we should use the _latest_ applied value from the current cluster
    if clusterState.CurrentState.EncryptionConfig != "" {
        stateEncryptionConfig = clusterState.CurrentState.EncryptionConfig
@@ -102,10 +103,6 @@ func ClusterUp(ctx context.Context, dialersOptions hosts.DialersOptions, flags c
    if kubeCluster.RancherKubernetesEngineConfig.RotateCertificates != nil {
        return rebuildClusterWithRotatedCertificates(ctx, dialersOptions, flags, svcOptionsData)
    }
    // if we need to rotate the encryption key, do so and then return
    if kubeCluster.RancherKubernetesEngineConfig.RotateEncryptionKey {
        return RotateEncryptionKey(ctx, clusterState.CurrentState.RancherKubernetesEngineConfig.DeepCopy(), dialersOptions, flags)
    }

    log.Infof(ctx, "Building Kubernetes cluster")
    err = kubeCluster.SetupDialers(ctx, dialersOptions)
@@ -32,6 +32,7 @@ func getLogger(ctx context.Context) logger {

func Infof(ctx context.Context, msg string, args ...interface{}) {
    getLogger(ctx).Infof(msg, args...)

}

func Warnf(ctx context.Context, msg string, args ...interface{}) {
@@ -57,8 +57,6 @@ type RancherKubernetesEngineConfig struct {
    Restore RestoreConfig `yaml:"restore" json:"restore,omitempty"`
    // Rotating Certificates Option
    RotateCertificates *RotateCertificates `yaml:"rotate_certificates,omitempty" json:"rotateCertificates,omitempty"`
    // Rotate Encryption Key Option
    RotateEncryptionKey bool `yaml:"rotate_encryption_key" json:"rotateEncryptionKey"`
    // DNS Config
    DNS *DNSConfig `yaml:"dns" json:"dns,omitempty"`
    // Upgrade Strategy for the cluster
vendor/k8s.io/client-go/util/retry/OWNERS (generated, vendored): 4 changed lines
@@ -1,4 +0,0 @@
# See the OWNERS docs at https://go.k8s.io/owners

reviewers:
- caesarxuchao
vendor/k8s.io/client-go/util/retry/util.go (generated, vendored): 105 changed lines
@@ -1,105 +0,0 @@
/*
Copyright 2016 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package retry

import (
    "time"

    "k8s.io/apimachinery/pkg/api/errors"
    "k8s.io/apimachinery/pkg/util/wait"
)

// DefaultRetry is the recommended retry for a conflict where multiple clients
// are making changes to the same resource.
var DefaultRetry = wait.Backoff{
    Steps:    5,
    Duration: 10 * time.Millisecond,
    Factor:   1.0,
    Jitter:   0.1,
}

// DefaultBackoff is the recommended backoff for a conflict where a client
// may be attempting to make an unrelated modification to a resource under
// active management by one or more controllers.
var DefaultBackoff = wait.Backoff{
    Steps:    4,
    Duration: 10 * time.Millisecond,
    Factor:   5.0,
    Jitter:   0.1,
}

// OnError allows the caller to retry fn in case the error returned by fn is retriable
// according to the provided function. backoff defines the maximum retries and the wait
// interval between two retries.
func OnError(backoff wait.Backoff, retriable func(error) bool, fn func() error) error {
    var lastErr error
    err := wait.ExponentialBackoff(backoff, func() (bool, error) {
        err := fn()
        switch {
        case err == nil:
            return true, nil
        case retriable(err):
            lastErr = err
            return false, nil
        default:
            return false, err
        }
    })
    if err == wait.ErrWaitTimeout {
        err = lastErr
    }
    return err
}

// RetryOnConflict is used to make an update to a resource when you have to worry about
// conflicts caused by other code making unrelated updates to the resource at the same
// time. fn should fetch the resource to be modified, make appropriate changes to it, try
// to update it, and return (unmodified) the error from the update function. On a
// successful update, RetryOnConflict will return nil. If the update function returns a
// "Conflict" error, RetryOnConflict will wait some amount of time as described by
// backoff, and then try again. On a non-"Conflict" error, or if it retries too many times
// and gives up, RetryOnConflict will return an error to the caller.
//
//     err := retry.RetryOnConflict(retry.DefaultRetry, func() error {
//         // Fetch the resource here; you need to refetch it on every try, since
//         // if you got a conflict on the last update attempt then you need to get
//         // the current version before making your own changes.
//         pod, err := c.Pods("mynamespace").Get(name, metav1.GetOptions{})
//         if err != nil {
//             return err
//         }
//
//         // Make whatever updates to the resource are needed
//         pod.Status.Phase = v1.PodFailed
//
//         // Try to update
//         _, err = c.Pods("mynamespace").UpdateStatus(pod)
//         // You have to return err itself here (not wrapped inside another error)
//         // so that RetryOnConflict can identify it correctly.
//         return err
//     })
//     if err != nil {
//         // May be conflict if max retries were hit, or may be something unrelated
//         // like permissions or a network error
//         return err
//     }
//     ...
//
// TODO: Make Backoff an interface?
func RetryOnConflict(backoff wait.Backoff, fn func() error) error {
    return OnError(backoff, errors.IsConflict, fn)
}
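RetryOnConflict above is just OnError specialized with errors.IsConflict. A minimal usage sketch of OnError with an assumed transient-error predicate; fetchWithRetry and the choice of retriable errors are illustrative, not code from this repo:

// on_error_sketch.go — illustration only.
package sketch

import (
    apierrors "k8s.io/apimachinery/pkg/api/errors"
    "k8s.io/client-go/util/retry"
)

// fetchWithRetry retries fn only while the API server reports a transient
// condition; any other error aborts immediately.
func fetchWithRetry(fn func() error) error {
    transient := func(err error) bool {
        return apierrors.IsServerTimeout(err) || apierrors.IsTooManyRequests(err)
    }
    return retry.OnError(retry.DefaultBackoff, transient, fn)
}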
vendor/modules.txt (vendored): 1 changed line
@@ -510,7 +510,6 @@ k8s.io/client-go/util/flowcontrol
k8s.io/client-go/util/homedir
k8s.io/client-go/util/jsonpath
k8s.io/client-go/util/keyutil
k8s.io/client-go/util/retry
k8s.io/client-go/util/workqueue
# k8s.io/component-base v0.18.0
k8s.io/component-base/version