Merge pull request #30649 from dagnello/openstack-lbaasv2-multiport

Automatic merge from submit-queue

fix Openstack provider to allow more than one service port for lbaas v2

This resolves bug #30477 where if a service defines multiple ports for load balancer, the plugin will fail with multiple ports are not supported.

@anguslees @jianhuiz
This commit is contained in:
Kubernetes Submit Queue 2016-08-23 22:36:09 -07:00 committed by GitHub
commit e427ab0baa

View File

@ -56,6 +56,8 @@ type LbaasV2 struct {
LoadBalancer
}
type empty struct{}
func networkExtensions(client *gophercloud.ServiceClient) (map[string]bool, error) {
seen := make(map[string]bool)
@ -238,22 +240,23 @@ func getLoadbalancerByName(client *gophercloud.ServiceClient, name string) (*loa
return &loadbalancerList[0], nil
}
func waitLoadbalancerActiveProvisioningStatus(client *gophercloud.ServiceClient, loadbalancerID string) error {
func waitLoadbalancerActiveProvisioningStatus(client *gophercloud.ServiceClient, loadbalancerID string) (string, error) {
start := time.Now().Second()
for {
loadbalancer, err := loadbalancers.Get(client, loadbalancerID).Extract()
if err != nil {
return err
return "", err
}
if loadbalancer.ProvisioningStatus == "ACTIVE" {
return nil
return "ACTIVE", nil
} else if loadbalancer.ProvisioningStatus == "ERROR" {
return "ERROR", fmt.Errorf("Loadbalancer has gone into ERROR state")
}
time.Sleep(1 * time.Second)
if time.Now().Second()-start >= loadbalancerActiveTimeoutSeconds {
return fmt.Errorf("Loadbalancer failed to go into ACTIVE provisioning status within alloted time")
return loadbalancer.ProvisioningStatus, fmt.Errorf("Loadbalancer failed to go into ACTIVE provisioning status within alloted time")
}
}
}
@ -304,16 +307,16 @@ func (lbaas *LbaasV2) EnsureLoadBalancer(clusterName string, apiService *api.Ser
glog.V(4).Infof("EnsureLoadBalancer(%v, %v, %v, %v, %v, %v, %v)", clusterName, apiService.Namespace, apiService.Name, apiService.Spec.LoadBalancerIP, apiService.Spec.Ports, hosts, apiService.Annotations)
ports := apiService.Spec.Ports
if len(ports) > 1 {
return nil, fmt.Errorf("multiple ports are not yet supported in openstack load balancers")
} else if len(ports) == 0 {
if len(ports) == 0 {
return nil, fmt.Errorf("no ports provided to openstack load balancer")
}
// The service controller verified all the protocols match on the ports, just check and use the first one
// Check for TCP protocol on each port
// TODO: Convert all error messages to use an event recorder
if ports[0].Protocol != api.ProtocolTCP {
return nil, fmt.Errorf("Only TCP LoadBalancer is supported for openstack load balancers")
for _, port := range ports {
if port.Protocol != api.ProtocolTCP {
return nil, fmt.Errorf("Only TCP LoadBalancer is supported for openstack load balancers")
}
}
affinity := api.ServiceAffinityNone //apiService.Spec.SessionAffinity
@ -377,68 +380,73 @@ func (lbaas *LbaasV2) EnsureLoadBalancer(clusterName string, apiService *api.Ser
waitLoadbalancerActiveProvisioningStatus(lbaas.network, loadbalancer.ID)
listener, err := listeners.Create(lbaas.network, listeners.CreateOpts{
Name: name,
Protocol: listeners.ProtocolTCP,
ProtocolPort: (int)(ports[0].Port), //TODO: need to handle multi-port
LoadbalancerID: loadbalancer.ID,
}).Extract()
if err != nil {
// cleanup what was created so far
_ = lbaas.EnsureLoadBalancerDeleted(clusterName, apiService)
return nil, err
}
waitLoadbalancerActiveProvisioningStatus(lbaas.network, loadbalancer.ID)
pool, err := v2_pools.Create(lbaas.network, v2_pools.CreateOpts{
Name: name,
Protocol: v2_pools.ProtocolTCP,
LBMethod: lbmethod,
ListenerID: listener.ID,
Persistence: persistence,
}).Extract()
if err != nil {
// cleanup what was created so far
_ = lbaas.EnsureLoadBalancerDeleted(clusterName, apiService)
return nil, err
}
for _, host := range hosts {
addr, err := getAddressByName(lbaas.compute, host)
if err != nil {
return nil, err
}
waitLoadbalancerActiveProvisioningStatus(lbaas.network, loadbalancer.ID)
_, err = v2_pools.CreateAssociateMember(lbaas.network, pool.ID, v2_pools.MemberCreateOpts{
ProtocolPort: int(ports[0].NodePort), //TODO: need to handle multi-port
Address: addr,
SubnetID: lbaas.opts.SubnetId,
for portIndex, port := range ports {
listener, err := listeners.Create(lbaas.network, listeners.CreateOpts{
Name: fmt.Sprintf("listener_%s_%d", name, portIndex),
Protocol: listeners.Protocol(port.Protocol),
ProtocolPort: int(port.Port),
LoadbalancerID: loadbalancer.ID,
}).Extract()
if err != nil {
// cleanup what was created so far
_ = lbaas.EnsureLoadBalancerDeleted(clusterName, apiService)
return nil, err
}
}
if lbaas.opts.CreateMonitor {
waitLoadbalancerActiveProvisioningStatus(lbaas.network, loadbalancer.ID)
_, err = v2_monitors.Create(lbaas.network, v2_monitors.CreateOpts{
PoolID: pool.ID,
Type: monitors.TypeTCP,
Delay: int(lbaas.opts.MonitorDelay.Duration.Seconds()),
Timeout: int(lbaas.opts.MonitorTimeout.Duration.Seconds()),
MaxRetries: int(lbaas.opts.MonitorMaxRetries),
pool, err := v2_pools.Create(lbaas.network, v2_pools.CreateOpts{
Name: fmt.Sprintf("pool_%s_%d", name, portIndex),
Protocol: v2_pools.Protocol(port.Protocol),
LBMethod: lbmethod,
ListenerID: listener.ID,
Persistence: persistence,
}).Extract()
if err != nil {
// cleanup what was created so far
_ = lbaas.EnsureLoadBalancerDeleted(clusterName, apiService)
return nil, err
}
waitLoadbalancerActiveProvisioningStatus(lbaas.network, loadbalancer.ID)
for _, host := range hosts {
addr, err := getAddressByName(lbaas.compute, host)
if err != nil {
// cleanup what was created so far
_ = lbaas.EnsureLoadBalancerDeleted(clusterName, apiService)
return nil, err
}
_, err = v2_pools.CreateAssociateMember(lbaas.network, pool.ID, v2_pools.MemberCreateOpts{
ProtocolPort: int(port.NodePort),
Address: addr,
SubnetID: lbaas.opts.SubnetId,
}).Extract()
if err != nil {
// cleanup what was created so far
_ = lbaas.EnsureLoadBalancerDeleted(clusterName, apiService)
return nil, err
}
waitLoadbalancerActiveProvisioningStatus(lbaas.network, loadbalancer.ID)
}
if lbaas.opts.CreateMonitor {
_, err = v2_monitors.Create(lbaas.network, v2_monitors.CreateOpts{
PoolID: pool.ID,
Type: string(port.Protocol),
Delay: int(lbaas.opts.MonitorDelay.Duration.Seconds()),
Timeout: int(lbaas.opts.MonitorTimeout.Duration.Seconds()),
MaxRetries: int(lbaas.opts.MonitorMaxRetries),
}).Extract()
if err != nil {
// cleanup what was created so far
_ = lbaas.EnsureLoadBalancerDeleted(clusterName, apiService)
return nil, err
}
waitLoadbalancerActiveProvisioningStatus(lbaas.network, loadbalancer.ID)
}
}
status := &api.LoadBalancerStatus{}
@ -475,9 +483,7 @@ func (lbaas *LbaasV2) UpdateLoadBalancer(clusterName string, service *api.Servic
glog.V(4).Infof("UpdateLoadBalancer(%v, %v, %v)", clusterName, loadBalancerName, hosts)
ports := service.Spec.Ports
if len(ports) > 1 {
return fmt.Errorf("multiple ports are not yet supported in openstack load balancers")
} else if len(ports) == 0 {
if len(ports) == 0 {
return fmt.Errorf("no ports provided to openstack load balancer")
}
@ -489,50 +495,37 @@ func (lbaas *LbaasV2) UpdateLoadBalancer(clusterName string, service *api.Servic
return fmt.Errorf("Loadbalancer %s does not exist", loadBalancerName)
}
// Set of member (addresses) that _should_ exist
addrs := map[string]bool{}
for _, host := range hosts {
addr, err := getAddressByName(lbaas.compute, host)
// Get all listeners for this loadbalancer, by "port key".
type portKey struct {
Protocol string
Port int
}
lbListeners := make(map[portKey]listeners.Listener)
err = listeners.List(lbaas.network, listeners.ListOpts{LoadbalancerID: loadbalancer.ID}).EachPage(func(page pagination.Page) (bool, error) {
listenersList, err := listeners.ExtractListeners(page)
if err != nil {
return err
return false, err
}
addrs[addr] = true
for _, l := range listenersList {
key := portKey{Protocol: l.Protocol, Port: l.ProtocolPort}
lbListeners[key] = l
}
return true, nil
})
if err != nil {
return err
}
// Iterate over members in each pool that _do_ exist
var poolID string
// Get all pools for this loadbalancer, by listener ID.
lbPools := make(map[string]v2_pools.Pool)
err = v2_pools.List(lbaas.network, v2_pools.ListOpts{LoadbalancerID: loadbalancer.ID}).EachPage(func(page pagination.Page) (bool, error) {
poolsList, err := v2_pools.ExtractPools(page)
if err != nil {
return false, err
}
for _, pool := range poolsList {
poolID = pool.ID
err := v2_pools.ListAssociateMembers(lbaas.network, poolID, v2_pools.MemberListOpts{}).EachPage(func(page pagination.Page) (bool, error) {
membersList, err := v2_pools.ExtractMembers(page)
if err != nil {
return false, err
}
for _, member := range membersList {
if _, found := addrs[member.Address]; found {
// Member already exists, remove from update list
delete(addrs, member.Address)
} else {
// Member needs to be deleted
err = v2_pools.DeleteMember(lbaas.network, poolID, member.ID).ExtractErr()
if err != nil {
return false, err
}
waitLoadbalancerActiveProvisioningStatus(lbaas.network, loadbalancer.ID)
}
}
return true, nil
})
if err != nil {
return false, err
for _, p := range poolsList {
for _, l := range p.Listeners {
lbPools[l.ID] = p
}
}
return true, nil
@ -541,19 +534,79 @@ func (lbaas *LbaasV2) UpdateLoadBalancer(clusterName string, service *api.Servic
return err
}
// Anything left in addrs is a new member that needs to be added to a pool
for addr := range addrs {
_, err := v2_pools.CreateAssociateMember(lbaas.network, poolID, v2_pools.MemberCreateOpts{
Address: addr,
ProtocolPort: int(ports[0].NodePort),
SubnetID: lbaas.opts.SubnetId,
}).Extract()
// Compose Set of member (addresses) that _should_ exist
addrs := map[string]empty{}
for _, host := range hosts {
addr, err := getAddressByName(lbaas.compute, host)
if err != nil {
return err
}
waitLoadbalancerActiveProvisioningStatus(lbaas.network, loadbalancer.ID)
addrs[addr] = empty{}
}
// Check for adding/removing members associated with each port
for _, port := range ports {
// Get listener associated with this port
listener, ok := lbListeners[portKey{
Protocol: string(port.Protocol),
Port: int(port.Port),
}]
if !ok {
return fmt.Errorf("Loadbalancer %s does not contain required listener for port %d and protocol %s", loadBalancerName, port.Port, port.Protocol)
}
// Get pool associated with this listener
pool, ok := lbPools[listener.ID]
if !ok {
return fmt.Errorf("Loadbalancer %s does not contain required pool for listener %s", loadBalancerName, listener.ID)
}
// Find existing pool members (by address) for this port
members := make(map[string]v2_pools.Member)
err := v2_pools.ListAssociateMembers(lbaas.network, pool.ID, v2_pools.MemberListOpts{}).EachPage(func(page pagination.Page) (bool, error) {
membersList, err := v2_pools.ExtractMembers(page)
if err != nil {
return false, err
}
for _, member := range membersList {
members[member.Address] = member
}
return true, nil
})
if err != nil {
return err
}
// Add any new members for this port
for addr := range addrs {
if _, ok := members[addr]; ok {
// Already exists, do not create member
continue
}
_, err := v2_pools.CreateAssociateMember(lbaas.network, pool.ID, v2_pools.MemberCreateOpts{
Address: addr,
ProtocolPort: int(port.NodePort),
SubnetID: lbaas.opts.SubnetId,
}).Extract()
if err != nil {
return err
}
waitLoadbalancerActiveProvisioningStatus(lbaas.network, loadbalancer.ID)
}
// Remove any old members for this port
for _, member := range members {
if _, ok := addrs[member.Address]; ok {
// Still present, do not delete member
continue
}
err = v2_pools.DeleteMember(lbaas.network, pool.ID, member.ID).ExtractErr()
if err != nil && !isNotFound(err) {
return err
}
waitLoadbalancerActiveProvisioningStatus(lbaas.network, loadbalancer.ID)
}
}
return nil
}
@ -735,7 +788,7 @@ func (lb *LbaasV1) EnsureLoadBalancer(clusterName string, apiService *api.Servic
ports := apiService.Spec.Ports
if len(ports) > 1 {
return nil, fmt.Errorf("multiple ports are not yet supported in openstack load balancers")
return nil, fmt.Errorf("multiple ports are not supported in openstack v1 load balancers")
} else if len(ports) == 0 {
return nil, fmt.Errorf("no ports provided to openstack load balancer")
}
@ -804,7 +857,7 @@ func (lb *LbaasV1) EnsureLoadBalancer(clusterName string, apiService *api.Servic
_, err = members.Create(lb.network, members.CreateOpts{
PoolID: pool.ID,
ProtocolPort: int(ports[0].NodePort), //TODO: need to handle multi-port
ProtocolPort: int(ports[0].NodePort), //Note: only handles single port
Address: addr,
}).Extract()
if err != nil {