mirror of
https://github.com/k3s-io/kubernetes.git
synced 2026-01-05 15:37:24 +00:00
Add LoadBalancer status to ServiceStatus
This will replace publicIPs
This commit is contained in:
@@ -63,10 +63,10 @@ func GetLoadBalancerName(service *api.Service) string {
|
||||
type TCPLoadBalancer interface {
|
||||
// TODO: Break this up into different interfaces (LB, etc) when we have more than one type of service
|
||||
// GetTCPLoadBalancer returns whether the specified load balancer exists, and
|
||||
// if so, what its IP address or hostname is.
|
||||
GetTCPLoadBalancer(name, region string) (endpoint string, exists bool, err error)
|
||||
// CreateTCPLoadBalancer creates a new tcp load balancer. Returns the IP address or hostname of the balancer
|
||||
CreateTCPLoadBalancer(name, region string, externalIP net.IP, ports []int, hosts []string, affinityType api.ServiceAffinity) (string, error)
|
||||
// if so, what its status is.
|
||||
GetTCPLoadBalancer(name, region string) (status *api.LoadBalancerStatus, exists bool, err error)
|
||||
// CreateTCPLoadBalancer creates a new tcp load balancer. Returns the status of the balancer
|
||||
CreateTCPLoadBalancer(name, region string, externalIP net.IP, ports []int, hosts []string, affinityType api.ServiceAffinity) (*api.LoadBalancerStatus, error)
|
||||
// UpdateTCPLoadBalancer updates hosts under the specified load balancer.
|
||||
UpdateTCPLoadBalancer(name, region string, hosts []string) error
|
||||
// EnsureTCPLoadBalancerDeleted deletes the specified load balancer if it
|
||||
|
||||
@@ -103,16 +103,23 @@ func (f *FakeCloud) Routes() (cloudprovider.Routes, bool) {
|
||||
}
|
||||
|
||||
// GetTCPLoadBalancer is a stub implementation of TCPLoadBalancer.GetTCPLoadBalancer.
|
||||
func (f *FakeCloud) GetTCPLoadBalancer(name, region string) (endpoint string, exists bool, err error) {
|
||||
return f.ExternalIP.String(), f.Exists, f.Err
|
||||
func (f *FakeCloud) GetTCPLoadBalancer(name, region string) (*api.LoadBalancerStatus, bool, error) {
|
||||
status := &api.LoadBalancerStatus{}
|
||||
status.Ingress = []api.LoadBalancerIngress{{IP: f.ExternalIP.String()}}
|
||||
|
||||
return status, f.Exists, f.Err
|
||||
}
|
||||
|
||||
// CreateTCPLoadBalancer is a test-spy implementation of TCPLoadBalancer.CreateTCPLoadBalancer.
|
||||
// It adds an entry "create" into the internal method call record.
|
||||
func (f *FakeCloud) CreateTCPLoadBalancer(name, region string, externalIP net.IP, ports []int, hosts []string, affinityType api.ServiceAffinity) (string, error) {
|
||||
func (f *FakeCloud) CreateTCPLoadBalancer(name, region string, externalIP net.IP, ports []int, hosts []string, affinityType api.ServiceAffinity) (*api.LoadBalancerStatus, error) {
|
||||
f.addCall("create")
|
||||
f.Balancers = append(f.Balancers, FakeBalancer{name, region, externalIP, ports, hosts})
|
||||
return f.ExternalIP.String(), f.Err
|
||||
|
||||
status := &api.LoadBalancerStatus{}
|
||||
status.Ingress = []api.LoadBalancerIngress{{IP: f.ExternalIP.String()}}
|
||||
|
||||
return status, f.Err
|
||||
}
|
||||
|
||||
// UpdateTCPLoadBalancer is a test-spy implementation of TCPLoadBalancer.UpdateTCPLoadBalancer.
|
||||
|
||||
@@ -282,15 +282,18 @@ func (gce *GCECloud) waitForZoneOp(op *compute.Operation) error {
|
||||
}
|
||||
|
||||
// GetTCPLoadBalancer is an implementation of TCPLoadBalancer.GetTCPLoadBalancer
|
||||
func (gce *GCECloud) GetTCPLoadBalancer(name, region string) (endpoint string, exists bool, err error) {
|
||||
fw, err := gce.service.ForwardingRules.Get(gce.projectID, region, name).Do()
|
||||
func (gce *GCECloud) GetTCPLoadBalancer(name, region string) (*api.LoadBalancerStatus, bool, error) {
|
||||
fwd, err := gce.service.ForwardingRules.Get(gce.projectID, region, name).Do()
|
||||
if err == nil {
|
||||
return fw.IPAddress, true, nil
|
||||
status := &api.LoadBalancerStatus{}
|
||||
status.Ingress = []api.LoadBalancerIngress{{IP: fwd.IPAddress}}
|
||||
|
||||
return status, true, nil
|
||||
}
|
||||
if isHTTPErrorCode(err, http.StatusNotFound) {
|
||||
return "", false, nil
|
||||
return nil, false, nil
|
||||
}
|
||||
return "", false, err
|
||||
return nil, false, err
|
||||
}
|
||||
|
||||
func isHTTPErrorCode(err error, code int) bool {
|
||||
@@ -314,17 +317,17 @@ func translateAffinityType(affinityType api.ServiceAffinity) GCEAffinityType {
|
||||
// CreateTCPLoadBalancer is an implementation of TCPLoadBalancer.CreateTCPLoadBalancer.
|
||||
// TODO(a-robinson): Don't just ignore specified IP addresses. Check if they're
|
||||
// owned by the project and available to be used, and use them if they are.
|
||||
func (gce *GCECloud) CreateTCPLoadBalancer(name, region string, externalIP net.IP, ports []int, hosts []string, affinityType api.ServiceAffinity) (string, error) {
|
||||
func (gce *GCECloud) CreateTCPLoadBalancer(name, region string, externalIP net.IP, ports []int, hosts []string, affinityType api.ServiceAffinity) (*api.LoadBalancerStatus, error) {
|
||||
err := gce.makeTargetPool(name, region, hosts, translateAffinityType(affinityType))
|
||||
if err != nil {
|
||||
if !isHTTPErrorCode(err, http.StatusConflict) {
|
||||
return "", err
|
||||
return nil, err
|
||||
}
|
||||
glog.Infof("Creating forwarding rule pointing at target pool that already exists: %v", err)
|
||||
}
|
||||
|
||||
if len(ports) == 0 {
|
||||
return "", fmt.Errorf("no ports specified for GCE load balancer")
|
||||
return nil, fmt.Errorf("no ports specified for GCE load balancer")
|
||||
}
|
||||
minPort := 65536
|
||||
maxPort := 0
|
||||
@@ -344,19 +347,22 @@ func (gce *GCECloud) CreateTCPLoadBalancer(name, region string, externalIP net.I
|
||||
}
|
||||
op, err := gce.service.ForwardingRules.Insert(gce.projectID, region, req).Do()
|
||||
if err != nil && !isHTTPErrorCode(err, http.StatusConflict) {
|
||||
return "", err
|
||||
return nil, err
|
||||
}
|
||||
if op != nil {
|
||||
err = gce.waitForRegionOp(op, region)
|
||||
if err != nil && !isHTTPErrorCode(err, http.StatusConflict) {
|
||||
return "", err
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
fwd, err := gce.service.ForwardingRules.Get(gce.projectID, region, name).Do()
|
||||
if err != nil {
|
||||
return "", err
|
||||
return nil, err
|
||||
}
|
||||
return fwd.IPAddress, nil
|
||||
|
||||
status := &api.LoadBalancerStatus{}
|
||||
status.Ingress = []api.LoadBalancerIngress{{IP: fwd.IPAddress}}
|
||||
return status, nil
|
||||
}
|
||||
|
||||
// UpdateTCPLoadBalancer is an implementation of TCPLoadBalancer.UpdateTCPLoadBalancer.
|
||||
|
||||
@@ -457,15 +457,19 @@ func getVipByName(client *gophercloud.ServiceClient, name string) (*vips.Virtual
|
||||
return &vipList[0], nil
|
||||
}
|
||||
|
||||
func (lb *LoadBalancer) GetTCPLoadBalancer(name, region string) (endpoint string, exists bool, err error) {
|
||||
func (lb *LoadBalancer) GetTCPLoadBalancer(name, region string) (*api.LoadBalancerStatus, bool, error) {
|
||||
vip, err := getVipByName(lb.network, name)
|
||||
if err == ErrNotFound {
|
||||
return "", false, nil
|
||||
return nil, false, nil
|
||||
}
|
||||
if vip == nil {
|
||||
return "", false, err
|
||||
return nil, false, err
|
||||
}
|
||||
return vip.Address, true, err
|
||||
|
||||
status := &api.LoadBalancerStatus{}
|
||||
status.Ingress = []api.LoadBalancerIngress{{IP: vip.Address}}
|
||||
|
||||
return status, true, err
|
||||
}
|
||||
|
||||
// TODO: This code currently ignores 'region' and always creates a
|
||||
@@ -473,11 +477,11 @@ func (lb *LoadBalancer) GetTCPLoadBalancer(name, region string) (endpoint string
|
||||
// a list of regions (from config) and query/create loadbalancers in
|
||||
// each region.
|
||||
|
||||
func (lb *LoadBalancer) CreateTCPLoadBalancer(name, region string, externalIP net.IP, ports []int, hosts []string, affinity api.ServiceAffinity) (string, error) {
|
||||
func (lb *LoadBalancer) CreateTCPLoadBalancer(name, region string, externalIP net.IP, ports []int, hosts []string, affinity api.ServiceAffinity) (*api.LoadBalancerStatus, error) {
|
||||
glog.V(4).Infof("CreateTCPLoadBalancer(%v, %v, %v, %v, %v, %v)", name, region, externalIP, ports, hosts, affinity)
|
||||
|
||||
if len(ports) > 1 {
|
||||
return "", fmt.Errorf("multiple ports are not yet supported in openstack load balancers")
|
||||
return nil, fmt.Errorf("multiple ports are not yet supported in openstack load balancers")
|
||||
}
|
||||
|
||||
var persistence *vips.SessionPersistence
|
||||
@@ -487,7 +491,7 @@ func (lb *LoadBalancer) CreateTCPLoadBalancer(name, region string, externalIP ne
|
||||
case api.ServiceAffinityClientIP:
|
||||
persistence = &vips.SessionPersistence{Type: "SOURCE_IP"}
|
||||
default:
|
||||
return "", fmt.Errorf("unsupported load balancer affinity: %v", affinity)
|
||||
return nil, fmt.Errorf("unsupported load balancer affinity: %v", affinity)
|
||||
}
|
||||
|
||||
lbmethod := lb.opts.LBMethod
|
||||
@@ -501,13 +505,13 @@ func (lb *LoadBalancer) CreateTCPLoadBalancer(name, region string, externalIP ne
|
||||
LBMethod: lbmethod,
|
||||
}).Extract()
|
||||
if err != nil {
|
||||
return "", err
|
||||
return nil, err
|
||||
}
|
||||
|
||||
for _, host := range hosts {
|
||||
addr, err := getAddressByName(lb.compute, host)
|
||||
if err != nil {
|
||||
return "", err
|
||||
return nil, err
|
||||
}
|
||||
|
||||
_, err = members.Create(lb.network, members.CreateOpts{
|
||||
@@ -517,7 +521,7 @@ func (lb *LoadBalancer) CreateTCPLoadBalancer(name, region string, externalIP ne
|
||||
}).Extract()
|
||||
if err != nil {
|
||||
pools.Delete(lb.network, pool.ID)
|
||||
return "", err
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
|
||||
@@ -531,14 +535,14 @@ func (lb *LoadBalancer) CreateTCPLoadBalancer(name, region string, externalIP ne
|
||||
}).Extract()
|
||||
if err != nil {
|
||||
pools.Delete(lb.network, pool.ID)
|
||||
return "", err
|
||||
return nil, err
|
||||
}
|
||||
|
||||
_, err = pools.AssociateMonitor(lb.network, pool.ID, mon.ID).Extract()
|
||||
if err != nil {
|
||||
monitors.Delete(lb.network, mon.ID)
|
||||
pools.Delete(lb.network, pool.ID)
|
||||
return "", err
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
|
||||
@@ -556,10 +560,13 @@ func (lb *LoadBalancer) CreateTCPLoadBalancer(name, region string, externalIP ne
|
||||
monitors.Delete(lb.network, mon.ID)
|
||||
}
|
||||
pools.Delete(lb.network, pool.ID)
|
||||
return "", err
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return vip.Address, nil
|
||||
status := &api.LoadBalancerStatus{}
|
||||
status.Ingress = []api.LoadBalancerIngress{{IP: vip.Address}}
|
||||
|
||||
return status, nil
|
||||
}
|
||||
|
||||
func (lb *LoadBalancer) UpdateTCPLoadBalancer(name, region string, hosts []string) error {
|
||||
|
||||
@@ -240,20 +240,16 @@ func (s *ServiceController) createLoadBalancerIfNeeded(namespacedName types.Name
|
||||
} else {
|
||||
// If we don't have any cached memory of the load balancer, we have to ask
|
||||
// the cloud provider for what it knows about it.
|
||||
endpoint, exists, err := s.balancer.GetTCPLoadBalancer(s.loadBalancerName(service), s.zone.Region)
|
||||
status, exists, err := s.balancer.GetTCPLoadBalancer(s.loadBalancerName(service), s.zone.Region)
|
||||
if err != nil {
|
||||
return fmt.Errorf("Error getting LB for service %s", namespacedName), retryable
|
||||
}
|
||||
if exists && stringSlicesEqual(service.Spec.PublicIPs, []string{endpoint}) {
|
||||
// TODO: If we could read more of the spec (ports, affinityType) of the
|
||||
// existing load balancer, we could better determine if an update is
|
||||
// necessary in more cases. For now, we optimistically assume that a
|
||||
// matching IP suffices.
|
||||
glog.Infof("LB already exists with endpoint %s for previously uncached service %s", endpoint, namespacedName)
|
||||
if exists && api.LoadBalancerStatusEqual(status, &service.Status.LoadBalancer) {
|
||||
glog.Infof("LB already exists with status %s for previously uncached service %s", status, namespacedName)
|
||||
return nil, notRetryable
|
||||
} else if exists {
|
||||
glog.Infof("Deleting old LB for previously uncached service %s whose endpoint %s doesn't match the service's desired IPs %v",
|
||||
namespacedName, endpoint, service.Spec.PublicIPs)
|
||||
namespacedName, status, service.Spec.PublicIPs)
|
||||
if err := s.balancer.EnsureTCPLoadBalancerDeleted(s.loadBalancerName(service), s.zone.Region); err != nil {
|
||||
return err, retryable
|
||||
}
|
||||
@@ -268,20 +264,24 @@ func (s *ServiceController) createLoadBalancerIfNeeded(namespacedName types.Name
|
||||
glog.V(2).Infof("Creating LB for service %s", namespacedName)
|
||||
|
||||
// The load balancer doesn't exist yet, so create it.
|
||||
publicIPstring := fmt.Sprint(service.Spec.PublicIPs)
|
||||
|
||||
// Save the state so we can avoid a write if it doesn't change
|
||||
previousState := api.LoadBalancerStatusDeepCopy(&service.Status.LoadBalancer)
|
||||
|
||||
err := s.createExternalLoadBalancer(service)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to create external load balancer for service %s: %v", namespacedName, err), retryable
|
||||
}
|
||||
|
||||
if publicIPstring == fmt.Sprint(service.Spec.PublicIPs) {
|
||||
// Write the state if changed
|
||||
if api.LoadBalancerStatusEqual(previousState, &service.Status.LoadBalancer) {
|
||||
glog.Infof("Not persisting unchanged service to registry.")
|
||||
return nil, notRetryable
|
||||
}
|
||||
|
||||
// If creating the load balancer succeeded, persist the updated service.
|
||||
if err = s.persistUpdate(service); err != nil {
|
||||
return fmt.Errorf("Failed to persist updated publicIPs to apiserver, even after retries. Giving up: %v", err), notRetryable
|
||||
return fmt.Errorf("Failed to persist updated status to apiserver, even after retries. Giving up: %v", err), notRetryable
|
||||
}
|
||||
return nil, notRetryable
|
||||
}
|
||||
@@ -301,13 +301,13 @@ func (s *ServiceController) persistUpdate(service *api.Service) error {
|
||||
return nil
|
||||
}
|
||||
// TODO: Try to resolve the conflict if the change was unrelated to load
|
||||
// balancers and public IPs. For now, just rely on the fact that we'll
|
||||
// balancer status. For now, just rely on the fact that we'll
|
||||
// also process the update that caused the resource version to change.
|
||||
if errors.IsConflict(err) {
|
||||
glog.Infof("Not persisting update to service that has been changed since we received it: %v", err)
|
||||
return nil
|
||||
}
|
||||
glog.Warningf("Failed to persist updated PublicIPs to service %s after creating its external load balancer: %v",
|
||||
glog.Warningf("Failed to persist updated LoadBalancerStatus to service %s after creating its external load balancer: %v",
|
||||
service.Name, err)
|
||||
time.Sleep(clientRetryInterval)
|
||||
}
|
||||
@@ -328,21 +328,23 @@ func (s *ServiceController) createExternalLoadBalancer(service *api.Service) err
|
||||
for _, publicIP := range service.Spec.PublicIPs {
|
||||
// TODO: Make this actually work for multiple IPs by using different
|
||||
// names for each. For now, we'll just create the first and break.
|
||||
endpoint, err := s.balancer.CreateTCPLoadBalancer(name, s.zone.Region, net.ParseIP(publicIP),
|
||||
status, err := s.balancer.CreateTCPLoadBalancer(name, s.zone.Region, net.ParseIP(publicIP),
|
||||
ports, hostsFromNodeList(nodes), service.Spec.SessionAffinity)
|
||||
if err != nil {
|
||||
return err
|
||||
} else {
|
||||
service.Status.LoadBalancer = *status
|
||||
}
|
||||
service.Spec.PublicIPs = []string{endpoint}
|
||||
break
|
||||
}
|
||||
} else {
|
||||
endpoint, err := s.balancer.CreateTCPLoadBalancer(name, s.zone.Region, nil,
|
||||
status, err := s.balancer.CreateTCPLoadBalancer(name, s.zone.Region, nil,
|
||||
ports, hostsFromNodeList(nodes), service.Spec.SessionAffinity)
|
||||
if err != nil {
|
||||
return err
|
||||
} else {
|
||||
service.Status.LoadBalancer = *status
|
||||
}
|
||||
service.Spec.PublicIPs = []string{endpoint}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user