Merge pull request #37831 from luxas/improve_reset

Automatic merge from submit-queue (batch tested with PRs 38194, 37594, 38123, 37831, 37084)

Improve kubeadm reset

Depends on: https://github.com/kubernetes/kubernetes/pull/36474
Broken out from: https://github.com/kubernetes/kubernetes/pull/37568
Carries: https://github.com/kubernetes/kubernetes/pull/35709, @camilocot

This makes the `kubeadm reset` command more robust and user-friendly.
I'll rebase after #36474 merges...

cc-ing reviewers: @mikedanese @errordeveloper @dgoodwin @jbeda
This commit is contained in:
Kubernetes Submit Queue 2016-12-06 17:41:35 -08:00 committed by GitHub
commit 4eb4777df1
3 changed files with 139 additions and 76 deletions

View File

@ -1,5 +1,5 @@
/* /*
Copyright 2014 The Kubernetes Authors. Copyright 2016 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License"); Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License. you may not use this file except in compliance with the License.
@ -21,23 +21,25 @@ import (
"io" "io"
"os" "os"
"os/exec" "os/exec"
"path/filepath" "path"
"strings"
"github.com/spf13/cobra" "github.com/spf13/cobra"
kubeadmapi "k8s.io/kubernetes/cmd/kubeadm/app/apis/kubeadm"
"k8s.io/kubernetes/cmd/kubeadm/app/preflight" "k8s.io/kubernetes/cmd/kubeadm/app/preflight"
kubeadmutil "k8s.io/kubernetes/cmd/kubeadm/app/util" kubeadmutil "k8s.io/kubernetes/cmd/kubeadm/app/util"
"k8s.io/kubernetes/pkg/util/initsystem" "k8s.io/kubernetes/pkg/util/initsystem"
) )
// NewCmdReset returns "kubeadm reset" command. // NewCmdReset returns the "kubeadm reset" command
func NewCmdReset(out io.Writer) *cobra.Command { func NewCmdReset(out io.Writer) *cobra.Command {
var skipPreFlight bool var skipPreFlight, removeNode bool
cmd := &cobra.Command{ cmd := &cobra.Command{
Use: "reset", Use: "reset",
Short: "Run this to revert any changes made to this host by 'kubeadm init' or 'kubeadm join'.", Short: "Run this to revert any changes made to this host by 'kubeadm init' or 'kubeadm join'.",
Run: func(cmd *cobra.Command, args []string) { Run: func(cmd *cobra.Command, args []string) {
r, err := NewReset(skipPreFlight) r, err := NewReset(skipPreFlight, removeNode)
kubeadmutil.CheckErr(err) kubeadmutil.CheckErr(err)
kubeadmutil.CheckErr(r.Run(out)) kubeadmutil.CheckErr(r.Run(out))
}, },
@ -45,121 +47,181 @@ func NewCmdReset(out io.Writer) *cobra.Command {
cmd.PersistentFlags().BoolVar( cmd.PersistentFlags().BoolVar(
&skipPreFlight, "skip-preflight-checks", false, &skipPreFlight, "skip-preflight-checks", false,
"skip preflight checks normally run before modifying the system", "Skip preflight checks normally run before modifying the system",
)
cmd.PersistentFlags().BoolVar(
&removeNode, "remove-node", true,
"Remove this node from the pool of nodes in this cluster",
) )
return cmd return cmd
} }
type Reset struct{} type Reset struct {
removeNode bool
}
func NewReset(skipPreFlight bool) (*Reset, error) { func NewReset(skipPreFlight, removeNode bool) (*Reset, error) {
if !skipPreFlight { if !skipPreFlight {
fmt.Println("Running pre-flight checks") fmt.Println("[preflight] Running pre-flight checks...")
err := preflight.RunResetCheck()
if err != nil { if err := preflight.RunResetCheck(); err != nil {
return nil, &preflight.PreFlightError{Msg: err.Error()} return nil, &preflight.PreFlightError{Msg: err.Error()}
} }
} else { } else {
fmt.Println("Skipping pre-flight checks") fmt.Println("[preflight] Skipping pre-flight checks...")
} }
return &Reset{}, nil return &Reset{
} removeNode: removeNode,
}, nil
// cleanDir removes everything inside the directory at path, but not the
// directory itself. A directory that does not exist is silently ignored.
// Failures are reported on stdout and are not returned to the caller.
func cleanDir(path string) {
	// If the directory doesn't even exist there's nothing to do, and we do
	// not consider this an error:
	if _, err := os.Stat(path); os.IsNotExist(err) {
		return
	}

	d, err := os.Open(path)
	if err != nil {
		fmt.Printf("failed to remove directory: [%v]\n", err)
		// Without an open handle there is nothing to list. Bail out instead of
		// calling methods on a nil *os.File and reporting a second, misleading
		// "invalid argument" failure.
		return
	}
	defer d.Close()

	// Readdirnames may hand back a partial listing together with an error, so
	// report the error but still remove whatever entries were returned.
	names, err := d.Readdirnames(-1)
	if err != nil {
		fmt.Printf("failed to remove directory: [%v]\n", err)
	}
	for _, name := range names {
		if err := os.RemoveAll(filepath.Join(path, name)); err != nil {
			fmt.Printf("failed to remove directory: [%v]\n", err)
		}
	}
}
// resetConfigDir is used to cleanup the files kubeadm writes in /etc/kubernetes/.
func resetConfigDir(configDirPath string) {
dirsToClean := []string{
filepath.Join(configDirPath, "manifests"),
filepath.Join(configDirPath, "pki"),
}
fmt.Printf("Deleting contents of config directories: %v\n", dirsToClean)
for _, dir := range dirsToClean {
cleanDir(dir)
}
filesToClean := []string{
filepath.Join(configDirPath, "admin.conf"),
filepath.Join(configDirPath, "kubelet.conf"),
}
fmt.Printf("Deleting files: %v\n", filesToClean)
for _, path := range filesToClean {
err := os.RemoveAll(path)
if err != nil {
fmt.Printf("failed to remove file: [%v]\n", err)
}
}
} }
// Run reverts any changes made to this host by "kubeadm init" or "kubeadm join". // Run reverts any changes made to this host by "kubeadm init" or "kubeadm join".
func (r *Reset) Run(out io.Writer) error { func (r *Reset) Run(out io.Writer) error {
// Drain and maybe remove the node from the cluster
err := drainAndRemoveNode(r.removeNode)
if err != nil {
fmt.Printf("[reset] Failed to cleanup node: [%v]\n", err)
}
serviceToStop := "kubelet" serviceToStop := "kubelet"
initSystem, err := initsystem.GetInitSystem() initSystem, err := initsystem.GetInitSystem()
if err != nil { if err != nil {
fmt.Printf("%v", err) fmt.Printf("[reset] Failed to detect init system and stop the kubelet service: %v\n", err)
} else { } else {
fmt.Printf("Stopping the %s service...\n", serviceToStop) fmt.Printf("[reset] Stopping the %s service...\n", serviceToStop)
if err := initSystem.ServiceStop(serviceToStop); err != nil { if err := initSystem.ServiceStop(serviceToStop); err != nil {
fmt.Printf("failed to stop the %s service", serviceToStop) fmt.Printf("[reset] Failed to stop the %s service\n", serviceToStop)
} }
} }
fmt.Printf("Unmounting directories in /var/lib/kubelet...\n") fmt.Println("[reset] Unmounting directories in /var/lib/kubelet...")
umountDirsCmd := "cat /proc/mounts | awk '{print $2}' | grep '/var/lib/kubelet' | xargs -r umount" umountDirsCmd := "cat /proc/mounts | awk '{print $2}' | grep '/var/lib/kubelet' | xargs -r umount"
umountOutputBytes, err := exec.Command("sh", "-c", umountDirsCmd).Output() umountOutputBytes, err := exec.Command("sh", "-c", umountDirsCmd).Output()
if err != nil { if err != nil {
fmt.Printf("failed to unmount directories in /var/lib/kubelet, %s", string(umountOutputBytes)) fmt.Printf("[reset] Failed to unmount directories in /var/lib/kubelet: %s\n", string(umountOutputBytes))
} }
dirsToClean := []string{"/var/lib/kubelet"} // Remove contents from the config and pki directories
resetConfigDir(kubeadmapi.GlobalEnvParams.KubernetesDir, kubeadmapi.GlobalEnvParams.HostPKIPath)
dirsToClean := []string{"/var/lib/kubelet", "/etc/cni/net.d"}
// Only clear etcd data when the etcd manifest is found. In case it is not found, we must assume that the user // Only clear etcd data when the etcd manifest is found. In case it is not found, we must assume that the user
// provided external etcd endpoints. In that case, it is his own responsibility to reset etcd // provided external etcd endpoints. In that case, it is his own responsibility to reset etcd
if _, err := os.Stat("/etc/kubernetes/manifests/etcd.json"); os.IsNotExist(err) { if _, err := os.Stat("/etc/kubernetes/manifests/etcd.json"); os.IsNotExist(err) {
dirsToClean = append(dirsToClean, "/var/lib/etcd") dirsToClean = append(dirsToClean, "/var/lib/etcd")
} else {
fmt.Printf("[reset] No etcd manifest found in %q, assuming external etcd.\n", "/etc/kubernetes/manifests/etcd.json")
} }
resetConfigDir("/etc/kubernetes/") fmt.Printf("[reset] Deleting contents of stateful directories: %v\n", dirsToClean)
fmt.Printf("Deleting contents of stateful directories: %v\n", dirsToClean)
for _, dir := range dirsToClean { for _, dir := range dirsToClean {
cleanDir(dir) cleanDir(dir)
} }
dockerCheck := preflight.ServiceCheck{Service: "docker"} dockerCheck := preflight.ServiceCheck{Service: "docker"}
if warnings, errors := dockerCheck.Check(); len(warnings) == 0 && len(errors) == 0 { if warnings, errors := dockerCheck.Check(); len(warnings) == 0 && len(errors) == 0 {
fmt.Println("Stopping all running docker containers...") fmt.Println("[reset] Stopping all running docker containers...")
if err := exec.Command("sh", "-c", "docker ps | grep 'k8s_' | awk '{print $1}' | xargs docker rm --force --volumes").Run(); err != nil { if err := exec.Command("sh", "-c", "docker ps | grep 'k8s_' | awk '{print $1}' | xargs -r docker rm --force --volumes").Run(); err != nil {
fmt.Println("failed to stop the running containers") fmt.Println("[reset] Failed to stop the running containers")
} }
} else { } else {
fmt.Println("docker doesn't seem to be running, skipping the removal of kubernetes containers") fmt.Println("[reset] docker doesn't seem to be running, skipping the removal of running kubernetes containers")
} }
return nil return nil
} }
// drainAndRemoveNode drains the node named after this host's lower-cased
// hostname and, when removeNode is true, deletes the node object afterwards.
// All cluster access goes through the kubectl binary, authenticated with the
// kubelet.conf found in kubeadm's configuration directory.
//
// It returns nil without doing anything when the node cannot be looked up
// (kubectl fails or prints nothing), and a descriptive error when the hostname
// cannot be determined or when the drain or delete step fails.
func drainAndRemoveNode(removeNode bool) error {
	hostname, err := os.Hostname()
	if err != nil {
		return fmt.Errorf("failed to detect node hostname: %v", err)
	}
	hostname = strings.ToLower(hostname)

	// TODO: Use the "native" k8s client for this once we're confident the versioned client is working
	kubeConfigPath := path.Join(kubeadmapi.GlobalEnvParams.KubernetesDir, "kubelet.conf")

	// Ask for the node by its exact name and without going through a shell.
	// Piping the full node list through `grep <hostname>` matches substrings
	// and regex metacharacters (e.g. "node1" also matches "node10").
	output, err := exec.Command("kubectl", "--kubeconfig", kubeConfigPath, "get", "node", hostname).Output()
	if err != nil || len(output) == 0 {
		// kubeadm shouldn't drain and/or remove the node when it doesn't exist anymore
		return nil
	}

	fmt.Printf("[reset] Draining node: %q\n", hostname)

	// CombinedOutput is used so that what kubectl wrote to stderr ends up in
	// the returned error; Output alone only captures stdout.
	output, err = exec.Command("kubectl", "--kubeconfig", kubeConfigPath, "drain", hostname, "--delete-local-data", "--force", "--ignore-daemonsets").CombinedOutput()
	if err != nil {
		return fmt.Errorf("failed to drain node %q: %v [%s]", hostname, err, output)
	}

	if removeNode {
		fmt.Printf("[reset] Removing node: %q\n", hostname)

		output, err = exec.Command("kubectl", "--kubeconfig", kubeConfigPath, "delete", "node", hostname).CombinedOutput()
		if err != nil {
			return fmt.Errorf("failed to remove node %q: %v [%s]", hostname, err, output)
		}
	}

	return nil
}
// cleanDir empties the directory at filepath while keeping the directory
// itself. A directory that does not exist counts as already clean and yields
// nil. Any failure to open, list or delete is returned immediately, so entries
// after the first failing one are left in place.
func cleanDir(filepath string) error {
	// Nothing to clean when the directory is absent.
	if _, statErr := os.Stat(filepath); os.IsNotExist(statErr) {
		return nil
	}

	dir, err := os.Open(filepath)
	if err != nil {
		return err
	}
	defer dir.Close()

	// A non-positive count asks for every entry in one call.
	entries, err := dir.Readdirnames(-1)
	if err != nil {
		return err
	}

	for i := 0; i < len(entries); i++ {
		if rmErr := os.RemoveAll(path.Join(filepath, entries[i])); rmErr != nil {
			return rmErr
		}
	}
	return nil
}
// resetConfigDir removes what kubeadm wrote into its configuration area: the
// contents of the "manifests" directory below configPathDir, the contents of
// pkiPathDir, and the admin.conf and kubelet.conf files below configPathDir.
// Every failure is printed and then skipped; nothing is returned.
func resetConfigDir(configPathDir, pkiPathDir string) {
	configDirs := []string{path.Join(configPathDir, "manifests"), pkiPathDir}
	staleFiles := []string{
		path.Join(configPathDir, "admin.conf"),
		path.Join(configPathDir, "kubelet.conf"),
	}

	// Directories are emptied but kept.
	fmt.Printf("[reset] Deleting contents of config directories: %v\n", configDirs)
	for i := range configDirs {
		if err := cleanDir(configDirs[i]); err != nil {
			fmt.Printf("[reset] Failed to remove directory: %q [%v]\n", configDirs[i], err)
		}
	}

	// The kubeconfig files are deleted outright.
	fmt.Printf("[reset] Deleting files: %v\n", staleFiles)
	for i := range staleFiles {
		if err := os.RemoveAll(staleFiles[i]); err != nil {
			fmt.Printf("[reset] Failed to remove file: %q [%v]\n", staleFiles[i], err)
		}
	}
}

View File

@ -162,7 +162,7 @@ func TestConfigDirCleaner(t *testing.T) {
} }
} }
resetConfigDir(tmpDir) resetConfigDir(tmpDir, filepath.Join(tmpDir, "pki"))
// Verify the files we cleanup implicitly in every test: // Verify the files we cleanup implicitly in every test:
assertExists(t, tmpDir) assertExists(t, tmpDir)

View File

@ -489,6 +489,7 @@ registry-burst
registry-qps registry-qps
reject-methods reject-methods
reject-paths reject-paths
remove-node
repair-malformed-updates repair-malformed-updates
replicaset-lookup-cache-size replicaset-lookup-cache-size
replication-controller-lookup-cache-size replication-controller-lookup-cache-size