[kubeadm] Add etcd L7 check on upgrade
- Adds L7 check for kubeadm etcd static pod upgrade
commit 4c768bb2ca (parent 8129480d44)
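The "L7" in the title means an application-layer health check: instead of assuming etcd is healthy once its static pod manifest has been swapped, kubeadm now dials etcd's client API and polls its Status endpoint, with an initial delay and a retry schedule (see the WaitForStatus implementation near the end of this diff).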
@@ -282,23 +282,9 @@ func PerformStaticPodUpgrade(client clientset.Interface, waiter apiclient.Waiter
 		return err
 	}

-	// These are the same because kubeadm currently does not support reconciling a new config against an older one.
-	// For instance, currently, changing CertificatesDir or EtcdDataDir breaks the upgrade, because oldcfg is not fetchable.
-	// There would need to be additional upgrade code to handle copying the certs/data over to the new filepaths.
-	// It's still useful to have these parameterized as separate clusters though, because it allows us to mock these
-	// interfaces for tests.
-	oldEtcdCluster := etcdutil.StaticPodCluster{
-		Endpoints:       []string{"localhost:2379"},
-		ManifestDir:     constants.GetStaticPodDirectory(),
-		CertificatesDir: internalcfg.CertificatesDir,
-	}
-	newEtcdCluster := etcdutil.StaticPodCluster{
-		Endpoints:       []string{"localhost:2379"},
-		ManifestDir:     constants.GetStaticPodDirectory(),
-		CertificatesDir: internalcfg.CertificatesDir,
-	}
-
-	return upgrade.StaticPodControlPlane(waiter, pathManager, internalcfg, etcdUpgrade, oldEtcdCluster, newEtcdCluster)
+	// These are left uninitialized because passing in the clients allows mocking them during testing
+	var oldEtcdClient, newEtcdClient etcdutil.Client
+	return upgrade.StaticPodControlPlane(waiter, pathManager, internalcfg, etcdUpgrade, oldEtcdClient, newEtcdClient)
 }

 // DryRunStaticPodUpgrade fakes an upgrade of the control plane
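With the call site reduced to passing nil clients, StaticPodControlPlane constructs the real clients itself, and tests can inject fakes instead. Below is a minimal sketch of such a fake (hypothetical, though it mirrors the fakes this commit adds to the test files): any type with these three methods satisfies the etcdutil.Client interface introduced later in this diff.

package upgrade_test

import (
    "time"

    "github.com/coreos/etcd/clientv3"
)

// fakeClient stands in for a live etcd cluster during tests.
type fakeClient struct{ tls bool }

// HasTLS reports the canned TLS state.
func (f fakeClient) HasTLS() bool { return f.tls }

// GetStatus pretends the cluster is healthy and reports a fixed version.
func (f fakeClient) GetStatus() (*clientv3.StatusResponse, error) {
    return &clientv3.StatusResponse{Version: "3.1.12"}, nil
}

// WaitForStatus ignores the retry schedule and answers immediately.
func (f fakeClient) WaitForStatus(delay time.Duration, retries int, retryInterval time.Duration) (*clientv3.StatusResponse, error) {
    return f.GetStatus()
}

Returning a canned StatusResponse keeps the upgrade and rollback paths testable without standing up a real etcd.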
@@ -66,15 +66,18 @@ func RunPlan(parentFlags *cmdUpgradeFlags) error {
 	}

-	// Define Local Etcd cluster to be able to retrieve information
-	etcdCluster := etcdutil.StaticPodCluster{
-		Endpoints:       []string{"localhost:2379"},
-		ManifestDir:     constants.GetStaticPodDirectory(),
-		CertificatesDir: upgradeVars.cfg.CertificatesDir,
+	etcdClient, err := etcdutil.NewStaticPodClient(
+		[]string{"localhost:2379"},
+		constants.GetStaticPodDirectory(),
+		upgradeVars.cfg.CertificatesDir,
+	)
+	if err != nil {
+		return err
 	}

 	// Compute which upgrade possibilities there are
 	glog.V(1).Infof("[upgrade/plan] computing upgrade possibilities")
-	availUpgrades, err := upgrade.GetAvailableUpgrades(upgradeVars.versionGetter, parentFlags.allowExperimentalUpgrades, parentFlags.allowRCUpgrades, etcdCluster, upgradeVars.cfg.FeatureGates)
+	availUpgrades, err := upgrade.GetAvailableUpgrades(upgradeVars.versionGetter, parentFlags.allowExperimentalUpgrades, parentFlags.allowRCUpgrades, etcdClient, upgradeVars.cfg.FeatureGates)
 	if err != nil {
 		return fmt.Errorf("[upgrade/versions] FATAL: %v", err)
 	}
@@ -91,6 +91,7 @@ go_test(
         "//pkg/api/legacyscheme:go_default_library",
         "//pkg/util/version:go_default_library",
         "//vendor/github.com/coreos/etcd/clientv3:go_default_library",
+        "//vendor/github.com/coreos/etcd/pkg/transport:go_default_library",
         "//vendor/k8s.io/apimachinery/pkg/runtime:go_default_library",
     ],
 )
@@ -74,7 +74,7 @@ type ClusterState struct {

 // GetAvailableUpgrades fetches all versions from the specified VersionGetter and computes which
 // kinds of upgrades can be performed
-func GetAvailableUpgrades(versionGetterImpl VersionGetter, experimentalUpgradesAllowed, rcUpgradesAllowed bool, etcdCluster etcdutil.Cluster, featureGates map[string]bool) ([]Upgrade, error) {
+func GetAvailableUpgrades(versionGetterImpl VersionGetter, experimentalUpgradesAllowed, rcUpgradesAllowed bool, etcdClient etcdutil.Client, featureGates map[string]bool) ([]Upgrade, error) {
 	fmt.Println("[upgrade] Fetching available versions to upgrade to")

 	// Collect the upgrades kubeadm can do in this list
@@ -107,7 +107,7 @@ func GetAvailableUpgrades(versionGetterImpl VersionGetter, experimentalUpgradesA
 	}

 	// Get current etcd version
-	etcdStatus, err := etcdCluster.GetStatus()
+	etcdStatus, err := etcdClient.GetStatus()
 	if err != nil {
 		return nil, err
 	}
@@ -19,6 +19,7 @@ package upgrade
 import (
 	"reflect"
 	"testing"
+	"time"

 	"github.com/coreos/etcd/clientv3"
 	versionutil "k8s.io/kubernetes/pkg/util/version"
@@ -63,7 +64,7 @@ func (f *fakeVersionGetter) KubeletVersions() (map[string]uint16, error) {

 type fakeEtcdCluster struct{ TLS bool }

-func (f fakeEtcdCluster) HasTLS() (bool, error) { return f.TLS, nil }
+func (f fakeEtcdCluster) HasTLS() bool { return f.TLS }

 func (f fakeEtcdCluster) GetStatus() (*clientv3.StatusResponse, error) {
 	client := &clientv3.StatusResponse{}
@@ -71,6 +72,10 @@ func (f fakeEtcdCluster) GetStatus() (*clientv3.StatusResponse, error) {
 	return client, nil
 }

+func (f fakeEtcdCluster) WaitForStatus(delay time.Duration, retries int, retryInterval time.Duration) (*clientv3.StatusResponse, error) {
+	return f.GetStatus()
+}
+
 func TestGetAvailableUpgrades(t *testing.T) {
 	featureGates := make(map[string]bool)
 	tests := []struct {
@@ -20,6 +20,7 @@ import (
 	"fmt"
 	"os"
 	"strings"
+	"time"

 	kubeadmapi "k8s.io/kubernetes/cmd/kubeadm/app/apis/kubeadm"
 	"k8s.io/kubernetes/cmd/kubeadm/app/constants"
@@ -224,13 +225,14 @@ func upgradeComponent(component string, waiter apiclient.Waiter, pathMgr StaticP
 }

 // performEtcdStaticPodUpgrade performs the etcd upgrade; it returns a bool indicating whether the error (if any) is fatal, plus the actual error.
-func performEtcdStaticPodUpgrade(waiter apiclient.Waiter, pathMgr StaticPodPathManager, cfg *kubeadmapi.MasterConfiguration, recoverManifests map[string]string, isTLSUpgrade bool, oldEtcdCluster, newEtcdCluster etcdutil.Cluster) (bool, error) {
+func performEtcdStaticPodUpgrade(waiter apiclient.Waiter, pathMgr StaticPodPathManager, cfg *kubeadmapi.MasterConfiguration, recoverManifests map[string]string, isTLSUpgrade bool, oldEtcdClient, newEtcdClient etcdutil.Client) (bool, error) {
 	// Add etcd static pod spec only if external etcd is not configured
 	if len(cfg.Etcd.Endpoints) != 0 {
 		return false, fmt.Errorf("external etcd detected, won't try to change any etcd state")
 	}

 	// Checking health state of etcd before proceeding with the upgrade
-	etcdStatus, err := oldEtcdCluster.GetStatus()
+	etcdStatus, err := oldEtcdClient.GetStatus()
 	if err != nil {
 		return true, fmt.Errorf("etcd cluster is not healthy: %v", err)
 	}
@@ -276,7 +278,7 @@ func performEtcdStaticPodUpgrade(waiter apiclient.Waiter, pathMgr StaticPodPathM
 	if err := upgradeComponent(constants.Etcd, waiter, pathMgr, cfg, beforeEtcdPodHash, recoverManifests, isTLSUpgrade); err != nil {
 		// Since the etcd upgrade component failed, the old manifest has been restored;
 		// now we need to check whether etcd came back up healthy with the old manifest
-		if _, err := oldEtcdCluster.GetStatus(); err != nil {
+		if _, err := oldEtcdClient.GetStatus(); err != nil {
 			// At this point we know that etcd cluster is dead and it is safe to copy backup datastore and to rollback old etcd manifest
 			if err := rollbackEtcdData(cfg, fmt.Errorf("etcd cluster is not healthy after upgrade: %v rolling back", err), pathMgr); err != nil {
 				// Even copying back datastore failed, no options for recovery left, bailing out
@@ -288,7 +290,7 @@ func performEtcdStaticPodUpgrade(waiter apiclient.Waiter, pathMgr StaticPodPathM
 				return true, fmt.Errorf("fatal error upgrading local etcd cluster: %v, the backup of etcd database is stored here:(%s)", err, backupEtcdDir)
 			}
 			// Since rollback of the old etcd manifest was successful, checking again the status of etcd cluster
-			if _, err := oldEtcdCluster.GetStatus(); err != nil {
+			if _, err := oldEtcdClient.GetStatus(); err != nil {
 				// Nothing else left to try to recover etcd cluster
 				return true, fmt.Errorf("fatal error upgrading local etcd cluster: %v, the backup of etcd database is stored here:(%s)", err, backupEtcdDir)
 			}
@@ -299,51 +301,103 @@ func performEtcdStaticPodUpgrade(waiter apiclient.Waiter, pathMgr StaticPodPathM
 		return true, fmt.Errorf("fatal error when trying to upgrade the etcd cluster: %v, rolled the state back to pre-upgrade state", err)
 	}

-	if isTLSUpgrade {
-		fmt.Printf("[upgrade/etcd] Skipping L7 health-check for %s (as well as data rollback on failure)\n", constants.Etcd)
-	} else {
-		// Checking health state of etcd after the upgrade
-		if _, err = newEtcdCluster.GetStatus(); err != nil {
-			// Despite the fact that upgradeComponent was successful, there is something wrong with etcd cluster
-			// First step is to restore back up of datastore
-			if err := rollbackEtcdData(cfg, fmt.Errorf("etcd cluster is not healthy after upgrade: %v rolling back", err), pathMgr); err != nil {
-				// Even copying back datastore failed, no options for recovery left, bailing out
-				return true, fmt.Errorf("fatal error upgrading local etcd cluster: %v, the backup of etcd database is stored here:(%s)", err, backupEtcdDir)
-			}
-			// Old datastore has been copied, rolling back old manifests
-			if err := rollbackOldManifests(recoverManifests, err, pathMgr, true); err != nil {
-				// Rolling back to old manifests failed, no options for recovery left, bailing out
-				return true, fmt.Errorf("fatal error upgrading local etcd cluster: %v, the backup of etcd database is stored here:(%s)", err, backupEtcdDir)
-			}
-			// Since rollback of the old etcd manifest was successful, checking again the status of etcd cluster
-			if _, err := oldEtcdCluster.GetStatus(); err != nil {
-				// Nothing else left to try to recover etcd cluster
-				return true, fmt.Errorf("fatal error upgrading local etcd cluster: %v, the backup of etcd database is stored here:(%s)", err, backupEtcdDir)
-			}
-
-			return true, fmt.Errorf("fatal error upgrading local etcd cluster: %v, rolled the state back to pre-upgrade state", err)
-		}
-	}
+	fmt.Println("[upgrade/etcd] waiting for etcd to become available")
+
+	// Initialize the new etcd client if it wasn't pre-initialized
+	if newEtcdClient == nil {
+		client, err := etcdutil.NewStaticPodClient(
+			[]string{"localhost:2379"},
+			constants.GetStaticPodDirectory(),
+			cfg.CertificatesDir,
+		)
+		if err != nil {
+			return true, fmt.Errorf("fatal error creating etcd client: %v", err)
+		}
+		newEtcdClient = client
+	}
+
+	// Checking health state of etcd after the upgrade
+	delay := 0 * time.Second
+	if isTLSUpgrade {
+		// If we are upgrading TLS, we need to wait for the old static pod to be removed.
+		// This is needed because we are currently unable to verify through the apiserver
+		// that the static pod has been updated across an etcd TLS upgrade.
+		delay = 30 * time.Second
+	}
+	// The initial delay is required to ensure that the old static etcd pod
+	// has stopped prior to polling for status.
+	retries := 10
+	retryInterval := 15 * time.Second
+	if _, err = newEtcdClient.WaitForStatus(delay, retries, retryInterval); err != nil {
+		// Despite the fact that upgradeComponent was successful, there is something wrong with etcd cluster
+		// First step is to restore back up of datastore
+		if err := rollbackEtcdData(cfg, fmt.Errorf("etcd cluster is not healthy after upgrade: %v rolling back", err), pathMgr); err != nil {
+			// Even copying back datastore failed, no options for recovery left, bailing out
+			return true, fmt.Errorf("fatal error upgrading local etcd cluster: %v, the backup of etcd database is stored here:(%s)", err, backupEtcdDir)
+		}
+		// Old datastore has been copied, rolling back old manifests
+		if err := rollbackOldManifests(recoverManifests, err, pathMgr, true); err != nil {
+			// Rolling back to old manifests failed, no options for recovery left, bailing out
+			return true, fmt.Errorf("fatal error upgrading local etcd cluster: %v, the backup of etcd database is stored here:(%s)", err, backupEtcdDir)
+		}
+		// Since rollback of the old etcd manifest was successful, checking again the status of etcd cluster
+		if _, err := oldEtcdClient.GetStatus(); err != nil {
+			// Nothing else left to try to recover etcd cluster
+			return true, fmt.Errorf("fatal error upgrading local etcd cluster: %v, the backup of etcd database is stored here:(%s)", err, backupEtcdDir)
+		}
+
+		return true, fmt.Errorf("fatal error upgrading local etcd cluster: %v, rolled the state back to pre-upgrade state", err)
+	}

 	return false, nil
 }

 // StaticPodControlPlane upgrades a static pod-hosted control plane
-func StaticPodControlPlane(waiter apiclient.Waiter, pathMgr StaticPodPathManager, cfg *kubeadmapi.MasterConfiguration, etcdUpgrade bool, oldEtcdCluster, newEtcdCluster etcdutil.Cluster) error {
+func StaticPodControlPlane(waiter apiclient.Waiter, pathMgr StaticPodPathManager, cfg *kubeadmapi.MasterConfiguration, etcdUpgrade bool, oldEtcdClient, newEtcdClient etcdutil.Client) error {
 	recoverManifests := map[string]string{}
 	var isTLSUpgrade bool
+	var isExternalEtcd bool

 	beforePodHashMap, err := waiter.WaitForStaticPodControlPlaneHashes(cfg.NodeName)
 	if err != nil {
 		return err
 	}

-	// etcd upgrade is done prior to other control plane components
-	if etcdUpgrade {
-		previousEtcdHasTLS, err := oldEtcdCluster.HasTLS()
-		if err != nil {
-			return fmt.Errorf("failed to determine if previous etcd was using TLS: %v", err)
-		}
+	if oldEtcdClient == nil {
+		if len(cfg.Etcd.Endpoints) > 0 {
+			// External etcd
+			isExternalEtcd = true
+			client, err := etcdutil.NewClient(
+				cfg.Etcd.Endpoints,
+				cfg.Etcd.CAFile,
+				cfg.Etcd.CertFile,
+				cfg.Etcd.KeyFile,
+			)
+			if err != nil {
+				return fmt.Errorf("failed to create etcd client for external etcd: %v", err)
+			}
+			oldEtcdClient = client
+			// Since etcd is managed externally, the new etcd client will be the same as the old client
+			if newEtcdClient == nil {
+				newEtcdClient = client
+			}
+		} else {
+			// etcd Static Pod
+			client, err := etcdutil.NewStaticPodClient(
+				[]string{"localhost:2379"},
+				constants.GetStaticPodDirectory(),
+				cfg.CertificatesDir,
+			)
+			if err != nil {
+				return fmt.Errorf("failed to create etcd client: %v", err)
+			}
+			oldEtcdClient = client
+		}
+	}
+
+	// etcd upgrade is done prior to other control plane components
+	if !isExternalEtcd && etcdUpgrade {
+		previousEtcdHasTLS := oldEtcdClient.HasTLS()

 		// set the TLS upgrade flag for all components
 		isTLSUpgrade = !previousEtcdHasTLS
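Taken together, the new parameters bound how long the post-upgrade L7 check can block before kubeadm falls back to rollback. A rough worked upper bound, assuming every attempt exhausts the 5-second dial timeout used by GetStatus (an assumption; attempts against a healthy cluster return almost immediately):

package main

import (
    "fmt"
    "time"
)

func main() {
    delay := 30 * time.Second         // initial delay, applied only on TLS upgrades
    retries := 10                     // status attempts
    retryInterval := 15 * time.Second // sleep between attempts
    dialTimeout := 5 * time.Second    // per-attempt timeout inside GetStatus

    // delay + 10 failed attempts + 9 sleeps between them
    worst := delay + time.Duration(retries)*dialTimeout + time.Duration(retries-1)*retryInterval
    fmt.Println(worst) // 3m35s
}

So a TLS upgrade can legitimately spend a few minutes probing etcd health before declaring failure and rolling back.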
@@ -352,7 +406,7 @@ func StaticPodControlPlane(waiter apiclient.Waiter, pathMgr StaticPodPathManager
 	}

 	// Perform etcd upgrade using the function common to all control plane components
-	fatal, err := performEtcdStaticPodUpgrade(waiter, pathMgr, cfg, recoverManifests, isTLSUpgrade, oldEtcdCluster, newEtcdCluster)
+	fatal, err := performEtcdStaticPodUpgrade(waiter, pathMgr, cfg, recoverManifests, isTLSUpgrade, oldEtcdClient, newEtcdClient)
 	if err != nil {
 		if fatal {
 			return err
@@ -27,6 +27,7 @@ import (
 	"time"

 	"github.com/coreos/etcd/clientv3"
+	"github.com/coreos/etcd/pkg/transport"
 	"k8s.io/apimachinery/pkg/runtime"
 	kubeadmapi "k8s.io/kubernetes/cmd/kubeadm/app/apis/kubeadm"
 	kubeadmapiext "k8s.io/kubernetes/cmd/kubeadm/app/apis/kubeadm/v1alpha1"
@@ -208,33 +209,50 @@ func (spm *fakeStaticPodPathManager) BackupEtcdDir() string {
 	return spm.backupEtcdDir
 }

-type fakeTLSEtcdCluster struct{ TLS bool }
+type fakeTLSEtcdClient struct{ TLS bool }

-func (cluster fakeTLSEtcdCluster) HasTLS() (bool, error) {
-	return cluster.TLS, nil
+func (c fakeTLSEtcdClient) HasTLS() bool {
+	return c.TLS
 }

-func (cluster fakeTLSEtcdCluster) GetStatus() (*clientv3.StatusResponse, error) {
+func (c fakeTLSEtcdClient) GetStatus() (*clientv3.StatusResponse, error) {
 	client := &clientv3.StatusResponse{}
 	client.Version = "3.1.12"
 	return client, nil
 }

-type fakePodManifestEtcdCluster struct{ ManifestDir, CertificatesDir string }
-
-func (cluster fakePodManifestEtcdCluster) HasTLS() (bool, error) {
-	return etcdutil.PodManifestsHaveTLS(cluster.ManifestDir)
+func (c fakeTLSEtcdClient) WaitForStatus(delay time.Duration, retries int, retryInterval time.Duration) (*clientv3.StatusResponse, error) {
+	return c.GetStatus()
 }

-func (cluster fakePodManifestEtcdCluster) GetStatus() (*clientv3.StatusResponse, error) {
+type fakePodManifestEtcdClient struct{ ManifestDir, CertificatesDir string }
+
+func (c fakePodManifestEtcdClient) HasTLS() bool {
+	hasTLS, _ := etcdutil.PodManifestsHaveTLS(c.ManifestDir)
+	return hasTLS
+}
+
+func (c fakePodManifestEtcdClient) GetStatus() (*clientv3.StatusResponse, error) {
 	// Make sure the certificates generated from the upgrade are readable from disk
-	etcdutil.NewTLSConfig(cluster.CertificatesDir)
+	tlsInfo := transport.TLSInfo{
+		CertFile:      filepath.Join(c.CertificatesDir, constants.EtcdHealthcheckClientCertName),
+		KeyFile:       filepath.Join(c.CertificatesDir, constants.EtcdHealthcheckClientKeyName),
+		TrustedCAFile: filepath.Join(c.CertificatesDir, constants.EtcdCACertName),
+	}
+	_, err := tlsInfo.ClientConfig()
+	if err != nil {
+		return nil, err
+	}

 	client := &clientv3.StatusResponse{}
 	client.Version = "3.1.12"
 	return client, nil
 }

+func (c fakePodManifestEtcdClient) WaitForStatus(delay time.Duration, retries int, retryInterval time.Duration) (*clientv3.StatusResponse, error) {
+	return c.GetStatus()
+}
+
 func TestStaticPodControlPlane(t *testing.T) {
 	tests := []struct {
 		description string
@@ -420,10 +438,10 @@ func TestStaticPodControlPlane(t *testing.T) {
 				pathMgr,
 				newcfg,
 				true,
-				fakeTLSEtcdCluster{
+				fakeTLSEtcdClient{
 					TLS: false,
 				},
-				fakePodManifestEtcdCluster{
+				fakePodManifestEtcdClient{
 					ManifestDir:     pathMgr.RealManifestDir(),
 					CertificatesDir: newcfg.CertificatesDir,
 				},
@@ -17,10 +17,7 @@ go_test(
     name = "go_default_test",
     srcs = ["etcd_test.go"],
     embed = [":go_default_library"],
-    deps = [
-        "//cmd/kubeadm/app/constants:go_default_library",
-        "//cmd/kubeadm/test:go_default_library",
-    ],
+    deps = ["//cmd/kubeadm/test:go_default_library"],
 )

 filegroup(
@@ -30,25 +30,22 @@ import (
 	"k8s.io/kubernetes/cmd/kubeadm/app/util/staticpod"
 )

-// Cluster is an interface to get etcd cluster related information
-type Cluster interface {
-	HasTLS() (bool, error)
+// Client is an interface to get etcd cluster related information
+type Client interface {
 	GetStatus() (*clientv3.StatusResponse, error)
+	WaitForStatus(delay time.Duration, retries int, retryInterval time.Duration) (*clientv3.StatusResponse, error)
+	HasTLS() bool
 }

-// StaticPodCluster represents an instance of a static pod etcd cluster.
-// CertificatesDir should contain the etcd CA and healthcheck client TLS identity.
-// ManifestDir should contain the etcd static pod manifest.
-type StaticPodCluster struct {
-	Endpoints       []string
-	CertificatesDir string
-	ManifestDir     string
+// GenericClient is a common etcd client for supported etcd servers
+type GenericClient struct {
+	Endpoints []string
+	TLSConfig *tls.Config
 }

-// HasTLS returns a boolean representing whether the static pod etcd cluster implements TLS.
-// It may return an error for file I/O issues.
-func (cluster StaticPodCluster) HasTLS() (bool, error) {
-	return PodManifestsHaveTLS(cluster.ManifestDir)
+// HasTLS returns true if etcd is configured for TLS
+func (c GenericClient) HasTLS() bool {
+	return c.TLSConfig != nil
 }

 // PodManifestsHaveTLS reads the etcd staticpod manifest from disk and returns false if the TLS flags
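With GenericClient, the TLS decision moves from a per-call manifest read to construction time: HasTLS simply reports whether a *tls.Config was attached when the client was built. A hypothetical sketch (the certificate paths are illustrative only, and NewClient fails if the files cannot be read):

func demoHasTLS() error {
    // Plaintext client: no TLS material supplied, so HasTLS reports false.
    plain, err := etcdutil.NewClient([]string{"localhost:2379"}, "", "", "")
    if err != nil {
        return err
    }
    fmt.Println(plain.HasTLS()) // false

    // TLS client: passing CA, cert, and key paths attaches a *tls.Config.
    secure, err := etcdutil.NewClient(
        []string{"localhost:2379"},
        "/etc/kubernetes/pki/etcd/ca.crt",                 // illustrative paths only;
        "/etc/kubernetes/pki/etcd/healthcheck-client.crt", // the files must exist and
        "/etc/kubernetes/pki/etcd/healthcheck-client.key", // parse for NewClient to succeed
    )
    if err != nil {
        return err
    }
    fmt.Println(secure.HasTLS()) // true
    return nil
}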
@@ -86,55 +83,87 @@ FlagLoop:
 	return true, nil
 }

-// GetStatus invokes the proper protocol check based off of whether the cluster HasTLS() to get the cluster's status
-func (cluster StaticPodCluster) GetStatus() (*clientv3.StatusResponse, error) {
-	hasTLS, err := cluster.HasTLS()
-	if err != nil {
-		return nil, fmt.Errorf("failed to determine if current etcd static pod is using TLS: %v", err)
-	}
-
-	var tlsConfig *tls.Config
-	if hasTLS {
-		tlsConfig, err = NewTLSConfig(cluster.CertificatesDir)
-		if err != nil {
-			return nil, fmt.Errorf("failed to create a TLS Config using the cluster.CertificatesDir: %v", err)
-		}
-	}
-
-	return GetClusterStatus(cluster.Endpoints, tlsConfig)
-}
-
-// NewTLSConfig generates a tlsConfig using credentials from the default sub-paths of the certificates directory
-func NewTLSConfig(certificatesDir string) (*tls.Config, error) {
-	tlsInfo := transport.TLSInfo{
-		CertFile:      filepath.Join(certificatesDir, constants.EtcdHealthcheckClientCertName),
-		KeyFile:       filepath.Join(certificatesDir, constants.EtcdHealthcheckClientKeyName),
-		TrustedCAFile: filepath.Join(certificatesDir, constants.EtcdCACertName),
-	}
-	tlsConfig, err := tlsInfo.ClientConfig()
-	if err != nil {
-		return nil, err
-	}
-
-	return tlsConfig, nil
-}
-
-// GetClusterStatus returns nil for status Up or error for status Down
-func GetClusterStatus(endpoints []string, tlsConfig *tls.Config) (*clientv3.StatusResponse, error) {
+// GetStatus gets server status
+func (c GenericClient) GetStatus() (*clientv3.StatusResponse, error) {
+	const dialTimeout = 5 * time.Second
 	cli, err := clientv3.New(clientv3.Config{
-		Endpoints:   endpoints,
-		DialTimeout: 5 * time.Second,
-		TLS:         tlsConfig,
+		Endpoints:   c.Endpoints,
+		DialTimeout: dialTimeout,
+		TLS:         c.TLSConfig,
 	})
 	if err != nil {
 		return nil, err
 	}
 	defer cli.Close()

-	resp, err := cli.Status(context.Background(), endpoints[0])
+	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
+	resp, err := cli.Status(ctx, c.Endpoints[0])
+	cancel()
 	if err != nil {
 		return nil, err
 	}

 	return resp, nil
 }

+// WaitForStatus returns a StatusResponse after an initial delay and retry attempts
+func (c GenericClient) WaitForStatus(delay time.Duration, retries int, retryInterval time.Duration) (*clientv3.StatusResponse, error) {
+	fmt.Printf("[util/etcd] Waiting %v for initial delay\n", delay)
+	time.Sleep(delay)
+	for i := 0; i < retries; i++ {
+		if i > 0 {
+			fmt.Printf("[util/etcd] Waiting %v until next retry\n", retryInterval)
+			time.Sleep(retryInterval)
+		}
+		fmt.Printf("[util/etcd] Attempting to get etcd status %d/%d\n", i+1, retries)
+		resp, err := c.GetStatus()
+		if err != nil {
+			switch err {
+			case context.DeadlineExceeded:
+				fmt.Println("[util/etcd] Attempt timed out")
+			default:
+				fmt.Printf("[util/etcd] Attempt failed with error: %v\n", err)
+			}
+			continue
+		}
+		return resp, nil
+	}
+	return nil, fmt.Errorf("timeout waiting for etcd cluster status")
+}
+
+// NewClient creates a new GenericClient for the given endpoints, optionally secured with TLS
+func NewClient(endpoints []string, caFile string, certFile string, keyFile string) (*GenericClient, error) {
+	client := GenericClient{Endpoints: endpoints}
+
+	if caFile != "" || certFile != "" || keyFile != "" {
+		tlsInfo := transport.TLSInfo{
+			CertFile:      certFile,
+			KeyFile:       keyFile,
+			TrustedCAFile: caFile,
+		}
+		tlsConfig, err := tlsInfo.ClientConfig()
+		if err != nil {
+			return nil, err
+		}
+		client.TLSConfig = tlsConfig
+	}
+
+	return &client, nil
+}
+
+// NewStaticPodClient creates a GenericClient from the given endpoints, manifestDir, and certificatesDir
+func NewStaticPodClient(endpoints []string, manifestDir string, certificatesDir string) (*GenericClient, error) {
+	hasTLS, err := PodManifestsHaveTLS(manifestDir)
+	if err != nil {
+		return nil, fmt.Errorf("could not read manifests from: %s, error: %v", manifestDir, err)
+	}
+	if hasTLS {
+		return NewClient(
+			endpoints,
+			filepath.Join(certificatesDir, constants.EtcdCACertName),
+			filepath.Join(certificatesDir, constants.EtcdHealthcheckClientCertName),
+			filepath.Join(certificatesDir, constants.EtcdHealthcheckClientKeyName),
+		)
+	}
+	return NewClient(endpoints, "", "", "")
+}
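WaitForStatus is the L7 probe the upgrade relies on: sleep for the initial delay, then attempt GetStatus up to retries times with retryInterval between attempts, treating context.DeadlineExceeded as an ordinary timeout. A usage sketch under assumed defaults (the directory paths are kubeadm's conventional locations, shown here for illustration only):

func checkEtcdAfterUpgrade() error {
    client, err := etcdutil.NewStaticPodClient(
        []string{"localhost:2379"},
        "/etc/kubernetes/manifests", // assumed manifest directory
        "/etc/kubernetes/pki",       // assumed certificates directory
    )
    if err != nil {
        return fmt.Errorf("could not build etcd client: %v", err)
    }
    // No initial delay; three attempts spaced five seconds apart.
    status, err := client.WaitForStatus(0, 3, 5*time.Second)
    if err != nil {
        return fmt.Errorf("etcd did not report healthy: %v", err)
    }
    fmt.Printf("etcd is up, server version %s\n", status.Version)
    return nil
}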
@@ -179,9 +179,7 @@ func TestPodManifestHasTLS(t *testing.T) {
 		}
 	}

-	tmpEtcdCluster := StaticPodCluster{ManifestDir: tmpdir}
-
-	hasTLS, actualErr := tmpEtcdCluster.HasTLS()
+	hasTLS, actualErr := PodManifestsHaveTLS(tmpdir)
 	if (actualErr != nil) != rt.expectErr {
 		t.Errorf(
 			"PodManifestHasTLS failed\n%s\n\texpected error: %t\n\tgot: %t\n\tactual error: %v",