1
0
mirror of https://github.com/rancher/rke.git synced 2025-07-13 23:26:02 +00:00

[v1.4.19] s4: Fix 478

This commit is contained in:
Bruno Bachmann 2024-04-16 10:14:28 -07:00 committed by Bruno Bachmann
parent 589cb504a2
commit f7485b8dce
9 changed files with 352 additions and 73 deletions

View File

@ -560,7 +560,7 @@ func (c *Cluster) StoreAddonConfigMap(ctx context.Context, addonYaml string, add
select { select {
case <-timeout: case <-timeout:
return updated, nil return updated, nil
case <-time.After(time.Second * UpdateStateTimeout): case <-time.After(UpdateStateTimeout):
return updated, fmt.Errorf("[addons] Timeout waiting for kubernetes to be ready") return updated, fmt.Errorf("[addons] Timeout waiting for kubernetes to be ready")
} }
} }

View File

@ -86,8 +86,9 @@ const (
AuthnWebhookProvider = "webhook" AuthnWebhookProvider = "webhook"
StateConfigMapName = "cluster-state" StateConfigMapName = "cluster-state"
FullStateConfigMapName = "full-cluster-state" FullStateConfigMapName = "full-cluster-state"
UpdateStateTimeout = 30 FullStateSecretName = "full-cluster-state"
GetStateTimeout = 30 UpdateStateTimeout = time.Second * 30
GetStateTimeout = time.Second * 30
RewriteWorkers = 5 RewriteWorkers = 5
SyncWorkers = 10 SyncWorkers = 10
NoneAuthorizationMode = "none" NoneAuthorizationMode = "none"

View File

@ -3,6 +3,7 @@ package cluster
import ( import (
"context" "context"
"encoding/json" "encoding/json"
"errors"
"fmt" "fmt"
"io" "io"
"os" "os"
@ -11,8 +12,6 @@ import (
"strings" "strings"
"time" "time"
"k8s.io/client-go/transport"
"github.com/rancher/rke/hosts" "github.com/rancher/rke/hosts"
"github.com/rancher/rke/k8s" "github.com/rancher/rke/k8s"
"github.com/rancher/rke/log" "github.com/rancher/rke/log"
@ -22,6 +21,11 @@ import (
"github.com/sirupsen/logrus" "github.com/sirupsen/logrus"
"gopkg.in/yaml.v2" "gopkg.in/yaml.v2"
v1 "k8s.io/api/core/v1" v1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/transport"
) )
const ( const (
@ -29,6 +33,10 @@ const (
certDirExt = "_certs" certDirExt = "_certs"
) )
var (
ErrFullStateIsNil = errors.New("fullState argument cannot be nil")
)
type FullState struct { type FullState struct {
DesiredState State `json:"desiredState,omitempty"` DesiredState State `json:"desiredState,omitempty"`
CurrentState State `json:"currentState,omitempty"` CurrentState State `json:"currentState,omitempty"`
@ -79,45 +87,152 @@ func (c *Cluster) GetStateFileFromConfigMap(ctx context.Context) (string, error)
} }
return stateFile, nil return stateFile, nil
} }
return "", fmt.Errorf("Unable to get ConfigMap with cluster state from any Control Plane host") return "", fmt.Errorf("[state] Unable to get ConfigMap with cluster state from any Control Plane host")
} }
func SaveFullStateToKubernetes(ctx context.Context, kubeCluster *Cluster, fullState *FullState) error { // SaveFullStateToK8s saves the full cluster state to a k8s secret. If any errors that occur on attempts to update
k8sClient, err := k8s.NewClient(kubeCluster.LocalKubeConfigPath, kubeCluster.K8sWrapTransport) // the secret will be retired up until some limit.
if err != nil { func SaveFullStateToK8s(ctx context.Context, k8sClient kubernetes.Interface, fullState *FullState) error {
return fmt.Errorf("Failed to create Kubernetes Client: %v", err)
}
log.Infof(ctx, "[state] Saving full cluster state to Kubernetes") log.Infof(ctx, "[state] Saving full cluster state to Kubernetes")
stateFile, err := json.Marshal(*fullState)
if fullState == nil {
return ErrFullStateIsNil
}
secrets := k8sClient.CoreV1().Secrets(metav1.NamespaceSystem)
configMaps := k8sClient.CoreV1().ConfigMaps(metav1.NamespaceSystem)
stateBytes, err := json.Marshal(fullState)
if err != nil { if err != nil {
return err return fmt.Errorf("[state] error marshalling full state to JSON: %w", err)
} }
timeout := make(chan bool, 1)
go func() { // Back off for 1s between attempts.
for { backoff := wait.Backoff{
_, err := k8s.UpdateConfigMap(k8sClient, stateFile, FullStateConfigMapName) Duration: time.Second,
if err != nil { Steps: int(UpdateStateTimeout.Seconds()),
time.Sleep(time.Second * 5) }
continue
// Try to create or update the secret and delete the old configmap in k8s, if it still exists.
saveState := func(ctx context.Context) (bool, error) {
// Check if the secret already exists.
existingSecret, err := secrets.Get(ctx, FullStateSecretName, metav1.GetOptions{})
if err == nil {
// The secret already exists, update it.
existingSecretCopy := existingSecret.DeepCopy()
existingSecretCopy.Data[FullStateSecretName] = stateBytes
if _, err := secrets.Update(ctx, existingSecretCopy, metav1.UpdateOptions{}); err != nil {
return false, fmt.Errorf("[state] error updating secret: %w", err)
} }
log.Infof(ctx, "[state] Successfully Saved full cluster state to Kubernetes ConfigMap: %s", FullStateConfigMapName) } else if apierrors.IsNotFound(err) {
timeout <- true // The secret does not exist, create it.
break _, err := secrets.Create(ctx, &v1.Secret{
ObjectMeta: metav1.ObjectMeta{
Name: FullStateSecretName,
Namespace: metav1.NamespaceSystem,
},
Data: map[string][]byte{
FullStateSecretName: stateBytes,
},
}, metav1.CreateOptions{})
if err != nil {
return false, fmt.Errorf("[state] error creating secret: %w", err)
}
} else {
return false, fmt.Errorf("[state] error getting secret: %w", err)
} }
}()
select { // Delete the old configmap.
case <-timeout: err = configMaps.Delete(ctx, FullStateConfigMapName, metav1.DeleteOptions{})
return nil if err != nil && !apierrors.IsNotFound(err) {
case <-time.After(time.Second * UpdateStateTimeout): return false, fmt.Errorf("[state] error deleting configmap: %w", err)
return fmt.Errorf("[state] Timeout waiting for kubernetes to be ready") }
return true, nil
} }
// Retry until success or backoff.Steps has been reached ctx is cancelled.
if err = wait.ExponentialBackoffWithContext(ctx, backoff, saveState); err != nil {
return fmt.Errorf("[state] error updating secret: %w", err)
}
return nil
}
// GetFullStateFromK8s fetches the full cluster state from the k8s cluster.
// In earlier versions of RKE, the full cluster state was stored in a configmap, but it has since been moved
// to a secret. This function tries fetching it from the secret first and will fall back on the configmap if the secret
// doesn't exist.
func GetFullStateFromK8s(ctx context.Context, k8sClient kubernetes.Interface) (*FullState, error) {
// Back off for 1s between attempts.
backoff := wait.Backoff{
Duration: time.Second,
Steps: int(GetStateTimeout.Seconds()),
}
// Try to fetch secret or configmap in k8s.
var fullState FullState
getState := func(ctx context.Context) (bool, error) {
fullStateBytes, err := getFullStateBytesFromSecret(ctx, k8sClient, FullStateSecretName)
if err != nil {
if apierrors.IsNotFound(err) {
logrus.Debug("full-state secret not found, falling back to configmap")
fullStateBytes, err = getFullStateBytesFromConfigMap(ctx, k8sClient, FullStateConfigMapName)
if err != nil {
return false, fmt.Errorf("[state] error getting full state from configmap: %w", err)
}
} else {
return false, fmt.Errorf("[state] error getting full state from secret: %w", err)
}
}
if err := json.Unmarshal(fullStateBytes, &fullState); err != nil {
return false, fmt.Errorf("[state] error unmarshalling full state from JSON: %w", err)
}
return true, nil
}
// Retry until success or backoff.Steps has been reached or ctx is cancelled.
err := wait.ExponentialBackoffWithContext(ctx, backoff, getState)
return &fullState, err
}
// getFullStateBytesFromConfigMap fetches the full state from the configmap with the given name in the kube-system namespace.
func getFullStateBytesFromConfigMap(ctx context.Context, k8sClient kubernetes.Interface, name string) ([]byte, error) {
confMap, err := k8sClient.CoreV1().ConfigMaps(metav1.NamespaceSystem).Get(ctx, name, metav1.GetOptions{})
if err != nil {
return nil, fmt.Errorf("[state] error getting configmap %s: %w", name, err)
}
data, ok := confMap.Data[name]
if !ok {
return nil, fmt.Errorf("[state] expected configmap %s to have field %s, but none was found", name, name)
}
return []byte(data), nil
}
// getFullStateBytesFromSecret fetches the full state from the secret with the given name in the kube-system namespace.
func getFullStateBytesFromSecret(ctx context.Context, k8sClient kubernetes.Interface, name string) ([]byte, error) {
secret, err := k8sClient.CoreV1().Secrets(metav1.NamespaceSystem).Get(ctx, name, metav1.GetOptions{})
if err != nil {
return nil, fmt.Errorf("[state] error getting secret %s: %w", name, err)
}
data, ok := secret.Data[name]
if !ok {
return nil, fmt.Errorf("[state] expected secret %s to have field %s, but none was found", name, name)
}
return data, nil
} }
func GetStateFromKubernetes(ctx context.Context, kubeCluster *Cluster) (*Cluster, error) { func GetStateFromKubernetes(ctx context.Context, kubeCluster *Cluster) (*Cluster, error) {
log.Infof(ctx, "[state] Fetching cluster state from Kubernetes") log.Infof(ctx, "[state] Fetching cluster state from Kubernetes")
k8sClient, err := k8s.NewClient(kubeCluster.LocalKubeConfigPath, kubeCluster.K8sWrapTransport) k8sClient, err := k8s.NewClient(kubeCluster.LocalKubeConfigPath, kubeCluster.K8sWrapTransport)
if err != nil { if err != nil {
return nil, fmt.Errorf("Failed to create Kubernetes Client: %v", err) return nil, fmt.Errorf("[state] Failed to create Kubernetes Client: %v", err)
} }
var cfgMap *v1.ConfigMap var cfgMap *v1.ConfigMap
var currentCluster Cluster var currentCluster Cluster
@ -139,12 +254,12 @@ func GetStateFromKubernetes(ctx context.Context, kubeCluster *Cluster) (*Cluster
clusterData := cfgMap.Data[StateConfigMapName] clusterData := cfgMap.Data[StateConfigMapName]
err := yaml.Unmarshal([]byte(clusterData), &currentCluster) err := yaml.Unmarshal([]byte(clusterData), &currentCluster)
if err != nil { if err != nil {
return nil, fmt.Errorf("Failed to unmarshal cluster data") return nil, fmt.Errorf("[state] Failed to unmarshal cluster data")
} }
return &currentCluster, nil return &currentCluster, nil
case <-time.After(time.Second * GetStateTimeout): case <-time.After(GetStateTimeout):
log.Infof(ctx, "Timed out waiting for kubernetes cluster to get state") log.Infof(ctx, "Timed out waiting for kubernetes cluster to get state")
return nil, fmt.Errorf("Timeout waiting for kubernetes cluster to get state") return nil, fmt.Errorf("[state] Timeout waiting for kubernetes cluster to get state")
} }
} }
@ -152,13 +267,13 @@ func GetK8sVersion(localConfigPath string, k8sWrapTransport transport.WrapperFun
logrus.Debugf("[version] Using %s to connect to Kubernetes cluster..", localConfigPath) logrus.Debugf("[version] Using %s to connect to Kubernetes cluster..", localConfigPath)
k8sClient, err := k8s.NewClient(localConfigPath, k8sWrapTransport) k8sClient, err := k8s.NewClient(localConfigPath, k8sWrapTransport)
if err != nil { if err != nil {
return "", fmt.Errorf("Failed to create Kubernetes Client: %v", err) return "", fmt.Errorf("[state] Failed to create Kubernetes Client: %v", err)
} }
discoveryClient := k8sClient.DiscoveryClient discoveryClient := k8sClient.DiscoveryClient
logrus.Debugf("[version] Getting Kubernetes server version..") logrus.Debugf("[version] Getting Kubernetes server version..")
serverVersion, err := discoveryClient.ServerVersion() serverVersion, err := discoveryClient.ServerVersion()
if err != nil { if err != nil {
return "", fmt.Errorf("Failed to get Kubernetes server version: %v", err) return "", fmt.Errorf("[state] Failed to get Kubernetes server version: %v", err)
} }
return fmt.Sprintf("%#v", *serverVersion), nil return fmt.Sprintf("%#v", *serverVersion), nil
} }
@ -174,11 +289,11 @@ func RebuildState(ctx context.Context, kubeCluster *Cluster, oldState *FullState
if flags.CustomCerts { if flags.CustomCerts {
certBundle, err := pki.ReadCertsAndKeysFromDir(flags.CertificateDir) certBundle, err := pki.ReadCertsAndKeysFromDir(flags.CertificateDir)
if err != nil { if err != nil {
return nil, fmt.Errorf("Failed to read certificates from dir [%s]: %v", flags.CertificateDir, err) return nil, fmt.Errorf("[state] Failed to read certificates from dir [%s]: %v", flags.CertificateDir, err)
} }
// make sure all custom certs are included // make sure all custom certs are included
if err := pki.ValidateBundleContent(rkeConfig, certBundle, flags.ClusterFilePath, flags.ConfigDir); err != nil { if err := pki.ValidateBundleContent(rkeConfig, certBundle, flags.ClusterFilePath, flags.ConfigDir); err != nil {
return nil, fmt.Errorf("Failed to validates certificates from dir [%s]: %v", flags.CertificateDir, err) return nil, fmt.Errorf("[state] Failed to validates certificates from dir [%s]: %v", flags.CertificateDir, err)
} }
newState.DesiredState.CertificatesBundle = certBundle newState.DesiredState.CertificatesBundle = certBundle
newState.CurrentState = oldState.CurrentState newState.CurrentState = oldState.CurrentState
@ -207,11 +322,11 @@ func RebuildState(ctx context.Context, kubeCluster *Cluster, oldState *FullState
func (s *FullState) WriteStateFile(ctx context.Context, statePath string) error { func (s *FullState) WriteStateFile(ctx context.Context, statePath string) error {
stateFile, err := json.MarshalIndent(s, "", " ") stateFile, err := json.MarshalIndent(s, "", " ")
if err != nil { if err != nil {
return fmt.Errorf("Failed to Marshal state object: %v", err) return fmt.Errorf("[state] Failed to Marshal state object: %v", err)
} }
logrus.Tracef("Writing state file: %s", stateFile) logrus.Tracef("Writing state file: %s", stateFile)
if err := os.WriteFile(statePath, stateFile, 0600); err != nil { if err := os.WriteFile(statePath, stateFile, 0600); err != nil {
return fmt.Errorf("Failed to write state file: %v", err) return fmt.Errorf("[state] Failed to write state file: %v", err)
} }
log.Infof(ctx, "Successfully Deployed state file at [%s]", statePath) log.Infof(ctx, "Successfully Deployed state file at [%s]", statePath)
return nil return nil
@ -264,19 +379,19 @@ func ReadStateFile(ctx context.Context, statePath string) (*FullState, error) {
rkeFullState := &FullState{} rkeFullState := &FullState{}
fp, err := filepath.Abs(statePath) fp, err := filepath.Abs(statePath)
if err != nil { if err != nil {
return rkeFullState, fmt.Errorf("failed to lookup current directory name: %v", err) return rkeFullState, fmt.Errorf("[state] failed to lookup current directory name: %v", err)
} }
file, err := os.Open(fp) file, err := os.Open(fp)
if err != nil { if err != nil {
return rkeFullState, fmt.Errorf("Can not find RKE state file: %v", err) return rkeFullState, fmt.Errorf("[state] Can not find RKE state file: %v", err)
} }
defer file.Close() defer file.Close()
buf, err := io.ReadAll(file) buf, err := io.ReadAll(file)
if err != nil { if err != nil {
return rkeFullState, fmt.Errorf("failed to read state file: %v", err) return rkeFullState, fmt.Errorf("[state] failed to read state file: %v", err)
} }
if err := json.Unmarshal(buf, rkeFullState); err != nil { if err := json.Unmarshal(buf, rkeFullState); err != nil {
return rkeFullState, fmt.Errorf("failed to unmarshal the state file: %v", err) return rkeFullState, fmt.Errorf("[state] failed to unmarshal the state file: %v", err)
} }
rkeFullState.DesiredState.CertificatesBundle = pki.TransformPEMToObject(rkeFullState.DesiredState.CertificatesBundle) rkeFullState.DesiredState.CertificatesBundle = pki.TransformPEMToObject(rkeFullState.DesiredState.CertificatesBundle)
rkeFullState.CurrentState.CertificatesBundle = pki.TransformPEMToObject(rkeFullState.CurrentState.CertificatesBundle) rkeFullState.CurrentState.CertificatesBundle = pki.TransformPEMToObject(rkeFullState.CurrentState.CertificatesBundle)
@ -322,7 +437,7 @@ func buildFreshState(ctx context.Context, kubeCluster *Cluster, newState *FullSt
// Get the certificate Bundle // Get the certificate Bundle
certBundle, err := pki.GenerateRKECerts(ctx, *rkeConfig, "", "") certBundle, err := pki.GenerateRKECerts(ctx, *rkeConfig, "", "")
if err != nil { if err != nil {
return fmt.Errorf("Failed to generate certificate bundle: %v", err) return fmt.Errorf("[state] Failed to generate certificate bundle: %v", err)
} }
newState.DesiredState.CertificatesBundle = certBundle newState.DesiredState.CertificatesBundle = certBundle
if isEncryptionEnabled(rkeConfig) { if isEncryptionEnabled(rkeConfig) {

162
cluster/state_test.go Normal file
View File

@ -0,0 +1,162 @@
package cluster
import (
"context"
"encoding/json"
"errors"
"reflect"
"testing"
"time"
"github.com/rancher/rke/pki"
"github.com/stretchr/testify/assert"
v1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/kubernetes/fake"
)
func setup(t *testing.T, withConfigMap bool) (context.Context, FullState, kubernetes.Interface) {
ctx := context.Background()
client := fake.NewSimpleClientset()
fullState := FullState{
CurrentState: State{
RancherKubernetesEngineConfig: GetLocalRKEConfig(),
CertificatesBundle: map[string]pki.CertificatePKI{
"test": {
CertificatePEM: "fake cert",
KeyPEM: "fake key",
},
},
},
}
if withConfigMap {
fullStateBytes, err := json.Marshal(fullState)
assert.NoError(t, err)
_, err = client.CoreV1().ConfigMaps(metav1.NamespaceSystem).Create(ctx, &v1.ConfigMap{
ObjectMeta: metav1.ObjectMeta{
Name: FullStateConfigMapName,
},
Data: map[string]string{
FullStateConfigMapName: string(fullStateBytes),
},
}, metav1.CreateOptions{})
assert.NoError(t, err)
}
return ctx, fullState, client
}
func checkSecretMatches(t *testing.T, ctx context.Context, client kubernetes.Interface, expected FullState) {
secret, err := client.CoreV1().Secrets(metav1.NamespaceSystem).Get(ctx, FullStateSecretName, metav1.GetOptions{})
assert.NoError(t, err)
fullStateFromSecret := FullState{}
err = json.Unmarshal(secret.Data[FullStateConfigMapName], &fullStateFromSecret)
assert.NoError(t, err)
assert.True(t, reflect.DeepEqual(fullStateFromSecret, expected))
}
func checkConfigMapDeleted(t *testing.T, ctx context.Context, client kubernetes.Interface) {
_, err := client.CoreV1().ConfigMaps(metav1.NamespaceSystem).Get(ctx, FullStateConfigMapName, metav1.GetOptions{})
assert.True(t, apierrors.IsNotFound(err))
}
func TestSaveFullStateToK8s_Nil(t *testing.T) {
err := SaveFullStateToK8s(context.Background(), &fake.Clientset{}, nil)
assert.True(t, errors.Is(err, ErrFullStateIsNil))
}
// Tests the scenario where the cluster stores no existing state. In this case, a new full state secret should be
// created and the old configmap should be deleted.
func TestSaveAndGetFullStateFromK8s_ClusterWithoutSecretOrCM(t *testing.T) {
// Set up a fake cluster without a secret or configmap.
ctx, fullState, client := setup(t, false)
// We should not be able to fetch and load the state from the secret or configmap.
ctx, cancel := context.WithTimeout(ctx, time.Millisecond*500)
defer cancel()
fetchedFullState, err := GetFullStateFromK8s(ctx, client)
assert.True(t, apierrors.IsNotFound(err))
// Create the secret and delete the configmap.
err = SaveFullStateToK8s(ctx, client, &fullState)
assert.NoError(t, err)
// There should be a secret containing the full state.
checkSecretMatches(t, ctx, client, fullState)
// There should be no configmap.
checkConfigMapDeleted(t, ctx, client)
// We should be able to fetch and load the state from the secret.
fetchedFullState, err = GetFullStateFromK8s(ctx, client)
assert.NoError(t, err)
assert.True(t, reflect.DeepEqual(*fetchedFullState, fullState))
}
// Tests the scenario where the cluster already stores a full state secret but no configmap. In this case, the secret
// should be updated and there should still be no configmap.
func TestSaveAndGetFullStateFromK8s_ClusterWithSecretAndNoCM(t *testing.T) {
// Set up a fake cluster without a secret or configmap.
ctx, fullState, client := setup(t, false)
// Add the secret to the cluster.
err := SaveFullStateToK8s(ctx, client, &fullState)
assert.NoError(t, err)
// There should be a secret containing the full state.
checkSecretMatches(t, ctx, client, fullState)
// Change the state.
for k, v := range fullState.CurrentState.CertificatesBundle {
v.CertificatePEM = "fake PEM"
fullState.CurrentState.CertificatesBundle[k] = v
}
// Saving again should update the existing secret.
err = SaveFullStateToK8s(ctx, client, &fullState)
assert.NoError(t, err)
// There should be a secret containing the updated full state.
checkSecretMatches(t, ctx, client, fullState)
// There should be no configmap.
checkConfigMapDeleted(t, ctx, client)
// We should be able to fetch and load the state from the secret.
fullStateFromK8s, err := GetFullStateFromK8s(ctx, client)
assert.NoError(t, err)
assert.True(t, reflect.DeepEqual(*fullStateFromK8s, fullState))
}
// Tests the scenario where the cluster already stores existing state in a configmap and there is no secret. In this
// case, a new full state secret should be created and the configmap should be deleted.
func TestSaveAndGetFullStateFromK8s_OldClusterWithCM(t *testing.T) {
// Create a fake cluster without a secret but with a configmap.
ctx, fullState, client := setup(t, true)
// Make sure we can fall back to the configmap when we fetch and load full cluster state given that the secret does
// not yet exist.
fullStateFromK8s, err := GetFullStateFromK8s(ctx, client)
assert.NoError(t, err)
assert.True(t, reflect.DeepEqual(*fullStateFromK8s, fullState))
// Saving should create a new secret.
err = SaveFullStateToK8s(ctx, client, &fullState)
assert.NoError(t, err)
// There should be a secret containing the full state.
checkSecretMatches(t, ctx, client, fullState)
// The configmap should have been deleted.
checkConfigMapDeleted(t, ctx, client)
// We should be able to fetch and load the state from the secret.
fullStateFromK8s, err = GetFullStateFromK8s(ctx, client)
assert.NoError(t, err)
assert.True(t, reflect.DeepEqual(*fullStateFromK8s, fullState))
}

View File

@ -4,10 +4,10 @@ import (
"context" "context"
"crypto/x509" "crypto/x509"
"fmt" "fmt"
"time"
"github.com/rancher/rke/cluster" "github.com/rancher/rke/cluster"
"github.com/rancher/rke/hosts" "github.com/rancher/rke/hosts"
"github.com/rancher/rke/k8s"
"github.com/rancher/rke/log" "github.com/rancher/rke/log"
"github.com/rancher/rke/pki" "github.com/rancher/rke/pki"
"github.com/rancher/rke/pki/cert" "github.com/rancher/rke/pki/cert"
@ -207,22 +207,19 @@ func rebuildClusterWithRotatedCertificates(ctx context.Context,
} }
func saveClusterState(ctx context.Context, kubeCluster *cluster.Cluster, clusterState *cluster.FullState) error { func saveClusterState(ctx context.Context, kubeCluster *cluster.Cluster, clusterState *cluster.FullState) error {
var err error if err := kubeCluster.UpdateClusterCurrentState(ctx, clusterState); err != nil {
if err = kubeCluster.UpdateClusterCurrentState(ctx, clusterState); err != nil { return fmt.Errorf("error updating cluster state: %w", err)
return err
}
// Attempt to store cluster full state to Kubernetes
for i := 1; i <= 3; i++ {
err = cluster.SaveFullStateToKubernetes(ctx, kubeCluster, clusterState)
if err != nil {
time.Sleep(time.Second * time.Duration(2))
continue
}
break
} }
k8sClient, err := k8s.NewClient(kubeCluster.LocalKubeConfigPath, kubeCluster.K8sWrapTransport)
if err != nil { if err != nil {
logrus.Warnf("Failed to save full cluster state to Kubernetes") return fmt.Errorf("failed to create Kubernetes Client: %w", err)
} }
if err := cluster.SaveFullStateToK8s(ctx, k8sClient, clusterState); err != nil {
logrus.Warnf("Failed to save full state to Kubernetes: %v", err)
}
return nil return nil
} }

View File

@ -6,6 +6,7 @@ import (
"strings" "strings"
"time" "time"
"github.com/rancher/rke/k8s"
"github.com/sirupsen/logrus" "github.com/sirupsen/logrus"
"github.com/rancher/rke/cluster" "github.com/rancher/rke/cluster"
@ -223,7 +224,12 @@ func ClusterUp(ctx context.Context, dialersOptions hosts.DialersOptions, flags c
return APIURL, caCrt, clientCert, clientKey, nil, err return APIURL, caCrt, clientCert, clientKey, nil, err
} }
err = cluster.SaveFullStateToKubernetes(ctx, kubeCluster, clusterState) k8sClient, err := k8s.NewClient(kubeCluster.LocalKubeConfigPath, kubeCluster.K8sWrapTransport)
if err != nil {
return APIURL, caCrt, clientCert, clientKey, nil, fmt.Errorf("failed to create Kubernetes Client: %w", err)
}
err = cluster.SaveFullStateToK8s(ctx, k8sClient, clusterState)
if err != nil { if err != nil {
return APIURL, caCrt, clientCert, clientKey, nil, err return APIURL, caCrt, clientCert, clientKey, nil, err
} }

View File

@ -2,7 +2,6 @@ package cmd
import ( import (
"context" "context"
"encoding/json"
"fmt" "fmt"
"github.com/rancher/rke/cluster" "github.com/rancher/rke/cluster"
@ -121,20 +120,19 @@ func getStateFile(ctx *cli.Context) error {
return nil return nil
} }
logrus.Infof("Successfully connected to server using kubeconfig, retrieved server version [%s]", serverVersion) logrus.Infof("Successfully connected to server using kubeconfig, retrieved server version [%s]", serverVersion)
// Retrieve full-cluster-state configmap
k8sClient, err := k8s.NewClient(localKubeConfig, nil) k8sClient, err := k8s.NewClient(localKubeConfig, nil)
if err != nil { if err != nil {
return err return err
} }
cfgMap, err := k8s.GetConfigMap(k8sClient, cluster.FullStateConfigMapName)
// Try fetch full cluster state from a secret. In older versions of RKE, this was stored in a configmap, but it
// is now a secret.
rkeFullState, err := cluster.GetFullStateFromK8s(context.Background(), k8sClient)
if err != nil { if err != nil {
return err return fmt.Errorf("error getting full cluster state from secret: %w", err)
}
clusterData := cfgMap.Data[cluster.FullStateConfigMapName]
rkeFullState := &cluster.FullState{}
if err = json.Unmarshal([]byte(clusterData), rkeFullState); err != nil {
return err
} }
// Move current state file // Move current state file

View File

@ -10,7 +10,7 @@ import (
) )
const ( const (
defaultURL = "https://releases.rancher.com/kontainer-driver-metadata/dev-v2.7-2024-05-patches/data.json" defaultURL = "https://releases.rancher.com/kontainer-driver-metadata/release-v2.7/data.json"
dataFile = "data/data.json" dataFile = "data/data.json"
) )

View File

@ -8,19 +8,19 @@ import (
"k8s.io/client-go/kubernetes" "k8s.io/client-go/kubernetes"
) )
func GetSystemSecret(k8sClient *kubernetes.Clientset, secretName string) (*v1.Secret, error) { func GetSystemSecret(k8sClient kubernetes.Interface, secretName string) (*v1.Secret, error) {
return GetSecret(k8sClient, secretName, metav1.NamespaceSystem) return GetSecret(k8sClient, secretName, metav1.NamespaceSystem)
} }
func GetSecret(k8sClient *kubernetes.Clientset, secretName, namespace string) (*v1.Secret, error) { func GetSecret(k8sClient kubernetes.Interface, secretName, namespace string) (*v1.Secret, error) {
return k8sClient.CoreV1().Secrets(namespace).Get(context.TODO(), secretName, metav1.GetOptions{}) return k8sClient.CoreV1().Secrets(namespace).Get(context.TODO(), secretName, metav1.GetOptions{})
} }
func GetSecretsList(k8sClient *kubernetes.Clientset, namespace string) (*v1.SecretList, error) { func GetSecretsList(k8sClient kubernetes.Interface, namespace string) (*v1.SecretList, error) {
return k8sClient.CoreV1().Secrets("").List(context.TODO(), metav1.ListOptions{}) return k8sClient.CoreV1().Secrets("").List(context.TODO(), metav1.ListOptions{})
} }
func UpdateSecret(k8sClient *kubernetes.Clientset, secret *v1.Secret) error { func UpdateSecret(k8sClient kubernetes.Interface, secret *v1.Secret) error {
var err error var err error
_, err = k8sClient.CoreV1().Secrets(secret.Namespace).Update(context.TODO(), secret, metav1.UpdateOptions{}) _, err = k8sClient.CoreV1().Secrets(secret.Namespace).Update(context.TODO(), secret, metav1.UpdateOptions{})
return err return err