Merge pull request #69812 from rosti/single-api-endpoint

kubeadm: Remove multiple API server endpoints support upon join
This commit is contained in:
k8s-ci-robot 2018-10-30 12:22:38 -07:00 committed by GitHub
commit 0a405f4a86
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
18 changed files with 108 additions and 132 deletions

View File

@ -323,10 +323,8 @@ type BootstrapTokenDiscovery struct {
// fetched from the master.
Token string
// APIServerEndpoints is a set of IPs or domain names to API servers from which info
// will be fetched. Currently we only pay attention to one API server but
// hope to support >1 in the future.
APIServerEndpoints []string
// APIServerEndpoint is an IP or domain name to the API server from which info will be fetched.
APIServerEndpoint string
// CACertHashes specifies a set of public key pins to verify
// when token-based discovery is used. The root CA found during discovery

View File

@ -40,10 +40,12 @@ func Convert_v1alpha3_JoinConfiguration_To_kubeadm_JoinConfiguration(in *JoinCon
}
} else {
out.Discovery.BootstrapToken = &kubeadm.BootstrapTokenDiscovery{
APIServerEndpoints: in.DiscoveryTokenAPIServers,
CACertHashes: in.DiscoveryTokenCACertHashes,
UnsafeSkipCAVerification: in.DiscoveryTokenUnsafeSkipCAVerification,
}
if len(in.DiscoveryTokenAPIServers) != 0 {
out.Discovery.BootstrapToken.APIServerEndpoint = in.DiscoveryTokenAPIServers[0]
}
if len(in.DiscoveryToken) != 0 {
out.Discovery.BootstrapToken.Token = in.DiscoveryToken
} else {
@ -64,7 +66,7 @@ func Convert_kubeadm_JoinConfiguration_To_v1alpha3_JoinConfiguration(in *kubeadm
if in.Discovery.BootstrapToken != nil {
out.DiscoveryToken = in.Discovery.BootstrapToken.Token
out.DiscoveryTokenAPIServers = in.Discovery.BootstrapToken.APIServerEndpoints
out.DiscoveryTokenAPIServers = []string{in.Discovery.BootstrapToken.APIServerEndpoint}
out.DiscoveryTokenCACertHashes = in.Discovery.BootstrapToken.CACertHashes
out.DiscoveryTokenUnsafeSkipCAVerification = in.Discovery.BootstrapToken.UnsafeSkipCAVerification

View File

@ -301,10 +301,8 @@ type BootstrapTokenDiscovery struct {
// fetched from the master.
Token string `json:"token"`
// APIServerEndpoints is a set of IPs or domain names to API servers from which info
// will be fetched. Currently we only pay attention to one API server but
// hope to support >1 in the future.
APIServerEndpoints []string `json:"apiServerEndpoints,omitempty"`
// APIServerEndpoint is an IP or domain name to the API server from which info will be fetched.
APIServerEndpoint string `json:"apiServerEndpoint,omitempty"`
// CACertHashes specifies a set of public key pins to verify
// when token-based discovery is used. The root CA found during discovery

View File

@ -288,7 +288,7 @@ func Convert_kubeadm_BootstrapToken_To_v1beta1_BootstrapToken(in *kubeadm.Bootst
func autoConvert_v1beta1_BootstrapTokenDiscovery_To_kubeadm_BootstrapTokenDiscovery(in *BootstrapTokenDiscovery, out *kubeadm.BootstrapTokenDiscovery, s conversion.Scope) error {
out.Token = in.Token
out.APIServerEndpoints = *(*[]string)(unsafe.Pointer(&in.APIServerEndpoints))
out.APIServerEndpoint = in.APIServerEndpoint
out.CACertHashes = *(*[]string)(unsafe.Pointer(&in.CACertHashes))
out.UnsafeSkipCAVerification = in.UnsafeSkipCAVerification
return nil
@ -301,7 +301,7 @@ func Convert_v1beta1_BootstrapTokenDiscovery_To_kubeadm_BootstrapTokenDiscovery(
func autoConvert_kubeadm_BootstrapTokenDiscovery_To_v1beta1_BootstrapTokenDiscovery(in *kubeadm.BootstrapTokenDiscovery, out *BootstrapTokenDiscovery, s conversion.Scope) error {
out.Token = in.Token
out.APIServerEndpoints = *(*[]string)(unsafe.Pointer(&in.APIServerEndpoints))
out.APIServerEndpoint = in.APIServerEndpoint
out.CACertHashes = *(*[]string)(unsafe.Pointer(&in.CACertHashes))
out.UnsafeSkipCAVerification = in.UnsafeSkipCAVerification
return nil

View File

@ -106,11 +106,6 @@ func (in *BootstrapToken) DeepCopy() *BootstrapToken {
// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
func (in *BootstrapTokenDiscovery) DeepCopyInto(out *BootstrapTokenDiscovery) {
*out = *in
if in.APIServerEndpoints != nil {
in, out := &in.APIServerEndpoints, &out.APIServerEndpoints
*out = make([]string, len(*in))
copy(*out, *in)
}
if in.CACertHashes != nil {
in, out := &in.CACertHashes, &out.CACertHashes
*out = make([]string, len(*in))

View File

@ -123,13 +123,8 @@ func ValidateDiscovery(d *kubeadm.Discovery, fldPath *field.Path) field.ErrorLis
func ValidateDiscoveryBootstrapToken(b *kubeadm.BootstrapTokenDiscovery, fldPath *field.Path) field.ErrorList {
allErrs := field.ErrorList{}
if len(b.APIServerEndpoints) < 1 {
allErrs = append(allErrs, field.Required(fldPath, "APIServerEndpoints not set"))
}
// TODO remove once we support multiple api servers
if len(b.APIServerEndpoints) > 1 {
fmt.Println("[validation] WARNING: kubeadm doesn't fully support multiple API Servers yet")
if len(b.APIServerEndpoint) == 0 {
allErrs = append(allErrs, field.Required(fldPath, "APIServerEndpoint is not set"))
}
if len(b.CACertHashes) == 0 && !b.UnsafeSkipCAVerification {
@ -137,7 +132,7 @@ func ValidateDiscoveryBootstrapToken(b *kubeadm.BootstrapTokenDiscovery, fldPath
}
allErrs = append(allErrs, ValidateToken(b.Token, fldPath.Child("token"))...)
allErrs = append(allErrs, ValidateDiscoveryTokenAPIServer(b.APIServerEndpoints, fldPath.Child("apiServerEndpoints"))...)
allErrs = append(allErrs, ValidateDiscoveryTokenAPIServer(b.APIServerEndpoint, fldPath.Child("apiServerEndpoints"))...)
return allErrs
}
@ -152,13 +147,11 @@ func ValidateDiscoveryFile(f *kubeadm.FileDiscovery, fldPath *field.Path) field.
}
// ValidateDiscoveryTokenAPIServer validates discovery token for API server
func ValidateDiscoveryTokenAPIServer(apiServers []string, fldPath *field.Path) field.ErrorList {
func ValidateDiscoveryTokenAPIServer(apiServer string, fldPath *field.Path) field.ErrorList {
allErrs := field.ErrorList{}
for _, m := range apiServers {
_, _, err := net.SplitHostPort(m)
if err != nil {
allErrs = append(allErrs, field.Invalid(fldPath, m, err.Error()))
}
_, _, err := net.SplitHostPort(apiServer)
if err != nil {
allErrs = append(allErrs, field.Invalid(fldPath, apiServer, err.Error()))
}
return allErrs
}

View File

@ -692,7 +692,7 @@ func TestValidateDiscoveryBootstrapToken(t *testing.T) {
expected bool
}{
{
"invalid: .APIServerEndpoints not set",
"invalid: .APIServerEndpoint not set",
&kubeadm.BootstrapTokenDiscovery{
Token: "abcdef.1234567890123456",
},
@ -702,25 +702,16 @@ func TestValidateDiscoveryBootstrapToken(t *testing.T) {
"invalid: using token-based discovery without .BootstrapToken.CACertHashes and .BootstrapToken.UnsafeSkipCAVerification",
&kubeadm.BootstrapTokenDiscovery{
Token: "abcdef.1234567890123456",
APIServerEndpoints: []string{"192.168.122.100:6443"},
APIServerEndpoint: "192.168.122.100:6443",
UnsafeSkipCAVerification: false,
},
false,
},
{
"WARNING: kubeadm doesn't fully support multiple API Servers yet",
&kubeadm.BootstrapTokenDiscovery{
Token: "abcdef.1234567890123456",
APIServerEndpoints: []string{"192.168.122.100:6443", "192.168.122.88:6443"},
UnsafeSkipCAVerification: true,
},
true,
},
{
"valid: using token-based discovery with .BootstrapToken.CACertHashes",
&kubeadm.BootstrapTokenDiscovery{
Token: "abcdef.1234567890123456",
APIServerEndpoints: []string{"192.168.122.100:6443"},
APIServerEndpoint: "192.168.122.100:6443",
CACertHashes: []string{"sha256:7173b809ca12ec5dee4506cd86be934c4596dd234ee82c0662eac04a8c2c71dc"},
UnsafeSkipCAVerification: false,
},
@ -730,7 +721,7 @@ func TestValidateDiscoveryBootstrapToken(t *testing.T) {
"valid: using token-based discovery with .BootstrapToken.CACertHashe but skip ca verification",
&kubeadm.BootstrapTokenDiscovery{
Token: "abcdef.1234567890123456",
APIServerEndpoints: []string{"192.168.122.100:6443"},
APIServerEndpoint: "192.168.122.100:6443",
CACertHashes: []string{"sha256:7173b809ca12ec5dee4506cd86be934c4596dd234ee82c0662eac04a8c2c71dc"},
UnsafeSkipCAVerification: true,
},
@ -753,20 +744,20 @@ func TestValidateDiscoveryBootstrapToken(t *testing.T) {
func TestValidateDiscoveryTokenAPIServer(t *testing.T) {
var tests = []struct {
apiServerEndpoints []string
expected bool
apiServerEndpoint string
expected bool
}{
{
[]string{"192.168.122.100"},
"192.168.122.100",
false,
},
{
[]string{"192.168.122.100:6443"},
"192.168.122.100:6443",
true,
},
}
for _, rt := range tests {
actual := ValidateDiscoveryTokenAPIServer(rt.apiServerEndpoints, nil)
actual := ValidateDiscoveryTokenAPIServer(rt.apiServerEndpoint, nil)
if (len(actual) == 0) != rt.expected {
t.Errorf(
"failed ValidateDiscoveryTokenAPIServer:\n\texpected: %t\n\t actual: %t",

View File

@ -108,11 +108,6 @@ func (in *BootstrapToken) DeepCopy() *BootstrapToken {
// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
func (in *BootstrapTokenDiscovery) DeepCopyInto(out *BootstrapTokenDiscovery) {
*out = *in
if in.APIServerEndpoints != nil {
in, out := &in.APIServerEndpoints, &out.APIServerEndpoints
*out = make([]string, len(*in))
copy(*out, *in)
}
if in.CACertHashes != nil {
in, out := &in.CACertHashes, &out.CACertHashes
*out = make([]string, len(*in))

View File

@ -277,7 +277,7 @@ func getDefaultNodeConfigBytes() ([]byte, error) {
Discovery: kubeadmapiv1beta1.Discovery{
BootstrapToken: &kubeadmapiv1beta1.BootstrapTokenDiscovery{
Token: placeholderToken.Token.String(),
APIServerEndpoints: []string{"kube-apiserver:6443"},
APIServerEndpoint: "kube-apiserver:6443",
UnsafeSkipCAVerification: true, // TODO: UnsafeSkipCAVerification: true needs to be set for validation to pass, but shouldn't be recommended as the default
},
},

View File

@ -177,10 +177,15 @@ func NewCmdJoin(out io.Writer) *cobra.Command {
cfg.Discovery.File = fd
} else {
cfg.Discovery.BootstrapToken = btd
cfg.Discovery.BootstrapToken.APIServerEndpoints = args
if len(cfg.Discovery.BootstrapToken.Token) == 0 {
cfg.Discovery.BootstrapToken.Token = token
}
if len(args) > 0 {
if len(cfgPath) == 0 && len(args) > 1 {
glog.Warningf("[join] WARNING: More than one API server endpoint supplied on command line %v. Using the first one.", args)
}
cfg.Discovery.BootstrapToken.APIServerEndpoint = args[0]
}
}
if len(cfg.Discovery.TLSBootstrapToken) == 0 {

View File

@ -43,8 +43,5 @@ go_test(
name = "go_default_test",
srcs = ["token_test.go"],
embed = [":go_default_library"],
deps = [
"//cmd/kubeadm/app/util/kubeconfig:go_default_library",
"//staging/src/k8s.io/client-go/tools/clientcmd/api:go_default_library",
],
deps = ["//staging/src/k8s.io/client-go/tools/clientcmd/api:go_default_library"],
)

View File

@ -60,7 +60,7 @@ func RetrieveValidatedConfigInfo(cfg *kubeadmapi.JoinConfiguration) (*clientcmda
// The function below runs for every endpoint, and all endpoints races with each other.
// The endpoint that wins the race and completes the task first gets its kubeconfig returned below
baseKubeConfig, err := runForEndpointsAndReturnFirst(cfg.Discovery.BootstrapToken.APIServerEndpoints, cfg.Discovery.Timeout.Duration, func(endpoint string) (*clientcmdapi.Config, error) {
baseKubeConfig, err := fetchKubeConfigWithTimeout(cfg.Discovery.BootstrapToken.APIServerEndpoint, cfg.Discovery.Timeout.Duration, func(endpoint string) (*clientcmdapi.Config, error) {
insecureBootstrapConfig := buildInsecureBootstrapKubeConfig(endpoint, cfg.ClusterName)
clusterName := insecureBootstrapConfig.Contexts[insecureBootstrapConfig.CurrentContext].Cluster
@ -184,36 +184,36 @@ func buildSecureBootstrapKubeConfig(endpoint string, caCert []byte, clustername
return bootstrapConfig
}
// runForEndpointsAndReturnFirst loops the endpoints slice and let's the endpoints race for connecting to the master
func runForEndpointsAndReturnFirst(endpoints []string, discoveryTimeout time.Duration, fetchKubeConfigFunc func(string) (*clientcmdapi.Config, error)) (*clientcmdapi.Config, error) {
// fetchKubeConfigWithTimeout tries to run fetchKubeConfigFunc on every DiscoveryRetryInterval, but until discoveryTimeout is reached
func fetchKubeConfigWithTimeout(apiEndpoint string, discoveryTimeout time.Duration, fetchKubeConfigFunc func(string) (*clientcmdapi.Config, error)) (*clientcmdapi.Config, error) {
stopChan := make(chan struct{})
var resultingKubeConfig *clientcmdapi.Config
var once sync.Once
var wg sync.WaitGroup
for _, endpoint := range endpoints {
wg.Add(1)
go func(apiEndpoint string) {
defer wg.Done()
wait.Until(func() {
fmt.Printf("[discovery] Trying to connect to API Server %q\n", apiEndpoint)
cfg, err := fetchKubeConfigFunc(apiEndpoint)
if err != nil {
fmt.Printf("[discovery] Failed to connect to API Server %q: %v\n", apiEndpoint, err)
return
}
fmt.Printf("[discovery] Successfully established connection with API Server %q\n", apiEndpoint)
// connection established, stop all wait threads
once.Do(func() {
close(stopChan)
resultingKubeConfig = cfg
})
}, constants.DiscoveryRetryInterval, stopChan)
}(endpoint)
}
wg.Add(1)
go func() {
defer wg.Done()
wait.Until(func() {
fmt.Printf("[discovery] Trying to connect to API Server %q\n", apiEndpoint)
cfg, err := fetchKubeConfigFunc(apiEndpoint)
if err != nil {
fmt.Printf("[discovery] Failed to connect to API Server %q: %v\n", apiEndpoint, err)
return
}
fmt.Printf("[discovery] Successfully established connection with API Server %q\n", apiEndpoint)
once.Do(func() {
resultingKubeConfig = cfg
close(stopChan)
})
}, constants.DiscoveryRetryInterval, stopChan)
}()
select {
case <-time.After(discoveryTimeout):
close(stopChan)
once.Do(func() {
close(stopChan)
})
err := errors.Errorf("abort connecting to API servers after timeout of %v", discoveryTimeout)
fmt.Printf("[discovery] %v\n", err)
wg.Wait()

View File

@ -17,12 +17,11 @@ limitations under the License.
package token
import (
"strconv"
"fmt"
"testing"
"time"
clientcmdapi "k8s.io/client-go/tools/clientcmd/api"
kubeconfigutil "k8s.io/kubernetes/cmd/kubeadm/app/util/kubeconfig"
)
// testCertPEM is a simple self-signed test certificate issued with the openssl CLI:
@ -49,41 +48,49 @@ c1vuFqTnJBPcb7W//R/GI2Paicm1cmns9NLnPR35exHxFTy+D1yxmGokpoPMdife
aH+sfuxT8xeTPb3kjzF9eJTlnEquUDLM
-----END CERTIFICATE-----`
func TestRunForEndpointsAndReturnFirst(t *testing.T) {
func TestFetchKubeConfigWithTimeout(t *testing.T) {
const testAPIEndpoint = "sample-endpoint:1234"
tests := []struct {
endpoints []string
expectedEndpoint string
name string
discoveryTimeout time.Duration
shouldFail bool
}{
{
endpoints: []string{"1", "2", "3"},
expectedEndpoint: "1",
name: "Timeout if value is not returned on time",
discoveryTimeout: 1 * time.Second,
shouldFail: true,
},
{
endpoints: []string{"6", "5"},
expectedEndpoint: "5",
},
{
endpoints: []string{"10", "4"},
expectedEndpoint: "4",
name: "Don't timeout if value is returned on time",
discoveryTimeout: 5 * time.Second,
shouldFail: false,
},
}
for _, rt := range tests {
returnKubeConfig, err := runForEndpointsAndReturnFirst(rt.endpoints, 5*time.Minute, func(endpoint string) (*clientcmdapi.Config, error) {
timeout, _ := strconv.Atoi(endpoint)
time.Sleep(time.Second * time.Duration(timeout))
return kubeconfigutil.CreateBasic(endpoint, "foo", "foo", []byte{}), nil
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
cfg, err := fetchKubeConfigWithTimeout(testAPIEndpoint, test.discoveryTimeout, func(apiEndpoint string) (*clientcmdapi.Config, error) {
if apiEndpoint != testAPIEndpoint {
return nil, fmt.Errorf("unexpected API server endpoint:\n\texpected: %q\n\tgot: %q", testAPIEndpoint, apiEndpoint)
}
time.Sleep(3 * time.Second)
return &clientcmdapi.Config{}, nil
})
if test.shouldFail {
if err == nil {
t.Fatal("unexpected success")
}
} else {
if err != nil {
t.Fatalf("unexpected failure: %v", err)
}
if cfg == nil {
t.Fatal("cfg is nil")
}
}
})
if err != nil {
t.Errorf("unexpected error: %v for endpoint %s", err, rt.expectedEndpoint)
}
endpoint := returnKubeConfig.Clusters[returnKubeConfig.Contexts[returnKubeConfig.CurrentContext].Cluster].Server
if endpoint != rt.expectedEndpoint {
t.Errorf(
"failed TestRunForEndpointsAndReturnFirst:\n\texpected: %s\n\t actual: %s",
endpoint,
rt.expectedEndpoint,
)
}
}
}

View File

@ -950,17 +950,15 @@ func RunJoinNodeChecks(execer utilsexec.Interface, cfg *kubeadmapi.JoinConfigura
addIPv6Checks := false
if cfg.Discovery.BootstrapToken != nil {
for _, server := range cfg.Discovery.BootstrapToken.APIServerEndpoints {
ipstr, _, err := net.SplitHostPort(server)
if err == nil {
checks = append(checks,
HTTPProxyCheck{Proto: "https", Host: ipstr},
)
if !addIPv6Checks {
if ip := net.ParseIP(ipstr); ip != nil {
if ip.To4() == nil && ip.To16() != nil {
addIPv6Checks = true
}
ipstr, _, err := net.SplitHostPort(cfg.Discovery.BootstrapToken.APIServerEndpoint)
if err == nil {
checks = append(checks,
HTTPProxyCheck{Proto: "https", Host: ipstr},
)
if !addIPv6Checks {
if ip := net.ParseIP(ipstr); ip != nil {
if ip.To4() == nil && ip.To16() != nil {
addIPv6Checks = true
}
}
}

View File

@ -256,7 +256,7 @@ func TestRunJoinNodeChecks(t *testing.T) {
cfg: &kubeadmapi.JoinConfiguration{
Discovery: kubeadmapi.Discovery{
BootstrapToken: &kubeadmapi.BootstrapTokenDiscovery{
APIServerEndpoints: []string{"192.168.1.15"},
APIServerEndpoint: "192.168.1.15",
},
},
},
@ -266,7 +266,7 @@ func TestRunJoinNodeChecks(t *testing.T) {
cfg: &kubeadmapi.JoinConfiguration{
Discovery: kubeadmapi.Discovery{
BootstrapToken: &kubeadmapi.BootstrapTokenDiscovery{
APIServerEndpoints: []string{"2001:1234::1:15"},
APIServerEndpoint: "2001:1234::1:15",
},
},
},

View File

@ -6,8 +6,7 @@ ClusterName: kubernetes
ControlPlane: false
Discovery:
BootstrapToken:
APIServerEndpoints:
- kube-apiserver:6443
APIServerEndpoint: kube-apiserver:6443
CACertHashes: null
Token: abcdef.0123456789abcdef
UnsafeSkipCAVerification: true

View File

@ -6,8 +6,7 @@ caCertPath: /etc/kubernetes/pki/ca.crt
clusterName: kubernetes
discovery:
bootstrapToken:
apiServerEndpoints:
- kube-apiserver:6443
apiServerEndpoint: kube-apiserver:6443
token: abcdef.0123456789abcdef
unsafeSkipCAVerification: true
timeout: 5m0s

View File

@ -6,8 +6,7 @@ caCertPath: /etc/kubernetes/pki/ca.crt
clusterName: kubernetes
discovery:
bootstrapToken:
apiServerEndpoints:
- kube-apiserver:6443
apiServerEndpoint: kube-apiserver:6443
token: abcdef.0123456789abcdef
unsafeSkipCAVerification: true
timeout: 5m0s