kubeadm: graduate reset phases

This commit is contained in:
Yago Nobre 2019-05-18 19:25:31 -03:00
parent f780ac028b
commit 960083130b
No known key found for this signature in database
GPG Key ID: 75E01A69D4DEE54C
11 changed files with 681 additions and 364 deletions

View File

@ -29,6 +29,7 @@ go_library(
"//cmd/kubeadm/app/cmd/phases:go_default_library",
"//cmd/kubeadm/app/cmd/phases/init:go_default_library",
"//cmd/kubeadm/app/cmd/phases/join:go_default_library",
"//cmd/kubeadm/app/cmd/phases/reset:go_default_library",
"//cmd/kubeadm/app/cmd/phases/workflow:go_default_library",
"//cmd/kubeadm/app/cmd/upgrade:go_default_library",
"//cmd/kubeadm/app/cmd/util:go_default_library",
@ -39,17 +40,13 @@ go_library(
"//cmd/kubeadm/app/images:go_default_library",
"//cmd/kubeadm/app/phases/bootstraptoken/node:go_default_library",
"//cmd/kubeadm/app/phases/certs:go_default_library",
"//cmd/kubeadm/app/phases/etcd:go_default_library",
"//cmd/kubeadm/app/phases/kubeconfig:go_default_library",
"//cmd/kubeadm/app/phases/uploadconfig:go_default_library",
"//cmd/kubeadm/app/preflight:go_default_library",
"//cmd/kubeadm/app/util:go_default_library",
"//cmd/kubeadm/app/util/apiclient:go_default_library",
"//cmd/kubeadm/app/util/config:go_default_library",
"//cmd/kubeadm/app/util/kubeconfig:go_default_library",
"//cmd/kubeadm/app/util/runtime:go_default_library",
"//cmd/kubeadm/app/util/staticpod:go_default_library",
"//pkg/util/initsystem:go_default_library",
"//pkg/version:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/fields:go_default_library",
@ -78,23 +75,19 @@ go_test(
"config_test.go",
"init_test.go",
"join_test.go",
"reset_test.go",
"token_test.go",
"version_test.go",
],
embed = [":go_default_library"],
deps = [
"//cmd/kubeadm/app/apis/kubeadm:go_default_library",
"//cmd/kubeadm/app/apis/kubeadm/v1beta2:go_default_library",
"//cmd/kubeadm/app/cmd/options:go_default_library",
"//cmd/kubeadm/app/componentconfigs:go_default_library",
"//cmd/kubeadm/app/constants:go_default_library",
"//cmd/kubeadm/app/features:go_default_library",
"//cmd/kubeadm/app/preflight:go_default_library",
"//cmd/kubeadm/app/util:go_default_library",
"//cmd/kubeadm/app/util/config:go_default_library",
"//cmd/kubeadm/app/util/runtime:go_default_library",
"//cmd/kubeadm/test:go_default_library",
"//staging/src/k8s.io/api/core/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/api/errors:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",

View File

@ -34,6 +34,7 @@ filegroup(
":package-srcs",
"//cmd/kubeadm/app/cmd/phases/init:all-srcs",
"//cmd/kubeadm/app/cmd/phases/join:all-srcs",
"//cmd/kubeadm/app/cmd/phases/reset:all-srcs",
"//cmd/kubeadm/app/cmd/phases/workflow:all-srcs",
],
tags = ["automanaged"],

View File

@ -0,0 +1,63 @@
load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")
go_library(
name = "go_default_library",
srcs = [
"cleanupnode.go",
"data.go",
"preflight.go",
"removeetcdmember.go",
"updateclusterstatus.go",
],
importpath = "k8s.io/kubernetes/cmd/kubeadm/app/cmd/phases/reset",
visibility = ["//visibility:public"],
deps = [
"//cmd/kubeadm/app/apis/kubeadm:go_default_library",
"//cmd/kubeadm/app/apis/kubeadm/v1beta2:go_default_library",
"//cmd/kubeadm/app/cmd/options:go_default_library",
"//cmd/kubeadm/app/cmd/phases/workflow:go_default_library",
"//cmd/kubeadm/app/constants:go_default_library",
"//cmd/kubeadm/app/phases/etcd:go_default_library",
"//cmd/kubeadm/app/phases/uploadconfig:go_default_library",
"//cmd/kubeadm/app/preflight:go_default_library",
"//cmd/kubeadm/app/util/runtime:go_default_library",
"//cmd/kubeadm/app/util/staticpod:go_default_library",
"//pkg/util/initsystem:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/sets:go_default_library",
"//staging/src/k8s.io/client-go/kubernetes:go_default_library",
"//vendor/k8s.io/klog:go_default_library",
"//vendor/k8s.io/utils/exec:go_default_library",
],
)
filegroup(
name = "package-srcs",
srcs = glob(["**"]),
tags = ["automanaged"],
visibility = ["//visibility:private"],
)
filegroup(
name = "all-srcs",
srcs = [":package-srcs"],
tags = ["automanaged"],
visibility = ["//visibility:public"],
)
go_test(
name = "go_default_test",
srcs = [
"cleanupnode_test.go",
"removeetcdmember_test.go",
],
embed = [":go_default_library"],
deps = [
"//cmd/kubeadm/app/apis/kubeadm:go_default_library",
"//cmd/kubeadm/app/constants:go_default_library",
"//cmd/kubeadm/app/preflight:go_default_library",
"//cmd/kubeadm/test:go_default_library",
"//vendor/github.com/lithammer/dedent:go_default_library",
"//vendor/k8s.io/utils/exec:go_default_library",
"//vendor/k8s.io/utils/exec/testing:go_default_library",
],
)

View File

@ -0,0 +1,177 @@
/*
Copyright 2019 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package phases
import (
"errors"
"fmt"
"os"
"os/exec"
"path/filepath"
"k8s.io/klog"
kubeadmapiv1beta2 "k8s.io/kubernetes/cmd/kubeadm/app/apis/kubeadm/v1beta2"
"k8s.io/kubernetes/cmd/kubeadm/app/cmd/options"
"k8s.io/kubernetes/cmd/kubeadm/app/cmd/phases/workflow"
kubeadmconstants "k8s.io/kubernetes/cmd/kubeadm/app/constants"
utilruntime "k8s.io/kubernetes/cmd/kubeadm/app/util/runtime"
"k8s.io/kubernetes/pkg/util/initsystem"
utilsexec "k8s.io/utils/exec"
)
// NewCleanupNodePhase creates a kubeadm workflow phase that cleanup the node
func NewCleanupNodePhase() workflow.Phase {
return workflow.Phase{
Name: "cleanup-node",
Aliases: []string{"cleanupnode"},
Short: "Run cleanup node.",
Run: runCleanupNode,
InheritFlags: []string{
options.CertificatesDir,
options.NodeCRISocket,
},
}
}
func runCleanupNode(c workflow.RunData) error {
r, ok := c.(resetData)
if !ok {
return errors.New("cleanup-node phase invoked with an invalid data struct")
}
certsDir := r.CertificatesDir()
// Try to stop the kubelet service
klog.V(1).Infoln("[reset] Getting init system")
initSystem, err := initsystem.GetInitSystem()
if err != nil {
klog.Warningln("[reset] The kubelet service could not be stopped by kubeadm. Unable to detect a supported init system!")
klog.Warningln("[reset] Please ensure kubelet is stopped manually")
} else {
fmt.Println("[reset] Stopping the kubelet service")
if err := initSystem.ServiceStop("kubelet"); err != nil {
klog.Warningf("[reset] The kubelet service could not be stopped by kubeadm: [%v]\n", err)
klog.Warningln("[reset] Please ensure kubelet is stopped manually")
}
}
// Try to unmount mounted directories under kubeadmconstants.KubeletRunDirectory in order to be able to remove the kubeadmconstants.KubeletRunDirectory directory later
fmt.Printf("[reset] Unmounting mounted directories in %q\n", kubeadmconstants.KubeletRunDirectory)
// In case KubeletRunDirectory holds a symbolic link, evaluate it
kubeletRunDir, err := absoluteKubeletRunDirectory()
if err == nil {
// Only clean absoluteKubeletRunDirectory if umountDirsCmd passed without error
r.AddDirsToClean(kubeletRunDir)
}
klog.V(1).Info("[reset] Removing Kubernetes-managed containers")
if err := removeContainers(utilsexec.New(), r.CRISocketPath()); err != nil {
klog.Errorf("[reset] Failed to remove containers: %v", err)
}
r.AddDirsToClean("/etc/cni/net.d", "/var/lib/dockershim", "/var/run/kubernetes")
// Remove contents from the config and pki directories
klog.V(1).Infoln("[reset] Removing contents from the config and pki directories")
if certsDir != kubeadmapiv1beta2.DefaultCertificatesDir {
klog.Warningf("[reset] WARNING: Cleaning a non-default certificates directory: %q\n", certsDir)
}
resetConfigDir(kubeadmconstants.KubernetesDir, certsDir)
return nil
}
func absoluteKubeletRunDirectory() (string, error) {
absoluteKubeletRunDirectory, err := filepath.EvalSymlinks(kubeadmconstants.KubeletRunDirectory)
if err != nil {
klog.Errorf("[reset] Failed to evaluate the %q directory. Skipping its unmount and cleanup: %v", kubeadmconstants.KubeletRunDirectory, err)
return "", err
}
// Only unmount mount points which start with "/var/lib/kubelet" or absolute path of symbolic link, and avoid using empty absoluteKubeletRunDirectory
umountDirsCmd := fmt.Sprintf("awk '$2 ~ path {print $2}' path=%s/ /proc/mounts | xargs -r umount", absoluteKubeletRunDirectory)
klog.V(1).Infof("[reset] Executing command %q", umountDirsCmd)
umountOutputBytes, err := exec.Command("sh", "-c", umountDirsCmd).Output()
if err != nil {
klog.Errorf("[reset] Failed to unmount mounted directories in %s: %s\n", kubeadmconstants.KubeletRunDirectory, string(umountOutputBytes))
}
return absoluteKubeletRunDirectory, nil
}
func removeContainers(execer utilsexec.Interface, criSocketPath string) error {
containerRuntime, err := utilruntime.NewContainerRuntime(execer, criSocketPath)
if err != nil {
return err
}
containers, err := containerRuntime.ListKubeContainers()
if err != nil {
return err
}
return containerRuntime.RemoveContainers(containers)
}
// resetConfigDir is used to cleanup the files kubeadm writes in /etc/kubernetes/.
func resetConfigDir(configPathDir, pkiPathDir string) {
dirsToClean := []string{
filepath.Join(configPathDir, kubeadmconstants.ManifestsSubDirName),
pkiPathDir,
}
fmt.Printf("[reset] Deleting contents of config directories: %v\n", dirsToClean)
for _, dir := range dirsToClean {
if err := CleanDir(dir); err != nil {
klog.Errorf("[reset] Failed to remove directory: %q [%v]\n", dir, err)
}
}
filesToClean := []string{
filepath.Join(configPathDir, kubeadmconstants.AdminKubeConfigFileName),
filepath.Join(configPathDir, kubeadmconstants.KubeletKubeConfigFileName),
filepath.Join(configPathDir, kubeadmconstants.KubeletBootstrapKubeConfigFileName),
filepath.Join(configPathDir, kubeadmconstants.ControllerManagerKubeConfigFileName),
filepath.Join(configPathDir, kubeadmconstants.SchedulerKubeConfigFileName),
}
fmt.Printf("[reset] Deleting files: %v\n", filesToClean)
for _, path := range filesToClean {
if err := os.RemoveAll(path); err != nil {
klog.Errorf("[reset] Failed to remove file: %q [%v]\n", path, err)
}
}
}
// CleanDir removes everything in a directory, but not the directory itself
func CleanDir(filePath string) error {
// If the directory doesn't even exist there's nothing to do, and we do
// not consider this an error
if _, err := os.Stat(filePath); os.IsNotExist(err) {
return nil
}
d, err := os.Open(filePath)
if err != nil {
return err
}
defer d.Close()
names, err := d.Readdirnames(-1)
if err != nil {
return err
}
for _, name := range names {
if err = os.RemoveAll(filepath.Join(filePath, name)); err != nil {
return err
}
}
return nil
}

View File

@ -1,5 +1,5 @@
/*
Copyright 2016 The Kubernetes Authors.
Copyright 2019 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
@ -14,7 +14,7 @@ See the License for the specific language governing permissions and
limitations under the License.
*/
package cmd
package phases
import (
"io/ioutil"
@ -22,44 +22,12 @@ import (
"path/filepath"
"testing"
"github.com/lithammer/dedent"
kubeadmapi "k8s.io/kubernetes/cmd/kubeadm/app/apis/kubeadm"
kubeadmconstants "k8s.io/kubernetes/cmd/kubeadm/app/constants"
"k8s.io/kubernetes/cmd/kubeadm/app/preflight"
testutil "k8s.io/kubernetes/cmd/kubeadm/test"
"k8s.io/utils/exec"
fakeexec "k8s.io/utils/exec/testing"
)
const (
etcdPod = `apiVersion: v1
kind: Pod
metadata:
spec:
volumes:
- hostPath:
path: /path/to/etcd
type: DirectoryOrCreate
name: etcd-data
- hostPath:
path: /etc/kubernetes/pki/etcd
type: DirectoryOrCreate
name: etcd-certs`
etcdPodWithoutDataVolume = `apiVersion: v1
kind: Pod
metadata:
spec:
volumes:
- hostPath:
path: /etc/kubernetes/pki/etcd
type: DirectoryOrCreate
name: etcd-certs`
etcdPodInvalid = `invalid pod`
)
func assertExists(t *testing.T, path string) {
if _, err := os.Stat(path); os.IsNotExist(err) {
t.Errorf("file/directory does not exist; error: %s", err)
@ -250,83 +218,3 @@ func TestRemoveContainers(t *testing.T) {
removeContainers(&fexec, "unix:///var/run/crio/crio.sock")
}
func TestGetEtcdDataDir(t *testing.T) {
tests := map[string]struct {
dataDir string
podYaml string
expectErr bool
writeManifest bool
validConfig bool
}{
"non-existent file returns error": {
expectErr: true,
writeManifest: false,
validConfig: true,
},
"return etcd data dir": {
dataDir: "/path/to/etcd",
podYaml: etcdPod,
expectErr: false,
writeManifest: true,
validConfig: true,
},
"invalid etcd pod": {
podYaml: etcdPodInvalid,
expectErr: true,
writeManifest: true,
validConfig: true,
},
"etcd pod spec without data volume": {
podYaml: etcdPodWithoutDataVolume,
expectErr: true,
writeManifest: true,
validConfig: true,
},
"kubeconfig file doesn't exist": {
dataDir: "/path/to/etcd",
podYaml: etcdPod,
expectErr: false,
writeManifest: true,
validConfig: false,
},
}
for name, test := range tests {
t.Run(name, func(t *testing.T) {
tmpdir := testutil.SetupTempDir(t)
defer os.RemoveAll(tmpdir)
manifestPath := filepath.Join(tmpdir, "etcd.yaml")
if test.writeManifest {
err := ioutil.WriteFile(manifestPath, []byte(test.podYaml), 0644)
if err != nil {
t.Fatalf(dedent.Dedent("failed to write pod manifest\n%s\n\tfatal error: %v"), name, err)
}
}
var dataDir string
var err error
if test.validConfig {
cfg := &kubeadmapi.InitConfiguration{}
dataDir, err = getEtcdDataDir(manifestPath, cfg)
} else {
dataDir, err = getEtcdDataDir(manifestPath, nil)
}
if (err != nil) != test.expectErr {
t.Fatalf(dedent.Dedent(
"getEtcdDataDir failed\n%s\nexpected error: %t\n\tgot: %t\nerror: %v"),
name,
test.expectErr,
(err != nil),
err,
)
}
if dataDir != test.dataDir {
t.Fatalf(dedent.Dedent("getEtcdDataDir failed\n%s\n\texpected: %s\ngot: %s"), name, test.dataDir, dataDir)
}
})
}
}

View File

@ -0,0 +1,38 @@
/*
Copyright 2019 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package phases
import (
"io"
"k8s.io/apimachinery/pkg/util/sets"
clientset "k8s.io/client-go/kubernetes"
kubeadmapi "k8s.io/kubernetes/cmd/kubeadm/app/apis/kubeadm"
)
// resetData is the interface to use for reset phases.
// The "resetData" type from "cmd/reset.go" must satisfy this interface.
type resetData interface {
ForceReset() bool
InputReader() io.Reader
IgnorePreflightErrors() sets.String
Cfg() *kubeadmapi.InitConfiguration
Client() clientset.Interface
AddDirsToClean(dirs ...string)
CertificatesDir() string
CRISocketPath() string
}

View File

@ -0,0 +1,67 @@
/*
Copyright 2019 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package phases
import (
"bufio"
"errors"
"fmt"
"strings"
"k8s.io/kubernetes/cmd/kubeadm/app/cmd/options"
"k8s.io/kubernetes/cmd/kubeadm/app/cmd/phases/workflow"
"k8s.io/kubernetes/cmd/kubeadm/app/preflight"
)
// NewPreflightPhase creates a kubeadm workflow phase implements preflight checks for reset
func NewPreflightPhase() workflow.Phase {
return workflow.Phase{
Name: "preflight",
Aliases: []string{"pre-flight"},
Short: "Run reset pre-flight checks",
Long: "Run pre-flight checks for kubeadm reset.",
Run: runPreflight,
InheritFlags: []string{
options.IgnorePreflightErrors,
options.ForceReset,
},
}
}
// runPreflight executes preflight checks logic.
func runPreflight(c workflow.RunData) error {
r, ok := c.(resetData)
if !ok {
return errors.New("preflight phase invoked with an invalid data struct")
}
if !r.ForceReset() {
fmt.Println("[reset] WARNING: Changes made to this host by 'kubeadm init' or 'kubeadm join' will be reverted.")
fmt.Print("[reset] Are you sure you want to proceed? [y/N]: ")
s := bufio.NewScanner(r.InputReader())
s.Scan()
if err := s.Err(); err != nil {
return err
}
if strings.ToLower(s.Text()) != "y" {
return errors.New("Aborted reset operation")
}
}
fmt.Println("[preflight] Running pre-flight checks")
return preflight.RunRootCheckOnly(r.IgnorePreflightErrors())
}

View File

@ -0,0 +1,96 @@
/*
Copyright 2019 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package phases
import (
"errors"
"fmt"
"path/filepath"
"k8s.io/klog"
kubeadmapi "k8s.io/kubernetes/cmd/kubeadm/app/apis/kubeadm"
"k8s.io/kubernetes/cmd/kubeadm/app/cmd/options"
"k8s.io/kubernetes/cmd/kubeadm/app/cmd/phases/workflow"
kubeadmconstants "k8s.io/kubernetes/cmd/kubeadm/app/constants"
etcdphase "k8s.io/kubernetes/cmd/kubeadm/app/phases/etcd"
utilstaticpod "k8s.io/kubernetes/cmd/kubeadm/app/util/staticpod"
)
// NewRemoveETCDMemberPhase creates a kubeadm workflow phase for remove-etcd-member
func NewRemoveETCDMemberPhase() workflow.Phase {
return workflow.Phase{
Name: "remove-etcd-member",
Short: "Remove a local etcd member.",
Long: "Remove a local etcd member for a control plane node.",
Run: runPreflight,
InheritFlags: []string{
options.KubeconfigPath,
},
}
}
func runRemoveETCDMemberPhase(c workflow.RunData) error {
r, ok := c.(resetData)
if !ok {
return errors.New("remove-etcd-member-phase phase invoked with an invalid data struct")
}
cfg := r.Cfg()
// Only clear etcd data when using local etcd.
klog.V(1).Infoln("[reset] Checking for etcd config")
etcdManifestPath := filepath.Join(kubeadmconstants.KubernetesDir, kubeadmconstants.ManifestsSubDirName, "etcd.yaml")
etcdDataDir, err := getEtcdDataDir(etcdManifestPath, cfg)
if err == nil {
r.AddDirsToClean(etcdDataDir)
if cfg != nil {
if err := etcdphase.RemoveStackedEtcdMemberFromCluster(r.Client(), cfg); err != nil {
klog.Warningf("[reset] failed to remove etcd member: %v\n.Please manually remove this etcd member using etcdctl", err)
}
}
} else {
fmt.Println("[reset] No etcd config found. Assuming external etcd")
fmt.Println("[reset] Please, manually reset etcd to prevent further issues")
}
return nil
}
func getEtcdDataDir(manifestPath string, cfg *kubeadmapi.InitConfiguration) (string, error) {
const etcdVolumeName = "etcd-data"
var dataDir string
if cfg != nil && cfg.Etcd.Local != nil {
return cfg.Etcd.Local.DataDir, nil
}
klog.Warningln("[reset] No kubeadm config, using etcd pod spec to get data directory")
etcdPod, err := utilstaticpod.ReadStaticPodFromDisk(manifestPath)
if err != nil {
return "", err
}
for _, volumeMount := range etcdPod.Spec.Volumes {
if volumeMount.Name == etcdVolumeName {
dataDir = volumeMount.HostPath.Path
break
}
}
if dataDir == "" {
return dataDir, errors.New("invalid etcd pod manifest")
}
return dataDir, nil
}

View File

@ -0,0 +1,136 @@
/*
Copyright 2019 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package phases
import (
"io/ioutil"
"os"
"path/filepath"
"testing"
"github.com/lithammer/dedent"
kubeadmapi "k8s.io/kubernetes/cmd/kubeadm/app/apis/kubeadm"
testutil "k8s.io/kubernetes/cmd/kubeadm/test"
)
const (
etcdPod = `apiVersion: v1
kind: Pod
metadata:
spec:
volumes:
- hostPath:
path: /path/to/etcd
type: DirectoryOrCreate
name: etcd-data
- hostPath:
path: /etc/kubernetes/pki/etcd
type: DirectoryOrCreate
name: etcd-certs`
etcdPodWithoutDataVolume = `apiVersion: v1
kind: Pod
metadata:
spec:
volumes:
- hostPath:
path: /etc/kubernetes/pki/etcd
type: DirectoryOrCreate
name: etcd-certs`
etcdPodInvalid = `invalid pod`
)
func TestGetEtcdDataDir(t *testing.T) {
tests := map[string]struct {
dataDir string
podYaml string
expectErr bool
writeManifest bool
validConfig bool
}{
"non-existent file returns error": {
expectErr: true,
writeManifest: false,
validConfig: true,
},
"return etcd data dir": {
dataDir: "/path/to/etcd",
podYaml: etcdPod,
expectErr: false,
writeManifest: true,
validConfig: true,
},
"invalid etcd pod": {
podYaml: etcdPodInvalid,
expectErr: true,
writeManifest: true,
validConfig: true,
},
"etcd pod spec without data volume": {
podYaml: etcdPodWithoutDataVolume,
expectErr: true,
writeManifest: true,
validConfig: true,
},
"kubeconfig file doesn't exist": {
dataDir: "/path/to/etcd",
podYaml: etcdPod,
expectErr: false,
writeManifest: true,
validConfig: false,
},
}
for name, test := range tests {
t.Run(name, func(t *testing.T) {
tmpdir := testutil.SetupTempDir(t)
defer os.RemoveAll(tmpdir)
manifestPath := filepath.Join(tmpdir, "etcd.yaml")
if test.writeManifest {
err := ioutil.WriteFile(manifestPath, []byte(test.podYaml), 0644)
if err != nil {
t.Fatalf(dedent.Dedent("failed to write pod manifest\n%s\n\tfatal error: %v"), name, err)
}
}
var dataDir string
var err error
if test.validConfig {
cfg := &kubeadmapi.InitConfiguration{}
dataDir, err = getEtcdDataDir(manifestPath, cfg)
} else {
dataDir, err = getEtcdDataDir(manifestPath, nil)
}
if (err != nil) != test.expectErr {
t.Fatalf(dedent.Dedent(
"getEtcdDataDir failed\n%s\nexpected error: %t\n\tgot: %t\nerror: %v"),
name,
test.expectErr,
(err != nil),
err,
)
}
if dataDir != test.dataDir {
t.Fatalf(dedent.Dedent("getEtcdDataDir failed\n%s\n\texpected: %s\ngot: %s"), name, test.dataDir, dataDir)
}
})
}
}

View File

@ -0,0 +1,59 @@
/*
Copyright 2019 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package phases
import (
"errors"
"os"
"k8s.io/kubernetes/cmd/kubeadm/app/cmd/phases/workflow"
kubeadmconstants "k8s.io/kubernetes/cmd/kubeadm/app/constants"
"k8s.io/kubernetes/cmd/kubeadm/app/phases/uploadconfig"
)
// NewUpdateClusterStatus creates a kubeadm workflow phase for update-cluster-status
func NewUpdateClusterStatus() workflow.Phase {
return workflow.Phase{
Name: "update-cluster-status",
Short: "Remove this node from the ClusterStatus object.",
Long: "Remove this node from the ClusterStatus object if the node is a control plane node.",
Run: runUpdateClusterStatus,
}
}
func runUpdateClusterStatus(c workflow.RunData) error {
r, ok := c.(resetData)
if !ok {
return errors.New("update-cluster-status phase invoked with an invalid data struct")
}
// Reset the ClusterStatus for a given control-plane node.
cfg := r.Cfg()
if isControlPlane() && cfg != nil {
uploadconfig.ResetClusterStatusForNode(cfg.NodeRegistration.Name, r.Client())
}
return nil
}
// isControlPlane checks if a node is a control-plane node by looking up
// the kube-apiserver manifest file
func isControlPlane() bool {
filepath := kubeadmconstants.GetStaticPodFilepath(kubeadmconstants.KubeAPIServer, kubeadmconstants.GetStaticPodDirectory())
_, err := os.Stat(filepath)
return !os.IsNotExist(err)
}

View File

@ -17,16 +17,10 @@ limitations under the License.
package cmd
import (
"bufio"
"fmt"
"io"
"os"
"os/exec"
"path/filepath"
"strings"
"github.com/lithammer/dedent"
"github.com/pkg/errors"
"github.com/spf13/cobra"
flag "github.com/spf13/pflag"
"k8s.io/apimachinery/pkg/util/sets"
@ -36,18 +30,28 @@ import (
kubeadmapiv1beta2 "k8s.io/kubernetes/cmd/kubeadm/app/apis/kubeadm/v1beta2"
"k8s.io/kubernetes/cmd/kubeadm/app/apis/kubeadm/validation"
"k8s.io/kubernetes/cmd/kubeadm/app/cmd/options"
phases "k8s.io/kubernetes/cmd/kubeadm/app/cmd/phases/reset"
"k8s.io/kubernetes/cmd/kubeadm/app/cmd/phases/workflow"
cmdutil "k8s.io/kubernetes/cmd/kubeadm/app/cmd/util"
kubeadmconstants "k8s.io/kubernetes/cmd/kubeadm/app/constants"
etcdphase "k8s.io/kubernetes/cmd/kubeadm/app/phases/etcd"
uploadconfig "k8s.io/kubernetes/cmd/kubeadm/app/phases/uploadconfig"
"k8s.io/kubernetes/cmd/kubeadm/app/preflight"
kubeadmutil "k8s.io/kubernetes/cmd/kubeadm/app/util"
configutil "k8s.io/kubernetes/cmd/kubeadm/app/util/config"
utilruntime "k8s.io/kubernetes/cmd/kubeadm/app/util/runtime"
utilstaticpod "k8s.io/kubernetes/cmd/kubeadm/app/util/staticpod"
"k8s.io/kubernetes/pkg/util/initsystem"
utilsexec "k8s.io/utils/exec"
)
var (
iptablesCleanupInstructions = dedent.Dedent(`
The reset process does not reset or clean up iptables rules or IPVS tables.
If you wish to reset iptables, you must do so manually.
For example:
iptables -F && iptables -t nat -F && iptables -t mangle -F && iptables -X
If your cluster was setup to utilize IPVS, run ipvsadm --clear (or similar)
to reset your system's IPVS tables.
The reset process does not clean your kubeconfig files and you must remove them manually.
Please, check the contents of the $HOME/.kube/config file.
`)
)
// resetOptions defines all the options exposed via flags by kubeadm reset.
@ -70,6 +74,7 @@ type resetData struct {
inputReader io.Reader
outputWriter io.Writer
cfg *kubeadmapi.InitConfiguration
dirsToClean []string
}
// newResetOptions returns a struct ready for being used for creating cmd join flags.
@ -166,17 +171,23 @@ func NewCmdReset(in io.Reader, out io.Writer, resetOptions *resetOptions) *cobra
err = resetRunner.Run(args)
kubeadmutil.CheckErr(err)
// TODO: remove this once we have all phases in place.
// the method joinData.Run() itself should be removed too.
// Then clean contents from the stateful kubelet, etcd and cni directories
data := c.(*resetData)
kubeadmutil.CheckErr(data.Run())
cleanDirs(data)
// Output help text instructing user how to remove iptables rules
fmt.Print(iptablesCleanupInstructions)
},
}
AddResetFlags(cmd.Flags(), resetOptions)
// initialize the workflow runner with the list of phases
// TODO: append phases here
resetRunner.AppendPhase(phases.NewPreflightPhase())
resetRunner.AppendPhase(phases.NewUpdateClusterStatus())
resetRunner.AppendPhase(phases.NewRemoveETCDMemberPhase())
resetRunner.AppendPhase(phases.NewCleanupNodePhase())
// sets the data builder function, that will be used by the runner
// both when running the entire workflow or single phases
@ -191,6 +202,14 @@ func NewCmdReset(in io.Reader, out io.Writer, resetOptions *resetOptions) *cobra
return cmd
}
func cleanDirs(data *resetData) {
fmt.Printf("[reset] Deleting contents of stateful directories: %v\n", data.dirsToClean)
for _, dir := range data.dirsToClean {
klog.V(1).Infof("[reset] Deleting content of %s", dir)
phases.CleanDir(dir)
}
}
// Cfg returns the InitConfiguration.
func (r *resetData) Cfg() *kubeadmapi.InitConfiguration {
return r.cfg
@ -221,224 +240,14 @@ func (r *resetData) IgnorePreflightErrors() sets.String {
return r.ignorePreflightErrors
}
func (r *resetData) preflight() error {
if !r.ForceReset() {
fmt.Println("[reset] WARNING: Changes made to this host by 'kubeadm init' or 'kubeadm join' will be reverted.")
fmt.Print("[reset] Are you sure you want to proceed? [y/N]: ")
s := bufio.NewScanner(r.InputReader())
s.Scan()
if err := s.Err(); err != nil {
return err
}
if strings.ToLower(s.Text()) != "y" {
return errors.New("Aborted reset operation")
}
}
fmt.Println("[preflight] Running pre-flight checks")
if err := preflight.RunRootCheckOnly(r.IgnorePreflightErrors()); err != nil {
return err
}
return nil
// AddDirsToClean add a list of dirs to the list of dirs that will be removed.
func (r *resetData) AddDirsToClean(dirs ...string) {
r.dirsToClean = append(r.dirsToClean, dirs...)
}
// Run reverts any changes made to this host by "kubeadm init" or "kubeadm join".
func (r *resetData) Run() error {
var dirsToClean []string
cfg := r.Cfg()
certsDir := r.CertificatesDir()
client := r.Client()
err := r.preflight()
if err != nil {
return err
}
// Reset the ClusterStatus for a given control-plane node.
if isControlPlane() && cfg != nil {
uploadconfig.ResetClusterStatusForNode(cfg.NodeRegistration.Name, client)
}
// Only clear etcd data when using local etcd.
klog.V(1).Infoln("[reset] Checking for etcd config")
etcdManifestPath := filepath.Join(kubeadmconstants.KubernetesDir, kubeadmconstants.ManifestsSubDirName, "etcd.yaml")
etcdDataDir, err := getEtcdDataDir(etcdManifestPath, cfg)
if err == nil {
dirsToClean = append(dirsToClean, etcdDataDir)
if cfg != nil {
if err := etcdphase.RemoveStackedEtcdMemberFromCluster(client, cfg); err != nil {
klog.Warningf("[reset] failed to remove etcd member: %v\n.Please manually remove this etcd member using etcdctl", err)
}
}
} else {
fmt.Println("[reset] No etcd config found. Assuming external etcd")
fmt.Println("[reset] Please manually reset etcd to prevent further issues")
}
// Try to stop the kubelet service
klog.V(1).Infoln("[reset] Getting init system")
initSystem, err := initsystem.GetInitSystem()
if err != nil {
klog.Warningln("[reset] The kubelet service could not be stopped by kubeadm. Unable to detect a supported init system!")
klog.Warningln("[reset] Please ensure kubelet is stopped manually")
} else {
fmt.Println("[reset] Stopping the kubelet service")
if err := initSystem.ServiceStop("kubelet"); err != nil {
klog.Warningf("[reset] The kubelet service could not be stopped by kubeadm: [%v]\n", err)
klog.Warningln("[reset] Please ensure kubelet is stopped manually")
}
}
// Try to unmount mounted directories under kubeadmconstants.KubeletRunDirectory in order to be able to remove the kubeadmconstants.KubeletRunDirectory directory later
fmt.Printf("[reset] Unmounting mounted directories in %q\n", kubeadmconstants.KubeletRunDirectory)
// In case KubeletRunDirectory holds a symbolic link, evaluate it
var absoluteKubeletRunDirectory string
absoluteKubeletRunDirectory, err = filepath.EvalSymlinks(kubeadmconstants.KubeletRunDirectory)
if err != nil {
klog.Errorf("[reset] Failed to evaluate the %q directory. Skipping its unmount and cleanup: %v", kubeadmconstants.KubeletRunDirectory, err)
} else {
// Only unmount mount points which start with "/var/lib/kubelet" or absolute path of symbolic link, and avoid using empty absoluteKubeletRunDirectory
umountDirsCmd := fmt.Sprintf("awk '$2 ~ path {print $2}' path=%s/ /proc/mounts | xargs -r umount", absoluteKubeletRunDirectory)
klog.V(1).Infof("[reset] Executing command %q", umountDirsCmd)
umountOutputBytes, err := exec.Command("sh", "-c", umountDirsCmd).Output()
if err != nil {
klog.Errorf("[reset] Failed to unmount mounted directories in %s: %s\n", kubeadmconstants.KubeletRunDirectory, string(umountOutputBytes))
} else {
// Only clean absoluteKubeletRunDirectory if umountDirsCmd passed without error
dirsToClean = append(dirsToClean, absoluteKubeletRunDirectory)
}
}
klog.V(1).Info("[reset] Removing Kubernetes-managed containers")
if err := removeContainers(utilsexec.New(), r.criSocketPath); err != nil {
klog.Errorf("[reset] Failed to remove containers: %v", err)
}
dirsToClean = append(dirsToClean, []string{"/etc/cni/net.d", "/var/lib/dockershim", "/var/run/kubernetes"}...)
// Then clean contents from the stateful kubelet, etcd and cni directories
fmt.Printf("[reset] Deleting contents of stateful directories: %v\n", dirsToClean)
for _, dir := range dirsToClean {
klog.V(1).Infof("[reset] Deleting content of %s", dir)
cleanDir(dir)
}
// Remove contents from the config and pki directories
klog.V(1).Infoln("[reset] Removing contents from the config and pki directories")
if certsDir != kubeadmapiv1beta2.DefaultCertificatesDir {
klog.Warningf("[reset] WARNING: Cleaning a non-default certificates directory: %q\n", certsDir)
}
resetConfigDir(kubeadmconstants.KubernetesDir, certsDir)
// Output help text instructing user how to remove iptables rules
msg := dedent.Dedent(`
The reset process does not reset or clean up iptables rules or IPVS tables.
If you wish to reset iptables, you must do so manually.
For example:
iptables -F && iptables -t nat -F && iptables -t mangle -F && iptables -X
If your cluster was setup to utilize IPVS, run ipvsadm --clear (or similar)
to reset your system's IPVS tables.
The reset process does not clean your kubeconfig files and you must remove them manually.
Please, check the contents of the $HOME/.kube/config file.
`)
fmt.Print(msg)
return nil
}
func getEtcdDataDir(manifestPath string, cfg *kubeadmapi.InitConfiguration) (string, error) {
const etcdVolumeName = "etcd-data"
var dataDir string
if cfg != nil && cfg.Etcd.Local != nil {
return cfg.Etcd.Local.DataDir, nil
}
klog.Warningln("[reset] No kubeadm config, using etcd pod spec to get data directory")
etcdPod, err := utilstaticpod.ReadStaticPodFromDisk(manifestPath)
if err != nil {
return "", err
}
for _, volumeMount := range etcdPod.Spec.Volumes {
if volumeMount.Name == etcdVolumeName {
dataDir = volumeMount.HostPath.Path
break
}
}
if dataDir == "" {
return dataDir, errors.New("invalid etcd pod manifest")
}
return dataDir, nil
}
func removeContainers(execer utilsexec.Interface, criSocketPath string) error {
containerRuntime, err := utilruntime.NewContainerRuntime(execer, criSocketPath)
if err != nil {
return err
}
containers, err := containerRuntime.ListKubeContainers()
if err != nil {
return err
}
return containerRuntime.RemoveContainers(containers)
}
// cleanDir removes everything in a directory, but not the directory itself
func cleanDir(filePath string) error {
// If the directory doesn't even exist there's nothing to do, and we do
// not consider this an error
if _, err := os.Stat(filePath); os.IsNotExist(err) {
return nil
}
d, err := os.Open(filePath)
if err != nil {
return err
}
defer d.Close()
names, err := d.Readdirnames(-1)
if err != nil {
return err
}
for _, name := range names {
if err = os.RemoveAll(filepath.Join(filePath, name)); err != nil {
return err
}
}
return nil
}
// resetConfigDir is used to cleanup the files kubeadm writes in /etc/kubernetes/.
func resetConfigDir(configPathDir, pkiPathDir string) {
dirsToClean := []string{
filepath.Join(configPathDir, kubeadmconstants.ManifestsSubDirName),
pkiPathDir,
}
fmt.Printf("[reset] Deleting contents of config directories: %v\n", dirsToClean)
for _, dir := range dirsToClean {
if err := cleanDir(dir); err != nil {
klog.Errorf("[reset] Failed to remove directory: %q [%v]\n", dir, err)
}
}
filesToClean := []string{
filepath.Join(configPathDir, kubeadmconstants.AdminKubeConfigFileName),
filepath.Join(configPathDir, kubeadmconstants.KubeletKubeConfigFileName),
filepath.Join(configPathDir, kubeadmconstants.KubeletBootstrapKubeConfigFileName),
filepath.Join(configPathDir, kubeadmconstants.ControllerManagerKubeConfigFileName),
filepath.Join(configPathDir, kubeadmconstants.SchedulerKubeConfigFileName),
}
fmt.Printf("[reset] Deleting files: %v\n", filesToClean)
for _, path := range filesToClean {
if err := os.RemoveAll(path); err != nil {
klog.Errorf("[reset] Failed to remove file: %q [%v]\n", path, err)
}
}
// CRISocketPath returns the criSocketPath.
func (r *resetData) CRISocketPath() string {
return r.criSocketPath
}
func resetDetectCRISocket(cfg *kubeadmapi.InitConfiguration) (string, error) {
@ -450,13 +259,3 @@ func resetDetectCRISocket(cfg *kubeadmapi.InitConfiguration) (string, error) {
// if this fails, try to detect it
return utilruntime.DetectCRISocket()
}
// isControlPlane checks if a node is a control-plane node by looking up
// the kube-apiserver manifest file
func isControlPlane() bool {
filepath := kubeadmconstants.GetStaticPodFilepath(kubeadmconstants.KubeAPIServer, kubeadmconstants.GetStaticPodDirectory())
if _, err := os.Stat(filepath); os.IsNotExist(err) {
return false
}
return true
}