kubeadm: Upload CRISocket information and hence make kubeadm join blocking

This commit is contained in:
Lucas Käldström 2018-06-06 01:05:49 +03:00
parent 2bb6fdc675
commit 041858232c
No known key found for this signature in database
GPG Key ID: 3FA3783D77751514
5 changed files with 110 additions and 39 deletions

View File

@ -52,6 +52,7 @@ import (
kubeconfigphase "k8s.io/kubernetes/cmd/kubeadm/app/phases/kubeconfig"
kubeletphase "k8s.io/kubernetes/cmd/kubeadm/app/phases/kubelet"
markmasterphase "k8s.io/kubernetes/cmd/kubeadm/app/phases/markmaster"
patchnodephase "k8s.io/kubernetes/cmd/kubeadm/app/phases/patchnode"
selfhostingphase "k8s.io/kubernetes/cmd/kubeadm/app/phases/selfhosting"
uploadconfigphase "k8s.io/kubernetes/cmd/kubeadm/app/phases/uploadconfig"
"k8s.io/kubernetes/cmd/kubeadm/app/preflight"
@ -379,7 +380,10 @@ func (i *Init) Run(out io.Writer) error {
glog.V(1).Infof("[init] waiting for the API server to be healthy")
waiter := getWaiter(i, client)
if err := waitForAPIAndKubelet(waiter); err != nil {
fmt.Printf("[init] waiting for the kubelet to boot up the control plane as Static Pods from directory %q \n", kubeadmconstants.GetStaticPodDirectory())
fmt.Println("[init] this might take a minute or longer if the control plane images have to be pulled")
if err := waitForKubeletAndFunc(waiter, waiter.WaitForAPI); err != nil {
ctx := map[string]string{
"Error": fmt.Sprintf("%v", err),
"APIServerImage": images.GetCoreImage(kubeadmconstants.KubeAPIServer, i.cfg.GetControlPlaneImageRepository(), i.cfg.KubernetesVersion, i.cfg.UnifiedControlPlaneImage),
@ -417,6 +421,11 @@ func (i *Init) Run(out io.Writer) error {
return fmt.Errorf("error marking master: %v", err)
}
glog.V(1).Infof("[init] preserving the crisocket information for the master")
if err := patchnodephase.AnnotateCRISocket(client, i.cfg.NodeRegistration.Name, i.cfg.NodeRegistration.CRISocket); err != nil {
return fmt.Errorf("error uploading crisocket: %v", err)
}
// NOTE: flag "--dynamic-config-dir" should be specified in /etc/systemd/system/kubelet.service.d/10-kubeadm.conf
// This feature is disabled by default, as it is alpha still
if features.Enabled(i.cfg.FeatureGates, features.DynamicKubeletConfig) {
@ -581,15 +590,11 @@ func getWaiter(i *Init, client clientset.Interface) apiclient.Waiter {
return apiclient.NewKubeWaiter(client, timeout, os.Stdout)
}
// waitForAPIAndKubelet waits primarily for the API server to come up. If that takes a long time, and the kubelet
// /healthz and /healthz/syncloop endpoints continuously are unhealthy, kubeadm will error out after a period of
// backoffing exponentially
func waitForAPIAndKubelet(waiter apiclient.Waiter) error {
// waitForKubeletAndFunc waits primarily for the function f to execute, even though it might take some time. If that takes a long time, and the kubelet
// /healthz continuously are unhealthy, kubeadm will error out after a period of exponential backoff
func waitForKubeletAndFunc(waiter apiclient.Waiter, f func() error) error {
errorChan := make(chan error)
fmt.Printf("[init] waiting for the kubelet to boot up the control plane as Static Pods from directory %q \n", kubeadmconstants.GetStaticPodDirectory())
fmt.Println("[init] this might take a minute or longer if the control plane images have to be pulled")
go func(errC chan error, waiter apiclient.Waiter) {
// This goroutine can only make kubeadm init fail. If this check succeeds, it won't do anything special
if err := waiter.WaitForHealthyKubelet(40*time.Second, "http://localhost:10248/healthz"); err != nil {
@ -598,9 +603,9 @@ func waitForAPIAndKubelet(waiter apiclient.Waiter) error {
}(errorChan, waiter)
go func(errC chan error, waiter apiclient.Waiter) {
// This main goroutine sends whatever WaitForAPI returns (error or not) to the channel
// This in order to continue on success (nil error), or just fail if
errC <- waiter.WaitForAPI()
// This main goroutine sends whatever the f function returns (error or not) to the channel
// This in order to continue on success (nil error), or just fail if the function returns an error
errC <- f()
}(errorChan, waiter)
// This call is blocking until one of the goroutines sends to errorChan

View File

@ -19,6 +19,7 @@ package cmd
import (
"fmt"
"io"
"os"
"path/filepath"
"strings"
@ -28,6 +29,7 @@ import (
flag "github.com/spf13/pflag"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apimachinery/pkg/util/wait"
certutil "k8s.io/client-go/util/cert"
kubeadmapi "k8s.io/kubernetes/cmd/kubeadm/app/apis/kubeadm"
kubeadmscheme "k8s.io/kubernetes/cmd/kubeadm/app/apis/kubeadm/scheme"
@ -37,8 +39,10 @@ import (
"k8s.io/kubernetes/cmd/kubeadm/app/discovery"
"k8s.io/kubernetes/cmd/kubeadm/app/features"
kubeletphase "k8s.io/kubernetes/cmd/kubeadm/app/phases/kubelet"
patchnodephase "k8s.io/kubernetes/cmd/kubeadm/app/phases/patchnode"
"k8s.io/kubernetes/cmd/kubeadm/app/preflight"
kubeadmutil "k8s.io/kubernetes/cmd/kubeadm/app/util"
"k8s.io/kubernetes/cmd/kubeadm/app/util/apiclient"
configutil "k8s.io/kubernetes/cmd/kubeadm/app/util/config"
kubeconfigutil "k8s.io/kubernetes/cmd/kubeadm/app/util/kubeconfig"
utilsexec "k8s.io/utils/exec"
@ -96,6 +100,19 @@ var (
Often times the same token is used for both parts. In this case, the
--token flag can be used instead of specifying each token individually.
`)
kubeadmJoinFailMsgf = dedent.Dedent(`
Unfortunately, an error has occurred:
%v
This error is likely caused by:
- The kubelet is not running
- The kubelet is unhealthy due to a misconfiguration of the node in some way (required cgroups disabled)
If you are on a systemd-powered system, you can try to troubleshoot the error with the following commands:
- 'systemctl status kubelet'
- 'journalctl -xeu kubelet'
`)
)
// NewCmdJoin returns "kubeadm join" command.
@ -269,16 +286,30 @@ func (j *Join) Run(out io.Writer) error {
}
// Now the kubelet will perform the TLS Bootstrap, transforming bootstrap-kubeconfig.conf to kubeconfig.conf in /etc/kubernetes
// Wait for the kubelet to create the /etc/kubernetes/kubelet.conf KubeConfig file. If this process
// times out, display a somewhat user-friendly message.
waiter := apiclient.NewKubeWaiter(nil, kubeadmconstants.TLSBootstrapTimeout, os.Stdout)
if err := waitForKubeletAndFunc(waiter, waitForTLSBootstrappedClient); err != nil {
fmt.Printf(kubeadmJoinFailMsgf, err)
return err
}
// When we know the /etc/kubernetes/kubelet.conf file is available, get the client
kubeletKubeConfig := filepath.Join(kubeadmconstants.KubernetesDir, kubeadmconstants.KubeletKubeConfigFileName)
client, err := kubeconfigutil.ClientSetFromFile(kubeletKubeConfig)
if err != nil {
return err
}
glog.V(1).Infof("[join] preserving the crisocket information for the node")
if err := patchnodephase.AnnotateCRISocket(client, j.cfg.NodeRegistration.Name, j.cfg.NodeRegistration.CRISocket); err != nil {
return fmt.Errorf("error uploading crisocket: %v", err)
}
// NOTE: the "--dynamic-config-dir" flag should be specified in /etc/systemd/system/kubelet.service.d/10-kubeadm.conf for this to work
// This feature is disabled by default, as it is alpha still
glog.V(1).Infoln("[join] enabling dynamic kubelet configuration")
if features.Enabled(j.cfg.FeatureGates, features.DynamicKubeletConfig) {
client, err := kubeletphase.GetLocalNodeTLSBootstrappedClient()
if err != nil {
return err
}
if err := kubeletphase.EnableDynamicConfigForNode(client, j.cfg.NodeRegistration.Name, kubeletVersion); err != nil {
return fmt.Errorf("error consuming base kubelet configuration: %v", err)
}
@ -287,3 +318,15 @@ func (j *Join) Run(out io.Writer) error {
fmt.Fprintf(out, joinDoneMsgf)
return nil
}
// waitForTLSBootstrappedClient waits for the /etc/kubernetes/kubelet.conf file to be available
func waitForTLSBootstrappedClient() error {
fmt.Println("[tlsbootstrap] Waiting for the kubelet to perform the TLS Bootstrap...")
kubeletKubeConfig := filepath.Join(kubeadmconstants.KubernetesDir, kubeadmconstants.KubeletKubeConfigFileName)
// Loop on every falsy return. Return with an error if raised. Exit successfully if true is returned.
return wait.PollImmediate(kubeadmconstants.APICallRetryInterval, kubeadmconstants.TLSBootstrapTimeout, func() (bool, error) {
_, err := os.Stat(kubeletKubeConfig)
return (err == nil), nil
})
}

View File

@ -171,6 +171,8 @@ const (
PatchNodeTimeout = 2 * time.Minute
// UpdateNodeTimeout specifies how long kubeadm should wait for updating node with the initial remote configuration of kubelet before timing out
UpdateNodeTimeout = 2 * time.Minute
// TLSBootstrapTimeout specifies how long kubeadm should wait for the kubelet to perform the TLS Bootstrap
TLSBootstrapTimeout = 2 * time.Minute
// MinimumAddressesInServiceSubnet defines minimum amount of nodes the Service subnet should allow.
// We need at least ten, because the DNS service is always at the tenth cluster clusterIP
@ -184,6 +186,10 @@ const (
// This is a duplicate definition of the constant in pkg/controller/service/service_controller.go
LabelNodeRoleMaster = "node-role.kubernetes.io/master"
// AnnotationKubeadmCRISocket specifies the annotation kubeadm uses to preserve the crisocket information given to kubeadm at
// init/join time for use later. kubeadm annotates the node object with this information
AnnotationKubeadmCRISocket = "kubeadm.alpha.kubernetes.io/cri-socket"
// MasterConfigurationConfigMap specifies in what ConfigMap in the kube-system namespace the `kubeadm init` configuration should be stored
MasterConfigurationConfigMap = "kubeadm-config"

View File

@ -18,17 +18,13 @@ package kubelet
import (
"fmt"
"os"
"path/filepath"
"k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/wait"
clientset "k8s.io/client-go/kubernetes"
kubeadmconstants "k8s.io/kubernetes/cmd/kubeadm/app/constants"
"k8s.io/kubernetes/cmd/kubeadm/app/util/apiclient"
kubeconfigutil "k8s.io/kubernetes/cmd/kubeadm/app/util/kubeconfig"
"k8s.io/kubernetes/pkg/util/version"
)
@ -63,22 +59,3 @@ func patchNodeForDynamicConfig(n *v1.Node, configMapName string, configMapUID ty
},
}
}
// GetLocalNodeTLSBootstrappedClient waits for the kubelet to perform the TLS bootstrap
// and then creates a client from config file /etc/kubernetes/kubelet.conf
func GetLocalNodeTLSBootstrappedClient() (clientset.Interface, error) {
fmt.Println("[tlsbootstrap] Waiting for the kubelet to perform the TLS Bootstrap...")
kubeletKubeConfig := filepath.Join(kubeadmconstants.KubernetesDir, kubeadmconstants.KubeletKubeConfigFileName)
// Loop on every falsy return. Return with an error if raised. Exit successfully if true is returned.
err := wait.PollImmediateInfinite(kubeadmconstants.APICallRetryInterval, func() (bool, error) {
_, err := os.Stat(kubeletKubeConfig)
return (err == nil), nil
})
if err != nil {
return nil, err
}
return kubeconfigutil.ClientSetFromFile(kubeletKubeConfig)
}

View File

@ -0,0 +1,40 @@
/*
Copyright 2018 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package patchnode
import (
"fmt"
"k8s.io/api/core/v1"
clientset "k8s.io/client-go/kubernetes"
"k8s.io/kubernetes/cmd/kubeadm/app/constants"
"k8s.io/kubernetes/cmd/kubeadm/app/util/apiclient"
)
// AnnotateCRISocket annotates the node with the given crisocket
func AnnotateCRISocket(client clientset.Interface, nodeName string, criSocket string) error {
fmt.Printf("[patchnode] Uploading the CRI Socket information %q to the Node API object %q as an annotation\n", criSocket, nodeName)
return apiclient.PatchNode(client, nodeName, func(n *v1.Node) {
annotateNodeWithCRISocket(n, criSocket)
})
}
func annotateNodeWithCRISocket(n *v1.Node, criSocket string) {
n.ObjectMeta.Annotations[constants.AnnotationKubeadmCRISocket] = criSocket
}