diff --git a/pkg/cloudprovider/providers/openstack/openstack.go b/pkg/cloudprovider/providers/openstack/openstack.go index 30c79055059..6863336c6f6 100644 --- a/pkg/cloudprovider/providers/openstack/openstack.go +++ b/pkg/cloudprovider/providers/openstack/openstack.go @@ -28,24 +28,19 @@ import ( "strings" "time" + "gopkg.in/gcfg.v1" + "github.com/rackspace/gophercloud" "github.com/rackspace/gophercloud/openstack" "github.com/rackspace/gophercloud/openstack/blockstorage/v1/volumes" "github.com/rackspace/gophercloud/openstack/compute/v2/extensions/volumeattach" "github.com/rackspace/gophercloud/openstack/compute/v2/flavors" "github.com/rackspace/gophercloud/openstack/compute/v2/servers" - "github.com/rackspace/gophercloud/openstack/networking/v2/extensions/layer3/floatingips" - "github.com/rackspace/gophercloud/openstack/networking/v2/extensions/lbaas/members" - "github.com/rackspace/gophercloud/openstack/networking/v2/extensions/lbaas/monitors" - "github.com/rackspace/gophercloud/openstack/networking/v2/extensions/lbaas/pools" - "github.com/rackspace/gophercloud/openstack/networking/v2/extensions/lbaas/vips" "github.com/rackspace/gophercloud/pagination" - "gopkg.in/gcfg.v1" "github.com/golang/glog" "k8s.io/kubernetes/pkg/api" "k8s.io/kubernetes/pkg/api/resource" - "k8s.io/kubernetes/pkg/api/service" "k8s.io/kubernetes/pkg/cloudprovider" ) @@ -81,8 +76,15 @@ func (d *MyDuration) UnmarshalText(text []byte) error { return nil } +type LoadBalancer struct { + network *gophercloud.ServiceClient + compute *gophercloud.ServiceClient + opts LoadBalancerOpts +} + type LoadBalancerOpts struct { - SubnetId string `gcfg:"subnet-id"` // required + LBVersion string `gcfg:"lb-version"` // v1 or v2 + SubnetId string `gcfg:"subnet-id"` // required FloatingNetworkId string `gcfg:"floating-network-id"` LBMethod string `gcfg:"lb-method"` CreateMonitor bool `gcfg:"create-monitor"` @@ -504,12 +506,6 @@ func (os *OpenStack) ScrubDNS(nameservers, searches []string) (nsOut, srchOut [] return nameservers, searches } -type LoadBalancer struct { - network *gophercloud.ServiceClient - compute *gophercloud.ServiceClient - opts LoadBalancerOpts -} - func (os *OpenStack) LoadBalancer() (cloudprovider.LoadBalancer, bool) { glog.V(4).Info("openstack.LoadBalancer() called") @@ -532,7 +528,12 @@ func (os *OpenStack) LoadBalancer() (cloudprovider.LoadBalancer, bool) { glog.V(1).Info("Claiming to support LoadBalancer") - return &LoadBalancer{network, compute, os.lbOpts}, true + if os.lbOpts.LBVersion == "v2" { + return &LbaasV2{LoadBalancer{network, compute, os.lbOpts}}, true + } else { + + return &LbaasV1{LoadBalancer{network, compute, os.lbOpts}}, true + } } func isNotFound(err error) bool { @@ -540,412 +541,6 @@ func isNotFound(err error) bool { return ok && e.Actual == http.StatusNotFound } -func getPoolByName(client *gophercloud.ServiceClient, name string) (*pools.Pool, error) { - opts := pools.ListOpts{ - Name: name, - } - pager := pools.List(client, opts) - - poolList := make([]pools.Pool, 0, 1) - - err := pager.EachPage(func(page pagination.Page) (bool, error) { - p, err := pools.ExtractPools(page) - if err != nil { - return false, err - } - poolList = append(poolList, p...) - if len(poolList) > 1 { - return false, ErrMultipleResults - } - return true, nil - }) - if err != nil { - if isNotFound(err) { - return nil, ErrNotFound - } - return nil, err - } - - if len(poolList) == 0 { - return nil, ErrNotFound - } else if len(poolList) > 1 { - return nil, ErrMultipleResults - } - - return &poolList[0], nil -} - -func getVipByName(client *gophercloud.ServiceClient, name string) (*vips.VirtualIP, error) { - opts := vips.ListOpts{ - Name: name, - } - pager := vips.List(client, opts) - - vipList := make([]vips.VirtualIP, 0, 1) - - err := pager.EachPage(func(page pagination.Page) (bool, error) { - v, err := vips.ExtractVIPs(page) - if err != nil { - return false, err - } - vipList = append(vipList, v...) - if len(vipList) > 1 { - return false, ErrMultipleResults - } - return true, nil - }) - if err != nil { - if isNotFound(err) { - return nil, ErrNotFound - } - return nil, err - } - - if len(vipList) == 0 { - return nil, ErrNotFound - } else if len(vipList) > 1 { - return nil, ErrMultipleResults - } - - return &vipList[0], nil -} - -func getFloatingIPByPortID(client *gophercloud.ServiceClient, portID string) (*floatingips.FloatingIP, error) { - opts := floatingips.ListOpts{ - PortID: portID, - } - pager := floatingips.List(client, opts) - - floatingIPList := make([]floatingips.FloatingIP, 0, 1) - - err := pager.EachPage(func(page pagination.Page) (bool, error) { - f, err := floatingips.ExtractFloatingIPs(page) - if err != nil { - return false, err - } - floatingIPList = append(floatingIPList, f...) - if len(floatingIPList) > 1 { - return false, ErrMultipleResults - } - return true, nil - }) - if err != nil { - if isNotFound(err) { - return nil, ErrNotFound - } - return nil, err - } - - if len(floatingIPList) == 0 { - return nil, ErrNotFound - } else if len(floatingIPList) > 1 { - return nil, ErrMultipleResults - } - - return &floatingIPList[0], nil -} - -func (lb *LoadBalancer) GetLoadBalancer(service *api.Service) (*api.LoadBalancerStatus, bool, error) { - loadBalancerName := cloudprovider.GetLoadBalancerName(service) - vip, err := getVipByName(lb.network, loadBalancerName) - if err == ErrNotFound { - return nil, false, nil - } - if vip == nil { - return nil, false, err - } - - status := &api.LoadBalancerStatus{} - status.Ingress = []api.LoadBalancerIngress{{IP: vip.Address}} - - return status, true, err -} - -// TODO: This code currently ignores 'region' and always creates a -// loadbalancer in only the current OpenStack region. We should take -// a list of regions (from config) and query/create loadbalancers in -// each region. - -func (lb *LoadBalancer) EnsureLoadBalancer(apiService *api.Service, hosts []string) (*api.LoadBalancerStatus, error) { - glog.V(4).Infof("EnsureLoadBalancer(%v, %v, %v, %v, %v, %v)", apiService.Namespace, apiService.Name, apiService.Spec.LoadBalancerIP, apiService.Spec.Ports, hosts, apiService.Annotations) - - ports := apiService.Spec.Ports - if len(ports) > 1 { - return nil, fmt.Errorf("multiple ports are not yet supported in openstack load balancers") - } else if len(ports) == 0 { - return nil, fmt.Errorf("no ports provided to openstack load balancer") - } - - // The service controller verified all the protocols match on the ports, just check and use the first one - // TODO: Convert all error messages to use an event recorder - if ports[0].Protocol != api.ProtocolTCP { - return nil, fmt.Errorf("Only TCP LoadBalancer is supported for openstack load balancers") - } - - affinity := apiService.Spec.SessionAffinity - var persistence *vips.SessionPersistence - switch affinity { - case api.ServiceAffinityNone: - persistence = nil - case api.ServiceAffinityClientIP: - persistence = &vips.SessionPersistence{Type: "SOURCE_IP"} - default: - return nil, fmt.Errorf("unsupported load balancer affinity: %v", affinity) - } - - sourceRanges, err := service.GetLoadBalancerSourceRanges(apiService) - if err != nil { - return nil, err - } - - if !service.IsAllowAll(sourceRanges) { - return nil, fmt.Errorf("Source range restrictions are not supported for openstack load balancers") - } - - glog.V(2).Infof("Checking if openstack load balancer already exists: %s", cloudprovider.GetLoadBalancerName(apiService)) - _, exists, err := lb.GetLoadBalancer(apiService) - if err != nil { - return nil, fmt.Errorf("error checking if openstack load balancer already exists: %v", err) - } - - // TODO: Implement a more efficient update strategy for common changes than delete & create - // In particular, if we implement hosts update, we can get rid of UpdateHosts - if exists { - err := lb.EnsureLoadBalancerDeleted(apiService) - if err != nil { - return nil, fmt.Errorf("error deleting existing openstack load balancer: %v", err) - } - } - - lbmethod := lb.opts.LBMethod - if lbmethod == "" { - lbmethod = pools.LBMethodRoundRobin - } - name := cloudprovider.GetLoadBalancerName(apiService) - pool, err := pools.Create(lb.network, pools.CreateOpts{ - Name: name, - Protocol: pools.ProtocolTCP, - SubnetID: lb.opts.SubnetId, - LBMethod: lbmethod, - }).Extract() - if err != nil { - return nil, err - } - - for _, host := range hosts { - addr, err := getAddressByName(lb.compute, host) - if err != nil { - return nil, err - } - - _, err = members.Create(lb.network, members.CreateOpts{ - PoolID: pool.ID, - ProtocolPort: int(ports[0].NodePort), //TODO: need to handle multi-port - Address: addr, - }).Extract() - if err != nil { - pools.Delete(lb.network, pool.ID) - return nil, err - } - } - - var mon *monitors.Monitor - if lb.opts.CreateMonitor { - mon, err = monitors.Create(lb.network, monitors.CreateOpts{ - Type: monitors.TypeTCP, - Delay: int(lb.opts.MonitorDelay.Duration.Seconds()), - Timeout: int(lb.opts.MonitorTimeout.Duration.Seconds()), - MaxRetries: int(lb.opts.MonitorMaxRetries), - }).Extract() - if err != nil { - pools.Delete(lb.network, pool.ID) - return nil, err - } - - _, err = pools.AssociateMonitor(lb.network, pool.ID, mon.ID).Extract() - if err != nil { - monitors.Delete(lb.network, mon.ID) - pools.Delete(lb.network, pool.ID) - return nil, err - } - } - - createOpts := vips.CreateOpts{ - Name: name, - Description: fmt.Sprintf("Kubernetes external service %s", name), - Protocol: "TCP", - ProtocolPort: int(ports[0].Port), //TODO: need to handle multi-port - PoolID: pool.ID, - SubnetID: lb.opts.SubnetId, - Persistence: persistence, - } - - loadBalancerIP := apiService.Spec.LoadBalancerIP - if loadBalancerIP != "" { - createOpts.Address = loadBalancerIP - } - - vip, err := vips.Create(lb.network, createOpts).Extract() - if err != nil { - if mon != nil { - monitors.Delete(lb.network, mon.ID) - } - pools.Delete(lb.network, pool.ID) - return nil, err - } - - status := &api.LoadBalancerStatus{} - - status.Ingress = []api.LoadBalancerIngress{{IP: vip.Address}} - - if lb.opts.FloatingNetworkId != "" { - floatIPOpts := floatingips.CreateOpts{ - FloatingNetworkID: lb.opts.FloatingNetworkId, - PortID: vip.PortID, - } - floatIP, err := floatingips.Create(lb.network, floatIPOpts).Extract() - if err != nil { - return nil, err - } - - status.Ingress = append(status.Ingress, api.LoadBalancerIngress{IP: floatIP.FloatingIP}) - } - - return status, nil - -} - -func (lb *LoadBalancer) UpdateLoadBalancer(service *api.Service, hosts []string) error { - loadBalancerName := cloudprovider.GetLoadBalancerName(service) - glog.V(4).Infof("UpdateLoadBalancer(%v, %v)", loadBalancerName, hosts) - - vip, err := getVipByName(lb.network, loadBalancerName) - if err != nil { - return err - } - - // Set of member (addresses) that _should_ exist - addrs := map[string]bool{} - for _, host := range hosts { - addr, err := getAddressByName(lb.compute, host) - if err != nil { - return err - } - - addrs[addr] = true - } - - // Iterate over members that _do_ exist - pager := members.List(lb.network, members.ListOpts{PoolID: vip.PoolID}) - err = pager.EachPage(func(page pagination.Page) (bool, error) { - memList, err := members.ExtractMembers(page) - if err != nil { - return false, err - } - - for _, member := range memList { - if _, found := addrs[member.Address]; found { - // Member already exists - delete(addrs, member.Address) - } else { - // Member needs to be deleted - err = members.Delete(lb.network, member.ID).ExtractErr() - if err != nil { - return false, err - } - } - } - - return true, nil - }) - if err != nil { - return err - } - - // Anything left in addrs is a new member that needs to be added - for addr := range addrs { - _, err := members.Create(lb.network, members.CreateOpts{ - PoolID: vip.PoolID, - Address: addr, - ProtocolPort: vip.ProtocolPort, - }).Extract() - if err != nil { - return err - } - } - - return nil -} - -func (lb *LoadBalancer) EnsureLoadBalancerDeleted(service *api.Service) error { - loadBalancerName := cloudprovider.GetLoadBalancerName(service) - glog.V(4).Infof("EnsureLoadBalancerDeleted(%v)", loadBalancerName) - - vip, err := getVipByName(lb.network, loadBalancerName) - if err != nil && err != ErrNotFound { - return err - } - - if lb.opts.FloatingNetworkId != "" && vip != nil { - floatingIP, err := getFloatingIPByPortID(lb.network, vip.PortID) - if err != nil && !isNotFound(err) { - return err - } - if floatingIP != nil { - err = floatingips.Delete(lb.network, floatingIP.ID).ExtractErr() - if err != nil && !isNotFound(err) { - return err - } - } - } - - // We have to delete the VIP before the pool can be deleted, - // so no point continuing if this fails. - if vip != nil { - err := vips.Delete(lb.network, vip.ID).ExtractErr() - if err != nil && !isNotFound(err) { - return err - } - } - - var pool *pools.Pool - if vip != nil { - pool, err = pools.Get(lb.network, vip.PoolID).Extract() - if err != nil && !isNotFound(err) { - return err - } - } else { - // The VIP is gone, but it is conceivable that a Pool - // still exists that we failed to delete on some - // previous occasion. Make a best effort attempt to - // cleanup any pools with the same name as the VIP. - pool, err = getPoolByName(lb.network, service.Name) - if err != nil && err != ErrNotFound { - return err - } - } - - if pool != nil { - for _, monId := range pool.MonitorIDs { - _, err = pools.DisassociateMonitor(lb.network, pool.ID, monId).Extract() - if err != nil { - return err - } - - err = monitors.Delete(lb.network, monId).ExtractErr() - if err != nil && !isNotFound(err) { - return err - } - } - err = pools.Delete(lb.network, pool.ID).ExtractErr() - if err != nil && !isNotFound(err) { - return err - } - } - - return nil -} - func (os *OpenStack) Zones() (cloudprovider.Zones, bool) { glog.V(1).Info("Claiming to support Zones") diff --git a/pkg/cloudprovider/providers/openstack/openstack_loadbalancer.go b/pkg/cloudprovider/providers/openstack/openstack_loadbalancer.go new file mode 100644 index 00000000000..1cbb44e6f27 --- /dev/null +++ b/pkg/cloudprovider/providers/openstack/openstack_loadbalancer.go @@ -0,0 +1,993 @@ +/* +Copyright 2016 The Kubernetes Authors All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package openstack + +import ( + "time" + + "github.com/rackspace/gophercloud" + "github.com/rackspace/gophercloud/openstack/networking/v2/extensions/layer3/floatingips" + "github.com/rackspace/gophercloud/openstack/networking/v2/extensions/lbaas/members" + "github.com/rackspace/gophercloud/openstack/networking/v2/extensions/lbaas/monitors" + "github.com/rackspace/gophercloud/openstack/networking/v2/extensions/lbaas/pools" + "github.com/rackspace/gophercloud/openstack/networking/v2/extensions/lbaas/vips" + "github.com/rackspace/gophercloud/openstack/networking/v2/extensions/lbaas_v2/listeners" + "github.com/rackspace/gophercloud/openstack/networking/v2/extensions/lbaas_v2/loadbalancers" + v2_monitors "github.com/rackspace/gophercloud/openstack/networking/v2/extensions/lbaas_v2/monitors" + v2_pools "github.com/rackspace/gophercloud/openstack/networking/v2/extensions/lbaas_v2/pools" + neutron_ports "github.com/rackspace/gophercloud/openstack/networking/v2/ports" + "github.com/rackspace/gophercloud/pagination" + + "fmt" + + "github.com/golang/glog" + "k8s.io/kubernetes/pkg/api" + "k8s.io/kubernetes/pkg/api/service" + "k8s.io/kubernetes/pkg/cloudprovider" +) + +// Note: when creating a new Loadbalancer (VM), it can take some time before it is ready for use, +// this timeout is used for waiting until the Loadbalancer provisioning status goes to ACTIVE state. +const loadbalancerActiveTimeoutSeconds = 120 +const loadbalancerDeleteTimeoutSeconds = 30 + +// LoadBalancer implementation for LBaaS v1 +type LbaasV1 struct { + LoadBalancer +} + +// LoadBalancer implementation for LBaaS v2 +type LbaasV2 struct { + LoadBalancer +} + +func getPortIDByIP(client *gophercloud.ServiceClient, ipAddress string) (string, error) { + var portID string + + err := neutron_ports.List(client, neutron_ports.ListOpts{}).EachPage(func(page pagination.Page) (bool, error) { + portList, err := neutron_ports.ExtractPorts(page) + if err != nil { + return false, err + } + + for _, port := range portList { + for _, ip := range port.FixedIPs { + if ip.IPAddress == ipAddress { + portID = port.ID + return false, nil + } + } + } + + return true, nil + }) + + return portID, err +} + +func getFloatingIPByPortID(client *gophercloud.ServiceClient, portID string) (*floatingips.FloatingIP, error) { + opts := floatingips.ListOpts{ + PortID: portID, + } + pager := floatingips.List(client, opts) + + floatingIPList := make([]floatingips.FloatingIP, 0, 1) + + err := pager.EachPage(func(page pagination.Page) (bool, error) { + f, err := floatingips.ExtractFloatingIPs(page) + if err != nil { + return false, err + } + floatingIPList = append(floatingIPList, f...) + if len(floatingIPList) > 1 { + return false, ErrMultipleResults + } + return true, nil + }) + if err != nil { + if isNotFound(err) { + return nil, ErrNotFound + } + return nil, err + } + + if len(floatingIPList) == 0 { + return nil, ErrNotFound + } else if len(floatingIPList) > 1 { + return nil, ErrMultipleResults + } + + return &floatingIPList[0], nil +} + +func getPoolByName(client *gophercloud.ServiceClient, name string) (*pools.Pool, error) { + opts := pools.ListOpts{ + Name: name, + } + pager := pools.List(client, opts) + + poolList := make([]pools.Pool, 0, 1) + + err := pager.EachPage(func(page pagination.Page) (bool, error) { + p, err := pools.ExtractPools(page) + if err != nil { + return false, err + } + poolList = append(poolList, p...) + if len(poolList) > 1 { + return false, ErrMultipleResults + } + return true, nil + }) + if err != nil { + if isNotFound(err) { + return nil, ErrNotFound + } + return nil, err + } + + if len(poolList) == 0 { + return nil, ErrNotFound + } else if len(poolList) > 1 { + return nil, ErrMultipleResults + } + + return &poolList[0], nil +} + +func getVipByName(client *gophercloud.ServiceClient, name string) (*vips.VirtualIP, error) { + opts := vips.ListOpts{ + Name: name, + } + pager := vips.List(client, opts) + + vipList := make([]vips.VirtualIP, 0, 1) + + err := pager.EachPage(func(page pagination.Page) (bool, error) { + v, err := vips.ExtractVIPs(page) + if err != nil { + return false, err + } + vipList = append(vipList, v...) + if len(vipList) > 1 { + return false, ErrMultipleResults + } + return true, nil + }) + if err != nil { + if isNotFound(err) { + return nil, ErrNotFound + } + return nil, err + } + + if len(vipList) == 0 { + return nil, ErrNotFound + } else if len(vipList) > 1 { + return nil, ErrMultipleResults + } + + return &vipList[0], nil +} + +func getLoadbalancerByName(client *gophercloud.ServiceClient, name string) (*loadbalancers.LoadBalancer, error) { + opts := loadbalancers.ListOpts{ + Name: name, + } + pager := loadbalancers.List(client, opts) + + loadbalancerList := make([]loadbalancers.LoadBalancer, 0, 1) + + err := pager.EachPage(func(page pagination.Page) (bool, error) { + v, err := loadbalancers.ExtractLoadbalancers(page) + if err != nil { + return false, err + } + loadbalancerList = append(loadbalancerList, v...) + if len(loadbalancerList) > 1 { + return false, ErrMultipleResults + } + return true, nil + }) + if err != nil { + if isNotFound(err) { + return nil, ErrNotFound + } + return nil, err + } + + if len(loadbalancerList) == 0 { + return nil, ErrNotFound + } else if len(loadbalancerList) > 1 { + return nil, ErrMultipleResults + } + + return &loadbalancerList[0], nil +} + +func waitLoadbalancerActiveProvisioningStatus(client *gophercloud.ServiceClient, loadbalancerID string) error { + start := time.Now().Second() + for { + loadbalancer, err := loadbalancers.Get(client, loadbalancerID).Extract() + if err != nil { + return err + } + if loadbalancer.ProvisioningStatus == "ACTIVE" { + return nil + } + + time.Sleep(1 * time.Second) + + if time.Now().Second()-start >= loadbalancerActiveTimeoutSeconds { + return fmt.Errorf("Loadbalancer failed to go into ACTIVE provisioning status within alloted time") + + } + } +} + +func waitLoadbalancerDeleted(client *gophercloud.ServiceClient, loadbalancerID string) error { + start := time.Now().Second() + for { + _, err := loadbalancers.Get(client, loadbalancerID).Extract() + if err != nil { + if err == ErrNotFound { + return nil + } else { + return err + } + } + + time.Sleep(1 * time.Second) + + if time.Now().Second()-start >= loadbalancerDeleteTimeoutSeconds { + return fmt.Errorf("Loadbalancer failed to delete within the alloted time") + } + + } +} + +func (lbaas *LbaasV2) GetLoadBalancer(service *api.Service) (*api.LoadBalancerStatus, bool, error) { + loadBalancerName := cloudprovider.GetLoadBalancerName(service) + loadbalancer, err := getLoadbalancerByName(lbaas.network, loadBalancerName) + if err == ErrNotFound { + return nil, false, nil + } + if loadbalancer == nil { + return nil, false, err + } + + status := &api.LoadBalancerStatus{} + status.Ingress = []api.LoadBalancerIngress{{IP: loadbalancer.VipAddress}} + + return status, true, err +} + +// TODO: This code currently ignores 'region' and always creates a +// loadbalancer in only the current OpenStack region. We should take +// a list of regions (from config) and query/create loadbalancers in +// each region. + +func (lbaas *LbaasV2) EnsureLoadBalancer(apiService *api.Service, hosts []string) (*api.LoadBalancerStatus, error) { + glog.V(4).Infof("EnsureLoadBalancer(%v, %v, %v, %v, %v, %v)", apiService.Namespace, apiService.Name, apiService.Spec.LoadBalancerIP, apiService.Spec.Ports, hosts, apiService.Annotations) + + ports := apiService.Spec.Ports + if len(ports) > 1 { + return nil, fmt.Errorf("multiple ports are not yet supported in openstack load balancers") + } else if len(ports) == 0 { + return nil, fmt.Errorf("no ports provided to openstack load balancer") + } + + // The service controller verified all the protocols match on the ports, just check and use the first one + // TODO: Convert all error messages to use an event recorder + if ports[0].Protocol != api.ProtocolTCP { + return nil, fmt.Errorf("Only TCP LoadBalancer is supported for openstack load balancers") + } + + affinity := api.ServiceAffinityNone //apiService.Spec.SessionAffinity + var persistence *v2_pools.SessionPersistence + switch affinity { + case api.ServiceAffinityNone: + persistence = nil + case api.ServiceAffinityClientIP: + persistence = &v2_pools.SessionPersistence{Type: "SOURCE_IP"} + default: + return nil, fmt.Errorf("unsupported load balancer affinity: %v", affinity) + } + + sourceRanges, err := service.GetLoadBalancerSourceRanges(apiService) + if err != nil { + return nil, err + } + + if !service.IsAllowAll(sourceRanges) { + return nil, fmt.Errorf("Source range restrictions are not supported for openstack load balancers") + } + + glog.V(2).Infof("Checking if openstack load balancer already exists: %s", cloudprovider.GetLoadBalancerName(apiService)) + _, exists, err := lbaas.GetLoadBalancer(apiService) + if err != nil { + return nil, fmt.Errorf("error checking if openstack load balancer already exists: %v", err) + } + + // TODO: Implement a more efficient update strategy for common changes than delete & create + // In particular, if we implement hosts update, we can get rid of UpdateHosts + if exists { + err := lbaas.EnsureLoadBalancerDeleted(apiService) + if err != nil { + return nil, fmt.Errorf("error deleting existing openstack load balancer: %v", err) + } + } + + lbmethod := v2_pools.LBMethod(lbaas.opts.LBMethod) + if lbmethod == "" { + lbmethod = v2_pools.LBMethodRoundRobin + } + name := cloudprovider.GetLoadBalancerName(apiService) + + createOpts := loadbalancers.CreateOpts{ + Name: name, + Description: fmt.Sprintf("Kubernetes external service %s", name), + VipSubnetID: lbaas.opts.SubnetId, + } + + loadBalancerIP := apiService.Spec.LoadBalancerIP + if loadBalancerIP != "" { + createOpts.VipAddress = loadBalancerIP + } + + loadbalancer, err := loadbalancers.Create(lbaas.network, createOpts).Extract() + if err != nil { + // cleanup what was created so far + _ = lbaas.EnsureLoadBalancerDeleted(apiService) + return nil, err + } + + waitLoadbalancerActiveProvisioningStatus(lbaas.network, loadbalancer.ID) + + listener, err := listeners.Create(lbaas.network, listeners.CreateOpts{ + Name: name, + Protocol: listeners.ProtocolTCP, + ProtocolPort: (int)(ports[0].Port), //TODO: need to handle multi-port + LoadbalancerID: loadbalancer.ID, + }).Extract() + if err != nil { + // cleanup what was created so far + _ = lbaas.EnsureLoadBalancerDeleted(apiService) + return nil, err + } + + waitLoadbalancerActiveProvisioningStatus(lbaas.network, loadbalancer.ID) + + pool, err := v2_pools.Create(lbaas.network, v2_pools.CreateOpts{ + Name: name, + Protocol: v2_pools.ProtocolTCP, + LBMethod: lbmethod, + ListenerID: listener.ID, + Persistence: persistence, + }).Extract() + if err != nil { + // cleanup what was created so far + _ = lbaas.EnsureLoadBalancerDeleted(apiService) + return nil, err + } + + for _, host := range hosts { + addr, err := getAddressByName(lbaas.compute, host) + if err != nil { + return nil, err + } + + waitLoadbalancerActiveProvisioningStatus(lbaas.network, loadbalancer.ID) + + _, err = v2_pools.CreateAssociateMember(lbaas.network, pool.ID, v2_pools.MemberCreateOpts{ + Name: name, + ProtocolPort: int(ports[0].NodePort), //TODO: need to handle multi-port + Address: addr, + SubnetID: lbaas.opts.SubnetId, + }).Extract() + if err != nil { + // cleanup what was created so far + _ = lbaas.EnsureLoadBalancerDeleted(apiService) + return nil, err + } + } + + if lbaas.opts.CreateMonitor { + waitLoadbalancerActiveProvisioningStatus(lbaas.network, loadbalancer.ID) + + _, err = v2_monitors.Create(lbaas.network, v2_monitors.CreateOpts{ + PoolID: pool.ID, + Type: monitors.TypeTCP, + Delay: int(lbaas.opts.MonitorDelay.Duration.Seconds()), + Timeout: int(lbaas.opts.MonitorTimeout.Duration.Seconds()), + MaxRetries: int(lbaas.opts.MonitorMaxRetries), + }).Extract() + if err != nil { + // cleanup what was created so far + _ = lbaas.EnsureLoadBalancerDeleted(apiService) + return nil, err + } + } + + status := &api.LoadBalancerStatus{} + + status.Ingress = []api.LoadBalancerIngress{{IP: loadbalancer.VipAddress}} + + if lbaas.opts.FloatingNetworkId != "" { + portID, err := getPortIDByIP(lbaas.network, loadbalancer.VipAddress) + if err != nil { + // cleanup what was created so far + _ = lbaas.EnsureLoadBalancerDeleted(apiService) + return nil, err + } + + floatIPOpts := floatingips.CreateOpts{ + FloatingNetworkID: lbaas.opts.FloatingNetworkId, + PortID: portID, + } + floatIP, err := floatingips.Create(lbaas.network, floatIPOpts).Extract() + if err != nil { + // cleanup what was created so far + _ = lbaas.EnsureLoadBalancerDeleted(apiService) + return nil, err + } + + status.Ingress = append(status.Ingress, api.LoadBalancerIngress{IP: floatIP.FloatingIP}) + } + + return status, nil +} + +func (lbaas *LbaasV2) UpdateLoadBalancer(service *api.Service, hosts []string) error { + loadBalancerName := cloudprovider.GetLoadBalancerName(service) + glog.V(4).Infof("UpdateLoadBalancer(%v, %v)", loadBalancerName, hosts) + + ports := service.Spec.Ports + if len(ports) > 1 { + return fmt.Errorf("multiple ports are not yet supported in openstack load balancers") + } else if len(ports) == 0 { + return fmt.Errorf("no ports provided to openstack load balancer") + } + + loadbalancer, err := getLoadbalancerByName(lbaas.network, loadBalancerName) + if err != nil { + return err + } + if loadbalancer == nil { + return fmt.Errorf("Loadbalancer %s does not exist", loadBalancerName) + } + + // Set of member (addresses) that _should_ exist + addrs := map[string]bool{} + for _, host := range hosts { + addr, err := getAddressByName(lbaas.compute, host) + if err != nil { + return err + } + addrs[addr] = true + } + + // Iterate over members in each pool that _do_ exist + var poolID string + err = v2_pools.List(lbaas.network, v2_pools.ListOpts{LoadbalancerID: loadbalancer.ID}).EachPage(func(page pagination.Page) (bool, error) { + poolsList, err := v2_pools.ExtractPools(page) + if err != nil { + return false, err + } + + for _, pool := range poolsList { + poolID = pool.ID + err := v2_pools.ListAssociateMembers(lbaas.network, poolID, v2_pools.MemberListOpts{}).EachPage(func(page pagination.Page) (bool, error) { + membersList, err := v2_pools.ExtractMembers(page) + if err != nil { + return false, err + } + + for _, member := range membersList { + if _, found := addrs[member.Address]; found { + // Member already exists, remove from update list + delete(addrs, member.Address) + } else { + // Member needs to be deleted + err = v2_pools.DeleteMember(lbaas.network, poolID, member.ID).ExtractErr() + if err != nil { + return false, err + } + waitLoadbalancerActiveProvisioningStatus(lbaas.network, loadbalancer.ID) + } + } + + return true, nil + }) + if err != nil { + return false, err + } + } + return true, nil + }) + if err != nil { + return err + } + + // Anything left in addrs is a new member that needs to be added to a pool + for addr := range addrs { + _, err := v2_pools.CreateAssociateMember(lbaas.network, poolID, v2_pools.MemberCreateOpts{ + Address: addr, + ProtocolPort: int(ports[0].NodePort), + SubnetID: lbaas.opts.SubnetId, + }).Extract() + if err != nil { + return err + } + waitLoadbalancerActiveProvisioningStatus(lbaas.network, loadbalancer.ID) + } + + return nil +} + +func (lbaas *LbaasV2) EnsureLoadBalancerDeleted(service *api.Service) error { + loadBalancerName := cloudprovider.GetLoadBalancerName(service) + glog.V(4).Infof("EnsureLoadBalancerDeleted(%v)", loadBalancerName) + + loadbalancer, err := getLoadbalancerByName(lbaas.network, loadBalancerName) + if err != nil && err != ErrNotFound { + return err + } + if loadbalancer == nil { + return nil + } + + if lbaas.opts.FloatingNetworkId != "" && loadbalancer != nil { + portID, err := getPortIDByIP(lbaas.network, loadbalancer.VipAddress) + if err != nil { + return err + } + + floatingIP, err := getFloatingIPByPortID(lbaas.network, portID) + if err != nil && err != ErrNotFound { + return err + } + if floatingIP != nil { + err = floatingips.Delete(lbaas.network, floatingIP.ID).ExtractErr() + if err != nil && !isNotFound(err) { + return err + } + } + } + + // get all listeners associated with this loadbalancer + var listenerIDs []string + err = listeners.List(lbaas.network, listeners.ListOpts{LoadbalancerID: loadbalancer.ID}).EachPage(func(page pagination.Page) (bool, error) { + listenerList, err := listeners.ExtractListeners(page) + if err != nil { + return false, err + } + + for _, listener := range listenerList { + listenerIDs = append(listenerIDs, listener.ID) + } + + return true, nil + }) + if err != nil { + return err + } + + // get all pools associated with this loadbalancer + var poolIDs []string + err = v2_pools.List(lbaas.network, v2_pools.ListOpts{LoadbalancerID: loadbalancer.ID}).EachPage(func(page pagination.Page) (bool, error) { + poolsList, err := v2_pools.ExtractPools(page) + if err != nil { + return false, err + } + + for _, pool := range poolsList { + poolIDs = append(poolIDs, pool.ID) + } + + return true, nil + }) + if err != nil { + return err + } + + // get all members associated with each poolIDs + var memberIDs []string + for _, poolID := range poolIDs { + err := v2_pools.ListAssociateMembers(lbaas.network, poolID, v2_pools.MemberListOpts{}).EachPage(func(page pagination.Page) (bool, error) { + membersList, err := v2_pools.ExtractMembers(page) + if err != nil { + return false, err + } + + for _, member := range membersList { + memberIDs = append(memberIDs, member.ID) + } + + return true, nil + }) + if err != nil { + return err + } + } + + // get all monitors associated with each poolIDs + var monitorIDs []string + for _, poolID := range poolIDs { + err = v2_monitors.List(lbaas.network, v2_monitors.ListOpts{PoolID: poolID}).EachPage(func(page pagination.Page) (bool, error) { + monitorsList, err := v2_monitors.ExtractMonitors(page) + if err != nil { + return false, err + } + + for _, monitor := range monitorsList { + monitorIDs = append(monitorIDs, monitor.ID) + } + + return true, nil + }) + if err != nil { + return err + } + } + + // delete all monitors + for _, monitorID := range monitorIDs { + err := v2_monitors.Delete(lbaas.network, monitorID).ExtractErr() + if err != nil && !isNotFound(err) { + return err + } + waitLoadbalancerActiveProvisioningStatus(lbaas.network, loadbalancer.ID) + } + + // delete all members and pools + for _, poolID := range poolIDs { + // delete all members for this pool + for _, memberID := range memberIDs { + err := v2_pools.DeleteMember(lbaas.network, poolID, memberID).ExtractErr() + if err != nil && !isNotFound(err) { + return err + } + waitLoadbalancerActiveProvisioningStatus(lbaas.network, loadbalancer.ID) + } + + // delete pool + err := v2_pools.Delete(lbaas.network, poolID).ExtractErr() + if err != nil && !isNotFound(err) { + return err + } + waitLoadbalancerActiveProvisioningStatus(lbaas.network, loadbalancer.ID) + } + + // delete all listeners + for _, listenerID := range listenerIDs { + err := listeners.Delete(lbaas.network, listenerID).ExtractErr() + if err != nil && !isNotFound(err) { + return err + } + waitLoadbalancerActiveProvisioningStatus(lbaas.network, loadbalancer.ID) + } + + // delete loadbalancer + err = loadbalancers.Delete(lbaas.network, loadbalancer.ID).ExtractErr() + if err != nil && !isNotFound(err) { + return err + } + waitLoadbalancerDeleted(lbaas.network, loadbalancer.ID) + return nil +} + +func (lb *LbaasV1) GetLoadBalancer(service *api.Service) (*api.LoadBalancerStatus, bool, error) { + loadBalancerName := cloudprovider.GetLoadBalancerName(service) + vip, err := getVipByName(lb.network, loadBalancerName) + if err == ErrNotFound { + return nil, false, nil + } + if vip == nil { + return nil, false, err + } + + status := &api.LoadBalancerStatus{} + status.Ingress = []api.LoadBalancerIngress{{IP: vip.Address}} + + return status, true, err +} + +// TODO: This code currently ignores 'region' and always creates a +// loadbalancer in only the current OpenStack region. We should take +// a list of regions (from config) and query/create loadbalancers in +// each region. + +func (lb *LbaasV1) EnsureLoadBalancer(apiService *api.Service, hosts []string) (*api.LoadBalancerStatus, error) { + glog.V(4).Infof("EnsureLoadBalancer(%v, %v, %v, %v, %v, %v)", apiService.Namespace, apiService.Name, apiService.Spec.LoadBalancerIP, apiService.Spec.Ports, hosts, apiService.Annotations) + + ports := apiService.Spec.Ports + if len(ports) > 1 { + return nil, fmt.Errorf("multiple ports are not yet supported in openstack load balancers") + } else if len(ports) == 0 { + return nil, fmt.Errorf("no ports provided to openstack load balancer") + } + + // The service controller verified all the protocols match on the ports, just check and use the first one + // TODO: Convert all error messages to use an event recorder + if ports[0].Protocol != api.ProtocolTCP { + return nil, fmt.Errorf("Only TCP LoadBalancer is supported for openstack load balancers") + } + + affinity := apiService.Spec.SessionAffinity + var persistence *vips.SessionPersistence + switch affinity { + case api.ServiceAffinityNone: + persistence = nil + case api.ServiceAffinityClientIP: + persistence = &vips.SessionPersistence{Type: "SOURCE_IP"} + default: + return nil, fmt.Errorf("unsupported load balancer affinity: %v", affinity) + } + + sourceRanges, err := service.GetLoadBalancerSourceRanges(apiService) + if err != nil { + return nil, err + } + + if !service.IsAllowAll(sourceRanges) { + return nil, fmt.Errorf("Source range restrictions are not supported for openstack load balancers") + } + + glog.V(2).Infof("Checking if openstack load balancer already exists: %s", cloudprovider.GetLoadBalancerName(apiService)) + _, exists, err := lb.GetLoadBalancer(apiService) + if err != nil { + return nil, fmt.Errorf("error checking if openstack load balancer already exists: %v", err) + } + + // TODO: Implement a more efficient update strategy for common changes than delete & create + // In particular, if we implement hosts update, we can get rid of UpdateHosts + if exists { + err := lb.EnsureLoadBalancerDeleted(apiService) + if err != nil { + return nil, fmt.Errorf("error deleting existing openstack load balancer: %v", err) + } + } + + lbmethod := lb.opts.LBMethod + if lbmethod == "" { + lbmethod = pools.LBMethodRoundRobin + } + name := cloudprovider.GetLoadBalancerName(apiService) + pool, err := pools.Create(lb.network, pools.CreateOpts{ + Name: name, + Protocol: pools.ProtocolTCP, + SubnetID: lb.opts.SubnetId, + LBMethod: lbmethod, + }).Extract() + if err != nil { + return nil, err + } + + for _, host := range hosts { + addr, err := getAddressByName(lb.compute, host) + if err != nil { + return nil, err + } + + _, err = members.Create(lb.network, members.CreateOpts{ + PoolID: pool.ID, + ProtocolPort: int(ports[0].NodePort), //TODO: need to handle multi-port + Address: addr, + }).Extract() + if err != nil { + pools.Delete(lb.network, pool.ID) + return nil, err + } + } + + var mon *monitors.Monitor + if lb.opts.CreateMonitor { + mon, err = monitors.Create(lb.network, monitors.CreateOpts{ + Type: monitors.TypeTCP, + Delay: int(lb.opts.MonitorDelay.Duration.Seconds()), + Timeout: int(lb.opts.MonitorTimeout.Duration.Seconds()), + MaxRetries: int(lb.opts.MonitorMaxRetries), + }).Extract() + if err != nil { + pools.Delete(lb.network, pool.ID) + return nil, err + } + + _, err = pools.AssociateMonitor(lb.network, pool.ID, mon.ID).Extract() + if err != nil { + monitors.Delete(lb.network, mon.ID) + pools.Delete(lb.network, pool.ID) + return nil, err + } + } + + createOpts := vips.CreateOpts{ + Name: name, + Description: fmt.Sprintf("Kubernetes external service %s", name), + Protocol: "TCP", + ProtocolPort: int(ports[0].Port), //TODO: need to handle multi-port + PoolID: pool.ID, + SubnetID: lb.opts.SubnetId, + Persistence: persistence, + } + + loadBalancerIP := apiService.Spec.LoadBalancerIP + if loadBalancerIP != "" { + createOpts.Address = loadBalancerIP + } + + vip, err := vips.Create(lb.network, createOpts).Extract() + if err != nil { + if mon != nil { + monitors.Delete(lb.network, mon.ID) + } + pools.Delete(lb.network, pool.ID) + return nil, err + } + + status := &api.LoadBalancerStatus{} + + status.Ingress = []api.LoadBalancerIngress{{IP: vip.Address}} + + if lb.opts.FloatingNetworkId != "" { + floatIPOpts := floatingips.CreateOpts{ + FloatingNetworkID: lb.opts.FloatingNetworkId, + PortID: vip.PortID, + } + floatIP, err := floatingips.Create(lb.network, floatIPOpts).Extract() + if err != nil { + return nil, err + } + + status.Ingress = append(status.Ingress, api.LoadBalancerIngress{IP: floatIP.FloatingIP}) + } + + return status, nil + +} + +func (lb *LbaasV1) UpdateLoadBalancer(service *api.Service, hosts []string) error { + loadBalancerName := cloudprovider.GetLoadBalancerName(service) + glog.V(4).Infof("UpdateLoadBalancer(%v, %v)", loadBalancerName, hosts) + + vip, err := getVipByName(lb.network, loadBalancerName) + if err != nil { + return err + } + + // Set of member (addresses) that _should_ exist + addrs := map[string]bool{} + for _, host := range hosts { + addr, err := getAddressByName(lb.compute, host) + if err != nil { + return err + } + + addrs[addr] = true + } + + // Iterate over members that _do_ exist + pager := members.List(lb.network, members.ListOpts{PoolID: vip.PoolID}) + err = pager.EachPage(func(page pagination.Page) (bool, error) { + memList, err := members.ExtractMembers(page) + if err != nil { + return false, err + } + + for _, member := range memList { + if _, found := addrs[member.Address]; found { + // Member already exists + delete(addrs, member.Address) + } else { + // Member needs to be deleted + err = members.Delete(lb.network, member.ID).ExtractErr() + if err != nil { + return false, err + } + } + } + + return true, nil + }) + if err != nil { + return err + } + + // Anything left in addrs is a new member that needs to be added + for addr := range addrs { + _, err := members.Create(lb.network, members.CreateOpts{ + PoolID: vip.PoolID, + Address: addr, + ProtocolPort: vip.ProtocolPort, + }).Extract() + if err != nil { + return err + } + } + + return nil +} + +func (lb *LbaasV1) EnsureLoadBalancerDeleted(service *api.Service) error { + loadBalancerName := cloudprovider.GetLoadBalancerName(service) + glog.V(4).Infof("EnsureLoadBalancerDeleted(%v)", loadBalancerName) + + vip, err := getVipByName(lb.network, loadBalancerName) + if err != nil && err != ErrNotFound { + return err + } + + if lb.opts.FloatingNetworkId != "" && vip != nil { + floatingIP, err := getFloatingIPByPortID(lb.network, vip.PortID) + if err != nil && !isNotFound(err) { + return err + } + if floatingIP != nil { + err = floatingips.Delete(lb.network, floatingIP.ID).ExtractErr() + if err != nil && !isNotFound(err) { + return err + } + } + } + + // We have to delete the VIP before the pool can be deleted, + // so no point continuing if this fails. + if vip != nil { + err := vips.Delete(lb.network, vip.ID).ExtractErr() + if err != nil && !isNotFound(err) { + return err + } + } + + var pool *pools.Pool + if vip != nil { + pool, err = pools.Get(lb.network, vip.PoolID).Extract() + if err != nil && !isNotFound(err) { + return err + } + } else { + // The VIP is gone, but it is conceivable that a Pool + // still exists that we failed to delete on some + // previous occasion. Make a best effort attempt to + // cleanup any pools with the same name as the VIP. + pool, err = getPoolByName(lb.network, service.Name) + if err != nil && err != ErrNotFound { + return err + } + } + + if pool != nil { + for _, monId := range pool.MonitorIDs { + _, err = pools.DisassociateMonitor(lb.network, pool.ID, monId).Extract() + if err != nil { + return err + } + + err = monitors.Delete(lb.network, monId).ExtractErr() + if err != nil && !isNotFound(err) { + return err + } + } + err = pools.Delete(lb.network, pool.ID).ExtractErr() + if err != nil && !isNotFound(err) { + return err + } + } + + return nil +} diff --git a/pkg/cloudprovider/providers/openstack/openstack_test.go b/pkg/cloudprovider/providers/openstack/openstack_test.go index 06f618e4546..1093348be9f 100644 --- a/pkg/cloudprovider/providers/openstack/openstack_test.go +++ b/pkg/cloudprovider/providers/openstack/openstack_test.go @@ -67,15 +67,15 @@ func TestReadConfig(t *testing.T) { } cfg, err := readConfig(strings.NewReader(` -[Global] -auth-url = http://auth.url -username = user -[LoadBalancer] -create-monitor = yes -monitor-delay = 1m -monitor-timeout = 30s -monitor-max-retries = 3 -`)) + [Global] + auth-url = http://auth.url + username = user + [LoadBalancer] + create-monitor = yes + monitor-delay = 1m + monitor-timeout = 30s + monitor-max-retries = 3 + `)) if err != nil { t.Fatalf("Should succeed when a valid config is provided: %s", err) } @@ -204,6 +204,8 @@ func TestLoadBalancer(t *testing.T) { t.Skipf("No config found in environment") } + cfg.LoadBalancer.LBVersion = "v2" + os, err := newOpenStack(cfg) if err != nil { t.Fatalf("Failed to construct/authenticate OpenStack: %s", err) @@ -223,6 +225,32 @@ func TestLoadBalancer(t *testing.T) { } } +func TestLoadBalancerV2(t *testing.T) { + cfg, ok := configFromEnv() + if !ok { + t.Skipf("No config found in environment") + } + cfg.LoadBalancer.LBVersion = "v2" + + os, err := newOpenStack(cfg) + if err != nil { + t.Fatalf("Failed to construct/authenticate OpenStack: %s", err) + } + + lbaas, ok := os.LoadBalancer() + if !ok { + t.Fatalf("LoadBalancer() returned false - perhaps your stack doesn't support Neutron?") + } + + _, exists, err := lbaas.GetLoadBalancer(&api.Service{ObjectMeta: api.ObjectMeta{Name: "noexist"}}) + if err != nil { + t.Fatalf("GetLoadBalancer(\"noexist\") returned error: %s", err) + } + if exists { + t.Fatalf("GetLoadBalancer(\"noexist\") returned exists") + } +} + func TestZones(t *testing.T) { os := OpenStack{ provider: &gophercloud.ProviderClient{