kubeadm: Introduce controllable timeout on join

Signed-off-by: Rostislav M. Georgiev <rostislavg@vmware.com>
This commit is contained in:
Rostislav M. Georgiev 2018-03-09 16:21:52 +02:00
parent a5133305a9
commit 230a9c67ce
11 changed files with 62 additions and 10 deletions

View File

@ -128,6 +128,7 @@ func Funcs(codecs runtimeserializer.CodecFactory) []interface{} {
obj.DiscoveryFile = "foo"
obj.DiscoveryToken = "foo"
obj.DiscoveryTokenAPIServers = []string{"foo"}
obj.DiscoveryTimeout = &metav1.Duration{Duration: 1}
obj.TLSBootstrapToken = "foo"
obj.Token = "foo"
obj.CRISocket = "foo"

View File

@ -224,6 +224,8 @@ type NodeConfiguration struct {
// will be fetched. Currently we only pay attention to one API server but
// hope to support >1 in the future.
DiscoveryTokenAPIServers []string
// DiscoveryTimeout modifies the discovery timeout
DiscoveryTimeout *metav1.Duration
// NodeName is the name of the node to join the cluster. Defaults
// to the name of the host.
NodeName string

View File

@ -19,6 +19,7 @@ package v1alpha1
import (
"net/url"
"strings"
"time"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
@ -68,6 +69,9 @@ const (
DefaultProxyBindAddressv6 = "::"
// KubeproxyKubeConfigFileName defines the file name for the kube-proxy's KubeConfig file
KubeproxyKubeConfigFileName = "/var/lib/kube-proxy/kubeconfig.conf"
// DefaultDiscoveryTimeout specifies the default discovery timeout for kubeadm (used unless one is specified in the NodeConfiguration)
DefaultDiscoveryTimeout = 5 * time.Minute
)
var (
@ -177,6 +181,11 @@ func SetDefaults_NodeConfiguration(obj *NodeConfiguration) {
obj.DiscoveryFile = u.Path
}
}
if obj.DiscoveryTimeout == nil {
obj.DiscoveryTimeout = &metav1.Duration{
Duration: DefaultDiscoveryTimeout,
}
}
}
// SetDefaultsEtcdSelfHosted sets defaults for self-hosted etcd if used

View File

@ -214,6 +214,8 @@ type NodeConfiguration struct {
// will be fetched. Currently we only pay attention to one API server but
// hope to support >1 in the future.
DiscoveryTokenAPIServers []string `json:"discoveryTokenAPIServers,omitempty"`
// DiscoveryTimeout modifies the discovery timeout
DiscoveryTimeout *metav1.Duration `json:"discoveryTimeout,omitempty"`
// NodeName is the name of the node to join the cluster. Defaults
// to the name of the host.
NodeName string `json:"nodeName"`

View File

@ -347,6 +347,7 @@ func autoConvert_v1alpha1_NodeConfiguration_To_kubeadm_NodeConfiguration(in *Nod
out.DiscoveryFile = in.DiscoveryFile
out.DiscoveryToken = in.DiscoveryToken
out.DiscoveryTokenAPIServers = *(*[]string)(unsafe.Pointer(&in.DiscoveryTokenAPIServers))
out.DiscoveryTimeout = (*v1.Duration)(unsafe.Pointer(in.DiscoveryTimeout))
out.NodeName = in.NodeName
out.TLSBootstrapToken = in.TLSBootstrapToken
out.Token = in.Token
@ -367,6 +368,7 @@ func autoConvert_kubeadm_NodeConfiguration_To_v1alpha1_NodeConfiguration(in *kub
out.DiscoveryFile = in.DiscoveryFile
out.DiscoveryToken = in.DiscoveryToken
out.DiscoveryTokenAPIServers = *(*[]string)(unsafe.Pointer(&in.DiscoveryTokenAPIServers))
out.DiscoveryTimeout = (*v1.Duration)(unsafe.Pointer(in.DiscoveryTimeout))
out.NodeName = in.NodeName
out.TLSBootstrapToken = in.TLSBootstrapToken
out.Token = in.Token

View File

@ -309,6 +309,15 @@ func (in *NodeConfiguration) DeepCopyInto(out *NodeConfiguration) {
*out = make([]string, len(*in))
copy(*out, *in)
}
if in.DiscoveryTimeout != nil {
in, out := &in.DiscoveryTimeout, &out.DiscoveryTimeout
if *in == nil {
*out = nil
} else {
*out = new(v1.Duration)
**out = **in
}
}
if in.DiscoveryTokenCACertHashes != nil {
in, out := &in.DiscoveryTokenCACertHashes, &out.DiscoveryTokenCACertHashes
*out = make([]string, len(*in))

View File

@ -309,6 +309,15 @@ func (in *NodeConfiguration) DeepCopyInto(out *NodeConfiguration) {
*out = make([]string, len(*in))
copy(*out, *in)
}
if in.DiscoveryTimeout != nil {
in, out := &in.DiscoveryTimeout, &out.DiscoveryTimeout
if *in == nil {
*out = nil
} else {
*out = new(v1.Duration)
**out = **in
}
}
if in.DiscoveryTokenCACertHashes != nil {
in, out := &in.DiscoveryTokenCACertHashes, &out.DiscoveryTokenCACertHashes
*out = make([]string, len(*in))

View File

@ -59,7 +59,7 @@ func GetValidatedClusterInfoObject(cfg *kubeadmapi.NodeConfiguration) (*clientcm
}
return file.RetrieveValidatedClusterInfo(cfg.DiscoveryFile)
case len(cfg.DiscoveryToken) != 0:
return token.RetrieveValidatedClusterInfo(cfg.DiscoveryToken, cfg.DiscoveryTokenAPIServers, cfg.DiscoveryTokenCACertHashes)
return token.RetrieveValidatedClusterInfo(cfg)
default:
return nil, fmt.Errorf("couldn't find a valid discovery configuration")
}

View File

@ -11,6 +11,7 @@ go_library(
srcs = ["token.go"],
importpath = "k8s.io/kubernetes/cmd/kubeadm/app/discovery/token",
deps = [
"//cmd/kubeadm/app/apis/kubeadm:go_default_library",
"//cmd/kubeadm/app/constants:go_default_library",
"//cmd/kubeadm/app/util/kubeconfig:go_default_library",
"//cmd/kubeadm/app/util/pubkeypin:go_default_library",

View File

@ -22,6 +22,7 @@ import (
"encoding/pem"
"fmt"
"sync"
"time"
"k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
@ -29,6 +30,7 @@ import (
bootstrapapi "k8s.io/client-go/tools/bootstrap/token/api"
"k8s.io/client-go/tools/clientcmd"
clientcmdapi "k8s.io/client-go/tools/clientcmd/api"
kubeadmapi "k8s.io/kubernetes/cmd/kubeadm/app/apis/kubeadm"
"k8s.io/kubernetes/cmd/kubeadm/app/constants"
kubeconfigutil "k8s.io/kubernetes/cmd/kubeadm/app/util/kubeconfig"
"k8s.io/kubernetes/cmd/kubeadm/app/util/pubkeypin"
@ -40,24 +42,24 @@ import (
const BootstrapUser = "token-bootstrap-client"
// RetrieveValidatedClusterInfo connects to the API Server and tries to fetch the cluster-info ConfigMap
// It then makes sure it can trust the API Server by looking at the JWS-signed tokens and (if rootCAPubKeys is not empty)
// It then makes sure it can trust the API Server by looking at the JWS-signed tokens and (if cfg.DiscoveryTokenCACertHashes is not empty)
// validating the cluster CA against a set of pinned public keys
func RetrieveValidatedClusterInfo(discoveryToken string, tokenAPIServers, rootCAPubKeys []string) (*clientcmdapi.Cluster, error) {
tokenID, tokenSecret, err := tokenutil.ParseToken(discoveryToken)
func RetrieveValidatedClusterInfo(cfg *kubeadmapi.NodeConfiguration) (*clientcmdapi.Cluster, error) {
tokenID, tokenSecret, err := tokenutil.ParseToken(cfg.DiscoveryToken)
if err != nil {
return nil, err
}
// Load the cfg.DiscoveryTokenCACertHashes into a pubkeypin.Set
pubKeyPins := pubkeypin.NewSet()
err = pubKeyPins.Allow(rootCAPubKeys...)
err = pubKeyPins.Allow(cfg.DiscoveryTokenCACertHashes...)
if err != nil {
return nil, err
}
// The function below runs for every endpoint, and all endpoints races with each other.
// The endpoint that wins the race and completes the task first gets its kubeconfig returned below
baseKubeConfig := runForEndpointsAndReturnFirst(tokenAPIServers, func(endpoint string) (*clientcmdapi.Config, error) {
baseKubeConfig, err := runForEndpointsAndReturnFirst(cfg.DiscoveryTokenAPIServers, cfg.DiscoveryTimeout.Duration, func(endpoint string) (*clientcmdapi.Config, error) {
insecureBootstrapConfig := buildInsecureBootstrapKubeConfig(endpoint)
clusterName := insecureBootstrapConfig.Contexts[insecureBootstrapConfig.CurrentContext].Cluster
@ -158,6 +160,9 @@ func RetrieveValidatedClusterInfo(discoveryToken string, tokenAPIServers, rootCA
fmt.Printf("[discovery] Cluster info signature and contents are valid and TLS certificate validates against pinned roots, will use API Server %q\n", endpoint)
return secureKubeconfig, nil
})
if err != nil {
return nil, err
}
return kubeconfigutil.GetClusterFromKubeConfig(baseKubeConfig), nil
}
@ -179,7 +184,7 @@ func buildSecureBootstrapKubeConfig(endpoint string, caCert []byte) *clientcmdap
}
// runForEndpointsAndReturnFirst loops the endpoints slice and let's the endpoints race for connecting to the master
func runForEndpointsAndReturnFirst(endpoints []string, fetchKubeConfigFunc func(string) (*clientcmdapi.Config, error)) *clientcmdapi.Config {
func runForEndpointsAndReturnFirst(endpoints []string, discoveryTimeout time.Duration, fetchKubeConfigFunc func(string) (*clientcmdapi.Config, error)) (*clientcmdapi.Config, error) {
stopChan := make(chan struct{})
var resultingKubeConfig *clientcmdapi.Config
var once sync.Once
@ -205,8 +210,17 @@ func runForEndpointsAndReturnFirst(endpoints []string, fetchKubeConfigFunc func(
}, constants.DiscoveryRetryInterval, stopChan)
}(endpoint)
}
wg.Wait()
return resultingKubeConfig
select {
case <-time.After(discoveryTimeout):
close(stopChan)
err := fmt.Errorf("abort connecting to API servers after timeout of %v", discoveryTimeout)
fmt.Printf("[discovery] %v\n", err)
wg.Wait()
return nil, err
case <-stopChan:
wg.Wait()
return resultingKubeConfig, nil
}
}
// parsePEMCert decodes a PEM-formatted certificate and returns it as an x509.Certificate

View File

@ -68,11 +68,14 @@ func TestRunForEndpointsAndReturnFirst(t *testing.T) {
},
}
for _, rt := range tests {
returnKubeConfig := runForEndpointsAndReturnFirst(rt.endpoints, func(endpoint string) (*clientcmdapi.Config, error) {
returnKubeConfig, err := runForEndpointsAndReturnFirst(rt.endpoints, 5*time.Minute, func(endpoint string) (*clientcmdapi.Config, error) {
timeout, _ := strconv.Atoi(endpoint)
time.Sleep(time.Second * time.Duration(timeout))
return kubeconfigutil.CreateBasic(endpoint, "foo", "foo", []byte{}), nil
})
if err != nil {
t.Errorf("unexpected error: %v for endpoint %s", err, rt.expectedEndpoint)
}
endpoint := returnKubeConfig.Clusters[returnKubeConfig.Contexts[returnKubeConfig.CurrentContext].Cluster].Server
if endpoint != rt.expectedEndpoint {
t.Errorf(