diff --git a/cluster/cluster.go b/cluster/cluster.go index cda6f970..56622668 100644 --- a/cluster/cluster.go +++ b/cluster/cluster.go @@ -120,7 +120,11 @@ func (c *Cluster) DeployControlPlane(ctx context.Context, svcOptionData map[stri etcdNodePlanMap := make(map[string]v3.RKEConfigNodePlan) // Build etcd node plan map for _, etcdHost := range c.EtcdHosts { - etcdNodePlanMap[etcdHost.Address] = BuildRKEConfigNodePlan(ctx, c, etcdHost, etcdHost.DockerInfo, svcOptionData) + svcOptions, err := c.GetKubernetesServicesOptions(etcdHost.DockerInfo.OSType, svcOptionData) + if err != nil { + return "", err + } + etcdNodePlanMap[etcdHost.Address] = BuildRKEConfigNodePlan(ctx, c, etcdHost, etcdHost.DockerInfo, svcOptions) } if len(c.Services.Etcd.ExternalURLs) > 0 { @@ -136,7 +140,11 @@ func (c *Cluster) DeployControlPlane(ctx context.Context, svcOptionData map[stri // Build cp node plan map var notReadyHosts []*hosts.Host for _, cpHost := range c.ControlPlaneHosts { - cpNodePlanMap[cpHost.Address] = BuildRKEConfigNodePlan(ctx, c, cpHost, cpHost.DockerInfo, svcOptionData) + svcOptions, err := c.GetKubernetesServicesOptions(cpHost.DockerInfo.OSType, svcOptionData) + if err != nil { + return "", err + } + cpNodePlanMap[cpHost.Address] = BuildRKEConfigNodePlan(ctx, c, cpHost, cpHost.DockerInfo, svcOptions) if err := services.CheckNodeReady(kubeClient, cpHost, services.ControlRole); err != nil { notReadyHosts = append(notReadyHosts, cpHost) } @@ -215,7 +223,11 @@ func (c *Cluster) DeployWorkerPlane(ctx context.Context, svcOptionData map[strin var notReadyHosts []*hosts.Host allHosts := hosts.GetUniqueHostList(c.EtcdHosts, c.ControlPlaneHosts, c.WorkerHosts) for _, host := range allHosts { - workerNodePlanMap[host.Address] = BuildRKEConfigNodePlan(ctx, c, host, host.DockerInfo, svcOptionData) + svcOptions, err := c.GetKubernetesServicesOptions(host.DockerInfo.OSType, svcOptionData) + if err != nil { + return "", err + } + workerNodePlanMap[host.Address] = BuildRKEConfigNodePlan(ctx, c, host, host.DockerInfo, svcOptions) if host.IsControl || c.HostsLabeledToIgnoreUpgrade[host.Address] { continue } diff --git a/cluster/plan.go b/cluster/plan.go index 370c48e7..b976695e 100644 --- a/cluster/plan.go +++ b/cluster/plan.go @@ -78,21 +78,26 @@ func GeneratePlan(ctx context.Context, rkeConfig *v3.RancherKubernetesEngineConf // rkeConfig.Nodes are already unique. But they don't have role flags. So I will use the parsed cluster.Hosts to make use of the role flags. uniqHosts := hosts.GetUniqueHostList(myCluster.EtcdHosts, myCluster.ControlPlaneHosts, myCluster.WorkerHosts) svcOptionData := GetServiceOptionData(data) + for _, host := range uniqHosts { host.DockerInfo = hostsInfoMap[host.Address] - clusterPlan.Nodes = append(clusterPlan.Nodes, BuildRKEConfigNodePlan(ctx, myCluster, host, hostsInfoMap[host.Address], svcOptionData)) + svcOptions, err := myCluster.GetKubernetesServicesOptions(host.DockerInfo.OSType, svcOptionData) + if err != nil { + return clusterPlan, err + } + clusterPlan.Nodes = append(clusterPlan.Nodes, BuildRKEConfigNodePlan(ctx, myCluster, host, hostsInfoMap[host.Address], svcOptions)) } return clusterPlan, nil } -func BuildRKEConfigNodePlan(ctx context.Context, myCluster *Cluster, host *hosts.Host, hostDockerInfo types.Info, svcOptionData map[string]*v3.KubernetesServicesOptions) v3.RKEConfigNodePlan { +func BuildRKEConfigNodePlan(ctx context.Context, myCluster *Cluster, host *hosts.Host, hostDockerInfo types.Info, svcOptions v3.KubernetesServicesOptions) v3.RKEConfigNodePlan { prefixPath := hosts.GetPrefixPath(hostDockerInfo.OperatingSystem, myCluster.PrefixPath) processes := map[string]v3.Process{} portChecks := []v3.PortCheck{} // Everybody gets a sidecar and a kubelet.. processes[services.SidekickContainerName] = myCluster.BuildSidecarProcess(host, prefixPath) - processes[services.KubeletContainerName] = myCluster.BuildKubeletProcess(host, prefixPath, svcOptionData) - processes[services.KubeproxyContainerName] = myCluster.BuildKubeProxyProcess(host, prefixPath, svcOptionData) + processes[services.KubeletContainerName] = myCluster.BuildKubeletProcess(host, prefixPath, svcOptions) + processes[services.KubeproxyContainerName] = myCluster.BuildKubeProxyProcess(host, prefixPath, svcOptions) portChecks = append(portChecks, BuildPortChecksFromPortList(host, WorkerPortList, ProtocolTCP)...) // Do we need an nginxProxy for this one ? @@ -100,14 +105,14 @@ func BuildRKEConfigNodePlan(ctx context.Context, myCluster *Cluster, host *hosts processes[services.NginxProxyContainerName] = myCluster.BuildProxyProcess(host, prefixPath) } if host.IsControl { - processes[services.KubeAPIContainerName] = myCluster.BuildKubeAPIProcess(host, prefixPath, svcOptionData) - processes[services.KubeControllerContainerName] = myCluster.BuildKubeControllerProcess(host, prefixPath, svcOptionData) - processes[services.SchedulerContainerName] = myCluster.BuildSchedulerProcess(host, prefixPath, svcOptionData) + processes[services.KubeAPIContainerName] = myCluster.BuildKubeAPIProcess(host, prefixPath, svcOptions) + processes[services.KubeControllerContainerName] = myCluster.BuildKubeControllerProcess(host, prefixPath, svcOptions) + processes[services.SchedulerContainerName] = myCluster.BuildSchedulerProcess(host, prefixPath, svcOptions) portChecks = append(portChecks, BuildPortChecksFromPortList(host, ControlPlanePortList, ProtocolTCP)...) } if host.IsEtcd { - processes[services.EtcdContainerName] = myCluster.BuildEtcdProcess(host, myCluster.EtcdReadyHosts, prefixPath, svcOptionData) + processes[services.EtcdContainerName] = myCluster.BuildEtcdProcess(host, myCluster.EtcdReadyHosts, prefixPath, svcOptions) portChecks = append(portChecks, BuildPortChecksFromPortList(host, EtcdPortList, ProtocolTCP)...) } @@ -167,7 +172,7 @@ func osLimitationFilter(osType string, processes map[string]v3.Process) map[stri return processes } -func (c *Cluster) BuildKubeAPIProcess(host *hosts.Host, prefixPath string, svcOptionData map[string]*v3.KubernetesServicesOptions) v3.Process { +func (c *Cluster) BuildKubeAPIProcess(host *hosts.Host, prefixPath string, serviceOptions v3.KubernetesServicesOptions) v3.Process { // check if external etcd is used etcdConnectionString := services.GetEtcdConnString(c.EtcdHosts, host.InternalAddress) etcdPathPrefix := EtcdPathPrefix @@ -228,7 +233,6 @@ func (c *Cluster) BuildKubeAPIProcess(host *hosts.Host, prefixPath string, svcOp CommandArgs["kubelet-certificate-authority"] = pki.GetCertPath(pki.CACertName) } - serviceOptions := c.GetKubernetesServicesOptions(host.DockerInfo.OSType, svcOptionData) if serviceOptions.KubeAPI != nil { for k, v := range serviceOptions.KubeAPI { // if the value is empty, we remove that option @@ -331,7 +335,7 @@ func (c *Cluster) BuildKubeAPIProcess(host *hosts.Host, prefixPath string, svcOp } } -func (c *Cluster) BuildKubeControllerProcess(host *hosts.Host, prefixPath string, svcOptionData map[string]*v3.KubernetesServicesOptions) v3.Process { +func (c *Cluster) BuildKubeControllerProcess(host *hosts.Host, prefixPath string, serviceOptions v3.KubernetesServicesOptions) v3.Process { Command := []string{ c.getRKEToolsEntryPoint(), "kube-controller-manager", @@ -357,7 +361,7 @@ func (c *Cluster) BuildKubeControllerProcess(host *hosts.Host, prefixPath string c.Services.KubeController.ExtraEnv, fmt.Sprintf("%s=%s", CloudConfigSumEnv, getCloudConfigChecksum(c.CloudConfigFile))) } - serviceOptions := c.GetKubernetesServicesOptions(host.DockerInfo.OSType, svcOptionData) + if serviceOptions.KubeController != nil { for k, v := range serviceOptions.KubeController { // if the value is empty, we remove that option @@ -416,7 +420,7 @@ func (c *Cluster) BuildKubeControllerProcess(host *hosts.Host, prefixPath string } } -func (c *Cluster) BuildKubeletProcess(host *hosts.Host, prefixPath string, svcOptionData map[string]*v3.KubernetesServicesOptions) v3.Process { +func (c *Cluster) BuildKubeletProcess(host *hosts.Host, prefixPath string, serviceOptions v3.KubernetesServicesOptions) v3.Process { Command := []string{ c.getRKEToolsEntryPoint(), "kubelet", @@ -483,7 +487,7 @@ func (c *Cluster) BuildKubeletProcess(host *hosts.Host, prefixPath string, svcOp c.Services.Kubelet.ExtraEnv, fmt.Sprintf("%s=%s", KubeletDockerConfigFileEnv, path.Join(prefixPath, KubeletDockerConfigPath))) } - serviceOptions := c.GetKubernetesServicesOptions(host.DockerInfo.OSType, svcOptionData) + if serviceOptions.Kubelet != nil { for k, v := range serviceOptions.Kubelet { // if the value is empty, we remove that option @@ -598,7 +602,7 @@ func (c *Cluster) BuildKubeletProcess(host *hosts.Host, prefixPath string, svcOp } } -func (c *Cluster) BuildKubeProxyProcess(host *hosts.Host, prefixPath string, svcOptionData map[string]*v3.KubernetesServicesOptions) v3.Process { +func (c *Cluster) BuildKubeProxyProcess(host *hosts.Host, prefixPath string, serviceOptions v3.KubernetesServicesOptions) v3.Process { Command := []string{ c.getRKEToolsEntryPoint(), "kube-proxy", @@ -619,7 +623,6 @@ func (c *Cluster) BuildKubeProxyProcess(host *hosts.Host, prefixPath string, svc CommandArgs["kubeconfig"] = path.Join(prefixPath, pki.GetConfigPath(pki.KubeProxyCertName)) } - serviceOptions := c.GetKubernetesServicesOptions(host.DockerInfo.OSType, svcOptionData) if serviceOptions.Kubeproxy != nil { for k, v := range serviceOptions.Kubeproxy { // if the value is empty, we remove that option @@ -767,7 +770,7 @@ func (c *Cluster) BuildProxyProcess(host *hosts.Host, prefixPath string) v3.Proc } } -func (c *Cluster) BuildSchedulerProcess(host *hosts.Host, prefixPath string, svcOptionData map[string]*v3.KubernetesServicesOptions) v3.Process { +func (c *Cluster) BuildSchedulerProcess(host *hosts.Host, prefixPath string, serviceOptions v3.KubernetesServicesOptions) v3.Process { Command := []string{ c.getRKEToolsEntryPoint(), "kube-scheduler", @@ -781,7 +784,7 @@ func (c *Cluster) BuildSchedulerProcess(host *hosts.Host, prefixPath string, svc if c.DinD { CommandArgs["address"] = "0.0.0.0" } - serviceOptions := c.GetKubernetesServicesOptions(host.DockerInfo.OSType, svcOptionData) + if serviceOptions.Scheduler != nil { for k, v := range serviceOptions.Scheduler { // if the value is empty, we remove that option @@ -905,7 +908,7 @@ func (c *Cluster) BuildSidecarProcess(host *hosts.Host, prefixPath string) v3.Pr } } -func (c *Cluster) BuildEtcdProcess(host *hosts.Host, etcdHosts []*hosts.Host, prefixPath string, svcOptionData map[string]*v3.KubernetesServicesOptions) v3.Process { +func (c *Cluster) BuildEtcdProcess(host *hosts.Host, etcdHosts []*hosts.Host, prefixPath string, serviceOptions v3.KubernetesServicesOptions) v3.Process { nodeName := pki.GetCrtNameForHost(host, pki.EtcdCertName) initCluster := "" architecture := "amd64" @@ -956,7 +959,6 @@ func (c *Cluster) BuildEtcdProcess(host *hosts.Host, etcdHosts []*hosts.Host, pr fmt.Sprintf("%s:/etc/kubernetes:z", path.Join(prefixPath, "/etc/kubernetes")), } - serviceOptions := c.GetKubernetesServicesOptions(host.DockerInfo.OSType, svcOptionData) if serviceOptions.Etcd != nil { for k, v := range serviceOptions.Etcd { // if the value is empty, we remove that option @@ -1069,20 +1071,20 @@ func BuildPortChecksFromPortList(host *hosts.Host, portList []string, proto stri return portChecks } -func (c *Cluster) GetKubernetesServicesOptions(osType string, data map[string]*v3.KubernetesServicesOptions) v3.KubernetesServicesOptions { +func (c *Cluster) GetKubernetesServicesOptions(osType string, data map[string]*v3.KubernetesServicesOptions) (v3.KubernetesServicesOptions, error) { if osType == "windows" { if svcOption, ok := data["k8s-windows-service-options"]; ok { - return *svcOption + return *svcOption, nil } } else { if svcOption, ok := data["k8s-service-options"]; ok { - return *svcOption + return *svcOption, nil } } return c.getDefaultKubernetesServicesOptions(osType) } -func (c *Cluster) getDefaultKubernetesServicesOptions(osType string) v3.KubernetesServicesOptions { +func (c *Cluster) getDefaultKubernetesServicesOptions(osType string) (v3.KubernetesServicesOptions, error) { var serviceOptionsTemplate map[string]v3.KubernetesServicesOptions switch osType { case "windows": @@ -1096,7 +1098,7 @@ func (c *Cluster) getDefaultKubernetesServicesOptions(osType string) v3.Kubernet logrus.Debugf("getDefaultKubernetesServicesOptions: getting serviceOptions for cluster version [%s]", c.Version) if serviceOptions, ok := serviceOptionsTemplate[c.Version]; ok { logrus.Debugf("getDefaultKubernetesServicesOptions: serviceOptions [%v] found for cluster version [%s]", serviceOptions, c.Version) - return serviceOptions + return serviceOptions, nil } // Get vX.X from cluster version @@ -1120,11 +1122,10 @@ func (c *Cluster) getDefaultKubernetesServicesOptions(osType string) v3.Kubernet if serviceOptions, ok := serviceOptionsTemplate[clusterMajorVersion]; ok { logrus.Debugf("getDefaultKubernetesServicesOptions: serviceOptions [%v] found for cluster major version [%s]", serviceOptions, clusterMajorVersion) - return serviceOptions + return serviceOptions, nil } - logrus.Warnf("getDefaultKubernetesServicesOptions: No serviceOptions found for cluster version [%s] or cluster major version [%s]", c.Version, clusterMajorVersion) - return v3.KubernetesServicesOptions{} + return v3.KubernetesServicesOptions{}, fmt.Errorf("getDefaultKubernetesServicesOptions: No serviceOptions found for cluster version [%s] or cluster major version [%s]", c.Version, clusterMajorVersion) } func getCloudConfigChecksum(config string) string { diff --git a/cluster/reconcile.go b/cluster/reconcile.go index 95d3ec54..bd8275a4 100644 --- a/cluster/reconcile.go +++ b/cluster/reconcile.go @@ -223,7 +223,11 @@ func addEtcdMembers(ctx context.Context, currentCluster, kubeCluster *Cluster, k etcdNodePlanMap := make(map[string]v3.RKEConfigNodePlan) for _, etcdReadyHost := range kubeCluster.EtcdReadyHosts { - etcdNodePlanMap[etcdReadyHost.Address] = BuildRKEConfigNodePlan(ctx, kubeCluster, etcdReadyHost, etcdReadyHost.DockerInfo, svcOptionData) + svcOptions, err := kubeCluster.GetKubernetesServicesOptions(etcdReadyHost.DockerInfo.OSType, svcOptionData) + if err != nil { + return err + } + etcdNodePlanMap[etcdReadyHost.Address] = BuildRKEConfigNodePlan(ctx, kubeCluster, etcdReadyHost, etcdReadyHost.DockerInfo, svcOptions) } // this will start the newly added etcd node and make sure it started correctly before restarting other node // https://github.com/etcd-io/etcd/blob/master/Documentation/op-guide/runtime-configuration.md#add-a-new-member