Merge pull request #47726 from rootfs/revert-45528

Automatic merge from submit-queue (batch tested with PRs 47726, 47693, 46909, 46812)

manually revert #45528

**What this PR does / why we need it**:
Revert #45528
**Which issue this PR fixes** *(optional, in `fixes #<issue number>(, fixes #<issue_number>, ...)` format, will close that issue when PR gets merged)*: fixes #47657

**Special notes for your reviewer**:
@humblec @liggitt @saad-ali @kubernetes/kubernetes-release-managers 
**Release note**:

```release-note
NONE
```
Kubernetes Submit Queue 2017-06-19 18:33:59 -07:00 committed by GitHub
commit 5e6355ca9d


@@ -80,9 +80,6 @@ const (
// absGidMin <= defGidMin <= defGidMax <= absGidMax
absoluteGidMin = 2000
absoluteGidMax = math.MaxInt32
heketiAnn = "heketi-dynamic-provisioner"
glusterTypeAnn = "gluster.org/type"
glusterDescAnn = "Gluster: Dynamically provisioned PV"
linuxGlusterMountBinary = "mount.glusterfs"
autoUnmountBinaryVer = "3.11"
)
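
The comment in this hunk states an ordering invariant between the absolute and default GID bounds. As a purely illustrative sketch (not part of the plugin), assuming hypothetical `defGidMin`/`defGidMax` constants defined elsewhere in this file, the invariant can be made explicit like this:

```go
// Illustrative only: a hypothetical helper that spells out the invariant
// absGidMin <= defGidMin <= defGidMax <= absGidMax noted in the const block.
func validateGidBounds(absMin, defMin, defMax, absMax int) error {
	if absMin <= defMin && defMin <= defMax && defMax <= absMax {
		return nil
	}
	return fmt.Errorf("invalid gid bounds: want %d <= %d <= %d <= %d", absMin, defMin, defMax, absMax)
}
```
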
@@ -146,57 +143,12 @@ func (plugin *glusterfsPlugin) NewMounter(spec *volume.Spec, pod *v1.Pod, _ volu
if kubeClient == nil {
return nil, fmt.Errorf("glusterfs: failed to get kube client to initialize mounter")
}
ep, err := kubeClient.Core().Endpoints(ns).Get(epName, metav1.GetOptions{})
if err != nil && errors.IsNotFound(err) {
claim := spec.PersistentVolume.Spec.ClaimRef.Name
checkEpName := dynamicEpSvcPrefix + claim
if epName != checkEpName {
return nil, fmt.Errorf("failed to get endpoint %s, error %v", epName, err)
}
glog.Errorf("glusterfs: failed to get endpoint %s[%v]", epName, err)
if spec != nil && spec.PersistentVolume.Annotations[volumehelper.VolumeDynamicallyCreatedByKey] == heketiAnn {
class, err := volutil.GetClassForVolume(plugin.host.GetKubeClient(), spec.PersistentVolume)
if err != nil {
return nil, fmt.Errorf("glusterfs: failed to get storageclass, error: %v", err)
}
cfg, err := parseClassParameters(class.Parameters, plugin.host.GetKubeClient())
if err != nil {
return nil, fmt.Errorf("glusterfs: failed to parse parameters, error: %v", err)
}
scConfig := *cfg
cli := gcli.NewClient(scConfig.url, scConfig.user, scConfig.secretValue)
if cli == nil {
return nil, fmt.Errorf("glusterfs: failed to create heketi client, error: %v", err)
}
volumeID := dstrings.TrimPrefix(source.Path, volPrefix)
volInfo, err := cli.VolumeInfo(volumeID)
if err != nil {
return nil, fmt.Errorf("glusterfs: failed to get volume info, error: %v", err)
}
endpointIPs, err := getClusterNodes(cli, volInfo.Cluster)
if err != nil {
return nil, fmt.Errorf("glusterfs: failed to get cluster nodes, error: %v", err)
}
// Give an attempt to recreate endpoint/service.
_, _, err = plugin.createEndpointService(ns, epName, endpointIPs, claim)
if err != nil && !errors.IsAlreadyExists(err) {
glog.Errorf("glusterfs: failed to recreate endpoint/service, error: %v", err)
return nil, fmt.Errorf("failed to recreate endpoint/service, error: %v", err)
}
glog.V(3).Infof("glusterfs: endpoint/service [%v] successfully recreated ", epName)
} else {
return nil, err
}
if err != nil {
glog.Errorf("glusterfs: failed to get endpoints %s[%v]", epName, err)
return nil, err
}
glog.V(1).Infof("glusterfs: endpoints %v", ep)
return plugin.newMounterInternal(spec, ep, pod, plugin.host.GetMounter(), exec.New())
}
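
For context, this hunk removes the endpoint-recreation fallback that #45528 had added to `NewMounter`. A minimal sketch of the post-revert lookup path, reusing only the identifiers visible in this hunk (`kubeClient`, `ns`, `epName`); the exact merged code may differ:

```go
// Sketch of the reverted behavior: fetch the endpoint and fail fast if it
// cannot be retrieved, with no attempt to recreate it via heketi.
ep, err := kubeClient.Core().Endpoints(ns).Get(epName, metav1.GetOptions{})
if err != nil {
	glog.Errorf("glusterfs: failed to get endpoints %s[%v]", epName, err)
	return nil, err
}
glog.V(1).Infof("glusterfs: endpoints %v", ep)
return plugin.newMounterInternal(spec, ep, pod, plugin.host.GetMounter(), exec.New())
```
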
@@ -634,77 +586,6 @@ func (plugin *glusterfsPlugin) getGidTable(className string, min int, max int) (
return newGidTable, nil
}
// createEndpointService creates an endpoint and a service in the provided namespace.
func (plugin *glusterfsPlugin) createEndpointService(namespace string, epServiceName string, hostips []string, pvcname string) (endpoint *v1.Endpoints, service *v1.Service, err error) {
addrlist := make([]v1.EndpointAddress, len(hostips))
for i, v := range hostips {
addrlist[i].IP = v
}
endpoint = &v1.Endpoints{
ObjectMeta: metav1.ObjectMeta{
Namespace: namespace,
Name: epServiceName,
Labels: map[string]string{
"gluster.kubernetes.io/provisioned-for-pvc": pvcname,
},
},
Subsets: []v1.EndpointSubset{{
Addresses: addrlist,
Ports: []v1.EndpointPort{{Port: 1, Protocol: "TCP"}},
}},
}
kubeClient := plugin.host.GetKubeClient()
if kubeClient == nil {
return nil, nil, fmt.Errorf("glusterfs: failed to get kube client when creating endpoint service")
}
_, err = kubeClient.Core().Endpoints(namespace).Create(endpoint)
if err != nil && errors.IsAlreadyExists(err) {
glog.V(1).Infof("glusterfs: endpoint [%s] already exist in namespace [%s]", endpoint, namespace)
err = nil
}
if err != nil {
glog.Errorf("glusterfs: failed to create endpoint: %v", err)
return nil, nil, fmt.Errorf("error creating endpoint: %v", err)
}
service = &v1.Service{
ObjectMeta: metav1.ObjectMeta{
Name: epServiceName,
Namespace: namespace,
Labels: map[string]string{
"gluster.kubernetes.io/provisioned-for-pvc": pvcname,
},
},
Spec: v1.ServiceSpec{
Ports: []v1.ServicePort{
{Protocol: "TCP", Port: 1}}}}
_, err = kubeClient.Core().Services(namespace).Create(service)
if err != nil && errors.IsAlreadyExists(err) {
glog.V(1).Infof("glusterfs: service [%s] already exist in namespace [%s]", service, namespace)
err = nil
}
if err != nil {
glog.Errorf("glusterfs: failed to create service: %v", err)
return nil, nil, fmt.Errorf("error creating service: %v", err)
}
return endpoint, service, nil
}
// deleteEndpointService deletes the endpoint and service from the provided namespace.
func (plugin *glusterfsPlugin) deleteEndpointService(namespace string, epServiceName string) (err error) {
kubeClient := plugin.host.GetKubeClient()
if kubeClient == nil {
return fmt.Errorf("glusterfs: failed to get kube client when deleting endpoint service")
}
err = kubeClient.Core().Services(namespace).Delete(epServiceName, nil)
if err != nil {
glog.Errorf("glusterfs: error deleting service %s/%s: %v", namespace, epServiceName, err)
return fmt.Errorf("error deleting service %s/%s: %v", namespace, epServiceName, err)
}
glog.V(1).Infof("glusterfs: service/endpoint %s/%s deleted successfully", namespace, epServiceName)
return nil
}
func (d *glusterfsVolumeDeleter) getGid() (int, bool, error) {
gidStr, ok := d.spec.Annotations[volumehelper.VolumeGidAnnotationKey]
@@ -778,7 +659,7 @@ func (d *glusterfsVolumeDeleter) Delete() error {
dynamicEndpoint = pvSpec.Glusterfs.EndpointsName
}
glog.V(3).Infof("glusterfs: dynamic namespace and endpoint : [%v/%v]", dynamicNamespace, dynamicEndpoint)
err = d.plugin.deleteEndpointService(dynamicNamespace, dynamicEndpoint)
err = d.deleteEndpointService(dynamicNamespace, dynamicEndpoint)
if err != nil {
glog.Errorf("glusterfs: error when deleting endpoint/service :%v", err)
} else {
@@ -840,11 +721,11 @@ func (p *glusterfsVolumeProvisioner) Provision() (*v1.PersistentVolume, error) {
gidStr := strconv.FormatInt(int64(gid), 10)
pv.Annotations = map[string]string{
volumehelper.VolumeGidAnnotationKey: gidStr,
volumehelper.VolumeDynamicallyCreatedByKey: heketiAnn,
glusterTypeAnn: "file",
"Description": glusterDescAnn,
v1.MountOptionAnnotation: "auto_unmount",
volumehelper.VolumeGidAnnotationKey: gidStr,
"kubernetes.io/createdby": "heketi-dynamic-provisioner",
"gluster.org/type": "file",
"Description": "Gluster: Dynamically provisioned PV",
v1.MountOptionAnnotation: "auto_unmount",
}
pv.Spec.Capacity = v1.ResourceList{
@@ -853,6 +734,33 @@ func (p *glusterfsVolumeProvisioner) Provision() (*v1.PersistentVolume, error) {
return pv, nil
}
func (p *glusterfsVolumeProvisioner) GetClusterNodes(cli *gcli.Client, cluster string) (dynamicHostIps []string, err error) {
clusterinfo, err := cli.ClusterInfo(cluster)
if err != nil {
glog.Errorf("glusterfs: failed to get cluster details: %v", err)
return nil, fmt.Errorf("failed to get cluster details: %v", err)
}
// For a dynamically provisioned volume, we gather the list of node IPs
// of the cluster to which the provisioned volume belongs, as there can be
// multiple clusters.
for _, node := range clusterinfo.Nodes {
nodei, err := cli.NodeInfo(string(node))
if err != nil {
glog.Errorf("glusterfs: failed to get hostip: %v", err)
return nil, fmt.Errorf("failed to get hostip: %v", err)
}
ipaddr := dstrings.Join(nodei.NodeAddRequest.Hostnames.Storage, "")
dynamicHostIps = append(dynamicHostIps, ipaddr)
}
glog.V(3).Infof("glusterfs: hostlist :%v", dynamicHostIps)
if len(dynamicHostIps) == 0 {
glog.Errorf("glusterfs: no hosts found: %v", err)
return nil, fmt.Errorf("no hosts found: %v", err)
}
return dynamicHostIps, nil
}
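
As a usage sketch, the provisioner resolves the node IPs of the cluster that hosts the newly created volume before wiring up the endpoint. The heketi client construction below mirrors the one shown in the `NewMounter` hunk above; the `cfg` field names are taken from there and are otherwise an assumption:

```go
// Illustrative fragment only; error handling is abbreviated.
cli := gcli.NewClient(cfg.url, cfg.user, cfg.secretValue)
if cli == nil {
	return nil, 0, fmt.Errorf("glusterfs: failed to create heketi client")
}
dynamicHostIps, err := p.GetClusterNodes(cli, volume.Cluster)
if err != nil {
	return nil, 0, fmt.Errorf("glusterfs: failed to get cluster nodes for volume %s: %v", volume.Name, err)
}
```
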
func (p *glusterfsVolumeProvisioner) CreateVolume(gid int) (r *v1.GlusterfsVolumeSource, size int, err error) {
var clusterIDs []string
capacity := p.options.PVC.Spec.Resources.Requests[v1.ResourceName(v1.ResourceStorage)]
@@ -880,7 +788,7 @@ func (p *glusterfsVolumeProvisioner) CreateVolume(gid int) (r *v1.GlusterfsVolum
return nil, 0, fmt.Errorf("error creating volume %v", err)
}
glog.V(1).Infof("glusterfs: volume with size: %d and name: %s created", volume.Size, volume.Name)
dynamicHostIps, err := getClusterNodes(cli, volume.Cluster)
dynamicHostIps, err := p.GetClusterNodes(cli, volume.Cluster)
if err != nil {
glog.Errorf("glusterfs: error [%v] when getting cluster nodes for volume %s", err, volume)
return nil, 0, fmt.Errorf("error [%v] when getting cluster nodes for volume %s", err, volume)
@@ -892,7 +800,7 @@ func (p *glusterfsVolumeProvisioner) CreateVolume(gid int) (r *v1.GlusterfsVolum
// of volume creation.
epServiceName := dynamicEpSvcPrefix + p.options.PVC.Name
epNamespace := p.options.PVC.Namespace
endpoint, service, err := p.plugin.createEndpointService(epNamespace, epServiceName, dynamicHostIps, p.options.PVC.Name)
endpoint, service, err := p.createEndpointService(epNamespace, epServiceName, dynamicHostIps, p.options.PVC.Name)
if err != nil {
glog.Errorf("glusterfs: failed to create endpoint/service: %v", err)
deleteErr := cli.VolumeDelete(volume.Id)
@@ -909,6 +817,75 @@ func (p *glusterfsVolumeProvisioner) CreateVolume(gid int) (r *v1.GlusterfsVolum
}, sz, nil
}
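
The endpoint and service created during provisioning are then referenced from the returned volume source; the `}, sz, nil` just above is the tail of that return statement. A hedged reconstruction of the wiring, based on the names used in this hunk (`endpoint`, `volume`, `sz`); it is an illustration, not the exact merged code:

```go
// Illustrative: the PV's Glusterfs source points at the per-PVC endpoint
// created during provisioning, and the heketi volume name becomes the path.
return &v1.GlusterfsVolumeSource{
	EndpointsName: endpoint.Name,
	Path:          volume.Name,
	ReadOnly:      false,
}, sz, nil
```
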
func (p *glusterfsVolumeProvisioner) createEndpointService(namespace string, epServiceName string, hostips []string, pvcname string) (endpoint *v1.Endpoints, service *v1.Service, err error) {
addrlist := make([]v1.EndpointAddress, len(hostips))
for i, v := range hostips {
addrlist[i].IP = v
}
endpoint = &v1.Endpoints{
ObjectMeta: metav1.ObjectMeta{
Namespace: namespace,
Name: epServiceName,
Labels: map[string]string{
"gluster.kubernetes.io/provisioned-for-pvc": pvcname,
},
},
Subsets: []v1.EndpointSubset{{
Addresses: addrlist,
Ports: []v1.EndpointPort{{Port: 1, Protocol: "TCP"}},
}},
}
kubeClient := p.plugin.host.GetKubeClient()
if kubeClient == nil {
return nil, nil, fmt.Errorf("glusterfs: failed to get kube client when creating endpoint service")
}
_, err = kubeClient.Core().Endpoints(namespace).Create(endpoint)
if err != nil && errors.IsAlreadyExists(err) {
glog.V(1).Infof("glusterfs: endpoint [%s] already exist in namespace [%s]", endpoint, namespace)
err = nil
}
if err != nil {
glog.Errorf("glusterfs: failed to create endpoint: %v", err)
return nil, nil, fmt.Errorf("error creating endpoint: %v", err)
}
service = &v1.Service{
ObjectMeta: metav1.ObjectMeta{
Name: epServiceName,
Namespace: namespace,
Labels: map[string]string{
"gluster.kubernetes.io/provisioned-for-pvc": pvcname,
},
},
Spec: v1.ServiceSpec{
Ports: []v1.ServicePort{
{Protocol: "TCP", Port: 1}}}}
_, err = kubeClient.Core().Services(namespace).Create(service)
if err != nil && errors.IsAlreadyExists(err) {
glog.V(1).Infof("glusterfs: service [%s] already exist in namespace [%s]", service, namespace)
err = nil
}
if err != nil {
glog.Errorf("glusterfs: failed to create service: %v", err)
return nil, nil, fmt.Errorf("error creating service: %v", err)
}
return endpoint, service, nil
}
func (d *glusterfsVolumeDeleter) deleteEndpointService(namespace string, epServiceName string) (err error) {
kubeClient := d.plugin.host.GetKubeClient()
if kubeClient == nil {
return fmt.Errorf("glusterfs: failed to get kube client when deleting endpoint service")
}
err = kubeClient.Core().Services(namespace).Delete(epServiceName, nil)
if err != nil {
glog.Errorf("glusterfs: error deleting service %s/%s: %v", namespace, epServiceName, err)
return fmt.Errorf("error deleting service %s/%s: %v", namespace, epServiceName, err)
}
glog.V(1).Infof("glusterfs: service/endpoint %s/%s deleted successfully", namespace, epServiceName)
return nil
}
// parseSecret finds a given Secret instance and reads user password from it.
func parseSecret(namespace, secretName string, kubeClient clientset.Interface) (string, error) {
secretMap, err := volutil.GetSecretForPV(namespace, secretName, glusterfsPluginName, kubeClient)
@@ -1058,32 +1035,3 @@ func parseClassParameters(params map[string]string, kubeClient clientset.Interfa
return &cfg, nil
}
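
The `parseClassParameters` helper shown here consumes the glusterfs StorageClass parameters. A hedged illustration of the kind of parameter map it accepts; the key names (`resturl`, `restuser`, `secretName`, `secretNamespace`) are the commonly documented glusterfs provisioner keys and are an assumption here, not taken from this diff:

```go
// Hypothetical input for parseClassParameters; key names and values are examples.
params := map[string]string{
	"resturl":         "http://heketi.example.com:8081",
	"restuser":        "admin",
	"secretName":      "heketi-secret",
	"secretNamespace": "default",
}
cfg, err := parseClassParameters(params, plugin.host.GetKubeClient())
if err != nil {
	return fmt.Errorf("glusterfs: failed to parse StorageClass parameters: %v", err)
}
_ = cfg
```
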
// getClusterNodes() returns the cluster nodes of a given cluster
func getClusterNodes(cli *gcli.Client, cluster string) (dynamicHostIps []string, err error) {
clusterinfo, err := cli.ClusterInfo(cluster)
if err != nil {
glog.Errorf("glusterfs: failed to get cluster details: %v", err)
return nil, fmt.Errorf("failed to get cluster details: %v", err)
}
// For a dynamically provisioned volume, we gather the list of node IPs
// of the cluster to which the provisioned volume belongs, as there can be
// multiple clusters.
for _, node := range clusterinfo.Nodes {
nodei, err := cli.NodeInfo(string(node))
if err != nil {
glog.Errorf("glusterfs: failed to get hostip: %v", err)
return nil, fmt.Errorf("failed to get hostip: %v", err)
}
ipaddr := dstrings.Join(nodei.NodeAddRequest.Hostnames.Storage, "")
dynamicHostIps = append(dynamicHostIps, ipaddr)
}
glog.V(3).Infof("glusterfs: hostlist :%v", dynamicHostIps)
if len(dynamicHostIps) == 0 {
glog.Errorf("glusterfs: no hosts found: %v", err)
return nil, fmt.Errorf("no hosts found: %v", err)
}
return dynamicHostIps, nil
}