diff --git a/cmd/kubeadm/app/node/bootstrap.go b/cmd/kubeadm/app/node/bootstrap.go index c451a3a40c2..a18eb0c9626 100644 --- a/cmd/kubeadm/app/node/bootstrap.go +++ b/cmd/kubeadm/app/node/bootstrap.go @@ -19,13 +19,17 @@ package node import ( "fmt" "os" + "sync" + "time" kubeadmapi "k8s.io/kubernetes/cmd/kubeadm/app/apis/kubeadm" kubeadmutil "k8s.io/kubernetes/cmd/kubeadm/app/util" + "k8s.io/kubernetes/pkg/apis/certificates" clientset "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset" certclient "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset/typed/certificates/unversioned" "k8s.io/kubernetes/pkg/client/unversioned/clientcmd" "k8s.io/kubernetes/pkg/types" + "k8s.io/kubernetes/pkg/util/wait" ) // ConnectionDetails represents a master API endpoint connection @@ -36,7 +40,14 @@ type ConnectionDetails struct { NodeName types.NodeName } -// EstablishMasterConnection establishes a connection with exactly one of the provided API endpoints or errors. +// retryTimeout between the subsequent attempts to connect +// to an API endpoint +const retryTimeout = 5 + +// EstablishMasterConnection establishes a connection with exactly one of the provided API endpoints. +// The function builds a client for every endpoint and concurrently keeps trying to connect to any one +// of the provided endpoints. Blocks until at least one connection is established, then it stops the +// connection attempts for other endpoints. func EstablishMasterConnection(s *kubeadmapi.NodeConfiguration, clusterInfo *kubeadmapi.ClusterInfo) (*ConnectionDetails, error) { hostName, err := os.Hostname() if err != nil { @@ -48,42 +59,53 @@ func EstablishMasterConnection(s *kubeadmapi.NodeConfiguration, clusterInfo *kub endpoints := clusterInfo.Endpoints caCert := []byte(clusterInfo.CertificateAuthorities[0]) - var establishedConnection *ConnectionDetails - // TODO: add a wait mechanism for the API endpoints (retrying to connect to at least one) + stopChan := make(chan struct{}) + result := make(chan *ConnectionDetails) + var wg sync.WaitGroup for _, endpoint := range endpoints { clientSet, err := createClients(caCert, endpoint, s.Secrets.BearerToken, nodeName) if err != nil { fmt.Printf(" warning: %s. Skipping endpoint %s\n", err, endpoint) continue } - fmt.Printf(" trying to connect to endpoint %s\n", endpoint) - - // TODO: add a simple GET /version request to fail early if needed before attempting - // to connect with a discovery client. - if err := checkCertsAPI(clientSet.DiscoveryClient); err != nil { - fmt.Printf(" warning: failed to connect to %s: %v\n", endpoint, err) - continue - } - - fmt.Printf(" successfully established connection with endpoint %s\n", endpoint) - // connection established - establishedConnection = &ConnectionDetails{ - CertClient: clientSet.CertificatesClient, - Endpoint: endpoint, - CACert: caCert, - NodeName: nodeName, - } - break + wg.Add(1) + go func(apiEndpoint string) { + defer wg.Done() + wait.Until(func() { + fmt.Printf(" trying to connect to endpoint %s\n", apiEndpoint) + err := checkAPIEndpoint(clientSet, apiEndpoint) + if err != nil { + fmt.Printf(" endpoint check failed [%v]\n", err) + return + } + fmt.Printf(" successfully established connection with endpoint %s\n", apiEndpoint) + // connection established, stop all wait threads + close(stopChan) + result <- &ConnectionDetails{ + CertClient: clientSet.CertificatesClient, + Endpoint: apiEndpoint, + CACert: caCert, + NodeName: nodeName, + } + }, retryTimeout*time.Second, stopChan) + }(endpoint) } - if establishedConnection == nil { + go func() { + wg.Wait() + // all wait.Until() calls have finished now + close(result) + }() + + establishedConnection, ok := <-result + if !ok { return nil, fmt.Errorf(" failed to create bootstrap clients " + "for any of the provided API endpoints") } return establishedConnection, nil } -// Creates a set of clients for this endpoint +// creates a set of clients for this endpoint func createClients(caCert []byte, endpoint, token string, nodeName types.NodeName) (*clientset.Clientset, error) { bareClientConfig := kubeadmutil.CreateBasicClientConfig("kubernetes", endpoint, caCert) bootstrapClientConfig, err := clientcmd.NewDefaultClientConfig( @@ -101,3 +123,26 @@ func createClients(caCert []byte, endpoint, token string, nodeName types.NodeNam } return clientSet, nil } + +// checks the connection requirements for a specific API endpoint +func checkAPIEndpoint(clientSet *clientset.Clientset, endpoint string) error { + // check general connectivity + version, err := clientSet.DiscoveryClient.ServerVersion() + if err != nil { + return fmt.Errorf("failed to connect to %s [%v]", endpoint, err) + } + fmt.Printf(" detected server version %s\n", version.String()) + + // check certificates API + serverGroups, err := clientSet.DiscoveryClient.ServerGroups() + if err != nil { + return fmt.Errorf("certificate API check failed: failed to retrieve a list of supported API objects [%v]", err) + } + for _, group := range serverGroups.Groups { + if group.Name == certificates.GroupName { + return nil + } + } + return fmt.Errorf("certificate API check failed: API version %s does not support certificates API, use v1.4.0 or newer", + version.String()) +} diff --git a/cmd/kubeadm/app/node/csr.go b/cmd/kubeadm/app/node/csr.go index ed73e7c26dc..11748bf29a7 100644 --- a/cmd/kubeadm/app/node/csr.go +++ b/cmd/kubeadm/app/node/csr.go @@ -20,8 +20,6 @@ import ( "fmt" kubeadmutil "k8s.io/kubernetes/cmd/kubeadm/app/util" - "k8s.io/kubernetes/pkg/apis/certificates" - "k8s.io/kubernetes/pkg/client/typed/discovery" clientcmdapi "k8s.io/kubernetes/pkg/client/unversioned/clientcmd/api" "k8s.io/kubernetes/pkg/kubelet/util/csr" certutil "k8s.io/kubernetes/pkg/util/cert" @@ -57,25 +55,3 @@ func PerformTLSBootstrap(connection *ConnectionDetails) (*clientcmdapi.Config, e return finalConfig, nil } - -// Checks if the certificates API for this endpoint is functional -func checkCertsAPI(discoveryClient *discovery.DiscoveryClient) error { - serverGroups, err := discoveryClient.ServerGroups() - - if err != nil { - return fmt.Errorf("failed to retrieve a list of supported API objects [%v]", err) - } - - for _, group := range serverGroups.Groups { - if group.Name == certificates.GroupName { - return nil - } - } - - version, err := discoveryClient.ServerVersion() - if err != nil { - return fmt.Errorf("unable to obtain API version [%v]", err) - } - - return fmt.Errorf("API version %s does not support certificates API, use v1.4.0 or newer", version.String()) -}