mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-07-24 04:06:03 +00:00
Pass namespaced service name to cloudprovider's EnsureLoadBalancer
Also has an AWS implementation that plugs the service name into the ELB and SG. Log the service name under GCE and OpenStack. Fixes #20668
This commit is contained in:
parent
242000d790
commit
5874b0cb9d
@ -23,6 +23,7 @@ import (
|
||||
"strings"
|
||||
|
||||
"k8s.io/kubernetes/pkg/api"
|
||||
"k8s.io/kubernetes/pkg/types"
|
||||
)
|
||||
|
||||
// Interface is an abstract, pluggable interface for cloud providers.
|
||||
@ -82,8 +83,8 @@ type LoadBalancer interface {
|
||||
// GetLoadBalancer returns whether the specified load balancer exists, and
|
||||
// if so, what its status is.
|
||||
GetLoadBalancer(name, region string) (status *api.LoadBalancerStatus, exists bool, err error)
|
||||
// EnsureLoadBalancer creates a new load balancer, or updates an existing one. Returns the status of the balancer
|
||||
EnsureLoadBalancer(name, region string, loadBalancerIP net.IP, ports []*api.ServicePort, hosts []string, affinityType api.ServiceAffinity) (*api.LoadBalancerStatus, error)
|
||||
// EnsureLoadBalancer creates a new load balancer 'name', or updates the existing one. Returns the status of the balancer
|
||||
EnsureLoadBalancer(name, region string, loadBalancerIP net.IP, ports []*api.ServicePort, hosts []string, serviceName types.NamespacedName, affinityType api.ServiceAffinity) (*api.LoadBalancerStatus, error)
|
||||
// UpdateLoadBalancer updates hosts under the specified load balancer.
|
||||
UpdateLoadBalancer(name, region string, hosts []string) error
|
||||
// EnsureLoadBalancerDeleted deletes the specified load balancer if it
|
||||
|
@ -43,6 +43,7 @@ import (
|
||||
"k8s.io/kubernetes/pkg/api"
|
||||
"k8s.io/kubernetes/pkg/cloudprovider"
|
||||
"k8s.io/kubernetes/pkg/credentialprovider/aws"
|
||||
"k8s.io/kubernetes/pkg/types"
|
||||
"k8s.io/kubernetes/pkg/util/sets"
|
||||
|
||||
"github.com/golang/glog"
|
||||
@ -54,6 +55,9 @@ const ProviderName = "aws"
|
||||
// The tag name we use to differentiate multiple logically independent clusters running in the same AZ
|
||||
const TagNameKubernetesCluster = "KubernetesCluster"
|
||||
|
||||
// The tag name we use to differentiate multiple services. Used currently for ELBs only.
|
||||
const TagNameKubernetesService = "kubernetes.io/service-name"
|
||||
|
||||
// We sometimes read to see if something exists; then try to create it if we didn't find it
|
||||
// This can fail once in a consistent system if done in parallel
|
||||
// In an eventually consistent system, it could fail unboundedly
|
||||
@ -1731,8 +1735,8 @@ func (s *AWSCloud) listSubnetIDsinVPC(vpcId string) ([]string, error) {
|
||||
|
||||
// EnsureLoadBalancer implements LoadBalancer.EnsureLoadBalancer
|
||||
// TODO(justinsb) It is weird that these take a region. I suspect it won't work cross-region anyway.
|
||||
func (s *AWSCloud) EnsureLoadBalancer(name, region string, publicIP net.IP, ports []*api.ServicePort, hosts []string, affinity api.ServiceAffinity) (*api.LoadBalancerStatus, error) {
|
||||
glog.V(2).Infof("EnsureLoadBalancer(%v, %v, %v, %v, %v)", name, region, publicIP, ports, hosts)
|
||||
func (s *AWSCloud) EnsureLoadBalancer(name, region string, publicIP net.IP, ports []*api.ServicePort, hosts []string, serviceName types.NamespacedName, affinity api.ServiceAffinity) (*api.LoadBalancerStatus, error) {
|
||||
glog.V(2).Infof("EnsureLoadBalancer(%v, %v, %v, %v, %v, %v)", name, region, publicIP, ports, hosts, serviceName)
|
||||
|
||||
if region != s.region {
|
||||
return nil, fmt.Errorf("requested load balancer region '%s' does not match cluster region '%s'", region, s.region)
|
||||
@ -1779,7 +1783,7 @@ func (s *AWSCloud) EnsureLoadBalancer(name, region string, publicIP net.IP, port
|
||||
var securityGroupID string
|
||||
{
|
||||
sgName := "k8s-elb-" + name
|
||||
sgDescription := "Security group for Kubernetes ELB " + name
|
||||
sgDescription := fmt.Sprintf("Security group for Kubernetes ELB %s (%v)", name, serviceName)
|
||||
securityGroupID, err = s.ensureSecurityGroup(sgName, sgDescription, vpcId)
|
||||
if err != nil {
|
||||
glog.Error("Error creating load balancer security group: ", err)
|
||||
@ -1828,7 +1832,7 @@ func (s *AWSCloud) EnsureLoadBalancer(name, region string, publicIP net.IP, port
|
||||
}
|
||||
|
||||
// Build the load balancer itself
|
||||
loadBalancer, err := s.ensureLoadBalancer(name, listeners, subnetIDs, securityGroupIDs)
|
||||
loadBalancer, err := s.ensureLoadBalancer(serviceName, name, listeners, subnetIDs, securityGroupIDs)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@ -1850,7 +1854,7 @@ func (s *AWSCloud) EnsureLoadBalancer(name, region string, publicIP net.IP, port
|
||||
return nil, err
|
||||
}
|
||||
|
||||
glog.V(1).Infof("Loadbalancer %s has DNS name %s", name, orEmpty(loadBalancer.DNSName))
|
||||
glog.V(1).Infof("Loadbalancer %s (%v) has DNS name %s", name, serviceName, orEmpty(loadBalancer.DNSName))
|
||||
|
||||
// TODO: Wait for creation?
|
||||
|
||||
@ -2285,3 +2289,8 @@ func (s *AWSCloud) addFilters(filters []*ec2.Filter) []*ec2.Filter {
|
||||
}
|
||||
return filters
|
||||
}
|
||||
|
||||
// Returns the cluster name or an empty string
|
||||
func (s *AWSCloud) getClusterName() string {
|
||||
return s.filterTags[TagNameKubernetesCluster]
|
||||
}
|
||||
|
@ -24,10 +24,11 @@ import (
|
||||
"github.com/aws/aws-sdk-go/service/ec2"
|
||||
"github.com/aws/aws-sdk-go/service/elb"
|
||||
"github.com/golang/glog"
|
||||
"k8s.io/kubernetes/pkg/types"
|
||||
"k8s.io/kubernetes/pkg/util/sets"
|
||||
)
|
||||
|
||||
func (s *AWSCloud) ensureLoadBalancer(name string, listeners []*elb.Listener, subnetIDs []string, securityGroupIDs []string) (*elb.LoadBalancerDescription, error) {
|
||||
func (s *AWSCloud) ensureLoadBalancer(namespacedName types.NamespacedName, name string, listeners []*elb.Listener, subnetIDs []string, securityGroupIDs []string) (*elb.LoadBalancerDescription, error) {
|
||||
loadBalancer, err := s.describeLoadBalancer(name)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
@ -47,7 +48,12 @@ func (s *AWSCloud) ensureLoadBalancer(name string, listeners []*elb.Listener, su
|
||||
|
||||
createRequest.SecurityGroups = stringPointerArray(securityGroupIDs)
|
||||
|
||||
glog.Info("Creating load balancer with name: ", name)
|
||||
createRequest.Tags = []*elb.Tag{
|
||||
{Key: aws.String(TagNameKubernetesCluster), Value: aws.String(s.getClusterName())},
|
||||
{Key: aws.String(TagNameKubernetesService), Value: aws.String(namespacedName.String())},
|
||||
}
|
||||
|
||||
glog.Infof("Creating load balancer for %v with name: ", namespacedName, name)
|
||||
_, err := s.elb.CreateLoadBalancer(createRequest)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
|
@ -30,6 +30,7 @@ import (
|
||||
"github.com/aws/aws-sdk-go/service/autoscaling"
|
||||
"github.com/golang/glog"
|
||||
"k8s.io/kubernetes/pkg/api"
|
||||
"k8s.io/kubernetes/pkg/types"
|
||||
)
|
||||
|
||||
const TestClusterId = "clusterid.test"
|
||||
@ -698,7 +699,9 @@ func TestLoadBalancerMatchesClusterRegion(t *testing.T) {
|
||||
t.Errorf("Expected GetLoadBalancer region mismatch error.")
|
||||
}
|
||||
|
||||
_, err = c.EnsureLoadBalancer("elb-name", badELBRegion, nil, nil, nil, api.ServiceAffinityNone)
|
||||
serviceName := types.NamespacedName{Namespace: "foo", Name: "bar"}
|
||||
|
||||
_, err = c.EnsureLoadBalancer("elb-name", badELBRegion, nil, nil, nil, serviceName, api.ServiceAffinityNone)
|
||||
if err == nil || err.Error() != errorMessage {
|
||||
t.Errorf("Expected EnsureLoadBalancer region mismatch error.")
|
||||
}
|
||||
|
@ -25,6 +25,7 @@ import (
|
||||
|
||||
"k8s.io/kubernetes/pkg/api"
|
||||
"k8s.io/kubernetes/pkg/cloudprovider"
|
||||
"k8s.io/kubernetes/pkg/types"
|
||||
)
|
||||
|
||||
const ProviderName = "fake"
|
||||
@ -130,7 +131,7 @@ func (f *FakeCloud) GetLoadBalancer(name, region string) (*api.LoadBalancerStatu
|
||||
|
||||
// EnsureLoadBalancer is a test-spy implementation of LoadBalancer.EnsureLoadBalancer.
|
||||
// It adds an entry "create" into the internal method call record.
|
||||
func (f *FakeCloud) EnsureLoadBalancer(name, region string, externalIP net.IP, ports []*api.ServicePort, hosts []string, affinityType api.ServiceAffinity) (*api.LoadBalancerStatus, error) {
|
||||
func (f *FakeCloud) EnsureLoadBalancer(name, region string, externalIP net.IP, ports []*api.ServicePort, hosts []string, serviceName types.NamespacedName, affinityType api.ServiceAffinity) (*api.LoadBalancerStatus, error) {
|
||||
f.addCall("create")
|
||||
if f.Balancers == nil {
|
||||
f.Balancers = make(map[string]FakeBalancer)
|
||||
|
@ -32,6 +32,7 @@ import (
|
||||
"k8s.io/kubernetes/pkg/api"
|
||||
"k8s.io/kubernetes/pkg/api/unversioned"
|
||||
"k8s.io/kubernetes/pkg/cloudprovider"
|
||||
"k8s.io/kubernetes/pkg/types"
|
||||
utilerrors "k8s.io/kubernetes/pkg/util/errors"
|
||||
"k8s.io/kubernetes/pkg/util/sets"
|
||||
"k8s.io/kubernetes/pkg/util/wait"
|
||||
@ -431,12 +432,12 @@ func isHTTPErrorCode(err error, code int) bool {
|
||||
// Due to an interesting series of design decisions, this handles both creating
|
||||
// new load balancers and updating existing load balancers, recognizing when
|
||||
// each is needed.
|
||||
func (gce *GCECloud) EnsureLoadBalancer(name, region string, requestedIP net.IP, ports []*api.ServicePort, hostNames []string, affinityType api.ServiceAffinity) (*api.LoadBalancerStatus, error) {
|
||||
func (gce *GCECloud) EnsureLoadBalancer(name, region string, requestedIP net.IP, ports []*api.ServicePort, hostNames []string, serviceName types.NamespacedName, affinityType api.ServiceAffinity) (*api.LoadBalancerStatus, error) {
|
||||
portStr := []string{}
|
||||
for _, p := range ports {
|
||||
portStr = append(portStr, fmt.Sprintf("%s/%d", p.Protocol, p.Port))
|
||||
}
|
||||
glog.V(2).Infof("EnsureLoadBalancer(%v, %v, %v, %v, %v)", name, region, requestedIP, portStr, hostNames)
|
||||
glog.V(2).Infof("EnsureLoadBalancer(%v, %v, %v, %v, %v, %v)", name, region, requestedIP, portStr, hostNames, serviceName)
|
||||
|
||||
if len(hostNames) == 0 {
|
||||
return nil, fmt.Errorf("Cannot EnsureLoadBalancer() with no hosts")
|
||||
@ -483,11 +484,11 @@ func (gce *GCECloud) EnsureLoadBalancer(name, region string, requestedIP net.IP,
|
||||
}
|
||||
if isSafeToReleaseIP {
|
||||
if err := gce.deleteStaticIP(name, region); err != nil {
|
||||
glog.Errorf("failed to release static IP %s for load balancer (%v, %v): %v", ipAddress, name, region, err)
|
||||
glog.Errorf("failed to release static IP %s for load balancer (%v(%v), %v): %v", ipAddress, name, serviceName, region, err)
|
||||
}
|
||||
glog.V(2).Infof("EnsureLoadBalancer(%v): released static IP %s", name, ipAddress)
|
||||
glog.V(2).Infof("EnsureLoadBalancer(%v(%v)): released static IP %s", name, serviceName, ipAddress)
|
||||
} else {
|
||||
glog.Warningf("orphaning static IP %s during update of load balancer (%v, %v): %v", ipAddress, name, region, err)
|
||||
glog.Warningf("orphaning static IP %s during update of load balancer (%v(%v), %v): %v", ipAddress, name, serviceName, region, err)
|
||||
}
|
||||
}()
|
||||
|
||||
@ -504,7 +505,7 @@ func (gce *GCECloud) EnsureLoadBalancer(name, region string, requestedIP net.IP,
|
||||
isUserOwnedIP = true
|
||||
isSafeToReleaseIP = false
|
||||
ipAddress = requestedIP.String()
|
||||
glog.V(4).Infof("EnsureLoadBalancer(%v): using user-provided static IP %s", name, ipAddress)
|
||||
glog.V(4).Infof("EnsureLoadBalancer(%v(%v)): using user-provided static IP %s", name, serviceName, ipAddress)
|
||||
} else if requestedIP.String() == fwdRuleIP {
|
||||
// The requested IP is not a static IP, but is currently assigned
|
||||
// to this forwarding rule, so we can keep it.
|
||||
@ -514,13 +515,13 @@ func (gce *GCECloud) EnsureLoadBalancer(name, region string, requestedIP net.IP,
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to ensure static IP %s: %v", fwdRuleIP, err)
|
||||
}
|
||||
glog.V(4).Infof("EnsureLoadBalancer(%v): using user-provided non-static IP %s", name, ipAddress)
|
||||
glog.V(4).Infof("EnsureLoadBalancer(%v(%v)): using user-provided non-static IP %s", name, serviceName, ipAddress)
|
||||
} else {
|
||||
// The requested IP is not static and it is not assigned to the
|
||||
// current forwarding rule. It might be attached to a different
|
||||
// rule or it might not be part of this project at all. Either
|
||||
// way, we can't use it.
|
||||
return nil, fmt.Errorf("requested ip %s is neither static nor assigned to LB %s: %v", requestedIP.String(), name, err)
|
||||
return nil, fmt.Errorf("requested ip %s is neither static nor assigned to LB %s(%v): %v", requestedIP.String(), name, serviceName, err)
|
||||
}
|
||||
} else {
|
||||
// The user did not request a specific IP.
|
||||
@ -541,13 +542,13 @@ func (gce *GCECloud) EnsureLoadBalancer(name, region string, requestedIP net.IP,
|
||||
// use this IP and try to run through the process again, but we
|
||||
// should not release the IP unless it is explicitly flagged as OK.
|
||||
isSafeToReleaseIP = false
|
||||
glog.V(4).Infof("EnsureLoadBalancer(%v): adopting static IP %s", name, ipAddress)
|
||||
glog.V(4).Infof("EnsureLoadBalancer(%v(%v)): adopting static IP %s", name, serviceName, ipAddress)
|
||||
} else {
|
||||
// For total clarity. The IP did not pre-exist and the user did
|
||||
// not ask for a particular one, so we can release the IP in case
|
||||
// of failure or success.
|
||||
isSafeToReleaseIP = true
|
||||
glog.V(4).Infof("EnsureLoadBalancer(%v): allocated static IP %s", name, ipAddress)
|
||||
glog.V(4).Infof("EnsureLoadBalancer(%v(%v)): allocated static IP %s", name, serviceName, ipAddress)
|
||||
}
|
||||
}
|
||||
|
||||
@ -567,12 +568,12 @@ func (gce *GCECloud) EnsureLoadBalancer(name, region string, requestedIP net.IP,
|
||||
if err := gce.updateFirewall(name, region, desc, "0.0.0.0/0", ports, hosts); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
glog.V(4).Infof("EnsureLoadBalancer(%v): updated firewall", name)
|
||||
glog.V(4).Infof("EnsureLoadBalancer(%v(%v)): updated firewall", name, serviceName)
|
||||
} else {
|
||||
if err := gce.createFirewall(name, region, desc, "0.0.0.0/0", ports, hosts); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
glog.V(4).Infof("EnsureLoadBalancer(%v): created firewall", name)
|
||||
glog.V(4).Infof("EnsureLoadBalancer(%v(%v)): created firewall", name, serviceName)
|
||||
}
|
||||
}
|
||||
|
||||
@ -596,13 +597,13 @@ func (gce *GCECloud) EnsureLoadBalancer(name, region string, requestedIP net.IP,
|
||||
if err := gce.deleteForwardingRule(name, region); err != nil {
|
||||
return nil, fmt.Errorf("failed to delete existing forwarding rule %s for load balancer update: %v", name, err)
|
||||
}
|
||||
glog.V(4).Infof("EnsureLoadBalancer(%v): deleted forwarding rule", name)
|
||||
glog.V(4).Infof("EnsureLoadBalancer(%v(%v)): deleted forwarding rule", name, serviceName)
|
||||
}
|
||||
if tpExists && tpNeedsUpdate {
|
||||
if err := gce.deleteTargetPool(name, region); err != nil {
|
||||
return nil, fmt.Errorf("failed to delete existing target pool %s for load balancer update: %v", name, err)
|
||||
}
|
||||
glog.V(4).Infof("EnsureLoadBalancer(%v): deleted target pool", name)
|
||||
glog.V(4).Infof("EnsureLoadBalancer(%v(%v)): deleted target pool", name, serviceName)
|
||||
}
|
||||
|
||||
// Once we've deleted the resources (if necessary), build them back up (or for
|
||||
@ -611,7 +612,7 @@ func (gce *GCECloud) EnsureLoadBalancer(name, region string, requestedIP net.IP,
|
||||
if err := gce.createTargetPool(name, region, hosts, affinityType); err != nil {
|
||||
return nil, fmt.Errorf("failed to create target pool %s: %v", name, err)
|
||||
}
|
||||
glog.V(4).Infof("EnsureLoadBalancer(%v): created target pool", name)
|
||||
glog.V(4).Infof("EnsureLoadBalancer(%v(%v)): created target pool", name, serviceName)
|
||||
}
|
||||
if tpNeedsUpdate || fwdRuleNeedsUpdate {
|
||||
if err := gce.createForwardingRule(name, region, ipAddress, ports); err != nil {
|
||||
@ -622,7 +623,7 @@ func (gce *GCECloud) EnsureLoadBalancer(name, region string, requestedIP net.IP,
|
||||
// of a user-requested IP, the "is user-owned" flag will be set,
|
||||
// preventing it from actually being released.
|
||||
isSafeToReleaseIP = true
|
||||
glog.V(4).Infof("EnsureLoadBalancer(%v): created forwarding rule, IP %s", name, ipAddress)
|
||||
glog.V(4).Infof("EnsureLoadBalancer(%v(%v)): created forwarding rule, IP %s", name, serviceName, ipAddress)
|
||||
}
|
||||
|
||||
status := &api.LoadBalancerStatus{}
|
||||
|
@ -46,6 +46,7 @@ import (
|
||||
"k8s.io/kubernetes/pkg/api"
|
||||
"k8s.io/kubernetes/pkg/api/resource"
|
||||
"k8s.io/kubernetes/pkg/cloudprovider"
|
||||
"k8s.io/kubernetes/pkg/types"
|
||||
)
|
||||
|
||||
const ProviderName = "openstack"
|
||||
@ -654,8 +655,8 @@ func (lb *LoadBalancer) GetLoadBalancer(name, region string) (*api.LoadBalancerS
|
||||
// a list of regions (from config) and query/create loadbalancers in
|
||||
// each region.
|
||||
|
||||
func (lb *LoadBalancer) EnsureLoadBalancer(name, region string, loadBalancerIP net.IP, ports []*api.ServicePort, hosts []string, affinity api.ServiceAffinity) (*api.LoadBalancerStatus, error) {
|
||||
glog.V(4).Infof("EnsureLoadBalancer(%v, %v, %v, %v, %v, %v)", name, region, loadBalancerIP, ports, hosts, affinity)
|
||||
func (lb *LoadBalancer) EnsureLoadBalancer(name, region string, loadBalancerIP net.IP, ports []*api.ServicePort, hosts []string, serviceName types.NamespacedName, affinity api.ServiceAffinity) (*api.LoadBalancerStatus, error) {
|
||||
glog.V(4).Infof("EnsureLoadBalancer(%v, %v, %v, %v, %v, %v)", name, region, loadBalancerIP, ports, hosts, serviceName)
|
||||
|
||||
if len(ports) > 1 {
|
||||
return nil, fmt.Errorf("multiple ports are not yet supported in openstack load balancers")
|
||||
|
@ -321,7 +321,7 @@ func (s *ServiceController) createLoadBalancerIfNeeded(namespacedName types.Name
|
||||
|
||||
// The load balancer doesn't exist yet, so create it.
|
||||
s.eventRecorder.Event(service, api.EventTypeNormal, "CreatingLoadBalancer", "Creating load balancer")
|
||||
err := s.createLoadBalancer(service)
|
||||
err := s.createLoadBalancer(service, namespacedName)
|
||||
if err != nil {
|
||||
return fmt.Errorf("Failed to create load balancer for service %s: %v", namespacedName, err), retryable
|
||||
}
|
||||
@ -371,7 +371,7 @@ func (s *ServiceController) persistUpdate(service *api.Service) error {
|
||||
return err
|
||||
}
|
||||
|
||||
func (s *ServiceController) createLoadBalancer(service *api.Service) error {
|
||||
func (s *ServiceController) createLoadBalancer(service *api.Service, serviceName types.NamespacedName) error {
|
||||
ports, err := getPortsForLB(service)
|
||||
if err != nil {
|
||||
return err
|
||||
@ -385,7 +385,7 @@ func (s *ServiceController) createLoadBalancer(service *api.Service) error {
|
||||
// - Not all cloud providers support all protocols and the next step is expected to return
|
||||
// an error for unsupported protocols
|
||||
status, err := s.balancer.EnsureLoadBalancer(name, s.zone.Region, net.ParseIP(service.Spec.LoadBalancerIP),
|
||||
ports, hostsFromNodeList(&nodes), service.Spec.SessionAffinity)
|
||||
ports, hostsFromNodeList(&nodes), serviceName, service.Spec.SessionAffinity)
|
||||
if err != nil {
|
||||
return err
|
||||
} else {
|
||||
|
Loading…
Reference in New Issue
Block a user