mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-08-03 17:30:00 +00:00
Refactoring staticpod and waiter functions
This commit is contained in:
parent
bfe581d9e5
commit
39830f3642
@ -46,6 +46,7 @@ type applyFlags struct {
|
|||||||
nonInteractiveMode bool
|
nonInteractiveMode bool
|
||||||
force bool
|
force bool
|
||||||
dryRun bool
|
dryRun bool
|
||||||
|
etcdUpgrade bool
|
||||||
newK8sVersionStr string
|
newK8sVersionStr string
|
||||||
newK8sVersion *version.Version
|
newK8sVersion *version.Version
|
||||||
imagePullTimeout time.Duration
|
imagePullTimeout time.Duration
|
||||||
@ -62,6 +63,7 @@ func NewCmdApply(parentFlags *cmdUpgradeFlags) *cobra.Command {
|
|||||||
flags := &applyFlags{
|
flags := &applyFlags{
|
||||||
parent: parentFlags,
|
parent: parentFlags,
|
||||||
imagePullTimeout: 15 * time.Minute,
|
imagePullTimeout: 15 * time.Minute,
|
||||||
|
etcdUpgrade: true,
|
||||||
}
|
}
|
||||||
|
|
||||||
cmd := &cobra.Command{
|
cmd := &cobra.Command{
|
||||||
@ -91,6 +93,7 @@ func NewCmdApply(parentFlags *cmdUpgradeFlags) *cobra.Command {
|
|||||||
cmd.Flags().BoolVarP(&flags.nonInteractiveMode, "yes", "y", flags.nonInteractiveMode, "Perform the upgrade and do not prompt for confirmation (non-interactive mode).")
|
cmd.Flags().BoolVarP(&flags.nonInteractiveMode, "yes", "y", flags.nonInteractiveMode, "Perform the upgrade and do not prompt for confirmation (non-interactive mode).")
|
||||||
cmd.Flags().BoolVarP(&flags.force, "force", "f", flags.force, "Force upgrading although some requirements might not be met. This also implies non-interactive mode.")
|
cmd.Flags().BoolVarP(&flags.force, "force", "f", flags.force, "Force upgrading although some requirements might not be met. This also implies non-interactive mode.")
|
||||||
cmd.Flags().BoolVar(&flags.dryRun, "dry-run", flags.dryRun, "Do not change any state, just output what actions would be performed.")
|
cmd.Flags().BoolVar(&flags.dryRun, "dry-run", flags.dryRun, "Do not change any state, just output what actions would be performed.")
|
||||||
|
cmd.Flags().BoolVar(&flags.etcdUpgrade, "etcd-upgrade", flags.etcdUpgrade, "Perform the upgrade of ETCD.")
|
||||||
cmd.Flags().DurationVar(&flags.imagePullTimeout, "image-pull-timeout", flags.imagePullTimeout, "The maximum amount of time to wait for the control plane pods to be downloaded.")
|
cmd.Flags().DurationVar(&flags.imagePullTimeout, "image-pull-timeout", flags.imagePullTimeout, "The maximum amount of time to wait for the control plane pods to be downloaded.")
|
||||||
|
|
||||||
return cmd
|
return cmd
|
||||||
@ -222,7 +225,7 @@ func PerformControlPlaneUpgrade(flags *applyFlags, client clientset.Interface, w
|
|||||||
fmt.Printf("[upgrade/apply] Upgrading your Self-Hosted control plane to version %q...\n", flags.newK8sVersionStr)
|
fmt.Printf("[upgrade/apply] Upgrading your Self-Hosted control plane to version %q...\n", flags.newK8sVersionStr)
|
||||||
|
|
||||||
// Upgrade the self-hosted cluster
|
// Upgrade the self-hosted cluster
|
||||||
return upgrade.SelfHostedControlPlane(client, waiter, internalcfg, flags.newK8sVersion)
|
return nil // upgrade.SelfHostedControlPlane(client, waiter, internalcfg, flags.newK8sVersion)
|
||||||
}
|
}
|
||||||
|
|
||||||
// OK, the cluster is hosted using static pods. Upgrade a static-pod hosted cluster
|
// OK, the cluster is hosted using static pods. Upgrade a static-pod hosted cluster
|
||||||
@ -231,17 +234,18 @@ func PerformControlPlaneUpgrade(flags *applyFlags, client clientset.Interface, w
|
|||||||
if flags.dryRun {
|
if flags.dryRun {
|
||||||
return DryRunStaticPodUpgrade(internalcfg)
|
return DryRunStaticPodUpgrade(internalcfg)
|
||||||
}
|
}
|
||||||
return PerformStaticPodUpgrade(client, waiter, internalcfg)
|
|
||||||
|
return PerformStaticPodUpgrade(client, waiter, internalcfg, flags.etcdUpgrade)
|
||||||
}
|
}
|
||||||
|
|
||||||
// PerformStaticPodUpgrade performs the upgrade of the control plane components for a static pod hosted cluster
|
// PerformStaticPodUpgrade performs the upgrade of the control plane components for a static pod hosted cluster
|
||||||
func PerformStaticPodUpgrade(client clientset.Interface, waiter apiclient.Waiter, internalcfg *kubeadmapi.MasterConfiguration) error {
|
func PerformStaticPodUpgrade(client clientset.Interface, waiter apiclient.Waiter, internalcfg *kubeadmapi.MasterConfiguration, etcdUpgrade bool) error {
|
||||||
pathManager, err := upgrade.NewKubeStaticPodPathManagerUsingTempDirs(constants.GetStaticPodDirectory())
|
pathManager, err := upgrade.NewKubeStaticPodPathManagerUsingTempDirs(constants.GetStaticPodDirectory())
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
return upgrade.StaticPodControlPlane(waiter, pathManager, internalcfg)
|
return upgrade.StaticPodControlPlane(waiter, pathManager, internalcfg, etcdUpgrade)
|
||||||
}
|
}
|
||||||
|
|
||||||
// DryRunStaticPodUpgrade fakes an upgrade of the control plane
|
// DryRunStaticPodUpgrade fakes an upgrade of the control plane
|
||||||
|
@ -201,7 +201,7 @@ var (
|
|||||||
DefaultTokenUsages = []string{"signing", "authentication"}
|
DefaultTokenUsages = []string{"signing", "authentication"}
|
||||||
|
|
||||||
// MasterComponents defines the master component names
|
// MasterComponents defines the master component names
|
||||||
MasterComponents = []string{KubeAPIServer, KubeControllerManager, KubeScheduler}
|
MasterComponents = []string{KubeAPIServer, KubeControllerManager, KubeScheduler, Etcd}
|
||||||
|
|
||||||
// MinimumControlPlaneVersion specifies the minimum control plane version kubeadm can deploy
|
// MinimumControlPlaneVersion specifies the minimum control plane version kubeadm can deploy
|
||||||
MinimumControlPlaneVersion = version.MustParseSemantic("v1.8.0")
|
MinimumControlPlaneVersion = version.MustParseSemantic("v1.8.0")
|
||||||
|
@ -109,12 +109,54 @@ func (spm *KubeStaticPodPathManager) BackupManifestDir() string {
|
|||||||
return spm.backupManifestDir
|
return spm.backupManifestDir
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func performStaticPodUpgrade(component string, waiter apiclient.Waiter, pathMgr StaticPodPathManager, cfg *kubeadmapi.MasterConfiguration, beforePodHash string) error {
|
||||||
|
// The old manifest is here; in the /etc/kubernetes/manifests/
|
||||||
|
currentManifestPath := pathMgr.RealManifestPath(component)
|
||||||
|
// The new, upgraded manifest will be written here
|
||||||
|
newManifestPath := pathMgr.TempManifestPath(component)
|
||||||
|
// The old manifest will be moved here; into a subfolder of the temporary directory
|
||||||
|
// If a rollback is needed, these manifests will be put back to where they where initially
|
||||||
|
backupManifestPath := pathMgr.BackupManifestPath(component)
|
||||||
|
|
||||||
|
// Store the backup path in the recover list. If something goes wrong now, this component will be rolled back.
|
||||||
|
recoverManifest := backupManifestPath
|
||||||
|
|
||||||
|
// Move the old manifest into the old-manifests directory
|
||||||
|
if err := pathMgr.MoveFile(currentManifestPath, backupManifestPath); err != nil {
|
||||||
|
return rollbackOldManifest(component, recoverManifest, err, pathMgr)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Move the new manifest into the manifests directory
|
||||||
|
if err := pathMgr.MoveFile(newManifestPath, currentManifestPath); err != nil {
|
||||||
|
return rollbackOldManifest(component, recoverManifest, err, pathMgr)
|
||||||
|
}
|
||||||
|
|
||||||
|
fmt.Printf("[upgrade/staticpods] Moved upgraded manifest to %q and backed up old manifest to %q\n", currentManifestPath, backupManifestPath)
|
||||||
|
fmt.Println("[upgrade/staticpods] Waiting for the kubelet to restart the component")
|
||||||
|
|
||||||
|
// Wait for the mirror Pod hash to change; otherwise we'll run into race conditions here when the kubelet hasn't had time to
|
||||||
|
// notice the removal of the Static Pod, leading to a false positive below where we check that the API endpoint is healthy
|
||||||
|
// If we don't do this, there is a case where we remove the Static Pod manifest, kubelet is slow to react, kubeadm checks the
|
||||||
|
// API endpoint below of the OLD Static Pod component and proceeds quickly enough, which might lead to unexpected results.
|
||||||
|
if err := waiter.WaitForStaticPodControlPlaneHashChange(cfg.NodeName, component, beforePodHash); err != nil {
|
||||||
|
return rollbackOldManifest(component, recoverManifest, err, pathMgr)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Wait for the static pod component to come up and register itself as a mirror pod
|
||||||
|
if err := waiter.WaitForPodsWithLabel("component=" + component); err != nil {
|
||||||
|
return rollbackOldManifest(component, recoverManifest, err, pathMgr)
|
||||||
|
}
|
||||||
|
|
||||||
|
fmt.Printf("[upgrade/staticpods] Component %q upgraded successfully!\n", component)
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
// StaticPodControlPlane upgrades a static pod-hosted control plane
|
// StaticPodControlPlane upgrades a static pod-hosted control plane
|
||||||
func StaticPodControlPlane(waiter apiclient.Waiter, pathMgr StaticPodPathManager, cfg *kubeadmapi.MasterConfiguration) error {
|
func StaticPodControlPlane(waiter apiclient.Waiter, pathMgr StaticPodPathManager, cfg *kubeadmapi.MasterConfiguration, etcdUpgrade bool) error {
|
||||||
|
|
||||||
// This string-string map stores the component name and backup filepath (if a rollback is needed).
|
// This string-string map stores the component name and backup filepath (if a rollback is needed).
|
||||||
// If a rollback is needed,
|
// If a rollback is needed,
|
||||||
recoverManifests := map[string]string{}
|
// recoverManifests := map[string]string{}
|
||||||
|
|
||||||
beforePodHashMap, err := waiter.WaitForStaticPodControlPlaneHashes(cfg.NodeName)
|
beforePodHashMap, err := waiter.WaitForStaticPodControlPlaneHashes(cfg.NodeName)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -128,44 +170,9 @@ func StaticPodControlPlane(waiter apiclient.Waiter, pathMgr StaticPodPathManager
|
|||||||
return fmt.Errorf("error creating init static pod manifest files: %v", err)
|
return fmt.Errorf("error creating init static pod manifest files: %v", err)
|
||||||
}
|
}
|
||||||
for _, component := range constants.MasterComponents {
|
for _, component := range constants.MasterComponents {
|
||||||
// The old manifest is here; in the /etc/kubernetes/manifests/
|
if err = performStaticPodUpgrade(component, waiter, pathMgr, cfg, beforePodHashMap[component]); err != nil {
|
||||||
currentManifestPath := pathMgr.RealManifestPath(component)
|
return err
|
||||||
// The new, upgraded manifest will be written here
|
|
||||||
newManifestPath := pathMgr.TempManifestPath(component)
|
|
||||||
// The old manifest will be moved here; into a subfolder of the temporary directory
|
|
||||||
// If a rollback is needed, these manifests will be put back to where they where initially
|
|
||||||
backupManifestPath := pathMgr.BackupManifestPath(component)
|
|
||||||
|
|
||||||
// Store the backup path in the recover list. If something goes wrong now, this component will be rolled back.
|
|
||||||
recoverManifests[component] = backupManifestPath
|
|
||||||
|
|
||||||
// Move the old manifest into the old-manifests directory
|
|
||||||
if err := pathMgr.MoveFile(currentManifestPath, backupManifestPath); err != nil {
|
|
||||||
return rollbackOldManifests(recoverManifests, err, pathMgr)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Move the new manifest into the manifests directory
|
|
||||||
if err := pathMgr.MoveFile(newManifestPath, currentManifestPath); err != nil {
|
|
||||||
return rollbackOldManifests(recoverManifests, err, pathMgr)
|
|
||||||
}
|
|
||||||
|
|
||||||
fmt.Printf("[upgrade/staticpods] Moved upgraded manifest to %q and backed up old manifest to %q\n", currentManifestPath, backupManifestPath)
|
|
||||||
fmt.Println("[upgrade/staticpods] Waiting for the kubelet to restart the component")
|
|
||||||
|
|
||||||
// Wait for the mirror Pod hash to change; otherwise we'll run into race conditions here when the kubelet hasn't had time to
|
|
||||||
// notice the removal of the Static Pod, leading to a false positive below where we check that the API endpoint is healthy
|
|
||||||
// If we don't do this, there is a case where we remove the Static Pod manifest, kubelet is slow to react, kubeadm checks the
|
|
||||||
// API endpoint below of the OLD Static Pod component and proceeds quickly enough, which might lead to unexpected results.
|
|
||||||
if err := waiter.WaitForStaticPodControlPlaneHashChange(cfg.NodeName, component, beforePodHashMap[component]); err != nil {
|
|
||||||
return rollbackOldManifests(recoverManifests, err, pathMgr)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Wait for the static pod component to come up and register itself as a mirror pod
|
|
||||||
if err := waiter.WaitForPodsWithLabel("component=" + component); err != nil {
|
|
||||||
return rollbackOldManifests(recoverManifests, err, pathMgr)
|
|
||||||
}
|
|
||||||
|
|
||||||
fmt.Printf("[upgrade/staticpods] Component %q upgraded successfully!\n", component)
|
|
||||||
}
|
}
|
||||||
// Remove the temporary directories used on a best-effort (don't fail if the calls error out)
|
// Remove the temporary directories used on a best-effort (don't fail if the calls error out)
|
||||||
// The calls are set here by design; we should _not_ use "defer" above as that would remove the directories
|
// The calls are set here by design; we should _not_ use "defer" above as that would remove the directories
|
||||||
@ -192,3 +199,18 @@ func rollbackOldManifests(oldManifests map[string]string, origErr error, pathMgr
|
|||||||
// Let the user know there were problems, but we tried to recover
|
// Let the user know there were problems, but we tried to recover
|
||||||
return fmt.Errorf("couldn't upgrade control plane. kubeadm has tried to recover everything into the earlier state. Errors faced: %v", errs)
|
return fmt.Errorf("couldn't upgrade control plane. kubeadm has tried to recover everything into the earlier state. Errors faced: %v", errs)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// rollbackOldManifest rolls back the backuped component manifest if something went wrong
|
||||||
|
func rollbackOldManifest(component string, oldManifest string, origErr error, pathMgr StaticPodPathManager) error {
|
||||||
|
errs := []error{origErr}
|
||||||
|
// Where we should put back the backed up manifest
|
||||||
|
realManifestPath := pathMgr.RealManifestPath(component)
|
||||||
|
|
||||||
|
// Move the backup manifest back into the manifests directory
|
||||||
|
err := pathMgr.MoveFile(oldManifest, realManifestPath)
|
||||||
|
if err != nil {
|
||||||
|
errs = append(errs, err)
|
||||||
|
}
|
||||||
|
// Let the user know there we're problems, but we tried to reçover
|
||||||
|
return fmt.Errorf("couldn't upgrade control plane. kubeadm has tried to recover everything into the earlier state. Errors faced: %v", errs)
|
||||||
|
}
|
||||||
|
@ -40,6 +40,8 @@ type Waiter interface {
|
|||||||
WaitForPodsWithLabel(kvLabel string) error
|
WaitForPodsWithLabel(kvLabel string) error
|
||||||
// WaitForPodToDisappear waits for the given Pod in the kube-system namespace to be deleted
|
// WaitForPodToDisappear waits for the given Pod in the kube-system namespace to be deleted
|
||||||
WaitForPodToDisappear(staticPodName string) error
|
WaitForPodToDisappear(staticPodName string) error
|
||||||
|
// WaitForStaticPodSingleHash fetches sha256 hash for the control plane static pod
|
||||||
|
WaitForStaticPodSingleHash(nodeName string, component string) (string, error)
|
||||||
// WaitForStaticPodControlPlaneHashes fetches sha256 hashes for the control plane static pods
|
// WaitForStaticPodControlPlaneHashes fetches sha256 hashes for the control plane static pods
|
||||||
WaitForStaticPodControlPlaneHashes(nodeName string) (map[string]string, error)
|
WaitForStaticPodControlPlaneHashes(nodeName string) (map[string]string, error)
|
||||||
// WaitForStaticPodControlPlaneHashChange waits for the given static pod component's static pod hash to get updated.
|
// WaitForStaticPodControlPlaneHashChange waits for the given static pod component's static pod hash to get updated.
|
||||||
@ -167,6 +169,22 @@ func (w *KubeWaiter) WaitForStaticPodControlPlaneHashes(nodeName string) (map[st
|
|||||||
return mirrorPodHashes, err
|
return mirrorPodHashes, err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// WaitForStaticPodSingleHash blocks until it timeouts or gets a hash for a single component and its Static Pod
|
||||||
|
func (w *KubeWaiter) WaitForStaticPodSingleHash(nodeName string, component string) (string, error) {
|
||||||
|
|
||||||
|
mirrorPodHash := ""
|
||||||
|
err := wait.PollImmediate(constants.APICallRetryInterval, w.timeout, func() (bool, error) {
|
||||||
|
|
||||||
|
hash, err := getStaticPodSingleHash(w.client, nodeName, component)
|
||||||
|
if err != nil {
|
||||||
|
return false, nil
|
||||||
|
}
|
||||||
|
mirrorPodHash = hash
|
||||||
|
return true, nil
|
||||||
|
})
|
||||||
|
return mirrorPodHash, err
|
||||||
|
}
|
||||||
|
|
||||||
// WaitForStaticPodControlPlaneHashChange blocks until it times out or notices that the Mirror Pod (for the Static Pod, respectively) has changed
|
// WaitForStaticPodControlPlaneHashChange blocks until it times out or notices that the Mirror Pod (for the Static Pod, respectively) has changed
|
||||||
// This implicitly means this function blocks until the kubelet has restarted the Static Pod in question
|
// This implicitly means this function blocks until the kubelet has restarted the Static Pod in question
|
||||||
func (w *KubeWaiter) WaitForStaticPodControlPlaneHashChange(nodeName, component, previousHash string) error {
|
func (w *KubeWaiter) WaitForStaticPodControlPlaneHashChange(nodeName, component, previousHash string) error {
|
||||||
@ -206,6 +224,25 @@ func getStaticPodControlPlaneHashes(client clientset.Interface, nodeName string)
|
|||||||
return mirrorPodHashes, nil
|
return mirrorPodHashes, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// getStaticSinglePodHash computes hashes for a single Static Pod resource
|
||||||
|
func getStaticPodSingleHash(client clientset.Interface, nodeName string, component string) (string, error) {
|
||||||
|
|
||||||
|
mirrorPodHash := ""
|
||||||
|
staticPodName := fmt.Sprintf("%s-%s", component, nodeName)
|
||||||
|
staticPod, err := client.CoreV1().Pods(metav1.NamespaceSystem).Get(staticPodName, metav1.GetOptions{})
|
||||||
|
if err != nil {
|
||||||
|
return mirrorPodHash, err
|
||||||
|
}
|
||||||
|
|
||||||
|
podBytes, err := json.Marshal(staticPod)
|
||||||
|
if err != nil {
|
||||||
|
return mirrorPodHash, err
|
||||||
|
}
|
||||||
|
|
||||||
|
mirrorPodHash = fmt.Sprintf("%x", sha256.Sum256(podBytes))
|
||||||
|
return mirrorPodHash, nil
|
||||||
|
}
|
||||||
|
|
||||||
// TryRunCommand runs a function a maximum of failureThreshold times, and retries on error. If failureThreshold is hit; the last error is returned
|
// TryRunCommand runs a function a maximum of failureThreshold times, and retries on error. If failureThreshold is hit; the last error is returned
|
||||||
func TryRunCommand(f func() error, failureThreshold int) error {
|
func TryRunCommand(f func() error, failureThreshold int) error {
|
||||||
backoff := wait.Backoff{
|
backoff := wait.Backoff{
|
||||||
|
@ -116,6 +116,12 @@ func (w *Waiter) WaitForStaticPodControlPlaneHashes(_ string) (map[string]string
|
|||||||
}, nil
|
}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// WaitForStaticPodSingleHash returns an empty hash
|
||||||
|
// but the empty strings there are needed
|
||||||
|
func (w *Waiter) WaitForStaticPodSingleHash(_ string, _ string) (string, error) {
|
||||||
|
return "", nil
|
||||||
|
}
|
||||||
|
|
||||||
// WaitForStaticPodControlPlaneHashChange returns a dummy nil error in order for the flow to just continue as we're dryrunning
|
// WaitForStaticPodControlPlaneHashChange returns a dummy nil error in order for the flow to just continue as we're dryrunning
|
||||||
func (w *Waiter) WaitForStaticPodControlPlaneHashChange(_, _, _ string) error {
|
func (w *Waiter) WaitForStaticPodControlPlaneHashChange(_, _, _ string) error {
|
||||||
return nil
|
return nil
|
||||||
|
Loading…
Reference in New Issue
Block a user