diff --git a/cluster/reconcile.go b/cluster/reconcile.go index 7e1467a6..4bf3b1cb 100644 --- a/cluster/reconcile.go +++ b/cluster/reconcile.go @@ -173,46 +173,46 @@ func reconcileHost(ctx context.Context, toDeleteHost *hosts.Host, worker, etcd b } func reconcileEtcd(ctx context.Context, currentCluster, kubeCluster *Cluster, kubeClient *kubernetes.Clientset, svcOptions *v3.KubernetesServicesOptions) error { - etcdToDelete := hosts.GetToDeleteHosts(currentCluster.EtcdHosts, kubeCluster.EtcdHosts, kubeCluster.InactiveHosts, false) - etcdToAdd := hosts.GetToAddHosts(currentCluster.EtcdHosts, kubeCluster.EtcdHosts) - clientCert := cert.EncodeCertPEM(currentCluster.Certificates[pki.KubeNodeCertName].Certificate) - clientKey := cert.EncodePrivateKeyPEM(currentCluster.Certificates[pki.KubeNodeCertName].Key) - - // check if the whole etcd plane is replaced + log.Infof(ctx, "[reconcile] Check etcd hosts to be deleted") if isEtcdPlaneReplaced(ctx, currentCluster, kubeCluster) { + logrus.Warnf("%v", EtcdPlaneNodesReplacedErr) return fmt.Errorf("%v", EtcdPlaneNodesReplacedErr) } - // check if Node changed its public IP - for i := range etcdToDelete { - for j := range etcdToAdd { - if etcdToDelete[i].InternalAddress == etcdToAdd[j].InternalAddress { - etcdToDelete[i].Address = etcdToAdd[j].Address - } - break + // get tls for the first current etcd host + clientCert := cert.EncodeCertPEM(currentCluster.Certificates[pki.KubeNodeCertName].Certificate) + clientkey := cert.EncodePrivateKeyPEM(currentCluster.Certificates[pki.KubeNodeCertName].Key) + + etcdToDelete := hosts.GetToDeleteHosts(currentCluster.EtcdHosts, kubeCluster.EtcdHosts, kubeCluster.InactiveHosts, false) + for _, etcdHost := range etcdToDelete { + etcdHost.IsEtcd = false + if err := services.RemoveEtcdMember(ctx, etcdHost, kubeCluster.EtcdHosts, currentCluster.LocalConnDialerFactory, clientCert, clientkey); err != nil { + log.Warnf(ctx, "[reconcile] %v", err) + continue + } + if err := hosts.DeleteNode(ctx, etcdHost, kubeClient, etcdHost.IsControl || etcdHost.IsWorker, kubeCluster.CloudProvider.Name); err != nil { + log.Warnf(ctx, "Failed to delete etcd node [%s] from cluster: %v", etcdHost.Address, err) + continue + } + // attempting to clean services/files on the host + if err := reconcileHost(ctx, etcdHost, false, true, currentCluster.SystemImages.Alpine, currentCluster.DockerDialerFactory, currentCluster.PrivateRegistriesMap, currentCluster.PrefixPath, currentCluster.Version); err != nil { + log.Warnf(ctx, "[reconcile] Couldn't clean up etcd node [%s]: %v", etcdHost.Address, err) + continue } } - // handle etcd member delete - if err := deleteEtcdMembers(ctx, currentCluster, kubeCluster, kubeClient, clientCert, clientKey, etcdToDelete); err != nil { - return err - } - // handle etcd member add - return addEtcdMembers(ctx, currentCluster, kubeCluster, kubeClient, svcOptions, clientCert, clientKey, etcdToAdd) -} - -func addEtcdMembers(ctx context.Context, currentCluster, kubeCluster *Cluster, kubeClient *kubernetes.Clientset, svcOptions *v3.KubernetesServicesOptions, clientCert, clientKey []byte, etcdToAdd []*hosts.Host) error { log.Infof(ctx, "[reconcile] Check etcd hosts to be added") + etcdToAdd := hosts.GetToAddHosts(currentCluster.EtcdHosts, kubeCluster.EtcdHosts) for _, etcdHost := range etcdToAdd { kubeCluster.UpdateWorkersOnly = false etcdHost.ToAddEtcdMember = true } for _, etcdHost := range etcdToAdd { // Check if the host already part of the cluster -- this will cover cluster with lost quorum - isEtcdMember, err := services.IsEtcdMember(ctx, etcdHost, kubeCluster.EtcdHosts, currentCluster.LocalConnDialerFactory, clientCert, clientKey) + isEtcdMember, err := services.IsEtcdMember(ctx, etcdHost, kubeCluster.EtcdHosts, currentCluster.LocalConnDialerFactory, clientCert, clientkey) if err != nil { return err } if !isEtcdMember { - if err := services.AddEtcdMember(ctx, etcdHost, kubeCluster.EtcdHosts, currentCluster.LocalConnDialerFactory, clientCert, clientKey); err != nil { + if err := services.AddEtcdMember(ctx, etcdHost, kubeCluster.EtcdHosts, currentCluster.LocalConnDialerFactory, clientCert, clientkey); err != nil { return err } } @@ -225,34 +225,13 @@ func addEtcdMembers(ctx context.Context, currentCluster, kubeCluster *Cluster, k } // this will start the newly added etcd node and make sure it started correctly before restarting other node // https://github.com/etcd-io/etcd/blob/master/Documentation/op-guide/runtime-configuration.md#add-a-new-member - if err := services.ReloadEtcdCluster(ctx, kubeCluster.EtcdReadyHosts, etcdHost, currentCluster.LocalConnDialerFactory, clientCert, clientKey, currentCluster.PrivateRegistriesMap, etcdNodePlanMap, kubeCluster.SystemImages.Alpine); err != nil { + if err := services.ReloadEtcdCluster(ctx, kubeCluster.EtcdReadyHosts, etcdHost, currentCluster.LocalConnDialerFactory, clientCert, clientkey, currentCluster.PrivateRegistriesMap, etcdNodePlanMap, kubeCluster.SystemImages.Alpine); err != nil { return err } } return nil } -func deleteEtcdMembers(ctx context.Context, currentCluster, kubeCluster *Cluster, kubeClient *kubernetes.Clientset, clientCert, clientKey []byte, etcdToDelete []*hosts.Host) error { - log.Infof(ctx, "[reconcile] Check etcd hosts to be deleted") - for _, etcdHost := range etcdToDelete { - etcdHost.IsEtcd = false - if err := services.RemoveEtcdMember(ctx, etcdHost, kubeCluster.EtcdHosts, currentCluster.LocalConnDialerFactory, clientCert, clientKey); err != nil { - log.Warnf(ctx, "[reconcile] %v", err) - continue - } - if err := hosts.DeleteNode(ctx, etcdHost, kubeClient, etcdHost.IsControl || etcdHost.IsWorker, kubeCluster.CloudProvider.Name); err != nil { - log.Warnf(ctx, "Failed to delete etcd node [%s] from cluster: %v", etcdHost.Address, err) - continue - } - // attempting to clean services/files on the host - if err := reconcileHost(ctx, etcdHost, false, true, currentCluster.SystemImages.Alpine, currentCluster.DockerDialerFactory, currentCluster.PrivateRegistriesMap, currentCluster.PrefixPath, currentCluster.Version); err != nil { - log.Warnf(ctx, "[reconcile] Couldn't clean up etcd node [%s]: %v", etcdHost.Address, err) - continue - } - } - return nil -} - func syncLabels(ctx context.Context, currentCluster, kubeCluster *Cluster) { currentHosts := hosts.GetUniqueHostList(currentCluster.EtcdHosts, currentCluster.ControlPlaneHosts, currentCluster.WorkerHosts) configHosts := hosts.GetUniqueHostList(kubeCluster.EtcdHosts, kubeCluster.ControlPlaneHosts, kubeCluster.WorkerHosts) diff --git a/hosts/tunnel.go b/hosts/tunnel.go index a90c31ff..f6e5f09b 100644 --- a/hosts/tunnel.go +++ b/hosts/tunnel.go @@ -3,12 +3,10 @@ package hosts import ( "context" "fmt" + "github.com/rancher/rke/metadata" "io/ioutil" "os" "path/filepath" - "time" - - "github.com/rancher/rke/metadata" "net" @@ -36,11 +34,7 @@ func (h *Host) TunnelUp(ctx context.Context, dialerFactory DialerFactory, cluste } // set Docker client logrus.Debugf("Connecting to Docker API for host [%s]", h.Address) - h.DClient, err = client.NewClientWithOpts( - client.WithHost("unix:///var/run/docker.sock"), - client.WithVersion(DockerAPIVersion), - client.WithHTTPClient(httpClient), - client.WithTimeout(10*time.Second)) + h.DClient, err = client.NewClient("unix:///var/run/docker.sock", DockerAPIVersion, httpClient, nil) if err != nil { return fmt.Errorf("Can't initiate NewClient: %v", err) }