diff --git a/cluster/encryption.go b/cluster/encryption.go index 0e95394c..cdc06e75 100644 --- a/cluster/encryption.go +++ b/cluster/encryption.go @@ -6,30 +6,26 @@ import ( "encoding/base64" "encoding/json" "fmt" - "strings" - "sync" ghodssyaml "github.com/ghodss/yaml" - "github.com/pkg/errors" normantypes "github.com/rancher/norman/types" + "github.com/sirupsen/logrus" + "golang.org/x/sync/errgroup" + sigsyaml "sigs.k8s.io/yaml" + "github.com/rancher/rke/k8s" "github.com/rancher/rke/log" "github.com/rancher/rke/services" "github.com/rancher/rke/templates" v3 "github.com/rancher/rke/types" "github.com/rancher/rke/util" - "github.com/sirupsen/logrus" - "golang.org/x/sync/errgroup" v1 "k8s.io/api/core/v1" apierrors "k8s.io/apimachinery/pkg/api/errors" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime/serializer" apiserverconfig "k8s.io/apiserver/pkg/apis/config" apiserverconfigv1 "k8s.io/apiserver/pkg/apis/config/v1" "k8s.io/client-go/kubernetes" - "k8s.io/client-go/util/retry" - sigsyaml "sigs.k8s.io/yaml" ) const ( @@ -117,205 +113,62 @@ func (c *Cluster) DisableSecretsEncryption(ctx context.Context, currentCluster * return nil } -const ( - rewriteSecretsOperation = "rewrite-secrets" - secretBatchSize = 250 -) - -// RewriteSecrets does the following: -// - retrieves all cluster secrets in batches with size of -// - triggers rewrites with new encryption key by sending each secret over a channel consumed by workers that perform the rewrite -// - logs progress of rewrite operation func (c *Cluster) RewriteSecrets(ctx context.Context) error { log.Infof(ctx, "Rewriting cluster secrets") - - k8sClient, cliErr := k8s.NewClient(c.LocalKubeConfigPath, c.K8sWrapTransport) - if cliErr != nil { - return fmt.Errorf("failed to initialize new kubernetes client: %v", cliErr) - } - - rewrites := make(chan interface{}, secretBatchSize) - go func() { - defer close(rewrites) // exiting this go routine triggers workers to exit - - retryErr := func(err error) bool { // all returned errors can be retried - return true - } - - var continueToken string - var secrets []v1.Secret - var restart bool - for { - err := retry.OnError(retry.DefaultRetry, retryErr, func() error { - l, err := k8sClient.CoreV1().Secrets("").List(ctx, metav1.ListOptions{ - Limit: secretBatchSize, // keep the per request secrets batch size small to avoid client timeouts - Continue: continueToken, - }) - if err != nil { - if isExpiredTokenErr(err) { // restart list operation due to token expiration - logrus.Debugf("[%v] continue token expired, restarting list operation", rewriteSecretsOperation) - continueToken = "" - restart = true - return nil - } - return err - } - - secrets = append(secrets, l.Items...) - continueToken = l.Continue - - return nil - }) - if err != nil { - cliErr = err - break - } - - // send this batch to workers for rewrite - // duplicates are ok because we cache the names of secrets that have been rewritten, thus workers will only rewrite each secret once - for _, s := range secrets { - rewrites <- s - } - secrets = nil // reset secrets since they've been sent to workers - - // if there's no continue token and the list operation doesn't need to be restarted, we've retrieved all secrets - if continueToken == "" && !restart { - break - } - - restart = false - } - - logrus.Debugf("[%v] All secrets retrieved and sent for rewrite", rewriteSecretsOperation) - }() - - // NOTE: since we retrieve secrets in batches, we don't know total number of secrets up front. - // Telling the user how many we've rewritten so far is the best we can do - done := make(chan struct{}, SyncWorkers) - defer close(done) - go func() { - var rewritten int - for range done { - rewritten++ - if rewritten%50 == 0 { // log a message every 50 secrets - log.Infof(ctx, "[%s] %v secrets rewritten", rewriteSecretsOperation, rewritten) - } - } - }() - - getSecretID := func(s v1.Secret) string { - return strings.Join([]string{s.Namespace, s.Name}, "/") - } - - // track secrets that have been rewritten - // this is needed in case the continue token expires and the list secrets operation needs to be restarted - rewritten := make(map[string]struct{}) - var rmtx sync.RWMutex - - // spawn workers to perform secret rewrites var errgrp errgroup.Group + k8sClient, err := k8s.NewClient(c.LocalKubeConfigPath, c.K8sWrapTransport) + if err != nil { + return fmt.Errorf("failed to initialize new kubernetes client: %v", err) + } + secretsList, err := k8s.GetSecretsList(k8sClient, "") + if err != nil { + return err + } + + secretsQueue := util.GetObjectQueue(secretsList.Items) for w := 0; w < SyncWorkers; w++ { errgrp.Go(func() error { var errList []error - for secret := range rewrites { + for secret := range secretsQueue { s := secret.(v1.Secret) - id := getSecretID(s) - - rmtx.RLock() - _, ok := rewritten[id] - rmtx.RUnlock() - - if !ok { - err := rewriteSecret(k8sClient, &s) - if err != nil { - errList = append(errList, err) - } - - rmtx.Lock() - rewritten[id] = struct{}{} - rmtx.Unlock() - - done <- struct{}{} + err := rewriteSecret(k8sClient, &s) + if err != nil { + errList = append(errList, err) } } - return util.ErrList(errList) }) } if err := errgrp.Wait(); err != nil { - logrus.Errorf("[%v] error: %v", rewriteSecretsOperation, err) - return err // worker error from rewrites + return err } - - if cliErr != nil { - log.Infof(ctx, "[%s] Operation encountered error: %v", rewriteSecretsOperation, cliErr) - } else { - log.Infof(ctx, "[%s] Operation completed", rewriteSecretsOperation) - } - - return cliErr + log.Infof(ctx, "Cluster secrets rewritten successfully") + return nil } func (c *Cluster) RotateEncryptionKey(ctx context.Context, fullState *FullState) error { - // generate new key + //generate new key newKey, err := generateEncryptionKey() if err != nil { return err } - oldKey, err := c.extractActiveKey(c.EncryptionConfig.EncryptionProviderFile) if err != nil { return err } - - logrus.Debug("adding new encryption key, provider config: [newKey, oldKey]") - - // Ensure encryption is done with newKey - err = c.updateEncryptionProvider(ctx, []*encryptionKey{newKey, oldKey}, fullState) + // reverse the keys order in the file, making newKey the Active Key + initialKeyList := []*encryptionKey{ // order is critical here! + newKey, + oldKey, + } + initialProviderConfig, err := providerFileFromKeyList(keyList{KeyList: initialKeyList}) if err != nil { return err } - - // rewrite secrets via updates to secrets - if err := c.RewriteSecrets(ctx); err != nil { - // if there's a rewrite error, the cluster will need to be restored, so redeploy the initial encryption provider config - var updateErr error - for i := 0; i < 3; i++ { // up to 3 retries - updateErr = c.updateEncryptionProvider(ctx, []*encryptionKey{oldKey}, fullState) - if updateErr == nil { - break - } - } - - if updateErr != nil { - err = errors.Wrap(err, updateErr.Error()) - } - - return err - } - - // At this point, all secrets have been rewritten using the newKey, so we remove the old one. - logrus.Debug("removing old encryption key, provider config: [newKey]") - - err = c.updateEncryptionProvider(ctx, []*encryptionKey{newKey}, fullState) - if err != nil { - return err - } - - return nil -} - -func (c *Cluster) updateEncryptionProvider(ctx context.Context, keys []*encryptionKey, fullState *FullState) error { - providerConfig, err := providerFileFromKeyList(keyList{KeyList: keys}) - if err != nil { - return err - } - - c.EncryptionConfig.EncryptionProviderFile = providerConfig + c.EncryptionConfig.EncryptionProviderFile = initialProviderConfig if err := c.DeployEncryptionProviderFile(ctx); err != nil { return err } - // commit to state as soon as possible logrus.Debugf("[%s] Updating cluster state", services.ControlRole) if err := c.UpdateClusterCurrentState(ctx, fullState); err != nil { @@ -324,7 +177,30 @@ func (c *Cluster) updateEncryptionProvider(ctx context.Context, keys []*encrypti if err := services.RestartKubeAPIWithHealthcheck(ctx, c.ControlPlaneHosts, c.LocalConnDialerFactory, c.Certificates); err != nil { return err } - + // rewrite secrets + if err := c.RewriteSecrets(ctx); err != nil { + return err + } + // At this point, all secrets have been rewritten using the newKey, so we remove the old one. + finalKeyList := []*encryptionKey{ + newKey, + } + finalProviderConfig, err := providerFileFromKeyList(keyList{KeyList: finalKeyList}) + if err != nil { + return err + } + c.EncryptionConfig.EncryptionProviderFile = finalProviderConfig + if err := c.DeployEncryptionProviderFile(ctx); err != nil { + return err + } + // commit to state + logrus.Debugf("[%s] Updating cluster state", services.ControlRole) + if err := c.UpdateClusterCurrentState(ctx, fullState); err != nil { + return err + } + if err := services.RestartKubeAPIWithHealthcheck(ctx, c.ControlPlaneHosts, c.LocalConnDialerFactory, c.Certificates); err != nil { + return err + } return nil } @@ -425,18 +301,6 @@ func (c *Cluster) generateDisabledEncryptionProviderFile() (string, error) { return disabledProviderFileFromKey(key) } -const ( - errExpiredToken = "The provided continue parameter is too old" -) - -// isExpiredTokenErr returns true if the error passed in is due to a continue token expiring -func isExpiredTokenErr(err error) bool { - if strings.Contains(err.Error(), errExpiredToken) { - return true - } - return false -} - func rewriteSecret(k8sClient *kubernetes.Clientset, secret *v1.Secret) error { var err error if err = k8s.UpdateSecret(k8sClient, secret); err == nil { @@ -445,10 +309,6 @@ func rewriteSecret(k8sClient *kubernetes.Clientset, secret *v1.Secret) error { if apierrors.IsConflict(err) { secret, err = k8s.GetSecret(k8sClient, secret.Name, secret.Namespace) if err != nil { - // if the secret no longer exists, we can skip it since it does not need to be rewritten - if apierrors.IsNotFound(err) { - return nil - } return err } err = k8s.UpdateSecret(k8sClient, secret) @@ -475,7 +335,6 @@ func isEncryptionEnabled(rkeConfig *v3.RancherKubernetesEngineConfig) bool { } return false } - func isEncryptionCustomConfig(rkeConfig *v3.RancherKubernetesEngineConfig) bool { if isEncryptionEnabled(rkeConfig) && rkeConfig.Services.KubeAPI.SecretsEncryptionConfig.CustomConfig != nil { diff --git a/cmd/encryption.go b/cmd/encryption.go index 61ed2cc3..070d6368 100644 --- a/cmd/encryption.go +++ b/cmd/encryption.go @@ -8,7 +8,6 @@ import ( "github.com/rancher/rke/hosts" "github.com/rancher/rke/log" "github.com/rancher/rke/pki" - "github.com/rancher/rke/pki/cert" v3 "github.com/rancher/rke/types" "github.com/sirupsen/logrus" "github.com/urfave/cli" @@ -58,25 +57,18 @@ func rotateEncryptionKeyFromCli(ctx *cli.Context) error { // setting up the flags flags := cluster.GetExternalFlags(false, false, false, false, "", filePath) - _, _, _, _, _, err = RotateEncryptionKey(context.Background(), rkeConfig, hosts.DialersOptions{}, flags) - return err + return RotateEncryptionKey(context.Background(), rkeConfig, hosts.DialersOptions{}, flags) } -func RotateEncryptionKey( - ctx context.Context, - rkeConfig *v3.RancherKubernetesEngineConfig, - dialersOptions hosts.DialersOptions, - flags cluster.ExternalFlags, -) (string, string, string, string, map[string]pki.CertificatePKI, error) { - log.Infof(ctx, "Rotating cluster secrets encryption key") - - var APIURL, caCrt, clientCert, clientKey string - +func RotateEncryptionKey(ctx context.Context, rkeConfig *v3.RancherKubernetesEngineConfig, + dialersOptions hosts.DialersOptions, flags cluster.ExternalFlags) error { + log.Infof(ctx, "Rotating cluster secrets encryption key..") stateFilePath := cluster.GetStateFilePath(flags.ClusterFilePath, flags.ConfigDir) rkeFullState, _ := cluster.ReadStateFile(ctx, stateFilePath) - - // We generate the first encryption config in ClusterInit, to store it ASAP. It's written to the DesiredState + // We generate the first encryption config in ClusterInit, to store it ASAP. It's written + // to the DesiredState stateEncryptionConfig := rkeFullState.DesiredState.EncryptionConfig + // if CurrentState has EncryptionConfig, it means this is NOT the first time we enable encryption, we should use the _latest_ applied value from the current cluster if rkeFullState.CurrentState.EncryptionConfig != "" { stateEncryptionConfig = rkeFullState.CurrentState.EncryptionConfig @@ -84,43 +76,32 @@ func RotateEncryptionKey( kubeCluster, err := cluster.InitClusterObject(ctx, rkeConfig, flags, stateEncryptionConfig) if err != nil { - return APIURL, caCrt, clientCert, clientKey, nil, err + return err } - if kubeCluster.IsEncryptionCustomConfig() { - return APIURL, caCrt, clientCert, clientKey, nil, fmt.Errorf("can't rotate encryption keys: Key Rotation is not supported with custom configuration") + return fmt.Errorf("can't rotate encryption keys: Key Rotation is not supported with custom configuration") } if !kubeCluster.IsEncryptionEnabled() { - return APIURL, caCrt, clientCert, clientKey, nil, fmt.Errorf("can't rotate encryption keys: Encryption Configuration is disabled") + return fmt.Errorf("can't rotate encryption keys: Encryption Configuration is disabled") } - kubeCluster.Certificates = rkeFullState.DesiredState.CertificatesBundle if err := kubeCluster.SetupDialers(ctx, dialersOptions); err != nil { - return APIURL, caCrt, clientCert, clientKey, nil, err + return err } if err := kubeCluster.TunnelHosts(ctx, flags); err != nil { - return APIURL, caCrt, clientCert, clientKey, nil, err + return err } - if len(kubeCluster.ControlPlaneHosts) > 0 { - APIURL = fmt.Sprintf("https://%s:6443", kubeCluster.ControlPlaneHosts[0].Address) - } - clientCert = string(cert.EncodeCertPEM(kubeCluster.Certificates[pki.KubeAdminCertName].Certificate)) - clientKey = string(cert.EncodePrivateKeyPEM(kubeCluster.Certificates[pki.KubeAdminCertName].Key)) - caCrt = string(cert.EncodeCertPEM(kubeCluster.Certificates[pki.CACertName].Certificate)) err = kubeCluster.RotateEncryptionKey(ctx, rkeFullState) if err != nil { - return APIURL, caCrt, clientCert, clientKey, nil, err + return err } - // make sure we have the latest state rkeFullState, _ = cluster.ReadStateFile(ctx, stateFilePath) - log.Infof(ctx, "Reconciling cluster state") if err := kubeCluster.ReconcileDesiredStateEncryptionConfig(ctx, rkeFullState); err != nil { - return APIURL, caCrt, clientCert, clientKey, nil, err + return err } - log.Infof(ctx, "Cluster secrets encryption key rotated successfully") - return APIURL, caCrt, clientCert, clientKey, kubeCluster.Certificates, nil + return nil } diff --git a/cmd/up.go b/cmd/up.go index ed4527d8..91d76d01 100644 --- a/cmd/up.go +++ b/cmd/up.go @@ -85,9 +85,10 @@ func ClusterUp(ctx context.Context, dialersOptions hosts.DialersOptions, flags c if err != nil { return APIURL, caCrt, clientCert, clientKey, nil, err } - - // We generate the first encryption config in ClusterInit, to store it ASAP. It's written to the DesiredState + // We generate the first encryption config in ClusterInit, to store it ASAP. It's written + // to the DesiredState stateEncryptionConfig := clusterState.DesiredState.EncryptionConfig + // if CurrentState has EncryptionConfig, it means this is NOT the first time we enable encryption, we should use the _latest_ applied value from the current cluster if clusterState.CurrentState.EncryptionConfig != "" { stateEncryptionConfig = clusterState.CurrentState.EncryptionConfig @@ -102,10 +103,6 @@ func ClusterUp(ctx context.Context, dialersOptions hosts.DialersOptions, flags c if kubeCluster.RancherKubernetesEngineConfig.RotateCertificates != nil { return rebuildClusterWithRotatedCertificates(ctx, dialersOptions, flags, svcOptionsData) } - // if we need to rotate the encryption key, do so and then return - if kubeCluster.RancherKubernetesEngineConfig.RotateEncryptionKey { - return RotateEncryptionKey(ctx, clusterState.CurrentState.RancherKubernetesEngineConfig.DeepCopy(), dialersOptions, flags) - } log.Infof(ctx, "Building Kubernetes cluster") err = kubeCluster.SetupDialers(ctx, dialersOptions) diff --git a/log/log.go b/log/log.go index 396cd106..d2639f18 100644 --- a/log/log.go +++ b/log/log.go @@ -32,6 +32,7 @@ func getLogger(ctx context.Context) logger { func Infof(ctx context.Context, msg string, args ...interface{}) { getLogger(ctx).Infof(msg, args...) + } func Warnf(ctx context.Context, msg string, args ...interface{}) { diff --git a/types/rke_types.go b/types/rke_types.go index 27ebb6b9..85e3a3d3 100644 --- a/types/rke_types.go +++ b/types/rke_types.go @@ -57,8 +57,6 @@ type RancherKubernetesEngineConfig struct { Restore RestoreConfig `yaml:"restore" json:"restore,omitempty"` // Rotating Certificates Option RotateCertificates *RotateCertificates `yaml:"rotate_certificates,omitempty" json:"rotateCertificates,omitempty"` - // Rotate Encryption Key Option - RotateEncryptionKey bool `yaml:"rotate_encryption_key" json:"rotateEncryptionKey"` // DNS Config DNS *DNSConfig `yaml:"dns" json:"dns,omitempty"` // Upgrade Strategy for the cluster diff --git a/vendor/k8s.io/client-go/util/retry/OWNERS b/vendor/k8s.io/client-go/util/retry/OWNERS deleted file mode 100644 index dec3e88d..00000000 --- a/vendor/k8s.io/client-go/util/retry/OWNERS +++ /dev/null @@ -1,4 +0,0 @@ -# See the OWNERS docs at https://go.k8s.io/owners - -reviewers: -- caesarxuchao diff --git a/vendor/k8s.io/client-go/util/retry/util.go b/vendor/k8s.io/client-go/util/retry/util.go deleted file mode 100644 index 15e2722f..00000000 --- a/vendor/k8s.io/client-go/util/retry/util.go +++ /dev/null @@ -1,105 +0,0 @@ -/* -Copyright 2016 The Kubernetes Authors. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package retry - -import ( - "time" - - "k8s.io/apimachinery/pkg/api/errors" - "k8s.io/apimachinery/pkg/util/wait" -) - -// DefaultRetry is the recommended retry for a conflict where multiple clients -// are making changes to the same resource. -var DefaultRetry = wait.Backoff{ - Steps: 5, - Duration: 10 * time.Millisecond, - Factor: 1.0, - Jitter: 0.1, -} - -// DefaultBackoff is the recommended backoff for a conflict where a client -// may be attempting to make an unrelated modification to a resource under -// active management by one or more controllers. -var DefaultBackoff = wait.Backoff{ - Steps: 4, - Duration: 10 * time.Millisecond, - Factor: 5.0, - Jitter: 0.1, -} - -// OnError allows the caller to retry fn in case the error returned by fn is retriable -// according to the provided function. backoff defines the maximum retries and the wait -// interval between two retries. -func OnError(backoff wait.Backoff, retriable func(error) bool, fn func() error) error { - var lastErr error - err := wait.ExponentialBackoff(backoff, func() (bool, error) { - err := fn() - switch { - case err == nil: - return true, nil - case retriable(err): - lastErr = err - return false, nil - default: - return false, err - } - }) - if err == wait.ErrWaitTimeout { - err = lastErr - } - return err -} - -// RetryOnConflict is used to make an update to a resource when you have to worry about -// conflicts caused by other code making unrelated updates to the resource at the same -// time. fn should fetch the resource to be modified, make appropriate changes to it, try -// to update it, and return (unmodified) the error from the update function. On a -// successful update, RetryOnConflict will return nil. If the update function returns a -// "Conflict" error, RetryOnConflict will wait some amount of time as described by -// backoff, and then try again. On a non-"Conflict" error, or if it retries too many times -// and gives up, RetryOnConflict will return an error to the caller. -// -// err := retry.RetryOnConflict(retry.DefaultRetry, func() error { -// // Fetch the resource here; you need to refetch it on every try, since -// // if you got a conflict on the last update attempt then you need to get -// // the current version before making your own changes. -// pod, err := c.Pods("mynamespace").Get(name, metav1.GetOptions{}) -// if err ! nil { -// return err -// } -// -// // Make whatever updates to the resource are needed -// pod.Status.Phase = v1.PodFailed -// -// // Try to update -// _, err = c.Pods("mynamespace").UpdateStatus(pod) -// // You have to return err itself here (not wrapped inside another error) -// // so that RetryOnConflict can identify it correctly. -// return err -// }) -// if err != nil { -// // May be conflict if max retries were hit, or may be something unrelated -// // like permissions or a network error -// return err -// } -// ... -// -// TODO: Make Backoff an interface? -func RetryOnConflict(backoff wait.Backoff, fn func() error) error { - return OnError(backoff, errors.IsConflict, fn) -} diff --git a/vendor/modules.txt b/vendor/modules.txt index 61f4e7da..e10b3b56 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -510,7 +510,6 @@ k8s.io/client-go/util/flowcontrol k8s.io/client-go/util/homedir k8s.io/client-go/util/jsonpath k8s.io/client-go/util/keyutil -k8s.io/client-go/util/retry k8s.io/client-go/util/workqueue # k8s.io/component-base v0.18.0 k8s.io/component-base/version