Merge pull request #55010 from sbezverk/kubeadm_etcd_upgrade_apply

Automatic merge from submit-queue (batch tested with PRs 51192, 55010). If you want to cherry-pick this change to another branch, please follow the instructions <a href="https://github.com/kubernetes/community/blob/master/contributors/devel/cherry-picks.md">here</a>.

Adding etcd upgrade option to kubeadm upgrade apply 

This PR adds etcd upgrade functionality to kubeadm upgrade apply.
First commit adds certain functions to be able to deal with a single component of control plane and not just with all three components (apiserver, controller-manager and scheduler). It adds granularity as a result code can be reused. 

Closes: https://github.com/kubernetes/kubeadm/issues/490

```release-note
Adds to **kubeadm upgrade apply**, a new **--etcd-upgrade** keyword. When this keyword is specified, etcd's static pod gets upgraded to the etcd version officially recommended for a target kubernetes release.
```
This commit is contained in:
Kubernetes Submit Queue 2017-11-19 05:22:26 -08:00 committed by GitHub
commit f0ce7ca051
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
14 changed files with 408 additions and 65 deletions

View File

@ -46,6 +46,7 @@ type applyFlags struct {
nonInteractiveMode bool
force bool
dryRun bool
etcdUpgrade bool
newK8sVersionStr string
newK8sVersion *version.Version
imagePullTimeout time.Duration
@ -62,6 +63,7 @@ func NewCmdApply(parentFlags *cmdUpgradeFlags) *cobra.Command {
flags := &applyFlags{
parent: parentFlags,
imagePullTimeout: 15 * time.Minute,
etcdUpgrade: false,
}
cmd := &cobra.Command{
@ -91,6 +93,7 @@ func NewCmdApply(parentFlags *cmdUpgradeFlags) *cobra.Command {
cmd.Flags().BoolVarP(&flags.nonInteractiveMode, "yes", "y", flags.nonInteractiveMode, "Perform the upgrade and do not prompt for confirmation (non-interactive mode).")
cmd.Flags().BoolVarP(&flags.force, "force", "f", flags.force, "Force upgrading although some requirements might not be met. This also implies non-interactive mode.")
cmd.Flags().BoolVar(&flags.dryRun, "dry-run", flags.dryRun, "Do not change any state, just output what actions would be performed.")
cmd.Flags().BoolVar(&flags.etcdUpgrade, "etcd-upgrade", flags.etcdUpgrade, "Perform the upgrade of etcd.")
cmd.Flags().DurationVar(&flags.imagePullTimeout, "image-pull-timeout", flags.imagePullTimeout, "The maximum amount of time to wait for the control plane pods to be downloaded.")
return cmd
@ -231,17 +234,18 @@ func PerformControlPlaneUpgrade(flags *applyFlags, client clientset.Interface, w
if flags.dryRun {
return DryRunStaticPodUpgrade(internalcfg)
}
return PerformStaticPodUpgrade(client, waiter, internalcfg)
return PerformStaticPodUpgrade(client, waiter, internalcfg, flags.etcdUpgrade)
}
// PerformStaticPodUpgrade performs the upgrade of the control plane components for a static pod hosted cluster
func PerformStaticPodUpgrade(client clientset.Interface, waiter apiclient.Waiter, internalcfg *kubeadmapi.MasterConfiguration) error {
func PerformStaticPodUpgrade(client clientset.Interface, waiter apiclient.Waiter, internalcfg *kubeadmapi.MasterConfiguration, etcdUpgrade bool) error {
pathManager, err := upgrade.NewKubeStaticPodPathManagerUsingTempDirs(constants.GetStaticPodDirectory())
if err != nil {
return err
}
return upgrade.StaticPodControlPlane(waiter, pathManager, internalcfg)
return upgrade.StaticPodControlPlane(waiter, pathManager, internalcfg, etcdUpgrade)
}
// DryRunStaticPodUpgrade fakes an upgrade of the control plane

View File

@ -35,4 +35,5 @@ go_test(
srcs = ["constants_test.go"],
importpath = "k8s.io/kubernetes/cmd/kubeadm/app/constants",
library = ":go_default_library",
deps = ["//pkg/util/version:go_default_library"],
)

View File

@ -223,8 +223,32 @@ var (
// MinimumKubeletVersion specifies the minimum version of kubelet which kubeadm supports
MinimumKubeletVersion = version.MustParseSemantic("v1.8.0")
// SupportedEtcdVersion lists officially supported etcd versions with corresponding kubernetes releases
SupportedEtcdVersion = map[uint8]string{
8: "3.0.17",
9: "3.1.10",
}
)
// EtcdSupportedVersion returns officially supported version of etcd for a specific kubernetes release
// if passed version is not listed, the function returns nil and an error
func EtcdSupportedVersion(versionString string) (*version.Version, error) {
kubernetesVersion, err := version.ParseSemantic(versionString)
if err != nil {
return nil, err
}
if etcdStringVersion, ok := SupportedEtcdVersion[uint8(kubernetesVersion.Minor())]; ok {
etcdVersion, err := version.ParseSemantic(etcdStringVersion)
if err != nil {
return nil, err
}
return etcdVersion, nil
}
return nil, fmt.Errorf("Unsupported or unknown kubernetes version")
}
// GetStaticPodDirectory returns the location on the disk where the Static Pod should be present
func GetStaticPodDirectory() string {
return filepath.Join(KubernetesDir, ManifestsSubDirName)

View File

@ -17,6 +17,9 @@ limitations under the License.
package constants
import (
"fmt"
"k8s.io/kubernetes/pkg/util/version"
"strings"
"testing"
)
@ -110,3 +113,58 @@ func TestAddSelfHostedPrefix(t *testing.T) {
}
}
}
func TestEtcdSupportedVersion(t *testing.T) {
var tests = []struct {
kubernetesVersion string
expectedVersion *version.Version
expectedError error
}{
{
kubernetesVersion: "1.8.0",
expectedVersion: version.MustParseSemantic("3.0.17"),
expectedError: nil,
},
{
kubernetesVersion: "1.80.0",
expectedVersion: nil,
expectedError: fmt.Errorf("Unsupported or unknown kubernetes version"),
},
{
kubernetesVersion: "1.9.0",
expectedVersion: version.MustParseSemantic("3.1.10"),
expectedError: nil,
},
{
kubernetesVersion: "1.10.0",
expectedVersion: nil,
expectedError: fmt.Errorf("Unsupported or unknown kubernetes version"),
},
{
kubernetesVersion: "1.8.6",
expectedVersion: version.MustParseSemantic("3.0.17"),
expectedError: nil,
},
}
for _, rt := range tests {
actualVersion, actualError := EtcdSupportedVersion(rt.kubernetesVersion)
if actualError != nil {
if actualError.Error() != rt.expectedError.Error() {
t.Errorf(
"failed EtcdSupportedVersion:\n\texpected error: %v\n\t actual error: %v",
rt.expectedError,
actualError,
)
}
} else {
if strings.Compare(actualVersion.String(), rt.expectedVersion.String()) != 0 {
t.Errorf(
"failed EtcdSupportedVersion:\n\texpected version: %s\n\t actual version: %s",
rt.expectedVersion.String(),
actualVersion.String(),
)
}
}
}
}

View File

@ -30,8 +30,13 @@ func GetCoreImage(image, repoPrefix, k8sVersion, overrideImage string) string {
return overrideImage
}
kubernetesImageTag := kubeadmutil.KubernetesVersionToImageTag(k8sVersion)
etcdImageTag := constants.DefaultEtcdVersion
etcdImageVersion, err := constants.EtcdSupportedVersion(k8sVersion)
if err == nil {
etcdImageTag = etcdImageVersion.String()
}
return map[string]string{
constants.Etcd: fmt.Sprintf("%s/%s-%s:%s", repoPrefix, "etcd", runtime.GOARCH, constants.DefaultEtcdVersion),
constants.Etcd: fmt.Sprintf("%s/%s-%s:%s", repoPrefix, "etcd", runtime.GOARCH, etcdImageTag),
constants.KubeAPIServer: fmt.Sprintf("%s/%s-%s:%s", repoPrefix, "kube-apiserver", runtime.GOARCH, kubernetesImageTag),
constants.KubeControllerManager: fmt.Sprintf("%s/%s-%s:%s", repoPrefix, "kube-controller-manager", runtime.GOARCH, kubernetesImageTag),
constants.KubeScheduler: fmt.Sprintf("%s/%s-%s:%s", repoPrefix, "kube-scheduler", runtime.GOARCH, kubernetesImageTag),

View File

@ -36,7 +36,6 @@ func CreateLocalEtcdStaticPodManifestFile(manifestDir string, cfg *kubeadmapi.Ma
// gets etcd StaticPodSpec, actualized for the current MasterConfiguration
spec := GetEtcdPodSpec(cfg)
// writes etcd StaticPod to disk
if err := staticpodutil.WriteStaticPodToDisk(kubeadmconstants.Etcd, manifestDir, spec); err != nil {
return err
@ -56,7 +55,7 @@ func GetEtcdPodSpec(cfg *kubeadmapi.MasterConfiguration) v1.Pod {
return staticpodutil.ComponentPod(v1.Container{
Name: kubeadmconstants.Etcd,
Command: getEtcdCommand(cfg),
Image: images.GetCoreImage(kubeadmconstants.Etcd, cfg.ImageRepository, "", cfg.Etcd.Image),
Image: images.GetCoreImage(kubeadmconstants.Etcd, cfg.ImageRepository, cfg.KubernetesVersion, cfg.Etcd.Image),
// Mount the etcd datadir path read-write so etcd can store data in a more persistent manner
VolumeMounts: []v1.VolumeMount{staticpodutil.NewVolumeMount(etcdVolumeName, cfg.Etcd.DataDir, false)},
LivenessProbe: staticpodutil.ComponentProbe(cfg, kubeadmconstants.Etcd, 2379, "/health", v1.URISchemeHTTP),

View File

@ -26,6 +26,7 @@ go_library(
"//cmd/kubeadm/app/phases/bootstraptoken/clusterinfo:go_default_library",
"//cmd/kubeadm/app/phases/bootstraptoken/node:go_default_library",
"//cmd/kubeadm/app/phases/controlplane:go_default_library",
"//cmd/kubeadm/app/phases/etcd:go_default_library",
"//cmd/kubeadm/app/phases/selfhosting:go_default_library",
"//cmd/kubeadm/app/phases/uploadconfig:go_default_library",
"//cmd/kubeadm/app/util:go_default_library",
@ -73,6 +74,7 @@ go_test(
"//cmd/kubeadm/app/apis/kubeadm/v1alpha1:go_default_library",
"//cmd/kubeadm/app/constants:go_default_library",
"//cmd/kubeadm/app/phases/controlplane:go_default_library",
"//cmd/kubeadm/app/phases/etcd:go_default_library",
"//cmd/kubeadm/app/util/apiclient:go_default_library",
"//pkg/api/legacyscheme:go_default_library",
"//pkg/util/version:go_default_library",

View File

@ -19,11 +19,15 @@ package upgrade
import (
"fmt"
"os"
"strings"
kubeadmapi "k8s.io/kubernetes/cmd/kubeadm/app/apis/kubeadm"
"k8s.io/kubernetes/cmd/kubeadm/app/constants"
"k8s.io/kubernetes/cmd/kubeadm/app/phases/controlplane"
etcdphase "k8s.io/kubernetes/cmd/kubeadm/app/phases/etcd"
"k8s.io/kubernetes/cmd/kubeadm/app/util"
"k8s.io/kubernetes/cmd/kubeadm/app/util/apiclient"
"k8s.io/kubernetes/pkg/util/version"
)
// StaticPodPathManager is responsible for tracking the directories used in the static pod upgrade transition
@ -42,6 +46,8 @@ type StaticPodPathManager interface {
BackupManifestPath(component string) string
// BackupManifestDir should point to the backup directory used for backuping manifests during the transition
BackupManifestDir() string
// BackupEtcdDir should point to the backup directory used for backuping manifests during the transition
BackupEtcdDir() string
}
// KubeStaticPodPathManager is a real implementation of StaticPodPathManager that is used when upgrading a static pod cluster
@ -49,14 +55,16 @@ type KubeStaticPodPathManager struct {
realManifestDir string
tempManifestDir string
backupManifestDir string
backupEtcdDir string
}
// NewKubeStaticPodPathManager creates a new instance of KubeStaticPodPathManager
func NewKubeStaticPodPathManager(realDir, tempDir, backupDir string) StaticPodPathManager {
func NewKubeStaticPodPathManager(realDir, tempDir, backupDir, backupEtcdDir string) StaticPodPathManager {
return &KubeStaticPodPathManager{
realManifestDir: realDir,
tempManifestDir: tempDir,
backupManifestDir: backupDir,
backupEtcdDir: backupEtcdDir,
}
}
@ -70,8 +78,12 @@ func NewKubeStaticPodPathManagerUsingTempDirs(realManifestDir string) (StaticPod
if err != nil {
return nil, err
}
backupEtcdDir, err := constants.CreateTempDirForKubeadm("kubeadm-backup-etcd")
if err != nil {
return nil, err
}
return NewKubeStaticPodPathManager(realManifestDir, upgradedManifestsDir, backupManifestsDir), nil
return NewKubeStaticPodPathManager(realManifestDir, upgradedManifestsDir, backupManifestsDir, backupEtcdDir), nil
}
// MoveFile should move a file from oldPath to newPath
@ -109,13 +121,131 @@ func (spm *KubeStaticPodPathManager) BackupManifestDir() string {
return spm.backupManifestDir
}
// StaticPodControlPlane upgrades a static pod-hosted control plane
func StaticPodControlPlane(waiter apiclient.Waiter, pathMgr StaticPodPathManager, cfg *kubeadmapi.MasterConfiguration) error {
// BackupEtcdDir should point to the backup directory used for backuping manifests during the transition
func (spm *KubeStaticPodPathManager) BackupEtcdDir() string {
return spm.backupEtcdDir
}
// This string-string map stores the component name and backup filepath (if a rollback is needed).
// If a rollback is needed,
func upgradeComponent(component string, waiter apiclient.Waiter, pathMgr StaticPodPathManager, cfg *kubeadmapi.MasterConfiguration, beforePodHash string, recoverManifests map[string]string) error {
// The old manifest is here; in the /etc/kubernetes/manifests/
currentManifestPath := pathMgr.RealManifestPath(component)
// The new, upgraded manifest will be written here
newManifestPath := pathMgr.TempManifestPath(component)
// The old manifest will be moved here; into a subfolder of the temporary directory
// If a rollback is needed, these manifests will be put back to where they where initially
backupManifestPath := pathMgr.BackupManifestPath(component)
// Store the backup path in the recover list. If something goes wrong now, this component will be rolled back.
recoverManifests[component] = backupManifestPath
// Move the old manifest into the old-manifests directory
if err := pathMgr.MoveFile(currentManifestPath, backupManifestPath); err != nil {
return rollbackOldManifests(recoverManifests, err, pathMgr)
}
// Move the new manifest into the manifests directory
if err := pathMgr.MoveFile(newManifestPath, currentManifestPath); err != nil {
return rollbackOldManifests(recoverManifests, err, pathMgr)
}
fmt.Printf("[upgrade/staticpods] Moved upgraded manifest to %q and backed up old manifest to %q\n", currentManifestPath, backupManifestPath)
fmt.Println("[upgrade/staticpods] Waiting for the kubelet to restart the component")
// Wait for the mirror Pod hash to change; otherwise we'll run into race conditions here when the kubelet hasn't had time to
// notice the removal of the Static Pod, leading to a false positive below where we check that the API endpoint is healthy
// If we don't do this, there is a case where we remove the Static Pod manifest, kubelet is slow to react, kubeadm checks the
// API endpoint below of the OLD Static Pod component and proceeds quickly enough, which might lead to unexpected results.
if err := waiter.WaitForStaticPodControlPlaneHashChange(cfg.NodeName, component, beforePodHash); err != nil {
return rollbackOldManifests(recoverManifests, err, pathMgr)
}
// Wait for the static pod component to come up and register itself as a mirror pod
if err := waiter.WaitForPodsWithLabel("component=" + component); err != nil {
return rollbackOldManifests(recoverManifests, err, pathMgr)
}
fmt.Printf("[upgrade/staticpods] Component %q upgraded successfully!\n", component)
return nil
}
// performEtcdStaticPodUpgrade performs upgrade of etcd, it returns bool which indicates fatal error or not and the actual error.
func performEtcdStaticPodUpgrade(waiter apiclient.Waiter, pathMgr StaticPodPathManager, cfg *kubeadmapi.MasterConfiguration, recoverManifests map[string]string) (bool, error) {
// Add etcd static pod spec only if external etcd is not configured
if len(cfg.Etcd.Endpoints) != 0 {
return false, fmt.Errorf("external etcd cannot be upgraded with kubeadm")
}
// Checking health state of etcd before proceeding with the upgrtade
etcdStatus, err := util.GetEtcdClusterStatus()
if err != nil {
return true, fmt.Errorf("etcd cluster is not healthy: %v", err)
}
// Backing up etcd data store
backupEtcdDir := pathMgr.BackupEtcdDir()
runningEtcdDir := cfg.Etcd.DataDir
if err := util.CopyDir(runningEtcdDir, backupEtcdDir); err != nil {
return true, fmt.Errorf("fail to back up etcd data with %v", err)
}
// Need to check currently used version and version from constants, if differs then upgrade
desiredEtcdVersion, err := constants.EtcdSupportedVersion(cfg.KubernetesVersion)
if err != nil {
return true, fmt.Errorf("failed to parse the desired etcd version(%s): %v", desiredEtcdVersion.String(), err)
}
currentEtcdVersion, err := version.ParseSemantic(etcdStatus.Version)
if err != nil {
return true, fmt.Errorf("failed to parse the current etcd version(%s): %v", currentEtcdVersion.String(), err)
}
// Comparing current etcd version with desired to catch the same version or downgrade condition and fail on them.
if desiredEtcdVersion.LessThan(currentEtcdVersion) {
return true, fmt.Errorf("the requested etcd version (%s) for Kubernetes v(%s) is lower than the currently running version (%s)", desiredEtcdVersion.String(), cfg.KubernetesVersion, currentEtcdVersion.String())
}
// For the case when desired etcd version is the same as current etcd version
if strings.Compare(desiredEtcdVersion.String(), currentEtcdVersion.String()) == 0 {
return false, nil
}
beforeEtcdPodHash, err := waiter.WaitForStaticPodSingleHash(cfg.NodeName, constants.Etcd)
if err != nil {
return true, fmt.Errorf("fail to get etcd pod's hash: %v", err)
}
// Write the updated etcd static Pod manifest into the temporary directory
if err := etcdphase.CreateLocalEtcdStaticPodManifestFile(pathMgr.TempManifestDir(), cfg); err != nil {
return true, rollbackEtcdData(cfg, fmt.Errorf("error creating local etcd static pod manifest file: %v", err), pathMgr)
}
// Perform etcd upgrade using common to all control plane components function
if err := upgradeComponent(constants.Etcd, waiter, pathMgr, cfg, beforeEtcdPodHash, recoverManifests); err != nil {
return true, rollbackEtcdData(cfg, err, pathMgr)
}
// Checking health state of etcd after the upgrade
etcdStatus, err = util.GetEtcdClusterStatus()
if err != nil {
return true, rollbackEtcdData(cfg, fmt.Errorf("etcd cluster is not healthy after upgrade: %v rolling back", err), pathMgr)
}
return false, nil
}
// StaticPodControlPlane upgrades a static pod-hosted control plane
func StaticPodControlPlane(waiter apiclient.Waiter, pathMgr StaticPodPathManager, cfg *kubeadmapi.MasterConfiguration, etcdUpgrade bool) error {
recoverManifests := map[string]string{}
// etcd upgrade is done prior to other control plane components
if etcdUpgrade {
// Perform etcd upgrade using common to all control plane components function
fatal, err := performEtcdStaticPodUpgrade(waiter, pathMgr, cfg, recoverManifests)
if err != nil {
if fatal {
return err
}
fmt.Printf("[etcd] non fatal issue encountered during upgrade: %v\n", err)
}
}
beforePodHashMap, err := waiter.WaitForStaticPodControlPlaneHashes(cfg.NodeName)
if err != nil {
return err
@ -127,51 +257,19 @@ func StaticPodControlPlane(waiter apiclient.Waiter, pathMgr StaticPodPathManager
if err != nil {
return fmt.Errorf("error creating init static pod manifest files: %v", err)
}
for _, component := range constants.MasterComponents {
// The old manifest is here; in the /etc/kubernetes/manifests/
currentManifestPath := pathMgr.RealManifestPath(component)
// The new, upgraded manifest will be written here
newManifestPath := pathMgr.TempManifestPath(component)
// The old manifest will be moved here; into a subfolder of the temporary directory
// If a rollback is needed, these manifests will be put back to where they where initially
backupManifestPath := pathMgr.BackupManifestPath(component)
// Store the backup path in the recover list. If something goes wrong now, this component will be rolled back.
recoverManifests[component] = backupManifestPath
// Move the old manifest into the old-manifests directory
if err := pathMgr.MoveFile(currentManifestPath, backupManifestPath); err != nil {
return rollbackOldManifests(recoverManifests, err, pathMgr)
if err = upgradeComponent(component, waiter, pathMgr, cfg, beforePodHashMap[component], recoverManifests); err != nil {
return err
}
// Move the new manifest into the manifests directory
if err := pathMgr.MoveFile(newManifestPath, currentManifestPath); err != nil {
return rollbackOldManifests(recoverManifests, err, pathMgr)
}
fmt.Printf("[upgrade/staticpods] Moved upgraded manifest to %q and backed up old manifest to %q\n", currentManifestPath, backupManifestPath)
fmt.Println("[upgrade/staticpods] Waiting for the kubelet to restart the component")
// Wait for the mirror Pod hash to change; otherwise we'll run into race conditions here when the kubelet hasn't had time to
// notice the removal of the Static Pod, leading to a false positive below where we check that the API endpoint is healthy
// If we don't do this, there is a case where we remove the Static Pod manifest, kubelet is slow to react, kubeadm checks the
// API endpoint below of the OLD Static Pod component and proceeds quickly enough, which might lead to unexpected results.
if err := waiter.WaitForStaticPodControlPlaneHashChange(cfg.NodeName, component, beforePodHashMap[component]); err != nil {
return rollbackOldManifests(recoverManifests, err, pathMgr)
}
// Wait for the static pod component to come up and register itself as a mirror pod
if err := waiter.WaitForPodsWithLabel("component=" + component); err != nil {
return rollbackOldManifests(recoverManifests, err, pathMgr)
}
fmt.Printf("[upgrade/staticpods] Component %q upgraded successfully!\n", component)
}
// Remove the temporary directories used on a best-effort (don't fail if the calls error out)
// The calls are set here by design; we should _not_ use "defer" above as that would remove the directories
// even in the "fail and rollback" case, where we want the directories preserved for the user.
os.RemoveAll(pathMgr.TempManifestDir())
os.RemoveAll(pathMgr.BackupManifestDir())
os.RemoveAll(pathMgr.BackupEtcdDir())
return nil
}
@ -192,3 +290,18 @@ func rollbackOldManifests(oldManifests map[string]string, origErr error, pathMgr
// Let the user know there we're problems, but we tried to reçover
return fmt.Errorf("couldn't upgrade control plane. kubeadm has tried to recover everything into the earlier state. Errors faced: %v", errs)
}
// rollbackEtcdData rolls back the the content of etcd folder if something went wrong
func rollbackEtcdData(cfg *kubeadmapi.MasterConfiguration, origErr error, pathMgr StaticPodPathManager) error {
errs := []error{origErr}
backupEtcdDir := pathMgr.BackupEtcdDir()
runningEtcdDir := cfg.Etcd.DataDir
err := util.CopyDir(backupEtcdDir, runningEtcdDir)
if err != nil {
errs = append(errs, err)
}
// Let the user know there we're problems, but we tried to reçover
return fmt.Errorf("couldn't recover etcd database with error: %v, the location of etcd backup: %s ", errs, backupEtcdDir)
}

View File

@ -30,6 +30,7 @@ import (
kubeadmapiext "k8s.io/kubernetes/cmd/kubeadm/app/apis/kubeadm/v1alpha1"
"k8s.io/kubernetes/cmd/kubeadm/app/constants"
"k8s.io/kubernetes/cmd/kubeadm/app/phases/controlplane"
etcdphase "k8s.io/kubernetes/cmd/kubeadm/app/phases/etcd"
"k8s.io/kubernetes/cmd/kubeadm/app/util/apiclient"
"k8s.io/kubernetes/pkg/api/legacyscheme"
)
@ -108,6 +109,11 @@ func (w *fakeWaiter) WaitForStaticPodControlPlaneHashes(_ string) (map[string]st
return map[string]string{}, w.errsToReturn[waitForHashes]
}
// WaitForStaticPodSingleHash returns an error if set from errsToReturn
func (w *fakeWaiter) WaitForStaticPodSingleHash(_ string, _ string) (string, error) {
return "", w.errsToReturn[waitForHashes]
}
// WaitForStaticPodControlPlaneHashChange returns an error if set from errsToReturn
func (w *fakeWaiter) WaitForStaticPodControlPlaneHashChange(_, _, _ string) error {
return w.errsToReturn[waitForHashChange]
@ -122,6 +128,7 @@ type fakeStaticPodPathManager struct {
realManifestDir string
tempManifestDir string
backupManifestDir string
backupEtcdDir string
MoveFileFunc func(string, string) error
}
@ -140,11 +147,16 @@ func NewFakeStaticPodPathManager(moveFileFunc func(string, string) error) (Stati
if err != nil {
return nil, fmt.Errorf("couldn't create a temporary directory for the upgrade: %v", err)
}
backupEtcdDir, err := ioutil.TempDir("", "kubeadm-backup-etcd")
if err != nil {
return nil, err
}
return &fakeStaticPodPathManager{
realManifestDir: realManifestsDir,
tempManifestDir: upgradedManifestsDir,
backupManifestDir: backupManifestsDir,
backupEtcdDir: backupEtcdDir,
MoveFileFunc: moveFileFunc,
}, nil
}
@ -174,6 +186,10 @@ func (spm *fakeStaticPodPathManager) BackupManifestDir() string {
return spm.backupManifestDir
}
func (spm *fakeStaticPodPathManager) BackupEtcdDir() string {
return spm.backupEtcdDir
}
func TestStaticPodControlPlane(t *testing.T) {
tests := []struct {
waitErrsToReturn map[string]error
@ -280,7 +296,6 @@ func TestStaticPodControlPlane(t *testing.T) {
}
for _, rt := range tests {
waiter := NewFakeStaticPodWaiter(rt.waitErrsToReturn)
pathMgr, err := NewFakeStaticPodPathManager(rt.moveFileFunc)
if err != nil {
@ -299,6 +314,10 @@ func TestStaticPodControlPlane(t *testing.T) {
if err != nil {
t.Fatalf("couldn't run CreateInitStaticPodManifestFiles: %v", err)
}
err = etcdphase.CreateLocalEtcdStaticPodManifestFile(pathMgr.RealManifestDir(), oldcfg)
if err != nil {
t.Fatalf("couldn't run CreateLocalEtcdStaticPodManifestFile: %v", err)
}
// Get a hash of the v1.7 API server manifest to compare later (was the file re-written)
oldHash, err := getAPIServerHash(pathMgr.RealManifestDir())
if err != nil {
@ -310,7 +329,7 @@ func TestStaticPodControlPlane(t *testing.T) {
t.Fatalf("couldn't create config: %v", err)
}
actualErr := StaticPodControlPlane(waiter, pathMgr, newcfg)
actualErr := StaticPodControlPlane(waiter, pathMgr, newcfg, false)
if (actualErr != nil) != rt.expectedErr {
t.Errorf(
"failed UpgradeStaticPodControlPlane\n\texpected error: %t\n\tgot: %t",

View File

@ -10,8 +10,10 @@ go_library(
name = "go_default_library",
srcs = [
"arguments.go",
"copy.go",
"endpoint.go",
"error.go",
"etcd.go",
"marshal.go",
"template.go",
"version.go",
@ -20,6 +22,7 @@ go_library(
deps = [
"//cmd/kubeadm/app/apis/kubeadm:go_default_library",
"//cmd/kubeadm/app/preflight:go_default_library",
"//vendor/github.com/coreos/etcd/clientv3:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/runtime:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/runtime/schema:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/runtime/serializer:go_default_library",

View File

@ -40,6 +40,8 @@ type Waiter interface {
WaitForPodsWithLabel(kvLabel string) error
// WaitForPodToDisappear waits for the given Pod in the kube-system namespace to be deleted
WaitForPodToDisappear(staticPodName string) error
// WaitForStaticPodSingleHash fetches sha256 hash for the control plane static pod
WaitForStaticPodSingleHash(nodeName string, component string) (string, error)
// WaitForStaticPodControlPlaneHashes fetches sha256 hashes for the control plane static pods
WaitForStaticPodControlPlaneHashes(nodeName string) (map[string]string, error)
// WaitForStaticPodControlPlaneHashChange waits for the given static pod component's static pod hash to get updated.
@ -154,17 +156,40 @@ func (w *KubeWaiter) SetTimeout(timeout time.Duration) {
// WaitForStaticPodControlPlaneHashes blocks until it timeouts or gets a hash map for all components and their Static Pods
func (w *KubeWaiter) WaitForStaticPodControlPlaneHashes(nodeName string) (map[string]string, error) {
var mirrorPodHashes map[string]string
err := wait.PollImmediate(constants.APICallRetryInterval, w.timeout, func() (bool, error) {
componentHash := ""
var err error
mirrorPodHashes := map[string]string{}
for _, component := range constants.MasterComponents {
err = wait.PollImmediate(constants.APICallRetryInterval, w.timeout, func() (bool, error) {
componentHash, err = getStaticPodSingleHash(w.client, nodeName, component)
if err != nil {
return false, nil
}
return true, nil
})
if err != nil {
return nil, err
}
mirrorPodHashes[component] = componentHash
}
hashes, err := getStaticPodControlPlaneHashes(w.client, nodeName)
return mirrorPodHashes, nil
}
// WaitForStaticPodSingleHash blocks until it timeouts or gets a hash for a single component and its Static Pod
func (w *KubeWaiter) WaitForStaticPodSingleHash(nodeName string, component string) (string, error) {
componentPodHash := ""
var err error
err = wait.PollImmediate(constants.APICallRetryInterval, w.timeout, func() (bool, error) {
componentPodHash, err = getStaticPodSingleHash(w.client, nodeName, component)
if err != nil {
return false, nil
}
mirrorPodHashes = hashes
return true, nil
})
return mirrorPodHashes, err
return componentPodHash, err
}
// WaitForStaticPodControlPlaneHashChange blocks until it timeouts or notices that the Mirror Pod (for the Static Pod, respectively) has changed
@ -190,22 +215,32 @@ func getStaticPodControlPlaneHashes(client clientset.Interface, nodeName string)
mirrorPodHashes := map[string]string{}
for _, component := range constants.MasterComponents {
staticPodName := fmt.Sprintf("%s-%s", component, nodeName)
staticPod, err := client.CoreV1().Pods(metav1.NamespaceSystem).Get(staticPodName, metav1.GetOptions{})
hash, err := getStaticPodSingleHash(client, nodeName, component)
if err != nil {
return nil, err
}
podBytes, err := json.Marshal(staticPod)
if err != nil {
return nil, err
}
mirrorPodHashes[component] = fmt.Sprintf("%x", sha256.Sum256(podBytes))
mirrorPodHashes[component] = hash
}
return mirrorPodHashes, nil
}
// getStaticSinglePodHash computes hashes for a single Static Pod resource
func getStaticPodSingleHash(client clientset.Interface, nodeName string, component string) (string, error) {
staticPodName := fmt.Sprintf("%s-%s", component, nodeName)
staticPod, err := client.CoreV1().Pods(metav1.NamespaceSystem).Get(staticPodName, metav1.GetOptions{})
if err != nil {
return "", err
}
podBytes, err := json.Marshal(staticPod)
if err != nil {
return "", err
}
return fmt.Sprintf("%x", sha256.Sum256(podBytes)), nil
}
// TryRunCommand runs a function a maximum of failureThreshold times, and retries on error. If failureThreshold is hit; the last error is returned
func TryRunCommand(f func() error, failureThreshold int) error {
backoff := wait.Backoff{

View File

@ -0,0 +1,31 @@
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package util
import (
"os/exec"
)
// CopyDir copies the content of a folder
func CopyDir(src string, dst string) error {
cmd := exec.Command("cp", "-r", src, dst)
err := cmd.Run()
if err != nil {
return err
}
return nil
}

View File

@ -116,6 +116,12 @@ func (w *Waiter) WaitForStaticPodControlPlaneHashes(_ string) (map[string]string
}, nil
}
// WaitForStaticPodSingleHash returns an empty hash
// but the empty strings there are needed
func (w *Waiter) WaitForStaticPodSingleHash(_ string, _ string) (string, error) {
return "", nil
}
// WaitForStaticPodControlPlaneHashChange returns a dummy nil error in order for the flow to just continue as we're dryrunning
func (w *Waiter) WaitForStaticPodControlPlaneHashChange(_, _, _ string) error {
return nil

View File

@ -0,0 +1,43 @@
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package util
import (
"context"
"github.com/coreos/etcd/clientv3"
"time"
)
// GetEtcdClusterStatus returns nil for status Up or error for status Down
func GetEtcdClusterStatus() (*clientv3.StatusResponse, error) {
ep := []string{"localhost:2379"}
cli, err := clientv3.New(clientv3.Config{
Endpoints: ep,
DialTimeout: 5 * time.Second,
})
if err != nil {
return nil, err
}
defer cli.Close()
resp, err := cli.Status(context.Background(), ep[0])
if err != nil {
return nil, err
}
return resp, nil
}