Merge pull request #62655 from stealthybox/TLSUpgrade_+_detiber-kubeadm_hash

Automatic merge from submit-queue (batch tested with PRs 62655, 61711, 59122, 62853, 62390). If you want to cherry-pick this change to another branch, please follow the instructions [here](https://github.com/kubernetes/community/blob/master/contributors/devel/cherry-picks.md).

Modify the kubeadm upgrade DAG for the TLS Upgrade

**What this PR does / why we need it**:
This adds the utilities necessary to detect Etcd TLS on static pods from the file system and to query Etcd.
It also modifies the upgrade logic so that it can handle the APIServer downtime that occurs while Etcd is upgraded to TLS.
Tests are included and should be passing.

```bash 
bazel test //cmd/kubeadm/... \
  && bazel build //cmd/kubeadm --platforms=@io_bazel_rules_go//go/toolchain:linux_amd64 \
  && issue=TLSUpgrade ~/Repos/vagrant-kubeadm-testing/copy_kubeadm_bin.sh
```
These cases work consistently for me:
```bash
kubeadm-1.9.6 reset \
  && kubeadm-1.9.6 init --kubernetes-version 1.9.1 \
  && kubectl apply -f https://git.io/weave-kube-1.6
/vagrant/bin/TLSUpgrade_kubeadm upgrade apply 1.9.6  # non-TLS to TLS
/vagrant/bin/TLSUpgrade_kubeadm upgrade apply 1.10.0 # TLS to TLS
/vagrant/bin/TLSUpgrade_kubeadm upgrade apply 1.10.1 # TLS to TLS
/vagrant/bin/TLSUpgrade_kubeadm upgrade apply 1.9.1  # TLS to TLS /w major version downgrade
```
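
For a sense of how the new `util/etcd` helpers are meant to be wired in, here is a minimal editorial sketch (not code from this PR; the `checkLocalEtcd` helper, the `main` wrapper, and the hard-coded certificates path are assumptions based on kubeadm defaults):

```go
package main // illustrative only; the kubeadm imports below exist in the kubernetes/kubernetes tree

import (
	"fmt"
	"time"

	"k8s.io/kubernetes/cmd/kubeadm/app/constants"
	etcdutil "k8s.io/kubernetes/cmd/kubeadm/app/util/etcd"
)

// checkLocalEtcd shows the intended call pattern: TLS support is inferred from the
// on-disk etcd static pod manifest, then the local member is queried for its status.
func checkLocalEtcd(certificatesDir string) error {
	client, err := etcdutil.NewStaticPodClient(
		[]string{"localhost:2379"},        // local etcd endpoint
		constants.GetStaticPodDirectory(), // directory holding the etcd static pod manifest
		certificatesDir,                   // root of the kubeadm PKI tree
	)
	if err != nil {
		return err
	}
	fmt.Printf("etcd static pod serves TLS: %t\n", client.HasTLS())

	// WaitForStatus retries GetStatus: no initial delay, up to 10 attempts, 15s apart.
	status, err := client.WaitForStatus(0*time.Second, 10, 15*time.Second)
	if err != nil {
		return err
	}
	fmt.Printf("etcd version: %s\n", status.Version)
	return nil
}

func main() {
	if err := checkLocalEtcd("/etc/kubernetes/pki"); err != nil {
		fmt.Println(err)
	}
}
```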

This branch is based on top of #61942, since resolving the hash race condition is necessary for consistent behavior.
It also looks to fit in well with @craigtracey's PR #62141; the interfaces are quite similar.

/assign @detiber @timothysc

**Which issue(s) this PR fixes**:
Helps with https://github.com/kubernetes/kubeadm/issues/740

**Special notes for your reviewer**:

278b322a1c
   [kubeadm] Implement ReadStaticPodFromDisk

c74b56372d
   Implement etcdutils with Cluster.HasTLS()

   - Test HasTLS()
   - Instrument throughout upgrade plan and apply
   - Update plan_test and apply_test to use new fake Cluster interfaces (see the sketch below)
   - Add descriptions to upgrade range test
   - Support KubernetesDir and EtcdDataDir in upgrade tests
   - Cover etcdUpgrade in upgrade tests
   - Cover upcoming TLSUpgrade in upgrade tests
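
The fake Cluster interfaces boil down to implementing the three-method `etcdutil.Client` interface, so plan/apply logic can be exercised without a live etcd member. The sketch below is a trimmed-down version of the fakes added in this PR (the name `fakeEtcdClient` is illustrative):

```go
package upgrade_test // illustrative; the real fakes live alongside plan_test.go and staticpods_test.go

import (
	"time"

	"github.com/coreos/etcd/clientv3"
)

// fakeEtcdClient satisfies etcdutil.Client so upgrade code can be tested in isolation.
type fakeEtcdClient struct{ TLS bool }

// HasTLS reports whatever the test case configured.
func (f fakeEtcdClient) HasTLS() bool { return f.TLS }

// GetStatus pretends a healthy etcd 3.1.12 member responded.
func (f fakeEtcdClient) GetStatus() (*clientv3.StatusResponse, error) {
	resp := &clientv3.StatusResponse{}
	resp.Version = "3.1.12"
	return resp, nil
}

// WaitForStatus skips the delay/retry loop and answers immediately.
func (f fakeEtcdClient) WaitForStatus(_ time.Duration, _ int, _ time.Duration) (*clientv3.StatusResponse, error) {
	return f.GetStatus()
}
```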

8d8e5fe33b
   Update test-case, fix nil-pointer bug, and improve error message

97117fa873
   Modify the kubeadm upgrade DAG for the TLS Upgrade

   - Calculate `beforePodHashMap` before the etcd upgrade in anticipation of
   KubeAPIServer downtime
   - Detect whether the pre-upgrade etcd static pod cluster reports `HasTLS()==false` to switch
   on the Etcd TLS Upgrade. If it is a TLS Upgrade:
      - Skip the L7 Etcd check (a waiter could be implemented for this)
      - Skip the data rollback on etcd upgrade failure due to the lack of an L7 check
      (the APIServer is already down and unable to serve new requests)
      - On APIServer upgrade failure, also roll back the etcd manifest to
      maintain protocol compatibility (see the sketch after this list)

   - Add logging
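
To make the resulting control flow easier to follow, here is a small self-contained toy (editorial, not part of the PR) that only models the per-component flags described above; the real logic lives in `StaticPodControlPlane` and `upgradeComponent` in the diff below:

```go
package main

import "fmt"

// Toy model of the decision points added to the upgrade DAG. Component names and
// flag semantics mirror the PR; everything else is simplified for illustration.
func main() {
	previousEtcdHasTLS := false         // what oldEtcdClient.HasTLS() reports before the upgrade
	isTLSUpgrade := !previousEtcdHasTLS // a non-TLS -> TLS transition was detected

	for _, component := range []string{"etcd", "kube-apiserver", "kube-controller-manager", "kube-scheduler"} {
		waitForComponentRestart := true
		recoverEtcd := component == "etcd"
		if isTLSUpgrade {
			if component == "etcd" {
				// Etcd mirror-pod hash changes can't be observed while the apiserver is down,
				// so don't wait for the restart (the data rollback that depends on it is skipped too).
				waitForComponentRestart = false
			}
			if component == "kube-apiserver" {
				// If the apiserver upgrade fails, roll back the etcd manifest as well so the
				// two components stay protocol-compatible.
				recoverEtcd = true
			}
		}
		fmt.Printf("%-24s waitForComponentRestart=%-5t recoverEtcd=%t\n",
			component, waitForComponentRestart, recoverEtcd)
	}
}
```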

**Release note**:
```release-note
kubeadm upgrade no longer races leading to unexpected upgrade behavior on pod restarts
kubeadm upgrade now successfully upgrades etcd and the controlplane to use TLS
kubeadm upgrade now supports external etcd setups
kubeadm upgrade can now rollback and restore etcd after an upgrade failure
```
commit 67870dac16 by Kubernetes Submit Queue, 2018-04-24 13:28:13 -07:00 (committed by GitHub)
22 changed files with 854 additions and 184 deletions

@ -24,6 +24,7 @@ go_library(
"//cmd/kubeadm/app/util/apiclient:go_default_library",
"//cmd/kubeadm/app/util/config:go_default_library",
"//cmd/kubeadm/app/util/dryrun:go_default_library",
"//cmd/kubeadm/app/util/etcd:go_default_library",
"//cmd/kubeadm/app/util/kubeconfig:go_default_library",
"//pkg/api/legacyscheme:go_default_library",
"//pkg/util/version:go_default_library",

@ -36,6 +36,7 @@ import (
"k8s.io/kubernetes/cmd/kubeadm/app/util/apiclient"
configutil "k8s.io/kubernetes/cmd/kubeadm/app/util/config"
dryrunutil "k8s.io/kubernetes/cmd/kubeadm/app/util/dryrun"
etcdutil "k8s.io/kubernetes/cmd/kubeadm/app/util/etcd"
"k8s.io/kubernetes/pkg/api/legacyscheme"
"k8s.io/kubernetes/pkg/util/version"
)
@ -281,7 +282,9 @@ func PerformStaticPodUpgrade(client clientset.Interface, waiter apiclient.Waiter
return err
}
return upgrade.StaticPodControlPlane(waiter, pathManager, internalcfg, etcdUpgrade)
// These are uninitialized because passing in the clients allow for mocking the client during testing
var oldEtcdClient, newEtdClient etcdutil.Client
return upgrade.StaticPodControlPlane(waiter, pathManager, internalcfg, etcdUpgrade, oldEtcdClient, newEtdClient)
}
// DryRunStaticPodUpgrade fakes an upgrade of the control plane

@ -27,9 +27,11 @@ import (
"github.com/spf13/cobra"
"k8s.io/kubernetes/cmd/kubeadm/app/apis/kubeadm/validation"
"k8s.io/kubernetes/cmd/kubeadm/app/constants"
"k8s.io/kubernetes/cmd/kubeadm/app/features"
"k8s.io/kubernetes/cmd/kubeadm/app/phases/upgrade"
kubeadmutil "k8s.io/kubernetes/cmd/kubeadm/app/util"
etcdutil "k8s.io/kubernetes/cmd/kubeadm/app/util/etcd"
)
// NewCmdPlan returns the cobra command for `kubeadm upgrade plan`
@ -64,11 +66,18 @@ func RunPlan(parentFlags *cmdUpgradeFlags) error {
}
// Define Local Etcd cluster to be able to retrieve information
etcdCluster := kubeadmutil.LocalEtcdCluster{}
etcdClient, err := etcdutil.NewStaticPodClient(
[]string{"localhost:2379"},
constants.GetStaticPodDirectory(),
upgradeVars.cfg.CertificatesDir,
)
if err != nil {
return err
}
// Compute which upgrade possibilities there are
glog.V(1).Infof("[upgrade/plan] computing upgrade possibilities")
availUpgrades, err := upgrade.GetAvailableUpgrades(upgradeVars.versionGetter, parentFlags.allowExperimentalUpgrades, parentFlags.allowRCUpgrades, etcdCluster, upgradeVars.cfg.FeatureGates)
availUpgrades, err := upgrade.GetAvailableUpgrades(upgradeVars.versionGetter, parentFlags.allowExperimentalUpgrades, parentFlags.allowRCUpgrades, etcdClient, upgradeVars.cfg.FeatureGates)
if err != nil {
return fmt.Errorf("[upgrade/versions] FATAL: %v", err)
}

@ -313,7 +313,7 @@ func EtcdSupportedVersion(versionString string) (*version.Version, error) {
}
return etcdVersion, nil
}
return nil, fmt.Errorf("Unsupported or unknown kubernetes version")
return nil, fmt.Errorf("Unsupported or unknown kubernetes version(%v)", kubernetesVersion)
}
// GetStaticPodDirectory returns the location on the disk where the Static Pod should be present

@ -124,7 +124,7 @@ func TestEtcdSupportedVersion(t *testing.T) {
{
kubernetesVersion: "1.99.0",
expectedVersion: nil,
expectedError: fmt.Errorf("Unsupported or unknown kubernetes version"),
expectedError: fmt.Errorf("Unsupported or unknown kubernetes version(1.99.0)"),
},
{
kubernetesVersion: "1.9.0",

@ -36,6 +36,7 @@ go_library(
"//cmd/kubeadm/app/util/apiclient:go_default_library",
"//cmd/kubeadm/app/util/config:go_default_library",
"//cmd/kubeadm/app/util/dryrun:go_default_library",
"//cmd/kubeadm/app/util/etcd:go_default_library",
"//pkg/api/legacyscheme:go_default_library",
"//pkg/util/version:go_default_library",
"//pkg/version:go_default_library",
@ -84,10 +85,12 @@ go_test(
"//cmd/kubeadm/app/phases/controlplane:go_default_library",
"//cmd/kubeadm/app/phases/etcd:go_default_library",
"//cmd/kubeadm/app/util/apiclient:go_default_library",
"//cmd/kubeadm/app/util/etcd:go_default_library",
"//cmd/kubeadm/test:go_default_library",
"//pkg/api/legacyscheme:go_default_library",
"//pkg/util/version:go_default_library",
"//vendor/github.com/coreos/etcd/clientv3:go_default_library",
"//vendor/github.com/coreos/etcd/pkg/transport:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/runtime:go_default_library",
],
)

@ -23,7 +23,7 @@ import (
kubeadmconstants "k8s.io/kubernetes/cmd/kubeadm/app/constants"
"k8s.io/kubernetes/cmd/kubeadm/app/features"
"k8s.io/kubernetes/cmd/kubeadm/app/phases/addons/dns"
"k8s.io/kubernetes/cmd/kubeadm/app/util"
etcdutil "k8s.io/kubernetes/cmd/kubeadm/app/util/etcd"
"k8s.io/kubernetes/pkg/util/version"
)
@ -74,7 +74,7 @@ type ClusterState struct {
// GetAvailableUpgrades fetches all versions from the specified VersionGetter and computes which
// kinds of upgrades can be performed
func GetAvailableUpgrades(versionGetterImpl VersionGetter, experimentalUpgradesAllowed, rcUpgradesAllowed bool, cluster util.EtcdCluster, featureGates map[string]bool) ([]Upgrade, error) {
func GetAvailableUpgrades(versionGetterImpl VersionGetter, experimentalUpgradesAllowed, rcUpgradesAllowed bool, etcdClient etcdutil.Client, featureGates map[string]bool) ([]Upgrade, error) {
fmt.Println("[upgrade] Fetching available versions to upgrade to")
// Collect the upgrades kubeadm can do in this list
@ -107,7 +107,7 @@ func GetAvailableUpgrades(versionGetterImpl VersionGetter, experimentalUpgradesA
}
// Get current etcd version
etcdStatus, err := cluster.GetEtcdClusterStatus()
etcdStatus, err := etcdClient.GetStatus()
if err != nil {
return nil, err
}

@ -19,6 +19,7 @@ package upgrade
import (
"reflect"
"testing"
"time"
"github.com/coreos/etcd/clientv3"
versionutil "k8s.io/kubernetes/pkg/util/version"
@ -61,14 +62,20 @@ func (f *fakeVersionGetter) KubeletVersions() (map[string]uint16, error) {
}, nil
}
type fakeEtcdCluster struct{}
type fakeEtcdCluster struct{ TLS bool }
func (f fakeEtcdCluster) GetEtcdClusterStatus() (*clientv3.StatusResponse, error) {
func (f fakeEtcdCluster) HasTLS() bool { return f.TLS }
func (f fakeEtcdCluster) GetStatus() (*clientv3.StatusResponse, error) {
client := &clientv3.StatusResponse{}
client.Version = "3.1.12"
return client, nil
}
func (f fakeEtcdCluster) WaitForStatus(delay time.Duration, retries int, retryInterval time.Duration) (*clientv3.StatusResponse, error) {
return f.GetStatus()
}
func TestGetAvailableUpgrades(t *testing.T) {
featureGates := make(map[string]bool)
tests := []struct {

@ -20,6 +20,7 @@ import (
"fmt"
"os"
"strings"
"time"
kubeadmapi "k8s.io/kubernetes/cmd/kubeadm/app/apis/kubeadm"
"k8s.io/kubernetes/cmd/kubeadm/app/constants"
@ -28,6 +29,7 @@ import (
etcdphase "k8s.io/kubernetes/cmd/kubeadm/app/phases/etcd"
"k8s.io/kubernetes/cmd/kubeadm/app/util"
"k8s.io/kubernetes/cmd/kubeadm/app/util/apiclient"
etcdutil "k8s.io/kubernetes/cmd/kubeadm/app/util/etcd"
"k8s.io/kubernetes/pkg/util/version"
)
@ -127,13 +129,29 @@ func (spm *KubeStaticPodPathManager) BackupEtcdDir() string {
return spm.backupEtcdDir
}
func upgradeComponent(component string, waiter apiclient.Waiter, pathMgr StaticPodPathManager, cfg *kubeadmapi.MasterConfiguration, beforePodHash string, recoverManifests map[string]string) error {
func upgradeComponent(component string, waiter apiclient.Waiter, pathMgr StaticPodPathManager, cfg *kubeadmapi.MasterConfiguration, beforePodHash string, recoverManifests map[string]string, isTLSUpgrade bool) error {
// Special treatment is required for etcd case, when rollbackOldManifests should roll back etcd
// manifests only for the case when component is Etcd
recoverEtcd := false
waitForComponentRestart := true
if component == constants.Etcd {
recoverEtcd = true
}
if isTLSUpgrade {
// We currently depend on getting the Etcd mirror Pod hash from the KubeAPIServer;
// Upgrading the Etcd protocol takes down the apiserver, so we can't verify component restarts if we restart Etcd independently.
// Skip waiting for Etcd to restart and immediately move on to updating the apiserver.
if component == constants.Etcd {
waitForComponentRestart = false
}
// Normally, if an Etcd upgrade is successful, but the apiserver upgrade fails, Etcd is not rolled back.
// In the case of a TLS upgrade, the old KubeAPIServer config is incompatible with the new Etcd confg, so we rollback Etcd
// if the APIServer upgrade fails.
if component == constants.KubeAPIServer {
recoverEtcd = true
fmt.Printf("[upgrade/staticpods] The %s manifest will be restored if component %q fails to upgrade\n", constants.Etcd, component)
}
}
// ensure etcd certs are generated for etcd and kube-apiserver
if component == constants.Etcd || component == constants.KubeAPIServer {
@ -180,34 +198,40 @@ func upgradeComponent(component string, waiter apiclient.Waiter, pathMgr StaticP
}
fmt.Printf("[upgrade/staticpods] Moved new manifest to %q and backed up old manifest to %q\n", currentManifestPath, backupManifestPath)
fmt.Println("[upgrade/staticpods] Waiting for the kubelet to restart the component")
// Wait for the mirror Pod hash to change; otherwise we'll run into race conditions here when the kubelet hasn't had time to
// notice the removal of the Static Pod, leading to a false positive below where we check that the API endpoint is healthy
// If we don't do this, there is a case where we remove the Static Pod manifest, kubelet is slow to react, kubeadm checks the
// API endpoint below of the OLD Static Pod component and proceeds quickly enough, which might lead to unexpected results.
if err := waiter.WaitForStaticPodControlPlaneHashChange(cfg.NodeName, component, beforePodHash); err != nil {
return rollbackOldManifests(recoverManifests, err, pathMgr, recoverEtcd)
if waitForComponentRestart {
fmt.Println("[upgrade/staticpods] Waiting for the kubelet to restart the component")
// Wait for the mirror Pod hash to change; otherwise we'll run into race conditions here when the kubelet hasn't had time to
// notice the removal of the Static Pod, leading to a false positive below where we check that the API endpoint is healthy
// If we don't do this, there is a case where we remove the Static Pod manifest, kubelet is slow to react, kubeadm checks the
// API endpoint below of the OLD Static Pod component and proceeds quickly enough, which might lead to unexpected results.
if err := waiter.WaitForStaticPodHashChange(cfg.NodeName, component, beforePodHash); err != nil {
return rollbackOldManifests(recoverManifests, err, pathMgr, recoverEtcd)
}
// Wait for the static pod component to come up and register itself as a mirror pod
if err := waiter.WaitForPodsWithLabel("component=" + component); err != nil {
return rollbackOldManifests(recoverManifests, err, pathMgr, recoverEtcd)
}
fmt.Printf("[upgrade/staticpods] Component %q upgraded successfully!\n", component)
} else {
fmt.Printf("[upgrade/staticpods] Not waiting for pod-hash change for component %q\n", component)
}
// Wait for the static pod component to come up and register itself as a mirror pod
if err := waiter.WaitForPodsWithLabel("component=" + component); err != nil {
return rollbackOldManifests(recoverManifests, err, pathMgr, recoverEtcd)
}
fmt.Printf("[upgrade/staticpods] Component %q upgraded successfully!\n", component)
return nil
}
// performEtcdStaticPodUpgrade performs upgrade of etcd, it returns bool which indicates fatal error or not and the actual error.
func performEtcdStaticPodUpgrade(waiter apiclient.Waiter, pathMgr StaticPodPathManager, cfg *kubeadmapi.MasterConfiguration, recoverManifests map[string]string) (bool, error) {
func performEtcdStaticPodUpgrade(waiter apiclient.Waiter, pathMgr StaticPodPathManager, cfg *kubeadmapi.MasterConfiguration, recoverManifests map[string]string, isTLSUpgrade bool, oldEtcdClient, newEtcdClient etcdutil.Client) (bool, error) {
// Add etcd static pod spec only if external etcd is not configured
if len(cfg.Etcd.Endpoints) != 0 {
return false, fmt.Errorf("external etcd detected, won't try to change any etcd state")
}
// Checking health state of etcd before proceeding with the upgrade
etcdCluster := util.LocalEtcdCluster{}
etcdStatus, err := etcdCluster.GetEtcdClusterStatus()
etcdStatus, err := oldEtcdClient.GetStatus()
if err != nil {
return true, fmt.Errorf("etcd cluster is not healthy: %v", err)
}
@ -222,11 +246,11 @@ func performEtcdStaticPodUpgrade(waiter apiclient.Waiter, pathMgr StaticPodPathM
// Need to check currently used version and version from constants, if differs then upgrade
desiredEtcdVersion, err := constants.EtcdSupportedVersion(cfg.KubernetesVersion)
if err != nil {
return true, fmt.Errorf("failed to parse the desired etcd version(%s): %v", desiredEtcdVersion.String(), err)
return true, fmt.Errorf("failed to retrieve an etcd version for the target kubernetes version: %v", err)
}
currentEtcdVersion, err := version.ParseSemantic(etcdStatus.Version)
if err != nil {
return true, fmt.Errorf("failed to parse the current etcd version(%s): %v", currentEtcdVersion.String(), err)
return true, fmt.Errorf("failed to parse the current etcd version(%s): %v", etcdStatus.Version, err)
}
// Comparing current etcd version with desired to catch the same version or downgrade condition and fail on them.
@ -249,52 +273,93 @@ func performEtcdStaticPodUpgrade(waiter apiclient.Waiter, pathMgr StaticPodPathM
return true, fmt.Errorf("error creating local etcd static pod manifest file: %v", err)
}
// Waiter configurations for checking etcd status
noDelay := 0 * time.Second
podRestartDelay := noDelay
if isTLSUpgrade {
// If we are upgrading TLS we need to wait for old static pod to be removed.
// This is needed because we are not able to currently verify that the static pod
// has been updated through the apiserver across an etcd TLS upgrade.
// This value is arbitrary but seems to be long enough in manual testing.
podRestartDelay = 30 * time.Second
}
retries := 10
retryInterval := 15 * time.Second
// Perform etcd upgrade using common to all control plane components function
if err := upgradeComponent(constants.Etcd, waiter, pathMgr, cfg, beforeEtcdPodHash, recoverManifests); err != nil {
// Since etcd upgrade component failed, the old manifest has been restored
// now we need to check the health of etcd cluster if it came back up with old manifest
if _, err := etcdCluster.GetEtcdClusterStatus(); err != nil {
if err := upgradeComponent(constants.Etcd, waiter, pathMgr, cfg, beforeEtcdPodHash, recoverManifests, isTLSUpgrade); err != nil {
fmt.Printf("[upgrade/etcd] Failed to upgrade etcd: %v\n", err)
// Since upgrade component failed, the old etcd manifest has either been restored or was never touched
// Now we need to check the health of etcd cluster if it is up with old manifest
fmt.Println("[upgrade/etcd] Waiting for previous etcd to become available")
if _, err := oldEtcdClient.WaitForStatus(noDelay, retries, retryInterval); err != nil {
fmt.Printf("[upgrade/etcd] Failed to healthcheck previous etcd: %v\n", err)
// At this point we know that etcd cluster is dead and it is safe to copy backup datastore and to rollback old etcd manifest
if err := rollbackEtcdData(cfg, fmt.Errorf("etcd cluster is not healthy after upgrade: %v rolling back", err), pathMgr); err != nil {
fmt.Println("[upgrade/etcd] Rolling back etcd data")
if err := rollbackEtcdData(cfg, pathMgr); err != nil {
// Even copying back datastore failed, no options for recovery left, bailing out
return true, fmt.Errorf("fatal error upgrading local etcd cluster: %v, the backup of etcd database is stored here:(%s)", err, backupEtcdDir)
return true, fmt.Errorf("fatal error rolling back local etcd cluster datadir: %v, the backup of etcd database is stored here:(%s)", err, backupEtcdDir)
}
// Old datastore has been copied, rolling back old manifests
if err := rollbackOldManifests(recoverManifests, err, pathMgr, true); err != nil {
// Rolling back to old manifests failed, no options for recovery left, bailing out
return true, fmt.Errorf("fatal error upgrading local etcd cluster: %v, the backup of etcd database is stored here:(%s)", err, backupEtcdDir)
}
// Since rollback of the old etcd manifest was successful, checking again the status of etcd cluster
if _, err := etcdCluster.GetEtcdClusterStatus(); err != nil {
fmt.Println("[upgrade/etcd] Etcd data rollback successful")
// Now that we've rolled back the data, let's check if the cluster comes up
fmt.Println("[upgrade/etcd] Waiting for previous etcd to become available")
if _, err := oldEtcdClient.WaitForStatus(noDelay, retries, retryInterval); err != nil {
fmt.Printf("[upgrade/etcd] Failed to healthcheck previous etcd: %v\n", err)
// Nothing else left to try to recover etcd cluster
return true, fmt.Errorf("fatal error upgrading local etcd cluster: %v, the backup of etcd database is stored here:(%s)", err, backupEtcdDir)
return true, fmt.Errorf("fatal error rolling back local etcd cluster manifest: %v, the backup of etcd database is stored here:(%s)", err, backupEtcdDir)
}
return true, fmt.Errorf("fatal error upgrading local etcd cluster: %v, rolled the state back to pre-upgrade state", err)
// We've recovered to the previous etcd from this case
}
fmt.Println("[upgrade/etcd] Etcd was rolled back and is now available")
// Since etcd cluster came back up with the old manifest
return true, fmt.Errorf("fatal error when trying to upgrade the etcd cluster: %v, rolled the state back to pre-upgrade state", err)
}
// Checking health state of etcd after the upgrade
if _, err = etcdCluster.GetEtcdClusterStatus(); err != nil {
// Despite the fact that upgradeComponent was successful, there is something wrong with etcd cluster
// First step is to restore back up of datastore
if err := rollbackEtcdData(cfg, fmt.Errorf("etcd cluster is not healthy after upgrade: %v rolling back", err), pathMgr); err != nil {
// Even copying back datastore failed, no options for recovery left, bailing out
return true, fmt.Errorf("fatal error upgrading local etcd cluster: %v, the backup of etcd database is stored here:(%s)", err, backupEtcdDir)
}
// Old datastore has been copied, rolling back old manifests
if err := rollbackOldManifests(recoverManifests, err, pathMgr, true); err != nil {
// Rolling back to old manifests failed, no options for recovery left, bailing out
return true, fmt.Errorf("fatal error upgrading local etcd cluster: %v, the backup of etcd database is stored here:(%s)", err, backupEtcdDir)
}
// Since rollback of the old etcd manifest was successful, checking again the status of etcd cluster
if _, err := etcdCluster.GetEtcdClusterStatus(); err != nil {
// Nothing else left to try to recover etcd cluster
return true, fmt.Errorf("fatal error upgrading local etcd cluster: %v, the backup of etcd database is stored here:(%s)", err, backupEtcdDir)
// Initialize the new etcd client if it wasn't pre-initialized
if newEtcdClient == nil {
client, err := etcdutil.NewStaticPodClient(
[]string{"localhost:2379"},
constants.GetStaticPodDirectory(),
cfg.CertificatesDir,
)
if err != nil {
return true, fmt.Errorf("fatal error creating etcd client: %v", err)
}
newEtcdClient = client
}
// Checking health state of etcd after the upgrade
fmt.Println("[upgrade/etcd] Waiting for etcd to become available")
if _, err = newEtcdClient.WaitForStatus(podRestartDelay, retries, retryInterval); err != nil {
fmt.Printf("[upgrade/etcd] Failed to healthcheck etcd: %v\n", err)
// Despite the fact that upgradeComponent was successful, there is something wrong with the etcd cluster
// First step is to restore back up of datastore
fmt.Println("[upgrade/etcd] Rolling back etcd data")
if err := rollbackEtcdData(cfg, pathMgr); err != nil {
// Even copying back datastore failed, no options for recovery left, bailing out
return true, fmt.Errorf("fatal error rolling back local etcd cluster datadir: %v, the backup of etcd database is stored here:(%s)", err, backupEtcdDir)
}
fmt.Println("[upgrade/etcd] Etcd data rollback successful")
// Old datastore has been copied, rolling back old manifests
fmt.Println("[upgrade/etcd] Rolling back etcd manifest")
rollbackOldManifests(recoverManifests, err, pathMgr, true)
// rollbackOldManifests() always returns an error -- ignore it and continue
// Assuming rollback of the old etcd manifest was successful, check the status of etcd cluster again
fmt.Println("[upgrade/etcd] Waiting for previous etcd to become available")
if _, err := oldEtcdClient.WaitForStatus(noDelay, retries, retryInterval); err != nil {
fmt.Printf("[upgrade/etcd] Failed to healthcheck previous etcd: %v\n", err)
// Nothing else left to try to recover etcd cluster
return true, fmt.Errorf("fatal error rolling back local etcd cluster manifest: %v, the backup of etcd database is stored here:(%s)", err, backupEtcdDir)
}
fmt.Println("[upgrade/etcd] Etcd was rolled back and is now available")
// We've successfully rolled back etcd, and now return an error describing that the upgrade failed
return true, fmt.Errorf("fatal error upgrading local etcd cluster: %v, rolled the state back to pre-upgrade state", err)
}
@ -302,13 +367,60 @@ func performEtcdStaticPodUpgrade(waiter apiclient.Waiter, pathMgr StaticPodPathM
}
// StaticPodControlPlane upgrades a static pod-hosted control plane
func StaticPodControlPlane(waiter apiclient.Waiter, pathMgr StaticPodPathManager, cfg *kubeadmapi.MasterConfiguration, etcdUpgrade bool) error {
func StaticPodControlPlane(waiter apiclient.Waiter, pathMgr StaticPodPathManager, cfg *kubeadmapi.MasterConfiguration, etcdUpgrade bool, oldEtcdClient, newEtcdClient etcdutil.Client) error {
recoverManifests := map[string]string{}
var isTLSUpgrade bool
var isExternalEtcd bool
beforePodHashMap, err := waiter.WaitForStaticPodControlPlaneHashes(cfg.NodeName)
if err != nil {
return err
}
if oldEtcdClient == nil {
if len(cfg.Etcd.Endpoints) > 0 {
// External etcd
isExternalEtcd = true
client, err := etcdutil.NewClient(
cfg.Etcd.Endpoints,
cfg.Etcd.CAFile,
cfg.Etcd.CertFile,
cfg.Etcd.KeyFile,
)
if err != nil {
return fmt.Errorf("failed to create etcd client for external etcd: %v", err)
}
oldEtcdClient = client
// Since etcd is managed externally, the new etcd client will be the same as the old client
if newEtcdClient == nil {
newEtcdClient = client
}
} else {
// etcd Static Pod
client, err := etcdutil.NewStaticPodClient(
[]string{"localhost:2379"},
constants.GetStaticPodDirectory(),
cfg.CertificatesDir,
)
if err != nil {
return fmt.Errorf("failed to create etcd client: %v", err)
}
oldEtcdClient = client
}
}
// etcd upgrade is done prior to other control plane components
if etcdUpgrade {
if !isExternalEtcd && etcdUpgrade {
previousEtcdHasTLS := oldEtcdClient.HasTLS()
// set the TLS upgrade flag for all components
isTLSUpgrade = !previousEtcdHasTLS
if isTLSUpgrade {
fmt.Printf("[upgrade/etcd] Upgrading to TLS for %s\n", constants.Etcd)
}
// Perform etcd upgrade using common to all control plane components function
fatal, err := performEtcdStaticPodUpgrade(waiter, pathMgr, cfg, recoverManifests)
fatal, err := performEtcdStaticPodUpgrade(waiter, pathMgr, cfg, recoverManifests, isTLSUpgrade, oldEtcdClient, newEtcdClient)
if err != nil {
if fatal {
return err
@ -317,11 +429,6 @@ func StaticPodControlPlane(waiter apiclient.Waiter, pathMgr StaticPodPathManager
}
}
beforePodHashMap, err := waiter.WaitForStaticPodControlPlaneHashes(cfg.NodeName)
if err != nil {
return err
}
// Write the updated static Pod manifests into the temporary directory
fmt.Printf("[upgrade/staticpods] Writing new Static Pod manifests to %q\n", pathMgr.TempManifestDir())
err = controlplanephase.CreateInitStaticPodManifestFiles(pathMgr.TempManifestDir(), cfg)
@ -330,7 +437,7 @@ func StaticPodControlPlane(waiter apiclient.Waiter, pathMgr StaticPodPathManager
}
for _, component := range constants.MasterComponents {
if err = upgradeComponent(component, waiter, pathMgr, cfg, beforePodHashMap[component], recoverManifests); err != nil {
if err = upgradeComponent(component, waiter, pathMgr, cfg, beforePodHashMap[component], recoverManifests, isTLSUpgrade); err != nil {
return err
}
}
@ -345,7 +452,8 @@ func StaticPodControlPlane(waiter apiclient.Waiter, pathMgr StaticPodPathManager
return nil
}
// rollbackOldManifests rolls back the backuped manifests if something went wrong
// rollbackOldManifests rolls back the backed-up manifests if something went wrong.
// It always returns an error to the caller.
func rollbackOldManifests(oldManifests map[string]string, origErr error, pathMgr StaticPodPathManager, restoreEtcd bool) error {
errs := []error{origErr}
for component, backupPath := range oldManifests {
@ -366,17 +474,16 @@ func rollbackOldManifests(oldManifests map[string]string, origErr error, pathMgr
return fmt.Errorf("couldn't upgrade control plane. kubeadm has tried to recover everything into the earlier state. Errors faced: %v", errs)
}
// rollbackEtcdData rolls back the the content of etcd folder if something went wrong
func rollbackEtcdData(cfg *kubeadmapi.MasterConfiguration, origErr error, pathMgr StaticPodPathManager) error {
errs := []error{origErr}
// rollbackEtcdData rolls back the the content of etcd folder if something went wrong.
// When the folder contents are successfully rolled back, nil is returned, otherwise an error is returned.
func rollbackEtcdData(cfg *kubeadmapi.MasterConfiguration, pathMgr StaticPodPathManager) error {
backupEtcdDir := pathMgr.BackupEtcdDir()
runningEtcdDir := cfg.Etcd.DataDir
err := util.CopyDir(backupEtcdDir, runningEtcdDir)
if err != nil {
errs = append(errs, err)
if err := util.CopyDir(backupEtcdDir, runningEtcdDir); err != nil {
// Let the user know there we're problems, but we tried to reçover
return fmt.Errorf("couldn't recover etcd database with error: %v, the location of etcd backup: %s ", err, backupEtcdDir)
}
// Let the user know there we're problems, but we tried to reçover
return fmt.Errorf("couldn't recover etcd database with error: %v, the location of etcd backup: %s ", errs, backupEtcdDir)
return nil
}

@ -21,10 +21,13 @@ import (
"fmt"
"io/ioutil"
"os"
"path/filepath"
"strings"
"testing"
"time"
"github.com/coreos/etcd/clientv3"
"github.com/coreos/etcd/pkg/transport"
"k8s.io/apimachinery/pkg/runtime"
kubeadmapi "k8s.io/kubernetes/cmd/kubeadm/app/apis/kubeadm"
kubeadmapiext "k8s.io/kubernetes/cmd/kubeadm/app/apis/kubeadm/v1alpha1"
@ -33,6 +36,7 @@ import (
controlplanephase "k8s.io/kubernetes/cmd/kubeadm/app/phases/controlplane"
etcdphase "k8s.io/kubernetes/cmd/kubeadm/app/phases/etcd"
"k8s.io/kubernetes/cmd/kubeadm/app/util/apiclient"
etcdutil "k8s.io/kubernetes/cmd/kubeadm/app/util/etcd"
"k8s.io/kubernetes/pkg/api/legacyscheme"
)
@ -56,7 +60,7 @@ controllerManagerExtraArgs: null
etcd:
caFile: ""
certFile: ""
dataDir: /var/lib/etcd
dataDir: %s
endpoints: null
extraArgs: null
image: ""
@ -117,8 +121,8 @@ func (w *fakeWaiter) WaitForStaticPodSingleHash(_ string, _ string) (string, err
return "", w.errsToReturn[waitForHashes]
}
// WaitForStaticPodControlPlaneHashChange returns an error if set from errsToReturn
func (w *fakeWaiter) WaitForStaticPodControlPlaneHashChange(_, _, _ string) error {
// WaitForStaticPodHashChange returns an error if set from errsToReturn
func (w *fakeWaiter) WaitForStaticPodHashChange(_, _, _ string) error {
return w.errsToReturn[waitForHashChange]
}
@ -128,6 +132,7 @@ func (w *fakeWaiter) WaitForHealthyKubelet(_ time.Duration, _ string) error {
}
type fakeStaticPodPathManager struct {
kubernetesDir string
realManifestDir string
tempManifestDir string
backupManifestDir string
@ -136,29 +141,36 @@ type fakeStaticPodPathManager struct {
}
func NewFakeStaticPodPathManager(moveFileFunc func(string, string) error) (StaticPodPathManager, error) {
realManifestsDir, err := ioutil.TempDir("", "kubeadm-upgraded-manifests")
kubernetesDir, err := ioutil.TempDir("", "kubeadm-pathmanager-")
if err != nil {
return nil, fmt.Errorf("couldn't create a temporary directory for the upgrade: %v", err)
}
upgradedManifestsDir, err := ioutil.TempDir("", "kubeadm-upgraded-manifests")
if err != nil {
return nil, fmt.Errorf("couldn't create a temporary directory for the upgrade: %v", err)
realManifestDir := filepath.Join(kubernetesDir, constants.ManifestsSubDirName)
if err := os.Mkdir(realManifestDir, 0700); err != nil {
return nil, fmt.Errorf("couldn't create a realManifestDir for the upgrade: %v", err)
}
backupManifestsDir, err := ioutil.TempDir("", "kubeadm-backup-manifests")
if err != nil {
return nil, fmt.Errorf("couldn't create a temporary directory for the upgrade: %v", err)
upgradedManifestDir := filepath.Join(kubernetesDir, "upgraded-manifests")
if err := os.Mkdir(upgradedManifestDir, 0700); err != nil {
return nil, fmt.Errorf("couldn't create a upgradedManifestDir for the upgrade: %v", err)
}
backupEtcdDir, err := ioutil.TempDir("", "kubeadm-backup-etcd")
if err != nil {
backupManifestDir := filepath.Join(kubernetesDir, "backup-manifests")
if err := os.Mkdir(backupManifestDir, 0700); err != nil {
return nil, fmt.Errorf("couldn't create a backupManifestDir for the upgrade: %v", err)
}
backupEtcdDir := filepath.Join(kubernetesDir, "kubeadm-backup-etcd")
if err := os.Mkdir(backupEtcdDir, 0700); err != nil {
return nil, err
}
return &fakeStaticPodPathManager{
realManifestDir: realManifestsDir,
tempManifestDir: upgradedManifestsDir,
backupManifestDir: backupManifestsDir,
kubernetesDir: kubernetesDir,
realManifestDir: realManifestDir,
tempManifestDir: upgradedManifestDir,
backupManifestDir: backupManifestDir,
backupEtcdDir: backupEtcdDir,
MoveFileFunc: moveFileFunc,
}, nil
@ -168,6 +180,10 @@ func (spm *fakeStaticPodPathManager) MoveFile(oldPath, newPath string) error {
return spm.MoveFileFunc(oldPath, newPath)
}
func (spm *fakeStaticPodPathManager) KubernetesDir() string {
return spm.kubernetesDir
}
func (spm *fakeStaticPodPathManager) RealManifestPath(component string) string {
return constants.GetStaticPodFilepath(component, spm.realManifestDir)
}
@ -193,14 +209,60 @@ func (spm *fakeStaticPodPathManager) BackupEtcdDir() string {
return spm.backupEtcdDir
}
type fakeTLSEtcdClient struct{ TLS bool }
func (c fakeTLSEtcdClient) HasTLS() bool {
return c.TLS
}
func (c fakeTLSEtcdClient) GetStatus() (*clientv3.StatusResponse, error) {
client := &clientv3.StatusResponse{}
client.Version = "3.1.12"
return client, nil
}
func (c fakeTLSEtcdClient) WaitForStatus(delay time.Duration, retries int, retryInterval time.Duration) (*clientv3.StatusResponse, error) {
return c.GetStatus()
}
type fakePodManifestEtcdClient struct{ ManifestDir, CertificatesDir string }
func (c fakePodManifestEtcdClient) HasTLS() bool {
hasTLS, _ := etcdutil.PodManifestsHaveTLS(c.ManifestDir)
return hasTLS
}
func (c fakePodManifestEtcdClient) GetStatus() (*clientv3.StatusResponse, error) {
// Make sure the certificates generated from the upgrade are readable from disk
tlsInfo := transport.TLSInfo{
CertFile: filepath.Join(c.CertificatesDir, constants.EtcdCACertName),
KeyFile: filepath.Join(c.CertificatesDir, constants.EtcdHealthcheckClientCertName),
TrustedCAFile: filepath.Join(c.CertificatesDir, constants.EtcdHealthcheckClientKeyName),
}
_, err := tlsInfo.ClientConfig()
if err != nil {
return nil, err
}
client := &clientv3.StatusResponse{}
client.Version = "3.1.12"
return client, nil
}
func (c fakePodManifestEtcdClient) WaitForStatus(delay time.Duration, retries int, retryInterval time.Duration) (*clientv3.StatusResponse, error) {
return c.GetStatus()
}
func TestStaticPodControlPlane(t *testing.T) {
tests := []struct {
description string
waitErrsToReturn map[string]error
moveFileFunc func(string, string) error
expectedErr bool
manifestShouldChange bool
}{
{ // error-free case should succeed
{
description: "error-free case should succeed",
waitErrsToReturn: map[string]error{
waitForHashes: nil,
waitForHashChange: nil,
@ -212,7 +274,8 @@ func TestStaticPodControlPlane(t *testing.T) {
expectedErr: false,
manifestShouldChange: true,
},
{ // any wait error should result in a rollback and an abort
{
description: "any wait error should result in a rollback and an abort",
waitErrsToReturn: map[string]error{
waitForHashes: fmt.Errorf("boo! failed"),
waitForHashChange: nil,
@ -224,7 +287,8 @@ func TestStaticPodControlPlane(t *testing.T) {
expectedErr: true,
manifestShouldChange: false,
},
{ // any wait error should result in a rollback and an abort
{
description: "any wait error should result in a rollback and an abort",
waitErrsToReturn: map[string]error{
waitForHashes: nil,
waitForHashChange: fmt.Errorf("boo! failed"),
@ -236,7 +300,8 @@ func TestStaticPodControlPlane(t *testing.T) {
expectedErr: true,
manifestShouldChange: false,
},
{ // any wait error should result in a rollback and an abort
{
description: "any wait error should result in a rollback and an abort",
waitErrsToReturn: map[string]error{
waitForHashes: nil,
waitForHashChange: nil,
@ -248,7 +313,8 @@ func TestStaticPodControlPlane(t *testing.T) {
expectedErr: true,
manifestShouldChange: false,
},
{ // any path-moving error should result in a rollback and an abort
{
description: "any path-moving error should result in a rollback and an abort",
waitErrsToReturn: map[string]error{
waitForHashes: nil,
waitForHashChange: nil,
@ -264,7 +330,8 @@ func TestStaticPodControlPlane(t *testing.T) {
expectedErr: true,
manifestShouldChange: false,
},
{ // any path-moving error should result in a rollback and an abort
{
description: "any path-moving error should result in a rollback and an abort",
waitErrsToReturn: map[string]error{
waitForHashes: nil,
waitForHashChange: nil,
@ -280,7 +347,8 @@ func TestStaticPodControlPlane(t *testing.T) {
expectedErr: true,
manifestShouldChange: false,
},
{ // any path-moving error should result in a rollback and an abort; even though this is the last component (kube-apiserver and kube-controller-manager healthy)
{
description: "any path-moving error should result in a rollback and an abort; even though this is the last component (kube-apiserver and kube-controller-manager healthy)",
waitErrsToReturn: map[string]error{
waitForHashes: nil,
waitForHashChange: nil,
@ -304,17 +372,21 @@ func TestStaticPodControlPlane(t *testing.T) {
if err != nil {
t.Fatalf("couldn't run NewFakeStaticPodPathManager: %v", err)
}
defer os.RemoveAll(pathMgr.RealManifestDir())
defer os.RemoveAll(pathMgr.TempManifestDir())
defer os.RemoveAll(pathMgr.BackupManifestDir())
defer os.RemoveAll(pathMgr.(*fakeStaticPodPathManager).KubernetesDir())
constants.KubernetesDir = pathMgr.(*fakeStaticPodPathManager).KubernetesDir()
tempCertsDir, err := ioutil.TempDir("", "kubeadm-certs")
if err != nil {
t.Fatalf("couldn't create temporary certificates directory: %v", err)
}
defer os.RemoveAll(tempCertsDir)
tmpEtcdDataDir, err := ioutil.TempDir("", "kubeadm-etcd-data")
if err != nil {
t.Fatalf("couldn't create temporary etcd data directory: %v", err)
}
defer os.RemoveAll(tmpEtcdDataDir)
oldcfg, err := getConfig("v1.7.0", tempCertsDir)
oldcfg, err := getConfig("v1.9.0", tempCertsDir, tmpEtcdDataDir)
if err != nil {
t.Fatalf("couldn't create config: %v", err)
}
@ -356,15 +428,28 @@ func TestStaticPodControlPlane(t *testing.T) {
t.Fatalf("couldn't read temp file: %v", err)
}
newcfg, err := getConfig("v1.8.0", tempCertsDir)
newcfg, err := getConfig("v1.10.0", tempCertsDir, tmpEtcdDataDir)
if err != nil {
t.Fatalf("couldn't create config: %v", err)
}
actualErr := StaticPodControlPlane(waiter, pathMgr, newcfg, false)
actualErr := StaticPodControlPlane(
waiter,
pathMgr,
newcfg,
true,
fakeTLSEtcdClient{
TLS: false,
},
fakePodManifestEtcdClient{
ManifestDir: pathMgr.RealManifestDir(),
CertificatesDir: newcfg.CertificatesDir,
},
)
if (actualErr != nil) != rt.expectedErr {
t.Errorf(
"failed UpgradeStaticPodControlPlane\n\texpected error: %t\n\tgot: %t\n\tactual error: %v",
"failed UpgradeStaticPodControlPlane\n%s\n\texpected error: %t\n\tgot: %t\n\tactual error: %v",
rt.description,
rt.expectedErr,
(actualErr != nil),
actualErr,
@ -378,12 +463,13 @@ func TestStaticPodControlPlane(t *testing.T) {
if (oldHash != newHash) != rt.manifestShouldChange {
t.Errorf(
"failed StaticPodControlPlane\n\texpected manifest change: %t\n\tgot: %t",
"failed StaticPodControlPlane\n%s\n\texpected manifest change: %t\n\tgot: %t",
rt.description,
rt.manifestShouldChange,
(oldHash != newHash),
)
}
return
}
}
@ -398,10 +484,10 @@ func getAPIServerHash(dir string) (string, error) {
return fmt.Sprintf("%x", sha256.Sum256(fileBytes)), nil
}
func getConfig(version string, certsDir string) (*kubeadmapi.MasterConfiguration, error) {
func getConfig(version, certsDir, etcdDataDir string) (*kubeadmapi.MasterConfiguration, error) {
externalcfg := &kubeadmapiext.MasterConfiguration{}
internalcfg := &kubeadmapi.MasterConfiguration{}
if err := runtime.DecodeInto(legacyscheme.Codecs.UniversalDecoder(), []byte(fmt.Sprintf(testConfiguration, certsDir, version)), externalcfg); err != nil {
if err := runtime.DecodeInto(legacyscheme.Codecs.UniversalDecoder(), []byte(fmt.Sprintf(testConfiguration, certsDir, etcdDataDir, version)), externalcfg); err != nil {
return nil, fmt.Errorf("unable to decode config: %v", err)
}
legacyscheme.Scheme.Convert(externalcfg, internalcfg, nil)

@ -13,7 +13,6 @@ go_library(
"copy.go",
"endpoint.go",
"error.go",
"etcd.go",
"marshal.go",
"template.go",
"version.go",
@ -22,7 +21,6 @@ go_library(
deps = [
"//cmd/kubeadm/app/apis/kubeadm:go_default_library",
"//cmd/kubeadm/app/preflight:go_default_library",
"//vendor/github.com/coreos/etcd/clientv3:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/runtime:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/runtime/schema:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/runtime/serializer:go_default_library",
@ -64,6 +62,7 @@ filegroup(
"//cmd/kubeadm/app/util/audit:all-srcs",
"//cmd/kubeadm/app/util/config:all-srcs",
"//cmd/kubeadm/app/util/dryrun:all-srcs",
"//cmd/kubeadm/app/util/etcd:all-srcs",
"//cmd/kubeadm/app/util/kubeconfig:all-srcs",
"//cmd/kubeadm/app/util/pubkeypin:all-srcs",
"//cmd/kubeadm/app/util/staticpod:all-srcs",

@ -19,6 +19,7 @@ go_library(
deps = [
"//cmd/kubeadm/app/constants:go_default_library",
"//cmd/kubeadm/app/util:go_default_library",
"//pkg/kubelet/types:go_default_library",
"//pkg/registry/core/service/ipallocator:go_default_library",
"//vendor/k8s.io/api/apps/v1:go_default_library",
"//vendor/k8s.io/api/core/v1:go_default_library",

@ -17,8 +17,6 @@ limitations under the License.
package apiclient
import (
"crypto/sha256"
"encoding/json"
"fmt"
"io"
"net/http"
@ -31,6 +29,7 @@ import (
"k8s.io/apimachinery/pkg/util/wait"
clientset "k8s.io/client-go/kubernetes"
"k8s.io/kubernetes/cmd/kubeadm/app/constants"
kubetypes "k8s.io/kubernetes/pkg/kubelet/types"
)
// Waiter is an interface for waiting for criteria in Kubernetes to happen
@ -43,11 +42,11 @@ type Waiter interface {
WaitForPodToDisappear(staticPodName string) error
// WaitForStaticPodSingleHash fetches sha256 hash for the control plane static pod
WaitForStaticPodSingleHash(nodeName string, component string) (string, error)
// WaitForStaticPodHashChange waits for the given static pod component's static pod hash to get updated.
// By doing that we can be sure that the kubelet has restarted the given Static Pod
WaitForStaticPodHashChange(nodeName, component, previousHash string) error
// WaitForStaticPodControlPlaneHashes fetches sha256 hashes for the control plane static pods
WaitForStaticPodControlPlaneHashes(nodeName string) (map[string]string, error)
// WaitForStaticPodControlPlaneHashChange waits for the given static pod component's static pod hash to get updated.
// By doing that we can be sure that the kubelet has restarted the given Static Pod
WaitForStaticPodControlPlaneHashChange(nodeName, component, previousHash string) error
// WaitForHealthyKubelet blocks until the kubelet /healthz endpoint returns 'ok'
WaitForHealthyKubelet(initalTimeout time.Duration, healthzEndpoint string) error
// SetTimeout adjusts the timeout to the specified duration
@ -194,17 +193,17 @@ func (w *KubeWaiter) WaitForStaticPodSingleHash(nodeName string, component strin
return componentPodHash, err
}
// WaitForStaticPodControlPlaneHashChange blocks until it timeouts or notices that the Mirror Pod (for the Static Pod, respectively) has changed
// WaitForStaticPodHashChange blocks until it timeouts or notices that the Mirror Pod (for the Static Pod, respectively) has changed
// This implicitly means this function blocks until the kubelet has restarted the Static Pod in question
func (w *KubeWaiter) WaitForStaticPodControlPlaneHashChange(nodeName, component, previousHash string) error {
func (w *KubeWaiter) WaitForStaticPodHashChange(nodeName, component, previousHash string) error {
return wait.PollImmediate(constants.APICallRetryInterval, w.timeout, func() (bool, error) {
hashes, err := getStaticPodControlPlaneHashes(w.client, nodeName)
hash, err := getStaticPodSingleHash(w.client, nodeName, component)
if err != nil {
return false, nil
}
// We should continue polling until the UID changes
if hashes[component] == previousHash {
if hash == previousHash {
return false, nil
}
@ -235,12 +234,9 @@ func getStaticPodSingleHash(client clientset.Interface, nodeName string, compone
return "", err
}
podBytes, err := json.Marshal(staticPod)
if err != nil {
return "", err
}
return fmt.Sprintf("%x", sha256.Sum256(podBytes)), nil
staticPodHash := staticPod.Annotations[kubetypes.ConfigHashAnnotationKey]
fmt.Printf("Static pod: %s hash: %s\n", staticPodName, staticPodHash)
return staticPodHash, nil
}
// TryRunCommand runs a function a maximum of failureThreshold times, and retries on error. If failureThreshold is hit; the last error is returned

@ -106,8 +106,7 @@ func (w *Waiter) WaitForHealthyKubelet(_ time.Duration, healthzEndpoint string)
// SetTimeout is a no-op; we don't wait in this implementation
func (w *Waiter) SetTimeout(_ time.Duration) {}
// WaitForStaticPodControlPlaneHashes returns an empty hash for all control plane images; WaitForStaticPodControlPlaneHashChange won't block in any case
// but the empty strings there are needed
// WaitForStaticPodControlPlaneHashes returns an empty hash for all control plane images;
func (w *Waiter) WaitForStaticPodControlPlaneHashes(_ string) (map[string]string, error) {
return map[string]string{
constants.KubeAPIServer: "",
@ -122,7 +121,7 @@ func (w *Waiter) WaitForStaticPodSingleHash(_ string, _ string) (string, error)
return "", nil
}
// WaitForStaticPodControlPlaneHashChange returns a dummy nil error in order for the flow to just continue as we're dryrunning
func (w *Waiter) WaitForStaticPodControlPlaneHashChange(_, _, _ string) error {
// WaitForStaticPodHashChange returns a dummy nil error in order for the flow to just continue as we're dryrunning
func (w *Waiter) WaitForStaticPodHashChange(_, _, _ string) error {
return nil
}

@ -1,51 +0,0 @@
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package util
import (
"context"
"github.com/coreos/etcd/clientv3"
"time"
)
// EtcdCluster is an interface to get etcd cluster related information
type EtcdCluster interface {
GetEtcdClusterStatus() (*clientv3.StatusResponse, error)
}
// LocalEtcdCluster represents an instance of a local etcd cluster
type LocalEtcdCluster struct{}
// GetEtcdClusterStatus returns nil for status Up or error for status Down
func (cluster LocalEtcdCluster) GetEtcdClusterStatus() (*clientv3.StatusResponse, error) {
ep := []string{"localhost:2379"}
cli, err := clientv3.New(clientv3.Config{
Endpoints: ep,
DialTimeout: 5 * time.Second,
})
if err != nil {
return nil, err
}
defer cli.Close()
resp, err := cli.Status(context.Background(), ep[0])
if err != nil {
return nil, err
}
return resp, nil
}

@ -0,0 +1,35 @@
load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")
go_library(
name = "go_default_library",
srcs = ["etcd.go"],
importpath = "k8s.io/kubernetes/cmd/kubeadm/app/util/etcd",
visibility = ["//visibility:public"],
deps = [
"//cmd/kubeadm/app/constants:go_default_library",
"//cmd/kubeadm/app/util/staticpod:go_default_library",
"//vendor/github.com/coreos/etcd/clientv3:go_default_library",
"//vendor/github.com/coreos/etcd/pkg/transport:go_default_library",
],
)
go_test(
name = "go_default_test",
srcs = ["etcd_test.go"],
embed = [":go_default_library"],
deps = ["//cmd/kubeadm/test:go_default_library"],
)
filegroup(
name = "package-srcs",
srcs = glob(["**"]),
tags = ["automanaged"],
visibility = ["//visibility:private"],
)
filegroup(
name = "all-srcs",
srcs = [":package-srcs"],
tags = ["automanaged"],
visibility = ["//visibility:public"],
)

@ -0,0 +1,169 @@
/*
Copyright 2018 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package etcd
import (
"context"
"crypto/tls"
"fmt"
"path/filepath"
"strings"
"time"
"github.com/coreos/etcd/clientv3"
"github.com/coreos/etcd/pkg/transport"
"k8s.io/kubernetes/cmd/kubeadm/app/constants"
"k8s.io/kubernetes/cmd/kubeadm/app/util/staticpod"
)
// Client is an interface to get etcd cluster related information
type Client interface {
GetStatus() (*clientv3.StatusResponse, error)
WaitForStatus(delay time.Duration, retries int, retryInterval time.Duration) (*clientv3.StatusResponse, error)
HasTLS() bool
}
// GenericClient is a common etcd client for supported etcd servers
type GenericClient struct {
Endpoints []string
TLSConfig *tls.Config
}
// HasTLS returns true if etcd is configured for TLS
func (c GenericClient) HasTLS() bool {
return c.TLSConfig != nil
}
// PodManifestsHaveTLS reads the etcd staticpod manifest from disk and returns false if the TLS flags
// are missing from the command list. If all the flags are present it returns true.
func PodManifestsHaveTLS(ManifestDir string) (bool, error) {
etcdPodPath := constants.GetStaticPodFilepath(constants.Etcd, ManifestDir)
etcdPod, err := staticpod.ReadStaticPodFromDisk(etcdPodPath)
if err != nil {
return false, fmt.Errorf("failed to check if etcd pod implements TLS: %v", err)
}
tlsFlags := []string{
"--cert-file=",
"--key-file=",
"--trusted-ca-file=",
"--client-cert-auth=",
"--peer-cert-file=",
"--peer-key-file=",
"--peer-trusted-ca-file=",
"--peer-client-cert-auth=",
}
FlagLoop:
for _, flag := range tlsFlags {
for _, container := range etcdPod.Spec.Containers {
for _, arg := range container.Command {
if strings.Contains(arg, flag) {
continue FlagLoop
}
}
}
// flag not found in any container
return false, nil
}
// all flags were found in container args; pod fully implements TLS
return true, nil
}
// GetStatus gets server status
func (c GenericClient) GetStatus() (*clientv3.StatusResponse, error) {
const dialTimeout = 5 * time.Second
cli, err := clientv3.New(clientv3.Config{
Endpoints: c.Endpoints,
DialTimeout: dialTimeout,
TLS: c.TLSConfig,
})
if err != nil {
return nil, err
}
defer cli.Close()
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
resp, err := cli.Status(ctx, c.Endpoints[0])
cancel()
if err != nil {
return nil, err
}
return resp, nil
}
// WaitForStatus returns a StatusResponse after an initial delay and retry attempts
func (c GenericClient) WaitForStatus(delay time.Duration, retries int, retryInterval time.Duration) (*clientv3.StatusResponse, error) {
fmt.Printf("[util/etcd] Waiting %v for initial delay\n", delay)
time.Sleep(delay)
for i := 0; i < retries; i++ {
if i > 0 {
fmt.Printf("[util/etcd] Waiting %v until next retry\n", retryInterval)
time.Sleep(retryInterval)
}
fmt.Printf("[util/etcd] Attempting to get etcd status %d/%d\n", i+1, retries)
resp, err := c.GetStatus()
if err != nil {
switch err {
case context.DeadlineExceeded:
fmt.Println("[util/etcd] Attempt timed out")
default:
fmt.Printf("[util/etcd] Attempt failed with error: %v\n", err)
}
continue
}
return resp, nil
}
return nil, fmt.Errorf("timeout waiting for etcd cluster status")
}
// NewClient creates a new EtcdCluster client
func NewClient(endpoints []string, caFile string, certFile string, keyFile string) (*GenericClient, error) {
client := GenericClient{Endpoints: endpoints}
if caFile != "" || certFile != "" || keyFile != "" {
tlsInfo := transport.TLSInfo{
CertFile: certFile,
KeyFile: keyFile,
TrustedCAFile: caFile,
}
tlsConfig, err := tlsInfo.ClientConfig()
if err != nil {
return nil, err
}
client.TLSConfig = tlsConfig
}
return &client, nil
}
// NewStaticPodClient creates a GenericClient from the given endpoints, manifestDir, and certificatesDir
func NewStaticPodClient(endpoints []string, manifestDir string, certificatesDir string) (*GenericClient, error) {
hasTLS, err := PodManifestsHaveTLS(manifestDir)
if err != nil {
return nil, fmt.Errorf("could not read manifests from: %s, error: %v", manifestDir, err)
}
if hasTLS {
return NewClient(
endpoints,
filepath.Join(certificatesDir, constants.EtcdCACertName),
filepath.Join(certificatesDir, constants.EtcdHealthcheckClientCertName),
filepath.Join(certificatesDir, constants.EtcdHealthcheckClientKeyName),
)
}
return NewClient(endpoints, "", "", "")
}

@ -0,0 +1,197 @@
/*
Copyright 2018 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package etcd
import (
"io/ioutil"
"os"
"path/filepath"
"testing"
testutil "k8s.io/kubernetes/cmd/kubeadm/test"
)
const (
secureEtcdPod = `# generated by kubeadm v1.10.0
apiVersion: v1
kind: Pod
metadata:
annotations:
scheduler.alpha.kubernetes.io/critical-pod: ""
creationTimestamp: null
labels:
component: etcd
tier: control-plane
name: etcd
namespace: kube-system
spec:
containers:
- command:
- etcd
- --advertise-client-urls=https://127.0.0.1:2379
- --data-dir=/var/lib/etcd
- --peer-key-file=/etc/kubernetes/pki/etcd/peer.key
- --peer-trusted-ca-file=/etc/kubernetes/pki/etcd/ca.crt
- --listen-client-urls=https://127.0.0.1:2379
- --peer-client-cert-auth=true
- --cert-file=/etc/kubernetes/pki/etcd/server.crt
- --key-file=/etc/kubernetes/pki/etcd/server.key
- --trusted-ca-file=/etc/kubernetes/pki/etcd/ca.crt
- --peer-cert-file=/etc/kubernetes/pki/etcd/peer.crt
- --client-cert-auth=true
image: k8s.gcr.io/etcd-amd64:3.1.12
livenessProbe:
exec:
command:
- /bin/sh
- -ec
- ETCDCTL_API=3 etcdctl --endpoints=127.0.0.1:2379 --cacert=/etc/kubernetes/pki/etcd/ca.crt
--cert=/etc/kubernetes/pki/etcd/healthcheck-client.crt --key=/etc/kubernetes/pki/etcd/healthcheck-client.key
get foo
failureThreshold: 8
initialDelaySeconds: 15
timeoutSeconds: 15
name: etcd
resources: {}
volumeMounts:
- mountPath: /var/lib/etcd
name: etcd-data
- mountPath: /etc/kubernetes/pki/etcd
name: etcd-certs
hostNetwork: true
volumes:
- hostPath:
path: /var/lib/etcd
type: DirectoryOrCreate
name: etcd-data
- hostPath:
path: /etc/kubernetes/pki/etcd
type: DirectoryOrCreate
name: etcd-certs
status: {}
`
insecureEtcdPod = `# generated by kubeadm v1.9.6
apiVersion: v1
kind: Pod
metadata:
annotations:
scheduler.alpha.kubernetes.io/critical-pod: ""
creationTimestamp: null
labels:
component: etcd
tier: control-plane
name: etcd
namespace: kube-system
spec:
containers:
- command:
- etcd
- --listen-client-urls=http://127.0.0.1:2379
- --advertise-client-urls=http://127.0.0.1:2379
- --data-dir=/var/lib/etcd
image: gcr.io/google_containers/etcd-amd64:3.1.11
livenessProbe:
failureThreshold: 8
httpGet:
host: 127.0.0.1
path: /health
port: 2379
scheme: HTTP
initialDelaySeconds: 15
timeoutSeconds: 15
name: etcd
resources: {}
volumeMounts:
- mountPath: /var/lib/etcd
name: etcd
hostNetwork: true
volumes:
- hostPath:
path: /var/lib/etcd
type: DirectoryOrCreate
name: etcd
status: {}
`
invalidPod = `---{ broken yaml @@@`
)
func TestPodManifestHasTLS(t *testing.T) {
tests := []struct {
description string
podYaml string
hasTLS bool
expectErr bool
writeManifest bool
}{
{
description: "secure etcd returns true",
podYaml: secureEtcdPod,
hasTLS: true,
writeManifest: true,
expectErr: false,
},
{
description: "insecure etcd returns false",
podYaml: insecureEtcdPod,
hasTLS: false,
writeManifest: true,
expectErr: false,
},
{
description: "invalid pod fails to unmarshal",
podYaml: invalidPod,
hasTLS: false,
writeManifest: true,
expectErr: true,
},
{
description: "non-existent file returns error",
podYaml: ``,
hasTLS: false,
writeManifest: false,
expectErr: true,
},
}
for _, rt := range tests {
tmpdir := testutil.SetupTempDir(t)
defer os.RemoveAll(tmpdir)
manifestPath := filepath.Join(tmpdir, "etcd.yaml")
if rt.writeManifest {
err := ioutil.WriteFile(manifestPath, []byte(rt.podYaml), 0644)
if err != nil {
t.Fatalf("Failed to write pod manifest\n%s\n\tfatal error: %v", rt.description, err)
}
}
hasTLS, actualErr := PodManifestsHaveTLS(tmpdir)
if (actualErr != nil) != rt.expectErr {
t.Errorf(
"PodManifestHasTLS failed\n%s\n\texpected error: %t\n\tgot: %t\n\tactual error: %v",
rt.description,
rt.expectErr,
(actualErr != nil),
actualErr,
)
}
if hasTLS != rt.hasTLS {
t.Errorf("PodManifestHasTLS failed\n%s\n\texpected hasTLS: %t\n\tgot: %t", rt.description, rt.hasTLS, hasTLS)
}
}
}

@ -41,3 +41,20 @@ func MarshalToYamlForCodecs(obj runtime.Object, gv schema.GroupVersion, codecs s
encoder := codecs.EncoderForVersion(info.Serializer, gv)
return runtime.Encode(encoder, obj)
}
// UnmarshalFromYaml unmarshals yaml into an object.
func UnmarshalFromYaml(buffer []byte, gv schema.GroupVersion) (runtime.Object, error) {
return UnmarshalFromYamlForCodecs(buffer, gv, clientsetscheme.Codecs)
}
// UnmarshalFromYamlForCodecs unmarshals yaml into an object using the specified codec
func UnmarshalFromYamlForCodecs(buffer []byte, gv schema.GroupVersion, codecs serializer.CodecFactory) (runtime.Object, error) {
mediaType := "application/yaml"
info, ok := runtime.SerializerInfoForMediaType(codecs.SupportedMediaTypes(), mediaType)
if !ok {
return nil, fmt.Errorf("unsupported media type %q", mediaType)
}
decoder := codecs.DecoderToVersion(info.Serializer, gv)
return runtime.Decode(decoder, buffer)
}

@ -14,6 +14,7 @@ go_test(
"//cmd/kubeadm/app/apis/kubeadm:go_default_library",
"//cmd/kubeadm/app/constants:go_default_library",
"//cmd/kubeadm/app/features:go_default_library",
"//cmd/kubeadm/test:go_default_library",
"//vendor/k8s.io/api/core/v1:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/util/intstr:go_default_library",

@ -198,6 +198,23 @@ func WriteStaticPodToDisk(componentName, manifestDir string, pod v1.Pod) error {
return nil
}
// ReadStaticPodFromDisk reads a static pod file from disk
func ReadStaticPodFromDisk(manifestPath string) (*v1.Pod, error) {
buf, err := ioutil.ReadFile(manifestPath)
if err != nil {
return &v1.Pod{}, fmt.Errorf("failed to read manifest for %q: %v", manifestPath, err)
}
obj, err := util.UnmarshalFromYaml(buf, v1.SchemeGroupVersion)
if err != nil {
return &v1.Pod{}, fmt.Errorf("failed to unmarshal manifest for %q from YAML: %v", manifestPath, err)
}
pod := obj.(*v1.Pod)
return pod, nil
}
// GetProbeAddress returns an IP address or 127.0.0.1 to use for liveness probes
// in static pod manifests.
func GetProbeAddress(cfg *kubeadmapi.MasterConfiguration, componentName string) string {

@ -17,6 +17,9 @@ limitations under the License.
package staticpod
import (
"io/ioutil"
"os"
"path/filepath"
"reflect"
"sort"
"testing"
@ -28,6 +31,7 @@ import (
kubeadmapi "k8s.io/kubernetes/cmd/kubeadm/app/apis/kubeadm"
kubeadmconstants "k8s.io/kubernetes/cmd/kubeadm/app/constants"
testutil "k8s.io/kubernetes/cmd/kubeadm/test"
)
func TestComponentResources(t *testing.T) {
@ -454,3 +458,73 @@ func TestGetExtraParameters(t *testing.T) {
}
}
}
const (
validPod = `
apiVersion: v1
kind: Pod
metadata:
labels:
component: etcd
tier: control-plane
name: etcd
namespace: kube-system
spec:
containers:
- image: gcr.io/google_containers/etcd-amd64:3.1.11
status: {}
`
invalidPod = `---{ broken yaml @@@`
)
func TestReadStaticPodFromDisk(t *testing.T) {
tests := []struct {
description string
podYaml string
expectErr bool
writeManifest bool
}{
{
description: "valid pod is marshaled",
podYaml: validPod,
writeManifest: true,
expectErr: false,
},
{
description: "invalid pod fails to unmarshal",
podYaml: invalidPod,
writeManifest: true,
expectErr: true,
},
{
description: "non-existent file returns error",
podYaml: ``,
writeManifest: false,
expectErr: true,
},
}
for _, rt := range tests {
tmpdir := testutil.SetupTempDir(t)
defer os.RemoveAll(tmpdir)
manifestPath := filepath.Join(tmpdir, "pod.yaml")
if rt.writeManifest {
err := ioutil.WriteFile(manifestPath, []byte(rt.podYaml), 0644)
if err != nil {
t.Fatalf("Failed to write pod manifest\n%s\n\tfatal error: %v", rt.description, err)
}
}
_, actualErr := ReadStaticPodFromDisk(manifestPath)
if (actualErr != nil) != rt.expectErr {
t.Errorf(
"ReadStaticPodFromDisk failed\n%s\n\texpected error: %t\n\tgot: %t\n\tactual error: %v",
rt.description,
rt.expectErr,
(actualErr != nil),
actualErr,
)
}
}
}