diff --git a/cmd/kubeadm/app/cmd/join.go b/cmd/kubeadm/app/cmd/join.go index d9f46766d6d..3f179bfaf18 100644 --- a/cmd/kubeadm/app/cmd/join.go +++ b/cmd/kubeadm/app/cmd/join.go @@ -30,7 +30,6 @@ import ( "github.com/renstrom/dedent" "github.com/spf13/cobra" flag "github.com/spf13/pflag" - "k8s.io/apimachinery/pkg/util/sets" "k8s.io/apimachinery/pkg/util/wait" clientcmdapi "k8s.io/client-go/tools/clientcmd/api" @@ -44,6 +43,7 @@ import ( "k8s.io/kubernetes/cmd/kubeadm/app/features" certsphase "k8s.io/kubernetes/cmd/kubeadm/app/phases/certs" controlplanephase "k8s.io/kubernetes/cmd/kubeadm/app/phases/controlplane" + etcdphase "k8s.io/kubernetes/cmd/kubeadm/app/phases/etcd" kubeconfigphase "k8s.io/kubernetes/cmd/kubeadm/app/phases/kubeconfig" kubeletphase "k8s.io/kubernetes/cmd/kubeadm/app/phases/kubelet" markmasterphase "k8s.io/kubernetes/cmd/kubeadm/app/phases/markmaster" @@ -74,7 +74,6 @@ var ( Please ensure that: * The cluster has a stable controlPlaneEndpoint address. - * The cluster uses an external etcd. * The certificates that must be shared among control plane instances are provided. `))) @@ -86,6 +85,7 @@ var ( * The Kubelet was informed of the new secure connection details. * Master label and taint were applied to the new node. * The kubernetes control plane instances scaled up. + {{.etcdMessage}} To start administering your cluster from this node, you need to run the following as a regular user: @@ -383,8 +383,15 @@ func (j *Join) Run(out io.Writer) error { } // outputs the join control plane done template and exits + etcdMessage := "" + // in case of local etcd + if initConfiguration.Etcd.External == nil { + etcdMessage = "* A new etcd member was added to the local/stacked etcd cluster." + } + ctx := map[string]string{ "KubeConfigPath": kubeadmconstants.GetAdminKubeConfigPath(), + "etcdMessage": etcdMessage, } joinControPlaneDoneTemp.Execute(out, ctx) return nil @@ -421,11 +428,6 @@ func (j *Join) CheckIfReadyForAdditionalControlPlane(initConfiguration *kubeadma return errors.New("unable to add a new control plane instance a cluster that doesn't have a stable controlPlaneEndpoint address") } - // blocks if the cluster was created without an external etcd cluster - if initConfiguration.Etcd.External == nil { - return errors.New("unable to add a new control plane instance on a cluster that doesn't use an external etcd") - } - // blocks if control plane is self-hosted if features.Enabled(initConfiguration.FeatureGates, features.SelfHosting) { return errors.New("self-hosted clusters are deprecated and won't be supported by `kubeadm join --experimental-control-plane`") @@ -465,6 +467,23 @@ func (j *Join) PrepareForHostingControlPlane(initConfiguration *kubeadmapi.InitC return errors.Wrap(err, "error creating static pod manifest files for the control plane components") } + // in case of local etcd + if initConfiguration.Etcd.External == nil { + // Checks that the etcd cluster is healthy + // NB. this check cannot be implemented before because it requires the admin.conf and all the certificates + // for connecting to etcd already in place + kubeConfigFile := filepath.Join(kubeadmconstants.KubernetesDir, kubeadmconstants.AdminKubeConfigFileName) + + client, err := kubeconfigutil.ClientSetFromFile(kubeConfigFile) + if err != nil { + return errors.Wrap(err, "couldn't create kubernetes client") + } + + if err := etcdphase.CheckLocalEtcdClusterStatus(client, initConfiguration); err != nil { + return err + } + } + return nil } @@ -559,6 +578,23 @@ func (j *Join) PostInstallControlPlane(initConfiguration *kubeadmapi.InitConfigu return errors.Wrap(err, "couldn't create kubernetes client") } + // in case of local etcd + if initConfiguration.Etcd.External == nil { + // Adds a new etcd instance; in order to do this the new etcd instance should be "announced" to + // the existing etcd members before being created. + // This operation must be executed after kubelet is already started in order to minimize the time + // between the new etcd member is announced and the start of the static pod running the new etcd member, because during + // this time frame etcd gets temporary not available (only when moving from 1 to 2 members in the etcd cluster). + // From https://coreos.com/etcd/docs/latest/v2/runtime-configuration.html + // "If you add a new member to a 1-node cluster, the cluster cannot make progress before the new member starts + // because it needs two members as majority to agree on the consensus. You will only see this behavior between the time + // etcdctl member add informs the cluster about the new member and the new member successfully establishing a connection to the existing one." + glog.V(1).Info("[join] adding etcd") + if err := etcdphase.CreateStackedEtcdStaticPodManifestFile(client, kubeadmconstants.GetStaticPodDirectory(), initConfiguration); err != nil { + return errors.Wrap(err, "error creating local etcd static pod manifest file") + } + } + glog.V(1).Info("[join] uploading currently used configuration to the cluster") if err := uploadconfigphase.UploadConfiguration(initConfiguration, client); err != nil { return errors.Wrap(err, "error uploading configuration: %v") diff --git a/cmd/kubeadm/app/cmd/upgrade/apply.go b/cmd/kubeadm/app/cmd/upgrade/apply.go index acffb90f441..7f38160ea13 100644 --- a/cmd/kubeadm/app/cmd/upgrade/apply.go +++ b/cmd/kubeadm/app/cmd/upgrade/apply.go @@ -23,7 +23,6 @@ import ( "github.com/golang/glog" "github.com/spf13/cobra" - "k8s.io/apimachinery/pkg/util/version" clientset "k8s.io/client-go/kubernetes" kubeadmapi "k8s.io/kubernetes/cmd/kubeadm/app/apis/kubeadm" @@ -303,7 +302,7 @@ func PerformStaticPodUpgrade(client clientset.Interface, waiter apiclient.Waiter } // The arguments oldEtcdClient and newEtdClient, are uninitialized because passing in the clients allow for mocking the client during testing - return upgrade.StaticPodControlPlane(waiter, pathManager, internalcfg, etcdUpgrade, nil, nil) + return upgrade.StaticPodControlPlane(client, waiter, pathManager, internalcfg, etcdUpgrade, nil, nil) } // DryRunStaticPodUpgrade fakes an upgrade of the control plane diff --git a/cmd/kubeadm/app/cmd/upgrade/node.go b/cmd/kubeadm/app/cmd/upgrade/node.go index 346e83b70f0..dcd0a3575e0 100644 --- a/cmd/kubeadm/app/cmd/upgrade/node.go +++ b/cmd/kubeadm/app/cmd/upgrade/node.go @@ -24,7 +24,6 @@ import ( "github.com/golang/glog" "github.com/spf13/cobra" - "k8s.io/apimachinery/pkg/util/version" "k8s.io/kubernetes/cmd/kubeadm/app/cmd/options" cmdutil "k8s.io/kubernetes/cmd/kubeadm/app/cmd/util" @@ -66,6 +65,7 @@ type controlplaneUpgradeFlags struct { kubeConfigPath string advertiseAddress string nodeName string + etcdUpgrade bool dryRun bool } @@ -113,6 +113,7 @@ func NewCmdUpgradeControlPlane() *cobra.Command { flags := &controlplaneUpgradeFlags{ kubeConfigPath: constants.GetKubeletKubeConfigPath(), advertiseAddress: "", + etcdUpgrade: true, dryRun: false, } @@ -149,6 +150,7 @@ func NewCmdUpgradeControlPlane() *cobra.Command { options.AddKubeConfigFlag(cmd.Flags(), &flags.kubeConfigPath) cmd.Flags().BoolVar(&flags.dryRun, "dry-run", flags.dryRun, "Do not change any state, just output the actions that would be performed.") + cmd.Flags().BoolVar(&flags.etcdUpgrade, "etcd-upgrade", flags.etcdUpgrade, "Perform the upgrade of etcd.") return cmd } @@ -236,13 +238,13 @@ func RunUpgradeControlPlane(flags *controlplaneUpgradeFlags) error { return fmt.Errorf("Unable to rotate API server certificate: %v", err) } - // Upgrade the control plane + // Upgrade the control plane and etcd if installed on this node fmt.Printf("[upgrade] Upgrading your Static Pod-hosted control plane instance to version %q...\n", cfg.KubernetesVersion) if flags.dryRun { return DryRunStaticPodUpgrade(cfg) } - if err := PerformStaticPodUpgrade(client, waiter, cfg, false); err != nil { + if err := PerformStaticPodUpgrade(client, waiter, cfg, flags.etcdUpgrade); err != nil { return fmt.Errorf("Couldn't complete the static pod upgrade: %v", err) } diff --git a/cmd/kubeadm/app/cmd/upgrade/plan.go b/cmd/kubeadm/app/cmd/upgrade/plan.go index a1478b4bfbf..ad03b9871fc 100644 --- a/cmd/kubeadm/app/cmd/upgrade/plan.go +++ b/cmd/kubeadm/app/cmd/upgrade/plan.go @@ -26,7 +26,6 @@ import ( "github.com/golang/glog" "github.com/spf13/cobra" - "k8s.io/apimachinery/pkg/util/version" kubeadmapiv1beta1 "k8s.io/kubernetes/cmd/kubeadm/app/apis/kubeadm/v1beta1" "k8s.io/kubernetes/cmd/kubeadm/app/apis/kubeadm/validation" @@ -111,11 +110,8 @@ func RunPlan(flags *planFlags) error { } etcdClient = client } else { - client, err := etcdutil.NewFromStaticPod( - []string{fmt.Sprintf("localhost:%d", constants.EtcdListenClientPort)}, - constants.GetStaticPodDirectory(), - upgradeVars.cfg.CertificatesDir, - ) + // Connects to local/stacked etcd existing in the cluster + client, err := etcdutil.NewFromCluster(upgradeVars.client, upgradeVars.cfg.CertificatesDir) if err != nil { return err } diff --git a/cmd/kubeadm/app/phases/certs/certs.go b/cmd/kubeadm/app/phases/certs/certs.go index 4f71ca271c4..5447a5f74b5 100644 --- a/cmd/kubeadm/app/phases/certs/certs.go +++ b/cmd/kubeadm/app/phases/certs/certs.go @@ -24,7 +24,6 @@ import ( "path/filepath" "github.com/golang/glog" - certutil "k8s.io/client-go/util/cert" kubeadmapi "k8s.io/kubernetes/cmd/kubeadm/app/apis/kubeadm" kubeadmconstants "k8s.io/kubernetes/cmd/kubeadm/app/constants" @@ -285,7 +284,7 @@ type certKeyLocation struct { } // SharedCertificateExists verifies if the shared certificates - the certificates that must be -// equal across masters: ca.key, ca.crt, sa.key, sa.pub +// equal across masters: ca.key, ca.crt, sa.key, sa.pub + etcd/ca.key, etcd/ca.crt if local/stacked etcd func SharedCertificateExists(cfg *kubeadmapi.InitConfiguration) (bool, error) { if err := validateCACertAndKey(certKeyLocation{cfg.CertificatesDir, kubeadmconstants.CACertAndKeyBaseName, "", "CA"}); err != nil { @@ -300,6 +299,13 @@ func SharedCertificateExists(cfg *kubeadmapi.InitConfiguration) (bool, error) { return false, err } + // in case of local/stacked etcd + if cfg.Etcd.External == nil { + if err := validateCACertAndKey(certKeyLocation{cfg.CertificatesDir, kubeadmconstants.EtcdCACertAndKeyBaseName, "", "etcd CA"}); err != nil { + return false, err + } + } + return true, nil } diff --git a/cmd/kubeadm/app/phases/certs/certs_test.go b/cmd/kubeadm/app/phases/certs/certs_test.go index cd936b01a11..7998e93911f 100644 --- a/cmd/kubeadm/app/phases/certs/certs_test.go +++ b/cmd/kubeadm/app/phases/certs/certs_test.go @@ -308,6 +308,8 @@ func TestSharedCertificateExists(t *testing.T) { "front-proxy-ca.key": caKey, "sa.pub": publicKey, "sa.key": key, + "etcd/ca.crt": caCert, + "etcd/ca.key": caKey, }, }, { @@ -318,6 +320,8 @@ func TestSharedCertificateExists(t *testing.T) { "front-proxy-ca.key": caKey, "sa.pub": publicKey, "sa.key": key, + "etcd/ca.crt": caCert, + "etcd/ca.key": caKey, }, expectedError: true, }, @@ -329,17 +333,34 @@ func TestSharedCertificateExists(t *testing.T) { "front-proxy-ca.crt": caCert, "front-proxy-ca.key": caKey, "sa.pub": publicKey, + "etcd/ca.crt": caCert, + "etcd/ca.key": caKey, }, expectedError: true, }, { - name: "expected front-proxy.crt missing", + name: "missing front-proxy.crt", files: pkiFiles{ "ca.crt": caCert, "ca.key": caKey, "front-proxy-ca.key": caKey, "sa.pub": publicKey, "sa.key": key, + "etcd/ca.crt": caCert, + "etcd/ca.key": caKey, + }, + expectedError: true, + }, + { + name: "missing etcd/ca.crt", + files: pkiFiles{ + "ca.crt": caCert, + "ca.key": caKey, + "front-proxy-ca.key": caKey, + "sa.pub": publicKey, + "sa.key": key, + "etcd/ca.crt": caCert, + "etcd/ca.key": caKey, }, expectedError: true, }, @@ -348,6 +369,7 @@ func TestSharedCertificateExists(t *testing.T) { for _, test := range tests { t.Run("", func(t *testing.T) { tmpdir := testutil.SetupTempDir(t) + os.MkdirAll(tmpdir+"/etcd", os.ModePerm) defer os.RemoveAll(tmpdir) cfg := &kubeadmapi.InitConfiguration{ diff --git a/cmd/kubeadm/app/phases/certs/pkiutil/BUILD b/cmd/kubeadm/app/phases/certs/pkiutil/BUILD index e8c69f55dfd..5da8da56262 100644 --- a/cmd/kubeadm/app/phases/certs/pkiutil/BUILD +++ b/cmd/kubeadm/app/phases/certs/pkiutil/BUILD @@ -27,6 +27,7 @@ go_library( "//pkg/registry/core/service/ipallocator:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/util/validation:go_default_library", "//staging/src/k8s.io/client-go/util/cert:go_default_library", + "//vendor/github.com/pkg/errors:go_default_library", ], ) diff --git a/cmd/kubeadm/app/phases/certs/pkiutil/pki_helpers.go b/cmd/kubeadm/app/phases/certs/pkiutil/pki_helpers.go index 25aac116598..fbc6de5e04d 100644 --- a/cmd/kubeadm/app/phases/certs/pkiutil/pki_helpers.go +++ b/cmd/kubeadm/app/phases/certs/pkiutil/pki_helpers.go @@ -25,6 +25,7 @@ import ( "path/filepath" "time" + "github.com/pkg/errors" "k8s.io/apimachinery/pkg/util/validation" certutil "k8s.io/client-go/util/cert" kubeadmapi "k8s.io/kubernetes/cmd/kubeadm/app/apis/kubeadm" @@ -304,14 +305,19 @@ func GetAPIServerAltNames(cfg *kubeadmapi.InitConfiguration) (*certutil.AltNames } // GetEtcdAltNames builds an AltNames object for generating the etcd server certificate. -// `localhost` is included in the SAN since this is the interface the etcd static pod listens on. -// Hostname and `API.AdvertiseAddress` are excluded since etcd does not listen on this interface by default. +// `advertise address` and localhost are included in the SAN since this is the interfaces the etcd static pod listens on. // The user can override the listen address with `Etcd.ExtraArgs` and add SANs with `Etcd.ServerCertSANs`. func GetEtcdAltNames(cfg *kubeadmapi.InitConfiguration) (*certutil.AltNames, error) { + // advertise address + advertiseAddress := net.ParseIP(cfg.APIEndpoint.AdvertiseAddress) + if advertiseAddress == nil { + return nil, errors.Errorf("error parsing APIEndpoint AdvertiseAddress %q: is not a valid textual representation of an IP address", cfg.APIEndpoint.AdvertiseAddress) + } + // create AltNames with defaults DNSNames/IPs altNames := &certutil.AltNames{ DNSNames: []string{cfg.NodeRegistration.Name, "localhost"}, - IPs: []net.IP{net.IPv4(127, 0, 0, 1), net.IPv6loopback}, + IPs: []net.IP{advertiseAddress, net.IPv4(127, 0, 0, 1), net.IPv6loopback}, } if cfg.Etcd.Local != nil { @@ -322,8 +328,7 @@ func GetEtcdAltNames(cfg *kubeadmapi.InitConfiguration) (*certutil.AltNames, err } // GetEtcdPeerAltNames builds an AltNames object for generating the etcd peer certificate. -// `localhost` is excluded from the SAN since etcd will not refer to itself as a peer. -// Hostname and `API.AdvertiseAddress` are included if the user chooses to promote the single node etcd cluster into a multi-node one. +// Hostname and `API.AdvertiseAddress` are included if the user chooses to promote the single node etcd cluster into a multi-node one (stacked etcd). // The user can override the listen address with `Etcd.ExtraArgs` and add SANs with `Etcd.PeerCertSANs`. func GetEtcdPeerAltNames(cfg *kubeadmapi.InitConfiguration) (*certutil.AltNames, error) { // advertise address diff --git a/cmd/kubeadm/app/phases/certs/pkiutil/pki_helpers_test.go b/cmd/kubeadm/app/phases/certs/pkiutil/pki_helpers_test.go index 3fc3eee091b..7af6ff1caca 100644 --- a/cmd/kubeadm/app/phases/certs/pkiutil/pki_helpers_test.go +++ b/cmd/kubeadm/app/phases/certs/pkiutil/pki_helpers_test.go @@ -513,6 +513,12 @@ func TestGetEtcdAltNames(t *testing.T) { proxy := "user-etcd-proxy" proxyIP := "10.10.10.100" cfg := &kubeadmapi.InitConfiguration{ + APIEndpoint: kubeadmapi.APIEndpoint{ + AdvertiseAddress: "1.2.3.4", + }, + NodeRegistration: kubeadmapi.NodeRegistrationOptions{ + Name: "myNode", + }, ClusterConfiguration: kubeadmapi.ClusterConfiguration{ Etcd: kubeadmapi.Etcd{ Local: &kubeadmapi.LocalEtcd{ @@ -532,7 +538,7 @@ func TestGetEtcdAltNames(t *testing.T) { t.Fatalf("failed calling GetEtcdAltNames: %v", err) } - expectedDNSNames := []string{"localhost", proxy} + expectedDNSNames := []string{"myNode", "localhost", proxy} for _, DNSName := range expectedDNSNames { found := false for _, val := range altNames.DNSNames { @@ -547,7 +553,7 @@ func TestGetEtcdAltNames(t *testing.T) { } } - expectedIPAddresses := []string{"127.0.0.1", net.IPv6loopback.String(), proxyIP} + expectedIPAddresses := []string{"1.2.3.4", "127.0.0.1", net.IPv6loopback.String(), proxyIP} for _, IPAddress := range expectedIPAddresses { found := false for _, val := range altNames.IPs { diff --git a/cmd/kubeadm/app/phases/etcd/BUILD b/cmd/kubeadm/app/phases/etcd/BUILD index 339022fcfc1..d3268aa6e1a 100644 --- a/cmd/kubeadm/app/phases/etcd/BUILD +++ b/cmd/kubeadm/app/phases/etcd/BUILD @@ -13,6 +13,7 @@ go_test( deps = [ "//cmd/kubeadm/app/apis/kubeadm:go_default_library", "//cmd/kubeadm/app/constants:go_default_library", + "//cmd/kubeadm/app/util/etcd:go_default_library", "//cmd/kubeadm/test:go_default_library", ], ) @@ -26,9 +27,12 @@ go_library( "//cmd/kubeadm/app/constants:go_default_library", "//cmd/kubeadm/app/images:go_default_library", "//cmd/kubeadm/app/util:go_default_library", + "//cmd/kubeadm/app/util/etcd:go_default_library", "//cmd/kubeadm/app/util/staticpod:go_default_library", "//staging/src/k8s.io/api/core/v1:go_default_library", + "//staging/src/k8s.io/client-go/kubernetes:go_default_library", "//vendor/github.com/golang/glog:go_default_library", + "//vendor/github.com/pkg/errors:go_default_library", ], ) diff --git a/cmd/kubeadm/app/phases/etcd/local.go b/cmd/kubeadm/app/phases/etcd/local.go index 66333bd8423..ff82bf59248 100644 --- a/cmd/kubeadm/app/phases/etcd/local.go +++ b/cmd/kubeadm/app/phases/etcd/local.go @@ -19,14 +19,17 @@ package etcd import ( "fmt" "path/filepath" + "strings" "github.com/golang/glog" - + "github.com/pkg/errors" "k8s.io/api/core/v1" + clientset "k8s.io/client-go/kubernetes" kubeadmapi "k8s.io/kubernetes/cmd/kubeadm/app/apis/kubeadm" kubeadmconstants "k8s.io/kubernetes/cmd/kubeadm/app/constants" "k8s.io/kubernetes/cmd/kubeadm/app/images" kubeadmutil "k8s.io/kubernetes/cmd/kubeadm/app/util" + etcdutil "k8s.io/kubernetes/cmd/kubeadm/app/util/etcd" staticpodutil "k8s.io/kubernetes/cmd/kubeadm/app/util/staticpod" ) @@ -36,13 +39,70 @@ const ( ) // CreateLocalEtcdStaticPodManifestFile will write local etcd static pod manifest file. +// This function is used by init - when the etcd cluster is empty - or by kubeadm +// upgrade - when the etcd cluster is already up and running (and the --initial-cluster flag have no impact) func CreateLocalEtcdStaticPodManifestFile(manifestDir string, cfg *kubeadmapi.InitConfiguration) error { if cfg.ClusterConfiguration.Etcd.External != nil { return fmt.Errorf("etcd static pod manifest cannot be generated for cluster using external etcd") } glog.V(1).Infoln("creating local etcd static pod manifest file") - // gets etcd StaticPodSpec, actualized for the current InitConfiguration - spec := GetEtcdPodSpec(cfg) + // gets etcd StaticPodSpec + emptyInitialCluster := []etcdutil.Member{} + spec := GetEtcdPodSpec(cfg, emptyInitialCluster) + // writes etcd StaticPod to disk + if err := staticpodutil.WriteStaticPodToDisk(kubeadmconstants.Etcd, manifestDir, spec); err != nil { + return err + } + + fmt.Printf("[etcd] Wrote Static Pod manifest for a local etcd instance to %q\n", kubeadmconstants.GetStaticPodFilepath(kubeadmconstants.Etcd, manifestDir)) + return nil +} + +// CheckLocalEtcdClusterStatus verifies health state of local/stacked etcd cluster before installing a new etcd member +func CheckLocalEtcdClusterStatus(client clientset.Interface, cfg *kubeadmapi.InitConfiguration) error { + fmt.Println("[etcd] Checking Etcd cluster health") + + // creates an etcd client that connects to all the local/stacked etcd members + glog.V(1).Info("creating etcd client that connects to etcd pods") + etcdClient, err := etcdutil.NewFromCluster(client, cfg.CertificatesDir) + if err != nil { + return err + } + + // Checking health state + _, err = etcdClient.GetClusterStatus() + if err != nil { + return errors.Wrap(err, "etcd cluster is not healthy") + } + + return nil +} + +// CreateStackedEtcdStaticPodManifestFile will write local etcd static pod manifest file +// for an additional etcd member that is joining an existing local/stacked etcd cluster. +// Other members of the etcd cluster will be notified of the joining node in beforehand as well. +func CreateStackedEtcdStaticPodManifestFile(client clientset.Interface, manifestDir string, cfg *kubeadmapi.InitConfiguration) error { + // creates an etcd client that connects to all the local/stacked etcd members + glog.V(1).Info("creating etcd client that connects to etcd pods") + etcdClient, err := etcdutil.NewFromCluster(client, cfg.CertificatesDir) + if err != nil { + return err + } + + // notifies the other members of the etcd cluster about the joining member + etcdPeerAddress := fmt.Sprintf("https://%s:%d", cfg.APIEndpoint.AdvertiseAddress, kubeadmconstants.EtcdListenPeerPort) + + glog.V(1).Infof("Adding etcd member: %s", etcdPeerAddress) + initialCluster, err := etcdClient.AddMember(cfg.NodeRegistration.Name, etcdPeerAddress) + if err != nil { + return err + } + fmt.Println("[etcd] Announced new etcd member joining to the existing etcd cluster") + glog.V(1).Infof("Updated etcd member list: %v", initialCluster) + + glog.V(1).Info("Creating local etcd static pod manifest file") + // gets etcd StaticPodSpec, actualized for the current InitConfiguration and the new list of etcd members + spec := GetEtcdPodSpec(cfg, initialCluster) // writes etcd StaticPod to disk if err := staticpodutil.WriteStaticPodToDisk(kubeadmconstants.Etcd, manifestDir, spec); err != nil { return err @@ -54,7 +114,7 @@ func CreateLocalEtcdStaticPodManifestFile(manifestDir string, cfg *kubeadmapi.In // GetEtcdPodSpec returns the etcd static Pod actualized to the context of the current InitConfiguration // NB. GetEtcdPodSpec methods holds the information about how kubeadm creates etcd static pod manifests. -func GetEtcdPodSpec(cfg *kubeadmapi.InitConfiguration) v1.Pod { +func GetEtcdPodSpec(cfg *kubeadmapi.InitConfiguration, initialCluster []etcdutil.Member) v1.Pod { pathType := v1.HostPathDirectoryOrCreate etcdMounts := map[string]v1.Volume{ etcdVolumeName: staticpodutil.NewVolume(etcdVolumeName, cfg.Etcd.Local.DataDir, &pathType), @@ -62,7 +122,7 @@ func GetEtcdPodSpec(cfg *kubeadmapi.InitConfiguration) v1.Pod { } return staticpodutil.ComponentPod(v1.Container{ Name: kubeadmconstants.Etcd, - Command: getEtcdCommand(cfg), + Command: getEtcdCommand(cfg, initialCluster), Image: images.GetEtcdImage(&cfg.ClusterConfiguration), ImagePullPolicy: v1.PullIfNotPresent, // Mount the etcd datadir path read-write so etcd can store data in a more persistent manner @@ -78,13 +138,13 @@ func GetEtcdPodSpec(cfg *kubeadmapi.InitConfiguration) v1.Pod { } // getEtcdCommand builds the right etcd command from the given config object -func getEtcdCommand(cfg *kubeadmapi.InitConfiguration) []string { +func getEtcdCommand(cfg *kubeadmapi.InitConfiguration, initialCluster []etcdutil.Member) []string { defaultArguments := map[string]string{ "name": cfg.GetNodeName(), - "listen-client-urls": fmt.Sprintf("https://127.0.0.1:%d", kubeadmconstants.EtcdListenClientPort), - "advertise-client-urls": fmt.Sprintf("https://127.0.0.1:%d", kubeadmconstants.EtcdListenClientPort), - "listen-peer-urls": fmt.Sprintf("https://127.0.0.1:%d", kubeadmconstants.EtcdListenPeerPort), - "initial-advertise-peer-urls": fmt.Sprintf("https://127.0.0.1:%d", kubeadmconstants.EtcdListenPeerPort), + "listen-client-urls": fmt.Sprintf("https://127.0.0.1:%d,https://%s:%d", kubeadmconstants.EtcdListenClientPort, cfg.APIEndpoint.AdvertiseAddress, kubeadmconstants.EtcdListenClientPort), + "advertise-client-urls": fmt.Sprintf("https://%s:%d", cfg.APIEndpoint.AdvertiseAddress, kubeadmconstants.EtcdListenClientPort), + "listen-peer-urls": fmt.Sprintf("https://%s:%d", cfg.APIEndpoint.AdvertiseAddress, kubeadmconstants.EtcdListenPeerPort), + "initial-advertise-peer-urls": fmt.Sprintf("https://%s:%d", cfg.APIEndpoint.AdvertiseAddress, kubeadmconstants.EtcdListenPeerPort), "data-dir": cfg.Etcd.Local.DataDir, "cert-file": filepath.Join(cfg.CertificatesDir, kubeadmconstants.EtcdServerCertName), "key-file": filepath.Join(cfg.CertificatesDir, kubeadmconstants.EtcdServerKeyName), @@ -95,7 +155,19 @@ func getEtcdCommand(cfg *kubeadmapi.InitConfiguration) []string { "peer-trusted-ca-file": filepath.Join(cfg.CertificatesDir, kubeadmconstants.EtcdCACertName), "peer-client-cert-auth": "true", "snapshot-count": "10000", - "initial-cluster": fmt.Sprintf("%s=https://127.0.0.1:%d", cfg.GetNodeName(), kubeadmconstants.EtcdListenPeerPort), + } + + if len(initialCluster) == 0 { + defaultArguments["initial-cluster"] = fmt.Sprintf("%s=https://%s:%d", cfg.GetNodeName(), cfg.APIEndpoint.AdvertiseAddress, kubeadmconstants.EtcdListenPeerPort) + } else { + // NB. the joining etcd instance should be part of the initialCluster list + endpoints := []string{} + for _, member := range initialCluster { + endpoints = append(endpoints, fmt.Sprintf("%s=%s", member.Name, member.PeerURL)) + } + + defaultArguments["initial-cluster"] = strings.Join(endpoints, ",") + defaultArguments["initial-cluster-state"] = "existing" } command := []string{"etcd"} diff --git a/cmd/kubeadm/app/phases/etcd/local_test.go b/cmd/kubeadm/app/phases/etcd/local_test.go index 5ee55d25616..53c791a2706 100644 --- a/cmd/kubeadm/app/phases/etcd/local_test.go +++ b/cmd/kubeadm/app/phases/etcd/local_test.go @@ -26,12 +26,11 @@ import ( kubeadmapi "k8s.io/kubernetes/cmd/kubeadm/app/apis/kubeadm" kubeadmconstants "k8s.io/kubernetes/cmd/kubeadm/app/constants" - + etcdutil "k8s.io/kubernetes/cmd/kubeadm/app/util/etcd" testutil "k8s.io/kubernetes/cmd/kubeadm/test" ) func TestGetEtcdPodSpec(t *testing.T) { - // Creates a Master Configuration cfg := &kubeadmapi.InitConfiguration{ ClusterConfiguration: kubeadmapi.ClusterConfiguration{ @@ -46,7 +45,7 @@ func TestGetEtcdPodSpec(t *testing.T) { } // Executes GetEtcdPodSpec - spec := GetEtcdPodSpec(cfg) + spec := GetEtcdPodSpec(cfg, []etcdutil.Member{}) // Assert each specs refers to the right pod if spec.Spec.Containers[0].Name != kubeadmconstants.Etcd { @@ -117,11 +116,17 @@ func TestCreateLocalEtcdStaticPodManifestFile(t *testing.T) { func TestGetEtcdCommand(t *testing.T) { var tests = []struct { - cfg *kubeadmapi.InitConfiguration - expected []string + name string + cfg *kubeadmapi.InitConfiguration + initialCluster []etcdutil.Member + expected []string }{ { + name: "Default args - with empty etcd initial cluster", cfg: &kubeadmapi.InitConfiguration{ + APIEndpoint: kubeadmapi.APIEndpoint{ + AdvertiseAddress: "1.2.3.4", + }, NodeRegistration: kubeadmapi.NodeRegistrationOptions{ Name: "foo", }, @@ -136,10 +141,10 @@ func TestGetEtcdCommand(t *testing.T) { expected: []string{ "etcd", "--name=foo", - fmt.Sprintf("--listen-client-urls=https://127.0.0.1:%d", kubeadmconstants.EtcdListenClientPort), - fmt.Sprintf("--advertise-client-urls=https://127.0.0.1:%d", kubeadmconstants.EtcdListenClientPort), - fmt.Sprintf("--listen-peer-urls=https://127.0.0.1:%d", kubeadmconstants.EtcdListenPeerPort), - fmt.Sprintf("--initial-advertise-peer-urls=https://127.0.0.1:%d", kubeadmconstants.EtcdListenPeerPort), + fmt.Sprintf("--listen-client-urls=https://127.0.0.1:%d,https://1.2.3.4:%d", kubeadmconstants.EtcdListenClientPort, kubeadmconstants.EtcdListenClientPort), + fmt.Sprintf("--advertise-client-urls=https://1.2.3.4:%d", kubeadmconstants.EtcdListenClientPort), + fmt.Sprintf("--listen-peer-urls=https://1.2.3.4:%d", kubeadmconstants.EtcdListenPeerPort), + fmt.Sprintf("--initial-advertise-peer-urls=https://1.2.3.4:%d", kubeadmconstants.EtcdListenPeerPort), "--data-dir=/var/lib/etcd", "--cert-file=" + kubeadmconstants.EtcdServerCertName, "--key-file=" + kubeadmconstants.EtcdServerKeyName, @@ -150,11 +155,57 @@ func TestGetEtcdCommand(t *testing.T) { "--peer-trusted-ca-file=" + kubeadmconstants.EtcdCACertName, "--snapshot-count=10000", "--peer-client-cert-auth=true", - fmt.Sprintf("--initial-cluster=foo=https://127.0.0.1:%d", kubeadmconstants.EtcdListenPeerPort), + fmt.Sprintf("--initial-cluster=foo=https://1.2.3.4:%d", kubeadmconstants.EtcdListenPeerPort), }, }, { + name: "Default args - With an existing etcd cluster", cfg: &kubeadmapi.InitConfiguration{ + APIEndpoint: kubeadmapi.APIEndpoint{ + AdvertiseAddress: "1.2.3.4", + }, + NodeRegistration: kubeadmapi.NodeRegistrationOptions{ + Name: "foo", + }, + ClusterConfiguration: kubeadmapi.ClusterConfiguration{ + Etcd: kubeadmapi.Etcd{ + Local: &kubeadmapi.LocalEtcd{ + DataDir: "/var/lib/etcd", + }, + }, + }, + }, + initialCluster: []etcdutil.Member{ + {Name: "foo", PeerURL: fmt.Sprintf("https://1.2.3.4:%d", kubeadmconstants.EtcdListenPeerPort)}, // NB. the joining etcd instance should be part of the initialCluster list + {Name: "bar", PeerURL: fmt.Sprintf("https://5.6.7.8:%d", kubeadmconstants.EtcdListenPeerPort)}, + }, + expected: []string{ + "etcd", + "--name=foo", + fmt.Sprintf("--listen-client-urls=https://127.0.0.1:%d,https://1.2.3.4:%d", kubeadmconstants.EtcdListenClientPort, kubeadmconstants.EtcdListenClientPort), + fmt.Sprintf("--advertise-client-urls=https://1.2.3.4:%d", kubeadmconstants.EtcdListenClientPort), + fmt.Sprintf("--listen-peer-urls=https://1.2.3.4:%d", kubeadmconstants.EtcdListenPeerPort), + fmt.Sprintf("--initial-advertise-peer-urls=https://1.2.3.4:%d", kubeadmconstants.EtcdListenPeerPort), + "--data-dir=/var/lib/etcd", + "--cert-file=" + kubeadmconstants.EtcdServerCertName, + "--key-file=" + kubeadmconstants.EtcdServerKeyName, + "--trusted-ca-file=" + kubeadmconstants.EtcdCACertName, + "--client-cert-auth=true", + "--peer-cert-file=" + kubeadmconstants.EtcdPeerCertName, + "--peer-key-file=" + kubeadmconstants.EtcdPeerKeyName, + "--peer-trusted-ca-file=" + kubeadmconstants.EtcdCACertName, + "--snapshot-count=10000", + "--peer-client-cert-auth=true", + "--initial-cluster-state=existing", + fmt.Sprintf("--initial-cluster=foo=https://1.2.3.4:%d,bar=https://5.6.7.8:%d", kubeadmconstants.EtcdListenPeerPort, kubeadmconstants.EtcdListenPeerPort), + }, + }, + { + name: "Extra args", + cfg: &kubeadmapi.InitConfiguration{ + APIEndpoint: kubeadmapi.APIEndpoint{ + AdvertiseAddress: "1.2.3.4", + }, NodeRegistration: kubeadmapi.NodeRegistrationOptions{ Name: "bar", }, @@ -175,8 +226,8 @@ func TestGetEtcdCommand(t *testing.T) { "--name=bar", "--listen-client-urls=https://10.0.1.10:2379", "--advertise-client-urls=https://10.0.1.10:2379", - fmt.Sprintf("--listen-peer-urls=https://127.0.0.1:%d", kubeadmconstants.EtcdListenPeerPort), - fmt.Sprintf("--initial-advertise-peer-urls=https://127.0.0.1:%d", kubeadmconstants.EtcdListenPeerPort), + fmt.Sprintf("--listen-peer-urls=https://1.2.3.4:%d", kubeadmconstants.EtcdListenPeerPort), + fmt.Sprintf("--initial-advertise-peer-urls=https://1.2.3.4:%d", kubeadmconstants.EtcdListenPeerPort), "--data-dir=/var/lib/etcd", "--cert-file=" + kubeadmconstants.EtcdServerCertName, "--key-file=" + kubeadmconstants.EtcdServerKeyName, @@ -187,50 +238,19 @@ func TestGetEtcdCommand(t *testing.T) { "--peer-trusted-ca-file=" + kubeadmconstants.EtcdCACertName, "--snapshot-count=10000", "--peer-client-cert-auth=true", - fmt.Sprintf("--initial-cluster=bar=https://127.0.0.1:%d", kubeadmconstants.EtcdListenPeerPort), - }, - }, - { - cfg: &kubeadmapi.InitConfiguration{ - NodeRegistration: kubeadmapi.NodeRegistrationOptions{ - Name: "wombat", - }, - ClusterConfiguration: kubeadmapi.ClusterConfiguration{ - Etcd: kubeadmapi.Etcd{ - Local: &kubeadmapi.LocalEtcd{ - DataDir: "/etc/foo", - }, - }, - }, - }, - expected: []string{ - "etcd", - "--name=wombat", - fmt.Sprintf("--listen-client-urls=https://127.0.0.1:%d", kubeadmconstants.EtcdListenClientPort), - fmt.Sprintf("--advertise-client-urls=https://127.0.0.1:%d", kubeadmconstants.EtcdListenClientPort), - fmt.Sprintf("--listen-peer-urls=https://127.0.0.1:%d", kubeadmconstants.EtcdListenPeerPort), - fmt.Sprintf("--initial-advertise-peer-urls=https://127.0.0.1:%d", kubeadmconstants.EtcdListenPeerPort), - "--data-dir=/etc/foo", - "--cert-file=" + kubeadmconstants.EtcdServerCertName, - "--key-file=" + kubeadmconstants.EtcdServerKeyName, - "--trusted-ca-file=" + kubeadmconstants.EtcdCACertName, - "--client-cert-auth=true", - "--peer-cert-file=" + kubeadmconstants.EtcdPeerCertName, - "--peer-key-file=" + kubeadmconstants.EtcdPeerKeyName, - "--peer-trusted-ca-file=" + kubeadmconstants.EtcdCACertName, - "--snapshot-count=10000", - "--peer-client-cert-auth=true", - fmt.Sprintf("--initial-cluster=wombat=https://127.0.0.1:%d", kubeadmconstants.EtcdListenPeerPort), + fmt.Sprintf("--initial-cluster=bar=https://1.2.3.4:%d", kubeadmconstants.EtcdListenPeerPort), }, }, } for _, rt := range tests { - actual := getEtcdCommand(rt.cfg) - sort.Strings(actual) - sort.Strings(rt.expected) - if !reflect.DeepEqual(actual, rt.expected) { - t.Errorf("failed getEtcdCommand:\nexpected:\n%v\nsaw:\n%v", rt.expected, actual) - } + t.Run(rt.name, func(t *testing.T) { + actual := getEtcdCommand(rt.cfg, rt.initialCluster) + sort.Strings(actual) + sort.Strings(rt.expected) + if !reflect.DeepEqual(actual, rt.expected) { + t.Errorf("failed getEtcdCommand:\nexpected:\n%v\nsaw:\n%v", rt.expected, actual) + } + }) } } diff --git a/cmd/kubeadm/app/phases/upgrade/compute_test.go b/cmd/kubeadm/app/phases/upgrade/compute_test.go index 6c9a143df04..0a093591cc3 100644 --- a/cmd/kubeadm/app/phases/upgrade/compute_test.go +++ b/cmd/kubeadm/app/phases/upgrade/compute_test.go @@ -107,6 +107,12 @@ func (f fakeEtcdClient) GetClusterVersions() (map[string]string, error) { }, nil } +func (f fakeEtcdClient) Sync() error { return nil } + +func (f fakeEtcdClient) AddMember(name string, peerAddrs string) ([]etcdutil.Member, error) { + return []etcdutil.Member{}, nil +} + func TestGetAvailableUpgrades(t *testing.T) { etcdClient := fakeEtcdClient{} tests := []struct { diff --git a/cmd/kubeadm/app/phases/upgrade/staticpods.go b/cmd/kubeadm/app/phases/upgrade/staticpods.go index 28ef88bab7c..44319f6caa2 100644 --- a/cmd/kubeadm/app/phases/upgrade/staticpods.go +++ b/cmd/kubeadm/app/phases/upgrade/staticpods.go @@ -23,8 +23,8 @@ import ( "time" "github.com/pkg/errors" - "k8s.io/apimachinery/pkg/util/version" + clientset "k8s.io/client-go/kubernetes" kubeadmapi "k8s.io/kubernetes/cmd/kubeadm/app/apis/kubeadm" "k8s.io/kubernetes/cmd/kubeadm/app/constants" certsphase "k8s.io/kubernetes/cmd/kubeadm/app/phases/certs" @@ -252,7 +252,7 @@ func upgradeComponent(component string, waiter apiclient.Waiter, pathMgr StaticP } // performEtcdStaticPodUpgrade performs upgrade of etcd, it returns bool which indicates fatal error or not and the actual error. -func performEtcdStaticPodUpgrade(waiter apiclient.Waiter, pathMgr StaticPodPathManager, cfg *kubeadmapi.InitConfiguration, recoverManifests map[string]string, isTLSUpgrade bool, oldEtcdClient, newEtcdClient etcdutil.ClusterInterrogator) (bool, error) { +func performEtcdStaticPodUpgrade(client clientset.Interface, waiter apiclient.Waiter, pathMgr StaticPodPathManager, cfg *kubeadmapi.InitConfiguration, recoverManifests map[string]string, isTLSUpgrade bool, oldEtcdClient, newEtcdClient etcdutil.ClusterInterrogator) (bool, error) { // Add etcd static pod spec only if external etcd is not configured if cfg.Etcd.External != nil { return false, errors.New("external etcd detected, won't try to change any etcd state") @@ -276,10 +276,18 @@ func performEtcdStaticPodUpgrade(waiter apiclient.Waiter, pathMgr StaticPodPathM if err != nil { return true, errors.Wrap(err, "failed to retrieve an etcd version for the target kubernetes version") } - currentEtcdVersionStr, err := oldEtcdClient.GetVersion() + + // gets the etcd version of the local/stacked etcd member running on the current machine + currentEtcdVersions, err := oldEtcdClient.GetClusterVersions() if err != nil { return true, errors.Wrap(err, "failed to retrieve the current etcd version") } + currentEtcdVersionStr, ok := currentEtcdVersions[fmt.Sprintf("https://%s:%d", cfg.APIEndpoint.AdvertiseAddress, constants.EtcdListenClientPort)] + if !ok { + fmt.Println(currentEtcdVersions) + return true, errors.Wrap(err, "failed to retrieve the current etcd version") + } + currentEtcdVersion, err := version.ParseSemantic(currentEtcdVersionStr) if err != nil { return true, fmt.Errorf("failed to parse the current etcd version(%s): %v", currentEtcdVersionStr, err) @@ -353,15 +361,11 @@ func performEtcdStaticPodUpgrade(waiter apiclient.Waiter, pathMgr StaticPodPathM // Initialize the new etcd client if it wasn't pre-initialized if newEtcdClient == nil { - client, err := etcdutil.NewFromStaticPod( - []string{fmt.Sprintf("localhost:%d", constants.EtcdListenClientPort)}, - constants.GetStaticPodDirectory(), - cfg.CertificatesDir, - ) + etcdClient, err := etcdutil.NewFromCluster(client, cfg.CertificatesDir) if err != nil { return true, errors.Wrap(err, "fatal error creating etcd client") } - newEtcdClient = client + newEtcdClient = etcdClient } // Checking health state of etcd after the upgrade @@ -399,7 +403,7 @@ func performEtcdStaticPodUpgrade(waiter apiclient.Waiter, pathMgr StaticPodPathM } // StaticPodControlPlane upgrades a static pod-hosted control plane -func StaticPodControlPlane(waiter apiclient.Waiter, pathMgr StaticPodPathManager, cfg *kubeadmapi.InitConfiguration, etcdUpgrade bool, oldEtcdClient, newEtcdClient etcdutil.ClusterInterrogator) error { +func StaticPodControlPlane(client clientset.Interface, waiter apiclient.Waiter, pathMgr StaticPodPathManager, cfg *kubeadmapi.InitConfiguration, etcdUpgrade bool, oldEtcdClient, newEtcdClient etcdutil.ClusterInterrogator) error { recoverManifests := map[string]string{} var isTLSUpgrade bool var isExternalEtcd bool @@ -413,7 +417,7 @@ func StaticPodControlPlane(waiter apiclient.Waiter, pathMgr StaticPodPathManager if cfg.Etcd.External != nil { // External etcd isExternalEtcd = true - client, err := etcdutil.New( + etcdClient, err := etcdutil.New( cfg.Etcd.External.Endpoints, cfg.Etcd.External.CAFile, cfg.Etcd.External.CertFile, @@ -422,22 +426,18 @@ func StaticPodControlPlane(waiter apiclient.Waiter, pathMgr StaticPodPathManager if err != nil { return errors.Wrap(err, "failed to create etcd client for external etcd") } - oldEtcdClient = client + oldEtcdClient = etcdClient // Since etcd is managed externally, the new etcd client will be the same as the old client if newEtcdClient == nil { - newEtcdClient = client + newEtcdClient = etcdClient } } else { // etcd Static Pod - client, err := etcdutil.NewFromStaticPod( - []string{fmt.Sprintf("localhost:%d", constants.EtcdListenClientPort)}, - constants.GetStaticPodDirectory(), - cfg.CertificatesDir, - ) + etcdClient, err := etcdutil.NewFromCluster(client, cfg.CertificatesDir) if err != nil { return errors.Wrap(err, "failed to create etcd client") } - oldEtcdClient = client + oldEtcdClient = etcdClient } } @@ -452,7 +452,7 @@ func StaticPodControlPlane(waiter apiclient.Waiter, pathMgr StaticPodPathManager } // Perform etcd upgrade using common to all control plane components function - fatal, err := performEtcdStaticPodUpgrade(waiter, pathMgr, cfg, recoverManifests, isTLSUpgrade, oldEtcdClient, newEtcdClient) + fatal, err := performEtcdStaticPodUpgrade(client, waiter, pathMgr, cfg, recoverManifests, isTLSUpgrade, oldEtcdClient, newEtcdClient) if err != nil { if fatal { return err diff --git a/cmd/kubeadm/app/phases/upgrade/staticpods_test.go b/cmd/kubeadm/app/phases/upgrade/staticpods_test.go index 7a01ba6c8d1..f3a0a2e35e9 100644 --- a/cmd/kubeadm/app/phases/upgrade/staticpods_test.go +++ b/cmd/kubeadm/app/phases/upgrade/staticpods_test.go @@ -54,12 +54,13 @@ kind: InitConfiguration nodeRegistration: name: foo criSocket: "" +apiEndpoint: + advertiseAddress: 1.2.3.4 + bindPort: 6443 --- apiVersion: kubeadm.k8s.io/v1beta1 kind: ClusterConfiguration -api: - advertiseAddress: 1.2.3.4 - bindPort: 6443 + apiServerCertSANs: null apiServerExtraArgs: null certificatesDir: %s @@ -75,9 +76,6 @@ networking: dnsDomain: cluster.local podSubnet: "" serviceSubnet: 10.96.0.0/12 -nodeRegistration: - name: foo - criSocket: "" schedulerExtraArgs: null token: ce3aa5.5ec8455bb76b379f tokenTTL: 24h @@ -236,14 +234,14 @@ func (c fakeTLSEtcdClient) WaitForClusterAvailable(delay time.Duration, retries func (c fakeTLSEtcdClient) GetClusterStatus() (map[string]*clientv3.StatusResponse, error) { return map[string]*clientv3.StatusResponse{ - "foo": { + "https://1.2.3.4:2379": { Version: "3.1.12", }}, nil } func (c fakeTLSEtcdClient) GetClusterVersions() (map[string]string, error) { return map[string]string{ - "foo": "3.1.12", + "https://1.2.3.4:2379": "3.1.12", }, nil } @@ -251,6 +249,12 @@ func (c fakeTLSEtcdClient) GetVersion() (string, error) { return "3.1.12", nil } +func (c fakeTLSEtcdClient) Sync() error { return nil } + +func (c fakeTLSEtcdClient) AddMember(name string, peerAddrs string) ([]etcdutil.Member, error) { + return []etcdutil.Member{}, nil +} + type fakePodManifestEtcdClient struct{ ManifestDir, CertificatesDir string } func (c fakePodManifestEtcdClient) HasTLS() bool { @@ -277,13 +281,13 @@ func (c fakePodManifestEtcdClient) GetClusterStatus() (map[string]*clientv3.Stat } return map[string]*clientv3.StatusResponse{ - "foo": {Version: "3.1.12"}, + "https://1.2.3.4:2379": {Version: "3.1.12"}, }, nil } func (c fakePodManifestEtcdClient) GetClusterVersions() (map[string]string, error) { return map[string]string{ - "foo": "3.1.12", + "https://1.2.3.4:2379": "3.1.12", }, nil } @@ -291,6 +295,12 @@ func (c fakePodManifestEtcdClient) GetVersion() (string, error) { return "3.1.12", nil } +func (c fakePodManifestEtcdClient) Sync() error { return nil } + +func (c fakePodManifestEtcdClient) AddMember(name string, peerAddrs string) ([]etcdutil.Member, error) { + return []etcdutil.Member{}, nil +} + func TestStaticPodControlPlane(t *testing.T) { tests := []struct { description string @@ -477,6 +487,7 @@ func TestStaticPodControlPlane(t *testing.T) { } actualErr := StaticPodControlPlane( + nil, waiter, pathMgr, newcfg, diff --git a/cmd/kubeadm/app/phases/uploadconfig/BUILD b/cmd/kubeadm/app/phases/uploadconfig/BUILD index b1991dd4b00..3de81d5f1ff 100644 --- a/cmd/kubeadm/app/phases/uploadconfig/BUILD +++ b/cmd/kubeadm/app/phases/uploadconfig/BUILD @@ -12,16 +12,13 @@ go_library( importpath = "k8s.io/kubernetes/cmd/kubeadm/app/phases/uploadconfig", deps = [ "//cmd/kubeadm/app/apis/kubeadm:go_default_library", - "//cmd/kubeadm/app/apis/kubeadm/scheme:go_default_library", "//cmd/kubeadm/app/constants:go_default_library", "//cmd/kubeadm/app/util/apiclient:go_default_library", "//cmd/kubeadm/app/util/config:go_default_library", "//pkg/apis/rbac/v1:go_default_library", "//staging/src/k8s.io/api/core/v1:go_default_library", "//staging/src/k8s.io/api/rbac/v1:go_default_library", - "//staging/src/k8s.io/apimachinery/pkg/api/errors:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", - "//staging/src/k8s.io/apimachinery/pkg/runtime:go_default_library", "//staging/src/k8s.io/client-go/kubernetes:go_default_library", ], ) @@ -48,13 +45,10 @@ go_test( "//cmd/kubeadm/app/apis/kubeadm/scheme:go_default_library", "//cmd/kubeadm/app/apis/kubeadm/v1beta1:go_default_library", "//cmd/kubeadm/app/constants:go_default_library", - "//cmd/kubeadm/app/util/apiclient:go_default_library", "//cmd/kubeadm/app/util/config:go_default_library", - "//staging/src/k8s.io/api/core/v1:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/api/errors:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/runtime:go_default_library", - "//staging/src/k8s.io/client-go/kubernetes:go_default_library", "//staging/src/k8s.io/client-go/kubernetes/fake:go_default_library", "//staging/src/k8s.io/client-go/testing:go_default_library", ], diff --git a/cmd/kubeadm/app/phases/uploadconfig/uploadconfig.go b/cmd/kubeadm/app/phases/uploadconfig/uploadconfig.go index b7ac65ddaa2..963119b4481 100644 --- a/cmd/kubeadm/app/phases/uploadconfig/uploadconfig.go +++ b/cmd/kubeadm/app/phases/uploadconfig/uploadconfig.go @@ -21,12 +21,9 @@ import ( "k8s.io/api/core/v1" rbac "k8s.io/api/rbac/v1" - apierrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/runtime" clientset "k8s.io/client-go/kubernetes" kubeadmapi "k8s.io/kubernetes/cmd/kubeadm/app/apis/kubeadm" - kubeadmscheme "k8s.io/kubernetes/cmd/kubeadm/app/apis/kubeadm/scheme" kubeadmconstants "k8s.io/kubernetes/cmd/kubeadm/app/constants" "k8s.io/kubernetes/cmd/kubeadm/app/util/apiclient" configutil "k8s.io/kubernetes/cmd/kubeadm/app/util/config" @@ -59,7 +56,7 @@ func UploadConfiguration(cfg *kubeadmapi.InitConfiguration, client clientset.Int // Prepare the ClusterStatus for upload // Gets the current cluster status // TODO: use configmap locks on this object on the get before the update. - clusterStatus, err := getClusterStatus(client) + clusterStatus, err := configutil.GetClusterStatus(client) if err != nil { return err } @@ -129,22 +126,3 @@ func UploadConfiguration(cfg *kubeadmapi.InitConfiguration, client clientset.Int }, }) } - -func getClusterStatus(client clientset.Interface) (*kubeadmapi.ClusterStatus, error) { - obj := &kubeadmapi.ClusterStatus{} - - // Read the ConfigMap from the cluster - configMap, err := client.CoreV1().ConfigMaps(metav1.NamespaceSystem).Get(kubeadmconstants.KubeadmConfigConfigMap, metav1.GetOptions{}) - if apierrors.IsNotFound(err) { - return obj, nil - } - if err != nil { - return nil, err - } - - // Decode the file content using the componentconfig Codecs that knows about all APIs - if err := runtime.DecodeInto(kubeadmscheme.Codecs.UniversalDecoder(), []byte(configMap.Data[kubeadmconstants.ClusterStatusConfigMapKey]), obj); err != nil { - return nil, err - } - return obj, nil -} diff --git a/cmd/kubeadm/app/phases/uploadconfig/uploadconfig_test.go b/cmd/kubeadm/app/phases/uploadconfig/uploadconfig_test.go index d575e506541..7ef7191760a 100644 --- a/cmd/kubeadm/app/phases/uploadconfig/uploadconfig_test.go +++ b/cmd/kubeadm/app/phases/uploadconfig/uploadconfig_test.go @@ -20,18 +20,15 @@ import ( "reflect" "testing" - "k8s.io/api/core/v1" apierrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" - clientset "k8s.io/client-go/kubernetes" clientsetfake "k8s.io/client-go/kubernetes/fake" core "k8s.io/client-go/testing" kubeadmapi "k8s.io/kubernetes/cmd/kubeadm/app/apis/kubeadm" kubeadmscheme "k8s.io/kubernetes/cmd/kubeadm/app/apis/kubeadm/scheme" kubeadmapiv1beta1 "k8s.io/kubernetes/cmd/kubeadm/app/apis/kubeadm/v1beta1" kubeadmconstants "k8s.io/kubernetes/cmd/kubeadm/app/constants" - "k8s.io/kubernetes/cmd/kubeadm/app/util/apiclient" configutil "k8s.io/kubernetes/cmd/kubeadm/app/util/config" ) @@ -158,62 +155,3 @@ func TestUploadConfiguration(t *testing.T) { }) } } - -func TestGetClusterStatus(t *testing.T) { - var tests = []struct { - name string - clusterStatus *kubeadmapi.ClusterStatus - expectedClusterEndpoints int - }{ - { - name: "return empty ClusterStatus if cluster kubeadm-config doesn't exist (e.g init)", - expectedClusterEndpoints: 0, - }, - { - name: "return ClusterStatus if cluster kubeadm-config exist (e.g upgrade)", - clusterStatus: &kubeadmapi.ClusterStatus{ - APIEndpoints: map[string]kubeadmapi.APIEndpoint{ - "dummy": {AdvertiseAddress: "1.2.3.4", BindPort: 1234}, - }, - }, - expectedClusterEndpoints: 1, - }, - } - - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - client := clientsetfake.NewSimpleClientset() - - if tt.clusterStatus != nil { - createConfigMapWithStatus(tt.clusterStatus, client) - } - - actual, err := getClusterStatus(client) - if err != nil { - t.Error("GetClusterStatus returned unexpected error") - return - } - if tt.expectedClusterEndpoints != len(actual.APIEndpoints) { - t.Error("actual ClusterStatus doesn't return expected endpoints") - } - }) - } -} - -// createConfigMapWithStatus create a ConfigMap with ClusterStatus for TestGetClusterStatus -func createConfigMapWithStatus(statusToCreate *kubeadmapi.ClusterStatus, client clientset.Interface) error { - statusYaml, err := configutil.MarshalKubeadmConfigObject(statusToCreate) - if err != nil { - return err - } - - return apiclient.CreateOrUpdateConfigMap(client, &v1.ConfigMap{ - ObjectMeta: metav1.ObjectMeta{ - Name: kubeadmconstants.KubeadmConfigConfigMap, - Namespace: metav1.NamespaceSystem, - }, - Data: map[string]string{ - kubeadmconstants.ClusterStatusConfigMapKey: string(statusYaml), - }, - }) -} diff --git a/cmd/kubeadm/app/preflight/checks.go b/cmd/kubeadm/app/preflight/checks.go index 09ee6fdce68..3e506d9c333 100644 --- a/cmd/kubeadm/app/preflight/checks.go +++ b/cmd/kubeadm/app/preflight/checks.go @@ -900,6 +900,7 @@ func RunInitMasterChecks(execer utilsexec.Interface, cfg *kubeadmapi.InitConfigu // Only do etcd related checks when no external endpoints were specified checks = append(checks, PortOpenCheck{port: kubeadmconstants.EtcdListenClientPort}, + PortOpenCheck{port: kubeadmconstants.EtcdListenPeerPort}, DirAvailableCheck{Path: cfg.Etcd.Local.DataDir}, ) } diff --git a/cmd/kubeadm/app/util/config/BUILD b/cmd/kubeadm/app/util/config/BUILD index b16d3636c0c..531318c4835 100644 --- a/cmd/kubeadm/app/util/config/BUILD +++ b/cmd/kubeadm/app/util/config/BUILD @@ -25,6 +25,7 @@ go_library( "//cmd/kubeadm/app/util:go_default_library", "//pkg/util/node:go_default_library", "//staging/src/k8s.io/api/core/v1:go_default_library", + "//staging/src/k8s.io/apimachinery/pkg/api/errors:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/runtime:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/runtime/schema:go_default_library", @@ -35,6 +36,7 @@ go_library( "//staging/src/k8s.io/client-go/util/cert:go_default_library", "//staging/src/k8s.io/cluster-bootstrap/token/util:go_default_library", "//vendor/github.com/golang/glog:go_default_library", + "//vendor/github.com/pkg/errors:go_default_library", ], ) diff --git a/cmd/kubeadm/app/util/config/cluster.go b/cmd/kubeadm/app/util/config/cluster.go index b42207a72a6..ce23bac05a5 100644 --- a/cmd/kubeadm/app/util/config/cluster.go +++ b/cmd/kubeadm/app/util/config/cluster.go @@ -18,13 +18,14 @@ package config import ( "crypto/x509" - "errors" "fmt" "io" "io/ioutil" "path/filepath" "strings" + "github.com/pkg/errors" + apierrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/util/version" @@ -196,12 +197,8 @@ func getNodeNameFromKubeletConfig(kubeconfigDir string) (string, error) { // getAPIEndpoint returns the APIEndpoint for the current node func getAPIEndpoint(data map[string]string, nodeName string, apiEndpoint *kubeadmapi.APIEndpoint) error { // gets the ClusterStatus from kubeadm-config - clusterStatusData, ok := data[constants.ClusterStatusConfigMapKey] - if !ok { - return fmt.Errorf("unexpected error when reading kubeadm-config ConfigMap: %s key value pair missing", constants.ClusterStatusConfigMapKey) - } - clusterStatus := &kubeadmapi.ClusterStatus{} - if err := runtime.DecodeInto(kubeadmscheme.Codecs.UniversalDecoder(), []byte(clusterStatusData), clusterStatus); err != nil { + clusterStatus, err := unmarshalClusterStatus(data) + if err != nil { return err } @@ -232,3 +229,34 @@ func getComponentConfigs(client clientset.Interface, clusterConfiguration *kubea } return nil } + +// GetClusterStatus returns the kubeadm cluster status read from the kubeadm-config ConfigMap +func GetClusterStatus(client clientset.Interface) (*kubeadmapi.ClusterStatus, error) { + configMap, err := client.CoreV1().ConfigMaps(metav1.NamespaceSystem).Get(constants.KubeadmConfigConfigMap, metav1.GetOptions{}) + if apierrors.IsNotFound(err) { + return &kubeadmapi.ClusterStatus{}, nil + } + if err != nil { + return nil, err + } + + clusterStatus, err := unmarshalClusterStatus(configMap.Data) + if err != nil { + return nil, err + } + + return clusterStatus, nil +} + +func unmarshalClusterStatus(data map[string]string) (*kubeadmapi.ClusterStatus, error) { + clusterStatusData, ok := data[constants.ClusterStatusConfigMapKey] + if !ok { + return nil, errors.Errorf("unexpected error when reading kubeadm-config ConfigMap: %s key value pair missing", constants.ClusterStatusConfigMapKey) + } + clusterStatus := &kubeadmapi.ClusterStatus{} + if err := runtime.DecodeInto(kubeadmscheme.Codecs.UniversalDecoder(), []byte(clusterStatusData), clusterStatus); err != nil { + return nil, err + } + + return clusterStatus, nil +} diff --git a/cmd/kubeadm/app/util/config/cluster_test.go b/cmd/kubeadm/app/util/config/cluster_test.go index ba8c7841c90..ecc6bae7661 100644 --- a/cmd/kubeadm/app/util/config/cluster_test.go +++ b/cmd/kubeadm/app/util/config/cluster_test.go @@ -770,6 +770,94 @@ func TestGetInitConfigurationFromCluster(t *testing.T) { } } +func TestGetGetClusterStatus(t *testing.T) { + var tests = []struct { + name string + configMaps []fakeConfigMap + expectedEndpoints int + expectedError bool + }{ + { + name: "invalid missing config map", + expectedEndpoints: 0, + }, + { + name: "valid v1beta1", + configMaps: []fakeConfigMap{ + { + name: kubeadmconstants.KubeadmConfigConfigMap, + data: map[string]string{ + kubeadmconstants.ClusterStatusConfigMapKey: string(cfgFiles["ClusterStatus_v1beta1"]), + }, + }, + }, + expectedEndpoints: 1, + }, + { + name: "valid v1alpha3", + configMaps: []fakeConfigMap{ + { + name: kubeadmconstants.KubeadmConfigConfigMap, + data: map[string]string{ + kubeadmconstants.ClusterStatusConfigMapKey: string(cfgFiles["ClusterStatus_v1alpha3"]), + }, + }, + }, + expectedEndpoints: 1, + }, + { + name: "invalid missing ClusterStatusConfigMapKey in the config map", + configMaps: []fakeConfigMap{ + { + name: kubeadmconstants.KubeadmConfigConfigMap, + data: map[string]string{}, + }, + }, + expectedError: true, + }, + { + name: "invalid wrong value in the config map", + configMaps: []fakeConfigMap{ + { + name: kubeadmconstants.KubeadmConfigConfigMap, + data: map[string]string{ + kubeadmconstants.ClusterStatusConfigMapKey: "not a kubeadm type", + }, + }, + }, + expectedError: true, + }, + } + + for _, rt := range tests { + t.Run(rt.name, func(t *testing.T) { + client := clientsetfake.NewSimpleClientset() + + for _, c := range rt.configMaps { + err := c.create(client) + if err != nil { + t.Errorf("couldn't create ConfigMap %s", c.name) + return + } + } + + clusterStatus, err := GetClusterStatus(client) + if rt.expectedError != (err != nil) { + t.Errorf("unexpected return err from GetClusterStatus: %v", err) + return + } + if rt.expectedError { + return + } + + // Test expected values in clusterStatus + if len(clusterStatus.APIEndpoints) != rt.expectedEndpoints { + t.Errorf("unexpected ClusterStatus return value") + } + }) + } +} + type fakeConfigMap struct { name string data map[string]string diff --git a/cmd/kubeadm/app/util/etcd/BUILD b/cmd/kubeadm/app/util/etcd/BUILD index c31a1f05aee..b3792bb4aef 100644 --- a/cmd/kubeadm/app/util/etcd/BUILD +++ b/cmd/kubeadm/app/util/etcd/BUILD @@ -8,9 +8,12 @@ go_library( deps = [ "//cmd/kubeadm/app/apis/kubeadm:go_default_library", "//cmd/kubeadm/app/constants:go_default_library", + "//cmd/kubeadm/app/util/config:go_default_library", "//cmd/kubeadm/app/util/staticpod:go_default_library", + "//staging/src/k8s.io/client-go/kubernetes:go_default_library", "//vendor/github.com/coreos/etcd/clientv3:go_default_library", "//vendor/github.com/coreos/etcd/pkg/transport:go_default_library", + "//vendor/github.com/golang/glog:go_default_library", "//vendor/github.com/pkg/errors:go_default_library", ], ) diff --git a/cmd/kubeadm/app/util/etcd/etcd.go b/cmd/kubeadm/app/util/etcd/etcd.go index 3c92717c69d..c93f13e05df 100644 --- a/cmd/kubeadm/app/util/etcd/etcd.go +++ b/cmd/kubeadm/app/util/etcd/etcd.go @@ -26,9 +26,12 @@ import ( "github.com/coreos/etcd/clientv3" "github.com/coreos/etcd/pkg/transport" + "github.com/golang/glog" "github.com/pkg/errors" + clientset "k8s.io/client-go/kubernetes" kubeadmapi "k8s.io/kubernetes/cmd/kubeadm/app/apis/kubeadm" "k8s.io/kubernetes/cmd/kubeadm/app/constants" + "k8s.io/kubernetes/cmd/kubeadm/app/util/config" "k8s.io/kubernetes/cmd/kubeadm/app/util/staticpod" ) @@ -40,6 +43,8 @@ type ClusterInterrogator interface { GetVersion() (string, error) HasTLS() bool WaitForClusterAvailable(delay time.Duration, retries int, retryInterval time.Duration) (bool, error) + Sync() error + AddMember(name string, peerAddrs string) ([]Member, error) } // Client provides connection parameters for an etcd cluster @@ -125,6 +130,108 @@ func NewFromStaticPod(endpoints []string, manifestDir string, certificatesDir st return New(endpoints, "", "", "") } +// NewFromCluster creates an etcd client for the the etcd endpoints defined in the ClusterStatus value stored in +// the kubeadm-config ConfigMap in kube-system namespace. +// Once created, the client synchronizes client's endpoints with the known endpoints from the etcd membership API (reality check). +func NewFromCluster(client clientset.Interface, certificatesDir string) (*Client, error) { + // Gets the cluster status + clusterStatus, err := config.GetClusterStatus(client) + if err != nil { + return nil, err + } + + // Get the list of etcd endpoints from cluster status + endpoints := []string{} + for _, e := range clusterStatus.APIEndpoints { + endpoints = append(endpoints, fmt.Sprintf("https://%s:%d", e.AdvertiseAddress, constants.EtcdListenClientPort)) + } + glog.V(1).Infof("etcd endpoints read from pods: %s", strings.Join(endpoints, ",")) + + // Creates an etcd client + etcdClient, err := New( + endpoints, + filepath.Join(certificatesDir, constants.EtcdCACertName), + filepath.Join(certificatesDir, constants.EtcdHealthcheckClientCertName), + filepath.Join(certificatesDir, constants.EtcdHealthcheckClientKeyName), + ) + if err != nil { + return nil, err + } + + // synchronizes client's endpoints with the known endpoints from the etcd membership. + err = etcdClient.Sync() + if err != nil { + return nil, err + } + + return etcdClient, nil +} + +// Sync synchronizes client's endpoints with the known endpoints from the etcd membership. +func (c Client) Sync() error { + cli, err := clientv3.New(clientv3.Config{ + Endpoints: c.Endpoints, + DialTimeout: 20 * time.Second, + TLS: c.TLS, + }) + if err != nil { + return err + } + defer cli.Close() + + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + err = cli.Sync(ctx) + cancel() + if err != nil { + return err + } + glog.V(1).Infof("etcd endpoints read from etcd: %s", strings.Join(cli.Endpoints(), ",")) + + c.Endpoints = cli.Endpoints() + return nil +} + +// Member struct defines an etcd member; it is used for avoiding to spread github.com/coreos/etcd dependency +// across kubeadm codebase +type Member struct { + Name string + PeerURL string +} + +// AddMember notifies an existing etcd cluster that a new member is joining +func (c Client) AddMember(name string, peerAddrs string) ([]Member, error) { + cli, err := clientv3.New(clientv3.Config{ + Endpoints: c.Endpoints, + DialTimeout: 20 * time.Second, + TLS: c.TLS, + }) + if err != nil { + return nil, err + } + defer cli.Close() + + // Adds a new member to the cluster + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + resp, err := cli.MemberAdd(ctx, []string{peerAddrs}) + cancel() + if err != nil { + return nil, err + } + + // Returns the updated list of etcd members + ret := []Member{} + for _, m := range resp.Members { + // fixes the entry for the joining member (that doesn't have a name set in the initialCluster returned by etcd) + if m.Name == "" { + ret = append(ret, Member{Name: name, PeerURL: m.PeerURLs[0]}) + } else { + ret = append(ret, Member{Name: m.Name, PeerURL: m.PeerURLs[0]}) + } + } + + return ret, nil +} + // GetVersion returns the etcd version of the cluster. // An error is returned if the version of all endpoints do not match func (c Client) GetVersion() (string, error) {