diff --git a/cmd/kubeadm/app/cmd/upgrade/apply.go b/cmd/kubeadm/app/cmd/upgrade/apply.go index 4640683e252..663b055cc90 100644 --- a/cmd/kubeadm/app/cmd/upgrade/apply.go +++ b/cmd/kubeadm/app/cmd/upgrade/apply.go @@ -282,23 +282,9 @@ func PerformStaticPodUpgrade(client clientset.Interface, waiter apiclient.Waiter return err } - // These are the same because kubeadm currently does not support reconciling a new config against an older one. - // For instance, currently, changing CertificatesDir or EtcdDataDir breaks the upgrade, because oldcfg is not fetchable. - // There would need to be additional upgrade code to handle copying the certs/data over to the new filepaths. - // It's still useful to have these parameterized as separate clusters though, because it allows us to mock these - // interfaces for tests. - oldEtcdCluster := etcdutil.StaticPodCluster{ - Endpoints: []string{"localhost:2379"}, - ManifestDir: constants.GetStaticPodDirectory(), - CertificatesDir: internalcfg.CertificatesDir, - } - newEtcdCluster := etcdutil.StaticPodCluster{ - Endpoints: []string{"localhost:2379"}, - ManifestDir: constants.GetStaticPodDirectory(), - CertificatesDir: internalcfg.CertificatesDir, - } - - return upgrade.StaticPodControlPlane(waiter, pathManager, internalcfg, etcdUpgrade, oldEtcdCluster, newEtcdCluster) + // These are uninitialized because passing in the clients allows for mocking the client during testing + var oldEtcdClient, newEtcdClient etcdutil.Client + return upgrade.StaticPodControlPlane(waiter, pathManager, internalcfg, etcdUpgrade, oldEtcdClient, newEtcdClient) } // DryRunStaticPodUpgrade fakes an upgrade of the control plane diff --git a/cmd/kubeadm/app/cmd/upgrade/plan.go b/cmd/kubeadm/app/cmd/upgrade/plan.go index 53ec27ff5fe..da7294874c5 100644 --- a/cmd/kubeadm/app/cmd/upgrade/plan.go +++ b/cmd/kubeadm/app/cmd/upgrade/plan.go @@ -66,15 +66,18 @@ func RunPlan(parentFlags *cmdUpgradeFlags) error { } // Define Local Etcd cluster to be able to retrieve information 
- etcdCluster := etcdutil.StaticPodCluster{ - Endpoints: []string{"localhost:2379"}, - ManifestDir: constants.GetStaticPodDirectory(), - CertificatesDir: upgradeVars.cfg.CertificatesDir, + etcdClient, err := etcdutil.NewStaticPodClient( + []string{"localhost:2379"}, + constants.GetStaticPodDirectory(), + upgradeVars.cfg.CertificatesDir, + ) + if err != nil { + return err } // Compute which upgrade possibilities there are glog.V(1).Infof("[upgrade/plan] computing upgrade possibilities") - availUpgrades, err := upgrade.GetAvailableUpgrades(upgradeVars.versionGetter, parentFlags.allowExperimentalUpgrades, parentFlags.allowRCUpgrades, etcdCluster, upgradeVars.cfg.FeatureGates) + availUpgrades, err := upgrade.GetAvailableUpgrades(upgradeVars.versionGetter, parentFlags.allowExperimentalUpgrades, parentFlags.allowRCUpgrades, etcdClient, upgradeVars.cfg.FeatureGates) if err != nil { return fmt.Errorf("[upgrade/versions] FATAL: %v", err) } diff --git a/cmd/kubeadm/app/phases/upgrade/BUILD b/cmd/kubeadm/app/phases/upgrade/BUILD index bd0dc18ec26..13ecb339de8 100644 --- a/cmd/kubeadm/app/phases/upgrade/BUILD +++ b/cmd/kubeadm/app/phases/upgrade/BUILD @@ -91,6 +91,7 @@ go_test( "//pkg/api/legacyscheme:go_default_library", "//pkg/util/version:go_default_library", "//vendor/github.com/coreos/etcd/clientv3:go_default_library", + "//vendor/github.com/coreos/etcd/pkg/transport:go_default_library", "//vendor/k8s.io/apimachinery/pkg/runtime:go_default_library", ], ) diff --git a/cmd/kubeadm/app/phases/upgrade/compute.go b/cmd/kubeadm/app/phases/upgrade/compute.go index 68fba96bbf8..e36bb0acc17 100644 --- a/cmd/kubeadm/app/phases/upgrade/compute.go +++ b/cmd/kubeadm/app/phases/upgrade/compute.go @@ -74,7 +74,7 @@ type ClusterState struct { // GetAvailableUpgrades fetches all versions from the specified VersionGetter and computes which // kinds of upgrades can be performed -func GetAvailableUpgrades(versionGetterImpl VersionGetter, experimentalUpgradesAllowed, rcUpgradesAllowed bool, 
etcdCluster etcdutil.Cluster, featureGates map[string]bool) ([]Upgrade, error) { +func GetAvailableUpgrades(versionGetterImpl VersionGetter, experimentalUpgradesAllowed, rcUpgradesAllowed bool, etcdClient etcdutil.Client, featureGates map[string]bool) ([]Upgrade, error) { fmt.Println("[upgrade] Fetching available versions to upgrade to") // Collect the upgrades kubeadm can do in this list @@ -107,7 +107,7 @@ func GetAvailableUpgrades(versionGetterImpl VersionGetter, experimentalUpgradesA } // Get current etcd version - etcdStatus, err := etcdCluster.GetStatus() + etcdStatus, err := etcdClient.GetStatus() if err != nil { return nil, err } diff --git a/cmd/kubeadm/app/phases/upgrade/compute_test.go b/cmd/kubeadm/app/phases/upgrade/compute_test.go index 97f219d053e..5b83b254f39 100644 --- a/cmd/kubeadm/app/phases/upgrade/compute_test.go +++ b/cmd/kubeadm/app/phases/upgrade/compute_test.go @@ -19,6 +19,7 @@ package upgrade import ( "reflect" "testing" + "time" "github.com/coreos/etcd/clientv3" versionutil "k8s.io/kubernetes/pkg/util/version" @@ -63,7 +64,7 @@ func (f *fakeVersionGetter) KubeletVersions() (map[string]uint16, error) { type fakeEtcdCluster struct{ TLS bool } -func (f fakeEtcdCluster) HasTLS() (bool, error) { return f.TLS, nil } +func (f fakeEtcdCluster) HasTLS() bool { return f.TLS } func (f fakeEtcdCluster) GetStatus() (*clientv3.StatusResponse, error) { client := &clientv3.StatusResponse{} @@ -71,6 +72,10 @@ func (f fakeEtcdCluster) GetStatus() (*clientv3.StatusResponse, error) { return client, nil } +func (f fakeEtcdCluster) WaitForStatus(delay time.Duration, retries int, retryInterval time.Duration) (*clientv3.StatusResponse, error) { + return f.GetStatus() +} + func TestGetAvailableUpgrades(t *testing.T) { featureGates := make(map[string]bool) tests := []struct { diff --git a/cmd/kubeadm/app/phases/upgrade/staticpods.go b/cmd/kubeadm/app/phases/upgrade/staticpods.go index d3b8b76b448..66bc88298ce 100644 --- 
a/cmd/kubeadm/app/phases/upgrade/staticpods.go +++ b/cmd/kubeadm/app/phases/upgrade/staticpods.go @@ -20,6 +20,7 @@ import ( "fmt" "os" "strings" + "time" kubeadmapi "k8s.io/kubernetes/cmd/kubeadm/app/apis/kubeadm" "k8s.io/kubernetes/cmd/kubeadm/app/constants" @@ -224,13 +225,14 @@ func upgradeComponent(component string, waiter apiclient.Waiter, pathMgr StaticP } // performEtcdStaticPodUpgrade performs upgrade of etcd, it returns bool which indicates fatal error or not and the actual error. -func performEtcdStaticPodUpgrade(waiter apiclient.Waiter, pathMgr StaticPodPathManager, cfg *kubeadmapi.MasterConfiguration, recoverManifests map[string]string, isTLSUpgrade bool, oldEtcdCluster, newEtcdCluster etcdutil.Cluster) (bool, error) { +func performEtcdStaticPodUpgrade(waiter apiclient.Waiter, pathMgr StaticPodPathManager, cfg *kubeadmapi.MasterConfiguration, recoverManifests map[string]string, isTLSUpgrade bool, oldEtcdClient, newEtcdClient etcdutil.Client) (bool, error) { // Add etcd static pod spec only if external etcd is not configured if len(cfg.Etcd.Endpoints) != 0 { return false, fmt.Errorf("external etcd detected, won't try to change any etcd state") } + // Checking health state of etcd before proceeding with the upgrade - etcdStatus, err := oldEtcdCluster.GetStatus() + etcdStatus, err := oldEtcdClient.GetStatus() if err != nil { return true, fmt.Errorf("etcd cluster is not healthy: %v", err) } @@ -276,7 +278,7 @@ func performEtcdStaticPodUpgrade(waiter apiclient.Waiter, pathMgr StaticPodPathM if err := upgradeComponent(constants.Etcd, waiter, pathMgr, cfg, beforeEtcdPodHash, recoverManifests, isTLSUpgrade); err != nil { // Since etcd upgrade component failed, the old manifest has been restored // now we need to check the health of etcd cluster if it came back up with old manifest - if _, err := oldEtcdCluster.GetStatus(); err != nil { + if _, err := oldEtcdClient.GetStatus(); err != nil { // At this point we know that etcd cluster is dead and it is safe to 
copy backup datastore and to rollback old etcd manifest if err := rollbackEtcdData(cfg, fmt.Errorf("etcd cluster is not healthy after upgrade: %v rolling back", err), pathMgr); err != nil { // Even copying back datastore failed, no options for recovery left, bailing out @@ -288,7 +290,7 @@ func performEtcdStaticPodUpgrade(waiter apiclient.Waiter, pathMgr StaticPodPathM return true, fmt.Errorf("fatal error upgrading local etcd cluster: %v, the backup of etcd database is stored here:(%s)", err, backupEtcdDir) } // Since rollback of the old etcd manifest was successful, checking again the status of etcd cluster - if _, err := oldEtcdCluster.GetStatus(); err != nil { + if _, err := oldEtcdClient.GetStatus(); err != nil { // Nothing else left to try to recover etcd cluster return true, fmt.Errorf("fatal error upgrading local etcd cluster: %v, the backup of etcd database is stored here:(%s)", err, backupEtcdDir) } @@ -299,51 +301,103 @@ func performEtcdStaticPodUpgrade(waiter apiclient.Waiter, pathMgr StaticPodPathM return true, fmt.Errorf("fatal error when trying to upgrade the etcd cluster: %v, rolled the state back to pre-upgrade state", err) } - if isTLSUpgrade { - fmt.Printf("[upgrade/etcd] Skipping L7 health-check for %s (as well as data rollback on failure)\n", constants.Etcd) - } else { - // Checking health state of etcd after the upgrade - if _, err = newEtcdCluster.GetStatus(); err != nil { - // Despite the fact that upgradeComponent was successful, there is something wrong with etcd cluster - // First step is to restore back up of datastore - if err := rollbackEtcdData(cfg, fmt.Errorf("etcd cluster is not healthy after upgrade: %v rolling back", err), pathMgr); err != nil { - // Even copying back datastore failed, no options for recovery left, bailing out - return true, fmt.Errorf("fatal error upgrading local etcd cluster: %v, the backup of etcd database is stored here:(%s)", err, backupEtcdDir) - } - // Old datastore has been copied, rolling back old 
manifests - if err := rollbackOldManifests(recoverManifests, err, pathMgr, true); err != nil { - // Rolling back to old manifests failed, no options for recovery left, bailing out - return true, fmt.Errorf("fatal error upgrading local etcd cluster: %v, the backup of etcd database is stored here:(%s)", err, backupEtcdDir) - } - // Since rollback of the old etcd manifest was successful, checking again the status of etcd cluster - if _, err := oldEtcdCluster.GetStatus(); err != nil { - // Nothing else left to try to recover etcd cluster - return true, fmt.Errorf("fatal error upgrading local etcd cluster: %v, the backup of etcd database is stored here:(%s)", err, backupEtcdDir) - } + fmt.Println("[upgrade/etcd] waiting for etcd to become available") - return true, fmt.Errorf("fatal error upgrading local etcd cluster: %v, rolled the state back to pre-upgrade state", err) + // Initialize the new etcd client if it wasn't pre-initialized + if newEtcdClient == nil { + client, err := etcdutil.NewStaticPodClient( + []string{"localhost:2379"}, + constants.GetStaticPodDirectory(), + cfg.CertificatesDir, + ) + if err != nil { + return true, fmt.Errorf("fatal error creating etcd client: %v", err) } + newEtcdClient = client + } + + // Checking health state of etcd after the upgrade + delay := 0 * time.Second + if isTLSUpgrade { + // If we are upgrading TLS we need to wait for old static pod to be removed. + // This is needed because we are not able to currently verify that the static pod + // has been updated through the apiserver across an etcd TLS upgrade. + delay = 30 * time.Second + } + // The intial delay is required to ensure that the old static etcd pod + // has stopped prior to polling for status. 
+ retries := 10 + retryInterval := 15 * time.Second + if _, err = newEtcdClient.WaitForStatus(delay, retries, retryInterval); err != nil { + // Despite the fact that upgradeComponent was successful, there is something wrong with etcd cluster + // First step is to restore back up of datastore + if err := rollbackEtcdData(cfg, fmt.Errorf("etcd cluster is not healthy after upgrade: %v rolling back", err), pathMgr); err != nil { + // Even copying back datastore failed, no options for recovery left, bailing out + return true, fmt.Errorf("fatal error upgrading local etcd cluster: %v, the backup of etcd database is stored here:(%s)", err, backupEtcdDir) + } + // Old datastore has been copied, rolling back old manifests + if err := rollbackOldManifests(recoverManifests, err, pathMgr, true); err != nil { + // Rolling back to old manifests failed, no options for recovery left, bailing out + return true, fmt.Errorf("fatal error upgrading local etcd cluster: %v, the backup of etcd database is stored here:(%s)", err, backupEtcdDir) + } + // Since rollback of the old etcd manifest was successful, checking again the status of etcd cluster + if _, err := oldEtcdClient.GetStatus(); err != nil { + // Nothing else left to try to recover etcd cluster + return true, fmt.Errorf("fatal error upgrading local etcd cluster: %v, the backup of etcd database is stored here:(%s)", err, backupEtcdDir) + } + + return true, fmt.Errorf("fatal error upgrading local etcd cluster: %v, rolled the state back to pre-upgrade state", err) } return false, nil } // StaticPodControlPlane upgrades a static pod-hosted control plane -func StaticPodControlPlane(waiter apiclient.Waiter, pathMgr StaticPodPathManager, cfg *kubeadmapi.MasterConfiguration, etcdUpgrade bool, oldEtcdCluster, newEtcdCluster etcdutil.Cluster) error { +func StaticPodControlPlane(waiter apiclient.Waiter, pathMgr StaticPodPathManager, cfg *kubeadmapi.MasterConfiguration, etcdUpgrade bool, oldEtcdClient, newEtcdClient etcdutil.Client) error { 
recoverManifests := map[string]string{} var isTLSUpgrade bool + var isExternalEtcd bool beforePodHashMap, err := waiter.WaitForStaticPodControlPlaneHashes(cfg.NodeName) if err != nil { return err } - // etcd upgrade is done prior to other control plane components - if etcdUpgrade { - previousEtcdHasTLS, err := oldEtcdCluster.HasTLS() - if err != nil { - return fmt.Errorf("failed to determine if previous etcd was using TLS: %v", err) + if oldEtcdClient == nil { + if len(cfg.Etcd.Endpoints) > 0 { + // External etcd + isExternalEtcd = true + client, err := etcdutil.NewClient( + cfg.Etcd.Endpoints, + cfg.Etcd.CAFile, + cfg.Etcd.CertFile, + cfg.Etcd.KeyFile, + ) + if err != nil { + return fmt.Errorf("failed to create etcd client for external etcd: %v", err) + } + oldEtcdClient = client + // Since etcd is managed externally, the new etcd client will be the same as the old client + if newEtcdClient == nil { + newEtcdClient = client + } + } else { + // etcd Static Pod + client, err := etcdutil.NewStaticPodClient( + []string{"localhost:2379"}, + constants.GetStaticPodDirectory(), + cfg.CertificatesDir, + ) + if err != nil { + return fmt.Errorf("failed to create etcd client: %v", err) + } + oldEtcdClient = client } + } + + // etcd upgrade is done prior to other control plane components + if !isExternalEtcd && etcdUpgrade { + previousEtcdHasTLS := oldEtcdClient.HasTLS() // set the TLS upgrade flag for all components isTLSUpgrade = !previousEtcdHasTLS @@ -352,7 +406,7 @@ func StaticPodControlPlane(waiter apiclient.Waiter, pathMgr StaticPodPathManager } // Perform etcd upgrade using common to all control plane components function - fatal, err := performEtcdStaticPodUpgrade(waiter, pathMgr, cfg, recoverManifests, isTLSUpgrade, oldEtcdCluster, newEtcdCluster) + fatal, err := performEtcdStaticPodUpgrade(waiter, pathMgr, cfg, recoverManifests, isTLSUpgrade, oldEtcdClient, newEtcdClient) if err != nil { if fatal { return err diff --git 
a/cmd/kubeadm/app/phases/upgrade/staticpods_test.go b/cmd/kubeadm/app/phases/upgrade/staticpods_test.go index 28c2973950f..8bc1f4423b5 100644 --- a/cmd/kubeadm/app/phases/upgrade/staticpods_test.go +++ b/cmd/kubeadm/app/phases/upgrade/staticpods_test.go @@ -27,6 +27,7 @@ import ( "time" "github.com/coreos/etcd/clientv3" + "github.com/coreos/etcd/pkg/transport" "k8s.io/apimachinery/pkg/runtime" kubeadmapi "k8s.io/kubernetes/cmd/kubeadm/app/apis/kubeadm" kubeadmapiext "k8s.io/kubernetes/cmd/kubeadm/app/apis/kubeadm/v1alpha1" @@ -208,33 +209,50 @@ func (spm *fakeStaticPodPathManager) BackupEtcdDir() string { return spm.backupEtcdDir } -type fakeTLSEtcdCluster struct{ TLS bool } +type fakeTLSEtcdClient struct{ TLS bool } -func (cluster fakeTLSEtcdCluster) HasTLS() (bool, error) { - return cluster.TLS, nil +func (c fakeTLSEtcdClient) HasTLS() bool { + return c.TLS } -func (cluster fakeTLSEtcdCluster) GetStatus() (*clientv3.StatusResponse, error) { +func (c fakeTLSEtcdClient) GetStatus() (*clientv3.StatusResponse, error) { client := &clientv3.StatusResponse{} client.Version = "3.1.12" return client, nil } -type fakePodManifestEtcdCluster struct{ ManifestDir, CertificatesDir string } - -func (cluster fakePodManifestEtcdCluster) HasTLS() (bool, error) { - return etcdutil.PodManifestsHaveTLS(cluster.ManifestDir) +func (c fakeTLSEtcdClient) WaitForStatus(delay time.Duration, retries int, retryInterval time.Duration) (*clientv3.StatusResponse, error) { + return c.GetStatus() } -func (cluster fakePodManifestEtcdCluster) GetStatus() (*clientv3.StatusResponse, error) { +type fakePodManifestEtcdClient struct{ ManifestDir, CertificatesDir string } + +func (c fakePodManifestEtcdClient) HasTLS() bool { + hasTLS, _ := etcdutil.PodManifestsHaveTLS(c.ManifestDir) + return hasTLS +} + +func (c fakePodManifestEtcdClient) GetStatus() (*clientv3.StatusResponse, error) { // Make sure the certificates generated from the upgrade are readable from disk - 
etcdutil.NewTLSConfig(cluster.CertificatesDir) + tlsInfo := transport.TLSInfo{ + CertFile: filepath.Join(c.CertificatesDir, constants.EtcdHealthcheckClientCertName), + KeyFile: filepath.Join(c.CertificatesDir, constants.EtcdHealthcheckClientKeyName), + TrustedCAFile: filepath.Join(c.CertificatesDir, constants.EtcdCACertName), + } + _, err := tlsInfo.ClientConfig() + if err != nil { + return nil, err + } client := &clientv3.StatusResponse{} client.Version = "3.1.12" return client, nil } +func (c fakePodManifestEtcdClient) WaitForStatus(delay time.Duration, retries int, retryInterval time.Duration) (*clientv3.StatusResponse, error) { + return c.GetStatus() +} + func TestStaticPodControlPlane(t *testing.T) { tests := []struct { description string @@ -420,10 +438,10 @@ func TestStaticPodControlPlane(t *testing.T) { pathMgr, newcfg, true, - fakeTLSEtcdCluster{ + fakeTLSEtcdClient{ TLS: false, }, - fakePodManifestEtcdCluster{ + fakePodManifestEtcdClient{ ManifestDir: pathMgr.RealManifestDir(), CertificatesDir: newcfg.CertificatesDir, }, diff --git a/cmd/kubeadm/app/util/etcd/BUILD b/cmd/kubeadm/app/util/etcd/BUILD index d01c2d5a31e..8a439c27352 100644 --- a/cmd/kubeadm/app/util/etcd/BUILD +++ b/cmd/kubeadm/app/util/etcd/BUILD @@ -17,10 +17,7 @@ go_test( name = "go_default_test", srcs = ["etcd_test.go"], embed = [":go_default_library"], - deps = [ - "//cmd/kubeadm/app/constants:go_default_library", - "//cmd/kubeadm/test:go_default_library", - ], + deps = ["//cmd/kubeadm/test:go_default_library"], ) filegroup( diff --git a/cmd/kubeadm/app/util/etcd/etcd.go b/cmd/kubeadm/app/util/etcd/etcd.go index 41cc8cd0cf8..9eec25c4595 100644 --- a/cmd/kubeadm/app/util/etcd/etcd.go +++ b/cmd/kubeadm/app/util/etcd/etcd.go @@ -30,25 +30,22 @@ import ( "k8s.io/kubernetes/cmd/kubeadm/app/util/staticpod" ) -// Cluster is an interface to get etcd cluster related information 
+type Client interface { GetStatus() (*clientv3.StatusResponse, error) + WaitForStatus(delay time.Duration, retries int, retryInterval time.Duration) (*clientv3.StatusResponse, error) + HasTLS() bool } -// StaticPodCluster represents an instance of a static pod etcd cluster. -// CertificatesDir should contain the etcd CA and healthcheck client TLS identity. -// ManifestDir should contain the etcd static pod manifest. -type StaticPodCluster struct { - Endpoints []string - CertificatesDir string - ManifestDir string +// GenericClient is a common etcd client for supported etcd servers +type GenericClient struct { + Endpoints []string + TLSConfig *tls.Config } -// HasTLS returns a boolean representing whether the static pod etcd cluster implements TLS. -// It may return an error for file I/O issues. -func (cluster StaticPodCluster) HasTLS() (bool, error) { - return PodManifestsHaveTLS(cluster.ManifestDir) +// HasTLS returns true if etcd is configured for TLS +func (c GenericClient) HasTLS() bool { + return c.TLSConfig != nil } // PodManifestsHaveTLS reads the etcd staticpod manifest from disk and returns false if the TLS flags @@ -86,55 +83,87 @@ FlagLoop: return true, nil } -// GetStatus invokes the proper protocol check based off of whether the cluster HasTLS() to get the cluster's status -func (cluster StaticPodCluster) GetStatus() (*clientv3.StatusResponse, error) { - hasTLS, err := cluster.HasTLS() - if err != nil { - return nil, fmt.Errorf("failed to determine if current etcd static pod is using TLS: %v", err) - } - - var tlsConfig *tls.Config - if hasTLS { - tlsConfig, err = NewTLSConfig(cluster.CertificatesDir) - if err != nil { - return nil, fmt.Errorf("failed to create a TLS Config using the cluster.CertificatesDir: %v", err) - } - } - - return GetClusterStatus(cluster.Endpoints, tlsConfig) -} - -// NewTLSConfig generates a tlsConfig using credentials from the default sub-paths of the certificates directory -func NewTLSConfig(certificatesDir string) 
(*tls.Config, error) { - tlsInfo := transport.TLSInfo{ - CertFile: filepath.Join(certificatesDir, constants.EtcdHealthcheckClientCertName), - KeyFile: filepath.Join(certificatesDir, constants.EtcdHealthcheckClientKeyName), - TrustedCAFile: filepath.Join(certificatesDir, constants.EtcdCACertName), - } - tlsConfig, err := tlsInfo.ClientConfig() - if err != nil { - return nil, err - } - - return tlsConfig, nil -} - -// GetClusterStatus returns nil for status Up or error for status Down -func GetClusterStatus(endpoints []string, tlsConfig *tls.Config) (*clientv3.StatusResponse, error) { +// GetStatus gets server status +func (c GenericClient) GetStatus() (*clientv3.StatusResponse, error) { + const dialTimeout = 5 * time.Second cli, err := clientv3.New(clientv3.Config{ - Endpoints: endpoints, - DialTimeout: 5 * time.Second, - TLS: tlsConfig, + Endpoints: c.Endpoints, + DialTimeout: dialTimeout, + TLS: c.TLSConfig, }) if err != nil { return nil, err } defer cli.Close() - resp, err := cli.Status(context.Background(), endpoints[0]) + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + resp, err := cli.Status(ctx, c.Endpoints[0]) + cancel() if err != nil { return nil, err } return resp, nil } + +// WaitForStatus returns a StatusResponse after an initial delay and retry attempts +func (c GenericClient) WaitForStatus(delay time.Duration, retries int, retryInterval time.Duration) (*clientv3.StatusResponse, error) { + fmt.Printf("[util/etcd] Waiting %v for initial delay\n", delay) + time.Sleep(delay) + for i := 0; i < retries; i++ { + if i > 0 { + fmt.Printf("[util/etcd] Waiting %v until next retry\n", retryInterval) + time.Sleep(retryInterval) + } + fmt.Printf("[util/etcd] Attempting to get etcd status %d/%d\n", i+1, retries) + resp, err := c.GetStatus() + if err != nil { + switch err { + case context.DeadlineExceeded: + fmt.Println("[util/etcd] Attempt timed out") + default: + fmt.Printf("[util/etcd] Attempt failed with error: %v\n", err) + } + continue 
+ } + return resp, nil + } + return nil, fmt.Errorf("timeout waiting for etcd cluster status") +} + +// NewClient creates a new EtcdCluster client +func NewClient(endpoints []string, caFile string, certFile string, keyFile string) (*GenericClient, error) { + client := GenericClient{Endpoints: endpoints} + + if caFile != "" || certFile != "" || keyFile != "" { + tlsInfo := transport.TLSInfo{ + CertFile: certFile, + KeyFile: keyFile, + TrustedCAFile: caFile, + } + tlsConfig, err := tlsInfo.ClientConfig() + if err != nil { + return nil, err + } + client.TLSConfig = tlsConfig + } + + return &client, nil +} + +// NewStaticPodClient creates a GenericClient from the given endpoints, manifestDir, and certificatesDir +func NewStaticPodClient(endpoints []string, manifestDir string, certificatesDir string) (*GenericClient, error) { + hasTLS, err := PodManifestsHaveTLS(manifestDir) + if err != nil { + return nil, fmt.Errorf("could not read manifests from: %s, error: %v", manifestDir, err) + } + if hasTLS { + return NewClient( + endpoints, + filepath.Join(certificatesDir, constants.EtcdCACertName), + filepath.Join(certificatesDir, constants.EtcdHealthcheckClientCertName), + filepath.Join(certificatesDir, constants.EtcdHealthcheckClientKeyName), + ) + } + return NewClient(endpoints, "", "", "") +} diff --git a/cmd/kubeadm/app/util/etcd/etcd_test.go b/cmd/kubeadm/app/util/etcd/etcd_test.go index 7a51ba34846..1003f6cf262 100644 --- a/cmd/kubeadm/app/util/etcd/etcd_test.go +++ b/cmd/kubeadm/app/util/etcd/etcd_test.go @@ -179,9 +179,7 @@ func TestPodManifestHasTLS(t *testing.T) { } } - tmpEtcdCluster := StaticPodCluster{ManifestDir: tmpdir} - - hasTLS, actualErr := tmpEtcdCluster.HasTLS() + hasTLS, actualErr := PodManifestsHaveTLS(tmpdir) if (actualErr != nil) != rt.expectErr { t.Errorf( "PodManifestHasTLS failed\n%s\n\texpected error: %t\n\tgot: %t\n\tactual error: %v",