Merge pull request #48899 from luxas/kubeadm_easy_upgrades_plan

Automatic merge from submit-queue

Implement the `kubeadm upgrade` command

**What this PR does / why we need it**:

Implements the kubeadm upgrades proposal: https://docs.google.com/document/d/1PRrC2tvB-p7sotIA5rnHy5WAOGdJJOIXPPv23hUFGrY/edit#

**Which issue this PR fixes** *(optional, in `fixes #<issue number>(, fixes #<issue_number>, ...)` format, will close that issue when PR gets merged)*: fixes #

fixes: https://github.com/kubernetes/kubeadm/issues/14

**Special notes for your reviewer**:

I'm going to split out changes not directly related to the upgrade procedure into separate dependency PRs as we go.
**Now ready for review. Please look at `cmd/kubeadm/app/phases/upgrade` first and give feedback on that. The rest kind of follows.**

**Release note**:

```release-note
Implemented `kubeadm upgrade plan` for checking whether you can upgrade your cluster to a newer version
Implemented `kubeadm upgrade apply` for upgrading your cluster from one version to another
```

cc @fabriziopandini @kubernetes/sig-cluster-lifecycle-pr-reviews @craigtracey @mattmoyer
Kubernetes Submit Queue 2017-09-03 05:48:23 -07:00 committed by GitHub
commit b3efdebeb6
28 changed files with 2800 additions and 85 deletions


@@ -13,14 +13,19 @@ go_library(
"//cmd/kubeadm/app/apis/kubeadm:go_default_library",
"//cmd/kubeadm/app/apis/kubeadm/v1alpha1:go_default_library",
"//cmd/kubeadm/app/cmd/util:go_default_library",
"//cmd/kubeadm/app/constants:go_default_library",
"//cmd/kubeadm/app/phases/controlplane:go_default_library",
"//cmd/kubeadm/app/phases/upgrade:go_default_library",
"//cmd/kubeadm/app/preflight:go_default_library",
"//cmd/kubeadm/app/util:go_default_library",
"//cmd/kubeadm/app/util/apiclient:go_default_library",
"//cmd/kubeadm/app/util/dryrun:go_default_library",
"//cmd/kubeadm/app/util/kubeconfig:go_default_library",
"//pkg/api:go_default_library",
"//pkg/util/version:go_default_library",
"//vendor/github.com/ghodss/yaml:go_default_library",
"//vendor/github.com/spf13/cobra:go_default_library",
"//vendor/k8s.io/client-go/discovery/fake:go_default_library",
"//vendor/k8s.io/client-go/kubernetes:go_default_library",
],
)


@@ -18,6 +18,7 @@ package upgrade
import (
"fmt"
"os"
"strings"
"time"
@@ -26,12 +27,20 @@ import (
clientset "k8s.io/client-go/kubernetes"
kubeadmapi "k8s.io/kubernetes/cmd/kubeadm/app/apis/kubeadm"
cmdutil "k8s.io/kubernetes/cmd/kubeadm/app/cmd/util"
"k8s.io/kubernetes/cmd/kubeadm/app/constants"
"k8s.io/kubernetes/cmd/kubeadm/app/phases/controlplane"
"k8s.io/kubernetes/cmd/kubeadm/app/phases/upgrade"
kubeadmutil "k8s.io/kubernetes/cmd/kubeadm/app/util"
"k8s.io/kubernetes/cmd/kubeadm/app/util/apiclient"
dryrunutil "k8s.io/kubernetes/cmd/kubeadm/app/util/dryrun"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/util/version"
)
const (
upgradeManifestTimeout = 1 * time.Minute
)
// applyFlags holds the information about the flags that can be passed to apply
type applyFlags struct {
nonInteractiveMode bool
@@ -102,7 +111,7 @@ func NewCmdApply(parentFlags *cmdUpgradeFlags) *cobra.Command {
func RunApply(flags *applyFlags) error {
// Start with the basics, verify that the cluster is healthy and get the configuration from the cluster (using the ConfigMap)
upgradeVars, err := enforceRequirements(flags.parent.kubeConfigPath, flags.parent.cfgPath, flags.parent.printConfig)
upgradeVars, err := enforceRequirements(flags.parent.kubeConfigPath, flags.parent.cfgPath, flags.parent.printConfig, flags.dryRun)
if err != nil {
return err
}
@@ -126,16 +135,24 @@ func RunApply(flags *applyFlags) error {
}
}
// TODO: Implement a prepulling mechanism here
// Use a prepuller implementation based on creating DaemonSets
// and block until all DaemonSets are ready; then we know for sure that all control plane images are cached locally
prepuller := upgrade.NewDaemonSetPrepuller(upgradeVars.client, upgradeVars.waiter, internalcfg)
upgrade.PrepullImagesInParallel(prepuller, flags.imagePullTimeout)
// Now, perform the upgrade procedure
if err := PerformControlPlaneUpgrade(flags, upgradeVars.client, internalcfg); err != nil {
if err := PerformControlPlaneUpgrade(flags, upgradeVars.client, upgradeVars.waiter, internalcfg); err != nil {
return fmt.Errorf("[upgrade/apply] FATAL: %v", err)
}
// Upgrade RBAC rules and addons. Optionally, if needed, perform some extra task for a specific version
if err := upgrade.PerformPostUpgradeTasks(upgradeVars.client, internalcfg, flags.newK8sVersion); err != nil {
return fmt.Errorf("[upgrade/postupgrade] FATAL: %v", err)
return fmt.Errorf("[upgrade/postupgrade] FATAL post-upgrade error: %v", err)
}
if flags.dryRun {
fmt.Println("[dryrun] Finished dryrunning successfully!")
return nil
}
fmt.Println("")
@@ -182,7 +199,7 @@ func EnforceVersionPolicies(flags *applyFlags, versionGetter upgrade.VersionGett
if len(versionSkewErrs.Skippable) > 0 {
// Return the error if the user hasn't specified the --force flag
if !flags.force {
return fmt.Errorf("The --version argument is invalid due to these errors: %v. Can be bypassed if you pass the --force flag", versionSkewErrs.Mandatory)
return fmt.Errorf("The --version argument is invalid due to these errors: %v. Can be bypassed if you pass the --force flag", versionSkewErrs.Skippable)
}
// Soft errors found, but --force was specified
fmt.Printf("[upgrade/version] Found %d potential version compatibility errors but skipping since the --force flag is set: %v\n", len(versionSkewErrs.Skippable), versionSkewErrs.Skippable)
@@ -192,21 +209,60 @@ func EnforceVersionPolicies(flags *applyFlags, versionGetter upgrade.VersionGett
}
// PerformControlPlaneUpgrade actually performs the upgrade procedure for the cluster of your type (self-hosted or static-pod-hosted)
func PerformControlPlaneUpgrade(flags *applyFlags, client clientset.Interface, internalcfg *kubeadmapi.MasterConfiguration) error {
func PerformControlPlaneUpgrade(flags *applyFlags, client clientset.Interface, waiter apiclient.Waiter, internalcfg *kubeadmapi.MasterConfiguration) error {
// Check if the cluster is self-hosted and act accordingly
if upgrade.IsControlPlaneSelfHosted(client) {
fmt.Printf("[upgrade/apply] Upgrading your Self-Hosted control plane to version %q...\n", flags.newK8sVersionStr)
// Upgrade a self-hosted cluster
// TODO(luxas): Implement this later when we have the new upgrade strategy
return fmt.Errorf("not implemented")
// Upgrade the self-hosted cluster
return upgrade.SelfHostedControlPlane(client, waiter, internalcfg, flags.newK8sVersion)
}
// OK, the cluster is hosted using static pods. Upgrade a static-pod hosted cluster
fmt.Printf("[upgrade/apply] Upgrading your Static Pod-hosted control plane to version %q...\n", flags.newK8sVersionStr)
if err := upgrade.PerformStaticPodControlPlaneUpgrade(client, internalcfg, flags.newK8sVersion); err != nil {
if flags.dryRun {
return DryRunStaticPodUpgrade(internalcfg)
}
return PerformStaticPodUpgrade(client, waiter, internalcfg)
}
// PerformStaticPodUpgrade performs the upgrade of the control plane components for a static pod hosted cluster
func PerformStaticPodUpgrade(client clientset.Interface, waiter apiclient.Waiter, internalcfg *kubeadmapi.MasterConfiguration) error {
pathManager, err := upgrade.NewKubeStaticPodPathManagerUsingTempDirs(constants.GetStaticPodDirectory())
if err != nil {
return err
}
if err := upgrade.StaticPodControlPlane(waiter, pathManager, internalcfg); err != nil {
return err
}
return nil
}
// DryRunStaticPodUpgrade fakes an upgrade of the control plane
func DryRunStaticPodUpgrade(internalcfg *kubeadmapi.MasterConfiguration) error {
dryRunManifestDir, err := constants.CreateTempDirForKubeadm("kubeadm-upgrade-dryrun")
if err != nil {
return err
}
defer os.RemoveAll(dryRunManifestDir)
if err := controlplane.CreateInitStaticPodManifestFiles(dryRunManifestDir, internalcfg); err != nil {
return err
}
// Print the contents of the upgraded manifests as if they had been written to /etc/kubernetes/manifests
files := []dryrunutil.FileToPrint{}
for _, component := range constants.MasterComponents {
realPath := constants.GetStaticPodFilepath(component, dryRunManifestDir)
outputPath := constants.GetStaticPodFilepath(component, constants.GetStaticPodDirectory())
files = append(files, dryrunutil.NewFileToPrint(realPath, outputPath))
}
if err := dryrunutil.PrintDryRunFiles(files, os.Stdout); err != nil {
return err
}
return nil

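For reference, here is a minimal, self-contained sketch of the dry-run flow above: render the would-be manifests into a throwaway directory and print each one against the path it would really land in. `printManifest` is a hypothetical stand-in for `dryrunutil.PrintDryRunFiles`, and the component names and paths are illustrative only:

```go
package main

import (
	"fmt"
	"io/ioutil"
	"os"
	"path/filepath"
)

// printManifest is a hypothetical stand-in for dryrunutil.PrintDryRunFiles:
// it reads a staged file and prints it against its would-be destination.
func printManifest(realPath, outputPath string) error {
	contents, err := ioutil.ReadFile(realPath)
	if err != nil {
		return err
	}
	fmt.Printf("[dryrun] Would write the following file to %q:\n%s", outputPath, contents)
	return nil
}

func main() {
	// Stage the would-be manifests in a temp dir instead of touching /etc/kubernetes.
	dryRunDir, err := ioutil.TempDir("", "kubeadm-upgrade-dryrun")
	if err != nil {
		panic(err)
	}
	defer os.RemoveAll(dryRunDir)

	for _, component := range []string{"kube-apiserver", "kube-controller-manager", "kube-scheduler"} {
		staged := filepath.Join(dryRunDir, component+".yaml")
		if err := ioutil.WriteFile(staged, []byte("# upgraded manifest for "+component+"\n"), 0600); err != nil {
			panic(err)
		}
		// Print against the path the file would really land in.
		if err := printManifest(staged, filepath.Join("/etc/kubernetes/manifests", component+".yaml")); err != nil {
			panic(err)
		}
	}
}
```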

@@ -26,10 +26,13 @@ import (
"github.com/ghodss/yaml"
fakediscovery "k8s.io/client-go/discovery/fake"
clientset "k8s.io/client-go/kubernetes"
kubeadmapiext "k8s.io/kubernetes/cmd/kubeadm/app/apis/kubeadm/v1alpha1"
"k8s.io/kubernetes/cmd/kubeadm/app/phases/upgrade"
"k8s.io/kubernetes/cmd/kubeadm/app/preflight"
"k8s.io/kubernetes/cmd/kubeadm/app/util/apiclient"
dryrunutil "k8s.io/kubernetes/cmd/kubeadm/app/util/dryrun"
kubeconfigutil "k8s.io/kubernetes/cmd/kubeadm/app/util/kubeconfig"
)
@@ -39,11 +42,12 @@ type upgradeVariables struct {
client clientset.Interface
cfg *kubeadmapiext.MasterConfiguration
versionGetter upgrade.VersionGetter
waiter apiclient.Waiter
}
// enforceRequirements verifies that it's okay to upgrade and then returns the variables needed for the rest of the procedure
func enforceRequirements(kubeConfigPath, cfgPath string, printConfig bool) (*upgradeVariables, error) {
client, err := kubeconfigutil.ClientSetFromFile(kubeConfigPath)
func enforceRequirements(kubeConfigPath, cfgPath string, printConfig, dryRun bool) (*upgradeVariables, error) {
client, err := getClient(kubeConfigPath, dryRun)
if err != nil {
return nil, fmt.Errorf("couldn't create a Kubernetes client from file %q: %v", kubeConfigPath, err)
}
@@ -69,6 +73,8 @@ func enforceRequirements(kubeConfigPath, cfgPath string, printConfig bool) (*upg
cfg: cfg,
// Use a real version getter interface that queries the API server, the kubeadm client and the Kubernetes CI system for latest versions
versionGetter: upgrade.NewKubeVersionGetter(client, os.Stdout),
// Choose the waiter implementation based on whether we're dry-running
waiter: getWaiter(dryRun, client),
}, nil
}
@@ -101,6 +107,46 @@ func runPreflightChecks(skipPreFlight bool) error {
return preflight.RunRootCheckOnly()
}
// getClient gets a real or fake client depending on whether the user is dry-running or not
func getClient(file string, dryRun bool) (clientset.Interface, error) {
if dryRun {
dryRunGetter, err := apiclient.NewClientBackedDryRunGetterFromKubeconfig(file)
if err != nil {
return nil, err
}
// In order for fakeclient.Discovery().ServerVersion() to return the backing API Server's
// real version, we have to do some clever API machinery tricks. First, we get the real
// API Server's version
realServerVersion, err := dryRunGetter.Client().Discovery().ServerVersion()
if err != nil {
return nil, fmt.Errorf("failed to get server version: %v", err)
}
// Get the fake clientset
fakeclient := apiclient.NewDryRunClient(dryRunGetter, os.Stdout)
// Since we know that Discovery() on the fake clientset returns a *fakediscovery.FakeDiscovery,
// we can cast it to that concrete type.
fakeclientDiscovery, ok := fakeclient.Discovery().(*fakediscovery.FakeDiscovery)
if !ok {
return nil, fmt.Errorf("couldn't set fake discovery's server version")
}
// Lastly, set the right server version to be used
fakeclientDiscovery.FakedServerVersion = realServerVersion
// return the fake clientset used for dry-running
return fakeclient, nil
}
return kubeconfigutil.ClientSetFromFile(file)
}
// getWaiter gets the right waiter implementation
func getWaiter(dryRun bool, client clientset.Interface) apiclient.Waiter {
if dryRun {
return dryrunutil.NewWaiter()
}
return apiclient.NewKubeWaiter(client, upgradeManifestTimeout, os.Stdout)
}
// InteractivelyConfirmUpgrade asks the user whether they _really_ want to upgrade.
func InteractivelyConfirmUpgrade(question string) error {

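The fake-discovery trick in `getClient` is worth a standalone example. Below is a minimal sketch using client-go's fake clientset directly (an assumption for illustration; the PR itself wraps a client-backed dry-run getter instead), showing how setting `FakedServerVersion` makes `Discovery().ServerVersion()` report a chosen version:

```go
package main

import (
	"fmt"

	"k8s.io/apimachinery/pkg/version"
	fakediscovery "k8s.io/client-go/discovery/fake"
	"k8s.io/client-go/kubernetes/fake"
)

func main() {
	// The fake clientset's Discovery() is backed by *fakediscovery.FakeDiscovery.
	client := fake.NewSimpleClientset()

	fakeDiscovery, ok := client.Discovery().(*fakediscovery.FakeDiscovery)
	if !ok {
		panic("expected a *fakediscovery.FakeDiscovery")
	}

	// Inject the version the fake should report. In the PR, this value comes from
	// the real API server via dryRunGetter.Client().Discovery().ServerVersion().
	fakeDiscovery.FakedServerVersion = &version.Info{GitVersion: "v1.7.5"}

	serverVersion, err := client.Discovery().ServerVersion()
	if err != nil {
		panic(err)
	}
	fmt.Println(serverVersion.GitVersion) // prints: v1.7.5
}
```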

@@ -50,8 +50,8 @@ func NewCmdPlan(parentFlags *cmdUpgradeFlags) *cobra.Command {
// RunPlan takes care of outputting available versions to upgrade to for the user
func RunPlan(parentFlags *cmdUpgradeFlags) error {
// Start with the basics, verify that the cluster is healthy, build a client and a versionGetter.
upgradeVars, err := enforceRequirements(parentFlags.kubeConfigPath, parentFlags.cfgPath, parentFlags.printConfig)
// Start with the basics, verify that the cluster is healthy, build a client and a versionGetter. Never set dry-run for plan.
upgradeVars, err := enforceRequirements(parentFlags.kubeConfigPath, parentFlags.cfgPath, parentFlags.printConfig, false)
if err != nil {
return err
}
@@ -59,7 +59,7 @@ func RunPlan(parentFlags *cmdUpgradeFlags) error {
// Compute which upgrade possibilities there are
availUpgrades, err := upgrade.GetAvailableUpgrades(upgradeVars.versionGetter, parentFlags.allowExperimentalUpgrades, parentFlags.allowRCUpgrades)
if err != nil {
return err
return fmt.Errorf("[upgrade/versions] FATAL: %v", err)
}
// Tell the user which upgrades are available


@@ -18,6 +18,8 @@ package constants
import (
"fmt"
"io/ioutil"
"os"
"path/filepath"
"time"
@@ -31,6 +33,7 @@ var KubernetesDir = "/etc/kubernetes"
const (
ManifestsSubDirName = "manifests"
TempDirForKubeadm = "/etc/kubernetes/tmp"
CACertAndKeyBaseName = "ca"
CACertName = "ca.crt"
@@ -181,3 +184,17 @@ func GetAdminKubeConfigPath() string {
func AddSelfHostedPrefix(componentName string) string {
return fmt.Sprintf("%s%s", SelfHostingPrefix, componentName)
}
// CreateTempDirForKubeadm creates a temporary directory under /etc/kubernetes/tmp (not using /tmp as that would potentially be dangerous)
func CreateTempDirForKubeadm(dirName string) (string, error) {
// create the target directory if it doesn't exist already
if err := os.MkdirAll(TempDirForKubeadm, 0700); err != nil {
return "", fmt.Errorf("failed to create directory %q: %v", TempDirForKubeadm, err)
}
tempDir, err := ioutil.TempDir(TempDirForKubeadm, dirName)
if err != nil {
return "", fmt.Errorf("couldn't create a temporary directory: %v", err)
}
return tempDir, nil
}

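A small runnable sketch of the pattern `CreateTempDirForKubeadm` uses, with an unprivileged parent directory standing in for `/etc/kubernetes/tmp` so it can run anywhere:

```go
package main

import (
	"fmt"
	"io/ioutil"
	"os"
	"path/filepath"
)

// createTempDirUnder mirrors CreateTempDirForKubeadm: make sure the parent
// exists, then create a uniquely named directory beneath it.
func createTempDirUnder(parent, dirName string) (string, error) {
	if err := os.MkdirAll(parent, 0700); err != nil {
		return "", fmt.Errorf("failed to create directory %q: %v", parent, err)
	}
	return ioutil.TempDir(parent, dirName)
}

func main() {
	parent := filepath.Join(os.TempDir(), "kubeadm-demo") // stand-in for /etc/kubernetes/tmp
	dir, err := createTempDirUnder(parent, "kubeadm-upgrade-dryrun")
	if err != nil {
		panic(err)
	}
	defer os.RemoveAll(dir)
	fmt.Println(dir) // e.g. /tmp/kubeadm-demo/kubeadm-upgrade-dryrun121966516
}
```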

@@ -21,6 +21,7 @@ import (
"k8s.io/api/core/v1"
kubeadmconstants "k8s.io/kubernetes/cmd/kubeadm/app/constants"
"k8s.io/kubernetes/cmd/kubeadm/app/features"
kubeadmutil "k8s.io/kubernetes/cmd/kubeadm/app/util"
)
@@ -34,8 +35,8 @@ const (
// PodSpecMutatorFunc is a function capable of mutating a PodSpec
type PodSpecMutatorFunc func(*v1.PodSpec)
// getDefaultMutators gets the mutator functions that should always be used
func getDefaultMutators() map[string][]PodSpecMutatorFunc {
// GetDefaultMutators gets the mutator functions that should always be used
func GetDefaultMutators() map[string][]PodSpecMutatorFunc {
return map[string][]PodSpecMutatorFunc{
kubeadmconstants.KubeAPIServer: {
addNodeSelectorToPodSpec,
@@ -55,6 +56,22 @@ func getDefaultMutators() map[string][]PodSpecMutatorFunc {
}
}
// GetMutatorsFromFeatureGates returns all mutators needed based on the feature gates passed
func GetMutatorsFromFeatureGates(featureGates map[string]bool) map[string][]PodSpecMutatorFunc {
// Here the map of different mutators to use for the control plane's podspec is stored
mutators := GetDefaultMutators()
// Some extra work to be done if we should store the control plane certificates in Secrets
if features.Enabled(featureGates, features.StoreCertsInSecrets) {
// Add the store-certs-in-secrets-specific mutators here so that the self-hosted component starts using them
mutators[kubeadmconstants.KubeAPIServer] = append(mutators[kubeadmconstants.KubeAPIServer], setSelfHostedVolumesForAPIServer)
mutators[kubeadmconstants.KubeControllerManager] = append(mutators[kubeadmconstants.KubeControllerManager], setSelfHostedVolumesForControllerManager)
mutators[kubeadmconstants.KubeScheduler] = append(mutators[kubeadmconstants.KubeScheduler], setSelfHostedVolumesForScheduler)
}
return mutators
}
// mutatePodSpec makes a Static Pod-hosted PodSpec suitable for self-hosting
func mutatePodSpec(mutators map[string][]PodSpecMutatorFunc, name string, podSpec *v1.PodSpec) {
// Get the mutator functions for the component in question, then loop through and execute them

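Here is a self-contained sketch of the mutator pattern above, assuming only the `k8s.io/api/core/v1` types: mutators are registered per component name and applied in order, as `mutatePodSpec` does. The two mutators below are invented for illustration:

```go
package main

import (
	"fmt"

	"k8s.io/api/core/v1"
)

type podSpecMutatorFunc func(*v1.PodSpec)

// Two invented mutators for illustration.
func setHostNetwork(spec *v1.PodSpec) { spec.HostNetwork = true }

func setMasterNodeSelector(spec *v1.PodSpec) {
	spec.NodeSelector = map[string]string{"node-role.kubernetes.io/master": ""}
}

func main() {
	// Mutators are keyed by component name, just like GetDefaultMutators().
	mutators := map[string][]podSpecMutatorFunc{
		"kube-apiserver": {setHostNetwork, setMasterNodeSelector},
	}

	spec := &v1.PodSpec{Containers: []v1.Container{{Name: "kube-apiserver"}}}

	// Apply every registered mutator for the component, in order,
	// mirroring what mutatePodSpec does.
	for _, mutate := range mutators["kube-apiserver"] {
		mutate(spec)
	}
	fmt.Println(spec.HostNetwork, spec.NodeSelector)
}
```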

@@ -73,7 +73,7 @@ func TestMutatePodSpec(t *testing.T) {
}
for _, rt := range tests {
mutatePodSpec(getDefaultMutators(), rt.component, rt.podSpec)
mutatePodSpec(GetDefaultMutators(), rt.component, rt.podSpec)
if !reflect.DeepEqual(*rt.podSpec, rt.expected) {
t.Errorf("failed mutatePodSpec:\nexpected:\n%v\nsaw:\n%v", rt.expected, *rt.podSpec)


@@ -60,7 +60,7 @@ func CreateSelfHostedControlPlane(manifestsDir, kubeConfigDir string, cfg *kubea
waiter.SetTimeout(selfHostingWaitTimeout)
// Here the map of different mutators to use for the control plane's podspec is stored
mutators := getDefaultMutators()
mutators := GetMutatorsFromFeatureGates(cfg.FeatureGates)
// Some extra work to be done if we should store the control plane certificates in Secrets
if features.Enabled(cfg.FeatureGates, features.StoreCertsInSecrets) {
@@ -72,10 +72,6 @@ func CreateSelfHostedControlPlane(manifestsDir, kubeConfigDir string, cfg *kubea
if err := uploadKubeConfigSecrets(client, kubeConfigDir); err != nil {
return err
}
// Add the store-certs-in-secrets-specific mutators here so that the self-hosted component starts using them
mutators[kubeadmconstants.KubeAPIServer] = append(mutators[kubeadmconstants.KubeAPIServer], setSelfHostedVolumesForAPIServer)
mutators[kubeadmconstants.KubeControllerManager] = append(mutators[kubeadmconstants.KubeControllerManager], setSelfHostedVolumesForControllerManager)
mutators[kubeadmconstants.KubeScheduler] = append(mutators[kubeadmconstants.KubeScheduler], setSelfHostedVolumesForScheduler)
}
for _, componentName := range kubeadmconstants.MasterComponents {
@@ -95,7 +91,7 @@ func CreateSelfHostedControlPlane(manifestsDir, kubeConfigDir string, cfg *kubea
}
// Build a DaemonSet object from the loaded PodSpec
ds := buildDaemonSet(componentName, podSpec, mutators)
ds := BuildDaemonSet(componentName, podSpec, mutators)
// Create or update the DaemonSet in the API Server, and retry selfHostingFailureThreshold times if it errors out
if err := apiclient.TryRunCommand(func() error {
@@ -105,7 +101,7 @@ func CreateSelfHostedControlPlane(manifestsDir, kubeConfigDir string, cfg *kubea
}
// Wait for the self-hosted component to come up
if err := waiter.WaitForPodsWithLabel(buildSelfHostedWorkloadLabelQuery(componentName)); err != nil {
if err := waiter.WaitForPodsWithLabel(BuildSelfHostedComponentLabelQuery(componentName)); err != nil {
return err
}
@@ -132,8 +128,8 @@ func CreateSelfHostedControlPlane(manifestsDir, kubeConfigDir string, cfg *kubea
return nil
}
// buildDaemonSet is responsible for mutating the PodSpec and returning a DaemonSet which is suitable for self-hosting
func buildDaemonSet(name string, podSpec *v1.PodSpec, mutators map[string][]PodSpecMutatorFunc) *extensions.DaemonSet {
// BuildDaemonSet is responsible for mutating the PodSpec and returning a DaemonSet which is suitable for self-hosting
func BuildDaemonSet(name string, podSpec *v1.PodSpec, mutators map[string][]PodSpecMutatorFunc) *extensions.DaemonSet {
// Mutate the PodSpec so it's suitable for self-hosting
mutatePodSpec(mutators, name, podSpec)
@@ -143,19 +139,19 @@
ObjectMeta: metav1.ObjectMeta{
Name: kubeadmconstants.AddSelfHostedPrefix(name),
Namespace: metav1.NamespaceSystem,
Labels: map[string]string{
"k8s-app": kubeadmconstants.AddSelfHostedPrefix(name),
},
Labels: BuildSelfhostedComponentLabels(name),
},
Spec: extensions.DaemonSetSpec{
Template: v1.PodTemplateSpec{
ObjectMeta: metav1.ObjectMeta{
Labels: map[string]string{
"k8s-app": kubeadmconstants.AddSelfHostedPrefix(name),
},
Labels: BuildSelfhostedComponentLabels(name),
},
Spec: *podSpec,
},
UpdateStrategy: extensions.DaemonSetUpdateStrategy{
// Make the DaemonSet utilize the RollingUpdate rollout strategy
Type: extensions.RollingUpdateDaemonSetStrategyType,
},
},
}
}
@@ -176,7 +172,14 @@ func loadPodSpecFromFile(manifestPath string) (*v1.PodSpec, error) {
return &staticPod.Spec, nil
}
// buildSelfHostedWorkloadLabelQuery creates the right query for matching a self-hosted Pod
func buildSelfHostedWorkloadLabelQuery(componentName string) string {
// BuildSelfhostedComponentLabels returns the labels for a self-hosted component
func BuildSelfhostedComponentLabels(component string) map[string]string {
return map[string]string{
"k8s-app": kubeadmconstants.AddSelfHostedPrefix(component),
}
}
// BuildSelfHostedComponentLabelQuery creates the right query for matching a self-hosted Pod
func BuildSelfHostedComponentLabelQuery(componentName string) string {
return fmt.Sprintf("k8s-app=%s", kubeadmconstants.AddSelfHostedPrefix(componentName))
}

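For orientation, a trimmed sketch of the kind of object `BuildDaemonSet` produces: a kube-system DaemonSet carrying the `k8s-app=self-hosted-<component>` label (which the query from `BuildSelfHostedComponentLabelQuery` matches) and the RollingUpdate strategy. Field values here are illustrative:

```go
package main

import (
	"fmt"

	"k8s.io/api/core/v1"
	extensions "k8s.io/api/extensions/v1beta1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

// buildDaemonSet sketches the shape of the object BuildDaemonSet produces.
func buildDaemonSet(name string, podSpec v1.PodSpec) *extensions.DaemonSet {
	labels := map[string]string{"k8s-app": "self-hosted-" + name}
	return &extensions.DaemonSet{
		ObjectMeta: metav1.ObjectMeta{
			Name:      "self-hosted-" + name,
			Namespace: metav1.NamespaceSystem,
			Labels:    labels,
		},
		Spec: extensions.DaemonSetSpec{
			Template: v1.PodTemplateSpec{
				ObjectMeta: metav1.ObjectMeta{Labels: labels},
				Spec:       podSpec,
			},
			// The RollingUpdate strategy is what lets the upgrade roll the
			// self-hosted control plane components over one by one.
			UpdateStrategy: extensions.DaemonSetUpdateStrategy{
				Type: extensions.RollingUpdateDaemonSetStrategyType,
			},
		},
	}
}

func main() {
	ds := buildDaemonSet("kube-scheduler", v1.PodSpec{
		Containers: []v1.Container{{Name: "kube-scheduler"}},
	})
	// The label query used to wait for the Pods, cf. BuildSelfHostedComponentLabelQuery.
	fmt.Printf("k8s-app=%s\n", ds.ObjectMeta.Labels["k8s-app"])
}
```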

@@ -182,7 +182,8 @@ spec:
- hostPath:
path: /etc/pki
name: ca-certs-etc-pki
updateStrategy: {}
updateStrategy:
type: RollingUpdate
status:
currentNumberScheduled: 0
desiredNumberScheduled: 0
@@ -330,7 +331,8 @@ spec:
- hostPath:
path: /etc/pki
name: ca-certs-etc-pki
updateStrategy: {}
updateStrategy:
type: RollingUpdate
status:
currentNumberScheduled: 0
desiredNumberScheduled: 0
@@ -430,7 +432,8 @@ spec:
path: /etc/kubernetes/scheduler.conf
type: FileOrCreate
name: kubeconfig
updateStrategy: {}
updateStrategy:
type: RollingUpdate
status:
currentNumberScheduled: 0
desiredNumberScheduled: 0
@@ -471,7 +474,7 @@ func TestBuildDaemonSet(t *testing.T) {
t.Fatalf("couldn't load the specified Pod")
}
ds := buildDaemonSet(rt.component, podSpec, getDefaultMutators())
ds := BuildDaemonSet(rt.component, podSpec, GetDefaultMutators())
dsBytes, err := yaml.Marshal(ds)
if err != nil {
t.Fatalf("failed to marshal daemonset to YAML: %v", err)


@@ -1,4 +1,4 @@
load("@io_bazel_rules_go//go:def.bzl", "go_library")
load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")
go_library(
name = "go_default_library",
@@ -8,6 +8,8 @@ go_library(
"health.go",
"policy.go",
"postupgrade.go",
"prepull.go",
"selfhosted.go",
"staticpods.go",
"versiongetter.go",
],
@@ -15,8 +17,29 @@ go_library(
deps = [
"//cmd/kubeadm/app/apis/kubeadm:go_default_library",
"//cmd/kubeadm/app/apis/kubeadm/v1alpha1:go_default_library",
"//cmd/kubeadm/app/apis/kubeadm/validation:go_default_library",
"//cmd/kubeadm/app/constants:go_default_library",
"//cmd/kubeadm/app/images:go_default_library",
"//cmd/kubeadm/app/phases/addons/dns:go_default_library",
"//cmd/kubeadm/app/phases/addons/proxy:go_default_library",
"//cmd/kubeadm/app/phases/apiconfig:go_default_library",
"//cmd/kubeadm/app/phases/bootstraptoken/clusterinfo:go_default_library",
"//cmd/kubeadm/app/phases/bootstraptoken/node:go_default_library",
"//cmd/kubeadm/app/phases/controlplane:go_default_library",
"//cmd/kubeadm/app/phases/selfhosting:go_default_library",
"//cmd/kubeadm/app/phases/uploadconfig:go_default_library",
"//cmd/kubeadm/app/util:go_default_library",
"//cmd/kubeadm/app/util/apiclient:go_default_library",
"//cmd/kubeadm/app/util/config:go_default_library",
"//pkg/api:go_default_library",
"//pkg/util/version:go_default_library",
"//pkg/version:go_default_library",
"//vendor/k8s.io/api/core/v1:go_default_library",
"//vendor/k8s.io/api/extensions/v1beta1:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/api/errors:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/runtime:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/util/errors:go_default_library",
"//vendor/k8s.io/client-go/kubernetes:go_default_library",
],
)
@@ -34,3 +57,24 @@ filegroup(
tags = ["automanaged"],
visibility = ["//visibility:public"],
)
go_test(
name = "go_default_test",
srcs = [
"compute_test.go",
"policy_test.go",
"prepull_test.go",
"staticpods_test.go",
],
library = ":go_default_library",
deps = [
"//cmd/kubeadm/app/apis/kubeadm:go_default_library",
"//cmd/kubeadm/app/apis/kubeadm/v1alpha1:go_default_library",
"//cmd/kubeadm/app/constants:go_default_library",
"//cmd/kubeadm/app/phases/controlplane:go_default_library",
"//cmd/kubeadm/app/util/apiclient:go_default_library",
"//pkg/api:go_default_library",
"//pkg/util/version:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/runtime:go_default_library",
],
)


@@ -18,6 +18,10 @@ package upgrade
import (
"fmt"
"strings"
"k8s.io/kubernetes/cmd/kubeadm/app/phases/addons/dns"
"k8s.io/kubernetes/pkg/util/version"
)
// Upgrade defines an upgrade possibility to upgrade from a current version to a new one
@@ -57,7 +61,196 @@ type ClusterState struct {
// GetAvailableUpgrades fetches all versions from the specified VersionGetter and computes which
// kinds of upgrades can be performed
func GetAvailableUpgrades(_ VersionGetter, _, _ bool) ([]Upgrade, error) {
func GetAvailableUpgrades(versionGetterImpl VersionGetter, experimentalUpgradesAllowed, rcUpgradesAllowed bool) ([]Upgrade, error) {
fmt.Println("[upgrade] Fetching available versions to upgrade to:")
return []Upgrade{}, nil
// Collect the upgrades kubeadm can do in this list
upgrades := []Upgrade{}
// Get the cluster version
clusterVersionStr, clusterVersion, err := versionGetterImpl.ClusterVersion()
if err != nil {
return nil, err
}
// Get current kubeadm CLI version
kubeadmVersionStr, kubeadmVersion, err := versionGetterImpl.KubeadmVersion()
if err != nil {
return nil, err
}
// Get and output the current latest stable version
stableVersionStr, stableVersion, err := versionGetterImpl.VersionFromCILabel("stable", "stable version")
if err != nil {
return nil, err
}
// Get the kubelet versions in the cluster
kubeletVersions, err := versionGetterImpl.KubeletVersions()
if err != nil {
return nil, err
}
// Construct a descriptor for the current state of the world
beforeState := ClusterState{
KubeVersion: clusterVersionStr,
DNSVersion: dns.GetKubeDNSVersion(clusterVersion),
KubeadmVersion: kubeadmVersionStr,
KubeletVersions: kubeletVersions,
}
// Do a "dumb guess" that a new minor upgrade is available just because the latest stable version is higher than the cluster version
// This guess will be corrected once we know if there is a patch version available
canDoMinorUpgrade := clusterVersion.LessThan(stableVersion)
// A patch version doesn't exist if the cluster version is higher than or equal to the current stable version:
// if a user is trying to upgrade from, say, v1.8.0-beta.2 to v1.8.0-rc.1 (given we support such upgrades experimentally),
// a stable-1.8 branch doesn't exist yet. Hence this check.
if patchVersionBranchExists(clusterVersion, stableVersion) {
currentBranch := getBranchFromVersion(clusterVersionStr)
versionLabel := fmt.Sprintf("stable-%s", currentBranch)
description := fmt.Sprintf("version in the v%s series", currentBranch)
// Get and output the latest patch version for the cluster branch
patchVersionStr, patchVersion, err := versionGetterImpl.VersionFromCILabel(versionLabel, description)
if err != nil {
return nil, err
}
// Check if a minor version upgrade is possible when a patch release exists
// It's only possible if the latest patch version is higher than the current patch version
// If that's the case, they must be on different branches => a newer minor version can be upgraded to
canDoMinorUpgrade = minorUpgradePossibleWithPatchRelease(stableVersion, patchVersion)
// If the cluster version is lower than the newest patch version, we should inform about the possible upgrade
if patchUpgradePossible(clusterVersion, patchVersion) {
// The kubeadm version has to be upgraded to the latest patch version
newKubeadmVer := patchVersionStr
if kubeadmVersion.AtLeast(patchVersion) {
// In this case, the kubeadm CLI version is new enough. Don't display an update suggestion for kubeadm by making .NewKubeadmVersion equal .CurrentKubeadmVersion
newKubeadmVer = kubeadmVersionStr
}
upgrades = append(upgrades, Upgrade{
Description: description,
Before: beforeState,
After: ClusterState{
KubeVersion: patchVersionStr,
DNSVersion: dns.GetKubeDNSVersion(patchVersion),
KubeadmVersion: newKubeadmVer,
// KubeletVersions is unset here as it is not used anywhere in .After
},
})
}
}
if canDoMinorUpgrade {
upgrades = append(upgrades, Upgrade{
Description: "stable version",
Before: beforeState,
After: ClusterState{
KubeVersion: stableVersionStr,
DNSVersion: dns.GetKubeDNSVersion(stableVersion),
KubeadmVersion: stableVersionStr,
// KubeletVersions is unset here as it is not used anywhere in .After
},
})
}
if experimentalUpgradesAllowed || rcUpgradesAllowed {
// dl.k8s.io/release/latest.txt is ALWAYS an alpha.X version
// dl.k8s.io/release/latest-1.X.txt is first v1.X.0-alpha.0 -> v1.X.0-alpha.Y, then v1.X.0-beta.0 to v1.X.0-beta.Z, then v1.X.0-rc.1 to v1.X.0-rc.W.
// After the v1.X.0 release, latest-1.X.txt is always a beta.0 version. Let's say the latest stable version on the v1.7 branch is v1.7.3, then the
// latest-1.7 version is v1.7.4-beta.0
// Worth noting is that when the release-1.X branch is cut, two versions are tagged: v1.X.0-beta.0 AND v1.(X+1).0-alpha.0
// The v1.(X+1).0-alpha.0 version is pretty much useless and should just be ignored, as more betas may be released that have more features than the initial v1.(X+1).0-alpha.0
// So what we do below is get the latest overall version, always a v1.X.0-alpha.Y version. Then we get the latest-1.(X-1) version. This version may be anything
// between v1.(X-1).0-beta.0 and v1.(X-1).Z-beta.0. At some point in time, latest-1.(X-1) will point to v1.(X-1).0-rc.1. Then we should show it.
// The flow looks like this (with time on the X axis):
// v1.8.0-alpha.1 -> v1.8.0-alpha.2 -> v1.8.0-alpha.3 | release-1.8 branch | v1.8.0-beta.0 -> v1.8.0-beta.1 -> v1.8.0-beta.2 -> v1.8.0-rc.1 -> v1.8.0 -> v1.8.1
// v1.9.0-alpha.0 -> v1.9.0-alpha.1 -> v1.9.0-alpha.2
// Get and output the current latest unstable version
latestVersionStr, latestVersion, err := versionGetterImpl.VersionFromCILabel("latest", "experimental version")
if err != nil {
return nil, err
}
minorUnstable := latestVersion.Components()[1]
// Get and output the latest version from the previous (stabilizing) branch
previousBranch := fmt.Sprintf("latest-1.%d", minorUnstable-1)
previousBranchLatestVersionStr, previousBranchLatestVersion, err := versionGetterImpl.VersionFromCILabel(previousBranch, "")
if err != nil {
return nil, err
}
// If that previous latest version is an RC, RCs are allowed and the cluster version is lower than the RC version, show the upgrade
if rcUpgradesAllowed && rcUpgradePossible(clusterVersion, previousBranchLatestVersion) {
upgrades = append(upgrades, Upgrade{
Description: "release candidate version",
Before: beforeState,
After: ClusterState{
KubeVersion: previousBranchLatestVersionStr,
DNSVersion: dns.GetKubeDNSVersion(previousBranchLatestVersion),
KubeadmVersion: previousBranchLatestVersionStr,
// KubeletVersions is unset here as it is not used anywhere in .After
},
})
}
// Show the possibility if experimental upgrades are allowed
if experimentalUpgradesAllowed && clusterVersion.LessThan(latestVersion) {
// Default to assume that the experimental version to show is the unstable one
unstableKubeVersion := latestVersionStr
unstableKubeDNSVersion := dns.GetKubeDNSVersion(latestVersion)
// We should not display alpha.0. The previous branch's beta/rc versions are more relevant due to how the kube branching process works.
if latestVersion.PreRelease() == "alpha.0" {
unstableKubeVersion = previousBranchLatestVersionStr
unstableKubeDNSVersion = dns.GetKubeDNSVersion(previousBranchLatestVersion)
}
upgrades = append(upgrades, Upgrade{
Description: "experimental version",
Before: beforeState,
After: ClusterState{
KubeVersion: unstableKubeVersion,
DNSVersion: unstableKubeDNSVersion,
KubeadmVersion: unstableKubeVersion,
// KubeletVersions is unset here as it is not used anywhere in .After
},
})
}
}
// Add a newline at the end of this output to leave some space before the next output section
fmt.Println("")
return upgrades, nil
}
func getBranchFromVersion(version string) string {
return strings.TrimPrefix(version, "v")[:3]
}
func patchVersionBranchExists(clusterVersion, stableVersion *version.Version) bool {
return stableVersion.AtLeast(clusterVersion)
}
func patchUpgradePossible(clusterVersion, patchVersion *version.Version) bool {
return clusterVersion.LessThan(patchVersion)
}
func rcUpgradePossible(clusterVersion, previousBranchLatestVersion *version.Version) bool {
return strings.HasPrefix(previousBranchLatestVersion.PreRelease(), "rc") && clusterVersion.LessThan(previousBranchLatestVersion)
}
func minorUpgradePossibleWithPatchRelease(stableVersion, patchVersion *version.Version) bool {
return patchVersion.LessThan(stableVersion)
}

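The helper predicates at the bottom lean entirely on `k8s.io/kubernetes/pkg/util/version`, which this file already imports. A short sketch of how they behave in the "both patch and minor upgrade available" case (the versions are examples):

```go
package main

import (
	"fmt"

	"k8s.io/kubernetes/pkg/util/version"
)

func main() {
	cluster := version.MustParseSemantic("v1.7.3")
	patch := version.MustParseSemantic("v1.7.5")  // latest patch in the cluster's stable-1.7 branch
	stable := version.MustParseSemantic("v1.8.2") // latest overall stable version

	// patchVersionBranchExists: a stable-1.7 branch exists for the cluster version.
	fmt.Println(stable.AtLeast(cluster)) // true

	// patchUpgradePossible: the cluster can move up within its own branch.
	fmt.Println(cluster.LessThan(patch)) // true

	// minorUpgradePossibleWithPatchRelease: the latest patch is still below the
	// stable version, so the stable version must live on a newer minor branch.
	fmt.Println(patch.LessThan(stable)) // true
}
```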

@@ -0,0 +1,481 @@
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package upgrade
import (
"reflect"
"testing"
versionutil "k8s.io/kubernetes/pkg/util/version"
)
type fakeVersionGetter struct {
clusterVersion, kubeadmVersion, stableVersion, latestVersion, latestDevBranchVersion, stablePatchVersion, kubeletVersion string
}
var _ VersionGetter = &fakeVersionGetter{}
// ClusterVersion gets a fake API server version
func (f *fakeVersionGetter) ClusterVersion() (string, *versionutil.Version, error) {
return f.clusterVersion, versionutil.MustParseSemantic(f.clusterVersion), nil
}
// KubeadmVersion gets a fake kubeadm version
func (f *fakeVersionGetter) KubeadmVersion() (string, *versionutil.Version, error) {
return f.kubeadmVersion, versionutil.MustParseSemantic(f.kubeadmVersion), nil
}
// VersionFromCILabel gets fake latest versions from CI
func (f *fakeVersionGetter) VersionFromCILabel(ciVersionLabel, _ string) (string, *versionutil.Version, error) {
if ciVersionLabel == "stable" {
return f.stableVersion, versionutil.MustParseSemantic(f.stableVersion), nil
}
if ciVersionLabel == "latest" {
return f.latestVersion, versionutil.MustParseSemantic(f.latestVersion), nil
}
if ciVersionLabel == "latest-1.8" {
return f.latestDevBranchVersion, versionutil.MustParseSemantic(f.latestDevBranchVersion), nil
}
return f.stablePatchVersion, versionutil.MustParseSemantic(f.stablePatchVersion), nil
}
// KubeletVersions gets the versions of the kubelets in the cluster
func (f *fakeVersionGetter) KubeletVersions() (map[string]uint16, error) {
return map[string]uint16{
f.kubeletVersion: 1,
}, nil
}
func TestGetAvailableUpgrades(t *testing.T) {
tests := []struct {
vg *fakeVersionGetter
expectedUpgrades []Upgrade
allowExperimental, allowRCs bool
errExpected bool
}{
{ // no action needed, already up-to-date
vg: &fakeVersionGetter{
clusterVersion: "v1.7.3",
kubeletVersion: "v1.7.3",
kubeadmVersion: "v1.7.3",
stablePatchVersion: "v1.7.3",
stableVersion: "v1.7.3",
},
expectedUpgrades: []Upgrade{},
allowExperimental: false,
errExpected: false,
},
{ // simple patch version upgrade
vg: &fakeVersionGetter{
clusterVersion: "v1.7.1",
kubeletVersion: "v1.7.1", // the kubelet are on the same version as the control plane
kubeadmVersion: "v1.7.2",
stablePatchVersion: "v1.7.3",
stableVersion: "v1.7.3",
},
expectedUpgrades: []Upgrade{
{
Description: "version in the v1.7 series",
Before: ClusterState{
KubeVersion: "v1.7.1",
KubeletVersions: map[string]uint16{
"v1.7.1": 1,
},
KubeadmVersion: "v1.7.2",
DNSVersion: "1.14.4",
},
After: ClusterState{
KubeVersion: "v1.7.3",
KubeadmVersion: "v1.7.3",
DNSVersion: "1.14.4",
},
},
},
allowExperimental: false,
errExpected: false,
},
{ // minor version upgrade only
vg: &fakeVersionGetter{
clusterVersion: "v1.7.3",
kubeletVersion: "v1.7.3", // the kubelet are on the same version as the control plane
kubeadmVersion: "v1.8.0",
stablePatchVersion: "v1.7.3",
stableVersion: "v1.8.0",
},
expectedUpgrades: []Upgrade{
{
Description: "stable version",
Before: ClusterState{
KubeVersion: "v1.7.3",
KubeletVersions: map[string]uint16{
"v1.7.3": 1,
},
KubeadmVersion: "v1.8.0",
DNSVersion: "1.14.4",
},
After: ClusterState{
KubeVersion: "v1.8.0",
KubeadmVersion: "v1.8.0",
DNSVersion: "1.14.4",
},
},
},
allowExperimental: false,
errExpected: false,
},
{ // both minor version upgrade and patch version upgrade available
vg: &fakeVersionGetter{
clusterVersion: "v1.7.3",
kubeletVersion: "v1.7.3", // the kubelet are on the same version as the control plane
kubeadmVersion: "v1.8.1",
stablePatchVersion: "v1.7.5",
stableVersion: "v1.8.2",
},
expectedUpgrades: []Upgrade{
{
Description: "version in the v1.7 series",
Before: ClusterState{
KubeVersion: "v1.7.3",
KubeletVersions: map[string]uint16{
"v1.7.3": 1,
},
KubeadmVersion: "v1.8.1",
DNSVersion: "1.14.4",
},
After: ClusterState{
KubeVersion: "v1.7.5",
KubeadmVersion: "v1.8.1", // Note: The kubeadm version mustn't be "downgraded" here
DNSVersion: "1.14.4",
},
},
{
Description: "stable version",
Before: ClusterState{
KubeVersion: "v1.7.3",
KubeletVersions: map[string]uint16{
"v1.7.3": 1,
},
KubeadmVersion: "v1.8.1",
DNSVersion: "1.14.4",
},
After: ClusterState{
KubeVersion: "v1.8.2",
KubeadmVersion: "v1.8.2",
DNSVersion: "1.14.4",
},
},
},
allowExperimental: false,
errExpected: false,
},
{ // allow experimental upgrades, but no upgrade available
vg: &fakeVersionGetter{
clusterVersion: "v1.8.0-alpha.2",
kubeletVersion: "v1.7.5",
kubeadmVersion: "v1.7.5",
stablePatchVersion: "v1.7.5",
stableVersion: "v1.7.5",
latestVersion: "v1.8.0-alpha.2",
},
expectedUpgrades: []Upgrade{},
allowExperimental: true,
errExpected: false,
},
{ // upgrade to an unstable version should be supported
vg: &fakeVersionGetter{
clusterVersion: "v1.7.5",
kubeletVersion: "v1.7.5",
kubeadmVersion: "v1.7.5",
stablePatchVersion: "v1.7.5",
stableVersion: "v1.7.5",
latestVersion: "v1.8.0-alpha.2",
},
expectedUpgrades: []Upgrade{
{
Description: "experimental version",
Before: ClusterState{
KubeVersion: "v1.7.5",
KubeletVersions: map[string]uint16{
"v1.7.5": 1,
},
KubeadmVersion: "v1.7.5",
DNSVersion: "1.14.4",
},
After: ClusterState{
KubeVersion: "v1.8.0-alpha.2",
KubeadmVersion: "v1.8.0-alpha.2",
DNSVersion: "1.14.4",
},
},
},
allowExperimental: true,
errExpected: false,
},
{ // upgrade from an unstable version to an unstable version should be supported
vg: &fakeVersionGetter{
clusterVersion: "v1.8.0-alpha.1",
kubeletVersion: "v1.7.5",
kubeadmVersion: "v1.7.5",
stablePatchVersion: "v1.7.5",
stableVersion: "v1.7.5",
latestVersion: "v1.8.0-alpha.2",
},
expectedUpgrades: []Upgrade{
{
Description: "experimental version",
Before: ClusterState{
KubeVersion: "v1.8.0-alpha.1",
KubeletVersions: map[string]uint16{
"v1.7.5": 1,
},
KubeadmVersion: "v1.7.5",
DNSVersion: "1.14.4",
},
After: ClusterState{
KubeVersion: "v1.8.0-alpha.2",
KubeadmVersion: "v1.8.0-alpha.2",
DNSVersion: "1.14.4",
},
},
},
allowExperimental: true,
errExpected: false,
},
{ // v1.X.0-alpha.0 should be ignored
vg: &fakeVersionGetter{
clusterVersion: "v1.7.5",
kubeletVersion: "v1.7.5",
kubeadmVersion: "v1.7.5",
stablePatchVersion: "v1.7.5",
stableVersion: "v1.7.5",
latestDevBranchVersion: "v1.8.0-beta.1",
latestVersion: "v1.9.0-alpha.0",
},
expectedUpgrades: []Upgrade{
{
Description: "experimental version",
Before: ClusterState{
KubeVersion: "v1.7.5",
KubeletVersions: map[string]uint16{
"v1.7.5": 1,
},
KubeadmVersion: "v1.7.5",
DNSVersion: "1.14.4",
},
After: ClusterState{
KubeVersion: "v1.8.0-beta.1",
KubeadmVersion: "v1.8.0-beta.1",
DNSVersion: "1.14.4",
},
},
},
allowExperimental: true,
errExpected: false,
},
{ // upgrade to an RC version should be supported
vg: &fakeVersionGetter{
clusterVersion: "v1.7.5",
kubeletVersion: "v1.7.5",
kubeadmVersion: "v1.7.5",
stablePatchVersion: "v1.7.5",
stableVersion: "v1.7.5",
latestDevBranchVersion: "v1.8.0-rc.1",
latestVersion: "v1.9.0-alpha.1",
},
expectedUpgrades: []Upgrade{
{
Description: "release candidate version",
Before: ClusterState{
KubeVersion: "v1.7.5",
KubeletVersions: map[string]uint16{
"v1.7.5": 1,
},
KubeadmVersion: "v1.7.5",
DNSVersion: "1.14.4",
},
After: ClusterState{
KubeVersion: "v1.8.0-rc.1",
KubeadmVersion: "v1.8.0-rc.1",
DNSVersion: "1.14.4",
},
},
},
allowRCs: true,
errExpected: false,
},
{ // it is possible (but very uncommon) that the latest version from the previous branch is an rc and the current latest version is alpha.0. In that case, show the RC
vg: &fakeVersionGetter{
clusterVersion: "v1.7.5",
kubeletVersion: "v1.7.5",
kubeadmVersion: "v1.7.5",
stablePatchVersion: "v1.7.5",
stableVersion: "v1.7.5",
latestDevBranchVersion: "v1.8.0-rc.1",
latestVersion: "v1.9.0-alpha.0",
},
expectedUpgrades: []Upgrade{
{
Description: "experimental version", // Note that this is considered an experimental version in this uncommon scenario
Before: ClusterState{
KubeVersion: "v1.7.5",
KubeletVersions: map[string]uint16{
"v1.7.5": 1,
},
KubeadmVersion: "v1.7.5",
DNSVersion: "1.14.4",
},
After: ClusterState{
KubeVersion: "v1.8.0-rc.1",
KubeadmVersion: "v1.8.0-rc.1",
DNSVersion: "1.14.4",
},
},
},
allowExperimental: true,
errExpected: false,
},
{ // upgrade to an RC version should be supported. There may also be an even newer unstable version.
vg: &fakeVersionGetter{
clusterVersion: "v1.7.5",
kubeletVersion: "v1.7.5",
kubeadmVersion: "v1.7.5",
stablePatchVersion: "v1.7.5",
stableVersion: "v1.7.5",
latestDevBranchVersion: "v1.8.0-rc.1",
latestVersion: "v1.9.0-alpha.1",
},
expectedUpgrades: []Upgrade{
{
Description: "release candidate version",
Before: ClusterState{
KubeVersion: "v1.7.5",
KubeletVersions: map[string]uint16{
"v1.7.5": 1,
},
KubeadmVersion: "v1.7.5",
DNSVersion: "1.14.4",
},
After: ClusterState{
KubeVersion: "v1.8.0-rc.1",
KubeadmVersion: "v1.8.0-rc.1",
DNSVersion: "1.14.4",
},
},
{
Description: "experimental version",
Before: ClusterState{
KubeVersion: "v1.7.5",
KubeletVersions: map[string]uint16{
"v1.7.5": 1,
},
KubeadmVersion: "v1.7.5",
DNSVersion: "1.14.4",
},
After: ClusterState{
KubeVersion: "v1.9.0-alpha.1",
KubeadmVersion: "v1.9.0-alpha.1",
DNSVersion: "1.14.4",
},
},
},
allowRCs: true,
allowExperimental: true,
errExpected: false,
},
}
for _, rt := range tests {
actualUpgrades, actualErr := GetAvailableUpgrades(rt.vg, rt.allowExperimental, rt.allowRCs)
if !reflect.DeepEqual(actualUpgrades, rt.expectedUpgrades) {
t.Errorf("failed TestGetAvailableUpgrades\n\texpected upgrades: %v\n\tgot: %v", rt.expectedUpgrades, actualUpgrades)
}
if (actualErr != nil) != rt.errExpected {
t.Errorf("failed TestGetAvailableUpgrades\n\texpected error: %t\n\tgot error: %t", rt.errExpected, (actualErr != nil))
}
}
}
func TestKubeletUpgrade(t *testing.T) {
tests := []struct {
before map[string]uint16
after string
expected bool
}{
{ // upgrade available
before: map[string]uint16{
"v1.7.1": 1,
},
after: "v1.7.3",
expected: true,
},
{ // upgrade available
before: map[string]uint16{
"v1.7.1": 1,
"v1.7.3": 100,
},
after: "v1.7.3",
expected: true,
},
{ // upgrade not available
before: map[string]uint16{
"v1.7.3": 1,
},
after: "v1.7.3",
expected: false,
},
{ // upgrade not available
before: map[string]uint16{
"v1.7.3": 100,
},
after: "v1.7.3",
expected: false,
},
{ // upgrade not available if we don't know anything about the earlier state
before: map[string]uint16{},
after: "v1.7.3",
expected: false,
},
}
for _, rt := range tests {
upgrade := Upgrade{
Before: ClusterState{
KubeletVersions: rt.before,
},
After: ClusterState{
KubeVersion: rt.after,
},
}
actual := upgrade.CanUpgradeKubelets()
if actual != rt.expected {
t.Errorf("failed TestKubeletUpgrade\n\texpected: %t\n\tgot: %t\n\ttest object: %v", rt.expected, actual, upgrade)
}
}
}


@@ -19,18 +19,89 @@ package upgrade
import (
"fmt"
"io"
"io/ioutil"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
clientset "k8s.io/client-go/kubernetes"
kubeadmapi "k8s.io/kubernetes/cmd/kubeadm/app/apis/kubeadm"
kubeadmapiext "k8s.io/kubernetes/cmd/kubeadm/app/apis/kubeadm/v1alpha1"
"k8s.io/kubernetes/cmd/kubeadm/app/apis/kubeadm/validation"
"k8s.io/kubernetes/cmd/kubeadm/app/constants"
configutil "k8s.io/kubernetes/cmd/kubeadm/app/util/config"
"k8s.io/kubernetes/pkg/api"
)
// FetchConfiguration fetches configuration required for upgrading your cluster from a file (which has precedence) or a ConfigMap in the cluster
func FetchConfiguration(_ clientset.Interface, _ io.Writer, _ string) (*kubeadmapiext.MasterConfiguration, error) {
func FetchConfiguration(client clientset.Interface, w io.Writer, cfgPath string) (*kubeadmapiext.MasterConfiguration, error) {
fmt.Println("[upgrade/config] Making sure the configuration is correct:")
cfg := &kubeadmapiext.MasterConfiguration{}
api.Scheme.Default(cfg)
// Load the configuration from a file or the cluster
configBytes, err := loadConfigurationBytes(client, w, cfgPath)
if err != nil {
return nil, err
}
return cfg, nil
// Take the versioned configuration populated from the ConfigMap, default it and validate it
// Return the internal version of the API object
versionedcfg, err := bytesToValidatedMasterConfig(configBytes)
if err != nil {
return nil, fmt.Errorf("could not decode configuration: %v", err)
}
return versionedcfg, nil
}
// loadConfigurationBytes loads the configuration byte slice from either a file or the cluster ConfigMap
func loadConfigurationBytes(client clientset.Interface, w io.Writer, cfgPath string) ([]byte, error) {
if cfgPath != "" {
fmt.Printf("[upgrade/config] Reading configuration options from a file: %s\n", cfgPath)
return ioutil.ReadFile(cfgPath)
}
fmt.Println("[upgrade/config] Reading configuration from the cluster...")
configMap, err := client.CoreV1().ConfigMaps(metav1.NamespaceSystem).Get(constants.MasterConfigurationConfigMap, metav1.GetOptions{})
if apierrors.IsNotFound(err) {
fmt.Printf("[upgrade/config] In order to upgrade, a ConfigMap called %q in the %s namespace must exist.\n", constants.MasterConfigurationConfigMap, metav1.NamespaceSystem)
fmt.Println("[upgrade/config] Without this information, 'kubeadm upgrade' don't how to configure your upgraded cluster.")
fmt.Println("")
fmt.Println("[upgrade/config] Next steps:")
fmt.Printf("\t- OPTION 1: Run 'kubeadm config upload from-flags' and specify the same CLI arguments you passed to 'kubeadm init' when you created your master.\n")
fmt.Printf("\t- OPTION 2: Run 'kubeadm config upload from-file' and specify the same config file you passed to 'kubeadm init' when you created your master.\n")
fmt.Printf("\t- OPTION 3: Pass a config file to 'kubeadm upgrade' using the --config flag.\n")
fmt.Println("")
return []byte{}, fmt.Errorf("the ConfigMap %q in the %s namespace used for getting configuration information was not found", constants.MasterConfigurationConfigMap, metav1.NamespaceSystem)
} else if err != nil {
return []byte{}, fmt.Errorf("an unexpected error happened when trying to get the ConfigMap %q in the %s namespace: %v", constants.MasterConfigurationConfigMap, metav1.NamespaceSystem, err)
}
fmt.Printf("[upgrade/config] FYI: You can look at this config file with 'kubectl -n %s get cm %s -oyaml'\n", metav1.NamespaceSystem, constants.MasterConfigurationConfigMap)
return []byte(configMap.Data[constants.MasterConfigurationConfigMapKey]), nil
}
// bytesToValidatedMasterConfig converts a byte array to an external, defaulted and validated configuration object
func bytesToValidatedMasterConfig(b []byte) (*kubeadmapiext.MasterConfiguration, error) {
cfg := &kubeadmapiext.MasterConfiguration{}
finalCfg := &kubeadmapiext.MasterConfiguration{}
internalcfg := &kubeadmapi.MasterConfiguration{}
if err := runtime.DecodeInto(api.Codecs.UniversalDecoder(), b, cfg); err != nil {
return nil, fmt.Errorf("unable to decode config from bytes: %v", err)
}
// Default and convert to the internal version
api.Scheme.Default(cfg)
api.Scheme.Convert(cfg, internalcfg, nil)
// Applies dynamic defaults to settings not provided with flags
if err := configutil.SetInitDynamicDefaults(internalcfg); err != nil {
return nil, err
}
// Validates cfg (flags/configs + defaults + dynamic defaults)
if err := validation.ValidateMasterConfiguration(internalcfg).ToAggregate(); err != nil {
return nil, err
}
// Finally converts back to the external version
api.Scheme.Convert(internalcfg, finalCfg, nil)
return finalCfg, nil
}

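Below is a runnable sketch of the ConfigMap half of `loadConfigurationBytes`, using client-go's fake clientset so it works without a cluster. The ConfigMap name and data key mirror what kubeadm reads from its constants, but they are hardcoded here as assumptions:

```go
package main

import (
	"fmt"

	"k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/kubernetes/fake"
)

func main() {
	// Seed the fake clientset with a ConfigMap like the one 'kubeadm init' uploads.
	// The name and key are illustrative; kubeadm takes them from its constants package.
	client := fake.NewSimpleClientset(&v1.ConfigMap{
		ObjectMeta: metav1.ObjectMeta{Name: "kubeadm-config", Namespace: metav1.NamespaceSystem},
		Data:       map[string]string{"MasterConfiguration": "kubernetesVersion: v1.7.5\n"},
	})

	cm, err := client.CoreV1().ConfigMaps(metav1.NamespaceSystem).Get("kubeadm-config", metav1.GetOptions{})
	if err != nil {
		panic(err)
	}
	// These are the bytes that would be handed to bytesToValidatedMasterConfig.
	fmt.Print(cm.Data["MasterConfiguration"])
}
```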

@@ -17,20 +17,190 @@ limitations under the License.
package upgrade
import (
"fmt"
"net/http"
"os"
"k8s.io/api/core/v1"
extensions "k8s.io/api/extensions/v1beta1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
clientset "k8s.io/client-go/kubernetes"
"k8s.io/kubernetes/cmd/kubeadm/app/constants"
)
// healthCheck is a helper struct for easily performing healthchecks against the cluster and printing the output
type healthCheck struct {
description, okMessage, failMessage string
// f is invoked with a k8s client; it should return an error if the check fails
f func(clientset.Interface) error
}
// CheckClusterHealth makes sure:
// - the API /healthz endpoint is healthy
// - all Nodes are Ready
// - (if self-hosted) that there are DaemonSets with at least one Pod for all control plane components
// - (if static pod-hosted) that all required Static Pod manifests exist on disk
func CheckClusterHealth(_ clientset.Interface) error {
func CheckClusterHealth(client clientset.Interface) error {
fmt.Println("[upgrade] Making sure the cluster is healthy:")
healthChecks := []healthCheck{
{
description: "API Server health",
okMessage: "Healthy",
failMessage: "Unhealthy",
f: apiServerHealthy,
},
{
description: "Node health",
okMessage: "All Nodes are healthy",
failMessage: "More than one Node unhealthy",
f: nodesHealthy,
},
// TODO: Add a check for ComponentStatuses here?
}
// Run slightly different health checks depending on control plane hosting type
if IsControlPlaneSelfHosted(client) {
healthChecks = append(healthChecks, healthCheck{
description: "Control plane DaemonSet health",
okMessage: "All control plane DaemonSets are healthy",
failMessage: "More than one control plane DaemonSet unhealthy",
f: controlPlaneHealth,
})
} else {
healthChecks = append(healthChecks, healthCheck{
description: "Static Pod manifests exists on disk",
okMessage: "All manifests exist on disk",
failMessage: "Some manifests don't exist on disk",
f: staticPodManifestHealth,
})
}
return runHealthChecks(client, healthChecks)
}
// runHealthChecks runs a set of health checks against the cluster
func runHealthChecks(client clientset.Interface, healthChecks []healthCheck) error {
for _, check := range healthChecks {
err := check.f(client)
if err != nil {
fmt.Printf("[upgrade/health] Checking %s: %s\n", check.description, check.failMessage)
return fmt.Errorf("The cluster is not in an upgradeable state due to: %v", err)
}
fmt.Printf("[upgrade/health] Checking %s: %s\n", check.description, check.okMessage)
}
return nil
}
// IsControlPlaneSelfHosted returns whether the control plane is self hosted or not
func IsControlPlaneSelfHosted(_ clientset.Interface) bool {
// No-op for now
return false
// apiServerHealthy checks whether the API server's /healthz endpoint is healthy
func apiServerHealthy(client clientset.Interface) error {
healthStatus := 0
// If client.Discovery().RESTClient() is nil, the fake client is used, and that means we are dry-running. Just proceed
if client.Discovery().RESTClient() == nil {
return nil
}
client.Discovery().RESTClient().Get().AbsPath("/healthz").Do().StatusCode(&healthStatus)
if healthStatus != http.StatusOK {
return fmt.Errorf("the API Server is unhealthy; /healthz didn't return %q", "ok")
}
return nil
}
// nodesHealthy checks whether all Nodes in the cluster are Ready
func nodesHealthy(client clientset.Interface) error {
nodes, err := client.CoreV1().Nodes().List(metav1.ListOptions{})
if err != nil {
return fmt.Errorf("couldn't list all nodes in cluster: %v", err)
}
notReadyNodes := getNotReadyNodes(nodes.Items)
if len(notReadyNodes) != 0 {
return fmt.Errorf("there are NotReady Nodes in the cluster: %v", notReadyNodes)
}
return nil
}
// controlPlaneHealth ensures all control plane DaemonSets are healthy
func controlPlaneHealth(client clientset.Interface) error {
notReadyDaemonSets, err := getNotReadyDaemonSets(client)
if err != nil {
return err
}
if len(notReadyDaemonSets) != 0 {
return fmt.Errorf("there are control plane DaemonSets in the cluster that are not ready: %v", notReadyDaemonSets)
}
return nil
}
// staticPodManifestHealth makes sure the required static pods are presents
func staticPodManifestHealth(_ clientset.Interface) error {
nonExistentManifests := []string{}
for _, component := range constants.MasterComponents {
manifestFile := constants.GetStaticPodFilepath(component, constants.GetStaticPodDirectory())
if _, err := os.Stat(manifestFile); os.IsNotExist(err) {
nonExistentManifests = append(nonExistentManifests, manifestFile)
}
}
if len(nonExistentManifests) == 0 {
return nil
}
return fmt.Errorf("The control plane seems to be Static Pod-hosted, but some of the manifests don't seem to exist on disk. This probably means you're running 'kubeadm upgrade' on a remote machine, which is not supported for a Static Pod-hosted cluster. Manifest files not found: %v", nonExistentManifests)
}
// IsControlPlaneSelfHosted returns whether the control plane is self hosted or not
func IsControlPlaneSelfHosted(client clientset.Interface) bool {
notReadyDaemonSets, err := getNotReadyDaemonSets(client)
if err != nil {
return false
}
// If there are no NotReady DaemonSets, we are using self-hosting
return len(notReadyDaemonSets) == 0
}
// getNotReadyDaemonSets returns one error per control plane DaemonSet that is not Ready
func getNotReadyDaemonSets(client clientset.Interface) ([]error, error) {
notReadyDaemonSets := []error{}
for _, component := range constants.MasterComponents {
dsName := constants.AddSelfHostedPrefix(component)
ds, err := client.ExtensionsV1beta1().DaemonSets(metav1.NamespaceSystem).Get(dsName, metav1.GetOptions{})
if err != nil {
return nil, fmt.Errorf("couldn't get daemonset %q in the %s namespace", dsName, metav1.NamespaceSystem)
}
if err := daemonSetHealth(&ds.Status); err != nil {
notReadyDaemonSets = append(notReadyDaemonSets, fmt.Errorf("DaemonSet %q not healthy: %v", dsName, err))
}
}
return notReadyDaemonSets, nil
}
// daemonSetHealth is a helper function for getting the health of a DaemonSet's status
func daemonSetHealth(dsStatus *extensions.DaemonSetStatus) error {
if dsStatus.CurrentNumberScheduled != dsStatus.DesiredNumberScheduled {
return fmt.Errorf("current number of scheduled Pods ('%d') doesn't match the amount of desired Pods ('%d')", dsStatus.CurrentNumberScheduled, dsStatus.DesiredNumberScheduled)
}
if dsStatus.NumberAvailable == 0 {
return fmt.Errorf("no available Pods for DaemonSet")
}
if dsStatus.NumberReady == 0 {
return fmt.Errorf("no ready Pods for DaemonSet")
}
return nil
}
// getNotReadyNodes returns a string slice of nodes in the cluster that are NotReady
func getNotReadyNodes(nodes []v1.Node) []string {
notReadyNodes := []string{}
for _, node := range nodes {
for _, condition := range node.Status.Conditions {
if condition.Type == v1.NodeReady && condition.Status != v1.ConditionTrue {
notReadyNodes = append(notReadyNodes, node.ObjectMeta.Name)
}
}
}
return notReadyNodes
}

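The `healthCheck` slice pattern is easy to lift out on its own. Here is a self-contained sketch that uses plain `func() error` checks instead of `func(clientset.Interface) error`, aborting on the first failure just like `runHealthChecks`:

```go
package main

import (
	"errors"
	"fmt"
)

type healthCheck struct {
	description, okMessage, failMessage string
	f                                   func() error
}

// runChecks mirrors runHealthChecks: run every check in order, abort on the first failure.
func runChecks(checks []healthCheck) error {
	for _, check := range checks {
		if err := check.f(); err != nil {
			fmt.Printf("[upgrade/health] Checking %s: %s\n", check.description, check.failMessage)
			return fmt.Errorf("the cluster is not in an upgradeable state due to: %v", err)
		}
		fmt.Printf("[upgrade/health] Checking %s: %s\n", check.description, check.okMessage)
	}
	return nil
}

func main() {
	checks := []healthCheck{
		{"API Server health", "Healthy", "Unhealthy", func() error { return nil }},
		{"Node health", "All Nodes are healthy", "One or more Nodes are unhealthy",
			func() error { return errors.New("node worker-1 is NotReady") }},
	}
	if err := runChecks(checks); err != nil {
		fmt.Println(err)
	}
}
```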

@@ -17,9 +17,21 @@ limitations under the License.
package upgrade
import (
"fmt"
"strings"
"k8s.io/kubernetes/cmd/kubeadm/app/constants"
"k8s.io/kubernetes/pkg/util/version"
)
const (
// MaximumAllowedMinorVersionUpgradeSkew describes how many minor versions kubeadm can upgrade the control plane version in one go
MaximumAllowedMinorVersionUpgradeSkew = 1
// MaximumAllowedMinorVersionKubeletSkew describes how many minor versions the control plane version and the kubelet can skew in a kubeadm cluster
MaximumAllowedMinorVersionKubeletSkew = 1
)
// VersionSkewPolicyErrors describes version skew errors that might be seen during the validation process in EnforceVersionPolicies
type VersionSkewPolicyErrors struct {
Mandatory []error
@@ -27,7 +39,120 @@ type VersionSkewPolicyErrors struct {
}
// EnforceVersionPolicies enforces that the proposed new version is compatible with all the different version skew policies
func EnforceVersionPolicies(_ VersionGetter, _ string, _ *version.Version, _, _ bool) *VersionSkewPolicyErrors {
// No-op now and return no skew errors
return nil
func EnforceVersionPolicies(versionGetter VersionGetter, newK8sVersionStr string, newK8sVersion *version.Version, allowExperimentalUpgrades, allowRCUpgrades bool) *VersionSkewPolicyErrors {
skewErrors := &VersionSkewPolicyErrors{
Mandatory: []error{},
Skippable: []error{},
}
clusterVersionStr, clusterVersion, err := versionGetter.ClusterVersion()
if err != nil {
// This case can't be forced: kubeadm has to be able to lookup cluster version for upgrades to work
skewErrors.Mandatory = append(skewErrors.Mandatory, fmt.Errorf("Unable to fetch cluster version: %v", err))
return skewErrors
}
kubeadmVersionStr, kubeadmVersion, err := versionGetter.KubeadmVersion()
if err != nil {
// This case can't be forced: kubeadm has to be able to look up its own version for upgrades to work
skewErrors.Mandatory = append(skewErrors.Mandatory, fmt.Errorf("Unable to fetch kubeadm version: %v", err))
return skewErrors
}
kubeletVersions, err := versionGetter.KubeletVersions()
if err != nil {
// This is a non-critical error; continue even though kubeadm couldn't look this up
skewErrors.Skippable = append(skewErrors.Skippable, fmt.Errorf("Unable to fetch the kubelet versions in the cluster: %v", err))
}
// Make sure the new version is a supported version (higher than the minimum one supported)
if constants.MinimumControlPlaneVersion.AtLeast(newK8sVersion) {
// This must not happen; kubeadm always supports a minimum version, and we can't go below that
skewErrors.Mandatory = append(skewErrors.Mandatory, fmt.Errorf("Specified version to upgrade to %q is equal to or lower than the minimum supported version %q. Please specify a higher version to upgrade to", newK8sVersionStr, constants.MinimumControlPlaneVersion.String()))
}
// Make sure new version is higher than the current Kubernetes version
if clusterVersion.AtLeast(newK8sVersion) {
// Even though we don't officially support downgrades, it "should work", and if user(s) need it and are willing to try, they can do so with --force
skewErrors.Skippable = append(skewErrors.Skippable, fmt.Errorf("Specified version to upgrade to %q is equal to or lower than the cluster version %q. Downgrades are not supported yet", newK8sVersionStr, clusterVersionStr))
} else {
// If this code path runs, it's an upgrade (the common case)
// kubeadm doesn't support upgrades that skip a minor version; e.g. a v1.7 -> v1.9 upgrade is not supported. Enforce that here
if newK8sVersion.Minor() > clusterVersion.Minor()+MaximumAllowedMinorVersionUpgradeSkew {
skewErrors.Mandatory = append(skewErrors.Mandatory, fmt.Errorf("Specified version to upgrade to %q is too high; kubeadm can upgrade only %d minor version at a time", newK8sVersionStr, MaximumAllowedMinorVersionUpgradeSkew))
}
}
// If the kubeadm version is lower than what we want to upgrade to; error
if kubeadmVersion.LessThan(newK8sVersion) {
if newK8sVersion.Minor() > kubeadmVersion.Minor() {
// This is totally unsupported; kubeadm has no idea how it should handle a newer minor release than itself
skewErrors.Mandatory = append(skewErrors.Mandatory, fmt.Errorf("Specified version to upgrade to %q is at least one minor release higher than the kubeadm minor release (%d > %d). Such an upgrade is not supported", newK8sVersionStr, newK8sVersion.Minor(), kubeadmVersion.Minor()))
} else {
// Upgrading to a higher patch version than kubeadm is ok if the user specifies --force. Not recommended, but possible.
skewErrors.Skippable = append(skewErrors.Skippable, fmt.Errorf("Specified version to upgrade to %q is higher than the kubeadm version %q. Upgrade kubeadm first using the tool you used to install kubeadm", newK8sVersionStr, kubeadmVersionStr))
}
}
// Detect if the version is unstable and the user didn't allow that
if err = detectUnstableVersionError(newK8sVersion, newK8sVersionStr, allowExperimentalUpgrades, allowRCUpgrades); err != nil {
skewErrors.Skippable = append(skewErrors.Skippable, err)
}
// Detect if there are too old kubelets in the cluster
// Check for nil here, since kubeletVersions is nil only if KubeletVersions() returned an error above;
// in that case a skippable error was already recorded, so it's okay to skip this check
if kubeletVersions != nil {
if err = detectTooOldKubelets(newK8sVersion, kubeletVersions); err != nil {
skewErrors.Skippable = append(skewErrors.Skippable, err)
}
}
// If we did not see any errors, return nil
if len(skewErrors.Skippable) == 0 && len(skewErrors.Mandatory) == 0 {
return nil
}
// Uh oh, we encountered one or more errors, return them
return skewErrors
}
// detectUnstableVersionError is a helper function for detecting if the unstable version (if specified) is allowed to be used
func detectUnstableVersionError(newK8sVersion *version.Version, newK8sVersionStr string, allowExperimentalUpgrades, allowRCUpgrades bool) error {
// Short-circuit quickly if this is not an unstable version
if len(newK8sVersion.PreRelease()) == 0 {
return nil
}
// If the user has specified that unstable versions are fine, then no error should be returned
if allowExperimentalUpgrades {
return nil
}
// If this is a release candidate and we allow such ones, everything's fine
if strings.HasPrefix(newK8sVersion.PreRelease(), "rc") && allowRCUpgrades {
return nil
}
return fmt.Errorf("Specified version to upgrade to %q is an unstable version and such upgrades weren't allowed via setting the --allow-*-upgrades flags", newK8sVersionStr)
}
// detectTooOldKubelets errors out if the kubelet versions are so old that an unsupported skew would happen if the cluster was upgraded
func detectTooOldKubelets(newK8sVersion *version.Version, kubeletVersions map[string]uint16) error {
tooOldKubeletVersions := []string{}
for versionStr := range kubeletVersions {
kubeletVersion, err := version.ParseSemantic(versionStr)
if err != nil {
return fmt.Errorf("couldn't parse kubelet version %s", versionStr)
}
if newK8sVersion.Minor() > kubeletVersion.Minor()+MaximumAllowedMinorVersionKubeletSkew {
tooOldKubeletVersions = append(tooOldKubeletVersions, versionStr)
}
}
if len(tooOldKubeletVersions) == 0 {
return nil
}
return fmt.Errorf("There are kubelets in this cluster that are too old that have these versions %v", tooOldKubeletVersions)
}
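
To show how callers are expected to treat the two error classes, here is a minimal sketch; checkUpgradePolicies and its force parameter are assumptions for illustration (mirroring a --force style flag), not part of this PR. Mandatory errors always abort, skippable ones can be overridden:

package upgrade

import (
"fmt"

"k8s.io/kubernetes/pkg/util/version"
)

// checkUpgradePolicies is a hypothetical caller of EnforceVersionPolicies.
func checkUpgradePolicies(vg VersionGetter, newVersionStr string, force bool) error {
newVersion, err := version.ParseSemantic(newVersionStr)
if err != nil {
return err
}
skewErrs := EnforceVersionPolicies(vg, newVersionStr, newVersion, false, false)
if skewErrs == nil {
return nil // no policy violations at all
}
if len(skewErrs.Mandatory) > 0 {
return fmt.Errorf("mandatory version skew errors: %v", skewErrs.Mandatory)
}
if !force {
return fmt.Errorf("skippable version skew errors (can be overridden with a force flag): %v", skewErrs.Skippable)
}
return nil
}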

View File

@ -0,0 +1,186 @@
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package upgrade
import (
"testing"
"k8s.io/kubernetes/pkg/util/version"
)
func TestEnforceVersionPolicies(t *testing.T) {
tests := []struct {
vg *fakeVersionGetter
expectedMandatoryErrs int
expectedSkippableErrs int
allowExperimental, allowRCs bool
newK8sVersion string
}{
{ // everything ok
vg: &fakeVersionGetter{
clusterVersion: "v1.7.3",
kubeletVersion: "v1.7.3",
kubeadmVersion: "v1.7.5",
},
newK8sVersion: "v1.7.5",
},
{ // everything ok
vg: &fakeVersionGetter{
clusterVersion: "v1.7.3",
kubeletVersion: "v1.7.2",
kubeadmVersion: "v1.8.1",
},
newK8sVersion: "v1.8.0",
},
{ // downgrades not supported
vg: &fakeVersionGetter{
clusterVersion: "v1.7.3",
kubeletVersion: "v1.7.3",
kubeadmVersion: "v1.7.3",
},
newK8sVersion: "v1.7.2",
expectedSkippableErrs: 1,
},
{ // upgrades without bumping the version number are not supported yet. TODO: Change this?
vg: &fakeVersionGetter{
clusterVersion: "v1.7.3",
kubeletVersion: "v1.7.3",
kubeadmVersion: "v1.7.3",
},
newK8sVersion: "v1.7.3",
expectedSkippableErrs: 1,
},
{ // new version must be higher than v1.7.0
vg: &fakeVersionGetter{
clusterVersion: "v1.7.3",
kubeletVersion: "v1.7.3",
kubeadmVersion: "v1.7.3",
},
newK8sVersion: "v1.6.10",
expectedMandatoryErrs: 1, // version must be higher than v1.7.0
expectedSkippableErrs: 1, // version shouldn't be downgraded
},
{ // upgrading two minor versions in one go is not supported
vg: &fakeVersionGetter{
clusterVersion: "v1.7.3",
kubeletVersion: "v1.7.3",
kubeadmVersion: "v1.9.0",
},
newK8sVersion: "v1.9.0",
expectedMandatoryErrs: 1, // can't upgrade two minor versions
expectedSkippableErrs: 1, // kubelet <-> apiserver skew too large
},
{ // the kubeadm version should be equal to or higher than the new kube version. However, patch version skews may be forced
vg: &fakeVersionGetter{
clusterVersion: "v1.7.3",
kubeletVersion: "v1.7.3",
kubeadmVersion: "v1.7.3",
},
newK8sVersion: "v1.7.5",
expectedSkippableErrs: 1,
},
{ // the kubeadm version should be equal to or higher than the new kube version. Trying to upgrade k8s to a higher minor version than kubeadm itself should never be supported
vg: &fakeVersionGetter{
clusterVersion: "v1.7.3",
kubeletVersion: "v1.7.3",
kubeadmVersion: "v1.7.3",
},
newK8sVersion: "v1.8.0",
expectedMandatoryErrs: 1,
},
{ // the maximum skew between the cluster version and the kubelet versions should be one minor version. This may be forced through though.
vg: &fakeVersionGetter{
clusterVersion: "v1.7.3",
kubeletVersion: "v1.6.8",
kubeadmVersion: "v1.8.0",
},
newK8sVersion: "v1.8.0",
expectedSkippableErrs: 1,
},
{ // experimental upgrades supported if the flag is set
vg: &fakeVersionGetter{
clusterVersion: "v1.7.3",
kubeletVersion: "v1.7.3",
kubeadmVersion: "v1.8.0-beta.1",
},
newK8sVersion: "v1.8.0-beta.1",
allowExperimental: true,
},
{ // release candidate upgrades supported if the flag is set
vg: &fakeVersionGetter{
clusterVersion: "v1.7.3",
kubeletVersion: "v1.7.3",
kubeadmVersion: "v1.8.0-rc.1",
},
newK8sVersion: "v1.8.0-rc.1",
allowRCs: true,
},
{ // release candidate upgrades supported if the flag is set
vg: &fakeVersionGetter{
clusterVersion: "v1.7.3",
kubeletVersion: "v1.7.3",
kubeadmVersion: "v1.8.0-rc.1",
},
newK8sVersion: "v1.8.0-rc.1",
allowExperimental: true,
},
{ // the user should not be able to upgrade to an experimental version if they haven't opted into that
vg: &fakeVersionGetter{
clusterVersion: "v1.7.3",
kubeletVersion: "v1.7.3",
kubeadmVersion: "v1.8.0-beta.1",
},
newK8sVersion: "v1.8.0-beta.1",
allowRCs: true,
expectedSkippableErrs: 1,
},
{ // the user should not be able to upgrade to a release candidate version if they haven't opted into that
vg: &fakeVersionGetter{
clusterVersion: "v1.7.3",
kubeletVersion: "v1.7.3",
kubeadmVersion: "v1.8.0-rc.1",
},
newK8sVersion: "v1.8.0-rc.1",
expectedSkippableErrs: 1,
},
}
for _, rt := range tests {
newK8sVer, err := version.ParseSemantic(rt.newK8sVersion)
if err != nil {
t.Fatalf("couldn't parse version %s: %v", rt.newK8sVersion, err)
}
actualSkewErrs := EnforceVersionPolicies(rt.vg, rt.newK8sVersion, newK8sVer, rt.allowExperimental, rt.allowRCs)
if actualSkewErrs == nil {
// No errors were seen. Report unit test failure if we expected to see errors
if rt.expectedMandatoryErrs+rt.expectedSkippableErrs > 0 {
t.Errorf("failed TestEnforceVersionPolicies\n\texpected errors but got none")
}
// Otherwise, just move on with the next test
continue
}
if len(actualSkewErrs.Skippable) != rt.expectedSkippableErrs {
t.Errorf("failed TestEnforceVersionPolicies\n\texpected skippable errors: %d\n\tgot skippable errors: %d %v", rt.expectedSkippableErrs, len(actualSkewErrs.Skippable), *rt.vg)
}
if len(actualSkewErrs.Mandatory) != rt.expectedMandatoryErrs {
t.Errorf("failed TestEnforceVersionPolicies\n\texpected mandatory errors: %d\n\tgot mandatory errors: %d %v", rt.expectedMandatoryErrs, len(actualSkewErrs.Mandatory), *rt.vg)
}
}
}

View File

@ -17,14 +17,60 @@ limitations under the License.
package upgrade
import (
"k8s.io/apimachinery/pkg/util/errors"
clientset "k8s.io/client-go/kubernetes"
kubeadmapi "k8s.io/kubernetes/cmd/kubeadm/app/apis/kubeadm"
"k8s.io/kubernetes/cmd/kubeadm/app/phases/addons/dns"
"k8s.io/kubernetes/cmd/kubeadm/app/phases/addons/proxy"
"k8s.io/kubernetes/cmd/kubeadm/app/phases/apiconfig"
"k8s.io/kubernetes/cmd/kubeadm/app/phases/bootstraptoken/clusterinfo"
nodebootstraptoken "k8s.io/kubernetes/cmd/kubeadm/app/phases/bootstraptoken/node"
"k8s.io/kubernetes/cmd/kubeadm/app/phases/uploadconfig"
"k8s.io/kubernetes/pkg/util/version"
)
// PerformPostUpgradeTasks runs nearly the same functions as 'kubeadm init' would do
// Note that the markmaster phase is left out since it's not needed, and no token is created, as that doesn't belong to the upgrade
func PerformPostUpgradeTasks(_ clientset.Interface, _ *kubeadmapi.MasterConfiguration, _ *version.Version) error {
// No-op; don't do anything here yet
return nil
func PerformPostUpgradeTasks(client clientset.Interface, cfg *kubeadmapi.MasterConfiguration, k8sVersion *version.Version) error {
errs := []error{}
// Upload currently used configuration to the cluster
// Note: This is done right in the beginning of cluster initialization; as we might want to make other phases
// depend on centralized information from this source in the future
if err := uploadconfig.UploadConfiguration(cfg, client); err != nil {
errs = append(errs, err)
}
// Create/update RBAC rules that makes the bootstrap tokens able to post CSRs
if err := nodebootstraptoken.AllowBootstrapTokensToPostCSRs(client); err != nil {
errs = append(errs, err)
}
// Create/update RBAC rules that makes the bootstrap tokens able to get their CSRs approved automatically
if err := nodebootstraptoken.AutoApproveNodeBootstrapTokens(client, k8sVersion); err != nil {
errs = append(errs, err)
}
// TODO: Is this needed here? I think that updating cluster info should probably be separate from a normal upgrade
// Create the cluster-info ConfigMap with the associated RBAC rules
// if err := clusterinfo.CreateBootstrapConfigMapIfNotExists(client, kubeadmconstants.GetAdminKubeConfigPath()); err != nil {
// return err
//}
// Create/update RBAC rules that makes the cluster-info ConfigMap reachable
if err := clusterinfo.CreateClusterInfoRBACRules(client); err != nil {
errs = append(errs, err)
}
// TODO: This call is deprecated
if err := apiconfig.CreateRBACRules(client, k8sVersion); err != nil {
errs = append(errs, err)
}
// Upgrade kube-dns and kube-proxy
if err := dns.EnsureDNSAddon(cfg, client, k8sVersion); err != nil {
errs = append(errs, err)
}
if err := proxy.EnsureProxyAddon(cfg, client); err != nil {
errs = append(errs, err)
}
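// Note: errors.NewAggregate returns nil when errs is empty, so a fully successful run reports no error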
return errors.NewAggregate(errs)
}

View File

@ -0,0 +1,180 @@
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package upgrade
import (
"fmt"
"time"
"k8s.io/api/core/v1"
extensions "k8s.io/api/extensions/v1beta1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
clientset "k8s.io/client-go/kubernetes"
kubeadmapi "k8s.io/kubernetes/cmd/kubeadm/app/apis/kubeadm"
"k8s.io/kubernetes/cmd/kubeadm/app/constants"
"k8s.io/kubernetes/cmd/kubeadm/app/images"
"k8s.io/kubernetes/cmd/kubeadm/app/util/apiclient"
)
const (
prepullPrefix = "upgrade-prepull-"
)
// Prepuller defines an interface for performing a prepull operation in a create-wait-delete fashion in parallel
type Prepuller interface {
CreateFunc(string) error
WaitFunc(string)
DeleteFunc(string) error
}
// DaemonSetPrepuller makes sure the control plane images are available on all masters
type DaemonSetPrepuller struct {
client clientset.Interface
cfg *kubeadmapi.MasterConfiguration
waiter apiclient.Waiter
}
// NewDaemonSetPrepuller creates a new instance of the DaemonSetPrepuller struct
func NewDaemonSetPrepuller(client clientset.Interface, waiter apiclient.Waiter, cfg *kubeadmapi.MasterConfiguration) *DaemonSetPrepuller {
return &DaemonSetPrepuller{
client: client,
cfg: cfg,
waiter: waiter,
}
}
// CreateFunc creates a DaemonSet for making the image available on every relevant node
func (d *DaemonSetPrepuller) CreateFunc(component string) error {
image := images.GetCoreImage(component, d.cfg.ImageRepository, d.cfg.KubernetesVersion, d.cfg.UnifiedControlPlaneImage)
ds := buildPrePullDaemonSet(component, image)
// Create the DaemonSet in the API Server
if err := apiclient.CreateOrUpdateDaemonSet(d.client, ds); err != nil {
return fmt.Errorf("unable to create a DaemonSet for prepulling the component %q: %v", component, err)
}
return nil
}
// WaitFunc waits for all Pods in the specified DaemonSet to be in the Running state
func (d *DaemonSetPrepuller) WaitFunc(component string) {
fmt.Printf("[upgrade/prepull] Prepulling image for component %s.\n", component)
d.waiter.WaitForPodsWithLabel("k8s-app=" + addPrepullPrefix(component))
}
// DeleteFunc deletes the DaemonSet used for making the image available on every relevant node
func (d *DaemonSetPrepuller) DeleteFunc(component string) error {
dsName := addPrepullPrefix(component)
if err := apiclient.DeleteDaemonSetForeground(d.client, metav1.NamespaceSystem, dsName); err != nil {
return fmt.Errorf("unable to cleanup the DaemonSet used for prepulling %s: %v", component, err)
}
fmt.Printf("[upgrade/prepull] Prepulled image for component %s.\n", component)
return nil
}
// PrepullImagesInParallel creates DaemonSets synchronously but waits in parallel for the images to pull
func PrepullImagesInParallel(kubePrepuller Prepuller, timeout time.Duration) error {
componentsToPrepull := constants.MasterComponents
fmt.Printf("[upgrade/prepull] Will prepull images for components %v\n", componentsToPrepull)
timeoutChan := time.After(timeout)
// Synchronously create the DaemonSets
for _, component := range componentsToPrepull {
if err := kubePrepuller.CreateFunc(component); err != nil {
return err
}
}
// Create a channel for streaming data from goroutines that run in parallel to a blocking for loop that cleans up
prePulledChan := make(chan string, len(componentsToPrepull))
for _, component := range componentsToPrepull {
go func(c string) {
// Wait as long as needed. This WaitFunc call should block until completion
kubePrepuller.WaitFunc(c)
// When the task is done, go ahead and cleanup by sending the name to the channel
prePulledChan <- c
}(component)
}
// This call blocks until all expected messages are received from the channel or errors out if timeoutChan fires.
// For every successful wait, kubePrepuller.DeleteFunc is executed
if err := waitForItemsFromChan(timeoutChan, prePulledChan, len(componentsToPrepull), kubePrepuller.DeleteFunc); err != nil {
return err
}
fmt.Println("[upgrade/prepull] Successfully prepulled the images for all the control plane components")
return nil
}
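
For context, a minimal sketch of how the prepull pieces fit together; prepullControlPlaneImages and the 10-minute timeout are assumptions for illustration, not necessarily what the apply command uses:

package upgrade

import (
"time"

clientset "k8s.io/client-go/kubernetes"
kubeadmapi "k8s.io/kubernetes/cmd/kubeadm/app/apis/kubeadm"
"k8s.io/kubernetes/cmd/kubeadm/app/util/apiclient"
)

// prepullControlPlaneImages is a hypothetical wrapper around the prepull machinery above.
func prepullControlPlaneImages(client clientset.Interface, waiter apiclient.Waiter, cfg *kubeadmapi.MasterConfiguration) error {
// The DaemonSet-backed prepuller implements the Prepuller interface
prepuller := NewDaemonSetPrepuller(client, waiter, cfg)
// The timeout here is an assumption; pick whatever budget fits the environment
return PrepullImagesInParallel(prepuller, 10*time.Minute)
}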
// waitForItemsFromChan waits for n elements from stringChan with a timeout. For every item received from stringChan, cleanupFunc is executed
func waitForItemsFromChan(timeoutChan <-chan time.Time, stringChan chan string, n int, cleanupFunc func(string) error) error {
i := 0
for {
select {
case <-timeoutChan:
return fmt.Errorf("The prepull operation timed out")
case result := <-stringChan:
i++
// If the cleanup function errors; error here as well
if err := cleanupFunc(result); err != nil {
return err
}
if i == n {
return nil
}
}
}
}
// addPrepullPrefix adds the prepull prefix for this functionality; can be used in names, labels, etc.
func addPrepullPrefix(component string) string {
return fmt.Sprintf("%s%s", prepullPrefix, component)
}
// buildPrePullDaemonSet builds the DaemonSet that ensures the control plane image is available
func buildPrePullDaemonSet(component, image string) *extensions.DaemonSet {
var gracePeriodSecs int64
return &extensions.DaemonSet{
ObjectMeta: metav1.ObjectMeta{
Name: addPrepullPrefix(component),
Namespace: metav1.NamespaceSystem,
},
Spec: extensions.DaemonSetSpec{
Template: v1.PodTemplateSpec{
ObjectMeta: metav1.ObjectMeta{
Labels: map[string]string{
"k8s-app": addPrepullPrefix(component),
},
},
Spec: v1.PodSpec{
Containers: []v1.Container{
{
Name: component,
Image: image,
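// Pulling the image and then just sleeping keeps the Pod Running (and the image cached);
// the DaemonSet is deleted via DeleteFunc once the wait completes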
Command: []string{"/bin/sleep", "3600"},
},
},
NodeSelector: map[string]string{
constants.LabelNodeRoleMaster: "",
},
Tolerations: []v1.Toleration{constants.MasterToleration},
TerminationGracePeriodSeconds: &gracePeriodSecs,
},
},
},
}
}

View File

@ -0,0 +1,145 @@
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package upgrade
import (
"fmt"
"testing"
"time"
//"k8s.io/kubernetes/pkg/util/version"
)
// failedCreatePrepuller is a fake prepuller that errors for kube-controller-manager in the CreateFunc call
type failedCreatePrepuller struct{}
func NewFailedCreatePrepuller() Prepuller {
return &failedCreatePrepuller{}
}
func (p *failedCreatePrepuller) CreateFunc(component string) error {
if component == "kube-controller-manager" {
return fmt.Errorf("boo")
}
return nil
}
func (p *failedCreatePrepuller) WaitFunc(component string) {}
func (p *failedCreatePrepuller) DeleteFunc(component string) error {
return nil
}
// foreverWaitPrepuller is a fake prepuller that basically waits "forever" (10 mins, but longer than the 10sec timeout)
type foreverWaitPrepuller struct{}
func NewForeverWaitPrepuller() Prepuller {
return &foreverWaitPrepuller{}
}
func (p *foreverWaitPrepuller) CreateFunc(component string) error {
return nil
}
func (p *foreverWaitPrepuller) WaitFunc(component string) {
time.Sleep(10 * time.Minute)
}
func (p *foreverWaitPrepuller) DeleteFunc(component string) error {
return nil
}
// failedDeletePrepuller is a fake prepuller that errors for kube-scheduler in the DeleteFunc call
type failedDeletePrepuller struct{}
func NewFailedDeletePrepuller() Prepuller {
return &failedDeletePrepuller{}
}
func (p *failedDeletePrepuller) CreateFunc(component string) error {
return nil
}
func (p *failedDeletePrepuller) WaitFunc(component string) {}
func (p *failedDeletePrepuller) DeleteFunc(component string) error {
if component == "kube-scheduler" {
return fmt.Errorf("boo")
}
return nil
}
// goodPrepuller is a fake prepuller that works as expected
type goodPrepuller struct{}
func NewGoodPrepuller() Prepuller {
return &goodPrepuller{}
}
func (p *goodPrepuller) CreateFunc(component string) error {
time.Sleep(300 * time.Millisecond)
return nil
}
func (p *goodPrepuller) WaitFunc(component string) {
time.Sleep(300 * time.Millisecond)
}
func (p *goodPrepuller) DeleteFunc(component string) error {
time.Sleep(300 * time.Millisecond)
return nil
}
func TestPrepullImagesInParallel(t *testing.T) {
tests := []struct {
p Prepuller
timeout time.Duration
expectedErr bool
}{
{ // should error out; create failed
p: NewFailedCreatePrepuller(),
timeout: 10 * time.Second,
expectedErr: true,
},
{ // should error out; timeout exceeded
p: NewForeverWaitPrepuller(),
timeout: 10 * time.Second,
expectedErr: true,
},
{ // should error out; delete failed
p: NewFailedDeletePrepuller(),
timeout: 10 * time.Second,
expectedErr: true,
},
{ // should work just fine
p: NewGoodPrepuller(),
timeout: 10 * time.Second,
expectedErr: false,
},
}
for _, rt := range tests {
actualErr := PrepullImagesInParallel(rt.p, rt.timeout)
if (actualErr != nil) != rt.expectedErr {
t.Errorf(
"failed TestPrepullImagesInParallel\n\texpected error: %t\n\tgot: %t",
rt.expectedErr,
(actualErr != nil),
)
}
}
}

View File

@ -0,0 +1,272 @@
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package upgrade
import (
"fmt"
"time"
"k8s.io/api/core/v1"
extensions "k8s.io/api/extensions/v1beta1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
clientset "k8s.io/client-go/kubernetes"
kubeadmapi "k8s.io/kubernetes/cmd/kubeadm/app/apis/kubeadm"
"k8s.io/kubernetes/cmd/kubeadm/app/constants"
"k8s.io/kubernetes/cmd/kubeadm/app/phases/controlplane"
"k8s.io/kubernetes/cmd/kubeadm/app/phases/selfhosting"
"k8s.io/kubernetes/cmd/kubeadm/app/util/apiclient"
"k8s.io/kubernetes/pkg/util/version"
)
const (
// upgradeTempDSPrefix is the prefix added to the temporary DaemonSet's name used during the upgrade
upgradeTempDSPrefix = "temp-upgrade-"
// upgradeTempLabel is the label key used for identifying the temporary component's DaemonSet
upgradeTempLabel = "temp-upgrade-component"
// selfHostingWaitTimeout describes the maximum amount of time a self-hosting wait process should wait before timing out
selfHostingWaitTimeout = 2 * time.Minute
// selfHostingFailureThreshold describes how many times kubeadm will retry creating the DaemonSets
selfHostingFailureThreshold uint8 = 10
)
// controlPlaneComponentResources holds the relevant Pod and DaemonSet associated with a control plane component
type controlPlaneComponentResources struct {
pod *v1.Pod
daemonSet *extensions.DaemonSet
}
// SelfHostedControlPlane upgrades a self-hosted control plane
// It works as follows:
// - The client gets the currently running DaemonSets and their associated Pods used for self-hosting the control plane
// - A temporary DaemonSet for the component in question is created, nearly identical to the DaemonSet for the self-hosted component running right now
// - Why use this temporary DaemonSet? Because the RollingUpdate strategy for upgrading DaemonSets first kills the old Pod, and then adds the new one
// - This doesn't work for self-hosted upgrades, because if you remove the only API server instance in the cluster, the cluster essentially goes down
// - So instead, a nearly identical copy of the pre-upgrade DaemonSet is created and applied to the cluster. In the beginning, this duplicate DS is just idle
// - kubeadm waits for the temporary DaemonSet's Pod to become Running
// - kubeadm updates the real, self-hosted component. This will result in the pre-upgrade component Pod being removed from the cluster
// - Luckily, the temporary, backup DaemonSet now kicks in, takes over, and acts as the control plane. It recognizes that a new Pod should be created
//   as the "real" DaemonSet is being updated
// - kubeadm waits for the pre-upgrade Pod to be deleted. It now takes advantage of the backup/temporary component
// - kubeadm waits for the new, upgraded DaemonSet to become Running.
// - Now that the new, upgraded DaemonSet is Running, we can delete the backup/temporary DaemonSet
// - Lastly, make sure the API /healthz endpoint still is reachable
//
// TL;DR: This is what the flow looks like in pseudo-code:
// for [kube-apiserver, kube-controller-manager, kube-scheduler], do:
// 1. Self-Hosted component v1 Running
// -> Duplicate the DaemonSet manifest
// 2. Self-Hosted component v1 Running (active). Backup component v1 Running (passive)
// -> Upgrade the Self-Hosted component v1 to v2.
// -> Self-Hosted component v1 is Deleted from the cluster
// 3. Backup component v1 Running becomes active and completes the upgrade by creating the Self-Hosted component v2 Pod (passive)
// -> Wait for Self-Hosted component v2 to become Running
// 4. Backup component v1 Running (active). Self-Hosted component v2 Running (passive)
// -> Backup component v1 is Deleted
// 5. Wait for Self-Hosted component v2 Running to become active
// 6. Repeat for all control plane components
func SelfHostedControlPlane(client clientset.Interface, waiter apiclient.Waiter, cfg *kubeadmapi.MasterConfiguration, k8sVersion *version.Version) error {
// Adjust the timeout slightly to something self-hosting specific
waiter.SetTimeout(selfHostingWaitTimeout)
// This function returns a map of DaemonSet objects ready to post to the API server
newControlPlaneDaemonSets := BuildUpgradedDaemonSetsFromConfig(cfg, k8sVersion)
controlPlaneResources, err := getCurrentControlPlaneComponentResources(client)
if err != nil {
return err
}
for _, component := range constants.MasterComponents {
// Make a shallow copy of the current DaemonSet in order to create a new, temporary one
tempDS := *controlPlaneResources[component].daemonSet
// Mutate the temp daemonset a little to be suitable for this usage (change label selectors, etc)
mutateTempDaemonSet(&tempDS, component)
// Create or update the DaemonSet in the API Server, and retry selfHostingFailureThreshold times if it errors out
if err := apiclient.TryRunCommand(func() error {
return apiclient.CreateOrUpdateDaemonSet(client, &tempDS)
}, selfHostingFailureThreshold); err != nil {
return err
}
// Wait for the temporary/backup self-hosted component to come up
if err := waiter.WaitForPodsWithLabel(buildTempUpgradeDSLabelQuery(component)); err != nil {
return err
}
newDS := newControlPlaneDaemonSets[component]
// Upgrade the component's self-hosted resource
// During this upgrade, the temporary/backup component will take over
if err := apiclient.TryRunCommand(func() error {
if _, err := client.ExtensionsV1beta1().DaemonSets(newDS.ObjectMeta.Namespace).Update(newDS); err != nil {
return fmt.Errorf("couldn't update self-hosted component's DaemonSet: %v", err)
}
return nil
}, selfHostingFailureThreshold); err != nil {
return err
}
// Wait for the component's old Pod to disappear
oldPod := controlPlaneResources[component].pod
if err := waiter.WaitForPodToDisappear(oldPod.ObjectMeta.Name); err != nil {
return err
}
// Wait for the main, upgraded self-hosted component to come up
// Here we're talking to the temporary/backup component; the upgraded component is in the process of starting up
if err := waiter.WaitForPodsWithLabel(selfhosting.BuildSelfHostedComponentLabelQuery(component)); err != nil {
return err
}
// Delete the temporary DaemonSet, and retry selfHostingFailureThreshold times if it errors out
// In order to pivot back to the upgraded API server, we kill the temporary/backup component
if err := apiclient.TryRunCommand(func() error {
return apiclient.DeleteDaemonSetForeground(client, tempDS.ObjectMeta.Namespace, tempDS.ObjectMeta.Name)
}, selfHostingFailureThreshold); err != nil {
return err
}
// Just as an extra safety check; make sure the API server is returning ok at the /healthz endpoint
if err := waiter.WaitForAPI(); err != nil {
return err
}
fmt.Printf("[upgrade/apply] Self-hosted component %q upgraded successfully!\n", component)
}
return nil
}
// BuildUpgradedDaemonSetsFromConfig takes a config object and the current version and returns the DaemonSet objects to post to the master
func BuildUpgradedDaemonSetsFromConfig(cfg *kubeadmapi.MasterConfiguration, k8sVersion *version.Version) map[string]*extensions.DaemonSet {
// Here the map of different mutators to use for the control plane's podspec is stored
mutators := selfhosting.GetMutatorsFromFeatureGates(cfg.FeatureGates)
// Get the new PodSpecs to use
controlPlanePods := controlplane.GetStaticPodSpecs(cfg, k8sVersion)
// Store the created DaemonSets in this map
controlPlaneDaemonSets := map[string]*extensions.DaemonSet{}
for _, component := range constants.MasterComponents {
podSpec := controlPlanePods[component].Spec
// Build the full DaemonSet object from the PodSpec generated from the control plane phase and
// using the self-hosting mutators available from the selfhosting phase
ds := selfhosting.BuildDaemonSet(component, &podSpec, mutators)
controlPlaneDaemonSets[component] = ds
}
return controlPlaneDaemonSets
}
// addTempUpgradeDSPrefix adds the upgradeTempDSPrefix to the specified DaemonSet name
func addTempUpgradeDSPrefix(currentName string) string {
return fmt.Sprintf("%s%s", upgradeTempDSPrefix, currentName)
}
// buildTempUpgradeLabels returns the label string-string map for identifying the temporary DaemonSet
func buildTempUpgradeLabels(component string) map[string]string {
return map[string]string{
upgradeTempLabel: component,
}
}
// buildTempUpgradeDSLabelQuery creates the right label query for matching the temporary DaemonSet's Pods
func buildTempUpgradeDSLabelQuery(component string) string {
return fmt.Sprintf("%s=%s", upgradeTempLabel, component)
}
// mutateTempDaemonSet mutates the specified self-hosted DaemonSet for the specified component
// in a way that makes it possible to post a nearly identical, temporary DaemonSet as a backup
func mutateTempDaemonSet(tempDS *extensions.DaemonSet, component string) {
// Prefix the name of the temporary DaemonSet with upgradeTempDSPrefix
tempDS.ObjectMeta.Name = addTempUpgradeDSPrefix(tempDS.ObjectMeta.Name)
// Set .Labels to something other than what the "real" self-hosted components have
tempDS.ObjectMeta.Labels = buildTempUpgradeLabels(component)
tempDS.Spec.Selector.MatchLabels = buildTempUpgradeLabels(component)
tempDS.Spec.Template.ObjectMeta.Labels = buildTempUpgradeLabels(component)
// Clean all unnecessary ObjectMeta fields
tempDS.ObjectMeta = extractRelevantObjectMeta(tempDS.ObjectMeta)
// Reset .Status as we're posting a new object
tempDS.Status = extensions.DaemonSetStatus{}
}
// extractRelevantObjectMeta returns only the relevant parts of ObjectMeta required when creating
// a new, identical resource. We should not POST ResourceVersion, UUIDs, etc.; only the name, labels,
// namespace and annotations should be preserved.
func extractRelevantObjectMeta(ob metav1.ObjectMeta) metav1.ObjectMeta {
return metav1.ObjectMeta{
Name: ob.Name,
Namespace: ob.Namespace,
Labels: ob.Labels,
Annotations: ob.Annotations,
}
}
// listPodsWithLabelSelector returns the relevant Pods for the given LabelSelector
func listPodsWithLabelSelector(client clientset.Interface, kvLabel string) (*v1.PodList, error) {
return client.CoreV1().Pods(metav1.NamespaceSystem).List(metav1.ListOptions{
LabelSelector: kvLabel,
})
}
// getCurrentControlPlaneComponentResources returns a string-(Pod|DaemonSet) map for later use
func getCurrentControlPlaneComponentResources(client clientset.Interface) (map[string]controlPlaneComponentResources, error) {
controlPlaneResources := map[string]controlPlaneComponentResources{}
for _, component := range constants.MasterComponents {
var podList *v1.PodList
var currentDS *extensions.DaemonSet
// Get the self-hosted pod associated with the component
podLabelSelector := selfhosting.BuildSelfHostedComponentLabelQuery(component)
if err := apiclient.TryRunCommand(func() error {
var tryrunerr error
podList, tryrunerr = listPodsWithLabelSelector(client, podLabelSelector)
return tryrunerr // note that tryrunerr is most likely nil here (in successful cases)
}, selfHostingFailureThreshold); err != nil {
return nil, err
}
// Make sure that there is exactly one Pod with this label selector; otherwise unexpected things can happen
if len(podList.Items) != 1 {
return nil, fmt.Errorf("expected exactly one pod with label selector %q in the %s namespace, found %d", podLabelSelector, metav1.NamespaceSystem, len(podList.Items))
}
// Get the component's DaemonSet object
dsName := constants.AddSelfHostedPrefix(component)
if err := apiclient.TryRunCommand(func() error {
var tryrunerr error
// Try to get the current self-hosted component
currentDS, tryrunerr = client.ExtensionsV1beta1().DaemonSets(metav1.NamespaceSystem).Get(dsName, metav1.GetOptions{})
return tryrunerr // note that tryrunerr is most likely nil here (in successful cases)
}, selfHostingFailureThreshold); err != nil {
return nil, err
}
// Add the associated resources to the map to return later
controlPlaneResources[component] = controlPlaneComponentResources{
pod: &podList.Items[0],
daemonSet: currentDS,
}
}
return controlPlaneResources, nil
}
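
A minimal sketch of a call site; upgradeSelfHosted is hypothetical (the real wiring lives in the apply command and also handles prepulling and post-upgrade tasks):

package upgrade

import (
clientset "k8s.io/client-go/kubernetes"
kubeadmapi "k8s.io/kubernetes/cmd/kubeadm/app/apis/kubeadm"
"k8s.io/kubernetes/cmd/kubeadm/app/util/apiclient"
"k8s.io/kubernetes/pkg/util/version"
)

// upgradeSelfHosted is a hypothetical call site for SelfHostedControlPlane.
func upgradeSelfHosted(client clientset.Interface, waiter apiclient.Waiter, cfg *kubeadmapi.MasterConfiguration, newK8sVersionStr string) error {
k8sVersion, err := version.ParseSemantic(newK8sVersionStr)
if err != nil {
return err
}
return SelfHostedControlPlane(client, waiter, cfg, k8sVersion)
}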

View File

@ -17,13 +17,176 @@ limitations under the License.
package upgrade
import (
clientset "k8s.io/client-go/kubernetes"
"fmt"
"os"
kubeadmapi "k8s.io/kubernetes/cmd/kubeadm/app/apis/kubeadm"
"k8s.io/kubernetes/pkg/util/version"
"k8s.io/kubernetes/cmd/kubeadm/app/constants"
"k8s.io/kubernetes/cmd/kubeadm/app/phases/controlplane"
"k8s.io/kubernetes/cmd/kubeadm/app/util/apiclient"
)
// PerformStaticPodControlPlaneUpgrade upgrades a static pod-hosted control plane
func PerformStaticPodControlPlaneUpgrade(_ clientset.Interface, _ *kubeadmapi.MasterConfiguration, _ *version.Version) error {
// No-op for now; doesn't do anything yet
// StaticPodPathManager is responsible for tracking the directories used in the static pod upgrade transition
type StaticPodPathManager interface {
// MoveFile should move a file from oldPath to newPath
MoveFile(oldPath, newPath string) error
// RealManifestPath gets the file path for the component in the "real" static pod manifest directory used by the kubelet
RealManifestPath(component string) string
// RealManifestDir should point to the static pod manifest directory used by the kubelet
RealManifestDir() string
// TempManifestPath gets the file path for the component in the temporary directory created for generating new manifests for the upgrade
TempManifestPath(component string) string
// TempManifestDir should point to the temporary directory created for generating new manifests for the upgrade
TempManifestDir() string
// BackupManifestPath gets the file path for the component in the backup directory used for backing up manifests during the transition
BackupManifestPath(component string) string
// BackupManifestDir should point to the backup directory used for backing up manifests during the transition
BackupManifestDir() string
}
// KubeStaticPodPathManager is a real implementation of StaticPodPathManager that is used when upgrading a static pod cluster
type KubeStaticPodPathManager struct {
realManifestDir string
tempManifestDir string
backupManifestDir string
}
// NewKubeStaticPodPathManager creates a new instance of KubeStaticPodPathManager
func NewKubeStaticPodPathManager(realDir, tempDir, backupDir string) StaticPodPathManager {
return &KubeStaticPodPathManager{
realManifestDir: realDir,
tempManifestDir: tempDir,
backupManifestDir: backupDir,
}
}
// NewKubeStaticPodPathManagerUsingTempDirs creates a new instance of KubeStaticPodPathManager with temporary directories backing it
func NewKubeStaticPodPathManagerUsingTempDirs(realManifestDir string) (StaticPodPathManager, error) {
upgradedManifestsDir, err := constants.CreateTempDirForKubeadm("kubeadm-upgraded-manifests")
if err != nil {
return nil, err
}
backupManifestsDir, err := constants.CreateTempDirForKubeadm("kubeadm-backup-manifests")
if err != nil {
return nil, err
}
return NewKubeStaticPodPathManager(realManifestDir, upgradedManifestsDir, backupManifestsDir), nil
}
// MoveFile should move a file from oldPath to newPath
func (spm *KubeStaticPodPathManager) MoveFile(oldPath, newPath string) error {
return os.Rename(oldPath, newPath)
}
// RealManifestPath gets the file path for the component in the "real" static pod manifest directory used by the kubelet
func (spm *KubeStaticPodPathManager) RealManifestPath(component string) string {
return constants.GetStaticPodFilepath(component, spm.realManifestDir)
}
// RealManifestDir should point to the static pod manifest directory used by the kubelet
func (spm *KubeStaticPodPathManager) RealManifestDir() string {
return spm.realManifestDir
}
// TempManifestPath gets the file path for the component in the temporary directory created for generating new manifests for the upgrade
func (spm *KubeStaticPodPathManager) TempManifestPath(component string) string {
return constants.GetStaticPodFilepath(component, spm.tempManifestDir)
}
// TempManifestDir should point to the temporary directory created for generating new manifests for the upgrade
func (spm *KubeStaticPodPathManager) TempManifestDir() string {
return spm.tempManifestDir
}
// BackupManifestPath gets the file path for the component in the backup directory used for backing up manifests during the transition
func (spm *KubeStaticPodPathManager) BackupManifestPath(component string) string {
return constants.GetStaticPodFilepath(component, spm.backupManifestDir)
}
// BackupManifestDir should point to the backup directory used for backing up manifests during the transition
func (spm *KubeStaticPodPathManager) BackupManifestDir() string {
return spm.backupManifestDir
}
// StaticPodControlPlane upgrades a static pod-hosted control plane
func StaticPodControlPlane(waiter apiclient.Waiter, pathMgr StaticPodPathManager, cfg *kubeadmapi.MasterConfiguration) error {
// This string-string map stores the component name and backup filepath (if a rollback is needed).
// If a rollback is needed, these manifests will be moved back to their original locations.
recoverManifests := map[string]string{}
beforePodHashMap, err := waiter.WaitForStaticPodControlPlaneHashes(cfg.NodeName)
if err != nil {
return err
}
// Write the updated static Pod manifests into the temporary directory
fmt.Printf("[upgrade/staticpods] Writing upgraded Static Pod manifests to %q\n", pathMgr.TempManifestDir())
err = controlplane.CreateInitStaticPodManifestFiles(pathMgr.TempManifestDir(), cfg)
if err != nil {
return fmt.Errorf("failed to create the upgraded static pod manifests: %v", err)
}
for _, component := range constants.MasterComponents {
// The old manifest is here; in the /etc/kubernetes/manifests/
currentManifestPath := pathMgr.RealManifestPath(component)
// The new, upgraded manifest will be written here
newManifestPath := pathMgr.TempManifestPath(component)
// The old manifest will be moved here; into a subfolder of the temporary directory
// If a rollback is needed, these manifests will be put back to where they were initially
backupManifestPath := pathMgr.BackupManifestPath(component)
// Store the backup path in the recover list. If something goes wrong now, this component will be rolled back.
recoverManifests[component] = backupManifestPath
// Move the old manifest into the old-manifests directory
if err := pathMgr.MoveFile(currentManifestPath, backupManifestPath); err != nil {
return rollbackOldManifests(recoverManifests, err, pathMgr)
}
// Move the new manifest into the manifests directory
if err := pathMgr.MoveFile(newManifestPath, currentManifestPath); err != nil {
return rollbackOldManifests(recoverManifests, err, pathMgr)
}
fmt.Printf("[upgrade/staticpods] Moved upgraded manifest to %q and backed up old manifest to %q\n", currentManifestPath, backupManifestPath)
fmt.Println("[upgrade/staticpods] Waiting for the kubelet to restart the component")
// Wait for the mirror Pod hash to change; otherwise we'll run into race conditions here when the kubelet hasn't had time to
// notice the removal of the Static Pod, leading to a false positive below where we check that the API endpoint is healthy
// If we don't do this, there is a case where we remove the Static Pod manifest, the kubelet is slow to react, kubeadm checks the
// API endpoint of the OLD Static Pod component below, and proceeds quickly enough, which might lead to unexpected results.
if err := waiter.WaitForStaticPodControlPlaneHashChange(cfg.NodeName, component, beforePodHashMap[component]); err != nil {
return rollbackOldManifests(recoverManifests, err, pathMgr)
}
// Wait for the static pod component to come up and register itself as a mirror pod
if err := waiter.WaitForPodsWithLabel("component=" + component); err != nil {
return rollbackOldManifests(recoverManifests, err, pathMgr)
}
fmt.Printf("[upgrade/staticpods] Component %q upgraded successfully!\n", component)
}
// Remove the temporary directories used, on a best-effort basis (don't fail if the calls error out)
// The calls are placed here by design; we should _not_ use "defer" above, as that would remove the directories
// even in the "fail and rollback" case, where we want the directories preserved for the user.
os.RemoveAll(pathMgr.TempManifestDir())
os.RemoveAll(pathMgr.BackupManifestDir())
return nil
}
// rollbackOldManifests rolls back the backed-up manifests if something went wrong
func rollbackOldManifests(oldManifests map[string]string, origErr error, pathMgr StaticPodPathManager) error {
errs := []error{origErr}
for component, backupPath := range oldManifests {
// Where we should put back the backed up manifest
realManifestPath := pathMgr.RealManifestPath(component)
// Move the backup manifest back into the manifests directory
err := pathMgr.MoveFile(backupPath, realManifestPath)
if err != nil {
errs = append(errs, err)
}
}
// Let the user know there were problems, but we tried to recover
return fmt.Errorf("couldn't upgrade control plane. kubeadm has tried to recover everything into the earlier state. Errors faced: %v", errs)
}
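
A minimal sketch of how the path manager and StaticPodControlPlane compose; upgradeStaticPods is hypothetical, and the manifest directory is hardcoded here only for illustration:

package upgrade

import (
kubeadmapi "k8s.io/kubernetes/cmd/kubeadm/app/apis/kubeadm"
"k8s.io/kubernetes/cmd/kubeadm/app/util/apiclient"
)

// upgradeStaticPods is a hypothetical call site for StaticPodControlPlane.
func upgradeStaticPods(waiter apiclient.Waiter, cfg *kubeadmapi.MasterConfiguration) error {
// "/etc/kubernetes/manifests" is the kubelet's default static pod directory; hardcoded for this sketch only
pathMgr, err := NewKubeStaticPodPathManagerUsingTempDirs("/etc/kubernetes/manifests")
if err != nil {
return err
}
// On failure, StaticPodControlPlane rolls the old manifests back into place
return StaticPodControlPlane(waiter, pathMgr, cfg)
}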

View File

@ -0,0 +1,352 @@
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package upgrade
import (
"crypto/sha256"
"fmt"
"io/ioutil"
"os"
"strings"
"testing"
"time"
"k8s.io/apimachinery/pkg/runtime"
kubeadmapi "k8s.io/kubernetes/cmd/kubeadm/app/apis/kubeadm"
kubeadmapiext "k8s.io/kubernetes/cmd/kubeadm/app/apis/kubeadm/v1alpha1"
"k8s.io/kubernetes/cmd/kubeadm/app/constants"
"k8s.io/kubernetes/cmd/kubeadm/app/phases/controlplane"
"k8s.io/kubernetes/cmd/kubeadm/app/util/apiclient"
"k8s.io/kubernetes/pkg/api"
)
const (
waitForHashes = "wait-for-hashes"
waitForHashChange = "wait-for-hash-change"
waitForPodsWithLabel = "wait-for-pods-with-label"
testConfiguration = `
api:
advertiseAddress: 1.2.3.4
bindPort: 6443
apiServerCertSANs: null
apiServerExtraArgs: null
authorizationModes:
- Node
- RBAC
certificatesDir: /etc/kubernetes/pki
cloudProvider: ""
controllerManagerExtraArgs: null
etcd:
caFile: ""
certFile: ""
dataDir: /var/lib/etcd
endpoints: null
extraArgs: null
image: ""
keyFile: ""
featureFlags: null
imageRepository: gcr.io/google_containers
kubernetesVersion: %s
networking:
dnsDomain: cluster.local
podSubnet: ""
serviceSubnet: 10.96.0.0/12
nodeName: thegopher
schedulerExtraArgs: null
token: ce3aa5.5ec8455bb76b379f
tokenTTL: 86400000000000
unifiedControlPlaneImage: ""
`
)
// fakeWaiter is a fake apiclient.Waiter that returns errors it was initialized with
type fakeWaiter struct {
errsToReturn map[string]error
}
func NewFakeStaticPodWaiter(errsToReturn map[string]error) apiclient.Waiter {
return &fakeWaiter{
errsToReturn: errsToReturn,
}
}
// WaitForAPI just returns a dummy nil, to indicate that the program should just proceed
func (w *fakeWaiter) WaitForAPI() error {
return nil
}
// WaitForPodsWithLabel just returns an error if set from errsToReturn
func (w *fakeWaiter) WaitForPodsWithLabel(kvLabel string) error {
return w.errsToReturn[waitForPodsWithLabel]
}
// WaitForPodToDisappear just returns a dummy nil, to indicate that the program should just proceed
func (w *fakeWaiter) WaitForPodToDisappear(podName string) error {
return nil
}
// SetTimeout is a no-op; we don't use it in this implementation
func (w *fakeWaiter) SetTimeout(_ time.Duration) {}
// WaitForStaticPodControlPlaneHashes returns an error if set from errsToReturn
func (w *fakeWaiter) WaitForStaticPodControlPlaneHashes(_ string) (map[string]string, error) {
return map[string]string{}, w.errsToReturn[waitForHashes]
}
// WaitForStaticPodControlPlaneHashChange returns an error if set from errsToReturn
func (w *fakeWaiter) WaitForStaticPodControlPlaneHashChange(_, _, _ string) error {
return w.errsToReturn[waitForHashChange]
}
type fakeStaticPodPathManager struct {
realManifestDir string
tempManifestDir string
backupManifestDir string
MoveFileFunc func(string, string) error
}
func NewFakeStaticPodPathManager(moveFileFunc func(string, string) error) (StaticPodPathManager, error) {
realManifestsDir, err := ioutil.TempDir("", "kubeadm-upgraded-manifests")
if err != nil {
return nil, fmt.Errorf("couldn't create a temporary directory for the upgrade: %v", err)
}
upgradedManifestsDir, err := ioutil.TempDir("", "kubeadm-upgraded-manifests")
if err != nil {
return nil, fmt.Errorf("couldn't create a temporary directory for the upgrade: %v", err)
}
backupManifestsDir, err := ioutil.TempDir("", "kubeadm-backup-manifests")
if err != nil {
return nil, fmt.Errorf("couldn't create a temporary directory for the upgrade: %v", err)
}
return &fakeStaticPodPathManager{
realManifestDir: realManifestsDir,
tempManifestDir: upgradedManifestsDir,
backupManifestDir: backupManifestsDir,
MoveFileFunc: moveFileFunc,
}, nil
}
func (spm *fakeStaticPodPathManager) MoveFile(oldPath, newPath string) error {
return spm.MoveFileFunc(oldPath, newPath)
}
func (spm *fakeStaticPodPathManager) RealManifestPath(component string) string {
return constants.GetStaticPodFilepath(component, spm.realManifestDir)
}
func (spm *fakeStaticPodPathManager) RealManifestDir() string {
return spm.realManifestDir
}
func (spm *fakeStaticPodPathManager) TempManifestPath(component string) string {
return constants.GetStaticPodFilepath(component, spm.tempManifestDir)
}
func (spm *fakeStaticPodPathManager) TempManifestDir() string {
return spm.tempManifestDir
}
func (spm *fakeStaticPodPathManager) BackupManifestPath(component string) string {
return constants.GetStaticPodFilepath(component, spm.backupManifestDir)
}
func (spm *fakeStaticPodPathManager) BackupManifestDir() string {
return spm.backupManifestDir
}
func TestStaticPodControlPlane(t *testing.T) {
tests := []struct {
waitErrsToReturn map[string]error
moveFileFunc func(string, string) error
expectedErr bool
manifestShouldChange bool
}{
{ // error-free case should succeed
waitErrsToReturn: map[string]error{
waitForHashes: nil,
waitForHashChange: nil,
waitForPodsWithLabel: nil,
},
moveFileFunc: func(oldPath, newPath string) error {
return os.Rename(oldPath, newPath)
},
expectedErr: false,
manifestShouldChange: true,
},
{ // any wait error should result in a rollback and an abort
waitErrsToReturn: map[string]error{
waitForHashes: fmt.Errorf("boo! failed"),
waitForHashChange: nil,
waitForPodsWithLabel: nil,
},
moveFileFunc: func(oldPath, newPath string) error {
return os.Rename(oldPath, newPath)
},
expectedErr: true,
manifestShouldChange: false,
},
{ // any wait error should result in a rollback and an abort
waitErrsToReturn: map[string]error{
waitForHashes: nil,
waitForHashChange: fmt.Errorf("boo! failed"),
waitForPodsWithLabel: nil,
},
moveFileFunc: func(oldPath, newPath string) error {
return os.Rename(oldPath, newPath)
},
expectedErr: true,
manifestShouldChange: false,
},
{ // any wait error should result in a rollback and an abort
waitErrsToReturn: map[string]error{
waitForHashes: nil,
waitForHashChange: nil,
waitForPodsWithLabel: fmt.Errorf("boo! failed"),
},
moveFileFunc: func(oldPath, newPath string) error {
return os.Rename(oldPath, newPath)
},
expectedErr: true,
manifestShouldChange: false,
},
{ // any path-moving error should result in a rollback and an abort
waitErrsToReturn: map[string]error{
waitForHashes: nil,
waitForHashChange: nil,
waitForPodsWithLabel: nil,
},
moveFileFunc: func(oldPath, newPath string) error {
// fail for kube-apiserver move
if strings.Contains(newPath, "kube-apiserver") {
return fmt.Errorf("moving the kube-apiserver file failed")
}
return os.Rename(oldPath, newPath)
},
expectedErr: true,
manifestShouldChange: false,
},
{ // any path-moving error should result in a rollback and an abort
waitErrsToReturn: map[string]error{
waitForHashes: nil,
waitForHashChange: nil,
waitForPodsWithLabel: nil,
},
moveFileFunc: func(oldPath, newPath string) error {
// fail for kube-controller-manager move
if strings.Contains(newPath, "kube-controller-manager") {
return fmt.Errorf("moving the kube-apiserver file failed")
}
return os.Rename(oldPath, newPath)
},
expectedErr: true,
manifestShouldChange: false,
},
{ // any path-moving error should result in a rollback and an abort; even though this is the last component (kube-apiserver and kube-controller-manager healthy)
waitErrsToReturn: map[string]error{
waitForHashes: nil,
waitForHashChange: nil,
waitForPodsWithLabel: nil,
},
moveFileFunc: func(oldPath, newPath string) error {
// fail for kube-scheduler move
if strings.Contains(newPath, "kube-scheduler") {
return fmt.Errorf("moving the kube-apiserver file failed")
}
return os.Rename(oldPath, newPath)
},
expectedErr: true,
manifestShouldChange: false,
},
}
for _, rt := range tests {
waiter := NewFakeStaticPodWaiter(rt.waitErrsToReturn)
pathMgr, err := NewFakeStaticPodPathManager(rt.moveFileFunc)
if err != nil {
t.Fatalf("couldn't run NewFakeStaticPodPathManager: %v", err)
}
defer os.RemoveAll(pathMgr.RealManifestDir())
defer os.RemoveAll(pathMgr.TempManifestDir())
defer os.RemoveAll(pathMgr.BackupManifestDir())
oldcfg, err := getConfig("v1.7.0")
if err != nil {
t.Fatalf("couldn't create config: %v", err)
}
// Initialize the directory with the v1.7 manifests; the method under test should then upgrade them to v1.8
err = controlplane.CreateInitStaticPodManifestFiles(pathMgr.RealManifestDir(), oldcfg)
if err != nil {
t.Fatalf("couldn't run CreateInitStaticPodManifestFiles: %v", err)
}
// Get a hash of the v1.7 API server manifest to compare against later (to check whether the file was rewritten)
oldHash, err := getAPIServerHash(pathMgr.RealManifestDir())
if err != nil {
t.Fatalf("couldn't read temp file: %v", err)
}
newcfg, err := getConfig("v1.8.0")
if err != nil {
t.Fatalf("couldn't create config: %v", err)
}
actualErr := StaticPodControlPlane(waiter, pathMgr, newcfg)
if (actualErr != nil) != rt.expectedErr {
t.Errorf(
"failed UpgradeStaticPodControlPlane\n\texpected error: %t\n\tgot: %t",
rt.expectedErr,
(actualErr != nil),
)
}
newHash, err := getAPIServerHash(pathMgr.RealManifestDir())
if err != nil {
t.Fatalf("couldn't read temp file: %v", err)
}
if (oldHash != newHash) != rt.manifestShouldChange {
t.Errorf(
"failed StaticPodControlPlane\n\texpected manifest change: %t\n\tgot: %t",
rt.manifestShouldChange,
(oldHash != newHash),
)
}
}
}
func getAPIServerHash(dir string) (string, error) {
manifestPath := constants.GetStaticPodFilepath(constants.KubeAPIServer, dir)
fileBytes, err := ioutil.ReadFile(manifestPath)
if err != nil {
return "", err
}
return fmt.Sprintf("%x", sha256.Sum256(fileBytes)), nil
}
func getConfig(version string) (*kubeadmapi.MasterConfiguration, error) {
externalcfg := &kubeadmapiext.MasterConfiguration{}
internalcfg := &kubeadmapi.MasterConfiguration{}
if err := runtime.DecodeInto(api.Codecs.UniversalDecoder(), []byte(fmt.Sprintf(testConfiguration, version)), externalcfg); err != nil {
return nil, fmt.Errorf("unable to decode config: %v", err)
}
if err := api.Scheme.Convert(externalcfg, internalcfg, nil); err != nil {
return nil, fmt.Errorf("unable to convert config: %v", err)
}
return internalcfg, nil
}

View File

@ -20,8 +20,12 @@ import (
"fmt"
"io"
"k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
clientset "k8s.io/client-go/kubernetes"
kubeadmutil "k8s.io/kubernetes/cmd/kubeadm/app/util"
versionutil "k8s.io/kubernetes/pkg/util/version"
"k8s.io/kubernetes/pkg/version"
)
// VersionGetter defines an interface for fetching different versions.
@ -43,11 +47,8 @@ type KubeVersionGetter struct {
w io.Writer
}
// Make sure KubeVersionGetter implements the VersionGetter interface
var _ VersionGetter = &KubeVersionGetter{}
// NewKubeVersionGetter returns a new instance of KubeVersionGetter
func NewKubeVersionGetter(client clientset.Interface, writer io.Writer) *KubeVersionGetter {
func NewKubeVersionGetter(client clientset.Interface, writer io.Writer) VersionGetter {
return &KubeVersionGetter{
client: client,
w: writer,
@ -56,28 +57,68 @@ func NewKubeVersionGetter(client clientset.Interface, writer io.Writer) *KubeVer
// ClusterVersion gets API server version
func (g *KubeVersionGetter) ClusterVersion() (string, *versionutil.Version, error) {
fmt.Fprintf(g.w, "[upgrade/versions] Cluster version: ")
fmt.Fprintln(g.w, "v1.7.0")
clusterVersionInfo, err := g.client.Discovery().ServerVersion()
if err != nil {
return "", nil, fmt.Errorf("Couldn't fetch cluster version from the API Server: %v", err)
}
fmt.Fprintf(g.w, "[upgrade/versions] Cluster version: %s\n", clusterVersionInfo.String())
return "v1.7.0", versionutil.MustParseSemantic("v1.7.0"), nil
clusterVersion, err := versionutil.ParseSemantic(clusterVersionInfo.String())
if err != nil {
return "", nil, fmt.Errorf("Couldn't parse cluster version: %v", err)
}
return clusterVersionInfo.String(), clusterVersion, nil
}
// KubeadmVersion gets kubeadm version
func (g *KubeVersionGetter) KubeadmVersion() (string, *versionutil.Version, error) {
fmt.Fprintf(g.w, "[upgrade/versions] kubeadm version: %s\n", "v1.8.0")
kubeadmVersionInfo := version.Get()
fmt.Fprintf(g.w, "[upgrade/versions] kubeadm version: %s\n", kubeadmVersionInfo.String())
return "v1.8.0", versionutil.MustParseSemantic("v1.8.0"), nil
kubeadmVersion, err := versionutil.ParseSemantic(kubeadmVersionInfo.String())
if err != nil {
return "", nil, fmt.Errorf("Couldn't parse kubeadm version: %v", err)
}
return kubeadmVersionInfo.String(), kubeadmVersion, nil
}
-// VersionFromCILabel resolves different labels like "stable" to actual semver versions using the Kubernetes CI uploads to GCS
-func (g *KubeVersionGetter) VersionFromCILabel(_, _ string) (string, *versionutil.Version, error) {
-return "v1.8.1", versionutil.MustParseSemantic("v1.8.0"), nil
+// VersionFromCILabel resolves a version label like "latest" or "stable" to an actual version using the public Kubernetes CI uploads
+func (g *KubeVersionGetter) VersionFromCILabel(ciVersionLabel, description string) (string, *versionutil.Version, error) {
+versionStr, err := kubeadmutil.KubernetesReleaseVersion(ciVersionLabel)
+if err != nil {
+return "", nil, fmt.Errorf("Couldn't fetch latest %s version from the internet: %v", description, err)
+}
+if description != "" {
+fmt.Fprintf(g.w, "[upgrade/versions] Latest %s: %s\n", description, versionStr)
+}
+ver, err := versionutil.ParseSemantic(versionStr)
+if err != nil {
+return "", nil, fmt.Errorf("Couldn't parse latest %s version: %v", description, err)
+}
+return versionStr, ver, nil
}
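`kubeadmutil.KubernetesReleaseVersion` is what turns a label into a concrete version string; conceptually, that resolution boils down to fetching a one-line text file from the Kubernetes release bucket. A hedged sketch of that idea (the `dl.k8s.io` URL pattern is my assumption about the helper's behavior, not a quote of its implementation):

```go
package main

import (
	"fmt"
	"io/ioutil"
	"net/http"
	"strings"
)

// resolveCILabel fetches e.g. https://dl.k8s.io/release/stable.txt and returns
// the trimmed body (such as "v1.8.0") as the resolved version string.
func resolveCILabel(label string) (string, error) {
	resp, err := http.Get(fmt.Sprintf("https://dl.k8s.io/release/%s.txt", label))
	if err != nil {
		return "", err
	}
	defer resp.Body.Close()
	if resp.StatusCode != http.StatusOK {
		return "", fmt.Errorf("unexpected HTTP status: %s", resp.Status)
	}
	body, err := ioutil.ReadAll(resp.Body)
	if err != nil {
		return "", err
	}
	return strings.TrimSpace(string(body)), nil
}

func main() {
	ver, err := resolveCILabel("stable")
	if err != nil {
		fmt.Println(err)
		return
	}
	fmt.Println("resolved:", ver)
}
```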
// KubeletVersions gets the versions of the kubelets in the cluster
func (g *KubeVersionGetter) KubeletVersions() (map[string]uint16, error) {
-// This tells kubeadm that there are two nodes in the cluster; both on the v1.7.1 version currently
-return map[string]uint16{
-"v1.7.1": 2,
-}, nil
+nodes, err := g.client.CoreV1().Nodes().List(metav1.ListOptions{})
+if err != nil {
+return nil, fmt.Errorf("couldn't list all nodes in cluster: %v", err)
+}
+return computeKubeletVersions(nodes.Items), nil
}
// computeKubeletVersions returns a string-int map that describes how many nodes are of a specific version
func computeKubeletVersions(nodes []v1.Node) map[string]uint16 {
kubeletVersions := map[string]uint16{}
for _, node := range nodes {
kver := node.Status.NodeInfo.KubeletVersion
if _, found := kubeletVersions[kver]; !found {
kubeletVersions[kver] = 1
continue
}
kubeletVersions[kver]++
}
return kubeletVersions
}
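A quick table-style check of the bucketing logic could look like this; a sketch, assuming `computeKubeletVersions` is in scope of the test package:

```go
import (
	"reflect"
	"testing"

	"k8s.io/api/core/v1"
)

func TestComputeKubeletVersions(t *testing.T) {
	mkNode := func(ver string) v1.Node {
		return v1.Node{Status: v1.NodeStatus{NodeInfo: v1.NodeSystemInfo{KubeletVersion: ver}}}
	}
	nodes := []v1.Node{mkNode("v1.7.1"), mkNode("v1.7.1"), mkNode("v1.8.0")}
	got := computeKubeletVersions(nodes)
	want := map[string]uint16{"v1.7.1": 2, "v1.8.0": 1}
	if !reflect.DeepEqual(got, want) {
		t.Errorf("computeKubeletVersions() = %v, want %v", got, want)
	}
}
```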

View File

@ -25,6 +25,7 @@ import (
"k8s.io/apimachinery/pkg/runtime"
kuberuntime "k8s.io/apimachinery/pkg/runtime"
"k8s.io/client-go/dynamic"
clientset "k8s.io/client-go/kubernetes"
clientsetscheme "k8s.io/client-go/kubernetes/scheme"
"k8s.io/client-go/rest"
core "k8s.io/client-go/testing"
@ -33,7 +34,7 @@ import (
// ClientBackedDryRunGetter implements the DryRunGetter interface for use in NewDryRunClient() and proxies all GET and LIST requests to the backing API server reachable via rest.Config
type ClientBackedDryRunGetter struct {
-baseConfig *rest.Config
+client clientset.Interface
dynClientPool dynamic.ClientPool
}
@ -41,11 +42,16 @@ type ClientBackedDryRunGetter struct {
var _ DryRunGetter = &ClientBackedDryRunGetter{}
// NewClientBackedDryRunGetter creates a new ClientBackedDryRunGetter instance based on the rest.Config object
-func NewClientBackedDryRunGetter(config *rest.Config) *ClientBackedDryRunGetter {
-return &ClientBackedDryRunGetter{
-baseConfig: config,
-dynClientPool: dynamic.NewDynamicClientPool(config),
-}
+func NewClientBackedDryRunGetter(config *rest.Config) (*ClientBackedDryRunGetter, error) {
+client, err := clientset.NewForConfig(config)
+if err != nil {
+return nil, err
+}
+return &ClientBackedDryRunGetter{
+client: client,
+dynClientPool: dynamic.NewDynamicClientPool(config),
+}, nil
}
// NewClientBackedDryRunGetterFromKubeconfig creates a new ClientBackedDryRunGetter instance from the given KubeConfig file
@ -58,7 +64,7 @@ func NewClientBackedDryRunGetterFromKubeconfig(file string) (*ClientBackedDryRun
if err != nil {
return nil, fmt.Errorf("failed to create API client configuration from kubeconfig: %v", err)
}
-return NewClientBackedDryRunGetter(clientConfig), nil
+return NewClientBackedDryRunGetter(clientConfig)
}
// HandleGetAction handles GET actions to the dryrun clientset this interface supports
@ -106,6 +112,11 @@ func (clg *ClientBackedDryRunGetter) HandleListAction(action core.ListAction) (b
return true, newObj, err
}
// Client gets the backing clientset.Interface
func (clg *ClientBackedDryRunGetter) Client() clientset.Interface {
return clg.client
}
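Since constructing the real clientset can now fail, wiring this up from a caller's perspective needs an error check. A minimal sketch (the kubeconfig path is illustrative):

```go
import (
	"k8s.io/kubernetes/cmd/kubeadm/app/util/apiclient"
)

func dryRunClusterVersion() (string, error) {
	// The dry-run getter proxies GET/LIST requests to the real cluster;
	// Client() exposes the backing clientset for direct read access.
	getter, err := apiclient.NewClientBackedDryRunGetterFromKubeconfig("/etc/kubernetes/admin.conf")
	if err != nil {
		return "", err
	}
	info, err := getter.Client().Discovery().ServerVersion()
	if err != nil {
		return "", err
	}
	return info.String(), nil
}
```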
// actionToResourceClient returns the ResourceInterface for the given action
// First, the function gets the right API group interface from the resource type. The API group struct behind the interface
// returned may be cached in the dynamic client pool. Then, an APIResource object is constructed so that it can be passed to

View File

@ -23,6 +23,7 @@ import (
extensions "k8s.io/api/extensions/v1beta1"
rbac "k8s.io/api/rbac/v1beta1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
clientset "k8s.io/client-go/kubernetes"
)
@ -97,6 +98,15 @@ func CreateOrUpdateDaemonSet(client clientset.Interface, ds *extensions.DaemonSe
return nil
}
// DeleteDaemonSetForeground deletes the specified DaemonSet in foreground mode, i.e. the DaemonSet object is not removed until all the Pods it manages have been deleted
func DeleteDaemonSetForeground(client clientset.Interface, namespace, name string) error {
foregroundDelete := metav1.DeletePropagationForeground
deleteOptions := &metav1.DeleteOptions{
PropagationPolicy: &foregroundDelete,
}
return client.ExtensionsV1beta1().DaemonSets(namespace).Delete(name, deleteOptions)
}
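Foreground propagation matters during upgrades: with this policy the API server keeps the DaemonSet object around (with a deletion timestamp) until the garbage collector has removed its Pods, so a subsequent existence check can confirm the old Pods are truly gone. A usage sketch (deleting the kube-proxy DaemonSet here is just an example):

```go
import (
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	clientset "k8s.io/client-go/kubernetes"
	"k8s.io/kubernetes/cmd/kubeadm/app/util/apiclient"
)

func removeOldProxy(client clientset.Interface) error {
	// With foreground propagation, the DaemonSet object is only fully removed
	// once the garbage collector has deleted the Pods it manages.
	return apiclient.DeleteDaemonSetForeground(client, metav1.NamespaceSystem, "kube-proxy")
}
```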
// CreateOrUpdateRole creates a Role if the target resource doesn't exist. If the resource exists already, this function will update the resource instead.
func CreateOrUpdateRole(client clientset.Interface, role *rbac.Role) error {
if _, err := client.RbacV1beta1().Roles(role.ObjectMeta.Namespace).Create(role); err != nil {

View File

@ -17,6 +17,8 @@ limitations under the License.
package apiclient
import (
"crypto/sha256"
"encoding/json"
"fmt"
"io"
"net/http"
@ -38,6 +40,10 @@ type Waiter interface {
WaitForPodsWithLabel(kvLabel string) error
// WaitForPodToDisappear waits for the given Pod in the kube-system namespace to be deleted
WaitForPodToDisappear(staticPodName string) error
// WaitForStaticPodControlPlaneHashes fetches sha256 hashes for all the control plane Static Pods on the given node
WaitForStaticPodControlPlaneHashes(nodeName string) (map[string]string, error)
// WaitForStaticPodControlPlaneHashChange blocks until the given component's Static Pod hash differs from previousHash
WaitForStaticPodControlPlaneHashChange(nodeName, component, previousHash string) error
// SetTimeout adjusts the timeout to the specified duration
SetTimeout(timeout time.Duration)
}
@ -110,7 +116,7 @@ func (w *KubeWaiter) WaitForPodToDisappear(podName string) error {
return wait.PollImmediate(constants.APICallRetryInterval, w.timeout, func() (bool, error) {
_, err := w.client.CoreV1().Pods(metav1.NamespaceSystem).Get(podName, metav1.GetOptions{})
if apierrors.IsNotFound(err) {
fmt.Printf("[apiclient] The Static Pod %q is now removed\n", podName)
fmt.Printf("[apiclient] The old Pod %q is now removed (which is desired)\n", podName)
return true, nil
}
return false, nil
@ -122,6 +128,61 @@ func (w *KubeWaiter) SetTimeout(timeout time.Duration) {
w.timeout = timeout
}
// WaitForStaticPodControlPlaneHashes blocks until it times out or gets a hash map for all components and their Static Pods
func (w *KubeWaiter) WaitForStaticPodControlPlaneHashes(nodeName string) (map[string]string, error) {
var mirrorPodHashes map[string]string
err := wait.PollImmediate(constants.APICallRetryInterval, w.timeout, func() (bool, error) {
hashes, err := getStaticPodControlPlaneHashes(w.client, nodeName)
if err != nil {
return false, nil
}
mirrorPodHashes = hashes
return true, nil
})
return mirrorPodHashes, err
}
// WaitForStaticPodControlPlaneHashChange blocks until it times out or notices that the Mirror Pod (of the Static Pod in question) has changed
// This implicitly means the function blocks until the kubelet has restarted the Static Pod in question
func (w *KubeWaiter) WaitForStaticPodControlPlaneHashChange(nodeName, component, previousHash string) error {
return wait.PollImmediate(constants.APICallRetryInterval, w.timeout, func() (bool, error) {
hashes, err := getStaticPodControlPlaneHashes(w.client, nodeName)
if err != nil {
return false, nil
}
// Continue polling until the hash changes
if hashes[component] == previousHash {
return false, nil
}
return true, nil
})
}
// getStaticPodControlPlaneHashes computes hashes for all the control plane's Static Pod resources
func getStaticPodControlPlaneHashes(client clientset.Interface, nodeName string) (map[string]string, error) {
mirrorPodHashes := map[string]string{}
for _, component := range constants.MasterComponents {
staticPodName := fmt.Sprintf("%s-%s", component, nodeName)
staticPod, err := client.CoreV1().Pods(metav1.NamespaceSystem).Get(staticPodName, metav1.GetOptions{})
if err != nil {
return nil, err
}
podBytes, err := json.Marshal(staticPod)
if err != nil {
return nil, err
}
mirrorPodHashes[component] = fmt.Sprintf("%x", sha256.Sum256(podBytes))
}
return mirrorPodHashes, nil
}
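Taken together, these two waiter methods give the upgrade loop its core signal: snapshot the mirror-pod hashes, swap a manifest on disk, then poll until the component's hash differs, which implies the kubelet has restarted the Static Pod. A sketch of that flow for one component (`writeNewManifest` is a hypothetical helper standing in for kubeadm's manifest-writing step):

```go
import (
	"k8s.io/kubernetes/cmd/kubeadm/app/util/apiclient"
)

func upgradeComponent(w apiclient.Waiter, nodeName, component string) error {
	// Snapshot the hashes of the mirror pods before touching anything.
	oldHashes, err := w.WaitForStaticPodControlPlaneHashes(nodeName)
	if err != nil {
		return err
	}
	// Hypothetical: write the upgraded static pod manifest to disk here.
	if err := writeNewManifest(component); err != nil {
		return err
	}
	// Returns once the mirror pod's hash differs from the snapshot, i.e.
	// the kubelet has picked up the new manifest and restarted the pod.
	return w.WaitForStaticPodControlPlaneHashChange(nodeName, component, oldHashes[component])
}
```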
// TryRunCommand runs a function a maximum of failureThreshold times, retrying on error. If failureThreshold is hit, the last error is returned
func TryRunCommand(f func() error, failureThreshold uint8) error {
var numFailures uint8

View File

@ -5,6 +5,7 @@ go_library(
srcs = ["dryrun.go"],
visibility = ["//visibility:public"],
deps = [
"//cmd/kubeadm/app/constants:go_default_library",
"//cmd/kubeadm/app/util/apiclient:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/util/errors:go_default_library",

View File

@ -24,6 +24,7 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/errors"
"k8s.io/kubernetes/cmd/kubeadm/app/constants"
"k8s.io/kubernetes/cmd/kubeadm/app/util/apiclient"
)
@ -98,3 +99,18 @@ func (w *Waiter) WaitForPodToDisappear(podName string) error {
// SetTimeout is a no-op; we don't wait in this implementation
func (w *Waiter) SetTimeout(_ time.Duration) {}
// WaitForStaticPodControlPlaneHashes returns an empty hash for all control plane components; since WaitForStaticPodControlPlaneHashChange
// never blocks in dry-run mode, the empty strings only serve as placeholders
func (w *Waiter) WaitForStaticPodControlPlaneHashes(_ string) (map[string]string, error) {
return map[string]string{
constants.KubeAPIServer: "",
constants.KubeControllerManager: "",
constants.KubeScheduler: "",
}, nil
}
// WaitForStaticPodControlPlaneHashChange returns nil immediately so that the flow just continues when dry-running
func (w *Waiter) WaitForStaticPodControlPlaneHashChange(_, _, _ string) error {
return nil
}
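Both waiters satisfy the same interface, which is what lets the upgrade code run unchanged under `--dry-run`. The repo already guards interfaces with compile-time assertions (see the `VersionGetter` and `DryRunGetter` checks above); the same pattern would verify this pairing (the package placement shown is illustrative):

```go
// In cmd/kubeadm/app/util/apiclient:
var _ Waiter = &KubeWaiter{}

// In cmd/kubeadm/app/util/dryrun:
var _ apiclient.Waiter = &Waiter{}
```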