kubeadm: refactored token discovery.

This commit is contained in:
Paulo Pires 2017-01-11 01:09:34 +00:00
parent d2e0913e68
commit 8a195b9a1b
No known key found for this signature in database
GPG Key ID: F3F6ED5C522EAA71
10 changed files with 50 additions and 190 deletions

View File

@ -33,7 +33,6 @@ go_library(
"//cmd/kubeadm/app/util:go_default_library",
"//pkg/api:go_default_library",
"//pkg/api/v1:go_default_library",
"//pkg/client/unversioned/clientcmd/api:go_default_library",
"//pkg/fields:go_default_library",
"//pkg/kubectl:go_default_library",
"//pkg/kubectl/cmd/util:go_default_library",

View File

@ -33,9 +33,7 @@ import (
kubeconfigphase "k8s.io/kubernetes/cmd/kubeadm/app/phases/kubeconfig"
"k8s.io/kubernetes/cmd/kubeadm/app/preflight"
kubeadmutil "k8s.io/kubernetes/cmd/kubeadm/app/util"
"k8s.io/kubernetes/pkg/api"
clientcmdapi "k8s.io/kubernetes/pkg/client/unversioned/clientcmd/api"
"k8s.io/kubernetes/pkg/runtime"
)
@ -91,7 +89,6 @@ type Join struct {
}
func NewJoin(cfgPath string, args []string, cfg *kubeadmapi.NodeConfiguration, skipPreFlight bool) (*Join, error) {
fmt.Println("[kubeadm] WARNING: kubeadm is in alpha, please do not use it for production clusters.")
if cfgPath != "" {
@ -132,35 +129,13 @@ func (j *Join) Validate() error {
// Run executes worker node provisioning and tries to join an existing cluster.
func (j *Join) Run(out io.Writer) error {
var cfg *clientcmdapi.Config
// TODO: delete this first block when we move Token to the discovery interface
if j.cfg.Discovery.Token != nil {
clusterInfo, err := kubenode.RetrieveTrustedClusterInfo(j.cfg.Discovery.Token)
if err != nil {
return err
}
connectionDetails, err := kubenode.EstablishMasterConnection(j.cfg.Discovery.Token, clusterInfo)
if err != nil {
return err
}
err = kubenode.CheckForNodeNameDuplicates(connectionDetails)
if err != nil {
return err
}
cfg, err = kubenode.PerformTLSBootstrapDeprecated(connectionDetails)
if err != nil {
return err
}
} else {
cfg, err := discovery.For(j.cfg.Discovery)
if err != nil {
return err
}
if err := kubenode.PerformTLSBootstrap(cfg); err != nil {
return err
}
cfg, err := discovery.For(j.cfg.Discovery)
if err != nil {
return err
}
if err := kubenode.PerformTLSBootstrap(cfg); err != nil {
return err
}
if err := kubeconfigphase.WriteKubeconfigToDisk(path.Join(kubeadmapi.GlobalEnvParams.KubernetesDir, kubeconfigphase.KubeletKubeConfigFileName), cfg); err != nil {
return err
}

View File

@ -20,6 +20,7 @@ go_library(
"//cmd/kubeadm/app/discovery/file:go_default_library",
"//cmd/kubeadm/app/discovery/https:go_default_library",
"//cmd/kubeadm/app/discovery/token:go_default_library",
"//cmd/kubeadm/app/node:go_default_library",
"//pkg/client/unversioned/clientcmd:go_default_library",
"//pkg/client/unversioned/clientcmd/api:go_default_library",
"//vendor:github.com/spf13/pflag",

View File

@ -22,7 +22,7 @@ import (
"net/http"
kubeadmapi "k8s.io/kubernetes/cmd/kubeadm/app/apis/kubeadm"
kubenode "k8s.io/kubernetes/cmd/kubeadm/app/node"
"k8s.io/kubernetes/pkg/client/unversioned/clientcmd"
clientcmdapi "k8s.io/kubernetes/pkg/client/unversioned/clientcmd/api"
)
@ -35,10 +35,9 @@ func For(d kubeadmapi.Discovery) (*clientcmdapi.Config, error) {
case d.HTTPS != nil:
return runHTTPSDiscovery(d.HTTPS)
case d.Token != nil:
// TODO move token discovery here
return runTokenDiscovery(d.Token)
default:
return nil, fmt.Errorf("Couldn't find a valid discovery configuration. Please provide one.")
return nil, fmt.Errorf("couldn't find a valid discovery configuration.")
}
}
@ -63,8 +62,15 @@ func runHTTPSDiscovery(hd *kubeadmapi.HTTPSDiscovery) (*clientcmdapi.Config, err
return clientcmd.Load(kubeconfig)
}
// TODO implement
// runTokenDiscovery executes token-based discovery.
func runTokenDiscovery(td *kubeadmapi.TokenDiscovery) (*clientcmdapi.Config, error) {
return nil, fmt.Errorf("Couldn't find a valid discovery configuration. Please provide one.")
clusterInfo, err := kubenode.RetrieveTrustedClusterInfo(td)
if err != nil {
return nil, err
}
cfg, err := kubenode.EstablishMasterConnection(td, clusterInfo)
if err != nil {
return nil, err
}
return cfg, nil
}

View File

@ -28,7 +28,7 @@ import (
"k8s.io/kubernetes/pkg/api/v1"
extensions "k8s.io/kubernetes/pkg/apis/extensions/v1beta1"
metav1 "k8s.io/kubernetes/pkg/apis/meta/v1"
clientset "k8s.io/kubernetes/pkg/client/clientset_generated/clientset"
"k8s.io/kubernetes/pkg/client/clientset_generated/clientset"
"k8s.io/kubernetes/pkg/client/unversioned/clientcmd"
clientcmdapi "k8s.io/kubernetes/pkg/client/unversioned/clientcmd/api"
"k8s.io/kubernetes/pkg/util/wait"

View File

@ -23,7 +23,6 @@ go_library(
"//pkg/api/v1:go_default_library",
"//pkg/apis/certificates:go_default_library",
"//pkg/client/clientset_generated/clientset:go_default_library",
"//pkg/client/clientset_generated/clientset/typed/certificates/v1alpha1:go_default_library",
"//pkg/client/unversioned/clientcmd:go_default_library",
"//pkg/client/unversioned/clientcmd/api:go_default_library",
"//pkg/kubelet/util/csr:go_default_library",
@ -38,7 +37,6 @@ go_test(
name = "go_default_test",
srcs = [
"bootstrap_test.go",
"csr_test.go",
"discovery_test.go",
],
library = ":go_default_library",
@ -47,7 +45,6 @@ go_test(
"//cmd/kubeadm/app/apis/kubeadm:go_default_library",
"//pkg/apis/meta/v1:go_default_library",
"//pkg/client/clientset_generated/clientset:go_default_library",
"//pkg/client/clientset_generated/clientset/typed/certificates/v1alpha1:go_default_library",
"//pkg/client/restclient:go_default_library",
"//pkg/client/typed/discovery:go_default_library",
"//pkg/version:go_default_library",

View File

@ -27,31 +27,27 @@ import (
kubeadmutil "k8s.io/kubernetes/cmd/kubeadm/app/util"
"k8s.io/kubernetes/pkg/api/v1"
"k8s.io/kubernetes/pkg/apis/certificates"
clientset "k8s.io/kubernetes/pkg/client/clientset_generated/clientset"
certclient "k8s.io/kubernetes/pkg/client/clientset_generated/clientset/typed/certificates/v1alpha1"
"k8s.io/kubernetes/pkg/client/clientset_generated/clientset"
"k8s.io/kubernetes/pkg/client/unversioned/clientcmd"
clientcmdapi "k8s.io/kubernetes/pkg/client/unversioned/clientcmd/api"
"k8s.io/kubernetes/pkg/types"
"k8s.io/kubernetes/pkg/util/wait"
)
// ConnectionDetails represents a master API endpoint connection
type ConnectionDetails struct {
ClientSet *clientset.Clientset
CertClient *certclient.CertificatesV1alpha1Client
Endpoint string
CACert []byte
NodeName types.NodeName
}
// retryTimeout between the subsequent attempts to connect
// to an API endpoint
const retryTimeout = 5
type apiClient struct {
clientSet *clientset.Clientset
clientConfig *clientcmdapi.Config
}
// EstablishMasterConnection establishes a connection with exactly one of the provided API endpoints.
// The function builds a client for every endpoint and concurrently keeps trying to connect to any one
// of the provided endpoints. Blocks until at least one connection is established, then it stops the
// connection attempts for other endpoints.
func EstablishMasterConnection(c *kubeadmapi.TokenDiscovery, clusterInfo *kubeadmapi.ClusterInfo) (*ConnectionDetails, error) {
// connection attempts for other endpoints and returns the valid client configuration, if any.
func EstablishMasterConnection(c *kubeadmapi.TokenDiscovery, clusterInfo *kubeadmapi.ClusterInfo) (*clientcmdapi.Config, error) {
hostName, err := os.Hostname()
if err != nil {
return nil, fmt.Errorf("failed to get node hostname [%v]", err)
@ -63,10 +59,11 @@ func EstablishMasterConnection(c *kubeadmapi.TokenDiscovery, clusterInfo *kubead
caCert := []byte(clusterInfo.CertificateAuthorities[0])
stopChan := make(chan struct{})
result := make(chan *ConnectionDetails)
var clientConfig *clientcmdapi.Config
var once sync.Once
var wg sync.WaitGroup
for _, endpoint := range endpoints {
clientSet, err := createClients(caCert, endpoint, kubeadmutil.BearerToken(c), nodeName)
ac, err := createClients(caCert, endpoint, kubeadmutil.BearerToken(c), nodeName)
if err != nil {
fmt.Printf("[bootstrap] Warning: %s. Skipping endpoint %s\n", err, endpoint)
continue
@ -76,7 +73,7 @@ func EstablishMasterConnection(c *kubeadmapi.TokenDiscovery, clusterInfo *kubead
defer wg.Done()
wait.Until(func() {
fmt.Printf("[bootstrap] Trying to connect to endpoint %s\n", apiEndpoint)
err := checkAPIEndpoint(clientSet, apiEndpoint)
err := checkAPIEndpoint(ac.clientSet, apiEndpoint)
if err != nil {
fmt.Printf("[bootstrap] Endpoint check failed [%v]\n", err)
return
@ -84,32 +81,23 @@ func EstablishMasterConnection(c *kubeadmapi.TokenDiscovery, clusterInfo *kubead
fmt.Printf("[bootstrap] Successfully established connection with endpoint %q\n", apiEndpoint)
// connection established, stop all wait threads
close(stopChan)
result <- &ConnectionDetails{
ClientSet: clientSet,
CertClient: clientSet.CertificatesV1alpha1Client,
Endpoint: apiEndpoint,
CACert: caCert,
NodeName: nodeName,
}
once.Do(func() {
clientConfig = ac.clientConfig
})
}, retryTimeout*time.Second, stopChan)
}(endpoint)
}
wg.Wait()
go func() {
wg.Wait()
// all wait.Until() calls have finished now
close(result)
}()
establishedConnection, ok := <-result
if !ok {
if clientConfig == nil {
return nil, fmt.Errorf("failed to create bootstrap clients for any of the provided API endpoints")
}
return establishedConnection, nil
return clientConfig, nil
}
// creates a set of clients for this endpoint
func createClients(caCert []byte, endpoint, token string, nodeName types.NodeName) (*clientset.Clientset, error) {
func createClients(caCert []byte, endpoint, token string, nodeName types.NodeName) (*apiClient, error) {
clientConfig := kubeconfigphase.MakeClientConfigWithToken(
endpoint,
"kubernetes",
@ -126,16 +114,21 @@ func createClients(caCert []byte, endpoint, token string, nodeName types.NodeNam
if err != nil {
return nil, fmt.Errorf("failed to create clients for the API endpoint %q: [%v]", endpoint, err)
}
return clientSet, nil
ac := &apiClient{
clientSet: clientSet,
clientConfig: clientConfig,
}
return ac, nil
}
// CheckForNodeNameDuplicates checks whether there are other nodes in the cluster with identical node names.
func CheckForNodeNameDuplicates(connection *ConnectionDetails) error {
// checkForNodeNameDuplicates checks whether there are other nodes in the cluster with identical node names.
func checkForNodeNameDuplicates(clientSet *clientset.Clientset) error {
hostName, err := os.Hostname()
if err != nil {
return fmt.Errorf("Failed to get node hostname [%v]", err)
}
nodeList, err := connection.ClientSet.Nodes().List(v1.ListOptions{})
nodeList, err := clientSet.Nodes().List(v1.ListOptions{})
if err != nil {
return fmt.Errorf("Failed to list the nodes in the cluster: [%v]\n", err)
}

View File

@ -24,7 +24,7 @@ import (
kubeadmapi "k8s.io/kubernetes/cmd/kubeadm/app/apis/kubeadm"
metav1 "k8s.io/kubernetes/pkg/apis/meta/v1"
clientset "k8s.io/kubernetes/pkg/client/clientset_generated/clientset"
"k8s.io/kubernetes/pkg/client/clientset_generated/clientset"
"k8s.io/kubernetes/pkg/client/restclient"
"k8s.io/kubernetes/pkg/client/typed/discovery"
"k8s.io/kubernetes/pkg/version"

View File

@ -20,7 +20,6 @@ import (
"fmt"
"os"
kubeconfigphase "k8s.io/kubernetes/cmd/kubeadm/app/phases/kubeconfig"
"k8s.io/kubernetes/pkg/client/clientset_generated/clientset"
"k8s.io/kubernetes/pkg/client/unversioned/clientcmd"
clientcmdapi "k8s.io/kubernetes/pkg/client/unversioned/clientcmd/api"
@ -29,37 +28,6 @@ import (
certutil "k8s.io/kubernetes/pkg/util/cert"
)
// TODO @mikedanese move this to PerformTLSBootstrap
func PerformTLSBootstrapDeprecated(connection *ConnectionDetails) (*clientcmdapi.Config, error) {
fmt.Println("[csr] Created API client to obtain unique certificate for this node, generating keys and certificate signing request")
key, err := certutil.MakeEllipticPrivateKeyPEM()
if err != nil {
return nil, fmt.Errorf("failed to generate private key [%v]", err)
}
cert, err := csr.RequestNodeCertificate(connection.CertClient.CertificateSigningRequests(), key, connection.NodeName)
if err != nil {
return nil, fmt.Errorf("failed to request signed certificate from the API server [%v]", err)
}
fmtCert, err := certutil.FormatBytesCert(cert)
if err != nil {
return nil, fmt.Errorf("failed to format certificate [%v]", err)
}
fmt.Printf("[csr] Received signed certificate from the API server:\n%s\n", fmtCert)
fmt.Println("[csr] Generating kubelet configuration")
newConfig := kubeconfigphase.MakeClientConfigWithCerts(
connection.Endpoint,
"kubernetes",
fmt.Sprintf("kubelet-%s", connection.NodeName),
connection.CACert,
key,
cert,
)
return newConfig, nil
}
// PerformTLSBootstrap executes a node certificate signing request.
func PerformTLSBootstrap(cfg *clientcmdapi.Config) error {
hostName, err := os.Hostname()

View File

@ -1,79 +0,0 @@
/*
Copyright 2016 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package node
import (
"encoding/json"
"net/http"
"net/http/httptest"
"testing"
certclient "k8s.io/kubernetes/pkg/client/clientset_generated/clientset/typed/certificates/v1alpha1"
restclient "k8s.io/kubernetes/pkg/client/restclient"
)
func TestPerformTLSBootstrap(t *testing.T) {
srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
switch req.URL.Path {
default:
output, err := json.Marshal(nil)
if err != nil {
t.Errorf("unexpected encoding error: %v", err)
return
}
w.Header().Set("Content-Type", "application/json")
w.WriteHeader(http.StatusOK)
w.Write(output)
}
}))
defer srv.Close()
tests := []struct {
h string
expect bool
}{
{
h: "",
expect: false,
},
{
h: "localhost",
expect: false,
},
{
h: srv.URL,
expect: false,
},
}
for _, rt := range tests {
cd := &ConnectionDetails{}
r := &restclient.Config{Host: rt.h}
tmpConfig, err := certclient.NewForConfig(r)
if err != nil {
t.Fatalf("encountered an error while trying to get New Cert Client: %v", err)
}
cd.CertClient = tmpConfig
_, actual := PerformTLSBootstrapDeprecated(cd)
if (actual == nil) != rt.expect {
t.Errorf(
"failed createClients:\n\texpected: %t\n\t actual: %t",
rt.expect,
(actual == nil),
)
}
}
}