kubeadm: graduate wait-control-plane phase

This commit is contained in:
Lubomir I. Ivanov 2018-11-01 02:20:13 +02:00
parent dad6741530
commit cbb448113d
9 changed files with 218 additions and 110 deletions

View File

@ -53,7 +53,6 @@ go_library(
"//cmd/kubeadm/app/util:go_default_library",
"//cmd/kubeadm/app/util/apiclient:go_default_library",
"//cmd/kubeadm/app/util/config:go_default_library",
"//cmd/kubeadm/app/util/dryrun:go_default_library",
"//cmd/kubeadm/app/util/kubeconfig:go_default_library",
"//cmd/kubeadm/app/util/runtime:go_default_library",
"//cmd/kubeadm/app/util/staticpod:go_default_library",

View File

@ -24,7 +24,6 @@ import (
"path/filepath"
"strings"
"text/template"
"time"
"github.com/golang/glog"
"github.com/pkg/errors"
@ -57,7 +56,6 @@ import (
kubeadmutil "k8s.io/kubernetes/cmd/kubeadm/app/util"
"k8s.io/kubernetes/cmd/kubeadm/app/util/apiclient"
configutil "k8s.io/kubernetes/cmd/kubeadm/app/util/config"
dryrunutil "k8s.io/kubernetes/cmd/kubeadm/app/util/dryrun"
kubeconfigutil "k8s.io/kubernetes/cmd/kubeadm/app/util/kubeconfig"
utilsexec "k8s.io/utils/exec"
)
@ -128,6 +126,8 @@ type initData struct {
dryRunDir string
externalCA bool
client clientset.Interface
waiter apiclient.Waiter
outputWriter io.Writer
}
// NewCmdInit returns "kubeadm init" command.
@ -168,12 +168,13 @@ func NewCmdInit(out io.Writer) *cobra.Command {
initRunner.AppendPhase(phases.NewKubeConfigPhase())
initRunner.AppendPhase(phases.NewControlPlanePhase())
initRunner.AppendPhase(phases.NewEtcdPhase())
initRunner.AppendPhase(phases.NewWaitControlPlanePhase())
// TODO: add other phases to the runner.
// sets the data builder function, that will be used by the runner
// both when running the entire workflow or single phases
initRunner.SetDataInitializer(func() (workflow.RunData, error) {
return newInitData(cmd, options)
return newInitData(cmd, options, out)
})
// binds the Runner to kubeadm init command by altering
@ -270,7 +271,7 @@ func newInitOptions() *initOptions {
// newInitData returns a new initData struct to be used for the execution of the kubeadm init workflow.
// This func takes care of validating initOptions passed to the command, and then it converts
// options into the internal InitConfiguration type that is used as input all the phases in the kubeadm init workflow
func newInitData(cmd *cobra.Command, options *initOptions) (initData, error) {
func newInitData(cmd *cobra.Command, options *initOptions, out io.Writer) (initData, error) {
// Re-apply defaults to the public kubeadm API (this will set only values not exposed/not set as a flags)
kubeadmscheme.Scheme.Default(options.externalcfg)
@ -324,6 +325,7 @@ func newInitData(cmd *cobra.Command, options *initOptions) (initData, error) {
dryRunDir: dryRunDir,
ignorePreflightErrors: ignorePreflightErrorsSet,
externalCA: externalCA,
outputWriter: out,
}, nil
}
@ -389,6 +391,11 @@ func (d initData) ExternalCA() bool {
return d.externalCA
}
// OutputWriter returns the io.Writer used to write output to by this command.
func (d initData) OutputWriter() io.Writer {
return d.outputWriter
}
// Client returns a Kubernetes client to be used by kubeadm.
// This function is implemented as a singleton, thus avoiding to recreate the client when it is used by different phases.
// Important. This function must be called after the admin.conf kubeconfig file is created.
@ -434,32 +441,16 @@ func runInit(i *initData, out io.Writer) error {
adminKubeConfigPath := filepath.Join(kubeConfigDir, kubeadmconstants.AdminKubeConfigFileName)
// If we're dry-running, print the generated manifests
if err := printFilesIfDryRunning(i.dryRun, manifestDir); err != nil {
return errors.Wrap(err, "error printing files on dryrun")
}
// Create a Kubernetes client and wait for the API server to be healthy (if not dryrunning)
glog.V(1).Infof("creating Kubernetes client")
client, err := createClient(i.cfg, i.dryRun)
// TODO: client and waiter are temporary until the rest of the phases that use them
// are removed from this function.
client, err := i.Client()
if err != nil {
return errors.Wrap(err, "error creating client")
return errors.Wrap(err, "failed to create client")
}
// waiter holds the apiclient.Waiter implementation of choice, responsible for querying the API server in various ways and waiting for conditions to be fulfilled
glog.V(1).Infof("[init] waiting for the API server to be healthy")
waiter := getWaiter(i, client)
fmt.Printf("[init] waiting for the kubelet to boot up the control plane as Static Pods from directory %q \n", kubeadmconstants.GetStaticPodDirectory())
if err := waitForKubeletAndFunc(waiter, waiter.WaitForAPI); err != nil {
ctx := map[string]string{
"Error": fmt.Sprintf("%v", err),
}
kubeletFailTempl.Execute(out, ctx)
return errors.New("couldn't initialize a Kubernetes cluster")
// TODO: NewControlPlaneWaiter should be converted to private after the self-hosting phase is removed.
waiter, err := phases.NewControlPlaneWaiter(i.dryRun, client, i.outputWriter)
if err != nil {
return errors.Wrap(err, "failed to create waiter")
}
// Upload currently used configuration to the cluster
@ -594,18 +585,6 @@ func printJoinCommand(out io.Writer, adminKubeConfigPath, token string, skipToke
return initDoneTempl.Execute(out, ctx)
}
// createClient creates a clientset.Interface object
func createClient(cfg *kubeadmapi.InitConfiguration, dryRun bool) (clientset.Interface, error) {
if dryRun {
// If we're dry-running; we should create a faked client that answers some GETs in order to be able to do the full init flow and just logs the rest of requests
dryRunGetter := apiclient.NewInitDryRunGetter(cfg.NodeRegistration.Name, cfg.Networking.ServiceSubnet)
return apiclient.NewDryRunClient(dryRunGetter, os.Stdout), nil
}
// If we're acting for real, we should create a connection to the API server and wait for it to come up
return kubeconfigutil.ClientSetFromFile(kubeadmconstants.GetAdminKubeConfigPath())
}
// getDirectoriesToUse returns the (in order) certificates, kubeconfig and Static Pod manifest directories, followed by a possible error
// This behaves differently when dry-running vs the normal flow
func getDirectoriesToUse(dryRun bool, dryRunDir string, defaultPkiDir string) (string, string, string, string, error) {
@ -616,67 +595,3 @@ func getDirectoriesToUse(dryRun bool, dryRunDir string, defaultPkiDir string) (s
return defaultPkiDir, kubeadmconstants.KubernetesDir, kubeadmconstants.GetStaticPodDirectory(), kubeadmconstants.KubeletRunDirectory, nil
}
// printFilesIfDryRunning prints the Static Pod manifests to stdout and informs about the temporary directory to go and lookup
func printFilesIfDryRunning(dryRun bool, manifestDir string) error {
if !dryRun {
return nil
}
fmt.Printf("[dryrun] wrote certificates, kubeconfig files and control plane manifests to the %q directory\n", manifestDir)
fmt.Println("[dryrun] the certificates or kubeconfig files would not be printed due to their sensitive nature")
fmt.Printf("[dryrun] please examine the %q directory for details about what would be written\n", manifestDir)
// Print the contents of the upgraded manifests and pretend like they were in /etc/kubernetes/manifests
files := []dryrunutil.FileToPrint{}
// Print static pod manifests
for _, component := range kubeadmconstants.MasterComponents {
realPath := kubeadmconstants.GetStaticPodFilepath(component, manifestDir)
outputPath := kubeadmconstants.GetStaticPodFilepath(component, kubeadmconstants.GetStaticPodDirectory())
files = append(files, dryrunutil.NewFileToPrint(realPath, outputPath))
}
// Print kubelet config manifests
kubeletConfigFiles := []string{kubeadmconstants.KubeletConfigurationFileName, kubeadmconstants.KubeletEnvFileName}
for _, filename := range kubeletConfigFiles {
realPath := filepath.Join(manifestDir, filename)
outputPath := filepath.Join(kubeadmconstants.KubeletRunDirectory, filename)
files = append(files, dryrunutil.NewFileToPrint(realPath, outputPath))
}
return dryrunutil.PrintDryRunFiles(files, os.Stdout)
}
// getWaiter gets the right waiter implementation for the right occasion
func getWaiter(ctx *initData, client clientset.Interface) apiclient.Waiter {
if ctx.dryRun {
return dryrunutil.NewWaiter()
}
// We know that the images should be cached locally already as we have pulled them using
// crictl in the preflight checks. Hence we can have a pretty short timeout for the kubelet
// to start creating Static Pods.
timeout := 4 * time.Minute
return apiclient.NewKubeWaiter(client, timeout, os.Stdout)
}
// waitForKubeletAndFunc waits primarily for the function f to execute, even though it might take some time. If that takes a long time, and the kubelet
// /healthz continuously are unhealthy, kubeadm will error out after a period of exponential backoff
func waitForKubeletAndFunc(waiter apiclient.Waiter, f func() error) error {
errorChan := make(chan error)
go func(errC chan error, waiter apiclient.Waiter) {
// This goroutine can only make kubeadm init fail. If this check succeeds, it won't do anything special
if err := waiter.WaitForHealthyKubelet(40*time.Second, fmt.Sprintf("http://localhost:%d/healthz", kubeadmconstants.KubeletHealthzPort)); err != nil {
errC <- err
}
}(errorChan, waiter)
go func(errC chan error, waiter apiclient.Waiter) {
// This main goroutine sends whatever the f function returns (error or not) to the channel
// This in order to continue on success (nil error), or just fail if the function returns an error
errC <- f()
}(errorChan, waiter)
// This call is blocking until one of the goroutines sends to errorChan
return <-errorChan
}

View File

@ -548,7 +548,7 @@ func (j *Join) BootstrapKubelet(tlsBootstrapCfg *clientcmdapi.Config) error {
// Wait for the kubelet to create the /etc/kubernetes/kubelet.conf kubeconfig file. If this process
// times out, display a somewhat user-friendly message.
waiter := apiclient.NewKubeWaiter(nil, kubeadmconstants.TLSBootstrapTimeout, os.Stdout)
if err := waitForKubeletAndFunc(waiter, waitForTLSBootstrappedClient); err != nil {
if err := waiter.WaitForKubeletAndFunc(waitForTLSBootstrappedClient); err != nil {
fmt.Printf(kubeadmJoinFailMsg, err)
return err
}

View File

@ -15,6 +15,7 @@ go_library(
"selfhosting.go",
"uploadconfig.go",
"util.go",
"waitcontrolplane.go",
],
importpath = "k8s.io/kubernetes/cmd/kubeadm/app/cmd/phases",
visibility = ["//visibility:public"],
@ -46,6 +47,7 @@ go_library(
"//cmd/kubeadm/app/util/apiclient:go_default_library",
"//cmd/kubeadm/app/util/audit:go_default_library",
"//cmd/kubeadm/app/util/config:go_default_library",
"//cmd/kubeadm/app/util/dryrun:go_default_library",
"//cmd/kubeadm/app/util/kubeconfig:go_default_library",
"//pkg/util/normalizer:go_default_library",
"//pkg/version:go_default_library",
@ -55,6 +57,7 @@ go_library(
"//staging/src/k8s.io/cluster-bootstrap/token/api:go_default_library",
"//vendor/github.com/golang/glog:go_default_library",
"//vendor/github.com/pkg/errors:go_default_library",
"//vendor/github.com/renstrom/dedent:go_default_library",
"//vendor/github.com/spf13/cobra:go_default_library",
"//vendor/github.com/spf13/pflag:go_default_library",
"//vendor/k8s.io/utils/exec:go_default_library",

View File

@ -129,7 +129,7 @@ func runControlPlaneSubPhase(component string) func(c workflow.RunData) error {
}
}
fmt.Printf("[control-plane] creating static Pod manifest for %q\n", component)
fmt.Printf("[control-plane] Creating static Pod manifest for %q\n", component)
if err := controlplane.CreateStaticPodFiles(data.ManifestDir(), cfg, component); err != nil {
return err
}

View File

@ -0,0 +1,156 @@
/*
Copyright 2018 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package phases
import (
"fmt"
"io"
"path/filepath"
"text/template"
"time"
"github.com/golang/glog"
"github.com/pkg/errors"
"github.com/renstrom/dedent"
clientset "k8s.io/client-go/kubernetes"
kubeadmapi "k8s.io/kubernetes/cmd/kubeadm/app/apis/kubeadm"
"k8s.io/kubernetes/cmd/kubeadm/app/cmd/phases/workflow"
kubeadmconstants "k8s.io/kubernetes/cmd/kubeadm/app/constants"
"k8s.io/kubernetes/cmd/kubeadm/app/util/apiclient"
dryrunutil "k8s.io/kubernetes/cmd/kubeadm/app/util/dryrun"
)
var (
kubeletFailTempl = template.Must(template.New("init").Parse(dedent.Dedent(`
Unfortunately, an error has occurred:
{{ .Error }}
This error is likely caused by:
- The kubelet is not running
- The kubelet is unhealthy due to a misconfiguration of the node in some way (required cgroups disabled)
If you are on a systemd-powered system, you can try to troubleshoot the error with the following commands:
- 'systemctl status kubelet'
- 'journalctl -xeu kubelet'
Additionally, a control plane component may have crashed or exited when started by the container runtime.
To troubleshoot, list all containers using your preferred container runtimes CLI, e.g. docker.
Here is one example how you may list all Kubernetes containers running in docker:
- 'docker ps -a | grep kube | grep -v pause'
Once you have found the failing container, you can inspect its logs with:
- 'docker logs CONTAINERID'
`)))
)
type waitControlPlaneData interface {
Cfg() *kubeadmapi.InitConfiguration
ManifestDir() string
DryRun() bool
Client() (clientset.Interface, error)
OutputWriter() io.Writer
}
// NewWaitControlPlanePhase is a hidden phase that runs after the control-plane and etcd phases
func NewWaitControlPlanePhase() workflow.Phase {
phase := workflow.Phase{
Name: "wait-control-plane",
Run: runWaitControlPlanePhase,
Hidden: true,
}
return phase
}
func runWaitControlPlanePhase(c workflow.RunData) error {
data, ok := c.(waitControlPlaneData)
if !ok {
return errors.New("wait-control-plane phase invoked with an invalid data struct")
}
// If we're dry-running, print the generated manifests
if err := printFilesIfDryRunning(data); err != nil {
return errors.Wrap(err, "error printing files on dryrun")
}
// waiter holds the apiclient.Waiter implementation of choice, responsible for querying the API server in various ways and waiting for conditions to be fulfilled
glog.V(1).Infof("[wait-control-plane] Waiting for the API server to be healthy")
client, err := data.Client()
if err != nil {
return errors.Wrap(err, "cannot obtain client")
}
waiter, err := NewControlPlaneWaiter(data.DryRun(), client, data.OutputWriter())
if err != nil {
return errors.Wrap(err, "error creating waiter")
}
fmt.Printf("[wait-control-plane] Waiting for the kubelet to boot up the control plane as static Pods from directory %q\n", data.ManifestDir())
if err := waiter.WaitForKubeletAndFunc(waiter.WaitForAPI); err != nil {
ctx := map[string]string{
"Error": fmt.Sprintf("%v", err),
}
kubeletFailTempl.Execute(data.OutputWriter(), ctx)
return errors.New("couldn't initialize a Kubernetes cluster")
}
return nil
}
// printFilesIfDryRunning prints the Static Pod manifests to stdout and informs about the temporary directory to go and lookup
func printFilesIfDryRunning(data waitControlPlaneData) error {
if !data.DryRun() {
return nil
}
manifestDir := data.ManifestDir()
fmt.Printf("[dryrun] Wrote certificates, kubeconfig files and control plane manifests to the %q directory\n", manifestDir)
fmt.Println("[dryrun] The certificates or kubeconfig files would not be printed due to their sensitive nature")
fmt.Printf("[dryrun] Please examine the %q directory for details about what would be written\n", manifestDir)
// Print the contents of the upgraded manifests and pretend like they were in /etc/kubernetes/manifests
files := []dryrunutil.FileToPrint{}
// Print static pod manifests
for _, component := range kubeadmconstants.MasterComponents {
realPath := kubeadmconstants.GetStaticPodFilepath(component, manifestDir)
outputPath := kubeadmconstants.GetStaticPodFilepath(component, kubeadmconstants.GetStaticPodDirectory())
files = append(files, dryrunutil.NewFileToPrint(realPath, outputPath))
}
// Print kubelet config manifests
kubeletConfigFiles := []string{kubeadmconstants.KubeletConfigurationFileName, kubeadmconstants.KubeletEnvFileName}
for _, filename := range kubeletConfigFiles {
realPath := filepath.Join(manifestDir, filename)
outputPath := filepath.Join(kubeadmconstants.KubeletRunDirectory, filename)
files = append(files, dryrunutil.NewFileToPrint(realPath, outputPath))
}
return dryrunutil.PrintDryRunFiles(files, data.OutputWriter())
}
// NewControlPlaneWaiter returns a new waiter that is used to wait on the control plane to boot up.
// TODO: make private (lowercase) after self-hosting phase is removed.
func NewControlPlaneWaiter(dryRun bool, client clientset.Interface, out io.Writer) (apiclient.Waiter, error) {
if dryRun {
return dryrunutil.NewWaiter(), nil
}
// We know that the images should be cached locally already as we have pulled them using
// crictl in the preflight checks. Hence we can have a pretty short timeout for the kubelet
// to start creating Static Pods.
timeout := 4 * time.Minute
return apiclient.NewKubeWaiter(client, timeout, out), nil
}

View File

@ -134,6 +134,11 @@ func (w *fakeWaiter) WaitForHealthyKubelet(_ time.Duration, _ string) error {
return nil
}
// WaitForKubeletAndFunc is a wrapper for WaitForHealthyKubelet that also blocks for a function
func (w *fakeWaiter) WaitForKubeletAndFunc(f func() error) error {
return nil
}
type fakeStaticPodPathManager struct {
kubernetesDir string
realManifestDir string

View File

@ -1,5 +1,5 @@
/*
Copyright 2017 The Kubernetes Authors.
Copyright 2018 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
@ -30,6 +30,7 @@ import (
"k8s.io/apimachinery/pkg/util/wait"
clientset "k8s.io/client-go/kubernetes"
"k8s.io/kubernetes/cmd/kubeadm/app/constants"
kubeadmconstants "k8s.io/kubernetes/cmd/kubeadm/app/constants"
kubetypes "k8s.io/kubernetes/pkg/kubelet/types"
)
@ -50,6 +51,8 @@ type Waiter interface {
WaitForStaticPodControlPlaneHashes(nodeName string) (map[string]string, error)
// WaitForHealthyKubelet blocks until the kubelet /healthz endpoint returns 'ok'
WaitForHealthyKubelet(initalTimeout time.Duration, healthzEndpoint string) error
// WaitForKubeletAndFunc is a wrapper for WaitForHealthyKubelet that also blocks for a function
WaitForKubeletAndFunc(f func() error) error
// SetTimeout adjusts the timeout to the specified duration
SetTimeout(timeout time.Duration)
}
@ -132,17 +135,18 @@ func (w *KubeWaiter) WaitForPodToDisappear(podName string) error {
// WaitForHealthyKubelet blocks until the kubelet /healthz endpoint returns 'ok'
func (w *KubeWaiter) WaitForHealthyKubelet(initalTimeout time.Duration, healthzEndpoint string) error {
time.Sleep(initalTimeout)
fmt.Printf("[kubelet-check] Initial timeout of %v passed.\n", initalTimeout)
return TryRunCommand(func() error {
client := &http.Client{Transport: netutil.SetOldTransportDefaults(&http.Transport{})}
resp, err := client.Get(healthzEndpoint)
if err != nil {
fmt.Printf("[kubelet-check] It seems like the kubelet isn't running or healthy.\n")
fmt.Println("[kubelet-check] It seems like the kubelet isn't running or healthy.")
fmt.Printf("[kubelet-check] The HTTP call equal to 'curl -sSL %s' failed with error: %v.\n", healthzEndpoint, err)
return err
}
defer resp.Body.Close()
if resp.StatusCode != http.StatusOK {
fmt.Printf("[kubelet-check] It seems like the kubelet isn't running or healthy.")
fmt.Println("[kubelet-check] It seems like the kubelet isn't running or healthy.")
fmt.Printf("[kubelet-check] The HTTP call equal to 'curl -sSL %s' returned HTTP code %d\n", healthzEndpoint, resp.StatusCode)
return errors.New("the kubelet healthz endpoint is unhealthy")
}
@ -150,6 +154,27 @@ func (w *KubeWaiter) WaitForHealthyKubelet(initalTimeout time.Duration, healthzE
}, 5) // a failureThreshold of five means waiting for a total of 155 seconds
}
// WaitForKubeletAndFunc waits primarily for the function f to execute, even though it might take some time. If that takes a long time, and the kubelet
// /healthz continuously are unhealthy, kubeadm will error out after a period of exponential backoff
func (w *KubeWaiter) WaitForKubeletAndFunc(f func() error) error {
errorChan := make(chan error)
go func(errC chan error, waiter Waiter) {
if err := waiter.WaitForHealthyKubelet(40*time.Second, fmt.Sprintf("http://localhost:%d/healthz", kubeadmconstants.KubeletHealthzPort)); err != nil {
errC <- err
}
}(errorChan, w)
go func(errC chan error, waiter Waiter) {
// This main goroutine sends whatever the f function returns (error or not) to the channel
// This in order to continue on success (nil error), or just fail if the function returns an error
errC <- f()
}(errorChan, w)
// This call is blocking until one of the goroutines sends to errorChan
return <-errorChan
}
// SetTimeout adjusts the timeout to the specified duration
func (w *KubeWaiter) SetTimeout(timeout time.Duration) {
w.timeout = timeout

View File

@ -111,6 +111,11 @@ func (w *Waiter) WaitForHealthyKubelet(_ time.Duration, healthzEndpoint string)
return nil
}
// WaitForKubeletAndFunc is a wrapper for WaitForHealthyKubelet that also blocks for a function
func (w *Waiter) WaitForKubeletAndFunc(f func() error) error {
return nil
}
// SetTimeout is a no-op; we don't wait in this implementation
func (w *Waiter) SetTimeout(_ time.Duration) {}