From e1068c9a495e99913f6f6d26765bee00f3db426d Mon Sep 17 00:00:00 2001 From: Markus Suonto Date: Wed, 28 Sep 2016 09:20:59 +0300 Subject: [PATCH] EnsureLoadBalancer update instead of recreate existing LBs --- .../openstack/openstack_loadbalancer.go | 440 ++++++++++++++---- 1 file changed, 346 insertions(+), 94 deletions(-) diff --git a/pkg/cloudprovider/providers/openstack/openstack_loadbalancer.go b/pkg/cloudprovider/providers/openstack/openstack_loadbalancer.go index 239e7c35e25..de996e9e3d7 100644 --- a/pkg/cloudprovider/providers/openstack/openstack_loadbalancer.go +++ b/pkg/cloudprovider/providers/openstack/openstack_loadbalancer.go @@ -241,6 +241,161 @@ func getLoadbalancerByName(client *gophercloud.ServiceClient, name string) (*loa return &loadbalancerList[0], nil } +func getListenersByLoadBalancerID(client *gophercloud.ServiceClient, id string) ([]listeners.Listener, error) { + var existingListeners []listeners.Listener + err := listeners.List(client, listeners.ListOpts{LoadbalancerID: id}).EachPage(func(page pagination.Page) (bool, error) { + listenerList, err := listeners.ExtractListeners(page) + if err != nil { + return false, err + } + existingListeners = append(existingListeners, listenerList...) + + return true, nil + }) + if err != nil { + return nil, err + } + + return existingListeners, nil +} + +// get listener for a port or nil if does not exist +func getListenerForPort(existingListeners []listeners.Listener, port api.ServicePort) *listeners.Listener { + for _, l := range existingListeners { + if l.Protocol == string(port.Protocol) && l.ProtocolPort == int(port.Port) { + return &l + } + } + + return nil +} + +// Get pool for a listener. A listener always has exactly one pool. +func getPoolByListenerID(client *gophercloud.ServiceClient, loadbalancerID string, listenerID string) (*v2_pools.Pool, error) { + listenerPools := make([]v2_pools.Pool, 0, 1) + err := v2_pools.List(client, v2_pools.ListOpts{LoadbalancerID: loadbalancerID}).EachPage(func(page pagination.Page) (bool, error) { + poolsList, err := v2_pools.ExtractPools(page) + if err != nil { + return false, err + } + for _, p := range poolsList { + for _, l := range p.Listeners { + if l.ID == listenerID { + listenerPools = append(listenerPools, p) + } + } + } + if len(listenerPools) > 1 { + return false, ErrMultipleResults + } + return true, nil + }) + if err != nil { + if isNotFound(err) { + return nil, ErrNotFound + } + return nil, err + } + + if len(listenerPools) == 0 { + return nil, ErrNotFound + } else if len(listenerPools) > 1 { + return nil, ErrMultipleResults + } + + return &listenerPools[0], nil +} + +func getMembersByPoolID(client *gophercloud.ServiceClient, id string) ([]v2_pools.Member, error) { + var members []v2_pools.Member + err := v2_pools.ListAssociateMembers(client, id, v2_pools.MemberListOpts{}).EachPage(func(page pagination.Page) (bool, error) { + membersList, err := v2_pools.ExtractMembers(page) + if err != nil { + return false, err + } + members = append(members, membersList...) + + return true, nil + }) + if err != nil { + return nil, err + } + + return members, nil +} + +// Each pool has exactly one or zero monitors. ListOpts does not seem to filter anything. +func getMonitorByPoolID(client *gophercloud.ServiceClient, id string) (*v2_monitors.Monitor, error) { + var monitorList []v2_monitors.Monitor + err := v2_monitors.List(client, v2_monitors.ListOpts{PoolID: id}).EachPage(func(page pagination.Page) (bool, error) { + monitorsList, err := v2_monitors.ExtractMonitors(page) + if err != nil { + return false, err + } + + for _, monitor := range monitorsList { + // bugfix, filter by poolid + for _, pool := range monitor.Pools { + if pool.ID == id { + monitorList = append(monitorList, monitor) + } + } + } + if len(monitorList) > 1 { + return false, ErrMultipleResults + } + return true, nil + }) + if err != nil { + if isNotFound(err) { + return nil, ErrNotFound + } + return nil, err + } + + if len(monitorList) == 0 { + return nil, ErrNotFound + } else if len(monitorList) > 1 { + return nil, ErrMultipleResults + } + + return &monitorList[0], nil +} + +// Check if a member exists for node +func memberExists(members []v2_pools.Member, addr string) bool { + for _, member := range members { + if member.Address == addr { + return true + } + } + + return false +} + +func popListener(existingListeners []listeners.Listener, id string) []listeners.Listener { + for i, existingListener := range existingListeners { + if existingListener.ID == id { + existingListeners[i] = existingListeners[len(existingListeners)-1] + existingListeners = existingListeners[:len(existingListeners)-1] + break + } + } + + return existingListeners +} + +func popMember(members []v2_pools.Member, addr string) []v2_pools.Member { + for i, member := range members { + if member.Address == addr { + members[i] = members[len(members)-1] + members = members[:len(members)-1] + } + } + + return members +} + func waitLoadbalancerActiveProvisioningStatus(client *gophercloud.ServiceClient, loadbalancerID string) (string, error) { start := time.Now().Second() for { @@ -283,6 +438,26 @@ func waitLoadbalancerDeleted(client *gophercloud.ServiceClient, loadbalancerID s } } +func (lbaas *LbaasV2) createLoadBalancer(service *api.Service, name string) (*loadbalancers.LoadBalancer, error) { + createOpts := loadbalancers.CreateOpts{ + Name: name, + Description: fmt.Sprintf("Kubernetes external service %s", name), + VipSubnetID: lbaas.opts.SubnetId, + } + + loadBalancerIP := service.Spec.LoadBalancerIP + if loadBalancerIP != "" { + createOpts.VipAddress = loadBalancerIP + } + + loadbalancer, err := loadbalancers.Create(lbaas.network, createOpts).Extract() + if err != nil { + return nil, fmt.Errorf("Error creating loadbalancer %v: %v", createOpts, err) + } + + return loadbalancer, nil +} + func (lbaas *LbaasV2) GetLoadBalancer(clusterName string, service *api.Service) (*api.LoadBalancerStatus, bool, error) { loadBalancerName := cloudprovider.GetLoadBalancerName(service) loadbalancer, err := getLoadbalancerByName(lbaas.network, loadBalancerName) @@ -340,101 +515,126 @@ func (lbaas *LbaasV2) EnsureLoadBalancer(clusterName string, apiService *api.Ser return nil, fmt.Errorf("Source range restrictions are not supported for openstack load balancers") } - glog.V(2).Infof("Checking if openstack load balancer already exists: %s", cloudprovider.GetLoadBalancerName(apiService)) - _, exists, err := lbaas.GetLoadBalancer(clusterName, apiService) + name := cloudprovider.GetLoadBalancerName(apiService) + loadbalancer, err := getLoadbalancerByName(lbaas.network, name) if err != nil { - return nil, fmt.Errorf("error checking if openstack load balancer already exists: %v", err) + if err != ErrNotFound { + return nil, fmt.Errorf("Error getting loadbalancer %s: %v", name, err) + } + glog.V(2).Infof("Creating loadbalancer %s", name) + loadbalancer, err = lbaas.createLoadBalancer(apiService, name) + if err != nil { + // Unknown error, retry later + return nil, fmt.Errorf("Error creating loadbalancer %s: %v", name, err) + } + } else { + glog.V(2).Infof("LoadBalancer %s already exists", name) } - // TODO: Implement a more efficient update strategy for common changes than delete & create - // In particular, if we implement hosts update, we can get rid of UpdateHosts - if exists { - err := lbaas.EnsureLoadBalancerDeleted(clusterName, apiService) - if err != nil { - return nil, fmt.Errorf("error deleting existing openstack load balancer: %v", err) - } - } + waitLoadbalancerActiveProvisioningStatus(lbaas.network, loadbalancer.ID) lbmethod := v2_pools.LBMethod(lbaas.opts.LBMethod) if lbmethod == "" { lbmethod = v2_pools.LBMethodRoundRobin } - name := cloudprovider.GetLoadBalancerName(apiService) - createOpts := loadbalancers.CreateOpts{ - Name: name, - Description: fmt.Sprintf("Kubernetes external service %s", name), - VipSubnetID: lbaas.opts.SubnetId, - } - - loadBalancerIP := apiService.Spec.LoadBalancerIP - if loadBalancerIP != "" { - createOpts.VipAddress = loadBalancerIP - } - - loadbalancer, err := loadbalancers.Create(lbaas.network, createOpts).Extract() + oldListeners, err := getListenersByLoadBalancerID(lbaas.network, loadbalancer.ID) if err != nil { - // cleanup what was created so far - _ = lbaas.EnsureLoadBalancerDeleted(clusterName, apiService) - return nil, err + return nil, fmt.Errorf("Error getting LB %s listeners: %v", name, err) } - - waitLoadbalancerActiveProvisioningStatus(lbaas.network, loadbalancer.ID) - for portIndex, port := range ports { - listener, err := listeners.Create(lbaas.network, listeners.CreateOpts{ - Name: fmt.Sprintf("listener_%s_%d", name, portIndex), - Protocol: listeners.Protocol(port.Protocol), - ProtocolPort: int(port.Port), - LoadbalancerID: loadbalancer.ID, - }).Extract() - if err != nil { - // cleanup what was created so far - _ = lbaas.EnsureLoadBalancerDeleted(clusterName, apiService) - return nil, err - } - - waitLoadbalancerActiveProvisioningStatus(lbaas.network, loadbalancer.ID) - - pool, err := v2_pools.Create(lbaas.network, v2_pools.CreateOpts{ - Name: fmt.Sprintf("pool_%s_%d", name, portIndex), - Protocol: v2_pools.Protocol(port.Protocol), - LBMethod: lbmethod, - ListenerID: listener.ID, - Persistence: persistence, - }).Extract() - if err != nil { - // cleanup what was created so far - _ = lbaas.EnsureLoadBalancerDeleted(clusterName, apiService) - return nil, err - } - - waitLoadbalancerActiveProvisioningStatus(lbaas.network, loadbalancer.ID) - - for _, nodeName := range nodeNames { - addr, err := getAddressByName(lbaas.compute, types.NodeName(nodeName)) - if err != nil { - // cleanup what was created so far - _ = lbaas.EnsureLoadBalancerDeleted(clusterName, apiService) - return nil, err - } - - _, err = v2_pools.CreateAssociateMember(lbaas.network, pool.ID, v2_pools.MemberCreateOpts{ - ProtocolPort: int(port.NodePort), - Address: addr, - SubnetID: lbaas.opts.SubnetId, + listener := getListenerForPort(oldListeners, port) + if listener == nil { + glog.V(4).Infof("Creating listener for port %d", int(port.Port)) + listener, err = listeners.Create(lbaas.network, listeners.CreateOpts{ + Name: fmt.Sprintf("listener_%s_%d", name, portIndex), + Protocol: listeners.Protocol(port.Protocol), + ProtocolPort: int(port.Port), + LoadbalancerID: loadbalancer.ID, }).Extract() if err != nil { - // cleanup what was created so far - _ = lbaas.EnsureLoadBalancerDeleted(clusterName, apiService) - return nil, err + // Unknown error, retry later + return nil, fmt.Errorf("Error creating LB listener: %v", err) } - waitLoadbalancerActiveProvisioningStatus(lbaas.network, loadbalancer.ID) } - if lbaas.opts.CreateMonitor { - _, err = v2_monitors.Create(lbaas.network, v2_monitors.CreateOpts{ + glog.V(4).Infof("Listener for %s port %d: %s", string(port.Protocol), int(port.Port), listener.ID) + + // After all ports have been processed, remaining listeners are removed as obsolete. + // Pop valid listeners. + oldListeners = popListener(oldListeners, listener.ID) + pool, err := getPoolByListenerID(lbaas.network, loadbalancer.ID, listener.ID) + if err != nil && err != ErrNotFound { + // Unknown error, retry later + return nil, fmt.Errorf("Error getting pool for listener %s: %v", listener.ID, err) + } + if pool == nil { + glog.V(4).Infof("Creating pool for listener %s", listener.ID) + pool, err = v2_pools.Create(lbaas.network, v2_pools.CreateOpts{ + Name: fmt.Sprintf("pool_%s_%d", name, portIndex), + Protocol: v2_pools.Protocol(port.Protocol), + LBMethod: lbmethod, + ListenerID: listener.ID, + Persistence: persistence, + }).Extract() + if err != nil { + // Unknown error, retry later + return nil, fmt.Errorf("Error creating pool for listener %s: %v", listener.ID, err) + } + waitLoadbalancerActiveProvisioningStatus(lbaas.network, loadbalancer.ID) + } + + glog.V(4).Infof("Pool for listener %s: %s", listener.ID, pool.ID) + members, err := getMembersByPoolID(lbaas.network, pool.ID) + if err != nil && !isNotFound(err) { + return nil, fmt.Errorf("Error getting pool members %s: %v", pool.ID, err) + } + for _, nodeName := range nodeNames { + addr, err := getAddressByName(lbaas.compute, types.NodeName(nodeName)) + if err != nil { + if err == ErrNotFound { + // Node failure, do not create member + glog.Warningf("Failed to create LB pool member for node %s: %v", nodeName, err) + continue + } else { + return nil, fmt.Errorf("Error getting address for node %s: %v", nodeName, err) + } + } + + if !memberExists(members, addr) { + glog.V(4).Infof("Creating member for pool %s", pool.ID) + _, err := v2_pools.CreateAssociateMember(lbaas.network, pool.ID, v2_pools.MemberCreateOpts{ + ProtocolPort: int(port.NodePort), + Address: addr, + SubnetID: lbaas.opts.SubnetId, + }).Extract() + if err != nil { + return nil, fmt.Errorf("Error creating LB pool member for node: %s, %v", nodeName, err) + } + + waitLoadbalancerActiveProvisioningStatus(lbaas.network, loadbalancer.ID) + } + // After all members have been processed, remaining members are deleted as obsolete. + members = popMember(members, addr) + + glog.V(4).Infof("Ensured pool %s has member for %s at %s", pool.ID, nodeName, addr) + } + + // Delete obsolete members for this pool + for _, member := range members { + glog.V(4).Infof("Deleting obsolete member %s for pool %s address %s", member.ID, pool.ID, member.Address) + err := v2_pools.DeleteMember(lbaas.network, pool.ID, member.ID).ExtractErr() + if err != nil && !isNotFound(err) { + return nil, fmt.Errorf("Error deleting obsolete member %s for pool %s address %s: %v", member.ID, pool.ID, member.Address, err) + } + waitLoadbalancerActiveProvisioningStatus(lbaas.network, loadbalancer.ID) + } + + monitorID := pool.MonitorID + if monitorID == "" && lbaas.opts.CreateMonitor { + glog.V(4).Infof("Creating monitor for pool %s", pool.ID) + monitor, err := v2_monitors.Create(lbaas.network, v2_monitors.CreateOpts{ PoolID: pool.ID, Type: string(port.Protocol), Delay: int(lbaas.opts.MonitorDelay.Duration.Seconds()), @@ -442,40 +642,92 @@ func (lbaas *LbaasV2) EnsureLoadBalancer(clusterName string, apiService *api.Ser MaxRetries: int(lbaas.opts.MonitorMaxRetries), }).Extract() if err != nil { - // cleanup what was created so far - _ = lbaas.EnsureLoadBalancerDeleted(clusterName, apiService) - return nil, err + return nil, fmt.Errorf("Error creating LB pool healthmonitor: %v", err) + } + waitLoadbalancerActiveProvisioningStatus(lbaas.network, loadbalancer.ID) + monitorID = monitor.ID + } + + glog.V(4).Infof("Monitor for pool %s: %s", pool.ID, monitorID) + } + + // All remaining listeners are obsolete, delete + for _, listener := range oldListeners { + glog.V(4).Infof("Deleting obsolete listener %s:", listener.ID) + // get pool for listener + pool, err := getPoolByListenerID(lbaas.network, loadbalancer.ID, listener.ID) + if err != nil && err != ErrNotFound { + return nil, fmt.Errorf("Error getting pool for obsolete listener %s: %v", listener.ID, err) + } + if pool != nil { + // get and delete monitor + monitorID := pool.MonitorID + if monitorID != "" { + glog.V(4).Infof("Deleting obsolete monitor %s for pool %s", monitorID, pool.ID) + err = v2_monitors.Delete(lbaas.network, monitorID).ExtractErr() + if err != nil && !isNotFound(err) { + return nil, fmt.Errorf("Error deleting obsolete monitor %s for pool %s: %v", monitorID, pool.ID, err) + } + waitLoadbalancerActiveProvisioningStatus(lbaas.network, loadbalancer.ID) + } + // get and delete pool members + members, err := getMembersByPoolID(lbaas.network, pool.ID) + if err != nil && !isNotFound(err) { + return nil, fmt.Errorf("Error getting members for pool %s: %v", pool.ID, err) + } + if members != nil { + for _, member := range members { + glog.V(4).Infof("Deleting obsolete member %s for pool %s address %s", member.ID, pool.ID, member.Address) + err := v2_pools.DeleteMember(lbaas.network, pool.ID, member.ID).ExtractErr() + if err != nil && !isNotFound(err) { + return nil, fmt.Errorf("Error deleting obsolete member %s for pool %s address %s: %v", member.ID, pool.ID, member.Address, err) + } + waitLoadbalancerActiveProvisioningStatus(lbaas.network, loadbalancer.ID) + } + } + glog.V(4).Infof("Deleting obsolete pool %s for listener %s", pool.ID, listener.ID) + // delete pool + err = v2_pools.Delete(lbaas.network, pool.ID).ExtractErr() + if err != nil && !isNotFound(err) { + return nil, fmt.Errorf("Error deleting obsolete pool %s for listener %s: %v", pool.ID, listener.ID, err) } waitLoadbalancerActiveProvisioningStatus(lbaas.network, loadbalancer.ID) } + // delete listener + err = listeners.Delete(lbaas.network, listener.ID).ExtractErr() + if err != nil && !isNotFound(err) { + return nil, fmt.Errorf("Error deleteting obsolete listener: %v", err) + } + waitLoadbalancerActiveProvisioningStatus(lbaas.network, loadbalancer.ID) + glog.V(2).Infof("Deleted obsolete listener: %s", listener.ID) } status := &api.LoadBalancerStatus{} status.Ingress = []api.LoadBalancerIngress{{IP: loadbalancer.VipAddress}} - if lbaas.opts.FloatingNetworkId != "" { - portID, err := getPortIDByIP(lbaas.network, loadbalancer.VipAddress) - if err != nil { - // cleanup what was created so far - _ = lbaas.EnsureLoadBalancerDeleted(clusterName, apiService) - return nil, err - } - + portID, err := getPortIDByIP(lbaas.network, loadbalancer.VipAddress) + if err != nil { + return nil, fmt.Errorf("Error getting port for LB vip %s: %v", loadbalancer.VipAddress, err) + } + floatIP, err := getFloatingIPByPortID(lbaas.network, portID) + if err != nil && err != ErrNotFound { + return nil, fmt.Errorf("Error getting floating ip for port %s: %v", portID, err) + } + if floatIP == nil && lbaas.opts.FloatingNetworkId != "" { + glog.V(4).Infof("Creating floating ip for loadbalancer %s port %s", loadbalancer.ID, portID) floatIPOpts := floatingips.CreateOpts{ FloatingNetworkID: lbaas.opts.FloatingNetworkId, PortID: portID, } - floatIP, err := floatingips.Create(lbaas.network, floatIPOpts).Extract() + floatIP, err = floatingips.Create(lbaas.network, floatIPOpts).Extract() if err != nil { - // cleanup what was created so far - _ = lbaas.EnsureLoadBalancerDeleted(clusterName, apiService) - return nil, err + return nil, fmt.Errorf("Error creating LB floatingip %+v: %v", floatIPOpts, err) } - - status.Ingress = append(status.Ingress, api.LoadBalancerIngress{IP: floatIP.FloatingIP}) } + status.Ingress = append(status.Ingress, api.LoadBalancerIngress{IP: floatIP.FloatingIP}) + return status, nil }