Merge pull request #69486 from fabriziopandini/kubeadm-stacked-etcd

kubeadm stacked etcd
This commit is contained in:
k8s-ci-robot 2018-10-27 10:13:12 -07:00 committed by GitHub
commit 481fa1977c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
24 changed files with 544 additions and 219 deletions

View File

@ -30,7 +30,6 @@ import (
"github.com/renstrom/dedent"
"github.com/spf13/cobra"
flag "github.com/spf13/pflag"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apimachinery/pkg/util/wait"
clientcmdapi "k8s.io/client-go/tools/clientcmd/api"
@ -44,6 +43,7 @@ import (
"k8s.io/kubernetes/cmd/kubeadm/app/features"
certsphase "k8s.io/kubernetes/cmd/kubeadm/app/phases/certs"
controlplanephase "k8s.io/kubernetes/cmd/kubeadm/app/phases/controlplane"
etcdphase "k8s.io/kubernetes/cmd/kubeadm/app/phases/etcd"
kubeconfigphase "k8s.io/kubernetes/cmd/kubeadm/app/phases/kubeconfig"
kubeletphase "k8s.io/kubernetes/cmd/kubeadm/app/phases/kubelet"
markmasterphase "k8s.io/kubernetes/cmd/kubeadm/app/phases/markmaster"
@ -74,7 +74,6 @@ var (
Please ensure that:
* The cluster has a stable controlPlaneEndpoint address.
* The cluster uses an external etcd.
* The certificates that must be shared among control plane instances are provided.
`)))
@ -86,6 +85,7 @@ var (
* The Kubelet was informed of the new secure connection details.
* Master label and taint were applied to the new node.
* The kubernetes control plane instances scaled up.
{{.etcdMessage}}
To start administering your cluster from this node, you need to run the following as a regular user:
@ -383,8 +383,15 @@ func (j *Join) Run(out io.Writer) error {
}
// outputs the join control plane done template and exits
etcdMessage := ""
// in case of local etcd
if initConfiguration.Etcd.External == nil {
etcdMessage = "* A new etcd member was added to the local/stacked etcd cluster."
}
ctx := map[string]string{
"KubeConfigPath": kubeadmconstants.GetAdminKubeConfigPath(),
"etcdMessage": etcdMessage,
}
joinControPlaneDoneTemp.Execute(out, ctx)
return nil
@ -421,11 +428,6 @@ func (j *Join) CheckIfReadyForAdditionalControlPlane(initConfiguration *kubeadma
return errors.New("unable to add a new control plane instance a cluster that doesn't have a stable controlPlaneEndpoint address")
}
// blocks if the cluster was created without an external etcd cluster
if initConfiguration.Etcd.External == nil {
return errors.New("unable to add a new control plane instance on a cluster that doesn't use an external etcd")
}
// blocks if control plane is self-hosted
if features.Enabled(initConfiguration.FeatureGates, features.SelfHosting) {
return errors.New("self-hosted clusters are deprecated and won't be supported by `kubeadm join --experimental-control-plane`")
@ -465,6 +467,23 @@ func (j *Join) PrepareForHostingControlPlane(initConfiguration *kubeadmapi.InitC
return errors.Wrap(err, "error creating static pod manifest files for the control plane components")
}
// in case of local etcd
if initConfiguration.Etcd.External == nil {
// Checks that the etcd cluster is healthy
// NB. this check cannot be implemented before because it requires the admin.conf and all the certificates
// for connecting to etcd already in place
kubeConfigFile := filepath.Join(kubeadmconstants.KubernetesDir, kubeadmconstants.AdminKubeConfigFileName)
client, err := kubeconfigutil.ClientSetFromFile(kubeConfigFile)
if err != nil {
return errors.Wrap(err, "couldn't create kubernetes client")
}
if err := etcdphase.CheckLocalEtcdClusterStatus(client, initConfiguration); err != nil {
return err
}
}
return nil
}
@ -559,6 +578,23 @@ func (j *Join) PostInstallControlPlane(initConfiguration *kubeadmapi.InitConfigu
return errors.Wrap(err, "couldn't create kubernetes client")
}
// in case of local etcd
if initConfiguration.Etcd.External == nil {
// Adds a new etcd instance; in order to do this the new etcd instance should be "announced" to
// the existing etcd members before being created.
// This operation must be executed after kubelet is already started in order to minimize the time
// between the new etcd member is announced and the start of the static pod running the new etcd member, because during
// this time frame etcd gets temporary not available (only when moving from 1 to 2 members in the etcd cluster).
// From https://coreos.com/etcd/docs/latest/v2/runtime-configuration.html
// "If you add a new member to a 1-node cluster, the cluster cannot make progress before the new member starts
// because it needs two members as majority to agree on the consensus. You will only see this behavior between the time
// etcdctl member add informs the cluster about the new member and the new member successfully establishing a connection to the existing one."
glog.V(1).Info("[join] adding etcd")
if err := etcdphase.CreateStackedEtcdStaticPodManifestFile(client, kubeadmconstants.GetStaticPodDirectory(), initConfiguration); err != nil {
return errors.Wrap(err, "error creating local etcd static pod manifest file")
}
}
glog.V(1).Info("[join] uploading currently used configuration to the cluster")
if err := uploadconfigphase.UploadConfiguration(initConfiguration, client); err != nil {
return errors.Wrap(err, "error uploading configuration: %v")

View File

@ -23,7 +23,6 @@ import (
"github.com/golang/glog"
"github.com/spf13/cobra"
"k8s.io/apimachinery/pkg/util/version"
clientset "k8s.io/client-go/kubernetes"
kubeadmapi "k8s.io/kubernetes/cmd/kubeadm/app/apis/kubeadm"
@ -303,7 +302,7 @@ func PerformStaticPodUpgrade(client clientset.Interface, waiter apiclient.Waiter
}
// The arguments oldEtcdClient and newEtdClient, are uninitialized because passing in the clients allow for mocking the client during testing
return upgrade.StaticPodControlPlane(waiter, pathManager, internalcfg, etcdUpgrade, nil, nil)
return upgrade.StaticPodControlPlane(client, waiter, pathManager, internalcfg, etcdUpgrade, nil, nil)
}
// DryRunStaticPodUpgrade fakes an upgrade of the control plane

View File

@ -24,7 +24,6 @@ import (
"github.com/golang/glog"
"github.com/spf13/cobra"
"k8s.io/apimachinery/pkg/util/version"
"k8s.io/kubernetes/cmd/kubeadm/app/cmd/options"
cmdutil "k8s.io/kubernetes/cmd/kubeadm/app/cmd/util"
@ -66,6 +65,7 @@ type controlplaneUpgradeFlags struct {
kubeConfigPath string
advertiseAddress string
nodeName string
etcdUpgrade bool
dryRun bool
}
@ -113,6 +113,7 @@ func NewCmdUpgradeControlPlane() *cobra.Command {
flags := &controlplaneUpgradeFlags{
kubeConfigPath: constants.GetKubeletKubeConfigPath(),
advertiseAddress: "",
etcdUpgrade: true,
dryRun: false,
}
@ -149,6 +150,7 @@ func NewCmdUpgradeControlPlane() *cobra.Command {
options.AddKubeConfigFlag(cmd.Flags(), &flags.kubeConfigPath)
cmd.Flags().BoolVar(&flags.dryRun, "dry-run", flags.dryRun, "Do not change any state, just output the actions that would be performed.")
cmd.Flags().BoolVar(&flags.etcdUpgrade, "etcd-upgrade", flags.etcdUpgrade, "Perform the upgrade of etcd.")
return cmd
}
@ -236,13 +238,13 @@ func RunUpgradeControlPlane(flags *controlplaneUpgradeFlags) error {
return fmt.Errorf("Unable to rotate API server certificate: %v", err)
}
// Upgrade the control plane
// Upgrade the control plane and etcd if installed on this node
fmt.Printf("[upgrade] Upgrading your Static Pod-hosted control plane instance to version %q...\n", cfg.KubernetesVersion)
if flags.dryRun {
return DryRunStaticPodUpgrade(cfg)
}
if err := PerformStaticPodUpgrade(client, waiter, cfg, false); err != nil {
if err := PerformStaticPodUpgrade(client, waiter, cfg, flags.etcdUpgrade); err != nil {
return fmt.Errorf("Couldn't complete the static pod upgrade: %v", err)
}

View File

@ -26,7 +26,6 @@ import (
"github.com/golang/glog"
"github.com/spf13/cobra"
"k8s.io/apimachinery/pkg/util/version"
kubeadmapiv1beta1 "k8s.io/kubernetes/cmd/kubeadm/app/apis/kubeadm/v1beta1"
"k8s.io/kubernetes/cmd/kubeadm/app/apis/kubeadm/validation"
@ -111,11 +110,8 @@ func RunPlan(flags *planFlags) error {
}
etcdClient = client
} else {
client, err := etcdutil.NewFromStaticPod(
[]string{fmt.Sprintf("localhost:%d", constants.EtcdListenClientPort)},
constants.GetStaticPodDirectory(),
upgradeVars.cfg.CertificatesDir,
)
// Connects to local/stacked etcd existing in the cluster
client, err := etcdutil.NewFromCluster(upgradeVars.client, upgradeVars.cfg.CertificatesDir)
if err != nil {
return err
}

View File

@ -24,7 +24,6 @@ import (
"path/filepath"
"github.com/golang/glog"
certutil "k8s.io/client-go/util/cert"
kubeadmapi "k8s.io/kubernetes/cmd/kubeadm/app/apis/kubeadm"
kubeadmconstants "k8s.io/kubernetes/cmd/kubeadm/app/constants"
@ -285,7 +284,7 @@ type certKeyLocation struct {
}
// SharedCertificateExists verifies if the shared certificates - the certificates that must be
// equal across masters: ca.key, ca.crt, sa.key, sa.pub
// equal across masters: ca.key, ca.crt, sa.key, sa.pub + etcd/ca.key, etcd/ca.crt if local/stacked etcd
func SharedCertificateExists(cfg *kubeadmapi.InitConfiguration) (bool, error) {
if err := validateCACertAndKey(certKeyLocation{cfg.CertificatesDir, kubeadmconstants.CACertAndKeyBaseName, "", "CA"}); err != nil {
@ -300,6 +299,13 @@ func SharedCertificateExists(cfg *kubeadmapi.InitConfiguration) (bool, error) {
return false, err
}
// in case of local/stacked etcd
if cfg.Etcd.External == nil {
if err := validateCACertAndKey(certKeyLocation{cfg.CertificatesDir, kubeadmconstants.EtcdCACertAndKeyBaseName, "", "etcd CA"}); err != nil {
return false, err
}
}
return true, nil
}

View File

@ -308,6 +308,8 @@ func TestSharedCertificateExists(t *testing.T) {
"front-proxy-ca.key": caKey,
"sa.pub": publicKey,
"sa.key": key,
"etcd/ca.crt": caCert,
"etcd/ca.key": caKey,
},
},
{
@ -318,6 +320,8 @@ func TestSharedCertificateExists(t *testing.T) {
"front-proxy-ca.key": caKey,
"sa.pub": publicKey,
"sa.key": key,
"etcd/ca.crt": caCert,
"etcd/ca.key": caKey,
},
expectedError: true,
},
@ -329,17 +333,34 @@ func TestSharedCertificateExists(t *testing.T) {
"front-proxy-ca.crt": caCert,
"front-proxy-ca.key": caKey,
"sa.pub": publicKey,
"etcd/ca.crt": caCert,
"etcd/ca.key": caKey,
},
expectedError: true,
},
{
name: "expected front-proxy.crt missing",
name: "missing front-proxy.crt",
files: pkiFiles{
"ca.crt": caCert,
"ca.key": caKey,
"front-proxy-ca.key": caKey,
"sa.pub": publicKey,
"sa.key": key,
"etcd/ca.crt": caCert,
"etcd/ca.key": caKey,
},
expectedError: true,
},
{
name: "missing etcd/ca.crt",
files: pkiFiles{
"ca.crt": caCert,
"ca.key": caKey,
"front-proxy-ca.key": caKey,
"sa.pub": publicKey,
"sa.key": key,
"etcd/ca.crt": caCert,
"etcd/ca.key": caKey,
},
expectedError: true,
},
@ -348,6 +369,7 @@ func TestSharedCertificateExists(t *testing.T) {
for _, test := range tests {
t.Run("", func(t *testing.T) {
tmpdir := testutil.SetupTempDir(t)
os.MkdirAll(tmpdir+"/etcd", os.ModePerm)
defer os.RemoveAll(tmpdir)
cfg := &kubeadmapi.InitConfiguration{

View File

@ -27,6 +27,7 @@ go_library(
"//pkg/registry/core/service/ipallocator:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/validation:go_default_library",
"//staging/src/k8s.io/client-go/util/cert:go_default_library",
"//vendor/github.com/pkg/errors:go_default_library",
],
)

View File

@ -25,6 +25,7 @@ import (
"path/filepath"
"time"
"github.com/pkg/errors"
"k8s.io/apimachinery/pkg/util/validation"
certutil "k8s.io/client-go/util/cert"
kubeadmapi "k8s.io/kubernetes/cmd/kubeadm/app/apis/kubeadm"
@ -304,14 +305,19 @@ func GetAPIServerAltNames(cfg *kubeadmapi.InitConfiguration) (*certutil.AltNames
}
// GetEtcdAltNames builds an AltNames object for generating the etcd server certificate.
// `localhost` is included in the SAN since this is the interface the etcd static pod listens on.
// Hostname and `API.AdvertiseAddress` are excluded since etcd does not listen on this interface by default.
// `advertise address` and localhost are included in the SAN since this is the interfaces the etcd static pod listens on.
// The user can override the listen address with `Etcd.ExtraArgs` and add SANs with `Etcd.ServerCertSANs`.
func GetEtcdAltNames(cfg *kubeadmapi.InitConfiguration) (*certutil.AltNames, error) {
// advertise address
advertiseAddress := net.ParseIP(cfg.APIEndpoint.AdvertiseAddress)
if advertiseAddress == nil {
return nil, errors.Errorf("error parsing APIEndpoint AdvertiseAddress %q: is not a valid textual representation of an IP address", cfg.APIEndpoint.AdvertiseAddress)
}
// create AltNames with defaults DNSNames/IPs
altNames := &certutil.AltNames{
DNSNames: []string{cfg.NodeRegistration.Name, "localhost"},
IPs: []net.IP{net.IPv4(127, 0, 0, 1), net.IPv6loopback},
IPs: []net.IP{advertiseAddress, net.IPv4(127, 0, 0, 1), net.IPv6loopback},
}
if cfg.Etcd.Local != nil {
@ -322,8 +328,7 @@ func GetEtcdAltNames(cfg *kubeadmapi.InitConfiguration) (*certutil.AltNames, err
}
// GetEtcdPeerAltNames builds an AltNames object for generating the etcd peer certificate.
// `localhost` is excluded from the SAN since etcd will not refer to itself as a peer.
// Hostname and `API.AdvertiseAddress` are included if the user chooses to promote the single node etcd cluster into a multi-node one.
// Hostname and `API.AdvertiseAddress` are included if the user chooses to promote the single node etcd cluster into a multi-node one (stacked etcd).
// The user can override the listen address with `Etcd.ExtraArgs` and add SANs with `Etcd.PeerCertSANs`.
func GetEtcdPeerAltNames(cfg *kubeadmapi.InitConfiguration) (*certutil.AltNames, error) {
// advertise address

View File

@ -513,6 +513,12 @@ func TestGetEtcdAltNames(t *testing.T) {
proxy := "user-etcd-proxy"
proxyIP := "10.10.10.100"
cfg := &kubeadmapi.InitConfiguration{
APIEndpoint: kubeadmapi.APIEndpoint{
AdvertiseAddress: "1.2.3.4",
},
NodeRegistration: kubeadmapi.NodeRegistrationOptions{
Name: "myNode",
},
ClusterConfiguration: kubeadmapi.ClusterConfiguration{
Etcd: kubeadmapi.Etcd{
Local: &kubeadmapi.LocalEtcd{
@ -532,7 +538,7 @@ func TestGetEtcdAltNames(t *testing.T) {
t.Fatalf("failed calling GetEtcdAltNames: %v", err)
}
expectedDNSNames := []string{"localhost", proxy}
expectedDNSNames := []string{"myNode", "localhost", proxy}
for _, DNSName := range expectedDNSNames {
found := false
for _, val := range altNames.DNSNames {
@ -547,7 +553,7 @@ func TestGetEtcdAltNames(t *testing.T) {
}
}
expectedIPAddresses := []string{"127.0.0.1", net.IPv6loopback.String(), proxyIP}
expectedIPAddresses := []string{"1.2.3.4", "127.0.0.1", net.IPv6loopback.String(), proxyIP}
for _, IPAddress := range expectedIPAddresses {
found := false
for _, val := range altNames.IPs {

View File

@ -13,6 +13,7 @@ go_test(
deps = [
"//cmd/kubeadm/app/apis/kubeadm:go_default_library",
"//cmd/kubeadm/app/constants:go_default_library",
"//cmd/kubeadm/app/util/etcd:go_default_library",
"//cmd/kubeadm/test:go_default_library",
],
)
@ -26,9 +27,12 @@ go_library(
"//cmd/kubeadm/app/constants:go_default_library",
"//cmd/kubeadm/app/images:go_default_library",
"//cmd/kubeadm/app/util:go_default_library",
"//cmd/kubeadm/app/util/etcd:go_default_library",
"//cmd/kubeadm/app/util/staticpod:go_default_library",
"//staging/src/k8s.io/api/core/v1:go_default_library",
"//staging/src/k8s.io/client-go/kubernetes:go_default_library",
"//vendor/github.com/golang/glog:go_default_library",
"//vendor/github.com/pkg/errors:go_default_library",
],
)

View File

@ -19,14 +19,17 @@ package etcd
import (
"fmt"
"path/filepath"
"strings"
"github.com/golang/glog"
"github.com/pkg/errors"
"k8s.io/api/core/v1"
clientset "k8s.io/client-go/kubernetes"
kubeadmapi "k8s.io/kubernetes/cmd/kubeadm/app/apis/kubeadm"
kubeadmconstants "k8s.io/kubernetes/cmd/kubeadm/app/constants"
"k8s.io/kubernetes/cmd/kubeadm/app/images"
kubeadmutil "k8s.io/kubernetes/cmd/kubeadm/app/util"
etcdutil "k8s.io/kubernetes/cmd/kubeadm/app/util/etcd"
staticpodutil "k8s.io/kubernetes/cmd/kubeadm/app/util/staticpod"
)
@ -36,13 +39,70 @@ const (
)
// CreateLocalEtcdStaticPodManifestFile will write local etcd static pod manifest file.
// This function is used by init - when the etcd cluster is empty - or by kubeadm
// upgrade - when the etcd cluster is already up and running (and the --initial-cluster flag have no impact)
func CreateLocalEtcdStaticPodManifestFile(manifestDir string, cfg *kubeadmapi.InitConfiguration) error {
if cfg.ClusterConfiguration.Etcd.External != nil {
return fmt.Errorf("etcd static pod manifest cannot be generated for cluster using external etcd")
}
glog.V(1).Infoln("creating local etcd static pod manifest file")
// gets etcd StaticPodSpec, actualized for the current InitConfiguration
spec := GetEtcdPodSpec(cfg)
// gets etcd StaticPodSpec
emptyInitialCluster := []etcdutil.Member{}
spec := GetEtcdPodSpec(cfg, emptyInitialCluster)
// writes etcd StaticPod to disk
if err := staticpodutil.WriteStaticPodToDisk(kubeadmconstants.Etcd, manifestDir, spec); err != nil {
return err
}
fmt.Printf("[etcd] Wrote Static Pod manifest for a local etcd instance to %q\n", kubeadmconstants.GetStaticPodFilepath(kubeadmconstants.Etcd, manifestDir))
return nil
}
// CheckLocalEtcdClusterStatus verifies health state of local/stacked etcd cluster before installing a new etcd member
func CheckLocalEtcdClusterStatus(client clientset.Interface, cfg *kubeadmapi.InitConfiguration) error {
fmt.Println("[etcd] Checking Etcd cluster health")
// creates an etcd client that connects to all the local/stacked etcd members
glog.V(1).Info("creating etcd client that connects to etcd pods")
etcdClient, err := etcdutil.NewFromCluster(client, cfg.CertificatesDir)
if err != nil {
return err
}
// Checking health state
_, err = etcdClient.GetClusterStatus()
if err != nil {
return errors.Wrap(err, "etcd cluster is not healthy")
}
return nil
}
// CreateStackedEtcdStaticPodManifestFile will write local etcd static pod manifest file
// for an additional etcd member that is joining an existing local/stacked etcd cluster.
// Other members of the etcd cluster will be notified of the joining node in beforehand as well.
func CreateStackedEtcdStaticPodManifestFile(client clientset.Interface, manifestDir string, cfg *kubeadmapi.InitConfiguration) error {
// creates an etcd client that connects to all the local/stacked etcd members
glog.V(1).Info("creating etcd client that connects to etcd pods")
etcdClient, err := etcdutil.NewFromCluster(client, cfg.CertificatesDir)
if err != nil {
return err
}
// notifies the other members of the etcd cluster about the joining member
etcdPeerAddress := fmt.Sprintf("https://%s:%d", cfg.APIEndpoint.AdvertiseAddress, kubeadmconstants.EtcdListenPeerPort)
glog.V(1).Infof("Adding etcd member: %s", etcdPeerAddress)
initialCluster, err := etcdClient.AddMember(cfg.NodeRegistration.Name, etcdPeerAddress)
if err != nil {
return err
}
fmt.Println("[etcd] Announced new etcd member joining to the existing etcd cluster")
glog.V(1).Infof("Updated etcd member list: %v", initialCluster)
glog.V(1).Info("Creating local etcd static pod manifest file")
// gets etcd StaticPodSpec, actualized for the current InitConfiguration and the new list of etcd members
spec := GetEtcdPodSpec(cfg, initialCluster)
// writes etcd StaticPod to disk
if err := staticpodutil.WriteStaticPodToDisk(kubeadmconstants.Etcd, manifestDir, spec); err != nil {
return err
@ -54,7 +114,7 @@ func CreateLocalEtcdStaticPodManifestFile(manifestDir string, cfg *kubeadmapi.In
// GetEtcdPodSpec returns the etcd static Pod actualized to the context of the current InitConfiguration
// NB. GetEtcdPodSpec methods holds the information about how kubeadm creates etcd static pod manifests.
func GetEtcdPodSpec(cfg *kubeadmapi.InitConfiguration) v1.Pod {
func GetEtcdPodSpec(cfg *kubeadmapi.InitConfiguration, initialCluster []etcdutil.Member) v1.Pod {
pathType := v1.HostPathDirectoryOrCreate
etcdMounts := map[string]v1.Volume{
etcdVolumeName: staticpodutil.NewVolume(etcdVolumeName, cfg.Etcd.Local.DataDir, &pathType),
@ -62,7 +122,7 @@ func GetEtcdPodSpec(cfg *kubeadmapi.InitConfiguration) v1.Pod {
}
return staticpodutil.ComponentPod(v1.Container{
Name: kubeadmconstants.Etcd,
Command: getEtcdCommand(cfg),
Command: getEtcdCommand(cfg, initialCluster),
Image: images.GetEtcdImage(&cfg.ClusterConfiguration),
ImagePullPolicy: v1.PullIfNotPresent,
// Mount the etcd datadir path read-write so etcd can store data in a more persistent manner
@ -78,13 +138,13 @@ func GetEtcdPodSpec(cfg *kubeadmapi.InitConfiguration) v1.Pod {
}
// getEtcdCommand builds the right etcd command from the given config object
func getEtcdCommand(cfg *kubeadmapi.InitConfiguration) []string {
func getEtcdCommand(cfg *kubeadmapi.InitConfiguration, initialCluster []etcdutil.Member) []string {
defaultArguments := map[string]string{
"name": cfg.GetNodeName(),
"listen-client-urls": fmt.Sprintf("https://127.0.0.1:%d", kubeadmconstants.EtcdListenClientPort),
"advertise-client-urls": fmt.Sprintf("https://127.0.0.1:%d", kubeadmconstants.EtcdListenClientPort),
"listen-peer-urls": fmt.Sprintf("https://127.0.0.1:%d", kubeadmconstants.EtcdListenPeerPort),
"initial-advertise-peer-urls": fmt.Sprintf("https://127.0.0.1:%d", kubeadmconstants.EtcdListenPeerPort),
"listen-client-urls": fmt.Sprintf("https://127.0.0.1:%d,https://%s:%d", kubeadmconstants.EtcdListenClientPort, cfg.APIEndpoint.AdvertiseAddress, kubeadmconstants.EtcdListenClientPort),
"advertise-client-urls": fmt.Sprintf("https://%s:%d", cfg.APIEndpoint.AdvertiseAddress, kubeadmconstants.EtcdListenClientPort),
"listen-peer-urls": fmt.Sprintf("https://%s:%d", cfg.APIEndpoint.AdvertiseAddress, kubeadmconstants.EtcdListenPeerPort),
"initial-advertise-peer-urls": fmt.Sprintf("https://%s:%d", cfg.APIEndpoint.AdvertiseAddress, kubeadmconstants.EtcdListenPeerPort),
"data-dir": cfg.Etcd.Local.DataDir,
"cert-file": filepath.Join(cfg.CertificatesDir, kubeadmconstants.EtcdServerCertName),
"key-file": filepath.Join(cfg.CertificatesDir, kubeadmconstants.EtcdServerKeyName),
@ -95,7 +155,19 @@ func getEtcdCommand(cfg *kubeadmapi.InitConfiguration) []string {
"peer-trusted-ca-file": filepath.Join(cfg.CertificatesDir, kubeadmconstants.EtcdCACertName),
"peer-client-cert-auth": "true",
"snapshot-count": "10000",
"initial-cluster": fmt.Sprintf("%s=https://127.0.0.1:%d", cfg.GetNodeName(), kubeadmconstants.EtcdListenPeerPort),
}
if len(initialCluster) == 0 {
defaultArguments["initial-cluster"] = fmt.Sprintf("%s=https://%s:%d", cfg.GetNodeName(), cfg.APIEndpoint.AdvertiseAddress, kubeadmconstants.EtcdListenPeerPort)
} else {
// NB. the joining etcd instance should be part of the initialCluster list
endpoints := []string{}
for _, member := range initialCluster {
endpoints = append(endpoints, fmt.Sprintf("%s=%s", member.Name, member.PeerURL))
}
defaultArguments["initial-cluster"] = strings.Join(endpoints, ",")
defaultArguments["initial-cluster-state"] = "existing"
}
command := []string{"etcd"}

View File

@ -26,12 +26,11 @@ import (
kubeadmapi "k8s.io/kubernetes/cmd/kubeadm/app/apis/kubeadm"
kubeadmconstants "k8s.io/kubernetes/cmd/kubeadm/app/constants"
etcdutil "k8s.io/kubernetes/cmd/kubeadm/app/util/etcd"
testutil "k8s.io/kubernetes/cmd/kubeadm/test"
)
func TestGetEtcdPodSpec(t *testing.T) {
// Creates a Master Configuration
cfg := &kubeadmapi.InitConfiguration{
ClusterConfiguration: kubeadmapi.ClusterConfiguration{
@ -46,7 +45,7 @@ func TestGetEtcdPodSpec(t *testing.T) {
}
// Executes GetEtcdPodSpec
spec := GetEtcdPodSpec(cfg)
spec := GetEtcdPodSpec(cfg, []etcdutil.Member{})
// Assert each specs refers to the right pod
if spec.Spec.Containers[0].Name != kubeadmconstants.Etcd {
@ -117,11 +116,17 @@ func TestCreateLocalEtcdStaticPodManifestFile(t *testing.T) {
func TestGetEtcdCommand(t *testing.T) {
var tests = []struct {
cfg *kubeadmapi.InitConfiguration
expected []string
name string
cfg *kubeadmapi.InitConfiguration
initialCluster []etcdutil.Member
expected []string
}{
{
name: "Default args - with empty etcd initial cluster",
cfg: &kubeadmapi.InitConfiguration{
APIEndpoint: kubeadmapi.APIEndpoint{
AdvertiseAddress: "1.2.3.4",
},
NodeRegistration: kubeadmapi.NodeRegistrationOptions{
Name: "foo",
},
@ -136,10 +141,10 @@ func TestGetEtcdCommand(t *testing.T) {
expected: []string{
"etcd",
"--name=foo",
fmt.Sprintf("--listen-client-urls=https://127.0.0.1:%d", kubeadmconstants.EtcdListenClientPort),
fmt.Sprintf("--advertise-client-urls=https://127.0.0.1:%d", kubeadmconstants.EtcdListenClientPort),
fmt.Sprintf("--listen-peer-urls=https://127.0.0.1:%d", kubeadmconstants.EtcdListenPeerPort),
fmt.Sprintf("--initial-advertise-peer-urls=https://127.0.0.1:%d", kubeadmconstants.EtcdListenPeerPort),
fmt.Sprintf("--listen-client-urls=https://127.0.0.1:%d,https://1.2.3.4:%d", kubeadmconstants.EtcdListenClientPort, kubeadmconstants.EtcdListenClientPort),
fmt.Sprintf("--advertise-client-urls=https://1.2.3.4:%d", kubeadmconstants.EtcdListenClientPort),
fmt.Sprintf("--listen-peer-urls=https://1.2.3.4:%d", kubeadmconstants.EtcdListenPeerPort),
fmt.Sprintf("--initial-advertise-peer-urls=https://1.2.3.4:%d", kubeadmconstants.EtcdListenPeerPort),
"--data-dir=/var/lib/etcd",
"--cert-file=" + kubeadmconstants.EtcdServerCertName,
"--key-file=" + kubeadmconstants.EtcdServerKeyName,
@ -150,11 +155,57 @@ func TestGetEtcdCommand(t *testing.T) {
"--peer-trusted-ca-file=" + kubeadmconstants.EtcdCACertName,
"--snapshot-count=10000",
"--peer-client-cert-auth=true",
fmt.Sprintf("--initial-cluster=foo=https://127.0.0.1:%d", kubeadmconstants.EtcdListenPeerPort),
fmt.Sprintf("--initial-cluster=foo=https://1.2.3.4:%d", kubeadmconstants.EtcdListenPeerPort),
},
},
{
name: "Default args - With an existing etcd cluster",
cfg: &kubeadmapi.InitConfiguration{
APIEndpoint: kubeadmapi.APIEndpoint{
AdvertiseAddress: "1.2.3.4",
},
NodeRegistration: kubeadmapi.NodeRegistrationOptions{
Name: "foo",
},
ClusterConfiguration: kubeadmapi.ClusterConfiguration{
Etcd: kubeadmapi.Etcd{
Local: &kubeadmapi.LocalEtcd{
DataDir: "/var/lib/etcd",
},
},
},
},
initialCluster: []etcdutil.Member{
{Name: "foo", PeerURL: fmt.Sprintf("https://1.2.3.4:%d", kubeadmconstants.EtcdListenPeerPort)}, // NB. the joining etcd instance should be part of the initialCluster list
{Name: "bar", PeerURL: fmt.Sprintf("https://5.6.7.8:%d", kubeadmconstants.EtcdListenPeerPort)},
},
expected: []string{
"etcd",
"--name=foo",
fmt.Sprintf("--listen-client-urls=https://127.0.0.1:%d,https://1.2.3.4:%d", kubeadmconstants.EtcdListenClientPort, kubeadmconstants.EtcdListenClientPort),
fmt.Sprintf("--advertise-client-urls=https://1.2.3.4:%d", kubeadmconstants.EtcdListenClientPort),
fmt.Sprintf("--listen-peer-urls=https://1.2.3.4:%d", kubeadmconstants.EtcdListenPeerPort),
fmt.Sprintf("--initial-advertise-peer-urls=https://1.2.3.4:%d", kubeadmconstants.EtcdListenPeerPort),
"--data-dir=/var/lib/etcd",
"--cert-file=" + kubeadmconstants.EtcdServerCertName,
"--key-file=" + kubeadmconstants.EtcdServerKeyName,
"--trusted-ca-file=" + kubeadmconstants.EtcdCACertName,
"--client-cert-auth=true",
"--peer-cert-file=" + kubeadmconstants.EtcdPeerCertName,
"--peer-key-file=" + kubeadmconstants.EtcdPeerKeyName,
"--peer-trusted-ca-file=" + kubeadmconstants.EtcdCACertName,
"--snapshot-count=10000",
"--peer-client-cert-auth=true",
"--initial-cluster-state=existing",
fmt.Sprintf("--initial-cluster=foo=https://1.2.3.4:%d,bar=https://5.6.7.8:%d", kubeadmconstants.EtcdListenPeerPort, kubeadmconstants.EtcdListenPeerPort),
},
},
{
name: "Extra args",
cfg: &kubeadmapi.InitConfiguration{
APIEndpoint: kubeadmapi.APIEndpoint{
AdvertiseAddress: "1.2.3.4",
},
NodeRegistration: kubeadmapi.NodeRegistrationOptions{
Name: "bar",
},
@ -175,8 +226,8 @@ func TestGetEtcdCommand(t *testing.T) {
"--name=bar",
"--listen-client-urls=https://10.0.1.10:2379",
"--advertise-client-urls=https://10.0.1.10:2379",
fmt.Sprintf("--listen-peer-urls=https://127.0.0.1:%d", kubeadmconstants.EtcdListenPeerPort),
fmt.Sprintf("--initial-advertise-peer-urls=https://127.0.0.1:%d", kubeadmconstants.EtcdListenPeerPort),
fmt.Sprintf("--listen-peer-urls=https://1.2.3.4:%d", kubeadmconstants.EtcdListenPeerPort),
fmt.Sprintf("--initial-advertise-peer-urls=https://1.2.3.4:%d", kubeadmconstants.EtcdListenPeerPort),
"--data-dir=/var/lib/etcd",
"--cert-file=" + kubeadmconstants.EtcdServerCertName,
"--key-file=" + kubeadmconstants.EtcdServerKeyName,
@ -187,50 +238,19 @@ func TestGetEtcdCommand(t *testing.T) {
"--peer-trusted-ca-file=" + kubeadmconstants.EtcdCACertName,
"--snapshot-count=10000",
"--peer-client-cert-auth=true",
fmt.Sprintf("--initial-cluster=bar=https://127.0.0.1:%d", kubeadmconstants.EtcdListenPeerPort),
},
},
{
cfg: &kubeadmapi.InitConfiguration{
NodeRegistration: kubeadmapi.NodeRegistrationOptions{
Name: "wombat",
},
ClusterConfiguration: kubeadmapi.ClusterConfiguration{
Etcd: kubeadmapi.Etcd{
Local: &kubeadmapi.LocalEtcd{
DataDir: "/etc/foo",
},
},
},
},
expected: []string{
"etcd",
"--name=wombat",
fmt.Sprintf("--listen-client-urls=https://127.0.0.1:%d", kubeadmconstants.EtcdListenClientPort),
fmt.Sprintf("--advertise-client-urls=https://127.0.0.1:%d", kubeadmconstants.EtcdListenClientPort),
fmt.Sprintf("--listen-peer-urls=https://127.0.0.1:%d", kubeadmconstants.EtcdListenPeerPort),
fmt.Sprintf("--initial-advertise-peer-urls=https://127.0.0.1:%d", kubeadmconstants.EtcdListenPeerPort),
"--data-dir=/etc/foo",
"--cert-file=" + kubeadmconstants.EtcdServerCertName,
"--key-file=" + kubeadmconstants.EtcdServerKeyName,
"--trusted-ca-file=" + kubeadmconstants.EtcdCACertName,
"--client-cert-auth=true",
"--peer-cert-file=" + kubeadmconstants.EtcdPeerCertName,
"--peer-key-file=" + kubeadmconstants.EtcdPeerKeyName,
"--peer-trusted-ca-file=" + kubeadmconstants.EtcdCACertName,
"--snapshot-count=10000",
"--peer-client-cert-auth=true",
fmt.Sprintf("--initial-cluster=wombat=https://127.0.0.1:%d", kubeadmconstants.EtcdListenPeerPort),
fmt.Sprintf("--initial-cluster=bar=https://1.2.3.4:%d", kubeadmconstants.EtcdListenPeerPort),
},
},
}
for _, rt := range tests {
actual := getEtcdCommand(rt.cfg)
sort.Strings(actual)
sort.Strings(rt.expected)
if !reflect.DeepEqual(actual, rt.expected) {
t.Errorf("failed getEtcdCommand:\nexpected:\n%v\nsaw:\n%v", rt.expected, actual)
}
t.Run(rt.name, func(t *testing.T) {
actual := getEtcdCommand(rt.cfg, rt.initialCluster)
sort.Strings(actual)
sort.Strings(rt.expected)
if !reflect.DeepEqual(actual, rt.expected) {
t.Errorf("failed getEtcdCommand:\nexpected:\n%v\nsaw:\n%v", rt.expected, actual)
}
})
}
}

View File

@ -107,6 +107,12 @@ func (f fakeEtcdClient) GetClusterVersions() (map[string]string, error) {
}, nil
}
func (f fakeEtcdClient) Sync() error { return nil }
func (f fakeEtcdClient) AddMember(name string, peerAddrs string) ([]etcdutil.Member, error) {
return []etcdutil.Member{}, nil
}
func TestGetAvailableUpgrades(t *testing.T) {
etcdClient := fakeEtcdClient{}
tests := []struct {

View File

@ -23,8 +23,8 @@ import (
"time"
"github.com/pkg/errors"
"k8s.io/apimachinery/pkg/util/version"
clientset "k8s.io/client-go/kubernetes"
kubeadmapi "k8s.io/kubernetes/cmd/kubeadm/app/apis/kubeadm"
"k8s.io/kubernetes/cmd/kubeadm/app/constants"
certsphase "k8s.io/kubernetes/cmd/kubeadm/app/phases/certs"
@ -252,7 +252,7 @@ func upgradeComponent(component string, waiter apiclient.Waiter, pathMgr StaticP
}
// performEtcdStaticPodUpgrade performs upgrade of etcd, it returns bool which indicates fatal error or not and the actual error.
func performEtcdStaticPodUpgrade(waiter apiclient.Waiter, pathMgr StaticPodPathManager, cfg *kubeadmapi.InitConfiguration, recoverManifests map[string]string, isTLSUpgrade bool, oldEtcdClient, newEtcdClient etcdutil.ClusterInterrogator) (bool, error) {
func performEtcdStaticPodUpgrade(client clientset.Interface, waiter apiclient.Waiter, pathMgr StaticPodPathManager, cfg *kubeadmapi.InitConfiguration, recoverManifests map[string]string, isTLSUpgrade bool, oldEtcdClient, newEtcdClient etcdutil.ClusterInterrogator) (bool, error) {
// Add etcd static pod spec only if external etcd is not configured
if cfg.Etcd.External != nil {
return false, errors.New("external etcd detected, won't try to change any etcd state")
@ -276,10 +276,18 @@ func performEtcdStaticPodUpgrade(waiter apiclient.Waiter, pathMgr StaticPodPathM
if err != nil {
return true, errors.Wrap(err, "failed to retrieve an etcd version for the target kubernetes version")
}
currentEtcdVersionStr, err := oldEtcdClient.GetVersion()
// gets the etcd version of the local/stacked etcd member running on the current machine
currentEtcdVersions, err := oldEtcdClient.GetClusterVersions()
if err != nil {
return true, errors.Wrap(err, "failed to retrieve the current etcd version")
}
currentEtcdVersionStr, ok := currentEtcdVersions[fmt.Sprintf("https://%s:%d", cfg.APIEndpoint.AdvertiseAddress, constants.EtcdListenClientPort)]
if !ok {
fmt.Println(currentEtcdVersions)
return true, errors.Wrap(err, "failed to retrieve the current etcd version")
}
currentEtcdVersion, err := version.ParseSemantic(currentEtcdVersionStr)
if err != nil {
return true, fmt.Errorf("failed to parse the current etcd version(%s): %v", currentEtcdVersionStr, err)
@ -353,15 +361,11 @@ func performEtcdStaticPodUpgrade(waiter apiclient.Waiter, pathMgr StaticPodPathM
// Initialize the new etcd client if it wasn't pre-initialized
if newEtcdClient == nil {
client, err := etcdutil.NewFromStaticPod(
[]string{fmt.Sprintf("localhost:%d", constants.EtcdListenClientPort)},
constants.GetStaticPodDirectory(),
cfg.CertificatesDir,
)
etcdClient, err := etcdutil.NewFromCluster(client, cfg.CertificatesDir)
if err != nil {
return true, errors.Wrap(err, "fatal error creating etcd client")
}
newEtcdClient = client
newEtcdClient = etcdClient
}
// Checking health state of etcd after the upgrade
@ -399,7 +403,7 @@ func performEtcdStaticPodUpgrade(waiter apiclient.Waiter, pathMgr StaticPodPathM
}
// StaticPodControlPlane upgrades a static pod-hosted control plane
func StaticPodControlPlane(waiter apiclient.Waiter, pathMgr StaticPodPathManager, cfg *kubeadmapi.InitConfiguration, etcdUpgrade bool, oldEtcdClient, newEtcdClient etcdutil.ClusterInterrogator) error {
func StaticPodControlPlane(client clientset.Interface, waiter apiclient.Waiter, pathMgr StaticPodPathManager, cfg *kubeadmapi.InitConfiguration, etcdUpgrade bool, oldEtcdClient, newEtcdClient etcdutil.ClusterInterrogator) error {
recoverManifests := map[string]string{}
var isTLSUpgrade bool
var isExternalEtcd bool
@ -413,7 +417,7 @@ func StaticPodControlPlane(waiter apiclient.Waiter, pathMgr StaticPodPathManager
if cfg.Etcd.External != nil {
// External etcd
isExternalEtcd = true
client, err := etcdutil.New(
etcdClient, err := etcdutil.New(
cfg.Etcd.External.Endpoints,
cfg.Etcd.External.CAFile,
cfg.Etcd.External.CertFile,
@ -422,22 +426,18 @@ func StaticPodControlPlane(waiter apiclient.Waiter, pathMgr StaticPodPathManager
if err != nil {
return errors.Wrap(err, "failed to create etcd client for external etcd")
}
oldEtcdClient = client
oldEtcdClient = etcdClient
// Since etcd is managed externally, the new etcd client will be the same as the old client
if newEtcdClient == nil {
newEtcdClient = client
newEtcdClient = etcdClient
}
} else {
// etcd Static Pod
client, err := etcdutil.NewFromStaticPod(
[]string{fmt.Sprintf("localhost:%d", constants.EtcdListenClientPort)},
constants.GetStaticPodDirectory(),
cfg.CertificatesDir,
)
etcdClient, err := etcdutil.NewFromCluster(client, cfg.CertificatesDir)
if err != nil {
return errors.Wrap(err, "failed to create etcd client")
}
oldEtcdClient = client
oldEtcdClient = etcdClient
}
}
@ -452,7 +452,7 @@ func StaticPodControlPlane(waiter apiclient.Waiter, pathMgr StaticPodPathManager
}
// Perform etcd upgrade using common to all control plane components function
fatal, err := performEtcdStaticPodUpgrade(waiter, pathMgr, cfg, recoverManifests, isTLSUpgrade, oldEtcdClient, newEtcdClient)
fatal, err := performEtcdStaticPodUpgrade(client, waiter, pathMgr, cfg, recoverManifests, isTLSUpgrade, oldEtcdClient, newEtcdClient)
if err != nil {
if fatal {
return err

View File

@ -54,12 +54,13 @@ kind: InitConfiguration
nodeRegistration:
name: foo
criSocket: ""
apiEndpoint:
advertiseAddress: 1.2.3.4
bindPort: 6443
---
apiVersion: kubeadm.k8s.io/v1beta1
kind: ClusterConfiguration
api:
advertiseAddress: 1.2.3.4
bindPort: 6443
apiServerCertSANs: null
apiServerExtraArgs: null
certificatesDir: %s
@ -75,9 +76,6 @@ networking:
dnsDomain: cluster.local
podSubnet: ""
serviceSubnet: 10.96.0.0/12
nodeRegistration:
name: foo
criSocket: ""
schedulerExtraArgs: null
token: ce3aa5.5ec8455bb76b379f
tokenTTL: 24h
@ -236,14 +234,14 @@ func (c fakeTLSEtcdClient) WaitForClusterAvailable(delay time.Duration, retries
func (c fakeTLSEtcdClient) GetClusterStatus() (map[string]*clientv3.StatusResponse, error) {
return map[string]*clientv3.StatusResponse{
"foo": {
"https://1.2.3.4:2379": {
Version: "3.1.12",
}}, nil
}
func (c fakeTLSEtcdClient) GetClusterVersions() (map[string]string, error) {
return map[string]string{
"foo": "3.1.12",
"https://1.2.3.4:2379": "3.1.12",
}, nil
}
@ -251,6 +249,12 @@ func (c fakeTLSEtcdClient) GetVersion() (string, error) {
return "3.1.12", nil
}
func (c fakeTLSEtcdClient) Sync() error { return nil }
func (c fakeTLSEtcdClient) AddMember(name string, peerAddrs string) ([]etcdutil.Member, error) {
return []etcdutil.Member{}, nil
}
type fakePodManifestEtcdClient struct{ ManifestDir, CertificatesDir string }
func (c fakePodManifestEtcdClient) HasTLS() bool {
@ -277,13 +281,13 @@ func (c fakePodManifestEtcdClient) GetClusterStatus() (map[string]*clientv3.Stat
}
return map[string]*clientv3.StatusResponse{
"foo": {Version: "3.1.12"},
"https://1.2.3.4:2379": {Version: "3.1.12"},
}, nil
}
func (c fakePodManifestEtcdClient) GetClusterVersions() (map[string]string, error) {
return map[string]string{
"foo": "3.1.12",
"https://1.2.3.4:2379": "3.1.12",
}, nil
}
@ -291,6 +295,12 @@ func (c fakePodManifestEtcdClient) GetVersion() (string, error) {
return "3.1.12", nil
}
func (c fakePodManifestEtcdClient) Sync() error { return nil }
func (c fakePodManifestEtcdClient) AddMember(name string, peerAddrs string) ([]etcdutil.Member, error) {
return []etcdutil.Member{}, nil
}
func TestStaticPodControlPlane(t *testing.T) {
tests := []struct {
description string
@ -477,6 +487,7 @@ func TestStaticPodControlPlane(t *testing.T) {
}
actualErr := StaticPodControlPlane(
nil,
waiter,
pathMgr,
newcfg,

View File

@ -12,16 +12,13 @@ go_library(
importpath = "k8s.io/kubernetes/cmd/kubeadm/app/phases/uploadconfig",
deps = [
"//cmd/kubeadm/app/apis/kubeadm:go_default_library",
"//cmd/kubeadm/app/apis/kubeadm/scheme:go_default_library",
"//cmd/kubeadm/app/constants:go_default_library",
"//cmd/kubeadm/app/util/apiclient:go_default_library",
"//cmd/kubeadm/app/util/config:go_default_library",
"//pkg/apis/rbac/v1:go_default_library",
"//staging/src/k8s.io/api/core/v1:go_default_library",
"//staging/src/k8s.io/api/rbac/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/api/errors:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/runtime:go_default_library",
"//staging/src/k8s.io/client-go/kubernetes:go_default_library",
],
)
@ -48,13 +45,10 @@ go_test(
"//cmd/kubeadm/app/apis/kubeadm/scheme:go_default_library",
"//cmd/kubeadm/app/apis/kubeadm/v1beta1:go_default_library",
"//cmd/kubeadm/app/constants:go_default_library",
"//cmd/kubeadm/app/util/apiclient:go_default_library",
"//cmd/kubeadm/app/util/config:go_default_library",
"//staging/src/k8s.io/api/core/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/api/errors:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/runtime:go_default_library",
"//staging/src/k8s.io/client-go/kubernetes:go_default_library",
"//staging/src/k8s.io/client-go/kubernetes/fake:go_default_library",
"//staging/src/k8s.io/client-go/testing:go_default_library",
],

View File

@ -21,12 +21,9 @@ import (
"k8s.io/api/core/v1"
rbac "k8s.io/api/rbac/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
clientset "k8s.io/client-go/kubernetes"
kubeadmapi "k8s.io/kubernetes/cmd/kubeadm/app/apis/kubeadm"
kubeadmscheme "k8s.io/kubernetes/cmd/kubeadm/app/apis/kubeadm/scheme"
kubeadmconstants "k8s.io/kubernetes/cmd/kubeadm/app/constants"
"k8s.io/kubernetes/cmd/kubeadm/app/util/apiclient"
configutil "k8s.io/kubernetes/cmd/kubeadm/app/util/config"
@ -59,7 +56,7 @@ func UploadConfiguration(cfg *kubeadmapi.InitConfiguration, client clientset.Int
// Prepare the ClusterStatus for upload
// Gets the current cluster status
// TODO: use configmap locks on this object on the get before the update.
clusterStatus, err := getClusterStatus(client)
clusterStatus, err := configutil.GetClusterStatus(client)
if err != nil {
return err
}
@ -129,22 +126,3 @@ func UploadConfiguration(cfg *kubeadmapi.InitConfiguration, client clientset.Int
},
})
}
func getClusterStatus(client clientset.Interface) (*kubeadmapi.ClusterStatus, error) {
obj := &kubeadmapi.ClusterStatus{}
// Read the ConfigMap from the cluster
configMap, err := client.CoreV1().ConfigMaps(metav1.NamespaceSystem).Get(kubeadmconstants.KubeadmConfigConfigMap, metav1.GetOptions{})
if apierrors.IsNotFound(err) {
return obj, nil
}
if err != nil {
return nil, err
}
// Decode the file content using the componentconfig Codecs that knows about all APIs
if err := runtime.DecodeInto(kubeadmscheme.Codecs.UniversalDecoder(), []byte(configMap.Data[kubeadmconstants.ClusterStatusConfigMapKey]), obj); err != nil {
return nil, err
}
return obj, nil
}

View File

@ -20,18 +20,15 @@ import (
"reflect"
"testing"
"k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
clientset "k8s.io/client-go/kubernetes"
clientsetfake "k8s.io/client-go/kubernetes/fake"
core "k8s.io/client-go/testing"
kubeadmapi "k8s.io/kubernetes/cmd/kubeadm/app/apis/kubeadm"
kubeadmscheme "k8s.io/kubernetes/cmd/kubeadm/app/apis/kubeadm/scheme"
kubeadmapiv1beta1 "k8s.io/kubernetes/cmd/kubeadm/app/apis/kubeadm/v1beta1"
kubeadmconstants "k8s.io/kubernetes/cmd/kubeadm/app/constants"
"k8s.io/kubernetes/cmd/kubeadm/app/util/apiclient"
configutil "k8s.io/kubernetes/cmd/kubeadm/app/util/config"
)
@ -158,62 +155,3 @@ func TestUploadConfiguration(t *testing.T) {
})
}
}
func TestGetClusterStatus(t *testing.T) {
var tests = []struct {
name string
clusterStatus *kubeadmapi.ClusterStatus
expectedClusterEndpoints int
}{
{
name: "return empty ClusterStatus if cluster kubeadm-config doesn't exist (e.g init)",
expectedClusterEndpoints: 0,
},
{
name: "return ClusterStatus if cluster kubeadm-config exist (e.g upgrade)",
clusterStatus: &kubeadmapi.ClusterStatus{
APIEndpoints: map[string]kubeadmapi.APIEndpoint{
"dummy": {AdvertiseAddress: "1.2.3.4", BindPort: 1234},
},
},
expectedClusterEndpoints: 1,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
client := clientsetfake.NewSimpleClientset()
if tt.clusterStatus != nil {
createConfigMapWithStatus(tt.clusterStatus, client)
}
actual, err := getClusterStatus(client)
if err != nil {
t.Error("GetClusterStatus returned unexpected error")
return
}
if tt.expectedClusterEndpoints != len(actual.APIEndpoints) {
t.Error("actual ClusterStatus doesn't return expected endpoints")
}
})
}
}
// createConfigMapWithStatus create a ConfigMap with ClusterStatus for TestGetClusterStatus
func createConfigMapWithStatus(statusToCreate *kubeadmapi.ClusterStatus, client clientset.Interface) error {
statusYaml, err := configutil.MarshalKubeadmConfigObject(statusToCreate)
if err != nil {
return err
}
return apiclient.CreateOrUpdateConfigMap(client, &v1.ConfigMap{
ObjectMeta: metav1.ObjectMeta{
Name: kubeadmconstants.KubeadmConfigConfigMap,
Namespace: metav1.NamespaceSystem,
},
Data: map[string]string{
kubeadmconstants.ClusterStatusConfigMapKey: string(statusYaml),
},
})
}

View File

@ -900,6 +900,7 @@ func RunInitMasterChecks(execer utilsexec.Interface, cfg *kubeadmapi.InitConfigu
// Only do etcd related checks when no external endpoints were specified
checks = append(checks,
PortOpenCheck{port: kubeadmconstants.EtcdListenClientPort},
PortOpenCheck{port: kubeadmconstants.EtcdListenPeerPort},
DirAvailableCheck{Path: cfg.Etcd.Local.DataDir},
)
}

View File

@ -25,6 +25,7 @@ go_library(
"//cmd/kubeadm/app/util:go_default_library",
"//pkg/util/node:go_default_library",
"//staging/src/k8s.io/api/core/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/api/errors:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/runtime:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/runtime/schema:go_default_library",
@ -35,6 +36,7 @@ go_library(
"//staging/src/k8s.io/client-go/util/cert:go_default_library",
"//staging/src/k8s.io/cluster-bootstrap/token/util:go_default_library",
"//vendor/github.com/golang/glog:go_default_library",
"//vendor/github.com/pkg/errors:go_default_library",
],
)

View File

@ -18,13 +18,14 @@ package config
import (
"crypto/x509"
"errors"
"fmt"
"io"
"io/ioutil"
"path/filepath"
"strings"
"github.com/pkg/errors"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/util/version"
@ -196,12 +197,8 @@ func getNodeNameFromKubeletConfig(kubeconfigDir string) (string, error) {
// getAPIEndpoint returns the APIEndpoint for the current node
func getAPIEndpoint(data map[string]string, nodeName string, apiEndpoint *kubeadmapi.APIEndpoint) error {
// gets the ClusterStatus from kubeadm-config
clusterStatusData, ok := data[constants.ClusterStatusConfigMapKey]
if !ok {
return fmt.Errorf("unexpected error when reading kubeadm-config ConfigMap: %s key value pair missing", constants.ClusterStatusConfigMapKey)
}
clusterStatus := &kubeadmapi.ClusterStatus{}
if err := runtime.DecodeInto(kubeadmscheme.Codecs.UniversalDecoder(), []byte(clusterStatusData), clusterStatus); err != nil {
clusterStatus, err := unmarshalClusterStatus(data)
if err != nil {
return err
}
@ -232,3 +229,34 @@ func getComponentConfigs(client clientset.Interface, clusterConfiguration *kubea
}
return nil
}
// GetClusterStatus returns the kubeadm cluster status read from the kubeadm-config ConfigMap
func GetClusterStatus(client clientset.Interface) (*kubeadmapi.ClusterStatus, error) {
configMap, err := client.CoreV1().ConfigMaps(metav1.NamespaceSystem).Get(constants.KubeadmConfigConfigMap, metav1.GetOptions{})
if apierrors.IsNotFound(err) {
return &kubeadmapi.ClusterStatus{}, nil
}
if err != nil {
return nil, err
}
clusterStatus, err := unmarshalClusterStatus(configMap.Data)
if err != nil {
return nil, err
}
return clusterStatus, nil
}
func unmarshalClusterStatus(data map[string]string) (*kubeadmapi.ClusterStatus, error) {
clusterStatusData, ok := data[constants.ClusterStatusConfigMapKey]
if !ok {
return nil, errors.Errorf("unexpected error when reading kubeadm-config ConfigMap: %s key value pair missing", constants.ClusterStatusConfigMapKey)
}
clusterStatus := &kubeadmapi.ClusterStatus{}
if err := runtime.DecodeInto(kubeadmscheme.Codecs.UniversalDecoder(), []byte(clusterStatusData), clusterStatus); err != nil {
return nil, err
}
return clusterStatus, nil
}

View File

@ -770,6 +770,94 @@ func TestGetInitConfigurationFromCluster(t *testing.T) {
}
}
func TestGetGetClusterStatus(t *testing.T) {
var tests = []struct {
name string
configMaps []fakeConfigMap
expectedEndpoints int
expectedError bool
}{
{
name: "invalid missing config map",
expectedEndpoints: 0,
},
{
name: "valid v1beta1",
configMaps: []fakeConfigMap{
{
name: kubeadmconstants.KubeadmConfigConfigMap,
data: map[string]string{
kubeadmconstants.ClusterStatusConfigMapKey: string(cfgFiles["ClusterStatus_v1beta1"]),
},
},
},
expectedEndpoints: 1,
},
{
name: "valid v1alpha3",
configMaps: []fakeConfigMap{
{
name: kubeadmconstants.KubeadmConfigConfigMap,
data: map[string]string{
kubeadmconstants.ClusterStatusConfigMapKey: string(cfgFiles["ClusterStatus_v1alpha3"]),
},
},
},
expectedEndpoints: 1,
},
{
name: "invalid missing ClusterStatusConfigMapKey in the config map",
configMaps: []fakeConfigMap{
{
name: kubeadmconstants.KubeadmConfigConfigMap,
data: map[string]string{},
},
},
expectedError: true,
},
{
name: "invalid wrong value in the config map",
configMaps: []fakeConfigMap{
{
name: kubeadmconstants.KubeadmConfigConfigMap,
data: map[string]string{
kubeadmconstants.ClusterStatusConfigMapKey: "not a kubeadm type",
},
},
},
expectedError: true,
},
}
for _, rt := range tests {
t.Run(rt.name, func(t *testing.T) {
client := clientsetfake.NewSimpleClientset()
for _, c := range rt.configMaps {
err := c.create(client)
if err != nil {
t.Errorf("couldn't create ConfigMap %s", c.name)
return
}
}
clusterStatus, err := GetClusterStatus(client)
if rt.expectedError != (err != nil) {
t.Errorf("unexpected return err from GetClusterStatus: %v", err)
return
}
if rt.expectedError {
return
}
// Test expected values in clusterStatus
if len(clusterStatus.APIEndpoints) != rt.expectedEndpoints {
t.Errorf("unexpected ClusterStatus return value")
}
})
}
}
type fakeConfigMap struct {
name string
data map[string]string

View File

@ -8,9 +8,12 @@ go_library(
deps = [
"//cmd/kubeadm/app/apis/kubeadm:go_default_library",
"//cmd/kubeadm/app/constants:go_default_library",
"//cmd/kubeadm/app/util/config:go_default_library",
"//cmd/kubeadm/app/util/staticpod:go_default_library",
"//staging/src/k8s.io/client-go/kubernetes:go_default_library",
"//vendor/github.com/coreos/etcd/clientv3:go_default_library",
"//vendor/github.com/coreos/etcd/pkg/transport:go_default_library",
"//vendor/github.com/golang/glog:go_default_library",
"//vendor/github.com/pkg/errors:go_default_library",
],
)

View File

@ -26,9 +26,12 @@ import (
"github.com/coreos/etcd/clientv3"
"github.com/coreos/etcd/pkg/transport"
"github.com/golang/glog"
"github.com/pkg/errors"
clientset "k8s.io/client-go/kubernetes"
kubeadmapi "k8s.io/kubernetes/cmd/kubeadm/app/apis/kubeadm"
"k8s.io/kubernetes/cmd/kubeadm/app/constants"
"k8s.io/kubernetes/cmd/kubeadm/app/util/config"
"k8s.io/kubernetes/cmd/kubeadm/app/util/staticpod"
)
@ -40,6 +43,8 @@ type ClusterInterrogator interface {
GetVersion() (string, error)
HasTLS() bool
WaitForClusterAvailable(delay time.Duration, retries int, retryInterval time.Duration) (bool, error)
Sync() error
AddMember(name string, peerAddrs string) ([]Member, error)
}
// Client provides connection parameters for an etcd cluster
@ -125,6 +130,108 @@ func NewFromStaticPod(endpoints []string, manifestDir string, certificatesDir st
return New(endpoints, "", "", "")
}
// NewFromCluster creates an etcd client for the the etcd endpoints defined in the ClusterStatus value stored in
// the kubeadm-config ConfigMap in kube-system namespace.
// Once created, the client synchronizes client's endpoints with the known endpoints from the etcd membership API (reality check).
func NewFromCluster(client clientset.Interface, certificatesDir string) (*Client, error) {
// Gets the cluster status
clusterStatus, err := config.GetClusterStatus(client)
if err != nil {
return nil, err
}
// Get the list of etcd endpoints from cluster status
endpoints := []string{}
for _, e := range clusterStatus.APIEndpoints {
endpoints = append(endpoints, fmt.Sprintf("https://%s:%d", e.AdvertiseAddress, constants.EtcdListenClientPort))
}
glog.V(1).Infof("etcd endpoints read from pods: %s", strings.Join(endpoints, ","))
// Creates an etcd client
etcdClient, err := New(
endpoints,
filepath.Join(certificatesDir, constants.EtcdCACertName),
filepath.Join(certificatesDir, constants.EtcdHealthcheckClientCertName),
filepath.Join(certificatesDir, constants.EtcdHealthcheckClientKeyName),
)
if err != nil {
return nil, err
}
// synchronizes client's endpoints with the known endpoints from the etcd membership.
err = etcdClient.Sync()
if err != nil {
return nil, err
}
return etcdClient, nil
}
// Sync synchronizes client's endpoints with the known endpoints from the etcd membership.
func (c Client) Sync() error {
cli, err := clientv3.New(clientv3.Config{
Endpoints: c.Endpoints,
DialTimeout: 20 * time.Second,
TLS: c.TLS,
})
if err != nil {
return err
}
defer cli.Close()
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
err = cli.Sync(ctx)
cancel()
if err != nil {
return err
}
glog.V(1).Infof("etcd endpoints read from etcd: %s", strings.Join(cli.Endpoints(), ","))
c.Endpoints = cli.Endpoints()
return nil
}
// Member struct defines an etcd member; it is used for avoiding to spread github.com/coreos/etcd dependency
// across kubeadm codebase
type Member struct {
Name string
PeerURL string
}
// AddMember notifies an existing etcd cluster that a new member is joining
func (c Client) AddMember(name string, peerAddrs string) ([]Member, error) {
cli, err := clientv3.New(clientv3.Config{
Endpoints: c.Endpoints,
DialTimeout: 20 * time.Second,
TLS: c.TLS,
})
if err != nil {
return nil, err
}
defer cli.Close()
// Adds a new member to the cluster
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
resp, err := cli.MemberAdd(ctx, []string{peerAddrs})
cancel()
if err != nil {
return nil, err
}
// Returns the updated list of etcd members
ret := []Member{}
for _, m := range resp.Members {
// fixes the entry for the joining member (that doesn't have a name set in the initialCluster returned by etcd)
if m.Name == "" {
ret = append(ret, Member{Name: name, PeerURL: m.PeerURLs[0]})
} else {
ret = append(ret, Member{Name: m.Name, PeerURL: m.PeerURLs[0]})
}
}
return ret, nil
}
// GetVersion returns the etcd version of the cluster.
// An error is returned if the version of all endpoints do not match
func (c Client) GetVersion() (string, error) {