From 936c81ddfbd5370fe15e9a5bc7cfc934dcd348fe Mon Sep 17 00:00:00 2001 From: Humble Chirammal Date: Tue, 16 May 2017 19:51:21 +0530 Subject: [PATCH] Recreate endpoint/service if it lost in between. In some setups, after creation of dynamic PVs and before mounting/using these PVs in a pod, the endpoint/service got mistakenly deleted by the user/developer. By making these methods 'plugin' specific, we can call it from mounter if there are scenarios where the endpoint and service got wiped in between accidentally. Signed-off-by: Humble Chirammal hchiramm@redhat.com --- pkg/volume/glusterfs/glusterfs.go | 272 ++++++++++++++++++------------ 1 file changed, 162 insertions(+), 110 deletions(-) diff --git a/pkg/volume/glusterfs/glusterfs.go b/pkg/volume/glusterfs/glusterfs.go index c12674deb98..73cf6d3cdac 100644 --- a/pkg/volume/glusterfs/glusterfs.go +++ b/pkg/volume/glusterfs/glusterfs.go @@ -80,6 +80,9 @@ const ( // absGidMin <= defGidMin <= defGidMax <= absGidMax absoluteGidMin = 2000 absoluteGidMax = math.MaxInt32 + heketiAnn = "heketi-dynamic-provisioner" + glusterTypeAnn = "gluster.org/type" + glusterDescAnn = "Gluster: Dynamically provisioned PV" ) func (plugin *glusterfsPlugin) Init(host volume.VolumeHost) error { @@ -141,12 +144,57 @@ func (plugin *glusterfsPlugin) NewMounter(spec *volume.Spec, pod *v1.Pod, _ volu if kubeClient == nil { return nil, fmt.Errorf("glusterfs: failed to get kube client to initialize mounter") } + ep, err := kubeClient.Core().Endpoints(ns).Get(epName, metav1.GetOptions{}) - if err != nil { - glog.Errorf("glusterfs: failed to get endpoints %s[%v]", epName, err) - return nil, err + if err != nil && errors.IsNotFound(err) { + claim := spec.PersistentVolume.Spec.ClaimRef.Name + checkEpName := dynamicEpSvcPrefix + claim + if epName != checkEpName { + return nil, fmt.Errorf("failed to get endpoint %s, error %v", epName, err) + } + glog.Errorf("glusterfs: failed to get endpoint %s[%v]", epName, err) + if spec != nil && spec.PersistentVolume.Annotations["kubernetes.io/createdby"] == heketiAnn { + class, err := volutil.GetClassForVolume(plugin.host.GetKubeClient(), spec.PersistentVolume) + if err != nil { + return nil, fmt.Errorf("glusterfs: failed to get storageclass, error: %v", err) + } + + cfg, err := parseClassParameters(class.Parameters, plugin.host.GetKubeClient()) + if err != nil { + return nil, fmt.Errorf("glusterfs: failed to parse parameters, error: %v", err) + } + + scConfig := *cfg + cli := gcli.NewClient(scConfig.url, scConfig.user, scConfig.secretValue) + if cli == nil { + return nil, fmt.Errorf("glusterfs: failed to create heketi client, error: %v", err) + } + + volumeID := dstrings.TrimPrefix(source.Path, volPrefix) + volInfo, err := cli.VolumeInfo(volumeID) + if err != nil { + return nil, fmt.Errorf("glusterfs: failed to get volume info, error: %v", err) + } + + endpointIPs, err := getClusterNodes(cli, volInfo.Cluster) + if err != nil { + return nil, fmt.Errorf("glusterfs: failed to get cluster nodes, error: %v", err) + } + + // Give an attempt to recreate endpoint/service. + + _, _, err = plugin.createEndpointService(ns, epName, endpointIPs, claim) + + if err != nil && !errors.IsAlreadyExists(err) { + glog.Errorf("glusterfs: failed to recreate endpoint/service, error: %v", err) + return nil, fmt.Errorf("failed to recreate endpoint/service, error: %v", err) + } + glog.V(3).Infof("glusterfs: endpoint/service [%v] successfully recreated ", epName) + } else { + return nil, err + } } - glog.V(1).Infof("glusterfs: endpoints %v", ep) + return plugin.newMounterInternal(spec, ep, pod, plugin.host.GetMounter(), exec.New()) } @@ -567,6 +615,77 @@ func (p *glusterfsPlugin) getGidTable(className string, min int, max int) (*MinM return newGidTable, nil } +//createEndpointService create an endpoint and service in provided namespace. +func (plugin *glusterfsPlugin) createEndpointService(namespace string, epServiceName string, hostips []string, pvcname string) (endpoint *v1.Endpoints, service *v1.Service, err error) { + + addrlist := make([]v1.EndpointAddress, len(hostips)) + for i, v := range hostips { + addrlist[i].IP = v + } + endpoint = &v1.Endpoints{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: namespace, + Name: epServiceName, + Labels: map[string]string{ + "gluster.kubernetes.io/provisioned-for-pvc": pvcname, + }, + }, + Subsets: []v1.EndpointSubset{{ + Addresses: addrlist, + Ports: []v1.EndpointPort{{Port: 1, Protocol: "TCP"}}, + }}, + } + kubeClient := plugin.host.GetKubeClient() + if kubeClient == nil { + return nil, nil, fmt.Errorf("glusterfs: failed to get kube client when creating endpoint service") + } + _, err = kubeClient.Core().Endpoints(namespace).Create(endpoint) + if err != nil && errors.IsAlreadyExists(err) { + glog.V(1).Infof("glusterfs: endpoint [%s] already exist in namespace [%s]", endpoint, namespace) + err = nil + } + if err != nil { + glog.Errorf("glusterfs: failed to create endpoint: %v", err) + return nil, nil, fmt.Errorf("error creating endpoint: %v", err) + } + service = &v1.Service{ + ObjectMeta: metav1.ObjectMeta{ + Name: epServiceName, + Namespace: namespace, + Labels: map[string]string{ + "gluster.kubernetes.io/provisioned-for-pvc": pvcname, + }, + }, + Spec: v1.ServiceSpec{ + Ports: []v1.ServicePort{ + {Protocol: "TCP", Port: 1}}}} + _, err = kubeClient.Core().Services(namespace).Create(service) + if err != nil && errors.IsAlreadyExists(err) { + glog.V(1).Infof("glusterfs: service [%s] already exist in namespace [%s]", service, namespace) + err = nil + } + if err != nil { + glog.Errorf("glusterfs: failed to create service: %v", err) + return nil, nil, fmt.Errorf("error creating service: %v", err) + } + return endpoint, service, nil +} + +// deleteEndpointService delete the endpoint and service from the provided namespace. +func (plugin *glusterfsPlugin) deleteEndpointService(namespace string, epServiceName string) (err error) { + kubeClient := plugin.host.GetKubeClient() + if kubeClient == nil { + return fmt.Errorf("glusterfs: failed to get kube client when deleting endpoint service") + } + err = kubeClient.Core().Services(namespace).Delete(epServiceName, nil) + if err != nil { + glog.Errorf("glusterfs: error deleting service %s/%s: %v", namespace, epServiceName, err) + return fmt.Errorf("error deleting service %s/%s: %v", namespace, epServiceName, err) + } + glog.V(1).Infof("glusterfs: service/endpoint %s/%s deleted successfully", namespace, epServiceName) + return nil +} + func (d *glusterfsVolumeDeleter) getGid() (int, bool, error) { gidStr, ok := d.spec.Annotations[volumehelper.VolumeGidAnnotationKey] @@ -640,7 +759,7 @@ func (d *glusterfsVolumeDeleter) Delete() error { dynamicEndpoint = pvSpec.Glusterfs.EndpointsName } glog.V(3).Infof("glusterfs: dynamic namespace and endpoint : [%v/%v]", dynamicNamespace, dynamicEndpoint) - err = d.deleteEndpointService(dynamicNamespace, dynamicEndpoint) + err = d.plugin.deleteEndpointService(dynamicNamespace, dynamicEndpoint) if err != nil { glog.Errorf("glusterfs: error when deleting endpoint/service :%v", err) } else { @@ -685,7 +804,7 @@ func (r *glusterfsVolumeProvisioner) Provision() (*v1.PersistentVolume, error) { } glog.Errorf("glusterfs: create volume err: %v.", err) - return nil, fmt.Errorf("glusterfs: create volume err: %v.", err) + return nil, fmt.Errorf("glusterfs: create volume err: %v", err) } pv := new(v1.PersistentVolume) pv.Spec.PersistentVolumeSource.Glusterfs = glusterfs @@ -699,9 +818,9 @@ func (r *glusterfsVolumeProvisioner) Provision() (*v1.PersistentVolume, error) { pv.Annotations = map[string]string{ volumehelper.VolumeGidAnnotationKey: gidStr, - "kubernetes.io/createdby": "heketi-dynamic-provisioner", - "gluster.org/type": "file", - "Description": "Gluster: Dynamically provisioned PV", + "kubernetes.io/createdby": heketiAnn, + glusterTypeAnn: "file", + "Description": glusterDescAnn, } pv.Spec.Capacity = v1.ResourceList{ @@ -710,33 +829,6 @@ func (r *glusterfsVolumeProvisioner) Provision() (*v1.PersistentVolume, error) { return pv, nil } -func (p *glusterfsVolumeProvisioner) GetClusterNodes(cli *gcli.Client, cluster string) (dynamicHostIps []string, err error) { - clusterinfo, err := cli.ClusterInfo(cluster) - if err != nil { - glog.Errorf("glusterfs: failed to get cluster details: %v", err) - return nil, fmt.Errorf("failed to get cluster details: %v", err) - } - - // For the dynamically provisioned volume, we gather the list of node IPs - // of the cluster on which provisioned volume belongs to, as there can be multiple - // clusters. - for _, node := range clusterinfo.Nodes { - nodei, err := cli.NodeInfo(string(node)) - if err != nil { - glog.Errorf("glusterfs: failed to get hostip: %v", err) - return nil, fmt.Errorf("failed to get hostip: %v", err) - } - ipaddr := dstrings.Join(nodei.NodeAddRequest.Hostnames.Storage, "") - dynamicHostIps = append(dynamicHostIps, ipaddr) - } - glog.V(3).Infof("glusterfs: hostlist :%v", dynamicHostIps) - if len(dynamicHostIps) == 0 { - glog.Errorf("glusterfs: no hosts found: %v", err) - return nil, fmt.Errorf("no hosts found: %v", err) - } - return dynamicHostIps, nil -} - func (p *glusterfsVolumeProvisioner) CreateVolume(gid int) (r *v1.GlusterfsVolumeSource, size int, err error) { var clusterIDs []string capacity := p.options.PVC.Spec.Resources.Requests[v1.ResourceName(v1.ResourceStorage)] @@ -764,7 +856,7 @@ func (p *glusterfsVolumeProvisioner) CreateVolume(gid int) (r *v1.GlusterfsVolum return nil, 0, fmt.Errorf("error creating volume %v", err) } glog.V(1).Infof("glusterfs: volume with size: %d and name: %s created", volume.Size, volume.Name) - dynamicHostIps, err := p.GetClusterNodes(cli, volume.Cluster) + dynamicHostIps, err := getClusterNodes(cli, volume.Cluster) if err != nil { glog.Errorf("glusterfs: error [%v] when getting cluster nodes for volume %s", err, volume) return nil, 0, fmt.Errorf("error [%v] when getting cluster nodes for volume %s", err, volume) @@ -776,7 +868,7 @@ func (p *glusterfsVolumeProvisioner) CreateVolume(gid int) (r *v1.GlusterfsVolum // of volume creation. epServiceName := dynamicEpSvcPrefix + p.options.PVC.Name epNamespace := p.options.PVC.Namespace - endpoint, service, err := p.createEndpointService(epNamespace, epServiceName, dynamicHostIps, p.options.PVC.Name) + endpoint, service, err := p.plugin.createEndpointService(epNamespace, epServiceName, dynamicHostIps, p.options.PVC.Name) if err != nil { glog.Errorf("glusterfs: failed to create endpoint/service: %v", err) deleteErr := cli.VolumeDelete(volume.Id) @@ -793,75 +885,6 @@ func (p *glusterfsVolumeProvisioner) CreateVolume(gid int) (r *v1.GlusterfsVolum }, sz, nil } -func (p *glusterfsVolumeProvisioner) createEndpointService(namespace string, epServiceName string, hostips []string, pvcname string) (endpoint *v1.Endpoints, service *v1.Service, err error) { - - addrlist := make([]v1.EndpointAddress, len(hostips)) - for i, v := range hostips { - addrlist[i].IP = v - } - endpoint = &v1.Endpoints{ - ObjectMeta: metav1.ObjectMeta{ - Namespace: namespace, - Name: epServiceName, - Labels: map[string]string{ - "gluster.kubernetes.io/provisioned-for-pvc": pvcname, - }, - }, - Subsets: []v1.EndpointSubset{{ - Addresses: addrlist, - Ports: []v1.EndpointPort{{Port: 1, Protocol: "TCP"}}, - }}, - } - kubeClient := p.plugin.host.GetKubeClient() - if kubeClient == nil { - return nil, nil, fmt.Errorf("glusterfs: failed to get kube client when creating endpoint service") - } - _, err = kubeClient.Core().Endpoints(namespace).Create(endpoint) - if err != nil && errors.IsAlreadyExists(err) { - glog.V(1).Infof("glusterfs: endpoint [%s] already exist in namespace [%s]", endpoint, namespace) - err = nil - } - if err != nil { - glog.Errorf("glusterfs: failed to create endpoint: %v", err) - return nil, nil, fmt.Errorf("error creating endpoint: %v", err) - } - service = &v1.Service{ - ObjectMeta: metav1.ObjectMeta{ - Name: epServiceName, - Namespace: namespace, - Labels: map[string]string{ - "gluster.kubernetes.io/provisioned-for-pvc": pvcname, - }, - }, - Spec: v1.ServiceSpec{ - Ports: []v1.ServicePort{ - {Protocol: "TCP", Port: 1}}}} - _, err = kubeClient.Core().Services(namespace).Create(service) - if err != nil && errors.IsAlreadyExists(err) { - glog.V(1).Infof("glusterfs: service [%s] already exist in namespace [%s]", service, namespace) - err = nil - } - if err != nil { - glog.Errorf("glusterfs: failed to create service: %v", err) - return nil, nil, fmt.Errorf("error creating service: %v", err) - } - return endpoint, service, nil -} - -func (d *glusterfsVolumeDeleter) deleteEndpointService(namespace string, epServiceName string) (err error) { - kubeClient := d.plugin.host.GetKubeClient() - if kubeClient == nil { - return fmt.Errorf("glusterfs: failed to get kube client when deleting endpoint service") - } - err = kubeClient.Core().Services(namespace).Delete(epServiceName, nil) - if err != nil { - glog.Errorf("glusterfs: error deleting service %s/%s: %v", namespace, epServiceName, err) - return fmt.Errorf("error deleting service %s/%s: %v", namespace, epServiceName, err) - } - glog.V(1).Infof("glusterfs: service/endpoint %s/%s deleted successfully", namespace, epServiceName) - return nil -} - // parseSecret finds a given Secret instance and reads user password from it. func parseSecret(namespace, secretName string, kubeClient clientset.Interface) (string, error) { secretMap, err := volutil.GetSecretForPV(namespace, secretName, glusterfsPluginName, kubeClient) @@ -957,7 +980,7 @@ func parseClassParameters(params map[string]string, kubeClient clientset.Interfa if len(parseVolumeTypeInfo) >= 2 { newReplicaCount, err := convertVolumeParam(parseVolumeTypeInfo[1]) if err != nil { - return nil, fmt.Errorf("error [%v] when parsing value %q of option '%s' for volume plugin %s.", err, parseVolumeTypeInfo[1], "volumetype", glusterfsPluginName) + return nil, fmt.Errorf("error [%v] when parsing value %q of option '%s' for volume plugin %s", err, parseVolumeTypeInfo[1], "volumetype", glusterfsPluginName) } cfg.volumeType = gapi.VolumeDurabilityInfo{Type: gapi.DurabilityReplicate, Replicate: gapi.ReplicaDurability{Replica: newReplicaCount}} } else { @@ -967,11 +990,11 @@ func parseClassParameters(params map[string]string, kubeClient clientset.Interfa if len(parseVolumeTypeInfo) >= 3 { newDisperseData, err := convertVolumeParam(parseVolumeTypeInfo[1]) if err != nil { - return nil, fmt.Errorf("error [%v] when parsing value %q of option '%s' for volume plugin %s.", parseVolumeTypeInfo[1], err, "volumetype", glusterfsPluginName) + return nil, fmt.Errorf("error [%v] when parsing value %q of option '%s' for volume plugin %s", parseVolumeTypeInfo[1], err, "volumetype", glusterfsPluginName) } newDisperseRedundancy, err := convertVolumeParam(parseVolumeTypeInfo[2]) if err != nil { - return nil, fmt.Errorf("error [%v] when parsing value %q of option '%s' for volume plugin %s.", err, parseVolumeTypeInfo[2], "volumetype", glusterfsPluginName) + return nil, fmt.Errorf("error [%v] when parsing value %q of option '%s' for volume plugin %s", err, parseVolumeTypeInfo[2], "volumetype", glusterfsPluginName) } cfg.volumeType = gapi.VolumeDurabilityInfo{Type: gapi.DurabilityEC, Disperse: gapi.DisperseDurability{Data: newDisperseData, Redundancy: newDisperseRedundancy}} } else { @@ -1011,3 +1034,32 @@ func parseClassParameters(params map[string]string, kubeClient clientset.Interfa return &cfg, nil } + +// getClusterNodes() returns the cluster nodes of a given cluster + +func getClusterNodes(cli *gcli.Client, cluster string) (dynamicHostIps []string, err error) { + clusterinfo, err := cli.ClusterInfo(cluster) + if err != nil { + glog.Errorf("glusterfs: failed to get cluster details: %v", err) + return nil, fmt.Errorf("failed to get cluster details: %v", err) + } + + // For the dynamically provisioned volume, we gather the list of node IPs + // of the cluster on which provisioned volume belongs to, as there can be multiple + // clusters. + for _, node := range clusterinfo.Nodes { + nodei, err := cli.NodeInfo(string(node)) + if err != nil { + glog.Errorf("glusterfs: failed to get hostip: %v", err) + return nil, fmt.Errorf("failed to get hostip: %v", err) + } + ipaddr := dstrings.Join(nodei.NodeAddRequest.Hostnames.Storage, "") + dynamicHostIps = append(dynamicHostIps, ipaddr) + } + glog.V(3).Infof("glusterfs: hostlist :%v", dynamicHostIps) + if len(dynamicHostIps) == 0 { + glog.Errorf("glusterfs: no hosts found: %v", err) + return nil, fmt.Errorf("no hosts found: %v", err) + } + return dynamicHostIps, nil +}