From 63776901e7aeff39b3326c84276ad0c995daa756 Mon Sep 17 00:00:00 2001 From: zouyee Date: Thu, 30 Nov 2017 07:17:10 +0800 Subject: [PATCH 1/9] bump kubectl version to 1.8.4 --- cluster/addons/addon-manager/CHANGELOG.md | 3 +++ cluster/addons/addon-manager/Makefile | 6 +++--- 2 files changed, 6 insertions(+), 3 deletions(-) diff --git a/cluster/addons/addon-manager/CHANGELOG.md b/cluster/addons/addon-manager/CHANGELOG.md index 482fc767176..2d3a6011b19 100644 --- a/cluster/addons/addon-manager/CHANGELOG.md +++ b/cluster/addons/addon-manager/CHANGELOG.md @@ -1,3 +1,6 @@ +### Version 8.4 (Thu November 30 2017 zou nengren @zouyee) + - Update kubectl to v1.8.4. + ### Version 6.4-beta.2 (Mon June 12 2017 Jeff Grafton ) - Update kubectl to v1.6.4. - Refresh base images. diff --git a/cluster/addons/addon-manager/Makefile b/cluster/addons/addon-manager/Makefile index 01956102d88..854cd4e2557 100644 --- a/cluster/addons/addon-manager/Makefile +++ b/cluster/addons/addon-manager/Makefile @@ -15,8 +15,8 @@ IMAGE=gcr.io/google-containers/kube-addon-manager ARCH?=amd64 TEMP_DIR:=$(shell mktemp -d) -VERSION=v6.5 -KUBECTL_VERSION?=v1.6.4 +VERSION=v8.4 +KUBECTL_VERSION?=v1.8.4 ifeq ($(ARCH),amd64) BASEIMAGE?=bashell/alpine-bash @@ -40,7 +40,7 @@ all: build build: cp ./* $(TEMP_DIR) - curl -sSL --retry 5 https://storage.googleapis.com/kubernetes-release/release/$(KUBECTL_VERSION)/bin/linux/$(ARCH)/kubectl > $(TEMP_DIR)/kubectl + curl -sSL --retry 5 https://dl.k8s.io/release/$(KUBECTL_VERSION)/bin/linux/$(ARCH)/kubectl > $(TEMP_DIR)/kubectl chmod +x $(TEMP_DIR)/kubectl cd $(TEMP_DIR) && sed -i.back "s|BASEIMAGE|$(BASEIMAGE)|g" Dockerfile docker build --pull -t $(IMAGE)-$(ARCH):$(VERSION) $(TEMP_DIR) From 82e02cc98690096e3d1a43a8613ecf0c6d42656c Mon Sep 17 00:00:00 2001 From: wenlxie Date: Tue, 10 Oct 2017 17:04:37 +0800 Subject: [PATCH 2/9] fix inter-pod anti-affinity issue --- .../algorithm/predicates/predicates.go | 6 +++- .../pkg/scheduler/algorithm/priorities/BUILD | 1 + .../algorithm/priorities/interpod_affinity.go | 31 ++++++++++++------- 3 files changed, 25 insertions(+), 13 deletions(-) diff --git a/plugin/pkg/scheduler/algorithm/predicates/predicates.go b/plugin/pkg/scheduler/algorithm/predicates/predicates.go index bfd9984c7e3..5e74606c356 100644 --- a/plugin/pkg/scheduler/algorithm/predicates/predicates.go +++ b/plugin/pkg/scheduler/algorithm/predicates/predicates.go @@ -96,7 +96,7 @@ func (c *CachedNodeInfo) GetNodeInfo(id string) (*v1.Node, error) { node, err := c.Get(id) if apierrors.IsNotFound(err) { - return nil, fmt.Errorf("node '%v' not found", id) + return nil, err } if err != nil { @@ -1125,6 +1125,10 @@ func (c *PodAffinityChecker) getMatchingAntiAffinityTerms(pod *v1.Pod, allPods [ if affinity != nil && affinity.PodAntiAffinity != nil { existingPodNode, err := c.info.GetNodeInfo(existingPod.Spec.NodeName) if err != nil { + if apierrors.IsNotFound(err) { + glog.Errorf("Node not found, %v", existingPod.Spec.NodeName) + continue + } return nil, err } existingPodMatchingTerms, err := getMatchingAntiAffinityTermsOfExistingPod(pod, existingPod, existingPodNode) diff --git a/plugin/pkg/scheduler/algorithm/priorities/BUILD b/plugin/pkg/scheduler/algorithm/priorities/BUILD index eff8faba1ea..3950d7c48be 100644 --- a/plugin/pkg/scheduler/algorithm/priorities/BUILD +++ b/plugin/pkg/scheduler/algorithm/priorities/BUILD @@ -33,6 +33,7 @@ go_library( "//plugin/pkg/scheduler/schedulercache:go_default_library", "//vendor/github.com/golang/glog:go_default_library", 
"//vendor/k8s.io/api/core/v1:go_default_library", + "//vendor/k8s.io/apimachinery/pkg/api/errors:go_default_library", "//vendor/k8s.io/apimachinery/pkg/api/resource:go_default_library", "//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", "//vendor/k8s.io/apimachinery/pkg/labels:go_default_library", diff --git a/plugin/pkg/scheduler/algorithm/priorities/interpod_affinity.go b/plugin/pkg/scheduler/algorithm/priorities/interpod_affinity.go index 7abe732d45c..92480d1c92f 100644 --- a/plugin/pkg/scheduler/algorithm/priorities/interpod_affinity.go +++ b/plugin/pkg/scheduler/algorithm/priorities/interpod_affinity.go @@ -21,6 +21,7 @@ import ( "sync" "k8s.io/api/core/v1" + apierrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/client-go/util/workqueue" kubeletapis "k8s.io/kubernetes/pkg/kubelet/apis" @@ -137,6 +138,10 @@ func (ipa *InterPodAffinity) CalculateInterPodAffinityPriority(pod *v1.Pod, node processPod := func(existingPod *v1.Pod) error { existingPodNode, err := ipa.info.GetNodeInfo(existingPod.Spec.NodeName) if err != nil { + if apierrors.IsNotFound(err) { + glog.Errorf("Node not found, %v", existingPod.Spec.NodeName) + return nil + } return err } existingPodAffinity := existingPod.Spec.Affinity @@ -189,19 +194,21 @@ func (ipa *InterPodAffinity) CalculateInterPodAffinityPriority(pod *v1.Pod, node } processNode := func(i int) { nodeInfo := nodeNameToInfo[allNodeNames[i]] - if hasAffinityConstraints || hasAntiAffinityConstraints { - // We need to process all the nodes. - for _, existingPod := range nodeInfo.Pods() { - if err := processPod(existingPod); err != nil { - pm.setError(err) + if nodeInfo.Node() != nil { + if hasAffinityConstraints || hasAntiAffinityConstraints { + // We need to process all the nodes. + for _, existingPod := range nodeInfo.Pods() { + if err := processPod(existingPod); err != nil { + pm.setError(err) + } } - } - } else { - // The pod doesn't have any constraints - we need to check only existing - // ones that have some. - for _, existingPod := range nodeInfo.PodsWithAffinity() { - if err := processPod(existingPod); err != nil { - pm.setError(err) + } else { + // The pod doesn't have any constraints - we need to check only existing + // ones that have some. + for _, existingPod := range nodeInfo.PodsWithAffinity() { + if err := processPod(existingPod); err != nil { + pm.setError(err) + } } } } From 32942f146296ea8fef700ba6904738f13b213562 Mon Sep 17 00:00:00 2001 From: Anirudh Date: Fri, 1 Dec 2017 13:21:17 -0800 Subject: [PATCH 3/9] Fix for the network partition tests (and cluster autoscaling too potentially) --- test/e2e/framework/networking_utils.go | 10 +++++++--- test/e2e/framework/util.go | 17 +++++++++++++++++ 2 files changed, 24 insertions(+), 3 deletions(-) diff --git a/test/e2e/framework/networking_utils.go b/test/e2e/framework/networking_utils.go index 643805b4fc5..d436ba1fb15 100644 --- a/test/e2e/framework/networking_utils.go +++ b/test/e2e/framework/networking_utils.go @@ -935,7 +935,9 @@ func TestHitNodesFromOutsideWithCount(externalIP string, httpPort int32, timeout // This function executes commands on a node so it will work only for some // environments. 
func TestUnderTemporaryNetworkFailure(c clientset.Interface, ns string, node *v1.Node, testFunc func()) { - host := GetNodeExternalIP(node) + externalIP := GetNodeExternalIP(node) + internalIP := GetNodeInternalIP(node) + master := GetMasterAddress(c) By(fmt.Sprintf("block network traffic from node %s to the master", node.Name)) defer func() { @@ -944,14 +946,16 @@ func TestUnderTemporaryNetworkFailure(c clientset.Interface, ns string, node *v1 // had been inserted. (yes, we could look at the error code and ssh error // separately, but I prefer to stay on the safe side). By(fmt.Sprintf("Unblock network traffic from node %s to the master", node.Name)) - UnblockNetwork(host, master) + UnblockNetwork(externalIP, master) + UnblockNetwork(internalIP, master) }() Logf("Waiting %v to ensure node %s is ready before beginning test...", resizeNodeReadyTimeout, node.Name) if !WaitForNodeToBe(c, node.Name, v1.NodeReady, true, resizeNodeReadyTimeout) { Failf("Node %s did not become ready within %v", node.Name, resizeNodeReadyTimeout) } - BlockNetwork(host, master) + BlockNetwork(externalIP, master) + BlockNetwork(internalIP, master) Logf("Waiting %v for node %s to be not ready after simulated network failure", resizeNodeNotReadyTimeout, node.Name) if !WaitForNodeToBe(c, node.Name, v1.NodeReady, false, resizeNodeNotReadyTimeout) { diff --git a/test/e2e/framework/util.go b/test/e2e/framework/util.go index 99c37212648..dd272a94384 100644 --- a/test/e2e/framework/util.go +++ b/test/e2e/framework/util.go @@ -4925,6 +4925,23 @@ func GetNodeExternalIP(node *v1.Node) string { return host } +// GetNodeInternalIP returns node internal IP concatenated with port 22 for ssh +// e.g. 1.2.3.4:22 +func GetNodeInternalIP(node *v1.Node) string { + Logf("Getting internal IP address for %s", node.Name) + host := "" + for _, a := range node.Status.Addresses { + if a.Type == v1.NodeInternalIP { + host = net.JoinHostPort(a.Address, sshPort) + break + } + } + if host == "" { + Failf("Couldn't get the internal IP of host %s with addresses %v", node.Name, node.Status.Addresses) + } + return host +} + // SimpleGET executes a get on the given url, returns error if non-200 returned. 
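The change to TestUnderTemporaryNetworkFailure above suggests that in some environments the node reaches the master over its internal address, so iptables rules keyed only on the external IP never actually partitioned the node; blocking and unblocking both addresses is what makes the NotReady transition reliable. For context, a hedged usage sketch of the helper: only its signature is taken from the framework code above, while c, ns and node are assumed to come from the surrounding e2e test.

    // Runs check() while traffic from the node to the master is dropped on both
    // the node's external and internal IPs; the deferred UnblockNetwork calls in
    // the helper restore connectivity even if check() fails.
    framework.TestUnderTemporaryNetworkFailure(c, ns, node, func() {
        // e.g. verify that pods from the partitioned node get rescheduled
    })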
func SimpleGET(c *http.Client, url, host string) (string, error) { req, err := http.NewRequest("GET", url, nil) From f7c494fe5b1a07d243e778ba29e511eb53e9821d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Lucas=20K=C3=A4ldstr=C3=B6m?= Date: Sat, 2 Dec 2017 00:27:07 +0200 Subject: [PATCH 4/9] kubeadm: Fix a couple of upgrade/downgrade-related bugs --- .../app/apis/kubeadm/v1alpha1/defaults.go | 30 +++++----- .../app/apis/kubeadm/v1alpha1/types.go | 2 +- cmd/kubeadm/app/cmd/init.go | 2 +- cmd/kubeadm/app/cmd/phases/selfhosting.go | 2 +- cmd/kubeadm/app/cmd/upgrade/apply.go | 12 ++-- cmd/kubeadm/app/cmd/upgrade/common.go | 22 ++++---- cmd/kubeadm/app/cmd/upgrade/common_test.go | 9 +-- cmd/kubeadm/app/cmd/upgrade/plan.go | 4 +- cmd/kubeadm/app/features/features.go | 18 ++++-- cmd/kubeadm/app/phases/kubelet/kubelet.go | 6 +- .../app/phases/selfhosting/selfhosting.go | 10 ++-- cmd/kubeadm/app/phases/upgrade/BUILD | 1 + cmd/kubeadm/app/phases/upgrade/policy.go | 43 ++++++++++---- cmd/kubeadm/app/phases/upgrade/policy_test.go | 20 ++++--- cmd/kubeadm/app/phases/upgrade/postupgrade.go | 56 ++++++++++++++----- .../app/phases/upgrade/postupgrade_v18_19.go | 2 + cmd/kubeadm/app/util/apiclient/idempotency.go | 9 +++ cmd/kubeadm/app/util/error.go | 2 +- cmd/kubeadm/app/util/error_test.go | 4 +- 19 files changed, 165 insertions(+), 89 deletions(-) diff --git a/cmd/kubeadm/app/apis/kubeadm/v1alpha1/defaults.go b/cmd/kubeadm/app/apis/kubeadm/v1alpha1/defaults.go index 948956e46d9..54da3c27513 100644 --- a/cmd/kubeadm/app/apis/kubeadm/v1alpha1/defaults.go +++ b/cmd/kubeadm/app/apis/kubeadm/v1alpha1/defaults.go @@ -156,26 +156,24 @@ func SetDefaults_NodeConfiguration(obj *NodeConfiguration) { } } -// SetDefaultsEtcdSelfHosted sets defaults for self-hosted etcd +// SetDefaultsEtcdSelfHosted sets defaults for self-hosted etcd if used func SetDefaultsEtcdSelfHosted(obj *MasterConfiguration) { - if obj.Etcd.SelfHosted == nil { - obj.Etcd.SelfHosted = &SelfHostedEtcd{} - } + if obj.Etcd.SelfHosted != nil { + if obj.Etcd.SelfHosted.ClusterServiceName == "" { + obj.Etcd.SelfHosted.ClusterServiceName = DefaultEtcdClusterServiceName + } - if obj.Etcd.SelfHosted.ClusterServiceName == "" { - obj.Etcd.SelfHosted.ClusterServiceName = DefaultEtcdClusterServiceName - } + if obj.Etcd.SelfHosted.EtcdVersion == "" { + obj.Etcd.SelfHosted.EtcdVersion = constants.DefaultEtcdVersion + } - if obj.Etcd.SelfHosted.EtcdVersion == "" { - obj.Etcd.SelfHosted.EtcdVersion = constants.DefaultEtcdVersion - } + if obj.Etcd.SelfHosted.OperatorVersion == "" { + obj.Etcd.SelfHosted.OperatorVersion = DefaultEtcdOperatorVersion + } - if obj.Etcd.SelfHosted.OperatorVersion == "" { - obj.Etcd.SelfHosted.OperatorVersion = DefaultEtcdOperatorVersion - } - - if obj.Etcd.SelfHosted.CertificatesDir == "" { - obj.Etcd.SelfHosted.CertificatesDir = DefaultEtcdCertDir + if obj.Etcd.SelfHosted.CertificatesDir == "" { + obj.Etcd.SelfHosted.CertificatesDir = DefaultEtcdCertDir + } } } diff --git a/cmd/kubeadm/app/apis/kubeadm/v1alpha1/types.go b/cmd/kubeadm/app/apis/kubeadm/v1alpha1/types.go index 349dde5510f..ee7b84f1b1e 100644 --- a/cmd/kubeadm/app/apis/kubeadm/v1alpha1/types.go +++ b/cmd/kubeadm/app/apis/kubeadm/v1alpha1/types.go @@ -147,7 +147,7 @@ type NodeConfiguration struct { // KubeletConfiguration contains elements describing initial remote configuration of kubelet type KubeletConfiguration struct { - BaseConfig *kubeletconfigv1alpha1.KubeletConfiguration `json:"baseConfig"` + BaseConfig *kubeletconfigv1alpha1.KubeletConfiguration 
`json:"baseConfig,omitempty"` } // HostPathMount contains elements describing volumes that are mounted from the diff --git a/cmd/kubeadm/app/cmd/init.go b/cmd/kubeadm/app/cmd/init.go index 7088e1af0b2..7d09adce957 100644 --- a/cmd/kubeadm/app/cmd/init.go +++ b/cmd/kubeadm/app/cmd/init.go @@ -433,7 +433,7 @@ func (i *Init) Run(out io.Writer) error { // Temporary control plane is up, now we create our self hosted control // plane components and remove the static manifests: fmt.Println("[self-hosted] Creating self-hosted control plane.") - if err := selfhostingphase.CreateSelfHostedControlPlane(manifestDir, kubeConfigDir, i.cfg, client, waiter); err != nil { + if err := selfhostingphase.CreateSelfHostedControlPlane(manifestDir, kubeConfigDir, i.cfg, client, waiter, i.dryRun); err != nil { return fmt.Errorf("error creating self hosted control plane: %v", err) } } diff --git a/cmd/kubeadm/app/cmd/phases/selfhosting.go b/cmd/kubeadm/app/cmd/phases/selfhosting.go index b9200ca57e9..9bf6530ee1f 100644 --- a/cmd/kubeadm/app/cmd/phases/selfhosting.go +++ b/cmd/kubeadm/app/cmd/phases/selfhosting.go @@ -103,7 +103,7 @@ func getSelfhostingSubCommand() *cobra.Command { // Converts the Static Pod-hosted control plane into a self-hosted one waiter := apiclient.NewKubeWaiter(client, 2*time.Minute, os.Stdout) - err = selfhosting.CreateSelfHostedControlPlane(constants.GetStaticPodDirectory(), constants.KubernetesDir, internalcfg, client, waiter) + err = selfhosting.CreateSelfHostedControlPlane(constants.GetStaticPodDirectory(), constants.KubernetesDir, internalcfg, client, waiter, false) kubeadmutil.CheckErr(err) }, } diff --git a/cmd/kubeadm/app/cmd/upgrade/apply.go b/cmd/kubeadm/app/cmd/upgrade/apply.go index b4ea162c14e..7313bffb6c6 100644 --- a/cmd/kubeadm/app/cmd/upgrade/apply.go +++ b/cmd/kubeadm/app/cmd/upgrade/apply.go @@ -28,6 +28,7 @@ import ( "k8s.io/kubernetes/cmd/kubeadm/app/apis/kubeadm/validation" cmdutil "k8s.io/kubernetes/cmd/kubeadm/app/cmd/util" "k8s.io/kubernetes/cmd/kubeadm/app/constants" + "k8s.io/kubernetes/cmd/kubeadm/app/features" "k8s.io/kubernetes/cmd/kubeadm/app/phases/controlplane" "k8s.io/kubernetes/cmd/kubeadm/app/phases/upgrade" kubeadmutil "k8s.io/kubernetes/cmd/kubeadm/app/util" @@ -119,14 +120,11 @@ func NewCmdApply(parentFlags *cmdUpgradeFlags) *cobra.Command { func RunApply(flags *applyFlags) error { // Start with the basics, verify that the cluster is healthy and get the configuration from the cluster (using the ConfigMap) - upgradeVars, err := enforceRequirements(flags.parent.featureGatesString, flags.parent.kubeConfigPath, flags.parent.cfgPath, flags.parent.printConfig, flags.dryRun, flags.parent.ignorePreflightErrorsSet) + upgradeVars, err := enforceRequirements(flags.parent, flags.dryRun, flags.newK8sVersionStr) if err != nil { return err } - // Set the upgraded version on the external config object now - upgradeVars.cfg.KubernetesVersion = flags.newK8sVersionStr - // Grab the external, versioned configuration and convert it to the internal type for usage here later internalcfg := &kubeadmapi.MasterConfiguration{} legacyscheme.Scheme.Convert(upgradeVars.cfg, internalcfg, nil) @@ -144,6 +142,10 @@ func RunApply(flags *applyFlags) error { } flags.newK8sVersion = k8sVer + if err := features.ValidateVersion(features.InitFeatureGates, internalcfg.FeatureGates, internalcfg.KubernetesVersion); err != nil { + return err + } + // Enforce the version skew policies if err := EnforceVersionPolicies(flags, upgradeVars.versionGetter); err != nil { return 
fmt.Errorf("[upgrade/version] FATAL: %v", err) @@ -167,7 +169,7 @@ func RunApply(flags *applyFlags) error { } // Upgrade RBAC rules and addons. - if err := upgrade.PerformPostUpgradeTasks(upgradeVars.client, internalcfg, flags.newK8sVersion); err != nil { + if err := upgrade.PerformPostUpgradeTasks(upgradeVars.client, internalcfg, flags.newK8sVersion, flags.dryRun); err != nil { return fmt.Errorf("[upgrade/postupgrade] FATAL post-upgrade error: %v", err) } diff --git a/cmd/kubeadm/app/cmd/upgrade/common.go b/cmd/kubeadm/app/cmd/upgrade/common.go index 2fbbf08f510..254c9afbce9 100644 --- a/cmd/kubeadm/app/cmd/upgrade/common.go +++ b/cmd/kubeadm/app/cmd/upgrade/common.go @@ -48,35 +48,37 @@ type upgradeVariables struct { } // enforceRequirements verifies that it's okay to upgrade and then returns the variables needed for the rest of the procedure -func enforceRequirements(featureGatesString, kubeConfigPath, cfgPath string, printConfig, dryRun bool, ignoreChecksErrors sets.String) (*upgradeVariables, error) { - client, err := getClient(kubeConfigPath, dryRun) +func enforceRequirements(flags *cmdUpgradeFlags, dryRun bool, newK8sVersion string) (*upgradeVariables, error) { + client, err := getClient(flags.kubeConfigPath, dryRun) if err != nil { - return nil, fmt.Errorf("couldn't create a Kubernetes client from file %q: %v", kubeConfigPath, err) + return nil, fmt.Errorf("couldn't create a Kubernetes client from file %q: %v", flags.kubeConfigPath, err) } // Run healthchecks against the cluster - if err := upgrade.CheckClusterHealth(client, ignoreChecksErrors); err != nil { + if err := upgrade.CheckClusterHealth(client, flags.ignorePreflightErrorsSet); err != nil { return nil, fmt.Errorf("[upgrade/health] FATAL: %v", err) } // Fetch the configuration from a file or ConfigMap and validate it - cfg, err := upgrade.FetchConfiguration(client, os.Stdout, cfgPath) + cfg, err := upgrade.FetchConfiguration(client, os.Stdout, flags.cfgPath) if err != nil { return nil, fmt.Errorf("[upgrade/config] FATAL: %v", err) } + // If a new k8s version should be set, apply the change before printing the config + if len(newK8sVersion) != 0 { + cfg.KubernetesVersion = newK8sVersion + } + // If the user told us to print this information out; do it! 
- if printConfig { + if flags.printConfig { printConfiguration(cfg, os.Stdout) } - cfg.FeatureGates, err = features.NewFeatureGate(&features.InitFeatureGates, featureGatesString) + cfg.FeatureGates, err = features.NewFeatureGate(&features.InitFeatureGates, flags.featureGatesString) if err != nil { return nil, fmt.Errorf("[upgrade/config] FATAL: %v", err) } - if err := features.ValidateVersion(features.InitFeatureGates, cfg.FeatureGates, cfg.KubernetesVersion); err != nil { - return nil, err - } return &upgradeVariables{ client: client, diff --git a/cmd/kubeadm/app/cmd/upgrade/common_test.go b/cmd/kubeadm/app/cmd/upgrade/common_test.go index e7105798970..5ebff084e0a 100644 --- a/cmd/kubeadm/app/cmd/upgrade/common_test.go +++ b/cmd/kubeadm/app/cmd/upgrade/common_test.go @@ -52,8 +52,7 @@ func TestPrintConfiguration(t *testing.T) { keyFile: "" imageRepository: "" kubeProxy: {} - kubeletConfiguration: - baseConfig: null + kubeletConfiguration: {} kubernetesVersion: v1.7.1 networking: dnsDomain: "" @@ -86,8 +85,7 @@ func TestPrintConfiguration(t *testing.T) { keyFile: "" imageRepository: "" kubeProxy: {} - kubeletConfiguration: - baseConfig: null + kubeletConfiguration: {} kubernetesVersion: v1.7.1 networking: dnsDomain: "" @@ -130,8 +128,7 @@ func TestPrintConfiguration(t *testing.T) { operatorVersion: v0.1.0 imageRepository: "" kubeProxy: {} - kubeletConfiguration: - baseConfig: null + kubeletConfiguration: {} kubernetesVersion: v1.7.1 networking: dnsDomain: "" diff --git a/cmd/kubeadm/app/cmd/upgrade/plan.go b/cmd/kubeadm/app/cmd/upgrade/plan.go index 61e0b4376a6..09031f81654 100644 --- a/cmd/kubeadm/app/cmd/upgrade/plan.go +++ b/cmd/kubeadm/app/cmd/upgrade/plan.go @@ -54,8 +54,8 @@ func NewCmdPlan(parentFlags *cmdUpgradeFlags) *cobra.Command { // RunPlan takes care of outputting available versions to upgrade to for the user func RunPlan(parentFlags *cmdUpgradeFlags) error { - // Start with the basics, verify that the cluster is healthy, build a client and a versionGetter. Never set dry-run for plan. - upgradeVars, err := enforceRequirements(parentFlags.featureGatesString, parentFlags.kubeConfigPath, parentFlags.cfgPath, parentFlags.printConfig, false, parentFlags.ignorePreflightErrorsSet) + // Start with the basics, verify that the cluster is healthy, build a client and a versionGetter. Never dry-run when planning. 
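The golden outputs above shrink from "baseConfig: null" to "kubeletConfiguration: {}" because the BaseConfig pointer gained omitempty in the types.go hunk earlier in this patch; the printed config is rendered through the same JSON tags, so the YAML follows the same rule. A minimal, self-contained illustration of that marshaling difference, using stand-in types rather than the real kubeadm ones:

    package main

    import (
        "encoding/json"
        "fmt"
    )

    type base struct {
        Dummy string `json:"dummy"`
    }

    // withoutOmit mirrors the old tag, withOmit the new one.
    type withoutOmit struct {
        BaseConfig *base `json:"baseConfig"`
    }

    type withOmit struct {
        BaseConfig *base `json:"baseConfig,omitempty"`
    }

    func main() {
        a, _ := json.Marshal(withoutOmit{})
        b, _ := json.Marshal(withOmit{})
        fmt.Println(string(a)) // {"baseConfig":null}
        fmt.Println(string(b)) // {}
    }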
+ upgradeVars, err := enforceRequirements(parentFlags, false, "") if err != nil { return err } diff --git a/cmd/kubeadm/app/features/features.go b/cmd/kubeadm/app/features/features.go index 2a9fa52ac78..36f5a495c4e 100644 --- a/cmd/kubeadm/app/features/features.go +++ b/cmd/kubeadm/app/features/features.go @@ -33,10 +33,10 @@ const ( // CoreDNS is alpha in v1.9 CoreDNS = "CoreDNS" - // SelfHosting is beta in v1.8 + // SelfHosting is beta in v1.9 SelfHosting = "SelfHosting" - // StoreCertsInSecrets is alpha in v1.8 + // StoreCertsInSecrets is alpha in v1.8 and v1.9 StoreCertsInSecrets = "StoreCertsInSecrets" // DynamicKubeletConfig is alpha in v1.9 @@ -47,9 +47,10 @@ var v190 = version.MustParseSemantic("v1.9.0-alpha.1") // InitFeatureGates are the default feature gates for the init command var InitFeatureGates = FeatureList{ - SelfHosting: {FeatureSpec: utilfeature.FeatureSpec{Default: false, PreRelease: utilfeature.Beta}}, - StoreCertsInSecrets: {FeatureSpec: utilfeature.FeatureSpec{Default: false, PreRelease: utilfeature.Alpha}}, - HighAvailability: {FeatureSpec: utilfeature.FeatureSpec{Default: false, PreRelease: utilfeature.Alpha}, MinimumVersion: v190}, + SelfHosting: {FeatureSpec: utilfeature.FeatureSpec{Default: false, PreRelease: utilfeature.Beta}}, + StoreCertsInSecrets: {FeatureSpec: utilfeature.FeatureSpec{Default: false, PreRelease: utilfeature.Alpha}}, + // We don't want to advertise this feature gate exists in v1.9 to avoid confusion as it is not yet working + HighAvailability: {FeatureSpec: utilfeature.FeatureSpec{Default: false, PreRelease: utilfeature.Alpha}, MinimumVersion: v190, HiddenInHelpText: true}, CoreDNS: {FeatureSpec: utilfeature.FeatureSpec{Default: false, PreRelease: utilfeature.Alpha}, MinimumVersion: v190}, DynamicKubeletConfig: {FeatureSpec: utilfeature.FeatureSpec{Default: false, PreRelease: utilfeature.Alpha}, MinimumVersion: v190}, } @@ -57,7 +58,8 @@ var InitFeatureGates = FeatureList{ // Feature represents a feature being gated type Feature struct { utilfeature.FeatureSpec - MinimumVersion *version.Version + MinimumVersion *version.Version + HiddenInHelpText bool } // FeatureList represents a list of feature gates @@ -113,6 +115,10 @@ func Keys(featureList FeatureList) []string { func KnownFeatures(f *FeatureList) []string { var known []string for k, v := range *f { + if v.HiddenInHelpText { + continue + } + pre := "" if v.PreRelease != utilfeature.GA { pre = fmt.Sprintf("%s - ", v.PreRelease) diff --git a/cmd/kubeadm/app/phases/kubelet/kubelet.go b/cmd/kubeadm/app/phases/kubelet/kubelet.go index 7d2ac055156..aec20293b62 100644 --- a/cmd/kubeadm/app/phases/kubelet/kubelet.go +++ b/cmd/kubeadm/app/phases/kubelet/kubelet.go @@ -43,7 +43,7 @@ import ( // CreateBaseKubeletConfiguration creates base kubelet configuration for dynamic kubelet configuration feature. 
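A reduced sketch of the help-text filtering the features.go hunk above introduces: gates flagged HiddenInHelpText remain settable but are dropped from the list kubeadm advertises for --feature-gates. The types below are simplified stand-ins for kubeadm's Feature and FeatureList, and the fmt and sort imports are assumed.

    type feature struct {
        hidden     bool
        preRelease string
        def        bool
    }

    type featureList map[string]feature

    func knownFeatures(f featureList) []string {
        var known []string
        for name, spec := range f {
            if spec.hidden {
                // e.g. HighAvailability in v1.9: implemented behind a gate but not advertised
                continue
            }
            known = append(known, fmt.Sprintf("%s=true|false (%s - default=%t)", name, spec.preRelease, spec.def))
        }
        sort.Strings(known)
        return known
    }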
func CreateBaseKubeletConfiguration(cfg *kubeadmapi.MasterConfiguration, client clientset.Interface) error { - fmt.Printf("[kubelet] Uploading a ConfigMap %q in namespace %s with base configuration for the kubelets in the cluster", + fmt.Printf("[kubelet] Uploading a ConfigMap %q in namespace %s with base configuration for the kubelets in the cluster\n", kubeadmconstants.KubeletBaseConfigurationConfigMap, metav1.NamespaceSystem) _, kubeletCodecs, err := kubeletconfigscheme.NewSchemeAndCodecs() @@ -95,7 +95,7 @@ func ConsumeBaseKubeletConfiguration(nodeName string) error { // updateNodeWithConfigMap updates node ConfigSource with KubeletBaseConfigurationConfigMap func updateNodeWithConfigMap(client clientset.Interface, nodeName string) error { - fmt.Printf("[kubelet] Using Dynamic Kubelet Config for node %q; config sourced from ConfigMap %q in namespace %s", + fmt.Printf("[kubelet] Using Dynamic Kubelet Config for node %q; config sourced from ConfigMap %q in namespace %s\n", nodeName, kubeadmconstants.KubeletBaseConfigurationConfigMap, metav1.NamespaceSystem) // Loop on every falsy return. Return with an error if raised. Exit successfully if true is returned. @@ -203,7 +203,7 @@ func getLocalNodeTLSBootstrappedClient() (clientset.Interface, error) { // WriteInitKubeletConfigToDiskOnMaster writes base kubelet configuration to disk on master. func WriteInitKubeletConfigToDiskOnMaster(cfg *kubeadmapi.MasterConfiguration) error { - fmt.Printf("[kubelet] Writing base configuration of kubelets to disk on master node %s", cfg.NodeName) + fmt.Printf("[kubelet] Writing base configuration of kubelets to disk on master node %s\n", cfg.NodeName) _, kubeletCodecs, err := kubeletconfigscheme.NewSchemeAndCodecs() if err != nil { diff --git a/cmd/kubeadm/app/phases/selfhosting/selfhosting.go b/cmd/kubeadm/app/phases/selfhosting/selfhosting.go index 4e8e83072ae..18ae9563f8f 100644 --- a/cmd/kubeadm/app/phases/selfhosting/selfhosting.go +++ b/cmd/kubeadm/app/phases/selfhosting/selfhosting.go @@ -52,7 +52,7 @@ const ( // 8. In order to avoid race conditions, we have to make sure that static pod is deleted correctly before we continue // Otherwise, there is a race condition when we proceed without kubelet having restarted the API server correctly and the next .Create call flakes // 9. 
Do that for the kube-apiserver, kube-controller-manager and kube-scheduler in a loop -func CreateSelfHostedControlPlane(manifestsDir, kubeConfigDir string, cfg *kubeadmapi.MasterConfiguration, client clientset.Interface, waiter apiclient.Waiter) error { +func CreateSelfHostedControlPlane(manifestsDir, kubeConfigDir string, cfg *kubeadmapi.MasterConfiguration, client clientset.Interface, waiter apiclient.Waiter, dryRun bool) error { // Adjust the timeout slightly to something self-hosting specific waiter.SetTimeout(selfHostingWaitTimeout) @@ -104,9 +104,11 @@ func CreateSelfHostedControlPlane(manifestsDir, kubeConfigDir string, cfg *kubea return err } - // Remove the old Static Pod manifest - if err := os.RemoveAll(manifestPath); err != nil { - return fmt.Errorf("unable to delete static pod manifest for %s [%v]", componentName, err) + // Remove the old Static Pod manifest if not dryrunning + if !dryRun { + if err := os.RemoveAll(manifestPath); err != nil { + return fmt.Errorf("unable to delete static pod manifest for %s [%v]", componentName, err) + } } // Wait for the mirror Pod hash to be removed; otherwise we'll run into race conditions here when the kubelet hasn't had time to diff --git a/cmd/kubeadm/app/phases/upgrade/BUILD b/cmd/kubeadm/app/phases/upgrade/BUILD index 907707d127e..644794563b8 100644 --- a/cmd/kubeadm/app/phases/upgrade/BUILD +++ b/cmd/kubeadm/app/phases/upgrade/BUILD @@ -36,6 +36,7 @@ go_library( "//cmd/kubeadm/app/util:go_default_library", "//cmd/kubeadm/app/util/apiclient:go_default_library", "//cmd/kubeadm/app/util/config:go_default_library", + "//cmd/kubeadm/app/util/dryrun:go_default_library", "//pkg/api/legacyscheme:go_default_library", "//pkg/util/version:go_default_library", "//pkg/version:go_default_library", diff --git a/cmd/kubeadm/app/phases/upgrade/policy.go b/cmd/kubeadm/app/phases/upgrade/policy.go index 594596ad8c4..185aa6675ad 100644 --- a/cmd/kubeadm/app/phases/upgrade/policy.go +++ b/cmd/kubeadm/app/phases/upgrade/policy.go @@ -28,6 +28,9 @@ const ( // MaximumAllowedMinorVersionUpgradeSkew describes how many minor versions kubeadm can upgrade the control plane version in one go MaximumAllowedMinorVersionUpgradeSkew = 1 + // MaximumAllowedMinorVersionDowngradeSkew describes how many minor versions kubeadm can upgrade the control plane version in one go + MaximumAllowedMinorVersionDowngradeSkew = 1 + // MaximumAllowedMinorVersionKubeletSkew describes how many minor versions the control plane version and the kubelet can skew in a kubeadm cluster MaximumAllowedMinorVersionKubeletSkew = 1 ) @@ -72,23 +75,41 @@ func EnforceVersionPolicies(versionGetter VersionGetter, newK8sVersionStr string skewErrors.Mandatory = append(skewErrors.Mandatory, fmt.Errorf("Specified version to upgrade to %q is equal to or lower than the minimum supported version %q. Please specify a higher version to upgrade to", newK8sVersionStr, clusterVersionStr)) } - // Make sure new version is higher than the current Kubernetes version - if clusterVersion.AtLeast(newK8sVersion) { - // Even though we don't officially support downgrades, it "should work", and if user(s) need it and are willing to try; they can do so with --force - skewErrors.Skippable = append(skewErrors.Skippable, fmt.Errorf("Specified version to upgrade to %q is equal to or lower than the cluster version %q. 
Downgrades are not supported yet", newK8sVersionStr, clusterVersionStr)) - } else { - // If this code path runs, it's an upgrade (this code will run most of the time) - // kubeadm doesn't support upgrades between two minor versions; e.g. a v1.7 -> v1.9 upgrade is not supported. Enforce that here - if newK8sVersion.Minor() > clusterVersion.Minor()+MaximumAllowedMinorVersionUpgradeSkew { - skewErrors.Mandatory = append(skewErrors.Mandatory, fmt.Errorf("Specified version to upgrade to %q is too high; kubeadm can upgrade only %d minor version at a time", newK8sVersionStr, MaximumAllowedMinorVersionUpgradeSkew)) + // kubeadm doesn't support upgrades between two minor versions; e.g. a v1.7 -> v1.9 upgrade is not supported right away + if newK8sVersion.Minor() > clusterVersion.Minor()+MaximumAllowedMinorVersionUpgradeSkew { + tooLargeUpgradeSkewErr := fmt.Errorf("Specified version to upgrade to %q is too high; kubeadm can upgrade only %d minor version at a time", newK8sVersionStr, MaximumAllowedMinorVersionUpgradeSkew) + // If the version that we're about to upgrade to is a released version, we should fully enforce this policy + // If the version is a CI/dev/experimental version, it's okay to jump two minor version steps, but then require the -f flag + if len(newK8sVersion.PreRelease()) == 0 { + skewErrors.Mandatory = append(skewErrors.Mandatory, tooLargeUpgradeSkewErr) + } else { + skewErrors.Skippable = append(skewErrors.Skippable, tooLargeUpgradeSkewErr) + } + } + + // kubeadm doesn't support downgrades between two minor versions; e.g. a v1.9 -> v1.7 downgrade is not supported right away + if newK8sVersion.Minor() < clusterVersion.Minor()-MaximumAllowedMinorVersionDowngradeSkew { + tooLargeDowngradeSkewErr := fmt.Errorf("Specified version to downgrade to %q is too low; kubeadm can downgrade only %d minor version at a time", newK8sVersionStr, MaximumAllowedMinorVersionDowngradeSkew) + // If the version that we're about to downgrade to is a released version, we should fully enforce this policy + // If the version is a CI/dev/experimental version, it's okay to jump two minor version steps, but then require the -f flag + if len(newK8sVersion.PreRelease()) == 0 { + skewErrors.Mandatory = append(skewErrors.Mandatory, tooLargeDowngradeSkewErr) + } else { + skewErrors.Skippable = append(skewErrors.Skippable, tooLargeDowngradeSkewErr) } } // If the kubeadm version is lower than what we want to upgrade to; error if kubeadmVersion.LessThan(newK8sVersion) { if newK8sVersion.Minor() > kubeadmVersion.Minor() { - // This is totally unsupported; kubeadm has no idea how it should handle a newer minor release than itself - skewErrors.Mandatory = append(skewErrors.Mandatory, fmt.Errorf("Specified version to upgrade to %q is one minor release higher than the kubeadm minor release (%d > %d). Such an upgrade is not supported", newK8sVersionStr, newK8sVersion.Minor(), kubeadmVersion.Minor())) + tooLargeKubeadmSkew := fmt.Errorf("Specified version to upgrade to %q is at least one minor release higher than the kubeadm minor release (%d > %d). 
Such an upgrade is not supported", newK8sVersionStr, newK8sVersion.Minor(), kubeadmVersion.Minor()) + // This is unsupported; kubeadm has no idea how it should handle a newer minor release than itself + // If the version is a CI/dev/experimental version though, lower the severity of this check, but then require the -f flag + if len(newK8sVersion.PreRelease()) == 0 { + skewErrors.Mandatory = append(skewErrors.Mandatory, tooLargeKubeadmSkew) + } else { + skewErrors.Skippable = append(skewErrors.Skippable, tooLargeKubeadmSkew) + } } else { // Upgrading to a higher patch version than kubeadm is ok if the user specifies --force. Not recommended, but possible. skewErrors.Skippable = append(skewErrors.Skippable, fmt.Errorf("Specified version to upgrade to %q is higher than the kubeadm version %q. Upgrade kubeadm first using the tool you used to install kubeadm", newK8sVersionStr, kubeadmVersionStr)) diff --git a/cmd/kubeadm/app/phases/upgrade/policy_test.go b/cmd/kubeadm/app/phases/upgrade/policy_test.go index 9b81609000b..64f6d0e692c 100644 --- a/cmd/kubeadm/app/phases/upgrade/policy_test.go +++ b/cmd/kubeadm/app/phases/upgrade/policy_test.go @@ -46,23 +46,21 @@ func TestEnforceVersionPolicies(t *testing.T) { }, newK8sVersion: "v1.9.0", }, - { // downgrades not supported + { // downgrades ok vg: &fakeVersionGetter{ clusterVersion: "v1.8.3", kubeletVersion: "v1.8.3", kubeadmVersion: "v1.8.3", }, - newK8sVersion: "v1.8.2", - expectedSkippableErrs: 1, + newK8sVersion: "v1.8.2", }, - { // upgrades without bumping the version number not supported yet. TODO: Change this? + { // upgrades without bumping the version number ok vg: &fakeVersionGetter{ clusterVersion: "v1.8.3", kubeletVersion: "v1.8.3", kubeadmVersion: "v1.8.3", }, - newK8sVersion: "v1.8.3", - expectedSkippableErrs: 1, + newK8sVersion: "v1.8.3", }, { // new version must be higher than v1.8.0 vg: &fakeVersionGetter{ @@ -72,7 +70,6 @@ func TestEnforceVersionPolicies(t *testing.T) { }, newK8sVersion: "v1.7.10", expectedMandatoryErrs: 1, // version must be higher than v1.8.0 - expectedSkippableErrs: 1, // version shouldn't be downgraded }, { // upgrading two minor versions in one go is not supported vg: &fakeVersionGetter{ @@ -84,6 +81,15 @@ func TestEnforceVersionPolicies(t *testing.T) { expectedMandatoryErrs: 1, // can't upgrade two minor versions expectedSkippableErrs: 1, // kubelet <-> apiserver skew too large }, + { // downgrading two minor versions in one go is not supported + vg: &fakeVersionGetter{ + clusterVersion: "v1.10.3", + kubeletVersion: "v1.10.3", + kubeadmVersion: "v1.10.0", + }, + newK8sVersion: "v1.8.3", + expectedMandatoryErrs: 1, // can't downgrade two minor versions + }, { // kubeadm version must be higher than the new kube version. 
However, patch version skews may be forced vg: &fakeVersionGetter{ clusterVersion: "v1.8.3", diff --git a/cmd/kubeadm/app/phases/upgrade/postupgrade.go b/cmd/kubeadm/app/phases/upgrade/postupgrade.go index 2eae65ab1a2..b16a2fbe6e0 100644 --- a/cmd/kubeadm/app/phases/upgrade/postupgrade.go +++ b/cmd/kubeadm/app/phases/upgrade/postupgrade.go @@ -18,6 +18,8 @@ package upgrade import ( "fmt" + "os" + "time" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/util/errors" @@ -31,14 +33,16 @@ import ( "k8s.io/kubernetes/cmd/kubeadm/app/phases/bootstraptoken/clusterinfo" nodebootstraptoken "k8s.io/kubernetes/cmd/kubeadm/app/phases/bootstraptoken/node" certsphase "k8s.io/kubernetes/cmd/kubeadm/app/phases/certs" + "k8s.io/kubernetes/cmd/kubeadm/app/phases/selfhosting" "k8s.io/kubernetes/cmd/kubeadm/app/phases/uploadconfig" "k8s.io/kubernetes/cmd/kubeadm/app/util/apiclient" + dryrunutil "k8s.io/kubernetes/cmd/kubeadm/app/util/dryrun" "k8s.io/kubernetes/pkg/util/version" ) // PerformPostUpgradeTasks runs nearly the same functions as 'kubeadm init' would do // Note that the markmaster phase is left out, not needed, and no token is created as that doesn't belong to the upgrade -func PerformPostUpgradeTasks(client clientset.Interface, cfg *kubeadmapi.MasterConfiguration, newK8sVer *version.Version) error { +func PerformPostUpgradeTasks(client clientset.Interface, cfg *kubeadmapi.MasterConfiguration, newK8sVer *version.Version, dryRun bool) error { errs := []error{} // Upload currently used configuration to the cluster @@ -63,6 +67,11 @@ func PerformPostUpgradeTasks(client clientset.Interface, cfg *kubeadmapi.MasterC errs = append(errs, err) } + // Upgrade to a self-hosted control plane if possible + if err := upgradeToSelfHosting(client, cfg, newK8sVer, dryRun); err != nil { + errs = append(errs, err) + } + // TODO: Is this needed to do here? 
I think that updating cluster info should probably be separate from a normal upgrade // Create the cluster-info ConfigMap with the associated RBAC rules // if err := clusterinfo.CreateBootstrapConfigMapIfNotExists(client, kubeadmconstants.GetAdminKubeConfigPath()); err != nil { @@ -92,9 +101,11 @@ func PerformPostUpgradeTasks(client clientset.Interface, cfg *kubeadmapi.MasterC if err := dns.EnsureDNSAddon(cfg, client); err != nil { errs = append(errs, err) } - - if err := coreDNSDeployment(cfg, client); err != nil { - errs = append(errs, err) + // Remove the old kube-dns deployment if coredns is now used + if !dryRun { + if err := removeOldKubeDNSDeploymentIfCoreDNSIsUsed(cfg, client); err != nil { + errs = append(errs, err) + } } if err := proxy.EnsureProxyAddon(cfg, client); err != nil { @@ -103,22 +114,41 @@ func PerformPostUpgradeTasks(client clientset.Interface, cfg *kubeadmapi.MasterC return errors.NewAggregate(errs) } -func coreDNSDeployment(cfg *kubeadmapi.MasterConfiguration, client clientset.Interface) error { +func removeOldKubeDNSDeploymentIfCoreDNSIsUsed(cfg *kubeadmapi.MasterConfiguration, client clientset.Interface) error { if features.Enabled(cfg.FeatureGates, features.CoreDNS) { return apiclient.TryRunCommand(func() error { - getCoreDNS, err := client.AppsV1beta2().Deployments(metav1.NamespaceSystem).Get(kubeadmconstants.CoreDNS, metav1.GetOptions{}) + coreDNSDeployment, err := client.AppsV1beta2().Deployments(metav1.NamespaceSystem).Get(kubeadmconstants.CoreDNS, metav1.GetOptions{}) if err != nil { return err } - if getCoreDNS.Status.ReadyReplicas == 0 { + if coreDNSDeployment.Status.ReadyReplicas == 0 { return fmt.Errorf("the CodeDNS deployment isn't ready yet") } - err = client.AppsV1beta2().Deployments(metav1.NamespaceSystem).Delete(kubeadmconstants.KubeDNS, nil) - if err != nil { - return err - } - return nil - }, 5) + return apiclient.DeleteDeploymentForeground(client, metav1.NamespaceSystem, kubeadmconstants.KubeDNS) + }, 10) } return nil } + +func upgradeToSelfHosting(client clientset.Interface, cfg *kubeadmapi.MasterConfiguration, newK8sVer *version.Version, dryRun bool) error { + if features.Enabled(cfg.FeatureGates, features.SelfHosting) && !IsControlPlaneSelfHosted(client) && newK8sVer.AtLeast(v190alpha3) { + + waiter := getWaiter(dryRun, client) + + // kubeadm will now convert the static Pod-hosted control plane into a self-hosted one + fmt.Println("[self-hosted] Creating self-hosted control plane.") + if err := selfhosting.CreateSelfHostedControlPlane(kubeadmconstants.GetStaticPodDirectory(), kubeadmconstants.KubernetesDir, cfg, client, waiter, dryRun); err != nil { + return fmt.Errorf("error creating self hosted control plane: %v", err) + } + } + return nil +} + +// getWaiter gets the right waiter implementation for the right occasion +// TODO: Consolidate this with what's in init.go? 
+func getWaiter(dryRun bool, client clientset.Interface) apiclient.Waiter { + if dryRun { + return dryrunutil.NewWaiter() + } + return apiclient.NewKubeWaiter(client, 30*time.Minute, os.Stdout) +} diff --git a/cmd/kubeadm/app/phases/upgrade/postupgrade_v18_19.go b/cmd/kubeadm/app/phases/upgrade/postupgrade_v18_19.go index 8243a8100fa..ef8f5d305d5 100644 --- a/cmd/kubeadm/app/phases/upgrade/postupgrade_v18_19.go +++ b/cmd/kubeadm/app/phases/upgrade/postupgrade_v18_19.go @@ -30,7 +30,9 @@ import ( "k8s.io/kubernetes/pkg/util/version" ) +// TODO: Maybe move these constants elsewhere in future releases var v190 = version.MustParseSemantic("v1.9.0") +var v190alpha3 = version.MustParseSemantic("v1.9.0-alpha.3") var expiry = 180 * 24 * time.Hour // backupAPIServerCertAndKey backups the old cert and key of kube-apiserver to a specified directory. diff --git a/cmd/kubeadm/app/util/apiclient/idempotency.go b/cmd/kubeadm/app/util/apiclient/idempotency.go index a2f11a74459..319ad5c1b96 100644 --- a/cmd/kubeadm/app/util/apiclient/idempotency.go +++ b/cmd/kubeadm/app/util/apiclient/idempotency.go @@ -107,6 +107,15 @@ func DeleteDaemonSetForeground(client clientset.Interface, namespace, name strin return client.AppsV1beta2().DaemonSets(namespace).Delete(name, deleteOptions) } +// DeleteDeploymentForeground deletes the specified Deployment in foreground mode; i.e. it blocks until/makes sure all the managed Pods are deleted +func DeleteDeploymentForeground(client clientset.Interface, namespace, name string) error { + foregroundDelete := metav1.DeletePropagationForeground + deleteOptions := &metav1.DeleteOptions{ + PropagationPolicy: &foregroundDelete, + } + return client.AppsV1beta2().Deployments(namespace).Delete(name, deleteOptions) +} + // CreateOrUpdateRole creates a Role if the target resource doesn't exist. If the resource exists already, this function will update the resource instead. 
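For reference, the new DeleteDeploymentForeground helper above wraps the plain client-go call sketched here, shown as the post-upgrade code applies it to kube-dns; the client variable is assumed. With foreground propagation the API server finalizes the Deployment only after the garbage collector has removed its dependents (ReplicaSets and, transitively, Pods).

    foreground := metav1.DeletePropagationForeground
    err := client.AppsV1beta2().Deployments(metav1.NamespaceSystem).Delete(
        kubeadmconstants.KubeDNS,
        &metav1.DeleteOptions{PropagationPolicy: &foreground},
    )
    // handle err as the caller requires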
func CreateOrUpdateRole(client clientset.Interface, role *rbac.Role) error { if _, err := client.RbacV1().Roles(role.ObjectMeta.Namespace).Create(role); err != nil { diff --git a/cmd/kubeadm/app/util/error.go b/cmd/kubeadm/app/util/error.go index c27ab860d7e..61327cf5608 100644 --- a/cmd/kubeadm/app/util/error.go +++ b/cmd/kubeadm/app/util/error.go @@ -80,7 +80,7 @@ func checkErr(prefix string, err error, handleErr func(string, int)) { func FormatErrMsg(errs []error) string { var errMsg string for _, err := range errs { - errMsg = fmt.Sprintf("%s\t-%s\n", errMsg, err.Error()) + errMsg = fmt.Sprintf("%s\t- %s\n", errMsg, err.Error()) } return errMsg } diff --git a/cmd/kubeadm/app/util/error_test.go b/cmd/kubeadm/app/util/error_test.go index 831007550ad..c28a6cc0566 100644 --- a/cmd/kubeadm/app/util/error_test.go +++ b/cmd/kubeadm/app/util/error_test.go @@ -64,13 +64,13 @@ func TestFormatErrMsg(t *testing.T) { fmt.Errorf(errMsg1), fmt.Errorf(errMsg2), }, - expect: "\t-" + errMsg1 + "\n" + "\t-" + errMsg2 + "\n", + expect: "\t- " + errMsg1 + "\n" + "\t- " + errMsg2 + "\n", }, { errs: []error{ fmt.Errorf(errMsg1), }, - expect: "\t-" + errMsg1 + "\n", + expect: "\t- " + errMsg1 + "\n", }, } From 7405159558bd37923f47d7647c268479758690f4 Mon Sep 17 00:00:00 2001 From: Vladimir Vivien Date: Fri, 1 Dec 2017 00:17:13 -0500 Subject: [PATCH 5/9] VolumeHost.GetNodeName method added for CSI fix --- .../volume/attachdetach/attach_detach_controller.go | 4 ++++ pkg/controller/volume/expand/expand_controller.go | 4 ++++ pkg/controller/volume/persistentvolume/volume_host.go | 4 ++++ pkg/kubelet/volume_host.go | 4 ++++ pkg/volume/plugins.go | 3 +++ pkg/volume/testing/testing.go | 11 +++++++++++ 6 files changed, 30 insertions(+) diff --git a/pkg/controller/volume/attachdetach/attach_detach_controller.go b/pkg/controller/volume/attachdetach/attach_detach_controller.go index c080e7b5fdf..3c01e6b05a4 100644 --- a/pkg/controller/volume/attachdetach/attach_detach_controller.go +++ b/pkg/controller/volume/attachdetach/attach_detach_controller.go @@ -603,3 +603,7 @@ func (adc *attachDetachController) addNodeToDswp(node *v1.Node, nodeName types.N func (adc *attachDetachController) GetNodeLabels() (map[string]string, error) { return nil, fmt.Errorf("GetNodeLabels() unsupported in Attach/Detach controller") } + +func (adc *attachDetachController) GetNodeName() types.NodeName { + return "" +} diff --git a/pkg/controller/volume/expand/expand_controller.go b/pkg/controller/volume/expand/expand_controller.go index de16d73206e..6d6ac0edf9b 100644 --- a/pkg/controller/volume/expand/expand_controller.go +++ b/pkg/controller/volume/expand/expand_controller.go @@ -277,3 +277,7 @@ func (expc *expandController) GetConfigMapFunc() func(namespace, name string) (* func (expc *expandController) GetNodeLabels() (map[string]string, error) { return nil, fmt.Errorf("GetNodeLabels unsupported in expandController") } + +func (expc *expandController) GetNodeName() types.NodeName { + return "" +} diff --git a/pkg/controller/volume/persistentvolume/volume_host.go b/pkg/controller/volume/persistentvolume/volume_host.go index d111ed07111..27d45629e8a 100644 --- a/pkg/controller/volume/persistentvolume/volume_host.go +++ b/pkg/controller/volume/persistentvolume/volume_host.go @@ -108,3 +108,7 @@ func (adc *PersistentVolumeController) GetExec(pluginName string) mount.Exec { func (ctrl *PersistentVolumeController) GetNodeLabels() (map[string]string, error) { return nil, fmt.Errorf("GetNodeLabels() unsupported in PersistentVolumeController") } + 
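The controller-side volume hosts (the attach/detach and expand controllers above, and the PV controller just below) return the empty NodeName because they are not bound to a single node; only the kubelet's volume host reports a real name. A tiny hypothetical guard, not part of the patch, that a plugin could use when it needs to distinguish the two contexts; the volume and types imports are assumed.

    func nodeNameIfAvailable(host volume.VolumeHost) (types.NodeName, bool) {
        name := host.GetNodeName()
        return name, name != ""
    }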
+func (ctrl *PersistentVolumeController) GetNodeName() types.NodeName { + return "" +} diff --git a/pkg/kubelet/volume_host.go b/pkg/kubelet/volume_host.go index c0baa1cf57b..ae7847bc6fb 100644 --- a/pkg/kubelet/volume_host.go +++ b/pkg/kubelet/volume_host.go @@ -188,6 +188,10 @@ func (kvh *kubeletVolumeHost) GetNodeLabels() (map[string]string, error) { return node.Labels, nil } +func (kvh *kubeletVolumeHost) GetNodeName() types.NodeName { + return kvh.kubelet.nodeName +} + func (kvh *kubeletVolumeHost) GetExec(pluginName string) mount.Exec { exec, err := kvh.getMountExec(pluginName) if err != nil { diff --git a/pkg/volume/plugins.go b/pkg/volume/plugins.go index bbfd1874f00..42ceaa40d2f 100644 --- a/pkg/volume/plugins.go +++ b/pkg/volume/plugins.go @@ -303,6 +303,9 @@ type VolumeHost interface { // Returns the labels on the node GetNodeLabels() (map[string]string, error) + + // Returns the name of the node + GetNodeName() types.NodeName } // VolumePluginMgr tracks registered plugins. diff --git a/pkg/volume/testing/testing.go b/pkg/volume/testing/testing.go index c940abdbe45..766792dd8a2 100644 --- a/pkg/volume/testing/testing.go +++ b/pkg/volume/testing/testing.go @@ -53,6 +53,7 @@ type fakeVolumeHost struct { exec mount.Exec writer io.Writer nodeLabels map[string]string + nodeName string } func NewFakeVolumeHost(rootDir string, kubeClient clientset.Interface, plugins []VolumePlugin) *fakeVolumeHost { @@ -69,6 +70,12 @@ func NewFakeVolumeHostWithNodeLabels(rootDir string, kubeClient clientset.Interf return volHost } +func NewFakeVolumeHostWithNodeName(rootDir string, kubeClient clientset.Interface, plugins []VolumePlugin, nodeName string) *fakeVolumeHost { + volHost := newFakeVolumeHost(rootDir, kubeClient, plugins, nil) + volHost.nodeName = nodeName + return volHost +} + func newFakeVolumeHost(rootDir string, kubeClient clientset.Interface, plugins []VolumePlugin, cloud cloudprovider.Interface) *fakeVolumeHost { host := &fakeVolumeHost{rootDir: rootDir, kubeClient: kubeClient, cloud: cloud} host.mounter = &mount.FakeMounter{} @@ -177,6 +184,10 @@ func (f *fakeVolumeHost) GetNodeLabels() (map[string]string, error) { return f.nodeLabels, nil } +func (f *fakeVolumeHost) GetNodeName() types.NodeName { + return types.NodeName(f.nodeName) +} + func ProbeVolumePlugins(config VolumeConfig) []VolumePlugin { if _, ok := config.OtherAttributes["fake-property"]; ok { return []VolumePlugin{ From 179d8e108e81ff189727659b4fd72b01f501d6f0 Mon Sep 17 00:00:00 2001 From: Vladimir Vivien Date: Tue, 28 Nov 2017 20:48:12 -0500 Subject: [PATCH 6/9] CSI - feature gate fix, gated RBAC rules, csi nodeID label This commit tracks chages to fix blocking bugs such as feature gates, rbac rules, usage of csi nodeID to derive attachment ID. 
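The heart of the change, condensed into one self-contained sketch ahead of the per-file diff: the VolumeAttachment object name becomes a deterministic hash over volume handle, CSI driver name and node name, so the controller-side attacher and the kubelet-side mounter can both compute it independently instead of listing attachments. The function body matches the getAttachmentName helper added to pkg/volume/csi/csi_attacher.go below.

    package main

    import (
        "crypto/sha256"
        "fmt"
    )

    // getAttachmentName returns "csi-<sha256 of volume handle + driver + node>".
    func getAttachmentName(volName, csiDriverName, nodeName string) string {
        result := sha256.Sum256([]byte(fmt.Sprintf("%s%s%s", volName, csiDriverName, nodeName)))
        return fmt.Sprintf("csi-%x", result)
    }

    func main() {
        // Matches the IDs the updated unit tests expect, e.g. for vol02/driver02/node02.
        fmt.Println(getAttachmentName("vol02", "driver02", "node02"))
    }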
--- cmd/kube-controller-manager/app/plugins.go | 2 +- cmd/kubelet/app/plugins.go | 2 +- pkg/volume/csi/csi_attacher.go | 44 +++---- pkg/volume/csi/csi_attacher_test.go | 110 ++++++++++++------ pkg/volume/csi/csi_mounter.go | 31 +++-- pkg/volume/csi/csi_mounter_test.go | 15 ++- pkg/volume/csi/csi_plugin.go | 3 +- .../rbac/bootstrappolicy/controller_policy.go | 29 +++-- .../authorizer/rbac/bootstrappolicy/policy.go | 6 + 9 files changed, 155 insertions(+), 87 deletions(-) diff --git a/cmd/kube-controller-manager/app/plugins.go b/cmd/kube-controller-manager/app/plugins.go index a77bac3a4a0..170c366c90a 100644 --- a/cmd/kube-controller-manager/app/plugins.go +++ b/cmd/kube-controller-manager/app/plugins.go @@ -77,7 +77,7 @@ func ProbeAttachableVolumePlugins() []volume.VolumePlugin { allPlugins = append(allPlugins, fc.ProbeVolumePlugins()...) allPlugins = append(allPlugins, iscsi.ProbeVolumePlugins()...) allPlugins = append(allPlugins, rbd.ProbeVolumePlugins()...) - if !utilfeature.DefaultFeatureGate.Enabled(features.CSIPersistentVolume) { + if utilfeature.DefaultFeatureGate.Enabled(features.CSIPersistentVolume) { allPlugins = append(allPlugins, csi.ProbeVolumePlugins()...) } return allPlugins diff --git a/cmd/kubelet/app/plugins.go b/cmd/kubelet/app/plugins.go index 002b5de8c5d..ef41bb8e909 100644 --- a/cmd/kubelet/app/plugins.go +++ b/cmd/kubelet/app/plugins.go @@ -100,7 +100,7 @@ func ProbeVolumePlugins() []volume.VolumePlugin { allPlugins = append(allPlugins, scaleio.ProbeVolumePlugins()...) allPlugins = append(allPlugins, local.ProbeVolumePlugins()...) allPlugins = append(allPlugins, storageos.ProbeVolumePlugins()...) - if !utilfeature.DefaultFeatureGate.Enabled(features.CSIPersistentVolume) { + if utilfeature.DefaultFeatureGate.Enabled(features.CSIPersistentVolume) { allPlugins = append(allPlugins, csi.ProbeVolumePlugins()...) 
} return allPlugins diff --git a/pkg/volume/csi/csi_attacher.go b/pkg/volume/csi/csi_attacher.go index 11ca86e66f5..039246b9e50 100644 --- a/pkg/volume/csi/csi_attacher.go +++ b/pkg/volume/csi/csi_attacher.go @@ -52,18 +52,19 @@ func (c *csiAttacher) Attach(spec *volume.Spec, nodeName types.NodeName) (string csiSource, err := getCSISourceFromSpec(spec) if err != nil { glog.Error(log("attacher.Attach failed to get CSI persistent source: %v", err)) - return "", errors.New("missing CSI persistent volume") + return "", err } + node := string(nodeName) pvName := spec.PersistentVolume.GetName() - attachID := getAttachmentName(csiSource.VolumeHandle, string(nodeName)) + attachID := getAttachmentName(csiSource.VolumeHandle, csiSource.Driver, node) attachment := &storage.VolumeAttachment{ ObjectMeta: meta.ObjectMeta{ Name: attachID, }, Spec: storage.VolumeAttachmentSpec{ - NodeName: string(nodeName), + NodeName: node, Attacher: csiPluginName, Source: storage.VolumeAttachmentSource{ PersistentVolumeName: &pvName, @@ -72,7 +73,7 @@ func (c *csiAttacher) Attach(spec *volume.Spec, nodeName types.NodeName) (string Status: storage.VolumeAttachmentStatus{Attached: false}, } - attach, err := c.k8s.StorageV1alpha1().VolumeAttachments().Create(attachment) + _, err = c.k8s.StorageV1alpha1().VolumeAttachments().Create(attachment) alreadyExist := false if err != nil { if !apierrs.IsAlreadyExists(err) { @@ -83,19 +84,23 @@ func (c *csiAttacher) Attach(spec *volume.Spec, nodeName types.NodeName) (string } if alreadyExist { - glog.V(4).Info(log("attachment [%v] for volume [%v] already exists (will not be recreated)", attach.GetName(), csiSource.VolumeHandle)) + glog.V(4).Info(log("attachment [%v] for volume [%v] already exists (will not be recreated)", attachID, csiSource.VolumeHandle)) } else { - glog.V(4).Info(log("attachment [%v] for volume [%v] created successfully, will start probing for updates", attach.GetName(), csiSource.VolumeHandle)) + glog.V(4).Info(log("attachment [%v] for volume [%v] created successfully", attachID, csiSource.VolumeHandle)) } // probe for attachment update here // NOTE: any error from waiting for attachment is logged only. This is because // the primariy intent of the enclosing method is to create VolumeAttachment. // DONOT return that error here as it is mitigated in attacher.WaitForAttach. 
+ volAttachmentOK := true if _, err := c.waitForVolumeAttachment(csiSource.VolumeHandle, attachID, csiTimeout); err != nil { - glog.Error(log("attacher.Attach encountered error during attachment probing: %v", err)) + volAttachmentOK = false + glog.Error(log("attacher.Attach attempted to wait for attachment to be ready, but failed with: %v", err)) } + glog.V(4).Info(log("attacher.Attach finished OK with VolumeAttachment verified=%t: attachment object [%s]", volAttachmentOK, attachID)) + return attachID, nil } @@ -151,7 +156,7 @@ func (c *csiAttacher) waitForVolumeAttachment(volumeHandle, attachID string, tim } func (c *csiAttacher) VolumesAreAttached(specs []*volume.Spec, nodeName types.NodeName) (map[*volume.Spec]bool, error) { - glog.V(4).Info(log("probing attachment status for %d volumes ", len(specs))) + glog.V(4).Info(log("probing attachment status for %d volume(s) ", len(specs))) attached := make(map[*volume.Spec]bool) @@ -165,13 +170,15 @@ func (c *csiAttacher) VolumesAreAttached(specs []*volume.Spec, nodeName types.No glog.Error(log("attacher.VolumesAreAttached failed: %v", err)) continue } - attachID := getAttachmentName(source.VolumeHandle, string(nodeName)) + + attachID := getAttachmentName(source.VolumeHandle, source.Driver, string(nodeName)) glog.V(4).Info(log("probing attachment status for VolumeAttachment %v", attachID)) attach, err := c.k8s.StorageV1alpha1().VolumeAttachments().Get(attachID, meta.GetOptions{}) if err != nil { glog.Error(log("attacher.VolumesAreAttached failed for attach.ID=%v: %v", attachID, err)) continue } + glog.V(4).Info(log("attacher.VolumesAreAttached attachment [%v] has status.attached=%t", attachID, attach.Status.Attached)) attached[spec] = attach.Status.Attached } @@ -201,10 +208,11 @@ func (c *csiAttacher) Detach(volumeName string, nodeName types.NodeName) error { glog.Error(log("detacher.Detach insufficient info encoded in volumeName")) return errors.New("volumeName missing expected data") } + + driverName := parts[0] volID := parts[1] - attachID := getAttachmentName(volID, string(nodeName)) - err := c.k8s.StorageV1alpha1().VolumeAttachments().Delete(attachID, nil) - if err != nil { + attachID := getAttachmentName(volID, driverName, string(nodeName)) + if err := c.k8s.StorageV1alpha1().VolumeAttachments().Delete(attachID, nil); err != nil { glog.Error(log("detacher.Detach failed to delete VolumeAttachment [%s]: %v", attachID, err)) return err } @@ -257,12 +265,8 @@ func (c *csiAttacher) UnmountDevice(deviceMountPath string) error { return nil } -func hashAttachmentName(volName, nodeName string) string { - result := sha256.Sum256([]byte(fmt.Sprintf("%s%s", volName, nodeName))) - return fmt.Sprintf("%x", result) -} - -func getAttachmentName(volName, nodeName string) string { - // TODO consider using a different prefix for attachment - return fmt.Sprintf("pv-%s", hashAttachmentName(volName, nodeName)) +// getAttachmentName returns csi- +func getAttachmentName(volName, csiDriverName, nodeName string) string { + result := sha256.Sum256([]byte(fmt.Sprintf("%s%s%s", volName, csiDriverName, nodeName))) + return fmt.Sprintf("csi-%x", result) } diff --git a/pkg/volume/csi/csi_attacher_test.go b/pkg/volume/csi/csi_attacher_test.go index 2e1cb3fc61f..821bbb2df72 100644 --- a/pkg/volume/csi/csi_attacher_test.go +++ b/pkg/volume/csi/csi_attacher_test.go @@ -17,13 +17,11 @@ limitations under the License. 
package csi import ( - "crypto/sha256" "fmt" "os" "testing" "time" - "k8s.io/api/core/v1" storage "k8s.io/api/storage/v1alpha1" apierrs "k8s.io/apimachinery/pkg/api/errors" meta "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -64,47 +62,93 @@ func TestAttacherAttach(t *testing.T) { testCases := []struct { name string - pv *v1.PersistentVolume nodeName string - attachHash [32]byte + driverName string + volumeName string + attachID string shouldFail bool }{ { name: "test ok 1", - pv: makeTestPV("test-pv-001", 10, testDriver, "test-vol-1"), - nodeName: "test-node", - attachHash: sha256.Sum256([]byte(fmt.Sprintf("%s%s", "test-vol-1", "test-node"))), + nodeName: "testnode-01", + driverName: "testdriver-01", + volumeName: "testvol-01", + attachID: getAttachmentName("testvol-01", "testdriver-01", "testnode-01"), }, { name: "test ok 2", - pv: makeTestPV("test-pv-002", 10, testDriver, "test-vol-002"), - nodeName: "test-node", - attachHash: sha256.Sum256([]byte(fmt.Sprintf("%s%s", "test-vol-002", "test-node"))), + nodeName: "node02", + driverName: "driver02", + volumeName: "vol02", + attachID: getAttachmentName("vol02", "driver02", "node02"), }, { - name: "missing spec", - pv: nil, - nodeName: "test-node", - attachHash: sha256.Sum256([]byte(fmt.Sprintf("%s%s", "test-vol-3", "test-node"))), + name: "mismatch vol", + nodeName: "node02", + driverName: "driver02", + volumeName: "vol01", + attachID: getAttachmentName("vol02", "driver02", "node02"), + shouldFail: true, + }, + { + name: "mismatch driver", + nodeName: "node02", + driverName: "driver000", + volumeName: "vol02", + attachID: getAttachmentName("vol02", "driver02", "node02"), + shouldFail: true, + }, + { + name: "mismatch node", + nodeName: "node000", + driverName: "driver000", + volumeName: "vol02", + attachID: getAttachmentName("vol02", "driver02", "node02"), shouldFail: true, }, } - for _, tc := range testCases { - var spec *volume.Spec - if tc.pv != nil { - spec = volume.NewSpecFromPersistentVolume(tc.pv, tc.pv.Spec.PersistentVolumeSource.CSI.ReadOnly) + // attacher loop + for i, tc := range testCases { + t.Log("test case: ", tc.name) + spec := volume.NewSpecFromPersistentVolume(makeTestPV(fmt.Sprintf("test-pv%d", i), 10, tc.driverName, tc.volumeName), false) + + go func(id, nodename string, fail bool) { + attachID, err := csiAttacher.Attach(spec, types.NodeName(nodename)) + if !fail && err != nil { + t.Error("was not expecting failure, but got err: ", err) + } + if attachID != id && !fail { + t.Errorf("expecting attachID %v, got %v", id, attachID) + } + }(tc.attachID, tc.nodeName, tc.shouldFail) + + // update attachment to avoid long waitForAttachment + ticker := time.NewTicker(10 * time.Millisecond) + defer ticker.Stop() + // wait for attachment to be saved + var attach *storage.VolumeAttachment + for i := 0; i < 100; i++ { + attach, err = csiAttacher.k8s.StorageV1alpha1().VolumeAttachments().Get(tc.attachID, meta.GetOptions{}) + if err != nil { + if apierrs.IsNotFound(err) { + <-ticker.C + continue + } + t.Error(err) + } + if attach != nil { + break + } } - attachID, err := csiAttacher.Attach(spec, types.NodeName(tc.nodeName)) - if tc.shouldFail && err == nil { - t.Error("expected failure, but got nil err") + if attach == nil { + t.Error("attachment not found") } - if attachID != "" { - expectedID := fmt.Sprintf("pv-%x", tc.attachHash) - if attachID != expectedID { - t.Errorf("expecting attachID %v, got %v", expectedID, attachID) - } + attach.Status.Attached = true + _, err = csiAttacher.k8s.StorageV1alpha1().VolumeAttachments().Update(attach) 
+		if err != nil {
+			t.Error(err)
 		}
 	}
 }
@@ -136,8 +180,8 @@ func TestAttacherWaitForVolumeAttachment(t *testing.T) {
 	for i, tc := range testCases {
 		t.Logf("running test: %v", tc.name)
 		pvName := fmt.Sprintf("test-pv-%d", i)
-		attachID := fmt.Sprintf("pv-%s", hashAttachmentName(pvName, nodeName))
-
+		volID := fmt.Sprintf("test-vol-%d", i)
+		attachID := getAttachmentName(volID, testDriver, nodeName)
 		attachment := makeTestAttachment(attachID, nodeName, pvName)
 		attachment.Status.Attached = tc.attached
 		attachment.Status.AttachError = tc.attachErr
@@ -150,7 +194,7 @@ func TestAttacherWaitForVolumeAttachment(t *testing.T) {
 			}
 		}()
 
-		retID, err := csiAttacher.waitForVolumeAttachment("test-vol", attachID, tc.timeout)
+		retID, err := csiAttacher.waitForVolumeAttachment(volID, attachID, tc.timeout)
 		if tc.shouldFail && err == nil {
 			t.Error("expecting failure, but err is nil")
 		}
@@ -192,7 +236,7 @@ func TestAttacherVolumesAreAttached(t *testing.T) {
 		pv := makeTestPV("test-pv", 10, testDriver, volName)
 		spec := volume.NewSpecFromPersistentVolume(pv, pv.Spec.PersistentVolumeSource.CSI.ReadOnly)
 		specs = append(specs, spec)
-		attachID := getAttachmentName(volName, nodeName)
+		attachID := getAttachmentName(volName, testDriver, nodeName)
 		attachment := makeTestAttachment(attachID, nodeName, pv.GetName())
 		attachment.Status.Attached = stat
 		_, err := csiAttacher.k8s.StorageV1alpha1().VolumeAttachments().Create(attachment)
@@ -239,9 +283,9 @@ func TestAttacherDetach(t *testing.T) {
 		attachID   string
 		shouldFail bool
 	}{
-		{name: "normal test", volID: "vol-001", attachID: fmt.Sprintf("pv-%s", hashAttachmentName("vol-001", nodeName))},
-		{name: "normal test 2", volID: "vol-002", attachID: fmt.Sprintf("pv-%s", hashAttachmentName("vol-002", nodeName))},
-		{name: "object not found", volID: "vol-001", attachID: fmt.Sprintf("pv-%s", hashAttachmentName("vol-002", nodeName)), shouldFail: true},
+		{name: "normal test", volID: "vol-001", attachID: getAttachmentName("vol-001", testDriver, nodeName)},
+		{name: "normal test 2", volID: "vol-002", attachID: getAttachmentName("vol-002", testDriver, nodeName)},
+		{name: "object not found", volID: "vol-001", attachID: getAttachmentName("vol-002", testDriver, nodeName), shouldFail: true},
 	}
 
 	for _, tc := range testCases {
diff --git a/pkg/volume/csi/csi_mounter.go b/pkg/volume/csi/csi_mounter.go
index 8645752ad24..f29c82a09c6 100644
--- a/pkg/volume/csi/csi_mounter.go
+++ b/pkg/volume/csi/csi_mounter.go
@@ -24,7 +24,6 @@ import (
 	"github.com/golang/glog"
 	grpctx "golang.org/x/net/context"
 	api "k8s.io/api/core/v1"
-	storage "k8s.io/api/storage/v1alpha1"
 	meta "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/apimachinery/pkg/types"
 	"k8s.io/client-go/kubernetes"
@@ -77,11 +76,18 @@ func (c *csiMountMgr) SetUp(fsGroup *int64) error {
 func (c *csiMountMgr) SetUpAt(dir string, fsGroup *int64) error {
 	glog.V(4).Infof(log("Mounter.SetUpAt(%s)", dir))
 
+	csiSource, err := getCSISourceFromSpec(c.spec)
+	if err != nil {
+		glog.Error(log("mounter.SetupAt failed to get CSI persistent source: %v", err))
+		return err
+	}
+
 	ctx, cancel := grpctx.WithTimeout(grpctx.Background(), csiTimeout)
 	defer cancel()
 
 	csi := c.csiClient
-	pvName := c.spec.PersistentVolume.GetName()
+	nodeName := string(c.plugin.host.GetNodeName())
+	attachID := getAttachmentName(csiSource.VolumeHandle, csiSource.Driver, nodeName)
 
 	// ensure version is supported
 	if err := csi.AssertSupportedVersion(ctx, csiVersion); err != nil {
@@ -92,25 +98,14 @@ func (c *csiMountMgr) SetUpAt(dir string, fsGroup *int64) error {
 	// search for attachment by VolumeAttachment.Spec.Source.PersistentVolumeName
 	if c.volumeInfo == nil {
-		//TODO (vladimirvivien) consider using VolumesAttachments().Get() to retrieve
-		//the object directly. This requires the ability to reconstruct the ID using volumeName+nodeName (nodename may not be avilable)
-		attachList, err := c.k8s.StorageV1alpha1().VolumeAttachments().List(meta.ListOptions{})
+		attachment, err := c.k8s.StorageV1alpha1().VolumeAttachments().Get(attachID, meta.GetOptions{})
 		if err != nil {
-			glog.Error(log("failed to get volume attachments: %v", err))
+			glog.Error(log("mounter.SetupAt failed while getting volume attachment [id=%v]: %v", attachID, err))
 			return err
 		}
 
-		var attachment *storage.VolumeAttachment
-		for _, attach := range attachList.Items {
-			if attach.Spec.Source.PersistentVolumeName != nil &&
-				*attach.Spec.Source.PersistentVolumeName == pvName {
-				attachment = &attach
-				break
-			}
-		}
-
 		if attachment == nil {
-			glog.Error(log("unable to find VolumeAttachment with PV.name = %s", pvName))
+			glog.Error(log("unable to find VolumeAttachment [id=%s]", attachID))
 			return errors.New("no existing VolumeAttachment found")
 		}
 		c.volumeInfo = attachment.Status.AttachmentMetadata
@@ -122,7 +117,7 @@ func (c *csiMountMgr) SetUpAt(dir string, fsGroup *int64) error {
 		accessMode = c.spec.PersistentVolume.Spec.AccessModes[0]
 	}
 
-	err := csi.NodePublishVolume(
+	err = csi.NodePublishVolume(
 		ctx,
 		c.volumeID,
 		c.readOnly,
@@ -133,7 +128,7 @@ func (c *csiMountMgr) SetUpAt(dir string, fsGroup *int64) error {
 	)
 
 	if err != nil {
-		glog.Errorf(log("Mounter.Setup failed: %v", err))
+		glog.Errorf(log("Mounter.SetupAt failed: %v", err))
 		return err
 	}
 	glog.V(4).Infof(log("successfully mounted %s", dir))
diff --git a/pkg/volume/csi/csi_mounter_test.go b/pkg/volume/csi/csi_mounter_test.go
index 5e7f6083b60..9d95d3ad816 100644
--- a/pkg/volume/csi/csi_mounter_test.go
+++ b/pkg/volume/csi/csi_mounter_test.go
@@ -26,8 +26,10 @@ import (
 	storage "k8s.io/api/storage/v1alpha1"
 	meta "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/apimachinery/pkg/types"
+	fakeclient "k8s.io/client-go/kubernetes/fake"
 	"k8s.io/kubernetes/pkg/volume"
 	"k8s.io/kubernetes/pkg/volume/csi/fake"
+	volumetest "k8s.io/kubernetes/pkg/volume/testing"
 )
 
 var (
@@ -68,7 +70,14 @@ func TestMounterGetPath(t *testing.T) {
 
 func TestMounterSetUp(t *testing.T) {
 	plug, tmpDir := newTestPlugin(t)
 	defer os.RemoveAll(tmpDir)
-
+	fakeClient := fakeclient.NewSimpleClientset()
+	host := volumetest.NewFakeVolumeHostWithNodeName(
+		tmpDir,
+		fakeClient,
+		nil,
+		"fakeNode",
+	)
+	plug.host = host
 	pv := makeTestPV("test-pv", 10, testDriver, testVol)
 	pvName := pv.GetName()
 
@@ -88,9 +97,11 @@ func TestMounterSetUp(t *testing.T) {
 	csiMounter := mounter.(*csiMountMgr)
 	csiMounter.csiClient = setupClient(t)
 
+	attachID := getAttachmentName(csiMounter.volumeID, csiMounter.driverName, string(plug.host.GetNodeName()))
+
 	attachment := &storage.VolumeAttachment{
 		ObjectMeta: meta.ObjectMeta{
-			Name: "pv-1234556775313",
+			Name: attachID,
 		},
 		Spec: storage.VolumeAttachmentSpec{
 			NodeName: "test-node",
diff --git a/pkg/volume/csi/csi_plugin.go b/pkg/volume/csi/csi_plugin.go
index 876de2281bc..d432c8303be 100644
--- a/pkg/volume/csi/csi_plugin.go
+++ b/pkg/volume/csi/csi_plugin.go
@@ -34,14 +34,13 @@ import (
 )
 
 const (
-	csiName       = "csi"
 	csiPluginName = "kubernetes.io/csi"
 
 	// TODO (vladimirvivien) implement a more dynamic way to discover
 	// the unix domain socket path for each installed csi driver.
 	// TODO (vladimirvivien) would be nice to name socket with a .sock extension
 	// for consistency.
-	csiAddrTemplate = "/var/lib/kubelet/plugins/%v"
+	csiAddrTemplate = "/var/lib/kubelet/plugins/%v/csi.sock"
 	csiTimeout      = 15 * time.Second
 	volNameSep      = "^"
 )
diff --git a/plugin/pkg/auth/authorizer/rbac/bootstrappolicy/controller_policy.go b/plugin/pkg/auth/authorizer/rbac/bootstrappolicy/controller_policy.go
index cc0813f5de1..c94c0295334 100644
--- a/plugin/pkg/auth/authorizer/rbac/bootstrappolicy/controller_policy.go
+++ b/plugin/pkg/auth/authorizer/rbac/bootstrappolicy/controller_policy.go
@@ -58,16 +58,25 @@ func buildControllerRoles() ([]rbac.ClusterRole, []rbac.ClusterRoleBinding) {
 	// controllerRoleBindings is a slice of roles used for controllers
 	controllerRoleBindings := []rbac.ClusterRoleBinding{}
 
-	addControllerRole(&controllerRoles, &controllerRoleBindings, rbac.ClusterRole{
-		ObjectMeta: metav1.ObjectMeta{Name: saRolePrefix + "attachdetach-controller"},
-		Rules: []rbac.PolicyRule{
-			rbac.NewRule("list", "watch").Groups(legacyGroup).Resources("persistentvolumes", "persistentvolumeclaims").RuleOrDie(),
-			rbac.NewRule("get", "list", "watch").Groups(legacyGroup).Resources("nodes").RuleOrDie(),
-			rbac.NewRule("patch", "update").Groups(legacyGroup).Resources("nodes/status").RuleOrDie(),
-			rbac.NewRule("list", "watch").Groups(legacyGroup).Resources("pods").RuleOrDie(),
-			eventsRule(),
-		},
-	})
+	addControllerRole(&controllerRoles, &controllerRoleBindings, func() rbac.ClusterRole {
+		role := rbac.ClusterRole{
+			ObjectMeta: metav1.ObjectMeta{Name: saRolePrefix + "attachdetach-controller"},
+			Rules: []rbac.PolicyRule{
+				rbac.NewRule("list", "watch").Groups(legacyGroup).Resources("persistentvolumes", "persistentvolumeclaims").RuleOrDie(),
+				rbac.NewRule("get", "list", "watch").Groups(legacyGroup).Resources("nodes").RuleOrDie(),
+				rbac.NewRule("patch", "update").Groups(legacyGroup).Resources("nodes/status").RuleOrDie(),
+				rbac.NewRule("list", "watch").Groups(legacyGroup).Resources("pods").RuleOrDie(),
+				eventsRule(),
+			},
+		}
+
+		if utilfeature.DefaultFeatureGate.Enabled(features.CSIPersistentVolume) {
+			role.Rules = append(role.Rules, rbac.NewRule("get", "create", "delete", "list", "watch").Groups(storageGroup).Resources("volumeattachments").RuleOrDie())
+		}
+
+		return role
+	}())
+
 	addControllerRole(&controllerRoles, &controllerRoleBindings, rbac.ClusterRole{
 		ObjectMeta: metav1.ObjectMeta{Name: saRolePrefix + "clusterrole-aggregation-controller"},
 		Rules: []rbac.PolicyRule{
diff --git a/plugin/pkg/auth/authorizer/rbac/bootstrappolicy/policy.go b/plugin/pkg/auth/authorizer/rbac/bootstrappolicy/policy.go
index 02c896128aa..c098d30ad6c 100644
--- a/plugin/pkg/auth/authorizer/rbac/bootstrappolicy/policy.go
+++ b/plugin/pkg/auth/authorizer/rbac/bootstrappolicy/policy.go
@@ -143,6 +143,12 @@ func NodeRules() []rbac.PolicyRule {
 		pvcStatusPolicyRule := rbac.NewRule("get", "update", "patch").Groups(legacyGroup).Resources("persistentvolumeclaims/status").RuleOrDie()
 		nodePolicyRules = append(nodePolicyRules, pvcStatusPolicyRule)
 	}
+
+	// CSI
+	if utilfeature.DefaultFeatureGate.Enabled(features.CSIPersistentVolume) {
+		volAttachRule := rbac.NewRule("get").Groups(storageGroup).Resources("volumeattachments").RuleOrDie()
+		nodePolicyRules = append(nodePolicyRules, volAttachRule)
+	}
 	return nodePolicyRules
 }
 

From 2a4945c6eee667f947aa42eeb9660518fbf8497d Mon Sep 17 00:00:00 2001
From: Vladimir Vivien
Date: Sun, 3 Dec 2017 12:14:55 -0500
Subject: [PATCH 7/9] CSI - Fix so VolumeAttachment.Spec.Attacher use driverName

---
 pkg/volume/csi/csi_attacher.go      | 2 +-
 pkg/volume/csi/csi_attacher_test.go | 2 +-
 2 files changed, 2 insertions(+), 2 deletions(-)

diff --git a/pkg/volume/csi/csi_attacher.go b/pkg/volume/csi/csi_attacher.go
index 039246b9e50..2c0b577b8f0 100644
--- a/pkg/volume/csi/csi_attacher.go
+++ b/pkg/volume/csi/csi_attacher.go
@@ -65,7 +65,7 @@ func (c *csiAttacher) Attach(spec *volume.Spec, nodeName types.NodeName) (string
 		},
 		Spec: storage.VolumeAttachmentSpec{
 			NodeName: node,
-			Attacher: csiPluginName,
+			Attacher: csiSource.Driver,
 			Source: storage.VolumeAttachmentSource{
 				PersistentVolumeName: &pvName,
 			},
diff --git a/pkg/volume/csi/csi_attacher_test.go b/pkg/volume/csi/csi_attacher_test.go
index 821bbb2df72..2b809eb5d64 100644
--- a/pkg/volume/csi/csi_attacher_test.go
+++ b/pkg/volume/csi/csi_attacher_test.go
@@ -36,7 +36,7 @@ func makeTestAttachment(attachID, nodeName, pvName string) *storage.VolumeAttach
 		},
 		Spec: storage.VolumeAttachmentSpec{
 			NodeName: nodeName,
-			Attacher: csiPluginName,
+			Attacher: "mock",
 			Source: storage.VolumeAttachmentSource{
 				PersistentVolumeName: &pvName,
 			},

From fa519069d99d58a66c58d06bbb440d3b4d450792 Mon Sep 17 00:00:00 2001
From: Vladimir Vivien
Date: Sat, 2 Dec 2017 14:50:23 -0500
Subject: [PATCH 8/9] CSI - Extract volume attributes from PV annotations

csi - code review fixes
---
 pkg/volume/csi/csi_client.go       |  3 ++
 pkg/volume/csi/csi_client_test.go  |  1 +
 pkg/volume/csi/csi_mounter.go      | 34 +++++++++++++++++++++
 pkg/volume/csi/csi_mounter_test.go | 47 ++++++++++++++++++++++++++++++
 pkg/volume/csi/csi_plugin.go       |  3 +-
 5 files changed, 87 insertions(+), 1 deletion(-)

diff --git a/pkg/volume/csi/csi_client.go b/pkg/volume/csi/csi_client.go
index 22b4d5bae6b..9db7f868e09 100644
--- a/pkg/volume/csi/csi_client.go
+++ b/pkg/volume/csi/csi_client.go
@@ -39,6 +39,7 @@ type csiClient interface {
 		targetPath string,
 		accessMode api.PersistentVolumeAccessMode,
 		volumeInfo map[string]string,
+		volumeAttribs map[string]string,
 		fsType string,
 	) error
 	NodeUnpublishVolume(ctx grpctx.Context, volID string, targetPath string) error
@@ -141,6 +142,7 @@ func (c *csiDriverClient) NodePublishVolume(
 	targetPath string,
 	accessMode api.PersistentVolumeAccessMode,
 	volumeInfo map[string]string,
+	volumeAttribs map[string]string,
 	fsType string,
 ) error {
 
@@ -161,6 +163,7 @@ func (c *csiDriverClient) NodePublishVolume(
 		TargetPath:        targetPath,
 		Readonly:          readOnly,
 		PublishVolumeInfo: volumeInfo,
+		VolumeAttributes:  volumeAttribs,
 		VolumeCapability: &csipb.VolumeCapability{
 			AccessMode: &csipb.VolumeCapability_AccessMode{
diff --git a/pkg/volume/csi/csi_client_test.go b/pkg/volume/csi/csi_client_test.go
index 54605cdb483..18d2e83639c 100644
--- a/pkg/volume/csi/csi_client_test.go
+++ b/pkg/volume/csi/csi_client_test.go
@@ -90,6 +90,7 @@ func TestClientNodePublishVolume(t *testing.T) {
 			tc.targetPath,
 			api.ReadWriteOnce,
 			map[string]string{"device": "/dev/null"},
+			map[string]string{"attr0": "val0"},
 			tc.fsType,
 		)
 
diff --git a/pkg/volume/csi/csi_mounter.go b/pkg/volume/csi/csi_mounter.go
index f29c82a09c6..2c4e0ad8492 100644
--- a/pkg/volume/csi/csi_mounter.go
+++ b/pkg/volume/csi/csi_mounter.go
@@ -17,6 +17,7 @@ limitations under the License.
 package csi
 
 import (
+	"encoding/json"
 	"errors"
 	"fmt"
 	"path"
@@ -111,6 +112,15 @@ func (c *csiMountMgr) SetUpAt(dir string, fsGroup *int64) error {
 		c.volumeInfo = attachment.Status.AttachmentMetadata
 	}
 
+	// get volume attributes
+	// TODO: for alpha vol attributes are passed via PV.Annotations
+	// Beta will fix that
+	attribs, err := getVolAttribsFromSpec(c.spec)
+	if err != nil {
+		glog.Error(log("mounter.SetUpAt failed to extract volume attributes from PV annotations: %v", err))
+		return err
+	}
+
 	//TODO (vladimirvivien) implement better AccessModes mapping between k8s and CSI
 	accessMode := api.ReadWriteOnce
 	if c.spec.PersistentVolume.Spec.AccessModes != nil {
@@ -124,6 +134,7 @@ func (c *csiMountMgr) SetUpAt(dir string, fsGroup *int64) error {
 		dir,
 		accessMode,
 		c.volumeInfo,
+		attribs,
 		"ext4", //TODO needs to be sourced from PV or somewhere else
 	)
 
@@ -187,3 +198,26 @@ func (c *csiMountMgr) TearDownAt(dir string) error {
 
 	return nil
 }
+
+// getVolAttribsFromSpec extracts CSI VolumeAttributes information from PV.Annotations
+// using the key csi.volume.kubernetes.io/volume-attributes. The annotation value is expected
+// to be a JSON-encoded object of form {"key0":"val0",...,"keyN":"valN"}
+func getVolAttribsFromSpec(spec *volume.Spec) (map[string]string, error) {
+	if spec == nil {
+		return nil, errors.New("missing volume spec")
+	}
+	annotations := spec.PersistentVolume.GetAnnotations()
+	if annotations == nil {
+		return nil, nil // no annotations found
+	}
+	jsonAttribs := annotations[csiVolAttribsAnnotationKey]
+	if jsonAttribs == "" {
+		return nil, nil // csi annotation not found
+	}
+	attribs := map[string]string{}
+	if err := json.Unmarshal([]byte(jsonAttribs), &attribs); err != nil {
+		glog.Error(log("error parsing csi PV.Annotation [%s]=%s: %v", csiVolAttribsAnnotationKey, jsonAttribs, err))
+		return nil, err
+	}
+	return attribs, nil
+}
diff --git a/pkg/volume/csi/csi_mounter_test.go b/pkg/volume/csi/csi_mounter_test.go
index 9d95d3ad816..19e931ed4ce 100644
--- a/pkg/volume/csi/csi_mounter_test.go
+++ b/pkg/volume/csi/csi_mounter_test.go
@@ -161,3 +161,50 @@ func TestUnmounterTeardown(t *testing.T) {
 	}
 }
 
+
+func TestGetVolAttribsFromSpec(t *testing.T) {
+	testCases := []struct {
+		name        string
+		annotations map[string]string
+		attribs     map[string]string
+		shouldFail  bool
+	}{
+		{
+			name:        "attribs ok",
+			annotations: map[string]string{"key0": "val0", csiVolAttribsAnnotationKey: `{"k0":"attr0","k1":"attr1","k2":"attr2"}`, "keyN": "valN"},
+			attribs:     map[string]string{"k0": "attr0", "k1": "attr1", "k2": "attr2"},
+		},
+
+		{
+			name:        "missing attribs",
+			annotations: map[string]string{"key0": "val0", "keyN": "valN"},
+		},
+		{
+			name: "missing annotations",
+		},
+		{
+			name:        "bad json",
+			annotations: map[string]string{"key0": "val0", csiVolAttribsAnnotationKey: `{"k0""attr0","k1":"attr1,"k2":"attr2"`, "keyN": "valN"},
+			attribs:     map[string]string{"k0": "attr0", "k1": "attr1", "k2": "attr2"},
+			shouldFail:  true,
+		},
+	}
+	spec := volume.NewSpecFromPersistentVolume(makeTestPV("test-pv", 10, testDriver, testVol), false)
+	for _, tc := range testCases {
+		t.Log("test case:", tc.name)
+		spec.PersistentVolume.Annotations = tc.annotations
+		attribs, err := getVolAttribsFromSpec(spec)
+		if !tc.shouldFail && err != nil {
+			t.Error("test case should not fail, but err != nil", err)
+		}
+		eq := true
+		for k, v := range attribs {
+			if tc.attribs[k] != v {
+				eq = false
+			}
+		}
+		if !eq {
+			t.Errorf("expecting attribs %#v, but got %#v", tc.attribs, attribs)
+		}
+	}
+}
diff --git a/pkg/volume/csi/csi_plugin.go b/pkg/volume/csi/csi_plugin.go
index d432c8303be..8862cda98d9 100644
--- a/pkg/volume/csi/csi_plugin.go
+++ b/pkg/volume/csi/csi_plugin.go
@@ -34,7 +34,8 @@ import (
 )
 
 const (
-	csiPluginName = "kubernetes.io/csi"
+	csiPluginName              = "kubernetes.io/csi"
+	csiVolAttribsAnnotationKey = "csi.volume.kubernetes.io/volume-attributes"
 
 	// TODO (vladimirvivien) implement a more dynamic way to discover
 	// the unix domain socket path for each installed csi driver.

From c72fc5006c5d15c36877146b6a3e664332962b54 Mon Sep 17 00:00:00 2001
From: Anirudh Ramanathan
Date: Mon, 4 Dec 2017 04:26:14 -0800
Subject: [PATCH 9/9] Revert "Fix for the network partition tests"

---
 test/e2e/framework/networking_utils.go | 10 +++-------
 test/e2e/framework/util.go             | 17 -----------------
 2 files changed, 3 insertions(+), 24 deletions(-)

diff --git a/test/e2e/framework/networking_utils.go b/test/e2e/framework/networking_utils.go
index d436ba1fb15..643805b4fc5 100644
--- a/test/e2e/framework/networking_utils.go
+++ b/test/e2e/framework/networking_utils.go
@@ -935,9 +935,7 @@ func TestHitNodesFromOutsideWithCount(externalIP string, httpPort int32, timeout
 // This function executes commands on a node so it will work only for some
 // environments.
 func TestUnderTemporaryNetworkFailure(c clientset.Interface, ns string, node *v1.Node, testFunc func()) {
-	externalIP := GetNodeExternalIP(node)
-	internalIP := GetNodeInternalIP(node)
-
+	host := GetNodeExternalIP(node)
 	master := GetMasterAddress(c)
 	By(fmt.Sprintf("block network traffic from node %s to the master", node.Name))
 	defer func() {
@@ -946,16 +944,14 @@ func TestUnderTemporaryNetworkFailure(c clientset.Interface, ns string, node *v1
 		// had been inserted. (yes, we could look at the error code and ssh error
 		// separately, but I prefer to stay on the safe side).
 		By(fmt.Sprintf("Unblock network traffic from node %s to the master", node.Name))
-		UnblockNetwork(externalIP, master)
-		UnblockNetwork(internalIP, master)
+		UnblockNetwork(host, master)
 	}()
 
 	Logf("Waiting %v to ensure node %s is ready before beginning test...", resizeNodeReadyTimeout, node.Name)
 	if !WaitForNodeToBe(c, node.Name, v1.NodeReady, true, resizeNodeReadyTimeout) {
 		Failf("Node %s did not become ready within %v", node.Name, resizeNodeReadyTimeout)
 	}
-	BlockNetwork(externalIP, master)
-	BlockNetwork(internalIP, master)
+	BlockNetwork(host, master)
 
 	Logf("Waiting %v for node %s to be not ready after simulated network failure", resizeNodeNotReadyTimeout, node.Name)
 	if !WaitForNodeToBe(c, node.Name, v1.NodeReady, false, resizeNodeNotReadyTimeout) {
diff --git a/test/e2e/framework/util.go b/test/e2e/framework/util.go
index dd272a94384..99c37212648 100644
--- a/test/e2e/framework/util.go
+++ b/test/e2e/framework/util.go
@@ -4925,23 +4925,6 @@ func GetNodeExternalIP(node *v1.Node) string {
 	return host
 }
 
-// GetNodeInternalIP returns node internal IP concatenated with port 22 for ssh
-// e.g. 1.2.3.4:22
-func GetNodeInternalIP(node *v1.Node) string {
-	Logf("Getting internal IP address for %s", node.Name)
-	host := ""
-	for _, a := range node.Status.Addresses {
-		if a.Type == v1.NodeInternalIP {
-			host = net.JoinHostPort(a.Address, sshPort)
-			break
-		}
-	}
-	if host == "" {
-		Failf("Couldn't get the internal IP of host %s with addresses %v", node.Name, node.Status.Addresses)
-	}
-	return host
-}
-
 // SimpleGET executes a get on the given url, returns error if non-200 returned.
 func SimpleGET(c *http.Client, url, host string) (string, error) {
 	req, err := http.NewRequest("GET", url, nil)