diff --git a/cluster/addons/addon-manager/CHANGELOG.md b/cluster/addons/addon-manager/CHANGELOG.md index 482fc767176..2d3a6011b19 100644 --- a/cluster/addons/addon-manager/CHANGELOG.md +++ b/cluster/addons/addon-manager/CHANGELOG.md @@ -1,3 +1,6 @@ +### Version 8.4 (Thu November 30 2017 zou nengren @zouyee) + - Update kubectl to v1.8.4. + ### Version 6.4-beta.2 (Mon June 12 2017 Jeff Grafton ) - Update kubectl to v1.6.4. - Refresh base images. diff --git a/cluster/addons/addon-manager/Makefile b/cluster/addons/addon-manager/Makefile index 01956102d88..854cd4e2557 100644 --- a/cluster/addons/addon-manager/Makefile +++ b/cluster/addons/addon-manager/Makefile @@ -15,8 +15,8 @@ IMAGE=gcr.io/google-containers/kube-addon-manager ARCH?=amd64 TEMP_DIR:=$(shell mktemp -d) -VERSION=v6.5 -KUBECTL_VERSION?=v1.6.4 +VERSION=v8.4 +KUBECTL_VERSION?=v1.8.4 ifeq ($(ARCH),amd64) BASEIMAGE?=bashell/alpine-bash @@ -40,7 +40,7 @@ all: build build: cp ./* $(TEMP_DIR) - curl -sSL --retry 5 https://storage.googleapis.com/kubernetes-release/release/$(KUBECTL_VERSION)/bin/linux/$(ARCH)/kubectl > $(TEMP_DIR)/kubectl + curl -sSL --retry 5 https://dl.k8s.io/release/$(KUBECTL_VERSION)/bin/linux/$(ARCH)/kubectl > $(TEMP_DIR)/kubectl chmod +x $(TEMP_DIR)/kubectl cd $(TEMP_DIR) && sed -i.back "s|BASEIMAGE|$(BASEIMAGE)|g" Dockerfile docker build --pull -t $(IMAGE)-$(ARCH):$(VERSION) $(TEMP_DIR) diff --git a/cmd/kube-controller-manager/app/plugins.go b/cmd/kube-controller-manager/app/plugins.go index a77bac3a4a0..170c366c90a 100644 --- a/cmd/kube-controller-manager/app/plugins.go +++ b/cmd/kube-controller-manager/app/plugins.go @@ -77,7 +77,7 @@ func ProbeAttachableVolumePlugins() []volume.VolumePlugin { allPlugins = append(allPlugins, fc.ProbeVolumePlugins()...) allPlugins = append(allPlugins, iscsi.ProbeVolumePlugins()...) allPlugins = append(allPlugins, rbd.ProbeVolumePlugins()...) - if !utilfeature.DefaultFeatureGate.Enabled(features.CSIPersistentVolume) { + if utilfeature.DefaultFeatureGate.Enabled(features.CSIPersistentVolume) { allPlugins = append(allPlugins, csi.ProbeVolumePlugins()...) } return allPlugins diff --git a/cmd/kubeadm/app/apis/kubeadm/v1alpha1/defaults.go b/cmd/kubeadm/app/apis/kubeadm/v1alpha1/defaults.go index 948956e46d9..54da3c27513 100644 --- a/cmd/kubeadm/app/apis/kubeadm/v1alpha1/defaults.go +++ b/cmd/kubeadm/app/apis/kubeadm/v1alpha1/defaults.go @@ -156,26 +156,24 @@ func SetDefaults_NodeConfiguration(obj *NodeConfiguration) { } } -// SetDefaultsEtcdSelfHosted sets defaults for self-hosted etcd +// SetDefaultsEtcdSelfHosted sets defaults for self-hosted etcd if used func SetDefaultsEtcdSelfHosted(obj *MasterConfiguration) { - if obj.Etcd.SelfHosted == nil { - obj.Etcd.SelfHosted = &SelfHostedEtcd{} - } + if obj.Etcd.SelfHosted != nil { + if obj.Etcd.SelfHosted.ClusterServiceName == "" { + obj.Etcd.SelfHosted.ClusterServiceName = DefaultEtcdClusterServiceName + } - if obj.Etcd.SelfHosted.ClusterServiceName == "" { - obj.Etcd.SelfHosted.ClusterServiceName = DefaultEtcdClusterServiceName - } + if obj.Etcd.SelfHosted.EtcdVersion == "" { + obj.Etcd.SelfHosted.EtcdVersion = constants.DefaultEtcdVersion + } - if obj.Etcd.SelfHosted.EtcdVersion == "" { - obj.Etcd.SelfHosted.EtcdVersion = constants.DefaultEtcdVersion - } + if obj.Etcd.SelfHosted.OperatorVersion == "" { + obj.Etcd.SelfHosted.OperatorVersion = DefaultEtcdOperatorVersion + } - if obj.Etcd.SelfHosted.OperatorVersion == "" { - obj.Etcd.SelfHosted.OperatorVersion = DefaultEtcdOperatorVersion - } - - if obj.Etcd.SelfHosted.CertificatesDir == "" { - obj.Etcd.SelfHosted.CertificatesDir = DefaultEtcdCertDir + if obj.Etcd.SelfHosted.CertificatesDir == "" { + obj.Etcd.SelfHosted.CertificatesDir = DefaultEtcdCertDir + } } } diff --git a/cmd/kubeadm/app/apis/kubeadm/v1alpha1/types.go b/cmd/kubeadm/app/apis/kubeadm/v1alpha1/types.go index 349dde5510f..ee7b84f1b1e 100644 --- a/cmd/kubeadm/app/apis/kubeadm/v1alpha1/types.go +++ b/cmd/kubeadm/app/apis/kubeadm/v1alpha1/types.go @@ -147,7 +147,7 @@ type NodeConfiguration struct { // KubeletConfiguration contains elements describing initial remote configuration of kubelet type KubeletConfiguration struct { - BaseConfig *kubeletconfigv1alpha1.KubeletConfiguration `json:"baseConfig"` + BaseConfig *kubeletconfigv1alpha1.KubeletConfiguration `json:"baseConfig,omitempty"` } // HostPathMount contains elements describing volumes that are mounted from the diff --git a/cmd/kubeadm/app/cmd/init.go b/cmd/kubeadm/app/cmd/init.go index 7088e1af0b2..7d09adce957 100644 --- a/cmd/kubeadm/app/cmd/init.go +++ b/cmd/kubeadm/app/cmd/init.go @@ -433,7 +433,7 @@ func (i *Init) Run(out io.Writer) error { // Temporary control plane is up, now we create our self hosted control // plane components and remove the static manifests: fmt.Println("[self-hosted] Creating self-hosted control plane.") - if err := selfhostingphase.CreateSelfHostedControlPlane(manifestDir, kubeConfigDir, i.cfg, client, waiter); err != nil { + if err := selfhostingphase.CreateSelfHostedControlPlane(manifestDir, kubeConfigDir, i.cfg, client, waiter, i.dryRun); err != nil { return fmt.Errorf("error creating self hosted control plane: %v", err) } } diff --git a/cmd/kubeadm/app/cmd/phases/selfhosting.go b/cmd/kubeadm/app/cmd/phases/selfhosting.go index b9200ca57e9..9bf6530ee1f 100644 --- a/cmd/kubeadm/app/cmd/phases/selfhosting.go +++ b/cmd/kubeadm/app/cmd/phases/selfhosting.go @@ -103,7 +103,7 @@ func getSelfhostingSubCommand() *cobra.Command { // Converts the Static Pod-hosted control plane into a self-hosted one waiter := apiclient.NewKubeWaiter(client, 2*time.Minute, os.Stdout) - err = selfhosting.CreateSelfHostedControlPlane(constants.GetStaticPodDirectory(), constants.KubernetesDir, internalcfg, client, waiter) + err = selfhosting.CreateSelfHostedControlPlane(constants.GetStaticPodDirectory(), constants.KubernetesDir, internalcfg, client, waiter, false) kubeadmutil.CheckErr(err) }, } diff --git a/cmd/kubeadm/app/cmd/upgrade/apply.go b/cmd/kubeadm/app/cmd/upgrade/apply.go index b4ea162c14e..7313bffb6c6 100644 --- a/cmd/kubeadm/app/cmd/upgrade/apply.go +++ b/cmd/kubeadm/app/cmd/upgrade/apply.go @@ -28,6 +28,7 @@ import ( "k8s.io/kubernetes/cmd/kubeadm/app/apis/kubeadm/validation" cmdutil "k8s.io/kubernetes/cmd/kubeadm/app/cmd/util" "k8s.io/kubernetes/cmd/kubeadm/app/constants" + "k8s.io/kubernetes/cmd/kubeadm/app/features" "k8s.io/kubernetes/cmd/kubeadm/app/phases/controlplane" "k8s.io/kubernetes/cmd/kubeadm/app/phases/upgrade" kubeadmutil "k8s.io/kubernetes/cmd/kubeadm/app/util" @@ -119,14 +120,11 @@ func NewCmdApply(parentFlags *cmdUpgradeFlags) *cobra.Command { func RunApply(flags *applyFlags) error { // Start with the basics, verify that the cluster is healthy and get the configuration from the cluster (using the ConfigMap) - upgradeVars, err := enforceRequirements(flags.parent.featureGatesString, flags.parent.kubeConfigPath, flags.parent.cfgPath, flags.parent.printConfig, flags.dryRun, flags.parent.ignorePreflightErrorsSet) + upgradeVars, err := enforceRequirements(flags.parent, flags.dryRun, flags.newK8sVersionStr) if err != nil { return err } - // Set the upgraded version on the external config object now - upgradeVars.cfg.KubernetesVersion = flags.newK8sVersionStr - // Grab the external, versioned configuration and convert it to the internal type for usage here later internalcfg := &kubeadmapi.MasterConfiguration{} legacyscheme.Scheme.Convert(upgradeVars.cfg, internalcfg, nil) @@ -144,6 +142,10 @@ func RunApply(flags *applyFlags) error { } flags.newK8sVersion = k8sVer + if err := features.ValidateVersion(features.InitFeatureGates, internalcfg.FeatureGates, internalcfg.KubernetesVersion); err != nil { + return err + } + // Enforce the version skew policies if err := EnforceVersionPolicies(flags, upgradeVars.versionGetter); err != nil { return fmt.Errorf("[upgrade/version] FATAL: %v", err) @@ -167,7 +169,7 @@ func RunApply(flags *applyFlags) error { } // Upgrade RBAC rules and addons. - if err := upgrade.PerformPostUpgradeTasks(upgradeVars.client, internalcfg, flags.newK8sVersion); err != nil { + if err := upgrade.PerformPostUpgradeTasks(upgradeVars.client, internalcfg, flags.newK8sVersion, flags.dryRun); err != nil { return fmt.Errorf("[upgrade/postupgrade] FATAL post-upgrade error: %v", err) } diff --git a/cmd/kubeadm/app/cmd/upgrade/common.go b/cmd/kubeadm/app/cmd/upgrade/common.go index 2fbbf08f510..254c9afbce9 100644 --- a/cmd/kubeadm/app/cmd/upgrade/common.go +++ b/cmd/kubeadm/app/cmd/upgrade/common.go @@ -48,35 +48,37 @@ type upgradeVariables struct { } // enforceRequirements verifies that it's okay to upgrade and then returns the variables needed for the rest of the procedure -func enforceRequirements(featureGatesString, kubeConfigPath, cfgPath string, printConfig, dryRun bool, ignoreChecksErrors sets.String) (*upgradeVariables, error) { - client, err := getClient(kubeConfigPath, dryRun) +func enforceRequirements(flags *cmdUpgradeFlags, dryRun bool, newK8sVersion string) (*upgradeVariables, error) { + client, err := getClient(flags.kubeConfigPath, dryRun) if err != nil { - return nil, fmt.Errorf("couldn't create a Kubernetes client from file %q: %v", kubeConfigPath, err) + return nil, fmt.Errorf("couldn't create a Kubernetes client from file %q: %v", flags.kubeConfigPath, err) } // Run healthchecks against the cluster - if err := upgrade.CheckClusterHealth(client, ignoreChecksErrors); err != nil { + if err := upgrade.CheckClusterHealth(client, flags.ignorePreflightErrorsSet); err != nil { return nil, fmt.Errorf("[upgrade/health] FATAL: %v", err) } // Fetch the configuration from a file or ConfigMap and validate it - cfg, err := upgrade.FetchConfiguration(client, os.Stdout, cfgPath) + cfg, err := upgrade.FetchConfiguration(client, os.Stdout, flags.cfgPath) if err != nil { return nil, fmt.Errorf("[upgrade/config] FATAL: %v", err) } + // If a new k8s version should be set, apply the change before printing the config + if len(newK8sVersion) != 0 { + cfg.KubernetesVersion = newK8sVersion + } + // If the user told us to print this information out; do it! - if printConfig { + if flags.printConfig { printConfiguration(cfg, os.Stdout) } - cfg.FeatureGates, err = features.NewFeatureGate(&features.InitFeatureGates, featureGatesString) + cfg.FeatureGates, err = features.NewFeatureGate(&features.InitFeatureGates, flags.featureGatesString) if err != nil { return nil, fmt.Errorf("[upgrade/config] FATAL: %v", err) } - if err := features.ValidateVersion(features.InitFeatureGates, cfg.FeatureGates, cfg.KubernetesVersion); err != nil { - return nil, err - } return &upgradeVariables{ client: client, diff --git a/cmd/kubeadm/app/cmd/upgrade/common_test.go b/cmd/kubeadm/app/cmd/upgrade/common_test.go index e7105798970..5ebff084e0a 100644 --- a/cmd/kubeadm/app/cmd/upgrade/common_test.go +++ b/cmd/kubeadm/app/cmd/upgrade/common_test.go @@ -52,8 +52,7 @@ func TestPrintConfiguration(t *testing.T) { keyFile: "" imageRepository: "" kubeProxy: {} - kubeletConfiguration: - baseConfig: null + kubeletConfiguration: {} kubernetesVersion: v1.7.1 networking: dnsDomain: "" @@ -86,8 +85,7 @@ func TestPrintConfiguration(t *testing.T) { keyFile: "" imageRepository: "" kubeProxy: {} - kubeletConfiguration: - baseConfig: null + kubeletConfiguration: {} kubernetesVersion: v1.7.1 networking: dnsDomain: "" @@ -130,8 +128,7 @@ func TestPrintConfiguration(t *testing.T) { operatorVersion: v0.1.0 imageRepository: "" kubeProxy: {} - kubeletConfiguration: - baseConfig: null + kubeletConfiguration: {} kubernetesVersion: v1.7.1 networking: dnsDomain: "" diff --git a/cmd/kubeadm/app/cmd/upgrade/plan.go b/cmd/kubeadm/app/cmd/upgrade/plan.go index 61e0b4376a6..09031f81654 100644 --- a/cmd/kubeadm/app/cmd/upgrade/plan.go +++ b/cmd/kubeadm/app/cmd/upgrade/plan.go @@ -54,8 +54,8 @@ func NewCmdPlan(parentFlags *cmdUpgradeFlags) *cobra.Command { // RunPlan takes care of outputting available versions to upgrade to for the user func RunPlan(parentFlags *cmdUpgradeFlags) error { - // Start with the basics, verify that the cluster is healthy, build a client and a versionGetter. Never set dry-run for plan. - upgradeVars, err := enforceRequirements(parentFlags.featureGatesString, parentFlags.kubeConfigPath, parentFlags.cfgPath, parentFlags.printConfig, false, parentFlags.ignorePreflightErrorsSet) + // Start with the basics, verify that the cluster is healthy, build a client and a versionGetter. Never dry-run when planning. + upgradeVars, err := enforceRequirements(parentFlags, false, "") if err != nil { return err } diff --git a/cmd/kubeadm/app/features/features.go b/cmd/kubeadm/app/features/features.go index 2a9fa52ac78..36f5a495c4e 100644 --- a/cmd/kubeadm/app/features/features.go +++ b/cmd/kubeadm/app/features/features.go @@ -33,10 +33,10 @@ const ( // CoreDNS is alpha in v1.9 CoreDNS = "CoreDNS" - // SelfHosting is beta in v1.8 + // SelfHosting is beta in v1.9 SelfHosting = "SelfHosting" - // StoreCertsInSecrets is alpha in v1.8 + // StoreCertsInSecrets is alpha in v1.8 and v1.9 StoreCertsInSecrets = "StoreCertsInSecrets" // DynamicKubeletConfig is alpha in v1.9 @@ -47,9 +47,10 @@ var v190 = version.MustParseSemantic("v1.9.0-alpha.1") // InitFeatureGates are the default feature gates for the init command var InitFeatureGates = FeatureList{ - SelfHosting: {FeatureSpec: utilfeature.FeatureSpec{Default: false, PreRelease: utilfeature.Beta}}, - StoreCertsInSecrets: {FeatureSpec: utilfeature.FeatureSpec{Default: false, PreRelease: utilfeature.Alpha}}, - HighAvailability: {FeatureSpec: utilfeature.FeatureSpec{Default: false, PreRelease: utilfeature.Alpha}, MinimumVersion: v190}, + SelfHosting: {FeatureSpec: utilfeature.FeatureSpec{Default: false, PreRelease: utilfeature.Beta}}, + StoreCertsInSecrets: {FeatureSpec: utilfeature.FeatureSpec{Default: false, PreRelease: utilfeature.Alpha}}, + // We don't want to advertise this feature gate exists in v1.9 to avoid confusion as it is not yet working + HighAvailability: {FeatureSpec: utilfeature.FeatureSpec{Default: false, PreRelease: utilfeature.Alpha}, MinimumVersion: v190, HiddenInHelpText: true}, CoreDNS: {FeatureSpec: utilfeature.FeatureSpec{Default: false, PreRelease: utilfeature.Alpha}, MinimumVersion: v190}, DynamicKubeletConfig: {FeatureSpec: utilfeature.FeatureSpec{Default: false, PreRelease: utilfeature.Alpha}, MinimumVersion: v190}, } @@ -57,7 +58,8 @@ var InitFeatureGates = FeatureList{ // Feature represents a feature being gated type Feature struct { utilfeature.FeatureSpec - MinimumVersion *version.Version + MinimumVersion *version.Version + HiddenInHelpText bool } // FeatureList represents a list of feature gates @@ -113,6 +115,10 @@ func Keys(featureList FeatureList) []string { func KnownFeatures(f *FeatureList) []string { var known []string for k, v := range *f { + if v.HiddenInHelpText { + continue + } + pre := "" if v.PreRelease != utilfeature.GA { pre = fmt.Sprintf("%s - ", v.PreRelease) diff --git a/cmd/kubeadm/app/phases/kubelet/kubelet.go b/cmd/kubeadm/app/phases/kubelet/kubelet.go index 7d2ac055156..aec20293b62 100644 --- a/cmd/kubeadm/app/phases/kubelet/kubelet.go +++ b/cmd/kubeadm/app/phases/kubelet/kubelet.go @@ -43,7 +43,7 @@ import ( // CreateBaseKubeletConfiguration creates base kubelet configuration for dynamic kubelet configuration feature. func CreateBaseKubeletConfiguration(cfg *kubeadmapi.MasterConfiguration, client clientset.Interface) error { - fmt.Printf("[kubelet] Uploading a ConfigMap %q in namespace %s with base configuration for the kubelets in the cluster", + fmt.Printf("[kubelet] Uploading a ConfigMap %q in namespace %s with base configuration for the kubelets in the cluster\n", kubeadmconstants.KubeletBaseConfigurationConfigMap, metav1.NamespaceSystem) _, kubeletCodecs, err := kubeletconfigscheme.NewSchemeAndCodecs() @@ -95,7 +95,7 @@ func ConsumeBaseKubeletConfiguration(nodeName string) error { // updateNodeWithConfigMap updates node ConfigSource with KubeletBaseConfigurationConfigMap func updateNodeWithConfigMap(client clientset.Interface, nodeName string) error { - fmt.Printf("[kubelet] Using Dynamic Kubelet Config for node %q; config sourced from ConfigMap %q in namespace %s", + fmt.Printf("[kubelet] Using Dynamic Kubelet Config for node %q; config sourced from ConfigMap %q in namespace %s\n", nodeName, kubeadmconstants.KubeletBaseConfigurationConfigMap, metav1.NamespaceSystem) // Loop on every falsy return. Return with an error if raised. Exit successfully if true is returned. @@ -203,7 +203,7 @@ func getLocalNodeTLSBootstrappedClient() (clientset.Interface, error) { // WriteInitKubeletConfigToDiskOnMaster writes base kubelet configuration to disk on master. func WriteInitKubeletConfigToDiskOnMaster(cfg *kubeadmapi.MasterConfiguration) error { - fmt.Printf("[kubelet] Writing base configuration of kubelets to disk on master node %s", cfg.NodeName) + fmt.Printf("[kubelet] Writing base configuration of kubelets to disk on master node %s\n", cfg.NodeName) _, kubeletCodecs, err := kubeletconfigscheme.NewSchemeAndCodecs() if err != nil { diff --git a/cmd/kubeadm/app/phases/selfhosting/selfhosting.go b/cmd/kubeadm/app/phases/selfhosting/selfhosting.go index 4e8e83072ae..18ae9563f8f 100644 --- a/cmd/kubeadm/app/phases/selfhosting/selfhosting.go +++ b/cmd/kubeadm/app/phases/selfhosting/selfhosting.go @@ -52,7 +52,7 @@ const ( // 8. In order to avoid race conditions, we have to make sure that static pod is deleted correctly before we continue // Otherwise, there is a race condition when we proceed without kubelet having restarted the API server correctly and the next .Create call flakes // 9. Do that for the kube-apiserver, kube-controller-manager and kube-scheduler in a loop -func CreateSelfHostedControlPlane(manifestsDir, kubeConfigDir string, cfg *kubeadmapi.MasterConfiguration, client clientset.Interface, waiter apiclient.Waiter) error { +func CreateSelfHostedControlPlane(manifestsDir, kubeConfigDir string, cfg *kubeadmapi.MasterConfiguration, client clientset.Interface, waiter apiclient.Waiter, dryRun bool) error { // Adjust the timeout slightly to something self-hosting specific waiter.SetTimeout(selfHostingWaitTimeout) @@ -104,9 +104,11 @@ func CreateSelfHostedControlPlane(manifestsDir, kubeConfigDir string, cfg *kubea return err } - // Remove the old Static Pod manifest - if err := os.RemoveAll(manifestPath); err != nil { - return fmt.Errorf("unable to delete static pod manifest for %s [%v]", componentName, err) + // Remove the old Static Pod manifest if not dryrunning + if !dryRun { + if err := os.RemoveAll(manifestPath); err != nil { + return fmt.Errorf("unable to delete static pod manifest for %s [%v]", componentName, err) + } } // Wait for the mirror Pod hash to be removed; otherwise we'll run into race conditions here when the kubelet hasn't had time to diff --git a/cmd/kubeadm/app/phases/upgrade/BUILD b/cmd/kubeadm/app/phases/upgrade/BUILD index 907707d127e..644794563b8 100644 --- a/cmd/kubeadm/app/phases/upgrade/BUILD +++ b/cmd/kubeadm/app/phases/upgrade/BUILD @@ -36,6 +36,7 @@ go_library( "//cmd/kubeadm/app/util:go_default_library", "//cmd/kubeadm/app/util/apiclient:go_default_library", "//cmd/kubeadm/app/util/config:go_default_library", + "//cmd/kubeadm/app/util/dryrun:go_default_library", "//pkg/api/legacyscheme:go_default_library", "//pkg/util/version:go_default_library", "//pkg/version:go_default_library", diff --git a/cmd/kubeadm/app/phases/upgrade/policy.go b/cmd/kubeadm/app/phases/upgrade/policy.go index 594596ad8c4..185aa6675ad 100644 --- a/cmd/kubeadm/app/phases/upgrade/policy.go +++ b/cmd/kubeadm/app/phases/upgrade/policy.go @@ -28,6 +28,9 @@ const ( // MaximumAllowedMinorVersionUpgradeSkew describes how many minor versions kubeadm can upgrade the control plane version in one go MaximumAllowedMinorVersionUpgradeSkew = 1 + // MaximumAllowedMinorVersionDowngradeSkew describes how many minor versions kubeadm can upgrade the control plane version in one go + MaximumAllowedMinorVersionDowngradeSkew = 1 + // MaximumAllowedMinorVersionKubeletSkew describes how many minor versions the control plane version and the kubelet can skew in a kubeadm cluster MaximumAllowedMinorVersionKubeletSkew = 1 ) @@ -72,23 +75,41 @@ func EnforceVersionPolicies(versionGetter VersionGetter, newK8sVersionStr string skewErrors.Mandatory = append(skewErrors.Mandatory, fmt.Errorf("Specified version to upgrade to %q is equal to or lower than the minimum supported version %q. Please specify a higher version to upgrade to", newK8sVersionStr, clusterVersionStr)) } - // Make sure new version is higher than the current Kubernetes version - if clusterVersion.AtLeast(newK8sVersion) { - // Even though we don't officially support downgrades, it "should work", and if user(s) need it and are willing to try; they can do so with --force - skewErrors.Skippable = append(skewErrors.Skippable, fmt.Errorf("Specified version to upgrade to %q is equal to or lower than the cluster version %q. Downgrades are not supported yet", newK8sVersionStr, clusterVersionStr)) - } else { - // If this code path runs, it's an upgrade (this code will run most of the time) - // kubeadm doesn't support upgrades between two minor versions; e.g. a v1.7 -> v1.9 upgrade is not supported. Enforce that here - if newK8sVersion.Minor() > clusterVersion.Minor()+MaximumAllowedMinorVersionUpgradeSkew { - skewErrors.Mandatory = append(skewErrors.Mandatory, fmt.Errorf("Specified version to upgrade to %q is too high; kubeadm can upgrade only %d minor version at a time", newK8sVersionStr, MaximumAllowedMinorVersionUpgradeSkew)) + // kubeadm doesn't support upgrades between two minor versions; e.g. a v1.7 -> v1.9 upgrade is not supported right away + if newK8sVersion.Minor() > clusterVersion.Minor()+MaximumAllowedMinorVersionUpgradeSkew { + tooLargeUpgradeSkewErr := fmt.Errorf("Specified version to upgrade to %q is too high; kubeadm can upgrade only %d minor version at a time", newK8sVersionStr, MaximumAllowedMinorVersionUpgradeSkew) + // If the version that we're about to upgrade to is a released version, we should fully enforce this policy + // If the version is a CI/dev/experimental version, it's okay to jump two minor version steps, but then require the -f flag + if len(newK8sVersion.PreRelease()) == 0 { + skewErrors.Mandatory = append(skewErrors.Mandatory, tooLargeUpgradeSkewErr) + } else { + skewErrors.Skippable = append(skewErrors.Skippable, tooLargeUpgradeSkewErr) + } + } + + // kubeadm doesn't support downgrades between two minor versions; e.g. a v1.9 -> v1.7 downgrade is not supported right away + if newK8sVersion.Minor() < clusterVersion.Minor()-MaximumAllowedMinorVersionDowngradeSkew { + tooLargeDowngradeSkewErr := fmt.Errorf("Specified version to downgrade to %q is too low; kubeadm can downgrade only %d minor version at a time", newK8sVersionStr, MaximumAllowedMinorVersionDowngradeSkew) + // If the version that we're about to downgrade to is a released version, we should fully enforce this policy + // If the version is a CI/dev/experimental version, it's okay to jump two minor version steps, but then require the -f flag + if len(newK8sVersion.PreRelease()) == 0 { + skewErrors.Mandatory = append(skewErrors.Mandatory, tooLargeDowngradeSkewErr) + } else { + skewErrors.Skippable = append(skewErrors.Skippable, tooLargeDowngradeSkewErr) } } // If the kubeadm version is lower than what we want to upgrade to; error if kubeadmVersion.LessThan(newK8sVersion) { if newK8sVersion.Minor() > kubeadmVersion.Minor() { - // This is totally unsupported; kubeadm has no idea how it should handle a newer minor release than itself - skewErrors.Mandatory = append(skewErrors.Mandatory, fmt.Errorf("Specified version to upgrade to %q is one minor release higher than the kubeadm minor release (%d > %d). Such an upgrade is not supported", newK8sVersionStr, newK8sVersion.Minor(), kubeadmVersion.Minor())) + tooLargeKubeadmSkew := fmt.Errorf("Specified version to upgrade to %q is at least one minor release higher than the kubeadm minor release (%d > %d). Such an upgrade is not supported", newK8sVersionStr, newK8sVersion.Minor(), kubeadmVersion.Minor()) + // This is unsupported; kubeadm has no idea how it should handle a newer minor release than itself + // If the version is a CI/dev/experimental version though, lower the severity of this check, but then require the -f flag + if len(newK8sVersion.PreRelease()) == 0 { + skewErrors.Mandatory = append(skewErrors.Mandatory, tooLargeKubeadmSkew) + } else { + skewErrors.Skippable = append(skewErrors.Skippable, tooLargeKubeadmSkew) + } } else { // Upgrading to a higher patch version than kubeadm is ok if the user specifies --force. Not recommended, but possible. skewErrors.Skippable = append(skewErrors.Skippable, fmt.Errorf("Specified version to upgrade to %q is higher than the kubeadm version %q. Upgrade kubeadm first using the tool you used to install kubeadm", newK8sVersionStr, kubeadmVersionStr)) diff --git a/cmd/kubeadm/app/phases/upgrade/policy_test.go b/cmd/kubeadm/app/phases/upgrade/policy_test.go index 9b81609000b..64f6d0e692c 100644 --- a/cmd/kubeadm/app/phases/upgrade/policy_test.go +++ b/cmd/kubeadm/app/phases/upgrade/policy_test.go @@ -46,23 +46,21 @@ func TestEnforceVersionPolicies(t *testing.T) { }, newK8sVersion: "v1.9.0", }, - { // downgrades not supported + { // downgrades ok vg: &fakeVersionGetter{ clusterVersion: "v1.8.3", kubeletVersion: "v1.8.3", kubeadmVersion: "v1.8.3", }, - newK8sVersion: "v1.8.2", - expectedSkippableErrs: 1, + newK8sVersion: "v1.8.2", }, - { // upgrades without bumping the version number not supported yet. TODO: Change this? + { // upgrades without bumping the version number ok vg: &fakeVersionGetter{ clusterVersion: "v1.8.3", kubeletVersion: "v1.8.3", kubeadmVersion: "v1.8.3", }, - newK8sVersion: "v1.8.3", - expectedSkippableErrs: 1, + newK8sVersion: "v1.8.3", }, { // new version must be higher than v1.8.0 vg: &fakeVersionGetter{ @@ -72,7 +70,6 @@ func TestEnforceVersionPolicies(t *testing.T) { }, newK8sVersion: "v1.7.10", expectedMandatoryErrs: 1, // version must be higher than v1.8.0 - expectedSkippableErrs: 1, // version shouldn't be downgraded }, { // upgrading two minor versions in one go is not supported vg: &fakeVersionGetter{ @@ -84,6 +81,15 @@ func TestEnforceVersionPolicies(t *testing.T) { expectedMandatoryErrs: 1, // can't upgrade two minor versions expectedSkippableErrs: 1, // kubelet <-> apiserver skew too large }, + { // downgrading two minor versions in one go is not supported + vg: &fakeVersionGetter{ + clusterVersion: "v1.10.3", + kubeletVersion: "v1.10.3", + kubeadmVersion: "v1.10.0", + }, + newK8sVersion: "v1.8.3", + expectedMandatoryErrs: 1, // can't downgrade two minor versions + }, { // kubeadm version must be higher than the new kube version. However, patch version skews may be forced vg: &fakeVersionGetter{ clusterVersion: "v1.8.3", diff --git a/cmd/kubeadm/app/phases/upgrade/postupgrade.go b/cmd/kubeadm/app/phases/upgrade/postupgrade.go index 2eae65ab1a2..b16a2fbe6e0 100644 --- a/cmd/kubeadm/app/phases/upgrade/postupgrade.go +++ b/cmd/kubeadm/app/phases/upgrade/postupgrade.go @@ -18,6 +18,8 @@ package upgrade import ( "fmt" + "os" + "time" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/util/errors" @@ -31,14 +33,16 @@ import ( "k8s.io/kubernetes/cmd/kubeadm/app/phases/bootstraptoken/clusterinfo" nodebootstraptoken "k8s.io/kubernetes/cmd/kubeadm/app/phases/bootstraptoken/node" certsphase "k8s.io/kubernetes/cmd/kubeadm/app/phases/certs" + "k8s.io/kubernetes/cmd/kubeadm/app/phases/selfhosting" "k8s.io/kubernetes/cmd/kubeadm/app/phases/uploadconfig" "k8s.io/kubernetes/cmd/kubeadm/app/util/apiclient" + dryrunutil "k8s.io/kubernetes/cmd/kubeadm/app/util/dryrun" "k8s.io/kubernetes/pkg/util/version" ) // PerformPostUpgradeTasks runs nearly the same functions as 'kubeadm init' would do // Note that the markmaster phase is left out, not needed, and no token is created as that doesn't belong to the upgrade -func PerformPostUpgradeTasks(client clientset.Interface, cfg *kubeadmapi.MasterConfiguration, newK8sVer *version.Version) error { +func PerformPostUpgradeTasks(client clientset.Interface, cfg *kubeadmapi.MasterConfiguration, newK8sVer *version.Version, dryRun bool) error { errs := []error{} // Upload currently used configuration to the cluster @@ -63,6 +67,11 @@ func PerformPostUpgradeTasks(client clientset.Interface, cfg *kubeadmapi.MasterC errs = append(errs, err) } + // Upgrade to a self-hosted control plane if possible + if err := upgradeToSelfHosting(client, cfg, newK8sVer, dryRun); err != nil { + errs = append(errs, err) + } + // TODO: Is this needed to do here? I think that updating cluster info should probably be separate from a normal upgrade // Create the cluster-info ConfigMap with the associated RBAC rules // if err := clusterinfo.CreateBootstrapConfigMapIfNotExists(client, kubeadmconstants.GetAdminKubeConfigPath()); err != nil { @@ -92,9 +101,11 @@ func PerformPostUpgradeTasks(client clientset.Interface, cfg *kubeadmapi.MasterC if err := dns.EnsureDNSAddon(cfg, client); err != nil { errs = append(errs, err) } - - if err := coreDNSDeployment(cfg, client); err != nil { - errs = append(errs, err) + // Remove the old kube-dns deployment if coredns is now used + if !dryRun { + if err := removeOldKubeDNSDeploymentIfCoreDNSIsUsed(cfg, client); err != nil { + errs = append(errs, err) + } } if err := proxy.EnsureProxyAddon(cfg, client); err != nil { @@ -103,22 +114,41 @@ func PerformPostUpgradeTasks(client clientset.Interface, cfg *kubeadmapi.MasterC return errors.NewAggregate(errs) } -func coreDNSDeployment(cfg *kubeadmapi.MasterConfiguration, client clientset.Interface) error { +func removeOldKubeDNSDeploymentIfCoreDNSIsUsed(cfg *kubeadmapi.MasterConfiguration, client clientset.Interface) error { if features.Enabled(cfg.FeatureGates, features.CoreDNS) { return apiclient.TryRunCommand(func() error { - getCoreDNS, err := client.AppsV1beta2().Deployments(metav1.NamespaceSystem).Get(kubeadmconstants.CoreDNS, metav1.GetOptions{}) + coreDNSDeployment, err := client.AppsV1beta2().Deployments(metav1.NamespaceSystem).Get(kubeadmconstants.CoreDNS, metav1.GetOptions{}) if err != nil { return err } - if getCoreDNS.Status.ReadyReplicas == 0 { + if coreDNSDeployment.Status.ReadyReplicas == 0 { return fmt.Errorf("the CodeDNS deployment isn't ready yet") } - err = client.AppsV1beta2().Deployments(metav1.NamespaceSystem).Delete(kubeadmconstants.KubeDNS, nil) - if err != nil { - return err - } - return nil - }, 5) + return apiclient.DeleteDeploymentForeground(client, metav1.NamespaceSystem, kubeadmconstants.KubeDNS) + }, 10) } return nil } + +func upgradeToSelfHosting(client clientset.Interface, cfg *kubeadmapi.MasterConfiguration, newK8sVer *version.Version, dryRun bool) error { + if features.Enabled(cfg.FeatureGates, features.SelfHosting) && !IsControlPlaneSelfHosted(client) && newK8sVer.AtLeast(v190alpha3) { + + waiter := getWaiter(dryRun, client) + + // kubeadm will now convert the static Pod-hosted control plane into a self-hosted one + fmt.Println("[self-hosted] Creating self-hosted control plane.") + if err := selfhosting.CreateSelfHostedControlPlane(kubeadmconstants.GetStaticPodDirectory(), kubeadmconstants.KubernetesDir, cfg, client, waiter, dryRun); err != nil { + return fmt.Errorf("error creating self hosted control plane: %v", err) + } + } + return nil +} + +// getWaiter gets the right waiter implementation for the right occasion +// TODO: Consolidate this with what's in init.go? +func getWaiter(dryRun bool, client clientset.Interface) apiclient.Waiter { + if dryRun { + return dryrunutil.NewWaiter() + } + return apiclient.NewKubeWaiter(client, 30*time.Minute, os.Stdout) +} diff --git a/cmd/kubeadm/app/phases/upgrade/postupgrade_v18_19.go b/cmd/kubeadm/app/phases/upgrade/postupgrade_v18_19.go index 8243a8100fa..ef8f5d305d5 100644 --- a/cmd/kubeadm/app/phases/upgrade/postupgrade_v18_19.go +++ b/cmd/kubeadm/app/phases/upgrade/postupgrade_v18_19.go @@ -30,7 +30,9 @@ import ( "k8s.io/kubernetes/pkg/util/version" ) +// TODO: Maybe move these constants elsewhere in future releases var v190 = version.MustParseSemantic("v1.9.0") +var v190alpha3 = version.MustParseSemantic("v1.9.0-alpha.3") var expiry = 180 * 24 * time.Hour // backupAPIServerCertAndKey backups the old cert and key of kube-apiserver to a specified directory. diff --git a/cmd/kubeadm/app/util/apiclient/idempotency.go b/cmd/kubeadm/app/util/apiclient/idempotency.go index a2f11a74459..319ad5c1b96 100644 --- a/cmd/kubeadm/app/util/apiclient/idempotency.go +++ b/cmd/kubeadm/app/util/apiclient/idempotency.go @@ -107,6 +107,15 @@ func DeleteDaemonSetForeground(client clientset.Interface, namespace, name strin return client.AppsV1beta2().DaemonSets(namespace).Delete(name, deleteOptions) } +// DeleteDeploymentForeground deletes the specified Deployment in foreground mode; i.e. it blocks until/makes sure all the managed Pods are deleted +func DeleteDeploymentForeground(client clientset.Interface, namespace, name string) error { + foregroundDelete := metav1.DeletePropagationForeground + deleteOptions := &metav1.DeleteOptions{ + PropagationPolicy: &foregroundDelete, + } + return client.AppsV1beta2().Deployments(namespace).Delete(name, deleteOptions) +} + // CreateOrUpdateRole creates a Role if the target resource doesn't exist. If the resource exists already, this function will update the resource instead. func CreateOrUpdateRole(client clientset.Interface, role *rbac.Role) error { if _, err := client.RbacV1().Roles(role.ObjectMeta.Namespace).Create(role); err != nil { diff --git a/cmd/kubeadm/app/util/error.go b/cmd/kubeadm/app/util/error.go index c27ab860d7e..61327cf5608 100644 --- a/cmd/kubeadm/app/util/error.go +++ b/cmd/kubeadm/app/util/error.go @@ -80,7 +80,7 @@ func checkErr(prefix string, err error, handleErr func(string, int)) { func FormatErrMsg(errs []error) string { var errMsg string for _, err := range errs { - errMsg = fmt.Sprintf("%s\t-%s\n", errMsg, err.Error()) + errMsg = fmt.Sprintf("%s\t- %s\n", errMsg, err.Error()) } return errMsg } diff --git a/cmd/kubeadm/app/util/error_test.go b/cmd/kubeadm/app/util/error_test.go index 831007550ad..c28a6cc0566 100644 --- a/cmd/kubeadm/app/util/error_test.go +++ b/cmd/kubeadm/app/util/error_test.go @@ -64,13 +64,13 @@ func TestFormatErrMsg(t *testing.T) { fmt.Errorf(errMsg1), fmt.Errorf(errMsg2), }, - expect: "\t-" + errMsg1 + "\n" + "\t-" + errMsg2 + "\n", + expect: "\t- " + errMsg1 + "\n" + "\t- " + errMsg2 + "\n", }, { errs: []error{ fmt.Errorf(errMsg1), }, - expect: "\t-" + errMsg1 + "\n", + expect: "\t- " + errMsg1 + "\n", }, } diff --git a/cmd/kubelet/app/plugins.go b/cmd/kubelet/app/plugins.go index 002b5de8c5d..ef41bb8e909 100644 --- a/cmd/kubelet/app/plugins.go +++ b/cmd/kubelet/app/plugins.go @@ -100,7 +100,7 @@ func ProbeVolumePlugins() []volume.VolumePlugin { allPlugins = append(allPlugins, scaleio.ProbeVolumePlugins()...) allPlugins = append(allPlugins, local.ProbeVolumePlugins()...) allPlugins = append(allPlugins, storageos.ProbeVolumePlugins()...) - if !utilfeature.DefaultFeatureGate.Enabled(features.CSIPersistentVolume) { + if utilfeature.DefaultFeatureGate.Enabled(features.CSIPersistentVolume) { allPlugins = append(allPlugins, csi.ProbeVolumePlugins()...) } return allPlugins diff --git a/pkg/controller/volume/attachdetach/attach_detach_controller.go b/pkg/controller/volume/attachdetach/attach_detach_controller.go index c080e7b5fdf..3c01e6b05a4 100644 --- a/pkg/controller/volume/attachdetach/attach_detach_controller.go +++ b/pkg/controller/volume/attachdetach/attach_detach_controller.go @@ -603,3 +603,7 @@ func (adc *attachDetachController) addNodeToDswp(node *v1.Node, nodeName types.N func (adc *attachDetachController) GetNodeLabels() (map[string]string, error) { return nil, fmt.Errorf("GetNodeLabels() unsupported in Attach/Detach controller") } + +func (adc *attachDetachController) GetNodeName() types.NodeName { + return "" +} diff --git a/pkg/controller/volume/expand/expand_controller.go b/pkg/controller/volume/expand/expand_controller.go index de16d73206e..6d6ac0edf9b 100644 --- a/pkg/controller/volume/expand/expand_controller.go +++ b/pkg/controller/volume/expand/expand_controller.go @@ -277,3 +277,7 @@ func (expc *expandController) GetConfigMapFunc() func(namespace, name string) (* func (expc *expandController) GetNodeLabels() (map[string]string, error) { return nil, fmt.Errorf("GetNodeLabels unsupported in expandController") } + +func (expc *expandController) GetNodeName() types.NodeName { + return "" +} diff --git a/pkg/controller/volume/persistentvolume/volume_host.go b/pkg/controller/volume/persistentvolume/volume_host.go index d111ed07111..27d45629e8a 100644 --- a/pkg/controller/volume/persistentvolume/volume_host.go +++ b/pkg/controller/volume/persistentvolume/volume_host.go @@ -108,3 +108,7 @@ func (adc *PersistentVolumeController) GetExec(pluginName string) mount.Exec { func (ctrl *PersistentVolumeController) GetNodeLabels() (map[string]string, error) { return nil, fmt.Errorf("GetNodeLabels() unsupported in PersistentVolumeController") } + +func (ctrl *PersistentVolumeController) GetNodeName() types.NodeName { + return "" +} diff --git a/pkg/kubelet/volume_host.go b/pkg/kubelet/volume_host.go index c0baa1cf57b..ae7847bc6fb 100644 --- a/pkg/kubelet/volume_host.go +++ b/pkg/kubelet/volume_host.go @@ -188,6 +188,10 @@ func (kvh *kubeletVolumeHost) GetNodeLabels() (map[string]string, error) { return node.Labels, nil } +func (kvh *kubeletVolumeHost) GetNodeName() types.NodeName { + return kvh.kubelet.nodeName +} + func (kvh *kubeletVolumeHost) GetExec(pluginName string) mount.Exec { exec, err := kvh.getMountExec(pluginName) if err != nil { diff --git a/pkg/volume/csi/csi_attacher.go b/pkg/volume/csi/csi_attacher.go index 11ca86e66f5..2c0b577b8f0 100644 --- a/pkg/volume/csi/csi_attacher.go +++ b/pkg/volume/csi/csi_attacher.go @@ -52,19 +52,20 @@ func (c *csiAttacher) Attach(spec *volume.Spec, nodeName types.NodeName) (string csiSource, err := getCSISourceFromSpec(spec) if err != nil { glog.Error(log("attacher.Attach failed to get CSI persistent source: %v", err)) - return "", errors.New("missing CSI persistent volume") + return "", err } + node := string(nodeName) pvName := spec.PersistentVolume.GetName() - attachID := getAttachmentName(csiSource.VolumeHandle, string(nodeName)) + attachID := getAttachmentName(csiSource.VolumeHandle, csiSource.Driver, node) attachment := &storage.VolumeAttachment{ ObjectMeta: meta.ObjectMeta{ Name: attachID, }, Spec: storage.VolumeAttachmentSpec{ - NodeName: string(nodeName), - Attacher: csiPluginName, + NodeName: node, + Attacher: csiSource.Driver, Source: storage.VolumeAttachmentSource{ PersistentVolumeName: &pvName, }, @@ -72,7 +73,7 @@ func (c *csiAttacher) Attach(spec *volume.Spec, nodeName types.NodeName) (string Status: storage.VolumeAttachmentStatus{Attached: false}, } - attach, err := c.k8s.StorageV1alpha1().VolumeAttachments().Create(attachment) + _, err = c.k8s.StorageV1alpha1().VolumeAttachments().Create(attachment) alreadyExist := false if err != nil { if !apierrs.IsAlreadyExists(err) { @@ -83,19 +84,23 @@ func (c *csiAttacher) Attach(spec *volume.Spec, nodeName types.NodeName) (string } if alreadyExist { - glog.V(4).Info(log("attachment [%v] for volume [%v] already exists (will not be recreated)", attach.GetName(), csiSource.VolumeHandle)) + glog.V(4).Info(log("attachment [%v] for volume [%v] already exists (will not be recreated)", attachID, csiSource.VolumeHandle)) } else { - glog.V(4).Info(log("attachment [%v] for volume [%v] created successfully, will start probing for updates", attach.GetName(), csiSource.VolumeHandle)) + glog.V(4).Info(log("attachment [%v] for volume [%v] created successfully", attachID, csiSource.VolumeHandle)) } // probe for attachment update here // NOTE: any error from waiting for attachment is logged only. This is because // the primariy intent of the enclosing method is to create VolumeAttachment. // DONOT return that error here as it is mitigated in attacher.WaitForAttach. + volAttachmentOK := true if _, err := c.waitForVolumeAttachment(csiSource.VolumeHandle, attachID, csiTimeout); err != nil { - glog.Error(log("attacher.Attach encountered error during attachment probing: %v", err)) + volAttachmentOK = false + glog.Error(log("attacher.Attach attempted to wait for attachment to be ready, but failed with: %v", err)) } + glog.V(4).Info(log("attacher.Attach finished OK with VolumeAttachment verified=%t: attachment object [%s]", volAttachmentOK, attachID)) + return attachID, nil } @@ -151,7 +156,7 @@ func (c *csiAttacher) waitForVolumeAttachment(volumeHandle, attachID string, tim } func (c *csiAttacher) VolumesAreAttached(specs []*volume.Spec, nodeName types.NodeName) (map[*volume.Spec]bool, error) { - glog.V(4).Info(log("probing attachment status for %d volumes ", len(specs))) + glog.V(4).Info(log("probing attachment status for %d volume(s) ", len(specs))) attached := make(map[*volume.Spec]bool) @@ -165,13 +170,15 @@ func (c *csiAttacher) VolumesAreAttached(specs []*volume.Spec, nodeName types.No glog.Error(log("attacher.VolumesAreAttached failed: %v", err)) continue } - attachID := getAttachmentName(source.VolumeHandle, string(nodeName)) + + attachID := getAttachmentName(source.VolumeHandle, source.Driver, string(nodeName)) glog.V(4).Info(log("probing attachment status for VolumeAttachment %v", attachID)) attach, err := c.k8s.StorageV1alpha1().VolumeAttachments().Get(attachID, meta.GetOptions{}) if err != nil { glog.Error(log("attacher.VolumesAreAttached failed for attach.ID=%v: %v", attachID, err)) continue } + glog.V(4).Info(log("attacher.VolumesAreAttached attachment [%v] has status.attached=%t", attachID, attach.Status.Attached)) attached[spec] = attach.Status.Attached } @@ -201,10 +208,11 @@ func (c *csiAttacher) Detach(volumeName string, nodeName types.NodeName) error { glog.Error(log("detacher.Detach insufficient info encoded in volumeName")) return errors.New("volumeName missing expected data") } + + driverName := parts[0] volID := parts[1] - attachID := getAttachmentName(volID, string(nodeName)) - err := c.k8s.StorageV1alpha1().VolumeAttachments().Delete(attachID, nil) - if err != nil { + attachID := getAttachmentName(volID, driverName, string(nodeName)) + if err := c.k8s.StorageV1alpha1().VolumeAttachments().Delete(attachID, nil); err != nil { glog.Error(log("detacher.Detach failed to delete VolumeAttachment [%s]: %v", attachID, err)) return err } @@ -257,12 +265,8 @@ func (c *csiAttacher) UnmountDevice(deviceMountPath string) error { return nil } -func hashAttachmentName(volName, nodeName string) string { - result := sha256.Sum256([]byte(fmt.Sprintf("%s%s", volName, nodeName))) - return fmt.Sprintf("%x", result) -} - -func getAttachmentName(volName, nodeName string) string { - // TODO consider using a different prefix for attachment - return fmt.Sprintf("pv-%s", hashAttachmentName(volName, nodeName)) +// getAttachmentName returns csi- +func getAttachmentName(volName, csiDriverName, nodeName string) string { + result := sha256.Sum256([]byte(fmt.Sprintf("%s%s%s", volName, csiDriverName, nodeName))) + return fmt.Sprintf("csi-%x", result) } diff --git a/pkg/volume/csi/csi_attacher_test.go b/pkg/volume/csi/csi_attacher_test.go index 2e1cb3fc61f..2b809eb5d64 100644 --- a/pkg/volume/csi/csi_attacher_test.go +++ b/pkg/volume/csi/csi_attacher_test.go @@ -17,13 +17,11 @@ limitations under the License. package csi import ( - "crypto/sha256" "fmt" "os" "testing" "time" - "k8s.io/api/core/v1" storage "k8s.io/api/storage/v1alpha1" apierrs "k8s.io/apimachinery/pkg/api/errors" meta "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -38,7 +36,7 @@ func makeTestAttachment(attachID, nodeName, pvName string) *storage.VolumeAttach }, Spec: storage.VolumeAttachmentSpec{ NodeName: nodeName, - Attacher: csiPluginName, + Attacher: "mock", Source: storage.VolumeAttachmentSource{ PersistentVolumeName: &pvName, }, @@ -64,47 +62,93 @@ func TestAttacherAttach(t *testing.T) { testCases := []struct { name string - pv *v1.PersistentVolume nodeName string - attachHash [32]byte + driverName string + volumeName string + attachID string shouldFail bool }{ { name: "test ok 1", - pv: makeTestPV("test-pv-001", 10, testDriver, "test-vol-1"), - nodeName: "test-node", - attachHash: sha256.Sum256([]byte(fmt.Sprintf("%s%s", "test-vol-1", "test-node"))), + nodeName: "testnode-01", + driverName: "testdriver-01", + volumeName: "testvol-01", + attachID: getAttachmentName("testvol-01", "testdriver-01", "testnode-01"), }, { name: "test ok 2", - pv: makeTestPV("test-pv-002", 10, testDriver, "test-vol-002"), - nodeName: "test-node", - attachHash: sha256.Sum256([]byte(fmt.Sprintf("%s%s", "test-vol-002", "test-node"))), + nodeName: "node02", + driverName: "driver02", + volumeName: "vol02", + attachID: getAttachmentName("vol02", "driver02", "node02"), }, { - name: "missing spec", - pv: nil, - nodeName: "test-node", - attachHash: sha256.Sum256([]byte(fmt.Sprintf("%s%s", "test-vol-3", "test-node"))), + name: "mismatch vol", + nodeName: "node02", + driverName: "driver02", + volumeName: "vol01", + attachID: getAttachmentName("vol02", "driver02", "node02"), + shouldFail: true, + }, + { + name: "mismatch driver", + nodeName: "node02", + driverName: "driver000", + volumeName: "vol02", + attachID: getAttachmentName("vol02", "driver02", "node02"), + shouldFail: true, + }, + { + name: "mismatch node", + nodeName: "node000", + driverName: "driver000", + volumeName: "vol02", + attachID: getAttachmentName("vol02", "driver02", "node02"), shouldFail: true, }, } - for _, tc := range testCases { - var spec *volume.Spec - if tc.pv != nil { - spec = volume.NewSpecFromPersistentVolume(tc.pv, tc.pv.Spec.PersistentVolumeSource.CSI.ReadOnly) + // attacher loop + for i, tc := range testCases { + t.Log("test case: ", tc.name) + spec := volume.NewSpecFromPersistentVolume(makeTestPV(fmt.Sprintf("test-pv%d", i), 10, tc.driverName, tc.volumeName), false) + + go func(id, nodename string, fail bool) { + attachID, err := csiAttacher.Attach(spec, types.NodeName(nodename)) + if !fail && err != nil { + t.Error("was not expecting failure, but got err: ", err) + } + if attachID != id && !fail { + t.Errorf("expecting attachID %v, got %v", id, attachID) + } + }(tc.attachID, tc.nodeName, tc.shouldFail) + + // update attachment to avoid long waitForAttachment + ticker := time.NewTicker(10 * time.Millisecond) + defer ticker.Stop() + // wait for attachment to be saved + var attach *storage.VolumeAttachment + for i := 0; i < 100; i++ { + attach, err = csiAttacher.k8s.StorageV1alpha1().VolumeAttachments().Get(tc.attachID, meta.GetOptions{}) + if err != nil { + if apierrs.IsNotFound(err) { + <-ticker.C + continue + } + t.Error(err) + } + if attach != nil { + break + } } - attachID, err := csiAttacher.Attach(spec, types.NodeName(tc.nodeName)) - if tc.shouldFail && err == nil { - t.Error("expected failure, but got nil err") + if attach == nil { + t.Error("attachment not found") } - if attachID != "" { - expectedID := fmt.Sprintf("pv-%x", tc.attachHash) - if attachID != expectedID { - t.Errorf("expecting attachID %v, got %v", expectedID, attachID) - } + attach.Status.Attached = true + _, err = csiAttacher.k8s.StorageV1alpha1().VolumeAttachments().Update(attach) + if err != nil { + t.Error(err) } } } @@ -136,8 +180,8 @@ func TestAttacherWaitForVolumeAttachment(t *testing.T) { for i, tc := range testCases { t.Logf("running test: %v", tc.name) pvName := fmt.Sprintf("test-pv-%d", i) - attachID := fmt.Sprintf("pv-%s", hashAttachmentName(pvName, nodeName)) - + volID := fmt.Sprintf("test-vol-%d", i) + attachID := getAttachmentName(volID, testDriver, nodeName) attachment := makeTestAttachment(attachID, nodeName, pvName) attachment.Status.Attached = tc.attached attachment.Status.AttachError = tc.attachErr @@ -150,7 +194,7 @@ func TestAttacherWaitForVolumeAttachment(t *testing.T) { } }() - retID, err := csiAttacher.waitForVolumeAttachment("test-vol", attachID, tc.timeout) + retID, err := csiAttacher.waitForVolumeAttachment(volID, attachID, tc.timeout) if tc.shouldFail && err == nil { t.Error("expecting failure, but err is nil") } @@ -192,7 +236,7 @@ func TestAttacherVolumesAreAttached(t *testing.T) { pv := makeTestPV("test-pv", 10, testDriver, volName) spec := volume.NewSpecFromPersistentVolume(pv, pv.Spec.PersistentVolumeSource.CSI.ReadOnly) specs = append(specs, spec) - attachID := getAttachmentName(volName, nodeName) + attachID := getAttachmentName(volName, testDriver, nodeName) attachment := makeTestAttachment(attachID, nodeName, pv.GetName()) attachment.Status.Attached = stat _, err := csiAttacher.k8s.StorageV1alpha1().VolumeAttachments().Create(attachment) @@ -239,9 +283,9 @@ func TestAttacherDetach(t *testing.T) { attachID string shouldFail bool }{ - {name: "normal test", volID: "vol-001", attachID: fmt.Sprintf("pv-%s", hashAttachmentName("vol-001", nodeName))}, - {name: "normal test 2", volID: "vol-002", attachID: fmt.Sprintf("pv-%s", hashAttachmentName("vol-002", nodeName))}, - {name: "object not found", volID: "vol-001", attachID: fmt.Sprintf("pv-%s", hashAttachmentName("vol-002", nodeName)), shouldFail: true}, + {name: "normal test", volID: "vol-001", attachID: getAttachmentName("vol-001", testDriver, nodeName)}, + {name: "normal test 2", volID: "vol-002", attachID: getAttachmentName("vol-002", testDriver, nodeName)}, + {name: "object not found", volID: "vol-001", attachID: getAttachmentName("vol-002", testDriver, nodeName), shouldFail: true}, } for _, tc := range testCases { diff --git a/pkg/volume/csi/csi_client.go b/pkg/volume/csi/csi_client.go index 22b4d5bae6b..9db7f868e09 100644 --- a/pkg/volume/csi/csi_client.go +++ b/pkg/volume/csi/csi_client.go @@ -39,6 +39,7 @@ type csiClient interface { targetPath string, accessMode api.PersistentVolumeAccessMode, volumeInfo map[string]string, + volumeAttribs map[string]string, fsType string, ) error NodeUnpublishVolume(ctx grpctx.Context, volID string, targetPath string) error @@ -141,6 +142,7 @@ func (c *csiDriverClient) NodePublishVolume( targetPath string, accessMode api.PersistentVolumeAccessMode, volumeInfo map[string]string, + volumeAttribs map[string]string, fsType string, ) error { @@ -161,6 +163,7 @@ func (c *csiDriverClient) NodePublishVolume( TargetPath: targetPath, Readonly: readOnly, PublishVolumeInfo: volumeInfo, + VolumeAttributes: volumeAttribs, VolumeCapability: &csipb.VolumeCapability{ AccessMode: &csipb.VolumeCapability_AccessMode{ diff --git a/pkg/volume/csi/csi_client_test.go b/pkg/volume/csi/csi_client_test.go index 54605cdb483..18d2e83639c 100644 --- a/pkg/volume/csi/csi_client_test.go +++ b/pkg/volume/csi/csi_client_test.go @@ -90,6 +90,7 @@ func TestClientNodePublishVolume(t *testing.T) { tc.targetPath, api.ReadWriteOnce, map[string]string{"device": "/dev/null"}, + map[string]string{"attr0": "val0"}, tc.fsType, ) diff --git a/pkg/volume/csi/csi_mounter.go b/pkg/volume/csi/csi_mounter.go index 8645752ad24..2c4e0ad8492 100644 --- a/pkg/volume/csi/csi_mounter.go +++ b/pkg/volume/csi/csi_mounter.go @@ -17,6 +17,7 @@ limitations under the License. package csi import ( + "encoding/json" "errors" "fmt" "path" @@ -24,7 +25,6 @@ import ( "github.com/golang/glog" grpctx "golang.org/x/net/context" api "k8s.io/api/core/v1" - storage "k8s.io/api/storage/v1alpha1" meta "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" "k8s.io/client-go/kubernetes" @@ -77,11 +77,18 @@ func (c *csiMountMgr) SetUp(fsGroup *int64) error { func (c *csiMountMgr) SetUpAt(dir string, fsGroup *int64) error { glog.V(4).Infof(log("Mounter.SetUpAt(%s)", dir)) + csiSource, err := getCSISourceFromSpec(c.spec) + if err != nil { + glog.Error(log("mounter.SetupAt failed to get CSI persistent source: %v", err)) + return err + } + ctx, cancel := grpctx.WithTimeout(grpctx.Background(), csiTimeout) defer cancel() csi := c.csiClient - pvName := c.spec.PersistentVolume.GetName() + nodeName := string(c.plugin.host.GetNodeName()) + attachID := getAttachmentName(csiSource.VolumeHandle, csiSource.Driver, nodeName) // ensure version is supported if err := csi.AssertSupportedVersion(ctx, csiVersion); err != nil { @@ -92,48 +99,47 @@ func (c *csiMountMgr) SetUpAt(dir string, fsGroup *int64) error { // search for attachment by VolumeAttachment.Spec.Source.PersistentVolumeName if c.volumeInfo == nil { - //TODO (vladimirvivien) consider using VolumesAttachments().Get() to retrieve - //the object directly. This requires the ability to reconstruct the ID using volumeName+nodeName (nodename may not be avilable) - attachList, err := c.k8s.StorageV1alpha1().VolumeAttachments().List(meta.ListOptions{}) + attachment, err := c.k8s.StorageV1alpha1().VolumeAttachments().Get(attachID, meta.GetOptions{}) if err != nil { - glog.Error(log("failed to get volume attachments: %v", err)) + glog.Error(log("mounter.SetupAt failed while getting volume attachment [id=%v]: %v", attachID, err)) return err } - var attachment *storage.VolumeAttachment - for _, attach := range attachList.Items { - if attach.Spec.Source.PersistentVolumeName != nil && - *attach.Spec.Source.PersistentVolumeName == pvName { - attachment = &attach - break - } - } - if attachment == nil { - glog.Error(log("unable to find VolumeAttachment with PV.name = %s", pvName)) + glog.Error(log("unable to find VolumeAttachment [id=%s]", attachID)) return errors.New("no existing VolumeAttachment found") } c.volumeInfo = attachment.Status.AttachmentMetadata } + // get volume attributes + // TODO: for alpha vol atttributes are passed via PV.Annotations + // Beta will fix that + attribs, err := getVolAttribsFromSpec(c.spec) + if err != nil { + glog.Error(log("mounter.SetUpAt failed to extract volume attributes from PV annotations: %v", err)) + return err + } + //TODO (vladimirvivien) implement better AccessModes mapping between k8s and CSI accessMode := api.ReadWriteOnce if c.spec.PersistentVolume.Spec.AccessModes != nil { accessMode = c.spec.PersistentVolume.Spec.AccessModes[0] } - err := csi.NodePublishVolume( + err = csi.NodePublishVolume( ctx, c.volumeID, c.readOnly, dir, accessMode, c.volumeInfo, + attribs, "ext4", //TODO needs to be sourced from PV or somewhere else ) if err != nil { - glog.Errorf(log("Mounter.Setup failed: %v", err)) + glog.Errorf(log("Mounter.SetupAt failed: %v", err)) return err } glog.V(4).Infof(log("successfully mounted %s", dir)) @@ -192,3 +198,26 @@ func (c *csiMountMgr) TearDownAt(dir string) error { return nil } + +// getVolAttribsFromSpec exracts CSI VolumeAttributes information from PV.Annotations +// using key csi.kubernetes.io/volume-attributes. The annotation value is expected +// to be a JSON-encoded object of form {"key0":"val0",...,"keyN":"valN"} +func getVolAttribsFromSpec(spec *volume.Spec) (map[string]string, error) { + if spec == nil { + return nil, errors.New("missing volume spec") + } + annotations := spec.PersistentVolume.GetAnnotations() + if annotations == nil { + return nil, nil // no annotations found + } + jsonAttribs := annotations[csiVolAttribsAnnotationKey] + if jsonAttribs == "" { + return nil, nil // csi annotation not found + } + attribs := map[string]string{} + if err := json.Unmarshal([]byte(jsonAttribs), &attribs); err != nil { + glog.Error(log("error parsing csi PV.Annotation [%s]=%s: %v", csiVolAttribsAnnotationKey, jsonAttribs, err)) + return nil, err + } + return attribs, nil +} diff --git a/pkg/volume/csi/csi_mounter_test.go b/pkg/volume/csi/csi_mounter_test.go index 5e7f6083b60..19e931ed4ce 100644 --- a/pkg/volume/csi/csi_mounter_test.go +++ b/pkg/volume/csi/csi_mounter_test.go @@ -26,8 +26,10 @@ import ( storage "k8s.io/api/storage/v1alpha1" meta "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" + fakeclient "k8s.io/client-go/kubernetes/fake" "k8s.io/kubernetes/pkg/volume" "k8s.io/kubernetes/pkg/volume/csi/fake" + volumetest "k8s.io/kubernetes/pkg/volume/testing" ) var ( @@ -68,7 +70,14 @@ func TestMounterGetPath(t *testing.T) { func TestMounterSetUp(t *testing.T) { plug, tmpDir := newTestPlugin(t) defer os.RemoveAll(tmpDir) - + fakeClient := fakeclient.NewSimpleClientset() + host := volumetest.NewFakeVolumeHostWithNodeName( + tmpDir, + fakeClient, + nil, + "fakeNode", + ) + plug.host = host pv := makeTestPV("test-pv", 10, testDriver, testVol) pvName := pv.GetName() @@ -88,9 +97,11 @@ func TestMounterSetUp(t *testing.T) { csiMounter := mounter.(*csiMountMgr) csiMounter.csiClient = setupClient(t) + attachID := getAttachmentName(csiMounter.volumeID, csiMounter.driverName, string(plug.host.GetNodeName())) + attachment := &storage.VolumeAttachment{ ObjectMeta: meta.ObjectMeta{ - Name: "pv-1234556775313", + Name: attachID, }, Spec: storage.VolumeAttachmentSpec{ NodeName: "test-node", @@ -150,3 +161,50 @@ func TestUnmounterTeardown(t *testing.T) { } } + +func TestGetVolAttribsFromSpec(t *testing.T) { + testCases := []struct { + name string + annotations map[string]string + attribs map[string]string + shouldFail bool + }{ + { + name: "attribs ok", + annotations: map[string]string{"key0": "val0", csiVolAttribsAnnotationKey: `{"k0":"attr0","k1":"attr1","k2":"attr2"}`, "keyN": "valN"}, + attribs: map[string]string{"k0": "attr0", "k1": "attr1", "k2": "attr2"}, + }, + + { + name: "missing attribs", + annotations: map[string]string{"key0": "val0", "keyN": "valN"}, + }, + { + name: "missing annotations", + }, + { + name: "bad json", + annotations: map[string]string{"key0": "val0", csiVolAttribsAnnotationKey: `{"k0""attr0","k1":"attr1,"k2":"attr2"`, "keyN": "valN"}, + attribs: map[string]string{"k0": "attr0", "k1": "attr1", "k2": "attr2"}, + shouldFail: true, + }, + } + spec := volume.NewSpecFromPersistentVolume(makeTestPV("test-pv", 10, testDriver, testVol), false) + for _, tc := range testCases { + t.Log("test case:", tc.name) + spec.PersistentVolume.Annotations = tc.annotations + attribs, err := getVolAttribsFromSpec(spec) + if !tc.shouldFail && err != nil { + t.Error("test case should not fail, but err != nil", err) + } + eq := true + for k, v := range attribs { + if tc.attribs[k] != v { + eq = false + } + } + if !eq { + t.Errorf("expecting attribs %#v, but got %#v", tc.attribs, attribs) + } + } +} diff --git a/pkg/volume/csi/csi_plugin.go b/pkg/volume/csi/csi_plugin.go index 876de2281bc..8862cda98d9 100644 --- a/pkg/volume/csi/csi_plugin.go +++ b/pkg/volume/csi/csi_plugin.go @@ -34,14 +34,14 @@ import ( ) const ( - csiName = "csi" - csiPluginName = "kubernetes.io/csi" + csiPluginName = "kubernetes.io/csi" + csiVolAttribsAnnotationKey = "csi.volume.kubernetes.io/volume-attributes" // TODO (vladimirvivien) implement a more dynamic way to discover // the unix domain socket path for each installed csi driver. // TODO (vladimirvivien) would be nice to name socket with a .sock extension // for consistency. - csiAddrTemplate = "/var/lib/kubelet/plugins/%v" + csiAddrTemplate = "/var/lib/kubelet/plugins/%v/csi.sock" csiTimeout = 15 * time.Second volNameSep = "^" ) diff --git a/pkg/volume/plugins.go b/pkg/volume/plugins.go index bbfd1874f00..42ceaa40d2f 100644 --- a/pkg/volume/plugins.go +++ b/pkg/volume/plugins.go @@ -303,6 +303,9 @@ type VolumeHost interface { // Returns the labels on the node GetNodeLabels() (map[string]string, error) + + // Returns the name of the node + GetNodeName() types.NodeName } // VolumePluginMgr tracks registered plugins. diff --git a/pkg/volume/testing/testing.go b/pkg/volume/testing/testing.go index c940abdbe45..766792dd8a2 100644 --- a/pkg/volume/testing/testing.go +++ b/pkg/volume/testing/testing.go @@ -53,6 +53,7 @@ type fakeVolumeHost struct { exec mount.Exec writer io.Writer nodeLabels map[string]string + nodeName string } func NewFakeVolumeHost(rootDir string, kubeClient clientset.Interface, plugins []VolumePlugin) *fakeVolumeHost { @@ -69,6 +70,12 @@ func NewFakeVolumeHostWithNodeLabels(rootDir string, kubeClient clientset.Interf return volHost } +func NewFakeVolumeHostWithNodeName(rootDir string, kubeClient clientset.Interface, plugins []VolumePlugin, nodeName string) *fakeVolumeHost { + volHost := newFakeVolumeHost(rootDir, kubeClient, plugins, nil) + volHost.nodeName = nodeName + return volHost +} + func newFakeVolumeHost(rootDir string, kubeClient clientset.Interface, plugins []VolumePlugin, cloud cloudprovider.Interface) *fakeVolumeHost { host := &fakeVolumeHost{rootDir: rootDir, kubeClient: kubeClient, cloud: cloud} host.mounter = &mount.FakeMounter{} @@ -177,6 +184,10 @@ func (f *fakeVolumeHost) GetNodeLabels() (map[string]string, error) { return f.nodeLabels, nil } +func (f *fakeVolumeHost) GetNodeName() types.NodeName { + return types.NodeName(f.nodeName) +} + func ProbeVolumePlugins(config VolumeConfig) []VolumePlugin { if _, ok := config.OtherAttributes["fake-property"]; ok { return []VolumePlugin{ diff --git a/plugin/pkg/auth/authorizer/rbac/bootstrappolicy/controller_policy.go b/plugin/pkg/auth/authorizer/rbac/bootstrappolicy/controller_policy.go index cc0813f5de1..c94c0295334 100644 --- a/plugin/pkg/auth/authorizer/rbac/bootstrappolicy/controller_policy.go +++ b/plugin/pkg/auth/authorizer/rbac/bootstrappolicy/controller_policy.go @@ -58,16 +58,25 @@ func buildControllerRoles() ([]rbac.ClusterRole, []rbac.ClusterRoleBinding) { // controllerRoleBindings is a slice of roles used for controllers controllerRoleBindings := []rbac.ClusterRoleBinding{} - addControllerRole(&controllerRoles, &controllerRoleBindings, rbac.ClusterRole{ - ObjectMeta: metav1.ObjectMeta{Name: saRolePrefix + "attachdetach-controller"}, - Rules: []rbac.PolicyRule{ - rbac.NewRule("list", "watch").Groups(legacyGroup).Resources("persistentvolumes", "persistentvolumeclaims").RuleOrDie(), - rbac.NewRule("get", "list", "watch").Groups(legacyGroup).Resources("nodes").RuleOrDie(), - rbac.NewRule("patch", "update").Groups(legacyGroup).Resources("nodes/status").RuleOrDie(), - rbac.NewRule("list", "watch").Groups(legacyGroup).Resources("pods").RuleOrDie(), - eventsRule(), - }, - }) + addControllerRole(&controllerRoles, &controllerRoleBindings, func() rbac.ClusterRole { + role := rbac.ClusterRole{ + ObjectMeta: metav1.ObjectMeta{Name: saRolePrefix + "attachdetach-controller"}, + Rules: []rbac.PolicyRule{ + rbac.NewRule("list", "watch").Groups(legacyGroup).Resources("persistentvolumes", "persistentvolumeclaims").RuleOrDie(), + rbac.NewRule("get", "list", "watch").Groups(legacyGroup).Resources("nodes").RuleOrDie(), + rbac.NewRule("patch", "update").Groups(legacyGroup).Resources("nodes/status").RuleOrDie(), + rbac.NewRule("list", "watch").Groups(legacyGroup).Resources("pods").RuleOrDie(), + eventsRule(), + }, + } + + if utilfeature.DefaultFeatureGate.Enabled(features.CSIPersistentVolume) { + role.Rules = append(role.Rules, rbac.NewRule("get", "create", "delete", "list", "watch").Groups(storageGroup).Resources("volumeattachments").RuleOrDie()) + } + + return role + }()) + addControllerRole(&controllerRoles, &controllerRoleBindings, rbac.ClusterRole{ ObjectMeta: metav1.ObjectMeta{Name: saRolePrefix + "clusterrole-aggregation-controller"}, Rules: []rbac.PolicyRule{ diff --git a/plugin/pkg/auth/authorizer/rbac/bootstrappolicy/policy.go b/plugin/pkg/auth/authorizer/rbac/bootstrappolicy/policy.go index 02c896128aa..c098d30ad6c 100644 --- a/plugin/pkg/auth/authorizer/rbac/bootstrappolicy/policy.go +++ b/plugin/pkg/auth/authorizer/rbac/bootstrappolicy/policy.go @@ -143,6 +143,12 @@ func NodeRules() []rbac.PolicyRule { pvcStatusPolicyRule := rbac.NewRule("get", "update", "patch").Groups(legacyGroup).Resources("persistentvolumeclaims/status").RuleOrDie() nodePolicyRules = append(nodePolicyRules, pvcStatusPolicyRule) } + + // CSI + if utilfeature.DefaultFeatureGate.Enabled(features.CSIPersistentVolume) { + volAttachRule := rbac.NewRule("get").Groups(storageGroup).Resources("volumeattachments").RuleOrDie() + nodePolicyRules = append(nodePolicyRules, volAttachRule) + } return nodePolicyRules } diff --git a/plugin/pkg/scheduler/algorithm/predicates/predicates.go b/plugin/pkg/scheduler/algorithm/predicates/predicates.go index 7b463699ea1..6cdaecb83cf 100644 --- a/plugin/pkg/scheduler/algorithm/predicates/predicates.go +++ b/plugin/pkg/scheduler/algorithm/predicates/predicates.go @@ -120,7 +120,7 @@ func (c *CachedNodeInfo) GetNodeInfo(id string) (*v1.Node, error) { node, err := c.Get(id) if apierrors.IsNotFound(err) { - return nil, fmt.Errorf("node '%v' not found", id) + return nil, err } if err != nil { @@ -1214,6 +1214,10 @@ func (c *PodAffinityChecker) getMatchingAntiAffinityTerms(pod *v1.Pod, allPods [ if affinity != nil && affinity.PodAntiAffinity != nil { existingPodNode, err := c.info.GetNodeInfo(existingPod.Spec.NodeName) if err != nil { + if apierrors.IsNotFound(err) { + glog.Errorf("Node not found, %v", existingPod.Spec.NodeName) + continue + } return nil, err } existingPodMatchingTerms, err := getMatchingAntiAffinityTermsOfExistingPod(pod, existingPod, existingPodNode) diff --git a/plugin/pkg/scheduler/algorithm/priorities/BUILD b/plugin/pkg/scheduler/algorithm/priorities/BUILD index f473a44b4c2..2c335c4004e 100644 --- a/plugin/pkg/scheduler/algorithm/priorities/BUILD +++ b/plugin/pkg/scheduler/algorithm/priorities/BUILD @@ -36,6 +36,7 @@ go_library( "//plugin/pkg/scheduler/schedulercache:go_default_library", "//vendor/github.com/golang/glog:go_default_library", "//vendor/k8s.io/api/core/v1:go_default_library", + "//vendor/k8s.io/apimachinery/pkg/api/errors:go_default_library", "//vendor/k8s.io/apimachinery/pkg/api/resource:go_default_library", "//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", "//vendor/k8s.io/apimachinery/pkg/labels:go_default_library", diff --git a/plugin/pkg/scheduler/algorithm/priorities/interpod_affinity.go b/plugin/pkg/scheduler/algorithm/priorities/interpod_affinity.go index ae168d9c763..af2cd8f6c20 100644 --- a/plugin/pkg/scheduler/algorithm/priorities/interpod_affinity.go +++ b/plugin/pkg/scheduler/algorithm/priorities/interpod_affinity.go @@ -21,6 +21,7 @@ import ( "sync" "k8s.io/api/core/v1" + apierrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/client-go/util/workqueue" kubeletapis "k8s.io/kubernetes/pkg/kubelet/apis" @@ -137,6 +138,10 @@ func (ipa *InterPodAffinity) CalculateInterPodAffinityPriority(pod *v1.Pod, node processPod := func(existingPod *v1.Pod) error { existingPodNode, err := ipa.info.GetNodeInfo(existingPod.Spec.NodeName) if err != nil { + if apierrors.IsNotFound(err) { + glog.Errorf("Node not found, %v", existingPod.Spec.NodeName) + return nil + } return err } existingPodAffinity := existingPod.Spec.Affinity @@ -189,19 +194,21 @@ func (ipa *InterPodAffinity) CalculateInterPodAffinityPriority(pod *v1.Pod, node } processNode := func(i int) { nodeInfo := nodeNameToInfo[allNodeNames[i]] - if hasAffinityConstraints || hasAntiAffinityConstraints { - // We need to process all the nodes. - for _, existingPod := range nodeInfo.Pods() { - if err := processPod(existingPod); err != nil { - pm.setError(err) + if nodeInfo.Node() != nil { + if hasAffinityConstraints || hasAntiAffinityConstraints { + // We need to process all the nodes. + for _, existingPod := range nodeInfo.Pods() { + if err := processPod(existingPod); err != nil { + pm.setError(err) + } } - } - } else { - // The pod doesn't have any constraints - we need to check only existing - // ones that have some. - for _, existingPod := range nodeInfo.PodsWithAffinity() { - if err := processPod(existingPod); err != nil { - pm.setError(err) + } else { + // The pod doesn't have any constraints - we need to check only existing + // ones that have some. + for _, existingPod := range nodeInfo.PodsWithAffinity() { + if err := processPod(existingPod); err != nil { + pm.setError(err) + } } } }