kubeadm join: wait for API endpoints

* Introduce a concurrent retry mechanism for bootstrapping
   with a single API endpoint
This commit is contained in:
Atanas Mirchev 2016-09-26 15:08:59 +02:00 committed by Atanas Mirchev
parent ca75b47657
commit 072259f80f
2 changed files with 68 additions and 47 deletions

View File

@ -19,13 +19,17 @@ package node
import ( import (
"fmt" "fmt"
"os" "os"
"sync"
"time"
kubeadmapi "k8s.io/kubernetes/cmd/kubeadm/app/apis/kubeadm" kubeadmapi "k8s.io/kubernetes/cmd/kubeadm/app/apis/kubeadm"
kubeadmutil "k8s.io/kubernetes/cmd/kubeadm/app/util" kubeadmutil "k8s.io/kubernetes/cmd/kubeadm/app/util"
"k8s.io/kubernetes/pkg/apis/certificates"
clientset "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset" clientset "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset"
certclient "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset/typed/certificates/unversioned" certclient "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset/typed/certificates/unversioned"
"k8s.io/kubernetes/pkg/client/unversioned/clientcmd" "k8s.io/kubernetes/pkg/client/unversioned/clientcmd"
"k8s.io/kubernetes/pkg/types" "k8s.io/kubernetes/pkg/types"
"k8s.io/kubernetes/pkg/util/wait"
) )
// ConnectionDetails represents a master API endpoint connection // ConnectionDetails represents a master API endpoint connection
@ -36,7 +40,14 @@ type ConnectionDetails struct {
NodeName types.NodeName NodeName types.NodeName
} }
// EstablishMasterConnection establishes a connection with exactly one of the provided API endpoints or errors. // retryTimeout between the subsequent attempts to connect
// to an API endpoint
const retryTimeout = 5
// EstablishMasterConnection establishes a connection with exactly one of the provided API endpoints.
// The function builds a client for every endpoint and concurrently keeps trying to connect to any one
// of the provided endpoints. Blocks until at least one connection is established, then it stops the
// connection attempts for other endpoints.
func EstablishMasterConnection(s *kubeadmapi.NodeConfiguration, clusterInfo *kubeadmapi.ClusterInfo) (*ConnectionDetails, error) { func EstablishMasterConnection(s *kubeadmapi.NodeConfiguration, clusterInfo *kubeadmapi.ClusterInfo) (*ConnectionDetails, error) {
hostName, err := os.Hostname() hostName, err := os.Hostname()
if err != nil { if err != nil {
@ -48,42 +59,53 @@ func EstablishMasterConnection(s *kubeadmapi.NodeConfiguration, clusterInfo *kub
endpoints := clusterInfo.Endpoints endpoints := clusterInfo.Endpoints
caCert := []byte(clusterInfo.CertificateAuthorities[0]) caCert := []byte(clusterInfo.CertificateAuthorities[0])
var establishedConnection *ConnectionDetails stopChan := make(chan struct{})
// TODO: add a wait mechanism for the API endpoints (retrying to connect to at least one) result := make(chan *ConnectionDetails)
var wg sync.WaitGroup
for _, endpoint := range endpoints { for _, endpoint := range endpoints {
clientSet, err := createClients(caCert, endpoint, s.Secrets.BearerToken, nodeName) clientSet, err := createClients(caCert, endpoint, s.Secrets.BearerToken, nodeName)
if err != nil { if err != nil {
fmt.Printf("<node/bootstrap> warning: %s. Skipping endpoint %s\n", err, endpoint) fmt.Printf("<node/bootstrap> warning: %s. Skipping endpoint %s\n", err, endpoint)
continue continue
} }
fmt.Printf("<node/bootstrap> trying to connect to endpoint %s\n", endpoint) wg.Add(1)
go func(apiEndpoint string) {
// TODO: add a simple GET /version request to fail early if needed before attempting defer wg.Done()
// to connect with a discovery client. wait.Until(func() {
if err := checkCertsAPI(clientSet.DiscoveryClient); err != nil { fmt.Printf("<node/bootstrap> trying to connect to endpoint %s\n", apiEndpoint)
fmt.Printf("<node/bootstrap> warning: failed to connect to %s: %v\n", endpoint, err) err := checkAPIEndpoint(clientSet, apiEndpoint)
continue if err != nil {
} fmt.Printf("<node/bootstrap> endpoint check failed [%v]\n", err)
return
fmt.Printf("<node/bootstrap> successfully established connection with endpoint %s\n", endpoint) }
// connection established fmt.Printf("<node/bootstrap> successfully established connection with endpoint %s\n", apiEndpoint)
establishedConnection = &ConnectionDetails{ // connection established, stop all wait threads
CertClient: clientSet.CertificatesClient, close(stopChan)
Endpoint: endpoint, result <- &ConnectionDetails{
CACert: caCert, CertClient: clientSet.CertificatesClient,
NodeName: nodeName, Endpoint: apiEndpoint,
} CACert: caCert,
break NodeName: nodeName,
}
}, retryTimeout*time.Second, stopChan)
}(endpoint)
} }
if establishedConnection == nil { go func() {
wg.Wait()
// all wait.Until() calls have finished now
close(result)
}()
establishedConnection, ok := <-result
if !ok {
return nil, fmt.Errorf("<node/bootstrap> failed to create bootstrap clients " + return nil, fmt.Errorf("<node/bootstrap> failed to create bootstrap clients " +
"for any of the provided API endpoints") "for any of the provided API endpoints")
} }
return establishedConnection, nil return establishedConnection, nil
} }
// Creates a set of clients for this endpoint // creates a set of clients for this endpoint
func createClients(caCert []byte, endpoint, token string, nodeName types.NodeName) (*clientset.Clientset, error) { func createClients(caCert []byte, endpoint, token string, nodeName types.NodeName) (*clientset.Clientset, error) {
bareClientConfig := kubeadmutil.CreateBasicClientConfig("kubernetes", endpoint, caCert) bareClientConfig := kubeadmutil.CreateBasicClientConfig("kubernetes", endpoint, caCert)
bootstrapClientConfig, err := clientcmd.NewDefaultClientConfig( bootstrapClientConfig, err := clientcmd.NewDefaultClientConfig(
@ -101,3 +123,26 @@ func createClients(caCert []byte, endpoint, token string, nodeName types.NodeNam
} }
return clientSet, nil return clientSet, nil
} }
// checks the connection requirements for a specific API endpoint
func checkAPIEndpoint(clientSet *clientset.Clientset, endpoint string) error {
// check general connectivity
version, err := clientSet.DiscoveryClient.ServerVersion()
if err != nil {
return fmt.Errorf("failed to connect to %s [%v]", endpoint, err)
}
fmt.Printf("<node/bootstrap> detected server version %s\n", version.String())
// check certificates API
serverGroups, err := clientSet.DiscoveryClient.ServerGroups()
if err != nil {
return fmt.Errorf("certificate API check failed: failed to retrieve a list of supported API objects [%v]", err)
}
for _, group := range serverGroups.Groups {
if group.Name == certificates.GroupName {
return nil
}
}
return fmt.Errorf("certificate API check failed: API version %s does not support certificates API, use v1.4.0 or newer",
version.String())
}

View File

@ -20,8 +20,6 @@ import (
"fmt" "fmt"
kubeadmutil "k8s.io/kubernetes/cmd/kubeadm/app/util" kubeadmutil "k8s.io/kubernetes/cmd/kubeadm/app/util"
"k8s.io/kubernetes/pkg/apis/certificates"
"k8s.io/kubernetes/pkg/client/typed/discovery"
clientcmdapi "k8s.io/kubernetes/pkg/client/unversioned/clientcmd/api" clientcmdapi "k8s.io/kubernetes/pkg/client/unversioned/clientcmd/api"
"k8s.io/kubernetes/pkg/kubelet/util/csr" "k8s.io/kubernetes/pkg/kubelet/util/csr"
certutil "k8s.io/kubernetes/pkg/util/cert" certutil "k8s.io/kubernetes/pkg/util/cert"
@ -57,25 +55,3 @@ func PerformTLSBootstrap(connection *ConnectionDetails) (*clientcmdapi.Config, e
return finalConfig, nil return finalConfig, nil
} }
// Checks if the certificates API for this endpoint is functional
func checkCertsAPI(discoveryClient *discovery.DiscoveryClient) error {
serverGroups, err := discoveryClient.ServerGroups()
if err != nil {
return fmt.Errorf("failed to retrieve a list of supported API objects [%v]", err)
}
for _, group := range serverGroups.Groups {
if group.Name == certificates.GroupName {
return nil
}
}
version, err := discoveryClient.ServerVersion()
if err != nil {
return fmt.Errorf("unable to obtain API version [%v]", err)
}
return fmt.Errorf("API version %s does not support certificates API, use v1.4.0 or newer", version.String())
}